配置 SSOT(TOML + .env) 统一错误契约 Auth 与事务边界 Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client 可观测性(OpenTelemetry + LGTM)
64 lines
2.0 KiB
Python
64 lines
2.0 KiB
Python
"""开发环境:可选在 API 启动时清空 Celery broker 队列(queue_purge,不 FLUSH 整库)。"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
from typing import TYPE_CHECKING
|
||
|
||
from app.core.config import settings
|
||
from app.core.logging import get_logger
|
||
|
||
if TYPE_CHECKING:
|
||
from redis.asyncio import Redis
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
_PURGE_GATE_KEY = "life_echo:celery_dev_broker_purge_gate"
|
||
_PURGE_GATE_TTL_SEC = 25
|
||
|
||
|
||
def _is_production_environment() -> bool:
|
||
env = (settings.app_environment or "").strip().lower()
|
||
return env in ("production", "prod")
|
||
|
||
|
||
def _purge_celery_queues_sync() -> int:
|
||
from app.tasks.celery_app import celery_app
|
||
|
||
names = set(celery_app.amqp.queues.keys())
|
||
if not names:
|
||
return 0
|
||
total = 0
|
||
with celery_app.connection_for_write() as conn:
|
||
channel = conn.default_channel
|
||
for q in sorted(names):
|
||
try:
|
||
n = channel.queue_purge(q)
|
||
total += int(n or 0)
|
||
except conn.channel_errors:
|
||
logger.debug("Celery queue_purge 跳过 queue={}", q)
|
||
return total
|
||
|
||
|
||
async def maybe_purge_celery_broker_on_startup(redis_client: Redis) -> None:
|
||
"""在已连接的 Redis 上抢门闩后清空已知任务队列;生产环境永不执行。"""
|
||
if _is_production_environment():
|
||
return
|
||
got = await redis_client.set(
|
||
_PURGE_GATE_KEY,
|
||
"1",
|
||
nx=True,
|
||
ex=_PURGE_GATE_TTL_SEC,
|
||
)
|
||
if not got:
|
||
logger.debug("Celery broker 清空跳过(短时门闩已由其他进程设置)")
|
||
return
|
||
try:
|
||
purged = await asyncio.to_thread(_purge_celery_queues_sync)
|
||
if purged:
|
||
logger.info("开发环境已清空 Celery broker 队列(移除 {} 条消息)", purged)
|
||
else:
|
||
logger.info("开发环境 Celery broker 无积压消息(已尝试 purge)")
|
||
except Exception as e:
|
||
logger.warning("清空 Celery broker 队列失败(可忽略): {}", e)
|