"""
OpenTelemetry spans and metrics for LLM calls.

Only low-cardinality attributes are recorded; prompt and response bodies are
never attached to spans or metrics.
"""

from __future__ import annotations

import time
from contextlib import contextmanager
from typing import Any, AsyncIterator, Iterator, Literal

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

from app.core.config import settings
from app.core.telemetry import get_meter, get_tracer

CallType = Literal["json", "chat", "stream"]

# Metric instruments, created lazily by _ensure_instruments() so that nothing
# is registered while OpenTelemetry is disabled.
_meter = None
_duration_hist = None
_call_counter = None
_tokens_in_counter = None
_tokens_out_counter = None

# Outcome label for calls interrupted by a BaseException that is not an
# Exception (e.g. asyncio task cancellation, or a stream closed early).
_CANCELLED_OUTCOME = "cancelled"


def _ensure_instruments() -> None:
    """Create the metric instruments on first use when OpenTelemetry is enabled.

    No-op if the instruments already exist or ``settings.otel_enabled`` is false.
    """
    global _meter, _duration_hist, _call_counter, _tokens_in_counter, _tokens_out_counter
    if _meter is not None or not settings.otel_enabled:
        return
    _meter = get_meter("app.llm")
    _duration_hist = _meter.create_histogram(
        "llm.call.duration",
        unit="ms",
        description="LLM call wall time",
    )
    _call_counter = _meter.create_counter(
        "llm.call.total",
        description="LLM call count by outcome",
    )
    _tokens_in_counter = _meter.create_counter(
        "llm.tokens.input",
        description="LLM input tokens when reported by provider",
    )
    _tokens_out_counter = _meter.create_counter(
        "llm.tokens.output",
        description="LLM output tokens when reported by provider",
    )


def infer_provider_model(
    llm: Any,
    *,
    http_error_vendor: str = "deepseek",
) -> tuple[str, str]:
    """Return ``(provider, model)`` labels for an LLM client.

    ``model`` is the first truthy value of ``llm.model_name`` / ``llm.model``
    converted to ``str``, or ``""`` if neither is set. ``provider`` is
    ``http_error_vendor`` stripped and lower-cased, or ``"unknown"`` when that
    is empty or whitespace-only.
    """
    model = ""
    for attr in ("model_name", "model"):
        value = getattr(llm, attr, None)
        if value:
            model = str(value)
            break
    # Normalize before defaulting so a whitespace-only vendor maps to "unknown".
    provider = (http_error_vendor or "").strip().lower() or "unknown"
    return provider, model


def _outcome_label(*, parse_ok: bool, used_fallback: bool, error_kind: str | None) -> str:
    """Map a structured-call result to the low-cardinality ``outcome`` label.

    ``"fallback"`` whenever a fallback was used; otherwise ``"ok"`` on a clean
    parse, else ``error_kind`` (or ``"error"`` when none was given).
    """
    if parse_ok and not used_fallback:
        return "ok"
    if used_fallback:
        return "fallback"
    return error_kind or "error"


def _usage_count(usage: Any, *names: str) -> int:
    """Return the first truthy field among ``names`` in ``usage`` as an int, else 0."""
    for name in names:
        value = usage.get(name) if isinstance(usage, dict) else getattr(usage, name, None)
        if value:
            return int(value)
    return 0


def extract_token_usage(response: Any) -> tuple[int, int]:
    """Parse token usage from a LangChain AIMessage / message chunk.

    Reads ``response.usage_metadata`` first and, when that is missing or
    empty, falls back to ``response.response_metadata["token_usage"]`` or
    ``["usage"]``. Both LangChain-style (``input_tokens``/``output_tokens``)
    and OpenAI-style (``prompt_tokens``/``completion_tokens``) field names are
    accepted, from either a dict or an object exposing them as attributes.

    Returns:
        ``(input_tokens, output_tokens)``; ``(0, 0)`` when no usage is found.
    """
    usage = getattr(response, "usage_metadata", None)
    if not usage:
        meta = getattr(response, "response_metadata", None) or {}
        if isinstance(meta, dict):
            usage = meta.get("token_usage") or meta.get("usage") or usage
    if usage is None:
        return 0, 0
    return (
        _usage_count(usage, "input_tokens", "prompt_tokens"),
        _usage_count(usage, "output_tokens", "completion_tokens"),
    )


def record_llm_completion(
    *,
    agent: str,
    provider: str,
    model: str,
    duration_ms: float,
    call_type: CallType = "chat",
    outcome: str = "ok",
    input_tokens: int = 0,
    output_tokens: int = 0,
    span: trace.Span | None = None,
    extra_span_attributes: dict[str, Any] | None = None,
) -> None:
    """Record one finished LLM call as metrics and, optionally, on ``span``.

    Duration and call-count metrics are labelled with ``agent``, ``provider``,
    ``call_type`` and ``outcome``; token counters with ``provider`` and
    ``agent`` only, and only when the count is positive. ``model`` is not used
    as a metric label here; the spans opened by this module set ``llm.model``
    when they start.

    If ``span`` is recording, duration, call type, outcome, non-zero token
    counts and the non-None ``extra_span_attributes`` are set on it. Its status
    becomes OK for ``"ok"``/``"fallback"`` and ERROR (described by
    ``outcome``) otherwise. Does nothing when OpenTelemetry is disabled.
    """
    if not settings.otel_enabled:
        return
    _ensure_instruments()
    attrs = {
        "agent": agent,
        "provider": provider,
        "call_type": call_type,
        "outcome": outcome,
    }
    if _duration_hist is not None:
        _duration_hist.record(duration_ms, attrs)
    if _call_counter is not None:
        _call_counter.add(1, attrs)
    if input_tokens > 0 and _tokens_in_counter is not None:
        _tokens_in_counter.add(input_tokens, {"provider": provider, "agent": agent})
    if output_tokens > 0 and _tokens_out_counter is not None:
        _tokens_out_counter.add(output_tokens, {"provider": provider, "agent": agent})

    if span is None or not span.is_recording():
        return
    span.set_attribute("llm.duration_ms", round(duration_ms, 2))
    span.set_attribute("llm.call_type", call_type)
    span.set_attribute("llm.outcome", outcome)
    if input_tokens:
        span.set_attribute("llm.tokens.input", input_tokens)
    if output_tokens:
        span.set_attribute("llm.tokens.output", output_tokens)
    if extra_span_attributes:
        for key, value in extra_span_attributes.items():
            # OTel attribute values must not be None; skip instead of warning.
            if value is not None:
                span.set_attribute(key, value)
    if outcome in ("ok", "fallback"):
        # A status description is only valid together with ERROR; the
        # "fallback" detail is already carried by the llm.outcome attribute.
        span.set_status(Status(StatusCode.OK))
    else:
        span.set_status(Status(StatusCode.ERROR, outcome))


@contextmanager
def langchain_invoke_span(
    *,
    agent: str,
    provider: str,
    model: str,
    call_type: CallType,
    prompt_sha12: str = "",
    max_tokens: int | None = None,
) -> Iterator[dict[str, Any]]:
    """Wrap a LangChain ``invoke``/``ainvoke`` call in a span plus metrics.

    Yields a mutable dict. The caller stores the response (and optionally an
    outcome, token counts or ``error_kind``) in it; the call is recorded via
    ``record_llm_completion`` when the ``with`` block exits.

    Keys: ``response``, ``outcome``, ``input_tokens``, ``output_tokens``,
    ``error_kind``. Token counts are extracted from ``response`` when the
    caller left both at zero. An ``Exception`` escaping the block forces the
    outcome to ``"error"``; any other ``BaseException`` (e.g. task
    cancellation) forces ``"cancelled"``. A non-empty ``error_kind`` is added
    to the span as ``llm.error_kind``. Nothing is recorded when OpenTelemetry
    is disabled.
    """
    ctx: dict[str, Any] = {
        "response": None,
        "outcome": "ok",
        "input_tokens": 0,
        "output_tokens": 0,
        "error_kind": None,
    }
    if not settings.otel_enabled:
        yield ctx
        return

    tracer = get_tracer("app.llm")
    span_name = {
        "json": "llm.json_invoke",
        "chat": "llm.chat_invoke",
        "stream": "llm.stream_invoke",
    }.get(call_type, "llm.invoke")
    attrs: dict[str, Any] = {
        "llm.agent": agent,
        "llm.provider": provider,
        "llm.model": model or "unknown",
        "llm.call_type": call_type,
    }
    if prompt_sha12:
        attrs["llm.prompt_sha12"] = prompt_sha12
    if max_tokens is not None:
        attrs["llm.max_tokens"] = max_tokens

    t0 = time.perf_counter()
    with tracer.start_as_current_span(span_name, attributes=attrs) as span:
        try:
            yield ctx
        except Exception:
            ctx["outcome"] = "error"
            raise
        except BaseException:
            # Without this a cancelled call would be recorded as "ok" and the
            # span marked OK, which per the OTel spec is final and would not be
            # overridden by the exception handling of the span context manager.
            ctx["outcome"] = _CANCELLED_OUTCOME
            raise
        finally:
            duration_ms = (time.perf_counter() - t0) * 1000
            resp = ctx.get("response")
            if (
                resp is not None
                and not ctx.get("input_tokens")
                and not ctx.get("output_tokens")
            ):
                inp, out = extract_token_usage(resp)
                ctx["input_tokens"] = inp
                ctx["output_tokens"] = out
            error_kind = ctx.get("error_kind")
            record_llm_completion(
                agent=agent,
                provider=provider,
                model=model,
                duration_ms=duration_ms,
                call_type=call_type,
                outcome=str(ctx.get("outcome") or "ok"),
                input_tokens=int(ctx.get("input_tokens") or 0),
                output_tokens=int(ctx.get("output_tokens") or 0),
                span=span,
                extra_span_attributes=(
                    {"llm.error_kind": str(error_kind)} if error_kind else None
                ),
            )


@contextmanager
def llm_call_span(
    *,
    agent: str,
    schema_name: str,
    provider: str,
    model: str,
    prompt_sha12: str,
    max_tokens: int,
) -> Iterator[trace.Span]:
    """Open an ``llm.json_call`` span for a structured (schema-bound) LLM call.

    Yields the span, or ``trace.INVALID_SPAN`` when OpenTelemetry is disabled.
    No metrics are recorded here; pass the yielded span to ``record_llm_call``
    to record the result.
    """
    if not settings.otel_enabled:
        yield trace.INVALID_SPAN
        return
    tracer = get_tracer("app.llm")
    with tracer.start_as_current_span(
        "llm.json_call",
        attributes={
            "llm.agent": agent,
            "llm.schema_name": schema_name,
            "llm.provider": provider,
            "llm.model": model or "unknown",
            "llm.prompt_sha12": prompt_sha12,
            "llm.max_tokens": max_tokens,
            "llm.call_type": "json",
        },
    ) as span:
        yield span


async def observe_ainvoke(
    llm: Any,
    messages: Any,
    *,
    agent: str,
    provider: str = "deepseek",
    model: str = "",
    call_type: CallType = "chat",
    extra_span_attributes: dict[str, Any] | None = None,
    record_response_latency_ms: bool = True,
) -> Any:
    """Await ``llm.ainvoke(messages)`` inside ``langchain_invoke_span``.

    On success, optionally sets ``llm.response_latency_ms`` and the non-None
    entries of ``extra_span_attributes`` on the current span, then returns the
    response unchanged. Exceptions raised by ``ainvoke`` propagate.
    """
    t0 = time.perf_counter()
    with langchain_invoke_span(
        agent=agent,
        provider=provider,
        model=model,
        call_type=call_type,
    ) as tel:
        result = await llm.ainvoke(messages)
        tel["response"] = result
        span = trace.get_current_span()
        if span.is_recording():
            if record_response_latency_ms:
                span.set_attribute(
                    "llm.response_latency_ms",
                    round((time.perf_counter() - t0) * 1000, 2),
                )
            if extra_span_attributes:
                for key, value in extra_span_attributes.items():
                    if value is not None:
                        span.set_attribute(key, value)
    return result


async def observe_astream(
    llm: Any,
    prompt: Any,
    *,
    agent: str,
    provider: str = "deepseek",
    model: str = "",
) -> AsyncIterator[Any]:
    """Re-yield ``llm.astream(prompt)`` chunks inside an ``llm.stream_invoke`` span.

    Records wall time and, once a chunk with non-empty ``content`` arrives,
    the time to first token as ``llm.ttft_ms``. Token usage is read from the
    last chunk of a stream that completed normally. The call is recorded
    exactly once, with outcome ``"ok"``, ``"error"`` (an exception escaped) or
    ``"cancelled"`` (task cancelled, or the consumer closed the stream early).
    """
    if not settings.otel_enabled:
        async for chunk in llm.astream(prompt):
            yield chunk
        return

    tracer = get_tracer("app.llm")
    t0 = time.perf_counter()
    ttft_ms: float | None = None
    last_chunk: Any = None
    outcome = "ok"
    with tracer.start_as_current_span(
        "llm.stream_invoke",
        attributes={
            "llm.agent": agent,
            "llm.provider": provider,
            "llm.model": model or "unknown",
            "llm.call_type": "stream",
        },
    ) as span:
        try:
            async for chunk in llm.astream(prompt):
                if ttft_ms is None and getattr(chunk, "content", None):
                    ttft_ms = (time.perf_counter() - t0) * 1000
                last_chunk = chunk
                yield chunk
        except Exception:
            outcome = "error"
            raise
        except BaseException:
            # GeneratorExit (consumer stopped iterating) or CancelledError:
            # still record the call instead of ending the span with no metrics.
            outcome = _CANCELLED_OUTCOME
            raise
        finally:
            duration_ms = (time.perf_counter() - t0) * 1000
            if outcome == "ok" and last_chunk is not None:
                inp, out = extract_token_usage(last_chunk)
            else:
                inp, out = 0, 0
            record_llm_completion(
                agent=agent,
                provider=provider,
                model=model,
                duration_ms=duration_ms,
                call_type="stream",
                outcome=outcome,
                input_tokens=inp,
                output_tokens=out,
                span=span,
                extra_span_attributes=(
                    {"llm.ttft_ms": round(ttft_ms, 2)} if ttft_ms is not None else None
                ),
            )


def record_llm_call(
    *,
    agent: str,
    schema_name: str,
    provider: str,
    model: str,
    duration_ms: float,
    attempts: int,
    parse_ok: bool,
    used_fallback: bool,
    error_kind: str | None,
    prompt_sha12: str,
    input_tokens: int = 0,
    output_tokens: int = 0,
    span: trace.Span | None = None,
) -> None:
    """Record a structured (JSON) LLM call via ``record_llm_completion``.

    The outcome label is derived from ``parse_ok`` / ``used_fallback`` /
    ``error_kind``. Schema name, attempt count, parse/fallback flags, and
    (when non-empty) ``error_kind`` and ``prompt_sha12`` are added as span
    attributes.
    """
    outcome = _outcome_label(
        parse_ok=parse_ok,
        used_fallback=used_fallback,
        error_kind=error_kind,
    )
    record_llm_completion(
        agent=agent,
        provider=provider,
        model=model,
        duration_ms=duration_ms,
        call_type="json",
        outcome=outcome,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        span=span,
        extra_span_attributes={
            "llm.schema_name": schema_name,
            "llm.attempts": attempts,
            "llm.parse_ok": parse_ok,
            "llm.used_fallback": used_fallback,
            **({"llm.error_kind": error_kind} if error_kind else {}),
            **({"llm.prompt_sha12": prompt_sha12} if prompt_sha12 else {}),
        },
    )