Files
operating-room-monitor-server/backend/app/services/video_batch_runner.py
Kevin 09885b4184 实现 video batch 自动清理与按需标注视频,并补充子进程调用测试。
batch 完成后仅保留数据库文本结果,勾选时才生成临时标注视频(24h TTL);新增 FastAPI 到 reference bundle 与 algorithm_runner 的单元测试。

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-21 16:30:48 +08:00

969 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Batch video mode: run the configured reference bundle on a complete MP4.
This path generates the YAML/Excel/whitelist inputs expected by
the reference bundle's ``main.py``. Intermediate files are cached per
surgery_id, video sha256, and candidate list, never shared across different
surgeries.
"""
from __future__ import annotations
import csv
import copy
import hashlib
import json
import os
import re
import signal
import shutil
import subprocess
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
import yaml
from loguru import logger
from app.algorithm_runner.reference_bundle_runtime import (
default_reference_bundle_dir,
ensure_reference_nms_patch,
load_reference_default_config,
resolve_reference_bundle_dir,
)
from app.baked import pipeline as bp
from app.consumable_catalog import build_name_mapping, effective_candidate_consumables, normalize_candidate_consumables_raw
from app.domain.consumption import SurgeryConsumptionStored
from app.services.video_batch_cleanup import (
RAW_VISUALIZATION_FILENAME,
VISUALIZATION_FILENAME,
purge_visualization_artifacts,
purge_visualization_pending,
visualization_output_path,
visualization_pending_input_path,
visualization_pending_result_path,
)
@dataclass(frozen=True)
class ReferenceDoctorInfo:
"""Parsed from algorithm_subprocesses/5.15 result footer line ``医生信息:...``."""
doctor_id: str
doctor_name: str | None
display: str
raw_line: str
@dataclass(frozen=True)
class VideoBatchRunResult:
video_sha256: str
candidate_cache_key: str
input_path: Path
work_dir: Path
output_path: Path
details: list[SurgeryConsumptionStored]
reused_cache: bool
doctor: ReferenceDoctorInfo | None = None
visualization_path: Path | None = None
@dataclass(frozen=True)
class ReferenceRunFiles:
config_path: Path
excel_path: Path
whitelist_path: Path
# 标注视频最长边上限(宽 1920 ≈ 1080p绘制与转码共用避免 4K 逐帧 YOLO。
VISUALIZATION_MAX_WIDTH = 1920
def _visualization_ffmpeg_scale_filter() -> str:
return f"scale='min({VISUALIZATION_MAX_WIDTH},iw)':-2"
def browser_transcode_tmp_path(output_path: Path) -> Path:
"""Temp file for atomic replace; must end in ``.mp4`` so ffmpeg picks the muxer."""
return output_path.with_name(f"{output_path.stem}.part{output_path.suffix}")
def _log_subprocess_output(prefix: str, stdout: str, stderr: str, *, max_lines: int = 40) -> None:
"""Emit captured child-process lines at INFO (used after visualize / transcode)."""
for label, text in (("stdout", stdout), ("stderr", stderr)):
lines = [ln for ln in (text or "").splitlines() if ln.strip()]
if not lines:
continue
tail = lines[-max_lines:] if len(lines) > max_lines else lines
for line in tail:
logger.info("{} {}", prefix, line)
def sha256_file(path: Path) -> str:
h = hashlib.sha256()
with path.open("rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
h.update(chunk)
return h.hexdigest()
def build_reference_command(
*,
bundle_dir: Path,
config_path: Path,
) -> list[str]:
return [
"uv",
"run",
"python",
"-X",
"faulthandler",
str(bundle_dir / "main.py"),
"--config",
str(config_path),
]
def build_reference_env() -> dict[str, str]:
env = os.environ.copy()
env["PYTHONFAULTHANDLER"] = "1"
env["PYTHONUNBUFFERED"] = "1"
return env
def build_reference_visualization_command(
*,
bundle_dir: Path,
video_path: Path,
result_path: Path,
output_video_path: Path,
) -> list[str]:
cfg = load_reference_default_config(bundle_dir)
weights = cfg.get("weights") if isinstance(cfg.get("weights"), dict) else {}
phase2 = cfg.get("phase2") if isinstance(cfg.get("phase2"), dict) else {}
device_cfg = cfg.get("device") if isinstance(cfg.get("device"), dict) else {}
hand_raw = str((weights or {}).get("hand") or "weights/hand_detect.pt").strip()
hand_model = Path(hand_raw)
if not hand_model.is_absolute():
hand_model = (bundle_dir / hand_model).resolve()
return [
sys.executable,
"-X",
"faulthandler",
str((bundle_dir / "visualize_result_video.py").resolve()),
"--video",
str(video_path.resolve()),
"--result-txt",
str(result_path.resolve()),
"--hand-model",
str(hand_model),
"--out-video",
str(output_video_path.resolve()),
"--device",
str(device_cfg.get("type") or "cuda"),
"--det-conf",
str(phase2.get("det_conf", 0.6)),
"--imgsz-det",
str(phase2.get("imgsz_det", 640)),
"--pad-ratio",
str(phase2.get("pad_ratio", 0.2)),
"--max-width",
str(VISUALIZATION_MAX_WIDTH),
]
def _is_readable_mp4(path: Path) -> bool:
ffprobe = shutil.which("ffprobe")
if ffprobe is None or not path.is_file() or path.stat().st_size < 4096:
return False
proc = subprocess.run(
[ffprobe, "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=codec_name", "-of", "csv=p=0", str(path)],
check=False,
text=True,
capture_output=True,
)
return proc.returncode == 0 and bool((proc.stdout or "").strip())
def _ffprobe_fields(path: Path, entries: str) -> dict[str, str]:
ffprobe = shutil.which("ffprobe")
if ffprobe is None or not path.is_file():
return {}
proc = subprocess.run(
[
ffprobe,
"-v",
"error",
"-select_streams",
"v:0",
"-show_entries",
entries,
"-of",
"default=noprint_wrappers=1",
str(path),
],
check=False,
text=True,
capture_output=True,
)
if proc.returncode != 0:
return {}
fields: dict[str, str] = {}
for line in proc.stdout.splitlines():
if "=" not in line:
continue
key, value = line.split("=", 1)
fields[key.strip().lower()] = value.strip().lower()
return fields
def _ffprobe_container_format(path: Path) -> str:
ffprobe = shutil.which("ffprobe")
if ffprobe is None or not path.is_file():
return ""
proc = subprocess.run(
[
ffprobe,
"-v",
"error",
"-show_entries",
"format=format_name",
"-of",
"default=noprint_wrappers=1:nokey=1",
str(path),
],
check=False,
text=True,
capture_output=True,
)
if proc.returncode != 0:
return ""
return (proc.stdout or "").strip().lower()
def _is_browser_compatible_mp4(path: Path) -> bool:
fields = _ffprobe_fields(path, "stream=codec_name,pix_fmt")
return fields.get("codec_name") == "h264" and fields.get("pix_fmt") in {"yuv420p", "yuvj420p"}
def _batch_input_needs_normalize(path: Path) -> bool:
"""True when the upload is likely to crash OpenCV/VideoSwin (HEVC, MPEG-PS, >1080p)."""
if not _is_readable_mp4(path):
return True
if not _is_browser_compatible_mp4(path):
return True
container = _ffprobe_container_format(path)
if container and "mpeg" in container:
return True
fields = _ffprobe_fields(path, "stream=codec_name,width,height")
try:
width = int(fields.get("width") or "0")
except ValueError:
width = 0
return width > 1920
def _normalize_batch_input_video(source_path: Path, output_path: Path) -> bool:
"""Remux/transcode DVR uploads to H.264 MP4 (<=1080p) for stable feature extraction."""
ffmpeg = shutil.which("ffmpeg")
if ffmpeg is None or not source_path.is_file():
return False
if not _is_readable_mp4(source_path):
logger.warning("skip batch input normalize: unreadable source {}", source_path)
return False
output_path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = browser_transcode_tmp_path(output_path)
if tmp_path.exists():
tmp_path.unlink()
logger.info(
"ffmpeg batch input normalize starting: {} -> {}",
source_path,
output_path,
)
proc = subprocess.run(
[
ffmpeg,
"-y",
"-hide_banner",
"-loglevel",
"error",
"-fflags",
"+genpts",
"-i",
str(source_path),
"-map",
"0:v:0",
"-an",
"-f",
"mp4",
"-c:v",
"libx264",
"-pix_fmt",
"yuv420p",
"-preset",
"veryfast",
"-crf",
"23",
"-vf",
"scale='min(1920,iw)':-2",
"-movflags",
"+faststart",
str(tmp_path),
],
check=False,
text=True,
capture_output=True,
)
if proc.returncode != 0:
stderr = (proc.stderr or "").strip()
logger.warning("ffmpeg batch input normalize failed: {}", stderr[-3000:])
if tmp_path.exists():
tmp_path.unlink()
return False
if not tmp_path.is_file() or tmp_path.stat().st_size <= 0:
logger.warning("ffmpeg batch input normalize produced empty file: {}", tmp_path)
if tmp_path.exists():
tmp_path.unlink()
return False
tmp_path.replace(output_path)
if not _is_browser_compatible_mp4(output_path):
logger.warning("ffmpeg batch input normalize output not h264/yuv420p: {}", output_path)
output_path.unlink(missing_ok=True)
return False
logger.info(
"ffmpeg batch input normalize complete: {} ({} bytes)",
output_path,
output_path.stat().st_size,
)
return True
def ensure_batch_pipeline_input_video(*, source_path: Path, dest_path: Path) -> None:
"""Write a pipeline-ready MP4 at dest_path (normalize or copy)."""
dest_path.parent.mkdir(parents=True, exist_ok=True)
if dest_path.is_file() and dest_path.stat().st_size > 0 and not _batch_input_needs_normalize(dest_path):
return
if _batch_input_needs_normalize(source_path):
if _normalize_batch_input_video(source_path, dest_path):
return
logger.warning(
"batch input normalize failed, falling back to raw copy: {} -> {}",
source_path,
dest_path,
)
if not dest_path.is_file():
shutil.copy2(source_path, dest_path)
_DOCTOR_NAME_ID_RE = re.compile(
r"^(?P<name>.+?)\s*\(id=(?P<id>[^,\s)]+)(?:,\s*conf=[\d.]+)?\)\s*(?:\[低置信度\])?\s*$"
)
_DOCTOR_ID_ONLY_RE = re.compile(
r"^doctor_id=(?P<id>[^\s(]+)(?:\s*\(conf=[\d.]+\))?\s*(?:\[低置信度\])?\s*$"
)
def parse_reference_doctor_info(path: Path) -> ReferenceDoctorInfo | None:
"""Read ``医生信息:姓名 (id=...)`` footer appended by algorithm_subprocesses/5.15 orchestrator."""
if not path.is_file():
return None
raw_line = ""
for line in path.read_text(encoding="utf-8").splitlines():
stripped = line.strip()
if stripped.startswith("医生信息:") or stripped.startswith("医生信息:"):
raw_line = stripped
break
if not raw_line:
return None
body = raw_line.split("", 1)[-1].split(":", 1)[-1].strip()
if not body or body == "未启用":
return ReferenceDoctorInfo(
doctor_id=bp.VIDEO_RESULT_DOCTOR_ID,
doctor_name=None,
display=body or "未启用",
raw_line=raw_line,
)
if body.startswith("识别失败"):
return ReferenceDoctorInfo(
doctor_id=bp.VIDEO_RESULT_DOCTOR_ID,
doctor_name=None,
display=body,
raw_line=raw_line,
)
match = _DOCTOR_NAME_ID_RE.match(body)
if match:
name = match.group("name").strip()
did = match.group("id").strip()
return ReferenceDoctorInfo(
doctor_id=did,
doctor_name=name,
display=f"{name} ({did})",
raw_line=raw_line,
)
match = _DOCTOR_ID_ONLY_RE.match(body)
if match:
did = match.group("id").strip()
return ReferenceDoctorInfo(
doctor_id=did,
doctor_name=None,
display=did,
raw_line=raw_line,
)
return ReferenceDoctorInfo(
doctor_id=bp.VIDEO_RESULT_DOCTOR_ID,
doctor_name=None,
display=body,
raw_line=raw_line,
)
def is_reference_result_complete(path: Path) -> bool:
"""True when algorithm_subprocesses/5.15 orchestrator has finished writing the TSV (incl. footer)."""
if not path.is_file() or path.stat().st_size <= 0:
return False
lines = [line.strip() for line in path.read_text(encoding="utf-8").splitlines() if line.strip()]
if not any(line.lower().startswith("rank\t") for line in lines):
return False
has_doctor_footer = any(
line.startswith("医生信息:") or line.startswith("医生信息:") for line in lines
)
has_segment_row = False
for line in lines:
if line.lower().startswith("rank\t"):
continue
if line.startswith("医生信息"):
continue
parts = line.split("\t")
if len(parts) >= 5 and parts[0].strip().isdigit():
has_segment_row = True
break
return has_doctor_footer and has_segment_row
def doctor_id_for_consumption_rows(doctor: ReferenceDoctorInfo | None) -> str:
if doctor is None:
return bp.VIDEO_RESULT_DOCTOR_ID
if doctor.doctor_name:
return f"{doctor.doctor_name} ({doctor.doctor_id})"
if doctor.doctor_id and doctor.doctor_id != bp.VIDEO_RESULT_DOCTOR_ID:
return doctor.doctor_id
return bp.VIDEO_RESULT_DOCTOR_ID
def _transcode_visualization_for_browser(source_path: Path, output_path: Path) -> bool:
ffmpeg = shutil.which("ffmpeg")
if ffmpeg is None or not source_path.is_file():
return False
if not _is_readable_mp4(source_path):
logger.warning("skip visualization transcode: unreadable source {}", source_path)
return False
output_path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = browser_transcode_tmp_path(output_path)
if tmp_path.exists():
tmp_path.unlink()
logger.info(
"ffmpeg visualization transcode starting: {} -> {}",
source_path,
output_path,
)
proc = subprocess.run(
[
ffmpeg,
"-y",
"-hide_banner",
"-loglevel",
"error",
"-i",
str(source_path),
"-map",
"0:v:0",
"-an",
"-f",
"mp4",
"-c:v",
"libx264",
"-pix_fmt",
"yuv420p",
"-preset",
"ultrafast",
"-crf",
"23",
"-vf",
_visualization_ffmpeg_scale_filter(),
"-movflags",
"+faststart",
str(tmp_path),
],
check=False,
text=True,
capture_output=True,
)
if proc.returncode != 0:
stderr = (proc.stderr or "").strip()
logger.warning("ffmpeg visualization transcode failed: {}", stderr[-3000:])
if tmp_path.exists():
tmp_path.unlink()
return False
if not tmp_path.is_file() or tmp_path.stat().st_size <= 0:
logger.warning("ffmpeg visualization transcode produced empty file: {}", tmp_path)
if tmp_path.exists():
tmp_path.unlink()
return False
tmp_path.replace(output_path)
if not _is_browser_compatible_mp4(output_path):
logger.warning("ffmpeg output is not browser-compatible h264/yuv420p: {}", output_path)
output_path.unlink(missing_ok=True)
return False
logger.info(
"ffmpeg visualization transcode complete: {} ({} bytes)",
output_path,
output_path.stat().st_size,
)
return True
def ensure_reference_actionformer_nms_patch(bundle_dir: Path) -> bool:
"""Make the reference bundle use our pure-PyTorch ActionFormer NMS.
The upstream bundle imports a compiled ``nms_1d_cpu`` extension during eval.
That extension is not present in deployment, so batch mode must use the same
runtime-safe NMS implementation as the online ActionFormer path.
"""
return ensure_reference_nms_patch(bundle_dir)
def _signal_name(signum: int) -> str:
try:
return signal.Signals(signum).name
except ValueError:
return f"signal {signum}"
def describe_batch_returncode(returncode: int) -> str:
if returncode < 0:
signum = -returncode
return f"terminated by {_signal_name(signum)} ({signum})"
if returncode > 128:
wrapped = returncode - 256
if wrapped < 0:
signum = -wrapped
return f"exit={returncode} (possibly propagated {wrapped}/{_signal_name(signum)})"
return f"exit={returncode}"
def format_batch_failure(returncode: int, *, stdout: str, stderr: str, work_dir: Path, output_path: Path) -> str:
chunks: list[str] = [describe_batch_returncode(returncode), f"work_dir={work_dir}", f"output={output_path}"]
stdout = stdout.strip()
stderr = stderr.strip()
if stdout:
chunks.append(f"stdout:\n{stdout[-3000:]}")
if stderr:
chunks.append(f"stderr:\n{stderr[-3000:]}")
return "\n".join(chunks)
def parse_reference_tsv(
path: Path,
*,
base_timestamp: datetime | None = None,
doctor: ReferenceDoctorInfo | None = None,
) -> list[SurgeryConsumptionStored]:
if base_timestamp is None:
base_timestamp = datetime.now(timezone.utc)
if doctor is None:
doctor = parse_reference_doctor_info(path)
row_doctor_id = doctor_id_for_consumption_rows(doctor)
out: list[SurgeryConsumptionStored] = []
with path.open("r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f, delimiter="\t")
for row in reader:
name = (row.get("top1_name") or "").strip()
if not name or name.startswith(""):
continue
if name.startswith("医生信息"):
continue
item_id = (row.get("product_id_top1") or "").strip() or name
try:
start_sec = float((row.get("start_sec") or "0").strip() or 0.0)
except ValueError:
start_sec = 0.0
out.append(
SurgeryConsumptionStored(
item_id=item_id,
item_name=name,
qty=1,
doctor_id=row_doctor_id,
timestamp=base_timestamp + timedelta(seconds=max(0.0, start_sec)),
source="video_batch",
)
)
return out
def _candidate_cache_key(candidate_consumables: list[str]) -> str:
raw = "\n".join(candidate_consumables).encode("utf-8")
return hashlib.sha256(raw).hexdigest()[:12]
def resolve_reference_candidates(candidate_consumables: list[str] | None) -> list[str]:
requested = normalize_candidate_consumables_raw(list(candidate_consumables or []))
return effective_candidate_consumables(requested)
def write_reference_catalog_excel(
path: Path,
*,
candidate_consumables: list[str],
) -> None:
import pandas as pd
name_to_code = build_name_mapping(candidate_consumables)
rows = [
{
"序号": idx,
"产品编码": name_to_code.get(name, name),
"商品名称": name,
}
for idx, name in enumerate(candidate_consumables, start=1)
]
path.parent.mkdir(parents=True, exist_ok=True)
pd.DataFrame(rows, columns=["序号", "产品编码", "商品名称"]).to_excel(path, index=False)
def write_reference_whitelist_json(path: Path, *, candidate_consumables: list[str]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(
json.dumps({"allowed_names": candidate_consumables}, ensure_ascii=False, indent=2),
encoding="utf-8",
)
def build_reference_config(
*,
bundle_dir: Path,
video_path: Path,
output_path: Path,
work_dir: Path,
excel_path: Path,
whitelist_path: Path,
) -> dict:
cfg = copy.deepcopy(load_reference_default_config(bundle_dir))
cfg["io"]["video"] = str(video_path.resolve())
cfg["io"]["excel"] = str(excel_path.resolve())
cfg["io"]["out"] = str(output_path.resolve())
cfg["io"]["whitelist_json"] = str(whitelist_path.resolve())
cfg["runtime"]["work_dir"] = str(work_dir.resolve())
cfg["runtime"]["keep_work_dir"] = False
return cfg
def prepare_reference_run_files(
*,
bundle_dir: Path,
video_path: Path,
output_path: Path,
work_dir: Path,
config_path: Path,
excel_path: Path,
whitelist_path: Path,
candidate_consumables: list[str],
) -> ReferenceRunFiles:
write_reference_catalog_excel(excel_path, candidate_consumables=candidate_consumables)
write_reference_whitelist_json(whitelist_path, candidate_consumables=candidate_consumables)
config = build_reference_config(
bundle_dir=bundle_dir,
video_path=video_path,
output_path=output_path,
work_dir=work_dir,
excel_path=excel_path,
whitelist_path=whitelist_path,
)
config_path.parent.mkdir(parents=True, exist_ok=True)
config_path.write_text(
yaml.safe_dump(config, allow_unicode=True, sort_keys=False),
encoding="utf-8",
)
return ReferenceRunFiles(
config_path=config_path,
excel_path=excel_path,
whitelist_path=whitelist_path,
)
def _read_reference_video_path_from_config(config_path: Path) -> Path | None:
if not config_path.is_file():
return None
data = yaml.safe_load(config_path.read_text(encoding="utf-8"))
if not isinstance(data, dict):
return None
raw = (((data.get("io") or {}) if isinstance(data.get("io"), dict) else {}).get("video") or "").strip()
if not raw:
return None
return Path(raw).expanduser().resolve()
class VideoBatchRunner:
def __init__(
self,
*,
bundle_dir: Path | None = None,
root_dir: Path | None = None,
) -> None:
repo_root = Path(__file__).resolve().parents[2]
self._bundle_dir_override = bundle_dir
self._root_dir = root_dir or (repo_root / "logs" / "video_batch")
@property
def bundle_dir(self) -> Path:
if self._bundle_dir_override is not None:
return Path(self._bundle_dir_override).expanduser().resolve()
return default_reference_bundle_dir()
@property
def root_dir(self) -> Path:
return self._root_dir
def _generate_visualization(
self,
*,
bundle_dir: Path,
video_path: Path,
result_path: Path,
output_video_path: Path,
) -> Path | None:
raw_video_path = output_video_path.with_name(RAW_VISUALIZATION_FILENAME)
script_path = bundle_dir / "visualize_result_video.py"
if not script_path.is_file():
logger.warning("reference visualization script not found: {}", script_path)
return None
if not video_path.is_file() or not result_path.is_file():
return None
if output_video_path.is_file() and _is_browser_compatible_mp4(output_video_path):
return output_video_path
if raw_video_path.is_file() and not _is_readable_mp4(raw_video_path):
raw_video_path.unlink(missing_ok=True)
if output_video_path.is_file() and not _is_browser_compatible_mp4(output_video_path):
output_video_path.unlink(missing_ok=True)
if raw_video_path.is_file() and _is_readable_mp4(raw_video_path):
logger.info(
"reusing existing visualization source for transcode: {}",
raw_video_path,
)
if _transcode_visualization_for_browser(raw_video_path, output_video_path):
return output_video_path
logger.warning(
"transcode from existing source failed; regenerating visualization: {}",
raw_video_path,
)
raw_video_path.unlink(missing_ok=True)
cmd = build_reference_visualization_command(
bundle_dir=bundle_dir,
video_path=video_path.resolve(),
result_path=result_path.resolve(),
output_video_path=raw_video_path.resolve(),
)
logger.info(
"reference visualization script starting: {}",
" ".join(cmd),
)
proc = subprocess.run(
cmd,
cwd=str(bundle_dir),
check=False,
text=True,
capture_output=True,
env=build_reference_env(),
)
if proc.returncode != 0:
msg = format_batch_failure(
proc.returncode,
stdout=proc.stdout or "",
stderr=proc.stderr or "",
work_dir=output_video_path.parent.resolve(),
output_path=output_video_path.resolve(),
)
logger.error("reference visualization failed: {}", msg)
_log_subprocess_output("visualize", proc.stdout or "", proc.stderr or "")
return None
_log_subprocess_output("visualize", proc.stdout or "", proc.stderr or "")
if not _is_readable_mp4(raw_video_path):
logger.error("reference visualization produced unreadable mp4: {}", raw_video_path)
return None
if _transcode_visualization_for_browser(raw_video_path, output_video_path):
purge_visualization_artifacts(output_video_path.parent)
return output_video_path
logger.error("reference visualization transcode to browser mp4 failed: {}", output_video_path)
return None
def finalize_visualization(
self,
*,
surgery_id: str,
video_path: Path | None = None,
result_path: Path | None = None,
) -> Path | None:
"""Run hand-overlay visualization using staged ``vis_pending/`` inputs."""
logger.info(
"video batch visualization starting for surgery_id={} (visualize_result_video.py)",
surgery_id,
)
video_path = (video_path or visualization_pending_input_path(self._root_dir, surgery_id)).resolve()
result_path = (result_path or visualization_pending_result_path(self._root_dir, surgery_id)).resolve()
output_video_path = visualization_output_path(self._root_dir, surgery_id)
if not is_reference_result_complete(result_path):
logger.warning("skip visualization: incomplete result {}", result_path)
purge_visualization_pending(self._root_dir, surgery_id)
return None
if not video_path.is_file():
logger.warning("skip visualization: missing staged video {}", video_path)
purge_visualization_pending(self._root_dir, surgery_id)
return None
bundle_dir = resolve_reference_bundle_dir(self._bundle_dir_override)
output_video_path.parent.mkdir(parents=True, exist_ok=True)
vis_path = self._generate_visualization(
bundle_dir=bundle_dir,
video_path=video_path,
result_path=result_path,
output_video_path=output_video_path,
)
purge_visualization_pending(self._root_dir, surgery_id)
if vis_path is not None:
logger.info(
"video batch visualization complete for surgery_id={} ({})",
surgery_id,
vis_path,
)
else:
logger.warning("video batch visualization failed for surgery_id={}", surgery_id)
return vis_path
def latest_visualization_path(self, surgery_id: str) -> Path | None:
"""Return an already-generated browser mp4; does not run visualization."""
path = visualization_output_path(self._root_dir, surgery_id)
if path.is_file() and path.stat().st_size > 0 and _is_browser_compatible_mp4(path):
return path
return None
def run(
self,
*,
surgery_id: str,
uploaded_video_path: Path,
original_filename: str = "video.mp4",
candidate_consumables: list[str] | None = None,
include_visualization: bool = False,
) -> VideoBatchRunResult:
bundle_dir = resolve_reference_bundle_dir(self._bundle_dir_override)
uploaded_video_path = uploaded_video_path.resolve()
digest = sha256_file(uploaded_video_path)
candidates = resolve_reference_candidates(candidate_consumables)
candidate_key = _candidate_cache_key(candidates)
surgery_input_dir = self._root_dir / surgery_id / "input"
surgery_input_dir.mkdir(parents=True, exist_ok=True)
surgery_input = surgery_input_dir / f"{digest[:12]}.mp4"
ensure_batch_pipeline_input_video(
source_path=uploaded_video_path,
dest_path=surgery_input,
)
cache_dir = self._root_dir / "cache" / surgery_id / digest / candidate_key
cache_input_dir = cache_dir / "input"
cache_output_dir = cache_dir / "output"
cache_work_dir = cache_dir / "work"
cache_config_dir = cache_dir / "config"
cache_input_dir.mkdir(parents=True, exist_ok=True)
cache_output_dir.mkdir(parents=True, exist_ok=True)
cache_work_dir.mkdir(parents=True, exist_ok=True)
cache_config_dir.mkdir(parents=True, exist_ok=True)
cache_input = cache_input_dir / "input.mp4"
ensure_batch_pipeline_input_video(
source_path=uploaded_video_path,
dest_path=cache_input,
)
output_path = cache_output_dir / "result.tsv"
run_files = prepare_reference_run_files(
bundle_dir=bundle_dir,
video_path=cache_input.resolve(),
output_path=output_path.resolve(),
work_dir=cache_work_dir.resolve(),
config_path=cache_config_dir / "config.yaml",
excel_path=cache_config_dir / "商品信息表.xlsx",
whitelist_path=cache_config_dir / "whitelist.json",
candidate_consumables=candidates,
)
reused_cache = output_path.is_file() and is_reference_result_complete(output_path)
if reused_cache:
logger.info(
"reference batch cache hit for surgery_id={} ({})",
surgery_id,
output_path,
)
else:
ensure_reference_actionformer_nms_patch(bundle_dir)
cmd = build_reference_command(
bundle_dir=bundle_dir,
config_path=run_files.config_path.resolve(),
)
logger.info(
"reference batch starting for surgery_id={} (algorithm_subprocesses main.py, work_dir={})",
surgery_id,
cache_work_dir,
)
proc = subprocess.run(
cmd,
cwd=str(bundle_dir),
check=False,
text=True,
capture_output=True,
env=build_reference_env(),
)
if proc.returncode != 0:
msg = format_batch_failure(
proc.returncode,
stdout=proc.stdout or "",
stderr=proc.stderr or "",
work_dir=cache_work_dir.resolve(),
output_path=output_path.resolve(),
)
raise RuntimeError(f"reference bundle batch run failed {msg}")
if not is_reference_result_complete(output_path):
raise RuntimeError(
f"reference bundle finished but result.tsv is incomplete: {output_path}"
)
logger.info(
"reference batch complete for surgery_id={} ({})",
surgery_id,
output_path,
)
doctor = parse_reference_doctor_info(output_path)
details = parse_reference_tsv(output_path, doctor=doctor)
batch_result = VideoBatchRunResult(
video_sha256=digest,
candidate_cache_key=candidate_key,
input_path=surgery_input,
work_dir=cache_work_dir,
output_path=output_path,
details=details,
reused_cache=reused_cache,
doctor=doctor,
visualization_path=None,
)
return batch_result