Files
life-echo/api/app/agents/memoir/story_route_payload.py
Sully 53e0065e3e refactor(api): TOML 配置 SSOT、统一错误契约、Auth/事务加固与可观测性 (#33)
配置 SSOT(TOML + .env)
统一错误契约
Auth 与事务边界
Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client
可观测性(OpenTelemetry + LGTM)
2026-05-22 13:44:50 +08:00

253 lines
7.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Story 路由:候选故事 JSON 载荷(summary 优先、预算裁剪、固定排序)。
供 StoryRouteAgent 与单测复用。
"""
from __future__ import annotations
import json
import re
from datetime import timezone
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from app.core.config import Settings
from app.features.story.models import Story
from app.features.story.constants import story
_PLAIN_SNIPPET_NOISE = re.compile(r"[`*_#]+")
def _plain_opening_snippet_from_markdown(md: str, *, max_chars: int) -> str:
"""无 summary 时供路由辨题的短文摘(弱化 Markdown 噪声)。"""
t = (md or "").strip()
if not t:
return ""
t = re.sub(r"!\[[^\]]*\]\([^)]+\)", "", t)
t = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", t)
t = re.sub(r"asset://\S+", "", t)
t = _PLAIN_SNIPPET_NOISE.sub("", t)
t = re.sub(r"\s+", " ", t).strip()
if len(t) <= max_chars:
return t
return t[: max_chars - 1] + ""
def _linked_chapters(s: Story) -> list[str]:
links: list[str] = []
for cl in getattr(s, "chapter_links", None) or []:
ch = getattr(cl, "chapter", None)
if ch is None:
continue
cat = getattr(ch, "category", None) or ""
tit = getattr(ch, "title", None) or ""
links.append(f"{tit}({cat})")
return links
def _updated_at_iso(s: Story) -> str:
ua = getattr(s, "updated_at", None)
if ua is None:
return ""
if ua.tzinfo is None:
ua = ua.replace(tzinfo=timezone.utc)
return ua.isoformat()
def _has_usable_summary(s: Story, summary_min_len: int) -> bool:
t = (getattr(s, "summary", None) or "").strip()
return len(t) >= summary_min_len
def _truncate_body_for_route(
md: str,
*,
body_max_chars: int,
head_chars: int,
tail_chars: int,
) -> str:
"""单篇正文进入路由 prompt 的裁剪:尽量全文,否则 head+tail。"""
m = (md or "").strip()
if not m:
return ""
if len(m) <= body_max_chars:
return m
hc = max(1, min(head_chars, body_max_chars // 2))
tc = max(1, min(tail_chars, body_max_chars // 2))
mid_omit = len(m) - hc - tc
if mid_omit <= 0:
return m[:body_max_chars]
return f"{m[:hc]}\n…(中间省略 {mid_omit} 字)…\n{m[-tc:]}"
def sort_stories_for_route(
    stories: list[Story],
    story_meta: dict[str, dict[str, int]],
    *,
    summary_min_chars: int,
) -> list[Story]:
    """Return *stories* in routing priority order (a new list).

    Ordering, most significant first: stories with a usable summary, newer
    ``updated_at``, higher ``version_count``, higher ``char_count``, then
    ascending ``str(id)`` as the final tie-breaker.

    Args:
        stories: Stories to order; the input list is not modified.
        story_meta: Per-story counters keyed by ``str(story.id)``; missing
            entries or counters count as 0.
        summary_min_chars: Minimum stripped summary length for a summary to
            count as usable.
    """

    def _epoch_seconds(item: Story) -> float:
        # A missing timestamp sorts as the epoch; naive values are read as UTC.
        stamp = getattr(item, "updated_at", None)
        if stamp is None:
            return 0.0
        if stamp.tzinfo is None:
            stamp = stamp.replace(tzinfo=timezone.utc)
        return stamp.timestamp()

    def _rank(item: Story) -> tuple:
        story_id = str(item.id)
        counters = story_meta.get(story_id) or {}
        versions = int(counters.get("version_count", 0))
        chars = int(counters.get("char_count", 0))
        newest_first = -_epoch_seconds(item)
        # False sorts before True, so "has a summary" must map to False.
        lacks_summary = not _has_usable_summary(item, summary_min_chars)
        return (lacks_summary, newest_first, -versions, -chars, story_id)

    return sorted(stories, key=_rank)
def _build_full_row(
    s: Story,
    story_meta: dict[str, dict[str, int]],
    *,
    summary_min_chars: int,
    body_max_chars: int,
    head_chars: int,
    tail_chars: int,
) -> dict[str, Any]:
    """Build the full candidate row for one story.

    Every row carries ``id``, ``title``, ``char_count``, ``version_count``,
    ``updated_at`` and ``linked_chapters``. A story with a usable summary
    additionally gets ``summary`` and nothing else. Otherwise the row gets
    ``body_for_route`` (when the trimmed body is non-empty) and
    ``opening_snippet`` (when the cleaned snippet is at least 40 characters).

    Args:
        s: Story to describe.
        story_meta: Per-story counters keyed by ``str(s.id)``. A missing
            ``char_count`` falls back to the stripped markdown length, and a
            missing ``version_count`` to 0.
        summary_min_chars: Minimum stripped summary length to prefer the
            summary over the body.
        body_max_chars: Passed through to the body trimming helper.
        head_chars: Passed through to the body trimming helper.
        tail_chars: Passed through to the body trimming helper.
    """
    counters = story_meta.get(str(s.id)) or {}
    markdown = (s.canonical_markdown or "").strip()
    # NOTE(review): ``id`` is emitted as-is; presumably it is JSON-serializable
    # because the rows are later passed to json.dumps. Confirm against the model.
    row: dict[str, Any] = {
        "id": s.id,
        "title": s.title,
        "char_count": int(counters.get("char_count", len(markdown))),
        "version_count": int(counters.get("version_count", 0)),
        "updated_at": _updated_at_iso(s),
        "linked_chapters": _linked_chapters(s),
    }

    if _has_usable_summary(s, summary_min_chars):
        row["summary"] = (getattr(s, "summary", None) or "").strip()
    else:
        trimmed_body = _truncate_body_for_route(
            markdown,
            body_max_chars=body_max_chars,
            head_chars=head_chars,
            tail_chars=tail_chars,
        )
        if trimmed_body:
            row["body_for_route"] = trimmed_body
        snippet = _plain_opening_snippet_from_markdown(markdown, max_chars=260)
        # Very short snippets are left out of the row.
        if len(snippet) >= 40:
            row["opening_snippet"] = snippet
    return row
def _build_index_row(
s: Story,
story_meta: dict[str, dict[str, int]],
*,
preview_chars: int,
) -> dict[str, Any]:
sid = str(s.id)
meta = story_meta.get(sid) or {}
canon = (s.canonical_markdown or "").strip().replace("\n", " ")
preview = canon[:preview_chars] + ("" if len(canon) > preview_chars else "")
char_count = int(meta.get("char_count", len((s.canonical_markdown or "").strip())))
return {
"id": s.id,
"title": s.title,
"char_count": char_count,
"preview": preview,
}
def _rows_json_len(rows: list[dict[str, Any]]) -> int:
return len(json.dumps(rows, ensure_ascii=False))
def apply_total_budget_downgrade(
    rows: list[dict[str, Any]],
    *,
    stories_by_id: dict[str, Story],
    story_meta: dict[str, dict[str, int]],
    total_max_chars: int,
    index_preview_chars: int,
) -> list[dict[str, Any]]:
    """Downgrade rows to index rows, tail first, until the JSON fits the budget.

    Rows are shallow-copied; the input list and its dicts are not modified.
    Rows whose ``id`` has no entry in *stories_by_id*, and rows that already
    are index rows, are left as they are. If the budget still cannot be met
    after every eligible row was downgraded, the result is returned anyway.

    Args:
        rows: Candidate rows in priority order (lowest priority last).
        stories_by_id: Source stories keyed by ``str(id)``.
        story_meta: Per-story counters forwarded to the index-row builder.
        total_max_chars: Maximum allowed length of the serialized rows.
        index_preview_chars: Preview length used for downgraded rows.
    """
    trimmed = [dict(row) for row in rows]

    # One pass from the tail is enough: a downgraded row is an index row and
    # would be skipped on any rescan, so the next candidate is always further
    # towards the head.
    for pos in reversed(range(len(trimmed))):
        row = trimmed[pos]
        source = stories_by_id.get(str(row.get("id", "")))
        already_index = "preview" in row and not (
            "summary" in row or "body_for_route" in row
        )
        if source is None or already_index:
            continue
        if _rows_json_len(trimmed) <= total_max_chars:
            break
        trimmed[pos] = _build_index_row(
            source,
            story_meta,
            preview_chars=index_preview_chars,
        )
    return trimmed
def build_route_candidate_rows(
    stories: list[Story],
    story_meta: dict[str, dict[str, int]] | None,
    settings: "Settings",
) -> list[dict[str, Any]]:
    """Sort the stories, build full candidate rows, then apply the total budget.

    All limits are read from the module-level ``story`` constants object.
    ``settings`` is accepted but not read by this function.

    Args:
        stories: Candidate stories in any order.
        story_meta: Per-story counters keyed by ``str(id)``; ``None`` is
            treated as an empty mapping.
        settings: Unused here.

    Returns:
        Rows in routing priority order, with tail rows downgraded to index
        rows where needed to respect ``story.route_candidate_total_max_chars``.
    """
    counters_by_id = story_meta or {}
    min_summary = int(story.route_summary_min_chars)
    ranked = sort_stories_for_route(
        stories, counters_by_id, summary_min_chars=min_summary
    )

    full_row_limits = {
        "summary_min_chars": min_summary,
        "body_max_chars": int(story.route_candidate_body_max_chars),
        "head_chars": int(story.route_long_body_head_chars),
        "tail_chars": int(story.route_long_body_tail_chars),
    }
    full_rows: list[dict[str, Any]] = [
        _build_full_row(item, counters_by_id, **full_row_limits) for item in ranked
    ]

    return apply_total_budget_downgrade(
        full_rows,
        stories_by_id={str(item.id): item for item in ranked},
        story_meta=counters_by_id,
        total_max_chars=int(story.route_candidate_total_max_chars),
        index_preview_chars=int(story.route_index_preview_chars),
    )
def build_route_candidate_json(
    stories: list[Story],
    story_meta: dict[str, dict[str, int]] | None,
    settings: "Settings",
) -> str:
    """Serialize the candidate rows for *stories* as pretty-printed JSON.

    The rows come from ``build_route_candidate_rows``. The output uses a
    2-space indent and keeps non-ASCII text unescaped.
    """
    return json.dumps(
        build_route_candidate_rows(stories, story_meta, settings),
        ensure_ascii=False,
        indent=2,
    )