"""Map algorithm_subprocesses/5.15 TSV output to domain objects (orchestration adapter only).""" from __future__ import annotations import csv import hashlib import re from dataclasses import dataclass from datetime import datetime, timedelta, timezone from pathlib import Path from app.baked import pipeline as bp from app.consumable_catalog import ( effective_candidate_consumables, normalize_candidate_consumables_raw, ) from app.domain.consumption import SurgeryConsumptionStored @dataclass(frozen=True) class ReferenceDoctorInfo: doctor_id: str doctor_name: str | None display: str raw_line: str _DOCTOR_NAME_ID_RE = re.compile( r"^(?P.+?)\s*\(id=(?P[^,\s)]+)(?:,\s*conf=[\d.]+)?\)\s*(?:\[低置信度\])?\s*$" ) _DOCTOR_ID_ONLY_RE = re.compile( r"^doctor_id=(?P[^\s(]+)(?:\s*\(conf=[\d.]+\))?\s*(?:\[低置信度\])?\s*$" ) def sha256_file(path: Path) -> str: h = hashlib.sha256() with path.open("rb") as f: for chunk in iter(lambda: f.read(1024 * 1024), b""): h.update(chunk) return h.hexdigest() def candidate_cache_key(candidate_consumables: list[str]) -> str: raw = "\n".join(candidate_consumables).encode("utf-8") return hashlib.sha256(raw).hexdigest()[:12] def resolve_reference_candidates(candidate_consumables: list[str] | None) -> list[str]: requested = normalize_candidate_consumables_raw(list(candidate_consumables or [])) return effective_candidate_consumables(requested) def parse_reference_doctor_info(path: Path) -> ReferenceDoctorInfo | None: if not path.is_file(): return None raw_line = "" for line in path.read_text(encoding="utf-8").splitlines(): stripped = line.strip() if stripped.startswith("医生信息:") or stripped.startswith("医生信息:"): raw_line = stripped break if not raw_line: return None body = raw_line.split(":", 1)[-1].split(":", 1)[-1].strip() if not body or body == "未启用": return ReferenceDoctorInfo( doctor_id=bp.VIDEO_RESULT_DOCTOR_ID, doctor_name=None, display=body or "未启用", raw_line=raw_line, ) if body.startswith("识别失败"): return ReferenceDoctorInfo( doctor_id=bp.VIDEO_RESULT_DOCTOR_ID, doctor_name=None, display=body, raw_line=raw_line, ) match = _DOCTOR_NAME_ID_RE.match(body) if match: name = match.group("name").strip() did = match.group("id").strip() return ReferenceDoctorInfo( doctor_id=did, doctor_name=name, display=f"{name} ({did})", raw_line=raw_line, ) match = _DOCTOR_ID_ONLY_RE.match(body) if match: did = match.group("id").strip() return ReferenceDoctorInfo( doctor_id=did, doctor_name=None, display=did, raw_line=raw_line, ) return ReferenceDoctorInfo( doctor_id=bp.VIDEO_RESULT_DOCTOR_ID, doctor_name=None, display=body, raw_line=raw_line, ) def is_reference_result_complete(path: Path) -> bool: if not path.is_file() or path.stat().st_size <= 0: return False lines = [line.strip() for line in path.read_text(encoding="utf-8").splitlines() if line.strip()] if not any(line.lower().startswith("rank\t") for line in lines): return False has_doctor_footer = any( line.startswith("医生信息:") or line.startswith("医生信息:") for line in lines ) has_segment_row = False for line in lines: if line.lower().startswith("rank\t"): continue if line.startswith("医生信息"): continue parts = line.split("\t") if len(parts) >= 5 and parts[0].strip().isdigit(): has_segment_row = True break return has_doctor_footer and has_segment_row def doctor_id_for_consumption_rows(doctor: ReferenceDoctorInfo | None) -> str: if doctor is None: return bp.VIDEO_RESULT_DOCTOR_ID if doctor.doctor_name: return f"{doctor.doctor_name} ({doctor.doctor_id})" if doctor.doctor_id and doctor.doctor_id != bp.VIDEO_RESULT_DOCTOR_ID: return doctor.doctor_id return bp.VIDEO_RESULT_DOCTOR_ID def parse_reference_tsv( path: Path, *, base_timestamp: datetime | None = None, doctor: ReferenceDoctorInfo | None = None, ) -> list[SurgeryConsumptionStored]: if base_timestamp is None: base_timestamp = datetime.now(timezone.utc) if doctor is None: doctor = parse_reference_doctor_info(path) row_doctor_id = doctor_id_for_consumption_rows(doctor) out: list[SurgeryConsumptionStored] = [] with path.open("r", encoding="utf-8", newline="") as f: reader = csv.DictReader(f, delimiter="\t") for row in reader: name = (row.get("top1_name") or "").strip() if not name or name.startswith("("): continue if name.startswith("医生信息"): continue item_id = (row.get("product_id_top1") or "").strip() or name try: start_sec = float((row.get("start_sec") or "0").strip() or 0.0) except ValueError: start_sec = 0.0 out.append( SurgeryConsumptionStored( item_id=item_id, item_name=name, qty=1, doctor_id=row_doctor_id, timestamp=base_timestamp + timedelta(seconds=max(0.0, start_sec)), source="video_batch", ) ) return out