"""User 数据访问:查询与「清空用户业务数据」批量删除。""" from sqlalchemy import delete, select, update from sqlalchemy.ext.asyncio import AsyncSession from app.core.cos_url_keys import ( collect_cos_keys_from_tts_url_list, extract_cos_object_key_if_owned, ) from app.features.asset.models import Asset from app.features.auth.models import RefreshToken from app.features.conversation.models import Conversation, ConversationMessage, Segment from app.features.memoir.models import ( Book, Chapter, ChapterCoverIntent, MemoirImage, MemoirState, ) from app.features.memory.models import ( MemoryChunk, MemoryCurationAction, MemoryFact, MemorySource, MemorySummary, TimelineEvent, ) from app.features.payment.models import Order from app.features.story.models import Story, StoryImageIntent from app.features.user.models import User async def get_user_by_id(user_id: str, db: AsyncSession) -> User | None: return await db.get(User, user_id) async def clear_user_demographics(db: AsyncSession, user_id: str) -> None: """ 清空 users 表上由访谈收集的档案字段(不删账号行;手机号、密码等登录字段保留)。 聊天助手会从这些字段注入「出生地」等上下文,清空后才会真正「忘记」。 """ await db.execute( update(User) .where(User.id == user_id) .values( birth_year=None, birth_place=None, grew_up_place=None, occupation=None, ) ) async def collect_purge_context( db: AsyncSession, user_id: str ) -> tuple[list[str], list[str], list[str]]: """在删除前收集 Redis / 分布式锁相关 id。""" conv_rows = await db.execute( select(Conversation.id).where(Conversation.user_id == user_id) ) conv_ids = list(conv_rows.scalars().all()) ch_rows = await db.execute(select(Chapter.id).where(Chapter.user_id == user_id)) chapter_ids = list(ch_rows.scalars().all()) st_rows = await db.execute(select(Story.id).where(Story.user_id == user_id)) story_ids = list(st_rows.scalars().all()) return conv_ids, chapter_ids, story_ids async def related_asset_ids(db: AsyncSession, user_id: str) -> set[str]: """用户故事插图 / 章节封面意图引用的 Asset.id。""" asset_rows = await db.execute( select(StoryImageIntent.asset_id) .join(Story, StoryImageIntent.story_id == Story.id) .where(Story.user_id == user_id, StoryImageIntent.asset_id.isnot(None)) ) from_story = {r for r in asset_rows.scalars().all() if r} asset_rows_ch = await db.execute( select(ChapterCoverIntent.asset_id) .join(Chapter, ChapterCoverIntent.chapter_id == Chapter.id) .where(Chapter.user_id == user_id, ChapterCoverIntent.asset_id.isnot(None)) ) return from_story | {r for r in asset_rows_ch.scalars().all() if r} async def collect_object_storage_keys_before_purge( db: AsyncSession, user_id: str ) -> list[str]: """删除行前收集 COS 等对象存储 key(尽力删除孤儿对象)。""" keys: set[str] = set() r = await db.execute( select(MemorySource.storage_key).where( MemorySource.user_id == user_id, MemorySource.storage_key.isnot(None), ) ) keys.update(x for x in r.scalars().all() if x) r2 = await db.execute( select(MemoirImage.storage_key) .join(Chapter, MemoirImage.chapter_id == Chapter.id) .where(Chapter.user_id == user_id, MemoirImage.storage_key.isnot(None)) ) keys.update(x for x in r2.scalars().all() if x) asset_ids = await related_asset_ids(db, user_id) if asset_ids: r3 = await db.execute( select(Asset.storage_key).where( Asset.id.in_(asset_ids), Asset.storage_key.isnot(None), ) ) keys.update(x for x in r3.scalars().all() if x) seg_rows = await db.execute( select(Segment.audio_url, Segment.tts_audio_urls) .join(Conversation, Segment.conversation_id == Conversation.id) .where(Conversation.user_id == user_id) ) for audio_url, tts_urls in seg_rows.all(): k = extract_cos_object_key_if_owned(audio_url) if k: keys.add(k) keys |= collect_cos_keys_from_tts_url_list( tts_urls if isinstance(tts_urls, list) else None ) return sorted(keys) async def purge_user_related_rows(db: AsyncSession, user_id: str) -> None: """ 物理删除当前用户除账号(users 行)外的业务数据。 顺序按外键依赖:memory → 资源意图关联的 assets → story/chapter/book → conversation_messages(引用 segments)→ segments → conversations → … """ await db.execute(delete(MemoryFact).where(MemoryFact.user_id == user_id)) await db.execute(delete(MemoryChunk).where(MemoryChunk.user_id == user_id)) await db.execute(delete(MemorySource).where(MemorySource.user_id == user_id)) await db.execute(delete(MemorySummary).where(MemorySummary.user_id == user_id)) await db.execute(delete(TimelineEvent).where(TimelineEvent.user_id == user_id)) await db.execute( delete(MemoryCurationAction).where(MemoryCurationAction.user_id == user_id) ) asset_ids = await related_asset_ids(db, user_id) if asset_ids: await db.execute(delete(Asset).where(Asset.id.in_(asset_ids))) await db.execute(delete(Story).where(Story.user_id == user_id)) await db.execute(delete(Chapter).where(Chapter.user_id == user_id)) await db.execute(delete(Book).where(Book.user_id == user_id)) await db.execute( delete(ConversationMessage).where( ConversationMessage.conversation_id.in_( select(Conversation.id).where(Conversation.user_id == user_id) ) ) ) await db.execute( delete(Segment).where( Segment.conversation_id.in_( select(Conversation.id).where(Conversation.user_id == user_id) ) ) ) await db.execute(delete(Conversation).where(Conversation.user_id == user_id)) await db.execute(delete(MemoirState).where(MemoirState.user_id == user_id)) await db.execute(delete(Order).where(Order.user_id == user_id)) await db.execute(delete(RefreshToken).where(RefreshToken.user_id == user_id))