Files
life-echo/api/app/features/evaluation/eval_trace_service.py
Kevin 71fbd39e32 feat(api)!: memory single chain — async MemoryService, strict eval closure
Route all memory ingest/retrieve/enrichment/compaction through async MemoryService.
Remove legacy sync memory implementations (ingest/retrieve/compaction); Celery and
memoir Phase2 call asyncio.run into MemoryService-backed helpers.

Memoir Phase1 batch ingest uses MemoryService.ingest_transcripts_batch; drop the
chapters.evidence_bundle_json mirror (Alembic 0015). Evaluation uses snapshot/link-only bundles;
raise EvidenceClosureMissing instead of partial/fallback lineage tiers.

Split memoir state into NarrativeCoverageState and InterviewControlState; delete the
_interview_meta_store adapter layer. Remove rolling-query and recent-fact fallback
settings from config and evidence assembly.

Update judges, docs, tests, and PlaygroundPage alignment.

Made-with: Cursor
2026-04-30 14:11:50 +08:00

275 lines
10 KiB
Python

"""组装 Chapter/Story 评测证据闭包并格式化为评审输入。"""
from __future__ import annotations
from sqlalchemy.ext.asyncio import AsyncSession
from app.features.conversation.lineage_schemas import aggregate_lineage_from_segments
from app.features.evaluation.eval_trace_format import (
build_segment_transcript,
format_chapter_for_judge,
format_story_for_judge,
)
from app.features.evaluation.eval_trace_repo import (
fetch_ai_messages_for_segments,
fetch_segments_for_user,
get_chapter_for_eval_trace,
get_story_for_eval_trace,
list_chapter_ids_for_story,
load_chunks_by_ids,
load_facts_by_ids,
load_summaries_by_ids,
load_timeline_by_ids,
normalize_source_segment_ids,
story_link_ids_by_type,
)
from app.features.evaluation.eval_trace_schemas import (
ChapterEvidenceBundle,
FormattedMemoirEvidence,
StoryEvidenceBundle,
)
from app.features.memoir.chapter_evidence_snapshot import (
EVIDENCE_SNAPSHOT_SCHEMA_VERSION,
)
from app.features.memoir.models import Chapter
from app.features.story.models import Story, StoryVersion
def _segments_in_order(segments: list, segment_ids: list[str]) -> list:
order = {str(sid): i for i, sid in enumerate(segment_ids)}
return sorted(segments, key=lambda s: order.get(str(s.id), 9999))
class EvidenceClosureMissing(RuntimeError):
    """Raised when an evaluation artifact lacks its strict evidence closure.

    The evaluation trace service raises this instead of degrading to partial
    or fallback evidence. Examples: a missing or mismatched chapter snapshot,
    a story without evidence links or bound transcript, or an artifact that
    has no transcript segments or no memory ids.
    """
class EvalTraceService:
    """Assemble strict Chapter/Story evidence bundles and format them for judges.

    Chapter evidence comes only from the chapter's current evidence snapshot.
    Story evidence comes from the story's evidence links plus the transcript
    segments of the chapters bound to it. Missing evidence raises
    ``EvidenceClosureMissing``; there is no partial or fallback path.
    """

    def __init__(self, db: AsyncSession) -> None:
        self._db = db

    @staticmethod
    def _clean_ids(values) -> list[str]:
        """Stringify ``values`` (``None`` treated as empty), dropping blank ids."""
        return [str(x) for x in (values or []) if str(x).strip()]

    async def _story_dialogue_lineage(
        self,
        st: Story,
        segments: list,
        segment_ids_ordered: list[str],
    ) -> dict | None:
        """Return dialogue lineage for story ``st``, or ``None`` if unavailable.

        Prefers ``lineage_json`` on the story's current ``StoryVersion`` when it
        is a dict with a truthy ``"turns"`` entry. Otherwise, when both
        ``segments`` and ``segment_ids_ordered`` are non-empty, aggregates
        lineage from the segments ordered by ``segment_ids_ordered``.
        """
        if getattr(st, "current_version_id", None):
            ver = await self._db.get(StoryVersion, st.current_version_id)
            if ver and isinstance(getattr(ver, "lineage_json", None), dict):
                lj = ver.lineage_json
                if lj.get("turns"):
                    return lj
        if segments and segment_ids_ordered:
            ordered = _segments_in_order(segments, segment_ids_ordered)
            # Smallest conversation id (string order) keeps the fallback
            # deterministic when segments span several conversations.
            fallback_conv = min(
                {str(s.conversation_id) for s in segments if s.conversation_id},
                default=None,
            )
            return aggregate_lineage_from_segments(
                ordered, conversation_id_fallback=fallback_conv
            )
        return None

    def _assert_strict_closure(
        self,
        *,
        artifact_type: str,
        artifact_id: str,
        segment_ids: list[str],
        memory_chunk_ids: list[str],
        memory_fact_ids: list[str],
        timeline_event_ids: list[str],
        summary_ids: list[str],
    ) -> None:
        """Require both transcript segments and at least one memory id.

        Raises:
            EvidenceClosureMissing: if ``segment_ids`` is empty, or all four
                memory id lists (chunks, facts, timeline events, summaries)
                are empty.
        """
        has_transcript = bool(segment_ids)
        has_memory = bool(
            memory_chunk_ids or memory_fact_ids or timeline_event_ids or summary_ids
        )
        if not (has_transcript and has_memory):
            raise EvidenceClosureMissing(
                f"{artifact_type} {artifact_id} missing strict evidence closure"
            )

    async def build_chapter_bundle(
        self, user_id: str, chapter: Chapter
    ) -> ChapterEvidenceBundle:
        """Build a chapter bundle from ``chapter.current_evidence_snapshot``.

        Ids are copied from the snapshot as non-blank strings. Conversation ids
        are deduplicated and sorted. The snapshot's ``message_lineage_json`` is
        used as dialogue lineage when it is a dict.

        Raises:
            EvidenceClosureMissing: if the snapshot is missing, belongs to a
                different user or chapter, has a schema version other than
                ``EVIDENCE_SNAPSHOT_SCHEMA_VERSION``, or lacks transcript or
                memory ids.
        """
        row = getattr(chapter, "current_evidence_snapshot", None)
        if (
            row is None
            or str(row.user_id) != str(user_id)
            or str(row.chapter_id) != str(chapter.id)
            or int(row.schema_version or 0) != EVIDENCE_SNAPSHOT_SCHEMA_VERSION
        ):
            raise EvidenceClosureMissing(
                f"chapter {chapter.id} missing current evidence snapshot"
            )
        segment_ids = self._clean_ids(row.segment_ids)
        conv_ids = sorted(set(self._clean_ids(row.conversation_ids)))
        chunk_ids = self._clean_ids(row.memory_chunk_ids)
        fact_ids = self._clean_ids(row.memory_fact_ids)
        tl_ids = self._clean_ids(row.timeline_event_ids)
        sum_ids = self._clean_ids(row.summary_ids)
        self._assert_strict_closure(
            artifact_type="chapter",
            artifact_id=str(chapter.id),
            segment_ids=segment_ids,
            memory_chunk_ids=chunk_ids,
            memory_fact_ids=fact_ids,
            timeline_event_ids=tl_ids,
            summary_ids=sum_ids,
        )
        notes: list[str] = [str(x) for x in (row.notes or []) if x]
        notes.append("evidence_from_chapter_evidence_snapshot_table")
        dlg = getattr(row, "message_lineage_json", None)
        return ChapterEvidenceBundle(
            user_id=user_id,
            chapter_id=str(chapter.id),
            segment_ids=segment_ids,
            conversation_ids=conv_ids,
            memory_chunk_ids=chunk_ids,
            memory_fact_ids=fact_ids,
            timeline_event_ids=tl_ids,
            summary_ids=sum_ids,
            notes=notes,
            dialogue_lineage=dlg if isinstance(dlg, dict) else None,
        )

    async def _load_judge_inputs(
        self, bundle: ChapterEvidenceBundle | StoryEvidenceBundle
    ) -> dict[str, object]:
        """Fetch the transcript and memory records referenced by ``bundle``.

        All lookups are scoped to ``bundle.user_id``. Returns the keyword
        arguments shared by ``format_chapter_for_judge`` and
        ``format_story_for_judge``: ``transcript``, ``chunks``, ``facts``,
        ``events`` and ``summaries``.
        """
        uid = bundle.user_id
        segs = await fetch_segments_for_user(
            self._db, user_id=uid, segment_ids=bundle.segment_ids
        )
        ai_map = await fetch_ai_messages_for_segments(
            self._db, user_id=uid, segment_ids=[s.id for s in segs]
        )
        transcript = build_segment_transcript(segs, ai_map)
        chunks = await load_chunks_by_ids(
            self._db, user_id=uid, chunk_ids=bundle.memory_chunk_ids
        )
        facts = await load_facts_by_ids(
            self._db, user_id=uid, fact_ids=bundle.memory_fact_ids
        )
        events = await load_timeline_by_ids(
            self._db, user_id=uid, event_ids=bundle.timeline_event_ids
        )
        summaries = await load_summaries_by_ids(
            self._db, user_id=uid, summary_ids=bundle.summary_ids
        )
        return {
            "transcript": transcript,
            "chunks": chunks,
            "facts": facts,
            "events": events,
            "summaries": summaries,
        }

    async def format_chapter_bundle(
        self, bundle: ChapterEvidenceBundle
    ) -> tuple[FormattedMemoirEvidence, ChapterEvidenceBundle]:
        """Load ``bundle``'s evidence and format it for the chapter judge.

        Returns the formatted evidence together with the unchanged bundle.
        """
        inputs = await self._load_judge_inputs(bundle)
        return format_chapter_for_judge(bundle, **inputs), bundle

    async def build_story_bundle(
        self, user_id: str, story_id: str
    ) -> StoryEvidenceBundle:
        """Build a story bundle from its evidence links and bound chapters.

        Memory ids come from the story's evidence links. Transcript segments
        are gathered from the ``source_segments`` of every chapter listed for
        the story, deduplicated in first-seen order.

        Raises:
            EvidenceClosureMissing: if the story is not found, has no typed
                evidence link ids, has no fetchable transcript segments, or
                otherwise fails the strict closure check.
        """
        st = await get_story_for_eval_trace(
            self._db, user_id=user_id, story_id=story_id
        )
        if not st:
            raise EvidenceClosureMissing(f"story {story_id} not found")
        links = list(st.evidence_links or [])
        lc, lf, lt, ls = story_link_ids_by_type(links)
        if not (lc or lf or lt or ls):
            raise EvidenceClosureMissing(
                f"story {story_id} missing StoryEvidenceLink closure"
            )
        notes: list[str] = ["evidence_from_story_evidence_links"]
        chapter_ids = await list_chapter_ids_for_story(
            self._db, user_id=user_id, story_id=str(st.id)
        )
        seg_ids: list[str] = []
        for cid in chapter_ids:
            ch = await get_chapter_for_eval_trace(
                self._db, user_id=user_id, chapter_id=cid
            )
            if ch:
                seg_ids.extend(normalize_source_segment_ids(ch.source_segments))
        # Order-preserving dedup: chapters may share source segments.
        dedup_seg = list(dict.fromkeys(seg_ids))
        segments = await fetch_segments_for_user(
            self._db, user_id=user_id, segment_ids=dedup_seg
        )
        if not segments:
            raise EvidenceClosureMissing(f"story {story_id} missing bound transcript")
        # Store ids as str, matching build_chapter_bundle's segment_ids.
        segment_ids = [str(s.id) for s in segments]
        conv_ids = sorted(
            {str(s.conversation_id) for s in segments if s.conversation_id}
        )
        self._assert_strict_closure(
            artifact_type="story",
            artifact_id=str(st.id),
            segment_ids=segment_ids,
            memory_chunk_ids=lc,
            memory_fact_ids=lf,
            timeline_event_ids=lt,
            summary_ids=ls,
        )
        notes.append("transcript_from_bound_chapter_segments")
        dlg = await self._story_dialogue_lineage(st, segments, dedup_seg)
        return StoryEvidenceBundle(
            user_id=user_id,
            story_id=str(st.id),
            segment_ids=segment_ids,
            conversation_ids=conv_ids,
            memory_chunk_ids=lc,
            memory_fact_ids=lf,
            timeline_event_ids=lt,
            summary_ids=ls,
            notes=notes,
            augmented_with_chapter_context=bool(chapter_ids),
            story_link_evidence_count=len(links),
            dialogue_lineage=dlg,
        )

    async def format_story_bundle(
        self, bundle: StoryEvidenceBundle
    ) -> tuple[FormattedMemoirEvidence, StoryEvidenceBundle]:
        """Load ``bundle``'s evidence and format it for the story judge.

        Returns the formatted evidence together with the unchanged bundle.
        """
        inputs = await self._load_judge_inputs(bundle)
        return format_story_for_judge(bundle, **inputs), bundle