Memory compaction(近重复 chunk 软排除) - 新增 compaction 调度:Redis debounce、scheduler gate、增量游标;任务结束时 finalize,避免 gate 长期占用并处理运行期新 trigger。 - Celery memory_compaction_run:debounce 未到点则 retry;用户级 Redis 锁;成功路径更新游标并 finalize;异常时释放 scheduler gate 并 self.retry,避免静默卡死调度与瞬时失败不重试。 - compaction_service:多层判定 + canonical 打分;无 embedding 时停止前移游标(awaiting_embeddings);curation details 补全 trigger 等上下文。 - ingest_transcript_sync:同步路径尽力写入 embedding,与异步 ingest 行为对齐,避免 compaction 永远扫不到无向量 chunk。 - repo:新增 update_chunk_embedding_sync。 测试 - 扩展 test_memory_compaction:调度合并、finalize、ingest embedding、无向量游标、异常路径 gate+retry 等回归用
8.7 KiB
name, overview, todos, isProject
| name | overview | todos | isProject |
|---|---|---|---|
| false |
Memory 整理(Compaction)管线计划(修订版)
目标与约束
- 事件触发 + 防抖、增量范围(非每次全库)、软操作优先(合并/标记 exclude,硬删极少且高置信)、全量大扫除低频。
- 不修改
User档案字段(出生年、职业等);仅操作 memory 相关表。
必须满足(MVP 硬约束)
| 约束 | 说明 |
|---|---|
| 幂等 | 同一批次重复执行,结果不应放大(重复 exclude、重复 curation_action 应被跳过或 no-op)。 |
| 用户级互斥 | 同一用户同一时刻只允许一个 compaction 在跑(见下「用户级短锁」)。 |
| 单次上限 | 扫描 chunk 数、写入 exclude 数、候选对数均有限流与上限。 |
| 只做软操作 | set_chunk_excluded + MemoryCurationAction;不物理删 memory_sources/memory_chunks(MVP)。 |
| 可追踪 | 每次 exclude 记录原因、对比对象 chunk id、触发上下文(见 context)。 |
1) 调度层:防抖主路径(收紧)
不再把「revoke 旧 Celery task + 重新 apply_async」作为主路径,原因:Celery revoke 对 已 prefetch / 已执行 的任务 强保证不足;多 worker、网络抖动下 task_id 链路易碎。
主路径(推荐)
- Redis 键例如:
debounce_until:{user_id}或pending:{user_id}(值可为 Unix 秒或 ISO 时间),表示「最早允许执行 compaction 的时间」。 - 每次触发(memoir / recompose / manual / beat)只做:把 debounce 窗口往后推(或
SET+ TTL),尽量少发 Celery 任务。 - 任务
memory_compaction_run设计为 幂等;worker 执行开头校验:- 当前时间是否 仍落在 debounce 窗口内(若设计为「到点再跑」,则相反:未到点则短
retry或return); - 是否已取得用户级锁(见 §2);
- 可选:是否有 更新的 trigger 覆盖了本次 run(例如比较
pending_version/last_trigger_seq)。
- 当前时间是否 仍落在 debounce 窗口内(若设计为「到点再跑」,则相反:未到点则短
增强项(可选)
- revoke + task_id 可作为 补充,不作为依赖正确性的主路径。
分层原则
- 调度层:尽量少发任务。
- 执行层:即使多发、retry、重叠调度,也 安全(幂等 + 锁 + 上限)。
2) 用户级短锁(MVP 标配,非可选)
在 memory_compaction_run(user_id) 开头:
- 键:
lock:memory_compaction:{user_id} - TTL:5–15 分钟(可配置,略大于单次任务 P99)。
- 获取失败:直接 skip,结构化日志
lock_contention/skipped_reason=lock_not_acquired。
覆盖场景:process_memoir_segments 与 recompose_chapters_for_story 双触发、worker 重试、beat/手动全量与增量重叠。
3) 增量范围(明确规则:以 chunk 为索引)
MVP 定义:增量对象 = 新进入对比池的 chunks,不以 memory_sources.created_at 单独作为主粒度。
原因
- 一个 source 下多 chunk,source 时间粒度太粗;
- source 更新不一定等于 chunk 语义变化。
推荐规则
- 维护 per-user cursor:
last_compaction_cursor(Redis 或 DB;MVP 可用 Rediscompaction:chunk_cursor:{user_id}存max(processed_chunk_updated_at)或单调序号)。 - 取
**memory_chunks.updated_at(或created_at)> cursor** 的 chunk 作为 增量 chunk 集合(分页 + 上限)。 - 对比范围:增量 chunk 仅与 同 user、未 excluded 的 历史 chunk 做近重复检测(可再限:同 source 优先、或时间窗内)。
可选收窄:若 context 带 candidate_chunk_ids / candidate_source_ids,与上述集合 求交 或 并集取优(实现时二选一并写死文档)。
4) 近重复判定:多层保护(不只全局 embedding 阈值)
配置项 memory_compaction_chunk_similarity_threshold 保留,但 自动 exclude 条件 至少需 多层组合(建议 三项中满足 ≥2 项 才自动 exclude,可配置):
| 层 | 说明 |
|---|---|
| Embedding | 同 user_id、余弦相似度 ≥ 阈值 |
| 文本规则 | 归一化(空白/标点)后高 overlap、containment、或长度比例接近 |
| 元数据邻近 | 时间相近;或 source / story / chapter 关联接近(若有) |
必须:embedding 高相似 不等于 重复(主题/情绪相近的不同场景)。
5) 保留哪条 chunk:canonical 选择策略(非「一律排除较晚」)
不采用简单「排除较晚一条」。
MVP 打分函数(示例因子,可加权)
referenced_count/ 被引用次数(若有)metadata完整度- 文本长度(信息更全)
- canonical 来源加成:来自 story/chapter 流水线、或
source_type/ 标记为 canonical 的来源
结果:保留 分高者,exclude 分低者;同分再 tie-break(如更早的 ingest、或更长的文本)。
6) Context:触发批次信息(收紧)
除 story_ids / chapter_ids / story_dispatch_ids / chapters_to_enqueue 外,显式传入:
| 字段 | 说明 |
|---|---|
trigger_source |
memoir_segments / chapter_recompose / manual / beat |
trigger_time |
ISO8601 或 monotonic |
candidate_chunk_ids / candidate_source_ids |
直接候选(可选) |
pipeline_run_id / request_id |
与 memoir task / HTTP 对齐,便于追溯 |
写入日志与(可选)MemoryCurationAction.details。
7) 可观测性:指标 + 日志
日志(保留):结构化,user_id、耗时、跳过原因、exclude 对。
指标(建议 Prometheus 或现有 metrics 体系)
memory_compaction_runs_totalmemory_compaction_skipped_total(含 lock、debounce、空增量)memory_compaction_chunks_scanned_totalmemory_compaction_chunks_excluded_totalmemory_compaction_candidates_totalmemory_compaction_duration_msmemory_compaction_lock_contention_total
派生关注
- exclude rate;
- duplicate detection 人工抽检 precision(流程外记录,非自动指标)。
原因:compaction 最常见问题是 悄悄做错,而非 crash。
架构示意(修订)
flowchart LR
subgraph triggers [Triggers]
MS[memoir_segments]
RC[chapter_recompose]
M[manual]
B[beat]
end
subgraph redis [Redis]
DEB[debounce_until_or_pending]
LOCK[lock memory_compaction user]
CUR[last_compaction_cursor]
end
subgraph worker [Worker]
T[memory_compaction_run idempotent]
end
triggers --> DEB
DEB --> T
T --> LOCK
T --> CUR
代码锚点(不变)
- 调度挂钩:
[process_memoir_segments](api/app/tasks/memoir_tasks.py)成功路径;可选[recompose_chapters_for_story](api/app/tasks/chapter_compose_tasks.py)(双触发由锁 + 幂等吸收)。 - 业务逻辑:新
features/memory/compaction_service.py;任务:tasks/memory_compaction_tasks.py;配置:core/config.py。
阶段切分(不变,与修订对齐)
- MVP:Redis 防抖主路径 + 用户锁 + chunk 增量 + 多层近重复 + 打分保留 + context + 日志 + 指标。
- 阶段二:pending 建议表 / LLM 仲裁。
- 阶段三:低频全量 + 分片。
Implementation todos(修订)
- Config:
memory_compaction_*(含 debounce 秒、锁 TTL、chunk 上限、相似度与「至少 K 层满足」)。 - Redis:
debounce_until/pending主路径;lock:memory_compaction:{user_id};last_compaction_cursor(或等价)。 - Service:幂等 compaction;chunk 索引增量;多层判定 + 打分 exclude;curation_action details 含对比 id 与 context。
- Celery:
memory_compaction_run入口锁与 debounce 校验;注册 task。 - Hooks:
memoir_tasks/ 可选chapter_compose_tasks传完整 context。 - Metrics + logs +
.env.example文档。