Files
FishServer/fish_api/app/services/measure_watch.py
2026-05-13 09:19:31 +08:00

418 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
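"""Background poller for ``.svo2`` files in the watch directory.

Each segment is run through FishMeasure on its own; once a fish folder is
complete, its segments are aggregated into a final result. The SQLite store is
shared with ingest.
"""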
from __future__ import annotations
import asyncio
import hashlib
from pathlib import Path
from typing import Dict, List, Set, Tuple
from loguru import logger
from app.compat import to_thread
from app.db import (
add_watch_processed,
load_watch_processed,
measure_snapshot_deliverable,
save_measure_snapshot,
)
from app.logging_config import new_run_id, stage
from app.services import measure as measure_svc
from app.settings import Settings
from app.state import app_state
from app.watch_idle import IdleWatchWarnState, idle_warn_interval_sec, maybe_warn_idle_watch
# Interval, in seconds, passed to maybe_warn_idle_watch() as ``interval_sec`` by
# the measure watch loop.
# NOTE(review): presumably idle_warn_interval_sec() reads the named environment
# variable and applies a default — confirm in app.watch_idle.
_MEASURE_IDLE_WARN_INTERVAL_SEC = idle_warn_interval_sec(
"FISH_MEASURE_WATCH_IDLE_WARN_INTERVAL_SEC"
)
def _state_path(settings: Settings) -> Path:
    """Return the path of the legacy JSON state file.

    The file sits directly inside the configured watch directory. According to
    the original note it is kept only for compatibility, so that old state can
    be imported into SQLite.

    Raises:
        AssertionError: if ``settings.measure_watch_dir`` is ``None``.
    """
    watch_root = settings.measure_watch_dir
    assert watch_root is not None
    return watch_root / ".fishmeasure_watch_processed.json"
def iter_svo2_folders(watch_dir: Path) -> List[Tuple[List[Path], str]]:
    """Scan ``watch_dir`` for fish folders and list their ``.svo2`` segments.

    Only immediate sub-directories named ``fish{N}`` (``N`` consisting solely of
    digits) are considered. Within each of them, regular files whose suffix is
    ``.svo2`` (case-insensitive) are collected.

    Args:
        watch_dir: Directory to scan.

    Returns:
        A list of ``(svo_paths, fish_id)`` tuples, where ``fish_id`` is the digit
        string after the ``fish`` prefix and ``svo_paths`` is sorted. Folders are
        visited in sorted path order, and folders without any ``.svo2`` file are
        omitted. An empty list is returned when ``watch_dir`` is not a directory.
    """
    prefix = "fish"
    found = []  # type: List[Tuple[List[Path], str]]
    if not watch_dir.is_dir():
        return found
    for entry in sorted(watch_dir.iterdir()):
        if not entry.is_dir():
            continue
        name = entry.name
        if not name.startswith(prefix):
            continue
        # Slicing and str.isdigit() cannot raise, so no try/except is needed.
        # A folder named exactly "fish" yields "" here, which isdigit() rejects.
        fish_id = name[len(prefix):]
        if not fish_id.isdigit():
            continue
        svo_files = sorted(
            p for p in entry.iterdir()
            if p.is_file() and p.suffix.lower() == ".svo2"
        )
        if svo_files:
            found.append((svo_files, fish_id))
    return found
def _final_processed_key(fish_id: str, svo_list: List[Path]) -> str:
    """Build the processed-set key for a fish folder's final aggregate.

    The key embeds ``fish_id`` plus the first 24 hex characters of the SHA-256
    digest of the sorted, ``|``-joined resolved segment paths. It therefore does
    not depend on the order of ``svo_list`` and changes when the set of segment
    paths changes.
    """
    resolved = [str(path.resolve()) for path in svo_list]
    resolved.sort()
    digest = hashlib.sha256("|".join(resolved).encode("utf-8")).hexdigest()
    return "__measure_final__fish{}:{}".format(fish_id, digest[:24])
def _folder_size_tuple(svo_list: List[Path]) -> Tuple[Tuple[str, int], ...]:
    """Return a ``(resolved_path, size_in_bytes)`` fingerprint of the segments.

    Entries are ordered by resolved path. If any file cannot be inspected
    (``OSError``), an empty tuple is returned instead of a partial fingerprint.
    """
    ordered = sorted(svo_list, key=lambda item: str(item.resolve()))
    try:
        return tuple(
            (str(path.resolve()), int(path.stat().st_size)) for path in ordered
        )
    except OSError:
        return tuple()
async def _run_single_svo_measure(
    svo: Path,
    fish_id: str,
    settings: Settings,
    processed: Set[str],
    state_file: Path,
) -> None:
    """Measure one ``.svo2`` segment and record it as processed.

    The measurement runs in a worker thread via ``to_thread`` while
    ``app_state.measure_lock`` is held. The resulting snapshot is tagged as a
    ``segment`` and written with ``save_measure_snapshot`` only when
    ``measure_snapshot_deliverable`` accepts it.

    The segment is added to ``processed`` (and persisted through
    ``add_watch_processed`` when ``settings.measure_watch_use_state_file`` is
    set) on success and on any ``Exception``, so a failing file is not retried
    on every poll.

    Args:
        svo: Segment file to measure.
        fish_id: Digit string identifying the fish folder.
        settings: Application settings (output root, state-file switch).
        processed: In-memory set of processed keys; mutated in place.
        state_file: Unused here; kept so the call signature stays unchanged.
    """
    key = str(svo.resolve())
    fish_folder = svo.parent.resolve()
    fish_output_root = settings.measure_output_root / f"fish{fish_id}"
    fish_output_root.mkdir(parents=True, exist_ok=True)
    try:
        size_mb = svo.stat().st_size / (1024 * 1024)
    except OSError:
        # Size is only used for logging/metrics; fall back to 0 if unreadable.
        size_mb = 0.0
    rid = new_run_id("measure_watch")
    log = logger.bind(pipeline="measure_watch", run_id=rid, fish_id=fish_id)
    log.info(
        "[测量监控] 触发分段测量 | fish_id={} | svo={} | 大小={:.1f} MB",
        fish_id, svo.name, size_mb,
    )

    def _mark_processed() -> None:
        # Single place for the bookkeeping shared by success and failure paths.
        processed.add(key)
        if settings.measure_watch_use_state_file:
            add_watch_processed(settings, key, "measure")

    def _run():
        # Executed in the worker thread; the thread lock is taken there.
        with app_state.measure_thread_lock:
            return measure_svc.run_full_measure(
                svo, settings, output_root=fish_output_root, run_id=rid
            )

    async with app_state.measure_lock:
        app_state.measure_status = "running"
        try:
            with stage(
                f"分段测量 fish{fish_id}/{svo.name}",
                pipeline="measure_watch",
                step="segment_total",
                run_id=rid,
                source=svo.name,
                fish_id=fish_id,
                metrics={"fish_id": fish_id, "svo_size_mb": round(size_mb, 2)},
            ):
                snap = await to_thread(_run)
                snap = measure_svc.tag_measure_snapshot_meta(
                    snap,
                    measurement_phase="segment",
                    fish_folder=str(fish_folder),
                    segment_source=str(svo.resolve()),
                )
                if measure_snapshot_deliverable(snap):
                    save_measure_snapshot(
                        settings,
                        snap,
                        source_path=str(svo.resolve()),
                        client_id=None,
                    )
                else:
                    log.warning(
                        "[测量监控] 无可投递结果,跳过写库 | fish_id={} | svo={}",
                        fish_id, svo.name,
                    )
            _mark_processed()
            r0 = snap.result[0] if snap.result else {}
            log.info(
                "[测量监控] 分段完成 | fish_id={} | svo={} | 体重={} g | 体长={} mm | 置信(*)={}",
                fish_id,
                svo.name,
                r0.get("weight", "") or "-",
                r0.get("length", "") or "-",
                # Both branches used to be "", so the field was always blank.
                "*" if snap.star else "-",
            )
        except (RuntimeError, FileNotFoundError) as e:
            # Expected failure modes: warn without a traceback.
            log.warning(
                "[测量监控] 分段测量失败 | fish_id={} | svo={} | {}: {}",
                fish_id, svo.name, type(e).__name__, e,
            )
            _mark_processed()
        except Exception as e:
            log.exception(
                "[测量监控] 分段测量异常 | fish_id={} | svo={} | {}",
                fish_id, svo.name, e,
            )
            _mark_processed()
        finally:
            # Reset on every exit, including task cancellation, so the status
            # cannot stay stuck at "running".
            app_state.measure_status = "idle"
async def _run_final_aggregate(
    svo_list: List[Path],
    fish_id: str,
    settings: Settings,
    processed: Set[str],
    state_file: Path,
    final_key: str,
) -> None:
    """Aggregate the measured segments of one fish folder into a final snapshot.

    Segment snapshots are reloaded in a worker thread, combined with
    ``measure_svc.build_measure_snapshot_aggregate`` and, when deliverable,
    saved under the source path ``aggregate:{final_key}``. Preview media
    generation is best effort: a failure there is logged and the snapshot is
    still saved.

    ``final_key`` is added to ``processed`` (and persisted when
    ``settings.measure_watch_use_state_file`` is set) on success and on any
    ``Exception``, so the aggregate is not retried on every poll.

    Args:
        svo_list: Segment files of the folder; must be non-empty because the
            folder is taken from ``svo_list[0].parent``.
        fish_id: Digit string identifying the fish folder.
        settings: Application settings.
        processed: In-memory set of processed keys; mutated in place.
        state_file: Unused here; kept so the call signature stays unchanged.
        final_key: Processed-set key of this folder's final aggregate.
    """
    fish_folder = svo_list[0].parent.resolve()
    rid = new_run_id("measure_watch_final")
    log = logger.bind(pipeline="measure_watch", run_id=rid, fish_id=fish_id)
    log.info(
        "[测量监控] 触发齐套聚合 | fish_id={} | 段数={} | 文件夹={}",
        fish_id, len(svo_list), fish_folder,
    )

    def _mark_processed() -> None:
        # Single place for the bookkeeping shared by success and failure paths.
        processed.add(final_key)
        if settings.measure_watch_use_state_file:
            add_watch_processed(settings, final_key, "measure")

    def _reload():
        return measure_svc.reload_segment_snapshots_for_aggregate(
            svo_list, fish_id, settings
        )

    async with app_state.measure_lock:
        app_state.measure_status = "running"
        try:
            with stage(
                # Opening parenthesis restored: without it the fish id and the
                # segment count ran together in the stage label.
                f"齐套聚合 fish{fish_id}({len(svo_list)} 段)",
                pipeline="measure_watch",
                step="final_total",
                run_id=rid,
                source=str(fish_folder),
                fish_id=fish_id,
                metrics={
                    "fish_id": fish_id,
                    "segment_count": len(svo_list),
                    "aggregate_mode": settings.measure_final_aggregate_mode,
                },
            ):
                pairs = await to_thread(_reload)
                # Each pair is indexed as (svo_path, segment_snapshot).
                contributing_svos = [p[0] for p in pairs]
                segments = [p[1] for p in pairs]
                paths_joined = "|".join(
                    sorted(str(p.resolve()) for p in contributing_svos)
                )
                snap = measure_svc.build_measure_snapshot_aggregate(
                    segments,
                    fish_id,
                    settings,
                    contributing_svos=contributing_svos,
                    fish_folder=str(fish_folder),
                    segment_source_paths=paths_joined,
                )
                if measure_snapshot_deliverable(snap):
                    try:
                        v_left, v_right = await to_thread(
                            measure_svc.generate_aggregate_preview_media,
                            contributing_svos,
                            snap,
                            fish_id,
                            settings,
                            final_key=final_key,
                            run_id=rid,
                        )
                        snap.video_left = v_left
                        snap.video_right = v_right
                    except Exception as e:
                        # Preview media is optional; keep going and save.
                        log.warning(
                            "[测量监控] 齐套预览视频生成失败 | fish_id={} | {}",
                            fish_id, e,
                        )
                    save_measure_snapshot(
                        settings,
                        snap,
                        source_path=f"aggregate:{final_key}",
                        client_id=None,
                    )
                else:
                    log.warning(
                        "[测量监控] 齐套结果无可投递行,跳过写库 | fish_id={}",
                        fish_id,
                    )
            _mark_processed()
            r0 = snap.result[0] if snap.result else {}
            log.info(
                "[测量监控] 齐套完成 | fish_id={} | 体重={} g | 体长={} mm | 置信(*)={}",
                fish_id,
                r0.get("weight", "") or "-",
                r0.get("length", "") or "-",
                # Both branches used to be "", so the field was always blank.
                "*" if snap.star else "-",
            )
        except Exception as e:
            log.exception(
                "[测量监控] 齐套聚合异常 | fish_id={} | {}", fish_id, e,
            )
            _mark_processed()
        finally:
            # Reset on every exit, including task cancellation, so the status
            # cannot stay stuck at "running".
            app_state.measure_status = "idle"
async def watch_tick(
    settings: Settings,
    processed: Set[str],
    stability: Dict[str, Tuple[int, int]],
    final_stability: Dict[str, Tuple[Tuple[Tuple[str, int], ...], int]],
    state_file: Path,
) -> bool:
    """Run one polling pass over the watch directory.

    A segment is measured once its size has stayed unchanged for
    ``settings.measure_watch_stable_polls`` consecutive observations. After all
    segments of a fish folder are processed and the folder's size fingerprint
    has been stable for the same number of observations, the final aggregate is
    produced.

    Args:
        settings: Application settings; ``measure_watch_dir`` must not be None.
        processed: Keys of processed segments and finals; mutated by the
            measure/aggregate helpers.
        stability: Per-segment ``(size, consecutive_count)``; mutated in place.
        final_stability: Per-folder ``(size_fingerprint, consecutive_count)``;
            mutated in place.
        state_file: Passed through to the measure/aggregate helpers.

    Returns:
        True if at least one measurement or aggregate was triggered.
    """
    assert settings.measure_watch_dir is not None
    watch_dir = settings.measure_watch_dir
    did = False
    seen_keys: Set[str] = set()
    seen_folders: Set[str] = set()
    for svo_list, fish_id in iter_svo2_folders(watch_dir):
        if not svo_list:
            continue
        folder_key = str(svo_list[0].parent.resolve())
        seen_folders.add(folder_key)

        # Phase 1: measure every segment whose size has stopped changing.
        for svo in svo_list:
            key = str(svo.resolve())
            seen_keys.add(key)
            if key in processed:
                continue
            try:
                st = svo.stat()
            except OSError:
                continue
            size = int(st.st_size)
            if size <= 0:
                # Empty file: restart the stability count for it.
                stability.pop(key, None)
                continue
            last = stability.get(key)
            if last is None or last[0] != size:
                stability[key] = (size, 1)
                continue
            cnt = last[1] + 1
            stability[key] = (size, cnt)
            if cnt >= settings.measure_watch_stable_polls:
                await _run_single_svo_measure(
                    svo, fish_id, settings, processed, state_file
                )
                stability.pop(key, None)
                did = True

        # Phase 2: aggregate once every segment is processed and stable.
        fk = _final_processed_key(fish_id, svo_list)
        if fk in processed:
            continue
        if not all(str(p.resolve()) in processed for p in svo_list):
            final_stability.pop(folder_key, None)
            continue
        tup = _folder_size_tuple(svo_list)
        if not tup:
            continue
        prev = final_stability.get(folder_key)
        if prev is None or prev[0] != tup:
            final_stability[folder_key] = (tup, 1)
            continue
        c = prev[1] + 1
        final_stability[folder_key] = (tup, c)
        if c >= settings.measure_watch_stable_polls:
            await _run_final_aggregate(
                svo_list, fish_id, settings, processed, state_file, fk
            )
            final_stability.pop(folder_key, None)
            did = True

    # Forget counters for files and folders that were not seen this tick, so a
    # folder that reappears later starts counting from scratch.
    for stale in set(stability) - seen_keys:
        del stability[stale]
    for stale in set(final_stability) - seen_folders:
        del final_stability[stale]
    return did
async def run_measure_watch_loop(settings: Settings) -> None:
    """Poll the measure watch directory forever.

    Returns immediately, after logging a warning, when the configured path is
    not a directory. Otherwise the processed-key set is loaded (only when the
    state file option is enabled) and ``watch_tick`` is called in an endless
    loop, sleeping at least 0.1 seconds between passes.

    Raises:
        AssertionError: if ``settings.measure_watch_dir`` is ``None``.
    """
    assert settings.measure_watch_dir is not None
    wd = settings.measure_watch_dir
    watch_log = logger.bind(pipeline="measure_watch")
    if not wd.is_dir():
        watch_log.warning("[测量监控] 启动跳过:路径不是目录 {}", wd)
        return

    state_file = _state_path(settings)
    use_state = settings.measure_watch_use_state_file
    if use_state:
        processed = load_watch_processed(settings, state_file, "measure")  # type: Set[str]
    else:
        processed = set()
    stability = {}  # type: Dict[str, Tuple[int, int]]
    final_stability = {}  # type: Dict[str, Tuple[Tuple[Tuple[str, int], ...], int]]

    watch_log.info(
        "[测量监控] 监控目录已启动 | dir={} | poll={}s | stable_polls={} | aggregate={} | state={} {}",
        wd,
        settings.measure_watch_poll_interval,
        settings.measure_watch_stable_polls,
        settings.measure_final_aggregate_mode,
        "on" if use_state else "off",
        state_file if use_state else "",
    )

    idle_warn_state = IdleWatchWarnState()
    while True:
        worked = await watch_tick(
            settings, processed, stability, final_stability, state_file
        )
        maybe_warn_idle_watch(
            did_work=worked,
            log_tag="measure-watch",
            algo_name="FishMeasure",
            idle_hint="目录内无 fish{N} 子文件夹、已全部处理完毕,或文件尚未达到稳定轮询次数",
            watch_dir=wd,
            state=idle_warn_state,
            interval_sec=_MEASURE_IDLE_WARN_INTERVAL_SEC,
        )
        # Clamp the pause so a zero or negative interval cannot busy-loop.
        pause = max(settings.measure_watch_poll_interval, 0.1)
        await asyncio.sleep(pause)