[codex] Handle chat run abort lifecycle (#454)

* feat: call upstream stop API when aborting a run

- Modified handleAbort to call POST /v1/runs/{run_id}/stop endpoint
- Use profile-specific upstream URL and API key from gatewayManager
- Add 5-second timeout with error handling and logging
- Keep local abortController.abort() for EventSource cleanup
- Change handleAbort to async method and update call site

This ensures the upstream Hermes gateway is properly notified
when a user aborts a run, allowing graceful termination.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: close ChatRunSocket connections on shutdown to prevent hanging

- Add close() method to ChatRunSocket to abort all active runs
  and clear session state
- Pass chatRunServer to bindShutdown and close it before
  groupChatServer during shutdown
- This prevents EventSource connections and abort controllers
  from keeping the process alive during nodemon restart

Fixes the "still waiting for sub-process to finish" issue.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Handle chat run abort lifecycle

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
ekko
2026-05-05 13:03:14 +08:00
committed by GitHub
parent f13ce3a080
commit e3d28f4659
8 changed files with 524 additions and 231 deletions
+45 -3
View File
@@ -70,6 +70,8 @@ const sessionEventHandlers = new Map<string, {
onRunFailed: (event: RunEvent) => void
onCompressionStarted: (event: RunEvent) => void
onCompressionCompleted: (event: RunEvent) => void
onAbortStarted: (event: RunEvent) => void
onAbortCompleted: (event: RunEvent) => void
onUsageUpdated: (event: RunEvent) => void
}>()
@@ -223,6 +225,34 @@ function globalCompressionCompletedHandler(event: RunEvent): void {
}
}
/**
* Global abort.started event handler
*/
function globalAbortStartedHandler(event: RunEvent): void {
const sid = event.session_id
if (!sid) return
const handlers = sessionEventHandlers.get(sid)
if (handlers?.onAbortStarted) {
handlers.onAbortStarted(event)
}
}
/**
* Global abort.completed event handler
*/
function globalAbortCompletedHandler(event: RunEvent): void {
const sid = event.session_id
if (!sid) return
const handlers = sessionEventHandlers.get(sid)
if (handlers?.onAbortCompleted) {
handlers.onAbortCompleted(event)
}
sessionEventHandlers.delete(sid)
}
/**
* Global usage.updated event handler
*/
@@ -256,6 +286,8 @@ export function registerSessionHandlers(
onRunFailed: (event: RunEvent) => void
onCompressionStarted: (event: RunEvent) => void
onCompressionCompleted: (event: RunEvent) => void
onAbortStarted: (event: RunEvent) => void
onAbortCompleted: (event: RunEvent) => void
onUsageUpdated: (event: RunEvent) => void
}
): () => void {
@@ -333,6 +365,8 @@ export function connectChatRun(): Socket {
// Compression events
chatRunSocket.on('compression.started', globalCompressionStartedHandler)
chatRunSocket.on('compression.completed', globalCompressionCompletedHandler)
chatRunSocket.on('abort.started', globalAbortStartedHandler)
chatRunSocket.on('abort.completed', globalAbortCompletedHandler)
// Usage events
chatRunSocket.on('usage.updated', globalUsageUpdatedHandler)
@@ -361,7 +395,7 @@ export function disconnectChatRun(): void {
*/
export function resumeSession(
sessionId: string,
onResumed: (data: { session_id: string; messages: any[]; isWorking: boolean; events: any[]; inputTokens?: number; outputTokens?: number }) => void,
onResumed: (data: { session_id: string; messages: any[]; isWorking: boolean; isAborting?: boolean; events: any[]; inputTokens?: number; outputTokens?: number }) => void,
): Socket {
const socket = connectChatRun()
@@ -436,6 +470,16 @@ export function startRunViaSocket(
if (closed) return
onEvent(evt)
},
onAbortStarted: (evt: RunEvent) => {
if (closed) return
onEvent(evt)
},
onAbortCompleted: (evt: RunEvent) => {
if (closed) return
onEvent(evt)
closed = true
onDone()
},
onUsageUpdated: (evt: RunEvent) => {
if (closed) return
onEvent(evt)
@@ -452,8 +496,6 @@ export function startRunViaSocket(
return {
abort: () => {
if (!closed) {
closed = true
sessionEventHandlers.delete(sid)
socket.emit('abort', { session_id: sid })
}
},
@@ -317,6 +317,7 @@ function isImage(type: string): boolean {
v-if="chatStore.isStreaming"
size="small"
type="error"
:disabled="chatStore.isAborting"
@click="chatStore.stopStreaming()"
>
{{ t('chat.stop') }}
@@ -145,7 +145,7 @@ watch(
:highlight="chatStore.focusMessageId === msg.id"
/>
<Transition name="fade">
<div v-if="chatStore.isRunActive" class="streaming-indicator">
<div v-if="chatStore.isRunActive || chatStore.abortState" class="streaming-indicator">
<video
:src="isDark ? thinkingVideoDark : thinkingVideoLight"
autoplay
@@ -154,7 +154,47 @@ watch(
playsinline
class="thinking-video"
/>
<div v-if="currentToolCalls.length > 0 || chatStore.compressionState" class="tool-calls-panel">
<div v-if="currentToolCalls.length > 0 || chatStore.compressionState || chatStore.abortState" class="tool-calls-panel">
<!-- Abort indicator -->
<div v-if="chatStore.abortState" class="tool-call-item compression-item">
<svg
v-if="chatStore.abortState.aborting"
width="12"
height="12"
viewBox="0 0 24 24"
fill="none"
stroke="currentColor"
stroke-width="1.5"
class="tool-call-icon"
>
<path d="M10 9v6m4-6v6M5 5h14v14H5z" />
</svg>
<svg
v-else
width="12"
height="12"
viewBox="0 0 24 24"
fill="none"
stroke="currentColor"
stroke-width="1.5"
class="tool-call-icon"
>
<path d="M5 13l4 4L19 7" />
</svg>
<span class="tool-call-name">
{{
chatStore.abortState.aborting
? 'Pausing... waiting for the run to stop and sync'
: chatStore.abortState.synced
? 'Paused and synced'
: 'Paused'
}}
</span>
<span
v-if="chatStore.abortState.aborting"
class="tool-call-spinner"
></span>
</div>
<!-- Compression indicator -->
<div v-if="chatStore.compressionState" class="tool-call-item compression-item">
<svg
@@ -133,7 +133,7 @@ watch(currentToolCalls, () => {
:highlight="chatStore.focusMessageId === msg.id"
/>
<Transition name="fade">
<div v-if="chatStore.isRunActive" class="streaming-indicator">
<div v-if="chatStore.isRunActive || chatStore.abortState" class="streaming-indicator">
<video
:src="isDark ? thinkingVideoDark : thinkingVideoLight"
autoplay
@@ -142,7 +142,47 @@ watch(currentToolCalls, () => {
playsinline
class="thinking-video"
/>
<div v-if="currentToolCalls.length > 0 || chatStore.compressionState" class="tool-calls-panel">
<div v-if="currentToolCalls.length > 0 || chatStore.compressionState || chatStore.abortState" class="tool-calls-panel">
<!-- Abort indicator -->
<div v-if="chatStore.abortState" class="tool-call-item compression-item">
<svg
v-if="chatStore.abortState.aborting"
width="12"
height="12"
viewBox="0 0 24 24"
fill="none"
stroke="currentColor"
stroke-width="1.5"
class="tool-call-icon"
>
<path d="M10 9v6m4-6v6M5 5h14v14H5z" />
</svg>
<svg
v-else
width="12"
height="12"
viewBox="0 0 24 24"
fill="none"
stroke="currentColor"
stroke-width="1.5"
class="tool-call-icon"
>
<path d="M5 13l4 4L19 7" />
</svg>
<span class="tool-call-name">
{{
chatStore.abortState.aborting
? 'Pausing... waiting for the run to stop and sync'
: chatStore.abortState.synced
? 'Paused and synced'
: 'Paused'
}}
</span>
<span
v-if="chatStore.abortState.aborting"
class="tool-call-spinner"
></span>
</div>
<!-- Compression indicator -->
<div v-if="chatStore.compressionState" class="tool-call-item compression-item">
<svg
+105 -102
View File
@@ -1,4 +1,4 @@
import { startRunViaSocket, resumeSession, registerSessionHandlers, unregisterSessionHandlers, type RunEvent, type ContentBlock as ContentBlockImport } from '@/api/hermes/chat'
import { startRunViaSocket, resumeSession, registerSessionHandlers, unregisterSessionHandlers, getChatRunSocket, type RunEvent, type ContentBlock as ContentBlockImport } from '@/api/hermes/chat'
import { deleteSession as deleteSessionApi, fetchSession, fetchSessions, type HermesMessage, type SessionSummary } from '@/api/hermes/sessions'
import { getApiKey } from '@/api/client'
import { defineStore } from 'pinia'
@@ -219,7 +219,6 @@ function mapHermesSession(s: SessionSummary): Session {
const STORAGE_KEY_PREFIX = 'hermes_active_session_'
const LEGACY_STORAGE_KEY = 'hermes_active_session'
const IN_FLIGHT_TTL_MS = 15 * 60 * 1000 // Give up after 15 minutes
// 获取当前 profile 名称,用于隔离缓存。
// 从 profiles store 的 activeProfileName(同步 localStorage)读取,
@@ -234,22 +233,6 @@ function getProfileName(): string {
function storageKey(): string { return STORAGE_KEY_PREFIX + getProfileName() }
function legacyStorageKey(): string | null { return getProfileName() === 'default' ? LEGACY_STORAGE_KEY : null }
function inFlightKey(sid: string): string { return `hermes_in_flight_v1_${getProfileName()}_${sid}` }
function legacyInFlightKey(sid: string): string | null { return getProfileName() === 'default' ? `hermes_in_flight_v1_${sid}` : null }
interface InFlightRun {
runId: string
startedAt: number
}
function loadJson<T>(key: string): T | null {
try {
const raw = localStorage.getItem(key)
return raw ? (JSON.parse(raw) as T) : null
} catch {
return null
}
}
function isQuotaExceededError(error: unknown): boolean {
if (!error || typeof error !== 'object') return false
@@ -301,14 +284,6 @@ function setItemBestEffort(key: string, value: string) {
}
}
function saveJson(key: string, value: unknown) {
try {
setItemBestEffort(key, JSON.stringify(value))
} catch {
// quota exceeded or private mode — ignore, cache is best-effort
}
}
function removeItem(key: string) {
try {
localStorage.removeItem(key)
@@ -317,23 +292,6 @@ function removeItem(key: string) {
}
}
function loadJsonWithFallback<T>(key: string, legacyKey?: string | null): T | null {
const value = loadJson<T>(key)
if (value != null) return value
if (!legacyKey) return null
return loadJson<T>(legacyKey)
}
function saveJsonWithLegacy(key: string, value: unknown, legacyKey?: string | null) {
saveJson(key, value)
if (legacyKey) removeItem(legacyKey)
}
function removeItemWithLegacy(key: string, legacyKey?: string | null) {
removeItem(key)
if (legacyKey) removeItem(legacyKey)
}
// Strip the circular `file: File` reference from attachments before caching —
// File objects don't serialize and we only need name/type/size/url for display.
@@ -375,6 +333,17 @@ export const useChatStore = defineStore('chat', () => {
compressionState.value = state
}
const abortState = ref<{
aborting: boolean
synced: boolean | null
error?: string
} | null>(null)
const isAborting = computed(() => abortState.value?.aborting === true)
function setAbortState(state: typeof abortState.value) {
abortState.value = state
}
const activeSession = ref<Session | null>(null)
const messages = computed<Message[]>(() => activeSession.value?.messages || [])
@@ -382,30 +351,11 @@ export const useChatStore = defineStore('chat', () => {
return streamStates.value.has(sessionId) || serverWorking.value.has(sessionId)
}
function markInFlight(sid: string, runId: string) {
saveJsonWithLegacy(inFlightKey(sid), { runId, startedAt: Date.now() } as InFlightRun, legacyInFlightKey(sid))
}
function clearInFlight(sid: string) {
removeItemWithLegacy(inFlightKey(sid), legacyInFlightKey(sid))
}
function readInFlight(sid: string): InFlightRun | null {
const rec = loadJsonWithFallback<InFlightRun>(inFlightKey(sid), legacyInFlightKey(sid))
if (!rec) return null
if (Date.now() - rec.startedAt > IN_FLIGHT_TTL_MS) {
removeItemWithLegacy(inFlightKey(sid), legacyInFlightKey(sid))
return null
}
return rec
}
async function loadSessions() {
isLoadingSessions.value = true
try {
const list = await fetchSessions()
const fresh = list.map(mapHermesSession)
const freshIds = new Set(fresh.map(s => s.id))
// Preserve already-loaded messages for sessions that are still present,
// so we don't blow away the active session's messages on refresh.
const msgsByIdBefore = new Map(sessions.value.map(s => [s.id, s.messages]))
@@ -413,19 +363,7 @@ export const useChatStore = defineStore('chat', () => {
const prev = msgsByIdBefore.get(s.id)
if (prev && prev.length) s.messages = prev
}
// Preserve local-only sessions the server hasn't seen yet — e.g. a chat
// that was just created and whose first run is still in-flight. Without
// this, refreshing mid-run would wipe the session and fall back to
// sessions[0], which is exactly what the user reported.
// Sessions without an active in-flight run are considered deleted and
// cleaned up along with their cached messages.
const localOnly = sessions.value.filter(s => {
if (freshIds.has(s.id)) return false
if (readInFlight(s.id)) return true
removeItemWithLegacy(inFlightKey(s.id), legacyInFlightKey(s.id))
return false
})
sessions.value = [...localOnly, ...fresh]
sessions.value = fresh
// Restore last active session, fallback to most recent
const savedId = activeSessionId.value
@@ -500,6 +438,11 @@ export const useChatStore = defineStore('chat', () => {
} else {
serverWorking.value.delete(sessionId)
}
if ((data as any).isAborting) {
setAbortState({ aborting: true, synced: null })
} else if (!data.isWorking) {
setAbortState(null)
}
if (data.inputTokens != null) activeSession.value!.inputTokens = data.inputTokens
if (data.outputTokens != null) activeSession.value!.outputTokens = data.outputTokens
if (data.messages?.length) {
@@ -533,6 +476,10 @@ export const useChatStore = defineStore('chat', () => {
compressed: e.compressed ?? false,
error: e.error,
})
} else if (e.event === 'abort.started') {
setAbortState({ aborting: true, synced: null })
} else if (e.event === 'abort.completed') {
setAbortState({ aborting: false, synced: e.synced ?? false })
}
}
}
@@ -546,7 +493,7 @@ export const useChatStore = defineStore('chat', () => {
}
// Resume in-flight run event listeners if needed
resumeInFlightRun(sessionId)
resumeServerWorkingRun(sessionId)
}
function newChat() {
@@ -744,6 +691,33 @@ export const useChatStore = defineStore('chat', () => {
break
}
case 'abort.started': {
console.log('[chat abort] pause started', evt)
setAbortState({ aborting: true, synced: null })
break
}
case 'abort.completed': {
console.log('[chat abort] pause completed', evt)
setAbortState({ aborting: false, synced: (evt as any).synced ?? false })
const msgs = getSessionMsgs(sid)
const lastMsg = msgs[msgs.length - 1]
if (lastMsg?.isStreaming) {
updateMessage(sid, lastMsg.id, { isStreaming: false })
}
msgs.forEach((m, i) => {
if (m.role === 'tool' && m.toolStatus === 'running') {
msgs[i] = { ...m, toolStatus: 'done' }
}
})
cleanup()
if (sid === activeSessionId.value) {
void refreshActiveSession()
}
setAbortState(null)
break
}
case 'reasoning.delta':
case 'thinking.delta': {
const text = evt.text || evt.delta || ''
@@ -943,13 +917,6 @@ export const useChatStore = defineStore('chat', () => {
cleanup()
updateSessionTitle(sid)
// the in-flight marker. If the browser is reloading right now
// and kills us between the two localStorage writes, we want
// the next page load to still see in-flight === true (so
// polling kicks in and recovers) rather than the other way
// around (cleared in-flight + stale streaming cache = UI stuck).
clearInFlight(sid)
break
}
@@ -976,8 +943,6 @@ export const useChatStore = defineStore('chat', () => {
}
})
cleanup()
clearInFlight(sid)
break
}
@@ -1020,10 +985,7 @@ export const useChatStore = defineStore('chat', () => {
void refreshActiveSession()
}
},
// onStarted — called when server acks with run_id
(runId: string) => {
markInFlight(sid, runId)
},
undefined,
)
streamStates.value.set(sid, ctrl)
@@ -1042,11 +1004,11 @@ export const useChatStore = defineStore('chat', () => {
* Emits 'resume' to join the session room on the server,
* then sets up event listeners to receive ongoing events.
*/
function resumeInFlightRun(sid: string) {
function resumeServerWorkingRun(sid: string) {
// Don't register duplicate listeners if already streaming
if (streamStates.value.has(sid)) return
// Only set up listeners if there's an actual in-flight run
if (!readInFlight(sid)) return
// Only set up listeners if the server reported an active run during resume.
if (!serverWorking.value.has(sid)) return
let closed = false
let runProducedAssistantText = false
@@ -1098,6 +1060,33 @@ export const useChatStore = defineStore('chat', () => {
break
}
case 'abort.started': {
console.log('[chat abort] resumed pause started', evt)
setAbortState({ aborting: true, synced: null })
break
}
case 'abort.completed': {
console.log('[chat abort] resumed pause completed', evt)
setAbortState({ aborting: false, synced: (evt as any).synced ?? false })
const msgs = getSessionMsgs(sid)
const lastMsg = msgs[msgs.length - 1]
if (lastMsg?.isStreaming) {
updateMessage(sid, lastMsg.id, { isStreaming: false })
}
msgs.forEach((m, i) => {
if (m.role === 'tool' && m.toolStatus === 'running') {
msgs[i] = { ...m, toolStatus: 'done' }
}
})
cleanup()
if (sid === activeSessionId.value) {
void refreshActiveSession()
}
setAbortState(null)
break
}
case 'reasoning.delta':
case 'thinking.delta': {
const text = evt.text || evt.delta || ''
@@ -1252,8 +1241,6 @@ export const useChatStore = defineStore('chat', () => {
cleanup()
updateSessionTitle(sid)
clearInFlight(sid)
break
}
@@ -1280,8 +1267,6 @@ export const useChatStore = defineStore('chat', () => {
}
})
cleanup()
clearInFlight(sid)
break
}
@@ -1309,6 +1294,8 @@ export const useChatStore = defineStore('chat', () => {
onRunFailed: (evt) => handleEvent(evt),
onCompressionStarted: (evt) => handleEvent(evt),
onCompressionCompleted: (evt) => handleEvent(evt),
onAbortStarted: (evt) => handleEvent(evt),
onAbortCompleted: (evt) => handleEvent(evt),
onUsageUpdated: (evt) => handleEvent(evt),
})
@@ -1316,25 +1303,29 @@ export const useChatStore = defineStore('chat', () => {
// Server already joined room and replayed events.
// Just set up handlers for ongoing streaming events.
// Mark as streaming so UI shows the indicator
streamStates.value.set(sid, { abort: cleanup })
// Mark as streaming so UI shows the indicator and can still abort after refresh.
streamStates.value.set(sid, {
abort: () => {
getChatRunSocket()?.emit('abort', { session_id: sid })
},
})
}
function stopStreaming() {
const sid = activeSessionId.value
if (!sid) return
if (isAborting.value) return
const ctrl = streamStates.value.get(sid)
if (ctrl) {
console.log('[chat abort] stop requested', { sessionId: sid })
setAbortState({ aborting: true, synced: null })
ctrl.abort()
const msgs = getSessionMsgs(sid)
const lastMsg = msgs[msgs.length - 1]
if (lastMsg?.isStreaming) {
updateMessage(sid, lastMsg.id, { isStreaming: false })
}
streamStates.value.delete(sid)
serverWorking.value.delete(sid)
}
clearInFlight(sid)
}
// Tab visibility: re-sync when returning to foreground
@@ -1345,11 +1336,21 @@ export const useChatStore = defineStore('chat', () => {
if (sid && !streamStates.value.has(sid)) {
// Re-load messages via resume (server loads from DB)
resumeSession(sid, (data) => {
if (data.isWorking) {
serverWorking.value.add(sid)
} else {
serverWorking.value.delete(sid)
}
if (data.isAborting) {
setAbortState({ aborting: true, synced: null })
} else if (!data.isWorking) {
setAbortState(null)
}
if (data.messages?.length && activeSession.value) {
activeSession.value.messages = mapHermesMessages(data.messages as any[])
}
resumeServerWorkingRun(sid)
})
resumeInFlightRun(sid)
}
}
})
@@ -1431,6 +1432,8 @@ export const useChatStore = defineStore('chat', () => {
isRunActive,
isSessionLive,
compressionState,
abortState,
isAborting,
isLoadingSessions,
sessionsLoaded,
isLoadingMessages,
+3 -2
View File
@@ -37,6 +37,7 @@ process.on('unhandledRejection', (reason) => {
})
let server: any = null
let chatRunServer: any = null
export async function bootstrap() {
console.log(`hermes-web-ui v${APP_VERSION} starting...`)
@@ -102,7 +103,7 @@ export async function bootstrap() {
groupChatServer.setGatewayManager(getGatewayManagerInstance())
// Chat run Socket.IO — shares the same Server instance, just adds /chat-run namespace
const chatRunServer = new ChatRunSocket(groupChatServer.getIO(), getGatewayManagerInstance())
chatRunServer = new ChatRunSocket(groupChatServer.getIO(), getGatewayManagerInstance())
setChatRunServer(chatRunServer)
chatRunServer.init()
@@ -139,7 +140,7 @@ export async function bootstrap() {
logger.error({ err }, 'Server error')
})
bindShutdown(server, groupChatServer)
bindShutdown(server, groupChatServer, chatRunServer)
startVersionCheck()
}
@@ -154,10 +154,12 @@ interface SessionState {
isWorking: boolean
events: Array<{ event: string; data: any }>
abortController?: AbortController
eventSource?: EventSource
runId?: string
profile?: string
inputTokens?: number
outputTokens?: number
isAborting?: boolean
}
// --- ChatRunSocket ---
@@ -218,7 +220,7 @@ export class ChatRunSocket {
socket.on('abort', (data: { session_id?: string }) => {
if (data.session_id) {
this.handleAbort(socket, data.session_id)
void this.handleAbort(socket, data.session_id)
}
})
}
@@ -406,6 +408,7 @@ export class ChatRunSocket {
session_id: sid,
messages: state.messages,
isWorking: state.isWorking,
isAborting: state.isAborting || false,
events: state.isWorking ? state.events : [],
inputTokens: state.inputTokens,
outputTokens: state.outputTokens,
@@ -815,6 +818,7 @@ export class ChatRunSocket {
if (!res.ok) {
const text = await res.text().catch(() => '')
emit('run.failed', { event: 'run.failed', error: `Upstream ${res.status}: ${text}` })
if (session_id) this.markCompleted(socket, session_id, { event: 'run.failed' })
return
}
@@ -822,6 +826,7 @@ export class ChatRunSocket {
const runId = runData.run_id
if (!runId) {
emit('run.failed', { event: 'run.failed', error: 'No run_id in upstream response' })
if (session_id) this.markCompleted(socket, session_id, { event: 'run.failed' })
return
}
@@ -855,6 +860,10 @@ export class ChatRunSocket {
// @ts-ignore - eventsource library types are too strict
const source = new EventSource(eventsUrl.toString(), eventSourceInit)
if (session_id) {
const state = this.getOrCreateSession(session_id)
state.eventSource = source
}
source.onmessage = (event: MessageEvent) => {
try {
@@ -1046,19 +1055,31 @@ export class ChatRunSocket {
}
}
if (parsed.event === 'run.completed' || parsed.event === 'run.failed') {
source.close()
if (session_id && this.sessionMap.get(session_id)?.isAborting) {
logger.info({
sessionId: session_id,
runId: parsed.run_id,
event: parsed.event,
}, '[chat-run-socket][abort] suppressing upstream terminal event during abort')
return
}
if (session_id) this.markCompleted(socket, session_id, { event: parsed.event, run_id: parsed.run_id })
}
// Usage will be calculated after syncFromHermes completes (in markCompleted)
emit(parsed.event || 'message', parsed)
if (parsed.event === 'run.completed' || parsed.event === 'run.failed') {
source.close()
if (session_id) this.markCompleted(socket, session_id, { event: parsed.event, run_id: parsed.run_id })
}
} catch { /* not JSON, skip */ }
}
source.onerror = () => {
source.close()
if (session_id && this.sessionMap.get(session_id)?.isAborting) {
logger.info({ sessionId: session_id }, '[chat-run-socket][abort] event source closed during abort')
return
}
emit('run.failed', { event: 'run.failed', error: 'EventSource connection lost' })
if (session_id) this.markCompleted(socket, session_id, { event: 'run.failed' })
}
@@ -1070,20 +1091,78 @@ export class ChatRunSocket {
// --- Abort handler ---
private handleAbort(socket: Socket, sessionId: string) {
private async handleAbort(socket: Socket, sessionId: string) {
const state = this.sessionMap.get(sessionId)
if (state?.isWorking && state.abortController) {
state.abortController.abort()
this.markCompleted(socket, sessionId, { event: 'run.failed', run_id: state.runId })
if (!state?.isWorking || !state.runId) {
logger.info({ sessionId }, '[chat-run-socket][abort] ignored: no active run')
return
}
const runId = state.runId
state.isAborting = true
this.replaceState(sessionId, 'abort.started', {
event: 'abort.started',
run_id: runId,
graceMs: 5000,
})
this.emitToSession(socket, sessionId, 'abort.started', {
event: 'abort.started',
run_id: runId,
graceMs: 5000,
})
logger.info({ sessionId, runId }, '[chat-run-socket][abort] started')
// Call upstream stop endpoint
const profile = state.profile || 'default'
const upstream = this.gatewayManager.getUpstream(profile).replace(/\/$/, '')
const apiKey = this.gatewayManager.getApiKey(profile) || undefined
try {
const headers: Record<string, string> = { 'Content-Type': 'application/json' }
if (apiKey) headers['Authorization'] = `Bearer ${apiKey}`
logger.info({ sessionId, runId, upstream }, '[chat-run-socket][abort] calling upstream stop')
await fetch(`${upstream}/v1/runs/${runId}/stop`, {
method: 'POST',
headers,
})
logger.info('[chat-run-socket] called upstream stop for run %s (session: %s)', runId, sessionId)
logger.info({ sessionId, runId, graceMs: 5000 }, '[chat-run-socket][abort] upstream stop accepted, waiting for graceful exit')
// Wait for upstream to process the stop request
await new Promise(resolve => setTimeout(resolve, 5000))
} catch (err: any) {
logger.warn(err, '[chat-run-socket] failed to call upstream stop for run %s (session: %s)', runId, sessionId)
logger.warn({ sessionId, runId, error: err?.message }, '[chat-run-socket][abort] upstream stop failed, continuing local completion')
}
// Close local EventSource connection after the upstream grace period.
if (state.eventSource) {
state.eventSource.close()
state.eventSource = undefined
logger.info({ sessionId, runId }, '[chat-run-socket][abort] event source closed')
}
if (state.abortController) {
state.abortController.abort()
}
await this.markAbortCompleted(socket, sessionId, runId)
}
/** Mark a session run as completed/failed so reconnecting clients get notified */
private markCompleted(socket: Socket, sessionId: string, _info: { event: string; run_id?: string }) {
const state = this.sessionMap.get(sessionId)
if (state) {
if (state.isAborting) {
logger.info({
sessionId,
runId: state.runId,
}, '[chat-run-socket][abort] terminal upstream event observed; abort handler will finish cleanup')
return
}
state.isWorking = false
state.abortController = undefined
state.eventSource = undefined
state.runId = undefined
state.events = []
// Sync messages from Hermes ephemeral session to local DB
@@ -1092,11 +1171,47 @@ export class ChatRunSocket {
const prof = state.profile
this.hermesSessionIds.delete(sessionId)
state.profile = undefined
this.syncFromHermes(socket, sessionId, hermesId, prof)
void this.syncFromHermes(socket, sessionId, hermesId, prof)
}
}
}
private async markAbortCompleted(socket: Socket, sessionId: string, runId: string) {
const state = this.sessionMap.get(sessionId)
if (!state) return
const hermesId = this.hermesSessionIds.get(sessionId)
const profile = state.profile
let synced = false
if (useLocalSessionStore() && hermesId) {
this.hermesSessionIds.delete(sessionId)
logger.info({ sessionId, hermesId, profile: profile || 'default' }, '[chat-run-socket][abort] syncing stopped run from Hermes')
synced = await this.syncFromHermes(socket, sessionId, hermesId, profile, {
maxAttempts: 10,
delayMs: 1000,
})
}
state.isWorking = false
state.isAborting = false
state.profile = undefined
state.abortController = undefined
state.eventSource = undefined
state.runId = undefined
this.replaceState(sessionId, 'abort.completed', {
event: 'abort.completed',
run_id: runId,
synced,
})
this.emitToSession(socket, sessionId, 'abort.completed', {
event: 'abort.completed',
run_id: runId,
synced,
})
state.events = []
logger.info({ sessionId, runId, synced }, '[chat-run-socket][abort] completed')
}
/**
* Calculate usage from DB and update state + emit to clients.
* @returns { inputTokens, outputTokens } for the caller to use
@@ -1147,129 +1262,150 @@ export class ChatRunSocket {
* and write to local DB. This gives us tool results that SSE events don't include.
* After sync, enqueues the ephemeral session for deletion.
*/
private syncFromHermes(socket: Socket, localSessionId: string, hermesSessionId: string, profile?: string) {
getSessionDetailFromDb(hermesSessionId)
.then((detail) => {
private async syncFromHermes(
socket: Socket,
localSessionId: string,
hermesSessionId: string,
profile?: string,
options?: { maxAttempts?: number; delayMs?: number },
): Promise<boolean> {
const maxAttempts = options?.maxAttempts || 1
const delayMs = options?.delayMs || 0
try {
let detail: Awaited<ReturnType<typeof getSessionDetailFromDb>> | null = null
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
detail = await getSessionDetailFromDb(hermesSessionId)
if (!detail || !detail.messages?.length) {
logger.warn('[chat-run-socket] syncFromHermes: no data for Hermes session %s', hermesSessionId)
return
logger.warn('[chat-run-socket] syncFromHermes: no data for Hermes session %s (attempt %d/%d)', hermesSessionId, attempt, maxAttempts)
logger.info({ localSessionId, hermesSessionId, attempt, maxAttempts }, '[chat-run-socket][abort] sync waiting for Hermes data')
if (attempt < maxAttempts && delayMs > 0) {
await new Promise(resolve => setTimeout(resolve, delayMs))
continue
}
this.enqueueEphemeralDelete(hermesSessionId, profile)
return false
}
// Skip user messages — already written to local DB in handleRun
const toInsert = detail.messages.filter(m => m.role !== 'user')
break
}
if (!detail) return false
// Build tool_call_id → function.name lookup from assistant messages
// (Hermes stores tool_name as NULL, name lives inside tool_calls JSON)
const toolNameMap = new Map<string, string>()
for (const msg of detail.messages) {
if (msg.role === 'assistant' && Array.isArray(msg.tool_calls)) {
for (const tc of msg.tool_calls) {
const id = tc.id || tc.call_id || tc.tool_call_id
const name = tc.function?.name || tc.name
if (id && name) toolNameMap.set(id, name)
}
// Skip user messages — already written to local DB in handleRun
const toInsert = detail.messages.filter(m => m.role !== 'user')
// Build tool_call_id → function.name lookup from assistant messages
// (Hermes stores tool_name as NULL, name lives inside tool_calls JSON)
const toolNameMap = new Map<string, string>()
for (const msg of detail.messages) {
if (msg.role === 'assistant' && Array.isArray(msg.tool_calls)) {
for (const tc of msg.tool_calls) {
const id = tc.id || tc.call_id || tc.tool_call_id
const name = tc.function?.name || tc.name
if (id && name) toolNameMap.set(id, name)
}
}
}
if (toInsert.length > 0) {
// Get in-memory messages to preserve reasoning that was streamed via SSE
const state = this.sessionMap.get(localSessionId)
const memoryMessages = state?.messages || []
logger.info('[chat-run-socket] syncFromHermes: memory has %d messages, DB has %d messages',
memoryMessages.length, toInsert.length)
if (toInsert.length > 0) {
// Get in-memory messages to preserve reasoning that was streamed via SSE
const state = this.sessionMap.get(localSessionId)
const memoryMessages = state?.messages || []
logger.info('[chat-run-socket] syncFromHermes: memory has %d messages, DB has %d messages',
memoryMessages.length, toInsert.length)
// Match messages by order since Hermes DB and memory should have same sequence
let memoryIdx = 0
let mergedCount = 0
for (let i = 0; i < toInsert.length && memoryIdx < memoryMessages.length; i++) {
const dbMsg = toInsert[i]
// Skip user messages in memory when matching
while (memoryIdx < memoryMessages.length && memoryMessages[memoryIdx].role === 'user') {
memoryIdx++
}
if (memoryIdx >= memoryMessages.length) break
const memoryMsg = memoryMessages[memoryIdx]
// Only merge if roles match
if (dbMsg.role === memoryMsg.role) {
// Merge reasoning from memory if DB doesn't have it
if (!dbMsg.reasoning && memoryMsg.reasoning) {
dbMsg.reasoning = memoryMsg.reasoning
mergedCount++
logger.info('[chat-run-socket] syncFromHermes: merged reasoning from memory to DB for %s message at index %d',
dbMsg.role, i)
}
}
// Match messages by order since Hermes DB and memory should have same sequence
let memoryIdx = 0
let mergedCount = 0
for (let i = 0; i < toInsert.length && memoryIdx < memoryMessages.length; i++) {
const dbMsg = toInsert[i]
// Skip user messages in memory when matching
while (memoryIdx < memoryMessages.length && memoryMessages[memoryIdx].role === 'user') {
memoryIdx++
}
if (mergedCount > 0) {
logger.info('[chat-run-socket] syncFromHermes: merged reasoning for %d messages', mergedCount)
if (memoryIdx >= memoryMessages.length) break
const memoryMsg = memoryMessages[memoryIdx]
// Only merge if roles match
if (dbMsg.role === memoryMsg.role) {
// Merge reasoning from memory if DB doesn't have it
if (!dbMsg.reasoning && memoryMsg.reasoning) {
dbMsg.reasoning = memoryMsg.reasoning
mergedCount++
logger.info('[chat-run-socket] syncFromHermes: merged reasoning from memory to DB for %s message at index %d',
dbMsg.role, i)
}
}
// Batch insert with transaction for atomicity
addMessages(toInsert.map(msg => {
// Resolve tool_name from assistant's tool_calls if missing
let toolName = msg.tool_name || null
if (!toolName && msg.tool_call_id) {
toolName = toolNameMap.get(msg.tool_call_id) || null
}
return {
session_id: localSessionId,
role: msg.role,
content: msg.content || '',
tool_call_id: msg.tool_call_id || null,
tool_calls: msg.tool_calls || null,
tool_name: toolName,
timestamp: msg.timestamp || Math.floor(Date.now() / 1000),
token_count: msg.token_count || null,
finish_reason: msg.finish_reason || null,
reasoning: msg.reasoning || null,
reasoning_details: msg.reasoning_details || null,
reasoning_content: msg.reasoning_content || null,
codex_reasoning_items: msg.codex_reasoning_items || null,
}
}))
logger.info('[chat-run-socket] syncFromHermes: synced %d messages to local session %s', toInsert.length, localSessionId)
memoryIdx++
}
updateSessionStats(localSessionId)
// Record usage from Hermes session
updateUsage(localSessionId, {
inputTokens: detail.input_tokens,
outputTokens: detail.output_tokens,
cacheReadTokens: detail.cache_read_tokens,
cacheWriteTokens: detail.cache_write_tokens,
reasoningTokens: detail.reasoning_tokens,
model: detail.model,
profile: profile || 'default',
})
// Calculate usage from DB now that data is complete
// Use inputTokens already set by compression path if available
const state = this.sessionMap.get(localSessionId)
if (state) {
const messages = this.handleMessage(toInsert, localSessionId)
if (messages.length > 0) {
this.replaceByHermesSessionId(localSessionId, hermesSessionId, messages)
}
const emit = (event: string, payload: any) => {
const tagged = localSessionId ? { ...payload, localSessionId } : payload
if (localSessionId) {
this.nsp.to(`session:${localSessionId}`).emit(event, tagged)
} else if (socket.connected) {
socket.emit(event, tagged)
}
}
this.calcAndUpdateUsage(localSessionId, state, emit)
if (mergedCount > 0) {
logger.info('[chat-run-socket] syncFromHermes: merged reasoning for %d messages', mergedCount)
}
// Enqueue ephemeral session for deferred deletion
this.enqueueEphemeralDelete(hermesSessionId, profile)
})
.catch((err: any) => {
logger.warn(err, '[chat-run-socket] syncFromHermes failed for session %s (hermesId: %s, profile: %s)', localSessionId, hermesSessionId, profile || 'default')
// Batch insert with transaction for atomicity
addMessages(toInsert.map(msg => {
// Resolve tool_name from assistant's tool_calls if missing
let toolName = msg.tool_name || null
if (!toolName && msg.tool_call_id) {
toolName = toolNameMap.get(msg.tool_call_id) || null
}
return {
session_id: localSessionId,
role: msg.role,
content: msg.content || '',
tool_call_id: msg.tool_call_id || null,
tool_calls: msg.tool_calls || null,
tool_name: toolName,
timestamp: msg.timestamp || Math.floor(Date.now() / 1000),
token_count: msg.token_count || null,
finish_reason: msg.finish_reason || null,
reasoning: msg.reasoning || null,
reasoning_details: msg.reasoning_details || null,
reasoning_content: msg.reasoning_content || null,
codex_reasoning_items: msg.codex_reasoning_items || null,
}
}))
logger.info('[chat-run-socket] syncFromHermes: synced %d messages to local session %s', toInsert.length, localSessionId)
}
updateSessionStats(localSessionId)
// Record usage from Hermes session
updateUsage(localSessionId, {
inputTokens: detail.input_tokens,
outputTokens: detail.output_tokens,
cacheReadTokens: detail.cache_read_tokens,
cacheWriteTokens: detail.cache_write_tokens,
reasoningTokens: detail.reasoning_tokens,
model: detail.model,
profile: profile || 'default',
})
// Calculate usage from DB now that data is complete
// Use inputTokens already set by compression path if available
const state = this.sessionMap.get(localSessionId)
if (state) {
const messages = this.handleMessage(toInsert, localSessionId)
if (messages.length > 0) {
this.replaceByHermesSessionId(localSessionId, hermesSessionId, messages)
}
const emit = (event: string, payload: any) => {
const tagged = localSessionId ? { ...payload, localSessionId } : payload
if (localSessionId) {
this.nsp.to(`session:${localSessionId}`).emit(event, tagged)
} else if (socket.connected) {
socket.emit(event, tagged)
}
}
this.calcAndUpdateUsage(localSessionId, state, emit)
}
// Enqueue ephemeral session for deferred deletion
this.enqueueEphemeralDelete(hermesSessionId, profile)
return true
} catch (err: any) {
logger.warn(err, '[chat-run-socket] syncFromHermes failed for session %s (hermesId: %s, profile: %s)', localSessionId, hermesSessionId, profile || 'default')
return false
}
}
private replaceByHermesSessionId(session_id: string, hermesSessionId: string, newItems: SessionMessage[]) {
let start = -1
@@ -1336,6 +1472,30 @@ export class ChatRunSocket {
}
this.pushState(sessionId, event, data)
}
private emitToSession(socket: Socket, sessionId: string, event: string, payload: any) {
const tagged = { ...payload, session_id: sessionId }
this.nsp.to(`session:${sessionId}`).emit(event, tagged)
if (!this.nsp.adapter.rooms.get(`session:${sessionId}`)?.size && socket.connected) {
socket.emit(event, tagged)
}
}
/** Close all active EventSource connections and abort controllers */
close() {
for (const [sessionId, state] of this.sessionMap.entries()) {
if (state.abortController) {
try {
state.abortController.abort()
} catch (e) {
logger.warn(e, '[chat-run-socket] failed to abort controller for session %s', sessionId)
}
}
}
this.sessionMap.clear()
this.hermesSessionIds.clear()
logger.info('[chat-run-socket] closed all connections and cleared state')
}
}
/** Check if any assistant message in the list has non-empty content */
+7 -1
View File
@@ -1,6 +1,6 @@
import { logger } from './logger'
export function bindShutdown(server: any, groupChatServer?: any): void {
export function bindShutdown(server: any, groupChatServer?: any, chatRunServer?: any): void {
let isShuttingDown = false
const shutdown = async (signal: string) => {
@@ -10,6 +10,12 @@ export function bindShutdown(server: any, groupChatServer?: any): void {
logger.info('Shutting down (%s)...', signal)
try {
// Close ChatRunSocket first to abort all active runs and close EventSource connections
if (chatRunServer) {
chatRunServer.close()
logger.info('ChatRunSocket closed')
}
// Disconnect Socket.IO before HTTP server to prevent hanging
if (groupChatServer) {
groupChatServer.agentClients.disconnectAll()