"""OpenTelemetry bootstrap: export traces / metrics / logs to an OTLP Collector.

Call ``setup_telemetry(service_name=...)`` after ``setup_logging()`` and before
the FastAPI / Celery application is created.

With ``OTEL_ENABLED=false`` the setup and instrumentation entry points are
no-ops, which keeps tests and Collector-less environments working.
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

from opentelemetry import metrics, trace
from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import (
    ALWAYS_OFF,
    ALWAYS_ON,
    ParentBasedTraceIdRatio,
    Sampler,
    TraceIdRatioBased,
)

from app.core.config import settings
from app.core.runtime_constants import otel_defaults

if TYPE_CHECKING:
    from fastapi import FastAPI

_logger = logging.getLogger(__name__)

# Module-level state shared by setup_telemetry() / shutdown_telemetry().
_initialized = False
_otel_logging_handler: LoggingHandler | None = None
_tracer_provider: TracerProvider | None = None
_meter_provider: MeterProvider | None = None
_log_provider: LoggerProvider | None = None


def _build_resource(service_name: str) -> Resource:
    """Return the OTel ``Resource`` attached to every exported signal."""
    return Resource.create(
        {
            "service.name": service_name,
            "deployment.environment": settings.app_environment,
            "service.version": "0.2.0",
        }
    )


def _build_sampler() -> Sampler:
    """Return the trace sampler selected by ``otel_defaults`` for this environment.

    The sampler name is normalised (stripped, lower-cased) and defaults to
    ``"always_on"`` when unset. ``"traceidratio"`` yields a plain
    ``TraceIdRatioBased`` sampler; any other unrecognised name falls back to
    ``ParentBasedTraceIdRatio``. The ratio is ``0.1`` when no sampler argument
    is configured.
    """
    name = (
        otel_defaults.traces_sampler(settings.app_environment) or "always_on"
    ).strip().lower()
    arg = otel_defaults.traces_sampler_arg(settings.app_environment)

    if name in ("always_on", "alwayson"):
        return ALWAYS_ON
    if name in ("always_off", "alwaysoff"):
        return ALWAYS_OFF

    ratio = 0.1 if arg is None else arg
    if name == "traceidratio":
        return TraceIdRatioBased(ratio)
    return ParentBasedTraceIdRatio(ratio)


def _otlp_timeout_seconds() -> int | None:
    """Return the OTLP exporter timeout in seconds.

    ``development`` uses 3 seconds; every other environment uses 10. The
    function currently never returns ``None``; the Optional return type is
    kept for callers that already guard against it.
    """
    env = (settings.app_environment or "").strip().lower()
    if env == "development":
        return 3
    return 10


def setup_telemetry(*, service_name: str) -> None:
    """Configure the OTLP exporters and automatic instrumentation (idempotent).

    Does nothing when telemetry has already been initialised or when
    ``settings.otel_enabled`` is false.

    Args:
        service_name: Value reported as the ``service.name`` resource attribute.
    """
    global _initialized, _otel_logging_handler
    global _tracer_provider, _meter_provider, _log_provider

    if _initialized or not settings.otel_enabled:
        return

    endpoint = settings.otel_exporter_otlp_endpoint.rstrip("/")
    insecure = otel_defaults.exporter_insecure
    timeout = _otlp_timeout_seconds()
    # Batch processors take milliseconds; fall back to 10 s if no timeout is set.
    export_timeout_ms = (timeout or 10) * 1000
    resource = _build_resource(service_name)

    # --- traces ---
    span_exporter = OTLPSpanExporter(
        endpoint=endpoint, insecure=insecure, timeout=timeout
    )
    _tracer_provider = TracerProvider(resource=resource, sampler=_build_sampler())
    _tracer_provider.add_span_processor(
        BatchSpanProcessor(span_exporter, export_timeout_millis=export_timeout_ms)
    )
    trace.set_tracer_provider(_tracer_provider)

    # --- metrics ---
    metric_exporter = OTLPMetricExporter(
        endpoint=endpoint, insecure=insecure, timeout=timeout
    )
    metric_reader = PeriodicExportingMetricReader(
        metric_exporter,
        export_interval_millis=otel_defaults.metric_export_interval_ms,
    )
    _meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
    metrics.set_meter_provider(_meter_provider)

    # --- logs ---
    log_exporter = OTLPLogExporter(endpoint=endpoint, insecure=insecure, timeout=timeout)
    _log_provider = LoggerProvider(resource=resource)
    _log_provider.add_log_record_processor(
        BatchLogRecordProcessor(log_exporter, export_timeout_millis=export_timeout_ms)
    )
    set_logger_provider(_log_provider)

    LoggingInstrumentor().instrument(set_logging_format=True)
    # Keep a reference to the handler so shutdown_telemetry() can detach it
    # from the root logger again.
    _otel_logging_handler = LoggingHandler(
        level=logging.NOTSET,
        logger_provider=_log_provider,
    )
    logging.getLogger().addHandler(_otel_logging_handler)

    # --- client-library instrumentation ---
    HTTPXClientInstrumentor().instrument()
    RedisInstrumentor().instrument()
    SQLAlchemyInstrumentor().instrument()

    _initialized = True


def shutdown_telemetry() -> None:
    """Stop the OTLP export threads and remove instrumentation.

    Intended to be called before a test process exits, on hot reload, or on
    Ctrl+C. Does nothing when ``setup_telemetry()`` has not completed. Every
    step is best-effort: a failure is logged at DEBUG level and the remaining
    steps still run.
    """
    global _initialized, _otel_logging_handler
    global _tracer_provider, _meter_provider, _log_provider

    if not _initialized:
        return

    # Raise the level of OpenTelemetry's own loggers to CRITICAL before
    # tearing things down. The levels are not restored afterwards.
    for name in (
        "opentelemetry",
        "opentelemetry.sdk",
        "opentelemetry.exporter",
        "opentelemetry.exporter.otlp",
    ):
        logging.getLogger(name).setLevel(logging.CRITICAL)

    # Detach the OTel handler first so nothing logged below is routed into
    # the log pipeline that is being shut down.
    if _otel_logging_handler is not None:
        logging.getLogger().removeHandler(_otel_logging_handler)
        _otel_logging_handler = None

    for instrumentor_cls in (
        FastAPIInstrumentor,
        LoggingInstrumentor,
        HTTPXClientInstrumentor,
        RedisInstrumentor,
        SQLAlchemyInstrumentor,
        CeleryInstrumentor,
    ):
        try:
            instrumentor_cls().uninstrument()
        except Exception:
            _logger.debug(
                "Failed to uninstrument %s during telemetry shutdown",
                instrumentor_cls.__name__,
                exc_info=True,
            )

    for provider in (_log_provider, _meter_provider, _tracer_provider):
        if provider is None:
            continue
        try:
            provider.shutdown()
        except Exception:
            _logger.debug(
                "Failed to shut down %s during telemetry shutdown",
                type(provider).__name__,
                exc_info=True,
            )

    _tracer_provider = None
    _meter_provider = None
    _log_provider = None
    _initialized = False


def instrument_fastapi_app(app: FastAPI) -> None:
    """Instrument *app* with OpenTelemetry, excluding the ``/health`` URL.

    Does nothing when ``settings.otel_enabled`` is false.
    """
    if not settings.otel_enabled:
        return
    FastAPIInstrumentor.instrument_app(
        app,
        excluded_urls="/health",
    )


def instrument_celery() -> None:
    """Enable Celery instrumentation unless ``settings.otel_enabled`` is false."""
    if not settings.otel_enabled:
        return
    CeleryInstrumentor().instrument()


def get_tracer(name: str) -> trace.Tracer:
    """Return a tracer for *name* from the global tracer provider."""
    return trace.get_tracer(name)


def get_meter(name: str) -> metrics.Meter:
    """Return a meter for *name* from the global meter provider."""
    return metrics.get_meter(name)


def current_trace_context() -> dict[str, str]:
    """Return the current span's ``trace_id`` / ``span_id`` as hex strings.

    Returns an empty dict when the current span context is not valid, i.e.
    there is no active span.
    """
    span = trace.get_current_span()
    ctx = span.get_span_context()
    if not ctx.is_valid:
        return {}
    return {
        "trace_id": format(ctx.trace_id, "032x"),
        "span_id": format(ctx.span_id, "016x"),
    }