fix mobile chat run reconnect (#993)
This commit is contained in:
@@ -72,6 +72,19 @@ export interface RunEvent {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface ResumeSessionPayload {
|
||||||
|
session_id: string
|
||||||
|
messages: any[]
|
||||||
|
isWorking: boolean
|
||||||
|
isAborting?: boolean
|
||||||
|
events: Array<{ event: string; data: RunEvent }>
|
||||||
|
inputTokens?: number
|
||||||
|
outputTokens?: number
|
||||||
|
contextTokens?: number
|
||||||
|
queueLength?: number
|
||||||
|
queueMessages?: RunEvent['queued_messages']
|
||||||
|
}
|
||||||
|
|
||||||
// ============================
|
// ============================
|
||||||
// Socket.IO chat run connection
|
// Socket.IO chat run connection
|
||||||
// ============================
|
// ============================
|
||||||
@@ -80,6 +93,12 @@ let chatRunSocket: Socket | null = null
|
|||||||
let globalListenersRegistered = false
|
let globalListenersRegistered = false
|
||||||
let chatRunSocketProfile: string | null = null
|
let chatRunSocketProfile: string | null = null
|
||||||
|
|
||||||
|
const TRANSIENT_DISCONNECT_REASONS = new Set<string>([
|
||||||
|
'transport close',
|
||||||
|
'transport error',
|
||||||
|
'ping timeout',
|
||||||
|
])
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Session event handlers map
|
* Session event handlers map
|
||||||
* Maps session_id to event handling functions for isolating concurrent session streams
|
* Maps session_id to event handling functions for isolating concurrent session streams
|
||||||
@@ -597,7 +616,7 @@ function removeSocketListener(socket: Socket, event: string, handler: (...args:
|
|||||||
*/
|
*/
|
||||||
export function resumeSession(
|
export function resumeSession(
|
||||||
sessionId: string,
|
sessionId: string,
|
||||||
onResumed: (data: { session_id: string; messages: any[]; isWorking: boolean; isAborting?: boolean; events: any[]; inputTokens?: number; outputTokens?: number; contextTokens?: number; queueLength?: number; queueMessages?: RunEvent['queued_messages'] }) => void,
|
onResumed: (data: ResumeSessionPayload) => void,
|
||||||
profile?: string | null,
|
profile?: string | null,
|
||||||
): Socket {
|
): Socket {
|
||||||
const socket = connectChatRun(profile)
|
const socket = connectChatRun(profile)
|
||||||
@@ -614,6 +633,9 @@ export function startRunViaSocket(
|
|||||||
onDone: () => void,
|
onDone: () => void,
|
||||||
onError: (err: Error) => void,
|
onError: (err: Error) => void,
|
||||||
onStarted?: (runId: string) => void,
|
onStarted?: (runId: string) => void,
|
||||||
|
options?: {
|
||||||
|
onReconnectResume?: (data: ResumeSessionPayload) => void
|
||||||
|
},
|
||||||
): { abort: () => void } {
|
): { abort: () => void } {
|
||||||
const sid = body.session_id
|
const sid = body.session_id
|
||||||
if (!sid) {
|
if (!sid) {
|
||||||
@@ -622,24 +644,6 @@ export function startRunViaSocket(
|
|||||||
|
|
||||||
let closed = false
|
let closed = false
|
||||||
const socket = connectChatRun(body.profile)
|
const socket = connectChatRun(body.profile)
|
||||||
const handleSocketError = (err: Error) => {
|
|
||||||
if (closed) return
|
|
||||||
closed = true
|
|
||||||
sessionEventHandlers.delete(sid)
|
|
||||||
onError(err)
|
|
||||||
}
|
|
||||||
socket.once('connect_error', handleSocketError)
|
|
||||||
const handleSocketDisconnect = (reason: string) => {
|
|
||||||
if (closed || reason === 'io client disconnect') return
|
|
||||||
handleSocketError(new Error(`Socket disconnected: ${reason}`))
|
|
||||||
}
|
|
||||||
socket.once('disconnect', handleSocketDisconnect)
|
|
||||||
|
|
||||||
const removeTerminalSocketListeners = () => {
|
|
||||||
removeSocketListener(socket, 'connect_error', handleSocketError)
|
|
||||||
removeSocketListener(socket, 'disconnect', handleSocketDisconnect)
|
|
||||||
}
|
|
||||||
|
|
||||||
if (sessionEventHandlers.has(sid)) {
|
if (sessionEventHandlers.has(sid)) {
|
||||||
socket.emit('run', body)
|
socket.emit('run', body)
|
||||||
return {
|
return {
|
||||||
@@ -651,6 +655,66 @@ export function startRunViaSocket(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let sawTransientDisconnect = false
|
||||||
|
let removeTerminalSocketListeners: () => void = () => {}
|
||||||
|
let reconnectResumeHandler: ((data: ResumeSessionPayload) => void) | null = null
|
||||||
|
|
||||||
|
const clearReconnectResumeHandler = () => {
|
||||||
|
if (!reconnectResumeHandler) return
|
||||||
|
removeSocketListener(socket, 'resumed', reconnectResumeHandler)
|
||||||
|
reconnectResumeHandler = null
|
||||||
|
}
|
||||||
|
|
||||||
|
const emitReconnectResume = () => {
|
||||||
|
clearReconnectResumeHandler()
|
||||||
|
if (options?.onReconnectResume) {
|
||||||
|
reconnectResumeHandler = (data: ResumeSessionPayload) => {
|
||||||
|
clearReconnectResumeHandler()
|
||||||
|
if (closed || data.session_id !== sid) return
|
||||||
|
options.onReconnectResume?.(data)
|
||||||
|
}
|
||||||
|
socket.on('resumed', reconnectResumeHandler)
|
||||||
|
}
|
||||||
|
socket.emit('resume', { session_id: sid, ...(body.profile ? { profile: body.profile } : {}) })
|
||||||
|
}
|
||||||
|
|
||||||
|
const handleSocketError = (err: Error) => {
|
||||||
|
if (closed) return
|
||||||
|
closed = true
|
||||||
|
removeTerminalSocketListeners()
|
||||||
|
sessionEventHandlers.delete(sid)
|
||||||
|
onError(err)
|
||||||
|
}
|
||||||
|
const handleSocketConnectError = (err: Error) => {
|
||||||
|
if (closed) return
|
||||||
|
if (sawTransientDisconnect) return
|
||||||
|
handleSocketError(err)
|
||||||
|
}
|
||||||
|
socket.on('connect_error', handleSocketConnectError)
|
||||||
|
const handleSocketDisconnect = (reason: string) => {
|
||||||
|
if (closed || reason === 'io client disconnect') return
|
||||||
|
if (TRANSIENT_DISCONNECT_REASONS.has(reason)) {
|
||||||
|
sawTransientDisconnect = true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
handleSocketError(new Error(`Socket disconnected: ${reason}`))
|
||||||
|
}
|
||||||
|
socket.on('disconnect', handleSocketDisconnect)
|
||||||
|
|
||||||
|
const handleSocketReconnect = () => {
|
||||||
|
if (closed || !sawTransientDisconnect) return
|
||||||
|
sawTransientDisconnect = false
|
||||||
|
emitReconnectResume()
|
||||||
|
}
|
||||||
|
socket.on('connect', handleSocketReconnect)
|
||||||
|
|
||||||
|
removeTerminalSocketListeners = () => {
|
||||||
|
clearReconnectResumeHandler()
|
||||||
|
removeSocketListener(socket, 'connect_error', handleSocketConnectError)
|
||||||
|
removeSocketListener(socket, 'disconnect', handleSocketDisconnect)
|
||||||
|
removeSocketListener(socket, 'connect', handleSocketReconnect)
|
||||||
|
}
|
||||||
|
|
||||||
// Define event handlers for this session
|
// Define event handlers for this session
|
||||||
const handlers = {
|
const handlers = {
|
||||||
onMessageDelta: (evt: RunEvent) => {
|
onMessageDelta: (evt: RunEvent) => {
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import { startRunViaSocket, resumeSession, registerSessionHandlers, unregisterSessionHandlers, getChatRunSocket, respondToolApproval, onPeerUserMessage, respondClarify, type RunEvent, type ContentBlock as ContentBlockImport } from '@/api/hermes/chat'
|
import { startRunViaSocket, resumeSession, registerSessionHandlers, unregisterSessionHandlers, getChatRunSocket, respondToolApproval, onPeerUserMessage, respondClarify, type RunEvent, type ResumeSessionPayload, type ContentBlock as ContentBlockImport } from '@/api/hermes/chat'
|
||||||
import { deleteSession as deleteSessionApi, fetchSession, fetchSessions, setSessionModel, type HermesMessage, type SessionSummary } from '@/api/hermes/sessions'
|
import { deleteSession as deleteSessionApi, fetchSession, fetchSessions, setSessionModel, type HermesMessage, type SessionSummary } from '@/api/hermes/sessions'
|
||||||
import { getActiveProfileName } from '@/api/client'
|
import { getActiveProfileName } from '@/api/client'
|
||||||
import { getDownloadUrl } from '@/api/hermes/download'
|
import { getDownloadUrl } from '@/api/hermes/download'
|
||||||
@@ -1277,6 +1277,107 @@ export const useChatStore = defineStore('chat', () => {
|
|||||||
activeAssistantMessageId = null
|
activeAssistantMessageId = null
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const applyReconnectResume = (data: ResumeSessionPayload) => {
|
||||||
|
if (data.session_id !== sid) return
|
||||||
|
const target = sessions.value.find(s => s.id === sid)
|
||||||
|
if (!target) return
|
||||||
|
|
||||||
|
if (data.isWorking) serverWorking.value.add(sid)
|
||||||
|
else serverWorking.value.delete(sid)
|
||||||
|
|
||||||
|
if (data.queueLength && data.queueLength > 0) {
|
||||||
|
queueLengths.value.set(sid, data.queueLength)
|
||||||
|
} else {
|
||||||
|
queueLengths.value.delete(sid)
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Array.isArray(data.queueMessages)) {
|
||||||
|
replaceQueuedUserMessages(sid, normalizeQueuedUserMessages(data.queueMessages))
|
||||||
|
} else if (!data.queueLength) {
|
||||||
|
replaceQueuedUserMessages(sid, [])
|
||||||
|
}
|
||||||
|
|
||||||
|
if (data.isAborting) {
|
||||||
|
setAbortState({ aborting: true, synced: null })
|
||||||
|
} else if (!data.isWorking) {
|
||||||
|
setAbortState(null)
|
||||||
|
}
|
||||||
|
|
||||||
|
if (data.inputTokens != null) target.inputTokens = data.inputTokens
|
||||||
|
if (data.outputTokens != null) target.outputTokens = data.outputTokens
|
||||||
|
if (data.contextTokens != null) target.contextTokens = data.contextTokens
|
||||||
|
|
||||||
|
if (Array.isArray(data.messages)) {
|
||||||
|
target.messages = mapHermesMessages(data.messages as any[])
|
||||||
|
const lastAssistant = [...target.messages].reverse().find(m => m.role === 'assistant')
|
||||||
|
if (data.isWorking && lastAssistant) {
|
||||||
|
lastAssistant.isStreaming = true
|
||||||
|
activeAssistantMessageId = lastAssistant.id
|
||||||
|
if (lastAssistant.reasoning) noteReasoningStart(lastAssistant.id)
|
||||||
|
} else {
|
||||||
|
activeAssistantMessageId = null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (data.events?.length) {
|
||||||
|
for (const evt of data.events) {
|
||||||
|
const e = evt.data as RunEvent
|
||||||
|
switch (e.event) {
|
||||||
|
case 'compression.started':
|
||||||
|
setCompressionState({
|
||||||
|
compressing: true,
|
||||||
|
messageCount: (e as any).message_count || 0,
|
||||||
|
beforeTokens: (e as any).token_count || 0,
|
||||||
|
afterTokens: 0,
|
||||||
|
compressed: null,
|
||||||
|
})
|
||||||
|
break
|
||||||
|
case 'compression.completed': {
|
||||||
|
const afterTokens = (e as any).contextTokens || (e as any).afterTokens || 0
|
||||||
|
setCompressionState({
|
||||||
|
compressing: false,
|
||||||
|
messageCount: (e as any).totalMessages || 0,
|
||||||
|
beforeTokens: (e as any).beforeTokens || 0,
|
||||||
|
afterTokens,
|
||||||
|
compressed: (e as any).compressed ?? false,
|
||||||
|
error: (e as any).error,
|
||||||
|
})
|
||||||
|
if ((e as any).contextTokens != null) target.contextTokens = (e as any).contextTokens
|
||||||
|
break
|
||||||
|
}
|
||||||
|
case 'abort.started':
|
||||||
|
setAbortState({ aborting: true, synced: null })
|
||||||
|
break
|
||||||
|
case 'abort.completed':
|
||||||
|
setAbortState({ aborting: false, synced: (e as any).synced ?? false })
|
||||||
|
break
|
||||||
|
case 'approval.requested':
|
||||||
|
setPendingApproval({ ...e, session_id: sid })
|
||||||
|
break
|
||||||
|
case 'approval.resolved':
|
||||||
|
clearPendingApproval({ ...e, session_id: sid })
|
||||||
|
break
|
||||||
|
case 'clarify.requested':
|
||||||
|
setPendingClarify({ ...e, session_id: sid })
|
||||||
|
break
|
||||||
|
case 'clarify.resolved':
|
||||||
|
clearPendingClarify({ ...e, session_id: sid })
|
||||||
|
break
|
||||||
|
case 'run.failed':
|
||||||
|
addAgentErrorMessage(sid, e.error)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (activeSessionId.value === sid) activeSession.value = target
|
||||||
|
if (!data.isWorking && !(data.queueLength && data.queueLength > 0)) {
|
||||||
|
cleanup()
|
||||||
|
activeAssistantMessageId = null
|
||||||
|
updateSessionTitle(sid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Send run via Socket.IO and listen to streamed events — all closures capture `sid`
|
// Send run via Socket.IO and listen to streamed events — all closures capture `sid`
|
||||||
const ctrl = startRunViaSocket(
|
const ctrl = startRunViaSocket(
|
||||||
runPayload,
|
runPayload,
|
||||||
@@ -1694,6 +1795,7 @@ export const useChatStore = defineStore('chat', () => {
|
|||||||
cleanup()
|
cleanup()
|
||||||
},
|
},
|
||||||
undefined,
|
undefined,
|
||||||
|
{ onReconnectResume: applyReconnectResume },
|
||||||
)
|
)
|
||||||
|
|
||||||
if (!isBridgeSlashCommand || isBridgeCompressCommand) {
|
if (!isBridgeSlashCommand || isBridgeCompressCommand) {
|
||||||
|
|||||||
@@ -0,0 +1,160 @@
|
|||||||
|
import { beforeEach, describe, expect, it, vi } from 'vitest'
|
||||||
|
|
||||||
|
const socketState = vi.hoisted(() => ({
|
||||||
|
sockets: [] as any[],
|
||||||
|
}))
|
||||||
|
|
||||||
|
vi.mock('socket.io-client', () => {
|
||||||
|
function createSocket() {
|
||||||
|
const listeners = new Map<string, Set<(...args: any[]) => void>>()
|
||||||
|
|
||||||
|
const addListener = (event: string, handler: (...args: any[]) => void) => {
|
||||||
|
if (!listeners.has(event)) listeners.set(event, new Set())
|
||||||
|
listeners.get(event)!.add(handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
const removeListener = (event: string, handler: (...args: any[]) => void) => {
|
||||||
|
const eventListeners = listeners.get(event)
|
||||||
|
if (!eventListeners) return
|
||||||
|
for (const candidate of [...eventListeners]) {
|
||||||
|
if (candidate === handler || (candidate as any).__original === handler) {
|
||||||
|
eventListeners.delete(candidate)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const socket: any = {
|
||||||
|
connected: true,
|
||||||
|
on: vi.fn((event: string, handler: (...args: any[]) => void) => {
|
||||||
|
addListener(event, handler)
|
||||||
|
return socket
|
||||||
|
}),
|
||||||
|
once: vi.fn((event: string, handler: (...args: any[]) => void) => {
|
||||||
|
const wrapped = (...args: any[]) => {
|
||||||
|
removeListener(event, wrapped)
|
||||||
|
handler(...args)
|
||||||
|
}
|
||||||
|
;(wrapped as any).__original = handler
|
||||||
|
addListener(event, wrapped)
|
||||||
|
return socket
|
||||||
|
}),
|
||||||
|
off: vi.fn((event: string, handler: (...args: any[]) => void) => {
|
||||||
|
removeListener(event, handler)
|
||||||
|
return socket
|
||||||
|
}),
|
||||||
|
removeListener: vi.fn((event: string, handler: (...args: any[]) => void) => {
|
||||||
|
removeListener(event, handler)
|
||||||
|
return socket
|
||||||
|
}),
|
||||||
|
removeAllListeners: vi.fn(() => {
|
||||||
|
listeners.clear()
|
||||||
|
return socket
|
||||||
|
}),
|
||||||
|
emit: vi.fn(),
|
||||||
|
disconnect: vi.fn(() => {
|
||||||
|
socket.connected = false
|
||||||
|
}),
|
||||||
|
__listenerCount: (event: string) => listeners.get(event)?.size || 0,
|
||||||
|
__trigger: (event: string, ...args: any[]) => {
|
||||||
|
if (event === 'connect') socket.connected = true
|
||||||
|
if (event === 'disconnect') socket.connected = false
|
||||||
|
for (const handler of [...(listeners.get(event) || [])]) handler(...args)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
return socket
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
io: vi.fn(() => {
|
||||||
|
const socket = createSocket()
|
||||||
|
socketState.sockets.push(socket)
|
||||||
|
return socket
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
vi.mock('../../packages/client/src/api/client', () => ({
|
||||||
|
getApiKey: () => 'test-token',
|
||||||
|
getBaseUrlValue: () => '',
|
||||||
|
}))
|
||||||
|
|
||||||
|
describe('chat-run socket reconnect handling', () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
vi.resetModules()
|
||||||
|
socketState.sockets = []
|
||||||
|
})
|
||||||
|
|
||||||
|
it('keeps transient mobile disconnects alive and resumes after reconnect', async () => {
|
||||||
|
const { startRunViaSocket } = await import('../../packages/client/src/api/hermes/chat')
|
||||||
|
const onEvent = vi.fn()
|
||||||
|
const onDone = vi.fn()
|
||||||
|
const onError = vi.fn()
|
||||||
|
const onReconnectResume = vi.fn()
|
||||||
|
|
||||||
|
startRunViaSocket(
|
||||||
|
{ session_id: 'session-1', input: 'hello', profile: 'default', source: 'cli' },
|
||||||
|
onEvent,
|
||||||
|
onDone,
|
||||||
|
onError,
|
||||||
|
undefined,
|
||||||
|
{ onReconnectResume },
|
||||||
|
)
|
||||||
|
|
||||||
|
const socket = socketState.sockets[0]
|
||||||
|
expect(socket.emit).toHaveBeenCalledWith('run', expect.objectContaining({ session_id: 'session-1' }))
|
||||||
|
|
||||||
|
socket.__trigger('disconnect', 'ping timeout')
|
||||||
|
expect(onError).not.toHaveBeenCalled()
|
||||||
|
|
||||||
|
socket.__trigger('connect_error', new Error('temporary reconnect failure'))
|
||||||
|
expect(onError).not.toHaveBeenCalled()
|
||||||
|
|
||||||
|
socket.__trigger('connect')
|
||||||
|
expect(socket.emit).toHaveBeenCalledWith('resume', { session_id: 'session-1', profile: 'default' })
|
||||||
|
|
||||||
|
const resumed = { session_id: 'session-1', messages: [], isWorking: true, events: [] }
|
||||||
|
socket.__trigger('resumed', resumed)
|
||||||
|
expect(onReconnectResume).toHaveBeenCalledWith(resumed)
|
||||||
|
|
||||||
|
socket.__trigger('message.delta', { event: 'message.delta', session_id: 'session-1', delta: 'after reconnect' })
|
||||||
|
expect(onEvent).toHaveBeenCalledWith({ event: 'message.delta', session_id: 'session-1', delta: 'after reconnect' })
|
||||||
|
expect(onDone).not.toHaveBeenCalled()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('keeps fatal disconnects fatal and removes per-run listeners', async () => {
|
||||||
|
const { startRunViaSocket } = await import('../../packages/client/src/api/hermes/chat')
|
||||||
|
const onError = vi.fn()
|
||||||
|
|
||||||
|
startRunViaSocket(
|
||||||
|
{ session_id: 'session-1', input: 'hello', profile: 'default', source: 'cli' },
|
||||||
|
vi.fn(),
|
||||||
|
vi.fn(),
|
||||||
|
onError,
|
||||||
|
)
|
||||||
|
|
||||||
|
const socket = socketState.sockets[0]
|
||||||
|
socket.__trigger('disconnect', 'io server disconnect')
|
||||||
|
|
||||||
|
expect(onError).toHaveBeenCalledOnce()
|
||||||
|
expect(onError.mock.calls[0][0].message).toBe('Socket disconnected: io server disconnect')
|
||||||
|
expect(socket.__listenerCount('connect')).toBe(0)
|
||||||
|
expect(socket.__listenerCount('disconnect')).toBe(0)
|
||||||
|
expect(socket.__listenerCount('connect_error')).toBe(0)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('does not attach extra reconnect listeners when the session already has handlers', async () => {
|
||||||
|
const { startRunViaSocket } = await import('../../packages/client/src/api/hermes/chat')
|
||||||
|
const body = { session_id: 'session-1', input: 'hello', profile: 'default', source: 'cli' as const }
|
||||||
|
|
||||||
|
startRunViaSocket(body, vi.fn(), vi.fn(), vi.fn())
|
||||||
|
const socket = socketState.sockets[0]
|
||||||
|
expect(socket.__listenerCount('connect')).toBe(1)
|
||||||
|
expect(socket.__listenerCount('disconnect')).toBe(1)
|
||||||
|
|
||||||
|
startRunViaSocket(body, vi.fn(), vi.fn(), vi.fn())
|
||||||
|
expect(socket.__listenerCount('connect')).toBe(1)
|
||||||
|
expect(socket.__listenerCount('disconnect')).toBe(1)
|
||||||
|
expect(socket.emit).toHaveBeenCalledWith('run', body)
|
||||||
|
})
|
||||||
|
})
|
||||||
Reference in New Issue
Block a user