Files

56 lines
1.8 KiB
Python
Raw Permalink Normal View History

"""WS pipeline 分段持久化(语音分段、按需 TTS 等独立 commit 场景)。
Each helper here opens its own ``transactional()`` on the long-lived WS
``AsyncSession`` (see ``app.core.db``): multiple commits per connection are
intentional for incremental durability.
``ConversationHistoryStore`` handles whole human/AI turn writes and refreshes
Redis after those commits. Pipeline code may interleave ``persist.py`` segment
commits with turn-level ``history_store`` writes; readers should treat DB as
authoritative when cache and DB diverge briefly.
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Optional
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.db import transactional
from app.features.conversation.models import Conversation, ConversationMessage, Segment
def mark_conversation_active(
    conversation: Conversation, at: Optional[datetime] = None
) -> datetime:
    """Stamp ``conversation.last_message_at`` and return the timestamp used.

    Only the in-memory attribute is assigned here; this function does not
    touch a session or open a transaction itself.

    Args:
        conversation: Object whose ``last_message_at`` attribute is overwritten.
        at: Timestamp to record. When omitted (or otherwise falsy), the
            current time in UTC is used instead.

    Returns:
        The timestamp that was written to ``conversation.last_message_at``.
    """
    if not at:
        # No explicit timestamp supplied: fall back to a timezone-aware "now".
        at = datetime.now(timezone.utc)
    conversation.last_message_at = at
    return at
async def persist_message_tts_url_segment(
    db: AsyncSession,
    msg: ConversationMessage,
    segment_index: int,
    url_stored: str,
) -> None:
    """On-demand TTS: store a URL at ``msg.tts_audio_urls[segment_index]``.

    When the existing list is too short it is padded with empty strings so
    that ``segment_index`` becomes a valid slot. The updated list is then
    assigned to ``msg.tts_audio_urls`` inside its own ``transactional(db)``
    block (per the module docstring, each helper commits independently).

    Args:
        db: Session handed to ``transactional``.
        msg: Message whose ``tts_audio_urls`` attribute is replaced.
        segment_index: Zero-based slot to write; must not be negative.
        url_stored: Value stored in that slot.

    Raises:
        ValueError: If ``segment_index`` is negative. A negative index would
            otherwise be interpreted relative to the end of the list and
            overwrite a different segment's URL (or fail with an opaque
            ``IndexError`` on a short list).
    """
    if segment_index < 0:
        raise ValueError(f"segment_index must be >= 0, got {segment_index}")

    # Build a fresh list instead of mutating the attribute's list in place;
    # presumably this is so the ORM notices the change -- confirm against the
    # column type used by ConversationMessage.
    urls = list(msg.tts_audio_urls or [])

    # Pad with "" placeholders up to and including the target slot.
    missing = segment_index + 1 - len(urls)
    if missing > 0:
        urls.extend([""] * missing)
    urls[segment_index] = url_stored

    async with transactional(db):
        msg.tts_audio_urls = urls
async def persist_voice_segment_row(
    db: AsyncSession,
    segment: Segment,
    conversation: Conversation,
) -> None:
    """Store a voice segment row and refresh the conversation's activity time.

    Both steps run inside a single ``transactional(db)`` block: ``segment`` is
    added to the session, then ``conversation`` is stamped through
    ``mark_conversation_active`` with no explicit timestamp.

    Args:
        db: Session the segment is added to and that ``transactional`` wraps.
        segment: Row object to add to the session.
        conversation: Conversation whose last-activity time is refreshed.
    """
    txn = transactional(db)
    async with txn:
        # Queue the new row first, then bump the activity marker.
        db.add(segment)
        mark_conversation_active(conversation)