Files
operating-room-monitor-server/backend/app/algorithm_ipc/schema.py
Kevin 1af442481e 重组为 backend/clients/docs 三层结构,并清理 git 污染。
将后端迁入 backend/,完善根目录 .gitignore,删除误提交的 .mypy_cache 缓存文件。

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-21 16:02:25 +08:00

82 lines
2.7 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""算法 IPC白名单 JSON + JSON Lines 事件。
事件类型type 字段):
- ``ready``:子进程已打开流、模型就绪。
- ``segment_confirmed``:自动记账段(高置信且 top1 在候选内)。
- ``needs_voice_confirm``:需语音/客户端确认,带 `confirmation_id` 与 topK 选项。
- ``done``:正常结束(停录或流结束)。
- ``error``不可恢复错误payload 含 ``message``。
"""
from __future__ import annotations
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Literal
from app.consumable_catalog import normalize_candidate_consumables_raw
EventType = Literal["ready", "segment_confirmed", "needs_voice_confirm", "done", "error"]
@dataclass
class WhitelistSpec:
"""写入 `--whitelist-json` 的结构(与 HTTP 开录候选一致)。"""
candidate_consumables: list[str]
name_to_id: dict[str, str]
@classmethod
def from_session(cls, candidate_consumables: list[str], name_to_id: dict[str, str]) -> WhitelistSpec:
return cls(
candidate_consumables=list(candidate_consumables),
name_to_id=dict(name_to_id),
)
def to_json_obj(self) -> dict[str, Any]:
return {
"schema_version": 1,
"candidate_consumables": self.candidate_consumables,
"name_to_id": self.name_to_id,
}
@classmethod
def load_path(cls, path: Path) -> WhitelistSpec:
raw = path.read_text(encoding="utf-8")
data = json.loads(raw)
if not isinstance(data, dict):
raise ValueError("whitelist json must be object")
cand = data.get("candidate_consumables") or []
n2id = data.get("name_to_id") or {}
if not isinstance(cand, list) or not isinstance(n2id, dict):
raise ValueError("invalid whitelist json")
return cls(
candidate_consumables=normalize_candidate_consumables_raw(cand),
name_to_id={str(k).strip(): str(v).strip() for k, v in n2id.items() if str(k).strip()},
)
def events_path_for_surgery(surgery_id: str, *, base_dir: str | Path = "logs") -> Path:
root = Path(base_dir)
return (root / f"algo_events_{surgery_id}.jsonl").expanduser()
def append_event_line(path: Path, event: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
line = json.dumps(event, ensure_ascii=False) + "\n"
with path.open("a", encoding="utf-8") as f:
f.write(line)
f.flush()
def parse_event_obj(line: str) -> dict[str, Any] | None:
line = line.strip()
if not line:
return None
try:
o = json.loads(line)
except json.JSONDecodeError:
return None
return o if isinstance(o, dict) else None