diff --git a/packages/client/src/api/hermes/chat.ts b/packages/client/src/api/hermes/chat.ts index a4c653d..a9ef556 100644 --- a/packages/client/src/api/hermes/chat.ts +++ b/packages/client/src/api/hermes/chat.ts @@ -16,6 +16,7 @@ export interface StartRunRequest { instructions?: string session_id?: string model?: string + queue_id?: string } export interface StartRunResponse { @@ -45,6 +46,8 @@ export interface RunEvent { } /** session_id tag added by server for client-side filtering */ session_id?: string + /** Queue length from run.queued event */ + queue_length?: number } // ============================ @@ -73,6 +76,7 @@ const sessionEventHandlers = new Map void onAbortCompleted: (event: RunEvent) => void onUsageUpdated: (event: RunEvent) => void + onRunQueued?: (event: RunEvent) => void }>() /** @@ -179,7 +183,8 @@ function globalRunCompletedHandler(event: RunEvent): void { handlers.onRunCompleted(event) } - // Auto-cleanup session handlers on completion + // Auto-cleanup session handlers on completion (skip if more runs queued) + if ((event as any).queue_remaining > 0) return sessionEventHandlers.delete(sid) } @@ -195,10 +200,24 @@ function globalRunFailedHandler(event: RunEvent): void { handlers.onRunFailed(event) } - // Auto-cleanup session handlers on failure + // Auto-cleanup session handlers on failure (skip if more runs queued) + if ((event as any).queue_remaining > 0) return sessionEventHandlers.delete(sid) } +/** + * Global run.queued event handler + */ +function globalRunQueuedHandler(event: RunEvent): void { + const sid = event.session_id + if (!sid) return + + const handlers = sessionEventHandlers.get(sid) + if (handlers?.onRunQueued) { + handlers.onRunQueued(event) + } +} + /** * Global compression.started event handler */ @@ -250,6 +269,9 @@ function globalAbortCompletedHandler(event: RunEvent): void { handlers.onAbortCompleted(event) } + // If abort completion is followed by queued runs, keep the handler alive so + // the next run.started/message.delta/run.completed events are still received. + if ((event as any).queue_length > 0) return sessionEventHandlers.delete(sid) } @@ -289,6 +311,7 @@ export function registerSessionHandlers( onAbortStarted: (event: RunEvent) => void onAbortCompleted: (event: RunEvent) => void onUsageUpdated: (event: RunEvent) => void + onRunQueued?: (event: RunEvent) => void } ): () => void { sessionEventHandlers.set(sessionId, handlers) @@ -361,6 +384,7 @@ export function connectChatRun(): Socket { chatRunSocket.on('run.started', globalRunStartedHandler) chatRunSocket.on('run.failed', globalRunFailedHandler) chatRunSocket.on('run.completed', globalRunCompletedHandler) + chatRunSocket.on('run.queued', globalRunQueuedHandler) // Compression events chatRunSocket.on('compression.started', globalCompressionStartedHandler) @@ -395,7 +419,7 @@ export function disconnectChatRun(): void { */ export function resumeSession( sessionId: string, - onResumed: (data: { session_id: string; messages: any[]; isWorking: boolean; isAborting?: boolean; events: any[]; inputTokens?: number; outputTokens?: number }) => void, + onResumed: (data: { session_id: string; messages: any[]; isWorking: boolean; isAborting?: boolean; events: any[]; inputTokens?: number; outputTokens?: number; queueLength?: number }) => void, ): Socket { const socket = connectChatRun() @@ -418,6 +442,18 @@ export function startRunViaSocket( } let closed = false + const socket = connectChatRun() + + if (sessionEventHandlers.has(sid)) { + socket.emit('run', body) + return { + abort: () => { + if (!closed) { + socket.emit('abort', { session_id: sid }) + } + }, + } + } // Define event handlers for this session const handlers = { @@ -453,12 +489,14 @@ export function startRunViaSocket( onRunCompleted: (evt: RunEvent) => { if (closed) return onEvent(evt) + if ((evt as any).queue_remaining > 0) return closed = true onDone() }, onRunFailed: (evt: RunEvent) => { if (closed) return onEvent(evt) + if ((evt as any).queue_remaining > 0) return closed = true onError(new Error(evt.error || 'Run failed')) }, @@ -477,6 +515,7 @@ export function startRunViaSocket( onAbortCompleted: (evt: RunEvent) => { if (closed) return onEvent(evt) + if ((evt as any).queue_length > 0) return closed = true onDone() }, @@ -484,13 +523,16 @@ export function startRunViaSocket( if (closed) return onEvent(evt) }, + onRunQueued: (evt: RunEvent) => { + if (closed) return + onEvent(evt) + }, } // Register handlers in the global session map sessionEventHandlers.set(sid, handlers) // Emit run request - const socket = connectChatRun() socket.emit('run', body) return { diff --git a/packages/client/src/components/hermes/chat/ChatInput.vue b/packages/client/src/components/hermes/chat/ChatInput.vue index 31b8c3d..3afb724 100644 --- a/packages/client/src/components/hermes/chat/ChatInput.vue +++ b/packages/client/src/components/hermes/chat/ChatInput.vue @@ -374,7 +374,7 @@ function isImage(type: string): boolean { @@ -310,12 +359,214 @@ watch(currentToolCalls, () => { flex-direction: column; gap: 16px; background-color: $bg-card; + position: relative; .dark & { background-color: #333333; } } +.queue-float-panel { + position: sticky; + right: 16px; + bottom: 16px; + z-index: 4; + align-self: flex-end; + width: min(340px, calc(100% - 16px)); + margin-top: auto; + padding: 10px; + border: 1px solid rgba(var(--accent-info-rgb), 0.22); + border-radius: 16px; + background: #ffffff; + box-shadow: 0 14px 40px rgba(0, 0, 0, 0.14); + backdrop-filter: blur(14px); + + .dark & { + background: #262626; + } +} + +.queue-float-header { + display: flex; + align-items: center; + gap: 8px; + padding: 2px 4px 8px; + color: $text-secondary; + font-size: 12px; + font-weight: 600; + + strong { + margin-left: auto; + min-width: 20px; + height: 20px; + display: inline-flex; + align-items: center; + justify-content: center; + border-radius: 999px; + background: rgba(var(--accent-info-rgb), 0.16); + color: var(--accent-info); + } +} + +.queue-orbit { + width: 18px; + height: 18px; + border-radius: 50%; + border: 1px solid rgba(var(--accent-info-rgb), 0.28); + position: relative; + animation: queue-spin 1.6s linear infinite; + + span { + position: absolute; + width: 6px; + height: 6px; + border-radius: 50%; + right: -2px; + top: 5px; + background: var(--accent-info); + box-shadow: 0 0 12px rgba(var(--accent-info-rgb), 0.65); + } +} + +.queue-float-list { + display: flex; + flex-direction: column; + gap: 6px; + max-height: 172px; + overflow-y: auto; +} + +.queue-float-item { + display: flex; + align-items: center; + gap: 8px; + min-height: 34px; + padding: 7px 8px; + border-radius: 11px; + background: rgba(255, 255, 255, 0.68); + color: $text-primary; + + .dark & { + background: rgba(255, 255, 255, 0.08); + } +} + +.queue-index { + flex: 0 0 auto; + width: 20px; + height: 20px; + border-radius: 7px; + display: inline-flex; + align-items: center; + justify-content: center; + font-size: 11px; + color: var(--accent-info); + background: rgba(var(--accent-info-rgb), 0.12); +} + +.queue-text { + min-width: 0; + flex: 1; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; + font-size: 12px; +} + +.queue-remove { + flex: 0 0 auto; + width: 24px; + height: 24px; + border: none; + border-radius: 8px; + display: inline-flex; + align-items: center; + justify-content: center; + color: $text-muted; + background: transparent; + cursor: pointer; + transition: all $transition-fast; + + &:hover { + color: $error; + background: rgba($error, 0.1); + } +} + +@media (max-width: 640px) { + .queue-float-panel { + right: 8px; + bottom: 8px; + width: min(260px, calc(100% - 8px)); + padding: 7px; + border-radius: 14px; + } + + .queue-float-header { + padding: 0 2px; + font-size: 11px; + + span:nth-child(2) { + display: none; + } + } + + .queue-orbit { + width: 16px; + height: 16px; + + span { + width: 5px; + height: 5px; + top: 5px; + } + } + + .queue-float-list { + margin-top: 6px; + max-height: min(220px, 34dvh); + overflow-y: auto; + } + + .queue-float-item { + min-height: 30px; + padding: 5px 6px; + } + + .queue-index { + width: 18px; + height: 18px; + border-radius: 6px; + font-size: 10px; + } + + .queue-text { + font-size: 11px; + } + + .queue-remove { + width: 22px; + height: 22px; + } +} + +@keyframes queue-spin { + to { + transform: rotate(360deg); + } +} + +.queue-float-enter-active, +.queue-float-leave-active { + transition: opacity 0.2s ease, transform 0.2s ease; +} + +.queue-float-enter-from, +.queue-float-leave-to { + opacity: 0; + transform: translateY(10px) scale(0.98); +} + .empty-state { flex: 1; display: flex; diff --git a/packages/client/src/composables/useSpeech.ts b/packages/client/src/composables/useSpeech.ts index 5369d57..ea1b754 100644 --- a/packages/client/src/composables/useSpeech.ts +++ b/packages/client/src/composables/useSpeech.ts @@ -15,6 +15,12 @@ export interface SpeechState { progress: number // 当前进度(字符数) } +interface SpeechQueueItem { + messageId: string + content: string + options: SpeechOptions +} + /** * Web Speech API 语音播放 Composable */ @@ -29,7 +35,8 @@ export function useSpeech() { }) let utterance: SpeechSynthesisUtterance | null = null - let currentText = '' + let playbackToken = 0 + const speechQueue: SpeechQueueItem[] = [] // 加载可用语音列表 function loadVoices() { @@ -106,8 +113,12 @@ export function useSpeech() { /** * 停止当前播放 */ - function stop() { - if (synth.speaking) { + function stop(clearQueue = true) { + playbackToken += 1 + if (clearQueue) { + speechQueue.length = 0 + } + if (synth.speaking || synth.pending || synth.paused) { synth.cancel() } if (utterance) { @@ -119,7 +130,89 @@ export function useSpeech() { currentMessageId: null, progress: 0, } - currentText = '' + } + + function speak(messageId: string, text: string, options: SpeechOptions = {}) { + const token = ++playbackToken + + utterance = new SpeechSynthesisUtterance(text) + const activeUtterance = utterance + const activeText = text + + // 设置语音参数 + utterance.rate = options.rate ?? 1 + utterance.pitch = options.pitch ?? 1 + utterance.volume = options.volume ?? 1 + utterance.voice = options.voice ?? getDefaultVoice() + + console.log('[useSpeech] Selected voice:', utterance.voice?.name, utterance.voice?.lang) + + if (options.lang) { + utterance.lang = options.lang + } else if (utterance.voice) { + utterance.lang = utterance.voice.lang + } + + // 事件监听 + utterance.onstart = () => { + if (token !== playbackToken || utterance !== activeUtterance) return + console.log('[useSpeech] onstart fired') + state.value.isPlaying = true + state.value.isPaused = false + state.value.currentMessageId = messageId + state.value.progress = 0 + } + + utterance.onboundary = (event) => { + if (token !== playbackToken || utterance !== activeUtterance) return + if (event.name === 'word') { + state.value.progress = event.charIndex + } + } + + utterance.onend = () => { + if (token !== playbackToken || utterance !== activeUtterance) return + console.log('[useSpeech] onend fired') + state.value.isPlaying = false + state.value.isPaused = false + state.value.currentMessageId = null + state.value.progress = activeText.length + utterance = null + if (speechQueue.length > 0) { + window.setTimeout(playNextQueuedSpeech, 0) + } + } + + utterance.onerror = (event) => { + if (token !== playbackToken || utterance !== activeUtterance) return + console.error('[useSpeech] Speech synthesis error:', event.error) + state.value.isPlaying = false + state.value.isPaused = false + state.value.currentMessageId = null + utterance = null + if (speechQueue.length > 0) { + window.setTimeout(playNextQueuedSpeech, 0) + } + } + + // 开始播放 + console.log('[useSpeech] Calling synth.speak()') + synth.speak(utterance) + } + + function playNextQueuedSpeech() { + if (state.value.isPlaying || state.value.isPaused || synth.speaking || synth.pending) return + const next = speechQueue.shift() + if (!next) return + + const text = extractReadableText(next.content) + if (!text) { + window.setTimeout(playNextQueuedSpeech, 0) + return + } + + console.log('[useSpeech] Playing queued text:', text.substring(0, 50) + '...') + speak(next.messageId, text, next.options) } /** @@ -159,58 +252,23 @@ export function useSpeech() { // 停止当前播放 stop() + speak(messageId, text, options) + } - // 创建新的 utterance - utterance = new SpeechSynthesisUtterance(text) - currentText = text - - // 设置语音参数 - utterance.rate = options.rate ?? 1 - utterance.pitch = options.pitch ?? 1 - utterance.volume = options.volume ?? 1 - utterance.voice = options.voice ?? getDefaultVoice() - - console.log('[useSpeech] Selected voice:', utterance.voice?.name, utterance.voice?.lang) - - if (options.lang) { - utterance.lang = options.lang - } else if (utterance.voice) { - utterance.lang = utterance.voice.lang + /** + * 自动播放入队:不打断当前语音,按完成顺序依次播放。 + */ + function enqueue(messageId: string, content: string, options: SpeechOptions = {}) { + if (!isSupported.value) { + console.warn('[useSpeech] Speech synthesis not supported') + return } - - // 事件监听 - utterance.onstart = () => { - console.log('[useSpeech] onstart fired') - state.value.isPlaying = true - state.value.isPaused = false - state.value.currentMessageId = messageId - state.value.progress = 0 + if (!extractReadableText(content)) { + console.warn('[useSpeech] No readable text found') + return } - - utterance.onboundary = (event) => { - if (event.name === 'word') { - state.value.progress = event.charIndex - } - } - - utterance.onend = () => { - console.log('[useSpeech] onend fired') - state.value.isPlaying = false - state.value.isPaused = false - state.value.currentMessageId = null - state.value.progress = currentText.length - } - - utterance.onerror = (event) => { - console.error('[useSpeech] Speech synthesis error:', event.error) - state.value.isPlaying = false - state.value.isPaused = false - state.value.currentMessageId = null - } - - // 开始播放 - console.log('[useSpeech] Calling synth.speak()') - synth.speak(utterance) + speechQueue.push({ messageId, content, options }) + playNextQueuedSpeech() } /** @@ -269,6 +327,7 @@ export function useSpeech() { resume, stop, toggle, + enqueue, getDefaultVoice, getAllVoices, extractReadableText, diff --git a/packages/client/src/i18n/locales/de.ts b/packages/client/src/i18n/locales/de.ts index d77f089..eeb8d27 100644 --- a/packages/client/src/i18n/locales/de.ts +++ b/packages/client/src/i18n/locales/de.ts @@ -117,6 +117,8 @@ export default { emptyState: 'Starten Sie eine Konversation mit Hermes Agent', inputPlaceholder: 'Nachricht eingeben... (Enter zum Senden, Shift+Enter fur neue Zeile)', attachFiles: 'Dateien anhangen', + messageQueue: 'Nachrichtenwarteschlange', + removeQueuedMessage: 'Nachricht aus Warteschlange entfernen', stop: 'Stopp', send: 'Senden', contextUsed: 'Kontext verwendet:', diff --git a/packages/client/src/i18n/locales/en.ts b/packages/client/src/i18n/locales/en.ts index 4552f02..b9802ba 100644 --- a/packages/client/src/i18n/locales/en.ts +++ b/packages/client/src/i18n/locales/en.ts @@ -128,6 +128,8 @@ export default { inputPlaceholder: 'Type a message... (Enter to send, Shift+Enter for new line)', attachFiles: 'Attach files', autoPlaySpeech: 'Auto-play voice', + messageQueue: 'Message queue', + removeQueuedMessage: 'Remove queued message', stop: 'Stop', start: 'Start', stopGateway: 'Stop Gateway', diff --git a/packages/client/src/i18n/locales/es.ts b/packages/client/src/i18n/locales/es.ts index f38ed76..5ab0ffb 100644 --- a/packages/client/src/i18n/locales/es.ts +++ b/packages/client/src/i18n/locales/es.ts @@ -117,6 +117,8 @@ export default { emptyState: 'Inicia una conversacion con Hermes Agent', inputPlaceholder: 'Escribe un mensaje... (Enter para enviar, Shift+Enter para nueva linea)', attachFiles: 'Adjuntar archivos', + messageQueue: 'Cola de mensajes', + removeQueuedMessage: 'Quitar mensaje de la cola', stop: 'Detener', send: 'Enviar', contextUsed: 'Contexto utilizado:', diff --git a/packages/client/src/i18n/locales/fr.ts b/packages/client/src/i18n/locales/fr.ts index 670d8a0..5dc28fd 100644 --- a/packages/client/src/i18n/locales/fr.ts +++ b/packages/client/src/i18n/locales/fr.ts @@ -117,6 +117,8 @@ export default { emptyState: 'Demarrer une conversation avec Hermes Agent', inputPlaceholder: 'Tapez un message... (Entree pour envoyer, Shift+Entree pour un saut de ligne)', attachFiles: 'Joindre des fichiers', + messageQueue: 'File de messages', + removeQueuedMessage: 'Retirer le message de la file', stop: 'Arreter', send: 'Envoyer', contextUsed: 'Contexte utilise :', diff --git a/packages/client/src/i18n/locales/ja.ts b/packages/client/src/i18n/locales/ja.ts index 8bf80de..0f7d154 100644 --- a/packages/client/src/i18n/locales/ja.ts +++ b/packages/client/src/i18n/locales/ja.ts @@ -117,6 +117,8 @@ export default { emptyState: 'Hermes Agent と会話を開始しましょう', inputPlaceholder: 'メッセージを入力... (Enter で送信、Shift+Enter で改行)', attachFiles: 'ファイルを添付', + messageQueue: 'メッセージキュー', + removeQueuedMessage: 'キューのメッセージを削除', stop: '停止', send: '送信', contextUsed: 'コンテキスト使用量:', diff --git a/packages/client/src/i18n/locales/ko.ts b/packages/client/src/i18n/locales/ko.ts index af0d3d6..3ad9ec9 100644 --- a/packages/client/src/i18n/locales/ko.ts +++ b/packages/client/src/i18n/locales/ko.ts @@ -117,6 +117,8 @@ export default { emptyState: 'Hermes Agent와 대화를 시작하세요', inputPlaceholder: '메시지를 입력하세요... (Enter로 전송, Shift+Enter로 줄바꿈)', attachFiles: '파일 첨부', + messageQueue: '메시지 대기열', + removeQueuedMessage: '대기열 메시지 제거', stop: '중지', send: '전송', contextUsed: '사용된 컨텍스트:', diff --git a/packages/client/src/i18n/locales/pt.ts b/packages/client/src/i18n/locales/pt.ts index b2fbf6b..8379f90 100644 --- a/packages/client/src/i18n/locales/pt.ts +++ b/packages/client/src/i18n/locales/pt.ts @@ -117,6 +117,8 @@ export default { emptyState: 'Inicie uma conversa com o Hermes Agent', inputPlaceholder: 'Digite uma mensagem... (Enter para enviar, Shift+Enter para nova linha)', attachFiles: 'Anexar arquivos', + messageQueue: 'Fila de mensagens', + removeQueuedMessage: 'Remover mensagem da fila', stop: 'Parar', send: 'Enviar', contextUsed: 'Contexto utilizado:', diff --git a/packages/client/src/i18n/locales/zh.ts b/packages/client/src/i18n/locales/zh.ts index facde2d..d10a94d 100644 --- a/packages/client/src/i18n/locales/zh.ts +++ b/packages/client/src/i18n/locales/zh.ts @@ -128,6 +128,8 @@ export default { inputPlaceholder: '输入消息... (Enter 发送,Shift+Enter 换行)', attachFiles: '添加附件', autoPlaySpeech: '自动播放语音', + messageQueue: '消息队列', + removeQueuedMessage: '移除队列消息', stop: '停止', start: '启动', stopGateway: '停止网关', diff --git a/packages/client/src/stores/hermes/chat.ts b/packages/client/src/stores/hermes/chat.ts index f294203..f064435 100644 --- a/packages/client/src/stores/hermes/chat.ts +++ b/packages/client/src/stores/hermes/chat.ts @@ -39,6 +39,7 @@ export interface Message { // 2) 流式:由 reasoning.delta / thinking.delta / reasoning.available 事件累加 // 不含 包裹标签;内容自身可以为多段纯文本。 reasoning?: string + queued?: boolean } export interface Session { @@ -312,6 +313,10 @@ export const useChatStore = defineStore('chat', () => { const streamStates = ref void }>>(new Map()) /** sessionId → server-reported isWorking status */ const serverWorking = ref>(new Set()) + /** sessionId → queued message count */ + const queueLengths = ref>(new Map()) + /** sessionId → queued user messages not yet visible in the transcript */ + const queuedUserMessages = ref>(new Map()) // 自动播放语音开关 const autoPlaySpeechEnabled = ref(false) @@ -448,6 +453,11 @@ export const useChatStore = defineStore('chat', () => { } else { serverWorking.value.delete(sessionId) } + if (data.queueLength && data.queueLength > 0) { + queueLengths.value.set(sessionId, data.queueLength) + } else { + queueLengths.value.delete(sessionId) + } if ((data as any).isAborting) { setAbortState({ aborting: true, synced: null }) } else if (!data.isWorking) { @@ -568,6 +578,41 @@ export const useChatStore = defineStore('chat', () => { } } + function enqueueUserMessage(sessionId: string, message: Message) { + const queue = queuedUserMessages.value.get(sessionId) || [] + queue.push({ ...message, queued: true }) + queuedUserMessages.value.set(sessionId, queue) + } + + function removeQueuedMessage(sessionId: string, messageId: string) { + const queue = queuedUserMessages.value.get(sessionId) + if (!queue?.length) return + const next = queue.filter(message => message.id !== messageId) + if (next.length > 0) { + queuedUserMessages.value.set(sessionId, next) + } else { + queuedUserMessages.value.delete(sessionId) + } + queueLengths.value.set(sessionId, next.length) + getChatRunSocket()?.emit('cancel_queued_run', { + session_id: sessionId, + queue_id: messageId, + }) + } + + function showNextQueuedUserMessage(sessionId: string) { + const queue = queuedUserMessages.value.get(sessionId) + if (!queue?.length) return + const next = queue.shift()! + if (queue.length > 0) { + queuedUserMessages.value.set(sessionId, queue) + } else { + queuedUserMessages.value.delete(sessionId) + } + addMessage(sessionId, { ...next, queued: false }) + updateSessionTitle(sessionId) + } + function updateSessionTitle(sessionId: string) { const target = sessions.value.find(s => s.id === sessionId) if (!target) return @@ -596,7 +641,7 @@ export const useChatStore = defineStore('chat', () => { } async function sendMessage(content: string, attachments?: Attachment[]) { - if ((!content.trim() && !(attachments && attachments.length > 0)) || isStreaming.value) return + if ((!content.trim() && !(attachments && attachments.length > 0))) return primeCompletionBellIfEnabled() @@ -607,6 +652,7 @@ export const useChatStore = defineStore('chat', () => { // Capture session ID at send time — all callbacks use this, not activeSessionId const sid = activeSessionId.value! + const shouldQueue = isSessionLive(sid) const userMsg: Message = { id: uid(), @@ -614,12 +660,13 @@ export const useChatStore = defineStore('chat', () => { content: content.trim(), timestamp: Date.now(), attachments: attachments && attachments.length > 0 ? attachments : undefined, + queued: shouldQueue, } - addMessage(sid, userMsg) - - - updateSessionTitle(sid) + if (!shouldQueue) { + addMessage(sid, userMsg) + updateSessionTitle(sid) + } try { @@ -635,13 +682,20 @@ export const useChatStore = defineStore('chat', () => { const base = `/api/hermes/download?path=${encodeURIComponent(f.path)}&name=${encodeURIComponent(f.name)}` return [f.name, token ? `${base}&token=${encodeURIComponent(token)}` : base] })) - const msgs = getSessionMsgs(sid) - const lastUser = msgs.findLast(m => m.id === userMsg.id) - if (lastUser?.attachments) { - lastUser.attachments = lastUser.attachments.map(a => { + if (shouldQueue && userMsg.attachments) { + userMsg.attachments = userMsg.attachments.map(a => { const dl = urlMap.get(a.name) return dl ? { ...a, url: dl } : a }) + } else { + const msgs = getSessionMsgs(sid) + const lastUser = msgs.findLast(m => m.id === userMsg.id) + if (lastUser?.attachments) { + lastUser.attachments = lastUser.attachments.map(a => { + const dl = urlMap.get(a.name) + return dl ? { ...a, url: dl } : a + }) + } } // Build content blocks with uploaded file paths @@ -657,6 +711,11 @@ export const useChatStore = defineStore('chat', () => { input, session_id: sid, model: sessionModel || undefined, + queue_id: userMsg.id, + } + + if (shouldQueue) { + enqueueUserMessage(sid, userMsg) } // Helper to clean up this session's stream state @@ -665,15 +724,29 @@ export const useChatStore = defineStore('chat', () => { serverWorking.value.delete(sid) } - // Per-run flags used to detect silently-swallowed errors at run.completed. + // Per-active-run flags used to detect silently-swallowed errors at run.completed. // hermes-agent occasionally emits run.completed with empty output and no // usage when the agent layer caught an upstream error (e.g. invalid API // key). We need to distinguish: (a) run with assistant text produced, // (b) run with only tool activity, (c) run with truly nothing visible. - // Reset per send() call — closures captured by Socket.IO callbacks are scoped - // to this run, so there is no cross-run contamination. + // Reset on every run.started because one handler may span multiple queued runs. let runProducedAssistantText = false let runHadToolActivity = false + let activeAssistantMessageId: string | null = null + + const startNextQueuedUser = () => { + showNextQueuedUserMessage(sid) + } + + const closeStreamingAssistant = () => { + const msgs = getSessionMsgs(sid) + msgs.forEach(m => { + if (m.role === 'assistant' && m.isStreaming) { + updateMessage(sid, m.id, { isStreaming: false }) + } + }) + activeAssistantMessageId = null + } // Send run via Socket.IO and listen to streamed events — all closures capture `sid` const ctrl = startRunViaSocket( @@ -682,8 +755,23 @@ export const useChatStore = defineStore('chat', () => { (evt: RunEvent) => { switch (evt.event) { case 'run.started': + setAbortState(null) + runProducedAssistantText = false + runHadToolActivity = false + closeStreamingAssistant() + startNextQueuedUser() + if ((evt as any).queue_length > 0) { + queueLengths.value.set(sid, (evt as any).queue_length) + } else { + queueLengths.value.delete(sid) + } break + case 'run.queued': { + queueLengths.value.set(sid, (evt as any).queue_length || 0) + break + } + case 'compression.started': { setCompressionState({ compressing: true, @@ -720,6 +808,11 @@ export const useChatStore = defineStore('chat', () => { case 'abort.completed': { setAbortState({ aborting: false, synced: (evt as any).synced ?? false }) + if ((evt as any).queue_length > 0) { + queueLengths.value.set(sid, (evt as any).queue_length) + setAbortState(null) + break + } const msgs = getSessionMsgs(sid) const lastMsg = msgs[msgs.length - 1] if (lastMsg?.isStreaming) { @@ -744,7 +837,9 @@ export const useChatStore = defineStore('chat', () => { if (!text) break runProducedAssistantText = true const msgs = getSessionMsgs(sid) - const last = msgs[msgs.length - 1] + const last = activeAssistantMessageId + ? msgs.find(m => m.id === activeAssistantMessageId) + : null if (last?.role === 'assistant' && last.isStreaming) { last.reasoning = (last.reasoning || '') + text noteReasoningStart(last.id) @@ -758,6 +853,7 @@ export const useChatStore = defineStore('chat', () => { isStreaming: true, reasoning: text, }) + activeAssistantMessageId = newId noteReasoningStart(newId) } @@ -784,7 +880,9 @@ export const useChatStore = defineStore('chat', () => { case 'message.delta': { if (evt.delta) runProducedAssistantText = true const msgs = getSessionMsgs(sid) - const last = msgs[msgs.length - 1] + const last = activeAssistantMessageId + ? msgs.find(m => m.id === activeAssistantMessageId) + : null if (last?.role === 'assistant' && last.isStreaming) { const prev = last.content const next = prev + (evt.delta || '') @@ -803,6 +901,7 @@ export const useChatStore = defineStore('chat', () => { timestamp: Date.now(), isStreaming: true, }) + activeAssistantMessageId = newId } break @@ -811,10 +910,13 @@ export const useChatStore = defineStore('chat', () => { case 'tool.started': { runHadToolActivity = true const msgs = getSessionMsgs(sid) - const last = msgs[msgs.length - 1] + const last = activeAssistantMessageId + ? msgs.find(m => m.id === activeAssistantMessageId) + : msgs[msgs.length - 1] if (last?.isStreaming) { updateMessage(sid, last.id, { isStreaming: false }) } + activeAssistantMessageId = null addMessage(sid, { id: uid(), role: 'tool', @@ -850,7 +952,9 @@ export const useChatStore = defineStore('chat', () => { case 'run.completed': { const msgs = getSessionMsgs(sid) - const lastMsg = msgs[msgs.length - 1] + const lastMsg = activeAssistantMessageId + ? msgs.find(m => m.id === activeAssistantMessageId) + : msgs[msgs.length - 1] if (lastMsg?.isStreaming) { updateMessage(sid, lastMsg.id, { isStreaming: false }) } @@ -873,7 +977,9 @@ export const useChatStore = defineStore('chat', () => { if ((evt as any).parsed_content !== undefined) { // Backend has parsed stringified array format, update last assistant message const msgs = getSessionMsgs(sid) - const lastAssistant = [...msgs].reverse().find(m => m.role === 'assistant') + const lastAssistant = activeAssistantMessageId + ? msgs.find(m => m.id === activeAssistantMessageId) + : [...msgs].reverse().find(m => m.role === 'assistant') if (lastAssistant) { updateMessage(sid, lastAssistant.id, { content: (evt as any).parsed_content || '', @@ -936,7 +1042,12 @@ export const useChatStore = defineStore('chat', () => { } } - cleanup() + if ((evt as any).queue_remaining > 0) { + queueLengths.value.set(sid, (evt as any).queue_remaining) + } else { + cleanup() + } + activeAssistantMessageId = null updateSessionTitle(sid) break } @@ -963,7 +1074,11 @@ export const useChatStore = defineStore('chat', () => { msgs[i] = { ...m, toolStatus: 'error' } } }) - cleanup() + if ((evt as any).queue_remaining > 0) { + queueLengths.value.set(sid, (evt as any).queue_remaining) + } else { + cleanup() + } break } @@ -1033,6 +1148,7 @@ export const useChatStore = defineStore('chat', () => { let closed = false let runProducedAssistantText = false let runHadToolActivity = false + let activeAssistantMessageId: string | null = null const cleanup = () => { if (closed) return @@ -1043,13 +1159,42 @@ export const useChatStore = defineStore('chat', () => { unregisterSessionHandlers(sid) } + const startNextQueuedUser = () => { + showNextQueuedUserMessage(sid) + } + + const closeStreamingAssistant = () => { + const msgs = getSessionMsgs(sid) + msgs.forEach(m => { + if (m.role === 'assistant' && m.isStreaming) { + updateMessage(sid, m.id, { isStreaming: false }) + } + }) + activeAssistantMessageId = null + } + // Shared event handler — filters by session_id tag function handleEvent(evt: RunEvent) { if (closed) return // Filter events for this session (server tags all events with session_id) if (evt.session_id && evt.session_id !== sid) return switch (evt.event) { + case 'run.queued': { + queueLengths.value.set(sid, (evt as any).queue_length || 0) + break + } + case 'run.started': + setAbortState(null) + runProducedAssistantText = false + runHadToolActivity = false + closeStreamingAssistant() + startNextQueuedUser() + if ((evt as any).queue_length > 0) { + queueLengths.value.set(sid, (evt as any).queue_length) + } else { + queueLengths.value.delete(sid) + } break case 'compression.started': { @@ -1087,6 +1232,11 @@ export const useChatStore = defineStore('chat', () => { case 'abort.completed': { setAbortState({ aborting: false, synced: (evt as any).synced ?? false }) + if ((evt as any).queue_length > 0) { + queueLengths.value.set(sid, (evt as any).queue_length) + setAbortState(null) + break + } const msgs = getSessionMsgs(sid) const lastMsg = msgs[msgs.length - 1] if (lastMsg?.isStreaming) { @@ -1111,7 +1261,9 @@ export const useChatStore = defineStore('chat', () => { if (!text) break runProducedAssistantText = true const msgs = getSessionMsgs(sid) - const last = msgs[msgs.length - 1] + const last = activeAssistantMessageId + ? msgs.find(m => m.id === activeAssistantMessageId) + : null if (last?.role === 'assistant' && last.isStreaming) { last.reasoning = (last.reasoning || '') + text noteReasoningStart(last.id) @@ -1125,6 +1277,7 @@ export const useChatStore = defineStore('chat', () => { isStreaming: true, reasoning: text, }) + activeAssistantMessageId = newId noteReasoningStart(newId) } @@ -1144,7 +1297,9 @@ export const useChatStore = defineStore('chat', () => { case 'message.delta': { if (evt.delta) runProducedAssistantText = true const msgs = getSessionMsgs(sid) - const last = msgs[msgs.length - 1] + const last = activeAssistantMessageId + ? msgs.find(m => m.id === activeAssistantMessageId) + : null if (last?.role === 'assistant' && last.isStreaming) { const prev = last.content const next = prev + (evt.delta || '') @@ -1162,6 +1317,7 @@ export const useChatStore = defineStore('chat', () => { timestamp: Date.now(), isStreaming: true, }) + activeAssistantMessageId = newId } break @@ -1170,10 +1326,13 @@ export const useChatStore = defineStore('chat', () => { case 'tool.started': { runHadToolActivity = true const msgs = getSessionMsgs(sid) - const last = msgs[msgs.length - 1] + const last = activeAssistantMessageId + ? msgs.find(m => m.id === activeAssistantMessageId) + : msgs[msgs.length - 1] if (last?.isStreaming) { updateMessage(sid, last.id, { isStreaming: false }) } + activeAssistantMessageId = null addMessage(sid, { id: uid(), role: 'tool', @@ -1203,8 +1362,16 @@ export const useChatStore = defineStore('chat', () => { } case 'run.completed': { + const hasQueue = (evt as any).queue_remaining > 0 + if (hasQueue) { + queueLengths.value.set(sid, (evt as any).queue_remaining) + } else { + queueLengths.value.delete(sid) + } const msgs = getSessionMsgs(sid) - const lastMsg = msgs[msgs.length - 1] + const lastMsg = activeAssistantMessageId + ? msgs.find(m => m.id === activeAssistantMessageId) + : msgs[msgs.length - 1] if (lastMsg?.isStreaming) { updateMessage(sid, lastMsg.id, { isStreaming: false }) } @@ -1221,7 +1388,9 @@ export const useChatStore = defineStore('chat', () => { if ((evt as any).parsed_content !== undefined) { // Backend has parsed stringified array format, update last assistant message const msgs = getSessionMsgs(sid) - const lastAssistant = [...msgs].reverse().find(m => m.role === 'assistant') + const lastAssistant = activeAssistantMessageId + ? msgs.find(m => m.id === activeAssistantMessageId) + : [...msgs].reverse().find(m => m.role === 'assistant') if (lastAssistant) { updateMessage(sid, lastAssistant.id, { content: (evt as any).parsed_content || '', @@ -1258,12 +1427,35 @@ export const useChatStore = defineStore('chat', () => { playCompletionBellIfEnabled() } - cleanup() + // Auto-play speech for every completed assistant message + if (autoPlaySpeechEnabled.value) { + const msgs = getSessionMsgs(sid) + const lastAssistant = [...msgs].reverse().find(m => m.role === 'assistant') + if (lastAssistant?.content) { + setTimeout(() => { + playMessageSpeech(lastAssistant.id, lastAssistant.content) + }, 300) + } + } + + if (!hasQueue) { + cleanup() + activeAssistantMessageId = null + } else { + // More runs pending — reset for next run but don't cleanup + activeAssistantMessageId = null + } updateSessionTitle(sid) break } case 'run.failed': { + const hasQueue = (evt as any).queue_remaining > 0 + if (hasQueue) { + queueLengths.value.set(sid, (evt as any).queue_remaining) + } else { + queueLengths.value.delete(sid) + } const msgs = getSessionMsgs(sid) const lastErr = msgs[msgs.length - 1] if (lastErr?.isStreaming) { @@ -1285,7 +1477,9 @@ export const useChatStore = defineStore('chat', () => { msgs[i] = { ...m, toolStatus: 'error' } } }) - cleanup() + if (!hasQueue) { + cleanup() + } break } @@ -1316,6 +1510,7 @@ export const useChatStore = defineStore('chat', () => { onAbortStarted: (evt) => handleEvent(evt), onAbortCompleted: (evt) => handleEvent(evt), onUsageUpdated: (evt) => handleEvent(evt), + onRunQueued: (evt) => handleEvent(evt), }) // No need to emit resume here — switchSession already did it. @@ -1343,6 +1538,13 @@ export const useChatStore = defineStore('chat', () => { if (lastMsg?.isStreaming) { updateMessage(sid, lastMsg.id, { isStreaming: false }) } + window.setTimeout(() => { + if (activeSessionId.value === sid && abortState.value?.aborting) { + streamStates.value.delete(sid) + serverWorking.value.delete(sid) + setAbortState(null) + } + }, 20_000) } } @@ -1452,6 +1654,9 @@ export const useChatStore = defineStore('chat', () => { compressionState, abortState, isAborting, + queueLengths, + queuedUserMessages, + removeQueuedMessage, isLoadingSessions, sessionsLoaded, isLoadingMessages, diff --git a/packages/server/src/db/hermes/session-store.ts b/packages/server/src/db/hermes/session-store.ts index fff6bd8..2bb7dd3 100644 --- a/packages/server/src/db/hermes/session-store.ts +++ b/packages/server/src/db/hermes/session-store.ts @@ -318,7 +318,7 @@ export function getSessionDetail(id: string): HermesSessionDetailRow | null { const sessionRow = db.prepare(`SELECT * FROM ${SESSIONS_TABLE} WHERE id = ?`).get(id) as Record | undefined if (!sessionRow) return null const msgRows = db.prepare( - `SELECT * FROM ${MESSAGES_TABLE} WHERE session_id = ? ORDER BY timestamp, id`, + `SELECT * FROM ${MESSAGES_TABLE} WHERE session_id = ? ORDER BY id`, ).all(id) as Record[] const session = mapSessionRow(sessionRow) return { @@ -445,9 +445,10 @@ export function getSessionDetailPaginated( ).get(id) as { total: number } | undefined const total = countResult?.total || 0 - // Get paginated messages (newest first from DB, then reverse) + // Get paginated messages (newest first from DB, then reverse). + // Timestamp precision is mixed across message sources; id is insertion order. const msgRows = db.prepare( - `SELECT * FROM ${MESSAGES_TABLE} WHERE session_id = ? ORDER BY timestamp DESC, id DESC LIMIT ? OFFSET ?`, + `SELECT * FROM ${MESSAGES_TABLE} WHERE session_id = ? ORDER BY id DESC LIMIT ? OFFSET ?`, ).all(id, limit, offset) as Record[] const session = mapSessionRow(sessionRow) diff --git a/packages/server/src/services/hermes/chat-run-socket.ts b/packages/server/src/services/hermes/chat-run-socket.ts index a8e8d25..e35b014 100644 --- a/packages/server/src/services/hermes/chat-run-socket.ts +++ b/packages/server/src/services/hermes/chat-run-socket.ts @@ -149,6 +149,14 @@ interface SessionMessage { codex_reasoning_items?: string | null } +interface QueuedRun { + queue_id: string + input: string | ContentBlock[] + model?: string + instructions?: string + profile: string +} + interface SessionState { messages: SessionMessage[] isWorking: boolean @@ -160,6 +168,7 @@ interface SessionState { inputTokens?: number outputTokens?: number isAborting?: boolean + queue: QueuedRun[] } // --- ChatRunSocket --- @@ -202,14 +211,50 @@ export class ChatRunSocket { const profile = (socket.handshake.query?.profile as string) || 'default' socket.on('run', async (data: { - input: string + input: string | ContentBlock[] session_id?: string model?: string instructions?: string + queue_id?: string }) => { + if (data.session_id) { + const state = this.getOrCreateSession(data.session_id) + if (state.isWorking) { + state.queue.push({ + queue_id: data.queue_id || `queue_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`, + input: data.input, + model: data.model, + instructions: data.instructions, + profile, + }) + this.nsp.to(`session:${data.session_id}`).emit('run.queued', { + event: 'run.queued', + session_id: data.session_id, + queue_length: state.queue.length, + }) + logger.info('[chat-run-socket] queued run for session %s (queue: %d)', data.session_id, state.queue.length) + return + } + } await this.handleRun(socket, data, profile) }) + socket.on('cancel_queued_run', (data: { session_id?: string; queue_id?: string }) => { + if (!data.session_id || !data.queue_id) return + const state = this.sessionMap.get(data.session_id) + if (!state?.queue.length) return + const before = state.queue.length + state.queue = state.queue.filter(item => item.queue_id !== data.queue_id) + if (state.queue.length === before) return + this.nsp.to(`session:${data.session_id}`).emit('run.queued', { + event: 'run.queued', + session_id: data.session_id, + queue_length: state.queue.length, + }) + logger.info('[chat-run-socket] cancelled queued run %s for session %s (queue: %d)', + data.queue_id, data.session_id, state.queue.length) + }) + socket.on('resume', async (data: { session_id?: string }) => { if (!data.session_id) return const sid = data.session_id @@ -366,43 +411,8 @@ export class ChatRunSocket { private async resumeSession(socket: Socket, sid: string) { let state = this.sessionMap.get(sid) if (!state) { - try { - const detail = useLocalSessionStore() - ? getSessionDetailPaginated(sid) - : await getSessionDetailFromDb(sid) - const messages = detail?.messages ? this.handleMessage(detail.messages, sid) : [] - // Calculate context tokens — aware of compression snapshot - - let inputTokens: number - let outputTokens: number - const snapshot = getCompressionSnapshot(sid) - if (snapshot) { - const newMessages = messages.slice(snapshot.lastMessageIndex + 1) - inputTokens = countTokens(SUMMARY_PREFIX + snapshot.summary) + - newMessages.filter(m => m.role === 'user').reduce((sum, m) => sum + countTokens(m.content || ''), 0) - outputTokens = newMessages - .filter(m => m.role === 'assistant' || m.role === 'tool') - .reduce((sum, m) => sum + countTokens(m.content || '') + countTokens(m.tool_calls + '' || ''), 0) - } else { - inputTokens = messages.filter(m => m.role === 'user').reduce((sum, m) => sum + countTokens(m.content || ''), 0) - outputTokens = messages - .filter(m => m.role === 'assistant' || m.role === 'tool') - .reduce((sum, m) => sum + countTokens(m.content || '') + countTokens(m.tool_calls + '' || ''), 0) - } - state = { - messages, - isWorking: false, - events: [], - inputTokens, - outputTokens, - } - this.sessionMap.set(sid, state) - logger.info('[chat-run-socket] loaded session %s from DB (%d messages)', sid, messages.length) - } catch (err) { - logger.warn(err, '[chat-run-socket] failed to load session %s from DB on resume', sid) - state = { messages: [], isWorking: false, events: [] } - this.sessionMap.set(sid, state) - } + state = await this.loadSessionStateFromDb(sid) + this.sessionMap.set(sid, state) } socket.emit('resumed', { session_id: sid, @@ -412,17 +422,58 @@ export class ChatRunSocket { events: state.isWorking ? state.events : [], inputTokens: state.inputTokens, outputTokens: state.outputTokens, + queueLength: state.queue?.length || 0, }) logger.info('[chat-run-socket] socket %s resumed session %s (working: %s, messages: %d)', socket.id, sid, state.isWorking, state.messages.length) } + + private async loadSessionStateFromDb(sid: string): Promise { + try { + const detail = useLocalSessionStore() + ? getSessionDetailPaginated(sid) + : await getSessionDetailFromDb(sid) + const messages = detail?.messages ? this.handleMessage(detail.messages, sid) : [] + + let inputTokens: number + let outputTokens: number + const snapshot = getCompressionSnapshot(sid) + if (snapshot) { + const newMessages = messages.slice(snapshot.lastMessageIndex + 1) + inputTokens = countTokens(SUMMARY_PREFIX + snapshot.summary) + + newMessages.filter(m => m.role === 'user').reduce((sum, m) => sum + countTokens(m.content || ''), 0) + outputTokens = newMessages + .filter(m => m.role === 'assistant' || m.role === 'tool') + .reduce((sum, m) => sum + countTokens(m.content || '') + countTokens(m.tool_calls + '' || ''), 0) + } else { + inputTokens = messages.filter(m => m.role === 'user').reduce((sum, m) => sum + countTokens(m.content || ''), 0) + outputTokens = messages + .filter(m => m.role === 'assistant' || m.role === 'tool') + .reduce((sum, m) => sum + countTokens(m.content || '') + countTokens(m.tool_calls + '' || ''), 0) + } + + logger.info('[chat-run-socket] loaded session %s from DB (%d messages)', sid, messages.length) + return { + messages, + isWorking: false, + events: [], + inputTokens, + outputTokens, + queue: [], + } + } catch (err) { + logger.warn(err, '[chat-run-socket] failed to load session %s from DB', sid) + return { messages: [], isWorking: false, events: [], queue: [] } + } + } // --- Run handler --- private async handleRun( socket: Socket, data: { input: string | ContentBlock[]; session_id?: string; model?: string; instructions?: string }, profile: string, + skipUserMessage = false, ) { const { input, session_id, model, instructions } = data const upstream = this.gatewayManager.getUpstream(profile).replace(/\/$/, '') @@ -436,36 +487,68 @@ export class ChatRunSocket { const now = Math.floor(Date.now() / 1000) // Mark working immediately on run start, and append user message if (session_id) { - const state = this.getOrCreateSession(session_id) + let state = this.sessionMap.get(session_id) + if (!state) { + state = getSession(session_id) + ? await this.loadSessionStateFromDb(session_id) + : { messages: [], isWorking: false, events: [], queue: [] } + this.sessionMap.set(session_id, state) + } this.hermesSessionIds.set(session_id, hermesSessionId) state.isWorking = true state.profile = profile - // Convert ContentBlock[] to string for storage - const inputStr = contentBlocksToString(input) - state.messages.push({ - id: state.messages.length + 1, - session_id, - role: 'user', - content: inputStr, - timestamp: now, - }) + if (!skipUserMessage) { + // Convert ContentBlock[] to string for storage + const inputStr = contentBlocksToString(input) + state.messages.push({ + id: state.messages.length + 1, + session_id, + hermesSessionId, + role: 'user', + content: inputStr, + timestamp: now, + }) - // Create session in local DB if it doesn't exist - if (!getSession(session_id)) { - const previewText = extractTextForPreview(input) - const preview = previewText.replace(/[\r\n]/g, ' ').substring(0, 100) - createSession({ id: session_id, profile, model, title: preview }) + // Create session in local DB if it doesn't exist + if (!getSession(session_id)) { + const previewText = extractTextForPreview(input) + const preview = previewText.replace(/[\r\n]/g, ' ').substring(0, 100) + createSession({ id: session_id, profile, model, title: preview }) + } + + // Write user message to local DB immediately + addMessage({ + session_id, + role: 'user', + content: inputStr, + timestamp: now, + }) + } else { + // Dequeued: write the user message into both memory and DB so the + // backend transcript keeps the same run boundary as the client. + const inputStr = contentBlocksToString(input) + state.messages.push({ + id: state.messages.length + 1, + session_id, + hermesSessionId, + role: 'user', + content: inputStr, + timestamp: now, + }) + if (!getSession(session_id)) { + const previewText = extractTextForPreview(input) + const preview = previewText.replace(/[\r\n]/g, ' ').substring(0, 100) + createSession({ id: session_id, profile, model, title: preview }) + } + addMessage({ + session_id, + role: 'user', + content: inputStr, + timestamp: now, + }) } - // Write user message to local DB immediately - addMessage({ - session_id, - role: 'user', - content: inputStr, - timestamp: now, - }) - socket.join(`session:${session_id}`) } @@ -817,16 +900,20 @@ export class ChatRunSocket { }) if (!res.ok) { const text = await res.text().catch(() => '') - emit('run.failed', { event: 'run.failed', error: `Upstream ${res.status}: ${text}` }) - if (session_id) this.markCompleted(socket, session_id, { event: 'run.failed' }) + const queueLen = session_id ? this.sessionMap.get(session_id)?.queue?.length ?? 0 : 0 + if (session_id) await this.markCompleted(socket, session_id, { event: 'run.failed' }) + emit('run.failed', { event: 'run.failed', error: `Upstream ${res.status}: ${text}`, queue_remaining: queueLen }) + if (session_id && queueLen > 0) this.dequeueNextQueuedRun(socket, session_id) return } const runData = await res.json() as any const runId = runData.run_id if (!runId) { - emit('run.failed', { event: 'run.failed', error: 'No run_id in upstream response' }) - if (session_id) this.markCompleted(socket, session_id, { event: 'run.failed' }) + const queueLen = session_id ? this.sessionMap.get(session_id)?.queue?.length ?? 0 : 0 + if (session_id) await this.markCompleted(socket, session_id, { event: 'run.failed' }) + emit('run.failed', { event: 'run.failed', error: 'No run_id in upstream response', queue_remaining: queueLen }) + if (session_id && queueLen > 0) this.dequeueNextQueuedRun(socket, session_id) return } @@ -842,7 +929,12 @@ export class ChatRunSocket { state.abortController = abortController } - emit('run.started', { event: 'run.started', run_id: runId, status: runData.status }) + emit('run.started', { + event: 'run.started', + run_id: runId, + status: runData.status, + queue_length: session_id ? this.sessionMap.get(session_id)?.queue.length || 0 : 0, + }) // Stream upstream events via EventSource — survives socket disconnect const eventsUrl = new URL(`${upstream}/v1/runs/${runId}/events`) @@ -865,7 +957,7 @@ export class ChatRunSocket { state.eventSource = source } - source.onmessage = (event: MessageEvent) => { + source.onmessage = async (event: MessageEvent) => { try { const parsed = JSON.parse(event.data as string) // Debug: log all events from upstream @@ -880,7 +972,7 @@ export class ChatRunSocket { const state = this.sessionMap.get(session_id) if (state) { const msgs = state.messages - const last = msgs[msgs.length - 1] + const last = [...msgs].reverse().find(m => m.hermesSessionId === hermesSessionId) switch (parsed.event) { case 'message.delta': { @@ -949,7 +1041,9 @@ export class ChatRunSocket { break } case 'tool.completed': { - const toolMsg = [...msgs].reverse().find(m => m.role === 'tool' && !m.content) + const toolMsg = [...msgs].reverse().find(m => + m.hermesSessionId === hermesSessionId && m.role === 'tool' && !m.content + ) if (toolMsg && parsed.output) { toolMsg.content = typeof parsed.output === 'string' ? parsed.output : JSON.stringify(parsed.output) } @@ -966,7 +1060,7 @@ export class ChatRunSocket { // Debug: log run.completed to check if reasoning is included logger.info('[chat-run-socket] run.completed keys: %s', Object.keys(parsed)) // Finalize assistant message — if no content was streamed, use output - if (parsed.output && !runProducedAssistantText(msgs)) { + if (parsed.output && !runProducedAssistantText(msgs, hermesSessionId)) { let outputContent = parsed.output // Parse output if it's a stringified array @@ -1020,7 +1114,8 @@ export class ChatRunSocket { // Only extract text content (tool_calls and reasoning are already in message fields) let parsedCount = 0 for (const msg of msgs) { - if (msg.role === 'assistant' && typeof msg.content === 'string' && + if (msg.hermesSessionId === hermesSessionId && + msg.role === 'assistant' && typeof msg.content === 'string' && msg.content.trim().startsWith('[') && msg.content.trim().endsWith(']')) { try { logger.info('[chat-run-socket] parsing array content for message %s, content preview: %s', @@ -1041,7 +1136,9 @@ export class ChatRunSocket { logger.info('[chat-run-socket] EXIT run.completed case, parsed %d messages', parsedCount) // Attach the last assistant message's parsed content to fix stringified array format - const lastAssistantMsg = msgs.filter((m: any) => m.role === 'assistant').pop() + const lastAssistantMsg = msgs.filter((m: any) => + m.hermesSessionId === hermesSessionId && m.role === 'assistant' + ).pop() if (lastAssistantMsg && parsedCount > 0) { parsed.parsed_content = lastAssistantMsg.content || '' parsed.parsed_tool_calls = lastAssistantMsg.tool_calls || null @@ -1065,7 +1162,15 @@ export class ChatRunSocket { }, '[chat-run-socket][abort] suppressing upstream terminal event during abort') return } - if (session_id) this.markCompleted(socket, session_id, { event: parsed.event, run_id: parsed.run_id }) + const queueLen = session_id ? this.sessionMap.get(session_id)?.queue?.length ?? 0 : 0 + if (session_id) await this.markCompleted(socket, session_id, { event: parsed.event, run_id: parsed.run_id }) + // Tag the event with queue_remaining so frontend knows more runs are pending + parsed.queue_remaining = queueLen + emit(parsed.event || 'message', parsed) + if (session_id && queueLen > 0) { + this.dequeueNextQueuedRun(socket, session_id) + } + return } // Usage will be calculated after syncFromHermes completes (in markCompleted) @@ -1080,12 +1185,26 @@ export class ChatRunSocket { logger.info({ sessionId: session_id }, '[chat-run-socket][abort] event source closed during abort') return } - emit('run.failed', { event: 'run.failed', error: 'EventSource connection lost' }) - if (session_id) this.markCompleted(socket, session_id, { event: 'run.failed' }) + const queueLen = session_id ? this.sessionMap.get(session_id)?.queue?.length ?? 0 : 0 + if (session_id) { + void this.markCompleted(socket, session_id, { event: 'run.failed' }).then(() => { + emit('run.failed', { event: 'run.failed', error: 'EventSource connection lost', queue_remaining: queueLen }) + if (queueLen > 0) this.dequeueNextQueuedRun(socket, session_id) + }) + } else { + emit('run.failed', { event: 'run.failed', error: 'EventSource connection lost' }) + } } } catch (err: any) { - emit('run.failed', { event: 'run.failed', error: err.message }) - if (session_id) this.markCompleted(socket, session_id, { event: 'run.failed' }) + const queueLen = session_id ? this.sessionMap.get(session_id)?.queue?.length ?? 0 : 0 + if (session_id) { + void this.markCompleted(socket, session_id, { event: 'run.failed' }).then(() => { + emit('run.failed', { event: 'run.failed', error: err.message, queue_remaining: queueLen }) + if (queueLen > 0) this.dequeueNextQueuedRun(socket, session_id) + }) + } else { + emit('run.failed', { event: 'run.failed', error: err.message }) + } } } @@ -1095,6 +1214,19 @@ export class ChatRunSocket { const state = this.sessionMap.get(sessionId) if (!state?.isWorking || !state.runId) { logger.info({ sessionId }, '[chat-run-socket][abort] ignored: no active run') + if (state) { + state.isWorking = false + state.isAborting = false + state.abortController = undefined + state.eventSource = undefined + state.runId = undefined + state.events = [] + } + this.emitToSession(socket, sessionId, 'abort.completed', { + event: 'abort.completed', + synced: false, + ignored: true, + }) return } @@ -1125,6 +1257,7 @@ export class ChatRunSocket { await fetch(`${upstream}/v1/runs/${runId}/stop`, { method: 'POST', headers, + signal: AbortSignal.timeout(10_000), }) logger.info('[chat-run-socket] called upstream stop for run %s (session: %s)', runId, sessionId) logger.info({ sessionId, runId, graceMs: 5000 }, '[chat-run-socket][abort] upstream stop accepted, waiting for graceful exit') @@ -1150,7 +1283,7 @@ export class ChatRunSocket { } /** Mark a session run as completed/failed so reconnecting clients get notified */ - private markCompleted(socket: Socket, sessionId: string, _info: { event: string; run_id?: string }) { + private async markCompleted(socket: Socket, sessionId: string, _info: { event: string; run_id?: string }) { const state = this.sessionMap.get(sessionId) if (state) { if (state.isAborting) { @@ -1171,11 +1304,32 @@ export class ChatRunSocket { const prof = state.profile this.hermesSessionIds.delete(sessionId) state.profile = undefined - void this.syncFromHermes(socket, sessionId, hermesId, prof) + await this.syncFromHermes(socket, sessionId, hermesId, prof) } + } } + private dequeueNextQueuedRun(socket: Socket, sessionId: string, fallbackProfile = 'default') { + const state = this.sessionMap.get(sessionId) + if (!state?.queue.length) return false + + const next = state.queue.shift()! + logger.info('[chat-run-socket] dequeuing queued run for session %s (remaining: %d)', sessionId, state.queue.length) + this.nsp.to(`session:${sessionId}`).emit('run.queued', { + event: 'run.queued', + session_id: sessionId, + queue_length: state.queue.length, + }) + void this.handleRun(socket, { + input: next.input, + session_id: sessionId, + model: next.model, + instructions: next.instructions, + }, next.profile || fallbackProfile, true) + return true + } + private async markAbortCompleted(socket: Socket, sessionId: string, runId: string) { const state = this.sessionMap.get(sessionId) if (!state) return @@ -1198,6 +1352,38 @@ export class ChatRunSocket { state.abortController = undefined state.eventSource = undefined state.runId = undefined + + // Process queued messages after abort completes + if (state.queue.length > 0) { + const next = state.queue.shift()! + logger.info('[chat-run-socket][abort] dequeuing queued run for session %s (remaining: %d)', sessionId, state.queue.length) + this.replaceState(sessionId, 'abort.completed', { + event: 'abort.completed', + run_id: runId, + synced, + queue_length: state.queue.length + 1, + }) + this.emitToSession(socket, sessionId, 'abort.completed', { + event: 'abort.completed', + run_id: runId, + synced, + queue_length: state.queue.length + 1, + }) + this.emitToSession(socket, sessionId, 'run.queued', { + event: 'run.queued', + queue_length: state.queue.length, + }) + state.events = [] + void this.handleRun(socket, { + input: next.input, + session_id: sessionId, + model: next.model, + instructions: next.instructions, + }, next.profile || profile || 'default', true) + return + } + + state.events = [] this.replaceState(sessionId, 'abort.completed', { event: 'abort.completed', run_id: runId, @@ -1208,7 +1394,6 @@ export class ChatRunSocket { run_id: runId, synced, }) - state.events = [] logger.info({ sessionId, runId, synced }, '[chat-run-socket][abort] completed') } @@ -1289,8 +1474,11 @@ export class ChatRunSocket { } if (!detail) return false - // Skip user messages — already written to local DB in handleRun + // Skip user messages for DB insert; they are already written in handleRun. + // Keep them in memory replacement so replacing an ephemeral run does not + // delete the queued user message from state.messages. const toInsert = detail.messages.filter(m => m.role !== 'user') + const toReplaceInMemory = detail.messages // Build tool_call_id → function.name lookup from assistant messages // (Hermes stores tool_name as NULL, name lives inside tool_calls JSON) @@ -1384,7 +1572,7 @@ export class ChatRunSocket { // Use inputTokens already set by compression path if available const state = this.sessionMap.get(localSessionId) if (state) { - const messages = this.handleMessage(toInsert, localSessionId) + const messages = this.handleMessage(toReplaceInMemory, localSessionId) if (messages.length > 0) { this.replaceByHermesSessionId(localSessionId, hermesSessionId, messages) } @@ -1425,6 +1613,10 @@ export class ChatRunSocket { // 没找到 if (start === -1) return + if (!newItems.some(item => item.role === 'user')) { + const existingUsers = msg.slice(start, end + 1).filter(item => item.role === 'user') + newItems = [...existingUsers, ...newItems] + } // 替换 msg.splice(start, end - start + 1, ...newItems) } @@ -1448,7 +1640,7 @@ export class ChatRunSocket { private getOrCreateSession(sessionId: string): SessionState { let state = this.sessionMap.get(sessionId) if (!state) { - state = { messages: [], isWorking: false, events: [] } + state = { messages: [], isWorking: false, events: [], queue: [] } this.sessionMap.set(sessionId, state) } return state @@ -1498,7 +1690,11 @@ export class ChatRunSocket { } } -/** Check if any assistant message in the list has non-empty content */ -function runProducedAssistantText(messages: SessionMessage[]): boolean { - return messages.some(m => m.role === 'assistant' && m.content?.trim()) +/** Check if the current ephemeral run has already produced assistant text. */ +function runProducedAssistantText(messages: SessionMessage[], hermesSessionId?: string): boolean { + return messages.some(m => + m.hermesSessionId === hermesSessionId && + m.role === 'assistant' && + !!m.content?.trim() + ) } diff --git a/packages/server/src/services/hermes/gateway-manager.ts b/packages/server/src/services/hermes/gateway-manager.ts index 803efb3..20245ee 100644 --- a/packages/server/src/services/hermes/gateway-manager.ts +++ b/packages/server/src/services/hermes/gateway-manager.ts @@ -74,9 +74,15 @@ function detectInitSystem(): string { // Linux 才检查 /proc if (platform === 'linux') { try { + if (existsSync('/.dockerenv') || existsSync('/run/.containerenv')) { + return 'container' + } + const comm = readFileSync('/proc/1/comm', 'utf-8').trim() - if (comm === 'systemd') return 'systemd' + if (comm === 'systemd') { + return existsSync('/run/systemd/system') ? 'systemd' : 'other' + } if (comm === 'init') return 'sysvinit' return 'other' @@ -223,13 +229,17 @@ export class GatewayManager { } /** 从 base 端口开始递增查找空闲端口(上限 65535) */ - private findFreePort(base: number, host = '127.0.0.1'): Promise { + private findFreePort(base: number, host = '127.0.0.1', reservedPorts = new Set()): Promise { return new Promise((resolve, reject) => { const tryPort = (port: number) => { if (port > 65535) { reject(new Error(`No free port found in range ${base}-65535`)) return } + if (reservedPorts.has(port)) { + tryPort(port + 1) + return + } const server = createServer() server.once('error', () => { server.close() @@ -318,7 +328,7 @@ export class GatewayManager { if (usedPorts.has(port)) { // 已管理端口冲突 → 找空闲端口 - const newPort = await this.findFreePort(port, host) + const newPort = await this.findFreePort(port, host, usedPorts) logger.info('Port %d is in use for profile "%s", reassigning to %d', port, name, newPort) this.writeProfilePort(name, newPort, host) port = newPort @@ -326,7 +336,7 @@ export class GatewayManager { // 检查系统级端口占用(外部进程) const available = await this.checkPortAvailable(port, host) if (!available) { - const newPort = await this.findFreePort(port, host) + const newPort = await this.findFreePort(port, host, usedPorts) logger.info('Port %d is occupied by another process for profile "%s", reassigning to %d', port, name, newPort) this.writeProfilePort(name, newPort, host) port = newPort