fix(bridge): preserve text/tool-call ordering to stop split narration (#1145)
When the model interleaves narration text with tool calls within one
turn ("text → tool → more text"), the assistant text was rendered split
across the tool boundary — a word could be cut in half, e.g. the part
before the tool call ending mid-word and the remainder appearing after
the tool card.
Root cause: the agent bridge (`hermes_bridge.py`) accumulated streamed
text in `RunRecord.deltas` and tool/lifecycle events in
`RunRecord.events` as two parallel lists with no relative ordering. On
poll, the aggregated text (`"".join(deltas)`) and the events were
delivered separately, and the Node consumer (`handle-bridge-run.ts`)
processed all `chunk.events` (including `tool.started`) before the
aggregated `chunk.delta`. The real interleaving of text and tool calls
was therefore lost, splitting the text around the tool boundary.
Fix:
- Bridge: `stream_callback` now also appends each text chunk as an
ordered `stream.delta` event into the same `events` list as
tool.started/tool.completed, preserving true interleaving. `deltas`
is still kept for the aggregated `output`/resume snapshot.
- Node: process `stream.delta` events inline within the events loop (in
true order), and skip the aggregated `chunk.delta` when ordered
`stream.delta` events were present for that chunk (avoids duplicate
text). Text-delta handling was extracted into `processBridgeTextDelta`
and reused by both paths.
Verified end-to-end: narration that calls a tool mid-sentence now
streams and persists as coherent text in the exact order produced, with
no word split across the tool boundary.
Co-authored-by: Paulo Cavallari <paulocavallari@users.noreply.github.com>
This commit is contained in:
@@ -1554,7 +1554,20 @@ class AgentPool:
|
|||||||
with _profile_env(profile):
|
with _profile_env(profile):
|
||||||
def stream_callback(delta: str) -> None:
|
def stream_callback(delta: str) -> None:
|
||||||
with self._lock:
|
with self._lock:
|
||||||
record.deltas.append(str(delta))
|
text = str(delta)
|
||||||
|
# Keep `deltas` for the aggregated `output`/resume snapshot,
|
||||||
|
# AND record each text chunk as an ordered event in the SAME
|
||||||
|
# `events` list used by tool.started/tool.completed. Text and
|
||||||
|
# tool events were previously tracked in two parallel lists
|
||||||
|
# with no relative ordering, so when the model interleaved
|
||||||
|
# narration and tool calls ("text → tool → more text") the
|
||||||
|
# consumer reordered them — processing all events before the
|
||||||
|
# aggregated delta — which visibly split a word across the
|
||||||
|
# tool boundary. Recording text as ordered events preserves
|
||||||
|
# the true interleaving.
|
||||||
|
record.deltas.append(text)
|
||||||
|
if text:
|
||||||
|
record.events.append({"event": "stream.delta", "delta": text})
|
||||||
|
|
||||||
approval_session_token = None
|
approval_session_token = None
|
||||||
registered_gateway_approval_session = None
|
registered_gateway_approval_session = None
|
||||||
|
|||||||
@@ -100,6 +100,42 @@ function flushPendingToolMarkupToAssistant(
|
|||||||
return pendingMarkup
|
return pendingMarkup
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function processBridgeTextDelta(
|
||||||
|
state: SessionState,
|
||||||
|
sessionId: string,
|
||||||
|
runMarker: string,
|
||||||
|
runId: string,
|
||||||
|
rawDelta: string,
|
||||||
|
emit: (event: string, payload: any) => void,
|
||||||
|
): void {
|
||||||
|
const delta = filterBridgeToolCallMarkupDelta(state, rawDelta)
|
||||||
|
if (!delta) return
|
||||||
|
state.bridgeOutput = (state.bridgeOutput || '') + delta
|
||||||
|
state.bridgePendingAssistantContent = (state.bridgePendingAssistantContent || '') + delta
|
||||||
|
const last = [...state.messages].reverse().find(m => m.runMarker === runMarker)
|
||||||
|
if (last?.role === 'assistant' && last.finish_reason == null) {
|
||||||
|
last.content += delta
|
||||||
|
syncBridgeReasoningToMessage(last, state.bridgePendingReasoningContent)
|
||||||
|
} else {
|
||||||
|
state.messages.push({
|
||||||
|
id: state.messages.length + 1,
|
||||||
|
session_id: sessionId,
|
||||||
|
runMarker,
|
||||||
|
role: 'assistant',
|
||||||
|
content: delta,
|
||||||
|
reasoning: state.bridgePendingReasoningContent || null,
|
||||||
|
reasoning_content: state.bridgePendingReasoningContent || null,
|
||||||
|
timestamp: Math.floor(Date.now() / 1000),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
emit('message.delta', {
|
||||||
|
event: 'message.delta',
|
||||||
|
run_id: runId,
|
||||||
|
delta,
|
||||||
|
output: state.bridgeOutput,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
function finiteToken(value: unknown): number | undefined {
|
function finiteToken(value: unknown): number | undefined {
|
||||||
return typeof value === 'number' && Number.isFinite(value) && value >= 0
|
return typeof value === 'number' && Number.isFinite(value) && value >= 0
|
||||||
? Math.floor(value)
|
? Math.floor(value)
|
||||||
@@ -572,8 +608,19 @@ async function applyBridgeChunkAsync(
|
|||||||
|
|
||||||
state.runId = chunk.run_id
|
state.runId = chunk.run_id
|
||||||
|
|
||||||
|
// When the bridge emits text as ordered `stream.delta` events (interleaved
|
||||||
|
// with tool.started/tool.completed in the SAME events list), we process the
|
||||||
|
// text here in true order and must NOT also process the aggregated
|
||||||
|
// `chunk.delta` below (that would duplicate the text). This flag tracks it.
|
||||||
|
let sawStreamDeltaEvent = false
|
||||||
|
|
||||||
for (const ev of chunk.events || []) {
|
for (const ev of chunk.events || []) {
|
||||||
const evType = ev.event as string | undefined
|
const evType = ev.event as string | undefined
|
||||||
|
if (evType === 'stream.delta') {
|
||||||
|
sawStreamDeltaEvent = true
|
||||||
|
processBridgeTextDelta(state, sessionId, runMarker, chunk.run_id, String((ev as any).delta || ''), emit)
|
||||||
|
continue
|
||||||
|
}
|
||||||
if (evType === 'bridge.context.ready') {
|
if (evType === 'bridge.context.ready') {
|
||||||
cacheBridgeContext(state, ev)
|
cacheBridgeContext(state, ev)
|
||||||
const usage = await calcAndUpdateUsage(sessionId, state, emit)
|
const usage = await calcAndUpdateUsage(sessionId, state, emit)
|
||||||
@@ -837,7 +884,11 @@ async function applyBridgeChunkAsync(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (chunk.delta) {
|
// Only process the aggregated chunk.delta when the bridge did NOT emit
|
||||||
|
// ordered stream.delta events for this chunk. With ordered events, the text
|
||||||
|
// was already handled above in true interleaved order; processing it again
|
||||||
|
// here would duplicate it.
|
||||||
|
if (chunk.delta && !sawStreamDeltaEvent) {
|
||||||
const delta = filterBridgeToolCallMarkupDelta(state, chunk.delta)
|
const delta = filterBridgeToolCallMarkupDelta(state, chunk.delta)
|
||||||
if (delta) {
|
if (delta) {
|
||||||
state.bridgeOutput = (state.bridgeOutput || '') + delta
|
state.bridgeOutput = (state.bridgeOutput || '') + delta
|
||||||
|
|||||||
Reference in New Issue
Block a user