refactor: 优化后端认证和对话功能

- 优化auth.py认证路由,增强功能
- 扩展conversations.py对话路由,添加新接口
This commit is contained in:
iammm0
2026-01-23 14:02:39 +08:00
parent 3690417fdc
commit 4a8f1a3b88
2 changed files with 330 additions and 17 deletions

View File

@@ -2,13 +2,22 @@
认证相关 API 路由:注册、登录、刷新令牌、登出
"""
import uuid
import io
import os
import logging
import traceback
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi import APIRouter, Body, Depends, File, HTTPException, UploadFile, status
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.responses import FileResponse
from pydantic import BaseModel, Field
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from PIL import Image
logger = logging.getLogger(__name__)
from database import get_async_db
from database.models import User, RefreshToken
@@ -138,8 +147,7 @@ async def register(
@router.post("/login", response_model=TokenResponse)
async def login(
request: LoginRequest = None,
form_data: OAuth2PasswordRequestForm = Depends(),
request: LoginRequest = Body(...),
db: AsyncSession = Depends(get_async_db)
):
"""
@@ -147,22 +155,11 @@ async def login(
验证手机号和密码,返回访问令牌和刷新令牌。
支持两种格式:
请求格式:
- JSON: {"phone": "13800138000", "password": "xxx"}
- 表单 (Swagger UI): username=13800138000&password=xxx
"""
# 优先使用表单数据Swagger UI否则使用 JSON
if form_data and form_data.username:
phone = form_data.username
password = form_data.password
elif request:
phone = request.phone
password = request.password
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="请提供手机号和密码"
)
phone = request.phone
password = request.password
# 验证手机号格式(简单验证)
if not phone or len(phone) != 11 or not phone.isdigit():
@@ -314,3 +311,157 @@ async def get_me(
subscription_type=current_user.subscription_type,
created_at=current_user.created_at.isoformat()
)
# 头像存储目录
AVATAR_DIR = Path("uploads/avatars")
AVATAR_DIR.mkdir(parents=True, exist_ok=True)
@router.post("/me/avatar", response_model=UserResponse)
async def upload_avatar(
file: UploadFile = File(...),
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_async_db)
):
"""
上传用户头像
支持格式JPEG, PNG, WebP
最大大小5MB
自动裁剪为正方形并压缩
"""
# 验证文件类型
allowed_types = ["image/jpeg", "image/png", "image/webp"]
logger.info(f"上传头像 - 文件名: {file.filename}, Content-Type: {file.content_type}, Size: {file.size if hasattr(file, 'size') else 'unknown'}")
if file.content_type not in allowed_types:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"不支持的文件类型。仅支持: {', '.join(allowed_types)}"
)
# 验证文件大小5MB
file_content = await file.read()
logger.info(f"读取文件内容 - 大小: {len(file_content)} bytes")
# 验证文件内容不为空
if not file_content or len(file_content) == 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="文件内容为空"
)
if len(file_content) > 5 * 1024 * 1024:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="文件大小超过5MB限制"
)
try:
# 确保目录存在
AVATAR_DIR.mkdir(parents=True, exist_ok=True)
# 创建BytesIO对象并确保位置在开头
image_bytes = io.BytesIO(file_content)
image_bytes.seek(0)
# 验证文件是否为有效的图片格式(通过检查文件头)
image_bytes.seek(0)
header = image_bytes.read(16) # 读取更多字节以确保能检测WebP
image_bytes.seek(0)
# 检查常见图片格式的文件头
is_valid_image = False
if header.startswith(b'\xff\xd8\xff'): # JPEG
is_valid_image = True
logger.info("检测到JPEG格式")
elif header.startswith(b'\x89PNG\r\n\x1a\n'): # PNG
is_valid_image = True
logger.info("检测到PNG格式")
elif header.startswith(b'RIFF') and b'WEBP' in header[:12]: # WebP (RIFF...WEBP)
is_valid_image = True
logger.info("检测到WebP格式")
else:
logger.warning(f"无法识别的文件头: {header[:12].hex()}")
if not is_valid_image:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"无效的图片文件格式。文件头: {header[:12].hex()}"
)
# 打开并处理图片
image = Image.open(image_bytes)
logger.info(f"成功打开图片 - 格式: {image.format}, 模式: {image.mode}, 尺寸: {image.size}")
# 转换为RGB模式处理RGBA等格式
if image.mode != "RGB":
image = image.convert("RGB")
# 裁剪为正方形(取中心部分)
width, height = image.size
size = min(width, height)
left = (width - size) // 2
top = (height - size) // 2
right = left + size
bottom = top + size
image = image.crop((left, top, right, bottom))
# 调整大小最大512x512
if size > 512:
image = image.resize((512, 512), Image.Resampling.LANCZOS)
# 生成文件名使用用户ID
file_extension = "jpg"
filename = f"{current_user.id}.{file_extension}"
file_path = AVATAR_DIR / filename
# 保存图片JPEG格式质量85%
image.save(file_path, "JPEG", quality=85, optimize=True)
# 生成URL相对路径前端需要拼接BASE_URL
avatar_url = f"/api/auth/avatars/{filename}"
# 更新数据库
current_user.avatar_url = avatar_url
await db.commit()
await db.refresh(current_user)
return UserResponse(
id=current_user.id,
phone=current_user.phone,
email=current_user.email,
nickname=current_user.nickname,
avatar_url=current_user.avatar_url,
subscription_type=current_user.subscription_type,
created_at=current_user.created_at.isoformat()
)
except Exception as e:
error_msg = f"处理图片失败: {str(e)}"
logger.error(f"头像上传失败: {error_msg}\n{traceback.format_exc()}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=error_msg
)
@router.get("/avatars/{filename}")
async def get_avatar(filename: str):
"""
获取用户头像
返回头像文件如果不存在则返回404
"""
file_path = AVATAR_DIR / filename
if not file_path.exists():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="头像不存在"
)
return FileResponse(
file_path,
media_type="image/jpeg"
)