Files
life-echo/api/app/features/memory/enrichment.py
Kevin ac49bc7f23 feat(eval): memoir A/B chapter judging and eval-web parity with dialogue
- Judge baseline excerpt and library chapter separately; build_memoir_compare_summary for gate, nine-dim and leaf deltas.

- Memoir SSE chapter payload: baseline_judge, compare_summary, baseline_judge_error.

- MemoirJudgeOutput: loose score coercion and post-validate clamp; memoir judge prompt caps from settings.

- app-eval-web: two-column MemoirScoreCard layout, MemoirCompareSummary, chapter blocks and CSS.

- Add memoir_compare_summary, log_events, celery_log_context, memoir_pipeline_progress; tests and migration 0014.

- Misc: memory/evidence and enrichment paths, task/orchestrator updates, internal-eval docs, env examples.
2026-04-10 10:25:15 +08:00

267 lines
9.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
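Memory enrichment after transcript ingest: a single LLM call produces a session summary plus structured facts.
Invoked from Celery (sync) and from MemoryService.ingest (async); a failed call is only logged and does not block the main flow.
Rolling summaries and timeline materialization are no longer maintained on the ingest path.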
"""
from __future__ import annotations
from typing import Any
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session
from app.core.langchain_llm import ainvoke_json_object, invoke_json_object
from app.core.logging import get_logger
from app.features.memory.enrichment_pipeline import (
dedupe_key,
normalize_object_json,
normalize_subject,
)
from app.features.memory.llm_schemas import (
EnrichmentPayload,
enrichment_payload_to_fact_dicts,
parse_json_payload,
)
from app.features.memory.models import MemoryChunk, MemorySource
from app.features.memory.repo import (
create_memory_fact,
create_memory_fact_sync,
create_memory_summary,
create_memory_summary_sync,
list_chunks_for_source_sync,
)
from app.features.user.models import User
logger = get_logger(__name__)
def _lineage_snapshot_from_source(source: MemorySource | None) -> dict | None:
    """Return the source's ``lineage_json`` when it is a non-empty dict.

    Yields ``None`` when ``source`` is falsy, when it has no
    ``lineage_json`` attribute, or when that attribute is not a dict or is
    an empty dict.
    """
    if not source:
        return None
    snapshot = getattr(source, "lineage_json", None)
    if isinstance(snapshot, dict) and snapshot:
        return snapshot
    return None
def _resolve_llm_sync() -> Any | None:
    """Fetch the ``langchain_llm`` of the provider from ``get_llm_provider_fast()``.

    The import is done lazily inside the function. Any exception raised
    while importing or resolving the provider is logged as a warning and
    turned into a ``None`` result, so callers can simply skip enrichment.
    """
    try:
        from app.core.dependencies import get_llm_provider_fast

        provider = get_llm_provider_fast()
        return provider.langchain_llm
    except Exception as exc:
        # Best effort by design: a missing LLM must not propagate from here.
        logger.warning("memory enrichment 无法获取 LLM: {}", exc)
    return None
def _max_enrichment_chars() -> int:
    """Return ``settings.memory_enrichment_max_chars`` (settings imported lazily)."""
    from app.core.config import settings as app_settings

    char_budget = app_settings.memory_enrichment_max_chars
    return char_budget
def _enrichment_prompt(numbered_blocks: str, narrator_label: str) -> str:
    """Build the single-call prompt asking for a session summary plus facts.

    The transcript text is stripped, cut to the character budget returned
    by ``_max_enrichment_chars()``, and appended after the instructions.
    ``narrator_label`` is interpolated into the rule that tells the model
    which subject to use for the narrator. The instructions ask the model
    to answer with one JSON object holding ``summary`` and ``facts``.
    """
    stripped = numbered_blocks.strip()
    budget = _max_enrichment_chars()
    transcript = stripped[:budget]

    # NOTE(review): some literals below look like they lost punctuation
    # (e.g. the unclosed 「 after narrator_label, "28 句") — confirm against
    # the repository copy before editing the wording.
    preamble = (
        "你是回忆录记忆分析助手。用户正在口述人生回忆,所有内容默认是**过去发生的事**",
        "而非当前或未来计划(除非原文明确说「现在」「打算」「准备将要」等)。\n\n",
        "请从下列口述内容中完成两件事:\n",
        "1) 用 28 句中文概括要点(不编造、不评价)\n",
        "2) 抽取结构化事实列表(见下方规则)\n\n",
    )
    fact_rules = (
        "## 事实抽取规则\n",
        "1. subject 必须用明确的人名或固定称谓:\n",
        f" - 叙述者本人统一用「{narrator_label}\n",
        " - 其他人用全名或稳定专名(如「王伟」),禁止用「他」「她」「我」「我们大伙」等代词作 subject",
        "若代词在上下文中可唯一解析为某人,则 subject 写该人姓名/专名\n",
        "2. 事件、职务变动、地点迁移等一律按**过去回忆**理解travel/调动/命令类表述勿写成「即将要做」",
        "除非原文明确为未来时态\n",
        "3. 若可推断大约年代或人生阶段,将 approximate_era 写入 object_json与 value 等字段并存),",
        '例如 "1990年代""2001年""退休后""30岁前后"\n',
        "4. fact_type: person|event|relation|place|milestone\n",
        "5. predicate简短中文谓语如「出生地」「担任职务」「调往」\n",
        "6. object_json字符串或对象可含 value、approximate_era 等\n",
        "7. confidence 0..1source_chunk_id 必须等于某段 [chunk_id=...] 中的 id\n\n",
    )
    output_spec = (
        '只输出 JSON{"summary":"...","facts":[...]},无事实则 "facts":[]。\n\n',
    )

    return "".join((*preamble, *fact_rules, *output_spec, transcript))
def _run_enrichment_llm_sync(
    llm: Any, numbered: str, narrator_label: str
) -> EnrichmentPayload | None:
    """Run the enrichment prompt through ``invoke_json_object`` and parse it.

    Returns ``None`` without calling the LLM when ``llm`` is falsy or
    ``numbered`` is empty/whitespace. A ``TypeError`` or ``ValueError``
    raised by the call or by ``parse_json_payload`` is logged as a warning
    and also results in ``None``; other exceptions propagate.
    """
    if not llm:
        return None
    if not (numbered or "").strip():
        return None

    # Built outside the try so prompt-construction errors are not swallowed.
    request_text = _enrichment_prompt(numbered, narrator_label)
    try:
        response = invoke_json_object(
            llm,
            request_text,
            max_tokens=8192,
            agent="memory.enrichment_sync",
        )
        return parse_json_payload(response, EnrichmentPayload)
    except (TypeError, ValueError) as exc:
        logger.warning("enrichment LLM sync 解析失败: {}", exc)
    return None
async def _run_enrichment_llm_async(
    llm: Any, numbered: str, narrator_label: str
) -> EnrichmentPayload | None:
    """Await ``ainvoke_json_object`` with the enrichment prompt and parse it.

    Returns ``None`` without calling the LLM when ``llm`` is falsy or
    ``numbered`` is empty/whitespace. A ``TypeError`` or ``ValueError``
    raised by the call or by ``parse_json_payload`` is logged as a warning
    and also results in ``None``; other exceptions propagate.
    """
    if not llm:
        return None
    if not (numbered or "").strip():
        return None

    # Built outside the try so prompt-construction errors are not swallowed.
    request_text = _enrichment_prompt(numbered, narrator_label)
    try:
        response = await ainvoke_json_object(
            llm,
            request_text,
            max_tokens=8192,
            agent="memory.enrichment_async",
        )
        return parse_json_payload(response, EnrichmentPayload)
    except (TypeError, ValueError) as exc:
        logger.warning("enrichment LLM async 解析失败: {}", exc)
    return None
def enrich_memory_after_ingest_sync(
    session: Session,
    user_id: str,
    source_id: str,
    llm: Any | None = None,
) -> None:
    """Create a session summary and facts for one ingested source (sync).

    Returns without writing anything when
    ``settings.memory_enrichment_enabled`` is off, when no LLM is available,
    when the source has no chunks, or when the LLM step yields no payload.

    Args:
        session: Synchronous SQLAlchemy session used for every read and write.
        user_id: Owner of the created rows; the user's non-blank nickname,
            if any, is used as the narrator name.
        source_id: Memory source whose chunks are sent to the LLM.
        llm: LLM object handed to the enrichment call; resolved through
            ``_resolve_llm_sync()`` when omitted.
    """
    from app.core.config import settings

    if not settings.memory_enrichment_enabled:
        return
    if llm is None:
        llm = _resolve_llm_sync()
    if not llm:
        return

    u_row = session.get(User, user_id)
    nickname = (u_row.nickname or "").strip() if u_row else ""
    narrator_name: str | None = nickname or None

    chunks = list_chunks_for_source_sync(session, source_id)
    if not chunks:
        return

    src_row = session.get(MemorySource, source_id)
    lineage_snapshot = _lineage_snapshot_from_source(src_row)

    chunk_ids = [c.id for c in chunks]
    # Each chunk is tagged with its id so the model can cite it back.
    numbered = "\n\n".join(f"[chunk_id={c.id}]\n{c.content}" for c in chunks)
    narrator_label = narrator_name or "叙述者"

    payload = _run_enrichment_llm_sync(llm, numbered, narrator_label)
    if payload is None:
        return

    session_summary_text = str(payload.summary or "").strip()
    if session_summary_text:
        create_memory_summary_sync(
            session,
            user_id=user_id,
            summary_type="session",
            content=session_summary_text,
            source_chunk_ids=chunk_ids,
        )

    default_confidence = 0.75
    seen: set[tuple] = set()
    for fact in enrichment_payload_to_fact_dicts(payload):
        key = dedupe_key(fact, narrator_name=narrator_name)
        if key in seen:
            continue
        seen.add(key)

        scid = fact.get("source_chunk_id")
        if scid and scid not in chunk_ids:
            # The cited id is not one of this source's chunks: fall back to
            # the first chunk (chunk_ids is never empty at this point).
            scid = chunk_ids[0]

        # Only a missing/empty confidence gets the default. The previous
        # `value or 0.75` also replaced a legitimate 0 / 0.0 with 0.75.
        raw_confidence = fact.get("confidence")
        if raw_confidence is None or raw_confidence == "":
            confidence = default_confidence
        else:
            confidence = float(raw_confidence)

        create_memory_fact_sync(
            session,
            user_id=user_id,
            fact_type=fact.get("fact_type") or "event",
            subject=normalize_subject(fact.get("subject"), narrator_name),
            predicate=fact.get("predicate"),
            object_json=normalize_object_json(fact.get("object_json")),
            confidence=confidence,
            source_chunk_id=scid,
            status="confirmed",
            lineage_json=lineage_snapshot,
        )
async def enrich_memory_after_ingest_async(
    db: AsyncSession,
    user_id: str,
    source_id: str,
    llm: Any | None = None,
) -> None:
    """Create a session summary and facts for one ingested source (async).

    Returns without writing anything when
    ``settings.memory_enrichment_enabled`` is off, when no LLM is available,
    when the source has no chunks, or when the LLM step yields no payload.

    Args:
        db: Async SQLAlchemy session used for every read and write.
        user_id: Owner of the created rows; the user's non-blank nickname,
            if any, is used as the narrator name.
        source_id: Memory source whose chunks (ordered by ``chunk_index``)
            are sent to the LLM.
        llm: LLM object handed to the enrichment call; resolved through
            ``_resolve_llm_sync()`` when omitted.
    """
    from app.core.config import settings

    if not settings.memory_enrichment_enabled:
        return
    if llm is None:
        llm = _resolve_llm_sync()
    if not llm:
        return

    u_row = await db.get(User, user_id)
    nickname = (u_row.nickname or "").strip() if u_row else ""
    narrator_name: str | None = nickname or None

    stmt = (
        select(MemoryChunk)
        .where(MemoryChunk.source_id == source_id)
        .order_by(MemoryChunk.chunk_index.asc())
    )
    result = await db.execute(stmt)
    chunks = list(result.unique().scalars().all())
    if not chunks:
        return

    src_row = await db.get(MemorySource, source_id)
    lineage_snapshot = _lineage_snapshot_from_source(src_row)

    chunk_ids = [c.id for c in chunks]
    # Each chunk is tagged with its id so the model can cite it back.
    numbered = "\n\n".join(f"[chunk_id={c.id}]\n{c.content}" for c in chunks)
    narrator_label = narrator_name or "叙述者"

    payload = await _run_enrichment_llm_async(llm, numbered, narrator_label)
    if payload is None:
        return

    session_summary_text = str(payload.summary or "").strip()
    if session_summary_text:
        await create_memory_summary(
            db,
            user_id=user_id,
            summary_type="session",
            content=session_summary_text,
            source_chunk_ids=chunk_ids,
        )

    default_confidence = 0.75
    seen: set[tuple] = set()
    for fact in enrichment_payload_to_fact_dicts(payload):
        key = dedupe_key(fact, narrator_name=narrator_name)
        if key in seen:
            continue
        seen.add(key)

        scid = fact.get("source_chunk_id")
        if scid and scid not in chunk_ids:
            # The cited id is not one of this source's chunks: fall back to
            # the first chunk (chunk_ids is never empty at this point).
            scid = chunk_ids[0]

        # Only a missing/empty confidence gets the default. The previous
        # `value or 0.75` also replaced a legitimate 0 / 0.0 with 0.75.
        raw_confidence = fact.get("confidence")
        if raw_confidence is None or raw_confidence == "":
            confidence = default_confidence
        else:
            confidence = float(raw_confidence)

        await create_memory_fact(
            db,
            user_id=user_id,
            fact_type=fact.get("fact_type") or "event",
            subject=normalize_subject(fact.get("subject"), narrator_name),
            predicate=fact.get("predicate"),
            object_json=normalize_object_json(fact.get("object_json")),
            confidence=confidence,
            source_chunk_id=scid,
            status="confirmed",
            lineage_json=lineage_snapshot,
        )