"""每例手术一个文本文件(制表符列):`start_surgery` 时截断并写表头,每次时间窗识别**追加**一行。终端 Markdown 中时间戳为可读形式;落盘行内仍为 ISO 便于程序解析。 时间戳:在拉流起点记录 `time.time()`,与 `time.monotonic()` 时间窗对齐。直播 RTSP 经 OpenCV 一般无可靠绝对时码,以本机接收时刻为准。 """ from __future__ import annotations import re import threading from datetime import datetime, timezone from pathlib import Path from zoneinfo import ZoneInfo, ZoneInfoNotFoundError from loguru import logger from app.config import settings from app.services.consumable_vision_algorithm import ClsTop3 from app.terminal_markdown import print_markdown_stderr # 制表符分隔;时间范围用 U+2013 连接;Top2/3 仅名称;本窗消耗数量恒为 1 HEADER = "物品id\t物品名称\tTop2物品名称\tTop3物品名称\t消耗数量\t医生id\t时间戳\n" _RANGE_SEP = "\u2013" # en dash,与样例 `00:00:00.000–00:00:45.000` 一致 _lock = threading.Lock() def _consumption_tzinfo(): raw = (settings.consumption_log_timezone or "").strip() if not raw: lt = datetime.now().astimezone().tzinfo return lt if lt is not None else timezone.utc try: return ZoneInfo(raw) except ZoneInfoNotFoundError: logger.warning("无效的 consumption_log_timezone={!r},回退为 UTC", raw) return timezone.utc def format_consumption_timestamp( camera_id: str, wall_start_epoch: float, wall_end_epoch: float, ) -> str: """落盘用:墙钟 + 配置时区 → `camXX@ISO8601–ISO8601`。""" tz = _consumption_tzinfo() a = datetime.fromtimestamp(wall_start_epoch, tz=tz) b = datetime.fromtimestamp(wall_end_epoch, tz=tz) cam = short_camera_label(camera_id) return f"{cam}@{a.isoformat(timespec='milliseconds')}{_RANGE_SEP}{b.isoformat(timespec='milliseconds')}" def format_consumption_timestamp_readable( camera_id: str, wall_start_epoch: float, wall_end_epoch: float, ) -> str: """仅终端 Rich:不含 `T` 的本地可读区间 + 摄像头简名,便于人眼对时。""" tz = _consumption_tzinfo() a = datetime.fromtimestamp(wall_start_epoch, tz=tz) b = datetime.fromtimestamp(wall_end_epoch, tz=tz) cam = short_camera_label(camera_id) def _fmt(d: datetime) -> str: return d.strftime("%Y-%m-%d %H:%M:%S") + f".{d.microsecond // 1000:03d}" return f"{_fmt(a)} {_RANGE_SEP} {_fmt(b)} · {cam}" def short_camera_label(camera_id: str) -> str: s = (camera_id or "").strip() m = re.match(r"^or-cam-(\d+)$", s, re.IGNORECASE) if m: return f"cam{int(m.group(1)):02d}" m2 = re.match(r"^cam-?0*(\d+)$", s, re.IGNORECASE) if m2: return f"cam{int(m2.group(1)):02d}" alnum = re.sub(r"[^\w-]", "", s)[:12] return alnum or "cam" def _encode_cell(value: str) -> str: s = (value or "").replace("\r", " ").replace("\n", " ").replace("\t", " ") return s def _item_id_for_row(name: str, pid: str, name_to_code: dict[str, str]) -> str: p = (pid or "").strip() if p: return p n = (name or "").strip() if n in name_to_code: return (name_to_code.get(n) or n).strip() return n def build_tsv_line( *, name_to_code: dict[str, str], best: ClsTop3, doctor_id: str, camera_id: str, wall_start_epoch: float, wall_end_epoch: float, ) -> str: id1 = _item_id_for_row(best.t1_name, best.t1_pid, name_to_code) # 与历史样例:Top1 为「名称 置信度」四位小数 name1 = f"{(best.t1_name or '').strip()} {best.t1_conf:.4f}".strip() n2 = (best.t2_name or "").strip() n3 = (best.t3_name or "").strip() ts = format_consumption_timestamp(camera_id, wall_start_epoch, wall_end_epoch) row = [ _encode_cell(id1), _encode_cell(name1), _encode_cell(n2), _encode_cell(n3), "1", _encode_cell(doctor_id), _encode_cell(ts), ] return "\t".join(row) + "\n" def _safe_surgery_path_segment(surgery_id: str) -> str: s = (surgery_id or "unknown").strip() or "unknown" s = re.sub(r"[^\w\-.@]", "_", s) return s[:200] if len(s) > 200 else s def resolved_consumption_log_path(surgery_id: str) -> Path: raw = (settings.consumption_tsv_log_path or "logs/consumption_{surgery_id}.txt").strip() safe = _safe_surgery_path_segment(surgery_id) if "{surgery_id}" in raw: raw = raw.replace("{surgery_id}", safe) else: p0 = Path(raw) if p0.suffix: raw = str(p0.with_name(f"{p0.stem}_{safe}{p0.suffix}")) else: raw = f"{raw.rstrip('/')}_{safe}.txt" p = Path(raw).expanduser() if not p.is_absolute(): p = Path.cwd() / p return p def init_consumption_log_file(surgery_id: str) -> None: """新手术开始:截断该手术对应文件并写入表头(一次)。""" if not settings.consumption_tsv_log_enabled: return path = resolved_consumption_log_path(surgery_id) path.parent.mkdir(parents=True, exist_ok=True) with _lock: with path.open("w", encoding="utf-8") as f: f.write(HEADER) def append_consumption_tsv_line(surgery_id: str, line: str) -> None: if not settings.consumption_tsv_log_enabled: return path = resolved_consumption_log_path(surgery_id) path.parent.mkdir(parents=True, exist_ok=True) with _lock: with path.open("a", encoding="utf-8") as f: f.write(line) def _md_cell(value: str) -> str: """避免破坏 Markdown 表格的 | 与换行。""" s = (value or "").replace("\r", " ").replace("\n", " ").replace("|", "|") return s def build_consumption_markdown( *, name_to_code: dict[str, str], best: ClsTop3, doctor_id: str, camera_id: str, wall_start_epoch: float, wall_end_epoch: float, ) -> str: """终端用:Top1 含 id/名称/置信度;Top2/3 仅名称;消耗数量恒为 1。""" id1 = _item_id_for_row(best.t1_name, best.t1_pid, name_to_code) n1 = (best.t1_name or "").strip() has2 = bool((best.t2_name or "").strip()) has3 = bool((best.t3_name or "").strip()) n2 = (best.t2_name or "").strip() if has2 else "" n3 = (best.t3_name or "").strip() if has3 else "" dash = "—" ts = format_consumption_timestamp_readable(camera_id, wall_start_epoch, wall_end_epoch) return "\n".join( [ "| Top1 物品id | Top1 物品名称 | Top1 置信度 | Top2 物品名称 | Top3 物品名称 | 消耗数量 | 医生id | 时间戳 |", "| :--- | :--- | ---: | :--- | :--- | ---: | :--- | :--- |", "| {} | {} | {:.4f} | {} | {} | 1 | {} | {} |".format( _md_cell(id1), _md_cell(n1), best.t1_conf, _md_cell(n2) if has2 else dash, _md_cell(n3) if has3 else dash, _md_cell(doctor_id), _md_cell(ts), ), "", ] ) def append_consumption_window( *, surgery_id: str, name_to_code: dict[str, str], best: ClsTop3, doctor_id: str, camera_id: str, wall_start_epoch: float, wall_end_epoch: float, ) -> None: if not settings.consumption_tsv_log_enabled and not settings.consumption_log_markdown_terminal: return if settings.consumption_tsv_log_enabled: line = build_tsv_line( name_to_code=name_to_code, best=best, doctor_id=doctor_id, camera_id=camera_id, wall_start_epoch=wall_start_epoch, wall_end_epoch=wall_end_epoch, ) append_consumption_tsv_line(surgery_id, line) if settings.consumption_log_markdown_terminal: print_markdown_stderr( build_consumption_markdown( name_to_code=name_to_code, best=best, doctor_id=doctor_id, camera_id=camera_id, wall_start_epoch=wall_start_epoch, wall_end_epoch=wall_end_epoch, ), )