Files
life-echo/api/routers/chapters.py

306 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
章节相关 API 路由
"""
import logging
import os
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
from database import get_async_db
from database.models import Chapter as ChapterModel, ChapterSection
from database.models import User as UserModel
from middleware.auth import get_current_user
from app.agents.memoir.prompts import CHAPTER_CATEGORIES, CHAPTER_ORDER, STAGE_TO_ORDER
from services.memoir_images.schema import (
completed_image_assets,
IMAGE_STATUS_COMPLETED,
IMAGE_STATUS_FAILED,
normalize_image_assets,
)
from services.memoir_images.serializers import memoir_image_to_dict
from services.memoir_images.settings import MemoirImageSettings
from services.memoir_images.storage import (
CosDownloadUrlError,
TencentCosStorageService,
mark_image_delivery_unavailable,
normalize_cos_url,
resolve_image_storage_key,
)
router = APIRouter(prefix="/api/chapters", tags=["chapters"])
logger = logging.getLogger(__name__)
def _normalize_image_assets(images: list[dict] | None) -> list[dict]:
bucket = os.getenv("TENCENT_COS_BUCKET", "")
region = os.getenv("TENCENT_COS_REGION", "")
base_url = os.getenv("TENCENT_COS_BASE_URL", "")
storage = TencentCosStorageService.from_env()
settings = MemoirImageSettings.from_env()
source_assets = normalize_image_assets(images)
if not settings.enabled:
source_assets = completed_image_assets(source_assets)
normalized_assets: list[dict] = []
for item in source_assets:
asset = dict(item)
normalized_url = normalize_cos_url(
asset.get("url"),
bucket=bucket,
region=region,
base_url=base_url,
)
storage_key = resolve_image_storage_key(asset)
if asset.get("status") == IMAGE_STATUS_COMPLETED and storage_key:
try:
asset["url"] = storage.get_download_url(storage_key)
except CosDownloadUrlError as exc:
logger.warning(
"章节图片签名失败: key=%s, retryable=%s, request_id=%s, error=%s",
storage_key, exc.retryable, exc.request_id, exc,
)
asset = mark_image_delivery_unavailable(asset)
except Exception as exc:
logger.warning("章节图片签名失败: key=%s, error=%s", storage_key, exc)
asset = mark_image_delivery_unavailable(asset)
else:
asset["url"] = normalized_url
asset.pop("storage_key", None)
normalized_assets.append(asset)
return normalized_assets
def _is_image_permanently_unavailable(rec) -> bool:
"""配图是否应清理:失败不可恢复,或 completed 但无 url/storage_key损坏数据"""
if not rec:
return False
status = getattr(rec, "status", None) or ""
retryable = getattr(rec, "retryable", None)
url = getattr(rec, "url", None)
storage_key = getattr(rec, "storage_key", None)
if status == IMAGE_STATUS_FAILED and retryable is False:
return True
if status == IMAGE_STATUS_COMPLETED and not url and not storage_key:
return True
return False
async def _cleanup_permanently_unavailable_images(ch: ChapterModel, db: AsyncSession) -> None:
"""清理章节中永久不可用的配图section.image_id 置空,删除 memoir_images 记录"""
sections = getattr(ch, "sections", None) or []
cleaned = False
for s in sections:
rec = getattr(s, "image_record", None)
if rec and _is_image_permanently_unavailable(rec):
logger.info("清理不可用配图: chapter=%s, section=%s", ch.id, s.id)
s.image_id = None
await db.delete(rec)
cleaned = True
if cleaned:
await db.commit()
await db.refresh(ch)
def _section_image_to_dict(section) -> dict | None:
"""从 section.image_id 关联的 memoir_imagesimage_record取配图。"""
if getattr(section, "image_record", None):
return memoir_image_to_dict(section.image_record)
return None
def _chapter_cover_to_dict(ch) -> dict | None:
"""优先从 memoir_images 表section_id 为空的一条)取封面,否则回退到 chapter.cover_image JSON。"""
images = getattr(ch, "images", None) or []
for m in images:
if getattr(m, "section_id", None) is None:
return memoir_image_to_dict(m)
if getattr(ch, "cover_image", None) and isinstance(ch.cover_image, dict):
return ch.cover_image
return None
def _sections_to_content_and_images(ch):
"""
从 chapter.sections 按 order_index 顺序拼出 content 与 images保证每段文字与配图一一对应。
客户端依赖 content 中的占位符(与 images 中每项的 placeholder 一致)来切分正文并插入图片。
"""
sections = getattr(ch, "sections", None) or []
ordered = sorted(sections, key=lambda s: getattr(s, "order_index", 0))
parts = []
images = []
for s in ordered:
text = (s.content or "").strip()
if text:
parts.append(text)
img = _section_image_to_dict(s)
if img:
images.append(img)
placeholder = (img.get("placeholder") or "").strip()
if placeholder:
parts.append(placeholder)
content = "\n\n".join(parts) if parts else ""
return content, images
def _chapter_to_dict(ch: ChapterModel) -> dict:
content, images_list = _sections_to_content_and_images(ch)
normalized_images = _normalize_image_assets(images_list)
cover = _chapter_cover_to_dict(ch)
cover_normalized = _normalize_image_assets([cover] if cover else [])[0] if cover else None
sections_data = []
if getattr(ch, "sections", None):
for s in sorted(ch.sections, key=lambda x: getattr(x, "order_index", 0)):
sec_img = _section_image_to_dict(s)
sec_img = _normalize_image_assets([sec_img] if sec_img else [])[0] if sec_img else None
sections_data.append({"content": (s.content or "").strip(), "image": sec_img})
return {
"id": ch.id,
"title": ch.title,
"content": content,
"order_index": ch.order_index,
"status": ch.status,
"category": ch.category,
"images": normalized_images,
"cover_image": cover_normalized,
"sections": sections_data,
"updated_at": ch.updated_at.isoformat() if ch.updated_at else None,
"is_new": ch.is_new,
"source_segments": ch.source_segments or [],
}
@router.get("", response_model=List[dict])
async def get_chapters(
current_user: UserModel = Depends(get_current_user),
is_new: Optional[bool] = Query(None, description="仅返回未读章节"),
db: AsyncSession = Depends(get_async_db)
):
"""
获取用户所有章节(需要认证,仅返回 active 章节)。
始终返回全部 8 个预定义类别,没有内容的类别用占位符返回。
"""
stmt = (
select(ChapterModel)
.where(
ChapterModel.user_id == current_user.id,
ChapterModel.is_active == True,
)
.options(
joinedload(ChapterModel.sections),
joinedload(ChapterModel.images),
joinedload(ChapterModel.sections).joinedload(ChapterSection.image_record),
)
.order_by(ChapterModel.order_index)
)
if is_new is True:
stmt = stmt.where(ChapterModel.is_new == True)
result = await db.execute(stmt)
chapters = result.unique().scalars().all()
chapter_by_category: dict[str, ChapterModel] = {}
for ch in chapters:
if ch.category and ch.category not in chapter_by_category:
chapter_by_category[ch.category] = ch
all_chapters: List[dict] = []
for category in CHAPTER_ORDER:
ch = chapter_by_category.pop(category, None)
if ch:
await _cleanup_permanently_unavailable_images(ch, db)
all_chapters.append(_chapter_to_dict(ch))
else:
if is_new is True:
continue
all_chapters.append({
"id": f"placeholder_{category}",
"title": CHAPTER_CATEGORIES[category],
"content": "",
"order_index": STAGE_TO_ORDER.get(category, 999),
"status": "empty",
"category": category,
"images": [],
"cover_image": None,
"sections": [],
"updated_at": None,
"is_new": False,
"source_segments": [],
})
for ch in chapter_by_category.values():
await _cleanup_permanently_unavailable_images(ch, db)
all_chapters.append(_chapter_to_dict(ch))
return all_chapters
@router.get("/{chapter_id}", response_model=dict)
async def get_chapter(
chapter_id: str,
current_user: UserModel = Depends(get_current_user),
db: AsyncSession = Depends(get_async_db)
):
"""获取章节详情(需要认证,只能访问自己的章节)"""
stmt = (
select(ChapterModel)
.where(ChapterModel.id == chapter_id)
.options(
joinedload(ChapterModel.sections),
joinedload(ChapterModel.images),
joinedload(ChapterModel.sections).joinedload(ChapterSection.image_record),
)
)
result = await db.execute(stmt)
chapter = result.unique().scalar_one_or_none()
if not chapter:
raise HTTPException(status_code=404, detail="Chapter not found")
if chapter.user_id != current_user.id:
raise HTTPException(status_code=403, detail="无权访问此章节")
await _cleanup_permanently_unavailable_images(chapter, db)
return _chapter_to_dict(chapter)
@router.delete("/{chapter_id}")
async def disable_chapter(
chapter_id: str,
current_user: UserModel = Depends(get_current_user),
db: AsyncSession = Depends(get_async_db)
):
"""清除章节(将章节标记为 disabled需要认证只能操作自己的章节"""
chapter = await db.get(ChapterModel, chapter_id)
if not chapter:
raise HTTPException(status_code=404, detail="Chapter not found")
# 验证用户权限
if chapter.user_id != current_user.id:
raise HTTPException(status_code=403, detail="无权操作此章节")
# 将章节标记为 disabled不物理删除
chapter.is_active = False
await db.commit()
return {"status": "ok", "message": "章节已清除"}
@router.post("/{chapter_id}/regenerate")
async def regenerate_chapter(
chapter_id: str,
current_user: UserModel = Depends(get_current_user),
db: AsyncSession = Depends(get_async_db)
):
"""重新整理章节(需要认证,只能操作自己的章节)"""
chapter = await db.get(ChapterModel, chapter_id)
if not chapter:
raise HTTPException(status_code=404, detail="Chapter not found")
# 验证用户权限
if chapter.user_id != current_user.id:
raise HTTPException(status_code=403, detail="无权操作此章节")
# TODO: 实现重新整理逻辑
return {"status": "ok", "message": "Chapter regeneration triggered"}