From f15deef3fc5a1c0a74d048e8f743a70318b82d93 Mon Sep 17 00:00:00 2001 From: ekko <152005280+EKKOLearnAI@users.noreply.github.com> Date: Fri, 1 May 2026 08:13:55 +0800 Subject: [PATCH] fix(chat): isolate concurrent session events by refactoring WebSocket event handling (#365) Refactored the WebSocket event handling mechanism to use global listeners with session-specific event routing instead of per-session listeners. This prevents event cross-talk when multiple chat sessions run concurrently. Key changes: - Client: Added sessionEventHandlers Map to route events to appropriate sessions - Client: Registered global listeners once per socket connection - Server: Extracted message processing logic into handleMessage method - Server: Improved Hermes session ID tracking with dedicated Map - Server: Added replaceByHermesSessionId for targeted message replacement Co-authored-by: Claude Sonnet 4.6 --- packages/client/src/api/hermes/chat.ts | 395 +++++++++++--- packages/client/src/stores/hermes/chat.ts | 61 +-- .../server/src/db/hermes/session-store.ts | 1 + .../src/services/hermes/chat-run-socket.ts | 493 ++++++++---------- 4 files changed, 560 insertions(+), 390 deletions(-) diff --git a/packages/client/src/api/hermes/chat.ts b/packages/client/src/api/hermes/chat.ts index 1ce6bd6..444a036 100644 --- a/packages/client/src/api/hermes/chat.ts +++ b/packages/client/src/api/hermes/chat.ts @@ -47,6 +47,228 @@ export interface RunEvent { // ============================ let chatRunSocket: Socket | null = null +let globalListenersRegistered = false + +/** + * Session event handlers map + * Maps session_id to event handling functions for isolating concurrent session streams + */ +const sessionEventHandlers = new Map void + onReasoningDelta: (event: RunEvent) => void + onThinkingDelta: (event: RunEvent) => void + onReasoningAvailable: (event: RunEvent) => void + onToolStarted: (event: RunEvent) => void + onToolCompleted: (event: RunEvent) => void + onRunStarted: (event: RunEvent) => void + onRunCompleted: (event: RunEvent) => void + onRunFailed: (event: RunEvent) => void + onCompressionStarted: (event: RunEvent) => void + onCompressionCompleted: (event: RunEvent) => void + onUsageUpdated: (event: RunEvent) => void +}>() + +/** + * Global message.delta event handler + * Distributes events to appropriate session based on session_id + */ +function globalMessageDeltaHandler(event: RunEvent): void { + const sid = event.session_id + if (!sid) return + + const handlers = sessionEventHandlers.get(sid) + if (handlers?.onMessageDelta) { + handlers.onMessageDelta(event) + } +} + +/** + * Global reasoning.delta event handler + */ +function globalReasoningDeltaHandler(event: RunEvent): void { + const sid = event.session_id + if (!sid) return + + const handlers = sessionEventHandlers.get(sid) + if (handlers?.onReasoningDelta) { + handlers.onReasoningDelta(event) + } +} + +/** + * Global thinking.delta event handler (alias for reasoning.delta) + */ +function globalThinkingDeltaHandler(event: RunEvent): void { + const sid = event.session_id + if (!sid) return + + const handlers = sessionEventHandlers.get(sid) + if (handlers?.onThinkingDelta) { + handlers.onThinkingDelta(event) + } +} + +/** + * Global reasoning.available event handler + */ +function globalReasoningAvailableHandler(event: RunEvent): void { + const sid = event.session_id + if (!sid) return + + const handlers = sessionEventHandlers.get(sid) + if (handlers?.onReasoningAvailable) { + handlers.onReasoningAvailable(event) + } +} + +/** + * Global tool.started event handler + */ +function globalToolStartedHandler(event: RunEvent): void { + const sid = event.session_id + if (!sid) return + + const handlers = sessionEventHandlers.get(sid) + if (handlers?.onToolStarted) { + handlers.onToolStarted(event) + } +} + +/** + * Global tool.completed event handler + */ +function globalToolCompletedHandler(event: RunEvent): void { + const sid = event.session_id + if (!sid) return + + const handlers = sessionEventHandlers.get(sid) + if (handlers?.onToolCompleted) { + handlers.onToolCompleted(event) + } +} + +/** + * Global run.started event handler + */ +function globalRunStartedHandler(event: RunEvent): void { + const sid = event.session_id + if (!sid) return + + const handlers = sessionEventHandlers.get(sid) + if (handlers?.onRunStarted) { + handlers.onRunStarted(event) + } +} + +/** + * Global run.completed event handler + */ +function globalRunCompletedHandler(event: RunEvent): void { + const sid = event.session_id + if (!sid) return + + const handlers = sessionEventHandlers.get(sid) + if (handlers?.onRunCompleted) { + handlers.onRunCompleted(event) + } + + // Auto-cleanup session handlers on completion + sessionEventHandlers.delete(sid) +} + +/** + * Global run.failed event handler + */ +function globalRunFailedHandler(event: RunEvent): void { + const sid = event.session_id + if (!sid) return + + const handlers = sessionEventHandlers.get(sid) + if (handlers?.onRunFailed) { + handlers.onRunFailed(event) + } + + // Auto-cleanup session handlers on failure + sessionEventHandlers.delete(sid) +} + +/** + * Global compression.started event handler + */ +function globalCompressionStartedHandler(event: RunEvent): void { + const sid = event.session_id + if (!sid) return + + const handlers = sessionEventHandlers.get(sid) + if (handlers?.onCompressionStarted) { + handlers.onCompressionStarted(event) + } +} + +/** + * Global compression.completed event handler + */ +function globalCompressionCompletedHandler(event: RunEvent): void { + const sid = event.session_id + if (!sid) return + + const handlers = sessionEventHandlers.get(sid) + if (handlers?.onCompressionCompleted) { + handlers.onCompressionCompleted(event) + } +} + +/** + * Global usage.updated event handler + */ +function globalUsageUpdatedHandler(event: RunEvent): void { + const sid = event.session_id + if (!sid) return + + const handlers = sessionEventHandlers.get(sid) + if (handlers?.onUsageUpdated) { + handlers.onUsageUpdated(event) + } +} + +/** + * Register event handlers for a session + * @param sessionId - Session ID + * @param handlers - Event handling functions + * @returns Cleanup function to unregister handlers + */ +export function registerSessionHandlers( + sessionId: string, + handlers: { + onMessageDelta: (event: RunEvent) => void + onReasoningDelta: (event: RunEvent) => void + onThinkingDelta: (event: RunEvent) => void + onReasoningAvailable: (event: RunEvent) => void + onToolStarted: (event: RunEvent) => void + onToolCompleted: (event: RunEvent) => void + onRunStarted: (event: RunEvent) => void + onRunCompleted: (event: RunEvent) => void + onRunFailed: (event: RunEvent) => void + onCompressionStarted: (event: RunEvent) => void + onCompressionCompleted: (event: RunEvent) => void + onUsageUpdated: (event: RunEvent) => void + } +): () => void { + sessionEventHandlers.set(sessionId, handlers) + + // Return cleanup function + return () => { + sessionEventHandlers.delete(sessionId) + } +} + +/** + * Unregister event handlers for a session + * @param sessionId - Session ID + */ +export function unregisterSessionHandlers(sessionId: string): void { + sessionEventHandlers.delete(sessionId) +} export function getChatRunSocket(): Socket | null { return chatRunSocket @@ -59,6 +281,7 @@ export function connectChatRun(): Socket { if (chatRunSocket) { chatRunSocket.removeAllListeners() chatRunSocket.disconnect() + globalListenersRegistered = false } const baseUrl = getBaseUrlValue() @@ -75,6 +298,33 @@ export function connectChatRun(): Socket { reconnectionDelayMax: 10000, }) + // Register global listeners only once per socket connection + if (!globalListenersRegistered) { + // Message events + chatRunSocket.on('message.delta', globalMessageDeltaHandler) + chatRunSocket.on('reasoning.delta', globalReasoningDeltaHandler) + chatRunSocket.on('thinking.delta', globalThinkingDeltaHandler) + chatRunSocket.on('reasoning.available', globalReasoningAvailableHandler) + + // Tool events + chatRunSocket.on('tool.started', globalToolStartedHandler) + chatRunSocket.on('tool.completed', globalToolCompletedHandler) + + // Run lifecycle events + chatRunSocket.on('run.started', globalRunStartedHandler) + chatRunSocket.on('run.failed', globalRunFailedHandler) + chatRunSocket.on('run.completed', globalRunCompletedHandler) + + // Compression events + chatRunSocket.on('compression.started', globalCompressionStartedHandler) + chatRunSocket.on('compression.completed', globalCompressionCompletedHandler) + + // Usage events + chatRunSocket.on('usage.updated', globalUsageUpdatedHandler) + + globalListenersRegistered = true + } + return chatRunSocket } @@ -82,6 +332,8 @@ export function disconnectChatRun(): void { if (chatRunSocket) { chatRunSocket.disconnect() chatRunSocket = null + globalListenersRegistered = false + sessionEventHandlers.clear() } } @@ -111,92 +363,83 @@ export function startRunViaSocket( onError: (err: Error) => void, onStarted?: (runId: string) => void, ): { abort: () => void } { - const socket = connectChatRun() + const sid = body.session_id + if (!sid) { + throw new Error('session_id is required for startRunViaSocket') + } + let closed = false - function cleanup() { - if (closed) return - closed = true - socket.off('run.started', onRunStarted) - socket.off('run.failed', onRunFailed) - socket.off('message.delta', onMessageDelta) - socket.off('reasoning.delta', onReasoningDelta) - socket.off('thinking.delta', onReasoningDelta) - socket.off('reasoning.available', onReasoningAvailable) - socket.off('tool.started', onToolStarted) - socket.off('tool.completed', onToolCompleted) - socket.off('run.completed', onRunCompleted) - socket.off('compression.started', onCompressionStarted) - socket.off('compression.completed', onCompressionCompleted) - socket.off('usage.updated', onUsageUpdated) + // Define event handlers for this session + const handlers = { + onMessageDelta: (evt: RunEvent) => { + if (closed) return + onEvent(evt) + }, + onReasoningDelta: (evt: RunEvent) => { + if (closed) return + onEvent(evt) + }, + onThinkingDelta: (evt: RunEvent) => { + if (closed) return + onEvent(evt) + }, + onReasoningAvailable: (evt: RunEvent) => { + if (closed) return + onEvent(evt) + }, + onToolStarted: (evt: RunEvent) => { + if (closed) return + onEvent(evt) + }, + onToolCompleted: (evt: RunEvent) => { + if (closed) return + onEvent(evt) + }, + onRunStarted: (evt: RunEvent) => { + if (closed) return + onEvent(evt) + onStarted?.(evt.run_id || '') + }, + onRunCompleted: (evt: RunEvent) => { + if (closed) return + onEvent(evt) + closed = true + onDone() + }, + onRunFailed: (evt: RunEvent) => { + if (closed) return + onEvent(evt) + closed = true + onError(new Error(evt.error || 'Run failed')) + }, + onCompressionStarted: (evt: RunEvent) => { + if (closed) return + onEvent(evt) + }, + onCompressionCompleted: (evt: RunEvent) => { + if (closed) return + onEvent(evt) + }, + onUsageUpdated: (evt: RunEvent) => { + if (closed) return + onEvent(evt) + }, } - // All event handlers share the same cleanup logic. - // IMPORTANT: The Socket.IO connection is shared across all in-flight runs - // (single namespace, single socket). When two sessions run concurrently, - // every `startRunViaSocket()` call registers its own `message.delta` / - // `tool.*` / `run.*` listeners on the SAME socket, so each event would - // fan out to every listener and corrupt the wrong session's transcript. - // The server tags every payload with `session_id`; we filter here so each - // run only sees its own events. We also accept untagged events (for - // backwards compatibility) when no session_id was provided in the request. - const expectedSid = body.session_id - const handleEvent = (event: RunEvent) => { - if (closed) return - // Filter events by session_id to prevent cross-session contamination - if (expectedSid && event.session_id && event.session_id !== expectedSid) { - return - } - try { - onEvent(event) - } finally { - if (event.event === 'run.completed' || event.event === 'run.failed') { - cleanup() - onDone() - } - } - } + // Register handlers in the global session map + sessionEventHandlers.set(sid, handlers) - function onRunStarted(data: RunEvent) { - handleEvent(data) - onStarted?.(data.run_id || '') - } - function onRunFailed(data: RunEvent) { - handleEvent(data) - onError?.(new Error(data.error || 'Run failed')) - } - function onMessageDelta(data: RunEvent) { handleEvent(data) } - function onReasoningDelta(data: RunEvent) { handleEvent(data) } - function onThinkingDelta(data: RunEvent) { handleEvent(data) } - function onReasoningAvailable(data: RunEvent) { handleEvent(data) } - function onToolStarted(data: RunEvent) { handleEvent(data) } - function onToolCompleted(data: RunEvent) { handleEvent(data) } - function onRunCompleted(data: RunEvent) { handleEvent(data) } - function onCompressionStarted(data: RunEvent) { handleEvent(data) } - function onCompressionCompleted(data: RunEvent) { handleEvent(data) } - function onUsageUpdated(data: RunEvent) { handleEvent(data) } - - socket.on('run.started', onRunStarted) - socket.on('run.failed', onRunFailed) - socket.on('message.delta', onMessageDelta) - socket.on('reasoning.delta', onReasoningDelta) - socket.on('thinking.delta', onThinkingDelta) - socket.on('reasoning.available', onReasoningAvailable) - socket.on('tool.started', onToolStarted) - socket.on('tool.completed', onToolCompleted) - socket.on('run.completed', onRunCompleted) - socket.on('compression.started', onCompressionStarted) - socket.on('compression.completed', onCompressionCompleted) - socket.on('usage.updated', onUsageUpdated) - - // Emit run:start with ack callback to get run_id + // Emit run request + const socket = connectChatRun() socket.emit('run', body) return { abort: () => { if (!closed) { - socket.emit('abort', { session_id: body.session_id }) - cleanup() + closed = true + sessionEventHandlers.delete(sid) + socket.emit('abort', { session_id: sid }) } }, } diff --git a/packages/client/src/stores/hermes/chat.ts b/packages/client/src/stores/hermes/chat.ts index 9dd5ef1..6f85dd5 100644 --- a/packages/client/src/stores/hermes/chat.ts +++ b/packages/client/src/stores/hermes/chat.ts @@ -1,4 +1,4 @@ -import { startRunViaSocket, connectChatRun, resumeSession, type RunEvent } from '@/api/hermes/chat' +import { startRunViaSocket, resumeSession, registerSessionHandlers, unregisterSessionHandlers, type RunEvent } from '@/api/hermes/chat' import { deleteSession as deleteSessionApi, fetchSession, fetchSessions, type HermesMessage, type SessionSummary } from '@/api/hermes/sessions' import { getApiKey } from '@/api/client' import { defineStore } from 'pinia' @@ -582,6 +582,8 @@ export const useChatStore = defineStore('chat', () => { } addMessage(sid, userMsg) + + updateSessionTitle(sid) try { @@ -855,6 +857,7 @@ export const useChatStore = defineStore('chat', () => { timestamp: Date.now(), }) } + cleanup() updateSessionTitle(sid) // the in-flight marker. If the browser is reloading right now @@ -962,7 +965,6 @@ export const useChatStore = defineStore('chat', () => { // Only set up listeners if there's an actual in-flight run if (!readInFlight(sid)) return - const socket = connectChatRun() let closed = false let runProducedAssistantText = false let runHadToolActivity = false @@ -970,19 +972,10 @@ export const useChatStore = defineStore('chat', () => { const cleanup = () => { if (closed) return closed = true - socket.off('run.started', onRunStarted) - socket.off('run.failed', onRunFailed) - socket.off('message.delta', onMessageDelta) - socket.off('reasoning.delta', onReasoningDelta) - socket.off('thinking.delta', onThinkingDelta) - socket.off('reasoning.available', onReasoningAvailable) - socket.off('tool.started', onToolStarted) - socket.off('tool.completed', onToolCompleted) - socket.off('run.completed', onRunCompleted) - socket.off('compression.started', onCompressionStarted) - socket.off('compression.completed', onCompressionCompleted) streamStates.value.delete(sid) serverWorking.value.delete(sid) + // Unregister from global session handlers + unregisterSessionHandlers(sid) } // Shared event handler — filters by session_id tag @@ -1172,6 +1165,8 @@ export const useChatStore = defineStore('chat', () => { timestamp: Date.now(), }) } + + cleanup() updateSessionTitle(sid) @@ -1218,33 +1213,25 @@ export const useChatStore = defineStore('chat', () => { } } - function onRunStarted(data: RunEvent) { handleEvent(data) } - function onRunFailed(data: RunEvent) { handleEvent(data) } - function onMessageDelta(data: RunEvent) { handleEvent(data) } - function onReasoningDelta(data: RunEvent) { handleEvent(data) } - function onThinkingDelta(data: RunEvent) { handleEvent(data) } - function onReasoningAvailable(data: RunEvent) { handleEvent(data) } - function onToolStarted(data: RunEvent) { handleEvent(data) } - function onToolCompleted(data: RunEvent) { handleEvent(data) } - function onRunCompleted(data: RunEvent) { handleEvent(data) } - function onCompressionStarted(data: RunEvent) { handleEvent(data) } - function onCompressionCompleted(data: RunEvent) { handleEvent(data) } - - socket.on('run.started', onRunStarted) - socket.on('run.failed', onRunFailed) - socket.on('message.delta', onMessageDelta) - socket.on('reasoning.delta', onReasoningDelta) - socket.on('thinking.delta', onThinkingDelta) - socket.on('reasoning.available', onReasoningAvailable) - socket.on('tool.started', onToolStarted) - socket.on('tool.completed', onToolCompleted) - socket.on('run.completed', onRunCompleted) - socket.on('compression.started', onCompressionStarted) - socket.on('compression.completed', onCompressionCompleted) + // Register handlers in global session map + registerSessionHandlers(sid, { + onMessageDelta: (evt) => handleEvent(evt), + onReasoningDelta: (evt) => handleEvent(evt), + onThinkingDelta: (evt) => handleEvent(evt), + onReasoningAvailable: (evt) => handleEvent(evt), + onToolStarted: (evt) => handleEvent(evt), + onToolCompleted: (evt) => handleEvent(evt), + onRunStarted: (evt) => handleEvent(evt), + onRunCompleted: (evt) => handleEvent(evt), + onRunFailed: (evt) => handleEvent(evt), + onCompressionStarted: (evt) => handleEvent(evt), + onCompressionCompleted: (evt) => handleEvent(evt), + onUsageUpdated: (evt) => handleEvent(evt), + }) // No need to emit resume here — switchSession already did it. // Server already joined room and replayed events. - // Just set up listeners for ongoing streaming events. + // Just set up handlers for ongoing streaming events. // Mark as streaming so UI shows the indicator streamStates.value.set(sid, { abort: cleanup }) diff --git a/packages/server/src/db/hermes/session-store.ts b/packages/server/src/db/hermes/session-store.ts index eba3019..fff6bd8 100644 --- a/packages/server/src/db/hermes/session-store.ts +++ b/packages/server/src/db/hermes/session-store.ts @@ -421,6 +421,7 @@ export function updateSessionStats(id: string): void { last_active = COALESCE((SELECT MAX(timestamp) FROM ${MESSAGES_TABLE} WHERE session_id = ?), started_at) WHERE id = ?`, ).run(id, id, id) + console.log(`Updated session ${id} stats`) } export function getSessionDetailPaginated( diff --git a/packages/server/src/services/hermes/chat-run-socket.ts b/packages/server/src/services/hermes/chat-run-socket.ts index 3cb6f58..58f89b3 100644 --- a/packages/server/src/services/hermes/chat-run-socket.ts +++ b/packages/server/src/services/hermes/chat-run-socket.ts @@ -111,7 +111,7 @@ function convertToAnthropicFormat(messages: any[]): any[] { // Regular user message if (role === 'user') { if (typeof content === 'string') { - result.push({ role: 'user', content: content || '(empty message)' }) + result.push({ role: 'user', content: content || '' }) } else if (Array.isArray(content)) { result.push({ role: 'user', content }) } @@ -129,6 +129,7 @@ interface SessionMessage { session_id: string role: string content: string + hermesSessionId?: string tool_call_id?: string | null tool_calls?: any[] | null tool_name?: string | null @@ -147,8 +148,6 @@ interface SessionState { events: Array<{ event: string; data: any }> abortController?: AbortController runId?: string - /** Ephemeral session ID used for Hermes (one per run) */ - hermesSessionId?: string profile?: string inputTokens?: number outputTokens?: number @@ -161,6 +160,7 @@ export class ChatRunSocket { private gatewayManager: any /** sessionId → session state (messages, working status, events, run tracking) */ private sessionMap = new Map() + private hermesSessionIds = new Map() constructor(io: Server, gatewayManager: any) { this.nsp = io.of('/chat-run') @@ -215,279 +215,191 @@ export class ChatRunSocket { } }) } + private handleMessage(messages: SessionMessage[], sid: string): any[] { + let _messages = [] + try { + _messages = messages + .filter(m => (m.role === 'user' || m.role === 'assistant' || m.role === 'tool') && m.content !== undefined) + .map((m, idx, arr) => { + const msg: any = { + id: m.id, + session_id: sid, + role: m.role, + content: m.content || '', + reasoning: m.reasoning || '', + timestamp: m.timestamp, + } + // Convert Anthropic format content to OpenAI format + // Check if content is a stringified array (Hermes Gateway behavior) - only for assistant messages + if (m.role === 'assistant' && typeof m.content === 'string') { + // Handle double-serialized content: "[{'type': 'text', ...}]" -> "[{'type': 'text', ...}]" + let contentToParse = m.content + const trimmed = m.content.trim() + if (trimmed.startsWith('"') && trimmed.endsWith('"') && trimmed.length >= 2) { + contentToParse = trimmed.slice(1, -1) + logger.info('[chat-run-socket] resume message %s: double-serialized, removed outer quotes', m.id) + } + + if (contentToParse.startsWith('[') && contentToParse.endsWith(']')) { + try { + // Parse stringified Python-like array to JSON + const parsedContent = JSON.parse( + contentToParse + .replace(/'/g, '"') // Python single quotes to JSON double quotes + .replace(/True/g, 'true') + .replace(/False/g, 'false') + .replace(/None/g, 'null') + ) + if (Array.isArray(parsedContent)) { + const textBlocks: string[] = [] + const toolCalls: any[] = [] + let reasoningContent: string | null = null + + for (const block of parsedContent) { + if (block.type === 'thinking') { + reasoningContent = block.thinking + } else if (block.type === 'text') { + textBlocks.push(block.text) + } else if (block.type === 'tool_use') { + toolCalls.push({ + id: block.id, + type: 'function', + function: { + name: block.name, + arguments: JSON.stringify(block.input) + } + }) + } + } + + msg.content = textBlocks.join('') || '' + if (toolCalls.length > 0) { + msg.tool_calls = toolCalls + } + if (reasoningContent) { + msg.reasoning = reasoningContent + } + } + } catch (e) { + // Parsing failed, keep original content + msg.content = m.content + } + } + } else if (Array.isArray(m.content)) { + const textBlocks: string[] = [] + const toolCalls: any[] = [] + let reasoningContent: string | null = null + + for (const block of m.content) { + if (block.type === 'thinking') { + reasoningContent = block.thinking + } else if (block.type === 'text') { + textBlocks.push(block.text) + } else if (block.type === 'tool_use') { + toolCalls.push({ + id: block.id, + type: 'function', + function: { + name: block.name, + arguments: JSON.stringify(block.input) + } + }) + } + } + + msg.content = textBlocks.join('') || '' + if (toolCalls.length > 0) { + msg.tool_calls = toolCalls + } + if (reasoningContent) { + msg.reasoning = reasoningContent + } + } + + if (m.tool_calls?.length) { + // Filter out tool_calls with empty/invalid id and remove internal fields + const cleanedToolCalls = m.tool_calls + .filter((tc: any) => tc.id && tc.id.length > 0) + .map((tc: any) => ({ + id: tc.id, + type: tc.type, + function: tc.function + })) + if (cleanedToolCalls.length > 0) { + msg.tool_calls = cleanedToolCalls + } + } + + // For tool messages, ensure tool_call_id exists + if (m.role === 'tool') { + let callId = m.tool_call_id + if (!callId || callId.length === 0) { + // Try to reconstruct tool_call_id from previous assistant message + const prevMsg = arr[idx - 1] + if (prevMsg?.role === 'assistant' && prevMsg.tool_calls?.length) { + // Find matching tool_call by tool_name + const tc = prevMsg.tool_calls.find((t: any) => t.function?.name === m.tool_name) + if (tc?.id) { + callId = tc.id + } + } + } + // Skip tool message if no valid tool_call_id + if (!callId || callId.length === 0) { + return null + } + msg.tool_call_id = callId + } + + if (m.tool_name) msg.tool_name = m.tool_name + if (m.reasoning) msg.reasoning = m.reasoning + return msg + }) + .filter(m => m !== null) + } catch (error) { + + } + return _messages + } private async resumeSession(socket: Socket, sid: string) { let state = this.sessionMap.get(sid) - - try { - const detail = useLocalSessionStore() - ? getSessionDetailPaginated(sid) - : await getSessionDetailFromDb(sid) - const messages = detail?.messages?.length - ? detail.messages - .filter(m => (m.role === 'user' || m.role === 'assistant' || m.role === 'tool') && m.content !== undefined) - .map((m, idx, arr) => { - const msg: any = { - id: m.id, - session_id: sid, - role: m.role, - content: m.content || '', - reasoning: m.reasoning || '', - timestamp: m.timestamp, - } - // Convert Anthropic format content to OpenAI format - // Check if content is a stringified array (Hermes Gateway behavior) - only for assistant messages - if (m.role === 'assistant' && typeof m.content === 'string') { - // Handle double-serialized content: "[{'type': 'text', ...}]" -> "[{'type': 'text', ...}]" - let contentToParse = m.content - const trimmed = m.content.trim() - if (trimmed.startsWith('"') && trimmed.endsWith('"') && trimmed.length >= 2) { - contentToParse = trimmed.slice(1, -1) - logger.info('[chat-run-socket] resume message %s: double-serialized, removed outer quotes', m.id) - } - - if (contentToParse.startsWith('[') && contentToParse.endsWith(']')) { - try { - // Parse stringified Python-like array to JSON - const parsedContent = JSON.parse( - contentToParse - .replace(/'/g, '"') // Python single quotes to JSON double quotes - .replace(/True/g, 'true') - .replace(/False/g, 'false') - .replace(/None/g, 'null') - ) - if (Array.isArray(parsedContent)) { - const textBlocks: string[] = [] - const toolCalls: any[] = [] - let reasoningContent: string | null = null - - for (const block of parsedContent) { - if (block.type === 'thinking') { - reasoningContent = block.thinking - } else if (block.type === 'text') { - textBlocks.push(block.text) - } else if (block.type === 'tool_use') { - toolCalls.push({ - id: block.id, - type: 'function', - function: { - name: block.name, - arguments: JSON.stringify(block.input) - } - }) - } - } - - msg.content = textBlocks.join('') || '' - if (toolCalls.length > 0) { - msg.tool_calls = toolCalls - } - if (reasoningContent) { - msg.reasoning = reasoningContent - } - } - } catch (e) { - // Parsing failed, keep original content - msg.content = m.content - } - } - } else if (Array.isArray(m.content)) { - const textBlocks: string[] = [] - const toolCalls: any[] = [] - let reasoningContent: string | null = null - - for (const block of m.content) { - if (block.type === 'thinking') { - reasoningContent = block.thinking - } else if (block.type === 'text') { - textBlocks.push(block.text) - } else if (block.type === 'tool_use') { - toolCalls.push({ - id: block.id, - type: 'function', - function: { - name: block.name, - arguments: JSON.stringify(block.input) - } - }) - } - } - - msg.content = textBlocks.join('') || '' - if (toolCalls.length > 0) { - msg.tool_calls = toolCalls - } - if (reasoningContent) { - msg.reasoning = reasoningContent - } - } - - if (m.tool_calls?.length) { - // Filter out tool_calls with empty/invalid id and remove internal fields - const cleanedToolCalls = m.tool_calls - .filter((tc: any) => tc.id && tc.id.length > 0) - .map((tc: any) => ({ - id: tc.id, - type: tc.type, - function: tc.function - })) - if (cleanedToolCalls.length > 0) { - msg.tool_calls = cleanedToolCalls - } - } - - // For tool messages, ensure tool_call_id exists - if (m.role === 'tool') { - let callId = m.tool_call_id - if (!callId || callId.length === 0) { - // Try to reconstruct tool_call_id from previous assistant message - const prevMsg = arr[idx - 1] - if (prevMsg?.role === 'assistant' && prevMsg.tool_calls?.length) { - // Find matching tool_call by tool_name - const tc = prevMsg.tool_calls.find((t: any) => t.function?.name === m.tool_name) - if (tc?.id) { - callId = tc.id - } - } - } - // Skip tool message if no valid tool_call_id - if (!callId || callId.length === 0) { - return null - } - msg.tool_call_id = callId - } - - if (m.tool_name) msg.tool_name = m.tool_name - if (m.reasoning) msg.reasoning = m.reasoning - return msg - }) - .filter(m => m !== null) - : [] - // Calculate context tokens — aware of compression snapshot - let inputTokens: number - const snapshot = getCompressionSnapshot(sid) - if (snapshot) { - const newMessages = messages.slice(snapshot.lastMessageIndex + 1) - inputTokens = countTokens(SUMMARY_PREFIX + snapshot.summary) + - newMessages.reduce((sum, m) => sum + countTokens(m.content || ''), 0) - } else { - inputTokens = messages.reduce((sum, m) => sum + countTokens(m.content || ''), 0) + if (!state) { + try { + const detail = useLocalSessionStore() + ? getSessionDetailPaginated(sid) + : await getSessionDetailFromDb(sid) + const messages = detail?.messages ? this.handleMessage(detail.messages, sid) : [] + // Calculate context tokens — aware of compression snapshot + let inputTokens: number + const snapshot = getCompressionSnapshot(sid) + if (snapshot) { + const newMessages = messages.slice(snapshot.lastMessageIndex + 1) + inputTokens = countTokens(SUMMARY_PREFIX + snapshot.summary) + + newMessages.reduce((sum, m) => sum + countTokens(m.content || ''), 0) + } else { + inputTokens = messages.reduce((sum, m) => sum + countTokens(m.content || ''), 0) + } + const outputTokens = messages + .filter(m => m.role === 'assistant') + .reduce((sum, m) => sum + countTokens(m.content || ''), 0) + state = { + messages, + isWorking: false, + events: [], + inputTokens, + outputTokens, + } + this.sessionMap.set(sid, state) + logger.info('[chat-run-socket] loaded session %s from DB (%d messages)', sid, messages.length) + } catch (err) { + logger.warn(err, '[chat-run-socket] failed to load session %s from DB on resume', sid) + state = { messages: [], isWorking: false, events: [] } + this.sessionMap.set(sid, state) } - const outputTokens = messages - .filter(m => m.role === 'assistant') - .reduce((sum, m) => sum + countTokens(m.content || ''), 0) - state = { - messages, - isWorking: false, - events: [], - inputTokens, - outputTokens, - } - this.sessionMap.set(sid, state) - logger.info('[chat-run-socket] loaded session %s from DB (%d messages)', sid, messages.length) - } catch (err) { - logger.warn(err, '[chat-run-socket] failed to load session %s from DB on resume', sid) - state = { messages: [], isWorking: false, events: [] } - this.sessionMap.set(sid, state) } - - // Reply with messages, working status + events (if working) - // Convert messages from internal storage format to OpenAI format for client - const clientMessages = state.messages.map((m: any) => { - const msg: any = { ...m } - // Check if content is a stringified array (Hermes Gateway behavior) - only for assistant messages - if (m.role === 'assistant' && typeof m.content === 'string') { - // Handle double-serialized content: "[{'type': 'text', ...}]" - let contentToParse = m.content - const trimmed = m.content.trim() - if (trimmed.startsWith('"') && trimmed.endsWith('"') && trimmed.length >= 2) { - contentToParse = trimmed.slice(1, -1) - logger.info('[chat-run-socket] resume message %s: double-serialized, removed outer quotes', m.id) - } - - if (contentToParse.trim().startsWith('[') && contentToParse.trim().endsWith(']')) { - try { - // Parse stringified Python-like array to JSON - const parsedContent = JSON.parse( - contentToParse - .replace(/'/g, '"') - .replace(/True/g, 'true') - .replace(/False/g, 'false') - .replace(/None/g, 'null') - ) - if (Array.isArray(parsedContent)) { - const textBlocks: string[] = [] - const toolCalls: any[] = [] - let reasoningContent: string | null = null - - for (const block of parsedContent) { - if (block.type === 'thinking') { - reasoningContent = block.thinking - } else if (block.type === 'text') { - textBlocks.push(block.text) - } else if (block.type === 'tool_use') { - toolCalls.push({ - id: block.id, - type: 'function', - function: { - name: block.name, - arguments: JSON.stringify(block.input) - } - }) - } - } - - msg.content = textBlocks.join('') || '' - if (toolCalls.length > 0) { - msg.tool_calls = toolCalls - } - if (reasoningContent) { - msg.reasoning = reasoningContent - } - } - } catch (e) { - logger.error('[chat-run-socket] resume message %s: failed to parse content, error=%s, content=%s', m.id, (e as Error).message, contentToParse.substring(0, 200)) - // Parsing failed, keep original content - msg.content = m.content - } - } - } else if (Array.isArray(m.content)) { - // If content is an array (Anthropic format), convert to OpenAI format - const textBlocks: string[] = [] - const toolCalls: any[] = [] - let reasoningContent: string | null = null - - for (const block of m.content) { - if (block.type === 'thinking') { - reasoningContent = block.thinking - } else if (block.type === 'text') { - textBlocks.push(block.text) - } else if (block.type === 'tool_use') { - toolCalls.push({ - id: block.id, - type: 'function', - function: { - name: block.name, - arguments: JSON.stringify(block.input) - } - }) - } - } - - msg.content = textBlocks.join('') || '' - if (toolCalls.length > 0) { - msg.tool_calls = toolCalls - } - if (reasoningContent) { - msg.reasoning = reasoningContent - } - } - - return msg - }) - socket.emit('resumed', { session_id: sid, - messages: clientMessages, + messages: state.messages, isWorking: state.isWorking, events: state.isWorking ? state.events : [], inputTokens: state.inputTokens, @@ -518,8 +430,8 @@ export class ChatRunSocket { // Mark working immediately on run start, and append user message if (session_id) { const state = this.getOrCreateSession(session_id) + this.hermesSessionIds.set(session_id, hermesSessionId) state.isWorking = true - state.hermesSessionId = hermesSessionId state.profile = profile state.messages.push({ id: state.messages.length + 1, @@ -599,7 +511,7 @@ export class ChatRunSocket { ? validMessages.slice(0, validMessages.length - lastUserMsgIndex - 1) : validMessages ).map((m, idx, arr) => { - const msg: any = { role: m.role, content: m.content || 'empty message' } + const msg: any = { role: m.role, content: m.content || '' } if (m.reasoning_content) msg.reasoning_content = m.reasoning_content if (m.tool_calls?.length) { // Filter out tool_calls with empty/invalid id and remove internal fields @@ -960,6 +872,7 @@ export class ChatRunSocket { msgs.push({ id: msgs.length + 1, session_id, + hermesSessionId, role: 'assistant', content: parsed.delta || '', timestamp: Math.floor(Date.now() / 1000), @@ -978,6 +891,7 @@ export class ChatRunSocket { id: msgs.length + 1, session_id, role: 'assistant', + hermesSessionId, content: '', reasoning: text, timestamp: Math.floor(Date.now() / 1000), @@ -993,6 +907,7 @@ export class ChatRunSocket { id: msgs.length + 1, session_id, role: 'tool', + hermesSessionId, content: '', tool_call_id: parsed.tool_call_id || null, tool_name: parsed.tool || parsed.name || null, @@ -1025,6 +940,7 @@ export class ChatRunSocket { msgs.push({ id: msgs.length + 1, session_id, + hermesSessionId, role: 'assistant', content: parsed.output, timestamp: Math.floor(Date.now() / 1000), @@ -1141,12 +1057,11 @@ export class ChatRunSocket { state.abortController = undefined state.runId = undefined state.events = [] - // Sync messages from Hermes ephemeral session to local DB - if (useLocalSessionStore() && state.hermesSessionId) { - const hermesId = state.hermesSessionId + if (useLocalSessionStore() && this.hermesSessionIds.get(sessionId)) { + const hermesId = this.hermesSessionIds.get(sessionId) const prof = state.profile - state.hermesSessionId = undefined + this.hermesSessionIds.delete(sessionId) state.profile = undefined this.syncFromHermes(socket, sessionId, hermesId, prof) } @@ -1207,7 +1122,6 @@ export class ChatRunSocket { logger.warn('[chat-run-socket] syncFromHermes: no data for Hermes session %s', hermesSessionId) return } - // Skip user messages — already written to local DB in handleRun const toInsert = detail.messages.filter(m => m.role !== 'user') @@ -1301,6 +1215,10 @@ export class ChatRunSocket { // Use inputTokens already set by compression path if available const state = this.sessionMap.get(localSessionId) if (state) { + const messages = this.handleMessage(toInsert, localSessionId) + if (messages.length > 0) { + this.replaceByHermesSessionId(localSessionId, hermesSessionId, messages) + } const emit = (event: string, payload: any) => { socket.emit(event, { ...payload, session_id: localSessionId }) } @@ -1314,7 +1232,28 @@ export class ChatRunSocket { logger.warn(err, '[chat-run-socket] syncFromHermes failed for session %s (hermesId: %s, profile: %s)', localSessionId, hermesSessionId, profile || 'default') }) } + private replaceByHermesSessionId(session_id: string, hermesSessionId: string, newItems: SessionMessage[]) { + let start = -1 + let end = -1 + const state = this.sessionMap.get(session_id) + const msg = state?.messages || [] + // 找区间 + for (let i = 0; i < msg.length; i++) { + if (msg[i].hermesSessionId === hermesSessionId) { + if (start === -1) start = i + end = i + } else if (start !== -1) { + // 已经找到一段,后面断了就可以结束 + break + } + } + // 没找到 + if (start === -1) return + // 替换 + msg.splice(start, end - start + 1, ...newItems) + console.log(msg) + } /** Enqueue an ephemeral Hermes session for deferred deletion */ private enqueueEphemeralDelete(hermesSessionId: string, profile?: string) { try {