From fc02348ebde241eb32168c55f33617a9218ffb3b Mon Sep 17 00:00:00 2001 From: ekko <152005280+EKKOLearnAI@users.noreply.github.com> Date: Sun, 10 May 2026 04:10:01 +0800 Subject: [PATCH] fix: prevent message loss on abort by deferring DB writes to flush (#591) Defer all non-user message DB writes until response completion or abort, instead of writing tool calls immediately during streaming. This ensures correct message ordering and prevents the abort handler from overwriting displayed messages with incomplete DB data. - Remove immediate addMessage() calls from response.output_item.done - Remove immediate addMessage() from insertResponseTextOnce - Add flushResponseRunToDb() to batch-write all run messages on both normal completion (markCompleted) and abort (handleAbort) - Skip user messages in flush (already written in handleRun) - Remove refreshActiveSession() from abort.completed frontend handler Co-authored-by: Claude Opus 4.7 --- packages/client/src/stores/hermes/chat.ts | 6 -- .../src/services/hermes/chat-run-socket.ts | 60 ++++++++++--------- 2 files changed, 33 insertions(+), 33 deletions(-) diff --git a/packages/client/src/stores/hermes/chat.ts b/packages/client/src/stores/hermes/chat.ts index f45f6c5..70de49b 100644 --- a/packages/client/src/stores/hermes/chat.ts +++ b/packages/client/src/stores/hermes/chat.ts @@ -827,9 +827,6 @@ export const useChatStore = defineStore('chat', () => { } }) cleanup() - if (sid === activeSessionId.value) { - void refreshActiveSession() - } setAbortState(null) break } @@ -1268,9 +1265,6 @@ export const useChatStore = defineStore('chat', () => { } }) cleanup() - if (sid === activeSessionId.value) { - void refreshActiveSession() - } setAbortState(null) break } diff --git a/packages/server/src/services/hermes/chat-run-socket.ts b/packages/server/src/services/hermes/chat-run-socket.ts index e347559..b25d756 100644 --- a/packages/server/src/services/hermes/chat-run-socket.ts +++ b/packages/server/src/services/hermes/chat-run-socket.ts @@ -1109,14 +1109,6 @@ export class ChatRunSocket { finish_reason: 'tool_calls', timestamp: now(), }) - addMessage({ - session_id: sessionId, - role: 'assistant', - content: '', - tool_calls: [toolCall], - finish_reason: 'tool_calls', - timestamp: now(), - }) } return { event: 'tool.started', @@ -1151,14 +1143,6 @@ export class ChatRunSocket { tool_name: toolName, timestamp: now(), }) - addMessage({ - session_id: sessionId, - role: 'tool', - content: output, - tool_call_id: callId, - tool_name: toolName, - timestamp: now(), - }) } return { event: 'tool.completed', @@ -1215,10 +1199,11 @@ export class ChatRunSocket { if (run.textInserted || !text?.trim()) return run.textInserted = true - const last = [...state.messages].reverse().find(m => m.runMarker === runMarker) - if (last?.role === 'assistant' && !last.tool_calls?.length) { - last.content = text - last.finish_reason = 'stop' + const lastIdx = [...state.messages].map((m, i) => ({ m, i })) + .reverse().find(({ m }) => m.runMarker === runMarker) + if (lastIdx && lastIdx.m.role === 'assistant' && !lastIdx.m.tool_calls?.length) { + lastIdx.m.content = text + lastIdx.m.finish_reason = 'stop' } else { state.messages.push({ id: state.messages.length + 1, @@ -1230,13 +1215,30 @@ export class ChatRunSocket { timestamp: Math.floor(Date.now() / 1000), }) } - addMessage({ - session_id: sessionId, - role: 'assistant', - content: text, - finish_reason: 'stop', - timestamp: Math.floor(Date.now() / 1000), - }) + } + + /** Flush all non-user messages for this run to DB in order. */ + private flushResponseRunToDb(state: SessionState, sessionId: string) { + const run = state.responseRun + if (!run?.runMarker) return + let flushed = 0 + for (const msg of state.messages) { + if (msg.runMarker !== run.runMarker) continue + if (msg.role === 'user') continue + addMessage({ + session_id: sessionId, + role: msg.role, + content: msg.content || '', + tool_call_id: msg.tool_call_id ?? null, + tool_calls: msg.tool_calls ?? null, + tool_name: msg.tool_name ?? null, + finish_reason: msg.finish_reason ?? null, + timestamp: msg.timestamp, + }) + flushed++ + } + logger.info('[chat-run-socket] flushResponseRunToDb: flushed %d messages for session %s', + flushed, sessionId) } // --- Abort handler --- @@ -1274,6 +1276,9 @@ export class ChatRunSocket { }) logger.info({ sessionId, runId }, '[chat-run-socket][abort] started') + // Flush in-memory assistant text to DB before aborting the stream. + this.flushResponseRunToDb(state, sessionId) + if (state.abortController) { state.abortController.abort() } @@ -1296,6 +1301,7 @@ export class ChatRunSocket { state.abortController = undefined state.runId = undefined state.events = [] + this.flushResponseRunToDb(state, sessionId) state.responseRun = undefined state.profile = undefined updateSessionStats(sessionId)