配置 SSOT(TOML + .env) 统一错误契约 Auth 与事务边界 Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client 可观测性(OpenTelemetry + LGTM)
201 lines
7.5 KiB
Python
201 lines
7.5 KiB
Python
from datetime import timedelta
|
||
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.core.cos_url_keys import avatar_url_for_api_response
|
||
from app.core.db import transactional, utc_now
|
||
from app.core.errors import BadRequestError, NotFoundError
|
||
from app.core.logging import get_logger
|
||
from app.core.redis import redis_service
|
||
from app.core.task_tracker import task_tracker
|
||
from app.features.user import repo
|
||
from app.features.user.models import User
|
||
from app.features.user.schemas import (
|
||
PURGE_USER_DATA_CONFIRMATION,
|
||
PurgeUserDataResponse,
|
||
TestSubscriptionResponse,
|
||
UpdateUserProfileRequest,
|
||
UserProfileResponse,
|
||
)
|
||
from app.ports.storage import ObjectStorage
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
|
||
def _coerce_language(raw) -> str:
|
||
"""Normalize a stored language token to the 'zh' / 'en' Literal."""
|
||
s = str(raw).strip().lower() if isinstance(raw, str) else ""
|
||
return s if s in ("zh", "en") else "zh"
|
||
|
||
|
||
def _user_to_profile(user: User) -> UserProfileResponse:
|
||
return UserProfileResponse(
|
||
id=user.id,
|
||
phone=user.phone,
|
||
email=user.email,
|
||
nickname=user.nickname,
|
||
avatar_url=avatar_url_for_api_response(user.avatar_url),
|
||
subscription_type=user.subscription_type,
|
||
created_at=user.created_at.isoformat(),
|
||
birth_year=user.birth_year,
|
||
birth_place=user.birth_place,
|
||
grew_up_place=user.grew_up_place,
|
||
occupation=user.occupation,
|
||
language_preference=_coerce_language(
|
||
getattr(user, "language_preference", "zh")
|
||
),
|
||
)
|
||
|
||
|
||
class UserService:
|
||
def __init__(self, db: AsyncSession):
|
||
self._db = db
|
||
|
||
async def get_by_id(self, user_id: str) -> User | None:
|
||
return await repo.get_user_by_id(user_id, self._db)
|
||
|
||
async def update_profile(
|
||
self, user_id: str, body: UpdateUserProfileRequest
|
||
) -> UserProfileResponse:
|
||
user = await repo.get_user_by_id(user_id, self._db)
|
||
if not user:
|
||
raise NotFoundError("用户不存在")
|
||
async with transactional(self._db):
|
||
for field in ("birth_year", "birth_place", "grew_up_place", "occupation"):
|
||
if field in body.model_fields_set:
|
||
setattr(user, field, getattr(body, field))
|
||
await self._db.refresh(user)
|
||
return _user_to_profile(user)
|
||
|
||
async def toggle_test_subscription(
|
||
self, user_id: str, action: str, plan_id: str
|
||
) -> TestSubscriptionResponse:
|
||
user = await repo.get_user_by_id(user_id, self._db)
|
||
if not user:
|
||
raise NotFoundError("用户不存在")
|
||
now = utc_now()
|
||
async with transactional(self._db):
|
||
if action == "activate":
|
||
user.subscription_type = plan_id
|
||
user.subscription_expires_at = now + timedelta(days=365)
|
||
subscription_type = plan_id
|
||
message = f"已开启测试订阅:{plan_id}"
|
||
else:
|
||
user.subscription_type = "free"
|
||
user.subscription_expires_at = None
|
||
subscription_type = "free"
|
||
message = "已关闭测试订阅,恢复免费体验版"
|
||
return TestSubscriptionResponse(
|
||
success=True,
|
||
message=message,
|
||
subscription_type=subscription_type,
|
||
)
|
||
|
||
async def purge_all_user_data(
|
||
self,
|
||
user_id: str,
|
||
*,
|
||
confirmation: str,
|
||
object_storage: ObjectStorage | None = None,
|
||
) -> PurgeUserDataResponse:
|
||
"""物理删除该用户业务数据(保留 users 行与登录字段);并清空出生年/出生地等档案字段;提交后再清 Redis 等。"""
|
||
if confirmation != PURGE_USER_DATA_CONFIRMATION:
|
||
raise BadRequestError("确认文案不正确,请按提示完整输入口令")
|
||
|
||
user = await repo.get_user_by_id(user_id, self._db)
|
||
if not user:
|
||
raise NotFoundError("用户不存在")
|
||
|
||
logger.info("用户数据清空开始 user_id={}", user_id)
|
||
|
||
storage_keys = await repo.collect_object_storage_keys_before_purge(
|
||
self._db, user_id
|
||
)
|
||
conv_ids, chapter_ids, story_ids = await repo.collect_purge_context(
|
||
self._db, user_id
|
||
)
|
||
logger.debug(
|
||
"清空前收集 user_id={} storage_keys={} conversations={} chapters={} stories={}",
|
||
user_id,
|
||
len(storage_keys),
|
||
len(conv_ids),
|
||
len(chapter_ids),
|
||
len(story_ids),
|
||
)
|
||
|
||
async with transactional(self._db):
|
||
await repo.purge_user_related_rows(self._db, user_id)
|
||
await repo.clear_user_demographics(self._db, user_id)
|
||
await repo.clear_user_avatar_url(self._db, user_id)
|
||
logger.info("用户数据 DB 行已删除、档案字段已清空并提交 user_id={}", user_id)
|
||
|
||
if object_storage and storage_keys:
|
||
logger.debug(
|
||
"对象存储尝试删除 user_id={} key_count={}", user_id, len(storage_keys)
|
||
)
|
||
for key in storage_keys:
|
||
try:
|
||
object_storage.delete(key)
|
||
except Exception as e:
|
||
logger.warning(
|
||
"对象存储删除失败 user_id={} key={} err={}", user_id, key, e
|
||
)
|
||
elif storage_keys and not object_storage:
|
||
logger.warning(
|
||
"用户数据清空:未注入 object_storage,跳过 {} 个对象存储 key user_id={}",
|
||
len(storage_keys),
|
||
user_id,
|
||
)
|
||
|
||
for cid in conv_ids:
|
||
try:
|
||
await redis_service.clear_conversation_history(cid)
|
||
except Exception as e:
|
||
logger.warning(
|
||
"清空会话 Redis 历史失败 conversation_id={} err={}", cid, e
|
||
)
|
||
if conv_ids:
|
||
logger.debug(
|
||
"已请求清空 Redis 会话历史 user_id={} conversation_count={}",
|
||
user_id,
|
||
len(conv_ids),
|
||
)
|
||
|
||
try:
|
||
await task_tracker.clear_user_tasks(user_id)
|
||
logger.debug("用户任务追踪已清空 user_id={}", user_id)
|
||
except Exception as e:
|
||
logger.warning("清空用户任务追踪失败 user_id={} err={}", user_id, e)
|
||
|
||
try:
|
||
await redis_service.delete_keys_matching_pattern(
|
||
f"lock:chapter:{user_id}:*"
|
||
)
|
||
for ch_id in chapter_ids:
|
||
await redis_service.delete_keys_matching_pattern(
|
||
f"lock:chapter-images:{ch_id}"
|
||
)
|
||
for sid in story_ids:
|
||
await redis_service.delete_keys_matching_pattern(
|
||
f"lock:story-image:{sid}"
|
||
)
|
||
logger.debug(
|
||
"Redis 分布式锁 key 已清理 user_id={} chapter_count={} story_count={}",
|
||
user_id,
|
||
len(chapter_ids),
|
||
len(story_ids),
|
||
)
|
||
except Exception as e:
|
||
logger.warning("清理 Redis 锁 key 失败 user_id={} err={}", user_id, e)
|
||
|
||
logger.info("用户数据清空完成 user_id={}", user_id)
|
||
|
||
return PurgeUserDataResponse(
|
||
success=True,
|
||
message=(
|
||
"已清空该账号下的对话、记忆、故事、章节、订单等业务数据,并已尝试删除关联的对象存储文件;"
|
||
"个人档案中的出生年份、出生地、成长地、职业等已清空;"
|
||
"所有登录会话已失效,请重新登录"
|
||
),
|
||
)
|