Files
life-echo/api/app/agents/memoir/story_route_agent.py
Sully 53e0065e3e refactor(api): TOML 配置 SSOT、统一错误契约、Auth/事务加固与可观测性 (#33)
配置 SSOT(TOML + .env)
统一错误契约
Auth 与事务边界
Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client
可观测性(OpenTelemetry + LGTM)
2026-05-22 13:44:50 +08:00

321 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
StoryRouteAgent: Celery 批次内判断 new_story vs append_story (JSON).
"""
from __future__ import annotations
import json
from typing import Any, Literal
from pydantic import BaseModel, field_validator
from app.agents.memoir.prompts import (
get_story_batch_plan_prompt,
get_story_route_prompt,
)
from app.agents.memoir.story_route_payload import (
build_route_candidate_json,
sort_stories_for_route,
)
from app.core.config import settings
from app.core.llm_call import LLMCallError, llm_json_call
from app.core.logging import get_logger
from app.features.story.models import Story
from app.features.memoir.constants import memoir
from app.features.story.constants import story
logger = get_logger(__name__)

# Batches with more segments than this skip batch planning and use single-shot
# routing instead, so the planning prompt does not grow too large.
PLAN_BATCH_MAX_SEGMENTS = 48

# Childhood / education / family chapters: both the model and the post-processing
# in this module lean toward fewer splits and appending to an existing story.
APPEND_FIRST_CHAPTER_CATEGORIES = frozenset(("childhood", "education", "family"))

# These route outcomes are conservative fail-safes, not semantic append matches.
FALLBACK_NEW_STORY_REASONS = frozenset(("no_llm", "parse_error", "invalid_target"))
def default_append_target_story_id(
    candidate_stories: list[Story],
    story_meta: dict[str, dict[str, int]] | None,
    settings: Any,
) -> str | None:
    """Return the id of the preferred append target, or ``None``.

    The candidates are ordered with ``sort_stories_for_route`` (intended to match
    the ordering of the route-candidate JSON) and the first one wins.
    ``settings`` is accepted for call-site compatibility but is not read here.

    Returns ``None`` when there are no candidates or the ordering is empty.
    """
    if candidate_stories:
        ranked = sort_stories_for_route(
            candidate_stories,
            story_meta or {},
            summary_min_chars=int(story.route_summary_min_chars),
        )
        if ranked:
            best = ranked[0]
            return str(best.id)
    return None
def merge_consecutive_new_story_units(
    units: list[StoryBatchPlanUnit],
) -> list[StoryBatchPlanUnit]:
    """Coalesce each run of adjacent ``new_story`` units into one unit.

    This reduces fragmented narratives within a batch. A run of length one is
    kept as the original object. A longer run becomes a fresh unit that carries
    the concatenated segment ids, ``target_story_id=None`` and
    ``reason="coalesced_consecutive_new_story"``; its ``new_story_title`` is not
    carried over. Non-``new_story`` units pass through untouched, in order.
    An empty input is returned as-is.
    """
    if not units:
        return units

    merged: list[StoryBatchPlanUnit] = []
    pending: list[StoryBatchPlanUnit] = []

    def _flush() -> None:
        # Emit the buffered new_story run (if any) and reset the buffer.
        if len(pending) == 1:
            merged.append(pending[0])
        elif pending:
            merged.append(
                StoryBatchPlanUnit(
                    segment_ids=[sid for p in pending for sid in p.segment_ids],
                    decision="new_story",
                    target_story_id=None,
                    reason="coalesced_consecutive_new_story",
                )
            )
        pending.clear()

    for unit in units:
        if unit.decision == "new_story":
            pending.append(unit)
        else:
            _flush()
            merged.append(unit)
    _flush()
    return merged
def normalize_batch_plan_reduce_new_story_fragmentation(
    plan: StoryBatchPlan,
    ordered_segment_ids: list[str],
    *,
    chapter_category: str,
    candidate_stories: list[Story],
    valid_story_ids: set[str],
    story_meta: dict[str, dict[str, int]] | None,
    settings: Any,
) -> StoryBatchPlan:
    """Deterministically post-process an LLM batch plan that already passed validation.

    1. Coalesce adjacent ``new_story`` units.
    2. The next step applies only to a chapter category in
       ``APPEND_FIRST_CHAPTER_CATEGORIES`` that has at least one candidate story.
       If the whole batch collapsed into a single ``new_story`` unit, it is
       replaced by one ``append_story`` unit covering every segment. The target
       is ``default_append_target_story_id``, used only if it is in
       ``valid_story_ids``.
    3. Re-validate the result. On failure, log a warning and return the
       original ``plan`` unchanged.
    """
    units = merge_consecutive_new_story_units(list(plan.units))

    collapsed_to_one_new = len(units) == 1 and units[0].decision == "new_story"
    prefers_append = chapter_category in APPEND_FIRST_CHAPTER_CATEGORIES
    if prefers_append and candidate_stories and collapsed_to_one_new:
        target = default_append_target_story_id(candidate_stories, story_meta, settings)
        if target and target in valid_story_ids:
            units = [
                StoryBatchPlanUnit(
                    segment_ids=list(ordered_segment_ids),
                    decision="append_story",
                    target_story_id=target,
                    reason="append_first_whole_batch_fallback",
                )
            ]

    normalized = StoryBatchPlan(units=units)
    ok, err = validate_story_batch_plan(ordered_segment_ids, normalized, valid_story_ids)
    if ok:
        return normalized
    logger.warning(
        "batch_plan_normalize_revalidate_failed err={} keep_original",
        err,
    )
    return plan
class StoryBatchPlanUnit(BaseModel):
    """One write unit of a batch plan: a contiguous block of segments plus its routing decision."""

    segment_ids: list[str]
    decision: Literal["new_story", "append_story"]
    # Checked against the valid story ids by validate_story_batch_plan for append_story units.
    target_story_id: str | None = None
    new_story_title: str | None = None
    reason: str | None = None

    @field_validator("target_story_id", mode="before")
    @classmethod
    def empty_str_to_none_tid(cls, v: Any) -> str | None:
        """Normalize the raw target id.

        ``None`` / ``""`` / whitespace-only become ``None``, other strings are
        stripped, and non-strings are stringified.
        """
        if v is None or v == "":
            return None
        return (v.strip() or None) if isinstance(v, str) else str(v)
class StoryBatchPlan(BaseModel):
    """Batch write plan: an ordered list of write units.

    Coverage and ordering of segments are not enforced by the model itself; they
    are checked by ``validate_story_batch_plan``.
    """
    units: list[StoryBatchPlanUnit]
class StoryRouteDecision(BaseModel):
    """Single routing decision for a whole batch: start a new story or append to an existing one."""

    decision: Literal["new_story", "append_story"]
    # For append_story, StoryRouteAgent.decide requires this to be in valid_story_ids.
    target_story_id: str | None = None
    new_story_title: str | None = None
    reason: str | None = None

    @field_validator("target_story_id", mode="before")
    @classmethod
    def empty_str_to_none(cls, v: Any) -> str | None:
        """Coerce the raw value before type validation.

        Blank values become ``None``, strings are stripped, and non-strings are
        stringified.
        """
        if v is None or v == "":
            return None
        if not isinstance(v, str):
            return str(v)
        stripped = v.strip()
        return stripped or None
def _build_segments_json_for_plan(
segments: list[tuple[str, str]], *, text_preview_chars: int = 4000
) -> str:
"""segments: (id, user_input_text) 按口述顺序。"""
rows: list[dict[str, str]] = []
for sid, text in segments:
t = (text or "").strip()
if len(t) > text_preview_chars:
t = t[:text_preview_chars] + ""
rows.append({"id": sid, "text": t})
return json.dumps(rows, ensure_ascii=False, indent=2)
def validate_story_batch_plan(
    ordered_segment_ids: list[str],
    plan: StoryBatchPlan,
    valid_story_ids: set[str],
) -> tuple[bool, str | None]:
    """Check that a batch plan is structurally usable.

    Checks are applied in this order, and the first failure is reported:

    - ``"empty_units"``: the plan has no units;
    - ``"empty_unit_segment_ids"``: some unit has no segment ids;
    - ``"duplicate_segment"``: a segment id appears more than once;
    - ``"segment_mismatch"``: the concatenated segment ids differ from
      ``ordered_segment_ids`` (full coverage, same order);
    - ``"invalid_append_target"``: an ``append_story`` unit has a missing target
      or one outside ``valid_story_ids``.

    Titles are generated later by NarrativeAgent, so ``new_story_title`` is not
    required at routing time.

    Returns:
        ``(True, None)`` when valid, otherwise ``(False, error_code)``.
    """
    units = plan.units
    if not units:
        return False, "empty_units"
    if any(not unit.segment_ids for unit in units):
        return False, "empty_unit_segment_ids"

    covered = [sid for unit in units for sid in unit.segment_ids]
    if len(set(covered)) != len(covered):
        return False, "duplicate_segment"
    if covered != ordered_segment_ids:
        return False, "segment_mismatch"

    bad_target = any(
        unit.decision == "append_story"
        and (not unit.target_story_id or unit.target_story_id not in valid_story_ids)
        for unit in units
    )
    if bad_target:
        return False, "invalid_append_target"
    return True, None
class StoryRouteAgent:
    """LLM-backed router that decides whether batch segments start a new story or extend one."""

    def decide(
        self,
        *,
        chapter_category: str,
        chapter_title: str,
        batch_transcript: str,
        candidate_stories: list[Story],
        llm: Any,
        valid_story_ids: set[str],
        story_meta: dict[str, dict[str, int]] | None = None,
    ) -> StoryRouteDecision:
        """Make one routing decision for the whole batch transcript.

        Falls back to ``new_story`` with a fail-safe reason:

        - ``"no_llm"``: no LLM client was supplied;
        - ``"parse_error"``: ``llm_json_call`` returned the fallback object
          (presumably on call or parse failure; see ``llm_json_call``);
        - ``"invalid_target"``: the model chose ``append_story`` with a missing
          target id or one outside ``valid_story_ids``. Any proposed title is kept.
        """
        if not llm:
            return StoryRouteDecision(
                decision="new_story",
                new_story_title=None,
                reason="no_llm",
            )
        payload = build_route_candidate_json(candidate_stories, story_meta, settings)
        prompt = get_story_route_prompt(
            chapter_category=chapter_category,
            chapter_title=chapter_title,
            batch_transcript=batch_transcript,
            candidate_stories_json=payload,
        )

        def _decide_fallback() -> StoryRouteDecision:
            # Conservative default handed to llm_json_call for the failure path.
            return StoryRouteDecision(
                decision="new_story",
                new_story_title=None,
                reason="parse_error",
            )

        decision = llm_json_call(
            llm,
            prompt,
            StoryRouteDecision,
            max_tokens=memoir.story_route_max_tokens,
            agent="StoryRouteAgent.decide",
            fallback_factory=_decide_fallback,
        )
        if decision.decision == "append_story":
            tid = decision.target_story_id
            if not tid or tid not in valid_story_ids:
                logger.warning(
                    "StoryRoute append 无效 target_story_id={},回退 new_story",
                    tid,
                )
                return StoryRouteDecision(
                    decision="new_story",
                    new_story_title=decision.new_story_title,
                    reason="invalid_target",
                )
        return decision

    def plan_batch(
        self,
        *,
        chapter_category: str,
        chapter_title: str,
        segments: list[tuple[str, str]],
        candidate_stories: list[Story],
        llm: Any,
        valid_story_ids: set[str],
        story_meta: dict[str, dict[str, int]] | None = None,
    ) -> StoryBatchPlan | None:
        """Split this batch's segments into write units with one LLM call.

        Args:
            segments: ``(segment_id, user_input_text)`` pairs in narration order.

        Returns:
            The plan after ``normalize_batch_plan_reduce_new_story_fragmentation``.
            Returns ``None`` in the cases below, and the caller is expected to
            fall back to ``decide()``:

            - there is no LLM;
            - there are fewer than 2 segments;
            - there are more than ``PLAN_BATCH_MAX_SEGMENTS`` segments (keeps the
              prompt bounded);
            - ``llm_json_call`` raises ``LLMCallError``;
            - the plan fails ``validate_story_batch_plan``.
        """
        if not llm or len(segments) < 2:
            return None
        if len(segments) > PLAN_BATCH_MAX_SEGMENTS:
            # The documented cap was previously never enforced here, so oversized
            # batches still produced a huge planning prompt.
            logger.info(
                "StoryRouteAgent.plan_batch skipped: segments={} > max={}",
                len(segments),
                PLAN_BATCH_MAX_SEGMENTS,
            )
            return None
        payload = build_route_candidate_json(candidate_stories, story_meta, settings)
        segments_json = _build_segments_json_for_plan(segments)
        prompt = get_story_batch_plan_prompt(
            chapter_category=chapter_category,
            chapter_title=chapter_title,
            segments_json=segments_json,
            candidate_stories_json=payload,
        )
        try:
            plan = llm_json_call(
                llm,
                prompt,
                StoryBatchPlan,
                max_tokens=memoir.story_batch_plan_max_tokens,
                agent="StoryRouteAgent.plan_batch",
            )
        except LLMCallError as e:
            logger.warning("StoryRouteAgent.plan_batch 解析失败: {}", e)
            return None
        ordered = [s[0] for s in segments]
        ok, err = validate_story_batch_plan(ordered, plan, valid_story_ids)
        if not ok:
            logger.warning("StoryRouteAgent.plan_batch 校验失败: {}", err)
            return None
        return normalize_batch_plan_reduce_new_story_fragmentation(
            plan,
            ordered,
            chapter_category=chapter_category,
            candidate_stories=candidate_stories,
            valid_story_ids=valid_story_ids,
            story_meta=story_meta,
            settings=settings,
        )