Files
life-echo/api/app/features/conversation/ws/connection_manager.py
Sully 53e0065e3e refactor(api): TOML 配置 SSOT、统一错误契约、Auth/事务加固与可观测性 (#33)
配置 SSOT(TOML + .env)
统一错误契约
Auth 与事务边界
Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client
可观测性(OpenTelemetry + LGTM)
2026-05-22 13:44:50 +08:00

51 lines
1.7 KiB
Python

"""WebSocket 连接管理器:仅负责连接注册/注销和消息收发"""
from typing import Dict
from fastapi import WebSocket
from app.core.errors import NotFoundError
from app.core.logging import get_logger
# Module-level logger obtained from the project's get_logger helper,
# keyed by this module's name.
logger = get_logger(__name__)
class ConnectionManager:
    """Registry of active WebSocket connections keyed by conversation id.

    Only handles registering/unregistering connections and sending or
    receiving JSON messages on them.
    """

    def __init__(self) -> None:
        # conversation_id -> socket currently registered for that id.
        self.active_connections: Dict[str, WebSocket] = {}

    async def connect(self, websocket: WebSocket, conversation_id: str) -> None:
        """Accept ``websocket`` and register it under ``conversation_id``.

        An entry already registered for the same id is overwritten; the
        previous socket is not closed here.
        """
        await websocket.accept()
        self.active_connections[conversation_id] = websocket

    async def disconnect(self, conversation_id: str) -> None:
        """Remove the registry entry for ``conversation_id``.

        Unknown ids are ignored. Only the registry entry is dropped; the
        socket itself is not closed by this method.
        """
        self.active_connections.pop(conversation_id, None)

    async def send_message(self, conversation_id: str, message: dict) -> None:
        """Send ``message`` as JSON to the socket of ``conversation_id``.

        Does nothing when no connection is registered for the id. A failed
        send is logged as a warning and the failing socket is unregistered;
        the error is not propagated to the caller.
        """
        websocket = self.active_connections.get(conversation_id)
        if websocket is None:
            return
        try:
            await websocket.send_json(message)
        except Exception as e:  # best-effort send: log and drop the socket
            logger.warning(
                "发送消息失败 (conversation_id={}): {}", conversation_id, e
            )
            # The await above may have let another task register a fresh
            # socket for this id; evict only the socket that actually failed.
            if self.active_connections.get(conversation_id) is websocket:
                del self.active_connections[conversation_id]

    async def receive_message(self, conversation_id: str) -> dict:
        """Wait for and return the next JSON message for ``conversation_id``.

        Raises:
            NotFoundError: no connection is registered for the id.
        """
        websocket = self.active_connections.get(conversation_id)
        if websocket is None:
            raise NotFoundError("Connection not found")
        return await websocket.receive_json()
# Module-level ConnectionManager instance, created when this module is imported.
# NOTE(review): presumably imported elsewhere as the shared manager — confirm against callers.
manager = ConnectionManager()