85 lines
2.6 KiB
Python
85 lines
2.6 KiB
Python
"""推流帧缓存单元测试。"""
|
|
from __future__ import annotations
|
|
|
|
import sys
|
|
import unittest
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock
|
|
|
|
import numpy as np
|
|
|
|
PACK_ROOT = Path(__file__).resolve().parents[1]
|
|
sys.path.insert(0, str(PACK_ROOT / "src"))
|
|
|
|
from paths import ensure_code_on_path # noqa: E402
|
|
|
|
ensure_code_on_path(PACK_ROOT)
|
|
|
|
from stream_frame_buffer import ( # noqa: E402
|
|
FrameRingBuffer,
|
|
decode_cached_frame,
|
|
encode_frame_for_cache,
|
|
)
|
|
from stream_basket_session import StreamBasketSession # noqa: E402
|
|
|
|
|
|
class TestFrameRingBuffer(unittest.TestCase):
|
|
def test_jpeg_roundtrip(self) -> None:
|
|
raw = np.random.randint(0, 255, (1080, 1920, 3), dtype=np.uint8)
|
|
data = encode_frame_for_cache(raw, max_width=1280)
|
|
self.assertLess(len(data), raw.nbytes // 4)
|
|
back = decode_cached_frame(data)
|
|
self.assertEqual(back.ndim, 3)
|
|
self.assertLessEqual(back.shape[1], 1280)
|
|
|
|
def test_slice_inclusive(self) -> None:
|
|
buf = FrameRingBuffer(max_seconds=10.0, fps=10.0, cache_max_width=640)
|
|
for i in range(50):
|
|
f = np.zeros((64, 64, 3), dtype=np.uint8)
|
|
buf.append(i * 0.1, f)
|
|
part = buf.slice_decoded(2.0, 2.5)
|
|
ts = [p[0] for p in part]
|
|
self.assertTrue(all(2.0 - 1e-6 <= t <= 2.5 + 1e-6 for t in ts))
|
|
self.assertGreaterEqual(len(ts), 5)
|
|
|
|
|
|
class TestStreamBasketSession(unittest.TestCase):
|
|
def test_pending_clip_ready_after_window(self) -> None:
|
|
trigger = MagicMock()
|
|
trigger.process_frame = MagicMock(
|
|
side_effect=lambda t, _h, _b: 2.0 if abs(t - 2.0) < 1e-9 else None
|
|
)
|
|
hand_model = MagicMock()
|
|
|
|
session = StreamBasketSession(
|
|
[0, 0, 100, 100],
|
|
hand_model,
|
|
trigger,
|
|
segment_start_offset_sec=2.0,
|
|
segment_end_offset_sec=8.0,
|
|
min_segment_sec=4.0,
|
|
ring_buffer_sec=10.0,
|
|
fps=25.0,
|
|
cache_max_width=640,
|
|
)
|
|
|
|
frame = np.zeros((64, 64, 3), dtype=np.uint8)
|
|
contact_t = None
|
|
for i in range(260):
|
|
t = i * 0.04
|
|
start = session.push_frame(t, frame)
|
|
if start is not None and contact_t is None:
|
|
contact_t = start
|
|
|
|
self.assertAlmostEqual(contact_t, 2.0, places=3)
|
|
clips = session.poll_ready_clips()
|
|
self.assertGreaterEqual(len(clips), 1)
|
|
clip = clips[0]
|
|
self.assertAlmostEqual(clip.start_sec, 4.0, places=2)
|
|
self.assertAlmostEqual(clip.end_sec, 10.0, places=2)
|
|
self.assertGreater(len(clip.frames), 0)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|