Files
life-echo/api/app/tasks/celery_app.py
Sully 53e0065e3e refactor(api): TOML 配置 SSOT、统一错误契约、Auth/事务加固与可观测性 (#33)
配置 SSOT(TOML + .env)
统一错误契约
Auth 与事务边界
Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client
可观测性(OpenTelemetry + LGTM)
2026-05-22 13:44:50 +08:00

239 lines
8.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Celery 应用配置
配置从 app.core.config.settings 读取。
Worker 启动时需聚合注册所有 feature 的 model否则 User 等 relationship("Order", ...) 解析时会报找不到 Order。
与 main.py / Alembic 一致:下方 import 仅用于注册 ORM model。
"""
from __future__ import annotations
from typing import Any
from app.core.logging import get_logger, setup_logging
# 与 app.main 一致:先配置 loguru + InterceptHandler再加载会打日志的依赖
setup_logging()
from app.core.config import settings
from app.core.telemetry import instrument_celery, setup_telemetry
# Worker 与 API 共用 .env固定 service.name勿读 OTEL_SERVICE_NAME留给主站 / internal
setup_telemetry(service_name="life-echo-celery-worker")
instrument_celery()
from celery import Celery
from celery.signals import (
task_failure,
task_postrun,
task_prerun,
task_success,
worker_shutting_down,
)
from app.core.celery_log_context import clear_celery_log_extras, set_celery_log_extras
from app.core.log_events import celery_prerun_extras
from app.features.asset import models as _asset_models # noqa: F401 - register Asset
from app.features.auth import models as _auth_models # noqa: F401
from app.features.conversation import models as _conv_models # noqa: F401
from app.features.evaluation import models as _eval_models # noqa: F401
from app.features.memoir import models as _memoir_models # noqa: F401
from app.features.memory import models as _memory_models # noqa: F401
from app.features.payment import models as _payment_models # noqa: F401
from app.features.story import models as _story_models # noqa: F401
from app.features.user import models as _user_models # noqa: F401
from app.core.runtime_constants import celery_defaults
from app.features.memory.constants import memory
def _redact_url_credentials(url: object) -> str | None:
    """Return ``str(url)`` with the password in its userinfo masked as ``***``.

    Intended for log output only. Redis URLs commonly embed credentials
    (``redis://:password@host:port/db``), and those must not end up in logs.

    Args:
        url: The URL to mask; ``None`` is passed through unchanged.

    Returns:
        The URL text with any password replaced by ``***``. URLs without a
        password are returned as-is. A value that ``urlsplit`` rejects is
        replaced by a placeholder rather than being logged verbatim.
    """
    if url is None:
        return None
    from urllib.parse import urlsplit, urlunsplit

    text = str(url)
    try:
        parts = urlsplit(text)
    except ValueError:
        # Malformed (e.g. broken IPv6 brackets): never echo possibly-secret text.
        return "<unparseable-url>"
    if parts.password is None:
        return text
    # Rebuild only the userinfo part so host/port/path/query stay exactly as given.
    userinfo, _, hostinfo = parts.netloc.rpartition("@")
    username = userinfo.partition(":")[0]
    return urlunsplit(parts._replace(netloc=f"{username}:***@{hostinfo}"))


# Raw URL: used as both broker and result backend below, so it is NOT redacted.
CELERY_REDIS_URL = settings.celery_redis_url_resolved
_celery_lifecycle_log = get_logger(__name__)
# Log which Redis endpoints were resolved, with any credentials masked.
_celery_lifecycle_log.info(
    "event=celery_redis_urls business_redis_url={} celery_redis_url={} "
    "msg=Celery broker/backend URL resolved",
    _redact_url_credentials(settings.redis_url_resolved),
    _redact_url_credentials(CELERY_REDIS_URL),
)
# The Celery application. Broker and result backend share the Celery Redis URL
# resolved above; `include` lists the task modules imported by the worker.
celery_app = Celery(
    main="life_echo",
    include=[
        "app.tasks.memoir_tasks",
        "app.tasks.story_title_tasks",
        "app.tasks.story_image_tasks",
        "app.tasks.chapter_cover_tasks",
        "app.tasks.chapter_compose_tasks",
        "app.tasks.memory_compaction_tasks",
        "app.tasks.memory_enrichment_tasks",
        "app.tasks.memoir_quality_pass_tasks",
    ],
    broker=CELERY_REDIS_URL,
    backend=CELERY_REDIS_URL,
)
# Core Celery settings, passed as one mapping.
celery_app.conf.update(
    {
        # Leave the root logger alone so loguru + InterceptHandler own format/level.
        "worker_hijack_root_logger": False,
        # Serialization: JSON only.
        "task_serializer": "json",
        "accept_content": ["json"],
        "result_serializer": "json",
        # Time zone.
        "timezone": "UTC",
        "enable_utc": True,
        # Stored task results expire after one hour.
        "result_expires": 60 * 60,
        # Default limits: 5 min soft / 10 min hard; heavier tasks are overridden
        # through task_annotations.
        "task_soft_time_limit": 5 * 60,
        "task_time_limit": 10 * 60,
        # Concurrency: prefetch a single task at a time, 4 concurrent workers.
        "worker_prefetch_multiplier": 1,
        "worker_concurrency": 4,
        # Broker connection handling.
        "broker_pool_limit": celery_defaults.broker_pool_limit,
        "broker_connection_retry_on_startup": (
            celery_defaults.broker_connection_retry_on_startup
        ),
        # Acknowledge only after the task finishes; reject it if the worker is lost.
        # NOTE(review): with late acks on a Redis broker, confirm the longest hard
        # time limit stays below the transport's visibility timeout.
        "task_acks_late": True,
        "task_reject_on_worker_lost": True,
        # Memory-enrichment tasks go to their own queue.
        "task_routes": {
            route_name: {"queue": celery_defaults.memory_enrichment_queue}
            for route_name in (
                "app.tasks.memory_enrichment_tasks.embed_memory_source",
                "app.tasks.memory_enrichment_tasks.enrich_memory_source",
            )
        },
    }
)
# Per-task overrides of the default soft/hard time limits. Tasks are grouped by
# the (soft, hard) pair they share; each task still receives its own dict.
celery_app.conf.task_annotations = {
    task_name: {"soft_time_limit": soft_limit, "time_limit": hard_limit}
    for soft_limit, hard_limit, task_names in (
        (
            celery_defaults.enrichment_soft_time_limit,
            celery_defaults.enrichment_hard_time_limit,
            (
                "app.tasks.memory_enrichment_tasks.embed_memory_source",
                "app.tasks.memory_enrichment_tasks.enrich_memory_source",
            ),
        ),
        (
            celery_defaults.memoir_soft_time_limit,
            celery_defaults.memoir_hard_time_limit,
            (
                "app.tasks.memoir_tasks.process_memoir_phase1",
                "app.tasks.memoir_tasks.process_memoir_phase2",
                "app.tasks.memoir_tasks.generate_chapter_content",
            ),
        ),
        (
            celery_defaults.image_soft_time_limit,
            celery_defaults.image_hard_time_limit,
            (
                "app.tasks.story_image_tasks.generate_story_image",
                "app.tasks.chapter_cover_tasks.generate_chapter_cover",
                "app.tasks.chapter_compose_tasks.recompose_chapter",
            ),
        ),
        (
            celery_defaults.compaction_sweep_soft_time_limit,
            celery_defaults.compaction_sweep_hard_time_limit,
            ("app.tasks.memory_compaction_tasks.memory_compaction_sweep",),
        ),
    )
    for task_name in task_names
}
# Periodic jobs for celery beat.
celery_app.conf.beat_schedule = {
    "memory-compaction-sweep": dict(
        task="app.tasks.memory_compaction_tasks.memory_compaction_sweep",
        schedule=6 * 60 * 60.0,  # every six hours, expressed in seconds
    ),
}
@worker_shutting_down.connect
def _shutdown_otel_on_worker_exit(**_signal_kwargs: object) -> None:
    """Call ``shutdown_telemetry()`` once the worker begins shutting down.

    The signal's payload is accepted and ignored.

    NOTE(review): this receiver runs wherever ``worker_shutting_down`` is
    dispatched; if the prefork pool is used, confirm whether child processes
    need their own telemetry shutdown hook.
    """
    from app.core.telemetry import shutdown_telemetry as _stop_telemetry

    _stop_telemetry()
def _summarize_task_return(retval: object) -> str:
if retval is None:
return "None"
if isinstance(retval, dict):
keys = list(retval.keys())[:14]
return "dict:" + ",".join(str(k) for k in keys)
text = repr(retval)
if len(text) > 180:
return text[:180] + "..."
return text
@task_prerun.connect
def _log_task_prerun(
    task_id: str | None = None,
    task: object | None = None,
    args: tuple[Any, ...] | None = None,
    kwargs: dict[str, Any] | None = None,
    **_signal_kwargs: object,
) -> None:
    """Bind per-task log extras and emit the ``celery_task_start`` event.

    The extras are built from three sources:

    - ``celery_prerun_extras(...)`` for the task name and its call arguments;
    - the stripped ``task_id``, when one is provided;
    - whatever ``current_trace_context()`` returns.

    They are handed to ``set_celery_log_extras`` (``None`` when the mapping
    is empty). NOTE(review): presumably they are attached to later log records
    of this task — confirm in ``app.core.celery_log_context``.
    """
    from app.core.telemetry import current_trace_context

    task_name = getattr(task, "name", None) or "?"
    call_args = tuple(args) if args else ()
    call_kwargs = dict(kwargs) if kwargs else {}
    extras = celery_prerun_extras(task_name, call_args, call_kwargs)
    if task_id:
        extras["task_id"] = str(task_id).strip()
    extras.update(current_trace_context())
    set_celery_log_extras(extras or None)
    _celery_lifecycle_log.info(
        "event=celery_task_start task={} task_id={} msg=Celery 任务已开始",
        task_name,
        task_id,
    )
@task_success.connect
def _log_task_success(
    sender: object | None = None, result: object | None = None, **_: object
) -> None:
    """Log the ``celery_task_ok`` event for a task that completed successfully.

    Only the success path is handled here; failures are logged by the
    ``task_failure`` receiver. ``task_postrun`` is not used because it also
    fires when the task raised.

    Missing pieces degrade gracefully: the task name falls back to ``"?"`` and
    the task id to ``None`` when ``sender`` or its ``request`` is absent.
    """
    task_name = getattr(sender, "name", None) or "?"
    current_request = getattr(sender, "request", None)
    _celery_lifecycle_log.info(
        "event=celery_task_ok task={} task_id={} result={} msg=Celery 任务已成功结束",
        task_name,
        getattr(current_request, "id", None),
        _summarize_task_return(result),
    )
@task_failure.connect
def _log_task_failure(
    task_id: str | None = None,
    task: object | None = None,
    exception: BaseException | None = None,
    sender: object | None = None,
    **kwargs: object,
) -> None:
    """Log the ``celery_task_failed`` event (warning level) for a failed task.

    Celery's ``task_failure`` signal delivers the failing task object as
    ``sender``. Its payload is task_id, exception, args, kwargs, traceback and
    einfo; there is no ``task`` argument. ``task`` is kept for backward
    compatibility, and ``sender`` is used when it is absent, so the real task
    name is logged instead of ``"?"``.

    Args:
        task_id: Id of the failed task.
        task: Optional task object (not sent by Celery for this signal).
        exception: The exception the task raised.
        sender: The task object, as provided by the signal dispatcher.
        **kwargs: Remaining signal payload (args, kwargs, traceback, einfo),
            unused here.
    """
    failed_task = task if task is not None else sender
    name = getattr(failed_task, "name", None) or "?"
    exc_type = (
        type(exception).__name__ if isinstance(exception, BaseException) else "?"
    )
    _celery_lifecycle_log.warning(
        "event=celery_task_failed task={} task_id={} exc_type={} exc={} msg=Celery 任务失败",
        name,
        task_id,
        exc_type,
        exception,
    )
@task_postrun.connect
def _clear_worker_log_context(**_signal_kwargs: object) -> None:
    """Reset the per-task log context once the task body has finished.

    This stops the ContextVar-backed extras bound by ``_log_task_prerun`` from
    leaking into the next task handled by the same worker process. The
    signal's payload is ignored.
    """
    clear_celery_log_extras()