feat: add message queue for sequential run processing (#501)

Allow sending multiple messages while a run is active. Messages are
queued on the server and processed sequentially after each run
completes. Each completed assistant message triggers speech playback
independently, and the UI shows queue status with a badge indicator.

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
ekko
2026-05-07 10:34:58 +08:00
committed by GitHub
parent 5df8734495
commit 424125843f
17 changed files with 964 additions and 181 deletions
@@ -149,6 +149,14 @@ interface SessionMessage {
codex_reasoning_items?: string | null
}
interface QueuedRun {
queue_id: string
input: string | ContentBlock[]
model?: string
instructions?: string
profile: string
}
interface SessionState {
messages: SessionMessage[]
isWorking: boolean
@@ -160,6 +168,7 @@ interface SessionState {
inputTokens?: number
outputTokens?: number
isAborting?: boolean
queue: QueuedRun[]
}
// --- ChatRunSocket ---
@@ -202,14 +211,50 @@ export class ChatRunSocket {
const profile = (socket.handshake.query?.profile as string) || 'default'
socket.on('run', async (data: {
input: string
input: string | ContentBlock[]
session_id?: string
model?: string
instructions?: string
queue_id?: string
}) => {
if (data.session_id) {
const state = this.getOrCreateSession(data.session_id)
if (state.isWorking) {
state.queue.push({
queue_id: data.queue_id || `queue_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`,
input: data.input,
model: data.model,
instructions: data.instructions,
profile,
})
this.nsp.to(`session:${data.session_id}`).emit('run.queued', {
event: 'run.queued',
session_id: data.session_id,
queue_length: state.queue.length,
})
logger.info('[chat-run-socket] queued run for session %s (queue: %d)', data.session_id, state.queue.length)
return
}
}
await this.handleRun(socket, data, profile)
})
socket.on('cancel_queued_run', (data: { session_id?: string; queue_id?: string }) => {
if (!data.session_id || !data.queue_id) return
const state = this.sessionMap.get(data.session_id)
if (!state?.queue.length) return
const before = state.queue.length
state.queue = state.queue.filter(item => item.queue_id !== data.queue_id)
if (state.queue.length === before) return
this.nsp.to(`session:${data.session_id}`).emit('run.queued', {
event: 'run.queued',
session_id: data.session_id,
queue_length: state.queue.length,
})
logger.info('[chat-run-socket] cancelled queued run %s for session %s (queue: %d)',
data.queue_id, data.session_id, state.queue.length)
})
socket.on('resume', async (data: { session_id?: string }) => {
if (!data.session_id) return
const sid = data.session_id
@@ -366,43 +411,8 @@ export class ChatRunSocket {
private async resumeSession(socket: Socket, sid: string) {
let state = this.sessionMap.get(sid)
if (!state) {
try {
const detail = useLocalSessionStore()
? getSessionDetailPaginated(sid)
: await getSessionDetailFromDb(sid)
const messages = detail?.messages ? this.handleMessage(detail.messages, sid) : []
// Calculate context tokens — aware of compression snapshot
let inputTokens: number
let outputTokens: number
const snapshot = getCompressionSnapshot(sid)
if (snapshot) {
const newMessages = messages.slice(snapshot.lastMessageIndex + 1)
inputTokens = countTokens(SUMMARY_PREFIX + snapshot.summary) +
newMessages.filter(m => m.role === 'user').reduce((sum, m) => sum + countTokens(m.content || ''), 0)
outputTokens = newMessages
.filter(m => m.role === 'assistant' || m.role === 'tool')
.reduce((sum, m) => sum + countTokens(m.content || '') + countTokens(m.tool_calls + '' || ''), 0)
} else {
inputTokens = messages.filter(m => m.role === 'user').reduce((sum, m) => sum + countTokens(m.content || ''), 0)
outputTokens = messages
.filter(m => m.role === 'assistant' || m.role === 'tool')
.reduce((sum, m) => sum + countTokens(m.content || '') + countTokens(m.tool_calls + '' || ''), 0)
}
state = {
messages,
isWorking: false,
events: [],
inputTokens,
outputTokens,
}
this.sessionMap.set(sid, state)
logger.info('[chat-run-socket] loaded session %s from DB (%d messages)', sid, messages.length)
} catch (err) {
logger.warn(err, '[chat-run-socket] failed to load session %s from DB on resume', sid)
state = { messages: [], isWorking: false, events: [] }
this.sessionMap.set(sid, state)
}
state = await this.loadSessionStateFromDb(sid)
this.sessionMap.set(sid, state)
}
socket.emit('resumed', {
session_id: sid,
@@ -412,17 +422,58 @@ export class ChatRunSocket {
events: state.isWorking ? state.events : [],
inputTokens: state.inputTokens,
outputTokens: state.outputTokens,
queueLength: state.queue?.length || 0,
})
logger.info('[chat-run-socket] socket %s resumed session %s (working: %s, messages: %d)',
socket.id, sid, state.isWorking, state.messages.length)
}
private async loadSessionStateFromDb(sid: string): Promise<SessionState> {
try {
const detail = useLocalSessionStore()
? getSessionDetailPaginated(sid)
: await getSessionDetailFromDb(sid)
const messages = detail?.messages ? this.handleMessage(detail.messages, sid) : []
let inputTokens: number
let outputTokens: number
const snapshot = getCompressionSnapshot(sid)
if (snapshot) {
const newMessages = messages.slice(snapshot.lastMessageIndex + 1)
inputTokens = countTokens(SUMMARY_PREFIX + snapshot.summary) +
newMessages.filter(m => m.role === 'user').reduce((sum, m) => sum + countTokens(m.content || ''), 0)
outputTokens = newMessages
.filter(m => m.role === 'assistant' || m.role === 'tool')
.reduce((sum, m) => sum + countTokens(m.content || '') + countTokens(m.tool_calls + '' || ''), 0)
} else {
inputTokens = messages.filter(m => m.role === 'user').reduce((sum, m) => sum + countTokens(m.content || ''), 0)
outputTokens = messages
.filter(m => m.role === 'assistant' || m.role === 'tool')
.reduce((sum, m) => sum + countTokens(m.content || '') + countTokens(m.tool_calls + '' || ''), 0)
}
logger.info('[chat-run-socket] loaded session %s from DB (%d messages)', sid, messages.length)
return {
messages,
isWorking: false,
events: [],
inputTokens,
outputTokens,
queue: [],
}
} catch (err) {
logger.warn(err, '[chat-run-socket] failed to load session %s from DB', sid)
return { messages: [], isWorking: false, events: [], queue: [] }
}
}
// --- Run handler ---
private async handleRun(
socket: Socket,
data: { input: string | ContentBlock[]; session_id?: string; model?: string; instructions?: string },
profile: string,
skipUserMessage = false,
) {
const { input, session_id, model, instructions } = data
const upstream = this.gatewayManager.getUpstream(profile).replace(/\/$/, '')
@@ -436,36 +487,68 @@ export class ChatRunSocket {
const now = Math.floor(Date.now() / 1000)
// Mark working immediately on run start, and append user message
if (session_id) {
const state = this.getOrCreateSession(session_id)
let state = this.sessionMap.get(session_id)
if (!state) {
state = getSession(session_id)
? await this.loadSessionStateFromDb(session_id)
: { messages: [], isWorking: false, events: [], queue: [] }
this.sessionMap.set(session_id, state)
}
this.hermesSessionIds.set(session_id, hermesSessionId)
state.isWorking = true
state.profile = profile
// Convert ContentBlock[] to string for storage
const inputStr = contentBlocksToString(input)
state.messages.push({
id: state.messages.length + 1,
session_id,
role: 'user',
content: inputStr,
timestamp: now,
})
if (!skipUserMessage) {
// Convert ContentBlock[] to string for storage
const inputStr = contentBlocksToString(input)
state.messages.push({
id: state.messages.length + 1,
session_id,
hermesSessionId,
role: 'user',
content: inputStr,
timestamp: now,
})
// Create session in local DB if it doesn't exist
if (!getSession(session_id)) {
const previewText = extractTextForPreview(input)
const preview = previewText.replace(/[\r\n]/g, ' ').substring(0, 100)
createSession({ id: session_id, profile, model, title: preview })
// Create session in local DB if it doesn't exist
if (!getSession(session_id)) {
const previewText = extractTextForPreview(input)
const preview = previewText.replace(/[\r\n]/g, ' ').substring(0, 100)
createSession({ id: session_id, profile, model, title: preview })
}
// Write user message to local DB immediately
addMessage({
session_id,
role: 'user',
content: inputStr,
timestamp: now,
})
} else {
// Dequeued: write the user message into both memory and DB so the
// backend transcript keeps the same run boundary as the client.
const inputStr = contentBlocksToString(input)
state.messages.push({
id: state.messages.length + 1,
session_id,
hermesSessionId,
role: 'user',
content: inputStr,
timestamp: now,
})
if (!getSession(session_id)) {
const previewText = extractTextForPreview(input)
const preview = previewText.replace(/[\r\n]/g, ' ').substring(0, 100)
createSession({ id: session_id, profile, model, title: preview })
}
addMessage({
session_id,
role: 'user',
content: inputStr,
timestamp: now,
})
}
// Write user message to local DB immediately
addMessage({
session_id,
role: 'user',
content: inputStr,
timestamp: now,
})
socket.join(`session:${session_id}`)
}
@@ -817,16 +900,20 @@ export class ChatRunSocket {
})
if (!res.ok) {
const text = await res.text().catch(() => '')
emit('run.failed', { event: 'run.failed', error: `Upstream ${res.status}: ${text}` })
if (session_id) this.markCompleted(socket, session_id, { event: 'run.failed' })
const queueLen = session_id ? this.sessionMap.get(session_id)?.queue?.length ?? 0 : 0
if (session_id) await this.markCompleted(socket, session_id, { event: 'run.failed' })
emit('run.failed', { event: 'run.failed', error: `Upstream ${res.status}: ${text}`, queue_remaining: queueLen })
if (session_id && queueLen > 0) this.dequeueNextQueuedRun(socket, session_id)
return
}
const runData = await res.json() as any
const runId = runData.run_id
if (!runId) {
emit('run.failed', { event: 'run.failed', error: 'No run_id in upstream response' })
if (session_id) this.markCompleted(socket, session_id, { event: 'run.failed' })
const queueLen = session_id ? this.sessionMap.get(session_id)?.queue?.length ?? 0 : 0
if (session_id) await this.markCompleted(socket, session_id, { event: 'run.failed' })
emit('run.failed', { event: 'run.failed', error: 'No run_id in upstream response', queue_remaining: queueLen })
if (session_id && queueLen > 0) this.dequeueNextQueuedRun(socket, session_id)
return
}
@@ -842,7 +929,12 @@ export class ChatRunSocket {
state.abortController = abortController
}
emit('run.started', { event: 'run.started', run_id: runId, status: runData.status })
emit('run.started', {
event: 'run.started',
run_id: runId,
status: runData.status,
queue_length: session_id ? this.sessionMap.get(session_id)?.queue.length || 0 : 0,
})
// Stream upstream events via EventSource — survives socket disconnect
const eventsUrl = new URL(`${upstream}/v1/runs/${runId}/events`)
@@ -865,7 +957,7 @@ export class ChatRunSocket {
state.eventSource = source
}
source.onmessage = (event: MessageEvent) => {
source.onmessage = async (event: MessageEvent) => {
try {
const parsed = JSON.parse(event.data as string)
// Debug: log all events from upstream
@@ -880,7 +972,7 @@ export class ChatRunSocket {
const state = this.sessionMap.get(session_id)
if (state) {
const msgs = state.messages
const last = msgs[msgs.length - 1]
const last = [...msgs].reverse().find(m => m.hermesSessionId === hermesSessionId)
switch (parsed.event) {
case 'message.delta': {
@@ -949,7 +1041,9 @@ export class ChatRunSocket {
break
}
case 'tool.completed': {
const toolMsg = [...msgs].reverse().find(m => m.role === 'tool' && !m.content)
const toolMsg = [...msgs].reverse().find(m =>
m.hermesSessionId === hermesSessionId && m.role === 'tool' && !m.content
)
if (toolMsg && parsed.output) {
toolMsg.content = typeof parsed.output === 'string' ? parsed.output : JSON.stringify(parsed.output)
}
@@ -966,7 +1060,7 @@ export class ChatRunSocket {
// Debug: log run.completed to check if reasoning is included
logger.info('[chat-run-socket] run.completed keys: %s', Object.keys(parsed))
// Finalize assistant message — if no content was streamed, use output
if (parsed.output && !runProducedAssistantText(msgs)) {
if (parsed.output && !runProducedAssistantText(msgs, hermesSessionId)) {
let outputContent = parsed.output
// Parse output if it's a stringified array
@@ -1020,7 +1114,8 @@ export class ChatRunSocket {
// Only extract text content (tool_calls and reasoning are already in message fields)
let parsedCount = 0
for (const msg of msgs) {
if (msg.role === 'assistant' && typeof msg.content === 'string' &&
if (msg.hermesSessionId === hermesSessionId &&
msg.role === 'assistant' && typeof msg.content === 'string' &&
msg.content.trim().startsWith('[') && msg.content.trim().endsWith(']')) {
try {
logger.info('[chat-run-socket] parsing array content for message %s, content preview: %s',
@@ -1041,7 +1136,9 @@ export class ChatRunSocket {
logger.info('[chat-run-socket] EXIT run.completed case, parsed %d messages', parsedCount)
// Attach the last assistant message's parsed content to fix stringified array format
const lastAssistantMsg = msgs.filter((m: any) => m.role === 'assistant').pop()
const lastAssistantMsg = msgs.filter((m: any) =>
m.hermesSessionId === hermesSessionId && m.role === 'assistant'
).pop()
if (lastAssistantMsg && parsedCount > 0) {
parsed.parsed_content = lastAssistantMsg.content || ''
parsed.parsed_tool_calls = lastAssistantMsg.tool_calls || null
@@ -1065,7 +1162,15 @@ export class ChatRunSocket {
}, '[chat-run-socket][abort] suppressing upstream terminal event during abort')
return
}
if (session_id) this.markCompleted(socket, session_id, { event: parsed.event, run_id: parsed.run_id })
const queueLen = session_id ? this.sessionMap.get(session_id)?.queue?.length ?? 0 : 0
if (session_id) await this.markCompleted(socket, session_id, { event: parsed.event, run_id: parsed.run_id })
// Tag the event with queue_remaining so frontend knows more runs are pending
parsed.queue_remaining = queueLen
emit(parsed.event || 'message', parsed)
if (session_id && queueLen > 0) {
this.dequeueNextQueuedRun(socket, session_id)
}
return
}
// Usage will be calculated after syncFromHermes completes (in markCompleted)
@@ -1080,12 +1185,26 @@ export class ChatRunSocket {
logger.info({ sessionId: session_id }, '[chat-run-socket][abort] event source closed during abort')
return
}
emit('run.failed', { event: 'run.failed', error: 'EventSource connection lost' })
if (session_id) this.markCompleted(socket, session_id, { event: 'run.failed' })
const queueLen = session_id ? this.sessionMap.get(session_id)?.queue?.length ?? 0 : 0
if (session_id) {
void this.markCompleted(socket, session_id, { event: 'run.failed' }).then(() => {
emit('run.failed', { event: 'run.failed', error: 'EventSource connection lost', queue_remaining: queueLen })
if (queueLen > 0) this.dequeueNextQueuedRun(socket, session_id)
})
} else {
emit('run.failed', { event: 'run.failed', error: 'EventSource connection lost' })
}
}
} catch (err: any) {
emit('run.failed', { event: 'run.failed', error: err.message })
if (session_id) this.markCompleted(socket, session_id, { event: 'run.failed' })
const queueLen = session_id ? this.sessionMap.get(session_id)?.queue?.length ?? 0 : 0
if (session_id) {
void this.markCompleted(socket, session_id, { event: 'run.failed' }).then(() => {
emit('run.failed', { event: 'run.failed', error: err.message, queue_remaining: queueLen })
if (queueLen > 0) this.dequeueNextQueuedRun(socket, session_id)
})
} else {
emit('run.failed', { event: 'run.failed', error: err.message })
}
}
}
@@ -1095,6 +1214,19 @@ export class ChatRunSocket {
const state = this.sessionMap.get(sessionId)
if (!state?.isWorking || !state.runId) {
logger.info({ sessionId }, '[chat-run-socket][abort] ignored: no active run')
if (state) {
state.isWorking = false
state.isAborting = false
state.abortController = undefined
state.eventSource = undefined
state.runId = undefined
state.events = []
}
this.emitToSession(socket, sessionId, 'abort.completed', {
event: 'abort.completed',
synced: false,
ignored: true,
})
return
}
@@ -1125,6 +1257,7 @@ export class ChatRunSocket {
await fetch(`${upstream}/v1/runs/${runId}/stop`, {
method: 'POST',
headers,
signal: AbortSignal.timeout(10_000),
})
logger.info('[chat-run-socket] called upstream stop for run %s (session: %s)', runId, sessionId)
logger.info({ sessionId, runId, graceMs: 5000 }, '[chat-run-socket][abort] upstream stop accepted, waiting for graceful exit')
@@ -1150,7 +1283,7 @@ export class ChatRunSocket {
}
/** Mark a session run as completed/failed so reconnecting clients get notified */
private markCompleted(socket: Socket, sessionId: string, _info: { event: string; run_id?: string }) {
private async markCompleted(socket: Socket, sessionId: string, _info: { event: string; run_id?: string }) {
const state = this.sessionMap.get(sessionId)
if (state) {
if (state.isAborting) {
@@ -1171,11 +1304,32 @@ export class ChatRunSocket {
const prof = state.profile
this.hermesSessionIds.delete(sessionId)
state.profile = undefined
void this.syncFromHermes(socket, sessionId, hermesId, prof)
await this.syncFromHermes(socket, sessionId, hermesId, prof)
}
}
}
private dequeueNextQueuedRun(socket: Socket, sessionId: string, fallbackProfile = 'default') {
const state = this.sessionMap.get(sessionId)
if (!state?.queue.length) return false
const next = state.queue.shift()!
logger.info('[chat-run-socket] dequeuing queued run for session %s (remaining: %d)', sessionId, state.queue.length)
this.nsp.to(`session:${sessionId}`).emit('run.queued', {
event: 'run.queued',
session_id: sessionId,
queue_length: state.queue.length,
})
void this.handleRun(socket, {
input: next.input,
session_id: sessionId,
model: next.model,
instructions: next.instructions,
}, next.profile || fallbackProfile, true)
return true
}
private async markAbortCompleted(socket: Socket, sessionId: string, runId: string) {
const state = this.sessionMap.get(sessionId)
if (!state) return
@@ -1198,6 +1352,38 @@ export class ChatRunSocket {
state.abortController = undefined
state.eventSource = undefined
state.runId = undefined
// Process queued messages after abort completes
if (state.queue.length > 0) {
const next = state.queue.shift()!
logger.info('[chat-run-socket][abort] dequeuing queued run for session %s (remaining: %d)', sessionId, state.queue.length)
this.replaceState(sessionId, 'abort.completed', {
event: 'abort.completed',
run_id: runId,
synced,
queue_length: state.queue.length + 1,
})
this.emitToSession(socket, sessionId, 'abort.completed', {
event: 'abort.completed',
run_id: runId,
synced,
queue_length: state.queue.length + 1,
})
this.emitToSession(socket, sessionId, 'run.queued', {
event: 'run.queued',
queue_length: state.queue.length,
})
state.events = []
void this.handleRun(socket, {
input: next.input,
session_id: sessionId,
model: next.model,
instructions: next.instructions,
}, next.profile || profile || 'default', true)
return
}
state.events = []
this.replaceState(sessionId, 'abort.completed', {
event: 'abort.completed',
run_id: runId,
@@ -1208,7 +1394,6 @@ export class ChatRunSocket {
run_id: runId,
synced,
})
state.events = []
logger.info({ sessionId, runId, synced }, '[chat-run-socket][abort] completed')
}
@@ -1289,8 +1474,11 @@ export class ChatRunSocket {
}
if (!detail) return false
// Skip user messages already written to local DB in handleRun
// Skip user messages for DB insert; they are already written in handleRun.
// Keep them in memory replacement so replacing an ephemeral run does not
// delete the queued user message from state.messages.
const toInsert = detail.messages.filter(m => m.role !== 'user')
const toReplaceInMemory = detail.messages
// Build tool_call_id → function.name lookup from assistant messages
// (Hermes stores tool_name as NULL, name lives inside tool_calls JSON)
@@ -1384,7 +1572,7 @@ export class ChatRunSocket {
// Use inputTokens already set by compression path if available
const state = this.sessionMap.get(localSessionId)
if (state) {
const messages = this.handleMessage(toInsert, localSessionId)
const messages = this.handleMessage(toReplaceInMemory, localSessionId)
if (messages.length > 0) {
this.replaceByHermesSessionId(localSessionId, hermesSessionId, messages)
}
@@ -1425,6 +1613,10 @@ export class ChatRunSocket {
// 没找到
if (start === -1) return
if (!newItems.some(item => item.role === 'user')) {
const existingUsers = msg.slice(start, end + 1).filter(item => item.role === 'user')
newItems = [...existingUsers, ...newItems]
}
// 替换
msg.splice(start, end - start + 1, ...newItems)
}
@@ -1448,7 +1640,7 @@ export class ChatRunSocket {
private getOrCreateSession(sessionId: string): SessionState {
let state = this.sessionMap.get(sessionId)
if (!state) {
state = { messages: [], isWorking: false, events: [] }
state = { messages: [], isWorking: false, events: [], queue: [] }
this.sessionMap.set(sessionId, state)
}
return state
@@ -1498,7 +1690,11 @@ export class ChatRunSocket {
}
}
/** Check if any assistant message in the list has non-empty content */
function runProducedAssistantText(messages: SessionMessage[]): boolean {
return messages.some(m => m.role === 'assistant' && m.content?.trim())
/** Check if the current ephemeral run has already produced assistant text. */
function runProducedAssistantText(messages: SessionMessage[], hermesSessionId?: string): boolean {
return messages.some(m =>
m.hermesSessionId === hermesSessionId &&
m.role === 'assistant' &&
!!m.content?.trim()
)
}
@@ -74,9 +74,15 @@ function detectInitSystem(): string {
// Linux 才检查 /proc
if (platform === 'linux') {
try {
if (existsSync('/.dockerenv') || existsSync('/run/.containerenv')) {
return 'container'
}
const comm = readFileSync('/proc/1/comm', 'utf-8').trim()
if (comm === 'systemd') return 'systemd'
if (comm === 'systemd') {
return existsSync('/run/systemd/system') ? 'systemd' : 'other'
}
if (comm === 'init') return 'sysvinit'
return 'other'
@@ -223,13 +229,17 @@ export class GatewayManager {
}
/** 从 base 端口开始递增查找空闲端口(上限 65535) */
private findFreePort(base: number, host = '127.0.0.1'): Promise<number> {
private findFreePort(base: number, host = '127.0.0.1', reservedPorts = new Set<number>()): Promise<number> {
return new Promise((resolve, reject) => {
const tryPort = (port: number) => {
if (port > 65535) {
reject(new Error(`No free port found in range ${base}-65535`))
return
}
if (reservedPorts.has(port)) {
tryPort(port + 1)
return
}
const server = createServer()
server.once('error', () => {
server.close()
@@ -318,7 +328,7 @@ export class GatewayManager {
if (usedPorts.has(port)) {
// 已管理端口冲突 → 找空闲端口
const newPort = await this.findFreePort(port, host)
const newPort = await this.findFreePort(port, host, usedPorts)
logger.info('Port %d is in use for profile "%s", reassigning to %d', port, name, newPort)
this.writeProfilePort(name, newPort, host)
port = newPort
@@ -326,7 +336,7 @@ export class GatewayManager {
// 检查系统级端口占用(外部进程)
const available = await this.checkPortAvailable(port, host)
if (!available) {
const newPort = await this.findFreePort(port, host)
const newPort = await this.findFreePort(port, host, usedPorts)
logger.info('Port %d is occupied by another process for profile "%s", reassigning to %d', port, name, newPort)
this.writeProfilePort(name, newPort, host)
port = newPort