Files
life-echo/api/app/features/user/service.py
Kevin d0c26242db fix(conversation): 离屏不丢回复、列表预热 WS 与非阻塞进入聊天
- 后端:文本/转写后 AI 生成改为独立任务,避免断连取消整轮;按需 TTS 等与 WS 改动
- 前端:RealtimeSession 重绑 UI 时恢复流式 buffer;列表 onPressIn/挂载预热、已有会话立即 push
- 同步会话相关类型、i18n、测试与 env/资源等累计改动

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-08 17:28:31 +08:00

187 lines
6.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from datetime import timedelta
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.db import utc_now
from app.core.logging import get_logger
from app.core.redis import redis_service
from app.core.task_tracker import task_tracker
from app.features.user import repo
from app.features.user.models import User
from app.features.user.schemas import (
PURGE_USER_DATA_CONFIRMATION,
PurgeUserDataResponse,
TestSubscriptionResponse,
UpdateUserProfileRequest,
UserProfileResponse,
)
from app.ports.storage import ObjectStorage
# Module-level logger shared by everything in this file. Log calls below pass
# `{}` placeholders plus positional args rather than pre-formatted strings
# (NOTE(review): looks like loguru-style formatting — confirm in app.core.logging).
logger = get_logger(__name__)
def _user_to_profile(user: User) -> UserProfileResponse:
    """Build the profile response returned to clients from a ``User`` instance.

    Every listed attribute is copied over unchanged; ``created_at`` is the
    only value that is transformed, being rendered through ``isoformat()``.
    """
    # Attributes that map one-to-one onto identically named response fields.
    copied_fields = (
        "id",
        "phone",
        "email",
        "nickname",
        "avatar_url",
        "subscription_type",
        "birth_year",
        "birth_place",
        "grew_up_place",
        "occupation",
    )
    payload = {name: getattr(user, name) for name in copied_fields}
    payload["created_at"] = user.created_at.isoformat()
    return UserProfileResponse(**payload)
class UserService:
    """User-level operations: profile edits, test subscriptions and data purge.

    Every public method loads the user through ``repo.get_user_by_id`` using
    the session given to the constructor and commits its own changes on that
    session.
    """

    def __init__(self, db: AsyncSession):
        self._db = db

    async def update_profile(
        self, user_id: str, body: UpdateUserProfileRequest
    ) -> UserProfileResponse:
        """Write the demographic fields the request explicitly carried.

        Only fields listed in ``body.model_fields_set`` are copied onto the
        user, so a field the client omitted keeps its stored value.

        Returns:
            The refreshed profile of the user.

        Raises:
            ValueError: if no user with ``user_id`` exists.
        """
        user = await repo.get_user_by_id(user_id, self._db)
        if not user:
            raise ValueError("用户不存在")

        for field in ("birth_year", "birth_place", "grew_up_place", "occupation"):
            if field in body.model_fields_set:
                setattr(user, field, getattr(body, field))

        await self._db.commit()
        await self._db.refresh(user)
        return _user_to_profile(user)

    async def toggle_test_subscription(
        self, user_id: str, action: str, plan_id: str
    ) -> TestSubscriptionResponse:
        """Switch a test subscription on or off for the user.

        ``action == "activate"`` stores ``plan_id`` as the subscription type
        with an expiry 365 days from now. Any other ``action`` value resets
        the user to ``"free"`` with no expiry.

        Raises:
            ValueError: if no user with ``user_id`` exists.
        """
        user = await repo.get_user_by_id(user_id, self._db)
        if not user:
            raise ValueError("用户不存在")

        if action == "activate":
            subscription_type = plan_id
            expires_at = utc_now() + timedelta(days=365)
            message = f"已开启测试订阅:{plan_id}"
        else:
            subscription_type = "free"
            expires_at = None
            message = "已关闭测试订阅,恢复免费体验版"

        user.subscription_type = subscription_type
        user.subscription_expires_at = expires_at
        await self._db.commit()
        return TestSubscriptionResponse(
            success=True,
            message=message,
            subscription_type=subscription_type,
        )

    async def purge_all_user_data(
        self,
        user_id: str,
        *,
        confirmation: str,
        object_storage: ObjectStorage | None = None,
    ) -> PurgeUserDataResponse:
        """Delete the user's business data and clear the demographic profile fields.

        Order of operations:

        1. Validate the confirmation phrase and that the user exists.
        2. Collect object-storage keys and conversation/chapter/story ids
           while the rows still exist.
        3. Delete the related rows, clear the demographic fields, commit.
        4. Best-effort external cleanup after the commit: object storage,
           Redis conversation history, task tracking and Redis locks.
           Failures in this step are logged as warnings and never raised.

        NOTE(review): the response message states that all login sessions are
        invalidated; nothing in this method does that directly — presumably
        it happens inside ``repo.purge_user_related_rows``. Confirm.

        Args:
            user_id: id of the user whose data is purged.
            confirmation: must equal ``PURGE_USER_DATA_CONFIRMATION``.
            object_storage: storage used to delete collected keys; when
                ``None`` the keys are skipped and a warning is logged.

        Raises:
            ValueError: if the confirmation does not match or the user does
                not exist.
        """
        if confirmation != PURGE_USER_DATA_CONFIRMATION:
            raise ValueError("确认文案不正确,请按提示完整输入口令")
        user = await repo.get_user_by_id(user_id, self._db)
        if not user:
            raise ValueError("用户不存在")

        logger.info("用户数据清空开始 user_id={}", user_id)

        # Gathered before the rows are deleted; consumed by the external
        # cleanup steps that run after the commit.
        storage_keys = await repo.collect_object_storage_keys_before_purge(
            self._db, user_id
        )
        conv_ids, chapter_ids, story_ids = await repo.collect_purge_context(
            self._db, user_id
        )
        logger.debug(
            "清空前收集 user_id={} storage_keys={} conversations={} chapters={} stories={}",
            user_id,
            len(storage_keys),
            len(conv_ids),
            len(chapter_ids),
            len(story_ids),
        )

        await repo.purge_user_related_rows(self._db, user_id)
        await repo.clear_user_demographics(self._db, user_id)
        await self._db.commit()
        logger.info("用户数据 DB 行已删除、档案字段已清空并提交 user_id={}", user_id)

        # Everything below is best-effort: the database state is already
        # committed, so a failure here must not fail the request.
        self._delete_storage_objects(user_id, storage_keys, object_storage)
        await self._clear_conversation_histories(user_id, conv_ids)
        await self._clear_task_tracking(user_id)
        await self._clear_redis_locks(user_id, chapter_ids, story_ids)

        logger.info("用户数据清空完成 user_id={}", user_id)
        return PurgeUserDataResponse(
            success=True,
            message=(
                "已清空该账号下的对话、记忆、故事、章节、订单等业务数据,并已尝试删除关联的对象存储文件;"
                "个人档案中的出生年份、出生地、成长地、职业等已清空;"
                "所有登录会话已失效,请重新登录"
            ),
        )

    def _delete_storage_objects(
        self,
        user_id: str,
        storage_keys,
        object_storage: ObjectStorage | None,
    ) -> None:
        """Delete each collected object-storage key, logging failures per key."""
        if not storage_keys:
            return
        # The parameter is typed ``ObjectStorage | None``; test for None
        # explicitly instead of relying on the storage object's truthiness.
        if object_storage is None:
            logger.warning(
                "用户数据清空:未注入 object_storage跳过 {} 个对象存储 key user_id={}",
                len(storage_keys),
                user_id,
            )
            return

        logger.debug(
            "对象存储尝试删除 user_id={} key_count={}", user_id, len(storage_keys)
        )
        # NOTE(review): ``delete`` is invoked synchronously from an async
        # method; if the implementation performs blocking I/O this stalls the
        # event loop — confirm against the ObjectStorage port.
        for key in storage_keys:
            try:
                object_storage.delete(key)
            except Exception as e:
                logger.warning(
                    "对象存储删除失败 user_id={} key={} err={}", user_id, key, e
                )

    async def _clear_conversation_histories(self, user_id: str, conv_ids) -> None:
        """Clear the Redis history of every conversation, one attempt per id."""
        for cid in conv_ids:
            try:
                await redis_service.clear_conversation_history(cid)
            except Exception as e:
                logger.warning(
                    "清空会话 Redis 历史失败 conversation_id={} err={}", cid, e
                )
        if conv_ids:
            logger.debug(
                "已请求清空 Redis 会话历史 user_id={} conversation_count={}",
                user_id,
                len(conv_ids),
            )

    async def _clear_task_tracking(self, user_id: str) -> None:
        """Clear the task tracker entries of the user, logging on failure."""
        try:
            await task_tracker.clear_user_tasks(user_id)
        except Exception as e:
            logger.warning("清空用户任务追踪失败 user_id={} err={}", user_id, e)
        else:
            logger.debug("用户任务追踪已清空 user_id={}", user_id)

    async def _clear_redis_locks(self, user_id: str, chapter_ids, story_ids) -> None:
        """Delete the chapter, chapter-image and story-image lock keys.

        Each pattern is attempted independently so that one failing deletion
        does not leave the remaining lock keys behind.
        """
        patterns = [f"lock:chapter:{user_id}:*"]
        patterns.extend(f"lock:chapter-images:{ch_id}" for ch_id in chapter_ids)
        patterns.extend(f"lock:story-image:{sid}" for sid in story_ids)

        all_cleared = True
        for pattern in patterns:
            try:
                await redis_service.delete_keys_matching_pattern(pattern)
            except Exception as e:
                all_cleared = False
                logger.warning("清理 Redis 锁 key 失败 user_id={} err={}", user_id, e)

        # Only report success when every pattern was processed without error.
        if all_cleared:
            logger.debug(
                "Redis 分布式锁 key 已清理 user_id={} chapter_count={} story_count={}",
                user_id,
                len(chapter_ids),
                len(story_ids),
            )