596 lines
18 KiB
Python
596 lines
18 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""
|
|||
|
|
后处理:原始 MP4 + 结果 TSV(+ 可选篮子 ROI JSON)→ 带框标注演示 MP4。
|
|||
|
|
|
|||
|
|
段内复跑 hand_detect 画手部/union ROI;Top3 与医生信息直接读 TSV,不重跑耗材/医生模型。
|
|||
|
|
"""
|
|||
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
import argparse
|
|||
|
|
import os
|
|||
|
|
import subprocess
|
|||
|
|
import sys
|
|||
|
|
from pathlib import Path
|
|||
|
|
from typing import Any
|
|||
|
|
|
|||
|
|
import cv2
|
|||
|
|
import numpy as np
|
|||
|
|
from ultralytics import YOLO
|
|||
|
|
|
|||
|
|
PACK_ROOT = Path(__file__).resolve().parent.parent
|
|||
|
|
_SCRIPTS = Path(__file__).resolve().parent
|
|||
|
|
sys.path.insert(0, str(PACK_ROOT / "src"))
|
|||
|
|
sys.path.insert(0, str(_SCRIPTS))
|
|||
|
|
|
|||
|
|
from paths import ensure_code_on_path # noqa: E402
|
|||
|
|
|
|||
|
|
ensure_code_on_path(PACK_ROOT)
|
|||
|
|
|
|||
|
|
from basket_segmenter import load_basket_roi_json # noqa: E402
|
|||
|
|
from config import load_run_config # noqa: E402
|
|||
|
|
from pipeline.hand_roi_merge import bbox_iou_xyxy, two_largest_hands, union_xyxy # noqa: E402
|
|||
|
|
from run_segments_consumable_vote import ( # noqa: E402
|
|||
|
|
collect_hand_boxes,
|
|||
|
|
pad_box_bottom_only,
|
|||
|
|
)
|
|||
|
|
from vis_text import CjkTextRenderer # noqa: E402
|
|||
|
|
from visualize_tsv import ( # noqa: E402
|
|||
|
|
SegmentVis,
|
|||
|
|
find_active_segment,
|
|||
|
|
parse_result_tsv,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _line_w(h: int, w: int) -> int:
|
|||
|
|
return max(1, min(w, h) // 400)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _scale_xyxy(
|
|||
|
|
xyxy: list[float], scale_x: float, scale_y: float
|
|||
|
|
) -> tuple[int, int, int, int]:
|
|||
|
|
x1, y1, x2, y2 = xyxy
|
|||
|
|
return (
|
|||
|
|
int(round(x1 * scale_x)),
|
|||
|
|
int(round(y1 * scale_y)),
|
|||
|
|
int(round(x2 * scale_x)),
|
|||
|
|
int(round(y2 * scale_y)),
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def draw_dashed_rect(
    img: np.ndarray,
    x1: int,
    y1: int,
    x2: int,
    y2: int,
    color: tuple[int, int, int],
    thickness: int,
    dash: int = 12,
) -> None:
    """Draw a dashed rectangle outline on ``img`` in place.

    Each of the four edges is split into ``length // dash`` equal steps (at
    least one) and every other step is drawn with ``cv2.line``.

    Args:
        img: Image to draw on; modified in place.
        x1, y1: First corner of the rectangle.
        x2, y2: Opposite corner of the rectangle.
        color: Line colour passed straight to ``cv2.line``.
        thickness: Line thickness in pixels.
        dash: Nominal dash length in pixels. Values below 1 are clamped to 1
            so that a zero or negative value cannot divide by zero.
    """
    # A zero dash used to raise ZeroDivisionError in ``length // dash``.
    dash = max(1, int(dash))

    corners = [(x1, y1), (x2, y1), (x2, y2), (x1, y2)]
    # Pair every corner with the next one, wrapping around to close the box.
    edges = zip(corners, corners[1:] + corners[:1])
    for start, end in edges:
        dx, dy = end[0] - start[0], end[1] - start[1]
        length = int((dx * dx + dy * dy) ** 0.5)
        if length <= 0:
            # Degenerate edge (both corners coincide): nothing to draw.
            continue
        steps = max(1, length // dash)
        # Even-numbered steps are dashes, odd-numbered steps are gaps.
        for i in range(0, steps, 2):
            t0 = i / steps
            t1 = min((i + 1) / steps, 1.0)
            p0 = (int(start[0] + dx * t0), int(start[1] + dy * t0))
            p1 = (int(start[0] + dx * t1), int(start[1] + dy * t1))
            cv2.line(img, p0, p1, color, thickness, cv2.LINE_AA)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def draw_labeled_box(
    img: np.ndarray,
    x1: int,
    y1: int,
    x2: int,
    y2: int,
    color: tuple[int, int, int],
    label: str,
    *,
    thickness: int,
    dashed: bool = False,
    text: CjkTextRenderer,
) -> None:
    """Draw a box clipped to the image plus a text label anchored at its corner.

    Args:
        img: Image to draw on; modified in place.
        x1, y1, x2, y2: Box corners; clipped to the image bounds before drawing.
        color: Colour used for the outline and for both label colours.
        label: Text handed to ``text.draw_label_on_box``.
        thickness: Outline thickness in pixels.
        dashed: Draw the outline with ``draw_dashed_rect`` instead of
            ``cv2.rectangle``.
        text: Renderer providing ``font_size_for_frame`` and
            ``draw_label_on_box``.

    Nothing is drawn when the clipped box has no positive width or height.
    """
    frame_h, frame_w = img.shape[:2]
    left, top = max(0, x1), max(0, y1)
    right, bottom = min(frame_w - 1, x2), min(frame_h - 1, y2)

    box_is_empty = right <= left or bottom <= top
    if box_is_empty:
        return

    if not dashed:
        cv2.rectangle(img, (left, top), (right, bottom), color, thickness, cv2.LINE_AA)
    else:
        draw_dashed_rect(img, left, top, right, bottom, color, thickness)

    label_px = text.font_size_for_frame(frame_h, frame_w, kind="label")
    text.draw_label_on_box(
        img, left, top, label, size_px=label_px, color_bgr=color, bg_bgr=color
    )
|
|||
|
|
|
|||
|
|
|
|||
|
|
def draw_hud(
    img: np.ndarray,
    seg: SegmentVis | None,
    *,
    t_sec: float,
    doctor_summary: str | None,
    video_name: str,
    tsv_name: str,
    title_mode: bool = False,
    text: CjkTextRenderer,
) -> None:
    """Draw the text overlay for one frame, in place.

    In title mode a title card is drawn: pipeline title, video name, TSV name
    and, when present, the doctor summary. Otherwise the per-segment HUD is
    drawn: rank / time line, then either the failure message or the non-empty
    Top1-Top3 entries, then the segment's doctor line when it is non-empty.

    Args:
        img: Image to draw on.
        seg: Active segment, or ``None``; with ``None`` nothing is drawn
            outside title mode.
        t_sec: Current video time in seconds, shown in the HUD.
        doctor_summary: Summary shown on the title card only.
        video_name: Video file name shown on the title card.
        tsv_name: Result file name shown on the title card.
        title_mode: Draw the title card instead of the segment HUD.
        text: Renderer providing ``font_size_for_frame``,
            ``draw_lines_block`` and ``draw_lines_top``.
    """
    frame_h, frame_w = img.shape[:2]

    if title_mode:
        title_lines = [
            "手术室耗材流水线 — 可视化",
            f"视频: {video_name}",
            f"结果: {tsv_name}",
        ]
        if doctor_summary:
            title_lines.append(f"医生: {doctor_summary}")
        title_px = text.font_size_for_frame(frame_h, frame_w, kind="title")
        text.draw_lines_block(
            img,
            title_lines,
            12,
            int(frame_h * 0.10),
            size_px=title_px,
        )
        return

    if seg is None:
        return

    row = seg.row
    hud_lines = [
        f"rank={row.rank} t={t_sec:.2f}s [{row.start_sec:.2f}, {row.end_sec:.2f}]",
    ]
    if not seg.is_failure():
        # Only candidates with a non-blank name are listed.
        if row.n1.strip():
            hud_lines.append(f"Top1: {row.n1} ({row.c1}) id={row.id1}")
        if row.n2.strip():
            hud_lines.append(f"Top2: {row.n2} ({row.c2})")
        if row.n3.strip():
            hud_lines.append(f"Top3: {row.n3} ({row.c3})")
    else:
        # For failure rows the first name column is shown as the message.
        hud_lines.append(row.n1.strip())

    # NOTE(review): the source lost its indentation; the doctor line is assumed
    # to be appended for both failure and success rows - confirm.
    doctor_text = seg.doctor_line()
    if doctor_text:
        hud_lines.append(doctor_text)

    hud_px = text.font_size_for_frame(frame_h, frame_w, kind="hud")
    text.draw_lines_top(img, hud_lines, size_px=hud_px)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def filter_hands_by_basket(
    hand_confs: list[tuple[list[float], float]],
    basket_xyxy: list[float],
    min_iou: float,
) -> list[tuple[list[float], float]]:
    """Keep only the hands whose IoU with the basket ROI exceeds ``min_iou``.

    Used to discard false detections in the distant background.

    Args:
        hand_confs: ``(xyxy, confidence)`` pairs for the detected hands.
        basket_xyxy: Basket ROI as ``[x1, y1, x2, y2]``.
        min_iou: Threshold; a hand is kept only when its IoU is strictly
            greater than ``min_iou`` plus a ``1e-12`` tolerance.

    Returns:
        A new list with the surviving pairs in their original order.
    """
    basket_box = [float(coord) for coord in basket_xyxy]
    return [
        (hand_box, score)
        for hand_box, score in hand_confs
        if bbox_iou_xyxy(hand_box, basket_box) > float(min_iou) + 1e-12
    ]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def expand_basket_xyxy(
    basket_xyxy: list[float],
    expand_frac: float,
    img_w: int,
    img_h: int,
) -> list[float]:
    """Grow the basket box outward and clip the result to the image.

    The enlarged box is used when deciding whether a hand is near the basket,
    so that hands working along the basket edge do not score too low an IoU.

    Args:
        basket_xyxy: Basket ROI as ``[x1, y1, x2, y2]``.
        expand_frac: Margin on each side as a fraction of the box width
            (horizontally) and height (vertically); each extent is treated as
            at least 1.0.
        img_w: Image width; x is clipped to ``[0, img_w - 1]``.
        img_h: Image height; y is clipped to ``[0, img_h - 1]``.

    Returns:
        The expanded ``[x1, y1, x2, y2]`` as floats.
    """
    left, top, right, bottom = (float(coord) for coord in basket_xyxy)
    box_w = max(1.0, right - left)
    box_h = max(1.0, bottom - top)
    margin_x = box_w * expand_frac
    margin_y = box_h * expand_frac
    x_limit = float(img_w - 1)
    y_limit = float(img_h - 1)
    grown_left = max(0.0, left - margin_x)
    grown_top = max(0.0, top - margin_y)
    grown_right = min(x_limit, right + margin_x)
    grown_bottom = min(y_limit, bottom + margin_y)
    return [grown_left, grown_top, grown_right, grown_bottom]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def union_roi_from_basket_hands(
    near_hands: list[tuple[list[float], float]],
    basket_xyxy: list[float],
    img_w: int,
    img_h: int,
    pad_bottom_ratio: float,
) -> tuple[tuple[int, int, int, int] | None, list[tuple[list[float], float]]]:
    """Build the yellow ROI from the two near-basket hands with the highest IoU.

    The ROI is derived from the same hand list that is drawn as green boxes.

    Args:
        near_hands: ``(xyxy, confidence)`` pairs already filtered to hands
            near the basket.
        basket_xyxy: Basket ROI used to rank the hands by IoU.
        img_w: Image width, forwarded to ``pad_box_bottom_only``.
        img_h: Image height, forwarded to ``pad_box_bottom_only``.
        pad_bottom_ratio: Padding ratio forwarded to ``pad_box_bottom_only``.

    Returns:
        ``(roi, near_hands)``; ``roi`` is ``None`` when fewer than two hands
        are available, and ``near_hands`` is returned unchanged.
    """
    if len(near_hands) < 2:
        return None, near_hands

    basket_box = [float(coord) for coord in basket_xyxy]

    def overlap_with_basket(hand: tuple[list[float], float]) -> float:
        return bbox_iou_xyxy(hand[0], basket_box)

    by_overlap = sorted(near_hands, key=overlap_with_basket, reverse=True)
    best_box = by_overlap[0][0]
    second_box = by_overlap[1][0]
    merged = union_xyxy(best_box, second_box)
    padded_roi = pad_box_bottom_only(merged, img_w, img_h, pad_bottom_ratio)
    return padded_roi, near_hands
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _scale_basket_xyxy(
|
|||
|
|
basket_xyxy: list[float], scale_x: float, scale_y: float
|
|||
|
|
) -> list[float]:
|
|||
|
|
x1, y1, x2, y2 = basket_xyxy
|
|||
|
|
return [x1 * scale_x, y1 * scale_y, x2 * scale_x, y2 * scale_y]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def detect_hands_and_union(
    det_model: YOLO,
    frame: np.ndarray,
    *,
    det_conf: float,
    imgsz_det: int,
    pad_bottom_ratio: float,
    predict_kw: dict[str, Any],
    basket_xyxy: list[float] | None = None,
    hand_basket_min_iou: float | None = None,
    basket_expand_frac: float = 0.2,
    use_basket_near_hands: bool = True,
) -> tuple[tuple[int, int, int, int] | None, list[tuple[list[float], float]]]:
    """Run hand detection on one frame and derive the union ROI.

    Returns ``(union_roi, hands_to_draw)``. Coordinates are in the coordinate
    system of the input ``frame`` (the already-resized picture).

    When a basket box is given, filtering is enabled and a minimum IoU is
    set, only hands near the (expanded) basket are kept and the ROI is the
    union of the two with the highest IoU. Otherwise every detected hand is
    returned and the ROI is built from the two largest hands in the whole
    frame, as a fallback.

    Args:
        det_model: Detector; ``predict`` and ``names`` are used.
        frame: Image handed to the detector.
        det_conf: Confidence threshold passed to ``predict``.
        imgsz_det: Inference image size passed to ``predict``.
        pad_bottom_ratio: Bottom padding ratio for the union ROI.
        predict_kw: Extra keyword arguments forwarded to ``predict``.
        basket_xyxy: Basket ROI in frame coordinates, or ``None``.
        hand_basket_min_iou: IoU threshold for the near-basket filter.
        basket_expand_frac: Expansion applied to the basket before filtering.
        use_basket_near_hands: Enable the near-basket filter.
    """
    frame_h, frame_w = frame.shape[:2]
    results = det_model.predict(
        frame, imgsz=imgsz_det, conf=det_conf, verbose=False, **predict_kw
    )
    result = results[0]

    hands: list[tuple[list[float], float]] = []
    if result.boxes is not None:
        class_names = det_model.names
        for det in result.boxes:
            class_id = int(det.cls[0])
            if class_names.get(class_id, "") != "hand":
                continue
            score = 0.0 if det.conf is None else float(det.conf[0])
            hands.append((det.xyxy[0].tolist(), score))

    filter_disabled = (
        basket_xyxy is None
        or not use_basket_near_hands
        or hand_basket_min_iou is None
    )
    if not filter_disabled:
        expanded_basket = expand_basket_xyxy(
            basket_xyxy, basket_expand_frac, frame_w, frame_h
        )
        near_hands = filter_hands_by_basket(
            hands, expanded_basket, hand_basket_min_iou
        )
        # Ranking uses the original basket box, not the expanded one.
        return union_roi_from_basket_hands(
            near_hands, basket_xyxy, frame_w, frame_h, pad_bottom_ratio
        )

    # No basket, or filtering disabled: fall back to the two largest hands
    # in the whole frame.
    roi: tuple[int, int, int, int] | None = None
    hand_boxes = [box for box, _ in hands]
    if len(hand_boxes) >= 2:
        first, second = two_largest_hands(hand_boxes)
        merged = union_xyxy(first, second)
        roi = pad_box_bottom_only(merged, frame_w, frame_h, pad_bottom_ratio)
    return roi, hands
|
|||
|
|
|
|||
|
|
|
|||
|
|
def resize_frame(frame: np.ndarray, preview_width: int) -> tuple[np.ndarray, float, float]:
    """Downscale ``frame`` so that its width does not exceed ``preview_width``.

    Args:
        frame: Image whose first two axes are height and width.
        preview_width: Upper bound for the output width. A non-positive value
            means "no limit" and returns the frame unchanged; previously it
            produced a zero or negative resize target.

    Returns:
        ``(image, scale_x, scale_y)``. When no resize is needed the input
        object itself is returned with both scales equal to ``1.0``; otherwise
        both scales equal ``preview_width / width``.
    """
    h, w = frame.shape[:2]
    if preview_width <= 0 or w <= preview_width:
        return frame, 1.0, 1.0
    scale = preview_width / float(w)
    # Keep at least one pixel per axis so extreme aspect ratios cannot round
    # a dimension down to zero.
    nw = max(1, int(round(w * scale)))
    nh = max(1, int(round(h * scale)))
    out = cv2.resize(frame, (nw, nh), interpolation=cv2.INTER_AREA)
    return out, scale, scale
|
|||
|
|
|
|||
|
|
|
|||
|
|
def open_ffmpeg_writer(
    out_path: Path, width: int, height: int, fps: float
) -> subprocess.Popen[bytes]:
    """Start an ffmpeg process that encodes raw BGR frames from stdin to H.264.

    The caller writes ``width * height * 3`` bytes per frame to the returned
    process's ``stdin``, then closes it and waits for the process.

    Args:
        out_path: Output file; its parent directory is created if missing and
            an existing file is overwritten (``-y``).
        width: Width of the raw input frames in pixels.
        height: Height of the raw input frames in pixels.
        fps: Input frame rate.

    Returns:
        The running ``subprocess.Popen`` with ``stdin`` as a pipe. ffmpeg's
        stderr is discarded.

    Raises:
        OSError: If the output directory cannot be created or the ``ffmpeg``
            executable cannot be started.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)

    input_args = [
        "-f", "rawvideo",
        "-vcodec", "rawvideo",
        "-pix_fmt", "bgr24",
        "-s", f"{width}x{height}",
        "-r", f"{fps:.6f}",
        "-i", "-",
    ]
    output_args = [
        "-an",
        # libx264 with yuv420p rejects odd frame sizes; pad by at most one
        # pixel to the next even width/height. Because stderr is discarded,
        # that failure was otherwise silent.
        "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2",
        "-c:v", "libx264",
        "-preset", "ultrafast",
        "-crf", "23",
        "-pix_fmt", "yuv420p",
        str(out_path),
    ]
    cmd = ["ffmpeg", "-y", *input_args, *output_args]
    return subprocess.Popen(
        cmd,
        stdin=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
    )
|
|||
|
|
|
|||
|
|
|
|||
|
|
def run_visualize(args: argparse.Namespace, cfg: Any) -> int:
    """Render the annotated demo video from the source video and result TSV.

    Steps: validate inputs, parse the TSV, set up the text renderer and the
    optional basket ROI, then stream frames to ffmpeg - first a title card
    built from the first frame, then every video frame with the basket box,
    hand boxes, union ROI and HUD drawn while a TSV segment is active.

    Args:
        args: Parsed command-line options (``video``, ``tsv``, ``out``,
            ``font``, ``basket_roi``, ``no_hand_basket_filter``,
            ``hand_basket_min_iou``, ``basket_expand_frac``, ``det_stride``,
            ``preview_width``, ``title_sec``, ``draw_outside_segments``).
        cfg: Run configuration; ``hand_model``, ``device``, ``half``,
            ``det_conf``, ``imgsz_det`` and ``pad_bottom_ratio`` are read, and
            ``basket_contact_iou_on`` when present.

    Returns:
        ``0`` on success, ``1`` on any reported failure (missing input, no
        segments, unreadable video, ffmpeg start/write failure or non-zero
        ffmpeg exit code).
    """
    video_path = args.video.resolve()
    tsv_path = args.tsv.resolve()
    out_path = args.out.resolve()

    if not video_path.is_file():
        print(f"[vis] 视频不存在: {video_path}", file=sys.stderr)
        return 1
    if not tsv_path.is_file():
        print(f"[vis] TSV 不存在: {tsv_path}", file=sys.stderr)
        return 1
    if not Path(cfg.hand_model).is_file():
        print(f"[vis] 缺少手部权重: {cfg.hand_model}", file=sys.stderr)
        return 1

    segments, doctor_summary = parse_result_tsv(tsv_path)
    if not segments:
        print(f"[vis] TSV 无有效数据段: {tsv_path}", file=sys.stderr)
        return 1
    print(f"[vis] 已加载 {len(segments)} 段; 医生汇总: {doctor_summary or '(无)'}")

    try:
        cjk = CjkTextRenderer(
            args.font.resolve() if getattr(args, "font", None) else None
        )
    except FileNotFoundError as ex:
        print(f"[vis] {ex}", file=sys.stderr)
        return 1

    basket_roi: list[float] | None = None
    if args.basket_roi is not None:
        basket_roi = load_basket_roi_json(args.basket_roi.resolve())

    use_basket_near = not args.no_hand_basket_filter
    hand_basket_min_iou: float | None = None
    basket_expand_frac = float(args.basket_expand_frac)
    if basket_roi is not None and use_basket_near:
        # The CLI value wins; otherwise fall back to the config threshold.
        hand_basket_min_iou = float(
            args.hand_basket_min_iou
            if args.hand_basket_min_iou is not None
            else getattr(cfg, "basket_contact_iou_on", 0.03)
        )
        print(
            f"[vis] 篮筐附近手检: 外扩篮子 {basket_expand_frac:.0%} 后 IoU > "
            f"{hand_basket_min_iou:.4f};绿框与黄 ROI 均仅用附近手"
        )
    elif basket_roi is None and use_basket_near:
        print(
            "[vis] 未提供 --basket-roi,无法按篮子过滤;"
            "将绘制全图手检结果",
            file=sys.stderr,
        )
    elif args.no_hand_basket_filter:
        print("[vis] 已关闭篮筐过滤(--no-hand-basket-filter)")

    # A stride of 0 would raise ZeroDivisionError in the modulo below, so
    # anything below 1 is treated as "detect on every frame".
    det_stride = max(1, int(args.det_stride))
    preview_width = int(args.preview_width)

    predict_kw: dict[str, Any] = {"device": cfg.device}
    if cfg.half:
        predict_kw["half"] = True

    det_model = YOLO(str(cfg.hand_model))
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        print(f"[vis] 无法打开视频: {video_path}", file=sys.stderr)
        cap.release()
        return 1

    proc: subprocess.Popen[bytes] | None = None
    rc = 0
    write_failed = False
    det_calls = 0
    frame_idx = 0
    title_frames = 0

    try:
        fps = float(cap.get(cv2.CAP_PROP_FPS) or 25.0)
        if fps <= 0:
            fps = 25.0
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)

        ret, frame0 = cap.read()
        if not ret or frame0 is None:
            print("[vis] 无法读取首帧", file=sys.stderr)
            return 1

        # The first frame fixes the output size and is the title background.
        frame0, _, _ = resize_frame(frame0, preview_width)
        out_h, out_w = frame0.shape[:2]
        try:
            proc = open_ffmpeg_writer(out_path, out_w, out_h, fps)
        except OSError as ex:
            # Typically ffmpeg is not installed or the output directory
            # cannot be created.
            print(f"[vis] 无法启动 ffmpeg 写入 {out_path}: {ex}", file=sys.stderr)
            return 1

        def write_frame(img: np.ndarray) -> None:
            """Send one frame to ffmpeg, resizing it to the output size if needed."""
            if proc is None or proc.stdin is None:
                raise RuntimeError("ffmpeg stdin 不可用")
            if img.shape[1] != out_w or img.shape[0] != out_h:
                img = cv2.resize(img, (out_w, out_h), interpolation=cv2.INTER_AREA)
            proc.stdin.write(img.tobytes())

        title_frames = max(1, int(round(float(args.title_sec) * fps)))
        video_name = video_path.name
        tsv_name = tsv_path.name

        for _ in range(title_frames):
            title_img = frame0.copy()
            draw_hud(
                title_img,
                None,
                t_sec=0.0,
                doctor_summary=doctor_summary,
                video_name=video_name,
                tsv_name=tsv_name,
                title_mode=True,
                text=cjk,
            )
            write_frame(title_img)

        # Rewind so that the first frame is also part of the main pass.
        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
        lw = _line_w(out_h, out_w)
        cached_union: tuple[int, int, int, int] | None = None
        cached_hand_confs: list[tuple[list[float], float]] = []
        was_in_segment = False

        while True:
            ret, frame = cap.read()
            if not ret or frame is None:
                break
            frame, sx, sy = resize_frame(frame, preview_width)
            t_sec = frame_idx / fps
            active = find_active_segment(segments, t_sec)
            vis = frame.copy()

            if basket_roi is not None:
                # The basket ROI is stored in source-video pixels.
                bx1, by1, bx2, by2 = _scale_xyxy(basket_roi, sx, sy)
                draw_labeled_box(
                    vis, bx1, by1, bx2, by2, (255, 200, 0), "篮子",
                    thickness=lw, dashed=True, text=cjk,
                )

            in_segment = active is not None
            # Detect on stride-aligned frames and also on the first frame of
            # a segment; otherwise the segment would start with empty caches
            # and show no boxes until the next stride-aligned frame.
            segment_started = in_segment and not was_in_segment
            if in_segment and (segment_started or frame_idx % det_stride == 0):
                basket_for_det: list[float] | None = None
                if basket_roi is not None:
                    basket_for_det = _scale_basket_xyxy(basket_roi, sx, sy)
                cached_union, cached_hand_confs = detect_hands_and_union(
                    det_model,
                    frame,
                    det_conf=float(cfg.det_conf),
                    imgsz_det=int(cfg.imgsz_det),
                    pad_bottom_ratio=float(cfg.pad_bottom_ratio),
                    predict_kw=predict_kw,
                    basket_xyxy=basket_for_det,
                    hand_basket_min_iou=hand_basket_min_iou,
                    basket_expand_frac=basket_expand_frac,
                    use_basket_near_hands=use_basket_near and basket_roi is not None,
                )
                det_calls += 1

            if in_segment:
                # Between detections the most recent boxes are re-drawn.
                for hxyxy, conf in cached_hand_confs:
                    x1, y1, x2, y2 = (int(round(v)) for v in hxyxy[:4])
                    draw_labeled_box(
                        vis, x1, y1, x2, y2, (0, 220, 0), f"手 {conf:.2f}",
                        thickness=lw,
                        text=cjk,
                    )
                if cached_union is not None:
                    ux1, uy1, ux2, uy2 = cached_union
                    draw_labeled_box(
                        vis, ux1, uy1, ux2, uy2, (0, 220, 255), "ROI",
                        thickness=max(lw + 1, 2),
                        text=cjk,
                    )
                draw_hud(
                    vis,
                    active,
                    t_sec=t_sec,
                    doctor_summary=doctor_summary,
                    video_name=video_name,
                    tsv_name=tsv_name,
                    text=cjk,
                )
            else:
                # Drop cached detections so they do not leak into the next
                # segment.
                cached_union = None
                cached_hand_confs = []
                if args.draw_outside_segments:
                    fs = cjk.font_size_for_frame(out_h, out_w, kind="small")
                    cjk.draw(
                        vis,
                        "非识别段",
                        10,
                        out_h - fs - 12,
                        size_px=fs,
                        color_bgr=(180, 180, 180),
                    )

            was_in_segment = in_segment
            write_frame(vis)
            frame_idx += 1
            if frame_idx % 500 == 0:
                print(f"[vis] 进度 {frame_idx}/{total_frames or '?'} 帧, 手检次数={det_calls}")
    except BrokenPipeError:
        # ffmpeg exited while frames were still being written.
        write_failed = True
        print("[vis] ffmpeg 提前退出,写入中断", file=sys.stderr)
    finally:
        # Always release the capture and reap ffmpeg, also on errors.
        cap.release()
        if proc is not None:
            if proc.stdin is not None:
                try:
                    proc.stdin.close()
                except OSError:
                    # Closing flushes the pipe; it can fail the same way when
                    # ffmpeg is already gone. The exit code reports it.
                    pass
            rc = proc.wait()

    if rc != 0:
        print(f"[vis] ffmpeg 退出码 {rc}", file=sys.stderr)
    if write_failed or rc != 0:
        return 1

    print(f"[vis] 完成: {out_path} ({frame_idx} 帧 + {title_frames} 片头, 段内手检 {det_calls} 次)")
    return 0
|
|||
|
|
|
|||
|
|
|
|||
|
|
def main() -> int:
    """Parse the command line, load the run config and render the video.

    Returns:
        The exit code produced by ``run_visualize``.
    """
    # Set before any video is opened, and only if the user has not set it.
    # Presumably this quiets OpenCV's FFmpeg backend logging - confirm.
    os.environ.setdefault("OPENCV_FFMPEG_LOGLEVEL", "8")

    parser = argparse.ArgumentParser(description="MP4 + TSV → 带框标注演示视频")

    # (flag, add_argument keyword arguments), registered in this order.
    option_specs: list[tuple[str, dict[str, Any]]] = [
        ("--video", {"type": Path, "required": True, "help": "原始 MP4"}),
        ("--tsv", {"type": Path, "required": True, "help": "main_basket 输出的 TSV/txt"}),
        ("--out", {"type": Path, "required": True, "help": "输出 MP4"}),
        (
            "--config",
            {
                "type": Path,
                "default": PACK_ROOT / "configs" / "default_config.yaml",
            },
        ),
        (
            "--basket-roi",
            {
                "type": Path,
                "default": None,
                "help": "篮子 ROI JSON(main_basket --save-basket-roi)",
            },
        ),
        ("--det-stride", {"type": int, "default": 3, "help": "段内每 N 帧手检一次"}),
        ("--preview-width", {"type": int, "default": 1920, "help": "输出宽度上限"}),
        (
            "--draw-outside-segments",
            {
                "action": "store_true",
                "help": "非 TSV 时间段角标「非识别段」",
            },
        ),
        ("--title-sec", {"type": float, "default": 3.0, "help": "片头时长(秒)"}),
        (
            "--font",
            {
                "type": Path,
                "default": None,
                "help": "中文字体路径(.ttc/.ttf);默认自动查找 Noto/WQY 等",
            },
        ),
        (
            "--no-hand-basket-filter",
            {
                "action": "store_true",
                "help": "关闭篮筐附近过滤(默认开启:少画背景手,黄 ROI 在篮筐处)",
            },
        ),
        (
            "--hand-basket-min-iou",
            {
                "type": float,
                "default": None,
                "help": "手与(外扩后)篮子最小 IoU;默认 basket.contact_iou_on",
            },
        ),
        (
            "--basket-expand-frac",
            {
                "type": float,
                "default": 0.2,
                "help": "判定靠近篮子时外扩 ROI 比例(默认 0.2)",
            },
        ),
    ]
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)

    args = parser.parse_args()

    config_path = args.config.resolve()
    cfg = load_run_config(PACK_ROOT, config_path)
    return run_visualize(args, cfg)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Script entry point: exit the process with the code returned by main().
if __name__ == "__main__":
    sys.exit(main())
|