配置 SSOT(TOML + .env) 统一错误契约 Auth 与事务边界 Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client 可观测性(OpenTelemetry + LGTM)
283 lines
9.8 KiB
Python
283 lines
9.8 KiB
Python
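"""
StoryService — business logic for the Story layer.

- Creates stories, story versions, and evidence links.
- Does not depend on the agent directly; it is invoked by the orchestrator.
- After a story body is generated, extracts the primary image intent and persists it.
"""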
|
||
|
||
from datetime import datetime, timezone
|
||
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.core.db import transactional
|
||
from app.core.errors import NotFoundError
|
||
from app.core.logging import get_logger
|
||
from app.features.memoir import repo as memoir_repo
|
||
from app.features.memoir.asset_resolver import strip_asset_image_refs_from_markdown
|
||
from app.features.memoir.memoir_images.settings import MemoirImageSettings
|
||
from app.features.story.image_intent_extractor import extract_primary_image_intent
|
||
from app.features.story.repo import (
|
||
count_story_versions,
|
||
create_story,
|
||
create_story_evidence_link,
|
||
create_story_image_intent,
|
||
create_story_version,
|
||
delete_story_image_intents_by_story,
|
||
get_stories_for_user,
|
||
get_story_by_id,
|
||
get_story_image_intent_by_story,
|
||
)
|
||
from app.features.story.time_hints import apply_infer_story_time_start_to_model
|
||
|
||
# Module-level logger, obtained from the project's logging helper and keyed by this module's name.
logger = get_logger(__name__)
|
||
|
||
|
||
async def _extract_and_store_image_intent(
    db,
    *,
    story,
    version,
    markdown: str,
) -> None:
    """
    Derive the primary image intent from ``markdown`` and persist it for ``story``.

    Steps, in order:

    1. Intents of this story whose status is ``pending`` or ``failed`` are
       deleted. Rows in other states are not passed to the delete call, so a
       row that is currently ``processing`` is left in place.
    2. If a positive minimum body length is configured and the markdown (with
       asset image references stripped) is shorter than it, nothing else
       happens.
    3. Otherwise an intent is extracted and written:
       - no primary intent row exists -> a new row is created;
       - a row exists for this same version -> it is left alone when its
         status is ``processing`` or ``completed``, otherwise it is reset to
         ``pending`` in place (idempotent for repeated calls);
       - a row exists for a different version -> the same row is re-pointed
         at the new version and reset to ``pending`` whatever its previous
         status was, so the primary key stays stable.

    Args:
        db: Session handle forwarded to the repo functions.
        story: Story model; its id and descriptive fields feed the extractor.
        version: Story version model whose ``id`` the intent is tied to.
        markdown: Story body to extract the intent from.
    """
    image_settings = MemoirImageSettings.from_env()
    body_text = strip_asset_image_refs_from_markdown(markdown or "").strip()
    min_body_chars = image_settings.story_image_min_body_chars
    body_too_short = min_body_chars > 0 and len(body_text) < min_body_chars

    # Both the "too short" path and the normal path start by clearing stale
    # pending/failed intents, so do it once up front.
    await delete_story_image_intents_by_story(
        db, story.id, statuses=["pending", "failed"]
    )

    if body_too_short:
        logger.debug(
            "story image intent skipped: body below min chars story={} len={} min={}",
            story.id,
            len(body_text),
            min_body_chars,
        )
        return

    intent = extract_primary_image_intent(
        markdown,
        title=story.title or "",
        stage=story.stage,
        summary=story.summary,
        people_refs=story.people_refs or [],
        place_refs=story.place_refs or [],
        time_start=story.time_start,
        time_end=story.time_end,
    )
    current = await get_story_image_intent_by_story(db, story.id, role="primary")

    if not current:
        await create_story_image_intent(
            db,
            story_id=story.id,
            story_version_id=version.id,
            caption=intent.caption,
            prompt_brief=intent.prompt_brief,
            style_profile=intent.style_profile,
        )
        return

    if current.story_version_id == version.id:
        # Same version: never disturb a row that is in flight or already done.
        if (current.status or "").strip() in ("processing", "completed"):
            return
    else:
        # Reuse the same primary-key row so an in-progress Celery task that
        # looks the intent up by id can still find it.
        current.story_version_id = version.id

    # Reset the surviving row to a fresh pending intent.
    current.caption = intent.caption
    current.prompt_brief = intent.prompt_brief
    current.style_profile = intent.style_profile
    current.status = "pending"
    current.error = None
    current.asset_id = None
    current.updated_at = datetime.now(timezone.utc)
|
||
|
||
|
||
class StoryService:
    """Story-layer operations executed against a single ``AsyncSession``.

    Write methods wrap their database work in ``transactional(self._db)``;
    ``create_story`` and ``append_version`` additionally enqueue follow-up
    effects (story image / chapter recompose) once that block has exited.
    """

    # Log templates for the post-commit summary line, one per entry point.
    _POST_COMMIT_LOG_CREATE = (
        "event=story_post_commit user_id={} trigger=manual_api "
        "enqueued_story_image_count={} enqueued_chapter_recompose_count={} "
        "errors={}"
    )
    _POST_COMMIT_LOG_APPEND = (
        "event=story_post_commit user_id={} trigger=manual_api_append "
        "enqueued_story_image_count={} enqueued_chapter_recompose_count={} errors={}"
    )

    def __init__(self, db: AsyncSession):
        self._db = db

    async def _enqueue_post_commit(
        self,
        *,
        user_id: str,
        story_id: str,
        log_template: str,
    ) -> None:
        """Enqueue follow-up effects for one story and log the outcome.

        Looks up the chapters linked to ``story_id``, hands them to
        ``enqueue_story_post_commit_effects`` and logs the returned counters
        using ``log_template`` (four ``{}`` slots: user id, story image count,
        chapter recompose count, errors).
        """
        # Imported locally, as in the original code — presumably to avoid an
        # import cycle; confirm before hoisting to module level.
        from app.features.memoir.repo import get_chapter_ids_linked_to_story
        from app.features.story.post_commit import enqueue_story_post_commit_effects

        chapter_ids = set(await get_chapter_ids_linked_to_story(self._db, story_id))
        outcome = enqueue_story_post_commit_effects(
            user_id=user_id,
            story_ids={story_id},
            chapter_ids=chapter_ids,
            trigger_source="manual_api",
            need_compaction=False,
        )
        logger.info(
            log_template,
            user_id,
            outcome.enqueued_story_image_count,
            outcome.enqueued_chapter_recompose_count,
            outcome.errors,
        )

    async def create_story(
        self,
        user_id: str,
        title: str,
        *,
        stage: str | None = None,
        story_type: str | None = None,
        summary: str | None = None,
        canonical_markdown: str | None = None,
    ) -> str:
        """Create a story (and its first version when a body is given).

        Asset image references are stripped from ``canonical_markdown`` before
        it is stored. When the stripped body is non-blank, version 1 is
        created, the primary image intent is extracted and stored, linked
        chapters are marked dirty, and post-commit effects are enqueued after
        the transaction block.

        Returns:
            The id of the newly created story.
        """
        md = strip_asset_image_refs_from_markdown(canonical_markdown or "")
        has_body = bool(md.strip())

        async with transactional(self._db):
            story = await create_story(
                self._db,
                user_id=user_id,
                title=title,
                stage=stage,
                story_type=story_type,
                summary=summary,
                canonical_markdown=md,
            )
            await self._db.flush()
            apply_infer_story_time_start_to_model(story)

            if has_body:
                version = await create_story_version(
                    self._db,
                    story_id=story.id,
                    version_no=1,
                    markdown_snapshot=md,
                    actor_type="ai",
                    source_type="generate",
                )
                # Flush so version.id is populated before it is referenced.
                await self._db.flush()
                story.current_version_id = version.id
                await _extract_and_store_image_intent(
                    self._db,
                    story=story,
                    version=version,
                    markdown=md,
                )
                await memoir_repo.mark_chapters_dirty_for_story(self._db, story.id)

            # Capture the id while the transaction is still open.
            story_id = story.id

        if has_body:
            await self._enqueue_post_commit(
                user_id=user_id,
                story_id=story_id,
                log_template=self._POST_COMMIT_LOG_CREATE,
            )
        return story_id

    async def append_version(
        self,
        story_id: str,
        markdown_snapshot: str,
        *,
        actor_type: str = "ai",
        source_type: str = "generate",
        change_summary: str | None = None,
        prompt_meta: dict | None = None,
    ) -> str:
        """Append a new version to a story and make it the current one.

        The story's ``canonical_markdown`` is replaced with the stripped
        snapshot, the primary image intent is re-extracted, linked chapters
        are marked dirty, and post-commit effects are enqueued after the
        transaction block.

        Returns:
            The id of the newly created version.

        Raises:
            NotFoundError: If no story with ``story_id`` exists.
        """
        story = await get_story_by_id(self._db, story_id)
        if not story:
            raise NotFoundError(f"Story {story_id} not found")

        # Read owner id up front so nothing touches ORM attributes after the
        # transaction block has exited.
        owner_id = story.user_id
        md = strip_asset_image_refs_from_markdown(markdown_snapshot or "")
        parent_id = story.current_version_id
        # NOTE(review): count + 1 is computed outside the transaction; two
        # concurrent appends could pick the same number — confirm whether a
        # uniqueness constraint guards this.
        version_no = (await count_story_versions(self._db, story_id)) + 1

        async with transactional(self._db):
            version = await create_story_version(
                self._db,
                story_id=story_id,
                version_no=version_no,
                markdown_snapshot=md,
                actor_type=actor_type,
                source_type=source_type,
                parent_version_id=parent_id,
                prompt_meta=prompt_meta,
            )
            version.change_summary = change_summary
            # Same as create_story: flush so version.id is populated before it
            # is copied onto the story and compared inside the intent helper.
            await self._db.flush()

            story.current_version_id = version.id
            story.canonical_markdown = md
            apply_infer_story_time_start_to_model(story)
            await _extract_and_store_image_intent(
                self._db,
                story=story,
                version=version,
                markdown=md,
            )
            await memoir_repo.mark_chapters_dirty_for_story(self._db, story_id)
            version_id = version.id

        await self._enqueue_post_commit(
            user_id=owner_id,
            story_id=story_id,
            log_template=self._POST_COMMIT_LOG_APPEND,
        )
        return version_id

    async def link_evidence(
        self,
        story_id: str,
        evidence_type: str,
        evidence_id: str,
        *,
        role: str = "primary",
        weight: float | None = None,
    ) -> None:
        """Attach a piece of evidence to a story inside its own transaction.

        The story's existence is not checked here; the caller must ensure it.
        """
        async with transactional(self._db):
            await create_story_evidence_link(
                self._db,
                story_id=story_id,
                evidence_type=evidence_type,
                evidence_id=evidence_id,
                role=role,
                weight=weight,
            )

    async def get_stories(
        self, user_id: str, *, status: str | None = "active"
    ) -> list[dict]:
        """Return the user's stories as plain dicts.

        ``status`` is forwarded to the repo query unchanged. ``created_at`` is
        rendered as an ISO-8601 string, or ``None`` when it is unset.
        """
        stories = await get_stories_for_user(self._db, user_id, status=status)
        return [
            {
                "id": s.id,
                "title": s.title,
                "stage": s.stage,
                "story_type": s.story_type,
                "summary": s.summary,
                "canonical_markdown": s.canonical_markdown,
                "status": s.status,
                "created_at": s.created_at.isoformat() if s.created_at else None,
            }
            for s in stories
        ]
|