Files
life-echo/app-expo/src/core/ws/client.ts
Sully cb84c00eca 添加staging release workflow (#22)
* update variable name

* update docker port

* fix alembic migration files

* 给远端 SSH 调用加了 keepalive

* fix app-expo code file format

* comment out quality test threshold

---------

Co-authored-by: Kevin <kevin@brighteng.org>
2026-05-11 11:33:07 +08:00

289 lines
7.5 KiB
TypeScript

import { config } from '@/core/config';
import { tokenManager } from '@/core/auth/token-manager';
import type {
RawClientMessage,
RawServerMessage,
WsConnectionState,
WsEvent,
} from './types';
/** Callback that receives each server message after it has been mapped to a `WsEvent`. */
export type WsEventListener = (event: WsEvent) => void;
/** Callback that receives the new connection state each time the state changes. */
export type WsStateListener = (state: WsConnectionState) => void;
/**
 * Assembles the WebSocket endpoint URL for one conversation.
 *
 * The result has the shape `<base>/ws/conversation/<id>?token=<token>`, where
 * `<base>` is `config.wsBaseUrl` with trailing slashes removed and both the
 * conversation id and the token are percent-encoded.
 *
 * @param conversationId - Conversation identifier placed in the URL path.
 * @param token - Access token sent as the `token` query parameter.
 * @returns The full URL to hand to the `WebSocket` constructor.
 */
function buildWsUrl(conversationId: string, token: string): string {
  // Strip trailing slashes so joining with the path never yields `//`.
  const origin = config.wsBaseUrl.replace(/\/+$/u, '');
  const path = ['ws', 'conversation', encodeURIComponent(conversationId)].join('/');
  const query = `token=${encodeURIComponent(token)}`;
  return `${origin}/${path}?${query}`;
}
/**
 * Translates a raw server frame into the client-side `WsEvent` shape.
 *
 * Snake_case wire fields are renamed to camelCase. Field values are passed
 * through with type assertions only; nothing is validated at runtime, so a
 * field the server omits shows up as `undefined` on the event.
 *
 * @param raw - Parsed server frame.
 * @returns The mapped event, or `null` when `raw.type` is not a handled type.
 */
function mapServerMessage(raw: RawServerMessage): WsEvent | null {
  const cid = raw.conversation_id;
  // A frame may arrive without a `data` payload. Fall back to an empty object
  // so the field reads below yield `undefined` instead of throwing a TypeError.
  const d: RawServerMessage['data'] = raw.data ?? {};

  switch (raw.type) {
    case 'connect':
      return { kind: 'connected', conversationId: cid };

    case 'transcript':
      return {
        kind: 'transcript_received',
        conversationId: cid,
        text: d.text as string,
        audioDuration: d.audio_duration as number | undefined,
        voiceSessionId: d.voice_session_id as string | undefined,
        segmentIndex: d.segment_index as number | undefined,
        isLast: d.is_last as boolean | undefined,
      };

    case 'agent_response':
      return {
        kind: 'agent_response',
        conversationId: cid,
        text: d.text as string,
        index: d.index as number | undefined,
        total: d.total as number | undefined,
        assistantMessageId: d.assistant_message_id as string | undefined,
        // The wire field is `transition`; the event exposes it as `isTransition`.
        isTransition: d.transition as boolean | undefined,
        segmentIndex: d.segment_index as number | undefined,
      };

    case 'tts_audio':
      return {
        kind: 'tts_audio_received',
        conversationId: cid,
        audioBase64: d.audio_base64 as string,
        audioUrl: d.audio_url as string | undefined,
        index: d.index as number | undefined,
        total: d.total as number | undefined,
        assistantMessageId: d.assistant_message_id as string | undefined,
        manual: d.manual as boolean | undefined,
      };

    case 'end_conversation':
      return { kind: 'conversation_ended', conversationId: cid };

    case 'memoir_update':
      // The payload is forwarded as-is; its schema is not inspected here.
      return { kind: 'memoir_updated', conversationId: cid, data: d };

    case 'error':
      return {
        kind: 'session_error',
        conversationId: cid,
        message: d.message as string,
        code: d.code as string | undefined,
        segmentIndex: d.segment_index as number | undefined,
      };

    default:
      // Unhandled frame types are ignored by the caller.
      return null;
  }
}
/**
 * WebSocket client bound to a single conversation.
 *
 * What the class does:
 * - opens the socket with the current access token in the URL query string;
 * - sends a `ping` frame every `config.ws.heartbeatIntervalMs` while the socket is open;
 * - after an unexpected close, retries with exponential backoff, capped at
 *   `config.ws.reconnectMaxDelayMs` and limited to `config.ws.reconnectMaxRetries` attempts;
 * - fans out mapped server events and connection-state changes to registered listeners.
 *
 * After `dispose()` the instance is inert: `connect()` becomes a no-op.
 */
export class WsClient {
  private ws: WebSocket | null = null;
  private conversationId: string;
  private state: WsConnectionState = 'disconnected';
  private eventListeners = new Set<WsEventListener>();
  private stateListeners = new Set<WsStateListener>();
  /** Number of reconnect attempts scheduled since the last successful open. */
  private reconnectAttempt = 0;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
  private disposed = false;
  /**
   * Bumped by every `connect()` and `disconnect()`. An in-flight `connect()`
   * compares its captured value after awaiting to detect that it was superseded.
   */
  private connectGeneration = 0;

  /**
   * @param conversationId - Conversation this client connects to; it is also
   *   stamped onto every outgoing message sent through `send()`.
   */
  constructor(conversationId: string) {
    this.conversationId = conversationId;
  }

  // ─── Public API ───

  /**
   * Opens the socket unless the client is disposed, already connected, or
   * already connecting.
   *
   * Resolves once the socket has been created (not once it is open); observe
   * `onStateChange` for the `'connected'` transition. If no access token is
   * available the state returns to `'disconnected'` and no retry is scheduled.
   *
   * @throws Whatever `tokenManager.getAccessToken()` rejects with; the state is
   *   reset to `'disconnected'` before the error propagates.
   */
  async connect(): Promise<void> {
    if (this.disposed) return;
    if (this.state === 'connected' || this.state === 'connecting') return;

    const generation = ++this.connectGeneration;
    this.setState('connecting');

    let token: Awaited<ReturnType<typeof tokenManager.getAccessToken>>;
    try {
      token = await tokenManager.getAccessToken();
    } catch (err) {
      // Don't leave the state stuck in 'connecting', which would make every
      // later connect() call return early.
      if (generation === this.connectGeneration) {
        this.setState('disconnected');
      }
      throw err;
    }

    // disconnect()/dispose() ran while the token was being fetched: opening a
    // socket now would leak a connection nobody owns.
    if (this.disposed || generation !== this.connectGeneration) return;

    if (!token) {
      this.setState('disconnected');
      return;
    }

    let socket: WebSocket;
    try {
      socket = new WebSocket(buildWsUrl(this.conversationId, token));
    } catch {
      // Leave 'connecting' first; otherwise the retry scheduled below would hit
      // the guard at the top of connect() and never actually reconnect.
      this.setState('disconnected');
      this.scheduleReconnect();
      return;
    }
    this.ws = socket;

    // Handlers close over `socket` and ignore events once it is no longer the
    // current socket, so a stale socket cannot mutate the client.
    socket.onopen = () => {
      if (this.ws !== socket) return;
      this.reconnectAttempt = 0;
      this.setState('connected');
      this.startHeartbeat();
    };
    socket.onmessage = (event: MessageEvent) => {
      if (this.ws !== socket) return;
      this.handleMessage(event.data as string);
    };
    socket.onclose = () => {
      if (this.ws !== socket) return;
      this.ws = null;
      this.stopHeartbeat();
      this.setState('disconnected');
      if (!this.disposed) {
        this.scheduleReconnect();
      }
    };
    socket.onerror = () => {
      // Close the socket that errored; recovery then runs through onclose.
      socket.close();
    };
  }

  /**
   * Closes the socket without scheduling a reconnect and cancels any pending
   * reconnect, heartbeat, or in-flight `connect()`.
   */
  disconnect(): void {
    this.connectGeneration++;
    this.cancelReconnect();
    this.stopHeartbeat();
    const socket = this.ws;
    if (socket) {
      this.ws = null;
      // Detach onclose so this deliberate close does not trigger a reconnect.
      socket.onclose = null;
      socket.close();
    }
    this.setState('disconnected');
  }

  /** Permanently shuts the client down and removes every registered listener. */
  dispose(): void {
    this.disposed = true;
    this.disconnect();
    this.eventListeners.clear();
    this.stateListeners.clear();
  }

  /** Returns true if the message was sent, false if the socket was not open. */
  send(message: Omit<RawClientMessage, 'conversation_id'>): boolean {
    if (this.ws?.readyState !== WebSocket.OPEN) return false;
    const payload: RawClientMessage = {
      ...message,
      conversation_id: this.conversationId,
    };
    this.ws.send(JSON.stringify(payload));
    return true;
  }

  /**
   * Sends a `text` message.
   *
   * @param text - Message text.
   * @param opts - When `ttsThisTurn` is exactly `true`, `tts_this_turn: true`
   *   is added to the payload; otherwise the field is omitted.
   * @returns Same as `send()`.
   */
  sendText(text: string, opts?: { ttsThisTurn?: boolean }): boolean {
    return this.send({
      type: 'text',
      data: {
        text,
        ...(opts?.ttsThisTurn === true ? { tts_this_turn: true } : {}),
      },
    });
  }

  /** Sends a `tts_cancel` message with an empty payload. Returns the same as `send()`. */
  sendTtsCancel(): boolean {
    return this.send({ type: 'tts_cancel', data: {} });
  }

  /**
   * Sends a `tts_request` message for one segment of an assistant message.
   *
   * @param body - `segmentText` is included as `segment_text` only when it is
   *   neither null/undefined nor the empty string.
   * @returns Same as `send()`.
   */
  sendTtsRequest(body: {
    assistantMessageId: string;
    segmentIndex: number;
    segmentText?: string;
  }): boolean {
    return this.send({
      type: 'tts_request',
      data: {
        assistant_message_id: body.assistantMessageId,
        segment_index: body.segmentIndex,
        ...(body.segmentText != null && body.segmentText !== ''
          ? { segment_text: body.segmentText }
          : {}),
      },
    });
  }

  /** Sends an `end_conversation` message with an empty payload. Returns the same as `send()`. */
  sendEndConversation(): boolean {
    return this.send({ type: 'end_conversation', data: {} });
  }

  /** Returns the current connection state. */
  getState(): WsConnectionState {
    return this.state;
  }

  /**
   * Registers a listener for mapped server events.
   *
   * @returns A function that removes the listener.
   */
  onEvent(listener: WsEventListener): () => void {
    this.eventListeners.add(listener);
    return () => this.eventListeners.delete(listener);
  }

  /**
   * Registers a listener for connection-state changes.
   *
   * @returns A function that removes the listener.
   */
  onStateChange(listener: WsStateListener): () => void {
    this.stateListeners.add(listener);
    return () => this.stateListeners.delete(listener);
  }

  // ─── Internals ───

  /**
   * Parses one incoming frame and dispatches the mapped event to listeners.
   * Frames that are not valid JSON, do not parse to an object, or cannot be
   * mapped are dropped silently.
   */
  private handleMessage(raw: string): void {
    let event: WsEvent | null;
    try {
      const parsed: unknown = JSON.parse(raw);
      // JSON such as `null` or `42` parses fine but is not a message object.
      if (typeof parsed !== 'object' || parsed === null) return;
      event = mapServerMessage(parsed as RawServerMessage);
    } catch {
      return;
    }
    if (!event) return;
    for (const listener of this.eventListeners) {
      listener(event);
    }
  }

  /** Updates the state and notifies listeners; does nothing when the state is unchanged. */
  private setState(next: WsConnectionState): void {
    if (this.state === next) return;
    this.state = next;
    for (const listener of this.stateListeners) {
      listener(next);
    }
  }

  /**
   * Schedules the next reconnect attempt with exponential backoff:
   * `reconnectBaseDelayMs * 2^attempt`, capped at `reconnectMaxDelayMs`.
   * Does nothing once disposed or once `reconnectMaxRetries` is reached.
   */
  private scheduleReconnect(): void {
    if (this.disposed) return;
    if (this.reconnectAttempt >= config.ws.reconnectMaxRetries) return;
    // Never keep two reconnect timers alive at once.
    this.cancelReconnect();
    const delay = Math.min(
      config.ws.reconnectBaseDelayMs * Math.pow(2, this.reconnectAttempt),
      config.ws.reconnectMaxDelayMs,
    );
    this.reconnectAttempt++;
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      // connect() rejects only when the token fetch fails; treat that as a
      // failed attempt and back off again instead of leaving an unhandled rejection.
      void this.connect().catch(() => {
        this.scheduleReconnect();
      });
    }, delay);
  }

  /** Cancels the pending reconnect timer, if any. */
  private cancelReconnect(): void {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /** (Re)starts the heartbeat that sends a `ping` frame while the socket is open. */
  private startHeartbeat(): void {
    this.stopHeartbeat();
    this.heartbeatTimer = setInterval(() => {
      if (this.ws?.readyState === WebSocket.OPEN) {
        this.ws.send(JSON.stringify({ type: 'ping' }));
      }
    }, config.ws.heartbeatIntervalMs);
  }

  /** Stops the heartbeat timer, if running. */
  private stopHeartbeat(): void {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
  }
}