fix chat queue promotion (#1042)

This commit is contained in:
ekko
2026-05-26 17:39:38 +08:00
committed by GitHub
parent b0000b4c38
commit e926a8e2fb
2 changed files with 99 additions and 28 deletions
+59 -27
View File
@@ -388,6 +388,8 @@ export const useChatStore = defineStore('chat', () => {
const queueLengths = ref<Map<string, number>>(new Map())
/** sessionId → queued user messages not yet visible in the transcript */
const queuedUserMessages = ref<Map<string, Message[]>>(new Map())
/** sessionId → queue ids that server reported as dequeued before the peer message arrived */
const dequeuedQueueIds = ref<Map<string, Set<string>>>(new Map())
const pendingApprovals = ref<Map<string, PendingApproval>>(new Map())
const activePendingApproval = computed(() => {
const sid = activeSessionId.value
@@ -1002,41 +1004,42 @@ export const useChatStore = defineStore('chat', () => {
queuedUserMessages.value = nextMap
}
function removeQueuedMessage(sessionId: string, messageId: string) {
function updateQueuedUserMessage(sessionId: string, messageId: string, patch: Partial<Message>) {
const queue = queuedUserMessages.value.get(sessionId)
if (!queue?.length) return
const next = queue.map(message => message.id === messageId
? { ...message, ...patch, queued: true }
: message)
const nextMap = new Map(queuedUserMessages.value)
nextMap.set(sessionId, next)
queuedUserMessages.value = nextMap
}
function dropQueuedUserMessage(sessionId: string, messageId: string): boolean {
const queue = queuedUserMessages.value.get(sessionId)
if (!queue?.length) return false
const next = queue.filter(message => message.id !== messageId)
if (next.length === queue.length) return false
const nextMap = new Map(queuedUserMessages.value)
if (next.length > 0) {
nextMap.set(sessionId, next)
queueLengths.value.set(sessionId, next.length)
} else {
nextMap.delete(sessionId)
queueLengths.value.delete(sessionId)
}
queuedUserMessages.value = nextMap
queueLengths.value.set(sessionId, next.length)
return true
}
function removeQueuedMessage(sessionId: string, messageId: string) {
if (!dropQueuedUserMessage(sessionId, messageId)) return
getChatRunSocket()?.emit('cancel_queued_run', {
session_id: sessionId,
queue_id: messageId,
})
}
function promoteNextQueuedUserMessage(sessionId: string) {
const queue = queuedUserMessages.value.get(sessionId)
if (!queue?.length) return
const [next, ...rest] = queue
const nextMap = new Map(queuedUserMessages.value)
if (rest.length > 0) {
nextMap.set(sessionId, rest)
} else {
nextMap.delete(sessionId)
}
queuedUserMessages.value = nextMap
if (!getSessionMsgs(sessionId).some(message => message.id === next.id)) {
addMessage(sessionId, { ...next, queued: false })
updateSessionTitle(sessionId)
}
}
function normalizeQueuedUserMessages(rawMessages: unknown): Message[] {
if (!Array.isArray(rawMessages)) return []
return rawMessages.flatMap((raw) => {
@@ -1076,6 +1079,26 @@ export const useChatStore = defineStore('chat', () => {
queuedUserMessages.value = nextMap
}
function markDequeuedQueueId(sessionId: string, messageId: string) {
const nextMap = new Map(dequeuedQueueIds.value)
const ids = new Set(nextMap.get(sessionId) || [])
ids.add(messageId)
nextMap.set(sessionId, ids)
dequeuedQueueIds.value = nextMap
}
function consumeDequeuedQueueId(sessionId: string, messageId: string): boolean {
const ids = dequeuedQueueIds.value.get(sessionId)
if (!ids?.has(messageId)) return false
const nextIds = new Set(ids)
nextIds.delete(messageId)
const nextMap = new Map(dequeuedQueueIds.value)
if (nextIds.size > 0) nextMap.set(sessionId, nextIds)
else nextMap.delete(sessionId)
dequeuedQueueIds.value = nextMap
return true
}
function handleRunQueuedEvent(sessionId: string, evt: RunEvent) {
const queueLength = Number((evt as any).queue_length || 0)
if (queueLength > 0) {
@@ -1100,6 +1123,8 @@ export const useChatStore = defineStore('chat', () => {
if (dequeued && !getSessionMsgs(sessionId).some(message => message.id === dequeued.id)) {
addMessage(sessionId, { ...dequeued, queued: false })
updateSessionTitle(sessionId)
} else if (!dequeued) {
markDequeuedQueueId(sessionId, dequeuedId)
}
return
}
@@ -1288,11 +1313,15 @@ export const useChatStore = defineStore('chat', () => {
systemType: isBridgeSlashCommand ? 'command' : undefined,
}
if (!shouldQueue) {
if (shouldQueue) {
enqueueUserMessage(sid, userMsg)
} else {
addMessage(sid, userMsg)
updateSessionTitle(sid)
serverWorking.value.add(sid)
}
let runSubmitted = false
try {
// Build input in Anthropic format
@@ -1310,6 +1339,7 @@ export const useChatStore = defineStore('chat', () => {
const dl = urlMap.get(a.name)
return dl ? { ...a, url: dl } : a
})
updateQueuedUserMessage(sid, userMsg.id, { attachments: userMsg.attachments })
} else {
const msgs = getSessionMsgs(sid)
const lastUser = msgs.findLast(m => m.id === userMsg.id)
@@ -1349,10 +1379,6 @@ export const useChatStore = defineStore('chat', () => {
activeSession.value.messageCount = Math.max(activeSession.value.messageCount || 0, 1)
}
if (shouldQueue) {
enqueueUserMessage(sid, userMsg)
}
// Helper to clean up this session's stream state
const cleanup = () => {
streamStates.value.delete(sid)
@@ -1498,7 +1524,6 @@ export const useChatStore = defineStore('chat', () => {
runProducedAssistantText = false
runHadToolActivity = false
closeStreamingAssistant()
promoteNextQueuedUserMessage(sid)
if ((evt as any).queue_length > 0) {
queueLengths.value.set(sid, (evt as any).queue_length)
} else {
@@ -1914,11 +1939,18 @@ export const useChatStore = defineStore('chat', () => {
undefined,
{ onReconnectResume: applyReconnectResume },
)
runSubmitted = true
if (!isBridgeSlashCommand || isBridgeCompressCommand || isBridgePlanCommand || isBridgeGoalCommand) {
streamStates.value.set(sid, ctrl)
}
} catch (err: any) {
if (shouldQueue && !runSubmitted) {
dropQueuedUserMessage(sid, userMsg.id)
}
if (!shouldQueue && !runSubmitted) {
serverWorking.value.delete(sid)
}
addMessage(sid, {
id: uid(),
role: 'system',
@@ -1991,7 +2023,6 @@ export const useChatStore = defineStore('chat', () => {
runProducedAssistantText = false
runHadToolActivity = false
closeStreamingAssistant()
promoteNextQueuedUserMessage(sid)
if ((evt as any).queue_length > 0) {
queueLengths.value.set(sid, (evt as any).queue_length)
} else {
@@ -2416,7 +2447,8 @@ export const useChatStore = defineStore('chat', () => {
queued: !!peer?.queued,
systemType: peer?.role === 'command' ? 'command' : undefined,
}
if (peer?.queued) {
const wasDequeued = messageId ? consumeDequeuedQueueId(sid, messageId) : false
if (peer?.queued || (!wasDequeued && isSessionLive(sid))) {
enqueueUserMessage(sid, message)
} else {
addMessage(sid, message)
+40 -1
View File
@@ -142,6 +142,22 @@ test('keeps queued runs on one socket and does not duplicate streamed handlers',
expect(second.runCount).toBe(2)
expect(second.run.session_id).toBe(first.run.session_id)
expect(second.run.input).toBe('Second queued contract')
await expect(page.locator('p').filter({ hasText: /^Second queued contract$/ })).toHaveCount(0)
await page.evaluate(({ sid, queueId }) => {
const socket = (window as any).__PW_CHAT_SOCKET__.latest
socket.__trigger('run.peer_user_message', {
event: 'run.peer_user_message',
session_id: sid,
message: {
id: queueId,
role: 'user',
content: 'Second queued contract',
timestamp: Date.now() / 1000,
},
})
}, { sid: first.run.session_id, queueId: second.run.queue_id })
await expect(page.locator('p').filter({ hasText: /^Second queued contract$/ })).toHaveCount(0)
await page.evaluate((sid) => {
const socket = (window as any).__PW_CHAT_SOCKET__.latest
@@ -152,6 +168,29 @@ test('keeps queued runs on one socket and does not duplicate streamed handlers',
output: 'First answer',
queue_remaining: 1,
})
}, first.run.session_id)
await expect(page.locator('p').filter({ hasText: /^Second queued contract$/ })).toHaveCount(0)
await page.evaluate(({ sid, queueId }) => {
const socket = (window as any).__PW_CHAT_SOCKET__.latest
socket.__trigger('run.queued', {
event: 'run.queued',
session_id: sid,
queue_length: 0,
dequeued_queue_id: queueId,
queued_messages: [],
})
socket.__trigger('run.peer_user_message', {
event: 'run.peer_user_message',
session_id: sid,
message: {
id: queueId,
role: 'user',
content: 'Second queued contract',
timestamp: Date.now() / 1000,
},
})
socket.__trigger('run.started', { event: 'run.started', session_id: sid, run_id: 'run-2', queue_length: 0 })
socket.__trigger('message.delta', { event: 'message.delta', session_id: sid, run_id: 'run-2', delta: 'Second answer' })
socket.__trigger('run.completed', {
@@ -161,7 +200,7 @@ test('keeps queued runs on one socket and does not duplicate streamed handlers',
output: 'Second answer',
queue_remaining: 0,
})
}, first.run.session_id)
}, { sid: first.run.session_id, queueId: second.run.queue_id })
await expect(page.locator('p').filter({ hasText: /^First answer$/ })).toHaveCount(1)
await expect(page.locator('p').filter({ hasText: /^Second queued contract$/ })).toHaveCount(1)