- 优化payment/config.py、wechat_pay.py - 优化routers/payment.py、plans.py、quota.py、websocket.py - 更新main.py、.env.production Co-authored-by: Cursor <cursoragent@cursor.com>
349 lines
15 KiB
Python
349 lines
15 KiB
Python
"""
|
||
微信支付 API v3 封装
|
||
使用 wechatpayv3 SDK 实现 APP 支付
|
||
"""
|
||
import json
|
||
import logging
|
||
import os
|
||
import time
|
||
from typing import Optional, Dict, Any
|
||
|
||
# #region agent log
|
||
def _debug_log(location: str, message: str, data: dict, hypothesis_id: str):
|
||
try:
|
||
with open("/Users/zuckxu/PycharmProjects/life-echo/.cursor/debug.log", "a", encoding="utf-8") as f:
|
||
f.write(json.dumps({"id": f"log_{int(time.time()*1000)}", "timestamp": int(time.time() * 1000), "location": location, "message": message, "data": data, "hypothesisId": hypothesis_id}, ensure_ascii=False) + "\n")
|
||
except Exception:
|
||
pass
|
||
# #endregion
|
||
|
||
from .config import WeChatPayConfig
|
||
from .schemas import PaymentResult, NotifyResult, PaymentStatus
|
||
from .exceptions import PaymentConfigError, PaymentCreateError, PaymentNotifyError, PaymentQueryError
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def _normalize_pem_key(key: str) -> str:
|
||
"""
|
||
规范化 PEM 私钥/公钥字符串,便于 wechatpayv3 正确解析。
|
||
- 去除 BOM、首尾引号
|
||
- 环境变量中常将换行写成字面量 \\n,需转为真实换行
|
||
- 统一为 \\n 换行,去除 \\r
|
||
"""
|
||
if not key or not key.strip():
|
||
return key
|
||
key = key.strip().lstrip("\ufeff").strip('"').strip("'")
|
||
key = key.replace("\\n", "\n")
|
||
key = key.replace("\r\n", "\n").replace("\r", "\n")
|
||
return key
|
||
|
||
|
||
def _resolve_key_path(key_path: str) -> str:
|
||
"""解析私钥/公钥文件路径:相对路径时优先相对于 api 目录,避免受当前工作目录影响。"""
|
||
if not key_path or not key_path.strip():
|
||
return key_path
|
||
key_path = key_path.strip()
|
||
if os.path.isabs(key_path):
|
||
return key_path
|
||
# 相对路径:先相对于当前工作目录,若文件不存在则相对于 api 目录(本包所在目录的上一级)
|
||
abs_cwd = os.path.abspath(key_path)
|
||
if os.path.isfile(abs_cwd):
|
||
return abs_cwd
|
||
try:
|
||
api_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||
abs_api = os.path.normpath(os.path.join(api_dir, key_path))
|
||
if os.path.isfile(abs_api):
|
||
return abs_api
|
||
except Exception:
|
||
pass
|
||
return abs_cwd
|
||
|
||
|
||
class WeChatPayClient:
|
||
"""微信支付客户端"""
|
||
|
||
def __init__(self, config: WeChatPayConfig):
|
||
self._config = config
|
||
self._client = None
|
||
|
||
def ensure_client(self):
|
||
"""供外部在后台线程中提前触发初始化,避免首次下单时在预支付超时窗口内做耗时 init。"""
|
||
self._ensure_client()
|
||
|
||
def _ensure_client(self):
|
||
"""确保微信支付客户端已初始化"""
|
||
if not self._config.is_configured:
|
||
raise PaymentConfigError("微信支付配置不完整,请检查环境变量")
|
||
|
||
if self._client is None:
|
||
try:
|
||
# #region agent log
|
||
_debug_log("payment/wechat_pay.py:_ensure_client", "wechat_ensure_client_enter", {}, "H_init")
|
||
# #endregion
|
||
from wechatpayv3 import WeChatPay, WeChatPayType
|
||
|
||
# 商户私钥:优先用 WECHAT_PAY_PRIVATE_KEY_PATH 从文件读取(推荐),否则用 WECHAT_PAY_PRIVATE_KEY
|
||
if self._config.private_key_path and self._config.private_key_path.strip():
|
||
key_path = _resolve_key_path(self._config.private_key_path)
|
||
with open(key_path, "r", encoding="utf-8") as f:
|
||
private_key = f.read()
|
||
elif self._config.private_key and self._config.private_key.strip():
|
||
private_key = self._config.private_key.strip()
|
||
else:
|
||
raise PaymentConfigError("未配置微信支付商户私钥:请设置 WECHAT_PAY_PRIVATE_KEY_PATH(推荐)或 WECHAT_PAY_PRIVATE_KEY")
|
||
|
||
# 统一 PEM 格式:换行符规范化、字面量 \n 转真实换行(环境变量或部分编辑器会写成 \n)
|
||
private_key = _normalize_pem_key(private_key)
|
||
if not private_key or "-----BEGIN" not in private_key:
|
||
raise PaymentConfigError(
|
||
"微信支付商户私钥格式错误:需为 PEM 格式(以 -----BEGIN PRIVATE KEY----- 开头)。"
|
||
"若使用 WECHAT_PAY_PRIVATE_KEY_PATH,请确认文件内容正确;若用 WECHAT_PAY_PRIVATE_KEY,请确认环境变量为完整 PEM。"
|
||
)
|
||
# #region agent log
|
||
_debug_log("payment/wechat_pay.py:_ensure_client", "wechat_ensure_client_key_done", {}, "H_init")
|
||
# #endregion
|
||
|
||
# 平台公钥模式:无需访问 api.mch.weixin.qq.com 拉取证书,适合无法直连微信的环境
|
||
kwargs = dict(
|
||
wechatpay_type=WeChatPayType.APP,
|
||
mchid=self._config.mch_id,
|
||
private_key=private_key,
|
||
cert_serial_no=self._config.cert_serial_no,
|
||
appid=self._config.app_id,
|
||
apiv3_key=self._config.api_v3_key,
|
||
notify_url=self._config.notify_url,
|
||
)
|
||
if self._config.use_platform_public_key:
|
||
if self._config.platform_public_key_path and self._config.platform_public_key_path.strip():
|
||
key_path = _resolve_key_path(self._config.platform_public_key_path)
|
||
with open(key_path, "r", encoding="utf-8") as f:
|
||
platform_pub_key = _normalize_pem_key(f.read())
|
||
elif self._config.platform_public_key and self._config.platform_public_key.strip():
|
||
platform_pub_key = _normalize_pem_key(self._config.platform_public_key.strip())
|
||
else:
|
||
raise PaymentConfigError("平台公钥模式需设置 WECHAT_PAY_PLATFORM_PUBLIC_KEY 或 WECHAT_PAY_PLATFORM_PUBLIC_KEY_PATH")
|
||
if not platform_pub_key or "-----BEGIN" not in platform_pub_key:
|
||
raise PaymentConfigError("微信支付平台公钥格式错误,需为 PEM 格式")
|
||
kwargs["public_key"] = platform_pub_key
|
||
kwargs["public_key_id"] = self._config.platform_public_key_id.strip()
|
||
else:
|
||
kwargs["timeout"] = (10, 25) # 证书模式:拉取平台证书时的超时
|
||
self._client = WeChatPay(**kwargs)
|
||
# #region agent log
|
||
_debug_log("payment/wechat_pay.py:_ensure_client", "wechat_ensure_client_sdk_done", {}, "H_init")
|
||
# #endregion
|
||
logger.info("微信支付客户端初始化成功")
|
||
except FileNotFoundError:
|
||
key_path = _resolve_key_path(self._config.private_key_path) if self._config.private_key_path else self._config.private_key_path
|
||
raise PaymentConfigError(
|
||
f"微信支付商户私钥文件不存在: {key_path}(已尝试相对路径与 api 目录,当前工作目录: {os.getcwd()})"
|
||
)
|
||
except PaymentConfigError:
|
||
raise
|
||
except Exception as e:
|
||
msg = str(e).strip()
|
||
if "No wechatpay platform certificate" in msg or "platform certificate" in msg.lower():
|
||
raise PaymentConfigError(
|
||
"无法获取微信支付平台证书(当前环境可能无法访问 api.mch.weixin.qq.com)。"
|
||
"请改用平台公钥模式:在商户平台「API安全」中获取微信支付公钥与公钥ID,并设置环境变量 "
|
||
"WECHAT_PAY_PLATFORM_PUBLIC_KEY(或 WECHAT_PAY_PLATFORM_PUBLIC_KEY_PATH)与 "
|
||
"WECHAT_PAY_PLATFORM_PUBLIC_KEY_ID。"
|
||
)
|
||
raise PaymentConfigError(
|
||
f"微信支付客户端初始化失败: {e}。"
|
||
"若使用文件路径,请确认文件为 PEM 格式且路径正确;或改用 WECHAT_PAY_PRIVATE_KEY 直接配置私钥内容。"
|
||
)
|
||
|
||
def create_app_order(
|
||
self,
|
||
out_trade_no: str,
|
||
total_amount: int,
|
||
description: str,
|
||
) -> PaymentResult:
|
||
"""
|
||
创建 APP 支付预订单
|
||
|
||
Args:
|
||
out_trade_no: 商户订单号
|
||
total_amount: 金额(单位:分)
|
||
description: 商品描述
|
||
|
||
Returns:
|
||
PaymentResult 包含调起支付所需的参数
|
||
"""
|
||
# #region agent log
|
||
_debug_log("payment/wechat_pay.py:create_app_order", "wechat_create_app_order_enter", {"out_trade_no": out_trade_no}, "H3")
|
||
# #endregion
|
||
self._ensure_client()
|
||
# #region agent log
|
||
_debug_log("payment/wechat_pay.py:create_app_order", "wechat_client_ready", {"out_trade_no": out_trade_no}, "H3")
|
||
# #endregion
|
||
try:
|
||
# #region agent log
|
||
_debug_log("payment/wechat_pay.py:create_app_order", "wechat_pay_request_start", {"out_trade_no": out_trade_no}, "H1,H4")
|
||
# #endregion
|
||
code, message = self._client.pay(
|
||
description=description,
|
||
out_trade_no=out_trade_no,
|
||
amount={"total": total_amount, "currency": "CNY"},
|
||
pay_type=self._get_pay_type(),
|
||
)
|
||
# #region agent log
|
||
_debug_log("payment/wechat_pay.py:create_app_order", "wechat_pay_request_done", {"out_trade_no": out_trade_no, "code": code}, "H1,H4")
|
||
# #endregion
|
||
|
||
result = json.loads(message)
|
||
|
||
if code in range(200, 300):
|
||
prepay_id = result.get("prepay_id", "")
|
||
# 生成 APP 调起支付所需的参数
|
||
timestamp = str(int(time.time()))
|
||
nonce_str = out_trade_no # 使用订单号作为随机字符串的基础
|
||
|
||
# 使用 SDK 的签名方法生成 APP 支付参数
|
||
pay_params = self._client.sign(
|
||
data=[self._config.app_id, timestamp, nonce_str, prepay_id]
|
||
)
|
||
|
||
wechat_params = {
|
||
"appId": self._config.app_id,
|
||
"partnerId": self._config.mch_id,
|
||
"prepayId": prepay_id,
|
||
"nonceStr": nonce_str,
|
||
"timeStamp": timestamp,
|
||
"sign": pay_params,
|
||
"packageValue": "Sign=WXPay",
|
||
}
|
||
|
||
logger.info(f"微信支付预订单创建成功: {out_trade_no}")
|
||
return PaymentResult(
|
||
success=True,
|
||
payment_method="wechat",
|
||
out_trade_no=out_trade_no,
|
||
wechat_params=wechat_params,
|
||
)
|
||
else:
|
||
error_msg = result.get("message", "未知错误")
|
||
logger.error(f"微信支付预订单创建失败: {code} - {error_msg}")
|
||
raise PaymentCreateError(f"微信支付下单失败: {error_msg}")
|
||
|
||
except PaymentCreateError:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"微信支付预订单创建异常: {e}")
|
||
raise PaymentCreateError(f"微信支付下单异常: {e}")
|
||
|
||
def verify_notify(self, headers: Dict[str, str], body: str) -> NotifyResult:
|
||
"""
|
||
验证并解密微信支付回调通知
|
||
|
||
Args:
|
||
headers: 请求头(包含签名等信息)
|
||
body: 请求体(加密的通知数据)
|
||
|
||
Returns:
|
||
NotifyResult 包含解密后的交易信息
|
||
"""
|
||
self._ensure_client()
|
||
|
||
try:
|
||
result = self._client.callback(headers=headers, body=body)
|
||
|
||
if result:
|
||
event_data = result if isinstance(result, dict) else json.loads(result)
|
||
trade_state = event_data.get("trade_state", "")
|
||
out_trade_no = event_data.get("out_trade_no", "")
|
||
transaction_id = event_data.get("transaction_id", "")
|
||
amount_info = event_data.get("amount", {})
|
||
total_amount = amount_info.get("total", 0)
|
||
|
||
logger.info(
|
||
f"微信支付回调验证成功: out_trade_no={out_trade_no}, "
|
||
f"trade_state={trade_state}, amount={total_amount}"
|
||
)
|
||
|
||
return NotifyResult(
|
||
success=True,
|
||
out_trade_no=out_trade_no,
|
||
trade_no=transaction_id,
|
||
total_amount=total_amount,
|
||
trade_status=trade_state,
|
||
)
|
||
else:
|
||
raise PaymentNotifyError("微信支付回调验签失败")
|
||
|
||
except PaymentNotifyError:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"微信支付回调处理异常: {e}")
|
||
raise PaymentNotifyError(f"微信支付回调处理失败: {e}")
|
||
|
||
def query_order(self, out_trade_no: str) -> PaymentStatus:
|
||
"""
|
||
主动查询微信支付订单状态
|
||
|
||
Args:
|
||
out_trade_no: 商户订单号
|
||
|
||
Returns:
|
||
PaymentStatus 订单状态
|
||
"""
|
||
self._ensure_client()
|
||
|
||
try:
|
||
code, message = self._client.query(out_trade_no=out_trade_no)
|
||
result = json.loads(message)
|
||
|
||
if code in range(200, 300):
|
||
trade_state = result.get("trade_state", "NOTPAY")
|
||
transaction_id = result.get("transaction_id", "")
|
||
amount_info = result.get("amount", {})
|
||
total_amount = amount_info.get("total", 0)
|
||
|
||
return PaymentStatus(
|
||
success=True,
|
||
out_trade_no=out_trade_no,
|
||
trade_no=transaction_id,
|
||
trade_status=trade_state,
|
||
total_amount=total_amount,
|
||
)
|
||
else:
|
||
error_msg = result.get("message", "未知错误")
|
||
raise PaymentQueryError(f"查询微信支付订单失败: {error_msg}")
|
||
|
||
except PaymentQueryError:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"查询微信支付订单异常: {e}")
|
||
raise PaymentQueryError(f"查询微信支付订单异常: {e}")
|
||
|
||
def close_order(self, out_trade_no: str) -> bool:
|
||
"""
|
||
关闭微信支付订单
|
||
|
||
Args:
|
||
out_trade_no: 商户订单号
|
||
|
||
Returns:
|
||
是否关闭成功
|
||
"""
|
||
self._ensure_client()
|
||
|
||
try:
|
||
code, message = self._client.close(out_trade_no=out_trade_no)
|
||
if code in range(200, 300):
|
||
logger.info(f"微信支付订单已关闭: {out_trade_no}")
|
||
return True
|
||
else:
|
||
logger.warning(f"关闭微信支付订单失败: {code} - {message}")
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"关闭微信支付订单异常: {e}")
|
||
return False
|
||
|
||
def _get_pay_type(self):
|
||
"""获取支付类型枚举"""
|
||
from wechatpayv3 import WeChatPayType
|
||
return WeChatPayType.APP
|