revert: harden Hermes stream recovery around tool-call boundaries (#189) (#192)

Reverts #189 due to reported bugs.

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
ekko
2026-04-24 22:18:32 +08:00
committed by GitHub
parent bff6f844e6
commit 70ed0e0dc2
6 changed files with 114 additions and 496 deletions
@@ -96,41 +96,26 @@ function buildProxyHeaders(ctx: Context, upstream: string): Record<string, strin
const SSE_EVENTS_PATH = /^\/v1\/runs\/([^/]+)\/events$/
/**
* Parse one complete SSE event block and record usage for run.completed.
* The public stream is forwarded elsewhere; parser failures are accounting-only
* and must never abort the client stream.
* Parse SSE text chunks and extract run.completed events.
* Returns the run_id if a run.completed was found.
*/
function extractRunCompletedFromBlock(block: string): string | null {
const dataLines: string[] = []
for (const rawLine of block.split(/\r?\n/)) {
if (!rawLine.startsWith('data:')) continue
let data = rawLine.slice(5)
if (data.startsWith(' ')) data = data.slice(1)
dataLines.push(data)
}
if (dataLines.length === 0) return null
try {
const data = JSON.parse(dataLines.join('\n'))
if (data.event === 'run.completed' && data.usage && data.run_id) {
const sessionId = getSessionForRun(data.run_id)
if (sessionId) {
updateUsage(sessionId, data.usage.input_tokens, data.usage.output_tokens)
return data.run_id
function extractRunCompletedFromChunk(chunk: string): string | null {
// SSE format: each line is "data: {...}\n\n"
const lines = chunk.split('\n')
for (const line of lines) {
if (!line.startsWith('data: ')) continue
try {
const data = JSON.parse(line.slice(6))
if (data.event === 'run.completed' && data.usage && data.run_id) {
const sessionId = getSessionForRun(data.run_id)
if (sessionId) {
updateUsage(sessionId, data.usage.input_tokens, data.usage.output_tokens)
return data.run_id
}
}
}
} catch { /* not JSON or usage accounting failed; skip */ }
return null
}
function takeSSEBlock(buffer: string): { block: string; rest: string } | null {
const lf = buffer.indexOf('\n\n')
const crlf = buffer.indexOf('\r\n\r\n')
if (lf === -1 && crlf === -1) return null
if (crlf !== -1 && (lf === -1 || crlf < lf)) {
return { block: buffer.slice(0, crlf), rest: buffer.slice(crlf + 4) }
} catch { /* not JSON, skip */ }
}
return { block: buffer.slice(0, lf), rest: buffer.slice(lf + 2) }
return null
}
/**
@@ -157,18 +142,18 @@ async function streamSSE(ctx: Context, res: Response): Promise<void> {
// Also decode for interception
buffer += decoder.decode(value, { stream: true })
// Process complete SSE event blocks (LF or CRLF blank-line delimiters).
let next: { block: string; rest: string } | null
while ((next = takeSSEBlock(buffer)) !== null) {
buffer = next.rest
extractRunCompletedFromBlock(next.block)
// Process complete SSE lines (delimited by double newline)
let newlineIdx: number
while ((newlineIdx = buffer.indexOf('\n\n')) !== -1) {
const eventBlock = buffer.slice(0, newlineIdx)
buffer = buffer.slice(newlineIdx + 2)
extractRunCompletedFromChunk(eventBlock)
}
}
buffer += decoder.decode()
// Process remaining buffer
if (buffer.trim()) {
extractRunCompletedFromBlock(buffer)
extractRunCompletedFromChunk(buffer)
}
} finally {
ctx.res.end()
@@ -247,9 +232,6 @@ export async function proxy(ctx: Context) {
// Intercept SSE streams for /v1/runs/{id}/events
const sseMatch = upstreamPath.match(SSE_EVENTS_PATH)
if (sseMatch) {
ctx.set('Content-Type', 'text/event-stream')
ctx.set('Cache-Control', 'no-cache, no-transform')
ctx.set('X-Accel-Buffering', 'no')
await streamSSE(ctx, res)
return
}