Files
life-echo/api/app/tasks/memoir_tasks.py
Kevin ccdc4e4277 feat(i18n): persist language preference and thread through chat, memoir, TTS
- Add users.language_preference (Alembic 0018, default zh); capture at signup/SMS
  only; expose on auth and profile APIs
- Lite English prompts for chat and memoir; localized stage labels and agent
  names (Life Echo / 岁月知己)
- Tencent TTS: language-aware synthesis, ModelType=1 for 501004, English chunking
- WebSocket pipeline: emit all AGENT_RESPONSE segments when TTS cancels; INFO logs
  for tts_this_turn and TTS decisions; on-demand TTS logging
- Expo: device language on auth, i18n tiers/agent name, [SPLIT] streaming UX fixes
- Tests for migration, prompts, pipeline, router tts_this_turn, reply segments

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-11 16:16:49 +08:00

1385 lines
49 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
回忆录处理 Celery 任务
"""
import asyncio
import json
import time
import uuid
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Set
import redis
from celery import shared_task
from celery.exceptions import Retry
from celery.result import AsyncResult
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from app.agents.chat.background_voice import infer_background_voice
from app.agents.chat.prompts_profile import format_user_profile_context
from app.agents.memoir import MemoirOrchestrator
from app.agents.stage_constants import normalize_chapter_category
from app.core.chapter_pipeline_lock import (
acquire_chapter_pipeline_lock as _acquire_chapter_lock,
)
from app.core.chapter_pipeline_lock import (
release_chapter_pipeline_lock as _release_chapter_lock,
)
from app.core.config import settings
from app.core.db import AsyncSessionLocal, get_sync_db
from app.core.dependencies import get_embedding_provider
from app.core.llm_gateway import LlmGateway, LlmUseCase
from app.core.logging import get_logger
from app.core.memoir_pipeline_progress import (
init_pipeline_run_from_phase1,
merge_pipeline_run,
)
from app.core.memoir_pipeline_trace import (
effective_correlation_id,
new_memoir_correlation_id,
)
from app.features.conversation.models import Conversation, Segment
from app.features.memoir.cover_eligibility import (
chapter_needs_cover_enqueue,
)
from app.features.memoir.memoir_images.parser import (
build_initial_image_assets,
)
from app.features.memoir.memoir_images.schema import (
IMAGE_STATUS_COMPLETED,
IMAGE_STATUS_FAILED,
IMAGE_STATUS_PENDING,
normalize_image_assets,
)
from app.features.memoir.memoir_images.serializers import (
image_dict_to_row_kwargs,
)
from app.features.memoir.memoir_images.settings import MemoirImageSettings
from app.features.memoir.models import (
Book,
MemoirImage,
)
from app.features.memoir.state_service import (
get_or_create_state_sync,
update_slot_sync,
)
from app.features.memoir.story_pipeline_sync import (
run_story_pipeline_for_category_batch,
)
from app.features.memory.service import MemoryService
from app.features.user.models import User
from app.tasks.celery_app import celery_app
# Module-level logger from the project logging wrapper (messages below use "{}" placeholders).
logger = get_logger(__name__)
# Process-local cache of Redis clients keyed by the ``decode_responses`` flag;
# populated lazily by _get_redis_client so each flavour is created once per process.
_REDIS_CLIENTS: dict[bool, redis.Redis] = {}
def _run_post_pipeline_commit(
    *,
    user_id: str,
    story_dispatch_ids: set[str],
    recompose_chapter_ids: set[str],
    cover_chapter_ids: set[str],
    trigger_source: str,
    need_compaction: bool,
    need_quality_pass: bool = False,
    memoir_correlation_id: str | None = None,
    compaction_extra: dict | None = None,
) -> None:
    """Fan out follow-up work after a memoir pipeline transaction.

    Story images, chapter recompose, compaction and the quality pass are handed
    to ``enqueue_story_post_commit_effects``; its counters are logged. Then, if
    memoir images are enabled, one cover job is enqueued per chapter id in
    ``cover_chapter_ids`` (sorted order).

    Callers in this module invoke it right after ``db.commit()``.
    """
    from app.features.story.post_commit import enqueue_story_post_commit_effects

    outcome = enqueue_story_post_commit_effects(
        user_id=user_id,
        story_ids=set(story_dispatch_ids),
        chapter_ids=recompose_chapter_ids,
        trigger_source=trigger_source,
        need_compaction=need_compaction,
        need_quality_pass=need_quality_pass,
        memoir_correlation_id=memoir_correlation_id,
        compaction_extra=compaction_extra,
    )
    logger.info(
        "event=story_post_commit user_id={} trigger={} "
        "enqueued_story_image_count={} enqueued_chapter_recompose_count={} "
        "compaction_scheduled={} quality_pass_scheduled={} errors={}",
        user_id,
        trigger_source,
        outcome.enqueued_story_image_count,
        outcome.enqueued_chapter_recompose_count,
        outcome.compaction_scheduled,
        outcome.quality_pass_scheduled,
        outcome.errors,
    )

    # Cover generation is optional: nothing to do without chapters or when disabled.
    if not cover_chapter_ids:
        return
    if not MemoirImageSettings.from_env().enabled:
        return

    from app.tasks.chapter_cover_enqueue import try_enqueue_generate_chapter_cover

    for chapter_id in sorted(cover_chapter_ids):
        enqueued = try_enqueue_generate_chapter_cover(chapter_id, source=trigger_source)
        if enqueued:
            logger.info("派发章节封面任务: chapter={}", chapter_id)
def _get_llm():
    """Return the LangChain LLM for memoir Celery tasks, or ``None`` on failure.

    Resolved through ``LlmGateway`` with the ``memoir_tasks`` use case. Failure
    stays best-effort (``None`` is returned as before), but it is now logged so a
    misconfigured gateway does not disappear silently.
    """
    try:
        return LlmGateway().langchain_llm_for(LlmUseCase("memoir_tasks"))
    except Exception as e:
        logger.warning(
            "event=memoir_llm_unavailable use_case=memoir_tasks exc_type={} exc={}",
            type(e).__name__,
            e,
        )
        return None
def _get_llm_fast():
    """Return the fast-tier LLM (classification / extraction), or ``None``.

    The fast tier can use a different model from narration/routing. Callers in
    this module use ``_get_llm_fast() or llm``, so ``None`` is a valid outcome.
    Failures are logged at info level rather than swallowed silently.
    """
    try:
        return LlmGateway().langchain_llm_for(
            LlmUseCase("memoir_tasks.fast", fast=True)
        )
    except Exception as e:
        logger.info(
            "event=memoir_llm_fast_unavailable use_case=memoir_tasks.fast "
            "exc_type={} exc={}",
            type(e).__name__,
            e,
        )
        return None
async def _memory_ingest_transcripts_batch(
    user_id: str,
    items: list[tuple[str, str, dict | None]],
    *,
    memoir_correlation_id: str,
) -> list[str]:
    """Ingest a batch of transcripts through ``MemoryService`` in a fresh async session.

    Each item is ``(conversation_id, transcript_text, lineage_or_None)``, as
    built by Phase 1. Returns whatever ``ingest_transcripts_batch`` returns;
    Phase 1 zips it one-to-one with its input segments.
    """
    async with AsyncSessionLocal() as session:
        memory = MemoryService(session, embedding_provider=get_embedding_provider())
        source_ids = await memory.ingest_transcripts_batch(
            user_id,
            items,
            memoir_correlation_id=memoir_correlation_id,
        )
    return source_ids
async def _memory_retrieve_evidence(
    user_id: str,
    query: str,
    *,
    top_k: int,
) -> dict:
    """Retrieve up to ``top_k`` memory evidence items for ``query`` as a plain dict.

    The bundle is serialized with ``model_dump()`` while the session is still open.
    """
    async with AsyncSessionLocal() as session:
        memory = MemoryService(session, embedding_provider=get_embedding_provider())
        evidence_bundle = await memory.retrieve(user_id, query, top_k=top_k)
        return evidence_bundle.model_dump()
def _get_redis_client(*, decode_responses: bool = False) -> redis.Redis:
    """Return a cached Redis client for the given ``decode_responses`` mode.

    Clients are created lazily from ``settings.redis_url`` and memoized in the
    module-level ``_REDIS_CLIENTS`` dict.
    """
    from app.core.config import settings

    cached = _REDIS_CLIENTS.get(decode_responses)
    if cached is not None:
        return cached
    fresh = redis.from_url(
        settings.redis_url,
        decode_responses=decode_responses,
    )
    _REDIS_CLIENTS[decode_responses] = fresh
    return fresh
def _chapter_lock_ttl() -> int:
    """TTL, in whole seconds, for the per-(user, chapter_category) pipeline lock."""
    ttl_seconds = settings.chapter_pipeline_lock_ttl_seconds
    return int(ttl_seconds)
def _update_task_status_sync(
    user_id: str, task_id: str, status: str, result: dict | None = None
):
    """Record a task's status in the per-user Redis task hash (sync; for Celery tasks).

    The hash ``task:user:{user_id}:tasks`` maps ``task_id`` to a JSON document
    with ``task_id``, ``status``, ``updated_at`` (UTC ISO-8601) and, when given,
    ``result``. Fields already stored for the task are preserved. The whole hash
    expires one hour after the latest write.

    Args:
        user_id: Owner of the task hash.
        task_id: Celery task id (hash field).
        status: New status string, e.g. "running" / "success" / "failure".
        result: Optional payload stored under ``result``; ``None`` keeps any
            previously stored result.

    Best-effort: any Redis/JSON error is logged and swallowed so a status write
    never fails the surrounding task.
    """
    try:
        r = _get_redis_client(decode_responses=True)
        key = f"task:user:{user_id}:tasks"
        # Merge into the existing task record, if any.
        data = r.hget(key, task_id)
        task_info = json.loads(data) if data else {"task_id": task_id}
        task_info["status"] = status
        task_info["updated_at"] = datetime.now(timezone.utc).isoformat()
        if result is not None:
            task_info["result"] = result
        r.hset(key, task_id, json.dumps(task_info))
        r.expire(key, 3600)  # expire the whole hash after 1 hour
        logger.debug("任务状态已更新: task_id={} status={}", task_id, status)
    except Exception as e:
        logger.error(
            "event=memoir_task_status_update_failed msg=更新任务状态失败 "
            "task_id={} status={} exc_type={} exc={}",
            task_id,
            status,
            type(e).__name__,
            e,
        )
def _merge_chapter_image_assets(
    existing_images: list[dict] | None,
    placeholders: list[dict],
    provider: str,
    style: str,
    size: str,
    now_iso: str,
) -> list[dict]:
    """Reconcile a chapter's stored image assets with freshly parsed placeholders.

    Returns exactly one asset per entry of ``placeholders``, in the same order:

    - If a stored asset has the same ``placeholder``, a copy is reused.
      ``index`` and ``description`` are updated from the placeholder.
      ``provider``, ``style``, ``size``, ``created_at`` and ``updated_at`` are
      backfilled when missing or empty.
    - A reused asset marked completed but with neither ``storage_key`` nor
      ``url`` is downgraded to failed.
    - Otherwise a new initial asset is built with ``build_initial_image_assets``.

    Stored assets whose placeholder no longer appears are dropped.
    """
    by_placeholder: dict = {}
    for stored_asset in normalize_image_assets(existing_images):
        key = stored_asset.get("placeholder")
        if key:
            by_placeholder[key] = dict(stored_asset)

    def _fresh_asset(placeholder_item: dict) -> dict:
        return build_initial_image_assets(
            placeholders=[placeholder_item],
            provider=provider,
            style=style,
            size=size,
            now_iso=now_iso,
        )[0]

    def _refresh_stored(stored_asset: dict, placeholder_item: dict) -> dict:
        asset = dict(stored_asset)
        asset.update(
            index=placeholder_item["index"],
            placeholder=placeholder_item["placeholder"],
            description=placeholder_item["description"],
        )
        for field_name, fallback in (
            ("provider", provider),
            ("style", style),
            ("size", size),
            ("created_at", now_iso),
            ("updated_at", now_iso),
        ):
            asset[field_name] = asset.get(field_name) or fallback
        has_location = asset.get("storage_key") or asset.get("url")
        if asset.get("status") == IMAGE_STATUS_COMPLETED and not has_location:
            asset["status"] = IMAGE_STATUS_FAILED
            asset["error"] = asset.get("error") or "missing image url"
        return asset

    merged: list[dict] = []
    for placeholder_item in placeholders:
        stored_asset = by_placeholder.get(placeholder_item["placeholder"])
        if stored_asset:
            merged.append(_refresh_stored(stored_asset, placeholder_item))
        else:
            merged.append(_fresh_asset(placeholder_item))
    return merged
def chapter_has_images_to_generate(images: list[dict] | None) -> bool:
    """Return True if any normalized image asset is still pending or has failed."""
    retryable_statuses = {IMAGE_STATUS_PENDING, IMAGE_STATUS_FAILED}
    for asset in normalize_image_assets(images):
        if asset.get("status") in retryable_statuses:
            return True
    return False
def _memoir_image_from_asset(
    chapter_id: str,
    order_index: int,
    image_asset: dict,
) -> MemoirImage:
    """Build a ``MemoirImage`` row (memoir_images table) from one image asset dict.

    The row id is a fresh random UUID rendered as 32 lowercase hex characters.
    """
    row_kwargs = image_dict_to_row_kwargs(image_asset)
    return MemoirImage(
        id=uuid.uuid4().hex,
        chapter_id=chapter_id,
        order_index=order_index,
        **row_kwargs,
    )
def _phase2_timeout_task_id(user_id: str, chapter_category: str) -> str:
return f"phase2-timeout-{user_id}-{chapter_category}"
def _revoke_phase2_timeout(user_id: str, chapter_category: str) -> None:
    """Best-effort revoke of the scheduled delayed Phase 2 task for one category.

    Uses ``terminate=False``, so an already-running task is not killed. Any
    error is logged at debug level and ignored.
    """
    timeout_task_id = _phase2_timeout_task_id(user_id, chapter_category)
    try:
        AsyncResult(timeout_task_id, app=celery_app).revoke(terminate=False)
    except Exception as exc:
        logger.debug(
            "event=phase2_timeout_revoke_skipped task_id={} exc={}",
            timeout_task_id,
            exc,
        )
def _should_trigger_phase2(
    db: Session,
    user_id: str,
    chapter_category: str,
    current_segment_chars: int,
) -> bool:
    """Decide whether Phase 2 for ``chapter_category`` should be dispatched now.

    Returns True in either of these cases:

    - the triggering segment alone reaches the immediate char threshold;
    - the user's pending segments in this category (un-narrated, not skipped,
      in non-deleted conversations) reach the batch segment-count minimum or
      the batch char-count minimum.

    Otherwise Phase 1 falls back to scheduling the delayed timeout.
    """
    immediate_threshold = int(settings.memoir_narrative_immediate_char_threshold)
    if current_segment_chars >= immediate_threshold:
        return True

    live_conversation_ids = select(Conversation.id).where(
        Conversation.user_id == user_id,
        Conversation.deleted_at.is_(None),
    )
    pending_stats = select(
        func.count(Segment.id),
        func.coalesce(func.sum(func.length(Segment.user_input_text)), 0),
    ).where(
        Segment.conversation_id.in_(live_conversation_ids),
        Segment.topic_category == chapter_category,
        Segment.narrated.is_(False),
        Segment.skip_narrative.is_(False),
    )
    stats_row = db.execute(pending_stats).one()
    pending_count = int(stats_row[0] or 0)
    pending_chars = int(stats_row[1] or 0)
    return (
        pending_count >= int(settings.memoir_narrative_batch_min_segments)
        or pending_chars >= int(settings.memoir_narrative_batch_min_chars)
    )
def _phase2_immediate_task_id(user_id: str, chapter_category: str) -> str:
return f"phase2-immediate-{user_id}-{chapter_category}"
def _wake_deferred_segments_for_category(
    db: Session,
    user_id: str,
    chapter_category: str,
) -> int:
    """Clear route-defer metadata on a category's pending deferred segments.

    Targets the user's un-narrated, non-skipped segments in ``chapter_category``
    (non-deleted conversations) that have ``narrative_deferred_until`` set.
    Their deferral is cleared and the defer counter reset, so they are
    re-evaluated together with newly arrived material. Only the ORM objects are
    modified; the caller commits.

    Returns the number of segments reset (used for logging only).
    """
    live_conversation_ids = select(Conversation.id).where(
        Conversation.user_id == user_id,
        Conversation.deleted_at.is_(None),
    )
    deferred_stmt = select(Segment).where(
        Segment.conversation_id.in_(live_conversation_ids),
        Segment.topic_category == chapter_category,
        Segment.narrated.is_(False),
        Segment.skip_narrative.is_(False),
        Segment.narrative_deferred_until.isnot(None),
    )
    deferred_segments = list(db.execute(deferred_stmt).scalars().all())
    for segment in deferred_segments:
        segment.narrative_deferred_until = None
        segment.narrative_defer_count = 0
        segment.narrative_defer_reason = None
    return len(deferred_segments)
def _persist_phase2_route_defer(
    db: Session,
    *,
    user_id: str,
    chapter_category: str,
    task_id: str,
    memoir_correlation_id: str | None,
    defer_segment_ids: list[str],
    defer_reason: str,
    phase2_started: float,
    pipeline_elapsed: float,
    lock_elapsed: float,
) -> dict:
    """Persist a low-confidence routing deferral for a Phase 2 batch and report it.

    For every segment in ``defer_segment_ids``:

    - the defer counter is incremented, and the reason and attempt time are
      recorded;
    - ``narrative_deferred_until`` is set ``memoir_route_defer_seconds`` ahead
      (at least 1s);
    - a segment whose counter reaches ``memoir_route_defer_max_attempts``
      instead gets the deferral cleared (``None``). Phase 2's query then stops
      filtering it out.

    The changes are committed here.

    A new delayed Phase 2 run is scheduled only when at least one segment was
    updated and none hit the max-attempts limit.

    NOTE(review): in a mixed batch (some segments saturated, others not) no
    timeout is scheduled, so the still-deferred segments wait for the next
    Phase 1 dispatch or session-end flush. Confirm this is intended.

    Returns the Celery result dict with ``status="deferred"``.
    """
    now_ts = datetime.now(timezone.utc)
    max_attempts = int(settings.memoir_route_defer_max_attempts)
    defer_window = timedelta(
        seconds=max(float(settings.memoir_route_defer_seconds), 1.0)
    )
    deferred_until_ts = now_ts + defer_window

    segments: list[Segment] = []
    if defer_segment_ids:
        lookup = select(Segment).where(Segment.id.in_(list(defer_segment_ids)))
        segments = list(db.execute(lookup).scalars().all())

    saturated_count = 0
    for segment in segments:
        attempts = int(segment.narrative_defer_count or 0) + 1
        segment.narrative_defer_count = attempts
        segment.narrative_defer_reason = defer_reason
        segment.narrative_last_attempt_at = now_ts
        if attempts >= max_attempts:
            # Stop deferring: with no deferral window the segment is eligible again.
            segment.narrative_deferred_until = None
            saturated_count += 1
        else:
            segment.narrative_deferred_until = deferred_until_ts
    db.commit()

    next_task_id: str | None = None
    if segments and saturated_count == 0:
        next_task_id = _schedule_phase2_timeout(
            user_id, chapter_category, memoir_correlation_id
        )

    phase2_elapsed = time.perf_counter() - phase2_started
    logger.info(
        "event=memoir_phase2_route_deferred user_id={} task_id={} chapter_category={} "
        "segment_count={} saturated_count={} reason={} memoir_correlation_id={} "
        "lock_seconds={:.3f} pipeline_seconds={:.3f} "
        "phase2_total_seconds={:.3f} duration_ms={:.1f} next_task_id={} "
        "msg=Phase2 路由低置信,本批 segment 延迟",
        user_id,
        task_id,
        chapter_category,
        len(segments),
        saturated_count,
        defer_reason,
        memoir_correlation_id or "",
        lock_elapsed,
        pipeline_elapsed,
        phase2_elapsed,
        phase2_elapsed * 1000,
        next_task_id or "",
    )
    merge_pipeline_run(
        memoir_correlation_id,
        {
            "phase2": [
                {
                    "chapter_category": chapter_category,
                    "task_id": str(task_id),
                    "status": "deferred",
                    "detail": {
                        "segments": len(segments),
                        "reason": defer_reason,
                        "saturated_count": saturated_count,
                        "next_task_id": next_task_id,
                    },
                }
            ],
        },
    )
    return {
        "status": "deferred",
        "chapter_category": chapter_category,
        "segments": len(segments),
        "reason": defer_reason,
        "saturated_count": saturated_count,
    }
def _schedule_phase2_timeout(
    user_id: str, chapter_category: str, memoir_correlation_id: str | None = None
) -> str | None:
    """(Re)arm the debounced Phase 2 run for one category and return its task id.

    Revokes any previously scheduled timeout. It then sends
    ``process_memoir_phase2`` with a countdown of
    ``memoir_narrative_batch_max_wait_seconds`` (minimum 1s), under the fixed
    id from ``_phase2_timeout_task_id``.

    NOTE(review): Celery workers remember revoked task ids and discard later
    messages that carry a revoked id. Re-sending under the same id right after
    ``revoke`` may therefore drop the new timeout. Verify against the deployed
    Celery worker settings.
    """
    _revoke_phase2_timeout(user_id, chapter_category)
    countdown_seconds = float(max(1.0, settings.memoir_narrative_batch_max_wait_seconds))
    phase2_kwargs: dict = (
        {"memoir_correlation_id": memoir_correlation_id} if memoir_correlation_id else {}
    )
    timeout_task_id = _phase2_timeout_task_id(user_id, chapter_category)
    celery_app.send_task(
        "app.tasks.memoir_tasks.process_memoir_phase2",
        args=[user_id, chapter_category],
        kwargs=phase2_kwargs,
        countdown=countdown_seconds,
        task_id=timeout_task_id,
    )
    logger.info(
        "event=phase2_timeout_scheduled user_id={} chapter_category={} countdown={} "
        "memoir_correlation_id={}",
        user_id,
        chapter_category,
        countdown_seconds,
        memoir_correlation_id or "",
    )
    return timeout_task_id
def _dispatch_phase2_immediate(
    user_id: str, chapter_category: str, memoir_correlation_id: str | None = None
) -> str | None:
    """Cancel the pending delayed run and enqueue Phase 2 for one category now.

    With ``memoir_phase2_singleflight_immediate`` enabled, the task is sent
    under the fixed id from ``_phase2_immediate_task_id``. Otherwise Celery
    assigns a unique id. Returns the task id, or ``None`` if none is available.
    """
    _revoke_phase2_timeout(user_id, chapter_category)

    phase2_kwargs: dict = {}
    if memoir_correlation_id:
        phase2_kwargs["memoir_correlation_id"] = memoir_correlation_id

    singleflight = settings.memoir_phase2_singleflight_immediate
    send_options: dict = {"args": [user_id, chapter_category], "kwargs": phase2_kwargs}
    fixed_task_id: str | None = None
    if singleflight:
        fixed_task_id = _phase2_immediate_task_id(user_id, chapter_category)
        send_options["task_id"] = fixed_task_id

    async_result = celery_app.send_task(
        "app.tasks.memoir_tasks.process_memoir_phase2", **send_options
    )
    dispatched_id = fixed_task_id or getattr(async_result, "id", None)
    logger.info(
        "event=phase2_dispatched_immediate user_id={} chapter_category={} "
        "memoir_correlation_id={} task_id_mode={} celery_task_id={}",
        user_id,
        chapter_category,
        memoir_correlation_id or "",
        "singleflight" if singleflight else "unique",
        dispatched_id or "",
    )
    return dispatched_id
def dispatch_pending_memoir_phase2_for_user(user_id: str) -> None:
    """Flush: enqueue one Phase 2 run per category that still has pending segments.

    Meant for events such as a conversation ending. For each distinct
    ``topic_category`` among the user's un-narrated, non-skipped segments in
    non-deleted conversations:

    - the delayed timeout is revoked;
    - a Phase 2 task is sent immediately, with its own new correlation id.

    Repeated calls enqueue more runs; Phase 2 returns ``noop`` when nothing is
    pending. Errors are logged and swallowed (best-effort).
    """
    try:
        with get_sync_db() as db:
            live_conversation_ids = select(Conversation.id).where(
                Conversation.user_id == user_id,
                Conversation.deleted_at.is_(None),
            )
            pending_categories_stmt = (
                select(Segment.topic_category)
                .where(
                    Segment.conversation_id.in_(live_conversation_ids),
                    Segment.narrated.is_(False),
                    Segment.skip_narrative.is_(False),
                    Segment.topic_category.isnot(None),
                )
                .distinct()
            )
            categories = [
                row[0] for row in db.execute(pending_categories_stmt).all() if row[0]
            ]
            for chapter_category in categories:
                _revoke_phase2_timeout(user_id, chapter_category)
                flush_cid = new_memoir_correlation_id()
                async_result = celery_app.send_task(
                    "app.tasks.memoir_tasks.process_memoir_phase2",
                    args=[user_id, chapter_category],
                    kwargs={"memoir_correlation_id": flush_cid},
                )
                phase2_task_id = getattr(async_result, "id", None)
                logger.info(
                    "event=phase2_dispatched_flush user_id={} chapter_category={} "
                    "memoir_correlation_id={} celery_task_id={}",
                    user_id,
                    chapter_category,
                    flush_cid,
                    phase2_task_id or "",
                )
                if not (phase2_task_id and flush_cid):
                    continue
                merge_pipeline_run(
                    flush_cid,
                    {
                        "user_id": user_id,
                        "phase2": [
                            {
                                "chapter_category": chapter_category,
                                "task_id": str(phase2_task_id),
                                "status": "enqueued",
                            }
                        ],
                    },
                )
    except Exception as exc:
        logger.error(
            "event=phase2_flush_failed user_id={} exc_type={} exc={}",
            user_id,
            type(exc).__name__,
            exc,
        )
@shared_task(bind=True, max_retries=3, default_retry_delay=30)
def process_memoir_phase2(
    self,
    user_id: str,
    chapter_category: str,
    memoir_correlation_id: str | None = None,
):
    """Phase 2: narrative / routing / faithfulness / title for one chapter category.

    Consumes the user's segments in ``chapter_category`` that meet all of these
    conditions, oldest first:

    - not yet narrated;
    - not marked ``skip_narrative``;
    - not currently deferred (no deferral set, or the deferral has expired).

    The work runs under the per-category chapter pipeline lock. If the lock is
    busy, the task retries in 10s; this counts against ``max_retries``.

    Result ``status`` values:

    - ``noop``: nothing pending. This is checked before and again after taking
      the lock.
    - ``deferred``: the story pipeline asked to defer (low routing confidence).
      ``_persist_phase2_route_defer`` persists the defer state.
    - ``success``: a chapter was produced. The batch segments are marked
      narrated and processed, the user's latest book is flagged as updated, and
      post-commit effects are dispatched.

    Progress is mirrored into the pipeline-run record with ``merge_pipeline_run``.
    """
    task_id = self.request.id
    cid = effective_correlation_id(
        explicit=memoir_correlation_id, celery_task_id=str(task_id)
    )
    phase2_t0 = time.perf_counter()
    logger.info(
        "event=memoir_phase2_start user_id={} task_id={} chapter_category={} "
        "memoir_correlation_id={} msg=回忆录第二阶段叙事任务开始",
        user_id,
        task_id,
        chapter_category,
        cid,
    )
    merge_pipeline_run(
        cid,
        {
            "user_id": user_id,
            "phase2": [
                {
                    "chapter_category": chapter_category,
                    "task_id": str(task_id),
                    "status": "running",
                }
            ],
        },
    )
    try:
        with get_sync_db() as db:
            user_convs = select(Conversation.id).where(
                Conversation.user_id == user_id,
                Conversation.deleted_at.is_(None),
            )
            now_utc = datetime.now(timezone.utc)
            # Pending = un-narrated, non-skipped, and not inside an active deferral window.
            stmt = (
                select(Segment)
                .where(
                    Segment.conversation_id.in_(user_convs),
                    Segment.topic_category == chapter_category,
                    Segment.narrated.is_(False),
                    Segment.skip_narrative.is_(False),
                    (
                        Segment.narrative_deferred_until.is_(None)
                        | (Segment.narrative_deferred_until <= now_utc)
                    ),
                )
                .order_by(Segment.created_at)
            )
            category_segments = list(db.execute(stmt).scalars().all())
            if not category_segments:
                ms = (time.perf_counter() - phase2_t0) * 1000
                logger.info(
                    "event=memoir_phase2_noop user_id={} chapter_category={} "
                    "duration_ms={:.1f} msg=第二阶段无待叙事片段",
                    user_id,
                    chapter_category,
                    ms,
                )
                merge_pipeline_run(
                    cid,
                    {
                        "phase2": [
                            {
                                "chapter_category": chapter_category,
                                "task_id": str(task_id),
                                "status": "noop",
                            }
                        ],
                    },
                )
                return {"status": "noop"}
            llm = _get_llm()
            llm_fast = _get_llm_fast() or llm
            user_obj = db.get(User, user_id)
            user_profile = ""
            user_birth_year = None
            background_voice = "default"
            user_occupation = ""
            user_language = "zh"
            if user_obj:
                user_birth_year = user_obj.birth_year
                user_language = (
                    "en"
                    if str(getattr(user_obj, "language_preference", "zh") or "zh").lower()
                    == "en"
                    else "zh"
                )
                user_profile = format_user_profile_context(
                    birth_year=user_obj.birth_year,
                    birth_place=user_obj.birth_place,
                    grew_up_place=user_obj.grew_up_place,
                    occupation=user_obj.occupation,
                    language=user_language,
                )
                background_voice = infer_background_voice(user_obj.occupation)
                user_occupation = user_obj.occupation or ""
            image_settings = MemoirImageSettings.from_env()
            story_dispatch_ids: Set[str] = set()
            chapters_to_enqueue: Set[str] = set()
            affected_chapter_ids: Set[str] = set()
            lock_t0 = time.perf_counter()
            lock_handle = _acquire_chapter_lock(
                user_id, chapter_category, ttl_seconds=_chapter_lock_ttl()
            )
            lock_elapsed = time.perf_counter() - lock_t0
            if lock_handle is None:
                logger.warning(
                    "event=memoir_phase2_lock_busy user_id={} chapter_category={}",
                    user_id,
                    chapter_category,
                )
                raise self.retry(countdown=10)
            try:
                # Re-query under the lock: state may have changed while waiting for it.
                category_segments = list(db.execute(stmt).scalars().all())
                if not category_segments:
                    merge_pipeline_run(
                        cid,
                        {
                            "phase2": [
                                {
                                    "chapter_category": chapter_category,
                                    "task_id": str(task_id),
                                    "status": "noop",
                                    "detail": {"reason": "empty_after_lock"},
                                }
                            ],
                        },
                    )
                    return {"status": "noop"}
                state = get_or_create_state_sync(user_id, db)
                segment_texts = [seg.user_input_text or "" for seg in category_segments]
                combined_text = "\n\n".join(segment_texts)
                n_units = len(category_segments)
                # Large batches retrieve more evidence.
                evidence_top_k = int(settings.evidence_top_k_default)
                if n_units > int(settings.evidence_large_batch_threshold):
                    evidence_top_k = int(settings.evidence_top_k_large_batch)
                try:
                    memory_evidence = asyncio.run(
                        _memory_retrieve_evidence(
                            user_id,
                            combined_text,
                            top_k=evidence_top_k,
                        )
                    )
                except Exception as e:
                    # Evidence is optional: continue with an empty bundle.
                    logger.warning("Evidence 检索跳过: {}", e)
                    memory_evidence = {
                        "relevant_chunks": [],
                        "relevant_summaries": [],
                        "relevant_facts": [],
                        "relevant_stories": [],
                    }
                pipeline_t0 = time.perf_counter()
                pipeline_result = run_story_pipeline_for_category_batch(
                    db,
                    user_id=user_id,
                    chapter_category=chapter_category,
                    category_segments=category_segments,
                    state=state,
                    user_profile=user_profile,
                    user_birth_year=user_birth_year,
                    llm=llm,
                    background_voice=background_voice,
                    occupation=user_occupation,
                    memoir_correlation_id=cid,
                    llm_fast=llm_fast,
                    memory_evidence=memory_evidence,
                    language=user_language,
                )
                pipeline_elapsed = time.perf_counter() - pipeline_t0
                if pipeline_result.deferred:
                    deferred_response = _persist_phase2_route_defer(
                        db,
                        user_id=user_id,
                        chapter_category=chapter_category,
                        task_id=str(task_id),
                        memoir_correlation_id=cid,
                        defer_segment_ids=pipeline_result.defer_segment_ids,
                        defer_reason=pipeline_result.defer_reason or "unknown",
                        phase2_started=phase2_t0,
                        pipeline_elapsed=pipeline_elapsed,
                        lock_elapsed=lock_elapsed,
                    )
                    return deferred_response
                chapter = pipeline_result.chapter
                story_dispatch_ids |= pipeline_result.dispatch_ids
                db.flush()
                if chapter is None:
                    logger.error(
                        "event=memoir_phase2_no_chapter user_id={} chapter_category={}",
                        user_id,
                        chapter_category,
                    )
                    db.rollback()
                    raise self.retry(
                        exc=RuntimeError("story_pipeline returned no chapter"),
                        countdown=30,
                    )
                db.refresh(chapter)
                affected_chapter_ids.add(chapter.id)
                needs_cover_enqueue = (
                    image_settings.enabled and chapter_needs_cover_enqueue(chapter)
                )
                # A user may own several books: pick the most recently updated one.
                # Without LIMIT 1, scalar_one_or_none() raises MultipleResultsFound.
                stmt_book = (
                    select(Book)
                    .where(Book.user_id == user_id)
                    .order_by(Book.updated_at.desc())
                    .limit(1)
                )
                book = db.execute(stmt_book).scalar_one_or_none()
                if not book:
                    # NOTE(review): the default title is Chinese regardless of
                    # user_language; confirm whether English users need a localized title.
                    book = Book(
                        id=str(uuid.uuid4()),
                        user_id=user_id,
                        title="我的回忆录",
                        total_pages=0,
                        total_words=0,
                        cover_image_url=None,
                    )
                    db.add(book)
                book.has_update = True
                book.last_update_chapter_id = chapter.id
                if needs_cover_enqueue:
                    chapters_to_enqueue.add(chapter.id)
                for seg in category_segments:
                    seg.narrated = True
                    seg.processed = True
                db.commit()
                # Dispatch follow-up tasks only after the commit so they see persisted rows.
                _run_post_pipeline_commit(
                    user_id=user_id,
                    story_dispatch_ids=story_dispatch_ids,
                    recompose_chapter_ids=affected_chapter_ids,
                    cover_chapter_ids=chapters_to_enqueue,
                    trigger_source="pipeline_phase2",
                    need_compaction=True,
                    need_quality_pass=True,
                    memoir_correlation_id=cid,
                    compaction_extra={
                        "pipeline_run_id": str(task_id),
                        "memoir_correlation_id": cid,
                        "story_dispatch_ids": sorted(story_dispatch_ids),
                        "chapters_to_enqueue": sorted(chapters_to_enqueue),
                        "chapter_category": chapter_category,
                    },
                )
                phase2_elapsed = time.perf_counter() - phase2_t0
                duration_ms = phase2_elapsed * 1000
                logger.info(
                    "event=memoir_phase2_done user_id={} task_id={} chapter_category={} "
                    "segment_count={} memoir_correlation_id={} "
                    "lock_seconds={:.3f} pipeline_seconds={:.3f} "
                    "phase2_total_seconds={:.3f} duration_ms={:.1f} "
                    "msg=回忆录第二阶段叙事完成",
                    user_id,
                    task_id,
                    chapter_category,
                    len(category_segments),
                    cid,
                    lock_elapsed,
                    pipeline_elapsed,
                    phase2_elapsed,
                    duration_ms,
                )
                merge_pipeline_run(
                    cid,
                    {
                        "phase2": [
                            {
                                "chapter_category": chapter_category,
                                "task_id": str(task_id),
                                "status": "success",
                                "detail": {"segments": len(category_segments)},
                            }
                        ],
                    },
                )
                return {
                    "status": "success",
                    "chapter_category": chapter_category,
                    "segments": len(category_segments),
                }
            finally:
                _release_chapter_lock(lock_handle)
    except Retry:
        raise
    except Exception as e:
        logger.error(
            "event=memoir_phase2_failed user_id={} chapter_category={} exc={} "
            "msg=回忆录第二阶段失败",
            user_id,
            chapter_category,
            e,
        )
        merge_pipeline_run(
            cid,
            {
                "phase2": [
                    {
                        "chapter_category": chapter_category,
                        "task_id": str(task_id),
                        "status": "failure",
                        "detail": {"error": str(e)},
                    }
                ],
            },
        )
        raise self.retry(exc=e) from e
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def process_memoir_phase1(self, user_id: str, segment_ids: List[str]):
    """
    Phase 1: memory ingest plus extraction / classification.

    Steps, in order:
    1. Load the given segments (oldest first) and drop already-narrated ones.
    2. Ingest non-empty transcripts into memory. This is best-effort; a
       failure is only logged.
    3. Run ``MemoirOrchestrator.prepare_batches`` to classify segments.
    4. Persist ``topic_category`` / ``skip_narrative`` on each segment, and
       mark skipped ones as processed.
    5. For every category with non-skipped new segments, wake that category's
       deferred segments.
    6. Commit, then dispatch Phase 2 per category: immediately when
       ``_should_trigger_phase2`` says so, otherwise via the delayed timeout.

    Task status is mirrored to Redis (``_update_task_status_sync``) and to the
    pipeline-run record (``merge_pipeline_run``). Any unexpected error marks the
    task failed and retries it (a retry starts with a new correlation id).
    """
    task_id = self.request.id
    memoir_correlation_id = new_memoir_correlation_id()
    logger.info(
        "event=memoir_phase1_start user_id={} task_id={} segments={} "
        "memoir_correlation_id={} msg=回忆录第一阶段抽取与分类开始",
        user_id,
        task_id,
        len(segment_ids),
        memoir_correlation_id,
    )
    _update_task_status_sync(user_id, task_id, "running")
    init_pipeline_run_from_phase1(
        user_id,
        memoir_correlation_id,
        task_id,
        segment_count=len(segment_ids),
    )
    phase1_t0 = time.perf_counter()
    try:
        with get_sync_db() as db:
            # Only an explicit "en" preference switches prompts to English; anything else is zh.
            user_obj_for_lang = db.get(User, user_id)
            user_language = (
                "en"
                if user_obj_for_lang is not None
                and str(getattr(user_obj_for_lang, "language_preference", "zh") or "zh").lower()
                == "en"
                else "zh"
            )
            stmt = (
                select(Segment)
                .where(Segment.id.in_(segment_ids))
                .order_by(Segment.created_at.asc(), Segment.id.asc())
            )
            rows = db.execute(stmt).scalars().all()
            segments = [s for s in rows if not s.narrated]
            if not segments:
                logger.warning("event=memoir_phase1_no_segments ids={}", segment_ids)
                merge_pipeline_run(
                    memoir_correlation_id,
                    {
                        "phase1": {
                            "status": "success",
                            "step": "no_segments",
                            "detail": {"processed": 0},
                        },
                    },
                )
                # NOTE(review): this result uses key "categories" while the success
                # path below uses "categories_processed"; confirm consumers handle both.
                _update_task_status_sync(
                    user_id,
                    task_id,
                    "success",
                    {"processed": 0, "categories": []},
                )
                return {"status": "no_segments"}
            merge_pipeline_run(
                memoir_correlation_id,
                {
                    "phase1": {
                        "step": "memory_ingest",
                        "detail": {"candidates": len(segments)},
                    },
                },
            )
            ingest_t0 = time.perf_counter()
            # Items are (conversation_id, transcript_text, lineage_dict_or_None);
            # non_empty_segments stays index-aligned with ingest_items.
            ingest_items: list[tuple[str, str, dict | None]] = []
            non_empty_segments: list = []
            for seg in segments:
                text = (seg.user_input_text or "").strip()
                if not text:
                    continue
                conv_id = getattr(seg, "conversation_id", None) or ""
                ln = getattr(seg, "lineage_json", None)
                lineage_payload = ln if isinstance(ln, dict) else None
                ingest_items.append((conv_id, text, lineage_payload))
                non_empty_segments.append(seg)
            ingested_source_ids: list[str] = []
            if ingest_items:
                try:
                    ingested_source_ids = asyncio.run(
                        _memory_ingest_transcripts_batch(
                            user_id,
                            ingest_items,
                            memoir_correlation_id=memoir_correlation_id,
                        )
                    )
                    # strict=True: a count mismatch raises and is reported by the handler below.
                    for seg, sid in zip(
                        non_empty_segments, ingested_source_ids, strict=True
                    ):
                        logger.info(
                            "event=memory_transcript_ingested user_id={} task_id={} "
                            "source_id={} conversation_id={} segment_id={} transcript_chars={}",
                            user_id,
                            task_id,
                            sid,
                            getattr(seg, "conversation_id", None) or "",
                            seg.id,
                            len((seg.user_input_text or "").strip()),
                        )
                except Exception as e:
                    # Memory ingest is best-effort; classification continues regardless.
                    logger.warning(
                        "Memory batch ingest 失败: {} exc_type={}",
                        e,
                        type(e).__name__,
                    )
            ingest_elapsed = time.perf_counter() - ingest_t0
            merge_pipeline_run(
                memoir_correlation_id,
                {
                    "phase1": {
                        "step": "prepare_batches",
                        "detail": {
                            "memory_ingest_seconds": round(ingest_elapsed, 3),
                            "ingested_sources": len(ingested_source_ids),
                        },
                    },
                },
            )
            llm = _get_llm()
            llm_fast = _get_llm_fast() or llm
            if (settings.llm_fast_model or "").strip():
                logger.info(
                    "event=llm_fast_tier_used pipeline=memoir_prepare_batches model={}",
                    settings.llm_fast_model,
                )
            prep_t0 = time.perf_counter()
            memoir_orchestrator = MemoirOrchestrator()

            def _phase1_chunk_cb(idx: int, total: int) -> None:
                # Progress callback: records which prepare_batches chunk is running.
                merge_pipeline_run(
                    memoir_correlation_id,
                    {"phase1": {"detail": {"prepare_batches_chunk": [idx, total]}}},
                )

            prepared = memoir_orchestrator.prepare_batches(
                segments=list(segments),
                llm=llm,
                llm_fast=llm_fast,
                get_or_create_state=lambda: get_or_create_state_sync(user_id, db),
                update_slot=lambda stage, slot_name, snippet, seg_ids: update_slot_sync(
                    user_id,
                    stage,
                    slot_name,
                    snippet,
                    seg_ids,
                    db,
                    memoir_batch=True,
                ),
                on_phase1_chunk=_phase1_chunk_cb,
                language=user_language,
            )
            prep_elapsed = time.perf_counter() - prep_t0
            merge_pipeline_run(
                memoir_correlation_id,
                {
                    "phase1": {
                        "step": "persist_topics",
                        "detail": {"prepare_batches_seconds": round(prep_elapsed, 3)},
                    },
                },
            )
            skip_ids = prepared.segment_skip_story_ids
            # Every segment must have been assigned a category; otherwise abort (and retry).
            missing_cat = [
                seg.id
                for seg in segments
                if not prepared.segment_chapter_category.get(str(seg.id))
            ]
            if missing_cat:
                logger.error(
                    "event=memoir_phase1_missing_category abort segment_ids={}",
                    missing_cat,
                )
                raise RuntimeError(
                    f"memoir_phase1_missing_category: {len(missing_cat)} segments"
                )
            for seg in segments:
                cat = prepared.segment_chapter_category[str(seg.id)]
                seg.topic_category = cat
                is_skip = str(seg.id) in skip_ids
                seg.skip_narrative = is_skip
                seg.narrated = False
                if is_skip:
                    # Skipped segments never go to Phase 2, so they are done here.
                    seg.processed = True
            # Flush so the threshold queries below see the new topic categories.
            db.flush()
            categories_for_phase2: Set[str] = set()
            phase2_immediate: list[str] = []
            phase2_timeout: list[str] = []
            woke_up_by_category: dict[str, int] = {}
            for chapter_category, cat_segments in prepared.category_to_segments.items():
                batch_non_skip = [
                    s
                    for s in cat_segments
                    if str(s.id) not in prepared.segment_skip_story_ids
                ]
                if not batch_non_skip:
                    continue
                # New material in this category: let previously deferred segments be re-judged with it.
                woke = _wake_deferred_segments_for_category(
                    db, user_id, chapter_category
                )
                if woke:
                    woke_up_by_category[chapter_category] = woke
                max_chars = max(
                    len((s.user_input_text or "").strip()) for s in batch_non_skip
                )
                categories_for_phase2.add(chapter_category)
                if _should_trigger_phase2(db, user_id, chapter_category, max_chars):
                    phase2_immediate.append(chapter_category)
                else:
                    phase2_timeout.append(chapter_category)
            if woke_up_by_category:
                logger.info(
                    "event=memoir_phase1_wake_deferred user_id={} categories={} "
                    "msg=Phase1 新素材唤醒同类目延迟 segment",
                    user_id,
                    woke_up_by_category,
                )
            # Commit before dispatching so Phase 2 workers read the persisted state.
            db.commit()
            merge_pipeline_run(
                memoir_correlation_id,
                {
                    "phase1": {
                        "step": "dispatch_phase2",
                        "detail": {
                            "phase2_immediate": list(phase2_immediate),
                            "phase2_timeout": list(phase2_timeout),
                        },
                    },
                },
            )
            for cc in phase2_immediate:
                p2tid = _dispatch_phase2_immediate(user_id, cc, memoir_correlation_id)
                if p2tid:
                    merge_pipeline_run(
                        memoir_correlation_id,
                        {
                            "phase2": [
                                {
                                    "chapter_category": cc,
                                    "task_id": str(p2tid),
                                    "status": "enqueued",
                                }
                            ],
                        },
                    )
            for cc in phase2_timeout:
                p2tid = _schedule_phase2_timeout(user_id, cc, memoir_correlation_id)
                if p2tid:
                    merge_pipeline_run(
                        memoir_correlation_id,
                        {
                            "phase2": [
                                {
                                    "chapter_category": cc,
                                    "task_id": str(p2tid),
                                    "status": "scheduled_timeout",
                                }
                            ],
                        },
                    )
            categories_processed = sorted(prepared.category_to_segments.keys())
            phase1_elapsed = time.perf_counter() - phase1_t0
            merge_pipeline_run(
                memoir_correlation_id,
                {
                    "phase1": {
                        "status": "success",
                        "step": "completed",
                        "detail": {
                            "processed": len(segments),
                            "phase1_total_seconds": round(phase1_elapsed, 3),
                        },
                    },
                },
            )
            _update_task_status_sync(
                user_id,
                task_id,
                "success",
                {
                    "processed": len(segments),
                    "categories_processed": categories_processed,
                    "phase2_watch_categories": sorted(categories_for_phase2),
                },
            )
            duration_ms = phase1_elapsed * 1000
            logger.info(
                "event=memoir_phase1_done user_id={} task_id={} segment_count={} "
                "categories={} memoir_correlation_id={} "
                "memory_ingest_seconds={:.3f} prepare_batches_seconds={:.3f} "
                "phase1_total_seconds={:.3f} duration_ms={:.1f} "
                "msg=回忆录第一阶段完成",
                user_id,
                task_id,
                len(segments),
                categories_processed,
                memoir_correlation_id,
                ingest_elapsed,
                prep_elapsed,
                phase1_elapsed,
                duration_ms,
            )
            return {
                "status": "success",
                "processed": len(segments),
                "categories_processed": categories_processed,
            }
    except Retry:
        raise
    except Exception as e:
        logger.error(
            "event=memoir_phase1_failed user_id={} exc={} msg=回忆录第一阶段失败",
            user_id,
            e,
        )
        merge_pipeline_run(
            memoir_correlation_id,
            {
                "phase1": {
                    "status": "failure",
                    "step": "error",
                    "detail": {"error": str(e)},
                },
            },
        )
        _update_task_status_sync(user_id, task_id, "failure", {"error": str(e)})
        raise self.retry(exc=e) from e
@shared_task(bind=True, max_retries=3, default_retry_delay=30)
def generate_chapter_content(self, user_id: str, stage: str, new_content: str):
    """
    Generate or refresh chapter content from a single piece of text (real-time path).

    The text is wrapped in an in-memory, non-persisted segment and run through
    the same story pipeline as Phase 2. On success the transaction is committed
    and post-commit effects (recompose, quality pass, cover) are dispatched.

    Args:
        user_id: User id.
        stage: Chapter category / stage; normalized, falling back to "summary".
        new_content: New text to feed through the story pipeline.

    Returns:
        {"status": "success"} on success, or
        {"status": "deferred", "chapter_category": ..., "reason": ...} when the
        pipeline defers (low routing confidence). The synthetic segment has no
        DB row to mark as deferred, so nothing is retried in that case.
    """
    stage = normalize_chapter_category(stage, fallback="summary")
    cid = effective_correlation_id(explicit=None, celery_task_id=str(self.request.id))
    gen_t0 = time.perf_counter()
    logger.info(
        "event=generate_chapter_content_start user_id={} stage={} memoir_correlation_id={} "
        "msg=实时章节生成任务开始",
        user_id,
        stage,
        cid,
    )
    try:
        with get_sync_db() as db:
            llm = _get_llm()
            llm_fast = _get_llm_fast() or llm
            user_obj = db.get(User, user_id)
            user_profile = ""
            user_birth_year = None
            background_voice = "default"
            user_occupation = ""
            user_language = "zh"
            if user_obj:
                user_birth_year = user_obj.birth_year
                user_language = (
                    "en"
                    if str(getattr(user_obj, "language_preference", "zh") or "zh").lower()
                    == "en"
                    else "zh"
                )
                user_profile = format_user_profile_context(
                    birth_year=user_obj.birth_year,
                    birth_place=user_obj.birth_place,
                    grew_up_place=user_obj.grew_up_place,
                    occupation=user_obj.occupation,
                    language=user_language,
                )
                background_voice = infer_background_voice(user_obj.occupation)
                user_occupation = user_obj.occupation or ""

            class _Seg:
                """Minimal in-memory stand-in for a Segment (never persisted)."""

                def __init__(self, text: str):
                    self.id = str(uuid.uuid4())
                    self.user_input_text = text

            state = get_or_create_state_sync(user_id, db)
            # The pipeline returns a result object with .deferred / .chapter /
            # .dispatch_ids (as consumed by process_memoir_phase2), not a 3-tuple.
            pipeline_result = run_story_pipeline_for_category_batch(
                db,
                user_id=user_id,
                chapter_category=stage,
                category_segments=[_Seg(new_content)],
                state=state,
                user_profile=user_profile,
                user_birth_year=user_birth_year,
                llm=llm,
                background_voice=background_voice,
                occupation=user_occupation,
                memoir_correlation_id=cid,
                llm_fast=llm_fast,
                language=user_language,
            )
            if pipeline_result.deferred:
                # Nothing to persist for a synthetic segment: discard partial work and report.
                db.rollback()
                defer_reason = pipeline_result.defer_reason or "unknown"
                ms = (time.perf_counter() - gen_t0) * 1000
                logger.warning(
                    "event=generate_chapter_content_deferred user_id={} stage={} "
                    "reason={} memoir_correlation_id={} duration_ms={:.1f}",
                    user_id,
                    stage,
                    defer_reason,
                    cid,
                    ms,
                )
                return {
                    "status": "deferred",
                    "chapter_category": stage,
                    "reason": defer_reason,
                }
            chapter = pipeline_result.chapter
            dispatch_ids = pipeline_result.dispatch_ids
            db.flush()
            if chapter is None:
                logger.error(
                    "event=generate_chapter_content_no_chapter user_id={} stage={}",
                    user_id,
                    stage,
                )
                db.rollback()
                raise self.retry(
                    exc=RuntimeError("story_pipeline returned no chapter"),
                    countdown=30,
                )
            db.commit()
            db.refresh(chapter)
            ch_ids: set[str] = {str(chapter.id)}
            cover_ids = ch_ids if chapter_needs_cover_enqueue(chapter) else set()
            # Dispatch follow-up tasks only after the commit so they see persisted rows.
            _run_post_pipeline_commit(
                user_id=user_id,
                story_dispatch_ids=set(dispatch_ids or ()),
                recompose_chapter_ids=ch_ids,
                cover_chapter_ids=cover_ids,
                trigger_source="pipeline_generate_chapter",
                need_compaction=False,
                need_quality_pass=True,
                memoir_correlation_id=cid,
            )
            ms = (time.perf_counter() - gen_t0) * 1000
            logger.info(
                "event=generate_chapter_content_done user_id={} stage={} "
                "memoir_correlation_id={} duration_ms={:.1f} msg=实时章节生成完成",
                user_id,
                stage,
                cid,
                ms,
            )
            return {"status": "success"}
    except Retry:
        raise
    except Exception as e:
        ms = (time.perf_counter() - gen_t0) * 1000
        logger.error(
            "event=generate_chapter_content_failed user_id={} stage={} duration_ms={:.1f} "
            "exc={} msg=实时章节生成失败",
            user_id,
            stage,
            ms,
            e,
        )
        raise self.retry(exc=e) from e