feat: 新增短信验证码服务

- 新增sms_service.py短信验证服务
- 新增test_sms_verification.py短信验证测试
- 更新requirements.txt添加短信服务依赖
- 更新.env.production添加短信服务配置
This commit is contained in:
iammm0
2026-01-27 11:35:54 +08:00
parent 8f4a68f40f
commit 9c39df62bc
4 changed files with 707 additions and 1 deletions

265
api/services/sms_service.py Normal file
View File

@@ -0,0 +1,265 @@
"""
腾讯云短信服务
"""
import os
import random
import uuid
from datetime import timedelta
from typing import Optional, Tuple
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from tencentcloud.common import credential
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.sms.v20210111 import sms_client, models as sms_models
from database.models import SmsVerificationCode, utc_now
# 腾讯云短信配置
TENCENT_SMS_SECRET_ID = os.getenv("TENCENT_SMS_SECRET_ID", "")
TENCENT_SMS_SECRET_KEY = os.getenv("TENCENT_SMS_SECRET_KEY", "")
TENCENT_SMS_SDK_APP_ID = os.getenv("TENCENT_SMS_SDK_APP_ID", "")
TENCENT_SMS_SIGN_NAME = os.getenv("TENCENT_SMS_SIGN_NAME", "")
# 统一使用一个短信模板ID所有场景共用
TENCENT_SMS_TEMPLATE_ID = os.getenv("TENCENT_SMS_TEMPLATE_ID", "")
# 验证码配置
CODE_LENGTH = 6 # 验证码长度
CODE_EXPIRE_MINUTES = 5 # 验证码过期时间(分钟)
RATE_LIMIT_SECONDS = 60 # 发送频率限制(秒)
def generate_verification_code() -> str:
"""生成6位随机数字验证码"""
return ''.join([str(random.randint(0, 9)) for _ in range(CODE_LENGTH)])
def get_template_id_by_purpose(purpose: str) -> str:
"""获取短信模板ID所有场景共用同一个模板"""
return TENCENT_SMS_TEMPLATE_ID
def send_sms_via_tencent(phone: str, code: str, purpose: str) -> bool:
"""
通过腾讯云发送短信验证码
Args:
phone: 手机号
code: 验证码
purpose: 用途
Returns:
bool: 是否发送成功
"""
try:
# 创建认证对象
cred = credential.Credential(TENCENT_SMS_SECRET_ID, TENCENT_SMS_SECRET_KEY)
# 创建SMS客户端
client = sms_client.SmsClient(cred, "ap-guangzhou")
# 创建请求对象
req = sms_models.SendSmsRequest()
req.SmsSdkAppId = TENCENT_SMS_SDK_APP_ID
req.SignName = TENCENT_SMS_SIGN_NAME
req.TemplateId = get_template_id_by_purpose(purpose)
req.TemplateParamSet = [code, str(CODE_EXPIRE_MINUTES)] # 验证码和过期时间
req.PhoneNumberSet = [f"+86{phone}"]
# 发送短信
resp = client.SendSms(req)
# 检查发送结果
if resp.SendStatusSet and len(resp.SendStatusSet) > 0:
status = resp.SendStatusSet[0]
if status.Code == "Ok":
return True
else:
print(f"短信发送失败: {status.Code} - {status.Message}")
return False
return False
except TencentCloudSDKException as e:
print(f"腾讯云SDK异常: {e}")
return False
except Exception as e:
print(f"发送短信异常: {e}")
return False
async def check_rate_limit(db: AsyncSession, phone: str) -> Tuple[bool, int]:
"""
检查发送频率限制
Args:
db: 数据库会话
phone: 手机号
Returns:
Tuple[bool, int]: (是否允许发送, 剩余等待秒数)
"""
# 查询最近的验证码记录
stmt = select(SmsVerificationCode).where(
SmsVerificationCode.phone == phone
).order_by(
SmsVerificationCode.created_at.desc()
)
result = await db.execute(stmt)
recent_code = result.scalar_one_or_none()
if not recent_code:
return True, 0
# 计算距离上次发送的时间
now = utc_now()
time_diff = (now - recent_code.created_at).total_seconds()
if time_diff < RATE_LIMIT_SECONDS:
remaining = int(RATE_LIMIT_SECONDS - time_diff)
return False, remaining
return True, 0
async def send_verification_code(
db: AsyncSession,
phone: str,
purpose: str,
ip_address: Optional[str] = None
) -> Tuple[bool, str, int]:
"""
发送验证码
Args:
db: 数据库会话
phone: 手机号
purpose: 用途 (register/login/reset_password/change_phone)
ip_address: 请求IP地址
Returns:
Tuple[bool, str, int]: (是否成功, 消息, 过期时间秒数)
"""
# 检查频率限制
can_send, remaining = await check_rate_limit(db, phone)
if not can_send:
return False, f"发送过于频繁,请{remaining}秒后再试", 0
# 生成验证码
code = generate_verification_code()
# 发送短信
success = send_sms_via_tencent(phone, code, purpose)
if not success:
return False, "短信发送失败,请稍后重试", 0
# 保存到数据库
expires_at = utc_now() + timedelta(minutes=CODE_EXPIRE_MINUTES)
verification_code = SmsVerificationCode(
id=str(uuid.uuid4()),
phone=phone,
code=code,
purpose=purpose,
expires_at=expires_at,
ip_address=ip_address
)
db.add(verification_code)
await db.commit()
return True, "验证码已发送", CODE_EXPIRE_MINUTES * 60
async def verify_code(
db: AsyncSession,
phone: str,
code: str,
purpose: str
) -> Tuple[bool, str]:
"""
验证验证码
Args:
db: 数据库会话
phone: 手机号
code: 验证码
purpose: 用途
Returns:
Tuple[bool, str]: (是否验证成功, 消息)
"""
# 查询最新的未使用验证码
stmt = select(SmsVerificationCode).where(
SmsVerificationCode.phone == phone,
SmsVerificationCode.purpose == purpose,
SmsVerificationCode.is_used == False,
SmsVerificationCode.is_expired == False
).order_by(
SmsVerificationCode.created_at.desc()
)
result = await db.execute(stmt)
verification = result.scalar_one_or_none()
if not verification:
return False, "验证码不存在或已使用"
# 检查是否过期
now = utc_now()
if now > verification.expires_at:
verification.is_expired = True
await db.commit()
return False, "验证码已过期"
# 验证验证码
if verification.code != code:
return False, "验证码错误"
# 标记为已使用
verification.is_used = True
verification.verified_at = now
await db.commit()
return True, "验证成功"
async def cleanup_expired_codes(db: AsyncSession) -> int:
"""
清理过期的验证码
Args:
db: 数据库会话
Returns:
int: 清理的记录数
"""
from sqlalchemy import delete
now = utc_now()
# 标记过期的验证码
stmt = select(SmsVerificationCode).where(
SmsVerificationCode.expires_at < now,
SmsVerificationCode.is_expired == False
)
result = await db.execute(stmt)
expired_codes = result.scalars().all()
count = 0
for code in expired_codes:
code.is_expired = True
count += 1
await db.commit()
# 删除7天前的记录
seven_days_ago = now - timedelta(days=7)
delete_stmt = delete(SmsVerificationCode).where(
SmsVerificationCode.created_at < seven_days_ago
)
result = await db.execute(delete_stmt)
old_codes_count = result.rowcount
await db.commit()
return count + old_codes_count