"""后台轮询目录中的 .svo2,跑 FishMeasure,写入 SQLite(与 ingest 共用)。""" from __future__ import annotations import asyncio from pathlib import Path from typing import Dict, Set from loguru import logger from app.db import ( add_watch_processed, load_watch_processed, measure_snapshot_deliverable, save_measure_snapshot, ) from app.services import measure as measure_svc from app.settings import Settings from app.state import app_state from app.watch_idle import IdleWatchWarnState, idle_warn_interval_sec, maybe_warn_idle_watch _MEASURE_IDLE_WARN_INTERVAL_SEC = idle_warn_interval_sec( "FISH_MEASURE_WATCH_IDLE_WARN_INTERVAL_SEC" ) def _state_path(settings: Settings) -> Path: """返回旧版 JSON 状态文件路径(仅用于兼容导入 SQLite)。""" assert settings.measure_watch_dir is not None return settings.measure_watch_dir / ".fishmeasure_watch_processed.json" def iter_svo2_folders(watch_dir: Path) -> list[tuple[list[Path], str]]: """扫描子文件夹,返回 (svo文件路径列表, fish_id) 列表。 文件夹命名格式为 fish{N},如 fish1、fish2 等。 每个子文件夹可以包含多个 .svo2 文件(同一条鱼的多段视频), 这些 SVO 文件会被批量处理,点云合并后进行重量预测。 """ result: list[tuple[list[Path], str]] = [] if not watch_dir.is_dir(): return result for entry in sorted(watch_dir.iterdir()): if not entry.is_dir(): continue # 从文件夹名提取 fish_id,格式为 fish{N} folder_name = entry.name if not folder_name.startswith("fish"): continue try: fish_id = folder_name[4:] # 去掉 "fish" 前缀 if not fish_id.isdigit(): continue except (IndexError, ValueError): continue # 在子文件夹中查找所有 .svo2 文件 svo_files = sorted([ p for p in entry.iterdir() if p.is_file() and p.suffix.lower() == ".svo2" ]) if svo_files: # 返回该文件夹中的所有 SVO 文件,它们将被批量处理 result.append((svo_files, fish_id)) return result async def _run_measure_and_state( svo_list: list[Path], fish_id: str, settings: Settings, processed: Set[str], state_file: Path, ) -> None: """批量处理同一条鱼的多个 SVO:合并点云后一次 DGCNN;解析与 test_dgcnn summary 对齐。 """ if not svo_list: return # 生成唯一的 key 列表(用于 processed 标记) keys = [str(svo.resolve()) for svo in svo_list] # 检查是否全部已处理 if all(key in processed for key in keys): return svo_names = ", ".join(svo.name for svo in svo_list) logger.info("[measure-watch] batch inference for fish_id={}: {} SVO(s): {}", fish_id, len(svo_list), svo_names) async with app_state.measure_lock: app_state.measure_status = "running" try: # 使用 batch 模式处理所有 SVO,传入 fish_id 作为结果 id snap = await asyncio.to_thread( measure_svc.run_full_measure_batch, svo_list, settings, fish_id ) if measure_snapshot_deliverable(snap): # 保存结果,client_id=None 表示对所有客户端可见 # fish_id 只用于 result 中的 id 字段,不作为 client_id source_paths = "|".join(keys) # 合并所有 source_path save_measure_snapshot( settings, snap, source_path=source_paths, client_id=None ) else: logger.warning( "[measure-watch] no deliverable measure rows for fish_id={}, skip SQLite", fish_id, ) app_state.measure_status = "idle" # 标记所有 SVO 为已处理 for key in keys: processed.add(key) if settings.measure_watch_use_state_file: add_watch_processed(settings, key, "measure") r0 = snap.result[0] if snap.result else {} w = r0.get("weight", "") logger.info( "[measure-watch] done: fish_id={} SVOs={} weight={!r}", fish_id, len(svo_list), w ) except (RuntimeError, FileNotFoundError) as e: logger.warning("[measure-watch] measure failed for fish_id={}: {}", fish_id, e) app_state.measure_status = "idle" for key in keys: processed.add(key) if settings.measure_watch_use_state_file: add_watch_processed(settings, key, "measure") except Exception as e: logger.exception("[measure-watch] error on fish_id={}: {}", fish_id, e) app_state.measure_status = "idle" for key in keys: processed.add(key) if settings.measure_watch_use_state_file: add_watch_processed(settings, key, "measure") async def watch_tick( settings: Settings, processed: Set[str], stability: Dict[str, tuple[int, int]], state_file: Path, ) -> bool: """处理一轮目录扫描;若处理了至少一个文件返回 True。 使用 batch 模式:同一条鱼(fish{N} 文件夹)下的所有 SVO 文件会被一起处理, 点云合并后进行重量预测(与 test_dgcnn.sh --batch-root 相同的逻辑)。 """ assert settings.measure_watch_dir is not None watch_dir = settings.measure_watch_dir did = False seen_keys: Set[str] = set() # 使用新的子文件夹扫描方式,返回 (svo_list, fish_id) for svo_list, fish_id in iter_svo2_folders(watch_dir): if not svo_list: continue # 为该 fish 文件夹中的所有 SVO 文件计算稳定性 # 只有当所有 SVO 都达到稳定轮询次数时才处理 all_stable = True any_new = False for svo in svo_list: key = str(svo.resolve()) seen_keys.add(key) if key in processed: continue any_new = True try: st = svo.stat() except OSError: all_stable = False continue size = int(st.st_size) if size <= 0: stability.pop(key, None) all_stable = False continue last = stability.get(key) if last is None or last[0] != size: stability[key] = (size, 1) all_stable = False else: _, cnt = last cnt += 1 stability[key] = (size, cnt) if cnt < settings.measure_watch_stable_polls: all_stable = False # 如果该文件夹下有新的 SVO 文件且全部达到稳定,则批量处理 if any_new and all_stable: await _run_measure_and_state(svo_list, fish_id, settings, processed, state_file) # 清理已处理的 SVO 文件的稳定性记录 for svo in svo_list: key = str(svo.resolve()) stability.pop(key, None) did = True # 清理不再看到的文件的稳定性记录 for k in list(stability.keys()): if k not in seen_keys: del stability[k] return did async def run_measure_watch_loop(settings: Settings) -> None: assert settings.measure_watch_dir is not None wd = settings.measure_watch_dir if not wd.is_dir(): logger.warning("[measure-watch] skip: not a directory: {}", wd) return state_file = _state_path(settings) processed: Set[str] = ( load_watch_processed(settings, state_file, "measure") if settings.measure_watch_use_state_file else set() ) stability: Dict[str, tuple[int, int]] = {} logger.info( "[measure-watch] watching {} (poll={}s, stable_polls={}, state={} {})", wd, settings.measure_watch_poll_interval, settings.measure_watch_stable_polls, "on" if settings.measure_watch_use_state_file else "off", state_file if settings.measure_watch_use_state_file else "", ) idle_warn_state = IdleWatchWarnState() while True: did = await watch_tick(settings, processed, stability, state_file) maybe_warn_idle_watch( did_work=did, log_tag="measure-watch", algo_name="FishMeasure", idle_hint="目录内无 fish{N} 子文件夹、已全部处理完毕,或文件尚未达到稳定轮询次数", watch_dir=wd, state=idle_warn_state, interval_sec=_MEASURE_IDLE_WARN_INTERVAL_SEC, ) await asyncio.sleep(max(settings.measure_watch_poll_interval, 0.1))