Files
life-echo/api/routers/quota.py
iammm0 44b405d647 refactor: 优化后端支付与微信支付
- 优化payment/config.py、wechat_pay.py
- 优化routers/payment.py、plans.py、quota.py、websocket.py
- 更新main.py、.env.production

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-11 16:06:15 +08:00

188 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
配额检查 API 路由
「对话轮数」的定义:每条用户发出的消息(Segment 表的记录数)计为 1 轮。
"""
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from typing import Optional
from middleware.auth import get_current_user
from database.models import User
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_async_db
from sqlalchemy import select, func
# Router for the quota endpoints; every route registered on it is served
# under the /api/quota prefix and tagged "quota" in the generated API docs.
router = APIRouter(prefix="/api/quota", tags=["quota"])
class QuotaCheckResponse(BaseModel):
    """Response payload of the quota check endpoint."""

    # Overall verdict: whether the user may keep consuming quota.
    has_quota: bool

    # Allowance still available per dimension.
    remaining_conversations: Optional[int] = None  # conversation turns left
    remaining_chapters: Optional[int] = None
    remaining_words: Optional[int] = None

    # Amounts already used and the plan ceilings, so the front end can
    # render "used X / of Y".
    used_conversations: Optional[int] = None
    used_chapters: Optional[int] = None
    max_conversations: Optional[int] = None
    max_chapters: Optional[int] = None

    # Human-readable status text shown to the user.
    message: str
# Per-plan quota ceilings. The original note says these are meant to mirror
# AVAILABLE_PLANS in plans.py. A value of None means "no limit" for that
# dimension (the checkers below treat None as unlimited).
#   free     : 50 conversation turns, 1 organized chapter
#   pro      : 2000 conversation turns, chapters unlimited
#   pro_plus : 10000 conversation turns, chapters unlimited
PLAN_QUOTAS = {
    "free": {"max_conversations": 50, "max_chapters": 1, "max_words": None},
    "pro": {"max_conversations": 2000, "max_chapters": None, "max_words": None},
    "pro_plus": {"max_conversations": 10000, "max_chapters": None, "max_words": None},
    # Kept for compatibility with the legacy subscription value.
    "premium": {"max_conversations": None, "max_chapters": None, "max_words": None},
    # One-cent test plan (per the original note: development only, enabled
    # with ENABLE_TEST_PLAN=1): unlimited turns and chapters.
    "test": {"max_conversations": None, "max_chapters": None, "max_words": None},
}
async def get_segment_count(user_id: str, db: AsyncSession) -> int:
    """
    Return the number of conversation turns the user has consumed.

    A "turn" is one Segment row; the count covers every Segment whose
    Conversation belongs to ``user_id``.

    Args:
        user_id: Owner of the conversations to count.
        db: Async session used to run the COUNT query.

    Returns:
        The Segment count, or 0 when the query yields nothing.
    """
    # Imported lazily, as in the rest of this module.
    from database.models import Segment, Conversation

    owned_segments_query = (
        select(func.count(Segment.id))
        .join(Conversation, Segment.conversation_id == Conversation.id)
        .where(Conversation.user_id == user_id)
    )
    total = (await db.execute(owned_segments_query)).scalar()
    return total or 0
async def get_chapter_count(user_id: str, db: AsyncSession) -> int:
    """
    Return how many Chapter rows currently belong to ``user_id``.

    Args:
        user_id: Owner of the chapters to count.
        db: Async session used to run the COUNT query.

    Returns:
        The Chapter count, or 0 when the query yields nothing.
    """
    from database.models import Chapter

    chapter_total_query = select(func.count(Chapter.id)).where(
        Chapter.user_id == user_id
    )
    rows = await db.execute(chapter_total_query)
    total = rows.scalar()
    return total or 0
# Old name kept as an alias so existing references do not break.
async def get_conversation_count(user_id: str, db: AsyncSession) -> int:
    """Alias of get_segment_count: the value returned is the Segment count."""
    segment_total = await get_segment_count(user_id, db)
    return segment_total
def check_can_send_message(
    subscription_type: str,
    segment_count: int
) -> tuple[bool, str]:
    """
    Decide whether the user may send another message (one conversation turn).

    Args:
        subscription_type: Plan key looked up in PLAN_QUOTAS; unknown keys
            fall back to the "free" plan.
        segment_count: Turns already consumed.

    Returns:
        ``(allowed, message)``. ``message`` is empty when allowed, otherwise
        it is the user-facing explanation.
    """
    plan_limits = PLAN_QUOTAS.get(subscription_type, PLAN_QUOTAS["free"])
    turn_cap = plan_limits.get("max_conversations")

    # A cap of None means the plan has no turn limit.
    exhausted = turn_cap is not None and segment_count >= turn_cap
    if not exhausted:
        return True, ""
    return False, f"对话轮数已用完({segment_count}/{turn_cap}),请升级 Pro 或 Pro+ 继续使用"
# Kept for compatibility with older call sites.
def check_can_create_conversation(
    subscription_type: str,
    conversation_count: int
) -> tuple[bool, str]:
    """Legacy wrapper: forwards both arguments to check_can_send_message."""
    verdict = check_can_send_message(subscription_type, conversation_count)
    return verdict
def check_can_submit_organize(
    subscription_type: str,
    chapter_count: int
) -> tuple[bool, str]:
    """
    Decide whether the user may submit an organize task (creates a chapter).

    Args:
        subscription_type: Plan key looked up in PLAN_QUOTAS; unknown keys
            fall back to the "free" plan.
        chapter_count: Chapters the user already has.

    Returns:
        ``(allowed, message)``. ``message`` is empty when allowed, otherwise
        it is the user-facing explanation.
    """
    plan_limits = PLAN_QUOTAS.get(subscription_type, PLAN_QUOTAS["free"])
    chapter_cap = plan_limits.get("max_chapters")

    # Only plans with a numeric cap can be blocked; None means unlimited.
    if chapter_cap is not None and chapter_count >= chapter_cap:
        return False, "章节数量已达上限(免费版仅支持 1 个章节整理),请升级后继续"
    return True, ""
@router.get("/check", response_model=QuotaCheckResponse)
async def check_quota(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_async_db)
):
    """
    Report the authenticated user's quota usage.

    Looks up the limits for the user's ``subscription_type`` in PLAN_QUOTAS
    (unknown plans fall back to "free"), counts the user's Segments and
    Chapters, and returns used / max / remaining figures.

    ``has_quota`` is False as soon as either the conversation-turn limit or
    the chapter limit is reached. The turn limit is checked first, so its
    message is the one reported when both are exhausted.

    Returns:
        QuotaCheckResponse: ``max_*`` and ``remaining_*`` are None for
        dimensions the plan does not limit. ``remaining_words`` is always
        None because no word quota is evaluated here.
    """
    plan_type = current_user.subscription_type
    quotas = PLAN_QUOTAS.get(plan_type, PLAN_QUOTAS["free"])
    max_conversations = quotas.get("max_conversations")
    max_chapters = quotas.get("max_chapters")

    # Usage so far.
    segment_count = await get_segment_count(current_user.id, db)
    chapter_count = await get_chapter_count(current_user.id, db)

    # Remaining allowance is clamped at 0; None marks an unlimited dimension.
    remaining_conversations = None
    if max_conversations is not None:
        remaining_conversations = max(0, max_conversations - segment_count)
    remaining_chapters = None
    if max_chapters is not None:
        remaining_chapters = max(0, max_chapters - chapter_count)

    # Delegate to the shared checkers so the limits and user-facing messages
    # reported here cannot drift from the ones enforced elsewhere.
    has_quota, message = check_can_send_message(plan_type, segment_count)
    if has_quota:
        has_quota, message = check_can_submit_organize(plan_type, chapter_count)
    if has_quota:
        message = "配额充足"

    return QuotaCheckResponse(
        has_quota=has_quota,
        remaining_conversations=remaining_conversations,
        remaining_chapters=remaining_chapters,
        remaining_words=None,
        used_conversations=segment_count,
        used_chapters=chapter_count,
        max_conversations=max_conversations,
        max_chapters=max_chapters,
        message=message
    )