Files
life-echo/api/services/auth_service.py
penghanyuan dbbb924625 feat: 添加Redis支持和Celery任务处理
- 新增Redis服务模块用于会话状态存储和缓存
- 集成Celery用于后台任务处理
- 更新Docker Compose配置以支持开发环境
- 优化API以支持异步调用和Redis会话存储
- 更新文档以反映新的开发环境配置和使用方法
2026-01-21 23:06:47 +01:00

117 lines
2.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
认证服务模块密码哈希、JWT令牌生成和验证
"""
import os
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict
import bcrypt
from jose import JWTError, jwt
# JWT配置
SECRET_KEY = os.getenv("SECRET_KEY", secrets.token_urlsafe(32))
ALGORITHM = os.getenv("ALGORITHM", "HS256")
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "120")) # 2小时
REFRESH_TOKEN_EXPIRE_DAYS = 30 # 30天
def hash_password(password: str) -> str:
"""
对密码进行哈希加密
Args:
password: 明文密码
Returns:
哈希后的密码
"""
# bcrypt 要求 bytes 输入
password_bytes = password.encode('utf-8')
salt = bcrypt.gensalt()
hashed = bcrypt.hashpw(password_bytes, salt)
return hashed.decode('utf-8')
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""
验证密码
Args:
plain_password: 明文密码
hashed_password: 哈希后的密码
Returns:
是否匹配
"""
try:
password_bytes = plain_password.encode('utf-8')
hashed_bytes = hashed_password.encode('utf-8')
return bcrypt.checkpw(password_bytes, hashed_bytes)
except Exception:
return False
def create_access_token(data: Dict, expires_delta: Optional[timedelta] = None) -> str:
"""
创建访问令牌JWT
Args:
data: 要编码到令牌中的数据通常包含user_id
expires_delta: 过期时间增量如果不提供则使用默认值2小时
Returns:
JWT令牌字符串
"""
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({
"exp": expire,
"type": "access"
})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def create_refresh_token() -> str:
"""
生成刷新令牌(随机字符串)
Returns:
随机生成的刷新令牌字符串
"""
return secrets.token_urlsafe(32)
def verify_token(token: str) -> Optional[Dict]:
"""
验证JWT令牌
Args:
token: JWT令牌字符串
Returns:
解码后的令牌数据如果无效则返回None
"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except JWTError:
return None
def get_token_expires_at() -> datetime:
"""
获取刷新令牌的过期时间30天后
Returns:
过期时间
"""
return datetime.now(timezone.utc) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)