"""传输层 `llm_http_openai_chat_errors` 的中性错讯;兼容 re-export 仍经 openai_compatible_errors。""" import httpx import pytest from openai import APIStatusError from app.core.openai_compatible_errors import ( extract_openai_http_status, format_openai_compatible_http_error_message, should_log_openai_error_as_warning, ) def _status_error(status: int, *, body: object | None = None) -> APIStatusError: req = httpx.Request("POST", "https://api.deepseek.com/v1/chat/completions") resp = httpx.Response(status, request=req, json=body if body is not None else {}) return APIStatusError("upstream", response=resp, body=body) def test_extract_status_from_api_status_error() -> None: e = _status_error(429) assert extract_openai_http_status(e) == 429 def test_format_402_balance_chinese_message() -> None: e = _status_error( 402, body={"error": {"message": "Insufficient balance", "type": "insufficient_quota"}}, ) msg = format_openai_compatible_http_error_message(e) assert msg is not None assert "402" in msg assert "余额" in msg def test_format_401_and_warning_flag() -> None: e = _status_error(401, body={"error": {"message": "invalid api key"}}) assert should_log_openai_error_as_warning(e) is True m = format_openai_compatible_http_error_message(e) assert m is not None assert "401" in m assert "密钥" in m def test_format_503_server_busy() -> None: e = _status_error(503) m = format_openai_compatible_http_error_message(e) assert m is not None assert "503" in m assert should_log_openai_error_as_warning(e) is False def test_format_httpx_http_status_error() -> None: req = httpx.Request("GET", "https://api.deepseek.com/v1/models") resp = httpx.Response(429, request=req) try: resp.raise_for_status() except httpx.HTTPStatusError as e: m = format_openai_compatible_http_error_message(e) assert m is not None assert "429" in m def test_unknown_status_418() -> None: e = _status_error(418) m = format_openai_compatible_http_error_message(e) assert m is not None assert "418" in m