feat: 添加用户认证功能

- 添加认证路由(注册、登录、刷新令牌、登出、获取用户信息)
- 添加认证服务(密码哈希、JWT令牌生成和验证)
- 添加认证中间件(获取当前用户)
- 支持手机号和密码登录
- 支持访问令牌和刷新令牌机制
This commit is contained in:
徐在坤
2026-01-18 15:57:40 +08:00
parent bf9f3cf363
commit 347fd43b35
9 changed files with 492 additions and 0 deletions

View File

@@ -0,0 +1,110 @@
"""
认证服务模块密码哈希、JWT令牌生成和验证
"""
import os
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict
from jose import JWTError, jwt
from passlib.context import CryptContext
# 密码加密上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# JWT配置
SECRET_KEY = os.getenv("SECRET_KEY", secrets.token_urlsafe(32))
ALGORITHM = os.getenv("ALGORITHM", "HS256")
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "120")) # 2小时
REFRESH_TOKEN_EXPIRE_DAYS = 30 # 30天
def hash_password(password: str) -> str:
"""
对密码进行哈希加密
Args:
password: 明文密码
Returns:
哈希后的密码
"""
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""
验证密码
Args:
plain_password: 明文密码
hashed_password: 哈希后的密码
Returns:
是否匹配
"""
return pwd_context.verify(plain_password, hashed_password)
def create_access_token(data: Dict, expires_delta: Optional[timedelta] = None) -> str:
"""
创建访问令牌JWT
Args:
data: 要编码到令牌中的数据通常包含user_id
expires_delta: 过期时间增量如果不提供则使用默认值2小时
Returns:
JWT令牌字符串
"""
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({
"exp": expire,
"type": "access"
})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def create_refresh_token() -> str:
"""
生成刷新令牌(随机字符串)
Returns:
随机生成的刷新令牌字符串
"""
return secrets.token_urlsafe(32)
def verify_token(token: str) -> Optional[Dict]:
"""
验证JWT令牌
Args:
token: JWT令牌字符串
Returns:
解码后的令牌数据如果无效则返回None
"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except JWTError:
return None
def get_token_expires_at() -> datetime:
"""
获取刷新令牌的过期时间30天后
Returns:
过期时间
"""
return datetime.now(timezone.utc) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)