Files
Kevin 6b3adb4ad8 feat: 站点 JSON、语音终端 WebSocket 指派与客户端联调
- 用 OR_SITE_CONFIG_JSON_FILE 统一术间配置(video_rtsp_urls + voice_or_room_bindings)
- VoiceTerminalHub:assignment、WS 推送与 HTTP 查询;开录/停录后 notify
- 一键联调 orchestrate-and-start 与 /client/surgeries/start 共用指派逻辑,修复 demo 路径不发 WS
- 语音桌面端:SIGINT 退出、shutdown 清理、仅 WS 指派、固定 pending 轮询间隔、界面仅保留录音时长
- 新增/调整契约与绑定测试,文档与示例配置同步

Made-with: Cursor
2026-04-27 11:21:16 +08:00

224 lines
7.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Start/stop local fake RTSP streams (MediaMTX + ffmpeg) for dev orchestration.
Each input file is published once (no ``-stream_loop``); when ffmpeg exits the
process is gone — reconnect or re-orchestrate for another playthrough.
"""
from __future__ import annotations
import os
import shutil
import socket
import subprocess
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import ClassVar
from loguru import logger
# Docker image used for the MediaMTX container; override via MEDIAMTX_DOCKER_IMAGE.
MEDIAMTX_IMAGE = os.environ.get("MEDIAMTX_DOCKER_IMAGE", "bluenviron/mediamtx:latest")
# Name prefix of every container this module starts; stale-container cleanup
# filters ``docker ps`` on this prefix.
CONTAINER_NAME_PREFIX = "orm-fake-rtsp-"
# Seconds to wait for 127.0.0.1:host_port to accept TCP connections (avoids
# "Connection refused" when recording starts); override via MEDIAMTX_TCP_READY_SEC.
_MEDIAMTX_TCP_READY_SEC = float(os.environ.get("MEDIAMTX_TCP_READY_SEC", "30"))
def _wait_tcp_listening(host: str, port: int, *, total_timeout: float) -> None:
    """Block until a TCP connection to ``host:port`` succeeds.

    Used to wait for the MediaMTX mapped port to come up. Connection attempts
    (1.5 s connect timeout each) are retried every 0.2 s until the deadline,
    which is ``total_timeout`` seconds from now but never less than 1 second.

    Args:
        host: Host name or address to probe.
        port: TCP port to probe.
        total_timeout: Overall time budget in seconds (floored at 1.0).

    Raises:
        RuntimeError: No connection succeeded before the deadline. Chained
            from the last ``OSError`` when one was recorded.
    """
    give_up_at = time.monotonic() + max(1.0, total_timeout)
    address = (host, port)
    last_error: OSError | None = None
    while time.monotonic() < give_up_at:
        try:
            with socket.create_connection(address, timeout=1.5):
                logger.info("RTSP port ready {}:{}", host, port)
                return
        except OSError as exc:
            last_error = exc
        # Only reached after a failed attempt: back off briefly, then retry.
        time.sleep(0.2)
    hint = " MediaMTX 未监听:检查 docker 是否起成功、18554 是否被占用(orm-fake-rtsp-*) 已 docker ps。"
    if last_error is None:
        raise RuntimeError(
            f"等待 {host}:{port} 可连接超时({total_timeout:g}s.{hint}"
        )
    raise RuntimeError(
        f"等待 {host}:{port} 可连接超时({total_timeout:g}s: {last_error}{hint}"
    ) from last_error
@dataclass
class StreamSpec:
    """One video file to publish as an RTSP stream for a given camera.

    ``rtsp_path`` is normalized on construction: surrounding whitespace is
    stripped, then leading/trailing slashes; an empty result (or an empty
    input) falls back to ``"demo"``.
    """

    camera_id: str
    file_path: Path
    rtsp_path: str  # last URL segment, e.g. demo1

    def __post_init__(self) -> None:
        """Normalize ``rtsp_path`` in place."""
        raw = self.rtsp_path if self.rtsp_path else "demo"
        trimmed = raw.strip().strip("/")
        self.rtsp_path = trimmed if trimmed else "demo"
@dataclass
class SyntheticRtspRun:
    """Popen handles and the Docker container for one multi-stream session."""

    # Name of the Docker container to remove (``docker rm -f``) on stop.
    container_name: str
    # Publisher subprocesses belonging to this session.
    procs: list[subprocess.Popen] = field(default_factory=list)
    # Temp dir for uploaded video files; removed on stop.
    work_dir: Path | None = None

    def stop(self) -> None:
        """Tear the session down; every step is best-effort.

        Order: terminate (then kill) each still-running process, delete
        ``work_dir`` if it exists, and force-remove the container when the
        ``docker`` CLI is available. Afterwards ``procs`` is empty and
        ``work_dir`` is ``None``, so calling ``stop()`` again is harmless.
        """
        for proc in self.procs:
            if proc.poll() is not None:
                continue  # already exited
            proc.terminate()
            try:
                proc.wait(timeout=5.0)
            except subprocess.TimeoutExpired:
                proc.kill()
                # Reap the killed child; without a wait() it would linger as
                # a zombie for as long as this Python process lives.
                try:
                    proc.wait(timeout=5.0)
                except subprocess.TimeoutExpired:
                    logger.debug("process {} did not exit after kill", proc.pid)
        self.procs.clear()

        if self.work_dir is not None and self.work_dir.is_dir():
            # ignore_errors=True already swallows filesystem errors.
            shutil.rmtree(self.work_dir, ignore_errors=True)
        self.work_dir = None

        if shutil.which("docker") is not None:
            try:
                subprocess.run(
                    ["docker", "rm", "-f", self.container_name],
                    capture_output=True,
                    timeout=30,
                )
            except (OSError, subprocess.SubprocessError) as exc:
                logger.debug("docker rm: {}", exc)
class SyntheticRtspManager:
    """Singleton-style owner of at most one active fake-RTSP session.

    Both the singleton instance and the active run are stored on the class,
    so the classmethods ``active_run()`` / ``stop_active()`` and the instance
    method ``start()`` all observe the same state.
    """

    _instance: ClassVar[SyntheticRtspManager | None] = None
    _active: ClassVar[SyntheticRtspRun | None] = None
    # Characters accepted in an RTSP path segment.
    _RTSP_PATH_CHARS: ClassVar[frozenset[str]] = frozenset(
        "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_.-"
    )

    @classmethod
    def get(cls) -> SyntheticRtspManager:
        """Return the shared manager instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    @classmethod
    def active_run(cls) -> SyntheticRtspRun | None:
        """Return the currently active run, or ``None`` when nothing is running."""
        return cls._active

    @classmethod
    def _cleanup_prefixed_containers(cls) -> None:
        """Remove stale MediaMTX containers left by earlier runs/reloads.

        Best-effort: does nothing without the ``docker`` CLI, and docker
        failures are only logged at debug level.
        """
        if shutil.which("docker") is None:
            return
        try:
            listed = subprocess.run(
                [
                    "docker",
                    "ps",
                    "-aq",
                    "--filter",
                    f"name={CONTAINER_NAME_PREFIX}",
                ],
                capture_output=True,
                text=True,
                timeout=30,
                check=False,
            )
        except (OSError, subprocess.SubprocessError) as exc:
            logger.debug("docker ps stale cleanup: {}", exc)
            return
        ids = [x.strip() for x in (listed.stdout or "").splitlines() if x.strip()]
        if not ids:
            return
        try:
            subprocess.run(
                ["docker", "rm", "-f", *ids],
                capture_output=True,
                text=True,
                timeout=60,
                check=False,
            )
            logger.info("Removed stale fake RTSP containers: {}", ids)
        except (OSError, subprocess.SubprocessError) as exc:
            logger.debug("docker rm stale cleanup: {}", exc)

    @classmethod
    def stop_active(cls) -> None:
        """Stop the active run (if any), then remove any prefixed containers."""
        if cls._active is not None:
            cls._active.stop()
            cls._active = None
        cls._cleanup_prefixed_containers()

    def start(
        self,
        streams: list[StreamSpec],
        *,
        host_port: int,
        work_dir: Path,
    ) -> tuple[SyntheticRtspRun, dict[str, str]]:
        """Start MediaMTX and one ffmpeg per stream. Returns (run, url_by_camera).

        Any previously active run is stopped first. The MediaMTX container
        publishes its port 8554 on ``127.0.0.1:host_port``; each stream is
        pushed to ``rtsp://127.0.0.1:{host_port}/{rtsp_path}``.

        Args:
            streams: Streams to publish; must be non-empty.
            host_port: Local TCP port mapped to the container's RTSP port.
            work_dir: Directory attached to the run and deleted when the run
                is stopped.

        Raises:
            ValueError: ``streams`` is empty or an ``rtsp_path`` contains a
                character outside ``[A-Za-z0-9_.-]``.
            FileNotFoundError: A stream's ``file_path`` is not a regular file.
            RuntimeError: ``ffmpeg``/``docker`` is missing from PATH, the
                container failed to start, or the port never became ready.
        """
        if not streams:
            raise ValueError("no streams")
        if not shutil.which("ffmpeg"):
            raise RuntimeError("ffmpeg not in PATH")
        if not shutil.which("docker"):
            raise RuntimeError("docker not in PATH (required to run MediaMTX)")
        self.stop_active()
        for s in streams:
            if not s.file_path.is_file():
                raise FileNotFoundError(str(s.file_path))
            if any(ch not in self._RTSP_PATH_CHARS for ch in s.rtsp_path):
                raise ValueError(f"invalid RTSP path segment: {s.rtsp_path!r}")

        container = CONTAINER_NAME_PREFIX + uuid.uuid4().hex[:12]
        cmd = [
            "docker", "run", "-d", "--name", container,
            "-p", f"127.0.0.1:{host_port}:8554",
            MEDIAMTX_IMAGE,
        ]
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        if r.returncode != 0:
            # The container may exist in a created-but-failed state; drop it.
            try:
                subprocess.run(
                    ["docker", "rm", "-f", container],
                    capture_output=True,
                    text=True,
                    timeout=30,
                    check=False,
                )
            except (OSError, subprocess.SubprocessError) as exc:
                logger.debug("docker rm failed container cleanup: {}", exc)
            err = (r.stderr or r.stdout or "").strip()
            raise RuntimeError(f"MediaMTX docker failed: {err}")

        run = SyntheticRtspRun(container_name=container)
        url_map: dict[str, str] = {}
        try:
            # The readiness wait is inside the try so that a timeout removes
            # the container instead of leaving it bound to host_port.
            time.sleep(0.5)
            _wait_tcp_listening(
                "127.0.0.1", host_port, total_timeout=_MEDIAMTX_TCP_READY_SEC
            )
            # Attached only once the port is ready: a readiness failure
            # removes the container but leaves work_dir to the caller.
            run.work_dir = work_dir
            for s in streams:
                dest = f"rtsp://127.0.0.1:{host_port}/{s.rtsp_path}"
                url_map[s.camera_id] = dest
                pub = [
                    "ffmpeg", "-hide_banner", "-loglevel", "warning",
                    "-re",
                    "-i", str(s.file_path),
                    "-c", "copy", "-f", "rtsp", "-rtsp_transport", "tcp", dest,
                ]
                p = subprocess.Popen(pub)  # noqa: S603
                run.procs.append(p)
        except Exception:
            run.stop()
            raise
        # Give ffmpeg a moment to connect to MediaMTX, so a reader (e.g.
        # OpenCV) is less likely to connect before the first frame and have
        # its DESCRIBE fail.
        time.sleep(0.4)
        # Publish on the class, not the instance: active_run() and
        # stop_active() are classmethods that read cls._active, so an
        # instance attribute would be invisible to them.
        type(self)._active = run
        return run, url_map