Files
life-echo/api/app/features/memoir/story_pipeline_sync.py
Kevin a3f61fcc0f feat(api+app): 对话阶段化、回忆录流水线与客户端会话体验
- DB: segments 用户输入文本(Alembic 0002)
- Chat: 阶段检测/阶段提示/回复限制,编排与访谈/画像 prompts 调整
- Memoir: 忠实度检查 agent,叙事与分类等链路更新
- Core: agent 日志、Alembic 启动、LangChain/日志/配置等
- Story: time_hints;Memory 检索与相关测试
- Expo: 助手头像、会话页与消息拆分、实时会话与文案/i18n
- Docs/scripts/tests: 迁移脚本、LLM JSON/记忆检索文档、新增单测
2026-03-26 12:13:36 +08:00

497 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Celery 用:按批次将 transcript 写入 Story并物化 Chapter canonical_markdown。
"""
from __future__ import annotations
import json
import uuid
from typing import Any
from sqlalchemy import select
from sqlalchemy.orm import Session, joinedload
from app.agents.memoir.narrative_agent import NarrativeAgent
from app.agents.memoir.prompts import (
STAGE_TO_ORDER,
format_evidence_chunks_for_prompt,
format_narrative_user_content,
)
from app.core.config import settings
from app.agents.memoir.story_route_agent import (
PLAN_BATCH_MAX_SEGMENTS,
StoryBatchPlan,
StoryRouteAgent,
)
from app.agents.state_schema import MemoirStateSchema
from app.core.logging import get_logger
from app.features.memoir.cover_eligibility import chapter_needs_cover_enqueue
from app.features.memoir.helpers import _chapter_markdown
from app.features.memoir.memoir_images.settings import MemoirImageSettings
from app.features.memoir.models import Chapter
from app.features.memoir.narrative_to_markdown import narrative_to_markdown
from app.features.memoir.repo import (
compose_chapter_from_story_links_sync,
reorder_chapter_story_links_by_life_order_sync,
)
from app.features.memory.repo import retrieve_evidence_sync
from app.features.story.models import Story
from app.features.story.sync_write import (
append_story_version_sync,
create_story_with_version_sync,
ensure_chapter_story_link_sync,
list_active_stories_for_user_sync,
)
logger = get_logger(__name__)
def _gate_narrative_fidelity(oral_text: str, narrative_raw: str, llm: Any) -> str:
    """Run the fidelity check on a narrative JSON and fall back to the oral text on failure.

    Args:
        oral_text: The user's oral transcript the narrative was generated from.
        narrative_raw: Raw narrative output to be checked.
        llm: LLM handle forwarded to the fidelity agent; a falsy value disables the gate.

    Returns:
        ``narrative_raw`` unchanged when the gate is disabled, when the check
        passes, or when there is no oral text to fall back to. Otherwise a JSON
        string holding a single paragraph with the stripped oral text
        (truncated to 15000 characters).
    """
    # Imported inside the function, as in the original layout of this module.
    from app.agents.memoir.fidelity_check_agent import FidelityCheckAgent

    gate_active = settings.memoir_fidelity_check_enabled and llm
    if not gate_active:
        return narrative_raw

    checker = FidelityCheckAgent()
    if checker.passes(oral_text=oral_text, narrative_json=narrative_raw, llm=llm):
        return narrative_raw

    stripped_oral = (oral_text or "").strip()
    logger.warning(
        "event=fidelity_gate_fallback oral_len={}",
        len(stripped_oral),
    )
    if not stripped_oral:
        # Nothing to fall back to; keep whatever the model produced.
        return narrative_raw

    fallback_payload = {"paragraphs": [{"content": stripped_oral[:15000]}]}
    return json.dumps(fallback_payload, ensure_ascii=False)
def _should_fallback_to_transcript(md: str, oral: str) -> bool:
"""模型输出相对口述明显过短时回退为口述原文防「1999」类压缩"""
o = (oral or "").strip()
if not o:
return False
m = (md or "").strip()
if not m:
return True
if len(o) < 12:
return len(m) < len(o)
ratio = float(settings.memoir_narrative_fallback_body_ratio)
min_abs = int(settings.memoir_narrative_fallback_min_chars)
threshold = max(min_abs, int(len(o) * ratio))
return len(m) < threshold
def _is_json_narrative(text: str) -> bool:
if not text or not text.strip():
return False
s = text.strip()
return s.startswith("{") and "paragraphs" in s
def _ordered_text_for_segment_ids(
category_segments: list, segment_ids: list[str]
) -> str:
id_to_text = {seg.id: (seg.user_input_text or "") for seg in category_segments}
return "\n\n".join(id_to_text.get(sid, "") for sid in segment_ids)
def _apply_narrative_fallbacks(
    narrative_raw: str,
    combined_unit_text: str,
    existing_for_narrative: str,
    existing_chapter_md: str,
    *,
    chapter_category: str,
) -> str:
    """Replace suspiciously short model output with a safer fallback text.

    Checks, in order:

    1. Merging into an existing story with JSON output: if the existing text
       is longer than 400 characters and the rendered markdown is under 35% of
       it, return the existing text followed by the stripped oral text.
    2. Merging into an existing story with non-JSON output shorter than 80% of
       the existing text: return the existing text followed by the oral text.
    3. No existing story text, but existing chapter markdown, and non-JSON
       output shorter than 80% of that markdown: return the chapter markdown
       followed by the oral text.
    4. Rendered markdown too short relative to the oral text (per
       ``_should_fallback_to_transcript``): return the stripped oral text.

    Args:
        narrative_raw: Raw narrative output from the model.
        combined_unit_text: Oral text for the unit being written.
        existing_for_narrative: Current markdown of the story being appended
            to, or an empty string.
        existing_chapter_md: Current chapter markdown, or an empty string.
        chapter_category: Used only for log messages.

    Returns:
        ``narrative_raw`` when no fallback applies, otherwise the fallback text.
    """
    if existing_for_narrative:
        if _is_json_narrative(narrative_raw):
            # Whole-story merge (JSON): if the output shrank abnormally, keep
            # the old text plus this unit's oral text so nothing is overwritten.
            merged_md = narrative_to_markdown(narrative_raw).strip()
            previous = (existing_for_narrative or "").strip()
            if (
                previous
                and len(previous) > 400
                and len(merged_md) < len(previous) * 0.35
            ):
                logger.warning(
                    "event=narrative_fallback reason=merge_shrink action=append_oral "
                    "chapter_category={}",
                    chapter_category,
                )
                return f"{previous}\n\n{combined_unit_text.strip()}"
        elif len(narrative_raw) < len(existing_for_narrative) * 0.8:
            logger.warning(
                "event=narrative_fallback reason=length_anomaly action=append_raw "
                "chapter_category={}",
                chapter_category,
            )
            return f"{existing_for_narrative}\n\n{combined_unit_text}"
    elif (
        existing_chapter_md
        and not _is_json_narrative(narrative_raw)
        and len(narrative_raw) < len(existing_chapter_md) * 0.8
    ):
        logger.warning(
            "event=narrative_fallback reason=chapter_length_anomaly action=append_transcript "
            "chapter_category={}",
            chapter_category,
        )
        return f"{existing_chapter_md}\n\n{combined_unit_text}"

    rendered = narrative_to_markdown(narrative_raw).strip()
    oral_stripped = (combined_unit_text or "").strip()
    if not oral_stripped:
        return narrative_raw
    if not _should_fallback_to_transcript(rendered, oral_stripped):
        return narrative_raw

    logger.warning(
        "event=narrative_fallback reason=body_too_short_vs_oral "
        "chapter_category={} oral_len={} md_len={}",
        chapter_category,
        len(oral_stripped),
        len(rendered),
    )
    return oral_stripped
def _ensure_chapter_record(
    session: Session,
    *,
    user_id: str,
    chapter_category: str,
    title: str,
    source_ids: list[str],
    calculated_order_index: int,
) -> Chapter:
    """Fetch the user's active chapter for a category, creating it if missing.

    An existing chapter gets ``source_ids`` merged into ``source_segments`` and
    is flagged ``is_new``. A missing chapter is created with status
    ``"completed"``. The session is flushed in both cases.

    Args:
        session: Synchronous SQLAlchemy session.
        user_id: Owner of the chapter.
        chapter_category: Category used to look the chapter up.
        title: Title used only when a new chapter is created.
        source_ids: Segment IDs contributing to this batch.
        calculated_order_index: ``order_index`` used only on creation.

    Returns:
        The existing or newly created ``Chapter``.

    Raises:
        sqlalchemy.exc.MultipleResultsFound: Raised by ``scalar_one_or_none``
            when more than one active chapter matches.
    """
    stmt_chapter = (
        select(Chapter)
        .where(
            Chapter.user_id == user_id,
            Chapter.category == chapter_category,
            Chapter.is_active == True,  # noqa: E712
        )
        .options(
            joinedload(Chapter.images),
            joinedload(Chapter.story_links),
        )
    )
    # ``unique()`` is needed because collections are joined-eager-loaded.
    chapter = session.execute(stmt_chapter).unique().scalar_one_or_none()

    if chapter is None:
        chapter = Chapter(
            id=str(uuid.uuid4()),
            user_id=user_id,
            title=title,
            order_index=calculated_order_index,
            status="completed",
            category=chapter_category,
            is_new=True,
            # Copy so the model does not alias the caller's list.
            source_segments=list(source_ids),
        )
        session.add(chapter)
    else:
        # Order-preserving de-duplication: ``set`` would scramble the segment
        # order (and differently across processes, due to hash randomisation).
        merged_ids = [*(chapter.source_segments or []), *source_ids]
        chapter.source_segments = list(dict.fromkeys(merged_ids))
        chapter.is_new = True

    session.flush()
    return chapter
def _run_batch_plan_writes(
    session: Session,
    *,
    plan: StoryBatchPlan,
    category_segments: list,
    chapter: Chapter,
    chapter_category: str,
    evidence_text: str,
    existing_chapter_md: str,
    slot_snippets: dict[str, str],
    user_id: str,
    user_profile: str,
    user_birth_year: int | None,
    llm: Any,
    narrative_agent: NarrativeAgent,
) -> set[str]:
    """Write one story version per unit of a batch plan and link it to the chapter.

    For each unit in ``plan.units`` this generates a narrative from the unit's
    segments, passes it through the fidelity gate and the length fallbacks,
    then either appends a version to the targeted story or creates a new story.

    Args:
        session: Synchronous SQLAlchemy session.
        plan: Batch plan whose ``units`` carry ``segment_ids``, ``decision``,
            ``target_story_id`` and ``new_story_title``.
        category_segments: Segments of this batch; looked up by ``id``.
        chapter: Chapter the resulting stories are linked to.
        chapter_category: Stage/category passed to the narrative agent.
        evidence_text: Pre-formatted evidence included in the prompt content.
        existing_chapter_md: Current chapter markdown, used by the fallbacks.
        slot_snippets: Slot snippets forwarded to the narrative agent.
        user_id: Owner; an append target must belong to this user.
        user_profile: Forwarded to the narrative agent.
        user_birth_year: Forwarded to the narrative agent.
        llm: LLM handle forwarded to the agents.
        narrative_agent: Agent used to generate narratives and titles.

    Returns:
        IDs of every story that was appended to or created.
    """
    touched_story_ids: set[str] = set()

    for unit in plan.units:
        oral = _ordered_text_for_segment_ids(category_segments, unit.segment_ids)
        prompt_content = format_narrative_user_content(oral, evidence_text)

        # Only honour an append decision if the target exists and is owned by
        # this user; otherwise the unit becomes a new story.
        append_to: str | None = None
        prior_markdown = ""
        if unit.decision == "append_story" and unit.target_story_id:
            candidate = session.get(Story, unit.target_story_id)
            if candidate and candidate.user_id == user_id:
                append_to = candidate.id
                prior_markdown = (candidate.canonical_markdown or "").strip()

        draft = narrative_agent.generate_narrative(
            stage=chapter_category,
            slots=slot_snippets,
            new_content=prompt_content,
            existing_content=prior_markdown,
            user_profile=user_profile,
            birth_year=user_birth_year,
            llm=llm,
        )
        draft = _gate_narrative_fidelity(oral, draft, llm)
        draft = _apply_narrative_fallbacks(
            draft,
            oral,
            prior_markdown,
            existing_chapter_md,
            chapter_category=chapter_category,
        )

        oral_stripped = oral.strip()
        body = narrative_to_markdown(draft).strip()
        # Blank or over-compressed output: store the oral text instead.
        if not body or _should_fallback_to_transcript(body, oral_stripped):
            body = oral_stripped

        if append_to:
            append_story_version_sync(session, append_to, body)
            story_id = append_to
        else:
            new_title = (unit.new_story_title or "").strip()
            if not new_title:
                new_title = narrative_agent.generate_title(
                    stage=chapter_category,
                    emotion="neutral",
                    slots=slot_snippets,
                    user_profile=user_profile,
                    birth_year=user_birth_year,
                    llm=llm,
                )
            created = create_story_with_version_sync(
                session,
                user_id=user_id,
                title=new_title,
                canonical_markdown=body,
                stage=chapter_category,
            )
            story_id = created.id

        touched_story_ids.add(story_id)
        ensure_chapter_story_link_sync(
            session, chapter_id=chapter.id, story_id=story_id
        )

    return touched_story_ids
def _retrieve_evidence_or_empty(session: Session, user_id: str, query_text: str) -> Any:
    """Retrieve memory evidence for the batch, or an empty structure on failure."""
    try:
        return retrieve_evidence_sync(session, user_id, query_text, top_k=10)
    except Exception as e:
        # Best-effort: a retrieval failure is logged and the batch proceeds
        # without evidence.
        logger.warning("Evidence 检索跳过: {}", e)
        return {
            "relevant_chunks": [],
            "relevant_summaries": [],
            "relevant_facts": [],
            "timeline_hints": [],
            "relevant_stories": [],
        }


def _collect_slot_snippets(
    state: MemoirStateSchema, chapter_category: str
) -> dict[str, str]:
    """Return the non-empty ``snippet`` of each slot stored for the category."""
    stage_slots = state.slots.get(chapter_category, {}) or {}
    snippets: dict[str, str] = {}
    for key, value in stage_slots.items():
        # A slot may be an object exposing ``.snippet`` or a plain dict.
        snippet = getattr(value, "snippet", None) or (
            value.get("snippet") if isinstance(value, dict) else None
        )
        if snippet:
            snippets[key] = snippet
    return snippets


def _write_single_route_story(
    session: Session,
    *,
    route: Any,
    chapter: Chapter,
    chapter_category: str,
    combined_text: str,
    new_content_input: str,
    existing_chapter_md: str,
    slot_snippets: dict[str, str],
    user_id: str,
    user_profile: str,
    user_birth_year: int | None,
    llm: Any,
    narrative_agent: NarrativeAgent,
) -> set[str]:
    """Write the whole batch as one story version according to a single route decision."""
    touched_story_ids: set[str] = set()

    # Only honour an append decision if the target exists and is owned by this user.
    target_story_id: str | None = None
    existing_for_narrative = ""
    if route.decision == "append_story" and route.target_story_id:
        target = session.get(Story, route.target_story_id)
        if target and target.user_id == user_id:
            target_story_id = target.id
            existing_for_narrative = (target.canonical_markdown or "").strip()

    narrative_raw = narrative_agent.generate_narrative(
        stage=chapter_category,
        slots=slot_snippets,
        new_content=new_content_input,
        existing_content=existing_for_narrative,
        user_profile=user_profile,
        birth_year=user_birth_year,
        llm=llm,
    )
    narrative_raw = _gate_narrative_fidelity(combined_text, narrative_raw, llm)
    narrative_raw = _apply_narrative_fallbacks(
        narrative_raw,
        combined_text,
        existing_for_narrative,
        existing_chapter_md,
        chapter_category=chapter_category,
    )

    oral = combined_text.strip()
    md = narrative_to_markdown(narrative_raw).strip()
    # Blank or over-compressed output: store the oral text instead.
    if not md or _should_fallback_to_transcript(md, oral):
        md = oral

    if target_story_id is not None:
        append_story_version_sync(session, target_story_id, md)
        story_id = target_story_id
    else:
        story_title = (route.new_story_title or "").strip()
        if not story_title:
            story_title = narrative_agent.generate_title(
                stage=chapter_category,
                emotion="neutral",
                slots=slot_snippets,
                user_profile=user_profile,
                birth_year=user_birth_year,
                llm=llm,
            )
        created = create_story_with_version_sync(
            session,
            user_id=user_id,
            title=story_title,
            canonical_markdown=md,
            stage=chapter_category,
        )
        story_id = created.id

    touched_story_ids.add(story_id)
    ensure_chapter_story_link_sync(
        session, chapter_id=chapter.id, story_id=story_id
    )
    return touched_story_ids


def run_story_pipeline_for_category_batch(
    session: Session,
    *,
    user_id: str,
    chapter_category: str,
    category_segments: list,
    state: MemoirStateSchema,
    user_profile: str,
    user_birth_year: int | None,
    llm: Any,
) -> tuple[Chapter | None, bool, set[str]]:
    """Turn one category's batch of segments into story versions and recompose the chapter.

    Steps: retrieve evidence (best-effort), load the active chapter, pick a
    title, route the batch (a multi-unit plan when an LLM is available and the
    batch has between 2 and ``PLAN_BATCH_MAX_SEGMENTS`` segments, otherwise a
    single route decision), write the story versions, then reorder the
    chapter's story links and recompose the chapter from them.

    Args:
        session: Synchronous SQLAlchemy session; flushed but not committed here.
        user_id: Owner of the chapter and stories.
        chapter_category: Category/stage of the batch.
        category_segments: Segments exposing ``id`` and ``user_input_text``.
        state: Memoir state; ``state.slots[chapter_category]`` supplies snippets.
        user_profile: Forwarded to the narrative agent.
        user_birth_year: Forwarded to the narrative agent.
        llm: LLM handle forwarded to the agents; a falsy value disables batch
            planning.

    Returns:
        ``(chapter, needs_cover_enqueue, story_ids_to_dispatch_after_commit)``.
    """
    narrative_agent = NarrativeAgent()
    route_agent = StoryRouteAgent()

    combined_text = "\n\n".join(
        seg.user_input_text or "" for seg in category_segments
    )
    source_ids = [seg.id for seg in category_segments]

    evidence = _retrieve_evidence_or_empty(session, user_id, combined_text)
    evidence_text = format_evidence_chunks_for_prompt(evidence)
    new_content_input = format_narrative_user_content(combined_text, evidence_text)

    stmt_chapter = (
        select(Chapter)
        .where(
            Chapter.user_id == user_id,
            Chapter.category == chapter_category,
            Chapter.is_active == True,  # noqa: E712
        )
        .options(
            joinedload(Chapter.images),
            joinedload(Chapter.story_links),
        )
    )
    chapter = session.execute(stmt_chapter).unique().scalar_one_or_none()

    slot_snippets = _collect_slot_snippets(state, chapter_category)

    if chapter:
        title = chapter.title
        existing_chapter_md = _chapter_markdown(chapter)
    else:
        existing_chapter_md = ""
        generated_title = narrative_agent.generate_title(
            stage=chapter_category,
            emotion="neutral",
            slots=slot_snippets,
            user_profile=user_profile,
            birth_year=user_birth_year,
            llm=llm,
        )
        # Use the placeholder title when the agent returns a blank title;
        # previously the placeholder was always overwritten.
        if (generated_title or "").strip():
            title = generated_title
        else:
            title = f"{chapter_category} 回忆"

    candidates = list_active_stories_for_user_sync(session, user_id)
    valid_ids = {s.id for s in candidates}
    calculated_order_index = STAGE_TO_ORDER.get(chapter_category, 999)

    use_batch_plan = bool(llm) and (
        2 <= len(category_segments) <= PLAN_BATCH_MAX_SEGMENTS
    )
    plan: StoryBatchPlan | None = None
    if use_batch_plan:
        plan = route_agent.plan_batch(
            chapter_category=chapter_category,
            chapter_title=title,
            segments=[
                (seg.id, seg.user_input_text or "") for seg in category_segments
            ],
            candidate_stories=candidates,
            llm=llm,
            valid_story_ids=valid_ids,
        )

    chapter = _ensure_chapter_record(
        session,
        user_id=user_id,
        chapter_category=chapter_category,
        title=title,
        source_ids=source_ids,
        calculated_order_index=calculated_order_index,
    )

    if plan is not None:
        dispatch_ids = _run_batch_plan_writes(
            session,
            plan=plan,
            category_segments=category_segments,
            chapter=chapter,
            chapter_category=chapter_category,
            evidence_text=evidence_text,
            existing_chapter_md=existing_chapter_md,
            slot_snippets=slot_snippets,
            user_id=user_id,
            user_profile=user_profile,
            user_birth_year=user_birth_year,
            llm=llm,
            narrative_agent=narrative_agent,
        )
    else:
        # No plan (batch planning skipped, or the planner returned None):
        # route the whole batch with a single decision.
        batch_for_route = (
            f"{combined_text}\n\n{evidence_text}"
            if evidence_text.strip()
            else combined_text
        )
        route = route_agent.decide(
            chapter_category=chapter_category,
            chapter_title=title,
            batch_transcript=batch_for_route,
            candidate_stories=candidates,
            llm=llm,
            valid_story_ids=valid_ids,
        )
        dispatch_ids = _write_single_route_story(
            session,
            route=route,
            chapter=chapter,
            chapter_category=chapter_category,
            combined_text=combined_text,
            new_content_input=new_content_input,
            existing_chapter_md=existing_chapter_md,
            slot_snippets=slot_snippets,
            user_id=user_id,
            user_profile=user_profile,
            user_birth_year=user_birth_year,
            llm=llm,
            narrative_agent=narrative_agent,
        )

    reorder_chapter_story_links_by_life_order_sync(session, chapter.id)
    compose_chapter_from_story_links_sync(session, chapter.id)
    session.flush()

    image_settings = MemoirImageSettings.from_env()
    # Coerce so the declared ``bool`` return type holds regardless of what the
    # settings flag or eligibility helper return.
    needs_cover = bool(
        image_settings.enabled and chapter_needs_cover_enqueue(chapter)
    )
    return chapter, needs_cover, dispatch_ids