Fix final context and tool status updates (#917)

Co-authored-by: Codex <codex@openai.com>
This commit is contained in:
ekko
2026-05-21 23:21:26 +08:00
committed by GitHub
parent ff1f471745
commit 254573400d
3 changed files with 350 additions and 14 deletions
+40 -7
View File
@@ -79,6 +79,21 @@ function uid(): string {
return Date.now().toString(36) + Math.random().toString(36).slice(2, 8)
}
/**
 * Reports whether a tool's textual output encodes a failure.
 *
 * The output counts as an error only when it is a non-blank string that parses as a JSON
 * object (arrays included) and either:
 *   - its `success` field is exactly `false`, or
 *   - its `error` field holds a meaningful value.
 *
 * `error` values of `null`, `undefined`, `false`, `0`, or anything that stringifies to
 * blank text are treated as "no error", so payloads such as
 * `{"success": true, "error": false}` are not reported as failures.
 *
 * @param output Raw tool output; anything other than a non-blank string yields `false`.
 * @returns `true` when the parsed payload signals a failure, otherwise `false`.
 */
function isToolOutputError(output: unknown): boolean {
  if (typeof output !== 'string' || !output.trim()) return false
  try {
    const parsed: unknown = JSON.parse(output)
    // JSON primitives (numbers, strings, booleans, null) carry no status fields.
    if (parsed === null || typeof parsed !== 'object') return false
    const record = parsed as Record<string, unknown>
    if (record.success === false) return true
    const error = record.error
    // `false` and `0` are explicit "no error" markers; String() would turn them into
    // non-empty text ("false" / "0") and wrongly flag the call as failed.
    if (error == null || error === false || error === 0) return false
    return String(error).trim() !== ''
  } catch {
    // Plain-text (non-JSON) output, or an error value that cannot be stringified,
    // is not a structured failure.
    return false
  }
}
async function uploadFiles(attachments: Attachment[]): Promise<{ name: string; path: string }[]> {
if (attachments.length === 0) return []
const formData = new FormData()
@@ -607,10 +622,11 @@ export const useChatStore = defineStore('chat', () => {
? msgs.filter(m => m.role === 'tool' && m.toolCallId === toolCallId)
: msgs.filter(m => m.role === 'tool' && m.toolStatus === 'running')
if (toolMsgs.length > 0) {
const output = typeof e.output === 'string' ? e.output : undefined
updateMessage(sessionId, toolMsgs[toolMsgs.length - 1].id, {
toolStatus: e.error === true ? 'error' : 'done',
toolStatus: e.error === true || isToolOutputError(output) ? 'error' : 'done',
toolDuration: e.duration,
toolResult: typeof e.output === 'string' ? e.output : undefined,
toolResult: output,
})
}
}
@@ -1224,13 +1240,13 @@ export const useChatStore = defineStore('chat', () => {
: msgs.filter(m => m.role === 'tool' && m.toolStatus === 'running')
if (toolMsgs.length > 0) {
const last = toolMsgs[toolMsgs.length - 1]
// Check if tool errored
const hasError = (evt as any).error === true
const output = typeof (evt as any).output === 'string' ? (evt as any).output : undefined
const hasError = (evt as any).error === true || isToolOutputError(output)
const duration = (evt as any).duration
updateMessage(sid, last.id, {
toolStatus: hasError ? 'error' : 'done',
toolDuration: duration,
toolResult: typeof (evt as any).output === 'string' ? (evt as any).output : undefined,
toolResult: output,
})
}
@@ -1351,6 +1367,14 @@ export const useChatStore = defineStore('chat', () => {
}
case 'run.failed': {
if ((evt as any).inputTokens != null) {
const target = sessions.value.find(s => s.id === sid)
if (target) {
target.inputTokens = (evt as any).inputTokens
target.outputTokens = (evt as any).outputTokens
if ((evt as any).contextTokens != null) target.contextTokens = (evt as any).contextTokens
}
}
addAgentErrorMessage(sid, evt.error)
const msgs = getSessionMsgs(sid)
msgs.forEach((m, i) => {
@@ -1653,11 +1677,12 @@ export const useChatStore = defineStore('chat', () => {
? msgs.filter(m => m.role === 'tool' && m.toolCallId === toolCallId)
: msgs.filter(m => m.role === 'tool' && m.toolStatus === 'running')
if (toolMsgs.length > 0) {
const hasError = (evt as any).error === true
const output = typeof (evt as any).output === 'string' ? (evt as any).output : undefined
const hasError = (evt as any).error === true || isToolOutputError(output)
updateMessage(sid, toolMsgs[toolMsgs.length - 1].id, {
toolStatus: hasError ? 'error' : 'done',
toolDuration: (evt as any).duration,
toolResult: typeof (evt as any).output === 'string' ? (evt as any).output : undefined,
toolResult: output,
})
}
@@ -1764,6 +1789,14 @@ export const useChatStore = defineStore('chat', () => {
}
case 'run.failed': {
if ((evt as any).inputTokens != null) {
const target = sessions.value.find(s => s.id === sid)
if (target) {
target.inputTokens = (evt as any).inputTokens
target.outputTokens = (evt as any).outputTokens
if ((evt as any).contextTokens != null) target.contextTokens = (evt as any).contextTokens
}
}
const hasQueue = (evt as any).queue_remaining > 0
if (hasQueue) {
queueLengths.value.set(sid, (evt as any).queue_remaining)
@@ -10,8 +10,7 @@ import { updateUsage } from '../../../db/hermes/usage-store'
import { logger, bridgeLogger } from '../../logger'
import { AgentBridgeClient, type AgentBridgeMessage, type AgentBridgeOutput } from '../agent-bridge'
import { contentBlocksToString, convertContentBlocksForAgent, extractTextForPreview, isContentBlockArray } from './content-blocks'
import { buildCompressedHistory } from './compression'
import { pushState, replaceState } from './compression'
import { buildCompressedHistory, buildDbHistory, forceCompressBridgeHistory, pushState, replaceState } from './compression'
import { calcAndUpdateUsage, estimateUsageTokensFromMessages } from './usage'
import {
flushBridgePendingToDb,
@@ -20,9 +19,7 @@ import {
recordBridgeToolStarted,
recordBridgeToolCompleted,
} from './bridge-message'
import { forceCompressBridgeHistory } from './compression'
import { summarizeToolArguments } from './response-utils'
import { buildDbHistory } from './compression'
import type { ContentBlock, SessionState } from './types'
import type { ChatMessage } from '../../../lib/context-compressor'
import { resolveBridgeRunModelConfig, type RunModelGroup } from './model-config'
@@ -239,7 +236,21 @@ export async function handleBridgeRun(
})
for await (const chunk of bridge.streamOutput(started.run_id)) {
await applyBridgeChunkAsync(nsp, socket, state, session_id, runMarker, chunk, emit, profile, sessionMap, bridge, dequeueNextQueuedRun)
await applyBridgeChunkAsync(
nsp,
socket,
state,
session_id,
runMarker,
chunk,
emit,
profile,
sessionMap,
bridge,
dequeueNextQueuedRun,
fullInstructions,
{ model: resolvedModel, provider: resolvedProvider },
)
if (chunk.done) break
}
} catch (err: any) {
@@ -256,17 +267,88 @@ export async function handleBridgeRun(
flushBridgePendingToDb(state, session_id)
updateSessionStats(session_id)
const message = err instanceof Error ? err.message : String(err)
emit('run.failed', { event: 'run.failed', error: message, queue_remaining: queueLen })
const errUsage = await calcAndUpdateUsage(session_id, state, emit)
const errContextTokens = await refreshFinalContextUsage({
sessionId: session_id,
profile,
model: resolvedModel,
provider: resolvedProvider,
instructions: fullInstructions,
state,
usage: errUsage,
emit,
bridge,
})
updateUsage(session_id, {
inputTokens: errUsage.inputTokens,
outputTokens: errUsage.outputTokens,
profile: state.profile,
profile,
})
emit('run.failed', {
event: 'run.failed',
error: message,
inputTokens: errUsage.inputTokens,
outputTokens: errUsage.outputTokens,
contextTokens: errContextTokens,
queue_remaining: queueLen,
})
if (queueLen > 0) dequeueNextQueuedRun(socket, session_id)
}
}
/**
 * Re-estimates the session's context-token figure and publishes it.
 *
 * Fetches history via `buildDbHistory` (called with `excludeLastUser: false`), then asks
 * `args.bridge.contextEstimate` for an estimate using the supplied instructions, profile
 * and model/provider (nullish model/provider are passed as `undefined`).
 *
 * When the estimate's `token_count` is a positive finite number, its floored value is
 * written to `args.state.contextTokens`, emitted in a `usage.updated` event together with
 * the supplied usage numbers, logged, and returned. Otherwise the function returns
 * whatever `args.state.contextTokens` currently holds. Any thrown error is logged at
 * warn level and likewise results in the current `args.state.contextTokens`.
 *
 * @param args Session identifiers, model context, mutable session state, usage totals,
 *   the event emitter and the bridge client.
 * @returns The refreshed context token count, or the value already on `args.state`.
 */
async function refreshFinalContextUsage(args: {
  sessionId: string
  profile: string
  model?: string | null
  provider?: string | null
  instructions: string
  state: SessionState
  usage: { inputTokens: number; outputTokens: number }
  emit: (event: string, payload: any) => void
  bridge: AgentBridgeClient
}): Promise<number | undefined> {
  const { sessionId, profile, model, provider, state, usage } = args
  try {
    const history = await buildDbHistory(sessionId, { excludeLastUser: false })
    const modelContext = { model: model ?? undefined, provider: provider ?? undefined }
    const estimate = await args.bridge.contextEstimate(
      sessionId,
      history,
      args.instructions,
      profile,
      modelContext,
    )

    // Only a positive, finite numeric count is usable; anything else keeps the old value.
    const rawCount = estimate.token_count
    if (typeof rawCount !== 'number' || !Number.isFinite(rawCount) || !(rawCount > 0)) {
      return state.contextTokens
    }

    const contextTokens = Math.floor(rawCount)
    state.contextTokens = contextTokens

    const usagePayload = {
      event: 'usage.updated',
      inputTokens: usage.inputTokens,
      outputTokens: usage.outputTokens,
      contextTokens,
    }
    args.emit('usage.updated', usagePayload)

    const logFields = {
      sessionId,
      profile,
      model,
      provider,
      messages: estimate.message_count,
      toolCount: estimate.tool_count,
      systemPromptChars: estimate.system_prompt_chars,
      fullContextTokens: contextTokens,
    }
    bridgeLogger.info(logFields, '[chat-run-socket] final full context estimate')

    return contextTokens
  } catch (cause) {
    // Best effort: a failed estimate must not break run finalisation.
    const loggedErr = cause instanceof Error ? { message: cause.message, name: cause.name } : cause
    bridgeLogger.warn({
      err: loggedErr,
      sessionId,
      profile,
    }, '[chat-run-socket] final full context estimate failed')
    return state.contextTokens
  }
}
async function applyBridgeChunkAsync(
nsp: ReturnType<Server['of']>,
socket: Socket,
@@ -279,6 +361,8 @@ async function applyBridgeChunkAsync(
sessionMap: Map<string, SessionState>,
bridge: AgentBridgeClient,
dequeueNextQueuedRun: (socket: Socket, sessionId: string, fallbackProfile?: string) => void,
instructions: string,
modelContext: { model?: string | null; provider?: string | null },
): Promise<void> {
if (state.activeRunMarker !== runMarker) {
bridgeLogger.info({
@@ -509,6 +593,17 @@ async function applyBridgeChunkAsync(
updateSessionStats(sessionId)
await delay(BRIDGE_USAGE_FLUSH_DELAY_MS)
const usage = await calcAndUpdateUsage(sessionId, state, emit)
const contextTokens = await refreshFinalContextUsage({
sessionId,
profile,
model: modelContext.model,
provider: modelContext.provider,
instructions,
state,
usage,
emit,
bridge,
})
updateUsage(sessionId, {
inputTokens: usage.inputTokens,
outputTokens: usage.outputTokens,
@@ -536,6 +631,7 @@ async function applyBridgeChunkAsync(
error: terminalError || chunk.error,
inputTokens: usage.inputTokens,
outputTokens: usage.outputTokens,
contextTokens,
queue_remaining: state.queue.length,
}
emit(eventName, payload)
@@ -0,0 +1,207 @@
import { beforeEach, describe, expect, it, vi } from 'vitest'
// Stand-in functions, one per project export that the vi.mock factories below hand out.
// Tests configure and inspect these directly.

// Prompt + session-store mocks.
const getSystemPromptMock = vi.fn()
const getSessionMock = vi.fn()
const createSessionMock = vi.fn()
const addMessageMock = vi.fn()
const updateSessionMock = vi.fn()
const updateSessionStatsMock = vi.fn()
// Usage-store mock.
const updateUsageMock = vi.fn()
// Compression / history mocks.
const buildCompressedHistoryMock = vi.fn()
const buildDbHistoryMock = vi.fn()
const pushStateMock = vi.fn()
const replaceStateMock = vi.fn()
const forceCompressBridgeHistoryMock = vi.fn()
// Usage calculation mocks.
const calcAndUpdateUsageMock = vi.fn()
const estimateUsageTokensFromMessagesMock = vi.fn()
// Bridge-message persistence mocks.
const flushBridgePendingToDbMock = vi.fn()
const ensureOpenBridgeAssistantMessageMock = vi.fn()
const syncBridgeReasoningToMessageMock = vi.fn()
const recordBridgeToolStartedMock = vi.fn()
const recordBridgeToolCompletedMock = vi.fn()
// Model-config resolution mock.
const resolveBridgeRunModelConfigMock = vi.fn()
// Module replacements: each factory returns the mocks declared above under the export
// names of the module at the given path.
// NOTE(review): the factories reference the consts above; this presumably relies on the
// module under test being loaded only later (the tests import it dynamically) — confirm
// against vitest's vi.mock hoisting rules.
vi.mock('../../packages/server/src/lib/llm-prompt', () => ({
getSystemPrompt: getSystemPromptMock,
}))
vi.mock('../../packages/server/src/db/hermes/session-store', () => ({
getSession: getSessionMock,
createSession: createSessionMock,
addMessage: addMessageMock,
updateSession: updateSessionMock,
updateSessionStats: updateSessionStatsMock,
}))
vi.mock('../../packages/server/src/db/hermes/usage-store', () => ({
updateUsage: updateUsageMock,
}))
// Both loggers are replaced with fresh spies created inside the factory, so no test-level
// handle to them is kept.
vi.mock('../../packages/server/src/services/logger', () => ({
logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() },
bridgeLogger: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() },
}))
vi.mock('../../packages/server/src/services/hermes/run-chat/compression', () => ({
buildCompressedHistory: buildCompressedHistoryMock,
buildDbHistory: buildDbHistoryMock,
pushState: pushStateMock,
replaceState: replaceStateMock,
forceCompressBridgeHistory: forceCompressBridgeHistoryMock,
}))
vi.mock('../../packages/server/src/services/hermes/run-chat/usage', () => ({
calcAndUpdateUsage: calcAndUpdateUsageMock,
estimateUsageTokensFromMessages: estimateUsageTokensFromMessagesMock,
}))
vi.mock('../../packages/server/src/services/hermes/run-chat/bridge-message', () => ({
flushBridgePendingToDb: flushBridgePendingToDbMock,
ensureOpenBridgeAssistantMessage: ensureOpenBridgeAssistantMessageMock,
syncBridgeReasoningToMessage: syncBridgeReasoningToMessageMock,
recordBridgeToolStarted: recordBridgeToolStartedMock,
recordBridgeToolCompleted: recordBridgeToolCompletedMock,
}))
vi.mock('../../packages/server/src/services/hermes/run-chat/model-config', () => ({
resolveBridgeRunModelConfig: resolveBridgeRunModelConfigMock,
}))
/** Creates a minimal connected socket double exposing spied `emit` and `join` methods. */
function makeSocket() {
  const emit = vi.fn()
  const join = vi.fn()
  const socketStub = { connected: true, emit, join }
  return socketStub as any
}
/**
 * Creates a namespace double whose adapter has one room, `session:session-1`, containing
 * `socket-1`, and whose `to()` spy returns an object exposing the supplied `emit`.
 */
function makeNamespace(emit: ReturnType<typeof vi.fn>) {
  const rooms = new Map([['session:session-1', new Set(['socket-1'])]])
  const to = vi.fn(() => ({ emit }))
  const namespaceStub = { adapter: { rooms }, to }
  return namespaceStub as any
}
/** Creates a fresh idle session-state stub: no messages, events or queued runs. */
function makeState() {
  const idleState = {
    messages: [],
    isWorking: false,
    events: [],
    queue: [],
  }
  return idleState as any
}
// Verifies that handleBridgeRun refreshes the context-token estimate at the end of a run
// and reports it in both the usage.updated event and the terminal run event, for a
// completed run and for a run whose bridge.chat call rejects.
describe('bridge run final context usage', () => {
// Reset all mocks and install the default return values shared by both tests.
beforeEach(() => {
vi.clearAllMocks()
getSystemPromptMock.mockReturnValue('system prompt')
getSessionMock.mockReturnValue({ id: 'session-1', profile: 'default', model: '', provider: '' })
resolveBridgeRunModelConfigMock.mockResolvedValue({ model: 'gpt-test', provider: 'openai' })
buildCompressedHistoryMock.mockResolvedValue([{ role: 'user', content: 'previous' }])
// History handed to the final context estimate.
buildDbHistoryMock.mockResolvedValue([
{ role: 'user', content: 'hello' },
{ role: 'assistant', content: 'done' },
])
calcAndUpdateUsageMock.mockResolvedValue({ inputTokens: 11, outputTokens: 7 })
})
it('refreshes full context tokens when a bridge run completes', async () => {
const emit = vi.fn()
const nsp = makeNamespace(emit)
const socket = makeSocket()
const state = makeState()
const sessionMap = new Map([['session-1', state]])
// Bridge double: the run starts, then the stream yields a single terminal "completed" chunk.
const bridge = {
chat: vi.fn().mockResolvedValue({ run_id: 'run-1', status: 'started' }),
contextEstimate: vi.fn().mockResolvedValue({
token_count: 12345,
message_count: 2,
tool_count: 4,
system_prompt_chars: 13,
}),
streamOutput: vi.fn(async function* () {
yield { run_id: 'run-1', done: true, status: 'completed', output: 'done' }
}),
} as any
// Imported inside the test, after the vi.mock registrations at the top of the file.
const { handleBridgeRun } = await import('../../packages/server/src/services/hermes/run-chat/handle-bridge-run')
await handleBridgeRun(
nsp,
socket,
{ input: 'hello', session_id: 'session-1' },
'default',
sessionMap,
bridge,
false,
vi.fn(),
vi.fn(),
)
// The estimate must be requested with the mocked history, the mocked system prompt,
// the profile, and the model/provider returned by resolveBridgeRunModelConfigMock.
expect(bridge.contextEstimate).toHaveBeenCalledWith(
'session-1',
[
{ role: 'user', content: 'hello' },
{ role: 'assistant', content: 'done' },
],
'system prompt',
'default',
{ model: 'gpt-test', provider: 'openai' },
)
// The estimated count is stored on the session state and carried by both events.
expect(state.contextTokens).toBe(12345)
expect(emit).toHaveBeenCalledWith('usage.updated', expect.objectContaining({
inputTokens: 11,
outputTokens: 7,
contextTokens: 12345,
}))
expect(emit).toHaveBeenCalledWith('run.completed', expect.objectContaining({
inputTokens: 11,
outputTokens: 7,
contextTokens: 12345,
}))
})
it('refreshes full context tokens when a bridge run fails', async () => {
const emit = vi.fn()
const nsp = makeNamespace(emit)
const socket = makeSocket()
const state = makeState()
const sessionMap = new Map([['session-1', state]])
// Bridge double: starting the run rejects, while the context estimate still succeeds.
const bridge = {
chat: vi.fn().mockRejectedValue(new Error('bridge timeout')),
contextEstimate: vi.fn().mockResolvedValue({
token_count: 54321,
message_count: 1,
tool_count: 4,
system_prompt_chars: 13,
}),
streamOutput: vi.fn(),
} as any
// Imported inside the test, after the vi.mock registrations at the top of the file.
const { handleBridgeRun } = await import('../../packages/server/src/services/hermes/run-chat/handle-bridge-run')
await handleBridgeRun(
nsp,
socket,
{ input: 'hello', session_id: 'session-1' },
'default',
sessionMap,
bridge,
false,
vi.fn(),
vi.fn(),
)
// Even on failure the estimate is stored and reported, and run.failed carries the
// error message alongside the usage and context token numbers.
expect(state.contextTokens).toBe(54321)
expect(emit).toHaveBeenCalledWith('usage.updated', expect.objectContaining({
inputTokens: 11,
outputTokens: 7,
contextTokens: 54321,
}))
expect(emit).toHaveBeenCalledWith('run.failed', expect.objectContaining({
error: 'bridge timeout',
inputTokens: 11,
outputTokens: 7,
contextTokens: 54321,
}))
})
})