配置 SSOT(TOML + .env) 统一错误契约 Auth 与事务边界 Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client 可观测性(OpenTelemetry + LGTM)
56 lines
1.8 KiB
Python
56 lines
1.8 KiB
Python
"""WS pipeline 分段持久化(语音分段、按需 TTS 等独立 commit 场景)。
|
||
|
||
Each helper here opens its own ``transactional()`` on the long-lived WS
|
||
``AsyncSession`` (see ``app.core.db``): multiple commits per connection are
|
||
intentional for incremental durability.
|
||
|
||
``ConversationHistoryStore`` handles whole human/AI turn writes and refreshes
|
||
Redis after those commits. Pipeline code may interleave ``persist.py`` segment
|
||
commits with turn-level ``history_store`` writes; readers should treat DB as
|
||
authoritative when cache and DB diverge briefly.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from datetime import datetime, timezone
|
||
from typing import Optional
|
||
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.core.db import transactional
|
||
from app.features.conversation.models import Conversation, ConversationMessage, Segment
|
||
|
||
|
||
def mark_conversation_active(
|
||
conversation: Conversation, at: Optional[datetime] = None
|
||
) -> datetime:
|
||
activity_time = at or datetime.now(timezone.utc)
|
||
conversation.last_message_at = activity_time
|
||
return activity_time
|
||
|
||
|
||
async def persist_message_tts_url_segment(
|
||
db: AsyncSession,
|
||
msg: ConversationMessage,
|
||
segment_index: int,
|
||
url_stored: str,
|
||
) -> None:
|
||
"""按需 TTS:写入 message.tts_audio_urls[segment_index] 并提交。"""
|
||
urls = list(msg.tts_audio_urls or [])
|
||
while len(urls) <= segment_index:
|
||
urls.append("")
|
||
urls[segment_index] = url_stored
|
||
async with transactional(db):
|
||
msg.tts_audio_urls = urls
|
||
|
||
|
||
async def persist_voice_segment_row(
|
||
db: AsyncSession,
|
||
segment: Segment,
|
||
conversation: Conversation,
|
||
) -> None:
|
||
"""语音分段入库并刷新会话活跃时间。"""
|
||
async with transactional(db):
|
||
db.add(segment)
|
||
mark_conversation_active(conversation)
|