Files
operating-room-monitor-server/backend/app/routers/recording_demo.py
2026-05-22 11:17:20 +08:00

356 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Demo 录制模式:链路 2 模拟实时、链路 3 离线 batch需 DEMO_ORCHESTRATOR_ENABLED"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Annotated
import anyio
from fastapi import APIRouter, BackgroundTasks, Depends, File, Form, HTTPException, UploadFile, status
from fastapi.responses import FileResponse
from loguru import logger
from pydantic import BaseModel
from app.config import settings
from app.consumable_catalog import normalize_candidate_consumables_raw
from app.dependencies import get_surgery_pipeline, get_voice_terminal_hub
from app.schemas import SurgeryApiResponse, SurgeryStartRequest
from app.services.recording_live import accept_live_recording
from app.services.simulated_rtsp_setup import (
prepare_simulated_rtsp_streams,
read_simulated_stream_uploads,
)
from app.services.surgery_pipeline import SurgeryPipeline
from app.baked import pipeline as bp
from app.services.synthetic_rtsp import SyntheticRtspManager
from app.services.video_batch_cleanup import (
purge_batch_artifacts,
purge_expired_pipeline_inputs,
purge_expired_visualizations,
purge_surgery_batch_tree,
stage_visualization_pending,
)
from app.algo_host import BatchAlgorithmService
from app.services.voice_terminal_hub import VoiceTerminalHub
from app.surgery_errors import SurgeryPipelineError
# Router for the demo-only endpoints defined in this module; every route is
# registered under the "/internal/demo" prefix and tagged "demo".
router = APIRouter(prefix="/internal/demo", tags=["demo"])
# Grep in logs after restart to confirm new offline-batch code is loaded.
OFFLINE_BATCH_FLOW_MARKER = "offline-batch-v5"
def _require_demo_orchestrator() -> None:
    """Reject the request unless the demo orchestrator is switched on.

    Raises:
        HTTPException: 404 when ``settings.demo_orchestrator_enabled`` is falsy.
    """
    if settings.demo_orchestrator_enabled:
        return
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Demo recording modes disabled (set DEMO_ORCHESTRATOR_ENABLED=true).",
    )
def _require_site_config_path() -> Path:
    """Return the configured site-config JSON path with ``~`` expanded.

    Returns:
        The value of ``settings.or_site_config_json_file`` (whitespace
        stripped) as a ``Path`` after ``expanduser()``.

    Raises:
        HTTPException: 400 when the setting is unset or blank.
    """
    configured = settings.or_site_config_json_file or ""
    configured = configured.strip()
    if configured:
        return Path(configured).expanduser()
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail=(
            "OR_SITE_CONFIG_JSON_FILE must be set to a writable path "
            "(strict site JSON with video_rtsp_urls + voice_or_room_bindings); "
            "in Docker, bind-mount a host file to this path."
        ),
    )
def _background_finalize_visualization(
    runner: BatchAlgorithmService,
    surgery_id: str,
) -> None:
    """Finish the offline-batch visualization, then sweep expired artifacts.

    Intended to run as a background task: a failure while finalizing is
    logged (with traceback) instead of being propagated, and the two
    TTL-based purges under ``runner.root_dir`` run whether or not
    finalization succeeded.

    Args:
        runner: Batch service whose ``finalize_visualization`` is invoked and
            whose ``root_dir`` is swept afterwards.
        surgery_id: Surgery whose visualization should be finalized.
    """
    try:
        runner.finalize_visualization(surgery_id=surgery_id)
    except Exception:
        # Best effort: log and fall through to the cleanup below.
        logger.exception("offline batch visualization failed surgery_id={}", surgery_id)
    finally:
        vis_ttl_hours = float(bp.VIDEO_BATCH_VIS_TTL_HOURS)
        purge_expired_visualizations(runner.root_dir, ttl_hours=vis_ttl_hours)
        input_ttl_hours = float(bp.VIDEO_BATCH_PIPELINE_INPUT_TTL_HOURS)
        purge_expired_pipeline_inputs(runner.root_dir, ttl_hours=input_ttl_hours)
class OfflineBatchResponse(BaseModel):
    # Response body of POST /internal/demo/offline-batch.
    # Identifier of the surgery the batch was run for (echoed from the request).
    surgery_id: str
    # Outcome marker; the offline-batch endpoint sets it to "accepted".
    status: str
    # Human-readable summary (row count, cache hit/miss, optional doctor and
    # visualization notes).
    message: str
    # URL of the visualization download endpoint; populated only when the
    # request asked for a visualization, otherwise None.
    visualization_url: str | None = None
    # Doctor fields are copied from the batch result's `doctor` object and stay
    # None when the result carries no doctor.
    doctor_name: str | None = None
    doctor_id: str | None = None
    doctor_display: str | None = None
@router.post(
    "/offline-batch",
    response_model=OfflineBatchResponse,
    summary="链路 3非实时精确模式上传 MP4 + 可选标注视频)",
    description=(
        "仅当 DEMO_ORCHESTRATOR_ENABLED=true。不启动 RTSP 实时会话、不触发语音终端;"
        "调用 algorithm_subprocesses/5.15 main.py解析 TSV 后写入最终结果。"
    ),
)
async def offline_batch(
    background_tasks: BackgroundTasks,
    surgery_id: Annotated[str, Form()],
    video1: Annotated[UploadFile, File(description="单路完整 MP4")],
    candidate_consumables_json: Annotated[str, Form()] = "[]",
    include_visualization: Annotated[bool, Form()] = False,
    pipeline: SurgeryPipeline = Depends(get_surgery_pipeline),
) -> OfflineBatchResponse:
    """Run the offline batch algorithm on one uploaded video and store the result.

    Steps: validate the form fields, save the upload under
    ``<runner.root_dir>/<surgery_id>/upload/``, run ``BatchAlgorithmService.run``
    in a worker thread, persist ``result.details`` through the surgery
    pipeline, optionally schedule visualization finalization as a background
    task, and purge the batch working files.

    Args:
        background_tasks: FastAPI background task queue used for the optional
            visualization finalization.
        surgery_id: Must be exactly six ASCII digits.
        video1: The uploaded video; must not be empty.
        candidate_consumables_json: JSON array of candidate consumables.
        include_visualization: When true, a visualization is produced in the
            background and its URL is returned.
        pipeline: Surgery pipeline used to persist the batch result.

    Returns:
        An ``OfflineBatchResponse`` with ``status="accepted"``.

    Raises:
        HTTPException: 404 if demo mode is disabled; 422 for an invalid
            ``surgery_id``, invalid/non-array ``candidate_consumables_json``
            or an empty upload; 500 if the upload cannot be saved; 503 if the
            batch run fails.
    """
    _require_demo_orchestrator()

    # str.isdigit() alone also accepts non-ASCII digits (e.g. superscripts or
    # Arabic-Indic digits); the id is used as a directory name, so require 0-9.
    if len(surgery_id) != 6 or not (surgery_id.isascii() and surgery_id.isdigit()):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="surgery_id must be exactly 6 digits",
        )

    try:
        candidates = json.loads(candidate_consumables_json)
    except json.JSONDecodeError as exc:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail=f"invalid candidate_consumables_json: {exc}",
        ) from exc
    if not isinstance(candidates, list):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="candidate_consumables_json must be a JSON array",
        )
    candidates = normalize_candidate_consumables_raw(candidates)

    raw = await video1.read()
    if not raw:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="video1 is empty",
        )

    logger.info(
        "offline batch request surgery_id={} flow={} include_visualization={}",
        surgery_id,
        OFFLINE_BATCH_FLOW_MARKER,
        include_visualization,
    )

    runner = BatchAlgorithmService()
    original_filename = video1.filename or "video.mp4"
    # Only the extension of the client-supplied name is reused on disk.
    suffix = Path(original_filename).suffix or ".mp4"
    work_root = runner.root_dir / surgery_id / "upload"
    uploaded = work_root / f"upload{suffix}"
    try:
        # Directory creation is part of saving the upload, so its OSError is
        # reported through the same 500 response as a failed write.
        work_root.mkdir(parents=True, exist_ok=True)
        uploaded.write_bytes(raw)
    except OSError as exc:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"failed to save upload: {exc}",
        ) from exc

    try:
        # The batch run is blocking, so it is executed in a worker thread.
        # include_visualization is always False here: when requested, the
        # visualization is finalized afterwards by the background task below.
        result = await anyio.to_thread.run_sync(
            lambda: runner.run(
                surgery_id=surgery_id,
                uploaded_video_path=uploaded,
                original_filename=original_filename,
                candidate_consumables=candidates,
                include_visualization=False,
            )
        )
    except (FileNotFoundError, RuntimeError, OSError, ValueError) as exc:
        logger.exception("offline batch failed surgery_id={}: {}", surgery_id, exc)
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"offline batch failed: {exc}",
        ) from exc

    await pipeline.save_video_batch_result(surgery_id, result.details)
    logger.info(
        "offline batch result saved surgery_id={} rows={}",
        surgery_id,
        len(result.details),
    )

    if include_visualization:
        # Stage the inputs before the purges below, then let the background
        # task finish the visualization after the response is sent.
        stage_visualization_pending(
            runner.root_dir,
            surgery_id,
            source_mp4=result.input_path,
            result_tsv=result.output_path,
        )
        background_tasks.add_task(_background_finalize_visualization, runner, surgery_id)

    purge_batch_artifacts(
        runner.root_dir,
        surgery_id,
        digest=result.video_sha256,
        candidate_key=result.candidate_cache_key,
    )
    purge_surgery_batch_tree(runner.root_dir, surgery_id)

    visualization_url: str | None = None
    vis_suffix = ""
    if include_visualization:
        visualization_url = f"/internal/demo/offline-batch/{surgery_id}/visualization"
        vis_suffix = ";标注视频后台生成中(完成后刷新 visualization URL24 小时内有效)"

    doctor = result.doctor
    doctor_suffix = ""
    if doctor is not None and doctor.display:
        doctor_suffix = f";医生={doctor.display}"

    return OfflineBatchResponse(
        surgery_id=surgery_id,
        status="accepted",
        message=(
            "非实时精确视频处理完成;"
            f"rows={len(result.details)} cache={'hit' if result.reused_cache else 'miss'}"
            f"{doctor_suffix}{vis_suffix}"
        ),
        visualization_url=visualization_url,
        doctor_name=doctor.doctor_name if doctor is not None else None,
        doctor_id=doctor.doctor_id if doctor is not None else None,
        doctor_display=doctor.display if doctor is not None else None,
    )
@router.get(
    "/offline-batch/{surgery_id}/visualization",
    summary="链路 3获取离线 batch 生成的标注视频",
)
async def offline_batch_visualization(surgery_id: str) -> FileResponse:
    """Serve the most recent offline-batch visualization video for a surgery.

    Args:
        surgery_id: Path parameter; must be exactly six ASCII digits.

    Returns:
        A ``FileResponse`` streaming the MP4 returned by
        ``BatchAlgorithmService.latest_visualization_path``.

    Raises:
        HTTPException: 404 if demo mode is disabled or no visualization
            exists for the surgery; 422 if ``surgery_id`` is malformed.
    """
    _require_demo_orchestrator()

    # str.isdigit() alone also accepts non-ASCII digits (e.g. superscripts or
    # Arabic-Indic digits); the id feeds a file lookup, so require 0-9 only.
    if len(surgery_id) != 6 or not (surgery_id.isascii() and surgery_id.isdigit()):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="surgery_id must be exactly 6 digits",
        )

    runner = BatchAlgorithmService()
    path = runner.latest_visualization_path(surgery_id)
    if path is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="offline batch visualization not found; run offline-batch first.",
        )
    return FileResponse(
        path,
        media_type="video/mp4",
        filename=f"{surgery_id}_result_vis.mp4",
        headers={"Accept-Ranges": "bytes", "Cache-Control": "no-cache"},
    )
@router.post(
    "/simulated-start",
    response_model=SurgeryApiResponse,
    summary="链路 2模拟实时上传 14 路视频并开录 + 语音)",
    description=(
        "仅当 DEMO_ORCHESTRATOR_ENABLED=true。合成假 RTSP 并写入 OR_SITE_CONFIG_JSON_FILE"
        "再执行与 POST /client/surgeries/start 相同的实时开录与语音终端指派。"
    ),
)
async def simulated_start(
    surgery_id: Annotated[str, Form()],
    video1: Annotated[UploadFile, File(description="第 1 路视频(必填,至少一路)")],
    video2: Annotated[UploadFile | None, File(description="第 2 路视频(可选)")] = None,
    video3: Annotated[UploadFile | None, File(description="第 3 路视频(可选)")] = None,
    video4: Annotated[UploadFile | None, File(description="第 4 路视频(可选)")] = None,
    camera_1: Annotated[str, Form()] = "or-cam-01",
    camera_2: Annotated[str, Form()] = "or-cam-02",
    camera_3: Annotated[str, Form()] = "or-cam-03",
    camera_4: Annotated[str, Form()] = "or-cam-04",
    rtsp_path_1: Annotated[str, Form()] = "demo1",
    rtsp_path_2: Annotated[str, Form()] = "demo2",
    rtsp_path_3: Annotated[str, Form()] = "demo3",
    rtsp_path_4: Annotated[str, Form()] = "demo4",
    candidate_consumables_json: Annotated[str, Form()] = "[]",
    pipeline: SurgeryPipeline = Depends(get_surgery_pipeline),
    voice_hub: VoiceTerminalHub = Depends(get_voice_terminal_hub),
) -> SurgeryApiResponse:
    """Start a simulated live recording from up to four uploaded videos.

    The uploads are turned into synthetic RTSP streams (the mapping is
    written to the site-config JSON file), after which the regular live
    recording start is performed via ``accept_live_recording``.

    Raises:
        HTTPException: 404 if demo mode is disabled; 400 if the site-config
            path is not configured; 422 for invalid candidate JSON or an
            invalid start request; 503 if RTSP setup or the recording start
            fails (the active synthetic RTSP is stopped first).
    """
    _require_demo_orchestrator()
    site_config_path = _require_site_config_path()

    camera_names = (camera_1, camera_2, camera_3, camera_4)
    stream_paths = (rtsp_path_1, rtsp_path_2, rtsp_path_3, rtsp_path_4)
    logger.info(
        "simulated-start: surgery_id={} cameras={} rpaths={}",
        surgery_id,
        camera_names,
        stream_paths,
    )

    # Candidate consumables arrive as a JSON-encoded form field.
    try:
        parsed = json.loads(candidate_consumables_json)
    except json.JSONDecodeError as err:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail=f"invalid candidate_consumables_json: {err}",
        ) from err
    if not isinstance(parsed, list):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="candidate_consumables_json must be a JSON array",
        )
    consumables = normalize_candidate_consumables_raw(parsed)

    stream_uploads = await read_simulated_stream_uploads(
        video1=video1,
        video2=video2,
        video3=video3,
        video4=video4,
        camera_1=camera_1,
        camera_2=camera_2,
        camera_3=camera_3,
        camera_4=camera_4,
        rtsp_path_1=rtsp_path_1,
        rtsp_path_2=rtsp_path_2,
        rtsp_path_3=rtsp_path_3,
        rtsp_path_4=rtsp_path_4,
    )

    # Build the same request model the regular start endpoint uses; any
    # construction failure is reported to the client as a 422.
    try:
        start_request = SurgeryStartRequest(
            surgery_id=surgery_id,
            camera_ids=[upload.camera_id for upload in stream_uploads],
            candidate_consumables=consumables,
        )
    except Exception as err:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail=str(err),
        ) from err

    try:
        await prepare_simulated_rtsp_streams(
            site_config_json_path=site_config_path,
            uploads=stream_uploads,
        )
    except HTTPException:
        # Already an HTTP error: pass it through unchanged.
        raise
    except Exception as err:
        await anyio.to_thread.run_sync(SyntheticRtspManager.stop_active)
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"simulated RTSP setup failed: {err}",
        ) from err

    try:
        response = await accept_live_recording(
            pipeline,
            voice_hub,
            surgery_id=start_request.surgery_id,
            camera_ids=list(start_request.camera_ids),
            candidate_consumables=list(start_request.candidate_consumables),
            message="假 RTSP 已起;映射已写入;摄像头录制已开始。",
        )
    except SurgeryPipelineError as err:
        # Recording could not start: stop the synthetic RTSP before reporting.
        await anyio.to_thread.run_sync(SyntheticRtspManager.stop_active)
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail={
                "code": err.code,
                "message": err.message,
                "surgery_id": start_request.surgery_id,
            },
        ) from err
    return response