# life-echo/api/app/tasks/celery_app.py
"""
Celery application configuration.

Settings are read from app.core.config.settings.
When a worker starts, every feature model must be registered together;
otherwise resolving User relationship("Order", ...) fails because Order
cannot be found.
This matches main.py / Alembic: the model imports below exist only to
register the ORM models.
"""
from __future__ import annotations

from typing import Any

from app.core.logging import get_logger, setup_logging

# Same order as app.main: configure loguru + InterceptHandler first, then load
# the dependencies that emit log records.
setup_logging()

from app.core.config import settings
from app.core.telemetry import instrument_celery, setup_telemetry

# The worker and the API share one .env, so service.name is pinned here rather
# than read from OTEL_SERVICE_NAME (that variable is left for the main site /
# internal service).
setup_telemetry(service_name="life-echo-celery-worker")
instrument_celery()

from celery import Celery
from celery.signals import (
    task_failure,
    task_postrun,
    task_prerun,
    task_success,
    worker_shutting_down,
)

from app.core.celery_log_context import clear_celery_log_extras, set_celery_log_extras
from app.core.log_events import celery_prerun_extras

# The feature-model imports below are side-effect imports: they only register
# the ORM models so that relationships can be resolved inside the worker.
from app.features.asset import models as _asset_models  # noqa: F401 - register Asset
from app.features.auth import models as _auth_models  # noqa: F401
from app.features.conversation import models as _conv_models  # noqa: F401
from app.features.evaluation import models as _eval_models  # noqa: F401
from app.features.memoir import models as _memoir_models  # noqa: F401
from app.features.memory import models as _memory_models  # noqa: F401
from app.features.payment import models as _payment_models  # noqa: F401
from app.features.story import models as _story_models  # noqa: F401
from app.features.user import models as _user_models  # noqa: F401
from app.core.runtime_constants import celery_defaults
from app.features.memory.constants import memory

# One Redis URL is used for both the Celery broker and the result backend.
CELERY_REDIS_URL = settings.celery_redis_url_resolved

_celery_lifecycle_log = get_logger(__name__)
# NOTE(review): both URLs are logged verbatim. If they can embed a password
# (redis://:secret@host/0), confirm they are masked upstream or redact here.
_celery_lifecycle_log.info(
    "event=celery_redis_urls business_redis_url={} celery_redis_url={} "
    "msg=Celery broker/backend URL resolved",
    settings.redis_url_resolved,
    CELERY_REDIS_URL,
)
# Create the Celery application. Broker and result backend share the same
# Redis URL; ``include`` lists the task modules handed to Celery.
celery_app = Celery(
    "life_echo",
    broker=CELERY_REDIS_URL,
    backend=CELERY_REDIS_URL,
    include=[
        "app.tasks.memoir_tasks",
        "app.tasks.story_title_tasks",
        "app.tasks.story_image_tasks",
        "app.tasks.chapter_cover_tasks",
        "app.tasks.chapter_compose_tasks",
        "app.tasks.memory_compaction_tasks",
        "app.tasks.memory_enrichment_tasks",
        "app.tasks.memoir_quality_pass_tasks",
    ],
)
# Celery configuration.
celery_app.conf.update(
    # Do not hijack the root logger, so format and level stay unified with
    # loguru + InterceptHandler.
    worker_hijack_root_logger=False,
    # Task serialization.
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    # Time zone.
    timezone="UTC",
    enable_utc=True,
    # Task result expiry: 1 hour.
    result_expires=3600,
    # Task execution limits.
    task_soft_time_limit=300,  # 5-minute soft limit (default); heavy tasks: see task_annotations
    task_time_limit=600,  # 10-minute hard limit
    # Concurrency.
    worker_prefetch_multiplier=1,  # prefetch only one task at a time
    worker_concurrency=4,  # number of concurrent workers
    # Broker connection.
    broker_pool_limit=celery_defaults.broker_pool_limit,
    broker_connection_retry_on_startup=celery_defaults.broker_connection_retry_on_startup,
    # Task retry / acknowledgement behaviour.
    task_acks_late=True,  # acknowledge only after the task has finished
    task_reject_on_worker_lost=True,  # reject the task when the worker is lost
    # Both memory-enrichment tasks go to the dedicated enrichment queue.
    task_routes={
        "app.tasks.memory_enrichment_tasks.embed_memory_source": {
            "queue": celery_defaults.memory_enrichment_queue,
        },
        "app.tasks.memory_enrichment_tasks.enrich_memory_source": {
            "queue": celery_defaults.memory_enrichment_queue,
        },
    },
)
# Per-task time-limit overrides, keyed by fully qualified task name. The
# values come from ``celery_defaults``.
celery_app.conf.task_annotations = {
    "app.tasks.memory_enrichment_tasks.embed_memory_source": {
        "soft_time_limit": celery_defaults.enrichment_soft_time_limit,
        "time_limit": celery_defaults.enrichment_hard_time_limit,
    },
    "app.tasks.memory_enrichment_tasks.enrich_memory_source": {
        "soft_time_limit": celery_defaults.enrichment_soft_time_limit,
        "time_limit": celery_defaults.enrichment_hard_time_limit,
    },
    "app.tasks.memoir_tasks.process_memoir_phase1": {
        "soft_time_limit": celery_defaults.memoir_soft_time_limit,
        "time_limit": celery_defaults.memoir_hard_time_limit,
    },
    "app.tasks.memoir_tasks.process_memoir_phase2": {
        "soft_time_limit": celery_defaults.memoir_soft_time_limit,
        "time_limit": celery_defaults.memoir_hard_time_limit,
    },
    "app.tasks.memoir_tasks.generate_chapter_content": {
        "soft_time_limit": celery_defaults.memoir_soft_time_limit,
        "time_limit": celery_defaults.memoir_hard_time_limit,
    },
    "app.tasks.story_image_tasks.generate_story_image": {
        "soft_time_limit": celery_defaults.image_soft_time_limit,
        "time_limit": celery_defaults.image_hard_time_limit,
    },
    "app.tasks.chapter_cover_tasks.generate_chapter_cover": {
        "soft_time_limit": celery_defaults.image_soft_time_limit,
        "time_limit": celery_defaults.image_hard_time_limit,
    },
    # NOTE(review): recompose_chapter reuses the image_* limits — confirm this
    # is intentional rather than a copy/paste of the entries above.
    "app.tasks.chapter_compose_tasks.recompose_chapter": {
        "soft_time_limit": celery_defaults.image_soft_time_limit,
        "time_limit": celery_defaults.image_hard_time_limit,
    },
    "app.tasks.memory_compaction_tasks.memory_compaction_sweep": {
        "soft_time_limit": celery_defaults.compaction_sweep_soft_time_limit,
        "time_limit": celery_defaults.compaction_sweep_hard_time_limit,
    },
}
# Periodic schedule: a single entry that runs the memory compaction sweep at
# an interval of 6 * 3600 seconds (six hours).
celery_app.conf.beat_schedule = {
    "memory-compaction-sweep": dict(
        task="app.tasks.memory_compaction_tasks.memory_compaction_sweep",
        schedule=6 * 3600.0,
    ),
}
@worker_shutting_down.connect
def _shutdown_otel_on_worker_exit(**_: object) -> None:
    """Shut telemetry down when the ``worker_shutting_down`` signal fires.

    All signal keyword arguments are accepted and ignored.
    """
    # Function-scope import, as in the original handler.
    from app.core.telemetry import shutdown_telemetry

    shutdown_telemetry()
def _summarize_task_return(retval: object) -> str:
    """Return a short, log-friendly summary of a task's return value.

    Args:
        retval: Whatever the task returned.

    Returns:
        ``"None"`` for ``None``; ``"dict:"`` followed by at most the first 14
        keys (comma-separated) for a dict; otherwise ``repr(retval)``, cut to
        180 characters with ``"..."`` appended when it is longer.
    """
    if retval is None:
        return "None"
    if isinstance(retval, dict):
        # Only key names are summarized; the dict's values are left out.
        keys = list(retval.keys())[:14]
        return "dict:" + ",".join(str(k) for k in keys)
    text = repr(retval)
    if len(text) > 180:
        return text[:180] + "..."
    return text
@task_prerun.connect
def _log_task_prerun(
    task_id: str | None = None,
    task: object | None = None,
    args: tuple[Any, ...] | None = None,
    kwargs: dict[str, Any] | None = None,
    **_: object,
) -> None:
    """Set the per-task log extras and log that the task has started.

    Connected to ``task_prerun``.

    Args:
        task_id: Id of the task about to run, if provided by the signal.
        task: The task object; its ``name`` attribute is used for logging.
        args: Positional arguments of the task invocation.
        kwargs: Keyword arguments of the task invocation.
        **_: Remaining signal keyword arguments, ignored.
    """
    name = getattr(task, "name", None) or "?"
    # Function-scope import, as in the original handler.
    from app.core.telemetry import current_trace_context

    extras = celery_prerun_extras(name, tuple(args or ()), dict(kwargs or {}))
    if task_id:
        extras["task_id"] = str(task_id).strip()
    # Merge the current trace context into the extras whether or not a
    # task id was supplied.
    extras.update(current_trace_context())
    set_celery_log_extras(extras if extras else None)
    _celery_lifecycle_log.info(
        "event=celery_task_start task={} task_id={} msg=Celery 任务已开始",
        name,
        task_id,
    )
@task_success.connect
def _log_task_success(
    sender: object | None = None, result: object | None = None, **_: object
) -> None:
    """Log a successful task completion.

    Success path only; failures are handled by ``task_failure`` (this avoids
    relying on ``task_postrun``, which still fires after an exception).

    Args:
        sender: The task that succeeded; ``name`` and ``request.id`` are read
            from it when present.
        result: The task's return value; only a short summary is logged.
        **_: Remaining signal keyword arguments, ignored.
    """
    name = getattr(sender, "name", None) or "?"
    # getattr with a default also covers sender / request being None.
    request = getattr(sender, "request", None)
    task_id: str | None = getattr(request, "id", None)
    _celery_lifecycle_log.info(
        "event=celery_task_ok task={} task_id={} result={} msg=Celery 任务已成功结束",
        name,
        task_id,
        _summarize_task_return(result),
    )
@task_failure.connect
def _log_task_failure(
    task_id: str | None = None,
    task: object | None = None,
    exception: BaseException | None = None,
    **kwargs: object,
) -> None:
    """Log a failed task at warning level.

    Connected to ``task_failure``.

    Args:
        task_id: Id of the failed task, if provided by the signal.
        task: The task object; its ``name`` attribute is used for logging.
        exception: The exception reported for the failure.
        **kwargs: Remaining signal keyword arguments, unused.
    """
    name = getattr(task, "name", None) or "?"
    # Fall back to "?" when the signal did not deliver an exception instance.
    et = type(exception).__name__ if isinstance(exception, BaseException) else "?"
    _celery_lifecycle_log.warning(
        "event=celery_task_failed task={} task_id={} exc_type={} exc={} msg=Celery 任务失败",
        name,
        task_id,
        et,
        exception,
    )
@task_postrun.connect
def _clear_worker_log_context(**kwargs: object) -> None:
    """Clear the ContextVar log extras once the task body has finished.

    This keeps log context from leaking between tasks that run in the same
    worker process. All signal keyword arguments are ignored.
    """
    clear_celery_log_extras()