Files
life-echo/api/app/features/conversation/ws/persist.py
Sully 53e0065e3e refactor(api): TOML 配置 SSOT、统一错误契约、Auth/事务加固与可观测性 (#33)
配置 SSOT(TOML + .env)
统一错误契约
Auth 与事务边界
Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client
可观测性(OpenTelemetry + LGTM)
2026-05-22 13:44:50 +08:00

56 lines
1.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""WS pipeline 分段持久化(语音分段、按需 TTS 等独立 commit 场景)。
Each helper here opens its own ``transactional()`` on the long-lived WS
``AsyncSession`` (see ``app.core.db``): multiple commits per connection are
intentional for incremental durability.
``ConversationHistoryStore`` handles whole human/AI turn writes and refreshes
Redis after those commits. Pipeline code may interleave ``persist.py`` segment
commits with turn-level ``history_store`` writes; readers should treat DB as
authoritative when cache and DB diverge briefly.
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Optional
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.db import transactional
from app.features.conversation.models import Conversation, ConversationMessage, Segment
def mark_conversation_active(
conversation: Conversation, at: Optional[datetime] = None
) -> datetime:
activity_time = at or datetime.now(timezone.utc)
conversation.last_message_at = activity_time
return activity_time
async def persist_message_tts_url_segment(
db: AsyncSession,
msg: ConversationMessage,
segment_index: int,
url_stored: str,
) -> None:
"""按需 TTS写入 message.tts_audio_urls[segment_index] 并提交。"""
urls = list(msg.tts_audio_urls or [])
while len(urls) <= segment_index:
urls.append("")
urls[segment_index] = url_stored
async with transactional(db):
msg.tts_audio_urls = urls
async def persist_voice_segment_row(
db: AsyncSession,
segment: Segment,
conversation: Conversation,
) -> None:
"""语音分段入库并刷新会话活跃时间。"""
async with transactional(db):
db.add(segment)
mark_conversation_active(conversation)