Files
life-echo/api/app/tasks/chapter_compose_tasks.py
Kevin ac49bc7f23 feat(eval): memoir A/B chapter judging and eval-web parity with dialogue
- Judge baseline excerpt and library chapter separately; build_memoir_compare_summary for gate, nine-dim and leaf deltas.

- Memoir SSE chapter payload: baseline_judge, compare_summary, baseline_judge_error.

- MemoirJudgeOutput: loose score coercion and post-validate clamp; memoir judge prompt caps from settings.

- app-eval-web: two-column MemoirScoreCard layout, MemoirCompareSummary, chapter blocks and CSS.

- Add memoir_compare_summary, log_events, celery_log_context, memoir_pipeline_progress; tests and migration 0014.

- Misc: memory/evidence and enrichment paths, task/orchestrator updates, internal-eval docs, env examples.
2026-04-10 10:25:15 +08:00

153 lines
5.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Celerystory 变更后重组关联章节的 canonical_markdown物化视图"""
import time
from datetime import datetime, timezone
from celery import shared_task
from app.core.chapter_pipeline_lock import (
acquire_chapter_pipeline_lock,
release_chapter_pipeline_lock,
)
from app.core.config import settings
from app.core.db import get_sync_db
from app.core.logging import get_logger
from app.core.memoir_pipeline_progress import merge_fanout_item
from app.core.memoir_pipeline_trace import new_memoir_correlation_id
from app.core.memory_compaction_schedule import schedule_memory_compaction_run
from app.features.memoir import repo as memoir_repo
from app.features.memoir.models import Chapter
logger = get_logger(__name__)
@shared_task(bind=True, max_retries=8, default_retry_delay=30)
def recompose_chapter(
self, chapter_id: str, memoir_correlation_id: str | None = None
) -> dict:
"""
按章节物化 canonical_markdown仅当 markdown_compose_dirty 为 True 时执行;
与 pipeline 共用章节级 Redis 锁,拿不到锁则跳过(依赖后续触发重试)。
"""
lock_ttl = int(settings.chapter_pipeline_lock_ttl_seconds)
tid = str(self.request.id)
t0 = time.perf_counter()
merge_fanout_item(
memoir_correlation_id,
list_name="recompose_chapters",
id_field="chapter_id",
item_id=chapter_id,
task_id=tid,
status="running",
)
user_id: str | None = None
composed = False
with get_sync_db() as session:
chapter = session.get(Chapter, chapter_id)
if not chapter:
ms = (time.perf_counter() - t0) * 1000
logger.info(
"event=recompose_chapter status=not_found chapter_id={} duration_ms={:.1f} "
"msg=章节重组跳过(章节不存在)",
chapter_id,
ms,
)
merge_fanout_item(
memoir_correlation_id,
list_name="recompose_chapters",
id_field="chapter_id",
item_id=chapter_id,
task_id=tid,
status="not_found",
)
return {"status": "not_found"}
if chapter.markdown_compose_dirty is not True:
ms = (time.perf_counter() - t0) * 1000
logger.info(
"event=recompose_chapter status=skip_not_dirty chapter_id={} duration_ms={:.1f} "
"msg=章节重组跳过(无需重组)",
chapter_id,
ms,
)
merge_fanout_item(
memoir_correlation_id,
list_name="recompose_chapters",
id_field="chapter_id",
item_id=chapter_id,
task_id=tid,
status="skip_not_dirty",
)
return {"status": "skip_not_dirty"}
uid = str(chapter.user_id)
stage = str(chapter.category)
lock_handle = acquire_chapter_pipeline_lock(uid, stage, ttl_seconds=lock_ttl)
if lock_handle is None:
ms = (time.perf_counter() - t0) * 1000
logger.info(
"event=recompose_chapter status=lock_busy_retry "
"chapter_id={} user_id={} stage={} retry_on_lock={} duration_ms={:.1f} "
"msg=章节重组等待锁或重试",
chapter_id,
uid,
stage,
settings.memoir_recompose_retry_on_lock_contention,
ms,
)
if settings.memoir_recompose_retry_on_lock_contention:
countdown = max(15, min(120, lock_ttl // 4))
raise self.retry(countdown=countdown)
merge_fanout_item(
memoir_correlation_id,
list_name="recompose_chapters",
id_field="chapter_id",
item_id=chapter_id,
task_id=tid,
status="skip_lock_contention",
)
return {"status": "skip_lock_contention"}
try:
composed = memoir_repo.compose_chapter_from_story_links_sync(
session, chapter_id
)
session.commit()
user_id = uid
except Exception as exc:
session.rollback()
logger.warning(
"recompose_chapter failed chapter_id={} err={}", chapter_id, exc
)
raise self.retry(exc=exc) from exc
finally:
release_chapter_pipeline_lock(lock_handle)
if user_id:
schedule_memory_compaction_run(
user_id,
{
"trigger_source": "chapter_recompose",
"trigger_time": datetime.now(timezone.utc).isoformat(),
"pipeline_run_id": str(self.request.id),
"memoir_correlation_id": new_memoir_correlation_id(),
"recomposed_chapter_ids": [chapter_id],
},
)
ms = (time.perf_counter() - t0) * 1000
st = "composed" if composed else "empty"
logger.info(
"event=recompose_chapter status={} chapter_id={} user_id={} duration_ms={:.1f} "
"msg=章节物化重组完成",
st,
chapter_id,
user_id or "-",
ms,
)
merge_fanout_item(
memoir_correlation_id,
list_name="recompose_chapters",
id_field="chapter_id",
item_id=chapter_id,
task_id=tid,
status="composed" if composed else "empty",
)
return {"status": "composed" if composed else "empty", "chapter_id": chapter_id}