Files
life-echo/api/app/features/memoir/story_pipeline_sync.py
Kevin bb16d3a5c9 refactor(agents): 抽取阶段常量与对话上下文;快档 LLM;图片 prompt 可禁止回退
访谈与阶段
- 新增 app/agents/stage_constants.py:集中 CHAT_STAGES、章节分类/顺序、阶段到默认 memoir 类别等,与 MemoirState 默认槽位顺序对齐;减少散落在 prompts 内的重复常量。
- 新增 app/agents/chat/prompt_context.py:以 ChatPromptContext 汇总 guided 系统提示所需字段(阶段、槽位、轮次、人设、记忆证据、回复长度模式、背景声线、职业等),统一走 get_guided_conversation_prompt。
- 大幅收敛 app/agents/chat/prompts_conversation.py;调整 prompts.py、stage_prompts.py、stage_detection.py;同步 interview_agent、profile_agent、helpers 与 state_schema,使对话侧构造提示的方式一致、可测。

回忆录流水线
- memoir/prompts.py 删除已迁至 stage_constants / 独立模板的大段常量与图片占位相关逻辑;classification / extraction / fidelity / narrative agents 与 orchest(全量历史仍可用于计数,注入模型时按轮次与字符上限截断)、image_prompt_fallback_disabled。
- dependencies 增加 get_llm_provider_fast(LRU 缓存,可与默认共用密钥与 base_url)。

任务与编排
- memoir_tasks:prepare_batches 注入 llm_fast;开启独立快档模型时打结构化日志。
- chapter_cover_tasks、story_image_tasks:与图片 prompt / JSON 工具路径或策略变更对齐(import 与行为一致)。
- story_pipeline_sync 等小处同步。

其它核心
- langchain_llm、text_normalize 随上述调用链微调。

开发者体验
- .cursor/settings.json:启用 redis-development、postman 插件。

测试
- 新增 test_image_prompt_policy:覆盖「禁止回退」等图片 prompt 策略。
- 更新 test_interview_prompts、test_interview_reply_length、test_experience_regressions、test_json_and_memory_utils,匹配新常量位置、json_utils 与对话/长度行为。
2026-04-02 12:00:00 +08:00

741 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Celery 用:按批次将 transcript 写入 Story并标记 Chapter 需物化markdown_compose_dirty
同步路径不执行 compose物化由 commit 后 `recompose_chapter` 异步完成。
"""
from __future__ import annotations
import json
import time
import uuid
from typing import Any
from sqlalchemy import select
from sqlalchemy.orm import Session, joinedload
from app.agents.memoir.narrative_agent import NarrativeAgent
from app.agents.memoir.prompts import (
format_evidence_chunks_for_prompt,
format_narrative_user_content,
)
from app.agents.stage_constants import STAGE_TO_ORDER
from app.agents.memoir.story_route_agent import (
PLAN_BATCH_MAX_SEGMENTS,
StoryBatchPlan,
StoryRouteAgent,
)
from app.agents.state_schema import MemoirStateSchema
from app.core.config import settings
from app.core.logging import get_logger
from app.features.memoir.cover_eligibility import chapter_needs_cover_enqueue
from app.features.memoir.memoir_images.settings import MemoirImageSettings
from app.features.memoir.models import Chapter
from app.features.memoir.narrative_to_markdown import narrative_to_markdown
from app.features.memoir.oral_normalize import (
apply_oral_rules,
normalize_oral_for_memoir,
)
from app.features.memoir.repo import (
mark_chapter_dirty_sync,
reorder_chapter_story_links_by_life_order_sync,
)
from app.features.memory.repo import retrieve_evidence_sync
from app.features.story.models import Story
from app.features.story.sync_write import (
append_story_version_sync,
count_story_versions_sync,
create_story_with_version_sync,
ensure_chapter_story_link_sync,
list_active_stories_for_user_sync,
)
logger = get_logger(__name__)
def _route_segment_texts(category_segments: list) -> list[tuple[str, str]]:
    """Build ``(segment_id, text)`` pairs for ``StoryRouteAgent.plan_batch``.

    Only the rule-based oral normalization (``apply_oral_rules``) is applied,
    so routing a batch never costs one LLM call per segment. The rules are
    skipped when ``memoir_oral_normalize_enabled`` is falsy or
    ``memoir_oral_normalize_mode`` is ``"off"``. Segment ids are stringified.
    """
    # The settings check does not depend on the segment: evaluate it once.
    use_rules = bool(settings.memoir_oral_normalize_enabled) and (
        (settings.memoir_oral_normalize_mode or "rules").strip().lower() != "off"
    )
    pairs: list[tuple[str, str]] = []
    for seg in category_segments:
        raw = seg.user_input_text or ""
        pairs.append((str(seg.id), apply_oral_rules(raw) if use_rules else raw))
    return pairs
def _fidelity_fallback_json(oral: str, existing_canonical: str | None) -> str:
"""忠实度未通过时的安全回退:续写场景保留旧文 + 本段口述,避免只剩一句。"""
o = (oral or "").strip()[:15000]
ex = (existing_canonical or "").strip()[:15000]
if ex and o:
return json.dumps(
{"paragraphs": [{"content": ex}, {"content": o}]},
ensure_ascii=False,
)
if ex:
return json.dumps(
{"paragraphs": [{"content": ex}]},
ensure_ascii=False,
)
return json.dumps(
{"paragraphs": [{"content": o}]},
ensure_ascii=False,
)
def _gate_narrative_fidelity(
    oral_text: str,
    narrative_raw: str,
    llm: Any,
    *,
    existing_canonical: str | None = None,
) -> tuple[str, str]:
    """Run the optional fidelity check on a generated narrative.

    Returns:
        ``(text, fallback_reason)``. The narrative passes through unchanged
        with reason ``"none"`` when the check is disabled, no ``llm`` is
        given, the check passes, or there is neither oral nor existing text to
        fall back to. Otherwise the text is ``_fidelity_fallback_json(...)``
        and the reason is ``"fidelity_failed"``.
    """
    from app.agents.memoir.fidelity_check_agent import FidelityCheckAgent

    passthrough = (narrative_raw, "none")
    if not (settings.memoir_fidelity_check_enabled and llm):
        return passthrough
    checker = FidelityCheckAgent()
    prior = (existing_canonical or "").strip() or None
    verdict = checker.passes(
        oral_text=oral_text,
        narrative_json=narrative_raw,
        llm=llm,
        existing_canonical_markdown=prior,
    )
    if verdict:
        return passthrough
    oral_clean = (oral_text or "").strip()
    logger.warning(
        "event=fidelity_gate_fallback oral_len={} merge={}",
        len(oral_clean),
        bool(prior),
    )
    if oral_clean or prior:
        return _fidelity_fallback_json(oral_clean, prior), "fidelity_failed"
    # Nothing safer to substitute: keep the model output.
    return passthrough
def _should_fallback_to_transcript(md: str, oral: str) -> bool:
"""模型输出相对口述极度过短时才回退仅防极端压缩如「1999」"""
o = (oral or "").strip()
if not o:
return False
m = (md or "").strip()
if not m:
return True
if len(o) < 12:
return len(m) < len(o)
ratio = float(settings.memoir_narrative_fallback_body_ratio)
min_abs = int(settings.memoir_narrative_fallback_min_chars)
threshold = max(min_abs, int(len(o) * ratio))
return len(m) < threshold
def _coalesce_story_markdown(
md: str,
oral: str,
existing_for_narrative: str,
) -> str:
"""落库前对齐正文:空输出或过短回退时,续写场景保留「已有故事 + 本段口述」。"""
o = (oral or "").strip()
ex = (existing_for_narrative or "").strip()
m = (md or "").strip()
if not m:
if ex and o:
return f"{ex}\n\n{o}"
if o:
return o
return ex
if o and _should_fallback_to_transcript(m, o):
if ex:
return f"{ex}\n\n{o}"
return o
return m
def _is_json_narrative(text: str) -> bool:
if not text or not text.strip():
return False
s = text.strip()
return s.startswith("{") and "paragraphs" in s
def _ordered_text_for_segment_ids(
category_segments: list, segment_ids: list[str]
) -> str:
id_to_text = {seg.id: (seg.user_input_text or "") for seg in category_segments}
return "\n\n".join(id_to_text.get(sid, "") for sid in segment_ids)
def _apply_narrative_fallbacks(
    narrative_raw: str,
    combined_unit_text: str,
    existing_for_narrative: str,
    *,
    chapter_category: str,
) -> tuple[str, str]:
    """Replace a suspiciously short narrative with a safe text fallback.

    Returns ``(text, fallback_type)``. ``fallback_type`` is ``"none"`` (and
    ``text`` is ``narrative_raw`` unchanged) when no rule fires. Rules, in order:

    * ``"merge_shrink"``: appending to an existing story longer than 400
      chars, and the JSON narrative renders to < 25% of it. Result: old text
      plus this unit's oral text.
    * ``"coalesce_to_old_plus_oral"``: appending, and the non-JSON output is
      shorter than half of the old text. Result: old text plus oral text.
    * ``"coalesce_to_old_plus_oral"`` / ``"coalesce_to_oral"``: the rendered
      body is too short vs. the oral text (``_should_fallback_to_transcript``).
      Result: old plus oral text, or just the oral text when there is no old
      text.

    All fallback texts use the stripped old and oral text.
    """
    ex = (existing_for_narrative or "").strip()
    oral = (combined_unit_text or "").strip()
    is_json = _is_json_narrative(narrative_raw)
    # Whole-story merge whose JSON output shrank abnormally: keep the old text
    # and append this unit's oral text so nothing gets overwritten and lost.
    if ex and is_json:
        merged_md = narrative_to_markdown(narrative_raw).strip()
        if len(ex) > 400 and len(merged_md) < len(ex) * 0.25:
            logger.warning(
                "event=narrative_fallback reason=merge_shrink action=append_oral "
                "chapter_category={}",
                chapter_category,
            )
            return f"{ex}\n\n{oral}", "merge_shrink"
    # The generator output may be None/empty (callers guard it with `or ""`);
    # count it as length 0 instead of raising TypeError inside len().
    if ex and not is_json and len(narrative_raw or "") < len(ex) * 0.5:
        logger.warning(
            "event=narrative_fallback reason=length_anomaly action=append_raw "
            "chapter_category={}",
            chapter_category,
        )
        return f"{ex}\n\n{oral}", "coalesce_to_old_plus_oral"
    md_check = narrative_to_markdown(narrative_raw).strip()
    if oral and _should_fallback_to_transcript(md_check, oral):
        if ex:
            logger.warning(
                "event=narrative_fallback reason=body_too_short_vs_oral_merge "
                "chapter_category={} oral_len={} md_len={}",
                chapter_category,
                len(oral),
                len(md_check),
            )
            return f"{ex}\n\n{oral}", "coalesce_to_old_plus_oral"
        logger.warning(
            "event=narrative_fallback reason=body_too_short_vs_oral "
            "chapter_category={} oral_len={} md_len={}",
            chapter_category,
            len(oral),
            len(md_check),
        )
        return oral, "coalesce_to_oral"
    return narrative_raw, "none"
def _merge_fallback_type(gate_ft: str, apply_ft: str) -> str:
if apply_ft != "none":
return apply_ft
return gate_ft
def _story_meta_for_route(
session: Session, candidates: list
) -> dict[str, dict[str, int]]:
meta: dict[str, dict[str, int]] = {}
for s in candidates:
sid = str(s.id)
meta[sid] = {
"char_count": len((s.canonical_markdown or "").strip()),
"version_count": count_story_versions_sync(session, sid),
}
return meta
def _ensure_chapter_record(
    session: Session,
    *,
    user_id: str,
    chapter_category: str,
    title: str,
    source_ids: list[str],
    calculated_order_index: int,
) -> Chapter:
    """Fetch the user's active chapter for ``chapter_category``, creating it if absent.

    A new chapter is created with ``title``, ``calculated_order_index``,
    ``status="completed"`` and a copy of ``source_ids`` as ``source_segments``.
    For an existing chapter, ``source_ids`` are merged into
    ``source_segments``. The merge dedups, keeps existing ids first and keeps
    first-seen order, so the stored order is deterministic (the previous
    ``set()`` merge was not). In both cases the chapter is flagged ``is_new``
    and the session is flushed.

    Returns:
        The ``Chapter`` row (with ``images`` and ``story_links`` eager-loaded
        when it already existed).
    """
    stmt_chapter = (
        select(Chapter)
        .where(
            Chapter.user_id == user_id,
            Chapter.category == chapter_category,
            Chapter.is_active == True,  # noqa: E712
        )
        .options(
            joinedload(Chapter.images),
            joinedload(Chapter.story_links),
        )
    )
    # unique() is needed because of the joined eager loads of collections.
    chapter = session.execute(stmt_chapter).unique().scalar_one_or_none()
    if chapter is None:
        chapter = Chapter(
            id=str(uuid.uuid4()),
            user_id=user_id,
            title=title,
            order_index=calculated_order_index,
            status="completed",
            category=chapter_category,
            is_new=True,
            # Copy so the row does not alias the caller's list.
            source_segments=list(source_ids),
        )
        session.add(chapter)
    else:
        # Assign a fresh list (not an in-place mutation); dict.fromkeys dedups
        # while preserving insertion order.
        chapter.source_segments = list(
            dict.fromkeys([*(chapter.source_segments or []), *source_ids])
        )
        chapter.is_new = True
    session.flush()
    return chapter
def _normalize_oral_logged(text: str, llm: Any, *, context: str) -> tuple[str, bool]:
    """Normalize oral text for memoir generation and log when it changed.

    Returns ``(normalized_text, changed)``, where ``changed`` compares the
    stripped raw text with the stripped normalized text.
    """
    normalized = normalize_oral_for_memoir(text, llm=llm)
    raw_stripped = (text or "").strip()
    norm_stripped = (normalized or "").strip()
    changed = raw_stripped != norm_stripped
    if changed:
        logger.info(
            "event=oral_normalized context={} raw_len={} norm_len={}",
            context,
            len(raw_stripped),
            len(norm_stripped),
        )
    return normalized, changed


def _resolve_append_target(
    session: Session,
    *,
    decision: str,
    target_story_id: str | None,
    user_id: str,
    max_chars: int,
    max_ver: int,
    log_source: str,
) -> tuple[str | None, str, bool]:
    """Check whether a routed unit may really append to its target story.

    Returns ``(story_id, existing_canonical, forced_new)``:

    * ``story_id`` is the story to append to; ``None`` means create a new one.
    * ``existing_canonical`` is its stripped canonical markdown (``""`` when
      creating).
    * ``forced_new`` is True when an append was requested but the target
      exceeded ``max_chars`` or ``max_ver``.

    Missing stories and stories owned by another user are treated as "create
    new" without setting ``forced_new``.
    """
    if decision != "append_story" or not target_story_id:
        return None, "", False
    story = session.get(Story, target_story_id)
    if not story or story.user_id != user_id:
        return None, "", False
    canon = (story.canonical_markdown or "").strip()
    versions = count_story_versions_sync(session, str(story.id))
    if len(canon) > max_chars or versions >= max_ver:
        logger.info(
            "event=append_overflow_to_new story_id={} canonical_chars={} "
            "versions={} decision_source={}",
            str(story.id),
            len(canon),
            versions,
            log_source,
        )
        return None, "", True
    return story.id, canon, False


def _write_story_unit(
    session: Session,
    *,
    route_type: str,
    route_decision: str,
    decision_source: str,
    target_story_id: str | None,
    existing_for_narrative: str,
    new_story_title: str | None,
    oral_text: str,
    oral_normalize_changed: bool,
    unit_segments: int,
    started_at: float,
    chapter: Chapter,
    chapter_category: str,
    evidence_text: str,
    slot_snippets: dict[str, str],
    user_id: str,
    user_profile: str,
    user_birth_year: int | None,
    llm: Any,
    narrative_agent: NarrativeAgent,
    background_voice: str,
    occupation: str,
) -> str:
    """Generate, safeguard and persist the story text for one routed unit.

    Steps:

    1. Generate the narrative.
    2. Apply the fidelity gate and the length fallbacks.
    3. Append a version to ``target_story_id`` when given; otherwise create a
       new story titled ``new_story_title``, or an LLM-generated title if that
       is empty.
    4. Link the story to ``chapter`` and log ``event=story_generated``.

    Returns:
        The id (as ``str``) of the story appended to or created.
    """
    raw_gen = narrative_agent.generate_narrative(
        stage=chapter_category,
        slots=slot_snippets,
        new_content=format_narrative_user_content(oral_text, evidence_text),
        existing_content=existing_for_narrative,
        user_profile=user_profile,
        birth_year=user_birth_year,
        llm=llm,
        background_voice=background_voice,
        occupation=occupation,
    )
    # Output that looks like paragraph JSON but does not parse is reported as
    # "json_invalid" unless a stronger fallback already applied.
    looks_like_json = _is_json_narrative(raw_gen)
    json_invalid = False
    if looks_like_json:
        try:
            json.loads(raw_gen.strip())
        except json.JSONDecodeError:
            json_invalid = True
    narrative_raw, fb_gate = _gate_narrative_fidelity(
        oral_text,
        raw_gen,
        llm,
        existing_canonical=existing_for_narrative or None,
    )
    narrative_raw, fb_apply = _apply_narrative_fallbacks(
        narrative_raw,
        oral_text,
        existing_for_narrative,
        chapter_category=chapter_category,
    )
    fallback_type = _merge_fallback_type(fb_gate, fb_apply)
    if json_invalid and fallback_type == "none":
        fallback_type = "json_invalid"
    oral_clean = (oral_text or "").strip()
    md = _coalesce_story_markdown(
        narrative_to_markdown(narrative_raw).strip(),
        oral_clean,
        existing_for_narrative or "",
    )
    is_append = bool(target_story_id)
    if is_append:
        story_id = str(target_story_id)
        append_story_version_sync(session, story_id, md)
    else:
        story_title = (new_story_title or "").strip() or narrative_agent.generate_title(
            stage=chapter_category,
            emotion="neutral",
            slots=slot_snippets,
            user_profile=user_profile,
            birth_year=user_birth_year,
            llm=llm,
        )
        story = create_story_with_version_sync(
            session,
            user_id=user_id,
            title=story_title,
            canonical_markdown=md,
            stage=chapter_category,
        )
        story_id = str(story.id)
    ensure_chapter_story_link_sync(
        session, chapter_id=str(chapter.id), story_id=story_id
    )
    logger.info(
        "event=story_generated route_type={} decision_source={} route_decision={} "
        "unit_segments={} used_evidence={} narrative_json_valid={} fidelity_passed={} "
        "fallback_type={} oral_len={} md_len={} chapter_category={} is_append={} "
        "story_id={} seconds={:.3f} oral_normalize_changed={}",
        route_type,
        decision_source,
        route_decision,
        unit_segments,
        bool(evidence_text.strip()),
        # Report actual validity; "looks like JSON" alone is not "valid".
        looks_like_json and not json_invalid,
        # Note: the gate also reports "none" when the check is disabled / no LLM.
        fb_gate == "none",
        fallback_type,
        len(oral_clean),
        len(md.strip()),
        chapter_category,
        is_append,
        story_id,
        time.perf_counter() - started_at,
        oral_normalize_changed,
    )
    return story_id


def _run_batch_plan_writes(
    session: Session,
    *,
    plan: StoryBatchPlan,
    category_segments: list,
    chapter: Chapter,
    chapter_category: str,
    evidence_text: str,
    slot_snippets: dict[str, str],
    user_id: str,
    user_profile: str,
    user_birth_year: int | None,
    llm: Any,
    narrative_agent: NarrativeAgent,
    background_voice: str = "default",
    occupation: str = "",
) -> set[str]:
    """Persist every unit of a batch route plan.

    Each unit's segments are joined, oral-normalized, written as a new story
    or appended to an existing one (subject to the append size/version
    limits), and linked to ``chapter``.

    Returns:
        Ids of the stories written, to dispatch after commit.
    """
    dispatch_ids: set[str] = set()
    max_chars = int(settings.story_append_max_canonical_chars)
    max_ver = int(settings.story_append_max_versions)
    for unit in plan.units:
        started_at = time.perf_counter()
        unit_text = _ordered_text_for_segment_ids(category_segments, unit.segment_ids)
        oral_unit, oral_changed = _normalize_oral_logged(
            unit_text, llm, context="batch_unit"
        )
        target_id, existing, forced_new = _resolve_append_target(
            session,
            decision=unit.decision,
            target_story_id=unit.target_story_id,
            user_id=user_id,
            max_chars=max_chars,
            max_ver=max_ver,
            log_source="batch_plan",
        )
        story_id = _write_story_unit(
            session,
            route_type="batch",
            route_decision=unit.decision,
            decision_source=(
                "forced_new_due_to_append_limit" if forced_new else "batch_plan"
            ),
            target_story_id=target_id,
            existing_for_narrative=existing,
            new_story_title=unit.new_story_title,
            oral_text=oral_unit,
            oral_normalize_changed=oral_changed,
            unit_segments=len(unit.segment_ids),
            started_at=started_at,
            chapter=chapter,
            chapter_category=chapter_category,
            evidence_text=evidence_text,
            slot_snippets=slot_snippets,
            user_id=user_id,
            user_profile=user_profile,
            user_birth_year=user_birth_year,
            llm=llm,
            narrative_agent=narrative_agent,
            background_voice=background_voice,
            occupation=occupation,
        )
        dispatch_ids.add(story_id)
    return dispatch_ids


def _retrieve_evidence_text(
    session: Session, user_id: str, query_text: str, *, n_units: int
) -> str:
    """Retrieve memory evidence for ``query_text`` and format it for prompts.

    A larger top-k is used once the batch exceeds
    ``settings.evidence_large_batch_threshold`` segments. Retrieval is
    best-effort: any exception is logged and treated as "no evidence".
    """
    top_k = int(settings.evidence_top_k_default)
    if n_units > int(settings.evidence_large_batch_threshold):
        top_k = int(settings.evidence_top_k_large_batch)
    try:
        evidence = retrieve_evidence_sync(session, user_id, query_text, top_k=top_k)
    except Exception as e:  # deliberate best-effort: evidence only enriches prompts
        # NOTE(review): if this is a DB error the session may need a rollback
        # before the later queries in this pipeline — confirm.
        logger.warning("Evidence 检索跳过: {}", e)
        evidence = {
            "relevant_chunks": [],
            "relevant_summaries": [],
            "relevant_facts": [],
            "timeline_hints": [],
            "relevant_stories": [],
        }
    return format_evidence_chunks_for_prompt(evidence)


def _slot_snippets_for_stage(
    state: MemoirStateSchema, chapter_category: str
) -> dict[str, str]:
    """Collect the non-empty ``snippet`` of each slot of ``chapter_category``.

    Slot values may be objects with a ``snippet`` attribute or plain dicts.
    """
    snippets: dict[str, str] = {}
    for key, value in (state.slots.get(chapter_category, {}) or {}).items():
        snip = getattr(value, "snippet", None) or (
            value.get("snippet") if isinstance(value, dict) else None
        )
        if snip:
            snippets[key] = snip
    return snippets


def run_story_pipeline_for_category_batch(
    session: Session,
    *,
    user_id: str,
    chapter_category: str,
    category_segments: list,
    state: MemoirStateSchema,
    user_profile: str,
    user_birth_year: int | None,
    llm: Any,
    background_voice: str = "default",
    occupation: str = "",
) -> tuple[Chapter | None, bool, set[str]]:
    """Write one category batch of transcript segments into Stories.

    Steps:

    1. Retrieve evidence for the combined transcript.
    2. Resolve (or create) the chapter.
    3. Route the batch to new/append stories. With an LLM and
       2..PLAN_BATCH_MAX_SEGMENTS segments this uses
       ``StoryRouteAgent.plan_batch``; otherwise (or when no plan comes back)
       it uses a single ``decide``.
    4. Generate and persist the story text.
    5. Reorder the chapter's story links and mark the chapter dirty
       (markdown_compose_dirty).

    Returns:
        ``(chapter, needs_cover_enqueue, story_ids_to_dispatch_after_commit)``.
    """
    narrative_agent = NarrativeAgent()
    route_agent = StoryRouteAgent()
    combined_text = "\n\n".join(
        seg.user_input_text or "" for seg in category_segments
    )
    source_ids = [seg.id for seg in category_segments]
    evidence_text = _retrieve_evidence_text(
        session, user_id, combined_text, n_units=len(category_segments)
    )
    stmt_chapter = (
        select(Chapter)
        .where(
            Chapter.user_id == user_id,
            Chapter.category == chapter_category,
            Chapter.is_active == True,  # noqa: E712
        )
        .options(
            joinedload(Chapter.images),
            joinedload(Chapter.story_links),
        )
    )
    existing_chapter = session.execute(stmt_chapter).unique().scalar_one_or_none()
    slot_snippets = _slot_snippets_for_stage(state, chapter_category)
    if existing_chapter:
        title = existing_chapter.title
    else:
        # Fall back to the default title if generation returns nothing.
        title = narrative_agent.generate_title(
            stage=chapter_category,
            emotion="neutral",
            slots=slot_snippets,
            user_profile=user_profile,
            birth_year=user_birth_year,
            llm=llm,
        ) or f"{chapter_category} 回忆"
    # Only stories of the same chapter_category (story.stage) may be append
    # targets; cross-chapter links would make several chapters share content.
    all_stories = list_active_stories_for_user_sync(session, user_id)
    candidates = [s for s in all_stories if s.stage == chapter_category]
    valid_ids = {str(s.id) for s in candidates}
    story_meta = _story_meta_for_route(session, candidates)
    calculated_order_index = STAGE_TO_ORDER.get(chapter_category, 999)

    plan: StoryBatchPlan | None = None
    if llm and 2 <= len(category_segments) <= PLAN_BATCH_MAX_SEGMENTS:
        plan = route_agent.plan_batch(
            chapter_category=chapter_category,
            chapter_title=title,
            segments=_route_segment_texts(category_segments),
            candidate_stories=candidates,
            llm=llm,
            valid_story_ids=valid_ids,
            story_meta=story_meta,
        )
    chapter = _ensure_chapter_record(
        session,
        user_id=user_id,
        chapter_category=chapter_category,
        title=title,
        source_ids=source_ids,
        calculated_order_index=calculated_order_index,
    )
    if plan is not None:
        dispatch_ids = _run_batch_plan_writes(
            session,
            plan=plan,
            category_segments=category_segments,
            chapter=chapter,
            chapter_category=chapter_category,
            evidence_text=evidence_text,
            slot_snippets=slot_snippets,
            user_id=user_id,
            user_profile=user_profile,
            user_birth_year=user_birth_year,
            llm=llm,
            narrative_agent=narrative_agent,
            background_voice=background_voice,
            occupation=occupation,
        )
    else:
        # The combined transcript is normalized only on this path; the batch
        # path normalizes per unit, so doing it up front wasted an LLM call.
        oral_for_memoir, oral_changed = _normalize_oral_logged(
            combined_text, llm, context="category_batch"
        )
        # Routing looks only at this batch's oral transcript; evidence feeds
        # the narrative/merge step, not the new/append decision.
        route = route_agent.decide(
            chapter_category=chapter_category,
            chapter_title=title,
            batch_transcript=oral_for_memoir,
            candidate_stories=candidates,
            llm=llm,
            valid_story_ids=valid_ids,
            story_meta=story_meta,
        )
        started_at = time.perf_counter()
        target_id, existing, forced_new = _resolve_append_target(
            session,
            decision=route.decision,
            target_story_id=route.target_story_id,
            user_id=user_id,
            max_chars=int(settings.story_append_max_canonical_chars),
            max_ver=int(settings.story_append_max_versions),
            log_source="single_decide",
        )
        if forced_new:
            decision_source = "forced_new_due_to_append_limit"
        else:
            decision_source = "single_decide" if llm else "fallback_no_llm"
        story_id = _write_story_unit(
            session,
            route_type="single",
            route_decision=route.decision,
            decision_source=decision_source,
            target_story_id=target_id,
            existing_for_narrative=existing,
            new_story_title=route.new_story_title,
            oral_text=oral_for_memoir,
            oral_normalize_changed=oral_changed,
            unit_segments=len(category_segments),
            started_at=started_at,
            chapter=chapter,
            chapter_category=chapter_category,
            evidence_text=evidence_text,
            slot_snippets=slot_snippets,
            user_id=user_id,
            user_profile=user_profile,
            user_birth_year=user_birth_year,
            llm=llm,
            narrative_agent=narrative_agent,
            background_voice=background_voice,
            occupation=occupation,
        )
        dispatch_ids = {story_id}
    reorder_chapter_story_links_by_life_order_sync(session, str(chapter.id))
    mark_chapter_dirty_sync(session, str(chapter.id))
    session.flush()
    image_settings = MemoirImageSettings.from_env()
    needs_cover = image_settings.enabled and chapter_needs_cover_enqueue(chapter)
    return chapter, needs_cover, dispatch_ids