""" MemoryService — conversation / memoir 的统一门面。 - ingest_transcript: transcript -> memory_sources, chunks, embedding, FTS - retrieve: 委托 HybridRetriever 返回 evidence bundle(FTS + 可选向量 RRF) Celery 侧使用 `ingest_transcript_sync` + `retrieve_evidence_sync`,与异步路径差异见 `api/docs/memory-retrieval.md`。 """ from sqlalchemy.ext.asyncio import AsyncSession from app.features.memory.chunker import chunk_transcript from app.features.memory.repo import ( create_chunk, create_source, update_chunk_embedding, update_chunk_fts, ) from app.ports.embedding import EmbeddingProvider class MemoryService: def __init__( self, db: AsyncSession, *, embedding_provider: EmbeddingProvider | None = None, ): self._db = db self._embedding = embedding_provider async def ingest_transcript( self, user_id: str, conversation_id: str, transcript: str ) -> str: """ Ingest conversation transcript into memory. Creates MemorySource, chunks, populates embedding + FTS. Returns source_id. """ if not transcript or not transcript.strip(): raise ValueError("transcript cannot be empty") source = await create_source( self._db, user_id=user_id, source_type="transcript", raw_text=transcript.strip(), conversation_id=conversation_id, ) chunks_text = chunk_transcript(transcript.strip()) chunk_records = [] for i, content in enumerate(chunks_text): chunk = await create_chunk( self._db, source_id=source.id, user_id=user_id, content=content, chunk_index=i, ) chunk_records.append((chunk.id, content)) await self._db.flush() # FTS: populate content_tsv for chunk_id, _ in chunk_records: await update_chunk_fts(self._db, chunk_id) # Embedding: 若有 provider 则写入 if self._embedding and chunk_records: texts = [c for _, c in chunk_records] embeddings = await self._embedding.embed_texts(texts) for (chunk_id, _), emb in zip(chunk_records, embeddings): if emb: await update_chunk_embedding(self._db, chunk_id, emb) await self._db.commit() return source.id async def retrieve(self, user_id: str, query: str, *, top_k: int = 10) -> dict: """Retrieve relevant evidence. 委托 HybridRetriever。""" from app.features.memory.retriever import HybridRetriever retriever = HybridRetriever(self._db, embedding_provider=self._embedding) return await retriever.retrieve(user_id=user_id, query=query, top_k=top_k) def ingest_transcript_sync( session, user_id: str, conversation_id: str, transcript: str, ) -> str: """ Sync transcript ingest for Celery tasks. Creates source + chunks + FTS. Skips embedding (async). Returns source_id. """ from app.features.memory.chunker import chunk_transcript from app.features.memory.repo import ( create_chunk_sync, create_source_sync, update_chunk_fts_sync, ) if not transcript or not transcript.strip(): raise ValueError("transcript cannot be empty") source = create_source_sync( session, user_id=user_id, source_type="transcript", raw_text=transcript.strip(), conversation_id=conversation_id, ) session.flush() chunks_text = chunk_transcript(transcript.strip()) for i, content in enumerate(chunks_text): chunk = create_chunk_sync( session, source_id=source.id, user_id=user_id, content=content, chunk_index=i, ) session.flush() update_chunk_fts_sync(session, chunk.id) session.commit() return source.id