"""Story routing: JSON payload describing the candidate stories.

Rows prefer the stored summary, are trimmed to a character budget and are
emitted in a fixed order. Shared by StoryRouteAgent and its unit tests.
"""

from __future__ import annotations

import json
import re
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from app.core.config import Settings
    from app.features.story.models import Story

_PLAIN_SNIPPET_NOISE = re.compile(r"[`*_#]+")
_MD_IMAGE = re.compile(r"!\[[^\]]*\]\([^)]+\)")
_MD_LINK = re.compile(r"\[([^\]]+)\]\([^)]+\)")
_ASSET_URI = re.compile(r"asset://\S+")
_WHITESPACE_RUN = re.compile(r"\s+")

# Opening snippets are capped at this many characters ...
_OPENING_SNIPPET_MAX_CHARS = 260
# ... and dropped entirely when shorter than this.
_OPENING_SNIPPET_MIN_CHARS = 40


def _plain_opening_snippet_from_markdown(md: str, *, max_chars: int) -> str:
    """Return a short plain-text excerpt of *md* with Markdown noise reduced.

    Used to help the router identify a story's topic when it has no summary.
    Images and ``asset://`` URIs are removed, links are reduced to their
    label, the characters `` ` * _ # `` are stripped and whitespace runs are
    collapsed. Text longer than *max_chars* is cut and terminated with "…".
    A non-positive *max_chars* yields an empty string.
    """
    text = (md or "").strip()
    if not text or max_chars <= 0:
        # Guarding max_chars avoids a negative slice bound below, which would
        # otherwise return almost the whole text instead of truncating it.
        return ""
    # Images must go before links: the link pattern also matches the
    # "[alt](url)" tail of an image.
    text = _MD_IMAGE.sub("", text)
    text = _MD_LINK.sub(r"\1", text)
    text = _ASSET_URI.sub("", text)
    text = _PLAIN_SNIPPET_NOISE.sub("", text)
    text = _WHITESPACE_RUN.sub(" ", text).strip()
    if len(text) <= max_chars:
        return text
    return text[: max_chars - 1] + "…"


def _linked_chapters(s: Story) -> list[str]:
    """Return "title(category)" labels for every chapter linked to *s*.

    Links whose ``chapter`` attribute is missing or ``None`` are skipped;
    a missing title or category is rendered as an empty string.
    """
    labels: list[str] = []
    for link in getattr(s, "chapter_links", None) or []:
        chapter = getattr(link, "chapter", None)
        if chapter is None:
            continue
        category = getattr(chapter, "category", None) or ""
        title = getattr(chapter, "title", None) or ""
        labels.append(f"{title}({category})")
    return labels


def _aware_updated_at(s: Story) -> datetime | None:
    """Return ``s.updated_at`` as an aware datetime, or ``None`` if unset.

    A naive value is tagged as UTC (no clock conversion is performed).
    """
    updated = getattr(s, "updated_at", None)
    if updated is None:
        return None
    if updated.tzinfo is None:
        return updated.replace(tzinfo=timezone.utc)
    return updated


def _updated_at_iso(s: Story) -> str:
    """Return ``s.updated_at`` in ISO 8601 form, or "" when it is unset."""
    updated = _aware_updated_at(s)
    return "" if updated is None else updated.isoformat()


def _has_usable_summary(s: Story, summary_min_len: int) -> bool:
    """Tell whether the stripped summary of *s* has at least *summary_min_len* chars."""
    summary = (getattr(s, "summary", None) or "").strip()
    return len(summary) >= summary_min_len


def _truncate_body_for_route(
    md: str,
    *,
    body_max_chars: int,
    head_chars: int,
    tail_chars: int,
) -> str:
    """Trim one story body for the routing prompt.

    The stripped body is returned whole when it fits in *body_max_chars*.
    Otherwise a head and a tail slice (each at least 1 character and at most
    half of *body_max_chars*) are joined by a marker line stating how many
    characters were omitted. The marker itself is not counted against
    *body_max_chars*.
    """
    body = (md or "").strip()
    if not body:
        return ""
    if len(body) <= body_max_chars:
        return body
    head = max(1, min(head_chars, body_max_chars // 2))
    tail = max(1, min(tail_chars, body_max_chars // 2))
    mid_omit = len(body) - head - tail
    if mid_omit <= 0:
        # Head and tail would touch or overlap (only possible for tiny
        # budgets), so a plain prefix is used instead.
        return body[:body_max_chars]
    return f"{body[:head]}\n…(中间省略 {mid_omit} 字)…\n{body[-tail:]}"


def sort_stories_for_route(
    stories: list[Story],
    story_meta: dict[str, dict[str, int]],
    *,
    summary_min_chars: int,
) -> list[Story]:
    """Return *stories* in routing priority order (a new list).

    Order: has usable summary (first) -> updated_at (newest first) ->
    version_count (desc) -> char_count (desc) -> id (ascending).

    The id tie-break compares ``str(s.id)``, so it is lexicographic.
    Stories without ``updated_at`` sort as timestamp 0. Missing metadata
    counts as 0.
    """

    def sort_key(s: Story) -> tuple:
        sid = str(s.id)
        meta = story_meta.get(sid) or {}
        version_count = int(meta.get("version_count", 0))
        char_count = int(meta.get("char_count", 0))
        updated = _aware_updated_at(s)
        timestamp = 0.0 if updated is None else updated.timestamp()
        return (
            # False sorts before True: stories with a summary come first.
            not _has_usable_summary(s, summary_min_chars),
            -timestamp,
            -version_count,
            -char_count,
            sid,
        )

    return sorted(stories, key=sort_key)


def _build_full_row(
    s: Story,
    story_meta: dict[str, dict[str, int]],
    *,
    summary_min_chars: int,
    body_max_chars: int,
    head_chars: int,
    tail_chars: int,
) -> dict[str, Any]:
    """Build the full candidate row for one story.

    Every row carries id, title, char_count, version_count, updated_at and
    linked_chapters. A story with a usable summary additionally gets
    ``summary`` and nothing else; otherwise it gets ``body_for_route`` (the
    trimmed body, when non-empty) and, when long enough, ``opening_snippet``.
    ``char_count`` falls back to the stripped body length when the metadata
    does not provide it.
    """
    meta = story_meta.get(str(s.id)) or {}
    canon = (s.canonical_markdown or "").strip()
    row: dict[str, Any] = {
        "id": s.id,
        "title": s.title,
        "char_count": int(meta.get("char_count", len(canon))),
        "version_count": int(meta.get("version_count", 0)),
        "updated_at": _updated_at_iso(s),
        "linked_chapters": _linked_chapters(s),
    }
    if _has_usable_summary(s, summary_min_chars):
        row["summary"] = (getattr(s, "summary", None) or "").strip()
        return row

    body = _truncate_body_for_route(
        canon,
        body_max_chars=body_max_chars,
        head_chars=head_chars,
        tail_chars=tail_chars,
    )
    if body:
        row["body_for_route"] = body
    snippet = _plain_opening_snippet_from_markdown(
        canon,
        max_chars=_OPENING_SNIPPET_MAX_CHARS,
    )
    if len(snippet) >= _OPENING_SNIPPET_MIN_CHARS:
        row["opening_snippet"] = snippet
    return row


def _build_index_row(
    s: Story,
    story_meta: dict[str, dict[str, int]],
    *,
    preview_chars: int,
) -> dict[str, Any]:
    """Build the compact "index" row: id, title, char_count and a preview.

    The preview is the first *preview_chars* characters of the stripped body
    with newlines replaced by spaces, followed by "…" when text was cut.
    """
    meta = story_meta.get(str(s.id)) or {}
    canon = (s.canonical_markdown or "").strip()
    flat = canon.replace("\n", " ")
    preview = flat[:preview_chars] + ("…" if len(flat) > preview_chars else "")
    return {
        "id": s.id,
        "title": s.title,
        "char_count": int(meta.get("char_count", len(canon))),
        "preview": preview,
    }


def _rows_json_len(rows: list[dict[str, Any]]) -> int:
    """Return the length of *rows* serialised as compact, non-ASCII-escaped JSON."""
    return len(json.dumps(rows, ensure_ascii=False))


def apply_total_budget_downgrade(
    rows: list[dict[str, Any]],
    *,
    stories_by_id: dict[str, Story],
    story_meta: dict[str, dict[str, int]],
    total_max_chars: int,
    index_preview_chars: int,
) -> list[dict[str, Any]]:
    """Downgrade rows to index rows, lowest priority first, to fit the budget.

    Walking from the end of the list, each full row whose story is present
    in *stories_by_id* is replaced by its index row until the compact JSON
    length of the list is at most *total_max_chars* or no row is left to
    downgrade. Rows that are already index rows, or whose id is unknown,
    are left untouched. The input list and its dicts are not modified.

    NOTE(review): the budget is measured on compact JSON, whereas
    ``build_route_candidate_json`` emits ``indent=2``; the emitted string can
    therefore be longer than *total_max_chars*. Confirm this is intended.
    """

    def _is_index_row(r: dict[str, Any]) -> bool:
        return "preview" in r and "summary" not in r and "body_for_route" not in r

    def _row_json_len(r: dict[str, Any]) -> int:
        return len(json.dumps(r, ensure_ascii=False))

    out = [dict(r) for r in rows]
    # A list serialises as "[" + ", ".join(items) + "]", so swapping one item
    # changes the total by exactly the difference of the two item lengths.
    # Tracking that delta avoids re-serialising the whole list per downgrade.
    total = _rows_json_len(out)
    for i in reversed(range(len(out))):
        if total <= total_max_chars:
            break
        row = out[i]
        story = stories_by_id.get(str(row.get("id", "")))
        if story is None or _is_index_row(row):
            continue
        index_row = _build_index_row(
            story,
            story_meta,
            preview_chars=index_preview_chars,
        )
        total += _row_json_len(index_row) - _row_json_len(row)
        out[i] = index_row
    return out


def build_route_candidate_rows(
    stories: list[Story],
    story_meta: dict[str, dict[str, int]] | None,
    settings: Settings,
) -> list[dict[str, Any]]:
    """Sort the stories, build full candidate rows and apply the total budget.

    All limits are read from the ``story_route_*`` attributes of *settings*.
    ``story_meta`` may be ``None``, which is treated as empty metadata.
    """
    meta = story_meta or {}
    summary_min = int(settings.story_route_summary_min_chars)
    body_max = int(settings.story_route_candidate_body_max_chars)
    head_chars = int(settings.story_route_long_body_head_chars)
    tail_chars = int(settings.story_route_long_body_tail_chars)

    ordered = sort_stories_for_route(stories, meta, summary_min_chars=summary_min)
    rows = [
        _build_full_row(
            s,
            meta,
            summary_min_chars=summary_min,
            body_max_chars=body_max,
            head_chars=head_chars,
            tail_chars=tail_chars,
        )
        for s in ordered
    ]
    return apply_total_budget_downgrade(
        rows,
        stories_by_id={str(s.id): s for s in ordered},
        story_meta=meta,
        total_max_chars=int(settings.story_route_candidate_total_max_chars),
        index_preview_chars=int(settings.story_route_index_preview_chars),
    )


def build_route_candidate_json(
    stories: list[Story],
    story_meta: dict[str, dict[str, int]] | None,
    settings: Settings,
) -> str:
    """Return the candidate rows as indented (2 spaces), non-ASCII-escaped JSON."""
    rows = build_route_candidate_rows(stories, story_meta, settings)
    return json.dumps(rows, ensure_ascii=False, indent=2)