From 78f96e24b1f391f8ed0df4b7982cf9738423f67f Mon Sep 17 00:00:00 2001 From: Kevin Date: Fri, 22 May 2026 11:19:12 +0800 Subject: [PATCH] Defer offline batch labeling until after inference results are ready. Run visualization from pipeline/result paths without vis_pending copies, publish labeled MP4 for browser only after visualize_result_video completes, and purge batch cache after the background labeling task finishes. Co-authored-by: Cursor --- backend/app/algo_host/batch_service.py | 21 +++- backend/app/algo_host/transcode.py | 140 +++------------------- backend/app/routers/recording_demo.py | 46 ++++--- backend/tests/test_algo_host_batch.py | 45 +++---- backend/tests/test_video_batch_cleanup.py | 32 +++++ 5 files changed, 118 insertions(+), 166 deletions(-) diff --git a/backend/app/algo_host/batch_service.py b/backend/app/algo_host/batch_service.py index e2fd4a5..f93b5fd 100644 --- a/backend/app/algo_host/batch_service.py +++ b/backend/app/algo_host/batch_service.py @@ -22,8 +22,8 @@ from app.algo_host.subprocess_runner import run_batch_main, run_visualization_sc from app.algo_host.transcode import ( is_browser_compatible_mp4, is_readable_mp4, + publish_labeled_video_for_browser, stage_batch_pipeline_input, - transcode_visualization_for_browser, ) from app.domain.consumption import SurgeryConsumptionStored from app.services.video_batch_cleanup import ( @@ -89,6 +89,9 @@ class BatchAlgorithmService: return None if not video_path.is_file() or not result_path.is_file(): return None + if not is_reference_result_complete(result_path): + logger.warning("skip visualization: incomplete result {}", result_path) + return None if output_video_path.is_file() and is_browser_compatible_mp4(output_video_path): return output_video_path if raw_video_path.is_file() and not is_readable_mp4(raw_video_path): @@ -97,13 +100,16 @@ class BatchAlgorithmService: output_video_path.unlink(missing_ok=True) if raw_video_path.is_file() and is_readable_mp4(raw_video_path): logger.info( - "reusing existing visualization source for transcode: {}", + "reusing existing labeled visualization for browser publish: {}", raw_video_path, ) - if transcode_visualization_for_browser(raw_video_path, output_video_path): + if publish_labeled_video_for_browser( + labeled_source=raw_video_path, + browser_output=output_video_path, + ): return output_video_path logger.warning( - "transcode from existing source failed; regenerating visualization: {}", + "browser publish from existing labeled source failed; regenerating visualization: {}", raw_video_path, ) raw_video_path.unlink(missing_ok=True) @@ -120,10 +126,13 @@ class BatchAlgorithmService: if not is_readable_mp4(raw_video_path): logger.error("reference visualization produced unreadable mp4: {}", raw_video_path) return None - if transcode_visualization_for_browser(raw_video_path, output_video_path): + if publish_labeled_video_for_browser( + labeled_source=raw_video_path, + browser_output=output_video_path, + ): purge_visualization_artifacts(output_video_path.parent) return output_video_path - logger.error("reference visualization transcode to browser mp4 failed: {}", output_video_path) + logger.error("labeled visualization browser publish failed: {}", output_video_path) return None def finalize_visualization( diff --git a/backend/app/algo_host/transcode.py b/backend/app/algo_host/transcode.py index f353224..760dcaa 100644 --- a/backend/app/algo_host/transcode.py +++ b/backend/app/algo_host/transcode.py @@ -75,121 +75,11 @@ def ffprobe_fields(path: Path, entries: str) -> dict[str, str]: return fields -def ffprobe_container_format(path: Path) -> str: - ffprobe = shutil.which("ffprobe") - if ffprobe is None or not path.is_file(): - return "" - proc = subprocess.run( - [ - ffprobe, - "-v", - "error", - "-show_entries", - "format=format_name", - "-of", - "default=noprint_wrappers=1:nokey=1", - str(path), - ], - check=False, - text=True, - capture_output=True, - ) - if proc.returncode != 0: - return "" - return (proc.stdout or "").strip().lower() - - def is_browser_compatible_mp4(path: Path) -> bool: fields = ffprobe_fields(path, "stream=codec_name,pix_fmt") return fields.get("codec_name") == "h264" and fields.get("pix_fmt") in {"yuv420p", "yuvj420p"} -def batch_input_needs_normalize(path: Path) -> bool: - if not is_readable_mp4(path): - return True - if not is_browser_compatible_mp4(path): - return True - container = ffprobe_container_format(path) - if container and "mpeg" in container: - return True - fields = ffprobe_fields(path, "stream=codec_name,width,height") - try: - width = int(fields.get("width") or "0") - except ValueError: - width = 0 - return width > 1920 - - -def normalize_batch_input_video(source_path: Path, output_path: Path) -> bool: - ffmpeg = shutil.which("ffmpeg") - if ffmpeg is None or not source_path.is_file(): - return False - if not is_readable_mp4(source_path): - logger.warning("skip batch input normalize: unreadable source {}", source_path) - return False - output_path.parent.mkdir(parents=True, exist_ok=True) - tmp_path = browser_transcode_tmp_path(output_path) - if tmp_path.exists(): - tmp_path.unlink() - logger.info("ffmpeg batch input normalize starting: {} -> {}", source_path, output_path) - proc = subprocess.run( - [ - ffmpeg, - "-y", - "-hide_banner", - "-loglevel", - "error", - "-fflags", - "+genpts", - "-i", - str(source_path), - "-map", - "0:v:0", - "-an", - "-f", - "mp4", - "-c:v", - "libx264", - "-pix_fmt", - "yuv420p", - "-preset", - "veryfast", - "-crf", - "23", - "-vf", - "scale='min(1920,iw)':-2", - "-movflags", - "+faststart", - str(tmp_path), - ], - check=False, - text=True, - capture_output=True, - ) - if proc.returncode != 0: - stderr = (proc.stderr or "").strip() - logger.warning("ffmpeg batch input normalize failed: {}", stderr[-3000:]) - if tmp_path.exists(): - tmp_path.unlink() - return False - if not tmp_path.is_file() or tmp_path.stat().st_size <= 0: - logger.warning("ffmpeg batch input normalize produced empty file: {}", tmp_path) - if tmp_path.exists(): - tmp_path.unlink() - return False - tmp_path.replace(output_path) - if not is_browser_compatible_mp4(output_path): - logger.warning("ffmpeg batch input normalize output not h264/yuv420p: {}", output_path) - output_path.unlink(missing_ok=True) - return False - logger.info( - "ffmpeg batch input normalize complete: {} ({} bytes)", - output_path, - output_path.stat().st_size, - ) - return True - - def stage_batch_pipeline_input(*, source_path: Path, dest_path: Path) -> None: """Copy upload to digest-level pipeline input without browser normalize/transcode.""" @@ -199,22 +89,22 @@ def stage_batch_pipeline_input(*, source_path: Path, dest_path: Path) -> None: shutil.copy2(source_path, dest_path) -def ensure_batch_pipeline_input_video(*, source_path: Path, dest_path: Path) -> None: - """Browser-compatible normalize/copy; use only for visualization inputs, not batch inference.""" +def publish_labeled_video_for_browser(*, labeled_source: Path, browser_output: Path) -> bool: + """Publish a post-inference labeled MP4 for browser playback (transcode only if needed).""" - dest_path.parent.mkdir(parents=True, exist_ok=True) - if dest_path.is_file() and dest_path.stat().st_size > 0 and not batch_input_needs_normalize(dest_path): - return - if batch_input_needs_normalize(source_path): - if normalize_batch_input_video(source_path, dest_path): - return - logger.warning( - "batch input normalize failed, falling back to raw copy: {} -> {}", - source_path, - dest_path, - ) - if not dest_path.is_file(): - shutil.copy2(source_path, dest_path) + if not labeled_source.is_file(): + return False + if is_browser_compatible_mp4(labeled_source): + browser_output.parent.mkdir(parents=True, exist_ok=True) + if labeled_source.resolve() == browser_output.resolve(): + return True + tmp_path = browser_transcode_tmp_path(browser_output) + if tmp_path.exists(): + tmp_path.unlink() + shutil.copy2(labeled_source, tmp_path) + tmp_path.replace(browser_output) + return is_browser_compatible_mp4(browser_output) + return transcode_visualization_for_browser(labeled_source, browser_output) def transcode_visualization_for_browser(source_path: Path, output_path: Path) -> bool: diff --git a/backend/app/routers/recording_demo.py b/backend/app/routers/recording_demo.py index ec8c96a..348559a 100644 --- a/backend/app/routers/recording_demo.py +++ b/backend/app/routers/recording_demo.py @@ -29,7 +29,6 @@ from app.services.video_batch_cleanup import ( purge_expired_pipeline_inputs, purge_expired_visualizations, purge_surgery_batch_tree, - stage_visualization_pending, ) from app.algo_host import BatchAlgorithmService from app.services.voice_terminal_hub import VoiceTerminalHub @@ -66,12 +65,28 @@ def _require_site_config_path() -> Path: def _background_finalize_visualization( runner: BatchAlgorithmService, surgery_id: str, + *, + video_path: Path, + result_path: Path, + digest: str, + candidate_key: str, ) -> None: try: - runner.finalize_visualization(surgery_id=surgery_id) + runner.finalize_visualization( + surgery_id=surgery_id, + video_path=video_path, + result_path=result_path, + ) except Exception: logger.exception("offline batch visualization failed surgery_id={}", surgery_id) finally: + purge_batch_artifacts( + runner.root_dir, + surgery_id, + digest=digest, + candidate_key=candidate_key, + ) + purge_surgery_batch_tree(runner.root_dir, surgery_id) purge_expired_visualizations( runner.root_dir, ttl_hours=float(bp.VIDEO_BATCH_VIS_TTL_HOURS), @@ -178,23 +193,24 @@ async def offline_batch( len(result.details), ) - cache_input = result.input_path if include_visualization: - stage_visualization_pending( + background_tasks.add_task( + _background_finalize_visualization, + runner, + surgery_id, + video_path=result.input_path, + result_path=result.output_path, + digest=result.video_sha256, + candidate_key=result.candidate_cache_key, + ) + else: + purge_batch_artifacts( runner.root_dir, surgery_id, - source_mp4=cache_input, - result_tsv=result.output_path, + digest=result.video_sha256, + candidate_key=result.candidate_cache_key, ) - background_tasks.add_task(_background_finalize_visualization, runner, surgery_id) - - purge_batch_artifacts( - runner.root_dir, - surgery_id, - digest=result.video_sha256, - candidate_key=result.candidate_cache_key, - ) - purge_surgery_batch_tree(runner.root_dir, surgery_id) + purge_surgery_batch_tree(runner.root_dir, surgery_id) visualization_url: str | None = None if include_visualization: diff --git a/backend/tests/test_algo_host_batch.py b/backend/tests/test_algo_host_batch.py index 9c13b86..982e811 100644 --- a/backend/tests/test_algo_host_batch.py +++ b/backend/tests/test_algo_host_batch.py @@ -31,10 +31,9 @@ from app.algo_host.subprocess_runner import ( ) from app.algo_host.transcode import ( VISUALIZATION_MAX_WIDTH, - batch_input_needs_normalize, browser_transcode_tmp_path, - ensure_batch_pipeline_input_video, is_browser_compatible_mp4, + publish_labeled_video_for_browser, stage_batch_pipeline_input, transcode_visualization_for_browser, ) @@ -197,21 +196,21 @@ def test_stage_batch_pipeline_input_copies_without_normalize(tmp_path: Path) -> text=True, ) assert proc.returncode == 0, proc.stderr - assert batch_input_needs_normalize(source) + assert not is_browser_compatible_mp4(source) stage_batch_pipeline_input(source_path=source, dest_path=dest) assert dest.is_file() assert dest.read_bytes() == source.read_bytes() - assert batch_input_needs_normalize(dest) + assert not is_browser_compatible_mp4(dest) stage_batch_pipeline_input(source_path=tmp_path / "other.mp4", dest_path=dest) assert dest.read_bytes() == source.read_bytes() @pytest.mark.skipif(shutil.which("ffmpeg") is None, reason="ffmpeg not installed") -def test_ensure_batch_pipeline_input_video_normalizes_non_h264(tmp_path: Path) -> None: +def test_publish_labeled_video_for_browser_transcodes_non_h264(tmp_path: Path) -> None: ffmpeg = shutil.which("ffmpeg") assert ffmpeg is not None - source = tmp_path / "upload.mp4" - dest = tmp_path / "input.mp4" + source = tmp_path / "result_vis_source.mp4" + output = tmp_path / "result_vis.mp4" proc = subprocess.run( [ ffmpeg, @@ -233,11 +232,9 @@ def test_ensure_batch_pipeline_input_video_normalizes_non_h264(tmp_path: Path) - text=True, ) assert proc.returncode == 0, proc.stderr - assert batch_input_needs_normalize(source) - ensure_batch_pipeline_input_video(source_path=source, dest_path=dest) - assert dest.is_file() - assert is_browser_compatible_mp4(dest) - assert not batch_input_needs_normalize(dest) + assert publish_labeled_video_for_browser(labeled_source=source, browser_output=output) + assert output.is_file() + assert is_browser_compatible_mp4(output) @pytest.mark.skipif(shutil.which("ffmpeg") is None, reason="ffmpeg not installed") @@ -698,7 +695,7 @@ def test_demo_video_batch_endpoint_stages_vis_and_purges_cache_when_requested( source="video_batch", ) root_dir = tmp_path / "video_batch" - vis_calls: list[str] = [] + vis_calls: list[tuple[str, Path | None, Path | None]] = [] class _FakeRunner: def __init__(self) -> None: @@ -722,8 +719,14 @@ def test_demo_video_batch_endpoint_stages_vis_and_purges_cache_when_requested( reused_cache=False, ) - def finalize_visualization(self, *, surgery_id: str) -> None: - vis_calls.append(surgery_id) + def finalize_visualization( + self, + *, + surgery_id: str, + video_path: Path | None = None, + result_path: Path | None = None, + ) -> None: + vis_calls.append((surgery_id, video_path, result_path)) class _FakePipeline: async def save_video_batch_result( @@ -751,10 +754,12 @@ def test_demo_video_batch_endpoint_stages_vis_and_purges_cache_when_requested( assert res.status_code == 200, res.text body = res.json() assert body["visualization_url"] == "/internal/demo/offline-batch/100001/visualization" - assert vis_calls == ["100001"] + assert len(vis_calls) == 1 + assert vis_calls[0][0] == "100001" + assert vis_calls[0][1] is not None + assert vis_calls[0][2] is not None + assert str(vis_calls[0][1]).endswith("pipeline.mp4") + assert str(vis_calls[0][2]).endswith("result.tsv") assert not (root_dir / "cache" / ("b" * 64) / "c1").exists() assert (root_dir / "cache" / ("b" * 64) / "input" / "pipeline.mp4").is_file() - pending_input = root_dir / "vis_pending" / "100001" / "input.mp4" - pending_tsv = root_dir / "vis_pending" / "100001" / "result.tsv" - assert pending_input.read_bytes() == b"pipeline-input" - assert "医生信息" in pending_tsv.read_text(encoding="utf-8") + assert not (root_dir / "vis_pending" / "100001").exists() diff --git a/backend/tests/test_video_batch_cleanup.py b/backend/tests/test_video_batch_cleanup.py index c2fb10d..8b8e71d 100644 --- a/backend/tests/test_video_batch_cleanup.py +++ b/backend/tests/test_video_batch_cleanup.py @@ -6,6 +6,7 @@ from pathlib import Path from app.services.video_batch_cleanup import ( VISUALIZATION_FILENAME, purge_batch_artifacts, + purge_expired_pipeline_inputs, purge_expired_visualizations, purge_surgery_batch_tree, purge_visualization_artifacts, @@ -91,6 +92,37 @@ def test_purge_expired_visualizations_removes_old_vis_mp4(tmp_path: Path) -> Non assert not vis_path.exists() +def test_purge_expired_pipeline_inputs_removes_old_pipeline_video(tmp_path: Path) -> None: + root = tmp_path / "video_batch" + digest = "d" * 64 + pipeline = root / "cache" / digest / "input" / "pipeline.mp4" + pipeline.parent.mkdir(parents=True) + pipeline.write_bytes(b"pipeline") + old = time.time() - (25 * 3600) + import os + + os.utime(pipeline, (old, old)) + + removed = purge_expired_pipeline_inputs(root, ttl_hours=24.0) + + assert removed == 1 + assert not pipeline.exists() + assert not (root / "cache" / digest / "input").exists() + + +def test_purge_expired_pipeline_inputs_keeps_recent_pipeline_video(tmp_path: Path) -> None: + root = tmp_path / "video_batch" + digest = "e" * 64 + pipeline = root / "cache" / digest / "input" / "pipeline.mp4" + pipeline.parent.mkdir(parents=True) + pipeline.write_bytes(b"pipeline") + + removed = purge_expired_pipeline_inputs(root, ttl_hours=24.0) + + assert removed == 0 + assert pipeline.is_file() + + def test_purge_expired_visualizations_keeps_recent_vis_mp4(tmp_path: Path) -> None: root = tmp_path / "video_batch" vis_path = visualization_output_path(root, "100002")