From aac484463de3d67c90dd99221a2936da28a15c03 Mon Sep 17 00:00:00 2001 From: Kevin Date: Mon, 30 Mar 2026 11:53:04 +0800 Subject: [PATCH] =?UTF-8?q?feat(api):=20=E6=8B=86=E5=88=86=E7=AB=A0?= =?UTF-8?q?=E8=8A=82=E7=89=A9=E5=8C=96=E4=B8=8E=20Story=20=E5=90=8E?= =?UTF-8?q?=E5=A4=84=E7=90=86=EF=BC=8C=E5=B9=B6=E5=8A=A0=E5=9B=BA=20Redis?= =?UTF-8?q?=20=E9=94=81=E4=B8=8E=E8=85=BE=E8=AE=AF=20ASR?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 回忆录 Story 流水线(同步) - 同步路径仅写入 Story 与章节关联,改为 mark_chapter_dirty_sync,不再内联 compose - 物化由 Celery recompose_chapter 异步完成;compose 不变量与异常时保留 dirty 的语义在 repo 中补充说明 - Evidence:大批次时降低 top_k;路由候选 story 携带 char_count/version_count;append 超长/版本过多时强制新开 story - 叙事 prompt:relevant_chunks 去重,减少重复证据噪声 - 叙事回退与忠实度 gate:返回 fallback 类型并记录结构化日志(含耗时、JSON 有效性等) Post-commit 与任务编排 - 新增 post_commit.enqueue_story_post_commit_effects:统一派发 generate_story_image(Redis 去重)、延迟 recompose_chapter、可选 memory compaction - memoir_tasks / story_service / story_image_tasks 改为调用 post-commit 入口;主图回填后按关联章节重算并调度物化与 compacs(锁委托、Redis 单例、ASR to_thread) - 更新 test_narrative_pipeline 以适配 _apply_narrative_fallbacks 返回值 --- api/.env.example | 12 + api/app/adapters/asr/tencent_asr.py | 4 +- api/app/agents/memoir/prompts.py | 42 +++- api/app/agents/memoir/story_route_agent.py | 34 ++- api/app/core/chapter_pipeline_lock.py | 24 ++ api/app/core/config.py | 13 + api/app/features/memoir/repo.py | 17 ++ .../features/memoir/story_pipeline_sync.py | 227 ++++++++++++++---- api/app/features/story/post_commit.py | 148 ++++++++++++ api/app/features/story/service.py | 61 +++-- api/app/tasks/chapter_compose_tasks.py | 79 +++++- api/app/tasks/memoir_tasks.py | 108 +++++---- api/app/tasks/story_image_tasks.py | 31 ++- api/tests/test_infra_regressions.py | 109 +++++++++ api/tests/test_narrative_pipeline.py | 10 +- 15 files changed, 775 insertions(+), 144 deletions(-) create mode 100644 api/app/core/chapter_pipeline_lock.py create mode 100644 api/app/features/story/post_commit.py create mode 100644 api/tests/test_infra_regressions.py diff --git a/api/.env.example b/api/.env.example index e875a90..656061b 100644 --- a/api/.env.example +++ b/api/.env.example @@ -75,6 +75,18 @@ REDIS_SESSION_TTL=86400 # MEMORY_COMPACTION_TEXT_JACCARD_MIN=0.55 # MEMORY_COMPACTION_METADATA_EVENT_YEAR_WINDOW=1 +# ============================================================================= +# Story 流水线(post-commit、章节物化、append 上限、evidence 检索) +# ============================================================================= +# STORY_IMAGE_ENQUEUE_DEDUP_TTL=300 +# RECOMPOSE_CHAPTER_DELAY_SECONDS=8 +# CHAPTER_PIPELINE_LOCK_TTL_SECONDS=120 +# STORY_APPEND_MAX_CANONICAL_CHARS=12000 +# STORY_APPEND_MAX_VERSIONS=20 +# EVIDENCE_TOP_K_DEFAULT=10 +# EVIDENCE_TOP_K_LARGE_BATCH=5 +# EVIDENCE_LARGE_BATCH_THRESHOLD=3 + # ============================================================================= # Auth # ============================================================================= diff --git a/api/app/adapters/asr/tencent_asr.py b/api/app/adapters/asr/tencent_asr.py index d7f6690..87d7335 100644 --- a/api/app/adapters/asr/tencent_asr.py +++ b/api/app/adapters/asr/tencent_asr.py @@ -1,5 +1,6 @@ """Tencent Cloud ASR adapter — implements ASRProvider port.""" +import asyncio import base64 from app.core.logging import get_logger @@ -51,7 +52,8 @@ class TencentASRProvider: req.Data = audio_base64 req.DataLen = len(audio) - resp = client.SentenceRecognition(req) + # 腾讯 SDK 为同步阻塞调用;放到线程池里避免卡住事件循环。 + resp = await asyncio.to_thread(client.SentenceRecognition, req) text = (resp.Result or "").strip() if text: return text diff --git a/api/app/agents/memoir/prompts.py b/api/app/agents/memoir/prompts.py index fb772b7..c62a7c4 100644 --- a/api/app/agents/memoir/prompts.py +++ b/api/app/agents/memoir/prompts.py @@ -673,18 +673,58 @@ def format_narrative_user_content(oral_text: str, evidence_text: str = "") -> st ) +def _normalize_evidence_line(s: str) -> str: + return re.sub(r"\s+", " ", (s or "").strip().lower()) + + +def dedupe_evidence_chunk_rows(chunks: list) -> list: + """ + 对 relevant_chunks 做稳定去重:按归一化后长度降序 + 原下标,单遍包含判定; + 复杂度 O(n log n);输出按原顺序中保留条目的相对顺序稳定。 + """ + extracted: list[tuple[int, str, object]] = [] + for i, c in enumerate(chunks): + content = ( + c.get("content", "") if isinstance(c, dict) else getattr(c, "content", "") + ) + t = (content or "").strip() + if not t: + continue + extracted.append((i, t, c)) + if len(extracted) <= 1: + return [x[2] for x in extracted] + extracted.sort( + key=lambda x: (-len(_normalize_evidence_line(x[1])), x[0]), + ) + kept_norms: list[str] = [] + kept: list[tuple[int, object]] = [] + for orig_idx, text, c in extracted: + n = _normalize_evidence_line(text) + dup = False + for kn in kept_norms: + if len(n) <= len(kn) and n in kn: + dup = True + break + if not dup: + kept_norms.append(n) + kept.append((orig_idx, c)) + kept.sort(key=lambda x: x[0]) + return [x[1] for x in kept] + + def format_evidence_chunks_for_prompt(evidence: dict) -> str: """将 retrieve_evidence / retrieve_evidence_sync 结果格式化为简短文本,供叙事 prompt 使用。 包含 chunks、摘要(若有)、confirmed facts、timeline、故事摘要(若有)。 """ chunks = evidence.get("relevant_chunks") or [] + chunks = dedupe_evidence_chunk_rows(chunks[:10]) summaries = evidence.get("relevant_summaries") or [] facts = evidence.get("relevant_facts") or [] timeline = evidence.get("timeline_hints") or [] stories = evidence.get("relevant_stories") or [] parts: list[str] = [] - for c in chunks[:10]: + for c in chunks: content = ( c.get("content", "") if isinstance(c, dict) else getattr(c, "content", "") ) diff --git a/api/app/agents/memoir/story_route_agent.py b/api/app/agents/memoir/story_route_agent.py index 118382e..449d0cb 100644 --- a/api/app/agents/memoir/story_route_agent.py +++ b/api/app/agents/memoir/story_route_agent.py @@ -63,8 +63,15 @@ class StoryRouteDecision(BaseModel): return str(v) -def _build_candidate_json(stories: list[Story], *, preview_chars: int = 220) -> str: +def _build_candidate_json( + stories: list[Story], + *, + preview_chars: int = 220, + story_meta: dict[str, dict[str, int]] | None = None, +) -> str: + """story_meta: story_id -> { char_count, version_count },供路由感知篇幅与版本数。""" rows: list[dict[str, Any]] = [] + meta = story_meta or {} for s in stories: md = (s.canonical_markdown or "").strip().replace("\n", " ") preview = md[:preview_chars] + ("…" if len(md) > preview_chars else "") @@ -76,14 +83,17 @@ def _build_candidate_json(stories: list[Story], *, preview_chars: int = 220) -> cat = getattr(ch, "category", None) or "" tit = getattr(ch, "title", None) or "" links.append(f"{tit}({cat})") - rows.append( - { - "id": s.id, - "title": s.title, - "preview": preview, - "linked_chapters": links, - } - ) + row: dict[str, Any] = { + "id": s.id, + "title": s.title, + "preview": preview, + "linked_chapters": links, + } + m = meta.get(str(s.id)) + if m: + row["char_count"] = int(m.get("char_count", 0)) + row["version_count"] = int(m.get("version_count", 0)) + rows.append(row) return json.dumps(rows, ensure_ascii=False, indent=2) @@ -142,6 +152,7 @@ class StoryRouteAgent: candidate_stories: list[Story], llm: Any, valid_story_ids: set[str], + story_meta: dict[str, dict[str, int]] | None = None, ) -> StoryRouteDecision: if not llm: return StoryRouteDecision( @@ -149,7 +160,7 @@ class StoryRouteAgent: new_story_title=None, reason="no_llm", ) - payload = _build_candidate_json(candidate_stories) + payload = _build_candidate_json(candidate_stories, story_meta=story_meta) prompt = get_story_route_prompt( chapter_category=chapter_category, chapter_title=chapter_title, @@ -200,13 +211,14 @@ class StoryRouteAgent: candidate_stories: list[Story], llm: Any, valid_story_ids: set[str], + story_meta: dict[str, dict[str, int]] | None = None, ) -> StoryBatchPlan | None: """ 将本批 segment 划分为多个写入单元。解析失败返回 None,由调用方回退 decide()。 """ if not llm or len(segments) < 2: return None - payload = _build_candidate_json(candidate_stories) + payload = _build_candidate_json(candidate_stories, story_meta=story_meta) segments_json = _build_segments_json_for_plan(segments) prompt = get_story_batch_plan_prompt( chapter_category=chapter_category, diff --git a/api/app/core/chapter_pipeline_lock.py b/api/app/core/chapter_pipeline_lock.py new file mode 100644 index 0000000..55b86cf --- /dev/null +++ b/api/app/core/chapter_pipeline_lock.py @@ -0,0 +1,24 @@ +"""Pipeline 章节级 Redis 锁:与 memoir pipeline / recompose_chapter 共用同一 key 空间。""" + +from app.core.redis_lock import ( + RedisLockHandle, + acquire_redis_lock, + release_redis_lock, +) + +ChapterPipelineLockHandle = RedisLockHandle + + +def _lock_key(user_id: str, stage: str) -> str: + return f"lock:chapter:{user_id}:{stage}" + + +def acquire_chapter_pipeline_lock( + user_id: str, stage: str, *, ttl_seconds: int +) -> ChapterPipelineLockHandle | None: + """获取 `lock:chapter:{user_id}:{stage}`;失败返回 None。""" + return acquire_redis_lock(_lock_key(user_id, stage), ttl_seconds=ttl_seconds) + + +def release_chapter_pipeline_lock(handle: ChapterPipelineLockHandle | None) -> None: + release_redis_lock(handle) diff --git a/api/app/core/config.py b/api/app/core/config.py index 9f74303..b55bd63 100644 --- a/api/app/core/config.py +++ b/api/app/core/config.py @@ -145,6 +145,19 @@ class Settings(BaseSettings): memoir_image_download_hosts: str = "" # Story 正文至少多少字才创建主图 intent / 调图(0 表示不限制) story_image_min_body_chars: int = 400 + # generate_story_image 入队去重(Redis SET NX,秒) + story_image_enqueue_dedup_ttl: int = Field(default=300, ge=30, le=86400) + # 章节物化异步任务延迟入队(秒),削峰 + recompose_chapter_delay_seconds: int = Field(default=8, ge=0, le=600) + # 与 memoir pipeline 一致的章节互斥锁 TTL(秒) + chapter_pipeline_lock_ttl_seconds: int = Field(default=120, ge=10, le=3600) + # Append 硬上限:canonical 字符数、版本数(超限强制 new_story) + story_append_max_canonical_chars: int = Field(default=12000, ge=1000, le=500_000) + story_append_max_versions: int = Field(default=20, ge=1, le=500) + # Evidence 检索 top_k:大批次 unit 时降低检索量 + evidence_top_k_default: int = Field(default=10, ge=1, le=50) + evidence_top_k_large_batch: int = Field(default=5, ge=1, le=50) + evidence_large_batch_threshold: int = Field(default=3, ge=1, le=100) # 叙事输出相对口述过短则回退为口述原文(比例与下限) memoir_narrative_fallback_body_ratio: float = 0.5 memoir_narrative_fallback_min_chars: int = 20 diff --git a/api/app/features/memoir/repo.py b/api/app/features/memoir/repo.py index b68dba5..49fe4cc 100644 --- a/api/app/features/memoir/repo.py +++ b/api/app/features/memoir/repo.py @@ -115,6 +115,21 @@ def mark_chapters_dirty_for_story_sync(session: Session, story_id: str) -> None: ) +def get_chapter_ids_linked_to_story_sync(session: Session, story_id: str) -> list[str]: + stmt = select(ChapterStoryLink.chapter_id).where( + ChapterStoryLink.story_id == story_id + ) + return list(dict.fromkeys(session.scalars(stmt).all())) + + +def mark_chapter_dirty_sync(session: Session, chapter_id: str) -> None: + """幂等:将章节标为需物化 markdown(多次调用安全)。""" + ch = session.get(Chapter, chapter_id) + if ch: + ch.markdown_compose_dirty = True + session.flush() + + async def get_chapter_with_story_links_for_compose( chapter_id: str, db: AsyncSession ) -> Chapter | None: @@ -229,6 +244,8 @@ def compose_chapter_from_story_links_sync(session: Session, chapter_id: str) -> """ 按 story_links 重组 canonical_markdown 并写入版本链。 若无 story_links 则清除 dirty 并返回 False。 + + 不变量:成功物化或空链接分支均会将 markdown_compose_dirty=False;异常退出保留 dirty。 """ stmt = ( select(Chapter) diff --git a/api/app/features/memoir/story_pipeline_sync.py b/api/app/features/memoir/story_pipeline_sync.py index ff3be8b..97e1a5a 100644 --- a/api/app/features/memoir/story_pipeline_sync.py +++ b/api/app/features/memoir/story_pipeline_sync.py @@ -1,10 +1,13 @@ """ -Celery 用:按批次将 transcript 写入 Story,并物化 Chapter canonical_markdown。 +Celery 用:按批次将 transcript 写入 Story,并标记 Chapter 需物化(markdown_compose_dirty)。 + +同步路径不执行 compose;物化由 commit 后 `recompose_chapter` 异步完成。 """ from __future__ import annotations import json +import time import uuid from typing import Any @@ -17,26 +20,27 @@ from app.agents.memoir.prompts import ( format_evidence_chunks_for_prompt, format_narrative_user_content, ) -from app.core.config import settings from app.agents.memoir.story_route_agent import ( PLAN_BATCH_MAX_SEGMENTS, StoryBatchPlan, StoryRouteAgent, ) from app.agents.state_schema import MemoirStateSchema +from app.core.config import settings from app.core.logging import get_logger from app.features.memoir.cover_eligibility import chapter_needs_cover_enqueue from app.features.memoir.memoir_images.settings import MemoirImageSettings from app.features.memoir.models import Chapter from app.features.memoir.narrative_to_markdown import narrative_to_markdown from app.features.memoir.repo import ( - compose_chapter_from_story_links_sync, + mark_chapter_dirty_sync, reorder_chapter_story_links_by_life_order_sync, ) from app.features.memory.repo import retrieve_evidence_sync from app.features.story.models import Story from app.features.story.sync_write import ( append_story_version_sync, + count_story_versions_sync, create_story_with_version_sync, ensure_chapter_story_link_sync, list_active_stories_for_user_sync, @@ -71,12 +75,12 @@ def _gate_narrative_fidelity( llm: Any, *, existing_canonical: str | None = None, -) -> str: - """叙事 JSON 忠实度检查;不通过则回退为口述正文(续写时保留已有故事 + 口述)。""" +) -> tuple[str, str]: + """返回 (文本, fallback 原因);忠实度不通过时第二项为 fidelity_failed。""" from app.agents.memoir.fidelity_check_agent import FidelityCheckAgent if not settings.memoir_fidelity_check_enabled or not llm: - return narrative_raw + return narrative_raw, "none" agent = FidelityCheckAgent() ex = (existing_canonical or "").strip() or None if agent.passes( @@ -85,7 +89,7 @@ def _gate_narrative_fidelity( llm=llm, existing_canonical_markdown=ex, ): - return narrative_raw + return narrative_raw, "none" logger.warning( "event=fidelity_gate_fallback oral_len={} merge={}", len((oral_text or "").strip()), @@ -93,8 +97,8 @@ def _gate_narrative_fidelity( ) o = (oral_text or "").strip() if not o and not ex: - return narrative_raw - return _fidelity_fallback_json(o, ex) + return narrative_raw, "none" + return _fidelity_fallback_json(o, ex), "fidelity_failed" def _should_fallback_to_transcript(md: str, oral: str) -> bool: @@ -155,7 +159,8 @@ def _apply_narrative_fallbacks( existing_for_narrative: str, *, chapter_category: str, -) -> str: +) -> tuple[str, str]: + """返回 (文本, fallback_type);无改写时为 none。""" # 整篇合并(JSON)输出异常缩水:回退为旧文 + 本段口述,避免覆盖丢失 if existing_for_narrative and _is_json_narrative(narrative_raw): merged_md = narrative_to_markdown(narrative_raw).strip() @@ -166,7 +171,7 @@ def _apply_narrative_fallbacks( "chapter_category={}", chapter_category, ) - return f"{ex}\n\n{combined_unit_text.strip()}" + return f"{ex}\n\n{combined_unit_text.strip()}", "merge_shrink" if ( existing_for_narrative @@ -178,10 +183,10 @@ def _apply_narrative_fallbacks( "chapter_category={}", chapter_category, ) - return f"{existing_for_narrative}\n\n{combined_unit_text}" - - # 禁止把「章节级 canonical」(多故事拼接)写进单条 Story:会把全章正文塞进一个故事, - # 且该 story 若挂多章会导致各章阅读视图串台。新建故事时宁可短,也不拼接 existing_chapter_md。 + return ( + f"{existing_for_narrative}\n\n{combined_unit_text}", + "coalesce_to_old_plus_oral", + ) md_check = narrative_to_markdown(narrative_raw).strip() oral = (combined_unit_text or "").strip() @@ -195,7 +200,7 @@ def _apply_narrative_fallbacks( len(oral), len(md_check), ) - return f"{ex_fb}\n\n{oral}" + return f"{ex_fb}\n\n{oral}", "coalesce_to_old_plus_oral" logger.warning( "event=narrative_fallback reason=body_too_short_vs_oral " "chapter_category={} oral_len={} md_len={}", @@ -203,9 +208,28 @@ def _apply_narrative_fallbacks( len(oral), len(md_check), ) - return oral + return oral, "coalesce_to_oral" - return narrative_raw + return narrative_raw, "none" + + +def _merge_fallback_type(gate_ft: str, apply_ft: str) -> str: + if apply_ft != "none": + return apply_ft + return gate_ft + + +def _story_meta_for_route( + session: Session, candidates: list +) -> dict[str, dict[str, int]]: + meta: dict[str, dict[str, int]] = {} + for s in candidates: + sid = str(s.id) + meta[sid] = { + "char_count": len((s.canonical_markdown or "").strip()), + "version_count": count_story_versions_sync(session, sid), + } + return meta def _ensure_chapter_record( @@ -268,19 +292,37 @@ def _run_batch_plan_writes( narrative_agent: NarrativeAgent, ) -> set[str]: dispatch_ids: set[str] = set() + max_chars = int(settings.story_append_max_canonical_chars) + max_ver = int(settings.story_append_max_versions) for unit in plan.units: + t0 = time.perf_counter() unit_text = _ordered_text_for_segment_ids(category_segments, unit.segment_ids) new_content_input = format_narrative_user_content(unit_text, evidence_text) target_story_id: str | None = None existing_for_narrative = "" + decision_source = "batch_plan" if unit.decision == "append_story" and unit.target_story_id: st = session.get(Story, unit.target_story_id) if st and st.user_id == user_id: - target_story_id = st.id - existing_for_narrative = (st.canonical_markdown or "").strip() + canon = (st.canonical_markdown or "").strip() + vc = count_story_versions_sync(session, str(st.id)) + if len(canon) > max_chars or vc >= max_ver: + logger.info( + "event=append_overflow_to_new story_id={} canonical_chars={} " + "versions={} decision_source=batch_plan", + str(st.id), + len(canon), + vc, + ) + target_story_id = None + existing_for_narrative = "" + decision_source = "forced_new_due_to_append_limit" + else: + target_story_id = st.id + existing_for_narrative = canon - narrative_raw = narrative_agent.generate_narrative( + raw_gen = narrative_agent.generate_narrative( stage=chapter_category, slots=slot_snippets, new_content=new_content_input, @@ -289,18 +331,29 @@ def _run_batch_plan_writes( birth_year=user_birth_year, llm=llm, ) - narrative_raw = _gate_narrative_fidelity( + json_invalid = False + s0 = (raw_gen or "").strip() + if s0.startswith("{") and "paragraphs" in s0: + try: + json.loads(s0) + except json.JSONDecodeError: + json_invalid = True + + narrative_raw, fb_gate = _gate_narrative_fidelity( unit_text, - narrative_raw, + raw_gen, llm, existing_canonical=existing_for_narrative or None, ) - narrative_raw = _apply_narrative_fallbacks( + narrative_raw, fb_apply = _apply_narrative_fallbacks( narrative_raw, unit_text, existing_for_narrative, chapter_category=chapter_category, ) + fallback_type = _merge_fallback_type(fb_gate, fb_apply) + if json_invalid and fallback_type == "none": + fallback_type = "json_invalid" md = _coalesce_story_markdown( narrative_to_markdown(narrative_raw).strip(), @@ -309,11 +362,13 @@ def _run_batch_plan_writes( ) if target_story_id: - append_story_version_sync(session, target_story_id, md) - dispatch_ids.add(target_story_id) + append_story_version_sync(session, str(target_story_id), md) + dispatch_ids.add(str(target_story_id)) ensure_chapter_story_link_sync( - session, chapter_id=chapter.id, story_id=target_story_id + session, chapter_id=str(chapter.id), story_id=str(target_story_id) ) + sid_log = target_story_id + is_append = True else: story_title = (unit.new_story_title or "").strip() if not story_title: @@ -332,10 +387,33 @@ def _run_batch_plan_writes( canonical_markdown=md, stage=chapter_category, ) - dispatch_ids.add(st.id) + dispatch_ids.add(str(st.id)) ensure_chapter_story_link_sync( - session, chapter_id=chapter.id, story_id=st.id + session, chapter_id=str(chapter.id), story_id=str(st.id) ) + sid_log = st.id + is_append = False + + elapsed = time.perf_counter() - t0 + logger.info( + "event=story_generated route_type=batch decision_source={} route_decision={} " + "unit_segments={} used_evidence={} narrative_json_valid={} fidelity_passed={} " + "fallback_type={} oral_len={} md_len={} chapter_category={} is_append={} " + "story_id={} seconds={:.3f}", + decision_source, + unit.decision, + len(unit.segment_ids), + bool(evidence_text.strip()), + _is_json_narrative(raw_gen), + fb_gate == "none", + fallback_type, + len(unit_text.strip()), + len(md.strip()), + chapter_category, + is_append, + sid_log, + elapsed, + ) return dispatch_ids @@ -361,8 +439,12 @@ def run_story_pipeline_for_category_batch( combined_text = "\n\n".join(segment_texts) source_ids = [seg.id for seg in category_segments] + n_units = len(category_segments) + top_k = int(settings.evidence_top_k_default) + if n_units > int(settings.evidence_large_batch_threshold): + top_k = int(settings.evidence_top_k_large_batch) try: - evidence = retrieve_evidence_sync(session, user_id, combined_text, top_k=10) + evidence = retrieve_evidence_sync(session, user_id, combined_text, top_k=top_k) except Exception as e: logger.warning("Evidence 检索跳过: {}", e) evidence = { @@ -412,7 +494,8 @@ def run_story_pipeline_for_category_batch( ) candidates = list_active_stories_for_user_sync(session, user_id) - valid_ids = {s.id for s in candidates} + valid_ids = {str(s.id) for s in candidates} + story_meta = _story_meta_for_route(session, candidates) batch_for_route = ( f"{combined_text}\n\n{evidence_text}" @@ -437,6 +520,7 @@ def run_story_pipeline_for_category_batch( candidate_stories=candidates, llm=llm, valid_story_ids=valid_ids, + story_meta=story_meta, ) chapter = _ensure_chapter_record( @@ -471,17 +555,36 @@ def run_story_pipeline_for_category_batch( candidate_stories=candidates, llm=llm, valid_story_ids=valid_ids, + story_meta=story_meta, ) + t0 = time.perf_counter() target_story_id: str | None = None existing_for_narrative = "" + decision_source = "fallback_no_llm" if not llm else "single_decide" + max_chars = int(settings.story_append_max_canonical_chars) + max_ver = int(settings.story_append_max_versions) if route.decision == "append_story" and route.target_story_id: st = session.get(Story, route.target_story_id) if st and st.user_id == user_id: - target_story_id = st.id - existing_for_narrative = (st.canonical_markdown or "").strip() + canon = (st.canonical_markdown or "").strip() + vc = count_story_versions_sync(session, str(st.id)) + if len(canon) > max_chars or vc >= max_ver: + logger.info( + "event=append_overflow_to_new story_id={} canonical_chars={} " + "versions={} decision_source=single_decide", + str(st.id), + len(canon), + vc, + ) + target_story_id = None + existing_for_narrative = "" + decision_source = "forced_new_due_to_append_limit" + else: + target_story_id = st.id + existing_for_narrative = canon - narrative_raw = narrative_agent.generate_narrative( + raw_gen = narrative_agent.generate_narrative( stage=chapter_category, slots=slot_snippets, new_content=new_content_input, @@ -490,19 +593,30 @@ def run_story_pipeline_for_category_batch( birth_year=user_birth_year, llm=llm, ) - narrative_raw = _gate_narrative_fidelity( + json_invalid = False + s0 = (raw_gen or "").strip() + if s0.startswith("{") and "paragraphs" in s0: + try: + json.loads(s0) + except json.JSONDecodeError: + json_invalid = True + + narrative_raw, fb_gate = _gate_narrative_fidelity( combined_text, - narrative_raw, + raw_gen, llm, existing_canonical=existing_for_narrative or None, ) - narrative_raw = _apply_narrative_fallbacks( + narrative_raw, fb_apply = _apply_narrative_fallbacks( narrative_raw, combined_text, existing_for_narrative, chapter_category=chapter_category, ) + fallback_type = _merge_fallback_type(fb_gate, fb_apply) + if json_invalid and fallback_type == "none": + fallback_type = "json_invalid" md = _coalesce_story_markdown( narrative_to_markdown(narrative_raw).strip(), @@ -513,11 +627,13 @@ def run_story_pipeline_for_category_batch( do_append = target_story_id is not None if do_append: - append_story_version_sync(session, target_story_id, md) - dispatch_ids.add(target_story_id) + append_story_version_sync(session, str(target_story_id), md) + dispatch_ids.add(str(target_story_id)) ensure_chapter_story_link_sync( - session, chapter_id=chapter.id, story_id=target_story_id + session, chapter_id=str(chapter.id), story_id=str(target_story_id) ) + sid_log = target_story_id + is_append = True else: story_title = (route.new_story_title or "").strip() if not story_title: @@ -536,13 +652,36 @@ def run_story_pipeline_for_category_batch( canonical_markdown=md, stage=chapter_category, ) - dispatch_ids.add(st.id) + dispatch_ids.add(str(st.id)) ensure_chapter_story_link_sync( - session, chapter_id=chapter.id, story_id=st.id + session, chapter_id=str(chapter.id), story_id=str(st.id) ) + sid_log = st.id + is_append = False - reorder_chapter_story_links_by_life_order_sync(session, chapter.id) - compose_chapter_from_story_links_sync(session, chapter.id) + elapsed = time.perf_counter() - t0 + logger.info( + "event=story_generated route_type=single decision_source={} route_decision={} " + "unit_segments={} used_evidence={} narrative_json_valid={} fidelity_passed={} " + "fallback_type={} oral_len={} md_len={} chapter_category={} is_append={} " + "story_id={} seconds={:.3f}", + decision_source, + route.decision, + len(category_segments), + bool(evidence_text.strip()), + _is_json_narrative(raw_gen), + fb_gate == "none", + fallback_type, + len(combined_text.strip()), + len(md.strip()), + chapter_category, + is_append, + sid_log, + elapsed, + ) + + reorder_chapter_story_links_by_life_order_sync(session, str(chapter.id)) + mark_chapter_dirty_sync(session, str(chapter.id)) session.flush() image_settings = MemoirImageSettings.from_env() diff --git a/api/app/features/story/post_commit.py b/api/app/features/story/post_commit.py new file mode 100644 index 0000000..1eb053b --- /dev/null +++ b/api/app/features/story/post_commit.py @@ -0,0 +1,148 @@ +""" +Story 写入提交后的统一异步调度:插图、章节物化、可选 memory compaction。 + +enqueue 失败不回滚已提交数据,仅记录日志;依赖后续触发或重试收敛。 +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime, timezone +import threading +from typing import Any, cast + +import redis + +from app.core.config import settings +from app.core.logging import get_logger +from app.core.memory_compaction_schedule import schedule_memory_compaction_run + +logger = get_logger(__name__) +_redis_client: redis.Redis | None = None +_redis_lock = threading.Lock() + + +def _story_image_enqueue_key(story_id: str) -> str: + return f"enqueue:story-image:{story_id}" + + +def _get_redis() -> redis.Redis: + """进程内复用单个 Redis 客户端,避免重复创建连接池。""" + global _redis_client + if _redis_client is None: + with _redis_lock: + if _redis_client is None: + _redis_client = redis.from_url( + settings.redis_url, decode_responses=True + ) + return _redis_client + + +@dataclass +class PostCommitResult: + enqueued_story_image_count: int = 0 + enqueued_chapter_recompose_count: int = 0 + compaction_scheduled: bool = False + errors: list[str] = field(default_factory=list) + + +def enqueue_story_post_commit_effects( + *, + user_id: str, + story_ids: set[str], + chapter_ids: set[str], + trigger_source: str, + need_image: bool = True, + need_recompose: bool = True, + need_compaction: bool = False, + compaction_extra: dict[str, Any] | None = None, +) -> PostCommitResult: + """ + story_ids 为空则跳过 image;chapter_ids 为空则跳过 recompose。 + need_compaction=True 时仅按 user_id 调度 compaction(不依赖 story/chapter 集合)。 + """ + result = PostCommitResult() + r = _get_redis() + ttl = int(settings.story_image_enqueue_dedup_ttl) + + if need_image and story_ids: + from app.tasks.story_image_tasks import ( + generate_story_image as gen_story_image_task, + ) + + for sid in sorted(story_ids): + key = _story_image_enqueue_key(sid) + try: + if not r.set(key, "1", nx=True, ex=ttl): + logger.debug( + "story_image enqueue skipped (dedup): story={} trigger={}", + sid, + trigger_source, + ) + continue + except Exception as exc: + logger.warning( + "story_image enqueue dedup redis failed, allowing enqueue: " + "story={} err={}", + sid, + exc, + ) + try: + cast(Any, gen_story_image_task).delay(sid) + result.enqueued_story_image_count += 1 + except Exception as exc: + logger.warning( + "generate_story_image.delay failed story={} trigger={}: {}", + sid, + trigger_source, + exc, + ) + result.errors.append(f"generate_story_image:{sid}:{exc}") + try: + r.delete(key) + except Exception: + pass + + if need_recompose and chapter_ids: + from app.tasks.chapter_compose_tasks import ( + recompose_chapter as recompose_chapter_task, + ) + + cd = int(settings.recompose_chapter_delay_seconds) + for cid in sorted(chapter_ids): + try: + cast(Any, recompose_chapter_task).apply_async( + args=[cid], countdown=max(0, cd) + ) + result.enqueued_chapter_recompose_count += 1 + except Exception as exc: + logger.warning( + "recompose_chapter.apply_async failed chapter={} trigger={}: {}", + cid, + trigger_source, + exc, + ) + result.errors.append(f"recompose_chapter:{cid}:{exc}") + + if need_compaction: + try: + ctx: dict[str, Any] = { + "trigger_source": trigger_source, + "trigger_time": datetime.now(timezone.utc).isoformat(), + "story_ids": sorted(story_ids), + "chapter_ids": sorted(chapter_ids), + } + if compaction_extra: + ctx.update(compaction_extra) + schedule_memory_compaction_run(user_id, ctx) + result.compaction_scheduled = True + except Exception as exc: + logger.warning( + "schedule_memory_compaction_run failed user_id={} trigger={}: {}", + user_id, + trigger_source, + exc, + ) + result.errors.append(f"compaction:{exc}") + + return result diff --git a/api/app/features/story/service.py b/api/app/features/story/service.py index b6b39b9..1f84b75 100644 --- a/api/app/features/story/service.py +++ b/api/app/features/story/service.py @@ -11,11 +11,10 @@ from datetime import datetime, timezone from sqlalchemy.ext.asyncio import AsyncSession from app.core.logging import get_logger -from app.features.memoir.asset_resolver import strip_asset_image_refs_from_markdown from app.features.memoir import repo as memoir_repo +from app.features.memoir.asset_resolver import strip_asset_image_refs_from_markdown from app.features.memoir.memoir_images.settings import MemoirImageSettings from app.features.story.image_intent_extractor import extract_primary_image_intent -from app.features.story.time_hints import apply_infer_story_time_start_to_model from app.features.story.repo import ( count_story_versions, create_story, @@ -27,6 +26,7 @@ from app.features.story.repo import ( get_story_by_id, get_story_image_intent_by_story, ) +from app.features.story.time_hints import apply_infer_story_time_start_to_model logger = get_logger(__name__) @@ -156,17 +156,26 @@ class StoryService: await memoir_repo.mark_chapters_dirty_for_story(self._db, story.id) await self._db.commit() if md.strip(): - from app.tasks.chapter_compose_tasks import recompose_chapters_for_story - from app.tasks.story_image_tasks import generate_story_image + from app.features.memoir.repo import get_chapter_ids_linked_to_story + from app.features.story.post_commit import enqueue_story_post_commit_effects - try: - generate_story_image.delay(story.id) - except Exception as exc: - logger.warning("派发 generate_story_image 失败: {}", exc) - try: - recompose_chapters_for_story.delay(story.id) - except Exception as exc: - logger.warning("派发 recompose_chapters_for_story 失败: {}", exc) + chapter_ids = set(await get_chapter_ids_linked_to_story(self._db, story.id)) + pc = enqueue_story_post_commit_effects( + user_id=user_id, + story_ids={story.id}, + chapter_ids=chapter_ids, + trigger_source="manual_api", + need_compaction=False, + ) + logger.info( + "event=story_post_commit user_id={} trigger=manual_api " + "enqueued_story_image_count={} enqueued_chapter_recompose_count={} " + "errors={}", + user_id, + pc.enqueued_story_image_count, + pc.enqueued_chapter_recompose_count, + pc.errors, + ) return story.id async def append_version( @@ -208,17 +217,25 @@ class StoryService: ) await memoir_repo.mark_chapters_dirty_for_story(self._db, story_id) await self._db.commit() - from app.tasks.chapter_compose_tasks import recompose_chapters_for_story - from app.tasks.story_image_tasks import generate_story_image + from app.features.memoir.repo import get_chapter_ids_linked_to_story + from app.features.story.post_commit import enqueue_story_post_commit_effects - try: - generate_story_image.delay(story_id) - except Exception as exc: - logger.warning("派发 generate_story_image 失败: {}", exc) - try: - recompose_chapters_for_story.delay(story_id) - except Exception as exc: - logger.warning("派发 recompose_chapters_for_story 失败: {}", exc) + chapter_ids = set(await get_chapter_ids_linked_to_story(self._db, story_id)) + pc = enqueue_story_post_commit_effects( + user_id=story.user_id, + story_ids={story_id}, + chapter_ids=chapter_ids, + trigger_source="manual_api", + need_compaction=False, + ) + logger.info( + "event=story_post_commit user_id={} trigger=manual_api_append " + "enqueued_story_image_count={} enqueued_chapter_recompose_count={} errors={}", + story.user_id, + pc.enqueued_story_image_count, + pc.enqueued_chapter_recompose_count, + pc.errors, + ) return version.id async def link_evidence( diff --git a/api/app/tasks/chapter_compose_tasks.py b/api/app/tasks/chapter_compose_tasks.py index d46b191..bd40e8f 100644 --- a/api/app/tasks/chapter_compose_tasks.py +++ b/api/app/tasks/chapter_compose_tasks.py @@ -5,6 +5,11 @@ from datetime import datetime, timezone from celery import shared_task from sqlalchemy import select +from app.core.chapter_pipeline_lock import ( + acquire_chapter_pipeline_lock, + release_chapter_pipeline_lock, +) +from app.core.config import settings from app.core.db import get_sync_db from app.core.logging import get_logger from app.core.memory_compaction_schedule import schedule_memory_compaction_run @@ -15,13 +20,85 @@ from app.features.story.models import Story logger = get_logger(__name__) +@shared_task(bind=True, max_retries=3, default_retry_delay=30) +def recompose_chapter(self, chapter_id: str) -> dict: + """ + 按章节物化 canonical_markdown:仅当 markdown_compose_dirty 为 True 时执行; + 与 pipeline 共用章节级 Redis 锁,拿不到锁则跳过(依赖后续触发重试)。 + """ + lock_ttl = int(settings.chapter_pipeline_lock_ttl_seconds) + user_id: str | None = None + composed = False + with get_sync_db() as session: + chapter = session.get(Chapter, chapter_id) + if not chapter: + logger.info("recompose_chapter: chapter_id={} status=not_found", chapter_id) + return {"status": "not_found"} + if chapter.markdown_compose_dirty is not True: + logger.info( + "recompose_chapter: chapter_id={} status=skip_not_dirty", + chapter_id, + ) + return {"status": "skip_not_dirty"} + uid = str(chapter.user_id) + stage = str(chapter.category) + lock_handle = acquire_chapter_pipeline_lock(uid, stage, ttl_seconds=lock_ttl) + if lock_handle is None: + logger.info( + "event=recompose_chapter status=skip_lock_contention " + "chapter_id={} user_id={} stage={}", + chapter_id, + uid, + stage, + ) + return {"status": "skip_lock_contention"} + try: + composed = memoir_repo.compose_chapter_from_story_links_sync( + session, chapter_id + ) + session.commit() + user_id = uid + except Exception as exc: + session.rollback() + logger.warning( + "recompose_chapter failed chapter_id={} err={}", chapter_id, exc + ) + raise self.retry(exc=exc) from exc + finally: + release_chapter_pipeline_lock(lock_handle) + + if user_id: + schedule_memory_compaction_run( + user_id, + { + "trigger_source": "chapter_recompose", + "trigger_time": datetime.now(timezone.utc).isoformat(), + "pipeline_run_id": str(self.request.id), + "recomposed_chapter_ids": [chapter_id], + }, + ) + logger.info( + "recompose_chapter: chapter_id={} status={}", + chapter_id, + "composed" if composed else "empty", + ) + return {"status": "composed" if composed else "empty", "chapter_id": chapter_id} + + @shared_task(bind=True, max_retries=3, default_retry_delay=30) def recompose_chapters_for_story(self, story_id: str) -> dict: + """ + 按 story 找出 dirty 章节并物化。 + + .. deprecated:: + 请改用 `recompose_chapter`(按章聚合)+ `enqueue_story_post_commit_effects`; + 保留兼容,待调用方全部迁移后删除。 + """ user_id: str | None = None try: with get_sync_db() as session: story = session.get(Story, story_id) - user_id = story.user_id if story else None + user_id = str(story.user_id) if story else None stmt = ( select(Chapter.id) .join( diff --git a/api/app/tasks/memoir_tasks.py b/api/app/tasks/memoir_tasks.py index 3681c3f..60cc978 100644 --- a/api/app/tasks/memoir_tasks.py +++ b/api/app/tasks/memoir_tasks.py @@ -15,10 +15,16 @@ from sqlalchemy.orm import Session from app.agents.chat.prompts_profile import format_user_profile_context from app.agents.memoir import MemoirOrchestrator from app.agents.state_schema import MemoirStateSchema, SlotData, default_state +from app.core.chapter_pipeline_lock import ( + acquire_chapter_pipeline_lock as _acquire_chapter_lock, +) +from app.core.chapter_pipeline_lock import ( + release_chapter_pipeline_lock as _release_chapter_lock, +) +from app.core.config import settings from app.core.db import get_sync_db from app.core.dependencies import get_llm_provider from app.core.logging import get_logger -from app.core.memory_compaction_schedule import schedule_memory_compaction_run from app.features.conversation.models import Segment from app.features.memoir.cover_eligibility import ( chapter_needs_cover_enqueue, @@ -71,18 +77,8 @@ def _get_redis_client(*, decode_responses: bool = False) -> redis.Redis: return client -def _acquire_chapter_lock(user_id: str, stage: str, timeout: int = 120) -> bool: - """获取章节分布式锁,防止并发写入同一章节""" - r = _get_redis_client() - lock_key = f"lock:chapter:{user_id}:{stage}" - return r.set(lock_key, "1", nx=True, ex=timeout) - - -def _release_chapter_lock(user_id: str, stage: str): - """释放章节分布式锁""" - r = _get_redis_client() - lock_key = f"lock:chapter:{user_id}:{stage}" - r.delete(lock_key) +def _chapter_lock_ttl() -> int: + return int(settings.chapter_pipeline_lock_ttl_seconds) def _update_task_status_sync( @@ -338,11 +334,15 @@ def process_memoir_segments(self, user_id: str, segment_ids: List[str]): ) chapters_to_enqueue: Set[str] = set() + affected_chapter_ids: Set[str] = set() for ( chapter_category, category_segments, ) in prepared.category_to_segments.items(): - if not _acquire_chapter_lock(user_id, chapter_category): + lock_handle = _acquire_chapter_lock( + user_id, chapter_category, ttl_seconds=_chapter_lock_ttl() + ) + if lock_handle is None: logger.warning( "章节锁竞争: category={}, 延迟重试", chapter_category, @@ -362,6 +362,7 @@ def process_memoir_segments(self, user_id: str, segment_ids: List[str]): story_dispatch_ids |= disp db.flush() db.refresh(chapter) + affected_chapter_ids.add(chapter.id) needs_cover_enqueue = ( image_settings.enabled and chapter_needs_cover_enqueue(chapter) @@ -390,7 +391,7 @@ def process_memoir_segments(self, user_id: str, segment_ids: List[str]): if chapter and needs_cover_enqueue: chapters_to_enqueue.add(chapter.id) finally: - _release_chapter_lock(user_id, chapter_category) + _release_chapter_lock(lock_handle) # 标记段落为已处理 for seg in segments: @@ -398,18 +399,30 @@ def process_memoir_segments(self, user_id: str, segment_ids: List[str]): db.commit() - from app.tasks.chapter_compose_tasks import recompose_chapters_for_story - from app.tasks.story_image_tasks import generate_story_image + from app.features.story.post_commit import enqueue_story_post_commit_effects - for sid in story_dispatch_ids: - try: - generate_story_image.delay(sid) - except Exception as exc: - logger.warning("generate_story_image delay: {}", exc) - try: - recompose_chapters_for_story.delay(sid) - except Exception as exc: - logger.warning("recompose_chapters_for_story delay: {}", exc) + pc = enqueue_story_post_commit_effects( + user_id=user_id, + story_ids=set(story_dispatch_ids), + chapter_ids=affected_chapter_ids, + trigger_source="pipeline", + need_compaction=True, + compaction_extra={ + "pipeline_run_id": str(task_id), + "story_dispatch_ids": sorted(story_dispatch_ids), + "chapters_to_enqueue": sorted(chapters_to_enqueue), + }, + ) + logger.info( + "event=story_post_commit user_id={} trigger=pipeline " + "enqueued_story_image_count={} enqueued_chapter_recompose_count={} " + "compaction_scheduled={} errors={}", + user_id, + pc.enqueued_story_image_count, + pc.enqueued_chapter_recompose_count, + pc.compaction_scheduled, + pc.errors, + ) from app.tasks.chapter_cover_enqueue import ( try_enqueue_generate_chapter_cover, @@ -419,17 +432,6 @@ def process_memoir_segments(self, user_id: str, segment_ids: List[str]): if try_enqueue_generate_chapter_cover(chapter_id, source="pipeline"): logger.info(f"派发章节封面任务: chapter={chapter_id}") - schedule_memory_compaction_run( - user_id, - { - "trigger_source": "memoir_segments", - "trigger_time": datetime.now(timezone.utc).isoformat(), - "pipeline_run_id": str(task_id), - "story_dispatch_ids": sorted(story_dispatch_ids), - "chapters_to_enqueue": sorted(chapters_to_enqueue), - }, - ) - categories_processed = sorted(prepared.category_to_segments.keys()) logger.info( "回忆录处理完成: user_id={} task_id={} segment_count={} " @@ -513,18 +515,28 @@ def generate_chapter_content(self, user_id: str, stage: str, new_content: str): db.commit() db.refresh(chapter) - from app.tasks.chapter_compose_tasks import recompose_chapters_for_story - from app.tasks.story_image_tasks import generate_story_image + from app.features.story.post_commit import enqueue_story_post_commit_effects - for sid in dispatch_ids: - try: - generate_story_image.delay(sid) - except Exception as exc: - logger.warning("generate_story_image delay: {}", exc) - try: - recompose_chapters_for_story.delay(sid) - except Exception as exc: - logger.warning("recompose_chapters_for_story delay: {}", exc) + ch_ids: set[str] = set() + if chapter is not None: + ch_ids.add(str(chapter.id)) + pc = enqueue_story_post_commit_effects( + user_id=user_id, + story_ids=set(dispatch_ids), + chapter_ids=ch_ids, + trigger_source="pipeline", + need_compaction=False, + ) + logger.info( + "event=story_post_commit user_id={} trigger=pipeline_generate_chapter " + "enqueued_story_image_count={} enqueued_chapter_recompose_count={} " + "compaction_scheduled={} errors={}", + user_id, + pc.enqueued_story_image_count, + pc.enqueued_chapter_recompose_count, + pc.compaction_scheduled, + pc.errors, + ) image_settings = MemoirImageSettings.from_env() if ( diff --git a/api/app/tasks/story_image_tasks.py b/api/app/tasks/story_image_tasks.py index 3cc7589..0857797 100644 --- a/api/app/tasks/story_image_tasks.py +++ b/api/app/tasks/story_image_tasks.py @@ -31,27 +31,38 @@ STORY_IMAGE_LOCK_TTL_SECONDS = 1800 STORY_IMAGE_CLAIM_TTL_SECONDS = 1800 -def _enqueue_chapter_recompose_for_story(story_id: str) -> None: - """story 正文因主图回填变更后,标记关联章节 dirty 并异步物化。""" +def _enqueue_chapter_effects_after_image_backfill(story_id: str) -> None: + """主图回填后标记关联章节 dirty,并经统一 post-commit 入口派发章节物化与 compaction。""" try: with get_sync_db() as session: from app.features.memoir import repo as memoir_repo + story = session.get(Story, story_id) + if not story: + return + uid = str(story.user_id) memoir_repo.mark_chapters_dirty_for_story_sync(session, story_id) + chapter_ids = memoir_repo.get_chapter_ids_linked_to_story_sync( + session, story_id + ) session.commit() + user_id = uid except Exception as exc: logger.warning( "mark_chapters_dirty_for_story_sync failed story={}: {}", story_id, exc ) return - try: - from app.tasks.chapter_compose_tasks import recompose_chapters_for_story + from app.features.story.post_commit import enqueue_story_post_commit_effects - recompose_chapters_for_story.delay(story_id) - except Exception as exc: - logger.warning( - "recompose_chapters_for_story.delay failed story={}: {}", story_id, exc - ) + enqueue_story_post_commit_effects( + user_id=user_id, + story_ids=set(), + chapter_ids=set(chapter_ids), + trigger_source="story_image_backfill", + need_image=False, + need_recompose=True, + need_compaction=True, + ) def _build_story_image_cos_key( @@ -320,7 +331,7 @@ def generate_story_image(self, story_id: str): db.commit() - _enqueue_chapter_recompose_for_story(story_id) + _enqueue_chapter_effects_after_image_backfill(story_id) logger.info( "generate_story_image: story={}, asset={}", diff --git a/api/tests/test_infra_regressions.py b/api/tests/test_infra_regressions.py new file mode 100644 index 0000000..13c6337 --- /dev/null +++ b/api/tests/test_infra_regressions.py @@ -0,0 +1,109 @@ +import asyncio +import sys +from types import ModuleType, SimpleNamespace + +import pytest + +from app.adapters.asr.tencent_asr import TencentASRProvider +from app.core import chapter_pipeline_lock +from app.features.story import post_commit + + +def test_chapter_pipeline_lock_delegates_to_token_lock( + monkeypatch: pytest.MonkeyPatch, +) -> None: + handle = object() + recorded: dict[str, object] = {} + + def fake_acquire(key: str, *, ttl_seconds: int) -> object: + recorded["key"] = key + recorded["ttl_seconds"] = ttl_seconds + return handle + + def fake_release(arg: object) -> None: + recorded["released"] = arg + + monkeypatch.setattr(chapter_pipeline_lock, "acquire_redis_lock", fake_acquire) + monkeypatch.setattr(chapter_pipeline_lock, "release_redis_lock", fake_release) + + acquired = chapter_pipeline_lock.acquire_chapter_pipeline_lock( + "user-1", "childhood", ttl_seconds=120 + ) + chapter_pipeline_lock.release_chapter_pipeline_lock(acquired) + + assert acquired is handle + assert recorded == { + "key": "lock:chapter:user-1:childhood", + "ttl_seconds": 120, + "released": handle, + } + + +def test_post_commit_reuses_singleton_redis_client( + monkeypatch: pytest.MonkeyPatch, +) -> None: + created: list[object] = [] + + class FakeRedis: + pass + + def fake_from_url(url: str, *, decode_responses: bool) -> FakeRedis: + client = FakeRedis() + created.append(client) + assert decode_responses is True + assert url + return client + + monkeypatch.setattr(post_commit, "_redis_client", None) + monkeypatch.setattr(post_commit.redis, "from_url", fake_from_url) + + first = post_commit._get_redis() + second = post_commit._get_redis() + + assert first is second + assert created == [first] + + +@pytest.mark.asyncio +async def test_tencent_asr_transcribe_uses_to_thread( + monkeypatch: pytest.MonkeyPatch, +) -> None: + to_thread_calls: list[tuple[object, tuple[object, ...]]] = [] + + class FakeRequest: + EngSerViceType: str | None = None + SourceType: int | None = None + VoiceFormat: str | None = None + Data: str | None = None + DataLen: int | None = None + + class FakeClient: + def SentenceRecognition(self, req: FakeRequest) -> SimpleNamespace: + return SimpleNamespace(Result=" 你好,世界 ") + + async def fake_to_thread(fn, *args): + to_thread_calls.append((fn, args)) + return fn(*args) + + models_module = ModuleType("tencentcloud.asr.v20190614.models") + models_module.SentenceRecognitionRequest = FakeRequest + package_module = ModuleType("tencentcloud.asr.v20190614") + package_module.models = models_module + + monkeypatch.setitem(sys.modules, "tencentcloud.asr.v20190614", package_module) + monkeypatch.setattr(asyncio, "to_thread", fake_to_thread) + + provider = TencentASRProvider("sid", "skey") + client = FakeClient() + monkeypatch.setattr(provider, "_get_client", lambda: client) + + text = await provider.transcribe(b"fake-audio", format="m4a") + + assert text == "你好,世界" + assert len(to_thread_calls) == 1 + fn, args = to_thread_calls[0] + assert getattr(fn, "__self__", None) is client + assert getattr(fn, "__name__", "") == "SentenceRecognition" + request = args[0] + assert request.VoiceFormat == "m4a" + assert request.DataLen == len(b"fake-audio") diff --git a/api/tests/test_narrative_pipeline.py b/api/tests/test_narrative_pipeline.py index 40c5fcd..ee9d303 100644 --- a/api/tests/test_narrative_pipeline.py +++ b/api/tests/test_narrative_pipeline.py @@ -32,11 +32,10 @@ def test_apply_narrative_fallbacks_merge_shrink_appends_oral() -> None: """整篇合并 JSON 输出过短:保留旧文并拼本段口述。""" long_existing = "x" * 500 raw = '{"paragraphs": [{"content": "短"}]}' - out = sps._apply_narrative_fallbacks( + out, _ft = sps._apply_narrative_fallbacks( raw, "新口述补充", - existing_for_narrative=long_existing, - existing_chapter_md="", + long_existing, chapter_category="childhood", ) assert long_existing in out @@ -50,11 +49,10 @@ def test_apply_narrative_fallbacks_json_too_short_returns_oral( monkeypatch.setattr(sps.settings, "memoir_narrative_fallback_min_chars", 20) oral = "我1999年出生在上海,小学时爷爷常带我去河边散步。" raw = '{"paragraphs": [{"content": "1999"}]}' - out = sps._apply_narrative_fallbacks( + out, _ft = sps._apply_narrative_fallbacks( raw, oral, - existing_for_narrative="", - existing_chapter_md="", + "", chapter_category="childhood", ) assert out.strip() == oral