diff --git a/packages/client/src/stores/hermes/chat.ts b/packages/client/src/stores/hermes/chat.ts index f6c7f0b..846c5be 100644 --- a/packages/client/src/stores/hermes/chat.ts +++ b/packages/client/src/stores/hermes/chat.ts @@ -79,6 +79,21 @@ function uid(): string { return Date.now().toString(36) + Math.random().toString(36).slice(2, 8) } +function isToolOutputError(output: unknown): boolean { + if (typeof output !== 'string' || !output.trim()) return false + try { + const parsed = JSON.parse(output) + if (parsed && typeof parsed === 'object') { + const record = parsed as Record + if (record.success === false) return true + if (record.error != null && String(record.error).trim() !== '') return true + } + } catch { + return false + } + return false +} + async function uploadFiles(attachments: Attachment[]): Promise<{ name: string; path: string }[]> { if (attachments.length === 0) return [] const formData = new FormData() @@ -607,10 +622,11 @@ export const useChatStore = defineStore('chat', () => { ? msgs.filter(m => m.role === 'tool' && m.toolCallId === toolCallId) : msgs.filter(m => m.role === 'tool' && m.toolStatus === 'running') if (toolMsgs.length > 0) { + const output = typeof e.output === 'string' ? e.output : undefined updateMessage(sessionId, toolMsgs[toolMsgs.length - 1].id, { - toolStatus: e.error === true ? 'error' : 'done', + toolStatus: e.error === true || isToolOutputError(output) ? 'error' : 'done', toolDuration: e.duration, - toolResult: typeof e.output === 'string' ? e.output : undefined, + toolResult: output, }) } } @@ -1224,13 +1240,13 @@ export const useChatStore = defineStore('chat', () => { : msgs.filter(m => m.role === 'tool' && m.toolStatus === 'running') if (toolMsgs.length > 0) { const last = toolMsgs[toolMsgs.length - 1] - // Check if tool errored - const hasError = (evt as any).error === true + const output = typeof (evt as any).output === 'string' ? (evt as any).output : undefined + const hasError = (evt as any).error === true || isToolOutputError(output) const duration = (evt as any).duration updateMessage(sid, last.id, { toolStatus: hasError ? 'error' : 'done', toolDuration: duration, - toolResult: typeof (evt as any).output === 'string' ? (evt as any).output : undefined, + toolResult: output, }) } @@ -1351,6 +1367,14 @@ export const useChatStore = defineStore('chat', () => { } case 'run.failed': { + if ((evt as any).inputTokens != null) { + const target = sessions.value.find(s => s.id === sid) + if (target) { + target.inputTokens = (evt as any).inputTokens + target.outputTokens = (evt as any).outputTokens + if ((evt as any).contextTokens != null) target.contextTokens = (evt as any).contextTokens + } + } addAgentErrorMessage(sid, evt.error) const msgs = getSessionMsgs(sid) msgs.forEach((m, i) => { @@ -1653,11 +1677,12 @@ export const useChatStore = defineStore('chat', () => { ? msgs.filter(m => m.role === 'tool' && m.toolCallId === toolCallId) : msgs.filter(m => m.role === 'tool' && m.toolStatus === 'running') if (toolMsgs.length > 0) { - const hasError = (evt as any).error === true + const output = typeof (evt as any).output === 'string' ? (evt as any).output : undefined + const hasError = (evt as any).error === true || isToolOutputError(output) updateMessage(sid, toolMsgs[toolMsgs.length - 1].id, { toolStatus: hasError ? 'error' : 'done', toolDuration: (evt as any).duration, - toolResult: typeof (evt as any).output === 'string' ? (evt as any).output : undefined, + toolResult: output, }) } @@ -1764,6 +1789,14 @@ export const useChatStore = defineStore('chat', () => { } case 'run.failed': { + if ((evt as any).inputTokens != null) { + const target = sessions.value.find(s => s.id === sid) + if (target) { + target.inputTokens = (evt as any).inputTokens + target.outputTokens = (evt as any).outputTokens + if ((evt as any).contextTokens != null) target.contextTokens = (evt as any).contextTokens + } + } const hasQueue = (evt as any).queue_remaining > 0 if (hasQueue) { queueLengths.value.set(sid, (evt as any).queue_remaining) diff --git a/packages/server/src/services/hermes/run-chat/handle-bridge-run.ts b/packages/server/src/services/hermes/run-chat/handle-bridge-run.ts index 4582f31..94c9a5f 100644 --- a/packages/server/src/services/hermes/run-chat/handle-bridge-run.ts +++ b/packages/server/src/services/hermes/run-chat/handle-bridge-run.ts @@ -10,8 +10,7 @@ import { updateUsage } from '../../../db/hermes/usage-store' import { logger, bridgeLogger } from '../../logger' import { AgentBridgeClient, type AgentBridgeMessage, type AgentBridgeOutput } from '../agent-bridge' import { contentBlocksToString, convertContentBlocksForAgent, extractTextForPreview, isContentBlockArray } from './content-blocks' -import { buildCompressedHistory } from './compression' -import { pushState, replaceState } from './compression' +import { buildCompressedHistory, buildDbHistory, forceCompressBridgeHistory, pushState, replaceState } from './compression' import { calcAndUpdateUsage, estimateUsageTokensFromMessages } from './usage' import { flushBridgePendingToDb, @@ -20,9 +19,7 @@ import { recordBridgeToolStarted, recordBridgeToolCompleted, } from './bridge-message' -import { forceCompressBridgeHistory } from './compression' import { summarizeToolArguments } from './response-utils' -import { buildDbHistory } from './compression' import type { ContentBlock, SessionState } from './types' import type { ChatMessage } from '../../../lib/context-compressor' import { resolveBridgeRunModelConfig, type RunModelGroup } from './model-config' @@ -239,7 +236,21 @@ export async function handleBridgeRun( }) for await (const chunk of bridge.streamOutput(started.run_id)) { - await applyBridgeChunkAsync(nsp, socket, state, session_id, runMarker, chunk, emit, profile, sessionMap, bridge, dequeueNextQueuedRun) + await applyBridgeChunkAsync( + nsp, + socket, + state, + session_id, + runMarker, + chunk, + emit, + profile, + sessionMap, + bridge, + dequeueNextQueuedRun, + fullInstructions, + { model: resolvedModel, provider: resolvedProvider }, + ) if (chunk.done) break } } catch (err: any) { @@ -256,17 +267,88 @@ export async function handleBridgeRun( flushBridgePendingToDb(state, session_id) updateSessionStats(session_id) const message = err instanceof Error ? err.message : String(err) - emit('run.failed', { event: 'run.failed', error: message, queue_remaining: queueLen }) const errUsage = await calcAndUpdateUsage(session_id, state, emit) + const errContextTokens = await refreshFinalContextUsage({ + sessionId: session_id, + profile, + model: resolvedModel, + provider: resolvedProvider, + instructions: fullInstructions, + state, + usage: errUsage, + emit, + bridge, + }) updateUsage(session_id, { inputTokens: errUsage.inputTokens, outputTokens: errUsage.outputTokens, - profile: state.profile, + profile, + }) + emit('run.failed', { + event: 'run.failed', + error: message, + inputTokens: errUsage.inputTokens, + outputTokens: errUsage.outputTokens, + contextTokens: errContextTokens, + queue_remaining: queueLen, }) if (queueLen > 0) dequeueNextQueuedRun(socket, session_id) } } +async function refreshFinalContextUsage(args: { + sessionId: string + profile: string + model?: string | null + provider?: string | null + instructions: string + state: SessionState + usage: { inputTokens: number; outputTokens: number } + emit: (event: string, payload: any) => void + bridge: AgentBridgeClient +}): Promise { + try { + const finalHistory = await buildDbHistory(args.sessionId, { excludeLastUser: false }) + const estimate = await args.bridge.contextEstimate( + args.sessionId, + finalHistory, + args.instructions, + args.profile, + { model: args.model ?? undefined, provider: args.provider ?? undefined }, + ) + const contextTokens = typeof estimate.token_count === 'number' && Number.isFinite(estimate.token_count) && estimate.token_count > 0 + ? Math.floor(estimate.token_count) + : undefined + if (contextTokens == null) return args.state.contextTokens + + args.state.contextTokens = contextTokens + args.emit('usage.updated', { + event: 'usage.updated', + inputTokens: args.usage.inputTokens, + outputTokens: args.usage.outputTokens, + contextTokens, + }) + bridgeLogger.info({ + sessionId: args.sessionId, + profile: args.profile, + model: args.model, + provider: args.provider, + messages: estimate.message_count, + toolCount: estimate.tool_count, + systemPromptChars: estimate.system_prompt_chars, + fullContextTokens: contextTokens, + }, '[chat-run-socket] final full context estimate') + return contextTokens + } catch (err) { + bridgeLogger.warn({ + err: err instanceof Error ? { message: err.message, name: err.name } : err, + sessionId: args.sessionId, + profile: args.profile, + }, '[chat-run-socket] final full context estimate failed') + return args.state.contextTokens + } +} + async function applyBridgeChunkAsync( nsp: ReturnType, socket: Socket, @@ -279,6 +361,8 @@ async function applyBridgeChunkAsync( sessionMap: Map, bridge: AgentBridgeClient, dequeueNextQueuedRun: (socket: Socket, sessionId: string, fallbackProfile?: string) => void, + instructions: string, + modelContext: { model?: string | null; provider?: string | null }, ): Promise { if (state.activeRunMarker !== runMarker) { bridgeLogger.info({ @@ -509,6 +593,17 @@ async function applyBridgeChunkAsync( updateSessionStats(sessionId) await delay(BRIDGE_USAGE_FLUSH_DELAY_MS) const usage = await calcAndUpdateUsage(sessionId, state, emit) + const contextTokens = await refreshFinalContextUsage({ + sessionId, + profile, + model: modelContext.model, + provider: modelContext.provider, + instructions, + state, + usage, + emit, + bridge, + }) updateUsage(sessionId, { inputTokens: usage.inputTokens, outputTokens: usage.outputTokens, @@ -536,6 +631,7 @@ async function applyBridgeChunkAsync( error: terminalError || chunk.error, inputTokens: usage.inputTokens, outputTokens: usage.outputTokens, + contextTokens, queue_remaining: state.queue.length, } emit(eventName, payload) diff --git a/tests/server/run-chat-bridge-final-context.test.ts b/tests/server/run-chat-bridge-final-context.test.ts new file mode 100644 index 0000000..5137796 --- /dev/null +++ b/tests/server/run-chat-bridge-final-context.test.ts @@ -0,0 +1,207 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const getSystemPromptMock = vi.fn() +const getSessionMock = vi.fn() +const createSessionMock = vi.fn() +const addMessageMock = vi.fn() +const updateSessionMock = vi.fn() +const updateSessionStatsMock = vi.fn() +const updateUsageMock = vi.fn() +const buildCompressedHistoryMock = vi.fn() +const buildDbHistoryMock = vi.fn() +const pushStateMock = vi.fn() +const replaceStateMock = vi.fn() +const forceCompressBridgeHistoryMock = vi.fn() +const calcAndUpdateUsageMock = vi.fn() +const estimateUsageTokensFromMessagesMock = vi.fn() +const flushBridgePendingToDbMock = vi.fn() +const ensureOpenBridgeAssistantMessageMock = vi.fn() +const syncBridgeReasoningToMessageMock = vi.fn() +const recordBridgeToolStartedMock = vi.fn() +const recordBridgeToolCompletedMock = vi.fn() +const resolveBridgeRunModelConfigMock = vi.fn() + +vi.mock('../../packages/server/src/lib/llm-prompt', () => ({ + getSystemPrompt: getSystemPromptMock, +})) + +vi.mock('../../packages/server/src/db/hermes/session-store', () => ({ + getSession: getSessionMock, + createSession: createSessionMock, + addMessage: addMessageMock, + updateSession: updateSessionMock, + updateSessionStats: updateSessionStatsMock, +})) + +vi.mock('../../packages/server/src/db/hermes/usage-store', () => ({ + updateUsage: updateUsageMock, +})) + +vi.mock('../../packages/server/src/services/logger', () => ({ + logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() }, + bridgeLogger: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() }, +})) + +vi.mock('../../packages/server/src/services/hermes/run-chat/compression', () => ({ + buildCompressedHistory: buildCompressedHistoryMock, + buildDbHistory: buildDbHistoryMock, + pushState: pushStateMock, + replaceState: replaceStateMock, + forceCompressBridgeHistory: forceCompressBridgeHistoryMock, +})) + +vi.mock('../../packages/server/src/services/hermes/run-chat/usage', () => ({ + calcAndUpdateUsage: calcAndUpdateUsageMock, + estimateUsageTokensFromMessages: estimateUsageTokensFromMessagesMock, +})) + +vi.mock('../../packages/server/src/services/hermes/run-chat/bridge-message', () => ({ + flushBridgePendingToDb: flushBridgePendingToDbMock, + ensureOpenBridgeAssistantMessage: ensureOpenBridgeAssistantMessageMock, + syncBridgeReasoningToMessage: syncBridgeReasoningToMessageMock, + recordBridgeToolStarted: recordBridgeToolStartedMock, + recordBridgeToolCompleted: recordBridgeToolCompletedMock, +})) + +vi.mock('../../packages/server/src/services/hermes/run-chat/model-config', () => ({ + resolveBridgeRunModelConfig: resolveBridgeRunModelConfigMock, +})) + +function makeSocket() { + return { + connected: true, + emit: vi.fn(), + join: vi.fn(), + } as any +} + +function makeNamespace(emit: ReturnType) { + const room = new Set(['socket-1']) + return { + adapter: { rooms: new Map([['session:session-1', room]]) }, + to: vi.fn(() => ({ emit })), + } as any +} + +function makeState() { + return { + messages: [], + isWorking: false, + events: [], + queue: [], + } as any +} + +describe('bridge run final context usage', () => { + beforeEach(() => { + vi.clearAllMocks() + getSystemPromptMock.mockReturnValue('system prompt') + getSessionMock.mockReturnValue({ id: 'session-1', profile: 'default', model: '', provider: '' }) + resolveBridgeRunModelConfigMock.mockResolvedValue({ model: 'gpt-test', provider: 'openai' }) + buildCompressedHistoryMock.mockResolvedValue([{ role: 'user', content: 'previous' }]) + buildDbHistoryMock.mockResolvedValue([ + { role: 'user', content: 'hello' }, + { role: 'assistant', content: 'done' }, + ]) + calcAndUpdateUsageMock.mockResolvedValue({ inputTokens: 11, outputTokens: 7 }) + }) + + it('refreshes full context tokens when a bridge run completes', async () => { + const emit = vi.fn() + const nsp = makeNamespace(emit) + const socket = makeSocket() + const state = makeState() + const sessionMap = new Map([['session-1', state]]) + const bridge = { + chat: vi.fn().mockResolvedValue({ run_id: 'run-1', status: 'started' }), + contextEstimate: vi.fn().mockResolvedValue({ + token_count: 12345, + message_count: 2, + tool_count: 4, + system_prompt_chars: 13, + }), + streamOutput: vi.fn(async function* () { + yield { run_id: 'run-1', done: true, status: 'completed', output: 'done' } + }), + } as any + + const { handleBridgeRun } = await import('../../packages/server/src/services/hermes/run-chat/handle-bridge-run') + await handleBridgeRun( + nsp, + socket, + { input: 'hello', session_id: 'session-1' }, + 'default', + sessionMap, + bridge, + false, + vi.fn(), + vi.fn(), + ) + + expect(bridge.contextEstimate).toHaveBeenCalledWith( + 'session-1', + [ + { role: 'user', content: 'hello' }, + { role: 'assistant', content: 'done' }, + ], + 'system prompt', + 'default', + { model: 'gpt-test', provider: 'openai' }, + ) + expect(state.contextTokens).toBe(12345) + expect(emit).toHaveBeenCalledWith('usage.updated', expect.objectContaining({ + inputTokens: 11, + outputTokens: 7, + contextTokens: 12345, + })) + expect(emit).toHaveBeenCalledWith('run.completed', expect.objectContaining({ + inputTokens: 11, + outputTokens: 7, + contextTokens: 12345, + })) + }) + + it('refreshes full context tokens when a bridge run fails', async () => { + const emit = vi.fn() + const nsp = makeNamespace(emit) + const socket = makeSocket() + const state = makeState() + const sessionMap = new Map([['session-1', state]]) + const bridge = { + chat: vi.fn().mockRejectedValue(new Error('bridge timeout')), + contextEstimate: vi.fn().mockResolvedValue({ + token_count: 54321, + message_count: 1, + tool_count: 4, + system_prompt_chars: 13, + }), + streamOutput: vi.fn(), + } as any + + const { handleBridgeRun } = await import('../../packages/server/src/services/hermes/run-chat/handle-bridge-run') + await handleBridgeRun( + nsp, + socket, + { input: 'hello', session_id: 'session-1' }, + 'default', + sessionMap, + bridge, + false, + vi.fn(), + vi.fn(), + ) + + expect(state.contextTokens).toBe(54321) + expect(emit).toHaveBeenCalledWith('usage.updated', expect.objectContaining({ + inputTokens: 11, + outputTokens: 7, + contextTokens: 54321, + })) + expect(emit).toHaveBeenCalledWith('run.failed', expect.objectContaining({ + error: 'bridge timeout', + inputTokens: 11, + outputTokens: 7, + contextTokens: 54321, + })) + }) +})