385 lines
12 KiB
Python
385 lines
12 KiB
Python
|
|
"""
OpenTelemetry spans and metrics for LLM calls.

Only low-cardinality attributes are recorded (agent, provider, model, call
type, outcome, durations, token counts). Prompt and response bodies are never
attached to spans or metrics; at most a prompt hash (``llm.prompt_sha12``) is.
"""
|
|||
|
|
|
|||
|
|
from __future__ import annotations

import time
from contextlib import contextmanager
from typing import Any, AsyncIterator, Iterator, Literal

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

from app.core.config import settings
from app.core.telemetry import get_meter, get_tracer
|
|||
|
|
|
|||
|
|
# Kind of LLM invocation; used as the ``call_type`` metric label and the
# ``llm.call_type`` span attribute.
CallType = Literal["json", "chat", "stream"]

# OpenTelemetry meter and instruments. They start as ``None`` and are filled in
# on first use by ``_ensure_instruments`` (only when telemetry is enabled).
_meter = _duration_hist = _call_counter = _tokens_in_counter = _tokens_out_counter = None
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _ensure_instruments() -> None:
    """Lazily create the ``app.llm`` meter and its four LLM instruments.

    Returns immediately when the meter already exists or telemetry is
    disabled, so it is cheap to call before every recording.
    """
    global _meter, _duration_hist, _call_counter, _tokens_in_counter, _tokens_out_counter

    if _meter is not None:
        return
    if not settings.otel_enabled:
        return

    meter = get_meter("app.llm")
    _meter = meter
    _duration_hist = meter.create_histogram(
        "llm.call.duration", unit="ms", description="LLM call wall time"
    )
    _call_counter = meter.create_counter(
        "llm.call.total", description="LLM call count by outcome"
    )
    _tokens_in_counter = meter.create_counter(
        "llm.tokens.input", description="LLM input tokens when reported by provider"
    )
    _tokens_out_counter = meter.create_counter(
        "llm.tokens.output", description="LLM output tokens when reported by provider"
    )
|
|||
|
|
|
|||
|
|
|
|||
|
|
def infer_provider_model(
    llm: Any,
    *,
    http_error_vendor: str = "deepseek",
) -> tuple[str, str]:
    """Derive ``(provider, model)`` telemetry labels for a LangChain chat model.

    Args:
        llm: Chat model object. The model name is taken from its
            ``model_name`` attribute, falling back to ``model``; the first
            truthy value wins and is converted with ``str``.
        http_error_vendor: Vendor name used as the provider label. It is
            stripped and lower-cased; ``None``, empty, or whitespace-only
            values become ``"unknown"``.

    Returns:
        ``(provider, model)``; ``model`` is ``""`` when neither attribute is set.
    """
    model = ""
    for attr in ("model_name", "model"):
        value = getattr(llm, attr, None)
        if value:
            model = str(value)
            break

    # Normalize first, then default, so a whitespace-only vendor cannot end up
    # as an empty ``provider`` metric label.
    provider = (http_error_vendor or "").strip().lower() or "unknown"
    return provider, model
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _outcome_label(*, parse_ok: bool, used_fallback: bool, error_kind: str | None) -> str:
|
|||
|
|
if parse_ok and not used_fallback:
|
|||
|
|
return "ok"
|
|||
|
|
if used_fallback:
|
|||
|
|
return "fallback"
|
|||
|
|
return error_kind or "error"
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _token_count(usage: Any, *keys: str) -> int:
    """Return the first truthy, int-convertible value among ``keys`` of ``usage``, else 0."""
    for key in keys:
        raw = usage.get(key) if isinstance(usage, dict) else getattr(usage, key, None)
        if not raw:
            continue
        try:
            return int(raw)
        except (TypeError, ValueError, OverflowError):
            # Malformed provider payload (e.g. a string, NaN or inf): try the
            # next alias instead of raising out of telemetry code.
            continue
    return 0


def extract_token_usage(response: Any) -> tuple[int, int]:
    """Extract ``(input_tokens, output_tokens)`` from a LangChain message or chunk.

    Lookup order:
      1. ``response.usage_metadata`` (``input_tokens`` / ``output_tokens``);
      2. when that is missing or empty, ``response.response_metadata`` under
         ``"token_usage"`` or ``"usage"`` (OpenAI-style ``prompt_tokens`` /
         ``completion_tokens`` are accepted as aliases).
    The usage payload may be a dict or an object exposing the same names as
    attributes.

    Missing, ``None``, or non-numeric counts are reported as 0. This function
    never raises on odd payloads: it is called from telemetry ``finally``
    blocks, where an exception would mask the real error or fail a
    successful call.
    """
    usage = getattr(response, "usage_metadata", None)
    if not usage:
        # An empty ``usage_metadata`` carries no information; fall back too.
        meta = getattr(response, "response_metadata", None)
        if isinstance(meta, dict):
            usage = meta.get("token_usage") or meta.get("usage")
    if not usage:
        return 0, 0
    return (
        _token_count(usage, "input_tokens", "prompt_tokens"),
        _token_count(usage, "output_tokens", "completion_tokens"),
    )
|
|||
|
|
|
|||
|
|
|
|||
|
|
def record_llm_completion(
    *,
    agent: str,
    provider: str,
    model: str,
    duration_ms: float,
    call_type: CallType = "chat",
    outcome: str = "ok",
    input_tokens: int = 0,
    output_tokens: int = 0,
    span: trace.Span | None = None,
    extra_span_attributes: dict[str, Any] | None = None,
) -> None:
    """Record one finished LLM call as metrics and, optionally, span data.

    No-op when ``settings.otel_enabled`` is false.

    Metrics:
      * ``llm.call.duration`` (histogram, ms) and ``llm.call.total`` (counter),
        labelled ``agent`` / ``provider`` / ``call_type`` / ``outcome``;
      * ``llm.tokens.input`` / ``llm.tokens.output`` (counters, labelled
        ``provider`` / ``agent``), only for positive token counts.

    Span (only when ``span`` is given and recording): ``llm.duration_ms``,
    ``llm.call_type``, ``llm.outcome``, positive token counts, and every
    non-``None`` entry of ``extra_span_attributes``. Status is OK for
    ``"ok"`` / ``"fallback"``; otherwise ERROR with the outcome as description.

    Args:
        model: Not used by this function; the span helpers in this module set
            ``llm.model`` when the span is created.
    """
    if not settings.otel_enabled:
        return

    _ensure_instruments()
    metric_attrs = {
        "agent": agent,
        "provider": provider,
        "call_type": call_type,
        "outcome": outcome,
    }
    if _duration_hist is not None:
        _duration_hist.record(duration_ms, metric_attrs)
    if _call_counter is not None:
        _call_counter.add(1, metric_attrs)

    token_attrs = {"provider": provider, "agent": agent}
    if input_tokens > 0 and _tokens_in_counter is not None:
        _tokens_in_counter.add(input_tokens, token_attrs)
    if output_tokens > 0 and _tokens_out_counter is not None:
        _tokens_out_counter.add(output_tokens, token_attrs)

    if span is None or not span.is_recording():
        return

    span.set_attribute("llm.duration_ms", round(duration_ms, 2))
    span.set_attribute("llm.call_type", call_type)
    span.set_attribute("llm.outcome", outcome)
    # Same positivity rule as the token counters above.
    if input_tokens > 0:
        span.set_attribute("llm.tokens.input", input_tokens)
    if output_tokens > 0:
        span.set_attribute("llm.tokens.output", output_tokens)
    for key, value in (extra_span_attributes or {}).items():
        # OTel rejects None attribute values (with a warning); skip them.
        if value is not None:
            span.set_attribute(key, value)

    if outcome in ("ok", "fallback"):
        # A fallback is a handled degradation, not a failure. The OTel API
        # only accepts a status description together with StatusCode.ERROR
        # (otherwise it warns and drops it); the outcome is already on the
        # span as ``llm.outcome``.
        span.set_status(Status(StatusCode.OK))
    else:
        span.set_status(Status(StatusCode.ERROR, outcome))
|
|||
|
|
|
|||
|
|
|
|||
|
|
@contextmanager
def langchain_invoke_span(
    *,
    agent: str,
    provider: str,
    model: str,
    call_type: CallType,
    prompt_sha12: str = "",
    max_tokens: int | None = None,
) -> Iterator[dict[str, Any]]:
    """Wrap a LangChain ``invoke`` / ``ainvoke`` call in an LLM span.

    Yields a mutable dict that the caller fills in; on exit the call is
    recorded via ``record_llm_completion``. Recognised keys:
      * ``response`` -- the model's return value. If neither token count was
        set, both are extracted from it with ``extract_token_usage``;
      * ``outcome`` -- defaults to ``"ok"``. It is forced to ``"error"`` when
        the body raises an ``Exception``, and to ``"cancelled"`` for any other
        ``BaseException`` (e.g. ``asyncio.CancelledError``, ``KeyboardInterrupt``);
      * ``input_tokens`` / ``output_tokens`` -- explicit counts (default 0).
    Other keys are ignored.

    When telemetry is disabled the dict is still yielded but nothing is
    recorded. Exceptions raised by the wrapped body always propagate.
    """
    ctx: dict[str, Any] = {
        "response": None,
        "outcome": "ok",
        "input_tokens": 0,
        "output_tokens": 0,
    }
    if not settings.otel_enabled:
        yield ctx
        return

    tracer = get_tracer("app.llm")
    span_name = {
        "json": "llm.json_invoke",
        "chat": "llm.chat_invoke",
        "stream": "llm.stream_invoke",
    }.get(call_type, "llm.invoke")
    attrs: dict[str, Any] = {
        "llm.agent": agent,
        "llm.provider": provider,
        "llm.model": model or "unknown",
        "llm.call_type": call_type,
    }
    if prompt_sha12:
        attrs["llm.prompt_sha12"] = prompt_sha12
    if max_tokens is not None:
        attrs["llm.max_tokens"] = max_tokens

    t0 = time.perf_counter()
    with tracer.start_as_current_span(span_name, attributes=attrs) as span:
        try:
            yield ctx
        except Exception:
            ctx["outcome"] = "error"
            raise
        except BaseException:
            # asyncio.CancelledError (a BaseException since Python 3.8) and
            # interrupts would otherwise be recorded as a successful "ok" call.
            ctx["outcome"] = "cancelled"
            raise
        finally:
            duration_ms = (time.perf_counter() - t0) * 1000
            resp = ctx.get("response")
            if resp is not None and not ctx.get("input_tokens") and not ctx.get("output_tokens"):
                ctx["input_tokens"], ctx["output_tokens"] = extract_token_usage(resp)
            record_llm_completion(
                agent=agent,
                provider=provider,
                model=model,
                duration_ms=duration_ms,
                call_type=call_type,
                outcome=str(ctx.get("outcome") or "ok"),
                input_tokens=int(ctx.get("input_tokens") or 0),
                output_tokens=int(ctx.get("output_tokens") or 0),
                span=span,
            )
|
|||
|
|
|
|||
|
|
|
|||
|
|
@contextmanager
def llm_call_span(
    *,
    agent: str,
    schema_name: str,
    provider: str,
    model: str,
    prompt_sha12: str,
    max_tokens: int,
) -> Iterator[trace.Span]:
    """Open an ``llm.json_call`` span for a structured (JSON-schema) LLM call.

    Yields the active span, which callers can hand to
    ``record_llm_call(span=...)``. When telemetry is disabled, no span is
    created and the non-recording ``trace.INVALID_SPAN`` is yielded instead.
    """
    if settings.otel_enabled:
        span_attributes: dict[str, Any] = {
            "llm.agent": agent,
            "llm.schema_name": schema_name,
            "llm.provider": provider,
            "llm.model": model or "unknown",
            "llm.prompt_sha12": prompt_sha12,
            "llm.max_tokens": max_tokens,
            "llm.call_type": "json",
        }
        tracer = get_tracer("app.llm")
        with tracer.start_as_current_span(
            "llm.json_call", attributes=span_attributes
        ) as active_span:
            yield active_span
    else:
        yield trace.INVALID_SPAN
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def observe_ainvoke(
    llm: Any,
    messages: Any,
    *,
    agent: str,
    provider: str = "deepseek",
    model: str = "",
    call_type: CallType = "chat",
    extra_span_attributes: dict[str, Any] | None = None,
    record_response_latency_ms: bool = True,
) -> Any:
    """Await ``llm.ainvoke(messages)`` inside ``langchain_invoke_span``.

    Duration, outcome, and token usage (read from the response) are recorded
    by ``langchain_invoke_span``. In addition, on the current span:
      * ``extra_span_attributes`` (``None`` values skipped) are set *before*
        the call, so they are also present when the call fails;
      * ``llm.response_latency_ms`` is set after a successful call when
        ``record_response_latency_ms`` is true.

    Returns the ``ainvoke`` result unchanged; exceptions propagate.
    """
    t0 = time.perf_counter()
    with langchain_invoke_span(
        agent=agent,
        provider=provider,
        model=model,
        call_type=call_type,
    ) as tel:
        # Inside the ``with`` the current span is the LLM span when telemetry
        # is enabled.
        span = trace.get_current_span()
        recording = span.is_recording()
        if recording and extra_span_attributes:
            for key, value in extra_span_attributes.items():
                if value is not None:
                    span.set_attribute(key, value)

        result = await llm.ainvoke(messages)
        tel["response"] = result

        if recording and record_response_latency_ms:
            span.set_attribute(
                "llm.response_latency_ms",
                round((time.perf_counter() - t0) * 1000, 2),
            )
    return result
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def observe_astream(
    llm: Any,
    prompt: Any,
    *,
    agent: str,
    provider: str = "deepseek",
    model: str = "",
) -> AsyncIterator[Any]:
    """Re-yield ``llm.astream(prompt)`` chunks inside an ``llm.stream_invoke`` span.

    However the stream ends -- exhausted, failed, or closed early by the
    consumer / cancelled -- exactly one completion is recorded via
    ``record_llm_completion`` with:
      * outcome ``"ok"``, ``"error"`` (an ``Exception`` escaped), or
        ``"cancelled"`` (``GeneratorExit`` from ``aclose()``,
        ``asyncio.CancelledError``, or another ``BaseException``);
      * wall time measured from just before the stream was opened;
      * ``llm.ttft_ms``: time to the first chunk with non-empty ``content``;
      * token usage summed over all chunks seen.

    With telemetry disabled the chunks are passed through untouched.
    """
    if not settings.otel_enabled:
        async for chunk in llm.astream(prompt):
            yield chunk
        return

    tracer = get_tracer("app.llm")
    t0 = time.perf_counter()
    ttft_ms: float | None = None
    input_tokens = 0
    output_tokens = 0
    outcome = "ok"
    with tracer.start_as_current_span(
        "llm.stream_invoke",
        attributes={
            "llm.agent": agent,
            "llm.provider": provider,
            "llm.model": model or "unknown",
            "llm.call_type": "stream",
        },
    ) as span:
        try:
            async for chunk in llm.astream(prompt):
                if ttft_ms is None and getattr(chunk, "content", None):
                    ttft_ms = (time.perf_counter() - t0) * 1000
                # Sum per-chunk usage instead of reading only the last chunk:
                # providers may split usage across chunks or follow the usage
                # chunk with an empty terminal chunk. Summing matches how
                # LangChain aggregates ``usage_metadata`` when chunks are added.
                chunk_in, chunk_out = extract_token_usage(chunk)
                input_tokens += chunk_in
                output_tokens += chunk_out
                yield chunk
        except Exception:
            outcome = "error"
            raise
        except BaseException:
            # GeneratorExit (consumer stopped iterating / aclose()) and
            # asyncio.CancelledError are not Exceptions; without this branch
            # such streams ended the span with no status and no metrics.
            outcome = "cancelled"
            raise
        finally:
            record_llm_completion(
                agent=agent,
                provider=provider,
                model=model,
                duration_ms=(time.perf_counter() - t0) * 1000,
                call_type="stream",
                outcome=outcome,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                span=span,
                extra_span_attributes=(
                    {"llm.ttft_ms": round(ttft_ms, 2)} if ttft_ms is not None else None
                ),
            )
|
|||
|
|
|
|||
|
|
|
|||
|
|
def record_llm_call(
    *,
    agent: str,
    schema_name: str,
    provider: str,
    model: str,
    duration_ms: float,
    attempts: int,
    parse_ok: bool,
    used_fallback: bool,
    error_kind: str | None,
    prompt_sha12: str,
    input_tokens: int = 0,
    output_tokens: int = 0,
    span: trace.Span | None = None,
) -> None:
    """Record a structured (JSON-schema) LLM call.

    The outcome label comes from ``_outcome_label`` (``"ok"``, ``"fallback"``,
    ``error_kind`` or ``"error"``). Everything is forwarded to
    ``record_llm_completion`` with ``call_type="json"``. JSON-specific details
    go onto the span: schema name, attempt count, parse/fallback flags and,
    when non-empty, ``llm.error_kind`` and ``llm.prompt_sha12``.
    """
    json_attrs: dict[str, Any] = {
        "llm.schema_name": schema_name,
        "llm.attempts": attempts,
        "llm.parse_ok": parse_ok,
        "llm.used_fallback": used_fallback,
    }
    if error_kind:
        json_attrs["llm.error_kind"] = error_kind
    if prompt_sha12:
        json_attrs["llm.prompt_sha12"] = prompt_sha12

    record_llm_completion(
        agent=agent,
        provider=provider,
        model=model,
        duration_ms=duration_ms,
        call_type="json",
        outcome=_outcome_label(
            parse_ok=parse_ok,
            used_fallback=used_fallback,
            error_kind=error_kind,
        ),
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        span=span,
        extra_span_attributes=json_attrs,
    )
|