Files
life-echo/api/app/core/errors.py
Sully 53e0065e3e refactor(api): TOML 配置 SSOT、统一错误契约、Auth/事务加固与可观测性 (#33)
配置 SSOT(TOML + .env)
统一错误契约
Auth 与事务边界
Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client
可观测性(OpenTelemetry + LGTM)
2026-05-22 13:44:50 +08:00

245 lines
7.7 KiB
Python

"""
全局异常体系。
只统一*错误*响应格式 {error_code, message, request_id}。
成功响应直接返回 Pydantic model / FileResponse / 原始结构,不强制包装。
"""
from fastapi import FastAPI, HTTPException, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from app.core.logging import get_logger
# Module-level logger, obtained from the project's logging helper with this
# module's ``__name__``; used by the exception handlers registered below.
logger = get_logger(__name__)
class AppError(Exception):
    """Root of the application's exception hierarchy.

    Each instance stores a human-readable ``message``, the HTTP
    ``status_code`` to answer with, and a machine-readable ``error_code``.
    ``status_code`` and ``error_code`` are keyword-only.
    """

    def __init__(
        self,
        message: str = "内部服务器错误",
        *,
        status_code: int = 500,
        error_code: str = "INTERNAL_ERROR",
    ):
        # Initialise the base Exception with the message so that ``str(exc)``
        # and ``exc.args`` reflect it.
        super().__init__(message)
        self.error_code = error_code
        self.status_code = status_code
        self.message = message
class NotFoundError(AppError):
    """HTTP 404 error with code ``NOT_FOUND`` (default message: resource does not exist)."""

    def __init__(self, message: str = "资源不存在"):
        super().__init__(message=message, error_code="NOT_FOUND", status_code=404)
class BadRequestError(AppError):
    """HTTP 400 error with code ``BAD_REQUEST`` (default message: invalid request)."""

    def __init__(self, message: str = "请求无效"):
        super().__init__(message=message, error_code="BAD_REQUEST", status_code=400)
class AuthenticationError(AppError):
    """HTTP 401 error with code ``AUTHENTICATION_FAILED`` (default message: authentication failed)."""

    def __init__(self, message: str = "认证失败"):
        super().__init__(
            message=message,
            error_code="AUTHENTICATION_FAILED",
            status_code=401,
        )
class AuthorizationError(AppError):
    """HTTP 403 error with code ``FORBIDDEN`` (default message: insufficient permissions)."""

    def __init__(self, message: str = "权限不足"):
        super().__init__(message=message, error_code="FORBIDDEN", status_code=403)
class ValidationError(AppError):
    """HTTP 422 error with code ``VALIDATION_ERROR`` (default message: invalid request parameters)."""

    def __init__(self, message: str = "请求参数无效"):
        super().__init__(message=message, error_code="VALIDATION_ERROR", status_code=422)
class ConflictError(AppError):
    """HTTP 409 error with code ``CONFLICT`` (default message: resource conflict)."""

    def __init__(self, message: str = "资源冲突"):
        super().__init__(message=message, error_code="CONFLICT", status_code=409)
class ServiceUnavailableError(AppError):
    """HTTP 503 error with code ``SERVICE_UNAVAILABLE`` (default message: service temporarily unavailable)."""

    def __init__(self, message: str = "服务暂时不可用"):
        super().__init__(
            message=message,
            error_code="SERVICE_UNAVAILABLE",
            status_code=503,
        )
class GatewayTimeoutError(AppError):
    """HTTP 504 error with code ``GATEWAY_TIMEOUT`` (default message: gateway timeout)."""

    def __init__(self, message: str = "网关超时"):
        super().__init__(message=message, error_code="GATEWAY_TIMEOUT", status_code=504)
class ProviderError(AppError):
    """HTTP 502 error with code ``PROVIDER_ERROR`` (default message: external service error).

    The keyword-only ``provider`` argument is stored on the instance as
    ``self.provider``; it defaults to an empty string.
    """

    def __init__(self, message: str = "外部服务异常", *, provider: str = ""):
        super().__init__(message=message, error_code="PROVIDER_ERROR", status_code=502)
        self.provider = provider
class QuotaExceededError(AppError):
    """HTTP 429 error with code ``QUOTA_EXCEEDED`` (default message: quota exhausted)."""

    def __init__(self, message: str = "配额已用尽"):
        super().__init__(message=message, error_code="QUOTA_EXCEEDED", status_code=429)
class RateLimitedError(AppError):
    """HTTP 429 error with code ``RATE_LIMITED`` (default message: requests too frequent)."""

    def __init__(self, message: str = "请求过于频繁"):
        super().__init__(message=message, error_code="RATE_LIMITED", status_code=429)
# ── Error response helpers ───────────────────────────────────
def _get_request_id(request: Request) -> str:
    """Return the request ID attached to *request*, or ``"-"`` when there is none.

    Lookup order:
      1. ``request.state.request_id``
      2. ``request.scope["state"]``, which may be a dict or an object exposing
         a ``request_id`` attribute.

    Args:
        request: The incoming request.

    Returns:
        The request ID as a string. A missing, ``None`` or empty value at
        either step counts as "not set" and yields the placeholder ``"-"``
        (never the literal strings ``"None"`` or ``""``).
    """
    rid = getattr(request.state, "request_id", None)
    if rid:
        return str(rid)

    # Fall back to the raw ASGI scope state.
    scope_state = request.scope.get("state")
    if isinstance(scope_state, dict):
        rid = scope_state.get("request_id")
    elif scope_state is not None:
        rid = getattr(scope_state, "request_id", None)

    # Apply the same truthiness rule as the first lookup so that a stored
    # None / "" is reported as the placeholder rather than stringified.
    return str(rid) if rid else "-"
def _error_response(
    *,
    status_code: int,
    error_code: str,
    message: str,
    request_id: str,
    headers: dict[str, str] | None = None,
) -> JSONResponse:
    """Build the unified JSON error response.

    The body always has exactly the keys ``error_code``, ``message`` and
    ``request_id``. ``status_code`` and the optional ``headers`` are handed to
    ``JSONResponse`` unchanged. All arguments are keyword-only.
    """
    payload = {
        "error_code": error_code,
        "message": message,
        "request_id": request_id,
    }
    return JSONResponse(content=payload, status_code=status_code, headers=headers)
def _http_detail_to_message(detail: object) -> str:
    """Flatten an exception ``detail`` payload into a single message string.

    Args:
        detail: One of
            * a string, returned unchanged;
            * a list of error items, each rendered and joined with ``"; "``;
            * any other object, converted with ``str``.

    Returns:
        The flattened message. An empty list yields the default ``"请求无效"``.

    Dict items in a list are rendered as ``"<loc>: <msg>"`` when they carry a
    truthy ``loc`` and as ``"<msg>"`` otherwise. A list/tuple ``loc`` is
    joined with dots; any other ``loc`` value is used whole. Non-dict items
    are stringified as-is.
    """
    if isinstance(detail, str):
        return detail
    if not isinstance(detail, list):
        return str(detail)

    parts: list[str] = []
    for item in detail:
        if not isinstance(item, dict):
            parts.append(str(item))
            continue

        msg = item.get("msg", "")
        loc = item.get("loc")
        if not loc:
            parts.append(str(msg))
        elif isinstance(loc, (list, tuple)):
            dotted = ".".join(str(segment) for segment in loc)
            parts.append(f"{dotted}: {msg}")
        else:
            # A scalar location must be used whole: iterating a string would
            # split it into characters ("body" -> "b.o.d.y"), and iterating a
            # non-iterable such as an int would raise TypeError while an error
            # response is already being built.
            parts.append(f"{loc}: {msg}")

    return "; ".join(parts) if parts else "请求无效"
# Lookup table from HTTP status code to the error_code string reported for it.
# Consulted by ``_error_code_for_status``; statuses not listed here (including
# 429, which that function special-cases) are resolved there instead.
_STATUS_TO_ERROR_CODE: dict[int, str] = {
400: "BAD_REQUEST",
401: "AUTHENTICATION_FAILED",
403: "FORBIDDEN",
404: "NOT_FOUND",
409: "CONFLICT",
422: "VALIDATION_ERROR",
502: "PROVIDER_ERROR",
503: "SERVICE_UNAVAILABLE",
504: "GATEWAY_TIMEOUT",
}
def _error_code_for_status(status_code: int) -> str:
    """Pick the error_code string to report for a bare HTTP status code.

    Resolution order:
      1. 429 always maps to ``"RATE_LIMITED"``.
      2. Statuses present in ``_STATUS_TO_ERROR_CODE`` use the table entry.
      3. Any other status >= 500 maps to ``"INTERNAL_ERROR"``.
      4. Everything else maps to ``"BAD_REQUEST"``.
    """
    if status_code == 429:
        # Both QuotaExceededError and RateLimitedError use HTTP 429; a plain
        # HTTPException carries no explicit code, so assume rate limiting.
        return "RATE_LIMITED"

    fallback = "INTERNAL_ERROR" if status_code >= 500 else "BAD_REQUEST"
    return _STATUS_TO_ERROR_CODE.get(status_code, fallback)
# ── Exception handler registration ──────────────────────────
def register_exception_handlers(app: FastAPI) -> None:
    """Register the global exception handlers on the FastAPI app.

    Every handler answers with the unified error body
    ``{error_code, message, request_id}`` built by ``_error_response``:

    * ``AppError``: uses the exception's own status, code and message.
    * HTTP exceptions (Starlette's ``HTTPException`` and subclasses, which
      include FastAPI's ``HTTPException``): the code is derived from the
      status via ``_error_code_for_status`` and the message from ``detail``.
    * ``RequestValidationError``: always 422 / ``VALIDATION_ERROR``.
    * any other ``Exception``: 500 / ``INTERNAL_ERROR`` with a fixed generic
      message; the traceback is logged and not included in the response.

    401 responses always carry a ``WWW-Authenticate`` header.

    Args:
        app: The application to register the handlers on.
    """
    # Local import keeps this function self-contained. Starlette ships with
    # FastAPI, and its HTTPException is the base class of FastAPI's. The
    # framework itself raises the Starlette class (e.g. 404 for an unknown
    # path, 405 for a disallowed method), so the handler must be registered
    # on the base class or those responses would bypass the unified format.
    from starlette.exceptions import HTTPException as StarletteHTTPException

    def _with_auth_challenge(
        status_code: int, headers: dict[str, str] | None
    ) -> dict[str, str] | None:
        """Return *headers*, adding a Bearer challenge when the status is 401."""
        if status_code != 401:
            return headers
        merged = dict(headers) if headers else {}
        # Header names are case-insensitive: do not add a second challenge if
        # the exception already supplied one under different casing.
        if not any(name.lower() == "www-authenticate" for name in merged):
            merged["WWW-Authenticate"] = "Bearer"
        return merged

    @app.exception_handler(AppError)
    async def app_error_handler(request: Request, exc: AppError) -> JSONResponse:
        request_id = _get_request_id(request)
        logger.warning(
            "AppError: error_code={} message={} request_id={}",
            exc.error_code,
            exc.message,
            request_id,
        )
        return _error_response(
            status_code=exc.status_code,
            error_code=exc.error_code,
            message=exc.message,
            request_id=request_id,
            headers=_with_auth_challenge(exc.status_code, None),
        )

    @app.exception_handler(StarletteHTTPException)
    async def http_exception_handler(
        request: Request, exc: StarletteHTTPException
    ) -> JSONResponse:
        request_id = _get_request_id(request)
        message = _http_detail_to_message(exc.detail)
        error_code = _error_code_for_status(exc.status_code)
        logger.warning(
            "HTTPException: status={} error_code={} message={} request_id={}",
            exc.status_code,
            error_code,
            message,
            request_id,
        )
        # Copy so the exception's own header mapping is never mutated.
        headers = dict(exc.headers) if exc.headers else None
        return _error_response(
            status_code=exc.status_code,
            error_code=error_code,
            message=message,
            request_id=request_id,
            headers=_with_auth_challenge(exc.status_code, headers),
        )

    @app.exception_handler(RequestValidationError)
    async def validation_error_handler(
        request: Request, exc: RequestValidationError
    ) -> JSONResponse:
        request_id = _get_request_id(request)
        message = _http_detail_to_message(exc.errors())
        logger.warning(
            "RequestValidationError: message={} request_id={}",
            message,
            request_id,
        )
        return _error_response(
            status_code=422,
            error_code="VALIDATION_ERROR",
            message=message,
            request_id=request_id,
        )

    @app.exception_handler(Exception)
    async def unhandled_error_handler(request: Request, exc: Exception) -> JSONResponse:
        request_id = _get_request_id(request)
        # logger.exception records the active traceback; the client only
        # receives the generic message below.
        logger.exception("Unhandled exception: request_id={}", request_id)
        return _error_response(
            status_code=500,
            error_code="INTERNAL_ERROR",
            message="服务器内部错误",
            request_id=request_id,
        )