Prevent concurrent RTSP recording on the same camera.

Reserve camera ownership under registry lock before spawning ffmpeg, return 409 for conflicts without retry, and transcode slices to 1080p H.264 for more stable playback.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
op
2026-05-26 16:32:55 +08:00
parent 32c1c3ac75
commit 3979e13ca9
12 changed files with 314 additions and 56 deletions

View File

@@ -95,6 +95,9 @@ POSTGRES_PORT=45432
# RTSP_RECORD_ALL_CAMERAS=false # RTSP_RECORD_ALL_CAMERAS=false
# RTSP_SEGMENT_DURATION_SEC=120 # RTSP_SEGMENT_DURATION_SEC=120
# RTSP_SEGMENT_MIN_SEC=10 # RTSP_SEGMENT_MIN_SEC=10
# RTSP_RECORD_HEIGHT=1080
# RTSP_RECORD_CRF=23
# RTSP_RECORD_PRESET=veryfast
# RTSP_FFMPEG_SOCKET_TIMEOUT_USEC=5000000 # RTSP_FFMPEG_SOCKET_TIMEOUT_USEC=5000000
# RTSP_PREWARM_ENABLED=false # RTSP_PREWARM_ENABLED=false
# RTSP_PREWARM_RECONNECT_MAX_SEC=30 # RTSP_PREWARM_RECONNECT_MAX_SEC=30

View File

@@ -46,6 +46,13 @@ from app.surgery_errors import SurgeryPipelineError
router = APIRouter() router = APIRouter()
_RECORDING_NON_RETRYABLE_CODES = frozenset(
{
"CAMERA_ALREADY_RECORDING",
"SURGERY_ALREADY_RECORDING",
}
)
def _pipeline_error_detail(exc: SurgeryPipelineError, surgery_id: str) -> dict: def _pipeline_error_detail(exc: SurgeryPipelineError, surgery_id: str) -> dict:
d: dict = { d: dict = {
@@ -59,8 +66,12 @@ def _pipeline_error_detail(exc: SurgeryPipelineError, surgery_id: str) -> dict:
def _raise_surgery_pipeline_http(exc: SurgeryPipelineError, surgery_id: str) -> None: def _raise_surgery_pipeline_http(exc: SurgeryPipelineError, surgery_id: str) -> None:
status_map = {
"CAMERA_ALREADY_RECORDING": status.HTTP_409_CONFLICT,
"SURGERY_ALREADY_RECORDING": status.HTTP_409_CONFLICT,
}
raise HTTPException( raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE, status_code=status_map.get(exc.code, status.HTTP_503_SERVICE_UNAVAILABLE),
detail=_pipeline_error_detail(exc, surgery_id), detail=_pipeline_error_detail(exc, surgery_id),
) from exc ) from exc
@@ -103,6 +114,8 @@ async def _call_recording_with_retries(
return return
except SurgeryPipelineError as exc: except SurgeryPipelineError as exc:
last_exc = exc last_exc = exc
if exc.code in _RECORDING_NON_RETRYABLE_CODES:
raise
if attempt < max_attempts: if attempt < max_attempts:
logger.warning( logger.warning(
"{} attempt {}/{} failed ({}), retrying in {}s", "{} attempt {}/{} failed ({}), retrying in {}s",
@@ -123,6 +136,7 @@ async def _call_recording_with_retries(
@router.get("/health", response_model=HealthResponse, tags=["health"]) @router.get("/health", response_model=HealthResponse, tags=["health"])
async def health() -> HealthResponse | JSONResponse: async def health() -> HealthResponse | JSONResponse:
logger.debug("Health check")
try: try:
await check_database() await check_database()
except SQLAlchemyError as exc: except SQLAlchemyError as exc:

View File

@@ -43,6 +43,9 @@ class _VideoGroup(_SettingsGroup):
"rtsp_record_all_cameras", "rtsp_record_all_cameras",
"rtsp_segment_duration_sec", "rtsp_segment_duration_sec",
"rtsp_segment_min_sec", "rtsp_segment_min_sec",
"rtsp_record_height",
"rtsp_record_crf",
"rtsp_record_preset",
"rtsp_slice_batch_max_concurrent", "rtsp_slice_batch_max_concurrent",
"rtsp_slice_batch_drain_timeout_sec", "rtsp_slice_batch_drain_timeout_sec",
"rtsp_segment_ttl_hours", "rtsp_segment_ttl_hours",
@@ -209,6 +212,22 @@ class Settings(BaseSettings):
le=600.0, le=600.0,
validation_alias=AliasChoices("RTSP_SEGMENT_MIN_SEC", "rtsp_segment_min_sec"), validation_alias=AliasChoices("RTSP_SEGMENT_MIN_SEC", "rtsp_segment_min_sec"),
) )
rtsp_record_height: int = Field(
default=1080,
ge=144,
le=2160,
validation_alias=AliasChoices("RTSP_RECORD_HEIGHT", "rtsp_record_height"),
)
rtsp_record_crf: int = Field(
default=23,
ge=18,
le=35,
validation_alias=AliasChoices("RTSP_RECORD_CRF", "rtsp_record_crf"),
)
rtsp_record_preset: str = Field(
default="veryfast",
validation_alias=AliasChoices("RTSP_RECORD_PRESET", "rtsp_record_preset"),
)
rtsp_slice_batch_max_concurrent: int = Field( rtsp_slice_batch_max_concurrent: int = Field(
default=1, default=1,
ge=1, ge=1,

View File

@@ -88,7 +88,7 @@ class SurgeryClientErrorDetail(BaseModel):
code: str = Field( code: str = Field(
description=( description=(
"业务错误码,如 RESULT_NOT_READY、RECORDING_CANNOT_START、" "业务错误码,如 RESULT_NOT_READY、RECORDING_CANNOT_START、CAMERA_ALREADY_RECORDING、"
"NO_PENDING_CONFIRMATION、VOICE_ASR_FAILED。" "NO_PENDING_CONFIRMATION、VOICE_ASR_FAILED。"
) )
) )

View File

@@ -39,11 +39,20 @@ def _safe_name(value: str) -> str:
return out[:96] or "unknown" return out[:96] or "unknown"
def rtsp_record_ffmpeg_scale_filter(*, max_height: int = 1080) -> str:
"""Scale RTSP input to at most *max_height* pixels tall (preserve aspect, even width)."""
height = max(144, int(max_height))
return f"scale=-2:{height}"
def _build_ffmpeg_cmd( def _build_ffmpeg_cmd(
*, *,
rtsp_url: str, rtsp_url: str,
output_path: Path, output_path: Path,
duration_sec: float, duration_sec: float,
record_height: int = 1080,
record_crf: int = 23,
record_preset: str = "veryfast",
) -> list[str]: ) -> list[str]:
return [ return [
ffmpeg_bin(), ffmpeg_bin(),
@@ -55,12 +64,19 @@ def _build_ffmpeg_cmd(
rtsp_url, rtsp_url,
"-t", "-t",
str(max(1.0, duration_sec)), str(max(1.0, duration_sec)),
# OR cameras often expose G.711/AAC audio that cannot be copied into MP4; vision needs video only.
"-map", "-map",
"0:v:0", "0:v:0",
"-c:v",
"copy",
"-an", "-an",
"-vf",
rtsp_record_ffmpeg_scale_filter(max_height=record_height),
"-c:v",
"libx264",
"-pix_fmt",
"yuv420p",
"-preset",
record_preset,
"-crf",
str(max(18, min(35, int(record_crf)))),
"-movflags", "-movflags",
"+faststart", "+faststart",
"-y", "-y",
@@ -80,6 +96,9 @@ class RtspSegmentRecorder:
output_dir: Path, output_dir: Path,
segment_duration_sec: float, segment_duration_sec: float,
segment_min_sec: float, segment_min_sec: float,
record_height: int = 1080,
record_crf: int = 23,
record_preset: str = "veryfast",
on_segment_complete: SegmentCallback, on_segment_complete: SegmentCallback,
ready_event: asyncio.Event | None = None, ready_event: asyncio.Event | None = None,
) -> None: ) -> None:
@@ -89,6 +108,9 @@ class RtspSegmentRecorder:
self._output_dir = output_dir self._output_dir = output_dir
self._segment_duration_sec = float(segment_duration_sec) self._segment_duration_sec = float(segment_duration_sec)
self._segment_min_sec = float(segment_min_sec) self._segment_min_sec = float(segment_min_sec)
self._record_height = int(record_height)
self._record_crf = int(record_crf)
self._record_preset = str(record_preset)
self._on_segment_complete = on_segment_complete self._on_segment_complete = on_segment_complete
self._ready_event = ready_event self._ready_event = ready_event
self._slice_index = 0 self._slice_index = 0
@@ -155,6 +177,9 @@ class RtspSegmentRecorder:
rtsp_url=self._rtsp_url, rtsp_url=self._rtsp_url,
output_path=output_path, output_path=output_path,
duration_sec=duration_sec, duration_sec=duration_sec,
record_height=self._record_height,
record_crf=self._record_crf,
record_preset=self._record_preset,
) )
logger.info( logger.info(
"RTSP recorder start surgery={} camera={} slice={} cmd={}", "RTSP recorder start surgery={} camera={} slice={} cmd={}",

View File

@@ -231,17 +231,19 @@ class CameraSessionManager:
str(exc), str(exc),
) from exc ) from exc
conflicts = self._registry.camera_recording_conflicts(surgery_id, record_cameras)
if conflicts:
cam, owner = conflicts[0]
raise SurgeryPipelineError(
"CAMERA_ALREADY_RECORDING",
f"机位 {cam} 正被手术 {owner} 录制,请先结束该场次再开录。",
)
primary_cam = record_cameras[0] primary_cam = record_cameras[0]
ready_event = asyncio.Event() ready_event = asyncio.Event()
hik_logouts: list[tuple[int, bool]] = [] hik_logouts: list[tuple[int, bool]] = []
t_start = time.monotonic() t_start = time.monotonic()
for cam in record_cameras:
if self._rtsp_prewarm is not None:
await self._rtsp_prewarm.pause(cam)
t_resolve = time.monotonic()
async def on_segment(event: SegmentCompleteEvent) -> None: async def on_segment(event: SegmentCompleteEvent) -> None:
await self._slice_batch.submit_slice( await self._slice_batch.submit_slice(
surgery_id=surgery_id, surgery_id=surgery_id,
@@ -250,58 +252,64 @@ class CameraSessionManager:
state=state, state=state,
) )
recorder_tasks: list[asyncio.Task[None]] = []
for cam in record_cameras:
kind = self._resolver.backend_for_camera(cam)
url, hik_uid, hik_retained = await self._resolve_rtsp_url(camera_id=cam, kind=kind)
if hik_uid is not None:
hik_logouts.append((hik_uid, hik_retained))
out_dir = rtsp_segments_dir(surgery_id) / _safe_log_name(cam)
recorder = RtspSegmentRecorder(
surgery_id=surgery_id,
camera_id=cam,
rtsp_url=url,
output_dir=out_dir,
segment_duration_sec=self._s.rtsp_segment_duration_sec,
segment_min_sec=self._s.rtsp_segment_min_sec,
on_segment_complete=on_segment,
ready_event=ready_event if cam == primary_cam else None,
)
recorder_tasks.append(
asyncio.create_task(
recorder.run(stop_event),
name=f"rtsp_recorder:{surgery_id}:{cam}",
)
)
resolve_ms = (time.monotonic() - t_resolve) * 1000.0
t_spawn = time.monotonic()
self._slice_batch.ensure_worker(surgery_id) self._slice_batch.ensure_worker(surgery_id)
run = RunningSurgery( run = RunningSurgery(
stop_event=stop_event, stop_event=stop_event,
state=state, state=state,
tasks=recorder_tasks, tasks=[],
record_camera_ids=list(record_cameras), record_camera_ids=list(record_cameras),
algo_process=None, algo_process=None,
) )
init_consumption_log_file(surgery_id) init_consumption_log_file(surgery_id)
init_voice_log_file(surgery_id) init_voice_log_file(surgery_id)
# Reserve cameras under registry lock before spawning ffmpeg (avoid dual RTSP pull).
await self._registry.register(surgery_id, run) await self._registry.register(surgery_id, run)
spawn_ms = (time.monotonic() - t_spawn) * 1000.0 t_resolve = time.monotonic()
t_ready = time.monotonic() open_timeout = float(self._s.video_open_timeout_sec) + 5.0
started_ok = False
prewarm_was_warm = ( prewarm_was_warm = (
self._rtsp_prewarm.was_warm(primary_cam) if self._rtsp_prewarm is not None else False self._rtsp_prewarm.was_warm(primary_cam) if self._rtsp_prewarm is not None else False
) )
open_timeout = float(self._s.video_open_timeout_sec) + 5.0
started_ok = False
try: try:
for cam in record_cameras:
if self._rtsp_prewarm is not None:
await self._rtsp_prewarm.pause(cam)
for cam in record_cameras:
kind = self._resolver.backend_for_camera(cam)
url, hik_uid, hik_retained = await self._resolve_rtsp_url(camera_id=cam, kind=kind)
if hik_uid is not None:
hik_logouts.append((hik_uid, hik_retained))
out_dir = rtsp_segments_dir(surgery_id) / _safe_log_name(cam)
recorder = RtspSegmentRecorder(
surgery_id=surgery_id,
camera_id=cam,
rtsp_url=url,
output_dir=out_dir,
segment_duration_sec=self._s.rtsp_segment_duration_sec,
segment_min_sec=self._s.rtsp_segment_min_sec,
record_height=self._s.rtsp_record_height,
record_crf=self._s.rtsp_record_crf,
record_preset=self._s.rtsp_record_preset,
on_segment_complete=on_segment,
ready_event=ready_event if cam == primary_cam else None,
)
run.tasks.append(
asyncio.create_task(
recorder.run(stop_event),
name=f"rtsp_recorder:{surgery_id}:{cam}",
)
)
resolve_ms = (time.monotonic() - t_resolve) * 1000.0
t_spawn = time.monotonic()
await asyncio.wait_for(ready_event.wait(), timeout=open_timeout) await asyncio.wait_for(ready_event.wait(), timeout=open_timeout)
state.ready.set() state.ready.set()
started_ok = True started_ok = True
ready_ms = (time.monotonic() - t_ready) * 1000.0 spawn_ms = (time.monotonic() - t_spawn) * 1000.0
ready_ms = spawn_ms
total_ms = (time.monotonic() - t_start) * 1000.0 total_ms = (time.monotonic() - t_start) * 1000.0
logger.info( logger.info(
"RTSP start_surgery timing surgery={} camera={} prewarm_was_warm={} " "RTSP start_surgery timing surgery={} camera={} prewarm_was_warm={} "
@@ -320,15 +328,13 @@ class CameraSessionManager:
surgery_id, surgery_id,
open_timeout, open_timeout,
) )
await self._force_stop_run(run, surgery_id)
await self._registry.unregister(surgery_id)
raise SurgeryPipelineError( raise SurgeryPipelineError(
"RECORDING_CANNOT_START", "RECORDING_CANNOT_START",
"开录未能确认RTSP 录像在超时内未就绪。", "开录未能确认RTSP 录像在超时内未就绪。",
) from exc ) from exc
except SurgeryPipelineError:
raise
except Exception: except Exception:
await self._force_stop_run(run, surgery_id)
await self._registry.unregister(surgery_id)
raise raise
finally: finally:
for hik_uid, hik_retained in hik_logouts: for hik_uid, hik_retained in hik_logouts:
@@ -336,9 +342,12 @@ class CameraSessionManager:
await asyncio.to_thread(self._hik.logout, hik_uid) await asyncio.to_thread(self._hik.logout, hik_uid)
if hik_retained and self._hik is not None: if hik_retained and self._hik is not None:
HikvisionInitRefCount.release(self._hik) HikvisionInitRefCount.release(self._hik)
if not started_ok and self._rtsp_prewarm is not None: if not started_ok:
for cam in record_cameras: await self._force_stop_run(run, surgery_id)
await self._rtsp_prewarm.resume(cam) await self._registry.unregister(surgery_id)
if self._rtsp_prewarm is not None:
for cam in record_cameras:
await self._rtsp_prewarm.resume(cam)
async def _force_stop_run(self, run: RunningSurgery, surgery_id: str) -> None: async def _force_stop_run(self, run: RunningSurgery, surgery_id: str) -> None:
run.stop_event.set() run.stop_event.set()

View File

@@ -121,6 +121,7 @@ class SurgerySessionRegistry:
def __init__(self) -> None: def __init__(self) -> None:
self._active: dict[str, RunningSurgery] = {} self._active: dict[str, RunningSurgery] = {}
self._camera_owners: dict[str, str] = {}
self._manager_lock = asyncio.Lock() self._manager_lock = asyncio.Lock()
@property @property
@@ -145,13 +146,40 @@ class SurgerySessionRegistry:
def active_ids(self) -> list[str]: def active_ids(self) -> list[str]:
return list(self._active.keys()) return list(self._active.keys())
def camera_recording_conflicts(
self,
surgery_id: str,
camera_ids: list[str],
) -> list[tuple[str, str]]:
"""Return ``(camera_id, owning_surgery_id)`` for cameras already in use."""
conflicts: list[tuple[str, str]] = []
for cam in camera_ids:
owner = self._camera_owners.get(cam)
if owner is not None and owner != surgery_id:
conflicts.append((cam, owner))
return conflicts
async def register(self, surgery_id: str, running: RunningSurgery) -> None: async def register(self, surgery_id: str, running: RunningSurgery) -> None:
async with self._manager_lock: async with self._manager_lock:
conflicts = self.camera_recording_conflicts(surgery_id, running.record_camera_ids)
if conflicts:
cam, owner = conflicts[0]
raise SurgeryPipelineError(
"CAMERA_ALREADY_RECORDING",
f"机位 {cam} 正被手术 {owner} 录制,请先结束该场次再开录。",
)
for cam in running.record_camera_ids:
self._camera_owners[cam] = surgery_id
self._active[surgery_id] = running self._active[surgery_id] = running
async def unregister(self, surgery_id: str) -> RunningSurgery | None: async def unregister(self, surgery_id: str) -> RunningSurgery | None:
async with self._manager_lock: async with self._manager_lock:
return self._active.pop(surgery_id, None) run = self._active.pop(surgery_id, None)
if run is not None:
for cam in run.record_camera_ids:
if self._camera_owners.get(cam) == surgery_id:
self._camera_owners.pop(cam, None)
return run
def live_consumption_if_active(self, surgery_id: str) -> list[SurgeryConsumptionStored] | None: def live_consumption_if_active(self, surgery_id: str) -> list[SurgeryConsumptionStored] | None:
run = self._active.get(surgery_id) run = self._active.get(surgery_id)

View File

@@ -229,6 +229,30 @@ def test_end_surgery_notifies_voice_terminal(api_app: FastAPI, instant_sleep: No
assert hub.get_assignment("t-end") is None assert hub.get_assignment("t-end") is None
def test_start_surgery_409_when_camera_already_recording(
api_app: FastAPI,
instant_sleep: None,
) -> None:
pipeline = MagicMock()
pipeline.start_recording = AsyncMock(
side_effect=SurgeryPipelineError(
"CAMERA_ALREADY_RECORDING",
"机位 or-cam-03 正被手术 100155 录制,请先结束该场次再开录。",
)
)
api_app.dependency_overrides[get_surgery_pipeline] = lambda: pipeline
client = TestClient(api_app)
r = client.post(
"/client/surgeries/start",
json={"surgery_id": "100003", "camera_ids": ["or-cam-03"], "candidate_consumables": []},
)
assert r.status_code == 409
d = r.json()["detail"]
assert d["code"] == "CAMERA_ALREADY_RECORDING"
assert d["surgery_id"] == "100003"
pipeline.start_recording.assert_awaited_once()
def test_start_surgery_503_on_pipeline_error(api_app: FastAPI, instant_sleep: None) -> None: def test_start_surgery_503_on_pipeline_error(api_app: FastAPI, instant_sleep: None) -> None:
pipeline = MagicMock() pipeline = MagicMock()
pipeline.start_recording = AsyncMock(side_effect=SurgeryPipelineError("RECORDING_CANNOT_START", "cannot")) pipeline.start_recording = AsyncMock(side_effect=SurgeryPipelineError("RECORDING_CANNOT_START", "cannot"))

View File

@@ -66,7 +66,13 @@ class _StubCameraSessionManager:
name_to_code={}, name_to_code={},
) )
state.ready.set() state.ready.set()
run = RunningSurgery(stop_event=asyncio.Event(), state=state, tasks=[]) primary = (camera_ids[0] if camera_ids else "or-cam-03").strip() or "or-cam-03"
run = RunningSurgery(
stop_event=asyncio.Event(),
state=state,
tasks=[],
record_camera_ids=[primary],
)
await self._registry.register(surgery_id, run) await self._registry.register(surgery_id, run)
def set_voice_terminal_id(self, surgery_id: str, terminal_id: str | None) -> None: def set_voice_terminal_id(self, surgery_id: str, terminal_id: str | None) -> None:

View File

@@ -5,7 +5,10 @@ from __future__ import annotations
from pathlib import Path from pathlib import Path
from app.services.video.rtsp_ffmpeg_opts import parse_rtsp_transport, rtsp_ffmpeg_input_opts from app.services.video.rtsp_ffmpeg_opts import parse_rtsp_transport, rtsp_ffmpeg_input_opts
from app.services.video.rtsp_segment_recorder import _build_ffmpeg_cmd from app.services.video.rtsp_segment_recorder import (
_build_ffmpeg_cmd,
rtsp_record_ffmpeg_scale_filter,
)
def test_parse_rtsp_transport_opencv_semicolon_format() -> None: def test_parse_rtsp_transport_opencv_semicolon_format() -> None:
@@ -38,12 +41,17 @@ def test_rtsp_ffmpeg_input_opts_omits_timeout_when_zero(monkeypatch) -> None:
assert "-stimeout" not in opts assert "-stimeout" not in opts
def test_rtsp_record_scale_filter_caps_height() -> None:
assert rtsp_record_ffmpeg_scale_filter(max_height=1080) == "scale=-2:1080"
def test_build_ffmpeg_cmd_uses_tcp_transport(monkeypatch) -> None: def test_build_ffmpeg_cmd_uses_tcp_transport(monkeypatch) -> None:
monkeypatch.setenv("OPENCV_FFMPEG_CAPTURE_OPTIONS", "rtsp_transport;tcp") monkeypatch.setenv("OPENCV_FFMPEG_CAPTURE_OPTIONS", "rtsp_transport;tcp")
cmd = _build_ffmpeg_cmd( cmd = _build_ffmpeg_cmd(
rtsp_url="rtsp://example/stream", rtsp_url="rtsp://example/stream",
output_path=Path("/tmp/slice_0000.mp4"), output_path=Path("/tmp/slice_0000.mp4"),
duration_sec=120.0, duration_sec=120.0,
record_height=1080,
) )
transport_idx = cmd.index("-rtsp_transport") transport_idx = cmd.index("-rtsp_transport")
assert cmd[transport_idx + 1] == "tcp" assert cmd[transport_idx + 1] == "tcp"
@@ -53,7 +61,10 @@ def test_build_ffmpeg_cmd_uses_tcp_transport(monkeypatch) -> None:
map_idx = cmd.index("-map") map_idx = cmd.index("-map")
assert cmd[map_idx + 1] == "0:v:0" assert cmd[map_idx + 1] == "0:v:0"
assert "-an" in cmd assert "-an" in cmd
assert "-c:v" in cmd and "copy" in cmd assert "-c:v" in cmd
assert cmd[cmd.index("-c:v") + 1] == "libx264"
assert "scale=-2:1080" in cmd
assert "-pix_fmt" in cmd and "yuv420p" in cmd
assert cmd.index("-i") > timeout_idx assert cmd.index("-i") > timeout_idx

View File

@@ -308,6 +308,123 @@ async def test_start_surgery_pauses_prewarm_and_resumes_on_failure(
prewarm.resume.assert_awaited_once_with("or-cam-03") prewarm.resume.assert_awaited_once_with("or-cam-03")
@pytest.mark.asyncio
async def test_start_surgery_rejects_when_camera_already_recording() -> None:
settings = Settings()
mgr = CameraSessionManager(
settings=settings,
hikvision_runtime=None,
result_repository=None,
)
busy = SurgerySessionState(candidate_consumables=["纱布"])
busy.ready.set()
mgr._registry._active["100155"] = RunningSurgery(
stop_event=asyncio.Event(),
state=busy,
tasks=[],
record_camera_ids=["or-cam-03"],
)
mgr._registry._camera_owners["or-cam-03"] = "100155"
with pytest.raises(SurgeryPipelineError) as exc:
await mgr.start_surgery("100003", ["or-cam-03"], ["纱布"])
assert exc.value.code == "CAMERA_ALREADY_RECORDING"
assert "100155" in exc.value.message
assert "100003" not in mgr._registry._active
@pytest.mark.asyncio
async def test_registry_register_rejects_duplicate_camera_owner() -> None:
from app.services.video.session_registry import SurgerySessionRegistry
registry = SurgerySessionRegistry()
first = RunningSurgery(
stop_event=asyncio.Event(),
state=SurgerySessionState(candidate_consumables=["纱布"]),
tasks=[],
record_camera_ids=["or-cam-03"],
)
await registry.register("100155", first)
second = RunningSurgery(
stop_event=asyncio.Event(),
state=SurgerySessionState(candidate_consumables=["纱布"]),
tasks=[],
record_camera_ids=["or-cam-03"],
)
with pytest.raises(SurgeryPipelineError) as exc:
await registry.register("100003", second)
assert exc.value.code == "CAMERA_ALREADY_RECORDING"
await registry.unregister("100155")
assert registry.camera_recording_conflicts("100003", ["or-cam-03"]) == []
await registry.register("100003", second)
assert registry.get_running("100003") is second
@pytest.mark.asyncio
async def test_start_surgery_concurrent_camera_recording_spawns_single_ffmpeg(
monkeypatch: pytest.MonkeyPatch,
) -> None:
settings = Settings(video_open_timeout_sec=5.0)
mgr = CameraSessionManager(
settings=settings,
hikvision_runtime=None,
result_repository=None,
)
ffmpeg_spawn_count = 0
ready_gate = asyncio.Event()
async def fake_recorder_run(_self: object, stop_event: asyncio.Event) -> None:
nonlocal ffmpeg_spawn_count
ffmpeg_spawn_count += 1
ready = getattr(_self, "_ready_event", None)
if ready is not None and not ready.is_set():
await ready_gate.wait()
ready.set()
await stop_event.wait()
monkeypatch.setattr(
"app.services.video.session_manager.RtspSegmentRecorder.run",
fake_recorder_run,
)
monkeypatch.setattr(
"app.services.video.session_manager.resolve_recording_cameras",
lambda *_a, **_k: ["or-cam-03"],
)
monkeypatch.setattr(
mgr,
"_resolve_rtsp_url",
AsyncMock(return_value=("rtsp://example/stream", None, False)),
)
async def start_one(surgery_id: str) -> SurgeryPipelineError | None:
try:
await mgr.start_surgery(surgery_id, ["or-cam-03"], ["纱布"])
except SurgeryPipelineError as exc:
return exc
return None
first = asyncio.create_task(start_one("100155"))
second = asyncio.create_task(start_one("100003"))
await asyncio.sleep(0.05)
ready_gate.set()
outcomes = await asyncio.gather(first, second)
codes = ["ok" if err is None else err.code for err in outcomes]
assert codes.count("ok") == 1
assert codes.count("CAMERA_ALREADY_RECORDING") == 1
assert ffmpeg_spawn_count == 1
active = mgr._registry.active_ids()
assert len(active) == 1
assert active[0] in {"100155", "100003"}
owner = active[0]
await mgr.stop_surgery(owner)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_stop_surgery_resumes_prewarm_for_recorded_cameras() -> None: async def test_stop_surgery_resumes_prewarm_for_recorded_cameras() -> None:
settings = Settings() settings = Settings()
@@ -328,6 +445,7 @@ async def test_stop_surgery_resumes_prewarm_for_recorded_cameras() -> None:
record_camera_ids=["or-cam-03"], record_camera_ids=["or-cam-03"],
) )
mgr._registry._active["123456"] = run mgr._registry._active["123456"] = run
mgr._registry._camera_owners["or-cam-03"] = "123456"
await mgr.stop_surgery("123456") await mgr.stop_surgery("123456")

View File

@@ -22,6 +22,7 @@
- 停录时 flush 尾切片并等待 batch 队列 drain`RTSP_SLICE_BATCH_DRAIN_TIMEOUT_SEC`)。 - 停录时 flush 尾切片并等待 batch 队列 drain`RTSP_SLICE_BATCH_DRAIN_TIMEOUT_SEC`)。
- 落盘切片默认 **24 小时**后自动删除(`RTSP_SEGMENT_TTL_HOURS`;进程启动与后台定时 sweep - 落盘切片默认 **24 小时**后自动删除(`RTSP_SEGMENT_TTL_HOURS`;进程启动与后台定时 sweep
- 设置 `RTSP_RECORD_ALL_CAMERAS=true` 可对请求中所有可解析 RTSP 的机位分别录像+跑 batch多机位代码已预留 - 设置 `RTSP_RECORD_ALL_CAMERAS=true` 可对请求中所有可解析 RTSP 的机位分别录像+跑 batch多机位代码已预留
- **同一机位同时只允许一场手术录制**(默认主摄 `RTSP_PRIMARY_CAMERA_ID`):另一场次开录同一 camera 时返回 `409` / `CAMERA_ALREADY_RECORDING`;注册表在拉起 ffmpeg 前即占用机位,避免双路 RTSP 抢流导致录像周期性丢帧。
## Docker 与 RTSP 地址 ## Docker 与 RTSP 地址