Files
life-echo/api/app/tasks/chapter_compose_tasks.py
Kevin aac484463d feat(api): 拆分章节物化与 Story 后处理,并加固 Redis 锁与腾讯 ASR
回忆录 Story 流水线(同步)
- 同步路径仅写入 Story 与章节关联,改为 mark_chapter_dirty_sync,不再内联 compose
- 物化由 Celery recompose_chapter 异步完成;compose 不变量与异常时保留 dirty 的语义在 repo 中补充说明
- Evidence:大批次时降低 top_k;路由候选 story 携带 char_count/version_count;append 超长/版本过多时强制新开 story
- 叙事 prompt:relevant_chunks 去重,减少重复证据噪声
- 叙事回退与忠实度 gate:返回 fallback 类型并记录结构化日志(含耗时、JSON 有效性等)

Post-commit 与任务编排
- 新增 post_commit.enqueue_story_post_commit_effects:统一派发 generate_story_image(Redis 去重)、延迟 recompose_chapter、可选 memory compaction
- memoir_tasks / story_service / story_image_tasks 改为调用 post-commit 入口;主图回填后按关联章节重算并调度物化与 memory compaction;加固 Redis 锁与腾讯 ASR(锁委托、Redis 单例、ASR to_thread)
- 更新 test_narrative_pipeline 以适配 _apply_narrative_fallbacks 返回值
2026-03-30 11:53:04 +08:00

139 lines
5.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Celery 任务:story 变更后重组关联章节的 canonical_markdown(物化视图)。"""
from datetime import datetime, timezone
from celery import shared_task
from sqlalchemy import select
from app.core.chapter_pipeline_lock import (
acquire_chapter_pipeline_lock,
release_chapter_pipeline_lock,
)
from app.core.config import settings
from app.core.db import get_sync_db
from app.core.logging import get_logger
from app.core.memory_compaction_schedule import schedule_memory_compaction_run
from app.features.memoir import repo as memoir_repo
from app.features.memoir.models import Chapter, ChapterStoryLink
from app.features.story.models import Story
# Module-level logger from app.core.logging.get_logger. The calls in this
# file pass "{}" placeholders plus positional args, so the returned logger is
# presumably loguru-style (brace formatting) rather than stdlib "%s" — confirm.
logger = get_logger(__name__)
@shared_task(bind=True, max_retries=3, default_retry_delay=30)
def recompose_chapter(self, chapter_id: str) -> dict:
    """Materialize one chapter's ``canonical_markdown`` from its story links.

    Composition only happens while ``chapter.markdown_compose_dirty`` is True.
    The task shares the chapter-level Redis lock with the pipeline (keyed by
    the chapter's ``user_id`` and ``category``). If the lock cannot be
    acquired, the task skips and relies on a later trigger to run again.

    The dirty flag is checked twice:

    * once before taking the lock, as a cheap early exit for clean chapters;
    * once more after the lock is held, because another worker may have
      composed the chapter (and cleared the flag) between the first read and
      lock acquisition.

    Args:
        chapter_id: Primary key of the ``Chapter`` to recompose.

    Returns:
        One of:

        * ``{"status": "not_found"}``
        * ``{"status": "skip_not_dirty"}``
        * ``{"status": "skip_lock_contention"}``
        * ``{"status": "composed" | "empty", "chapter_id": chapter_id}``

        ``"composed"``/``"empty"`` reflect the truthiness of
        ``memoir_repo.compose_chapter_from_story_links_sync``'s return value.

    Raises:
        celery.exceptions.Retry: When composing fails; raised via
            ``self.retry``. Once ``max_retries`` is exhausted, Celery
            re-raises the original exception instead.
    """
    lock_ttl = int(settings.chapter_pipeline_lock_ttl_seconds)
    # Only set after a successful commit; gates the compaction scheduling below.
    user_id: str | None = None
    composed = False
    with get_sync_db() as session:
        chapter = session.get(Chapter, chapter_id)
        if not chapter:
            logger.info("recompose_chapter: chapter_id={} status=not_found", chapter_id)
            return {"status": "not_found"}
        # Cheap pre-check so clean chapters never touch Redis.
        if chapter.markdown_compose_dirty is not True:
            logger.info(
                "recompose_chapter: chapter_id={} status=skip_not_dirty",
                chapter_id,
            )
            return {"status": "skip_not_dirty"}
        uid = str(chapter.user_id)
        stage = str(chapter.category)
        lock_handle = acquire_chapter_pipeline_lock(uid, stage, ttl_seconds=lock_ttl)
        if lock_handle is None:
            logger.info(
                "event=recompose_chapter status=skip_lock_contention "
                "chapter_id={} user_id={} stage={}",
                chapter_id,
                uid,
                stage,
            )
            return {"status": "skip_lock_contention"}
        try:
            # The row was read before the lock was held. Re-read it so a
            # compose that finished in the meantime (flag already cleared) is
            # not repeated. This assumes the isolation level lets a new SELECT
            # see rows committed after this transaction began (e.g. PostgreSQL
            # READ COMMITTED) — TODO confirm.
            session.refresh(chapter)
            if chapter.markdown_compose_dirty is not True:
                logger.info(
                    "recompose_chapter: chapter_id={} status=skip_not_dirty",
                    chapter_id,
                )
                return {"status": "skip_not_dirty"}
            composed = memoir_repo.compose_chapter_from_story_links_sync(
                session, chapter_id
            )
            session.commit()
            user_id = uid
        except Exception as exc:
            session.rollback()
            logger.warning(
                "recompose_chapter failed chapter_id={} err={}", chapter_id, exc
            )
            raise self.retry(exc=exc) from exc
        finally:
            # Always release, including the early "not dirty" return above.
            release_chapter_pipeline_lock(lock_handle)
    if user_id:
        schedule_memory_compaction_run(
            user_id,
            {
                "trigger_source": "chapter_recompose",
                "trigger_time": datetime.now(timezone.utc).isoformat(),
                "pipeline_run_id": str(self.request.id),
                "recomposed_chapter_ids": [chapter_id],
            },
        )
    logger.info(
        "recompose_chapter: chapter_id={} status={}",
        chapter_id,
        "composed" if composed else "empty",
    )
    return {"status": "composed" if composed else "empty", "chapter_id": chapter_id}
@shared_task(bind=True, max_retries=3, default_retry_delay=30)
def recompose_chapters_for_story(self, story_id: str) -> dict:
    """Materialize every dirty chapter linked to a story.

    .. deprecated::
        Use ``recompose_chapter`` (per-chapter) together with
        ``enqueue_story_post_commit_effects`` instead. Kept for backward
        compatibility; remove once all callers have migrated.

    NOTE(review): unlike ``recompose_chapter``, this path does not acquire the
    chapter pipeline Redis lock before composing.

    Args:
        story_id: Primary key of the ``Story`` whose linked chapters are
            recomposed.

    Returns:
        ``{"story_id": story_id, "recomposed_chapter_ids": [...]}``. The ids
        are the dirty chapters that were composed, rendered as strings.

    Raises:
        celery.exceptions.Retry: When any step fails; raised via
            ``self.retry``. Once ``max_retries`` is exhausted, Celery
            re-raises the original exception instead.
    """
    user_id: str | None = None
    try:
        with get_sync_db() as session:
            story = session.get(Story, story_id)
            # A missing story only disables compaction scheduling; dirty
            # chapters still linked to the id are composed regardless.
            user_id = str(story.user_id) if story else None
            stmt = (
                select(Chapter.id)
                .join(
                    ChapterStoryLink,
                    ChapterStoryLink.chapter_id == Chapter.id,
                )
                .where(
                    ChapterStoryLink.story_id == story_id,
                    Chapter.markdown_compose_dirty.is_(True),
                )
            )
            dirty_ids = list(session.scalars(stmt).all())
            for cid in dirty_ids:
                memoir_repo.compose_chapter_from_story_links_sync(session, cid)
            session.commit()
        # Stringify ids so the compaction payload and the task result carry
        # plain strings (as recompose_chapter does), even if Chapter.id is a
        # non-str column type such as UUID.
        ids = [str(cid) for cid in dirty_ids]
        # Only schedule compaction when something was actually recomposed,
        # mirroring recompose_chapter, which never schedules for clean chapters.
        if user_id and ids:
            schedule_memory_compaction_run(
                user_id,
                {
                    "trigger_source": "chapter_recompose",
                    "trigger_time": datetime.now(timezone.utc).isoformat(),
                    "pipeline_run_id": str(self.request.id),
                    "story_ids": [story_id],
                    "recomposed_chapter_ids": ids,
                },
            )
        logger.info(
            "recompose_chapters_for_story: story={} recomposed_chapters={}",
            story_id,
            ids,
        )
        return {"story_id": story_id, "recomposed_chapter_ids": ids}
    except Exception as exc:
        logger.warning(
            "recompose_chapters_for_story failed story={} err={}", story_id, exc
        )
        raise self.retry(exc=exc) from exc