64 lines
2.0 KiB
Python
64 lines
2.0 KiB
Python
|
|
"""开发环境:可选在 API 启动时清空 Celery broker 队列(queue_purge,不 FLUSH 整库)。"""
|
|||
|
|
|
|||
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
import asyncio
|
|||
|
|
from typing import TYPE_CHECKING
|
|||
|
|
|
|||
|
|
from app.core.config import settings
|
|||
|
|
from app.core.logging import get_logger
|
|||
|
|
|
|||
|
|
if TYPE_CHECKING:
|
|||
|
|
from redis.asyncio import Redis
|
|||
|
|
|
|||
|
|
# Module-level logger named after this module.
logger = get_logger(__name__)

# Redis key used as a startup gate: the purge only runs in the process that
# manages to create this key (SET with nx=True); others skip the purge.
_PURGE_GATE_KEY = "life_echo:celery_dev_broker_purge_gate"
# Expiry, in seconds, applied to the gate key when it is created.
_PURGE_GATE_TTL_SEC = 25
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _is_production_environment() -> bool:
    """Return True when ``settings.app_environment`` names production.

    The value is compared after trimming whitespace and lower-casing; a
    missing or empty value is treated as non-production.
    """
    configured = settings.app_environment
    normalized = (configured if configured else "").strip().lower()
    return normalized in {"production", "prod"}
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _purge_celery_queues_sync() -> int:
    """Purge every queue registered on the Celery app and count removed messages.

    Blocking call: opens one write connection to the broker and issues
    ``queue_purge`` for each queue name found in ``celery_app.amqp.queues``,
    in sorted order.

    Returns:
        The sum of the counts reported by each ``queue_purge`` call, or 0 when
        the app has no queues configured (no connection is opened then).
    """
    # Imported at call time rather than at module import.
    # NOTE(review): presumably to avoid loading the Celery app (or an import
    # cycle) unless a purge actually runs - confirm.
    from app.tasks.celery_app import celery_app

    queue_names = set(celery_app.amqp.queues.keys())
    if len(queue_names) == 0:
        return 0

    removed = 0
    with celery_app.connection_for_write() as connection:
        purge_channel = connection.default_channel
        for queue_name in sorted(queue_names):
            try:
                purged_count = purge_channel.queue_purge(queue_name)
                # A falsy result (e.g. None) counts as zero messages.
                removed = removed + int(purged_count or 0)
            except connection.channel_errors:
                # Best effort: skip this queue and keep going with the rest.
                logger.debug("Celery queue_purge 跳过 queue={}", queue_name)
    return removed
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def maybe_purge_celery_broker_on_startup(redis_client: Redis) -> None:
    """Purge the known Celery task queues once the Redis gate has been won.

    Never runs in a production environment. Otherwise the gate key is set on
    ``redis_client`` with ``nx=True`` and a short expiry; if the key already
    exists the purge is skipped. Purge failures are logged as warnings and
    never propagated to the caller.

    Args:
        redis_client: An async Redis client used to set the gate key.
    """
    if _is_production_environment():
        return

    gate_acquired = await redis_client.set(
        _PURGE_GATE_KEY, "1", nx=True, ex=_PURGE_GATE_TTL_SEC
    )
    if not gate_acquired:
        logger.debug("Celery broker 清空跳过(短时门闩已由其他进程设置)")
        return

    try:
        # The purge is synchronous, so it is run in a worker thread to keep
        # the event loop free.
        removed = await asyncio.to_thread(_purge_celery_queues_sync)
        if not removed:
            logger.info("开发环境 Celery broker 无积压消息(已尝试 purge)")
        else:
            logger.info("开发环境已清空 Celery broker 队列(移除 {} 条消息)", removed)
    except Exception as exc:
        # Deliberate best effort: a failed purge must not break startup.
        logger.warning("清空 Celery broker 队列失败(可忽略): {}", exc)
|