import asyncio
from collections.abc import Awaitable, Callable
from typing import Annotated, NoReturn

from fastapi import (
    APIRouter,
    Depends,
    File,
    HTTPException,
    Path,
    Query,
    Request,
    UploadFile,
    WebSocket,
    status,
)
from pydantic import BaseModel, Field
from fastapi.responses import JSONResponse, Response
from loguru import logger
from sqlalchemy.exc import SQLAlchemyError

from app.baked import pipeline as bp
from app.config import settings
from app.database import check_database
from app.dependencies import get_surgery_pipeline, get_voice_terminal_hub
from app.schemas import (
    HealthResponse,
    SurgeryApiResponse,
    SurgeryClientErrorResponse,
    SurgeryEndRequest,
    SurgeryPendingConfirmationResolveResponse,
    SurgeryPendingConfirmationResponse,
    SurgeryResultResponse,
    SurgeryStartRequest,
    VoiceTerminalAssignmentResponse,
    build_consumption_summary,
)
from app.services.recording_live import accept_live_recording
from app.services.hls_preview import HlsPreviewManager, fetch_hls_upstream
from app.services.rtsp_preview import capture_rtsp_jpeg_frame
from app.services.surgery_pipeline import SurgeryPipeline
from app.services.video.backend_resolver import BackendResolver
from app.services.voice_terminal_hub import VoiceTerminalHub
from app.surgery_errors import SurgeryPipelineError

router = APIRouter()

# ASR/parse failures after a WAV upload: answered with HTTP 200 + status=failed,
# and the pending item stays at the head of the FIFO so the desktop client can retry.
_RECOVERABLE_VOICE_RESOLVE_CODES = frozenset({"VOICE_ASR_FAILED", "VOICE_TEXT_EMPTY", "VOICE_PARSE_FAILED"})

# Pipeline error code -> HTTP status for the pending-confirmation endpoints.
# Codes not listed here are reported as HTTP 500.
_CONFIRMATION_STATUS_BY_CODE: dict[str, int] = {
    "CONFIRMATION_NOT_FOUND": status.HTTP_404_NOT_FOUND,
    "CONFIRMATION_NOT_ACTIVE": status.HTTP_404_NOT_FOUND,
    "CONFIRMATION_ALREADY_RESOLVED": status.HTTP_409_CONFLICT,
    "CONFIRMATION_INVALID": status.HTTP_422_UNPROCESSABLE_CONTENT,
    "VOICE_ASR_FAILED": status.HTTP_422_UNPROCESSABLE_CONTENT,
    "VOICE_PARSE_FAILED": status.HTTP_422_UNPROCESSABLE_CONTENT,
    "VOICE_TEXT_EMPTY": status.HTTP_422_UNPROCESSABLE_CONTENT,
    "TTS_TEXT_EMPTY": status.HTTP_422_UNPROCESSABLE_CONTENT,
    "TTS_ERROR": status.HTTP_503_SERVICE_UNAVAILABLE,
    "VOICE_AUDIO_INVALID": status.HTTP_422_UNPROCESSABLE_CONTENT,
    "MINIO_NOT_CONFIGURED": status.HTTP_503_SERVICE_UNAVAILABLE,
    "MINIO_UPLOAD_FAILED": status.HTTP_503_SERVICE_UNAVAILABLE,
    "BAIDU_NOT_CONFIGURED": status.HTTP_503_SERVICE_UNAVAILABLE,
}

# Shared declaration of the six-digit surgery id path parameter.
_SurgeryIdPath = Annotated[
    str,
    Path(
        min_length=6,
        max_length=6,
        pattern=r"^\d{6}$",
        description="手术 6 位号,仅允许 6 位数字。",
    ),
]


def _pipeline_error_detail(exc: SurgeryPipelineError, surgery_id: str) -> dict:
    """Build the HTTP error ``detail`` payload for a pipeline error.

    The payload carries ``code``, ``message`` and ``surgery_id``; when
    ``exc.extra`` is truthy its entries are merged in and may override those
    three base keys.
    """
    detail: dict = {
        "code": exc.code,
        "message": exc.message,
        "surgery_id": surgery_id,
    }
    if exc.extra:
        detail.update(exc.extra)
    return detail


def _raise_surgery_pipeline_http(exc: SurgeryPipelineError, surgery_id: str) -> NoReturn:
    """Re-raise ``exc`` as an HTTP 503 carrying the pipeline error detail.

    Raises:
        HTTPException: always, chained from ``exc``.
    """
    raise HTTPException(
        status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
        detail=_pipeline_error_detail(exc, surgery_id),
    ) from exc


def _raise_confirmation_http(exc: SurgeryPipelineError, surgery_id: str) -> NoReturn:
    """Re-raise ``exc`` as an HTTPException whose status depends on ``exc.code``.

    Raises:
        HTTPException: always, chained from ``exc``; the status comes from
            ``_CONFIRMATION_STATUS_BY_CODE`` and defaults to 500.
    """
    raise HTTPException(
        status_code=_CONFIRMATION_STATUS_BY_CODE.get(exc.code, status.HTTP_500_INTERNAL_SERVER_ERROR),
        detail=_pipeline_error_detail(exc, surgery_id),
    ) from exc


async def _call_recording_with_retries(
    factory: Callable[[], Awaitable[None]],
    *,
    max_attempts: int,
    delay_seconds: float,
    log_prefix: str,
) -> None:
    """Run a recording operation, retrying on ``SurgeryPipelineError``.

    Args:
        factory: zero-argument callable returning a fresh awaitable per attempt.
        max_attempts: total number of attempts; values below 1 are treated as 1
            so the operation is never silently skipped.
        delay_seconds: pause between consecutive attempts.
        log_prefix: prefix for the per-attempt warning log line.

    Raises:
        SurgeryPipelineError: when every attempt failed. It carries the last
            error's code, and its message has a retry note appended.
    """
    attempts = max(1, max_attempts)
    for attempt in range(1, attempts + 1):
        try:
            await factory()
        except SurgeryPipelineError as exc:
            if attempt == attempts:
                # NOTE(review): the wrapped error is built from code + message only,
                # so any ``extra`` on the original error is not forwarded — confirm
                # whether callers need it.
                raise SurgeryPipelineError(
                    exc.code,
                    f"{exc.message}(已重试 {attempts} 次仍失败)",
                ) from exc
            logger.warning(
                "{} attempt {}/{} failed ({}), retrying in {}s",
                log_prefix,
                attempt,
                attempts,
                exc.code,
                delay_seconds,
            )
            await asyncio.sleep(delay_seconds)
        else:
            return


# No docstring on purpose: this route has no explicit ``description=``, so FastAPI
# would publish a docstring as its OpenAPI description.
@router.get("/health", response_model=HealthResponse, tags=["health"])
async def health() -> HealthResponse | JSONResponse:
    # Liveness probe: 200 when the database check passes, 503 with a
    # "degraded" body when it raises SQLAlchemyError.
    logger.debug("Health check")
    try:
        await check_database()
    except SQLAlchemyError as exc:
        logger.warning("Health check: database unavailable: {}", exc)
        return JSONResponse(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            content={"status": "degraded", "database": "unavailable"},
        )
    return HealthResponse(status="ok", database="connected")


@router.get(
    "/internal/demo/recording-modes-status",
    tags=["demo"],
    summary="Demo 录制模式(链路 2/3)是否可用",
    description="供 demo 页探测;始终注册,不依赖 DEMO_ORCHESTRATOR_ENABLED。",
)
async def recording_modes_status() -> dict:
    """Report demo recording-mode settings read from ``settings``."""
    site_config_file = (settings.or_site_config_json_file or "").strip()
    enabled = bool(settings.demo_orchestrator_enabled)
    return {
        "demo_recording_modes_enabled": enabled,
        "orchestrator_enabled": enabled,
        "simulated_start_method": "POST",
        "simulated_start_path": "/internal/demo/simulated-start",
        "offline_batch_method": "POST",
        "offline_batch_path": "/internal/demo/offline-batch",
        "or_site_config_json_file_set": bool(site_config_file),
        "or_site_config_json_file": site_config_file or None,
        "orchestrator_rtsp_port": settings.demo_orchestrator_rtsp_port,
        "orchestrator_rtsp_json_host": settings.demo_orchestrator_rtsp_json_host,
        "hls_preview_port": settings.demo_hls_preview_port,
        "hls_preview_upstream": (settings.demo_hls_preview_upstream or "").strip() or None,
    }


# The three models below deliberately carry no docstrings: Pydantic would publish
# them as schema descriptions in the generated OpenAPI document.
class HlsPreviewEnsureRequest(BaseModel):
    # Request body of POST /internal/demo/hls-preview/ensure.
    camera_ids: list[str] = Field(min_length=1, description="需要预览的 camera_id 列表(仅链路 1 真 RTSP)")


class HlsPreviewCameraInfo(BaseModel):
    # One camera together with the proxied playlist URL handed to the browser.
    camera_id: str
    playlist_url: str


class HlsPreviewEnsureResponse(BaseModel):
    # Response body of POST /internal/demo/hls-preview/ensure.
    cameras: list[HlsPreviewCameraInfo]
    hls_preview_port: int


def _hls_playlist_proxy_path(camera_id: str) -> str:
    """Return the proxy route path of ``camera_id``'s top-level playlist."""
    return f"/internal/demo/hls-preview/{camera_id}/index.m3u8"


def _rewrite_hls_playlist(body: bytes, *, camera_id: str, proxy_origin: str) -> bytes:
    """Point upstream URLs inside an HLS playlist at this service's proxy route.

    The body is returned unchanged when there is no active preview session, when
    it is not UTF-8, when it lacks the ``#EXTM3U`` marker, or when the session
    reports an empty upstream base for the camera.
    """
    sess = HlsPreviewManager.active()
    if sess is None:
        return body
    try:
        text = body.decode("utf-8")
    except UnicodeDecodeError:
        return body
    if "#EXTM3U" not in text:
        return body
    upstream = sess.camera_upstream_base(camera_id)
    if not upstream:
        # str.replace("", x) would insert x between every character.
        return body
    proxy_base = f"{proxy_origin.rstrip('/')}/internal/demo/hls-preview/{camera_id}"
    return text.replace(upstream, proxy_base).encode("utf-8")


# No docstring on purpose (no explicit ``description=`` on this route).
@router.post(
    "/internal/demo/hls-preview/ensure",
    response_model=HlsPreviewEnsureResponse,
    tags=["demo"],
    summary="Demo:启动/登记 HLS 预览(MediaMTX)",
)
async def hls_preview_ensure(
    payload: HlsPreviewEnsureRequest,
    request: Request,
) -> HlsPreviewEnsureResponse:
    # Resolve an RTSP URL per requested camera, start the HLS preview in a worker
    # thread, then answer with the proxied playlist URL of every camera the
    # active session knows about.
    if not (settings.or_site_config_json_file or "").strip():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="OR_SITE_CONFIG_JSON_FILE is not set",
        )
    hls_port = int(settings.demo_hls_preview_port)
    upstream_url = (settings.demo_hls_preview_upstream or "").strip()
    resolver = BackendResolver(settings, hikvision_runtime=None)

    # camera_id -> RTSP URL; blank ids are skipped and duplicates collapse.
    sources: dict[str, str] = {}
    for raw_id in payload.camera_ids:
        cam = raw_id.strip()
        if not cam:
            continue
        try:
            sources[cam] = resolver.rtsp_url_for_camera(cam)
        except ValueError as exc:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=str(exc),
            ) from exc
    if not sources:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="camera_ids is empty",
        )

    try:
        await asyncio.to_thread(
            HlsPreviewManager.start,
            sources=sources,
            upstream_url=upstream_url,
            config_dir=settings.demo_hls_preview_config_dir,
            attached_container_name=settings.demo_hls_preview_container_name,
            ephemeral_access_host=settings.demo_hls_preview_host,
            ephemeral_port=hls_port,
        )
    except Exception as exc:
        logger.exception("HLS preview start failed: {}", exc)
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"HLS preview start failed: {exc}",
        ) from exc

    sess = HlsPreviewManager.active()
    if sess is None:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="HLS preview session not available",
        )

    origin = str(request.base_url).rstrip("/")
    # Iterate the de-duplicated ids so a repeated camera_id yields one entry.
    cameras = [
        HlsPreviewCameraInfo(
            camera_id=cam,
            playlist_url=origin + _hls_playlist_proxy_path(cam),
        )
        for cam in sources
        if cam in sess.path_by_camera
    ]
    if not cameras:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="no HLS preview paths for requested camera_ids",
        )
    return HlsPreviewEnsureResponse(cameras=cameras, hls_preview_port=hls_port)


# No docstring on purpose (no explicit ``description=`` on this route).
@router.delete(
    "/internal/demo/hls-preview/stop",
    tags=["demo"],
    summary="Demo:停止 HLS 预览 MediaMTX(拉流模式)",
)
async def hls_preview_stop() -> dict[str, str]:
    # HlsPreviewManager.stop runs in a worker thread so it cannot block the loop.
    await asyncio.to_thread(HlsPreviewManager.stop)
    return {"status": "stopped"}


# No docstring on purpose (no explicit ``description=`` on this route).
@router.get(
    "/internal/demo/hls-preview/{camera_id}/{path:path}",
    tags=["demo"],
    summary="Demo:反代 MediaMTX HLS(m3u8 / ts)",
)
async def hls_preview_proxy(
    camera_id: Annotated[str, Path(min_length=1, max_length=64)],
    path: str,
    request: Request,
) -> Response:
    # Fetch one playlist/segment from the upstream HLS server and relay it;
    # playlists are rewritten so follow-up requests come back through this proxy.
    sess = HlsPreviewManager.active()
    if sess is None or camera_id not in sess.path_by_camera:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="HLS preview not active for this camera_id",
        )
    subpath = path.strip() or "index.m3u8"
    # ``path`` is client-controlled and forwarded upstream: refuse parent-directory
    # segments so a request cannot address resources outside the camera's path.
    if ".." in subpath.split("/"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="invalid HLS preview path",
        )
    try:
        upstream = HlsPreviewManager.resolve_upstream_url(camera_id, subpath)
    except KeyError as exc:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="unknown camera") from exc

    try:
        body, ctype = await asyncio.to_thread(fetch_hls_upstream, upstream)
    except Exception as exc:
        logger.warning("HLS proxy fetch failed {}: {}", upstream, exc)
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"HLS upstream fetch failed: {exc}",
        ) from exc

    # Case-insensitive so e.g. "application/x-mpegURL" is recognised as a playlist.
    if subpath.endswith(".m3u8") or "mpegurl" in (ctype or "").lower():
        body = _rewrite_hls_playlist(
            body,
            camera_id=camera_id,
            proxy_origin=str(request.base_url).rstrip("/"),
        )
        ctype = "application/vnd.apple.mpegurl"
    return Response(
        content=body,
        media_type=ctype,
        headers={"Cache-Control": "no-store", "Access-Control-Allow-Origin": "*"},
    )


@router.get(
    "/internal/demo/rtsp-preview/{camera_id}/frame.jpg",
    tags=["demo"],
    summary="Demo:抓取一路 RTSP 当前帧(JPEG)",
    description=(
        "供联调台在链路 1/2 中预览各 camera_id 画面;从 OR_SITE_CONFIG_JSON_FILE 解析 RTSP URL。"
        "浏览器无法直接播放 RTSP,故由服务端拉流并返回单帧 JPEG。"
    ),
    responses={
        status.HTTP_404_NOT_FOUND: {"description": "无该 camera_id 的 RTSP 映射"},
        status.HTTP_503_SERVICE_UNAVAILABLE: {"description": "拉流或编码失败"},
    },
)
async def rtsp_preview_frame(
    camera_id: Annotated[
        str,
        Path(
            min_length=1,
            max_length=64,
            pattern=r"^[a-zA-Z0-9][a-zA-Z0-9._-]*$",
            description="站点 JSON video_rtsp_urls 中的 camera_id。",
        ),
    ],
) -> Response:
    """Grab a single JPEG frame from the RTSP stream mapped to ``camera_id``.

    Raises:
        HTTPException: 400 when the site config file setting is empty, 404 when
            the resolver has no RTSP URL for the camera, 503 when the capture
            raises.
    """
    if not (settings.or_site_config_json_file or "").strip():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="OR_SITE_CONFIG_JSON_FILE is not set",
        )
    resolver = BackendResolver(settings, hikvision_runtime=None)
    try:
        rtsp_url = resolver.rtsp_url_for_camera(camera_id)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=str(exc),
        ) from exc

    try:
        # The capture call is run in a worker thread to keep the event loop free.
        jpeg = await asyncio.to_thread(capture_rtsp_jpeg_frame, rtsp_url)
    except Exception as exc:
        logger.warning("rtsp preview failed camera_id={}: {}", camera_id, exc)
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"rtsp preview failed: {exc}",
        ) from exc
    return Response(
        content=jpeg,
        media_type="image/jpeg",
        headers={"Cache-Control": "no-store"},
    )


@router.post(
    "/client/surgeries/start",
    response_model=SurgeryApiResponse,
    responses={
        status.HTTP_503_SERVICE_UNAVAILABLE: {
            "description": ("未能在确认摄像头已开始录制后完成请求;录制子系统未就绪、开录未确认或发生故障。"),
            "model": SurgeryClientErrorResponse,
        },
    },
    tags=["client"],
    summary="开始手术",
    description=(
        "开始一台手术:服务端启动关联摄像头录制。仅在确认开录完成后返回 HTTP 200;否则按配置重试,仍失败则返回 503。"
    ),
)
async def start_surgery(
    payload: SurgeryStartRequest,
    pipeline: Annotated[SurgeryPipeline, Depends(get_surgery_pipeline)],
    voice_hub: Annotated[VoiceTerminalHub, Depends(get_voice_terminal_hub)],
) -> SurgeryApiResponse:
    """Start a surgery by starting camera recording, with configured retries.

    Raises:
        HTTPException: 503 when every recording-start attempt failed.
        RuntimeError: when the start call succeeded without producing a response.
    """
    logger.info(
        "Start surgery: surgery_id={}, cameras={}, candidates={}",
        payload.surgery_id,
        payload.camera_ids,
        payload.candidate_consumables,
    )
    accepted: SurgeryApiResponse | None = None

    async def _start() -> None:
        # The retry helper expects a None-returning factory, so the response is
        # handed back through the enclosing scope.
        nonlocal accepted
        accepted = await accept_live_recording(
            pipeline,
            voice_hub,
            surgery_id=payload.surgery_id,
            camera_ids=list(payload.camera_ids),
            candidate_consumables=list(payload.candidate_consumables),
            message="摄像头录制已开始,手术已启动。",
        )

    try:
        await _call_recording_with_retries(
            _start,
            max_attempts=bp.SURGERY_RECORDING_MAX_ATTEMPTS,
            delay_seconds=bp.SURGERY_RECORDING_RETRY_DELAY_SECONDS,
            log_prefix=f"Start surgery {payload.surgery_id}",
        )
    except SurgeryPipelineError as exc:
        _raise_surgery_pipeline_http(exc, payload.surgery_id)
    if accepted is None:
        # Explicit check instead of ``assert`` so it also holds under ``python -O``.
        raise RuntimeError("accept_live_recording returned no response")
    return accepted


@router.post(
    "/client/surgeries/end",
    response_model=SurgeryApiResponse,
    responses={
        status.HTTP_503_SERVICE_UNAVAILABLE: {
            "description": ("未能在确认摄像头已全部停止录制后完成请求;停录未确认、录制子系统未就绪或发生故障。"),
            "model": SurgeryClientErrorResponse,
        },
    },
    tags=["client"],
    summary="结束手术",
    description=(
        "结束一台手术:服务端停止关联摄像头录制。仅在确认停录完成后返回 HTTP 200;否则按配置重试,仍失败则返回 503。"
    ),
)
async def end_surgery(
    payload: SurgeryEndRequest,
    pipeline: Annotated[SurgeryPipeline, Depends(get_surgery_pipeline)],
    voice_hub: Annotated[VoiceTerminalHub, Depends(get_voice_terminal_hub)],
) -> SurgeryApiResponse:
    """End a surgery by stopping camera recording, with configured retries.

    When ``pipeline.stop_recording`` returns a truthy voice terminal id, the
    voice hub is told that the surgery ended.

    Raises:
        HTTPException: 503 when every recording-stop attempt failed.
    """
    logger.info("End surgery: surgery_id={}", payload.surgery_id)
    voice_terminal_id: str | None = None

    async def _stop() -> None:
        nonlocal voice_terminal_id
        voice_terminal_id = await pipeline.stop_recording(payload.surgery_id)

    try:
        await _call_recording_with_retries(
            _stop,
            max_attempts=bp.SURGERY_RECORDING_MAX_ATTEMPTS,
            delay_seconds=bp.SURGERY_RECORDING_RETRY_DELAY_SECONDS,
            log_prefix=f"End surgery {payload.surgery_id}",
        )
    except SurgeryPipelineError as exc:
        _raise_surgery_pipeline_http(exc, payload.surgery_id)
    if voice_terminal_id:
        await voice_hub.notify_end(voice_terminal_id, payload.surgery_id)
    return SurgeryApiResponse(
        surgery_id=payload.surgery_id,
        status="accepted",
        message="摄像头录制已停止,手术已结束。",
    )


@router.get(
    "/client/voice-terminals/{terminal_id}/assignment",
    response_model=VoiceTerminalAssignmentResponse,
    tags=["client"],
    summary="查询语音终端当前指派的手术",
    description="供桌面客户端在 WebSocket 不可用时的轮询兜底;与 WS 推送的 assignment 状态一致。",
)
async def get_voice_terminal_assignment(
    terminal_id: Annotated[str, Path(min_length=1, max_length=256)],
    hub: Annotated[VoiceTerminalHub, Depends(get_voice_terminal_hub)],
) -> VoiceTerminalAssignmentResponse:
    """Return the hub's current assignment for the whitespace-stripped terminal id."""
    tid = terminal_id.strip()
    return VoiceTerminalAssignmentResponse(
        voice_terminal_id=tid,
        active_surgery_id=hub.get_assignment(tid),
    )


@router.websocket("/client/voice-terminals/ws")
async def voice_terminal_websocket(
    websocket: WebSocket,
    terminal_id: Annotated[str, Query(..., min_length=1, max_length=256)],
) -> None:
    # Hands the socket to the voice terminal hub held by the app container.
    container = websocket.app.state.container
    await container.voice_terminal_hub.handle_websocket(websocket, terminal_id)


@router.get(
    "/client/surgeries/{surgery_id}/result",
    response_model=SurgeryResultResponse,
    responses={
        status.HTTP_503_SERVICE_UNAVAILABLE: {
            "description": ("结果尚不可查询:无至少一条消耗明细,或手术未开始、未开录成功、尚无可查归档等。"),
            "model": SurgeryClientErrorResponse,
        },
    },
    tags=["client"],
    summary="查询手术结果",
    description=(
        "根据手术 6 位号查询该台手术的耗材消耗明细(多行)及按物品汇总。"
        "手术进行中返回当前内存已记账结果;结束后返回数据库持久化结果。"
        "若无至少一条消耗明细(含已归档但明细为空)、手术从未开始或尚无可查归档,返回 503。"
        "使用 GET:只读、幂等。\n\n"
        "响应体 `details` 与 `summary` 的字段定义见模式 SurgeryConsumptionDetail / SurgeryConsumptionSummary;"
        "若服务端启用耗材 TSV 文本日志,文件明细列为 tab 分隔的 "
        "item_id、item_name、qty、doctor_id、timestamp(文末另有仅三列的汇总块 item_id、item_name、qty),"
        "与 HTTP JSON 字段一致。"
    ),
)
async def get_surgery_result(
    surgery_id: _SurgeryIdPath,
    pipeline: Annotated[SurgeryPipeline, Depends(get_surgery_pipeline)],
) -> SurgeryResultResponse:
    """Return the consumption details of a surgery plus their summary.

    Raises:
        HTTPException: 503 when the pipeline returns no details; the code and
            message come from ``pipeline.classify_result_unavailable``.
    """
    logger.info("Query surgery result: surgery_id={}", surgery_id)
    details = await pipeline.get_consumption_details_for_client(surgery_id)
    if not details:
        code, message = await pipeline.classify_result_unavailable(surgery_id)
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail={
                "code": code,
                "message": message,
                "surgery_id": surgery_id,
            },
        )
    return SurgeryResultResponse(
        surgery_id=surgery_id,
        status="completed",
        message="查询成功。",
        details=details,
        summary=build_consumption_summary(details),
    )


@router.get(
    "/client/surgeries/{surgery_id}/pending-confirmation",
    response_model=SurgeryPendingConfirmationResponse,
    responses={
        status.HTTP_404_NOT_FOUND: {
            "description": "当前无待确认项或手术未在进行。",
            "model": SurgeryClientErrorResponse,
        },
        status.HTTP_422_UNPROCESSABLE_CONTENT: {
            "description": "提示文本为空等导致无法合成播报。",
            "model": SurgeryClientErrorResponse,
        },
        status.HTTP_503_SERVICE_UNAVAILABLE: {
            "description": "百度语音未配置或 TTS 调用失败。",
            "model": SurgeryClientErrorResponse,
        },
    },
    tags=["client"],
    summary="拉取待确认耗材(含 TTS 音频)",
    description=(
        "返回当前 FIFO 队首的一条低置信度识别;`pending_queue_length` 为仍排队中的 pending 条数(含本条)。"
        "响应内 `prompt_audio_mp3_base64` 为与 `prompt_text` 一致的 MP3(Base64),客户端可直接解码播放。"
        "无待确认项时返回 404;提示文本为空为 422;未配置百度或 TTS 失败为 503(不返回空音频兜底)。"
        "医生确认后请使用 `POST .../resolve` 上传 WAV。"
    ),
)
async def get_pending_consumable_confirmation(
    surgery_id: _SurgeryIdPath,
    pipeline: Annotated[SurgeryPipeline, Depends(get_surgery_pipeline)],
) -> SurgeryPendingConfirmationResponse:
    """Return the pending confirmation the pipeline reports for ``surgery_id``.

    Raises:
        HTTPException: 404 when the pipeline returns ``None``; otherwise the
            status mapped from the pipeline error code.
    """
    try:
        payload = await pipeline.get_pending_confirmation_for_client(surgery_id)
    except SurgeryPipelineError as exc:
        _raise_confirmation_http(exc, surgery_id)
    if payload is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail={
                "code": "NO_PENDING_CONFIRMATION",
                "message": "当前无待确认的耗材识别,或手术未在进行。",
                "surgery_id": surgery_id,
            },
        )
    return payload


@router.post(
    "/client/surgeries/{surgery_id}/pending-confirmation/{confirmation_id}/resolve",
    response_model=SurgeryPendingConfirmationResolveResponse,
    responses={
        status.HTTP_404_NOT_FOUND: {"model": SurgeryClientErrorResponse},
        status.HTTP_409_CONFLICT: {"model": SurgeryClientErrorResponse},
        status.HTTP_422_UNPROCESSABLE_CONTENT: {"model": SurgeryClientErrorResponse},
        status.HTTP_503_SERVICE_UNAVAILABLE: {"model": SurgeryClientErrorResponse},
    },
    tags=["client"],
    summary="提交耗材确认结果(上传医生语音 WAV)",
    description=(
        "multipart/form-data 上传单个 WAV 文件(字段名 `audio`)。"
        "服务端将音频存入 MinIO、调用百度 ASR 识别、解析候选项并完成确认。"
        "解析并确认后记一条消耗明细;若语音表示否认全部候选则不记消耗。"
        "ASR/解析可重试失败时仍返回 HTTP 200,`status`=`failed`,队首待确认项不弹出,便于桌面端重试。"
    ),
)
async def resolve_pending_consumable_confirmation(
    surgery_id: _SurgeryIdPath,
    confirmation_id: Annotated[str, Path(min_length=1, max_length=128)],
    audio: Annotated[
        UploadFile,
        File(
            ...,
            description="医生语音 WAV 文件(建议 16kHz 单声道 PCM,其他格式将尝试 ffmpeg 转码)",
        ),
    ],
    pipeline: Annotated[SurgeryPipeline, Depends(get_surgery_pipeline)],
) -> SurgeryPendingConfirmationResolveResponse:
    """Resolve a pending confirmation from an uploaded WAV recording.

    Errors whose code is in ``_RECOVERABLE_VOICE_RESOLVE_CODES`` are answered
    with a normal response whose ``status`` is ``"failed"``; every other
    pipeline error becomes an HTTPException.

    Raises:
        HTTPException: 422 for an empty upload or a file name not ending in
            ``.wav``; otherwise the status mapped from the pipeline error code.
    """
    # NOTE(review): the whole upload is read into memory with no size cap here —
    # confirm a limit is enforced upstream.
    raw = await audio.read()
    if not raw:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail={
                "code": "VOICE_AUDIO_INVALID",
                "message": "音频文件为空。",
                "surgery_id": surgery_id,
            },
        )
    filename = (audio.filename or "voice.wav").strip()
    if not filename.lower().endswith(".wav"):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail={
                "code": "VOICE_AUDIO_INVALID",
                "message": "仅支持上传 .wav 文件。",
                "surgery_id": surgery_id,
            },
        )
    try:
        result = await pipeline.resolve_pending_confirmation_from_audio(
            surgery_id=surgery_id,
            confirmation_id=confirmation_id,
            wav_bytes=raw,
            filename=filename,
            content_type=audio.content_type,
        )
    except SurgeryPipelineError as exc:
        if exc.code not in _RECOVERABLE_VOICE_RESOLVE_CODES:
            _raise_confirmation_http(exc, surgery_id)
        extra = exc.extra or {}
        asr_text = extra.get("asr_text")
        audio_key = extra.get("audio_object_key")
        # Only string values are forwarded; anything else is reported as None.
        return SurgeryPendingConfirmationResolveResponse(
            surgery_id=surgery_id,
            confirmation_id=confirmation_id,
            status="failed",
            message=exc.message,
            resolved_label=None,
            rejected=False,
            asr_text=asr_text if isinstance(asr_text, str) else None,
            audio_object_key=audio_key if isinstance(audio_key, str) else None,
            error_code=exc.code,
        )
    return SurgeryPendingConfirmationResolveResponse(
        surgery_id=surgery_id,
        confirmation_id=confirmation_id,
        status="accepted",
        message=result.message,
        resolved_label=result.resolved_label,
        rejected=result.rejected,
        asr_text=result.asr_text,
        audio_object_key=result.audio_object_key,
        error_code=None,
    )