Files
life-echo/api/app/features/user/service.py
Sully 53e0065e3e refactor(api): TOML 配置 SSOT、统一错误契约、Auth/事务加固与可观测性 (#33)
配置 SSOT(TOML + .env)
统一错误契约
Auth 与事务边界
Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client
可观测性(OpenTelemetry + LGTM)
2026-05-22 13:44:50 +08:00

201 lines
7.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from datetime import timedelta
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.cos_url_keys import avatar_url_for_api_response
from app.core.db import transactional, utc_now
from app.core.errors import BadRequestError, NotFoundError
from app.core.logging import get_logger
from app.core.redis import redis_service
from app.core.task_tracker import task_tracker
from app.features.user import repo
from app.features.user.models import User
from app.features.user.schemas import (
PURGE_USER_DATA_CONFIRMATION,
PurgeUserDataResponse,
TestSubscriptionResponse,
UpdateUserProfileRequest,
UserProfileResponse,
)
from app.ports.storage import ObjectStorage
logger = get_logger(__name__)
def _coerce_language(raw) -> str:
"""Normalize a stored language token to the 'zh' / 'en' Literal."""
s = str(raw).strip().lower() if isinstance(raw, str) else ""
return s if s in ("zh", "en") else "zh"
def _user_to_profile(user: User) -> UserProfileResponse:
return UserProfileResponse(
id=user.id,
phone=user.phone,
email=user.email,
nickname=user.nickname,
avatar_url=avatar_url_for_api_response(user.avatar_url),
subscription_type=user.subscription_type,
created_at=user.created_at.isoformat(),
birth_year=user.birth_year,
birth_place=user.birth_place,
grew_up_place=user.grew_up_place,
occupation=user.occupation,
language_preference=_coerce_language(
getattr(user, "language_preference", "zh")
),
)
class UserService:
def __init__(self, db: AsyncSession):
self._db = db
async def get_by_id(self, user_id: str) -> User | None:
return await repo.get_user_by_id(user_id, self._db)
async def update_profile(
self, user_id: str, body: UpdateUserProfileRequest
) -> UserProfileResponse:
user = await repo.get_user_by_id(user_id, self._db)
if not user:
raise NotFoundError("用户不存在")
async with transactional(self._db):
for field in ("birth_year", "birth_place", "grew_up_place", "occupation"):
if field in body.model_fields_set:
setattr(user, field, getattr(body, field))
await self._db.refresh(user)
return _user_to_profile(user)
async def toggle_test_subscription(
self, user_id: str, action: str, plan_id: str
) -> TestSubscriptionResponse:
user = await repo.get_user_by_id(user_id, self._db)
if not user:
raise NotFoundError("用户不存在")
now = utc_now()
async with transactional(self._db):
if action == "activate":
user.subscription_type = plan_id
user.subscription_expires_at = now + timedelta(days=365)
subscription_type = plan_id
message = f"已开启测试订阅:{plan_id}"
else:
user.subscription_type = "free"
user.subscription_expires_at = None
subscription_type = "free"
message = "已关闭测试订阅,恢复免费体验版"
return TestSubscriptionResponse(
success=True,
message=message,
subscription_type=subscription_type,
)
async def purge_all_user_data(
self,
user_id: str,
*,
confirmation: str,
object_storage: ObjectStorage | None = None,
) -> PurgeUserDataResponse:
"""物理删除该用户业务数据(保留 users 行与登录字段);并清空出生年/出生地等档案字段;提交后再清 Redis 等。"""
if confirmation != PURGE_USER_DATA_CONFIRMATION:
raise BadRequestError("确认文案不正确,请按提示完整输入口令")
user = await repo.get_user_by_id(user_id, self._db)
if not user:
raise NotFoundError("用户不存在")
logger.info("用户数据清空开始 user_id={}", user_id)
storage_keys = await repo.collect_object_storage_keys_before_purge(
self._db, user_id
)
conv_ids, chapter_ids, story_ids = await repo.collect_purge_context(
self._db, user_id
)
logger.debug(
"清空前收集 user_id={} storage_keys={} conversations={} chapters={} stories={}",
user_id,
len(storage_keys),
len(conv_ids),
len(chapter_ids),
len(story_ids),
)
async with transactional(self._db):
await repo.purge_user_related_rows(self._db, user_id)
await repo.clear_user_demographics(self._db, user_id)
await repo.clear_user_avatar_url(self._db, user_id)
logger.info("用户数据 DB 行已删除、档案字段已清空并提交 user_id={}", user_id)
if object_storage and storage_keys:
logger.debug(
"对象存储尝试删除 user_id={} key_count={}", user_id, len(storage_keys)
)
for key in storage_keys:
try:
object_storage.delete(key)
except Exception as e:
logger.warning(
"对象存储删除失败 user_id={} key={} err={}", user_id, key, e
)
elif storage_keys and not object_storage:
logger.warning(
"用户数据清空:未注入 object_storage跳过 {} 个对象存储 key user_id={}",
len(storage_keys),
user_id,
)
for cid in conv_ids:
try:
await redis_service.clear_conversation_history(cid)
except Exception as e:
logger.warning(
"清空会话 Redis 历史失败 conversation_id={} err={}", cid, e
)
if conv_ids:
logger.debug(
"已请求清空 Redis 会话历史 user_id={} conversation_count={}",
user_id,
len(conv_ids),
)
try:
await task_tracker.clear_user_tasks(user_id)
logger.debug("用户任务追踪已清空 user_id={}", user_id)
except Exception as e:
logger.warning("清空用户任务追踪失败 user_id={} err={}", user_id, e)
try:
await redis_service.delete_keys_matching_pattern(
f"lock:chapter:{user_id}:*"
)
for ch_id in chapter_ids:
await redis_service.delete_keys_matching_pattern(
f"lock:chapter-images:{ch_id}"
)
for sid in story_ids:
await redis_service.delete_keys_matching_pattern(
f"lock:story-image:{sid}"
)
logger.debug(
"Redis 分布式锁 key 已清理 user_id={} chapter_count={} story_count={}",
user_id,
len(chapter_ids),
len(story_ids),
)
except Exception as e:
logger.warning("清理 Redis 锁 key 失败 user_id={} err={}", user_id, e)
logger.info("用户数据清空完成 user_id={}", user_id)
return PurgeUserDataResponse(
success=True,
message=(
"已清空该账号下的对话、记忆、故事、章节、订单等业务数据,并已尝试删除关联的对象存储文件;"
"个人档案中的出生年份、出生地、成长地、职业等已清空;"
"所有登录会话已失效,请重新登录"
),
)