1076 lines
43 KiB
Python
1076 lines
43 KiB
Python
"""
|
||
回忆录处理 Celery 任务
|
||
"""
|
||
import json
|
||
from app.core.logging import get_logger
|
||
import uuid
|
||
from io import BytesIO
|
||
from typing import Dict, List
|
||
from datetime import datetime, timezone
|
||
|
||
import redis
|
||
from celery import shared_task
|
||
from PIL import Image
|
||
from sqlalchemy import delete, select
|
||
from sqlalchemy.orm import Session, joinedload
|
||
|
||
from app.core.db import get_sync_db
|
||
from app.features.conversation.models import Segment
|
||
from app.features.memoir.models import (
|
||
Book,
|
||
Chapter,
|
||
ChapterSection,
|
||
MemoirImage,
|
||
MemoirState,
|
||
)
|
||
from app.features.user.models import User
|
||
from app.core.dependencies import get_llm_provider
|
||
from app.agents.state_schema import MemoirStateSchema, SlotData, default_state
|
||
from app.agents.memoir.prompts import (
|
||
STAGE_TO_ORDER,
|
||
get_narrative_json_prompt,
|
||
inject_image_placeholder_template,
|
||
)
|
||
from app.agents.memoir import MemoirOrchestrator
|
||
from app.agents.memoir.narrative_agent import NarrativeAgent
|
||
from app.agents.memoir.placeholder_agent import inject_placeholders
|
||
from app.agents.chat.prompts_profile import format_user_profile_context
|
||
from app.features.memoir.memoir_images.parser import (
|
||
build_initial_image_assets,
|
||
parse_image_placeholders,
|
||
parse_narrative_to_sections,
|
||
split_narrative_to_sections,
|
||
)
|
||
import hashlib
|
||
from app.core.dependencies import get_image_generator
|
||
from app.agents.image_prompt import ImagePromptOrchestrator
|
||
from app.features.memoir.memoir_images.prompting import MemoirImagePromptService
|
||
from app.features.memoir.memoir_images.schema import (
|
||
completed_image_assets,
|
||
IMAGE_STATUS_COMPLETED,
|
||
IMAGE_STATUS_FAILED,
|
||
IMAGE_STATUS_PENDING,
|
||
IMAGE_STATUS_PROCESSING,
|
||
normalize_image_assets,
|
||
)
|
||
from app.features.memoir.memoir_images.serializers import (
|
||
image_dict_to_row_kwargs,
|
||
memoir_image_to_dict,
|
||
)
|
||
from app.features.memoir.memoir_images.settings import MemoirImageSettings
|
||
from app.ports.image_gen import TaskStatus
|
||
from app.features.memoir.memoir_images.storage import (
|
||
TencentCosStorageService,
|
||
CosUploadError,
|
||
)
|
||
|
||
# Module-level logger shared by every task and helper in this file.
logger = get_logger(__name__)

# Per-process cache of Redis clients keyed by the ``decode_responses`` flag;
# populated lazily by ``_get_redis_client`` so each mode reuses one client.
_REDIS_CLIENTS: dict[bool, redis.Redis] = {}
|
||
|
||
|
||
def _is_json_narrative(text: str) -> bool:
|
||
"""检测 narrative 是否为 JSON 格式(paragraphs 结构)"""
|
||
if not text or not text.strip():
|
||
return False
|
||
s = text.strip()
|
||
return s.startswith("{") and "paragraphs" in s
|
||
|
||
|
||
def _get_llm():
    """Return the LangChain LLM exposed by the configured LLM provider, or None.

    Intended for use inside Celery tasks. Returns None when the provider has
    no ``langchain_llm`` attribute or when obtaining it raises, so callers
    must handle a missing LLM.
    """
    try:
        return getattr(get_llm_provider(), "langchain_llm", None)
    except Exception as exc:  # best-effort: tasks continue without an LLM
        # Previously swallowed silently, which hid provider misconfiguration.
        logger.warning("获取 LLM 失败,以无 LLM 模式继续: %s", exc)
        return None
|
||
|
||
|
||
def _get_redis_client(*, decode_responses: bool = False) -> redis.Redis:
    """Return this process's cached Redis client for the given decoding mode.

    A client is created from ``settings.redis_url`` the first time a given
    ``decode_responses`` value is requested and is reused afterwards.

    Args:
        decode_responses: Whether the client decodes replies to ``str``.
    """
    try:
        return _REDIS_CLIENTS[decode_responses]
    except KeyError:
        pass

    # Deferred import (presumably to avoid an import cycle at load time).
    from app.core.config import settings

    fresh_client = redis.from_url(settings.redis_url, decode_responses=decode_responses)
    _REDIS_CLIENTS[decode_responses] = fresh_client
    return fresh_client
|
||
|
||
|
||
def _acquire_chapter_lock(user_id: str, stage: str, timeout: int = 120) -> bool:
    """Try to take the distributed lock guarding writes to one user's stage chapter.

    Uses Redis ``SET lock:chapter:{user_id}:{stage} 1 NX EX timeout``, so the
    lock auto-expires after *timeout* seconds.

    Returns:
        True if the lock was acquired, False if someone else holds it.
    """
    lock_key = f"lock:chapter:{user_id}:{stage}"
    # redis-py returns True on success and None when NX prevents the write;
    # coerce so the declared ``-> bool`` contract actually holds.
    return bool(_get_redis_client().set(lock_key, "1", nx=True, ex=timeout))
|
||
|
||
|
||
def _release_chapter_lock(user_id: str, stage: str):
    """Release the per-(user, stage) chapter lock by deleting its Redis key.

    NOTE(review): the delete is unconditional. If this holder's lock already
    expired and another worker re-acquired it, this removes the other
    worker's lock; a token-checked release would be needed to prevent that.
    """
    _get_redis_client().delete(f"lock:chapter:{user_id}:{stage}")
|
||
|
||
|
||
def _acquire_chapter_image_lock(chapter_id: str, timeout: int = 600) -> bool:
    """Try to take the lock that prevents two image-generation runs for one chapter.

    Uses Redis ``SET lock:chapter-images:{chapter_id} 1 NX EX timeout``; the
    lock auto-expires after *timeout* seconds.

    Returns:
        True if the lock was acquired, False if it is already held.
    """
    lock_key = f"lock:chapter-images:{chapter_id}"
    # redis-py returns True or None here; normalise to a real bool.
    return bool(_get_redis_client().set(lock_key, "1", nx=True, ex=timeout))
|
||
|
||
|
||
def _release_chapter_image_lock(chapter_id: str):
    """Release a chapter's image-generation lock by deleting its Redis key.

    NOTE(review): deletion is unconditional, so an expired-and-reacquired lock
    held by another worker would also be removed.
    """
    _get_redis_client().delete(f"lock:chapter-images:{chapter_id}")
|
||
|
||
|
||
def _update_task_status_sync(user_id: str, task_id: str, status: str, result: Dict = None):
    """Record a task's status in the per-user Redis task hash (synchronous, for Celery).

    The hash ``task:user:{user_id}:tasks`` maps task_id -> JSON task info.
    The existing entry (or a fresh ``{"task_id": ...}``) gets *status*, a UTC
    ``updated_at`` timestamp and, when given, *result*. The hash TTL is then
    reset to one hour. Any exception is logged and swallowed.
    """
    try:
        client = _get_redis_client(decode_responses=True)
        hash_key = f"task:user:{user_id}:tasks"

        stored = client.hget(hash_key, task_id)
        info = json.loads(stored) if stored else {"task_id": task_id}
        info.update(
            status=status,
            updated_at=datetime.now(timezone.utc).isoformat(),
        )
        if result is not None:
            info["result"] = result

        client.hset(hash_key, task_id, json.dumps(info))
        client.expire(hash_key, 3600)  # one-hour TTL on the whole hash

        logger.info(f"任务状态已更新: task_id={task_id}, status={status}")
    except Exception as e:
        logger.error(f"更新任务状态失败: {e}")
|
||
|
||
|
||
def _merge_chapter_image_assets(
    existing_images: list[dict] | None,
    placeholders: list[dict],
    provider: str,
    style: str,
    size: str,
    now_iso: str,
) -> list[dict]:
    """Merge freshly parsed placeholders with a chapter's stored image assets.

    For every placeholder, in order:

    * If a stored asset has the same ``placeholder`` string, a copy of it is
      reused:
      * ``index``, ``placeholder`` and ``description`` come from the new entry;
      * missing ``provider``/``style``/``size``/``created_at``/``updated_at``
        are filled from the arguments;
      * an asset marked completed but lacking both ``storage_key`` and ``url``
        is downgraded to failed.
    * Otherwise a new initial asset is built with ``build_initial_image_assets``.

    Stored assets whose placeholder no longer appears are dropped.
    """
    by_placeholder: dict = {}
    for stored in normalize_image_assets(existing_images):
        key = stored.get("placeholder")
        if key:
            by_placeholder[key] = dict(stored)

    fallbacks = (
        ("provider", provider),
        ("style", style),
        ("size", size),
        ("created_at", now_iso),
        ("updated_at", now_iso),
    )

    def _reuse(previous: dict, entry: dict) -> dict:
        # Copy so the lookup table is never mutated.
        asset = dict(previous)
        asset.update(
            index=entry["index"],
            placeholder=entry["placeholder"],
            description=entry["description"],
        )
        for field, default in fallbacks:
            asset[field] = asset.get(field) or default
        has_location = asset.get("storage_key") or asset.get("url")
        if asset.get("status") == IMAGE_STATUS_COMPLETED and not has_location:
            asset["status"] = IMAGE_STATUS_FAILED
            asset["error"] = asset.get("error") or "missing image url"
        return asset

    result: list[dict] = []
    for entry in placeholders:
        previous = by_placeholder.get(entry["placeholder"])
        if previous:
            result.append(_reuse(previous, entry))
            continue
        fresh = build_initial_image_assets(
            placeholders=[entry],
            provider=provider,
            style=style,
            size=size,
            now_iso=now_iso,
        )
        result.append(fresh[0])
    return result
|
||
|
||
|
||
def chapter_has_images_to_generate(images: list[dict] | None) -> bool:
    """Return True if any (normalized) image asset is still pending or has failed."""
    actionable = {IMAGE_STATUS_PENDING, IMAGE_STATUS_FAILED}
    for asset in normalize_image_assets(images):
        if asset.get("status") in actionable:
            return True
    return False
|
||
|
||
|
||
def _memoir_image_from_asset(
    chapter_id: str,
    section_id: str | None,
    order_index: int,
    image_asset: dict,
) -> MemoirImage:
    """Build an unsaved ``MemoirImage`` row (memoir_images table) from one image dict.

    Column values come from ``image_dict_to_row_kwargs(image_asset)``. The row
    gets a fresh 32-character hex id and is attached to *chapter_id* /
    *section_id* (callers in this file pass None for a chapter cover) at
    position *order_index*. Adding it to a session is left to the caller.
    """
    row_fields = image_dict_to_row_kwargs(image_asset)
    # uuid4().hex == str(uuid4()) without dashes: 32 lowercase hex characters.
    row_id = uuid.uuid4().hex
    return MemoirImage(
        id=row_id,
        chapter_id=chapter_id,
        section_id=section_id,
        order_index=order_index,
        **row_fields,
    )
|
||
|
||
|
||
def _section_has_image_to_generate(section) -> bool:
|
||
"""章节段落是否有待生成的配图(从 image_record / image_id 关联的 memoir_images 读取)。"""
|
||
r = getattr(section, "image_record", None)
|
||
if not r:
|
||
return False
|
||
status = (getattr(r, "status") or "").strip()
|
||
return status in (IMAGE_STATUS_PENDING, IMAGE_STATUS_FAILED)
|
||
|
||
|
||
def _chapter_has_any_section_images_to_generate(chapter) -> bool:
|
||
if not chapter or not getattr(chapter, "sections", None):
|
||
return False
|
||
return any(_section_has_image_to_generate(s) for s in chapter.sections)
|
||
|
||
|
||
def _chapter_has_cover_to_generate(chapter) -> bool:
|
||
"""章节是否有待生成的封面图(MemoirImage section_id=None 且 status 为 pending/failed)。"""
|
||
images = getattr(chapter, "images", None) or []
|
||
for m in images:
|
||
if getattr(m, "section_id", None) is None:
|
||
status = (getattr(m, "status") or "").strip()
|
||
return status in (IMAGE_STATUS_PENDING, IMAGE_STATUS_FAILED)
|
||
return False
|
||
|
||
|
||
def _get_cover_memoir_image(chapter):
|
||
"""获取章节封面 MemoirImage(section_id=None),若无可生成则返回 None。"""
|
||
images = getattr(chapter, "images", None) or []
|
||
for m in images:
|
||
if getattr(m, "section_id", None) is None:
|
||
return m
|
||
return None
|
||
|
||
|
||
def _select_placeholders_for_effective_max(
    placeholders: list[dict],
    existing_images: list[dict] | None,
    effective_max: int,
) -> list[dict]:
    """Choose which placeholders receive images under a per-chapter cap.

    Placeholders that already have a stored asset are always kept. New ones
    fill the remaining budget in document order. The budget is *effective_max*
    minus the number of already-known placeholders in *placeholders*, floored
    at 0. The result is re-indexed 0..n-1 via a fresh ``index`` key; the input
    dicts are not mutated.
    """
    known = set()
    for asset in normalize_image_assets(existing_images):
        ph = asset.get("placeholder")
        if ph:
            known.add(ph)

    already_known = sum(entry.get("placeholder") in known for entry in placeholders)
    budget = max(0, effective_max - already_known)

    chosen: list[dict] = []
    for entry in placeholders:
        if entry.get("placeholder") not in known:
            if budget <= 0:
                continue
            budget -= 1
        chosen.append(entry)

    return [dict(entry, index=position) for position, entry in enumerate(chosen)]
|
||
|
||
|
||
def _segment_should_have_image(seg: dict, order_idx: int) -> bool:
    """Decide whether a parsed narrative segment gets an illustration.

    Segments whose ``placeholder_info`` has a description always do. Otherwise
    (legacy format) every third section does, i.e. when ``order_idx % 3 == 2``.
    """
    ph = seg.get("placeholder_info")
    if ph and ph.get("description"):
        return True
    return (order_idx % 3) == 2


def _segment_placeholder(seg: dict) -> dict:
    """Return the placeholder dict used to seed a segment's image asset.

    Uses the segment's own ``placeholder_info`` when it has both a placeholder
    and a description. Otherwise a placeholder is synthesised whose
    description is the first 50 characters of the content (plus an ellipsis),
    or "章节配图" for empty content.
    """
    ph = seg.get("placeholder_info")
    if ph and ph.get("placeholder") and ph.get("description"):
        return ph
    content = (seg.get("content") or "").strip()
    desc = (content[:50] + "…") if len(content) > 50 else (content or "章节配图")
    return {"placeholder": f"{{{{{{{{IMAGE:{desc}}}}}}}}}", "description": desc}


def _ensure_chapter_cover_image(db: Session, chapter, style: str, img_settings, now_iso: str) -> None:
    """Create a pending cover ``MemoirImage`` (section_id=None, order_index=0) if the chapter has none."""
    existing_cover = (
        db.execute(
            select(MemoirImage).where(
                MemoirImage.chapter_id == chapter.id,
                MemoirImage.section_id.is_(None),
            )
        )
        .scalars()
        .first()  # tolerate duplicate covers instead of raising MultipleResultsFound
    )
    if existing_cover is not None:
        return
    # NOTE(review): this plain string has 8 literal braces per side, whereas
    # _segment_placeholder (an f-string) produces 4. It is kept unchanged
    # because stored placeholders may be matched elsewhere; confirm the format.
    cover_placeholder = {
        "placeholder": "{{{{{{{{IMAGE:章节封面}}}}}}}}",
        "description": "章节封面",
        "index": 0,
    }
    cover_asset = build_initial_image_assets(
        [cover_placeholder],
        img_settings.provider,
        style,
        img_settings.default_size,
        now_iso,
    )[0]
    db.add(_memoir_image_from_asset(chapter.id, None, 0, cover_asset))
    db.flush()


def _touch_saved_chapter(chapter, title: str, source_segments: list) -> None:
    """Apply the post-save chapter updates: title, ``is_new`` flag, merged source segments."""
    chapter.title = title
    chapter.is_new = True
    # Order-preserving de-duplication (a set made the stored order arbitrary).
    chapter.source_segments = list(
        dict.fromkeys([*(chapter.source_segments or []), *(source_segments or [])])
    )


def _save_narrative_to_sections(db: Session, chapter, narrative: str, title: str, category: str, order_index: int, source_segments: list, user_id: str):
    """Persist a narrative as ``ChapterSection`` rows for a chapter (append-only).

    When *chapter* is None, a new completed chapter is created first. If the
    chapter already has sections, only the text of *narrative* that follows
    the existing (stripped, ``"\\n\\n"``-joined) section text is parsed. It is
    appended after the highest existing ``order_index``. When *narrative* does
    not start with the existing text, the whole narrative is appended.
    Existing sections and images are never deleted.

    NOTE(review): a narrative that rewrites earlier text (e.g. a JSON
    narrative) will not match the prefix and is appended in full, which may
    duplicate content; confirm this is intended.

    When image generation is enabled:

    * a pending ``MemoirImage`` is created for each new section that should be
      illustrated (``order_index`` = section order + 1; 0 is reserved for the
      cover);
    * a pending cover image is created if the chapter has none.

    In every case the chapter's title is set, ``is_new`` becomes True and
    *source_segments* are merged in. Changes are flushed, not committed.

    Returns:
        The (possibly newly created) chapter.
    """
    narrative = narrative or ""  # a None narrative previously crashed on .startswith
    now_iso = datetime.now(timezone.utc).isoformat()

    if chapter is None:
        chapter = Chapter(
            id=str(uuid.uuid4()),
            user_id=user_id,
            title=title,
            order_index=order_index,
            status="completed",
            category=category,
            cover_image=None,
            is_new=True,
            source_segments=source_segments or [],
        )
        db.add(chapter)
        db.flush()

    # Existing sections are kept; only new content is appended.
    existing_sections = (
        db.execute(
            select(ChapterSection)
            .where(ChapterSection.chapter_id == chapter.id)
            .order_by(ChapterSection.order_index)
        )
        .scalars()
        .all()
    )
    if existing_sections:
        existing_content = "\n\n".join(
            (s.content or "").strip() for s in existing_sections if (s.content or "").strip()
        )
        if existing_content and narrative.startswith(existing_content):
            new_part = narrative[len(existing_content):].lstrip()
        else:
            new_part = narrative.strip()
        if not new_part:
            _touch_saved_chapter(chapter, title, source_segments)
            return chapter
        narrative_to_parse = new_part
        order_base = max(s.order_index for s in existing_sections) + 1
    else:
        narrative_to_parse = narrative.strip()
        order_base = 0

    img_settings = MemoirImageSettings.from_env()
    prompt_service = (
        MemoirImagePromptService(llm=None, settings=img_settings) if img_settings.enabled else None
    )
    # Same style for every image of this chapter (section images and cover).
    style = (
        prompt_service.CATEGORY_STYLE_MAP.get(category, img_settings.default_style)
        if prompt_service
        else img_settings.default_style
    )

    segments = parse_narrative_to_sections(narrative_to_parse)
    if not segments:
        # Unparseable narrative: store it as a single section without an image.
        db.add(
            ChapterSection(
                id=str(uuid.uuid4()),
                chapter_id=chapter.id,
                order_index=order_base,
                content=narrative_to_parse.strip(),
                image_id=None,
            )
        )
        db.flush()
    else:
        for offset, seg in enumerate(segments):
            order_idx = order_base + offset
            image_asset = None
            if img_settings.enabled and _segment_should_have_image(seg, order_idx):
                image_asset = build_initial_image_assets(
                    [_segment_placeholder(seg)],
                    img_settings.provider,
                    style,
                    img_settings.default_size,
                    now_iso,
                )[0]

            section = ChapterSection(
                id=str(uuid.uuid4()),
                chapter_id=chapter.id,
                order_index=order_idx,
                content=(seg.get("content") or "").strip(),
                image_id=None,
            )
            db.add(section)
            db.flush()

            if image_asset:
                # Bind this section's image: memoir_images.order_index = section order + 1 (0 = cover).
                image = _memoir_image_from_asset(chapter.id, section.id, order_idx + 1, image_asset)
                db.add(image)
                db.flush()
                section.image_id = image.id
                db.flush()

    if img_settings.enabled:
        _ensure_chapter_cover_image(db, chapter, style, img_settings, now_iso)

    _touch_saved_chapter(chapter, title, source_segments)
    return chapter
|
||
|
||
|
||
def initialize_chapter_images(_chapter):
    """Backward-compatible no-op kept for legacy callers.

    Section images are now initialised by ``_save_narrative_to_sections``, so
    this only logs a skip message and returns an empty list; ``_chapter`` is
    ignored.
    """
    skip_message = "initialize_chapter_images: 已由 _save_narrative_to_sections 处理 section 配图,跳过"
    logger.info(skip_message)
    return list()
|
||
|
||
|
||
def _normalize_image_bytes_for_storage(image_bytes: bytes) -> bytes:
    """Re-encode arbitrary image bytes as PNG for storage.

    Images in RGBA or LA mode are saved as-is, palette ("P") images are
    converted to RGBA, and every other mode is converted to RGB.
    """
    png_buffer = BytesIO()
    with Image.open(BytesIO(image_bytes)) as source:
        mode = source.mode
        if mode == "P":
            target = source.convert("RGBA")
        elif mode in ("RGBA", "LA"):
            target = source
        else:
            target = source.convert("RGB")
        target.save(png_buffer, format="PNG")
    return png_buffer.getvalue()
|
||
|
||
|
||
def _coerce_state(model: MemoirState) -> MemoirStateSchema:
    """Convert a ``MemoirState`` ORM row into a validated ``MemoirStateSchema``.

    Fallbacks:

    * a falsy ``stage_order`` is replaced by the default state's order;
    * a falsy ``covered_stages`` is replaced by ``[]``;
    * non-dict ``slots`` are replaced by the default slots.

    ``current_stage`` is passed through unchanged.
    """
    stage_order = model.stage_order
    if not stage_order:
        stage_order = default_state().stage_order

    slots = model.slots
    if not isinstance(slots, dict):
        slots = default_state().slots

    payload = {
        "stage_order": stage_order,
        "current_stage": model.current_stage,
        "covered_stages": model.covered_stages or [],
        "slots": slots,
    }
    return MemoirStateSchema.model_validate(payload)
|
||
|
||
|
||
def _get_or_create_state_sync(user_id: str, db: Session) -> MemoirStateSchema:
    """Return the user's memoir state, creating a default row if none exists.

    Note: the creation path commits the session before returning.
    """
    found = db.execute(
        select(MemoirState).where(MemoirState.user_id == user_id)
    ).scalar_one_or_none()
    if found:
        return _coerce_state(found)

    defaults = default_state()
    # Slots are stored as plain JSON: {stage: {slot_name: slot_dict}}.
    serialized_slots = {
        stage: {name: slot.model_dump() for name, slot in stage_slots.items()}
        for stage, stage_slots in defaults.slots.items()
    }
    row = MemoirState(
        id=str(uuid.uuid4()),
        user_id=user_id,
        stage_order=defaults.stage_order,
        current_stage=defaults.current_stage,
        covered_stages=defaults.covered_stages,
        slots=serialized_slots,
    )
    db.add(row)
    db.commit()
    db.refresh(row)
    return _coerce_state(row)
|
||
|
||
|
||
def _update_slot_sync(
    user_id: str,
    stage: str,
    slot_name: str,
    snippet: str,
    segment_ids: List[str],
    db: Session,
) -> MemoirStateSchema:
    """Set one slot's snippet for a stage, merge its source segment ids, and commit.

    Creates the user's state row first if it does not exist. The slot value
    becomes ``SlotData(snippet=snippet, segment_ids=<old ids + new ids,
    de-duplicated>)``. ``current_stage`` is set to *stage* only if it was empty.

    Returns:
        The refreshed state as a ``MemoirStateSchema``.
    """
    stmt = select(MemoirState).where(MemoirState.user_id == user_id)
    state = db.execute(stmt).scalar_one_or_none()
    if not state:
        _get_or_create_state_sync(user_id, db)
        state = db.execute(stmt).scalar_one()

    # Build NEW containers instead of mutating ``state.slots`` in place.
    # A plain JSON column does not track in-place mutation, and re-assigning
    # the same (already mutated) dict compares equal to its recorded old
    # value, so the UPDATE can be silently skipped (unless the column is
    # wrapped with MutableDict, which is not visible here). Copies make the
    # change detectable either way.
    slots = dict(state.slots or {})
    stage_slots = dict(slots.get(stage) or {})
    existing = stage_slots.get(slot_name) or {}

    # Order-preserving de-duplication; the previous set() made order arbitrary.
    merged_segment_ids = list(
        dict.fromkeys([*(existing.get("segment_ids") or []), *segment_ids])
    )
    stage_slots[slot_name] = SlotData(snippet=snippet, segment_ids=merged_segment_ids).model_dump()
    slots[stage] = stage_slots

    state.slots = slots
    state.current_stage = state.current_stage or stage
    db.commit()
    db.refresh(state)
    return _coerce_state(state)
|
||
|
||
|
||
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def process_memoir_segments(self, user_id: str, segment_ids: List[str]):
    """Celery task: turn transcript segments into memoir chapter content.

    Steps:

    1. Load the given ``Segment`` rows and make sure the user's memoir state
       exists.
    2. Run ``MemoirOrchestrator``, passing it state/slot/lock/retry callbacks
       and ``_process_category``, which writes one category's chapter.
    3. Mark the segments processed and commit.
    4. Enqueue ``generate_chapter_images`` for each chapter id the
       orchestrator returns.

    Progress is mirrored to the per-user Redis task-status hash.

    Args:
        user_id: ID of the user owning the segments.
        segment_ids: IDs of the ``Segment`` rows to process.

    Returns:
        ``{"status": "no_segments"}`` if none of the ids exist, otherwise
        ``{"status": "success", "processed": <segment count>}``.

    Raises:
        celery.exceptions.Retry: when a retry has been scheduled.
    """
    # Local import: only needed to let an already-scheduled retry pass through.
    from celery.exceptions import Retry

    task_id = self.request.id
    logger.info(f"开始处理回忆录段落: user_id={user_id}, task_id={task_id}, segments={len(segment_ids)}")

    # Mark the task as running.
    _update_task_status_sync(user_id, task_id, "running")

    try:
        with get_sync_db() as db:
            segments = db.execute(
                select(Segment).where(Segment.id.in_(segment_ids))
            ).scalars().all()

            if not segments:
                logger.warning(f"未找到段落: {segment_ids}")
                return {"status": "no_segments"}

            # Ensure the user's state row exists (created and committed if missing).
            _get_or_create_state_sync(user_id, db)
            llm = _get_llm()
            image_settings = MemoirImageSettings.from_env()

            user_obj = db.get(User, user_id)
            user_profile = ""
            user_birth_year = None
            if user_obj:
                user_birth_year = user_obj.birth_year
                user_profile = format_user_profile_context(
                    birth_year=user_obj.birth_year,
                    birth_place=user_obj.birth_place,
                    grew_up_place=user_obj.grew_up_place,
                    occupation=user_obj.occupation,
                )

            narrative_agent = NarrativeAgent()

            def _process_category(
                chapter_category: str,
                category_segments: List,
                state: MemoirStateSchema,
                profile: str,
                birth_year,
                llm,
            ):
                """Generate and persist one category's chapter.

                Steps:

                * ``NarrativeAgent`` produces a title (new chapters only) and
                  a narrative.
                * Placeholders are injected into non-JSON narratives.
                * The result is saved with ``_save_narrative_to_sections``.
                * The user's most recently updated Book (created if missing)
                  is flagged as updated.

                Returns:
                    ``(chapter, has_images)``, where ``has_images`` is True
                    when image generation is enabled and the chapter still has
                    pending/failed section or cover images.
                """
                segment_texts = [seg.transcript_text or "" for seg in category_segments]
                combined_text = "\n\n".join(segment_texts)
                source_ids = [seg.id for seg in category_segments]

                stmt_chapter = (
                    select(Chapter)
                    .where(
                        Chapter.user_id == user_id,
                        Chapter.category == chapter_category,
                        Chapter.is_active == True,  # noqa: E712 - SQL expression
                    )
                    .options(
                        joinedload(Chapter.sections).joinedload(ChapterSection.image_record),
                        joinedload(Chapter.images),
                    )
                )
                chapter = db.execute(stmt_chapter).unique().scalar_one_or_none()

                slot_snippets = {}
                stage_slots = state.slots.get(chapter_category, {}) or {}
                for key, value in stage_slots.items():
                    snip = getattr(value, "snippet", None) or (
                        value.get("snippet") if isinstance(value, dict) else None
                    )
                    if snip:
                        slot_snippets[key] = snip

                title = chapter.title if chapter else f"{chapter_category} 回忆"
                existing_content = ""
                if chapter and getattr(chapter, "sections", None):
                    existing_content = "\n\n".join(
                        s.content
                        for s in sorted(chapter.sections, key=lambda x: x.order_index)
                        if (s.content or "").strip()
                    )

                if not chapter:
                    title = narrative_agent.generate_title(
                        stage=chapter_category,
                        emotion="neutral",
                        slots=slot_snippets,
                        user_profile=profile,
                        birth_year=birth_year,
                        llm=llm,
                    )
                new_narrative = narrative_agent.generate_narrative(
                    stage=chapter_category,
                    slots=slot_snippets,
                    new_content=combined_text,
                    existing_content=existing_content,
                    user_profile=profile,
                    birth_year=birth_year,
                    llm=llm,
                )
                if _is_json_narrative(new_narrative):
                    narrative = new_narrative
                elif existing_content:
                    narrative = f"{existing_content}\n\n{new_narrative}"
                else:
                    narrative = new_narrative

                # Defensive length guard. NOTE(review): with the branches above
                # a non-JSON narrative always starts with existing_content, so
                # this currently cannot trigger; kept as a safety net.
                if existing_content and not _is_json_narrative(narrative) and len(narrative) < len(existing_content) * 0.8:
                    logger.warning(
                        "内容长度异常: existing=%d, new=%d, category=%s. 回退为追加模式",
                        len(existing_content),
                        len(narrative),
                        chapter_category,
                    )
                    narrative = f"{existing_content}\n\n{combined_text}"

                if not _is_json_narrative(narrative):
                    narrative = inject_placeholders(narrative)
                calculated_order_index = STAGE_TO_ORDER.get(chapter_category, 999)

                chapter = _save_narrative_to_sections(
                    db,
                    chapter,
                    narrative,
                    title=title,
                    category=chapter_category,
                    order_index=calculated_order_index,
                    source_segments=source_ids,
                    user_id=user_id,
                )
                db.flush()
                db.refresh(chapter)

                has_images = image_settings.enabled and (
                    _chapter_has_any_section_images_to_generate(chapter)
                    or _chapter_has_cover_to_generate(chapter)
                )

                # The query orders by updated_at, i.e. several books per user are
                # possible; scalar_one_or_none() would raise MultipleResultsFound.
                book = db.execute(
                    select(Book).where(Book.user_id == user_id).order_by(Book.updated_at.desc())
                ).scalars().first()
                if not book:
                    book = Book(
                        id=str(uuid.uuid4()),
                        user_id=user_id,
                        title="我的回忆录",
                        total_pages=0,
                        total_words=0,
                        cover_image_url=None,
                    )
                    db.add(book)
                book.has_update = True
                book.last_update_chapter_id = chapter.id

                return chapter, has_images

            def _raise_retry():
                # Called by the orchestrator when it wants the whole task retried.
                raise self.retry(countdown=10)

            memoir_orchestrator = MemoirOrchestrator()
            chapters_to_enqueue, _ = memoir_orchestrator.run(
                segments=segments,
                llm=llm,
                user_profile=user_profile,
                user_birth_year=user_birth_year,
                get_or_create_state=lambda: _get_or_create_state_sync(user_id, db),
                update_slot=lambda stage, slot_name, snippet, seg_ids: _update_slot_sync(
                    user_id, stage, slot_name, snippet, seg_ids, db
                ),
                acquire_lock=lambda stage: _acquire_chapter_lock(user_id, stage),
                release_lock=lambda stage: _release_chapter_lock(user_id, stage),
                process_category=_process_category,
                raise_retry=_raise_retry,
            )

            # Mark the segments as processed.
            for seg in segments:
                seg.processed = True
            db.commit()

            for chapter_id in sorted(chapters_to_enqueue):
                try:
                    logger.info(f"派发章节补图任务: chapter={chapter_id}")
                    generate_chapter_images.delay(chapter_id)
                except Exception as exc:
                    logger.warning(f"补图任务派发失败: chapter={chapter_id}, error={exc}")

            logger.info(f"回忆录处理完成: user_id={user_id}, task_id={task_id}")
            _update_task_status_sync(user_id, task_id, "success", {"processed": len(segments)})
            return {"status": "success", "processed": len(segments)}

    except Retry:
        # self.retry() has already scheduled the retry before raising Retry.
        # Previously the generic handler below caught it and called
        # self.retry() a second time, enqueueing a duplicate task.
        raise
    except Exception as e:
        logger.error(f"回忆录处理失败: {e}")
        _update_task_status_sync(user_id, task_id, "failure", {"error": str(e)})
        raise self.retry(exc=e)
|
||
|
||
|
||
@shared_task(bind=True, max_retries=3, default_retry_delay=30)
def generate_chapter_content(self, user_id: str, stage: str, new_content: str):
    """Celery task: extend one stage's chapter with *new_content* (real-time updates).

    Steps:

    1. Take the per-(user, stage) chapter lock.
    2. Load the active chapter.
    3. Build the narrative: with an LLM, it is asked for a JSON narrative;
       without one, the raw content is appended.
    4. Save through ``_save_narrative_to_sections``, commit, and enqueue
       ``generate_chapter_images`` when images are pending.

    Args:
        user_id: User ID.
        stage: Chapter category / stage key.
        new_content: New text to merge into the chapter.

    Returns:
        ``{"status": "success"}``.

    Raises:
        celery.exceptions.Retry: when a retry has been scheduled, including
        when the chapter lock is currently held by another task.
    """
    # Local import: only needed to let an already-scheduled retry pass through.
    from celery.exceptions import Retry

    logger.info(f"生成章节内容: user_id={user_id}, stage={stage}")

    lock_acquired = False
    try:
        # Use the same per-(user, stage) chapter lock that process_memoir_segments
        # hands to MemoirOrchestrator. Without it, the two tasks could append
        # sections to (or create) the same chapter concurrently.
        lock_acquired = _acquire_chapter_lock(user_id, stage)
        if not lock_acquired:
            logger.info("章节正被其他任务写入,稍后重试: user_id=%s, stage=%s", user_id, stage)
            raise self.retry(countdown=10)

        with get_sync_db() as db:
            llm = _get_llm()

            # Find the active chapter and eager-load its sections.
            stmt = (
                select(Chapter)
                .where(
                    Chapter.user_id == user_id,
                    Chapter.category == stage,
                    Chapter.is_active == True,  # noqa: E712 - SQL expression
                )
                .options(joinedload(Chapter.sections))
            )
            chapter = db.execute(stmt).unique().scalar_one_or_none()
            existing_content = ""
            if chapter and getattr(chapter, "sections", None):
                existing_content = "\n\n".join(
                    s.content
                    for s in sorted(chapter.sections, key=lambda x: x.order_index)
                    if (s.content or "").strip()
                )

            if llm:
                prompt = get_narrative_json_prompt(
                    stage=stage,
                    slots={},
                    new_content=new_content,
                    existing_content=existing_content,
                )
                json_llm = llm.bind(
                    model_kwargs={"response_format": {"type": "json_object"}},
                    max_tokens=4096,
                )
                response = json_llm.invoke(prompt)
                new_narrative = response.content.strip()
                if _is_json_narrative(new_narrative):
                    narrative = new_narrative
                elif existing_content:
                    narrative = f"{existing_content}\n\n{new_narrative}"
                else:
                    narrative = new_narrative
            else:
                narrative = f"{existing_content}\n\n{new_content}" if existing_content else new_content

            # Safety check: a non-JSON narrative should not be much shorter than the old content.
            if existing_content and not _is_json_narrative(narrative) and len(narrative) < len(existing_content) * 0.8:
                logger.warning(
                    f"内容长度异常: existing={len(existing_content)}, "
                    f"new={len(narrative)}, stage={stage}. 回退为追加模式"
                )
                narrative = f"{existing_content}\n\n{new_content}"

            if not _is_json_narrative(narrative):
                narrative = inject_image_placeholder_template(narrative)
            calculated_order_index = STAGE_TO_ORDER.get(stage, 999)
            title = chapter.title if chapter else f"{stage} 回忆"

            chapter = _save_narrative_to_sections(
                db,
                chapter,
                narrative,
                title=title,
                category=stage,
                order_index=calculated_order_index,
                source_segments=[],
                user_id=user_id,
            )
            db.commit()
            db.refresh(chapter)

            image_settings = MemoirImageSettings.from_env()
            if image_settings.enabled and chapter and (
                _chapter_has_any_section_images_to_generate(chapter)
                or _chapter_has_cover_to_generate(chapter)
            ):
                try:
                    generate_chapter_images.delay(chapter.id)
                except Exception as exc:
                    logger.warning("补图任务派发失败: chapter=%s, error=%s", chapter.id, exc)
            return {"status": "success"}

    except Retry:
        # Already scheduled by self.retry(); do not schedule a second retry below.
        raise
    except Exception as e:
        logger.error(f"章节生成失败: {e}")
        raise self.retry(exc=e)
    finally:
        if lock_acquired:
            _release_chapter_lock(user_id, stage)
|
||
|
||
|
||
def build_cos_key(user_id: str, chapter_id: str, index: int | str, prompt: str) -> str:
|
||
short_hash = hashlib.sha1(prompt.encode("utf-8")).hexdigest()[:10]
|
||
index_part = "cover" if index in (-1, "cover") else str(index)
|
||
return f"memoirs/{user_id}/{chapter_id}/{index_part}-{short_hash}.png"
|
||
|
||
|
||
@shared_task(bind=True, max_retries=3, default_retry_delay=30)
|
||
def generate_chapter_images(self, chapter_id: str):
    """Async task to generate images for a chapter's cover and sections (each section has at most one image).

    Only images whose records are in a generatable state are processed. For the
    cover that means status PENDING or FAILED; for sections it means
    ``_section_has_image_to_generate`` returns true. Each image is marked
    PROCESSING and committed before work starts.

    On success an image is marked COMPLETED with its COS ``storage_key``/``url``.
    A non-retryable ``CosUploadError`` deletes the image record. Any other
    per-image error marks the record FAILED with ``retryable=True``.

    Args:
        chapter_id: Primary key of the ``Chapter`` to process.

    Returns:
        ``{"status": ...}`` with one of ``"no_chapter"``, ``"no_images"``,
        ``"disabled"``, ``"locked"`` or ``"success"``.

    Raises:
        Whatever ``self.retry`` raises. This happens when an unexpected error
        escapes, or when at least one image failed with a retryable error.
    """
    lock_acquired = False

    with get_sync_db() as db:
        try:
            stmt = (
                select(Chapter)
                .where(Chapter.id == chapter_id)
                .options(
                    joinedload(Chapter.sections).joinedload(ChapterSection.image_record),
                    joinedload(Chapter.images),
                )
            )
            chapter = db.execute(stmt).unique().scalar_one_or_none()
            if not chapter:
                logger.info("章节补图跳过: chapter=%s, reason=not_found", chapter_id)
                return {"status": "no_chapter"}

            sections = getattr(chapter, "sections", None) or []
            sections_with_pending = [
                (idx, s) for idx, s in enumerate(sections) if _section_has_image_to_generate(s)
            ]
            cover_rec = _get_cover_memoir_image(chapter)
            cover_to_generate = (
                cover_rec
                if cover_rec
                and (cover_rec.status or "").strip() in (IMAGE_STATUS_PENDING, IMAGE_STATUS_FAILED)
                else None
            )
            if not sections_with_pending and not cover_to_generate:
                logger.info("章节补图跳过: chapter=%s, reason=no_pending_images", chapter_id)
                return {"status": "no_images"}

            settings = MemoirImageSettings.from_env()
            if not settings.enabled:
                logger.info("章节补图跳过: chapter=%s, reason=disabled", chapter_id)
                return {"status": "disabled"}

            lock_acquired = _acquire_chapter_image_lock(chapter_id)
            if not lock_acquired:
                logger.info("章节补图跳过: chapter=%s, reason=locked", chapter_id)
                return {"status": "locked"}

            prompt_orchestrator = ImagePromptOrchestrator(_get_llm(), settings)
            image_generator = get_image_generator()
            storage = TencentCosStorageService.from_env()

            logger.info(
                "章节补图开始: chapter=%s, pending_sections=%d, cover=%s",
                chapter_id,
                len(sections_with_pending),
                bool(cover_to_generate),
            )

            retryable_failures: list[str] = []
            permanent_failures: list[str] = []

            def _apply_item_to_memoir_image(rec: MemoirImage, d: dict) -> None:
                """Copy the image fields of dict ``d`` onto ``rec`` and stamp ``updated_at``."""
                rec.placeholder = d.get("placeholder")
                rec.description = d.get("description")
                rec.status = (d.get("status") or "pending").strip() or "pending"
                rec.prompt = d.get("prompt")
                rec.url = d.get("url")
                rec.storage_key = d.get("storage_key")
                rec.provider = d.get("provider")
                rec.style = d.get("style")
                rec.size = d.get("size")
                rec.error = d.get("error")
                rec.retryable = d.get("retryable")
                rec.updated_at = datetime.now(timezone.utc)

            def _excerpt(text) -> str:
                """Return the first five lines of ``text``, space-joined and capped at 200 chars."""
                lines = (text or "").strip().split("\n")[:5]
                return " ".join(lines)[:200]

            def _mark_processing(rec: MemoirImage, item: dict) -> None:
                """Persist ``item`` onto ``rec`` as PROCESSING and commit, so concurrent readers see it."""
                item.setdefault("placeholder", "")
                item.setdefault("description", "")
                item["status"] = IMAGE_STATUS_PROCESSING
                _apply_item_to_memoir_image(rec, item)
                db.commit()

            def _render_and_upload(prompt_data: dict, slot: object) -> tuple[str, str]:
                """Generate, download, normalize and upload one image; return ``(storage_key, url)``.

                Raises RuntimeError if the generator does not report a completed
                image with a URL. Errors from download/normalize/upload (e.g.
                ``CosUploadError``) propagate unchanged.
                """
                result = image_generator.generate(
                    prompt_data["prompt"],
                    prompt_data["size"],
                    prompt_data["style"],
                )
                if result.status != TaskStatus.COMPLETED or not result.image_url:
                    raise RuntimeError(result.error or "Image generation failed")
                image_bytes = _normalize_image_bytes_for_storage(
                    image_generator.download_image(result.image_url)
                )
                key = build_cos_key(chapter.user_id, chapter.id, slot, prompt_data["prompt"])
                url = storage.upload_bytes(image_bytes, key, "image/png")
                return key, url

            def _mark_completed(
                rec: MemoirImage, item: dict, prompt_data: dict, key: str, url: str
            ) -> None:
                """Persist a COMPLETED state built from a copy of ``item`` and commit.

                ``item`` itself is left untouched, so a failed commit cannot leak
                completed fields into the failure path.
                """
                completed = dict(item)
                completed["storage_key"] = key
                completed["url"] = url
                completed["prompt"] = prompt_data["prompt"]
                completed["style"] = prompt_data["style"]
                completed["size"] = prompt_data["size"]
                completed["status"] = IMAGE_STATUS_COMPLETED
                completed["error"] = None
                completed["retryable"] = None
                _apply_item_to_memoir_image(rec, completed)
                db.commit()

            def _mark_failed(rec: MemoirImage, exc: Exception) -> None:
                """Re-read ``rec`` (its committed PROCESSING state) and persist it as retryable FAILED."""
                failed = dict(memoir_image_to_dict(rec) or {})
                failed["status"] = IMAGE_STATUS_FAILED
                failed["error"] = str(exc)
                failed["retryable"] = True
                _apply_item_to_memoir_image(rec, failed)
                db.commit()

            # Cover image first.
            if cover_to_generate:
                cover_item = dict(memoir_image_to_dict(cover_to_generate) or {})
                _mark_processing(cover_to_generate, cover_item)
                try:
                    sections_ordered = sorted(sections, key=lambda s: getattr(s, "order_index", 0))
                    first_content = sections_ordered[0].content if sections_ordered else ""
                    prompt_data = prompt_orchestrator.build_cover_prompt(
                        chapter_title=chapter.title,
                        chapter_category=chapter.category or "",
                        context_excerpt=_excerpt(first_content),
                    )
                    key, url = _render_and_upload(prompt_data, "cover")
                    _mark_completed(cover_to_generate, cover_item, prompt_data, key, url)
                    logger.info(
                        "章节封面图生成成功: chapter=%s, url=%s",
                        chapter_id,
                        url,
                    )
                except Exception as exc:
                    # The error may have come from a failed flush/commit. The session
                    # must be rolled back before it can persist the failure state.
                    db.rollback()
                    failure_msg = f"cover, error={exc}"
                    if isinstance(exc, CosUploadError) and not exc.retryable:
                        permanent_failures.append(failure_msg)
                        logger.error("封面图上传不可重试,清理: chapter=%s, %s", chapter_id, failure_msg)
                        db.delete(cover_to_generate)
                        db.commit()
                    else:
                        retryable_failures.append(failure_msg)
                        logger.warning("封面图生成失败(可重试): chapter=%s, %s", chapter_id, failure_msg)
                        _mark_failed(cover_to_generate, exc)

            for sec_index, section in sections_with_pending:
                record = section.image_record
                if record is None:
                    # Nothing to update; the original code crashed here and aborted the batch.
                    logger.warning(
                        "章节补图跳过分节: chapter=%s, section_index=%s, reason=no_image_record",
                        chapter_id,
                        sec_index,
                    )
                    continue

                section_item = dict(memoir_image_to_dict(record) or {})
                _mark_processing(record, section_item)
                try:
                    prompt_data = prompt_orchestrator.build_prompt(
                        chapter_title=chapter.title,
                        chapter_category=chapter.category or "",
                        description=section_item.get("description", ""),
                        context_excerpt=_excerpt(section.content),
                    )
                    key, url = _render_and_upload(prompt_data, sec_index)
                    _mark_completed(record, section_item, prompt_data, key, url)
                    logger.info(
                        "章节补图成功: chapter=%s, section_index=%s, url=%s",
                        chapter_id,
                        sec_index,
                        url,
                    )
                except Exception as exc:
                    # Same as the cover path: recover the session before writing failure state.
                    db.rollback()
                    failure_msg = f"section_index={sec_index}, error={exc}"
                    if isinstance(exc, CosUploadError) and not exc.retryable:
                        permanent_failures.append(failure_msg)
                        logger.error("图片上传不可重试,清理配图: chapter=%s, %s", chapter_id, failure_msg)
                        # Detach the section from the image before deleting the record.
                        section.image_id = None
                        db.delete(record)
                        db.commit()
                    else:
                        retryable_failures.append(failure_msg)
                        logger.warning("图片生成失败(可重试): chapter=%s, %s", chapter_id, failure_msg)
                        _mark_failed(record, exc)

            if retryable_failures:
                # Raising hands off to the outer handler, which schedules a Celery retry.
                # Completed images are skipped on the next run.
                raise RuntimeError(
                    f"章节补图存在可重试失败项: chapter={chapter_id}, failures={'; '.join(retryable_failures)}"
                )
            return {"status": "success"}
        except Exception as exc:
            logger.error("章节补图任务失败: chapter=%s, error=%s", chapter_id, exc)
            raise self.retry(exc=exc)
        finally:
            if lock_acquired:
                _release_chapter_image_lock(chapter_id)