"""Celery tasks for memoir processing.

This module turns transcribed conversation segments into memoir chapters:

* ``process_memoir_segments`` extracts state slots, classifies segments into
  chapter categories, generates narrative text and stores it as chapter
  sections (optionally with pending illustration records).
* ``generate_chapter_content`` appends freshly generated narrative to a single
  chapter.
* ``generate_chapter_images`` generates, normalizes and uploads the pending
  illustrations of one chapter.
"""
import hashlib
import json
import uuid
from datetime import datetime, timezone
from io import BytesIO
from typing import Dict, List

import redis
from celery import shared_task
from celery.exceptions import Retry
from PIL import Image
from sqlalchemy import delete, select
from sqlalchemy.orm import Session, joinedload

from app.core.logging import get_logger
from app.core.db import get_sync_db
from app.features.conversation.models import Segment
from app.features.memoir.models import (
    Book,
    Chapter,
    ChapterSection,
    MemoirImage,
    MemoirState,
)
from app.features.user.models import User
from app.core.dependencies import get_llm_provider
from app.agents.state_schema import MemoirStateSchema, SlotData, default_state
from app.agents.prompts.memory_prompts import (
    get_creative_title_prompt,
    get_narrative_prompt,
    get_state_extraction_prompt,
    get_chapter_classification_prompt,
    inject_image_placeholder_template,
    STAGE_TO_ORDER,
    CHAPTER_CATEGORIES,
)
from app.agents.prompts.profile_prompts import format_user_profile_context
from app.features.memoir.memoir_images.parser import (
    build_initial_image_assets,
    parse_image_placeholders,
    split_narrative_to_sections,
)
from app.features.memoir.memoir_images.json_payload import extract_json_payload
from app.core.dependencies import get_image_generator
from app.features.memoir.memoir_images.prompting import MemoirImagePromptService
from app.features.memoir.memoir_images.schema import (
    completed_image_assets,
    IMAGE_STATUS_COMPLETED,
    IMAGE_STATUS_FAILED,
    IMAGE_STATUS_PENDING,
    IMAGE_STATUS_PROCESSING,
    normalize_image_assets,
)
from app.features.memoir.memoir_images.serializers import (
    image_dict_to_row_kwargs,
    memoir_image_to_dict,
)
from app.features.memoir.memoir_images.settings import MemoirImageSettings
from app.ports.image_gen import TaskStatus
from app.features.memoir.memoir_images.storage import (
    TencentCosStorageService,
    CosUploadError,
)

logger = get_logger(__name__)

# One cached Redis client per ``decode_responses`` flavour.
_REDIS_CLIENTS: dict[bool, redis.Redis] = {}


def _get_llm():
    """Return the LangChain LLM exposed by the LLM provider port, or ``None``.

    Any failure while resolving the provider is treated as "no LLM available"
    so that callers can fall back to their non-LLM code paths.
    """
    try:
        return getattr(get_llm_provider(), "langchain_llm", None)
    except Exception as exc:
        # Best effort by design: log instead of silently hiding the cause.
        logger.warning("LLM provider unavailable, continuing without LLM: %s", exc)
        return None


def _get_redis_client(*, decode_responses: bool = False) -> redis.Redis:
    """Return a cached Redis client built from ``settings.redis_url``."""
    from app.core.config import settings

    client = _REDIS_CLIENTS.get(decode_responses)
    if client is None:
        client = redis.from_url(
            settings.redis_url,
            decode_responses=decode_responses,
        )
        _REDIS_CLIENTS[decode_responses] = client
    return client


def _acquire_chapter_lock(user_id: str, stage: str, timeout: int = 120) -> bool:
    """Try to take the per-user/per-stage chapter lock.

    Uses ``SET NX EX`` so that the lock expires after ``timeout`` seconds.
    Returns ``True`` when the lock was acquired, ``False`` otherwise.
    """
    r = _get_redis_client()
    lock_key = f"lock:chapter:{user_id}:{stage}"
    # redis-py returns None (not False) when NX prevents the write.
    return bool(r.set(lock_key, "1", nx=True, ex=timeout))


def _release_chapter_lock(user_id: str, stage: str):
    """Release the per-user/per-stage chapter lock by deleting its key."""
    r = _get_redis_client()
    lock_key = f"lock:chapter:{user_id}:{stage}"
    r.delete(lock_key)


def _acquire_chapter_image_lock(chapter_id: str, timeout: int = 600) -> bool:
    """Try to take the image-generation lock of a chapter.

    Returns ``True`` when the lock was acquired, ``False`` otherwise.
    """
    r = _get_redis_client()
    lock_key = f"lock:chapter-images:{chapter_id}"
    # redis-py returns None (not False) when NX prevents the write.
    return bool(r.set(lock_key, "1", nx=True, ex=timeout))


def _release_chapter_image_lock(chapter_id: str):
    """Release the image-generation lock of a chapter."""
    r = _get_redis_client()
    lock_key = f"lock:chapter-images:{chapter_id}"
    r.delete(lock_key)


def _update_task_status_sync(
    user_id: str,
    task_id: str,
    status: str,
    result: Dict | None = None,
):
    """Synchronously store a task's status in Redis (for use inside Celery tasks).

    The status is kept in the hash ``task:user:{user_id}:tasks`` under
    ``task_id`` and the hash TTL is reset to one hour. Errors are logged and
    swallowed so that status bookkeeping never fails the task itself.
    """
    try:
        r = _get_redis_client(decode_responses=True)
        key = f"task:user:{user_id}:tasks"

        # Start from the existing task info when there is one.
        data = r.hget(key, task_id)
        task_info = json.loads(data) if data else {"task_id": task_id}

        task_info["status"] = status
        task_info["updated_at"] = datetime.now(timezone.utc).isoformat()
        if result is not None:
            task_info["result"] = result

        r.hset(key, task_id, json.dumps(task_info))
        r.expire(key, 3600)  # expire after one hour
        logger.info(f"任务状态已更新: task_id={task_id}, status={status}")
    except Exception as e:
        logger.error(f"更新任务状态失败: {e}")


def _merge_chapter_image_assets(
    existing_images: list[dict] | None,
    placeholders: list[dict],
    provider: str,
    style: str,
    size: str,
    now_iso: str,
) -> list[dict]:
    """Merge parsed placeholders with already stored image assets.

    For a placeholder that already has an asset, the stored asset is kept and
    only index/placeholder/description are refreshed; missing provider, style,
    size and timestamps are filled in. A "completed" asset without
    ``storage_key`` or ``url`` is downgraded to failed. Placeholders without an
    asset get a freshly built initial asset.
    """
    normalized_existing_images = normalize_image_assets(existing_images)
    existing_by_placeholder = {
        item.get("placeholder"): dict(item)
        for item in normalized_existing_images
        if item.get("placeholder")
    }

    merged_assets: list[dict] = []
    for item in placeholders:
        existing = existing_by_placeholder.get(item["placeholder"])
        if existing:
            merged_item = dict(existing)
            merged_item["index"] = item["index"]
            merged_item["placeholder"] = item["placeholder"]
            merged_item["description"] = item["description"]
            merged_item["provider"] = merged_item.get("provider") or provider
            merged_item["style"] = merged_item.get("style") or style
            merged_item["size"] = merged_item.get("size") or size
            merged_item["created_at"] = merged_item.get("created_at") or now_iso
            merged_item["updated_at"] = merged_item.get("updated_at") or now_iso
            # A completed asset must point at an actual image.
            if merged_item.get("status") == IMAGE_STATUS_COMPLETED and not (
                merged_item.get("storage_key") or merged_item.get("url")
            ):
                merged_item["status"] = IMAGE_STATUS_FAILED
                merged_item["error"] = merged_item.get("error") or "missing image url"
        else:
            merged_item = build_initial_image_assets(
                placeholders=[item],
                provider=provider,
                style=style,
                size=size,
                now_iso=now_iso,
            )[0]
        merged_assets.append(merged_item)
    return merged_assets


def chapter_has_images_to_generate(images: list[dict] | None) -> bool:
    """Return whether any normalized image asset is pending or failed."""
    return any(
        item.get("status") in {IMAGE_STATUS_PENDING, IMAGE_STATUS_FAILED}
        for item in normalize_image_assets(images)
    )


def _memoir_image_from_asset(
    chapter_id: str,
    section_id: str | None,
    order_index: int,
    image_asset: dict,
) -> MemoirImage:
    """Build a ``MemoirImage`` row (memoir_images table) from one image dict."""
    kwargs = image_dict_to_row_kwargs(image_asset)
    return MemoirImage(
        # 32 hex characters, i.e. a UUID4 without dashes.
        id=uuid.uuid4().hex,
        chapter_id=chapter_id,
        section_id=section_id,
        order_index=order_index,
        **kwargs,
    )


def _section_has_image_to_generate(section) -> bool:
    """Return whether the section's ``image_record`` is pending or failed."""
    record = getattr(section, "image_record", None)
    if not record:
        return False
    status = (record.status or "").strip()
    return status in (IMAGE_STATUS_PENDING, IMAGE_STATUS_FAILED)


def _chapter_has_any_section_images_to_generate(chapter) -> bool:
    """Return whether any section of ``chapter`` still needs an image."""
    if not chapter or not getattr(chapter, "sections", None):
        return False
    return any(_section_has_image_to_generate(s) for s in chapter.sections)


def _select_placeholders_for_effective_max(
    placeholders: list[dict],
    existing_images: list[dict] | None,
    effective_max: int,
) -> list[dict]:
    """Pick the placeholders to keep under an image budget.

    Placeholders that already have an image are always kept. New placeholders
    are accepted in document order until ``effective_max`` (minus the number
    of already-illustrated placeholders present in the content) is used up.
    The returned copies are re-indexed from 0.
    """
    existing_placeholders = {
        item.get("placeholder")
        for item in normalize_image_assets(existing_images)
        if item.get("placeholder")
    }
    existing_count_in_content = sum(
        1 for item in placeholders if item.get("placeholder") in existing_placeholders
    )
    remaining_new_slots = max(0, effective_max - existing_count_in_content)

    selected: list[dict] = []
    for item in placeholders:
        if item.get("placeholder") in existing_placeholders:
            selected.append(item)
            continue
        if remaining_new_slots <= 0:
            continue
        selected.append(item)
        remaining_new_slots -= 1
    return [{**item, "index": index} for index, item in enumerate(selected)]


def _touch_chapter_metadata(chapter, title: str, source_segments: list | None) -> None:
    """Refresh title / ``is_new`` and merge ``source_segments`` on a chapter."""
    chapter.title = title
    chapter.is_new = True
    merged = [*(chapter.source_segments or []), *(source_segments or [])]
    # Order-preserving de-duplication; a new list object is assigned so the
    # ORM sees the attribute as changed.
    chapter.source_segments = list(dict.fromkeys(merged))


def _save_narrative_to_sections(
    db: Session,
    chapter,
    narrative: str,
    title: str,
    category: str,
    order_index: int,
    source_segments: list,
    user_id: str,
):
    """Split ``narrative`` into chapter sections and persist them.

    Existing sections and images are never deleted; only new content is
    appended. When ``narrative`` starts with the concatenation of the existing
    sections, only the remainder is treated as new content. Every third
    section (``order_index % 3 == 2``) gets a pending illustration record when
    image generation is enabled. The cover image is intentionally left empty.

    Args:
        db: Open SQLAlchemy session; this function flushes but never commits.
        chapter: Existing chapter, or ``None`` to create a new one.
        narrative: Full narrative text (may contain image placeholders).
        title: Title to store on the chapter.
        category: Chapter category, also used to choose the image style.
        order_index: Order index used when a new chapter is created.
        source_segments: Segment ids merged into ``chapter.source_segments``.
        user_id: Owner of a newly created chapter.

    Returns:
        The (possibly newly created) chapter.
    """
    now_iso = datetime.now(timezone.utc).isoformat()
    narrative = narrative or ""

    if chapter is None:
        chapter = Chapter(
            id=str(uuid.uuid4()),
            user_id=user_id,
            title=title,
            order_index=order_index,
            status="completed",
            category=category,
            cover_image=None,
            is_new=True,
            source_segments=source_segments or [],
        )
        db.add(chapter)
        db.flush()

    # Existing sections are kept; only new content is appended.
    existing_sections = (
        db.execute(
            select(ChapterSection)
            .where(ChapterSection.chapter_id == chapter.id)
            .order_by(ChapterSection.order_index)
        )
        .scalars()
        .all()
    )

    if existing_sections:
        existing_content = "\n\n".join(
            (s.content or "").strip()
            for s in existing_sections
            if (s.content or "").strip()
        )
        if existing_content and narrative.startswith(existing_content):
            new_part = narrative[len(existing_content):].lstrip()
        else:
            new_part = narrative.strip()
        if not new_part:
            # Nothing new to store; only refresh the chapter metadata.
            _touch_chapter_metadata(chapter, title, source_segments)
            return chapter
        narrative_to_parse = new_part
        order_base = max(s.order_index for s in existing_sections) + 1
    else:
        narrative_to_parse = narrative.strip()
        order_base = 0

    segments = split_narrative_to_sections(narrative_to_parse)
    if not segments:
        # The splitter produced nothing: store the text as one plain section.
        sec = ChapterSection(
            id=str(uuid.uuid4()),
            chapter_id=chapter.id,
            order_index=order_base,
            content=narrative_to_parse.strip(),
            image_id=None,
        )
        db.add(sec)
        db.flush()
        _touch_chapter_metadata(chapter, title, source_segments)
        return chapter

    img_settings = MemoirImageSettings.from_env()
    prompt_service = (
        MemoirImagePromptService(llm=None, settings=img_settings)
        if img_settings.enabled
        else None
    )

    # One image per three sections; all other sections keep image_id empty.
    def _should_have_image(order_idx: int) -> bool:
        return (order_idx % 3) == 2

    def _placeholder_for_segment(seg: dict, order_idx: int) -> dict | None:
        ph = seg.get("placeholder_info")
        if ph and ph.get("placeholder") and ph.get("description"):
            return ph
        # No usable placeholder: derive a description from the section text.
        content = (seg.get("content") or "").strip()
        desc = (content[:50] + "…") if len(content) > 50 else (content or "章节配图")
        # NOTE(review): this f-string renders four literal braces on each side
        # of "IMAGE:<desc>"; confirm that matches the expected placeholder syntax.
        return {"placeholder": f"{{{{{{{{IMAGE:{desc}}}}}}}}}", "description": desc}

    # Create the sections in order, attaching an image to every third one.
    for i, seg in enumerate(segments):
        order_idx = order_base + i
        content = (seg.get("content") or "").strip()

        image_asset = None
        if img_settings.enabled and _should_have_image(order_idx):
            ph = _placeholder_for_segment(seg, order_idx)
            style = (
                prompt_service.CATEGORY_STYLE_MAP.get(category, img_settings.default_style)
                if prompt_service
                else img_settings.default_style
            )
            image_asset = build_initial_image_assets(
                [ph],
                img_settings.provider,
                style,
                img_settings.default_size,
                now_iso,
            )[0]

        sec = ChapterSection(
            id=str(uuid.uuid4()),
            chapter_id=chapter.id,
            order_index=order_idx,
            content=content,
            image_id=None,
        )
        db.add(sec)
        db.flush()

        if image_asset:
            # The image is bound to this section;
            # memoir_images.order_index = section.order_index + 1 (0 is
            # reserved for the cover).
            mi = _memoir_image_from_asset(chapter.id, sec.id, order_idx + 1, image_asset)
            db.add(mi)
            db.flush()
            sec.image_id = mi.id
            db.flush()

    # The cover stays empty; the first image is not promoted automatically.
    _touch_chapter_metadata(chapter, title, source_segments)
    return chapter


def initialize_chapter_images(_chapter):
    """Legacy no-op kept for old callers.

    Section images are now created by ``_save_narrative_to_sections``, so this
    only logs and returns an empty list.
    """
    logger.info("initialize_chapter_images: 已由 _save_narrative_to_sections 处理 section 配图,跳过")
    return []


def _normalize_image_bytes_for_storage(image_bytes: bytes) -> bytes:
    """Re-encode arbitrary image bytes as PNG.

    RGBA/LA images are saved as they are, palette images are converted to
    RGBA, and every other mode is converted to RGB.
    """
    with Image.open(BytesIO(image_bytes)) as image:
        output = BytesIO()
        if image.mode in {"RGBA", "LA"}:
            normalized = image
        elif image.mode == "P":
            normalized = image.convert("RGBA")
        else:
            normalized = image.convert("RGB")
        normalized.save(output, format="PNG")
        return output.getvalue()


# Keywords used to map a message onto one of the five tracking stages.
STAGE_KEYWORDS = {
    "childhood": ["童年", "小时候", "出生", "家乡", "小镇"],
    "education": ["上学", "学校", "老师", "同学", "教育", "大学"],
    "career": ["工作", "职业", "事业", "公司", "同事", "创业"],
    "family": ["伴侣", "孩子", "家庭", "家人", "结婚", "父母"],
    "belief": ["信念", "价值观", "座右铭", "坚持", "原则"],
}

# 5-stage -> default 8-category mapping (fallback when LLM classification fails).
_STAGE_TO_DEFAULT_CATEGORY = {
    "childhood": "childhood",
    "education": "education",
    "career": "career_early",
    "family": "family",
    "belief": "beliefs",
}


def _detect_stage(user_message: str, fallback_stage: str) -> str:
    """Return the first stage whose keywords occur in the message.

    Stages are checked in ``STAGE_KEYWORDS`` order; ``fallback_stage`` is
    returned when nothing matches (including for an empty/``None`` message).
    """
    message = (user_message or "").lower()
    for stage, keywords in STAGE_KEYWORDS.items():
        if any(word in message for word in keywords):
            return stage
    return fallback_stage


def _classify_chapter_category(text: str, fallback_stage: str, llm=None) -> str | None:
    """Classify ``text`` into one of the chapter categories.

    The LLM is tried first. If it answers ``"none"`` the content is considered
    to have no memoir value and ``None`` is returned. If the LLM is missing,
    fails, or answers with an unknown category, the keyword-detected stage is
    mapped onto its default category instead.
    """
    if llm:
        try:
            prompt = get_chapter_classification_prompt(text)
            response = llm.invoke(prompt)
            category = response.content.strip().lower()
            if category == "none":
                logger.info(f"LLM 判定内容无回忆录价值,跳过: {text[:80]}...")
                return None
            if category in CHAPTER_CATEGORIES:
                return category
        except Exception as e:
            logger.warning(f"LLM 章节分类失败: {e}")

    stage = _detect_stage(text, fallback_stage)
    return _STAGE_TO_DEFAULT_CATEGORY.get(
        stage,
        _STAGE_TO_DEFAULT_CATEGORY.get(fallback_stage, "childhood"),
    )


def _coerce_state(model: MemoirState) -> MemoirStateSchema:
    """Convert a ``MemoirState`` row into a ``MemoirStateSchema``.

    Missing ``stage_order`` / ``covered_stages`` and non-dict ``slots`` fall
    back to the defaults.
    """
    return MemoirStateSchema.model_validate(
        {
            "stage_order": model.stage_order or default_state().stage_order,
            "current_stage": model.current_stage,
            "covered_stages": model.covered_stages or [],
            "slots": model.slots if isinstance(model.slots, dict) else default_state().slots,
        }
    )


def _get_or_create_state_sync(user_id: str, db: Session) -> MemoirStateSchema:
    """Load the user's memoir state, creating (and committing) a default one."""
    stmt = select(MemoirState).where(MemoirState.user_id == user_id)
    state = db.execute(stmt).scalar_one_or_none()
    if state:
        return _coerce_state(state)

    default = default_state()
    state = MemoirState(
        id=str(uuid.uuid4()),
        user_id=user_id,
        stage_order=default.stage_order,
        current_stage=default.current_stage,
        covered_stages=default.covered_stages,
        slots={
            k: {sk: sv.model_dump() for sk, sv in v.items()}
            for k, v in default.slots.items()
        },
    )
    db.add(state)
    db.commit()
    db.refresh(state)
    return _coerce_state(state)


def _update_slot_sync(
    user_id: str,
    stage: str,
    slot_name: str,
    snippet: str,
    segment_ids: List[str],
    db: Session,
) -> MemoirStateSchema:
    """Store ``snippet`` in ``slots[stage][slot_name]`` and commit.

    The slot's ``segment_ids`` are merged with the ids already recorded. The
    state row is created first when the user has none yet.
    """
    stmt = select(MemoirState).where(MemoirState.user_id == user_id)
    state = db.execute(stmt).scalar_one_or_none()
    if not state:
        _get_or_create_state_sync(user_id, db)
        state = db.execute(stmt).scalar_one()

    # Build fresh dicts instead of mutating the loaded value in place: with a
    # plain JSON column an in-place mutation followed by re-assigning the same
    # object may not be detected as a change.
    slots: Dict[str, Dict] = dict(state.slots or {})
    stage_slots = dict(slots.get(stage) or {})
    existing = stage_slots.get(slot_name) or {}
    merged_segment_ids = list(
        dict.fromkeys([*(existing.get("segment_ids") or []), *segment_ids])
    )
    stage_slots[slot_name] = SlotData(
        snippet=snippet,
        segment_ids=merged_segment_ids,
    ).model_dump()
    slots[stage] = stage_slots

    state.slots = slots
    state.current_stage = state.current_stage or stage
    db.commit()
    db.refresh(state)
    return _coerce_state(state)


@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def process_memoir_segments(self, user_id: str, segment_ids: List[str]):
    """Celery task: turn conversation segments into memoir chapter content.

    Works in two steps: (1) 5-stage state tracking, where slots extracted by
    the LLM are stored on the user's memoir state; (2) 8-category chapter
    classification, after which a narrative is generated per category and
    appended to that category's active chapter.

    Args:
        user_id: User id.
        segment_ids: Ids of the segments to process.

    Returns:
        ``{"status": "success", "processed": n}``, or
        ``{"status": "no_segments"}`` when none of the ids exist.

    Raises:
        celery.exceptions.Retry: On chapter-lock contention or any failure
            (until ``max_retries`` is exhausted).
    """
    task_id = self.request.id
    logger.info(f"开始处理回忆录段落: user_id={user_id}, task_id={task_id}, segments={len(segment_ids)}")

    # Mark the task as running.
    _update_task_status_sync(user_id, task_id, "running")

    try:
        with get_sync_db() as db:
            chapters_to_enqueue: set[str] = set()

            # Load the segments.
            stmt = select(Segment).where(Segment.id.in_(segment_ids))
            segments = db.execute(stmt).scalars().all()
            if not segments:
                logger.warning(f"未找到段落: {segment_ids}")
                # Leave a terminal status behind instead of a dangling "running".
                _update_task_status_sync(user_id, task_id, "success", {"processed": 0})
                return {"status": "no_segments"}

            # Load the user's state and profile.
            state = _get_or_create_state_sync(user_id, db)
            llm = _get_llm()
            image_settings = MemoirImageSettings.from_env()

            user_obj = db.get(User, user_id)
            user_profile = ""
            user_birth_year = None
            if user_obj:
                user_birth_year = user_obj.birth_year
                user_profile = format_user_profile_context(
                    birth_year=user_obj.birth_year,
                    birth_place=user_obj.birth_place,
                    grew_up_place=user_obj.grew_up_place,
                    occupation=user_obj.occupation,
                )

            # Step 1: slots (5-stage tracking) + category of every segment.
            category_to_segments: Dict[str, List[Segment]] = {}
            for segment in segments:
                text = segment.transcript_text
                detected_stage = _detect_stage(text, state.current_stage)

                # Extract slots for the 5-stage state tracking.
                extracted_slots = {}
                if llm:
                    try:
                        prompt = get_state_extraction_prompt(
                            user_message=text,
                            current_stage=state.current_stage,
                            stage_slots=state.slots.get(detected_stage, {}),
                        )
                        response = llm.invoke(prompt)
                        parsed = json.loads(extract_json_payload(response.content))
                        detected_stage = parsed.get("detected_stage", detected_stage)
                        extracted_slots = parsed.get("slots", {}) or {}
                    except Exception as e:
                        # Covers invalid JSON as well as LLM/transport errors.
                        logger.warning(f"LLM 解析失败: {e}")

                for slot_name, snippet in extracted_slots.items():
                    state = _update_slot_sync(
                        user_id=user_id,
                        stage=detected_stage,
                        slot_name=slot_name,
                        snippet=snippet,
                        segment_ids=[segment.id],
                        db=db,
                    )

                # 8-category chapter classification.
                chapter_category = _classify_chapter_category(text, detected_stage, llm)
                if chapter_category is None:
                    logger.info(f"段落无回忆录价值,跳过: segment_id={segment.id}")
                    continue
                category_to_segments.setdefault(chapter_category, []).append(segment)

            # Step 2: generate chapter content per category.
            for chapter_category, category_segments in category_to_segments.items():
                if not _acquire_chapter_lock(user_id, chapter_category):
                    logger.warning(f"章节锁竞争: user={user_id}, category={chapter_category}, 延迟重试")
                    raise self.retry(countdown=10)

                # NOTE(review): the lock is released before the final
                # db.commit() below, so another worker may take it while these
                # writes are still uncommitted — confirm this is acceptable.
                try:
                    segment_texts = [seg.transcript_text for seg in category_segments]
                    combined_text = "\n\n".join(segment_texts)
                    source_ids = [seg.id for seg in category_segments]

                    # Find the active chapter (cleared chapters are not
                    # continued; a new one is created) and preload sections.
                    stmt_chapter = (
                        select(Chapter)
                        .where(
                            Chapter.user_id == user_id,
                            Chapter.category == chapter_category,
                            Chapter.is_active == True,  # noqa: E712 (SQL expression)
                        )
                        .options(joinedload(Chapter.sections))
                    )
                    chapter = db.execute(stmt_chapter).unique().scalar_one_or_none()

                    # Collect the non-empty slot snippets.
                    slot_snippets = {
                        key: value.snippet
                        for key, value in (state.slots.get(chapter_category, {}) or {}).items()
                        if value.snippet
                    }

                    # Title and content; an existing chapter's body is the
                    # concatenation of its sections.
                    title = chapter.title if chapter else f"{chapter_category} 回忆"
                    existing_content = ""
                    if chapter and getattr(chapter, "sections", None):
                        existing_content = "\n\n".join(
                            s.content
                            for s in sorted(chapter.sections, key=lambda x: x.order_index)
                            if (s.content or "").strip()
                        )

                    narrative = combined_text
                    if llm:
                        try:
                            if not chapter:
                                title_prompt = get_creative_title_prompt(
                                    stage=chapter_category,
                                    emotion="neutral",
                                    slots=slot_snippets,
                                    user_profile=user_profile,
                                    birth_year=user_birth_year,
                                )
                                title_response = llm.invoke(title_prompt)
                                title = title_response.content.strip().strip('"')

                            narrative_prompt = get_narrative_prompt(
                                stage=chapter_category,
                                slots=slot_snippets,
                                new_content=combined_text,
                                existing_content=existing_content,
                                user_profile=user_profile,
                                birth_year=user_birth_year,
                            )
                            narrative_response = llm.invoke(narrative_prompt)
                            new_narrative = narrative_response.content.strip()

                            # Append instead of replacing.
                            if existing_content:
                                narrative = f"{existing_content}\n\n{new_narrative}"
                            else:
                                narrative = new_narrative
                        except Exception as e:
                            logger.warning(f"LLM 生成失败: {e}")
                            if existing_content:
                                narrative = f"{existing_content}\n\n{combined_text}"

                    # Safety check: the result must not be much shorter than
                    # what is already stored.
                    if existing_content and len(narrative) < len(existing_content) * 0.8:
                        logger.warning(
                            f"内容长度异常: existing={len(existing_content)}, "
                            f"new={len(narrative)}, category={chapter_category}. 回退为追加模式"
                        )
                        narrative = f"{existing_content}\n\n{combined_text}"

                    # Before storing: expand placeholders with the fixed template.
                    narrative = inject_image_placeholder_template(narrative)

                    calculated_order_index = STAGE_TO_ORDER.get(chapter_category, 999)

                    # Store as sections (split + pending image records).
                    chapter = _save_narrative_to_sections(
                        db,
                        chapter,
                        narrative,
                        title=title,
                        category=chapter_category,
                        order_index=calculated_order_index,
                        source_segments=source_ids,
                        user_id=user_id,
                    )
                    db.flush()
                    db.refresh(chapter)

                    if image_settings.enabled and _chapter_has_any_section_images_to_generate(chapter):
                        chapters_to_enqueue.add(chapter.id)

                    # Update the user's most recently updated Book. limit(1) +
                    # first() tolerates users that own several books.
                    stmt_book = (
                        select(Book)
                        .where(Book.user_id == user_id)
                        .order_by(Book.updated_at.desc())
                        .limit(1)
                    )
                    book = db.execute(stmt_book).scalars().first()
                    if not book:
                        book = Book(
                            id=str(uuid.uuid4()),
                            user_id=user_id,
                            title="我的回忆录",
                            total_pages=0,
                            total_words=0,
                            cover_image_url=None,
                        )
                        db.add(book)
                    book.has_update = True
                    book.last_update_chapter_id = chapter.id
                finally:
                    _release_chapter_lock(user_id, chapter_category)

            # Mark the segments as processed.
            for seg in segments:
                seg.processed = True
            db.commit()

            # Dispatch image generation only after the data is committed.
            for chapter_id in sorted(chapters_to_enqueue):
                try:
                    logger.info(f"派发章节补图任务: chapter={chapter_id}")
                    generate_chapter_images.delay(chapter_id)
                except Exception as exc:
                    logger.warning(f"补图任务派发失败: chapter={chapter_id}, error={exc}")

        logger.info(f"回忆录处理完成: user_id={user_id}, task_id={task_id}")

        # Mark the task as successful.
        _update_task_status_sync(user_id, task_id, "success", {"processed": len(segments)})
        return {"status": "success", "processed": len(segments)}

    except Retry:
        # Retry is an Exception subclass: without this clause the
        # lock-contention retry above would be reported as a failure and
        # wrapped in a second retry.
        raise
    except Exception as e:
        logger.error(f"回忆录处理失败: {e}")
        # Mark the task as failed.
        _update_task_status_sync(user_id, task_id, "failure", {"error": str(e)})
        # Retry.
        raise self.retry(exc=e)


@shared_task(bind=True, max_retries=3, default_retry_delay=30)
def generate_chapter_content(self, user_id: str, stage: str, new_content: str):
    """Celery task: generate content for a single chapter (real-time updates).

    Args:
        user_id: User id.
        stage: Chapter category to update.
        new_content: New raw content to weave into the chapter.

    Returns:
        ``{"status": "success"}``.

    Raises:
        celery.exceptions.Retry: On any failure (until retries are exhausted).
    """
    logger.info(f"生成章节内容: user_id={user_id}, stage={stage}")

    # NOTE(review): unlike process_memoir_segments, this task does not take
    # the per-chapter lock — confirm whether that is intended.
    try:
        with get_sync_db() as db:
            llm = _get_llm()

            # Find the active chapter and preload its sections.
            stmt = (
                select(Chapter)
                .where(
                    Chapter.user_id == user_id,
                    Chapter.category == stage,
                    Chapter.is_active == True,  # noqa: E712 (SQL expression)
                )
                .options(joinedload(Chapter.sections))
            )
            chapter = db.execute(stmt).unique().scalar_one_or_none()

            existing_content = ""
            if chapter and getattr(chapter, "sections", None):
                existing_content = "\n\n".join(
                    s.content
                    for s in sorted(chapter.sections, key=lambda x: x.order_index)
                    if (s.content or "").strip()
                )

            if llm:
                prompt = get_narrative_prompt(
                    stage=stage,
                    slots={},
                    new_content=new_content,
                    existing_content=existing_content,
                )
                response = llm.invoke(prompt)
                new_narrative = response.content.strip()
                # Append instead of replacing.
                if existing_content:
                    narrative = f"{existing_content}\n\n{new_narrative}"
                else:
                    narrative = new_narrative
            else:
                narrative = (
                    f"{existing_content}\n\n{new_content}" if existing_content else new_content
                )

            # Safety check: the result must not be much shorter than what is
            # already stored.
            if existing_content and len(narrative) < len(existing_content) * 0.8:
                logger.warning(
                    f"内容长度异常: existing={len(existing_content)}, "
                    f"new={len(narrative)}, stage={stage}. 回退为追加模式"
                )
                narrative = f"{existing_content}\n\n{new_content}"

            # Before storing: expand placeholders with the fixed template.
            narrative = inject_image_placeholder_template(narrative)

            calculated_order_index = STAGE_TO_ORDER.get(stage, 999)
            title = chapter.title if chapter else f"{stage} 回忆"

            chapter = _save_narrative_to_sections(
                db,
                chapter,
                narrative,
                title=title,
                category=stage,
                order_index=calculated_order_index,
                source_segments=[],
                user_id=user_id,
            )
            db.commit()

        return {"status": "success"}

    except Exception as e:
        logger.error(f"章节生成失败: {e}")
        raise self.retry(exc=e)


def build_cos_key(user_id: str, chapter_id: str, index: int, prompt: str) -> str:
    """Build the object-storage key of a chapter image.

    The key embeds the first 10 hex characters of the prompt's SHA-1, so a
    changed prompt yields a different key.
    """
    short_hash = hashlib.sha1(prompt.encode("utf-8")).hexdigest()[:10]
    return f"memoirs/{user_id}/{chapter_id}/{index}-{short_hash}.png"


def _apply_item_to_memoir_image(rec: MemoirImage, d: dict):
    """Copy the fields of an image dict onto a ``MemoirImage`` row."""
    rec.placeholder = d.get("placeholder")
    rec.description = d.get("description")
    rec.status = (d.get("status") or "pending").strip() or "pending"
    rec.prompt = d.get("prompt")
    rec.url = d.get("url")
    rec.storage_key = d.get("storage_key")
    rec.provider = d.get("provider")
    rec.style = d.get("style")
    rec.size = d.get("size")
    rec.error = d.get("error")
    rec.retryable = d.get("retryable")
    rec.updated_at = datetime.now(timezone.utc)


@shared_task(bind=True, max_retries=3, default_retry_delay=30)
def generate_chapter_images(self, chapter_id: str):
    """Celery task: generate images for a chapter's sections.

    Each section has at most one image. Sections whose image record is pending
    or failed are processed one by one: the record is marked processing, an
    image is generated, re-encoded as PNG, uploaded, and the record is marked
    completed. A non-retryable upload error removes the image record; any
    other error marks it failed and makes the whole task retry at the end.

    Returns:
        A dict whose ``status`` is one of ``no_chapter``, ``no_images``,
        ``disabled``, ``locked`` or ``success``.

    Raises:
        celery.exceptions.Retry: When any step fails or retryable image
            failures remain (until retries are exhausted).
    """
    lock_acquired = False
    with get_sync_db() as db:
        try:
            stmt = (
                select(Chapter)
                .where(Chapter.id == chapter_id)
                .options(
                    joinedload(Chapter.sections).joinedload(ChapterSection.image_record),
                    joinedload(Chapter.images),
                )
            )
            chapter = db.execute(stmt).unique().scalar_one_or_none()
            if not chapter:
                logger.info("章节补图跳过: chapter=%s, reason=not_found", chapter_id)
                return {"status": "no_chapter"}

            sections = getattr(chapter, "sections", None) or []
            sections_with_pending = [
                (idx, s) for idx, s in enumerate(sections) if _section_has_image_to_generate(s)
            ]
            if not sections_with_pending:
                logger.info("章节补图跳过: chapter=%s, reason=no_pending_images", chapter_id)
                return {"status": "no_images"}

            settings = MemoirImageSettings.from_env()
            if not settings.enabled:
                logger.info("章节补图跳过: chapter=%s, reason=disabled", chapter_id)
                return {"status": "disabled"}

            lock_acquired = _acquire_chapter_image_lock(chapter_id)
            if not lock_acquired:
                logger.info("章节补图跳过: chapter=%s, reason=locked", chapter_id)
                return {"status": "locked"}

            prompt_service = MemoirImagePromptService(_get_llm(), settings)
            image_generator = get_image_generator()
            storage = TencentCosStorageService.from_env()
            logger.info(
                "章节补图开始: chapter=%s, pending_sections=%d",
                chapter_id,
                len(sections_with_pending),
            )

            retryable_failures: list[str] = []
            permanent_failures: list[str] = []

            for sec_index, section in sections_with_pending:
                item = memoir_image_to_dict(section.image_record) if section.image_record else {}
                current_item = dict(item) if item else {}
                current_item.setdefault("placeholder", "")
                current_item.setdefault("description", "")

                # Persist the "processing" state before the slow work starts.
                current_item["status"] = IMAGE_STATUS_PROCESSING
                current_item["updated_at"] = datetime.now(timezone.utc).isoformat()
                _apply_item_to_memoir_image(section.image_record, current_item)
                db.commit()

                try:
                    # Use the first lines of the section as prompt context.
                    context_lines = (section.content or "").strip().split("\n")[:5]
                    context_excerpt = " ".join(context_lines)[:200]
                    prompt_data = prompt_service.build_prompt(
                        chapter_title=chapter.title,
                        chapter_category=chapter.category or "",
                        description=current_item.get("description", ""),
                        context_excerpt=context_excerpt,
                    )

                    result = image_generator.generate(
                        prompt_data["prompt"],
                        prompt_data["size"],
                        prompt_data["style"],
                    )
                    if result.status != TaskStatus.COMPLETED or not result.image_url:
                        raise RuntimeError(result.error or "Image generation failed")

                    image_bytes = _normalize_image_bytes_for_storage(
                        image_generator.download_image(result.image_url)
                    )
                    key = build_cos_key(chapter.user_id, chapter.id, sec_index, prompt_data["prompt"])

                    current_item["storage_key"] = key
                    current_item["url"] = storage.upload_bytes(image_bytes, key, "image/png")
                    current_item["prompt"] = prompt_data["prompt"]
                    current_item["style"] = prompt_data["style"]
                    current_item["size"] = prompt_data["size"]
                    current_item["status"] = IMAGE_STATUS_COMPLETED
                    current_item["error"] = None
                    current_item["retryable"] = None
                    current_item["updated_at"] = datetime.now(timezone.utc).isoformat()
                    _apply_item_to_memoir_image(section.image_record, current_item)
                    db.commit()
                    logger.info(
                        "章节补图成功: chapter=%s, section_index=%s, url=%s",
                        chapter_id,
                        sec_index,
                        current_item["url"],
                    )
                except Exception as exc:
                    failure_msg = f"section_index={sec_index}, error={exc}"
                    if isinstance(exc, CosUploadError) and not exc.retryable:
                        # Permanent upload failure: detach and delete the record.
                        permanent_failures.append(failure_msg)
                        logger.error("图片上传不可重试,清理配图: chapter=%s, %s", chapter_id, failure_msg)
                        mi = section.image_record
                        section.image_id = None
                        if mi:
                            db.delete(mi)
                        db.commit()
                    else:
                        # Retryable failure: keep the record, marked as failed.
                        current_item["status"] = IMAGE_STATUS_FAILED
                        current_item["error"] = str(exc)
                        current_item["retryable"] = True
                        retryable_failures.append(failure_msg)
                        logger.warning("图片生成失败(可重试): chapter=%s, %s", chapter_id, failure_msg)
                        current_item["updated_at"] = datetime.now(timezone.utc).isoformat()
                        _apply_item_to_memoir_image(section.image_record, current_item)
                        db.commit()

            # The cover stays empty; the first completed image is not promoted.
            if retryable_failures:
                raise RuntimeError(
                    f"章节补图存在可重试失败项: chapter={chapter_id}, failures={'; '.join(retryable_failures)}"
                )
            return {"status": "success"}
        except Exception as exc:
            logger.error("章节补图任务失败: chapter=%s, error=%s", chapter_id, exc)
            raise self.retry(exc=exc)
        finally:
            if lock_acquired:
                _release_chapter_image_lock(chapter_id)