Files

57 lines
1.5 KiB
Python
Raw Permalink Normal View History

"""后台 watch 无新任务时的限流 warningFishMeasure / FishAction 共用)。"""
from __future__ import annotations
import os
import time
from pathlib import Path
from loguru import logger
def idle_warn_interval_sec(override_env: str) -> float:
"""优先读 override_env否则 FISH_WATCH_IDLE_WARN_INTERVAL_SEC默认 120。"""
v = os.environ.get(override_env)
if v is not None and str(v).strip() != "":
return float(v)
return float(os.environ.get("FISH_WATCH_IDLE_WARN_INTERVAL_SEC", "120"))
class IdleWatchWarnState:
"""记录上次告警时间;有推理成功时重置计时。"""
__slots__ = ("_last_mono",)
def __init__(self) -> None:
self._last_mono = time.monotonic()
def on_did_work(self) -> None:
self._last_mono = time.monotonic()
def maybe_warn_idle_watch(
*,
did_work: bool,
log_tag: str,
algo_name: str,
idle_hint: str,
watch_dir: Path,
state: IdleWatchWarnState,
interval_sec: float,
) -> None:
"""本轮未跑推理时按 interval 限流打 warninginterval<=0 则每轮无任务都打。"""
if did_work:
state.on_did_work()
return
now = time.monotonic()
if interval_sec > 0 and now - state._last_mono < interval_sec:
return
state._last_mono = now
logger.warning(
"[{}] 本轮无新数据可供 {} 处理({} | dir={}",
log_tag,
algo_name,
idle_hint,
watch_dir,
)