From 170febf0b0cbc8f469ef4c4d39b64a2e52f294e5 Mon Sep 17 00:00:00 2001 From: guest Date: Thu, 14 May 2026 17:17:10 +0800 Subject: [PATCH] everything works --- fish_api/app/routers/biomass.py | 3 +- fish_api/app/services/sonar_video.py | 369 +++++++++++++++++++++++---- fish_api/app/settings.py | 31 +++ 3 files changed, 350 insertions(+), 53 deletions(-) diff --git a/fish_api/app/routers/biomass.py b/fish_api/app/routers/biomass.py index 2d44461..a8a72ba 100644 --- a/fish_api/app/routers/biomass.py +++ b/fish_api/app/routers/biomass.py @@ -178,7 +178,8 @@ async def get_sonar_video( settings: Settings = Depends(get_settings), client_id: str = Depends(_resolve_client_id), ): - """声呐视频:与水面相同经 ffmpeg 转 H.264 后托管在 /media/;整段一个文件,每次返回同一 `video_path` URL。""" + """声呐视频:经 ffmpeg 切片(顺序完整块或末尾 N 秒,见 BIOMASS_SONAR_SLICE_ORDER)后 + 托管在 /media/;每次返回最近成功发布的 ``video_path`` URL。""" video_path = await get_sonar_video_public_url(settings, client_id) return JSONResponse( content={ diff --git a/fish_api/app/services/sonar_video.py b/fish_api/app/services/sonar_video.py index 240f945..e35075a 100644 --- a/fish_api/app/services/sonar_video.py +++ b/fish_api/app/services/sonar_video.py @@ -1,18 +1,23 @@ -"""声呐视频:后台处理 ``BIOMASS_SONAR_VIDEO_DIR`` 中的**当前最新**视频文件。 +"""声呐视频:后台处理 ``BIOMASS_SONAR_VIDEO_DIR`` 中的**当前 mtime 最新**视频文件。 支持 MP4、MKV、MOV。MP4/MOV 在录制中缺少 ``moov`` atom,须等录完才能处理; MKV 的元数据写在文件头,录制中即可读取,无需等待。 -每次轮询取目录中 **mtime 最新** 的文件;当其 **(path, size)** 变化时,用 ffmpeg -``-sseof`` 截取**最后 N 秒**(默认 60,见 ``BIOMASS_SONAR_VIDEO_SLICE_SEC``),再对该 -切片做光流 overlay → H.264 转码并原子替换发布。``GET /api/v1/biomass/sonar/video/`` -立即返回最近成功发布的 URL。 +切片策略由 ``Settings.biomass_sonar_slice_order`` 决定: + +* **sequential**(默认):从 t=0 起按 ``BIOMASS_SONAR_VIDEO_SLICE_SEC`` 顺序切**完整** + 块(ffprobe 可得 duration 且剩余不足一块则不发布);增长中的文件随 duration 增大继续切。 +* **tail**:当原文件 **(path, size)** 变化时,用 ``-sseof`` 取**最后 N 秒**再处理。 + +之后对切片做光流 overlay(可选)→ H.264 转码并发布。``GET .../sonar/video/`` 返回 +最近一次成功发布的 URL。 """ from __future__ import annotations import asyncio import datetime +import math import subprocess import time from pathlib import Path @@ -111,6 +116,109 @@ def _probe_moov_readable(path: Path) -> bool: return False +def _probe_media_duration_sec(path: Path) -> Optional[float]: + """ffprobe ``format=duration``;无法解析(N/A、moov 未就绪等)时返回 None。""" + ffprobe = _get_ffprobe_path() + try: + r = subprocess.run( + [ + ffprobe, + "-v", + "error", + "-show_entries", + "format=duration", + "-of", + "default=noprint_wrappers=1:nokey=1", + str(path), + ], + capture_output=True, + text=True, + timeout=30, + ) + if r.returncode != 0: + return None + raw = (r.stdout or "").strip() + if not raw or raw.lower() in ("n/a", "nan"): + return None + d = float(raw) + if not math.isfinite(d) or d <= 0: + return None + return d + except (ValueError, subprocess.TimeoutExpired, OSError): + return None + + +def _extract_range_slice( + src: Path, + slice_out: Path, + start_sec: float, + duration_sec: float, +) -> bool: + """Extract ``[start_sec, start_sec + duration_sec)`` with stream copy (``-ss`` before ``-i``).""" + slice_out.parent.mkdir(parents=True, exist_ok=True) + slice_out.unlink(missing_ok=True) + if not src.is_file() or src.stat().st_size <= 0: + return False + start = float(start_sec) + dur = float(duration_sec) + if not math.isfinite(start) or start < 0 or not math.isfinite(dur) or dur <= 0: + return False + + ffmpeg = _get_ffmpeg_path() + log = logger.bind(pipeline="sonar_watch", source=src.name) + log.info( + "[声呐监控] 顺序切片提取 | ffmpeg={} | {:.3f}s–{:.3f}s({:.0f}s)| {} -> {}", + ffmpeg, + start, + start + dur, + dur, + src.name, + slice_out.name, + ) + cmd = [ + ffmpeg, + "-y", + "-hide_banner", + "-loglevel", + "error", + "-ss", + str(start), + "-t", + str(dur), + "-i", + str(src), + "-c", + "copy", + "-avoid_negative_ts", + "make_zero", + str(slice_out), + ] + r: subprocess.CompletedProcess[str] + try: + r = subprocess.run(cmd, capture_output=True, text=True, timeout=600) + if r.returncode == 0 and slice_out.is_file() and slice_out.stat().st_size > 0: + log.info( + "[声呐监控] 顺序切片成功 | {} -> {}({} 字节)", + src.name, + slice_out.name, + slice_out.stat().st_size, + ) + return True + except Exception as e: + log.debug("[声呐监控] 顺序切片异常:{} | {}", src.name, e) + slice_out.unlink(missing_ok=True) + return False + + tail = (r.stderr or "")[-500:] + log.warning( + "[声呐监控] 顺序切片失败 | {} | stderr尾={}", + src.name, + tail, + ) + slice_out.unlink(missing_ok=True) + return False + + def _extract_tail_slice(src: Path, slice_out: Path, duration_sec: float) -> bool: """Extract last ``duration_sec`` seconds with ``ffmpeg -sseof`` + stream copy. @@ -191,7 +299,7 @@ def _extract_tail_slice(src: Path, slice_out: Path, duration_sec: float) -> bool # --------------------------------------------------------------------------- -# Publish pipeline: tail slice → optical flow (optional) → H.264 transcode → atomic replace +# Publish pipeline: slice (tail or sequential) → optical flow (optional) → H.264 # --------------------------------------------------------------------------- async def _publish_video( @@ -199,52 +307,104 @@ async def _publish_video( media_root: Path, dst_stem: str, settings: Settings, + *, + range_start_sec: Optional[float] = None, ) -> Optional[Path]: - """Extract tail slice → optical-flow → H.264 transcode → verify → save. + """Extract a slice → optional optical-flow → H.264 transcode → verify → save. - Every output file gets a unique timestamp so successive cycles never - overwrite each other: - - * ``_optical_flow_.mp4`` — optical-flow overlay (kept) - * ``_.mp4`` — final H.264 output (kept, published) + * ``tail`` order: ignores ``range_start_sec``; uses ``-sseof`` last N seconds. + * ``sequential`` order: requires ``range_start_sec``; uses ``-ss`` / ``-t`` copy. Returns the path of the published file, or ``None`` on failure. """ + order = settings.biomass_sonar_slice_order src_size_mb = src.stat().st_size / (1024 * 1024) if src.is_file() else 0 slice_sec = float(settings.biomass_sonar_video_slice_sec) rid = new_run_id("sonar_watch") log = logger.bind(pipeline="sonar_watch", run_id=rid, source=src.name) - log.info( - "[声呐监控] 开始处理:{} ({:.1f} MB) | 尾段={:.0f}s | 光流={}", - src.name, src_size_mb, slice_sec, settings.biomass_sonar_optical_flow, - ) + + if order == "sequential": + if range_start_sec is None: + log.error("[声呐监控] sequential 模式缺少 range_start_sec:{}", src.name) + return None + log.info( + "[声呐监控] 开始处理:{} ({:.1f} MB) | 顺序切片 t={:.3f}s | 块长={:.0f}s | 光流={}", + src.name, + src_size_mb, + range_start_sec, + slice_sec, + settings.biomass_sonar_optical_flow, + ) + else: + log.info( + "[声呐监控] 开始处理:{} ({:.1f} MB) | 尾段={:.0f}s | 光流={}", + src.name, + src_size_mb, + slice_sec, + settings.biomass_sonar_optical_flow, + ) + t0 = time.monotonic() - ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") - slice_tmp = media_root / f"{dst_stem}_tail_slice.mkv" slice_tmp.unlink(missing_ok=True) tmp = media_root / f"{dst_stem}_tmp.mp4" tmp.unlink(missing_ok=True) + extract_metrics = { + "slice_sec": slice_sec, + "src_mb": round(src_size_mb, 2), + "slice_order": order, + } + if order == "sequential" and range_start_sec is not None: + extract_metrics["range_start_sec"] = round(range_start_sec, 4) + try: - with stage( - f"声呐尾段提取({src.name})", - pipeline="sonar_watch", - step="extract_tail", - run_id=rid, - source=src.name, - metrics={"slice_sec": slice_sec, "src_mb": round(src_size_mb, 2)}, - raise_on_error=False, - ): - slice_ok = await to_thread(_extract_tail_slice, src, slice_tmp, slice_sec) + if order == "tail": + stage_title = f"声呐尾段提取({src.name})" + extract_step = "extract_tail" + with stage( + stage_title, + pipeline="sonar_watch", + step=extract_step, + run_id=rid, + source=src.name, + metrics=extract_metrics, + raise_on_error=False, + ): + slice_ok = await to_thread( + _extract_tail_slice, src, slice_tmp, slice_sec, + ) + else: + assert range_start_sec is not None + stage_title = ( + f"声呐顺序切片 {range_start_sec:.1f}s–{range_start_sec + slice_sec:.1f}s" + f"({src.name})" + ) + with stage( + stage_title, + pipeline="sonar_watch", + step="extract_sequential", + run_id=rid, + source=src.name, + metrics=extract_metrics, + raise_on_error=False, + ): + slice_ok = await to_thread( + _extract_range_slice, + src, + slice_tmp, + range_start_sec, + slice_sec, + ) + if not slice_ok: - log.warning("[声呐监控] 尾段提取失败,跳过发布:{}", src.name) + log.warning("[声呐监控] 切片提取失败,跳过发布:{}", src.name) return None if slice_tmp.is_file(): log.info( - "[声呐监控] 尾段切片已就绪 | {}({:.2f} MB)→ 后续光流/H.264", + "[声呐监控] 切片已就绪 | {}({:.2f} MB)→ 后续光流/H.264", slice_tmp.name, slice_tmp.stat().st_size / (1024 * 1024), ) @@ -271,7 +431,8 @@ async def _publish_video( transcode_src = flow_dst log.info( "[声呐监控] 光流文件已保存:{} ({:.1f} MB)", - flow_dst.name, flow_dst.stat().st_size / (1024 * 1024), + flow_dst.name, + flow_dst.stat().st_size / (1024 * 1024), ) else: flow_tmp.unlink(missing_ok=True) @@ -316,7 +477,16 @@ async def _publish_video( tmp.replace(dst) elapsed = time.monotonic() - t0 dst_mb = dst.stat().st_size / (1024 * 1024) - # 发布完成事件 + + pub_metrics = { + "src_mb": round(src_size_mb, 2), + "dst_mb": round(dst_mb, 2), + "dst_name": dst.name, + "slice_order": order, + } + if order == "sequential" and range_start_sec is not None: + pub_metrics["range_start_sec"] = round(range_start_sec, 4) + logger.bind( event=True, pipeline="sonar_watch", @@ -325,14 +495,14 @@ async def _publish_video( source=src.name, status="success", duration_ms=round(elapsed * 1000.0, 3), - metrics={ - "src_mb": round(src_size_mb, 2), - "dst_mb": round(dst_mb, 2), - "dst_name": dst.name, - }, + metrics=pub_metrics, ).info( "[声呐监控] 发布完成 | 源 {} ({:.1f} MB) → {} ({:.1f} MB) | 耗时={:.1f}s", - src.name, src_size_mb, dst.name, dst_mb, elapsed, + src.name, + src_size_mb, + dst.name, + dst_mb, + elapsed, ) return dst except Exception: @@ -350,9 +520,10 @@ async def _publish_video( async def run_sonar_video_watch_loop(settings: Settings) -> None: """Poll ``BIOMASS_SONAR_VIDEO_DIR``, follow the **latest** file by mtime. - When that file's ``(resolved_path, size)`` changes and the file is ready to read, - extract tail slice → publish. Each published video gets a unique timestamped - filename; the GET endpoint always returns the most recent one. + * ``tail``: when ``(resolved_path, size)`` changes and the file is ready, run one + tail-``N``-second publish. + * ``sequential``: maintain a cursor from t=0; each poll publishes up to + ``max_chunks_per_poll`` full ``N``-second blocks while ``duration`` allows. """ global _published_url, _last_sonar_skip_logged_path @@ -364,21 +535,24 @@ async def run_sonar_video_watch_loop(settings: Settings) -> None: return poll = max(1.0, settings.biomass_sonar_video_poll_interval) + slice_order = settings.biomass_sonar_slice_order + max_chunks = settings.biomass_sonar_max_chunks_per_poll basename = _safe_sonar_media_basename(settings.biomass_sonar_video_media_name) - dst_stem = Path(basename).stem # e.g. "biomass_sonar" + dst_stem = Path(basename).stem media_root = settings.media_root media_root.mkdir(parents=True, exist_ok=True) - # Seed published URL from the newest existing published file in media_root existing = sorted( media_root.glob(f"{dst_stem}_*.mp4"), key=lambda p: p.stat().st_mtime, ) - # Exclude intermediates (_tmp, _flow_tmp, _optical_flow_, _tail_slice) existing = [ p for p in existing - if not any(tag in p.stem for tag in ("_tmp", "_flow_tmp", "_optical_flow_", "_tail_slice")) + if not any( + tag in p.stem + for tag in ("_tmp", "_flow_tmp", "_optical_flow_", "_tail_slice") + ) ] if existing: seed = existing[-1] @@ -389,10 +563,17 @@ async def run_sonar_video_watch_loop(settings: Settings) -> None: ) last_published_key: Optional[Tuple[str, int]] = None + seq_cursor_rp: Optional[str] = None + seq_next_start: float = 0.0 logger.bind(pipeline="sonar_watch").info( - "[声呐监控] 监控目录已启动 | dir={} | poll={:.0f}s | recursive={}", - d, poll, settings.biomass_sonar_video_recursive, + "[声呐监控] 监控目录已启动 | dir={} | poll={:.0f}s | recursive={} | " + "slice_order={} | max_chunks_per_poll={}", + d, + poll, + settings.biomass_sonar_video_recursive, + slice_order, + max_chunks, ) while True: @@ -405,15 +586,20 @@ async def run_sonar_video_watch_loop(settings: Settings) -> None: rp = str(latest.resolve()) sz = latest.stat().st_size except OSError: - await asyncio.sleep(poll) continue - if sz > 0 and last_published_key != (rp, sz): + if sz <= 0: + continue + + cycle = logger.bind(pipeline="sonar_watch", source=latest.name) + + if slice_order == "tail": + if last_published_key == (rp, sz): + continue ready = await to_thread(_is_ready_to_process, latest) - cycle = logger.bind(pipeline="sonar_watch", source=latest.name) if ready: cycle.info( - "[声呐监控] 源文件已就绪,进入发布管线 | {:.2f} MB | {}", + "[声呐监控] 源文件已就绪,进入发布管线(尾段)| {:.2f} MB | {}", sz / (1024 * 1024), rp, ) @@ -429,7 +615,7 @@ async def run_sonar_video_watch_loop(settings: Settings) -> None: _last_sonar_skip_logged_path = None else: cycle.warning( - "[声呐监控] 发布失败(尾段/光流/转码见上方步骤)| {}", + "[声呐监控] 发布失败(切片/光流/转码见上方)| {}", latest.name, ) else: @@ -445,6 +631,85 @@ async def run_sonar_video_watch_loop(settings: Settings) -> None: "[声呐监控] 仍在等待就绪:{}", latest.name, ) + else: + if rp != seq_cursor_rp: + seq_cursor_rp = rp + seq_next_start = 0.0 + cycle.info( + "[声呐监控] 顺序模式:跟踪新文件,游标归零 | {}", + latest.name, + ) + + ready = await to_thread(_is_ready_to_process, latest) + if not ready: + if _last_sonar_skip_logged_path != rp: + _last_sonar_skip_logged_path = rp + cycle.info( + "[声呐监控] 最新文件尚未可处理(录制中或 MP4/MOV 缺 moov)," + "等待:{}", + latest.name, + ) + else: + cycle.debug( + "[声呐监控] 仍在等待就绪:{}", + latest.name, + ) + else: + duration = await to_thread( + _probe_media_duration_sec, latest, + ) + slice_sec = float(settings.biomass_sonar_video_slice_sec) + if duration is None: + cycle.warning( + "[声呐监控] 顺序模式:无法读取 duration(ffprobe)," + "跳过本周期 | {}", + latest.name, + ) + elif seq_next_start + slice_sec > duration + 1e-3: + cycle.debug( + "[声呐监控] 顺序模式:无完整 {:.0f}s 块可切 " + "(cursor={:.3f}s duration={:.3f}s)| {}", + slice_sec, + seq_next_start, + duration, + latest.name, + ) + else: + _last_sonar_skip_logged_path = None + n_done = 0 + while ( + seq_next_start + slice_sec <= duration + 1e-3 + ): + if max_chunks > 0 and n_done >= max_chunks: + break + cycle.info( + "[声呐监控] 顺序发布 | t={:.3f}s | " + "duration={:.3f}s | {}", + seq_next_start, + duration, + latest.name, + ) + published = await _publish_video( + latest, + media_root, + dst_stem, + settings, + range_start_sec=seq_next_start, + ) + if published is None: + cycle.warning( + "[声呐监控] 顺序发布失败 @ t={:.3f}s | {}", + seq_next_start, + latest.name, + ) + break + seq_next_start += slice_sec + n_done += 1 + async with _published_lock: + _published_url = _public_media_url( + settings, published.name, + ) + await asyncio.sleep(0) else: logger.bind(pipeline="sonar_watch").debug( diff --git a/fish_api/app/settings.py b/fish_api/app/settings.py index 2835821..24903ca 100644 --- a/fish_api/app/settings.py +++ b/fish_api/app/settings.py @@ -285,6 +285,22 @@ class Settings(BaseSettings): "BIOMASS_SONAR_VIDEO_SLICE_SEC", "biomass_sonar_video_slice_sec" ), ) + #: 声呐切片顺序:``tail`` = 每次取文件末尾 N 秒(``-sseof``);``sequential`` = 从 t=0 起按 N 秒顺序切完整块(不足一块不发布)。**BIOMASS_SONAR_SLICE_ORDER** + biomass_sonar_slice_order: str = Field( + default="sequential", + validation_alias=AliasChoices( + "BIOMASS_SONAR_SLICE_ORDER", "biomass_sonar_slice_order" + ), + ) + #: 顺序模式下每轮监控循环最多发布几块完整切片;``0`` = 不限制(长视频可能长时间占住循环)。**BIOMASS_SONAR_MAX_CHUNKS_PER_POLL** + biomass_sonar_max_chunks_per_poll: int = Field( + default=1, + ge=0, + validation_alias=AliasChoices( + "BIOMASS_SONAR_MAX_CHUNKS_PER_POLL", + "biomass_sonar_max_chunks_per_poll", + ), + ) #: 发布到 MEDIA_ROOT 的 H.264 文件名。**BIOMASS_SONAR_VIDEO_MEDIA_NAME** biomass_sonar_video_media_name: str = "biomass_sonar.mp4" #: 为 True 时声呐发布管线在 ffmpeg 转 H.264 之前先做 Farneback 光流 overlay(与 ``/sonar/video/`` 返回的仍是同一 ``video_path`` 字段)。**BIOMASS_SONAR_OPTICAL_FLOW** @@ -396,6 +412,21 @@ class Settings(BaseSettings): return None return v + @field_validator("biomass_sonar_slice_order", mode="before") + @classmethod + def _normalize_biomass_sonar_slice_order(cls, v: object) -> str: + if v is None: + return "sequential" + s = str(v).strip().lower() + if s in frozenset({"tail", "last", "last_n", "sseof"}): + return "tail" + if s in frozenset({"sequential", "seq", "from_start"}): + return "sequential" + raise ValueError( + "biomass_sonar_slice_order must be 'tail' or 'sequential' " + f"(got {v!r})" + ) + @field_validator( "action_watch_dir", "biomass_water_video_source",