配置 SSOT(TOML + .env);统一错误契约;Auth 与事务边界;Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分,连接池、sync client;可观测性(OpenTelemetry + LGTM)
220 lines
7.0 KiB
Python
220 lines
7.0 KiB
Python
"""
|
||
OpenTelemetry 初始化:traces / metrics / logs 导出至 OTLP Collector。
|
||
|
||
在 ``setup_logging()`` 之后、FastAPI / Celery 应用创建前调用 ``setup_telemetry(service_name=...)``。
|
||
``OTEL_ENABLED=false`` 时无操作,便于测试与无 Collector 环境。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
from typing import TYPE_CHECKING
|
||
|
||
from opentelemetry import metrics, trace
|
||
from opentelemetry._logs import set_logger_provider
|
||
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
|
||
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
|
||
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
|
||
from opentelemetry.instrumentation.celery import CeleryInstrumentor
|
||
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
|
||
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
|
||
from opentelemetry.instrumentation.logging import LoggingInstrumentor
|
||
from opentelemetry.instrumentation.redis import RedisInstrumentor
|
||
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
|
||
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
|
||
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
|
||
from opentelemetry.sdk.metrics import MeterProvider
|
||
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
|
||
from opentelemetry.sdk.resources import Resource
|
||
from opentelemetry.sdk.trace import TracerProvider
|
||
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
||
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio
|
||
|
||
from app.core.config import settings
|
||
from app.core.runtime_constants import otel_defaults
|
||
|
||
if TYPE_CHECKING:
|
||
from fastapi import FastAPI
|
||
|
||
# Process-wide telemetry state: populated by setup_telemetry() and reset by
# shutdown_telemetry().
_initialized = False  # set once setup_telemetry() has completed; guards re-entry
_tracer_provider: TracerProvider | None = None
_meter_provider: MeterProvider | None = None
_log_provider: LoggerProvider | None = None
# Root-logger handler that bridges stdlib logging into the OTLP log pipeline;
# kept so shutdown_telemetry() can detach it again.
_otel_logging_handler: LoggingHandler | None = None
|
||
|
||
|
||
def _build_resource(service_name: str, *, service_version: str = "0.2.0") -> Resource:
    """Build the OpenTelemetry Resource shared by traces, metrics and logs.

    Args:
        service_name: Value of the ``service.name`` attribute.
        service_version: Value of the ``service.version`` attribute. It is keyword-only
            and defaults to ``"0.2.0"``, so existing callers are unaffected.

    Returns:
        The ``Resource`` produced by ``Resource.create`` for these attributes plus
        ``deployment.environment`` taken from ``settings.app_environment``.
    """
    attributes = {
        "service.name": service_name,
        "deployment.environment": settings.app_environment,
        "service.version": service_version,
    }
    return Resource.create(attributes)
|
||
|
||
|
||
def _build_sampler():
    """Build the trace sampler configured for the current ``app_environment``.

    The sampler name comes from ``otel_defaults.traces_sampler``. It defaults to
    ``always_on`` when that returns a falsy value, and is matched case-insensitively
    after stripping whitespace. The names follow the ``OTEL_TRACES_SAMPLER`` values
    of the OpenTelemetry specification:

    * ``always_on`` / ``alwayson``    -> ``ALWAYS_ON``
    * ``always_off`` / ``alwaysoff``  -> ``ALWAYS_OFF``
    * ``parentbased_always_on``       -> ``ParentBased(ALWAYS_ON)``
    * ``parentbased_always_off``      -> ``ParentBased(ALWAYS_OFF)``
    * ``traceidratio``                -> ``TraceIdRatioBased(ratio)``
    * anything else (e.g. ``parentbased_traceidratio``)
                                      -> ``ParentBasedTraceIdRatio(ratio)``

    ``ratio`` is ``otel_defaults.traces_sampler_arg(...)`` coerced with ``float()``,
    so numeric strings are accepted. It falls back to ``0.1`` when that returns
    ``None``. The ratio is only parsed for the ratio-based samplers.

    Raises:
        ValueError / TypeError: if the configured ratio cannot be used by the
            ratio-based sampler.
    """
    from opentelemetry.sdk.trace.sampling import (
        ALWAYS_OFF,
        ALWAYS_ON,
        ParentBased,
        TraceIdRatioBased,
    )

    environment = settings.app_environment
    name = (otel_defaults.traces_sampler(environment) or "always_on").strip().lower()
    arg = otel_defaults.traces_sampler_arg(environment)

    if name in ("always_on", "alwayson"):
        return ALWAYS_ON
    if name in ("always_off", "alwaysoff"):
        return ALWAYS_OFF
    # The parent-based fixed samplers take no ratio. Handle them explicitly so they are
    # not routed to ParentBasedTraceIdRatio, which would sample only a fraction of roots.
    if name == "parentbased_always_on":
        return ParentBased(ALWAYS_ON)
    if name == "parentbased_always_off":
        return ParentBased(ALWAYS_OFF)

    ratio = 0.1 if arg is None else float(arg)
    if name == "traceidratio":
        return TraceIdRatioBased(ratio)
    return ParentBasedTraceIdRatio(ratio)
|
||
|
||
|
||
def _otlp_timeout_seconds() -> int:
    """Return the OTLP exporter timeout, in seconds, for the current environment.

    ``development`` (case-insensitive, surrounding whitespace ignored) gets 3 s.
    Every other environment, including an unset one, gets 10 s. The shorter
    development value is presumably there so a missing local collector does not
    stall the app for long.

    The result is always an ``int`` and never ``None``.
    """
    environment = (settings.app_environment or "").strip().lower()
    return 3 if environment == "development" else 10
|
||
|
||
|
||
def _is_exportable_log_record(record: logging.LogRecord) -> bool:
    """Return False for records emitted by OpenTelemetry's own loggers.

    Installed as a filter on the root-logger OTLP handler. Diagnostics from the
    SDK/exporters (e.g. export failures while the collector is unreachable) are
    therefore not queued back into the pipeline that is reporting them. They still
    reach any other handlers attached to the root logger.
    """
    name = record.name
    return not (name == "opentelemetry" or name.startswith("opentelemetry."))


def _setup_tracing(
    resource: Resource, exporter_kwargs: dict, export_timeout_ms: int
) -> TracerProvider:
    """Create the batched OTLP trace pipeline and install it as the global TracerProvider."""
    provider = TracerProvider(resource=resource, sampler=_build_sampler())
    provider.add_span_processor(
        BatchSpanProcessor(
            OTLPSpanExporter(**exporter_kwargs),
            export_timeout_millis=export_timeout_ms,
        )
    )
    trace.set_tracer_provider(provider)
    return provider


def _setup_metrics(resource: Resource, exporter_kwargs: dict) -> MeterProvider:
    """Create the periodic OTLP metric pipeline and install it as the global MeterProvider."""
    reader = PeriodicExportingMetricReader(
        OTLPMetricExporter(**exporter_kwargs),
        export_interval_millis=otel_defaults.metric_export_interval_ms,
    )
    provider = MeterProvider(resource=resource, metric_readers=[reader])
    metrics.set_meter_provider(provider)
    return provider


def _setup_logs(
    resource: Resource, exporter_kwargs: dict, export_timeout_ms: int
) -> tuple[LoggerProvider, LoggingHandler]:
    """Create the OTLP log pipeline and bridge the stdlib root logger into it."""
    provider = LoggerProvider(resource=resource)
    provider.add_log_record_processor(
        BatchLogRecordProcessor(
            OTLPLogExporter(**exporter_kwargs),
            export_timeout_millis=export_timeout_ms,
        )
    )
    set_logger_provider(provider)

    LoggingInstrumentor().instrument(set_logging_format=True)

    # NOTSET: the handler itself does no level filtering; the logger levels decide.
    handler = LoggingHandler(level=logging.NOTSET, logger_provider=provider)
    handler.addFilter(_is_exportable_log_record)
    logging.getLogger().addHandler(handler)
    return provider, handler


def _instrument_client_libraries() -> None:
    """Enable auto-instrumentation for HTTPX, Redis and SQLAlchemy clients."""
    HTTPXClientInstrumentor().instrument()
    RedisInstrumentor().instrument()
    SQLAlchemyInstrumentor().instrument()


def setup_telemetry(*, service_name: str) -> None:
    """Configure OTLP exporters and automatic instrumentation (idempotent).

    Call after ``setup_logging()`` and before the FastAPI / Celery app is created.
    It is a no-op when ``settings.otel_enabled`` is false or when telemetry is
    already initialized.

    Args:
        service_name: Reported as the ``service.name`` resource attribute.
    """
    global _initialized, _otel_logging_handler
    global _tracer_provider, _meter_provider, _log_provider

    if _initialized or not settings.otel_enabled:
        return

    endpoint = settings.otel_exporter_otlp_endpoint.rstrip("/")
    insecure = otel_defaults.exporter_insecure
    timeout = _otlp_timeout_seconds()
    # Batch processors take milliseconds; fall back to 10 s if no timeout is configured.
    export_timeout_ms = (timeout or 10) * 1000
    # The trace, metric and log exporters all talk to the same collector.
    exporter_kwargs = {"endpoint": endpoint, "insecure": insecure, "timeout": timeout}

    resource = _build_resource(service_name)

    _tracer_provider = _setup_tracing(resource, exporter_kwargs, export_timeout_ms)
    _meter_provider = _setup_metrics(resource, exporter_kwargs)
    _log_provider, _otel_logging_handler = _setup_logs(
        resource, exporter_kwargs, export_timeout_ms
    )
    _instrument_client_libraries()

    _initialized = True
|
||
|
||
|
||
def shutdown_telemetry() -> None:
    """Stop the OTLP export threads and uninstall instrumentation.

    Call before a test process exits, on hot reload, or before Ctrl+C.
    It is a no-op unless telemetry is currently initialized.

    Every teardown step is best-effort. A failing step is logged at DEBUG level
    (with traceback) and the remaining steps still run, so this function does not
    raise because of a single misbehaving component.
    """
    global _initialized, _otel_logging_handler
    global _tracer_provider, _meter_provider, _log_provider

    if not _initialized:
        return

    log = logging.getLogger(__name__)

    def best_effort(action, description: str) -> None:
        # Teardown must continue past individual failures; keep them visible at DEBUG.
        try:
            action()
        except Exception:
            log.debug("Telemetry shutdown step failed: %s", description, exc_info=True)

    # Raise OpenTelemetry's own loggers to CRITICAL before teardown, presumably to hide
    # noise (e.g. export errors) produced while components are being shut down.
    # NOTE: these levels are not restored afterwards.
    for logger_name in (
        "opentelemetry",
        "opentelemetry.sdk",
        "opentelemetry.exporter",
        "opentelemetry.exporter.otlp",
    ):
        logging.getLogger(logger_name).setLevel(logging.CRITICAL)

    # Detach the OTLP bridge from the root logger first, so nothing new is queued into
    # the log pipeline while it is being shut down.
    handler, _otel_logging_handler = _otel_logging_handler, None
    if handler is not None:
        logging.getLogger().removeHandler(handler)

    # NOTE(review): instrument_fastapi_app() uses FastAPIInstrumentor.instrument_app();
    # per-app instrumentation may require FastAPIInstrumentor.uninstrument_app(app) to
    # be undone, so the generic uninstrument() below may not fully detach it. Confirm.
    for instrumentor_cls in (
        FastAPIInstrumentor,
        LoggingInstrumentor,
        HTTPXClientInstrumentor,
        RedisInstrumentor,
        SQLAlchemyInstrumentor,
        CeleryInstrumentor,
    ):
        best_effort(
            lambda cls=instrumentor_cls: cls().uninstrument(),
            f"{instrumentor_cls.__name__}.uninstrument",
        )

    # Shut the providers down (logs, then metrics, then traces) to stop their exporters.
    for provider in (_log_provider, _meter_provider, _tracer_provider):
        if provider is not None:
            best_effort(provider.shutdown, f"{type(provider).__name__}.shutdown")

    _tracer_provider = None
    _meter_provider = None
    _log_provider = None
    _initialized = False
|
||
|
||
|
||
def instrument_fastapi_app(app: FastAPI, *, excluded_urls: str = "/health") -> None:
    """Attach OpenTelemetry server instrumentation to a FastAPI application.

    It is a no-op when ``settings.otel_enabled`` is false.

    Args:
        app: The FastAPI application to instrument.
        excluded_urls: Comma-separated URL patterns that should produce no spans. The
            value is forwarded unchanged to ``FastAPIInstrumentor.instrument_app``.
            It is keyword-only and defaults to ``"/health"`` (presumably the
            health-check endpoint).
    """
    if not settings.otel_enabled:
        return
    # NOTE(review): per the OTel instrumentation docs these patterns are regexes searched
    # anywhere in the URL, so "/health" also matches e.g. "/healthz"; anchor if needed.
    FastAPIInstrumentor.instrument_app(app, excluded_urls=excluded_urls)
|
||
|
||
|
||
def instrument_celery() -> None:
    """Enable Celery instrumentation when OpenTelemetry is enabled; otherwise do nothing."""
    if settings.otel_enabled:
        CeleryInstrumentor().instrument()
|
||
|
||
|
||
def get_tracer(name: str):
    """Return a tracer named *name* from the globally registered TracerProvider."""
    tracer = trace.get_tracer(name)
    return tracer
|
||
|
||
|
||
def get_meter(name: str):
    """Return a meter named *name* from the globally registered MeterProvider."""
    meter = metrics.get_meter(name)
    return meter
|
||
|
||
|
||
def current_trace_context() -> dict[str, str]:
    """Return the current span's ids as zero-padded lowercase hex strings.

    Returns:
        ``{"trace_id": <32 hex chars>, "span_id": <16 hex chars>}`` when the current
        span context is valid, otherwise an empty dict.
    """
    span_context = trace.get_current_span().get_span_context()
    if span_context.is_valid:
        return {
            "trace_id": f"{span_context.trace_id:032x}",
            "span_id": f"{span_context.span_id:016x}",
        }
    return {}
|