Files
FishServer/fish_api/app/subprocess_run.py
2026-05-13 09:19:31 +08:00

210 lines
7.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""子进程运行:合并 stderr 到 stdout整段写入独立 dump 文件,主日志只保留中文摘要。
约定:
- 给定 ``dump_dir`` 时,子进程每行实时 append 到
``{dump_dir}/{ts}_{run_id}__{safe(source_label)}.log``
- 失败(``returncode != 0``)时再复制一份到
``{dump_dir.parent}/subprocess_failures/{ts}_{kind}_{safe(source_label)}_failed.log``
与历史 ``.data/logs/measure/subprocess_failures/...`` 风格一致;
- 子进程结束后向 loguru 写一条带 ``event=True`` 的中文摘要,``metrics.tail`` 含末尾 N 行。
"""
from __future__ import annotations
import os
import re
import shutil
import subprocess
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
from loguru import logger
from app.logging_config import new_run_id
# Matches every run of characters that is NOT a word character, "." or "-".
_SAFE_RE = re.compile(r"[^\w.\-]+", re.UNICODE)


def _safe_label(label: str, *, max_len: int = 96) -> str:
    """Turn an arbitrary label into a token usable inside a file name.

    Each run of characters other than word characters, ``.`` and ``-`` is
    collapsed into a single ``_``; leading and trailing ``.``/``_`` are then
    stripped. If nothing is left the literal ``"subprocess"`` is used. The
    result is finally cut to at most ``max_len`` characters.
    """
    text = "" if not label else str(label)
    cleaned = _SAFE_RE.sub("_", text).strip("._")
    if not cleaned:
        cleaned = "subprocess"
    return cleaned[:max_len]
def _now_ts() -> str:
    """Return the current UTC time as a compact ``YYYYmmddTHHMMSSZ`` stamp."""
    utc_now = datetime.now(tz=timezone.utc)
    return f"{utc_now:%Y%m%dT%H%M%SZ}"
def _tail_lines(lines: List[str], n: int) -> str:
    """Join the last ``n`` entries of ``lines`` and strip trailing whitespace.

    Returns an empty string when ``n`` is not positive or ``lines`` is empty.
    Entries are concatenated as-is (they are expected to carry their own line
    endings).
    """
    wanted = lines[-n:] if (n > 0 and lines) else []
    return "".join(wanted).rstrip()
def _close_dump_file(fp, pipe_label: str, dump_path: Optional[Path]) -> None:
    """Close the dump file handle, downgrading a close error to a warning."""
    try:
        fp.close()
    except OSError as e:
        logger.warning(
            "[{}] 关闭子进程日志文件失败 {}: {}",
            pipe_label, dump_path, e,
        )


def run_subprocess_with_log(
    cmd: List[str],
    *,
    cwd: str,
    env: Optional[Dict[str, str]] = None,
    log_name: str,
    stream_to_logger: bool = True,
    dump_dir: Optional[Path] = None,
    run_id: Optional[str] = None,
    source_label: str = "",
    kind: Optional[str] = None,
    pipeline: Optional[str] = None,
    step: Optional[str] = None,
    summary_tail_lines: int = 30,
) -> subprocess.CompletedProcess[str]:
    """Run ``cmd`` as a child process, capture its output and log a summary.

    stderr is merged into stdout and the combined stream is read line by line
    as UTF-8 (undecodable bytes are replaced). All file persistence is
    best-effort: an ``OSError`` while opening or writing the dump, or while
    writing the failure copy, is logged as a warning and never aborts the run.

    Args:
        cmd: Argument vector handed to ``subprocess.Popen``.
        cwd: Working directory of the child.
        env: Environment of the child; a copy of ``os.environ`` when ``None``.
        log_name: Tag used in log lines; also the fallback pipeline label.
        stream_to_logger: When true, every non-blank output line is also
            logged at INFO as it arrives. When false, only the summary record
            and the last ``summary_tail_lines`` lines are logged.
        dump_dir: Directory that receives the full output as
            ``{ts}_{run_id}__{safe(source_label)}.log``. ``None`` disables both
            the dump and the failure copy.
        run_id: Correlation id; generated via ``new_run_id`` when falsy.
        source_label: Label used (sanitized) in file names and bound as
            ``source`` on the summary record.
        kind: Label used in the dump header and (sanitized) in the failure
            copy name; falls back to the pipeline label.
        pipeline: Pipeline label bound on log records; defaults to
            ``log_name``.
        step: Step label bound on the summary record; defaults to
            ``"subprocess"``.
        summary_tail_lines: Number of trailing output lines logged after the
            summary when ``stream_to_logger`` is false.

    Returns:
        ``CompletedProcess(cmd, returncode, merged_output, "")``. ``stderr``
        is always the empty string because it was merged into stdout.

    Raises:
        OSError: Propagated from ``subprocess.Popen`` when the child cannot
            be started; an already opened dump file is closed first.
    """
    rid = run_id or new_run_id(pipeline or log_name)
    pipe_label = pipeline or log_name
    step_label = step or "subprocess"
    kind_label = kind or pipe_label
    safe_src = _safe_label(source_label or kind_label)
    ts = _now_ts()
    # Popen accepts path-like argv items, so stringify before joining.
    cmd_text = " ".join(str(part) for part in cmd)
    if dump_dir is not None:
        dump_dir = Path(dump_dir)

    # --- open the dump file (best-effort) ---------------------------------
    dump_path: Optional[Path] = None
    dump_fp = None
    if dump_dir is not None:
        try:
            dump_dir.mkdir(parents=True, exist_ok=True)
            dump_path = dump_dir / f"{ts}_{rid}__{safe_src}.log"
            dump_fp = open(dump_path, "w", encoding="utf-8", errors="replace")
            header = (
                f"# fish_api subprocess dump\n"
                f"# written_at_utc: {datetime.now(timezone.utc).isoformat()}\n"
                f"# run_id: {rid}\n"
                f"# pipeline: {pipe_label}\n"
                f"# step: {step_label}\n"
                f"# kind: {kind_label}\n"
                f"# source: {source_label}\n"
                f"# cwd: {cwd}\n"
                f"# command: {cmd_text}\n"
                f"# --- stdout/stderr ---\n"
            )
            dump_fp.write(header)
            dump_fp.flush()
        except OSError as e:
            logger.warning(
                "[{}] 无法打开子进程日志文件 {}: {}(继续运行,仅丢失整段持久化)",
                pipe_label, dump_path, e,
            )
            # open() may have succeeded before the header write failed.
            if dump_fp is not None:
                _close_dump_file(dump_fp, pipe_label, dump_path)
            dump_path = None
            dump_fp = None

    # True only while the dump file holds the complete output so far.
    dump_intact = dump_fp is not None

    # --- run the child and drain its output --------------------------------
    t0 = time.perf_counter()
    lines: List[str] = []
    line_log = logger.bind(pipeline=pipe_label, run_id=rid)
    try:
        # The context manager closes the pipe and waits for the child on every
        # exit path, so an error in the loop cannot leave the child blocked on
        # a pipe nobody reads.
        with subprocess.Popen(
            cmd,
            cwd=cwd,
            env=env if env is not None else os.environ.copy(),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding="utf-8",
            errors="replace",
            bufsize=1,
        ) as proc:
            if proc.stdout is not None:
                for line in proc.stdout:
                    lines.append(line)
                    if dump_fp is not None:
                        try:
                            dump_fp.write(line)
                            # Flush immediately so a still-running child can
                            # be inspected live.
                            dump_fp.flush()
                        except OSError as e:
                            logger.warning(
                                "[{}] 写入子进程日志文件失败 {}: {}(继续运行,后续输出不再持久化)",
                                pipe_label, dump_path, e,
                            )
                            _close_dump_file(dump_fp, pipe_label, dump_path)
                            dump_fp = None
                            dump_intact = False
                    if stream_to_logger:
                        text = line.rstrip()
                        if text:
                            line_log.info("[{}] {}", log_name, text)
        rc = proc.returncode
        dt_ms = (time.perf_counter() - t0) * 1000.0
        if dump_fp is not None:
            try:
                dump_fp.write(
                    f"\n# --- end ---\n# returncode: {rc}\n# elapsed_ms: {dt_ms:.1f}\n"
                )
                dump_fp.flush()
            except OSError as e:
                logger.warning(
                    "[{}] 写入子进程日志结尾失败 {}: {}",
                    pipe_label, dump_path, e,
                )
                dump_intact = False
    finally:
        # Also reached when Popen itself raised or the loop was interrupted.
        if dump_fp is not None:
            _close_dump_file(dump_fp, pipe_label, dump_path)

    out = "".join(lines)

    # --- on failure keep a second copy under subprocess_failures/ ----------
    failure_path: Optional[Path] = None
    if rc != 0 and dump_dir is not None:
        try:
            fail_dir = dump_dir.parent / "subprocess_failures"
            fail_dir.mkdir(parents=True, exist_ok=True)
            # The kind label is sanitized too: a raw "/" would otherwise point
            # outside fail_dir or make the copy fail.
            failure_path = (
                fail_dir
                / f"{ts}_{_safe_label(kind_label)}_{safe_src}_failed.log"
            )
            if dump_intact and dump_path is not None and dump_path.is_file():
                shutil.copy2(dump_path, failure_path)
            else:
                # No complete dump available: write the in-memory output.
                failure_path.write_text(
                    f"# fish_api subprocess failure (no dump file)\n"
                    f"# returncode: {rc}\n"
                    f"# command: {cmd_text}\n"
                    f"# --- stdout/stderr ---\n{out}",
                    encoding="utf-8",
                )
        except OSError as e:
            logger.warning(
                "[{}] 写入失败日志副本失败 {}: {}",
                pipe_label, failure_path, e,
            )
            failure_path = None

    # --- structured summary record -----------------------------------------
    status = "success" if rc == 0 else "fail"
    tail = _tail_lines(lines, summary_tail_lines)
    metrics: Dict[str, object] = {
        "returncode": rc,
        "elapsed_ms": round(dt_ms, 3),
        "stdout_lines": len(lines),
        "dump_path": str(dump_path) if dump_path is not None else "",
        "failure_log": str(failure_path) if failure_path is not None else "",
    }
    summary_msg_zh = (
        f"子进程结束 [{log_name}] | 返回码={rc} | 耗时={dt_ms / 1000.0:.2f}s "
        f"| 输出{len(lines)}行 -> {dump_path or '(未持久化)'}"
    )
    bound = logger.bind(
        event=True,
        pipeline=pipe_label,
        step=step_label,
        run_id=rid,
        source=source_label,
        status=status,
        duration_ms=round(dt_ms, 3),
        metrics=metrics,
    )
    if rc == 0:
        bound.info(summary_msg_zh)
    else:
        bound.bind(error=f"returncode={rc}").error(summary_msg_zh)

    # When lines were not streamed, surface the tail so the main log still
    # shows how the child ended.
    if tail and not stream_to_logger:
        head_label = "末尾输出" if rc == 0 else "末尾输出(失败)"
        logger.bind(pipeline=pipe_label, run_id=rid).log(
            "INFO" if rc == 0 else "WARNING",
            "[{}] {}({} 行):\n{}",
            log_name,
            head_label,
            min(summary_tail_lines, len(lines)),
            tail,
        )
    return subprocess.CompletedProcess(cmd, rc, out, "")