Files
life-echo/api/app/features/memory/retrieval_service.py
2026-04-30 16:22:55 +08:00

55 lines
1.6 KiB
Python

"""Memory retrieval service boundary."""
from __future__ import annotations
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.logging import get_logger
from app.features.memory.retriever import HybridRetriever
from app.features.memory.schemas import EvidenceBundle
from app.ports.embedding import EmbeddingProvider
# Module-level logger obtained from the project's logging helper, keyed by
# this module's import name so log lines can be traced back to this file.
logger = get_logger(__name__)
class MemoryRetrievalService:
    """Retrieves typed evidence bundles for downstream consumers.

    Thin boundary around ``HybridRetriever``: it runs one retrieval, validates
    the raw result into an ``EvidenceBundle``, and emits a single structured
    log line summarising what came back.
    """

    # Bundle attributes whose sizes are reported in the completion log line,
    # listed in the same order as the placeholders in the log message.
    _COUNTED_FIELDS = (
        "relevant_chunks",
        "relevant_facts",
        "relevant_summaries",
        "relevant_stories",
    )

    def __init__(
        self,
        db: AsyncSession,
        *,
        embedding_provider: EmbeddingProvider | None = None,
    ) -> None:
        """Store the collaborators used by :meth:`retrieve`.

        Args:
            db: Async database session handed to the retriever on each call.
            embedding_provider: Optional embedding backend forwarded to the
                retriever; ``None`` means no provider is configured.
        """
        self._db = db
        self._embedding = embedding_provider

    async def retrieve(
        self,
        user_id: str,
        query: str,
        *,
        top_k: int = 10,
    ) -> EvidenceBundle:
        """Run a hybrid retrieval and return the validated evidence bundle.

        Args:
            user_id: Identifier passed through to the retriever and the log.
            query: Query text passed through to the retriever unchanged.
            top_k: Result limit passed through to the retriever.

        Returns:
            The retriever's raw output validated via
            ``EvidenceBundle.model_validate``.

        Raises:
            Any exception raised by the retriever, by bundle validation, or by
            the provider's ``is_available()`` propagates to the caller.
        """
        retriever = HybridRetriever(self._db, embedding_provider=self._embedding)
        raw = await retriever.retrieve(user_id=user_id, query=query, top_k=top_k)
        bundle = EvidenceBundle.model_validate(raw)

        # Only the collection sizes are needed for logging, so read the
        # attributes directly instead of serialising the whole bundle with
        # model_dump(). A missing or None attribute counts as zero.
        counts = [
            len(getattr(bundle, field, None) or ())
            for field in self._COUNTED_FIELDS
        ]

        # Explicit None check: the provider is optional, and an absent
        # provider is reported as vector search being unavailable.
        vec_ok = self._embedding is not None and self._embedding.is_available()

        # NOTE(review): brace-style placeholders assume the project logger
        # formats positional args with str.format semantics — confirm.
        logger.info(
            "event=memory_retrieve_done user_id={} query_len={} top_k={} "
            "chunks={} facts={} summaries={} stories={} vector_ok={}",
            user_id,
            len((query or "").strip()),  # tolerate a None/blank query in the log
            top_k,
            *counts,
            vec_ok,
        )
        return bundle
# Public API of this module: only the service class is exported by
# `from ... import *`.
__all__ = ["MemoryRetrievalService"]