/** * CLI Bridge run handler — handles runs that use the agent bridge * to communicate with Hermes CLI agent. */ import type { Server, Socket } from 'socket.io' import { getSystemPrompt } from '../../../lib/llm-prompt' import { getSession, createSession, addMessage, updateSession, updateSessionStats } from '../../../db/hermes/session-store' import { updateUsage } from '../../../db/hermes/usage-store' import { logger, bridgeLogger } from '../../logger' import { AgentBridgeClient, type AgentBridgeContextEstimate, type AgentBridgeMessage, type AgentBridgeOutput } from '../agent-bridge' import { contentBlocksToString, convertContentBlocksForAgent, extractTextForPreview, isContentBlockArray } from './content-blocks' import { buildCompressedHistory, buildDbHistory, buildSnapshotAwareHistory, forceCompressBridgeHistory, pushState, replaceState } from './compression' import { calcAndUpdateUsage, contextTokensWithCachedOverhead, estimateUsageTokensFromMessages, getCachedBridgeContextOverhead, updateContextTokenUsage, updateMessageContextTokenUsage, } from './usage' import { flushBridgePendingToDb, ensureOpenBridgeAssistantMessage, syncBridgeReasoningToMessage, recordBridgeToolStarted, recordBridgeToolCompleted, } from './bridge-message' import { summarizeToolArguments } from './response-utils' import type { ContentBlock, SessionState } from './types' import type { ChatMessage } from '../../../lib/context-compressor' import { resolveBridgeRunModelConfig, type RunModelGroup } from './model-config' import { filterBridgeToolCallMarkupDelta } from './bridge-delta' const BRIDGE_USAGE_FLUSH_DELAY_MS = 200 function stringValue(value: unknown): string { return typeof value === 'string' ? value.trim() : '' } function looksLikeAgentFailure(value: string): boolean { return /\bAPI call failed after\b/i.test(value) || /\bHTTP\s+(?:4\d\d|5\d\d)\b/i.test(value) || /\b(?:401|403|429|500|502|503|504)\b/.test(value) && /\b(?:unauthorized|forbidden|rate limit|unavailable|failed|error)\b/i.test(value) } export function bridgeTerminalError(chunk: Pick): string | null { const result = chunk.result && typeof chunk.result === 'object' && !Array.isArray(chunk.result) ? chunk.result as Record : null const resultError = result ? stringValue(result.error) || stringValue(result.exception) || stringValue(result.message) : '' const finalResponse = result ? stringValue(result.final_response) : '' if (chunk.status === 'error') { return stringValue(chunk.error) || resultError || finalResponse || 'Agent run failed' } if (result?.failed === true || result?.completed === false) { return resultError || finalResponse || 'Agent reported failure' } if (resultError) return resultError if (finalResponse && looksLikeAgentFailure(finalResponse)) return finalResponse return null } function finiteToken(value: unknown): number | undefined { return typeof value === 'number' && Number.isFinite(value) && value >= 0 ? Math.floor(value) : undefined } function cacheBridgeContext(state: SessionState, data: Record | AgentBridgeContextEstimate) { const fixedContextTokens = finiteToken(data.fixed_context_tokens) if (fixedContextTokens == null) return state.bridgeContext = { fixedContextTokens, systemPromptTokens: finiteToken(data.system_prompt_tokens), toolTokens: finiteToken(data.tool_tokens), systemPromptChars: finiteToken(data.system_prompt_chars), toolCount: finiteToken(data.tool_count), toolNames: Array.isArray(data.tool_names) ? data.tool_names.map(String) : undefined, profile: typeof data.profile === 'string' ? data.profile : state.bridgeContext?.profile, model: typeof data.model === 'string' ? data.model : state.bridgeContext?.model, provider: typeof data.provider === 'string' ? data.provider : state.bridgeContext?.provider, } } export async function handleBridgeRun( nsp: ReturnType, socket: Socket, data: { input: string | ContentBlock[]; session_id?: string; model?: string; provider?: string; model_groups?: RunModelGroup[]; instructions?: string; source?: string; queue_id?: string; peerExcludeSocketId?: string }, profile: string, sessionMap: Map, bridge: AgentBridgeClient, _skipUserMessage = false, loadSessionStateFromDbFn: (sid: string, sessionMap: Map) => Promise, dequeueNextQueuedRun: (socket: Socket, sessionId: string, fallbackProfile?: string) => void, ) { const { input, session_id, instructions } = data if (!session_id) { socket.emit('run.failed', { event: 'run.failed', error: 'session_id is required for cli source' }) return } let fullInstructions = instructions ? `${getSystemPrompt()}\n${instructions}` : getSystemPrompt() const sessionRow = getSession(session_id) const sessionModel = sessionRow?.model || '' const sessionProvider = sessionRow?.provider || '' const { model: resolvedModel, provider: resolvedProvider } = await resolveBridgeRunModelConfig({ profile, sessionModel, sessionProvider, requestedModel: data.model, requestedProvider: data.provider, modelGroups: data.model_groups, }) if (sessionRow) { const updates: { model?: string; provider?: string } = {} if (resolvedModel && sessionRow.model !== resolvedModel) updates.model = resolvedModel if (resolvedProvider && sessionRow.provider !== resolvedProvider) updates.provider = resolvedProvider if (Object.keys(updates).length > 0) updateSession(session_id, updates) } if (sessionRow?.workspace) { const workspaceCtx = `[Current working directory: ${sessionRow.workspace}]` fullInstructions = `\n${workspaceCtx}\n${fullInstructions}` } const runMarker = `cli_run_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}` const now = Math.floor(Date.now() / 1000) let state = sessionMap.get(session_id) if (!state) { state = getSession(session_id) ? await loadSessionStateFromDbFn(session_id, sessionMap) : { messages: [], isWorking: false, events: [], queue: [] } sessionMap.set(session_id, state) } state.isWorking = true state.isAborting = false state.profile = profile state.source = 'cli' state.activeRunMarker = runMarker state.runId = undefined state.abortController = undefined state.bridgeOutput = '' state.bridgePendingAssistantContent = '' state.bridgePendingReasoningContent = '' state.bridgePendingToolCallMarkup = '' state.bridgeToolCounter = 0 state.bridgePendingTools = [] state.responseRun = undefined const inputStr = contentBlocksToString(input) state.messages.push({ id: state.messages.length + 1, session_id, runMarker, role: 'user', content: inputStr, timestamp: now, }) if (!getSession(session_id)) { const previewText = extractTextForPreview(input) const preview = previewText.replace(/[\r\n]/g, ' ').substring(0, 100) createSession({ id: session_id, profile, source: 'cli', model: resolvedModel, provider: resolvedProvider, title: preview }) } const messageId = addMessage({ session_id, role: 'user', content: inputStr, timestamp: now, }) socket.join(`session:${session_id}`) const peerTarget = data.peerExcludeSocketId ? nsp.to(`session:${session_id}`).except(data.peerExcludeSocketId) : socket.to(`session:${session_id}`) peerTarget.emit('run.peer_user_message', { event: 'run.peer_user_message', session_id, message: { id: data.queue_id || messageId, role: 'user', content: inputStr, timestamp: now, }, }) const emit = (event: string, payload: any) => { const tagged = { ...payload, session_id } nsp.to(`session:${session_id}`).emit(event, tagged) if (!nsp.adapter.rooms.get(`session:${session_id}`)?.size && socket.connected) { socket.emit(event, tagged) } } const history = await buildCompressedHistory( session_id, profile, '', undefined, emit, sessionMap, { model: resolvedModel, provider: resolvedProvider }, async (messages) => { const cachedOverhead = getCachedBridgeContextOverhead(state) if (cachedOverhead != null) { const messageUsage = estimateUsageTokensFromMessages(messages) return cachedOverhead + messageUsage.inputTokens + messageUsage.outputTokens } const estimate = await bridge.contextEstimate( session_id, messages, fullInstructions, profile, { model: resolvedModel, provider: resolvedProvider }, ) cacheBridgeContext(state, estimate) bridgeLogger.info({ sessionId: session_id, profile, model: resolvedModel, provider: resolvedProvider, messages: estimate.message_count, toolCount: estimate.tool_count, systemPromptChars: estimate.system_prompt_chars, fixedContextTokens: estimate.fixed_context_tokens, fullContextTokens: estimate.token_count, }, '[chat-run-socket] full context estimate') return estimate.token_count }, ) const bridgeHistory = history try { const bridgeInput = isContentBlockArray(input) ? await convertContentBlocksForAgent(input) : input const bridgeStorageInput = isContentBlockArray(input) ? inputStr : undefined logger.info('[chat-run-socket] starting CLI bridge run for session %s', session_id) bridgeLogger.info({ sessionId: session_id, profile, inputChars: inputStr.length, historyMessages: history.length, hasInstructions: Boolean(fullInstructions), multimodalInput: isContentBlockArray(input), }, '[chat-run-socket] starting CLI bridge run') const started = await bridge.chat( session_id, bridgeInput as AgentBridgeMessage, bridgeHistory, fullInstructions, profile, { ...(bridgeStorageInput !== undefined ? { storage_message: bridgeStorageInput } : {}), ...(resolvedModel ? { model: resolvedModel } : {}), ...(resolvedProvider ? { provider: resolvedProvider } : {}), }, ) state.runId = started.run_id bridgeLogger.info({ sessionId: session_id, runId: started.run_id, status: started.status, }, '[chat-run-socket] CLI bridge run started') pushState(sessionMap, session_id, 'run.started', { event: 'run.started', run_id: started.run_id, queue_length: state.queue.length || 0, }) emit('run.started', { event: 'run.started', run_id: started.run_id, queue_length: state.queue.length || 0, }) for await (const chunk of bridge.streamOutput(started.run_id)) { await applyBridgeChunkAsync( nsp, socket, state, session_id, runMarker, chunk, emit, profile, sessionMap, bridge, dequeueNextQueuedRun, fullInstructions, { model: resolvedModel, provider: resolvedProvider }, ) if (chunk.done) break } } catch (err: any) { if (state.activeRunMarker !== runMarker) return if (!state.isWorking) return const queueLen = state.queue?.length ?? 0 state.isWorking = false state.isAborting = false state.profile = undefined state.runId = undefined state.activeRunMarker = undefined state.events = [] state.bridgePendingToolCallMarkup = undefined flushBridgePendingToDb(state, session_id) updateSessionStats(session_id) const message = err instanceof Error ? err.message : String(err) const errUsage = await calcAndUpdateUsage(session_id, state, emit) const errContextTokens = await refreshFinalContextUsage({ sessionId: session_id, profile, model: resolvedModel, provider: resolvedProvider, instructions: fullInstructions, state, usage: errUsage, emit, bridge, }) updateUsage(session_id, { inputTokens: errUsage.inputTokens, outputTokens: errUsage.outputTokens, profile, }) emit('run.failed', { event: 'run.failed', error: message, inputTokens: errUsage.inputTokens, outputTokens: errUsage.outputTokens, contextTokens: errContextTokens, queue_remaining: queueLen, }) if (queueLen > 0) dequeueNextQueuedRun(socket, session_id) } } async function refreshFinalContextUsage(args: { sessionId: string profile: string model?: string | null provider?: string | null instructions: string state: SessionState usage: { inputTokens: number; outputTokens: number } emit: (event: string, payload: any) => void bridge: AgentBridgeClient }): Promise { try { const dbHistory = await buildDbHistory(args.sessionId, { excludeLastUser: false }) const finalHistory = await buildSnapshotAwareHistory( args.sessionId, args.profile, dbHistory, { model: args.model, provider: args.provider }, ) const finalMessageUsage = estimateUsageTokensFromMessages(finalHistory) const finalMessageTokens = finalMessageUsage.inputTokens + finalMessageUsage.outputTokens if (getCachedBridgeContextOverhead(args.state) != null) { const contextTokens = updateMessageContextTokenUsage( args.sessionId, args.state, args.emit, finalMessageTokens, args.usage, ) bridgeLogger.info({ sessionId: args.sessionId, profile: args.profile, model: args.model, provider: args.provider, messages: finalHistory.length, fixedContextTokens: args.state.bridgeContext?.fixedContextTokens, messageTokens: finalMessageTokens, fullContextTokens: contextTokens, }, '[chat-run-socket] final cached context estimate') return contextTokens } const estimate = await args.bridge.contextEstimate( args.sessionId, finalHistory, args.instructions, args.profile, { model: args.model ?? undefined, provider: args.provider ?? undefined }, ) cacheBridgeContext(args.state, estimate) const contextTokens = typeof estimate.token_count === 'number' && Number.isFinite(estimate.token_count) && estimate.token_count > 0 ? Math.floor(estimate.token_count) : undefined if (contextTokens == null) return args.state.contextTokens updateContextTokenUsage(args.sessionId, args.state, args.emit, contextTokens, args.usage) bridgeLogger.info({ sessionId: args.sessionId, profile: args.profile, model: args.model, provider: args.provider, messages: estimate.message_count, toolCount: estimate.tool_count, systemPromptChars: estimate.system_prompt_chars, fullContextTokens: contextTokens, }, '[chat-run-socket] final full context estimate') return contextTokens } catch (err) { bridgeLogger.warn({ err: err instanceof Error ? { message: err.message, name: err.name } : err, sessionId: args.sessionId, profile: args.profile, }, '[chat-run-socket] final full context estimate failed') return args.state.contextTokens } } async function applyBridgeChunkAsync( nsp: ReturnType, socket: Socket, state: SessionState, sessionId: string, runMarker: string, chunk: AgentBridgeOutput, emit: (event: string, payload: any) => void, profile: string, sessionMap: Map, bridge: AgentBridgeClient, dequeueNextQueuedRun: (socket: Socket, sessionId: string, fallbackProfile?: string) => void, instructions: string, modelContext: { model?: string | null; provider?: string | null }, ): Promise { if (state.activeRunMarker !== runMarker) { bridgeLogger.info({ sessionId, runId: chunk.run_id, runMarker, activeRunMarker: state.activeRunMarker, }, '[chat-run-socket] ignoring stale CLI bridge chunk') return } state.runId = chunk.run_id for (const ev of chunk.events || []) { const evType = ev.event as string | undefined if (evType === 'bridge.context.ready') { cacheBridgeContext(state, ev) const usage = await calcAndUpdateUsage(sessionId, state, emit) updateMessageContextTokenUsage( sessionId, state, emit, usage.inputTokens + usage.outputTokens, usage, ) } else if (evType === 'tool.started') { flushBridgePendingToDb(state, sessionId, runMarker) const toolName = (ev.tool_name as string) || '' const args = ev.args as Record | undefined const tool = recordBridgeToolStarted(state, sessionId, runMarker, toolName, args, ev.tool_call_id) const payload = { event: 'tool.started', run_id: chunk.run_id, tool_call_id: tool.id, tool: toolName, name: toolName, arguments: tool.arguments, preview: ev.preview || summarizeToolArguments(tool.arguments), } pushState(sessionMap, sessionId, 'tool.started', payload) emit('tool.started', payload) } else if (evType === 'tool.completed') { const toolName = (ev.tool_name as string) || '' const completed = recordBridgeToolCompleted(state, sessionId, runMarker, toolName, ev) const payload = { event: 'tool.completed', run_id: chunk.run_id, tool_call_id: completed.id, tool: toolName, name: toolName, output: completed.output, duration: completed.duration ?? ev.duration, error: ev.is_error || undefined, } pushState(sessionMap, sessionId, 'tool.completed', payload) emit('tool.completed', payload) } else if (evType === 'turn.boundary') { flushBridgePendingToDb(state, sessionId, runMarker) } else if (evType === 'reasoning.delta' || evType === 'thinking.delta') { const text = String(ev.text || '') if (text) { state.bridgePendingReasoningContent = (state.bridgePendingReasoningContent || '') + text const message = ensureOpenBridgeAssistantMessage(state, sessionId, runMarker) message.reasoning = (message.reasoning || '') + text message.reasoning_content = (message.reasoning_content || '') + text } emit(evType, { event: evType, run_id: chunk.run_id, text, }) } else if (evType === 'reasoning.available') { emit('reasoning.available', { event: 'reasoning.available', run_id: chunk.run_id, }) } else if (evType === 'approval.requested') { const payload = { event: 'approval.requested', run_id: chunk.run_id, approval_id: ev.approval_id, command: ev.command, description: ev.description, choices: ev.choices, allow_permanent: ev.allow_permanent, timeout_ms: ev.timeout_ms, } replaceState(sessionMap, sessionId, 'approval.requested', payload) emit('approval.requested', payload) } else if (evType === 'approval.resolved') { const payload = { event: 'approval.resolved', run_id: chunk.run_id, approval_id: ev.approval_id, choice: ev.choice, } replaceState(sessionMap, sessionId, 'approval.resolved', payload) emit('approval.resolved', payload) } else if (evType === 'bridge.compression.requested') { const bridgeHistory = await buildDbHistory(sessionId, { excludeLastUser: true }) const bridgeUsage = estimateUsageTokensFromMessages(bridgeHistory) const messageOnlyTokens = bridgeUsage.inputTokens + bridgeUsage.outputTokens const tokenCount = typeof ev.approx_tokens === 'number' && Number.isFinite(ev.approx_tokens) && ev.approx_tokens > 0 ? ev.approx_tokens : messageOnlyTokens bridgeLogger.info({ sessionId, profile, bridgeMessages: ev.message_count, dbMessages: bridgeHistory.length, messageOnlyTokens, fullContextTokens: tokenCount, source: typeof ev.approx_tokens === 'number' ? 'bridge' : 'message-only-fallback', }, '[chat-run-socket] bridge compression token estimate') const payload = { event: 'compression.started', run_id: chunk.run_id, request_id: ev.request_id, message_count: bridgeHistory.length || ev.message_count, token_count: tokenCount, source: 'bridge', } replaceState(sessionMap, sessionId, 'compression.started', payload) emit('compression.started', payload) if (ev.request_id && Array.isArray(ev.messages)) { try { const compressed = await forceCompressBridgeHistory( sessionId, profile, ev.messages as ChatMessage[], typeof ev.approx_tokens === 'number' ? ev.approx_tokens : undefined, ) state.bridgeCompressionResults = state.bridgeCompressionResults || {} state.bridgeCompressionResults[String(ev.request_id)] = compressed await bridge.compressionRespond(String(ev.request_id), { messages: compressed.messages }) } catch (err: any) { await bridge.compressionRespond(String(ev.request_id), { error: err?.message || String(err), }).catch(() => undefined) } } } else if (evType === 'bridge.compression.completed') { const compressionResult = ev.request_id ? state.bridgeCompressionResults?.[String(ev.request_id)] : undefined const bridgeAfterContextTokens = finiteToken(ev.result_approx_tokens) const messageAfterTokens = finiteToken(compressionResult?.afterTokens) const afterContextTokens = messageAfterTokens != null && getCachedBridgeContextOverhead(state) != null ? contextTokensWithCachedOverhead(state, messageAfterTokens) : bridgeAfterContextTokens ?? messageAfterTokens const payload = { event: 'compression.completed', run_id: chunk.run_id, request_id: ev.request_id, compressed: compressionResult?.compressed ?? ev.compressed !== false, llmCompressed: compressionResult?.llmCompressed, totalMessages: compressionResult?.beforeMessages ?? ev.message_count, resultMessages: compressionResult?.resultMessages ?? ev.result_messages, beforeTokens: compressionResult?.beforeTokens ?? ev.approx_tokens, afterTokens: messageAfterTokens ?? bridgeAfterContextTokens, contextTokens: afterContextTokens, summaryTokens: compressionResult?.summaryTokens, verbatimCount: compressionResult?.verbatimCount, compressedStartIndex: compressionResult?.compressedStartIndex, source: 'bridge', } if (ev.request_id && state.bridgeCompressionResults) { delete state.bridgeCompressionResults[String(ev.request_id)] } replaceState(sessionMap, sessionId, 'compression.completed', payload) emit('compression.completed', payload) const usage = await calcAndUpdateUsage(sessionId, state, emit) if (messageAfterTokens != null && getCachedBridgeContextOverhead(state) != null) { updateMessageContextTokenUsage(sessionId, state, emit, messageAfterTokens, usage) } else { updateContextTokenUsage(sessionId, state, emit, afterContextTokens, usage) } } else if (evType === 'bridge.compression.failed') { const payload = { event: 'compression.completed', run_id: chunk.run_id, request_id: ev.request_id, compressed: false, totalMessages: ev.message_count, resultMessages: ev.message_count, beforeTokens: ev.approx_tokens, error: ev.error, source: 'bridge', } if (ev.request_id && state.bridgeCompressionResults) { delete state.bridgeCompressionResults[String(ev.request_id)] } replaceState(sessionMap, sessionId, 'compression.completed', payload) emit('compression.completed', payload) } else if (evType === 'status') { emit('agent.event', { event: 'agent.event', run_id: chunk.run_id, ...ev, }) } } if (chunk.delta) { const delta = filterBridgeToolCallMarkupDelta(state, chunk.delta) if (delta) { state.bridgeOutput = (state.bridgeOutput || '') + delta state.bridgePendingAssistantContent = (state.bridgePendingAssistantContent || '') + delta const last = [...state.messages].reverse().find(m => m.runMarker === runMarker) if (last?.role === 'assistant' && last.finish_reason == null) { last.content += delta syncBridgeReasoningToMessage(last, state.bridgePendingReasoningContent) } else { state.messages.push({ id: state.messages.length + 1, session_id: sessionId, runMarker, role: 'assistant', content: delta, reasoning: state.bridgePendingReasoningContent || null, reasoning_content: state.bridgePendingReasoningContent || null, timestamp: Math.floor(Date.now() / 1000), }) } emit('message.delta', { event: 'message.delta', run_id: chunk.run_id, delta, output: state.bridgeOutput, }) } } if (!chunk.done) return if (!state.isWorking) return if (state.isAborting) { bridgeLogger.info({ sessionId, runId: chunk.run_id, status: chunk.status, }, '[chat-run-socket][abort] suppressing CLI bridge terminal chunk during abort') return } flushBridgePendingToDb(state, sessionId) state.bridgePendingToolCallMarkup = undefined updateSessionStats(sessionId) await delay(BRIDGE_USAGE_FLUSH_DELAY_MS) const usage = await calcAndUpdateUsage(sessionId, state, emit) const contextTokens = await refreshFinalContextUsage({ sessionId, profile, model: modelContext.model, provider: modelContext.provider, instructions, state, usage, emit, bridge, }) updateUsage(sessionId, { inputTokens: usage.inputTokens, outputTokens: usage.outputTokens, profile: state.profile, }) const nextQueuedRun = state.queue.length > 0 ? state.queue[0] : undefined state.isWorking = Boolean(nextQueuedRun) state.isAborting = false if (nextQueuedRun) { state.profile = nextQueuedRun.profile || profile state.source = nextQueuedRun.source } else { state.profile = undefined } state.runId = undefined state.activeRunMarker = undefined state.events = [] const terminalError = bridgeTerminalError(chunk) const eventName = terminalError ? 'run.failed' : 'run.completed' const payload = { event: eventName, run_id: chunk.run_id, output: chunk.output || state.bridgeOutput || '', result: chunk.result, error: terminalError || chunk.error, inputTokens: usage.inputTokens, outputTokens: usage.outputTokens, contextTokens, queue_remaining: state.queue.length, } emit(eventName, payload) if (state.queue.length > 0) { dequeueNextQueuedRun(socket, sessionId) } } function delay(ms: number): Promise { return new Promise(resolve => setTimeout(resolve, ms)) }