Merge branch 'eval/elapsed-time-memoir-batch-chunk' into development

This commit is contained in:
Kevin
2026-04-10 10:27:41 +08:00
66 changed files with 5246 additions and 705 deletions

View File

@@ -29,6 +29,10 @@ from app.core.config import settings
from app.core.db import get_sync_db
from app.core.dependencies import get_llm_provider, get_llm_provider_fast
from app.core.logging import get_logger
from app.core.memoir_pipeline_progress import (
init_pipeline_run_from_phase1,
merge_pipeline_run,
)
from app.core.memoir_pipeline_trace import (
effective_correlation_id,
new_memoir_correlation_id,
@@ -61,6 +65,10 @@ from app.features.memoir.state_service import (
from app.features.memoir.story_pipeline_sync import (
run_story_pipeline_for_category_batch,
)
from app.features.memory.service import (
ingest_transcripts_batch_sync,
schedule_enrichment_for_sources,
)
from app.features.user.models import User
from app.tasks.celery_app import celery_app
@@ -177,7 +185,7 @@ def _update_task_status_sync(
logger.debug("任务状态已更新: task_id={} status={}", task_id, status)
except Exception as e:
logger.error(f"更新任务状态失败: {e}")
logger.error("event=memoir_task_status_update_failed msg=更新任务状态失败 exc={}", e)
def _merge_chapter_image_assets(
@@ -300,19 +308,20 @@ def _phase2_immediate_task_id(user_id: str, chapter_category: str) -> str:
def _schedule_phase2_timeout(
user_id: str, chapter_category: str, memoir_correlation_id: str | None = None
) -> None:
"""Reset countdown for Phase 2 narrative for one category."""
) -> str | None:
"""Reset countdown for Phase 2 narrative for one category。返回 Celery task_id。"""
_revoke_phase2_timeout(user_id, chapter_category)
countdown = float(max(1.0, settings.memoir_narrative_batch_max_wait_seconds))
p2_kwargs: dict = {}
if memoir_correlation_id:
p2_kwargs["memoir_correlation_id"] = memoir_correlation_id
timeout_tid = _phase2_timeout_task_id(user_id, chapter_category)
celery_app.send_task(
"app.tasks.memoir_tasks.process_memoir_phase2",
args=[user_id, chapter_category],
kwargs=p2_kwargs,
countdown=countdown,
task_id=_phase2_timeout_task_id(user_id, chapter_category),
task_id=timeout_tid,
)
logger.info(
"event=phase2_timeout_scheduled user_id={} chapter_category={} countdown={} "
@@ -322,11 +331,12 @@ def _schedule_phase2_timeout(
countdown,
memoir_correlation_id or "",
)
return timeout_tid
def _dispatch_phase2_immediate(
user_id: str, chapter_category: str, memoir_correlation_id: str | None = None
) -> None:
) -> str | None:
_revoke_phase2_timeout(user_id, chapter_category)
p2_kwargs: dict = {}
if memoir_correlation_id:
@@ -335,17 +345,22 @@ def _dispatch_phase2_immediate(
"args": [user_id, chapter_category],
"kwargs": p2_kwargs,
}
fixed_tid: str | None = None
if settings.memoir_phase2_singleflight_immediate:
send_kw["task_id"] = _phase2_immediate_task_id(user_id, chapter_category)
celery_app.send_task("app.tasks.memoir_tasks.process_memoir_phase2", **send_kw)
fixed_tid = _phase2_immediate_task_id(user_id, chapter_category)
send_kw["task_id"] = fixed_tid
ar = celery_app.send_task("app.tasks.memoir_tasks.process_memoir_phase2", **send_kw)
out_tid = fixed_tid or getattr(ar, "id", None)
logger.info(
"event=phase2_dispatched_immediate user_id={} chapter_category={} "
"memoir_correlation_id={} task_id_mode={}",
"memoir_correlation_id={} task_id_mode={} celery_task_id={}",
user_id,
chapter_category,
memoir_correlation_id or "",
"singleflight" if settings.memoir_phase2_singleflight_immediate else "unique",
out_tid or "",
)
return out_tid
def dispatch_pending_memoir_phase2_for_user(user_id: str) -> None:
@@ -370,18 +385,34 @@ def dispatch_pending_memoir_phase2_for_user(user_id: str) -> None:
for chapter_category in cats:
_revoke_phase2_timeout(user_id, chapter_category)
flush_cid = new_memoir_correlation_id()
celery_app.send_task(
ar = celery_app.send_task(
"app.tasks.memoir_tasks.process_memoir_phase2",
args=[user_id, chapter_category],
kwargs={"memoir_correlation_id": flush_cid},
)
p2tid = getattr(ar, "id", None)
logger.info(
"event=phase2_dispatched_flush user_id={} chapter_category={} "
"memoir_correlation_id={}",
"memoir_correlation_id={} celery_task_id={}",
user_id,
chapter_category,
flush_cid,
p2tid or "",
)
if p2tid and flush_cid:
merge_pipeline_run(
flush_cid,
{
"user_id": user_id,
"phase2": [
{
"chapter_category": chapter_category,
"task_id": str(p2tid),
"status": "enqueued",
}
],
},
)
except Exception as e:
logger.error(
"event=phase2_flush_failed user_id={} exc_type={} exc={}",
@@ -406,12 +437,25 @@ def process_memoir_phase2(
phase2_t0 = time.perf_counter()
logger.info(
"event=memoir_phase2_start user_id={} task_id={} chapter_category={} "
"memoir_correlation_id={}",
"memoir_correlation_id={} msg=回忆录第二阶段叙事任务开始",
user_id,
task_id,
chapter_category,
cid,
)
merge_pipeline_run(
cid,
{
"user_id": user_id,
"phase2": [
{
"chapter_category": chapter_category,
"task_id": str(task_id),
"status": "running",
}
],
},
)
try:
with get_sync_db() as db:
user_convs = select(Conversation.id).where(
@@ -431,14 +475,30 @@ def process_memoir_phase2(
category_segments = list(db.execute(stmt).scalars().all())
if not category_segments:
ms = (time.perf_counter() - phase2_t0) * 1000
logger.info(
"event=memoir_phase2_noop user_id={} chapter_category={}",
"event=memoir_phase2_noop user_id={} chapter_category={} "
"duration_ms={:.1f} msg=第二阶段无待叙事片段",
user_id,
chapter_category,
ms,
)
merge_pipeline_run(
cid,
{
"phase2": [
{
"chapter_category": chapter_category,
"task_id": str(task_id),
"status": "noop",
}
],
},
)
return {"status": "noop"}
llm = _get_llm()
llm_fast = _get_llm_fast() or llm
user_obj = db.get(User, user_id)
user_profile = ""
user_birth_year = None
@@ -477,6 +537,19 @@ def process_memoir_phase2(
# 锁内再查一次,避免等待锁期间状态已变
category_segments = list(db.execute(stmt).scalars().all())
if not category_segments:
merge_pipeline_run(
cid,
{
"phase2": [
{
"chapter_category": chapter_category,
"task_id": str(task_id),
"status": "noop",
"detail": {"reason": "empty_after_lock"},
}
],
},
)
return {"status": "noop"}
state = get_or_create_state_sync(user_id, db)
@@ -493,6 +566,7 @@ def process_memoir_phase2(
background_voice=background_voice,
occupation=user_occupation,
memoir_correlation_id=cid,
llm_fast=llm_fast,
)
pipeline_elapsed = time.perf_counter() - pipeline_t0
story_dispatch_ids |= disp
@@ -564,11 +638,13 @@ def process_memoir_phase2(
)
phase2_elapsed = time.perf_counter() - phase2_t0
duration_ms = phase2_elapsed * 1000
logger.info(
"event=memoir_phase2_done user_id={} task_id={} chapter_category={} "
"segment_count={} memoir_correlation_id={} "
"lock_seconds={:.3f} pipeline_seconds={:.3f} "
"phase2_total_seconds={:.3f}",
"phase2_total_seconds={:.3f} duration_ms={:.1f} "
"msg=回忆录第二阶段叙事完成",
user_id,
task_id,
chapter_category,
@@ -577,6 +653,20 @@ def process_memoir_phase2(
lock_elapsed,
pipeline_elapsed,
phase2_elapsed,
duration_ms,
)
merge_pipeline_run(
cid,
{
"phase2": [
{
"chapter_category": chapter_category,
"task_id": str(task_id),
"status": "success",
"detail": {"segments": len(category_segments)},
}
],
},
)
return {
"status": "success",
@@ -590,11 +680,25 @@ def process_memoir_phase2(
raise
except Exception as e:
logger.error(
"event=memoir_phase2_failed user_id={} chapter_category={} exc={}",
"event=memoir_phase2_failed user_id={} chapter_category={} exc={} "
"msg=回忆录第二阶段失败",
user_id,
chapter_category,
e,
)
merge_pipeline_run(
cid,
{
"phase2": [
{
"chapter_category": chapter_category,
"task_id": str(task_id),
"status": "failure",
"detail": {"error": str(e)},
}
],
},
)
raise self.retry(exc=e) from e
@@ -608,13 +712,19 @@ def process_memoir_phase1(self, user_id: str, segment_ids: List[str]):
memoir_correlation_id = new_memoir_correlation_id()
logger.info(
"event=memoir_phase1_start user_id={} task_id={} segments={} "
"memoir_correlation_id={}",
"memoir_correlation_id={} msg=回忆录第一阶段抽取与分类开始",
user_id,
task_id,
len(segment_ids),
memoir_correlation_id,
)
_update_task_status_sync(user_id, task_id, "running")
init_pipeline_run_from_phase1(
user_id,
memoir_correlation_id,
task_id,
segment_count=len(segment_ids),
)
phase1_t0 = time.perf_counter()
try:
@@ -629,6 +739,16 @@ def process_memoir_phase1(self, user_id: str, segment_ids: List[str]):
if not segments:
logger.warning("event=memoir_phase1_no_segments ids={}", segment_ids)
merge_pipeline_run(
memoir_correlation_id,
{
"phase1": {
"status": "success",
"step": "no_segments",
"detail": {"processed": 0},
},
},
)
_update_task_status_sync(
user_id,
task_id,
@@ -637,42 +757,66 @@ def process_memoir_phase1(self, user_id: str, segment_ids: List[str]):
)
return {"status": "no_segments"}
merge_pipeline_run(
memoir_correlation_id,
{
"phase1": {
"step": "memory_ingest",
"detail": {"candidates": len(segments)},
},
},
)
ingest_t0 = time.perf_counter()
ingest_items: list[tuple[str, str, dict | None]] = []
non_empty_segments: list = []
for seg in segments:
conv_id = getattr(seg, "conversation_id", None) or ""
text = (seg.user_input_text or "").strip()
if not text:
continue
try:
from app.features.memory.service import ingest_transcript_sync
conv_id = getattr(seg, "conversation_id", None) or ""
ln = getattr(seg, "lineage_json", None)
lineage_payload = ln if isinstance(ln, dict) else None
ingest_items.append((conv_id, text, lineage_payload))
non_empty_segments.append(seg)
ln = getattr(seg, "lineage_json", None)
lineage_payload = ln if isinstance(ln, dict) else None
source_id = ingest_transcript_sync(
db,
user_id,
conv_id,
text,
lineage_json=lineage_payload,
)
logger.info(
"event=memory_transcript_ingested user_id={} task_id={} "
"source_id={} conversation_id={} segment_id={} transcript_chars={}",
user_id,
task_id,
source_id,
conv_id,
seg.id,
len(text),
ingested_source_ids: list[str] = []
if ingest_items:
try:
ingested_source_ids = ingest_transcripts_batch_sync(
db, user_id, ingest_items
)
for seg, sid in zip(
non_empty_segments, ingested_source_ids, strict=True
):
logger.info(
"event=memory_transcript_ingested user_id={} task_id={} "
"source_id={} conversation_id={} segment_id={} transcript_chars={}",
user_id,
task_id,
sid,
getattr(seg, "conversation_id", None) or "",
seg.id,
len((seg.user_input_text or "").strip()),
)
except Exception as e:
logger.warning(
"Memory ingest 跳过 segment_id={}: {} exc_type={}",
getattr(seg, "id", ""),
"Memory batch ingest 失败: {} exc_type={}",
e,
type(e).__name__,
)
ingest_elapsed = time.perf_counter() - ingest_t0
merge_pipeline_run(
memoir_correlation_id,
{
"phase1": {
"step": "prepare_batches",
"detail": {
"memory_ingest_seconds": round(ingest_elapsed, 3),
"ingested_sources": len(ingested_source_ids),
},
},
},
)
llm = _get_llm()
llm_fast = _get_llm_fast() or llm
@@ -684,6 +828,13 @@ def process_memoir_phase1(self, user_id: str, segment_ids: List[str]):
prep_t0 = time.perf_counter()
memoir_orchestrator = MemoirOrchestrator()
def _phase1_chunk_cb(idx: int, total: int) -> None:
merge_pipeline_run(
memoir_correlation_id,
{"phase1": {"detail": {"prepare_batches_chunk": [idx, total]}}},
)
prepared = memoir_orchestrator.prepare_batches(
segments=list(segments),
llm=llm,
@@ -698,8 +849,18 @@ def process_memoir_phase1(self, user_id: str, segment_ids: List[str]):
db,
memoir_batch=True,
),
on_phase1_chunk=_phase1_chunk_cb,
)
prep_elapsed = time.perf_counter() - prep_t0
merge_pipeline_run(
memoir_correlation_id,
{
"phase1": {
"step": "persist_topics",
"detail": {"prepare_batches_seconds": round(prep_elapsed, 3)},
},
},
)
skip_ids = prepared.segment_skip_story_ids
missing_cat = [
@@ -749,13 +910,72 @@ def process_memoir_phase1(self, user_id: str, segment_ids: List[str]):
db.commit()
merge_pipeline_run(
memoir_correlation_id,
{
"phase1": {
"step": "dispatch_phase2",
"detail": {
"phase2_immediate": list(phase2_immediate),
"phase2_timeout": list(phase2_timeout),
},
},
},
)
if ingested_source_ids:
schedule_enrichment_for_sources(
user_id,
ingested_source_ids,
memoir_correlation_id=memoir_correlation_id,
)
for cc in phase2_immediate:
_dispatch_phase2_immediate(user_id, cc, memoir_correlation_id)
p2tid = _dispatch_phase2_immediate(user_id, cc, memoir_correlation_id)
if p2tid:
merge_pipeline_run(
memoir_correlation_id,
{
"phase2": [
{
"chapter_category": cc,
"task_id": str(p2tid),
"status": "enqueued",
}
],
},
)
for cc in phase2_timeout:
_schedule_phase2_timeout(user_id, cc, memoir_correlation_id)
p2tid = _schedule_phase2_timeout(user_id, cc, memoir_correlation_id)
if p2tid:
merge_pipeline_run(
memoir_correlation_id,
{
"phase2": [
{
"chapter_category": cc,
"task_id": str(p2tid),
"status": "scheduled_timeout",
}
],
},
)
categories_processed = sorted(prepared.category_to_segments.keys())
phase1_elapsed = time.perf_counter() - phase1_t0
merge_pipeline_run(
memoir_correlation_id,
{
"phase1": {
"status": "success",
"step": "completed",
"detail": {
"processed": len(segments),
"phase1_total_seconds": round(phase1_elapsed, 3),
},
},
},
)
_update_task_status_sync(
user_id,
task_id,
@@ -766,11 +986,13 @@ def process_memoir_phase1(self, user_id: str, segment_ids: List[str]):
"phase2_watch_categories": sorted(categories_for_phase2),
},
)
duration_ms = phase1_elapsed * 1000
logger.info(
"event=memoir_phase1_done user_id={} task_id={} segment_count={} "
"categories={} memoir_correlation_id={} "
"memory_ingest_seconds={:.3f} prepare_batches_seconds={:.3f} "
"phase1_total_seconds={:.3f}",
"phase1_total_seconds={:.3f} duration_ms={:.1f} "
"msg=回忆录第一阶段完成",
user_id,
task_id,
len(segments),
@@ -779,6 +1001,7 @@ def process_memoir_phase1(self, user_id: str, segment_ids: List[str]):
ingest_elapsed,
prep_elapsed,
phase1_elapsed,
duration_ms,
)
return {
"status": "success",
@@ -789,7 +1012,21 @@ def process_memoir_phase1(self, user_id: str, segment_ids: List[str]):
except Retry:
raise
except Exception as e:
logger.error("event=memoir_phase1_failed user_id={} exc={}", user_id, e)
logger.error(
"event=memoir_phase1_failed user_id={} exc={} msg=回忆录第一阶段失败",
user_id,
e,
)
merge_pipeline_run(
memoir_correlation_id,
{
"phase1": {
"status": "failure",
"step": "error",
"detail": {"error": str(e)},
},
},
)
_update_task_status_sync(user_id, task_id, "failure", {"error": str(e)})
raise self.retry(exc=e) from e
@@ -810,8 +1047,10 @@ def generate_chapter_content(self, user_id: str, stage: str, new_content: str):
"""
stage = normalize_chapter_category(stage, fallback="summary")
cid = effective_correlation_id(explicit=None, celery_task_id=str(self.request.id))
gen_t0 = time.perf_counter()
logger.info(
"event=generate_chapter_content_start user_id={} stage={} memoir_correlation_id={}",
"event=generate_chapter_content_start user_id={} stage={} memoir_correlation_id={} "
"msg=实时章节生成任务开始",
user_id,
stage,
cid,
@@ -820,6 +1059,7 @@ def generate_chapter_content(self, user_id: str, stage: str, new_content: str):
try:
with get_sync_db() as db:
llm = _get_llm()
llm_fast = _get_llm_fast() or llm
user_obj = db.get(User, user_id)
user_profile = ""
user_birth_year = None
@@ -854,6 +1094,7 @@ def generate_chapter_content(self, user_id: str, stage: str, new_content: str):
background_voice=background_voice,
occupation=user_occupation,
memoir_correlation_id=cid,
llm_fast=llm_fast,
)
db.flush()
if chapter is None:
@@ -882,10 +1123,27 @@ def generate_chapter_content(self, user_id: str, stage: str, new_content: str):
need_quality_pass=True,
memoir_correlation_id=cid,
)
ms = (time.perf_counter() - gen_t0) * 1000
logger.info(
"event=generate_chapter_content_done user_id={} stage={} "
"memoir_correlation_id={} duration_ms={:.1f} msg=实时章节生成完成",
user_id,
stage,
cid,
ms,
)
return {"status": "success"}
except Retry:
raise
except Exception as e:
logger.error(f"章节生成失败: {e}")
ms = (time.perf_counter() - gen_t0) * 1000
logger.error(
"event=generate_chapter_content_failed user_id={} stage={} duration_ms={:.1f} "
"exc={} msg=实时章节生成失败",
user_id,
stage,
ms,
e,
)
raise self.retry(exc=e) from e