Files
Kevin 6b3adb4ad8 feat: 站点 JSON、语音终端 WebSocket 指派与客户端联调
- 用 OR_SITE_CONFIG_JSON_FILE 统一术间配置(video_rtsp_urls + voice_or_room_bindings)
- VoiceTerminalHub:assignment、WS 推送与 HTTP 查询;开录/停录后 notify
- 一键联调 orchestrate-and-start 与 /client/surgeries/start 共用指派逻辑,修复 demo 路径不发 WS
- 语音桌面端:SIGINT 退出、shutdown 清理、仅 WS 指派、固定 pending 轮询间隔、界面仅保留录音时长
- 新增/调整契约与绑定测试,文档与示例配置同步

Made-with: Cursor
2026-04-27 11:21:16 +08:00

575 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import asyncio
import time
from datetime import datetime, timezone
from pathlib import Path
from loguru import logger
from app.baked import algorithm as ba
from app.baked import pipeline as bp
from app.config import Settings
from sqlalchemy.ext.asyncio import async_sessionmaker
from app.database import AsyncSessionLocal
from app.domain.consumption import SurgeryConsumptionStored
from app.repositories.surgery_results import SurgeryResultRepository
from app.services.consumable_vision_algorithm import (
ConsumableVisionAlgorithmService,
PredictionResult,
)
from app.services.video.archive_persister import ArchivePersister
from app.services.video.backend_resolver import BackendResolver
from app.services.video.classification_handler import VisionClassificationHandler
from app.services.video.hikvision_runtime import HikvisionInitRefCount, HikvisionRuntime
from app.services.video.inference_aggregator import WindowInferenceAggregator
from app.services.video.session_registry import (
CameraStreamInferState,
PendingConsumableConfirmation,
RunningSurgery,
SurgerySessionRegistry,
SurgerySessionState,
format_elapsed_mmss_since,
)
from app.services.tear_gated_segment_consumption.product_map import (
load_tear_segment_name_to_id,
resolve_tear_segment_labels_yaml_path,
)
from app.services.tear_gated_segment_consumption.report import write_tear_segment_txt
from app.services.tear_gated_segment_consumption.runner import (
TearGatedSegmentModelBundle,
TearGatedSegmentRunner,
)
from app.services.video.stream_worker import CameraStreamWorker, redact_rtsp_url
from app.services.video.types import VideoBackendKind
from app.schemas import SurgeryConsumptionDetail, build_consumption_summary
from app.services.consumption_tsv_log import (
append_consumption_log_summary,
init_consumption_log_file,
print_consumption_summary_markdown,
)
from app.services.voice_file_log import init_voice_log_file
from app.surgery_errors import SurgeryPipelineError
# Names exported by ``from <this module> import *``. Besides
# CameraSessionManager, the session types imported above from
# app.services.video.session_registry are listed here so they are re-exported
# (presumably so existing import sites keep working — confirm against callers).
__all__ = [
"CameraSessionManager",
"CameraStreamInferState",
"PendingConsumableConfirmation",
"RunningSurgery",
"SurgerySessionState",
]
class CameraSessionManager:
"""Per-surgery camera orchestration.
This class is responsible for:
1. Starting/stopping a surgery: create the `SurgerySessionState`, launch the camera workers, and wrap up when recording stops.
2. Delegating the in-memory state needed for voice confirmation to ``SurgerySessionRegistry`` (which implements `PendingConfirmationStore`).
3. Delegating "persist results + retry on failure + durable fallback" to ``ArchivePersister``.
The external interface is unchanged, so upstream callers (``SurgeryPipeline`` / ``VoiceConfirmationService``) do not need to know about the split.
"""
def __init__(
    self,
    *,
    settings: Settings,
    vision_algorithm: ConsumableVisionAlgorithmService,
    hikvision_runtime: HikvisionRuntime | None,
    result_repository: SurgeryResultRepository | None = None,
    session_factory: async_sessionmaker | None = None,
    registry: SurgerySessionRegistry | None = None,
    archive_persister: ArchivePersister | None = None,
    tear_segment_models: TearGatedSegmentModelBundle | None = None,
) -> None:
    """Store the injected collaborators and build the default ones.

    Optional collaborators that are falsy are replaced with defaults:
    ``AsyncSessionLocal`` for ``session_factory``, a new
    ``SurgerySessionRegistry`` for ``registry``, and an ``ArchivePersister``
    built from ``result_repository`` plus the chosen session factory for
    ``archive_persister``.
    """
    # Dependencies that are kept exactly as passed in.
    self._s = settings
    self._vision = vision_algorithm
    self._hik = hikvision_runtime
    self._tear_models = tear_segment_models

    self._session_factory: async_sessionmaker = (
        session_factory if session_factory else AsyncSessionLocal
    )
    self._resolver = BackendResolver(settings, hikvision_runtime=hikvision_runtime)

    # NOTE(review): the defaults below are selected by truthiness rather than
    # ``is None``; a falsy-but-valid collaborator would be replaced — confirm.
    if not registry:
        registry = SurgerySessionRegistry()
    self._registry = registry

    if not archive_persister:
        archive_persister = ArchivePersister(
            repository=result_repository,
            session_factory=self._session_factory,
        )
    self._archive = archive_persister

    self._aggregator = WindowInferenceAggregator()
    self._classifier_handler = VisionClassificationHandler(
        registry=self._registry,
    )
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
async def start_archive_retry_loop(self) -> None:
    """Run the archive persister's durable-fallback recovery, then start its retry loop."""
    persister = self._archive
    await persister.recover_from_durable_fallback()
    await persister.start_retry_loop()
async def shutdown(self) -> None:
    """Shut down the archive persister, then stop every active surgery.

    An exception raised while stopping one surgery is logged as a warning and
    does not prevent the remaining surgeries from being stopped.
    """
    await self._archive.shutdown()
    # NOTE(review): the persister is shut down before stop_surgery() passes it
    # the final details via persist_or_archive() — confirm this order is intended.
    for active_id in self._registry.active_ids():
        try:
            await self.stop_surgery(active_id, require_active=False)
        except Exception as err:
            logger.warning("shutdown stop_surgery {}: {}", active_id, err)
# ------------------------------------------------------------------
# Surgery start / stop
# ------------------------------------------------------------------
async def start_surgery(
    self,
    surgery_id: str,
    camera_ids: list[str],
    candidate_consumables: list[str],
) -> None:
    """Start recording for ``surgery_id`` and wait until every camera is ready.

    Steps, in order: reject a duplicate start; try to persist archived details
    still held for this id; resolve the candidate consumables through the
    vision service; initialise the per-surgery log files; spawn one
    ``_camera_worker`` task per camera; register the session; then wait for all
    per-camera ready events within ``bp.VIDEO_OPEN_TIMEOUT_SEC + 5`` seconds.

    Args:
        surgery_id: Identifier of the surgery to start.
        camera_ids: Cameras to open; one worker task is created per id.
        candidate_consumables: Requested candidates; passed to
            ``effective_candidate_consumables`` to obtain the effective list.

    Raises:
        SurgeryPipelineError: With code ``RECORDING_CANNOT_START`` when the
            surgery is already active, when stale archived results cannot be
            persisted, when the resolved candidate list is empty, or when the
            cameras are not all ready before the timeout.
        Exception: Any other error raised while waiting for readiness is
            re-raised after the session has been stopped.
    """
    if self._registry.has_active(surgery_id):
        raise SurgeryPipelineError(
            "RECORDING_CANNOT_START",
            "该手术已在录制中,请勿重复开始。",
        )

    # Details of a previous session with the same id that never reached the
    # database must be flushed (or explicitly dropped) before starting again.
    stale = await self._archive.take_archived_details(surgery_id)
    if stale is not None:
        logger.warning(
            "surgery_id={} 仍有未落库归档,尝试写入数据库后再开始新会话",
            surgery_id,
        )
        if self._archive.repository is None:
            logger.error(
                "surgery_id={} 有内存归档但未配置数据库仓库,无法持久化;"
                "开始新会话将丢弃该归档(仅开发/无库模式)",
                surgery_id,
            )
        else:
            ok = await self._archive.persist_or_archive(surgery_id, stale)
            if not ok:
                raise SurgeryPipelineError(
                    "RECORDING_CANNOT_START",
                    "该手术号存在尚未写入数据库的历史结果,请修复数据库或等待自动重试成功后再开始。",
                )

    resolved = self._vision.effective_candidate_consumables(candidate_consumables)
    if not resolved:
        raise SurgeryPipelineError(
            "RECORDING_CANNOT_START",
            "耗材候选为空:请在请求中传入 candidate_consumables或提供有效的 consumable_classifier_labels.yaml / 分类模型。",
        )
    if not any(str(x).strip() for x in candidate_consumables):
        logger.info(
            "surgery {}: candidate_consumables 未提供,使用默认全量 {}",
            surgery_id,
            len(resolved),
        )
    name_to_code = self._vision.build_name_mapping(resolved)
    state = SurgerySessionState(
        candidate_consumables=list(resolved),
        name_to_code=name_to_code,
        surgery_started_wall=time.time(),
    )
    stop_event = asyncio.Event()
    readies = [asyncio.Event() for _ in camera_ids]
    open_timeout = bp.VIDEO_OPEN_TIMEOUT_SEC + 5.0

    # Initialise the log files BEFORE creating the worker tasks: if either call
    # raises, no task has been scheduled yet, so nothing is left running
    # without a registered session to stop it.
    init_consumption_log_file(surgery_id)
    init_voice_log_file(surgery_id)

    tasks: list[asyncio.Task[None]] = [
        asyncio.create_task(
            self._camera_worker(
                surgery_id=surgery_id,
                camera_id=cam_id,
                stream_ready=ready,
                stop_event=stop_event,
                state=state,
            ),
            name=f"camera:{surgery_id}:{cam_id}",
        )
        for cam_id, ready in zip(camera_ids, readies, strict=True)
    ]
    run = RunningSurgery(stop_event=stop_event, state=state, tasks=tasks)
    await self._registry.register(surgery_id, run)

    if ba.TEAR_SEGMENT_ENABLED:
        primary = (ba.TEAR_SEGMENT_PRIMARY_CAMERA_ID or "").strip()
        if primary and primary not in camera_ids:
            logger.warning(
                "撕段算法已开启但主摄 id={!r} 不在本台开录 camera_ids={} 中,该路不会跑撕段流水线",
                primary,
                camera_ids,
            )

    try:
        await asyncio.wait_for(
            asyncio.gather(*(r.wait() for r in readies)),
            timeout=open_timeout,
        )
        state.ready.set()
    except (TimeoutError, asyncio.TimeoutError) as exc:
        # asyncio.TimeoutError is only an alias of the builtin TimeoutError
        # from Python 3.11 on; naming both keeps this branch reachable on 3.10,
        # where wait_for() raises the asyncio-specific class.
        logger.error(
            "Surgery {} cameras not all ready within {}s",
            surgery_id,
            open_timeout,
        )
        await self.stop_surgery(surgery_id, require_active=True)
        raise SurgeryPipelineError(
            "RECORDING_CANNOT_START",
            "开录未能确认:部分摄像头在超时内未成功拉到首帧。",
        ) from exc
    except Exception:
        # Any other failure: tear the half-started session down, then re-raise.
        await self.stop_surgery(surgery_id, require_active=True)
        raise
def set_voice_terminal_id(self, surgery_id: str, terminal_id: str | None) -> None:
    """Store the voice terminal id on a running surgery's state.

    Written after recording has started so that the stop flow can push ``end``
    to the matching desktop terminal. Does nothing when the registry has no
    running session for ``surgery_id``. A blank or ``None`` id is stored as
    ``None``; otherwise the stripped id is stored.
    """
    running = self._registry.get_running(surgery_id)
    if running is not None:
        cleaned = (terminal_id or "").strip()
        running.state.voice_terminal_id = cleaned if cleaned else None
async def stop_surgery(
    self, surgery_id: str, *, require_active: bool = True
) -> str | None:
    """Stop a surgery's camera workers and hand its details to the archive.

    The session is unregistered first, its stop event is set, and all worker
    tasks are awaited (task errors are logged, not raised). A consumption
    summary is then written to the log on a best-effort basis, and the
    collected details are passed to ``ArchivePersister.persist_or_archive``.

    Args:
        surgery_id: Identifier of the surgery to stop.
        require_active: When true, a missing session is an error; when false
            the method returns ``None`` instead.

    Returns:
        The ``voice_terminal_id`` stored on the session state, or ``None``
        when there was no session and ``require_active`` is false.

    Raises:
        SurgeryPipelineError: With code ``RECORDING_NOT_STOPPED`` when there is
            no active session and ``require_active`` is true.
    """
    run = await self._registry.unregister(surgery_id)
    if run is None:
        if require_active:
            raise SurgeryPipelineError(
                "RECORDING_NOT_STOPPED",
                "停录未能完成:当前没有该手术的活跃录制会话。",
            )
        return None

    voice_tid = run.state.voice_terminal_id
    run.stop_event.set()
    results = await asyncio.gather(*run.tasks, return_exceptions=True)
    for res in results:
        if isinstance(res, BaseException):
            logger.warning("surgery task finished with error: {}", res)

    details = list(run.state.details)

    # The session is already unregistered, so ``details`` is now the only
    # reference to this surgery's results. Summary logging is auxiliary: a
    # failure here must not prevent the details from being persisted below.
    try:
        detail_rows = [
            SurgeryConsumptionDetail(
                item_id=r.item_id,
                item_name=r.item_name,
                qty=r.qty,
                doctor_id=r.doctor_id,
                timestamp=r.timestamp,
            )
            for r in details
        ]
        summ = build_consumption_summary(detail_rows)
        totals = {s.item_id: (s.item_name, s.total_quantity) for s in summ}
        append_consumption_log_summary(surgery_id, totals)
        print_consumption_summary_markdown(totals)
    except Exception:
        logger.exception(
            "surgery {} consumption summary logging failed; details will still be persisted",
            surgery_id,
        )

    await self._archive.persist_or_archive(surgery_id, details)
    return voice_tid
# ------------------------------------------------------------------
# PendingConfirmationStore protocol delegation
# ------------------------------------------------------------------
def live_consumption_if_active(
    self, surgery_id: str
) -> list[SurgeryConsumptionStored] | None:
    """Delegate to the session registry's ``live_consumption_if_active``."""
    registry = self._registry
    return registry.live_consumption_if_active(surgery_id)
def archived_consumption_fallback(
    self, surgery_id: str
) -> list[SurgeryConsumptionStored] | None:
    """Delegate to the archive persister's ``archived_details``."""
    persister = self._archive
    return persister.archived_details(surgery_id)
def record_voice_trace(
    self,
    surgery_id: str,
    *,
    asr_text: str | None,
    error: str | None,
) -> None:
    """Delegate to the session registry's ``record_voice_trace``; its result is discarded."""
    registry = self._registry
    registry.record_voice_trace(surgery_id, asr_text=asr_text, error=error)
def get_pending_confirmation_by_id(
    self,
    surgery_id: str,
    confirmation_id: str,
) -> PendingConsumableConfirmation | None:
    """Delegate to the session registry's ``get_pending_confirmation_by_id``."""
    registry = self._registry
    return registry.get_pending_confirmation_by_id(surgery_id, confirmation_id)
def get_surgery_candidate_consumables(self, surgery_id: str) -> list[str]:
    """Delegate to the session registry's ``get_surgery_candidate_consumables``."""
    registry = self._registry
    return registry.get_surgery_candidate_consumables(surgery_id)
async def record_voice_parse_failure(
    self, surgery_id: str, confirmation_id: str
) -> tuple[int, int]:
    """Await the session registry's ``record_voice_parse_failure`` and return its result."""
    outcome = await self._registry.record_voice_parse_failure(
        surgery_id, confirmation_id
    )
    return outcome
def next_pending_confirmation(
    self, surgery_id: str
) -> PendingConsumableConfirmation | None:
    """Delegate to the session registry's ``next_pending_confirmation``."""
    registry = self._registry
    return registry.next_pending_confirmation(surgery_id)
async def resolve_pending_confirmation(
    self,
    surgery_id: str,
    confirmation_id: str,
    *,
    chosen_label: str | None,
    rejected: bool,
) -> None:
    """Await the session registry's ``resolve_pending_confirmation``; its result is discarded."""
    decision = {"chosen_label": chosen_label, "rejected": rejected}
    await self._registry.resolve_pending_confirmation(
        surgery_id, confirmation_id, **decision
    )
# ------------------------------------------------------------------
# Camera worker (stream pulling + inference throttling + time-window bucketing + classification result handling)
# ------------------------------------------------------------------
async def _finalize_tear_segment_runner(
    self,
    *,
    surgery_id: str,
    camera_id: str,
    state: SurgerySessionState,
    runner: TearGatedSegmentRunner,
) -> None:
    """Finalize a tear-segment runner and record what it produced.

    Each record returned by ``runner.finalize()`` is appended to the session
    through the registry's ``append_confirmed_detail`` with a UTC timestamp
    built from ``runner.wall_time_for_record``. When
    ``ba.TEAR_SEGMENT_LOG_TXT`` is truthy and at least one record exists, a
    text report is also written.
    """
    records = runner.finalize()
    for record in records:
        wall_seconds = runner.wall_time_for_record(record)
        stamped_at = datetime.fromtimestamp(wall_seconds, tz=timezone.utc)
        await self._registry.append_confirmed_detail(
            state=state,
            item_id=record.item_id,
            item_name=record.item_name,
            doctor_id=bp.VIDEO_RESULT_DOCTOR_ID,
            source="tear_segment",
            cooldown_key=f"{surgery_id}:tear_seg:{record.segment_index}",
            detail_timestamp=stamped_at,
        )

    if not (ba.TEAR_SEGMENT_LOG_TXT and records):
        return

    # Report path: configured template (optionally containing "{surgery_id}"),
    # or a default under logs/ when nothing is configured.
    template = (ba.TEAR_SEGMENT_LOG_TXT_PATH or "").strip()
    if not template:
        report_path = Path(f"logs/tear_segment_{surgery_id}.txt")
    elif "{surgery_id}" in template:
        report_path = Path(template.format(surgery_id=surgery_id)).expanduser()
    else:
        report_path = Path(template).expanduser()

    labels_source = str(resolve_tear_segment_labels_yaml_path())
    write_tear_segment_txt(
        path=report_path,
        surgery_id=surgery_id,
        camera_id=camera_id,
        labels_source=labels_source,
        records=records,
    )
    logger.info("撕段报告已写: {}", report_path)
async def _camera_worker(
    self,
    *,
    surgery_id: str,
    camera_id: str,
    stream_ready: asyncio.Event,
    stop_event: asyncio.Event,
    state: SurgerySessionState,
) -> None:
    """Pull one camera's stream and feed its frames to the active algorithm.

    The stream URL is obtained from ``_resolve_rtsp_url``. When the
    tear-segment algorithm is enabled, models are available and this camera is
    the configured primary camera, frames go to a ``TearGatedSegmentRunner``
    which is finalized when the stream worker returns. Otherwise (or when the
    runner cannot be created) frames go through throttled vision inference,
    window aggregation and the classification handler.

    Any Hikvision login / SDK-init reference returned by ``_resolve_rtsp_url``
    is released when the worker exits, whether normally or with an error.

    Args:
        surgery_id: Surgery this worker belongs to.
        camera_id: Camera to pull from.
        stream_ready: Event handed to ``CameraStreamWorker.run``.
        stop_event: Event handed to ``CameraStreamWorker.run``.
        state: Shared per-surgery session state.
    """
    kind = self._resolver.backend_for_camera(camera_id)
    hik_user_id: int | None = None
    hik_init_retained = False
    try:
        url, hik_user_id, hik_init_retained = await self._resolve_rtsp_url(
            camera_id=camera_id,
            kind=kind,
        )
        assert url is not None
        primary = (ba.TEAR_SEGMENT_PRIMARY_CAMERA_ID or "").strip()
        use_tear_req = (
            ba.TEAR_SEGMENT_ENABLED
            and self._tear_models is not None
            and primary
            and camera_id == primary
        )
        runner: TearGatedSegmentRunner | None = None
        if use_tear_req:
            name_to_id = load_tear_segment_name_to_id()
            try:
                self._tear_models.ensure_loaded()
                runner = self._tear_models.create_runner(name_to_id)
            except Exception as exc:
                # Models not ready: fall back to the time-window algorithm.
                logger.exception(
                    "撕段模型未就绪,本路回退为原时间窗算法 camera={} surgery={}: {}",
                    camera_id,
                    surgery_id,
                    exc,
                )
                runner = None

        if runner is not None:

            async def _frame_handler_tear(frame: object) -> None:
                # Frame processing runs in a worker thread.
                await asyncio.to_thread(runner.process_frame_bgr, frame)

            w_tear = CameraStreamWorker(
                surgery_id=surgery_id,
                camera_id=camera_id,
                url=url,
            )
            try:
                await w_tear.run(
                    stream_ready=stream_ready,
                    stop_event=stop_event,
                    frame_handler=_frame_handler_tear,
                )
            finally:
                # Always flush the runner's records, even if the stream failed.
                await self._finalize_tear_segment_runner(
                    surgery_id=surgery_id,
                    camera_id=camera_id,
                    state=state,
                    runner=runner,
                )
        else:
            last_infer = 0.0

            async def _frame_handler(frame: object) -> None:
                nonlocal last_infer
                now = time.monotonic()
                # Throttle: at most one inference per configured interval.
                if now - last_infer < bp.VIDEO_INFERENCE_INTERVAL_SEC:
                    await asyncio.sleep(0.01)
                    return
                last_infer = now
                try:
                    snap = await asyncio.to_thread(
                        self._vision.infer_frame_bgr,
                        frame,
                        state.name_to_code,
                    )
                except Exception as exc:
                    logger.debug(
                        "Inference skip camera={} surgery={}: {}",
                        camera_id,
                        surgery_id,
                        exc,
                    )
                    return
                if snap is None:
                    return
                if bp.VIDEO_LOG_INFERENCE_RESULTS:
                    logger.info(
                        "Vision result surgery={} camera={} 相对开录={} top1={}({:.3f}) top2={}({:.3f}) top3={}({:.3f})",
                        surgery_id,
                        camera_id,
                        format_elapsed_mmss_since(
                            state.surgery_started_wall,
                            at_epoch=time.time(),
                        ),
                        snap.t1_name,
                        snap.t1_conf,
                        snap.t2_name,
                        snap.t2_conf,
                        snap.t3_name,
                        snap.t3_conf,
                    )
                async with state.lock:
                    ready_windows = self._aggregator.ingest_snapshot_and_collect_ready(
                        surgery_id=surgery_id,
                        camera_id=camera_id,
                        snap=snap,
                        state=state,
                    )
                # NOTE(review): completed windows are dispatched after the lock
                # is released, matching how the handler is invoked without the
                # lock elsewhere in this class — confirm against the handler.
                for win in ready_windows:
                    await self._classifier_handler.handle(
                        state=state,
                        cls_res=win.prediction,
                        ready=win,
                        surgery_id=surgery_id,
                        camera_id=camera_id,
                    )

            worker = CameraStreamWorker(
                surgery_id=surgery_id,
                camera_id=camera_id,
                url=url,
            )
            await worker.run(
                stream_ready=stream_ready,
                stop_event=stop_event,
                frame_handler=_frame_handler,
            )
    finally:
        # Nested try/finally: a failing logout must not skip the SDK-init
        # refcount release, otherwise the reference is leaked.
        try:
            if hik_user_id is not None and self._hik is not None:
                await asyncio.to_thread(self._hik.logout, hik_user_id)
        finally:
            if hik_init_retained and self._hik is not None:
                HikvisionInitRefCount.release(self._hik)
async def _handle_classification_result(
    self,
    *,
    state: SurgerySessionState,
    cls_res: PredictionResult,
) -> None:
    """Deprecated test shim: forward the legacy signature to ``VisionClassificationHandler``.

    Kept because unit tests call this method directly. New code should call
    ``self._classifier_handler.handle`` instead.
    """
    handler = self._classifier_handler
    await handler.handle(state=state, cls_res=cls_res)
async def _resolve_rtsp_url(
    self,
    *,
    camera_id: str,
    kind: VideoBackendKind,
) -> tuple[str, int | None, bool]:
    """Resolve the stream URL for a camera, logging in to Hikvision when required.

    For any backend other than ``HIKVISION_SDK`` the plain RTSP URL is
    returned. For the Hikvision SDK backend the SDK-init refcount is retained
    and a login is performed. When the SDK is not loaded, the credentials are
    incomplete, or the login fails, the plain RTSP URL is returned instead if
    ``hikvision_sdk_fallback_to_rtsp`` is set; otherwise an error is raised.

    Returns:
        ``(url, hikvision_user_id, init_retained)``. ``hikvision_user_id`` is
        ``None`` and ``init_retained`` is ``False`` unless the SDK login
        succeeded; in that case the caller must log out and release the
        refcount.

    Raises:
        RuntimeError: The SDK backend was requested but the SDK is not loaded
            or the credentials are incomplete, and fallback is disabled.
        Exception: A login failure (fallback disabled) or a failure to build
            the post-login URL is re-raised.
    """
    if kind != VideoBackendKind.HIKVISION_SDK:
        return self._resolver.rtsp_url_for_camera(camera_id), None, False

    if self._hik is None:
        if self._s.hikvision_sdk_fallback_to_rtsp:
            logger.warning(
                "Hikvision SDK not loaded; fallback to RTSP for camera {}",
                camera_id,
            )
            return self._resolver.rtsp_url_for_camera(camera_id), None, False
        raise RuntimeError("Hikvision SDK requested but libhcnetsdk.so not loaded")

    if not (
        self._s.hikvision_device_ip.strip()
        and self._s.hikvision_user.strip()
        and self._s.hikvision_password.strip()
    ):
        if self._s.hikvision_sdk_fallback_to_rtsp:
            logger.warning(
                "Hikvision credentials incomplete; fallback to RTSP for camera {}",
                camera_id,
            )
            return self._resolver.rtsp_url_for_camera(camera_id), None, False
        raise RuntimeError("Hikvision SDK requires HIKVISION_DEVICE_IP, user, password")

    HikvisionInitRefCount.retain(self._hik)
    try:
        login = await asyncio.to_thread(
            lambda: self._hik.login_v30(
                ip=self._s.hikvision_device_ip.strip(),
                port=int(self._s.hikvision_device_port),
                username=self._s.hikvision_user.strip(),
                password=self._s.hikvision_password.strip(),
            )
        )
    except Exception as exc:
        HikvisionInitRefCount.release(self._hik)
        if self._s.hikvision_sdk_fallback_to_rtsp:
            logger.warning("Hikvision login failed ({}); fallback to RTSP", exc)
            return self._resolver.rtsp_url_for_camera(camera_id), None, False
        raise

    try:
        url = self._resolver.rtsp_url_after_hikvision_login(camera_id)
    except Exception:
        # The caller can only clean up what this method returns. If building
        # the URL fails, undo the login and the refcount here so neither leaks.
        try:
            await asyncio.to_thread(self._hik.logout, login.user_id)
        except Exception as logout_exc:
            logger.warning(
                "Hikvision logout after URL resolution failure failed: {}",
                logout_exc,
            )
        finally:
            HikvisionInitRefCount.release(self._hik)
        raise
    return url, login.user_id, True