Files
life-echo/api/app/features/payment/payment_config.py
Sully 53e0065e3e refactor(api): TOML 配置 SSOT、统一错误契约、Auth/事务加固与可观测性 (#33)
配置 SSOT(TOML + .env)
统一错误契约
Auth 与事务边界
Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client
可观测性(OpenTelemetry + LGTM)
2026-05-22 13:44:50 +08:00

150 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Payment module configuration (migrated from the ``payment`` package into ``app``; values are read from ``app.core.config.settings``).
"""
from dataclasses import dataclass, field
from app.core.logging import get_logger
# Module-level logger named after this module, obtained from the project's logging helper.
logger = get_logger(__name__)
@dataclass
class WeChatPayConfig:
    """Settings holder for the WeChat Pay channel.

    Every field is a string that defaults to ``""``, so a bare
    ``WeChatPayConfig()`` represents "nothing configured".
    """

    app_id: str = ""
    mch_id: str = ""
    api_v3_key: str = ""
    # The merchant private key may be supplied as a file path or inline;
    # ``is_configured`` accepts either one.
    private_key_path: str = ""
    private_key: str = ""
    cert_serial_no: str = ""
    notify_url: str = ""
    # The platform public key may likewise be inline or a file path; it is
    # only considered usable together with ``platform_public_key_id``.
    platform_public_key: str = ""
    platform_public_key_path: str = ""
    platform_public_key_id: str = ""

    @property
    def is_configured(self) -> bool:
        """Whether every mandatory WeChat Pay value is present.

        Returns:
            ``True`` when ``app_id``, ``mch_id``, ``api_v3_key``,
            ``cert_serial_no`` and ``notify_url`` are all truthy and a
            private key is available (inline or by path, ignoring
            surrounding whitespace); ``False`` otherwise.
        """
        merchant_key = self.private_key.strip() or self.private_key_path.strip()
        required = (
            self.app_id,
            self.mch_id,
            self.api_v3_key,
            bool(merchant_key),
            self.cert_serial_no,
            self.notify_url,
        )
        return all(required)

    @property
    def use_platform_public_key(self) -> bool:
        """Whether a platform public key and its id are both provided.

        Returns:
            ``True`` only when a platform public key is given (inline or by
            path) and ``platform_public_key_id`` is non-blank.
        """
        key_material = (
            self.platform_public_key.strip()
            or self.platform_public_key_path.strip()
        )
        return bool(key_material) and bool(self.platform_public_key_id.strip())
@dataclass
class AlipayConfig:
    """Settings holder for the Alipay channel.

    All fields are strings; everything except ``sign_type`` defaults to
    ``""``, and ``sign_type`` defaults to ``"RSA2"``.
    """

    app_id: str = ""
    private_key: str = ""
    alipay_public_key: str = ""
    notify_url: str = ""
    sign_type: str = "RSA2"

    @property
    def is_configured(self) -> bool:
        """Whether all mandatory Alipay values are present.

        Returns:
            ``True`` when ``app_id``, ``private_key``, ``alipay_public_key``
            and ``notify_url`` are all truthy; ``False`` otherwise.
            ``sign_type`` is not part of the check.
        """
        return bool(
            self.app_id
            and self.private_key
            and self.alipay_public_key
            and self.notify_url
        )
@dataclass
class PaymentConfig:
    """Aggregate payment configuration for the WeChat Pay and Alipay channels.

    Use :meth:`from_settings` to build an instance from any settings-like
    object (no logging), or :meth:`from_env` to build it from the
    application's global settings and log which channels are usable.
    """

    wechat: WeChatPayConfig = field(default_factory=WeChatPayConfig)
    alipay: AlipayConfig = field(default_factory=AlipayConfig)
    # Populated by ``from_settings`` from ``misc_defaults``; True by default.
    alipay_under_development: bool = True

    @staticmethod
    def _setting(settings: object, name: str) -> str:
        """Return ``settings.<name>``, mapping a missing or falsy value to ``""``."""
        return getattr(settings, name, "") or ""

    @staticmethod
    def _clean_key_text(raw: str) -> str:
        """Normalize inline key text taken from a settings value.

        Removes surrounding whitespace, a leading BOM, wrapping single or
        double quotes, and turns literal ``\\n`` escape sequences into real
        newlines. An empty or blank input yields ``""``.
        """
        text = raw.strip()
        if not text:
            return ""
        # A BOM is not whitespace for str.strip(), so drop it first;
        # otherwise it would shield a following quote or whitespace from
        # the stripping below.
        if text.startswith("\ufeff"):
            text = text.lstrip("\ufeff").lstrip()
        # A BOM may also sit inside the quotes, hence the second lstrip.
        text = text.strip('"').strip("'").lstrip("\ufeff")
        return text.replace("\\n", "\n")

    @classmethod
    def from_settings(cls, settings) -> "PaymentConfig":
        """Build a ``PaymentConfig`` from a settings-like object.

        Args:
            settings: Any object exposing the ``wechat_pay_*`` and
                ``alipay_*`` attributes. Missing or falsy attributes are
                treated as empty strings.

        Returns:
            A new ``PaymentConfig``. When a WeChat private-key path is set,
            the inline private key is left empty so the path takes
            precedence.
        """
        # Kept as a function-level import, as in the original code
        # (presumably to avoid an import cycle -- TODO confirm).
        from app.core.runtime_constants import misc_defaults

        read = cls._setting

        private_key_path = read(settings, "wechat_pay_private_key_path").strip()
        private_key = cls._clean_key_text(read(settings, "wechat_pay_private_key"))
        # The platform public key gets the same cleanup as the private key
        # so both inline keys tolerate quoting, a BOM and escaped newlines.
        platform_public_key = cls._clean_key_text(
            read(settings, "wechat_pay_platform_public_key")
        )

        # str() keeps this working whether the default is stored as text
        # ("true"/"1"/"yes") or as a real bool.
        under_development = (
            str(misc_defaults.alipay_under_development).strip().lower()
        )

        return cls(
            wechat=WeChatPayConfig(
                app_id=read(settings, "wechat_pay_app_id"),
                mch_id=read(settings, "wechat_pay_mch_id"),
                api_v3_key=read(settings, "wechat_pay_api_v3_key"),
                private_key_path=private_key_path,
                # A configured path wins over inline key content.
                private_key="" if private_key_path else private_key,
                cert_serial_no=read(settings, "wechat_pay_cert_serial_no"),
                notify_url=read(settings, "wechat_pay_notify_url"),
                platform_public_key=platform_public_key,
                platform_public_key_path=read(
                    settings, "wechat_pay_platform_public_key_path"
                ).strip(),
                platform_public_key_id=read(
                    settings, "wechat_pay_platform_public_key_id"
                ).strip(),
            ),
            alipay=AlipayConfig(
                app_id=read(settings, "alipay_app_id"),
                private_key=read(settings, "alipay_private_key"),
                alipay_public_key=read(settings, "alipay_public_key"),
                notify_url=read(settings, "alipay_notify_url"),
                sign_type=misc_defaults.alipay_sign_type,
            ),
            alipay_under_development=under_development in ("true", "1", "yes"),
        )

    @classmethod
    def from_env(cls) -> "PaymentConfig":
        """Build the configuration from the global settings and log the outcome.

        Despite the name, values are taken from ``app.core.config.settings``
        via :meth:`from_settings`. For each channel an info line is logged
        when it is fully configured and a warning otherwise; only
        identifiers and the key source are logged, never key material.

        Returns:
            The ``PaymentConfig`` produced by :meth:`from_settings`.
        """
        from app.core.config import settings

        config = cls.from_settings(settings)

        wechat = config.wechat
        if wechat.is_configured:
            mode = (
                "平台公钥模式"
                if wechat.use_platform_public_key
                else "平台证书模式"
            )
            key_src = "私钥内容" if wechat.private_key.strip() else "私钥路径"
            # NOTE(review): assumes the project logger formats "{}"-style
            # placeholders with the positional arguments -- confirm.
            logger.info(
                "微信支付配置已加载: APP_ID={}, MCH_ID={}, 模式={}, 商户私钥={}",
                wechat.app_id,
                wechat.mch_id,
                mode,
                key_src,
            )
        else:
            logger.warning("微信支付配置不完整,微信支付将不可用")

        if config.alipay.is_configured:
            logger.info("支付宝配置已加载: APP_ID={}", config.alipay.app_id)
        else:
            logger.warning("支付宝配置不完整,支付宝支付将不可用")

        return config