"""Run the external FishMeasure prediction script and publish its results.

The module launches the prediction script as a subprocess for one SVO
recording, reads the ``weight_prediction.json`` file the script is expected to
write, copies up to two preview videos into the media directory, and packs
everything into a :class:`MeasureSnapshot`.
"""

from __future__ import annotations

import json
import math
import os
import shutil
import subprocess
import sys
from datetime import date, datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

from app.settings import Settings
from app.state import MeasureSnapshot

# File name of the prediction script inside ``settings.fish_measure_root``.
# The spelling ("weigth") is kept exactly as the original code referenced it.
_MEASURE_SCRIPT_NAME = "predict_weigth_from_svo2.py"

# File the script is expected to write into ``<measure_output_root>/<svo stem>/``.
_PREDICTION_FILE_NAME = "weight_prediction.json"

# How many trailing characters of the subprocess output go into the error text.
_ERROR_TAIL_CHARS = 4000


def _py_exe(settings: Settings) -> str:
    """Return the Python interpreter used to launch the measure script.

    ``settings.python_fish_measure`` wins when it is set to a truthy value;
    otherwise the interpreter running the current process is used.
    """
    return settings.python_fish_measure or sys.executable


def run_measure_subprocess(svo_path: Path, settings: Settings) -> None:
    """Run the prediction script for ``svo_path`` and wait for it to finish.

    The output root directory is created if needed and the script is started
    with ``settings.fish_measure_root`` as its working directory.

    Raises:
        FileNotFoundError: The script does not exist under
            ``settings.fish_measure_root``.
        RuntimeError: The script exited with a non-zero status. The message
            carries the exit code and the tail of stderr followed by stdout.
    """
    script = settings.fish_measure_root / _MEASURE_SCRIPT_NAME
    if not script.is_file():
        raise FileNotFoundError(f"Missing FishMeasure script: {script}")

    settings.measure_output_root.mkdir(parents=True, exist_ok=True)

    # Arguments are passed as a list (no shell), so paths with spaces are safe.
    cmd = [
        _py_exe(settings),
        str(script),
        "--svo",
        str(svo_path.resolve()),
        "--save-output",
        str(settings.measure_output_root.resolve()),
        "--yolo-model",
        settings.yolo_model,
        "--weight-checkpoint",
        settings.weight_checkpoint,
        "--conf",
        str(settings.predict_conf),
        "--imgsz",
        str(settings.predict_imgsz),
        "--sam-device",
        settings.sam_device,
        "--max-frames",
        str(settings.predict_max_frames),
        "--frame-stride",
        str(settings.predict_frame_stride),
    ]
    proc = subprocess.run(
        cmd,
        cwd=str(settings.fish_measure_root),
        env=os.environ.copy(),
        capture_output=True,
        text=True,
    )
    if proc.returncode != 0:
        # Only the tail is kept so a very chatty run cannot produce an
        # enormous exception message.
        err = (proc.stderr or "") + (proc.stdout or "")
        raise RuntimeError(
            f"{_MEASURE_SCRIPT_NAME} failed ({proc.returncode}): "
            f"{err[-_ERROR_TAIL_CHARS:]}"
        )


def _prediction_json_path(svo_path: Path, settings: Settings) -> Path:
    """Return the path where the prediction JSON for ``svo_path`` is expected."""
    return settings.measure_output_root / svo_path.stem / _PREDICTION_FILE_NAME


def _load_weight_json(svo_path: Path, settings: Settings) -> Dict[str, Any]:
    """Load the prediction JSON written for ``svo_path``.

    Raises:
        FileNotFoundError: The expected JSON file does not exist.
    """
    candidate = _prediction_json_path(svo_path, settings)
    if not candidate.is_file():
        raise FileNotFoundError(f"Expected output missing: {candidate}")
    with open(candidate, encoding="utf-8") as f:
        return json.load(f)


def _find_preview_videos(output_dir: Path) -> Tuple[Optional[Path], Optional[Path]]:
    """Pick two ``.mp4`` files from ``output_dir`` (non-recursive).

    Selection order, using name-sorted listings:

    1. the first two files matching ``*preview*.mp4``;
    2. otherwise the first two ``*.mp4`` files of any name;
    3. otherwise the single ``*.mp4`` file, returned for both slots;
    4. otherwise ``(None, None)``.

    A lone preview file needs no case of its own: it also matches ``*.mp4``,
    so it is always handled by step 2 or step 3.
    """
    previews = sorted(output_dir.glob("*preview*.mp4"))
    if len(previews) >= 2:
        return previews[0], previews[1]

    all_mp4 = sorted(output_dir.glob("*.mp4"))
    if len(all_mp4) >= 2:
        return all_mp4[0], all_mp4[1]
    if all_mp4:
        only = all_mp4[0]
        return only, only
    return None, None


def _publish_media(
    left: Optional[Path],
    right: Optional[Path],
    settings: Settings,
) -> Tuple[str, str]:
    """Copy the two videos into the media directory and return their URLs.

    The destinations are always ``latest_left.mp4`` and ``latest_right.mp4``
    under ``settings.media_root``, so each call overwrites the previous copy.
    A slot whose source is ``None`` or not an existing file yields ``""`` and
    leaves any previously published file for that slot untouched.
    """
    settings.media_root.mkdir(parents=True, exist_ok=True)
    base = settings.public_base_url.rstrip("/")

    def _copy_and_link(src: Optional[Path], dst: Path) -> str:
        """Copy ``src`` to ``dst`` and return the public URL, or ``""``."""
        if src is None or not src.is_file():
            return ""
        shutil.copy2(src, dst)
        return f"{base}/media/{dst.name}"

    left_url = _copy_and_link(left, settings.media_root / "latest_left.mp4")
    right_url = _copy_and_link(right, settings.media_root / "latest_right.mp4")
    return left_url, right_url


def _format_whole_number(value: Any) -> str:
    """Return ``value`` rounded to the nearest integer as text.

    ``None`` yields ``""``. NaN and infinities also yield ``""``: Python's
    ``json`` module accepts those literals when reading a file, and they
    cannot be converted to an integer.
    """
    if value is None:
        return ""
    number = float(value)
    if not math.isfinite(number):
        return ""
    return str(round(number))


def build_measure_snapshot(svo_path: Path, settings: Settings) -> MeasureSnapshot:
    """Build a :class:`MeasureSnapshot` from the prediction output of ``svo_path``.

    Average length and weight are read from the ``dgcnn_summary`` section of
    the JSON, or from ``weight_summary`` when the former is missing or empty,
    and finally from the top level of the document. Preview videos found in
    the output directory are published to the media directory as a side
    effect.

    Raises:
        FileNotFoundError: The prediction JSON does not exist.
    """
    data = _load_weight_json(svo_path, settings)
    summary = data.get("dgcnn_summary") or data.get("weight_summary") or {}

    length_mm = summary.get("avg_length_input")
    if length_mm is None:
        length_mm = data.get("avg_length_input")
    weight_g = summary.get("avg_predicted_weight_g")
    if weight_g is None:
        weight_g = data.get("avg_predicted_weight_g")

    result_item = {
        "id": 1,
        "type": settings.default_fish_species,
        "length": _format_whole_number(length_mm),
        "weight": _format_whole_number(weight_g),
        # Calendar date in the local timezone of the running process.
        "date": date.today().isoformat(),
    }

    # ``or`` rather than a ``.get`` default: a JSON null would make ``Path``
    # raise TypeError, and an empty string would silently point the search at
    # the current working directory.
    default_out_dir = settings.measure_output_root / svo_path.stem
    out_dir = Path(data.get("output_dir") or default_out_dir)

    left_video, right_video = _find_preview_videos(out_dir)
    v_left, v_right = _publish_media(left_video, right_video, settings)

    return MeasureSnapshot(
        result=[result_item],
        video_left=v_left,
        video_right=v_right,
        updated_at=datetime.now(timezone.utc),
        raw_prediction_path=str(_prediction_json_path(svo_path, settings)),
    )


def run_full_measure(svo_path: Path, settings: Settings) -> MeasureSnapshot:
    """Run the prediction script for ``svo_path`` and return the resulting snapshot.

    Raises:
        FileNotFoundError: The script or its expected JSON output is missing.
        RuntimeError: The script exited with a non-zero status.
    """
    run_measure_subprocess(svo_path, settings)
    return build_measure_snapshot(svo_path, settings)