Run visualization from pipeline/result paths without vis_pending copies, publish labeled MP4 for browser only after visualize_result_video completes, and purge batch cache after the background labeling task finishes. Co-authored-by: Cursor <cursoragent@cursor.com>
136 lines
4.5 KiB
Python
136 lines
4.5 KiB
Python
from __future__ import annotations

import os
import time
from pathlib import Path

from app.services.video_batch_cleanup import (
    VISUALIZATION_FILENAME,
    purge_batch_artifacts,
    purge_expired_pipeline_inputs,
    purge_expired_visualizations,
    purge_surgery_batch_tree,
    purge_visualization_artifacts,
    stage_visualization_pending,
    visualization_output_path,
    visualization_pending_input_path,
    visualization_pending_result_path,
)
|
|
|
|
|
|
def test_stage_visualization_pending_copies_mp4_and_tsv(tmp_path: Path) -> None:
    """Staging a surgery places the source MP4 and result TSV at the pending paths.

    The staged MP4 must be byte-identical to the source, and the staged TSV
    must have the same UTF-8 text as the source TSV.
    """
    surgery_id = "100001"
    batch_root = tmp_path / "video_batch"
    mp4_src = tmp_path / "source.mp4"
    tsv_src = tmp_path / "result.tsv"
    mp4_src.write_bytes(b"mp4-bytes")
    tsv_src.write_text("rank\tx\n1\ta\n", encoding="utf-8")

    stage_visualization_pending(batch_root, surgery_id, source_mp4=mp4_src, result_tsv=tsv_src)

    staged_mp4 = visualization_pending_input_path(batch_root, surgery_id)
    assert staged_mp4.read_bytes() == b"mp4-bytes"
    staged_tsv = visualization_pending_result_path(batch_root, surgery_id)
    assert staged_tsv.read_text(encoding="utf-8") == tsv_src.read_text(encoding="utf-8")
|
|
|
|
|
|
def test_purge_batch_artifacts_removes_cache_and_uploads(tmp_path: Path) -> None:
    """Purging a finished batch drops its cache entry and per-surgery tree.

    Layout built here:
      cache/<digest>/<candidate_key>/{output,work/features}  -> must be removed
      cache/<digest>/input/pipeline.mp4                      -> must be kept
      <surgery_id>/{upload,input}/*.mp4                      -> must be removed

    The shared pipeline input is kept; the test's assertions require this.
    The expired-input tests in this module cover its separate TTL-based purge.
    """
    root = tmp_path / "video_batch"
    surgery_id = "100001"
    digest = "d" * 64
    candidate_key = "c1"
    cache_entry = root / "cache" / digest / candidate_key
    pipeline_input = root / "cache" / digest / "input" / "pipeline.mp4"

    pipeline_input.parent.mkdir(parents=True)
    pipeline_input.write_bytes(b"x" * 100)

    (cache_entry / "output").mkdir(parents=True)
    (cache_entry / "output" / "result.tsv").write_text("ok\n", encoding="utf-8")
    # parents=True also creates the intermediate "work" directory.
    (cache_entry / "work" / "features").mkdir(parents=True)
    (cache_entry / "work" / "features" / "input.npy").write_bytes(b"npy")

    (root / surgery_id / "upload").mkdir(parents=True)
    (root / surgery_id / "upload" / "upload.mp4").write_bytes(b"upload")
    (root / surgery_id / "input").mkdir(parents=True)
    (root / surgery_id / "input" / "abc.mp4").write_bytes(b"input")

    purge_batch_artifacts(root, surgery_id, digest=digest, candidate_key=candidate_key)
    purge_surgery_batch_tree(root, surgery_id)

    assert not cache_entry.exists()
    assert pipeline_input.is_file()
    assert not (root / surgery_id).exists()
|
|
|
|
|
|
def test_purge_visualization_artifacts_keeps_browser_mp4(tmp_path: Path) -> None:
    """The browser MP4 survives; the raw source render and .part file are deleted."""
    vis_dir = tmp_path / "vis" / "100001"
    vis_dir.mkdir(parents=True)

    keep = vis_dir / VISUALIZATION_FILENAME
    intermediates = [
        vis_dir / "result_vis_source.mp4",
        vis_dir / "result_vis.part.mp4",
    ]
    keep.write_bytes(b"browser")
    for path, payload in zip(intermediates, (b"raw", b"part")):
        path.write_bytes(payload)

    purge_visualization_artifacts(vis_dir)

    assert keep.is_file()
    for path in intermediates:
        assert not path.exists()
|
|
|
|
|
|
def test_purge_expired_visualizations_removes_old_vis_mp4(tmp_path: Path) -> None:
    """A visualization MP4 whose mtime is older than the TTL is purged and counted."""
    root = tmp_path / "video_batch"
    ttl_hours = 24.0
    vis_path = visualization_output_path(root, "100001")
    vis_path.parent.mkdir(parents=True)
    vis_path.write_bytes(b"vis")

    # Backdate the file one hour past the TTL so it is unambiguously expired.
    old = time.time() - (ttl_hours + 1) * 3600
    os.utime(vis_path, (old, old))

    removed = purge_expired_visualizations(root, ttl_hours=ttl_hours)

    assert removed == 1
    assert not vis_path.exists()
|
|
|
|
|
|
def test_purge_expired_pipeline_inputs_removes_old_pipeline_video(tmp_path: Path) -> None:
    """An expired pipeline.mp4 is removed together with its now-empty input dir."""
    root = tmp_path / "video_batch"
    ttl_hours = 24.0
    digest = "d" * 64
    pipeline = root / "cache" / digest / "input" / "pipeline.mp4"
    pipeline.parent.mkdir(parents=True)
    pipeline.write_bytes(b"pipeline")

    # Backdate the file one hour past the TTL so it is unambiguously expired.
    old = time.time() - (ttl_hours + 1) * 3600
    os.utime(pipeline, (old, old))

    removed = purge_expired_pipeline_inputs(root, ttl_hours=ttl_hours)

    assert removed == 1
    assert not pipeline.exists()
    assert not (root / "cache" / digest / "input").exists()
|
|
|
|
|
|
def test_purge_expired_pipeline_inputs_keeps_recent_pipeline_video(tmp_path: Path) -> None:
    """A freshly written pipeline.mp4 is within the TTL, so nothing is purged."""
    batch_root = tmp_path / "video_batch"
    input_dir = batch_root / "cache" / ("e" * 64) / "input"
    input_dir.mkdir(parents=True)
    fresh_video = input_dir / "pipeline.mp4"
    fresh_video.write_bytes(b"pipeline")

    purged = purge_expired_pipeline_inputs(batch_root, ttl_hours=24.0)

    assert purged == 0
    assert fresh_video.is_file()
|
|
|
|
|
|
def test_purge_expired_visualizations_keeps_recent_vis_mp4(tmp_path: Path) -> None:
    """A freshly written visualization MP4 is within the TTL and must survive."""
    batch_root = tmp_path / "video_batch"
    fresh_vis = visualization_output_path(batch_root, "100002")
    fresh_vis.parent.mkdir(parents=True)
    fresh_vis.write_bytes(b"vis")

    purged = purge_expired_visualizations(batch_root, ttl_hours=24.0)

    assert purged == 0
    assert fresh_vis.is_file()
|