Files
life-echo/api/routers/quota.py
iammm0 3690417fdc feat: 新增后端API路由模块
- 新增faqs.py常见问题路由
- 新增feedback.py反馈路由
- 新增orders.py订单路由
- 新增plans.py套餐路由
- 新增quota.py配额路由
- 新增user.py用户路由
- 更新main.py注册新路由
- 更新requirements.txt添加依赖
2026-01-23 14:02:36 +08:00

127 lines
4.0 KiB
Python

"""
配额检查 API 路由
"""
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from typing import Optional
from middleware.auth import get_current_user
from database.models import User
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_async_db
from sqlalchemy import select, func
router = APIRouter(prefix="/api/quota", tags=["quota"])
class QuotaCheckResponse(BaseModel):
"""配额检查响应"""
has_quota: bool # 是否有配额
remaining_conversations: Optional[int] = None # 剩余对话次数
remaining_chapters: Optional[int] = None # 剩余章节数
remaining_words: Optional[int] = None # 剩余字数
message: str # 提示信息
# 计划配额限制
PLAN_QUOTAS = {
"free": {
"max_conversations": 3,
"max_chapters": 10,
"max_words": 50000
},
"premium": {
"max_conversations": None, # 无限制
"max_chapters": None,
"max_words": None
}
}
@router.get("/check", response_model=QuotaCheckResponse)
async def check_quota(
current_user: User = Depends(get_current_user)
):
"""
检查用户配额使用情况
根据用户的订阅计划检查是否还有配额可以使用
"""
plan_type = current_user.subscription_type
quotas = PLAN_QUOTAS.get(plan_type, PLAN_QUOTAS["free"])
# 如果是高级版,无限制
if plan_type == "premium":
return QuotaCheckResponse(
has_quota=True,
remaining_conversations=None,
remaining_chapters=None,
remaining_words=None,
message="高级版用户,无使用限制"
)
# 统计使用情况
async for db in get_async_db():
# 统计对话数量
from database.models import Conversation
stmt = select(func.count(Conversation.id)).where(
Conversation.user_id == current_user.id
)
result = await db.execute(stmt)
conversation_count = result.scalar() or 0
# 统计章节数量
from database.models import Chapter
stmt = select(func.count(Chapter.id)).where(
Chapter.user_id == current_user.id
)
result = await db.execute(stmt)
chapter_count = result.scalar() or 0
# 统计总字数
stmt = select(func.sum(func.length(Chapter.content))).where(
Chapter.user_id == current_user.id
)
result = await db.execute(stmt)
total_words = result.scalar() or 0
# 计算剩余配额
max_conversations = quotas.get("max_conversations")
max_chapters = quotas.get("max_chapters")
max_words = quotas.get("max_words")
remaining_conversations = None
remaining_chapters = None
remaining_words = None
if max_conversations is not None:
remaining_conversations = max(0, max_conversations - conversation_count)
if max_chapters is not None:
remaining_chapters = max(0, max_chapters - chapter_count)
if max_words is not None:
remaining_words = max(0, max_words - total_words)
# 检查是否有配额
has_quota = True
message = "配额充足"
if max_conversations is not None and conversation_count >= max_conversations:
has_quota = False
message = "对话次数已用完,请升级到高级版"
elif max_chapters is not None and chapter_count >= max_chapters:
has_quota = False
message = "章节数量已达上限,请升级到高级版"
elif max_words is not None and total_words >= max_words:
has_quota = False
message = "字数已达上限,请升级到高级版"
return QuotaCheckResponse(
has_quota=has_quota,
remaining_conversations=remaining_conversations,
remaining_chapters=remaining_chapters,
remaining_words=remaining_words,
message=message
)