diff --git a/packages/client/src/api/hermes/chat.ts b/packages/client/src/api/hermes/chat.ts index cc35897..b8a8c30 100644 --- a/packages/client/src/api/hermes/chat.ts +++ b/packages/client/src/api/hermes/chat.ts @@ -51,6 +51,16 @@ export interface RunEvent { session_id?: string /** Queue length from run.queued event */ queue_length?: number + /** Queue item that was just removed because it is starting now. */ + dequeued_queue_id?: string + /** Queued user messages from run.queued/resume payloads. */ + queued_messages?: Array<{ + id?: string | number + role?: string + content?: string + timestamp?: number + queued?: boolean + }> /** User message broadcast to other windows already watching the same session. */ message?: { id?: string | number @@ -522,7 +532,7 @@ function removeSocketListener(socket: Socket, event: string, handler: (...args: */ export function resumeSession( sessionId: string, - onResumed: (data: { session_id: string; messages: any[]; isWorking: boolean; isAborting?: boolean; events: any[]; inputTokens?: number; outputTokens?: number; contextTokens?: number; queueLength?: number }) => void, + onResumed: (data: { session_id: string; messages: any[]; isWorking: boolean; isAborting?: boolean; events: any[]; inputTokens?: number; outputTokens?: number; contextTokens?: number; queueLength?: number; queueMessages?: RunEvent['queued_messages'] }) => void, ): Socket { const socket = connectChatRun() diff --git a/packages/client/src/stores/hermes/chat.ts b/packages/client/src/stores/hermes/chat.ts index d13395a..35e77ee 100644 --- a/packages/client/src/stores/hermes/chat.ts +++ b/packages/client/src/stores/hermes/chat.ts @@ -538,6 +538,11 @@ export const useChatStore = defineStore('chat', () => { } else { queueLengths.value.delete(sessionId) } + if (Array.isArray((data as any).queueMessages)) { + replaceQueuedUserMessages(sessionId, normalizeQueuedUserMessages((data as any).queueMessages)) + } else if (!data.queueLength) { + replaceQueuedUserMessages(sessionId, []) + } if ((data as any).isAborting) { setAbortState({ aborting: true, synced: null }) } else if (!data.isWorking) { @@ -807,19 +812,23 @@ export const useChatStore = defineStore('chat', () => { function enqueueUserMessage(sessionId: string, message: Message) { const queue = queuedUserMessages.value.get(sessionId) || [] - queue.push({ ...message, queued: true }) - queuedUserMessages.value.set(sessionId, queue) + if (queue.some(item => item.id === message.id)) return + const nextMap = new Map(queuedUserMessages.value) + nextMap.set(sessionId, [...queue, { ...message, queued: true }]) + queuedUserMessages.value = nextMap } function removeQueuedMessage(sessionId: string, messageId: string) { const queue = queuedUserMessages.value.get(sessionId) if (!queue?.length) return const next = queue.filter(message => message.id !== messageId) + const nextMap = new Map(queuedUserMessages.value) if (next.length > 0) { - queuedUserMessages.value.set(sessionId, next) + nextMap.set(sessionId, next) } else { - queuedUserMessages.value.delete(sessionId) + nextMap.delete(sessionId) } + queuedUserMessages.value = nextMap queueLengths.value.set(sessionId, next.length) getChatRunSocket()?.emit('cancel_queued_run', { session_id: sessionId, @@ -827,6 +836,85 @@ export const useChatStore = defineStore('chat', () => { }) } + function normalizeQueuedUserMessages(rawMessages: unknown): Message[] { + if (!Array.isArray(rawMessages)) return [] + return rawMessages.flatMap((raw) => { + const peer = raw as NonNullable[number] + const content = typeof peer?.content === 'string' ? peer.content : '' + const messageId = peer?.id != null ? String(peer.id) : '' + if (!messageId || !content.trim()) return [] + const timestamp = typeof peer?.timestamp === 'number' && Number.isFinite(peer.timestamp) + ? Math.round(peer.timestamp * 1000) + : Date.now() + return [{ + id: messageId, + role: 'user' as const, + content, + timestamp, + queued: true, + }] + }) + } + + function replaceQueuedUserMessages(sessionId: string, messages: Message[]) { + const existingById = new Map((queuedUserMessages.value.get(sessionId) || []).map(message => [message.id, message])) + const merged = messages.map(message => ({ + ...(existingById.get(message.id) || {}), + ...message, + attachments: existingById.get(message.id)?.attachments || message.attachments, + queued: true, + })) + const nextMap = new Map(queuedUserMessages.value) + if (merged.length > 0) { + nextMap.set(sessionId, merged) + } else { + nextMap.delete(sessionId) + } + queuedUserMessages.value = nextMap + } + + function handleRunQueuedEvent(sessionId: string, evt: RunEvent) { + const queueLength = Number((evt as any).queue_length || 0) + if (queueLength > 0) { + queueLengths.value.set(sessionId, queueLength) + } else { + queueLengths.value.delete(sessionId) + } + + if (Array.isArray((evt as any).queued_messages) && !(evt as any).dequeued_queue_id) { + const queued = normalizeQueuedUserMessages((evt as any).queued_messages) + replaceQueuedUserMessages(sessionId, queued) + return + } + + const peer = evt.message + const content = typeof peer?.content === 'string' ? peer.content : '' + const messageId = peer?.id != null ? String(peer.id) : '' + if (!messageId || !content.trim()) return + + if ((queuedUserMessages.value.get(sessionId) || []).some(msg => msg.id === messageId)) return + + const timestamp = typeof peer?.timestamp === 'number' && Number.isFinite(peer.timestamp) + ? Math.round(peer.timestamp * 1000) + : Date.now() + const msgs = getSessionMsgs(sessionId) + const existingIndex = msgs.findIndex(msg => msg.id === messageId && msg.role === 'user') + const existing = existingIndex >= 0 ? msgs[existingIndex] : null + if (existingIndex >= 0) { + msgs.splice(existingIndex, 1) + } + + enqueueUserMessage(sessionId, { + ...(existing || {}), + id: messageId, + role: 'user', + content, + timestamp: existing?.timestamp || timestamp, + attachments: existing?.attachments, + queued: true, + }) + } + function setPendingApproval(evt: RunEvent) { const sid = evt.session_id const approvalId = (evt as any).approval_id as string | undefined @@ -869,12 +957,14 @@ export const useChatStore = defineStore('chat', () => { function showNextQueuedUserMessage(sessionId: string) { const queue = queuedUserMessages.value.get(sessionId) if (!queue?.length) return - const next = queue.shift()! - if (queue.length > 0) { - queuedUserMessages.value.set(sessionId, queue) + const [next, ...rest] = queue + const nextMap = new Map(queuedUserMessages.value) + if (rest.length > 0) { + nextMap.set(sessionId, rest) } else { - queuedUserMessages.value.delete(sessionId) + nextMap.delete(sessionId) } + queuedUserMessages.value = nextMap addMessage(sessionId, { ...next, queued: false }) updateSessionTitle(sessionId) } @@ -1054,7 +1144,7 @@ export const useChatStore = defineStore('chat', () => { break case 'run.queued': { - queueLengths.value.set(sid, (evt as any).queue_length || 0) + handleRunQueuedEvent(sid, evt) break } @@ -1492,7 +1582,7 @@ export const useChatStore = defineStore('chat', () => { if (evt.session_id && evt.session_id !== sid) return switch (evt.event) { case 'run.queued': { - queueLengths.value.set(sid, (evt as any).queue_length || 0) + handleRunQueuedEvent(sid, evt) break } diff --git a/packages/server/src/services/hermes/run-chat/index.ts b/packages/server/src/services/hermes/run-chat/index.ts index aae2dc3..24b954d 100644 --- a/packages/server/src/services/hermes/run-chat/index.ts +++ b/packages/server/src/services/hermes/run-chat/index.ts @@ -19,6 +19,7 @@ import { handleBridgeRun } from './handle-bridge-run' import { handleAbort } from './abort' import { getOrCreateSession } from './compression' import { handleSessionCommand, isSessionCommand, parseSessionCommand } from './session-command' +import { contentBlocksToString } from './content-blocks' import type { ContentBlock, QueuedRun, SessionState } from './types' import { authenticateUserToken, isAuthEnabled, type AuthenticatedUser } from '../../../middleware/user-auth' import { userCanAccessProfile } from '../../../db/hermes/users-store' @@ -162,6 +163,7 @@ export class ChatRunSocket { event: 'run.queued', session_id: data.session_id, queue_length: state.queue.length, + queued_messages: this.serializeQueuedMessages(state.queue), }) logger.info('[chat-run-socket] queued run for session %s (queue: %d)', data.session_id, state.queue.length) return @@ -199,6 +201,7 @@ export class ChatRunSocket { event: 'run.queued', session_id: data.session_id, queue_length: state.queue.length, + queued_messages: this.serializeQueuedMessages(state.queue), }) logger.info('[chat-run-socket] cancelled queued run %s for session %s (queue: %d)', data.queue_id, data.session_id, state.queue.length) @@ -308,6 +311,7 @@ export class ChatRunSocket { outputTokens: state.outputTokens, contextTokens: state.contextTokens, queueLength: state.queue?.length || 0, + queueMessages: this.serializeQueuedMessages(state.queue || []), }) logger.info('[chat-run-socket] socket %s resumed session %s (working: %s, messages: %d)', @@ -326,6 +330,8 @@ export class ChatRunSocket { event: 'run.queued', session_id: sessionId, queue_length: state.queue.length, + dequeued_queue_id: next.queue_id, + queued_messages: this.serializeQueuedMessages(state.queue), }) this.runQueuedItem(socket, sessionId, next, fallbackProfile) return true @@ -355,6 +361,16 @@ export class ChatRunSocket { } } + private serializeQueuedMessages(queue: QueuedRun[]) { + return queue.map(item => ({ + id: item.queue_id, + role: 'user', + content: contentBlocksToString(item.input), + timestamp: Math.floor(Date.now() / 1000), + queued: true, + })) + } + private canAccessProfile(user: AuthenticatedUser, profile: string): boolean { return user.role === 'super_admin' || userCanAccessProfile(user.id, profile) } diff --git a/packages/server/src/services/hermes/run-chat/session-command.ts b/packages/server/src/services/hermes/run-chat/session-command.ts index d75a8e4..db31a85 100644 --- a/packages/server/src/services/hermes/run-chat/session-command.ts +++ b/packages/server/src/services/hermes/run-chat/session-command.ts @@ -6,6 +6,7 @@ import { flushBridgePendingToDb } from './bridge-message' import { buildDbHistory, estimateSnapshotAwareHistoryUsage, forceCompressBridgeHistory, getOrCreateSession, replaceState } from './compression' import { handleAbort } from './abort' import { calcAndUpdateUsage, contextTokensWithCachedOverhead, updateMessageContextTokenUsage } from './usage' +import { contentBlocksToString } from './content-blocks' import type { ContentBlock, QueuedRun, SessionState } from './types' type CommandName = @@ -150,8 +151,9 @@ export async function handleSessionCommand( emitCommand({ ok: false, action: 'queue', message: 'Session is idle. Send the message normally instead.' }) return } + const queueId = `queue_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}` state.queue.push({ - queue_id: `queue_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`, + queue_id: queueId, input: command.args, model: ctx.model, instructions: ctx.instructions, @@ -163,6 +165,13 @@ export async function handleSessionCommand( event: 'run.queued', session_id: sessionId, queue_length: state.queue.length, + queued_messages: state.queue.map(item => ({ + id: item.queue_id, + role: 'user', + content: contentBlocksToString(item.input), + timestamp: Math.floor(Date.now() / 1000), + queued: true, + })), }) emitCommand({ action: 'queue',