"""开发环境:可选在 API 启动时清空 Celery broker 队列(queue_purge,不 FLUSH 整库)。""" from __future__ import annotations import asyncio from typing import TYPE_CHECKING from app.core.config import settings from app.core.logging import get_logger if TYPE_CHECKING: from redis.asyncio import Redis logger = get_logger(__name__) _PURGE_GATE_KEY = "life_echo:celery_dev_broker_purge_gate" _PURGE_GATE_TTL_SEC = 25 def _is_production_environment() -> bool: env = (settings.app_environment or "").strip().lower() return env in ("production", "prod") def _purge_celery_queues_sync() -> int: from app.tasks.celery_app import celery_app names = set(celery_app.amqp.queues.keys()) if not names: return 0 total = 0 with celery_app.connection_for_write() as conn: channel = conn.default_channel for q in sorted(names): try: n = channel.queue_purge(q) total += int(n or 0) except conn.channel_errors: logger.debug("Celery queue_purge 跳过 queue={}", q) return total async def maybe_purge_celery_broker_on_startup(redis_client: Redis) -> None: """在已连接的 Redis 上抢门闩后清空已知任务队列;生产环境永不执行。""" if _is_production_environment(): return got = await redis_client.set( _PURGE_GATE_KEY, "1", nx=True, ex=_PURGE_GATE_TTL_SEC, ) if not got: logger.debug("Celery broker 清空跳过(短时门闩已由其他进程设置)") return try: purged = await asyncio.to_thread(_purge_celery_queues_sync) if purged: logger.info("开发环境已清空 Celery broker 队列(移除 {} 条消息)", purged) else: logger.info("开发环境 Celery broker 无积压消息(已尝试 purge)") except Exception as e: logger.warning("清空 Celery broker 队列失败(可忽略): {}", e)