fix(chat): isolate concurrent session events by refactoring WebSocket event handling (#365)
Refactored the WebSocket event handling mechanism to use global listeners with session-specific event routing instead of per-session listeners. This prevents event cross-talk when multiple chat sessions run concurrently. Key changes: - Client: Added sessionEventHandlers Map to route events to appropriate sessions - Client: Registered global listeners once per socket connection - Server: Extracted message processing logic into handleMessage method - Server: Improved Hermes session ID tracking with dedicated Map - Server: Added replaceByHermesSessionId for targeted message replacement Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -47,6 +47,228 @@ export interface RunEvent {
|
||||
// ============================
|
||||
|
||||
let chatRunSocket: Socket | null = null
|
||||
let globalListenersRegistered = false
|
||||
|
||||
/**
|
||||
* Session event handlers map
|
||||
* Maps session_id to event handling functions for isolating concurrent session streams
|
||||
*/
|
||||
const sessionEventHandlers = new Map<string, {
|
||||
onMessageDelta: (event: RunEvent) => void
|
||||
onReasoningDelta: (event: RunEvent) => void
|
||||
onThinkingDelta: (event: RunEvent) => void
|
||||
onReasoningAvailable: (event: RunEvent) => void
|
||||
onToolStarted: (event: RunEvent) => void
|
||||
onToolCompleted: (event: RunEvent) => void
|
||||
onRunStarted: (event: RunEvent) => void
|
||||
onRunCompleted: (event: RunEvent) => void
|
||||
onRunFailed: (event: RunEvent) => void
|
||||
onCompressionStarted: (event: RunEvent) => void
|
||||
onCompressionCompleted: (event: RunEvent) => void
|
||||
onUsageUpdated: (event: RunEvent) => void
|
||||
}>()
|
||||
|
||||
/**
|
||||
* Global message.delta event handler
|
||||
* Distributes events to appropriate session based on session_id
|
||||
*/
|
||||
function globalMessageDeltaHandler(event: RunEvent): void {
|
||||
const sid = event.session_id
|
||||
if (!sid) return
|
||||
|
||||
const handlers = sessionEventHandlers.get(sid)
|
||||
if (handlers?.onMessageDelta) {
|
||||
handlers.onMessageDelta(event)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Global reasoning.delta event handler
|
||||
*/
|
||||
function globalReasoningDeltaHandler(event: RunEvent): void {
|
||||
const sid = event.session_id
|
||||
if (!sid) return
|
||||
|
||||
const handlers = sessionEventHandlers.get(sid)
|
||||
if (handlers?.onReasoningDelta) {
|
||||
handlers.onReasoningDelta(event)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Global thinking.delta event handler (alias for reasoning.delta)
|
||||
*/
|
||||
function globalThinkingDeltaHandler(event: RunEvent): void {
|
||||
const sid = event.session_id
|
||||
if (!sid) return
|
||||
|
||||
const handlers = sessionEventHandlers.get(sid)
|
||||
if (handlers?.onThinkingDelta) {
|
||||
handlers.onThinkingDelta(event)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Global reasoning.available event handler
|
||||
*/
|
||||
function globalReasoningAvailableHandler(event: RunEvent): void {
|
||||
const sid = event.session_id
|
||||
if (!sid) return
|
||||
|
||||
const handlers = sessionEventHandlers.get(sid)
|
||||
if (handlers?.onReasoningAvailable) {
|
||||
handlers.onReasoningAvailable(event)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Global tool.started event handler
|
||||
*/
|
||||
function globalToolStartedHandler(event: RunEvent): void {
|
||||
const sid = event.session_id
|
||||
if (!sid) return
|
||||
|
||||
const handlers = sessionEventHandlers.get(sid)
|
||||
if (handlers?.onToolStarted) {
|
||||
handlers.onToolStarted(event)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Global tool.completed event handler
|
||||
*/
|
||||
function globalToolCompletedHandler(event: RunEvent): void {
|
||||
const sid = event.session_id
|
||||
if (!sid) return
|
||||
|
||||
const handlers = sessionEventHandlers.get(sid)
|
||||
if (handlers?.onToolCompleted) {
|
||||
handlers.onToolCompleted(event)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Global run.started event handler
|
||||
*/
|
||||
function globalRunStartedHandler(event: RunEvent): void {
|
||||
const sid = event.session_id
|
||||
if (!sid) return
|
||||
|
||||
const handlers = sessionEventHandlers.get(sid)
|
||||
if (handlers?.onRunStarted) {
|
||||
handlers.onRunStarted(event)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Global run.completed event handler
|
||||
*/
|
||||
function globalRunCompletedHandler(event: RunEvent): void {
|
||||
const sid = event.session_id
|
||||
if (!sid) return
|
||||
|
||||
const handlers = sessionEventHandlers.get(sid)
|
||||
if (handlers?.onRunCompleted) {
|
||||
handlers.onRunCompleted(event)
|
||||
}
|
||||
|
||||
// Auto-cleanup session handlers on completion
|
||||
sessionEventHandlers.delete(sid)
|
||||
}
|
||||
|
||||
/**
|
||||
* Global run.failed event handler
|
||||
*/
|
||||
function globalRunFailedHandler(event: RunEvent): void {
|
||||
const sid = event.session_id
|
||||
if (!sid) return
|
||||
|
||||
const handlers = sessionEventHandlers.get(sid)
|
||||
if (handlers?.onRunFailed) {
|
||||
handlers.onRunFailed(event)
|
||||
}
|
||||
|
||||
// Auto-cleanup session handlers on failure
|
||||
sessionEventHandlers.delete(sid)
|
||||
}
|
||||
|
||||
/**
|
||||
* Global compression.started event handler
|
||||
*/
|
||||
function globalCompressionStartedHandler(event: RunEvent): void {
|
||||
const sid = event.session_id
|
||||
if (!sid) return
|
||||
|
||||
const handlers = sessionEventHandlers.get(sid)
|
||||
if (handlers?.onCompressionStarted) {
|
||||
handlers.onCompressionStarted(event)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Global compression.completed event handler
|
||||
*/
|
||||
function globalCompressionCompletedHandler(event: RunEvent): void {
|
||||
const sid = event.session_id
|
||||
if (!sid) return
|
||||
|
||||
const handlers = sessionEventHandlers.get(sid)
|
||||
if (handlers?.onCompressionCompleted) {
|
||||
handlers.onCompressionCompleted(event)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Global usage.updated event handler
|
||||
*/
|
||||
function globalUsageUpdatedHandler(event: RunEvent): void {
|
||||
const sid = event.session_id
|
||||
if (!sid) return
|
||||
|
||||
const handlers = sessionEventHandlers.get(sid)
|
||||
if (handlers?.onUsageUpdated) {
|
||||
handlers.onUsageUpdated(event)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register event handlers for a session
|
||||
* @param sessionId - Session ID
|
||||
* @param handlers - Event handling functions
|
||||
* @returns Cleanup function to unregister handlers
|
||||
*/
|
||||
export function registerSessionHandlers(
|
||||
sessionId: string,
|
||||
handlers: {
|
||||
onMessageDelta: (event: RunEvent) => void
|
||||
onReasoningDelta: (event: RunEvent) => void
|
||||
onThinkingDelta: (event: RunEvent) => void
|
||||
onReasoningAvailable: (event: RunEvent) => void
|
||||
onToolStarted: (event: RunEvent) => void
|
||||
onToolCompleted: (event: RunEvent) => void
|
||||
onRunStarted: (event: RunEvent) => void
|
||||
onRunCompleted: (event: RunEvent) => void
|
||||
onRunFailed: (event: RunEvent) => void
|
||||
onCompressionStarted: (event: RunEvent) => void
|
||||
onCompressionCompleted: (event: RunEvent) => void
|
||||
onUsageUpdated: (event: RunEvent) => void
|
||||
}
|
||||
): () => void {
|
||||
sessionEventHandlers.set(sessionId, handlers)
|
||||
|
||||
// Return cleanup function
|
||||
return () => {
|
||||
sessionEventHandlers.delete(sessionId)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregister event handlers for a session
|
||||
* @param sessionId - Session ID
|
||||
*/
|
||||
export function unregisterSessionHandlers(sessionId: string): void {
|
||||
sessionEventHandlers.delete(sessionId)
|
||||
}
|
||||
|
||||
export function getChatRunSocket(): Socket | null {
|
||||
return chatRunSocket
|
||||
@@ -59,6 +281,7 @@ export function connectChatRun(): Socket {
|
||||
if (chatRunSocket) {
|
||||
chatRunSocket.removeAllListeners()
|
||||
chatRunSocket.disconnect()
|
||||
globalListenersRegistered = false
|
||||
}
|
||||
|
||||
const baseUrl = getBaseUrlValue()
|
||||
@@ -75,6 +298,33 @@ export function connectChatRun(): Socket {
|
||||
reconnectionDelayMax: 10000,
|
||||
})
|
||||
|
||||
// Register global listeners only once per socket connection
|
||||
if (!globalListenersRegistered) {
|
||||
// Message events
|
||||
chatRunSocket.on('message.delta', globalMessageDeltaHandler)
|
||||
chatRunSocket.on('reasoning.delta', globalReasoningDeltaHandler)
|
||||
chatRunSocket.on('thinking.delta', globalThinkingDeltaHandler)
|
||||
chatRunSocket.on('reasoning.available', globalReasoningAvailableHandler)
|
||||
|
||||
// Tool events
|
||||
chatRunSocket.on('tool.started', globalToolStartedHandler)
|
||||
chatRunSocket.on('tool.completed', globalToolCompletedHandler)
|
||||
|
||||
// Run lifecycle events
|
||||
chatRunSocket.on('run.started', globalRunStartedHandler)
|
||||
chatRunSocket.on('run.failed', globalRunFailedHandler)
|
||||
chatRunSocket.on('run.completed', globalRunCompletedHandler)
|
||||
|
||||
// Compression events
|
||||
chatRunSocket.on('compression.started', globalCompressionStartedHandler)
|
||||
chatRunSocket.on('compression.completed', globalCompressionCompletedHandler)
|
||||
|
||||
// Usage events
|
||||
chatRunSocket.on('usage.updated', globalUsageUpdatedHandler)
|
||||
|
||||
globalListenersRegistered = true
|
||||
}
|
||||
|
||||
return chatRunSocket
|
||||
}
|
||||
|
||||
@@ -82,6 +332,8 @@ export function disconnectChatRun(): void {
|
||||
if (chatRunSocket) {
|
||||
chatRunSocket.disconnect()
|
||||
chatRunSocket = null
|
||||
globalListenersRegistered = false
|
||||
sessionEventHandlers.clear()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -111,92 +363,83 @@ export function startRunViaSocket(
|
||||
onError: (err: Error) => void,
|
||||
onStarted?: (runId: string) => void,
|
||||
): { abort: () => void } {
|
||||
const socket = connectChatRun()
|
||||
const sid = body.session_id
|
||||
if (!sid) {
|
||||
throw new Error('session_id is required for startRunViaSocket')
|
||||
}
|
||||
|
||||
let closed = false
|
||||
|
||||
function cleanup() {
|
||||
if (closed) return
|
||||
closed = true
|
||||
socket.off('run.started', onRunStarted)
|
||||
socket.off('run.failed', onRunFailed)
|
||||
socket.off('message.delta', onMessageDelta)
|
||||
socket.off('reasoning.delta', onReasoningDelta)
|
||||
socket.off('thinking.delta', onReasoningDelta)
|
||||
socket.off('reasoning.available', onReasoningAvailable)
|
||||
socket.off('tool.started', onToolStarted)
|
||||
socket.off('tool.completed', onToolCompleted)
|
||||
socket.off('run.completed', onRunCompleted)
|
||||
socket.off('compression.started', onCompressionStarted)
|
||||
socket.off('compression.completed', onCompressionCompleted)
|
||||
socket.off('usage.updated', onUsageUpdated)
|
||||
// Define event handlers for this session
|
||||
const handlers = {
|
||||
onMessageDelta: (evt: RunEvent) => {
|
||||
if (closed) return
|
||||
onEvent(evt)
|
||||
},
|
||||
onReasoningDelta: (evt: RunEvent) => {
|
||||
if (closed) return
|
||||
onEvent(evt)
|
||||
},
|
||||
onThinkingDelta: (evt: RunEvent) => {
|
||||
if (closed) return
|
||||
onEvent(evt)
|
||||
},
|
||||
onReasoningAvailable: (evt: RunEvent) => {
|
||||
if (closed) return
|
||||
onEvent(evt)
|
||||
},
|
||||
onToolStarted: (evt: RunEvent) => {
|
||||
if (closed) return
|
||||
onEvent(evt)
|
||||
},
|
||||
onToolCompleted: (evt: RunEvent) => {
|
||||
if (closed) return
|
||||
onEvent(evt)
|
||||
},
|
||||
onRunStarted: (evt: RunEvent) => {
|
||||
if (closed) return
|
||||
onEvent(evt)
|
||||
onStarted?.(evt.run_id || '')
|
||||
},
|
||||
onRunCompleted: (evt: RunEvent) => {
|
||||
if (closed) return
|
||||
onEvent(evt)
|
||||
closed = true
|
||||
onDone()
|
||||
},
|
||||
onRunFailed: (evt: RunEvent) => {
|
||||
if (closed) return
|
||||
onEvent(evt)
|
||||
closed = true
|
||||
onError(new Error(evt.error || 'Run failed'))
|
||||
},
|
||||
onCompressionStarted: (evt: RunEvent) => {
|
||||
if (closed) return
|
||||
onEvent(evt)
|
||||
},
|
||||
onCompressionCompleted: (evt: RunEvent) => {
|
||||
if (closed) return
|
||||
onEvent(evt)
|
||||
},
|
||||
onUsageUpdated: (evt: RunEvent) => {
|
||||
if (closed) return
|
||||
onEvent(evt)
|
||||
},
|
||||
}
|
||||
|
||||
// All event handlers share the same cleanup logic.
|
||||
// IMPORTANT: The Socket.IO connection is shared across all in-flight runs
|
||||
// (single namespace, single socket). When two sessions run concurrently,
|
||||
// every `startRunViaSocket()` call registers its own `message.delta` /
|
||||
// `tool.*` / `run.*` listeners on the SAME socket, so each event would
|
||||
// fan out to every listener and corrupt the wrong session's transcript.
|
||||
// The server tags every payload with `session_id`; we filter here so each
|
||||
// run only sees its own events. We also accept untagged events (for
|
||||
// backwards compatibility) when no session_id was provided in the request.
|
||||
const expectedSid = body.session_id
|
||||
const handleEvent = (event: RunEvent) => {
|
||||
if (closed) return
|
||||
// Filter events by session_id to prevent cross-session contamination
|
||||
if (expectedSid && event.session_id && event.session_id !== expectedSid) {
|
||||
return
|
||||
}
|
||||
try {
|
||||
onEvent(event)
|
||||
} finally {
|
||||
if (event.event === 'run.completed' || event.event === 'run.failed') {
|
||||
cleanup()
|
||||
onDone()
|
||||
}
|
||||
}
|
||||
}
|
||||
// Register handlers in the global session map
|
||||
sessionEventHandlers.set(sid, handlers)
|
||||
|
||||
function onRunStarted(data: RunEvent) {
|
||||
handleEvent(data)
|
||||
onStarted?.(data.run_id || '')
|
||||
}
|
||||
function onRunFailed(data: RunEvent) {
|
||||
handleEvent(data)
|
||||
onError?.(new Error(data.error || 'Run failed'))
|
||||
}
|
||||
function onMessageDelta(data: RunEvent) { handleEvent(data) }
|
||||
function onReasoningDelta(data: RunEvent) { handleEvent(data) }
|
||||
function onThinkingDelta(data: RunEvent) { handleEvent(data) }
|
||||
function onReasoningAvailable(data: RunEvent) { handleEvent(data) }
|
||||
function onToolStarted(data: RunEvent) { handleEvent(data) }
|
||||
function onToolCompleted(data: RunEvent) { handleEvent(data) }
|
||||
function onRunCompleted(data: RunEvent) { handleEvent(data) }
|
||||
function onCompressionStarted(data: RunEvent) { handleEvent(data) }
|
||||
function onCompressionCompleted(data: RunEvent) { handleEvent(data) }
|
||||
function onUsageUpdated(data: RunEvent) { handleEvent(data) }
|
||||
|
||||
socket.on('run.started', onRunStarted)
|
||||
socket.on('run.failed', onRunFailed)
|
||||
socket.on('message.delta', onMessageDelta)
|
||||
socket.on('reasoning.delta', onReasoningDelta)
|
||||
socket.on('thinking.delta', onThinkingDelta)
|
||||
socket.on('reasoning.available', onReasoningAvailable)
|
||||
socket.on('tool.started', onToolStarted)
|
||||
socket.on('tool.completed', onToolCompleted)
|
||||
socket.on('run.completed', onRunCompleted)
|
||||
socket.on('compression.started', onCompressionStarted)
|
||||
socket.on('compression.completed', onCompressionCompleted)
|
||||
socket.on('usage.updated', onUsageUpdated)
|
||||
|
||||
// Emit run:start with ack callback to get run_id
|
||||
// Emit run request
|
||||
const socket = connectChatRun()
|
||||
socket.emit('run', body)
|
||||
|
||||
return {
|
||||
abort: () => {
|
||||
if (!closed) {
|
||||
socket.emit('abort', { session_id: body.session_id })
|
||||
cleanup()
|
||||
closed = true
|
||||
sessionEventHandlers.delete(sid)
|
||||
socket.emit('abort', { session_id: sid })
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user