Restore synced queued message UI
This commit is contained in:
@@ -51,6 +51,16 @@ export interface RunEvent {
|
|||||||
session_id?: string
|
session_id?: string
|
||||||
/** Queue length from run.queued event */
|
/** Queue length from run.queued event */
|
||||||
queue_length?: number
|
queue_length?: number
|
||||||
|
/** Queue item that was just removed because it is starting now. */
|
||||||
|
dequeued_queue_id?: string
|
||||||
|
/** Queued user messages from run.queued/resume payloads. */
|
||||||
|
queued_messages?: Array<{
|
||||||
|
id?: string | number
|
||||||
|
role?: string
|
||||||
|
content?: string
|
||||||
|
timestamp?: number
|
||||||
|
queued?: boolean
|
||||||
|
}>
|
||||||
/** User message broadcast to other windows already watching the same session. */
|
/** User message broadcast to other windows already watching the same session. */
|
||||||
message?: {
|
message?: {
|
||||||
id?: string | number
|
id?: string | number
|
||||||
@@ -522,7 +532,7 @@ function removeSocketListener(socket: Socket, event: string, handler: (...args:
|
|||||||
*/
|
*/
|
||||||
export function resumeSession(
|
export function resumeSession(
|
||||||
sessionId: string,
|
sessionId: string,
|
||||||
onResumed: (data: { session_id: string; messages: any[]; isWorking: boolean; isAborting?: boolean; events: any[]; inputTokens?: number; outputTokens?: number; contextTokens?: number; queueLength?: number }) => void,
|
onResumed: (data: { session_id: string; messages: any[]; isWorking: boolean; isAborting?: boolean; events: any[]; inputTokens?: number; outputTokens?: number; contextTokens?: number; queueLength?: number; queueMessages?: RunEvent['queued_messages'] }) => void,
|
||||||
): Socket {
|
): Socket {
|
||||||
const socket = connectChatRun()
|
const socket = connectChatRun()
|
||||||
|
|
||||||
|
|||||||
@@ -538,6 +538,11 @@ export const useChatStore = defineStore('chat', () => {
|
|||||||
} else {
|
} else {
|
||||||
queueLengths.value.delete(sessionId)
|
queueLengths.value.delete(sessionId)
|
||||||
}
|
}
|
||||||
|
if (Array.isArray((data as any).queueMessages)) {
|
||||||
|
replaceQueuedUserMessages(sessionId, normalizeQueuedUserMessages((data as any).queueMessages))
|
||||||
|
} else if (!data.queueLength) {
|
||||||
|
replaceQueuedUserMessages(sessionId, [])
|
||||||
|
}
|
||||||
if ((data as any).isAborting) {
|
if ((data as any).isAborting) {
|
||||||
setAbortState({ aborting: true, synced: null })
|
setAbortState({ aborting: true, synced: null })
|
||||||
} else if (!data.isWorking) {
|
} else if (!data.isWorking) {
|
||||||
@@ -807,19 +812,23 @@ export const useChatStore = defineStore('chat', () => {
|
|||||||
|
|
||||||
function enqueueUserMessage(sessionId: string, message: Message) {
|
function enqueueUserMessage(sessionId: string, message: Message) {
|
||||||
const queue = queuedUserMessages.value.get(sessionId) || []
|
const queue = queuedUserMessages.value.get(sessionId) || []
|
||||||
queue.push({ ...message, queued: true })
|
if (queue.some(item => item.id === message.id)) return
|
||||||
queuedUserMessages.value.set(sessionId, queue)
|
const nextMap = new Map(queuedUserMessages.value)
|
||||||
|
nextMap.set(sessionId, [...queue, { ...message, queued: true }])
|
||||||
|
queuedUserMessages.value = nextMap
|
||||||
}
|
}
|
||||||
|
|
||||||
function removeQueuedMessage(sessionId: string, messageId: string) {
|
function removeQueuedMessage(sessionId: string, messageId: string) {
|
||||||
const queue = queuedUserMessages.value.get(sessionId)
|
const queue = queuedUserMessages.value.get(sessionId)
|
||||||
if (!queue?.length) return
|
if (!queue?.length) return
|
||||||
const next = queue.filter(message => message.id !== messageId)
|
const next = queue.filter(message => message.id !== messageId)
|
||||||
|
const nextMap = new Map(queuedUserMessages.value)
|
||||||
if (next.length > 0) {
|
if (next.length > 0) {
|
||||||
queuedUserMessages.value.set(sessionId, next)
|
nextMap.set(sessionId, next)
|
||||||
} else {
|
} else {
|
||||||
queuedUserMessages.value.delete(sessionId)
|
nextMap.delete(sessionId)
|
||||||
}
|
}
|
||||||
|
queuedUserMessages.value = nextMap
|
||||||
queueLengths.value.set(sessionId, next.length)
|
queueLengths.value.set(sessionId, next.length)
|
||||||
getChatRunSocket()?.emit('cancel_queued_run', {
|
getChatRunSocket()?.emit('cancel_queued_run', {
|
||||||
session_id: sessionId,
|
session_id: sessionId,
|
||||||
@@ -827,6 +836,85 @@ export const useChatStore = defineStore('chat', () => {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function normalizeQueuedUserMessages(rawMessages: unknown): Message[] {
|
||||||
|
if (!Array.isArray(rawMessages)) return []
|
||||||
|
return rawMessages.flatMap((raw) => {
|
||||||
|
const peer = raw as NonNullable<RunEvent['queued_messages']>[number]
|
||||||
|
const content = typeof peer?.content === 'string' ? peer.content : ''
|
||||||
|
const messageId = peer?.id != null ? String(peer.id) : ''
|
||||||
|
if (!messageId || !content.trim()) return []
|
||||||
|
const timestamp = typeof peer?.timestamp === 'number' && Number.isFinite(peer.timestamp)
|
||||||
|
? Math.round(peer.timestamp * 1000)
|
||||||
|
: Date.now()
|
||||||
|
return [{
|
||||||
|
id: messageId,
|
||||||
|
role: 'user' as const,
|
||||||
|
content,
|
||||||
|
timestamp,
|
||||||
|
queued: true,
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
function replaceQueuedUserMessages(sessionId: string, messages: Message[]) {
|
||||||
|
const existingById = new Map((queuedUserMessages.value.get(sessionId) || []).map(message => [message.id, message]))
|
||||||
|
const merged = messages.map(message => ({
|
||||||
|
...(existingById.get(message.id) || {}),
|
||||||
|
...message,
|
||||||
|
attachments: existingById.get(message.id)?.attachments || message.attachments,
|
||||||
|
queued: true,
|
||||||
|
}))
|
||||||
|
const nextMap = new Map(queuedUserMessages.value)
|
||||||
|
if (merged.length > 0) {
|
||||||
|
nextMap.set(sessionId, merged)
|
||||||
|
} else {
|
||||||
|
nextMap.delete(sessionId)
|
||||||
|
}
|
||||||
|
queuedUserMessages.value = nextMap
|
||||||
|
}
|
||||||
|
|
||||||
|
function handleRunQueuedEvent(sessionId: string, evt: RunEvent) {
|
||||||
|
const queueLength = Number((evt as any).queue_length || 0)
|
||||||
|
if (queueLength > 0) {
|
||||||
|
queueLengths.value.set(sessionId, queueLength)
|
||||||
|
} else {
|
||||||
|
queueLengths.value.delete(sessionId)
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Array.isArray((evt as any).queued_messages) && !(evt as any).dequeued_queue_id) {
|
||||||
|
const queued = normalizeQueuedUserMessages((evt as any).queued_messages)
|
||||||
|
replaceQueuedUserMessages(sessionId, queued)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
const peer = evt.message
|
||||||
|
const content = typeof peer?.content === 'string' ? peer.content : ''
|
||||||
|
const messageId = peer?.id != null ? String(peer.id) : ''
|
||||||
|
if (!messageId || !content.trim()) return
|
||||||
|
|
||||||
|
if ((queuedUserMessages.value.get(sessionId) || []).some(msg => msg.id === messageId)) return
|
||||||
|
|
||||||
|
const timestamp = typeof peer?.timestamp === 'number' && Number.isFinite(peer.timestamp)
|
||||||
|
? Math.round(peer.timestamp * 1000)
|
||||||
|
: Date.now()
|
||||||
|
const msgs = getSessionMsgs(sessionId)
|
||||||
|
const existingIndex = msgs.findIndex(msg => msg.id === messageId && msg.role === 'user')
|
||||||
|
const existing = existingIndex >= 0 ? msgs[existingIndex] : null
|
||||||
|
if (existingIndex >= 0) {
|
||||||
|
msgs.splice(existingIndex, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
enqueueUserMessage(sessionId, {
|
||||||
|
...(existing || {}),
|
||||||
|
id: messageId,
|
||||||
|
role: 'user',
|
||||||
|
content,
|
||||||
|
timestamp: existing?.timestamp || timestamp,
|
||||||
|
attachments: existing?.attachments,
|
||||||
|
queued: true,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
function setPendingApproval(evt: RunEvent) {
|
function setPendingApproval(evt: RunEvent) {
|
||||||
const sid = evt.session_id
|
const sid = evt.session_id
|
||||||
const approvalId = (evt as any).approval_id as string | undefined
|
const approvalId = (evt as any).approval_id as string | undefined
|
||||||
@@ -869,12 +957,14 @@ export const useChatStore = defineStore('chat', () => {
|
|||||||
function showNextQueuedUserMessage(sessionId: string) {
|
function showNextQueuedUserMessage(sessionId: string) {
|
||||||
const queue = queuedUserMessages.value.get(sessionId)
|
const queue = queuedUserMessages.value.get(sessionId)
|
||||||
if (!queue?.length) return
|
if (!queue?.length) return
|
||||||
const next = queue.shift()!
|
const [next, ...rest] = queue
|
||||||
if (queue.length > 0) {
|
const nextMap = new Map(queuedUserMessages.value)
|
||||||
queuedUserMessages.value.set(sessionId, queue)
|
if (rest.length > 0) {
|
||||||
|
nextMap.set(sessionId, rest)
|
||||||
} else {
|
} else {
|
||||||
queuedUserMessages.value.delete(sessionId)
|
nextMap.delete(sessionId)
|
||||||
}
|
}
|
||||||
|
queuedUserMessages.value = nextMap
|
||||||
addMessage(sessionId, { ...next, queued: false })
|
addMessage(sessionId, { ...next, queued: false })
|
||||||
updateSessionTitle(sessionId)
|
updateSessionTitle(sessionId)
|
||||||
}
|
}
|
||||||
@@ -1054,7 +1144,7 @@ export const useChatStore = defineStore('chat', () => {
|
|||||||
break
|
break
|
||||||
|
|
||||||
case 'run.queued': {
|
case 'run.queued': {
|
||||||
queueLengths.value.set(sid, (evt as any).queue_length || 0)
|
handleRunQueuedEvent(sid, evt)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1492,7 +1582,7 @@ export const useChatStore = defineStore('chat', () => {
|
|||||||
if (evt.session_id && evt.session_id !== sid) return
|
if (evt.session_id && evt.session_id !== sid) return
|
||||||
switch (evt.event) {
|
switch (evt.event) {
|
||||||
case 'run.queued': {
|
case 'run.queued': {
|
||||||
queueLengths.value.set(sid, (evt as any).queue_length || 0)
|
handleRunQueuedEvent(sid, evt)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ import { handleBridgeRun } from './handle-bridge-run'
|
|||||||
import { handleAbort } from './abort'
|
import { handleAbort } from './abort'
|
||||||
import { getOrCreateSession } from './compression'
|
import { getOrCreateSession } from './compression'
|
||||||
import { handleSessionCommand, isSessionCommand, parseSessionCommand } from './session-command'
|
import { handleSessionCommand, isSessionCommand, parseSessionCommand } from './session-command'
|
||||||
|
import { contentBlocksToString } from './content-blocks'
|
||||||
import type { ContentBlock, QueuedRun, SessionState } from './types'
|
import type { ContentBlock, QueuedRun, SessionState } from './types'
|
||||||
import { authenticateUserToken, isAuthEnabled, type AuthenticatedUser } from '../../../middleware/user-auth'
|
import { authenticateUserToken, isAuthEnabled, type AuthenticatedUser } from '../../../middleware/user-auth'
|
||||||
import { userCanAccessProfile } from '../../../db/hermes/users-store'
|
import { userCanAccessProfile } from '../../../db/hermes/users-store'
|
||||||
@@ -162,6 +163,7 @@ export class ChatRunSocket {
|
|||||||
event: 'run.queued',
|
event: 'run.queued',
|
||||||
session_id: data.session_id,
|
session_id: data.session_id,
|
||||||
queue_length: state.queue.length,
|
queue_length: state.queue.length,
|
||||||
|
queued_messages: this.serializeQueuedMessages(state.queue),
|
||||||
})
|
})
|
||||||
logger.info('[chat-run-socket] queued run for session %s (queue: %d)', data.session_id, state.queue.length)
|
logger.info('[chat-run-socket] queued run for session %s (queue: %d)', data.session_id, state.queue.length)
|
||||||
return
|
return
|
||||||
@@ -199,6 +201,7 @@ export class ChatRunSocket {
|
|||||||
event: 'run.queued',
|
event: 'run.queued',
|
||||||
session_id: data.session_id,
|
session_id: data.session_id,
|
||||||
queue_length: state.queue.length,
|
queue_length: state.queue.length,
|
||||||
|
queued_messages: this.serializeQueuedMessages(state.queue),
|
||||||
})
|
})
|
||||||
logger.info('[chat-run-socket] cancelled queued run %s for session %s (queue: %d)',
|
logger.info('[chat-run-socket] cancelled queued run %s for session %s (queue: %d)',
|
||||||
data.queue_id, data.session_id, state.queue.length)
|
data.queue_id, data.session_id, state.queue.length)
|
||||||
@@ -308,6 +311,7 @@ export class ChatRunSocket {
|
|||||||
outputTokens: state.outputTokens,
|
outputTokens: state.outputTokens,
|
||||||
contextTokens: state.contextTokens,
|
contextTokens: state.contextTokens,
|
||||||
queueLength: state.queue?.length || 0,
|
queueLength: state.queue?.length || 0,
|
||||||
|
queueMessages: this.serializeQueuedMessages(state.queue || []),
|
||||||
})
|
})
|
||||||
|
|
||||||
logger.info('[chat-run-socket] socket %s resumed session %s (working: %s, messages: %d)',
|
logger.info('[chat-run-socket] socket %s resumed session %s (working: %s, messages: %d)',
|
||||||
@@ -326,6 +330,8 @@ export class ChatRunSocket {
|
|||||||
event: 'run.queued',
|
event: 'run.queued',
|
||||||
session_id: sessionId,
|
session_id: sessionId,
|
||||||
queue_length: state.queue.length,
|
queue_length: state.queue.length,
|
||||||
|
dequeued_queue_id: next.queue_id,
|
||||||
|
queued_messages: this.serializeQueuedMessages(state.queue),
|
||||||
})
|
})
|
||||||
this.runQueuedItem(socket, sessionId, next, fallbackProfile)
|
this.runQueuedItem(socket, sessionId, next, fallbackProfile)
|
||||||
return true
|
return true
|
||||||
@@ -355,6 +361,16 @@ export class ChatRunSocket {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private serializeQueuedMessages(queue: QueuedRun[]) {
|
||||||
|
return queue.map(item => ({
|
||||||
|
id: item.queue_id,
|
||||||
|
role: 'user',
|
||||||
|
content: contentBlocksToString(item.input),
|
||||||
|
timestamp: Math.floor(Date.now() / 1000),
|
||||||
|
queued: true,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
private canAccessProfile(user: AuthenticatedUser, profile: string): boolean {
|
private canAccessProfile(user: AuthenticatedUser, profile: string): boolean {
|
||||||
return user.role === 'super_admin' || userCanAccessProfile(user.id, profile)
|
return user.role === 'super_admin' || userCanAccessProfile(user.id, profile)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import { flushBridgePendingToDb } from './bridge-message'
|
|||||||
import { buildDbHistory, estimateSnapshotAwareHistoryUsage, forceCompressBridgeHistory, getOrCreateSession, replaceState } from './compression'
|
import { buildDbHistory, estimateSnapshotAwareHistoryUsage, forceCompressBridgeHistory, getOrCreateSession, replaceState } from './compression'
|
||||||
import { handleAbort } from './abort'
|
import { handleAbort } from './abort'
|
||||||
import { calcAndUpdateUsage, contextTokensWithCachedOverhead, updateMessageContextTokenUsage } from './usage'
|
import { calcAndUpdateUsage, contextTokensWithCachedOverhead, updateMessageContextTokenUsage } from './usage'
|
||||||
|
import { contentBlocksToString } from './content-blocks'
|
||||||
import type { ContentBlock, QueuedRun, SessionState } from './types'
|
import type { ContentBlock, QueuedRun, SessionState } from './types'
|
||||||
|
|
||||||
type CommandName =
|
type CommandName =
|
||||||
@@ -150,8 +151,9 @@ export async function handleSessionCommand(
|
|||||||
emitCommand({ ok: false, action: 'queue', message: 'Session is idle. Send the message normally instead.' })
|
emitCommand({ ok: false, action: 'queue', message: 'Session is idle. Send the message normally instead.' })
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
const queueId = `queue_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`
|
||||||
state.queue.push({
|
state.queue.push({
|
||||||
queue_id: `queue_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`,
|
queue_id: queueId,
|
||||||
input: command.args,
|
input: command.args,
|
||||||
model: ctx.model,
|
model: ctx.model,
|
||||||
instructions: ctx.instructions,
|
instructions: ctx.instructions,
|
||||||
@@ -163,6 +165,13 @@ export async function handleSessionCommand(
|
|||||||
event: 'run.queued',
|
event: 'run.queued',
|
||||||
session_id: sessionId,
|
session_id: sessionId,
|
||||||
queue_length: state.queue.length,
|
queue_length: state.queue.length,
|
||||||
|
queued_messages: state.queue.map(item => ({
|
||||||
|
id: item.queue_id,
|
||||||
|
role: 'user',
|
||||||
|
content: contentBlocksToString(item.input),
|
||||||
|
timestamp: Math.floor(Date.now() / 1000),
|
||||||
|
queued: true,
|
||||||
|
})),
|
||||||
})
|
})
|
||||||
emitCommand({
|
emitCommand({
|
||||||
action: 'queue',
|
action: 'queue',
|
||||||
|
|||||||
Reference in New Issue
Block a user