From 41518bda11ef924d2936af25b9f4bd6badfed24c Mon Sep 17 00:00:00 2001 From: Kevin Date: Fri, 3 Apr 2026 11:43:16 +0800 Subject: [PATCH] =?UTF-8?q?=E8=81=8A=E5=A4=A9=E5=92=8C=E5=9B=9E=E5=BF=86?= =?UTF-8?q?=E5=BD=95=E8=AF=81=E6=8D=AE=E6=A3=80=E7=B4=A2=E9=83=BD=E8=B5=B0?= =?UTF-8?q?=20pgvector=EF=BC=8C=E5=8E=BB=E6=8E=89=20Postgres=20FTS/content?= =?UTF-8?q?=5Ftsv=EF=BC=8C=E6=96=B0=E8=BF=81=E7=A7=BB=E5=88=A0=E6=8E=89=20?= =?UTF-8?q?content=5Ftsv=20=E5=88=97=EF=BC=88=E9=83=A8=E7=BD=B2=E8=A6=81?= =?UTF-8?q?=E5=85=88=20alembic=20upgrade=EF=BC=89=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Embedding 端口增加 is_available(),聊天和回忆录日志用统一方式表示向量是否真能调用。 记忆整理(compaction)支持 Beat 定期扫用户; 事实抽取提示与 subject 归一化,减少同一人多种称呼; --- api/.env.example | 4 +- api/.env.production | 4 +- api/.env.staging | 16 +++ api/README.md | 3 +- .../versions/0007_drop_chunk_content_tsv.py | 28 ++++ api/app/adapters/embedding/zhipu.py | 22 +++ api/app/agents/chat/orchestrator.py | 17 ++- api/app/core/config.py | 10 +- api/app/features/memoir/service.py | 2 +- .../features/memoir/story_pipeline_sync.py | 21 ++- api/app/features/memory/enrichment.py | 31 +++- .../features/memory/enrichment_pipeline.py | 30 +++- api/app/features/memory/evidence.py | 64 +++++++-- api/app/features/memory/extractor.py | 66 ++++++--- api/app/features/memory/models.py | 3 - api/app/features/memory/repo.py | 133 ++++++++---------- api/app/features/memory/retriever.py | 60 +++----- api/app/features/memory/service.py | 21 +-- api/app/ports/embedding.py | 12 ++ api/app/tasks/celery_app.py | 10 +- api/app/tasks/memory_compaction_tasks.py | 19 +++ api/docker-compose.yml | 45 +++--- api/docs/memoir_reliability.md | 3 +- api/docs/memory-retrieval.md | 19 ++- api/tests/test_memory_compaction.py | 67 ++++++++- api/tests/test_memory_evidence.py | 55 ++++++++ 26 files changed, 543 insertions(+), 222 deletions(-) create mode 100644 api/alembic/versions/0007_drop_chunk_content_tsv.py diff --git a/api/.env.example b/api/.env.example index 68029ec..a524a2c 100644 --- a/api/.env.example +++ b/api/.env.example @@ -116,8 +116,9 @@ REDIS_SESSION_TTL=86400 # ============================================================================= # Memory compaction(近重复 memory chunk 软排除;Celery + Redis 防抖) +# 模板统一默认开启;须同时运行 celery worker 与 celery-beat(docker-compose 已含 beat,负责 memory_compaction_sweep)。 # ============================================================================= -# MEMORY_COMPACTION_ENABLED=false +MEMORY_COMPACTION_ENABLED=true # MEMORY_COMPACTION_DEBOUNCE_SECONDS=105 # MEMORY_COMPACTION_LOCK_TTL_SECONDS=600 # MEMORY_COMPACTION_CHUNK_SIMILARITY_THRESHOLD=0.92 @@ -127,6 +128,7 @@ REDIS_SESSION_TTL=86400 # MEMORY_COMPACTION_MAX_NEIGHBORS_PER_CHUNK=25 # MEMORY_COMPACTION_TEXT_JACCARD_MIN=0.55 # MEMORY_COMPACTION_METADATA_EVENT_YEAR_WINDOW=1 +# MEMORY_COMPACTION_SWEEP_RECENT_HOURS=24 # ============================================================================= # Story 流水线(post-commit、章节物化、append 上限、evidence 检索) diff --git a/api/.env.production b/api/.env.production index 26b31fe..68a76af 100644 --- a/api/.env.production +++ b/api/.env.production @@ -100,8 +100,9 @@ REDIS_SESSION_TTL=86400 # ============================================================================= # Memory compaction(近重复 memory chunk 软排除;Celery + Redis 防抖) +# 与 .env.example / .env.development 一致默认开启;需 running:celery worker + celery-beat(见 docker-compose.yml)。 # ============================================================================= -# MEMORY_COMPACTION_ENABLED=false +MEMORY_COMPACTION_ENABLED=true # MEMORY_COMPACTION_DEBOUNCE_SECONDS=105 # MEMORY_COMPACTION_LOCK_TTL_SECONDS=600 # MEMORY_COMPACTION_CHUNK_SIMILARITY_THRESHOLD=0.92 @@ -111,6 +112,7 @@ REDIS_SESSION_TTL=86400 # MEMORY_COMPACTION_MAX_NEIGHBORS_PER_CHUNK=25 # MEMORY_COMPACTION_TEXT_JACCARD_MIN=0.55 # MEMORY_COMPACTION_METADATA_EVENT_YEAR_WINDOW=1 +# MEMORY_COMPACTION_SWEEP_RECENT_HOURS=24 # ============================================================================= # Story 流水线(post-commit、章节物化、append 上限、evidence 检索) diff --git a/api/.env.staging b/api/.env.staging index 8491fac..813b403 100644 --- a/api/.env.staging +++ b/api/.env.staging @@ -36,6 +36,22 @@ DATABASE_URL=postgresql://postgres:postgres@postgres:5432/life_echo REDIS_URL=redis://redis:6379/0 REDIS_SESSION_TTL=86400 +# ============================================================================= +# Memory compaction(近重复 memory chunk 软排除;Celery + Redis 防抖) +# 与 example / development / production 一致默认开启;预发须跑 worker + celery-beat。 +# ============================================================================= +MEMORY_COMPACTION_ENABLED=true +# MEMORY_COMPACTION_DEBOUNCE_SECONDS=105 +# MEMORY_COMPACTION_LOCK_TTL_SECONDS=600 +# MEMORY_COMPACTION_CHUNK_SIMILARITY_THRESHOLD=0.92 +# MEMORY_COMPACTION_MIN_LAYERS_FOR_EXCLUDE=2 +# MEMORY_COMPACTION_MAX_CHUNKS_PER_RUN=200 +# MEMORY_COMPACTION_MAX_EXCLUDES_PER_RUN=50 +# MEMORY_COMPACTION_MAX_NEIGHBORS_PER_CHUNK=25 +# MEMORY_COMPACTION_TEXT_JACCARD_MIN=0.55 +# MEMORY_COMPACTION_METADATA_EVENT_YEAR_WINDOW=1 +# MEMORY_COMPACTION_SWEEP_RECENT_HOURS=24 + # ============================================================================= # Auth # ============================================================================= diff --git a/api/README.md b/api/README.md index 4fea135..fd01a08 100644 --- a/api/README.md +++ b/api/README.md @@ -15,7 +15,8 @@ Life Echo API 是一个智能对话系统,通过 WebSocket 实时连接,使 ### LLM 与记忆(约定文档) - **JSON 模式**:结构化抽取/路由/叙事 JSON 使用 `app/core/langchain_llm.py` 的 `bind_json_object_mode`(与 [DeepSeek JSON Output](https://api-docs.deepseek.com/guides/json_mode) 一致);详见 [`docs/llm-json-mode.md`](docs/llm-json-mode.md)。适配器说明见 [`app/adapters/llm/deepseek.py`](app/adapters/llm/deepseek.py)。 -- **记忆检索**:异步 `HybridRetriever`(FTS+向量)与 Celery `retrieve_evidence_sync`(FTS)差异见 [`docs/memory-retrieval.md`](docs/memory-retrieval.md)。 +- **记忆检索**:异步与 Celery 均使用 **向量(pgvector)** chunks,见 [`docs/memory-retrieval.md`](docs/memory-retrieval.md)。 +- **Memory compaction**:`.env.example` / [`.env.development`](.env.development) / [`.env.staging`](.env.staging) / [`.env.production`](.env.production) 均默认 `MEMORY_COMPACTION_ENABLED=true`。须运行 **Celery worker** 与 **celery-beat**([`docker-compose.yml`](docker-compose.yml) 已包含 `celery-beat`,用于定期 `memory_compaction_sweep`)。 ## 技术栈 diff --git a/api/alembic/versions/0007_drop_chunk_content_tsv.py b/api/alembic/versions/0007_drop_chunk_content_tsv.py new file mode 100644 index 0000000..f230546 --- /dev/null +++ b/api/alembic/versions/0007_drop_chunk_content_tsv.py @@ -0,0 +1,28 @@ +"""移除 memory_chunks.content_tsv(FTS 列已停用,检索统一向量)。 + +Revision ID: 0007_drop_chunk_content_tsv +Revises: 0006_segment_memoir_phases +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +from alembic import op + +revision: str = "0007_drop_chunk_content_tsv" +down_revision: Union[str, None] = "0006_segment_memoir_phases" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.drop_column("memory_chunks", "content_tsv") + + +def downgrade() -> None: + op.add_column( + "memory_chunks", + sa.Column("content_tsv", postgresql.TSVECTOR(), nullable=True), + ) diff --git a/api/app/adapters/embedding/zhipu.py b/api/app/adapters/embedding/zhipu.py index de055cd..cb018ad 100644 --- a/api/app/adapters/embedding/zhipu.py +++ b/api/app/adapters/embedding/zhipu.py @@ -7,6 +7,9 @@ import asyncio from zai import ZhipuAiClient from app.core.embedding import MEMORY_EMBEDDING_DIMENSION +from app.core.logging import get_logger + +_logger = get_logger(__name__) # 单次请求最多 64 条文本(智谱 Embedding-3 文档) _EMBED_BATCH_SIZE = 64 @@ -22,6 +25,9 @@ class ZhipuEmbeddingProvider: ) -> None: self._model = model if not api_key: + _logger.warning( + "ZhipuEmbeddingProvider: api_key 为空,embedding 将不可用(记忆检索与 ingest 向量写入会降级)" + ) self._client = None elif base_url: self._client = ZhipuAiClient( @@ -31,6 +37,9 @@ class ZhipuEmbeddingProvider: else: self._client = ZhipuAiClient(api_key=api_key) + def is_available(self) -> bool: + return self._client is not None + def _create_vectors_sync(self, texts: list[str]) -> list[list[float]]: assert self._client is not None resp = self._client.embeddings.create( @@ -54,3 +63,16 @@ class ZhipuEmbeddingProvider: part = await asyncio.to_thread(self._create_vectors_sync, batch) out.extend(part) return out + + def embed_text_sync(self, text: str) -> list[float]: + vecs = self.embed_texts_sync([text]) + return vecs[0] if vecs else [] + + def embed_texts_sync(self, texts: list[str]) -> list[list[float]]: + if not self._client or not texts: + return [] + out: list[list[float]] = [] + for i in range(0, len(texts), _EMBED_BATCH_SIZE): + batch = texts[i : i + _EMBED_BATCH_SIZE] + out.extend(self._create_vectors_sync(batch)) + return out diff --git a/api/app/agents/chat/orchestrator.py b/api/app/agents/chat/orchestrator.py index 694b7f1..9d0e8ff 100644 --- a/api/app/agents/chat/orchestrator.py +++ b/api/app/agents/chat/orchestrator.py @@ -68,9 +68,22 @@ async def _fetch_interview_memory_evidence( ): return "" try: - ms = MemoryService(db, embedding_provider=get_embedding_provider()) + emb = get_embedding_provider() + ms = MemoryService(db, embedding_provider=emb) bundle = await ms.retrieve(user_id, msg, top_k=settings.chat_memory_top_k) - text = format_evidence_chunks_for_prompt(bundle.model_dump()) + bd = bundle.model_dump() + vector_ok = emb.is_available() + logger.info( + "memory_evidence_retrieved user_id={} chunks={} facts={} summaries={} timeline={} stories={} vector_ok={}", + user_id, + len(bd.get("relevant_chunks") or []), + len(bd.get("relevant_facts") or []), + len(bd.get("relevant_summaries") or []), + len(bd.get("timeline_hints") or []), + len(bd.get("relevant_stories") or []), + vector_ok, + ) + text = format_evidence_chunks_for_prompt(bd) t = (text or "").strip() if not t: return "" diff --git a/api/app/core/config.py b/api/app/core/config.py index 71dddc5..8ee8417 100644 --- a/api/app/core/config.py +++ b/api/app/core/config.py @@ -267,16 +267,16 @@ class Settings(BaseSettings): memoir_phase2_singleflight_immediate: bool = True # ── Memory 检索与富化 ───────────────────────────────────── - # True:query 为空时仍返回 rolling 摘要 + 最近事实/时间线(无 chunk FTS) + # True:query 为空时仍返回 rolling 摘要 + 最近事实/时间线(无 chunk 向量检索) memory_evidence_empty_query_include_rolling: bool = False # False:跳过 ingest 后 LLM 富化(摘要/事实/时间线) memory_enrichment_enabled: bool = True memory_enrichment_max_chars: int = Field(default=12000, ge=1000, le=100_000) - # True:事实 FTS 未命中时退回「最近 confirmed 事实」(易引入无关/矛盾事实;默认关) + # True:事实 ILIKE 未命中时退回「最近 confirmed 事实」(易引入无关/矛盾事实;默认关) memory_fact_search_use_recent_fallback: bool = False - # ── Memory compaction(近重复 chunk 软排除;事件触发 + Redis 防抖 + 用户锁)── - memory_compaction_enabled: bool = False + # ── Memory compaction(近重复 chunk 软排除;事件触发 + Redis 防抖 + 用户锁;需 worker + Beat 跑 sweep)── + memory_compaction_enabled: bool = True memory_compaction_debounce_seconds: int = Field(default=105, ge=10, le=3600) memory_compaction_lock_ttl_seconds: int = Field(default=600, ge=60, le=7200) memory_compaction_chunk_similarity_threshold: float = Field( @@ -288,6 +288,8 @@ class Settings(BaseSettings): memory_compaction_max_neighbors_per_chunk: int = Field(default=25, ge=5, le=100) memory_compaction_text_jaccard_min: float = Field(default=0.55, ge=0.0, le=1.0) memory_compaction_metadata_event_year_window: int = Field(default=1, ge=0, le=50) + # Beat sweep:扫描最近 N 小时内有新 chunk 的用户并调度 compaction + memory_compaction_sweep_recent_hours: int = Field(default=24, ge=1, le=168) # ── Liblib ─────────────────────────────────────────────── liblib_access_key: str = "" diff --git a/api/app/features/memoir/service.py b/api/app/features/memoir/service.py index e417d2d..581c289 100644 --- a/api/app/features/memoir/service.py +++ b/api/app/features/memoir/service.py @@ -74,7 +74,7 @@ class MemoirService: self._object_storage = object_storage async def get_evidence(self, user_id: str, query: str, *, top_k: int = 10) -> dict: - """通过 MemoryService → HybridRetriever 获取证据(含向量时与 Celery 的 FTS-only 路径不同)。""" + """通过 MemoryService → HybridRetriever 获取证据(向量 chunks,与 Celery 叙事路径一致)。""" if self._memory is None: return { "relevant_chunks": [], diff --git a/api/app/features/memoir/story_pipeline_sync.py b/api/app/features/memoir/story_pipeline_sync.py index 1b42d13..c4e14d6 100644 --- a/api/app/features/memoir/story_pipeline_sync.py +++ b/api/app/features/memoir/story_pipeline_sync.py @@ -33,6 +33,7 @@ from app.agents.memoir.story_route_agent import ( ) from app.agents.state_schema import MemoirStateSchema from app.core.config import settings +from app.core.dependencies import get_embedding_provider from app.core.logging import get_logger from app.features.memoir.cover_eligibility import chapter_needs_cover_enqueue from app.features.memoir.memoir_images.settings import MemoirImageSettings @@ -714,8 +715,16 @@ def run_story_pipeline_for_category_batch( top_k = int(settings.evidence_top_k_default) if n_units > int(settings.evidence_large_batch_threshold): top_k = int(settings.evidence_top_k_large_batch) + emb = get_embedding_provider() + embedding_available = emb.is_available() try: - evidence = retrieve_evidence_sync(session, user_id, combined_text, top_k=top_k) + evidence = retrieve_evidence_sync( + session, + user_id, + combined_text, + top_k=top_k, + embedding_provider=emb, + ) except Exception as e: logger.warning("Evidence 检索跳过: {}", e) evidence = { @@ -726,6 +735,16 @@ def run_story_pipeline_for_category_batch( "relevant_stories": [], } + logger.info( + "memoir_evidence_retrieved user_id={} chunks={} facts={} summaries={} stories={} vector_ok={}", + user_id, + len(evidence.get("relevant_chunks") or []), + len(evidence.get("relevant_facts") or []), + len(evidence.get("relevant_summaries") or []), + len(evidence.get("relevant_stories") or []), + embedding_available, + ) + evidence_text = format_evidence_chunks_for_prompt(evidence) oral_for_memoir = normalize_oral_for_memoir(combined_text, llm=llm) ct_raw = (combined_text or "").strip() diff --git a/api/app/features/memory/enrichment.py b/api/app/features/memory/enrichment.py index a2c4163..61e10ab 100644 --- a/api/app/features/memory/enrichment.py +++ b/api/app/features/memory/enrichment.py @@ -36,7 +36,12 @@ from app.features.memory.summarizer import ( generate_session_summary_async, generate_session_summary_sync, ) -from app.features.memory.enrichment_pipeline import dedupe_key, normalize_object_json +from app.features.memory.enrichment_pipeline import ( + dedupe_key, + normalize_object_json, + normalize_subject, +) +from app.features.user.models import User from app.features.memory.timeline import ( build_timeline_events_from_facts_async, build_timeline_events_from_facts_sync, @@ -69,6 +74,10 @@ def enrich_memory_after_ingest_sync( llm = _resolve_llm_sync() if not llm: return + narrator_name: str | None = None + u_row = session.get(User, user_id) + if u_row and (u_row.nickname or "").strip(): + narrator_name = (u_row.nickname or "").strip() chunks = list_chunks_for_source_sync(session, source_id) if not chunks: return @@ -111,11 +120,13 @@ def enrich_memory_after_ingest_sync( source_chunk_ids=chunk_ids, ) - raw_facts = extract_facts_from_transcript_sync(llm, numbered) + raw_facts = extract_facts_from_transcript_sync( + llm, numbered, narrator_name=narrator_name + ) seen: set[tuple] = set() inserted: list[dict] = [] for f in raw_facts: - key = dedupe_key(f) + key = dedupe_key(f, narrator_name=narrator_name) if key in seen: continue seen.add(key) @@ -126,7 +137,7 @@ def enrich_memory_after_ingest_sync( session, user_id=user_id, fact_type=f.get("fact_type") or "event", - subject=f.get("subject"), + subject=normalize_subject(f.get("subject"), narrator_name), predicate=f.get("predicate"), object_json=normalize_object_json(f.get("object_json")), confidence=float(f.get("confidence") or 0.75), @@ -175,6 +186,10 @@ async def enrich_memory_after_ingest_async( llm = _resolve_llm_sync() if not llm: return + narrator_name: str | None = None + u_row = await db.get(User, user_id) + if u_row and (u_row.nickname or "").strip(): + narrator_name = (u_row.nickname or "").strip() stmt = ( select(MemoryChunk) .where(MemoryChunk.source_id == source_id) @@ -227,11 +242,13 @@ async def enrich_memory_after_ingest_async( source_chunk_ids=chunk_ids, ) - raw_facts = await extract_facts_from_transcript_async(llm, numbered) + raw_facts = await extract_facts_from_transcript_async( + llm, numbered, narrator_name=narrator_name + ) seen: set[tuple] = set() inserted: list[dict] = [] for f in raw_facts: - key = dedupe_key(f) + key = dedupe_key(f, narrator_name=narrator_name) if key in seen: continue seen.add(key) @@ -242,7 +259,7 @@ async def enrich_memory_after_ingest_async( db, user_id=user_id, fact_type=f.get("fact_type") or "event", - subject=f.get("subject"), + subject=normalize_subject(f.get("subject"), narrator_name), predicate=f.get("predicate"), object_json=normalize_object_json(f.get("object_json")), confidence=float(f.get("confidence") or 0.75), diff --git a/api/app/features/memory/enrichment_pipeline.py b/api/app/features/memory/enrichment_pipeline.py index 76a4999..440cb9c 100644 --- a/api/app/features/memory/enrichment_pipeline.py +++ b/api/app/features/memory/enrichment_pipeline.py @@ -5,10 +5,34 @@ from __future__ import annotations import json from typing import Any +# 叙述者常见别名 — 归一化到 narrator_name 或「叙述者」 +_NARRATOR_ALIASES: frozenset[str] = frozenset( + { + "我", + "本人", + "人物", + "叙述者", + "讲述者", + "老人", + "自己", + "咱们", + } +) -def dedupe_key(f: dict) -> tuple: - s = f.get("subject") or "" - p = f.get("predicate") or "" + +def normalize_subject(subject: str | None, narrator_name: str | None = None) -> str: + """将代词/泛称映射为统一 subject,便于去重与检索。""" + s = (subject or "").strip() + if not s: + return narrator_name or "叙述者" + if s in _NARRATOR_ALIASES: + return narrator_name or "叙述者" + return s + + +def dedupe_key(f: dict, *, narrator_name: str | None = None) -> tuple: + s = normalize_subject(f.get("subject"), narrator_name) + p = (f.get("predicate") or "").strip() o = f.get("object_json") try: oj = json.dumps(o, sort_keys=True, ensure_ascii=False) if o is not None else "" diff --git a/api/app/features/memory/evidence.py b/api/app/features/memory/evidence.py index b8c14d4..7a34605 100644 --- a/api/app/features/memory/evidence.py +++ b/api/app/features/memory/evidence.py @@ -4,22 +4,24 @@ 权威层级(可靠性 hardening): - **Chunk 原文**(未 excluded)为首要证据;rolling 摘要/故事摘录为便利视图,不得压过冲突的 chunk。 - **MemoryFact**:`confirmed` 为检索默认集;`candidate` 可被上游提升;`stale` 由 compaction 等标出,检索时应排除。 -- 事实 FTS 无命中时是否退回「最近事实」由 `memory_fact_search_use_recent_fallback` 控制(默认可避免串台)。 +- 事实 ILIKE 无命中时是否退回「最近事实」由 `memory_fact_search_use_recent_fallback` 控制(默认可避免串台)。 -Celery 使用 sync;`HybridRetriever` 使用 async + RRF chunk 合并。 +Celery 使用 sync + 向量 chunks;`HybridRetriever` 使用 async + 向量 chunks。 """ from __future__ import annotations +from typing import TYPE_CHECKING + from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import Session from app.core.config import settings +from app.core.logging import get_logger from app.features.memory.repo import ( list_summaries_for_evidence_async, list_summaries_for_evidence_sync, - search_chunks_fts, - search_chunks_fts_sync, + search_chunks_vector_sync, search_facts_for_user_async, search_facts_for_user_sync, search_timeline_events_for_user_async, @@ -30,6 +32,11 @@ from app.features.story.repo import ( list_recent_stories_for_evidence_sync, ) +if TYPE_CHECKING: + from app.ports.embedding import EmbeddingProvider + +logger = get_logger(__name__) + EMPTY_EVIDENCE_BUNDLE: dict = { "relevant_chunks": [], "relevant_summaries": [], @@ -119,7 +126,7 @@ async def fetch_evidence_metadata_async( def _empty_query_bundle_sync(session: Session, user_id: str, top_k: int) -> dict: - """无 FTS query 时的「浏览」降级:rolling 摘要 + 事实/时间线 fallback。""" + """空 query 时的「浏览」降级:rolling 摘要 + 事实/时间线 fallback。""" from app.features.memory.models import MemorySummary from sqlalchemy import select @@ -204,19 +211,50 @@ async def _empty_query_bundle_async(db: AsyncSession, user_id: str, top_k: int) def retrieve_evidence_bundle_sync( - session: Session, user_id: str, query: str, *, top_k: int = 10 + session: Session, + user_id: str, + query: str, + *, + top_k: int = 10, + embedding_provider: "EmbeddingProvider | None" = None, ) -> dict: - """Celery / 叙事流水线:FTS-only chunks + 元数据。""" + """Celery / 叙事流水线:向量 chunks + 元数据(需 embedding_provider)。""" if not query or not query.strip(): if settings.memory_evidence_empty_query_include_rolling: return _empty_query_bundle_sync(session, user_id, top_k) return dict(EMPTY_EVIDENCE_BUNDLE) q = query.strip() - chunk_rows = search_chunks_fts_sync(session, user_id, q, top_k) - relevant_chunks = [ - {"id": r["id"], "content": r["content"], "chunk_index": r["chunk_index"]} - for r in chunk_rows - ] + relevant_chunks: list[dict] = [] + if embedding_provider is not None: + try: + q_emb = embedding_provider.embed_text_sync(q) + except Exception as exc: + logger.warning( + "retrieve_evidence_bundle_sync embed failed user_id={} err={}", + user_id, + exc, + ) + q_emb = [] + if q_emb: + chunk_rows = search_chunks_vector_sync(session, user_id, q_emb, top_k) + relevant_chunks = [ + { + "id": r["id"], + "content": r["content"], + "chunk_index": r["chunk_index"], + } + for r in chunk_rows + ] + else: + logger.warning( + "retrieve_evidence_bundle_sync empty_query_embedding user_id={}", + user_id, + ) + else: + logger.warning( + "retrieve_evidence_bundle_sync no_embedding_provider user_id={}", + user_id, + ) meta = fetch_evidence_metadata_sync(session, user_id, q, top_k) return { "relevant_chunks": relevant_chunks, @@ -233,7 +271,7 @@ async def retrieve_evidence_bundle_async( merged_chunk_dicts: list[dict], ) -> dict: """ - 异步路径:chunk 已由调用方 RRF 合并;此处只拼元数据。 + 异步路径:chunk 已由调用方(如 HybridRetriever)向量检索填入;此处只拼元数据。 merged_chunk_dicts: [{"id","content","chunk_index"}, ...] """ diff --git a/api/app/features/memory/extractor.py b/api/app/features/memory/extractor.py index f84274e..0f78b91 100644 --- a/api/app/features/memory/extractor.py +++ b/api/app/features/memory/extractor.py @@ -21,18 +21,39 @@ def _max_transcript_chars() -> int: return settings.memory_enrichment_max_chars -def extract_facts_from_transcript_sync(llm: Any, numbered_blocks: str) -> list[dict]: +def _facts_extraction_instructions(narrator_label: str) -> str: + return ( + "你是回忆录事实抽取助手。用户正在口述人生回忆,所有内容默认是**过去发生的事**," + "而非当前或未来计划(除非原文明确说「现在」「打算」「准备将要」等)。\n\n" + "## 抽取规则\n" + "1. subject 必须用明确的人名或固定称谓:\n" + f" - 叙述者本人统一用「{narrator_label}」\n" + " - 其他人用全名或稳定专名(如「王伟」),禁止用「他」「她」「我」「我们大伙」等代词作 subject;" + "若代词在上下文中可唯一解析为某人,则 subject 写该人姓名/专名\n" + "2. 事件、职务变动、地点迁移等一律按**过去回忆**理解;travel/调动/命令类表述勿写成「即将要做」" + "除非原文明确为未来时态\n" + "3. 若可推断大约年代或人生阶段,将 approximate_era 写入 object_json(与 value 等字段并存)," + '例如 "1990年代"、"2001年"、"退休后"、"30岁前后"\n' + "4. fact_type: person|event|relation|place|milestone\n" + "5. predicate:简短中文谓语(如「出生地」「担任职务」「调往」)\n" + "6. object_json:字符串或对象;可含 value、approximate_era 等\n" + "7. confidence 0..1;source_chunk_id 必须等于某段 [chunk_id=...] 中的 id\n\n" + '只输出 JSON:{"facts":[...]},无事实则 {"facts":[]}。\n\n' + ) + + +def extract_facts_from_transcript_sync( + llm: Any, + numbered_blocks: str, + *, + narrator_name: str | None = None, +) -> list[dict]: """同步:带 chunk_id 标记的文本 → 事实列表。""" if not llm or not (numbered_blocks or "").strip(): return [] text = numbered_blocks.strip()[: _max_transcript_chars()] - prompt = ( - "你是回忆录记忆抽取助手。阅读下列带 [chunk_id=...] 的文本块,抽取可核查的事实。\n" - "每个事实含 fact_type: person|event|relation|place|milestone;subject;predicate;" - "object_json(可为字符串或对象);confidence 0..1;source_chunk_id 必须等于某段的 chunk id。\n" - '只输出 JSON:{"facts":[...]},无事实则 {"facts":[]}。\n\n' - f"{text}" - ) + narrator_label = (narrator_name or "").strip() or "叙述者" + prompt = _facts_extraction_instructions(narrator_label) + text try: raw = invoke_json_object( llm, @@ -50,19 +71,17 @@ def extract_facts_from_transcript_sync(llm: Any, numbered_blocks: str) -> list[d async def extract_facts_from_transcript_async( - llm: Any, numbered_blocks: str + llm: Any, + numbered_blocks: str, + *, + narrator_name: str | None = None, ) -> list[dict]: """异步版。""" if not llm or not (numbered_blocks or "").strip(): return [] text = numbered_blocks.strip()[: _max_transcript_chars()] - prompt = ( - "你是回忆录记忆抽取助手。阅读下列带 [chunk_id=...] 的文本块,抽取可核查的事实。\n" - "每个事实含 fact_type: person|event|relation|place|milestone;subject;predicate;" - "object_json;confidence 0..1;source_chunk_id 必须等于某段的 chunk id。\n" - '只输出 JSON:{"facts":[...]},无事实则 {"facts":[]}。\n\n' - f"{text}" - ) + narrator_label = (narrator_name or "").strip() or "叙述者" + prompt = _facts_extraction_instructions(narrator_label) + text try: raw = await ainvoke_json_object( llm, @@ -81,11 +100,24 @@ async def extract_facts_from_transcript_async( async def extract_facts(chunk_text: str, *, user_id: str) -> list[dict]: """兼容旧接口:单块文本(无 chunk id 时传空 source_chunk_id)。""" + from app.core.db import AsyncSessionLocal from app.core.dependencies import get_llm_provider_fast + from app.features.user.models import User llm = get_llm_provider_fast().langchain_llm + narrator_name: str | None = None + try: + async with AsyncSessionLocal() as db: + u = await db.get(User, user_id) + if u and (u.nickname or "").strip(): + narrator_name = (u.nickname or "").strip() + except Exception: + pass + blocks = f"[chunk_id=null]\n{chunk_text}" - facts = await extract_facts_from_transcript_async(llm, blocks) + facts = await extract_facts_from_transcript_async( + llm, blocks, narrator_name=narrator_name + ) for f in facts: if f.get("source_chunk_id") in (None, "null", ""): f["source_chunk_id"] = None diff --git a/api/app/features/memory/models.py b/api/app/features/memory/models.py index 22c877b..9ebcdc1 100644 --- a/api/app/features/memory/models.py +++ b/api/app/features/memory/models.py @@ -10,7 +10,6 @@ from sqlalchemy import ( String, Text, ) -from sqlalchemy.dialects.postgresql import TSVECTOR as TSVector from sqlalchemy.orm import relationship from app.core.db import Base, utc_now @@ -46,8 +45,6 @@ class MemoryChunk(Base): content = Column(Text, nullable=False) # pgvector embedding — Alembic migration 负责 CREATE EXTENSION vector 及列类型 embedding = Column(pgvector_type, nullable=True) - # PostgreSQL FTS — Alembic migration 负责 generated tsvector 列 + GIN index - content_tsv = Column(TSVector, nullable=True) chunk_index = Column(Integer, nullable=False) speaker = Column(String, nullable=True) event_year = Column(Integer, nullable=True) diff --git a/api/app/features/memory/repo.py b/api/app/features/memory/repo.py index 62b3546..adc011b 100644 --- a/api/app/features/memory/repo.py +++ b/api/app/features/memory/repo.py @@ -1,7 +1,8 @@ """Memory repository — MemorySource, MemoryChunk, MemoryFact, TimelineEvent data access.""" import uuid -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone +from typing import TYPE_CHECKING from sqlalchemy import delete, literal, or_, select, text, tuple_, update from sqlalchemy.ext.asyncio import AsyncSession @@ -16,6 +17,9 @@ from app.features.memory.models import ( TimelineEvent, ) +if TYPE_CHECKING: + from app.ports.embedding import EmbeddingProvider + def _new_id() -> str: return str(uuid.uuid4()) @@ -105,16 +109,6 @@ async def create_chunk( return chunk -def update_chunk_fts_sync(session: Session, chunk_id: str) -> None: - """Populate content_tsv for FTS (sync). Caller must commit.""" - session.execute( - text( - "UPDATE memory_chunks SET content_tsv = to_tsvector('simple', content) WHERE id = :id" - ), - {"id": chunk_id}, - ) - - def update_chunk_embedding_sync( session: Session, chunk_id: str, embedding: list[float] ) -> None: @@ -133,39 +127,6 @@ async def update_chunk_embedding( chunk.embedding = embedding -async def update_chunk_fts(db: AsyncSession, chunk_id: str) -> None: - """Populate content_tsv for FTS. Caller must commit.""" - await db.execute( - text( - "UPDATE memory_chunks SET content_tsv = to_tsvector('simple', content) WHERE id = :id" - ), - {"id": chunk_id}, - ) - - -async def search_chunks_fts( - db: AsyncSession, user_id: str, query: str, limit: int = 20 -) -> list[dict]: - """FTS search on memory_chunks. Returns list of {id, content, chunk_index}.""" - if not query or not query.strip(): - return [] - q = query.strip() - stmt = text(""" - SELECT id, content, chunk_index - FROM memory_chunks - WHERE user_id = :user_id AND (is_excluded IS NOT TRUE OR is_excluded = false) - AND content_tsv IS NOT NULL AND content_tsv @@ plainto_tsquery('simple', :q) - ORDER BY ts_rank_cd(content_tsv, plainto_tsquery('simple', :q2)) DESC - LIMIT :lim - """) - result = await db.execute(stmt, {"user_id": user_id, "q": q, "q2": q, "lim": limit}) - rows = result.mappings().all() - return [ - {"id": r["id"], "content": r["content"], "chunk_index": r["chunk_index"]} - for r in rows - ] - - async def get_chunks_by_ids( db: AsyncSession, chunk_ids: list[str] ) -> list[MemoryChunk]: @@ -219,29 +180,6 @@ def get_timeline_events_for_user_sync( return list(session.execute(stmt).unique().scalars().all()) -def search_chunks_fts_sync( - session: Session, user_id: str, query: str, limit: int = 20 -) -> list[dict]: - """FTS on memory_chunks(sync,Celery)。""" - if not query or not query.strip(): - return [] - q = query.strip() - stmt = text(""" - SELECT id, content, chunk_index - FROM memory_chunks - WHERE user_id = :user_id AND (is_excluded IS NOT TRUE OR is_excluded = false) - AND content_tsv IS NOT NULL AND content_tsv @@ plainto_tsquery('simple', :q) - ORDER BY ts_rank_cd(content_tsv, plainto_tsquery('simple', :q2)) DESC - LIMIT :lim - """) - result = session.execute(stmt, {"user_id": user_id, "q": q, "q2": q, "lim": limit}) - rows = result.mappings().all() - return [ - {"id": r["id"], "content": r["content"], "chunk_index": r["chunk_index"]} - for r in rows - ] - - def search_facts_for_user_sync( session: Session, user_id: str, query: str, limit: int = 20 ) -> list[MemoryFact]: @@ -401,6 +339,49 @@ async def search_chunks_vector( ] +def search_chunks_vector_sync( + session: Session, user_id: str, query_embedding: list[float], limit: int = 20 +) -> list[dict]: + """pgvector 余弦距离检索(sync,Celery)。返回 {id, content, chunk_index, distance}。""" + if not query_embedding: + return [] + stmt = text(""" + SELECT id, content, chunk_index, + (embedding <=> CAST(:emb AS vector)) AS distance + FROM memory_chunks + WHERE user_id = :user_id AND (is_excluded IS NOT TRUE OR is_excluded = false) + AND embedding IS NOT NULL + ORDER BY embedding <=> CAST(:emb2 AS vector) + LIMIT :lim + """) + emb_str = "[" + ",".join(str(x) for x in query_embedding) + "]" + result = session.execute( + stmt, + {"user_id": user_id, "emb": emb_str, "emb2": emb_str, "lim": limit}, + ) + rows = result.mappings().all() + return [ + { + "id": r["id"], + "content": r["content"], + "chunk_index": r["chunk_index"], + "distance": float(r["distance"]), + } + for r in rows + ] + + +def list_users_with_recent_chunks_sync(session: Session, *, hours: int) -> list[str]: + """最近 N 小时内有新 chunk 的用户 id(Beat compaction 扫描)。""" + if hours < 1: + hours = 1 + cutoff = datetime.now(timezone.utc) - timedelta(hours=hours) + stmt = ( + select(MemoryChunk.user_id).where(MemoryChunk.created_at >= cutoff).distinct() + ) + return list(session.execute(stmt).scalars().all()) + + def list_summaries_for_evidence_sync( session: Session, *, user_id: str, q: str, limit: int ) -> list[dict]: @@ -453,17 +434,27 @@ def list_summaries_for_evidence_sync( def retrieve_evidence_sync( - session: Session, user_id: str, query: str, *, top_k: int = 10 + session: Session, + user_id: str, + query: str, + *, + top_k: int = 10, + embedding_provider: "EmbeddingProvider | None" = None, ) -> dict: """ Sync evidence retrieval for Celery tasks. - 能力:**仅 FTS** 检索 chunks(与 `HybridRetriever` 的 FTS+向量 RRF 不同,见 - `api/docs/memory-retrieval.md`);facts/timeline 按 query ILIKE;fallback 见 repo。 + chunks:**向量**(pgvector)与异步 `HybridRetriever` 对齐;facts/timeline 按 query ILIKE。 """ from app.features.memory.evidence import retrieve_evidence_bundle_sync - return retrieve_evidence_bundle_sync(session, user_id, query, top_k=top_k) + return retrieve_evidence_bundle_sync( + session, + user_id, + query, + top_k=top_k, + embedding_provider=embedding_provider, + ) async def get_timeline_events_for_user( diff --git a/api/app/features/memory/retriever.py b/api/app/features/memory/retriever.py index 904fe2d..f398049 100644 --- a/api/app/features/memory/retriever.py +++ b/api/app/features/memory/retriever.py @@ -1,31 +1,17 @@ -"""Hybrid retriever — metadata filter + FTS + vector retrieval + score fusion.""" +"""Hybrid retriever — 向量检索 + 元数据证据包。""" from sqlalchemy.ext.asyncio import AsyncSession +from app.core.logging import get_logger from app.features.memory.evidence import retrieve_evidence_bundle_async -from app.features.memory.repo import search_chunks_fts, search_chunks_vector +from app.features.memory.repo import search_chunks_vector from app.ports.embedding import EmbeddingProvider - -def _rrf_merge( - fts_items: list[dict], vector_items: list[dict], k: int = 60 -) -> list[dict]: - """Reciprocal Rank Fusion. Merge FTS and vector results by id.""" - scores: dict[str, float] = {} - for rank, item in enumerate(fts_items): - cid = item["id"] - scores[cid] = scores.get(cid, 0) + 1 / (k + rank + 1) - for rank, item in enumerate(vector_items): - cid = item["id"] - scores[cid] = scores.get(cid, 0) + 1 / (k + rank + 1) - - all_items = {x["id"]: x for x in fts_items + vector_items} - sorted_ids = sorted(scores.keys(), key=lambda i: scores[i], reverse=True) - return [all_items[i] for i in sorted_ids] +logger = get_logger(__name__) class HybridRetriever: - """Combine FTS, vector, and metadata filter into evidence bundle.""" + """向量 chunk 检索 + facts/timeline/summaries/stories。""" def __init__( self, @@ -51,27 +37,27 @@ class HybridRetriever: ) q = query.strip() - fts_chunks = await search_chunks_fts( - self._db, user_id=user_id, query=query, limit=top_k * 2 - ) - - vector_chunks: list[dict] = [] - if self._embedding and q: + merged_chunk_dicts: list[dict] = [] + if self._embedding: q_emb = await self._embedding.embed_text(q) if q_emb: - vector_chunks = await search_chunks_vector( - self._db, user_id=user_id, query_embedding=q_emb, limit=top_k * 2 + vector_rows = await search_chunks_vector( + self._db, user_id, q_emb, limit=top_k ) - - merged = _rrf_merge(fts_chunks, vector_chunks)[:top_k] - merged_chunk_dicts = [ - { - "id": c["id"], - "content": c["content"], - "chunk_index": c.get("chunk_index", 0), - } - for c in merged - ] + merged_chunk_dicts = [ + { + "id": c["id"], + "content": c["content"], + "chunk_index": c.get("chunk_index", 0), + } + for c in vector_rows + ] + else: + logger.warning( + "HybridRetriever empty_query_embedding user_id={}", user_id + ) + else: + logger.warning("HybridRetriever no_embedding_provider user_id={}", user_id) return await retrieve_evidence_bundle_async( self._db, diff --git a/api/app/features/memory/service.py b/api/app/features/memory/service.py index b2e28a6..88f6337 100644 --- a/api/app/features/memory/service.py +++ b/api/app/features/memory/service.py @@ -1,16 +1,14 @@ """ MemoryService — conversation / memoir 的统一门面。 -- ingest_transcript: transcript -> memory_sources, chunks, embedding, FTS +- ingest_transcript: transcript -> memory_sources, chunks, embedding - ingest 后可选:LLM 富化(session/rolling 摘要、事实、时间线) -- retrieve: 委托 HybridRetriever 返回 evidence bundle(FTS + 可选向量 RRF) +- retrieve: 委托 HybridRetriever 返回 evidence bundle(向量 chunks) -Celery 侧使用 `ingest_transcript_sync` + `retrieve_evidence_sync`,与异步路径差异见 +Celery 侧使用 `ingest_transcript_sync` + `retrieve_evidence_sync`,与异步路径对齐见 `api/docs/memory-retrieval.md`。 """ -import asyncio - from sqlalchemy.ext.asyncio import AsyncSession from app.core.logging import get_logger @@ -23,7 +21,6 @@ from app.features.memory.repo import ( set_chunk_excluded, set_memory_fact_status, update_chunk_embedding, - update_chunk_fts, ) from app.ports.embedding import EmbeddingProvider @@ -45,7 +42,7 @@ class MemoryService: ) -> str: """ Ingest conversation transcript into memory. - Creates MemorySource, chunks, populates embedding + FTS. + Creates MemorySource, chunks, populates embedding. Returns source_id. """ if not transcript or not transcript.strip(): @@ -73,10 +70,6 @@ class MemoryService: await self._db.flush() - # FTS: populate content_tsv - for chunk_id, _ in chunk_records: - await update_chunk_fts(self._db, chunk_id) - # Embedding: 若有 provider 则写入 if self._embedding and chunk_records: texts = [c for _, c in chunk_records] @@ -186,7 +179,7 @@ def ingest_transcript_sync( ) -> str: """ Sync transcript ingest for Celery tasks. - Creates source + chunks + FTS, and best-effort populates embeddings. + Creates source + chunks, and best-effort populates embeddings. Returns source_id. """ from app.core.dependencies import get_embedding_provider @@ -195,7 +188,6 @@ def ingest_transcript_sync( create_chunk_sync, create_source_sync, update_chunk_embedding_sync, - update_chunk_fts_sync, ) if not transcript or not transcript.strip(): @@ -222,13 +214,12 @@ def ingest_transcript_sync( ) session.flush() chunk_records.append((chunk.id, content)) - update_chunk_fts_sync(session, chunk.id) try: embedding_provider = get_embedding_provider() if chunk_records and embedding_provider is not None: texts = [content for _, content in chunk_records] - embeddings = asyncio.run(embedding_provider.embed_texts(texts)) + embeddings = embedding_provider.embed_texts_sync(texts) for (chunk_id, _), emb in zip(chunk_records, embeddings): if emb: update_chunk_embedding_sync(session, chunk_id, emb) diff --git a/api/app/ports/embedding.py b/api/app/ports/embedding.py index c62f718..795c3c2 100644 --- a/api/app/ports/embedding.py +++ b/api/app/ports/embedding.py @@ -5,6 +5,10 @@ from typing import Protocol, runtime_checkable @runtime_checkable class EmbeddingProvider(Protocol): + def is_available(self) -> bool: + """进程内 embedding 已配置且可发起调用(无 key / 未初始化 client 时为 False)。""" + ... + async def embed_text(self, text: str) -> list[float]: """Embed a single text into a vector.""" ... @@ -12,3 +16,11 @@ class EmbeddingProvider(Protocol): async def embed_texts(self, texts: list[str]) -> list[list[float]]: """Embed multiple texts into vectors.""" ... + + def embed_text_sync(self, text: str) -> list[float]: + """同步嵌入单条文本(Celery / sync DB 会话)。""" + ... + + def embed_texts_sync(self, texts: list[str]) -> list[list[float]]: + """同步嵌入多条文本(Celery / sync DB 会话)。""" + ... diff --git a/api/app/tasks/celery_app.py b/api/app/tasks/celery_app.py index c72dd20..a4d1aa4 100644 --- a/api/app/tasks/celery_app.py +++ b/api/app/tasks/celery_app.py @@ -63,11 +63,9 @@ celery_app.conf.update( # 不设置自定义队列路由,使用 Celery 默认队列 ) -# 定时任务配置(如果需要) celery_app.conf.beat_schedule = { - # 示例:每小时清理过期会话 - # "cleanup-expired-sessions": { - # "task": "app.tasks.cleanup.cleanup_sessions", - # "schedule": 3600.0, - # }, + "memory-compaction-sweep": { + "task": "app.tasks.memory_compaction_tasks.memory_compaction_sweep", + "schedule": 6 * 3600.0, + }, } diff --git a/api/app/tasks/memory_compaction_tasks.py b/api/app/tasks/memory_compaction_tasks.py index 154208e..76c35fc 100644 --- a/api/app/tasks/memory_compaction_tasks.py +++ b/api/app/tasks/memory_compaction_tasks.py @@ -15,14 +15,33 @@ from app.core.memory_compaction_schedule import ( finalize_memory_compaction_run, read_debounce_deadline_ts, release_scheduler_gate, + schedule_memory_compaction_run, set_incremental_cursor_pair, ) from app.core.redis_lock import acquire_redis_lock, release_redis_lock from app.features.memory.compaction_service import run_memory_compaction_sync +from app.features.memory.repo import list_users_with_recent_chunks_sync logger = get_logger(__name__) +@shared_task +def memory_compaction_sweep() -> dict[str, Any]: + """Beat:为近期有记忆写入的用户调度 compaction(debounce 仍由 schedule 合并)。""" + if not settings.memory_compaction_enabled: + return {"skipped": True, "reason": "disabled"} + hours = int(settings.memory_compaction_sweep_recent_hours) + with get_sync_db() as session: + user_ids = list_users_with_recent_chunks_sync(session, hours=hours) + ctx_base: dict[str, Any] = {"trigger_source": "beat", "sweep_hours": hours} + for uid in user_ids: + schedule_memory_compaction_run(uid, dict(ctx_base)) + logger.info( + "memory_compaction_sweep hours={} scheduled_users={}", hours, len(user_ids) + ) + return {"scheduled": len(user_ids), "user_ids": user_ids} + + @shared_task(bind=True, max_retries=12, default_retry_delay=20) def memory_compaction_run( self, user_id: str, context: dict[str, Any] | None = None diff --git a/api/docker-compose.yml b/api/docker-compose.yml index eeba7b8..e5e2030 100644 --- a/api/docker-compose.yml +++ b/api/docker-compose.yml @@ -119,27 +119,30 @@ services: max-size: "10m" max-file: "3" - # Celery Beat(定时任务调度,可选) - # celery-beat: - # build: - # context: . - # dockerfile: Dockerfile - # image: life-echo-api:latest - # container_name: life-echo-celery-beat - # command: celery -A app.tasks.celery_app beat --loglevel=info - # env_file: - # - .env - # environment: - # - DATABASE_URL=postgresql://postgres:postgres@postgres:5432/life_echo - # - REDIS_URL=redis://redis:6379/0 - # restart: always - # depends_on: - # postgres: - # condition: service_healthy - # redis: - # condition: service_healthy - # networks: - # - life-echo-network + celery-beat: + build: + context: . + dockerfile: Dockerfile + image: life-echo-api:latest + container_name: life-echo-celery-beat + command: uv run celery -A app.tasks.celery_app beat --loglevel=info + env_file: + - .env + restart: always + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_healthy + celery-worker: + condition: service_started + networks: + - life-echo-network + logging: + driver: "json-file" + options: + max-size: "10m" + max-file: "3" # Flower(Celery 监控面板,可选) # flower: diff --git a/api/docs/memoir_reliability.md b/api/docs/memoir_reliability.md index 583e9ab..17bba68 100644 --- a/api/docs/memoir_reliability.md +++ b/api/docs/memoir_reliability.md @@ -15,7 +15,8 @@ This document summarizes production-oriented behavior for the memoir narrative p | `memoir_fidelity_fail_open_on_parse_error` | `False` | When `True`, fidelity JSON/LLM failures pass the gate even for new stories (rollback only via ops need). | | `memoir_narrative_evidence_overlap_min_chars` | `14` | Deterministic overlap check between body and evidence plain text. | | `memoir_title_slots_require_body_or_oral_match` | `True` | Narrows title-generation slot inputs to body/oral overlap. | -| `memory_fact_search_use_recent_fallback` | `False` | When `False`, fact FTS misses do **not** fall back to “recent confirmed facts” (reduces contradictory/unrelated facts in prompts). | +| `memory_fact_search_use_recent_fallback` | `False` | When `False`, fact ILIKE misses do **not** fall back to “recent confirmed facts” (reduces contradictory/unrelated facts in prompts). | +| `memory_compaction_enabled` | `True` | Near-duplicate chunk soft-exclude; requires Celery worker + **Beat** for periodic `memory_compaction_sweep`. | | `memoir_recompose_retry_on_lock_contention` | `True` | Chapter recompose retries with backoff when the chapter pipeline lock is held. | | `memoir_phase2_singleflight_immediate` | `True` | Immediate Phase 2 `send_task` uses a stable `task_id` per user/category to reduce duplicate queue entries. | | `chapter_pipeline_lock_ttl_seconds` | `360` | Shared lock TTL for Phase 2 and `recompose_chapter`; tune with longest expected runtimes. | diff --git a/api/docs/memory-retrieval.md b/api/docs/memory-retrieval.md index 7401df4..a0ffd2c 100644 --- a/api/docs/memory-retrieval.md +++ b/api/docs/memory-retrieval.md @@ -4,32 +4,31 @@ | 路径 | 入口 | 检索能力 | |------|------|----------| -| **异步(HTTP / MemoirService) | `MemoryService.retrieve` → `HybridRetriever` → `evidence.retrieve_evidence_bundle_async` | **FTS + 向量(pgvector)**,RRF 融合 chunks;facts / timeline 按 **query ILIKE**,无命中则 **fallback** 最近条;rolling + ILIKE **摘要**;**stories**(标题/摘要匹配) | -| **同步(Celery) | `retrieve_evidence_sync`(`repo` 薄封装 → `evidence.retrieve_evidence_bundle_sync`) | **仅 FTS** chunks;其余与上类似(无向量) | +| **异步(HTTP / MemoirService)** | `MemoryService.retrieve` → `HybridRetriever` → `evidence.retrieve_evidence_bundle_async` | **向量(pgvector)** chunks;facts / timeline 按 **query ILIKE**,无命中则 **fallback** 最近条;rolling + ILIKE **摘要**;**stories**(标题/摘要匹配) | +| **同步(Celery)** | `retrieve_evidence_sync`(注入 `get_embedding_provider()` → `evidence.retrieve_evidence_bundle_sync`) | **向量** chunks + 同上元数据;与异步路径对齐 | -证据组装在 `app/features/memory/evidence.py`;`memory/repo` 仅提供原子查询(chunk FTS、facts/timeline 搜索、摘要列表等),story 合并在 evidence 层完成。 +证据组装在 `app/features/memory/evidence.py`;`memory/repo` 提供原子查询(chunk 向量、facts/timeline 搜索、摘要列表等),story 合并在 evidence 层完成。 -## 为何 Celery 与 Hybrid 不完全一致 +## 依赖 embedding -- `ingest_transcript_sync` 仅写入 chunk + **FTS**,**跳过 embedding**(见 `MemoryService.ingest_transcript_sync` 注释),与异步 ingest 行为对齐策略不同。 -- 在 worker 中补齐同步向量检索需注入 `EmbeddingProvider` 与同会话查询,成本与可用性需单独评估。 - -业务上应假设:**线上章节生成任务**以 FTS 证据为主;**异步 API** 若配置了 embedding,检索语义更富。 +- 未配置 `ZHIPU_API_KEY`(或 provider `_client` 为空)时,chunk 检索为空列表,仍会返回 facts/timeline/summaries/stories(按 query ILIKE)。 +- 日志:`HybridRetriever` / `retrieve_evidence_bundle_sync` 在无 provider 或空向量时会打 warning。 ## 空 query - 默认:`relevant_*` 均为空(与历史行为一致)。 -- 若设置 `memory_evidence_empty_query_include_rolling=true`:返回**无 chunk FTS**,但含 **rolling 摘要**、最近 facts / timeline(用于「浏览」模式)。 +- 若设置 `memory_evidence_empty_query_include_rolling=true`:返回**无 chunk**,但含 **rolling 摘要**、最近 facts / timeline(用于「浏览」模式)。 ## 富化(ingest 后 LLM) - `memory_enrichment_enabled`(默认 `true`):`ingest_transcript` / `ingest_transcript_sync` 后执行摘要、事实、时间线;`false` 时跳过。 - `memory_enrichment_max_chars`:截断送入 LLM 的文本长度。 - 同一 `memory_source_id` 的时间线在重跑富化前会先删后插入,避免重复事件。 +- Ingest 写入 **embedding**(best-effort);历史 FTS 列 `content_tsv` 已由迁移 `0007_drop_chunk_content_tsv` 删除。 ## Celery 任务中的顺序 -`process_memoir_segments`(`app/tasks/memoir_tasks.py`)在**同一任务**内先执行 `ingest_transcript_sync`(并 `commit`),再执行 `MemoirOrchestrator` 与 `run_story_pipeline_for_category_batch`。因此 `retrieve_evidence_sync` 能看到**本批刚写入**的 memory chunks(无竞态)。 +`process_memoir_segments`(`app/tasks/memoir_tasks.py`)在**同一任务**内先执行 `ingest_transcript_sync`(并 `commit`),再执行 `MemoirOrchestrator` 与 `run_story_pipeline_for_category_batch`。因此 `retrieve_evidence_sync` 能看到**本批刚写入**的 memory chunks(无竞态),前提是 embedding API 已成功写入向量。 章节分类上,若模型返回 **none** 或命中零散档案启发式,Story 侧会统一落入 **`summary` 章节**并继续叙事落库,与「本批 transcript 已进 memory」一致,避免误以为内容被丢弃。 diff --git a/api/tests/test_memory_compaction.py b/api/tests/test_memory_compaction.py index a944f04..0535711 100644 --- a/api/tests/test_memory_compaction.py +++ b/api/tests/test_memory_compaction.py @@ -19,7 +19,10 @@ from app.features.memory.compaction_service import ( text_layer_match, ) from app.features.memory.service import ingest_transcript_sync -from app.tasks.memory_compaction_tasks import memory_compaction_run +from app.tasks.memory_compaction_tasks import ( + memory_compaction_run, + memory_compaction_sweep, +) class FakeRedis: @@ -197,12 +200,17 @@ def test_ingest_transcript_sync_populates_embeddings(monkeypatch) -> None: self.commit_calls += 1 class FakeEmbeddingProvider: + def is_available(self) -> bool: + return True + async def embed_texts(self, texts: list[str]) -> list[list[float]]: return [[float(i)] for i, _ in enumerate(texts, start=1)] + def embed_texts_sync(self, texts: list[str]) -> list[list[float]]: + return [[float(i)] for i, _ in enumerate(texts, start=1)] + fake_session = FakeSession() embedded: list[tuple[str, list[float]]] = [] - fts_updated: list[str] = [] monkeypatch.setattr(settings, "memory_enrichment_enabled", False) monkeypatch.setattr( @@ -221,10 +229,6 @@ def test_ingest_transcript_sync_populates_embeddings(monkeypatch) -> None: "app.features.memory.repo.create_chunk_sync", lambda *args, **kwargs: SimpleNamespace(id=f"chunk-{kwargs['chunk_index']}"), ) - monkeypatch.setattr( - "app.features.memory.repo.update_chunk_fts_sync", - lambda session, chunk_id: fts_updated.append(chunk_id), - ) monkeypatch.setattr( "app.features.memory.repo.update_chunk_embedding_sync", lambda session, chunk_id, embedding: embedded.append((chunk_id, embedding)), @@ -239,7 +243,6 @@ def test_ingest_transcript_sync_populates_embeddings(monkeypatch) -> None: assert source_id == "src-1" assert [chunk_id for chunk_id, _ in embedded] == ["chunk-0", "chunk-1"] - assert fts_updated == ["chunk-0", "chunk-1"] assert fake_session.commit_calls == 1 @@ -442,3 +445,53 @@ def test_memory_compaction_run_releases_gate_and_retries_on_failure( assert "retry:RuntimeError" in events assert "release_lock" in events assert events.index("release_gate") < events.index("retry:RuntimeError") + + +def test_memory_compaction_sweep_skipped_when_disabled( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr(settings, "memory_compaction_enabled", False) + out = memory_compaction_sweep() + assert out == {"skipped": True, "reason": "disabled"} + + +def test_memory_compaction_sweep_schedules_recent_users( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr(settings, "memory_compaction_enabled", True) + monkeypatch.setattr(settings, "memory_compaction_sweep_recent_hours", 24) + scheduled: list[tuple[str, dict]] = [] + + class _DbCtx: + def __enter__(self): + return object() + + def __exit__(self, *args): + return None + + monkeypatch.setattr( + "app.tasks.memory_compaction_tasks.get_sync_db", + lambda: _DbCtx(), + ) + + def fake_list(session, *, hours): + assert hours == 24 + return ["user-a", "user-b"] + + monkeypatch.setattr( + "app.tasks.memory_compaction_tasks.list_users_with_recent_chunks_sync", + fake_list, + ) + + monkeypatch.setattr( + "app.tasks.memory_compaction_tasks.schedule_memory_compaction_run", + lambda uid, ctx: scheduled.append((uid, dict(ctx))), + ) + + out = memory_compaction_sweep() + assert out["scheduled"] == 2 + assert set(out["user_ids"]) == {"user-a", "user-b"} + assert {u for u, _ in scheduled} == {"user-a", "user-b"} + for _, ctx in scheduled: + assert ctx.get("trigger_source") == "beat" + assert ctx.get("sweep_hours") == 24 diff --git a/api/tests/test_memory_evidence.py b/api/tests/test_memory_evidence.py index aa0c1a5..c7bb452 100644 --- a/api/tests/test_memory_evidence.py +++ b/api/tests/test_memory_evidence.py @@ -1,14 +1,69 @@ """Memory evidence 组装与检索契约(纯函数 / 无 DB)。""" +import pytest + +from app.features.memory import evidence as evidence_mod from app.features.memory.evidence import ( EMPTY_EVIDENCE_BUNDLE, _facts_to_dicts, _stories_to_dicts, _timeline_to_dicts, + retrieve_evidence_bundle_sync, ) from app.features.memory.schemas import EvidenceBundle +class _FakeEmbedding: + def is_available(self) -> bool: + return True + + def embed_text_sync(self, text: str) -> list[float]: + return [0.25, 0.5, 0.75] + + +def test_retrieve_evidence_bundle_sync_uses_vector_search( + monkeypatch: pytest.MonkeyPatch, +) -> None: + searched: list[tuple] = [] + + def fake_search(session, user_id, emb, top_k): + searched.append((user_id, emb, top_k)) + return [ + { + "id": "c1", + "content": "chunk body", + "chunk_index": 0, + "distance": 0.1, + } + ] + + def fake_meta(session, user_id, q, top_k): + return { + "relevant_facts": [], + "timeline_hints": [], + "relevant_summaries": [], + "relevant_stories": [], + } + + monkeypatch.setattr(evidence_mod, "search_chunks_vector_sync", fake_search) + monkeypatch.setattr(evidence_mod, "fetch_evidence_metadata_sync", fake_meta) + + out = retrieve_evidence_bundle_sync( + session=object(), + user_id="u1", + query=" hello ", + top_k=7, + embedding_provider=_FakeEmbedding(), + ) + assert len(searched) == 1 + assert searched[0][0] == "u1" + assert searched[0][1] == [0.25, 0.5, 0.75] + assert searched[0][2] == 7 + assert out["relevant_chunks"] == [ + {"id": "c1", "content": "chunk body", "chunk_index": 0}, + ] + + def test_empty_evidence_bundle_keys() -> None: assert set(EMPTY_EVIDENCE_BUNDLE.keys()) == { "relevant_chunks",