* add staging ios app build script * feat(api): add OpenTelemetry LGTM stack for local observability Wire OTel traces, metrics, and logs through a collector to Tempo, Prometheus, and Loki, with custom LLM instrumentation, dev compose overlay, Grafana provisioning, env templates, and development.sh auto-start. * feat: expand observability, harden dev tooling, and fix expo staging UX Add business and LLM Prometheus metrics with Grafana dashboards, alerting, and a metrics verification script. Wire telemetry through adapters and core LLM paths, and document the local LGTM workflow. Fix development.sh for macOS bash 3.2, open Grafana and eval-web in Chrome, and repair eval-web auto-open (unbound EVAL_WEB_BROWSER_SCHEDULED). Merge internal-eval into the main dev script with improved compose handling. Require EXPO_PUBLIC_* at build time, improve iOS HTTP ATS for staging IPs, show memoir empty state instead of load errors when no chapters exist, and add jest env setup plus chapter list response normalization. * chore: enable Grafana Assistant Cursor plugin * fix: memoir empty state and repair withdrawn 0020_chapters_book_id stamp Show empty memoir UI when the chapter list succeeds with no items; treat auth/404 as non-fatal. Extend alembic revision repair so local dev DBs stamped with the removed 0020_chapters_book_id migration can roll back and upgrade to 0019. --------- Co-authored-by: Kevin <kevin@brighteng.org> Co-authored-by: Cursor <cursoragent@cursor.com>
1402 lines
50 KiB
Python
1402 lines
50 KiB
Python
"""
|
||
回忆录处理 Celery 任务
|
||
"""
|
||
|
||
import asyncio
|
||
import json
|
||
import time
|
||
import uuid
|
||
from datetime import datetime, timedelta, timezone
|
||
from typing import Dict, List, Set
|
||
|
||
import redis
|
||
from celery import shared_task
|
||
from celery.exceptions import Retry
|
||
from celery.result import AsyncResult
|
||
from sqlalchemy import func, select
|
||
from sqlalchemy.orm import Session
|
||
|
||
from app.agents.chat.background_voice import infer_background_voice
|
||
from app.agents.chat.prompts_profile import format_user_profile_context
|
||
from app.agents.memoir import MemoirOrchestrator
|
||
from app.agents.stage_constants import normalize_chapter_category
|
||
from app.core.chapter_pipeline_lock import (
|
||
acquire_chapter_pipeline_lock as _acquire_chapter_lock,
|
||
)
|
||
from app.core.chapter_pipeline_lock import (
|
||
release_chapter_pipeline_lock as _release_chapter_lock,
|
||
)
|
||
from app.core.business_telemetry import business_span
|
||
from app.core.config import settings
|
||
from app.core.db import AsyncSessionLocal, get_sync_db
|
||
from app.core.dependencies import get_embedding_provider
|
||
from app.core.llm_gateway import LlmGateway, LlmUseCase
|
||
from app.core.logging import get_logger
|
||
from app.core.memoir_pipeline_progress import (
|
||
init_pipeline_run_from_phase1,
|
||
merge_pipeline_run,
|
||
)
|
||
from app.core.memoir_pipeline_trace import (
|
||
effective_correlation_id,
|
||
new_memoir_correlation_id,
|
||
)
|
||
from app.features.conversation.models import Conversation, Segment
|
||
from app.features.memoir.cover_eligibility import (
|
||
chapter_needs_cover_enqueue,
|
||
)
|
||
from app.features.memoir.memoir_images.parser import (
|
||
build_initial_image_assets,
|
||
)
|
||
from app.features.memoir.memoir_images.schema import (
|
||
IMAGE_STATUS_COMPLETED,
|
||
IMAGE_STATUS_FAILED,
|
||
IMAGE_STATUS_PENDING,
|
||
normalize_image_assets,
|
||
)
|
||
from app.features.memoir.memoir_images.serializers import (
|
||
image_dict_to_row_kwargs,
|
||
)
|
||
from app.features.memoir.memoir_images.settings import MemoirImageSettings
|
||
from app.features.memoir.models import (
|
||
Book,
|
||
MemoirImage,
|
||
)
|
||
from app.features.memoir.state_service import (
|
||
get_or_create_state_sync,
|
||
update_slot_sync,
|
||
)
|
||
from app.features.memoir.story_pipeline_sync import (
|
||
run_story_pipeline_for_category_batch,
|
||
)
|
||
from app.features.memory.service import MemoryService
|
||
from app.features.user.models import User
|
||
from app.tasks.celery_app import celery_app
|
||
|
||
logger = get_logger(__name__)
# Lazily populated cache of Redis clients, one per ``decode_responses`` flag;
# filled and read by ``_get_redis_client``.
_REDIS_CLIENTS: dict[bool, redis.Redis] = {}
|
||
|
||
|
||
def _run_post_pipeline_commit(
    *,
    user_id: str,
    story_dispatch_ids: set[str],
    recompose_chapter_ids: set[str],
    cover_chapter_ids: set[str],
    trigger_source: str,
    need_compaction: bool,
    need_quality_pass: bool = False,
    memoir_correlation_id: str | None = None,
    compaction_extra: dict | None = None,
) -> None:
    """Dispatch the side effects that follow a committed pipeline transaction.

    Story images, chapter recompose, compaction and the quality pass are handed
    to ``enqueue_story_post_commit_effects``, and its summary is logged. If
    ``cover_chapter_ids`` is non-empty and memoir images are enabled, one cover
    generation task is then offered per chapter, in sorted id order.

    Args:
        user_id: Owner of the affected stories/chapters.
        story_dispatch_ids: Story ids whose post-commit effects are enqueued.
        recompose_chapter_ids: Chapter ids to recompose.
        cover_chapter_ids: Chapter ids that may need a cover image.
        trigger_source: Label forwarded to the enqueuers and written to logs.
        need_compaction: Whether compaction should be scheduled.
        need_quality_pass: Whether a quality pass should be scheduled.
        memoir_correlation_id: Trace id forwarded to the enqueuer.
        compaction_extra: Extra payload forwarded for compaction.
    """
    from app.features.story.post_commit import enqueue_story_post_commit_effects

    outcome = enqueue_story_post_commit_effects(
        user_id=user_id,
        # Hand over a copy so the enqueuer cannot mutate the caller's set.
        story_ids=set(story_dispatch_ids),
        chapter_ids=recompose_chapter_ids,
        trigger_source=trigger_source,
        need_compaction=need_compaction,
        need_quality_pass=need_quality_pass,
        memoir_correlation_id=memoir_correlation_id,
        compaction_extra=compaction_extra,
    )
    logger.info(
        "event=story_post_commit user_id={} trigger={} "
        "enqueued_story_image_count={} enqueued_chapter_recompose_count={} "
        "compaction_scheduled={} quality_pass_scheduled={} errors={}",
        user_id,
        trigger_source,
        outcome.enqueued_story_image_count,
        outcome.enqueued_chapter_recompose_count,
        outcome.compaction_scheduled,
        outcome.quality_pass_scheduled,
        outcome.errors,
    )

    if not cover_chapter_ids:
        return
    if not MemoirImageSettings.from_env().enabled:
        return

    from app.tasks.chapter_cover_enqueue import try_enqueue_generate_chapter_cover

    for chapter_id in sorted(cover_chapter_ids):
        enqueued = try_enqueue_generate_chapter_cover(chapter_id, source=trigger_source)
        if enqueued:
            logger.info("派发章节封面任务: chapter={}", chapter_id)
|
||
|
||
|
||
def _get_llm():
    """Return the LangChain LLM for memoir Celery tasks, or ``None`` on failure.

    The model is resolved through ``LlmGateway`` with use case ``memoir_tasks``.
    Any exception raised while building it is logged and converted to ``None``,
    so callers receive ``None`` instead of an exception.
    """
    try:
        return LlmGateway().langchain_llm_for(LlmUseCase("memoir_tasks"))
    except Exception as e:
        # Still best-effort, but no longer silent: without this log a missing
        # model only surfaces later as an opaque pipeline failure.
        logger.warning(
            "event=memoir_llm_unavailable use_case=memoir_tasks exc_type={} exc={}",
            type(e).__name__,
            e,
        )
        return None
|
||
|
||
|
||
def _get_llm_fast():
    """Return the fast-tier LangChain LLM (classification / extraction), or ``None``.

    Resolved through ``LlmGateway`` with use case ``memoir_tasks.fast`` and
    ``fast=True``, so it can differ from the narrative/routing default model.
    Any exception is logged and converted to ``None``; callers in this module
    fall back with ``_get_llm_fast() or llm``.
    """
    try:
        return LlmGateway().langchain_llm_for(
            LlmUseCase("memoir_tasks.fast", fast=True)
        )
    except Exception as e:
        # Best-effort: log why the fast tier is unavailable instead of hiding it.
        logger.warning(
            "event=memoir_llm_unavailable use_case=memoir_tasks.fast exc_type={} exc={}",
            type(e).__name__,
            e,
        )
        return None
|
||
|
||
|
||
async def _memory_ingest_transcripts_batch(
    user_id: str,
    items: list[tuple[str, str, dict | None]],
    *,
    memoir_correlation_id: str,
) -> list[str]:
    """Ingest transcript items into memory within a single async DB session.

    Args:
        user_id: Owner of the transcripts.
        items: ``(conversation_id, text, lineage)`` tuples, as assembled by Phase 1.
        memoir_correlation_id: Trace id forwarded to ``MemoryService``.

    Returns:
        The list produced by ``MemoryService.ingest_transcripts_batch``. Phase 1
        zips it one-to-one with the ingested segments.
    """
    async with AsyncSessionLocal() as session:
        memory = MemoryService(session, embedding_provider=get_embedding_provider())
        source_ids = await memory.ingest_transcripts_batch(
            user_id,
            items,
            memoir_correlation_id=memoir_correlation_id,
        )
    return source_ids
|
||
|
||
|
||
async def _memory_retrieve_evidence(
    user_id: str,
    query: str,
    *,
    top_k: int,
) -> dict:
    """Retrieve memory evidence for ``query`` and return it as a plain dict.

    Args:
        user_id: Owner whose memory is searched.
        query: Free text to search with (Phase 2 passes the joined segment texts).
        top_k: Retrieval depth forwarded to ``MemoryService.retrieve``.

    Returns:
        ``bundle.model_dump()`` of the retrieved bundle. Presumably it carries the
        ``relevant_*`` keys that Phase 2 uses as its fallback shape (TODO confirm).
    """
    async with AsyncSessionLocal() as session:
        memory = MemoryService(session, embedding_provider=get_embedding_provider())
        evidence_bundle = await memory.retrieve(user_id, query, top_k=top_k)
        as_dict = evidence_bundle.model_dump()
    return as_dict
|
||
|
||
|
||
def _get_redis_client(*, decode_responses: bool = False) -> redis.Redis:
    """Return a cached Redis client built from ``settings.redis_url``.

    One client is created lazily per ``decode_responses`` value and stored in
    the module-level ``_REDIS_CLIENTS`` dict, so later calls reuse it.
    """
    cached = _REDIS_CLIENTS.get(decode_responses)
    if cached is not None:
        return cached
    cached = redis.from_url(settings.redis_url, decode_responses=decode_responses)
    _REDIS_CLIENTS[decode_responses] = cached
    return cached
|
||
|
||
|
||
def _chapter_lock_ttl() -> int:
    """TTL in seconds for the per-category chapter pipeline lock (from settings)."""
    ttl_seconds = settings.chapter_pipeline_lock_ttl_seconds
    return int(ttl_seconds)
|
||
|
||
|
||
def _update_task_status_sync(
    user_id: str, task_id: str, status: str, result: dict | None = None
):
    """Synchronously record a task's status in Redis (for use inside Celery tasks).

    The record is a JSON document stored in the hash ``task:user:{user_id}:tasks``
    under field ``task_id``. Existing fields are kept. ``status`` and
    ``updated_at`` (UTC ISO-8601) are overwritten, and ``result`` is written only
    when it is not ``None``. The whole hash expires one hour after the latest
    update.

    Any exception is logged and swallowed, so status bookkeeping never fails the
    calling task.
    """
    try:
        r = _get_redis_client(decode_responses=True)
        key = f"task:user:{user_id}:tasks"

        # Load the existing record, if any.
        data = r.hget(key, task_id)
        task_info = json.loads(data) if data else {"task_id": task_id}

        task_info["status"] = status
        task_info["updated_at"] = datetime.now(timezone.utc).isoformat()
        if result is not None:
            task_info["result"] = result

        # Write the field and refresh the TTL in one MULTI/EXEC. Sent as two
        # separate commands, a failure in between could leave the hash with no
        # expiry at all.
        with r.pipeline(transaction=True) as pipe:
            pipe.hset(key, task_id, json.dumps(task_info))
            pipe.expire(key, 3600)  # 1 hour
            pipe.execute()

        logger.debug("任务状态已更新: task_id={} status={}", task_id, status)
    except Exception as e:
        logger.error(
            "event=memoir_task_status_update_failed msg=更新任务状态失败 exc={}", e
        )
|
||
|
||
|
||
def _merge_chapter_image_assets(
    existing_images: list[dict] | None,
    placeholders: list[dict],
    provider: str,
    style: str,
    size: str,
    now_iso: str,
) -> list[dict]:
    """Merge freshly parsed image placeholders with a chapter's existing assets.

    Output follows ``placeholders`` order, one asset per placeholder:

    * If an existing asset has the same ``placeholder`` key, a copy of it is
      reused. ``index`` / ``placeholder`` / ``description`` come from the new
      placeholder, and empty provider/style/size/timestamps are back-filled. An
      asset marked completed but lacking both ``storage_key`` and ``url`` is
      downgraded to failed.
    * Otherwise a fresh asset is built with ``build_initial_image_assets``.

    Existing assets whose placeholder no longer appears are not returned.
    """
    previous_by_key: dict = {}
    for asset in normalize_image_assets(existing_images):
        key = asset.get("placeholder")
        if key:
            previous_by_key[key] = dict(asset)

    # (field, fallback) pairs back-filled when the reused asset lacks a value.
    backfill = (
        ("provider", provider),
        ("style", style),
        ("size", size),
        ("created_at", now_iso),
        ("updated_at", now_iso),
    )

    out: list[dict] = []
    for slot in placeholders:
        prior = previous_by_key.get(slot["placeholder"])
        if not prior:
            fresh = build_initial_image_assets(
                placeholders=[slot],
                provider=provider,
                style=style,
                size=size,
                now_iso=now_iso,
            )[0]
            out.append(fresh)
            continue

        entry = dict(prior)
        entry["index"] = slot["index"]
        entry["placeholder"] = slot["placeholder"]
        entry["description"] = slot["description"]
        for field, fallback in backfill:
            entry[field] = entry.get(field) or fallback

        has_payload = entry.get("storage_key") or entry.get("url")
        if entry.get("status") == IMAGE_STATUS_COMPLETED and not has_payload:
            entry["status"] = IMAGE_STATUS_FAILED
            entry["error"] = entry.get("error") or "missing image url"
        out.append(entry)

    return out
|
||
|
||
|
||
def chapter_has_images_to_generate(images: list[dict] | None) -> bool:
    """Return True if any normalized image asset is pending or failed."""
    actionable = {IMAGE_STATUS_PENDING, IMAGE_STATUS_FAILED}
    for asset in normalize_image_assets(images):
        if asset.get("status") in actionable:
            return True
    return False
|
||
|
||
|
||
def _memoir_image_from_asset(
    chapter_id: str,
    order_index: int,
    image_asset: dict,
) -> MemoirImage:
    """Build a ``MemoirImage`` row (for the memoir_images table) from one image dict.

    The row id is a random 32-character lowercase hex UUID. The remaining
    columns come from ``image_dict_to_row_kwargs(image_asset)``.
    """
    row_id = uuid.uuid4().hex  # same 32 hex chars as str(uuid4()) without dashes
    return MemoirImage(
        id=row_id,
        chapter_id=chapter_id,
        order_index=order_index,
        **image_dict_to_row_kwargs(image_asset),
    )
|
||
|
||
|
||
def _phase2_timeout_task_id(user_id: str, chapter_category: str) -> str:
|
||
return f"phase2-timeout-{user_id}-{chapter_category}"
|
||
|
||
|
||
def _revoke_phase2_timeout(user_id: str, chapter_category: str) -> None:
    """Best-effort revoke of the fixed-id Phase 2 timeout task for one category.

    Uses ``terminate=False``, so a run that has already started is not killed.
    Any exception is logged at debug level and otherwise ignored.

    NOTE(review): Celery workers remember revoked task ids and skip any later
    message that carries the same id. A task re-sent under this fixed id after
    this call will therefore not execute (see Celery "Revoking tasks" docs).
    """
    timeout_task_id = _phase2_timeout_task_id(user_id, chapter_category)
    try:
        handle = AsyncResult(timeout_task_id, app=celery_app)
        handle.revoke(terminate=False)
    except Exception as exc:
        logger.debug(
            "event=phase2_timeout_revoke_skipped task_id={} exc={}",
            timeout_task_id,
            exc,
        )
|
||
|
||
|
||
def _should_trigger_phase2(
    db: Session,
    user_id: str,
    chapter_category: str,
    current_segment_chars: int,
) -> bool:
    """Decide whether Phase 2 should run now for ``chapter_category``.

    Returns True when any of these holds:

    * the current segment alone reaches ``memoir_narrative_immediate_char_threshold``;
    * the user's pending segments in this category (un-narrated, not skipped, in
      non-deleted conversations) number at least
      ``memoir_narrative_batch_min_segments``;
    * their total ``user_input_text`` length reaches
      ``memoir_narrative_batch_min_chars``.

    NOTE(review): unlike the Phase 2 selection query, this count does not exclude
    segments whose ``narrative_deferred_until`` is still in the future. Deferred
    material can therefore satisfy the thresholds; confirm this is intended.
    """
    immediate_threshold = int(settings.memoir_narrative_immediate_char_threshold)
    if current_segment_chars >= immediate_threshold:
        return True

    owned_conversations = select(Conversation.id).where(
        Conversation.user_id == user_id,
        Conversation.deleted_at.is_(None),
    )
    pending_stats = select(
        func.count(Segment.id),
        func.coalesce(func.sum(func.length(Segment.user_input_text)), 0),
    ).where(
        Segment.conversation_id.in_(owned_conversations),
        Segment.topic_category == chapter_category,
        Segment.narrated.is_(False),
        Segment.skip_narrative.is_(False),
    )
    raw_count, raw_chars = db.execute(pending_stats).one()
    pending_count = int(raw_count or 0)
    pending_chars = int(raw_chars or 0)

    return (
        pending_count >= int(settings.memoir_narrative_batch_min_segments)
        or pending_chars >= int(settings.memoir_narrative_batch_min_chars)
    )
|
||
|
||
|
||
def _phase2_immediate_task_id(user_id: str, chapter_category: str) -> str:
|
||
return f"phase2-immediate-{user_id}-{chapter_category}"
|
||
|
||
|
||
def _wake_deferred_segments_for_category(
    db: Session,
    user_id: str,
    chapter_category: str,
) -> int:
    """Clear stale deferral metadata so old segments are re-judged with new material.

    Targets the user's un-narrated, non-skipped segments in ``chapter_category``
    (non-deleted conversations only) that have ``narrative_deferred_until`` set.
    It resets ``narrative_deferred_until``, ``narrative_defer_count`` and
    ``narrative_defer_reason`` on the session objects. It does not commit.

    Returns:
        Number of segments reset (used only for logging).
    """
    owned_conversations = select(Conversation.id).where(
        Conversation.user_id == user_id,
        Conversation.deleted_at.is_(None),
    )
    deferred_stmt = select(Segment).where(
        Segment.conversation_id.in_(owned_conversations),
        Segment.topic_category == chapter_category,
        Segment.narrated.is_(False),
        Segment.skip_narrative.is_(False),
        Segment.narrative_deferred_until.isnot(None),
    )
    woken = list(db.execute(deferred_stmt).scalars().all())
    for segment in woken:
        segment.narrative_deferred_until = None
        segment.narrative_defer_count = 0
        segment.narrative_defer_reason = None
    return len(woken)
|
||
|
||
|
||
def _persist_phase2_route_defer(
    db: Session,
    *,
    user_id: str,
    chapter_category: str,
    task_id: str,
    memoir_correlation_id: str | None,
    defer_segment_ids: list[str],
    defer_reason: str,
    phase2_started: float,
    pipeline_elapsed: float,
    lock_elapsed: float,
) -> dict:
    """Persist a low-confidence routing deferral for this batch of segments.

    For every segment in ``defer_segment_ids``:

    * ``narrative_defer_count`` is incremented;
    * ``narrative_defer_reason`` and ``narrative_last_attempt_at`` are set;
    * ``narrative_deferred_until`` is set to now + ``memoir_route_defer_seconds``
      (at least 1 second). Once the count reaches
      ``memoir_route_defer_max_attempts`` it is cleared instead, and the segment
      counts as saturated.

    The session is then committed. A new Phase 2 timeout is scheduled only when
    at least one segment was found and none saturated.

    Returns:
        The Celery task result dict with ``status="deferred"``.
    """
    stamped_at = datetime.now(timezone.utc)
    attempt_cap = int(settings.memoir_route_defer_max_attempts)
    retry_delay = max(float(settings.memoir_route_defer_seconds), 1.0)
    retry_at = stamped_at + timedelta(seconds=retry_delay)

    deferred_segments: list[Segment] = []
    if defer_segment_ids:
        lookup = select(Segment).where(Segment.id.in_(list(defer_segment_ids)))
        deferred_segments = list(db.execute(lookup).scalars().all())

    saturated_count = 0
    for segment in deferred_segments:
        attempts = int(segment.narrative_defer_count or 0) + 1
        segment.narrative_defer_count = attempts
        segment.narrative_defer_reason = defer_reason
        segment.narrative_last_attempt_at = stamped_at
        if attempts < attempt_cap:
            segment.narrative_deferred_until = retry_at
            continue
        # Attempt cap reached: clear the deferral timestamp for this segment.
        segment.narrative_deferred_until = None
        saturated_count += 1

    db.commit()

    # NOTE(review): when any segment saturates, no timeout is scheduled, so the
    # whole batch waits for the next external trigger. Confirm this is intended.
    next_task_id: str | None = (
        _schedule_phase2_timeout(user_id, chapter_category, memoir_correlation_id)
        if deferred_segments and saturated_count == 0
        else None
    )

    total_elapsed = time.perf_counter() - phase2_started
    logger.info(
        "event=memoir_phase2_route_deferred user_id={} task_id={} chapter_category={} "
        "segment_count={} saturated_count={} reason={} memoir_correlation_id={} "
        "lock_seconds={:.3f} pipeline_seconds={:.3f} "
        "phase2_total_seconds={:.3f} duration_ms={:.1f} next_task_id={} "
        "msg=Phase2 路由低置信,本批 segment 延迟",
        user_id,
        task_id,
        chapter_category,
        len(deferred_segments),
        saturated_count,
        defer_reason,
        memoir_correlation_id or "",
        lock_elapsed,
        pipeline_elapsed,
        total_elapsed,
        total_elapsed * 1000,
        next_task_id or "",
    )

    summary = {
        "segments": len(deferred_segments),
        "reason": defer_reason,
        "saturated_count": saturated_count,
    }
    merge_pipeline_run(
        memoir_correlation_id,
        {
            "phase2": [
                {
                    "chapter_category": chapter_category,
                    "task_id": str(task_id),
                    "status": "deferred",
                    "detail": {**summary, "next_task_id": next_task_id},
                }
            ],
        },
    )
    return {"status": "deferred", "chapter_category": chapter_category, **summary}
|
||
|
||
|
||
def _schedule_phase2_timeout(
    user_id: str, chapter_category: str, memoir_correlation_id: str | None = None
) -> str | None:
    """(Re)arm the delayed Phase 2 run for one user/category; return its Celery task id.

    The task fires after ``memoir_narrative_batch_max_wait_seconds`` (at least 1 s).
    Each call revokes the previously scheduled timeout, so the countdown restarts
    from the latest call.

    Why the task id is unique per call: Celery workers remember revoked task ids
    and skip any later message that carries one of them. The old code revoked the
    fixed id from ``_phase2_timeout_task_id`` and then re-sent a task under that
    same id, so workers discarded the new timeout. Now each schedule uses the
    fixed prefix plus a random suffix, and the latest id is kept in Redis so the
    next reschedule can revoke exactly that task.

    Revocation and Redis tracking are best-effort: failures are logged at debug
    level and never block scheduling. Timeouts not revoked here (for example when
    an immediate or flush dispatch happens first) still fire. Phase 2 then either
    processes newly pending segments or returns ``noop``.
    """
    # Revoke any legacy timeout that was scheduled under the bare fixed id.
    _revoke_phase2_timeout(user_id, chapter_category)

    tracking_key = f"memoir:phase2_timeout:{user_id}:{chapter_category}"
    redis_client = None
    previous_tid = None
    try:
        redis_client = _get_redis_client(decode_responses=True)
        previous_tid = redis_client.get(tracking_key)
    except Exception as e:
        logger.debug("event=phase2_timeout_tracking_read_failed exc={}", e)

    if previous_tid:
        try:
            # terminate=False: a timeout that is already running is not interrupted.
            AsyncResult(previous_tid, app=celery_app).revoke(terminate=False)
        except Exception as e:
            logger.debug(
                "event=phase2_timeout_revoke_skipped task_id={} exc={}",
                previous_tid,
                e,
            )

    countdown = float(max(1.0, settings.memoir_narrative_batch_max_wait_seconds))
    p2_kwargs: dict = {}
    if memoir_correlation_id:
        p2_kwargs["memoir_correlation_id"] = memoir_correlation_id

    timeout_tid = (
        f"{_phase2_timeout_task_id(user_id, chapter_category)}-{uuid.uuid4().hex[:12]}"
    )
    celery_app.send_task(
        "app.tasks.memoir_tasks.process_memoir_phase2",
        args=[user_id, chapter_category],
        kwargs=p2_kwargs,
        countdown=countdown,
        task_id=timeout_tid,
    )

    if redis_client is not None:
        try:
            # Keep the marker a little longer than the countdown so a late
            # reschedule can still find and revoke this task.
            redis_client.set(tracking_key, timeout_tid, ex=int(countdown) + 3600)
        except Exception as e:
            logger.debug("event=phase2_timeout_tracking_write_failed exc={}", e)

    logger.info(
        "event=phase2_timeout_scheduled user_id={} chapter_category={} countdown={} "
        "memoir_correlation_id={} task_id={}",
        user_id,
        chapter_category,
        countdown,
        memoir_correlation_id or "",
        timeout_tid,
    )
    return timeout_tid
|
||
|
||
|
||
def _dispatch_phase2_immediate(
    user_id: str, chapter_category: str, memoir_correlation_id: str | None = None
) -> str | None:
    """Enqueue Phase 2 for one category right away, cancelling its pending timeout.

    When ``settings.memoir_phase2_singleflight_immediate`` is truthy, the task is
    sent under the fixed id from ``_phase2_immediate_task_id``. Otherwise Celery
    assigns the id.

    Returns:
        The fixed id, or the id of the returned ``AsyncResult`` (``None`` if absent).
    """
    _revoke_phase2_timeout(user_id, chapter_category)

    singleflight = settings.memoir_phase2_singleflight_immediate
    task_kwargs: dict = (
        {"memoir_correlation_id": memoir_correlation_id}
        if memoir_correlation_id
        else {}
    )
    send_options: dict = {"args": [user_id, chapter_category], "kwargs": task_kwargs}

    pinned_tid: str | None = None
    if singleflight:
        pinned_tid = _phase2_immediate_task_id(user_id, chapter_category)
        send_options["task_id"] = pinned_tid

    async_result = celery_app.send_task(
        "app.tasks.memoir_tasks.process_memoir_phase2", **send_options
    )
    dispatched_tid = pinned_tid or getattr(async_result, "id", None)

    logger.info(
        "event=phase2_dispatched_immediate user_id={} chapter_category={} "
        "memoir_correlation_id={} task_id_mode={} celery_task_id={}",
        user_id,
        chapter_category,
        memoir_correlation_id or "",
        "singleflight" if singleflight else "unique",
        dispatched_tid or "",
    )
    return dispatched_tid
|
||
|
||
|
||
def dispatch_pending_memoir_phase2_for_user(user_id: str) -> None:
    """Flush Phase 2 for every category where the user still has pending segments.

    Used on events such as a conversation ending. Pending categories are the
    distinct non-empty ``topic_category`` values among the user's un-narrated,
    non-skipped segments in non-deleted conversations. For each one:

    * the pending timeout is revoked;
    * one ``process_memoir_phase2`` task is enqueued with a fresh correlation id;
    * the enqueue is recorded via ``merge_pipeline_run``.

    Repeated calls enqueue extra runs. Phase 2 returns ``noop`` when nothing is
    pending. Any ``Exception`` is logged and swallowed.
    """

    def _enqueue_flush(category: str) -> None:
        # Cancel the debounced timeout, then dispatch one flush run for `category`.
        _revoke_phase2_timeout(user_id, category)
        flush_cid = new_memoir_correlation_id()
        async_result = celery_app.send_task(
            "app.tasks.memoir_tasks.process_memoir_phase2",
            args=[user_id, category],
            kwargs={"memoir_correlation_id": flush_cid},
        )
        celery_tid = getattr(async_result, "id", None)
        logger.info(
            "event=phase2_dispatched_flush user_id={} chapter_category={} "
            "memoir_correlation_id={} celery_task_id={}",
            user_id,
            category,
            flush_cid,
            celery_tid or "",
        )
        if not (celery_tid and flush_cid):
            return
        merge_pipeline_run(
            flush_cid,
            {
                "user_id": user_id,
                "phase2": [
                    {
                        "chapter_category": category,
                        "task_id": str(celery_tid),
                        "status": "enqueued",
                    }
                ],
            },
        )

    try:
        with get_sync_db() as db:
            owned_conversations = select(Conversation.id).where(
                Conversation.user_id == user_id,
                Conversation.deleted_at.is_(None),
            )
            pending_stmt = (
                select(Segment.topic_category)
                .where(
                    Segment.conversation_id.in_(owned_conversations),
                    Segment.narrated.is_(False),
                    Segment.skip_narrative.is_(False),
                    Segment.topic_category.isnot(None),
                )
                .distinct()
            )
            categories = [row[0] for row in db.execute(pending_stmt).all() if row[0]]
            for category in categories:
                _enqueue_flush(category)
    except Exception as e:
        logger.error(
            "event=phase2_flush_failed user_id={} exc_type={} exc={}",
            user_id,
            type(e).__name__,
            e,
        )
|
||
|
||
|
||
def _get_or_create_latest_book(db: Session, user_id: str) -> Book:
    """Return the user's most recently updated ``Book``, creating one if none exists.

    A newly created book is only added to ``db``; the caller commits it.

    ``LIMIT 1`` + ``first()`` is deliberate. The previous ``scalar_one_or_none()``
    raised ``MultipleResultsFound`` as soon as a user owned more than one book,
    even though the ``ORDER BY updated_at DESC`` shows the newest book was the
    one wanted. That error failed and retried the whole Phase 2 run after the
    story pipeline had already run.
    """
    stmt = (
        select(Book)
        .where(Book.user_id == user_id)
        .order_by(Book.updated_at.desc())
        .limit(1)
    )
    book = db.execute(stmt).scalars().first()
    if book is None:
        book = Book(
            id=str(uuid.uuid4()),
            user_id=user_id,
            title="我的回忆录",
            total_pages=0,
            total_words=0,
            cover_image_url=None,
        )
        db.add(book)
    return book


@shared_task(bind=True, max_retries=3, default_retry_delay=30)
def process_memoir_phase2(
    self,
    user_id: str,
    chapter_category: str,
    memoir_correlation_id: str | None = None,
):
    """Phase 2: narration / routing / faithfulness / titling for one chapter category.

    Consumes the user's un-narrated, non-skipped segments in ``chapter_category``,
    excluding those whose ``narrative_deferred_until`` is still in the future.
    Under a per-category lock it then:

    * runs the story pipeline on those segments;
    * marks them narrated and processed;
    * flags the user's latest book as updated;
    * enqueues post-commit effects (story images, recompose, compaction,
      quality pass, covers).

    Args:
        user_id: Owner of the segments.
        chapter_category: Category whose pending segments are narrated.
        memoir_correlation_id: Optional trace id. Otherwise one is derived from
            the Celery task id via ``effective_correlation_id``.

    Returns:
        One of:

        * ``{"status": "noop"}`` when nothing is pending (before or after taking
          the lock);
        * the ``status="deferred"`` dict from ``_persist_phase2_route_defer``
          when the pipeline defers the batch;
        * ``{"status": "success", "chapter_category": ..., "segments": n}``
          otherwise.

    Raises:
        celery.exceptions.Retry: requested through ``self.retry`` when the lock
            is busy (10 s), when the pipeline returns no chapter (30 s), or on
            any other error (task default delay).
    """
    task_id = self.request.id
    cid = effective_correlation_id(
        explicit=memoir_correlation_id, celery_task_id=str(task_id)
    )
    phase2_t0 = time.perf_counter()
    logger.info(
        "event=memoir_phase2_start user_id={} task_id={} chapter_category={} "
        "memoir_correlation_id={} msg=回忆录第二阶段叙事任务开始",
        user_id,
        task_id,
        chapter_category,
        cid,
    )
    merge_pipeline_run(
        cid,
        {
            "user_id": user_id,
            "phase2": [
                {
                    "chapter_category": chapter_category,
                    "task_id": str(task_id),
                    "status": "running",
                }
            ],
        },
    )
    try:
        with business_span(
            "memoir.phase2",
            chapter_category=chapter_category,
        ), get_sync_db() as db:
            user_convs = select(Conversation.id).where(
                Conversation.user_id == user_id,
                Conversation.deleted_at.is_(None),
            )
            now_utc = datetime.now(timezone.utc)
            stmt = (
                select(Segment)
                .where(
                    Segment.conversation_id.in_(user_convs),
                    Segment.topic_category == chapter_category,
                    Segment.narrated.is_(False),
                    Segment.skip_narrative.is_(False),
                    (
                        Segment.narrative_deferred_until.is_(None)
                        | (Segment.narrative_deferred_until <= now_utc)
                    ),
                )
                .order_by(Segment.created_at)
            )
            category_segments = list(db.execute(stmt).scalars().all())

            if not category_segments:
                ms = (time.perf_counter() - phase2_t0) * 1000
                logger.info(
                    "event=memoir_phase2_noop user_id={} chapter_category={} "
                    "duration_ms={:.1f} msg=第二阶段无待叙事片段",
                    user_id,
                    chapter_category,
                    ms,
                )
                merge_pipeline_run(
                    cid,
                    {
                        "phase2": [
                            {
                                "chapter_category": chapter_category,
                                "task_id": str(task_id),
                                "status": "noop",
                            }
                        ],
                    },
                )
                return {"status": "noop"}

            llm = _get_llm()
            llm_fast = _get_llm_fast() or llm
            user_obj = db.get(User, user_id)
            user_profile = ""
            user_birth_year = None
            background_voice = "default"
            user_occupation = ""
            user_language = "zh"
            if user_obj:
                user_birth_year = user_obj.birth_year
                user_language = (
                    "en"
                    if str(getattr(user_obj, "language_preference", "zh") or "zh").lower()
                    == "en"
                    else "zh"
                )
                user_profile = format_user_profile_context(
                    birth_year=user_obj.birth_year,
                    birth_place=user_obj.birth_place,
                    grew_up_place=user_obj.grew_up_place,
                    occupation=user_obj.occupation,
                    language=user_language,
                )
                background_voice = infer_background_voice(user_obj.occupation)
                user_occupation = user_obj.occupation or ""

            image_settings = MemoirImageSettings.from_env()
            story_dispatch_ids: Set[str] = set()
            chapters_to_enqueue: Set[str] = set()
            affected_chapter_ids: Set[str] = set()

            lock_t0 = time.perf_counter()
            with business_span(
                "memoir.phase2.lock",
                chapter_category=chapter_category,
            ):
                lock_handle = _acquire_chapter_lock(
                    user_id, chapter_category, ttl_seconds=_chapter_lock_ttl()
                )
            lock_elapsed = time.perf_counter() - lock_t0
            if lock_handle is None:
                logger.warning(
                    "event=memoir_phase2_lock_busy user_id={} chapter_category={}",
                    user_id,
                    chapter_category,
                )
                raise self.retry(countdown=10)

            try:
                # Re-query under the lock: state may have changed while waiting.
                category_segments = list(db.execute(stmt).scalars().all())
                if not category_segments:
                    merge_pipeline_run(
                        cid,
                        {
                            "phase2": [
                                {
                                    "chapter_category": chapter_category,
                                    "task_id": str(task_id),
                                    "status": "noop",
                                    "detail": {"reason": "empty_after_lock"},
                                }
                            ],
                        },
                    )
                    return {"status": "noop"}

                state = get_or_create_state_sync(user_id, db)
                segment_texts = [seg.user_input_text or "" for seg in category_segments]
                combined_text = "\n\n".join(segment_texts)
                n_units = len(category_segments)
                evidence_top_k = int(settings.evidence_top_k_default)
                if n_units > int(settings.evidence_large_batch_threshold):
                    evidence_top_k = int(settings.evidence_top_k_large_batch)
                try:
                    memory_evidence = asyncio.run(
                        _memory_retrieve_evidence(
                            user_id,
                            combined_text,
                            top_k=evidence_top_k,
                        )
                    )
                except Exception as e:
                    # Evidence is optional: continue with an empty bundle.
                    logger.warning("Evidence 检索跳过: {}", e)
                    memory_evidence = {
                        "relevant_chunks": [],
                        "relevant_summaries": [],
                        "relevant_facts": [],
                        "relevant_stories": [],
                    }

                pipeline_t0 = time.perf_counter()
                with business_span(
                    "memoir.phase2.story_pipeline",
                    chapter_category=chapter_category,
                ):
                    pipeline_result = run_story_pipeline_for_category_batch(
                        db,
                        user_id=user_id,
                        chapter_category=chapter_category,
                        category_segments=category_segments,
                        state=state,
                        user_profile=user_profile,
                        user_birth_year=user_birth_year,
                        llm=llm,
                        background_voice=background_voice,
                        occupation=user_occupation,
                        memoir_correlation_id=cid,
                        llm_fast=llm_fast,
                        memory_evidence=memory_evidence,
                        language=user_language,
                    )
                pipeline_elapsed = time.perf_counter() - pipeline_t0

                if pipeline_result.deferred:
                    deferred_response = _persist_phase2_route_defer(
                        db,
                        user_id=user_id,
                        chapter_category=chapter_category,
                        task_id=str(task_id),
                        memoir_correlation_id=cid,
                        defer_segment_ids=pipeline_result.defer_segment_ids,
                        defer_reason=pipeline_result.defer_reason or "unknown",
                        phase2_started=phase2_t0,
                        pipeline_elapsed=pipeline_elapsed,
                        lock_elapsed=lock_elapsed,
                    )
                    return deferred_response

                chapter = pipeline_result.chapter
                story_dispatch_ids |= pipeline_result.dispatch_ids
                db.flush()
                if chapter is None:
                    logger.error(
                        "event=memoir_phase2_no_chapter user_id={} chapter_category={}",
                        user_id,
                        chapter_category,
                    )
                    db.rollback()
                    raise self.retry(
                        exc=RuntimeError("story_pipeline returned no chapter"),
                        countdown=30,
                    )

                db.refresh(chapter)
                affected_chapter_ids.add(chapter.id)

                needs_cover_enqueue = (
                    image_settings.enabled and chapter_needs_cover_enqueue(chapter)
                )

                book = _get_or_create_latest_book(db, user_id)
                book.has_update = True
                book.last_update_chapter_id = chapter.id

                if needs_cover_enqueue:
                    chapters_to_enqueue.add(chapter.id)

                for seg in category_segments:
                    seg.narrated = True
                    seg.processed = True

                db.commit()

                _run_post_pipeline_commit(
                    user_id=user_id,
                    story_dispatch_ids=story_dispatch_ids,
                    recompose_chapter_ids=affected_chapter_ids,
                    cover_chapter_ids=chapters_to_enqueue,
                    trigger_source="pipeline_phase2",
                    need_compaction=True,
                    need_quality_pass=True,
                    memoir_correlation_id=cid,
                    compaction_extra={
                        "pipeline_run_id": str(task_id),
                        "memoir_correlation_id": cid,
                        "story_dispatch_ids": sorted(story_dispatch_ids),
                        "chapters_to_enqueue": sorted(chapters_to_enqueue),
                        "chapter_category": chapter_category,
                    },
                )

                phase2_elapsed = time.perf_counter() - phase2_t0
                duration_ms = phase2_elapsed * 1000
                logger.info(
                    "event=memoir_phase2_done user_id={} task_id={} chapter_category={} "
                    "segment_count={} memoir_correlation_id={} "
                    "lock_seconds={:.3f} pipeline_seconds={:.3f} "
                    "phase2_total_seconds={:.3f} duration_ms={:.1f} "
                    "msg=回忆录第二阶段叙事完成",
                    user_id,
                    task_id,
                    chapter_category,
                    len(category_segments),
                    cid,
                    lock_elapsed,
                    pipeline_elapsed,
                    phase2_elapsed,
                    duration_ms,
                )
                merge_pipeline_run(
                    cid,
                    {
                        "phase2": [
                            {
                                "chapter_category": chapter_category,
                                "task_id": str(task_id),
                                "status": "success",
                                "detail": {"segments": len(category_segments)},
                            }
                        ],
                    },
                )
                return {
                    "status": "success",
                    "chapter_category": chapter_category,
                    "segments": len(category_segments),
                }
            finally:
                # Always release the per-category lock, including on return/retry.
                _release_chapter_lock(lock_handle)

    except Retry:
        raise
    except Exception as e:
        logger.error(
            "event=memoir_phase2_failed user_id={} chapter_category={} exc={} "
            "msg=回忆录第二阶段失败",
            user_id,
            chapter_category,
            e,
        )
        merge_pipeline_run(
            cid,
            {
                "phase2": [
                    {
                        "chapter_category": chapter_category,
                        "task_id": str(task_id),
                        "status": "failure",
                        "detail": {"error": str(e)},
                    }
                ],
            },
        )
        raise self.retry(exc=e) from e
|
||
|
||
|
||
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
|
||
def process_memoir_phase1(self, user_id: str, segment_ids: List[str]):
|
||
"""
|
||
Phase 1:记忆 ingest + 抽取/分类;持久化 topic_category / skip_narrative;
|
||
按需派发 Phase 2(阈值或延迟兜底)。
|
||
"""
|
||
task_id = self.request.id
|
||
memoir_correlation_id = new_memoir_correlation_id()
|
||
logger.info(
|
||
"event=memoir_phase1_start user_id={} task_id={} segments={} "
|
||
"memoir_correlation_id={} msg=回忆录第一阶段抽取与分类开始",
|
||
user_id,
|
||
task_id,
|
||
len(segment_ids),
|
||
memoir_correlation_id,
|
||
)
|
||
_update_task_status_sync(user_id, task_id, "running")
|
||
init_pipeline_run_from_phase1(
|
||
user_id,
|
||
memoir_correlation_id,
|
||
task_id,
|
||
segment_count=len(segment_ids),
|
||
)
|
||
phase1_t0 = time.perf_counter()
|
||
|
||
try:
|
||
with business_span(
|
||
"memoir.phase1",
|
||
segment_count=len(segment_ids),
|
||
), get_sync_db() as db:
|
||
user_obj_for_lang = db.get(User, user_id)
|
||
user_language = (
|
||
"en"
|
||
if user_obj_for_lang is not None
|
||
and str(getattr(user_obj_for_lang, "language_preference", "zh") or "zh").lower()
|
||
== "en"
|
||
else "zh"
|
||
)
|
||
stmt = (
|
||
select(Segment)
|
||
.where(Segment.id.in_(segment_ids))
|
||
.order_by(Segment.created_at.asc(), Segment.id.asc())
|
||
)
|
||
rows = db.execute(stmt).scalars().all()
|
||
segments = [s for s in rows if not s.narrated]
|
||
|
||
if not segments:
|
||
logger.warning("event=memoir_phase1_no_segments ids={}", segment_ids)
|
||
merge_pipeline_run(
|
||
memoir_correlation_id,
|
||
{
|
||
"phase1": {
|
||
"status": "success",
|
||
"step": "no_segments",
|
||
"detail": {"processed": 0},
|
||
},
|
||
},
|
||
)
|
||
_update_task_status_sync(
|
||
user_id,
|
||
task_id,
|
||
"success",
|
||
{"processed": 0, "categories": []},
|
||
)
|
||
return {"status": "no_segments"}
|
||
|
||
merge_pipeline_run(
|
||
memoir_correlation_id,
|
||
{
|
||
"phase1": {
|
||
"step": "memory_ingest",
|
||
"detail": {"candidates": len(segments)},
|
||
},
|
||
},
|
||
)
|
||
ingest_t0 = time.perf_counter()
|
||
with business_span("memoir.phase1.ingest"):
|
||
ingest_items: list[tuple[str, str, dict | None]] = []
|
||
non_empty_segments: list = []
|
||
for seg in segments:
|
||
text = (seg.user_input_text or "").strip()
|
||
if not text:
|
||
continue
|
||
conv_id = getattr(seg, "conversation_id", None) or ""
|
||
ln = getattr(seg, "lineage_json", None)
|
||
lineage_payload = ln if isinstance(ln, dict) else None
|
||
ingest_items.append((conv_id, text, lineage_payload))
|
||
non_empty_segments.append(seg)
|
||
|
||
ingested_source_ids: list[str] = []
|
||
if ingest_items:
|
||
try:
|
||
ingested_source_ids = asyncio.run(
|
||
_memory_ingest_transcripts_batch(
|
||
user_id,
|
||
ingest_items,
|
||
memoir_correlation_id=memoir_correlation_id,
|
||
)
|
||
)
|
||
for seg, sid in zip(
|
||
non_empty_segments, ingested_source_ids, strict=True
|
||
):
|
||
logger.info(
|
||
"event=memory_transcript_ingested user_id={} task_id={} "
|
||
"source_id={} conversation_id={} segment_id={} transcript_chars={}",
|
||
user_id,
|
||
task_id,
|
||
sid,
|
||
getattr(seg, "conversation_id", None) or "",
|
||
seg.id,
|
||
len((seg.user_input_text or "").strip()),
|
||
)
|
||
except Exception as e:
|
||
logger.warning(
|
||
"Memory batch ingest 失败: {} exc_type={}",
|
||
e,
|
||
type(e).__name__,
|
||
)
|
||
ingest_elapsed = time.perf_counter() - ingest_t0
|
||
merge_pipeline_run(
|
||
memoir_correlation_id,
|
||
{
|
||
"phase1": {
|
||
"step": "prepare_batches",
|
||
"detail": {
|
||
"memory_ingest_seconds": round(ingest_elapsed, 3),
|
||
"ingested_sources": len(ingested_source_ids),
|
||
},
|
||
},
|
||
},
|
||
)
|
||
|
||
llm = _get_llm()
|
||
llm_fast = _get_llm_fast() or llm
|
||
if (settings.llm_fast_model or "").strip():
|
||
logger.info(
|
||
"event=llm_fast_tier_used pipeline=memoir_prepare_batches model={}",
|
||
settings.llm_fast_model,
|
||
)
|
||
|
||
prep_t0 = time.perf_counter()
|
||
with business_span("memoir.phase1.prepare_batches"):
|
||
memoir_orchestrator = MemoirOrchestrator()
|
||
|
||
def _phase1_chunk_cb(idx: int, total: int) -> None:
|
||
merge_pipeline_run(
|
||
memoir_correlation_id,
|
||
{"phase1": {"detail": {"prepare_batches_chunk": [idx, total]}}},
|
||
)
|
||
|
||
prepared = memoir_orchestrator.prepare_batches(
|
||
segments=list(segments),
|
||
llm=llm,
|
||
llm_fast=llm_fast,
|
||
get_or_create_state=lambda: get_or_create_state_sync(user_id, db),
|
||
update_slot=lambda stage, slot_name, snippet, seg_ids: update_slot_sync(
|
||
user_id,
|
||
stage,
|
||
slot_name,
|
||
snippet,
|
||
seg_ids,
|
||
db,
|
||
memoir_batch=True,
|
||
),
|
||
on_phase1_chunk=_phase1_chunk_cb,
|
||
language=user_language,
|
||
)
|
||
prep_elapsed = time.perf_counter() - prep_t0
|
||
merge_pipeline_run(
|
||
memoir_correlation_id,
|
||
{
|
||
"phase1": {
|
||
"step": "persist_topics",
|
||
"detail": {"prepare_batches_seconds": round(prep_elapsed, 3)},
|
||
},
|
||
},
|
||
)
|
||
|
||
skip_ids = prepared.segment_skip_story_ids
|
||
missing_cat = [
|
||
seg.id
|
||
for seg in segments
|
||
if not prepared.segment_chapter_category.get(str(seg.id))
|
||
]
|
||
if missing_cat:
|
||
logger.error(
|
||
"event=memoir_phase1_missing_category abort segment_ids={}",
|
||
missing_cat,
|
||
)
|
||
raise RuntimeError(
|
||
f"memoir_phase1_missing_category: {len(missing_cat)} segments"
|
||
)
|
||
|
||
for seg in segments:
|
||
cat = prepared.segment_chapter_category[str(seg.id)]
|
||
seg.topic_category = cat
|
||
is_skip = str(seg.id) in skip_ids
|
||
seg.skip_narrative = is_skip
|
||
seg.narrated = False
|
||
if is_skip:
|
||
seg.processed = True
|
||
|
||
db.flush()
|
||
|
||
categories_for_phase2: Set[str] = set()
|
||
phase2_immediate: list[str] = []
|
||
phase2_timeout: list[str] = []
|
||
woke_up_by_category: dict[str, int] = {}
|
||
for chapter_category, cat_segments in prepared.category_to_segments.items():
|
||
batch_non_skip = [
|
||
s
|
||
for s in cat_segments
|
||
if str(s.id) not in prepared.segment_skip_story_ids
|
||
]
|
||
if not batch_non_skip:
|
||
continue
|
||
woke = _wake_deferred_segments_for_category(
|
||
db, user_id, chapter_category
|
||
)
|
||
if woke:
|
||
woke_up_by_category[chapter_category] = woke
|
||
max_chars = max(
|
||
len((s.user_input_text or "").strip()) for s in batch_non_skip
|
||
)
|
||
categories_for_phase2.add(chapter_category)
|
||
if _should_trigger_phase2(db, user_id, chapter_category, max_chars):
|
||
phase2_immediate.append(chapter_category)
|
||
else:
|
||
phase2_timeout.append(chapter_category)
|
||
|
||
if woke_up_by_category:
|
||
logger.info(
|
||
"event=memoir_phase1_wake_deferred user_id={} categories={} "
|
||
"msg=Phase1 新素材唤醒同类目延迟 segment",
|
||
user_id,
|
||
woke_up_by_category,
|
||
)
|
||
|
||
db.commit()
|
||
|
||
merge_pipeline_run(
|
||
memoir_correlation_id,
|
||
{
|
||
"phase1": {
|
||
"step": "dispatch_phase2",
|
||
"detail": {
|
||
"phase2_immediate": list(phase2_immediate),
|
||
"phase2_timeout": list(phase2_timeout),
|
||
},
|
||
},
|
||
},
|
||
)
|
||
|
||
for cc in phase2_immediate:
|
||
p2tid = _dispatch_phase2_immediate(user_id, cc, memoir_correlation_id)
|
||
if p2tid:
|
||
merge_pipeline_run(
|
||
memoir_correlation_id,
|
||
{
|
||
"phase2": [
|
||
{
|
||
"chapter_category": cc,
|
||
"task_id": str(p2tid),
|
||
"status": "enqueued",
|
||
}
|
||
],
|
||
},
|
||
)
|
||
for cc in phase2_timeout:
|
||
p2tid = _schedule_phase2_timeout(user_id, cc, memoir_correlation_id)
|
||
if p2tid:
|
||
merge_pipeline_run(
|
||
memoir_correlation_id,
|
||
{
|
||
"phase2": [
|
||
{
|
||
"chapter_category": cc,
|
||
"task_id": str(p2tid),
|
||
"status": "scheduled_timeout",
|
||
}
|
||
],
|
||
},
|
||
)
|
||
|
||
categories_processed = sorted(prepared.category_to_segments.keys())
|
||
phase1_elapsed = time.perf_counter() - phase1_t0
|
||
merge_pipeline_run(
|
||
memoir_correlation_id,
|
||
{
|
||
"phase1": {
|
||
"status": "success",
|
||
"step": "completed",
|
||
"detail": {
|
||
"processed": len(segments),
|
||
"phase1_total_seconds": round(phase1_elapsed, 3),
|
||
},
|
||
},
|
||
},
|
||
)
|
||
_update_task_status_sync(
|
||
user_id,
|
||
task_id,
|
||
"success",
|
||
{
|
||
"processed": len(segments),
|
||
"categories_processed": categories_processed,
|
||
"phase2_watch_categories": sorted(categories_for_phase2),
|
||
},
|
||
)
|
||
duration_ms = phase1_elapsed * 1000
|
||
logger.info(
|
||
"event=memoir_phase1_done user_id={} task_id={} segment_count={} "
|
||
"categories={} memoir_correlation_id={} "
|
||
"memory_ingest_seconds={:.3f} prepare_batches_seconds={:.3f} "
|
||
"phase1_total_seconds={:.3f} duration_ms={:.1f} "
|
||
"msg=回忆录第一阶段完成",
|
||
user_id,
|
||
task_id,
|
||
len(segments),
|
||
categories_processed,
|
||
memoir_correlation_id,
|
||
ingest_elapsed,
|
||
prep_elapsed,
|
||
phase1_elapsed,
|
||
duration_ms,
|
||
)
|
||
return {
|
||
"status": "success",
|
||
"processed": len(segments),
|
||
"categories_processed": categories_processed,
|
||
}
|
||
|
||
except Retry:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(
|
||
"event=memoir_phase1_failed user_id={} exc={} msg=回忆录第一阶段失败",
|
||
user_id,
|
||
e,
|
||
)
|
||
merge_pipeline_run(
|
||
memoir_correlation_id,
|
||
{
|
||
"phase1": {
|
||
"status": "failure",
|
||
"step": "error",
|
||
"detail": {"error": str(e)},
|
||
},
|
||
},
|
||
)
|
||
_update_task_status_sync(user_id, task_id, "failure", {"error": str(e)})
|
||
raise self.retry(exc=e) from e
|
||
|
||
@shared_task(bind=True, max_retries=3, default_retry_delay=30)
def generate_chapter_content(self, user_id: str, stage: str, new_content: str):
    """
    Generate chapter content for a single piece of new material (real-time update path).

    Wraps ``new_content`` in a synthetic one-off segment, runs the story pipeline
    for the (normalized) chapter category ``stage``, commits the result, and then
    hands the committed chapter to ``_run_post_pipeline_commit`` (story dispatch,
    recompose, optional cover, quality pass).

    Retry policy:
      * Failures *before* the pipeline output is committed are retried via
        ``self.retry``. The DB work is not committed, so a retry starts clean.
      * Failures *after* the commit are logged and re-raised WITHOUT retry.
        Re-running the pipeline would feed ``new_content`` through it a second
        time and could duplicate chapter content that is already persisted.

    Args:
        user_id: User ID.
        stage: Chapter category; normalized with ``fallback="summary"``.
        new_content: The new raw text to fold into the chapter.

    Returns:
        ``{"status": "success"}`` on success.

    Raises:
        celery.exceptions.Retry: When a pre-commit failure schedules a retry.
        Exception: The original error, for post-commit failures, or once Celery
            gives up retrying.
    """
    stage = normalize_chapter_category(stage, fallback="summary")
    cid = effective_correlation_id(explicit=None, celery_task_id=str(self.request.id))
    gen_t0 = time.perf_counter()
    logger.info(
        "event=generate_chapter_content_start user_id={} stage={} memoir_correlation_id={} "
        "msg=实时章节生成任务开始",
        user_id,
        stage,
        cid,
    )

    # Set to True right after db.commit(). From then on a retry would re-run the
    # story pipeline on already-committed material, so errors are not retried.
    committed = False
    try:
        with get_sync_db() as db:
            llm = _get_llm()
            # Fall back to the main model when no fast tier is configured.
            llm_fast = _get_llm_fast() or llm

            # Defaults used when the user row cannot be found.
            user_obj = db.get(User, user_id)
            user_profile = ""
            user_birth_year = None
            background_voice = "default"
            user_occupation = ""
            user_language = "zh"
            if user_obj:
                user_birth_year = user_obj.birth_year
                # Only an explicit "en" preference selects English; anything else is "zh".
                user_language = (
                    "en"
                    if str(getattr(user_obj, "language_preference", "zh") or "zh").lower()
                    == "en"
                    else "zh"
                )
                user_profile = format_user_profile_context(
                    birth_year=user_obj.birth_year,
                    birth_place=user_obj.birth_place,
                    grew_up_place=user_obj.grew_up_place,
                    occupation=user_obj.occupation,
                    language=user_language,
                )
                background_voice = infer_background_voice(user_obj.occupation)
                user_occupation = user_obj.occupation or ""

            class _Seg:
                """Minimal segment stand-in exposing only ``id`` and ``user_input_text``."""

                def __init__(self, text: str):
                    self.id = str(uuid.uuid4())
                    self.user_input_text = text

            state = get_or_create_state_sync(user_id, db)
            chapter, _, dispatch_ids = run_story_pipeline_for_category_batch(
                db,
                user_id=user_id,
                chapter_category=stage,
                category_segments=[_Seg(new_content)],
                state=state,
                user_profile=user_profile,
                user_birth_year=user_birth_year,
                llm=llm,
                background_voice=background_voice,
                occupation=user_occupation,
                memoir_correlation_id=cid,
                llm_fast=llm_fast,
                language=user_language,
            )
            db.flush()
            if chapter is None:
                logger.error(
                    "event=generate_chapter_content_no_chapter user_id={} stage={}",
                    user_id,
                    stage,
                )
                # Discard any partial pipeline writes before scheduling the retry.
                db.rollback()
                raise self.retry(
                    exc=RuntimeError("story_pipeline returned no chapter"),
                    countdown=30,
                )
            db.commit()
            committed = True
            db.refresh(chapter)

            ch_ids: set[str] = {str(chapter.id)}
            cover_ids = ch_ids if chapter_needs_cover_enqueue(chapter) else set()
            _run_post_pipeline_commit(
                user_id=user_id,
                story_dispatch_ids=set(dispatch_ids),
                recompose_chapter_ids=ch_ids,
                cover_chapter_ids=cover_ids,
                trigger_source="pipeline_generate_chapter",
                need_compaction=False,
                need_quality_pass=True,
                memoir_correlation_id=cid,
            )
            ms = (time.perf_counter() - gen_t0) * 1000
            logger.info(
                "event=generate_chapter_content_done user_id={} stage={} "
                "memoir_correlation_id={} duration_ms={:.1f} msg=实时章节生成完成",
                user_id,
                stage,
                cid,
                ms,
            )
            return {"status": "success"}

    except Retry:
        # A retry was already scheduled above; let Celery handle it.
        raise
    except Exception as e:
        ms = (time.perf_counter() - gen_t0) * 1000
        if committed:
            # The chapter content is already persisted. Fail the task visibly,
            # but do not retry: a retry would regenerate the content again.
            logger.error(
                "event=generate_chapter_content_post_commit_failed user_id={} stage={} "
                "memoir_correlation_id={} duration_ms={:.1f} exc={} "
                "msg=章节已提交,后处理失败,不重试",
                user_id,
                stage,
                cid,
                ms,
                e,
            )
            raise
        logger.error(
            "event=generate_chapter_content_failed user_id={} stage={} "
            "memoir_correlation_id={} duration_ms={:.1f} exc={} msg=实时章节生成失败",
            user_id,
            stage,
            cid,
            ms,
            e,
        )
        raise self.retry(exc=e) from e
|