重构回忆录为 story-first / markdown-first 架构并整合图片意图与前端 UI 修复

本次 squash merge 将 codex-story-first-image-intent 的整体改动合入 development,核心内容包括:

1. 后端数据与迁移:新增 stories、story_versions、story_image_intents、chapter_cover_intents、assets 等模型与 Alembic 迁移,建立 story-first、markdown-first、asset-first 的主数据链路。

2. 生成与任务链:引入 StoryBuilderOrchestrator、ChapterComposerOrchestrator、story_image_tasks、chapter_cover_tasks,图片生成从正文占位符改为结构化 intent -> asset -> markdown 回填。

3. 并发与一致性:为 story/chapter intent 增加 claim_token、claimed_at、attempt_count,采用数据库原子 claim 为主、Redis 锁为辅,避免重复生成、锁误删和 processing 卡死。

4. Memoir 读写路径:章节 canonical_markdown 成为正文真源,列表/详情接口补齐 markdown、cover_asset、word_count 等字段,PDF 与 asset 解析链路同步升级。

5. Memory / Retrieval:扩展 transcript ingest、chunking、evidence 检索与 story 聚合基础设施,为后续 story-first RAG 与多 agent 编排提供底座。

6. App 端体验:章节页继续走 MarkdownRenderer 阅读链,同时吸收 fix3-19 的跨平台 UI glitch 修复;更新对话页、首页、文案资源与章节列表映射逻辑。

7. 测试与文档:补充 asset resolver、story image task、章节封面派发、markdown 映射等回归测试,并加入图片占位符退役设计文档。
This commit is contained in:
Kevin
2026-03-20 10:30:07 +08:00
parent 13e3124b85
commit 7f57f96c25
67 changed files with 4751 additions and 832 deletions

View File

@@ -3,6 +3,14 @@ Celery 任务模块
"""
from .celery_app import celery_app
from .chapter_cover_tasks import generate_chapter_cover
from .memoir_tasks import process_memoir_segments, generate_chapter_images
from .story_image_tasks import generate_story_image
__all__ = ["celery_app", "process_memoir_segments", "generate_chapter_images"]
__all__ = [
"celery_app",
"process_memoir_segments",
"generate_chapter_images",
"generate_chapter_cover",
"generate_story_image",
]

View File

@@ -9,10 +9,12 @@ from celery import Celery
from app.core.config import settings
# 与 main.py / Alembic 一致:注册所有 model避免 mapper 初始化时 relationship 字符串找不到类
from app.features.asset import models as _asset_models # noqa: F401 - register Asset
from app.features.auth import models as _auth_models # noqa: F401
from app.features.conversation import models as _conv_models # noqa: F401
from app.features.memory import models as _memory_models # noqa: F401
from app.features.memoir import models as _memoir_models # noqa: F401
from app.features.story import models as _story_models # noqa: F401
from app.features.payment import models as _payment_models # noqa: F401
from app.features.user import models as _user_models # noqa: F401
@@ -23,7 +25,11 @@ celery_app = Celery(
"life_echo",
broker=REDIS_URL,
backend=REDIS_URL,
include=["app.tasks.memoir_tasks"],
include=[
"app.tasks.memoir_tasks",
"app.tasks.story_image_tasks",
"app.tasks.chapter_cover_tasks",
],
)
# Celery 配置

View File

@@ -0,0 +1,304 @@
"""
Chapter 封面生成 Celery 任务。
从 chapter_cover_intents 原子 claim intent或创建新 intent 后生成封面,
写入 assets绑定到 chapters.cover_asset_id。封面不回写进正文 markdown。
"""
import hashlib
import uuid
from datetime import datetime, timedelta, timezone
from celery import shared_task
from PIL import Image
from sqlalchemy import and_, func, or_, select, update
from sqlalchemy.orm import joinedload
from app.core.db import get_sync_db
from app.core.dependencies import get_image_generator
from app.core.redis_lock import acquire_redis_lock, release_redis_lock
from app.features.asset.models import Asset
from app.features.memoir.chapter_cover import (
aggregate_cover_prompt_from_chapter,
aggregate_cover_prompt_from_stories,
)
from app.features.memoir.memoir_images.storage import TencentCosStorageService
from app.features.memoir.models import Chapter, ChapterCoverIntent, ChapterStoryLink
from app.ports.image_gen import TaskStatus
from app.core.logging import get_logger
logger = get_logger(__name__)
CHAPTER_COVER_LOCK_TTL_SECONDS = 1800
CHAPTER_COVER_CLAIM_TTL_SECONDS = 1800
def _build_cover_cos_key(user_id: str, chapter_id: str, prompt: str) -> str:
short_hash = hashlib.sha1(prompt.encode("utf-8")).hexdigest()[:10]
return f"chapters/{user_id}/{chapter_id}/cover-{short_hash}.png"
def _normalize_image_bytes(image_bytes: bytes) -> bytes:
from io import BytesIO
with Image.open(BytesIO(image_bytes)) as image:
output = BytesIO()
if image.mode in {"RGBA", "LA"}:
normalized = image
elif image.mode == "P":
normalized = image.convert("RGBA")
else:
normalized = image.convert("RGB")
normalized.save(output, format="PNG")
return output.getvalue()
def _build_cover_prompt(prompt_brief: str) -> str:
"""从 intent.prompt_brief 构建出图 prompt。"""
from app.agents.memoir.prompts import IMAGE_PLACEHOLDER_TEMPLATE
base = IMAGE_PLACEHOLDER_TEMPLATE
if prompt_brief and prompt_brief.strip():
return f"{base}{prompt_brief.strip()}"
return f"{base}。章节封面"
def _chapter_cover_claimable_clause(now: datetime):
cutoff = now - timedelta(seconds=CHAPTER_COVER_CLAIM_TTL_SECONDS)
return or_(
ChapterCoverIntent.status.in_(["pending", "failed"]),
and_(
ChapterCoverIntent.status == "processing",
or_(
ChapterCoverIntent.claimed_at.is_(None),
ChapterCoverIntent.claimed_at < cutoff,
),
),
)
def _build_chapter_cover_brief(chapter: Chapter) -> str:
prompt_brief = ""
stories = []
if getattr(chapter, "story_links", None):
for link in sorted(
chapter.story_links, key=lambda l: getattr(l, "order_index", 0)
):
story = getattr(link, "story", None)
if story:
stories.append(story)
prompt_brief = aggregate_cover_prompt_from_stories(
stories,
chapter_title=chapter.title or "",
chapter_category=chapter.category or "",
)
if prompt_brief:
return prompt_brief
md = (chapter.canonical_markdown or "").strip()
excerpt = md[:200] if md else ""
return aggregate_cover_prompt_from_chapter(
chapter_title=chapter.title or "",
chapter_category=chapter.category or "",
markdown_excerpt=excerpt,
)
def _claim_chapter_cover_intent_sync(db, chapter: Chapter, claim_token: str):
now = datetime.now(timezone.utc)
claimable = _chapter_cover_claimable_clause(now)
candidate_id = db.execute(
select(ChapterCoverIntent.id)
.where(ChapterCoverIntent.chapter_id == chapter.id)
.where(claimable)
.order_by(
ChapterCoverIntent.updated_at.desc(), ChapterCoverIntent.created_at.desc()
)
.limit(1)
).scalar_one_or_none()
if candidate_id:
claimed = db.execute(
update(ChapterCoverIntent)
.where(ChapterCoverIntent.id == candidate_id)
.where(_chapter_cover_claimable_clause(now))
.values(
status="processing",
claim_token=claim_token,
claimed_at=now,
updated_at=now,
error=None,
attempt_count=func.coalesce(ChapterCoverIntent.attempt_count, 0) + 1,
)
)
if (claimed.rowcount or 0) != 1:
db.rollback()
return None
intent = db.get(ChapterCoverIntent, candidate_id)
db.commit()
return intent
cutoff = now - timedelta(seconds=CHAPTER_COVER_CLAIM_TTL_SECONDS)
fresh_processing = db.execute(
select(ChapterCoverIntent.id)
.where(ChapterCoverIntent.chapter_id == chapter.id)
.where(ChapterCoverIntent.status == "processing")
.where(ChapterCoverIntent.claimed_at.is_not(None))
.where(ChapterCoverIntent.claimed_at >= cutoff)
.limit(1)
).scalar_one_or_none()
if fresh_processing:
return None
intent = ChapterCoverIntent(
id=str(uuid.uuid4()),
chapter_id=chapter.id,
prompt_brief=_build_chapter_cover_brief(chapter),
status="processing",
claim_token=claim_token,
claimed_at=now,
attempt_count=1,
)
db.add(intent)
db.flush()
db.commit()
return intent
@shared_task(bind=True, max_retries=3, default_retry_delay=30)
def generate_chapter_cover(self, chapter_id: str):
"""
为 chapter 生成封面。
从 chapter_cover_intents 原子认领 intent或创建新 intent 后生成,
写入 assets 并绑定到 chapters.cover_asset_id。
"""
lock_key = f"lock:chapter-images:{chapter_id}"
lock_handle = acquire_redis_lock(
lock_key, ttl_seconds=CHAPTER_COVER_LOCK_TTL_SECONDS
)
if lock_handle is None:
logger.info("generate_chapter_cover: chapter=%s, reason=locked", chapter_id)
return {"status": "locked"}
claim_token = uuid.uuid4().hex
intent = None
try:
with get_sync_db() as db:
stmt = (
select(Chapter)
.where(Chapter.id == chapter_id)
.options(
joinedload(Chapter.story_links).joinedload(ChapterStoryLink.story),
)
)
chapter = db.execute(stmt).unique().scalar_one_or_none()
if not chapter:
logger.info(
"generate_chapter_cover: chapter=%s, reason=not_found", chapter_id
)
return {"status": "no_chapter"}
if getattr(chapter, "cover_asset_id", None):
logger.info(
"generate_chapter_cover: chapter=%s, reason=has_cover_asset",
chapter_id,
)
return {"status": "already_has_asset"}
intent = _claim_chapter_cover_intent_sync(db, chapter, claim_token)
if not intent:
logger.info(
"generate_chapter_cover: chapter=%s, reason=no_claimable_intent",
chapter_id,
)
return {"status": "no_intent"}
try:
image_generator = get_image_generator()
storage = TencentCosStorageService.from_env()
from app.features.memoir.memoir_images.settings import MemoirImageSettings
settings = MemoirImageSettings.from_env()
prompt_final = _build_cover_prompt(intent.prompt_brief or "")
result = image_generator.generate(
prompt_final,
settings.default_size,
settings.default_style,
)
if result.status != TaskStatus.COMPLETED or not result.image_url:
raise RuntimeError(result.error or "Image generation failed")
image_bytes = _normalize_image_bytes(
image_generator.download_image(result.image_url)
)
cos_key = _build_cover_cos_key(chapter.user_id, chapter_id, prompt_final)
url = storage.upload_bytes(image_bytes, cos_key, "image/png")
asset_id = str(uuid.uuid4())
with get_sync_db() as db:
intent_db = db.get(ChapterCoverIntent, intent.id)
if (
not intent_db
or (intent_db.status or "").strip() != "processing"
or (intent_db.claim_token or "").strip() != claim_token
):
logger.info(
"generate_chapter_cover: skip persist intent=%s status=%s claim=%s",
intent.id,
getattr(intent_db, "status", None),
getattr(intent_db, "claim_token", None),
)
return {"status": "superseded_or_cancelled"}
asset = Asset(
id=asset_id,
asset_type="chapter_cover",
storage_key=cos_key,
url=url,
provider=settings.provider,
style_profile=settings.default_style,
prompt_final=prompt_final,
status="completed",
)
db.add(asset)
db.flush()
intent_db.asset_id = asset_id
intent_db.status = "completed"
intent_db.claim_token = None
intent_db.claimed_at = None
intent_db.error = None
intent_db.updated_at = datetime.now(timezone.utc)
chapter_db = db.get(Chapter, chapter_id)
chapter_db.cover_asset_id = asset_id
db.commit()
logger.info(
"generate_chapter_cover: chapter=%s, asset=%s, url=%s",
chapter_id,
asset_id,
url,
)
return {"status": "success", "asset_id": asset_id}
except Exception as exc:
if intent is not None:
with get_sync_db() as db:
intent_db = db.get(ChapterCoverIntent, intent.id)
if (
intent_db
and (intent_db.claim_token or "").strip() == claim_token
):
intent_db.status = "failed"
intent_db.claim_token = None
intent_db.claimed_at = None
intent_db.error = str(exc)
intent_db.updated_at = datetime.now(timezone.utc)
db.commit()
logger.warning(
"generate_chapter_cover failed: chapter=%s, error=%s", chapter_id, exc
)
raise self.retry(exc=exc)
finally:
release_redis_lock(lock_handle)

View File

@@ -30,22 +30,18 @@ from app.agents.state_schema import MemoirStateSchema, SlotData, default_state
from app.agents.memoir.prompts import (
STAGE_TO_ORDER,
get_narrative_json_prompt,
inject_image_placeholder_template,
)
from app.agents.memoir import MemoirOrchestrator
from app.agents.memoir.narrative_agent import NarrativeAgent
from app.agents.memoir.placeholder_agent import inject_placeholders
from app.agents.chat.prompts_profile import format_user_profile_context
from app.features.memoir.memoir_images.parser import (
build_initial_image_assets,
parse_image_placeholders,
parse_narrative_to_sections,
split_narrative_to_sections,
)
import hashlib
from app.core.dependencies import get_image_generator
from app.agents.image_prompt import ImagePromptOrchestrator
from app.features.memoir.memoir_images.prompting import MemoirImagePromptService
from app.features.memoir.memoir_images.schema import (
completed_image_assets,
IMAGE_STATUS_COMPLETED,
@@ -250,6 +246,19 @@ def _chapter_has_cover_to_generate(chapter) -> bool:
return False
def _chapter_needs_cover_enqueue(chapter) -> bool:
"""尚无 cover_asset 且章节有正文时,可派发 generate_chapter_cover。"""
if not chapter:
return False
if getattr(chapter, "cover_asset_id", None):
return False
md = (getattr(chapter, "canonical_markdown", None) or "").strip()
if md:
return True
sections = getattr(chapter, "sections", None) or []
return any((getattr(s, "content", None) or "").strip() for s in sections)
def _get_cover_memoir_image(chapter):
"""获取章节封面 MemoirImagesection_id=None若无可生成则返回 None。"""
images = getattr(chapter, "images", None) or []
@@ -298,11 +307,10 @@ def _save_narrative_to_sections(
user_id: str,
):
"""
带占位符的 narrative 拆成 chapter_sections 并写入;为每段占位符创建 pending 配图
已有 section 与图片不删除,仅追加新内容。若无封面 MemoirImage 则创建 pending 封面section_id=None
将 narrative 拆成 chapter_sections 并写入(段落不配 MemoirImage
已有 section 不删除,仅追加新内容。章节封面由 generate_chapter_cover + cover_asset_id 闭环处理
chapter 可为已有章节或 None会新建。返回 chapter。
"""
now_iso = datetime.now(timezone.utc).isoformat()
if chapter is None:
chapter = Chapter(
id=str(uuid.uuid4()),
@@ -344,6 +352,11 @@ def _save_narrative_to_sections(
chapter.source_segments = list(
set((chapter.source_segments or []) + (source_segments or []))
)
from app.features.memoir.repo import (
ensure_chapter_markdown_and_version_sync,
)
ensure_chapter_markdown_and_version_sync(db, chapter, narrative)
return chapter
narrative_to_parse = new_part
order_base = max(s.order_index for s in existing_sections) + 1
@@ -351,13 +364,6 @@ def _save_narrative_to_sections(
narrative_to_parse = (narrative or "").strip()
order_base = 0
img_settings = MemoirImageSettings.from_env()
prompt_service = (
MemoirImagePromptService(llm=None, settings=img_settings)
if img_settings.enabled
else None
)
segments = parse_narrative_to_sections(narrative_to_parse)
if not segments:
sec = ChapterSection(
@@ -369,82 +375,20 @@ def _save_narrative_to_sections(
)
db.add(sec)
db.flush()
if img_settings.enabled:
stmt_cover = select(MemoirImage).where(
MemoirImage.chapter_id == chapter.id,
MemoirImage.section_id.is_(None),
)
if not db.execute(stmt_cover).scalar_one_or_none():
cover_ph = {
"placeholder": "{{{{{{{{IMAGE:章节封面}}}}}}}}",
"description": "章节封面",
"index": 0,
}
cover_asset = build_initial_image_assets(
[cover_ph],
img_settings.provider,
prompt_service.CATEGORY_STYLE_MAP.get(
category, img_settings.default_style
)
if prompt_service
else img_settings.default_style,
img_settings.default_size,
now_iso,
)[0]
cover_mi = _memoir_image_from_asset(chapter.id, None, 0, cover_asset)
db.add(cover_mi)
db.flush()
chapter.title = title
chapter.is_new = True
chapter.source_segments = list(
set((chapter.source_segments or []) + (source_segments or []))
)
from app.features.memoir.repo import ensure_chapter_markdown_and_version_sync
ensure_chapter_markdown_and_version_sync(db, chapter, narrative)
return chapter
def _should_have_image(seg: dict, order_idx: int) -> bool:
"""有 placeholder_info 的段落配图;无则兼容旧格式(每 3 段 1 图)"""
ph = seg.get("placeholder_info")
if ph and ph.get("description"):
return True
return (order_idx % 3) == 2
def _placeholder_for_segment(seg: dict, order_idx: int) -> dict | None:
ph = seg.get("placeholder_info")
if ph and ph.get("placeholder") and ph.get("description"):
# 确保有 indexbuild_initial_image_assets 依赖此字段
if "index" not in ph:
ph = {**ph, "index": order_idx}
return ph
content = (seg.get("content") or "").strip()
desc = (content[:50] + "") if len(content) > 50 else (content or "章节配图")
return {
"placeholder": f"{{{{{{{{IMAGE:{desc}}}}}}}}}",
"description": desc,
"index": order_idx,
}
# 按顺序创建 section每 3 个 section 对应 1 张配图
# 段落不再绑定配图(每故事/章节结构化出图);仅章节封面走 MemoirImage
for i, seg in enumerate(segments):
order_idx = order_base + i
content = (seg.get("content") or "").strip()
image_asset = None
if img_settings.enabled and _should_have_image(seg, order_idx):
ph = _placeholder_for_segment(seg, order_idx)
style = (
prompt_service.CATEGORY_STYLE_MAP.get(
category, img_settings.default_style
)
if prompt_service
else img_settings.default_style
)
image_asset = build_initial_image_assets(
[ph],
img_settings.provider,
style,
img_settings.default_size,
now_iso,
)[0]
sec = ChapterSection(
id=str(uuid.uuid4()),
chapter_id=chapter.id,
@@ -454,49 +398,17 @@ def _save_narrative_to_sections(
)
db.add(sec)
db.flush()
if image_asset:
# 本段配图与当前 section 绑定memoir_images.order_index = section.order_index + 1封面 0 预留)
mi = _memoir_image_from_asset(
chapter.id, sec.id, order_idx + 1, image_asset
)
db.add(mi)
db.flush()
sec.image_id = mi.id
db.flush()
# 封面图:若无则创建 pending MemoirImagesection_id=None, order_index=0
if img_settings.enabled:
stmt_cover = select(MemoirImage).where(
MemoirImage.chapter_id == chapter.id,
MemoirImage.section_id.is_(None),
)
existing_cover = db.execute(stmt_cover).scalar_one_or_none()
if not existing_cover:
cover_ph = {
"placeholder": "{{{{{{{{IMAGE:章节封面}}}}}}}}",
"description": "章节封面",
"index": 0,
}
cover_asset = build_initial_image_assets(
[cover_ph],
img_settings.provider,
prompt_service.CATEGORY_STYLE_MAP.get(
category, img_settings.default_style
)
if prompt_service
else img_settings.default_style,
img_settings.default_size,
now_iso,
)[0]
cover_mi = _memoir_image_from_asset(chapter.id, None, 0, cover_asset)
db.add(cover_mi)
db.flush()
chapter.title = title
chapter.is_new = True
chapter.source_segments = list(
set((chapter.source_segments or []) + (source_segments or []))
)
# 确保 canonical_markdown 与版本链agent 产出由 repo 落库)
from app.features.memoir.repo import ensure_chapter_markdown_and_version_sync
ensure_chapter_markdown_and_version_sync(db, chapter, narrative)
return chapter
@@ -504,9 +416,7 @@ def initialize_chapter_images(_chapter):
"""
兼容旧调用:若章节已改为 sections 存储,则图片初始化已在 _save_narrative_to_sections 中完成,直接返回。
"""
logger.info(
"initialize_chapter_images: 已由 _save_narrative_to_sections 处理 section 配图,跳过"
)
logger.info("initialize_chapter_images: 封面由 generate_chapter_cover 处理,跳过")
return []
@@ -625,6 +535,17 @@ def process_memoir_segments(self, user_id: str, segment_ids: List[str]):
logger.warning(f"未找到段落: {segment_ids}")
return {"status": "no_segments"}
# Memory ingest: transcript -> memory_sources, chunks, FTS
conv_id = getattr(segments[0], "conversation_id", None) or ""
transcript = "\n\n".join(seg.transcript_text or "" for seg in segments)
if transcript.strip():
try:
from app.features.memory.service import ingest_transcript_sync
ingest_transcript_sync(db, user_id, conv_id, transcript)
except Exception as e:
logger.warning("Memory ingest 跳过: %s", e)
# 获取用户状态和资料
state = _get_or_create_state_sync(user_id, db)
llm = _get_llm()
@@ -643,6 +564,11 @@ def process_memoir_segments(self, user_id: str, segment_ids: List[str]):
)
narrative_agent = NarrativeAgent()
chapter_composer = __import__(
"app.agents.memoir.chapter_composer_orchestrator",
fromlist=["ChapterComposerOrchestrator"],
).ChapterComposerOrchestrator()
from app.features.memory.repo import retrieve_evidence_sync
def _process_category(
chapter_category: str,
@@ -652,11 +578,26 @@ def process_memoir_segments(self, user_id: str, segment_ids: List[str]):
birth_year,
llm,
):
"""单章节处理:NarrativeAgent 生成标题+叙事PlaceholderInjectAgent 注入,持久化"""
"""单章节处理:ChapterComposerOrchestrator 生成 markdown或 NarrativeAgent 回退repo 落库"""
segment_texts = [seg.transcript_text or "" for seg in category_segments]
combined_text = "\n\n".join(segment_texts)
source_ids = [seg.id for seg in category_segments]
# 证据检索writing RAG
try:
evidence = retrieve_evidence_sync(
db, user_id, combined_text, top_k=10
)
except Exception as e:
logger.warning("Evidence 检索跳过: %s", e)
evidence = {
"relevant_chunks": [],
"relevant_summaries": [],
"relevant_facts": [],
"timeline_hints": [],
"relevant_stories": [],
}
stmt_chapter = (
select(Chapter)
.where(
@@ -684,14 +625,19 @@ def process_memoir_segments(self, user_id: str, segment_ids: List[str]):
slot_snippets[key] = snip
title = chapter.title if chapter else f"{chapter_category} 回忆"
existing_content = ""
if chapter and getattr(chapter, "sections", None):
existing_content = "\n\n".join(
s.content
for s in sorted(chapter.sections, key=lambda x: x.order_index)
if (s.content or "").strip()
existing_markdown = ""
if chapter:
existing_markdown = (
getattr(chapter, "canonical_markdown", None) or ""
)
narrative = combined_text
if not existing_markdown and getattr(chapter, "sections", None):
existing_markdown = "\n\n".join(
s.content
for s in sorted(
chapter.sections, key=lambda x: x.order_index
)
if (s.content or "").strip()
)
if not chapter:
title = narrative_agent.generate_title(
@@ -702,37 +648,46 @@ def process_memoir_segments(self, user_id: str, segment_ids: List[str]):
birth_year=birth_year,
llm=llm,
)
new_narrative = narrative_agent.generate_narrative(
stage=chapter_category,
slots=slot_snippets,
new_content=combined_text,
existing_content=existing_content,
# ChapterComposerOrchestrator 产出 markdownagent 不落库)
narrative = chapter_composer.compose_chapter_markdown(
title=title,
category=chapter_category,
evidence=evidence,
existing_markdown=existing_markdown,
user_profile=profile,
birth_year=birth_year,
llm=llm,
)
if _is_json_narrative(new_narrative):
narrative = new_narrative
elif existing_content:
narrative = f"{existing_content}\n\n{new_narrative}"
else:
narrative = new_narrative
if not narrative or not narrative.strip():
new_narrative = narrative_agent.generate_narrative(
stage=chapter_category,
slots=slot_snippets,
new_content=combined_text,
existing_content=existing_markdown,
user_profile=profile,
birth_year=birth_year,
llm=llm,
)
if _is_json_narrative(new_narrative):
narrative = new_narrative
elif existing_markdown:
narrative = f"{existing_markdown}\n\n{new_narrative}"
else:
narrative = new_narrative
if (
existing_content
existing_markdown
and not _is_json_narrative(narrative)
and len(narrative) < len(existing_content) * 0.8
and len(narrative) < len(existing_markdown) * 0.8
):
logger.warning(
"内容长度异常: existing=%d, new=%d, category=%s. 回退为追加模式",
len(existing_content),
len(existing_markdown),
len(narrative),
chapter_category,
)
narrative = f"{existing_content}\n\n{combined_text}"
narrative = f"{existing_markdown}\n\n{combined_text}"
if not _is_json_narrative(narrative):
narrative = inject_placeholders(narrative)
calculated_order_index = STAGE_TO_ORDER.get(chapter_category, 999)
chapter = _save_narrative_to_sections(
@@ -748,9 +703,8 @@ def process_memoir_segments(self, user_id: str, segment_ids: List[str]):
db.flush()
db.refresh(chapter)
has_images = image_settings.enabled and (
_chapter_has_any_section_images_to_generate(chapter)
or _chapter_has_cover_to_generate(chapter)
needs_cover_enqueue = (
image_settings.enabled and _chapter_needs_cover_enqueue(chapter)
)
stmt_book = (
@@ -773,7 +727,7 @@ def process_memoir_segments(self, user_id: str, segment_ids: List[str]):
book.has_update = True
book.last_update_chapter_id = chapter.id
return chapter, has_images
return chapter, needs_cover_enqueue
def _raise_retry():
raise self.retry(countdown=10)
@@ -800,13 +754,15 @@ def process_memoir_segments(self, user_id: str, segment_ids: List[str]):
db.commit()
from app.tasks.chapter_cover_tasks import generate_chapter_cover
for chapter_id in sorted(chapters_to_enqueue):
try:
logger.info(f"派发章节补图任务: chapter={chapter_id}")
generate_chapter_images.delay(chapter_id)
logger.info(f"派发章节封面任务: chapter={chapter_id}")
generate_chapter_cover.delay(chapter_id)
except Exception as exc:
logger.warning(
f"补图任务派发失败: chapter={chapter_id}, error={exc}"
f"章节封面任务派发失败: chapter={chapter_id}, error={exc}"
)
logger.info(f"回忆录处理完成: user_id={user_id}, task_id={task_id}")
@@ -902,8 +858,6 @@ def generate_chapter_content(self, user_id: str, stage: str, new_content: str):
)
narrative = f"{existing_content}\n\n{new_content}"
if not _is_json_narrative(narrative):
narrative = inject_image_placeholder_template(narrative)
calculated_order_index = STAGE_TO_ORDER.get(stage, 999)
title = chapter.title if chapter else f"{stage} 回忆"
chapter = _save_narrative_to_sections(
@@ -922,16 +876,15 @@ def generate_chapter_content(self, user_id: str, stage: str, new_content: str):
if (
image_settings.enabled
and chapter
and (
_chapter_has_any_section_images_to_generate(chapter)
or _chapter_has_cover_to_generate(chapter)
)
and _chapter_needs_cover_enqueue(chapter)
):
from app.tasks.chapter_cover_tasks import generate_chapter_cover
try:
generate_chapter_images.delay(chapter.id)
generate_chapter_cover.delay(chapter.id)
except Exception as exc:
logger.warning(
"补图任务派发失败: chapter=%s, error=%s", chapter.id, exc
"章节封面任务派发失败: chapter=%s, error=%s", chapter.id, exc
)
return {"status": "success"}
@@ -948,7 +901,7 @@ def build_cos_key(user_id: str, chapter_id: str, index: int | str, prompt: str)
@shared_task(bind=True, max_retries=3, default_retry_delay=30)
def generate_chapter_images(self, chapter_id: str):
"""Async task to generate images for a chapter's cover and sections (each section has at most one image)."""
"""异步补图:处理封面 MemoirImage 与历史遗留的段落配图pending/failed"""
lock_acquired = False
provider = None
with get_sync_db() as db:

View File

@@ -0,0 +1,297 @@
"""
Story 主插图生成 Celery 任务。
从 story_image_intents 原子 claim intent生成图片写入 assets更新 intent。
不读取正文占位符。
"""
import hashlib
import uuid
from datetime import datetime, timedelta, timezone
from celery import shared_task
from PIL import Image
from sqlalchemy import and_, func, or_, select, update
from app.core.db import get_sync_db
from app.core.dependencies import get_image_generator
from app.core.redis_lock import acquire_redis_lock, release_redis_lock
from app.features.asset.models import Asset
from app.features.memoir.memoir_images.storage import TencentCosStorageService
from app.features.story.backfill import backfill_image_into_markdown
from app.features.story.models import Story, StoryImageIntent, StoryVersion
from app.ports.image_gen import TaskStatus
from app.core.logging import get_logger
logger = get_logger(__name__)
STORY_IMAGE_LOCK_TTL_SECONDS = 1800
STORY_IMAGE_CLAIM_TTL_SECONDS = 1800
def _build_story_image_cos_key(
user_id: str, story_id: str, intent_id: str, prompt: str
) -> str:
short_hash = hashlib.sha1(prompt.encode("utf-8")).hexdigest()[:10]
return f"stories/{user_id}/{story_id}/{intent_id}-{short_hash}.png"
def _normalize_image_bytes(image_bytes: bytes) -> bytes:
from io import BytesIO
with Image.open(BytesIO(image_bytes)) as image:
output = BytesIO()
if image.mode in {"RGBA", "LA"}:
normalized = image
elif image.mode == "P":
normalized = image.convert("RGBA")
else:
normalized = image.convert("RGB")
normalized.save(output, format="PNG")
return output.getvalue()
def _build_story_image_prompt(
prompt_brief: str,
story_title: str = "",
story_stage: str | None = None,
style_profile: str | None = None,
) -> str:
"""从 intent.prompt_brief 构建出图 prompt。"""
from app.agents.memoir.prompts import IMAGE_PLACEHOLDER_TEMPLATE
base = IMAGE_PLACEHOLDER_TEMPLATE
if prompt_brief and prompt_brief.strip():
return f"{base}{prompt_brief.strip()}"
fallback = "".join(filter(None, [story_title, story_stage or ""])) or "人生故事"
return f"{base}{fallback}"
def _story_image_claimable_clause(now: datetime):
cutoff = now - timedelta(seconds=STORY_IMAGE_CLAIM_TTL_SECONDS)
return or_(
StoryImageIntent.status.in_(["pending", "failed"]),
and_(
StoryImageIntent.status == "processing",
or_(
StoryImageIntent.claimed_at.is_(None),
StoryImageIntent.claimed_at < cutoff,
),
),
)
def _claim_story_image_intent_sync(db, story_id: str, claim_token: str):
now = datetime.now(timezone.utc)
claimable = _story_image_claimable_clause(now)
candidate_id = db.execute(
select(StoryImageIntent.id)
.where(StoryImageIntent.story_id == story_id)
.where(StoryImageIntent.intent_role == "primary")
.where(claimable)
.order_by(
StoryImageIntent.updated_at.desc(), StoryImageIntent.created_at.desc()
)
.limit(1)
).scalar_one_or_none()
if not candidate_id:
return None
claimed = db.execute(
update(StoryImageIntent)
.where(StoryImageIntent.id == candidate_id)
.where(_story_image_claimable_clause(now))
.values(
status="processing",
claim_token=claim_token,
claimed_at=now,
updated_at=now,
error=None,
attempt_count=func.coalesce(StoryImageIntent.attempt_count, 0) + 1,
)
)
if (claimed.rowcount or 0) != 1:
db.rollback()
return None
row = (
db.execute(
select(StoryImageIntent, Story)
.join(Story, StoryImageIntent.story_id == Story.id)
.where(StoryImageIntent.id == candidate_id)
)
.unique()
.first()
)
db.commit()
return row
@shared_task(bind=True, max_retries=3, default_retry_delay=30)
def generate_story_image(self, story_id: str):
"""
为 story 生成主插图。
从 story_image_intents 原子认领 primary intent生成后写入 assets 并更新 intent。
"""
lock_key = f"lock:story-image:{story_id}"
lock_handle = acquire_redis_lock(lock_key, ttl_seconds=STORY_IMAGE_LOCK_TTL_SECONDS)
if lock_handle is None:
logger.info("generate_story_image: story=%s, reason=locked", story_id)
return {"status": "locked"}
claim_token = uuid.uuid4().hex
intent = None
story = None
try:
with get_sync_db() as db:
row = _claim_story_image_intent_sync(db, story_id, claim_token)
if not row:
logger.info(
"generate_story_image: story=%s, reason=no_claimable_intent",
story_id,
)
return {"status": "no_intent"}
intent, story = row
image_generator = get_image_generator()
storage = TencentCosStorageService.from_env()
from app.features.memoir.memoir_images.settings import MemoirImageSettings
settings = MemoirImageSettings.from_env()
prompt_final = _build_story_image_prompt(
intent.prompt_brief or "",
story_title=story.title or "",
story_stage=story.stage,
style_profile=intent.style_profile or settings.default_style,
)
result = image_generator.generate(
prompt_final,
settings.default_size,
intent.style_profile or settings.default_style,
)
if result.status != TaskStatus.COMPLETED or not result.image_url:
raise RuntimeError(result.error or "Image generation failed")
image_bytes = _normalize_image_bytes(
image_generator.download_image(result.image_url)
)
cos_key = _build_story_image_cos_key(
story.user_id, story_id, intent.id, prompt_final
)
url = storage.upload_bytes(image_bytes, cos_key, "image/png")
asset_id = str(uuid.uuid4())
with get_sync_db() as db:
intent_db = db.get(StoryImageIntent, intent.id)
if (
not intent_db
or (intent_db.status or "").strip() != "processing"
or (intent_db.claim_token or "").strip() != claim_token
):
logger.info(
"generate_story_image: skip persist intent=%s status=%s claim=%s",
intent.id,
getattr(intent_db, "status", None),
getattr(intent_db, "claim_token", None),
)
return {"status": "superseded_or_cancelled"}
asset = Asset(
id=asset_id,
asset_type="story_image",
storage_key=cos_key,
url=url,
provider=settings.provider,
style_profile=intent.style_profile or settings.default_style,
prompt_final=prompt_final,
status="completed",
)
db.add(asset)
db.flush()
story_db = db.get(Story, story_id)
target_vid = intent_db.story_version_id or story_db.current_version_id
current_vid = story_db.current_version_id
intent_db.asset_id = asset_id
intent_db.status = "completed"
intent_db.claim_token = None
intent_db.claimed_at = None
intent_db.error = None
intent_db.updated_at = datetime.now(timezone.utc)
db.flush()
# 仅当 intent 仍指向当前版本时回填正文,避免慢任务/重试把图插到新版本上
if not target_vid or target_vid != current_vid:
db.commit()
logger.info(
"generate_story_image: stale intent skip backfill story=%s "
"intent_ver=%s current=%s",
story_id,
target_vid,
current_vid,
)
return {"status": "success_stale", "asset_id": asset_id}
ver = db.get(StoryVersion, target_vid)
if not ver:
db.commit()
return {"status": "success_no_snapshot", "asset_id": asset_id}
base_md = ver.markdown_snapshot or ""
backfilled_md = backfill_image_into_markdown(
base_md,
asset_id=asset_id,
caption=intent_db.caption or "主插图",
source_span=intent_db.source_span,
)
max_stmt = select(func.max(StoryVersion.version_no)).where(
StoryVersion.story_id == story_id
)
max_no = db.execute(max_stmt).scalar()
version_no = (max_no or 0) + 1
new_ver = StoryVersion(
id=str(uuid.uuid4()),
story_id=story_id,
version_no=version_no,
markdown_snapshot=backfilled_md,
change_summary="主插图回填",
actor_type="system",
source_type="image_backfill",
parent_version_id=story_db.current_version_id,
)
db.add(new_ver)
db.flush()
story_db.current_version_id = new_ver.id
story_db.canonical_markdown = backfilled_md
db.commit()
logger.info(
"generate_story_image: story=%s, asset=%s, url=%s",
story_id,
asset_id,
url,
)
return {"status": "success", "asset_id": asset_id}
except Exception as exc:
if intent is not None:
with get_sync_db() as db:
intent_db = db.get(StoryImageIntent, intent.id)
if (
intent_db
and (intent_db.status or "").strip() != "completed"
and (intent_db.claim_token or "").strip() == claim_token
):
intent_db.status = "failed"
intent_db.claim_token = None
intent_db.claimed_at = None
intent_db.error = str(exc)
intent_db.updated_at = datetime.now(timezone.utc)
db.commit()
logger.warning("generate_story_image failed: story=%s, error=%s", story_id, exc)
raise self.retry(exc=exc)
finally:
release_redis_lock(lock_handle)