[codex] fix bridge tool marker flush persistence (#1002)

* fix: don't drop pending tool-call-marker prefix on tool.started/run.done

The `filterBridgeToolCallMarkupDelta` filter holds back any text that
ends in a partial prefix of `[Calling tool:` (i.e. `[`, `[C`, `[Ca`,
..., `[Calling tool`) so it can decide whether the buffered chars are
the start of a tool-call markup block to be hidden, or just regular
text to be released by the next delta.

The bug: that "release on next delta" assumption breaks at TWO points:

1. **On `tool.started`**: the next chunk for this assistant message is
   the tool call itself, NOT a follow-up text delta. Buffered chars
   sit there forever and nothing flushes them — they vanish silently
   from the user-visible stream.

2. **On run completion**: the code did
   `state.bridgePendingToolCallMarkup = undefined` directly, dropping
   any pending chars without forwarding them.

Both cases produce the user-visible symptom of "abrupt cuts in text
right before/after tool calls (terminal, read_file, write_file...)" —
1 to 13 characters disappear at exactly the boundary where the model
was emitting natural prose that happened to end with `[`.

The fix introduces `flushPendingToolCallMarkup(state)` and calls it:

- In the `tool.started` branch BEFORE recording the tool call, so the
  buffered chars are appended to the open assistant message and emitted
  as a normal `message.delta` to the client.
- At run-done BEFORE clearing the buffer, same flush path.

This is a pure recovery patch — no change to the marker detection
logic itself. If the buffer turns out to actually be a real
`[Calling tool: ...]` marker that just hasn't completed yet, that
case is still caught by the existing `markerIdx >= 0` branch in the
filter on the next delta. The only behavioral change is that the
"orphan" cases (text that ends with `[` but never becomes a marker)
are no longer dropped.

* fix bridge marker flush persistence

---------

Co-authored-by: Paulo Cavallari <paulocavallari@users.noreply.github.com>
This commit is contained in:
ekko
2026-05-25 11:09:16 +08:00
committed by GitHub
parent 9e35d81f48
commit bbb8b1d536
4 changed files with 141 additions and 2 deletions
@@ -5,6 +5,27 @@ export interface BridgeDeltaFilterState {
const TOOL_CALL_MARKER = '[Calling tool:'
const MAX_PENDING_TOOL_MARKUP_LENGTH = 100_000
/**
* Flush any partial-prefix that was held back waiting to see if it would
* become a `[Calling tool: ...]` marker. Call this when the streaming
* context guarantees no marker can follow (e.g. on `tool.started`,
* `tool.completed`, run completion, run failure, abort).
*
* Without this, deltas that legitimately end with `[`, `[C`, `[Ca`, ...,
* `[Calling tool` are silently dropped from the user-visible stream
* because the filter expected the buffered chars to either complete the
* marker (and be discarded) or be released by a follow-up delta — but
* follow-up deltas don't always come for the SAME assistant message.
*
* Returns the buffered text so callers can forward it to the client as
* a regular `message.delta` payload.
*/
export function flushPendingToolCallMarkup(state: BridgeDeltaFilterState): string {
const pending = state.bridgePendingToolCallMarkup || ''
state.bridgePendingToolCallMarkup = ''
return pending
}
function findToolMarkupEnd(text: string, start: number): number {
let depth = 0
let inString = false
@@ -30,7 +30,7 @@ import { summarizeToolArguments } from './response-utils'
import type { ContentBlock, SessionState } from './types'
import type { ChatMessage } from '../../../lib/context-compressor'
import { resolveBridgeRunModelConfig, type RunModelGroup } from './model-config'
import { filterBridgeToolCallMarkupDelta } from './bridge-delta'
import { filterBridgeToolCallMarkupDelta, flushPendingToolCallMarkup } from './bridge-delta'
const BRIDGE_USAGE_FLUSH_DELAY_MS = 200
@@ -69,6 +69,38 @@ export function bridgeTerminalError(chunk: Pick<AgentBridgeOutput, 'status' | 'e
return null
}
function findOpenAssistantMessage(state: SessionState, runMarker: string) {
for (let i = state.messages.length - 1; i >= 0; i -= 1) {
const message = state.messages[i]
if (message.runMarker === runMarker && message.role === 'assistant' && message.finish_reason == null) return message
}
return undefined
}
function flushPendingToolMarkupToAssistant(
state: SessionState,
runMarker: string,
runId: string,
emit: (event: string, payload: any) => void,
): string {
const pendingMarkup = flushPendingToolCallMarkup(state)
if (!pendingMarkup) return ''
state.bridgeOutput = (state.bridgeOutput || '') + pendingMarkup
state.bridgePendingAssistantContent = (state.bridgePendingAssistantContent || '') + pendingMarkup
const last = findOpenAssistantMessage(state, runMarker)
if (last) {
last.content += pendingMarkup
}
emit('message.delta', {
event: 'message.delta',
run_id: runId,
delta: pendingMarkup,
output: state.bridgeOutput,
})
return pendingMarkup
}
function finiteToken(value: unknown): number | undefined {
return typeof value === 'number' && Number.isFinite(value) && value >= 0
? Math.floor(value)
@@ -464,6 +496,12 @@ async function applyBridgeChunkAsync(
usage,
)
} else if (evType === 'tool.started') {
// Flush any partial tool-call-marker prefix that was held back by
// the markup filter. Without this, deltas ending in `[`, `[C`,
// `[Ca`, etc. are silently dropped because no follow-up delta will
// come for this assistant message — the next chunk is the tool call
// itself. See bridge-delta.ts for full rationale.
flushPendingToolMarkupToAssistant(state, runMarker, chunk.run_id, emit)
flushBridgePendingToDb(state, sessionId, runMarker)
const toolName = (ev.tool_name as string) || ''
const args = ev.args as Record<string, unknown> | undefined
@@ -714,6 +752,11 @@ async function applyBridgeChunkAsync(
return
}
// If the run terminated while we still had a partial tool-call-marker
// prefix buffered, flush it to the user-visible stream now. Discarding
// it (which the line below was doing implicitly) silently drops the
// final characters of the assistant message.
flushPendingToolMarkupToAssistant(state, runMarker, chunk.run_id, emit)
flushBridgePendingToDb(state, sessionId)
state.bridgePendingToolCallMarkup = undefined
updateSessionStats(sessionId)
+9 -1
View File
@@ -1,6 +1,6 @@
import { describe, expect, it } from 'vitest'
import { filterBridgeToolCallMarkupDelta } from '../../packages/server/src/services/hermes/run-chat/bridge-delta'
import { filterBridgeToolCallMarkupDelta, flushPendingToolCallMarkup } from '../../packages/server/src/services/hermes/run-chat/bridge-delta'
describe('run-chat bridge delta filtering', () => {
it('keeps ordinary assistant text', () => {
@@ -37,4 +37,12 @@ describe('run-chat bridge delta filtering', () => {
expect(filterBridgeToolCallMarkupDelta(state, 'Text [Call')).toBe('Text ')
expect(filterBridgeToolCallMarkupDelta(state, 'ing tool: terminal with arguments: {}]\nDone')).toBe('Done')
})
it('flushes an orphan partial marker suffix when no text chunk follows', () => {
const state = {}
expect(filterBridgeToolCallMarkupDelta(state, 'Text [Call')).toBe('Text ')
expect(flushPendingToolCallMarkup(state)).toBe('[Call')
expect(flushPendingToolCallMarkup(state)).toBe('')
})
})
@@ -232,6 +232,73 @@ describe('bridge run final context usage', () => {
}))
})
it('persists pending tool marker text before a bridge run completes', async () => {
const emit = vi.fn()
const nsp = makeNamespace(emit)
const socket = makeSocket()
const state = makeState()
const persistedContent: string[] = []
flushBridgePendingToDbMock.mockImplementation((targetState: any) => {
persistedContent.push(targetState.bridgePendingAssistantContent || '')
targetState.bridgePendingAssistantContent = ''
})
ensureOpenBridgeAssistantMessageMock.mockImplementation((targetState: any, sessionId: string, runMarker: string) => {
let message = [...targetState.messages].reverse().find((m: any) => m.runMarker === runMarker && m.role === 'assistant' && m.finish_reason == null)
if (!message) {
message = {
id: targetState.messages.length + 1,
session_id: sessionId,
runMarker,
role: 'assistant',
content: '',
timestamp: Math.floor(Date.now() / 1000),
}
targetState.messages.push(message)
}
return message
})
const sessionMap = new Map([['session-1', state]])
const bridge = {
chat: vi.fn().mockResolvedValue({ run_id: 'run-1', status: 'started' }),
contextEstimate: vi.fn().mockResolvedValue({
token_count: 12345,
message_count: 2,
tool_count: 4,
system_prompt_chars: 13,
}),
streamOutput: vi.fn(async function* () {
yield { run_id: 'run-1', done: false, status: 'running', delta: 'Text [Call', events: [] }
yield { run_id: 'run-1', done: true, status: 'completed', output: '', events: [] }
}),
} as any
const { handleBridgeRun } = await import('../../packages/server/src/services/hermes/run-chat/handle-bridge-run')
await handleBridgeRun(
nsp,
socket,
{ input: 'hello', session_id: 'session-1' },
'default',
sessionMap,
bridge,
false,
vi.fn(),
vi.fn(),
)
expect(persistedContent).toContain('Text [Call')
expect(emit).toHaveBeenCalledWith('message.delta', expect.objectContaining({
delta: 'Text ',
output: 'Text ',
}))
expect(emit).toHaveBeenCalledWith('message.delta', expect.objectContaining({
delta: '[Call',
output: 'Text [Call',
}))
expect(emit).toHaveBeenCalledWith('run.completed', expect.objectContaining({
output: 'Text [Call',
}))
})
it('refreshes full context tokens when a bridge run fails', async () => {
const emit = vi.fn()
const nsp = makeNamespace(emit)