- 新增faqs.py常见问题路由 - 新增feedback.py反馈路由 - 新增orders.py订单路由 - 新增plans.py套餐路由 - 新增quota.py配额路由 - 新增user.py用户路由 - 更新main.py注册新路由 - 更新requirements.txt添加依赖
127 lines
4.0 KiB
Python
127 lines
4.0 KiB
Python
"""Quota check API routes."""
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from pydantic import BaseModel
|
|
from typing import Optional
|
|
|
|
from middleware.auth import get_current_user
|
|
from database.models import User
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from database import get_async_db
|
|
from sqlalchemy import select, func
|
|
|
|
router = APIRouter(prefix="/api/quota", tags=["quota"])
|
|
|
|
|
|
class QuotaCheckResponse(BaseModel):
    """Response payload of the quota check endpoint.

    Each ``remaining_*`` field is ``None`` when the matching resource has no
    limit under the user's plan; otherwise it is the non-negative amount left.
    """

    # Whether the user still has quota available.
    has_quota: bool
    # Conversations left before the plan limit is reached.
    remaining_conversations: Optional[int] = None
    # Chapters left before the plan limit is reached.
    remaining_chapters: Optional[int] = None
    # Words left before the plan limit is reached.
    remaining_words: Optional[int] = None
    # Human-readable status message shown to the user.
    message: str
|
|
|
|
|
|
# Usage ceilings per subscription type. A value of ``None`` means the
# resource is unlimited for that plan.
PLAN_QUOTAS = {
    "free": {
        "max_conversations": 3,
        "max_chapters": 10,
        "max_words": 50000,
    },
    "premium": {
        "max_conversations": None,  # unlimited
        "max_chapters": None,
        "max_words": None,
    },
}
|
|
|
|
|
|
async def _collect_usage(db, user_id):
    """Return ``(conversations, chapters, words)`` currently stored for a user.

    Args:
        db: Session object whose awaitable ``execute`` runs the queries.
        user_id: Value matched against ``Conversation.user_id`` and
            ``Chapter.user_id``.

    Returns:
        A 3-tuple of ints: conversation count, chapter count and the summed
        ``length(Chapter.content)``. Each is ``0`` when the user has no rows.
    """
    # Function-scope import, as in the original handler.
    from database.models import Chapter, Conversation

    conversation_stmt = select(func.count(Conversation.id)).where(
        Conversation.user_id == user_id
    )
    conversation_count = (await db.execute(conversation_stmt)).scalar() or 0

    # A single aggregate query returns both chapter figures in one round trip.
    # NOTE(review): SQL LENGTH() counts bytes on MySQL but characters on
    # PostgreSQL/SQLite - confirm which backend is deployed, since multi-byte
    # text would be over-counted on MySQL.
    chapter_stmt = select(
        func.count(Chapter.id),
        func.sum(func.length(Chapter.content)),
    ).where(Chapter.user_id == user_id)
    chapter_count, total_words = (await db.execute(chapter_stmt)).one()

    # SUM() is NULL when there are no rows and may come back as a non-int
    # numeric type on some drivers, so normalise to a plain int.
    return conversation_count, chapter_count or 0, int(total_words or 0)


def _remaining(limit, used):
    """Return the non-negative amount left under *limit*, or ``None`` if unlimited."""
    if limit is None:
        return None
    return max(0, limit - used)


@router.get("/check", response_model=QuotaCheckResponse)
async def check_quota(
    current_user: User = Depends(get_current_user)
):
    """Report how much of the subscription quota the current user has left.

    Premium users are reported as unlimited without touching the database.
    For every other plan the user's conversations, chapters and total chapter
    content length are counted and compared with the limits in
    ``PLAN_QUOTAS``. Unknown plan types fall back to the ``"free"`` limits.

    Args:
        current_user: Authenticated user resolved by ``get_current_user``.

    Returns:
        QuotaCheckResponse: ``has_quota`` is ``False`` as soon as one limited
        resource is used up. Conversations are checked first, then chapters,
        then words, and the message names the first exhausted resource.

    Raises:
        HTTPException: 503 when ``get_async_db`` yields no session.
    """
    plan_type = current_user.subscription_type

    # Premium plan: no limits apply, so skip the usage queries entirely.
    if plan_type == "premium":
        return QuotaCheckResponse(
            has_quota=True,
            remaining_conversations=None,
            remaining_chapters=None,
            remaining_words=None,
            message="高级版用户,无使用限制",
        )

    quotas = PLAN_QUOTAS.get(plan_type, PLAN_QUOTAS["free"])

    # Run the loop to exhaustion rather than returning from inside it, so the
    # session generator resumes past its ``yield`` and performs its own
    # cleanup before the response is built.
    usage = None
    async for db in get_async_db():
        usage = await _collect_usage(db, current_user.id)
    if usage is None:
        # The generator produced no session; fail explicitly instead of
        # falling off the end of the handler and returning ``None``.
        raise HTTPException(status_code=503, detail="数据库会话不可用,请稍后重试")
    conversation_count, chapter_count, total_words = usage

    max_conversations = quotas.get("max_conversations")
    max_chapters = quotas.get("max_chapters")
    max_words = quotas.get("max_words")

    # The first exhausted resource, in this order, determines the message.
    exhaustion_checks = (
        (max_conversations, conversation_count, "对话次数已用完,请升级到高级版"),
        (max_chapters, chapter_count, "章节数量已达上限,请升级到高级版"),
        (max_words, total_words, "字数已达上限,请升级到高级版"),
    )
    has_quota, message = True, "配额充足"
    for limit, used, exhausted_message in exhaustion_checks:
        if limit is not None and used >= limit:
            has_quota, message = False, exhausted_message
            break

    return QuotaCheckResponse(
        has_quota=has_quota,
        remaining_conversations=_remaining(max_conversations, conversation_count),
        remaining_chapters=_remaining(max_chapters, chapter_count),
        remaining_words=_remaining(max_words, total_words),
        message=message,
    )
|