#!/usr/bin/env python3
"""
Post-processing: raw MP4 + result TSV (+ optional basket ROI JSON) -> annotated demo MP4.

Hand detection is re-run inside the TSV segments to draw the hand boxes and the
union ROI; Top-3 and doctor information are read directly from the TSV, the
consumable/doctor models are not re-run.
"""
from __future__ import annotations

import argparse
import os
import subprocess
import sys
from pathlib import Path
from typing import Any

import cv2
import numpy as np

PACK_ROOT = Path(__file__).resolve().parent.parent
_SCRIPTS = Path(__file__).resolve().parent
sys.path.insert(0, str(PACK_ROOT / "src"))
sys.path.insert(0, str(_SCRIPTS))

from paths import ensure_code_on_path  # noqa: E402

ensure_code_on_path(PACK_ROOT)

from basket_segmenter import load_basket_roi_json  # noqa: E402
from config import load_run_config  # noqa: E402
from hand_detector import (  # noqa: E402
    create_hand_detector,
    detect_hands_xyxy,
    validate_hand_assets,
)
from pipeline.hand_roi_merge import bbox_iou_xyxy, two_largest_hands, union_xyxy  # noqa: E402
from run_segments_consumable_vote import pad_box_bottom_only  # noqa: E402
from vis_text import CjkTextRenderer  # noqa: E402
from visualize_tsv import (  # noqa: E402
    SegmentVis,
    find_active_segment,
    parse_result_tsv,
)


def _line_w(h: int, w: int) -> int:
    """Return a box line width proportional to the shorter frame side (at least 1)."""
    return max(1, min(w, h) // 400)


def _scale_xyxy(
    xyxy: list[float], scale_x: float, scale_y: float
) -> tuple[int, int, int, int]:
    """Scale an ``(x1, y1, x2, y2)`` box and round it to integer pixel coordinates."""
    x1, y1, x2, y2 = xyxy
    return (
        int(round(x1 * scale_x)),
        int(round(y1 * scale_y)),
        int(round(x2 * scale_x)),
        int(round(y2 * scale_y)),
    )


def draw_dashed_rect(
    img: np.ndarray,
    x1: int,
    y1: int,
    x2: int,
    y2: int,
    color: tuple[int, int, int],
    thickness: int,
    dash: int = 12,
) -> None:
    """Draw a dashed rectangle on ``img`` in place.

    Each edge is split into ``length // dash`` steps and every other step is
    drawn as an anti-aliased line segment.
    """
    # A non-positive dash length would divide by zero below.
    dash = max(1, int(dash))
    edges = [
        ((x1, y1), (x2, y1)),
        ((x2, y1), (x2, y2)),
        ((x2, y2), (x1, y2)),
        ((x1, y2), (x1, y1)),
    ]
    for a, b in edges:
        dx, dy = b[0] - a[0], b[1] - a[1]
        length = int((dx * dx + dy * dy) ** 0.5)
        if length <= 0:
            continue
        steps = max(1, length // dash)
        for i in range(0, steps, 2):
            t0 = i / steps
            t1 = min((i + 1) / steps, 1.0)
            p0 = (int(a[0] + dx * t0), int(a[1] + dy * t0))
            p1 = (int(a[0] + dx * t1), int(a[1] + dy * t1))
            cv2.line(img, p0, p1, color, thickness, cv2.LINE_AA)


def draw_labeled_box(
    img: np.ndarray,
    x1: int,
    y1: int,
    x2: int,
    y2: int,
    color: tuple[int, int, int],
    label: str,
    *,
    thickness: int,
    dashed: bool = False,
    text: CjkTextRenderer,
) -> None:
    """Draw a (solid or dashed) box clipped to the image, plus its text label.

    Nothing is drawn when the clipped box is empty.
    """
    x1, y1 = max(0, x1), max(0, y1)
    h, w = img.shape[:2]
    x2, y2 = min(w - 1, x2), min(h - 1, y2)
    if x2 <= x1 or y2 <= y1:
        return
    if dashed:
        draw_dashed_rect(img, x1, y1, x2, y2, color, thickness)
    else:
        cv2.rectangle(img, (x1, y1), (x2, y2), color, thickness, cv2.LINE_AA)
    fs = text.font_size_for_frame(h, w, kind="label")
    # NOTE(review): text and background receive the same colour; presumably
    # draw_label_on_box picks a contrasting glyph colour itself - confirm.
    text.draw_label_on_box(img, x1, y1, label, size_px=fs, color_bgr=color, bg_bgr=color)


def draw_hud(
    img: np.ndarray,
    seg: SegmentVis | None,
    *,
    t_sec: float,
    doctor_summary: str | None,
    video_name: str,
    tsv_name: str,
    title_mode: bool = False,
    text: CjkTextRenderer,
) -> None:
    """Draw the text overlay on ``img`` in place.

    In title mode a title block (video name, TSV name, optional doctor summary)
    is drawn. Otherwise the active segment's rank, time window, Top-1..3 rows
    and doctor line are drawn at the top; nothing is drawn when ``seg`` is None.
    """
    h, w = img.shape[:2]
    if title_mode:
        lines = [
            "手术室耗材流水线 — 可视化",
            f"视频: {video_name}",
            f"结果: {tsv_name}",
        ]
        if doctor_summary:
            lines.append(f"医生: {doctor_summary}")
        fs = text.font_size_for_frame(h, w, kind="title")
        text.draw_lines_block(
            img,
            lines,
            12,
            int(h * 0.10),
            size_px=fs,
        )
        return
    if seg is None:
        return
    r = seg.row
    lines = [
        f"rank={r.rank} t={t_sec:.2f}s [{r.start_sec:.2f}, {r.end_sec:.2f}]",
    ]
    if seg.is_failure():
        # For failed segments the first name column is shown verbatim.
        lines.append(r.n1.strip())
    else:
        if r.n1.strip():
            lines.append(f"Top1: {r.n1} ({r.c1}) id={r.id1}")
        if r.n2.strip():
            lines.append(f"Top2: {r.n2} ({r.c2})")
        if r.n3.strip():
            lines.append(f"Top3: {r.n3} ({r.c3})")
    # NOTE(review): the doctor line is appended for failed segments as well;
    # confirm this nesting against the original formatting.
    doc = seg.doctor_line()
    if doc:
        lines.append(doc)
    fs = text.font_size_for_frame(h, w, kind="hud")
    text.draw_lines_top(img, lines, size_px=fs)


def filter_hands_by_basket(
    hand_confs: list[tuple[list[float], float]],
    basket_xyxy: list[float],
    min_iou: float,
) -> list[tuple[list[float], float]]:
    """Keep only hands whose IoU with the basket ROI exceeds ``min_iou``.

    Used to drop far-away background detections.
    """
    basket = [float(v) for v in basket_xyxy]
    threshold = float(min_iou) + 1e-12  # strict ">" with a tiny tolerance
    return [
        (xyxy, conf)
        for xyxy, conf in hand_confs
        if bbox_iou_xyxy(xyxy, basket) > threshold
    ]


def expand_basket_xyxy(
    basket_xyxy: list[float],
    expand_frac: float,
    img_w: int,
    img_h: int,
) -> list[float]:
    """Grow the basket box by ``expand_frac`` of its size on every side, clipped to the image.

    The enlarged box is used for the "hand near basket" test so that hands
    working at the basket's edge do not get a too-low IoU.
    """
    x1, y1, x2, y2 = [float(v) for v in basket_xyxy]
    bw, bh = max(1.0, x2 - x1), max(1.0, y2 - y1)
    px, py = bw * expand_frac, bh * expand_frac
    return [
        max(0.0, x1 - px),
        max(0.0, y1 - py),
        min(float(img_w - 1), x2 + px),
        min(float(img_h - 1), y2 + py),
    ]


def union_roi_from_basket_hands(
    near_hands: list[tuple[list[float], float]],
    basket_xyxy: list[float],
    img_w: int,
    img_h: int,
    pad_bottom_ratio: float,
) -> tuple[tuple[int, int, int, int] | None, list[tuple[list[float], float]]]:
    """Build the yellow ROI from the two near-basket hands with the highest basket IoU.

    Returns ``(roi, near_hands)``; ``roi`` is None when fewer than two hands
    are available. The hand list is returned unchanged so that the green hand
    boxes and the ROI come from the same set.
    """
    if len(near_hands) < 2:
        return None, near_hands
    basket = [float(v) for v in basket_xyxy]
    ranked = sorted(
        near_hands,
        key=lambda t: bbox_iou_xyxy(t[0], basket),
        reverse=True,
    )
    h1, h2 = ranked[0][0], ranked[1][0]
    u = union_xyxy(h1, h2)
    roi = pad_box_bottom_only(u, img_w, img_h, pad_bottom_ratio)
    return roi, near_hands


def _scale_basket_xyxy(
    basket_xyxy: list[float], scale_x: float, scale_y: float
) -> list[float]:
    """Scale a basket box without rounding (float coordinates for IoU tests)."""
    x1, y1, x2, y2 = basket_xyxy
    return [x1 * scale_x, y1 * scale_y, x2 * scale_x, y2 * scale_y]


def detect_hands_and_union(
    det: Any,
    frame: np.ndarray,
    *,
    det_conf: float,
    imgsz_det: int,
    pad_bottom_ratio: float,
    predict_kw: dict[str, Any],
    basket_xyxy: list[float] | None = None,
    hand_basket_min_iou: float | None = None,
    basket_expand_frac: float = 0.2,
    use_basket_near_hands: bool = True,
) -> tuple[tuple[int, int, int, int] | None, list[tuple[list[float], float]]]:
    """Run hand detection on ``frame`` and derive the union ROI.

    Returns ``(union_roi, hands_to_draw)`` in the coordinate system of the
    (already resized) input frame. Each hand is paired with a placeholder
    confidence of 1.0.

    When a basket box and an IoU threshold are given and
    ``use_basket_near_hands`` is set, only hands near the expanded basket are
    kept and the ROI is the union of the two with the highest basket IoU.
    Otherwise the two largest hands of the whole frame are merged.
    """
    h, w = frame.shape[:2]
    hands = detect_hands_xyxy(
        det,
        frame,
        det_conf=det_conf,
        imgsz_det=imgsz_det,
        predict_kw=predict_kw,
    )
    hand_confs: list[tuple[list[float], float]] = [(xyxy, 1.0) for xyxy in hands]

    if (
        basket_xyxy is not None
        and use_basket_near_hands
        and hand_basket_min_iou is not None
    ):
        # Filter against the expanded basket, but rank by the real one.
        basket_match = expand_basket_xyxy(basket_xyxy, basket_expand_frac, w, h)
        near = filter_hands_by_basket(hand_confs, basket_match, hand_basket_min_iou)
        return union_roi_from_basket_hands(near, basket_xyxy, w, h, pad_bottom_ratio)

    # No basket or filtering disabled: fall back to the two largest hands in the frame.
    draw_confs = hand_confs
    union_roi: tuple[int, int, int, int] | None = None
    all_xyxy = [hb for hb, _ in hand_confs]
    if len(all_xyxy) >= 2:
        h1, h2 = two_largest_hands(all_xyxy)
        u = union_xyxy(h1, h2)
        union_roi = pad_box_bottom_only(u, w, h, pad_bottom_ratio)
    return union_roi, draw_confs


def resize_frame(frame: np.ndarray, preview_width: int) -> tuple[np.ndarray, float, float]:
    """Downscale ``frame`` to at most ``preview_width`` wide, keeping the aspect ratio.

    Returns ``(frame, scale_x, scale_y)``; frames that are already narrow
    enough are returned untouched with scale 1.0.
    """
    h, w = frame.shape[:2]
    if w <= preview_width:
        return frame, 1.0, 1.0
    scale = preview_width / float(w)
    nw = int(round(w * scale))
    nh = int(round(h * scale))
    out = cv2.resize(frame, (nw, nh), interpolation=cv2.INTER_AREA)
    return out, scale, scale


def open_ffmpeg_writer(
    out_path: Path, width: int, height: int, fps: float
) -> subprocess.Popen[bytes]:
    """Start an ffmpeg process that encodes raw BGR frames from stdin to H.264.

    The output directory is created if needed. Callers should pass even
    ``width``/``height`` because the output pixel format is yuv420p.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)
    cmd = [
        "ffmpeg",
        "-y",
        "-f",
        "rawvideo",
        "-vcodec",
        "rawvideo",
        "-pix_fmt",
        "bgr24",
        "-s",
        f"{width}x{height}",
        "-r",
        f"{fps:.6f}",
        "-i",
        "-",
        "-an",
        "-c:v",
        "libx264",
        "-preset",
        "ultrafast",
        "-crf",
        "23",
        "-pix_fmt",
        "yuv420p",
        str(out_path),
    ]
    return subprocess.Popen(
        cmd,
        stdin=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
    )


def _finish_ffmpeg(proc: subprocess.Popen[bytes]) -> int:
    """Close ffmpeg's stdin (signalling end of stream), wait, and return its exit code."""
    if proc.stdin is not None:
        try:
            proc.stdin.close()
        except OSError:
            # ffmpeg already exited (BrokenPipeError is an OSError); the exit
            # code returned below carries the failure.
            pass
    return proc.wait()


def run_visualize(args: argparse.Namespace, cfg: Any) -> int:
    """Render the annotated demo video described by ``args``.

    Validates inputs, loads the TSV segments and optional basket ROI, then
    writes a title card followed by every video frame with overlays to ffmpeg.
    Hand detection runs only inside TSV segments, every ``--det-stride`` frames.

    Returns 0 on success and 1 on any reported failure.
    """
    video_path = args.video.resolve()
    tsv_path = args.tsv.resolve()
    out_path = args.out.resolve()
    if not video_path.is_file():
        print(f"[vis] 视频不存在: {video_path}", file=sys.stderr)
        return 1
    if not tsv_path.is_file():
        print(f"[vis] TSV 不存在: {tsv_path}", file=sys.stderr)
        return 1

    # Normalise the backend name once so both checks below agree.
    backend = str(getattr(cfg, "hand_backend", "yolo")).lower()
    hand_is_mediapipe = backend == "mediapipe"
    ok, hand_lab = validate_hand_assets(cfg)
    if not ok:
        if hand_is_mediapipe:
            print(f"[vis] 缺少 MediaPipe 手部模型: {cfg.hand_mediapipe_task}", file=sys.stderr)
        else:
            print(f"[vis] 缺少手部权重: {cfg.hand_model}", file=sys.stderr)
        return 1
    print(f"[vis] 手部检测: {hand_lab}")

    segments, doctor_summary = parse_result_tsv(tsv_path)
    if not segments:
        print(f"[vis] TSV 无有效数据段: {tsv_path}", file=sys.stderr)
        return 1
    print(f"[vis] 已加载 {len(segments)} 段; 医生汇总: {doctor_summary or '(无)'}")

    try:
        cjk = CjkTextRenderer(
            args.font.resolve() if getattr(args, "font", None) else None
        )
    except FileNotFoundError as ex:
        print(f"[vis] {ex}", file=sys.stderr)
        return 1

    basket_roi: list[float] | None = None
    if args.basket_roi is not None:
        basket_roi = load_basket_roi_json(args.basket_roi.resolve())

    use_basket_near = not args.no_hand_basket_filter
    hand_basket_min_iou: float | None = None
    basket_expand_frac = float(args.basket_expand_frac)
    if basket_roi is not None and use_basket_near:
        hand_basket_min_iou = float(
            args.hand_basket_min_iou
            if args.hand_basket_min_iou is not None
            else getattr(cfg, "basket_contact_iou_on", 0.03)
        )
        print(
            f"[vis] 篮筐附近手检: 外扩篮子 {basket_expand_frac:.0%} 后 IoU > "
            f"{hand_basket_min_iou:.4f};绿框与黄 ROI 均仅用附近手"
        )
    elif basket_roi is None and use_basket_near:
        print(
            "[vis] 未提供 --basket-roi,无法按篮子过滤;"
            "将绘制全图手检结果",
            file=sys.stderr,
        )
    elif args.no_hand_basket_filter:
        print("[vis] 已关闭篮筐过滤(--no-hand-basket-filter)")

    predict_kw: dict[str, Any] = {"device": cfg.device}
    if cfg.half:
        predict_kw["half"] = True

    # A stride of 0 (or less) would make the modulo below fail.
    det_stride = max(1, int(args.det_stride))
    preview_width = int(args.preview_width)

    det = create_hand_detector(cfg)
    cap = cv2.VideoCapture(str(video_path))
    proc: subprocess.Popen[bytes] | None = None
    write_failed = False
    rc = 0
    title_frames = 0
    det_calls = 0
    frame_idx = 0
    try:
        if not cap.isOpened():
            print(f"[vis] 无法打开视频: {video_path}", file=sys.stderr)
            return 1
        fps = float(cap.get(cv2.CAP_PROP_FPS) or 25.0)
        if fps <= 0:
            fps = 25.0
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)

        ret, frame0 = cap.read()
        if not ret or frame0 is None:
            print("[vis] 无法读取首帧", file=sys.stderr)
            return 1
        frame0, _, _ = resize_frame(frame0, preview_width)
        src_h, src_w = frame0.shape[:2]
        # libx264 with yuv420p rejects odd dimensions, so floor both to even.
        out_w = max(2, src_w - src_w % 2)
        out_h = max(2, src_h - src_h % 2)

        proc = open_ffmpeg_writer(out_path, out_w, out_h, fps)

        def write_frame(img: np.ndarray) -> None:
            """Send one BGR frame to ffmpeg, fitting it to the output size first."""
            if proc is None or proc.stdin is None:
                raise RuntimeError("ffmpeg stdin 不可用")
            ih, iw = img.shape[:2]
            if iw != out_w or ih != out_h:
                if 0 <= ih - out_h <= 1 and 0 <= iw - out_w <= 1:
                    # Only the odd trailing row/column differs: crop, don't resample.
                    img = img[:out_h, :out_w]
                else:
                    img = cv2.resize(img, (out_w, out_h), interpolation=cv2.INTER_AREA)
            proc.stdin.write(img.tobytes())

        title_frames = max(1, int(round(float(args.title_sec) * fps)))
        video_name = video_path.name
        tsv_name = tsv_path.name
        for _ in range(title_frames):
            title_img = frame0.copy()
            draw_hud(
                title_img,
                None,
                t_sec=0.0,
                doctor_summary=doctor_summary,
                video_name=video_name,
                tsv_name=tsv_name,
                title_mode=True,
                text=cjk,
            )
            write_frame(title_img)

        # The first frame was consumed for the title card; rewind to frame 0.
        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
        lw = _line_w(out_h, out_w)
        cached_union: tuple[int, int, int, int] | None = None
        cached_hand_confs: list[tuple[list[float], float]] = []

        while True:
            ret, frame = cap.read()
            if not ret or frame is None:
                break
            frame, sx, sy = resize_frame(frame, preview_width)
            t_sec = frame_idx / fps
            active = find_active_segment(segments, t_sec)
            vis = frame.copy()

            if basket_roi is not None:
                bx1, by1, bx2, by2 = _scale_xyxy(basket_roi, sx, sy)
                draw_labeled_box(
                    vis,
                    bx1,
                    by1,
                    bx2,
                    by2,
                    (255, 200, 0),
                    "篮子",
                    thickness=lw,
                    dashed=True,
                    text=cjk,
                )

            in_segment = active is not None
            if in_segment and frame_idx % det_stride == 0:
                basket_for_det: list[float] | None = None
                if basket_roi is not None:
                    basket_for_det = _scale_basket_xyxy(basket_roi, sx, sy)
                # Detect on the clean frame, not on the annotated copy.
                cached_union, cached_hand_confs = detect_hands_and_union(
                    det,
                    frame,
                    det_conf=float(cfg.det_conf),
                    imgsz_det=int(cfg.imgsz_det),
                    pad_bottom_ratio=float(cfg.pad_bottom_ratio),
                    predict_kw=predict_kw,
                    basket_xyxy=basket_for_det,
                    hand_basket_min_iou=hand_basket_min_iou,
                    basket_expand_frac=basket_expand_frac,
                    use_basket_near_hands=use_basket_near and basket_roi is not None,
                )
                det_calls += 1

            if in_segment:
                # Between detections the last cached result is redrawn.
                for hxyxy, conf in cached_hand_confs:
                    x1, y1, x2, y2 = (int(round(v)) for v in hxyxy[:4])
                    hand_lbl = "手 mp" if hand_is_mediapipe else f"手 {conf:.2f}"
                    draw_labeled_box(
                        vis,
                        x1,
                        y1,
                        x2,
                        y2,
                        (0, 220, 0),
                        hand_lbl,
                        thickness=lw,
                        text=cjk,
                    )
                if cached_union is not None:
                    ux1, uy1, ux2, uy2 = cached_union
                    draw_labeled_box(
                        vis,
                        ux1,
                        uy1,
                        ux2,
                        uy2,
                        (0, 220, 255),
                        "ROI",
                        thickness=max(lw + 1, 2),
                        text=cjk,
                    )
                draw_hud(
                    vis,
                    active,
                    t_sec=t_sec,
                    doctor_summary=doctor_summary,
                    video_name=video_name,
                    tsv_name=tsv_name,
                    text=cjk,
                )
            else:
                # Leaving a segment invalidates the cached detections.
                cached_union = None
                cached_hand_confs = []
                if args.draw_outside_segments:
                    fs = cjk.font_size_for_frame(out_h, out_w, kind="small")
                    cjk.draw(
                        vis,
                        "非识别段",
                        10,
                        out_h - fs - 12,
                        size_px=fs,
                        color_bgr=(180, 180, 180),
                    )

            write_frame(vis)
            frame_idx += 1
            if frame_idx % 500 == 0:
                print(f"[vis] 进度 {frame_idx}/{total_frames or '?'} 帧, 手检次数={det_calls}")
    except BrokenPipeError:
        # ffmpeg died mid-stream; its exit code is reported after cleanup.
        write_failed = True
    finally:
        # Always release the capture, the detector and the encoder process.
        cap.release()
        if hasattr(det, "close"):
            det.close()
        if proc is not None:
            rc = _finish_ffmpeg(proc)

    if write_failed or rc != 0:
        print(f"[vis] ffmpeg 退出码 {rc}", file=sys.stderr)
        return 1
    print(f"[vis] 完成: {out_path} ({frame_idx} 帧 + {title_frames} 片头, 段内手检 {det_calls} 次)")
    return 0


def main() -> int:
    """Parse the command line, load the run config and render the video."""
    os.environ.setdefault("OPENCV_FFMPEG_LOGLEVEL", "8")
    ap = argparse.ArgumentParser(description="MP4 + TSV → 带框标注演示视频")
    ap.add_argument("--video", type=Path, required=True, help="原始 MP4")
    ap.add_argument("--tsv", type=Path, required=True, help="main_basket 输出的 TSV/txt")
    ap.add_argument("--out", type=Path, required=True, help="输出 MP4")
    ap.add_argument(
        "--config",
        type=Path,
        default=PACK_ROOT / "configs" / "default_config.yaml",
    )
    ap.add_argument(
        "--basket-roi",
        type=Path,
        default=None,
        help="篮子 ROI JSON(main_basket --save-basket-roi)",
    )
    ap.add_argument("--det-stride", type=int, default=3, help="段内每 N 帧手检一次")
    ap.add_argument("--preview-width", type=int, default=1920, help="输出宽度上限")
    ap.add_argument(
        "--draw-outside-segments",
        action="store_true",
        help="非 TSV 时间段角标「非识别段」",
    )
    ap.add_argument("--title-sec", type=float, default=3.0, help="片头时长(秒)")
    ap.add_argument(
        "--font",
        type=Path,
        default=None,
        help="中文字体路径(.ttc/.ttf);默认自动查找 Noto/WQY 等",
    )
    ap.add_argument(
        "--no-hand-basket-filter",
        action="store_true",
        help="关闭篮筐附近过滤(默认开启:少画背景手,黄 ROI 在篮筐处)",
    )
    ap.add_argument(
        "--hand-basket-min-iou",
        type=float,
        default=None,
        help="手与(外扩后)篮子最小 IoU;默认 basket.contact_iou_on",
    )
    ap.add_argument(
        "--basket-expand-frac",
        type=float,
        default=0.2,
        help="判定靠近篮子时外扩 ROI 比例(默认 0.2)",
    )
    ap.add_argument(
        "--hand-backend",
        choices=("mediapipe", "yolo"),
        default=None,
        help="覆盖 yaml hand.backend(默认 mediapipe + hand_landmarker.task)",
    )
    args = ap.parse_args()
    cfg = load_run_config(PACK_ROOT, args.config.resolve())
    if args.hand_backend is not None:
        # The command-line choice overrides the backend from the YAML config.
        cfg.hand_backend = args.hand_backend
    return run_visualize(args, cfg)


if __name__ == "__main__":
    raise SystemExit(main())