"""Background poller for ``.svo2`` recordings in the measure watch directory.

Each segment is run through FishMeasure on its own; once all segments of a
fish folder are handled, a final aggregated snapshot is produced.  The
processed-state bookkeeping shares SQLite with the ingest pipeline.
"""
from __future__ import annotations

import asyncio
import hashlib
from pathlib import Path
from typing import Dict, List, Set, Tuple

from loguru import logger

from app.compat import to_thread
from app.db import (
    add_watch_processed,
    load_watch_processed,
    measure_snapshot_deliverable,
    save_measure_snapshot,
)
from app.logging_config import new_run_id, stage
from app.services import measure as measure_svc
from app.settings import Settings
from app.state import app_state
from app.watch_idle import IdleWatchWarnState, idle_warn_interval_sec, maybe_warn_idle_watch

_MEASURE_IDLE_WARN_INTERVAL_SEC = idle_warn_interval_sec(
    "FISH_MEASURE_WATCH_IDLE_WARN_INTERVAL_SEC"
)


def _state_path(settings: Settings) -> Path:
    """Return the legacy JSON state-file path (kept only for import into SQLite)."""
    assert settings.measure_watch_dir is not None
    return settings.measure_watch_dir / ".fishmeasure_watch_processed.json"


def iter_svo2_folders(watch_dir: Path) -> List[Tuple[List[Path], str]]:
    """Scan sub-folders of *watch_dir* and return ``(svo_paths, fish_id)`` pairs.

    Only folders named ``fish{N}`` (``N`` all digits) are considered.  Each
    returned list holds the folder's ``.svo2`` files (suffix matched
    case-insensitively) in sorted order; folders without any are omitted.

    A directory that cannot be listed (for example because it was removed
    between discovery and listing) is skipped rather than raising, so one
    transient filesystem error does not abort the caller's scan.
    """
    result = []  # type: List[Tuple[List[Path], str]]
    if not watch_dir.is_dir():
        return result
    try:
        entries = sorted(watch_dir.iterdir())
    except OSError:
        # The watch directory vanished or became unreadable after the check.
        return result

    prefix = "fish"
    for entry in entries:
        if not entry.is_dir():
            continue
        folder_name = entry.name
        if not folder_name.startswith(prefix):
            continue
        fish_id = folder_name[len(prefix):]
        if not fish_id.isdigit():
            continue
        try:
            svo_files = sorted(
                p
                for p in entry.iterdir()
                if p.is_file() and p.suffix.lower() == ".svo2"
            )
        except OSError:
            # Folder disappeared or is unreadable; skip it for this scan.
            continue
        if svo_files:
            result.append((svo_files, fish_id))
    return result


def _final_processed_key(fish_id: str, svo_list: List[Path]) -> str:
    """Build the processed-set key for a fish folder's final aggregate.

    The key embeds the first 24 hex characters of the SHA-256 of the sorted,
    resolved segment paths, so a different set of paths yields a different key.
    """
    sig = "|".join(sorted(str(p.resolve()) for p in svo_list))
    h = hashlib.sha256(sig.encode("utf-8")).hexdigest()[:24]
    return f"__measure_final__fish{fish_id}:{h}"


def _folder_size_tuple(svo_list: List[Path]) -> Tuple[Tuple[str, int], ...]:
    """Return ``(resolved_path, size_in_bytes)`` pairs ordered by resolved path.

    Returns an empty tuple when any file cannot be stat'ed, which callers
    treat as "no usable size snapshot".
    """
    # Resolve each path once and reuse it for both ordering and output.
    keyed = sorted(
        ((str(p.resolve()), p) for p in svo_list),
        key=lambda item: item[0],
    )
    out: List[Tuple[str, int]] = []
    try:
        for resolved, path in keyed:
            out.append((resolved, int(path.stat().st_size)))
    except OSError:
        return tuple()
    return tuple(out)


def _mark_processed(settings: Settings, processed: Set[str], key: str) -> None:
    """Record *key* in the in-memory set and, when enabled, in persistent state."""
    processed.add(key)
    if settings.measure_watch_use_state_file:
        add_watch_processed(settings, key, "measure")


async def _run_single_svo_measure(
    svo: Path,
    fish_id: str,
    settings: Settings,
    processed: Set[str],
    state_file: Path,
) -> None:
    """Measure one ``.svo2`` segment and store its snapshot when deliverable.

    The segment is marked processed whether the measurement succeeds or fails
    with an ``Exception``, so a bad file is not retried on every poll.

    Args:
        svo: Segment file to measure.
        fish_id: Numeric part of the ``fish{N}`` folder name.
        settings: Application settings.
        processed: In-memory set of processed keys; updated in place.
        state_file: Not used by this function; kept for signature compatibility.
    """
    key = str(svo.resolve())
    fish_folder = svo.parent.resolve()
    fish_output_root = settings.measure_output_root / f"fish{fish_id}"
    fish_output_root.mkdir(parents=True, exist_ok=True)
    try:
        size_mb = svo.stat().st_size / (1024 * 1024)
    except OSError:
        size_mb = 0.0

    rid = new_run_id("measure_watch")
    log = logger.bind(pipeline="measure_watch", run_id=rid, fish_id=fish_id)
    log.info(
        "[测量监控] 触发分段测量 | fish_id={} | svo={} | 大小={:.1f} MB",
        fish_id,
        svo.name,
        size_mb,
    )

    async with app_state.measure_lock:
        app_state.measure_status = "running"
        try:
            with stage(
                f"分段测量 fish{fish_id}/{svo.name}",
                pipeline="measure_watch",
                step="segment_total",
                run_id=rid,
                source=svo.name,
                fish_id=fish_id,
                metrics={"fish_id": fish_id, "svo_size_mb": round(size_mb, 2)},
            ):

                def _run():
                    with app_state.measure_thread_lock:
                        return measure_svc.run_full_measure(
                            svo, settings, output_root=fish_output_root, run_id=rid
                        )

                snap = await to_thread(_run)
                snap = measure_svc.tag_measure_snapshot_meta(
                    snap,
                    measurement_phase="segment",
                    fish_folder=str(fish_folder),
                    segment_source=str(svo.resolve()),
                )
                if measure_snapshot_deliverable(snap):
                    save_measure_snapshot(
                        settings,
                        snap,
                        source_path=str(svo.resolve()),
                        client_id=None,
                    )
                else:
                    log.warning(
                        "[测量监控] 无可投递结果,跳过写库 | fish_id={} | svo={}",
                        fish_id,
                        svo.name,
                    )
            _mark_processed(settings, processed, key)
            r0 = snap.result[0] if snap.result else {}
            log.info(
                "[测量监控] 分段完成 | fish_id={} | svo={} | 体重={} g | 体长={} mm | 置信(*)={}",
                fish_id,
                svo.name,
                r0.get("weight", "") or "-",
                r0.get("length", "") or "-",
                "是" if snap.star else "否",
            )
        except (RuntimeError, FileNotFoundError) as e:
            log.warning(
                "[测量监控] 分段测量失败 | fish_id={} | svo={} | {}: {}",
                fish_id,
                svo.name,
                type(e).__name__,
                e,
            )
            _mark_processed(settings, processed, key)
        except Exception as e:
            log.exception(
                "[测量监控] 分段测量异常 | fish_id={} | svo={} | {}",
                fish_id,
                svo.name,
                e,
            )
            _mark_processed(settings, processed, key)
        finally:
            # Reset in ``finally`` so the status is also cleared on task
            # cancellation: CancelledError is a BaseException and bypasses the
            # ``except Exception`` handlers above.
            app_state.measure_status = "idle"


async def _run_final_aggregate(
    svo_list: List[Path],
    fish_id: str,
    settings: Settings,
    processed: Set[str],
    state_file: Path,
    final_key: str,
) -> None:
    """Aggregate a fish folder's segment snapshots into one final snapshot.

    Preview-video generation is best effort: a failure there is logged and the
    snapshot is still saved.  *final_key* is marked processed whether the
    aggregation succeeds or fails with an ``Exception``.

    Args:
        svo_list: Segment files of the folder; must be non-empty.
        fish_id: Numeric part of the ``fish{N}`` folder name.
        settings: Application settings.
        processed: In-memory set of processed keys; updated in place.
        state_file: Not used by this function; kept for signature compatibility.
        final_key: Processed-set key identifying this aggregate.
    """
    fish_folder = svo_list[0].parent.resolve()
    rid = new_run_id("measure_watch_final")
    log = logger.bind(pipeline="measure_watch", run_id=rid, fish_id=fish_id)
    log.info(
        "[测量监控] 触发齐套聚合 | fish_id={} | 段数={} | 文件夹={}",
        fish_id,
        len(svo_list),
        fish_folder,
    )

    async with app_state.measure_lock:
        app_state.measure_status = "running"
        try:
            with stage(
                f"齐套聚合 fish{fish_id}({len(svo_list)} 段)",
                pipeline="measure_watch",
                step="final_total",
                run_id=rid,
                source=str(fish_folder),
                fish_id=fish_id,
                metrics={
                    "fish_id": fish_id,
                    "segment_count": len(svo_list),
                    "aggregate_mode": settings.measure_final_aggregate_mode,
                },
            ):

                def _reload():
                    return measure_svc.reload_segment_snapshots_for_aggregate(
                        svo_list, fish_id, settings
                    )

                pairs = await to_thread(_reload)
                contributing_svos = [p[0] for p in pairs]
                segments = [p[1] for p in pairs]
                paths_joined = "|".join(
                    sorted(str(p.resolve()) for p in contributing_svos)
                )
                snap = measure_svc.build_measure_snapshot_aggregate(
                    segments,
                    fish_id,
                    settings,
                    contributing_svos=contributing_svos,
                    fish_folder=str(fish_folder),
                    segment_source_paths=paths_joined,
                )
                if measure_snapshot_deliverable(snap):
                    try:
                        v_left, v_right = await to_thread(
                            measure_svc.generate_aggregate_preview_media,
                            contributing_svos,
                            snap,
                            fish_id,
                            settings,
                            final_key=final_key,
                            run_id=rid,
                        )
                        snap.video_left = v_left
                        snap.video_right = v_right
                    except Exception as e:
                        # Deliberately best effort: the snapshot is saved
                        # even without preview videos.
                        log.warning(
                            "[测量监控] 齐套预览视频生成失败 | fish_id={} | {}",
                            fish_id,
                            e,
                        )
                    save_measure_snapshot(
                        settings,
                        snap,
                        source_path=f"aggregate:{final_key}",
                        client_id=None,
                    )
                else:
                    log.warning(
                        "[测量监控] 齐套结果无可投递行,跳过写库 | fish_id={}",
                        fish_id,
                    )
            _mark_processed(settings, processed, final_key)
            r0 = snap.result[0] if snap.result else {}
            log.info(
                "[测量监控] 齐套完成 | fish_id={} | 体重={} g | 体长={} mm | 置信(*)={}",
                fish_id,
                r0.get("weight", "") or "-",
                r0.get("length", "") or "-",
                "是" if snap.star else "否",
            )
        except Exception as e:
            log.exception(
                "[测量监控] 齐套聚合异常 | fish_id={} | {}",
                fish_id,
                e,
            )
            _mark_processed(settings, processed, final_key)
        finally:
            # Also runs on cancellation, which ``except Exception`` misses.
            app_state.measure_status = "idle"


async def watch_tick(
    settings: Settings,
    processed: Set[str],
    stability: Dict[str, Tuple[int, int]],
    final_stability: Dict[str, Tuple[Tuple[Tuple[str, int], ...], int]],
    state_file: Path,
) -> bool:
    """Run one polling pass over the watch directory.

    A segment is measured once its size has stayed unchanged for
    ``settings.measure_watch_stable_polls`` polls.  When every segment of a
    fish folder is processed and the folder's size snapshot is equally stable,
    the final aggregate is produced.

    Args:
        settings: Application settings.
        processed: Keys of handled segments and aggregates; updated in place.
        stability: Per-segment ``(size, consecutive_poll_count)``; updated in place.
        final_stability: Per-folder ``(size_snapshot, consecutive_poll_count)``;
            updated in place.
        state_file: Passed through to the measurement helpers.

    Returns:
        ``True`` if at least one measurement or aggregation was triggered.
    """
    assert settings.measure_watch_dir is not None
    watch_dir = settings.measure_watch_dir
    did = False
    seen_keys: Set[str] = set()
    seen_folders: Set[str] = set()

    for svo_list, fish_id in iter_svo2_folders(watch_dir):
        if not svo_list:
            continue
        fish_folder = svo_list[0].parent
        folder_key = str(fish_folder.resolve())
        seen_folders.add(folder_key)

        segment_keys: List[str] = []
        for svo in svo_list:
            key = str(svo.resolve())
            segment_keys.append(key)
            seen_keys.add(key)
            if key in processed:
                continue
            try:
                st = svo.stat()
            except OSError:
                continue
            size = int(st.st_size)
            if size <= 0:
                # Empty file: forget any earlier observation for it.
                stability.pop(key, None)
                continue
            last = stability.get(key)
            if last is None or last[0] != size:
                stability[key] = (size, 1)
            else:
                _, cnt = last
                cnt += 1
                stability[key] = (size, cnt)
                if cnt >= settings.measure_watch_stable_polls:
                    await _run_single_svo_measure(
                        svo, fish_id, settings, processed, state_file
                    )
                    stability.pop(key, None)
                    did = True

        fk = _final_processed_key(fish_id, svo_list)
        if fk in processed:
            continue
        if not all(k in processed for k in segment_keys):
            # Segments still pending: restart the folder-level countdown.
            final_stability.pop(folder_key, None)
            continue
        tup = _folder_size_tuple(svo_list)
        if not tup:
            continue
        prev = final_stability.get(folder_key)
        if prev is None or prev[0] != tup:
            final_stability[folder_key] = (tup, 1)
        else:
            _, c = prev
            c += 1
            final_stability[folder_key] = (tup, c)
            if c >= settings.measure_watch_stable_polls:
                await _run_final_aggregate(
                    svo_list, fish_id, settings, processed, state_file, fk
                )
                final_stability.pop(folder_key, None)
                did = True

    # Drop bookkeeping for files and folders that were not seen in this pass,
    # so neither dict grows without bound.
    for stale in [k for k in stability if k not in seen_keys]:
        del stability[stale]
    for stale in [k for k in final_stability if k not in seen_folders]:
        del final_stability[stale]
    return did


async def run_measure_watch_loop(settings: Settings) -> None:
    """Poll the measure watch directory forever, calling ``watch_tick`` each cycle.

    Returns immediately (after logging a warning) when the configured watch
    path is not a directory.  Otherwise this coroutine does not return on its
    own; it sleeps at least 0.1 s between polls.
    """
    assert settings.measure_watch_dir is not None
    wd = settings.measure_watch_dir
    if not wd.is_dir():
        logger.bind(pipeline="measure_watch").warning(
            "[测量监控] 启动跳过:路径不是目录 {}", wd
        )
        return

    state_file = _state_path(settings)
    processed: Set[str] = (
        load_watch_processed(settings, state_file, "measure")
        if settings.measure_watch_use_state_file
        else set()
    )
    stability = {}  # type: Dict[str, Tuple[int, int]]
    final_stability = {}  # type: Dict[str, Tuple[Tuple[Tuple[str, int], ...], int]]
    logger.bind(pipeline="measure_watch").info(
        "[测量监控] 监控目录已启动 | dir={} | poll={}s | stable_polls={} | aggregate={} | state={} {}",
        wd,
        settings.measure_watch_poll_interval,
        settings.measure_watch_stable_polls,
        settings.measure_final_aggregate_mode,
        "on" if settings.measure_watch_use_state_file else "off",
        state_file if settings.measure_watch_use_state_file else "",
    )

    idle_warn_state = IdleWatchWarnState()
    # NOTE(review): an exception escaping watch_tick terminates this loop;
    # confirm the task's owner supervises or restarts it.
    while True:
        did = await watch_tick(
            settings, processed, stability, final_stability, state_file
        )
        maybe_warn_idle_watch(
            did_work=did,
            log_tag="measure-watch",
            algo_name="FishMeasure",
            idle_hint="目录内无 fish{N} 子文件夹、已全部处理完毕,或文件尚未达到稳定轮询次数",
            watch_dir=wd,
            state=idle_warn_state,
            interval_sec=_MEASURE_IDLE_WARN_INTERVAL_SEC,
        )
        await asyncio.sleep(max(settings.measure_watch_poll_interval, 0.1))