"""Sonar video: background processing of the **current newest** video file in
``BIOMASS_SONAR_VIDEO_DIR``.

MP4, MKV and MOV are supported. An MP4/MOV that is still being recorded lacks
its ``moov`` atom, so it can only be processed once recording has finished;
MKV keeps its metadata in the file header, so it can be read while it is still
being recorded, without waiting.

Every poll picks the file with the **newest mtime** in the directory. When its
**(path, size)** changes, ffmpeg ``-sseof`` cuts the **last N seconds**
(default 60, see ``BIOMASS_SONAR_VIDEO_SLICE_SEC``); that slice then goes
through optical-flow overlay → H.264 transcode and is published via an atomic
replace. ``GET /api/v1/biomass/sonar/video/`` immediately returns the URL of
the most recent successful publish.
"""

from __future__ import annotations

import asyncio
import datetime
import subprocess
import time
from pathlib import Path
from typing import Optional, Tuple

from loguru import logger

from app.compat import to_thread
from app.services.action_watch import iter_mp4
from app.services.measure import transcode_src_to_h264_dst
from app.services.sonar_optical_flow import run_sonar_optical_flow_overlay
from app.services.video_slice import _get_ffmpeg_path
from app.settings import Settings

DEFAULT_CLIENT_ID = "default"

# --- published state (read by GET endpoint) ---
_published_url: str = ""
_published_lock = asyncio.Lock()


def _public_media_url(settings: Settings, basename: str) -> str:
    """Build the public ``/media/`` URL under which *basename* is served.

    Trailing slashes on ``settings.public_base_url`` are dropped so the result
    never contains a doubled ``/`` before ``media``.
    """
    root = settings.public_base_url.rstrip("/")
    return "{}/media/{}".format(root, basename)


def _safe_sonar_media_basename(raw: str) -> str:
    """Reduce a configured media name to a bare file name.

    Surrounding whitespace and any directory components are discarded. An
    empty/``None`` value, or one that has no final path component, yields the
    default ``biomass_sonar.mp4``.
    """
    fallback = "biomass_sonar.mp4"
    cleaned = (raw or "").strip()
    if cleaned:
        return Path(cleaned).name or fallback
    return fallback


# Formats that store metadata at the start → readable while recording
_STREAMING_SUFFIXES = frozenset((".mkv", ".ts", ".webm"))


# ---------------------------------------------------------------------------
# Probe: is the video file ready to process?
# ---------------------------------------------------------------------------


def _is_ready_to_process(path: Path) -> bool:
    """Return True when *path* can be handed to the processing pipeline.

    Files whose suffix is in ``_STREAMING_SUFFIXES`` (metadata at the start of
    the file) only have to exist and be non-empty. Every other file is checked
    with ``_probe_moov_readable``.
    """
    if path.suffix.lower() in _STREAMING_SUFFIXES:
        try:
            return path.is_file() and path.stat().st_size > 0
        except OSError:
            # The file vanished between is_file() and stat(): not ready.
            return False
    return _probe_moov_readable(path)


def _probe_moov_readable(path: Path) -> bool:
    """Quick readability probe for MP4/MOV files (ffprobe, falling back to cv2).

    The ffprobe check passes when ffprobe exits with 0 and prints a non-empty
    ``format=duration`` value. If ffprobe is not installed or the call itself
    raises (e.g. the 5 s timeout), OpenCV is tried instead: the file counts as
    readable when it opens and reports a positive frame count.

    Returns False on any failure; this function never raises.
    """
    try:
        r = subprocess.run(
            [
                "ffprobe", "-v", "error",
                "-show_entries", "format=duration",
                "-of", "default=noprint_wrappers=1:nokey=1",
                str(path),
            ],
            capture_output=True,
            timeout=5,
        )
        if r.returncode == 0 and r.stdout.strip():
            return True
        logger.debug("[sonar-watch] ffprobe: moov missing: {}", path.name)
        return False
    except FileNotFoundError:
        # ffprobe binary is not available: fall through to the cv2 probe.
        pass
    except Exception as e:
        logger.debug("[sonar-watch] ffprobe failed ({}), trying cv2", e)

    try:
        import cv2

        cap = cv2.VideoCapture(str(path))
        try:
            ok = bool(cap.isOpened() and cap.get(cv2.CAP_PROP_FRAME_COUNT) > 0)
        finally:
            # Release the capture handle even if the property query raises.
            cap.release()
        if not ok:
            logger.debug("[sonar-watch] cv2: unreadable: {}", path.name)
        return ok
    except Exception:
        # cv2 missing or failing: treat the file as not readable.
        return False


def _extract_tail_slice(src: Path, slice_out: Path, duration_sec: float) -> bool:
    """Extract last ``duration_sec`` seconds with ``ffmpeg -sseof`` + stream copy.

    For growing MKV files ``ffprobe`` often returns ``N/A`` for duration, so we
    always attempt ``-sseof`` first (ffmpeg clamps to file start when the file
    is shorter than the requested window). Only when ``-sseof`` fails do we
    fall back to a plain ``-c copy`` of the entire file.

    Any stale *slice_out* is removed first. Returns True when *slice_out* was
    written and is non-empty; on failure *slice_out* is removed and False is
    returned. Errors raised while running ffmpeg (timeout, missing binary, ...)
    are logged and reported as False rather than propagated.
    """
    slice_out.parent.mkdir(parents=True, exist_ok=True)
    slice_out.unlink(missing_ok=True)
    if not src.is_file() or src.stat().st_size <= 0:
        return False

    ffmpeg = _get_ffmpeg_path()
    sec = float(duration_sec)
    # Diagnostic text of the most recent failed attempt. Tracked explicitly
    # because the CompletedProcess does not exist when subprocess.run raises.
    last_err = ""

    # --- primary: -sseof (works on growing MKV even when duration is unknown) ---
    sseof_cmd = [
        ffmpeg, "-y", "-hide_banner", "-loglevel", "error",
        "-sseof", f"-{sec}",
        "-i", str(src),
        "-t", str(sec),
        "-c", "copy",
        "-avoid_negative_ts", "make_zero",
        str(slice_out),
    ]
    try:
        r = subprocess.run(sseof_cmd, capture_output=True, text=True, timeout=600)
        if r.returncode == 0 and slice_out.is_file() and slice_out.stat().st_size > 0:
            logger.debug(
                "[sonar-watch] tail slice ok (-sseof {:.0f}s): {} -> {}",
                sec, src.name, slice_out.name,
            )
            return True
        last_err = r.stderr or ""
        logger.debug(
            "[sonar-watch] tail slice -sseof failed (rc={}), trying full copy: {}",
            r.returncode, src.name,
        )
    except Exception as e:
        last_err = str(e)
        logger.debug("[sonar-watch] tail slice -sseof error: {} ({})", src.name, e)

    # --- fallback: copy entire file (source shorter than window or -sseof unsupported) ---
    slice_out.unlink(missing_ok=True)
    copy_cmd = [
        ffmpeg, "-y", "-hide_banner", "-loglevel", "error",
        "-i", str(src),
        "-c", "copy",
        str(slice_out),
    ]
    try:
        r = subprocess.run(copy_cmd, capture_output=True, text=True, timeout=600)
        if r.returncode == 0 and slice_out.is_file() and slice_out.stat().st_size > 0:
            logger.debug(
                "[sonar-watch] tail slice ok (full copy fallback): {} -> {}",
                src.name, slice_out.name,
            )
            return True
        last_err = r.stderr or last_err
    except Exception as e:
        last_err = str(e)
        logger.warning("[sonar-watch] tail slice fallback error: {} ({})", src.name, e)

    logger.warning(
        "[sonar-watch] tail slice failed for {} | stderr={}",
        src.name, last_err[:500],
    )
    slice_out.unlink(missing_ok=True)
    return False


# ---------------------------------------------------------------------------
# Publish pipeline: tail slice → optical flow (optional) → H.264 transcode → atomic replace
# ---------------------------------------------------------------------------


def _remove_quietly(path: Path) -> None:
    """Delete *path* if it exists; log (never raise) when deletion fails."""
    try:
        path.unlink(missing_ok=True)
    except OSError as e:
        logger.debug("[sonar-watch] could not remove temp file {}: {}", path.name, e)


async def _publish_video(
    src: Path,
    media_root: Path,
    dst_stem: str,
    settings: Settings,
) -> Optional[Path]:
    """Extract tail slice → optical-flow → H.264 transcode → verify → save.

    Output files carry a timestamp (second resolution) so successive cycles
    do not overwrite each other:

    * ``{dst_stem}_optical_flow_{ts}.mp4`` — optical-flow overlay (kept)
    * ``{dst_stem}_{ts}.mp4`` — final H.264 output (kept, published)

    Intermediate files (``{dst_stem}_tail_slice.mkv``, ``{dst_stem}_tmp.mp4``,
    ``{dst_stem}_flow_tmp.mp4``) are removed on every exit path, including
    task cancellation.

    NOTE(review): nothing in this function prunes older timestamped outputs,
    so they accumulate in ``media_root`` — confirm retention is handled elsewhere.

    Returns the path of the published file, or ``None`` on failure.
    """
    src_size_mb = src.stat().st_size / (1024 * 1024) if src.is_file() else 0
    slice_sec = float(settings.biomass_sonar_video_slice_sec)
    logger.info(
        "[sonar-watch] processing: {} ({:.1f} MB), slice_sec={:.0f}, optical_flow={}",
        src.name, src_size_mb, slice_sec, settings.biomass_sonar_optical_flow,
    )
    t0 = time.monotonic()
    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

    slice_tmp = media_root / f"{dst_stem}_tail_slice.mkv"
    tmp = media_root / f"{dst_stem}_tmp.mp4"
    # Every intermediate created by this run; all are removed in ``finally``.
    temps = [slice_tmp, tmp]
    slice_tmp.unlink(missing_ok=True)
    tmp.unlink(missing_ok=True)

    try:
        slice_ok = await to_thread(_extract_tail_slice, src, slice_tmp, slice_sec)
        if not slice_ok:
            logger.warning("[sonar-watch] tail extract failed, skip publish: {}", src.name)
            return None

        transcode_src = slice_tmp
        if settings.biomass_sonar_optical_flow:
            flow_dst = media_root / f"{dst_stem}_optical_flow_{ts}.mp4"
            flow_tmp = media_root / f"{dst_stem}_flow_tmp.mp4"
            temps.append(flow_tmp)
            flow_tmp.unlink(missing_ok=True)
            flow_ok = await to_thread(
                run_sonar_optical_flow_overlay, slice_tmp, flow_tmp, settings,
            )
            if flow_ok and flow_tmp.is_file() and flow_tmp.stat().st_size > 0:
                flow_tmp.replace(flow_dst)
                transcode_src = flow_dst
                logger.info(
                    "[sonar-watch] optical flow saved: {} ({:.1f} MB)",
                    flow_dst.name, flow_dst.stat().st_size / (1024 * 1024),
                )
            else:
                # Optical flow is best-effort: publish the raw slice instead.
                logger.warning(
                    "[sonar-watch] optical flow failed, transcoding raw slice: {}",
                    src.name,
                )

        ok = await to_thread(transcode_src_to_h264_dst, transcode_src, tmp)
        if not (ok and tmp.is_file() and tmp.stat().st_size > 0):
            logger.warning("[sonar-watch] transcode failed for {}", src.name)
            return None

        # The probe shells out to ffprobe (up to 5 s), so keep it off the
        # event loop like the other blocking steps.
        if not await to_thread(_probe_moov_readable, tmp):
            logger.warning(
                "[sonar-watch] transcoded file not playable, discarding: {}", src.name,
            )
            return None

        dst = media_root / f"{dst_stem}_{ts}.mp4"
        tmp.replace(dst)

        elapsed = time.monotonic() - t0
        dst_mb = dst.stat().st_size / (1024 * 1024)
        logger.info(
            "[sonar-watch] published in {:.1f}s: {} -> {} ({:.1f} MB)",
            elapsed, src.name, dst.name, dst_mb,
        )
        return dst
    except Exception:
        logger.exception("[sonar-watch] publish exception for {}", src.name)
        return None
    finally:
        # After a successful ``replace`` the temp paths no longer exist, so
        # this only removes genuine leftovers.
        for leftover in temps:
            _remove_quietly(leftover)


# ---------------------------------------------------------------------------
# Background watcher loop (started from lifespan, like action_watch)
# ---------------------------------------------------------------------------


def _mtime_or_oldest(path: Path) -> float:
    """Return *path*'s mtime, or ``-1.0`` when it cannot be stat'ed."""
    try:
        return path.stat().st_mtime
    except OSError:
        # A file that disappeared must never win a "newest file" comparison.
        return -1.0


def _find_latest_video(directory: Path, recursive) -> Optional[Tuple[Path, str, int]]:
    """Locate the newest (by mtime) video reported by ``iter_mp4``.

    *recursive* is forwarded unchanged to ``iter_mp4``. Returns
    ``(path, resolved_path_str, size_bytes)``, or ``None`` when *directory*
    is missing, holds no candidates, or the newest file vanished before it
    could be inspected.
    """
    if not directory.is_dir():
        return None
    # Materialise so an iterator/generator result is handled like a list.
    candidates = list(iter_mp4(directory, recursive))
    if not candidates:
        return None
    latest = max(candidates, key=_mtime_or_oldest)
    try:
        return latest, str(latest.resolve()), latest.stat().st_size
    except OSError:
        return None


async def run_sonar_video_watch_loop(settings: Settings) -> None:
    """Poll ``BIOMASS_SONAR_VIDEO_DIR``, follow the **latest** file by mtime.

    When that file's ``(resolved_path, size)`` changes and the file is ready
    to read, extract tail slice → publish. Each published video gets a
    timestamped filename; the GET endpoint always returns the most recent one.

    Returns immediately when ``settings.biomass_sonar_video_dir`` is ``None``;
    otherwise loops until the task is cancelled. Unexpected errors inside one
    poll are logged and the loop continues. A failed publish is retried on the
    next poll because the remembered key is only updated on success.
    """
    global _published_url

    d = settings.biomass_sonar_video_dir
    if d is None:
        logger.info("[sonar-watch] BIOMASS_SONAR_VIDEO_DIR not set, sonar watch disabled")
        return

    poll = max(1.0, settings.biomass_sonar_video_poll_interval)
    basename = _safe_sonar_media_basename(settings.biomass_sonar_video_media_name)
    dst_stem = Path(basename).stem  # e.g. "biomass_sonar"
    media_root = settings.media_root
    media_root.mkdir(parents=True, exist_ok=True)

    # Seed published URL from the newest existing published file in media_root.
    # Exclude intermediates (_tmp, _flow_tmp, _optical_flow_, _tail_slice). The
    # tags are matched against the part of the stem *after* dst_stem so that a
    # configured stem which itself contains a tag does not hide every file.
    intermediate_tags = ("_tmp", "_flow_tmp", "_optical_flow_", "_tail_slice")
    published_files = [
        p
        for p in media_root.glob(f"{dst_stem}_*.mp4")
        if not any(tag in p.stem[len(dst_stem):] for tag in intermediate_tags)
    ]
    seed = max(published_files, key=_mtime_or_oldest, default=None)
    if seed is not None:
        async with _published_lock:
            _published_url = _public_media_url(settings, seed.name)
        logger.info("[sonar-watch] seeded from existing: {}", _published_url)

    last_published_key: Optional[Tuple[str, int]] = None

    logger.info(
        "[sonar-watch] watching {} (poll={:.0f}s, recursive={})",
        d, poll, settings.biomass_sonar_video_recursive,
    )

    while True:
        try:
            # The directory scan touches the filesystem (possibly recursively),
            # so run it in a worker thread.
            found = await to_thread(
                _find_latest_video, d, settings.biomass_sonar_video_recursive,
            )
            if found is not None:
                latest, rp, sz = found
                key = (rp, sz)
                if sz > 0 and key != last_published_key:
                    if await to_thread(_is_ready_to_process, latest):
                        published = await _publish_video(
                            latest, media_root, dst_stem, settings,
                        )
                        if published is not None:
                            async with _published_lock:
                                _published_url = _public_media_url(
                                    settings, published.name,
                                )
                            last_published_key = key
                    else:
                        logger.debug(
                            "[sonar-watch] {} not ready yet (still recording?), waiting",
                            latest.name,
                        )
        except asyncio.CancelledError:
            raise
        except Exception:
            logger.exception("[sonar-watch] unexpected error in watch loop")

        await asyncio.sleep(poll)


# ---------------------------------------------------------------------------
# GET endpoint helper (called from biomass router, no change to API contract)
# ---------------------------------------------------------------------------


async def get_sonar_video_public_url(
    settings: Settings,
    _client_id: str = DEFAULT_CLIENT_ID,
) -> str:
    """Return the URL of the latest successfully published sonar video, or ``""``.

    Neither *settings* nor *_client_id* is read here; the value comes from the
    module-level published state maintained by the watch loop.
    """
    async with _published_lock:
        return _published_url