From 70ed0e0dc278915cc339ca09be333b6e2ba57e7a Mon Sep 17 00:00:00 2001 From: ekko <152005280+EKKOLearnAI@users.noreply.github.com> Date: Fri, 24 Apr 2026 22:18:32 +0800 Subject: [PATCH] revert: harden Hermes stream recovery around tool-call boundaries (#189) (#192) Reverts #189 due to reported bugs. Co-authored-by: Claude Opus 4.6 --- packages/client/src/api/hermes/chat.ts | 4 +- packages/client/src/stores/hermes/chat.ts | 189 ++++++++-------- .../server/src/routes/hermes/proxy-handler.ts | 66 ++---- tests/client/chat-api.test.ts | 59 ----- tests/client/chat-store.test.ts | 207 +----------------- tests/server/proxy-handler.test.ts | 85 ------- 6 files changed, 114 insertions(+), 496 deletions(-) delete mode 100644 tests/client/chat-api.test.ts diff --git a/packages/client/src/api/hermes/chat.ts b/packages/client/src/api/hermes/chat.ts index b9fb7c9..d882553 100644 --- a/packages/client/src/api/hermes/chat.ts +++ b/packages/client/src/api/hermes/chat.ts @@ -72,9 +72,7 @@ export function streamRunEvents( onDone() } } catch { - // Some SSE adapters may deliver raw text frames. Treat them as - // assistant deltas so they render instead of being silently ignored. - onEvent({ event: 'message.delta', delta: e.data }) + onEvent({ event: 'message', delta: e.data }) } } diff --git a/packages/client/src/stores/hermes/chat.ts b/packages/client/src/stores/hermes/chat.ts index 8d91ddc..8355dbf 100644 --- a/packages/client/src/stores/hermes/chat.ts +++ b/packages/client/src/stores/hermes/chat.ts @@ -1,5 +1,5 @@ import { startRun, streamRunEvents, type ChatMessage, type RunEvent } from '@/api/hermes/chat' -import { deleteSession as deleteSessionApi, fetchSession, fetchSessions, fetchSessionUsageSingle, type HermesMessage, type SessionDetail, type SessionSummary } from '@/api/hermes/sessions' +import { deleteSession as deleteSessionApi, fetchSession, fetchSessions, fetchSessionUsageSingle, type HermesMessage, type SessionSummary } from '@/api/hermes/sessions' import { defineStore } from 'pinia' import { ref, computed } from 'vue' import { useAppStore } from './app' @@ -162,42 +162,6 @@ function mapHermesSession(s: SessionSummary): Session { } } -function assistantTranscriptLength(msgs: Message[]): number { - return msgs.reduce((total, m) => total + (m.role === 'assistant' ? (m.content?.length ?? 0) : 0), 0) -} - -function userTurnCount(msgs: Message[]): number { - return msgs.filter(m => m.role === 'user').length -} - -function serverMessagesAreAheadOrEqual(local: Message[], server: Message[]): boolean { - const localUsers = userTurnCount(local) - const serverUsers = userTurnCount(server) - return serverUsers > localUsers - || (serverUsers === localUsers && assistantTranscriptLength(server) >= assistantTranscriptLength(local)) -} - -function serverHasCaughtUpToLocalTurn(local: Message[], server: Message[]): boolean { - return userTurnCount(server) >= userTurnCount(local) -} - -function applySessionMetaFromDetail(target: Session, detail: SessionDetail) { - if (detail.title) target.title = detail.title - target.endedAt = detail.ended_at != null ? Math.round(detail.ended_at * 1000) : null - target.lastActiveAt = detail.last_active != null ? Math.round(detail.last_active * 1000) : target.lastActiveAt - target.updatedAt = Math.round((detail.last_active || detail.ended_at || target.updatedAt / 1000) * 1000) -} - -function applyServerMessagesIfAhead(target: Session, detail: SessionDetail): boolean { - const mapped = mapHermesMessages(detail.messages || []) - applySessionMetaFromDetail(target, detail) - if (serverMessagesAreAheadOrEqual(target.messages, mapped)) { - target.messages = mapped - return true - } - return false -} - // Cache keys for stale-while-revalidate loading of sessions / messages. // All keys include the active profile name to isolate cache between profiles. // Rendering from cache on boot avoids the multi-round-trip wait the user sees @@ -392,10 +356,8 @@ export const useChatStore = defineStore('chat', () => { saveJsonWithLegacy(inFlightKey(sid), { runId, startedAt: Date.now() } as InFlightRun, legacyInFlightKey(sid)) } - function clearInFlight(sid: string, runId?: string): boolean { - if (runId && readInFlight(sid)?.runId !== runId) return false + function clearInFlight(sid: string) { removeItemWithLegacy(inFlightKey(sid), legacyInFlightKey(sid)) - return true } function readInFlight(sid: string): InFlightRun | null { @@ -419,8 +381,8 @@ export const useChatStore = defineStore('chat', () => { } // Poll fetchSession while an in-flight run is recovering. Exits when the - // server reports a terminal session and its message signature is stable for - // POLL_STABLE_EXITS ticks, TTL elapses, or the user explicitly starts streaming. + // server's message signature is stable for POLL_STABLE_EXITS ticks (run + // presumed done), TTL elapses, or the user explicitly starts streaming. function startPolling(sid: string) { if (pollTimers.has(sid)) return resumingRuns.value = new Set([...resumingRuns.value, sid]) @@ -441,21 +403,34 @@ export const useChatStore = defineStore('chat', () => { const mapped = mapHermesMessages(detail.messages || []) const target = sessions.value.find(s => s.id === sid) if (!target) return - const serverIsCaughtUp = serverHasCaughtUpToLocalTurn(target.messages, mapped) - const serverIsAhead = serverMessagesAreAheadOrEqual(target.messages, mapped) - const serverIsTerminal = detail.ended_at != null - - applySessionMetaFromDetail(target, detail) + // Use the same "content-aware" comparison as switchSession: server + // is ahead iff it knows about at least as many user turns and its + // last assistant text is at least as long as ours. + const local = target.messages + const localLastAssistant = [...local].reverse().find(m => m.role === 'assistant') + const serverLastAssistant = [...mapped].reverse().find(m => m.role === 'assistant') + const localAssistantLen = localLastAssistant?.content?.length ?? 0 + const serverAssistantLen = serverLastAssistant?.content?.length ?? 0 + const localUsers = local.filter(m => m.role === 'user').length + const serverUsers = mapped.filter(m => m.role === 'user').length + const serverIsCaughtUp = serverUsers >= localUsers + // Same rationale as switchSession: strictly more user turns means + // server is ahead (new turn complete). Equal user turns + longer + // assistant means server caught up on the current turn. + const serverIsAhead = + serverUsers > localUsers + || (serverUsers === localUsers && serverAssistantLen >= localAssistantLen) if (serverIsAhead) { target.messages = mapped + if (detail.title && !target.title) target.title = detail.title if (sid === activeSessionId.value) persistActiveMessages() } - - // Stability detection ONLY matters when the server has caught up to - // our latest user turn AND the session is terminal. During long tool - // calls the persisted transcript may be stable while the run is still - // active; treating that as completion is the truncation failure mode. - if (!serverIsCaughtUp || !serverIsTerminal) { + // Stability detection ONLY matters when the server has at least as + // many user turns as we do. Otherwise the server is still catching + // up (e.g. the new turn we just sent hasn't been flushed server-side + // yet) and a "stable" signature is a false positive — the stability + // is the server NOT having our latest turn, not the run being done. + if (!serverIsCaughtUp) { pollSignatures.delete(sid) } else { const last = mapped[mapped.length - 1] @@ -464,13 +439,14 @@ export const useChatStore = defineStore('chat', () => { if (prev && prev.sig === sig) { prev.stableTicks += 1 if (prev.stableTicks >= POLL_STABLE_EXITS) { - // Server confirms this run is terminal. Keep any longer local - // stream text if the final session export lags behind. - if (serverIsAhead) target.messages = mapped + // Run is done on the server. Force-apply server view even if + // our "don't retreat" guard above skipped it — the server is + // now the authoritative source of truth. + target.messages = mapped + if (detail.title) target.title = detail.title if (sid === activeSessionId.value) persistActiveMessages() - if (clearInFlight(sid, inFlight.runId)) { - stopPolling(sid) - } + clearInFlight(sid) + stopPolling(sid) } } else { pollSignatures.set(sid, { sig, stableTicks: 0 }) @@ -545,9 +521,9 @@ export const useChatStore = defineStore('chat', () => { } } - // Re-pull active session from server without retreating from longer local - // streamed text. Used on SSE drop and tab-visible events — mobile browsers - // can kill EventSource while the backend run continues. + // Re-pull active session from server and overwrite local messages. Used on + // SSE drop and on tab-visible events — mobile browsers kill EventSource + // while backgrounded, but the backend run usually completes anyway. async function refreshActiveSession(): Promise { const sid = activeSessionId.value if (!sid) return false @@ -556,28 +532,17 @@ export const useChatStore = defineStore('chat', () => { if (!detail) return false const target = sessions.value.find(s => s.id === sid) if (!target) return false - const applied = applyServerMessagesIfAhead(target, detail) - if (applied && sid === activeSessionId.value) persistActiveMessages() - return applied + const mapped = mapHermesMessages(detail.messages || []) + target.messages = mapped + if (detail.title) target.title = detail.title + persistActiveMessages() + return true } catch (err) { console.error('Failed to refresh active session:', err) return false } } - async function reconcileSessionAfterCompletion(sid: string): Promise { - try { - const detail = await fetchSession(sid) - if (!detail) return - const target = sessions.value.find(s => s.id === sid) - if (!target) return - const applied = applyServerMessagesIfAhead(target, detail) - if (applied && sid === activeSessionId.value) persistActiveMessages() - } catch (err) { - console.error('Failed to reconcile completed session:', err) - } - } - function createSession(): Session { const session: Session = { @@ -623,14 +588,34 @@ export const useChatStore = defineStore('chat', () => { const detail = await fetchSession(sessionId) if (detail && detail.messages) { const mapped = mapHermesMessages(detail.messages) - // Pick whichever view has more information. Simple array length - // comparison is wrong because mapHermesMessages folds tool-call-only - // assistant msgs into tool-result msgs. Also, tool boundaries can - // split one assistant turn into pre-tool and post-tool assistant - // segments, so comparing only the last assistant segment can retreat - // a fuller local transcript to stale pre-tool server text. Compare - // user-turn count plus total assistant transcript length instead. - if (serverMessagesAreAheadOrEqual(activeSession.value.messages, mapped)) { + // Pick whichever view has more information. Simple length comparison + // is wrong because mapHermesMessages folds tool_call-only assistant + // msgs and matches them with tool-result msgs — so post-fold `mapped` + // can be SHORTER than the raw SSE-built local array even when the + // server is strictly ahead. Instead, compare the last assistant + // message content: if the server's is at least as long, the server + // is up-to-date (and has the final complete text); otherwise keep + // local (in-flight window where server hasn't flushed the new turn). + const local = activeSession.value.messages + const localLastAssistant = [...local].reverse().find(m => m.role === 'assistant') + const serverLastAssistant = [...mapped].reverse().find(m => m.role === 'assistant') + const localAssistantLen = localLastAssistant?.content?.length ?? 0 + const serverAssistantLen = serverLastAssistant?.content?.length ?? 0 + const localUsers = local.filter(m => m.role === 'user').length + const serverUsers = mapped.filter(m => m.role === 'user').length + // Trust server when: + // - it has STRICTLY MORE user turns than we do (new turn landed), + // OR + // - same user-turn count AND server's last assistant is at least + // as long as ours (same turn, server caught up or further) + // Otherwise keep local (protects against the server-not-yet-flushed + // race during in-flight runs). Length comparison alone is wrong + // across different turns because each turn's last assistant is + // unrelated to the previous turn's. + const serverIsAhead = + serverUsers > localUsers + || (serverUsers === localUsers && serverAssistantLen >= localAssistantLen) + if (serverIsAhead) { activeSession.value.messages = mapped } // Update title: use Hermes title, or fallback to first user message @@ -838,8 +823,7 @@ export const useChatStore = defineStore('chat', () => { case 'run.started': break - case 'message.delta': - case 'message': { + case 'message.delta': { const msgs = getSessionMsgs(sid) const last = msgs[msgs.length - 1] if (last?.role === 'assistant' && last.isStreaming) { @@ -904,14 +888,14 @@ export const useChatStore = defineStore('chat', () => { } cleanup() updateSessionTitle(sid) - // Persist the terminal local view before clearing the in-flight - // marker. If final SSE deltas were missed, reconcile once from - // the authoritative session export without retreating from - // longer local text. + // the in-flight marker. If the browser is reloading right now + // and kills us between the two localStorage writes, we want + // the next page load to still see in-flight === true (so + // polling kicks in and recovers) rather than the other way + // around (cleared in-flight + stale streaming cache = UI stuck). if (sid === activeSessionId.value) persistActiveMessages() - void reconcileSessionAfterCompletion(sid).finally(() => { - if (readInFlight(sid)?.runId === runId) startPolling(sid) - }) + clearInFlight(sid) + stopPolling(sid) break } @@ -939,9 +923,8 @@ export const useChatStore = defineStore('chat', () => { }) cleanup() if (sid === activeSessionId.value) persistActiveMessages() - if (clearInFlight(sid, runId)) { - stopPolling(sid) - } + clearInFlight(sid) + stopPolling(sid) break } } @@ -970,9 +953,13 @@ export const useChatStore = defineStore('chat', () => { if (last?.isStreaming) { updateMessage(sid, last.id, { isStreaming: false }) } - // Keep running tool state until refresh/polling sees the server's - // terminal transcript. A dropped SSE connection is not proof the - // tool completed. + // Any tool messages still marked 'running' will be replaced by the + // server's view after refresh; clear their spinner state now. + msgs.forEach((m, i) => { + if (m.role === 'tool' && m.toolStatus === 'running') { + msgs[i] = { ...m, toolStatus: 'done' } + } + }) cleanup() if (sid === activeSessionId.value) { void refreshActiveSession() diff --git a/packages/server/src/routes/hermes/proxy-handler.ts b/packages/server/src/routes/hermes/proxy-handler.ts index 03c14b1..77f38d3 100644 --- a/packages/server/src/routes/hermes/proxy-handler.ts +++ b/packages/server/src/routes/hermes/proxy-handler.ts @@ -96,41 +96,26 @@ function buildProxyHeaders(ctx: Context, upstream: string): Record { // Also decode for interception buffer += decoder.decode(value, { stream: true }) - // Process complete SSE event blocks (LF or CRLF blank-line delimiters). - let next: { block: string; rest: string } | null - while ((next = takeSSEBlock(buffer)) !== null) { - buffer = next.rest - extractRunCompletedFromBlock(next.block) + // Process complete SSE lines (delimited by double newline) + let newlineIdx: number + while ((newlineIdx = buffer.indexOf('\n\n')) !== -1) { + const eventBlock = buffer.slice(0, newlineIdx) + buffer = buffer.slice(newlineIdx + 2) + extractRunCompletedFromChunk(eventBlock) } } - buffer += decoder.decode() // Process remaining buffer if (buffer.trim()) { - extractRunCompletedFromBlock(buffer) + extractRunCompletedFromChunk(buffer) } } finally { ctx.res.end() @@ -247,9 +232,6 @@ export async function proxy(ctx: Context) { // Intercept SSE streams for /v1/runs/{id}/events const sseMatch = upstreamPath.match(SSE_EVENTS_PATH) if (sseMatch) { - ctx.set('Content-Type', 'text/event-stream') - ctx.set('Cache-Control', 'no-cache, no-transform') - ctx.set('X-Accel-Buffering', 'no') await streamSSE(ctx, res) return } diff --git a/tests/client/chat-api.test.ts b/tests/client/chat-api.test.ts deleted file mode 100644 index 9c76f49..0000000 --- a/tests/client/chat-api.test.ts +++ /dev/null @@ -1,59 +0,0 @@ -// @vitest-environment jsdom -import { beforeEach, describe, expect, it, vi } from 'vitest' -import { streamRunEvents, type RunEvent } from '@/api/hermes/chat' - -class MockEventSource { - static instances: MockEventSource[] = [] - - url: string - onmessage: ((event: { data: string }) => void) | null = null - onerror: (() => void) | null = null - close = vi.fn() - - constructor(url: string) { - this.url = url - MockEventSource.instances.push(this) - } - - emit(data: string) { - this.onmessage?.({ data }) - } - - fail() { - this.onerror?.() - } -} - -describe('streamRunEvents', () => { - beforeEach(() => { - window.localStorage.clear() - MockEventSource.instances = [] - vi.stubGlobal('EventSource', MockEventSource) - }) - - it('maps non-JSON EventSource data to message.delta so raw text is rendered', () => { - const events: RunEvent[] = [] - - streamRunEvents('run-raw', event => events.push(event), vi.fn(), vi.fn()) - MockEventSource.instances[0].emit('原因:raw fallback') - - expect(events).toEqual([{ event: 'message.delta', delta: '原因:raw fallback' }]) - }) - - it('parses colon-containing JSON deltas and closes on completion', () => { - const events: RunEvent[] = [] - const onDone = vi.fn() - - streamRunEvents('run-json', event => events.push(event), onDone, vi.fn()) - const source = MockEventSource.instances[0] - source.emit(JSON.stringify({ event: 'message.delta', delta: '让我直接读文件:A: B' })) - source.emit(JSON.stringify({ event: 'run.completed' })) - - expect(events).toEqual([ - { event: 'message.delta', delta: '让我直接读文件:A: B' }, - { event: 'run.completed' }, - ]) - expect(source.close).toHaveBeenCalledTimes(1) - expect(onDone).toHaveBeenCalledTimes(1) - }) -}) diff --git a/tests/client/chat-store.test.ts b/tests/client/chat-store.test.ts index af296e3..cf1ccb4 100644 --- a/tests/client/chat-store.test.ts +++ b/tests/client/chat-store.test.ts @@ -41,36 +41,13 @@ function makeSummary(id: string, title = 'Session') { } } -function makeDetail(id: string, messages: Array>, overrides: Record = {}) { +function makeDetail(id: string, messages: Array>) { return { ...makeSummary(id), - ...overrides, messages, } } -function makeHermesMessage( - id: number, - role: 'user' | 'assistant' | 'system' | 'tool', - content: string, - overrides: Record = {}, -) { - return { - id, - session_id: overrides.session_id || 'sess-1', - role, - content, - tool_call_id: null, - tool_calls: null, - tool_name: null, - timestamp: 1710000000 + id, - token_count: null, - finish_reason: null, - reasoning: null, - ...overrides, - } -} - async function flushPromises() { await Promise.resolve() await Promise.resolve() @@ -317,186 +294,4 @@ describe('Chat Store', () => { expect(store.isRunActive).toBe(false) expect(window.localStorage.getItem(inFlightKey('sess-1'))).toBeNull() }) - - it('keeps colon deltas before and after a tool boundary', async () => { - mockChatApi.streamRunEvents.mockImplementation(( - _runId: string, - onEvent: (event: any) => void, - ) => { - onEvent({ event: 'message.delta', delta: '让我直接读文件:' }) - onEvent({ event: 'tool.started', tool: 'read_file', preview: 'notes.md' }) - onEvent({ event: 'tool.completed' }) - onEvent({ event: 'message.delta', delta: '读取后结论: final' }) - onEvent({ event: 'run.completed' }) - return { abort: vi.fn() } - }) - - const store = useChatStore() - await flushPromises() - await store.sendMessage('check file') - await flushPromises() - - const assistantText = store.messages - .filter(m => m.role === 'assistant') - .map(m => m.content) - .join('') - expect(assistantText).toBe('让我直接读文件:读取后结论: final') - expect(store.messages.some(m => m.role === 'tool' && m.toolName === 'read_file' && m.toolStatus === 'done')).toBe(true) - }) - - it('renders raw SSE fallback message events as assistant deltas', async () => { - mockChatApi.streamRunEvents.mockImplementation(( - _runId: string, - onEvent: (event: any) => void, - ) => { - onEvent({ event: 'message', delta: '原因:raw fallback' }) - onEvent({ event: 'run.completed' }) - return { abort: vi.fn() } - }) - - const store = useChatStore() - await flushPromises() - await store.sendMessage('raw stream') - await flushPromises() - - expect(store.messages.some(m => m.role === 'assistant' && m.content === '原因:raw fallback')).toBe(true) - }) - - it('does not stop polling when server messages are stable but the session is still active', async () => { - vi.useFakeTimers() - - let fetchSessionCalls = 0 - mockSessionsApi.fetchSession.mockImplementation(async () => { - fetchSessionCalls += 1 - if (fetchSessionCalls === 1) return null - return makeDetail('sess-1', [ - makeHermesMessage(1, 'user', 'tool gap prompt'), - makeHermesMessage(2, 'assistant', '让我直接读文件:'), - ], { ended_at: null }) - }) - - mockChatApi.streamRunEvents.mockImplementation(( - _runId: string, - onEvent: (event: any) => void, - _onDone: () => void, - onError: (err: Error) => void, - ) => { - onEvent({ event: 'message.delta', delta: '让我直接读文件:' }) - setTimeout(() => onError(new Error('SSE connection error')), 0) - return { abort: vi.fn() } - }) - - const store = useChatStore() - await flushPromises() - await store.sendMessage('tool gap prompt') - const sid = store.activeSessionId! - - await vi.advanceTimersByTimeAsync(0) - await flushPromises() - await vi.advanceTimersByTimeAsync(9000) - await flushPromises() - - expect(window.localStorage.getItem(inFlightKey(sid))).toBeTruthy() - expect(store.isRunActive).toBe(true) - }) - - it('reconciles the final session after run.completed to recover missed deltas', async () => { - let fetchSessionCalls = 0 - mockSessionsApi.fetchSession.mockImplementation(async () => { - fetchSessionCalls += 1 - if (fetchSessionCalls === 1) return null - return makeDetail('sess-1', [ - makeHermesMessage(1, 'user', 'finish prompt'), - makeHermesMessage(2, 'assistant', '让我直接读文件:读取后结论:完整回答'), - ], { ended_at: 1710000010 }) - }) - - mockChatApi.streamRunEvents.mockImplementation(( - _runId: string, - onEvent: (event: any) => void, - ) => { - onEvent({ event: 'message.delta', delta: '让我直接读文件:' }) - onEvent({ event: 'run.completed' }) - return { abort: vi.fn() } - }) - - const store = useChatStore() - await flushPromises() - await store.sendMessage('finish prompt') - await flushPromises() - - expect(store.messages.some(m => m.role === 'assistant' && m.content === '让我直接读文件:读取后结论:完整回答')).toBe(true) - }) - - it('does not replace longer local tool-boundary text with a stale shorter final fetch', async () => { - let fetchSessionCalls = 0 - const stalePrefix = '让我直接读文件:较长的工具前说明' - mockSessionsApi.fetchSession.mockImplementation(async () => { - fetchSessionCalls += 1 - if (fetchSessionCalls === 1) return null - return makeDetail('sess-1', [ - makeHermesMessage(1, 'user', 'stale prompt'), - makeHermesMessage(2, 'assistant', stalePrefix), - ], { ended_at: 1710000010 }) - }) - - mockChatApi.streamRunEvents.mockImplementation(( - _runId: string, - onEvent: (event: any) => void, - ) => { - onEvent({ event: 'message.delta', delta: stalePrefix }) - onEvent({ event: 'tool.started', tool: 'read_file', preview: 'notes.md' }) - onEvent({ event: 'tool.completed' }) - onEvent({ event: 'message.delta', delta: 'OK' }) - onEvent({ event: 'run.completed' }) - return { abort: vi.fn() } - }) - - const store = useChatStore() - await flushPromises() - await store.sendMessage('stale prompt') - await flushPromises() - - const assistantText = store.messages - .filter(m => m.role === 'assistant') - .map(m => m.content) - .join('') - expect(assistantText).toBe(`${stalePrefix}OK`) - expect(store.messages.some(m => m.role === 'tool' && m.toolStatus === 'done')).toBe(true) - }) - - it('does not let delayed completion reconciliation clear a newer in-flight run', async () => { - let resolveReconcile: ((detail: any) => void) | null = null - const reconcilePromise = new Promise(resolve => { resolveReconcile = resolve }) - mockSessionsApi.fetchSession.mockImplementation(() => reconcilePromise) - mockChatApi.startRun - .mockResolvedValueOnce({ run_id: 'run-1', status: 'queued' }) - .mockResolvedValueOnce({ run_id: 'run-2', status: 'queued' }) - let firstRunEvent: ((event: any) => void) | null = null - mockChatApi.streamRunEvents.mockImplementation(( - runId: string, - onEvent: (event: any) => void, - ) => { - if (runId === 'run-1') firstRunEvent = onEvent - return { abort: vi.fn() } - }) - - const store = useChatStore() - await flushPromises() - await store.sendMessage('first') - firstRunEvent!({ event: 'run.completed' }) - await flushPromises() - const sid = store.activeSessionId! - await store.sendMessage('second') - - expect(JSON.parse(window.localStorage.getItem(inFlightKey(sid)) || '{}').runId).toBe('run-2') - - resolveReconcile!(makeDetail(sid, [ - makeHermesMessage(1, 'user', 'first', { session_id: sid }), - makeHermesMessage(2, 'assistant', 'first done', { session_id: sid }), - ], { ended_at: 1710000010 })) - await flushPromises() - - expect(JSON.parse(window.localStorage.getItem(inFlightKey(sid)) || '{}').runId).toBe('run-2') - }) }) diff --git a/tests/server/proxy-handler.test.ts b/tests/server/proxy-handler.test.ts index cb9db4e..87bdd88 100644 --- a/tests/server/proxy-handler.test.ts +++ b/tests/server/proxy-handler.test.ts @@ -414,89 +414,4 @@ describe('SSE stream interception — run.completed', () => { expect(mockUpdateUsage).toHaveBeenCalledWith('session-split', 200, 50) }) - - it('forwards colon-containing SSE deltas around tool events unchanged and disables buffering', async () => { - const runId = 'run-colon-tool' - const sseData = [ - `data: ${JSON.stringify({ event: 'message.delta', run_id: runId, delta: '让我直接读文件:A: B' })}\n\n`, - `data: ${JSON.stringify({ event: 'tool.started', run_id: runId, tool: 'read_file', preview: 'file:a.md' })}\n\n`, - `data: ${JSON.stringify({ event: 'tool.completed', run_id: runId })}\n\n`, - `data: ${JSON.stringify({ event: 'message.delta', run_id: runId, delta: '继续:done' })}\n\n`, - ] - - mockFetch.mockResolvedValue({ - status: 200, - headers: new Headers({ 'content-type': 'text/event-stream' }), - body: createSSEBody(sseData), - }) - - const ctx = createMockCtx({ - path: `/api/hermes/v1/runs/${runId}/events`, - search: '', - }) - - await proxy(ctx) - - const forwarded = ctx.res.write.mock.calls - .map(([chunk]: [Uint8Array]) => new TextDecoder().decode(chunk)) - .join('') - expect(forwarded).toBe(sseData.join('')) - expect(ctx.set).toHaveBeenCalledWith('Content-Type', 'text/event-stream') - expect(ctx.set).toHaveBeenCalledWith('Cache-Control', 'no-cache, no-transform') - expect(ctx.set).toHaveBeenCalledWith('X-Accel-Buffering', 'no') - }) - - it('intercepts run.completed with CRLF delimiters and data without a space', async () => { - const runId = 'run-crlf' - setRunSession(runId, 'session-crlf') - const completedJson = JSON.stringify({ event: 'run.completed', run_id: runId, usage: { input_tokens: 321, output_tokens: 45, total_tokens: 366 } }) - const sseData = [`data:${completedJson}\r\n\r\n`] - - mockFetch.mockResolvedValue({ - status: 200, - headers: new Headers({ 'content-type': 'text/event-stream' }), - body: createSSEBody(sseData), - }) - - const ctx = createMockCtx({ - path: `/api/hermes/v1/runs/${runId}/events`, - search: '', - }) - - await proxy(ctx) - - expect(mockUpdateUsage).toHaveBeenCalledWith('session-crlf', 321, 45) - }) - - it('does not let usage accounting failures abort the SSE stream', async () => { - const runId = 'run-usage-fails' - setRunSession(runId, 'session-usage-fails') - mockUpdateUsage.mockImplementationOnce(() => { - throw new Error('usage db unavailable') - }) - const sseData = [ - `data: ${JSON.stringify({ event: 'message.delta', run_id: runId, delta: 'before:' })}\n\n`, - `data: ${JSON.stringify({ event: 'run.completed', run_id: runId, usage: { input_tokens: 1, output_tokens: 2, total_tokens: 3 } })}\n\n`, - ] - - mockFetch.mockResolvedValue({ - status: 200, - headers: new Headers({ 'content-type': 'text/event-stream' }), - body: createSSEBody(sseData), - }) - - const ctx = createMockCtx({ - path: `/api/hermes/v1/runs/${runId}/events`, - search: '', - }) - - await proxy(ctx) - - const forwarded = ctx.res.write.mock.calls - .map(([chunk]: [Uint8Array]) => new TextDecoder().decode(chunk)) - .join('') - expect(ctx.status).toBe(200) - expect(forwarded).toBe(sseData.join('')) - expect(ctx.res.end).toHaveBeenCalled() - }) })