Files
operating-room-monitor-server/app/services/consumption_tsv_log.py

671 lines
22 KiB
Python
Raw Normal View History

"""每例手术一个文本文件(制表符列):`start_surgery` 时截断并写表头,每次时间窗识别**追加**一行
item_id, item_name, qty, doctor_id, timestamp, top13 名与置信度待确认行首列为 ``pending:{confirmation_id}``
item_name 待确认top13 仍为模型输出语音落锤后**整行替换**为与客户端一致的最终真值不再重复追加
终端 Markdown 时间戳为可读形式落盘时间戳为 ISO 区间便于程序解析
手术结束时再追加一节汇总行item_id, item_name, qty无其它列 HTTP ``summary`` 同算法由内存 ``details`` ``build_consumption_summary`` 得到非录制过程中按窗累计
时间戳在拉流起点记录 `time.time()` `time.monotonic()` 时间窗对齐直播 RTSP OpenCV 一般无可靠绝对时码以本机接收时刻为准
"""
from __future__ import annotations
import re
import threading
from datetime import datetime, timezone
from pathlib import Path
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from loguru import logger
from app.baked import pipeline as bp
from app.services.consumable_vision_algorithm import ClsTop3, _norm_product_name
from app.terminal_markdown import print_markdown_stderr
# 制表符分隔;时间范围用 U+2013 连接;本窗消耗数量恒为 1。
# top1/2/3 为模型原始排序(未按手术候选重排);确认行 item_name 与 top1_name 同为 Top1 标签。
# item_id 只写与展示名不同的业务 idlabel_id与名称相同时留空。
HEADER = (
"item_id\titem_name\tqty\tdoctor_id\ttimestamp\t"
"top1_name\ttop1_conf\ttop2_name\ttop2_conf\ttop3_name\ttop3_conf\n"
)
SUMMARY_HEADER = "item_id\titem_name\tqty\n"
_RANGE_SEP = "\u2013" # en dash与样例 `00:00:00.00000:00:45.000` 一致
_lock = threading.Lock()
def _consumption_tzinfo():
raw = (bp.CONSUMPTION_LOG_TIMEZONE or "").strip()
if not raw:
lt = datetime.now().astimezone().tzinfo
return lt if lt is not None else timezone.utc
try:
return ZoneInfo(raw)
except ZoneInfoNotFoundError:
logger.warning("无效的 consumption_log_timezone={!r},回退为 UTC", raw)
return timezone.utc
def format_consumption_timestamp(
camera_id: str,
wall_start_epoch: float,
wall_end_epoch: float,
) -> str:
"""落盘用:墙钟 + 配置时区 → `camXX@ISO8601ISO8601`。"""
tz = _consumption_tzinfo()
a = datetime.fromtimestamp(wall_start_epoch, tz=tz)
b = datetime.fromtimestamp(wall_end_epoch, tz=tz)
cam = short_camera_label(camera_id)
return f"{cam}@{a.isoformat(timespec='milliseconds')}{_RANGE_SEP}{b.isoformat(timespec='milliseconds')}"
def format_consumption_timestamp_readable(
camera_id: str,
wall_start_epoch: float,
wall_end_epoch: float,
) -> str:
"""仅终端 Rich不含 `T` 的本地可读区间 + 摄像头简名,便于人眼对时。"""
tz = _consumption_tzinfo()
a = datetime.fromtimestamp(wall_start_epoch, tz=tz)
b = datetime.fromtimestamp(wall_end_epoch, tz=tz)
cam = short_camera_label(camera_id)
def _fmt(d: datetime) -> str:
return d.strftime("%Y-%m-%d %H:%M:%S") + f".{d.microsecond // 1000:03d}"
return f"{_fmt(a)} {_RANGE_SEP} {_fmt(b)} · {cam}"
def short_camera_label(camera_id: str) -> str:
s = (camera_id or "").strip()
m = re.match(r"^or-cam-(\d+)$", s, re.IGNORECASE)
if m:
return f"cam{int(m.group(1)):02d}"
m2 = re.match(r"^cam-?0*(\d+)$", s, re.IGNORECASE)
if m2:
return f"cam{int(m2.group(1)):02d}"
alnum = re.sub(r"[^\w-]", "", s)[:12]
return alnum or "cam"
def _encode_cell(value: str) -> str:
s = (value or "").replace("\r", " ").replace("\n", " ").replace("\t", " ")
return s
def resolve_consumption_ids(
t1_name: str,
t1_pid: str,
name_to_code: dict[str, str],
) -> tuple[str, str]:
"""TSV 第一列 item_id 与内存汇总键。
- ``tsv_item_id``业务 id或模型侧 t1_pid与展示名相同则视为无独立 id留空
- ``totals_key``汇总用稳定键无编码时用归一化名称避免多行空 id 碰撞
"""
n = (t1_name or "").strip()
norm = _norm_product_name(n)
code = (name_to_code.get(norm) or name_to_code.get(n) or "").strip()
p = (t1_pid or "").strip()
catalog = (code or p).strip()
if catalog and catalog != n:
return catalog, catalog
if catalog == n and catalog:
return "", norm
return "", norm if norm else (n or "unknown")
def resolve_consumption_item_id(
t1_name: str,
t1_pid: str,
name_to_code: dict[str, str],
) -> str:
"""兼容旧调用:有编码则返回编码,否则返回汇总键(归一化名或 unknown"""
tsv_id, totals_key = resolve_consumption_ids(t1_name, t1_pid, name_to_code)
return tsv_id or totals_key
def _fmt_top_conf(v: float) -> str:
try:
return f"{float(v):.4f}"
except (TypeError, ValueError):
return "0.0000"
def build_tsv_line(
*,
name_to_code: dict[str, str],
best: ClsTop3,
doctor_id: str,
camera_id: str,
wall_start_epoch: float,
wall_end_epoch: float,
) -> str:
tsv_id, _tot_key = resolve_consumption_ids(
best.t1_name, best.t1_pid, name_to_code
)
name1 = (best.t1_name or "").strip()
ts = format_consumption_timestamp(camera_id, wall_start_epoch, wall_end_epoch)
n2 = (best.t2_name or "").strip()
n3 = (best.t3_name or "").strip()
row = [
_encode_cell(tsv_id),
_encode_cell(name1),
"1",
_encode_cell(doctor_id),
_encode_cell(ts),
_encode_cell(name1),
_fmt_top_conf(best.t1_conf),
_encode_cell(n2),
_fmt_top_conf(best.t2_conf),
_encode_cell(n3),
_fmt_top_conf(best.t3_conf),
]
return "\t".join(row) + "\n"
def _safe_surgery_path_segment(surgery_id: str) -> str:
s = (surgery_id or "unknown").strip() or "unknown"
s = re.sub(r"[^\w\-.@]", "_", s)
return s[:200] if len(s) > 200 else s
def resolved_consumption_log_path(surgery_id: str) -> Path:
raw = (bp.CONSUMPTION_TSV_LOG_PATH or "logs/consumption_{surgery_id}.txt").strip()
safe = _safe_surgery_path_segment(surgery_id)
if "{surgery_id}" in raw:
raw = raw.replace("{surgery_id}", safe)
else:
p0 = Path(raw)
if p0.suffix:
raw = str(p0.with_name(f"{p0.stem}_{safe}{p0.suffix}"))
else:
raw = f"{raw.rstrip('/')}_{safe}.txt"
p = Path(raw).expanduser()
if not p.is_absolute():
p = Path.cwd() / p
return p
def init_consumption_log_file(surgery_id: str) -> None:
"""新手术开始:截断该手术对应文件并写入表头(一次)。"""
if not bp.CONSUMPTION_TSV_LOG_ENABLED:
return
path = resolved_consumption_log_path(surgery_id)
path.parent.mkdir(parents=True, exist_ok=True)
with _lock:
with path.open("w", encoding="utf-8") as f:
f.write(HEADER)
def append_consumption_tsv_line(surgery_id: str, line: str) -> None:
if not bp.CONSUMPTION_TSV_LOG_ENABLED:
return
path = resolved_consumption_log_path(surgery_id)
path.parent.mkdir(parents=True, exist_ok=True)
with _lock:
with path.open("a", encoding="utf-8") as f:
f.write(line)
def _md_cell(value: str) -> str:
"""避免破坏 Markdown 表格的 | 与换行。"""
s = (value or "").replace("\r", " ").replace("\n", " ").replace("|", "")
return s
def build_consumption_markdown(
*,
name_to_code: dict[str, str],
best: ClsTop3,
doctor_id: str,
camera_id: str,
wall_start_epoch: float,
wall_end_epoch: float,
since_recording_start: str | None = None,
) -> str:
"""终端用:与落盘列一致;本窗 qty 恒为 1。"""
tsv_id, _ = resolve_consumption_ids(best.t1_name, best.t1_pid, name_to_code)
n1 = (best.t1_name or "").strip()
n2 = (best.t2_name or "").strip()
n3 = (best.t3_name or "").strip()
ts = format_consumption_timestamp_readable(camera_id, wall_start_epoch, wall_end_epoch)
rel = _md_cell(since_recording_start or "")
return "\n".join(
[
"| item_id | item_name | qty | doctor_id | 相对开录 | timestamp | top2 | top3 |",
"| :--- | :--- | ---: | :--- | :--- | :--- | :--- | :--- |",
"| {} | {} | 1 | {} | {} | {} | {} | {} |".format(
_md_cell(tsv_id),
_md_cell(n1),
_md_cell(doctor_id),
rel,
_md_cell(ts),
_md_cell(
f"{n2} ({_fmt_top_conf(best.t2_conf)})" if n2 else "",
),
_md_cell(
f"{n3} ({_fmt_top_conf(best.t3_conf)})" if n3 else "",
),
),
"",
]
)
PENDING_CONSUMPTION_ITEM_NAME = "待确认"
def _build_pending_tsv_line(
*,
confirmation_id: str,
model_snap: ClsTop3,
doctor_id: str,
camera_id: str,
wall_start_epoch: float,
wall_end_epoch: float,
) -> str:
pid = f"pending:{confirmation_id}"
ts = format_consumption_timestamp(camera_id, wall_start_epoch, wall_end_epoch)
n1 = (model_snap.t1_name or "").strip()
n2 = (model_snap.t2_name or "").strip()
n3 = (model_snap.t3_name or "").strip()
row = [
_encode_cell(pid),
_encode_cell(PENDING_CONSUMPTION_ITEM_NAME),
"1",
_encode_cell(doctor_id),
_encode_cell(ts),
_encode_cell(n1),
_fmt_top_conf(model_snap.t1_conf),
_encode_cell(n2),
_fmt_top_conf(model_snap.t2_conf),
_encode_cell(n3),
_fmt_top_conf(model_snap.t3_conf),
]
return "\t".join(row) + "\n"
def build_pending_consumption_markdown(
*,
confirmation_id: str,
model_snap: ClsTop3,
doctor_id: str,
camera_id: str,
wall_start_epoch: float,
wall_end_epoch: float,
since_recording_start: str | None = None,
) -> str:
pid = f"pending:{confirmation_id}"
n1 = (model_snap.t1_name or "").strip()
n2 = (model_snap.t2_name or "").strip()
n3 = (model_snap.t3_name or "").strip()
ts = format_consumption_timestamp_readable(camera_id, wall_start_epoch, wall_end_epoch)
rel = _md_cell(since_recording_start or "")
def _top_cell(name: str, conf: float) -> str:
return _md_cell(f"{name} ({_fmt_top_conf(conf)})" if name else "")
return "\n".join(
[
"| item_id | item_name | qty | doctor_id | 相对开录 | timestamp | top1 | top2 | top3 |",
"| :--- | :--- | ---: | :--- | :--- | :--- | :--- | :--- | :--- |",
"| {} | {} | 1 | {} | {} | {} | {} | {} | {} |".format(
_md_cell(pid),
_md_cell(PENDING_CONSUMPTION_ITEM_NAME),
_md_cell(doctor_id),
rel,
_md_cell(ts),
_top_cell(n1, model_snap.t1_conf),
_top_cell(n2, model_snap.t2_conf),
_top_cell(n3, model_snap.t3_conf),
),
"",
]
)
def append_consumption_pending_window(
*,
surgery_id: str,
confirmation_id: str,
model_snap: ClsTop3,
doctor_id: str,
camera_id: str,
wall_start_epoch: float,
wall_end_epoch: float,
tsv_enabled: bool | None = None,
markdown_terminal: bool | None = None,
since_recording_start: str | None = None,
) -> None:
"""需医生确认的时间窗:落盘/终端记「待确认」top1/2/3 保留模型提示;不更新消耗汇总。"""
en_tsv = bp.CONSUMPTION_TSV_LOG_ENABLED if tsv_enabled is None else tsv_enabled
en_md = (
bp.CONSUMPTION_LOG_MARKDOWN_TERMINAL
if markdown_terminal is None
else markdown_terminal
)
if not en_tsv and not en_md:
return
line = _build_pending_tsv_line(
confirmation_id=confirmation_id,
model_snap=model_snap,
doctor_id=doctor_id,
camera_id=camera_id,
wall_start_epoch=wall_start_epoch,
wall_end_epoch=wall_end_epoch,
)
if en_tsv:
append_consumption_tsv_line(surgery_id, line)
if en_md:
print_markdown_stderr(
build_pending_consumption_markdown(
confirmation_id=confirmation_id,
model_snap=model_snap,
doctor_id=doctor_id,
camera_id=camera_id,
wall_start_epoch=wall_start_epoch,
wall_end_epoch=wall_end_epoch,
since_recording_start=since_recording_start,
),
)
def _build_voice_resolved_tsv_data_line(
*,
name_to_code: dict[str, str],
chosen_label: str,
doctor_id: str,
wall_epoch: float,
) -> str:
"""与客户端一致的最终行top2/3 空列timestamp 为单点)。"""
lb = (chosen_label or "").strip()
norm = _norm_product_name(lb)
p = (
name_to_code.get(norm) or name_to_code.get(lb) or ""
).strip()
snap = ClsTop3(
t1_name=lb,
t1_conf=1.0,
t2_name="",
t2_conf=0.0,
t3_name="",
t3_conf=0.0,
t1_pid=p,
t2_pid="",
t3_pid="",
)
return build_tsv_line(
name_to_code=name_to_code,
best=snap,
doctor_id=doctor_id,
camera_id="voice",
wall_start_epoch=wall_epoch,
wall_end_epoch=wall_epoch,
)
def _data_line_starts_with_pending_id(line: str, confirmation_id: str) -> bool:
s = (line or "").rstrip("\r\n")
if not s or s.startswith("item_id\t"):
return False
want = f"pending:{(confirmation_id or '').strip()}"
first = s.split("\t", 1)[0]
return first == _encode_cell(want) or first == want
def replace_pending_line_with_voice_resolution(
*,
surgery_id: str,
confirmation_id: str,
name_to_code: dict[str, str],
chosen_label: str,
doctor_id: str,
wall_epoch: float,
tsv_enabled: bool | None = None,
) -> None:
"""将同一 ``confirmation_id`` 下先前追加的「待确认」行整行改为最终真值,不再多追加一行。
未找到 ``pending:{confirmation_id}`` 时回退为追加兼容旧文件或行缺失读改写全程持
:data:`_lock`避免与并发的 append 交错
"""
en = bp.CONSUMPTION_TSV_LOG_ENABLED if tsv_enabled is None else tsv_enabled
if not en:
return
if not (chosen_label or "").strip() or not (confirmation_id or "").strip():
return
new_line = _build_voice_resolved_tsv_data_line(
name_to_code=name_to_code,
chosen_label=chosen_label,
doctor_id=doctor_id,
wall_epoch=wall_epoch,
)
path = resolved_consumption_log_path(surgery_id)
with _lock:
if not path.is_file():
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as f:
f.write(HEADER)
f.write(new_line)
logger.warning(
"consumption TSV 无文件,无法替换 pending 行,已写表头+最终行: {}",
path,
)
return
text = path.read_text(encoding="utf-8")
lines = text.splitlines(keepends=True)
replaced = False
out: list[str] = []
for line in lines:
if (
not replaced
and _data_line_starts_with_pending_id(line, confirmation_id)
):
out.append(new_line)
replaced = True
else:
out.append(line)
if not replaced:
logger.warning(
"未在 TSV 中找到 pending 行,回退为追加: surgery={} confirm={}",
surgery_id,
confirmation_id,
)
with path.open("a", encoding="utf-8") as f:
f.write(new_line)
return
path.write_text("".join(out), encoding="utf-8")
def append_consumption_voice_resolution_line(
*,
surgery_id: str,
name_to_code: dict[str, str],
chosen_label: str,
doctor_id: str,
wall_epoch: float,
tsv_enabled: bool | None = None,
) -> None:
"""已弃用:请使用 ``replace_pending_line_with_voice_resolution`` 并传 ``confirmation_id``。"""
if not (chosen_label or "").strip():
return
line = _build_voice_resolved_tsv_data_line(
name_to_code=name_to_code,
chosen_label=chosen_label,
doctor_id=doctor_id,
wall_epoch=wall_epoch,
)
append_consumption_tsv_line(surgery_id, line)
def append_consumption_log_summary(
surgery_id: str,
totals: dict[str, tuple[str, int]],
) -> None:
"""在明细行之后追加汇总块(表头 + 每物品一行)。"""
if not bp.CONSUMPTION_TSV_LOG_ENABLED or not totals:
return
path = resolved_consumption_log_path(surgery_id)
if not path.is_file():
return
body = "".join(
["\n", SUMMARY_HEADER]
+ [
"\t".join([_encode_cell(iid), _encode_cell(name), str(qty)]) + "\n"
for iid, (name, qty) in sorted(totals.items(), key=lambda x: x[0])
]
)
with _lock:
with path.open("a", encoding="utf-8") as f:
f.write(body)
def print_consumption_summary_markdown(
totals: dict[str, tuple[str, int]],
) -> None:
if not bp.CONSUMPTION_LOG_MARKDOWN_TERMINAL or not totals:
return
lines = [
"## 消耗汇总",
"",
"| item_id | item_name | qty |",
"| :--- | :--- | ---: |",
]
for iid, (name, qty) in sorted(totals.items(), key=lambda x: x[0]):
lines.append(
"| {} | {} | {} |".format(_md_cell(iid), _md_cell(name), qty)
)
lines.append("")
print_markdown_stderr("\n".join(lines))
class ConsumptionTsvWriter:
"""注入式 consumption 日志写入器,取代模块全局 ``settings`` 读取。
行为与模块级函数完全一致保留模块级函数以维持旧调用点的兼容期
"""
def __init__(self, app_settings=None) -> None:
_ = app_settings
def init_file(self, surgery_id: str) -> None:
if not bp.CONSUMPTION_TSV_LOG_ENABLED:
return
path = resolved_consumption_log_path(surgery_id)
path.parent.mkdir(parents=True, exist_ok=True)
with _lock:
with path.open("w", encoding="utf-8") as f:
f.write(HEADER)
def append_window(
self,
*,
surgery_id: str,
name_to_code: dict[str, str],
best: ClsTop3,
doctor_id: str,
camera_id: str,
wall_start_epoch: float,
wall_end_epoch: float,
since_recording_start: str | None = None,
) -> None:
if not bp.CONSUMPTION_TSV_LOG_ENABLED and not bp.CONSUMPTION_LOG_MARKDOWN_TERMINAL:
return
if bp.CONSUMPTION_TSV_LOG_ENABLED:
line = build_tsv_line(
name_to_code=name_to_code,
best=best,
doctor_id=doctor_id,
camera_id=camera_id,
wall_start_epoch=wall_start_epoch,
wall_end_epoch=wall_end_epoch,
)
append_consumption_tsv_line(surgery_id, line)
if bp.CONSUMPTION_LOG_MARKDOWN_TERMINAL:
print_markdown_stderr(
build_consumption_markdown(
name_to_code=name_to_code,
best=best,
doctor_id=doctor_id,
camera_id=camera_id,
wall_start_epoch=wall_start_epoch,
wall_end_epoch=wall_end_epoch,
since_recording_start=since_recording_start,
),
)
def append_summary(
self,
surgery_id: str,
totals: dict[str, tuple[str, int]],
) -> None:
if not bp.CONSUMPTION_TSV_LOG_ENABLED or not totals:
return
path = resolved_consumption_log_path(surgery_id)
if not path.is_file():
return
body = "".join(
["\n", SUMMARY_HEADER]
+ [
"\t".join([_encode_cell(iid), _encode_cell(name), str(qty)]) + "\n"
for iid, (name, qty) in sorted(totals.items(), key=lambda x: x[0])
]
)
with _lock:
with path.open("a", encoding="utf-8") as f:
f.write(body)
def print_summary_markdown(self, totals: dict[str, tuple[str, int]]) -> None:
if not bp.CONSUMPTION_LOG_MARKDOWN_TERMINAL or not totals:
return
lines = [
"## 消耗汇总",
"",
"| item_id | item_name | qty |",
"| :--- | :--- | ---: |",
]
for iid, (name, qty) in sorted(totals.items(), key=lambda x: x[0]):
lines.append(
"| {} | {} | {} |".format(_md_cell(iid), _md_cell(name), qty)
)
lines.append("")
print_markdown_stderr("\n".join(lines))
def append_consumption_window(
*,
surgery_id: str,
name_to_code: dict[str, str],
best: ClsTop3,
doctor_id: str,
camera_id: str,
wall_start_epoch: float,
wall_end_epoch: float,
since_recording_start: str | None = None,
) -> None:
if not bp.CONSUMPTION_TSV_LOG_ENABLED and not bp.CONSUMPTION_LOG_MARKDOWN_TERMINAL:
return
if bp.CONSUMPTION_TSV_LOG_ENABLED:
line = build_tsv_line(
name_to_code=name_to_code,
best=best,
doctor_id=doctor_id,
camera_id=camera_id,
wall_start_epoch=wall_start_epoch,
wall_end_epoch=wall_end_epoch,
)
append_consumption_tsv_line(surgery_id, line)
if bp.CONSUMPTION_LOG_MARKDOWN_TERMINAL:
print_markdown_stderr(
build_consumption_markdown(
name_to_code=name_to_code,
best=best,
doctor_id=doctor_id,
camera_id=camera_id,
wall_start_epoch=wall_start_epoch,
wall_end_epoch=wall_end_epoch,
since_recording_start=since_recording_start,
),
)