diff --git a/backend/.env.example b/backend/.env.example index 0fd4e29..944839e 100755 --- a/backend/.env.example +++ b/backend/.env.example @@ -81,22 +81,28 @@ POSTGRES_PORT=45432 # MINIO_PORT=19000 # MINIO_CONSOLE_PORT=19001 +# --- 视频:RTSP 切片 + batch(链路 1)--- +# RTSP_PRIMARY_CAMERA_ID=or-cam-03 +# RTSP_RECORD_ALL_CAMERAS=false +# RTSP_SEGMENT_DURATION_SEC=120 +# RTSP_SEGMENT_MIN_SEC=10 +# RTSP_SEGMENT_TTL_HOURS=24 +# RTSP_SLICE_BATCH_MAX_CONCURRENT=1 +# RTSP_SLICE_BATCH_DRAIN_TIMEOUT_SEC=900 + # --- Demo 客户端 / 语音确认客户端 --- # 独立部署的 Demo 客户端 / 语音确认页访问 API 时需放行 CORS;正式部署建议收窄 origins。 # DEMO_CORS_ENABLED=true # 跨主机语音页访问 API 时,可先用 * 联调;生产建议改成具体语音页来源,如 http://192.168.1.100:8080 # DEMO_CORS_ORIGINS=* -# 链路 2(模拟实时)与链路 3(离线 batch)需开启;链路 1 真 RTSP 开录不依赖此项 +# 链路 3(离线 batch)Demo 需开启;链路 1 真 RTSP 开录不依赖此项 # DEMO_ORCHESTRATOR_ENABLED=false -# DEMO_ORCHESTRATOR_RTSP_PORT=18554 # 联调台 HLS(Compose:先 docker compose up -d mediamtx-hls api) # Compose 内 api 反代侧车:DEMO_HLS_PREVIEW_UPSTREAM=http://mediamtx-hls:8888 # 本机 uvicorn 直连 published 端口:DEMO_HLS_PREVIEW_UPSTREAM=http://127.0.0.1:18888 # 本机 uvicorn 且无 mediamtx-hls 侧车时改为:DEMO_HLS_PREVIEW_UPSTREAM=ephemeral # DEMO_HLS_PREVIEW_PORT=18888 -# Docker 内 API 访问宿主机假流时写入站点 JSON 的主机名(默认 host.docker.internal) -# DOCKER_DEMO_ORCHESTRATOR_RTSP_JSON_HOST=host.docker.internal -# 链路 2 simulated-start / fake_rtsp_from_file 起 MediaMTX 容器用 +# HLS 预览 MediaMTX 镜像(Compose mediamtx-hls 与 ensure 临时容器共用) # MEDIAMTX_DOCKER_IMAGE=m.daocloud.io/docker.io/bluenviron/mediamtx:latest # --- 语音确认静态页(clients/voice-confirmation/start.sh)--- diff --git a/backend/app/algo_host/result_adapter.py b/backend/app/algo_host/result_adapter.py index ccd759d..9ea9423 100644 --- a/backend/app/algo_host/result_adapter.py +++ b/backend/app/algo_host/result_adapter.py @@ -140,6 +140,61 @@ def doctor_id_for_consumption_rows(doctor: ReferenceDoctorInfo | None) -> str: return bp.VIDEO_RESULT_DOCTOR_ID +@dataclass(frozen=True) +class ReferenceTsvRow: + rank: int + start_sec: float + end_sec: float + item_id: str + item_name: str + top1_conf: float + top2_name: str + top2_conf: float + top3_name: str + top3_conf: float + + +def _float_cell(raw: str | None, default: float = 0.0) -> float: + try: + return float((raw or "").strip() or default) + except ValueError: + return default + + +def parse_reference_tsv_rows(path: Path) -> list[ReferenceTsvRow]: + out: list[ReferenceTsvRow] = [] + if not path.is_file(): + return out + with path.open("r", encoding="utf-8", newline="") as f: + reader = csv.DictReader(f, delimiter="\t") + for row in reader: + name = (row.get("top1_name") or "").strip() + if not name or name.startswith("("): + continue + if name.startswith("医生信息"): + continue + try: + rank = int((row.get("rank") or "0").strip() or 0) + except ValueError: + rank = 0 + item_id = (row.get("product_id_top1") or "").strip() or name + out.append( + ReferenceTsvRow( + rank=rank, + start_sec=_float_cell(row.get("start_sec")), + end_sec=_float_cell(row.get("end_sec")), + item_id=item_id, + item_name=name, + top1_conf=_float_cell(row.get("top1_conf")), + top2_name=(row.get("top2_name") or "").strip(), + top2_conf=_float_cell(row.get("top2_conf")), + top3_name=(row.get("top3_name") or "").strip(), + top3_conf=_float_cell(row.get("top3_conf")), + ) + ) + return out + + def parse_reference_tsv( path: Path, *, @@ -152,27 +207,15 @@ def parse_reference_tsv( doctor = parse_reference_doctor_info(path) row_doctor_id = doctor_id_for_consumption_rows(doctor) out: list[SurgeryConsumptionStored] = [] - with path.open("r", encoding="utf-8", newline="") as f: - reader = csv.DictReader(f, delimiter="\t") - for row in reader: - name = (row.get("top1_name") or "").strip() - if not name or name.startswith("("): - continue - if name.startswith("医生信息"): - continue - item_id = (row.get("product_id_top1") or "").strip() or name - try: - start_sec = float((row.get("start_sec") or "0").strip() or 0.0) - except ValueError: - start_sec = 0.0 - out.append( - SurgeryConsumptionStored( - item_id=item_id, - item_name=name, - qty=1, - doctor_id=row_doctor_id, - timestamp=base_timestamp + timedelta(seconds=max(0.0, start_sec)), - source="video_batch", - ) + for row in parse_reference_tsv_rows(path): + out.append( + SurgeryConsumptionStored( + item_id=row.item_id, + item_name=row.item_name, + qty=1, + doctor_id=row_doctor_id, + timestamp=base_timestamp + timedelta(seconds=max(0.0, row.start_sec)), + source="video_batch", ) + ) return out diff --git a/backend/app/algo_host/slice_result_integrator.py b/backend/app/algo_host/slice_result_integrator.py new file mode 100644 index 0000000..5f77cde --- /dev/null +++ b/backend/app/algo_host/slice_result_integrator.py @@ -0,0 +1,268 @@ +"""Integrate 5.15 batch TSV rows into live surgery session (voice thresholds).""" + +from __future__ import annotations + +import uuid +from datetime import datetime, timedelta, timezone +from pathlib import Path + +from loguru import logger + +from app.algo_host.result_adapter import ( + ReferenceTsvRow, + doctor_id_for_consumption_rows, + parse_reference_doctor_info, + parse_reference_tsv_rows, +) +from app.algorithm_runner.segment_policy import ( + pad_ranked_candidates_for_voice, + rank_topk_for_candidates, +) +from app.baked import pipeline as bp +from app.domain.vision_prediction import ClsTop3, PredictionCandidate +from app.services.consumption_tsv_log import ( + append_consumption_pending_window, + append_consumption_window, + resolve_consumption_item_id, +) +from app.services.video.session_registry import ( + SurgerySessionRegistry, + SurgerySessionState, + format_elapsed_mmss_since, + pending_display_item_name_for_confirmation, +) +from app.services.voice_terminal_hub import VoiceTerminalHub + + +async def integrate_batch_slice_tsv( + *, + tsv_path: Path, + state: SurgerySessionState, + surgery_id: str, + camera_id: str, + slice_offset_sec: float, + surgery_started_wall: float | None, + registry: SurgerySessionRegistry, + voice_hub: VoiceTerminalHub | None, +) -> None: + """Apply batch TSV rows to session state using the same thresholds as realtime segment_policy.""" + path = tsv_path + doctor = parse_reference_doctor_info(path) + row_doctor_id = doctor_id_for_consumption_rows(doctor) + base_ts = datetime.fromtimestamp(surgery_started_wall or 0.0, tz=timezone.utc) + cand_order = [c.strip() for c in state.candidate_consumables if c.strip()] + cand_set = set(cand_order) + + for row in parse_reference_tsv_rows(path): + await _integrate_row( + row=row, + state=state, + surgery_id=surgery_id, + camera_id=camera_id, + slice_offset_sec=slice_offset_sec, + surgery_started_wall=surgery_started_wall, + registry=registry, + voice_hub=voice_hub, + row_doctor_id=row_doctor_id, + base_ts=base_ts, + cand_order=cand_order, + cand_set=cand_set, + ) + + +async def _integrate_row( + *, + row: ReferenceTsvRow, + state: SurgerySessionState, + surgery_id: str, + camera_id: str, + slice_offset_sec: float, + surgery_started_wall: float | None, + registry: SurgerySessionRegistry, + voice_hub: VoiceTerminalHub | None, + row_doctor_id: str, + base_ts: datetime, + cand_order: list[str], + cand_set: set[str], +) -> None: + conf = row.top1_conf + label = (row.item_name or "").strip() + voice_floor = bp.VIDEO_VOICE_CONFIRM_MIN_CONFIDENCE + if conf < voice_floor or not label: + return + if not cand_order: + return + + topk = [ + PredictionCandidate(label=label, confidence=conf), + ] + if row.top2_name: + topk.append(PredictionCandidate(label=row.top2_name, confidence=row.top2_conf)) + if row.top3_name: + topk.append(PredictionCandidate(label=row.top3_name, confidence=row.top3_conf)) + ranked = rank_topk_for_candidates(topk, cand_order) + auto_th = bp.VIDEO_AUTO_CONFIRM_CONFIDENCE + item_id = resolve_consumption_item_id(label, row.item_id, state.name_to_code) or "unknown" + cooldown_key = f"{camera_id}:{label}" + wall_lo = (surgery_started_wall or 0.0) + slice_offset_sec + row.start_sec + wall_hi = (surgery_started_wall or 0.0) + slice_offset_sec + row.end_sec + detail_ts = base_ts + timedelta(seconds=slice_offset_sec + row.start_sec) + + def in_allowed(name: str) -> bool: + return name in cand_set + + snap = ClsTop3( + t1_name=label, + t1_conf=conf, + t2_name=row.top2_name or "", + t2_conf=row.top2_conf, + t3_name=row.top3_name or "", + t3_conf=row.top3_conf, + t1_pid=row.item_id, + t2_pid="", + t3_pid="", + ) + + if conf >= auto_th and in_allowed(label): + await registry.append_confirmed_detail( + state=state, + item_id=item_id, + item_name=label, + doctor_id=row_doctor_id, + source="video_batch", + cooldown_key=cooldown_key, + detail_timestamp=detail_ts, + ) + if bp.CONSUMPTION_TSV_LOG_ENABLED or bp.CONSUMPTION_LOG_MARKDOWN_TERMINAL: + append_consumption_window( + surgery_id=surgery_id, + name_to_code=state.name_to_code, + best=snap, + doctor_id=row_doctor_id, + camera_id=camera_id, + wall_start_epoch=wall_lo, + wall_end_epoch=wall_hi, + since_recording_start=format_elapsed_mmss_since( + surgery_started_wall, + at_epoch=wall_hi, + ), + ) + return + + if conf >= auto_th and not in_allowed(label): + if ranked and bp.VOICE_CONFIRMATION_ENABLED: + await _enqueue_voice( + ranked=ranked, + cand_order=cand_order, + label=label, + conf=conf, + state=state, + surgery_id=surgery_id, + camera_id=camera_id, + snap=snap, + wall_lo=wall_lo, + wall_hi=wall_hi, + row_doctor_id=row_doctor_id, + registry=registry, + voice_hub=voice_hub, + ) + return + + if not bp.VOICE_CONFIRMATION_ENABLED: + return + + if ranked: + await _enqueue_voice( + ranked=ranked, + cand_order=cand_order, + label=label, + conf=conf, + state=state, + surgery_id=surgery_id, + camera_id=camera_id, + snap=snap, + wall_lo=wall_lo, + wall_hi=wall_hi, + row_doctor_id=row_doctor_id, + registry=registry, + voice_hub=voice_hub, + ) + elif in_allowed(label): + await _enqueue_voice( + ranked=[PredictionCandidate(label=label, confidence=conf)], + cand_order=cand_order, + label=label, + conf=conf, + state=state, + surgery_id=surgery_id, + camera_id=camera_id, + snap=snap, + wall_lo=wall_lo, + wall_hi=wall_hi, + row_doctor_id=row_doctor_id, + registry=registry, + voice_hub=voice_hub, + ) + + +async def _enqueue_voice( + *, + ranked: list[PredictionCandidate], + cand_order: list[str], + label: str, + conf: float, + state: SurgerySessionState, + surgery_id: str, + camera_id: str, + snap: ClsTop3, + wall_lo: float, + wall_hi: float, + row_doctor_id: str, + registry: SurgerySessionRegistry, + voice_hub: VoiceTerminalHub | None, +) -> None: + voice_opts = pad_ranked_candidates_for_voice( + ranked, + cand_order, + max_options=int(bp.VOICE_PROMPT_MAX_OPTIONS), + ) + cid = str(uuid.uuid4()) + ret = await registry.enqueue_pending_confirmation( + state, + voice_opts, + top_key=label, + top_confidence=conf, + confirmation_id=cid, + ) + if ret is None: + return + await registry.append_pending_consumption_detail( + state=state, + confirmation_id=ret, + doctor_id=row_doctor_id, + ) + pd_name = pending_display_item_name_for_confirmation(state.pending_by_id.get(ret)) + append_consumption_pending_window( + surgery_id=surgery_id, + confirmation_id=ret, + model_snap=snap, + doctor_id=row_doctor_id, + camera_id=camera_id, + wall_start_epoch=wall_lo, + wall_end_epoch=wall_hi, + item_display_name=pd_name, + since_recording_start=format_elapsed_mmss_since( + state.surgery_started_wall, + at_epoch=wall_hi, + ), + ) + vtid = (state.voice_terminal_id or "").strip() + if voice_hub is not None and vtid: + voice_hub.schedule_notify_pending_head(vtid, surgery_id) + logger.debug( + "batch slice voice confirm surgery={} camera={} label={} conf={:.3f}", + surgery_id, + camera_id, + label, + conf, + ) diff --git a/backend/app/api.py b/backend/app/api.py index 567b729..7d67117 100644 --- a/backend/app/api.py +++ b/backend/app/api.py @@ -138,7 +138,7 @@ async def health() -> HealthResponse | JSONResponse: @router.get( "/internal/demo/recording-modes-status", tags=["demo"], - summary="Demo 录制模式(链路 2/3)是否可用", + summary="Demo 离线 batch(链路 3)是否可用", description="供 demo 页探测;始终注册,不依赖 DEMO_ORCHESTRATOR_ENABLED。", ) async def recording_modes_status() -> dict: @@ -147,14 +147,10 @@ async def recording_modes_status() -> dict: return { "demo_recording_modes_enabled": enabled, "orchestrator_enabled": enabled, - "simulated_start_method": "POST", - "simulated_start_path": "/internal/demo/simulated-start", "offline_batch_method": "POST", "offline_batch_path": "/internal/demo/offline-batch", "or_site_config_json_file_set": bool(f), "or_site_config_json_file": f or None, - "orchestrator_rtsp_port": settings.demo_orchestrator_rtsp_port, - "orchestrator_rtsp_json_host": settings.demo_orchestrator_rtsp_json_host, "hls_preview_port": settings.demo_hls_preview_port, "hls_preview_upstream": HlsPreviewManager.resolve_upstream_setting( settings.demo_hls_preview_upstream @@ -165,6 +161,11 @@ async def recording_modes_status() -> dict: if HlsPreviewManager.resolve_upstream_setting(settings.demo_hls_preview_upstream) else "ephemeral" ), + "rtsp_primary_camera_id": settings.rtsp_primary_camera_id, + "rtsp_record_all_cameras": settings.rtsp_record_all_cameras, + "rtsp_segment_duration_sec": float(settings.rtsp_segment_duration_sec), + "rtsp_segment_ttl_hours": float(settings.rtsp_segment_ttl_hours), + "video_batch_vis_ttl_hours": float(bp.VIDEO_BATCH_VIS_TTL_HOURS), } diff --git a/backend/app/baked/pipeline.py b/backend/app/baked/pipeline.py index ffb7449..753dca4 100644 --- a/backend/app/baked/pipeline.py +++ b/backend/app/baked/pipeline.py @@ -44,3 +44,12 @@ VOICE_CONFIRM_MAX_FAILED_PARSE_ROUNDS: int = 2 # --- 非实时 batch 标注视频 / digest 级 pipeline 输入临时保留(小时)--- VIDEO_BATCH_VIS_TTL_HOURS: int = 24 VIDEO_BATCH_PIPELINE_INPUT_TTL_HOURS: int = 24 + +# --- 链路 1:RTSP 录像切片 + batch 子进程 --- +RTSP_PRIMARY_CAMERA_ID: str = "or-cam-03" +RTSP_RECORD_ALL_CAMERAS: bool = False +RTSP_SEGMENT_DURATION_SEC: float = 120.0 +RTSP_SEGMENT_MIN_SEC: float = 10.0 +RTSP_SLICE_BATCH_MAX_CONCURRENT: int = 1 +RTSP_SLICE_BATCH_DRAIN_TIMEOUT_SEC: float = 900.0 +RTSP_SEGMENT_TTL_HOURS: float = 24.0 diff --git a/backend/app/config.py b/backend/app/config.py index 40ce23d..3ce03f3 100755 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -39,6 +39,13 @@ class _VideoGroup(_SettingsGroup): "video_rtsp_url_template", "video_open_timeout_sec", "or_site_config_json_file", + "rtsp_primary_camera_id", + "rtsp_record_all_cameras", + "rtsp_segment_duration_sec", + "rtsp_segment_min_sec", + "rtsp_slice_batch_max_concurrent", + "rtsp_slice_batch_drain_timeout_sec", + "rtsp_segment_ttl_hours", ) @@ -89,8 +96,6 @@ class _DemoGroup(_SettingsGroup): "demo_cors_enabled", "demo_cors_origins", "demo_orchestrator_enabled", - "demo_orchestrator_rtsp_port", - "demo_orchestrator_rtsp_json_host", ) @@ -151,6 +156,51 @@ class Settings(BaseSettings): #: 手术室站点配置(UTF-8 JSON):须含 video_rtsp_urls 与 voice_or_room_bindings,见 or_site_config.sample.json or_site_config_json_file: str = "" + rtsp_primary_camera_id: str = Field( + default="or-cam-03", + validation_alias=AliasChoices("RTSP_PRIMARY_CAMERA_ID", "rtsp_primary_camera_id"), + ) + rtsp_record_all_cameras: bool = Field( + default=False, + validation_alias=AliasChoices("RTSP_RECORD_ALL_CAMERAS", "rtsp_record_all_cameras"), + ) + rtsp_segment_duration_sec: float = Field( + default=120.0, + ge=10.0, + le=3600.0, + validation_alias=AliasChoices("RTSP_SEGMENT_DURATION_SEC", "rtsp_segment_duration_sec"), + ) + rtsp_segment_min_sec: float = Field( + default=10.0, + ge=0.0, + le=600.0, + validation_alias=AliasChoices("RTSP_SEGMENT_MIN_SEC", "rtsp_segment_min_sec"), + ) + rtsp_slice_batch_max_concurrent: int = Field( + default=1, + ge=1, + le=8, + validation_alias=AliasChoices( + "RTSP_SLICE_BATCH_MAX_CONCURRENT", + "rtsp_slice_batch_max_concurrent", + ), + ) + rtsp_slice_batch_drain_timeout_sec: float = Field( + default=900.0, + ge=30.0, + le=86400.0, + validation_alias=AliasChoices( + "RTSP_SLICE_BATCH_DRAIN_TIMEOUT_SEC", + "rtsp_slice_batch_drain_timeout_sec", + ), + ) + rtsp_segment_ttl_hours: float = Field( + default=24.0, + ge=1.0, + le=720.0, + validation_alias=AliasChoices("RTSP_SEGMENT_TTL_HOURS", "rtsp_segment_ttl_hours"), + ) + hikvision_lib_dir: str = "/opt/hikvision/lib" hikvision_sdk_enabled: bool = False hikvision_device_ip: str = "" @@ -185,8 +235,6 @@ class Settings(BaseSettings): demo_cors_enabled: bool = True demo_cors_origins: str = "*" demo_orchestrator_enabled: bool = False - demo_orchestrator_rtsp_port: int = Field(default=18554, ge=1, le=65535) - demo_orchestrator_rtsp_json_host: str = "host.docker.internal" demo_hls_preview_port: int = Field(default=18888, ge=1, le=65535) demo_hls_preview_upstream: str = Field( default="http://127.0.0.1:18888", diff --git a/backend/app/dependencies.py b/backend/app/dependencies.py index d7c3699..8a0f8db 100644 --- a/backend/app/dependencies.py +++ b/backend/app/dependencies.py @@ -14,6 +14,7 @@ from sqlalchemy.ext.asyncio import async_sessionmaker from app.config import Settings from app.config import settings as _default_settings from app.database import AsyncSessionLocal +from app.algo_host.batch_service import BatchAlgorithmService from app.repositories.surgery_results import SurgeryResultRepository from app.repositories.voice_audits import VoiceAuditRepository from app.services.baidu_speech import BaiduSpeechService @@ -21,6 +22,8 @@ from app.services.minio_audio_storage import MinioAudioStorageService from app.services.surgery_pipeline import SurgeryPipeline from app.services.video.hikvision_runtime import HikvisionRuntime from app.services.video.session_manager import CameraSessionManager +from app.services.video.session_registry import SurgerySessionRegistry +from app.services.video.slice_batch_processor import SliceBatchProcessor from app.services.voice_resolution import VoiceConfirmationService from app.services.voice_terminal_hub import VoiceTerminalHub @@ -65,11 +68,21 @@ def build_container( voice_audit_repo = VoiceAuditRepository() baidu = BaiduSpeechService(app_settings=s) minio = MinioAudioStorageService(s) + batch_service = BatchAlgorithmService() + session_registry = SurgerySessionRegistry() + slice_batch = SliceBatchProcessor( + batch_service=batch_service, + registry=session_registry, + max_concurrent=s.rtsp_slice_batch_max_concurrent, + drain_timeout_sec=s.rtsp_slice_batch_drain_timeout_sec, + ) camera_mgr = CameraSessionManager( settings=s, hikvision_runtime=hik_runtime, + slice_batch_processor=slice_batch, result_repository=surgery_repo, session_factory=sf, + registry=session_registry, ) voice = VoiceConfirmationService( settings=s, diff --git a/backend/app/or_site_config.py b/backend/app/or_site_config.py index 683d8cd..56c31bd 100644 --- a/backend/app/or_site_config.py +++ b/backend/app/or_site_config.py @@ -60,34 +60,3 @@ def load_or_site_config_from_path(path: Path) -> OrSiteConfig: except json.JSONDecodeError as exc: raise ValueError(f"Invalid JSON in OR site config {p}: {exc}") from exc return parse_or_site_config_object(data, source=p) - - -def merge_video_rtsp_urls_into_file( - path: Path, - url_map: dict[str, str], - *, - replace_host: str, -) -> None: - """写入/更新站点配置中的 ``video_rtsp_urls``,保留 ``voice_or_room_bindings``。""" - if replace_host in ("", "127.0.0.1"): - out_urls = dict(url_map) - else: - out_urls = {k: v.replace("127.0.0.1", replace_host, 1) for k, v in url_map.items()} - path = path.expanduser() - path.parent.mkdir(parents=True, exist_ok=True) - if path.is_file(): - raw_text = path.read_text(encoding="utf-8") - data: Any = json.loads(raw_text) - parse_or_site_config_object(data, source=path) - bindings_list = data["voice_or_room_bindings"] - else: - bindings_list = [] - doc = { - "video_rtsp_urls": out_urls, - "voice_or_room_bindings": bindings_list, - } - text = json.dumps(doc, ensure_ascii=False, indent=2, sort_keys=True) + "\n" - temp = path.with_name(path.name + ".tmp") - temp.write_text(text, encoding="utf-8") - temp.replace(path) - logger.info("Updated video_rtsp_urls in OR site config {}", path) diff --git a/backend/app/routers/recording_demo.py b/backend/app/routers/recording_demo.py index 41d69b8..681ccea 100644 --- a/backend/app/routers/recording_demo.py +++ b/backend/app/routers/recording_demo.py @@ -1,4 +1,4 @@ -"""Demo 录制模式:链路 2 模拟实时、链路 3 离线 batch(需 DEMO_ORCHESTRATOR_ENABLED)。""" +"""Demo 录制模式:链路 3 离线 batch(需 DEMO_ORCHESTRATOR_ENABLED)。""" from __future__ import annotations @@ -15,22 +15,15 @@ from pydantic import BaseModel, Field from app.config import settings from app.consumable_catalog import normalize_candidate_consumables_raw -from app.dependencies import get_surgery_pipeline, get_voice_terminal_hub -from app.schemas import SurgeryApiResponse, SurgeryStartRequest -from app.services.recording_live import accept_live_recording +from app.dependencies import get_surgery_pipeline from app.services.offline_batch_timing import ( get_timing, mark_video_failed, mark_video_ready, set_text_timing, ) -from app.services.simulated_rtsp_setup import ( - prepare_simulated_rtsp_streams, - read_simulated_stream_uploads, -) from app.services.surgery_pipeline import SurgeryPipeline from app.baked import pipeline as bp -from app.services.synthetic_rtsp import SyntheticRtspManager from app.services.video_batch_cleanup import ( purge_batch_artifacts, purge_expired_pipeline_inputs, @@ -38,8 +31,6 @@ from app.services.video_batch_cleanup import ( purge_surgery_batch_tree, ) from app.algo_host import BatchAlgorithmService -from app.services.voice_terminal_hub import VoiceTerminalHub -from app.surgery_errors import SurgeryPipelineError router = APIRouter(prefix="/internal/demo", tags=["demo"]) @@ -55,20 +46,6 @@ def _require_demo_orchestrator() -> None: ) -def _require_site_config_path() -> Path: - path_raw = (settings.or_site_config_json_file or "").strip() - if not path_raw: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=( - "OR_SITE_CONFIG_JSON_FILE must be set to a writable path " - "(strict site JSON with video_rtsp_urls + voice_or_room_bindings); " - "in Docker, bind-mount a host file to this path." - ), - ) - return Path(path_raw).expanduser() - - def _background_finalize_visualization( runner: BatchAlgorithmService, surgery_id: str, @@ -329,111 +306,3 @@ async def offline_batch_visualization(surgery_id: str) -> FileResponse: filename=f"{surgery_id}_result_vis.mp4", headers={"Accept-Ranges": "bytes", "Cache-Control": "no-cache"}, ) - - -@router.post( - "/simulated-start", - response_model=SurgeryApiResponse, - summary="链路 2:模拟实时(上传 1–4 路视频并开录 + 语音)", - description=( - "仅当 DEMO_ORCHESTRATOR_ENABLED=true。合成假 RTSP 并写入 OR_SITE_CONFIG_JSON_FILE," - "再执行与 POST /client/surgeries/start 相同的实时开录与语音终端指派。" - ), -) -async def simulated_start( - surgery_id: Annotated[str, Form()], - video1: Annotated[UploadFile, File(description="第 1 路视频(必填,至少一路)")], - video2: Annotated[UploadFile | None, File(description="第 2 路视频(可选)")] = None, - video3: Annotated[UploadFile | None, File(description="第 3 路视频(可选)")] = None, - video4: Annotated[UploadFile | None, File(description="第 4 路视频(可选)")] = None, - camera_1: Annotated[str, Form()] = "or-cam-01", - camera_2: Annotated[str, Form()] = "or-cam-02", - camera_3: Annotated[str, Form()] = "or-cam-03", - camera_4: Annotated[str, Form()] = "or-cam-04", - rtsp_path_1: Annotated[str, Form()] = "demo1", - rtsp_path_2: Annotated[str, Form()] = "demo2", - rtsp_path_3: Annotated[str, Form()] = "demo3", - rtsp_path_4: Annotated[str, Form()] = "demo4", - candidate_consumables_json: Annotated[str, Form()] = "[]", - pipeline: SurgeryPipeline = Depends(get_surgery_pipeline), - voice_hub: VoiceTerminalHub = Depends(get_voice_terminal_hub), -) -> SurgeryApiResponse: - _require_demo_orchestrator() - json_path = _require_site_config_path() - logger.info( - "simulated-start: surgery_id={} cameras={} rpaths={}", - surgery_id, - (camera_1, camera_2, camera_3, camera_4), - (rtsp_path_1, rtsp_path_2, rtsp_path_3, rtsp_path_4), - ) - - try: - candidates = json.loads(candidate_consumables_json) - except json.JSONDecodeError as exc: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, - detail=f"invalid candidate_consumables_json: {exc}", - ) from exc - if not isinstance(candidates, list): - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, - detail="candidate_consumables_json must be a JSON array", - ) - candidates = normalize_candidate_consumables_raw(candidates) - - uploads = await read_simulated_stream_uploads( - video1=video1, - video2=video2, - video3=video3, - video4=video4, - camera_1=camera_1, - camera_2=camera_2, - camera_3=camera_3, - camera_4=camera_4, - rtsp_path_1=rtsp_path_1, - rtsp_path_2=rtsp_path_2, - rtsp_path_3=rtsp_path_3, - rtsp_path_4=rtsp_path_4, - ) - - try: - body = SurgeryStartRequest( - surgery_id=surgery_id, - camera_ids=[u.camera_id for u in uploads], - candidate_consumables=candidates, - ) - except Exception as exc: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, - detail=str(exc), - ) from exc - - try: - await prepare_simulated_rtsp_streams( - site_config_json_path=json_path, - uploads=uploads, - ) - except HTTPException: - raise - except Exception as exc: - await anyio.to_thread.run_sync(SyntheticRtspManager.stop_active) - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - detail=f"simulated RTSP setup failed: {exc}", - ) from exc - - try: - return await accept_live_recording( - pipeline, - voice_hub, - surgery_id=body.surgery_id, - camera_ids=list(body.camera_ids), - candidate_consumables=list(body.candidate_consumables), - message="假 RTSP 已起;映射已写入;摄像头录制已开始。", - ) - except SurgeryPipelineError as exc: - await anyio.to_thread.run_sync(SyntheticRtspManager.stop_active) - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - detail={"code": exc.code, "message": exc.message, "surgery_id": body.surgery_id}, - ) from exc diff --git a/backend/app/services/hls_preview.py b/backend/app/services/hls_preview.py index acb2b58..3c74c03 100644 --- a/backend/app/services/hls_preview.py +++ b/backend/app/services/hls_preview.py @@ -23,7 +23,7 @@ from urllib.request import Request, urlopen from loguru import logger -from app.services.synthetic_rtsp import MEDIAMTX_IMAGE +from app.services.mediamtx_constants import MEDIAMTX_IMAGE CONTAINER_NAME_PREFIX = "orm-hls-preview-" _MEDIAMTX_HLS_INTERNAL_PORT = 8888 diff --git a/backend/app/services/mediamtx_constants.py b/backend/app/services/mediamtx_constants.py new file mode 100644 index 0000000..57fa70b --- /dev/null +++ b/backend/app/services/mediamtx_constants.py @@ -0,0 +1,8 @@ +"""Shared MediaMTX Docker image constant (HLS preview).""" + +from __future__ import annotations + +import os + +_DEFAULT_MEDIAMTX_IMAGE = "m.daocloud.io/docker.io/bluenviron/mediamtx:latest" +MEDIAMTX_IMAGE = os.environ.get("MEDIAMTX_DOCKER_IMAGE", _DEFAULT_MEDIAMTX_IMAGE) diff --git a/backend/app/services/recording_live.py b/backend/app/services/recording_live.py index 702d647..c51b350 100644 --- a/backend/app/services/recording_live.py +++ b/backend/app/services/recording_live.py @@ -1,4 +1,4 @@ -"""共享实时开录:CameraSessionManager + 语音终端指派(链路 1 真 RTSP、链路 2 模拟 RTSP)。""" +"""共享实时开录:CameraSessionManager + 语音终端指派(链路 1 真 RTSP)。""" from __future__ import annotations diff --git a/backend/app/services/simulated_rtsp_setup.py b/backend/app/services/simulated_rtsp_setup.py deleted file mode 100644 index 698884e..0000000 --- a/backend/app/services/simulated_rtsp_setup.py +++ /dev/null @@ -1,160 +0,0 @@ -"""链路 2:上传视频 → 合成假 RTSP → 写入站点 JSON 的 video_rtsp_urls。""" - -from __future__ import annotations - -import shutil -import tempfile -from dataclasses import dataclass -from pathlib import Path - -import anyio -from fastapi import HTTPException, UploadFile, status -from loguru import logger - -from app.config import settings -from app.or_site_config import merge_video_rtsp_urls_into_file -from app.services.synthetic_rtsp import StreamSpec, SyntheticRtspManager - - -def simulated_rtsp_json_host() -> str: - """本进程内 MediaMTX 监听 127.0.0.1;OpenCV 须连同命名空间内的地址。""" - return "127.0.0.1" - - -@dataclass(frozen=True) -class SimulatedStreamUpload: - raw: bytes - ext: str - camera_id: str - rtsp_path: str - - -async def read_simulated_stream_uploads( - *, - video1: UploadFile, - video2: UploadFile | None, - video3: UploadFile | None, - video4: UploadFile | None, - camera_1: str, - camera_2: str, - camera_3: str, - camera_4: str, - rtsp_path_1: str, - rtsp_path_2: str, - rtsp_path_3: str, - rtsp_path_4: str, -) -> list[SimulatedStreamUpload]: - default_rtsp = ("demo1", "demo2", "demo3", "demo4") - slot_uploads = (video1, video2, video3, video4) - slot_cameras = ( - camera_1.strip(), - camera_2.strip(), - camera_3.strip(), - camera_4.strip(), - ) - slot_rpaths = ( - rtsp_path_1.strip(), - rtsp_path_2.strip(), - rtsp_path_3.strip(), - rtsp_path_4.strip(), - ) - - async def _bytes_and_suffix(u: UploadFile) -> tuple[bytes, str]: - raw = await u.read() - ext = Path(u.filename or "clip.mp4").suffix or ".mp4" - return raw, ext - - gathered: list[SimulatedStreamUpload] = [] - for idx, u in enumerate(slot_uploads): - if u is None: - break - raw, ext = await _bytes_and_suffix(u) - if not raw: - break - cam = slot_cameras[idx] or f"or-cam-0{idx + 1}" - rp = slot_rpaths[idx] or default_rtsp[idx] - gathered.append( - SimulatedStreamUpload(raw=raw, ext=ext, camera_id=cam, rtsp_path=rp) - ) - - if not gathered: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, - detail="至少需要一路非空视频(video1)", - ) - if len(gathered) > 4: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, - detail="最多 4 路视频", - ) - return gathered - - -async def prepare_simulated_rtsp_streams( - *, - site_config_json_path: Path, - uploads: list[SimulatedStreamUpload], -) -> list[str]: - """保存上传、启动假 RTSP、合并写入站点 JSON;返回 camera_id 列表。""" - work_root = Path(tempfile.mkdtemp(prefix="orm-sim-")) - try: - - def _save_files() -> None: - for i, item in enumerate(uploads): - fp = work_root / f"v{i + 1}{item.ext}" - fp.write_bytes(item.raw) - - await anyio.to_thread.run_sync(_save_files) - except OSError as exc: - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"failed to save uploads: {exc}", - ) from exc - - streams = [ - StreamSpec( - camera_id=item.camera_id, - file_path=work_root / f"v{i + 1}{item.ext}", - rtsp_path=item.rtsp_path, - ) - for i, item in enumerate(uploads) - ] - port = int(settings.demo_orchestrator_rtsp_port) - - try: - - def _start_synth() -> dict[str, str]: - mgr = SyntheticRtspManager.get() - _run, url_map = mgr.start(streams, host_port=port, work_dir=work_root) - return url_map - - url_map_host = await anyio.to_thread.run_sync(_start_synth) - except (FileNotFoundError, OSError, ValueError, RuntimeError) as exc: - logger.exception("synthetic RTSP start failed: {}", exc) - await anyio.to_thread.run_sync(SyntheticRtspManager.stop_active) - shutil.rmtree(work_root, ignore_errors=True) - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - detail=f"synthetic RTSP failed: {exc}", - ) from exc - - host_for_json = simulated_rtsp_json_host() - try: - - def _write() -> None: - merge_video_rtsp_urls_into_file( - site_config_json_path, - url_map_host, - replace_host=host_for_json, - ) - - await anyio.to_thread.run_sync(_write) - except OSError as exc: - await anyio.to_thread.run_sync(SyntheticRtspManager.stop_active) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"failed to write RTSP JSON file: {exc}", - ) from exc - - await anyio.sleep(0.2) - return [item.camera_id for item in uploads] diff --git a/backend/app/services/surgery_pipeline.py b/backend/app/services/surgery_pipeline.py index a40223f..ac56455 100644 --- a/backend/app/services/surgery_pipeline.py +++ b/backend/app/services/surgery_pipeline.py @@ -1,4 +1,4 @@ -"""手术录制与实时算法流水线(待接入真实子系统)。""" +"""手术录制与 RTSP 切片 batch 流水线。""" from __future__ import annotations diff --git a/backend/app/services/synthetic_rtsp.py b/backend/app/services/synthetic_rtsp.py deleted file mode 100644 index 6c73040..0000000 --- a/backend/app/services/synthetic_rtsp.py +++ /dev/null @@ -1,236 +0,0 @@ -"""Start/stop local fake RTSP streams (MediaMTX + ffmpeg) for dev orchestration. - -Each input file is published once (no ``-stream_loop``); when ffmpeg exits the -process is gone — reconnect or re-orchestrate for another playthrough. -""" - -from __future__ import annotations - -import os -import shutil -import socket -import subprocess -import time -import uuid -from dataclasses import dataclass, field -from pathlib import Path -from typing import ClassVar - -from loguru import logger - -# 默认走 DaoCloud 前缀,避免直连 registry-1.docker.io 超时(与 docker-compose.yml 基础镜像策略一致)。 -_DEFAULT_MEDIAMTX_IMAGE = "m.daocloud.io/docker.io/bluenviron/mediamtx:latest" -MEDIAMTX_IMAGE = os.environ.get("MEDIAMTX_DOCKER_IMAGE", _DEFAULT_MEDIAMTX_IMAGE) -CONTAINER_NAME_PREFIX = "orm-fake-rtsp-" -# 等待 127.0.0.1:host_port 可连接(避免开录时 Connection refused) -_MEDIAMTX_TCP_READY_SEC = float(os.environ.get("MEDIAMTX_TCP_READY_SEC", "30")) - - -def _wait_tcp_listening(host: str, port: int, *, total_timeout: float) -> None: - """Block until something accepts TCP on host:port (MediaMTX 映射口就绪).""" - deadline = time.monotonic() + max(1.0, total_timeout) - last: OSError | None = None - while time.monotonic() < deadline: - try: - with socket.create_connection((host, port), timeout=1.5): - logger.info("RTSP port ready {}:{}", host, port) - return - except OSError as exc: - last = exc - time.sleep(0.2) - hint = " MediaMTX 未监听:检查 docker 是否起成功、18554 是否被占用(orm-fake-rtsp-*) 已 docker ps。" - if last is not None: - raise RuntimeError(f"等待 {host}:{port} 可连接超时({total_timeout:g}s): {last}{hint}") from last - raise RuntimeError(f"等待 {host}:{port} 可连接超时({total_timeout:g}s).{hint}") - - -@dataclass -class StreamSpec: - camera_id: str - file_path: Path - rtsp_path: str # last segment, e.g. demo1 - - def __post_init__(self) -> None: - self.rtsp_path = (self.rtsp_path or "demo").strip().strip("/") or "demo" - - -@dataclass -class SyntheticRtspRun: - """Holds Popen handles and docker container for one multi-stream session.""" - - container_name: str - procs: list[subprocess.Popen] = field(default_factory=list) - work_dir: Path | None = None # temp dir for uploaded video files; removed on stop - - def stop(self) -> None: - for p in self.procs: - if p.poll() is None: - p.terminate() - try: - p.wait(timeout=5.0) - except subprocess.TimeoutExpired: - p.kill() - self.procs.clear() - if self.work_dir is not None and self.work_dir.is_dir(): - try: - shutil.rmtree(self.work_dir, ignore_errors=True) - except OSError as exc: - logger.debug("rmtree work_dir: {}", exc) - self.work_dir = None - if shutil.which("docker") is not None: - try: - subprocess.run( - ["docker", "rm", "-f", self.container_name], - capture_output=True, - timeout=30, - ) - except (OSError, subprocess.SubprocessError) as exc: - logger.debug("docker rm: {}", exc) - self.work_dir = None - - -class SyntheticRtspManager: - _instance: ClassVar[SyntheticRtspManager | None] = None - _active: ClassVar[SyntheticRtspRun | None] = None - - @classmethod - def get(cls) -> SyntheticRtspManager: - if cls._instance is None: - cls._instance = cls() - return cls._instance - - @classmethod - def active_run(cls) -> SyntheticRtspRun | None: - return cls._active - - @classmethod - def _cleanup_prefixed_containers(cls) -> None: - """Remove stale MediaMTX containers left by earlier runs/reloads.""" - if shutil.which("docker") is None: - return - try: - listed = subprocess.run( - [ - "docker", - "ps", - "-aq", - "--filter", - f"name={CONTAINER_NAME_PREFIX}", - ], - capture_output=True, - text=True, - timeout=30, - check=False, - ) - except (OSError, subprocess.SubprocessError) as exc: - logger.debug("docker ps stale cleanup: {}", exc) - return - ids = [x.strip() for x in (listed.stdout or "").splitlines() if x.strip()] - if not ids: - return - try: - subprocess.run( - ["docker", "rm", "-f", *ids], - capture_output=True, - text=True, - timeout=60, - check=False, - ) - logger.info("Removed stale fake RTSP containers: {}", ids) - except (OSError, subprocess.SubprocessError) as exc: - logger.debug("docker rm stale cleanup: {}", exc) - - @classmethod - def stop_active(cls) -> None: - if cls._active is not None: - cls._active.stop() - cls._active = None - cls._cleanup_prefixed_containers() - - def start( - self, - streams: list[StreamSpec], - *, - host_port: int, - work_dir: Path, - ) -> tuple[SyntheticRtspRun, dict[str, str]]: - """Start MediaMTX and one ffmpeg per stream. Returns (run, url_by_camera).""" - if not streams: - raise ValueError("no streams") - if not shutil.which("ffmpeg"): - raise RuntimeError("ffmpeg not in PATH") - if not shutil.which("docker"): - raise RuntimeError("docker not in PATH (required to run MediaMTX)") - - self.stop_active() - - for s in streams: - if not s.file_path.is_file(): - raise FileNotFoundError(str(s.file_path)) - for ch in s.rtsp_path: - if ch not in "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_.-": - raise ValueError(f"invalid RTSP path segment: {s.rtsp_path!r}") - - container = CONTAINER_NAME_PREFIX + uuid.uuid4().hex[:12] - cmd = [ - "docker", - "run", - "-d", - "--name", - container, - "-p", - f"127.0.0.1:{host_port}:8554", - MEDIAMTX_IMAGE, - ] - r = subprocess.run(cmd, capture_output=True, text=True, timeout=120) - if r.returncode != 0: - try: - subprocess.run( - ["docker", "rm", "-f", container], - capture_output=True, - text=True, - timeout=30, - check=False, - ) - except (OSError, subprocess.SubprocessError) as exc: - logger.debug("docker rm failed container cleanup: {}", exc) - err = (r.stderr or r.stdout or "").strip() - raise RuntimeError(f"MediaMTX docker failed: {err}") - - run = SyntheticRtspRun(container_name=container) - url_map: dict[str, str] = {} - time.sleep(0.5) - _wait_tcp_listening("127.0.0.1", host_port, total_timeout=_MEDIAMTX_TCP_READY_SEC) - - run.work_dir = work_dir - try: - for s in streams: - dest = f"rtsp://127.0.0.1:{host_port}/{s.rtsp_path}" - url_map[s.camera_id] = dest - pub = [ - "ffmpeg", - "-hide_banner", - "-loglevel", - "warning", - "-re", - "-i", - str(s.file_path), - "-c", - "copy", - "-f", - "rtsp", - "-rtsp_transport", - "tcp", - dest, - ] - p = subprocess.Popen(pub) # noqa: S603 - run.procs.append(p) - except Exception: - run.stop() - raise - - # 给 ffmpeg 一点时间连上 MediaMTX,减少首帧前 OpenCV 连上却 DESCRIBE 失败 - time.sleep(0.4) - - SyntheticRtspManager._active = run - return run, url_map diff --git a/backend/app/services/video/__init__.py b/backend/app/services/video/__init__.py index 53de40f..0fbfd8b 100644 --- a/backend/app/services/video/__init__.py +++ b/backend/app/services/video/__init__.py @@ -1,5 +1,3 @@ """Video capture backends: RTSP (OpenCV) and optional Hikvision HCNetSDK (Linux .so).""" -from app.services.video.session_manager import CameraSessionManager - -__all__ = ["CameraSessionManager"] +__all__: list[str] = [] diff --git a/backend/app/services/video/recording_camera_policy.py b/backend/app/services/video/recording_camera_policy.py new file mode 100644 index 0000000..f6375c3 --- /dev/null +++ b/backend/app/services/video/recording_camera_policy.py @@ -0,0 +1,49 @@ +"""Resolve which camera_ids to record for RTSP slice + batch pipeline.""" + +from __future__ import annotations + +from loguru import logger + +from app.config import Settings +from app.services.video.backend_resolver import BackendResolver + + +def resolve_recording_cameras( + requested: list[str], + settings: Settings, + *, + resolver: BackendResolver, +) -> list[str]: + """Return camera_ids that should receive RTSP segment recorders. + + Default (``RTSP_RECORD_ALL_CAMERAS=false``): only ``RTSP_PRIMARY_CAMERA_ID`` + when present in the request, otherwise the first requested id. + When all-cameras mode is enabled: every requested id with a resolvable RTSP URL. + """ + cleaned = [str(c).strip() for c in requested if str(c).strip()] + if not cleaned: + raise ValueError("camera_ids must not be empty") + + primary = (settings.rtsp_primary_camera_id or "").strip() + if not settings.rtsp_record_all_cameras: + if primary: + if primary not in cleaned: + logger.info( + "RTSP recording: primary {} not in request {}; recording primary only", + primary, + cleaned, + ) + return [primary] + return [cleaned[0]] + + out: list[str] = [] + for cam in cleaned: + try: + resolver.rtsp_url_for_camera(cam) + except ValueError as exc: + logger.warning("skip camera {} (no RTSP URL): {}", cam, exc) + continue + out.append(cam) + if not out: + raise ValueError("no requested camera_id has a resolvable RTSP URL") + return out diff --git a/backend/app/services/video/rtsp_segment_cleanup.py b/backend/app/services/video/rtsp_segment_cleanup.py new file mode 100644 index 0000000..0cec16e --- /dev/null +++ b/backend/app/services/video/rtsp_segment_cleanup.py @@ -0,0 +1,69 @@ +"""TTL cleanup for RTSP slice MP4 files under ``logs/rtsp_segments/``.""" + +from __future__ import annotations + +import shutil +import time +from pathlib import Path + +from loguru import logger + + +def default_rtsp_segments_root(*, base_dir: Path | None = None) -> Path: + return (base_dir or Path("logs")) / "rtsp_segments" + + +def _safe_rmtree(path: Path) -> None: + if not path.exists(): + return + try: + shutil.rmtree(path) + except OSError as exc: + logger.warning("failed to remove {}: {}", path, exc) + + +def _prune_empty_parents(path: Path, *, stop_at: Path) -> None: + current = path + stop_at = stop_at.resolve() + while current != stop_at and current.is_dir(): + try: + next(current.iterdir()) + except StopIteration: + parent = current.parent + _safe_rmtree(current) + current = parent + continue + except OSError: + break + break + + +def purge_expired_rtsp_segments( + segments_root: Path, + *, + ttl_hours: float = 24.0, +) -> int: + """Delete ``slice_*.mp4`` under *segments_root* older than *ttl_hours* (by mtime).""" + + root = segments_root.expanduser().resolve() + if not root.is_dir(): + return 0 + + cutoff = time.time() - float(ttl_hours) * 3600.0 + removed = 0 + for mp4 in root.rglob("slice_*.mp4"): + if not mp4.is_file(): + continue + try: + if mp4.stat().st_mtime >= cutoff: + continue + except OSError: + continue + mp4.unlink(missing_ok=True) + removed += 1 + logger.info("purged expired RTSP segment {}", mp4) + _prune_empty_parents(mp4.parent, stop_at=root) + + if removed: + logger.info("RTSP segment TTL sweep removed {} file(s)", removed) + return removed diff --git a/backend/app/services/video/rtsp_segment_recorder.py b/backend/app/services/video/rtsp_segment_recorder.py new file mode 100644 index 0000000..325255b --- /dev/null +++ b/backend/app/services/video/rtsp_segment_recorder.py @@ -0,0 +1,277 @@ +"""RTSP → MP4 segment recorder using ffmpeg (Chain 1 slice pipeline).""" + +from __future__ import annotations + +import asyncio +import os +import shutil +import time +from collections.abc import Awaitable, Callable +from dataclasses import dataclass +from pathlib import Path + +from loguru import logger + +SegmentCallback = Callable[ + ["SegmentCompleteEvent"], + Awaitable[None], +] + + +@dataclass(frozen=True) +class SegmentCompleteEvent: + surgery_id: str + camera_id: str + path: Path + slice_index: int + slice_offset_sec: float + duration_sec: float + + +def rtsp_segments_dir(surgery_id: str, *, base_dir: Path | None = None) -> Path: + root = base_dir or Path("logs") / "rtsp_segments" + return root / _safe_name(surgery_id) + + +def _safe_name(value: str) -> str: + out = "".join(ch if ch.isalnum() or ch in "._-" else "_" for ch in value.strip()) + return out[:96] or "unknown" + + +def _ffmpeg_bin() -> str: + return shutil.which("ffmpeg") or "ffmpeg" + + +def _build_ffmpeg_cmd( + *, + rtsp_url: str, + output_path: Path, + duration_sec: float, +) -> list[str]: + opts = os.environ.get("OPENCV_FFMPEG_CAPTURE_OPTIONS", "rtsp_transport;tcp") + transport = "tcp" + if "rtsp_transport" in opts: + for part in opts.split(";"): + if part.startswith("rtsp_transport"): + transport = part.split(":", 1)[-1].strip() or "tcp" + break + return [ + _ffmpeg_bin(), + "-hide_banner", + "-loglevel", + "warning", + "-rtsp_transport", + transport, + "-stimeout", + "5000000", + "-i", + rtsp_url, + "-t", + str(max(1.0, duration_sec)), + "-c", + "copy", + "-movflags", + "+faststart", + "-y", + str(output_path), + ] + + +class RtspSegmentRecorder: + """Record one RTSP URL into fixed-duration MP4 slices until ``stop_event``.""" + + def __init__( + self, + *, + surgery_id: str, + camera_id: str, + rtsp_url: str, + output_dir: Path, + segment_duration_sec: float, + segment_min_sec: float, + on_segment_complete: SegmentCallback, + ready_event: asyncio.Event | None = None, + ) -> None: + self._surgery_id = surgery_id + self._camera_id = camera_id + self._rtsp_url = rtsp_url + self._output_dir = output_dir + self._segment_duration_sec = float(segment_duration_sec) + self._segment_min_sec = float(segment_min_sec) + self._on_segment_complete = on_segment_complete + self._ready_event = ready_event + self._slice_index = 0 + self._offset_sec = 0.0 + + @property + def camera_id(self) -> str: + return self._camera_id + + async def run(self, stop_event: asyncio.Event) -> None: + self._output_dir.mkdir(parents=True, exist_ok=True) + first_segment = True + while not stop_event.is_set(): + output_path = self._output_dir / f"slice_{self._slice_index:04d}.mp4" + duration = self._segment_duration_sec + if stop_event.is_set(): + break + proc, stderr_task = await self._start_ffmpeg(output_path, duration) + try: + await self._wait_ffmpeg( + proc, + stop_event, + timeout=duration + 30.0, + stderr_task=stderr_task, + ) + finally: + if proc.returncode is None: + proc.terminate() + try: + await asyncio.wait_for(proc.wait(), timeout=8.0) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + if not stderr_task.done(): + stderr_task.cancel() + try: + await stderr_task + except asyncio.CancelledError: + pass + + if first_segment and self._ready_event is not None and not self._ready_event.is_set(): + self._ready_event.set() + + actual_duration = await self._probe_duration(output_path) + if actual_duration >= self._segment_min_sec: + await self._emit_segment(output_path, actual_duration) + elif output_path.is_file(): + output_path.unlink(missing_ok=True) + + first_segment = False + if stop_event.is_set(): + break + + async def _start_ffmpeg( + self, + output_path: Path, + duration_sec: float, + ) -> tuple[asyncio.subprocess.Process, asyncio.Task[None]]: + cmd = _build_ffmpeg_cmd( + rtsp_url=self._rtsp_url, + output_path=output_path, + duration_sec=duration_sec, + ) + logger.info( + "RTSP recorder start surgery={} camera={} slice={} cmd={}", + self._surgery_id, + self._camera_id, + self._slice_index, + " ".join(cmd[:8]), + ) + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.PIPE, + ) + stderr_task = asyncio.create_task(self._watch_stderr(proc, output_path)) + return proc, stderr_task + + async def _watch_stderr( + self, + proc: asyncio.subprocess.Process, + output_path: Path, + ) -> None: + if proc.stderr is None: + return + try: + while True: + line = await proc.stderr.readline() + if not line: + break + text = line.decode("utf-8", errors="replace").strip() + if not text: + continue + if "error" in text.lower() or "failed" in text.lower(): + logger.warning( + "ffmpeg surgery={} camera={} slice={}: {}", + self._surgery_id, + self._camera_id, + self._slice_index, + text, + ) + if self._ready_event is not None and not self._ready_event.is_set(): + if output_path.is_file() and output_path.stat().st_size > 0: + self._ready_event.set() + elif "Input #" in text or "Stream mapping" in text: + self._ready_event.set() + except asyncio.CancelledError: + pass + + async def _wait_ffmpeg( + self, + proc: asyncio.subprocess.Process, + stop_event: asyncio.Event, + *, + timeout: float, + stderr_task: asyncio.Task[None], + ) -> None: + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + if proc.returncode is not None: + return + if stop_event.is_set(): + return + await asyncio.sleep(0.25) + logger.warning( + "ffmpeg timeout surgery={} camera={} slice={}", + self._surgery_id, + self._camera_id, + self._slice_index, + ) + + async def _probe_duration(self, path: Path) -> float: + if not path.is_file() or path.stat().st_size <= 0: + return 0.0 + ffprobe = shutil.which("ffprobe") + if not ffprobe: + return self._segment_duration_sec + proc = await asyncio.create_subprocess_exec( + ffprobe, + "-v", + "error", + "-show_entries", + "format=duration", + "-of", + "default=noprint_wrappers=1:nokey=1", + str(path), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL, + ) + stdout, _ = await proc.communicate() + if proc.returncode != 0: + return self._segment_duration_sec + try: + return max(0.0, float(stdout.decode().strip())) + except ValueError: + return self._segment_duration_sec + + async def _emit_segment(self, path: Path, duration_sec: float) -> None: + event = SegmentCompleteEvent( + surgery_id=self._surgery_id, + camera_id=self._camera_id, + path=path, + slice_index=self._slice_index, + slice_offset_sec=self._offset_sec, + duration_sec=duration_sec, + ) + logger.info( + "RTSP segment complete surgery={} camera={} slice={} path={} dur={:.1f}s", + self._surgery_id, + self._camera_id, + self._slice_index, + path, + duration_sec, + ) + await self._on_segment_complete(event) + self._offset_sec += duration_sec + self._slice_index += 1 diff --git a/backend/app/services/video/session_manager.py b/backend/app/services/video/session_manager.py index 5f2d79f..b3ab7cf 100644 --- a/backend/app/services/video/session_manager.py +++ b/backend/app/services/video/session_manager.py @@ -2,43 +2,47 @@ from __future__ import annotations import asyncio import json -import sys import time -from datetime import datetime, timezone from pathlib import Path from typing import Literal from loguru import logger from sqlalchemy.ext.asyncio import async_sessionmaker -from app.algorithm_ipc.schema import WhitelistSpec, events_path_for_surgery, parse_event_obj -from app.baked import algorithm as ba +from app.algorithm_ipc.schema import WhitelistSpec +from app.algo_host.batch_service import BatchAlgorithmService from app.baked import pipeline as bp from app.config import Settings from app.consumable_catalog import build_name_mapping, effective_candidate_consumables from app.database import AsyncSessionLocal from app.domain.consumption import SurgeryConsumptionStored -from app.domain.vision_prediction import ClsTop3, PredictionCandidate from app.repositories.surgery_results import SurgeryResultRepository from app.schemas import SurgeryConsumptionDetail, build_consumption_summary from app.services.consumption_tsv_log import ( append_consumption_log_summary, - append_consumption_pending_window, - append_consumption_window, init_consumption_log_file, print_consumption_summary_markdown, ) from app.services.video.archive_persister import ArchivePersister from app.services.video.backend_resolver import BackendResolver from app.services.video.hikvision_runtime import HikvisionInitRefCount, HikvisionRuntime +from app.services.video.recording_camera_policy import resolve_recording_cameras +from app.services.video.rtsp_segment_cleanup import ( + default_rtsp_segments_root, + purge_expired_rtsp_segments, +) +from app.services.video.rtsp_segment_recorder import ( + RtspSegmentRecorder, + SegmentCompleteEvent, + rtsp_segments_dir, +) from app.services.video.session_registry import ( PendingConsumableConfirmation, RunningSurgery, SurgerySessionRegistry, SurgerySessionState, - format_elapsed_mmss_since, - pending_display_item_name_for_confirmation, ) +from app.services.video.slice_batch_processor import SliceBatchProcessor from app.services.video.types import VideoBackendKind from app.services.voice_file_log import init_voice_log_file from app.services.voice_terminal_hub import VoiceTerminalHub @@ -60,13 +64,14 @@ def _safe_log_name(value: str) -> str: class CameraSessionManager: - """手术会话:启动算法子进程(RTSP)+ JSONL tail 驱动内存明细与待确认队列。""" + """手术会话:RTSP 录像切片 + batch 子进程(5.15/main.py)驱动明细与待确认队列。""" def __init__( self, *, settings: Settings, hikvision_runtime: HikvisionRuntime | None, + slice_batch_processor: SliceBatchProcessor | None = None, result_repository: SurgeryResultRepository | None = None, session_factory: async_sessionmaker | None = None, registry: SurgerySessionRegistry | None = None, @@ -77,14 +82,41 @@ class CameraSessionManager: self._session_factory: async_sessionmaker = session_factory or AsyncSessionLocal self._resolver = BackendResolver(settings, hikvision_runtime=hikvision_runtime) self._registry = registry or SurgerySessionRegistry() + self._slice_batch = slice_batch_processor or SliceBatchProcessor( + batch_service=BatchAlgorithmService(), + registry=self._registry, + max_concurrent=int(settings.rtsp_slice_batch_max_concurrent), + drain_timeout_sec=float(settings.rtsp_slice_batch_drain_timeout_sec), + ) self._archive = archive_persister or ArchivePersister( repository=result_repository, session_factory=self._session_factory, ) self._voice_hub: VoiceTerminalHub | None = None + self._rtsp_ttl_stop = asyncio.Event() + self._rtsp_ttl_task: asyncio.Task[None] | None = None + + async def _purge_expired_rtsp_segments(self) -> None: + ttl = float(self._s.rtsp_segment_ttl_hours or bp.RTSP_SEGMENT_TTL_HOURS) + root = default_rtsp_segments_root(base_dir=LOGS_DIR) + await asyncio.to_thread(purge_expired_rtsp_segments, root, ttl_hours=ttl) + + async def _rtsp_segment_ttl_loop(self) -> None: + interval = max(3600.0, float(bp.RTSP_SEGMENT_TTL_HOURS) * 3600.0 / 4.0) + while not self._rtsp_ttl_stop.is_set(): + try: + await asyncio.wait_for(self._rtsp_ttl_stop.wait(), timeout=interval) + break + except TimeoutError: + pass + try: + await self._purge_expired_rtsp_segments() + except Exception as exc: + logger.warning("RTSP segment TTL sweep failed: {}", exc) def set_voice_terminal_hub(self, hub: VoiceTerminalHub | None) -> None: self._voice_hub = hub + self._slice_batch.set_voice_terminal_hub(hub) def get_voice_terminal_id_if_active(self, surgery_id: str) -> str | None: run = self._registry.get_running(surgery_id) @@ -96,8 +128,23 @@ class CameraSessionManager: async def start_archive_retry_loop(self) -> None: await self._archive.recover_from_durable_fallback() await self._archive.start_retry_loop() + await self._purge_expired_rtsp_segments() + if self._rtsp_ttl_task is None or self._rtsp_ttl_task.done(): + self._rtsp_ttl_stop.clear() + self._rtsp_ttl_task = asyncio.create_task( + self._rtsp_segment_ttl_loop(), + name="rtsp_segment_ttl", + ) async def shutdown(self) -> None: + self._rtsp_ttl_stop.set() + if self._rtsp_ttl_task is not None: + self._rtsp_ttl_task.cancel() + try: + await self._rtsp_ttl_task + except asyncio.CancelledError: + pass + self._rtsp_ttl_task = None await self._archive.shutdown() ids = self._registry.active_ids() for sid in ids: @@ -158,278 +205,103 @@ class CameraSessionManager: surgery_started_wall=time.time(), ) stop_event = asyncio.Event() - events_path = events_path_for_surgery(surgery_id, base_dir=LOGS_DIR) - events_path.parent.mkdir(parents=True, exist_ok=True) - events_path.write_text("", encoding="utf-8") wl = WhitelistSpec.from_session(resolved, name_to_code) wp = self._whitelist_path(surgery_id) + wp.parent.mkdir(parents=True, exist_ok=True) wp.write_text( json.dumps(wl.to_json_obj(), ensure_ascii=False, indent=2), encoding="utf-8", ) - primary_cfg = (ba.ACTIONFORMER_PRIMARY_CAMERA_ID or "").strip() - if primary_cfg: - primary_cam = primary_cfg - else: - primary_cam = (camera_ids[0] if camera_ids else "").strip() or "cam01" - kind = self._resolver.backend_for_camera(primary_cam) - url, hik_uid, hik_retained = await self._resolve_rtsp_url(camera_id=primary_cam, kind=kind) + try: + record_cameras = resolve_recording_cameras( + camera_ids, + self._s, + resolver=self._resolver, + ) + except ValueError as exc: + raise SurgeryPipelineError( + "RECORDING_CANNOT_START", + str(exc), + ) from exc - cmd = [ - sys.executable, - "-m", - "app.algorithm_runner", - "--source", - url, - "--whitelist-json", - str(wp.resolve()), - "--events-jsonl", - str(events_path.resolve()), - "--wall-anchor", - str(state.surgery_started_wall or time.time()), - "--surgery-id", - surgery_id, - "--camera-id", - primary_cam, - "--source-mode", - "realtime", - ] - proc = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.DEVNULL, - stderr=asyncio.subprocess.DEVNULL, - ) + primary_cam = record_cameras[0] + ready_event = asyncio.Event() + hik_logouts: list[tuple[int, bool]] = [] - tail_task = asyncio.create_task( - self._tail_algo_events(surgery_id, events_path, state, stop_event), - name=f"algo_tail:{surgery_id}", - ) - run = RunningSurgery(stop_event=stop_event, state=state, tasks=[tail_task], algo_process=proc) + async def on_segment(event: SegmentCompleteEvent) -> None: + await self._slice_batch.submit_slice( + surgery_id=surgery_id, + event=event, + candidate_consumables=list(resolved), + state=state, + ) + + recorder_tasks: list[asyncio.Task[None]] = [] + for cam in record_cameras: + kind = self._resolver.backend_for_camera(cam) + url, hik_uid, hik_retained = await self._resolve_rtsp_url(camera_id=cam, kind=kind) + if hik_uid is not None: + hik_logouts.append((hik_uid, hik_retained)) + out_dir = rtsp_segments_dir(surgery_id) / _safe_log_name(cam) + recorder = RtspSegmentRecorder( + surgery_id=surgery_id, + camera_id=cam, + rtsp_url=url, + output_dir=out_dir, + segment_duration_sec=self._s.rtsp_segment_duration_sec, + segment_min_sec=self._s.rtsp_segment_min_sec, + on_segment_complete=on_segment, + ready_event=ready_event if cam == primary_cam else None, + ) + recorder_tasks.append( + asyncio.create_task( + recorder.run(stop_event), + name=f"rtsp_recorder:{surgery_id}:{cam}", + ) + ) + + self._slice_batch.ensure_worker(surgery_id) + run = RunningSurgery(stop_event=stop_event, state=state, tasks=recorder_tasks, algo_process=None) init_consumption_log_file(surgery_id) init_voice_log_file(surgery_id) await self._registry.register(surgery_id, run) open_timeout = float(self._s.video_open_timeout_sec) + 5.0 try: - await asyncio.wait_for(self._wait_algo_ready(events_path), timeout=open_timeout) + await asyncio.wait_for(ready_event.wait(), timeout=open_timeout) state.ready.set() except TimeoutError as exc: - logger.error("Surgery {} algorithm not ready within {}s", surgery_id, open_timeout) - await self._force_stop_subprocess(run, surgery_id) - tail_task.cancel() - try: - await tail_task - except asyncio.CancelledError: - pass + logger.error( + "Surgery {} RTSP recorder not ready within {}s", + surgery_id, + open_timeout, + ) + await self._force_stop_run(run, surgery_id) await self._registry.unregister(surgery_id) raise SurgeryPipelineError( "RECORDING_CANNOT_START", - "开录未能确认:算法子进程在超时内未就绪(未收到 ready 事件)。", + "开录未能确认:RTSP 录像在超时内未就绪。", ) from exc except Exception: - await self._force_stop_subprocess(run, surgery_id) - tail_task.cancel() - try: - await tail_task - except asyncio.CancelledError: - pass + await self._force_stop_run(run, surgery_id) await self._registry.unregister(surgery_id) raise finally: - if hik_uid is not None and self._hik is not None: - await asyncio.to_thread(self._hik.logout, hik_uid) - if hik_retained and self._hik is not None: - HikvisionInitRefCount.release(self._hik) + for hik_uid, hik_retained in hik_logouts: + if self._hik is not None: + await asyncio.to_thread(self._hik.logout, hik_uid) + if hik_retained and self._hik is not None: + HikvisionInitRefCount.release(self._hik) - async def _wait_algo_ready(self, events_path: Path) -> None: - deadline = time.monotonic() + float(self._s.video_open_timeout_sec) + 5.0 - while time.monotonic() < deadline: - if events_path.is_file() and events_path.stat().st_size > 0: - try: - text = events_path.read_text(encoding="utf-8") - except OSError: - text = "" - for line in text.splitlines(): - ev = parse_event_obj(line) - if ev and ev.get("type") == "ready": - return - if ev and ev.get("type") == "error": - raise SurgeryPipelineError( - "RECORDING_CANNOT_START", - f"算法子进程错误: {ev.get('message', ev)!s}", - ) - await asyncio.sleep(0.05) - raise TimeoutError("algo ready") - - async def _tail_algo_events( - self, - surgery_id: str, - path: Path, - state: SurgerySessionState, - stop_event: asyncio.Event, - ) -> None: - offset = 0 - try: - while not stop_event.is_set(): - await asyncio.sleep(0.12) - if not path.is_file(): - continue - try: - data = path.read_bytes() - except OSError: - continue - if len(data) < offset: - offset = 0 - if len(data) <= offset: - continue - chunk = data[offset:].decode("utf-8", errors="replace") - offset = len(data) - for line in chunk.splitlines(): - ev = parse_event_obj(line) - if ev: - await self._apply_algo_event(surgery_id, state, ev) - except asyncio.CancelledError: - pass - finally: - if path.is_file(): - try: - data = path.read_bytes() - if len(data) > offset: - chunk = data[offset:].decode("utf-8", errors="replace") - for line in chunk.splitlines(): - ev = parse_event_obj(line) - if ev: - await self._apply_algo_event(surgery_id, state, ev) - except Exception: - logger.debug("final algo tail drain skipped") - - async def _apply_algo_event(self, surgery_id: str, state: SurgerySessionState, ev: dict) -> None: - t = ev.get("type") - if t == "ready" or t == "done": - return - if t == "error": - logger.warning("algo subprocess surgery={} error={}", surgery_id, ev.get("message")) - return - if t == "segment_confirmed": - if ev.get("frozen", True) is False: - logger.debug("skip provisional segment_confirmed surgery={}", surgery_id) - return - wall_hi = float(ev.get("wall_end_epoch") or time.time()) - ts = datetime.fromtimestamp(wall_hi, tz=timezone.utc) - await self._registry.append_confirmed_detail( - state=state, - item_id=str(ev.get("item_id") or "unknown"), - item_name=str(ev.get("item_name") or "unknown"), - doctor_id=bp.VIDEO_RESULT_DOCTOR_ID, - source="algo_subprocess", - cooldown_key=str(ev.get("cooldown_key") or "") or None, - detail_timestamp=ts, - ) - if bp.CONSUMPTION_TSV_LOG_ENABLED or bp.CONSUMPTION_LOG_MARKDOWN_TERMINAL: - snap = ClsTop3( - t1_name=str(ev.get("item_name") or ""), - t1_conf=float(ev.get("top1_conf") or 0.0), - t2_name=str(ev.get("top2_name") or ""), - t2_conf=float(ev.get("top2_conf") or 0.0), - t3_name=str(ev.get("top3_name") or ""), - t3_conf=float(ev.get("top3_conf") or 0.0), - t1_pid="", - t2_pid="", - t3_pid="", - ) - append_consumption_window( - surgery_id=surgery_id, - name_to_code=state.name_to_code, - best=snap, - doctor_id=bp.VIDEO_RESULT_DOCTOR_ID, - camera_id=str(ev.get("camera_id") or ""), - wall_start_epoch=float(ev.get("wall_start_epoch") or wall_hi), - wall_end_epoch=wall_hi, - since_recording_start=format_elapsed_mmss_since( - state.surgery_started_wall, - at_epoch=wall_hi, - ), - ) - return - if t == "needs_voice_confirm": - if ev.get("frozen", True) is False: - logger.debug("skip provisional needs_voice_confirm surgery={}", surgery_id) - return - opts_raw = ev.get("options") or [] - ranked: list[PredictionCandidate] = [] - if isinstance(opts_raw, list): - for o in opts_raw: - if isinstance(o, dict): - ranked.append( - PredictionCandidate( - str(o.get("label") or "").strip(), - float(o.get("confidence") or 0.0), - ) - ) - cid_in = str(ev.get("confirmation_id") or "").strip() - ret = await self._registry.enqueue_pending_confirmation( - state, - ranked, - top_key=str(ev.get("model_top1_label") or ""), - top_confidence=float(ev.get("model_top1_confidence") or 0.0), - confirmation_id=cid_in or None, - ) - if ret is None: - return - cls = ev.get("cls_top3") if isinstance(ev.get("cls_top3"), dict) else {} - snap = ClsTop3( - t1_name=str(cls.get("t1_name") or ""), - t1_conf=float(cls.get("t1_conf") or 0.0), - t2_name=str(cls.get("t2_name") or ""), - t2_conf=float(cls.get("t2_conf") or 0.0), - t3_name=str(cls.get("t3_name") or ""), - t3_conf=float(cls.get("t3_conf") or 0.0), - t1_pid=str(cls.get("t1_pid") or ""), - t2_pid=str(cls.get("t2_pid") or ""), - t3_pid=str(cls.get("t3_pid") or ""), - ) - wall_lo = float(ev.get("wall_start_epoch") or time.time()) - wall_hi = float(ev.get("wall_end_epoch") or time.time()) - cam = str(ev.get("camera_id") or "") - await self._registry.append_pending_consumption_detail( - state=state, - confirmation_id=ret, - doctor_id=bp.VIDEO_RESULT_DOCTOR_ID, - ) - pd_name = pending_display_item_name_for_confirmation(state.pending_by_id.get(ret)) - append_consumption_pending_window( - surgery_id=surgery_id, - confirmation_id=ret, - model_snap=snap, - doctor_id=bp.VIDEO_RESULT_DOCTOR_ID, - camera_id=cam, - wall_start_epoch=wall_lo, - wall_end_epoch=wall_hi, - item_display_name=pd_name, - since_recording_start=format_elapsed_mmss_since( - state.surgery_started_wall, - at_epoch=wall_hi, - ), - ) - hub = self._voice_hub - vtid = (state.voice_terminal_id or "").strip() - if hub is not None and vtid: - hub.schedule_notify_pending_head(vtid, surgery_id) - - async def _force_stop_subprocess(self, run: RunningSurgery, surgery_id: str) -> None: + async def _force_stop_run(self, run: RunningSurgery, surgery_id: str) -> None: run.stop_event.set() - proc = run.algo_process - if proc is None: - return - try: - proc.terminate() - await asyncio.wait_for(proc.wait(), timeout=12.0) - except asyncio.TimeoutError: - proc.kill() - except Exception as exc: - logger.warning("terminate algo subprocess surgery={}: {}", surgery_id, exc) + await self._slice_batch.drain(surgery_id, timeout=30.0) + for task in run.tasks: + task.cancel() + if run.tasks: + await asyncio.gather(*run.tasks, return_exceptions=True) def set_voice_terminal_id(self, surgery_id: str, terminal_id: str | None) -> None: run = self._registry.get_running(surgery_id) @@ -450,21 +322,16 @@ class CameraSessionManager: voice_tid = run.state.voice_terminal_id run.stop_event.set() - if run.algo_process is not None: - try: - run.algo_process.terminate() - await asyncio.wait_for(run.algo_process.wait(), timeout=20.0) - except asyncio.TimeoutError: - run.algo_process.kill() - except ProcessLookupError: - pass - except Exception as exc: - logger.warning("algo subprocess wait surgery={}: {}", surgery_id, exc) results = await asyncio.gather(*run.tasks, return_exceptions=True) for res in results: if isinstance(res, BaseException): - logger.warning("surgery task finished with error: {}", res) + logger.warning("surgery recorder task finished with error: {}", res) + + await self._slice_batch.drain( + surgery_id, + timeout=self._s.rtsp_slice_batch_drain_timeout_sec, + ) details = list(run.state.details) detail_rows = [ diff --git a/backend/app/services/video/slice_batch_processor.py b/backend/app/services/video/slice_batch_processor.py new file mode 100644 index 0000000..b1ab41c --- /dev/null +++ b/backend/app/services/video/slice_batch_processor.py @@ -0,0 +1,164 @@ +"""Queue RTSP MP4 slices through BatchAlgorithmService during live surgeries.""" + +from __future__ import annotations + +import asyncio +import time +from dataclasses import dataclass +from pathlib import Path + +from loguru import logger + +from app.algo_host.batch_service import BatchAlgorithmService +from app.services.video.rtsp_segment_recorder import SegmentCompleteEvent +from app.services.video.session_registry import SurgerySessionRegistry, SurgerySessionState +from app.services.voice_terminal_hub import VoiceTerminalHub + + +@dataclass(frozen=True) +class _SliceJob: + event: SegmentCompleteEvent + candidate_consumables: list[str] + state: SurgerySessionState + + +class SliceBatchProcessor: + def __init__( + self, + *, + batch_service: BatchAlgorithmService, + registry: SurgerySessionRegistry, + max_concurrent: int = 1, + drain_timeout_sec: float = 900.0, + ) -> None: + self._batch = batch_service + self._registry = registry + self._max_concurrent = max(1, int(max_concurrent)) + self._drain_timeout_sec = float(drain_timeout_sec) + self._queues: dict[str, asyncio.Queue[_SliceJob | None]] = {} + self._workers: dict[str, asyncio.Task[None]] = {} + self._pending: dict[str, int] = {} + self._voice_hub: VoiceTerminalHub | None = None + + def set_voice_terminal_hub(self, hub: VoiceTerminalHub | None) -> None: + self._voice_hub = hub + + def ensure_worker(self, surgery_id: str) -> None: + if surgery_id in self._workers and not self._workers[surgery_id].done(): + return + queue: asyncio.Queue[_SliceJob | None] = asyncio.Queue() + self._queues[surgery_id] = queue + self._pending[surgery_id] = 0 + self._workers[surgery_id] = asyncio.create_task( + self._worker_loop(surgery_id, queue), + name=f"slice_batch:{surgery_id}", + ) + + async def submit_slice( + self, + *, + surgery_id: str, + event: SegmentCompleteEvent, + candidate_consumables: list[str], + state: SurgerySessionState, + ) -> None: + self.ensure_worker(surgery_id) + queue = self._queues[surgery_id] + self._pending[surgery_id] = self._pending.get(surgery_id, 0) + 1 + await queue.put( + _SliceJob( + event=event, + candidate_consumables=list(candidate_consumables), + state=state, + ) + ) + + async def drain(self, surgery_id: str, *, timeout: float | None = None) -> None: + if surgery_id not in self._queues: + return + deadline = time.monotonic() + (timeout if timeout is not None else self._drain_timeout_sec) + while self._pending.get(surgery_id, 0) > 0: + if time.monotonic() >= deadline: + logger.warning( + "slice batch drain timeout surgery={} pending={}", + surgery_id, + self._pending.get(surgery_id, 0), + ) + break + await asyncio.sleep(0.2) + queue = self._queues.get(surgery_id) + if queue is not None: + await queue.put(None) + worker = self._workers.get(surgery_id) + if worker is not None: + try: + await asyncio.wait_for(worker, timeout=30.0) + except asyncio.TimeoutError: + worker.cancel() + self._queues.pop(surgery_id, None) + self._workers.pop(surgery_id, None) + self._pending.pop(surgery_id, None) + + async def _worker_loop(self, surgery_id: str, queue: asyncio.Queue[_SliceJob | None]) -> None: + sem = asyncio.Semaphore(self._max_concurrent) + while True: + job = await queue.get() + try: + if job is None: + return + async with sem: + await self._process_job(job) + finally: + queue.task_done() + if job is not None: + self._pending[surgery_id] = max(0, self._pending.get(surgery_id, 1) - 1) + + async def _process_job(self, job: _SliceJob) -> None: + event = job.event + mp4 = event.path + if not mp4.is_file() or mp4.stat().st_size <= 0: + logger.warning("skip empty slice surgery={} path={}", event.surgery_id, mp4) + return + logger.info( + "slice batch start surgery={} camera={} slice={} path={}", + event.surgery_id, + event.camera_id, + event.slice_index, + mp4, + ) + try: + result = await asyncio.to_thread( + self._batch.run, + surgery_id=event.surgery_id, + uploaded_video_path=mp4, + original_filename=mp4.name, + candidate_consumables=job.candidate_consumables, + include_visualization=False, + ) + except Exception as exc: + logger.error( + "slice batch failed surgery={} camera={} slice={}: {}", + event.surgery_id, + event.camera_id, + event.slice_index, + exc, + ) + return + from app.algo_host.slice_result_integrator import integrate_batch_slice_tsv + + await integrate_batch_slice_tsv( + tsv_path=result.output_path, + state=job.state, + surgery_id=event.surgery_id, + camera_id=event.camera_id, + slice_offset_sec=event.slice_offset_sec, + surgery_started_wall=job.state.surgery_started_wall, + registry=self._registry, + voice_hub=self._voice_hub, + ) + logger.info( + "slice batch complete surgery={} camera={} slice={}", + event.surgery_id, + event.camera_id, + event.slice_index, + ) diff --git a/backend/docker-compose.yml b/backend/docker-compose.yml index bc65c2d..3f15195 100644 --- a/backend/docker-compose.yml +++ b/backend/docker-compose.yml @@ -113,8 +113,9 @@ services: DEMO_CORS_ENABLED: ${DEMO_CORS_ENABLED:-true} DEMO_CORS_ORIGINS: ${DEMO_CORS_ORIGINS:-*} DEMO_ORCHESTRATOR_ENABLED: ${DEMO_ORCHESTRATOR_ENABLED:-false} - DEMO_ORCHESTRATOR_RTSP_PORT: ${DEMO_ORCHESTRATOR_RTSP_PORT:-18554} - DEMO_ORCHESTRATOR_RTSP_JSON_HOST: ${DOCKER_DEMO_ORCHESTRATOR_RTSP_JSON_HOST:-host.docker.internal} + RTSP_PRIMARY_CAMERA_ID: ${RTSP_PRIMARY_CAMERA_ID:-or-cam-03} + RTSP_RECORD_ALL_CAMERAS: ${RTSP_RECORD_ALL_CAMERAS:-false} + RTSP_SEGMENT_DURATION_SEC: ${RTSP_SEGMENT_DURATION_SEC:-120} # api 在 Compose 桥接网内反代 mediamtx-hls:8888;宿主机调试 HLS 仍用 127.0.0.1:18888 DEMO_HLS_PREVIEW_UPSTREAM: ${DEMO_HLS_PREVIEW_UPSTREAM:-http://mediamtx-hls:8888} DEMO_HLS_PREVIEW_CONFIG_DIR: ${DEMO_HLS_PREVIEW_CONFIG_DIR:-/hls-preview-config} diff --git a/backend/main.py b/backend/main.py index 1acee31..f7599e0 100644 --- a/backend/main.py +++ b/backend/main.py @@ -50,6 +50,10 @@ async def lifespan(app: FastAPI): "Database connection verified; ensure schema is applied with `alembic upgrade head` before serving traffic" ) from app.baked import pipeline as bp + from app.services.video.rtsp_segment_cleanup import ( + default_rtsp_segments_root, + purge_expired_rtsp_segments, + ) from app.services.video_batch_cleanup import ( purge_expired_pipeline_inputs, purge_expired_visualizations, @@ -66,6 +70,10 @@ async def lifespan(app: FastAPI): batch_root, ttl_hours=float(bp.VIDEO_BATCH_PIPELINE_INPUT_TTL_HOURS), ) + purge_expired_rtsp_segments( + default_rtsp_segments_root(base_dir=repo_root / "logs"), + ttl_hours=float(bp.RTSP_SEGMENT_TTL_HOURS), + ) container = build_container(settings) app.state.container = container await container.start() @@ -104,15 +112,14 @@ def create_app() -> FastAPI: application.include_router(recording_demo.router) logger.info( - "Demo recording modes enabled: POST /internal/demo/simulated-start; " - "POST /internal/demo/offline-batch; flow={}", + "Demo offline batch enabled: POST /internal/demo/offline-batch; flow={}", recording_demo.OFFLINE_BATCH_FLOW_MARKER, ) else: logger.info( "Demo recording modes disabled (DEMO_ORCHESTRATOR_ENABLED=false): " "GET /internal/demo/recording-modes-status for status; " - "simulated-start and offline-batch are not registered", + "offline-batch is not registered", ) return application diff --git a/backend/tests/test_fastapi_algorithm_subprocess.py b/backend/tests/test_fastapi_algorithm_subprocess.py index 73aac19..3987e0e 100644 --- a/backend/tests/test_fastapi_algorithm_subprocess.py +++ b/backend/tests/test_fastapi_algorithm_subprocess.py @@ -2,16 +2,15 @@ 覆盖两条生产路径: 1. ``POST /internal/demo/offline-batch`` → ``BatchAlgorithmService`` → ``subprocess.run``(reference bundle ``main.py``) -2. ``POST /client/surgeries/start`` → ``CameraSessionManager`` → ``asyncio.create_subprocess_exec``(``python -m app.algorithm_runner``) +2. ``POST /client/surgeries/start`` → ``CameraSessionManager`` → ``RtspSegmentRecorder`` + ``SliceBatchProcessor`` """ from __future__ import annotations import json -import sys from pathlib import Path from typing import Any -from unittest.mock import MagicMock +from unittest.mock import AsyncMock, MagicMock import pytest import yaml @@ -23,7 +22,7 @@ from app.api import router as api_router from app.config import Settings from app.dependencies import build_container, get_surgery_pipeline, get_voice_terminal_hub from app.routers import recording_demo -from app.services.video.session_manager import CameraSessionManager +from app.services.video.rtsp_segment_recorder import RtspSegmentRecorder from app.algo_host.batch_service import BatchAlgorithmService from app.algo_host.subprocess_runner import build_batch_main_command from tests.reference_bundle_fixtures import complete_result_tsv_body, write_minimal_reference_bundle @@ -125,7 +124,7 @@ def test_video_batch_endpoint_invokes_reference_bundle_subprocess( @pytest.mark.asyncio -async def test_start_surgery_endpoint_spawns_algorithm_runner_subprocess( +async def test_start_surgery_endpoint_starts_rtsp_segment_recorders( monkeypatch: pytest.MonkeyPatch, sqlite_session_factory, tmp_path: Path, @@ -142,6 +141,7 @@ async def test_start_surgery_endpoint_spawns_algorithm_runner_subprocess( settings = Settings( video_rtsp_url_template="rtsp://lab/{camera_id}/live", video_open_timeout_sec=5.0, + rtsp_primary_camera_id="or-cam-03", ) container = build_container(settings, session_factory=sqlite_session_factory) @@ -153,39 +153,22 @@ async def test_start_surgery_endpoint_spawns_algorithm_runner_subprocess( ) -> tuple[str, int | None, bool]: return f"rtsp://unittest/{camera_id}/live", None, False - monkeypatch.setattr(CameraSessionManager, "_resolve_rtsp_url", _fake_resolve_rtsp) - - captured_cmds: list[list[str]] = [] - - async def fake_create_subprocess_exec(*cmd: str, **kwargs: Any): - captured_cmds.append(list(cmd)) - events_idx = list(cmd).index("--events-jsonl") + 1 - events_path = Path(cmd[events_idx]) - events_path.parent.mkdir(parents=True, exist_ok=True) - events_path.write_text( - json.dumps({"type": "ready", "camera_id": "cam1"}) + "\n", - encoding="utf-8", - ) - proc = MagicMock() - proc.returncode = 0 - proc.terminate = MagicMock() - proc.kill = MagicMock() - - async def _wait() -> int: - return 0 - - proc.wait = _wait - return proc - monkeypatch.setattr( - "app.services.video.session_manager.asyncio.create_subprocess_exec", - fake_create_subprocess_exec, + "app.services.video.session_manager.CameraSessionManager._resolve_rtsp_url", + _fake_resolve_rtsp, ) - async def _noop_tail(self, *args: Any, **kwargs: Any) -> None: - return None + recorder_starts: list[tuple[str, str]] = [] - monkeypatch.setattr(CameraSessionManager, "_tail_algo_events", _noop_tail) + async def fake_recorder_run(self: RtspSegmentRecorder, stop_event: Any) -> None: + recorder_starts.append((self._surgery_id, self._camera_id)) + if self._ready_event is not None: + self._ready_event.set() + await stop_event.wait() + + monkeypatch.setattr(RtspSegmentRecorder, "run", fake_recorder_run) + + container.camera_session_manager._slice_batch.drain = AsyncMock() async def _instant_sleep(_delay: float) -> None: return None @@ -212,7 +195,7 @@ async def test_start_surgery_endpoint_spawns_algorithm_runner_subprocess( "/client/surgeries/start", json={ "surgery_id": surgery_id, - "camera_ids": ["cam1"], + "camera_ids": ["cam1", "or-cam-03"], "candidate_consumables": ["纱布"], }, ) @@ -222,20 +205,12 @@ async def test_start_surgery_endpoint_spawns_algorithm_runner_subprocess( end = await client.post("/client/surgeries/end", json={"surgery_id": surgery_id}) assert end.status_code == 200, end.text - assert len(captured_cmds) == 1, "expected one algorithm_runner subprocess spawn" - cmd = captured_cmds[0] - assert cmd[0] == sys.executable - assert cmd[1:3] == ["-m", "app.algorithm_runner"] - assert cmd[cmd.index("--source") + 1] == "rtsp://unittest/or-cam-03/live" - assert cmd[cmd.index("--source-mode") + 1] == "realtime" - assert cmd[cmd.index("--surgery-id") + 1] == surgery_id - assert cmd[cmd.index("--camera-id") + 1] == "or-cam-03" + assert len(recorder_starts) == 1 + assert recorder_starts[0] == (surgery_id, "or-cam-03") - whitelist_path = Path(cmd[cmd.index("--whitelist-json") + 1]) + whitelist_path = tmp_path / "logs" / f"surgery_{surgery_id}_whitelist.json" assert whitelist_path.is_file() whitelist = json.loads(whitelist_path.read_text(encoding="utf-8")) assert whitelist["candidate_consumables"] == ["纱布"] - events_path = Path(cmd[cmd.index("--events-jsonl") + 1]) - assert events_path.is_file() - assert '"type": "ready"' in events_path.read_text(encoding="utf-8") + container.camera_session_manager._slice_batch.drain.assert_awaited() diff --git a/backend/tests/test_recording_camera_policy.py b/backend/tests/test_recording_camera_policy.py new file mode 100644 index 0000000..873e90e --- /dev/null +++ b/backend/tests/test_recording_camera_policy.py @@ -0,0 +1,49 @@ +"""Tests for RTSP recording camera policy.""" + +from __future__ import annotations + +import pytest + +from app.config import Settings +from app.services.video.backend_resolver import BackendResolver +from app.services.video.recording_camera_policy import resolve_recording_cameras + + +def test_default_records_primary_only() -> None: + settings = Settings( + video_rtsp_url_template="rtsp://lab/{camera_id}", + rtsp_primary_camera_id="or-cam-03", + rtsp_record_all_cameras=False, + ) + resolver = BackendResolver(settings, hikvision_runtime=None) + out = resolve_recording_cameras( + ["or-cam-01", "or-cam-03"], + settings, + resolver=resolver, + ) + assert out == ["or-cam-03"] + + +def test_record_all_resolves_each_requested_camera() -> None: + settings = Settings( + video_rtsp_url_template="rtsp://lab/{camera_id}", + rtsp_primary_camera_id="or-cam-03", + rtsp_record_all_cameras=True, + ) + resolver = BackendResolver(settings, hikvision_runtime=None) + out = resolve_recording_cameras( + ["or-cam-01", "or-cam-03"], + settings, + resolver=resolver, + ) + assert out == ["or-cam-01", "or-cam-03"] + + +def test_record_all_skips_unresolvable_camera() -> None: + settings = Settings( + video_rtsp_url_template="", + rtsp_record_all_cameras=True, + ) + resolver = BackendResolver(settings, hikvision_runtime=None) + with pytest.raises(ValueError, match="no requested camera_id"): + resolve_recording_cameras(["missing-cam"], settings, resolver=resolver) diff --git a/backend/tests/test_recording_modes_status.py b/backend/tests/test_recording_modes_status.py index aba4304..884f079 100644 --- a/backend/tests/test_recording_modes_status.py +++ b/backend/tests/test_recording_modes_status.py @@ -13,6 +13,9 @@ def test_recording_modes_status_paths() -> None: res = client.get("/internal/demo/recording-modes-status") assert res.status_code == 200 body = res.json() - assert body["simulated_start_path"] == "/internal/demo/simulated-start" assert body["offline_batch_path"] == "/internal/demo/offline-batch" assert "demo_recording_modes_enabled" in body + assert "simulated_start_path" not in body + assert body["rtsp_segment_ttl_hours"] == 24.0 + assert body["rtsp_segment_duration_sec"] == 120.0 + assert body["video_batch_vis_ttl_hours"] == 24.0 diff --git a/backend/tests/test_rtsp_segment_cleanup.py b/backend/tests/test_rtsp_segment_cleanup.py new file mode 100644 index 0000000..e3c982c --- /dev/null +++ b/backend/tests/test_rtsp_segment_cleanup.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +import os +import time +from pathlib import Path + +from app.services.video.rtsp_segment_cleanup import ( + default_rtsp_segments_root, + purge_expired_rtsp_segments, +) + + +def test_purge_expired_rtsp_segments_removes_old_slice(tmp_path: Path) -> None: + root = default_rtsp_segments_root(base_dir=tmp_path / "logs") + mp4 = root / "123456" / "or-cam-03" / "slice_0000.mp4" + mp4.parent.mkdir(parents=True) + mp4.write_bytes(b"slice") + old = time.time() - (25 * 3600) + os.utime(mp4, (old, old)) + + removed = purge_expired_rtsp_segments(root, ttl_hours=24.0) + + assert removed == 1 + assert not mp4.exists() + assert not (root / "123456" / "or-cam-03").exists() + + +def test_purge_expired_rtsp_segments_keeps_recent_slice(tmp_path: Path) -> None: + root = default_rtsp_segments_root(base_dir=tmp_path / "logs") + mp4 = root / "123456" / "or-cam-03" / "slice_0001.mp4" + mp4.parent.mkdir(parents=True) + mp4.write_bytes(b"slice") + + removed = purge_expired_rtsp_segments(root, ttl_hours=24.0) + + assert removed == 0 + assert mp4.is_file() diff --git a/backend/tests/test_session_manager_unit.py b/backend/tests/test_session_manager_unit.py index 569cbe8..d0f6af4 100644 --- a/backend/tests/test_session_manager_unit.py +++ b/backend/tests/test_session_manager_unit.py @@ -138,85 +138,6 @@ async def test_resolve_reject_keeps_pending_detail_and_queue() -> None: assert st.pending_fifo == [pid] -@pytest.mark.asyncio -async def test_apply_segment_confirmed_event() -> None: - settings = Settings() - mgr = CameraSessionManager( - settings=settings, - hikvision_runtime=None, - result_repository=None, - ) - st = SurgerySessionState( - candidate_consumables=["纱布"], - name_to_code={"纱布": "HC1"}, - ) - wall = 1_700_000_000.0 - ev = { - "type": "segment_confirmed", - "cooldown_key": "k1", - "item_id": "HC1", - "item_name": "纱布", - "qty": 1, - "wall_start_epoch": wall - 1, - "wall_end_epoch": wall, - "camera_id": "cam01", - "top1_conf": 0.95, - "top2_name": "", - "top2_conf": 0.0, - "top3_name": "", - "top3_conf": 0.0, - } - await mgr._apply_algo_event("123456", st, ev) - assert len(st.details) == 1 - assert st.details[0].item_name == "纱布" - assert st.details[0].source == "algo_subprocess" - - -@pytest.mark.asyncio -async def test_apply_needs_voice_confirm_event(monkeypatch: pytest.MonkeyPatch) -> None: - monkeypatch.setattr(bp, "VIDEO_AUTO_CONFIRM_CONFIDENCE", 0.99) - settings = Settings() - mgr = CameraSessionManager( - settings=settings, - hikvision_runtime=None, - result_repository=None, - ) - hub = MagicMock() - mgr.set_voice_terminal_hub(hub) - st = SurgerySessionState( - candidate_consumables=["缝线"], - name_to_code={}, - voice_terminal_id="T1", - ) - ev = { - "type": "needs_voice_confirm", - "confirmation_id": "cid-voice-1", - "model_top1_label": "纱布", - "model_top1_confidence": 0.91, - "options": [{"label": "纱布", "confidence": 0.91}, {"label": "缝线", "confidence": 0.05}], - "wall_start_epoch": 100.0, - "wall_end_epoch": 110.0, - "camera_id": "cam01", - "cls_top3": { - "t1_name": "纱布", - "t1_conf": 0.91, - "t2_name": "缝线", - "t2_conf": 0.05, - "t3_name": "", - "t3_conf": 0.0, - "t1_pid": "", - "t2_pid": "", - "t3_pid": "", - }, - } - await mgr._apply_algo_event("123456", st, ev) - assert len(st.pending_fifo) == 1 - assert st.pending_fifo[0] == "cid-voice-1" - assert len(st.details) == 1 - assert st.details[0].item_name == "纱布(待确认)" - hub.schedule_notify_pending_head.assert_called_once() - - @pytest.mark.asyncio async def test_archive_retry_loop_starts() -> None: settings = Settings() diff --git a/backend/tests/test_slice_result_integrator.py b/backend/tests/test_slice_result_integrator.py new file mode 100644 index 0000000..76f7b8b --- /dev/null +++ b/backend/tests/test_slice_result_integrator.py @@ -0,0 +1,124 @@ +"""Tests for batch slice TSV integration thresholds.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from app.algo_host.slice_result_integrator import integrate_batch_slice_tsv +from app.baked import pipeline as bp +from app.services.video.session_registry import SurgerySessionRegistry, SurgerySessionState + + +def _write_tsv(path: Path, *, top1_conf: str) -> None: + path.write_text( + "\t".join( + [ + "rank", + "start_sec", + "end_sec", + "product_id_top1", + "top1_name", + "top1_conf", + "product_id_top2", + "top2_name", + "top2_conf", + "product_id_top3", + "top3_name", + "top3_conf", + ] + ) + + "\n" + + "\t".join( + [ + "1", + "1.0", + "2.0", + "P1", + "纱布", + top1_conf, + "", + "", + "", + "", + "", + "", + ] + ) + + "\n医生信息:未启用\n", + encoding="utf-8", + ) + + +@pytest.mark.asyncio +async def test_high_confidence_appends_confirmed_detail(tmp_path: Path) -> None: + tsv = tmp_path / "result.tsv" + _write_tsv(tsv, top1_conf="0.95") + registry = SurgerySessionRegistry() + state = SurgerySessionState( + candidate_consumables=["纱布"], + name_to_code={"纱布": "P1"}, + surgery_started_wall=1_700_000_000.0, + ) + await integrate_batch_slice_tsv( + tsv_path=tsv, + state=state, + surgery_id="123456", + camera_id="or-cam-03", + slice_offset_sec=0.0, + surgery_started_wall=state.surgery_started_wall, + registry=registry, + voice_hub=None, + ) + assert len(state.details) == 1 + assert state.details[0].item_name == "纱布" + assert state.pending_fifo == [] + + +@pytest.mark.asyncio +async def test_mid_confidence_enqueues_pending(tmp_path: Path) -> None: + tsv = tmp_path / "result.tsv" + _write_tsv(tsv, top1_conf="0.5") + registry = SurgerySessionRegistry() + state = SurgerySessionState( + candidate_consumables=["纱布"], + name_to_code={"纱布": "P1"}, + surgery_started_wall=1_700_000_000.0, + ) + await integrate_batch_slice_tsv( + tsv_path=tsv, + state=state, + surgery_id="123456", + camera_id="or-cam-03", + slice_offset_sec=0.0, + surgery_started_wall=state.surgery_started_wall, + registry=registry, + voice_hub=None, + ) + assert len(state.pending_fifo) == 1 + assert state.pending_fifo[0] in state.pending_by_id + + +@pytest.mark.asyncio +async def test_low_confidence_skipped(tmp_path: Path) -> None: + tsv = tmp_path / "result.tsv" + _write_tsv(tsv, top1_conf="0.1") + registry = SurgerySessionRegistry() + state = SurgerySessionState( + candidate_consumables=["纱布"], + name_to_code={"纱布": "P1"}, + surgery_started_wall=1_700_000_000.0, + ) + await integrate_batch_slice_tsv( + tsv_path=tsv, + state=state, + surgery_id="123456", + camera_id="or-cam-03", + slice_offset_sec=0.0, + surgery_started_wall=state.surgery_started_wall, + registry=registry, + voice_hub=None, + ) + assert state.details == [] + assert state.pending_fifo == [] diff --git a/backend/tests/test_voice_terminal_binding.py b/backend/tests/test_voice_terminal_binding.py index f92600a..cbb79e5 100644 --- a/backend/tests/test_voice_terminal_binding.py +++ b/backend/tests/test_voice_terminal_binding.py @@ -9,7 +9,6 @@ import pytest from app.config import Settings from app.or_site_config import ( load_or_site_config_from_path, - merge_video_rtsp_urls_into_file, parse_or_site_config_object, ) from app.services.voice_terminal_binding import VoiceTerminalBindingIndex @@ -115,17 +114,3 @@ def test_settings_video_map(tmp_path: Path) -> None: ) s = Settings(or_site_config_json_file=str(p)) assert s.video_rtsp_url_map() == {"c1": "rtsp://x"} - - -def test_merge_preserves_bindings(tmp_path: Path) -> None: - p = tmp_path / "site.json" - p.write_text( - '{"video_rtsp_urls":{"old":"rtsp://old"},' - '"voice_or_room_bindings":[{"or_room_id":"R","camera_ids":["x"],' - '"voice_terminal_id":"T"}]}', - encoding="utf-8", - ) - merge_video_rtsp_urls_into_file(p, {"x": "rtsp://127.0.0.1:1/p"}, replace_host="127.0.0.1") - cfg = load_or_site_config_from_path(p) - assert cfg.video_rtsp_urls == {"x": "rtsp://127.0.0.1:1/p"} - assert cfg.voice_bindings.resolve_terminal(["x"]) == "T" diff --git a/clients/README.md b/clients/README.md index 75ed628..e8c2e01 100644 --- a/clients/README.md +++ b/clients/README.md @@ -4,7 +4,7 @@ | 目录 | 用途 | 默认端口 | |------|------|----------| -| [demo-client/](demo-client/) | 三链路联调(真 RTSP / 模拟实时 / 离线 batch) | 38081 | -| [voice-confirmation/](voice-confirmation/) | 语音确认终端页(链路 1/2 开录后使用) | 8080 | +| [demo-client/](demo-client/) | 联调(真 RTSP 切片 batch / 离线 batch);链路 1 落盘 slice 默认 24h TTL | 38081 | +| [voice-confirmation/](voice-confirmation/) | 语音确认终端页(链路 1 开录后使用) | 8080 | 启动前请先在 `backend/` 执行 `docker compose up -d --build`。 diff --git a/clients/demo-client/README.md b/clients/demo-client/README.md index 4b19bbf..22976e8 100755 --- a/clients/demo-client/README.md +++ b/clients/demo-client/README.md @@ -1,55 +1,32 @@ # Demo Client · 联调台 -浏览器联调页,覆盖三条录制链路。语音待确认请使用 [`../voice-confirmation/`](../voice-confirmation/)(默认 :8080)。 +浏览器联调页,覆盖两条录制链路。语音待确认请使用 [`../voice-confirmation/`](../voice-confirmation/)(默认 :8080)。 -## 三条链路 +## 两条链路 | 模式 | 操作 | API | 语音 | 结束手术 | |------|------|-----|------|----------| | **链路 1 · 真摄像头** | 填 camera_id → 开始手术 | `POST /client/surgeries/start` | 需要 | 需要 | -| **链路 2 · 模拟实时** | 选满 N 路视频 → 开始模拟开录 | `POST /internal/demo/simulated-start` | 需要 | 需要 | | **链路 3 · 离线精确** | 选 MP4 → 上传并处理 | `POST /internal/demo/offline-batch` | 无 | 不需要 | -链路 2/3 需 `DEMO_ORCHESTRATOR_ENABLED=true`;链路 2 还需可写 `OR_SITE_CONFIG_JSON_FILE`。页顶「刷新状态」可查看 API 与 Demo 模式是否就绪。 +链路 3 需 `DEMO_ORCHESTRATOR_ENABLED=true`。页顶「刷新状态」可查看 API 与 Demo 模式是否就绪。 ## 界面说明 -- **模式卡片**:点选切换,只显示当前模式相关配置 -- **耗材**:标签 chip 多选;「高级」可编辑 JSON -- **链路 2**:默认只需上传视频;RTSP 路径与 camera_id 在「高级」折叠 -- **链路 3**:独立 MP4 上传区,可选生成标注视频 -- **开发者日志**:右侧折叠,记录完整 HTTP 请求/响应 +- **链路 1**:填写 `camera_ids`(逗号分隔);默认仅 `or-cam-03` 参与 RTSP 录像切片与 batch 算法。服务端落盘 `slice_*.mp4` 仅用于推理,默认 **24 小时**后自动删除(`RTSP_SEGMENT_TTL_HOURS`),不影响已入库消耗明细;页顶「刷新状态」会显示当前切片间隔与保留时长。 +- **链路 3**:独立 MP4 上传区,可选生成标注视频(独立 TTL:`VIDEO_BATCH_VIS_TTL_HOURS`,默认 24 小时) -## 运行 +详见 [`docs/video-backends.md`](../../docs/video-backends.md)。 + +## 启动 ```bash -cd backend && docker compose up -d --build -cd ../clients/demo-client && ./start.sh -open http://127.0.0.1:38081/ +cd clients/demo-client +python3 -m http.server 38081 ``` -「API 地址」默认 `http://127.0.0.1:38080`;从局域网访问 demo 页时会自动改用当前主机 IP。 +浏览器打开 `http://127.0.0.1:38081/`,API 地址填后端(默认 `http://127.0.0.1:38080`)。 -## 文件 +## HLS 预览(链路 1) -``` -clients/demo-client/ - index.html # 页面骨架 - styles.css # 样式 - app.js # 逻辑 - server.py # 静态服务 + GET /labels.json - labels.yaml # 耗材标签(与后端同步) - start.sh -``` - -## 手跑假 RTSP(链路 2 高级) - -```bash -python3 fake_rtsp_from_file.py /path/to/video.mp4 --port 18554 --path demo -``` - -详见 [`../../docs/video-backends.md`](../../docs/video-backends.md)。 - -## CORS - -跨域访问 API 时设置 `DEMO_CORS_ENABLED=true`。 +真 RTSP 可通过 MediaMTX 转 HLS 在页内预览;点击「启动 / 刷新预览」。 diff --git a/clients/demo-client/app.js b/clients/demo-client/app.js index 04df945..28c6aa0 100644 --- a/clients/demo-client/app.js +++ b/clients/demo-client/app.js @@ -5,8 +5,6 @@ "use strict"; const $ = (id) => document.getElementById(id); - const DEF_CAMS = ["or-cam-03", "or-cam-02", "or-cam-04", "or-cam-01"]; - const DEF_RP = ["demo1", "demo2", "demo3", "demo4"]; let activeMode = "live-rtsp"; let allLabels = []; @@ -14,8 +12,7 @@ let lastVideoBatchDoctorDisplay = ""; let videoVisToken = 0; const hlsPlayers = {}; - const slotBlobUrls = {}; - const webcamSlotState = {}; + let serverRecordingConfig = null; const baseUrl = () => $("base-url").value.trim().replace(/\/+$/, ""); const surgeryId = () => $("surgery-id").value.trim(); @@ -101,6 +98,71 @@ return m + " 分 " + s + " 秒"; } + function formatHoursLabel(hours) { + const n = Number(hours); + if (!Number.isFinite(n) || n <= 0) return "—"; + return Number.isInteger(n) ? `${n} 小时` : `${n} 小时`; + } + + function applyRecordingConfigHints(data) { + if (!data || typeof data !== "object") return; + serverRecordingConfig = data; + + const primary = String(data.rtsp_primary_camera_id || "or-cam-03").trim() || "or-cam-03"; + const duration = formatDurationSec(data.rtsp_segment_duration_sec ?? 120); + const sliceTtl = formatHoursLabel(data.rtsp_segment_ttl_hours ?? 24); + const visTtl = formatHoursLabel(data.video_batch_vis_ttl_hours ?? 24); + const recordAll = data.rtsp_record_all_cameras === true; + + const liveHint = $("live-rtsp-segment-hint"); + if (liveHint) { + const cameraLine = recordAll + ? "请求中所有可解析 RTSP 的机位均参与录像" + : `默认仅录制 ${primary} 机位`; + liveHint.textContent = + `${cameraLine},每 ${duration} 切片跑 batch;` + + `服务端落盘 slice 仅用于推理,${sliceTtl} 后自动删除(不影响已入库明细)。` + + "下方可预览每路画面。"; + } + + const visLabel = $("offline-batch-vis-label"); + if (visLabel) { + visLabel.textContent = `生成标注视频(${visTtl} 内可预览)`; + } + } + + function updateDemoModesPill(data, resOk) { + const pill = $("pill-demo-modes"); + if (!pill) return; + + if (activeMode === "live-rtsp") { + if (!resOk || !data) { + pill.textContent = "链路 1 配置未拉取"; + pill.className = "pill err"; + return; + } + const duration = formatDurationSec(data.rtsp_segment_duration_sec ?? 120); + const sliceTtl = formatHoursLabel(data.rtsp_segment_ttl_hours ?? 24); + pill.textContent = `切片 ${duration} · 保留 ${sliceTtl}`; + pill.className = "pill ok"; + return; + } + + if (!resOk || !data) { + pill.textContent = "状态拉取失败"; + pill.className = "pill err"; + return; + } + const on = data.demo_recording_modes_enabled === true || data.orchestrator_enabled === true; + if (on) { + pill.textContent = "链路 3 已就绪"; + pill.className = "pill ok"; + } else { + pill.textContent = "DEMO 未开启"; + pill.className = "pill err"; + } + } + function showOfflineBatchTiming(textSec, videoSec, totalSec, videoStatus) { const el = $("offline-batch-timing"); if (!el) return; @@ -362,20 +424,11 @@ const btnStart = $("btn-start"); const btnEnd = $("btn-end"); if (btnStart) { - btnStart.textContent = - mode === "offline-batch" - ? "上传并处理" - : mode === "live-simulated" - ? "开始模拟开录" - : "开始手术"; + btnStart.textContent = mode === "offline-batch" ? "上传并处理" : "开始手术"; } if (btnEnd) btnEnd.disabled = mode === "offline-batch"; $("pill-mode").textContent = - mode === "live-rtsp" - ? "链路 1 · 真摄像头" - : mode === "live-simulated" - ? "链路 2 · 模拟实时" - : "链路 3 · 离线精确"; + mode === "live-rtsp" ? "链路 1 · 真摄像头" : "链路 3 · 离线精确"; if (mode !== "offline-batch") hideVideoBatchVisualization(); refreshRecordingModesStatus(); if (mode === "live-rtsp") { @@ -402,35 +455,6 @@ return parseCameraIdsInput().map((id) => ({ id, label: id, source: "rtsp" })); } - function revokeSlotBlobUrls() { - for (const key of Object.keys(slotBlobUrls)) { - URL.revokeObjectURL(slotBlobUrls[key]); - delete slotBlobUrls[key]; - } - } - - function updateSlotFilePreview(slot, file) { - const vid = $("sim-preview-" + slot); - if (!vid) return; - if (slotBlobUrls[slot]) { - URL.revokeObjectURL(slotBlobUrls[slot]); - delete slotBlobUrls[slot]; - } - if (!file) { - vid.removeAttribute("src"); - vid.classList.add("hidden"); - vid.pause(); - return; - } - const url = URL.createObjectURL(file); - slotBlobUrls[slot] = url; - vid.src = url; - vid.classList.remove("hidden"); - vid.loop = true; - vid.muted = true; - void vid.play().catch(() => {}); - } - function setPreviewStatus(text, state) { const status = $("preview-status"); if (!status) return; @@ -620,39 +644,19 @@ } async function refreshRecordingModesStatus() { - const pill = $("pill-demo-modes"); - if (activeMode === "live-rtsp") { - if (pill) { - pill.textContent = "链路 2/3 未检测"; - pill.className = "pill"; - } - return; - } const url = baseUrl() + "/internal/demo/recording-modes-status"; try { const res = await fetch(url); const data = await res.json(); addLog("GET", url, res.status, data, { error: !res.ok }); - if (!pill) return; - const on = data.demo_recording_modes_enabled === true || data.orchestrator_enabled === true; - const fset = data.or_site_config_json_file_set === true; - if (!res.ok) { - pill.textContent = "状态拉取失败"; - pill.className = "pill err"; - return; - } - if (on && fset) { - pill.textContent = "链路 2/3 已就绪"; - pill.className = "pill ok"; - } else if (on) { - pill.textContent = "缺 OR_SITE_CONFIG"; - pill.className = "pill warn"; - } else { - pill.textContent = "DEMO 未开启"; - pill.className = "pill err"; + if (res.ok) { + applyRecordingConfigHints(data); } + updateDemoModesPill(data, res.ok); } catch (e) { - if (pill) { + updateDemoModesPill(null, false); + const pill = $("pill-demo-modes"); + if (pill && activeMode !== "live-rtsp") { pill.textContent = "状态失败"; pill.className = "pill err"; } @@ -794,36 +798,12 @@ void waitForVideoBatchVisualization(sid, urlPath, doctorDisplay); } - function getDebugStreamCount() { - const sel = $("debug-stream-count"); - const v = sel ? parseInt(sel.value, 10) : 1; - return v >= 1 && v <= 4 ? v : 1; - } - - function applyDebugStreamVisibility() { - const n = getDebugStreamCount(); - for (let i = 1; i <= 4; i++) { - const el = $("sim-stream-" + i); - if (el) el.classList.toggle("hidden", i > n); - } - } - function assignFileToInput(inputEl, file) { const dt = new DataTransfer(); dt.items.add(file); inputEl.files = dt.files; } - function setSlotFile(slot, file, source) { - const vfile = $("sim-vfile-" + slot); - const hint = $("sim-hint-" + slot); - const fname = $("sim-fname-" + slot); - if (vfile && file) assignFileToInput(vfile, file); - if (fname) fname.textContent = file ? file.name : "点击或拖放视频"; - if (hint) hint.textContent = file ? "已选:" + file.name + (source ? " (" + source + ")" : "") : ""; - updateSlotFilePreview(slot, file || null); - } - function setOfflineFile(file, source) { const vfile = $("offline-vfile"); const fname = $("offline-fname"); @@ -856,78 +836,6 @@ ); } - function pickMediaRecorderMime() { - if (typeof MediaRecorder === "undefined" || !MediaRecorder.isTypeSupported) return ""; - for (const m of [ - "video/webm;codecs=vp9,opus", - "video/webm;codecs=vp8,opus", - "video/webm", - "video/mp4", - ]) { - if (MediaRecorder.isTypeSupported(m)) return m; - } - return ""; - } - - async function toggleWebcamSlot(slot) { - const st = webcamSlotState[slot] || (webcamSlotState[slot] = {}); - const btn = $("sim-webcam-" + slot); - const hint = $("sim-hint-" + slot); - if (!st.recording) { - if (!navigator.mediaDevices?.getUserMedia) { - showBanner("需要 HTTPS 或 localhost 才能使用摄像头", "warn"); - return; - } - try { - const stream = await navigator.mediaDevices.getUserMedia({ - video: { facingMode: "user" }, - audio: true, - }); - const mime = pickMediaRecorderMime(); - st.chunks = []; - const rec = mime ? new MediaRecorder(stream, { mimeType: mime }) : new MediaRecorder(stream); - rec.ondataavailable = (ev) => { - if (ev.data?.size) st.chunks.push(ev.data); - }; - rec.start(250); - st.recording = true; - st.stream = stream; - st.recorder = rec; - if (btn) { - btn.textContent = "停止录制"; - btn.classList.add("warn"); - } - if (hint) hint.textContent = "录制中…"; - } catch (e) { - showBanner("无法打开摄像头:" + (e.message || String(e)), "err"); - } - return; - } - const rec = st.recorder; - const stream = st.stream; - st.recording = false; - st.recorder = null; - st.stream = null; - if (btn) { - btn.textContent = "摄像头"; - btn.classList.remove("warn"); - } - await new Promise((resolve) => { - rec.addEventListener("stop", resolve, { once: true }); - rec.stop(); - }); - stream?.getTracks().forEach((t) => t.stop()); - const blob = new Blob(st.chunks, { type: rec.mimeType || "video/webm" }); - st.chunks = []; - if (!blob.size) { - if (hint) hint.textContent = "未录到数据"; - return; - } - const ext = (rec.mimeType || "").includes("mp4") ? ".mp4" : ".webm"; - const file = new File([blob], "webcam-" + slot + "-" + Date.now() + ext, { type: blob.type }); - setSlotFile(slot, file, "摄像头"); - } - async function handleStart() { const sid = ensureSurgeryId(); if (!sid) return; @@ -984,50 +892,6 @@ return; } - if (mode === "live-simulated") { - const n = getDebugStreamCount(); - const files = []; - for (let i = 1; i <= n; i++) { - files.push($("sim-vfile-" + i)?.files?.[0]); - } - if (files.length !== n || !files.every(Boolean)) { - showBanner("请为路 1…" + n + " 全部选择视频文件", "err"); - return; - } - const fd = new FormData(); - fd.append("surgery_id", sid); - fd.append("video1", files[0], files[0].name); - if (n >= 2) fd.append("video2", files[1], files[1].name); - if (n >= 3) fd.append("video3", files[2], files[2].name); - if (n >= 4) fd.append("video4", files[3], files[3].name); - for (let i = 1; i <= 4; i++) { - fd.append( - "camera_" + i, - ($("adv-cam-" + i)?.value || DEF_CAMS[i - 1]).trim() || DEF_CAMS[i - 1], - ); - fd.append( - "rtsp_path_" + i, - ($("adv-rpath-" + i)?.value || DEF_RP[i - 1]).trim() || DEF_RP[i - 1], - ); - } - fd.append("candidate_consumables_json", JSON.stringify(candidateConsumables)); - const { res, body } = await apiMultipart( - "/internal/demo/simulated-start", - fd, - "simulated-start", - ); - if (!res.ok) { - const dup = body?.detail?.code === "SURGERY_ALREADY_RECORDING"; - showBanner( - dup ? "请勿重复开始:" + formatResultUnavailable(body) : "模拟开录失败:" + formatDetail(body), - "err", - ); - return; - } - showBanner("模拟开录已接受,请打开语音终端", "ok"); - return; - } - const camera_ids = $("camera-ids") .value.split(",") .map((s) => s.trim()) @@ -1136,29 +1000,6 @@ setActiveMode("live-rtsp"); } - function initSimulatedUploads() { - $("debug-stream-count")?.addEventListener("change", applyDebugStreamVisibility); - applyDebugStreamVisibility(); - for (let i = 1; i <= 4; i++) { - const pick = $("sim-pick-" + i); - const vfile = $("sim-vfile-" + i); - const zone = $("sim-zone-" + i); - if (pick && vfile) { - pick.onclick = () => vfile.click(); - vfile.onchange = () => { - const f = vfile.files?.[0]; - if (f) setSlotFile(i, f, "选择"); - }; - } - bindDropZone(zone, (f) => setSlotFile(i, f, "拖放")); - zone?.addEventListener("click", (ev) => { - if (ev.target.closest("button")) return; - vfile?.click(); - }); - $("sim-webcam-" + i)?.addEventListener("click", () => toggleWebcamSlot(i)); - } - } - function initOfflineUpload() { const zone = $("offline-zone"); const vfile = $("offline-vfile"); @@ -1193,7 +1034,6 @@ function init() { applyLanDefaultApiBase(); initModeCards(); - initSimulatedUploads(); initOfflineUpload(); initConsumables(); loadLabels(); @@ -1212,13 +1052,13 @@ $("camera-ids")?.addEventListener("input", () => { if (activeMode === "live-rtsp") startPreviewPolling(); }); - $("debug-stream-count")?.addEventListener("change", applyDebugStreamVisibility); $("btn-preview-refresh")?.addEventListener("click", () => startPreviewPolling()); $("btn-preview-stop")?.addEventListener("click", () => { void stopPreviewPolling(); }); refreshHealth(); + refreshRecordingModesStatus(); } if (document.readyState === "loading") { diff --git a/clients/demo-client/fake_rtsp_from_file.py b/clients/demo-client/fake_rtsp_from_file.py deleted file mode 100755 index 42df3c2..0000000 --- a/clients/demo-client/fake_rtsp_from_file.py +++ /dev/null @@ -1,281 +0,0 @@ -#!/usr/bin/env python3 -"""Publish local video file(s) to RTSP once per file (fake camera) for local dev. - -The Operation Room server only opens RTSP URLs (OpenCV); there is no video-upload API. -This script does NOT change the application backend: it runs ffmpeg + a small -RTSP server (MediaMTX); put the printed ``video_rtsp_urls`` into ``OR_SITE_CONFIG_JSON_FILE``. - -Requires: - - ffmpeg in PATH - - Docker; 默认拉取 ``MEDIAMTX_DOCKER_IMAGE``(DaoCloud 前缀的 bluenviron/mediamtx), - 或本地已拉取的镜像;也可用 PATH 中的 ``mediamtx`` 二进制(高级)。 - -Single stream (legacy):: - python3 fake_rtsp_from_file.py /path/to/video.mp4 - python3 fake_rtsp_from_file.py video.mp4 --port 18554 --path demo - -Multiple streams (one MediaMTX, one ffmpeg per camera; different RTSP path per stream):: - - python3 fake_rtsp_from_file.py --port 18554 \\ - --stream 'or-cam-01|./a.mp4|demo1' \\ - --stream 'or-cam-02|./b.mp4|demo2' - ---stream format: ``CAMERA_ID|FILE|RTSP_PATH`` (use quotes in shell; RTSP path is -the last segment, e.g. ``demo1`` -> ``rtsp://127.0.0.1:/demo1``). -""" - -from __future__ import annotations - -import argparse -import atexit -import json -import os -import signal -import shutil -import subprocess -import sys -import time -from pathlib import Path - -# 默认 DaoCloud 镜像前缀;可设 MEDIAMTX_DOCKER_IMAGE=bluenviron/mediamtx:latest 直连 Docker Hub -MEDIAMTX_IMAGE = os.environ.get( - "MEDIAMTX_DOCKER_IMAGE", - "m.daocloud.io/docker.io/bluenviron/mediamtx:latest", -) -CONTAINER_NAME = "orm-fake-rtsp-mediamtx" - - -def _has_docker() -> bool: - return shutil.which("docker") is not None - - -def _has_ffmpeg() -> bool: - return shutil.which("ffmpeg") is not None - - -def _stop_mediamtx_container() -> None: - if not _has_docker(): - return - try: - subprocess.run( - ["docker", "rm", "-f", CONTAINER_NAME], - capture_output=True, - check=False, - timeout=30, - ) - except (OSError, subprocess.SubprocessError): - pass - - -def _start_mediamtx_docker(host_port: int) -> bool: - _stop_mediamtx_container() - cmd = [ - "docker", - "run", - "-d", - "--name", - CONTAINER_NAME, - "-p", - f"127.0.0.1:{host_port}:8554", - MEDIAMTX_IMAGE, - ] - print("[fake-rtsp] Starting MediaMTX:", " ".join(cmd), file=sys.stderr) - try: - proc = subprocess.run(cmd, capture_output=True, text=True, timeout=120) - except (OSError, subprocess.SubprocessError) as exc: - print(f"[fake-rtsp] docker run failed: {exc}", file=sys.stderr) - return False - if proc.returncode != 0: - err = (proc.stderr or proc.stdout or "").strip() - print(f"[fake-rtsp] docker run exit {proc.returncode}: {err}", file=sys.stderr) - return False - atexit.register(_stop_mediamtx_container) - return True - - -def _parse_stream_arg(spec: str) -> tuple[str, Path, str]: - parts = spec.split("|", 2) - if len(parts) != 3: - raise ValueError(f"Invalid --stream {spec!r}; expected CAM|FILE|RTSP_PATH (three fields separated by |)") - cam = parts[0].strip() - fpath = Path(parts[1].strip()).expanduser() - rpath = parts[2].strip().strip("/") - if not cam: - raise ValueError("empty camera id in --stream") - if not rpath: - rpath = "demo" - return cam, fpath, rpath - - -def main() -> int: - parser = argparse.ArgumentParser( - description="Play each video file once to an RTSP URL (dev fake camera; no backend code change).", - ) - parser.add_argument( - "video", - nargs="?", - type=Path, - default=None, - help="(single-stream mode) Path to a video file", - ) - parser.add_argument( - "--path", - default="demo", - help="(single-stream mode) RTSP path segment (rtsp://host:port/)", - ) - parser.add_argument( - "--port", - type=int, - default=18554, - help="Host port mapped to MediaMTX RTSP (container internal 8554). Default: 18554", - ) - parser.add_argument( - "--stream", - action="append", - default=None, - help=("Multi-stream mode. Repeat for each camera. Format: CAM|FILE|RTSP_PATH e.g. or-cam-01|./a.mp4|demo1"), - ) - parser.add_argument( - "--no-docker", - action="store_true", - help="Do not start Docker; run MediaMTX yourself on the host port mapping.", - ) - args = parser.parse_args() - - if not _has_ffmpeg(): - print("ffmpeg not found in PATH. Install ffmpeg and retry.", file=sys.stderr) - return 1 - - streams: list[tuple[str, Path, str]] = [] - if args.stream: - for s in args.stream: - try: - streams.append(_parse_stream_arg(s)) - except ValueError as exc: - print(f"[fake-rtsp] {exc}", file=sys.stderr) - return 1 - elif args.video is not None: - fpath = args.video.resolve() - sp = (args.path or "demo").strip().strip("/") or "demo" - streams = [("or-cam-01", fpath, sp)] - else: - parser.error("Provide a video file (single mode) or one or more --stream CAM|FILE|RTSP_PATH") - - for cam, fpath, rpath in streams: - rp_file = fpath.resolve() - if not rp_file.is_file(): - print(f"File not found: {rp_file} (camera {cam!r})", file=sys.stderr) - return 1 - for ch in rpath: - if ch not in "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_.-": - print( - f"[fake-rtsp] RTSP path segment {rpath!r} for {cam!r} should be " - r"[a-zA-Z0-9_.-] only; adjust --path/--stream", - file=sys.stderr, - ) - return 1 - - host_port: int = args.port - if not args.no_docker: - if not _has_docker(): - print("Docker not found. Use --no-docker and start MediaMTX manually.", file=sys.stderr) - return 1 - if not _start_mediamtx_docker(host_port): - return 1 - print("[fake-rtsp] MediaMTX container started. Waiting for RTSP…", file=sys.stderr) - time.sleep(1.0) - else: - print( - f"[fake-rtsp] --no-docker: ensure an RTSP server is listening for publish on port {host_port}.", - file=sys.stderr, - ) - - procs: list[subprocess.Popen] = [] - url_map: dict[str, str] = {} - - for cam, fpath, stream_path in streams: - fp = fpath.resolve() - dest_url = f"rtsp://127.0.0.1:{host_port}/{stream_path}" - url_map[cam] = dest_url - publish_cmd: list[str] = [ - "ffmpeg", - "-hide_banner", - "-loglevel", - "info", - "-re", - "-i", - str(fp), - "-c", - "copy", - "-f", - "rtsp", - "-rtsp_transport", - "tcp", - dest_url, - ] - print("---", file=sys.stderr) - print(f"Publish {cam} -> {dest_url}", file=sys.stderr) - print(" " + " ".join(publish_cmd), file=sys.stderr) - p = subprocess.Popen(publish_cmd) # noqa: S603 - procs.append(p) - - site_doc = {"video_rtsp_urls": url_map, "voice_or_room_bindings": []} - print("---", file=sys.stderr) - print("RTSP mapping (per camera):", file=sys.stderr) - for k, u in url_map.items(): - print(f" {k}: {u}", file=sys.stderr) - print("", file=sys.stderr) - print( - "OR site config (merge video_rtsp_urls into OR_SITE_CONFIG_JSON_FILE; add voice_or_room_bindings as needed):", - file=sys.stderr, - ) - print(json.dumps(site_doc, ensure_ascii=False, indent=2), file=sys.stderr) - print("", file=sys.stderr) - print("If the server runs in Docker on Mac/Win, use host.docker.internal, e.g.:", file=sys.stderr) - for cam, u in url_map.items(): - h = u.replace("127.0.0.1", "host.docker.internal", 1) - print(f" {cam}: {h}", file=sys.stderr) - print("---", file=sys.stderr) - print( - "Fake RTSP running: each file plays once; script exits when ffmpeg ends " - "(Ctrl+C to stop early; MediaMTX container removed on exit).", - file=sys.stderr, - ) - - def on_sigint(_sig: int, _frame) -> None: - for p in procs: - if p.poll() is None: - p.terminate() - _stop_mediamtx_container() - raise SystemExit(130) - - signal.signal(signal.SIGINT, on_sigint) - signal.signal(signal.SIGTERM, on_sigint) - - try: - while True: - time.sleep(0.5) - for p in procs: - if p.poll() is not None: - print( - f"[fake-rtsp] ffmpeg ended (code {p.returncode}), stopping all.", - file=sys.stderr, - ) - raise KeyboardInterrupt - except KeyboardInterrupt: - pass - finally: - for p in procs: - if p.poll() is None: - p.terminate() - try: - p.wait(timeout=5) - except subprocess.TimeoutExpired: - p.kill() - _stop_mediamtx_container() - - return 0 - - -if __name__ == "__main__": - raise SystemExit(main()) diff --git a/clients/demo-client/index.html b/clients/demo-client/index.html index 73b743d..955e4ec 100755 --- a/clients/demo-client/index.html +++ b/clients/demo-client/index.html @@ -10,7 +10,7 @@

手术监控 · 联调台

-

三条链路:真摄像头实时、模拟实时、离线精确处理

+

两条链路:真摄像头 RTSP 切片 batch、离线精确处理

@@ -46,11 +46,7 @@
- - -
-
- -
- - - -
-
- 高级:每路 RTSP 路径与 camera_id -
-
-
-
-
-
-
-
-
-
-
-
-
+

+ 默认仅录制 3 号机位(or-cam-03)并每 2 分钟切片跑 batch;服务端落盘 slice 仅用于推理,24 小时后自动删除(不影响已入库明细)。下方可预览每路画面。 +

@@ -180,7 +93,7 @@
diff --git a/docs/video-backends.md b/docs/video-backends.md index c37b978..627fc2f 100755 --- a/docs/video-backends.md +++ b/docs/video-backends.md @@ -4,21 +4,29 @@ - **推荐**:`Linux x86_64` + **glibc**(与当前 `python:3.13-slim-bookworm` 一致)。 - **不推荐**:Alpine(musl)加载海康预编译 `.so` 往往失败。 -- 镜像已安装 **ffmpeg** 与 OpenCV 常用系统库,便于 `cv2.VideoCapture(..., cv2.CAP_FFMPEG)` 拉 RTSP。 +- 镜像已安装 **ffmpeg** 与 OpenCV 常用系统库,便于 RTSP 录像切片与预览。 ## RTSP 模式(默认) 1. 配置 `**camera_id` → RTSP URL** 映射: - `**OR_SITE_CONFIG_JSON_FILE**`(推荐):UTF-8 JSON 文件,**仅支持**站点对象:`{"video_rtsp_urls":{...},"voice_or_room_bindings":[...]}`。根级只允许这两个键;`voice_or_room_bindings` 可为 `[]`。见 `[app/resources/or_site_config.sample.json](../app/resources/or_site_config.sample.json)`。服务每次解析映射时会重新读文件,便于联调覆盖 `video_rtsp_urls`。 - `**VIDEO_RTSP_URL_TEMPLATE**`(可选):单模板字符串,可用 `{camera_id}`;在 `video_rtsp_urls` 未给出某路时使用。 -2. 调用 `POST /client/surgeries/start` 时,`camera_ids` 必须能在上述配置中解析出 RTSP 地址。 -3. **开录确认**:每路摄像头在超时内成功打开并读到**首帧**后,才认为该路已开录。 +2. 调用 `POST /client/surgeries/start` 时,`camera_ids` 用于语音终端绑定;**默认仅 `RTSP_PRIMARY_CAMERA_ID`(or-cam-03)** 参与 RTSP 录像与 batch 算法。 +3. **开录确认**:主摄 RTSP 录像进程在超时内成功连接并写入首段数据后,才认为开录成功。 + +## RTSP 录像切片(链路 1) + +- FastAPI 进程内 **ffmpeg** 从 RTSP 拉流,默认每 **120 秒**(`RTSP_SEGMENT_DURATION_SEC`)落盘一个 MP4 切片到 `logs/rtsp_segments/{surgery_id}/{camera_id}/`。 +- 每个完整切片通过 [`BatchAlgorithmService`](../backend/app/algo_host/batch_service.py) 子进程调用 `algorithm_subprocesses/5.15/main.py`(与链路 3 相同),解析 `result.tsv` 后写入活跃会话。 +- 停录时 flush 尾切片并等待 batch 队列 drain(`RTSP_SLICE_BATCH_DRAIN_TIMEOUT_SEC`)。 +- 落盘切片默认 **24 小时**后自动删除(`RTSP_SEGMENT_TTL_HOURS`;进程启动与后台定时 sweep)。 +- 设置 `RTSP_RECORD_ALL_CAMERAS=true` 可对请求中所有可解析 RTSP 的机位分别录像+跑 batch(多机位代码已预留)。 ## Docker 与 RTSP 地址 - **站点 JSON 中的局域网 IP**(如 `[or_site_config.sample.json](../app/resources/or_site_config.sample.json)` 的 `192.168.3.x`):API 在默认 **bridge** 网络下出站流量经 **宿主机** 转发,只要**宿主机**能访问该网段,容器内一般可直接使用相同 URL,无需改成 `172.x` 等。 -- **`127.0.0.1` / `localhost`**:在容器内指向**容器自身**。若 RTSP 服务跑在宿主机(含 `fake_rtsp_from_file.py`、本机 MediaMTX),URL 应使用 **`rtsp://host.docker.internal:<端口>/<路径>`**。[`backend/docker-compose.yml`](../backend/docker-compose.yml) 已为 `api` 服务配置 `extra_hosts: host.docker.internal:host-gateway`(Linux 兼容;macOS/Windows Desktop 通常已内置该主机名)。 -- **传输协议**:compose 默认设置环境变量 **`OPENCV_FFMPEG_CAPTURE_OPTIONS=rtsp_transport;tcp`**,使 OpenCV 经 FFmpeg 以 **TCP** 拉 RTSP,降低容器/NAT 下 UDP 丢包导致的首帧超时;可通过环境变量覆盖。 +- **`127.0.0.1` / `localhost`**:在容器内指向**容器自身**。若 RTSP 服务跑在宿主机或本机 MediaMTX,URL 应使用 **`rtsp://host.docker.internal:<端口>/<路径>`**。[`backend/docker-compose.yml`](../backend/docker-compose.yml) 已为 `api` 服务配置 `extra_hosts: host.docker.internal:host-gateway`(Linux 兼容;macOS/Windows Desktop 通常已内置该主机名)。 +- **传输协议**:compose 默认设置环境变量 **`OPENCV_FFMPEG_CAPTURE_OPTIONS=rtsp_transport;tcp`**,使 ffmpeg/OpenCV 经 **TCP** 拉 RTSP,降低容器/NAT 下 UDP 丢包导致的首帧超时;可通过环境变量覆盖。 ## 海康官方 SDK 模式(可选) @@ -37,11 +45,11 @@ SDK **不作为构建期依赖**:将厂商提供的 Linux x86_64 动态库挂 ## 推理与结果查询 -### 链路 1/2:实时(`python -m app.algorithm_runner`) +### 链路 1:RTSP 切片 + batch(`5.15/main.py`) -- 开录后子进程从 RTSP 读帧,经 ActionFormer gated 流水线(VideoSwin → 段检测 + 手检/好坏帧/耗材投票)产出 JSONL 事件。 +- 开录后 FastAPI 用 ffmpeg 录像并按 `RTSP_SEGMENT_DURATION_SEC` 切片;每段 MP4 经 batch 子进程推理,结果按 TSV 置信度写入会话。 - **候选耗材清单**(`candidate_consumables`):非空时**仅**清单内名称参与自动记账与待确认;**缺省或 `[]`** 时,用 `consumable_classifier_labels.yaml` 的 **全部类名**作为候选。 -- 当段内 Top1 置信度 **≥** `VIDEO_AUTO_CONFIRM_CONFIDENCE`(**默认 0.9**)且标签在候选清单内时,自动写入 `source=vision` 明细;中间区间产生 `needs_voice_confirm`,由语音终端 TTS/确认。 +- 当 Top1 置信度 **≥** `VIDEO_AUTO_CONFIRM_CONFIDENCE`(**默认 0.9**)且标签在候选清单内时,自动写入 `source=video_batch` 明细;中间区间入待确认队列,由语音终端 TTS/确认。 - 客户端 `GET /client/surgeries/{surgery_id}/pending-confirmation`,确认后 `POST .../pending-confirmation/{id}/resolve` 等。 ### 链路 3:离线 batch(`POST /internal/demo/offline-batch`) @@ -52,10 +60,8 @@ SDK **不作为构建期依赖**:将厂商提供的 Linux x86_64 动态库挂 ### 通用 -- `GET /client/surgeries/{surgery_id}/result` 仅在存在**至少一条**消耗明细时返回 200;否则 503 `RESULT_NOT_READY`。 - `GET /client/surgeries/{surgery_id}/result` 仅在存在**至少一条**消耗明细时返回 200;无明细(已开录但尚未记账、已结束但零消耗、或尚无归档等)返回 503 `RESULT_NOT_READY`。 - 同类物品写入受 `VIDEO_DETAIL_COOLDOWN_SEC` 节流。 -- RTSP 读帧连续失败达到 `VIDEO_READ_FAILURE_RECONNECT_THRESHOLD` 时会 `release` 并尝试重连,间隔 `VIDEO_RECONNECT_BACKOFF_SECONDS`。 ## Demo 联调台:HLS 浏览器预览 @@ -69,13 +75,12 @@ SDK **不作为构建期依赖**:将厂商提供的 Linux x86_64 动态库挂 | 链路 | 行为 | |------|------| | **链路 1**(真 RTSP) | `POST /internal/demo/hls-preview/ensure` 按 `or_site_config` 的 `video_rtsp_urls` 配置 MediaMTX 路径 | -| **链路 2**(模拟实时) | 不使用 HLS;联调台在各路槽位用本地 `