Files
FishServer/FishMeasure/optical_flow/visualize_optical_flow.py

613 lines
19 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
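Read consecutive frames from a video and estimate motion with OpenCV Farneback
dense optical flow, then write a visualization video that shows the flow as
pseudo-color (direction = hue, speed = brightness).
In sonar-like footage the background noise also produces a flow response; use
--fish-mask to show flow only inside bright fish-school regions and to filter
out small bright specks by connected-component area.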
"""
from __future__ import annotations
import argparse
import logging
import time
from pathlib import Path
from types import SimpleNamespace
from typing import Any
import cv2
import numpy as np
# Module-level logger; every log record from this file is emitted under the
# "optical_flow" logger name.
_log = logging.getLogger("optical_flow")
def flow_to_bgr(
    flow: np.ndarray,
    mag_clip_percentile: float = 95.0,
    valid_mask: np.ndarray | None = None,
) -> np.ndarray:
    """Convert an (H, W, 2) flow field into a BGR pseudo-color image.

    Direction is encoded as hue, magnitude as brightness, saturation is always
    full. Magnitudes are normalised by the ``mag_clip_percentile`` percentile
    and clipped to [0, 1]. When ``valid_mask`` is given and has at least one
    positive pixel, that percentile is computed only over the masked pixels;
    otherwise it is computed over the whole frame.
    """
    dx = flow[..., 0].astype(np.float32)
    dy = flow[..., 1].astype(np.float32)
    speed = np.sqrt(dx * dx + dy * dy)
    direction = np.arctan2(dy, dx)

    # Pick the population the clipping percentile is measured on.
    samples = speed
    if valid_mask is not None and valid_mask.size > 0:
        inside = valid_mask > 0
        if np.any(inside):
            samples = speed[inside]

    # Floor the divisor so a motionless frame does not divide by ~0.
    ceiling = max(float(np.percentile(samples, mag_clip_percentile)), 1e-6)
    brightness = np.clip(speed / ceiling, 0.0, 1.0)

    rows, cols = flow.shape[:2]
    hsv = np.empty((rows, cols, 3), dtype=np.float32)
    # Map the angle range [-pi, pi] onto the 0..179 hue range.
    hsv[..., 0] = (direction + np.pi) / (2.0 * np.pi) * 179.0
    hsv[..., 1] = 255.0
    hsv[..., 2] = brightness * 255.0
    return cv2.cvtColor(hsv.astype(np.uint8), cv2.COLOR_HSV2BGR)
def _odd_k(k: int) -> int:
k = max(1, int(k))
return k if k % 2 == 1 else k + 1
def build_fish_mask(
    gray_u8: np.ndarray,
    *,
    bright_percentile: float,
    min_blob_area: int,
    open_k: int,
    close_k: int,
    dilate_k: int,
    blur_sigma: float,
    keep_largest_blobs: int = 0,
) -> np.ndarray:
    """Build a 0/255 mask of bright blobs in a dark-background (sonar-like) frame.

    Pipeline, in order:

    1. Optional Gaussian blur (``blur_sigma`` > 0; kernel size limited to 3..31).
    2. Threshold at the ``bright_percentile`` percentile of the gray levels
       (pixels ``>=`` the threshold are kept).
    3. Optional morphological open, then close (``open_k`` / ``close_k`` > 0).
    4. Drop 8-connected components whose area is below ``min_blob_area``.
    5. Optional dilation (``dilate_k`` > 0).
    6. If ``keep_largest_blobs`` > 0, keep only that many largest components of
       the dilated mask.

    Kernel sizes are coerced to odd values via ``_odd_k``.

    Returns:
        A uint8 array with the same shape as ``gray_u8``: 255 inside kept
        blobs, 0 elsewhere.
    """
    g = gray_u8
    if blur_sigma > 1e-6:
        k = _odd_k(int(round(blur_sigma * 6)))
        k = max(3, min(k, 31))
        g = cv2.GaussianBlur(g, (k, k), blur_sigma)

    # Convert once; the same float image feeds both the percentile and the compare.
    g32 = g.astype(np.float32)
    thr = float(np.percentile(g32, bright_percentile))
    binary = (g32 >= thr).astype(np.uint8) * 255

    if open_k > 0:
        ok = _odd_k(open_k)
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (ok, ok))
        binary = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel)
    if close_k > 0:
        ck = _odd_k(close_k)
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (ck, ck))
        binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)

    _, labels, stats, _ = cv2.connectedComponentsWithStats(binary, connectivity=8)
    # One boolean per label, then a single table lookup over the label image.
    # This replaces a full-image scan per component.
    keep = stats[:, cv2.CC_STAT_AREA] >= min_blob_area
    keep[0] = False  # label 0 is the background and is never part of the mask
    out = keep[labels].astype(np.uint8) * 255

    if dilate_k > 0:
        dk = _odd_k(dilate_k)
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (dk, dk))
        out = cv2.dilate(out, kernel)

    if keep_largest_blobs > 0:
        num, labels, stats, _ = cv2.connectedComponentsWithStats(out, connectivity=8)
        areas = stats[:, cv2.CC_STAT_AREA]
        # Largest area first; equal areas are ordered by descending label index.
        ranked = sorted(
            range(1, num),
            key=lambda idx: (int(areas[idx]), idx),
            reverse=True,
        )
        keep = np.zeros(num, dtype=bool)
        keep[np.asarray(ranked[:keep_largest_blobs], dtype=np.intp)] = True
        out = keep[labels].astype(np.uint8) * 255
    return out
def apply_flow_mask(flow_bgr: np.ndarray, mask_u8: np.ndarray) -> np.ndarray:
    """Black out ``flow_bgr`` outside the mask by scaling all channels with mask / 255."""
    weight = np.expand_dims(mask_u8.astype(np.float32) / 255.0, axis=-1)
    weighted = flow_bgr.astype(np.float32) * weight
    return np.clip(weighted, 0, 255).astype(np.uint8)
def draw_flow_arrows(
flow: np.ndarray,
canvas_bgr: np.ndarray,
*,
mask_u8: np.ndarray | None,
step: int,
scale: float,
min_magnitude: float,
color: tuple[int, int, int],
thickness: int,
) -> np.ndarray:
"""
将稠密光流按网格采样绘制为箭头便于精确观察局部方向与速度
"""
out = canvas_bgr.copy()
h, w = flow.shape[:2]
s = max(2, int(step))
t = max(1, int(thickness))
mag2_thr = float(min_magnitude) * float(min_magnitude)
for y in range(s // 2, h, s):
for x in range(s // 2, w, s):
if mask_u8 is not None and mask_u8[y, x] == 0:
continue
fx = float(flow[y, x, 0])
fy = float(flow[y, x, 1])
if fx * fx + fy * fy < mag2_thr:
continue
x2 = int(round(x + fx * scale))
y2 = int(round(y + fy * scale))
x2 = max(0, min(w - 1, x2))
y2 = max(0, min(h - 1, y2))
cv2.arrowedLine(
out,
(x, y),
(x2, y2),
color=color,
thickness=t,
tipLength=0.3,
)
return out
def build_output_frame(
    frame_bgr: np.ndarray,
    flow_bgr: np.ndarray,
    mode: str,
) -> np.ndarray:
    """Combine the source frame and its flow rendering into one output frame.

    ``"sidebyside"`` places the flow image to the right of the frame;
    ``"overlay"`` blends them at 55% frame / 45% flow. Any other ``mode``
    raises ``ValueError``.
    """
    if mode == "overlay":
        return cv2.addWeighted(frame_bgr, 0.55, flow_bgr, 0.45, 0.0)
    if mode == "sidebyside":
        return np.hstack((frame_bgr, flow_bgr))
    raise ValueError(f"未知 mode: {mode}")
def _try_gst_nvenc_writer(
    path: Path,
    fps: float,
    size: tuple[int, int],
) -> cv2.VideoWriter | None:
    """Try to create a cv2.VideoWriter backed by GStreamer + Jetson NVENC.

    This is a best-effort probe: any failure is logged at debug level and
    reported as ``None`` so the caller can fall back to another writer.

    Returns:
        An opened writer, or ``None`` if GStreamer or the hardware encoder is
        unavailable.
    """
    try:
        if not hasattr(cv2, "CAP_GSTREAMER"):
            return None
        w, h = size
        # Escape double quotes so the path survives inside the quoted
        # ``location="..."`` pipeline property.
        loc = str(path).replace('"', '\\"')
        gst_pipe = (
            f'appsrc ! videoconvert ! video/x-raw,format=BGRx ! '
            f'nvvidconv ! video/x-raw(memory:NVMM) ! '
            f'nvv4l2h264enc bitrate=4000000 ! h264parse ! '
            f'mp4mux ! filesink location="{loc}"'
        )
        writer = cv2.VideoWriter(gst_pipe, cv2.CAP_GSTREAMER, 0, fps, (w, h))
        if writer.isOpened():
            _log.info("[optical-flow] using GStreamer NVENC writer for %s", path.name)
            return writer
        writer.release()
        _log.debug("[optical-flow] GStreamer NVENC writer did not open for %s", path.name)
    except Exception:
        # Deliberately non-fatal, but keep the reason discoverable instead of
        # discarding it silently.
        _log.debug(
            "[optical-flow] GStreamer NVENC writer probe failed for %s",
            path,
            exc_info=True,
        )
    return None
def open_writer(
    path: Path,
    fps: float,
    size: tuple[int, int],
    fourcc_str: str,
) -> cv2.VideoWriter:
    """Open a video writer for ``path``, preferring the GStreamer NVENC backend.

    The hardware writer from ``_try_gst_nvenc_writer`` is tried first; in that
    case ``fourcc_str`` is not used. Otherwise a regular ``cv2.VideoWriter``
    is created with the given four-character codec code.

    Raises:
        RuntimeError: ``fourcc_str`` is not exactly four characters, or the
            fallback writer could not be opened.
    """
    gst = _try_gst_nvenc_writer(path, fps, size)
    if gst is not None:
        return gst

    # VideoWriter_fourcc takes exactly four characters; reject anything else
    # here so the caller gets a clear RuntimeError instead of an argument-count
    # error from the binding.
    if len(fourcc_str) != 4:
        raise RuntimeError(f"--fourcc 必须恰好为 4 个字符: {fourcc_str!r}")

    fourcc = cv2.VideoWriter_fourcc(*fourcc_str)
    writer = cv2.VideoWriter(str(path), fourcc, fps, size)
    if not writer.isOpened():
        # Free the failed writer before reporting the error.
        writer.release()
        raise RuntimeError(
            f"无法创建输出视频: {path}codec={fourcc_str})。"
            "可尝试 --fourcc avc1 或 XVID 并配合 .avi 扩展名。"
        )
    return writer
def _validate_flow_args(args: Any) -> None:
if not isinstance(args.resize, (int, float)) or float(args.resize) <= 0:
raise ValueError("--resize 必须为正数")
if not 0 < float(args.bright_percentile) < 100:
raise ValueError("--bright-percentile 应在 (0, 100) 内")
def _run_flow_core(args: Any) -> None:
    """Compute dense Farneback flow for each frame pair and write the visualization video.

    ``args`` must expose the same fields as the CLI namespace (including the
    ``input`` / ``output`` paths). ``progress_log`` is optional and defaults
    to off; when on, progress is printed every 30 processed frames.

    The first output frame pairs the first input frame with an all-black flow
    layer, because no flow exists until a second frame has been read.

    Raises:
        FileNotFoundError: the input path is not an existing file.
        RuntimeError: the input cannot be opened, the first frame cannot be
            read, or the output writer cannot be created.
    """
    in_path = Path(args.input)
    out_path = Path(args.output)
    if not in_path.is_file():
        raise FileNotFoundError(f"找不到输入视频: {in_path}")
    src_size_mb = in_path.stat().st_size / (1024 * 1024)
    _log.info("[optical-flow] start: %s (%.1f MB), mode=%s, resize=%.2f",
              in_path.name, src_size_mb, args.mode, args.resize)
    t0 = time.monotonic()
    cap = cv2.VideoCapture(str(in_path))
    if not cap.isOpened():
        # Release the capture handle even though opening failed.
        cap.release()
        raise RuntimeError(f"无法打开视频: {in_path}")
    writer: cv2.VideoWriter | None = None
    try:
        # A reported FPS of 0 falls back to 30.
        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        w_in = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h_in = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        _log.info("[optical-flow] src %dx%d @ %.1f fps", w_in, h_in, fps)
        scale = float(args.resize)
        w = max(1, int(round(w_in * scale)))
        h = max(1, int(round(h_in * scale)))
        if args.mode == "sidebyside":
            out_w, out_h = w * 2, h
        else:
            out_w, out_h = w, h
        writer = open_writer(out_path, fps, (out_w, out_h), args.fourcc)

        ret, prev_bgr = cap.read()
        if not ret:
            raise RuntimeError("视频为空或无法读取首帧")
        if scale != 1.0:
            prev_bgr = cv2.resize(prev_bgr, (w, h), interpolation=cv2.INTER_AREA)
        prev_gray = cv2.cvtColor(prev_bgr, cv2.COLOR_BGR2GRAY)

        # One shared all-black layer, reused read-only for every frame instead
        # of allocating fresh zero images inside the loop.
        black_flow = np.zeros((h, w, 3), dtype=np.uint8)
        writer.write(build_output_frame(prev_bgr, black_flow, args.mode))
        written = 1
        frame_idx = 0
        fb_flags = 0
        progress_log = bool(getattr(args, "progress_log", False))

        while True:
            ret, frame_bgr = cap.read()
            if not ret:
                break
            if scale != 1.0:
                frame_bgr = cv2.resize(frame_bgr, (w, h), interpolation=cv2.INTER_AREA)
            gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
            flow = cv2.calcOpticalFlowFarneback(
                prev_gray,
                gray,
                None,
                args.pyr_scale,
                args.levels,
                args.winsize,
                args.iterations,
                args.poly_n,
                args.poly_sigma,
                fb_flags,
            )
            flow_bgr = _render_flow_layer(flow, gray, args, black_flow)
            writer.write(build_output_frame(frame_bgr, flow_bgr, args.mode))
            written += 1
            prev_gray = gray
            frame_idx += 1
            if progress_log and frame_idx % 30 == 0:
                print(f"已处理 {frame_idx} 帧光流…", flush=True)

        elapsed = time.monotonic() - t0
        _log.info("[optical-flow] done: %d frames in %.1fs (%.1f fps) -> %s",
                  written, elapsed, written / max(elapsed, 0.001), out_path.name)
        if progress_log:
            print(f"完成,共写入 {written} 帧({elapsed:.1f}s保存至: {out_path}", flush=True)
    finally:
        cap.release()
        if writer is not None:
            writer.release()


def _render_flow_layer(
    flow: np.ndarray,
    gray: np.ndarray,
    args: Any,
    blank: np.ndarray,
) -> np.ndarray:
    """Render one frame's flow picture: the HSV layer (weight 0.8) plus the arrow layer.

    ``blank`` is an all-black BGR image of the frame size. It is only read
    here (``draw_flow_arrows`` draws on its own copy), so the caller can share
    one instance across frames.
    """
    mask: np.ndarray | None = None
    if args.fish_mask:
        mask = build_fish_mask(
            gray,
            bright_percentile=args.bright_percentile,
            min_blob_area=args.min_blob_area,
            open_k=args.mask_open,
            close_k=args.mask_close,
            dilate_k=args.mask_dilate,
            blur_sigma=args.mask_blur,
            keep_largest_blobs=args.keep_largest_blobs,
        )
    # True when masking is requested but nothing bright enough was found:
    # both layers must then stay black.
    nothing_visible = mask is not None and np.count_nonzero(mask) == 0

    hsv_bgr = blank
    if args.viz_style in ("hsv", "hsv_arrows") and not nothing_visible:
        if mask is None:
            hsv_bgr = flow_to_bgr(flow)
        else:
            hsv_bgr = apply_flow_mask(flow_to_bgr(flow, valid_mask=mask), mask)

    arrow_bgr = blank
    # An empty mask suppresses the arrows too. Passing mask_u8=None in that
    # case would draw arrows over the whole frame, the opposite of masking.
    if args.viz_style in ("arrows", "hsv_arrows") and not nothing_visible:
        arrow_bgr = draw_flow_arrows(
            flow,
            blank,
            mask_u8=mask,
            step=args.arrow_step,
            scale=args.arrow_scale,
            min_magnitude=args.arrow_threshold,
            color=(0, 255, 255),
            thickness=args.arrow_thickness,
        )
    return cv2.addWeighted(hsv_bgr, 0.8, arrow_bgr, 1.0, 0.0)
def run_optical_flow_video(
    input_path: Path,
    output_path: Path,
    *,
    mode: str = "overlay",
    viz_style: str = "hsv",
    resize: float = 1.0,
    fourcc: str = "mp4v",
    pyr_scale: float = 0.5,
    levels: int = 3,
    winsize: int = 15,
    iterations: int = 3,
    poly_n: int = 5,
    poly_sigma: float = 1.2,
    fish_mask: bool = True,
    bright_percentile: float = 97.5,
    min_blob_area: int = 500,
    mask_open: int = 3,
    mask_close: int = 11,
    mask_dilate: int = 5,
    mask_blur: float = 1.0,
    keep_largest_blobs: int = 0,
    arrow_step: int = 12,
    arrow_scale: float = 2.0,
    arrow_threshold: float = 0.8,
    arrow_thickness: int = 1,
    progress_log: bool = False,
) -> bool:
    """Programmatic entry point (for callers such as fish_api).

    Bundles the keyword options into the same namespace shape the CLI uses,
    validates it and runs the flow pipeline. Failures are logged with a
    traceback and reported through the return value rather than raised.

    Path-like strings are accepted for ``input_path`` / ``output_path`` as
    well as ``Path`` objects.

    Returns:
        True when ``output_path`` exists and is non-empty after the run,
        False otherwise (including on any exception).
    """
    # Normalise up front so the final existence check works for str paths too.
    input_path = Path(input_path)
    output_path = Path(output_path)
    args = SimpleNamespace(
        input=input_path,
        output=output_path,
        mode=mode,
        viz_style=viz_style,
        resize=resize,
        fourcc=fourcc,
        pyr_scale=pyr_scale,
        levels=levels,
        winsize=winsize,
        iterations=iterations,
        poly_n=poly_n,
        poly_sigma=poly_sigma,
        fish_mask=fish_mask,
        bright_percentile=bright_percentile,
        min_blob_area=min_blob_area,
        mask_open=mask_open,
        mask_close=mask_close,
        mask_dilate=mask_dilate,
        mask_blur=mask_blur,
        keep_largest_blobs=keep_largest_blobs,
        arrow_step=arrow_step,
        arrow_scale=arrow_scale,
        arrow_threshold=arrow_threshold,
        arrow_thickness=arrow_thickness,
        progress_log=progress_log,
    )
    try:
        _validate_flow_args(args)
        _run_flow_core(args)
    except Exception:
        _log.exception("[optical-flow] run_optical_flow_video failed: %s -> %s",
                       input_path, output_path)
        return False
    try:
        return output_path.is_file() and output_path.stat().st_size > 0
    except OSError:
        # The file vanished or became unreadable between the two checks.
        return False
def main() -> None:
    """Command-line entry point: parse options, validate them and run the flow pipeline.

    Default input and output files are resolved next to this script. A missing
    input, an invalid option value, or a ``FileNotFoundError`` / ``RuntimeError``
    from the pipeline ends the process via ``SystemExit`` carrying the message.
    """
    here = Path(__file__).resolve().parent
    default_in = here / "fish_echo.MP4"
    default_out = here / "fish_echo_flow_vis.mp4"

    parser = argparse.ArgumentParser(description="视频稠密光流可视化OpenCV Farneback")
    add = parser.add_argument

    # Input / output and overall presentation.
    add("--input", "-i", type=Path, default=default_in,
        help=f"输入视频路径(默认: {default_in.name}")
    add("--output", "-o", type=Path, default=default_out,
        help=f"输出视频路径(默认: {default_out.name}")
    add("--mode", choices=("sidebyside", "overlay"), default="sidebyside",
        help="sidebyside: 原图|光流overlay: 原图与光流半透明叠加")
    add("--viz-style", choices=("hsv", "arrows", "hsv_arrows"), default="hsv",
        help="光流可视化风格hsv 伪彩色、arrows 箭头、hsv_arrows 组合")
    add("--resize", type=float, default=1.0, metavar="SCALE",
        help="处理前将帧宽高乘以该比例以加速(例如 0.5")
    add("--fourcc", default="mp4v",
        help="VideoWriter 四字符编码,常见: mp4v, avc1, XVID")

    # Farneback parameters, passed through to cv2.calcOpticalFlowFarneback.
    add("--pyr-scale", type=float, default=0.5,
        help="Farneback 金字塔缩放OpenCV pyr_scale")
    add("--levels", type=int, default=3, help="Farneback 金字塔层数")
    add("--winsize", type=int, default=15, help="Farneback 窗口大小")
    add("--iterations", type=int, default=3, help="Farneback 每层迭代次数")
    add("--poly-n", type=int, default=5, help="Farneback 像素邻域大小poly_n")
    add("--poly-sigma", type=float, default=1.2,
        help="Farneback 高斯标准差poly_sigma")

    # Fish mask: enabled by default, --no-fish-mask switches it off.
    add("--fish-mask", dest="fish_mask", action="store_true", default=True,
        help="仅在高亮鱼团区域显示光流(默认开启,抑制背景噪声)")
    add("--no-fish-mask", dest="fish_mask", action="store_false",
        help="关闭鱼团掩膜,整幅画面显示光流(与旧行为一致)")
    add("--bright-percentile", type=float, default=97.5, metavar="P",
        help="灰度阈值:取 >= 该分位数的像素作为「亮斑」候选(越高越只保留最亮区域)")
    add("--min-blob-area", type=int, default=500, metavar="PX",
        help="连通域最小面积(像素),小于此面积的亮斑视为噪声并丢弃")
    add("--mask-open", type=int, default=3,
        help="形态学开运算核大小奇数0 表示跳过),去小噪点")
    add("--mask-close", type=int, default=11,
        help="形态学闭运算核大小奇数0 表示跳过),连接同一鱼团碎片")
    add("--mask-dilate", type=int, default=5,
        help="掩膜膨胀核大小奇数0 表示不膨胀),略扩大显示区域包住边缘运动")
    add("--mask-blur", type=float, default=1.0,
        help="阈值前高斯模糊 sigma0 表示不模糊),可平滑细碎纹理")
    add("--keep-largest-blobs", type=int, default=0, metavar="N",
        help="在面积过滤后仅保留面积最大的 N 个连通域0 表示不限制,适合多鱼场景)")

    # Arrow rendering.
    add("--arrow-step", type=int, default=12, help="箭头网格步长(像素),越小越密")
    add("--arrow-scale", type=float, default=2.0, help="箭头长度缩放系数")
    add("--arrow-threshold", type=float, default=0.8,
        help="绘制箭头的最小速度阈值(像素/帧)")
    add("--arrow-thickness", type=int, default=1, help="箭头线宽")

    args = parser.parse_args()

    if not args.input.is_file():
        raise SystemExit(f"找不到输入视频: {args.input}")
    try:
        _validate_flow_args(args)
    except ValueError as e:
        raise SystemExit(str(e)) from e

    # The CLI always prints progress to stdout.
    args.progress_log = True
    try:
        _run_flow_core(args)
    except (FileNotFoundError, RuntimeError) as e:
        raise SystemExit(str(e)) from e
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()