Files
operating-room-monitor-server/backend/tests/test_result_unavailable.py
Kevin 0917109d6a Align client surgery API with documented contract and support consumable codes.
Unify result 503 to RESULT_NOT_READY, restore resolve ASR failures as HTTP 422, accept label_id-only candidate_consumables, and document the changelog for integrators.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 09:36:09 +08:00

55 lines
2.2 KiB
Python

"""Reason classification when a queried surgery result is unavailable."""
from __future__ import annotations
import asyncio
from unittest.mock import AsyncMock, MagicMock
import pytest
from app.services.surgery_pipeline import SurgeryPipeline
from app.services.video.session_manager import CameraSessionManager, RunningSurgery, SurgerySessionState
@pytest.mark.asyncio
async def test_classify_not_started() -> None:
    """Classify as RESULT_NOT_READY when nothing is known about the surgery.

    The session manager reports no active recording phase and no archived
    fallback, and the repository has no final details stored.
    """
    session_manager = MagicMock(spec=CameraSessionManager)
    # No live session and nothing archived for the requested surgery.
    session_manager.active_recording_phase = MagicMock(return_value=None)
    session_manager.archived_consumption_fallback = AsyncMock(return_value=None)

    result_repo = MagicMock()
    result_repo.load_final_details = AsyncMock(return_value=None)

    pipeline = SurgeryPipeline(
        session_manager,
        result_repository=result_repo,
        voice_confirmation=MagicMock(),
    )

    reason_code, _ = await pipeline.classify_result_unavailable("123456")

    assert reason_code == "RESULT_NOT_READY"
@pytest.mark.asyncio
async def test_classify_in_progress_no_details() -> None:
    """Classify as RESULT_NOT_READY while the session phase is "recording"."""
    session_manager = MagicMock(spec=CameraSessionManager)
    session_manager.active_recording_phase = MagicMock(return_value="recording")

    pipeline = SurgeryPipeline(
        session_manager,
        result_repository=MagicMock(),
        voice_confirmation=MagicMock(),
    )

    reason_code, _ = await pipeline.classify_result_unavailable("123456")

    assert reason_code == "RESULT_NOT_READY"
@pytest.mark.asyncio
async def test_classify_starting() -> None:
    """Classify as RESULT_NOT_READY while the session phase is "starting"."""
    session_manager = MagicMock(spec=CameraSessionManager)
    session_manager.active_recording_phase = MagicMock(return_value="starting")

    pipeline = SurgeryPipeline(
        session_manager,
        result_repository=MagicMock(),
        voice_confirmation=MagicMock(),
    )

    reason_code, _ = await pipeline.classify_result_unavailable("123456")

    assert reason_code == "RESULT_NOT_READY"
def test_active_recording_phase() -> None:
    """Check the phase the registry reports for known and unknown surgeries.

    A registered surgery reports "starting" until ``state.ready`` is set and
    "recording" afterwards; an ID that was never registered yields ``None``.
    """
    from app.services.video.session_registry import SurgerySessionRegistry

    registry = SurgerySessionRegistry()
    session_state = SurgerySessionState(candidate_consumables=["纱布"])
    stop_event = asyncio.Event()
    running = RunningSurgery(stop_event=stop_event, state=session_state, tasks=[])

    # Register the surgery directly in the registry's internal active map.
    registry._active["111111"] = running

    assert registry.active_recording_phase("111111") == "starting"

    session_state.ready.set()
    assert registry.active_recording_phase("111111") == "recording"

    assert registry.active_recording_phase("999999") is None