* refactor: 表结构重构,新增段落section和图片image新表 * fix: fix android app import error * refactor: 重构文件名 * fix: 优化提示词 * fix: 消息气泡显示位置异常问题 --------- Co-authored-by: yangshilin <2157598560@qq.com>
211 lines
7.7 KiB
Python
211 lines
7.7 KiB
Python
"""
|
||
PDF generation service: renders a book and its chapters (text and images) into a PDF with ReportLab.
|
||
"""
|
||
import logging
|
||
from io import BytesIO
|
||
from typing import List
|
||
|
||
import httpx
|
||
from PIL import Image
|
||
from reportlab.lib.pagesizes import A4
|
||
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
|
||
from reportlab.lib.units import inch
|
||
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, PageBreak, Image as ReportLabImage
|
||
from reportlab.pdfbase import pdfmetrics
|
||
from reportlab.pdfbase.cidfonts import UnicodeCIDFont
|
||
from services.memoir_images.serializers import memoir_image_to_dict
|
||
from services.memoir_images.parser import PLACEHOLDER_RE
|
||
from services.memoir_images.schema import IMAGE_STATUS_COMPLETED, normalize_image_assets
|
||
from services.memoir_images.storage import (
|
||
CosDownloadUrlError,
|
||
TencentCosStorageService,
|
||
mark_image_delivery_unavailable,
|
||
resolve_image_storage_key,
|
||
)
|
||
|
||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def strip_image_placeholders(text: str) -> str:
    """Remove every ``PLACEHOLDER_RE`` match from ``text`` and trim the result.

    A falsy ``text`` (``None`` or ``""``) is treated as the empty string, so
    the function always returns a ``str``.
    """
    source = text if text else ""
    without_placeholders = PLACEHOLDER_RE.sub("", source)
    return without_placeholders.strip()
|
||
|
||
|
||
def split_content_blocks(content: str, images: list[dict]) -> list[dict]:
    """Split chapter text into ordered text/image blocks at image placeholders.

    Images are visited in ascending ``index`` order. For each image whose
    ``placeholder`` string occurs in the not-yet-consumed text, the text is
    split at the first occurrence: the part before it becomes a text block
    (when non-empty after placeholder stripping), and an image block follows
    if the image has status ``IMAGE_STATUS_COMPLETED`` and a truthy ``url``.
    Whatever text is left after the last split becomes a final text block.

    Args:
        content: Raw chapter text. ``None`` is treated as the empty string.
        images: Image dicts; the keys read here are ``index``,
            ``placeholder``, ``status`` and ``url``. ``None`` is treated as
            an empty list.

    Returns:
        A list of ``{"type": "text", "value": str}`` and
        ``{"type": "image", "url": str}`` dicts in reading order.
    """
    blocks: list[dict] = []
    # Normalize None up front: the ``in`` test below raises TypeError on None,
    # while strip_image_placeholders already accepts it.
    remaining = content or ""

    # ``or 0`` also covers an explicit ``"index": None``, which would otherwise
    # make the sort fail when compared with integer indexes.
    ordered_images = sorted(images or [], key=lambda item: item.get("index") or 0)

    for image in ordered_images:
        placeholder = image.get("placeholder")
        if not placeholder or placeholder not in remaining:
            continue

        before, remaining = remaining.split(placeholder, 1)

        # Any other placeholders left in the leading text are dropped here.
        cleaned_before = strip_image_placeholders(before)
        if cleaned_before:
            blocks.append({"type": "text", "value": cleaned_before})

        # Images that are not completed (or have no URL) are silently skipped;
        # their placeholder has still been consumed from the text.
        if image.get("status") == IMAGE_STATUS_COMPLETED and image.get("url"):
            blocks.append({"type": "image", "url": image["url"]})

    cleaned_remaining = strip_image_placeholders(remaining)
    if cleaned_remaining:
        blocks.append({"type": "text", "value": cleaned_remaining})

    return blocks
|
||
|
||
|
||
def sections_to_blocks(sections: list, prepare_fn=None) -> list[dict]:
    """
    Build the PDF block list from a chapter's sections.

    Sections are handled in ascending ``order_index`` (a missing attribute
    counts as 0). Each section contributes its stripped ``content`` as a text
    block when non-empty, followed by at most one image block.

    ``prepare_fn(images)`` is called with a one-element list holding the
    section's image dict and is used to resolve the signed URL; the ``url`` of
    the first returned entry is used. Defaults to ``_prepare_pdf_image_assets``.
    """
    resolve_assets = _prepare_pdf_image_assets if prepare_fn is None else prepare_fn

    ordered = list(sections)
    ordered.sort(key=lambda entry: getattr(entry, "order_index", 0))

    result: list[dict] = []
    for entry in ordered:
        text = getattr(entry, "content", None) or ""
        text = text.strip()
        if text:
            result.append({"type": "text", "value": text})

        # No image record on this section: only text is emitted.
        record = getattr(entry, "image_record", None)
        if not record:
            continue

        image_dict = memoir_image_to_dict(record)
        if not image_dict:
            continue

        prepared = resolve_assets([image_dict])
        if not prepared:
            continue

        url = prepared[0].get("url")
        if url:
            result.append({"type": "image", "url": url})

    return result
|
||
|
||
|
||
def _prepare_pdf_image_assets(images: list[dict]) -> list[dict]:
    """Return copies of the normalized image assets with download URLs resolved.

    For every asset whose status is ``IMAGE_STATUS_COMPLETED`` and for which a
    storage key can be resolved, a download URL is requested from the COS
    storage service and stored under ``"url"``. If that request raises, the
    failure is logged and the asset is replaced by the result of
    ``mark_image_delivery_unavailable``. All other assets are passed through
    as shallow copies.
    """
    cos = TencentCosStorageService.from_env()
    results: list[dict] = []

    for raw in normalize_image_assets(images):
        entry = dict(raw)  # copy so the caller's dict is not mutated
        key = resolve_image_storage_key(entry)

        # Nothing to sign for unfinished images or images without a key.
        if entry.get("status") != IMAGE_STATUS_COMPLETED or not key:
            results.append(entry)
            continue

        try:
            entry["url"] = cos.get_download_url(key)
        except CosDownloadUrlError as err:
            # COS-specific failure: log the extra diagnostics it carries.
            logger.warning(
                "PDF 图片签名失败: key=%s, retryable=%s, request_id=%s, error=%s",
                key, err.retryable, err.request_id, err,
            )
            entry = mark_image_delivery_unavailable(entry)
        except Exception as err:
            # Best-effort: any other failure also degrades to "unavailable".
            logger.warning("PDF 图片签名失败: key=%s, error=%s", key, err)
            entry = mark_image_delivery_unavailable(entry)

        results.append(entry)

    return results
|
||
|
||
|
||
def _fit_image_size(image_bytes: bytes, max_width: float, max_height: float) -> tuple[float, float]:
    """Compute a (width, height) that fits the image inside the given box.

    The image's pixel size is read with PIL and scaled uniformly (aspect ratio
    preserved) by the largest factor that keeps both sides within
    ``max_width`` x ``max_height``. The factor may be greater than 1, so small
    images are enlarged. If the decoded size is not positive, the box itself
    is returned.
    """
    with Image.open(BytesIO(image_bytes)) as picture:
        px_width, px_height = picture.size

    if not (px_width > 0 and px_height > 0):
        return max_width, max_height

    width_ratio = max_width / px_width
    height_ratio = max_height / px_height
    ratio = min(width_ratio, height_ratio)
    return px_width * ratio, px_height * ratio
|
||
|
||
|
||
class PDFService:
|
||
"""PDF 生成服务"""
|
||
|
||
def __init__(self):
|
||
try:
|
||
pdfmetrics.registerFont(UnicodeCIDFont('STSong-Light'))
|
||
self.chinese_font = 'STSong-Light'
|
||
except Exception:
|
||
self.chinese_font = 'Helvetica'
|
||
|
||
async def _fetch_image_bytes(self, url: str) -> bytes | None:
|
||
try:
|
||
async with httpx.AsyncClient(timeout=30) as client:
|
||
response = await client.get(url)
|
||
response.raise_for_status()
|
||
return response.content
|
||
except Exception as exc:
|
||
logger.warning(f"PDF 图片下载失败: url={url}, error={exc}")
|
||
return None
|
||
|
||
async def generate_pdf(self, book, chapters: List) -> bytes:
|
||
buffer = BytesIO()
|
||
doc = SimpleDocTemplate(buffer, pagesize=A4)
|
||
|
||
styles = getSampleStyleSheet()
|
||
title_style = ParagraphStyle(
|
||
'CustomTitle',
|
||
parent=styles['Heading1'],
|
||
fontSize=24,
|
||
spaceAfter=30,
|
||
alignment=1,
|
||
fontName=self.chinese_font
|
||
)
|
||
|
||
heading_style = ParagraphStyle(
|
||
'CustomHeading',
|
||
parent=styles['Heading1'],
|
||
fontSize=18,
|
||
spaceAfter=12,
|
||
fontName=self.chinese_font
|
||
)
|
||
|
||
normal_style = ParagraphStyle(
|
||
'CustomNormal',
|
||
parent=styles['Normal'],
|
||
fontSize=12,
|
||
leading=18,
|
||
fontName=self.chinese_font
|
||
)
|
||
|
||
story = []
|
||
|
||
story.append(Paragraph(book.title, title_style))
|
||
story.append(Spacer(1, 0.5 * inch))
|
||
story.append(PageBreak())
|
||
|
||
story.append(Paragraph("目录", heading_style))
|
||
story.append(Spacer(1, 0.2 * inch))
|
||
for i, chapter in enumerate(chapters, 1):
|
||
story.append(Paragraph(f"{i}. {chapter.title}", normal_style))
|
||
story.append(PageBreak())
|
||
|
||
for chapter in chapters:
|
||
story.append(Paragraph(chapter.title, heading_style))
|
||
story.append(Spacer(1, 0.2 * inch))
|
||
|
||
sections = getattr(chapter, "sections", None) or []
|
||
if sections:
|
||
blocks = sections_to_blocks(sections)
|
||
else:
|
||
images = _prepare_pdf_image_assets(getattr(chapter, "images", None) or [])
|
||
blocks = split_content_blocks(getattr(chapter, "content", "") or "", images)
|
||
|
||
for block in blocks:
|
||
if block["type"] == "text":
|
||
paragraphs = block["value"].split('\n\n')
|
||
for para in paragraphs:
|
||
if para.strip():
|
||
story.append(Paragraph(para.strip(), normal_style))
|
||
story.append(Spacer(1, 0.1 * inch))
|
||
elif block["type"] == "image":
|
||
image_bytes = await self._fetch_image_bytes(block["url"])
|
||
if image_bytes:
|
||
try:
|
||
width, height = _fit_image_size(
|
||
image_bytes,
|
||
max_width=5 * inch,
|
||
max_height=3.75 * inch,
|
||
)
|
||
img = ReportLabImage(BytesIO(image_bytes), width=width, height=height)
|
||
story.append(img)
|
||
story.append(Spacer(1, 0.2 * inch))
|
||
except Exception as exc:
|
||
logger.warning(f"PDF 图片嵌入失败: {exc}")
|
||
|
||
story.append(PageBreak())
|
||
|
||
doc.build(story)
|
||
buffer.seek(0)
|
||
return buffer.read()
|
||
|
||
|
||
# Shared module-level PDFService instance, created when this module is imported.
pdf_service = PDFService()
|