Compare commits

...

2 Commits

Author SHA1 Message Date
guest
170febf0b0 everything works 2026-05-14 17:17:10 +08:00
guest
8cb1f1e654 can run 2026-05-14 16:49:05 +08:00
6 changed files with 614 additions and 99 deletions

View File

@@ -2,6 +2,12 @@
CLEAR_SQLITE_DATABASE=1 CLEAR_MEASURE_OUTPUT=1 CLEAR_ACTION_OUTPUT=1 CLEAR_MEDIA=1 CLEAR_STREAM_TMP=1 LD_PRELOAD=/lib/aarch64-linux-gnu/libGLdispatch.so.0 bash scripts/start_fresh.sh
CLEAR_SQLITE_DATABASE=1 CLEAR_MEASURE_OUTPUT=1 CLEAR_ACTION_OUTPUT=1 CLEAR_MEDIA=1 CLEAR_STREAM_TMP=1 bash scripts/start_fresh.sh
## 配置(环境变量)

View File

@@ -178,7 +178,8 @@ async def get_sonar_video(
settings: Settings = Depends(get_settings),
client_id: str = Depends(_resolve_client_id),
):
"""声呐视频:与水面相同经 ffmpeg 转 H.264 后托管在 /media/;整段一个文件,每次返回同一 `video_path` URL。"""
"""声呐视频:经 ffmpeg 切片(顺序完整块或末尾 N 秒,见 BIOMASS_SONAR_SLICE_ORDER
托管在 /media/;每次返回最近成功发布的 ``video_path`` URL。"""
video_path = await get_sonar_video_public_url(settings, client_id)
return JSONResponse(
content={

View File

@@ -18,6 +18,7 @@ from app.db import (
from app.logging_config import new_run_id, stage
from app.services import action as action_svc
from app.services.measure import transcode_src_to_h264_dst
from app.services.video_slice import _get_ffmpeg_path
from app.settings import Settings
from app.state import app_state
from app.watch_idle import IdleWatchWarnState, idle_warn_interval_sec, maybe_warn_idle_watch
@@ -73,6 +74,12 @@ def _publish_slice_video(
tmp.unlink(missing_ok=True)
log = logger.bind(pipeline="action_watch", source=src.name)
try:
log.info(
"[行为监控] 切片 H.264 转码开始 | ffmpeg={} | {} -> {}",
_get_ffmpeg_path(),
src.name,
tmp.name,
)
ok = transcode_src_to_h264_dst(src, tmp)
if ok and tmp.is_file() and tmp.stat().st_size > 0:
tmp.replace(dst)
@@ -81,7 +88,9 @@ def _publish_slice_video(
tmp.unlink(missing_ok=True)
shutil.copy2(src, dst)
log.warning(
"[行为监控] 转码失败,已直接拷贝原文件:{} -> {}", src.name, dst.name,
"[行为监控] 转码失败,已直接拷贝原文件:{} -> {}(详见 measure 管线日志)",
src.name,
dst.name,
)
return _public_media_url(settings, dst.name)
except Exception:

View File

@@ -1268,6 +1268,18 @@ def _split_sbs_video(src: Path, left_dst: Path, right_dst: Path) -> bool:
return False
encoder, encoder_options, _ = _get_h264_encoder()
sb_log = logger.bind(pipeline="measure")
if encoder:
sb_log.info(
"[FishMeasure] SBS 双目拆分 | 源={} | ffmpeg={} | 编码器={}",
src.name, ffmpeg_path, encoder,
)
else:
sb_log.warning(
"[FishMeasure] SBS 双目拆分 | 源={} | ffmpeg={} | 无 H.264 编码器,"
"将用 ffmpeg 默认视频编码(常为 mpeg4非 H.264",
src.name, ffmpeg_path,
)
for crop, dst in [
(f"crop={half_w}:{h}:{half_w}:0", left_dst),
(f"crop={half_w}:{h}:0:0", right_dst),
@@ -1281,7 +1293,18 @@ def _split_sbs_video(src: Path, left_dst: Path, right_dst: Path) -> bool:
cmd.append(str(dst))
r = subprocess.run(cmd, capture_output=True, text=True)
if r.returncode != 0:
tail = (r.stderr or "")[-500:]
sb_log.warning(
"[FishMeasure] SBS 拆分 ffmpeg 失败 rc={} | {} -> {} | stderr尾={}",
r.returncode, src.name, dst.name, tail,
)
return False
sb_log.info(
"[FishMeasure] SBS 拆分完成 | 源={} | 输出={}, {}",
src.name,
left_dst.name,
right_dst.name,
)
return True
@@ -1310,17 +1333,41 @@ def _get_h264_encoder() -> Tuple[str, List[str], str]:
]
ffmpeg_path = _get_ffmpeg_path()
enc_log = logger.bind(pipeline="measure")
try:
result = subprocess.run(
[ffmpeg_path, "-encoders"],
capture_output=True, text=True, timeout=10
)
if result.returncode != 0:
enc_log.warning(
"[FishMeasure] ffmpeg -encoders 失败 rc={} path={} | stderr尾={}",
result.returncode,
ffmpeg_path,
(result.stderr or "")[-300:],
)
return "", [], ffmpeg_path
encoders_output = result.stdout
for encoder, options in encoders_to_try:
if encoder in encoders_output:
enc_log.debug(
"[FishMeasure] 检测到 H.264 编码器:{}ffmpeg={}",
encoder,
ffmpeg_path,
)
return encoder, options, ffmpeg_path
except Exception:
pass
enc_log.warning(
"[FishMeasure] ffmpeg 无可用 H.264 编码器(需 libx264 / h264_nvenc / libopenh264{}",
ffmpeg_path,
)
except subprocess.TimeoutExpired:
enc_log.warning(
"[FishMeasure] ffmpeg -encoders 超时 path={}", ffmpeg_path
)
except Exception as e:
enc_log.warning(
"[FishMeasure] ffmpeg -encoders 异常 path={}{}", ffmpeg_path, e
)
return "", [], ffmpeg_path
@@ -1484,6 +1531,17 @@ def _transcode_with_x264(src: Path, dst: Path) -> bool:
)
return False
logger.bind(pipeline="measure").info(
"[FishMeasure] x264 CLI 转码 | ffmpeg={} | x264={} | {}x{} {:.2f}fps | {} -> {}",
ffmpeg_path,
x264_path,
width,
height,
fps,
src.name,
dst.name,
)
tmp_yuv = None
try:
# 创建临时 YUV 文件
@@ -1557,6 +1615,15 @@ def _transcode_fallback(src: Path, dst: Path) -> bool:
if not encoder:
return False
fb = logger.bind(pipeline="measure")
fb.info(
"[FishMeasure] 帧序列回退转码 | ffmpeg={} | 编码器={} | {} -> {}",
ffmpeg_path,
encoder,
src.name,
dst.name,
)
tmp_dir = tempfile.mkdtemp()
try:
# 步骤1: 提取帧为 jpg 序列
@@ -1614,26 +1681,44 @@ def _transcode_fallback(src: Path, dst: Path) -> bool:
shutil.rmtree(tmp_dir, ignore_errors=True)
def _is_already_h264(src: Path) -> bool:
"""Quick ffprobe check: is the video stream already H.264?"""
def _ffprobe_video_codec_name(src: Path) -> Optional[str]:
"""返回首路视频流的 codec_name如 h264、mpeg4失败时 None。"""
try:
from app.services.video_slice import _get_ffprobe_path
r = subprocess.run(
[_get_ffprobe_path(), "-v", "error",
"-select_streams", "v:0",
"-show_entries", "stream=codec_name",
"-of", "default=noprint_wrappers=1:nokey=1",
str(src)],
capture_output=True, text=True, timeout=5,
[
_get_ffprobe_path(),
"-v",
"error",
"-select_streams",
"v:0",
"-show_entries",
"stream=codec_name",
"-of",
"default=noprint_wrappers=1:nokey=1",
str(src),
],
capture_output=True,
text=True,
timeout=5,
)
return r.returncode == 0 and r.stdout.strip() == "h264"
if r.returncode != 0:
return None
name = r.stdout.strip()
return name or None
except Exception:
return False
return None
def _is_already_h264(src: Path) -> bool:
"""Quick ffprobe check: is the video stream already H.264?"""
return _ffprobe_video_codec_name(src) == "h264"
def _remux_h264_faststart(src: Path, dst: Path) -> bool:
"""Stream-copy an already-H.264 file into an MP4 with moov at the front."""
ffmpeg_path = _get_ffmpeg_path()
rmx = logger.bind(pipeline="measure")
try:
r = subprocess.run(
[ffmpeg_path, "-y", "-i", str(src),
@@ -1642,13 +1727,27 @@ def _remux_h264_faststart(src: Path, dst: Path) -> bool:
capture_output=True, text=True, timeout=60,
)
if r.returncode == 0 and dst.is_file() and dst.stat().st_size > 0:
logger.bind(pipeline="measure").info(
"[FishMeasure] 已是 H.264,仅重封装:{} -> {}{} 字节)",
src.name, dst.name, dst.stat().st_size,
rmx.info(
"[FishMeasure] 已是 H.264,仅重封装:{} -> {}{} 字节)| ffmpeg={}",
src.name, dst.name, dst.stat().st_size, ffmpeg_path,
)
return True
except Exception:
pass
rmx.warning(
"[FishMeasure] H.264 重封装失败 rc={} | ffmpeg={} | {} -> {} | stderr尾={}",
r.returncode,
ffmpeg_path,
src.name,
dst.name,
(r.stderr or "")[-400:],
)
except Exception as e:
rmx.warning(
"[FishMeasure] H.264 重封装异常 ffmpeg={} | {} -> {}{}",
ffmpeg_path,
src.name,
dst.name,
e,
)
return False
@@ -1658,10 +1757,22 @@ def _transcode_to_h264(src: Path, dst: Path) -> bool:
If the source is already H.264, remux without re-encoding (instant).
Otherwise try Jetson NVENC hardware, then fall back to ffmpeg software.
"""
if _is_already_h264(src) and _remux_h264_faststart(src, dst):
src_codec = _ffprobe_video_codec_name(src)
ffmpeg_bin = _get_ffmpeg_path()
tc = logger.bind(pipeline="measure")
tc.info(
"[FishMeasure] H.264 管线开始 | 源编码={} | ffmpeg={} | {} -> {}",
src_codec or "未知",
ffmpeg_bin,
src.name,
dst.name,
)
if src_codec == "h264" and _remux_h264_faststart(src, dst):
return True
if _transcode_with_gst_nvenc(src, dst):
tc.info("[FishMeasure] H.264 管线结束GStreamer NVENC| {}", dst.name)
return True
encoder, encoder_options, ffmpeg_path = _get_h264_encoder()
@@ -1679,40 +1790,47 @@ def _transcode_to_h264(src: Path, dst: Path) -> bool:
cmd.extend(encoder_options)
cmd.append(str(dst))
logger.bind(pipeline="measure").info(
"[FishMeasure] 使用 {} 编码器({})转码:{} -> {}",
encoder, ffmpeg_path, src.name, dst.name,
tc.info(
"[FishMeasure] ffmpeg 软件转码 | 编码器={} | ffmpeg={} | {} -> {}",
encoder,
ffmpeg_path,
src.name,
dst.name,
)
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=300
)
if result.returncode == 0 and dst.is_file():
logger.bind(pipeline="measure").info(
"[FishMeasure] 转码成功:{}{} 字节)",
dst.name, dst.stat().st_size,
out_codec = _ffprobe_video_codec_name(dst) or "?"
tc.info(
"[FishMeasure] 转码成功:{}{} 字节)| 输出编码={} | 编码器={}",
dst.name,
dst.stat().st_size,
out_codec,
encoder,
)
return True
else:
stderr = result.stderr[-500:] if result.stderr else "Unknown error"
logger.bind(pipeline="measure").warning(
tc.warning(
"[FishMeasure] 直接转码失败,尝试帧序列回退:{}", stderr
)
if _transcode_fallback(src, dst):
return True
logger.bind(pipeline="measure").info(
tc.info(
"[FishMeasure] 帧序列回退失败,尝试 x264 命令行..."
)
return _transcode_with_x264(src, dst)
except Exception as e:
logger.bind(pipeline="measure").warning(
tc.warning(
"[FishMeasure] 转码异常:{}", str(e)
)
if _transcode_fallback(src, dst):
return True
return _transcode_with_x264(src, dst)
else:
logger.bind(pipeline="measure").warning(
tc.warning(
"[FishMeasure] ffmpeg 中无可用 H.264 编码器,尝试 x264 命令行..."
)
return _transcode_with_x264(src, dst)
@@ -1750,10 +1868,13 @@ def _publish_media(
"[FishMeasure] 转码为 H.264 完成:{} -> {}", src.name, dst.name
)
else:
src_codec = _ffprobe_video_codec_name(src) or "未知"
shutil.copy2(src, dst)
logger.bind(pipeline="measure").warning(
"[FishMeasure] 转码失败,直接拷贝原文件:{} -> {}",
src.name, dst.name,
"[FishMeasure] 转码失败,直接拷贝原文件(源编码={}{} -> {}",
src_codec,
src.name,
dst.name,
)
return f"{base}/media/{dst.name}"

View File

@@ -1,18 +1,23 @@
"""声呐视频:后台处理 ``BIOMASS_SONAR_VIDEO_DIR`` 中的**当前最新**视频文件。
"""声呐视频:后台处理 ``BIOMASS_SONAR_VIDEO_DIR`` 中的**当前 mtime 最新**视频文件。
支持 MP4、MKV、MOV。MP4/MOV 在录制中缺少 ``moov`` atom须等录完才能处理
MKV 的元数据写在文件头,录制中即可读取,无需等待。
每次轮询取目录中 **mtime 最新** 的文件;当其 **(path, size)** 变化时,用 ffmpeg
``-sseof`` 截取**最后 N 秒**(默认 60见 ``BIOMASS_SONAR_VIDEO_SLICE_SEC``),再对该
切片做光流 overlay → H.264 转码并原子替换发布。``GET /api/v1/biomass/sonar/video/``
立即返回最近成功发布的 URL
切片策略由 ``Settings.biomass_sonar_slice_order`` 决定:
* **sequential**(默认):从 t=0 起按 ``BIOMASS_SONAR_VIDEO_SLICE_SEC`` 顺序切**完整**
ffprobe 可得 duration 且剩余不足一块则不发布);增长中的文件随 duration 增大继续切
* **tail**:当原文件 **(path, size)** 变化时,用 ``-sseof`` 取**最后 N 秒**再处理。
之后对切片做光流 overlay可选→ H.264 转码并发布。``GET .../sonar/video/`` 返回
最近一次成功发布的 URL。
"""
from __future__ import annotations
import asyncio
import datetime
import math
import subprocess
import time
from pathlib import Path
@@ -23,9 +28,9 @@ from loguru import logger
from app.compat import to_thread
from app.logging_config import new_run_id, stage
from app.services.action_watch import iter_mp4
from app.services.measure import transcode_src_to_h264_dst
from app.services.measure import _ffprobe_video_codec_name, transcode_src_to_h264_dst
from app.services.sonar_optical_flow import run_sonar_optical_flow_overlay
from app.services.video_slice import _get_ffmpeg_path
from app.services.video_slice import _get_ffmpeg_path, _get_ffprobe_path
from app.settings import Settings
DEFAULT_CLIENT_ID = "default"
@@ -34,6 +39,9 @@ DEFAULT_CLIENT_ID = "default"
_published_url: str = ""
_published_lock = asyncio.Lock()
# 避免对同一「未就绪」文件每个 poll 都打 INFO
_last_sonar_skip_logged_path: Optional[str] = None
def _public_media_url(settings: Settings, basename: str) -> str:
base = settings.public_base_url.rstrip("/")
@@ -65,20 +73,36 @@ def _is_ready_to_process(path: Path) -> bool:
def _probe_moov_readable(path: Path) -> bool:
"""Quick check via ffprobe (fallback cv2): does the MP4/MOV have a moov atom?"""
log = logger.bind(pipeline="sonar_watch", source=path.name)
ffprobe = _get_ffprobe_path()
try:
r = subprocess.run(
["ffprobe", "-v", "error", "-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1", str(path)],
capture_output=True, timeout=5,
[
ffprobe, "-v", "error", "-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1", str(path),
],
capture_output=True,
text=True,
timeout=5,
)
if r.returncode == 0 and r.stdout.strip():
return True
log.debug("[声呐监控] ffprobemoov 缺失:{}", path.name)
log.debug(
"[声呐监控] ffprobe 无有效 duration可能缺 moov| ffprobe={} | {}",
ffprobe,
path.name,
)
return False
except FileNotFoundError:
pass
log.warning(
"[声呐监控] 未找到 ffprobe与 FFMPEG_PATH 同目录或 PATH| 配置路径={}",
ffprobe,
)
except Exception as e:
log.debug("[声呐监控] ffprobe 失败({}),改用 cv2 探测", e)
log.debug(
"[声呐监控] ffprobe 失败({}),改用 cv2 探测 | ffprobe={}",
e,
ffprobe,
)
try:
import cv2
@@ -92,6 +116,109 @@ def _probe_moov_readable(path: Path) -> bool:
return False
def _probe_media_duration_sec(path: Path) -> Optional[float]:
"""ffprobe ``format=duration``无法解析N/A、moov 未就绪等)时返回 None。"""
ffprobe = _get_ffprobe_path()
try:
r = subprocess.run(
[
ffprobe,
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
str(path),
],
capture_output=True,
text=True,
timeout=30,
)
if r.returncode != 0:
return None
raw = (r.stdout or "").strip()
if not raw or raw.lower() in ("n/a", "nan"):
return None
d = float(raw)
if not math.isfinite(d) or d <= 0:
return None
return d
except (ValueError, subprocess.TimeoutExpired, OSError):
return None
def _extract_range_slice(
src: Path,
slice_out: Path,
start_sec: float,
duration_sec: float,
) -> bool:
"""Extract ``[start_sec, start_sec + duration_sec)`` with stream copy (``-ss`` before ``-i``)."""
slice_out.parent.mkdir(parents=True, exist_ok=True)
slice_out.unlink(missing_ok=True)
if not src.is_file() or src.stat().st_size <= 0:
return False
start = float(start_sec)
dur = float(duration_sec)
if not math.isfinite(start) or start < 0 or not math.isfinite(dur) or dur <= 0:
return False
ffmpeg = _get_ffmpeg_path()
log = logger.bind(pipeline="sonar_watch", source=src.name)
log.info(
"[声呐监控] 顺序切片提取 | ffmpeg={} | {:.3f}s{:.3f}s{:.0f}s| {} -> {}",
ffmpeg,
start,
start + dur,
dur,
src.name,
slice_out.name,
)
cmd = [
ffmpeg,
"-y",
"-hide_banner",
"-loglevel",
"error",
"-ss",
str(start),
"-t",
str(dur),
"-i",
str(src),
"-c",
"copy",
"-avoid_negative_ts",
"make_zero",
str(slice_out),
]
r: subprocess.CompletedProcess[str]
try:
r = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
if r.returncode == 0 and slice_out.is_file() and slice_out.stat().st_size > 0:
log.info(
"[声呐监控] 顺序切片成功 | {} -> {}{} 字节)",
src.name,
slice_out.name,
slice_out.stat().st_size,
)
return True
except Exception as e:
log.debug("[声呐监控] 顺序切片异常:{} | {}", src.name, e)
slice_out.unlink(missing_ok=True)
return False
tail = (r.stderr or "")[-500:]
log.warning(
"[声呐监控] 顺序切片失败 | {} | stderr尾={}",
src.name,
tail,
)
slice_out.unlink(missing_ok=True)
return False
def _extract_tail_slice(src: Path, slice_out: Path, duration_sec: float) -> bool:
"""Extract last ``duration_sec`` seconds with ``ffmpeg -sseof`` + stream copy.
@@ -107,6 +234,14 @@ def _extract_tail_slice(src: Path, slice_out: Path, duration_sec: float) -> bool
ffmpeg = _get_ffmpeg_path()
sec = float(duration_sec)
log = logger.bind(pipeline="sonar_watch", source=src.name)
log.info(
"[声呐监控] 尾段提取开始 | ffmpeg={} | 最后 {:.0f}s | {} -> {}",
ffmpeg,
sec,
src.name,
slice_out.name,
)
# --- primary: -sseof (works on growing MKV even when duration is unknown) ---
sseof_cmd = [
@@ -118,13 +253,16 @@ def _extract_tail_slice(src: Path, slice_out: Path, duration_sec: float) -> bool
"-avoid_negative_ts", "make_zero",
str(slice_out),
]
log = logger.bind(pipeline="sonar_watch", source=src.name)
try:
r = subprocess.run(sseof_cmd, capture_output=True, text=True, timeout=600)
if r.returncode == 0 and slice_out.is_file() and slice_out.stat().st_size > 0:
log.debug(
"[声呐监控] 尾段提取成功(-sseof {:.0f}s{} -> {}",
sec, src.name, slice_out.name,
log.info(
"[声呐监控] 尾段提取成功(-sseof {:.0f}s copy| ffmpeg={} | {} -> {}{} 字节)",
sec,
ffmpeg,
src.name,
slice_out.name,
slice_out.stat().st_size,
)
return True
except Exception as e:
@@ -141,9 +279,12 @@ def _extract_tail_slice(src: Path, slice_out: Path, duration_sec: float) -> bool
try:
r = subprocess.run(copy_cmd, capture_output=True, text=True, timeout=600)
if r.returncode == 0 and slice_out.is_file() and slice_out.stat().st_size > 0:
log.debug(
"[声呐监控] 尾段提取成功(整段拷贝回退):{} -> {}",
src.name, slice_out.name,
log.info(
"[声呐监控] 尾段提取成功(整段 -c copy 回退)| ffmpeg={} | {} -> {}{} 字节)",
ffmpeg,
src.name,
slice_out.name,
slice_out.stat().st_size,
)
return True
except Exception as e:
@@ -158,7 +299,7 @@ def _extract_tail_slice(src: Path, slice_out: Path, duration_sec: float) -> bool
# ---------------------------------------------------------------------------
# Publish pipeline: tail slice → optical flow (optional) → H.264 transcode → atomic replace
# Publish pipeline: slice (tail or sequential) → optical flow (optional) → H.264
# ---------------------------------------------------------------------------
async def _publish_video(
@@ -166,49 +307,108 @@ async def _publish_video(
media_root: Path,
dst_stem: str,
settings: Settings,
*,
range_start_sec: Optional[float] = None,
) -> Optional[Path]:
"""Extract tail slice → optical-flow → H.264 transcode → verify → save.
"""Extract a slice → optional optical-flow → H.264 transcode → verify → save.
Every output file gets a unique timestamp so successive cycles never
overwrite each other:
* ``<dst_stem>_optical_flow_<ts>.mp4`` — optical-flow overlay (kept)
* ``<dst_stem>_<ts>.mp4`` — final H.264 output (kept, published)
* ``tail`` order: ignores ``range_start_sec``; uses ``-sseof`` last N seconds.
* ``sequential`` order: requires ``range_start_sec``; uses ``-ss`` / ``-t`` copy.
Returns the path of the published file, or ``None`` on failure.
"""
order = settings.biomass_sonar_slice_order
src_size_mb = src.stat().st_size / (1024 * 1024) if src.is_file() else 0
slice_sec = float(settings.biomass_sonar_video_slice_sec)
rid = new_run_id("sonar_watch")
log = logger.bind(pipeline="sonar_watch", run_id=rid, source=src.name)
log.info(
"[声呐监控] 开始处理:{} ({:.1f} MB) | 尾段={:.0f}s | 光流={}",
src.name, src_size_mb, slice_sec, settings.biomass_sonar_optical_flow,
)
if order == "sequential":
if range_start_sec is None:
log.error("[声呐监控] sequential 模式缺少 range_start_sec{}", src.name)
return None
log.info(
"[声呐监控] 开始处理:{} ({:.1f} MB) | 顺序切片 t={:.3f}s | 块长={:.0f}s | 光流={}",
src.name,
src_size_mb,
range_start_sec,
slice_sec,
settings.biomass_sonar_optical_flow,
)
else:
log.info(
"[声呐监控] 开始处理:{} ({:.1f} MB) | 尾段={:.0f}s | 光流={}",
src.name,
src_size_mb,
slice_sec,
settings.biomass_sonar_optical_flow,
)
t0 = time.monotonic()
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
slice_tmp = media_root / f"{dst_stem}_tail_slice.mkv"
slice_tmp.unlink(missing_ok=True)
tmp = media_root / f"{dst_stem}_tmp.mp4"
tmp.unlink(missing_ok=True)
extract_metrics = {
"slice_sec": slice_sec,
"src_mb": round(src_size_mb, 2),
"slice_order": order,
}
if order == "sequential" and range_start_sec is not None:
extract_metrics["range_start_sec"] = round(range_start_sec, 4)
try:
with stage(
f"声呐尾段提取({src.name}",
pipeline="sonar_watch",
step="extract_tail",
run_id=rid,
source=src.name,
metrics={"slice_sec": slice_sec, "src_mb": round(src_size_mb, 2)},
raise_on_error=False,
):
slice_ok = await to_thread(_extract_tail_slice, src, slice_tmp, slice_sec)
if order == "tail":
stage_title = f"声呐尾段提取({src.name}"
extract_step = "extract_tail"
with stage(
stage_title,
pipeline="sonar_watch",
step=extract_step,
run_id=rid,
source=src.name,
metrics=extract_metrics,
raise_on_error=False,
):
slice_ok = await to_thread(
_extract_tail_slice, src, slice_tmp, slice_sec,
)
else:
assert range_start_sec is not None
stage_title = (
f"声呐顺序切片 {range_start_sec:.1f}s{range_start_sec + slice_sec:.1f}s"
f"{src.name}"
)
with stage(
stage_title,
pipeline="sonar_watch",
step="extract_sequential",
run_id=rid,
source=src.name,
metrics=extract_metrics,
raise_on_error=False,
):
slice_ok = await to_thread(
_extract_range_slice,
src,
slice_tmp,
range_start_sec,
slice_sec,
)
if not slice_ok:
log.warning("[声呐监控] 尾段提取失败,跳过发布:{}", src.name)
log.warning("[声呐监控] 切片提取失败,跳过发布:{}", src.name)
return None
if slice_tmp.is_file():
log.info(
"[声呐监控] 切片已就绪 | {}{:.2f} MB→ 后续光流/H.264",
slice_tmp.name,
slice_tmp.stat().st_size / (1024 * 1024),
)
transcode_src = slice_tmp
if settings.biomass_sonar_optical_flow:
flow_dst = media_root / f"{dst_stem}_optical_flow_{ts}.mp4"
@@ -231,12 +431,21 @@ async def _publish_video(
transcode_src = flow_dst
log.info(
"[声呐监控] 光流文件已保存:{} ({:.1f} MB)",
flow_dst.name, flow_dst.stat().st_size / (1024 * 1024),
flow_dst.name,
flow_dst.stat().st_size / (1024 * 1024),
)
else:
flow_tmp.unlink(missing_ok=True)
log.warning("[声呐监控] 光流叠加失败,对原切片直接转码:{}", src.name)
in_codec = _ffprobe_video_codec_name(transcode_src) or "未知"
log.info(
"[声呐监控] H.264 发布转码FishMeasure 同款管线)| 输入编码={} | {} -> {}",
in_codec,
transcode_src.name,
tmp.name,
)
with stage(
f"声呐 H.264 转码({src.name}",
pipeline="sonar_watch",
@@ -256,11 +465,28 @@ async def _publish_video(
tmp.unlink(missing_ok=True)
return None
out_codec = _ffprobe_video_codec_name(tmp) or "未知"
log.info(
"[声呐监控] H.264 转码产物已验证 | codec_name={} | {}{:.2f} MB",
out_codec,
tmp.name,
tmp.stat().st_size / (1024 * 1024),
)
dst = media_root / f"{dst_stem}_{ts}.mp4"
tmp.replace(dst)
elapsed = time.monotonic() - t0
dst_mb = dst.stat().st_size / (1024 * 1024)
# 发布完成事件
pub_metrics = {
"src_mb": round(src_size_mb, 2),
"dst_mb": round(dst_mb, 2),
"dst_name": dst.name,
"slice_order": order,
}
if order == "sequential" and range_start_sec is not None:
pub_metrics["range_start_sec"] = round(range_start_sec, 4)
logger.bind(
event=True,
pipeline="sonar_watch",
@@ -269,14 +495,14 @@ async def _publish_video(
source=src.name,
status="success",
duration_ms=round(elapsed * 1000.0, 3),
metrics={
"src_mb": round(src_size_mb, 2),
"dst_mb": round(dst_mb, 2),
"dst_name": dst.name,
},
metrics=pub_metrics,
).info(
"[声呐监控] 发布完成 | 源 {} ({:.1f} MB) → {} ({:.1f} MB) | 耗时={:.1f}s",
src.name, src_size_mb, dst.name, dst_mb, elapsed,
src.name,
src_size_mb,
dst.name,
dst_mb,
elapsed,
)
return dst
except Exception:
@@ -294,11 +520,12 @@ async def _publish_video(
async def run_sonar_video_watch_loop(settings: Settings) -> None:
"""Poll ``BIOMASS_SONAR_VIDEO_DIR``, follow the **latest** file by mtime.
When that file's ``(resolved_path, size)`` changes and the file is ready to read,
extract tail slice → publish. Each published video gets a unique timestamped
filename; the GET endpoint always returns the most recent one.
* ``tail``: when ``(resolved_path, size)`` changes and the file is ready, run one
tail-``N``-second publish.
* ``sequential``: maintain a cursor from t=0; each poll publishes up to
``max_chunks_per_poll`` full ``N``-second blocks while ``duration`` allows.
"""
global _published_url
global _published_url, _last_sonar_skip_logged_path
d = settings.biomass_sonar_video_dir
if d is None:
@@ -308,21 +535,24 @@ async def run_sonar_video_watch_loop(settings: Settings) -> None:
return
poll = max(1.0, settings.biomass_sonar_video_poll_interval)
slice_order = settings.biomass_sonar_slice_order
max_chunks = settings.biomass_sonar_max_chunks_per_poll
basename = _safe_sonar_media_basename(settings.biomass_sonar_video_media_name)
dst_stem = Path(basename).stem # e.g. "biomass_sonar"
dst_stem = Path(basename).stem
media_root = settings.media_root
media_root.mkdir(parents=True, exist_ok=True)
# Seed published URL from the newest existing published file in media_root
existing = sorted(
media_root.glob(f"{dst_stem}_*.mp4"),
key=lambda p: p.stat().st_mtime,
)
# Exclude intermediates (_tmp, _flow_tmp, _optical_flow_, _tail_slice)
existing = [
p for p in existing
if not any(tag in p.stem for tag in ("_tmp", "_flow_tmp", "_optical_flow_", "_tail_slice"))
if not any(
tag in p.stem
for tag in ("_tmp", "_flow_tmp", "_optical_flow_", "_tail_slice")
)
]
if existing:
seed = existing[-1]
@@ -333,10 +563,17 @@ async def run_sonar_video_watch_loop(settings: Settings) -> None:
)
last_published_key: Optional[Tuple[str, int]] = None
seq_cursor_rp: Optional[str] = None
seq_next_start: float = 0.0
logger.bind(pipeline="sonar_watch").info(
"[声呐监控] 监控目录已启动 | dir={} | poll={:.0f}s | recursive={}",
d, poll, settings.biomass_sonar_video_recursive,
"[声呐监控] 监控目录已启动 | dir={} | poll={:.0f}s | recursive={} | "
"slice_order={} | max_chunks_per_poll={}",
d,
poll,
settings.biomass_sonar_video_recursive,
slice_order,
max_chunks,
)
while True:
@@ -349,12 +586,23 @@ async def run_sonar_video_watch_loop(settings: Settings) -> None:
rp = str(latest.resolve())
sz = latest.stat().st_size
except OSError:
await asyncio.sleep(poll)
continue
if sz > 0 and last_published_key != (rp, sz):
if sz <= 0:
continue
cycle = logger.bind(pipeline="sonar_watch", source=latest.name)
if slice_order == "tail":
if last_published_key == (rp, sz):
continue
ready = await to_thread(_is_ready_to_process, latest)
if ready:
cycle.info(
"[声呐监控] 源文件已就绪,进入发布管线(尾段)| {:.2f} MB | {}",
sz / (1024 * 1024),
rp,
)
published = await _publish_video(
latest, media_root, dst_stem, settings,
)
@@ -364,12 +612,111 @@ async def run_sonar_video_watch_loop(settings: Settings) -> None:
settings, published.name,
)
last_published_key = (rp, sz)
_last_sonar_skip_logged_path = None
else:
cycle.warning(
"[声呐监控] 发布失败(切片/光流/转码见上方)| {}",
latest.name,
)
else:
logger.bind(pipeline="sonar_watch", source=latest.name).debug(
"[声呐监控] {} 尚未就绪(仍在录制?),等待下一轮",
if _last_sonar_skip_logged_path != rp:
_last_sonar_skip_logged_path = rp
cycle.info(
"[声呐监控] 最新文件尚未可处理(录制中或 MP4/MOV 缺 moov"
"等待:{}",
latest.name,
)
else:
cycle.debug(
"[声呐监控] 仍在等待就绪:{}",
latest.name,
)
else:
if rp != seq_cursor_rp:
seq_cursor_rp = rp
seq_next_start = 0.0
cycle.info(
"[声呐监控] 顺序模式:跟踪新文件,游标归零 | {}",
latest.name,
)
ready = await to_thread(_is_ready_to_process, latest)
if not ready:
if _last_sonar_skip_logged_path != rp:
_last_sonar_skip_logged_path = rp
cycle.info(
"[声呐监控] 最新文件尚未可处理(录制中或 MP4/MOV 缺 moov"
"等待:{}",
latest.name,
)
else:
cycle.debug(
"[声呐监控] 仍在等待就绪:{}",
latest.name,
)
else:
duration = await to_thread(
_probe_media_duration_sec, latest,
)
slice_sec = float(settings.biomass_sonar_video_slice_sec)
if duration is None:
cycle.warning(
"[声呐监控] 顺序模式:无法读取 durationffprobe"
"跳过本周期 | {}",
latest.name,
)
elif seq_next_start + slice_sec > duration + 1e-3:
cycle.debug(
"[声呐监控] 顺序模式:无完整 {:.0f}s 块可切 "
"cursor={:.3f}s duration={:.3f}s| {}",
slice_sec,
seq_next_start,
duration,
latest.name,
)
else:
_last_sonar_skip_logged_path = None
n_done = 0
while (
seq_next_start + slice_sec <= duration + 1e-3
):
if max_chunks > 0 and n_done >= max_chunks:
break
cycle.info(
"[声呐监控] 顺序发布 | t={:.3f}s | "
"duration={:.3f}s | {}",
seq_next_start,
duration,
latest.name,
)
published = await _publish_video(
latest,
media_root,
dst_stem,
settings,
range_start_sec=seq_next_start,
)
if published is None:
cycle.warning(
"[声呐监控] 顺序发布失败 @ t={:.3f}s | {}",
seq_next_start,
latest.name,
)
break
seq_next_start += slice_sec
n_done += 1
async with _published_lock:
_published_url = _public_media_url(
settings, published.name,
)
await asyncio.sleep(0)
else:
logger.bind(pipeline="sonar_watch").debug(
"[声呐监控] 目录中暂无 .mp4/.mkv/.mov{}",
d,
)
except asyncio.CancelledError:
raise
except Exception:

View File

@@ -99,7 +99,7 @@ class Settings(BaseSettings):
python_fish_measure: str = ""
python_fish_action: str = ""
#: ffmpeg 可执行文件路径;为空时按顺序尝试 tools/ffmpeg/bin/ffmpeg → 系统 PATH。**FFMPEG_PATH**
#: ffmpeg 可执行文件路径。非空则优先用该文件;为空时依次尝试 ``/usr/bin/ffmpeg``、``/usr/local/bin/ffmpeg``,否则用 PATH 中的 ``ffmpeg``。(若要用仓库 ``tools/ffmpeg``,请设为本机绝对路径。)**FFMPEG_PATH**
ffmpeg_path: str = ""
#: SAM/CUDA 设备cuda 或 cpu
@@ -285,6 +285,22 @@ class Settings(BaseSettings):
"BIOMASS_SONAR_VIDEO_SLICE_SEC", "biomass_sonar_video_slice_sec"
),
)
#: 声呐切片顺序:``tail`` = 每次取文件末尾 N 秒(``-sseof````sequential`` = 从 t=0 起按 N 秒顺序切完整块(不足一块不发布)。**BIOMASS_SONAR_SLICE_ORDER**
biomass_sonar_slice_order: str = Field(
default="sequential",
validation_alias=AliasChoices(
"BIOMASS_SONAR_SLICE_ORDER", "biomass_sonar_slice_order"
),
)
#: 顺序模式下每轮监控循环最多发布几块完整切片;``0`` = 不限制(长视频可能长时间占住循环)。**BIOMASS_SONAR_MAX_CHUNKS_PER_POLL**
biomass_sonar_max_chunks_per_poll: int = Field(
default=1,
ge=0,
validation_alias=AliasChoices(
"BIOMASS_SONAR_MAX_CHUNKS_PER_POLL",
"biomass_sonar_max_chunks_per_poll",
),
)
#: 发布到 MEDIA_ROOT 的 H.264 文件名。**BIOMASS_SONAR_VIDEO_MEDIA_NAME**
biomass_sonar_video_media_name: str = "biomass_sonar.mp4"
#: 为 True 时声呐发布管线在 ffmpeg 转 H.264 之前先做 Farneback 光流 overlay与 ``/sonar/video/`` 返回的仍是同一 ``video_path`` 字段)。**BIOMASS_SONAR_OPTICAL_FLOW**
@@ -396,6 +412,21 @@ class Settings(BaseSettings):
return None
return v
@field_validator("biomass_sonar_slice_order", mode="before")
@classmethod
def _normalize_biomass_sonar_slice_order(cls, v: object) -> str:
if v is None:
return "sequential"
s = str(v).strip().lower()
if s in frozenset({"tail", "last", "last_n", "sseof"}):
return "tail"
if s in frozenset({"sequential", "seq", "from_start"}):
return "sequential"
raise ValueError(
"biomass_sonar_slice_order must be 'tail' or 'sequential' "
f"(got {v!r})"
)
@field_validator(
"action_watch_dir",
"biomass_water_video_source",