741 lines
28 KiB
Python
741 lines
28 KiB
Python
"""声呐视频:后台处理 ``BIOMASS_SONAR_VIDEO_DIR`` 中的**当前 mtime 最新**视频文件。
|
||
|
||
支持 MP4、MKV、MOV。MP4/MOV 在录制中缺少 ``moov`` atom,须等录完才能处理;
|
||
MKV 的元数据写在文件头,录制中即可读取,无需等待。
|
||
|
||
切片策略由 ``Settings.biomass_sonar_slice_order`` 决定:
|
||
|
||
* **sequential**(默认):从 t=0 起按 ``BIOMASS_SONAR_VIDEO_SLICE_SEC`` 顺序切**完整**
|
||
块(ffprobe 可得 duration 且剩余不足一块则不发布);增长中的文件随 duration 增大继续切。
|
||
* **tail**:当原文件 **(path, size)** 变化时,用 ``-sseof`` 取**最后 N 秒**再处理。
|
||
|
||
之后对切片做光流 overlay(可选)→ H.264 转码并发布。``GET .../sonar/video/`` 返回
|
||
最近一次成功发布的 URL。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import datetime
|
||
import math
|
||
import subprocess
|
||
import time
|
||
from pathlib import Path
|
||
from typing import Optional, Tuple
|
||
|
||
from loguru import logger
|
||
|
||
from app.compat import to_thread
|
||
from app.logging_config import new_run_id, stage
|
||
from app.services.action_watch import iter_mp4
|
||
from app.services.measure import _ffprobe_video_codec_name, transcode_src_to_h264_dst
|
||
from app.services.sonar_optical_flow import run_sonar_optical_flow_overlay
|
||
from app.services.video_slice import _get_ffmpeg_path, _get_ffprobe_path
|
||
from app.settings import Settings
|
||
|
||
DEFAULT_CLIENT_ID = "default"
|
||
|
||
# --- published state (read by GET endpoint) ---
|
||
_published_url: str = ""
|
||
_published_lock = asyncio.Lock()
|
||
|
||
# 避免对同一「未就绪」文件每个 poll 都打 INFO
|
||
_last_sonar_skip_logged_path: Optional[str] = None
|
||
|
||
|
||
def _public_media_url(settings: Settings, basename: str) -> str:
|
||
base = settings.public_base_url.rstrip("/")
|
||
return f"{base}/media/{basename}"
|
||
|
||
|
||
def _safe_sonar_media_basename(raw: str) -> str:
|
||
n = (raw or "").strip()
|
||
if not n:
|
||
return "biomass_sonar.mp4"
|
||
return Path(n).name or "biomass_sonar.mp4"
|
||
|
||
|
||
# Formats that store metadata at the start → readable while recording
|
||
_STREAMING_SUFFIXES = frozenset({".mkv", ".ts", ".webm"})
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Probe: is the video file ready to process?
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _is_ready_to_process(path: Path) -> bool:
|
||
"""MKV/TS/WebM are always readable (metadata at start); MP4/MOV need moov at end."""
|
||
if path.suffix.lower() in _STREAMING_SUFFIXES:
|
||
return path.is_file() and path.stat().st_size > 0
|
||
return _probe_moov_readable(path)
|
||
|
||
|
||
def _probe_moov_readable(path: Path) -> bool:
|
||
"""Quick check via ffprobe (fallback cv2): does the MP4/MOV have a moov atom?"""
|
||
log = logger.bind(pipeline="sonar_watch", source=path.name)
|
||
ffprobe = _get_ffprobe_path()
|
||
try:
|
||
r = subprocess.run(
|
||
[
|
||
ffprobe, "-v", "error", "-show_entries", "format=duration",
|
||
"-of", "default=noprint_wrappers=1:nokey=1", str(path),
|
||
],
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=5,
|
||
)
|
||
if r.returncode == 0 and r.stdout.strip():
|
||
return True
|
||
log.debug(
|
||
"[声呐监控] ffprobe 无有效 duration(可能缺 moov)| ffprobe={} | {}",
|
||
ffprobe,
|
||
path.name,
|
||
)
|
||
return False
|
||
except FileNotFoundError:
|
||
log.warning(
|
||
"[声呐监控] 未找到 ffprobe(与 FFMPEG_PATH 同目录或 PATH)| 配置路径={}",
|
||
ffprobe,
|
||
)
|
||
except Exception as e:
|
||
log.debug(
|
||
"[声呐监控] ffprobe 失败({}),改用 cv2 探测 | ffprobe={}",
|
||
e,
|
||
ffprobe,
|
||
)
|
||
|
||
try:
|
||
import cv2
|
||
cap = cv2.VideoCapture(str(path))
|
||
ok = cap.isOpened() and cap.get(cv2.CAP_PROP_FRAME_COUNT) > 0
|
||
cap.release()
|
||
if not ok:
|
||
log.debug("[声呐监控] cv2:无法读取 {}", path.name)
|
||
return ok
|
||
except Exception:
|
||
return False
|
||
|
||
|
||
def _probe_media_duration_sec(path: Path) -> Optional[float]:
|
||
"""ffprobe ``format=duration``;无法解析(N/A、moov 未就绪等)时返回 None。"""
|
||
ffprobe = _get_ffprobe_path()
|
||
try:
|
||
r = subprocess.run(
|
||
[
|
||
ffprobe,
|
||
"-v",
|
||
"error",
|
||
"-show_entries",
|
||
"format=duration",
|
||
"-of",
|
||
"default=noprint_wrappers=1:nokey=1",
|
||
str(path),
|
||
],
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=30,
|
||
)
|
||
if r.returncode != 0:
|
||
return None
|
||
raw = (r.stdout or "").strip()
|
||
if not raw or raw.lower() in ("n/a", "nan"):
|
||
return None
|
||
d = float(raw)
|
||
if not math.isfinite(d) or d <= 0:
|
||
return None
|
||
return d
|
||
except (ValueError, subprocess.TimeoutExpired, OSError):
|
||
return None
|
||
|
||
|
||
def _extract_range_slice(
|
||
src: Path,
|
||
slice_out: Path,
|
||
start_sec: float,
|
||
duration_sec: float,
|
||
) -> bool:
|
||
"""Extract ``[start_sec, start_sec + duration_sec)`` with stream copy (``-ss`` before ``-i``)."""
|
||
slice_out.parent.mkdir(parents=True, exist_ok=True)
|
||
slice_out.unlink(missing_ok=True)
|
||
if not src.is_file() or src.stat().st_size <= 0:
|
||
return False
|
||
start = float(start_sec)
|
||
dur = float(duration_sec)
|
||
if not math.isfinite(start) or start < 0 or not math.isfinite(dur) or dur <= 0:
|
||
return False
|
||
|
||
ffmpeg = _get_ffmpeg_path()
|
||
log = logger.bind(pipeline="sonar_watch", source=src.name)
|
||
log.info(
|
||
"[声呐监控] 顺序切片提取 | ffmpeg={} | {:.3f}s–{:.3f}s({:.0f}s)| {} -> {}",
|
||
ffmpeg,
|
||
start,
|
||
start + dur,
|
||
dur,
|
||
src.name,
|
||
slice_out.name,
|
||
)
|
||
cmd = [
|
||
ffmpeg,
|
||
"-y",
|
||
"-hide_banner",
|
||
"-loglevel",
|
||
"error",
|
||
"-ss",
|
||
str(start),
|
||
"-t",
|
||
str(dur),
|
||
"-i",
|
||
str(src),
|
||
"-c",
|
||
"copy",
|
||
"-avoid_negative_ts",
|
||
"make_zero",
|
||
str(slice_out),
|
||
]
|
||
r: subprocess.CompletedProcess[str]
|
||
try:
|
||
r = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
|
||
if r.returncode == 0 and slice_out.is_file() and slice_out.stat().st_size > 0:
|
||
log.info(
|
||
"[声呐监控] 顺序切片成功 | {} -> {}({} 字节)",
|
||
src.name,
|
||
slice_out.name,
|
||
slice_out.stat().st_size,
|
||
)
|
||
return True
|
||
except Exception as e:
|
||
log.debug("[声呐监控] 顺序切片异常:{} | {}", src.name, e)
|
||
slice_out.unlink(missing_ok=True)
|
||
return False
|
||
|
||
tail = (r.stderr or "")[-500:]
|
||
log.warning(
|
||
"[声呐监控] 顺序切片失败 | {} | stderr尾={}",
|
||
src.name,
|
||
tail,
|
||
)
|
||
slice_out.unlink(missing_ok=True)
|
||
return False
|
||
|
||
|
||
def _extract_tail_slice(src: Path, slice_out: Path, duration_sec: float) -> bool:
|
||
"""Extract last ``duration_sec`` seconds with ``ffmpeg -sseof`` + stream copy.
|
||
|
||
For growing MKV files ``ffprobe`` often returns ``N/A`` for duration, so we
|
||
always attempt ``-sseof`` first (ffmpeg clamps to file start when the file is
|
||
shorter than the requested window). Only when ``-sseof`` fails do we fall
|
||
back to a plain ``-c copy`` of the entire file.
|
||
"""
|
||
slice_out.parent.mkdir(parents=True, exist_ok=True)
|
||
slice_out.unlink(missing_ok=True)
|
||
if not src.is_file() or src.stat().st_size <= 0:
|
||
return False
|
||
|
||
ffmpeg = _get_ffmpeg_path()
|
||
sec = float(duration_sec)
|
||
log = logger.bind(pipeline="sonar_watch", source=src.name)
|
||
log.info(
|
||
"[声呐监控] 尾段提取开始 | ffmpeg={} | 最后 {:.0f}s | {} -> {}",
|
||
ffmpeg,
|
||
sec,
|
||
src.name,
|
||
slice_out.name,
|
||
)
|
||
|
||
# --- primary: -sseof (works on growing MKV even when duration is unknown) ---
|
||
sseof_cmd = [
|
||
ffmpeg, "-y", "-hide_banner", "-loglevel", "error",
|
||
"-sseof", f"-{sec}",
|
||
"-i", str(src),
|
||
"-t", str(sec),
|
||
"-c", "copy",
|
||
"-avoid_negative_ts", "make_zero",
|
||
str(slice_out),
|
||
]
|
||
try:
|
||
r = subprocess.run(sseof_cmd, capture_output=True, text=True, timeout=600)
|
||
if r.returncode == 0 and slice_out.is_file() and slice_out.stat().st_size > 0:
|
||
log.info(
|
||
"[声呐监控] 尾段提取成功(-sseof {:.0f}s copy)| ffmpeg={} | {} -> {}({} 字节)",
|
||
sec,
|
||
ffmpeg,
|
||
src.name,
|
||
slice_out.name,
|
||
slice_out.stat().st_size,
|
||
)
|
||
return True
|
||
except Exception as e:
|
||
log.debug("[声呐监控] -sseof 提取异常:{}({})", src.name, e)
|
||
|
||
# --- fallback: copy entire file (source shorter than window or -sseof unsupported) ---
|
||
slice_out.unlink(missing_ok=True)
|
||
copy_cmd = [
|
||
ffmpeg, "-y", "-hide_banner", "-loglevel", "error",
|
||
"-i", str(src),
|
||
"-c", "copy",
|
||
str(slice_out),
|
||
]
|
||
try:
|
||
r = subprocess.run(copy_cmd, capture_output=True, text=True, timeout=600)
|
||
if r.returncode == 0 and slice_out.is_file() and slice_out.stat().st_size > 0:
|
||
log.info(
|
||
"[声呐监控] 尾段提取成功(整段 -c copy 回退)| ffmpeg={} | {} -> {}({} 字节)",
|
||
ffmpeg,
|
||
src.name,
|
||
slice_out.name,
|
||
slice_out.stat().st_size,
|
||
)
|
||
return True
|
||
except Exception as e:
|
||
log.warning("[声呐监控] 尾段提取回退异常:{}({})", src.name, e)
|
||
|
||
log.warning(
|
||
"[声呐监控] 尾段提取失败 | {} | stderr={}",
|
||
src.name, (r.stderr or "")[:500],
|
||
)
|
||
slice_out.unlink(missing_ok=True)
|
||
return False
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Publish pipeline: slice (tail or sequential) → optical flow (optional) → H.264
|
||
# ---------------------------------------------------------------------------
|
||
|
||
async def _publish_video(
|
||
src: Path,
|
||
media_root: Path,
|
||
dst_stem: str,
|
||
settings: Settings,
|
||
*,
|
||
range_start_sec: Optional[float] = None,
|
||
) -> Optional[Path]:
|
||
"""Extract a slice → optional optical-flow → H.264 transcode → verify → save.
|
||
|
||
* ``tail`` order: ignores ``range_start_sec``; uses ``-sseof`` last N seconds.
|
||
* ``sequential`` order: requires ``range_start_sec``; uses ``-ss`` / ``-t`` copy.
|
||
|
||
Returns the path of the published file, or ``None`` on failure.
|
||
"""
|
||
order = settings.biomass_sonar_slice_order
|
||
src_size_mb = src.stat().st_size / (1024 * 1024) if src.is_file() else 0
|
||
slice_sec = float(settings.biomass_sonar_video_slice_sec)
|
||
rid = new_run_id("sonar_watch")
|
||
log = logger.bind(pipeline="sonar_watch", run_id=rid, source=src.name)
|
||
|
||
if order == "sequential":
|
||
if range_start_sec is None:
|
||
log.error("[声呐监控] sequential 模式缺少 range_start_sec:{}", src.name)
|
||
return None
|
||
log.info(
|
||
"[声呐监控] 开始处理:{} ({:.1f} MB) | 顺序切片 t={:.3f}s | 块长={:.0f}s | 光流={}",
|
||
src.name,
|
||
src_size_mb,
|
||
range_start_sec,
|
||
slice_sec,
|
||
settings.biomass_sonar_optical_flow,
|
||
)
|
||
else:
|
||
log.info(
|
||
"[声呐监控] 开始处理:{} ({:.1f} MB) | 尾段={:.0f}s | 光流={}",
|
||
src.name,
|
||
src_size_mb,
|
||
slice_sec,
|
||
settings.biomass_sonar_optical_flow,
|
||
)
|
||
|
||
t0 = time.monotonic()
|
||
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
slice_tmp = media_root / f"{dst_stem}_tail_slice.mkv"
|
||
slice_tmp.unlink(missing_ok=True)
|
||
tmp = media_root / f"{dst_stem}_tmp.mp4"
|
||
tmp.unlink(missing_ok=True)
|
||
|
||
extract_metrics = {
|
||
"slice_sec": slice_sec,
|
||
"src_mb": round(src_size_mb, 2),
|
||
"slice_order": order,
|
||
}
|
||
if order == "sequential" and range_start_sec is not None:
|
||
extract_metrics["range_start_sec"] = round(range_start_sec, 4)
|
||
|
||
try:
|
||
if order == "tail":
|
||
stage_title = f"声呐尾段提取({src.name})"
|
||
extract_step = "extract_tail"
|
||
with stage(
|
||
stage_title,
|
||
pipeline="sonar_watch",
|
||
step=extract_step,
|
||
run_id=rid,
|
||
source=src.name,
|
||
metrics=extract_metrics,
|
||
raise_on_error=False,
|
||
):
|
||
slice_ok = await to_thread(
|
||
_extract_tail_slice, src, slice_tmp, slice_sec,
|
||
)
|
||
else:
|
||
assert range_start_sec is not None
|
||
stage_title = (
|
||
f"声呐顺序切片 {range_start_sec:.1f}s–{range_start_sec + slice_sec:.1f}s"
|
||
f"({src.name})"
|
||
)
|
||
with stage(
|
||
stage_title,
|
||
pipeline="sonar_watch",
|
||
step="extract_sequential",
|
||
run_id=rid,
|
||
source=src.name,
|
||
metrics=extract_metrics,
|
||
raise_on_error=False,
|
||
):
|
||
slice_ok = await to_thread(
|
||
_extract_range_slice,
|
||
src,
|
||
slice_tmp,
|
||
range_start_sec,
|
||
slice_sec,
|
||
)
|
||
|
||
if not slice_ok:
|
||
log.warning("[声呐监控] 切片提取失败,跳过发布:{}", src.name)
|
||
return None
|
||
|
||
if slice_tmp.is_file():
|
||
log.info(
|
||
"[声呐监控] 切片已就绪 | {}({:.2f} MB)→ 后续光流/H.264",
|
||
slice_tmp.name,
|
||
slice_tmp.stat().st_size / (1024 * 1024),
|
||
)
|
||
|
||
transcode_src = slice_tmp
|
||
if settings.biomass_sonar_optical_flow:
|
||
flow_dst = media_root / f"{dst_stem}_optical_flow_{ts}.mp4"
|
||
flow_tmp = media_root / f"{dst_stem}_flow_tmp.mp4"
|
||
flow_tmp.unlink(missing_ok=True)
|
||
with stage(
|
||
f"声呐光流叠加({src.name})",
|
||
pipeline="sonar_watch",
|
||
step="optical_flow",
|
||
run_id=rid,
|
||
source=src.name,
|
||
metrics={"resize": settings.biomass_sonar_optical_flow_resize},
|
||
raise_on_error=False,
|
||
):
|
||
flow_ok = await to_thread(
|
||
run_sonar_optical_flow_overlay, slice_tmp, flow_tmp, settings,
|
||
)
|
||
if flow_ok and flow_tmp.is_file() and flow_tmp.stat().st_size > 0:
|
||
flow_tmp.replace(flow_dst)
|
||
transcode_src = flow_dst
|
||
log.info(
|
||
"[声呐监控] 光流文件已保存:{} ({:.1f} MB)",
|
||
flow_dst.name,
|
||
flow_dst.stat().st_size / (1024 * 1024),
|
||
)
|
||
else:
|
||
flow_tmp.unlink(missing_ok=True)
|
||
log.warning("[声呐监控] 光流叠加失败,对原切片直接转码:{}", src.name)
|
||
|
||
in_codec = _ffprobe_video_codec_name(transcode_src) or "未知"
|
||
log.info(
|
||
"[声呐监控] H.264 发布转码(FishMeasure 同款管线)| 输入编码={} | {} -> {}",
|
||
in_codec,
|
||
transcode_src.name,
|
||
tmp.name,
|
||
)
|
||
|
||
with stage(
|
||
f"声呐 H.264 转码({src.name})",
|
||
pipeline="sonar_watch",
|
||
step="transcode_h264",
|
||
run_id=rid,
|
||
source=src.name,
|
||
raise_on_error=False,
|
||
):
|
||
ok = await to_thread(transcode_src_to_h264_dst, transcode_src, tmp)
|
||
if not (ok and tmp.is_file() and tmp.stat().st_size > 0):
|
||
tmp.unlink(missing_ok=True)
|
||
log.warning("[声呐监控] 转码失败:{}", src.name)
|
||
return None
|
||
|
||
if not _probe_moov_readable(tmp):
|
||
log.warning("[声呐监控] 转码产物不可播放,已丢弃:{}", src.name)
|
||
tmp.unlink(missing_ok=True)
|
||
return None
|
||
|
||
out_codec = _ffprobe_video_codec_name(tmp) or "未知"
|
||
log.info(
|
||
"[声呐监控] H.264 转码产物已验证 | codec_name={} | {}({:.2f} MB)",
|
||
out_codec,
|
||
tmp.name,
|
||
tmp.stat().st_size / (1024 * 1024),
|
||
)
|
||
|
||
dst = media_root / f"{dst_stem}_{ts}.mp4"
|
||
tmp.replace(dst)
|
||
elapsed = time.monotonic() - t0
|
||
dst_mb = dst.stat().st_size / (1024 * 1024)
|
||
|
||
pub_metrics = {
|
||
"src_mb": round(src_size_mb, 2),
|
||
"dst_mb": round(dst_mb, 2),
|
||
"dst_name": dst.name,
|
||
"slice_order": order,
|
||
}
|
||
if order == "sequential" and range_start_sec is not None:
|
||
pub_metrics["range_start_sec"] = round(range_start_sec, 4)
|
||
|
||
logger.bind(
|
||
event=True,
|
||
pipeline="sonar_watch",
|
||
step="publish_total",
|
||
run_id=rid,
|
||
source=src.name,
|
||
status="success",
|
||
duration_ms=round(elapsed * 1000.0, 3),
|
||
metrics=pub_metrics,
|
||
).info(
|
||
"[声呐监控] 发布完成 | 源 {} ({:.1f} MB) → {} ({:.1f} MB) | 耗时={:.1f}s",
|
||
src.name,
|
||
src_size_mb,
|
||
dst.name,
|
||
dst_mb,
|
||
elapsed,
|
||
)
|
||
return dst
|
||
except Exception:
|
||
log.exception("[声呐监控] 发布异常 | {}", src.name)
|
||
tmp.unlink(missing_ok=True)
|
||
return None
|
||
finally:
|
||
slice_tmp.unlink(missing_ok=True)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Background watcher loop (started from lifespan, like action_watch)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
async def run_sonar_video_watch_loop(settings: Settings) -> None:
|
||
"""Poll ``BIOMASS_SONAR_VIDEO_DIR``, follow the **latest** file by mtime.
|
||
|
||
* ``tail``: when ``(resolved_path, size)`` changes and the file is ready, run one
|
||
tail-``N``-second publish.
|
||
* ``sequential``: maintain a cursor from t=0; each poll publishes up to
|
||
``max_chunks_per_poll`` full ``N``-second blocks while ``duration`` allows.
|
||
"""
|
||
global _published_url, _last_sonar_skip_logged_path
|
||
|
||
d = settings.biomass_sonar_video_dir
|
||
if d is None:
|
||
logger.bind(pipeline="sonar_watch").info(
|
||
"[声呐监控] BIOMASS_SONAR_VIDEO_DIR 未设置,声呐监控未启用"
|
||
)
|
||
return
|
||
|
||
poll = max(1.0, settings.biomass_sonar_video_poll_interval)
|
||
slice_order = settings.biomass_sonar_slice_order
|
||
max_chunks = settings.biomass_sonar_max_chunks_per_poll
|
||
|
||
basename = _safe_sonar_media_basename(settings.biomass_sonar_video_media_name)
|
||
dst_stem = Path(basename).stem
|
||
media_root = settings.media_root
|
||
media_root.mkdir(parents=True, exist_ok=True)
|
||
|
||
existing = sorted(
|
||
media_root.glob(f"{dst_stem}_*.mp4"),
|
||
key=lambda p: p.stat().st_mtime,
|
||
)
|
||
existing = [
|
||
p for p in existing
|
||
if not any(
|
||
tag in p.stem
|
||
for tag in ("_tmp", "_flow_tmp", "_optical_flow_", "_tail_slice")
|
||
)
|
||
]
|
||
if existing:
|
||
seed = existing[-1]
|
||
async with _published_lock:
|
||
_published_url = _public_media_url(settings, seed.name)
|
||
logger.bind(pipeline="sonar_watch").info(
|
||
"[声呐监控] 已用历史发布文件回种:{}", _published_url
|
||
)
|
||
|
||
last_published_key: Optional[Tuple[str, int]] = None
|
||
seq_cursor_rp: Optional[str] = None
|
||
seq_next_start: float = 0.0
|
||
|
||
logger.bind(pipeline="sonar_watch").info(
|
||
"[声呐监控] 监控目录已启动 | dir={} | poll={:.0f}s | recursive={} | "
|
||
"slice_order={} | max_chunks_per_poll={}",
|
||
d,
|
||
poll,
|
||
settings.biomass_sonar_video_recursive,
|
||
slice_order,
|
||
max_chunks,
|
||
)
|
||
|
||
while True:
|
||
try:
|
||
if d.is_dir():
|
||
all_videos = iter_mp4(d, settings.biomass_sonar_video_recursive)
|
||
if all_videos:
|
||
latest = max(all_videos, key=lambda p: p.stat().st_mtime)
|
||
try:
|
||
rp = str(latest.resolve())
|
||
sz = latest.stat().st_size
|
||
except OSError:
|
||
continue
|
||
|
||
if sz <= 0:
|
||
continue
|
||
|
||
cycle = logger.bind(pipeline="sonar_watch", source=latest.name)
|
||
|
||
if slice_order == "tail":
|
||
if last_published_key == (rp, sz):
|
||
continue
|
||
ready = await to_thread(_is_ready_to_process, latest)
|
||
if ready:
|
||
cycle.info(
|
||
"[声呐监控] 源文件已就绪,进入发布管线(尾段)| {:.2f} MB | {}",
|
||
sz / (1024 * 1024),
|
||
rp,
|
||
)
|
||
published = await _publish_video(
|
||
latest, media_root, dst_stem, settings,
|
||
)
|
||
if published is not None:
|
||
async with _published_lock:
|
||
_published_url = _public_media_url(
|
||
settings, published.name,
|
||
)
|
||
last_published_key = (rp, sz)
|
||
_last_sonar_skip_logged_path = None
|
||
else:
|
||
cycle.warning(
|
||
"[声呐监控] 发布失败(切片/光流/转码见上方)| {}",
|
||
latest.name,
|
||
)
|
||
else:
|
||
if _last_sonar_skip_logged_path != rp:
|
||
_last_sonar_skip_logged_path = rp
|
||
cycle.info(
|
||
"[声呐监控] 最新文件尚未可处理(录制中或 MP4/MOV 缺 moov),"
|
||
"等待:{}",
|
||
latest.name,
|
||
)
|
||
else:
|
||
cycle.debug(
|
||
"[声呐监控] 仍在等待就绪:{}",
|
||
latest.name,
|
||
)
|
||
else:
|
||
if rp != seq_cursor_rp:
|
||
seq_cursor_rp = rp
|
||
seq_next_start = 0.0
|
||
cycle.info(
|
||
"[声呐监控] 顺序模式:跟踪新文件,游标归零 | {}",
|
||
latest.name,
|
||
)
|
||
|
||
ready = await to_thread(_is_ready_to_process, latest)
|
||
if not ready:
|
||
if _last_sonar_skip_logged_path != rp:
|
||
_last_sonar_skip_logged_path = rp
|
||
cycle.info(
|
||
"[声呐监控] 最新文件尚未可处理(录制中或 MP4/MOV 缺 moov),"
|
||
"等待:{}",
|
||
latest.name,
|
||
)
|
||
else:
|
||
cycle.debug(
|
||
"[声呐监控] 仍在等待就绪:{}",
|
||
latest.name,
|
||
)
|
||
else:
|
||
duration = await to_thread(
|
||
_probe_media_duration_sec, latest,
|
||
)
|
||
slice_sec = float(settings.biomass_sonar_video_slice_sec)
|
||
if duration is None:
|
||
cycle.warning(
|
||
"[声呐监控] 顺序模式:无法读取 duration(ffprobe),"
|
||
"跳过本周期 | {}",
|
||
latest.name,
|
||
)
|
||
elif seq_next_start + slice_sec > duration + 1e-3:
|
||
cycle.debug(
|
||
"[声呐监控] 顺序模式:无完整 {:.0f}s 块可切 "
|
||
"(cursor={:.3f}s duration={:.3f}s)| {}",
|
||
slice_sec,
|
||
seq_next_start,
|
||
duration,
|
||
latest.name,
|
||
)
|
||
else:
|
||
_last_sonar_skip_logged_path = None
|
||
n_done = 0
|
||
while (
|
||
seq_next_start + slice_sec <= duration + 1e-3
|
||
):
|
||
if max_chunks > 0 and n_done >= max_chunks:
|
||
break
|
||
cycle.info(
|
||
"[声呐监控] 顺序发布 | t={:.3f}s | "
|
||
"duration={:.3f}s | {}",
|
||
seq_next_start,
|
||
duration,
|
||
latest.name,
|
||
)
|
||
published = await _publish_video(
|
||
latest,
|
||
media_root,
|
||
dst_stem,
|
||
settings,
|
||
range_start_sec=seq_next_start,
|
||
)
|
||
if published is None:
|
||
cycle.warning(
|
||
"[声呐监控] 顺序发布失败 @ t={:.3f}s | {}",
|
||
seq_next_start,
|
||
latest.name,
|
||
)
|
||
break
|
||
seq_next_start += slice_sec
|
||
n_done += 1
|
||
async with _published_lock:
|
||
_published_url = _public_media_url(
|
||
settings, published.name,
|
||
)
|
||
await asyncio.sleep(0)
|
||
|
||
else:
|
||
logger.bind(pipeline="sonar_watch").debug(
|
||
"[声呐监控] 目录中暂无 .mp4/.mkv/.mov:{}",
|
||
d,
|
||
)
|
||
|
||
except asyncio.CancelledError:
|
||
raise
|
||
except Exception:
|
||
logger.bind(pipeline="sonar_watch").exception(
|
||
"[声呐监控] 监控循环异常"
|
||
)
|
||
|
||
await asyncio.sleep(poll)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GET endpoint helper (called from biomass router, no change to API contract)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
async def get_sonar_video_public_url(
|
||
settings: Settings,
|
||
_client_id: str = DEFAULT_CLIENT_ID,
|
||
) -> str:
|
||
"""Return the URL of the latest successfully published sonar video, or ``""``."""
|
||
async with _published_lock:
|
||
return _published_url
|