#!/usr/bin/env python3 """ 从视频读取连续帧,使用 OpenCV Farneback 稠密光流估计运动, 输出带光流伪彩色(方向=色调、速度=亮度)的可视化视频。 声呐类画面背景噪声也会在光流里产生响应;可用 --fish-mask 仅在高亮「鱼团」 区域显示光流,并用连通域面积过滤掉细碎亮点。 """ from __future__ import annotations import argparse import logging import time from pathlib import Path from types import SimpleNamespace from typing import Any import cv2 import numpy as np _log = logging.getLogger("optical_flow") def flow_to_bgr( flow: np.ndarray, mag_clip_percentile: float = 95.0, valid_mask: np.ndarray | None = None, ) -> np.ndarray: """将 (H,W,2) 光流转为 BGR 伪彩色图。若给定 valid_mask,幅值分位数仅在掩膜内统计。""" fx = flow[..., 0].astype(np.float32) fy = flow[..., 1].astype(np.float32) mag = np.sqrt(fx * fx + fy * fy) ang = np.arctan2(fy, fx) if valid_mask is not None and valid_mask.size > 0: vm = valid_mask > 0 if np.any(vm): clip = float(np.percentile(mag[vm], mag_clip_percentile)) else: clip = float(np.percentile(mag, mag_clip_percentile)) else: clip = float(np.percentile(mag, mag_clip_percentile)) if clip < 1e-6: clip = 1e-6 mag_norm = np.clip(mag / clip, 0.0, 1.0) h, w = flow.shape[:2] hsv = np.zeros((h, w, 3), dtype=np.float32) hsv[..., 0] = (ang + np.pi) / (2.0 * np.pi) * 179.0 hsv[..., 1] = 255.0 hsv[..., 2] = mag_norm * 255.0 hsv_u8 = hsv.astype(np.uint8) return cv2.cvtColor(hsv_u8, cv2.COLOR_HSV2BGR) def _odd_k(k: int) -> int: k = max(1, int(k)) return k if k % 2 == 1 else k + 1 def build_fish_mask( gray_u8: np.ndarray, *, bright_percentile: float, min_blob_area: int, open_k: int, close_k: int, dilate_k: int, blur_sigma: float, keep_largest_blobs: int = 0, ) -> np.ndarray: """ 声呐/暗背景高亮目标:取灰度高分位作为阈值,形态学去噪,保留足够大的连通域。 """ g = gray_u8 if blur_sigma > 1e-6: k = _odd_k(int(round(blur_sigma * 6)) | 1) k = max(3, min(k, 31)) g = cv2.GaussianBlur(g, (k, k), blur_sigma) thr = float(np.percentile(g.astype(np.float32), bright_percentile)) binary = (g.astype(np.float32) >= thr).astype(np.uint8) * 255 if open_k > 0: ok = _odd_k(open_k) kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (ok, ok)) binary = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel) if close_k > 0: ck = _odd_k(close_k) kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (ck, ck)) binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel) num, labels, stats, _ = cv2.connectedComponentsWithStats(binary, connectivity=8) out = np.zeros_like(binary) for i in range(1, num): if stats[i, cv2.CC_STAT_AREA] >= min_blob_area: out[labels == i] = 255 if dilate_k > 0: dk = _odd_k(dilate_k) kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (dk, dk)) out = cv2.dilate(out, kernel) if keep_largest_blobs > 0: num, labels, stats, _ = cv2.connectedComponentsWithStats(out, connectivity=8) ranked: list[tuple[int, int]] = [] for i in range(1, num): ranked.append((int(stats[i, cv2.CC_STAT_AREA]), i)) ranked.sort(reverse=True) trimmed = np.zeros_like(out) for _, comp_idx in ranked[:keep_largest_blobs]: trimmed[labels == comp_idx] = 255 out = trimmed return out def apply_flow_mask(flow_bgr: np.ndarray, mask_u8: np.ndarray) -> np.ndarray: """掩膜外置黑;三通道与 mask 相乘。""" m = (mask_u8.astype(np.float32) / 255.0)[..., np.newaxis] return np.clip(flow_bgr.astype(np.float32) * m, 0, 255).astype(np.uint8) def draw_flow_arrows( flow: np.ndarray, canvas_bgr: np.ndarray, *, mask_u8: np.ndarray | None, step: int, scale: float, min_magnitude: float, color: tuple[int, int, int], thickness: int, ) -> np.ndarray: """ 将稠密光流按网格采样绘制为箭头,便于精确观察局部方向与速度。 """ out = canvas_bgr.copy() h, w = flow.shape[:2] s = max(2, int(step)) t = max(1, int(thickness)) mag2_thr = float(min_magnitude) * float(min_magnitude) for y in range(s // 2, h, s): for x in range(s // 2, w, s): if mask_u8 is not None and mask_u8[y, x] == 0: continue fx = float(flow[y, x, 0]) fy = float(flow[y, x, 1]) if fx * fx + fy * fy < mag2_thr: continue x2 = int(round(x + fx * scale)) y2 = int(round(y + fy * scale)) x2 = max(0, min(w - 1, x2)) y2 = max(0, min(h - 1, y2)) cv2.arrowedLine( out, (x, y), (x2, y2), color=color, thickness=t, tipLength=0.3, ) return out def build_output_frame( frame_bgr: np.ndarray, flow_bgr: np.ndarray, mode: str, ) -> np.ndarray: if mode == "sidebyside": return np.hstack([frame_bgr, flow_bgr]) if mode == "overlay": return cv2.addWeighted(frame_bgr, 0.55, flow_bgr, 0.45, 0.0) raise ValueError(f"未知 mode: {mode}") def _try_gst_nvenc_writer( path: Path, fps: float, size: tuple[int, int], ) -> cv2.VideoWriter | None: """Try to create a cv2.VideoWriter backed by GStreamer + Jetson NVENC. Returns None if GStreamer or the hardware encoder is unavailable. """ try: if not hasattr(cv2, "CAP_GSTREAMER"): return None w, h = size loc = str(path).replace('"', '\\"') gst_pipe = ( f'appsrc ! videoconvert ! video/x-raw,format=BGRx ! ' f'nvvidconv ! video/x-raw(memory:NVMM) ! ' f'nvv4l2h264enc bitrate=4000000 ! h264parse ! ' f'mp4mux ! filesink location="{loc}"' ) writer = cv2.VideoWriter(gst_pipe, cv2.CAP_GSTREAMER, 0, fps, (w, h)) if writer.isOpened(): _log.info("[optical-flow] using GStreamer NVENC writer for %s", path.name) return writer writer.release() except Exception: pass return None def open_writer( path: Path, fps: float, size: tuple[int, int], fourcc_str: str, ) -> cv2.VideoWriter: gst = _try_gst_nvenc_writer(path, fps, size) if gst is not None: return gst fourcc = cv2.VideoWriter_fourcc(*fourcc_str) writer = cv2.VideoWriter(str(path), fourcc, fps, size) if not writer.isOpened(): raise RuntimeError( f"无法创建输出视频: {path}(codec={fourcc_str})。" "可尝试 --fourcc avc1 或 XVID 并配合 .avi 扩展名。" ) return writer def _validate_flow_args(args: Any) -> None: if not isinstance(args.resize, (int, float)) or float(args.resize) <= 0: raise ValueError("--resize 必须为正数") if not 0 < float(args.bright_percentile) < 100: raise ValueError("--bright-percentile 应在 (0, 100) 内") def _run_flow_core(args: Any) -> None: """稠密光流 + 可视化;``args`` 需含与 CLI 相同的字段(含 ``input`` / ``output`` 路径)。""" in_path = Path(args.input) out_path = Path(args.output) if not in_path.is_file(): raise FileNotFoundError(f"找不到输入视频: {in_path}") src_size_mb = in_path.stat().st_size / (1024 * 1024) _log.info("[optical-flow] start: %s (%.1f MB), mode=%s, resize=%.2f", in_path.name, src_size_mb, args.mode, args.resize) t0 = time.monotonic() cap = cv2.VideoCapture(str(in_path)) if not cap.isOpened(): raise RuntimeError(f"无法打开视频: {in_path}") writer: cv2.VideoWriter | None = None try: fps = cap.get(cv2.CAP_PROP_FPS) or 30.0 w_in = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) h_in = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) _log.info("[optical-flow] src %dx%d @ %.1f fps", w_in, h_in, fps) scale = float(args.resize) w = max(1, int(round(w_in * scale))) h = max(1, int(round(h_in * scale))) if args.mode == "sidebyside": out_w, out_h = w * 2, h else: out_w, out_h = w, h writer = open_writer(out_path, fps, (out_w, out_h), args.fourcc) ret, prev_bgr = cap.read() if not ret: raise RuntimeError("视频为空或无法读取首帧") if scale != 1.0: prev_bgr = cv2.resize(prev_bgr, (w, h), interpolation=cv2.INTER_AREA) prev_gray = cv2.cvtColor(prev_bgr, cv2.COLOR_BGR2GRAY) black_flow = np.zeros((h, w, 3), dtype=np.uint8) first_out = build_output_frame(prev_bgr, black_flow, args.mode) writer.write(first_out) written = 1 frame_idx = 0 fb_flags = 0 progress_log = bool(getattr(args, "progress_log", False)) while True: ret, frame_bgr = cap.read() if not ret: break if scale != 1.0: frame_bgr = cv2.resize(frame_bgr, (w, h), interpolation=cv2.INTER_AREA) gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY) flow = cv2.calcOpticalFlowFarneback( prev_gray, gray, None, args.pyr_scale, args.levels, args.winsize, args.iterations, args.poly_n, args.poly_sigma, fb_flags, ) mask: np.ndarray | None = None if args.fish_mask: mask = build_fish_mask( gray, bright_percentile=args.bright_percentile, min_blob_area=args.min_blob_area, open_k=args.mask_open, close_k=args.mask_close, dilate_k=args.mask_dilate, blur_sigma=args.mask_blur, keep_largest_blobs=args.keep_largest_blobs, ) has_mask = np.count_nonzero(mask) > 0 else: has_mask = False if args.viz_style in ("hsv", "hsv_arrows"): if args.fish_mask and has_mask and mask is not None: hsv_bgr = flow_to_bgr(flow, valid_mask=mask) hsv_bgr = apply_flow_mask(hsv_bgr, mask) elif args.fish_mask: hsv_bgr = np.zeros((h, w, 3), dtype=np.uint8) else: hsv_bgr = flow_to_bgr(flow) else: hsv_bgr = np.zeros((h, w, 3), dtype=np.uint8) if args.viz_style in ("arrows", "hsv_arrows"): arrow_base = np.zeros((h, w, 3), dtype=np.uint8) arrow_bgr = draw_flow_arrows( flow, arrow_base, mask_u8=mask if (args.fish_mask and has_mask) else None, step=args.arrow_step, scale=args.arrow_scale, min_magnitude=args.arrow_threshold, color=(0, 255, 255), thickness=args.arrow_thickness, ) else: arrow_bgr = np.zeros((h, w, 3), dtype=np.uint8) flow_bgr = cv2.addWeighted(hsv_bgr, 0.8, arrow_bgr, 1.0, 0.0) out_frame = build_output_frame(frame_bgr, flow_bgr, args.mode) writer.write(out_frame) written += 1 prev_gray = gray frame_idx += 1 if progress_log and frame_idx % 30 == 0: print(f"已处理 {frame_idx} 帧光流…", flush=True) elapsed = time.monotonic() - t0 _log.info("[optical-flow] done: %d frames in %.1fs (%.1f fps) -> %s", written, elapsed, written / max(elapsed, 0.001), out_path.name) if progress_log: print(f"完成,共写入 {written} 帧({elapsed:.1f}s),保存至: {out_path}", flush=True) finally: cap.release() if writer is not None: writer.release() def run_optical_flow_video( input_path: Path, output_path: Path, *, mode: str = "overlay", viz_style: str = "hsv", resize: float = 1.0, fourcc: str = "mp4v", pyr_scale: float = 0.5, levels: int = 3, winsize: int = 15, iterations: int = 3, poly_n: int = 5, poly_sigma: float = 1.2, fish_mask: bool = True, bright_percentile: float = 97.5, min_blob_area: int = 500, mask_open: int = 3, mask_close: int = 11, mask_dilate: int = 5, mask_blur: float = 1.0, keep_largest_blobs: int = 0, arrow_step: int = 12, arrow_scale: float = 2.0, arrow_threshold: float = 0.8, arrow_thickness: int = 1, progress_log: bool = False, ) -> bool: """供 fish_api 等调用:成功写出 ``output_path`` 返回 True,否则 False。""" args = SimpleNamespace( input=input_path, output=output_path, mode=mode, viz_style=viz_style, resize=resize, fourcc=fourcc, pyr_scale=pyr_scale, levels=levels, winsize=winsize, iterations=iterations, poly_n=poly_n, poly_sigma=poly_sigma, fish_mask=fish_mask, bright_percentile=bright_percentile, min_blob_area=min_blob_area, mask_open=mask_open, mask_close=mask_close, mask_dilate=mask_dilate, mask_blur=mask_blur, keep_largest_blobs=keep_largest_blobs, arrow_step=arrow_step, arrow_scale=arrow_scale, arrow_threshold=arrow_threshold, arrow_thickness=arrow_thickness, progress_log=progress_log, ) try: _validate_flow_args(args) _run_flow_core(args) except Exception: _log.exception("[optical-flow] run_optical_flow_video failed: %s -> %s", input_path, output_path) return False return output_path.is_file() and output_path.stat().st_size > 0 def main() -> None: script_dir = Path(__file__).resolve().parent default_in = script_dir / "fish_echo.MP4" default_out = script_dir / "fish_echo_flow_vis.mp4" p = argparse.ArgumentParser(description="视频稠密光流可视化(OpenCV Farneback)") p.add_argument( "--input", "-i", type=Path, default=default_in, help=f"输入视频路径(默认: {default_in.name})", ) p.add_argument( "--output", "-o", type=Path, default=default_out, help=f"输出视频路径(默认: {default_out.name})", ) p.add_argument( "--mode", choices=("sidebyside", "overlay"), default="sidebyside", help="sidebyside: 原图|光流;overlay: 原图与光流半透明叠加", ) p.add_argument( "--viz-style", choices=("hsv", "arrows", "hsv_arrows"), default="hsv", help="光流可视化风格:hsv 伪彩色、arrows 箭头、hsv_arrows 组合", ) p.add_argument( "--resize", type=float, default=1.0, metavar="SCALE", help="处理前将帧宽高乘以该比例以加速(例如 0.5)", ) p.add_argument( "--fourcc", default="mp4v", help="VideoWriter 四字符编码,常见: mp4v, avc1, XVID", ) p.add_argument( "--pyr-scale", type=float, default=0.5, help="Farneback 金字塔缩放(OpenCV pyr_scale)", ) p.add_argument( "--levels", type=int, default=3, help="Farneback 金字塔层数", ) p.add_argument( "--winsize", type=int, default=15, help="Farneback 窗口大小", ) p.add_argument( "--iterations", type=int, default=3, help="Farneback 每层迭代次数", ) p.add_argument( "--poly-n", type=int, default=5, help="Farneback 像素邻域大小(poly_n)", ) p.add_argument( "--poly-sigma", type=float, default=1.2, help="Farneback 高斯标准差(poly_sigma)", ) p.add_argument( "--fish-mask", dest="fish_mask", action="store_true", default=True, help="仅在高亮鱼团区域显示光流(默认开启,抑制背景噪声)", ) p.add_argument( "--no-fish-mask", dest="fish_mask", action="store_false", help="关闭鱼团掩膜,整幅画面显示光流(与旧行为一致)", ) p.add_argument( "--bright-percentile", type=float, default=97.5, metavar="P", help="灰度阈值:取 >= 该分位数的像素作为「亮斑」候选(越高越只保留最亮区域)", ) p.add_argument( "--min-blob-area", type=int, default=500, metavar="PX", help="连通域最小面积(像素),小于此面积的亮斑视为噪声并丢弃", ) p.add_argument( "--mask-open", type=int, default=3, help="形态学开运算核大小(奇数,0 表示跳过),去小噪点", ) p.add_argument( "--mask-close", type=int, default=11, help="形态学闭运算核大小(奇数,0 表示跳过),连接同一鱼团碎片", ) p.add_argument( "--mask-dilate", type=int, default=5, help="掩膜膨胀核大小(奇数,0 表示不膨胀),略扩大显示区域包住边缘运动", ) p.add_argument( "--mask-blur", type=float, default=1.0, help="阈值前高斯模糊 sigma(0 表示不模糊),可平滑细碎纹理", ) p.add_argument( "--keep-largest-blobs", type=int, default=0, metavar="N", help="在面积过滤后仅保留面积最大的 N 个连通域(0 表示不限制,适合多鱼场景)", ) p.add_argument( "--arrow-step", type=int, default=12, help="箭头网格步长(像素),越小越密", ) p.add_argument( "--arrow-scale", type=float, default=2.0, help="箭头长度缩放系数", ) p.add_argument( "--arrow-threshold", type=float, default=0.8, help="绘制箭头的最小速度阈值(像素/帧)", ) p.add_argument( "--arrow-thickness", type=int, default=1, help="箭头线宽", ) args = p.parse_args() if not args.input.is_file(): raise SystemExit(f"找不到输入视频: {args.input}") try: _validate_flow_args(args) except ValueError as e: raise SystemExit(str(e)) from e args.progress_log = True try: _run_flow_core(args) except FileNotFoundError as e: raise SystemExit(str(e)) from e except RuntimeError as e: raise SystemExit(str(e)) from e if __name__ == "__main__": main()