"""Chapter-related API routes."""

import logging
import os
from typing import List, Optional

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from database import get_async_db
from database.models import Chapter as ChapterModel
from database.models import User as UserModel
from middleware.auth import get_current_user
from agents.prompts.memory_prompts import CHAPTER_CATEGORIES, CHAPTER_ORDER, STAGE_TO_ORDER
from services.memoir_images.schema import (
    completed_image_assets,
    IMAGE_STATUS_COMPLETED,
    normalize_image_assets,
)
from services.memoir_images.settings import MemoirImageSettings
from services.memoir_images.storage import (
    TencentCosStorageService,
    normalize_cos_url,
    resolve_image_storage_key,
)

router = APIRouter(prefix="/api/chapters", tags=["chapters"])
logger = logging.getLogger(__name__)


def _normalize_image_assets(images: list[dict] | None) -> list[dict]:
    """Prepare a chapter's stored image assets for an API response.

    Each asset is copied, its ``url`` is rewritten, and its ``storage_key``
    entry is removed so the key is not exposed to the client.

    * Assets whose status is ``IMAGE_STATUS_COMPLETED`` and that resolve to a
      storage key get the URL returned by ``storage.get_download_url``.
    * Every other asset, and any completed asset whose download URL could not
      be obtained, gets the URL produced by ``normalize_cos_url``.

    When ``settings.enabled`` is falsy, the asset list is first narrowed with
    ``completed_image_assets``.

    Args:
        images: Raw image asset dicts as stored on the chapter, or ``None``.

    Returns:
        A new list of asset dicts; the input dicts are not mutated here.
    """
    bucket = os.getenv("TENCENT_COS_BUCKET", "")
    region = os.getenv("TENCENT_COS_REGION", "")
    base_url = os.getenv("TENCENT_COS_BASE_URL", "")
    storage = TencentCosStorageService.from_env()
    settings = MemoirImageSettings.from_env()

    source_assets = normalize_image_assets(images)
    if not settings.enabled:
        source_assets = completed_image_assets(source_assets)

    normalized_assets: list[dict] = []
    for item in source_assets:
        # Work on a shallow copy so the caller's dicts stay untouched.
        asset = dict(item)
        normalized_url = normalize_cos_url(
            asset.get("url"),
            bucket=bucket,
            region=region,
            base_url=base_url,
        )
        storage_key = resolve_image_storage_key(asset)

        if asset.get("status") == IMAGE_STATUS_COMPLETED and storage_key:
            try:
                asset["url"] = storage.get_download_url(storage_key)
            except Exception as exc:
                # Deliberate best effort: one failing image must not break the
                # whole chapter response. Fall back to the normalized URL and
                # surface an error marker, keeping any error already recorded.
                logger.warning("章节图片签名失败: key=%s, error=%s", storage_key, exc)
                asset["url"] = normalized_url
                asset["error"] = asset.get("error") or "image delivery unavailable"
        else:
            asset["url"] = normalized_url

        asset.pop("storage_key", None)
        normalized_assets.append(asset)

    return normalized_assets


def _chapter_to_dict(ch: ChapterModel) -> dict:
    """Serialize a chapter row into the dict shape returned by this API.

    Args:
        ch: The chapter model instance to serialize.

    Returns:
        A dict with the chapter's fields; ``images`` is passed through
        ``_normalize_image_assets``, ``updated_at`` is an ISO-8601 string or
        ``None``, and ``source_segments`` defaults to an empty list.
    """
    return {
        "id": ch.id,
        "title": ch.title,
        "content": ch.content,
        "order_index": ch.order_index,
        "status": ch.status,
        "category": ch.category,
        "images": _normalize_image_assets(ch.images),
        "updated_at": ch.updated_at.isoformat() if ch.updated_at else None,
        "is_new": ch.is_new,
        "source_segments": ch.source_segments or [],
    }


def _placeholder_chapter(category: str) -> dict:
    """Build the empty placeholder entry for a category that has no chapter.

    Args:
        category: A key of ``CHAPTER_CATEGORIES``.

    Returns:
        A dict with the same keys as ``_chapter_to_dict`` output, with
        ``status`` set to ``"empty"`` and an id of ``placeholder_<category>``.
    """
    return {
        "id": f"placeholder_{category}",
        "title": CHAPTER_CATEGORIES[category],
        "content": "",
        "order_index": STAGE_TO_ORDER.get(category, 999),
        "status": "empty",
        "category": category,
        "images": [],
        "updated_at": None,
        "is_new": False,
        "source_segments": [],
    }


async def _get_owned_chapter(
    db: AsyncSession,
    chapter_id: str,
    current_user: UserModel,
    *,
    forbidden_detail: str,
) -> ChapterModel:
    """Load a chapter by id and verify that it belongs to the current user.

    Args:
        db: The async database session.
        chapter_id: Primary key of the chapter to load.
        current_user: The authenticated user making the request.
        forbidden_detail: Detail message for the 403 response.

    Returns:
        The chapter model instance.

    Raises:
        HTTPException: 404 if no such chapter exists; 403 if the chapter's
            ``user_id`` differs from the current user's id.
    """
    chapter = await db.get(ChapterModel, chapter_id)
    if not chapter:
        raise HTTPException(status_code=404, detail="Chapter not found")
    if chapter.user_id != current_user.id:
        raise HTTPException(status_code=403, detail=forbidden_detail)
    return chapter


@router.get("", response_model=List[dict])
async def get_chapters(
    current_user: UserModel = Depends(get_current_user),
    is_new: Optional[bool] = Query(None, description="仅返回未读章节"),
    db: AsyncSession = Depends(get_async_db)
):
    """
    Return the current user's active chapters (authentication required).

    One entry is returned for every category in ``CHAPTER_ORDER``; categories
    without a chapter are filled with an empty placeholder. When ``is_new`` is
    true, only unread chapters are returned and placeholders are omitted.
    """
    # `== True` is intentional: it builds a SQLAlchemy column expression.
    stmt = select(ChapterModel).where(
        ChapterModel.user_id == current_user.id,
        ChapterModel.is_active == True,  # noqa: E712
    )
    if is_new is True:
        stmt = stmt.where(ChapterModel.is_new == True)  # noqa: E712
    stmt = stmt.order_by(ChapterModel.order_index)

    result = await db.execute(stmt)
    chapters = result.scalars().all()

    # Keep only the first chapter (lowest order_index) seen for each category;
    # chapters without a category are not included in the response.
    chapter_by_category: dict[str, ChapterModel] = {}
    for ch in chapters:
        if ch.category and ch.category not in chapter_by_category:
            chapter_by_category[ch.category] = ch

    all_chapters: List[dict] = []
    for category in CHAPTER_ORDER:
        ch = chapter_by_category.pop(category, None)
        if ch:
            all_chapters.append(_chapter_to_dict(ch))
        elif is_new is not True:
            all_chapters.append(_placeholder_chapter(category))

    # Whatever remains has a category outside CHAPTER_ORDER; append it last.
    all_chapters.extend(_chapter_to_dict(ch) for ch in chapter_by_category.values())

    return all_chapters


@router.get("/{chapter_id}", response_model=dict)
async def get_chapter(
    chapter_id: str,
    current_user: UserModel = Depends(get_current_user),
    db: AsyncSession = Depends(get_async_db)
):
    """Return one chapter's details (authentication required, own chapters only)."""
    chapter = await _get_owned_chapter(
        db, chapter_id, current_user, forbidden_detail="无权访问此章节"
    )
    return _chapter_to_dict(chapter)


@router.delete("/{chapter_id}")
async def disable_chapter(
    chapter_id: str,
    current_user: UserModel = Depends(get_current_user),
    db: AsyncSession = Depends(get_async_db)
):
    """Clear a chapter by marking it inactive (authentication required, own chapters only)."""
    chapter = await _get_owned_chapter(
        db, chapter_id, current_user, forbidden_detail="无权操作此章节"
    )

    # Soft delete: the row is kept and only flagged as inactive.
    chapter.is_active = False
    await db.commit()

    return {"status": "ok", "message": "章节已清除"}


@router.post("/{chapter_id}/regenerate")
async def regenerate_chapter(
    chapter_id: str,
    current_user: UserModel = Depends(get_current_user),
    db: AsyncSession = Depends(get_async_db)
):
    """Regenerate a chapter (authentication required, own chapters only)."""
    # Only existence and ownership are validated for now.
    await _get_owned_chapter(
        db, chapter_id, current_user, forbidden_detail="无权操作此章节"
    )

    # TODO: implement the regeneration logic.
    return {"status": "ok", "message": "Chapter regeneration triggered"}