Files
FishServer/fish_api/app/services/measure_watch.py

258 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
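"""Background watcher: poll a directory for .svo2 files, run FishMeasure and write results to SQLite (shared with ingest).

后台轮询目录中的 .svo2, 跑 FishMeasure, 写入 SQLite (与 ingest 共用)。
"""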
from __future__ import annotations
import asyncio
from pathlib import Path
from typing import Dict, Set
from loguru import logger
from app.db import (
add_watch_processed,
load_watch_processed,
measure_snapshot_deliverable,
save_measure_snapshot,
)
from app.services import measure as measure_svc
from app.settings import Settings
from app.state import app_state
from app.watch_idle import IdleWatchWarnState, idle_warn_interval_sec, maybe_warn_idle_watch
# Interval in seconds handed to maybe_warn_idle_watch (as interval_sec) by the
# watch loop. The value is resolved once at import time by idle_warn_interval_sec
# from the given key (presumably an environment variable name — confirm there).
_MEASURE_IDLE_WARN_INTERVAL_SEC = idle_warn_interval_sec(
    "FISH_MEASURE_WATCH_IDLE_WARN_INTERVAL_SEC",
)
def _state_path(settings: Settings) -> Path:
"""返回旧版 JSON 状态文件路径(仅用于兼容导入 SQLite"""
assert settings.measure_watch_dir is not None
return settings.measure_watch_dir / ".fishmeasure_watch_processed.json"
def iter_svo2_folders(watch_dir: Path) -> list[tuple[list[Path], str]]:
    """Scan *watch_dir* for ``fish{N}`` sub-folders that contain ``.svo2`` files.

    Only direct children named ``fish`` followed by one or more ASCII digits
    (``fish1``, ``fish2``, ...) are considered; the digits become ``fish_id``.
    A folder may hold several ``.svo2`` recordings of the same fish. All of them
    are returned together so the caller can measure them as one batch (point
    clouds merged before weight prediction).

    Args:
        watch_dir: Directory to scan. A missing path or a non-directory yields
            an empty list.

    Returns:
        ``(svo_files, fish_id)`` pairs in sorted folder order. ``svo_files`` is
        sorted and never empty (suffix matched case-insensitively); folders
        without any ``.svo2`` file are omitted.
    """
    result: list[tuple[list[Path], str]] = []
    if not watch_dir.is_dir():
        return result
    for entry in sorted(watch_dir.iterdir()):
        fish_id = entry.name[len("fish"):]
        # Enforce the exact fish{N} layout. str.isdigit() alone also accepts
        # non-ASCII digits such as "²", so restrict the suffix to ASCII.
        if not (
            entry.name.startswith("fish")
            and fish_id.isascii()
            and fish_id.isdigit()
        ):
            continue
        try:
            if not entry.is_dir():
                continue
            svo_files = sorted(
                p
                for p in entry.iterdir()
                if p.is_file() and p.suffix.lower() == ".svo2"
            )
        except OSError:
            # The folder vanished or is unreadable right now. Skip it so one bad
            # folder does not abort the whole scan. It is scanned again on the
            # next poll.
            continue
        if svo_files:
            # All SVOs of this fish are handed over together for batch measuring.
            result.append((svo_files, fish_id))
    return result
def _mark_keys_processed(
    keys: list[str], settings: Settings, processed: Set[str]
) -> None:
    """Record *keys* as handled in memory and, if enabled, via add_watch_processed."""
    for key in keys:
        processed.add(key)
        if settings.measure_watch_use_state_file:
            add_watch_processed(settings, key, "measure")


async def _run_measure_and_state(
    svo_list: list[Path],
    fish_id: str,
    settings: Settings,
    processed: Set[str],
    state_file: Path,
) -> None:
    """Measure all SVOs of one fish as a single batch and record the outcome.

    The SVO point clouds are merged and passed through one DGCNN run. The
    result is parsed to match the ``test_dgcnn`` summary. A deliverable snapshot is
    saved with ``client_id=None`` (visible to every client) and all source
    paths joined by ``|``.

    Every SVO is marked as processed whatever the outcome (success, no
    deliverable rows, or error). This stops a failing batch from being retried
    on every poll. A cancelled run is not marked.

    Args:
        svo_list: SVO files belonging to the same fish folder.
        fish_id: Used as the result ``id``; it is not used as a client id.
        settings: Application settings.
        processed: In-memory set of already handled resolved paths; updated.
        state_file: Unused here; state is persisted through
            ``add_watch_processed``. Kept for signature compatibility.
    """
    if not svo_list:
        return
    # Resolved absolute paths serve as the "processed" keys.
    keys = [str(svo.resolve()) for svo in svo_list]
    if all(key in processed for key in keys):
        return
    svo_names = ", ".join(svo.name for svo in svo_list)
    logger.info("[measure-watch] batch inference for fish_id={}: {} SVO(s): {}",
                fish_id, len(svo_list), svo_names)
    async with app_state.measure_lock:
        app_state.measure_status = "running"
        try:
            # The blocking measurement runs in a worker thread; fish_id becomes
            # the result id.
            snap = await asyncio.to_thread(
                measure_svc.run_full_measure_batch, svo_list, settings, fish_id
            )
            if measure_snapshot_deliverable(snap):
                source_paths = "|".join(keys)
                save_measure_snapshot(
                    settings, snap, source_path=source_paths, client_id=None
                )
            else:
                logger.warning(
                    "[measure-watch] no deliverable measure rows for fish_id={}, skip SQLite",
                    fish_id,
                )
            r0 = snap.result[0] if snap.result else {}
            w = r0.get("weight", "")
            logger.info(
                "[measure-watch] done: fish_id={} SVOs={} weight={!r}",
                fish_id, len(svo_list), w
            )
        except (RuntimeError, FileNotFoundError) as e:
            logger.warning("[measure-watch] measure failed for fish_id={}: {}", fish_id, e)
        except Exception as e:
            logger.exception("[measure-watch] error on fish_id={}: {}", fish_id, e)
        finally:
            # Reset on every exit path, including cancellation, so the status
            # can never remain stuck at "running".
            app_state.measure_status = "idle"
        _mark_keys_processed(keys, settings, processed)
def _poll_file_stability(
    svo: Path,
    key: str,
    stability: Dict[str, tuple[int, int]],
    required_polls: int,
) -> bool:
    """Update the ``(size, poll_count)`` record of one SVO; return True once settled.

    A file is settled when its size is positive and has been seen unchanged in
    at least *required_polls* consecutive polls. The first sighting of a size
    counts as poll 1 and never settles the file on its own.
    """
    try:
        size = int(svo.stat().st_size)
    except OSError:
        # Cannot stat right now; keep whatever record exists and wait.
        return False
    if size <= 0:
        # Empty file: forget any history and start over later.
        stability.pop(key, None)
        return False
    previous = stability.get(key)
    if previous is not None and previous[0] == size:
        polls = previous[1] + 1
        stability[key] = (size, polls)
        return polls >= required_polls
    # New file or size changed: restart the counter.
    stability[key] = (size, 1)
    return False


async def watch_tick(
    settings: Settings,
    processed: Set[str],
    stability: Dict[str, tuple[int, int]],
    state_file: Path,
) -> bool:
    """Run one scan of the watch directory; return True if any fish batch was run.

    Batch mode: all SVO files of one ``fish{N}`` folder are measured together
    (point clouds merged before weight prediction, like
    ``test_dgcnn.sh --batch-root``). A folder is measured only when it has at
    least one unprocessed SVO and every unprocessed SVO in it has settled.
    Stability records of files that are no longer present are discarded.
    """
    assert settings.measure_watch_dir is not None
    seen_keys: Set[str] = set()
    did_work = False
    for svo_list, fish_id in iter_svo2_folders(settings.measure_watch_dir):
        if not svo_list:
            continue
        pending = False
        settled = True
        for svo in svo_list:
            key = str(svo.resolve())
            seen_keys.add(key)
            if key in processed:
                continue
            pending = True
            # Poll every file before combining (no short-circuit) so each
            # file's counter advances on this tick.
            file_settled = _poll_file_stability(
                svo, key, stability, settings.measure_watch_stable_polls
            )
            settled = settled and file_settled
        if not (pending and settled):
            continue
        await _run_measure_and_state(svo_list, fish_id, settings, processed, state_file)
        # Handled batch: its stability records are no longer needed.
        for svo in svo_list:
            stability.pop(str(svo.resolve()), None)
        did_work = True
    for stale_key in [k for k in stability if k not in seen_keys]:
        del stability[stale_key]
    return did_work
async def run_measure_watch_loop(settings: Settings) -> None:
    """Poll ``settings.measure_watch_dir`` forever, measuring settled fish folders.

    Returns immediately (with a warning) if the watch directory is not a
    directory. When ``measure_watch_use_state_file`` is on, previously
    processed entries are loaded via ``load_watch_processed``. Otherwise every
    file is considered new at start-up.

    An exception raised by a single tick is logged and polling continues, so one
    transient failure does not end the watcher for good. Task cancellation
    (``asyncio.CancelledError``, a ``BaseException``) still stops the loop.
    """
    assert settings.measure_watch_dir is not None
    wd = settings.measure_watch_dir
    if not wd.is_dir():
        logger.warning("[measure-watch] skip: not a directory: {}", wd)
        return
    state_file = _state_path(settings)
    processed: Set[str] = (
        load_watch_processed(settings, state_file, "measure")
        if settings.measure_watch_use_state_file
        else set()
    )
    # Resolved path -> (last seen size, consecutive polls at that size).
    stability: Dict[str, tuple[int, int]] = {}
    logger.info(
        "[measure-watch] watching {} (poll={}s, stable_polls={}, state={} {})",
        wd,
        settings.measure_watch_poll_interval,
        settings.measure_watch_stable_polls,
        "on" if settings.measure_watch_use_state_file else "off",
        state_file if settings.measure_watch_use_state_file else "",
    )
    idle_warn_state = IdleWatchWarnState()
    while True:
        try:
            did = await watch_tick(settings, processed, stability, state_file)
        except Exception:
            # Without this guard, an error such as a folder vanishing mid-scan or
            # a failure while recording processed state would leave the loop.
            # That would silently stop all further measuring.
            logger.exception("[measure-watch] tick failed; will retry on next poll")
        else:
            maybe_warn_idle_watch(
                did_work=did,
                log_tag="measure-watch",
                algo_name="FishMeasure",
                idle_hint="目录内无 fish{N} 子文件夹、已全部处理完毕,或文件尚未达到稳定轮询次数",
                watch_dir=wd,
                state=idle_warn_state,
                interval_sec=_MEASURE_IDLE_WARN_INTERVAL_SEC,
            )
        # Floor the interval so a zero/negative setting cannot busy-loop.
        await asyncio.sleep(max(settings.measure_watch_poll_interval, 0.1))