Files
operating-room-monitor-server/app/services/video/hikvision_runtime.py
Kevin 04866559db feat: surgery pipeline API, video inference, voice confirm, and tests
- Add FastAPI routes for surgery start/end, results, pending confirmation (WAV upload), and health checks.
- Implement RTSP/Hikvision capture, consumable classification, session manager, MinIO/Baidu voice resolution, and DB persistence.
- Add documentation (client API, video backends, staging checklist) and sample camera/RTSP config.
- Add pytest suite (API contract, session manager, voice, repositories, pipeline persistence) and httpx dev dependency.
- Replace deprecated HTTP_422_UNPROCESSABLE_ENTITY with HTTP_422_UNPROCESSABLE_CONTENT.
- Fix SurgeryPipeline DB reads to use an explicit transaction with autobegin disabled.

Made-with: Cursor
2026-04-21 18:33:54 +08:00

158 lines
4.3 KiB
Python

from __future__ import annotations
import os
import threading
from ctypes import (
CDLL,
POINTER,
RTLD_GLOBAL,
byref,
c_byte,
c_char_p,
c_int,
c_uint16,
)
from ctypes import Structure
from dataclasses import dataclass
from pathlib import Path
from loguru import logger
@dataclass
class HikvisionLoginResult:
user_id: int
device_info_raw: bytes
class NET_DVR_DEVICEINFO_V30(Structure):
"""Opaque device info buffer for NET_DVR_Login_V30 (layout varies by SDK version)."""
_fields_ = [("data", c_byte * 512)]
@dataclass
class HikvisionRuntime:
"""Loaded HCNetSDK with Init/Login/Logout/Cleanup."""
lib: CDLL
_inited: bool = False
@classmethod
def try_load(cls, lib_dir: str | None) -> HikvisionRuntime | None:
candidates: list[Path] = []
if lib_dir and lib_dir.strip():
base = Path(lib_dir).expanduser()
candidates.append(base / "libhcnetsdk.so")
candidates.append(base / "libHCNetSDK.so")
env_path = os.environ.get("HIKVISION_LIB_PATH", "").strip()
if env_path:
candidates.append(Path(env_path).expanduser())
candidates.append(Path("/opt/hikvision/lib/libhcnetsdk.so"))
candidates.append(Path("/opt/hikvision/lib/libHCNetSDK.so"))
for path in candidates:
if path.is_file():
try:
lib = CDLL(str(path), mode=RTLD_GLOBAL)
logger.info("Loaded Hikvision SDK: {}", path)
return cls(lib=lib)
except OSError as exc:
logger.warning("Failed CDLL {}: {}", path, exc)
return None
def init(self) -> None:
if self._inited:
return
fn = getattr(self.lib, "NET_DVR_Init", None)
if fn is None:
raise RuntimeError("NET_DVR_Init not found in HCNetSDK")
fn.restype = c_int
fn.argtypes = []
ret = int(fn())
if ret == 0:
raise RuntimeError("NET_DVR_Init returned false")
self._inited = True
def cleanup(self) -> None:
fn = getattr(self.lib, "NET_DVR_Cleanup", None)
if fn is None:
self._inited = False
return
fn.restype = None
fn.argtypes = []
fn()
self._inited = False
def login_v30(
self,
*,
ip: str,
port: int,
username: str,
password: str,
) -> HikvisionLoginResult:
login = getattr(self.lib, "NET_DVR_Login_V30", None)
if login is None:
raise RuntimeError("NET_DVR_Login_V30 not found in HCNetSDK")
login.restype = c_int
login.argtypes = [
c_char_p,
c_uint16,
c_char_p,
c_char_p,
POINTER(NET_DVR_DEVICEINFO_V30),
]
dev = NET_DVR_DEVICEINFO_V30()
user_id = login(
ip.encode("utf-8"),
c_uint16(port),
username.encode("utf-8"),
password.encode("utf-8"),
byref(dev),
)
if user_id < 0:
err = self._last_error()
raise RuntimeError(f"NET_DVR_Login_V30 failed (user_id={user_id}, err={err})")
raw = bytes(dev.data)
return HikvisionLoginResult(user_id=int(user_id), device_info_raw=raw)
def logout(self, user_id: int) -> None:
fn = getattr(self.lib, "NET_DVR_Logout", None)
if fn is None:
return
fn.restype = c_int
fn.argtypes = [c_int]
fn(c_int(user_id))
def _last_error(self) -> str:
get_err = getattr(self.lib, "NET_DVR_GetLastError", None)
if get_err is None:
return "unknown"
get_err.restype = c_int
get_err.argtypes = []
return str(int(get_err()))
class HikvisionInitRefCount:
"""Process-wide NET_DVR_Init / NET_DVR_Cleanup pairing."""
_lock = threading.Lock()
_count = 0
@classmethod
def retain(cls, rt: HikvisionRuntime) -> None:
with cls._lock:
cls._count += 1
if cls._count == 1:
rt.init()
@classmethod
def release(cls, rt: HikvisionRuntime) -> None:
with cls._lock:
if cls._count <= 0:
return
cls._count -= 1
if cls._count == 0:
rt.cleanup()