""" Celery 用:按批次将 transcript 写入 Story,并标记 Chapter 需物化(markdown_compose_dirty)。 同步路径不执行 compose;物化由 commit 后 `recompose_chapter` 异步完成。 """ from __future__ import annotations import json import time import uuid from typing import Any from sqlalchemy import select from sqlalchemy.orm import Session, joinedload from app.agents.memoir.narrative_agent import NarrativeAgent from app.agents.memoir.prompts import ( STAGE_TO_ORDER, format_evidence_chunks_for_prompt, format_narrative_user_content, ) from app.agents.memoir.story_route_agent import ( PLAN_BATCH_MAX_SEGMENTS, StoryBatchPlan, StoryRouteAgent, ) from app.agents.state_schema import MemoirStateSchema from app.core.config import settings from app.core.logging import get_logger from app.features.memoir.cover_eligibility import chapter_needs_cover_enqueue from app.features.memoir.memoir_images.settings import MemoirImageSettings from app.features.memoir.models import Chapter from app.features.memoir.narrative_to_markdown import narrative_to_markdown from app.features.memoir.oral_normalize import ( apply_oral_rules, normalize_oral_for_memoir, ) from app.features.memoir.repo import ( mark_chapter_dirty_sync, reorder_chapter_story_links_by_life_order_sync, ) from app.features.memory.repo import retrieve_evidence_sync from app.features.story.models import Story from app.features.story.sync_write import ( append_story_version_sync, count_story_versions_sync, create_story_with_version_sync, ensure_chapter_story_link_sync, list_active_stories_for_user_sync, ) logger = get_logger(__name__) def _route_segment_texts(category_segments: list) -> list[tuple[str, str]]: """批量路由 plan_batch:每段仅做规则归一,避免 N 次 LLM。""" out: list[tuple[str, str]] = [] for seg in category_segments: raw = seg.user_input_text or "" if ( settings.memoir_oral_normalize_enabled and (settings.memoir_oral_normalize_mode or "rules").strip().lower() != "off" ): t = apply_oral_rules(raw) else: t = raw out.append((str(seg.id), t)) return out def _fidelity_fallback_json(oral: str, existing_canonical: str | None) -> str: """忠实度未通过时的安全回退:续写场景保留旧文 + 本段口述,避免只剩一句。""" o = (oral or "").strip()[:15000] ex = (existing_canonical or "").strip()[:15000] if ex and o: return json.dumps( {"paragraphs": [{"content": ex}, {"content": o}]}, ensure_ascii=False, ) if ex: return json.dumps( {"paragraphs": [{"content": ex}]}, ensure_ascii=False, ) return json.dumps( {"paragraphs": [{"content": o}]}, ensure_ascii=False, ) def _gate_narrative_fidelity( oral_text: str, narrative_raw: str, llm: Any, *, existing_canonical: str | None = None, ) -> tuple[str, str]: """返回 (文本, fallback 原因);忠实度不通过时第二项为 fidelity_failed。""" from app.agents.memoir.fidelity_check_agent import FidelityCheckAgent if not settings.memoir_fidelity_check_enabled or not llm: return narrative_raw, "none" agent = FidelityCheckAgent() ex = (existing_canonical or "").strip() or None if agent.passes( oral_text=oral_text, narrative_json=narrative_raw, llm=llm, existing_canonical_markdown=ex, ): return narrative_raw, "none" logger.warning( "event=fidelity_gate_fallback oral_len={} merge={}", len((oral_text or "").strip()), bool(ex), ) o = (oral_text or "").strip() if not o and not ex: return narrative_raw, "none" return _fidelity_fallback_json(o, ex), "fidelity_failed" def _should_fallback_to_transcript(md: str, oral: str) -> bool: """模型输出相对口述极度过短时才回退(仅防极端压缩如「1999」)。""" o = (oral or "").strip() if not o: return False m = (md or "").strip() if not m: return True if len(o) < 12: return len(m) < len(o) ratio = float(settings.memoir_narrative_fallback_body_ratio) min_abs = int(settings.memoir_narrative_fallback_min_chars) threshold = max(min_abs, int(len(o) * ratio)) return len(m) < threshold def _coalesce_story_markdown( md: str, oral: str, existing_for_narrative: str, ) -> str: """落库前对齐正文:空输出或过短回退时,续写场景保留「已有故事 + 本段口述」。""" o = (oral or "").strip() ex = (existing_for_narrative or "").strip() m = (md or "").strip() if not m: if ex and o: return f"{ex}\n\n{o}" if o: return o return ex if o and _should_fallback_to_transcript(m, o): if ex: return f"{ex}\n\n{o}" return o return m def _is_json_narrative(text: str) -> bool: if not text or not text.strip(): return False s = text.strip() return s.startswith("{") and "paragraphs" in s def _ordered_text_for_segment_ids( category_segments: list, segment_ids: list[str] ) -> str: id_to_text = {seg.id: (seg.user_input_text or "") for seg in category_segments} return "\n\n".join(id_to_text.get(sid, "") for sid in segment_ids) def _apply_narrative_fallbacks( narrative_raw: str, combined_unit_text: str, existing_for_narrative: str, *, chapter_category: str, ) -> tuple[str, str]: """返回 (文本, fallback_type);无改写时为 none。""" # 整篇合并(JSON)输出异常缩水:回退为旧文 + 本段口述,避免覆盖丢失 if existing_for_narrative and _is_json_narrative(narrative_raw): merged_md = narrative_to_markdown(narrative_raw).strip() ex = (existing_for_narrative or "").strip() if ex and len(ex) > 400 and len(merged_md) < len(ex) * 0.25: logger.warning( "event=narrative_fallback reason=merge_shrink action=append_oral " "chapter_category={}", chapter_category, ) return f"{ex}\n\n{combined_unit_text.strip()}", "merge_shrink" if ( existing_for_narrative and not _is_json_narrative(narrative_raw) and len(narrative_raw) < len(existing_for_narrative) * 0.5 ): logger.warning( "event=narrative_fallback reason=length_anomaly action=append_raw " "chapter_category={}", chapter_category, ) return ( f"{existing_for_narrative}\n\n{combined_unit_text}", "coalesce_to_old_plus_oral", ) md_check = narrative_to_markdown(narrative_raw).strip() oral = (combined_unit_text or "").strip() ex_fb = (existing_for_narrative or "").strip() if oral and _should_fallback_to_transcript(md_check, oral): if ex_fb: logger.warning( "event=narrative_fallback reason=body_too_short_vs_oral_merge " "chapter_category={} oral_len={} md_len={}", chapter_category, len(oral), len(md_check), ) return f"{ex_fb}\n\n{oral}", "coalesce_to_old_plus_oral" logger.warning( "event=narrative_fallback reason=body_too_short_vs_oral " "chapter_category={} oral_len={} md_len={}", chapter_category, len(oral), len(md_check), ) return oral, "coalesce_to_oral" return narrative_raw, "none" def _merge_fallback_type(gate_ft: str, apply_ft: str) -> str: if apply_ft != "none": return apply_ft return gate_ft def _story_meta_for_route( session: Session, candidates: list ) -> dict[str, dict[str, int]]: meta: dict[str, dict[str, int]] = {} for s in candidates: sid = str(s.id) meta[sid] = { "char_count": len((s.canonical_markdown or "").strip()), "version_count": count_story_versions_sync(session, sid), } return meta def _ensure_chapter_record( session: Session, *, user_id: str, chapter_category: str, title: str, source_ids: list[str], calculated_order_index: int, ) -> Chapter: stmt_chapter = ( select(Chapter) .where( Chapter.user_id == user_id, Chapter.category == chapter_category, Chapter.is_active == True, # noqa: E712 ) .options( joinedload(Chapter.images), joinedload(Chapter.story_links), ) ) chapter = session.execute(stmt_chapter).unique().scalar_one_or_none() if not chapter: chapter = Chapter( id=str(uuid.uuid4()), user_id=user_id, title=title, order_index=calculated_order_index, status="completed", category=chapter_category, is_new=True, source_segments=source_ids, ) session.add(chapter) session.flush() else: chapter.source_segments = list( set((chapter.source_segments or []) + source_ids) ) chapter.is_new = True session.flush() return chapter def _run_batch_plan_writes( session: Session, *, plan: StoryBatchPlan, category_segments: list, chapter: Chapter, chapter_category: str, evidence_text: str, slot_snippets: dict[str, str], user_id: str, user_profile: str, user_birth_year: int | None, llm: Any, narrative_agent: NarrativeAgent, background_voice: str = "default", occupation: str = "", ) -> set[str]: dispatch_ids: set[str] = set() max_chars = int(settings.story_append_max_canonical_chars) max_ver = int(settings.story_append_max_versions) for unit in plan.units: t0 = time.perf_counter() unit_text = _ordered_text_for_segment_ids(category_segments, unit.segment_ids) oral_unit = normalize_oral_for_memoir(unit_text, llm=llm) ut_raw = (unit_text or "").strip() ut_norm = (oral_unit or "").strip() if ut_raw != ut_norm: logger.info( "event=oral_normalized context=batch_unit raw_len={} norm_len={}", len(ut_raw), len(ut_norm), ) new_content_input = format_narrative_user_content(oral_unit, evidence_text) target_story_id: str | None = None existing_for_narrative = "" decision_source = "batch_plan" if unit.decision == "append_story" and unit.target_story_id: st = session.get(Story, unit.target_story_id) if st and st.user_id == user_id: canon = (st.canonical_markdown or "").strip() vc = count_story_versions_sync(session, str(st.id)) if len(canon) > max_chars or vc >= max_ver: logger.info( "event=append_overflow_to_new story_id={} canonical_chars={} " "versions={} decision_source=batch_plan", str(st.id), len(canon), vc, ) target_story_id = None existing_for_narrative = "" decision_source = "forced_new_due_to_append_limit" else: target_story_id = st.id existing_for_narrative = canon raw_gen = narrative_agent.generate_narrative( stage=chapter_category, slots=slot_snippets, new_content=new_content_input, existing_content=existing_for_narrative, user_profile=user_profile, birth_year=user_birth_year, llm=llm, background_voice=background_voice, occupation=occupation, ) json_invalid = False s0 = (raw_gen or "").strip() if s0.startswith("{") and "paragraphs" in s0: try: json.loads(s0) except json.JSONDecodeError: json_invalid = True narrative_raw, fb_gate = _gate_narrative_fidelity( oral_unit, raw_gen, llm, existing_canonical=existing_for_narrative or None, ) narrative_raw, fb_apply = _apply_narrative_fallbacks( narrative_raw, oral_unit, existing_for_narrative, chapter_category=chapter_category, ) fallback_type = _merge_fallback_type(fb_gate, fb_apply) if json_invalid and fallback_type == "none": fallback_type = "json_invalid" md = _coalesce_story_markdown( narrative_to_markdown(narrative_raw).strip(), oral_unit.strip(), existing_for_narrative or "", ) if target_story_id: append_story_version_sync(session, str(target_story_id), md) dispatch_ids.add(str(target_story_id)) ensure_chapter_story_link_sync( session, chapter_id=str(chapter.id), story_id=str(target_story_id) ) sid_log = target_story_id is_append = True else: story_title = (unit.new_story_title or "").strip() if not story_title: story_title = narrative_agent.generate_title( stage=chapter_category, emotion="neutral", slots=slot_snippets, user_profile=user_profile, birth_year=user_birth_year, llm=llm, ) st = create_story_with_version_sync( session, user_id=user_id, title=story_title, canonical_markdown=md, stage=chapter_category, ) dispatch_ids.add(str(st.id)) ensure_chapter_story_link_sync( session, chapter_id=str(chapter.id), story_id=str(st.id) ) sid_log = st.id is_append = False elapsed = time.perf_counter() - t0 logger.info( "event=story_generated route_type=batch decision_source={} route_decision={} " "unit_segments={} used_evidence={} narrative_json_valid={} fidelity_passed={} " "fallback_type={} oral_len={} md_len={} chapter_category={} is_append={} " "story_id={} seconds={:.3f} oral_normalize_changed={}", decision_source, unit.decision, len(unit.segment_ids), bool(evidence_text.strip()), _is_json_narrative(raw_gen), fb_gate == "none", fallback_type, len(ut_norm), len(md.strip()), chapter_category, is_append, sid_log, elapsed, ut_raw != ut_norm, ) return dispatch_ids def run_story_pipeline_for_category_batch( session: Session, *, user_id: str, chapter_category: str, category_segments: list, state: MemoirStateSchema, user_profile: str, user_birth_year: int | None, llm: Any, background_voice: str = "default", occupation: str = "", ) -> tuple[Chapter | None, bool, set[str]]: """ 返回 (chapter, needs_cover_enqueue, story_ids_to_dispatch_after_commit)。 """ narrative_agent = NarrativeAgent() route_agent = StoryRouteAgent() dispatch_ids: set[str] = set() segment_texts = [seg.user_input_text or "" for seg in category_segments] combined_text = "\n\n".join(segment_texts) source_ids = [seg.id for seg in category_segments] n_units = len(category_segments) top_k = int(settings.evidence_top_k_default) if n_units > int(settings.evidence_large_batch_threshold): top_k = int(settings.evidence_top_k_large_batch) try: evidence = retrieve_evidence_sync(session, user_id, combined_text, top_k=top_k) except Exception as e: logger.warning("Evidence 检索跳过: {}", e) evidence = { "relevant_chunks": [], "relevant_summaries": [], "relevant_facts": [], "timeline_hints": [], "relevant_stories": [], } evidence_text = format_evidence_chunks_for_prompt(evidence) oral_for_memoir = normalize_oral_for_memoir(combined_text, llm=llm) ct_raw = (combined_text or "").strip() om_norm = (oral_for_memoir or "").strip() if ct_raw != om_norm: logger.info( "event=oral_normalized context=category_batch raw_len={} norm_len={}", len(ct_raw), len(om_norm), ) new_content_input = format_narrative_user_content(oral_for_memoir, evidence_text) stmt_chapter = ( select(Chapter) .where( Chapter.user_id == user_id, Chapter.category == chapter_category, Chapter.is_active == True, # noqa: E712 ) .options( joinedload(Chapter.images), joinedload(Chapter.story_links), ) ) chapter = session.execute(stmt_chapter).unique().scalar_one_or_none() slot_snippets: dict[str, str] = {} stage_slots = state.slots.get(chapter_category, {}) or {} for key, value in stage_slots.items(): snip = getattr(value, "snippet", None) or ( value.get("snippet") if isinstance(value, dict) else None ) if snip: slot_snippets[key] = snip title = chapter.title if chapter else f"{chapter_category} 回忆" if not chapter: title = narrative_agent.generate_title( stage=chapter_category, emotion="neutral", slots=slot_snippets, user_profile=user_profile, birth_year=user_birth_year, llm=llm, ) # 仅同 chapter_category(story.stage)的 Story 可作为 append 候选,避免跨章节链接导致多章内容相同 all_stories = list_active_stories_for_user_sync(session, user_id) candidates = [s for s in all_stories if s.stage == chapter_category] valid_ids = {str(s.id) for s in candidates} story_meta = _story_meta_for_route(session, candidates) # Story route 仅依据本批用户口述;evidence 只进入叙事/合并,不参与 new/append 判定。 route_transcript = oral_for_memoir calculated_order_index = STAGE_TO_ORDER.get(chapter_category, 999) use_batch_plan = ( llm and len(category_segments) >= 2 and len(category_segments) <= PLAN_BATCH_MAX_SEGMENTS ) plan: StoryBatchPlan | None = None if use_batch_plan: segs = _route_segment_texts(category_segments) plan = route_agent.plan_batch( chapter_category=chapter_category, chapter_title=title, segments=segs, candidate_stories=candidates, llm=llm, valid_story_ids=valid_ids, story_meta=story_meta, ) chapter = _ensure_chapter_record( session, user_id=user_id, chapter_category=chapter_category, title=title, source_ids=source_ids, calculated_order_index=calculated_order_index, ) if plan is not None: dispatch_ids = _run_batch_plan_writes( session, plan=plan, category_segments=category_segments, chapter=chapter, chapter_category=chapter_category, evidence_text=evidence_text, slot_snippets=slot_snippets, user_id=user_id, user_profile=user_profile, user_birth_year=user_birth_year, llm=llm, narrative_agent=narrative_agent, background_voice=background_voice, occupation=occupation, ) else: route = route_agent.decide( chapter_category=chapter_category, chapter_title=title, batch_transcript=route_transcript, candidate_stories=candidates, llm=llm, valid_story_ids=valid_ids, story_meta=story_meta, ) t0 = time.perf_counter() target_story_id: str | None = None existing_for_narrative = "" decision_source = "fallback_no_llm" if not llm else "single_decide" max_chars = int(settings.story_append_max_canonical_chars) max_ver = int(settings.story_append_max_versions) if route.decision == "append_story" and route.target_story_id: st = session.get(Story, route.target_story_id) if st and st.user_id == user_id: canon = (st.canonical_markdown or "").strip() vc = count_story_versions_sync(session, str(st.id)) if len(canon) > max_chars or vc >= max_ver: logger.info( "event=append_overflow_to_new story_id={} canonical_chars={} " "versions={} decision_source=single_decide", str(st.id), len(canon), vc, ) target_story_id = None existing_for_narrative = "" decision_source = "forced_new_due_to_append_limit" else: target_story_id = st.id existing_for_narrative = canon raw_gen = narrative_agent.generate_narrative( stage=chapter_category, slots=slot_snippets, new_content=new_content_input, existing_content=existing_for_narrative, user_profile=user_profile, birth_year=user_birth_year, llm=llm, background_voice=background_voice, occupation=occupation, ) json_invalid = False s0 = (raw_gen or "").strip() if s0.startswith("{") and "paragraphs" in s0: try: json.loads(s0) except json.JSONDecodeError: json_invalid = True narrative_raw, fb_gate = _gate_narrative_fidelity( oral_for_memoir, raw_gen, llm, existing_canonical=existing_for_narrative or None, ) narrative_raw, fb_apply = _apply_narrative_fallbacks( narrative_raw, oral_for_memoir, existing_for_narrative, chapter_category=chapter_category, ) fallback_type = _merge_fallback_type(fb_gate, fb_apply) if json_invalid and fallback_type == "none": fallback_type = "json_invalid" md = _coalesce_story_markdown( narrative_to_markdown(narrative_raw).strip(), oral_for_memoir.strip(), existing_for_narrative or "", ) do_append = target_story_id is not None if do_append: append_story_version_sync(session, str(target_story_id), md) dispatch_ids.add(str(target_story_id)) ensure_chapter_story_link_sync( session, chapter_id=str(chapter.id), story_id=str(target_story_id) ) sid_log = target_story_id is_append = True else: story_title = (route.new_story_title or "").strip() if not story_title: story_title = narrative_agent.generate_title( stage=chapter_category, emotion="neutral", slots=slot_snippets, user_profile=user_profile, birth_year=user_birth_year, llm=llm, ) st = create_story_with_version_sync( session, user_id=user_id, title=story_title, canonical_markdown=md, stage=chapter_category, ) dispatch_ids.add(str(st.id)) ensure_chapter_story_link_sync( session, chapter_id=str(chapter.id), story_id=str(st.id) ) sid_log = st.id is_append = False elapsed = time.perf_counter() - t0 logger.info( "event=story_generated route_type=single decision_source={} route_decision={} " "unit_segments={} used_evidence={} narrative_json_valid={} fidelity_passed={} " "fallback_type={} oral_len={} md_len={} chapter_category={} is_append={} " "story_id={} seconds={:.3f} oral_normalize_changed={}", decision_source, route.decision, len(category_segments), bool(evidence_text.strip()), _is_json_narrative(raw_gen), fb_gate == "none", fallback_type, len(om_norm), len(md.strip()), chapter_category, is_append, sid_log, elapsed, ct_raw != om_norm, ) reorder_chapter_story_links_by_life_order_sync(session, str(chapter.id)) mark_chapter_dirty_sync(session, str(chapter.id)) session.flush() image_settings = MemoirImageSettings.from_env() needs_cover = image_settings.enabled and chapter_needs_cover_enqueue(chapter) return chapter, needs_cover, dispatch_ids