""" 回忆录处理 Celery 任务 """ import asyncio import json import time import uuid from datetime import datetime, timedelta, timezone from typing import Dict, List, Set import redis from celery import shared_task from celery.exceptions import Retry from celery.result import AsyncResult from sqlalchemy import func, select from sqlalchemy.orm import Session from app.agents.chat.background_voice import infer_background_voice from app.agents.chat.prompts_profile import format_user_profile_context from app.agents.memoir import MemoirOrchestrator from app.agents.stage_constants import normalize_chapter_category from app.core.chapter_pipeline_lock import ( acquire_chapter_pipeline_lock as _acquire_chapter_lock, ) from app.core.chapter_pipeline_lock import ( release_chapter_pipeline_lock as _release_chapter_lock, ) from app.core.config import settings from app.core.db import AsyncSessionLocal, get_sync_db from app.core.dependencies import get_embedding_provider from app.core.llm_gateway import LlmGateway, LlmUseCase from app.core.logging import get_logger from app.core.memoir_pipeline_progress import ( init_pipeline_run_from_phase1, merge_pipeline_run, ) from app.core.memoir_pipeline_trace import ( effective_correlation_id, new_memoir_correlation_id, ) from app.features.conversation.models import Conversation, Segment from app.features.memoir.cover_eligibility import ( chapter_needs_cover_enqueue, ) from app.features.memoir.memoir_images.parser import ( build_initial_image_assets, ) from app.features.memoir.memoir_images.schema import ( IMAGE_STATUS_COMPLETED, IMAGE_STATUS_FAILED, IMAGE_STATUS_PENDING, normalize_image_assets, ) from app.features.memoir.memoir_images.serializers import ( image_dict_to_row_kwargs, ) from app.features.memoir.memoir_images.settings import MemoirImageSettings from app.features.memoir.models import ( Book, MemoirImage, ) from app.features.memoir.state_service import ( get_or_create_state_sync, update_slot_sync, ) from app.features.memoir.story_pipeline_sync import ( run_story_pipeline_for_category_batch, ) from app.features.memory.service import MemoryService from app.features.user.models import User from app.tasks.celery_app import celery_app logger = get_logger(__name__) _REDIS_CLIENTS: dict[bool, redis.Redis] = {} def _run_post_pipeline_commit( *, user_id: str, story_dispatch_ids: set[str], recompose_chapter_ids: set[str], cover_chapter_ids: set[str], trigger_source: str, need_compaction: bool, need_quality_pass: bool = False, memoir_correlation_id: str | None = None, compaction_extra: dict | None = None, ) -> None: """Shared post-commit dispatch: images, recompose, compaction, quality pass, covers.""" from app.features.story.post_commit import enqueue_story_post_commit_effects pc = enqueue_story_post_commit_effects( user_id=user_id, story_ids=set(story_dispatch_ids), chapter_ids=recompose_chapter_ids, trigger_source=trigger_source, need_compaction=need_compaction, need_quality_pass=need_quality_pass, memoir_correlation_id=memoir_correlation_id, compaction_extra=compaction_extra, ) logger.info( "event=story_post_commit user_id={} trigger={} " "enqueued_story_image_count={} enqueued_chapter_recompose_count={} " "compaction_scheduled={} quality_pass_scheduled={} errors={}", user_id, trigger_source, pc.enqueued_story_image_count, pc.enqueued_chapter_recompose_count, pc.compaction_scheduled, pc.quality_pass_scheduled, pc.errors, ) if cover_chapter_ids: image_settings = MemoirImageSettings.from_env() if image_settings.enabled: from app.tasks.chapter_cover_enqueue import ( try_enqueue_generate_chapter_cover, ) for ch_id in sorted(cover_chapter_ids): if try_enqueue_generate_chapter_cover(ch_id, source=trigger_source): logger.info("派发章节封面任务: chapter={}", ch_id) def _get_llm(): """Celery 任务内获取 LangChain LLM(通过 port)""" try: return LlmGateway().langchain_llm_for(LlmUseCase("memoir_tasks")) except Exception: return None def _get_llm_fast(): """分类 / 抽取等快档位任务(与叙事、路由默认模型可分离)。""" try: return LlmGateway().langchain_llm_for( LlmUseCase("memoir_tasks.fast", fast=True) ) except Exception: return None async def _memory_ingest_transcripts_batch( user_id: str, items: list[tuple[str, str, dict | None]], *, memoir_correlation_id: str, ) -> list[str]: async with AsyncSessionLocal() as db: service = MemoryService(db, embedding_provider=get_embedding_provider()) return await service.ingest_transcripts_batch( user_id, items, memoir_correlation_id=memoir_correlation_id, ) async def _memory_retrieve_evidence( user_id: str, query: str, *, top_k: int, ) -> dict: async with AsyncSessionLocal() as db: service = MemoryService(db, embedding_provider=get_embedding_provider()) bundle = await service.retrieve(user_id, query, top_k=top_k) return bundle.model_dump() def _get_redis_client(*, decode_responses: bool = False) -> redis.Redis: from app.core.config import settings client = _REDIS_CLIENTS.get(decode_responses) if client is None: client = redis.from_url( settings.redis_url, decode_responses=decode_responses, ) _REDIS_CLIENTS[decode_responses] = client return client def _chapter_lock_ttl() -> int: return int(settings.chapter_pipeline_lock_ttl_seconds) def _update_task_status_sync( user_id: str, task_id: str, status: str, result: Dict = None ): """同步更新任务状态(在 Celery 任务中使用)""" try: r = _get_redis_client(decode_responses=True) key = f"task:user:{user_id}:tasks" # 获取现有任务信息 data = r.hget(key, task_id) if data: task_info = json.loads(data) else: task_info = {"task_id": task_id} task_info["status"] = status task_info["updated_at"] = datetime.now(timezone.utc).isoformat() if result is not None: task_info["result"] = result r.hset(key, task_id, json.dumps(task_info)) r.expire(key, 3600) # 1小时过期 logger.debug("任务状态已更新: task_id={} status={}", task_id, status) except Exception as e: logger.error( "event=memoir_task_status_update_failed msg=更新任务状态失败 exc={}", e ) def _merge_chapter_image_assets( existing_images: list[dict] | None, placeholders: list[dict], provider: str, style: str, size: str, now_iso: str, ) -> list[dict]: normalized_existing_images = normalize_image_assets(existing_images) existing_by_placeholder = { item.get("placeholder"): dict(item) for item in normalized_existing_images if item.get("placeholder") } merged_assets: list[dict] = [] for item in placeholders: existing = existing_by_placeholder.get(item["placeholder"]) if existing: merged_item = dict(existing) merged_item["index"] = item["index"] merged_item["placeholder"] = item["placeholder"] merged_item["description"] = item["description"] merged_item["provider"] = merged_item.get("provider") or provider merged_item["style"] = merged_item.get("style") or style merged_item["size"] = merged_item.get("size") or size merged_item["created_at"] = merged_item.get("created_at") or now_iso merged_item["updated_at"] = merged_item.get("updated_at") or now_iso if merged_item.get("status") == IMAGE_STATUS_COMPLETED and not ( merged_item.get("storage_key") or merged_item.get("url") ): merged_item["status"] = IMAGE_STATUS_FAILED merged_item["error"] = merged_item.get("error") or "missing image url" else: merged_item = build_initial_image_assets( placeholders=[item], provider=provider, style=style, size=size, now_iso=now_iso, )[0] merged_assets.append(merged_item) return merged_assets def chapter_has_images_to_generate(images: list[dict] | None) -> bool: return any( item.get("status") in {IMAGE_STATUS_PENDING, IMAGE_STATUS_FAILED} for item in normalize_image_assets(images) ) def _memoir_image_from_asset( chapter_id: str, order_index: int, image_asset: dict, ) -> MemoirImage: """从单条图片 dict 构建 MemoirImage 行(用于写入 memoir_images 表)。""" kwargs = image_dict_to_row_kwargs(image_asset) return MemoirImage( id=str(uuid.uuid4()).replace("-", "")[:32], chapter_id=chapter_id, order_index=order_index, **kwargs, ) def _phase2_timeout_task_id(user_id: str, chapter_category: str) -> str: return f"phase2-timeout-{user_id}-{chapter_category}" def _revoke_phase2_timeout(user_id: str, chapter_category: str) -> None: tid = _phase2_timeout_task_id(user_id, chapter_category) try: AsyncResult(tid, app=celery_app).revoke(terminate=False) except Exception as e: logger.debug( "event=phase2_timeout_revoke_skipped task_id={} exc={}", tid, e, ) def _should_trigger_phase2( db: Session, user_id: str, chapter_category: str, current_segment_chars: int, ) -> bool: if current_segment_chars >= int(settings.memoir_narrative_immediate_char_threshold): return True user_convs = select(Conversation.id).where( Conversation.user_id == user_id, Conversation.deleted_at.is_(None), ) stmt = select( func.count(Segment.id), func.coalesce(func.sum(func.length(Segment.user_input_text)), 0), ).where( Segment.conversation_id.in_(user_convs), Segment.topic_category == chapter_category, Segment.narrated.is_(False), Segment.skip_narrative.is_(False), ) row = db.execute(stmt).one() count, total_chars = int(row[0] or 0), int(row[1] or 0) if count >= int(settings.memoir_narrative_batch_min_segments): return True if total_chars >= int(settings.memoir_narrative_batch_min_chars): return True return False def _phase2_immediate_task_id(user_id: str, chapter_category: str) -> str: return f"phase2-immediate-{user_id}-{chapter_category}" def _wake_deferred_segments_for_category( db: Session, user_id: str, chapter_category: str, ) -> int: """清空该用户某 chapter_category 下旧的 defer 元数据,让其与新素材一起重判。 返回被唤醒的 segment 数量,仅用于日志。 """ user_convs = select(Conversation.id).where( Conversation.user_id == user_id, Conversation.deleted_at.is_(None), ) stmt = select(Segment).where( Segment.conversation_id.in_(user_convs), Segment.topic_category == chapter_category, Segment.narrated.is_(False), Segment.skip_narrative.is_(False), Segment.narrative_deferred_until.isnot(None), ) rows = list(db.execute(stmt).scalars().all()) if not rows: return 0 for seg in rows: seg.narrative_deferred_until = None seg.narrative_defer_count = 0 seg.narrative_defer_reason = None return len(rows) def _persist_phase2_route_defer( db: Session, *, user_id: str, chapter_category: str, task_id: str, memoir_correlation_id: str | None, defer_segment_ids: list[str], defer_reason: str, phase2_started: float, pipeline_elapsed: float, lock_elapsed: float, ) -> dict: """把本批 segment 标记为延迟态,并按需再排一次 Phase2 timeout。 返回 Celery 任务的 result dict(``status=deferred``)。 """ now_ts = datetime.now(timezone.utc) max_attempts = int(settings.memoir_route_defer_max_attempts) defer_seconds = float(settings.memoir_route_defer_seconds) deferred_until_ts = now_ts + timedelta(seconds=max(defer_seconds, 1.0)) rows: list[Segment] = [] if defer_segment_ids: stmt = select(Segment).where(Segment.id.in_(list(defer_segment_ids))) rows = list(db.execute(stmt).scalars().all()) saturated_segments = 0 new_max_attempts_reached = False for seg in rows: prev_count = int(seg.narrative_defer_count or 0) seg.narrative_defer_count = prev_count + 1 seg.narrative_defer_reason = defer_reason seg.narrative_last_attempt_at = now_ts if seg.narrative_defer_count >= max_attempts: seg.narrative_deferred_until = None saturated_segments += 1 new_max_attempts_reached = True else: seg.narrative_deferred_until = deferred_until_ts db.commit() next_task_id: str | None = None if rows and not new_max_attempts_reached: next_task_id = _schedule_phase2_timeout( user_id, chapter_category, memoir_correlation_id ) phase2_elapsed = time.perf_counter() - phase2_started duration_ms = phase2_elapsed * 1000 logger.info( "event=memoir_phase2_route_deferred user_id={} task_id={} chapter_category={} " "segment_count={} saturated_count={} reason={} memoir_correlation_id={} " "lock_seconds={:.3f} pipeline_seconds={:.3f} " "phase2_total_seconds={:.3f} duration_ms={:.1f} next_task_id={} " "msg=Phase2 路由低置信,本批 segment 延迟", user_id, task_id, chapter_category, len(rows), saturated_segments, defer_reason, memoir_correlation_id or "", lock_elapsed, pipeline_elapsed, phase2_elapsed, duration_ms, next_task_id or "", ) merge_pipeline_run( memoir_correlation_id, { "phase2": [ { "chapter_category": chapter_category, "task_id": str(task_id), "status": "deferred", "detail": { "segments": len(rows), "reason": defer_reason, "saturated_count": saturated_segments, "next_task_id": next_task_id, }, } ], }, ) return { "status": "deferred", "chapter_category": chapter_category, "segments": len(rows), "reason": defer_reason, "saturated_count": saturated_segments, } def _schedule_phase2_timeout( user_id: str, chapter_category: str, memoir_correlation_id: str | None = None ) -> str | None: """Reset countdown for Phase 2 narrative for one category。返回 Celery task_id。""" _revoke_phase2_timeout(user_id, chapter_category) countdown = float(max(1.0, settings.memoir_narrative_batch_max_wait_seconds)) p2_kwargs: dict = {} if memoir_correlation_id: p2_kwargs["memoir_correlation_id"] = memoir_correlation_id timeout_tid = _phase2_timeout_task_id(user_id, chapter_category) celery_app.send_task( "app.tasks.memoir_tasks.process_memoir_phase2", args=[user_id, chapter_category], kwargs=p2_kwargs, countdown=countdown, task_id=timeout_tid, ) logger.info( "event=phase2_timeout_scheduled user_id={} chapter_category={} countdown={} " "memoir_correlation_id={}", user_id, chapter_category, countdown, memoir_correlation_id or "", ) return timeout_tid def _dispatch_phase2_immediate( user_id: str, chapter_category: str, memoir_correlation_id: str | None = None ) -> str | None: _revoke_phase2_timeout(user_id, chapter_category) p2_kwargs: dict = {} if memoir_correlation_id: p2_kwargs["memoir_correlation_id"] = memoir_correlation_id send_kw: dict = { "args": [user_id, chapter_category], "kwargs": p2_kwargs, } fixed_tid: str | None = None if settings.memoir_phase2_singleflight_immediate: fixed_tid = _phase2_immediate_task_id(user_id, chapter_category) send_kw["task_id"] = fixed_tid ar = celery_app.send_task("app.tasks.memoir_tasks.process_memoir_phase2", **send_kw) out_tid = fixed_tid or getattr(ar, "id", None) logger.info( "event=phase2_dispatched_immediate user_id={} chapter_category={} " "memoir_correlation_id={} task_id_mode={} celery_task_id={}", user_id, chapter_category, memoir_correlation_id or "", "singleflight" if settings.memoir_phase2_singleflight_immediate else "unique", out_tid or "", ) return out_tid def dispatch_pending_memoir_phase2_for_user(user_id: str) -> None: """会话结束等场景:为该用户所有待叙事类目各发一条 Phase2(幂等)。""" try: with get_sync_db() as db: user_convs = select(Conversation.id).where( Conversation.user_id == user_id, Conversation.deleted_at.is_(None), ) stmt = ( select(Segment.topic_category) .where( Segment.conversation_id.in_(user_convs), Segment.narrated.is_(False), Segment.skip_narrative.is_(False), Segment.topic_category.isnot(None), ) .distinct() ) cats = [r[0] for r in db.execute(stmt).all() if r[0]] for chapter_category in cats: _revoke_phase2_timeout(user_id, chapter_category) flush_cid = new_memoir_correlation_id() ar = celery_app.send_task( "app.tasks.memoir_tasks.process_memoir_phase2", args=[user_id, chapter_category], kwargs={"memoir_correlation_id": flush_cid}, ) p2tid = getattr(ar, "id", None) logger.info( "event=phase2_dispatched_flush user_id={} chapter_category={} " "memoir_correlation_id={} celery_task_id={}", user_id, chapter_category, flush_cid, p2tid or "", ) if p2tid and flush_cid: merge_pipeline_run( flush_cid, { "user_id": user_id, "phase2": [ { "chapter_category": chapter_category, "task_id": str(p2tid), "status": "enqueued", } ], }, ) except Exception as e: logger.error( "event=phase2_flush_failed user_id={} exc_type={} exc={}", user_id, type(e).__name__, e, ) @shared_task(bind=True, max_retries=3, default_retry_delay=30) def process_memoir_phase2( self, user_id: str, chapter_category: str, memoir_correlation_id: str | None = None, ): """Phase 2:叙事 / 路由 / 忠实度 / 标题;按类目加锁,消费未叙事且非 skip 的 segments。""" task_id = self.request.id cid = effective_correlation_id( explicit=memoir_correlation_id, celery_task_id=str(task_id) ) phase2_t0 = time.perf_counter() logger.info( "event=memoir_phase2_start user_id={} task_id={} chapter_category={} " "memoir_correlation_id={} msg=回忆录第二阶段叙事任务开始", user_id, task_id, chapter_category, cid, ) merge_pipeline_run( cid, { "user_id": user_id, "phase2": [ { "chapter_category": chapter_category, "task_id": str(task_id), "status": "running", } ], }, ) try: with get_sync_db() as db: user_convs = select(Conversation.id).where( Conversation.user_id == user_id, Conversation.deleted_at.is_(None), ) now_utc = datetime.now(timezone.utc) stmt = ( select(Segment) .where( Segment.conversation_id.in_(user_convs), Segment.topic_category == chapter_category, Segment.narrated.is_(False), Segment.skip_narrative.is_(False), ( Segment.narrative_deferred_until.is_(None) | (Segment.narrative_deferred_until <= now_utc) ), ) .order_by(Segment.created_at) ) category_segments = list(db.execute(stmt).scalars().all()) if not category_segments: ms = (time.perf_counter() - phase2_t0) * 1000 logger.info( "event=memoir_phase2_noop user_id={} chapter_category={} " "duration_ms={:.1f} msg=第二阶段无待叙事片段", user_id, chapter_category, ms, ) merge_pipeline_run( cid, { "phase2": [ { "chapter_category": chapter_category, "task_id": str(task_id), "status": "noop", } ], }, ) return {"status": "noop"} llm = _get_llm() llm_fast = _get_llm_fast() or llm user_obj = db.get(User, user_id) user_profile = "" user_birth_year = None background_voice = "default" user_occupation = "" if user_obj: user_birth_year = user_obj.birth_year user_profile = format_user_profile_context( birth_year=user_obj.birth_year, birth_place=user_obj.birth_place, grew_up_place=user_obj.grew_up_place, occupation=user_obj.occupation, ) background_voice = infer_background_voice(user_obj.occupation) user_occupation = user_obj.occupation or "" image_settings = MemoirImageSettings.from_env() story_dispatch_ids: Set[str] = set() chapters_to_enqueue: Set[str] = set() affected_chapter_ids: Set[str] = set() lock_t0 = time.perf_counter() lock_handle = _acquire_chapter_lock( user_id, chapter_category, ttl_seconds=_chapter_lock_ttl() ) lock_elapsed = time.perf_counter() - lock_t0 if lock_handle is None: logger.warning( "event=memoir_phase2_lock_busy user_id={} chapter_category={}", user_id, chapter_category, ) raise self.retry(countdown=10) try: # 锁内再查一次,避免等待锁期间状态已变 category_segments = list(db.execute(stmt).scalars().all()) if not category_segments: merge_pipeline_run( cid, { "phase2": [ { "chapter_category": chapter_category, "task_id": str(task_id), "status": "noop", "detail": {"reason": "empty_after_lock"}, } ], }, ) return {"status": "noop"} state = get_or_create_state_sync(user_id, db) segment_texts = [seg.user_input_text or "" for seg in category_segments] combined_text = "\n\n".join(segment_texts) n_units = len(category_segments) evidence_top_k = int(settings.evidence_top_k_default) if n_units > int(settings.evidence_large_batch_threshold): evidence_top_k = int(settings.evidence_top_k_large_batch) try: memory_evidence = asyncio.run( _memory_retrieve_evidence( user_id, combined_text, top_k=evidence_top_k, ) ) except Exception as e: logger.warning("Evidence 检索跳过: {}", e) memory_evidence = { "relevant_chunks": [], "relevant_summaries": [], "relevant_facts": [], "relevant_stories": [], } pipeline_t0 = time.perf_counter() pipeline_result = run_story_pipeline_for_category_batch( db, user_id=user_id, chapter_category=chapter_category, category_segments=category_segments, state=state, user_profile=user_profile, user_birth_year=user_birth_year, llm=llm, background_voice=background_voice, occupation=user_occupation, memoir_correlation_id=cid, llm_fast=llm_fast, memory_evidence=memory_evidence, ) pipeline_elapsed = time.perf_counter() - pipeline_t0 if pipeline_result.deferred: deferred_response = _persist_phase2_route_defer( db, user_id=user_id, chapter_category=chapter_category, task_id=str(task_id), memoir_correlation_id=cid, defer_segment_ids=pipeline_result.defer_segment_ids, defer_reason=pipeline_result.defer_reason or "unknown", phase2_started=phase2_t0, pipeline_elapsed=pipeline_elapsed, lock_elapsed=lock_elapsed, ) return deferred_response chapter = pipeline_result.chapter story_dispatch_ids |= pipeline_result.dispatch_ids db.flush() if chapter is None: logger.error( "event=memoir_phase2_no_chapter user_id={} chapter_category={}", user_id, chapter_category, ) db.rollback() raise self.retry( exc=RuntimeError("story_pipeline returned no chapter"), countdown=30, ) db.refresh(chapter) affected_chapter_ids.add(chapter.id) needs_cover_enqueue = ( image_settings.enabled and chapter_needs_cover_enqueue(chapter) ) stmt_book = ( select(Book) .where(Book.user_id == user_id) .order_by(Book.updated_at.desc()) ) result_book = db.execute(stmt_book) book = result_book.scalar_one_or_none() if not book: book = Book( id=str(uuid.uuid4()), user_id=user_id, title="我的回忆录", total_pages=0, total_words=0, cover_image_url=None, ) db.add(book) book.has_update = True book.last_update_chapter_id = chapter.id if needs_cover_enqueue: chapters_to_enqueue.add(chapter.id) for seg in category_segments: seg.narrated = True seg.processed = True db.commit() _run_post_pipeline_commit( user_id=user_id, story_dispatch_ids=story_dispatch_ids, recompose_chapter_ids=affected_chapter_ids, cover_chapter_ids=chapters_to_enqueue, trigger_source="pipeline_phase2", need_compaction=True, need_quality_pass=True, memoir_correlation_id=cid, compaction_extra={ "pipeline_run_id": str(task_id), "memoir_correlation_id": cid, "story_dispatch_ids": sorted(story_dispatch_ids), "chapters_to_enqueue": sorted(chapters_to_enqueue), "chapter_category": chapter_category, }, ) phase2_elapsed = time.perf_counter() - phase2_t0 duration_ms = phase2_elapsed * 1000 logger.info( "event=memoir_phase2_done user_id={} task_id={} chapter_category={} " "segment_count={} memoir_correlation_id={} " "lock_seconds={:.3f} pipeline_seconds={:.3f} " "phase2_total_seconds={:.3f} duration_ms={:.1f} " "msg=回忆录第二阶段叙事完成", user_id, task_id, chapter_category, len(category_segments), cid, lock_elapsed, pipeline_elapsed, phase2_elapsed, duration_ms, ) merge_pipeline_run( cid, { "phase2": [ { "chapter_category": chapter_category, "task_id": str(task_id), "status": "success", "detail": {"segments": len(category_segments)}, } ], }, ) return { "status": "success", "chapter_category": chapter_category, "segments": len(category_segments), } finally: _release_chapter_lock(lock_handle) except Retry: raise except Exception as e: logger.error( "event=memoir_phase2_failed user_id={} chapter_category={} exc={} " "msg=回忆录第二阶段失败", user_id, chapter_category, e, ) merge_pipeline_run( cid, { "phase2": [ { "chapter_category": chapter_category, "task_id": str(task_id), "status": "failure", "detail": {"error": str(e)}, } ], }, ) raise self.retry(exc=e) from e @shared_task(bind=True, max_retries=3, default_retry_delay=60) def process_memoir_phase1(self, user_id: str, segment_ids: List[str]): """ Phase 1:记忆 ingest + 抽取/分类;持久化 topic_category / skip_narrative; 按需派发 Phase 2(阈值或延迟兜底)。 """ task_id = self.request.id memoir_correlation_id = new_memoir_correlation_id() logger.info( "event=memoir_phase1_start user_id={} task_id={} segments={} " "memoir_correlation_id={} msg=回忆录第一阶段抽取与分类开始", user_id, task_id, len(segment_ids), memoir_correlation_id, ) _update_task_status_sync(user_id, task_id, "running") init_pipeline_run_from_phase1( user_id, memoir_correlation_id, task_id, segment_count=len(segment_ids), ) phase1_t0 = time.perf_counter() try: with get_sync_db() as db: stmt = ( select(Segment) .where(Segment.id.in_(segment_ids)) .order_by(Segment.created_at.asc(), Segment.id.asc()) ) rows = db.execute(stmt).scalars().all() segments = [s for s in rows if not s.narrated] if not segments: logger.warning("event=memoir_phase1_no_segments ids={}", segment_ids) merge_pipeline_run( memoir_correlation_id, { "phase1": { "status": "success", "step": "no_segments", "detail": {"processed": 0}, }, }, ) _update_task_status_sync( user_id, task_id, "success", {"processed": 0, "categories": []}, ) return {"status": "no_segments"} merge_pipeline_run( memoir_correlation_id, { "phase1": { "step": "memory_ingest", "detail": {"candidates": len(segments)}, }, }, ) ingest_t0 = time.perf_counter() ingest_items: list[tuple[str, str, dict | None]] = [] non_empty_segments: list = [] for seg in segments: text = (seg.user_input_text or "").strip() if not text: continue conv_id = getattr(seg, "conversation_id", None) or "" ln = getattr(seg, "lineage_json", None) lineage_payload = ln if isinstance(ln, dict) else None ingest_items.append((conv_id, text, lineage_payload)) non_empty_segments.append(seg) ingested_source_ids: list[str] = [] if ingest_items: try: ingested_source_ids = asyncio.run( _memory_ingest_transcripts_batch( user_id, ingest_items, memoir_correlation_id=memoir_correlation_id, ) ) for seg, sid in zip( non_empty_segments, ingested_source_ids, strict=True ): logger.info( "event=memory_transcript_ingested user_id={} task_id={} " "source_id={} conversation_id={} segment_id={} transcript_chars={}", user_id, task_id, sid, getattr(seg, "conversation_id", None) or "", seg.id, len((seg.user_input_text or "").strip()), ) except Exception as e: logger.warning( "Memory batch ingest 失败: {} exc_type={}", e, type(e).__name__, ) ingest_elapsed = time.perf_counter() - ingest_t0 merge_pipeline_run( memoir_correlation_id, { "phase1": { "step": "prepare_batches", "detail": { "memory_ingest_seconds": round(ingest_elapsed, 3), "ingested_sources": len(ingested_source_ids), }, }, }, ) llm = _get_llm() llm_fast = _get_llm_fast() or llm if (settings.llm_fast_model or "").strip(): logger.info( "event=llm_fast_tier_used pipeline=memoir_prepare_batches model={}", settings.llm_fast_model, ) prep_t0 = time.perf_counter() memoir_orchestrator = MemoirOrchestrator() def _phase1_chunk_cb(idx: int, total: int) -> None: merge_pipeline_run( memoir_correlation_id, {"phase1": {"detail": {"prepare_batches_chunk": [idx, total]}}}, ) prepared = memoir_orchestrator.prepare_batches( segments=list(segments), llm=llm, llm_fast=llm_fast, get_or_create_state=lambda: get_or_create_state_sync(user_id, db), update_slot=lambda stage, slot_name, snippet, seg_ids: update_slot_sync( user_id, stage, slot_name, snippet, seg_ids, db, memoir_batch=True, ), on_phase1_chunk=_phase1_chunk_cb, ) prep_elapsed = time.perf_counter() - prep_t0 merge_pipeline_run( memoir_correlation_id, { "phase1": { "step": "persist_topics", "detail": {"prepare_batches_seconds": round(prep_elapsed, 3)}, }, }, ) skip_ids = prepared.segment_skip_story_ids missing_cat = [ seg.id for seg in segments if not prepared.segment_chapter_category.get(str(seg.id)) ] if missing_cat: logger.error( "event=memoir_phase1_missing_category abort segment_ids={}", missing_cat, ) raise RuntimeError( f"memoir_phase1_missing_category: {len(missing_cat)} segments" ) for seg in segments: cat = prepared.segment_chapter_category[str(seg.id)] seg.topic_category = cat is_skip = str(seg.id) in skip_ids seg.skip_narrative = is_skip seg.narrated = False if is_skip: seg.processed = True db.flush() categories_for_phase2: Set[str] = set() phase2_immediate: list[str] = [] phase2_timeout: list[str] = [] woke_up_by_category: dict[str, int] = {} for chapter_category, cat_segments in prepared.category_to_segments.items(): batch_non_skip = [ s for s in cat_segments if str(s.id) not in prepared.segment_skip_story_ids ] if not batch_non_skip: continue woke = _wake_deferred_segments_for_category( db, user_id, chapter_category ) if woke: woke_up_by_category[chapter_category] = woke max_chars = max( len((s.user_input_text or "").strip()) for s in batch_non_skip ) categories_for_phase2.add(chapter_category) if _should_trigger_phase2(db, user_id, chapter_category, max_chars): phase2_immediate.append(chapter_category) else: phase2_timeout.append(chapter_category) if woke_up_by_category: logger.info( "event=memoir_phase1_wake_deferred user_id={} categories={} " "msg=Phase1 新素材唤醒同类目延迟 segment", user_id, woke_up_by_category, ) db.commit() merge_pipeline_run( memoir_correlation_id, { "phase1": { "step": "dispatch_phase2", "detail": { "phase2_immediate": list(phase2_immediate), "phase2_timeout": list(phase2_timeout), }, }, }, ) for cc in phase2_immediate: p2tid = _dispatch_phase2_immediate(user_id, cc, memoir_correlation_id) if p2tid: merge_pipeline_run( memoir_correlation_id, { "phase2": [ { "chapter_category": cc, "task_id": str(p2tid), "status": "enqueued", } ], }, ) for cc in phase2_timeout: p2tid = _schedule_phase2_timeout(user_id, cc, memoir_correlation_id) if p2tid: merge_pipeline_run( memoir_correlation_id, { "phase2": [ { "chapter_category": cc, "task_id": str(p2tid), "status": "scheduled_timeout", } ], }, ) categories_processed = sorted(prepared.category_to_segments.keys()) phase1_elapsed = time.perf_counter() - phase1_t0 merge_pipeline_run( memoir_correlation_id, { "phase1": { "status": "success", "step": "completed", "detail": { "processed": len(segments), "phase1_total_seconds": round(phase1_elapsed, 3), }, }, }, ) _update_task_status_sync( user_id, task_id, "success", { "processed": len(segments), "categories_processed": categories_processed, "phase2_watch_categories": sorted(categories_for_phase2), }, ) duration_ms = phase1_elapsed * 1000 logger.info( "event=memoir_phase1_done user_id={} task_id={} segment_count={} " "categories={} memoir_correlation_id={} " "memory_ingest_seconds={:.3f} prepare_batches_seconds={:.3f} " "phase1_total_seconds={:.3f} duration_ms={:.1f} " "msg=回忆录第一阶段完成", user_id, task_id, len(segments), categories_processed, memoir_correlation_id, ingest_elapsed, prep_elapsed, phase1_elapsed, duration_ms, ) return { "status": "success", "processed": len(segments), "categories_processed": categories_processed, } except Retry: raise except Exception as e: logger.error( "event=memoir_phase1_failed user_id={} exc={} msg=回忆录第一阶段失败", user_id, e, ) merge_pipeline_run( memoir_correlation_id, { "phase1": { "status": "failure", "step": "error", "detail": {"error": str(e)}, }, }, ) _update_task_status_sync(user_id, task_id, "failure", {"error": str(e)}) raise self.retry(exc=e) from e @shared_task(bind=True, max_retries=3, default_retry_delay=30) def generate_chapter_content(self, user_id: str, stage: str, new_content: str): """ 单独生成章节内容的任务(用于实时更新) Args: user_id: 用户 ID stage: 阶段 new_content: 新内容 """ stage = normalize_chapter_category(stage, fallback="summary") cid = effective_correlation_id(explicit=None, celery_task_id=str(self.request.id)) gen_t0 = time.perf_counter() logger.info( "event=generate_chapter_content_start user_id={} stage={} memoir_correlation_id={} " "msg=实时章节生成任务开始", user_id, stage, cid, ) try: with get_sync_db() as db: llm = _get_llm() llm_fast = _get_llm_fast() or llm user_obj = db.get(User, user_id) user_profile = "" user_birth_year = None background_voice = "default" user_occupation = "" if user_obj: user_birth_year = user_obj.birth_year user_profile = format_user_profile_context( birth_year=user_obj.birth_year, birth_place=user_obj.birth_place, grew_up_place=user_obj.grew_up_place, occupation=user_obj.occupation, ) background_voice = infer_background_voice(user_obj.occupation) user_occupation = user_obj.occupation or "" class _Seg: def __init__(self, text: str): self.id = str(uuid.uuid4()) self.user_input_text = text state = get_or_create_state_sync(user_id, db) chapter, _, dispatch_ids = run_story_pipeline_for_category_batch( db, user_id=user_id, chapter_category=stage, category_segments=[_Seg(new_content)], state=state, user_profile=user_profile, user_birth_year=user_birth_year, llm=llm, background_voice=background_voice, occupation=user_occupation, memoir_correlation_id=cid, llm_fast=llm_fast, ) db.flush() if chapter is None: logger.error( "event=generate_chapter_content_no_chapter user_id={} stage={}", user_id, stage, ) db.rollback() raise self.retry( exc=RuntimeError("story_pipeline returned no chapter"), countdown=30, ) db.commit() db.refresh(chapter) ch_ids: set[str] = {str(chapter.id)} cover_ids = ch_ids if chapter_needs_cover_enqueue(chapter) else set() _run_post_pipeline_commit( user_id=user_id, story_dispatch_ids=set(dispatch_ids), recompose_chapter_ids=ch_ids, cover_chapter_ids=cover_ids, trigger_source="pipeline_generate_chapter", need_compaction=False, need_quality_pass=True, memoir_correlation_id=cid, ) ms = (time.perf_counter() - gen_t0) * 1000 logger.info( "event=generate_chapter_content_done user_id={} stage={} " "memoir_correlation_id={} duration_ms={:.1f} msg=实时章节生成完成", user_id, stage, cid, ms, ) return {"status": "success"} except Retry: raise except Exception as e: ms = (time.perf_counter() - gen_t0) * 1000 logger.error( "event=generate_chapter_content_failed user_id={} stage={} duration_ms={:.1f} " "exc={} msg=实时章节生成失败", user_id, stage, ms, e, ) raise self.retry(exc=e) from e