配置 SSOT(TOML + .env) 统一错误契约 Auth 与事务边界 Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client 可观测性(OpenTelemetry + LGTM)
245 lines
7.7 KiB
Python
245 lines
7.7 KiB
Python
"""
|
|
全局异常体系。
|
|
|
|
只统一*错误*响应格式 {error_code, message, request_id}。
|
|
成功响应直接返回 Pydantic model / FileResponse / 原始结构,不强制包装。
|
|
"""
|
|
|
|
from fastapi import FastAPI, HTTPException, Request
|
|
from fastapi.exceptions import RequestValidationError
|
|
from fastapi.responses import JSONResponse
|
|
|
|
from app.core.logging import get_logger
|
|
|
|
# Module-level logger shared by the exception handlers registered in this module.
logger = get_logger(__name__)
|
|
|
|
|
|
class AppError(Exception):
    """Root of the application's exception hierarchy.

    Every instance carries three attributes:

    * ``message`` - human-readable description (also passed to ``Exception``).
    * ``status_code`` - HTTP status associated with the error.
    * ``error_code`` - stable machine-readable identifier.
    """

    def __init__(
        self,
        message: str = "内部服务器错误",
        *,
        status_code: int = 500,
        error_code: str = "INTERNAL_ERROR",
    ) -> None:
        # Initialise the base Exception first so ``args`` / ``str(exc)`` hold
        # the message, then attach the structured fields.
        super().__init__(message)
        self.error_code = error_code
        self.status_code = status_code
        self.message = message
|
|
|
|
|
|
class NotFoundError(AppError):
    """Application error with HTTP status 404 and error code ``NOT_FOUND``."""

    def __init__(self, message: str = "资源不存在") -> None:
        super().__init__(
            message,
            status_code=404,
            error_code="NOT_FOUND",
        )
|
|
|
|
|
|
class BadRequestError(AppError):
    """Application error with HTTP status 400 and error code ``BAD_REQUEST``."""

    def __init__(self, message: str = "请求无效") -> None:
        super().__init__(
            message,
            status_code=400,
            error_code="BAD_REQUEST",
        )
|
|
|
|
|
|
class AuthenticationError(AppError):
    """Application error with HTTP status 401 and error code ``AUTHENTICATION_FAILED``."""

    def __init__(self, message: str = "认证失败") -> None:
        super().__init__(
            message,
            status_code=401,
            error_code="AUTHENTICATION_FAILED",
        )
|
|
|
|
|
|
class AuthorizationError(AppError):
    """Application error with HTTP status 403 and error code ``FORBIDDEN``."""

    def __init__(self, message: str = "权限不足") -> None:
        super().__init__(
            message,
            status_code=403,
            error_code="FORBIDDEN",
        )
|
|
|
|
|
|
class ValidationError(AppError):
    """Application error with HTTP status 422 and error code ``VALIDATION_ERROR``."""

    def __init__(self, message: str = "请求参数无效") -> None:
        super().__init__(
            message,
            status_code=422,
            error_code="VALIDATION_ERROR",
        )
|
|
|
|
|
|
class ConflictError(AppError):
    """Application error with HTTP status 409 and error code ``CONFLICT``."""

    def __init__(self, message: str = "资源冲突") -> None:
        super().__init__(
            message,
            status_code=409,
            error_code="CONFLICT",
        )
|
|
|
|
|
|
class ServiceUnavailableError(AppError):
    """Application error with HTTP status 503 and error code ``SERVICE_UNAVAILABLE``."""

    def __init__(self, message: str = "服务暂时不可用") -> None:
        super().__init__(
            message,
            status_code=503,
            error_code="SERVICE_UNAVAILABLE",
        )
|
|
|
|
|
|
class GatewayTimeoutError(AppError):
    """Application error with HTTP status 504 and error code ``GATEWAY_TIMEOUT``."""

    def __init__(self, message: str = "网关超时") -> None:
        super().__init__(
            message,
            status_code=504,
            error_code="GATEWAY_TIMEOUT",
        )
|
|
|
|
|
|
class ProviderError(AppError):
    """Application error with HTTP status 502 and error code ``PROVIDER_ERROR``.

    The keyword-only ``provider`` argument is stored on the instance as
    ``provider``; this class does not add it to the message.
    """

    def __init__(
        self,
        message: str = "外部服务异常",
        *,
        provider: str = "",
    ) -> None:
        super().__init__(
            message,
            status_code=502,
            error_code="PROVIDER_ERROR",
        )
        self.provider = provider
|
|
|
|
|
|
class QuotaExceededError(AppError):
    """Application error with HTTP status 429 and error code ``QUOTA_EXCEEDED``."""

    def __init__(self, message: str = "配额已用尽") -> None:
        super().__init__(
            message,
            status_code=429,
            error_code="QUOTA_EXCEEDED",
        )
|
|
|
|
|
|
class RateLimitedError(AppError):
    """Application error with HTTP status 429 and error code ``RATE_LIMITED``."""

    def __init__(self, message: str = "请求过于频繁") -> None:
        super().__init__(
            message,
            status_code=429,
            error_code="RATE_LIMITED",
        )
|
|
|
|
|
|
# ── Error response helpers ───────────────────────────────────
|
|
|
|
|
|
def _get_request_id(request: Request) -> str:
    """Return the request ID attached to *request*, or ``"-"`` if there is none.

    Lookup order:

    1. ``request.state.request_id``
    2. ``request.scope["state"]``, read either as a mapping key or as an
       attribute, depending on what kind of object is stored there.

    A missing, ``None`` or otherwise falsy ID always yields the ``"-"``
    placeholder, on every lookup path.

    Args:
        request: The incoming request.

    Returns:
        The request ID as a string, or ``"-"`` when none is available.
    """
    rid = getattr(request.state, "request_id", None)
    if rid:
        return str(rid)

    scope_state = request.scope.get("state")
    if isinstance(scope_state, dict):
        rid = scope_state.get("request_id")
    elif scope_state is not None:
        rid = getattr(scope_state, "request_id", None)

    # A key that is present but None/empty must not leak out as "None" or "";
    # treat it exactly like the first branch treats a falsy state attribute.
    return str(rid) if rid else "-"
|
|
|
|
|
|
def _error_response(
    *,
    status_code: int,
    error_code: str,
    message: str,
    request_id: str,
    headers: dict[str, str] | None = None,
) -> JSONResponse:
    """Build the uniform JSON error response.

    Args:
        status_code: HTTP status of the response.
        error_code: Machine-readable error identifier placed in the body.
        message: Human-readable description placed in the body.
        request_id: Request identifier placed in the body.
        headers: Optional extra response headers.

    Returns:
        A ``JSONResponse`` whose body is
        ``{"error_code": ..., "message": ..., "request_id": ...}``.
    """
    payload = {
        "error_code": error_code,
        "message": message,
        "request_id": request_id,
    }
    return JSONResponse(content=payload, status_code=status_code, headers=headers)
|
|
|
|
|
|
def _format_detail_item(item: object) -> str:
    """Render one entry of an error-detail sequence as ``"<loc>: <msg>"`` or ``"<msg>"``."""
    if not isinstance(item, dict):
        return str(item)

    msg = item.get("msg", "")
    loc = item.get("loc")
    if not loc:
        return str(msg)

    # ``loc`` is normally a list/tuple of path segments. A bare string (or any
    # other scalar) is used as-is so it is neither split into characters nor
    # able to raise while an error response is being built.
    if isinstance(loc, (list, tuple)):
        path = ".".join(str(segment) for segment in loc)
    else:
        path = str(loc)
    return f"{path}: {msg}"


def _http_detail_to_message(detail: object) -> str:
    """Flatten an exception ``detail`` payload into a single message string.

    Args:
        detail: A plain string, a list/tuple of error items (dicts with
            optional ``"loc"`` and ``"msg"`` keys, or arbitrary objects), or
            any other object.

    Returns:
        * the string itself when *detail* is a ``str``;
        * the items joined with ``"; "`` when *detail* is a list or tuple
          (``"请求无效"`` if the sequence is empty);
        * ``str(detail)`` for anything else.
    """
    if isinstance(detail, str):
        return detail
    if not isinstance(detail, (list, tuple)):
        return str(detail)

    parts = [_format_detail_item(item) for item in detail]
    return "; ".join(parts) if parts else "请求无效"
|
|
|
|
|
|
# Error codes for exceptions that carry only an HTTP status code.
# 429 is not listed here: `_error_code_for_status` handles it explicitly
# before consulting this table, and also supplies the fallback for any
# status that has no entry.
_STATUS_TO_ERROR_CODE: dict[int, str] = {
    400: "BAD_REQUEST",
    401: "AUTHENTICATION_FAILED",
    403: "FORBIDDEN",
    404: "NOT_FOUND",
    409: "CONFLICT",
    422: "VALIDATION_ERROR",
    502: "PROVIDER_ERROR",
    503: "SERVICE_UNAVAILABLE",
    504: "GATEWAY_TIMEOUT",
}
|
|
|
|
|
|
def _error_code_for_status(status_code: int) -> str:
    """Map a bare HTTP status code to an error code string.

    Resolution order: the special case for 429, then the
    ``_STATUS_TO_ERROR_CODE`` table, then ``"INTERNAL_ERROR"`` for any other
    status >= 500 and ``"BAD_REQUEST"`` for everything else.
    """
    # HTTP 429 is shared by QuotaExceededError and RateLimitedError; legacy
    # HTTPException has no explicit code — default to rate limiting.
    if status_code == 429:
        return "RATE_LIMITED"

    fallback = "INTERNAL_ERROR" if status_code >= 500 else "BAD_REQUEST"
    return _STATUS_TO_ERROR_CODE.get(status_code, fallback)
|
|
|
|
|
|
# ── Exception handler registration ──────────────────────────
|
|
|
|
|
|
def register_exception_handlers(app: FastAPI) -> None:
    """Register global exception handlers on the FastAPI app.

    Four handlers are installed; each one returns the uniform
    ``{error_code, message, request_id}`` body built by ``_error_response``:

    * ``AppError`` - uses the status, code and message carried by the exception.
    * HTTP exceptions - registered on Starlette's ``HTTPException`` base class
      so that exceptions raised by the framework itself (e.g. 404 for an
      unknown route, 405 for a disallowed method) are covered as well as
      ``fastapi.HTTPException``, which subclasses it.
    * ``RequestValidationError`` - always answered with 422 / ``VALIDATION_ERROR``.
    * ``Exception`` - catch-all answered with 500 / ``INTERNAL_ERROR``; the
      traceback is logged and a fixed generic message is returned.

    401 responses always carry a ``WWW-Authenticate`` header (``Bearer`` unless
    the exception supplied its own value).

    Args:
        app: The application to install the handlers on.
    """
    # Local import: the file only imports FastAPI's subclass, but the framework
    # raises the Starlette base class directly for routing errors.
    from starlette.exceptions import HTTPException as StarletteHTTPException

    @app.exception_handler(AppError)
    async def app_error_handler(request: Request, exc: AppError) -> JSONResponse:
        request_id = _get_request_id(request)
        headers: dict[str, str] | None = None
        if exc.status_code == 401:
            headers = {"WWW-Authenticate": "Bearer"}
        logger.warning(
            "AppError: error_code={} message={} request_id={}",
            exc.error_code,
            exc.message,
            request_id,
        )
        return _error_response(
            status_code=exc.status_code,
            error_code=exc.error_code,
            message=exc.message,
            request_id=request_id,
            headers=headers,
        )

    @app.exception_handler(StarletteHTTPException)
    async def http_exception_handler(
        request: Request, exc: StarletteHTTPException
    ) -> JSONResponse:
        request_id = _get_request_id(request)
        message = _http_detail_to_message(exc.detail)
        error_code = _error_code_for_status(exc.status_code)
        logger.warning(
            "HTTPException: status={} error_code={} message={} request_id={}",
            exc.status_code,
            error_code,
            message,
            request_id,
        )
        # Copy so the exception's own header mapping is never mutated.
        exc_headers = getattr(exc, "headers", None)
        headers: dict[str, str] | None = dict(exc_headers) if exc_headers else None
        if exc.status_code == 401:
            headers = headers or {}
            # Keep a challenge supplied by the raiser; default to Bearer.
            headers.setdefault("WWW-Authenticate", "Bearer")
        return _error_response(
            status_code=exc.status_code,
            error_code=error_code,
            message=message,
            request_id=request_id,
            headers=headers,
        )

    @app.exception_handler(RequestValidationError)
    async def validation_error_handler(
        request: Request, exc: RequestValidationError
    ) -> JSONResponse:
        request_id = _get_request_id(request)
        message = _http_detail_to_message(exc.errors())
        logger.warning(
            "RequestValidationError: message={} request_id={}",
            message,
            request_id,
        )
        return _error_response(
            status_code=422,
            error_code="VALIDATION_ERROR",
            message=message,
            request_id=request_id,
        )

    @app.exception_handler(Exception)
    async def unhandled_error_handler(request: Request, exc: Exception) -> JSONResponse:
        request_id = _get_request_id(request)
        # Log the traceback, but return only a fixed generic message.
        logger.exception("Unhandled exception: request_id={}", request_id)
        return _error_response(
            status_code=500,
            error_code="INTERNAL_ERROR",
            message="服务器内部错误",
            request_id=request_id,
        )
|