Defer offline batch labeling until after inference results are ready.

Run visualization from pipeline/result paths without vis_pending copies, publish labeled MP4 for browser only after visualize_result_video completes, and purge batch cache after the background labeling task finishes.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Kevin
2026-05-22 11:19:12 +08:00
parent 580e3c0d3b
commit 78f96e24b1
5 changed files with 118 additions and 166 deletions

View File

@@ -22,8 +22,8 @@ from app.algo_host.subprocess_runner import run_batch_main, run_visualization_sc
from app.algo_host.transcode import (
is_browser_compatible_mp4,
is_readable_mp4,
publish_labeled_video_for_browser,
stage_batch_pipeline_input,
transcode_visualization_for_browser,
)
from app.domain.consumption import SurgeryConsumptionStored
from app.services.video_batch_cleanup import (
@@ -89,6 +89,9 @@ class BatchAlgorithmService:
return None
if not video_path.is_file() or not result_path.is_file():
return None
if not is_reference_result_complete(result_path):
logger.warning("skip visualization: incomplete result {}", result_path)
return None
if output_video_path.is_file() and is_browser_compatible_mp4(output_video_path):
return output_video_path
if raw_video_path.is_file() and not is_readable_mp4(raw_video_path):
@@ -97,13 +100,16 @@ class BatchAlgorithmService:
output_video_path.unlink(missing_ok=True)
if raw_video_path.is_file() and is_readable_mp4(raw_video_path):
logger.info(
"reusing existing visualization source for transcode: {}",
"reusing existing labeled visualization for browser publish: {}",
raw_video_path,
)
if transcode_visualization_for_browser(raw_video_path, output_video_path):
if publish_labeled_video_for_browser(
labeled_source=raw_video_path,
browser_output=output_video_path,
):
return output_video_path
logger.warning(
"transcode from existing source failed; regenerating visualization: {}",
"browser publish from existing labeled source failed; regenerating visualization: {}",
raw_video_path,
)
raw_video_path.unlink(missing_ok=True)
@@ -120,10 +126,13 @@ class BatchAlgorithmService:
if not is_readable_mp4(raw_video_path):
logger.error("reference visualization produced unreadable mp4: {}", raw_video_path)
return None
if transcode_visualization_for_browser(raw_video_path, output_video_path):
if publish_labeled_video_for_browser(
labeled_source=raw_video_path,
browser_output=output_video_path,
):
purge_visualization_artifacts(output_video_path.parent)
return output_video_path
logger.error("reference visualization transcode to browser mp4 failed: {}", output_video_path)
logger.error("labeled visualization browser publish failed: {}", output_video_path)
return None
def finalize_visualization(

View File

@@ -75,121 +75,11 @@ def ffprobe_fields(path: Path, entries: str) -> dict[str, str]:
return fields
def ffprobe_container_format(path: Path) -> str:
ffprobe = shutil.which("ffprobe")
if ffprobe is None or not path.is_file():
return ""
proc = subprocess.run(
[
ffprobe,
"-v",
"error",
"-show_entries",
"format=format_name",
"-of",
"default=noprint_wrappers=1:nokey=1",
str(path),
],
check=False,
text=True,
capture_output=True,
)
if proc.returncode != 0:
return ""
return (proc.stdout or "").strip().lower()
def is_browser_compatible_mp4(path: Path) -> bool:
fields = ffprobe_fields(path, "stream=codec_name,pix_fmt")
return fields.get("codec_name") == "h264" and fields.get("pix_fmt") in {"yuv420p", "yuvj420p"}
def batch_input_needs_normalize(path: Path) -> bool:
if not is_readable_mp4(path):
return True
if not is_browser_compatible_mp4(path):
return True
container = ffprobe_container_format(path)
if container and "mpeg" in container:
return True
fields = ffprobe_fields(path, "stream=codec_name,width,height")
try:
width = int(fields.get("width") or "0")
except ValueError:
width = 0
return width > 1920
def normalize_batch_input_video(source_path: Path, output_path: Path) -> bool:
ffmpeg = shutil.which("ffmpeg")
if ffmpeg is None or not source_path.is_file():
return False
if not is_readable_mp4(source_path):
logger.warning("skip batch input normalize: unreadable source {}", source_path)
return False
output_path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = browser_transcode_tmp_path(output_path)
if tmp_path.exists():
tmp_path.unlink()
logger.info("ffmpeg batch input normalize starting: {} -> {}", source_path, output_path)
proc = subprocess.run(
[
ffmpeg,
"-y",
"-hide_banner",
"-loglevel",
"error",
"-fflags",
"+genpts",
"-i",
str(source_path),
"-map",
"0:v:0",
"-an",
"-f",
"mp4",
"-c:v",
"libx264",
"-pix_fmt",
"yuv420p",
"-preset",
"veryfast",
"-crf",
"23",
"-vf",
"scale='min(1920,iw)':-2",
"-movflags",
"+faststart",
str(tmp_path),
],
check=False,
text=True,
capture_output=True,
)
if proc.returncode != 0:
stderr = (proc.stderr or "").strip()
logger.warning("ffmpeg batch input normalize failed: {}", stderr[-3000:])
if tmp_path.exists():
tmp_path.unlink()
return False
if not tmp_path.is_file() or tmp_path.stat().st_size <= 0:
logger.warning("ffmpeg batch input normalize produced empty file: {}", tmp_path)
if tmp_path.exists():
tmp_path.unlink()
return False
tmp_path.replace(output_path)
if not is_browser_compatible_mp4(output_path):
logger.warning("ffmpeg batch input normalize output not h264/yuv420p: {}", output_path)
output_path.unlink(missing_ok=True)
return False
logger.info(
"ffmpeg batch input normalize complete: {} ({} bytes)",
output_path,
output_path.stat().st_size,
)
return True
def stage_batch_pipeline_input(*, source_path: Path, dest_path: Path) -> None:
"""Copy upload to digest-level pipeline input without browser normalize/transcode."""
@@ -199,22 +89,22 @@ def stage_batch_pipeline_input(*, source_path: Path, dest_path: Path) -> None:
shutil.copy2(source_path, dest_path)
def ensure_batch_pipeline_input_video(*, source_path: Path, dest_path: Path) -> None:
"""Browser-compatible normalize/copy; use only for visualization inputs, not batch inference."""
def publish_labeled_video_for_browser(*, labeled_source: Path, browser_output: Path) -> bool:
"""Publish a post-inference labeled MP4 for browser playback (transcode only if needed)."""
dest_path.parent.mkdir(parents=True, exist_ok=True)
if dest_path.is_file() and dest_path.stat().st_size > 0 and not batch_input_needs_normalize(dest_path):
return
if batch_input_needs_normalize(source_path):
if normalize_batch_input_video(source_path, dest_path):
return
logger.warning(
"batch input normalize failed, falling back to raw copy: {} -> {}",
source_path,
dest_path,
)
if not dest_path.is_file():
shutil.copy2(source_path, dest_path)
if not labeled_source.is_file():
return False
if is_browser_compatible_mp4(labeled_source):
browser_output.parent.mkdir(parents=True, exist_ok=True)
if labeled_source.resolve() == browser_output.resolve():
return True
tmp_path = browser_transcode_tmp_path(browser_output)
if tmp_path.exists():
tmp_path.unlink()
shutil.copy2(labeled_source, tmp_path)
tmp_path.replace(browser_output)
return is_browser_compatible_mp4(browser_output)
return transcode_visualization_for_browser(labeled_source, browser_output)
def transcode_visualization_for_browser(source_path: Path, output_path: Path) -> bool:

View File

@@ -29,7 +29,6 @@ from app.services.video_batch_cleanup import (
purge_expired_pipeline_inputs,
purge_expired_visualizations,
purge_surgery_batch_tree,
stage_visualization_pending,
)
from app.algo_host import BatchAlgorithmService
from app.services.voice_terminal_hub import VoiceTerminalHub
@@ -66,12 +65,28 @@ def _require_site_config_path() -> Path:
def _background_finalize_visualization(
runner: BatchAlgorithmService,
surgery_id: str,
*,
video_path: Path,
result_path: Path,
digest: str,
candidate_key: str,
) -> None:
try:
runner.finalize_visualization(surgery_id=surgery_id)
runner.finalize_visualization(
surgery_id=surgery_id,
video_path=video_path,
result_path=result_path,
)
except Exception:
logger.exception("offline batch visualization failed surgery_id={}", surgery_id)
finally:
purge_batch_artifacts(
runner.root_dir,
surgery_id,
digest=digest,
candidate_key=candidate_key,
)
purge_surgery_batch_tree(runner.root_dir, surgery_id)
purge_expired_visualizations(
runner.root_dir,
ttl_hours=float(bp.VIDEO_BATCH_VIS_TTL_HOURS),
@@ -178,23 +193,24 @@ async def offline_batch(
len(result.details),
)
cache_input = result.input_path
if include_visualization:
stage_visualization_pending(
background_tasks.add_task(
_background_finalize_visualization,
runner,
surgery_id,
video_path=result.input_path,
result_path=result.output_path,
digest=result.video_sha256,
candidate_key=result.candidate_cache_key,
)
else:
purge_batch_artifacts(
runner.root_dir,
surgery_id,
source_mp4=cache_input,
result_tsv=result.output_path,
digest=result.video_sha256,
candidate_key=result.candidate_cache_key,
)
background_tasks.add_task(_background_finalize_visualization, runner, surgery_id)
purge_batch_artifacts(
runner.root_dir,
surgery_id,
digest=result.video_sha256,
candidate_key=result.candidate_cache_key,
)
purge_surgery_batch_tree(runner.root_dir, surgery_id)
purge_surgery_batch_tree(runner.root_dir, surgery_id)
visualization_url: str | None = None
if include_visualization:

View File

@@ -31,10 +31,9 @@ from app.algo_host.subprocess_runner import (
)
from app.algo_host.transcode import (
VISUALIZATION_MAX_WIDTH,
batch_input_needs_normalize,
browser_transcode_tmp_path,
ensure_batch_pipeline_input_video,
is_browser_compatible_mp4,
publish_labeled_video_for_browser,
stage_batch_pipeline_input,
transcode_visualization_for_browser,
)
@@ -197,21 +196,21 @@ def test_stage_batch_pipeline_input_copies_without_normalize(tmp_path: Path) ->
text=True,
)
assert proc.returncode == 0, proc.stderr
assert batch_input_needs_normalize(source)
assert not is_browser_compatible_mp4(source)
stage_batch_pipeline_input(source_path=source, dest_path=dest)
assert dest.is_file()
assert dest.read_bytes() == source.read_bytes()
assert batch_input_needs_normalize(dest)
assert not is_browser_compatible_mp4(dest)
stage_batch_pipeline_input(source_path=tmp_path / "other.mp4", dest_path=dest)
assert dest.read_bytes() == source.read_bytes()
@pytest.mark.skipif(shutil.which("ffmpeg") is None, reason="ffmpeg not installed")
def test_ensure_batch_pipeline_input_video_normalizes_non_h264(tmp_path: Path) -> None:
def test_publish_labeled_video_for_browser_transcodes_non_h264(tmp_path: Path) -> None:
ffmpeg = shutil.which("ffmpeg")
assert ffmpeg is not None
source = tmp_path / "upload.mp4"
dest = tmp_path / "input.mp4"
source = tmp_path / "result_vis_source.mp4"
output = tmp_path / "result_vis.mp4"
proc = subprocess.run(
[
ffmpeg,
@@ -233,11 +232,9 @@ def test_ensure_batch_pipeline_input_video_normalizes_non_h264(tmp_path: Path) -
text=True,
)
assert proc.returncode == 0, proc.stderr
assert batch_input_needs_normalize(source)
ensure_batch_pipeline_input_video(source_path=source, dest_path=dest)
assert dest.is_file()
assert is_browser_compatible_mp4(dest)
assert not batch_input_needs_normalize(dest)
assert publish_labeled_video_for_browser(labeled_source=source, browser_output=output)
assert output.is_file()
assert is_browser_compatible_mp4(output)
@pytest.mark.skipif(shutil.which("ffmpeg") is None, reason="ffmpeg not installed")
@@ -698,7 +695,7 @@ def test_demo_video_batch_endpoint_stages_vis_and_purges_cache_when_requested(
source="video_batch",
)
root_dir = tmp_path / "video_batch"
vis_calls: list[str] = []
vis_calls: list[tuple[str, Path | None, Path | None]] = []
class _FakeRunner:
def __init__(self) -> None:
@@ -722,8 +719,14 @@ def test_demo_video_batch_endpoint_stages_vis_and_purges_cache_when_requested(
reused_cache=False,
)
def finalize_visualization(self, *, surgery_id: str) -> None:
vis_calls.append(surgery_id)
def finalize_visualization(
self,
*,
surgery_id: str,
video_path: Path | None = None,
result_path: Path | None = None,
) -> None:
vis_calls.append((surgery_id, video_path, result_path))
class _FakePipeline:
async def save_video_batch_result(
@@ -751,10 +754,12 @@ def test_demo_video_batch_endpoint_stages_vis_and_purges_cache_when_requested(
assert res.status_code == 200, res.text
body = res.json()
assert body["visualization_url"] == "/internal/demo/offline-batch/100001/visualization"
assert vis_calls == ["100001"]
assert len(vis_calls) == 1
assert vis_calls[0][0] == "100001"
assert vis_calls[0][1] is not None
assert vis_calls[0][2] is not None
assert str(vis_calls[0][1]).endswith("pipeline.mp4")
assert str(vis_calls[0][2]).endswith("result.tsv")
assert not (root_dir / "cache" / ("b" * 64) / "c1").exists()
assert (root_dir / "cache" / ("b" * 64) / "input" / "pipeline.mp4").is_file()
pending_input = root_dir / "vis_pending" / "100001" / "input.mp4"
pending_tsv = root_dir / "vis_pending" / "100001" / "result.tsv"
assert pending_input.read_bytes() == b"pipeline-input"
assert "医生信息" in pending_tsv.read_text(encoding="utf-8")
assert not (root_dir / "vis_pending" / "100001").exists()

View File

@@ -6,6 +6,7 @@ from pathlib import Path
from app.services.video_batch_cleanup import (
VISUALIZATION_FILENAME,
purge_batch_artifacts,
purge_expired_pipeline_inputs,
purge_expired_visualizations,
purge_surgery_batch_tree,
purge_visualization_artifacts,
@@ -91,6 +92,37 @@ def test_purge_expired_visualizations_removes_old_vis_mp4(tmp_path: Path) -> Non
assert not vis_path.exists()
def test_purge_expired_pipeline_inputs_removes_old_pipeline_video(tmp_path: Path) -> None:
root = tmp_path / "video_batch"
digest = "d" * 64
pipeline = root / "cache" / digest / "input" / "pipeline.mp4"
pipeline.parent.mkdir(parents=True)
pipeline.write_bytes(b"pipeline")
old = time.time() - (25 * 3600)
import os
os.utime(pipeline, (old, old))
removed = purge_expired_pipeline_inputs(root, ttl_hours=24.0)
assert removed == 1
assert not pipeline.exists()
assert not (root / "cache" / digest / "input").exists()
def test_purge_expired_pipeline_inputs_keeps_recent_pipeline_video(tmp_path: Path) -> None:
root = tmp_path / "video_batch"
digest = "e" * 64
pipeline = root / "cache" / digest / "input" / "pipeline.mp4"
pipeline.parent.mkdir(parents=True)
pipeline.write_bytes(b"pipeline")
removed = purge_expired_pipeline_inputs(root, ttl_hours=24.0)
assert removed == 0
assert pipeline.is_file()
def test_purge_expired_visualizations_keeps_recent_vis_mp4(tmp_path: Path) -> None:
root = tmp_path / "video_batch"
vis_path = visualization_output_path(root, "100002")