""" Transcript ingest 之后的记忆富化:单次 LLM 调用产出会话摘要 + 结构化事实。 由 async MemoryService 调用;失败仅打日志,不阻断主流程。 不再维护 ingest 路径上的 rolling 摘要与 timeline 物化。 """ from __future__ import annotations from typing import Any from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.langchain_llm import ainvoke_json_object from app.core.llm_gateway import LlmGateway, LlmUseCase from app.core.logging import get_logger from app.features.memory.enrichment_pipeline import ( dedupe_key, normalize_object_json, normalize_subject, ) from app.features.memory.llm_schemas import ( EnrichmentPayload, enrichment_payload_to_fact_dicts, parse_json_payload, ) from app.features.memory.models import MemoryChunk, MemorySource from app.features.memory.repo import ( create_memory_fact, create_memory_summary, ) from app.features.user.models import User logger = get_logger(__name__) def _lineage_snapshot_from_source(source: MemorySource | None) -> dict | None: raw = getattr(source, "lineage_json", None) if source else None return raw if isinstance(raw, dict) and raw else None def _resolve_llm() -> Any | None: try: return LlmGateway().langchain_llm_for( LlmUseCase("memory.enrichment", fast=True) ) except Exception as e: logger.warning("memory enrichment 无法获取 LLM: {}", e) return None def _max_enrichment_chars() -> int: from app.core.config import settings return settings.memory_enrichment_max_chars def _enrichment_prompt(numbered_blocks: str, narrator_label: str) -> str: """合并会话摘要说明与事实抽取规则,供单次 JSON 输出。""" text = numbered_blocks.strip()[: _max_enrichment_chars()] return ( "你是回忆录记忆分析助手。用户正在口述人生回忆,所有内容默认是**过去发生的事**," "而非当前或未来计划(除非原文明确说「现在」「打算」「准备将要」等)。\n\n" "请从下列口述内容中完成两件事:\n" "1) 用 2~8 句中文概括要点(不编造、不评价)\n" "2) 抽取结构化事实列表(见下方规则)\n\n" "## 事实抽取规则\n" "1. subject 必须用明确的人名或固定称谓:\n" f" - 叙述者本人统一用「{narrator_label}」\n" " - 其他人用全名或稳定专名(如「王伟」),禁止用「他」「她」「我」「我们大伙」等代词作 subject;" "若代词在上下文中可唯一解析为某人,则 subject 写该人姓名/专名\n" "2. 事件、职务变动、地点迁移等一律按**过去回忆**理解;travel/调动/命令类表述勿写成「即将要做」" "除非原文明确为未来时态\n" "3. 若可推断大约年代或人生阶段,将 approximate_era 写入 object_json(与 value 等字段并存)," '例如 "1990年代"、"2001年"、"退休后"、"30岁前后"\n' "4. fact_type: person|event|relation|place|milestone\n" "5. predicate:简短中文谓语(如「出生地」「担任职务」「调往」)\n" "6. object_json:字符串或对象;可含 value、approximate_era 等\n" "7. confidence 0..1;source_chunk_id 必须等于某段 [chunk_id=...] 中的 id\n\n" '只输出 JSON:{"summary":"...","facts":[...]},无事实则 "facts":[]。\n\n' f"{text}" ) async def _run_enrichment_llm_async( llm: Any, numbered: str, narrator_label: str ) -> EnrichmentPayload | None: if not llm or not (numbered or "").strip(): return None prompt = _enrichment_prompt(numbered, narrator_label) try: raw = await ainvoke_json_object( llm, prompt, max_tokens=8192, agent="memory.enrichment", ) return parse_json_payload(raw, EnrichmentPayload) except (TypeError, ValueError) as e: logger.warning("enrichment LLM async 解析失败: {}", e) return None async def enrich_memory_after_ingest_async( db: AsyncSession, user_id: str, source_id: str, llm: Any | None = None, ) -> None: from app.core.config import settings if not settings.memory_enrichment_enabled: return if llm is None: llm = _resolve_llm() if not llm: return narrator_name: str | None = None u_row = await db.get(User, user_id) if u_row and (u_row.nickname or "").strip(): narrator_name = (u_row.nickname or "").strip() stmt = ( select(MemoryChunk) .where(MemoryChunk.source_id == source_id) .order_by(MemoryChunk.chunk_index.asc()) ) result = await db.execute(stmt) chunks = list(result.unique().scalars().all()) if not chunks: return src_row = await db.get(MemorySource, source_id) lineage_snapshot = _lineage_snapshot_from_source(src_row) chunk_ids = [c.id for c in chunks] chunk_texts = [c.content for c in chunks] numbered = "\n\n".join( f"[chunk_id={cid}]\n{txt}" for cid, txt in zip(chunk_ids, chunk_texts, strict=False) ) narrator_label = (narrator_name or "").strip() or "叙述者" payload = await _run_enrichment_llm_async(llm, numbered, narrator_label) if payload is None: return session_summary_text = str(payload.summary or "").strip() if session_summary_text: await create_memory_summary( db, user_id=user_id, summary_type="session", content=session_summary_text, source_chunk_ids=chunk_ids, ) raw_facts = enrichment_payload_to_fact_dicts(payload) seen: set[tuple] = set() for f in raw_facts: key = dedupe_key(f, narrator_name=narrator_name) if key in seen: continue seen.add(key) scid = f.get("source_chunk_id") if scid and scid not in chunk_ids: scid = chunk_ids[0] if chunk_ids else None await create_memory_fact( db, user_id=user_id, fact_type=f.get("fact_type") or "event", subject=normalize_subject(f.get("subject"), narrator_name), predicate=f.get("predicate"), object_json=normalize_object_json(f.get("object_json")), confidence=float(f.get("confidence") or 0.75), source_chunk_id=scid, status="confirmed", lineage_json=lineage_snapshot, )