"""子进程运行:合并 stderr 到 stdout,整段写入独立 dump 文件,主日志只保留中文摘要。

约定:

- 给定 ``dump_dir`` 时,子进程每行实时 append 到
  ``{dump_dir}/{ts}_{run_id}__{safe(source_label)}.log``;
- 失败(``returncode != 0``)时再复制一份到
  ``{dump_dir.parent}/subprocess_failures/{ts}_{kind}_{safe(source_label)}_failed.log``,
  与历史 ``.data/logs/measure/subprocess_failures/...`` 风格一致;
- 子进程结束后向 loguru 写一条带 ``event=True`` 的中文摘要(``metrics`` 含返回码、耗时、
  输出行数与日志路径);``stream_to_logger=False`` 时另写一条日志,附带末尾 N 行输出。
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import os
|
||
import re
|
||
import shutil
|
||
import subprocess
|
||
import time
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional
|
||
|
||
from loguru import logger
|
||
|
||
from app.logging_config import new_run_id
|
||
|
||
# Runs of characters other than word characters, "." and "-" are collapsed to "_".
# With re.UNICODE, word characters include non-ASCII letters (e.g. CJK text).
_SAFE_RE = re.compile(r"[^\w.\-]+", re.UNICODE)


def _safe_label(
    label: str, *, max_len: int = 96, max_bytes: Optional[int] = None
) -> str:
    """Turn ``label`` into a file-name-safe fragment.

    Runs of characters other than word characters, ``.`` and ``-`` become ``_``;
    leading/trailing ``.``/``_`` are stripped and an empty result becomes
    ``"subprocess"``. The result is cut to ``max_len`` characters and then to
    ``max_bytes`` UTF-8 bytes (default: ``max_len``) without splitting a
    character.

    The byte cap matters for non-ASCII labels: because word characters include
    CJK text, 96 characters can be 288 bytes, which together with the
    timestamp/run-id prefix exceeds the typical 255-byte file-name limit, making
    ``open()`` fail. ASCII labels are unaffected since their character and byte
    lengths coincide.

    Args:
        label: Arbitrary label; ``None``/empty falls back to ``"subprocess"``.
        max_len: Maximum number of characters kept.
        max_bytes: Maximum UTF-8 byte length; ``None`` means ``max_len``.
    """
    s = _SAFE_RE.sub("_", str(label or "")).strip("._") or "subprocess"
    s = s[:max_len]
    limit = max_len if max_bytes is None else max_bytes
    encoded = s.encode("utf-8")
    if 0 <= limit < len(encoded):
        # errors="ignore" drops a multi-byte character that the slice cut in half.
        s = encoded[:limit].decode("utf-8", errors="ignore")
    return s
|
||
|
||
|
||
def _now_ts() -> str:
    """Return the current UTC time as a compact stamp such as ``20240101T120000Z``.

    The stamp is used as a sortable file-name prefix.
    """
    stamp = datetime.now(tz=timezone.utc)
    return f"{stamp:%Y%m%dT%H%M%SZ}"
|
||
|
||
|
||
def _tail_lines(lines: List[str], n: int) -> str:
    """Concatenate the last ``n`` entries of ``lines`` and strip trailing whitespace.

    Returns ``""`` when ``n`` is not positive or ``lines`` is empty.
    """
    if not (n > 0 and lines):
        return ""
    start = max(len(lines) - n, 0)
    return "".join(lines[start:]).rstrip()
|
||
|
||
|
||
def _close_quietly(fp) -> None:
    """Close a dump file handle, ignoring ``OSError``.

    Used only on best-effort cleanup paths where a write error has already been
    reported (or another exception is about to propagate), so a second error
    from flushing buffered data must not mask it.
    """
    try:
        fp.close()
    except OSError:
        pass


def _write_failure_copy(
    fail_dir: Path,
    failure_path: Path,
    dump_path: Optional[Path],
    rc: int,
    cmd_text: str,
    out: str,
) -> None:
    """Write the ``*_failed.log`` copy for a non-zero exit.

    Copies the dump file when one exists; otherwise writes the in-memory output
    with a small header. ``OSError`` propagates to the caller.
    """
    fail_dir.mkdir(parents=True, exist_ok=True)
    if dump_path is not None and dump_path.is_file():
        shutil.copy2(dump_path, failure_path)
    else:
        # Fallback when there is no dump file: persist the in-memory output.
        failure_path.write_text(
            f"# fish_api subprocess failure (no dump file)\n"
            f"# returncode: {rc}\n"
            f"# command: {cmd_text}\n"
            f"# --- stdout/stderr ---\n{out}",
            encoding="utf-8",
        )


def run_subprocess_with_log(
    cmd: List[str],
    *,
    cwd: str,
    env: Optional[Dict[str, str]] = None,
    log_name: str,
    stream_to_logger: bool = True,
    dump_dir: Optional[Path] = None,
    run_id: Optional[str] = None,
    source_label: str = "",
    kind: Optional[str] = None,
    pipeline: Optional[str] = None,
    step: Optional[str] = None,
    summary_tail_lines: int = 30,
) -> subprocess.CompletedProcess[str]:
    """Run ``cmd`` with stderr merged into stdout and return a ``CompletedProcess``.

    - ``stream_to_logger=True``: every non-empty output line is also logged at
      INFO through loguru as it arrives (the main log gets long);
    - ``stream_to_logger=False``: the full output is kept only in the dump file;
      the main log gets a summary record plus one record with the last
      ``summary_tail_lines`` lines;
    - ``dump_dir=None``: the full output is not persisted (legacy behaviour).

    Persistence is best-effort: failing to create, write or copy the dump file
    is logged as a warning and never aborts the run.

    Args:
        cmd: argv list passed to ``subprocess.Popen``; entries are rendered with
            ``str()`` for the dump header.
        cwd: Working directory of the child.
        env: Child environment; ``None`` means a copy of ``os.environ``.
        log_name: Label used in log messages; fallback for ``pipeline``.
        stream_to_logger: Whether to log each output line as it arrives.
        dump_dir: Directory for the dump file; failure copies go to
            ``dump_dir.parent / "subprocess_failures"``.
        run_id: Run identifier; generated via ``new_run_id`` when omitted.
        source_label: Label used in file names and the ``source`` log field.
        kind: Kind label for the dump header and failure file name
            (defaults to the pipeline label).
        pipeline: Pipeline label (defaults to ``log_name``).
        step: Step label (defaults to ``"subprocess"``).
        summary_tail_lines: Number of trailing output lines in the tail record.

    Returns:
        ``CompletedProcess(cmd, returncode, combined_output, "")``; stderr is
        always ``""`` because it is merged into stdout.

    Raises:
        Whatever ``subprocess.Popen`` raises (e.g. ``FileNotFoundError``), and
        any exception raised while reading output. In the latter case the child
        is killed first so waiting for it cannot hang.
    """
    rid = run_id or new_run_id(pipeline or log_name)
    pipe_label = pipeline or log_name
    step_label = step or "subprocess"
    kind_label = kind or pipe_label
    safe_src = _safe_label(source_label or kind or pipe_label)
    ts = _now_ts()
    # str() each entry so Path/PathLike argv items (accepted by Popen) don't break the header.
    cmd_text = " ".join(str(part) for part in cmd)

    dump_path: Optional[Path] = None
    dump_fp = None
    if dump_dir is not None:
        dump_path = dump_dir / f"{ts}_{rid}__{safe_src}.log"
        try:
            dump_dir.mkdir(parents=True, exist_ok=True)
            dump_fp = open(dump_path, "w", encoding="utf-8", errors="replace")
            dump_fp.write(
                f"# fish_api subprocess dump\n"
                f"# written_at_utc: {datetime.now(timezone.utc).isoformat()}\n"
                f"# run_id: {rid}\n"
                f"# pipeline: {pipe_label}\n"
                f"# step: {step_label}\n"
                f"# kind: {kind_label}\n"
                f"# source: {source_label}\n"
                f"# cwd: {cwd}\n"
                f"# command: {cmd_text}\n"
                f"# --- stdout/stderr ---\n"
            )
            dump_fp.flush()
        except OSError as e:
            logger.warning(
                "[{}] 无法打开子进程日志文件 {}: {}(继续运行,仅丢失整段持久化)",
                pipe_label, dump_path, e,
            )
            if dump_fp is not None:
                # open() succeeded but the header write failed: don't leak the handle.
                _close_quietly(dump_fp)
            dump_path = None
            dump_fp = None

    lines: List[str] = []
    rc: Optional[int] = None
    t0 = time.perf_counter()
    try:
        # The context manager closes proc.stdout and reaps the child on every path.
        with subprocess.Popen(
            cmd,
            cwd=cwd,
            env=env if env is not None else os.environ.copy(),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding="utf-8",
            errors="replace",
            bufsize=1,
        ) as proc:
            try:
                if proc.stdout is not None:
                    for line in proc.stdout:
                        lines.append(line)
                        if dump_fp is not None:
                            try:
                                dump_fp.write(line)
                                # Flush per line so a still-running child can be inspected live.
                                dump_fp.flush()
                            except OSError as e:
                                # Same best-effort policy as opening the dump: keep the
                                # child running and stop persisting. The failure copy
                                # then falls back to the in-memory output.
                                logger.warning(
                                    "[{}] 写入子进程日志文件 {} 失败: {}(继续运行,仅丢失整段持久化)",
                                    pipe_label, dump_path, e,
                                )
                                _close_quietly(dump_fp)
                                dump_fp = None
                                dump_path = None
                        if stream_to_logger:
                            s = line.rstrip()
                            if s:
                                logger.bind(pipeline=pipe_label, run_id=rid).info(
                                    "[{}] {}", log_name, s
                                )
            except BaseException:
                # We stop draining the pipe here. Kill the child so wait() cannot
                # block forever on a child stuck writing to a full pipe buffer.
                try:
                    proc.kill()
                except OSError:
                    pass  # the child has already exited
                raise
            finally:
                rc = proc.wait()
    finally:
        dt_ms = (time.perf_counter() - t0) * 1000.0
        if dump_fp is not None:
            # rc is None here only if Popen itself raised.
            try:
                dump_fp.write(
                    f"\n# --- end ---\n# returncode: {rc}\n# elapsed_ms: {dt_ms:.1f}\n"
                )
                dump_fp.flush()
            except OSError as e:
                logger.warning(
                    "[{}] 写入子进程日志文件 {} 失败: {}(继续运行,仅丢失整段持久化)",
                    pipe_label, dump_path, e,
                )
            finally:
                _close_quietly(dump_fp)

    out = "".join(lines)

    failure_path: Optional[Path] = None
    if rc != 0 and dump_dir is not None:
        fail_dir = dump_dir.parent / "subprocess_failures"
        # kind/pipeline labels come from callers and may contain path separators
        # or spaces. Sanitize them like the source label so the copy stays in fail_dir.
        failure_path = fail_dir / f"{ts}_{_safe_label(kind_label)}_{safe_src}_failed.log"
        try:
            _write_failure_copy(fail_dir, failure_path, dump_path, rc, cmd_text, out)
        except OSError as e:
            logger.warning(
                "[{}] 写入失败日志副本失败 {}: {}",
                pipe_label, failure_path, e,
            )
            failure_path = None

    status = "success" if rc == 0 else "fail"
    tail = _tail_lines(lines, summary_tail_lines)
    metrics: Dict[str, object] = {
        "returncode": rc,
        "elapsed_ms": round(dt_ms, 3),
        "stdout_lines": len(lines),
        "dump_path": str(dump_path) if dump_path is not None else "",
        "failure_log": str(failure_path) if failure_path is not None else "",
    }
    summary_msg_zh = (
        f"子进程结束 [{log_name}] | 返回码={rc} | 耗时={dt_ms / 1000.0:.2f}s "
        f"| 输出{len(lines)}行 -> {dump_path or '(未持久化)'}"
    )
    bound = logger.bind(
        event=True,
        pipeline=pipe_label,
        step=step_label,
        run_id=rid,
        source=source_label,
        status=status,
        duration_ms=round(dt_ms, 3),
        metrics=metrics,
    )
    if rc == 0:
        bound.info(summary_msg_zh)
    else:
        bound.bind(error=f"returncode={rc}").error(summary_msg_zh)

    # When lines were not streamed, surface the tail in the main log for quick triage.
    if tail and not stream_to_logger:
        head_label = "末尾输出" if rc == 0 else "末尾输出(失败)"
        logger.bind(pipeline=pipe_label, run_id=rid).log(
            "INFO" if rc == 0 else "WARNING",
            "[{}] {}({} 行):\n{}",
            log_name,
            head_label,
            min(summary_tail_lines, len(lines)),
            tail,
        )

    return subprocess.CompletedProcess(cmd, rc, out, "")
|