fix(conversation): 修复实时会话 TTS/回复被离屏 WS 抢占

- 列表预热仅预取消息缓存,避免后台 WebSocket 覆盖服务端连接
- RealtimeSession UI 回调按 owner 独占,防止 offscreen 覆盖聊天页
- 列表页聚焦时再 prewarm,会话页 TTS 入队优先 base64
- 管线下发 TTS 同时带 audio_base64 与 audio_url;协议说明同步
- 移除 TTS 排查用前后端调试日志,保留错误/告警
- 补充 WS / RealtimeSession / entry-warmup / 播放器相关单测

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Kevin
2026-05-12 10:42:44 +08:00
parent 93be60f74c
commit 3d01085442
18 changed files with 643 additions and 261 deletions

View File

@@ -1,7 +1,10 @@
import type { QueryClient } from '@tanstack/react-query';
import { AppState, type AppStateStatus } from 'react-native';
import { RealtimeSession } from './realtime-session';
import {
RealtimeSession,
type RealtimeSessionUiOwner,
} from './realtime-session';
type Slot = { conversationId: string; session: RealtimeSession };
@@ -34,8 +37,11 @@ const offScreenUi = {
};
/** 离屏:保持 WebSocket去掉 UI 回调,避免列表页播 TTS 或对已卸载组件 setState */
export function releaseConversationWsUi(session: RealtimeSession): void {
session.attachUiCallbacks({
export function releaseConversationWsUi(
session: RealtimeSession,
owner: RealtimeSessionUiOwner,
): void {
session.releaseUiCallbacks(owner, {
onStreamingText: offScreenUi.onStreamingText,
onTtsSegment: offScreenUi.onTtsSegment,
onError: offScreenUi.onError,

View File

@@ -1,6 +1,5 @@
import type { QueryClient } from '@tanstack/react-query';
import { acquireBackgroundConversationWs } from './conversation-ws-background-pool';
import { conversationMessagesRepository } from './conversation-messages-repository';
import { conversationKeys } from './query-keys';
import { registerPreparedRealtimeSession } from './prepared-session-registry';
@@ -52,17 +51,13 @@ export async function prefetchConversationMessages(
});
}
const offscreenUiCallbacks = {
onStreamingText: () => {},
onTtsSegment: () => {},
onError: () => {},
onStateChange: () => {},
};
const inflightPrewarms = new Set<string>();
/**
* 列表页/卡片按下时的预热:保持后台 WS 连接,并触发消息缓存填充
* 列表页/卡片按下时的预热:只填充消息缓存,不建立后台 WS
*
* 后端当前以 conversation_id 记录 active WebSocket离屏 WS 会覆盖聊天页连接,
* 导致本轮 TTS/agent_response 发到 offscreen session页面停在「回复中」。
* 与 `warmupConversationOpening` 不同:不等待开场白、不阻塞调用方,仅适用于"已有消息"的会话。
*/
export function prewarmConversationSession(
@@ -70,13 +65,6 @@ export function prewarmConversationSession(
conversationId: string,
): void {
if (!conversationId) return;
const session = acquireBackgroundConversationWs(
conversationId,
queryClient,
null,
);
// 预热阶段没有挂载的 UI先用空回调占位聊天页 mount 时会重新 attach。
session.attachUiCallbacks(offscreenUiCallbacks);
if (inflightPrewarms.has(conversationId)) return;
const cached = queryClient.getQueryData<MessageItem[]>(
conversationKeys.messages(conversationId),

View File

@@ -18,6 +18,7 @@ import { conversationKeys } from './query-keys';
import { takePreparedRealtimeSession } from './prepared-session-registry';
import {
type ErrorCallback,
type RealtimeSessionUiOwner,
type StreamingTextCallback,
type TtsSegmentPayload,
type RealtimeSession,
@@ -219,6 +220,9 @@ export function useRealtimeSession({
}: UseRealtimeSessionOptions): RealtimeSessionState {
const queryClient = useQueryClient();
const sessionRef = useRef<RealtimeSession | null>(null);
const uiOwnerRef = useRef<RealtimeSessionUiOwner>(
Symbol('conversation-screen-ui'),
);
const uiRef = useRef({
handleStreamingText: (() => {}) as StreamingTextCallback,
handleError: (() => {}) as ErrorCallback,
@@ -300,20 +304,23 @@ export function useRealtimeSession({
prepared,
);
session.attachUiCallbacks({
onStreamingText: (text, isComplete) => {
uiRef.current.handleStreamingText(text, isComplete);
session.attachUiCallbacks(
{
onStreamingText: (text, isComplete) => {
uiRef.current.handleStreamingText(text, isComplete);
},
onTtsSegment: (payload) => uiRef.current.onTtsSegment?.(payload),
onError: (message, code) => uiRef.current.handleError(message, code),
onStateChange: setConnectionState,
},
onTtsSegment: (payload) => uiRef.current.onTtsSegment?.(payload),
onError: (message, code) => uiRef.current.handleError(message, code),
onStateChange: setConnectionState,
});
uiOwnerRef.current,
);
sessionRef.current = session;
setConnectionState(session.getConnectionState());
return () => {
releaseConversationWsUi(session);
releaseConversationWsUi(session, uiOwnerRef.current);
sessionRef.current = null;
setConnectionState('disconnected');
setStreamingMessage(null);

View File

@@ -21,6 +21,7 @@ function looksLikeUuidAssistantMessageId(id: string): boolean {
export type StreamingTextCallback = (text: string, isComplete: boolean) => void;
export type ErrorCallback = (message: string, code?: string) => void;
export type RealtimeSessionUiOwner = symbol;
/** WebSocket `tts_audio`:服务端可能只带 base64、只带 COS URL或两者都有 */
export type TtsSegmentPayload = {
@@ -32,6 +33,8 @@ export type TtsSegmentPayload = {
assistantMessageId?: string;
/** 用户点喇叭按需下发时为 true应加入播放队列即使未开「本轮朗读」 */
manual?: boolean;
/** 本段属于用户显式打开 Speak 的自动朗读轮次。 */
autoPlay?: boolean;
};
interface RealtimeSessionOptions {
@@ -63,6 +66,7 @@ export class RealtimeSession {
private onTtsSegment?: (payload: TtsSegmentPayload) => void;
private onError?: ErrorCallback;
private uiStateListener?: WsStateListener;
private uiOwner: RealtimeSessionUiOwner | null = null;
private unsubEvent: (() => void) | null = null;
private unsubState: (() => void) | null = null;
@@ -75,6 +79,18 @@ export class RealtimeSession {
private assistantTurnTtsSync = false;
private pendingTtsByKey = new Map<string, TtsSegmentPayload>();
/** `agent_response` 早于 `tts_audio` 时延后落库,超时后露字且无播放 */
private readonly SYNC_REVEAL_TIMEOUT_MS = 3000;
private deferredSyncCommits = new Map<
string,
{
timeoutId: ReturnType<typeof setTimeout>;
commit: () => void;
index: number;
total: number;
}
>();
private static bufferedTtsKey(
assistantMessageId: string | undefined,
index: number,
@@ -99,13 +115,20 @@ export class RealtimeSession {
}
/** 列表预热接棒或刷新 UI 订阅时替换回调,不重建 WebSocket */
attachUiCallbacks(options: {
onStreamingText?: StreamingTextCallback;
onTtsSegment?: (payload: TtsSegmentPayload) => void;
onError?: ErrorCallback;
onStateChange?: WsStateListener;
}): void {
attachUiCallbacks(
options: {
onStreamingText?: StreamingTextCallback;
onTtsSegment?: (payload: TtsSegmentPayload) => void;
onError?: ErrorCallback;
onStateChange?: WsStateListener;
},
owner?: RealtimeSessionUiOwner,
): void {
if (this.destroyed) return;
if (this.uiOwner && owner !== this.uiOwner) {
return;
}
this.uiOwner = owner ?? null;
if (options.onStreamingText !== undefined) {
this.onStreamingText = options.onStreamingText;
}
@@ -124,6 +147,23 @@ export class RealtimeSession {
}
}
releaseUiCallbacks(
owner: RealtimeSessionUiOwner,
options: {
onStreamingText?: StreamingTextCallback;
onTtsSegment?: (payload: TtsSegmentPayload) => void;
onError?: ErrorCallback;
onStateChange?: WsStateListener;
},
): void {
if (this.destroyed) return;
if (this.uiOwner !== owner) {
return;
}
this.uiOwner = null;
this.attachUiCallbacks(options);
}
async connect(): Promise<void> {
await this.client.connect();
}
@@ -144,6 +184,7 @@ export class RealtimeSession {
/** Returns true if the message was sent over the socket. */
sendText(text: string, options?: { ttsThisTurn?: boolean }): boolean {
this.beginNewOutboundUserTurnCleanup();
const tts = !!options?.ttsThisTurn;
this.assistantTurnTtsSync = tts;
return this.client.sendText(text, { ttsThisTurn: tts });
@@ -160,6 +201,7 @@ export class RealtimeSession {
ttsThisTurn?: boolean;
},
): boolean {
this.beginNewOutboundUserTurnCleanup();
const tts = !!options?.ttsThisTurn;
this.assistantTurnTtsSync = tts;
return this.client.send({
@@ -198,32 +240,99 @@ export class RealtimeSession {
return this.client.sendTtsRequest(body);
}
/** 新开一轮用户发到 WS 之前:清空上轮 sync 残留的缓冲,避免占位 / 错乱 */
private beginNewOutboundUserTurnCleanup(): void {
if (this.deferredSyncCommits.size === 0 && this.pendingTtsByKey.size === 0) {
return;
}
for (const def of this.deferredSyncCommits.values()) {
clearTimeout(def.timeoutId);
try {
def.commit();
} catch {
/* best-effort */
}
}
this.deferredSyncCommits.clear();
this.pendingTtsByKey.clear();
}
// ─── Internal ───
private resetAssistantTtsSyncState(): void {
for (const def of this.deferredSyncCommits.values()) {
clearTimeout(def.timeoutId);
try {
def.commit();
} catch {
/* 取消打断时尽量不丢正文 */
}
}
this.deferredSyncCommits.clear();
this.assistantTurnTtsSync = false;
this.pendingTtsByKey.clear();
}
private flushBufferedTtsIfSync(
/** sync 模式下取出缓冲的该段 TTS调用方需先落缓存再转发方便 UI 绑定 URL */
private takeBufferedTtsIfSync(
assistantMessageId: string | undefined,
index: number,
): void {
if (!this.assistantTurnTtsSync) return;
): TtsSegmentPayload | null {
if (!this.assistantTurnTtsSync) return null;
const key = RealtimeSession.bufferedTtsKey(assistantMessageId, index);
const payload = this.pendingTtsByKey.get(key);
if (payload) {
if (!payload) return null;
this.pendingTtsByKey.delete(key);
return payload;
}
/**
* 正文已先于音频到达:`commit` 延至收到 `tts_audio` 或超时(无音频路径则照常露字)
*/
private scheduleDeferredSyncCommit(
key: string,
index: number,
total: number,
commit: () => void,
): void {
const timeoutId = setTimeout(() => {
const def = this.deferredSyncCommits.get(key);
if (!def || def.timeoutId !== timeoutId) return;
this.deferredSyncCommits.delete(key);
this.pendingTtsByKey.delete(key);
this.onTtsSegment?.(payload);
}
def.commit();
this.finishAssistantTurnIfLastSegment(def.index, def.total);
}, this.SYNC_REVEAL_TIMEOUT_MS);
this.deferredSyncCommits.set(key, { timeoutId, commit, index, total });
}
/** 迟到 `tts_audio` 与延后落库会合:先写缓存再入队播放,确保 URL 能绑定到气泡 */
private tryResolveDeferredSyncWithIncomingTts(
key: string,
incoming: TtsSegmentPayload,
): boolean {
const def = this.deferredSyncCommits.get(key);
if (!def) return false;
clearTimeout(def.timeoutId);
this.deferredSyncCommits.delete(key);
def.commit();
this.onTtsSegment?.(incoming);
this.finishAssistantTurnIfLastSegment(def.index, def.total);
return true;
}
private finishAssistantTurnIfLastSegment(index: number, total: number): void {
if (index >= total - 1) {
this.resetAssistantTtsSyncState();
this.assistantTurnTtsSync = false;
this.pendingTtsByKey.clear();
}
}
/** sync 多段回复不走 footer 流式展示,但仍要清掉「正在回复」占位气泡。 */
private clearAssistantPendingUi(): void {
this.onStreamingText?.('', true);
}
private handleEvent: WsEventListener = (event: WsEvent) => {
if (event.kind === 'agent_response') {
this.handleAgentChunk(event);
@@ -244,12 +353,21 @@ export class RealtimeSession {
assistantMessageId: event.assistantMessageId,
manual: event.manual,
};
if (this.assistantTurnTtsSync && !payload.manual) {
const idx = event.index ?? 0;
const key = RealtimeSession.bufferedTtsKey(
event.assistantMessageId,
idx,
);
payload.autoPlay = true;
const resolvedDeferred = this.tryResolveDeferredSyncWithIncomingTts(
key,
payload,
);
if (resolvedDeferred) {
return;
}
this.pendingTtsByKey.set(key, payload);
} else {
this.onTtsSegment?.(payload);
@@ -292,11 +410,32 @@ export class RealtimeSession {
? assistantSegmentMessageId(event.assistantMessageId, index)
: `${this.conversationId}_agent_${Date.now()}_${index}`;
if (sync) {
this.flushBufferedTtsIfSync(event.assistantMessageId, index);
const bufferedTts = this.takeBufferedTtsIfSync(
event.assistantMessageId,
index,
);
if (bufferedTts) {
this.commitOneAssistantMessage(event.text, id);
this.clearAssistantPendingUi();
this.onTtsSegment?.(bufferedTts);
this.finishAssistantTurnIfLastSegment(index, total);
} else {
const key = RealtimeSession.bufferedTtsKey(
event.assistantMessageId,
index,
);
const textCaptured = event.text;
const idCaptured = id;
this.scheduleDeferredSyncCommit(key, index, total, () => {
this.commitOneAssistantMessage(textCaptured, idCaptured);
this.clearAssistantPendingUi();
});
}
} else {
this.commitOneAssistantMessage(event.text, id);
this.onStreamingText?.(event.text, true);
this.finishAssistantTurnIfLastSegment(index, total);
}
this.commitOneAssistantMessage(event.text, id);
this.onStreamingText?.(event.text, true);
this.finishAssistantTurnIfLastSegment(index, total);
return;
}
@@ -317,18 +456,40 @@ export class RealtimeSession {
const id =
this.pendingAssistantMessageId ??
`${this.conversationId}_agent_${Date.now()}`;
let finishSyncTurnNow = false;
if (sync) {
this.flushBufferedTtsIfSync(assistantId ?? undefined, 0);
this.commitStreamingBufferWithId(id);
const visible =
this.streamingBuffer.trim().length > 0 ? this.streamingBuffer : '…';
this.onStreamingText?.(visible, true);
const bufferedTts = this.takeBufferedTtsIfSync(
assistantId ?? undefined,
0,
);
if (bufferedTts) {
this.commitStreamingBufferWithId(id);
const visible =
this.streamingBuffer.trim().length > 0 ? this.streamingBuffer : '…';
this.onStreamingText?.(visible, true);
this.onTtsSegment?.(bufferedTts);
finishSyncTurnNow = true;
} else {
const snapshot = this.streamingBuffer;
const key = RealtimeSession.bufferedTtsKey(assistantId ?? undefined, 0);
const idCaptured = id;
this.scheduleDeferredSyncCommit(key, 0, 1, () => {
this.streamingBuffer = snapshot;
this.commitStreamingBufferWithId(idCaptured);
this.streamingBuffer = '';
const visible = snapshot.trim().length > 0 ? snapshot : '…';
this.onStreamingText?.(visible, true);
});
}
} else {
this.commitStreamingBufferWithId(id);
finishSyncTurnNow = true;
}
this.streamingBuffer = '';
this.pendingAssistantMessageId = null;
this.finishAssistantTurnIfLastSegment(0, 1);
if (!sync || finishSyncTurnNow) {
this.finishAssistantTurnIfLastSegment(0, 1);
}
}
}

View File

@@ -94,6 +94,12 @@ export function usePlayer(): UsePlayerResult {
const acquired = await audioFocus.acquireForPlayback();
if (!acquired) {
/**
* 录音占用时 acquire 失败且队列尚未 shift若用户进入会话前焦点已在
* `recorder`,可能不会再次触发 `onOwnerChange('recorder')`,旧的
* `wasBlockedByRecorderRef` 不会被置位,录音结束后也不会重试 playNext。
*/
wasBlockedByRecorderRef.current = true;
setStatus('idle');
return;
}
@@ -172,14 +178,17 @@ export function usePlayer(): UsePlayerResult {
if (owner === null && wasBlockedByRecorderRef.current) {
wasBlockedByRecorderRef.current = false;
if (queueRef.current.length > 0 && status === 'idle') {
playNext();
if (
queueRef.current.length > 0 &&
playbackActiveUriRef.current === null
) {
void playNext();
}
}
});
return unsub;
}, [status, currentSource, playNext]);
}, [currentSource, playNext]);
const enqueue = useCallback(
async (item: PlaybackItem) => {