/**
 * Chat run via Socket.IO — namespace /chat-run.
 *
 * Replaces HTTP POST + SSE. Socket.IO decouples message handling
 * from connection lifecycle: the server continues streaming upstream
 * events even after the client disconnects or refreshes.
 *
 * Uses Socket.IO rooms keyed by session_id. On client reconnect,
 * the client emits 'resume' to rejoin its session room.
 */
import type { Server, Socket } from 'socket.io'
import { EventSource } from 'eventsource'
import { setRunSession } from '../../routes/hermes/proxy-handler'
import { updateUsage } from '../../db/hermes/usage-store'
import {
  getSession,
  getSessionDetail,
  createSession,
  addMessage,
  updateSessionStats,
  useLocalSessionStore,
} from '../../db/hermes/session-store'
import { getDb } from '../../db/index'
import { getSessionDetailFromDb } from '../../db/hermes/sessions-db'
import { getModelContextLength } from './model-context'
import { ChatContextCompressor, countTokens, SUMMARY_PREFIX } from '../../lib/context-compressor'
import { getCompressionSnapshot } from '../../db/hermes/compression-snapshot'
import { logger } from '../logger'

const compressor = new ChatContextCompressor()

// --- Session state tracking ---

interface SessionMessage {
  id: number | string
  session_id: string
  role: string
  content: string
  tool_call_id?: string | null
  tool_calls?: any[] | null
  tool_name?: string | null
  timestamp: number
  token_count?: number | null
  finish_reason?: string | null
  reasoning?: string | null
  reasoning_details?: string | null
  reasoning_content?: string | null
  codex_reasoning_items?: string | null
}

interface SessionState {
  messages: SessionMessage[]
  isWorking: boolean
  /** Replayable state events (currently compression progress) sent to clients resuming mid-run */
  events: Array<{ event: string; data: any }>
  abortController?: AbortController
  runId?: string
  /** Ephemeral session ID used for Hermes (one per run) */
  hermesSessionId?: string
  profile?: string
  inputTokens?: number
  outputTokens?: number
}

/** One entry of the `conversation_history` sent upstream with a run. */
interface HistoryMessage {
  role: string
  content: string
  tool_calls?: any[]
  tool_call_id?: string
  name?: string
}

/** Payload of the client's 'run' event. */
interface RunRequest {
  input: string
  session_id?: string
  model?: string
  instructions?: string
}

/** Fan-out helper: delivers (event, payload) to every client following a run. */
type EmitFn = (event: string, payload: any) => void

/** Non-null compression snapshot as returned by getCompressionSnapshot. */
type CompressionSnapshot = NonNullable<ReturnType<typeof getCompressionSnapshot>>

// --- ChatRunSocket ---

export class ChatRunSocket {
  private nsp: ReturnType<Server['of']>
  private gatewayManager: any
  /** sessionId → session state (messages, working status, events, run tracking) */
  private sessionMap = new Map<string, SessionState>()

  constructor(io: Server, gatewayManager: any) {
    this.nsp = io.of('/chat-run')
    this.gatewayManager = gatewayManager
  }

  init() {
    this.nsp.use(this.authMiddleware.bind(this))
    this.nsp.on('connection', this.onConnection.bind(this))
    logger.info('[chat-run-socket] Socket.IO ready at /chat-run')
  }

  // --- Auth middleware ---

  /**
   * Reject handshakes whose `auth.token` does not match the server token.
   * Fails closed: if the token cannot be obtained, the connection is refused
   * instead of leaving the handshake pending forever.
   */
  private async authMiddleware(socket: Socket, next: (err?: Error) => void) {
    if (isAuthDisabled()) return next()

    const token = socket.handshake.auth?.token as string | undefined
    try {
      const { getToken } = await import('../auth')
      const serverToken = await getToken()
      if (serverToken && token !== serverToken) {
        return next(new Error('Authentication failed'))
      }
    } catch (err) {
      logger.warn(err, '[chat-run-socket] auth token lookup failed')
      return next(new Error('Authentication failed'))
    }
    next()
  }

  // --- Connection handler ---

  private onConnection(socket: Socket) {
    const profile = (socket.handshake.query?.profile as string) || 'default'

    socket.on('run', async (data: RunRequest) => {
      if (!data) return
      await this.handleRun(socket, data, profile)
    })

    socket.on('resume', async (data: { session_id?: string }) => {
      await this.handleResume(socket, data)
    })

    socket.on('abort', (data: { session_id?: string }) => {
      if (data?.session_id) this.handleAbort(data.session_id)
    })
  }

  // --- Resume handler ---

  /**
   * Join the session room and reply with the transcript, working status, and
   * (while a run is in flight) the replayable state events.
   */
  private async handleResume(socket: Socket, data: { session_id?: string } | undefined) {
    const sid = data?.session_id
    if (!sid) return
    socket.join(`session:${sid}`)

    let state = this.sessionMap.get(sid)
    if (!state) {
      // Not in memory — load from DB
      const loaded = await this.loadSessionState(sid)
      // A concurrent 'run' may have created live state while the DB was read; keep that one
      state = this.sessionMap.get(sid)
      if (!state) {
        state = loaded
        this.sessionMap.set(sid, state)
      }
    }

    // Reply with messages, working status + events (if working)
    socket.emit('resumed', {
      session_id: sid,
      messages: state.messages,
      isWorking: state.isWorking,
      events: state.isWorking ? state.events : [],
      inputTokens: state.inputTokens,
      outputTokens: state.outputTokens,
    })
    logger.info('[chat-run-socket] socket %s resumed session %s (working: %s, messages: %d)',
      socket.id, sid, state.isWorking, state.messages.length)
  }

  /**
   * Build an idle SessionState from stored messages. Tool messages whose
   * tool_call_id cannot be resolved are dropped. Never throws: on failure an
   * empty state is returned.
   */
  private async loadSessionState(sid: string): Promise<SessionState> {
    try {
      const detail = useLocalSessionStore() ? getSessionDetail(sid) : await getSessionDetailFromDb(sid)
      const messages: SessionMessage[] = detail?.messages?.length
        ? detail.messages
          .filter(m => (m.role === 'user' || m.role === 'assistant' || m.role === 'tool') && m.content !== undefined)
          .map((m, idx, arr) => {
            const msg: any = {
              id: m.id,
              session_id: sid,
              role: m.role,
              content: m.content || '',
              timestamp: m.timestamp,
            }
            if (m.tool_calls?.length) msg.tool_calls = m.tool_calls
            if (m.role === 'tool') {
              const toolCallId = resolveToolCallId(m, arr[idx - 1])
              if (!toolCallId) return null
              msg.tool_call_id = toolCallId
            }
            if (m.tool_name) msg.tool_name = m.tool_name
            if (m.reasoning) msg.reasoning = m.reasoning
            return msg
          })
          .filter(m => m !== null)
        : []

      // Context tokens are aware of the compression snapshot
      const { inputTokens, outputTokens } = estimateUsageTokens(messages, getCompressionSnapshot(sid))
      logger.info('[chat-run-socket] loaded session %s from DB (%d messages)', sid, messages.length)
      return { messages, isWorking: false, events: [], inputTokens, outputTokens }
    } catch (err) {
      logger.warn(err, '[chat-run-socket] failed to load session %s from DB on resume', sid)
      return { messages: [], isWorking: false, events: [] }
    }
  }

  // --- Run handler ---

  /**
   * Start an upstream run and relay its events to the session room (or to the
   * requesting socket when no session_id is given). Never rejects: every failure
   * is reported to clients as `run.failed` and resets the session's working state.
   */
  private async handleRun(socket: Socket, data: RunRequest, profile: string) {
    const { input, session_id, model, instructions } = data
    if (session_id) socket.join(`session:${session_id}`)

    // Emit helper: tag every payload with session_id
    const emit: EmitFn = (event, payload) => {
      if (session_id) {
        this.nsp.to(`session:${session_id}`).emit(event, { ...payload, session_id })
      } else if (socket.connected) {
        socket.emit(event, payload)
      }
    }

    // Becomes true once the local session is about to be marked working; only
    // then must a failure reset it (otherwise a concurrent run could be clobbered).
    let runBegan = false
    const fail = (error: string) => {
      emit('run.failed', { event: 'run.failed', error })
      if (session_id && runBegan) this.markCompleted(session_id, { event: 'run.failed' })
    }

    try {
      const upstream = this.gatewayManager.getUpstream(profile).replace(/\/$/, '')
      const apiKey: string | undefined = this.gatewayManager.getApiKey(profile) || undefined

      // Generate ephemeral session ID for Hermes (fresh session per run)
      const hermesSessionId = session_id
        ? `eph_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`
        : undefined

      // Index of the first transcript message produced by this run
      let runStartIndex = 0
      if (session_id) {
        runBegan = true
        runStartIndex = this.beginLocalRun(session_id, input, profile, model, hermesSessionId)
      }

      // Build upstream request body
      const body: Record<string, unknown> = { input }
      if (hermesSessionId) body.session_id = hermesSessionId
      if (model) body.model = model
      if (instructions) body.instructions = instructions
      if (session_id) {
        const history = await this.buildConversationHistory(session_id, profile, upstream, apiKey, emit)
        if (history) body.conversation_history = history
      }

      const headers: Record<string, string> = { 'Content-Type': 'application/json' }
      if (apiKey) headers['Authorization'] = `Bearer ${apiKey}`

      const res = await fetch(`${upstream}/v1/runs`, {
        method: 'POST',
        headers,
        body: JSON.stringify(body),
        signal: AbortSignal.timeout(120_000),
      })

      if (!res.ok) {
        const text = await res.text().catch(() => '')
        fail(`Upstream ${res.status}: ${text}`)
        return
      }

      const runData = await res.json() as any
      const runId = runData.run_id
      if (!runId) {
        fail('No run_id in upstream response')
        return
      }

      if (session_id) setRunSession(runId, session_id)

      const abortController = new AbortController()
      if (session_id) {
        const state = this.getOrCreateSession(session_id)
        state.isWorking = true
        state.runId = runId
        state.abortController = abortController
      }

      emit('run.started', { event: 'run.started', run_id: runId, status: runData.status })

      this.streamRunEvents({
        upstream,
        apiKey,
        runId,
        sessionId: session_id,
        runStartIndex,
        abortController,
        emit,
        fail,
      })
    } catch (err) {
      fail(errorMessage(err))
    }
  }

  /**
   * Mark the local session as working, append the user's input to the
   * in-memory transcript and persist it (creating the local session row if needed).
   * @returns index in `state.messages` at which this run's own messages start
   */
  private beginLocalRun(
    sessionId: string,
    input: string,
    profile: string,
    model: string | undefined,
    hermesSessionId: string | undefined,
  ): number {
    const now = Math.floor(Date.now() / 1000)
    const state = this.getOrCreateSession(sessionId)
    state.isWorking = true
    state.hermesSessionId = hermesSessionId
    state.profile = profile
    state.messages.push({
      id: state.messages.length + 1,
      session_id: sessionId,
      role: 'user',
      content: input,
      timestamp: now,
    })

    // Create session in local DB if it doesn't exist
    if (!getSession(sessionId)) {
      const preview = input.replace(/[\r\n]/g, ' ').substring(0, 100)
      createSession({ id: sessionId, profile, model, title: preview })
    }

    // Write user message to local DB immediately
    addMessage({
      session_id: sessionId,
      role: 'user',
      content: input,
      timestamp: now,
    })

    return state.messages.length
  }

  /**
   * Build `conversation_history` for the upstream run from the stored transcript
   * (excluding the just-written user input). Compresses it when the estimated
   * context exceeds half of the model's context length.
   *
   * @returns the history to send, or undefined when the session has no stored
   *          messages or loading failed (failure is logged, not thrown)
   */
  private async buildConversationHistory(
    sessionId: string,
    profile: string,
    upstream: string,
    apiKey: string | undefined,
    emit: EmitFn,
  ): Promise<HistoryMessage[] | undefined> {
    try {
      const detail = useLocalSessionStore()
        ? getSessionDetail(sessionId)
        : await getSessionDetailFromDb(sessionId)
      if (!detail?.messages?.length) return undefined

      const validMessages = detail.messages.filter(m =>
        (m.role === 'user' || m.role === 'assistant' || m.role === 'tool') && m.content !== undefined,
      )
      // Exclude the last user message (just added in handleRun)
      const lastUserFromEnd = [...validMessages].reverse().findIndex(m => m.role === 'user')
      const prior = lastUserFromEnd >= 0
        ? validMessages.slice(0, validMessages.length - lastUserFromEnd - 1)
        : validMessages

      let history: HistoryMessage[] = prior
        .map((m, idx, arr) => {
          const msg: any = { role: m.role, content: m.content || '' }
          if (m.tool_calls?.length) msg.tool_calls = m.tool_calls
          if (m.role === 'tool') {
            const toolCallId = resolveToolCallId(m, arr[idx - 1])
            if (!toolCallId) return null
            msg.tool_call_id = toolCallId
          }
          if (m.tool_name) msg.name = m.tool_name
          return msg
        })
        .filter(m => m !== null)

      // Context compression with snapshot awareness
      const contextLength = getModelContextLength(profile)
      const triggerTokens = Math.floor(contextLength / 2)
      const state = this.getOrCreateSession(sessionId)

      // Calculate inputTokens + outputTokens from DB (unified method)
      const assembled = await this.calcAndUpdateUsage(sessionId, state, emit)
      const totalTokens = assembled.inputTokens + assembled.outputTokens
      const common = { sessionId, totalTokens, upstream, apiKey, contextLength, state, emit }

      // Step 1: Check existing snapshot — if present, assemble summary + new messages
      const snapshot = getCompressionSnapshot(sessionId)
      if (snapshot) {
        const newMessages = history.slice(snapshot.lastMessageIndex + 1)
        const assembledHistory: HistoryMessage[] = [
          { role: 'user', content: SUMMARY_PREFIX + '\n\n' + snapshot.summary },
          ...newMessages,
        ]
        logger.info('[context-compress] session=%s: snapshot at %d, %d new messages, assembled ~%d tokens (threshold %d)',
          sessionId, snapshot.lastMessageIndex, newMessages.length, totalTokens, triggerTokens)

        if (totalTokens <= triggerTokens) {
          // Under threshold — use assembled context directly, no LLM call needed
          history = assembledHistory
        } else {
          history = await this.compressHistory({
            ...common,
            history,
            messageCount: newMessages.length,
            // On failure fall back to the (smaller) assembled context, as the log states
            fallback: assembledHistory,
            failureLog: '[chat-run-socket] compression failed for session %s, using assembled context',
          })
        }
      } else if (history.length > 4) {
        // No snapshot — check if raw history exceeds threshold
        if (totalTokens <= triggerTokens) {
          // Under threshold — use raw history as-is
          logger.info('[context-compress] session=%s: %d messages, ~%d tokens — under threshold, skip',
            sessionId, history.length, totalTokens)
        } else {
          // Over threshold — full LLM compression
          logger.info('[context-compress] BEFORE session=%s: %d messages, ~%d tokens (threshold %d)',
            sessionId, history.length, totalTokens, triggerTokens)
          history = await this.compressHistory({
            ...common,
            history,
            messageCount: history.length,
            fallback: history,
            failureLog: '[chat-run-socket] compression failed for session %s, using raw history',
          })
        }
      }

      return history
    } catch (err) {
      logger.warn(err, '[chat-run-socket] failed to load conversation history for session %s', sessionId)
      return undefined
    }
  }

  /**
   * Compress `history` with the LLM compressor. Broadcasts compression.started and
   * compression.completed, and records both in the session's replayable events.
   *
   * @returns the compressed history, or `fallback` if the compressor throws
   */
  private async compressHistory(opts: {
    sessionId: string
    history: HistoryMessage[]
    fallback: HistoryMessage[]
    /** Message count reported in the started event and in failure stats */
    messageCount: number
    totalTokens: number
    upstream: string
    apiKey: string | undefined
    contextLength: number
    state: SessionState
    emit: EmitFn
    failureLog: string
  }): Promise<HistoryMessage[]> {
    const { sessionId, history, fallback, messageCount, totalTokens, state, emit } = opts

    const started = {
      event: 'compression.started',
      message_count: messageCount,
      token_count: totalTokens,
    }
    this.pushState(sessionId, 'compression.started', started)
    emit('compression.started', started)

    try {
      const result = await compressor.compress(
        history,
        opts.upstream,
        opts.apiKey,
        sessionId,
        opts.contextLength,
      )
      // The compressor updates the snapshot, so this usage reflects the compressed context
      const after = await this.calcAndUpdateUsage(sessionId, state, emit)
      const afterTokens = after.inputTokens + after.outputTokens
      const completed = {
        event: 'compression.completed',
        compressed: result.meta.compressed,
        llmCompressed: result.meta.llmCompressed,
        totalMessages: result.meta.totalMessages,
        resultMessages: result.messages.length,
        beforeTokens: totalTokens,
        afterTokens,
        summaryTokens: result.meta.summaryTokenEstimate,
        verbatimCount: result.meta.verbatimCount,
        compressedStartIndex: result.meta.compressedStartIndex,
      }
      this.replaceState(sessionId, 'compression.completed', completed)
      logger.info('[context-compress] AFTER session=%s: %d messages, ~%d tokens (was %d)',
        sessionId, result.messages.length, afterTokens, totalTokens)
      emit('compression.completed', completed)

      return result.messages.map(m => ({
        role: m.role,
        content: m.content,
        tool_calls: m.tool_calls,
        tool_call_id: m.tool_call_id,
        name: m.name,
      }))
    } catch (err) {
      const failed = {
        event: 'compression.completed',
        compressed: false,
        totalMessages: messageCount,
        resultMessages: messageCount,
        beforeTokens: totalTokens,
        afterTokens: totalTokens,
        summaryTokens: 0,
        verbatimCount: messageCount,
        compressedStartIndex: -1,
        error: errorMessage(err),
      }
      this.replaceState(sessionId, 'compression.completed', failed)
      logger.warn(err, opts.failureLog, sessionId)
      emit('compression.completed', failed)
      return fallback
    }
  }

  /**
   * Relay upstream run events via EventSource. The stream is independent of any
   * client socket, so it survives disconnects. It is closed on a terminal event,
   * on a stream error, or when the run's AbortController fires (see handleAbort).
   */
  private streamRunEvents(run: {
    upstream: string
    apiKey: string | undefined
    runId: string
    sessionId: string | undefined
    runStartIndex: number
    abortController: AbortController
    emit: EmitFn
    fail: (error: string) => void
  }) {
    const { upstream, apiKey, runId, sessionId, runStartIndex, abortController, emit, fail } = run

    const eventsUrl = new URL(`${upstream}/v1/runs/${runId}/events`)
    if (apiKey) eventsUrl.searchParams.set('token', apiKey)
    const source = new EventSource(eventsUrl.toString())

    // Abort stops relaying: no further deltas reach the transcript or the room
    abortController.signal.addEventListener('abort', () => source.close(), { once: true })

    source.onmessage = (event: MessageEvent) => {
      let parsed: any
      try {
        parsed = JSON.parse(event.data as string)
      } catch {
        return // not JSON, skip
      }
      if (parsed === null || typeof parsed !== 'object') return

      try {
        // Track messages into sessionMap
        if (sessionId) {
          const state = this.sessionMap.get(sessionId)
          if (state) applyStreamEvent(state.messages, sessionId, parsed, runStartIndex)
        }

        // Usage will be calculated after syncFromHermes completes (in markCompleted)
        emit(parsed.event || 'message', parsed)

        if (parsed.event === 'run.completed' || parsed.event === 'run.failed') {
          source.close()
          if (sessionId) this.markCompleted(sessionId, { event: parsed.event, run_id: parsed.run_id })
        }
      } catch (err) {
        logger.warn(err, '[chat-run-socket] failed to handle upstream event for run %s', runId)
      }
    }

    source.onerror = () => {
      source.close()
      fail('EventSource connection lost')
    }
  }

  // --- Abort handler ---

  /**
   * Abort the in-flight run of a session: stop relaying its upstream stream,
   * reset the working state, and tell the room the run ended. Only the relay is
   * stopped here; no upstream cancel request is sent.
   */
  private handleAbort(sessionId: string) {
    const state = this.sessionMap.get(sessionId)
    if (!state?.isWorking || !state.abortController) return

    const runId = state.runId
    state.abortController.abort()
    this.markCompleted(sessionId, { event: 'run.failed', run_id: runId })
    // The stream is closed, so no terminal event will arrive from upstream
    this.nsp.to(`session:${sessionId}`).emit('run.failed', {
      event: 'run.failed',
      run_id: runId,
      error: 'Run aborted',
      session_id: sessionId,
    })
  }

  /** Mark a session run as completed/failed so reconnecting clients get notified */
  private markCompleted(sessionId: string, _info: { event: string; run_id?: string }) {
    const state = this.sessionMap.get(sessionId)
    if (!state) return

    state.isWorking = false
    state.abortController = undefined
    state.runId = undefined
    state.events = []

    // Sync messages from Hermes ephemeral session to local DB
    if (useLocalSessionStore() && state.hermesSessionId) {
      const hermesId = state.hermesSessionId
      const prof = state.profile
      // Cleared first so a second completion signal for the same run cannot sync twice
      state.hermesSessionId = undefined
      state.profile = undefined
      void this.syncFromHermes(sessionId, hermesId, prof)
    }
  }

  /**
   * Calculate usage from DB and update state + emit to clients.
   * Never throws; on failure logs and returns zeros.
   * @returns { inputTokens, outputTokens } for the caller to use
   */
  private async calcAndUpdateUsage(
    sid: string,
    state: SessionState,
    emit: EmitFn,
  ): Promise<{ inputTokens: number; outputTokens: number }> {
    try {
      const detail = useLocalSessionStore() ? getSessionDetail(sid) : await getSessionDetailFromDb(sid)
      const msgs = detail?.messages
        ?.filter(m => m.role === 'user' || m.role === 'assistant' || m.role === 'tool') || []
      const snapshot = getCompressionSnapshot(sid)
      // The snapshot only counts when there are messages to measure against it
      const { inputTokens, outputTokens } = estimateUsageTokens(msgs, msgs.length ? snapshot : null)

      state.inputTokens = inputTokens
      state.outputTokens = outputTokens
      emit('usage.updated', {
        event: 'usage.updated',
        session_id: sid,
        inputTokens,
        outputTokens,
      })
      return { inputTokens, outputTokens }
    } catch (err) {
      logger.warn(err, '[chat-run-socket] failed to calculate usage for session %s', sid)
      return { inputTokens: 0, outputTokens: 0 }
    }
  }

  /**
   * Read complete messages from Hermes state.db for the ephemeral session
   * and write to local DB. This gives us tool results that SSE events don't include.
   * After sync, enqueues the ephemeral session for deletion.
   * Never rejects; failures are logged.
   */
  private async syncFromHermes(localSessionId: string, hermesSessionId: string, profile?: string): Promise<void> {
    try {
      const detail = await getSessionDetailFromDb(hermesSessionId)
      if (!detail || !detail.messages?.length) {
        logger.warn('[chat-run-socket] syncFromHermes: no data for Hermes session %s', hermesSessionId)
        return
      }

      // Skip user messages — already written to local DB in handleRun
      const toInsert = detail.messages.filter(m => m.role !== 'user')

      // Build tool_call_id → function.name lookup from assistant messages
      // (Hermes stores tool_name as NULL, name lives inside tool_calls JSON)
      const toolNameMap = new Map<string, string>()
      for (const msg of detail.messages) {
        if (msg.role !== 'assistant' || !Array.isArray(msg.tool_calls)) continue
        for (const tc of msg.tool_calls) {
          const id = tc.id || tc.call_id || tc.tool_call_id
          const name = tc.function?.name || tc.name
          if (id && name) toolNameMap.set(id, name)
        }
      }

      for (const msg of toInsert) {
        // Resolve tool_name from assistant's tool_calls if missing
        const toolName = msg.tool_name
          || (msg.tool_call_id ? toolNameMap.get(msg.tool_call_id) : undefined)
          || null
        addMessage({
          session_id: localSessionId,
          role: msg.role,
          content: msg.content || '',
          tool_call_id: msg.tool_call_id || null,
          tool_calls: msg.tool_calls || null,
          tool_name: toolName,
          timestamp: msg.timestamp || Math.floor(Date.now() / 1000),
          token_count: msg.token_count || null,
          finish_reason: msg.finish_reason || null,
          reasoning: msg.reasoning || null,
          reasoning_details: msg.reasoning_details || null,
          reasoning_content: msg.reasoning_content || null,
          codex_reasoning_items: msg.codex_reasoning_items || null,
        })
      }
      if (toInsert.length > 0) {
        logger.info('[chat-run-socket] syncFromHermes: synced %d messages to local session %s',
          toInsert.length, localSessionId)
      }

      updateSessionStats(localSessionId)

      // Record usage from Hermes session
      updateUsage(localSessionId, {
        inputTokens: detail.input_tokens,
        outputTokens: detail.output_tokens,
        cacheReadTokens: detail.cache_read_tokens,
        cacheWriteTokens: detail.cache_write_tokens,
        reasoningTokens: detail.reasoning_tokens,
        model: detail.model,
        profile: profile || 'default',
      })

      // Recalculate context usage now that the local DB holds the complete transcript
      const state = this.sessionMap.get(localSessionId)
      if (state) {
        const emit: EmitFn = (event, payload) => {
          this.nsp.to(`session:${localSessionId}`).emit(event, { ...payload, session_id: localSessionId })
        }
        void this.calcAndUpdateUsage(localSessionId, state, emit)
      }

      // Enqueue ephemeral session for deferred deletion
      this.enqueueEphemeralDelete(hermesSessionId, profile)
    } catch (err) {
      logger.warn(err, '[chat-run-socket] syncFromHermes failed for session %s (hermesId: %s, profile: %s)',
        localSessionId, hermesSessionId, profile || 'default')
    }
  }

  /** Enqueue an ephemeral Hermes session for deferred deletion (best-effort; failures are logged) */
  private enqueueEphemeralDelete(hermesSessionId: string, profile?: string) {
    try {
      const db = getDb()
      if (!db) return
      const now = Date.now()
      db.prepare(
        `INSERT INTO gc_pending_session_deletes
          (session_id, profile_name, status, attempt_count, last_error, created_at, updated_at, next_attempt_at)
         VALUES (?, ?, 'pending', 0, NULL, ?, ?, ?)
         ON CONFLICT(session_id) DO NOTHING`,
      ).run(hermesSessionId, profile || 'default', now, now, now)
      logger.info('[chat-run-socket] enqueued ephemeral session %s for deletion', hermesSessionId)
    } catch (err) {
      logger.warn(err, '[chat-run-socket] failed to enqueue ephemeral session %s for deletion', hermesSessionId)
    }
  }

  /** Get or create session state in sessionMap */
  private getOrCreateSession(sessionId: string): SessionState {
    let state = this.sessionMap.get(sessionId)
    if (!state) {
      state = { messages: [], isWorking: false, events: [] }
      this.sessionMap.set(sessionId, state)
    }
    return state
  }

  /** Append a state event for a session (used for replay on reconnect) */
  private pushState(sessionId: string, event: string, data: any) {
    const state = this.getOrCreateSession(sessionId)
    state.events.push({ event, data })
  }

  /** Replace the most recent state with the same event name, or append if none exists */
  private replaceState(sessionId: string, event: string, data: any) {
    const events = this.sessionMap.get(sessionId)?.events
    if (events) {
      for (let i = events.length - 1; i >= 0; i--) {
        if (events[i]?.event === event) {
          events[i] = { event, data }
          return
        }
      }
    }
    this.pushState(sessionId, event, data)
  }
}

// --- Helpers ---

/**
 * Whether socket auth is switched off via AUTH_DISABLED.
 * Any non-empty value disables auth, except an explicit '0' / 'false'
 * (previously those also disabled auth, which inverted their obvious meaning).
 */
function isAuthDisabled(): boolean {
  const raw = process.env.AUTH_DISABLED?.trim().toLowerCase()
  return !!raw && raw !== '0' && raw !== 'false'
}

/** Human-readable message for an unknown thrown value. */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err)
}

/**
 * Determine the tool_call_id for a stored tool-result message.
 * Uses the message's own id when present. Otherwise it picks the tool call with
 * the same function name on the immediately preceding assistant message.
 * Returns null when neither works; callers skip such messages.
 */
function resolveToolCallId(
  msg: { tool_call_id?: string | null; tool_name?: string | null },
  prev: { role: string; tool_calls?: any[] | null } | undefined,
): string | null {
  if (msg.tool_call_id) return msg.tool_call_id
  const calls = prev?.role === 'assistant' ? prev.tool_calls : undefined
  if (!calls || calls.length === 0) return null
  const match = calls.find((t: any) => t.function?.name === msg.tool_name)
  return match?.id ? match.id : null
}

/**
 * Estimate context (input) and assistant (output) tokens for a transcript.
 * With a compression snapshot, input = summary + messages after the snapshot point.
 */
function estimateUsageTokens(
  messages: ReadonlyArray<{ role: string; content?: string | null }>,
  snapshot: CompressionSnapshot | null | undefined,
): { inputTokens: number; outputTokens: number } {
  const sumTokens = (list: ReadonlyArray<{ content?: string | null }>) =>
    list.reduce((sum, m) => sum + countTokens(m.content || ''), 0)

  const inputTokens = snapshot
    ? countTokens(SUMMARY_PREFIX + snapshot.summary) + sumTokens(messages.slice(snapshot.lastMessageIndex + 1))
    : sumTokens(messages)
  const outputTokens = sumTokens(messages.filter(m => m.role === 'assistant'))
  return { inputTokens, outputTokens }
}

/**
 * Fold one parsed upstream run event into the in-memory transcript, so clients
 * resuming mid-run see partial assistant/tool messages.
 *
 * @param msgs           session transcript (mutated in place)
 * @param sessionId      local session id stamped on new messages
 * @param parsed         parsed upstream event object
 * @param runStartIndex  index of the first message produced by the current run;
 *                       earlier entries belong to previous runs and are ignored
 *                       when matching tool results or checking for streamed text
 */
function applyStreamEvent(msgs: SessionMessage[], sessionId: string, parsed: any, runStartIndex: number): void {
  const last = msgs[msgs.length - 1]
  const open = last?.role === 'assistant' && last.finish_reason == null ? last : undefined
  const nowSec = () => Math.floor(Date.now() / 1000)

  switch (parsed.event) {
    case 'message.delta': {
      if (open) {
        open.content += (parsed.delta || '')
      } else {
        msgs.push({
          id: msgs.length + 1,
          session_id: sessionId,
          role: 'assistant',
          content: parsed.delta || '',
          timestamp: nowSec(),
        })
      }
      break
    }
    case 'reasoning.delta':
    case 'thinking.delta': {
      const text = parsed.text || parsed.delta || ''
      if (!text) break
      if (open) {
        open.reasoning = (open.reasoning || '') + text
      } else {
        msgs.push({
          id: msgs.length + 1,
          session_id: sessionId,
          role: 'assistant',
          content: '',
          reasoning: text,
          timestamp: nowSec(),
        })
      }
      break
    }
    case 'tool.started': {
      if (open) open.finish_reason = 'tool_calls'
      msgs.push({
        id: msgs.length + 1,
        session_id: sessionId,
        role: 'tool',
        content: '',
        tool_call_id: parsed.tool_call_id || null,
        tool_name: parsed.tool || parsed.name || null,
        timestamp: nowSec(),
      })
      break
    }
    case 'tool.completed': {
      const toolMsg = findPendingToolMessage(msgs, runStartIndex, parsed.tool_call_id)
      if (toolMsg && parsed.output) {
        toolMsg.content = typeof parsed.output === 'string' ? parsed.output : JSON.stringify(parsed.output)
      }
      break
    }
    case 'run.completed': {
      if (open) open.finish_reason = parsed.finish_reason || 'stop'
      // Finalize assistant message — if this run streamed no content, use output
      if (parsed.output && !runProducedAssistantText(msgs.slice(runStartIndex))) {
        if (msgs.length > runStartIndex && last?.role === 'assistant') {
          last.content = parsed.output
        } else {
          msgs.push({
            id: msgs.length + 1,
            session_id: sessionId,
            role: 'assistant',
            content: parsed.output,
            timestamp: nowSec(),
          })
        }
      }
      break
    }
  }
}

/**
 * Find the still-empty tool message a `tool.completed` event belongs to, searching
 * the current run newest-first. Prefers a tool_call_id match; otherwise falls back
 * to the most recent empty tool message.
 */
function findPendingToolMessage(
  msgs: SessionMessage[],
  fromIndex: number,
  toolCallId?: string | null,
): SessionMessage | undefined {
  let fallback: SessionMessage | undefined
  for (let i = msgs.length - 1; i >= fromIndex; i--) {
    const m = msgs[i]
    if (!m || m.role !== 'tool' || m.content) continue
    if (!toolCallId || m.tool_call_id === toolCallId) return m
    if (!fallback) fallback = m
  }
  return fallback
}

/** Check if any assistant message in the list has non-empty content */
function runProducedAssistantText(messages: SessionMessage[]): boolean {
  return messages.some(m => m.role === 'assistant' && m.content?.trim())
}