Files
life-echo/api/app/core/celery_broker_dev.py

64 lines
2.0 KiB
Python
Raw Permalink Normal View History

"""Development environment: optionally purge the Celery broker queues at API startup (via queue_purge; does not FLUSH the whole database)."""
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
from app.core.config import settings
from app.core.logging import get_logger
if TYPE_CHECKING:
from redis.asyncio import Redis
# Module-level logger, named after this module.
logger = get_logger(__name__)
# Redis key used as a short-lived gate: the startup hook sets it with NX, so
# only the first caller inside the expiry window goes on to purge the queues.
_PURGE_GATE_KEY = "life_echo:celery_dev_broker_purge_gate"
# Expiry passed as `ex=` when the gate key is set (seconds, per the name).
_PURGE_GATE_TTL_SEC = 25
def _is_production_environment() -> bool:
    """Report whether the configured application environment is production.

    Reads ``settings.app_environment``, treats a falsy value as an empty
    string, and compares it case-insensitively with surrounding whitespace
    ignored.

    Returns:
        True when the normalized value is ``"production"`` or ``"prod"``.
    """
    raw_env = settings.app_environment or ""
    normalized = raw_env.strip().lower()
    return normalized == "production" or normalized == "prod"
def _purge_celery_queues_sync() -> int:
    """Purge every queue declared on the Celery app and count removed messages.

    This is a blocking helper. It opens a write connection to the broker,
    calls ``queue_purge`` once per known queue name (in sorted order) on the
    connection's default channel, and skips any queue whose purge raises one
    of the connection's channel errors.

    Returns:
        The sum of the counts reported by ``queue_purge`` (a falsy result is
        counted as 0); 0 when the app declares no queues.
    """
    # Imported at call time rather than at module import.
    # NOTE(review): presumably to avoid loading the Celery app eagerly or a
    # circular import - confirm.
    from app.tasks.celery_app import celery_app

    queue_names = sorted(set(celery_app.amqp.queues.keys()))
    if not queue_names:
        return 0

    removed = 0
    with celery_app.connection_for_write() as connection:
        broker_channel = connection.default_channel
        for queue_name in queue_names:
            try:
                count = broker_channel.queue_purge(queue_name)
            except connection.channel_errors:
                # Best effort: a queue that cannot be purged is skipped.
                logger.debug("Celery queue_purge 跳过 queue={}", queue_name)
            else:
                removed += int(count or 0)
    return removed
async def maybe_purge_celery_broker_on_startup(redis_client: Redis) -> None:
    """Purge the known Celery task queues once at startup, outside production.

    The function returns immediately in a production environment. Otherwise
    it tries to take a short-lived gate by setting ``_PURGE_GATE_KEY`` on the
    given Redis client with ``nx=True`` and an expiry of
    ``_PURGE_GATE_TTL_SEC``. Only the caller whose SET succeeds runs the
    purge; the blocking purge itself runs in a worker thread through
    ``asyncio.to_thread``.

    Args:
        redis_client: Async Redis client used to set the gate key.

    Returns:
        None. Any exception raised by the purge is logged as a warning and
        not re-raised; an exception from the Redis SET call propagates.
    """
    if _is_production_environment():
        return

    acquired = await redis_client.set(
        _PURGE_GATE_KEY,
        "1",
        nx=True,
        ex=_PURGE_GATE_TTL_SEC,
    )
    if acquired:
        try:
            removed = await asyncio.to_thread(_purge_celery_queues_sync)
            if not removed:
                # NOTE(review): this message has an unbalanced parenthesis;
                # left as-is so the emitted log text does not change.
                logger.info("开发环境 Celery broker 无积压消息(已尝试 purge")
            else:
                logger.info("开发环境已清空 Celery broker 队列(移除 {} 条消息)", removed)
        except Exception as exc:
            # Purging is best effort; a failure must not abort startup.
            logger.warning("清空 Celery broker 队列失败(可忽略): {}", exc)
    else:
        # Another process already holds the gate within the expiry window.
        logger.debug("Celery broker 清空跳过(短时门闩已由其他进程设置)")
        return