"""Run a subprocess with stderr merged into stdout, persist the full output to
a dedicated dump file, and keep only a Chinese summary in the main log.

Conventions:

- When ``dump_dir`` is given, every line of child output is appended (and
  flushed) in real time to
  ``{dump_dir}/{ts}_{run_id}__{safe(source_label)}.log``
  (the label falls back to ``kind`` / the pipeline label when empty).
- On failure (``returncode != 0``) a second copy is written to
  ``{dump_dir.parent}/subprocess_failures/{ts}_{safe(kind)}_{safe(source_label)}_failed.log``,
  matching the historical ``.data/logs/measure/subprocess_failures/...`` layout.
- After the child exits, one Chinese summary record carrying ``event=True`` is
  sent to loguru; its ``metrics.tail`` holds the last N output lines.
- Persistence is best effort: I/O errors on the dump or failure-copy files are
  logged as warnings and never abort the run.
"""

from __future__ import annotations

import os
import re
import shutil
import subprocess
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, TextIO, Tuple

from loguru import logger

from app.logging_config import new_run_id

_SAFE_RE = re.compile(r"[^\w.\-]+", re.UNICODE)


def _safe_label(label: str, *, max_len: int = 96) -> str:
    """Turn an arbitrary label into a file-name-safe fragment.

    Runs of characters other than word characters, ``.`` and ``-`` become a
    single ``_``; leading/trailing ``.``/``_`` are stripped; an empty result
    falls back to ``"subprocess"``. The result is cut to ``max_len`` chars.
    """
    s = _SAFE_RE.sub("_", str(label or "")).strip("._") or "subprocess"
    return s[:max_len]


def _now_ts() -> str:
    """Return the current UTC time as a compact ``YYYYmmddTHHMMSSZ`` stamp."""
    return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")


def _tail_lines(lines: List[str], n: int) -> str:
    """Join the last ``n`` lines (newlines included) and strip trailing whitespace."""
    if n <= 0 or not lines:
        return ""
    return "".join(lines[-n:]).rstrip()


def _close_quietly(fp: TextIO) -> None:
    """Close ``fp``, ignoring ``OSError`` because the dump is best effort."""
    try:
        fp.close()
    except OSError:
        # Every successful write was already flushed; a failing close can only
        # re-flush data whose write error has been reported. Nothing to add.
        pass


def _open_dump(
    dump_path: Path, header: str, pipe_label: str
) -> Tuple[Optional[Path], Optional[TextIO]]:
    """Create the dump file, write ``header`` and return ``(path, handle)``.

    Best effort: on ``OSError`` a warning is logged, any half-opened handle is
    closed, and ``(None, None)`` is returned so the run continues without
    full-output persistence.
    """
    fp: Optional[TextIO] = None
    try:
        dump_path.parent.mkdir(parents=True, exist_ok=True)
        fp = open(dump_path, "w", encoding="utf-8", errors="replace")
        fp.write(header)
        fp.flush()
    except OSError as e:
        logger.warning(
            "[{}] 无法打开子进程日志文件 {}: {}(继续运行,仅丢失整段持久化)",
            pipe_label,
            dump_path,
            e,
        )
        if fp is not None:
            # Previously this handle was dropped without closing (fd leak).
            _close_quietly(fp)
        return None, None
    return dump_path, fp


def _write_dump(
    fp: TextIO, text: str, dump_path: Optional[Path], pipe_label: str
) -> bool:
    """Write ``text`` to the dump file and flush it immediately.

    Flushing per write lets a still-running child be inspected live. Returns
    ``False`` after logging a warning when the write raises ``OSError`` (e.g.
    disk full); the caller then stops using the dump instead of aborting.
    """
    try:
        fp.write(text)
        fp.flush()
    except OSError as e:
        logger.warning(
            "[{}] 写入子进程日志文件失败 {}: {}(继续运行,仅丢失整段持久化)",
            pipe_label,
            dump_path,
            e,
        )
        return False
    return True


def _write_failure_copy(
    failure_path: Path,
    dump_path: Optional[Path],
    rc: int,
    cmd: List[str],
    out: str,
    pipe_label: str,
) -> Optional[Path]:
    """Save a copy of a failed run's output at ``failure_path``.

    Copies the dump file when it exists; otherwise (no dump, or the dump was
    lost to an I/O error) writes the in-memory output ``out``. Best effort:
    returns ``failure_path`` on success, or ``None`` after logging a warning
    on ``OSError``.
    """
    try:
        failure_path.parent.mkdir(parents=True, exist_ok=True)
        if dump_path is not None and dump_path.is_file():
            shutil.copy2(dump_path, failure_path)
        else:
            failure_path.write_text(
                f"# fish_api subprocess failure (no dump file)\n"
                f"# returncode: {rc}\n"
                f"# command: {' '.join(cmd)}\n"
                f"# --- stdout/stderr ---\n{out}",
                encoding="utf-8",
                # Match the dump file: never crash on unencodable characters
                # (a UnicodeEncodeError would not be caught as OSError).
                errors="replace",
            )
    except OSError as e:
        logger.warning(
            "[{}] 写入失败日志副本失败 {}: {}",
            pipe_label,
            failure_path,
            e,
        )
        return None
    return failure_path


def run_subprocess_with_log(
    cmd: List[str],
    *,
    cwd: str,
    env: Optional[Dict[str, str]] = None,
    log_name: str,
    stream_to_logger: bool = True,
    dump_dir: Optional[Path] = None,
    run_id: Optional[str] = None,
    source_label: str = "",
    kind: Optional[str] = None,
    pipeline: Optional[str] = None,
    step: Optional[str] = None,
    summary_tail_lines: int = 30,
) -> subprocess.CompletedProcess[str]:
    """Run ``cmd`` to completion and return a ``CompletedProcess``.

    - ``stream_to_logger=True``: every non-blank output line is also logged
      at INFO through loguru (runtime sink), which makes the log long;
    - ``stream_to_logger=False``: the full output only goes to the dump file;
      the main log gets one Chinese summary plus the last few lines;
    - ``dump_dir=None``: no full-output persistence (legacy behaviour).

    Args:
        cmd: Program and arguments, passed to ``Popen`` without a shell.
        cwd: Working directory for the child.
        env: Child environment; defaults to a copy of ``os.environ``.
        log_name: Label used in log messages; also the pipeline label and the
            run-id seed when ``pipeline`` is not given.
        stream_to_logger: See above.
        dump_dir: Directory for the per-run dump file; failure copies go to
            its sibling ``subprocess_failures`` directory.
        run_id: Run id bound on log records; generated via ``new_run_id``
            when omitted.
        source_label: Source description used (sanitised) in file names and
            bound on the summary record.
        kind: Category used in the dump header and the failure-copy file name;
            defaults to the pipeline label.
        pipeline: Pipeline label bound on log records (defaults to
            ``log_name``).
        step: Step label bound on the summary record (defaults to
            ``"subprocess"``).
        summary_tail_lines: Number of trailing output lines kept in the
            summary (``metrics["tail"]`` and the separate tail record).

    Returns:
        ``CompletedProcess`` whose ``stdout`` is the merged stdout/stderr text
        and whose ``stderr`` is ``""``.

    Raises:
        OSError: if the child cannot be started (e.g. executable not found).
        BaseException: anything raised while streaming output (for example
            ``KeyboardInterrupt``) is re-raised after the child is killed.
    """
    rid = run_id or new_run_id(pipeline or log_name)
    pipe_label = pipeline or log_name
    step_label = step or "subprocess"
    kind_label = kind or pipe_label
    safe_src = _safe_label(source_label or kind_label)
    ts = _now_ts()

    dump_path: Optional[Path] = None
    dump_fp: Optional[TextIO] = None
    if dump_dir is not None:
        header = (
            f"# fish_api subprocess dump\n"
            f"# written_at_utc: {datetime.now(timezone.utc).isoformat()}\n"
            f"# run_id: {rid}\n"
            f"# pipeline: {pipe_label}\n"
            f"# step: {step_label}\n"
            f"# kind: {kind_label}\n"
            f"# source: {source_label}\n"
            f"# cwd: {cwd}\n"
            f"# command: {' '.join(cmd)}\n"
            f"# --- stdout/stderr ---\n"
        )
        dump_path, dump_fp = _open_dump(
            dump_dir / f"{ts}_{rid}__{safe_src}.log", header, pipe_label
        )

    t0 = time.perf_counter()
    try:
        proc = subprocess.Popen(
            cmd,
            cwd=cwd,
            env=env if env is not None else os.environ.copy(),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding="utf-8",
            errors="replace",
            bufsize=1,
        )
    except BaseException:
        # The child never started: release the dump handle before propagating.
        if dump_fp is not None:
            _close_quietly(dump_fp)
        raise

    lines: List[str] = []
    line_logger = logger.bind(pipeline=pipe_label, run_id=rid)
    try:
        # ``with proc`` closes the stdout pipe and reaps the child on exit.
        with proc:
            try:
                if proc.stdout is not None:
                    for line in proc.stdout:
                        lines.append(line)
                        if dump_fp is not None and not _write_dump(
                            dump_fp, line, dump_path, pipe_label
                        ):
                            # Dump is lost: stop writing it, keep draining the
                            # pipe, and let the failure copy use memory instead.
                            _close_quietly(dump_fp)
                            dump_fp, dump_path = None, None
                        if stream_to_logger:
                            s = line.rstrip()
                            if s:
                                line_logger.info("[{}] {}", log_name, s)
            except BaseException:
                # We stopped draining the pipe; a child that keeps writing
                # would block forever and so would the wait in
                # ``Popen.__exit__``. Kill it first, as ``subprocess.run`` does.
                proc.kill()
                raise
    finally:
        dt_ms = (time.perf_counter() - t0) * 1000.0
        if dump_fp is not None:
            footer = (
                f"\n# --- end ---\n# returncode: {proc.returncode}\n"
                f"# elapsed_ms: {dt_ms:.1f}\n"
            )
            if not _write_dump(dump_fp, footer, dump_path, pipe_label):
                dump_path = None
            _close_quietly(dump_fp)
            dump_fp = None

    rc = proc.returncode
    out = "".join(lines)

    failure_path: Optional[Path] = None
    if rc != 0 and dump_dir is not None:
        # ``kind`` is sanitised like the source label so odd characters
        # (``/``, ``:`` ...) cannot make the failure copy silently fail.
        failure_path = _write_failure_copy(
            dump_dir.parent
            / "subprocess_failures"
            / f"{ts}_{_safe_label(kind_label)}_{safe_src}_failed.log",
            dump_path,
            rc,
            cmd,
            out,
            pipe_label,
        )

    status = "success" if rc == 0 else "fail"
    tail = _tail_lines(lines, summary_tail_lines)
    metrics: Dict[str, object] = {
        "returncode": rc,
        "elapsed_ms": round(dt_ms, 3),
        "stdout_lines": len(lines),
        "dump_path": str(dump_path) if dump_path is not None else "",
        "failure_log": str(failure_path) if failure_path is not None else "",
        # Documented in the module docstring as ``metrics.tail``.
        "tail": tail,
    }
    summary_msg_zh = (
        f"子进程结束 [{log_name}] | 返回码={rc} | 耗时={dt_ms / 1000.0:.2f}s "
        f"| 输出{len(lines)}行 -> {dump_path or '(未持久化)'}"
    )
    bound = logger.bind(
        event=True,
        pipeline=pipe_label,
        step=step_label,
        run_id=rid,
        source=source_label,
        status=status,
        duration_ms=round(dt_ms, 3),
        metrics=metrics,
    )
    if rc == 0:
        bound.info(summary_msg_zh)
    else:
        bound.bind(error=f"returncode={rc}").error(summary_msg_zh)

    if tail and not stream_to_logger:
        head_label = "末尾输出" if rc == 0 else "末尾输出(失败)"
        line_logger.log(
            "INFO" if rc == 0 else "WARNING",
            "[{}] {}({} 行):\n{}",
            log_name,
            head_label,
            min(summary_tail_lines, len(lines)),
            tail,
        )

    return subprocess.CompletedProcess(cmd, rc, out, "")