212 lines
7.8 KiB
Python
212 lines
7.8 KiB
Python
"""
|
||
PDF 生成服务(从 services 迁入 memoir feature)
|
||
"""
|
||
from app.core.logging import get_logger
|
||
from io import BytesIO
|
||
from typing import List
|
||
|
||
import httpx
|
||
from PIL import Image
|
||
from reportlab.lib.pagesizes import A4
|
||
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
|
||
from reportlab.lib.units import inch
|
||
from reportlab.platypus import (
|
||
Image as ReportLabImage,
|
||
PageBreak,
|
||
Paragraph,
|
||
SimpleDocTemplate,
|
||
Spacer,
|
||
)
|
||
from reportlab.pdfbase import pdfmetrics
|
||
from reportlab.pdfbase.cidfonts import UnicodeCIDFont
|
||
|
||
from app.features.memoir.memoir_images.parser import PLACEHOLDER_RE
|
||
from app.features.memoir.memoir_images.schema import (
|
||
IMAGE_STATUS_COMPLETED,
|
||
normalize_image_assets,
|
||
)
|
||
from app.features.memoir.memoir_images.serializers import memoir_image_to_dict
|
||
from app.features.memoir.memoir_images.storage import (
|
||
CosDownloadUrlError,
|
||
TencentCosStorageService,
|
||
mark_image_delivery_unavailable,
|
||
resolve_image_storage_key,
|
||
)
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
|
||
def strip_image_placeholders(text: str) -> str:
|
||
return PLACEHOLDER_RE.sub("", text or "").strip()
|
||
|
||
|
||
def split_content_blocks(content: str, images: list[dict]) -> list[dict]:
|
||
blocks: list[dict] = []
|
||
remaining = content
|
||
for image in sorted(images or [], key=lambda item: item.get("index", 0)):
|
||
placeholder = image.get("placeholder")
|
||
if not placeholder or placeholder not in remaining:
|
||
continue
|
||
before, remaining = remaining.split(placeholder, 1)
|
||
cleaned_before = strip_image_placeholders(before)
|
||
if cleaned_before:
|
||
blocks.append({"type": "text", "value": cleaned_before})
|
||
if image.get("status") == IMAGE_STATUS_COMPLETED and image.get("url"):
|
||
blocks.append({"type": "image", "url": image["url"]})
|
||
cleaned_remaining = strip_image_placeholders(remaining)
|
||
if cleaned_remaining:
|
||
blocks.append({"type": "text", "value": cleaned_remaining})
|
||
return blocks
|
||
|
||
|
||
def sections_to_blocks(sections: list, prepare_fn=None) -> list[dict]:
|
||
if prepare_fn is None:
|
||
prepare_fn = _prepare_pdf_image_assets
|
||
blocks: list[dict] = []
|
||
for section in sorted(sections, key=lambda s: getattr(s, "order_index", 0)):
|
||
content = (getattr(section, "content", None) or "").strip()
|
||
if content:
|
||
blocks.append({"type": "text", "value": content})
|
||
img = None
|
||
if getattr(section, "image_record", None):
|
||
img = memoir_image_to_dict(section.image_record)
|
||
if img:
|
||
prepared = prepare_fn([img])
|
||
if prepared and prepared[0].get("url"):
|
||
blocks.append({"type": "image", "url": prepared[0]["url"]})
|
||
return blocks
|
||
|
||
|
||
def _prepare_pdf_image_assets(images: list[dict]) -> list[dict]:
|
||
storage = TencentCosStorageService.from_env()
|
||
prepared_assets: list[dict] = []
|
||
for item in normalize_image_assets(images):
|
||
asset = dict(item)
|
||
storage_key = resolve_image_storage_key(asset)
|
||
if asset.get("status") == IMAGE_STATUS_COMPLETED and storage_key:
|
||
try:
|
||
asset["url"] = storage.get_download_url(storage_key)
|
||
except CosDownloadUrlError as exc:
|
||
logger.warning(
|
||
"PDF 图片签名失败: key=%s, retryable=%s, request_id=%s, error=%s",
|
||
storage_key,
|
||
exc.retryable,
|
||
exc.request_id,
|
||
exc,
|
||
)
|
||
asset = mark_image_delivery_unavailable(asset)
|
||
except Exception as exc:
|
||
logger.warning("PDF 图片签名失败: key=%s, error=%s", storage_key, exc)
|
||
asset = mark_image_delivery_unavailable(asset)
|
||
prepared_assets.append(asset)
|
||
return prepared_assets
|
||
|
||
|
||
def _fit_image_size(
|
||
image_bytes: bytes, max_width: float, max_height: float
|
||
) -> tuple[float, float]:
|
||
with Image.open(BytesIO(image_bytes)) as image:
|
||
width, height = image.size
|
||
if width <= 0 or height <= 0:
|
||
return max_width, max_height
|
||
scale = min(max_width / width, max_height / height)
|
||
return width * scale, height * scale
|
||
|
||
|
||
class PDFService:
|
||
def __init__(self):
|
||
try:
|
||
pdfmetrics.registerFont(UnicodeCIDFont("STSong-Light"))
|
||
self.chinese_font = "STSong-Light"
|
||
except Exception:
|
||
self.chinese_font = "Helvetica"
|
||
|
||
async def _fetch_image_bytes(self, url: str) -> bytes | None:
|
||
try:
|
||
async with httpx.AsyncClient(timeout=30) as client:
|
||
response = await client.get(url)
|
||
response.raise_for_status()
|
||
return response.content
|
||
except Exception as exc:
|
||
logger.warning("PDF 图片下载失败: url=%s, error=%s", url, exc)
|
||
return None
|
||
|
||
async def generate_pdf(self, book, chapters: List) -> bytes:
|
||
buffer = BytesIO()
|
||
doc = SimpleDocTemplate(buffer, pagesize=A4)
|
||
styles = getSampleStyleSheet()
|
||
title_style = ParagraphStyle(
|
||
"CustomTitle",
|
||
parent=styles["Heading1"],
|
||
fontSize=24,
|
||
spaceAfter=30,
|
||
alignment=1,
|
||
fontName=self.chinese_font,
|
||
)
|
||
heading_style = ParagraphStyle(
|
||
"CustomHeading",
|
||
parent=styles["Heading1"],
|
||
fontSize=18,
|
||
spaceAfter=12,
|
||
fontName=self.chinese_font,
|
||
)
|
||
normal_style = ParagraphStyle(
|
||
"CustomNormal",
|
||
parent=styles["Normal"],
|
||
fontSize=12,
|
||
leading=18,
|
||
fontName=self.chinese_font,
|
||
)
|
||
story = []
|
||
story.append(Paragraph(book.title, title_style))
|
||
story.append(Spacer(1, 0.5 * inch))
|
||
story.append(PageBreak())
|
||
story.append(Paragraph("目录", heading_style))
|
||
story.append(Spacer(1, 0.2 * inch))
|
||
for i, chapter in enumerate(chapters, 1):
|
||
story.append(Paragraph(f"{i}. {chapter.title}", normal_style))
|
||
story.append(PageBreak())
|
||
for chapter in chapters:
|
||
story.append(Paragraph(chapter.title, heading_style))
|
||
story.append(Spacer(1, 0.2 * inch))
|
||
sections = getattr(chapter, "sections", None) or []
|
||
if sections:
|
||
blocks = sections_to_blocks(sections)
|
||
else:
|
||
images = _prepare_pdf_image_assets(
|
||
getattr(chapter, "images", None) or []
|
||
)
|
||
blocks = split_content_blocks(
|
||
getattr(chapter, "content", "") or "", images
|
||
)
|
||
for block in blocks:
|
||
if block["type"] == "text":
|
||
paragraphs = block["value"].split("\n\n")
|
||
for para in paragraphs:
|
||
if para.strip():
|
||
story.append(Paragraph(para.strip(), normal_style))
|
||
story.append(Spacer(1, 0.1 * inch))
|
||
elif block["type"] == "image":
|
||
image_bytes = await self._fetch_image_bytes(block["url"])
|
||
if image_bytes:
|
||
try:
|
||
width, height = _fit_image_size(
|
||
image_bytes,
|
||
max_width=5 * inch,
|
||
max_height=3.75 * inch,
|
||
)
|
||
img = ReportLabImage(
|
||
BytesIO(image_bytes), width=width, height=height
|
||
)
|
||
story.append(img)
|
||
story.append(Spacer(1, 0.2 * inch))
|
||
except Exception as exc:
|
||
logger.warning("PDF 图片嵌入失败: %s", exc)
|
||
story.append(PageBreak())
|
||
doc.build(story)
|
||
buffer.seek(0)
|
||
return buffer.read()
|
||
|
||
|
||
pdf_service = PDFService()
|