Files
life-echo/api/app/features/memory/service.py
Sully 53e0065e3e refactor(api): TOML 配置 SSOT、统一错误契约、Auth/事务加固与可观测性 (#33)
配置 SSOT(TOML + .env)
统一错误契约
Auth 与事务边界
Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client
可观测性(OpenTelemetry + LGTM)
2026-05-22 13:44:50 +08:00

184 lines
6.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
MemoryService — conversation / memoir / eval 的唯一 memory 门面。
所有运行链路通过 async service 进入 ingest、retrieve、enrichment 与 compaction；
Celery task 只能作为同步入口包装 async service，不再维护 sync memory 双轨。
"""
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.db import transactional
from app.core.logging import get_logger
from app.features.memory.embedding_service import MemoryEmbeddingService
from app.features.memory.ingest_service import MemoryIngestService
from app.features.memory.repo import (
create_curation_action,
mark_facts_stale_for_excluded_chunk,
set_chunk_excluded,
set_memory_fact_status,
)
from app.features.memory.retrieval_service import MemoryRetrievalService
from app.features.memory.schemas import EvidenceBundle
from app.ports.embedding import EmbeddingProvider
logger = get_logger(__name__)
class MemoryService:
    """Single async memory facade for the conversation, memoir and eval flows.

    Every memory operation is reached through this class: transcript ingest,
    evidence retrieval, post-ingest enrichment, chunk embedding, near-duplicate
    compaction, and user curation of chunks/facts. Each call builds the
    specialised sub-service (or helper) it needs on top of the shared
    ``AsyncSession`` and optional embedding provider given at construction.
    Celery tasks are expected to wrap these async methods rather than keep a
    separate sync implementation.
    """

    def __init__(
        self,
        db: AsyncSession,
        *,
        embedding_provider: EmbeddingProvider | None = None,
    ):
        """Bind the facade to a database session.

        Args:
            db: Async SQLAlchemy session shared by every delegated call.
            embedding_provider: Optional embedding backend, forwarded unchanged
                to the ingest, retrieval and embedding sub-services.
        """
        self._db = db
        self._embedding = embedding_provider

    async def ingest_transcript(
        self,
        user_id: str,
        conversation_id: str,
        transcript: str,
        *,
        lineage_json: dict | None = None,
    ) -> str:
        """Ingest one conversation transcript into memory.

        Delegates to ``MemoryIngestService.ingest_transcript``, which per its
        contract creates the MemorySource and its chunks and populates
        embeddings.

        Args:
            user_id: Owner of the memory.
            conversation_id: Conversation the transcript belongs to.
            transcript: Raw transcript text.
            lineage_json: Optional lineage metadata forwarded as-is.

        Returns:
            The id of the created MemorySource.
        """
        service = MemoryIngestService(self._db, embedding_provider=self._embedding)
        return await service.ingest_transcript(
            user_id,
            conversation_id,
            transcript,
            lineage_json=lineage_json,
        )

    async def ingest_transcripts_batch(
        self,
        user_id: str,
        items: list[tuple[str, str, dict | None, str | None]],
        *,
        memoir_correlation_id: str | None = None,
    ) -> list[str]:
        """Ingest several transcripts in one call.

        Args:
            user_id: Owner of the memory.
            items: Tuples forwarded unchanged to
                ``MemoryIngestService.ingest_transcripts_batch``. The field
                meaning is defined there; presumably (conversation_id,
                transcript, lineage_json, ...) — TODO confirm.
            memoir_correlation_id: Optional correlation id forwarded as-is.

        Returns:
            Ids of the created MemorySource rows.
        """
        service = MemoryIngestService(self._db, embedding_provider=self._embedding)
        return await service.ingest_transcripts_batch(
            user_id,
            items,
            memoir_correlation_id=memoir_correlation_id,
        )

    async def retrieve(
        self, user_id: str, query: str, *, top_k: int = 10
    ) -> EvidenceBundle:
        """Retrieve evidence relevant to ``query`` for ``user_id``.

        Delegates to ``MemoryRetrievalService.retrieve``.

        Args:
            user_id: Whose memory to search.
            query: Free-text query.
            top_k: Result limit forwarded to the retrieval service.

        Returns:
            The ``EvidenceBundle`` produced by the retrieval service.
        """
        service = MemoryRetrievalService(self._db, embedding_provider=self._embedding)
        return await service.retrieve(user_id, query, top_k=top_k)

    async def enrich_source(
        self,
        user_id: str,
        source_id: str,
        *,
        llm=None,
        raise_on_failure: bool = True,
    ) -> None:
        """Run post-ingest enrichment for one memory source (async path).

        Args:
            user_id: Owner of the source.
            source_id: MemorySource to enrich.
            llm: Optional LLM client forwarded as-is to the enrichment routine.
            raise_on_failure: Forwarded to the enrichment routine. Defaults to
                ``True`` (the historical behavior); the parameter mirrors
                ``embed_source`` so callers can opt into best-effort runs.
        """
        # Imported lazily, presumably to avoid an import cycle with the
        # enrichment module — verify before hoisting.
        from app.features.memory.enrichment import enrich_memory_after_ingest_async

        await enrich_memory_after_ingest_async(
            self._db,
            user_id,
            source_id,
            llm=llm,
            raise_on_failure=raise_on_failure,
        )

    async def embed_source(
        self,
        user_id: str,
        source_id: str,
        *,
        raise_on_failure: bool = False,
    ) -> dict:
        """Embed the persisted chunks of a source and update embedding status.

        Args:
            user_id: Owner of the source.
            source_id: MemorySource whose chunks should be embedded.
            raise_on_failure: Forwarded to ``MemoryEmbeddingService.embed_source``.

        Returns:
            The result dict returned by the embedding service.
        """
        service = MemoryEmbeddingService(self._db, embedding_provider=self._embedding)
        return await service.embed_source(
            user_id,
            source_id,
            raise_on_failure=raise_on_failure,
        )

    async def compact_user(self, user_id: str, context: dict | None = None) -> dict:
        """Run near-duplicate compaction for a user's memory (async path).

        Args:
            user_id: Whose memory to compact.
            context: Optional context dict forwarded as-is.

        Returns:
            The result dict returned by ``run_memory_compaction``.
        """
        # Lazy import, presumably to avoid an import cycle — verify before hoisting.
        from app.features.memory.compaction_service import run_memory_compaction

        return await run_memory_compaction(self._db, user_id, context)

    async def exclude_chunk(
        self, user_id: str, chunk_id: str, *, reason: str = ""
    ) -> bool:
        """Exclude a chunk from memory and mark the facts derived from it stale.

        The exclusion, fact staling and the "exclude" curation-action record
        all run inside one ``transactional(self._db)`` block.

        Args:
            user_id: Owner of the chunk.
            chunk_id: Chunk to exclude.
            reason: Optional free-text reason; recorded only when non-empty.

        Returns:
            ``False`` when ``set_chunk_excluded`` reports no change (presumably
            the chunk is missing or not owned by ``user_id``), else ``True``.
        """
        async with transactional(self._db):
            ok = await set_chunk_excluded(self._db, chunk_id, user_id, True)
            if not ok:
                return False
            stale_count = await mark_facts_stale_for_excluded_chunk(
                self._db,
                user_id=user_id,
                chunk_id=chunk_id,
            )
            details: dict = {}
            if reason:
                details["reason"] = reason
            details["staled_fact_count"] = stale_count
            await create_curation_action(
                self._db,
                user_id=user_id,
                action_type="exclude",
                target_type="chunk",
                target_id=chunk_id,
                details=details,
            )
            return True

    async def restore_chunk(self, user_id: str, chunk_id: str) -> bool:
        """Re-include a previously excluded chunk.

        Facts staled by the exclusion are NOT restored here; the recorded
        curation action notes that re-enrichment is required for that.

        Args:
            user_id: Owner of the chunk.
            chunk_id: Chunk to restore.

        Returns:
            ``False`` when ``set_chunk_excluded`` reports no change, else ``True``.
        """
        async with transactional(self._db):
            ok = await set_chunk_excluded(self._db, chunk_id, user_id, False)
            if not ok:
                return False
            await create_curation_action(
                self._db,
                user_id=user_id,
                action_type="restore",
                target_type="chunk",
                target_id=chunk_id,
                details={"fact_restore_policy": "requires_reenrichment"},
            )
            return True

    async def confirm_fact(self, user_id: str, fact_id: str) -> bool:
        """Set a memory fact's status to "confirmed" and record the action.

        Args:
            user_id: Owner of the fact.
            fact_id: Fact to confirm.

        Returns:
            ``False`` when ``set_memory_fact_status`` reports no change, else ``True``.
        """
        async with transactional(self._db):
            ok = await set_memory_fact_status(self._db, fact_id, user_id, "confirmed")
            if not ok:
                return False
            await create_curation_action(
                self._db,
                user_id=user_id,
                action_type="confirm",
                target_type="fact",
                target_id=fact_id,
                details=None,
            )
            return True

    async def reject_fact(
        self, user_id: str, fact_id: str, *, reason: str = ""
    ) -> bool:
        """Set a memory fact's status to "rejected" and record the action.

        Args:
            user_id: Owner of the fact.
            fact_id: Fact to reject.
            reason: Optional free-text reason; recorded only when non-empty.

        Returns:
            ``False`` when ``set_memory_fact_status`` reports no change, else ``True``.
        """
        async with transactional(self._db):
            ok = await set_memory_fact_status(self._db, fact_id, user_id, "rejected")
            if not ok:
                return False
            await create_curation_action(
                self._db,
                user_id=user_id,
                action_type="reject",
                target_type="fact",
                target_id=fact_id,
                details={"reason": reason} if reason else None,
            )
            return True