feat(api): 拆分章节物化与 Story 后处理,并加固 Redis 锁与腾讯 ASR

回忆录 Story 流水线(同步)
- 同步路径仅写入 Story 与章节关联,改为 mark_chapter_dirty_sync,不再内联 compose
- 物化由 Celery recompose_chapter 异步完成;compose 不变量与异常时保留 dirty 的语义在 repo 中补充说明
- Evidence:大批次时降低 top_k;路由候选 story 携带 char_count/version_count;append 超长/版本过多时强制新开 story
- 叙事 prompt:relevant_chunks 去重,减少重复证据噪声
- 叙事回退与忠实度 gate:返回 fallback 类型并记录结构化日志(含耗时、JSON 有效性等)

Post-commit 与任务编排
- 新增 post_commit.enqueue_story_post_commit_effects:统一派发 generate_story_image(Redis 去重)、延迟 recompose_chapter、可选 memory compaction
- memoir_tasks / story_service / story_image_tasks 改为调用 post-commit 入口;主图回填后按关联章节重算并调度物化与 compacs(锁委托、Redis 单例、ASR to_thread)
- 更新 test_narrative_pipeline 以适配 _apply_narrative_fallbacks 返回值
This commit is contained in:
Kevin
2026-03-30 11:53:04 +08:00
parent e884409410
commit aac484463d
15 changed files with 775 additions and 144 deletions

View File

@@ -15,10 +15,16 @@ from sqlalchemy.orm import Session
from app.agents.chat.prompts_profile import format_user_profile_context
from app.agents.memoir import MemoirOrchestrator
from app.agents.state_schema import MemoirStateSchema, SlotData, default_state
from app.core.chapter_pipeline_lock import (
acquire_chapter_pipeline_lock as _acquire_chapter_lock,
)
from app.core.chapter_pipeline_lock import (
release_chapter_pipeline_lock as _release_chapter_lock,
)
from app.core.config import settings
from app.core.db import get_sync_db
from app.core.dependencies import get_llm_provider
from app.core.logging import get_logger
from app.core.memory_compaction_schedule import schedule_memory_compaction_run
from app.features.conversation.models import Segment
from app.features.memoir.cover_eligibility import (
chapter_needs_cover_enqueue,
@@ -71,18 +77,8 @@ def _get_redis_client(*, decode_responses: bool = False) -> redis.Redis:
return client
def _acquire_chapter_lock(user_id: str, stage: str, timeout: int = 120) -> bool:
"""获取章节分布式锁,防止并发写入同一章节"""
r = _get_redis_client()
lock_key = f"lock:chapter:{user_id}:{stage}"
return r.set(lock_key, "1", nx=True, ex=timeout)
def _release_chapter_lock(user_id: str, stage: str):
"""释放章节分布式锁"""
r = _get_redis_client()
lock_key = f"lock:chapter:{user_id}:{stage}"
r.delete(lock_key)
def _chapter_lock_ttl() -> int:
return int(settings.chapter_pipeline_lock_ttl_seconds)
def _update_task_status_sync(
@@ -338,11 +334,15 @@ def process_memoir_segments(self, user_id: str, segment_ids: List[str]):
)
chapters_to_enqueue: Set[str] = set()
affected_chapter_ids: Set[str] = set()
for (
chapter_category,
category_segments,
) in prepared.category_to_segments.items():
if not _acquire_chapter_lock(user_id, chapter_category):
lock_handle = _acquire_chapter_lock(
user_id, chapter_category, ttl_seconds=_chapter_lock_ttl()
)
if lock_handle is None:
logger.warning(
"章节锁竞争: category={}, 延迟重试",
chapter_category,
@@ -362,6 +362,7 @@ def process_memoir_segments(self, user_id: str, segment_ids: List[str]):
story_dispatch_ids |= disp
db.flush()
db.refresh(chapter)
affected_chapter_ids.add(chapter.id)
needs_cover_enqueue = (
image_settings.enabled and chapter_needs_cover_enqueue(chapter)
@@ -390,7 +391,7 @@ def process_memoir_segments(self, user_id: str, segment_ids: List[str]):
if chapter and needs_cover_enqueue:
chapters_to_enqueue.add(chapter.id)
finally:
_release_chapter_lock(user_id, chapter_category)
_release_chapter_lock(lock_handle)
# 标记段落为已处理
for seg in segments:
@@ -398,18 +399,30 @@ def process_memoir_segments(self, user_id: str, segment_ids: List[str]):
db.commit()
from app.tasks.chapter_compose_tasks import recompose_chapters_for_story
from app.tasks.story_image_tasks import generate_story_image
from app.features.story.post_commit import enqueue_story_post_commit_effects
for sid in story_dispatch_ids:
try:
generate_story_image.delay(sid)
except Exception as exc:
logger.warning("generate_story_image delay: {}", exc)
try:
recompose_chapters_for_story.delay(sid)
except Exception as exc:
logger.warning("recompose_chapters_for_story delay: {}", exc)
pc = enqueue_story_post_commit_effects(
user_id=user_id,
story_ids=set(story_dispatch_ids),
chapter_ids=affected_chapter_ids,
trigger_source="pipeline",
need_compaction=True,
compaction_extra={
"pipeline_run_id": str(task_id),
"story_dispatch_ids": sorted(story_dispatch_ids),
"chapters_to_enqueue": sorted(chapters_to_enqueue),
},
)
logger.info(
"event=story_post_commit user_id={} trigger=pipeline "
"enqueued_story_image_count={} enqueued_chapter_recompose_count={} "
"compaction_scheduled={} errors={}",
user_id,
pc.enqueued_story_image_count,
pc.enqueued_chapter_recompose_count,
pc.compaction_scheduled,
pc.errors,
)
from app.tasks.chapter_cover_enqueue import (
try_enqueue_generate_chapter_cover,
@@ -419,17 +432,6 @@ def process_memoir_segments(self, user_id: str, segment_ids: List[str]):
if try_enqueue_generate_chapter_cover(chapter_id, source="pipeline"):
logger.info(f"派发章节封面任务: chapter={chapter_id}")
schedule_memory_compaction_run(
user_id,
{
"trigger_source": "memoir_segments",
"trigger_time": datetime.now(timezone.utc).isoformat(),
"pipeline_run_id": str(task_id),
"story_dispatch_ids": sorted(story_dispatch_ids),
"chapters_to_enqueue": sorted(chapters_to_enqueue),
},
)
categories_processed = sorted(prepared.category_to_segments.keys())
logger.info(
"回忆录处理完成: user_id={} task_id={} segment_count={} "
@@ -513,18 +515,28 @@ def generate_chapter_content(self, user_id: str, stage: str, new_content: str):
db.commit()
db.refresh(chapter)
from app.tasks.chapter_compose_tasks import recompose_chapters_for_story
from app.tasks.story_image_tasks import generate_story_image
from app.features.story.post_commit import enqueue_story_post_commit_effects
for sid in dispatch_ids:
try:
generate_story_image.delay(sid)
except Exception as exc:
logger.warning("generate_story_image delay: {}", exc)
try:
recompose_chapters_for_story.delay(sid)
except Exception as exc:
logger.warning("recompose_chapters_for_story delay: {}", exc)
ch_ids: set[str] = set()
if chapter is not None:
ch_ids.add(str(chapter.id))
pc = enqueue_story_post_commit_effects(
user_id=user_id,
story_ids=set(dispatch_ids),
chapter_ids=ch_ids,
trigger_source="pipeline",
need_compaction=False,
)
logger.info(
"event=story_post_commit user_id={} trigger=pipeline_generate_chapter "
"enqueued_story_image_count={} enqueued_chapter_recompose_count={} "
"compaction_scheduled={} errors={}",
user_id,
pc.enqueued_story_image_count,
pc.enqueued_chapter_recompose_count,
pc.compaction_scheduled,
pc.errors,
)
image_settings = MemoirImageSettings.from_env()
if (