- 回忆录:事实边界补充允许清单;传记文体示例与 JSON 叙事要求对齐 - default 职业提示 occupation_context;cadre/military 退休语境 - GET 章节读路径零写入,prepare_chapter_read_view + markdown_for_response - 文本归一抽到 core/text_normalize;移除弃用 reply 策略与 recompose_chapters_for_story - ConversationService:WS 连接/用户段落/结束对话;对外错误固定文案 - 测试:HTTP 脱敏契约、章节读视图、occupation 与 background_voice
76 lines
3.0 KiB
Python
76 lines
3.0 KiB
Python
"""HTTP 层对外错误文案脱敏契约(响应体不含内部异常串)。"""
|
|
|
|
from io import BytesIO
|
|
from unittest.mock import MagicMock
|
|
|
|
import pytest
|
|
from httpx import ASGITransport, AsyncClient
|
|
|
|
from app.core.dependencies import get_current_user
|
|
from app.features.auth.deps import get_auth_service
|
|
from app.features.auth.router import router as auth_router
|
|
from app.features.payment.deps import get_payment_order_service
|
|
from app.features.payment.router import router as payment_router
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_wechat_notify_returns_fixed_message_on_service_error() -> None:
|
|
from fastapi import FastAPI
|
|
|
|
class BoomOrderService:
|
|
async def handle_wechat_notify(self, *, headers: dict, body: str):
|
|
raise RuntimeError("wechat_sdk_secret_123")
|
|
|
|
app = FastAPI()
|
|
app.include_router(payment_router)
|
|
app.dependency_overrides[get_payment_order_service] = lambda: BoomOrderService()
|
|
|
|
transport = ASGITransport(app=app)
|
|
async with AsyncClient(transport=transport, base_url="http://test") as client:
|
|
r = await client.post("/api/payment/notify/wechat", content="raw")
|
|
assert r.status_code == 200
|
|
data = r.json()
|
|
assert data.get("code") == "FAIL"
|
|
assert data.get("message") == "处理失败"
|
|
assert "wechat_sdk" not in r.text
|
|
assert "secret" not in r.text.lower()
|
|
|
|
|
|
def _minimal_jpeg_bytes() -> bytes:
|
|
"""1x1 JPEG 最小合法文件。"""
|
|
return (
|
|
b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00"
|
|
b"\xff\xdb\x00C\x00\x08\x06\x06\x07\x06\x05\x08\x07\x07\x07\t\t\x08\n\x0c\x14\r\x0c\x0b\x0b\x0c\x19\x12\x13\x0f\x14\x1d\x1a\x1f\x1e\x1d\x1a\x1c\x1c $.' \",#\x1c\x1c(7),01444\x1f'9=82<.342\xff\xc0\x00\x11\x08\x00\x01\x00\x01\x01\x01\x11\x00\xff\xc4\x00\x14\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x08\xff\xc4\x00\x14\x10\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xda\x00\x0c\x03\x01\x00\x02\x11\x03\x11\x00\x3f\x00\xaa\xff\xd9"
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_avatar_upload_500_detail_sanitized(
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
) -> None:
|
|
from fastapi import FastAPI
|
|
|
|
fake_user = MagicMock()
|
|
fake_user.id = "user-contract-test"
|
|
|
|
class BoomAuth:
|
|
async def update_avatar_url(self, user_id: str, avatar_url: str):
|
|
raise RuntimeError("db_connection_secret_xyz")
|
|
|
|
app = FastAPI()
|
|
app.include_router(auth_router)
|
|
app.dependency_overrides[get_current_user] = lambda: fake_user
|
|
app.dependency_overrides[get_auth_service] = lambda: BoomAuth()
|
|
|
|
transport = ASGITransport(app=app)
|
|
files = {"file": ("a.jpg", BytesIO(_minimal_jpeg_bytes()), "image/jpeg")}
|
|
async with AsyncClient(transport=transport, base_url="http://test") as client:
|
|
r = await client.post("/api/auth/me/avatar", files=files)
|
|
|
|
assert r.status_code == 500
|
|
body = r.json()
|
|
detail = body.get("detail", "")
|
|
assert detail == "处理图片失败,请重试"
|
|
assert "secret" not in str(detail).lower()
|
|
assert "db_connection" not in r.text
|