from __future__ import annotations import os import threading from ctypes import ( CDLL, POINTER, RTLD_GLOBAL, byref, c_byte, c_char_p, c_int, c_uint16, ) from ctypes import Structure from dataclasses import dataclass from pathlib import Path from loguru import logger @dataclass class HikvisionLoginResult: user_id: int device_info_raw: bytes class NET_DVR_DEVICEINFO_V30(Structure): """Opaque device info buffer for NET_DVR_Login_V30 (layout varies by SDK version).""" _fields_ = [("data", c_byte * 512)] @dataclass class HikvisionRuntime: """Loaded HCNetSDK with Init/Login/Logout/Cleanup.""" lib: CDLL _inited: bool = False @classmethod def try_load(cls, lib_dir: str | None) -> HikvisionRuntime | None: candidates: list[Path] = [] if lib_dir and lib_dir.strip(): base = Path(lib_dir).expanduser() candidates.append(base / "libhcnetsdk.so") candidates.append(base / "libHCNetSDK.so") env_path = os.environ.get("HIKVISION_LIB_PATH", "").strip() if env_path: candidates.append(Path(env_path).expanduser()) candidates.append(Path("/opt/hikvision/lib/libhcnetsdk.so")) candidates.append(Path("/opt/hikvision/lib/libHCNetSDK.so")) for path in candidates: if path.is_file(): try: lib = CDLL(str(path), mode=RTLD_GLOBAL) logger.info("Loaded Hikvision SDK: {}", path) return cls(lib=lib) except OSError as exc: logger.warning("Failed CDLL {}: {}", path, exc) return None def init(self) -> None: if self._inited: return fn = getattr(self.lib, "NET_DVR_Init", None) if fn is None: raise RuntimeError("NET_DVR_Init not found in HCNetSDK") fn.restype = c_int fn.argtypes = [] ret = int(fn()) if ret == 0: raise RuntimeError("NET_DVR_Init returned false") self._inited = True def cleanup(self) -> None: fn = getattr(self.lib, "NET_DVR_Cleanup", None) if fn is None: self._inited = False return fn.restype = None fn.argtypes = [] fn() self._inited = False def login_v30( self, *, ip: str, port: int, username: str, password: str, ) -> HikvisionLoginResult: login = getattr(self.lib, "NET_DVR_Login_V30", None) if login is None: raise RuntimeError("NET_DVR_Login_V30 not found in HCNetSDK") login.restype = c_int login.argtypes = [ c_char_p, c_uint16, c_char_p, c_char_p, POINTER(NET_DVR_DEVICEINFO_V30), ] dev = NET_DVR_DEVICEINFO_V30() user_id = login( ip.encode("utf-8"), c_uint16(port), username.encode("utf-8"), password.encode("utf-8"), byref(dev), ) if user_id < 0: err = self._last_error() raise RuntimeError(f"NET_DVR_Login_V30 failed (user_id={user_id}, err={err})") raw = bytes(dev.data) return HikvisionLoginResult(user_id=int(user_id), device_info_raw=raw) def logout(self, user_id: int) -> None: fn = getattr(self.lib, "NET_DVR_Logout", None) if fn is None: return fn.restype = c_int fn.argtypes = [c_int] fn(c_int(user_id)) def _last_error(self) -> str: get_err = getattr(self.lib, "NET_DVR_GetLastError", None) if get_err is None: return "unknown" get_err.restype = c_int get_err.argtypes = [] return str(int(get_err())) class HikvisionInitRefCount: """Process-wide NET_DVR_Init / NET_DVR_Cleanup pairing.""" _lock = threading.Lock() _count = 0 @classmethod def retain(cls, rt: HikvisionRuntime) -> None: with cls._lock: cls._count += 1 if cls._count == 1: rt.init() @classmethod def release(cls, rt: HikvisionRuntime) -> None: with cls._lock: if cls._count <= 0: return cls._count -= 1 if cls._count == 0: rt.cleanup()