本次 squash merge 将 codex-story-first-image-intent 的整体改动合入 development,核心内容包括: 1. 后端数据与迁移:新增 stories、story_versions、story_image_intents、chapter_cover_intents、assets 等模型与 Alembic 迁移,建立 story-first、markdown-first、asset-first 的主数据链路。 2. 生成与任务链:引入 StoryBuilderOrchestrator、ChapterComposerOrchestrator、story_image_tasks、chapter_cover_tasks,图片生成从正文占位符改为结构化 intent -> asset -> markdown 回填。 3. 并发与一致性:为 story/chapter intent 增加 claim_token、claimed_at、attempt_count,采用数据库原子 claim 为主、Redis 锁为辅,避免重复生成、锁误删和 processing 卡死。 4. Memoir 读写路径:章节 canonical_markdown 成为正文真源,列表/详情接口补齐 markdown、cover_asset、word_count 等字段,PDF 与 asset 解析链路同步升级。 5. Memory / Retrieval:扩展 transcript ingest、chunking、evidence 检索与 story 聚合基础设施,为后续 story-first RAG 与多 agent 编排提供底座。 6. App 端体验:章节页继续走 MarkdownRenderer 阅读链,同时吸收 fix3-19 的跨平台 UI glitch 修复;更新对话页、首页、文案资源与章节列表映射逻辑。 7. 测试与文档:补充 asset resolver、story image task、章节封面派发、markdown 映射等回归测试,并加入图片占位符退役设计文档。
130 lines
3.8 KiB
Python
130 lines
3.8 KiB
Python
"""
MemoryService — conversation / memoir 的统一门面。

- ingest_transcript: transcript -> memory_sources, chunks, embedding, FTS
- retrieve: 委托 HybridRetriever 返回 evidence bundle
"""
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.features.memory.chunker import chunk_transcript
|
|
from app.features.memory.repo import (
|
|
create_chunk,
|
|
create_source,
|
|
update_chunk_embedding,
|
|
update_chunk_fts,
|
|
)
|
|
from app.ports.embedding import EmbeddingProvider
|
|
|
|
|
|
class MemoryService:
    """Unified facade over the memory feature (conversation / memoir).

    Wraps an ``AsyncSession`` and an optional ``EmbeddingProvider``:

    - ``ingest_transcript`` stores a transcript as a memory source plus
      ordered chunks, populating full-text-search data and, when a provider
      is configured, embeddings.
    - ``retrieve`` delegates to ``HybridRetriever``.
    """

    def __init__(
        self,
        db: AsyncSession,
        *,
        embedding_provider: EmbeddingProvider | None = None,
    ):
        """Bind the service to a DB session and an optional embedding provider.

        Args:
            db: Async session used for all reads and writes;
                ``ingest_transcript`` commits on it.
            embedding_provider: When ``None``, ingest skips embeddings and
                ``HybridRetriever`` is built without a provider.
        """
        self._db = db
        self._embedding = embedding_provider

    async def ingest_transcript(
        self, user_id: str, conversation_id: str, transcript: str
    ) -> str:
        """Ingest a conversation transcript into memory.

        Creates a ``"transcript"`` memory source holding the stripped text,
        splits that text with ``chunk_transcript`` into chunks (indexed in
        order), populates each chunk's FTS column and — when an embedding
        provider is configured — its embedding, then commits the session.

        Args:
            user_id: Owner of the created source and chunks.
            conversation_id: Conversation the transcript belongs to.
            transcript: Raw transcript; leading/trailing whitespace is stripped.

        Returns:
            The id of the created memory source.

        Raises:
            ValueError: If ``transcript`` is empty or whitespace-only.
        """
        if not transcript or not transcript.strip():
            raise ValueError("transcript cannot be empty")
        text = transcript.strip()

        source = await create_source(
            self._db,
            user_id=user_id,
            source_type="transcript",
            raw_text=text,
            conversation_id=conversation_id,
        )
        # Flush before reading source.id: a primary key generated at flush
        # time would otherwise still be None when used as the chunks' FK.
        await self._db.flush()
        source_id = source.id

        created = []
        for index, content in enumerate(chunk_transcript(text)):
            chunk = await create_chunk(
                self._db,
                source_id=source_id,
                user_id=user_id,
                content=content,
                chunk_index=index,
            )
            created.append((chunk, content))

        # One flush persists every chunk; ids are read only afterwards so they
        # are populated even if create_chunk does not flush on its own.
        await self._db.flush()
        chunk_records = [(chunk.id, content) for chunk, content in created]

        # FTS: populate content_tsv for each chunk.
        for chunk_id, _ in chunk_records:
            await update_chunk_fts(self._db, chunk_id)

        # Embeddings: written only when a provider is configured.
        if self._embedding and chunk_records:
            texts = [content for _, content in chunk_records]
            embeddings = await self._embedding.embed_texts(texts)
            for (chunk_id, _), emb in zip(chunk_records, embeddings):
                # Falsy (empty / None) embeddings are skipped, not stored.
                if emb:
                    await update_chunk_embedding(self._db, chunk_id, emb)

        await self._db.commit()
        # source_id was captured before commit: if the session expires objects
        # on commit (SQLAlchemy's default), reading source.id here would need
        # an implicit reload, which AsyncSession does not allow.
        return source_id

    async def retrieve(self, user_id: str, query: str, *, top_k: int = 10) -> dict:
        """Retrieve evidence relevant to ``query`` for ``user_id``.

        Delegates to ``HybridRetriever`` built from this service's session and
        embedding provider, passing ``top_k`` through unchanged.

        Returns:
            The dict returned by ``HybridRetriever.retrieve`` (presumably the
            evidence bundle; its schema is defined by the retriever).
        """
        # Imported lazily — presumably to avoid an import cycle with the
        # retriever module (NOTE(review): confirm).
        from app.features.memory.retriever import HybridRetriever

        retriever = HybridRetriever(self._db, embedding_provider=self._embedding)
        return await retriever.retrieve(user_id=user_id, query=query, top_k=top_k)
|
|
|
|
|
|
def ingest_transcript_sync(
    session,
    user_id: str,
    conversation_id: str,
    transcript: str,
) -> str:
    """Synchronous transcript ingest for Celery tasks.

    Creates a ``"transcript"`` memory source holding the stripped text and its
    chunks (indexed in order), populates each chunk's FTS data, then commits
    ``session``. Embeddings are skipped here because the embedding provider
    API is async.

    Args:
        session: Synchronous session, used via ``flush``/``commit``
            (presumably a SQLAlchemy ``Session`` — confirm at call sites).
        user_id: Owner of the created source and chunks.
        conversation_id: Conversation the transcript belongs to.
        transcript: Raw transcript; leading/trailing whitespace is stripped.

    Returns:
        The id of the created memory source.

    Raises:
        ValueError: If ``transcript`` is empty or whitespace-only.
    """
    # chunk_transcript is already imported at module level; only the sync
    # repo helpers are imported here.
    from app.features.memory.repo import (
        create_chunk_sync,
        create_source_sync,
        update_chunk_fts_sync,
    )

    if not transcript or not transcript.strip():
        raise ValueError("transcript cannot be empty")
    text = transcript.strip()

    source = create_source_sync(
        session,
        user_id=user_id,
        source_type="transcript",
        raw_text=text,
        conversation_id=conversation_id,
    )
    # Flush so source.id is assigned before it is used as the chunks' FK.
    session.flush()
    source_id = source.id

    chunks = []
    for index, content in enumerate(chunk_transcript(text)):
        chunks.append(
            create_chunk_sync(
                session,
                source_id=source_id,
                user_id=user_id,
                content=content,
                chunk_index=index,
            )
        )

    # A single flush persists every chunk (instead of one flush per chunk);
    # FTS updates run only after the rows are flushed and their ids assigned
    # (presumably update_chunk_fts_sync updates the persisted row by id).
    session.flush()
    for chunk in chunks:
        update_chunk_fts_sync(session, chunk.id)

    session.commit()
    # source_id was captured before commit, so no reload of an attribute
    # expired by the commit is needed here.
    return source_id
|