配置 SSOT(TOML + .env) 统一错误契约 Auth 与事务边界 Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client 可观测性(OpenTelemetry + LGTM)
253 lines
7.4 KiB
Python
253 lines
7.4 KiB
Python
"""
|
||
Story 路由:候选故事 JSON 载荷(summary 优先、预算裁剪、固定排序)。
|
||
|
||
供 StoryRouteAgent 与单测复用。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import re
|
||
from datetime import timezone
|
||
from typing import TYPE_CHECKING, Any
|
||
|
||
if TYPE_CHECKING:
|
||
from app.core.config import Settings
|
||
|
||
from app.features.story.models import Story
|
||
from app.features.story.constants import story
|
||
|
||
_PLAIN_SNIPPET_NOISE = re.compile(r"[`*_#]+")
|
||
|
||
|
||
def _plain_opening_snippet_from_markdown(md: str, *, max_chars: int) -> str:
|
||
"""无 summary 时供路由辨题的短文摘(弱化 Markdown 噪声)。"""
|
||
t = (md or "").strip()
|
||
if not t:
|
||
return ""
|
||
t = re.sub(r"!\[[^\]]*\]\([^)]+\)", "", t)
|
||
t = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", t)
|
||
t = re.sub(r"asset://\S+", "", t)
|
||
t = _PLAIN_SNIPPET_NOISE.sub("", t)
|
||
t = re.sub(r"\s+", " ", t).strip()
|
||
if len(t) <= max_chars:
|
||
return t
|
||
return t[: max_chars - 1] + "…"
|
||
|
||
|
||
def _linked_chapters(s: Story) -> list[str]:
|
||
links: list[str] = []
|
||
for cl in getattr(s, "chapter_links", None) or []:
|
||
ch = getattr(cl, "chapter", None)
|
||
if ch is None:
|
||
continue
|
||
cat = getattr(ch, "category", None) or ""
|
||
tit = getattr(ch, "title", None) or ""
|
||
links.append(f"{tit}({cat})")
|
||
return links
|
||
|
||
|
||
def _updated_at_iso(s: Story) -> str:
|
||
ua = getattr(s, "updated_at", None)
|
||
if ua is None:
|
||
return ""
|
||
if ua.tzinfo is None:
|
||
ua = ua.replace(tzinfo=timezone.utc)
|
||
return ua.isoformat()
|
||
|
||
|
||
def _has_usable_summary(s: Story, summary_min_len: int) -> bool:
|
||
t = (getattr(s, "summary", None) or "").strip()
|
||
return len(t) >= summary_min_len
|
||
|
||
|
||
def _truncate_body_for_route(
|
||
md: str,
|
||
*,
|
||
body_max_chars: int,
|
||
head_chars: int,
|
||
tail_chars: int,
|
||
) -> str:
|
||
"""单篇正文进入路由 prompt 的裁剪:尽量全文,否则 head+tail。"""
|
||
m = (md or "").strip()
|
||
if not m:
|
||
return ""
|
||
if len(m) <= body_max_chars:
|
||
return m
|
||
hc = max(1, min(head_chars, body_max_chars // 2))
|
||
tc = max(1, min(tail_chars, body_max_chars // 2))
|
||
mid_omit = len(m) - hc - tc
|
||
if mid_omit <= 0:
|
||
return m[:body_max_chars]
|
||
return f"{m[:hc]}\n…(中间省略 {mid_omit} 字)…\n{m[-tc:]}"
|
||
|
||
|
||
def sort_stories_for_route(
    stories: list[Story],
    story_meta: dict[str, dict[str, int]],
    *,
    summary_min_chars: int,
) -> list[Story]:
    """Return *stories* as a new list in routing priority order.

    Order: stories with a usable summary first, then newer ``updated_at``,
    then higher ``version_count``, then higher ``char_count``, then
    ``str(id)`` ascending. Counts come from ``story_meta[str(id)]`` and
    default to 0. A missing ``updated_at`` ranks as timestamp 0 and a naive
    datetime is treated as UTC.
    """

    def _rank(item: Story) -> tuple:
        story_id = str(item.id)
        stats = story_meta.get(story_id) or {}
        versions = int(stats.get("version_count", 0))
        chars = int(stats.get("char_count", 0))

        stamp = getattr(item, "updated_at", None)
        if stamp is None:
            epoch = 0.0
        else:
            if stamp.tzinfo is None:
                stamp = stamp.replace(tzinfo=timezone.utc)
            epoch = stamp.timestamp()

        # False sorts before True, so stories that have a summary come first;
        # the numeric fields are negated to get descending order.
        lacks_summary = not _has_usable_summary(item, summary_min_chars)
        return (lacks_summary, -epoch, -versions, -chars, story_id)

    ordered = list(stories)
    ordered.sort(key=_rank)
    return ordered
|
||
|
||
|
||
def _build_full_row(
    s: Story,
    story_meta: dict[str, dict[str, int]],
    *,
    summary_min_chars: int,
    body_max_chars: int,
    head_chars: int,
    tail_chars: int,
) -> dict[str, Any]:
    """Build the full candidate row for one story.

    Every row carries ``id``, ``title``, ``char_count``, ``version_count``,
    ``updated_at`` and ``linked_chapters``. When the story has a usable
    summary the row additionally carries only ``summary``. Otherwise it
    carries ``body_for_route`` (the trimmed body, when non-empty) and
    ``opening_snippet`` (a plain-text excerpt of up to 260 characters, kept
    only when it has at least 40 characters).

    ``char_count`` falls back to the length of the stripped body and
    ``version_count`` to 0 when ``story_meta`` has no entry for the story.
    """
    story_id = str(s.id)
    stats = story_meta.get(story_id) or {}
    markdown = (s.canonical_markdown or "").strip()
    chars = int(stats.get("char_count", len(markdown)))
    versions = int(stats.get("version_count", 0))

    row: dict[str, Any] = {
        "id": s.id,
        "title": s.title,
        "char_count": chars,
        "version_count": versions,
        "updated_at": _updated_at_iso(s),
        "linked_chapters": _linked_chapters(s),
    }

    # Summary wins: no body text is attached when a usable summary exists.
    if _has_usable_summary(s, summary_min_chars):
        row["summary"] = (getattr(s, "summary", None) or "").strip()
        return row

    trimmed_body = _truncate_body_for_route(
        markdown,
        body_max_chars=body_max_chars,
        head_chars=head_chars,
        tail_chars=tail_chars,
    )
    if trimmed_body:
        row["body_for_route"] = trimmed_body

    opening = _plain_opening_snippet_from_markdown(markdown, max_chars=260)
    if len(opening) >= 40:
        row["opening_snippet"] = opening
    return row
|
||
|
||
|
||
def _build_index_row(
|
||
s: Story,
|
||
story_meta: dict[str, dict[str, int]],
|
||
*,
|
||
preview_chars: int,
|
||
) -> dict[str, Any]:
|
||
sid = str(s.id)
|
||
meta = story_meta.get(sid) or {}
|
||
canon = (s.canonical_markdown or "").strip().replace("\n", " ")
|
||
preview = canon[:preview_chars] + ("…" if len(canon) > preview_chars else "")
|
||
char_count = int(meta.get("char_count", len((s.canonical_markdown or "").strip())))
|
||
return {
|
||
"id": s.id,
|
||
"title": s.title,
|
||
"char_count": char_count,
|
||
"preview": preview,
|
||
}
|
||
|
||
|
||
def _rows_json_len(rows: list[dict[str, Any]]) -> int:
|
||
return len(json.dumps(rows, ensure_ascii=False))
|
||
|
||
|
||
def apply_total_budget_downgrade(
    rows: list[dict[str, Any]],
    *,
    stories_by_id: dict[str, Story],
    story_meta: dict[str, dict[str, int]],
    total_max_chars: int,
    index_preview_chars: int,
) -> list[dict[str, Any]]:
    """Downgrade rows to index rows, lowest priority first, until the JSON fits the budget.

    Starting from the tail of the list, each row that is not already an index
    row and whose story is present in *stories_by_id* is replaced by its
    compact index row. This repeats until the serialized length is at most
    *total_max_chars* or nothing is left to downgrade, so the result may
    still exceed the budget.

    Args:
        rows: Candidate rows in priority order; the input list and its dicts
            are not modified.
        stories_by_id: Stories keyed by ``str(id)``, used to rebuild a row.
        story_meta: Per-story metadata passed through to the index-row builder.
        total_max_chars: Budget for the serialized length of all rows.
        index_preview_chars: Preview length used for index rows.

    Returns:
        A new list of row dicts.
    """
    out = [dict(r) for r in rows]

    def _is_index_row(r: dict[str, Any]) -> bool:
        return "preview" in r and "summary" not in r and "body_for_route" not in r

    def _downgradable_from_tail():
        # Lazily yields (index, story) from tail to head in one pass. Rows
        # behind the cursor are either already index rows or have no known
        # story, so they never need to be looked at again.
        for idx in range(len(out) - 1, -1, -1):
            st = stories_by_id.get(str(out[idx].get("id", "")))
            if st is not None and not _is_index_row(out[idx]):
                yield idx, st

    candidates = _downgradable_from_tail()
    while _rows_json_len(out) > total_max_chars:
        target = next(candidates, None)
        if target is None:
            # Nothing left to downgrade; return the best effort.
            break
        idx, st = target
        out[idx] = _build_index_row(
            st,
            story_meta,
            preview_chars=index_preview_chars,
        )
    return out
|
||
|
||
|
||
def build_route_candidate_rows(
    stories: list[Story],
    story_meta: dict[str, dict[str, int]] | None,
    settings: "Settings",
) -> list[dict[str, Any]]:
    """Sort the stories, build their candidate rows and apply the total-budget downgrade.

    All thresholds are read from the ``story`` constants object. *settings*
    is accepted but not read by this function.

    Args:
        stories: Stories to offer as routing candidates.
        story_meta: Optional per-story metadata keyed by ``str(id)``;
            ``None`` is treated as empty.
        settings: Unused here.

    Returns:
        Rows in routing priority order, with trailing rows reduced to index
        rows where needed to meet the total character budget.
    """
    meta = story_meta or {}
    min_summary = int(story.route_summary_min_chars)
    ranked = sort_stories_for_route(stories, meta, summary_min_chars=min_summary)

    row_options = {
        "summary_min_chars": min_summary,
        "body_max_chars": int(story.route_candidate_body_max_chars),
        "head_chars": int(story.route_long_body_head_chars),
        "tail_chars": int(story.route_long_body_tail_chars),
    }
    full_rows: list[dict[str, Any]] = [
        _build_full_row(item, meta, **row_options) for item in ranked
    ]

    return apply_total_budget_downgrade(
        full_rows,
        stories_by_id={str(item.id): item for item in ranked},
        story_meta=meta,
        total_max_chars=int(story.route_candidate_total_max_chars),
        index_preview_chars=int(story.route_index_preview_chars),
    )
|
||
|
||
|
||
def build_route_candidate_json(
    stories: list[Story],
    story_meta: dict[str, dict[str, int]] | None,
    settings: "Settings",
) -> str:
    """Return the routing candidate rows as a JSON string.

    The rows come from ``build_route_candidate_rows`` and are serialized with
    a 2-space indent and non-ASCII characters kept verbatim.
    """
    return json.dumps(
        build_route_candidate_rows(stories, story_meta, settings),
        ensure_ascii=False,
        indent=2,
    )
|