* add staging ios app build script * feat(api): add OpenTelemetry LGTM stack for local observability Wire OTel traces, metrics, and logs through a collector to Tempo, Prometheus, and Loki, with custom LLM instrumentation, dev compose overlay, Grafana provisioning, env templates, and development.sh auto-start. Co-authored-by: Cursor <cursoragent@cursor.com> * feat: expand observability, harden dev tooling, and fix expo staging UX Add business and LLM Prometheus metrics with Grafana dashboards, alerting, and a metrics verification script. Wire telemetry through adapters and core LLM paths, and document the local LGTM workflow. Fix development.sh for macOS bash 3.2, open Grafana and eval-web in Chrome, and repair eval-web auto-open (unbound EVAL_WEB_BROWSER_SCHEDULED). Merge internal-eval into the main dev script with improved compose handling. Require EXPO_PUBLIC_* at build time, improve iOS HTTP ATS for staging IPs, show memoir empty state instead of load errors when no chapters exist, and add jest env setup plus chapter list response normalization. Co-authored-by: Cursor <cursoragent@cursor.com> * chore: enable Grafana Assistant Cursor plugin Co-authored-by: Cursor <cursoragent@cursor.com> * fix: memoir empty state and repair withdrawn 0020_chapters_book_id stamp Show empty memoir UI when the chapter list succeeds with no items; treat auth/404 as non-fatal. Extend alembic revision repair so local dev DBs stamped with the removed 0020_chapters_book_id migration can roll back and upgrade to 0019. Co-authored-by: Cursor <cursoragent@cursor.com> --------- Co-authored-by: Kevin <kevin@brighteng.org> Co-authored-by: Cursor <cursoragent@cursor.com>
385 lines
12 KiB
Python
385 lines
12 KiB
Python
"""
|
||
LLM 调用 OpenTelemetry span 与 metrics(低基数 attributes,不含 prompt/response 正文)。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import time
|
||
from contextlib import contextmanager
|
||
from typing import Any, Iterator, Literal
|
||
|
||
from opentelemetry import trace
|
||
from opentelemetry.trace import Status, StatusCode
|
||
|
||
from app.core.config import settings
|
||
from app.core.telemetry import get_meter, get_tracer
|
||
|
||
CallType = Literal["json", "chat", "stream"]
|
||
|
||
_meter = None
|
||
_duration_hist = None
|
||
_call_counter = None
|
||
_tokens_in_counter = None
|
||
_tokens_out_counter = None
|
||
|
||
|
||
def _ensure_instruments() -> None:
|
||
global _meter, _duration_hist, _call_counter, _tokens_in_counter, _tokens_out_counter
|
||
if _meter is not None or not settings.otel_enabled:
|
||
return
|
||
_meter = get_meter("app.llm")
|
||
_duration_hist = _meter.create_histogram(
|
||
"llm.call.duration",
|
||
unit="ms",
|
||
description="LLM call wall time",
|
||
)
|
||
_call_counter = _meter.create_counter(
|
||
"llm.call.total",
|
||
description="LLM call count by outcome",
|
||
)
|
||
_tokens_in_counter = _meter.create_counter(
|
||
"llm.tokens.input",
|
||
description="LLM input tokens when reported by provider",
|
||
)
|
||
_tokens_out_counter = _meter.create_counter(
|
||
"llm.tokens.output",
|
||
description="LLM output tokens when reported by provider",
|
||
)
|
||
|
||
|
||
def infer_provider_model(
|
||
llm: Any,
|
||
*,
|
||
http_error_vendor: str = "deepseek",
|
||
) -> tuple[str, str]:
|
||
model = ""
|
||
for attr in ("model_name", "model"):
|
||
v = getattr(llm, attr, None)
|
||
if v:
|
||
model = str(v)
|
||
break
|
||
provider = (http_error_vendor or "unknown").strip().lower()
|
||
return provider, model
|
||
|
||
|
||
def _outcome_label(*, parse_ok: bool, used_fallback: bool, error_kind: str | None) -> str:
|
||
if parse_ok and not used_fallback:
|
||
return "ok"
|
||
if used_fallback:
|
||
return "fallback"
|
||
return error_kind or "error"
|
||
|
||
|
||
def extract_token_usage(response: Any) -> tuple[int, int]:
|
||
"""从 LangChain AIMessage / chunk 解析 token 用量。"""
|
||
usage = getattr(response, "usage_metadata", None)
|
||
if usage is None and hasattr(response, "response_metadata"):
|
||
meta = getattr(response, "response_metadata", None) or {}
|
||
if isinstance(meta, dict):
|
||
usage = meta.get("token_usage") or meta.get("usage")
|
||
if usage is None:
|
||
return 0, 0
|
||
if isinstance(usage, dict):
|
||
inp = usage.get("input_tokens") or usage.get("prompt_tokens") or 0
|
||
out = usage.get("output_tokens") or usage.get("completion_tokens") or 0
|
||
return int(inp or 0), int(out or 0)
|
||
inp = getattr(usage, "input_tokens", None) or getattr(usage, "prompt_tokens", None) or 0
|
||
out = (
|
||
getattr(usage, "output_tokens", None)
|
||
or getattr(usage, "completion_tokens", None)
|
||
or 0
|
||
)
|
||
return int(inp or 0), int(out or 0)
|
||
|
||
|
||
def record_llm_completion(
|
||
*,
|
||
agent: str,
|
||
provider: str,
|
||
model: str,
|
||
duration_ms: float,
|
||
call_type: CallType = "chat",
|
||
outcome: str = "ok",
|
||
input_tokens: int = 0,
|
||
output_tokens: int = 0,
|
||
span: trace.Span | None = None,
|
||
extra_span_attributes: dict[str, Any] | None = None,
|
||
) -> None:
|
||
if not settings.otel_enabled:
|
||
return
|
||
|
||
_ensure_instruments()
|
||
attrs = {
|
||
"agent": agent,
|
||
"provider": provider,
|
||
"call_type": call_type,
|
||
"outcome": outcome,
|
||
}
|
||
if _duration_hist is not None:
|
||
_duration_hist.record(duration_ms, attrs)
|
||
if _call_counter is not None:
|
||
_call_counter.add(1, attrs)
|
||
if input_tokens > 0 and _tokens_in_counter is not None:
|
||
_tokens_in_counter.add(input_tokens, {"provider": provider, "agent": agent})
|
||
if output_tokens > 0 and _tokens_out_counter is not None:
|
||
_tokens_out_counter.add(output_tokens, {"provider": provider, "agent": agent})
|
||
|
||
if span is not None and span.is_recording():
|
||
span.set_attribute("llm.duration_ms", round(duration_ms, 2))
|
||
span.set_attribute("llm.call_type", call_type)
|
||
span.set_attribute("llm.outcome", outcome)
|
||
if input_tokens:
|
||
span.set_attribute("llm.tokens.input", input_tokens)
|
||
if output_tokens:
|
||
span.set_attribute("llm.tokens.output", output_tokens)
|
||
if extra_span_attributes:
|
||
for k, v in extra_span_attributes.items():
|
||
span.set_attribute(k, v)
|
||
if outcome == "ok":
|
||
span.set_status(Status(StatusCode.OK))
|
||
elif outcome == "fallback":
|
||
span.set_status(Status(StatusCode.OK, "fallback"))
|
||
else:
|
||
span.set_status(Status(StatusCode.ERROR, outcome))
|
||
|
||
|
||
@contextmanager
|
||
def langchain_invoke_span(
|
||
*,
|
||
agent: str,
|
||
provider: str,
|
||
model: str,
|
||
call_type: CallType,
|
||
prompt_sha12: str = "",
|
||
max_tokens: int | None = None,
|
||
) -> Iterator[dict[str, Any]]:
|
||
"""
|
||
包住 LangChain invoke/ainvoke;yield 可变 dict 供调用方写入 response 后触发 record。
|
||
keys: response, outcome, input_tokens, output_tokens, error_kind
|
||
"""
|
||
ctx: dict[str, Any] = {
|
||
"response": None,
|
||
"outcome": "ok",
|
||
"input_tokens": 0,
|
||
"output_tokens": 0,
|
||
}
|
||
if not settings.otel_enabled:
|
||
yield ctx
|
||
return
|
||
|
||
tracer = get_tracer("app.llm")
|
||
span_name = {
|
||
"json": "llm.json_invoke",
|
||
"chat": "llm.chat_invoke",
|
||
"stream": "llm.stream_invoke",
|
||
}.get(call_type, "llm.invoke")
|
||
attrs: dict[str, Any] = {
|
||
"llm.agent": agent,
|
||
"llm.provider": provider,
|
||
"llm.model": model or "unknown",
|
||
"llm.call_type": call_type,
|
||
}
|
||
if prompt_sha12:
|
||
attrs["llm.prompt_sha12"] = prompt_sha12
|
||
if max_tokens is not None:
|
||
attrs["llm.max_tokens"] = max_tokens
|
||
|
||
t0 = time.perf_counter()
|
||
with tracer.start_as_current_span(span_name, attributes=attrs) as span:
|
||
try:
|
||
yield ctx
|
||
except Exception:
|
||
ctx["outcome"] = "error"
|
||
raise
|
||
finally:
|
||
duration_ms = (time.perf_counter() - t0) * 1000
|
||
resp = ctx.get("response")
|
||
if resp is not None and not ctx.get("input_tokens") and not ctx.get("output_tokens"):
|
||
inp, out = extract_token_usage(resp)
|
||
ctx["input_tokens"] = inp
|
||
ctx["output_tokens"] = out
|
||
record_llm_completion(
|
||
agent=agent,
|
||
provider=provider,
|
||
model=model,
|
||
duration_ms=duration_ms,
|
||
call_type=call_type,
|
||
outcome=str(ctx.get("outcome") or "ok"),
|
||
input_tokens=int(ctx.get("input_tokens") or 0),
|
||
output_tokens=int(ctx.get("output_tokens") or 0),
|
||
span=span,
|
||
)
|
||
|
||
|
||
@contextmanager
|
||
def llm_call_span(
|
||
*,
|
||
agent: str,
|
||
schema_name: str,
|
||
provider: str,
|
||
model: str,
|
||
prompt_sha12: str,
|
||
max_tokens: int,
|
||
) -> Iterator[trace.Span]:
|
||
if not settings.otel_enabled:
|
||
yield trace.INVALID_SPAN
|
||
return
|
||
tracer = get_tracer("app.llm")
|
||
with tracer.start_as_current_span(
|
||
"llm.json_call",
|
||
attributes={
|
||
"llm.agent": agent,
|
||
"llm.schema_name": schema_name,
|
||
"llm.provider": provider,
|
||
"llm.model": model or "unknown",
|
||
"llm.prompt_sha12": prompt_sha12,
|
||
"llm.max_tokens": max_tokens,
|
||
"llm.call_type": "json",
|
||
},
|
||
) as span:
|
||
yield span
|
||
|
||
|
||
async def observe_ainvoke(
|
||
llm: Any,
|
||
messages: Any,
|
||
*,
|
||
agent: str,
|
||
provider: str = "deepseek",
|
||
model: str = "",
|
||
call_type: CallType = "chat",
|
||
extra_span_attributes: dict[str, Any] | None = None,
|
||
record_response_latency_ms: bool = True,
|
||
) -> Any:
|
||
"""包装 ``ainvoke``,统一 span + metrics。"""
|
||
t0 = time.perf_counter()
|
||
with langchain_invoke_span(
|
||
agent=agent,
|
||
provider=provider,
|
||
model=model,
|
||
call_type=call_type,
|
||
) as tel:
|
||
result = await llm.ainvoke(messages)
|
||
tel["response"] = result
|
||
span = trace.get_current_span()
|
||
if span.is_recording():
|
||
if record_response_latency_ms:
|
||
span.set_attribute(
|
||
"llm.response_latency_ms",
|
||
round((time.perf_counter() - t0) * 1000, 2),
|
||
)
|
||
if extra_span_attributes:
|
||
for key, value in extra_span_attributes.items():
|
||
if value is not None:
|
||
span.set_attribute(key, value)
|
||
return result
|
||
|
||
|
||
async def observe_astream(
|
||
llm: Any,
|
||
prompt: Any,
|
||
*,
|
||
agent: str,
|
||
provider: str = "deepseek",
|
||
model: str = "",
|
||
):
|
||
"""包装 ``astream``,记录 wall time 与可选 TTFT。"""
|
||
if not settings.otel_enabled:
|
||
async for chunk in llm.astream(prompt):
|
||
yield chunk
|
||
return
|
||
|
||
tracer = get_tracer("app.llm")
|
||
t0 = time.perf_counter()
|
||
ttft_ms: float | None = None
|
||
last_chunk: Any = None
|
||
with tracer.start_as_current_span(
|
||
"llm.stream_invoke",
|
||
attributes={
|
||
"llm.agent": agent,
|
||
"llm.provider": provider,
|
||
"llm.model": model or "unknown",
|
||
"llm.call_type": "stream",
|
||
},
|
||
) as span:
|
||
try:
|
||
async for chunk in llm.astream(prompt):
|
||
if ttft_ms is None and getattr(chunk, "content", None):
|
||
ttft_ms = (time.perf_counter() - t0) * 1000
|
||
last_chunk = chunk
|
||
yield chunk
|
||
except Exception:
|
||
duration_ms = (time.perf_counter() - t0) * 1000
|
||
record_llm_completion(
|
||
agent=agent,
|
||
provider=provider,
|
||
model=model,
|
||
duration_ms=duration_ms,
|
||
call_type="stream",
|
||
outcome="error",
|
||
span=span,
|
||
extra_span_attributes=(
|
||
{"llm.ttft_ms": round(ttft_ms, 2)} if ttft_ms is not None else None
|
||
),
|
||
)
|
||
raise
|
||
duration_ms = (time.perf_counter() - t0) * 1000
|
||
inp, out = extract_token_usage(last_chunk) if last_chunk else (0, 0)
|
||
extra: dict[str, Any] = {}
|
||
if ttft_ms is not None:
|
||
extra["llm.ttft_ms"] = round(ttft_ms, 2)
|
||
record_llm_completion(
|
||
agent=agent,
|
||
provider=provider,
|
||
model=model,
|
||
duration_ms=duration_ms,
|
||
call_type="stream",
|
||
outcome="ok",
|
||
input_tokens=inp,
|
||
output_tokens=out,
|
||
span=span,
|
||
extra_span_attributes=extra or None,
|
||
)
|
||
|
||
|
||
def record_llm_call(
|
||
*,
|
||
agent: str,
|
||
schema_name: str,
|
||
provider: str,
|
||
model: str,
|
||
duration_ms: float,
|
||
attempts: int,
|
||
parse_ok: bool,
|
||
used_fallback: bool,
|
||
error_kind: str | None,
|
||
prompt_sha12: str,
|
||
input_tokens: int = 0,
|
||
output_tokens: int = 0,
|
||
span: trace.Span | None = None,
|
||
) -> None:
|
||
outcome = _outcome_label(
|
||
parse_ok=parse_ok,
|
||
used_fallback=used_fallback,
|
||
error_kind=error_kind,
|
||
)
|
||
record_llm_completion(
|
||
agent=agent,
|
||
provider=provider,
|
||
model=model,
|
||
duration_ms=duration_ms,
|
||
call_type="json",
|
||
outcome=outcome,
|
||
input_tokens=input_tokens,
|
||
output_tokens=output_tokens,
|
||
span=span,
|
||
extra_span_attributes={
|
||
"llm.schema_name": schema_name,
|
||
"llm.attempts": attempts,
|
||
"llm.parse_ok": parse_ok,
|
||
"llm.used_fallback": used_fallback,
|
||
**({"llm.error_kind": error_kind} if error_kind else {}),
|
||
**({"llm.prompt_sha12": prompt_sha12} if prompt_sha12 else {}),
|
||
},
|
||
)
|