Files
life-echo/api/middleware/auth.py

84 lines
2.0 KiB
Python
Raw Normal View History

"""
认证依赖从JWT令牌获取当前用户
"""
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_async_db
from database.models import User
from services.auth_service import verify_token
# OAuth2密码流配置
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_async_db)
) -> User:
"""
从JWT令牌获取当前用户
Args:
token: JWT访问令牌
db: 数据库会话
Returns:
当前用户对象
Raises:
HTTPException: 如果令牌无效或用户不存在
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无法验证凭据",
headers={"WWW-Authenticate": "Bearer"},
)
# 验证令牌
payload = verify_token(token)
if payload is None:
raise credentials_exception
# 获取用户ID
user_id: str = payload.get("sub")
if user_id is None:
raise credentials_exception
# 检查令牌类型
token_type = payload.get("type")
if token_type != "access":
raise credentials_exception
# 从数据库获取用户
user = await db.get(User, user_id)
if user is None:
raise credentials_exception
return user
async def get_optional_user(
token: Optional[str] = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_async_db)
) -> Optional[User]:
"""
可选用户用于某些公开端点
Args:
token: JWT访问令牌可选
db: 数据库会话
Returns:
用户对象如果提供了有效令牌否则返回None
"""
if token is None:
return None
try:
return await get_current_user(token, db)
except HTTPException:
return None