Files
life-echo/api/app/features/story/service.py
Sully 53e0065e3e refactor(api): TOML 配置 SSOT、统一错误契约、Auth/事务加固与可观测性 (#33)
配置 SSOT(TOML + .env)
统一错误契约
Auth 与事务边界
Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client
可观测性(OpenTelemetry + LGTM)
2026-05-22 13:44:50 +08:00

283 lines
9.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
StoryService — Story 层业务逻辑。
- 创建 story、版本、evidence 关联
- 不直接依赖 agent由 orchestrator 调用
- story 正文生成后提取 primary image intent 并落库
"""
from datetime import datetime, timezone
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.db import transactional
from app.core.errors import NotFoundError
from app.core.logging import get_logger
from app.features.memoir import repo as memoir_repo
from app.features.memoir.asset_resolver import strip_asset_image_refs_from_markdown
from app.features.memoir.memoir_images.settings import MemoirImageSettings
from app.features.story.image_intent_extractor import extract_primary_image_intent
from app.features.story.repo import (
count_story_versions,
create_story,
create_story_evidence_link,
create_story_image_intent,
create_story_version,
delete_story_image_intents_by_story,
get_stories_for_user,
get_story_by_id,
get_story_image_intent_by_story,
)
from app.features.story.time_hints import apply_infer_story_time_start_to_model
logger = get_logger(__name__)
async def _extract_and_store_image_intent(
db,
*,
story,
version,
markdown: str,
) -> None:
"""
从 markdown 提取 primary intent。
仅移除 pending/failed避免删掉正在 processing 的旧任务行;同版本则原地更新行以幂等。
"""
img_settings = MemoirImageSettings.from_env()
plain = strip_asset_image_refs_from_markdown(markdown or "").strip()
min_chars = img_settings.story_image_min_body_chars
if min_chars > 0 and len(plain) < min_chars:
await delete_story_image_intents_by_story(
db, story.id, statuses=["pending", "failed"]
)
logger.debug(
"story image intent skipped: body below min chars story={} len={} min={}",
story.id,
len(plain),
min_chars,
)
return
await delete_story_image_intents_by_story(
db, story.id, statuses=["pending", "failed"]
)
result = extract_primary_image_intent(
markdown,
title=story.title or "",
stage=story.stage,
summary=story.summary,
people_refs=story.people_refs or [],
place_refs=story.place_refs or [],
time_start=story.time_start,
time_end=story.time_end,
)
existing = await get_story_image_intent_by_story(db, story.id, role="primary")
now = datetime.now(timezone.utc)
if existing and existing.story_version_id == version.id:
st = (existing.status or "").strip()
if st in ("processing", "completed"):
return
existing.caption = result.caption
existing.prompt_brief = result.prompt_brief
existing.style_profile = result.style_profile
existing.status = "pending"
existing.error = None
existing.asset_id = None
existing.updated_at = now
return
if existing and existing.story_version_id != version.id:
# 复用同一主键行,避免删行导致进行中的 Celery 任务找不到 intent
existing.story_version_id = version.id
existing.caption = result.caption
existing.prompt_brief = result.prompt_brief
existing.style_profile = result.style_profile
existing.status = "pending"
existing.error = None
existing.asset_id = None
existing.updated_at = now
return
await create_story_image_intent(
db,
story_id=story.id,
story_version_id=version.id,
caption=result.caption,
prompt_brief=result.prompt_brief,
style_profile=result.style_profile,
)
class StoryService:
def __init__(self, db: AsyncSession):
self._db = db
async def create_story(
self,
user_id: str,
title: str,
*,
stage: str | None = None,
story_type: str | None = None,
summary: str | None = None,
canonical_markdown: str | None = None,
) -> str:
"""Create story, commit, return story_id."""
md = strip_asset_image_refs_from_markdown(canonical_markdown or "")
async with transactional(self._db):
story = await create_story(
self._db,
user_id=user_id,
title=title,
stage=stage,
story_type=story_type,
summary=summary,
canonical_markdown=md,
)
await self._db.flush()
apply_infer_story_time_start_to_model(story)
if md.strip():
version = await create_story_version(
self._db,
story_id=story.id,
version_no=1,
markdown_snapshot=md,
actor_type="ai",
source_type="generate",
)
await self._db.flush()
story.current_version_id = version.id
await _extract_and_store_image_intent(
self._db,
story=story,
version=version,
markdown=md,
)
if md.strip():
await memoir_repo.mark_chapters_dirty_for_story(self._db, story.id)
story_id = story.id
if md.strip():
from app.features.memoir.repo import get_chapter_ids_linked_to_story
from app.features.story.post_commit import enqueue_story_post_commit_effects
chapter_ids = set(await get_chapter_ids_linked_to_story(self._db, story_id))
pc = enqueue_story_post_commit_effects(
user_id=user_id,
story_ids={story_id},
chapter_ids=chapter_ids,
trigger_source="manual_api",
need_compaction=False,
)
logger.info(
"event=story_post_commit user_id={} trigger=manual_api "
"enqueued_story_image_count={} enqueued_chapter_recompose_count={} "
"errors={}",
user_id,
pc.enqueued_story_image_count,
pc.enqueued_chapter_recompose_count,
pc.errors,
)
return story_id
async def append_version(
self,
story_id: str,
markdown_snapshot: str,
*,
actor_type: str = "ai",
source_type: str = "generate",
change_summary: str | None = None,
prompt_meta: dict | None = None,
) -> str:
"""Append new version, update canonical_markdown, return version_id."""
story = await get_story_by_id(self._db, story_id)
if not story:
raise NotFoundError(f"Story {story_id} not found")
md = strip_asset_image_refs_from_markdown(markdown_snapshot or "")
parent_id = story.current_version_id
version_no = (await count_story_versions(self._db, story_id)) + 1
async with transactional(self._db):
version = await create_story_version(
self._db,
story_id=story_id,
version_no=version_no,
markdown_snapshot=md,
actor_type=actor_type,
source_type=source_type,
parent_version_id=parent_id,
prompt_meta=prompt_meta,
)
version.change_summary = change_summary
story.current_version_id = version.id
story.canonical_markdown = md
apply_infer_story_time_start_to_model(story)
await _extract_and_store_image_intent(
self._db,
story=story,
version=version,
markdown=md,
)
await memoir_repo.mark_chapters_dirty_for_story(self._db, story_id)
version_id = version.id
from app.features.memoir.repo import get_chapter_ids_linked_to_story
from app.features.story.post_commit import enqueue_story_post_commit_effects
chapter_ids = set(await get_chapter_ids_linked_to_story(self._db, story_id))
pc = enqueue_story_post_commit_effects(
user_id=story.user_id,
story_ids={story_id},
chapter_ids=chapter_ids,
trigger_source="manual_api",
need_compaction=False,
)
logger.info(
"event=story_post_commit user_id={} trigger=manual_api_append "
"enqueued_story_image_count={} enqueued_chapter_recompose_count={} errors={}",
story.user_id,
pc.enqueued_story_image_count,
pc.enqueued_chapter_recompose_count,
pc.errors,
)
return version_id
async def link_evidence(
self,
story_id: str,
evidence_type: str,
evidence_id: str,
*,
role: str = "primary",
weight: float | None = None,
) -> None:
"""Add evidence link. Caller must ensure story exists."""
async with transactional(self._db):
await create_story_evidence_link(
self._db,
story_id=story_id,
evidence_type=evidence_type,
evidence_id=evidence_id,
role=role,
weight=weight,
)
async def get_stories(
self, user_id: str, *, status: str | None = "active"
) -> list[dict]:
"""List stories for user."""
stories = await get_stories_for_user(self._db, user_id, status=status)
return [
{
"id": s.id,
"title": s.title,
"stage": s.stage,
"story_type": s.story_type,
"summary": s.summary,
"canonical_markdown": s.canonical_markdown,
"status": s.status,
"created_at": s.created_at.isoformat() if s.created_at else None,
}
for s in stories
]