From 62b14d73860bbefa38dd45a085e225c84a8d2b88 Mon Sep 17 00:00:00 2001 From: Kevin Date: Fri, 22 May 2026 09:35:41 +0800 Subject: [PATCH] update minio port --- backend/.env.example | 6 +- backend/app/algo_host/__init__.py | 24 + backend/app/algo_host/batch_service.py | 250 +++++ backend/app/algo_host/bundle.py | 59 ++ backend/app/algo_host/job_workspace.py | 120 +++ backend/app/algo_host/result_adapter.py | 178 ++++ backend/app/algo_host/subprocess_runner.py | 180 ++++ backend/app/algo_host/transcode.py | 276 +++++ .../reference_bundle_runtime.py | 72 +- backend/app/baked/algorithm.py | 2 +- backend/app/baked/pipeline.py | 3 - backend/app/routers/recording_demo.py | 8 +- backend/app/services/video_batch_cleanup.py | 2 +- backend/app/services/video_batch_runner.py | 968 ------------------ backend/docker-compose.yml | 4 +- backend/main.py | 4 +- backend/tests/reference_bundle_fixtures.py | 41 + ...atch_runner.py => test_algo_host_batch.py} | 196 ++-- .../test_fastapi_algorithm_subprocess.py | 25 +- backend/tests/test_video_batch_cleanup.py | 2 +- docs/Docker部署.md | 2 +- docs/video-backends.md | 4 +- 22 files changed, 1256 insertions(+), 1170 deletions(-) create mode 100644 backend/app/algo_host/__init__.py create mode 100644 backend/app/algo_host/batch_service.py create mode 100644 backend/app/algo_host/bundle.py create mode 100644 backend/app/algo_host/job_workspace.py create mode 100644 backend/app/algo_host/result_adapter.py create mode 100644 backend/app/algo_host/subprocess_runner.py create mode 100644 backend/app/algo_host/transcode.py delete mode 100644 backend/app/services/video_batch_runner.py create mode 100644 backend/tests/reference_bundle_fixtures.py rename backend/tests/{test_video_batch_runner.py => test_algo_host_batch.py} (80%) diff --git a/backend/.env.example b/backend/.env.example index 7f80c98..ff07384 100755 --- a/backend/.env.example +++ b/backend/.env.example @@ -72,14 +72,14 @@ POSTGRES_PORT=35432 # --- MinIO(语音 WAV)--- # 容器内 API 默认通过 DOCKER_MINIO_ENDPOINT=minio:9000 访问;以下为 compose 默认值。 -# MINIO_ENDPOINT=127.0.0.1:9000 +# MINIO_ENDPOINT=127.0.0.1:19000 # MINIO_ACCESS_KEY=minioadmin # MINIO_SECRET_KEY=minioadmin # MINIO_BUCKET=operation-room-voice # MINIO_SECURE=false # MINIO_REGION= -# MINIO_PORT=9000 -# MINIO_CONSOLE_PORT=9001 +# MINIO_PORT=19000 +# MINIO_CONSOLE_PORT=19001 # --- Demo 客户端 / 语音确认客户端 --- # 独立部署的 Demo 客户端 / 语音确认页访问 API 时需放行 CORS;正式部署建议收窄 origins。 diff --git a/backend/app/algo_host/__init__.py b/backend/app/algo_host/__init__.py new file mode 100644 index 0000000..f788d3e --- /dev/null +++ b/backend/app/algo_host/__init__.py @@ -0,0 +1,24 @@ +"""Thin orchestration around algorithm_subprocesses reference bundles (no in-process inference).""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from app.algo_host.result_adapter import ReferenceDoctorInfo + +if TYPE_CHECKING: + from app.algo_host.batch_service import BatchAlgorithmService, BatchRunResult + +__all__ = ["BatchAlgorithmService", "BatchRunResult", "ReferenceDoctorInfo"] + + +def __getattr__(name: str): + if name == "BatchAlgorithmService": + from app.algo_host.batch_service import BatchAlgorithmService + + return BatchAlgorithmService + if name == "BatchRunResult": + from app.algo_host.batch_service import BatchRunResult + + return BatchRunResult + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/backend/app/algo_host/batch_service.py b/backend/app/algo_host/batch_service.py new file mode 100644 index 0000000..da4f844 --- /dev/null +++ b/backend/app/algo_host/batch_service.py @@ -0,0 +1,250 @@ +"""Offline batch orchestration: spawn 5.15 main.py and optional visualization subprocess.""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path + +from loguru import logger + +from app.algo_host.bundle import default_reference_bundle_dir, resolve_reference_bundle_dir +from app.algo_host.job_workspace import prepare_batch_job +from app.algo_host.result_adapter import ( + ReferenceDoctorInfo, + candidate_cache_key, + is_reference_result_complete, + parse_reference_doctor_info, + parse_reference_tsv, + resolve_reference_candidates, + sha256_file, +) +from app.algo_host.subprocess_runner import run_batch_main, run_visualization_script +from app.algo_host.transcode import ( + ensure_batch_pipeline_input_video, + is_browser_compatible_mp4, + is_readable_mp4, + transcode_visualization_for_browser, +) +from app.domain.consumption import SurgeryConsumptionStored +from app.services.video_batch_cleanup import ( + RAW_VISUALIZATION_FILENAME, + purge_visualization_artifacts, + purge_visualization_pending, + visualization_output_path, + visualization_pending_input_path, + visualization_pending_result_path, +) + + +@dataclass(frozen=True) +class BatchRunResult: + video_sha256: str + candidate_cache_key: str + input_path: Path + work_dir: Path + output_path: Path + details: list[SurgeryConsumptionStored] + reused_cache: bool + doctor: ReferenceDoctorInfo | None = None + visualization_path: Path | None = None + + +def default_batch_root_dir() -> Path: + repo_root = Path(__file__).resolve().parents[2] + return repo_root / "logs" / "video_batch" + + +class BatchAlgorithmService: + def __init__( + self, + *, + bundle_dir: Path | None = None, + root_dir: Path | None = None, + ) -> None: + self._bundle_dir_override = bundle_dir + self._root_dir = root_dir or default_batch_root_dir() + + @property + def bundle_dir(self) -> Path: + if self._bundle_dir_override is not None: + return Path(self._bundle_dir_override).expanduser().resolve() + return default_reference_bundle_dir() + + @property + def root_dir(self) -> Path: + return self._root_dir + + def _generate_visualization( + self, + *, + bundle_dir: Path, + video_path: Path, + result_path: Path, + output_video_path: Path, + ) -> Path | None: + raw_video_path = output_video_path.with_name(RAW_VISUALIZATION_FILENAME) + script_path = bundle_dir / "visualize_result_video.py" + if not script_path.is_file(): + logger.warning("reference visualization script not found: {}", script_path) + return None + if not video_path.is_file() or not result_path.is_file(): + return None + if output_video_path.is_file() and is_browser_compatible_mp4(output_video_path): + return output_video_path + if raw_video_path.is_file() and not is_readable_mp4(raw_video_path): + raw_video_path.unlink(missing_ok=True) + if output_video_path.is_file() and not is_browser_compatible_mp4(output_video_path): + output_video_path.unlink(missing_ok=True) + if raw_video_path.is_file() and is_readable_mp4(raw_video_path): + logger.info( + "reusing existing visualization source for transcode: {}", + raw_video_path, + ) + if transcode_visualization_for_browser(raw_video_path, output_video_path): + return output_video_path + logger.warning( + "transcode from existing source failed; regenerating visualization: {}", + raw_video_path, + ) + raw_video_path.unlink(missing_ok=True) + try: + run_visualization_script( + bundle_dir=bundle_dir, + video_path=video_path, + result_path=result_path, + raw_output_video_path=raw_video_path, + ) + except RuntimeError as exc: + logger.error("reference visualization failed: {}", exc) + return None + if not is_readable_mp4(raw_video_path): + logger.error("reference visualization produced unreadable mp4: {}", raw_video_path) + return None + if transcode_visualization_for_browser(raw_video_path, output_video_path): + purge_visualization_artifacts(output_video_path.parent) + return output_video_path + logger.error("reference visualization transcode to browser mp4 failed: {}", output_video_path) + return None + + def finalize_visualization( + self, + *, + surgery_id: str, + video_path: Path | None = None, + result_path: Path | None = None, + ) -> Path | None: + logger.info( + "video batch visualization starting for surgery_id={} (visualize_result_video.py)", + surgery_id, + ) + video_path = (video_path or visualization_pending_input_path(self._root_dir, surgery_id)).resolve() + result_path = (result_path or visualization_pending_result_path(self._root_dir, surgery_id)).resolve() + output_video_path = visualization_output_path(self._root_dir, surgery_id) + if not is_reference_result_complete(result_path): + logger.warning("skip visualization: incomplete result {}", result_path) + purge_visualization_pending(self._root_dir, surgery_id) + return None + if not video_path.is_file(): + logger.warning("skip visualization: missing staged video {}", video_path) + purge_visualization_pending(self._root_dir, surgery_id) + return None + bundle_dir = resolve_reference_bundle_dir(self._bundle_dir_override) + output_video_path.parent.mkdir(parents=True, exist_ok=True) + vis_path = self._generate_visualization( + bundle_dir=bundle_dir, + video_path=video_path, + result_path=result_path, + output_video_path=output_video_path, + ) + purge_visualization_pending(self._root_dir, surgery_id) + if vis_path is not None: + logger.info( + "video batch visualization complete for surgery_id={} ({})", + surgery_id, + vis_path, + ) + else: + logger.warning("video batch visualization failed for surgery_id={}", surgery_id) + return vis_path + + def latest_visualization_path(self, surgery_id: str) -> Path | None: + path = visualization_output_path(self._root_dir, surgery_id) + if path.is_file() and path.stat().st_size > 0 and is_browser_compatible_mp4(path): + return path + return None + + def run( + self, + *, + surgery_id: str, + uploaded_video_path: Path, + original_filename: str = "video.mp4", + candidate_consumables: list[str] | None = None, + include_visualization: bool = False, + ) -> BatchRunResult: + del original_filename, include_visualization + bundle_dir = resolve_reference_bundle_dir(self._bundle_dir_override) + uploaded_video_path = uploaded_video_path.resolve() + digest = sha256_file(uploaded_video_path) + candidates = resolve_reference_candidates(candidate_consumables) + candidate_key = candidate_cache_key(candidates) + + surgery_input_dir = self._root_dir / surgery_id / "input" + surgery_input_dir.mkdir(parents=True, exist_ok=True) + surgery_input = surgery_input_dir / f"{digest[:12]}.mp4" + ensure_batch_pipeline_input_video( + source_path=uploaded_video_path, + dest_path=surgery_input, + ) + + cache_dir = self._root_dir / "cache" / digest / candidate_key + job = prepare_batch_job( + bundle_dir=self._bundle_dir_override, + cache_dir=cache_dir, + uploaded_video_path=uploaded_video_path, + candidate_consumables=candidates, + ) + + reused_cache = job.output_path.is_file() and is_reference_result_complete(job.output_path) + if reused_cache: + logger.info( + "reference batch cache hit digest={} candidate_key={} ({})", + digest[:12], + candidate_key, + job.output_path, + ) + else: + logger.info( + "reference batch starting for surgery_id={} (5.15/main.py, work_dir={})", + surgery_id, + job.work_dir, + ) + run_batch_main( + bundle_dir=bundle_dir, + config_path=job.config_path.resolve(), + work_dir=job.work_dir.resolve(), + output_path=job.output_path.resolve(), + ) + if not is_reference_result_complete(job.output_path): + raise RuntimeError( + f"reference bundle finished but result.tsv is incomplete: {job.output_path}" + ) + logger.info( + "reference batch complete for surgery_id={} ({})", + surgery_id, + job.output_path, + ) + + doctor = parse_reference_doctor_info(job.output_path) + details = parse_reference_tsv(job.output_path, doctor=doctor) + return BatchRunResult( + video_sha256=digest, + candidate_cache_key=candidate_key, + input_path=surgery_input, + work_dir=job.work_dir, + output_path=job.output_path, + details=details, + reused_cache=reused_cache, + doctor=doctor, + visualization_path=None, + ) diff --git a/backend/app/algo_host/bundle.py b/backend/app/algo_host/bundle.py new file mode 100644 index 0000000..4d0ebf9 --- /dev/null +++ b/backend/app/algo_host/bundle.py @@ -0,0 +1,59 @@ +"""Resolve reference algorithm bundle paths and default YAML (read-only; no vendor patches).""" + +from __future__ import annotations + +import copy +from pathlib import Path +from typing import Any + +import yaml + +REPO_ROOT = Path(__file__).resolve().parents[2] +DEFAULT_REFERENCE_BUNDLE_RELATIVE = "algorithm_subprocesses/5.15" + + +def configured_reference_bundle_relative() -> str: + from app.config import Settings + + raw = (Settings().reference_bundle_relative or "").strip() + return raw or DEFAULT_REFERENCE_BUNDLE_RELATIVE + + +def default_reference_bundle_dir() -> Path: + raw = configured_reference_bundle_relative() + path = Path(raw).expanduser() + if path.is_absolute(): + return path.resolve() + return (REPO_ROOT / path).resolve() + + +def resolve_reference_bundle_dir(bundle_dir: Path | None = None) -> Path: + if bundle_dir is None: + label = configured_reference_bundle_relative() + root = default_reference_bundle_dir() + else: + label = str(bundle_dir) + root = Path(bundle_dir).expanduser().resolve() + if not (root / "main.py").is_file(): + raise FileNotFoundError(f"reference bundle main.py not found: {label} -> {root}") + if not (root / "code" / "repo_root.py").is_file(): + raise FileNotFoundError(f"reference bundle vendor code not found: {label} -> {root / 'code'}") + return root + + +def load_reference_default_config(bundle_dir: Path | None = None) -> dict[str, Any]: + root = resolve_reference_bundle_dir(bundle_dir) + path = root / "configs" / "default_config.yaml" + data = yaml.safe_load(path.read_text(encoding="utf-8")) + if not isinstance(data, dict): + raise ValueError(f"invalid reference bundle default config: {path}") + return copy.deepcopy(data) + + +def resolve_bundle_relative_path(bundle_dir: Path, raw: str) -> Path: + p = Path((raw or "").strip()) + if not str(p): + raise ValueError("empty bundle-relative path") + if p.is_absolute(): + return p.resolve() + return (bundle_dir / p).resolve() diff --git a/backend/app/algo_host/job_workspace.py b/backend/app/algo_host/job_workspace.py new file mode 100644 index 0000000..06b32bb --- /dev/null +++ b/backend/app/algo_host/job_workspace.py @@ -0,0 +1,120 @@ +"""Prepare input artifacts expected by algorithm_subprocesses/5.15 main.py.""" + +from __future__ import annotations + +import copy +import json +from dataclasses import dataclass +from pathlib import Path + +import yaml + +from app.algo_host.bundle import load_reference_default_config, resolve_reference_bundle_dir +from app.algo_host.transcode import ensure_batch_pipeline_input_video +from app.consumable_catalog import build_name_mapping + + +@dataclass(frozen=True) +class BatchJobFiles: + config_path: Path + excel_path: Path + whitelist_path: Path + output_path: Path + work_dir: Path + input_video_path: Path + + +def write_reference_catalog_excel( + path: Path, + *, + candidate_consumables: list[str], +) -> None: + import pandas as pd + + name_to_code = build_name_mapping(candidate_consumables) + rows = [ + { + "序号": idx, + "产品编码": name_to_code.get(name, name), + "商品名称": name, + } + for idx, name in enumerate(candidate_consumables, start=1) + ] + path.parent.mkdir(parents=True, exist_ok=True) + pd.DataFrame(rows, columns=["序号", "产品编码", "商品名称"]).to_excel(path, index=False) + + +def write_reference_whitelist_json(path: Path, *, candidate_consumables: list[str]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text( + json.dumps({"allowed_names": candidate_consumables}, ensure_ascii=False, indent=2), + encoding="utf-8", + ) + + +def build_job_config( + *, + bundle_dir: Path, + video_path: Path, + output_path: Path, + work_dir: Path, + excel_path: Path, + whitelist_path: Path, +) -> dict: + cfg = copy.deepcopy(load_reference_default_config(bundle_dir)) + cfg["io"]["video"] = str(video_path.resolve()) + cfg["io"]["excel"] = str(excel_path.resolve()) + cfg["io"]["out"] = str(output_path.resolve()) + cfg["io"]["whitelist_json"] = str(whitelist_path.resolve()) + cfg["runtime"]["work_dir"] = str(work_dir.resolve()) + cfg["runtime"]["keep_work_dir"] = False + return cfg + + +def prepare_batch_job( + *, + bundle_dir: Path | None, + cache_dir: Path, + uploaded_video_path: Path, + candidate_consumables: list[str], +) -> BatchJobFiles: + root = resolve_reference_bundle_dir(bundle_dir) + cache_input_dir = cache_dir / "input" + cache_output_dir = cache_dir / "output" + cache_work_dir = cache_dir / "work" + cache_config_dir = cache_dir / "config" + for d in (cache_input_dir, cache_output_dir, cache_work_dir, cache_config_dir): + d.mkdir(parents=True, exist_ok=True) + + cache_input = cache_input_dir / "input.mp4" + ensure_batch_pipeline_input_video( + source_path=uploaded_video_path, + dest_path=cache_input, + ) + output_path = cache_output_dir / "result.tsv" + excel_path = cache_config_dir / "商品信息表.xlsx" + whitelist_path = cache_config_dir / "whitelist.json" + config_path = cache_config_dir / "config.yaml" + + write_reference_catalog_excel(excel_path, candidate_consumables=candidate_consumables) + write_reference_whitelist_json(whitelist_path, candidate_consumables=candidate_consumables) + config = build_job_config( + bundle_dir=root, + video_path=cache_input.resolve(), + output_path=output_path.resolve(), + work_dir=cache_work_dir.resolve(), + excel_path=excel_path.resolve(), + whitelist_path=whitelist_path.resolve(), + ) + config_path.write_text( + yaml.safe_dump(config, allow_unicode=True, sort_keys=False), + encoding="utf-8", + ) + return BatchJobFiles( + config_path=config_path, + excel_path=excel_path, + whitelist_path=whitelist_path, + output_path=output_path, + work_dir=cache_work_dir, + input_video_path=cache_input, + ) diff --git a/backend/app/algo_host/result_adapter.py b/backend/app/algo_host/result_adapter.py new file mode 100644 index 0000000..ccd759d --- /dev/null +++ b/backend/app/algo_host/result_adapter.py @@ -0,0 +1,178 @@ +"""Map algorithm_subprocesses/5.15 TSV output to domain objects (orchestration adapter only).""" + +from __future__ import annotations + +import csv +import hashlib +import re +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from pathlib import Path + +from app.baked import pipeline as bp +from app.consumable_catalog import ( + effective_candidate_consumables, + normalize_candidate_consumables_raw, +) +from app.domain.consumption import SurgeryConsumptionStored + + +@dataclass(frozen=True) +class ReferenceDoctorInfo: + doctor_id: str + doctor_name: str | None + display: str + raw_line: str + + +_DOCTOR_NAME_ID_RE = re.compile( + r"^(?P.+?)\s*\(id=(?P[^,\s)]+)(?:,\s*conf=[\d.]+)?\)\s*(?:\[低置信度\])?\s*$" +) +_DOCTOR_ID_ONLY_RE = re.compile( + r"^doctor_id=(?P[^\s(]+)(?:\s*\(conf=[\d.]+\))?\s*(?:\[低置信度\])?\s*$" +) + + +def sha256_file(path: Path) -> str: + h = hashlib.sha256() + with path.open("rb") as f: + for chunk in iter(lambda: f.read(1024 * 1024), b""): + h.update(chunk) + return h.hexdigest() + + +def candidate_cache_key(candidate_consumables: list[str]) -> str: + raw = "\n".join(candidate_consumables).encode("utf-8") + return hashlib.sha256(raw).hexdigest()[:12] + + +def resolve_reference_candidates(candidate_consumables: list[str] | None) -> list[str]: + requested = normalize_candidate_consumables_raw(list(candidate_consumables or [])) + return effective_candidate_consumables(requested) + + +def parse_reference_doctor_info(path: Path) -> ReferenceDoctorInfo | None: + if not path.is_file(): + return None + raw_line = "" + for line in path.read_text(encoding="utf-8").splitlines(): + stripped = line.strip() + if stripped.startswith("医生信息:") or stripped.startswith("医生信息:"): + raw_line = stripped + break + if not raw_line: + return None + + body = raw_line.split(":", 1)[-1].split(":", 1)[-1].strip() + if not body or body == "未启用": + return ReferenceDoctorInfo( + doctor_id=bp.VIDEO_RESULT_DOCTOR_ID, + doctor_name=None, + display=body or "未启用", + raw_line=raw_line, + ) + if body.startswith("识别失败"): + return ReferenceDoctorInfo( + doctor_id=bp.VIDEO_RESULT_DOCTOR_ID, + doctor_name=None, + display=body, + raw_line=raw_line, + ) + + match = _DOCTOR_NAME_ID_RE.match(body) + if match: + name = match.group("name").strip() + did = match.group("id").strip() + return ReferenceDoctorInfo( + doctor_id=did, + doctor_name=name, + display=f"{name} ({did})", + raw_line=raw_line, + ) + + match = _DOCTOR_ID_ONLY_RE.match(body) + if match: + did = match.group("id").strip() + return ReferenceDoctorInfo( + doctor_id=did, + doctor_name=None, + display=did, + raw_line=raw_line, + ) + + return ReferenceDoctorInfo( + doctor_id=bp.VIDEO_RESULT_DOCTOR_ID, + doctor_name=None, + display=body, + raw_line=raw_line, + ) + + +def is_reference_result_complete(path: Path) -> bool: + if not path.is_file() or path.stat().st_size <= 0: + return False + lines = [line.strip() for line in path.read_text(encoding="utf-8").splitlines() if line.strip()] + if not any(line.lower().startswith("rank\t") for line in lines): + return False + has_doctor_footer = any( + line.startswith("医生信息:") or line.startswith("医生信息:") for line in lines + ) + has_segment_row = False + for line in lines: + if line.lower().startswith("rank\t"): + continue + if line.startswith("医生信息"): + continue + parts = line.split("\t") + if len(parts) >= 5 and parts[0].strip().isdigit(): + has_segment_row = True + break + return has_doctor_footer and has_segment_row + + +def doctor_id_for_consumption_rows(doctor: ReferenceDoctorInfo | None) -> str: + if doctor is None: + return bp.VIDEO_RESULT_DOCTOR_ID + if doctor.doctor_name: + return f"{doctor.doctor_name} ({doctor.doctor_id})" + if doctor.doctor_id and doctor.doctor_id != bp.VIDEO_RESULT_DOCTOR_ID: + return doctor.doctor_id + return bp.VIDEO_RESULT_DOCTOR_ID + + +def parse_reference_tsv( + path: Path, + *, + base_timestamp: datetime | None = None, + doctor: ReferenceDoctorInfo | None = None, +) -> list[SurgeryConsumptionStored]: + if base_timestamp is None: + base_timestamp = datetime.now(timezone.utc) + if doctor is None: + doctor = parse_reference_doctor_info(path) + row_doctor_id = doctor_id_for_consumption_rows(doctor) + out: list[SurgeryConsumptionStored] = [] + with path.open("r", encoding="utf-8", newline="") as f: + reader = csv.DictReader(f, delimiter="\t") + for row in reader: + name = (row.get("top1_name") or "").strip() + if not name or name.startswith("("): + continue + if name.startswith("医生信息"): + continue + item_id = (row.get("product_id_top1") or "").strip() or name + try: + start_sec = float((row.get("start_sec") or "0").strip() or 0.0) + except ValueError: + start_sec = 0.0 + out.append( + SurgeryConsumptionStored( + item_id=item_id, + item_name=name, + qty=1, + doctor_id=row_doctor_id, + timestamp=base_timestamp + timedelta(seconds=max(0.0, start_sec)), + source="video_batch", + ) + ) + return out diff --git a/backend/app/algo_host/subprocess_runner.py b/backend/app/algo_host/subprocess_runner.py new file mode 100644 index 0000000..dc099b1 --- /dev/null +++ b/backend/app/algo_host/subprocess_runner.py @@ -0,0 +1,180 @@ +"""Spawn reference bundle child processes (main.py, visualize_result_video.py).""" + +from __future__ import annotations + +import os +import signal +import subprocess +import sys +from pathlib import Path + +from loguru import logger + +from app.algo_host.bundle import load_reference_default_config, resolve_bundle_relative_path +from app.algo_host.transcode import VISUALIZATION_MAX_WIDTH + + +def build_reference_env() -> dict[str, str]: + env = os.environ.copy() + env["PYTHONFAULTHANDLER"] = "1" + env["PYTHONUNBUFFERED"] = "1" + return env + + +def build_batch_main_command(*, bundle_dir: Path, config_path: Path) -> list[str]: + return [ + "uv", + "run", + "python", + "-X", + "faulthandler", + str(bundle_dir / "main.py"), + "--config", + str(config_path), + ] + + +def build_visualization_command( + *, + bundle_dir: Path, + video_path: Path, + result_path: Path, + output_video_path: Path, +) -> list[str]: + cfg = load_reference_default_config(bundle_dir) + weights = cfg.get("weights") if isinstance(cfg.get("weights"), dict) else {} + device_cfg = cfg.get("device") if isinstance(cfg.get("device"), dict) else {} + hand_raw = str((weights or {}).get("hand") or "weights/hand_detect.pt").strip() + hand_model = resolve_bundle_relative_path(bundle_dir, hand_raw) + return [ + sys.executable, + "-X", + "faulthandler", + str((bundle_dir / "visualize_result_video.py").resolve()), + "--video", + str(video_path.resolve()), + "--result-txt", + str(result_path.resolve()), + "--hand-model", + str(hand_model), + "--out-video", + str(output_video_path.resolve()), + "--device", + str(device_cfg.get("type") or "cuda"), + "--max-width", + str(VISUALIZATION_MAX_WIDTH), + ] + + +def _signal_name(signum: int) -> str: + try: + return signal.Signals(signum).name + except ValueError: + return f"signal {signum}" + + +def describe_batch_returncode(returncode: int) -> str: + if returncode < 0: + signum = -returncode + return f"terminated by {_signal_name(signum)} ({signum})" + if returncode > 128: + wrapped = returncode - 256 + if wrapped < 0: + signum = -wrapped + return f"exit={returncode} (possibly propagated {wrapped}/{_signal_name(signum)})" + return f"exit={returncode}" + + +def format_batch_failure( + returncode: int, + *, + stdout: str, + stderr: str, + work_dir: Path, + output_path: Path, +) -> str: + chunks: list[str] = [ + describe_batch_returncode(returncode), + f"work_dir={work_dir}", + f"output={output_path}", + ] + stdout = stdout.strip() + stderr = stderr.strip() + if stdout: + chunks.append(f"stdout:\n{stdout[-3000:]}") + if stderr: + chunks.append(f"stderr:\n{stderr[-3000:]}") + return "\n".join(chunks) + + +def _log_subprocess_output(prefix: str, stdout: str, stderr: str, *, max_lines: int = 40) -> None: + for label, text in (("stdout", stdout), ("stderr", stderr)): + lines = [ln for ln in (text or "").splitlines() if ln.strip()] + if not lines: + continue + tail = lines[-max_lines:] if len(lines) > max_lines else lines + for line in tail: + logger.info("{} {}", prefix, line) + + +def run_subprocess( + cmd: list[str], + *, + cwd: Path, + work_dir: Path, + output_path: Path, + log_label: str, +) -> None: + proc = subprocess.run( + cmd, + cwd=str(cwd), + check=False, + text=True, + capture_output=True, + env=build_reference_env(), + ) + if proc.returncode != 0: + msg = format_batch_failure( + proc.returncode, + stdout=proc.stdout or "", + stderr=proc.stderr or "", + work_dir=work_dir, + output_path=output_path, + ) + raise RuntimeError(f"{log_label} failed {msg}") + _log_subprocess_output(log_label, proc.stdout or "", proc.stderr or "") + + +def run_batch_main(*, bundle_dir: Path, config_path: Path, work_dir: Path, output_path: Path) -> None: + cmd = build_batch_main_command(bundle_dir=bundle_dir, config_path=config_path) + logger.info("reference batch starting: {}", " ".join(cmd)) + run_subprocess( + cmd, + cwd=bundle_dir, + work_dir=work_dir, + output_path=output_path, + log_label="reference batch", + ) + + +def run_visualization_script( + *, + bundle_dir: Path, + video_path: Path, + result_path: Path, + raw_output_video_path: Path, +) -> None: + cmd = build_visualization_command( + bundle_dir=bundle_dir, + video_path=video_path, + result_path=result_path, + output_video_path=raw_output_video_path, + ) + logger.info("reference visualization starting: {}", " ".join(cmd)) + run_subprocess( + cmd, + cwd=bundle_dir, + work_dir=raw_output_video_path.parent, + output_path=raw_output_video_path, + log_label="visualize", + ) diff --git a/backend/app/algo_host/transcode.py b/backend/app/algo_host/transcode.py new file mode 100644 index 0000000..3283059 --- /dev/null +++ b/backend/app/algo_host/transcode.py @@ -0,0 +1,276 @@ +"""FFmpeg/ffprobe helpers for batch uploads and browser-ready MP4 (infrastructure only).""" + +from __future__ import annotations + +import shutil +import subprocess +from pathlib import Path + +from loguru import logger + +VISUALIZATION_MAX_WIDTH = 1920 + + +def visualization_ffmpeg_scale_filter() -> str: + return f"scale='min({VISUALIZATION_MAX_WIDTH},iw)':-2" + + +def browser_transcode_tmp_path(output_path: Path) -> Path: + return output_path.with_name(f"{output_path.stem}.part{output_path.suffix}") + + +def is_readable_mp4(path: Path) -> bool: + ffprobe = shutil.which("ffprobe") + if ffprobe is None or not path.is_file() or path.stat().st_size < 4096: + return False + proc = subprocess.run( + [ + ffprobe, + "-v", + "error", + "-select_streams", + "v:0", + "-show_entries", + "stream=codec_name", + "-of", + "csv=p=0", + str(path), + ], + check=False, + text=True, + capture_output=True, + ) + return proc.returncode == 0 and bool((proc.stdout or "").strip()) + + +def ffprobe_fields(path: Path, entries: str) -> dict[str, str]: + ffprobe = shutil.which("ffprobe") + if ffprobe is None or not path.is_file(): + return {} + proc = subprocess.run( + [ + ffprobe, + "-v", + "error", + "-select_streams", + "v:0", + "-show_entries", + entries, + "-of", + "default=noprint_wrappers=1", + str(path), + ], + check=False, + text=True, + capture_output=True, + ) + if proc.returncode != 0: + return {} + fields: dict[str, str] = {} + for line in proc.stdout.splitlines(): + if "=" not in line: + continue + key, value = line.split("=", 1) + fields[key.strip().lower()] = value.strip().lower() + return fields + + +def ffprobe_container_format(path: Path) -> str: + ffprobe = shutil.which("ffprobe") + if ffprobe is None or not path.is_file(): + return "" + proc = subprocess.run( + [ + ffprobe, + "-v", + "error", + "-show_entries", + "format=format_name", + "-of", + "default=noprint_wrappers=1:nokey=1", + str(path), + ], + check=False, + text=True, + capture_output=True, + ) + if proc.returncode != 0: + return "" + return (proc.stdout or "").strip().lower() + + +def is_browser_compatible_mp4(path: Path) -> bool: + fields = ffprobe_fields(path, "stream=codec_name,pix_fmt") + return fields.get("codec_name") == "h264" and fields.get("pix_fmt") in {"yuv420p", "yuvj420p"} + + +def batch_input_needs_normalize(path: Path) -> bool: + if not is_readable_mp4(path): + return True + if not is_browser_compatible_mp4(path): + return True + container = ffprobe_container_format(path) + if container and "mpeg" in container: + return True + fields = ffprobe_fields(path, "stream=codec_name,width,height") + try: + width = int(fields.get("width") or "0") + except ValueError: + width = 0 + return width > 1920 + + +def normalize_batch_input_video(source_path: Path, output_path: Path) -> bool: + ffmpeg = shutil.which("ffmpeg") + if ffmpeg is None or not source_path.is_file(): + return False + if not is_readable_mp4(source_path): + logger.warning("skip batch input normalize: unreadable source {}", source_path) + return False + output_path.parent.mkdir(parents=True, exist_ok=True) + tmp_path = browser_transcode_tmp_path(output_path) + if tmp_path.exists(): + tmp_path.unlink() + logger.info("ffmpeg batch input normalize starting: {} -> {}", source_path, output_path) + proc = subprocess.run( + [ + ffmpeg, + "-y", + "-hide_banner", + "-loglevel", + "error", + "-fflags", + "+genpts", + "-i", + str(source_path), + "-map", + "0:v:0", + "-an", + "-f", + "mp4", + "-c:v", + "libx264", + "-pix_fmt", + "yuv420p", + "-preset", + "veryfast", + "-crf", + "23", + "-vf", + "scale='min(1920,iw)':-2", + "-movflags", + "+faststart", + str(tmp_path), + ], + check=False, + text=True, + capture_output=True, + ) + if proc.returncode != 0: + stderr = (proc.stderr or "").strip() + logger.warning("ffmpeg batch input normalize failed: {}", stderr[-3000:]) + if tmp_path.exists(): + tmp_path.unlink() + return False + if not tmp_path.is_file() or tmp_path.stat().st_size <= 0: + logger.warning("ffmpeg batch input normalize produced empty file: {}", tmp_path) + if tmp_path.exists(): + tmp_path.unlink() + return False + tmp_path.replace(output_path) + if not is_browser_compatible_mp4(output_path): + logger.warning("ffmpeg batch input normalize output not h264/yuv420p: {}", output_path) + output_path.unlink(missing_ok=True) + return False + logger.info( + "ffmpeg batch input normalize complete: {} ({} bytes)", + output_path, + output_path.stat().st_size, + ) + return True + + +def ensure_batch_pipeline_input_video(*, source_path: Path, dest_path: Path) -> None: + import shutil as sh + + dest_path.parent.mkdir(parents=True, exist_ok=True) + if dest_path.is_file() and dest_path.stat().st_size > 0 and not batch_input_needs_normalize(dest_path): + return + if batch_input_needs_normalize(source_path): + if normalize_batch_input_video(source_path, dest_path): + return + logger.warning( + "batch input normalize failed, falling back to raw copy: {} -> {}", + source_path, + dest_path, + ) + if not dest_path.is_file(): + sh.copy2(source_path, dest_path) + + +def transcode_visualization_for_browser(source_path: Path, output_path: Path) -> bool: + ffmpeg = shutil.which("ffmpeg") + if ffmpeg is None or not source_path.is_file(): + return False + if not is_readable_mp4(source_path): + logger.warning("skip visualization transcode: unreadable source {}", source_path) + return False + output_path.parent.mkdir(parents=True, exist_ok=True) + tmp_path = browser_transcode_tmp_path(output_path) + if tmp_path.exists(): + tmp_path.unlink() + logger.info("ffmpeg visualization transcode starting: {} -> {}", source_path, output_path) + proc = subprocess.run( + [ + ffmpeg, + "-y", + "-hide_banner", + "-loglevel", + "error", + "-i", + str(source_path), + "-map", + "0:v:0", + "-an", + "-f", + "mp4", + "-c:v", + "libx264", + "-pix_fmt", + "yuv420p", + "-preset", + "ultrafast", + "-crf", + "23", + "-vf", + visualization_ffmpeg_scale_filter(), + "-movflags", + "+faststart", + str(tmp_path), + ], + check=False, + text=True, + capture_output=True, + ) + if proc.returncode != 0: + stderr = (proc.stderr or "").strip() + logger.warning("ffmpeg visualization transcode failed: {}", stderr[-3000:]) + if tmp_path.exists(): + tmp_path.unlink() + return False + if not tmp_path.is_file() or tmp_path.stat().st_size <= 0: + logger.warning("ffmpeg visualization transcode produced empty file: {}", tmp_path) + if tmp_path.exists(): + tmp_path.unlink() + return False + tmp_path.replace(output_path) + if not is_browser_compatible_mp4(output_path): + logger.warning("ffmpeg output is not browser-compatible h264/yuv420p: {}", output_path) + output_path.unlink(missing_ok=True) + return False + logger.info( + "ffmpeg visualization transcode complete: {} ({} bytes)", + output_path, + output_path.stat().st_size, + ) + return True diff --git a/backend/app/algorithm_runner/reference_bundle_runtime.py b/backend/app/algorithm_runner/reference_bundle_runtime.py index c2cb396..78b63cd 100644 --- a/backend/app/algorithm_runner/reference_bundle_runtime.py +++ b/backend/app/algorithm_runner/reference_bundle_runtime.py @@ -1,46 +1,32 @@ -"""Helpers for running the configured reference algorithm bundle.""" +"""Realtime path: bundle sys.path setup + NMS patch. Shared bundle resolution lives in app.algo_host.bundle.""" from __future__ import annotations -import copy import shutil import sys from pathlib import Path -from typing import Any -import yaml +from app.algo_host.bundle import ( + DEFAULT_REFERENCE_BUNDLE_RELATIVE, + REPO_ROOT, + configured_reference_bundle_relative, + default_reference_bundle_dir, + load_reference_default_config, + resolve_bundle_relative_path, + resolve_reference_bundle_dir, +) -REPO_ROOT = Path(__file__).resolve().parents[2] -DEFAULT_REFERENCE_BUNDLE_RELATIVE = "algorithm_subprocesses/5.15" - - -def configured_reference_bundle_relative() -> str: - from app.config import Settings - - raw = (Settings().reference_bundle_relative or "").strip() - return raw or DEFAULT_REFERENCE_BUNDLE_RELATIVE - - -def default_reference_bundle_dir() -> Path: - raw = configured_reference_bundle_relative() - path = Path(raw).expanduser() - if path.is_absolute(): - return path.resolve() - return (REPO_ROOT / path).resolve() - - -def resolve_reference_bundle_dir(bundle_dir: Path | None = None) -> Path: - if bundle_dir is None: - label = configured_reference_bundle_relative() - root = default_reference_bundle_dir() - else: - label = str(bundle_dir) - root = Path(bundle_dir).expanduser().resolve() - if not (root / "main.py").is_file(): - raise FileNotFoundError(f"reference bundle main.py not found: {label} -> {root}") - if not (root / "code" / "repo_root.py").is_file(): - raise FileNotFoundError(f"reference bundle vendor code not found: {label} -> {root / 'code'}") - return root +__all__ = [ + "DEFAULT_REFERENCE_BUNDLE_RELATIVE", + "REPO_ROOT", + "configured_reference_bundle_relative", + "default_reference_bundle_dir", + "ensure_reference_bundle_on_path", + "ensure_reference_nms_patch", + "load_reference_default_config", + "reference_weight_path", + "resolve_reference_bundle_dir", +] def ensure_reference_bundle_on_path(bundle_dir: Path | None = None) -> Path: @@ -65,7 +51,7 @@ def ensure_reference_bundle_on_path(bundle_dir: Path | None = None) -> Path: def ensure_reference_nms_patch(bundle_dir: Path | None = None) -> bool: - """Patch the reference ActionFormer to use the backend's pure-PyTorch NMS.""" + """Patch the reference ActionFormer to use the backend's pure-PyTorch NMS (realtime only).""" root = resolve_reference_bundle_dir(bundle_dir) source = REPO_ROOT / "app" / "algorithm_runner" / "actionformer_release" / "libs" / "utils" / "nms.py" @@ -84,21 +70,9 @@ def ensure_reference_nms_patch(bundle_dir: Path | None = None) -> bool: return True -def load_reference_default_config(bundle_dir: Path | None = None) -> dict[str, Any]: - root = resolve_reference_bundle_dir(bundle_dir) - path = root / "configs" / "default_config.yaml" - data = yaml.safe_load(path.read_text(encoding="utf-8")) - if not isinstance(data, dict): - raise ValueError(f"invalid reference bundle default config: {path}") - return copy.deepcopy(data) - - def reference_weight_path(key: str, bundle_dir: Path | None = None) -> Path: cfg = load_reference_default_config(bundle_dir) raw = (((cfg.get("weights") or {}) if isinstance(cfg, dict) else {}).get(key) or "").strip() if not raw: raise KeyError(f"reference bundle weight key not configured: {key}") - p = Path(raw) - if p.is_absolute(): - return p.resolve() - return (resolve_reference_bundle_dir(bundle_dir) / p).resolve() + return resolve_bundle_relative_path(resolve_reference_bundle_dir(bundle_dir), raw) diff --git a/backend/app/baked/algorithm.py b/backend/app/baked/algorithm.py index 6fdfd1e..36553e5 100644 --- a/backend/app/baked/algorithm.py +++ b/backend/app/baked/algorithm.py @@ -4,7 +4,7 @@ from __future__ import annotations from pathlib import Path -from app.algorithm_runner.reference_bundle_runtime import default_reference_bundle_dir +from app.algo_host.bundle import default_reference_bundle_dir _PACKAGE_DIR = Path(__file__).resolve().parent.parent diff --git a/backend/app/baked/pipeline.py b/backend/app/baked/pipeline.py index 52d973e..abc5eeb 100644 --- a/backend/app/baked/pipeline.py +++ b/backend/app/baked/pipeline.py @@ -43,6 +43,3 @@ VOICE_CONFIRM_MAX_FAILED_PARSE_ROUNDS: int = 2 # --- 非实时 batch 标注视频临时保留(小时)--- VIDEO_BATCH_VIS_TTL_HOURS: int = 24 - -# --- 非实时 batch 标注视频临时保留 --- -VIDEO_BATCH_VIS_TTL_HOURS: int = 24 diff --git a/backend/app/routers/recording_demo.py b/backend/app/routers/recording_demo.py index 7838328..2cd53e6 100644 --- a/backend/app/routers/recording_demo.py +++ b/backend/app/routers/recording_demo.py @@ -30,7 +30,7 @@ from app.services.video_batch_cleanup import ( purge_surgery_batch_tree, stage_visualization_pending, ) -from app.services.video_batch_runner import VideoBatchRunner +from app.algo_host import BatchAlgorithmService from app.services.voice_terminal_hub import VoiceTerminalHub from app.surgery_errors import SurgeryPipelineError @@ -63,7 +63,7 @@ def _require_site_config_path() -> Path: def _background_finalize_visualization( - runner: VideoBatchRunner, + runner: BatchAlgorithmService, surgery_id: str, ) -> None: try: @@ -136,7 +136,7 @@ async def offline_batch( OFFLINE_BATCH_FLOW_MARKER, include_visualization, ) - runner = VideoBatchRunner() + runner = BatchAlgorithmService() suffix = Path(video1.filename or "video.mp4").suffix or ".mp4" work_root = runner.root_dir / surgery_id / "upload" work_root.mkdir(parents=True, exist_ok=True) @@ -227,7 +227,7 @@ async def offline_batch_visualization(surgery_id: str) -> FileResponse: status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail="surgery_id must be exactly 6 digits", ) - runner = VideoBatchRunner() + runner = BatchAlgorithmService() path = runner.latest_visualization_path(surgery_id) if path is None: raise HTTPException( diff --git a/backend/app/services/video_batch_cleanup.py b/backend/app/services/video_batch_cleanup.py index 04f5a1b..275e36e 100644 --- a/backend/app/services/video_batch_cleanup.py +++ b/backend/app/services/video_batch_cleanup.py @@ -88,7 +88,7 @@ def purge_batch_artifacts( ) -> None: """Remove one cache entry plus surgery upload/input copies.""" - cache_entry = root_dir / "cache" / surgery_id / digest / candidate_key + cache_entry = root_dir / "cache" / digest / candidate_key _safe_rmtree(cache_entry) _prune_empty_parents(cache_entry.parent, stop_at=root_dir / "cache") diff --git a/backend/app/services/video_batch_runner.py b/backend/app/services/video_batch_runner.py deleted file mode 100644 index 5df6188..0000000 --- a/backend/app/services/video_batch_runner.py +++ /dev/null @@ -1,968 +0,0 @@ -"""Batch video mode: run the configured reference bundle on a complete MP4. - -This path generates the YAML/Excel/whitelist inputs expected by -the reference bundle's ``main.py``. Intermediate files are cached per -surgery_id, video sha256, and candidate list, never shared across different -surgeries. -""" - -from __future__ import annotations - -import csv -import copy -import hashlib -import json -import os -import re -import signal -import shutil -import subprocess -import sys -from dataclasses import dataclass -from datetime import datetime, timedelta, timezone -from pathlib import Path - -import yaml -from loguru import logger - -from app.algorithm_runner.reference_bundle_runtime import ( - default_reference_bundle_dir, - ensure_reference_nms_patch, - load_reference_default_config, - resolve_reference_bundle_dir, -) -from app.baked import pipeline as bp -from app.consumable_catalog import build_name_mapping, effective_candidate_consumables, normalize_candidate_consumables_raw -from app.domain.consumption import SurgeryConsumptionStored -from app.services.video_batch_cleanup import ( - RAW_VISUALIZATION_FILENAME, - VISUALIZATION_FILENAME, - purge_visualization_artifacts, - purge_visualization_pending, - visualization_output_path, - visualization_pending_input_path, - visualization_pending_result_path, -) - - -@dataclass(frozen=True) -class ReferenceDoctorInfo: - """Parsed from algorithm_subprocesses/5.15 result footer line ``医生信息:...``.""" - - doctor_id: str - doctor_name: str | None - display: str - raw_line: str - - -@dataclass(frozen=True) -class VideoBatchRunResult: - video_sha256: str - candidate_cache_key: str - input_path: Path - work_dir: Path - output_path: Path - details: list[SurgeryConsumptionStored] - reused_cache: bool - doctor: ReferenceDoctorInfo | None = None - visualization_path: Path | None = None - - -@dataclass(frozen=True) -class ReferenceRunFiles: - config_path: Path - excel_path: Path - whitelist_path: Path - - -# 标注视频最长边上限(宽 1920 ≈ 1080p),绘制与转码共用,避免 4K 逐帧 YOLO。 -VISUALIZATION_MAX_WIDTH = 1920 - - -def _visualization_ffmpeg_scale_filter() -> str: - return f"scale='min({VISUALIZATION_MAX_WIDTH},iw)':-2" - - -def browser_transcode_tmp_path(output_path: Path) -> Path: - """Temp file for atomic replace; must end in ``.mp4`` so ffmpeg picks the muxer.""" - - return output_path.with_name(f"{output_path.stem}.part{output_path.suffix}") - - -def _log_subprocess_output(prefix: str, stdout: str, stderr: str, *, max_lines: int = 40) -> None: - """Emit captured child-process lines at INFO (used after visualize / transcode).""" - - for label, text in (("stdout", stdout), ("stderr", stderr)): - lines = [ln for ln in (text or "").splitlines() if ln.strip()] - if not lines: - continue - tail = lines[-max_lines:] if len(lines) > max_lines else lines - for line in tail: - logger.info("{} {}", prefix, line) - - -def sha256_file(path: Path) -> str: - h = hashlib.sha256() - with path.open("rb") as f: - for chunk in iter(lambda: f.read(1024 * 1024), b""): - h.update(chunk) - return h.hexdigest() - - -def build_reference_command( - *, - bundle_dir: Path, - config_path: Path, -) -> list[str]: - return [ - "uv", - "run", - "python", - "-X", - "faulthandler", - str(bundle_dir / "main.py"), - "--config", - str(config_path), - ] - - -def build_reference_env() -> dict[str, str]: - env = os.environ.copy() - env["PYTHONFAULTHANDLER"] = "1" - env["PYTHONUNBUFFERED"] = "1" - return env - - -def build_reference_visualization_command( - *, - bundle_dir: Path, - video_path: Path, - result_path: Path, - output_video_path: Path, -) -> list[str]: - cfg = load_reference_default_config(bundle_dir) - weights = cfg.get("weights") if isinstance(cfg.get("weights"), dict) else {} - phase2 = cfg.get("phase2") if isinstance(cfg.get("phase2"), dict) else {} - device_cfg = cfg.get("device") if isinstance(cfg.get("device"), dict) else {} - hand_raw = str((weights or {}).get("hand") or "weights/hand_detect.pt").strip() - hand_model = Path(hand_raw) - if not hand_model.is_absolute(): - hand_model = (bundle_dir / hand_model).resolve() - return [ - sys.executable, - "-X", - "faulthandler", - str((bundle_dir / "visualize_result_video.py").resolve()), - "--video", - str(video_path.resolve()), - "--result-txt", - str(result_path.resolve()), - "--hand-model", - str(hand_model), - "--out-video", - str(output_video_path.resolve()), - "--device", - str(device_cfg.get("type") or "cuda"), - "--det-conf", - str(phase2.get("det_conf", 0.6)), - "--imgsz-det", - str(phase2.get("imgsz_det", 640)), - "--pad-ratio", - str(phase2.get("pad_ratio", 0.2)), - "--max-width", - str(VISUALIZATION_MAX_WIDTH), - ] - - -def _is_readable_mp4(path: Path) -> bool: - ffprobe = shutil.which("ffprobe") - if ffprobe is None or not path.is_file() or path.stat().st_size < 4096: - return False - proc = subprocess.run( - [ffprobe, "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=codec_name", "-of", "csv=p=0", str(path)], - check=False, - text=True, - capture_output=True, - ) - return proc.returncode == 0 and bool((proc.stdout or "").strip()) - - -def _ffprobe_fields(path: Path, entries: str) -> dict[str, str]: - ffprobe = shutil.which("ffprobe") - if ffprobe is None or not path.is_file(): - return {} - proc = subprocess.run( - [ - ffprobe, - "-v", - "error", - "-select_streams", - "v:0", - "-show_entries", - entries, - "-of", - "default=noprint_wrappers=1", - str(path), - ], - check=False, - text=True, - capture_output=True, - ) - if proc.returncode != 0: - return {} - fields: dict[str, str] = {} - for line in proc.stdout.splitlines(): - if "=" not in line: - continue - key, value = line.split("=", 1) - fields[key.strip().lower()] = value.strip().lower() - return fields - - -def _ffprobe_container_format(path: Path) -> str: - ffprobe = shutil.which("ffprobe") - if ffprobe is None or not path.is_file(): - return "" - proc = subprocess.run( - [ - ffprobe, - "-v", - "error", - "-show_entries", - "format=format_name", - "-of", - "default=noprint_wrappers=1:nokey=1", - str(path), - ], - check=False, - text=True, - capture_output=True, - ) - if proc.returncode != 0: - return "" - return (proc.stdout or "").strip().lower() - - -def _is_browser_compatible_mp4(path: Path) -> bool: - fields = _ffprobe_fields(path, "stream=codec_name,pix_fmt") - return fields.get("codec_name") == "h264" and fields.get("pix_fmt") in {"yuv420p", "yuvj420p"} - - -def _batch_input_needs_normalize(path: Path) -> bool: - """True when the upload is likely to crash OpenCV/VideoSwin (HEVC, MPEG-PS, >1080p).""" - - if not _is_readable_mp4(path): - return True - if not _is_browser_compatible_mp4(path): - return True - container = _ffprobe_container_format(path) - if container and "mpeg" in container: - return True - fields = _ffprobe_fields(path, "stream=codec_name,width,height") - try: - width = int(fields.get("width") or "0") - except ValueError: - width = 0 - return width > 1920 - - -def _normalize_batch_input_video(source_path: Path, output_path: Path) -> bool: - """Remux/transcode DVR uploads to H.264 MP4 (<=1080p) for stable feature extraction.""" - - ffmpeg = shutil.which("ffmpeg") - if ffmpeg is None or not source_path.is_file(): - return False - if not _is_readable_mp4(source_path): - logger.warning("skip batch input normalize: unreadable source {}", source_path) - return False - output_path.parent.mkdir(parents=True, exist_ok=True) - tmp_path = browser_transcode_tmp_path(output_path) - if tmp_path.exists(): - tmp_path.unlink() - logger.info( - "ffmpeg batch input normalize starting: {} -> {}", - source_path, - output_path, - ) - proc = subprocess.run( - [ - ffmpeg, - "-y", - "-hide_banner", - "-loglevel", - "error", - "-fflags", - "+genpts", - "-i", - str(source_path), - "-map", - "0:v:0", - "-an", - "-f", - "mp4", - "-c:v", - "libx264", - "-pix_fmt", - "yuv420p", - "-preset", - "veryfast", - "-crf", - "23", - "-vf", - "scale='min(1920,iw)':-2", - "-movflags", - "+faststart", - str(tmp_path), - ], - check=False, - text=True, - capture_output=True, - ) - if proc.returncode != 0: - stderr = (proc.stderr or "").strip() - logger.warning("ffmpeg batch input normalize failed: {}", stderr[-3000:]) - if tmp_path.exists(): - tmp_path.unlink() - return False - if not tmp_path.is_file() or tmp_path.stat().st_size <= 0: - logger.warning("ffmpeg batch input normalize produced empty file: {}", tmp_path) - if tmp_path.exists(): - tmp_path.unlink() - return False - tmp_path.replace(output_path) - if not _is_browser_compatible_mp4(output_path): - logger.warning("ffmpeg batch input normalize output not h264/yuv420p: {}", output_path) - output_path.unlink(missing_ok=True) - return False - logger.info( - "ffmpeg batch input normalize complete: {} ({} bytes)", - output_path, - output_path.stat().st_size, - ) - return True - - -def ensure_batch_pipeline_input_video(*, source_path: Path, dest_path: Path) -> None: - """Write a pipeline-ready MP4 at dest_path (normalize or copy).""" - - dest_path.parent.mkdir(parents=True, exist_ok=True) - if dest_path.is_file() and dest_path.stat().st_size > 0 and not _batch_input_needs_normalize(dest_path): - return - if _batch_input_needs_normalize(source_path): - if _normalize_batch_input_video(source_path, dest_path): - return - logger.warning( - "batch input normalize failed, falling back to raw copy: {} -> {}", - source_path, - dest_path, - ) - if not dest_path.is_file(): - shutil.copy2(source_path, dest_path) - - -_DOCTOR_NAME_ID_RE = re.compile( - r"^(?P.+?)\s*\(id=(?P[^,\s)]+)(?:,\s*conf=[\d.]+)?\)\s*(?:\[低置信度\])?\s*$" -) -_DOCTOR_ID_ONLY_RE = re.compile( - r"^doctor_id=(?P[^\s(]+)(?:\s*\(conf=[\d.]+\))?\s*(?:\[低置信度\])?\s*$" -) - - -def parse_reference_doctor_info(path: Path) -> ReferenceDoctorInfo | None: - """Read ``医生信息:姓名 (id=...)`` footer appended by algorithm_subprocesses/5.15 orchestrator.""" - - if not path.is_file(): - return None - raw_line = "" - for line in path.read_text(encoding="utf-8").splitlines(): - stripped = line.strip() - if stripped.startswith("医生信息:") or stripped.startswith("医生信息:"): - raw_line = stripped - break - if not raw_line: - return None - - body = raw_line.split(":", 1)[-1].split(":", 1)[-1].strip() - if not body or body == "未启用": - return ReferenceDoctorInfo( - doctor_id=bp.VIDEO_RESULT_DOCTOR_ID, - doctor_name=None, - display=body or "未启用", - raw_line=raw_line, - ) - if body.startswith("识别失败"): - return ReferenceDoctorInfo( - doctor_id=bp.VIDEO_RESULT_DOCTOR_ID, - doctor_name=None, - display=body, - raw_line=raw_line, - ) - - match = _DOCTOR_NAME_ID_RE.match(body) - if match: - name = match.group("name").strip() - did = match.group("id").strip() - return ReferenceDoctorInfo( - doctor_id=did, - doctor_name=name, - display=f"{name} ({did})", - raw_line=raw_line, - ) - - match = _DOCTOR_ID_ONLY_RE.match(body) - if match: - did = match.group("id").strip() - return ReferenceDoctorInfo( - doctor_id=did, - doctor_name=None, - display=did, - raw_line=raw_line, - ) - - return ReferenceDoctorInfo( - doctor_id=bp.VIDEO_RESULT_DOCTOR_ID, - doctor_name=None, - display=body, - raw_line=raw_line, - ) - - -def is_reference_result_complete(path: Path) -> bool: - """True when algorithm_subprocesses/5.15 orchestrator has finished writing the TSV (incl. footer).""" - - if not path.is_file() or path.stat().st_size <= 0: - return False - lines = [line.strip() for line in path.read_text(encoding="utf-8").splitlines() if line.strip()] - if not any(line.lower().startswith("rank\t") for line in lines): - return False - has_doctor_footer = any( - line.startswith("医生信息:") or line.startswith("医生信息:") for line in lines - ) - has_segment_row = False - for line in lines: - if line.lower().startswith("rank\t"): - continue - if line.startswith("医生信息"): - continue - parts = line.split("\t") - if len(parts) >= 5 and parts[0].strip().isdigit(): - has_segment_row = True - break - return has_doctor_footer and has_segment_row - - -def doctor_id_for_consumption_rows(doctor: ReferenceDoctorInfo | None) -> str: - if doctor is None: - return bp.VIDEO_RESULT_DOCTOR_ID - if doctor.doctor_name: - return f"{doctor.doctor_name} ({doctor.doctor_id})" - if doctor.doctor_id and doctor.doctor_id != bp.VIDEO_RESULT_DOCTOR_ID: - return doctor.doctor_id - return bp.VIDEO_RESULT_DOCTOR_ID - - -def _transcode_visualization_for_browser(source_path: Path, output_path: Path) -> bool: - ffmpeg = shutil.which("ffmpeg") - if ffmpeg is None or not source_path.is_file(): - return False - if not _is_readable_mp4(source_path): - logger.warning("skip visualization transcode: unreadable source {}", source_path) - return False - output_path.parent.mkdir(parents=True, exist_ok=True) - tmp_path = browser_transcode_tmp_path(output_path) - if tmp_path.exists(): - tmp_path.unlink() - logger.info( - "ffmpeg visualization transcode starting: {} -> {}", - source_path, - output_path, - ) - proc = subprocess.run( - [ - ffmpeg, - "-y", - "-hide_banner", - "-loglevel", - "error", - "-i", - str(source_path), - "-map", - "0:v:0", - "-an", - "-f", - "mp4", - "-c:v", - "libx264", - "-pix_fmt", - "yuv420p", - "-preset", - "ultrafast", - "-crf", - "23", - "-vf", - _visualization_ffmpeg_scale_filter(), - "-movflags", - "+faststart", - str(tmp_path), - ], - check=False, - text=True, - capture_output=True, - ) - if proc.returncode != 0: - stderr = (proc.stderr or "").strip() - logger.warning("ffmpeg visualization transcode failed: {}", stderr[-3000:]) - if tmp_path.exists(): - tmp_path.unlink() - return False - if not tmp_path.is_file() or tmp_path.stat().st_size <= 0: - logger.warning("ffmpeg visualization transcode produced empty file: {}", tmp_path) - if tmp_path.exists(): - tmp_path.unlink() - return False - tmp_path.replace(output_path) - if not _is_browser_compatible_mp4(output_path): - logger.warning("ffmpeg output is not browser-compatible h264/yuv420p: {}", output_path) - output_path.unlink(missing_ok=True) - return False - logger.info( - "ffmpeg visualization transcode complete: {} ({} bytes)", - output_path, - output_path.stat().st_size, - ) - return True - - -def ensure_reference_actionformer_nms_patch(bundle_dir: Path) -> bool: - """Make the reference bundle use our pure-PyTorch ActionFormer NMS. - - The upstream bundle imports a compiled ``nms_1d_cpu`` extension during eval. - That extension is not present in deployment, so batch mode must use the same - runtime-safe NMS implementation as the online ActionFormer path. - """ - - return ensure_reference_nms_patch(bundle_dir) - - -def _signal_name(signum: int) -> str: - try: - return signal.Signals(signum).name - except ValueError: - return f"signal {signum}" - - -def describe_batch_returncode(returncode: int) -> str: - if returncode < 0: - signum = -returncode - return f"terminated by {_signal_name(signum)} ({signum})" - if returncode > 128: - wrapped = returncode - 256 - if wrapped < 0: - signum = -wrapped - return f"exit={returncode} (possibly propagated {wrapped}/{_signal_name(signum)})" - return f"exit={returncode}" - - -def format_batch_failure(returncode: int, *, stdout: str, stderr: str, work_dir: Path, output_path: Path) -> str: - chunks: list[str] = [describe_batch_returncode(returncode), f"work_dir={work_dir}", f"output={output_path}"] - stdout = stdout.strip() - stderr = stderr.strip() - if stdout: - chunks.append(f"stdout:\n{stdout[-3000:]}") - if stderr: - chunks.append(f"stderr:\n{stderr[-3000:]}") - return "\n".join(chunks) - - -def parse_reference_tsv( - path: Path, - *, - base_timestamp: datetime | None = None, - doctor: ReferenceDoctorInfo | None = None, -) -> list[SurgeryConsumptionStored]: - if base_timestamp is None: - base_timestamp = datetime.now(timezone.utc) - if doctor is None: - doctor = parse_reference_doctor_info(path) - row_doctor_id = doctor_id_for_consumption_rows(doctor) - out: list[SurgeryConsumptionStored] = [] - with path.open("r", encoding="utf-8", newline="") as f: - reader = csv.DictReader(f, delimiter="\t") - for row in reader: - name = (row.get("top1_name") or "").strip() - if not name or name.startswith("("): - continue - if name.startswith("医生信息"): - continue - item_id = (row.get("product_id_top1") or "").strip() or name - try: - start_sec = float((row.get("start_sec") or "0").strip() or 0.0) - except ValueError: - start_sec = 0.0 - out.append( - SurgeryConsumptionStored( - item_id=item_id, - item_name=name, - qty=1, - doctor_id=row_doctor_id, - timestamp=base_timestamp + timedelta(seconds=max(0.0, start_sec)), - source="video_batch", - ) - ) - return out - - -def _candidate_cache_key(candidate_consumables: list[str]) -> str: - raw = "\n".join(candidate_consumables).encode("utf-8") - return hashlib.sha256(raw).hexdigest()[:12] - - -def resolve_reference_candidates(candidate_consumables: list[str] | None) -> list[str]: - requested = normalize_candidate_consumables_raw(list(candidate_consumables or [])) - return effective_candidate_consumables(requested) - - -def write_reference_catalog_excel( - path: Path, - *, - candidate_consumables: list[str], -) -> None: - import pandas as pd - - name_to_code = build_name_mapping(candidate_consumables) - rows = [ - { - "序号": idx, - "产品编码": name_to_code.get(name, name), - "商品名称": name, - } - for idx, name in enumerate(candidate_consumables, start=1) - ] - path.parent.mkdir(parents=True, exist_ok=True) - pd.DataFrame(rows, columns=["序号", "产品编码", "商品名称"]).to_excel(path, index=False) - - -def write_reference_whitelist_json(path: Path, *, candidate_consumables: list[str]) -> None: - path.parent.mkdir(parents=True, exist_ok=True) - path.write_text( - json.dumps({"allowed_names": candidate_consumables}, ensure_ascii=False, indent=2), - encoding="utf-8", - ) - - -def build_reference_config( - *, - bundle_dir: Path, - video_path: Path, - output_path: Path, - work_dir: Path, - excel_path: Path, - whitelist_path: Path, -) -> dict: - cfg = copy.deepcopy(load_reference_default_config(bundle_dir)) - cfg["io"]["video"] = str(video_path.resolve()) - cfg["io"]["excel"] = str(excel_path.resolve()) - cfg["io"]["out"] = str(output_path.resolve()) - cfg["io"]["whitelist_json"] = str(whitelist_path.resolve()) - cfg["runtime"]["work_dir"] = str(work_dir.resolve()) - cfg["runtime"]["keep_work_dir"] = False - return cfg - - -def prepare_reference_run_files( - *, - bundle_dir: Path, - video_path: Path, - output_path: Path, - work_dir: Path, - config_path: Path, - excel_path: Path, - whitelist_path: Path, - candidate_consumables: list[str], -) -> ReferenceRunFiles: - write_reference_catalog_excel(excel_path, candidate_consumables=candidate_consumables) - write_reference_whitelist_json(whitelist_path, candidate_consumables=candidate_consumables) - config = build_reference_config( - bundle_dir=bundle_dir, - video_path=video_path, - output_path=output_path, - work_dir=work_dir, - excel_path=excel_path, - whitelist_path=whitelist_path, - ) - config_path.parent.mkdir(parents=True, exist_ok=True) - config_path.write_text( - yaml.safe_dump(config, allow_unicode=True, sort_keys=False), - encoding="utf-8", - ) - return ReferenceRunFiles( - config_path=config_path, - excel_path=excel_path, - whitelist_path=whitelist_path, - ) - - -def _read_reference_video_path_from_config(config_path: Path) -> Path | None: - if not config_path.is_file(): - return None - data = yaml.safe_load(config_path.read_text(encoding="utf-8")) - if not isinstance(data, dict): - return None - raw = (((data.get("io") or {}) if isinstance(data.get("io"), dict) else {}).get("video") or "").strip() - if not raw: - return None - return Path(raw).expanduser().resolve() - - -class VideoBatchRunner: - def __init__( - self, - *, - bundle_dir: Path | None = None, - root_dir: Path | None = None, - ) -> None: - repo_root = Path(__file__).resolve().parents[2] - self._bundle_dir_override = bundle_dir - self._root_dir = root_dir or (repo_root / "logs" / "video_batch") - - @property - def bundle_dir(self) -> Path: - if self._bundle_dir_override is not None: - return Path(self._bundle_dir_override).expanduser().resolve() - return default_reference_bundle_dir() - - @property - def root_dir(self) -> Path: - return self._root_dir - - def _generate_visualization( - self, - *, - bundle_dir: Path, - video_path: Path, - result_path: Path, - output_video_path: Path, - ) -> Path | None: - raw_video_path = output_video_path.with_name(RAW_VISUALIZATION_FILENAME) - script_path = bundle_dir / "visualize_result_video.py" - if not script_path.is_file(): - logger.warning("reference visualization script not found: {}", script_path) - return None - if not video_path.is_file() or not result_path.is_file(): - return None - if output_video_path.is_file() and _is_browser_compatible_mp4(output_video_path): - return output_video_path - if raw_video_path.is_file() and not _is_readable_mp4(raw_video_path): - raw_video_path.unlink(missing_ok=True) - if output_video_path.is_file() and not _is_browser_compatible_mp4(output_video_path): - output_video_path.unlink(missing_ok=True) - if raw_video_path.is_file() and _is_readable_mp4(raw_video_path): - logger.info( - "reusing existing visualization source for transcode: {}", - raw_video_path, - ) - if _transcode_visualization_for_browser(raw_video_path, output_video_path): - return output_video_path - logger.warning( - "transcode from existing source failed; regenerating visualization: {}", - raw_video_path, - ) - raw_video_path.unlink(missing_ok=True) - cmd = build_reference_visualization_command( - bundle_dir=bundle_dir, - video_path=video_path.resolve(), - result_path=result_path.resolve(), - output_video_path=raw_video_path.resolve(), - ) - logger.info( - "reference visualization script starting: {}", - " ".join(cmd), - ) - proc = subprocess.run( - cmd, - cwd=str(bundle_dir), - check=False, - text=True, - capture_output=True, - env=build_reference_env(), - ) - if proc.returncode != 0: - msg = format_batch_failure( - proc.returncode, - stdout=proc.stdout or "", - stderr=proc.stderr or "", - work_dir=output_video_path.parent.resolve(), - output_path=output_video_path.resolve(), - ) - logger.error("reference visualization failed: {}", msg) - _log_subprocess_output("visualize", proc.stdout or "", proc.stderr or "") - return None - _log_subprocess_output("visualize", proc.stdout or "", proc.stderr or "") - if not _is_readable_mp4(raw_video_path): - logger.error("reference visualization produced unreadable mp4: {}", raw_video_path) - return None - if _transcode_visualization_for_browser(raw_video_path, output_video_path): - purge_visualization_artifacts(output_video_path.parent) - return output_video_path - logger.error("reference visualization transcode to browser mp4 failed: {}", output_video_path) - return None - - def finalize_visualization( - self, - *, - surgery_id: str, - video_path: Path | None = None, - result_path: Path | None = None, - ) -> Path | None: - """Run hand-overlay visualization using staged ``vis_pending/`` inputs.""" - - logger.info( - "video batch visualization starting for surgery_id={} (visualize_result_video.py)", - surgery_id, - ) - video_path = (video_path or visualization_pending_input_path(self._root_dir, surgery_id)).resolve() - result_path = (result_path or visualization_pending_result_path(self._root_dir, surgery_id)).resolve() - output_video_path = visualization_output_path(self._root_dir, surgery_id) - if not is_reference_result_complete(result_path): - logger.warning("skip visualization: incomplete result {}", result_path) - purge_visualization_pending(self._root_dir, surgery_id) - return None - if not video_path.is_file(): - logger.warning("skip visualization: missing staged video {}", video_path) - purge_visualization_pending(self._root_dir, surgery_id) - return None - bundle_dir = resolve_reference_bundle_dir(self._bundle_dir_override) - output_video_path.parent.mkdir(parents=True, exist_ok=True) - vis_path = self._generate_visualization( - bundle_dir=bundle_dir, - video_path=video_path, - result_path=result_path, - output_video_path=output_video_path, - ) - purge_visualization_pending(self._root_dir, surgery_id) - if vis_path is not None: - logger.info( - "video batch visualization complete for surgery_id={} ({})", - surgery_id, - vis_path, - ) - else: - logger.warning("video batch visualization failed for surgery_id={}", surgery_id) - return vis_path - - def latest_visualization_path(self, surgery_id: str) -> Path | None: - """Return an already-generated browser mp4; does not run visualization.""" - - path = visualization_output_path(self._root_dir, surgery_id) - if path.is_file() and path.stat().st_size > 0 and _is_browser_compatible_mp4(path): - return path - return None - - def run( - self, - *, - surgery_id: str, - uploaded_video_path: Path, - original_filename: str = "video.mp4", - candidate_consumables: list[str] | None = None, - include_visualization: bool = False, - ) -> VideoBatchRunResult: - bundle_dir = resolve_reference_bundle_dir(self._bundle_dir_override) - uploaded_video_path = uploaded_video_path.resolve() - digest = sha256_file(uploaded_video_path) - candidates = resolve_reference_candidates(candidate_consumables) - candidate_key = _candidate_cache_key(candidates) - - surgery_input_dir = self._root_dir / surgery_id / "input" - surgery_input_dir.mkdir(parents=True, exist_ok=True) - surgery_input = surgery_input_dir / f"{digest[:12]}.mp4" - ensure_batch_pipeline_input_video( - source_path=uploaded_video_path, - dest_path=surgery_input, - ) - - cache_dir = self._root_dir / "cache" / surgery_id / digest / candidate_key - cache_input_dir = cache_dir / "input" - cache_output_dir = cache_dir / "output" - cache_work_dir = cache_dir / "work" - cache_config_dir = cache_dir / "config" - cache_input_dir.mkdir(parents=True, exist_ok=True) - cache_output_dir.mkdir(parents=True, exist_ok=True) - cache_work_dir.mkdir(parents=True, exist_ok=True) - cache_config_dir.mkdir(parents=True, exist_ok=True) - cache_input = cache_input_dir / "input.mp4" - ensure_batch_pipeline_input_video( - source_path=uploaded_video_path, - dest_path=cache_input, - ) - output_path = cache_output_dir / "result.tsv" - run_files = prepare_reference_run_files( - bundle_dir=bundle_dir, - video_path=cache_input.resolve(), - output_path=output_path.resolve(), - work_dir=cache_work_dir.resolve(), - config_path=cache_config_dir / "config.yaml", - excel_path=cache_config_dir / "商品信息表.xlsx", - whitelist_path=cache_config_dir / "whitelist.json", - candidate_consumables=candidates, - ) - - reused_cache = output_path.is_file() and is_reference_result_complete(output_path) - if reused_cache: - logger.info( - "reference batch cache hit for surgery_id={} ({})", - surgery_id, - output_path, - ) - else: - ensure_reference_actionformer_nms_patch(bundle_dir) - cmd = build_reference_command( - bundle_dir=bundle_dir, - config_path=run_files.config_path.resolve(), - ) - logger.info( - "reference batch starting for surgery_id={} (algorithm_subprocesses main.py, work_dir={})", - surgery_id, - cache_work_dir, - ) - proc = subprocess.run( - cmd, - cwd=str(bundle_dir), - check=False, - text=True, - capture_output=True, - env=build_reference_env(), - ) - if proc.returncode != 0: - msg = format_batch_failure( - proc.returncode, - stdout=proc.stdout or "", - stderr=proc.stderr or "", - work_dir=cache_work_dir.resolve(), - output_path=output_path.resolve(), - ) - raise RuntimeError(f"reference bundle batch run failed {msg}") - if not is_reference_result_complete(output_path): - raise RuntimeError( - f"reference bundle finished but result.tsv is incomplete: {output_path}" - ) - logger.info( - "reference batch complete for surgery_id={} ({})", - surgery_id, - output_path, - ) - - doctor = parse_reference_doctor_info(output_path) - details = parse_reference_tsv(output_path, doctor=doctor) - batch_result = VideoBatchRunResult( - video_sha256=digest, - candidate_cache_key=candidate_key, - input_path=surgery_input, - work_dir=cache_work_dir, - output_path=output_path, - details=details, - reused_cache=reused_cache, - doctor=doctor, - visualization_path=None, - ) - return batch_result diff --git a/backend/docker-compose.yml b/backend/docker-compose.yml index b1a4a80..5d937df 100644 --- a/backend/docker-compose.yml +++ b/backend/docker-compose.yml @@ -28,8 +28,8 @@ services: MINIO_ROOT_USER: ${MINIO_ACCESS_KEY:-minioadmin} MINIO_ROOT_PASSWORD: ${MINIO_SECRET_KEY:-minioadmin} ports: - - "${MINIO_PORT:-9000}:9000" - - "${MINIO_CONSOLE_PORT:-9001}:9001" + - "${MINIO_PORT:-19000}:9000" + - "${MINIO_CONSOLE_PORT:-19001}:9001" volumes: - minio_data:/data restart: unless-stopped diff --git a/backend/main.py b/backend/main.py index 19147ca..890e60d 100644 --- a/backend/main.py +++ b/backend/main.py @@ -51,11 +51,11 @@ async def lifespan(app: FastAPI): ) from app.baked import pipeline as bp from app.services.video_batch_cleanup import purge_expired_visualizations - from app.services.video_batch_runner import VideoBatchRunner + from app.algo_host.batch_service import BatchAlgorithmService repo_root = Path(__file__).resolve().parent purge_expired_visualizations( - VideoBatchRunner(root_dir=repo_root / "logs" / "video_batch").root_dir, + BatchAlgorithmService(root_dir=repo_root / "logs" / "video_batch").root_dir, ttl_hours=float(bp.VIDEO_BATCH_VIS_TTL_HOURS), ) container = build_container(settings) diff --git a/backend/tests/reference_bundle_fixtures.py b/backend/tests/reference_bundle_fixtures.py new file mode 100644 index 0000000..497cee6 --- /dev/null +++ b/backend/tests/reference_bundle_fixtures.py @@ -0,0 +1,41 @@ +"""Shared fixtures for minimal reference bundle trees in batch/subprocess tests.""" + +from __future__ import annotations + +from pathlib import Path + +import yaml + + +def complete_result_tsv_body() -> str: + return ( + "rank\tstart_sec\tend_sec\tproduct_id_top1\ttop1_name\ttop1_conf\n" + "1\t0\t1\tP1\t耗材1\t1.0\n" + "医生信息:测试医生 (id=123, conf=0.99)\n" + ) + + +def write_minimal_reference_bundle(bundle: Path) -> None: + bundle.mkdir(parents=True) + (bundle / "main.py").write_text("# fake\n", encoding="utf-8") + (bundle / "code").mkdir() + (bundle / "code" / "repo_root.py").write_text("# fake\n", encoding="utf-8") + (bundle / "configs").mkdir() + (bundle / "configs" / "default_config.yaml").write_text( + yaml.safe_dump( + { + "io": {"video": "", "excel": "", "out": "", "whitelist_json": None}, + "weights": {}, + "runtime": {"work_dir": None, "keep_work_dir": False, "python": None}, + "device": {}, + "phase1": {}, + "phase2": {}, + "classification": {}, + "tear_merge": {}, + "output": {}, + }, + allow_unicode=True, + sort_keys=False, + ), + encoding="utf-8", + ) diff --git a/backend/tests/test_video_batch_runner.py b/backend/tests/test_algo_host_batch.py similarity index 80% rename from backend/tests/test_video_batch_runner.py rename to backend/tests/test_algo_host_batch.py index 19e1810..ddad57c 100644 --- a/backend/tests/test_video_batch_runner.py +++ b/backend/tests/test_algo_host_batch.py @@ -1,3 +1,5 @@ +"""Tests for offline batch orchestration (app.algo_host).""" + from __future__ import annotations import json @@ -12,73 +14,42 @@ import yaml from fastapi import FastAPI from fastapi.testclient import TestClient -from app.algorithm_runner import reference_bundle_runtime +from app.algo_host import bundle as bundle_runtime +from app.algo_host.batch_service import BatchAlgorithmService, BatchRunResult +from app.algo_host.job_workspace import build_job_config +from app.algo_host.result_adapter import ( + doctor_id_for_consumption_rows, + is_reference_result_complete, + parse_reference_doctor_info, + parse_reference_tsv, +) +from app.algo_host.subprocess_runner import ( + build_batch_main_command, + build_visualization_command, + describe_batch_returncode, + format_batch_failure, +) +from app.algo_host.transcode import ( + VISUALIZATION_MAX_WIDTH, + batch_input_needs_normalize, + browser_transcode_tmp_path, + ensure_batch_pipeline_input_video, + is_browser_compatible_mp4, + transcode_visualization_for_browser, +) from app.api import router as api_router from app.dependencies import get_surgery_pipeline from app.domain.consumption import SurgeryConsumptionStored from app.routers import recording_demo from app.schemas import SurgeryConsumptionDetail -from app.services.video_batch_runner import ( - VideoBatchRunResult, - VideoBatchRunner, - _batch_input_needs_normalize, - _is_browser_compatible_mp4, - _transcode_visualization_for_browser, - browser_transcode_tmp_path, - build_reference_command, - VISUALIZATION_MAX_WIDTH, - build_reference_config, - build_reference_visualization_command, - describe_batch_returncode, - doctor_id_for_consumption_rows, - ensure_batch_pipeline_input_video, - ensure_reference_actionformer_nms_patch, - format_batch_failure, - is_reference_result_complete, - parse_reference_doctor_info, - parse_reference_tsv, -) from app.services.video_batch_cleanup import VISUALIZATION_FILENAME, visualization_output_path +from tests.reference_bundle_fixtures import complete_result_tsv_body, write_minimal_reference_bundle -def _complete_result_tsv_body() -> str: - return ( - "rank\tstart_sec\tend_sec\tproduct_id_top1\ttop1_name\ttop1_conf\n" - "1\t0\t1\tP1\t耗材1\t1.0\n" - "医生信息:测试医生 (id=123, conf=0.99)\n" - ) - - -def _write_minimal_reference_bundle(bundle: Path) -> None: - bundle.mkdir(parents=True) - (bundle / "main.py").write_text("# fake\n", encoding="utf-8") - (bundle / "code").mkdir() - (bundle / "code" / "repo_root.py").write_text("# fake\n", encoding="utf-8") - (bundle / "configs").mkdir() - (bundle / "configs" / "default_config.yaml").write_text( - yaml.safe_dump( - { - "io": {"video": "", "excel": "", "out": "", "whitelist_json": None}, - "weights": {}, - "runtime": {"work_dir": None, "keep_work_dir": False, "python": None}, - "device": {}, - "phase1": {}, - "phase2": {}, - "classification": {}, - "tear_merge": {}, - "output": {}, - }, - allow_unicode=True, - sort_keys=False, - ), - encoding="utf-8", - ) - - -def test_build_reference_config_does_not_keep_work_dir(tmp_path: Path) -> None: +def test_build_job_config_does_not_keep_work_dir(tmp_path: Path) -> None: bundle = tmp_path / "bundle" - _write_minimal_reference_bundle(bundle) - cfg = build_reference_config( + write_minimal_reference_bundle(bundle) + cfg = build_job_config( bundle_dir=bundle, video_path=tmp_path / "input.mp4", output_path=tmp_path / "out.tsv", @@ -91,7 +62,7 @@ def test_build_reference_config_does_not_keep_work_dir(tmp_path: Path) -> None: def test_latest_visualization_path_uses_vis_directory(tmp_path: Path) -> None: root = tmp_path / "batch" - runner = VideoBatchRunner(root_dir=root) + runner = BatchAlgorithmService(root_dir=root) assert runner.latest_visualization_path("100001") is None vis_path = visualization_output_path(root, "100001") @@ -102,7 +73,7 @@ def test_latest_visualization_path_uses_vis_directory(tmp_path: Path) -> None: def test_is_reference_result_complete_requires_footer_and_rows(tmp_path: Path) -> None: complete = tmp_path / "complete.tsv" - complete.write_text(_complete_result_tsv_body(), encoding="utf-8") + complete.write_text(complete_result_tsv_body(), encoding="utf-8") partial = tmp_path / "partial.tsv" partial.write_text( "rank\tstart_sec\tend_sec\tproduct_id_top1\ttop1_name\ttop1_conf\n" @@ -225,11 +196,11 @@ def test_ensure_batch_pipeline_input_video_normalizes_non_h264(tmp_path: Path) - text=True, ) assert proc.returncode == 0, proc.stderr - assert _batch_input_needs_normalize(source) + assert batch_input_needs_normalize(source) ensure_batch_pipeline_input_video(source_path=source, dest_path=dest) assert dest.is_file() - assert _is_browser_compatible_mp4(dest) - assert not _batch_input_needs_normalize(dest) + assert is_browser_compatible_mp4(dest) + assert not batch_input_needs_normalize(dest) @pytest.mark.skipif(shutil.which("ffmpeg") is None, reason="ffmpeg not installed") @@ -259,28 +230,27 @@ def test_transcode_visualization_for_browser_writes_h264_mp4(tmp_path: Path) -> text=True, ) assert proc.returncode == 0, proc.stderr - assert _transcode_visualization_for_browser(source, output) + assert transcode_visualization_for_browser(source, output) assert output.is_file() assert output.stat().st_size > 0 assert not browser_transcode_tmp_path(output).exists() - assert _is_browser_compatible_mp4(output) + assert is_browser_compatible_mp4(output) -def test_build_reference_visualization_command_uses_hand_model_and_result_tsv( +def test_build_visualization_command_uses_hand_model_and_result_tsv( tmp_path: Path, ) -> None: bundle = tmp_path / "bundle" - _write_minimal_reference_bundle(bundle) + write_minimal_reference_bundle(bundle) (bundle / "weights").mkdir() (bundle / "weights" / "hand_detect.pt").write_bytes(b"fake") (bundle / "visualize_result_video.py").write_text("# fake\n", encoding="utf-8") cfg_path = bundle / "configs" / "default_config.yaml" cfg = yaml.safe_load(cfg_path.read_text(encoding="utf-8")) cfg["weights"]["hand"] = "weights/hand_detect.pt" - cfg["phase2"]["det_conf"] = 0.55 cfg_path.write_text(yaml.safe_dump(cfg, allow_unicode=True, sort_keys=False), encoding="utf-8") - cmd = build_reference_visualization_command( + cmd = build_visualization_command( bundle_dir=bundle, video_path=tmp_path / "input.mp4", result_path=tmp_path / "result.tsv", @@ -291,14 +261,13 @@ def test_build_reference_visualization_command_uses_hand_model_and_result_tsv( assert str(tmp_path / "result.tsv") in cmd assert "--hand-model" in cmd assert str(bundle / "weights" / "hand_detect.pt") in cmd - assert "--det-conf" in cmd - assert cmd[cmd.index("--det-conf") + 1] == "0.55" + assert "--det-conf" not in cmd assert "--max-width" in cmd assert cmd[cmd.index("--max-width") + 1] == str(VISUALIZATION_MAX_WIDTH) -def test_build_reference_command_uses_5_15_main_py(tmp_path: Path) -> None: - cmd = build_reference_command( +def test_build_batch_main_command_uses_5_15_main_py(tmp_path: Path) -> None: + cmd = build_batch_main_command( bundle_dir=tmp_path / "algorithm_subprocesses" / "5.15", config_path=tmp_path / "config.yaml", ) @@ -309,12 +278,12 @@ def test_build_reference_command_uses_5_15_main_py(tmp_path: Path) -> None: assert cmd[6:] == ["--config", str(tmp_path / "config.yaml")] -def test_video_batch_runner_uses_reference_bundle_relative_env_override( +def test_batch_service_respects_reference_bundle_relative_env( tmp_path: Path, monkeypatch, ) -> None: bundle = tmp_path / "algorithm_subprocesses" / "custom" - _write_minimal_reference_bundle(bundle) + write_minimal_reference_bundle(bundle) video = tmp_path / "case.mp4" video.write_bytes(b"same-video") calls: list[list[str]] = [] @@ -329,14 +298,14 @@ def test_video_batch_runner_uses_reference_bundle_relative_env_override( config = yaml.safe_load(Path(cmd[cmd.index("--config") + 1]).read_text(encoding="utf-8")) output = Path(config["io"]["out"]) output.parent.mkdir(parents=True, exist_ok=True) - output.write_text(_complete_result_tsv_body(), encoding="utf-8") + output.write_text(complete_result_tsv_body(), encoding="utf-8") return _Proc() monkeypatch.setenv("REFERENCE_BUNDLE_RELATIVE", "algorithm_subprocesses/custom") - monkeypatch.setattr(reference_bundle_runtime, "REPO_ROOT", tmp_path) - monkeypatch.setattr("app.services.video_batch_runner.subprocess.run", fake_run) + monkeypatch.setattr(bundle_runtime, "REPO_ROOT", tmp_path) + monkeypatch.setattr("app.algo_host.subprocess_runner.subprocess.run", fake_run) - runner = VideoBatchRunner(root_dir=tmp_path / "batch") + runner = BatchAlgorithmService(root_dir=tmp_path / "batch") result = runner.run( surgery_id="100001", uploaded_video_path=video, @@ -349,7 +318,7 @@ def test_video_batch_runner_uses_reference_bundle_relative_env_override( assert result.details[0].item_name == "耗材1" -def test_video_batch_runner_reuses_cache_within_same_surgery( +def test_batch_service_reuses_cache_on_repeat_run( tmp_path: Path, monkeypatch, ) -> None: @@ -423,16 +392,16 @@ def test_video_batch_runner_reuses_cache_within_same_surgery( config = yaml.safe_load(Path(cmd[cmd.index("--config") + 1]).read_text(encoding="utf-8")) output = Path(config["io"]["out"]) output.parent.mkdir(parents=True, exist_ok=True) - output.write_text(_complete_result_tsv_body(), encoding="utf-8") + output.write_text(complete_result_tsv_body(), encoding="utf-8") return _Proc() - monkeypatch.setattr("app.services.video_batch_runner.subprocess.run", fake_run) + monkeypatch.setattr("app.algo_host.subprocess_runner.subprocess.run", fake_run) monkeypatch.setattr( - "app.services.video_batch_runner.VideoBatchRunner._generate_visualization", + "app.algo_host.batch_service.BatchAlgorithmService._generate_visualization", lambda *_a, **_k: None, ) - runner = VideoBatchRunner(bundle_dir=bundle, root_dir=tmp_path / "batch") + runner = BatchAlgorithmService(bundle_dir=bundle, root_dir=tmp_path / "batch") first = runner.run( surgery_id="100001", uploaded_video_path=video, @@ -459,7 +428,7 @@ def test_video_batch_runner_reuses_cache_within_same_surgery( assert whitelist == {"allowed_names": ["耗材1"]} -def test_video_batch_runner_does_not_share_cache_across_surgeries( +def test_batch_service_shares_cache_across_surgeries_for_same_video( tmp_path: Path, monkeypatch, ) -> None: @@ -501,40 +470,27 @@ def test_video_batch_runner_does_not_share_cache_across_surgeries( config = yaml.safe_load(Path(cmd[cmd.index("--config") + 1]).read_text(encoding="utf-8")) output = Path(config["io"]["out"]) output.parent.mkdir(parents=True, exist_ok=True) - output.write_text(_complete_result_tsv_body(), encoding="utf-8") + output.write_text(complete_result_tsv_body(), encoding="utf-8") return _Proc() - monkeypatch.setattr("app.services.video_batch_runner.subprocess.run", fake_run) + monkeypatch.setattr("app.algo_host.subprocess_runner.subprocess.run", fake_run) monkeypatch.setattr( - "app.services.video_batch_runner.VideoBatchRunner._generate_visualization", + "app.algo_host.batch_service.BatchAlgorithmService._generate_visualization", lambda *_a, **_k: None, ) - runner = VideoBatchRunner(bundle_dir=bundle, root_dir=tmp_path / "batch") + runner = BatchAlgorithmService(bundle_dir=bundle, root_dir=tmp_path / "batch") first = runner.run(surgery_id="100001", uploaded_video_path=video, original_filename="case.mp4", candidate_consumables=[]) second = runner.run(surgery_id="100002", uploaded_video_path=video, original_filename="case.mp4", candidate_consumables=[]) - assert len(calls) == 2 + assert len(calls) == 1 assert first.reused_cache is False - assert second.reused_cache is False + assert second.reused_cache is True assert first.video_sha256 == second.video_sha256 - assert "100001" in str(first.output_path) - assert "100002" in str(second.output_path) - - -def test_ensure_reference_actionformer_nms_patch_replaces_compiled_extension_import( - tmp_path: Path, -) -> None: - bundle = tmp_path / "bundle" - _write_minimal_reference_bundle(bundle) - nms_path = bundle / "code" / "actionformer_release" / "libs" / "utils" / "nms.py" - nms_path.parent.mkdir(parents=True) - nms_path.write_text("from . import nms_1d_cpu\n", encoding="utf-8") - - assert ensure_reference_actionformer_nms_patch(bundle) is True - patched = nms_path.read_text(encoding="utf-8") - assert "from . import nms_1d_cpu" not in patched - assert "pure-PyTorch" in patched + assert first.output_path == second.output_path + assert "/cache/" in str(first.output_path) + assert "100001" not in str(first.output_path) + assert "100002" not in str(second.output_path) def test_batch_failure_message_keeps_stdout_stderr_and_decodes_245(tmp_path: Path) -> None: @@ -575,19 +531,19 @@ def test_demo_video_batch_endpoint_writes_queryable_result( def __init__(self) -> None: self.root_dir = root_dir - def run(self, **kwargs: Any) -> VideoBatchRunResult: + def run(self, **kwargs: Any) -> BatchRunResult: assert kwargs["surgery_id"] == "100001" assert kwargs["uploaded_video_path"].is_file() assert kwargs["candidate_consumables"] == ["耗材1"] assert kwargs.get("include_visualization") is False - cache_dir = root_dir / "cache" / "100001" / ("a" * 64) / "c1" + cache_dir = root_dir / "cache" / ("a" * 64) / "c1" cache_input = cache_dir / "input" / "input.mp4" cache_input.parent.mkdir(parents=True) cache_input.write_bytes(b"pipeline-input") output_path = cache_dir / "output" / "result.tsv" output_path.parent.mkdir(parents=True) - output_path.write_text(_complete_result_tsv_body(), encoding="utf-8") - return VideoBatchRunResult( + output_path.write_text(complete_result_tsv_body(), encoding="utf-8") + return BatchRunResult( video_sha256="a" * 64, candidate_cache_key="c1", input_path=root_dir / "100001" / "input" / "saved.mp4", @@ -629,7 +585,7 @@ def test_demo_video_batch_endpoint_writes_queryable_result( for r in rows ] - monkeypatch.setattr(recording_demo, "VideoBatchRunner", _FakeRunner) + monkeypatch.setattr(recording_demo, "BatchAlgorithmService", _FakeRunner) pipeline = _FakePipeline() app = FastAPI() @@ -648,7 +604,7 @@ def test_demo_video_batch_endpoint_writes_queryable_result( assert body["status"] == "accepted" assert body["visualization_url"] is None assert vis_calls == [] - assert not (root_dir / "cache" / "100001").exists() + assert not (root_dir / "cache" / ("a" * 64)).exists() assert not (root_dir / "100001").exists() got = client.get("/client/surgeries/100001/result") @@ -679,15 +635,15 @@ def test_demo_video_batch_endpoint_stages_vis_and_purges_cache_when_requested( def __init__(self) -> None: self.root_dir = root_dir - def run(self, **kwargs: Any) -> VideoBatchRunResult: - cache_dir = root_dir / "cache" / "100001" / ("b" * 64) / "c1" + def run(self, **kwargs: Any) -> BatchRunResult: + cache_dir = root_dir / "cache" / ("b" * 64) / "c1" cache_input = cache_dir / "input" / "input.mp4" cache_input.parent.mkdir(parents=True) cache_input.write_bytes(b"pipeline-input") output_path = cache_dir / "output" / "result.tsv" output_path.parent.mkdir(parents=True) - output_path.write_text(_complete_result_tsv_body(), encoding="utf-8") - return VideoBatchRunResult( + output_path.write_text(complete_result_tsv_body(), encoding="utf-8") + return BatchRunResult( video_sha256="b" * 64, candidate_cache_key="c1", input_path=root_dir / "100001" / "input" / "saved.mp4", @@ -708,7 +664,7 @@ def test_demo_video_batch_endpoint_stages_vis_and_purges_cache_when_requested( ) -> None: return None - monkeypatch.setattr(recording_demo, "VideoBatchRunner", _FakeRunner) + monkeypatch.setattr(recording_demo, "BatchAlgorithmService", _FakeRunner) app = FastAPI() app.include_router(recording_demo.router) app.dependency_overrides[get_surgery_pipeline] = lambda: _FakePipeline() @@ -727,7 +683,7 @@ def test_demo_video_batch_endpoint_stages_vis_and_purges_cache_when_requested( body = res.json() assert body["visualization_url"] == "/internal/demo/offline-batch/100001/visualization" assert vis_calls == ["100001"] - assert not (root_dir / "cache" / "100001").exists() + assert not (root_dir / "cache" / ("b" * 64)).exists() pending_input = root_dir / "vis_pending" / "100001" / "input.mp4" pending_tsv = root_dir / "vis_pending" / "100001" / "result.tsv" assert pending_input.read_bytes() == b"pipeline-input" diff --git a/backend/tests/test_fastapi_algorithm_subprocess.py b/backend/tests/test_fastapi_algorithm_subprocess.py index 6e70581..6bd82e9 100644 --- a/backend/tests/test_fastapi_algorithm_subprocess.py +++ b/backend/tests/test_fastapi_algorithm_subprocess.py @@ -1,7 +1,7 @@ """FastAPI → 算法子进程调用链单元测试。 覆盖两条生产路径: -1. ``POST /internal/demo/offline-batch`` → ``VideoBatchRunner`` → ``subprocess.run``(reference bundle ``main.py``) +1. ``POST /internal/demo/offline-batch`` → ``BatchAlgorithmService`` → ``subprocess.run``(reference bundle ``main.py``) 2. ``POST /client/surgeries/start`` → ``CameraSessionManager`` → ``asyncio.create_subprocess_exec``(``python -m app.algorithm_runner``) """ @@ -24,8 +24,9 @@ from app.config import Settings from app.dependencies import build_container, get_surgery_pipeline, get_voice_terminal_hub from app.routers import recording_demo from app.services.video.session_manager import CameraSessionManager -from app.services.video_batch_runner import VideoBatchRunner, build_reference_command -from tests.test_video_batch_runner import _complete_result_tsv_body, _write_minimal_reference_bundle +from app.algo_host.batch_service import BatchAlgorithmService +from app.algo_host.subprocess_runner import build_batch_main_command +from tests.reference_bundle_fixtures import complete_result_tsv_body, write_minimal_reference_bundle def _fake_reference_subprocess_run(captured: list[dict[str, Any]]): @@ -46,7 +47,7 @@ def _fake_reference_subprocess_run(captured: list[dict[str, Any]]): ) output = Path(config["io"]["out"]) output.parent.mkdir(parents=True, exist_ok=True) - output.write_text(_complete_result_tsv_body(), encoding="utf-8") + output.write_text(complete_result_tsv_body(), encoding="utf-8") return _Proc() return _run @@ -55,7 +56,7 @@ def _fake_reference_subprocess_run(captured: list[dict[str, Any]]): @pytest.fixture def reference_bundle(tmp_path: Path) -> Path: bundle = tmp_path / "algorithm_subprocesses" / "5.15" - _write_minimal_reference_bundle(bundle) + write_minimal_reference_bundle(bundle) return bundle @@ -67,17 +68,13 @@ def test_video_batch_endpoint_invokes_reference_bundle_subprocess( ) -> None: monkeypatch.setattr(recording_demo.settings, "demo_orchestrator_enabled", True) monkeypatch.setattr( - "app.services.video_batch_runner.resolve_reference_bundle_dir", + "app.algo_host.bundle.resolve_reference_bundle_dir", lambda _override=None: reference_bundle.resolve(), ) - monkeypatch.setattr( - "app.services.video_batch_runner.ensure_reference_actionformer_nms_patch", - lambda _bundle: True, - ) monkeypatch.setattr( recording_demo, - "VideoBatchRunner", - lambda: VideoBatchRunner( + "BatchAlgorithmService", + lambda: BatchAlgorithmService( bundle_dir=reference_bundle, root_dir=tmp_path / "video_batch", ), @@ -85,7 +82,7 @@ def test_video_batch_endpoint_invokes_reference_bundle_subprocess( captured: list[dict[str, Any]] = [] monkeypatch.setattr( - "app.services.video_batch_runner.subprocess.run", + "app.algo_host.subprocess_runner.subprocess.run", _fake_reference_subprocess_run(captured), ) @@ -111,7 +108,7 @@ def test_video_batch_endpoint_invokes_reference_bundle_subprocess( cmd: list[str] = call["cmd"] kwargs: dict[str, Any] = call["kwargs"] - expected = build_reference_command( + expected = build_batch_main_command( bundle_dir=reference_bundle, config_path=Path(cmd[cmd.index("--config") + 1]), ) diff --git a/backend/tests/test_video_batch_cleanup.py b/backend/tests/test_video_batch_cleanup.py index f1fb536..110dc89 100644 --- a/backend/tests/test_video_batch_cleanup.py +++ b/backend/tests/test_video_batch_cleanup.py @@ -36,7 +36,7 @@ def test_purge_batch_artifacts_removes_cache_and_uploads(tmp_path: Path) -> None surgery_id = "100001" digest = "d" * 64 candidate_key = "c1" - cache_entry = root / "cache" / surgery_id / digest / candidate_key + cache_entry = root / "cache" / digest / candidate_key (cache_entry / "input").mkdir(parents=True) (cache_entry / "input" / "input.mp4").write_bytes(b"x" * 100) (cache_entry / "output").mkdir(parents=True) diff --git a/docs/Docker部署.md b/docs/Docker部署.md index ba0e867..ae2cdba 100644 --- a/docs/Docker部署.md +++ b/docs/Docker部署.md @@ -15,7 +15,7 @@ operation-room-monitor/ | 组件 | 部署方式 | 默认端口 | |------|----------|----------| -| API + PostgreSQL + MinIO | `cd backend && docker compose up -d --build` | 38080 / 35432 / 9000 | +| API + PostgreSQL + MinIO | `cd backend && docker compose up -d --build` | 38080 / 35432 / 19000 | | Demo 客户端 | `clients/demo-client/start.sh` | 38081 | | 语音确认页 | `clients/voice-confirmation/start.sh` | 8080 | diff --git a/docs/video-backends.md b/docs/video-backends.md index b6dcad8..15a8de9 100755 --- a/docs/video-backends.md +++ b/docs/video-backends.md @@ -46,7 +46,9 @@ SDK **不作为构建期依赖**:将厂商提供的 Linux x86_64 动态库挂 ### 链路 3:离线 batch(`POST /internal/demo/offline-batch`) -- 整段 MP4 跑 `algorithm_subprocesses/5.15/main.py`,结果直接入库;**不**启动实时会话、**不**触发语音终端。 +- FastAPI 编排层 [`app/algo_host`](../backend/app/algo_host/) 为 5.15 准备工作目录(`config.yaml`、白名单、商品表),子进程直调 `algorithm_subprocesses/5.15/main.py --config`;解析 `result.tsv` 后入库。**不**在 API 进程内加载模型,**不**修改 5.15 源码。 +- 可选标注视频由子进程 `visualize_result_video.py` 生成;结果缓存键为 `sha256(video) + candidate_list`(跨手术号复用)。 +- **不**启动实时会话、**不**触发语音终端。 ### 通用