diff --git a/packages/client/src/stores/hermes/chat.ts b/packages/client/src/stores/hermes/chat.ts index 3722d77..36c3c14 100644 --- a/packages/client/src/stores/hermes/chat.ts +++ b/packages/client/src/stores/hermes/chat.ts @@ -388,6 +388,8 @@ export const useChatStore = defineStore('chat', () => { const queueLengths = ref>(new Map()) /** sessionId → queued user messages not yet visible in the transcript */ const queuedUserMessages = ref>(new Map()) + /** sessionId → queue ids that server reported as dequeued before the peer message arrived */ + const dequeuedQueueIds = ref>>(new Map()) const pendingApprovals = ref>(new Map()) const activePendingApproval = computed(() => { const sid = activeSessionId.value @@ -1002,41 +1004,42 @@ export const useChatStore = defineStore('chat', () => { queuedUserMessages.value = nextMap } - function removeQueuedMessage(sessionId: string, messageId: string) { + function updateQueuedUserMessage(sessionId: string, messageId: string, patch: Partial) { const queue = queuedUserMessages.value.get(sessionId) if (!queue?.length) return + const next = queue.map(message => message.id === messageId + ? { ...message, ...patch, queued: true } + : message) + const nextMap = new Map(queuedUserMessages.value) + nextMap.set(sessionId, next) + queuedUserMessages.value = nextMap + } + + function dropQueuedUserMessage(sessionId: string, messageId: string): boolean { + const queue = queuedUserMessages.value.get(sessionId) + if (!queue?.length) return false const next = queue.filter(message => message.id !== messageId) + if (next.length === queue.length) return false const nextMap = new Map(queuedUserMessages.value) if (next.length > 0) { nextMap.set(sessionId, next) + queueLengths.value.set(sessionId, next.length) } else { nextMap.delete(sessionId) + queueLengths.value.delete(sessionId) } queuedUserMessages.value = nextMap - queueLengths.value.set(sessionId, next.length) + return true + } + + function removeQueuedMessage(sessionId: string, messageId: string) { + if (!dropQueuedUserMessage(sessionId, messageId)) return getChatRunSocket()?.emit('cancel_queued_run', { session_id: sessionId, queue_id: messageId, }) } - function promoteNextQueuedUserMessage(sessionId: string) { - const queue = queuedUserMessages.value.get(sessionId) - if (!queue?.length) return - const [next, ...rest] = queue - const nextMap = new Map(queuedUserMessages.value) - if (rest.length > 0) { - nextMap.set(sessionId, rest) - } else { - nextMap.delete(sessionId) - } - queuedUserMessages.value = nextMap - if (!getSessionMsgs(sessionId).some(message => message.id === next.id)) { - addMessage(sessionId, { ...next, queued: false }) - updateSessionTitle(sessionId) - } - } - function normalizeQueuedUserMessages(rawMessages: unknown): Message[] { if (!Array.isArray(rawMessages)) return [] return rawMessages.flatMap((raw) => { @@ -1076,6 +1079,26 @@ export const useChatStore = defineStore('chat', () => { queuedUserMessages.value = nextMap } + function markDequeuedQueueId(sessionId: string, messageId: string) { + const nextMap = new Map(dequeuedQueueIds.value) + const ids = new Set(nextMap.get(sessionId) || []) + ids.add(messageId) + nextMap.set(sessionId, ids) + dequeuedQueueIds.value = nextMap + } + + function consumeDequeuedQueueId(sessionId: string, messageId: string): boolean { + const ids = dequeuedQueueIds.value.get(sessionId) + if (!ids?.has(messageId)) return false + const nextIds = new Set(ids) + nextIds.delete(messageId) + const nextMap = new Map(dequeuedQueueIds.value) + if (nextIds.size > 0) nextMap.set(sessionId, nextIds) + else nextMap.delete(sessionId) + dequeuedQueueIds.value = nextMap + return true + } + function handleRunQueuedEvent(sessionId: string, evt: RunEvent) { const queueLength = Number((evt as any).queue_length || 0) if (queueLength > 0) { @@ -1100,6 +1123,8 @@ export const useChatStore = defineStore('chat', () => { if (dequeued && !getSessionMsgs(sessionId).some(message => message.id === dequeued.id)) { addMessage(sessionId, { ...dequeued, queued: false }) updateSessionTitle(sessionId) + } else if (!dequeued) { + markDequeuedQueueId(sessionId, dequeuedId) } return } @@ -1288,11 +1313,15 @@ export const useChatStore = defineStore('chat', () => { systemType: isBridgeSlashCommand ? 'command' : undefined, } - if (!shouldQueue) { + if (shouldQueue) { + enqueueUserMessage(sid, userMsg) + } else { addMessage(sid, userMsg) updateSessionTitle(sid) + serverWorking.value.add(sid) } + let runSubmitted = false try { // Build input in Anthropic format @@ -1310,6 +1339,7 @@ export const useChatStore = defineStore('chat', () => { const dl = urlMap.get(a.name) return dl ? { ...a, url: dl } : a }) + updateQueuedUserMessage(sid, userMsg.id, { attachments: userMsg.attachments }) } else { const msgs = getSessionMsgs(sid) const lastUser = msgs.findLast(m => m.id === userMsg.id) @@ -1349,10 +1379,6 @@ export const useChatStore = defineStore('chat', () => { activeSession.value.messageCount = Math.max(activeSession.value.messageCount || 0, 1) } - if (shouldQueue) { - enqueueUserMessage(sid, userMsg) - } - // Helper to clean up this session's stream state const cleanup = () => { streamStates.value.delete(sid) @@ -1498,7 +1524,6 @@ export const useChatStore = defineStore('chat', () => { runProducedAssistantText = false runHadToolActivity = false closeStreamingAssistant() - promoteNextQueuedUserMessage(sid) if ((evt as any).queue_length > 0) { queueLengths.value.set(sid, (evt as any).queue_length) } else { @@ -1914,11 +1939,18 @@ export const useChatStore = defineStore('chat', () => { undefined, { onReconnectResume: applyReconnectResume }, ) + runSubmitted = true if (!isBridgeSlashCommand || isBridgeCompressCommand || isBridgePlanCommand || isBridgeGoalCommand) { streamStates.value.set(sid, ctrl) } } catch (err: any) { + if (shouldQueue && !runSubmitted) { + dropQueuedUserMessage(sid, userMsg.id) + } + if (!shouldQueue && !runSubmitted) { + serverWorking.value.delete(sid) + } addMessage(sid, { id: uid(), role: 'system', @@ -1991,7 +2023,6 @@ export const useChatStore = defineStore('chat', () => { runProducedAssistantText = false runHadToolActivity = false closeStreamingAssistant() - promoteNextQueuedUserMessage(sid) if ((evt as any).queue_length > 0) { queueLengths.value.set(sid, (evt as any).queue_length) } else { @@ -2416,7 +2447,8 @@ export const useChatStore = defineStore('chat', () => { queued: !!peer?.queued, systemType: peer?.role === 'command' ? 'command' : undefined, } - if (peer?.queued) { + const wasDequeued = messageId ? consumeDequeuedQueueId(sid, messageId) : false + if (peer?.queued || (!wasDequeued && isSessionLive(sid))) { enqueueUserMessage(sid, message) } else { addMessage(sid, message) diff --git a/tests/e2e/chat-streaming.spec.ts b/tests/e2e/chat-streaming.spec.ts index 8e65377..2da8483 100644 --- a/tests/e2e/chat-streaming.spec.ts +++ b/tests/e2e/chat-streaming.spec.ts @@ -142,6 +142,22 @@ test('keeps queued runs on one socket and does not duplicate streamed handlers', expect(second.runCount).toBe(2) expect(second.run.session_id).toBe(first.run.session_id) expect(second.run.input).toBe('Second queued contract') + await expect(page.locator('p').filter({ hasText: /^Second queued contract$/ })).toHaveCount(0) + + await page.evaluate(({ sid, queueId }) => { + const socket = (window as any).__PW_CHAT_SOCKET__.latest + socket.__trigger('run.peer_user_message', { + event: 'run.peer_user_message', + session_id: sid, + message: { + id: queueId, + role: 'user', + content: 'Second queued contract', + timestamp: Date.now() / 1000, + }, + }) + }, { sid: first.run.session_id, queueId: second.run.queue_id }) + await expect(page.locator('p').filter({ hasText: /^Second queued contract$/ })).toHaveCount(0) await page.evaluate((sid) => { const socket = (window as any).__PW_CHAT_SOCKET__.latest @@ -152,6 +168,29 @@ test('keeps queued runs on one socket and does not duplicate streamed handlers', output: 'First answer', queue_remaining: 1, }) + }, first.run.session_id) + + await expect(page.locator('p').filter({ hasText: /^Second queued contract$/ })).toHaveCount(0) + + await page.evaluate(({ sid, queueId }) => { + const socket = (window as any).__PW_CHAT_SOCKET__.latest + socket.__trigger('run.queued', { + event: 'run.queued', + session_id: sid, + queue_length: 0, + dequeued_queue_id: queueId, + queued_messages: [], + }) + socket.__trigger('run.peer_user_message', { + event: 'run.peer_user_message', + session_id: sid, + message: { + id: queueId, + role: 'user', + content: 'Second queued contract', + timestamp: Date.now() / 1000, + }, + }) socket.__trigger('run.started', { event: 'run.started', session_id: sid, run_id: 'run-2', queue_length: 0 }) socket.__trigger('message.delta', { event: 'message.delta', session_id: sid, run_id: 'run-2', delta: 'Second answer' }) socket.__trigger('run.completed', { @@ -161,7 +200,7 @@ test('keeps queued runs on one socket and does not duplicate streamed handlers', output: 'Second answer', queue_remaining: 0, }) - }, first.run.session_id) + }, { sid: first.run.session_id, queueId: second.run.queue_id }) await expect(page.locator('p').filter({ hasText: /^First answer$/ })).toHaveCount(1) await expect(page.locator('p').filter({ hasText: /^Second queued contract$/ })).toHaveCount(1)