""" 支付相关 API 路由 处理订单创建、支付回调、订单状态查询 """ import asyncio import uuid import logging from datetime import timedelta from typing import List from fastapi import APIRouter, Depends, Request, HTTPException from fastapi.responses import PlainTextResponse from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from middleware.auth import get_current_user from database.models import User, Order, utc_now from database import get_async_db from payment import PaymentService, PaymentConfig from payment.schemas import ( CreateOrderRequest, CreateOrderResponse, OrderStatusResponse, OrderListResponse, ) from payment.exceptions import PaymentError from routers.plans import get_plan_by_type, AVAILABLE_PLANS logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/payment", tags=["payment"]) # 初始化支付服务(延迟加载,避免在导入时就读取环境变量) _payment_service = None def get_payment_service() -> PaymentService: """获取支付服务单例""" global _payment_service if _payment_service is None: config = PaymentConfig.from_env() _payment_service = PaymentService(config) return _payment_service # 订单超时时间(分钟) ORDER_EXPIRE_MINUTES = 30 # 订阅时长映射(天数) SUBSCRIPTION_DURATION_DAYS = { "pro": 365, # Pro 版一年 "pro_plus": 365, # Pro+ 版一年 "premium": 365, # 兼容旧高级版 } def generate_order_no() -> str: """生成唯一订单号""" import time timestamp = time.strftime("%Y%m%d%H%M%S") short_uuid = uuid.uuid4().hex[:8].upper() return f"LE{timestamp}{short_uuid}" @router.post("/create-order", response_model=CreateOrderResponse) async def create_order( request: CreateOrderRequest, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_async_db), ): """ 创建支付订单并获取支付参数 - plan_id: 套餐 ID(如 premium) - payment_method: 支付方式(wechat / alipay) """ # 验证套餐 plan = None for p in AVAILABLE_PLANS: if p.id == request.plan_id: plan = p break if plan is None: raise HTTPException(status_code=400, detail="无效的套餐 ID") if plan.price <= 0: raise HTTPException(status_code=400, detail="免费套餐无需支付") # 验证支付方式 if request.payment_method not in ("wechat", "alipay"): raise HTTPException(status_code=400, detail="不支持的支付方式,仅支持 wechat / alipay") payment_service = get_payment_service() # 检查支付方式是否可用 if not payment_service.is_method_available(request.payment_method): if request.payment_method == "alipay": raise HTTPException( status_code=503, detail="支付宝支付接口正在开发中,暂时不可用" ) raise HTTPException( status_code=503, detail=f"{request.payment_method} 支付暂不可用,请选择其他支付方式" ) # 金额转为分 amount_fen = int(plan.price * 100) order_no = generate_order_no() now = utc_now() # 创建数据库订单记录 order = Order( id=order_no, user_id=current_user.id, plan_id=plan.id, plan_name=plan.display_name, amount=amount_fen, currency=plan.currency, payment_method=request.payment_method, status="pending", created_at=now, expired_at=now + timedelta(minutes=ORDER_EXPIRE_MINUTES), ) db.add(order) await db.flush() # 调用支付服务创建预支付订单(同步的微信/支付宝 SDK 会发 HTTP,放到线程池避免阻塞事件循环导致超时) try: payment_result = await asyncio.to_thread( payment_service.create_payment, request.payment_method, order_no, amount_fen, f"往事拾遗 - {plan.display_name}", ) except PaymentError as e: # 支付下单失败,更新订单状态 order.status = "failed" await db.flush() logger.error(f"创建支付订单失败: {e.message}") raise HTTPException(status_code=500, detail=f"创建支付订单失败: {e.message}") logger.info(f"订单创建成功: {order_no}, 方式: {request.payment_method}, 金额: {amount_fen}分") return CreateOrderResponse( order_id=order_no, payment_method=request.payment_method, wechat_params=payment_result.wechat_params, alipay_order_string=payment_result.alipay_order_string, ) @router.post("/notify/wechat") async def wechat_notify( request: Request, db: AsyncSession = Depends(get_async_db), ): """ 微信支付异步回调通知 此接口无需认证,由微信支付服务器调用 """ payment_service = get_payment_service() try: # 获取请求头和请求体 headers = dict(request.headers) body = await request.body() body_str = body.decode("utf-8") # 验签并解析通知 notify_result = payment_service.handle_wechat_notify( headers=headers, body=body_str ) if notify_result.success and notify_result.trade_status == "SUCCESS": await _handle_payment_success( db=db, out_trade_no=notify_result.out_trade_no, trade_no=notify_result.trade_no, ) # 微信要求返回 JSON 格式的成功响应 return {"code": "SUCCESS", "message": "成功"} except Exception as e: logger.error(f"微信支付回调处理失败: {e}") # 微信要求返回失败时也要有正确格式 return {"code": "FAIL", "message": str(e)} @router.post("/notify/alipay") async def alipay_notify( request: Request, db: AsyncSession = Depends(get_async_db), ): """ 支付宝异步回调通知 此接口无需认证,由支付宝服务器调用 """ payment_service = get_payment_service() try: # 解析 form 表单参数 form_data = await request.form() params = {key: value for key, value in form_data.items()} # 验签并解析通知 notify_result = payment_service.handle_alipay_notify(params=params) if notify_result.success and notify_result.trade_status in ( "TRADE_SUCCESS", "TRADE_FINISHED", "SUCCESS", ): await _handle_payment_success( db=db, out_trade_no=notify_result.out_trade_no, trade_no=notify_result.trade_no, ) # 支付宝要求返回纯文本 "success" return PlainTextResponse("success") except Exception as e: logger.error(f"支付宝回调处理失败: {e}") return PlainTextResponse("fail") @router.get("/order/{order_id}/status", response_model=OrderStatusResponse) async def get_order_status( order_id: str, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_async_db), ): """查询订单支付状态""" stmt = select(Order).where( Order.id == order_id, Order.user_id == current_user.id, ) result = await db.execute(stmt) order = result.scalar_one_or_none() if order is None: raise HTTPException(status_code=404, detail="订单不存在") return OrderStatusResponse( order_id=order.id, plan_id=order.plan_id, plan_name=order.plan_name, amount=order.amount, currency=order.currency, payment_method=order.payment_method, status=order.status, trade_no=order.trade_no, created_at=order.created_at.isoformat() if order.created_at else "", paid_at=order.paid_at.isoformat() if order.paid_at else None, ) @router.get("/orders", response_model=List[OrderListResponse]) async def list_orders( current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_async_db), ): """获取当前用户的订单列表""" stmt = ( select(Order) .where(Order.user_id == current_user.id) .order_by(Order.created_at.desc()) ) result = await db.execute(stmt) orders = result.scalars().all() return [ OrderListResponse( id=order.id, plan_id=order.plan_id, plan_name=order.plan_name, amount=order.amount, currency=order.currency, status=order.status, payment_method=order.payment_method, created_at=order.created_at.isoformat() if order.created_at else "", paid_at=order.paid_at.isoformat() if order.paid_at else None, ) for order in orders ] async def _handle_payment_success( db: AsyncSession, out_trade_no: str, trade_no: str, ): """ 处理支付成功的通用逻辑 - 更新订单状态 - 升级用户订阅 """ # 查找订单 stmt = select(Order).where(Order.id == out_trade_no) result = await db.execute(stmt) order = result.scalar_one_or_none() if order is None: logger.warning(f"支付回调: 订单不存在 {out_trade_no}") return if order.status == "paid": logger.info(f"支付回调: 订单已处理过 {out_trade_no}") return now = utc_now() # 更新订单状态 order.status = "paid" order.trade_no = trade_no order.paid_at = now # 查找用户并升级订阅 stmt = select(User).where(User.id == order.user_id) result = await db.execute(stmt) user = result.scalar_one_or_none() if user: duration_days = SUBSCRIPTION_DURATION_DAYS.get(order.plan_id, 365) # 如果用户当前订阅未过期,在现有基础上续期 if user.subscription_expires_at and user.subscription_expires_at > now: user.subscription_expires_at = user.subscription_expires_at + timedelta(days=duration_days) else: user.subscription_expires_at = now + timedelta(days=duration_days) user.subscription_type = order.plan_id logger.info( f"用户 {user.id} 订阅已升级为 {order.plan_id}," f"到期时间: {user.subscription_expires_at}" ) await db.flush() logger.info(f"支付成功处理完成: 订单 {out_trade_no}, 第三方交易号 {trade_no}")