"""语音确认(ASR/解析/审计)的终端 loguru 行 + 每手术 TSV 落盘,与 `consumption_tsv_log` 并列。""" from __future__ import annotations import re import threading from datetime import datetime, timezone from pathlib import Path from zoneinfo import ZoneInfo, ZoneInfoNotFoundError from loguru import logger from app.baked import pipeline as bp _lock = threading.Lock() HEADER = ( "时间戳(ISO,UTC)\t来源\t状态\tconfirmation_id\tasr/识别文本\t" "resolved_label\trejected\terror\taudio_object_key\n" ) def _ts_iso_utc() -> str: return datetime.now(timezone.utc).isoformat(timespec="milliseconds") def _encode_cell(value: str) -> str: return (value or "").replace("\r", " ").replace("\n", " ").replace("\t", " ") def _log_tz_info() -> object: raw = (bp.CONSUMPTION_LOG_TIMEZONE or "").strip() if not raw: lt = datetime.now().astimezone().tzinfo return lt if lt is not None else timezone.utc try: return ZoneInfo(raw) except ZoneInfoNotFoundError: return timezone.utc def _ts_local_for_display() -> str: tz = _log_tz_info() return datetime.now(tz).isoformat(timespec="milliseconds") def _safe_surgery_path_segment(surgery_id: str) -> str: s = (surgery_id or "unknown").strip() or "unknown" s = re.sub(r"[^\w\-.@]", "_", s) return s[:200] if len(s) > 200 else s def resolved_voice_log_path(surgery_id: str) -> Path: raw = (bp.VOICE_FILE_LOG_PATH or "logs/voice_{surgery_id}.txt").strip() safe = _safe_surgery_path_segment(surgery_id) if "{surgery_id}" in raw: raw = raw.replace("{surgery_id}", safe) else: p0 = Path(raw) if p0.suffix: raw = str(p0.with_name(f"{p0.stem}_{safe}{p0.suffix}")) else: raw = f"{raw.rstrip('/')}_{safe}.txt" p = Path(raw).expanduser() if not p.is_absolute(): p = Path.cwd() / p return p def init_voice_log_file(surgery_id: str) -> None: """与 `init_consumption_log_file` 同生命周期:`start_surgery` 时截断并写表头。""" if not bp.VOICE_FILE_LOG_ENABLED: return path = resolved_voice_log_path(surgery_id) path.parent.mkdir(parents=True, exist_ok=True) with _lock: with path.open("w", encoding="utf-8") as f: f.write(HEADER) def append_voice_tsv_line(surgery_id: str, line: str) -> None: if not bp.VOICE_FILE_LOG_ENABLED: return path = resolved_voice_log_path(surgery_id) path.parent.mkdir(parents=True, exist_ok=True) with _lock: with path.open("a", encoding="utf-8") as f: f.write(line) class VoiceTextLogWriter: """注入式 voice 日志写入器,封装 `init_file` / `emit_event`。""" def __init__(self) -> None: pass def init_file(self, surgery_id: str) -> None: init_voice_log_file(surgery_id) def emit_event( self, *, surgery_id: str, source: str, status: str, confirmation_id: str, asr_text: str | None = None, resolved_label: str | None = None, rejected: str | bool | None = None, error_message: str | None = None, audio_object_key: str | None = None, ) -> None: emit_voice_event( surgery_id=surgery_id, source=source, status=status, confirmation_id=confirmation_id, asr_text=asr_text, resolved_label=resolved_label, rejected=rejected, error_message=error_message, audio_object_key=audio_object_key, ) def emit_voice_event( *, surgery_id: str, source: str, status: str, confirmation_id: str, asr_text: str | None = None, resolved_label: str | None = None, rejected: str | bool | None = None, error_message: str | None = None, audio_object_key: str | None = None, ) -> None: rj: str if rejected is None: rj = "" elif isinstance(rejected, bool): rj = "true" if rejected else "false" else: rj = str(rejected) ts_utc = _ts_iso_utc() local_hint = _ts_local_for_display() if status in ("recognized", "rejected"): logger.info( "VoiceConfirm local_ts={!r} surgery_id={} source={} status={} " "confirmation_id={} asr_text={!r} resolved_label={!r} rejected={} " "error={!r} audio_key={!r}", local_hint, surgery_id, source, status, confirmation_id, asr_text, resolved_label, rj, error_message, audio_object_key, ) else: logger.warning( "VoiceConfirm local_ts={!r} surgery_id={} source={} status={} " "confirmation_id={} asr_text={!r} resolved_label={!r} rejected={} " "error={!r} audio_key={!r}", local_hint, surgery_id, source, status, confirmation_id, asr_text, resolved_label, rj, error_message, audio_object_key, ) if not bp.VOICE_FILE_LOG_ENABLED: return row = [ _encode_cell(ts_utc), _encode_cell(source), _encode_cell(status), _encode_cell(confirmation_id), _encode_cell("" if asr_text is None else asr_text), _encode_cell("" if resolved_label is None else resolved_label), _encode_cell(rj), _encode_cell("" if error_message is None else error_message), _encode_cell("" if audio_object_key is None else audio_object_key), ] line = "\t".join(row) + "\n" append_voice_tsv_line(surgery_id, line)