""" 腾讯云短信服务 """ import os import random import uuid from datetime import timedelta from typing import Optional, Tuple from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from tencentcloud.common import credential from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException from tencentcloud.sms.v20210111 import sms_client, models as sms_models from database.models import SmsVerificationCode, utc_now # 腾讯云短信配置 TENCENT_SMS_SECRET_ID = os.getenv("TENCENT_SMS_SECRET_ID", "") TENCENT_SMS_SECRET_KEY = os.getenv("TENCENT_SMS_SECRET_KEY", "") TENCENT_SMS_SDK_APP_ID = os.getenv("TENCENT_SMS_SDK_APP_ID", "") TENCENT_SMS_SIGN_NAME = os.getenv("TENCENT_SMS_SIGN_NAME", "") # 统一使用一个短信模板ID(所有场景共用) TENCENT_SMS_TEMPLATE_ID = os.getenv("TENCENT_SMS_TEMPLATE_ID", "") # 验证码配置 CODE_LENGTH = 6 # 验证码长度 CODE_EXPIRE_MINUTES = 5 # 验证码过期时间(分钟) RATE_LIMIT_SECONDS = 60 # 发送频率限制(秒) def generate_verification_code() -> str: """生成6位随机数字验证码""" return ''.join([str(random.randint(0, 9)) for _ in range(CODE_LENGTH)]) def get_template_id_by_purpose(purpose: str) -> str: """获取短信模板ID(所有场景共用同一个模板)""" return TENCENT_SMS_TEMPLATE_ID def send_sms_via_tencent(phone: str, code: str, purpose: str) -> bool: """ 通过腾讯云发送短信验证码 Args: phone: 手机号 code: 验证码 purpose: 用途 Returns: bool: 是否发送成功 """ try: # 创建认证对象 cred = credential.Credential(TENCENT_SMS_SECRET_ID, TENCENT_SMS_SECRET_KEY) # 创建SMS客户端 client = sms_client.SmsClient(cred, "ap-guangzhou") # 创建请求对象 req = sms_models.SendSmsRequest() req.SmsSdkAppId = TENCENT_SMS_SDK_APP_ID req.SignName = TENCENT_SMS_SIGN_NAME req.TemplateId = get_template_id_by_purpose(purpose) req.TemplateParamSet = [code, str(CODE_EXPIRE_MINUTES)] # 验证码和过期时间 req.PhoneNumberSet = [f"+86{phone}"] # 发送短信 resp = client.SendSms(req) # 检查发送结果 if resp.SendStatusSet and len(resp.SendStatusSet) > 0: status = resp.SendStatusSet[0] if status.Code == "Ok": return True else: print(f"短信发送失败: {status.Code} - {status.Message}") return False return False except TencentCloudSDKException as e: print(f"腾讯云SDK异常: {e}") return False except Exception as e: print(f"发送短信异常: {e}") return False async def check_rate_limit(db: AsyncSession, phone: str) -> Tuple[bool, int]: """ 检查发送频率限制 Args: db: 数据库会话 phone: 手机号 Returns: Tuple[bool, int]: (是否允许发送, 剩余等待秒数) """ # 查询最近的验证码记录 stmt = select(SmsVerificationCode).where( SmsVerificationCode.phone == phone ).order_by( SmsVerificationCode.created_at.desc() ) result = await db.execute(stmt) recent_code = result.scalar_one_or_none() if not recent_code: return True, 0 # 计算距离上次发送的时间 now = utc_now() time_diff = (now - recent_code.created_at).total_seconds() if time_diff < RATE_LIMIT_SECONDS: remaining = int(RATE_LIMIT_SECONDS - time_diff) return False, remaining return True, 0 async def send_verification_code( db: AsyncSession, phone: str, purpose: str, ip_address: Optional[str] = None ) -> Tuple[bool, str, int]: """ 发送验证码 Args: db: 数据库会话 phone: 手机号 purpose: 用途 (register/login/reset_password/change_phone) ip_address: 请求IP地址 Returns: Tuple[bool, str, int]: (是否成功, 消息, 过期时间秒数) """ # 检查频率限制 can_send, remaining = await check_rate_limit(db, phone) if not can_send: return False, f"发送过于频繁,请{remaining}秒后再试", 0 # 生成验证码 code = generate_verification_code() # 发送短信 success = send_sms_via_tencent(phone, code, purpose) if not success: return False, "短信发送失败,请稍后重试", 0 # 保存到数据库 expires_at = utc_now() + timedelta(minutes=CODE_EXPIRE_MINUTES) verification_code = SmsVerificationCode( id=str(uuid.uuid4()), phone=phone, code=code, purpose=purpose, expires_at=expires_at, ip_address=ip_address ) db.add(verification_code) await db.commit() return True, "验证码已发送", CODE_EXPIRE_MINUTES * 60 async def verify_code( db: AsyncSession, phone: str, code: str, purpose: str ) -> Tuple[bool, str]: """ 验证验证码 Args: db: 数据库会话 phone: 手机号 code: 验证码 purpose: 用途 Returns: Tuple[bool, str]: (是否验证成功, 消息) """ # 查询最新的未使用验证码 stmt = select(SmsVerificationCode).where( SmsVerificationCode.phone == phone, SmsVerificationCode.purpose == purpose, SmsVerificationCode.is_used == False, SmsVerificationCode.is_expired == False ).order_by( SmsVerificationCode.created_at.desc() ) result = await db.execute(stmt) verification = result.scalar_one_or_none() if not verification: return False, "验证码不存在或已使用" # 检查是否过期 now = utc_now() if now > verification.expires_at: verification.is_expired = True await db.commit() return False, "验证码已过期" # 验证验证码 if verification.code != code: return False, "验证码错误" # 标记为已使用 verification.is_used = True verification.verified_at = now await db.commit() return True, "验证成功" async def cleanup_expired_codes(db: AsyncSession) -> int: """ 清理过期的验证码 Args: db: 数据库会话 Returns: int: 清理的记录数 """ from sqlalchemy import delete now = utc_now() # 标记过期的验证码 stmt = select(SmsVerificationCode).where( SmsVerificationCode.expires_at < now, SmsVerificationCode.is_expired == False ) result = await db.execute(stmt) expired_codes = result.scalars().all() count = 0 for code in expired_codes: code.is_expired = True count += 1 await db.commit() # 删除7天前的记录 seven_days_ago = now - timedelta(days=7) delete_stmt = delete(SmsVerificationCode).where( SmsVerificationCode.created_at < seven_days_ago ) result = await db.execute(delete_stmt) old_codes_count = result.rowcount await db.commit() return count + old_codes_count