diff --git a/packages/client/src/api/hermes/chat.ts b/packages/client/src/api/hermes/chat.ts index 84ee7fb..cc35897 100644 --- a/packages/client/src/api/hermes/chat.ts +++ b/packages/client/src/api/hermes/chat.ts @@ -57,6 +57,7 @@ export interface RunEvent { role?: string content?: string timestamp?: number + queued?: boolean } } diff --git a/packages/client/src/stores/hermes/chat.ts b/packages/client/src/stores/hermes/chat.ts index ba5e54c..d13395a 100644 --- a/packages/client/src/stores/hermes/chat.ts +++ b/packages/client/src/stores/hermes/chat.ts @@ -1887,18 +1887,29 @@ export const useChatStore = defineStore('chat', () => { resumeServerWorkingRun(sid, true) return } + if (messageId && (queuedUserMessages.value.get(sid) || []).some(msg => msg.id === messageId)) { + serverWorking.value.add(sid) + resumeServerWorkingRun(sid, true) + return + } const timestamp = typeof peer?.timestamp === 'number' && Number.isFinite(peer.timestamp) ? Math.round(peer.timestamp * 1000) : Date.now() - addMessage(sid, { + const message: Message = { id: messageId || uid(), role: 'user', content, timestamp, - }) - updateSessionTitle(sid) + queued: !!peer?.queued, + } + if (peer?.queued) { + enqueueUserMessage(sid, message) + } else { + addMessage(sid, message) + updateSessionTitle(sid) + } serverWorking.value.add(sid) resumeServerWorkingRun(sid, true) } diff --git a/packages/server/src/services/hermes/run-chat/handle-api-run.ts b/packages/server/src/services/hermes/run-chat/handle-api-run.ts index 94a44ab..650eb3f 100644 --- a/packages/server/src/services/hermes/run-chat/handle-api-run.ts +++ b/packages/server/src/services/hermes/run-chat/handle-api-run.ts @@ -68,7 +68,7 @@ export async function loadSessionStateFromDb(sid: string, _sessionMap: Map, socket: Socket, - data: { input: string | ContentBlock[]; session_id?: string; model?: string; provider?: string; instructions?: string; source?: string }, + data: { input: string | ContentBlock[]; session_id?: string; model?: string; provider?: string; instructions?: string; source?: string; queue_id?: string; peerExcludeSocketId?: string }, profile: string, sessionMap: Map, skipUserMessage = false, @@ -133,7 +133,7 @@ export async function handleApiRun( content: inputStr, timestamp: now, }) - peerUserMessage = { id: messageId, role: 'user', content: inputStr, timestamp: now } + peerUserMessage = { id: data.queue_id ? undefined : messageId, role: 'user', content: inputStr, timestamp: now } } else { const inputStr = contentBlocksToString(input) state.messages.push({ @@ -155,15 +155,21 @@ export async function handleApiRun( content: inputStr, timestamp: now, }) - peerUserMessage = { id: messageId, role: 'user', content: inputStr, timestamp: now } + peerUserMessage = { id: data.queue_id ? undefined : messageId, role: 'user', content: inputStr, timestamp: now } } socket.join(`session:${session_id}`) if (peerUserMessage) { - socket.to(`session:${session_id}`).emit('run.peer_user_message', { + const target = data.peerExcludeSocketId + ? nsp.to(`session:${session_id}`).except(data.peerExcludeSocketId) + : socket.to(`session:${session_id}`) + target.emit('run.peer_user_message', { event: 'run.peer_user_message', session_id, - message: peerUserMessage, + message: { + ...peerUserMessage, + id: data.queue_id || peerUserMessage.id, + }, }) } } diff --git a/packages/server/src/services/hermes/run-chat/handle-bridge-run.ts b/packages/server/src/services/hermes/run-chat/handle-bridge-run.ts index cb4a4f6..2025340 100644 --- a/packages/server/src/services/hermes/run-chat/handle-bridge-run.ts +++ b/packages/server/src/services/hermes/run-chat/handle-bridge-run.ts @@ -94,7 +94,7 @@ function cacheBridgeContext(state: SessionState, data: Record | export async function handleBridgeRun( nsp: ReturnType, socket: Socket, - data: { input: string | ContentBlock[]; session_id?: string; model?: string; provider?: string; model_groups?: RunModelGroup[]; instructions?: string; source?: string }, + data: { input: string | ContentBlock[]; session_id?: string; model?: string; provider?: string; model_groups?: RunModelGroup[]; instructions?: string; source?: string; queue_id?: string; peerExcludeSocketId?: string }, profile: string, sessionMap: Map, bridge: AgentBridgeClient, @@ -181,11 +181,14 @@ export async function handleBridgeRun( }) socket.join(`session:${session_id}`) - socket.to(`session:${session_id}`).emit('run.peer_user_message', { + const peerTarget = data.peerExcludeSocketId + ? nsp.to(`session:${session_id}`).except(data.peerExcludeSocketId) + : socket.to(`session:${session_id}`) + peerTarget.emit('run.peer_user_message', { event: 'run.peer_user_message', session_id, message: { - id: messageId, + id: data.queue_id || messageId, role: 'user', content: inputStr, timestamp: now, diff --git a/packages/server/src/services/hermes/run-chat/index.ts b/packages/server/src/services/hermes/run-chat/index.ts index 2956d77..135cbd6 100644 --- a/packages/server/src/services/hermes/run-chat/index.ts +++ b/packages/server/src/services/hermes/run-chat/index.ts @@ -19,6 +19,7 @@ import { handleBridgeRun } from './handle-bridge-run' import { handleAbort } from './abort' import { getOrCreateSession } from './compression' import { handleSessionCommand, isSessionCommand, parseSessionCommand } from './session-command' +import { contentBlocksToString } from './content-blocks' import type { ContentBlock, QueuedRun, SessionState } from './types' import { authenticateUserToken, isAuthEnabled, type AuthenticatedUser } from '../../../middleware/user-auth' import { userCanAccessProfile } from '../../../db/hermes/users-store' @@ -146,8 +147,9 @@ export class ChatRunSocket { return } if (state.isWorking) { + const queueId = data.queue_id || `queue_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}` state.queue.push({ - queue_id: data.queue_id || `queue_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`, + queue_id: queueId, input: data.input, model: data.model, provider: data.provider, @@ -155,6 +157,18 @@ export class ChatRunSocket { instructions: data.instructions, profile: runProfile, source, + originSocketId: socket.id, + }) + socket.to(`session:${data.session_id}`).emit('run.peer_user_message', { + event: 'run.peer_user_message', + session_id: data.session_id, + message: { + id: queueId, + role: 'user', + content: contentBlocksToString(data.input), + timestamp: Math.floor(Date.now() / 1000), + queued: true, + }, }) this.nsp.to(`session:${data.session_id}`).emit('run.queued', { event: 'run.queued', @@ -249,6 +263,8 @@ export class ChatRunSocket { model_groups?: Array<{ provider: string; models: string[] }> instructions?: string source?: string + queue_id?: string + peerExcludeSocketId?: string }, profile: string, skipUserMessage = false, @@ -336,6 +352,8 @@ export class ChatRunSocket { model_groups: next.model_groups, instructions: next.instructions, source: next.source, + queue_id: next.queue_id, + peerExcludeSocketId: next.originSocketId, }, next.profile || fallbackProfile, true) } diff --git a/packages/server/src/services/hermes/run-chat/session-command.ts b/packages/server/src/services/hermes/run-chat/session-command.ts index 794fa61..d75a8e4 100644 --- a/packages/server/src/services/hermes/run-chat/session-command.ts +++ b/packages/server/src/services/hermes/run-chat/session-command.ts @@ -157,6 +157,7 @@ export async function handleSessionCommand( instructions: ctx.instructions, profile: ctx.profile, source: 'cli', + originSocketId: ctx.socket.id, }) emitToSession(ctx.nsp, ctx.socket, sessionId, 'run.queued', { event: 'run.queued', diff --git a/packages/server/src/services/hermes/run-chat/types.ts b/packages/server/src/services/hermes/run-chat/types.ts index 2a87f96..a0974c7 100644 --- a/packages/server/src/services/hermes/run-chat/types.ts +++ b/packages/server/src/services/hermes/run-chat/types.ts @@ -34,6 +34,7 @@ export interface QueuedRun { instructions?: string profile: string source?: ChatRunSource + originSocketId?: string } export interface SessionState {