"""Celery:story 变更后重组关联章节的 canonical_markdown(物化视图)。""" import time from datetime import datetime, timezone from celery import shared_task from app.core.chapter_pipeline_lock import ( acquire_chapter_pipeline_lock, release_chapter_pipeline_lock, ) from app.core.config import settings from app.core.db import get_sync_db from app.core.logging import get_logger from app.core.memoir_pipeline_progress import merge_fanout_item from app.core.memoir_pipeline_trace import new_memoir_correlation_id from app.core.memory_compaction_schedule import schedule_memory_compaction_run from app.features.memoir import repo as memoir_repo from app.features.memoir.models import Chapter logger = get_logger(__name__) @shared_task(bind=True, max_retries=8, default_retry_delay=30) def recompose_chapter( self, chapter_id: str, memoir_correlation_id: str | None = None ) -> dict: """ 按章节物化 canonical_markdown:仅当 markdown_compose_dirty 为 True 时执行; 与 pipeline 共用章节级 Redis 锁,拿不到锁则跳过(依赖后续触发重试)。 """ lock_ttl = int(settings.chapter_pipeline_lock_ttl_seconds) tid = str(self.request.id) t0 = time.perf_counter() merge_fanout_item( memoir_correlation_id, list_name="recompose_chapters", id_field="chapter_id", item_id=chapter_id, task_id=tid, status="running", ) user_id: str | None = None composed = False with get_sync_db() as session: chapter = session.get(Chapter, chapter_id) if not chapter: ms = (time.perf_counter() - t0) * 1000 logger.info( "event=recompose_chapter status=not_found chapter_id={} duration_ms={:.1f} " "msg=章节重组跳过(章节不存在)", chapter_id, ms, ) merge_fanout_item( memoir_correlation_id, list_name="recompose_chapters", id_field="chapter_id", item_id=chapter_id, task_id=tid, status="not_found", ) return {"status": "not_found"} if chapter.markdown_compose_dirty is not True: ms = (time.perf_counter() - t0) * 1000 logger.info( "event=recompose_chapter status=skip_not_dirty chapter_id={} duration_ms={:.1f} " "msg=章节重组跳过(无需重组)", chapter_id, ms, ) merge_fanout_item( memoir_correlation_id, list_name="recompose_chapters", id_field="chapter_id", item_id=chapter_id, task_id=tid, status="skip_not_dirty", ) return {"status": "skip_not_dirty"} uid = str(chapter.user_id) stage = str(chapter.category) lock_handle = acquire_chapter_pipeline_lock(uid, stage, ttl_seconds=lock_ttl) if lock_handle is None: ms = (time.perf_counter() - t0) * 1000 logger.info( "event=recompose_chapter status=lock_busy_retry " "chapter_id={} user_id={} stage={} retry_on_lock={} duration_ms={:.1f} " "msg=章节重组等待锁或重试", chapter_id, uid, stage, settings.memoir_recompose_retry_on_lock_contention, ms, ) if settings.memoir_recompose_retry_on_lock_contention: countdown = max(15, min(120, lock_ttl // 4)) raise self.retry(countdown=countdown) merge_fanout_item( memoir_correlation_id, list_name="recompose_chapters", id_field="chapter_id", item_id=chapter_id, task_id=tid, status="skip_lock_contention", ) return {"status": "skip_lock_contention"} try: composed = memoir_repo.compose_chapter_from_story_links_sync( session, chapter_id ) session.commit() user_id = uid except Exception as exc: session.rollback() logger.warning( "recompose_chapter failed chapter_id={} err={}", chapter_id, exc ) raise self.retry(exc=exc) from exc finally: release_chapter_pipeline_lock(lock_handle) if user_id: schedule_memory_compaction_run( user_id, { "trigger_source": "chapter_recompose", "trigger_time": datetime.now(timezone.utc).isoformat(), "pipeline_run_id": str(self.request.id), "memoir_correlation_id": new_memoir_correlation_id(), "recomposed_chapter_ids": [chapter_id], }, ) ms = (time.perf_counter() - t0) * 1000 st = "composed" if composed else "empty" logger.info( "event=recompose_chapter status={} chapter_id={} user_id={} duration_ms={:.1f} " "msg=章节物化重组完成", st, chapter_id, user_id or "-", ms, ) merge_fanout_item( memoir_correlation_id, list_name="recompose_chapters", id_field="chapter_id", item_id=chapter_id, task_id=tid, status="composed" if composed else "empty", ) return {"status": "composed" if composed else "empty", "chapter_id": chapter_id}