Files
FishServer/fish_api/app/services/sonar_optical_flow.py
2026-05-13 09:19:31 +08:00

91 lines
3.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""声呐视频发布前:在 FishMeasure ``optical_flow/visualize_optical_flow.py`` 中做稠密光流 overlay。"""
from __future__ import annotations
import importlib.util
import threading
import time
from pathlib import Path
from types import ModuleType
from typing import Optional
from loguru import logger
from app.settings import Settings
_mod_lock = threading.Lock()
_cached_mod: Optional[ModuleType] = None
_cached_mod_path: Optional[str] = None
def _load_optical_flow_module(settings: Settings) -> Optional[ModuleType]:
global _cached_mod, _cached_mod_path
path = settings.fish_measure_root / "optical_flow" / "visualize_optical_flow.py"
path_str = str(path)
with _mod_lock:
if _cached_mod is not None and _cached_mod_path == path_str:
return _cached_mod
if not path.is_file():
logger.bind(pipeline="sonar_watch").warning(
"[声呐监控] 光流模块未找到:{}", path
)
return None
spec = importlib.util.spec_from_file_location("fishmeasure_optical_flow_viz", path)
if spec is None or spec.loader is None:
return None
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
_cached_mod = mod
_cached_mod_path = path_str
return mod
def run_sonar_optical_flow_overlay(
src: Path,
flow_out: Path,
settings: Settings,
) -> bool:
"""将 ``src`` 处理为 overlay 光流视频写入 ``flow_out``。失败返回 False调用方应回退直转码"""
if not settings.biomass_sonar_optical_flow:
return False
mod = _load_optical_flow_module(settings)
if mod is None:
return False
run_fn = getattr(mod, "run_optical_flow_video", None)
log = logger.bind(pipeline="sonar_watch", source=src.name)
if not callable(run_fn):
log.warning("[声呐监控] 光流模块缺少 run_optical_flow_video")
return False
src_size_mb = src.stat().st_size / (1024 * 1024) if src.is_file() else 0
log.info(
"[声呐监控] 开始光流叠加:{} ({:.1f} MB) | resize={}",
src.name, src_size_mb, settings.biomass_sonar_optical_flow_resize,
)
t0 = time.monotonic()
try:
ok = run_fn(
src,
flow_out,
mode="overlay",
resize=float(settings.biomass_sonar_optical_flow_resize),
progress_log=False,
)
elapsed = time.monotonic() - t0
if ok:
out_size_mb = flow_out.stat().st_size / (1024 * 1024) if flow_out.is_file() else 0
log.info(
"[声呐监控] 光流完成 {:.1f}s | {} -> {} ({:.1f} MB)",
elapsed, src.name, flow_out.name, out_size_mb,
)
else:
log.warning(
"[声呐监控] 光流返回 False耗时 {:.1f}s{}", elapsed, src.name,
)
return bool(ok)
except Exception:
elapsed = time.monotonic() - t0
log.exception(
"[声呐监控] 光流处理异常(耗时 {:.1f}s{}", elapsed, src.name,
)
return False