Files
life-echo/api/app/features/story/post_commit.py

149 lines
4.8 KiB
Python
Raw Normal View History

"""
Story 写入提交后的统一异步调度插图章节物化可选 memory compaction
enqueue 失败不回滚已提交数据仅记录日志依赖后续触发或重试收敛
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timezone
import threading
from typing import Any, cast
import redis
from app.core.config import settings
from app.core.logging import get_logger
from app.core.memory_compaction_schedule import schedule_memory_compaction_run
logger = get_logger(__name__)
_redis_client: redis.Redis | None = None
_redis_lock = threading.Lock()
def _story_image_enqueue_key(story_id: str) -> str:
return f"enqueue:story-image:{story_id}"
def _get_redis() -> redis.Redis:
"""进程内复用单个 Redis 客户端,避免重复创建连接池。"""
global _redis_client
if _redis_client is None:
with _redis_lock:
if _redis_client is None:
_redis_client = redis.from_url(
settings.redis_url, decode_responses=True
)
return _redis_client
@dataclass
class PostCommitResult:
enqueued_story_image_count: int = 0
enqueued_chapter_recompose_count: int = 0
compaction_scheduled: bool = False
errors: list[str] = field(default_factory=list)
def enqueue_story_post_commit_effects(
*,
user_id: str,
story_ids: set[str],
chapter_ids: set[str],
trigger_source: str,
need_image: bool = True,
need_recompose: bool = True,
need_compaction: bool = False,
compaction_extra: dict[str, Any] | None = None,
) -> PostCommitResult:
"""
story_ids 为空则跳过 imagechapter_ids 为空则跳过 recompose
need_compaction=True 时仅按 user_id 调度 compaction不依赖 story/chapter 集合
"""
result = PostCommitResult()
r = _get_redis()
ttl = int(settings.story_image_enqueue_dedup_ttl)
if need_image and story_ids:
from app.tasks.story_image_tasks import (
generate_story_image as gen_story_image_task,
)
for sid in sorted(story_ids):
key = _story_image_enqueue_key(sid)
try:
if not r.set(key, "1", nx=True, ex=ttl):
logger.debug(
"story_image enqueue skipped (dedup): story={} trigger={}",
sid,
trigger_source,
)
continue
except Exception as exc:
logger.warning(
"story_image enqueue dedup redis failed, allowing enqueue: "
"story={} err={}",
sid,
exc,
)
try:
cast(Any, gen_story_image_task).delay(sid)
result.enqueued_story_image_count += 1
except Exception as exc:
logger.warning(
"generate_story_image.delay failed story={} trigger={}: {}",
sid,
trigger_source,
exc,
)
result.errors.append(f"generate_story_image:{sid}:{exc}")
try:
r.delete(key)
except Exception:
pass
if need_recompose and chapter_ids:
from app.tasks.chapter_compose_tasks import (
recompose_chapter as recompose_chapter_task,
)
cd = int(settings.recompose_chapter_delay_seconds)
for cid in sorted(chapter_ids):
try:
cast(Any, recompose_chapter_task).apply_async(
args=[cid], countdown=max(0, cd)
)
result.enqueued_chapter_recompose_count += 1
except Exception as exc:
logger.warning(
"recompose_chapter.apply_async failed chapter={} trigger={}: {}",
cid,
trigger_source,
exc,
)
result.errors.append(f"recompose_chapter:{cid}:{exc}")
if need_compaction:
try:
ctx: dict[str, Any] = {
"trigger_source": trigger_source,
"trigger_time": datetime.now(timezone.utc).isoformat(),
"story_ids": sorted(story_ids),
"chapter_ids": sorted(chapter_ids),
}
if compaction_extra:
ctx.update(compaction_extra)
schedule_memory_compaction_run(user_id, ctx)
result.compaction_scheduled = True
except Exception as exc:
logger.warning(
"schedule_memory_compaction_run failed user_id={} trigger={}: {}",
user_id,
trigger_source,
exc,
)
result.errors.append(f"compaction:{exc}")
return result