Files
life-echo/api/app/tasks/chapter_cover_enqueue.py
Sully 53e0065e3e refactor(api): TOML 配置 SSOT、统一错误契约、Auth/事务加固与可观测性 (#33)
配置 SSOT(TOML + .env)
统一错误契约
Auth 与事务边界
Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client
可观测性(OpenTelemetry + LGTM)
2026-05-22 13:44:50 +08:00

136 lines
4.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
章节封面 Celery 任务入队闸门DB 二次校验 + Redis 短时去重,避免重复 delay 同一 chapter。
"""
from __future__ import annotations
from typing import Literal
from sqlalchemy import select
from sqlalchemy.orm import joinedload
from app.core.db import get_sync_db
from app.core.logging import get_logger
from app.core.redis_sync import get_sync_redis
from app.features.memoir.asset_resolver import strip_image_placeholders
from app.features.memoir.cover_eligibility import (
chapter_eligible_for_cover_by_inline_body_image_count,
chapter_has_story_links,
chapter_needs_cover_enqueue,
effective_chapter_markdown_for_cover_gates,
primary_chapter_memoir_image,
)
from app.features.memoir.models import Chapter, ChapterStoryLink
logger = get_logger(__name__)
CHAPTER_COVER_ENQUEUE_DEDUP_TTL_SECONDS = 450
_ENQUEUE_KEY_PREFIX = "enqueue:chapter-cover:"
def _enqueue_dedup_key(chapter_id: str) -> str:
return f"{_ENQUEUE_KEY_PREFIX}{chapter_id}"
def _chapter_eligible_for_http_enqueue(chapter: Chapter | None) -> bool:
"""与 MemoirService.check_and_trigger_cover_generation 循环条件一致。"""
if not chapter:
return False
if not chapter.category:
return False
if not chapter_has_story_links(chapter):
return False
if getattr(chapter, "cover_asset_id", None):
return False
view = effective_chapter_markdown_for_cover_gates(chapter)
body = strip_image_placeholders(view).strip()
if not body:
return False
if not chapter_eligible_for_cover_by_inline_body_image_count(
chapter, markdown=view
):
return False
cover_rec = primary_chapter_memoir_image(chapter)
if cover_rec and (cover_rec.status or "").strip() == "completed":
return False
return True
def _chapter_eligible_for_pipeline_enqueue(chapter: Chapter | None) -> bool:
"""尚无 cover_asset、正文插图数达 MEMOIR_MIN_INLINE_IMAGES_FOR_CHAPTER_COVER与 HTTP 闸门共用)。"""
return bool(chapter_needs_cover_enqueue(chapter))
def _load_chapter_for_enqueue_sync(chapter_id: str) -> Chapter | None:
with get_sync_db() as db:
stmt = (
select(Chapter)
.where(Chapter.id == chapter_id)
.options(
joinedload(Chapter.images),
joinedload(Chapter.story_links).joinedload(ChapterStoryLink.story),
)
)
return db.execute(stmt).unique().scalar_one_or_none()
def try_enqueue_generate_chapter_cover(
chapter_id: str,
source: Literal["http", "pipeline"] = "pipeline",
) -> bool:
"""
若章节仍需要生成封面,则 SET NX 去重后派发 generate_chapter_cover。
Returns:
True 当且仅当成功调用 delay通过闸门
"""
chapter = _load_chapter_for_enqueue_sync(chapter_id)
if source == "http":
eligible = _chapter_eligible_for_http_enqueue(chapter)
else:
eligible = _chapter_eligible_for_pipeline_enqueue(chapter)
if not eligible:
return False
key = _enqueue_dedup_key(chapter_id)
client = get_sync_redis(decode_responses=True)
try:
if not client.set(
key, "1", nx=True, ex=CHAPTER_COVER_ENQUEUE_DEDUP_TTL_SECONDS
):
logger.debug(
"chapter_cover enqueue skipped (dedup): chapter={} source={}",
chapter_id,
source,
)
return False
except Exception as exc:
logger.warning(
"chapter_cover enqueue dedup redis failed, allowing enqueue: chapter={} error={}",
chapter_id,
exc,
)
from app.tasks.chapter_cover_tasks import generate_chapter_cover
try:
generate_chapter_cover.delay(chapter_id)
except Exception as exc:
logger.warning(
"chapter_cover delay failed: chapter={} error={}",
chapter_id,
exc,
)
try:
client.delete(key)
except Exception:
pass
return False
logger.info(
"chapter_cover enqueued: chapter={} source={}",
chapter_id,
source,
)
return True