"""Segment-level persistence for the WS pipeline.

Covers writes that are committed independently of a whole conversation turn,
such as voice segments and on-demand TTS.

Each helper here opens its own ``transactional()`` on the long-lived WS
``AsyncSession`` (see ``app.core.db``): multiple commits per connection are
intentional for incremental durability.

``ConversationHistoryStore`` handles whole human/AI turn writes and refreshes
Redis after those commits. Pipeline code may interleave ``persist.py`` segment
commits with turn-level ``history_store`` writes; readers should treat DB as
authoritative when cache and DB diverge briefly.
"""

from __future__ import annotations

from datetime import datetime, timezone
from typing import Optional

from sqlalchemy.ext.asyncio import AsyncSession

from app.core.db import transactional
from app.features.conversation.models import Conversation, ConversationMessage, Segment


def mark_conversation_active(
    conversation: Conversation, at: Optional[datetime] = None
) -> datetime:
    """Stamp ``conversation.last_message_at`` and return the timestamp used.

    Args:
        conversation: Conversation whose ``last_message_at`` is overwritten.
        at: Activity time to record. When omitted, the current UTC time
            (timezone-aware) is used.

    Returns:
        The timestamp that was assigned to ``conversation.last_message_at``.
    """
    activity_time = at if at is not None else datetime.now(timezone.utc)
    conversation.last_message_at = activity_time
    return activity_time


async def persist_message_tts_url_segment(
    db: AsyncSession,
    msg: ConversationMessage,
    segment_index: int,
    url_stored: str,
) -> None:
    """On-demand TTS: store ``url_stored`` at ``msg.tts_audio_urls[segment_index]``.

    The list is padded with empty strings when ``segment_index`` lies past its
    current end, and the updated list is assigned to the message inside this
    helper's own ``transactional(db)`` scope.

    Args:
        db: Session passed to ``transactional()``.
        msg: Message whose ``tts_audio_urls`` is replaced.
        segment_index: Zero-based position of the segment within the message.
        url_stored: URL to record for that segment.

    Raises:
        ValueError: If ``segment_index`` is negative. Without this check a
            negative index would skip the padding step and either overwrite
            the URL of a different, existing segment or fail with a bare
            ``IndexError`` on an empty list.
    """
    if segment_index < 0:
        raise ValueError(
            f"segment_index must be non-negative, got {segment_index}"
        )

    # Work on a copy and assign a fresh list object rather than mutating the
    # attribute in place.
    # NOTE(review): presumably this is what lets the ORM notice the change on
    # a JSON/ARRAY-style column -- confirm against the model definition.
    urls = list(msg.tts_audio_urls or [])

    # Pad with "" so that every index up to segment_index exists.
    missing = segment_index + 1 - len(urls)
    if missing > 0:
        urls.extend([""] * missing)
    urls[segment_index] = url_stored

    async with transactional(db):
        msg.tts_audio_urls = urls


async def persist_voice_segment_row(
    db: AsyncSession,
    segment: Segment,
    conversation: Conversation,
) -> None:
    """Add a voice segment row and refresh the conversation's activity time.

    Both steps run inside a single ``transactional(db)`` scope.

    Args:
        db: Session the segment is added to.
        segment: Segment instance to add to the session.
        conversation: Conversation whose ``last_message_at`` is set to the
            current UTC time via ``mark_conversation_active``.
    """
    async with transactional(db):
        db.add(segment)
        mark_conversation_active(conversation)