diff --git a/backend/app/api.py b/backend/app/api.py index 71d8679..7fc1fb7 100644 --- a/backend/app/api.py +++ b/backend/app/api.py @@ -319,13 +319,12 @@ async def get_surgery_result( logger.info("Query surgery result: surgery_id={}", surgery_id) details = await pipeline.get_consumption_details_for_client(surgery_id) if not details: + code, message = await pipeline.classify_result_unavailable(surgery_id) raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail={ - "code": "RESULT_NOT_READY", - "message": ( - "当前无该手术的可查询结果:手术未开始、未成功开录、尚无至少一条消耗明细,或尚无可返回的数据。" - ), + "code": code, + "message": message, "surgery_id": surgery_id, }, ) diff --git a/backend/app/routers/recording_demo.py b/backend/app/routers/recording_demo.py index 348559a..41d69b8 100644 --- a/backend/app/routers/recording_demo.py +++ b/backend/app/routers/recording_demo.py @@ -3,20 +3,27 @@ from __future__ import annotations import json +import time from pathlib import Path -from typing import Annotated +from typing import Annotated, Literal import anyio from fastapi import APIRouter, BackgroundTasks, Depends, File, Form, HTTPException, UploadFile, status from fastapi.responses import FileResponse from loguru import logger -from pydantic import BaseModel +from pydantic import BaseModel, Field from app.config import settings from app.consumable_catalog import normalize_candidate_consumables_raw from app.dependencies import get_surgery_pipeline, get_voice_terminal_hub from app.schemas import SurgeryApiResponse, SurgeryStartRequest from app.services.recording_live import accept_live_recording +from app.services.offline_batch_timing import ( + get_timing, + mark_video_failed, + mark_video_ready, + set_text_timing, +) from app.services.simulated_rtsp_setup import ( prepare_simulated_rtsp_streams, read_simulated_stream_uploads, @@ -71,13 +78,19 @@ def _background_finalize_visualization( digest: str, candidate_key: str, ) -> None: + t0 = time.monotonic() try: - runner.finalize_visualization( + vis = runner.finalize_visualization( surgery_id=surgery_id, video_path=video_path, result_path=result_path, ) + if vis is not None: + mark_video_ready(surgery_id=surgery_id, video_duration_sec=time.monotonic() - t0) + else: + mark_video_failed(surgery_id=surgery_id) except Exception: + mark_video_failed(surgery_id=surgery_id) logger.exception("offline batch visualization failed surgery_id={}", surgery_id) finally: purge_batch_artifacts( @@ -105,6 +118,20 @@ class OfflineBatchResponse(BaseModel): doctor_name: str | None = None doctor_id: str | None = None doctor_display: str | None = None + text_duration_sec: float = Field(description="文本结果(main.py / TSV)耗时,秒。") + video_duration_sec: float | None = Field( + default=None, + description="标注视频耗时(秒);未勾选生成或仍在后台时为 null,请轮询 timing 接口。", + ) + total_duration_sec: float = Field(description="文本耗时 + 已完成视频耗时(秒)。") + + +class OfflineBatchTimingResponse(BaseModel): + surgery_id: str + text_duration_sec: float + video_duration_sec: float | None = None + total_duration_sec: float + video_status: Literal["skipped", "pending", "ready", "failed"] @router.post( @@ -169,6 +196,7 @@ async def offline_batch( detail=f"failed to save upload: {exc}", ) from exc + text_t0 = time.monotonic() try: result = await anyio.to_thread.run_sync( lambda: runner.run( @@ -185,6 +213,12 @@ async def offline_batch( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=f"offline batch failed: {exc}", ) from exc + text_duration_sec = time.monotonic() - text_t0 + set_text_timing( + surgery_id=surgery_id, + text_duration_sec=text_duration_sec, + include_video=include_visualization, + ) await pipeline.save_video_batch_result(surgery_id, result.details) logger.info( @@ -222,6 +256,10 @@ async def offline_batch( vis_suffix = "" if include_visualization: vis_suffix = ";标注视频后台生成中(完成后刷新 visualization URL,24 小时内有效)" + timing = get_timing(surgery_id) + text_sec = timing.text_duration_sec if timing is not None else text_duration_sec + video_sec = timing.video_duration_sec if timing is not None else None + total_sec = timing.total_duration_sec if timing is not None else text_sec return OfflineBatchResponse( surgery_id=surgery_id, status="accepted", @@ -234,6 +272,36 @@ async def offline_batch( doctor_name=doctor.doctor_name if doctor is not None else None, doctor_id=doctor.doctor_id if doctor is not None else None, doctor_display=doctor.display if doctor is not None else None, + text_duration_sec=round(text_sec, 3), + video_duration_sec=round(video_sec, 3) if video_sec is not None else None, + total_duration_sec=round(total_sec, 3), + ) + + +@router.get( + "/offline-batch/{surgery_id}/timing", + response_model=OfflineBatchTimingResponse, + summary="链路 3:查询离线 batch 各阶段耗时", +) +async def offline_batch_timing(surgery_id: str) -> OfflineBatchTimingResponse: + _require_demo_orchestrator() + if len(surgery_id) != 6 or not surgery_id.isdigit(): + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, + detail="surgery_id must be exactly 6 digits", + ) + rec = get_timing(surgery_id) + if rec is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="offline batch timing not found for this surgery_id", + ) + return OfflineBatchTimingResponse( + surgery_id=surgery_id, + text_duration_sec=round(rec.text_duration_sec, 3), + video_duration_sec=round(rec.video_duration_sec, 3) if rec.video_duration_sec is not None else None, + total_duration_sec=round(rec.total_duration_sec, 3), + video_status=rec.video_status, ) diff --git a/backend/app/schemas.py b/backend/app/schemas.py index 7d42e79..8b27dd8 100644 --- a/backend/app/schemas.py +++ b/backend/app/schemas.py @@ -84,7 +84,12 @@ class VoiceTerminalAssignmentResponse(BaseModel): class SurgeryClientErrorDetail(BaseModel): """与 `HTTPException(detail={...})` 对应;最终 JSON 为 `{"detail": {...}}`。""" - code: str = Field(description="业务错误码,如 RECORDING_CANNOT_START、RECORDING_NOT_STOPPED、RESULT_NOT_READY。") + code: str = Field( + description=( + "业务错误码,如 SURGERY_ALREADY_RECORDING、SURGERY_NOT_STARTED、" + "SURGERY_IN_PROGRESS_NO_DETAILS、SURGERY_ENDED_NO_CONSUMPTION、RECORDING_CANNOT_START。" + ) + ) message: str = Field(description="人类可读说明。") surgery_id: str = Field(description="手术 6 位号。") diff --git a/backend/app/services/offline_batch_timing.py b/backend/app/services/offline_batch_timing.py new file mode 100644 index 0000000..089b372 --- /dev/null +++ b/backend/app/services/offline_batch_timing.py @@ -0,0 +1,72 @@ +"""离线 batch 各阶段耗时(进程内缓存,供 demo 客户端轮询)。""" + +from __future__ import annotations + +from dataclasses import dataclass +from threading import Lock +from typing import Literal + +VideoStatus = Literal["skipped", "pending", "ready", "failed"] + + +@dataclass +class OfflineBatchTimingRecord: + surgery_id: str + text_duration_sec: float + video_status: VideoStatus = "skipped" + video_duration_sec: float | None = None + + @property + def total_duration_sec(self) -> float: + total = self.text_duration_sec + if self.video_duration_sec is not None: + total += self.video_duration_sec + return total + + +_lock = Lock() +_records: dict[str, OfflineBatchTimingRecord] = {} + + +def set_text_timing(*, surgery_id: str, text_duration_sec: float, include_video: bool) -> None: + with _lock: + _records[surgery_id] = OfflineBatchTimingRecord( + surgery_id=surgery_id, + text_duration_sec=text_duration_sec, + video_status="pending" if include_video else "skipped", + ) + + +def mark_video_ready(*, surgery_id: str, video_duration_sec: float) -> None: + with _lock: + rec = _records.get(surgery_id) + if rec is None: + return + rec.video_status = "ready" + rec.video_duration_sec = video_duration_sec + + +def mark_video_failed(*, surgery_id: str) -> None: + with _lock: + rec = _records.get(surgery_id) + if rec is None: + return + rec.video_status = "failed" + + +def get_timing(surgery_id: str) -> OfflineBatchTimingRecord | None: + with _lock: + rec = _records.get(surgery_id) + if rec is None: + return None + return OfflineBatchTimingRecord( + surgery_id=rec.surgery_id, + text_duration_sec=rec.text_duration_sec, + video_status=rec.video_status, + video_duration_sec=rec.video_duration_sec, + ) + + +def clear_timing(surgery_id: str) -> None: + with _lock: + _records.pop(surgery_id, None) diff --git a/backend/app/services/surgery_pipeline.py b/backend/app/services/surgery_pipeline.py index 25cdc77..5b8efe9 100644 --- a/backend/app/services/surgery_pipeline.py +++ b/backend/app/services/surgery_pipeline.py @@ -101,6 +101,38 @@ class SurgeryPipeline: def set_voice_terminal_id(self, surgery_id: str, terminal_id: str | None) -> None: self._sessions.set_voice_terminal_id(surgery_id, terminal_id) + async def classify_result_unavailable(self, surgery_id: str) -> tuple[str, str]: + """无至少一条消耗明细时,区分未开始 / 进行中无结果 / 已结束无消耗等。""" + phase = self._sessions.active_recording_phase(surgery_id) + if phase == "starting": + return ( + "SURGERY_STARTING", + "手术正在启动,算法尚未就绪,请稍后再查。", + ) + if phase == "recording": + return ( + "SURGERY_IN_PROGRESS_NO_DETAILS", + "手术进行中,尚无至少一条消耗明细。", + ) + async with self._session_factory() as session: + async with session.begin(): + persisted = await self._repo.load_final_details(session, surgery_id) + if persisted is not None: + return ( + "SURGERY_ENDED_NO_CONSUMPTION", + "手术已结束,当前无消耗明细。", + ) + archived = await self._sessions.archived_consumption_fallback(surgery_id) + if archived is not None: + return ( + "SURGERY_ENDED_NO_CONSUMPTION", + "手术已结束(尚未落库),当前无消耗明细。", + ) + return ( + "SURGERY_NOT_STARTED", + "手术未开始,请先调用开始手术接口。", + ) + async def get_consumption_details_for_client( self, surgery_id: str, diff --git a/backend/app/services/video/session_manager.py b/backend/app/services/video/session_manager.py index d3569ea..5f2d79f 100644 --- a/backend/app/services/video/session_manager.py +++ b/backend/app/services/video/session_manager.py @@ -6,6 +6,7 @@ import sys import time from datetime import datetime, timezone from pathlib import Path +from typing import Literal from loguru import logger from sqlalchemy.ext.asyncio import async_sessionmaker @@ -116,7 +117,7 @@ class CameraSessionManager: ) -> None: if self._registry.has_active(surgery_id): raise SurgeryPipelineError( - "RECORDING_CANNOT_START", + "SURGERY_ALREADY_RECORDING", "该手术已在录制中,请勿重复开始。", ) stale = await self._archive.take_archived_details(surgery_id) @@ -536,6 +537,9 @@ class CameraSessionManager: url = self._resolver.rtsp_url_after_hikvision_login(camera_id) return url, login.user_id, True + def active_recording_phase(self, surgery_id: str) -> Literal["starting", "recording"] | None: + return self._registry.active_recording_phase(surgery_id) + def live_consumption_if_active(self, surgery_id: str) -> list[SurgeryConsumptionStored] | None: return self._registry.live_consumption_if_active(surgery_id) diff --git a/backend/app/services/video/session_registry.py b/backend/app/services/video/session_registry.py index 156bd73..41fc91a 100644 --- a/backend/app/services/video/session_registry.py +++ b/backend/app/services/video/session_registry.py @@ -129,6 +129,15 @@ class SurgerySessionRegistry: def has_active(self, surgery_id: str) -> bool: return surgery_id in self._active + def active_recording_phase(self, surgery_id: str) -> Literal["starting", "recording"] | None: + """活跃会话阶段:算法未就绪为 starting,已 ready 为 recording。""" + run = self._active.get(surgery_id) + if run is None: + return None + if not run.state.ready.is_set(): + return "starting" + return "recording" + def get_running(self, surgery_id: str) -> RunningSurgery | None: return self._active.get(surgery_id) diff --git a/backend/tests/test_api_contract.py b/backend/tests/test_api_contract.py index 9d21fc5..a2c5ccb 100644 --- a/backend/tests/test_api_contract.py +++ b/backend/tests/test_api_contract.py @@ -300,21 +300,27 @@ def test_get_result_200(api_app: FastAPI) -> None: def test_get_result_503_not_ready(api_app: FastAPI) -> None: pipeline = MagicMock() pipeline.get_consumption_details_for_client = AsyncMock(return_value=None) + pipeline.classify_result_unavailable = AsyncMock( + return_value=("SURGERY_NOT_STARTED", "手术未开始,请先调用开始手术接口。") + ) api_app.dependency_overrides[get_surgery_pipeline] = lambda: pipeline client = TestClient(api_app) r = client.get("/client/surgeries/123456/result") assert r.status_code == 503 - assert r.json()["detail"]["code"] == "RESULT_NOT_READY" + assert r.json()["detail"]["code"] == "SURGERY_NOT_STARTED" def test_get_result_503_empty_details(api_app: FastAPI) -> None: pipeline = MagicMock() pipeline.get_consumption_details_for_client = AsyncMock(return_value=[]) + pipeline.classify_result_unavailable = AsyncMock( + return_value=("SURGERY_ENDED_NO_CONSUMPTION", "手术已结束,当前无消耗明细。") + ) api_app.dependency_overrides[get_surgery_pipeline] = lambda: pipeline client = TestClient(api_app) r = client.get("/client/surgeries/123456/result") assert r.status_code == 503 - assert r.json()["detail"]["code"] == "RESULT_NOT_READY" + assert r.json()["detail"]["code"] == "SURGERY_ENDED_NO_CONSUMPTION" def test_pending_confirmation_200_and_404(api_app: FastAPI) -> None: diff --git a/backend/tests/test_app_integration.py b/backend/tests/test_app_integration.py index 1d36325..cbf83bc 100644 --- a/backend/tests/test_app_integration.py +++ b/backend/tests/test_app_integration.py @@ -58,7 +58,7 @@ class _StubCameraSessionManager: from app.surgery_errors import SurgeryPipelineError raise SurgeryPipelineError( - "RECORDING_CANNOT_START", + "SURGERY_ALREADY_RECORDING", "该手术已在录制中,请勿重复开始。", ) state = SurgerySessionState( @@ -282,7 +282,7 @@ def test_full_flow_start_pending_resolve_end_result( def test_result_not_ready_before_start(integration_client: TestClient) -> None: r = integration_client.get("/client/surgeries/999999/result") assert r.status_code == 503 - assert r.json()["detail"]["code"] == "RESULT_NOT_READY" + assert r.json()["detail"]["code"] == "SURGERY_NOT_STARTED" def test_health_endpoint_ok_via_real_app(integration_client: TestClient) -> None: diff --git a/backend/tests/test_result_unavailable.py b/backend/tests/test_result_unavailable.py new file mode 100644 index 0000000..83dbd7f --- /dev/null +++ b/backend/tests/test_result_unavailable.py @@ -0,0 +1,54 @@ +"""查询结果不可用时的原因分类。""" + +from __future__ import annotations + +import asyncio +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from app.services.surgery_pipeline import SurgeryPipeline +from app.services.video.session_manager import CameraSessionManager, RunningSurgery, SurgerySessionState + + +@pytest.mark.asyncio +async def test_classify_not_started() -> None: + sessions = MagicMock(spec=CameraSessionManager) + sessions.active_recording_phase = MagicMock(return_value=None) + sessions.archived_consumption_fallback = AsyncMock(return_value=None) + repo = MagicMock() + repo.load_final_details = AsyncMock(return_value=None) + pipeline = SurgeryPipeline(sessions, result_repository=repo, voice_confirmation=MagicMock()) + code, _ = await pipeline.classify_result_unavailable("123456") + assert code == "SURGERY_NOT_STARTED" + + +@pytest.mark.asyncio +async def test_classify_in_progress_no_details() -> None: + sessions = MagicMock(spec=CameraSessionManager) + sessions.active_recording_phase = MagicMock(return_value="recording") + pipeline = SurgeryPipeline(sessions, result_repository=MagicMock(), voice_confirmation=MagicMock()) + code, _ = await pipeline.classify_result_unavailable("123456") + assert code == "SURGERY_IN_PROGRESS_NO_DETAILS" + + +@pytest.mark.asyncio +async def test_classify_starting() -> None: + sessions = MagicMock(spec=CameraSessionManager) + sessions.active_recording_phase = MagicMock(return_value="starting") + pipeline = SurgeryPipeline(sessions, result_repository=MagicMock(), voice_confirmation=MagicMock()) + code, _ = await pipeline.classify_result_unavailable("123456") + assert code == "SURGERY_STARTING" + + +def test_active_recording_phase() -> None: + from app.services.video.session_registry import SurgerySessionRegistry + + reg = SurgerySessionRegistry() + st = SurgerySessionState(candidate_consumables=["纱布"]) + run = RunningSurgery(stop_event=asyncio.Event(), state=st, tasks=[]) + reg._active["111111"] = run + assert reg.active_recording_phase("111111") == "starting" + st.ready.set() + assert reg.active_recording_phase("111111") == "recording" + assert reg.active_recording_phase("999999") is None diff --git a/clients/demo-client/app.js b/clients/demo-client/app.js index 572f179..f9ef6c5 100644 --- a/clients/demo-client/app.js +++ b/clients/demo-client/app.js @@ -72,6 +72,83 @@ return String(detail || body); } + const RESULT_UNAVAILABLE_LABELS = { + SURGERY_NOT_STARTED: "手术未开始", + SURGERY_STARTING: "手术启动中,算法尚未就绪", + SURGERY_IN_PROGRESS_NO_DETAILS: "手术进行中,尚无消耗明细", + SURGERY_ENDED_NO_CONSUMPTION: "手术已结束,无消耗明细", + SURGERY_ALREADY_RECORDING: "该手术已在录制中", + RESULT_NOT_READY: "结果尚不可查询", + }; + + function formatResultUnavailable(body) { + const detail = body?.detail; + if (detail && typeof detail === "object") { + const code = detail.code || ""; + const label = RESULT_UNAVAILABLE_LABELS[code]; + const msg = detail.message || ""; + if (label && msg) return `${label}:${msg}`; + if (label) return label; + if (msg) return msg; + } + return formatDetail(body); + } + + function formatDurationSec(sec) { + if (sec == null || Number.isNaN(Number(sec))) return "—"; + const n = Number(sec); + if (n < 60) return n.toFixed(2) + " 秒"; + const m = Math.floor(n / 60); + const s = (n % 60).toFixed(1); + return m + " 分 " + s + " 秒"; + } + + function showOfflineBatchTiming(textSec, videoSec, totalSec, videoStatus) { + const el = $("offline-batch-timing"); + if (!el) return; + if (textSec == null) { + el.textContent = ""; + el.classList.add("hidden"); + return; + } + let line = "耗时 · 文本结果:" + formatDurationSec(textSec); + if (videoStatus === "pending") { + line += ";标注视频:生成中…"; + } else if (videoStatus === "ready" && videoSec != null) { + line += ";标注视频:" + formatDurationSec(videoSec); + } else if (videoStatus === "failed") { + line += ";标注视频:生成失败"; + } else if (videoStatus === "skipped") { + line += ";标注视频:未生成"; + } + line += ";合计:" + formatDurationSec(totalSec); + el.textContent = line; + el.classList.remove("hidden"); + } + + async function pollOfflineBatchTiming(sid, includeVis) { + if (!includeVis) return; + const maxAttempts = 120; + for (let i = 0; i < maxAttempts; i++) { + await sleep(2000); + try { + const { res, body } = await apiJson("GET", `/internal/demo/offline-batch/${sid}/timing`); + if (!res.ok || !body) continue; + showOfflineBatchTiming( + body.text_duration_sec, + body.video_duration_sec, + body.total_duration_sec, + body.video_status, + ); + if (body.video_status === "ready" || body.video_status === "failed" || body.video_status === "skipped") { + return; + } + } catch { + /* ignore poll errors */ + } + } + } + const logEl = $("log-scroll"); function addLog(method, url, status, body, { error = false, hint = "" } = {}) { const item = document.createElement("div"); @@ -445,6 +522,19 @@ } }; showBanner("标注视频已就绪", "ok"); + try { + const { res: tr, body: tb } = await apiJson("GET", `/internal/demo/offline-batch/${sid}/timing`); + if (tr.ok && tb) { + showOfflineBatchTiming( + tb.text_duration_sec, + tb.video_duration_sec, + tb.total_duration_sec, + tb.video_status, + ); + } + } catch { + /* ignore */ + } return; } if (attempt === 0) { @@ -650,6 +740,16 @@ return; } lastVideoBatchDoctorDisplay = body?.doctor_display || ""; + const includeVis = $("offline-batch-include-vis")?.checked; + showOfflineBatchTiming( + body.text_duration_sec, + body.video_duration_sec, + body.total_duration_sec, + includeVis ? "pending" : "skipped", + ); + if (includeVis) { + void pollOfflineBatchTiming(sid, true); + } if ($("offline-batch-include-vis")?.checked && body?.visualization_url) { showVideoBatchVisualization(sid, body.visualization_url, lastVideoBatchDoctorDisplay); } else { @@ -694,7 +794,11 @@ "simulated-start", ); if (!res.ok) { - showBanner("模拟开录失败:" + formatDetail(body), "err"); + const dup = body?.detail?.code === "SURGERY_ALREADY_RECORDING"; + showBanner( + dup ? "请勿重复开始:" + formatResultUnavailable(body) : "模拟开录失败:" + formatDetail(body), + "err", + ); return; } showBanner("模拟开录已接受,请打开语音终端", "ok"); @@ -715,7 +819,11 @@ candidate_consumables: candidateConsumables, }); if (!res.ok) { - showBanner("开录失败:" + formatDetail(body), "err"); + const dup = body?.detail?.code === "SURGERY_ALREADY_RECORDING"; + showBanner( + dup ? "请勿重复开始:" + formatResultUnavailable(body) : "开录失败:" + formatDetail(body), + "err", + ); return; } showBanner("开录已接受,请打开语音终端", "ok"); @@ -756,7 +864,7 @@ if (!target) return; target.innerHTML = ""; if (!res.ok || !body || typeof body !== "object") { - showBanner(res.ok ? "无结果数据" : "查询失败:" + formatDetail(body), "err"); + showBanner(res.ok ? "无结果数据" : "查询失败:" + formatResultUnavailable(body), "err"); return; } const { details = [], summary = [] } = body; diff --git a/clients/demo-client/index.html b/clients/demo-client/index.html index b13fdc4..03834d7 100755 --- a/clients/demo-client/index.html +++ b/clients/demo-client/index.html @@ -191,6 +191,7 @@

结果

+