feat(app-expo): conversation WS warmup, background pool, lifecycle
Prefetch opening over WebSocket from the conversations list before navigation, with prepared-session handoff into the chat screen. Add a single-slot background pool so leaving chat (in-app) keeps the last session socket with UI callbacks stripped; dispose on app background and reconnect after resume when the chat screen is mounted. Tear down pooled sockets on logout, purge, and conversation delete. RealtimeSession supports attachUiCallbacks and idempotent dispose, and the chat composer hides the connection notice while connecting if assistant history already exists. Fix pause handler wiring in the conversation screen. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -1,17 +1,25 @@
|
||||
import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query';
|
||||
import { File, Paths } from 'expo-file-system';
|
||||
import { useCallback, useEffect, useRef, useState } from 'react';
|
||||
import { AppState, type AppStateStatus } from 'react-native';
|
||||
|
||||
import type { WsConnectionState } from '@/core/ws/types';
|
||||
|
||||
import { conversationApi } from './api';
|
||||
import {
|
||||
acquireBackgroundConversationWs,
|
||||
disposeAllBackgroundConversationWs,
|
||||
disposeBackgroundConversationWs,
|
||||
releaseConversationWsUi,
|
||||
} from './conversation-ws-background-pool';
|
||||
import { conversationMessagesRepository } from './conversation-messages-repository';
|
||||
import { conversationKeys } from './query-keys';
|
||||
import { takePreparedRealtimeSession } from './prepared-session-registry';
|
||||
import {
|
||||
RealtimeSession,
|
||||
type ErrorCallback,
|
||||
type StreamingTextCallback,
|
||||
type TtsSegmentPayload,
|
||||
type RealtimeSession,
|
||||
} from './realtime-session';
|
||||
import {
|
||||
type ConversationListItem,
|
||||
@@ -126,6 +134,7 @@ export function useDeleteConversation() {
|
||||
mutationFn: (conversationId: string) =>
|
||||
conversationApi.delete(conversationId),
|
||||
onSuccess: async (_, conversationId) => {
|
||||
disposeBackgroundConversationWs(conversationId);
|
||||
await voiceSegmentStore.clearConversation(conversationId);
|
||||
queryClient.setQueryData<ConversationListItem[]>(
|
||||
conversationKeys.lists(),
|
||||
@@ -197,6 +206,13 @@ export function useRealtimeSession({
|
||||
}: UseRealtimeSessionOptions): RealtimeSessionState {
|
||||
const queryClient = useQueryClient();
|
||||
const sessionRef = useRef<RealtimeSession | null>(null);
|
||||
const uiRef = useRef({
|
||||
handleStreamingText: (() => {}) as StreamingTextCallback,
|
||||
handleError: (() => {}) as ErrorCallback,
|
||||
onTtsSegment: undefined as
|
||||
| ((payload: TtsSegmentPayload) => void)
|
||||
| undefined,
|
||||
});
|
||||
|
||||
const [connectionState, setConnectionState] =
|
||||
useState<WsConnectionState>('disconnected');
|
||||
@@ -205,6 +221,10 @@ export function useRealtimeSession({
|
||||
const [awaitingAssistantReply, setAwaitingAssistantReply] = useState(false);
|
||||
const [error, setError] = useState<string | null>(null);
|
||||
|
||||
const [foregroundResumeGeneration, setForegroundResumeGeneration] =
|
||||
useState(0);
|
||||
const needsResumeAfterBackgroundRef = useRef(false);
|
||||
|
||||
const handleStreamingText: StreamingTextCallback = useCallback(
|
||||
(text, isComplete) => {
|
||||
if (text.trim().length > 0) {
|
||||
@@ -225,36 +245,60 @@ export function useRealtimeSession({
|
||||
setError(message);
|
||||
}, []);
|
||||
|
||||
uiRef.current.handleStreamingText = handleStreamingText;
|
||||
uiRef.current.handleError = handleError;
|
||||
uiRef.current.onTtsSegment = onTtsSegment;
|
||||
|
||||
useEffect(() => {
|
||||
if (!enabled || !conversationId) return;
|
||||
|
||||
const session = new RealtimeSession({
|
||||
const sub = AppState.addEventListener('change', (next: AppStateStatus) => {
|
||||
if (next === 'background') {
|
||||
needsResumeAfterBackgroundRef.current = true;
|
||||
disposeAllBackgroundConversationWs();
|
||||
sessionRef.current = null;
|
||||
setConnectionState('disconnected');
|
||||
setStreamingMessage(null);
|
||||
setAwaitingAssistantReply(false);
|
||||
} else if (next === 'active' && needsResumeAfterBackgroundRef.current) {
|
||||
needsResumeAfterBackgroundRef.current = false;
|
||||
setForegroundResumeGeneration((g) => g + 1);
|
||||
}
|
||||
});
|
||||
|
||||
return () => sub.remove();
|
||||
}, [enabled, conversationId]);
|
||||
|
||||
useEffect(() => {
|
||||
if (!enabled || !conversationId) return;
|
||||
|
||||
const prepared = takePreparedRealtimeSession(conversationId);
|
||||
const session = acquireBackgroundConversationWs(
|
||||
conversationId,
|
||||
queryClient,
|
||||
onStreamingText: handleStreamingText,
|
||||
onTtsSegment,
|
||||
onError: handleError,
|
||||
prepared,
|
||||
);
|
||||
|
||||
session.attachUiCallbacks({
|
||||
onStreamingText: (text, isComplete) => {
|
||||
uiRef.current.handleStreamingText(text, isComplete);
|
||||
},
|
||||
onTtsSegment: (payload) => uiRef.current.onTtsSegment?.(payload),
|
||||
onError: (message, code) => uiRef.current.handleError(message, code),
|
||||
onStateChange: setConnectionState,
|
||||
});
|
||||
|
||||
sessionRef.current = session;
|
||||
session.connect();
|
||||
setConnectionState(session.getConnectionState());
|
||||
|
||||
return () => {
|
||||
session.dispose();
|
||||
releaseConversationWsUi(session);
|
||||
sessionRef.current = null;
|
||||
setConnectionState('disconnected');
|
||||
setStreamingMessage(null);
|
||||
setAwaitingAssistantReply(false);
|
||||
};
|
||||
}, [
|
||||
conversationId,
|
||||
enabled,
|
||||
queryClient,
|
||||
handleStreamingText,
|
||||
handleError,
|
||||
onTtsSegment,
|
||||
]);
|
||||
}, [conversationId, enabled, queryClient, foregroundResumeGeneration]);
|
||||
|
||||
const sendText = useCallback(
|
||||
(text: string) => {
|
||||
|
||||
Reference in New Issue
Block a user