Files
life-echo/api/app/agents/memoir/story_route_payload.py
Kevin 6772e1269c feat(evaluation): memoir readiness, judge/replay updates, eval web playground
Add memoir_readiness_service and router tests; extend judge schemas/services, replay_service, and conversation rubric; align story route agent, payload, prompts, and story_pipeline_sync; update agent logging, config, and DI. Document internal-eval; add replayDraft util and PlaygroundPage changes in app-eval-web.
2026-04-08 09:43:34 +08:00

252 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Story routing: candidate-story JSON payload (summary first, budget trimming, fixed ordering).
Shared by StoryRouteAgent and the unit tests.
"""
from __future__ import annotations
import json
import re
from datetime import timezone
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
from app.core.config import Settings
from app.features.story.models import Story
# Markdown code/emphasis/heading markers removed when building plain snippets.
_PLAIN_SNIPPET_NOISE = re.compile(r"[`*_#]+")


def _plain_opening_snippet_from_markdown(md: str, *, max_chars: int) -> str:
    """Return a short plain-text excerpt of *md* for routing when no summary exists.

    Markdown images and ``asset://`` references are removed, links are reduced
    to their label, code/emphasis/heading markers are stripped and whitespace
    runs are collapsed to single spaces.

    Args:
        md: Markdown source; ``None`` or blank input yields ``""``.
        max_chars: Upper bound on the length of the returned snippet. Values
            below 1 yield ``""``.

    Returns:
        The cleaned text, cut to at most ``max_chars`` characters. A cut
        snippet ends with ``…`` so it can be told apart from a complete one.
    """
    text = (md or "").strip()
    # A non-positive limit would otherwise turn into a negative slice index
    # and return almost the entire text.
    if not text or max_chars <= 0:
        return ""
    text = re.sub(r"!\[[^\]]*\]\([^)]+\)", "", text)  # images: drop entirely
    text = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", text)  # links: keep the label
    text = re.sub(r"asset://\S+", "", text)
    text = _PLAIN_SNIPPET_NOISE.sub("", text)
    text = re.sub(r"\s+", " ", text).strip()
    if len(text) <= max_chars:
        return text
    # One character is reserved for the ellipsis so the result never exceeds
    # max_chars.
    return text[: max_chars - 1] + "…"
def _linked_chapters(s: Story) -> list[str]:
    """Return one ``"title(category)"`` label per chapter linked to the story.

    Links whose ``chapter`` is missing are skipped; a missing title or
    category is rendered as an empty string. A story without
    ``chapter_links`` yields an empty list.
    """

    def _label(chapter: Any) -> str:
        category = getattr(chapter, "category", None) or ""
        title = getattr(chapter, "title", None) or ""
        return f"{title}({category})"

    linked = (
        getattr(link, "chapter", None)
        for link in getattr(s, "chapter_links", None) or []
    )
    return [_label(chapter) for chapter in linked if chapter is not None]
def _updated_at_iso(s: Story) -> str:
    """Return the story's ``updated_at`` as an ISO-8601 string.

    A missing timestamp yields ``""``. A naive datetime is labelled as UTC
    before formatting; an aware datetime keeps its own offset.
    """
    stamp = getattr(s, "updated_at", None)
    if stamp is None:
        return ""
    aware = stamp if stamp.tzinfo is not None else stamp.replace(tzinfo=timezone.utc)
    return aware.isoformat()
def _has_usable_summary(s: Story, summary_min_len: int) -> bool:
    """Tell whether the story's stripped ``summary`` has at least ``summary_min_len`` characters.

    A missing or ``None`` summary counts as an empty string.
    """
    raw = getattr(s, "summary", None)
    text = raw.strip() if raw else ""
    return summary_min_len <= len(text)
def _truncate_body_for_route(
    md: str,
    *,
    body_max_chars: int,
    head_chars: int,
    tail_chars: int,
) -> str:
    """Trim one story body for the routing prompt: whole text if it fits, else head + tail.

    Args:
        md: Markdown body; ``None`` or blank input yields ``""``.
        body_max_chars: Bodies up to this length (after stripping) are
            returned unchanged.
        head_chars: Requested number of leading characters to keep.
        tail_chars: Requested number of trailing characters to keep.

    Returns:
        The stripped body, or the kept head and tail joined by an omission
        marker line stating how many characters were left out. Head and tail
        are each clamped to between 1 and ``body_max_chars // 2`` characters.
        The marker itself is not counted against ``body_max_chars``.
    """
    text = (md or "").strip()
    if not text or len(text) <= body_max_chars:
        return text

    half_budget = body_max_chars // 2
    head_len = max(1, min(head_chars, half_budget))
    tail_len = max(1, min(tail_chars, half_budget))
    omitted = len(text) - (head_len + tail_len)
    if omitted > 0:
        head, tail = text[:head_len], text[-tail_len:]
        return f"{head}\n…(中间省略 {omitted} 字)…\n{tail}"
    # Head and tail already cover the text (only possible for tiny budgets):
    # fall back to a plain prefix cut.
    return text[:body_max_chars]
def sort_stories_for_route(
    stories: list[Story],
    story_meta: dict[str, dict[str, int]],
    *,
    summary_min_chars: int,
) -> list[Story]:
    """Return the stories in routing priority order (a new list; the input is not mutated).

    Ordering, most significant first: stories with a usable summary, then most
    recently updated, then higher ``version_count``, then higher
    ``char_count``, then ascending ``str(id)``.

    Args:
        stories: Stories to order.
        story_meta: Per-story counters keyed by ``str(story.id)``; missing
            entries or counters count as 0.
        summary_min_chars: Minimum stripped summary length for a summary to
            count as usable.
    """

    def _epoch_seconds(story: Story) -> float:
        # Missing timestamps rank as 0.0; naive datetimes are read as UTC.
        stamp = getattr(story, "updated_at", None)
        if stamp is None:
            return 0.0
        if stamp.tzinfo is None:
            stamp = stamp.replace(tzinfo=timezone.utc)
        return stamp.timestamp()

    def _rank(story: Story) -> tuple:
        story_id = str(story.id)
        counters = story_meta.get(story_id) or {}
        versions = int(counters.get("version_count", 0))
        chars = int(counters.get("char_count", 0))
        updated = _epoch_seconds(story)
        lacks_summary = not _has_usable_summary(story, summary_min_chars)
        # Values are negated so that an ascending sort yields descending order.
        return (lacks_summary, -updated, -versions, -chars, story_id)

    return sorted(stories, key=_rank)
def _build_full_row(
    s: Story,
    story_meta: dict[str, dict[str, int]],
    *,
    summary_min_chars: int,
    body_max_chars: int,
    head_chars: int,
    tail_chars: int,
) -> dict[str, Any]:
    """Build the full candidate row for one story.

    Every row carries ``id``, ``title``, ``char_count``, ``version_count``,
    ``updated_at`` and ``linked_chapters``. A story with a usable summary adds
    only ``summary``. Otherwise the row adds ``body_for_route`` (the trimmed
    body, when non-empty) and ``opening_snippet`` (a plain-text excerpt of up
    to 260 characters, included only when it is at least 40 characters long).

    Args:
        s: Story to describe.
        story_meta: Per-story counters keyed by ``str(s.id)``. A missing
            ``char_count`` falls back to the stripped markdown length and a
            missing ``version_count`` to 0.
        summary_min_chars: Minimum stripped summary length to count as usable.
        body_max_chars: Passed to the body trimming helper.
        head_chars: Passed to the body trimming helper.
        tail_chars: Passed to the body trimming helper.
    """
    story_id = str(s.id)
    counters = story_meta.get(story_id) or {}
    markdown = (s.canonical_markdown or "").strip()
    chars = int(counters.get("char_count", len(markdown)))
    versions = int(counters.get("version_count", 0))

    row: dict[str, Any] = {
        "id": s.id,
        "title": s.title,
        "char_count": chars,
        "version_count": versions,
        "updated_at": _updated_at_iso(s),
        "linked_chapters": _linked_chapters(s),
    }

    if _has_usable_summary(s, summary_min_chars):
        row["summary"] = (getattr(s, "summary", None) or "").strip()
    else:
        excerpt = _truncate_body_for_route(
            markdown,
            body_max_chars=body_max_chars,
            head_chars=head_chars,
            tail_chars=tail_chars,
        )
        if excerpt:
            row["body_for_route"] = excerpt
        snippet = _plain_opening_snippet_from_markdown(markdown, max_chars=260)
        # Very short snippets are left out of the row.
        if len(snippet) >= 40:
            row["opening_snippet"] = snippet
    return row
def _build_index_row(
    s: Story,
    story_meta: dict[str, dict[str, int]],
    *,
    preview_chars: int,
) -> dict[str, Any]:
    """Build the compact index row used when a story is downgraded for budget.

    Args:
        s: Story to describe.
        story_meta: Per-story counters keyed by ``str(s.id)``. A missing
            ``char_count`` falls back to the stripped markdown length.
        preview_chars: Number of body characters kept in ``preview``;
            negative values are treated as 0.

    Returns:
        A dict with ``id``, ``title``, ``char_count`` and ``preview``. The
        preview is the stripped markdown with newlines replaced by spaces,
        cut to ``preview_chars`` characters; a cut preview gets a trailing
        ``…`` (one extra character) so it can be told apart from a complete
        body.
    """
    story_id = str(s.id)
    counters = story_meta.get(story_id) or {}
    stripped = (s.canonical_markdown or "").strip()
    flat = stripped.replace("\n", " ")

    # Clamp so a negative limit cannot become a "drop the last N chars" slice.
    limit = max(preview_chars, 0)
    preview = flat[:limit]
    if len(flat) > limit:
        preview += "…"

    return {
        "id": s.id,
        "title": s.title,
        "char_count": int(counters.get("char_count", len(stripped))),
        "preview": preview,
    }
def _rows_json_len(rows: list[dict[str, Any]]) -> int:
    """Return the character length of *rows* serialized as compact, non-ASCII-escaped JSON.

    NOTE(review): this measures the default (unindented) serialization, while
    ``build_route_candidate_json`` emits ``indent=2`` — confirm the budget is
    meant to apply to the compact form.
    """
    serialized = json.dumps(rows, ensure_ascii=False)
    return len(serialized)
def apply_total_budget_downgrade(
    rows: list[dict[str, Any]],
    *,
    stories_by_id: dict[str, Story],
    story_meta: dict[str, dict[str, int]],
    total_max_chars: int,
    index_preview_chars: int,
) -> list[dict[str, Any]]:
    """Downgrade rows to index rows, starting from the tail, until the JSON fits the budget.

    Works on shallow copies of *rows*; the input list and its dicts are not
    modified. While the serialized length exceeds ``total_max_chars``, the
    last row that is not yet an index row and whose story is known is
    replaced by its index row. The loop stops when the budget is met or
    nothing is left to downgrade, so the result may still exceed the budget.

    Args:
        rows: Candidate rows in priority order (lowest priority last).
        stories_by_id: Stories keyed by ``str(id)``; rows whose id is not
            present are never downgraded.
        story_meta: Per-story counters forwarded to the index-row builder.
        total_max_chars: Budget for the serialized length of all rows.
        index_preview_chars: Preview length forwarded to the index-row builder.
    """
    result = [dict(row) for row in rows]

    def _is_index_row(row: dict[str, Any]) -> bool:
        if "preview" not in row:
            return False
        return "summary" not in row and "body_for_route" not in row

    def _last_downgradable() -> tuple[int, Story] | None:
        # Scan from the tail so the lowest-priority full row is found first.
        for pos in reversed(range(len(result))):
            row = result[pos]
            story = stories_by_id.get(str(row.get("id", "")))
            if story is not None and not _is_index_row(row):
                return pos, story
        return None

    while _rows_json_len(result) > total_max_chars:
        target = _last_downgradable()
        if target is None:
            break
        pos, story = target
        result[pos] = _build_index_row(
            story,
            story_meta,
            preview_chars=index_preview_chars,
        )
    return result
def build_route_candidate_rows(
    stories: list[Story],
    story_meta: dict[str, dict[str, int]] | None,
    settings: "Settings",
) -> list[dict[str, Any]]:
    """Sort the stories, build a full candidate row for each, then apply the total budget.

    Args:
        stories: Candidate stories.
        story_meta: Per-story counters keyed by ``str(story.id)``; ``None`` is
            treated as an empty mapping.
        settings: Source of the ``story_route_*`` limits read here (summary
            minimum, body maximum, head/tail sizes, total budget and index
            preview size); each is coerced with ``int``.

    Returns:
        Rows in routing priority order, with tail rows downgraded to index
        rows as needed by the total budget.
    """
    meta = story_meta or {}
    summary_min = int(settings.story_route_summary_min_chars)
    ranked = sort_stories_for_route(stories, meta, summary_min_chars=summary_min)

    row_options = {
        "summary_min_chars": summary_min,
        "body_max_chars": int(settings.story_route_candidate_body_max_chars),
        "head_chars": int(settings.story_route_long_body_head_chars),
        "tail_chars": int(settings.story_route_long_body_tail_chars),
    }
    full_rows = [_build_full_row(story, meta, **row_options) for story in ranked]

    return apply_total_budget_downgrade(
        full_rows,
        stories_by_id={str(story.id): story for story in ranked},
        story_meta=meta,
        total_max_chars=int(settings.story_route_candidate_total_max_chars),
        index_preview_chars=int(settings.story_route_index_preview_chars),
    )
def build_route_candidate_json(
    stories: list[Story],
    story_meta: dict[str, dict[str, int]] | None,
    settings: "Settings",
) -> str:
    """Return the candidate rows as a JSON string (2-space indent, non-ASCII kept as-is).

    The rows come from ``build_route_candidate_rows`` with the same arguments.
    """
    candidate_rows = build_route_candidate_rows(stories, story_meta, settings)
    return json.dumps(candidate_rows, indent=2, ensure_ascii=False)