feat(api)!: memory single chain — async MemoryService, strict eval closure

Route all memory ingest/retrieve/enrichment/compaction through async MemoryService.
Remove legacy sync memory implementations (ingest/retrieve/compaction); Celery and
memoir Phase2 call asyncio.run into MemoryService-backed helpers.

Memoir Phase1 batch ingest uses MemoryService.ingest_transcripts_batch; drop chapters.
evidence_bundle_json mirror (Alembic 0015). Evaluation uses snapshot/link-only bundles;
raise EvidenceClosureMissing instead of partial/fallback lineage tiers.

Split memoir state into NarrativeCoverageState and InterviewControlState; delete the
_interview_meta_store adapter layer. Remove rolling-query and recent-fact fallback
settings from config and evidence assembly.

Update judges, docs, tests, and PlaygroundPage alignment.

Made-with: Cursor
This commit is contained in:
Kevin
2026-04-30 14:11:46 +08:00
parent ac436b87a2
commit 71fbd39e32
53 changed files with 953 additions and 2448 deletions

View File

@@ -2,6 +2,7 @@
回忆录处理 Celery 任务
"""
import asyncio
import json
import time
import uuid
@@ -26,7 +27,8 @@ from app.core.chapter_pipeline_lock import (
release_chapter_pipeline_lock as _release_chapter_lock,
)
from app.core.config import settings
from app.core.db import get_sync_db
from app.core.db import AsyncSessionLocal, get_sync_db
from app.core.dependencies import get_embedding_provider
from app.core.llm_gateway import LlmGateway, LlmUseCase
from app.core.logging import get_logger
from app.core.memoir_pipeline_progress import (
@@ -65,10 +67,7 @@ from app.features.memoir.state_service import (
from app.features.memoir.story_pipeline_sync import (
run_story_pipeline_for_category_batch,
)
from app.features.memory.service import (
ingest_transcripts_batch_sync,
schedule_enrichment_for_sources,
)
from app.features.memory.service import MemoryService
from app.features.user.models import User
from app.tasks.celery_app import celery_app
@@ -144,6 +143,33 @@ def _get_llm_fast():
return None
async def _memory_ingest_transcripts_batch(
user_id: str,
items: list[tuple[str, str, dict | None]],
*,
memoir_correlation_id: str,
) -> list[str]:
async with AsyncSessionLocal() as db:
service = MemoryService(db, embedding_provider=get_embedding_provider())
return await service.ingest_transcripts_batch(
user_id,
items,
memoir_correlation_id=memoir_correlation_id,
)
async def _memory_retrieve_evidence(
user_id: str,
query: str,
*,
top_k: int,
) -> dict:
async with AsyncSessionLocal() as db:
service = MemoryService(db, embedding_provider=get_embedding_provider())
bundle = await service.retrieve(user_id, query, top_k=top_k)
return bundle.model_dump()
def _get_redis_client(*, decode_responses: bool = False) -> redis.Redis:
from app.core.config import settings
@@ -557,6 +583,29 @@ def process_memoir_phase2(
return {"status": "noop"}
state = get_or_create_state_sync(user_id, db)
segment_texts = [seg.user_input_text or "" for seg in category_segments]
combined_text = "\n\n".join(segment_texts)
n_units = len(category_segments)
evidence_top_k = int(settings.evidence_top_k_default)
if n_units > int(settings.evidence_large_batch_threshold):
evidence_top_k = int(settings.evidence_top_k_large_batch)
try:
memory_evidence = asyncio.run(
_memory_retrieve_evidence(
user_id,
combined_text,
top_k=evidence_top_k,
)
)
except Exception as e:
logger.warning("Evidence 检索跳过: {}", e)
memory_evidence = {
"relevant_chunks": [],
"relevant_summaries": [],
"relevant_facts": [],
"timeline_hints": [],
"relevant_stories": [],
}
pipeline_t0 = time.perf_counter()
chapter, needs_cover, disp = run_story_pipeline_for_category_batch(
db,
@@ -571,6 +620,7 @@ def process_memoir_phase2(
occupation=user_occupation,
memoir_correlation_id=cid,
llm_fast=llm_fast,
memory_evidence=memory_evidence,
)
pipeline_elapsed = time.perf_counter() - pipeline_t0
story_dispatch_ids |= disp
@@ -786,8 +836,12 @@ def process_memoir_phase1(self, user_id: str, segment_ids: List[str]):
ingested_source_ids: list[str] = []
if ingest_items:
try:
ingested_source_ids = ingest_transcripts_batch_sync(
db, user_id, ingest_items
ingested_source_ids = asyncio.run(
_memory_ingest_transcripts_batch(
user_id,
ingest_items,
memoir_correlation_id=memoir_correlation_id,
)
)
for seg, sid in zip(
non_empty_segments, ingested_source_ids, strict=True
@@ -927,13 +981,6 @@ def process_memoir_phase1(self, user_id: str, segment_ids: List[str]):
},
)
if ingested_source_ids:
schedule_enrichment_for_sources(
user_id,
ingested_source_ids,
memoir_correlation_id=memoir_correlation_id,
)
for cc in phase2_immediate:
p2tid = _dispatch_phase2_immediate(user_id, cc, memoir_correlation_id)
if p2tid:

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
import asyncio
import time
from datetime import datetime
from typing import Any
@@ -9,7 +10,7 @@ from typing import Any
from celery import shared_task
from app.core.config import settings
from app.core.db import get_sync_db
from app.core.db import AsyncSessionLocal
from app.core.logging import get_logger
from app.core.memory_compaction_schedule import (
finalize_memory_compaction_run,
@@ -19,12 +20,28 @@ from app.core.memory_compaction_schedule import (
set_incremental_cursor_pair,
)
from app.core.redis_lock import acquire_redis_lock, release_redis_lock
from app.features.memory.compaction_service import run_memory_compaction_sync
from app.features.memory.repo import list_users_with_recent_chunks_sync
from app.features.memory.repo import list_users_with_recent_chunks
from app.features.memory.service import MemoryService
logger = get_logger(__name__)
async def _list_users_with_recent_chunks_async(hours: int) -> list[str]:
async with AsyncSessionLocal() as db:
return await list_users_with_recent_chunks(db, hours=hours)
async def _run_memory_compaction_async(
user_id: str,
context: dict[str, Any] | None,
) -> dict[str, Any]:
async with AsyncSessionLocal() as db:
service = MemoryService(db)
out = await service.compact_user(user_id, context)
await db.commit()
return out
@shared_task
def memory_compaction_sweep() -> dict[str, Any]:
"""Beat为近期有记忆写入的用户调度 compactiondebounce 仍由 schedule 合并)。"""
@@ -32,8 +49,7 @@ def memory_compaction_sweep() -> dict[str, Any]:
if not settings.memory_compaction_enabled:
return {"skipped": True, "reason": "disabled"}
hours = int(settings.memory_compaction_sweep_recent_hours)
with get_sync_db() as session:
user_ids = list_users_with_recent_chunks_sync(session, hours=hours)
user_ids = asyncio.run(_list_users_with_recent_chunks_async(hours))
ctx_base: dict[str, Any] = {"trigger_source": "beat", "sweep_hours": hours}
for uid in user_ids:
schedule_memory_compaction_run(uid, dict(ctx_base))
@@ -84,9 +100,7 @@ def memory_compaction_run(
return out
try:
with get_sync_db() as session:
out = run_memory_compaction_sync(session, user_id, ctx)
session.commit()
out = asyncio.run(_run_memory_compaction_async(user_id, ctx))
if out.get("new_cursor_ts") and out.get("new_cursor_id") is not None:
set_incremental_cursor_pair(

View File

@@ -6,18 +6,30 @@ Tasks are routed to ``settings.celery_memory_enrichment_queue`` (default ``memor
run workers with ``-Q celery,memory_idle`` or a dedicated low-priority worker for that queue.
"""
import asyncio
import time
from celery import shared_task
from app.core.config import settings
from app.core.db import get_sync_db
from app.core.db import AsyncSessionLocal
from app.core.logging import get_logger
from app.core.memoir_pipeline_progress import merge_fanout_item
from app.features.memory.service import MemoryService
logger = get_logger(__name__)
async def _enrich_memory_source_async(
user_id: str,
source_id: str,
) -> None:
async with AsyncSessionLocal() as db:
service = MemoryService(db)
await service.enrich_source(user_id, source_id, llm=None)
await db.commit()
def schedule_memory_enrichment(
user_id: str,
source_id: str,
@@ -100,28 +112,24 @@ def enrich_memory_source(
status="running",
)
try:
with get_sync_db() as db:
from app.features.memory.enrichment import enrich_memory_after_ingest_sync
enrich_memory_after_ingest_sync(db, user_id, source_id, llm=None)
db.commit()
ms = (time.perf_counter() - t0) * 1000
logger.info(
"event=memory_enrichment_done user_id={} source_id={} duration_ms={:.1f} "
"msg=记忆富化完成",
user_id,
source_id,
ms,
)
merge_fanout_item(
memoir_correlation_id,
list_name="memory_enrichment",
id_field="source_id",
item_id=source_id,
task_id=tid,
status="success",
)
return {"status": "success", "source_id": source_id}
asyncio.run(_enrich_memory_source_async(user_id, source_id))
ms = (time.perf_counter() - t0) * 1000
logger.info(
"event=memory_enrichment_done user_id={} source_id={} duration_ms={:.1f} "
"msg=记忆富化完成",
user_id,
source_id,
ms,
)
merge_fanout_item(
memoir_correlation_id,
list_name="memory_enrichment",
id_field="source_id",
item_id=source_id,
task_id=tid,
status="success",
)
return {"status": "success", "source_id": source_id}
except Exception as e:
ms = (time.perf_counter() - t0) * 1000
logger.warning(