import asyncio
from collections.abc import Awaitable, Callable
from typing import Annotated, NoReturn

from fastapi import (
    APIRouter,
    Depends,
    File,
    HTTPException,
    Path,
    Query,
    UploadFile,
    WebSocket,
    status,
)
from fastapi.responses import JSONResponse
from loguru import logger
from sqlalchemy.exc import SQLAlchemyError

from app.baked import pipeline as bp
from app.config import settings
from app.database import check_database
from app.dependencies import get_surgery_pipeline, get_voice_terminal_hub
from app.schemas import (
    HealthResponse,
    SurgeryApiResponse,
    SurgeryClientErrorResponse,
    SurgeryEndRequest,
    SurgeryPendingConfirmationResolveResponse,
    SurgeryPendingConfirmationResponse,
    SurgeryResultResponse,
    SurgeryStartRequest,
    VoiceTerminalAssignmentResponse,
    build_consumption_summary,
)
from app.services.surgery_pipeline import SurgeryPipeline
from app.services.voice_terminal_hub import (
    VoiceTerminalHub,
    assign_voice_terminal_after_recording_started,
)
from app.surgery_errors import SurgeryPipelineError

router = APIRouter()

# Pipeline error code -> HTTP status used by the confirmation endpoints.
# Codes that are not listed here are reported as HTTP 500.
_CONFIRMATION_STATUS_BY_CODE: dict[str, int] = {
    "CONFIRMATION_NOT_FOUND": status.HTTP_404_NOT_FOUND,
    "CONFIRMATION_NOT_ACTIVE": status.HTTP_404_NOT_FOUND,
    "CONFIRMATION_ALREADY_RESOLVED": status.HTTP_409_CONFLICT,
    "CONFIRMATION_INVALID": status.HTTP_422_UNPROCESSABLE_CONTENT,
    "VOICE_ASR_FAILED": status.HTTP_422_UNPROCESSABLE_CONTENT,
    "VOICE_PARSE_FAILED": status.HTTP_422_UNPROCESSABLE_CONTENT,
    "VOICE_TEXT_EMPTY": status.HTTP_422_UNPROCESSABLE_CONTENT,
    "TTS_TEXT_EMPTY": status.HTTP_422_UNPROCESSABLE_CONTENT,
    "TTS_ERROR": status.HTTP_503_SERVICE_UNAVAILABLE,
    "VOICE_AUDIO_INVALID": status.HTTP_422_UNPROCESSABLE_CONTENT,
    "MINIO_NOT_CONFIGURED": status.HTTP_503_SERVICE_UNAVAILABLE,
    "MINIO_UPLOAD_FAILED": status.HTTP_503_SERVICE_UNAVAILABLE,
    "BAIDU_NOT_CONFIGURED": status.HTTP_503_SERVICE_UNAVAILABLE,
}


def _pipeline_error_detail(exc: SurgeryPipelineError, surgery_id: str) -> dict:
    """Build the ``detail`` payload returned to clients for a pipeline error.

    The payload starts with ``code``, ``message`` and ``surgery_id``. When
    ``exc.extra`` is truthy its entries are merged on top, so an ``extra`` key
    with the same name replaces the base value.
    """
    d: dict = {
        "code": exc.code,
        "message": exc.message,
        "surgery_id": surgery_id,
    }
    if exc.extra:
        d.update(exc.extra)
    return d


def _raise_surgery_pipeline_http(
    exc: SurgeryPipelineError, surgery_id: str
) -> NoReturn:
    """Re-raise a recording pipeline error as an HTTP 503 response.

    Never returns; the original exception is chained as the cause.
    """
    raise HTTPException(
        status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
        detail=_pipeline_error_detail(exc, surgery_id),
    ) from exc


def _raise_confirmation_http(exc: SurgeryPipelineError, surgery_id: str) -> NoReturn:
    """Re-raise a confirmation pipeline error with the HTTP status for its code.

    The status is looked up in ``_CONFIRMATION_STATUS_BY_CODE``; an unknown code
    becomes HTTP 500. Never returns; the original exception is chained as the
    cause.
    """
    st = _CONFIRMATION_STATUS_BY_CODE.get(
        exc.code, status.HTTP_500_INTERNAL_SERVER_ERROR
    )
    raise HTTPException(
        status_code=st,
        detail=_pipeline_error_detail(exc, surgery_id),
    ) from exc


async def _call_recording_with_retries(
    factory: Callable[[], Awaitable[None]],
    *,
    max_attempts: int,
    delay_seconds: float,
    log_prefix: str,
) -> None:
    """Await ``factory()`` and retry it when it raises ``SurgeryPipelineError``.

    Args:
        factory: Zero-argument callable returning the awaitable to run; it is
            called again for every attempt.
        max_attempts: Total number of attempts. Values below 1 are treated as 1
            so the operation is always tried at least once.
        delay_seconds: Pause between a failed attempt and the next one.
        log_prefix: Text placed at the start of each retry warning.

    Raises:
        SurgeryPipelineError: When every attempt failed. It carries the code of
            the last failure and that failure's message with a retry note
            appended; the last failure is chained as the cause.
    """
    # A non-positive attempt count would otherwise skip the loop entirely and
    # report success without ever running the operation.
    attempts = max(1, max_attempts)
    for attempt in range(1, attempts + 1):
        try:
            await factory()
            return
        except SurgeryPipelineError as exc:
            if attempt == attempts:
                # NOTE(review): the new error is built from code and message
                # only, so any ``extra`` on the last failure is not forwarded to
                # the HTTP detail - confirm whether that is intended.
                raise SurgeryPipelineError(
                    exc.code,
                    f"{exc.message}(已重试 {attempts} 次仍失败)",
                ) from exc
            logger.warning(
                "{} attempt {}/{} failed ({}), retrying in {}s",
                log_prefix,
                attempt,
                attempts,
                exc.code,
                delay_seconds,
            )
            await asyncio.sleep(delay_seconds)


@router.get("/health", response_model=HealthResponse, tags=["health"])
async def health() -> HealthResponse | JSONResponse:
    # Liveness probe: HTTP 200 when the database check passes, HTTP 503 with a
    # "degraded" body when it raises SQLAlchemyError.
    # Documented with comments on purpose: this route sets no explicit
    # ``description``, so FastAPI would publish a docstring in the OpenAPI schema.
    logger.debug("Health check")
    try:
        await check_database()
    except SQLAlchemyError as exc:
        logger.warning("Health check: database unavailable: {}", exc)
        return JSONResponse(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            content={"status": "degraded", "database": "unavailable"},
        )
    return HealthResponse(status="ok", database="connected")


@router.get(
    "/internal/demo/orchestrator-status",
    tags=["demo"],
    summary="一键联调接口是否可用",
    description="供 demo 页探测:是否启用 orchestrator、RTSP 文件配置等;此路由始终存在,不依赖 DEMO_ORCHESTRATOR_ENABLED。",
)
async def demo_orchestrator_status() -> dict:
    """Report the demo orchestrator settings read from ``settings``.

    The configured site-config file path is stripped; an empty path is returned
    as ``None`` with ``or_site_config_json_file_set`` set to ``False``.
    """
    f = (settings.or_site_config_json_file or "").strip()
    return {
        "orchestrator_enabled": bool(settings.demo_orchestrator_enabled),
        "orchestrate_method": "POST",
        "orchestrate_path": "/internal/demo/orchestrate-and-start",
        "or_site_config_json_file_set": bool(f),
        "or_site_config_json_file": f or None,
        "orchestrator_rtsp_port": settings.demo_orchestrator_rtsp_port,
        "orchestrator_rtsp_json_host": settings.demo_orchestrator_rtsp_json_host,
    }


@router.post(
    "/client/surgeries/start",
    response_model=SurgeryApiResponse,
    responses={
        status.HTTP_503_SERVICE_UNAVAILABLE: {
            "description": (
                "未能在确认摄像头已开始录制后完成请求;"
                "录制子系统未就绪、开录未确认或发生故障。"
            ),
            "model": SurgeryClientErrorResponse,
        },
    },
    tags=["client"],
    summary="开始手术",
    description=(
        "开始一台手术:服务端启动关联摄像头录制。"
        "仅在确认开录完成后返回 HTTP 200;否则按配置重试,仍失败则返回 503。"
    ),
)
async def start_surgery(
    payload: SurgeryStartRequest,
    pipeline: Annotated[SurgeryPipeline, Depends(get_surgery_pipeline)],
    voice_hub: Annotated[VoiceTerminalHub, Depends(get_voice_terminal_hub)],
) -> SurgeryApiResponse:
    """Start recording for a surgery, then assign a voice terminal to it.

    ``pipeline.start_recording`` is retried according to the ``bp`` retry
    settings; if it still fails the error is returned as HTTP 503. The voice
    terminal assignment runs only after recording has started.
    """
    logger.info(
        "Start surgery: surgery_id={}, cameras={}, candidates={}",
        payload.surgery_id,
        payload.camera_ids,
        payload.candidate_consumables,
    )
    try:

        async def _start() -> None:
            await pipeline.start_recording(
                payload.surgery_id,
                payload.camera_ids,
                payload.candidate_consumables,
            )

        await _call_recording_with_retries(
            _start,
            max_attempts=bp.SURGERY_RECORDING_MAX_ATTEMPTS,
            delay_seconds=bp.SURGERY_RECORDING_RETRY_DELAY_SECONDS,
            log_prefix=f"Start surgery {payload.surgery_id}",
        )
    except SurgeryPipelineError as exc:
        _raise_surgery_pipeline_http(exc, payload.surgery_id)
    await assign_voice_terminal_after_recording_started(
        voice_hub,
        surgery_id=payload.surgery_id,
        camera_ids=list(payload.camera_ids),
        set_voice_terminal_id=pipeline.set_voice_terminal_id,
    )
    return SurgeryApiResponse(
        surgery_id=payload.surgery_id,
        status="accepted",
        message="摄像头录制已开始,手术已启动。",
    )


@router.post(
    "/client/surgeries/end",
    response_model=SurgeryApiResponse,
    responses={
        status.HTTP_503_SERVICE_UNAVAILABLE: {
            "description": (
                "未能在确认摄像头已全部停止录制后完成请求;"
                "停录未确认、录制子系统未就绪或发生故障。"
            ),
            "model": SurgeryClientErrorResponse,
        },
    },
    tags=["client"],
    summary="结束手术",
    description=(
        "结束一台手术:服务端停止关联摄像头录制。"
        "仅在确认停录完成后返回 HTTP 200;否则按配置重试,仍失败则返回 503。"
    ),
)
async def end_surgery(
    payload: SurgeryEndRequest,
    pipeline: Annotated[SurgeryPipeline, Depends(get_surgery_pipeline)],
    voice_hub: Annotated[VoiceTerminalHub, Depends(get_voice_terminal_hub)],
) -> SurgeryApiResponse:
    """Stop recording for a surgery and notify its voice terminal, if any.

    ``pipeline.stop_recording`` is retried according to the ``bp`` retry
    settings; if it still fails the error is returned as HTTP 503. When the
    call returns a truthy terminal id, ``voice_hub.notify_end`` is awaited.
    """
    logger.info("End surgery: surgery_id={}", payload.surgery_id)
    voice_terminal_id: str | None = None
    try:

        async def _stop() -> None:
            # Captured through nonlocal because the retry helper discards the
            # factory's return value.
            nonlocal voice_terminal_id
            voice_terminal_id = await pipeline.stop_recording(payload.surgery_id)

        await _call_recording_with_retries(
            _stop,
            max_attempts=bp.SURGERY_RECORDING_MAX_ATTEMPTS,
            delay_seconds=bp.SURGERY_RECORDING_RETRY_DELAY_SECONDS,
            log_prefix=f"End surgery {payload.surgery_id}",
        )
    except SurgeryPipelineError as exc:
        _raise_surgery_pipeline_http(exc, payload.surgery_id)
    if voice_terminal_id:
        await voice_hub.notify_end(voice_terminal_id, payload.surgery_id)
    return SurgeryApiResponse(
        surgery_id=payload.surgery_id,
        status="accepted",
        message="摄像头录制已停止,手术已结束。",
    )


@router.get(
    "/client/voice-terminals/{terminal_id}/assignment",
    response_model=VoiceTerminalAssignmentResponse,
    tags=["client"],
    summary="查询语音终端当前指派的手术",
    description="供桌面客户端在 WebSocket 不可用时的轮询兜底;与 WS 推送的 assignment 状态一致。",
)
async def get_voice_terminal_assignment(
    terminal_id: Annotated[str, Path(min_length=1, max_length=256)],
    hub: Annotated[VoiceTerminalHub, Depends(get_voice_terminal_hub)],
) -> VoiceTerminalAssignmentResponse:
    """Return the hub's current assignment for a voice terminal.

    The terminal id is stripped of surrounding whitespace before the lookup and
    echoed back in its stripped form.
    """
    tid = terminal_id.strip()
    return VoiceTerminalAssignmentResponse(
        voice_terminal_id=tid,
        active_surgery_id=hub.get_assignment(tid),
    )


@router.websocket("/client/voice-terminals/ws")
async def voice_terminal_websocket(
    websocket: WebSocket,
    terminal_id: Annotated[str, Query(..., min_length=1, max_length=256)],
) -> None:
    """Hand the WebSocket connection over to the voice terminal hub.

    The hub is read from ``websocket.app.state.container`` rather than injected
    through ``Depends``.
    """
    container = websocket.app.state.container
    await container.voice_terminal_hub.handle_websocket(websocket, terminal_id)


@router.get(
    "/client/surgeries/{surgery_id}/result",
    response_model=SurgeryResultResponse,
    responses={
        status.HTTP_503_SERVICE_UNAVAILABLE: {
            "description": (
                "结果尚不可查询:无至少一条消耗明细,或手术未开始、未开录成功、尚无可查归档等。"
            ),
            "model": SurgeryClientErrorResponse,
        },
    },
    tags=["client"],
    summary="查询手术结果",
    description=(
        "根据手术 6 位号查询该台手术的耗材消耗明细(多行)及按物品汇总。"
        "手术进行中返回当前内存已记账结果;结束后返回数据库持久化结果。"
        "若无至少一条消耗明细(含已归档但明细为空)、手术从未开始或尚无可查归档,返回 503。"
        "使用 GET:只读、幂等。\n\n"
        "响应体 `details` 与 `summary` 的字段定义见模式 SurgeryConsumptionDetail / SurgeryConsumptionSummary;"
        "若服务端启用耗材 TSV 文本日志,文件明细列为 tab 分隔的 "
        "item_id、item_name、qty、doctor_id、timestamp(文末另有仅三列的汇总块 item_id、item_name、qty),"
        "与 HTTP JSON 字段一致。"
    ),
)
async def get_surgery_result(
    surgery_id: Annotated[
        str,
        Path(
            min_length=6,
            max_length=6,
            pattern=r"^\d{6}$",
            description="手术 6 位号,仅允许 6 位数字。",
        ),
    ],
    pipeline: Annotated[SurgeryPipeline, Depends(get_surgery_pipeline)],
) -> SurgeryResultResponse:
    """Return the consumption details of a surgery together with their summary.

    Raises:
        HTTPException: HTTP 503 with code ``RESULT_NOT_READY`` when the pipeline
            returns no details for the surgery.
    """
    logger.info("Query surgery result: surgery_id={}", surgery_id)
    details = await pipeline.get_consumption_details_for_client(surgery_id)
    if not details:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail={
                "code": "RESULT_NOT_READY",
                "message": (
                    "当前无该手术的可查询结果:手术未开始、未成功开录、尚无至少一条消耗明细,"
                    "或尚无可返回的数据。"
                ),
                "surgery_id": surgery_id,
            },
        )
    return SurgeryResultResponse(
        surgery_id=surgery_id,
        status="completed",
        message="查询成功。",
        details=details,
        summary=build_consumption_summary(details),
    )


@router.get(
    "/client/surgeries/{surgery_id}/pending-confirmation",
    response_model=SurgeryPendingConfirmationResponse,
    responses={
        status.HTTP_404_NOT_FOUND: {
            "description": "当前无待确认项或手术未在进行。",
            "model": SurgeryClientErrorResponse,
        },
        status.HTTP_422_UNPROCESSABLE_CONTENT: {
            "description": "提示文本为空等导致无法合成播报。",
            "model": SurgeryClientErrorResponse,
        },
        status.HTTP_503_SERVICE_UNAVAILABLE: {
            "description": "百度语音未配置或 TTS 调用失败。",
            "model": SurgeryClientErrorResponse,
        },
    },
    tags=["client"],
    summary="拉取待确认耗材(含 TTS 音频)",
    description=(
        "返回当前 FIFO 队首的一条低置信度识别;"
        "响应内 `prompt_audio_mp3_base64` 为与 `prompt_text` 一致的 MP3(Base64),客户端可直接解码播放。"
        "无待确认项时返回 404;提示文本为空为 422;未配置百度或 TTS 失败为 503(不返回空音频兜底)。"
        "医生确认后请使用 `POST .../resolve` 上传 WAV。"
    ),
)
async def get_pending_consumable_confirmation(
    surgery_id: Annotated[
        str,
        Path(
            min_length=6,
            max_length=6,
            pattern=r"^\d{6}$",
            description="手术 6 位号,仅允许 6 位数字。",
        ),
    ],
    pipeline: Annotated[SurgeryPipeline, Depends(get_surgery_pipeline)],
) -> SurgeryPendingConfirmationResponse:
    """Return the pending confirmation the pipeline currently offers the client.

    Raises:
        HTTPException: HTTP 404 with code ``NO_PENDING_CONFIRMATION`` when the
            pipeline returns ``None``; otherwise the status mapped from a
            ``SurgeryPipelineError`` raised by the pipeline.
    """
    try:
        payload = await pipeline.get_pending_confirmation_for_client(surgery_id)
    except SurgeryPipelineError as exc:
        _raise_confirmation_http(exc, surgery_id)
    if payload is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail={
                "code": "NO_PENDING_CONFIRMATION",
                "message": "当前无待确认的耗材识别,或手术未在进行。",
                "surgery_id": surgery_id,
            },
        )
    return payload


@router.post(
    "/client/surgeries/{surgery_id}/pending-confirmation/{confirmation_id}/resolve",
    response_model=SurgeryPendingConfirmationResolveResponse,
    responses={
        status.HTTP_404_NOT_FOUND: {"model": SurgeryClientErrorResponse},
        status.HTTP_409_CONFLICT: {"model": SurgeryClientErrorResponse},
        status.HTTP_422_UNPROCESSABLE_CONTENT: {"model": SurgeryClientErrorResponse},
        status.HTTP_503_SERVICE_UNAVAILABLE: {"model": SurgeryClientErrorResponse},
    },
    tags=["client"],
    summary="提交耗材确认结果(上传医生语音 WAV)",
    description=(
        "multipart/form-data 上传单个 WAV 文件(字段名 `audio`)。"
        "服务端将音频存入 MinIO、调用百度 ASR 识别、解析候选项并完成确认。"
        "解析并确认后记一条消耗明细;若语音表示否认全部候选则不记消耗。"
    ),
)
async def resolve_pending_consumable_confirmation(
    surgery_id: Annotated[
        str,
        Path(
            min_length=6,
            max_length=6,
            pattern=r"^\d{6}$",
            description="手术 6 位号,仅允许 6 位数字。",
        ),
    ],
    confirmation_id: Annotated[str, Path(min_length=1, max_length=128)],
    audio: Annotated[
        UploadFile,
        File(
            ...,
            description="医生语音 WAV 文件(建议 16kHz 单声道 PCM,其他格式将尝试 ffmpeg 转码)",
        ),
    ],
    pipeline: Annotated[SurgeryPipeline, Depends(get_surgery_pipeline)],
) -> SurgeryPendingConfirmationResolveResponse:
    """Resolve a pending confirmation from an uploaded voice recording.

    The upload is read fully into memory. It is rejected with HTTP 422
    (``VOICE_AUDIO_INVALID``) when it is empty or when its file name does not
    end in ``.wav`` (case-insensitive; a missing name defaults to
    ``voice.wav``). A ``SurgeryPipelineError`` raised by the pipeline is
    converted to its mapped HTTP status.
    """
    raw = await audio.read()
    if not raw:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail={
                "code": "VOICE_AUDIO_INVALID",
                "message": "音频文件为空。",
                "surgery_id": surgery_id,
            },
        )
    filename = (audio.filename or "voice.wav").strip()
    if not filename.lower().endswith(".wav"):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail={
                "code": "VOICE_AUDIO_INVALID",
                "message": "仅支持上传 .wav 文件。",
                "surgery_id": surgery_id,
            },
        )
    try:
        result = await pipeline.resolve_pending_confirmation_from_audio(
            surgery_id=surgery_id,
            confirmation_id=confirmation_id,
            wav_bytes=raw,
            filename=filename,
            content_type=audio.content_type,
        )
    except SurgeryPipelineError as exc:
        _raise_confirmation_http(exc, surgery_id)
    return SurgeryPendingConfirmationResolveResponse(
        surgery_id=surgery_id,
        confirmation_id=confirmation_id,
        status="accepted",
        message=result.message,
        resolved_label=result.resolved_label,
        rejected=result.rejected,
        asr_text=result.asr_text,
        audio_object_key=result.audio_object_key,
    )