Files
FishServer/fish_api/app/services/measure.py
2026-04-08 21:05:45 +08:00

192 lines
6.0 KiB
Python

from __future__ import annotations
import json
import os
import shutil
import subprocess
import sys
from datetime import date, datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
from app.settings import Settings
from app.state import MeasureSnapshot
def _py_exe(settings: Settings) -> str:
    """Return the Python interpreter used to launch the FishMeasure script.

    ``settings.python_fish_measure`` wins when it is set to a truthy value;
    otherwise the interpreter running this process (``sys.executable``) is used.
    """
    configured = settings.python_fish_measure
    if configured:
        return configured
    return sys.executable
def run_measure_subprocess(svo_path: Path, settings: Settings) -> None:
    """Run ``predict_weigth_from_svo2.py`` on ``svo_path`` and wait for it to finish.

    The script is looked up under ``settings.fish_measure_root`` and executed
    with that directory as its working directory, a copy of the current
    environment, and the prediction options taken from ``settings``.  Results
    are written below ``settings.measure_output_root``, which is created first
    if it does not exist.

    Raises:
        FileNotFoundError: the prediction script is not present.
        RuntimeError: the script exited with a non-zero status; the message
            carries the exit code and the last 4000 characters of
            stderr followed by stdout.
    """
    script = settings.fish_measure_root / "predict_weigth_from_svo2.py"
    if not script.is_file():
        raise FileNotFoundError(f"Missing FishMeasure script: {script}")

    output_root = settings.measure_output_root
    output_root.mkdir(parents=True, exist_ok=True)

    # (flag, value) pairs, kept in the order the script is invoked with.
    options = (
        ("--svo", str(svo_path.resolve())),
        ("--save-output", str(output_root.resolve())),
        ("--yolo-model", settings.yolo_model),
        ("--weight-checkpoint", settings.weight_checkpoint),
        ("--conf", str(settings.predict_conf)),
        ("--imgsz", str(settings.predict_imgsz)),
        ("--sam-device", settings.sam_device),
        ("--max-frames", str(settings.predict_max_frames)),
        ("--frame-stride", str(settings.predict_frame_stride)),
    )
    command = [_py_exe(settings), str(script)]
    for flag, value in options:
        command.extend((flag, value))

    completed = subprocess.run(
        command,
        cwd=str(settings.fish_measure_root),
        env=os.environ.copy(),
        capture_output=True,
        text=True,
    )
    if completed.returncode == 0:
        return

    # Keep only the tail of the combined output so the error stays readable.
    combined = (completed.stderr or "") + (completed.stdout or "")
    raise RuntimeError(
        f"predict_weigth_from_svo2.py failed ({completed.returncode}): {combined[-4000:]}"
    )
def _load_weight_json(svo_path: Path, settings: Settings) -> Dict[str, Any]:
    """Load the prediction result written for ``svo_path``.

    The file is expected at
    ``<settings.measure_output_root>/<svo stem>/weight_prediction.json`` and is
    decoded as UTF-8 JSON.

    Raises:
        FileNotFoundError: the result file does not exist.
    """
    result_file = settings.measure_output_root / svo_path.stem / "weight_prediction.json"
    if result_file.is_file():
        return json.loads(result_file.read_text(encoding="utf-8"))
    raise FileNotFoundError(f"Expected output missing: {result_file}")
def _find_preview_videos(output_dir: Path) -> Tuple[Optional[Path], Optional[Path]]:
    """Choose the (left, right) video files to publish from ``output_dir``.

    The directory is searched recursively and candidates are taken in sorted
    path order, with this priority:

    1. two or more ``*preview*.mp4`` files: the first two;
    2. exactly one ``*preview*.mp4`` file: that file for both sides;
    3. otherwise any ``*.mp4``: the first two, or a single file for both sides;
    4. no mp4 at all: ``(None, None)``.

    Returning the same path twice signals "one combined video" to the caller.
    """
    previews = sorted(output_dir.rglob("*preview*.mp4"))
    if len(previews) >= 2:
        return previews[0], previews[1]
    if previews:
        # A lone preview has to be handled before the generic fallback: it
        # also matches "*.mp4", so checking it afterwards could never trigger
        # and unrelated mp4 files could be picked instead of the preview.
        only_preview = previews[0]
        return only_preview, only_preview

    videos = sorted(output_dir.rglob("*.mp4"))
    if len(videos) >= 2:
        return videos[0], videos[1]
    if videos:
        return videos[0], videos[0]
    return None, None
def _split_sbs_video(src: Path, left_dst: Path, right_dst: Path) -> bool:
    """Split a side-by-side video into two half-width files via ffprobe/ffmpeg.

    The frame size of the first video stream is read with ``ffprobe``.  Each
    half (``width // 2`` wide, full height) is then cropped out with ``ffmpeg``
    and written to ``left_dst`` / ``right_dst`` without audio.

    Returns:
        True only when both halves were written.  False when the tools cannot
        be launched, probing fails, no usable video stream is reported, the
        frame is narrower than it is tall, or either ffmpeg run fails.  The
        caller is expected to fall back to a plain copy in that case.
    """
    try:
        probe = subprocess.run(
            [
                "ffprobe", "-v", "quiet", "-print_format", "json",
                "-show_streams", str(src),
            ],
            capture_output=True, text=True,
        )
    except OSError:
        # ffprobe is missing or not executable: report failure instead of
        # raising, so the documented copy fallback can still run.
        return False
    if probe.returncode != 0:
        return False

    try:
        streams = json.loads(probe.stdout).get("streams", [])
        video = next((s for s in streams if s.get("codec_type") == "video"), None)
        if video is None:
            return False
        width, height = int(video["width"]), int(video["height"])
    except (AttributeError, KeyError, TypeError, ValueError):
        # Malformed or unexpected ffprobe output.
        return False

    half_width = width // 2
    # A frame narrower than it is tall is not treated as a side-by-side pair.
    if half_width < 1 or width < height:
        return False

    # NOTE(review): left_dst receives the half starting at x=half_width and
    # right_dst the half starting at x=0, i.e. the frame halves are swapped
    # relative to their names.  Kept as-is; confirm this matches the layout
    # of the source video.
    crops = (
        (f"crop={half_width}:{height}:{half_width}:0", left_dst),
        (f"crop={half_width}:{height}:0:0", right_dst),
    )
    for crop, dst in crops:
        try:
            result = subprocess.run(
                ["ffmpeg", "-y", "-i", str(src), "-vf", crop, "-an", "-q:v", "5", str(dst)],
                capture_output=True, text=True,
            )
        except OSError:
            return False
        if result.returncode != 0:
            return False
    return True
def _publish_media(
    left: Optional[Path],
    right: Optional[Path],
    settings: Settings,
) -> Tuple[str, str]:
    """Place the result videos in the media directory and return their URLs.

    The targets are always ``latest_left.mp4`` and ``latest_right.mp4`` inside
    ``settings.media_root`` (created if missing).  URLs have the form
    ``<public_base_url>/media/<file name>`` with any trailing slash stripped
    from the base.

    When both sides refer to the same existing file, it is first offered to
    ``_split_sbs_video``; if that succeeds the two halves are what gets
    published.  Otherwise each side is copied on its own, and a side whose
    source is ``None`` or not a file yields an empty string.
    """
    media_dir = settings.media_root
    media_dir.mkdir(parents=True, exist_ok=True)
    targets = (media_dir / "latest_left.mp4", media_dir / "latest_right.mp4")
    url_prefix = settings.public_base_url.rstrip("/")

    def url_for(target: Path) -> str:
        return f"{url_prefix}/media/{target.name}"

    single_source = left is not None and left == right and left.is_file()
    if single_source and _split_sbs_video(left, targets[0], targets[1]):
        return url_for(targets[0]), url_for(targets[1])

    urls = []
    for source, target in zip((left, right), targets):
        if source is not None and source.is_file():
            shutil.copy2(source, target)
            urls.append(url_for(target))
        else:
            urls.append("")
    return urls[0], urls[1]
def _format_rounded(value: Any) -> str:
    """Render a measurement as a whole-number string, or "" when unavailable.

    ``None`` and non-finite numbers (NaN / infinity, which the JSON decoder
    accepts) both produce an empty string instead of failing on the
    integer conversion.
    """
    import math

    if value is None:
        return ""
    number = float(value)
    if not math.isfinite(number):
        return ""
    return str(int(round(number)))


def build_measure_snapshot(svo_path: Path, settings: Settings) -> MeasureSnapshot:
    """Build a ``MeasureSnapshot`` from the prediction output for ``svo_path``.

    Reads ``weight_prediction.json`` for the recording, takes the average
    length and predicted weight from the ``dgcnn_summary`` section (falling
    back to ``weight_summary``, then to the top-level keys), publishes the
    preview videos found in the output directory and returns a snapshot
    holding a single result item.

    Raises:
        FileNotFoundError: the prediction JSON for ``svo_path`` is missing.
    """
    data = _load_weight_json(svo_path, settings)
    summary = data.get("dgcnn_summary") or data.get("weight_summary") or {}

    length_mm = summary.get("avg_length_input")
    if length_mm is None:
        length_mm = data.get("avg_length_input")
    weight_g = summary.get("avg_predicted_weight_g")
    if weight_g is None:
        weight_g = data.get("avg_predicted_weight_g")

    result_item = {
        "id": 1,
        "type": settings.default_fish_species,
        "length": _format_rounded(length_mm),
        "weight": _format_rounded(weight_g),
        # NOTE(review): this is the local calendar date, while updated_at
        # below is UTC - confirm the difference is intended.
        "date": date.today().isoformat(),
    }

    default_dir = settings.measure_output_root / svo_path.stem
    # A null or empty "output_dir" must not reach Path(): None raises
    # TypeError and "" would resolve to the current working directory.
    out_dir = Path(data.get("output_dir") or default_dir)
    left_video, right_video = _find_preview_videos(out_dir)
    url_left, url_right = _publish_media(left_video, right_video, settings)

    return MeasureSnapshot(
        result=[result_item],
        video_left=url_left,
        video_right=url_right,
        updated_at=datetime.now(timezone.utc),
        raw_prediction_path=str(
            settings.measure_output_root / svo_path.stem / "weight_prediction.json"
        ),
    )
def run_full_measure(svo_path: Path, settings: Settings) -> MeasureSnapshot:
    """Run the prediction script for ``svo_path`` and return its snapshot.

    This runs ``run_measure_subprocess`` and then ``build_measure_snapshot``;
    exceptions raised by either step propagate to the caller.
    """
    run_measure_subprocess(svo_path, settings)
    snapshot = build_measure_snapshot(svo_path, settings)
    return snapshot