""" 章节封面 Celery 任务入队闸门:DB 二次校验 + Redis 短时去重,避免重复 delay 同一 chapter。 """ from __future__ import annotations from typing import Literal from sqlalchemy import select from sqlalchemy.orm import joinedload from app.core.db import get_sync_db from app.core.logging import get_logger from app.core.redis_sync import get_sync_redis from app.features.memoir.asset_resolver import strip_image_placeholders from app.features.memoir.cover_eligibility import ( chapter_eligible_for_cover_by_inline_body_image_count, chapter_has_story_links, chapter_needs_cover_enqueue, effective_chapter_markdown_for_cover_gates, primary_chapter_memoir_image, ) from app.features.memoir.models import Chapter, ChapterStoryLink logger = get_logger(__name__) CHAPTER_COVER_ENQUEUE_DEDUP_TTL_SECONDS = 450 _ENQUEUE_KEY_PREFIX = "enqueue:chapter-cover:" def _enqueue_dedup_key(chapter_id: str) -> str: return f"{_ENQUEUE_KEY_PREFIX}{chapter_id}" def _chapter_eligible_for_http_enqueue(chapter: Chapter | None) -> bool: """与 MemoirService.check_and_trigger_cover_generation 循环条件一致。""" if not chapter: return False if not chapter.category: return False if not chapter_has_story_links(chapter): return False if getattr(chapter, "cover_asset_id", None): return False view = effective_chapter_markdown_for_cover_gates(chapter) body = strip_image_placeholders(view).strip() if not body: return False if not chapter_eligible_for_cover_by_inline_body_image_count( chapter, markdown=view ): return False cover_rec = primary_chapter_memoir_image(chapter) if cover_rec and (cover_rec.status or "").strip() == "completed": return False return True def _chapter_eligible_for_pipeline_enqueue(chapter: Chapter | None) -> bool: """尚无 cover_asset、正文插图数达 MEMOIR_MIN_INLINE_IMAGES_FOR_CHAPTER_COVER(与 HTTP 闸门共用)。""" return bool(chapter_needs_cover_enqueue(chapter)) def _load_chapter_for_enqueue_sync(chapter_id: str) -> Chapter | None: with get_sync_db() as db: stmt = ( select(Chapter) .where(Chapter.id == chapter_id) .options( joinedload(Chapter.images), joinedload(Chapter.story_links).joinedload(ChapterStoryLink.story), ) ) return db.execute(stmt).unique().scalar_one_or_none() def try_enqueue_generate_chapter_cover( chapter_id: str, source: Literal["http", "pipeline"] = "pipeline", ) -> bool: """ 若章节仍需要生成封面,则 SET NX 去重后派发 generate_chapter_cover。 Returns: True 当且仅当成功调用 delay(通过闸门)。 """ chapter = _load_chapter_for_enqueue_sync(chapter_id) if source == "http": eligible = _chapter_eligible_for_http_enqueue(chapter) else: eligible = _chapter_eligible_for_pipeline_enqueue(chapter) if not eligible: return False key = _enqueue_dedup_key(chapter_id) client = get_sync_redis(decode_responses=True) try: if not client.set( key, "1", nx=True, ex=CHAPTER_COVER_ENQUEUE_DEDUP_TTL_SECONDS ): logger.debug( "chapter_cover enqueue skipped (dedup): chapter={} source={}", chapter_id, source, ) return False except Exception as exc: logger.warning( "chapter_cover enqueue dedup redis failed, allowing enqueue: chapter={} error={}", chapter_id, exc, ) from app.tasks.chapter_cover_tasks import generate_chapter_cover try: generate_chapter_cover.delay(chapter_id) except Exception as exc: logger.warning( "chapter_cover delay failed: chapter={} error={}", chapter_id, exc, ) try: client.delete(key) except Exception: pass return False logger.info( "chapter_cover enqueued: chapter={} source={}", chapter_id, source, ) return True