"""章节证据闭包:统一计算(评测与生产共用)+ Phase C 表持久化(快照行 + chapter_evidence_links)。""" from __future__ import annotations import uuid from datetime import datetime, timezone from sqlalchemy import delete, func, select from sqlalchemy.orm import Session, joinedload from app.core.logging import get_logger from app.features.conversation.lineage_schemas import aggregate_lineage_from_segments from app.features.conversation.models import Conversation, Segment from app.features.memoir.models import ( Chapter, ChapterEvidenceLink, ChapterEvidenceSnapshot, ) from app.features.memory.repo import fetch_memory_closure_for_conversations_sync EVIDENCE_SNAPSHOT_SCHEMA_VERSION = 1 logger = get_logger(__name__) def _normalize_segment_ids(raw: object) -> list[str]: if not raw or not isinstance(raw, list): return [] out: list[str] = [] for x in raw: s = str(x).strip() if s: out.append(s) seen: set[str] = set() deduped: list[str] = [] for s in out: if s not in seen: seen.add(s) deduped.append(s) return deduped def _story_ids_ordered(chapter: Chapter) -> list[str]: links = sorted( list(getattr(chapter, "story_links", None) or []), key=lambda lnk: getattr(lnk, "order_index", 0), ) out: list[str] = [] for ln in links: sid = getattr(ln, "story_id", None) if sid: out.append(str(sid)) return out def build_chapter_evidence_closure_payload_sync( session: Session, chapter: Chapter ) -> dict: """ 唯一闭包计算入口:由 `refresh_chapter_evidence_snapshot_sync` 与评测侧(经 JSON 镜像) 共用同一套 segment / conversation / memory 推导逻辑。 """ uid = str(chapter.user_id) segment_ids = _normalize_segment_ids(chapter.source_segments) story_ids = _story_ids_ordered(chapter) segs: list = [] if not segment_ids: conv_ids: list[str] = [] chunk_ids, fact_ids, tl_ids, sum_ids = [], [], [], [] notes = [ "no_source_segments", "snapshot_materialized", ] else: stmt = ( select(Segment) .join(Conversation, Segment.conversation_id == Conversation.id) .where( Segment.id.in_(segment_ids), Conversation.user_id == uid, Conversation.deleted_at.is_(None), ) ) segs = list(session.execute(stmt).scalars().all()) conv_ids = sorted({str(s.conversation_id) for s in segs if s.conversation_id}) chunk_ids, fact_ids, tl_ids, sum_ids = ( fetch_memory_closure_for_conversations_sync(session, uid, conv_ids) if conv_ids else ([], [], [], []) ) notes = ["snapshot_materialized"] if len(segs) < len(segment_ids): notes.append("some_segment_ids_unresolved_or_foreign_user") message_lineage_json = None if segs: order_map = {sid: i for i, sid in enumerate(segment_ids)} segs_ordered = sorted(segs, key=lambda s: order_map.get(str(s.id), 9999)) message_lineage_json = aggregate_lineage_from_segments( segs_ordered, conversation_id_fallback=conv_ids[0] if conv_ids else None, ) return { "schema_version": EVIDENCE_SNAPSHOT_SCHEMA_VERSION, "captured_at": datetime.now(timezone.utc).isoformat(), "chapter_id": str(chapter.id), "user_id": uid, "segment_ids": segment_ids, "conversation_ids": conv_ids, "story_ids": story_ids, "memory_chunk_ids": chunk_ids, "memory_fact_ids": fact_ids, "timeline_event_ids": tl_ids, "summary_ids": sum_ids, "notes": notes, "message_lineage_json": message_lineage_json, } # 旧名保留,避免外部 import 断裂 build_chapter_evidence_snapshot_sync = build_chapter_evidence_closure_payload_sync def _replace_chapter_evidence_links_sync( session: Session, *, chapter_id: str, payload: dict ) -> None: session.execute( delete(ChapterEvidenceLink).where(ChapterEvidenceLink.chapter_id == chapter_id) ) for cid in payload.get("memory_chunk_ids") or []: session.add( ChapterEvidenceLink( id=str(uuid.uuid4()), chapter_id=chapter_id, evidence_type="chunk", evidence_id=str(cid), role="primary", ) ) for fid in payload.get("memory_fact_ids") or []: session.add( ChapterEvidenceLink( id=str(uuid.uuid4()), chapter_id=chapter_id, evidence_type="fact", evidence_id=str(fid), role="supporting", ) ) for tid in payload.get("timeline_event_ids") or []: session.add( ChapterEvidenceLink( id=str(uuid.uuid4()), chapter_id=chapter_id, evidence_type="timeline_event", evidence_id=str(tid), role="supporting", ) ) for sid in payload.get("summary_ids") or []: session.add( ChapterEvidenceLink( id=str(uuid.uuid4()), chapter_id=chapter_id, evidence_type="summary", evidence_id=str(sid), role="background", ) ) def refresh_chapter_evidence_snapshot_sync(session: Session, chapter_id: str) -> bool: """写入新版本快照行、替换 evidence_links、更新 Chapter 当前指针;镜像 evidence_bundle_json。""" stmt = ( select(Chapter) .where(Chapter.id == chapter_id) .options(joinedload(Chapter.story_links)) ) ch = session.execute(stmt).unique().scalar_one_or_none() if not ch: return False payload = build_chapter_evidence_closure_payload_sync(session, ch) max_v = session.execute( select(func.coalesce(func.max(ChapterEvidenceSnapshot.version_no), 0)).where( ChapterEvidenceSnapshot.chapter_id == chapter_id ) ).scalar() next_v = int(max_v or 0) + 1 cap_at = datetime.now(timezone.utc) snap = ChapterEvidenceSnapshot( id=str(uuid.uuid4()), chapter_id=str(ch.id), user_id=str(ch.user_id), version_no=next_v, schema_version=int(payload.get("schema_version") or EVIDENCE_SNAPSHOT_SCHEMA_VERSION), segment_ids=list(payload.get("segment_ids") or []), conversation_ids=list(payload.get("conversation_ids") or []), story_ids=list(payload.get("story_ids") or []), memory_chunk_ids=list(payload.get("memory_chunk_ids") or []), memory_fact_ids=list(payload.get("memory_fact_ids") or []), timeline_event_ids=list(payload.get("timeline_event_ids") or []), summary_ids=list(payload.get("summary_ids") or []), notes=list(payload.get("notes") or []), message_lineage_json=payload.get("message_lineage_json"), captured_at=cap_at, ) session.add(snap) session.flush() _replace_chapter_evidence_links_sync(session, chapter_id=str(ch.id), payload=payload) ch.current_evidence_snapshot_id = snap.id ch.evidence_bundle_json = payload if payload.get("message_lineage_json") is not None: ch.source_lineage_json = payload.get("message_lineage_json") session.flush() return True def refresh_chapter_evidence_snapshot_with_retry_sync( session: Session, chapter_id: str ) -> bool: """ 同 `refresh_chapter_evidence_snapshot_sync`,失败时整体再试 1 次(共 2 次)。 日志前缀 `evidence_snapshot_refresh_failed` 便于检索。 """ last_exc: Exception | None = None for attempt in range(2): try: return refresh_chapter_evidence_snapshot_sync(session, chapter_id) except Exception as e: last_exc = e logger.warning( "evidence_snapshot_refresh_failed attempt={} chapter_id={}: {}", attempt + 1, chapter_id, e, ) if last_exc: logger.warning( "evidence_snapshot_refresh_failed exhausted chapter_id={}: {}", chapter_id, last_exc, ) return False