from __future__ import annotations

import os
import time
from pathlib import Path

from app.services.video_batch_cleanup import (
    VISUALIZATION_FILENAME,
    purge_batch_artifacts,
    purge_expired_visualizations,
    purge_surgery_batch_tree,
    purge_visualization_artifacts,
    stage_visualization_pending,
    visualization_output_path,
    visualization_pending_input_path,
    visualization_pending_result_path,
)


def test_stage_visualization_pending_copies_mp4_and_tsv(tmp_path: Path) -> None:
    """Staging makes the source MP4 and result TSV readable at the pending paths."""
    root = tmp_path / "video_batch"
    source = tmp_path / "source.mp4"
    tsv = tmp_path / "result.tsv"
    source.write_bytes(b"mp4-bytes")
    tsv.write_text("rank\tx\n1\ta\n", encoding="utf-8")

    stage_visualization_pending(root, "100001", source_mp4=source, result_tsv=tsv)

    assert visualization_pending_input_path(root, "100001").read_bytes() == b"mp4-bytes"
    assert visualization_pending_result_path(root, "100001").read_text(
        encoding="utf-8"
    ) == tsv.read_text(encoding="utf-8")


def test_purge_batch_artifacts_removes_cache_and_uploads(tmp_path: Path) -> None:
    """Purging removes the candidate cache entry and the surgery tree.

    The pipeline input stored next to the candidate entry (under the same
    digest directory) is expected to remain after both purge calls.
    """
    root = tmp_path / "video_batch"
    surgery_id = "100001"
    digest = "d" * 64
    candidate_key = "c1"
    cache_entry = root / "cache" / digest / candidate_key

    # Digest-level input that the purge calls must leave in place.
    pipeline_input = root / "cache" / digest / "input" / "pipeline.mp4"
    pipeline_input.parent.mkdir(parents=True)
    pipeline_input.write_bytes(b"x" * 100)

    # Per-candidate output and work directories that must disappear.
    (cache_entry / "output").mkdir(parents=True)
    (cache_entry / "output" / "result.tsv").write_text("ok\n", encoding="utf-8")
    # parents=True also creates the intermediate "work" directory.
    (cache_entry / "work" / "features").mkdir(parents=True)
    (cache_entry / "work" / "features" / "input.npy").write_bytes(b"npy")

    # Per-surgery upload and input directories that must disappear.
    (root / surgery_id / "upload").mkdir(parents=True)
    (root / surgery_id / "upload" / "upload.mp4").write_bytes(b"upload")
    (root / surgery_id / "input").mkdir(parents=True)
    (root / surgery_id / "input" / "abc.mp4").write_bytes(b"input")

    purge_batch_artifacts(root, surgery_id, digest=digest, candidate_key=candidate_key)
    purge_surgery_batch_tree(root, surgery_id)

    assert not cache_entry.exists()
    assert pipeline_input.is_file()
    assert not (root / surgery_id).exists()


def test_purge_visualization_artifacts_keeps_browser_mp4(tmp_path: Path) -> None:
    """Only the file named VISUALIZATION_FILENAME survives the artifact purge."""
    out_dir = tmp_path / "vis" / "100001"
    out_dir.mkdir(parents=True)
    browser = out_dir / VISUALIZATION_FILENAME
    raw = out_dir / "result_vis_source.mp4"
    part = out_dir / "result_vis.part.mp4"
    browser.write_bytes(b"browser")
    raw.write_bytes(b"raw")
    part.write_bytes(b"part")

    purge_visualization_artifacts(out_dir)

    assert browser.is_file()
    assert not raw.exists()
    assert not part.exists()


def test_purge_expired_visualizations_removes_old_vis_mp4(tmp_path: Path) -> None:
    """A visualization backdated by 25 hours is removed with a 24-hour TTL."""
    root = tmp_path / "video_batch"
    vis_path = visualization_output_path(root, "100001")
    vis_path.parent.mkdir(parents=True)
    vis_path.write_bytes(b"vis")

    # Backdate access and modification times to one hour past the TTL.
    age_seconds = 25 * 3600
    old = time.time() - age_seconds
    os.utime(vis_path, (old, old))

    removed = purge_expired_visualizations(root, ttl_hours=24.0)

    assert removed == 1
    assert not vis_path.exists()


def test_purge_expired_visualizations_keeps_recent_vis_mp4(tmp_path: Path) -> None:
    """A freshly written visualization is kept with a 24-hour TTL."""
    root = tmp_path / "video_batch"
    vis_path = visualization_output_path(root, "100002")
    vis_path.parent.mkdir(parents=True)
    vis_path.write_bytes(b"vis")

    removed = purge_expired_visualizations(root, ttl_hours=24.0)

    assert removed == 0
    assert vis_path.is_file()