Files
life-echo/api/app/features/memoir/chapter_evidence_snapshot.py
Kevin 71fbd39e32 feat(api)!: memory single chain — async MemoryService, strict eval closure
Route all memory ingest/retrieve/enrichment/compaction through async MemoryService.
Remove legacy sync memory implementations (ingest/retrieve/compaction); Celery and
memoir Phase2 call asyncio.run into MemoryService-backed helpers.

Memoir Phase1 batch ingest uses MemoryService.ingest_transcripts_batch; drop chapters.
evidence_bundle_json mirror (Alembic 0015). Evaluation uses snapshot/link-only bundles;
raise EvidenceClosureMissing instead of partial/fallback lineage tiers.

Split memoir state into NarrativeCoverageState and InterviewControlState; delete the
_interview_meta_store adapter layer. Remove rolling-query and recent-fact fallback
settings from config and evidence assembly.

Update judges, docs, tests, and PlaygroundPage alignment.

Made-with: Cursor
2026-04-30 14:11:50 +08:00

293 lines
9.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""章节证据闭包:快照行 + chapter_evidence_links 是评测唯一证据来源。"""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from sqlalchemy import delete, func, select
from sqlalchemy.orm import Session, joinedload
from app.core.logging import get_logger
from app.features.conversation.lineage_schemas import aggregate_lineage_from_segments
from app.features.conversation.models import Conversation, Segment
from app.features.memoir.models import (
Chapter,
ChapterEvidenceLink,
ChapterEvidenceSnapshot,
)
from app.features.story.models import StoryEvidenceLink
# Schema version stamped into every closure payload and snapshot row built here.
EVIDENCE_SNAPSHOT_SCHEMA_VERSION = 1
# Module-level logger, named after this module.
logger = get_logger(__name__)
def _normalize_segment_ids(raw: object) -> list[str]:
if not raw or not isinstance(raw, list):
return []
out: list[str] = []
for x in raw:
s = str(x).strip()
if s:
out.append(s)
seen: set[str] = set()
deduped: list[str] = []
for s in out:
if s not in seen:
seen.add(s)
deduped.append(s)
return deduped
def _story_ids_ordered(chapter: Chapter) -> list[str]:
links = sorted(
list(getattr(chapter, "story_links", None) or []),
key=lambda lnk: getattr(lnk, "order_index", 0),
)
out: list[str] = []
for ln in links:
sid = getattr(ln, "story_id", None)
if sid:
out.append(str(sid))
return out
def _dedupe_ids(raw: list[str]) -> list[str]:
seen: set[str] = set()
out: list[str] = []
for item in raw:
sid = str(item).strip()
if sid and sid not in seen:
seen.add(sid)
out.append(sid)
return out
def _story_evidence_ids_for_chapter(
session: Session, story_ids: list[str]
) -> tuple[list[str], list[str], list[str], list[str]]:
"""Collect strict story-bound evidence ids for the chapter snapshot."""
if not story_ids:
return [], [], [], []
stmt = select(
StoryEvidenceLink.evidence_type,
StoryEvidenceLink.evidence_id,
).where(StoryEvidenceLink.story_id.in_(story_ids))
chunk_ids: list[str] = []
fact_ids: list[str] = []
timeline_ids: list[str] = []
summary_ids: list[str] = []
for evidence_type, evidence_id in session.execute(stmt).all():
et = str(evidence_type or "").strip()
eid = str(evidence_id or "").strip()
if not eid:
continue
if et == "chunk":
chunk_ids.append(eid)
elif et == "fact":
fact_ids.append(eid)
elif et == "timeline_event":
timeline_ids.append(eid)
elif et == "summary":
summary_ids.append(eid)
return (
_dedupe_ids(chunk_ids),
_dedupe_ids(fact_ids),
_dedupe_ids(timeline_ids),
_dedupe_ids(summary_ids),
)
def build_chapter_evidence_closure_payload_sync(
    session: Session, chapter: Chapter
) -> dict:
    """Compute the evidence-closure payload for ``chapter``.

    This is the single entry point for closure computation. Transcript
    evidence comes from the segments bound to the chapter
    (``chapter.source_segments``); memory evidence comes only from
    ``StoryEvidenceLink`` rows of the chapter's stories. There is no live
    memory closure fallback.

    Args:
        session: Synchronous SQLAlchemy session used for the lookups.
        chapter: Chapter to build the payload for.

    Returns:
        A dict with ``schema_version``, ``captured_at`` (UTC ISO-8601
        string), ``chapter_id``, ``user_id``, ``segment_ids``,
        ``conversation_ids``, ``story_ids``, ``memory_chunk_ids``,
        ``memory_fact_ids``, ``timeline_event_ids``, ``summary_ids``,
        ``notes`` and ``message_lineage_json`` (``None`` when no segment
        could be resolved).
    """
    uid = str(chapter.user_id)
    segment_ids = _normalize_segment_ids(chapter.source_segments)
    story_ids = _story_ids_ordered(chapter)
    chunk_ids, fact_ids, tl_ids, sum_ids = _story_evidence_ids_for_chapter(
        session, story_ids
    )

    segs: list = []
    conv_ids: list[str] = []
    if not segment_ids:
        notes = [
            "no_source_segments",
            "snapshot_materialized",
        ]
    else:
        # Only segments of this user's non-deleted conversations are resolved.
        stmt = (
            select(Segment)
            .join(Conversation, Segment.conversation_id == Conversation.id)
            .where(
                Segment.id.in_(segment_ids),
                Conversation.user_id == uid,
                Conversation.deleted_at.is_(None),
            )
        )
        segs = list(session.execute(stmt).scalars().all())
        conv_ids = sorted({str(s.conversation_id) for s in segs if s.conversation_id})
        notes = ["snapshot_materialized"]
        if len(segs) < len(segment_ids):
            notes.append("some_segment_ids_unresolved_or_foreign_user")

    message_lineage_json = None
    if segs:
        # Restore the chapter's own segment order. Ids missing from the map
        # sort after every known position; using len(segment_ids) instead of
        # a fixed constant keeps that true for any number of segments.
        order_map = {sid: i for i, sid in enumerate(segment_ids)}
        unknown_rank = len(segment_ids)
        segs_ordered = sorted(
            segs, key=lambda s: order_map.get(str(s.id), unknown_rank)
        )
        message_lineage_json = aggregate_lineage_from_segments(
            segs_ordered,
            conversation_id_fallback=conv_ids[0] if conv_ids else None,
        )

    return {
        "schema_version": EVIDENCE_SNAPSHOT_SCHEMA_VERSION,
        "captured_at": datetime.now(timezone.utc).isoformat(),
        "chapter_id": str(chapter.id),
        "user_id": uid,
        "segment_ids": segment_ids,
        "conversation_ids": conv_ids,
        "story_ids": story_ids,
        "memory_chunk_ids": chunk_ids,
        "memory_fact_ids": fact_ids,
        "timeline_event_ids": tl_ids,
        "summary_ids": sum_ids,
        "notes": notes,
        "message_lineage_json": message_lineage_json,
    }
# Legacy name kept as an alias so that external imports do not break.
build_chapter_evidence_snapshot_sync = build_chapter_evidence_closure_payload_sync
def _replace_chapter_evidence_links_sync(
    session: Session, *, chapter_id: str, payload: dict
) -> None:
    """Replace all evidence links of a chapter with those listed in ``payload``.

    Existing ``ChapterEvidenceLink`` rows for ``chapter_id`` are deleted, then
    one new row is added to the session per id found in the payload. Nothing
    is flushed or committed here.

    Args:
        session: Synchronous SQLAlchemy session.
        chapter_id: Id of the chapter whose links are replaced.
        payload: Closure payload; the id lists under ``memory_chunk_ids``,
            ``memory_fact_ids``, ``timeline_event_ids`` and ``summary_ids``
            are read (a missing or empty key adds no rows).
    """
    stale_links = delete(ChapterEvidenceLink).where(
        ChapterEvidenceLink.chapter_id == chapter_id
    )
    session.execute(stale_links)

    # (payload key, evidence_type, role), processed in this order.
    link_specs = (
        ("memory_chunk_ids", "chunk", "primary"),
        ("memory_fact_ids", "fact", "supporting"),
        ("timeline_event_ids", "timeline_event", "supporting"),
        ("summary_ids", "summary", "background"),
    )
    for payload_key, evidence_type, role in link_specs:
        for evidence_id in payload.get(payload_key) or []:
            link = ChapterEvidenceLink(
                id=str(uuid.uuid4()),
                chapter_id=chapter_id,
                evidence_type=evidence_type,
                evidence_id=str(evidence_id),
                role=role,
            )
            session.add(link)
def refresh_chapter_evidence_snapshot_sync(session: Session, chapter_id: str) -> bool:
    """Write a new snapshot version, replace evidence links, repoint the chapter.

    The chapter is loaded together with its ``story_links``, a fresh closure
    payload is built, and a ``ChapterEvidenceSnapshot`` row is added with the
    next ``version_no`` for that chapter. The chapter's evidence links are
    then replaced and ``current_evidence_snapshot_id`` is set to the new row.
    ``source_lineage_json`` is overwritten only when the payload carries a
    lineage. The session is flushed but not committed.

    Returns:
        ``False`` when no chapter with ``chapter_id`` is found, ``True``
        otherwise.
    """
    chapter_query = (
        select(Chapter)
        .where(Chapter.id == chapter_id)
        .options(joinedload(Chapter.story_links))
    )
    chapter = session.execute(chapter_query).unique().scalar_one_or_none()
    if not chapter:
        return False

    payload = build_chapter_evidence_closure_payload_sync(session, chapter)

    def _listed(key: str) -> list:
        """Return a fresh list for ``key``; missing or empty yields ``[]``."""
        return list(payload.get(key) or [])

    # The next version is one past the highest stored for this chapter (0 if none).
    latest_version_query = select(
        func.coalesce(func.max(ChapterEvidenceSnapshot.version_no), 0)
    ).where(ChapterEvidenceSnapshot.chapter_id == chapter_id)
    latest_version = session.execute(latest_version_query).scalar()
    next_version = int(latest_version or 0) + 1

    captured_at = datetime.now(timezone.utc)
    lineage = payload.get("message_lineage_json")
    schema_version = int(
        payload.get("schema_version") or EVIDENCE_SNAPSHOT_SCHEMA_VERSION
    )
    snapshot = ChapterEvidenceSnapshot(
        id=str(uuid.uuid4()),
        chapter_id=str(chapter.id),
        user_id=str(chapter.user_id),
        version_no=next_version,
        schema_version=schema_version,
        segment_ids=_listed("segment_ids"),
        conversation_ids=_listed("conversation_ids"),
        story_ids=_listed("story_ids"),
        memory_chunk_ids=_listed("memory_chunk_ids"),
        memory_fact_ids=_listed("memory_fact_ids"),
        timeline_event_ids=_listed("timeline_event_ids"),
        summary_ids=_listed("summary_ids"),
        notes=_listed("notes"),
        message_lineage_json=lineage,
        captured_at=captured_at,
    )
    session.add(snapshot)
    # Flush the snapshot row before the links and chapter pointer are touched.
    session.flush()

    _replace_chapter_evidence_links_sync(
        session, chapter_id=str(chapter.id), payload=payload
    )
    chapter.current_evidence_snapshot_id = snapshot.id
    if lineage is not None:
        chapter.source_lineage_json = lineage
    session.flush()
    return True
def refresh_chapter_evidence_snapshot_with_retry_sync(
    session: Session, chapter_id: str
) -> bool:
    """Run ``refresh_chapter_evidence_snapshot_sync`` with one full retry.

    The refresh is attempted at most twice. Any exception is logged with the
    ``evidence_snapshot_refresh_failed`` prefix (handy for log searches) and
    never propagates to the caller.

    Returns:
        The result of the first attempt that does not raise, or ``False``
        when both attempts raise.

    NOTE(review): the session is not rolled back between attempts; if the
    first failure leaves the session needing a rollback, the second attempt
    may fail the same way. Confirm against callers' transaction handling.
    """
    max_attempts = 2
    attempt_no = 0
    failure: Exception | None = None
    while attempt_no < max_attempts:
        attempt_no += 1
        try:
            return refresh_chapter_evidence_snapshot_sync(session, chapter_id)
        except Exception as exc:
            # Best-effort boundary: remember the error, log it, try again.
            failure = exc
            logger.warning(
                "evidence_snapshot_refresh_failed attempt={} chapter_id={}: {}",
                attempt_no,
                chapter_id,
                exc,
            )
    if failure:
        logger.warning(
            "evidence_snapshot_refresh_failed exhausted chapter_id={}: {}",
            chapter_id,
            failure,
        )
    return False