Files
life-echo/api/app/core/telemetry.py
Sully 53e0065e3e refactor(api): TOML 配置 SSOT、统一错误契约、Auth/事务加固与可观测性 (#33)
配置 SSOT(TOML + .env)
统一错误契约
Auth 与事务边界
Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client
可观测性(OpenTelemetry + LGTM)
2026-05-22 13:44:50 +08:00

220 lines
7.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
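OpenTelemetry initialization: traces / metrics / logs are exported to an OTLP Collector.
Call ``setup_telemetry(service_name=...)`` after ``setup_logging()`` and before the FastAPI / Celery application is created.
When ``OTEL_ENABLED=false`` this module is a no-op, which is convenient for tests and for environments without a Collector.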
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from opentelemetry import metrics, trace
from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio
from app.core.config import settings
from app.core.runtime_constants import otel_defaults
if TYPE_CHECKING:
from fastapi import FastAPI
# Module-level telemetry state, written only by setup_telemetry() and
# shutdown_telemetry().

# True once setup_telemetry() has run to completion; reset to False by
# shutdown_telemetry(). Guards against initializing twice.
_initialized = False
# Handler that setup_telemetry() attaches to the root logger; kept so that
# shutdown_telemetry() can remove the exact same handler object.
_otel_logging_handler: LoggingHandler | None = None
# Providers created by setup_telemetry(); kept so that shutdown_telemetry()
# can call shutdown() on each of them.
_tracer_provider: TracerProvider | None = None
_meter_provider: MeterProvider | None = None
_log_provider: LoggerProvider | None = None
def _build_resource(service_name: str) -> Resource:
    """Build the OpenTelemetry ``Resource`` shared by all signal providers.

    Args:
        service_name: Value reported as the ``service.name`` attribute.

    Returns:
        A ``Resource`` carrying the service name, the deployment environment
        read from ``settings.app_environment`` and a fixed service version.
    """
    attributes = {}
    attributes["service.name"] = service_name
    attributes["deployment.environment"] = settings.app_environment
    attributes["service.version"] = "0.2.0"
    return Resource.create(attributes)
def _build_sampler():
    """Select the trace sampler for the current environment.

    The sampler name and its optional argument are read from
    ``otel_defaults`` for ``settings.app_environment``. The name is trimmed
    and compared case-insensitively; a missing or empty name is treated as
    ``always_on``.

    Returns:
        * ``ALWAYS_ON`` for ``always_on`` / ``alwayson``
        * ``ALWAYS_OFF`` for ``always_off`` / ``alwaysoff``
        * ``ParentBased(ALWAYS_ON)`` for ``parentbased_always_on``
        * ``ParentBased(ALWAYS_OFF)`` for ``parentbased_always_off``
        * ``TraceIdRatioBased(ratio)`` for ``traceidratio``
        * ``ParentBasedTraceIdRatio(ratio)`` for any other name

        ``ratio`` is the configured sampler argument, or ``0.1`` when that
        argument is ``None``.
    """
    from opentelemetry.sdk.trace.sampling import (
        ALWAYS_OFF,
        ALWAYS_ON,
        ParentBased,
        TraceIdRatioBased,
    )

    configured = otel_defaults.traces_sampler(settings.app_environment)
    name = (configured or "always_on").strip().lower()
    arg = otel_defaults.traces_sampler_arg(settings.app_environment)

    if name in ("always_on", "alwayson"):
        return ALWAYS_ON
    if name in ("always_off", "alwaysoff"):
        return ALWAYS_OFF
    # The parent-based "always" samplers take no ratio. Without these explicit
    # branches they would fall through to the ratio-based default below and be
    # sampled at ``ratio`` instead of 100% / 0% for root spans.
    if name == "parentbased_always_on":
        return ParentBased(ALWAYS_ON)
    if name == "parentbased_always_off":
        return ParentBased(ALWAYS_OFF)

    ratio = 0.1 if arg is None else arg
    if name == "traceidratio":
        return TraceIdRatioBased(ratio)
    # Default, including ``parentbased_traceidratio`` and unrecognised names.
    return ParentBasedTraceIdRatio(ratio)
def _otlp_timeout_seconds() -> int | None:
    """Return the OTLP exporter timeout, in seconds, for the current environment.

    ``development`` (case-insensitive, surrounding whitespace ignored) gets
    3 seconds; every other value of ``settings.app_environment``, including
    an empty or unset one, gets 10 seconds.
    """
    environment = settings.app_environment or ""
    is_development = environment.strip().lower() == "development"
    return 3 if is_development else 10
def setup_telemetry(*, service_name: str) -> None:
    """Configure the OTLP exporters and automatic instrumentation (idempotent).

    Returns immediately when telemetry was already initialized by an earlier
    call or when ``settings.otel_enabled`` is falsy. Otherwise it installs a
    tracer, a meter and a logger provider exporting over OTLP/gRPC to
    ``settings.otel_exporter_otlp_endpoint``, attaches an OpenTelemetry
    logging handler to the root logger, and instruments logging, httpx, Redis
    and SQLAlchemy.

    Args:
        service_name: Reported as ``service.name`` on all exported telemetry.
    """
    global _initialized, _otel_logging_handler
    global _tracer_provider, _meter_provider, _log_provider

    if _initialized:
        return
    if not settings.otel_enabled:
        return

    # All three exporters talk to the same Collector with the same options.
    exporter_options = {
        "endpoint": settings.otel_exporter_otlp_endpoint.rstrip("/"),
        "insecure": otel_defaults.exporter_insecure,
        "timeout": _otlp_timeout_seconds(),
    }
    # Batch processors take milliseconds; fall back to 10 s if no timeout is set.
    export_timeout_ms = (exporter_options["timeout"] or 10) * 1000
    resource = _build_resource(service_name)

    # --- traces ---
    span_exporter = OTLPSpanExporter(**exporter_options)
    _tracer_provider = TracerProvider(resource=resource, sampler=_build_sampler())
    span_processor = BatchSpanProcessor(
        span_exporter, export_timeout_millis=export_timeout_ms
    )
    _tracer_provider.add_span_processor(span_processor)
    trace.set_tracer_provider(_tracer_provider)

    # --- metrics ---
    metric_exporter = OTLPMetricExporter(**exporter_options)
    metric_reader = PeriodicExportingMetricReader(
        metric_exporter,
        export_interval_millis=otel_defaults.metric_export_interval_ms,
    )
    _meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
    metrics.set_meter_provider(_meter_provider)

    # --- logs ---
    log_exporter = OTLPLogExporter(**exporter_options)
    _log_provider = LoggerProvider(resource=resource)
    log_processor = BatchLogRecordProcessor(
        log_exporter, export_timeout_millis=export_timeout_ms
    )
    _log_provider.add_log_record_processor(log_processor)
    set_logger_provider(_log_provider)

    LoggingInstrumentor().instrument(set_logging_format=True)
    # Keep a reference to the handler so shutdown can detach this exact object.
    _otel_logging_handler = LoggingHandler(
        level=logging.NOTSET,
        logger_provider=_log_provider,
    )
    logging.getLogger().addHandler(_otel_logging_handler)

    # --- client library instrumentation ---
    for instrumentor_cls in (
        HTTPXClientInstrumentor,
        RedisInstrumentor,
        SQLAlchemyInstrumentor,
    ):
        instrumentor_cls().instrument()

    _initialized = True
def shutdown_telemetry() -> None:
    """Stop the OTLP export threads and remove instrumentation.

    Intended to be called before a test process exits, on hot reload, or on
    Ctrl+C. Does nothing unless ``setup_telemetry`` previously completed.

    Every teardown step is best-effort: a step that raises is logged at DEBUG
    level (with traceback) and the remaining steps still run. Afterwards the
    module state is reset so that ``setup_telemetry`` may be called again.
    """
    global _initialized, _otel_logging_handler
    global _tracer_provider, _meter_provider, _log_provider

    if not _initialized:
        return

    log = logging.getLogger(__name__)

    # Raise the OpenTelemetry loggers to CRITICAL before tearing anything
    # down; presumably this keeps export errors during shutdown quiet.
    for name in (
        "opentelemetry",
        "opentelemetry.sdk",
        "opentelemetry.exporter",
        "opentelemetry.exporter.otlp",
    ):
        logging.getLogger(name).setLevel(logging.CRITICAL)

    # Detach the OTLP log handler first so nothing logged below is routed
    # into a logger provider that is about to be shut down.
    if _otel_logging_handler is not None:
        logging.getLogger().removeHandler(_otel_logging_handler)
        _otel_logging_handler = None

    for instrumentor_cls in (
        FastAPIInstrumentor,
        LoggingInstrumentor,
        HTTPXClientInstrumentor,
        RedisInstrumentor,
        SQLAlchemyInstrumentor,
        CeleryInstrumentor,
    ):
        try:
            instrumentor_cls().uninstrument()
        except Exception:
            # Best-effort: keep going, but leave a trace for debugging.
            log.debug(
                "Failed to uninstrument %s",
                instrumentor_cls.__name__,
                exc_info=True,
            )

    for provider in (_log_provider, _meter_provider, _tracer_provider):
        if provider is None:
            continue
        try:
            provider.shutdown()
        except Exception:
            log.debug(
                "Failed to shut down %s",
                type(provider).__name__,
                exc_info=True,
            )

    _tracer_provider = None
    _meter_provider = None
    _log_provider = None
    _initialized = False
def instrument_fastapi_app(app: FastAPI) -> None:
    """Attach OpenTelemetry instrumentation to ``app``.

    No-op when ``settings.otel_enabled`` is falsy. ``/health`` is passed as
    ``excluded_urls`` to the instrumentor.

    Args:
        app: The FastAPI application to instrument.
    """
    if settings.otel_enabled:
        FastAPIInstrumentor.instrument_app(app, excluded_urls="/health")
def instrument_celery() -> None:
    """Enable Celery instrumentation; no-op when ``settings.otel_enabled`` is falsy."""
    if settings.otel_enabled:
        celery_instrumentor = CeleryInstrumentor()
        celery_instrumentor.instrument()
def get_tracer(name: str):
    """Return the tracer for ``name``, as provided by ``trace.get_tracer``."""
    tracer = trace.get_tracer(name)
    return tracer
def get_meter(name: str):
    """Return the meter for ``name``, as provided by ``metrics.get_meter``."""
    meter = metrics.get_meter(name)
    return meter
def current_trace_context() -> dict[str, str]:
    """Return the current span's ``trace_id`` / ``span_id`` as lowercase hex.

    Returns:
        A dict with ``trace_id`` (zero-padded to 32 hex digits) and
        ``span_id`` (zero-padded to 16 hex digits), or an empty dict when the
        current span context is not valid, i.e. there is no active span.
    """
    span_context = trace.get_current_span().get_span_context()
    if span_context.is_valid:
        return {
            "trace_id": f"{span_context.trace_id:032x}",
            "span_id": f"{span_context.span_id:016x}",
        }
    return {}