fix: prevent message loss on abort by deferring DB writes to flush (#591)

Defer all non-user message DB writes until response completion or
abort, instead of writing tool calls immediately during streaming.
This ensures correct message ordering and prevents the abort handler
from overwriting displayed messages with incomplete DB data.

- Remove immediate addMessage() calls from response.output_item.done
- Remove immediate addMessage() from insertResponseTextOnce
- Add flushResponseRunToDb() to batch-write all run messages on
  both normal completion (markCompleted) and abort (handleAbort)
- Skip user messages in flush (already written in handleRun)
- Remove refreshActiveSession() from abort.completed frontend handler

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
ekko
2026-05-10 04:10:01 +08:00
committed by GitHub
parent 50122c5ff8
commit fc02348ebd
2 changed files with 33 additions and 33 deletions
@@ -827,9 +827,6 @@ export const useChatStore = defineStore('chat', () => {
} }
}) })
cleanup() cleanup()
if (sid === activeSessionId.value) {
void refreshActiveSession()
}
setAbortState(null) setAbortState(null)
break break
} }
@@ -1268,9 +1265,6 @@ export const useChatStore = defineStore('chat', () => {
} }
}) })
cleanup() cleanup()
if (sid === activeSessionId.value) {
void refreshActiveSession()
}
setAbortState(null) setAbortState(null)
break break
} }
@@ -1109,14 +1109,6 @@ export class ChatRunSocket {
finish_reason: 'tool_calls', finish_reason: 'tool_calls',
timestamp: now(), timestamp: now(),
}) })
addMessage({
session_id: sessionId,
role: 'assistant',
content: '',
tool_calls: [toolCall],
finish_reason: 'tool_calls',
timestamp: now(),
})
} }
return { return {
event: 'tool.started', event: 'tool.started',
@@ -1151,14 +1143,6 @@ export class ChatRunSocket {
tool_name: toolName, tool_name: toolName,
timestamp: now(), timestamp: now(),
}) })
addMessage({
session_id: sessionId,
role: 'tool',
content: output,
tool_call_id: callId,
tool_name: toolName,
timestamp: now(),
})
} }
return { return {
event: 'tool.completed', event: 'tool.completed',
@@ -1215,10 +1199,11 @@ export class ChatRunSocket {
if (run.textInserted || !text?.trim()) return if (run.textInserted || !text?.trim()) return
run.textInserted = true run.textInserted = true
const last = [...state.messages].reverse().find(m => m.runMarker === runMarker) const lastIdx = [...state.messages].map((m, i) => ({ m, i }))
if (last?.role === 'assistant' && !last.tool_calls?.length) { .reverse().find(({ m }) => m.runMarker === runMarker)
last.content = text if (lastIdx && lastIdx.m.role === 'assistant' && !lastIdx.m.tool_calls?.length) {
last.finish_reason = 'stop' lastIdx.m.content = text
lastIdx.m.finish_reason = 'stop'
} else { } else {
state.messages.push({ state.messages.push({
id: state.messages.length + 1, id: state.messages.length + 1,
@@ -1230,13 +1215,30 @@ export class ChatRunSocket {
timestamp: Math.floor(Date.now() / 1000), timestamp: Math.floor(Date.now() / 1000),
}) })
} }
}
/** Flush all non-user messages for this run to DB in order. */
private flushResponseRunToDb(state: SessionState, sessionId: string) {
const run = state.responseRun
if (!run?.runMarker) return
let flushed = 0
for (const msg of state.messages) {
if (msg.runMarker !== run.runMarker) continue
if (msg.role === 'user') continue
addMessage({ addMessage({
session_id: sessionId, session_id: sessionId,
role: 'assistant', role: msg.role,
content: text, content: msg.content || '',
finish_reason: 'stop', tool_call_id: msg.tool_call_id ?? null,
timestamp: Math.floor(Date.now() / 1000), tool_calls: msg.tool_calls ?? null,
tool_name: msg.tool_name ?? null,
finish_reason: msg.finish_reason ?? null,
timestamp: msg.timestamp,
}) })
flushed++
}
logger.info('[chat-run-socket] flushResponseRunToDb: flushed %d messages for session %s',
flushed, sessionId)
} }
// --- Abort handler --- // --- Abort handler ---
@@ -1274,6 +1276,9 @@ export class ChatRunSocket {
}) })
logger.info({ sessionId, runId }, '[chat-run-socket][abort] started') logger.info({ sessionId, runId }, '[chat-run-socket][abort] started')
// Flush in-memory assistant text to DB before aborting the stream.
this.flushResponseRunToDb(state, sessionId)
if (state.abortController) { if (state.abortController) {
state.abortController.abort() state.abortController.abort()
} }
@@ -1296,6 +1301,7 @@ export class ChatRunSocket {
state.abortController = undefined state.abortController = undefined
state.runId = undefined state.runId = undefined
state.events = [] state.events = []
this.flushResponseRunToDb(state, sessionId)
state.responseRun = undefined state.responseRun = undefined
state.profile = undefined state.profile = undefined
updateSessionStats(sessionId) updateSessionStats(sessionId)