feat(fish_api): add SVO2 folder watch for FishMeasure

Add MEASURE_WATCH_* settings and measure_watch background loop parallel
to action watch. Gitignore SAM 2.4GB weight and fix corrupted DGCNN
checkpoint. Clear stale outputs for fresh rerun.

Made-with: Cursor
This commit is contained in:
zaiun xu
2026-04-08 20:35:55 +08:00
parent b573efa588
commit 7baaa72965
6 changed files with 182 additions and 28 deletions

View File

@@ -8,6 +8,7 @@ from fastapi.staticfiles import StaticFiles
from app.routers import biomass, ingest
from app.services.action_watch import run_action_watch_loop
from app.services.measure_watch import run_measure_watch_loop
from app.settings import get_settings
@@ -16,14 +17,17 @@ async def lifespan(app: FastAPI):
s = get_settings()
s.media_root.mkdir(parents=True, exist_ok=True)
s.stream_tmp_dir.mkdir(parents=True, exist_ok=True)
watch_task: asyncio.Task[None] | None = None
tasks: list[asyncio.Task[None]] = []
if s.action_watch_dir is not None:
watch_task = asyncio.create_task(run_action_watch_loop(s))
tasks.append(asyncio.create_task(run_action_watch_loop(s)))
if s.measure_watch_dir is not None:
tasks.append(asyncio.create_task(run_measure_watch_loop(s)))
yield
if watch_task is not None:
watch_task.cancel()
for t in tasks:
t.cancel()
for t in tasks:
try:
await watch_task
await t
except asyncio.CancelledError:
pass
@@ -49,4 +53,5 @@ async def root():
"ingest": "/api/v1/ingest/",
"biomass_camera": "/api/v1/biomass/real/camera/",
"biomass_health": "/api/v1/biomass/health/result/",
"note": "若配置了 ACTION_WATCH_DIR / MEASURE_WATCH_DIR启动后会后台监控对应目录。",
}

View File

@@ -0,0 +1,139 @@
"""后台轮询目录中的 .svo2跑 FishMeasure写入 app_state.last_measure与 ingest 共用状态)。"""
from __future__ import annotations
import asyncio
import traceback
from pathlib import Path
from typing import Dict, Set
from app.services import measure as measure_svc
from app.services.action_watch import load_processed, save_processed
from app.settings import Settings
from app.state import MeasureSnapshot, app_state
def _state_path(settings: Settings) -> Path:
if settings.measure_watch_state_file is not None:
return settings.measure_watch_state_file
assert settings.measure_watch_dir is not None
return settings.measure_watch_dir / ".fishmeasure_watch_processed.json"
def iter_svo2(watch_dir: Path, recursive: bool) -> list[Path]:
if recursive:
return sorted(
p
for p in watch_dir.rglob("*")
if p.is_file() and p.suffix.lower() == ".svo2"
)
return sorted(
p
for p in watch_dir.iterdir()
if p.is_file() and p.suffix.lower() == ".svo2"
)
async def _run_measure_and_state(
svo: Path,
settings: Settings,
processed: Set[str],
state_file: Path,
) -> None:
key = str(svo.resolve())
if key in processed:
return
print(f"[measure-watch] inference: {svo}", flush=True)
async with app_state.measure_lock:
app_state.measure_status = "running"
try:
snap = await asyncio.to_thread(measure_svc.run_full_measure, svo, settings)
app_state.last_measure = snap
app_state.measure_status = "idle"
processed.add(key)
if settings.measure_watch_use_state_file:
save_processed(state_file, processed)
r0 = snap.result[0] if snap.result else {}
w = r0.get("weight", "")
print(f"[measure-watch] done: {svo.name} weight={w!r}", flush=True)
except Exception as e:
app_state.last_measure = MeasureSnapshot(
result=[],
video_left="",
video_right="",
error=str(e),
)
app_state.measure_status = "error"
print(f"[measure-watch] error on {svo}: {e}", flush=True)
traceback.print_exc()
raise
async def watch_tick(
settings: Settings,
processed: Set[str],
stability: Dict[str, tuple[int, int]],
state_file: Path,
) -> bool:
assert settings.measure_watch_dir is not None
watch_dir = settings.measure_watch_dir
did = False
seen_keys: Set[str] = set()
for svo in iter_svo2(watch_dir, settings.measure_watch_recursive):
key = str(svo.resolve())
seen_keys.add(key)
if key in processed:
continue
try:
st = svo.stat()
except OSError:
continue
size = int(st.st_size)
if size <= 0:
stability.pop(key, None)
continue
last = stability.get(key)
if last is None or last[0] != size:
stability[key] = (size, 1)
else:
_, cnt = last
stability[key] = (size, cnt + 1)
_, cnt = stability[key]
if cnt >= settings.measure_watch_stable_polls:
try:
await _run_measure_and_state(svo, settings, processed, state_file)
stability.pop(key, None)
did = True
except Exception:
stability[key] = (size, 1)
for k in list(stability.keys()):
if k not in seen_keys:
del stability[k]
return did
async def run_measure_watch_loop(settings: Settings) -> None:
assert settings.measure_watch_dir is not None
wd = settings.measure_watch_dir
if not wd.is_dir():
print(f"[measure-watch] skip: not a directory: {wd}", flush=True)
return
state_file = _state_path(settings)
processed: Set[str] = (
load_processed(state_file) if settings.measure_watch_use_state_file else set()
)
stability: Dict[str, tuple[int, int]] = {}
print(
f"[measure-watch] watching {wd} "
f"(poll={settings.measure_watch_poll_interval}s, "
f"stable_polls={settings.measure_watch_stable_polls}, "
f"state={'on' if settings.measure_watch_use_state_file else 'off'} "
f"{state_file if settings.measure_watch_use_state_file else ''})",
flush=True,
)
while True:
await watch_tick(settings, processed, stability, state_file)
await asyncio.sleep(max(settings.measure_watch_poll_interval, 0.1))

View File

@@ -66,11 +66,21 @@ class Settings(BaseSettings):
action_watch_state_file: Optional[Path] = None
action_watch_use_state_file: bool = True
#: 非空时后台持续扫描该目录中的新 .svo2 并跑 FishMeasure与 ingest 共用 app_state
measure_watch_dir: Optional[Path] = None
measure_watch_poll_interval: float = Field(default=2.0, ge=0.1)
measure_watch_stable_polls: int = Field(default=3, ge=1)
measure_watch_recursive: bool = False
measure_watch_state_file: Optional[Path] = None
measure_watch_use_state_file: bool = True
default_fish_species: str = "大黄鱼"
@field_validator(
"action_watch_dir",
"action_watch_state_file",
"measure_watch_dir",
"measure_watch_state_file",
mode="before",
)
@classmethod