Files
operating-room-monitor-server/backend/tests/test_video_batch_cleanup.py
Kevin 78f96e24b1 Defer offline batch labeling until after inference results are ready.
Run visualization from pipeline/result paths without vis_pending copies, publish labeled MP4 for browser only after visualize_result_video completes, and purge batch cache after the background labeling task finishes.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-22 11:19:12 +08:00

136 lines
4.5 KiB
Python

from __future__ import annotations
import time
from pathlib import Path
from app.services.video_batch_cleanup import (
VISUALIZATION_FILENAME,
purge_batch_artifacts,
purge_expired_pipeline_inputs,
purge_expired_visualizations,
purge_surgery_batch_tree,
purge_visualization_artifacts,
stage_visualization_pending,
visualization_output_path,
visualization_pending_input_path,
visualization_pending_result_path,
)
def test_stage_visualization_pending_copies_mp4_and_tsv(tmp_path: Path) -> None:
root = tmp_path / "video_batch"
source = tmp_path / "source.mp4"
tsv = tmp_path / "result.tsv"
source.write_bytes(b"mp4-bytes")
tsv.write_text("rank\tx\n1\ta\n", encoding="utf-8")
stage_visualization_pending(root, "100001", source_mp4=source, result_tsv=tsv)
assert visualization_pending_input_path(root, "100001").read_bytes() == b"mp4-bytes"
assert visualization_pending_result_path(root, "100001").read_text(encoding="utf-8") == tsv.read_text(
encoding="utf-8"
)
def test_purge_batch_artifacts_removes_cache_and_uploads(tmp_path: Path) -> None:
root = tmp_path / "video_batch"
surgery_id = "100001"
digest = "d" * 64
candidate_key = "c1"
cache_entry = root / "cache" / digest / candidate_key
pipeline_input = root / "cache" / digest / "input" / "pipeline.mp4"
pipeline_input.parent.mkdir(parents=True)
pipeline_input.write_bytes(b"x" * 100)
(cache_entry / "output").mkdir(parents=True)
(cache_entry / "output" / "result.tsv").write_text("ok\n", encoding="utf-8")
(cache_entry / "work").mkdir(parents=True)
(cache_entry / "work" / "features").mkdir(parents=True)
(cache_entry / "work" / "features" / "input.npy").write_bytes(b"npy")
(root / surgery_id / "upload").mkdir(parents=True)
(root / surgery_id / "upload" / "upload.mp4").write_bytes(b"upload")
(root / surgery_id / "input").mkdir(parents=True)
(root / surgery_id / "input" / "abc.mp4").write_bytes(b"input")
purge_batch_artifacts(root, surgery_id, digest=digest, candidate_key=candidate_key)
purge_surgery_batch_tree(root, surgery_id)
assert not cache_entry.exists()
assert pipeline_input.is_file()
assert not (root / surgery_id).exists()
def test_purge_visualization_artifacts_keeps_browser_mp4(tmp_path: Path) -> None:
out_dir = tmp_path / "vis" / "100001"
out_dir.mkdir(parents=True)
browser = out_dir / VISUALIZATION_FILENAME
raw = out_dir / "result_vis_source.mp4"
part = out_dir / "result_vis.part.mp4"
browser.write_bytes(b"browser")
raw.write_bytes(b"raw")
part.write_bytes(b"part")
purge_visualization_artifacts(out_dir)
assert browser.is_file()
assert not raw.exists()
assert not part.exists()
def test_purge_expired_visualizations_removes_old_vis_mp4(tmp_path: Path) -> None:
root = tmp_path / "video_batch"
vis_path = visualization_output_path(root, "100001")
vis_path.parent.mkdir(parents=True)
vis_path.write_bytes(b"vis")
old = time.time() - (25 * 3600)
import os
os.utime(vis_path, (old, old))
removed = purge_expired_visualizations(root, ttl_hours=24.0)
assert removed == 1
assert not vis_path.exists()
def test_purge_expired_pipeline_inputs_removes_old_pipeline_video(tmp_path: Path) -> None:
root = tmp_path / "video_batch"
digest = "d" * 64
pipeline = root / "cache" / digest / "input" / "pipeline.mp4"
pipeline.parent.mkdir(parents=True)
pipeline.write_bytes(b"pipeline")
old = time.time() - (25 * 3600)
import os
os.utime(pipeline, (old, old))
removed = purge_expired_pipeline_inputs(root, ttl_hours=24.0)
assert removed == 1
assert not pipeline.exists()
assert not (root / "cache" / digest / "input").exists()
def test_purge_expired_pipeline_inputs_keeps_recent_pipeline_video(tmp_path: Path) -> None:
root = tmp_path / "video_batch"
digest = "e" * 64
pipeline = root / "cache" / digest / "input" / "pipeline.mp4"
pipeline.parent.mkdir(parents=True)
pipeline.write_bytes(b"pipeline")
removed = purge_expired_pipeline_inputs(root, ttl_hours=24.0)
assert removed == 0
assert pipeline.is_file()
def test_purge_expired_visualizations_keeps_recent_vis_mp4(tmp_path: Path) -> None:
root = tmp_path / "video_batch"
vis_path = visualization_output_path(root, "100002")
vis_path.parent.mkdir(parents=True)
vis_path.write_bytes(b"vis")
removed = purge_expired_visualizations(root, ttl_hours=24.0)
assert removed == 0
assert vis_path.is_file()