Files
life-echo/api/middleware/auth.py
徐在坤 347fd43b35 feat: 添加用户认证功能
- 添加认证路由(注册、登录、刷新令牌、登出、获取用户信息)
- 添加认证服务(密码哈希、JWT令牌生成和验证)
- 添加认证中间件(获取当前用户)
- 支持手机号和密码登录
- 支持访问令牌和刷新令牌机制
2026-01-18 15:57:40 +08:00

84 lines
2.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
认证依赖从JWT令牌获取当前用户
"""
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_async_db
from database.models import User
from services.auth_service import verify_token
# OAuth2密码流配置
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_async_db)
) -> User:
"""
从JWT令牌获取当前用户
Args:
token: JWT访问令牌
db: 数据库会话
Returns:
当前用户对象
Raises:
HTTPException: 如果令牌无效或用户不存在
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无法验证凭据",
headers={"WWW-Authenticate": "Bearer"},
)
# 验证令牌
payload = verify_token(token)
if payload is None:
raise credentials_exception
# 获取用户ID
user_id: str = payload.get("sub")
if user_id is None:
raise credentials_exception
# 检查令牌类型
token_type = payload.get("type")
if token_type != "access":
raise credentials_exception
# 从数据库获取用户
user = await db.get(User, user_id)
if user is None:
raise credentials_exception
return user
async def get_optional_user(
token: Optional[str] = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_async_db)
) -> Optional[User]:
"""
可选用户(用于某些公开端点)
Args:
token: JWT访问令牌可选
db: 数据库会话
Returns:
用户对象如果提供了有效令牌否则返回None
"""
if token is None:
return None
try:
return await get_current_user(token, db)
except HTTPException:
return None