2026-03-30 10:46:35 +08:00
|
|
|
|
"""Celery:memory compaction(近重复 chunk 软排除)。"""
|
|
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
2026-04-30 14:11:46 +08:00
|
|
|
|
import asyncio
|
2026-03-30 10:46:35 +08:00
|
|
|
|
import time
|
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
from typing import Any
|
|
|
|
|
|
|
|
|
|
|
|
from celery import shared_task
|
|
|
|
|
|
|
feat: OpenTelemetry LGTM observability, dev tooling, and memoir UX fixes (#31) (#32)
* add staging ios app build script
* feat(api): add OpenTelemetry LGTM stack for local observability
Wire OTel traces, metrics, and logs through a collector to Tempo,
Prometheus, and Loki, with custom LLM instrumentation, dev compose overlay,
Grafana provisioning, env templates, and development.sh auto-start.
* feat: expand observability, harden dev tooling, and fix expo staging UX
Add business and LLM Prometheus metrics with Grafana dashboards, alerting,
and a metrics verification script. Wire telemetry through adapters and core
LLM paths, and document the local LGTM workflow.
Fix development.sh for macOS bash 3.2, open Grafana and eval-web in Chrome,
and repair eval-web auto-open (unbound EVAL_WEB_BROWSER_SCHEDULED). Merge
internal-eval into the main dev script with improved compose handling.
Require EXPO_PUBLIC_* at build time, improve iOS HTTP ATS for staging IPs,
show memoir empty state instead of load errors when no chapters exist, and
add jest env setup plus chapter list response normalization.
* chore: enable Grafana Assistant Cursor plugin
* fix: memoir empty state and repair withdrawn 0020_chapters_book_id stamp
Show empty memoir UI when the chapter list succeeds with no items; treat auth/404 as non-fatal. Extend alembic revision repair so local dev DBs stamped with the removed 0020_chapters_book_id migration can roll back and upgrade to 0019.
---------
Co-authored-by: Kevin <kevin@brighteng.org>
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-20 15:14:13 +08:00
|
|
|
|
from app.core.business_telemetry import business_span
|
2026-05-22 13:44:50 +08:00
|
|
|
|
from app.core.db import AsyncSessionLocal, transactional
|
2026-03-30 10:46:35 +08:00
|
|
|
|
from app.core.logging import get_logger
|
|
|
|
|
|
from app.core.memory_compaction_schedule import (
|
|
|
|
|
|
finalize_memory_compaction_run,
|
|
|
|
|
|
read_debounce_deadline_ts,
|
|
|
|
|
|
release_scheduler_gate,
|
2026-04-03 11:43:16 +08:00
|
|
|
|
schedule_memory_compaction_run,
|
2026-03-30 10:46:35 +08:00
|
|
|
|
set_incremental_cursor_pair,
|
|
|
|
|
|
)
|
|
|
|
|
|
from app.core.redis_lock import acquire_redis_lock, release_redis_lock
|
2026-04-30 14:11:46 +08:00
|
|
|
|
from app.features.memory.repo import list_users_with_recent_chunks
|
|
|
|
|
|
from app.features.memory.service import MemoryService
|
2026-05-22 13:44:50 +08:00
|
|
|
|
from app.features.memory.constants import memory
|
2026-03-30 10:46:35 +08:00
|
|
|
|
|
|
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
2026-04-30 14:11:46 +08:00
|
|
|
|
async def _list_users_with_recent_chunks_async(hours: int) -> list[str]:
|
|
|
|
|
|
async with AsyncSessionLocal() as db:
|
|
|
|
|
|
return await list_users_with_recent_chunks(db, hours=hours)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _run_memory_compaction_async(
|
|
|
|
|
|
user_id: str,
|
|
|
|
|
|
context: dict[str, Any] | None,
|
|
|
|
|
|
) -> dict[str, Any]:
|
|
|
|
|
|
async with AsyncSessionLocal() as db:
|
2026-05-22 13:44:50 +08:00
|
|
|
|
async with transactional(db):
|
|
|
|
|
|
service = MemoryService(db)
|
|
|
|
|
|
return await service.compact_user(user_id, context)
|
2026-04-30 14:11:46 +08:00
|
|
|
|
|
|
|
|
|
|
|
2026-05-22 13:44:50 +08:00
|
|
|
|
@shared_task(bind=True, ignore_result=True)
|
|
|
|
|
|
def memory_compaction_sweep(self) -> dict[str, Any]:
|
2026-04-03 11:43:16 +08:00
|
|
|
|
"""Beat:为近期有记忆写入的用户调度 compaction(debounce 仍由 schedule 合并)。"""
|
feat(eval): memoir A/B chapter judging and eval-web parity with dialogue
- Judge baseline excerpt and library chapter separately; build_memoir_compare_summary for gate, nine-dim and leaf deltas.
- Memoir SSE chapter payload: baseline_judge, compare_summary, baseline_judge_error.
- MemoirJudgeOutput: loose score coercion and post-validate clamp; memoir judge prompt caps from settings.
- app-eval-web: two-column MemoirScoreCard layout, MemoirCompareSummary, chapter blocks and CSS.
- Add memoir_compare_summary, log_events, celery_log_context, memoir_pipeline_progress; tests and migration 0014.
- Misc: memory/evidence and enrichment paths, task/orchestrator updates, internal-eval docs, env examples.
2026-04-10 10:23:43 +08:00
|
|
|
|
t0 = time.perf_counter()
|
2026-05-22 13:44:50 +08:00
|
|
|
|
if not memory.compaction_enabled:
|
2026-04-03 11:43:16 +08:00
|
|
|
|
return {"skipped": True, "reason": "disabled"}
|
2026-05-22 13:44:50 +08:00
|
|
|
|
hours = int(memory.compaction_sweep_recent_hours)
|
feat: OpenTelemetry LGTM observability, dev tooling, and memoir UX fixes (#31) (#32)
* add staging ios app build script
* feat(api): add OpenTelemetry LGTM stack for local observability
Wire OTel traces, metrics, and logs through a collector to Tempo,
Prometheus, and Loki, with custom LLM instrumentation, dev compose overlay,
Grafana provisioning, env templates, and development.sh auto-start.
* feat: expand observability, harden dev tooling, and fix expo staging UX
Add business and LLM Prometheus metrics with Grafana dashboards, alerting,
and a metrics verification script. Wire telemetry through adapters and core
LLM paths, and document the local LGTM workflow.
Fix development.sh for macOS bash 3.2, open Grafana and eval-web in Chrome,
and repair eval-web auto-open (unbound EVAL_WEB_BROWSER_SCHEDULED). Merge
internal-eval into the main dev script with improved compose handling.
Require EXPO_PUBLIC_* at build time, improve iOS HTTP ATS for staging IPs,
show memoir empty state instead of load errors when no chapters exist, and
add jest env setup plus chapter list response normalization.
* chore: enable Grafana Assistant Cursor plugin
* fix: memoir empty state and repair withdrawn 0020_chapters_book_id stamp
Show empty memoir UI when the chapter list succeeds with no items; treat auth/404 as non-fatal. Extend alembic revision repair so local dev DBs stamped with the removed 0020_chapters_book_id migration can roll back and upgrade to 0019.
---------
Co-authored-by: Kevin <kevin@brighteng.org>
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-20 15:14:13 +08:00
|
|
|
|
with business_span("memory.compaction.sweep", hours=hours):
|
|
|
|
|
|
user_ids = asyncio.run(_list_users_with_recent_chunks_async(hours))
|
2026-04-03 11:43:16 +08:00
|
|
|
|
ctx_base: dict[str, Any] = {"trigger_source": "beat", "sweep_hours": hours}
|
2026-05-22 13:44:50 +08:00
|
|
|
|
scheduled = 0
|
|
|
|
|
|
failed = 0
|
2026-04-03 11:43:16 +08:00
|
|
|
|
for uid in user_ids:
|
2026-05-22 13:44:50 +08:00
|
|
|
|
try:
|
|
|
|
|
|
schedule_memory_compaction_run(uid, dict(ctx_base))
|
|
|
|
|
|
scheduled += 1
|
|
|
|
|
|
except Exception as exc:
|
|
|
|
|
|
failed += 1
|
|
|
|
|
|
logger.warning(
|
|
|
|
|
|
"event=memory_compaction_sweep_schedule_failed user_id={} exc={} "
|
|
|
|
|
|
"msg=单用户 compaction 调度失败,继续扫描",
|
|
|
|
|
|
uid,
|
|
|
|
|
|
exc,
|
|
|
|
|
|
)
|
feat(eval): memoir A/B chapter judging and eval-web parity with dialogue
- Judge baseline excerpt and library chapter separately; build_memoir_compare_summary for gate, nine-dim and leaf deltas.
- Memoir SSE chapter payload: baseline_judge, compare_summary, baseline_judge_error.
- MemoirJudgeOutput: loose score coercion and post-validate clamp; memoir judge prompt caps from settings.
- app-eval-web: two-column MemoirScoreCard layout, MemoirCompareSummary, chapter blocks and CSS.
- Add memoir_compare_summary, log_events, celery_log_context, memoir_pipeline_progress; tests and migration 0014.
- Misc: memory/evidence and enrichment paths, task/orchestrator updates, internal-eval docs, env examples.
2026-04-10 10:23:43 +08:00
|
|
|
|
ms = (time.perf_counter() - t0) * 1000
|
2026-04-03 11:43:16 +08:00
|
|
|
|
logger.info(
|
2026-05-22 13:44:50 +08:00
|
|
|
|
"event=memory_compaction_sweep_done hours={} scheduled_users={} failed_users={} "
|
|
|
|
|
|
"duration_ms={:.1f} msg=记忆压缩定时扫描已调度",
|
feat(eval): memoir A/B chapter judging and eval-web parity with dialogue
- Judge baseline excerpt and library chapter separately; build_memoir_compare_summary for gate, nine-dim and leaf deltas.
- Memoir SSE chapter payload: baseline_judge, compare_summary, baseline_judge_error.
- MemoirJudgeOutput: loose score coercion and post-validate clamp; memoir judge prompt caps from settings.
- app-eval-web: two-column MemoirScoreCard layout, MemoirCompareSummary, chapter blocks and CSS.
- Add memoir_compare_summary, log_events, celery_log_context, memoir_pipeline_progress; tests and migration 0014.
- Misc: memory/evidence and enrichment paths, task/orchestrator updates, internal-eval docs, env examples.
2026-04-10 10:23:43 +08:00
|
|
|
|
hours,
|
2026-05-22 13:44:50 +08:00
|
|
|
|
scheduled,
|
|
|
|
|
|
failed,
|
feat(eval): memoir A/B chapter judging and eval-web parity with dialogue
- Judge baseline excerpt and library chapter separately; build_memoir_compare_summary for gate, nine-dim and leaf deltas.
- Memoir SSE chapter payload: baseline_judge, compare_summary, baseline_judge_error.
- MemoirJudgeOutput: loose score coercion and post-validate clamp; memoir judge prompt caps from settings.
- app-eval-web: two-column MemoirScoreCard layout, MemoirCompareSummary, chapter blocks and CSS.
- Add memoir_compare_summary, log_events, celery_log_context, memoir_pipeline_progress; tests and migration 0014.
- Misc: memory/evidence and enrichment paths, task/orchestrator updates, internal-eval docs, env examples.
2026-04-10 10:23:43 +08:00
|
|
|
|
ms,
|
2026-04-03 11:43:16 +08:00
|
|
|
|
)
|
2026-05-22 13:44:50 +08:00
|
|
|
|
return {"scheduled": scheduled, "failed": failed, "hours": hours}
|
2026-04-03 11:43:16 +08:00
|
|
|
|
|
|
|
|
|
|
|
2026-05-22 13:44:50 +08:00
|
|
|
|
@shared_task(bind=True, max_retries=12, default_retry_delay=20, ignore_result=True)
|
2026-03-30 10:46:35 +08:00
|
|
|
|
def memory_compaction_run(
|
|
|
|
|
|
self, user_id: str, context: dict[str, Any] | None = None
|
|
|
|
|
|
) -> dict[str, Any]:
|
feat(eval): memoir A/B chapter judging and eval-web parity with dialogue
- Judge baseline excerpt and library chapter separately; build_memoir_compare_summary for gate, nine-dim and leaf deltas.
- Memoir SSE chapter payload: baseline_judge, compare_summary, baseline_judge_error.
- MemoirJudgeOutput: loose score coercion and post-validate clamp; memoir judge prompt caps from settings.
- app-eval-web: two-column MemoirScoreCard layout, MemoirCompareSummary, chapter blocks and CSS.
- Add memoir_compare_summary, log_events, celery_log_context, memoir_pipeline_progress; tests and migration 0014.
- Misc: memory/evidence and enrichment paths, task/orchestrator updates, internal-eval docs, env examples.
2026-04-10 10:23:43 +08:00
|
|
|
|
run_t0 = time.perf_counter()
|
2026-05-22 13:44:50 +08:00
|
|
|
|
if not memory.compaction_enabled:
|
2026-03-30 10:46:35 +08:00
|
|
|
|
return {"skipped": True, "reason": "disabled"}
|
|
|
|
|
|
|
|
|
|
|
|
ctx = dict(context or {})
|
|
|
|
|
|
deadline = read_debounce_deadline_ts(user_id)
|
|
|
|
|
|
now = time.time()
|
|
|
|
|
|
if deadline is not None and now < deadline:
|
|
|
|
|
|
delay = max(1.0, deadline - now)
|
|
|
|
|
|
raise self.retry(countdown=int(delay))
|
|
|
|
|
|
|
|
|
|
|
|
lock = acquire_redis_lock(
|
|
|
|
|
|
f"lock:memory_compaction:{user_id}",
|
2026-05-22 13:44:50 +08:00
|
|
|
|
ttl_seconds=memory.compaction_lock_ttl_seconds,
|
2026-03-30 10:46:35 +08:00
|
|
|
|
)
|
|
|
|
|
|
if lock is None:
|
feat(eval): memoir A/B chapter judging and eval-web parity with dialogue
- Judge baseline excerpt and library chapter separately; build_memoir_compare_summary for gate, nine-dim and leaf deltas.
- Memoir SSE chapter payload: baseline_judge, compare_summary, baseline_judge_error.
- MemoirJudgeOutput: loose score coercion and post-validate clamp; memoir judge prompt caps from settings.
- app-eval-web: two-column MemoirScoreCard layout, MemoirCompareSummary, chapter blocks and CSS.
- Add memoir_compare_summary, log_events, celery_log_context, memoir_pipeline_progress; tests and migration 0014.
- Misc: memory/evidence and enrichment paths, task/orchestrator updates, internal-eval docs, env examples.
2026-04-10 10:23:43 +08:00
|
|
|
|
ms = (time.perf_counter() - run_t0) * 1000
|
2026-03-30 10:46:35 +08:00
|
|
|
|
logger.info(
|
feat(eval): memoir A/B chapter judging and eval-web parity with dialogue
- Judge baseline excerpt and library chapter separately; build_memoir_compare_summary for gate, nine-dim and leaf deltas.
- Memoir SSE chapter payload: baseline_judge, compare_summary, baseline_judge_error.
- MemoirJudgeOutput: loose score coercion and post-validate clamp; memoir judge prompt caps from settings.
- app-eval-web: two-column MemoirScoreCard layout, MemoirCompareSummary, chapter blocks and CSS.
- Add memoir_compare_summary, log_events, celery_log_context, memoir_pipeline_progress; tests and migration 0014.
- Misc: memory/evidence and enrichment paths, task/orchestrator updates, internal-eval docs, env examples.
2026-04-10 10:23:43 +08:00
|
|
|
|
"event=memory_compaction_skipped user_id={} reason=lock_not_acquired "
|
|
|
|
|
|
"duration_ms={:.1f} msg=记忆压缩跳过(未拿到锁)",
|
2026-03-30 10:46:35 +08:00
|
|
|
|
user_id,
|
feat(eval): memoir A/B chapter judging and eval-web parity with dialogue
- Judge baseline excerpt and library chapter separately; build_memoir_compare_summary for gate, nine-dim and leaf deltas.
- Memoir SSE chapter payload: baseline_judge, compare_summary, baseline_judge_error.
- MemoirJudgeOutput: loose score coercion and post-validate clamp; memoir judge prompt caps from settings.
- app-eval-web: two-column MemoirScoreCard layout, MemoirCompareSummary, chapter blocks and CSS.
- Add memoir_compare_summary, log_events, celery_log_context, memoir_pipeline_progress; tests and migration 0014.
- Misc: memory/evidence and enrichment paths, task/orchestrator updates, internal-eval docs, env examples.
2026-04-10 10:23:43 +08:00
|
|
|
|
ms,
|
2026-03-30 10:46:35 +08:00
|
|
|
|
)
|
|
|
|
|
|
out = {"skipped": True, "reason": "lock_not_acquired"}
|
|
|
|
|
|
finalize_memory_compaction_run(
|
|
|
|
|
|
user_id,
|
|
|
|
|
|
observed_deadline_ts=deadline,
|
|
|
|
|
|
context=ctx,
|
|
|
|
|
|
)
|
|
|
|
|
|
return out
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
feat: OpenTelemetry LGTM observability, dev tooling, and memoir UX fixes (#31) (#32)
* add staging ios app build script
* feat(api): add OpenTelemetry LGTM stack for local observability
Wire OTel traces, metrics, and logs through a collector to Tempo,
Prometheus, and Loki, with custom LLM instrumentation, dev compose overlay,
Grafana provisioning, env templates, and development.sh auto-start.
* feat: expand observability, harden dev tooling, and fix expo staging UX
Add business and LLM Prometheus metrics with Grafana dashboards, alerting,
and a metrics verification script. Wire telemetry through adapters and core
LLM paths, and document the local LGTM workflow.
Fix development.sh for macOS bash 3.2, open Grafana and eval-web in Chrome,
and repair eval-web auto-open (unbound EVAL_WEB_BROWSER_SCHEDULED). Merge
internal-eval into the main dev script with improved compose handling.
Require EXPO_PUBLIC_* at build time, improve iOS HTTP ATS for staging IPs,
show memoir empty state instead of load errors when no chapters exist, and
add jest env setup plus chapter list response normalization.
* chore: enable Grafana Assistant Cursor plugin
* fix: memoir empty state and repair withdrawn 0020_chapters_book_id stamp
Show empty memoir UI when the chapter list succeeds with no items; treat auth/404 as non-fatal. Extend alembic revision repair so local dev DBs stamped with the removed 0020_chapters_book_id migration can roll back and upgrade to 0019.
---------
Co-authored-by: Kevin <kevin@brighteng.org>
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-20 15:14:13 +08:00
|
|
|
|
with business_span("memory.compaction.run"):
|
|
|
|
|
|
out = asyncio.run(_run_memory_compaction_async(user_id, ctx))
|
2026-03-30 10:46:35 +08:00
|
|
|
|
|
|
|
|
|
|
if out.get("new_cursor_ts") and out.get("new_cursor_id") is not None:
|
|
|
|
|
|
set_incremental_cursor_pair(
|
|
|
|
|
|
user_id,
|
|
|
|
|
|
datetime.fromisoformat(out["new_cursor_ts"]),
|
|
|
|
|
|
str(out["new_cursor_id"]),
|
|
|
|
|
|
)
|
|
|
|
|
|
finalize_memory_compaction_run(
|
|
|
|
|
|
user_id,
|
|
|
|
|
|
observed_deadline_ts=deadline,
|
|
|
|
|
|
context=ctx,
|
|
|
|
|
|
)
|
feat(eval): memoir A/B chapter judging and eval-web parity with dialogue
- Judge baseline excerpt and library chapter separately; build_memoir_compare_summary for gate, nine-dim and leaf deltas.
- Memoir SSE chapter payload: baseline_judge, compare_summary, baseline_judge_error.
- MemoirJudgeOutput: loose score coercion and post-validate clamp; memoir judge prompt caps from settings.
- app-eval-web: two-column MemoirScoreCard layout, MemoirCompareSummary, chapter blocks and CSS.
- Add memoir_compare_summary, log_events, celery_log_context, memoir_pipeline_progress; tests and migration 0014.
- Misc: memory/evidence and enrichment paths, task/orchestrator updates, internal-eval docs, env examples.
2026-04-10 10:23:43 +08:00
|
|
|
|
ms = (time.perf_counter() - run_t0) * 1000
|
|
|
|
|
|
logger.info(
|
|
|
|
|
|
"event=memory_compaction_done user_id={} duration_ms={:.1f} msg=记忆压缩运行完成",
|
|
|
|
|
|
user_id,
|
|
|
|
|
|
ms,
|
|
|
|
|
|
)
|
2026-03-30 10:46:35 +08:00
|
|
|
|
return out
|
|
|
|
|
|
except Exception as exc:
|
feat(eval): memoir A/B chapter judging and eval-web parity with dialogue
- Judge baseline excerpt and library chapter separately; build_memoir_compare_summary for gate, nine-dim and leaf deltas.
- Memoir SSE chapter payload: baseline_judge, compare_summary, baseline_judge_error.
- MemoirJudgeOutput: loose score coercion and post-validate clamp; memoir judge prompt caps from settings.
- app-eval-web: two-column MemoirScoreCard layout, MemoirCompareSummary, chapter blocks and CSS.
- Add memoir_compare_summary, log_events, celery_log_context, memoir_pipeline_progress; tests and migration 0014.
- Misc: memory/evidence and enrichment paths, task/orchestrator updates, internal-eval docs, env examples.
2026-04-10 10:23:43 +08:00
|
|
|
|
ms = (time.perf_counter() - run_t0) * 1000
|
|
|
|
|
|
logger.warning(
|
|
|
|
|
|
"event=memory_compaction_failed user_id={} duration_ms={:.1f} err={} "
|
|
|
|
|
|
"msg=记忆压缩运行失败",
|
|
|
|
|
|
user_id,
|
|
|
|
|
|
ms,
|
|
|
|
|
|
exc,
|
|
|
|
|
|
)
|
2026-03-30 10:46:35 +08:00
|
|
|
|
release_scheduler_gate(user_id)
|
feat(eval): memoir A/B chapter judging and eval-web parity with dialogue
- Judge baseline excerpt and library chapter separately; build_memoir_compare_summary for gate, nine-dim and leaf deltas.
- Memoir SSE chapter payload: baseline_judge, compare_summary, baseline_judge_error.
- MemoirJudgeOutput: loose score coercion and post-validate clamp; memoir judge prompt caps from settings.
- app-eval-web: two-column MemoirScoreCard layout, MemoirCompareSummary, chapter blocks and CSS.
- Add memoir_compare_summary, log_events, celery_log_context, memoir_pipeline_progress; tests and migration 0014.
- Misc: memory/evidence and enrichment paths, task/orchestrator updates, internal-eval docs, env examples.
2026-04-10 10:23:43 +08:00
|
|
|
|
raise self.retry(exc=exc) from exc
|
2026-03-30 10:46:35 +08:00
|
|
|
|
finally:
|
|
|
|
|
|
release_redis_lock(lock)
|