diff --git a/.gitignore b/.gitignore index db48a2d..dbafff1 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,9 @@ build/ .ruff_cache/ *.egg +# Large model weights (download or copy from backup) +FishMeasure/sam_vit_h_4b8939.pth + # Local / runtime outputs (regenerate on server) FishMeasure/output_weight_estimator/ FishMeasure/output-yolo-sam/ diff --git a/FishMeasure/sam_vit_h_4b8939.pth b/FishMeasure/sam_vit_h_4b8939.pth deleted file mode 100755 index 69df95f..0000000 Binary files a/FishMeasure/sam_vit_h_4b8939.pth and /dev/null differ diff --git a/fish_api/app/main.py b/fish_api/app/main.py index bc98cc1..76febdd 100644 --- a/fish_api/app/main.py +++ b/fish_api/app/main.py @@ -8,6 +8,7 @@ from fastapi.staticfiles import StaticFiles from app.routers import biomass, ingest from app.services.action_watch import run_action_watch_loop +from app.services.measure_watch import run_measure_watch_loop from app.settings import get_settings @@ -16,14 +17,17 @@ async def lifespan(app: FastAPI): s = get_settings() s.media_root.mkdir(parents=True, exist_ok=True) s.stream_tmp_dir.mkdir(parents=True, exist_ok=True) - watch_task: asyncio.Task[None] | None = None + tasks: list[asyncio.Task[None]] = [] if s.action_watch_dir is not None: - watch_task = asyncio.create_task(run_action_watch_loop(s)) + tasks.append(asyncio.create_task(run_action_watch_loop(s))) + if s.measure_watch_dir is not None: + tasks.append(asyncio.create_task(run_measure_watch_loop(s))) yield - if watch_task is not None: - watch_task.cancel() + for t in tasks: + t.cancel() + for t in tasks: try: - await watch_task + await t except asyncio.CancelledError: pass @@ -49,4 +53,5 @@ async def root(): "ingest": "/api/v1/ingest/", "biomass_camera": "/api/v1/biomass/real/camera/", "biomass_health": "/api/v1/biomass/health/result/", + "note": "若配置了 ACTION_WATCH_DIR / MEASURE_WATCH_DIR,启动后会后台监控对应目录。", } diff --git a/fish_api/app/services/measure_watch.py b/fish_api/app/services/measure_watch.py new file mode 100644 index 0000000..89c1f53 --- /dev/null +++ b/fish_api/app/services/measure_watch.py @@ -0,0 +1,139 @@ +"""后台轮询目录中的 .svo2,跑 FishMeasure,写入 app_state.last_measure(与 ingest 共用状态)。""" + +from __future__ import annotations + +import asyncio +import traceback +from pathlib import Path +from typing import Dict, Set + +from app.services import measure as measure_svc +from app.services.action_watch import load_processed, save_processed +from app.settings import Settings +from app.state import MeasureSnapshot, app_state + + +def _state_path(settings: Settings) -> Path: + if settings.measure_watch_state_file is not None: + return settings.measure_watch_state_file + assert settings.measure_watch_dir is not None + return settings.measure_watch_dir / ".fishmeasure_watch_processed.json" + + +def iter_svo2(watch_dir: Path, recursive: bool) -> list[Path]: + if recursive: + return sorted( + p + for p in watch_dir.rglob("*") + if p.is_file() and p.suffix.lower() == ".svo2" + ) + return sorted( + p + for p in watch_dir.iterdir() + if p.is_file() and p.suffix.lower() == ".svo2" + ) + + +async def _run_measure_and_state( + svo: Path, + settings: Settings, + processed: Set[str], + state_file: Path, +) -> None: + key = str(svo.resolve()) + if key in processed: + return + print(f"[measure-watch] inference: {svo}", flush=True) + async with app_state.measure_lock: + app_state.measure_status = "running" + try: + snap = await asyncio.to_thread(measure_svc.run_full_measure, svo, settings) + app_state.last_measure = snap + app_state.measure_status = "idle" + processed.add(key) + if settings.measure_watch_use_state_file: + save_processed(state_file, processed) + r0 = snap.result[0] if snap.result else {} + w = r0.get("weight", "") + print(f"[measure-watch] done: {svo.name} weight={w!r}", flush=True) + except Exception as e: + app_state.last_measure = MeasureSnapshot( + result=[], + video_left="", + video_right="", + error=str(e), + ) + app_state.measure_status = "error" + print(f"[measure-watch] error on {svo}: {e}", flush=True) + traceback.print_exc() + raise + + +async def watch_tick( + settings: Settings, + processed: Set[str], + stability: Dict[str, tuple[int, int]], + state_file: Path, +) -> bool: + assert settings.measure_watch_dir is not None + watch_dir = settings.measure_watch_dir + did = False + seen_keys: Set[str] = set() + for svo in iter_svo2(watch_dir, settings.measure_watch_recursive): + key = str(svo.resolve()) + seen_keys.add(key) + if key in processed: + continue + try: + st = svo.stat() + except OSError: + continue + size = int(st.st_size) + if size <= 0: + stability.pop(key, None) + continue + last = stability.get(key) + if last is None or last[0] != size: + stability[key] = (size, 1) + else: + _, cnt = last + stability[key] = (size, cnt + 1) + _, cnt = stability[key] + if cnt >= settings.measure_watch_stable_polls: + try: + await _run_measure_and_state(svo, settings, processed, state_file) + stability.pop(key, None) + did = True + except Exception: + stability[key] = (size, 1) + for k in list(stability.keys()): + if k not in seen_keys: + del stability[k] + return did + + +async def run_measure_watch_loop(settings: Settings) -> None: + assert settings.measure_watch_dir is not None + wd = settings.measure_watch_dir + if not wd.is_dir(): + print(f"[measure-watch] skip: not a directory: {wd}", flush=True) + return + + state_file = _state_path(settings) + processed: Set[str] = ( + load_processed(state_file) if settings.measure_watch_use_state_file else set() + ) + stability: Dict[str, tuple[int, int]] = {} + + print( + f"[measure-watch] watching {wd} " + f"(poll={settings.measure_watch_poll_interval}s, " + f"stable_polls={settings.measure_watch_stable_polls}, " + f"state={'on' if settings.measure_watch_use_state_file else 'off'} " + f"{state_file if settings.measure_watch_use_state_file else ''})", + flush=True, + ) + + while True: + await watch_tick(settings, processed, stability, state_file) + await asyncio.sleep(max(settings.measure_watch_poll_interval, 0.1)) diff --git a/fish_api/app/settings.py b/fish_api/app/settings.py index 5eab214..3641781 100644 --- a/fish_api/app/settings.py +++ b/fish_api/app/settings.py @@ -66,11 +66,21 @@ class Settings(BaseSettings): action_watch_state_file: Optional[Path] = None action_watch_use_state_file: bool = True + #: 非空时后台持续扫描该目录中的新 .svo2 并跑 FishMeasure(与 ingest 共用 app_state) + measure_watch_dir: Optional[Path] = None + measure_watch_poll_interval: float = Field(default=2.0, ge=0.1) + measure_watch_stable_polls: int = Field(default=3, ge=1) + measure_watch_recursive: bool = False + measure_watch_state_file: Optional[Path] = None + measure_watch_use_state_file: bool = True + default_fish_species: str = "大黄鱼" @field_validator( "action_watch_dir", "action_watch_state_file", + "measure_watch_dir", + "measure_watch_state_file", mode="before", ) @classmethod diff --git a/mockdata/README.md b/mockdata/README.md index 154074a..2924775 100644 --- a/mockdata/README.md +++ b/mockdata/README.md @@ -1,30 +1,27 @@ # mockdata -用于本地联调 **FishAction 目录监控**(`ACTION_WATCH_DIR`)的示例视频。 -目录内 **除本 README 外** 的大文件已加入 `.gitignore`,不会随仓库克隆;请在本机从训练集拷贝或自行放入 MP4。 +本地联调用目录(**除本 README 外** 的大文件已在 `.gitignore` 中忽略)。 -## 本机已拷贝的样本(来源:`~/data/fish/fish_action_videos/`) +## FishAction(MP4) -| 文件 | 说明 | -|------|------| -| `fish_action_feeding_sample.mp4` | feeding,约 1.5MB | -| `fish_action_feeding_02.mp4` | feeding,约 1.7MB | -| `fish_action_feeding_03.mp4` | feeding,约 2.3MB | -| `fish_action_scared_sample.mp4` | scared,约 14MB | -| `fish_action_normal_sample.mp4` | normal,约 59MB | - -## FishMeasure(SVO2) - -大体积 `.svo2` 未放入本目录。本机示例: - -`/home/ubuntu/data/fish/2016-1-22-last/fish17/HD1080_SN43186771_13-23-08.svo2` - -可通过 ingest 分块上传做称重联调。 - -## `.env` - -`ACTION_WATCH_DIR` 指向本目录绝对路径,例如: +`ACTION_WATCH_DIR` 指向本目录根路径,例如: `ACTION_WATCH_DIR=/home/ubuntu/projects/FishServer/mockdata` -监控已处理列表会生成 `mockdata/.fishaction_watch_processed.json`(同样被 gitignore 忽略)。 +轮询顶层 `*.mp4`。已处理列表:`mockdata/.fishaction_watch_processed.json`。 + +## FishMeasure(SVO2) + +子目录 **`svo_inbox/`** 中放置 `*.svo2`,并单独配置: + +`MEASURE_WATCH_DIR=/home/ubuntu/projects/FishServer/mockdata/svo_inbox` + +已处理列表:`svo_inbox/.fishmeasure_watch_processed.json`。 + +本机拷贝的示例(约 26MB,来源 `~/data/fish/2016-1-22-last/fish18/`): + +| 文件 | 说明 | +|------|------| +| `svo_inbox/sample_fish_01.svo2` | ZED SVO2 样本 | + +更多 `.svo2` 可从 `~/data/fish/2016-1-22-last/` 各子目录自行复制进来。