Files
life-echo/api/app/features/evaluation/eval_trace_format.py

272 lines
8.3 KiB
Python
Raw Normal View History

"""将证据闭包格式化为评审可读文本,并记录截断/丢弃区块(可审计)。"""
from __future__ import annotations
from app.core.config import settings
from app.features.conversation.models import Segment
from app.features.evaluation.eval_trace_schemas import (
ChapterEvidenceBundle,
EvidenceFormatMeta,
FormattedMemoirEvidence,
StoryEvidenceBundle,
)
from app.features.memory.models import (
MemoryChunk,
MemoryFact,
MemorySummary,
TimelineEvent,
)
def _memoir_evidence_char_cap() -> int:
    """Return the character cap applied to memoir evidence.

    The value is read from ``settings.eval_judge_memoir_evidence_max_chars``,
    coerced to ``int``, and floored at 1000 so that a smaller configured
    value never yields a cap below that minimum.
    """
    floor = 1000
    configured = int(settings.eval_judge_memoir_evidence_max_chars)
    return configured if configured > floor else floor
def _approx_tokens(chars: int) -> int:
    """Estimate a token count from a character count.

    Uses a fixed ratio of four characters per token (floor division) and
    never returns a negative number.
    """
    estimate = chars // 4
    if estimate < 0:
        return 0
    return estimate
def _segment_message_id_header(seg: Segment) -> str:
    """Build the message-id suffix for a segment heading.

    The ids are taken from the first entry of ``seg.lineage_json["turns"]``
    when that structure is a dict holding a non-empty list whose first item
    is a dict. If no user message id is found there, ``seg.user_message_id``
    is used as a fallback. Missing attributes are tolerated.

    Returns:
        ``"user_msg=<id> · assistant_msg=<id>"`` with whichever parts are
        available, or an empty string when neither id is known.
    """
    user_id: str | None = None
    assistant_id: str | None = None

    lineage = getattr(seg, "lineage_json", None)
    turns = lineage.get("turns") if isinstance(lineage, dict) else None
    if isinstance(turns, list) and turns and isinstance(turns[0], dict):
        first_turn = turns[0]
        # Blank or whitespace-only ids are treated as absent.
        user_id = str(first_turn.get("user_message_id") or "").strip() or None
        assistant_id = (
            str(first_turn.get("assistant_message_id") or "").strip() or None
        )

    if user_id is None:
        fallback = getattr(seg, "user_message_id", None)
        if fallback:
            user_id = str(fallback)

    labelled = (("user_msg", user_id), ("assistant_msg", assistant_id))
    return " · ".join(f"{label}={value}" for label, value in labelled if value)
def build_segment_transcript(
    segments: list[Segment],
    ai_by_segment: dict[str, str],
) -> str:
    """Render segments as per-segment interview blocks (user line + AI reply).

    Args:
        segments: Segments to render, in output order. Heading numbers start
            at 1.
        ai_by_segment: AI reply text keyed by ``str(segment.id)``. When a
            segment has no non-empty entry, ``seg.agent_response`` is used
            instead.

    Returns:
        One block per segment (heading, user line, AI line) separated by
        blank lines; an empty string when ``segments`` is empty.
    """
    blocks: list[str] = []
    for index, seg in enumerate(segments, start=1):
        seg_id = str(seg.id)
        user_txt = (seg.user_input_text or "").strip()
        ai_txt = (ai_by_segment.get(seg_id) or seg.agent_response or "").strip()

        head = (
            f"### Segment {index} · id={seg_id} · conversation={seg.conversation_id}"
        )
        id_extra = _segment_message_id_header(seg)
        if id_extra:
            head += f" · {id_extra}"

        body_u = f"用户: {user_txt}" if user_txt else "用户: (空)"
        # The placeholder's parenthesis is closed, matching the "(空)" form.
        body_a = f"AI: {ai_txt}" if ai_txt else "AI: (无日志/无 agent_response)"
        blocks.append(f"{head}\n{body_u}\n{body_a}")
    return "\n\n".join(blocks)
def build_structured_evidence_text(
    *,
    chunks: list[MemoryChunk],
    facts: list[MemoryFact],
    events: list[TimelineEvent],
    summaries: list[MemorySummary],
    max_chars: int | None = None,
) -> tuple[str, bool, list[str]]:
    """Render structured memory evidence as text sections under a size cap.

    Sections are emitted in a fixed order: chunks, facts, timeline,
    summaries. A section is included whole or dropped whole; when adding it
    would exceed the cap it is skipped and its tag is recorded, and later
    sections are still tried.

    Args:
        chunks: Memory chunks; each ``content`` is clipped to 1200 characters.
        facts: Memory facts, rendered as ``subject · predicate`` using only
            the parts that are non-empty.
        events: Timeline events; each ``description`` is clipped to 400
            characters and omitted when blank.
        summaries: Memory summaries; each ``content`` is clipped to 2000
            characters.
        max_chars: Total character budget. ``None`` falls back to
            ``_memoir_evidence_char_cap()``.

    Returns:
        ``(text, truncated, dropped_section_tags)``. ``truncated`` is true
        when at least one section was dropped for exceeding the budget; the
        tags are the section titles without their ``【】`` brackets. Clipping
        of individual items is marked inline with ``…`` and does not set
        ``truncated``.
    """
    cap = max_chars if max_chars is not None else _memoir_evidence_char_cap()
    parts: list[str] = []
    dropped: list[str] = []
    used = 0
    truncated = False

    def _clip(text: str | None, limit: int) -> str:
        # Trim whitespace and cut to ``limit`` characters, appending an
        # ellipsis so a reader can tell the item was shortened.
        cleaned = (text or "").strip()
        if len(cleaned) > limit:
            return cleaned[:limit] + "…"
        return cleaned

    def _add_section(title: str, body: str) -> None:
        nonlocal used, truncated
        block = f"{title}\n{body}".strip()
        if not block:
            return
        # Every section is charged 2 extra characters for the "\n\n" joiner,
        # so the joined text always stays within the cap.
        cost = len(block) + 2
        if used + cost > cap:
            truncated = True
            dropped.append(title.strip("【】").split("·")[0].strip())
            return
        parts.append(block)
        used += cost

    if chunks:
        lines = [f"- chunk `{c.id}`: {_clip(c.content, 1200)}" for c in chunks]
        _add_section("【记忆片段 chunks】", "\n".join(lines))

    if facts:
        lines = []
        for fact in facts:
            subj = (fact.subject or "").strip()
            pred = (fact.predicate or "").strip()
            # Join only the non-empty parts; str.strip(" ·") would treat its
            # argument as a character set and could also eat into the text.
            detail = " · ".join(piece for piece in (subj, pred) if piece)
            prefix = f"- fact `{fact.id}` ({fact.fact_type}):"
            lines.append(f"{prefix} {detail}" if detail else prefix)
        _add_section("【记忆事实 facts】", "\n".join(lines))

    if events:
        lines = []
        for e in events:
            lines.append(
                f"- timeline `{e.id}`: {e.title} ({e.event_year or e.event_date or ''})"
            )
            desc = _clip(e.description, 400)
            if desc:
                lines.append(f" {desc}")
        _add_section("【时间线 timeline】", "\n".join(lines))

    if summaries:
        lines = [
            f"- summary `{s.id}` ({s.summary_type}): {_clip(s.content, 2000)}"
            for s in summaries
        ]
        _add_section("【摘要 summaries】", "\n".join(lines))

    return "\n\n".join(parts).strip(), truncated, dropped
def evidence_summary_line(
    *,
    segment_n: int,
    conv_n: int,
    chunk_n: int,
    fact_n: int,
    tl_n: int,
    sum_n: int,
    notes: list[str],
) -> str:
    """Summarise evidence counts as a single ``key=value; ...`` line.

    Args:
        segment_n: Number of segments.
        conv_n: Number of conversations.
        chunk_n: Number of memory chunks.
        fact_n: Number of memory facts.
        tl_n: Number of timeline events.
        sum_n: Number of summaries.
        notes: Free-form notes; only the first three are included, and the
            ``notes=`` field is omitted entirely when the list is empty.

    Returns:
        The counts in a fixed order joined by ``"; "``, optionally followed
        by the notes field.
    """
    counts = (
        ("segments", segment_n),
        ("conversations", conv_n),
        ("chunks", chunk_n),
        ("facts", fact_n),
        ("timeline", tl_n),
        ("summaries", sum_n),
    )
    fields = [f"{label}={value}" for label, value in counts]
    if notes:
        fields.append("notes=" + "; ".join(notes[:3]))
    return "; ".join(fields)
def _format_bundle_for_judge(
    bundle: ChapterEvidenceBundle | StoryEvidenceBundle,
    *,
    transcript: str,
    chunks: list[MemoryChunk],
    facts: list[MemoryFact],
    events: list[TimelineEvent],
    summaries: list[MemorySummary],
) -> FormattedMemoirEvidence:
    """Shared implementation behind the chapter and story formatters.

    The transcript and the structured evidence are each limited by the same
    cap from ``_memoir_evidence_char_cap()``, applied independently. An
    over-long transcript is cut to the cap and a truncation note is appended
    after the cut, so the included transcript can exceed the cap by the
    length of that note.

    Args:
        bundle: Evidence bundle; only its id lists and ``notes`` are read,
            to build the summary line.
        transcript: Raw interview transcript text.
        chunks: Memory chunks to render as structured evidence.
        facts: Memory facts to render as structured evidence.
        events: Timeline events to render as structured evidence.
        summaries: Memory summaries to render as structured evidence.

    Returns:
        The formatted evidence, with metadata recording whether anything
        was truncated, which sections were dropped (sorted, de-duplicated),
        the included character counts and a token estimate.
    """
    ev_cap = _memoir_evidence_char_cap()
    dropped: list[str] = []
    truncated = False

    t_in = transcript.strip()
    if len(t_in) > ev_cap:
        truncated = True
        dropped.append("source_transcript_tail")
        t_in = t_in[:ev_cap] + "\n\n…(原始对话证据已截断)"

    struct, s_trunc, s_drop = build_structured_evidence_text(
        chunks=chunks,
        facts=facts,
        events=events,
        summaries=summaries,
        max_chars=ev_cap,
    )
    if s_trunc:
        truncated = True
        dropped.extend(s_drop)

    meta = EvidenceFormatMeta(
        truncated=truncated,
        dropped_sections=sorted(set(dropped)),
        included_token_estimate=_approx_tokens(len(t_in) + len(struct)),
        transcript_chars_included=len(t_in),
        structured_evidence_chars_included=len(struct),
    )
    summary = evidence_summary_line(
        segment_n=len(bundle.segment_ids),
        conv_n=len(bundle.conversation_ids),
        chunk_n=len(bundle.memory_chunk_ids),
        fact_n=len(bundle.memory_fact_ids),
        tl_n=len(bundle.timeline_event_ids),
        sum_n=len(bundle.summary_ids),
        notes=bundle.notes,
    )
    return FormattedMemoirEvidence(
        source_transcript=t_in,
        structured_evidence=struct,
        format_meta=meta,
        evidence_summary=summary,
    )


def format_chapter_for_judge(
    bundle: ChapterEvidenceBundle,
    *,
    transcript: str,
    chunks: list[MemoryChunk],
    facts: list[MemoryFact],
    events: list[TimelineEvent],
    summaries: list[MemorySummary],
) -> FormattedMemoirEvidence:
    """Format a chapter's evidence bundle into judge-readable evidence.

    Delegates to ``_format_bundle_for_judge``, which caps the transcript and
    the structured evidence and records what was truncated or dropped.
    """
    return _format_bundle_for_judge(
        bundle,
        transcript=transcript,
        chunks=chunks,
        facts=facts,
        events=events,
        summaries=summaries,
    )


def format_story_for_judge(
    bundle: StoryEvidenceBundle,
    *,
    transcript: str,
    chunks: list[MemoryChunk],
    facts: list[MemoryFact],
    events: list[TimelineEvent],
    summaries: list[MemorySummary],
) -> FormattedMemoirEvidence:
    """Format a story's evidence bundle into judge-readable evidence.

    Delegates to ``_format_bundle_for_judge``, which caps the transcript and
    the structured evidence and records what was truncated or dropped.
    """
    return _format_bundle_for_judge(
        bundle,
        transcript=transcript,
        chunks=chunks,
        facts=facts,
        events=events,
        summaries=summaries,
    )