""" 订阅计划相关 API 路由 """ from fastapi import APIRouter, Depends from pydantic import BaseModel from typing import List, Optional from datetime import datetime from sqlalchemy.ext.asyncio import AsyncSession from middleware.auth import get_current_user from database.models import User from database import get_async_db router = APIRouter(prefix="/api/plans", tags=["plans"]) class PlanResponse(BaseModel): """订阅计划响应""" id: str name: str display_name: str price: float currency: str features: List[str] max_conversations: Optional[int] = None # None表示无限制 max_chapters: Optional[int] = None max_words: Optional[int] = None is_popular: bool = False class CurrentPlanResponse(BaseModel): """当前订阅计划响应""" plan_id: str plan_name: str subscription_type: str expires_at: Optional[str] = None # 过期时间,None表示永久 features: List[str] usage: dict # 使用情况统计 # 预定义的订阅计划 AVAILABLE_PLANS = [ PlanResponse( id="free", name="free", display_name="免费版", price=0.0, currency="CNY", features=[ "基础对话功能", "生成回忆录章节", "最多3次对话", "最多10个章节" ], max_conversations=3, max_chapters=10, max_words=50000, is_popular=False ), PlanResponse( id="premium", name="premium", display_name="高级版", price=99.0, currency="CNY", features=[ "无限对话", "无限章节", "无限字数", "优先处理", "专属客服支持" ], max_conversations=None, max_chapters=None, max_words=None, is_popular=True ) ] def get_plan_by_type(subscription_type: str) -> Optional[PlanResponse]: """根据订阅类型获取计划信息""" for plan in AVAILABLE_PLANS: if plan.id == subscription_type: return plan return AVAILABLE_PLANS[0] # 默认返回免费版 @router.get("", response_model=List[PlanResponse]) async def get_plans(): """ 获取所有可用的订阅计划 """ return AVAILABLE_PLANS @router.get("/current", response_model=CurrentPlanResponse) async def get_current_plan( current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_async_db) ): """ 获取当前用户的订阅计划信息 """ plan = get_plan_by_type(current_user.subscription_type) # 计算使用情况 from sqlalchemy import select, func # 统计对话数量 from database.models import Conversation stmt = select(func.count(Conversation.id)).where( Conversation.user_id == current_user.id ) result = await db.execute(stmt) conversation_count = result.scalar() or 0 # 统计章节数量 from database.models import Chapter stmt = select(func.count(Chapter.id)).where( Chapter.user_id == current_user.id ) result = await db.execute(stmt) chapter_count = result.scalar() or 0 # 统计总字数 stmt = select(func.sum(func.length(Chapter.content))).where( Chapter.user_id == current_user.id ) result = await db.execute(stmt) total_words = result.scalar() or 0 usage = { "conversations": conversation_count, "chapters": chapter_count, "words": total_words, "max_conversations": plan.max_conversations, "max_chapters": plan.max_chapters, "max_words": plan.max_words } return CurrentPlanResponse( plan_id=plan.id, plan_name=plan.display_name, subscription_type=current_user.subscription_type, expires_at=None, # 目前没有过期时间概念 features=plan.features, usage=usage )