""" MemoryService — conversation / memoir 的统一门面。 - ingest_transcript: transcript -> memory_sources, chunks, embedding - ingest 成功后:向 ``memory_idle`` 队列派发 LLM 富化(见 ``schedule_memory_enrichment``),不阻塞请求 - retrieve: 委托 HybridRetriever 返回 evidence bundle(向量 chunks) Celery 侧使用 `ingest_transcript_sync` + `retrieve_evidence_sync`,与异步路径对齐见 `api/docs/memory-retrieval.md`。 """ from sqlalchemy.ext.asyncio import AsyncSession from app.core.logging import get_logger from app.features.conversation.lineage_schemas import ( primary_user_message_id_from_lineage, ) from app.features.memory.enrichment_scheduler import MemoryEnrichmentScheduler from app.features.memory.ingest_service import MemoryIngestService from app.features.memory.repo import ( create_curation_action, set_chunk_excluded, set_memory_fact_status, ) from app.features.memory.retrieval_service import MemoryRetrievalService from app.features.memory.schemas import EvidenceBundle from app.ports.embedding import EmbeddingProvider logger = get_logger(__name__) class MemoryService: def __init__( self, db: AsyncSession, *, embedding_provider: EmbeddingProvider | None = None, ): self._db = db self._embedding = embedding_provider async def ingest_transcript( self, user_id: str, conversation_id: str, transcript: str, *, lineage_json: dict | None = None, ) -> str: """ Ingest conversation transcript into memory. Creates MemorySource, chunks, populates embedding. Returns source_id. """ service = MemoryIngestService(self._db, embedding_provider=self._embedding) return await service.ingest_transcript( user_id, conversation_id, transcript, lineage_json=lineage_json, ) async def retrieve( self, user_id: str, query: str, *, top_k: int = 10 ) -> EvidenceBundle: """Retrieve relevant evidence. 委托 HybridRetriever。""" service = MemoryRetrievalService(self._db, embedding_provider=self._embedding) return await service.retrieve(user_id, query, top_k=top_k) async def exclude_chunk( self, user_id: str, chunk_id: str, *, reason: str = "" ) -> bool: ok = await set_chunk_excluded(self._db, chunk_id, user_id, True) if not ok: return False await create_curation_action( self._db, user_id=user_id, action_type="exclude", target_type="chunk", target_id=chunk_id, details={"reason": reason} if reason else None, ) await self._db.commit() return True async def restore_chunk(self, user_id: str, chunk_id: str) -> bool: ok = await set_chunk_excluded(self._db, chunk_id, user_id, False) if not ok: return False await create_curation_action( self._db, user_id=user_id, action_type="restore", target_type="chunk", target_id=chunk_id, details=None, ) await self._db.commit() return True async def confirm_fact(self, user_id: str, fact_id: str) -> bool: ok = await set_memory_fact_status(self._db, fact_id, user_id, "confirmed") if not ok: return False await create_curation_action( self._db, user_id=user_id, action_type="confirm", target_type="fact", target_id=fact_id, details=None, ) await self._db.commit() return True async def reject_fact( self, user_id: str, fact_id: str, *, reason: str = "" ) -> bool: ok = await set_memory_fact_status(self._db, fact_id, user_id, "rejected") if not ok: return False await create_curation_action( self._db, user_id=user_id, action_type="reject", target_type="fact", target_id=fact_id, details={"reason": reason} if reason else None, ) await self._db.commit() return ok def ingest_transcript_sync( session, user_id: str, conversation_id: str, transcript: str, *, lineage_json: dict | None = None, ) -> str: """ Sync transcript ingest for Celery tasks. Creates source + chunks, and best-effort populates embeddings. Returns source_id. """ from app.core.dependencies import get_embedding_provider from app.features.memory.chunker import chunk_transcript from app.features.memory.repo import ( create_chunk_sync, create_source_sync, update_chunk_embedding_sync, ) if not transcript or not transcript.strip(): raise ValueError("transcript cannot be empty") primary_mid = ( primary_user_message_id_from_lineage(lineage_json) if lineage_json else None ) source = create_source_sync( session, user_id=user_id, source_type="transcript", raw_text=transcript.strip(), conversation_id=conversation_id, lineage_json=lineage_json, primary_user_message_id=primary_mid, ) session.flush() chunks_text = chunk_transcript(transcript.strip()) chunk_records: list[tuple[str, str]] = [] for i, content in enumerate(chunks_text): chunk = create_chunk_sync( session, source_id=source.id, user_id=user_id, content=content, chunk_index=i, ) session.flush() chunk_records.append((chunk.id, content)) from app.core.config import settings vectors_written = 0 embedding_available = False try: embedding_provider = get_embedding_provider() if embedding_provider is not None: embedding_available = embedding_provider.is_available() except Exception as e: logger.warning( "memory embedding provider 不可用(sync): {} exc_type={}", e, type(e).__name__, ) embedding_provider = None # 向量写入在 SAVEPOINT 内;失败仅回滚本段,source/chunks 主体仍可由外层提交。 # LLM enrichment 在 commit 后由 schedule_memory_enrichment 入 memory_idle 队列。 try: with session.begin_nested(): if chunk_records and embedding_provider is not None: texts = [content for _, content in chunk_records] embeddings = embedding_provider.embed_texts_sync(texts) for (chunk_id, _), emb in zip( chunk_records, embeddings, strict=False ): if emb: vectors_written += 1 update_chunk_embedding_sync(session, chunk_id, emb) except Exception as e: logger.warning( "memory embedding 跳过(sync): {} exc_type={}", e, type(e).__name__, ) session.commit() enrichment_task_id: str | None = None if settings.memory_enrichment_enabled: try: from app.tasks.memory_enrichment_tasks import schedule_memory_enrichment enrichment_task_id = schedule_memory_enrichment( user_id, source.id, memoir_correlation_id=None ) except Exception as e: logger.warning( "memory enrichment 任务派发失败: {} exc_type={}", e, type(e).__name__, ) logger.info( "event=memory_ingest_done user_id={} conversation_id={} source_id={} " "chunks={} vectors_written={} embedding_available={} enrichment_enabled={} enrichment_task_id={} sync=1", user_id, conversation_id, source.id, len(chunk_records), vectors_written, embedding_available, settings.memory_enrichment_enabled, enrichment_task_id, ) return source.id def ingest_transcripts_batch_sync( session, user_id: str, items: list[tuple[str, str, dict | None]], ) -> list[str]: """ Phase1 批量:多段 transcript 在同一会话内建 source/chunks,并单次 embed_texts_sync(在适配器 batch 限制内)。 不 commit;不派发 enrichment(由调用方 commit 后 ``schedule_enrichment_for_sources``)。 items: (conversation_id, transcript, lineage_json) 返回与有效 items 顺序一致的 source_id 列表。 """ from app.core.dependencies import get_embedding_provider from app.features.memory.chunker import chunk_transcript from app.features.memory.repo import ( create_chunk_sync, create_source_sync, update_chunk_embedding_sync, ) source_ids: list[str] = [] all_chunk_records: list[tuple[str, str]] = [] for conversation_id, transcript, lineage_json in items: text = (transcript or "").strip() if not text: continue primary_mid = ( primary_user_message_id_from_lineage(lineage_json) if lineage_json else None ) source = create_source_sync( session, user_id=user_id, source_type="transcript", raw_text=text, conversation_id=conversation_id or None, lineage_json=lineage_json, primary_user_message_id=primary_mid, ) session.flush() chunks_text = chunk_transcript(text) for i, content in enumerate(chunks_text): chunk = create_chunk_sync( session, source_id=source.id, user_id=user_id, content=content, chunk_index=i, ) session.flush() all_chunk_records.append((chunk.id, content)) source_ids.append(source.id) embedding_provider = None try: embedding_provider = get_embedding_provider() except Exception as e: logger.warning( "memory embedding provider 不可用(batch sync): {} exc_type={}", e, type(e).__name__, ) vectors_written = 0 try: with session.begin_nested(): if all_chunk_records and embedding_provider is not None: texts = [content for _, content in all_chunk_records] embeddings = embedding_provider.embed_texts_sync(texts) for (chunk_id, _), emb in zip( all_chunk_records, embeddings, strict=False ): if emb: vectors_written += 1 update_chunk_embedding_sync(session, chunk_id, emb) except Exception as e: logger.warning( "memory embedding 跳过(batch sync): {} exc_type={}", e, type(e).__name__, ) emb_ok = ( embedding_provider.is_available() if embedding_provider is not None else False ) logger.info( "event=memory_ingest_batch_done user_id={} sources={} chunks={} " "vectors_written={} embedding_available={}", user_id, len(source_ids), len(all_chunk_records), vectors_written, emb_ok, ) return source_ids def schedule_enrichment_for_sources( user_id: str, source_ids: list[str], *, memoir_correlation_id: str | None = None, ) -> None: """After successful ingest commit, enqueue LLM enrichment for each source (memory_idle queue).""" MemoryEnrichmentScheduler().schedule_many( user_id, source_ids, memoir_correlation_id=memoir_correlation_id, )