613 lines
19 KiB
Python
613 lines
19 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""
|
|||
|
|
从视频读取连续帧,使用 OpenCV Farneback 稠密光流估计运动,
|
|||
|
|
输出带光流伪彩色(方向=色调、速度=亮度)的可视化视频。
|
|||
|
|
|
|||
|
|
声呐类画面背景噪声也会在光流里产生响应;可用 --fish-mask 仅在高亮「鱼团」
|
|||
|
|
区域显示光流,并用连通域面积过滤掉细碎亮点。
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
from __future__ import annotations
|
|||
|
|
import argparse
|
|||
|
|
import logging
|
|||
|
|
import time
|
|||
|
|
from pathlib import Path
|
|||
|
|
from types import SimpleNamespace
|
|||
|
|
from typing import Any
|
|||
|
|
|
|||
|
|
import cv2
|
|||
|
|
import numpy as np
|
|||
|
|
|
|||
|
|
# Module-level logger; the fixed name "optical_flow" (rather than __name__) is
# what handlers/filters must target to capture this module's messages.
_log = logging.getLogger("optical_flow")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def flow_to_bgr(
    flow: np.ndarray,
    mag_clip_percentile: float = 95.0,
    valid_mask: np.ndarray | None = None,
) -> np.ndarray:
    """Convert an (H, W, 2) flow field into a BGR pseudo-colour image.

    Direction is encoded as hue and speed as brightness. Speed is normalised
    by the ``mag_clip_percentile`` percentile of the magnitudes; when
    ``valid_mask`` is given and has at least one non-zero pixel, that
    percentile is computed only over the masked pixels.
    """
    dx = flow[..., 0].astype(np.float32)
    dy = flow[..., 1].astype(np.float32)
    speed = np.sqrt(dx * dx + dy * dy)
    direction = np.arctan2(dy, dx)

    # Pick the population the clipping percentile is taken from: the masked
    # pixels when a usable mask exists, otherwise the whole frame.
    samples = speed
    if valid_mask is not None and valid_mask.size > 0:
        inside = valid_mask > 0
        if np.any(inside):
            samples = speed[inside]

    ceiling = float(np.percentile(samples, mag_clip_percentile))
    if ceiling < 1e-6:
        # Avoid dividing by (almost) zero on static frames.
        ceiling = 1e-6
    brightness = np.clip(speed / ceiling, 0.0, 1.0)

    rows, cols = flow.shape[:2]
    hsv = np.empty((rows, cols, 3), dtype=np.float32)
    # Map the angle range [-pi, pi] onto the hue range [0, 179].
    hsv[..., 0] = (direction + np.pi) / (2.0 * np.pi) * 179.0
    hsv[..., 1] = 255.0
    hsv[..., 2] = brightness * 255.0
    return cv2.cvtColor(hsv.astype(np.uint8), cv2.COLOR_HSV2BGR)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _odd_k(k: int) -> int:
|
|||
|
|
k = max(1, int(k))
|
|||
|
|
return k if k % 2 == 1 else k + 1
|
|||
|
|
|
|||
|
|
|
|||
|
|
def build_fish_mask(
    gray_u8: np.ndarray,
    *,
    bright_percentile: float,
    min_blob_area: int,
    open_k: int,
    close_k: int,
    dilate_k: int,
    blur_sigma: float,
    keep_largest_blobs: int = 0,
) -> np.ndarray:
    """Build a binary mask of bright blobs on a dark (sonar-like) background.

    Pipeline: optional Gaussian blur -> threshold at the ``bright_percentile``
    grey-level percentile -> optional morphological open/close -> drop
    connected components smaller than ``min_blob_area`` -> optional dilation
    -> optionally keep only the ``keep_largest_blobs`` largest components.

    Args:
        gray_u8: Single-channel image to threshold.
        bright_percentile: Percentile of grey levels used as the threshold;
            pixels ``>=`` that value are blob candidates.
        min_blob_area: Minimum component area (pixels) to keep.
        open_k: Opening kernel size (made odd); ``<= 0`` skips the step.
        close_k: Closing kernel size (made odd); ``<= 0`` skips the step.
        dilate_k: Dilation kernel size (made odd); ``<= 0`` skips the step.
        blur_sigma: Gaussian sigma applied before thresholding; values
            ``<= 1e-6`` skip the blur.
        keep_largest_blobs: If ``> 0``, keep only that many largest
            components after dilation.

    Returns:
        A uint8 mask with the same height/width as ``gray_u8`` holding 255
        inside kept blobs and 0 elsewhere.
    """
    g = gray_u8
    if blur_sigma > 1e-6:
        # ~6 sigma wide kernel, forced odd and clamped to [3, 31].
        k = int(round(blur_sigma * 6)) | 1
        k = max(3, min(k, 31))
        g = cv2.GaussianBlur(g, (k, k), blur_sigma)

    g_f32 = g.astype(np.float32)
    thr = float(np.percentile(g_f32, bright_percentile))
    binary = (g_f32 >= thr).astype(np.uint8) * 255

    if open_k > 0:
        ok = _odd_k(open_k)
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (ok, ok))
        binary = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel)
    if close_k > 0:
        ck = _odd_k(close_k)
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (ck, ck))
        binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)

    num, labels, stats, _ = cv2.connectedComponentsWithStats(binary, connectivity=8)
    # Per-label keep table indexed by the label image: one pass over the
    # pixels instead of one full-image comparison per component.
    keep = stats[:, cv2.CC_STAT_AREA] >= min_blob_area
    keep[0] = False  # label 0 is the background
    out = keep[labels].astype(np.uint8) * 255

    if dilate_k > 0:
        dk = _odd_k(dilate_k)
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (dk, dk))
        out = cv2.dilate(out, kernel)

    if keep_largest_blobs > 0:
        # Re-label because dilation may have merged neighbouring blobs.
        num, labels, stats, _ = cv2.connectedComponentsWithStats(out, connectivity=8)
        ranked = sorted(
            ((int(stats[i, cv2.CC_STAT_AREA]), i) for i in range(1, num)),
            reverse=True,
        )
        keep = np.zeros(num, dtype=bool)
        for _, comp_idx in ranked[:keep_largest_blobs]:
            keep[comp_idx] = True
        out = keep[labels].astype(np.uint8) * 255
    return out
|
|||
|
|
|
|||
|
|
|
|||
|
|
def apply_flow_mask(flow_bgr: np.ndarray, mask_u8: np.ndarray) -> np.ndarray:
    """Black out everything outside the mask by scaling all three channels.

    The mask is interpreted as a weight in [0, 1] (value / 255) and is
    broadcast across the channel axis.
    """
    weights = mask_u8.astype(np.float32) / 255.0
    weighted = flow_bgr.astype(np.float32) * weights[..., np.newaxis]
    return np.clip(weighted, 0, 255).astype(np.uint8)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def draw_flow_arrows(
    flow: np.ndarray,
    canvas_bgr: np.ndarray,
    *,
    mask_u8: np.ndarray | None,
    step: int,
    scale: float,
    min_magnitude: float,
    color: tuple[int, int, int],
    thickness: int,
) -> np.ndarray:
    """Draw the dense flow as arrows sampled on a regular grid.

    A copy of ``canvas_bgr`` is drawn on and returned; the input canvas is
    left untouched. Grid points where ``mask_u8`` is zero, or where the flow
    magnitude is below ``min_magnitude``, get no arrow. Arrow tips are
    clamped to the image bounds.
    """
    canvas = canvas_bgr.copy()
    rows, cols = flow.shape[:2]
    stride = max(2, int(step))
    line_width = max(1, int(thickness))
    # Compare squared magnitudes to avoid a sqrt per grid point.
    min_sq = float(min_magnitude) * float(min_magnitude)
    first = stride // 2

    for row in range(first, rows, stride):
        for col in range(first, cols, stride):
            if mask_u8 is not None and mask_u8[row, col] == 0:
                continue
            dx = float(flow[row, col, 0])
            dy = float(flow[row, col, 1])
            if dx * dx + dy * dy < min_sq:
                continue
            tip_x = max(0, min(cols - 1, int(round(col + dx * scale))))
            tip_y = max(0, min(rows - 1, int(round(row + dy * scale))))
            cv2.arrowedLine(
                canvas,
                (col, row),
                (tip_x, tip_y),
                color=color,
                thickness=line_width,
                tipLength=0.3,
            )
    return canvas
|
|||
|
|
|
|||
|
|
|
|||
|
|
def build_output_frame(
    frame_bgr: np.ndarray,
    flow_bgr: np.ndarray,
    mode: str,
) -> np.ndarray:
    """Combine the source frame and the flow layer into one output frame.

    ``"sidebyside"`` places the two images next to each other horizontally;
    ``"overlay"`` blends them (55% frame, 45% flow). Any other mode raises
    ``ValueError``.
    """
    if mode == "overlay":
        return cv2.addWeighted(frame_bgr, 0.55, flow_bgr, 0.45, 0.0)
    if mode != "sidebyside":
        raise ValueError(f"未知 mode: {mode}")
    return np.hstack([frame_bgr, flow_bgr])
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _try_gst_nvenc_writer(
    path: Path,
    fps: float,
    size: tuple[int, int],
) -> cv2.VideoWriter | None:
    """Try to create a cv2.VideoWriter backed by GStreamer + Jetson NVENC.

    This is a best-effort probe: it never raises. Any failure (OpenCV built
    without GStreamer, missing NVENC elements, bad arguments) is logged at
    debug level and reported as ``None`` so the caller can fall back to a
    software encoder.

    Args:
        path: Output file location handed to ``filesink``.
        fps: Output frame rate.
        size: Output frame size as ``(width, height)``.

    Returns:
        An opened writer, or ``None`` if the hardware pipeline is unavailable.
    """
    try:
        if not hasattr(cv2, "CAP_GSTREAMER"):
            return None
        w, h = size
        # Escape double quotes so the path cannot terminate the quoted
        # ``location`` property early.
        loc = str(path).replace('"', '\\"')
        gst_pipe = (
            f'appsrc ! videoconvert ! video/x-raw,format=BGRx ! '
            f'nvvidconv ! video/x-raw(memory:NVMM) ! '
            f'nvv4l2h264enc bitrate=4000000 ! h264parse ! '
            f'mp4mux ! filesink location="{loc}"'
        )
        writer = cv2.VideoWriter(gst_pipe, cv2.CAP_GSTREAMER, 0, fps, (w, h))
        if writer.isOpened():
            _log.info("[optical-flow] using GStreamer NVENC writer for %s", path.name)
            return writer
        writer.release()
    except Exception:
        # Deliberately broad: the hardware path is optional. Keep the reason
        # visible for debugging instead of discarding it silently.
        _log.debug(
            "[optical-flow] GStreamer NVENC writer unavailable for %s",
            path,
            exc_info=True,
        )
    return None
|
|||
|
|
|
|||
|
|
|
|||
|
|
def open_writer(
    path: Path,
    fps: float,
    size: tuple[int, int],
    fourcc_str: str,
) -> cv2.VideoWriter:
    """Open a video writer, preferring the GStreamer/NVENC hardware path.

    Args:
        path: Output video file.
        fps: Output frame rate.
        size: Output frame size as ``(width, height)``.
        fourcc_str: Four-character codec code used by the software fallback
            (ignored when the hardware writer opens successfully).

    Returns:
        An opened ``cv2.VideoWriter``.

    Raises:
        RuntimeError: If ``fourcc_str`` is not exactly four characters, or
            the fallback writer cannot be opened.
    """
    gst = _try_gst_nvenc_writer(path, fps, size)
    if gst is not None:
        return gst

    # VideoWriter_fourcc takes exactly four characters; fail with a readable
    # error instead of the TypeError raised by unpacking a wrong-length string.
    if not isinstance(fourcc_str, str) or len(fourcc_str) != 4:
        raise RuntimeError(f"无效的 fourcc: {fourcc_str!r}(必须恰好为 4 个字符)。")

    fourcc = cv2.VideoWriter_fourcc(*fourcc_str)
    writer = cv2.VideoWriter(str(path), fourcc, fps, size)
    if not writer.isOpened():
        writer.release()
        raise RuntimeError(
            f"无法创建输出视频: {path}(codec={fourcc_str})。"
            "可尝试 --fourcc avc1 或 XVID 并配合 .avi 扩展名。"
        )
    return writer
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _validate_flow_args(args: Any) -> None:
|
|||
|
|
if not isinstance(args.resize, (int, float)) or float(args.resize) <= 0:
|
|||
|
|
raise ValueError("--resize 必须为正数")
|
|||
|
|
if not 0 < float(args.bright_percentile) < 100:
|
|||
|
|
raise ValueError("--bright-percentile 应在 (0, 100) 内")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _compose_flow_layer(flow: np.ndarray, gray: np.ndarray, args: Any) -> np.ndarray:
    """Render the flow visualisation layer (BGR uint8) for one frame pair.

    Depending on ``args.viz_style`` the layer contains the HSV pseudo-colour
    image, grid arrows, or both. When ``args.fish_mask`` is set, both parts
    are restricted to the mask built from ``gray``; an empty mask therefore
    yields an empty (black) layer for colours and arrows alike.
    """
    h, w = flow.shape[:2]

    mask: np.ndarray | None = None
    if args.fish_mask:
        mask = build_fish_mask(
            gray,
            bright_percentile=args.bright_percentile,
            min_blob_area=args.min_blob_area,
            open_k=args.mask_open,
            close_k=args.mask_close,
            dilate_k=args.mask_dilate,
            blur_sigma=args.mask_blur,
            keep_largest_blobs=args.keep_largest_blobs,
        )

    hsv_bgr = np.zeros((h, w, 3), dtype=np.uint8)
    if args.viz_style in ("hsv", "hsv_arrows"):
        if mask is None:
            hsv_bgr = flow_to_bgr(flow)
        elif np.count_nonzero(mask) > 0:
            hsv_bgr = apply_flow_mask(flow_to_bgr(flow, valid_mask=mask), mask)
        # else: masking is on but nothing was detected -> keep the layer black.

    arrow_bgr = np.zeros((h, w, 3), dtype=np.uint8)
    if args.viz_style in ("arrows", "hsv_arrows"):
        # Always pass the mask when masking is enabled, even if it is empty:
        # passing None for an empty mask would draw arrows over the whole
        # frame, contradicting the blanked HSV layer.
        arrow_bgr = draw_flow_arrows(
            flow,
            arrow_bgr,
            mask_u8=mask,
            step=args.arrow_step,
            scale=args.arrow_scale,
            min_magnitude=args.arrow_threshold,
            color=(0, 255, 255),
            thickness=args.arrow_thickness,
        )

    return cv2.addWeighted(hsv_bgr, 0.8, arrow_bgr, 1.0, 0.0)


def _run_flow_core(args: Any) -> None:
    """Compute dense optical flow for a video and write the visualisation.

    ``args`` must carry the same fields as the CLI namespace, including the
    ``input`` and ``output`` paths. The first output frame pairs the first
    source frame with an empty flow layer, so the output has as many frames
    as were read from the input.

    Raises:
        FileNotFoundError: If the input file does not exist.
        RuntimeError: If the input cannot be opened or read, or the output
            writer cannot be created.
    """
    in_path = Path(args.input)
    out_path = Path(args.output)
    if not in_path.is_file():
        raise FileNotFoundError(f"找不到输入视频: {in_path}")

    src_size_mb = in_path.stat().st_size / (1024 * 1024)
    _log.info("[optical-flow] start: %s (%.1f MB), mode=%s, resize=%.2f",
              in_path.name, src_size_mb, args.mode, args.resize)
    t0 = time.monotonic()

    cap = cv2.VideoCapture(str(in_path))
    if not cap.isOpened():
        cap.release()
        raise RuntimeError(f"无法打开视频: {in_path}")

    writer: cv2.VideoWriter | None = None
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            # Covers 0, negative and NaN readings from the capture backend.
            fps = 30.0
        w_in = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h_in = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        _log.info("[optical-flow] src %dx%d @ %.1f fps", w_in, h_in, fps)
        scale = float(args.resize)

        w = max(1, int(round(w_in * scale)))
        h = max(1, int(round(h_in * scale)))

        if args.mode == "sidebyside":
            out_w, out_h = w * 2, h
        else:
            out_w, out_h = w, h

        writer = open_writer(out_path, fps, (out_w, out_h), args.fourcc)

        ret, prev_bgr = cap.read()
        if not ret:
            raise RuntimeError("视频为空或无法读取首帧")

        if scale != 1.0:
            prev_bgr = cv2.resize(prev_bgr, (w, h), interpolation=cv2.INTER_AREA)
        prev_gray = cv2.cvtColor(prev_bgr, cv2.COLOR_BGR2GRAY)

        # No flow exists for the very first frame; emit it with a black layer.
        black_flow = np.zeros((h, w, 3), dtype=np.uint8)
        writer.write(build_output_frame(prev_bgr, black_flow, args.mode))
        written = 1
        frame_idx = 0

        fb_flags = 0
        progress_log = bool(getattr(args, "progress_log", False))
        while True:
            ret, frame_bgr = cap.read()
            if not ret:
                break
            if scale != 1.0:
                frame_bgr = cv2.resize(frame_bgr, (w, h), interpolation=cv2.INTER_AREA)
            gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)

            flow = cv2.calcOpticalFlowFarneback(
                prev_gray,
                gray,
                None,
                args.pyr_scale,
                args.levels,
                args.winsize,
                args.iterations,
                args.poly_n,
                args.poly_sigma,
                fb_flags,
            )

            flow_bgr = _compose_flow_layer(flow, gray, args)
            writer.write(build_output_frame(frame_bgr, flow_bgr, args.mode))
            written += 1

            prev_gray = gray
            frame_idx += 1
            if progress_log and frame_idx % 30 == 0:
                print(f"已处理 {frame_idx} 帧光流…", flush=True)

        elapsed = time.monotonic() - t0
        _log.info("[optical-flow] done: %d frames in %.1fs (%.1f fps) -> %s",
                  written, elapsed, written / max(elapsed, 0.001), out_path.name)
        if progress_log:
            print(f"完成,共写入 {written} 帧({elapsed:.1f}s),保存至: {out_path}", flush=True)
    finally:
        cap.release()
        if writer is not None:
            writer.release()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def run_optical_flow_video(
    input_path: Path,
    output_path: Path,
    *,
    mode: str = "overlay",
    viz_style: str = "hsv",
    resize: float = 1.0,
    fourcc: str = "mp4v",
    pyr_scale: float = 0.5,
    levels: int = 3,
    winsize: int = 15,
    iterations: int = 3,
    poly_n: int = 5,
    poly_sigma: float = 1.2,
    fish_mask: bool = True,
    bright_percentile: float = 97.5,
    min_blob_area: int = 500,
    mask_open: int = 3,
    mask_close: int = 11,
    mask_dilate: int = 5,
    mask_blur: float = 1.0,
    keep_largest_blobs: int = 0,
    arrow_step: int = 12,
    arrow_scale: float = 2.0,
    arrow_threshold: float = 0.8,
    arrow_thickness: int = 1,
    progress_log: bool = False,
) -> bool:
    """Programmatic entry point (e.g. for fish_api): render the flow video.

    The keyword arguments mirror the CLI options. This function does not
    raise for processing failures; they are logged with a traceback instead.

    Args:
        input_path: Source video (``Path`` or path string).
        output_path: Destination video (``Path`` or path string).

    Returns:
        True if processing finished and ``output_path`` exists with a
        non-zero size, False otherwise.
    """
    try:
        # Coerce here so path strings work and a failure is reported as False
        # rather than as an AttributeError after the video has been written.
        out_path = Path(output_path)
        args = SimpleNamespace(
            input=Path(input_path),
            output=out_path,
            mode=mode,
            viz_style=viz_style,
            resize=resize,
            fourcc=fourcc,
            pyr_scale=pyr_scale,
            levels=levels,
            winsize=winsize,
            iterations=iterations,
            poly_n=poly_n,
            poly_sigma=poly_sigma,
            fish_mask=fish_mask,
            bright_percentile=bright_percentile,
            min_blob_area=min_blob_area,
            mask_open=mask_open,
            mask_close=mask_close,
            mask_dilate=mask_dilate,
            mask_blur=mask_blur,
            keep_largest_blobs=keep_largest_blobs,
            arrow_step=arrow_step,
            arrow_scale=arrow_scale,
            arrow_threshold=arrow_threshold,
            arrow_thickness=arrow_thickness,
            progress_log=progress_log,
        )
        _validate_flow_args(args)
        _run_flow_core(args)
        return out_path.is_file() and out_path.stat().st_size > 0
    except Exception:
        # API boundary: callers only get a bool, so keep the traceback in the log.
        _log.exception("[optical-flow] run_optical_flow_video failed: %s -> %s",
                       input_path, output_path)
        return False
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _build_cli_parser(default_in: Path, default_out: Path) -> argparse.ArgumentParser:
    """Create the command-line parser; options are registered in help order."""
    parser = argparse.ArgumentParser(description="视频稠密光流可视化(OpenCV Farneback)")
    add = parser.add_argument

    # Input / output and overall presentation.
    add("--input", "-i", type=Path, default=default_in,
        help=f"输入视频路径(默认: {default_in.name})")
    add("--output", "-o", type=Path, default=default_out,
        help=f"输出视频路径(默认: {default_out.name})")
    add("--mode", choices=("sidebyside", "overlay"), default="sidebyside",
        help="sidebyside: 原图|光流;overlay: 原图与光流半透明叠加")
    add("--viz-style", choices=("hsv", "arrows", "hsv_arrows"), default="hsv",
        help="光流可视化风格:hsv 伪彩色、arrows 箭头、hsv_arrows 组合")
    add("--resize", type=float, default=1.0, metavar="SCALE",
        help="处理前将帧宽高乘以该比例以加速(例如 0.5)")
    add("--fourcc", default="mp4v",
        help="VideoWriter 四字符编码,常见: mp4v, avc1, XVID")

    # Farneback parameters.
    add("--pyr-scale", type=float, default=0.5,
        help="Farneback 金字塔缩放(OpenCV pyr_scale)")
    add("--levels", type=int, default=3, help="Farneback 金字塔层数")
    add("--winsize", type=int, default=15, help="Farneback 窗口大小")
    add("--iterations", type=int, default=3, help="Farneback 每层迭代次数")
    add("--poly-n", type=int, default=5, help="Farneback 像素邻域大小(poly_n)")
    add("--poly-sigma", type=float, default=1.2,
        help="Farneback 高斯标准差(poly_sigma)")

    # Fish-blob mask: both switches write to the same destination.
    add("--fish-mask", dest="fish_mask", action="store_true", default=True,
        help="仅在高亮鱼团区域显示光流(默认开启,抑制背景噪声)")
    add("--no-fish-mask", dest="fish_mask", action="store_false",
        help="关闭鱼团掩膜,整幅画面显示光流(与旧行为一致)")
    add("--bright-percentile", type=float, default=97.5, metavar="P",
        help="灰度阈值:取 >= 该分位数的像素作为「亮斑」候选(越高越只保留最亮区域)")
    add("--min-blob-area", type=int, default=500, metavar="PX",
        help="连通域最小面积(像素),小于此面积的亮斑视为噪声并丢弃")
    add("--mask-open", type=int, default=3,
        help="形态学开运算核大小(奇数,0 表示跳过),去小噪点")
    add("--mask-close", type=int, default=11,
        help="形态学闭运算核大小(奇数,0 表示跳过),连接同一鱼团碎片")
    add("--mask-dilate", type=int, default=5,
        help="掩膜膨胀核大小(奇数,0 表示不膨胀),略扩大显示区域包住边缘运动")
    add("--mask-blur", type=float, default=1.0,
        help="阈值前高斯模糊 sigma(0 表示不模糊),可平滑细碎纹理")
    add("--keep-largest-blobs", type=int, default=0, metavar="N",
        help="在面积过滤后仅保留面积最大的 N 个连通域(0 表示不限制,适合多鱼场景)")

    # Arrow rendering.
    add("--arrow-step", type=int, default=12, help="箭头网格步长(像素),越小越密")
    add("--arrow-scale", type=float, default=2.0, help="箭头长度缩放系数")
    add("--arrow-threshold", type=float, default=0.8,
        help="绘制箭头的最小速度阈值(像素/帧)")
    add("--arrow-thickness", type=int, default=1, help="箭头线宽")
    return parser


def main() -> None:
    """CLI entry point: parse options, validate them and run the pipeline.

    Expected failures (missing input, invalid option values, video I/O
    errors) terminate the process through ``SystemExit`` carrying the error
    message instead of a traceback.
    """
    here = Path(__file__).resolve().parent
    parser = _build_cli_parser(
        default_in=here / "fish_echo.MP4",
        default_out=here / "fish_echo_flow_vis.mp4",
    )
    args = parser.parse_args()

    if not args.input.is_file():
        raise SystemExit(f"找不到输入视频: {args.input}")

    try:
        _validate_flow_args(args)
    except ValueError as e:
        raise SystemExit(str(e)) from e

    # The CLI always reports progress on stdout.
    args.progress_log = True
    try:
        _run_flow_core(args)
    except (FileNotFoundError, RuntimeError) as e:
        raise SystemExit(str(e)) from e
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|