配置 SSOT(TOML + .env) 统一错误契约 Auth 与事务边界 Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client 可观测性(OpenTelemetry + LGTM)
259 lines
8.8 KiB
Python
259 lines
8.8 KiB
Python
"""
|
||
Transcript ingest 之后的记忆富化:单次 LLM 调用产出会话摘要 + 结构化事实。
|
||
|
||
由 async MemoryService 调用;失败仅打日志,不阻断主流程。
|
||
不再维护 ingest 路径上的 rolling 摘要与 timeline 物化。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from typing import Any
|
||
|
||
from sqlalchemy import select
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.core.llm_gateway import LlmGateway, LlmUseCase
|
||
from app.core.logging import get_logger
|
||
from app.features.memory.enrichment_pipeline import (
|
||
dedupe_key,
|
||
normalize_object_json,
|
||
normalize_subject,
|
||
)
|
||
from app.features.memory.llm_schemas import (
|
||
EnrichmentPayload,
|
||
enrichment_payload_to_fact_dicts,
|
||
)
|
||
from app.features.memory.models import MemoryChunk, MemorySource
|
||
from app.features.memory.repo import (
|
||
create_memory_fact,
|
||
create_memory_summary,
|
||
set_source_enrichment_status,
|
||
)
|
||
from app.features.user.models import User
|
||
from app.features.memory.constants import memory
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
|
||
def _short_error(exc: BaseException | str, *, max_chars: int = 500) -> str:
|
||
text = str(exc)
|
||
if len(text) > max_chars:
|
||
return text[: max_chars - 3] + "..."
|
||
return text
|
||
|
||
|
||
async def _commit_if_available(db: AsyncSession) -> None:
|
||
commit = getattr(db, "commit", None)
|
||
if commit is not None:
|
||
await commit()
|
||
|
||
|
||
def _lineage_snapshot_from_source(source: MemorySource | None) -> dict | None:
|
||
raw = getattr(source, "lineage_json", None) if source else None
|
||
return raw if isinstance(raw, dict) and raw else None
|
||
|
||
|
||
def _resolve_gateway_llm() -> Any | None:
    """Ask the LLM gateway for the fast ``memory.enrichment`` LangChain LLM.

    Returns:
        Whatever ``LlmGateway().langchain_llm_for`` returns, or ``None`` when
        building the gateway / use case or resolving the LLM raises. The
        failure is logged as a warning and never propagated.
    """
    try:
        gateway = LlmGateway()
        use_case = LlmUseCase("memory.enrichment", fast=True)
        resolved = gateway.langchain_llm_for(use_case)
    except Exception as exc:
        logger.warning("memory enrichment 无法获取 LLM: {}", exc)
        return None
    return resolved
|
||
|
||
|
||
def _max_enrichment_chars() -> int:
    """Return the configured character budget for the enrichment transcript.

    The value is read from ``memory.enrichment_max_chars`` on every call, so
    a change to that attribute is picked up without re-importing this module.
    """
    return memory.enrichment_max_chars
|
||
|
||
|
||
def _enrichment_prompt(numbered_blocks: str, narrator_label: str) -> str:
    """Build the single prompt that requests both a summary and facts as JSON.

    Args:
        numbered_blocks: Transcript text; it is stripped and then cut to the
            length returned by ``_max_enrichment_chars()``.
        narrator_label: Name interpolated into the rule that tells the model
            which subject to use for the narrator.

    Returns:
        The instruction text followed by the (possibly truncated) transcript.
    """
    stripped = numbered_blocks.strip()
    transcript = stripped[: _max_enrichment_chars()]

    # Role description and the two tasks the model must perform.
    intro = (
        "你是回忆录记忆分析助手。用户正在口述人生回忆,所有内容默认是**过去发生的事**,"
        "而非当前或未来计划(除非原文明确说「现在」「打算」「准备将要」等)。\n\n"
        "请从下列口述内容中完成两件事:\n"
        "1) 用 2~8 句中文概括要点(不编造、不评价)\n"
        "2) 抽取结构化事实列表(见下方规则)\n\n"
    )

    # Fact extraction rules; only the narrator line depends on the arguments.
    rules = (
        "## 事实抽取规则\n"
        "1. subject 必须用明确的人名或固定称谓:\n"
        f" - 叙述者本人统一用「{narrator_label}」\n"
        " - 其他人用全名或稳定专名(如「王伟」),禁止用「他」「她」「我」「我们大伙」等代词作 subject;"
        "若代词在上下文中可唯一解析为某人,则 subject 写该人姓名/专名\n"
        "2. 事件、职务变动、地点迁移等一律按**过去回忆**理解;travel/调动/命令类表述勿写成「即将要做」"
        "除非原文明确为未来时态\n"
        "3. 若可推断大约年代或人生阶段,将 approximate_era 写入 object_json(与 value 等字段并存),"
        '例如 "1990年代"、"2001年"、"退休后"、"30岁前后"\n'
        "4. fact_type: person|event|relation|place|milestone\n"
        "5. predicate:简短中文谓语(如「出生地」「担任职务」「调往」)\n"
        "6. object_json:字符串或对象;可含 value、approximate_era 等\n"
        "7. confidence 0..1;source_chunk_id 必须等于某段 [chunk_id=...] 中的 id\n\n"
    )

    # Output contract: a bare JSON object with "summary" and "facts".
    output_contract = (
        '只输出 JSON:{"summary":"...","facts":[...]},无事实则 "facts":[]。\n\n'
    )

    return "".join((intro, rules, output_contract, transcript))
|
||
|
||
|
||
async def _run_enrichment_llm_async(
|
||
llm: Any, numbered: str, narrator_label: str
|
||
) -> EnrichmentPayload | None:
|
||
if not llm or not (numbered or "").strip():
|
||
return None
|
||
prompt = _enrichment_prompt(numbered, narrator_label)
|
||
return await LlmGateway().json_object(
|
||
prompt,
|
||
EnrichmentPayload,
|
||
use_case=LlmUseCase("memory.enrichment", fast=True, max_tokens=8192),
|
||
)
|
||
|
||
|
||
async def _record_enrichment_status(
    db: AsyncSession,
    *,
    source_id: str,
    user_id: str,
    status: str,
    error: str | None,
) -> None:
    """Store the enrichment status for a source and commit it right away."""
    await set_source_enrichment_status(
        db,
        source_id=source_id,
        user_id=user_id,
        status=status,
        error=error,
    )
    await _commit_if_available(db)


async def _rollback_if_available(db: AsyncSession) -> None:
    """Await ``db.rollback()`` when the object exposes a ``rollback`` attribute."""
    rollback = getattr(db, "rollback", None)
    if rollback is not None:
        await rollback()


async def _persist_enrichment_payload(
    db: AsyncSession,
    *,
    user_id: str,
    payload: EnrichmentPayload,
    chunk_ids: list,
    narrator_name: str | None,
    lineage_snapshot: dict | None,
) -> tuple[bool, int]:
    """Write the session summary and the de-duplicated facts of one payload.

    Returns:
        ``(summary_written, fact_count)`` where ``fact_count`` is the number
        of distinct dedupe keys, i.e. the number of facts created.
    """
    summary_text = str(payload.summary or "").strip()
    if summary_text:
        await create_memory_summary(
            db,
            user_id=user_id,
            summary_type="session",
            content=summary_text,
            source_chunk_ids=chunk_ids,
        )

    seen: set[tuple] = set()
    for fact in enrichment_payload_to_fact_dicts(payload):
        key = dedupe_key(fact, narrator_name=narrator_name)
        if key in seen:
            continue
        seen.add(key)

        scid = fact.get("source_chunk_id")
        if scid and scid not in chunk_ids:
            # The model cited a chunk id that does not belong to this source;
            # attribute the fact to the first chunk instead.
            scid = chunk_ids[0] if chunk_ids else None

        await create_memory_fact(
            db,
            user_id=user_id,
            fact_type=fact.get("fact_type") or "event",
            subject=normalize_subject(fact.get("subject"), narrator_name),
            predicate=fact.get("predicate"),
            object_json=normalize_object_json(fact.get("object_json")),
            # NOTE(review): `or` also maps an explicit confidence of 0 to
            # 0.75 — confirm against the payload schema that this is intended.
            confidence=float(fact.get("confidence") or 0.75),
            source_chunk_id=scid,
            status="confirmed",
            lineage_json=lineage_snapshot,
        )

    return bool(summary_text), len(seen)


async def enrich_memory_after_ingest_async(
    db: AsyncSession,
    user_id: str,
    source_id: str,
    llm: Any | None = None,
    *,
    raise_on_failure: bool = False,
) -> dict:
    """Enrich one ingested source with a session summary and structured facts.

    The source's enrichment status is recorded and committed at each stage:
    ``skipped`` (feature disabled or no chunks), ``running``, then ``success``
    or ``failed``.

    Args:
        db: Async session used for all reads and writes.
        user_id: Owner of the source; also used to look up the narrator's
            nickname.
        source_id: Source whose chunks are enriched.
        llm: Optional LLM object; when ``None`` one is resolved through the
            gateway. It only gates whether enrichment can run.
        raise_on_failure: When true, failures are raised after the ``failed``
            status has been recorded instead of being returned.

    Returns:
        One of:
        ``{"status": "skipped", "reason": "disabled" | "no_chunks"}``,
        ``{"status": "failed", "error": <message>}``, or
        ``{"status": "success", "source_id", "chunks", "facts", "summary"}``.

    Raises:
        RuntimeError: ``"llm_unavailable"`` when no LLM could be obtained and
            ``raise_on_failure`` is true.
        Exception: The original LLM or persistence error, re-raised when
            ``raise_on_failure`` is true.
    """
    if not memory.enrichment_enabled:
        await _record_enrichment_status(
            db,
            source_id=source_id,
            user_id=user_id,
            status="skipped",
            error="disabled",
        )
        return {"status": "skipped", "reason": "disabled"}

    if llm is None:
        llm = _resolve_gateway_llm()
    if not llm:
        err = "llm_unavailable"
        await _record_enrichment_status(
            db,
            source_id=source_id,
            user_id=user_id,
            status="failed",
            error=err,
        )
        if raise_on_failure:
            raise RuntimeError(err)
        return {"status": "failed", "error": err}

    await _record_enrichment_status(
        db,
        source_id=source_id,
        user_id=user_id,
        status="running",
        error=None,
    )

    # The narrator's nickname, when present, replaces the generic label in
    # the prompt and is used when normalizing/deduplicating fact subjects.
    narrator_name: str | None = None
    u_row = await db.get(User, user_id)
    if u_row and (u_row.nickname or "").strip():
        narrator_name = (u_row.nickname or "").strip()

    stmt = (
        select(MemoryChunk)
        .where(MemoryChunk.source_id == source_id)
        .order_by(MemoryChunk.chunk_index.asc())
    )
    result = await db.execute(stmt)
    chunks = list(result.unique().scalars().all())
    if not chunks:
        await _record_enrichment_status(
            db,
            source_id=source_id,
            user_id=user_id,
            status="skipped",
            error="no_chunks",
        )
        return {"status": "skipped", "reason": "no_chunks"}

    src_row = await db.get(MemorySource, source_id)
    lineage_snapshot = _lineage_snapshot_from_source(src_row)

    # Capture plain values now; the ORM rows are not touched again after the
    # LLM call, so a later commit or rollback cannot invalidate them.
    chunk_count = len(chunks)
    chunk_ids = [c.id for c in chunks]
    numbered = "\n\n".join(f"[chunk_id={c.id}]\n{c.content}" for c in chunks)
    narrator_label = narrator_name or "叙述者"

    try:
        payload = await _run_enrichment_llm_async(llm, numbered, narrator_label)
        if payload is None:
            raise ValueError("empty_enrichment_payload")
    except Exception as e:
        err = _short_error(e)
        await _record_enrichment_status(
            db,
            source_id=source_id,
            user_id=user_id,
            status="failed",
            error=err,
        )
        logger.warning(
            "event=memory_enrichment_llm_failed user_id={} source_id={} exc_type={} exc={}",
            user_id,
            source_id,
            type(e).__name__,
            err,
        )
        if raise_on_failure:
            raise
        return {"status": "failed", "error": err}

    try:
        has_summary, fact_count = await _persist_enrichment_payload(
            db,
            user_id=user_id,
            payload=payload,
            chunk_ids=chunk_ids,
            narrator_name=narrator_name,
            lineage_snapshot=lineage_snapshot,
        )
        await _record_enrichment_status(
            db,
            source_id=source_id,
            user_id=user_id,
            status="success",
            error=None,
        )
    except Exception as e:
        # Without this branch a write/commit error would leave the source
        # stuck in "running". Discard the partial writes, then record the
        # failure in a clean transaction.
        err = _short_error(e)
        await _rollback_if_available(db)
        await _record_enrichment_status(
            db,
            source_id=source_id,
            user_id=user_id,
            status="failed",
            error=err,
        )
        logger.warning(
            "event=memory_enrichment_persist_failed user_id={} source_id={} exc_type={} exc={}",
            user_id,
            source_id,
            type(e).__name__,
            err,
        )
        if raise_on_failure:
            raise
        return {"status": "failed", "error": err}

    return {
        "status": "success",
        "source_id": source_id,
        "chunks": chunk_count,
        "facts": fact_count,
        "summary": has_summary,
    }
|