Files
life-echo/api/app/features/story/post_commit.py
Kevin 064ad2161d refactor(eval+memoir):精简内部评测路由与服务,composite/对话摘要与 judge 能力补强
- 访谈:新增 interview_state_hints,联动 orchestrator 与提示词
- 回忆录:story_pipeline_sync/state/memory/post_commit 与 Celery 任务调整
- 基建:开发用 celery broker、compose/development 脚本、依赖注入
- eval-web:移除数据集/实验/版本等页面与流式轮询,突出 Playground
- 文档与单测同步
2026-04-08 21:36:12 +08:00

175 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Story 写入提交后的统一异步调度:插图、章节物化、可选 memory compaction。
enqueue 失败不回滚已提交数据,仅记录日志;依赖后续触发或重试收敛。
"""
from __future__ import annotations
import threading
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, cast
import redis
from app.core.config import settings
from app.core.logging import get_logger
from app.core.memory_compaction_schedule import schedule_memory_compaction_run
logger = get_logger(__name__)
_redis_client: redis.Redis | None = None
_redis_lock = threading.Lock()
def _story_image_enqueue_key(story_id: str) -> str:
return f"enqueue:story-image:{story_id}"
def _get_redis() -> redis.Redis:
"""进程内复用单个 Redis 客户端,避免重复创建连接池。"""
global _redis_client
if _redis_client is None:
with _redis_lock:
if _redis_client is None:
_redis_client = redis.from_url(
settings.redis_url, decode_responses=True
)
return _redis_client
@dataclass
class PostCommitResult:
enqueued_story_image_count: int = 0
enqueued_chapter_recompose_count: int = 0
compaction_scheduled: bool = False
quality_pass_scheduled: bool = False
errors: list[str] = field(default_factory=list)
def enqueue_story_post_commit_effects(
*,
user_id: str,
story_ids: set[str],
chapter_ids: set[str],
trigger_source: str,
need_image: bool = True,
need_recompose: bool = True,
need_compaction: bool = False,
need_quality_pass: bool = False,
compaction_extra: dict[str, Any] | None = None,
memoir_correlation_id: str | None = None,
) -> PostCommitResult:
"""
Unified post-commit fan-out: story images, chapter recompose, compaction,
and quality pass. story_ids 为空则跳过 imagechapter_ids 为空则跳过 recompose。
need_compaction=True 时仅按 user_id 调度 compaction不依赖 story/chapter 集合)。
"""
result = PostCommitResult()
r = _get_redis()
ttl = int(settings.story_image_enqueue_dedup_ttl)
if need_image and story_ids:
from app.tasks.story_image_tasks import (
generate_story_image as gen_story_image_task,
)
for sid in sorted(story_ids):
key = _story_image_enqueue_key(sid)
try:
if not r.set(key, "1", nx=True, ex=ttl):
logger.debug(
"story_image enqueue skipped (dedup): story={} trigger={}",
sid,
trigger_source,
)
continue
except Exception as exc:
logger.warning(
"story_image enqueue dedup redis failed, allowing enqueue: "
"story={} err={}",
sid,
exc,
)
try:
cast(Any, gen_story_image_task).delay(sid)
result.enqueued_story_image_count += 1
except Exception as exc:
logger.warning(
"generate_story_image.delay failed story={} trigger={}: {}",
sid,
trigger_source,
exc,
)
result.errors.append(f"generate_story_image:{sid}:{exc}")
try:
r.delete(key)
except Exception as e:
logger.debug("Redis key 清理失败: {}", e)
if need_recompose and chapter_ids:
from app.tasks.chapter_compose_tasks import (
recompose_chapter as recompose_chapter_task,
)
cd = int(settings.recompose_chapter_delay_seconds)
for cid in sorted(chapter_ids):
try:
cast(Any, recompose_chapter_task).apply_async(
args=[cid], countdown=max(0, cd)
)
result.enqueued_chapter_recompose_count += 1
except Exception as exc:
logger.warning(
"recompose_chapter.apply_async failed chapter={} trigger={}: {}",
cid,
trigger_source,
exc,
)
result.errors.append(f"recompose_chapter:{cid}:{exc}")
if need_compaction:
try:
ctx: dict[str, Any] = {
"trigger_source": trigger_source,
"trigger_time": datetime.now(timezone.utc).isoformat(),
"story_ids": sorted(story_ids),
"chapter_ids": sorted(chapter_ids),
}
if compaction_extra:
ctx.update(compaction_extra)
schedule_memory_compaction_run(user_id, ctx)
result.compaction_scheduled = True
except Exception as exc:
logger.warning(
"schedule_memory_compaction_run failed user_id={} trigger={}: {}",
user_id,
trigger_source,
exc,
)
result.errors.append(f"compaction:{exc}")
if need_quality_pass and settings.memoir_quality_pass_enabled and story_ids:
try:
from app.tasks.memoir_quality_pass_tasks import (
memoir_quality_pass as quality_pass_task,
)
cd = int(settings.memoir_quality_pass_delay_seconds)
cast(Any, quality_pass_task).apply_async(
args=[user_id, sorted(story_ids), sorted(chapter_ids)],
kwargs={"memoir_correlation_id": memoir_correlation_id},
countdown=max(0, cd),
)
result.quality_pass_scheduled = True
except Exception as exc:
logger.warning(
"memoir_quality_pass enqueue failed user_id={} trigger={}: {}",
user_id,
trigger_source,
exc,
)
result.errors.append(f"quality_pass:{exc}")
return result