Files
life-echo/api/app/agents/memoir/story_route_payload.py

231 lines
6.7 KiB
Python
Raw Normal View History

"""
Story routing: JSON payload of candidate stories (summary first, budget trimming, fixed sort order).
Shared by StoryRouteAgent and its unit tests.
"""
from __future__ import annotations
import json
from datetime import timezone
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
from app.core.config import Settings
from app.features.story.models import Story
def _linked_chapters(s: Story) -> list[str]:
    """Return a ``title(category)`` label for every chapter linked to the story.

    Reads ``s.chapter_links`` (treated as empty when missing or falsy). Links
    whose ``chapter`` attribute is missing or ``None`` are skipped, and a
    missing or falsy title/category is rendered as an empty string.
    """

    def _label(chapter: Any) -> str:
        category = getattr(chapter, "category", None) or ""
        title = getattr(chapter, "title", None) or ""
        return f"{title}({category})"

    chapter_links = getattr(s, "chapter_links", None) or []
    chapters = (getattr(link, "chapter", None) for link in chapter_links)
    return [_label(chapter) for chapter in chapters if chapter is not None]
def _updated_at_iso(s: Story) -> str:
    """Return the story's ``updated_at`` as an ISO-8601 string.

    Returns ``""`` when the attribute is missing or ``None``. A naive datetime
    is tagged as UTC before formatting; an aware datetime keeps its own offset.
    """
    stamp = getattr(s, "updated_at", None)
    if stamp is None:
        return ""
    aware = stamp if stamp.tzinfo is not None else stamp.replace(tzinfo=timezone.utc)
    return aware.isoformat()
def _has_usable_summary(s: Story, summary_min_len: int) -> bool:
    """Tell whether the story's whitespace-stripped summary has at least ``summary_min_len`` characters.

    A missing, ``None`` or empty ``summary`` counts as an empty string.
    """
    summary = getattr(s, "summary", None)
    trimmed = summary.strip() if summary else ""
    return summary_min_len <= len(trimmed)
def _truncate_body_for_route(
    md: str,
    *,
    body_max_chars: int,
    head_chars: int,
    tail_chars: int,
) -> str:
    """Trim one story body for the routing prompt: full text if it fits, otherwise head + tail.

    The text is stripped first; an empty result yields ``""``. When the stripped
    text is longer than ``body_max_chars``, the head and tail lengths are each
    capped at half the budget (and are at least 1), and the two pieces are
    joined by a marker line stating how many characters were omitted.

    Note: the marker line itself is not counted against ``body_max_chars``, so
    the head + marker + tail form can be longer than ``body_max_chars``.
    """
    text = (md or "").strip()
    if not text or len(text) <= body_max_chars:
        return text

    half_budget = body_max_chars // 2
    head_len = max(1, min(head_chars, half_budget))
    tail_len = max(1, min(tail_chars, half_budget))
    omitted = len(text) - (head_len + tail_len)
    if omitted > 0:
        return f"{text[:head_len]}\n…(中间省略 {omitted} 字)…\n{text[-tail_len:]}"
    # Head and tail would cover the whole text: fall back to a plain prefix cut.
    return text[:body_max_chars]
def sort_stories_for_route(
    stories: list[Story],
    story_meta: dict[str, dict[str, int]],
    *,
    summary_min_chars: int,
) -> list[Story]:
    """Return the stories in routing priority order (a new list).

    Sort keys, in order: has usable summary (desc) → updated_at (desc) →
    version_count (desc) → char_count (desc) → id as a string (asc).
    ``version_count`` and ``char_count`` come from ``story_meta[str(story.id)]``
    and default to 0; a missing ``updated_at`` sorts as timestamp 0.
    """

    def _epoch_seconds(story: Story) -> float:
        stamp = getattr(story, "updated_at", None)
        if stamp is None:
            return 0.0
        if stamp.tzinfo is None:
            # Naive datetimes are interpreted as UTC.
            stamp = stamp.replace(tzinfo=timezone.utc)
        return stamp.timestamp()

    def _rank(story: Story) -> tuple:
        story_id = str(story.id)
        counts = story_meta.get(story_id) or {}
        versions = int(counts.get("version_count", 0))
        chars = int(counts.get("char_count", 0))
        # Numeric keys are negated so that an ascending sort yields descending order.
        return (
            not _has_usable_summary(story, summary_min_chars),
            -_epoch_seconds(story),
            -versions,
            -chars,
            story_id,
        )

    ranked = list(stories)
    ranked.sort(key=_rank)
    return ranked
def _build_full_row(
    s: Story,
    story_meta: dict[str, dict[str, int]],
    *,
    summary_min_chars: int,
    body_max_chars: int,
    head_chars: int,
    tail_chars: int,
) -> dict[str, Any]:
    """Build the full candidate row for one story.

    The row always carries id, title, char_count, version_count, updated_at and
    linked_chapters. If the story has a usable summary, the stripped summary is
    added as ``summary``; otherwise the (possibly truncated) canonical markdown
    is added as ``body_for_route`` when it is non-empty.

    ``char_count`` falls back to the stripped canonical markdown length and
    ``version_count`` to 0 when ``story_meta`` has no value for the story.
    """
    counts = story_meta.get(str(s.id)) or {}
    canonical = (s.canonical_markdown or "").strip()

    row: dict[str, Any] = {
        "id": s.id,
        "title": s.title,
        "char_count": int(counts.get("char_count", len(canonical))),
        "version_count": int(counts.get("version_count", 0)),
        "updated_at": _updated_at_iso(s),
        "linked_chapters": _linked_chapters(s),
    }

    if not _has_usable_summary(s, summary_min_chars):
        excerpt = _truncate_body_for_route(
            canonical,
            body_max_chars=body_max_chars,
            head_chars=head_chars,
            tail_chars=tail_chars,
        )
        if excerpt:
            row["body_for_route"] = excerpt
        return row

    # Summary wins over the body: the body is not included at all in this case.
    row["summary"] = (getattr(s, "summary", None) or "").strip()
    return row
def _build_index_row(
    s: Story,
    story_meta: dict[str, dict[str, int]],
    *,
    preview_chars: int,
) -> dict[str, Any]:
    """Build the compact "index" row for one story: id, title, char_count and a short preview.

    The preview is the stripped canonical markdown with newlines replaced by
    spaces, cut to ``preview_chars`` characters. When the text had to be cut,
    a trailing ``…`` marks the truncation (the original expression appended an
    empty string in both branches, so truncated previews were never marked).

    Args:
        s: Story providing ``id``, ``title`` and ``canonical_markdown``.
        story_meta: Per-story metadata keyed by ``str(s.id)``; ``char_count`` is
            read from it and falls back to the stripped markdown length.
        preview_chars: Maximum number of body characters kept in the preview
            (the truncation marker is not counted). Negative values are
            treated as 0.

    Returns:
        A dict with the keys ``id``, ``title``, ``char_count`` and ``preview``.
    """
    meta = story_meta.get(str(s.id)) or {}
    stripped = (s.canonical_markdown or "").strip()
    flat = stripped.replace("\n", " ")

    # Clamp so that a negative setting cannot turn the slice into "drop the tail".
    limit = max(0, preview_chars)
    preview = flat[:limit]
    if len(flat) > limit:
        preview += "…"

    return {
        "id": s.id,
        "title": s.title,
        "char_count": int(meta.get("char_count", len(stripped))),
        "preview": preview,
    }
def _rows_json_len(rows: list[dict[str, Any]]) -> int:
    """Return the character length of ``rows`` serialized as JSON.

    Serialization uses the default separators, no indentation, and keeps
    non-ASCII characters unescaped.
    """
    serialized = json.dumps(rows, ensure_ascii=False)
    return len(serialized)
def apply_total_budget_downgrade(
    rows: list[dict[str, Any]],
    *,
    stories_by_id: dict[str, Story],
    story_meta: dict[str, dict[str, int]],
    total_max_chars: int,
    index_preview_chars: int,
) -> list[dict[str, Any]]:
    """Downgrade full rows to index rows, from the tail (lowest priority) upward, until the JSON fits.

    Works on shallow copies of ``rows``; the input list is not modified. Walking
    from the last row to the first, each row that is not already an index row
    and whose story is found in ``stories_by_id`` is replaced by its index row,
    stopping as soon as the serialized length is at most ``total_max_chars``.
    If every eligible row has been downgraded and the budget is still exceeded,
    the rows are returned as they are.

    NOTE(review): the budget is measured on the compact serialization from
    ``_rows_json_len``, while ``build_route_candidate_json`` emits the rows with
    ``indent=2``, so the final string can be longer than ``total_max_chars``.
    """
    result = [dict(row) for row in rows]

    def _already_index(row: dict[str, Any]) -> bool:
        return "preview" in row and not ("summary" in row or "body_for_route" in row)

    over_budget = _rows_json_len(result) > total_max_chars
    for pos in reversed(range(len(result))):
        if not over_budget:
            break
        current = result[pos]
        story = stories_by_id.get(str(current.get("id", "")))
        if story is None or _already_index(current):
            continue
        result[pos] = _build_index_row(
            story,
            story_meta,
            preview_chars=index_preview_chars,
        )
        # Re-measure only after a replacement; skipped rows cannot change the length.
        over_budget = _rows_json_len(result) > total_max_chars
    return result
def build_route_candidate_rows(
    stories: list[Story],
    story_meta: dict[str, dict[str, int]] | None,
    settings: "Settings",
) -> list[dict[str, Any]]:
    """Sort the stories, build one full candidate row per story, then apply the total-budget downgrade.

    All limits are read from ``settings`` (``story_route_*`` attributes) and
    coerced with ``int``. ``story_meta`` may be ``None``, which is treated as
    an empty mapping.
    """
    meta = story_meta or {}
    summary_min = int(settings.story_route_summary_min_chars)
    ordered = sort_stories_for_route(stories, meta, summary_min_chars=summary_min)

    row_limits = {
        "summary_min_chars": summary_min,
        "body_max_chars": int(settings.story_route_candidate_body_max_chars),
        "head_chars": int(settings.story_route_long_body_head_chars),
        "tail_chars": int(settings.story_route_long_body_tail_chars),
    }
    full_rows = [_build_full_row(story, meta, **row_limits) for story in ordered]

    # Lookup used by the downgrade step to rebuild a row from its story.
    stories_by_id = {str(story.id): story for story in ordered}
    return apply_total_budget_downgrade(
        full_rows,
        stories_by_id=stories_by_id,
        story_meta=meta,
        total_max_chars=int(settings.story_route_candidate_total_max_chars),
        index_preview_chars=int(settings.story_route_index_preview_chars),
    )
def build_route_candidate_json(
    stories: list[Story],
    story_meta: dict[str, dict[str, int]] | None,
    settings: "Settings",
) -> str:
    """Return the candidate rows from ``build_route_candidate_rows`` as a JSON string.

    The output is indented by two spaces and keeps non-ASCII characters unescaped.
    """
    return json.dumps(
        build_route_candidate_rows(stories, story_meta, settings),
        ensure_ascii=False,
        indent=2,
    )