Route all memory ingest/retrieve/enrichment/compaction through async MemoryService. Remove legacy sync memory implementations (ingest/retrieve/compaction); Celery and memoir Phase2 call asyncio.run into MemoryService-backed helpers. Memoir Phase1 batch ingest uses MemoryService.ingest_transcripts_batch; drop chapters. evidence_bundle_json mirror (Alembic 0015). Evaluation uses snapshot/link-only bundles; raise EvidenceClosureMissing instead of partial/fallback lineage tiers. Split memoir state into NarrativeCoverageState and InterviewControlState; delete the _interview_meta_store adapter layer. Remove rolling-query and recent-fact fallback settings from config and evidence assembly. Update judges, docs, tests, and PlaygroundPage alignment. Made-with: Cursor
275 lines
10 KiB
Python
275 lines
10 KiB
Python
"""组装 Chapter/Story 评测证据闭包并格式化为评审输入。"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.features.conversation.lineage_schemas import aggregate_lineage_from_segments
|
|
from app.features.evaluation.eval_trace_format import (
|
|
build_segment_transcript,
|
|
format_chapter_for_judge,
|
|
format_story_for_judge,
|
|
)
|
|
from app.features.evaluation.eval_trace_repo import (
|
|
fetch_ai_messages_for_segments,
|
|
fetch_segments_for_user,
|
|
get_chapter_for_eval_trace,
|
|
get_story_for_eval_trace,
|
|
list_chapter_ids_for_story,
|
|
load_chunks_by_ids,
|
|
load_facts_by_ids,
|
|
load_summaries_by_ids,
|
|
load_timeline_by_ids,
|
|
normalize_source_segment_ids,
|
|
story_link_ids_by_type,
|
|
)
|
|
from app.features.evaluation.eval_trace_schemas import (
|
|
ChapterEvidenceBundle,
|
|
FormattedMemoirEvidence,
|
|
StoryEvidenceBundle,
|
|
)
|
|
from app.features.memoir.chapter_evidence_snapshot import (
|
|
EVIDENCE_SNAPSHOT_SCHEMA_VERSION,
|
|
)
|
|
from app.features.memoir.models import Chapter
|
|
from app.features.story.models import Story, StoryVersion
|
|
|
|
|
|
def _segments_in_order(segments: list, segment_ids: list[str]) -> list:
|
|
order = {str(sid): i for i, sid in enumerate(segment_ids)}
|
|
return sorted(segments, key=lambda s: order.get(str(s.id), 9999))
|
|
|
|
|
|
class EvidenceClosureMissing(RuntimeError):
    """Raised when an evaluation artifact lacks a strict evidence closure."""
|
|
|
|
|
|
class EvalTraceService:
    """Build chapter/story evidence bundles and format them as judge input.

    Bundles are assembled from persisted evidence only: a chapter's current
    evidence snapshot, or a story's evidence links plus the segments of its
    bound chapters. ``EvidenceClosureMissing`` is raised whenever that
    evidence is absent or incomplete.
    """

    def __init__(self, db: AsyncSession) -> None:
        """Store the async database session used by every lookup."""
        self._db = db

    @staticmethod
    def _clean_ids(values) -> list[str]:
        """Stringify ``values``, dropping ``None`` and blank entries.

        ``None`` is skipped explicitly: ``str(None)`` is the non-blank text
        ``"None"``, which would otherwise be kept as a bogus id and could
        satisfy the strict-closure check on its own.
        """
        return [
            str(value)
            for value in (values or [])
            if value is not None and str(value).strip()
        ]

    async def _story_dialogue_lineage(
        self,
        st: Story,
        segments: list,
        segment_ids_ordered: list[str],
    ) -> dict | None:
        """Resolve the dialogue lineage for a story.

        The ``lineage_json`` of the story's current version is returned when
        it is a dict with a truthy ``"turns"`` entry. Otherwise the lineage is
        aggregated from ``segments`` ordered by ``segment_ids_ordered``.

        Returns:
            The lineage dict, or ``None`` when neither source is available.
        """
        version_id = getattr(st, "current_version_id", None)
        if version_id:
            ver = await self._db.get(StoryVersion, version_id)
            lineage = getattr(ver, "lineage_json", None) if ver else None
            if isinstance(lineage, dict) and lineage.get("turns"):
                return lineage

        if not (segments and segment_ids_ordered):
            return None

        ordered = _segments_in_order(segments, segment_ids_ordered)
        conv_ids = sorted(
            {str(s.conversation_id) for s in segments if s.conversation_id}
        )
        return aggregate_lineage_from_segments(
            ordered, conversation_id_fallback=conv_ids[0] if conv_ids else None
        )

    def _assert_strict_closure(
        self,
        *,
        artifact_type: str,
        artifact_id: str,
        segment_ids: list[str],
        memory_chunk_ids: list[str],
        memory_fact_ids: list[str],
        timeline_event_ids: list[str],
        summary_ids: list[str],
    ) -> None:
        """Require both transcript evidence and at least one memory id list.

        Raises:
            EvidenceClosureMissing: If ``segment_ids`` is empty, or all four
                memory id lists are empty.
        """
        has_transcript = bool(segment_ids)
        has_memory = bool(
            memory_chunk_ids or memory_fact_ids or timeline_event_ids or summary_ids
        )
        if not (has_transcript and has_memory):
            raise EvidenceClosureMissing(
                f"{artifact_type} {artifact_id} missing strict evidence closure"
            )

    async def build_chapter_bundle(
        self, user_id: str, chapter: Chapter
    ) -> ChapterEvidenceBundle:
        """Build a chapter bundle from the chapter's current evidence snapshot.

        Raises:
            EvidenceClosureMissing: If the snapshot is missing, belongs to a
                different user or chapter, has a schema version other than
                ``EVIDENCE_SNAPSHOT_SCHEMA_VERSION``, or fails the strict
                closure check.
        """
        row = getattr(chapter, "current_evidence_snapshot", None)
        snapshot_usable = (
            row is not None
            and str(row.user_id) == str(user_id)
            and str(row.chapter_id) == str(chapter.id)
            and int(row.schema_version or 0) == EVIDENCE_SNAPSHOT_SCHEMA_VERSION
        )
        if not snapshot_usable:
            raise EvidenceClosureMissing(
                f"chapter {chapter.id} missing current evidence snapshot"
            )

        segment_ids = self._clean_ids(row.segment_ids)
        conv_ids = sorted(set(self._clean_ids(row.conversation_ids)))
        chunk_ids = self._clean_ids(row.memory_chunk_ids)
        fact_ids = self._clean_ids(row.memory_fact_ids)
        tl_ids = self._clean_ids(row.timeline_event_ids)
        sum_ids = self._clean_ids(row.summary_ids)
        self._assert_strict_closure(
            artifact_type="chapter",
            artifact_id=str(chapter.id),
            segment_ids=segment_ids,
            memory_chunk_ids=chunk_ids,
            memory_fact_ids=fact_ids,
            timeline_event_ids=tl_ids,
            summary_ids=sum_ids,
        )

        notes = [str(x) for x in (row.notes or []) if x]
        notes.append("evidence_from_chapter_evidence_snapshot_table")
        dlg = getattr(row, "message_lineage_json", None)
        return ChapterEvidenceBundle(
            user_id=user_id,
            chapter_id=str(chapter.id),
            segment_ids=segment_ids,
            conversation_ids=conv_ids,
            memory_chunk_ids=chunk_ids,
            memory_fact_ids=fact_ids,
            timeline_event_ids=tl_ids,
            summary_ids=sum_ids,
            notes=notes,
            dialogue_lineage=dlg if isinstance(dlg, dict) else None,
        )

    async def _load_judge_inputs(
        self, bundle: ChapterEvidenceBundle | StoryEvidenceBundle
    ) -> dict:
        """Load the transcript and memory rows referenced by ``bundle``.

        Shared by both ``format_*_bundle`` methods. Every lookup is scoped to
        ``bundle.user_id``.

        Returns:
            Keyword arguments for the ``format_*_for_judge`` helpers:
            ``transcript``, ``chunks``, ``facts``, ``events``, ``summaries``.
        """
        user_id = bundle.user_id
        segs = await fetch_segments_for_user(
            self._db, user_id=user_id, segment_ids=bundle.segment_ids
        )
        ai_map = await fetch_ai_messages_for_segments(
            self._db, user_id=user_id, segment_ids=[s.id for s in segs]
        )
        transcript = build_segment_transcript(segs, ai_map)
        chunks = await load_chunks_by_ids(
            self._db, user_id=user_id, chunk_ids=bundle.memory_chunk_ids
        )
        facts = await load_facts_by_ids(
            self._db, user_id=user_id, fact_ids=bundle.memory_fact_ids
        )
        events = await load_timeline_by_ids(
            self._db, user_id=user_id, event_ids=bundle.timeline_event_ids
        )
        summaries = await load_summaries_by_ids(
            self._db, user_id=user_id, summary_ids=bundle.summary_ids
        )
        return {
            "transcript": transcript,
            "chunks": chunks,
            "facts": facts,
            "events": events,
            "summaries": summaries,
        }

    async def format_chapter_bundle(
        self, bundle: ChapterEvidenceBundle
    ) -> tuple[FormattedMemoirEvidence, ChapterEvidenceBundle]:
        """Load the evidence behind ``bundle`` and format it for the judge.

        Returns:
            The formatted evidence together with the unchanged ``bundle``.
        """
        inputs = await self._load_judge_inputs(bundle)
        return format_chapter_for_judge(bundle, **inputs), bundle

    async def build_story_bundle(
        self, user_id: str, story_id: str
    ) -> StoryEvidenceBundle:
        """Build a story bundle from its evidence links and bound chapters.

        Memory ids come from the story's evidence links; the transcript comes
        from the source segments of the chapters bound to the story.

        Raises:
            EvidenceClosureMissing: If the story is not found, has no
                evidence-link ids, or no bound segments can be fetched.
        """
        st = await get_story_for_eval_trace(
            self._db, user_id=user_id, story_id=story_id
        )
        if not st:
            raise EvidenceClosureMissing(f"story {story_id} not found")

        links = list(st.evidence_links or [])
        lc, lf, lt, ls = story_link_ids_by_type(links)
        if not (lc or lf or lt or ls):
            raise EvidenceClosureMissing(
                f"story {story_id} missing StoryEvidenceLink closure"
            )

        notes: list[str] = ["evidence_from_story_evidence_links"]
        chapter_ids = await list_chapter_ids_for_story(
            self._db, user_id=user_id, story_id=str(st.id)
        )
        seg_ids: list[str] = []
        for cid in chapter_ids:
            ch = await get_chapter_for_eval_trace(
                self._db, user_id=user_id, chapter_id=cid
            )
            if not ch:
                # Chapters that cannot be loaded contribute no segments.
                continue
            seg_ids.extend(normalize_source_segment_ids(ch.source_segments))

        # Drop duplicates while keeping first-seen order (dicts preserve
        # insertion order).
        dedup_seg = list(dict.fromkeys(seg_ids))
        segments = await fetch_segments_for_user(
            self._db, user_id=user_id, segment_ids=dedup_seg
        )
        if not segments:
            raise EvidenceClosureMissing(f"story {story_id} missing bound transcript")

        conv_ids = sorted(
            {str(s.conversation_id) for s in segments if s.conversation_id}
        )
        # Stringify ids so the bundle matches the ``list[str]`` contract used
        # by the chapter path.
        segment_ids = [str(s.id) for s in segments]
        self._assert_strict_closure(
            artifact_type="story",
            artifact_id=str(st.id),
            segment_ids=segment_ids,
            memory_chunk_ids=lc,
            memory_fact_ids=lf,
            timeline_event_ids=lt,
            summary_ids=ls,
        )
        notes.append("transcript_from_bound_chapter_segments")
        dlg = await self._story_dialogue_lineage(st, segments, dedup_seg)
        return StoryEvidenceBundle(
            user_id=user_id,
            story_id=str(st.id),
            segment_ids=segment_ids,
            conversation_ids=conv_ids,
            memory_chunk_ids=lc,
            memory_fact_ids=lf,
            timeline_event_ids=lt,
            summary_ids=ls,
            notes=notes,
            augmented_with_chapter_context=bool(chapter_ids),
            story_link_evidence_count=len(links),
            dialogue_lineage=dlg,
        )

    async def format_story_bundle(
        self, bundle: StoryEvidenceBundle
    ) -> tuple[FormattedMemoirEvidence, StoryEvidenceBundle]:
        """Load the evidence behind ``bundle`` and format it for the judge.

        Returns:
            The formatted evidence together with the unchanged ``bundle``.
        """
        inputs = await self._load_judge_inputs(bundle)
        return format_story_for_judge(bundle, **inputs), bundle
|