2026-04-29 16:26:24 +08:00
/**
* Chat run via Socket.IO — namespace /chat-run.
*
* Replaces HTTP POST + SSE. Socket.IO decouples message handling
* from connection lifecycle: the server continues streaming upstream
* events even after the client disconnects or refreshes.
*
* Uses Socket.IO rooms keyed by session_id. On client reconnect,
* the client emits 'resume' to rejoin its session room.
*/
import type { Server , Socket } from 'socket.io'
import { EventSource } from 'eventsource'
import { setRunSession } from '../../routes/hermes/proxy-handler'
import { updateUsage } from '../../db/hermes/usage-store'
2026-05-04 19:48:40 +08:00
import { getSystemPrompt } from '../../lib/llm-prompt'
2026-04-29 16:26:24 +08:00
import {
getSession ,
getSessionDetail ,
2026-04-30 16:40:37 +08:00
getSessionDetailPaginated ,
2026-04-29 16:26:24 +08:00
createSession ,
addMessage ,
2026-05-02 20:12:58 +08:00
addMessages ,
2026-04-29 16:26:24 +08:00
updateSessionStats ,
useLocalSessionStore ,
} from '../../db/hermes/session-store'
import { getDb } from '../../db/index'
import { getSessionDetailFromDb } from '../../db/hermes/sessions-db'
import { getModelContextLength } from './model-context'
import { ChatContextCompressor , countTokens , SUMMARY_PREFIX } from '../../lib/context-compressor'
import { getCompressionSnapshot } from '../../db/hermes/compression-snapshot'
2026-05-02 08:58:14 +08:00
import { parseLLMJSON , parseToolArguments , parseAnthropicContentArray } from '../../lib/llm-json'
2026-04-29 16:26:24 +08:00
import { logger } from '../logger'
2026-05-02 15:39:01 +08:00
/**
 * One element of a multi-part chat input (Anthropic-style content array).
 *
 * - `text`  — literal text.
 * - `image` — attachment referenced by `path`; `media_type` is required.
 * - `file`  — attachment referenced by `path`; `media_type` is optional.
 */
export type ContentBlock =
  | { type: 'text'; text: string }
  | { type: 'image'; path: string; name: string; media_type: string }
  | { type: 'file'; path: string; name: string; media_type?: string }
/**
 * Serializes chat input for display/storage.
 *
 * A plain string is returned unchanged; a ContentBlock[] is returned as its
 * JSON text (JSON.stringify).
 */
function contentBlocksToString(input: string | ContentBlock[]): string {
  return typeof input === 'string' ? input : JSON.stringify(input)
}
/**
 * Extracts the plain text of a chat input, used for the session title preview.
 *
 * - string          → returned unchanged
 * - ContentBlock[]  → the `text` of every text block, joined with '\n'
 * - anything else   → '' (the input arrives from the client unvalidated, so a
 *                     malformed payload must not throw here)
 *
 * Null/non-object entries inside the array are skipped rather than throwing.
 */
function extractTextForPreview(input: string | ContentBlock[]): string {
  if (typeof input === 'string') return input
  if (!Array.isArray(input)) return ''
  return input
    .filter((block): block is Extract<ContentBlock, { type: 'text' }> => block?.type === 'text')
    .map(block => block.text)
    .join('\n')
}
/**
 * Type guard: true when `input` is a non-empty array whose first element is an
 * object carrying a `type` property. Only the first element is inspected.
 *
 * Primitive or null first elements (e.g. `['text']` from a malformed client
 * payload) yield false instead of throwing a TypeError from the `in` operator.
 */
function isContentBlockArray(input: unknown): input is ContentBlock[] {
  if (!Array.isArray(input) || input.length === 0) return false
  const first: unknown = input[0]
  return typeof first === 'object' && first !== null && 'type' in first
}
/**
 * Flattens a ContentBlock[] into the single string sent upstream as `input`.
 *
 * Text blocks are appended verbatim. Image and file blocks become inline
 * `[Image: <path>]` / `[File: <path>]` markers. No file is read and nothing
 * is base64-encoded here: attachments are referenced by path only. Blocks
 * of any other type contribute nothing.
 *
 * Declared async for its callers; nothing inside awaits.
 */
async function convertContentBlocks(blocks: ContentBlock[]): Promise<string> {
  return blocks.reduce((joined: string, block) => {
    switch (block.type) {
      case 'text':
        return joined + block.text
      case 'image':
        return joined + `[Image: ${block.path}]`
      case 'file':
        return joined + `[File: ${block.path}]`
      default:
        return joined
    }
  }, '')
}
2026-04-29 16:26:24 +08:00
// Module-level compressor shared by every session: handleRun calls
// compressor.compress(history, upstream, apiKey, session_id) when the
// assembled context exceeds its token or message-count threshold.
const compressor = new ChatContextCompressor ( )
2026-04-30 16:40:37 +08:00
// --- Helper: Convert OpenAI format to Anthropic format ---

/**
 * Synchronously flattens content blocks into prompt text. Text blocks are
 * kept verbatim; image/file blocks become `[Image: <path>]` / `[File: <path>]`
 * markers. Null entries and unknown block types are skipped.
 */
function flattenHistoryContentBlocks(blocks: any[]): string {
  let text = ''
  for (const block of blocks) {
    if (block?.type === 'text') text += block.text ?? ''
    else if (block?.type === 'image') text += `[Image: ${block.path}]`
    else if (block?.type === 'file') text += `[File: ${block.path}]`
  }
  return text
}

/**
 * Converts stored OpenAI-style history into the message list sent upstream.
 *
 * - tool      → role 'user', content `[Tool result: <content>]` (a string, per
 *               the Hermes format); the message's other fields (e.g.
 *               tool_call_id) are carried over.
 * - user      → `{ role: 'user', content }`, where content is always a string:
 *               block arrays are flattened synchronously (the previous code
 *               stored an unresolved Promise here).
 * - assistant → shallow copy.
 * - any other role, or user content that is neither string nor array → dropped.
 *
 * `reasoning_content` is stripped from every output message. The input array
 * and its message objects are not modified.
 */
function convertHistoryFormat(messages: any[]): any[] {
  const result: any[] = []
  for (const original of messages) {
    // Work on a copy so the caller's message objects keep reasoning_content.
    const m = { ...original }
    delete m.reasoning_content
    const content = m.content || ''

    if (m.role === 'tool') {
      result.push({ ...m, role: 'user', content: `[Tool result: ${content}]` })
    } else if (m.role === 'user') {
      if (typeof content === 'string') {
        result.push({ role: 'user', content })
      } else if (Array.isArray(content)) {
        result.push({ role: 'user', content: flattenHistoryContentBlocks(content) })
      }
    } else if (m.role === 'assistant') {
      result.push({ ...m })
    }
  }
  return result
}
2026-04-29 16:26:24 +08:00
// --- Session state tracking ---

/**
 * One message of a session transcript as kept in SessionState.messages.
 * handleMessage also receives persisted messages typed as this interface.
 */
interface SessionMessage {
  // identity
  id: number | string
  session_id: string
  /** ephemeral Hermes session id of the run that appended the message */
  hermesSessionId?: string

  // payload
  role: string
  content: string
  timestamp: number

  // tool calling
  tool_calls?: any[] | null
  tool_call_id?: string | null
  tool_name?: string | null

  // accounting and reasoning metadata
  token_count?: number | null
  finish_reason?: string | null
  reasoning?: string | null
  reasoning_details?: string | null
  reasoning_content?: string | null
  codex_reasoning_items?: string | null
}
2026-05-08 16:59:36 +02:00
/** A client's answer to an approval prompt; 'approval.respond' accepts exactly these values. */
type ApprovalChoice = 'deny' | 'once' | 'session' | 'always'
2026-05-07 10:34:58 +08:00
/** A 'run' request parked in SessionState.queue while its session is busy. */
interface QueuedRun {
  /** client-supplied id, or a generated `queue_…` id; matched by 'cancel_queued_run' */
  queue_id: string
  /** profile of the socket that submitted the run */
  profile: string
  input: string | ContentBlock[]
  model?: string
  instructions?: string
}
2026-04-29 16:26:24 +08:00
/** Per-session state held in ChatRunSocket.sessionMap. */
interface SessionState {
  /** in-memory transcript, sent to clients in 'resumed' */
  messages: SessionMessage[]
  /** runs waiting for the current one to finish */
  queue: QueuedRun[]

  /** true while a run is in progress */
  isWorking: boolean
  isAborting?: boolean
  /** events of the current run; included in 'resumed' only while isWorking */
  events: Array<{ event: string; data: any }>

  // handles and bookkeeping of the active run
  abortController?: AbortController
  eventSource?: EventSource
  runId?: string
  profile?: string

  // token estimates
  inputTokens?: number
  outputTokens?: number
}
// --- ChatRunSocket ---
export class ChatRunSocket {
private nsp: ReturnType<Server['of']>
private gatewayManager: any
/** sessionId → live session state: transcript, working flag, buffered events, run handles, queued runs */
private sessionMap: Map<string, SessionState> = new Map()
/** sessionId → ephemeral Hermes session id generated for that session's latest run */
private hermesSessionIds: Map<string, any> = new Map()
2026-04-29 16:26:24 +08:00
/**
 * @param io root Socket.IO server; this instance works on its '/chat-run' namespace
 * @param gatewayManager provides the per-profile upstream URL and API key
 */
constructor(io: Server, gatewayManager: any) {
  this.gatewayManager = gatewayManager
  this.nsp = io.of('/chat-run')
}
/** Installs the auth middleware and the connection handler on the namespace. */
init() {
  const namespace = this.nsp
  namespace.use((socket, next) => this.authMiddleware(socket, next))
  namespace.on('connection', (socket: Socket) => this.onConnection(socket))
  logger.info('[chat-run-socket] Socket.IO ready at /chat-run')
}
// --- Auth middleware ---
/**
 * Namespace middleware: admits a socket only when `handshake.auth.token`
 * equals the server token returned by `getToken()` from '../auth'.
 *
 * - AUTH_DISABLED set to '1' or 'true' (case-insensitive) skips the check.
 *   Any other value, including '0' or 'false', keeps authentication on.
 *   (The previous condition disabled auth for every non-empty value.)
 * - When getToken() yields no token, every socket is admitted.
 * - If loading the token fails, the failure is logged and the socket is
 *   rejected, instead of the middleware promise rejecting without next().
 */
private async authMiddleware(socket: Socket, next: (err?: Error) => void) {
  const disabledFlag = (process.env.AUTH_DISABLED ?? '').trim().toLowerCase()
  if (disabledFlag === '1' || disabledFlag === 'true') {
    return next()
  }

  const token = socket.handshake.auth?.token as string | undefined
  try {
    const { getToken } = await import('../auth')
    const serverToken = await getToken()
    if (serverToken && token !== serverToken) {
      return next(new Error('Authentication failed'))
    }
  } catch (err) {
    logger.warn(err, '[chat-run-socket] auth token lookup failed')
    return next(new Error('Authentication failed'))
  }
  next()
}
// --- Connection handler ---
/**
 * Registers this socket's event handlers.
 *
 * Payloads come straight from the client, so every handler first checks that
 * a payload object exists instead of dereferencing it blindly, and failures of
 * the async work it starts are logged rather than left as unhandled rejections.
 *
 * - 'run'               start a run, or queue it (broadcasting 'run.queued')
 *                       while the session is already working
 * - 'cancel_queued_run' remove a queued run by queue_id
 * - 'resume'            join the session room and replay its state
 * - 'abort'             abort the session's current run
 * - 'approval.respond'  forward an approval choice for the session
 *
 * @param socket newly connected socket; its profile is the handshake query
 *               `profile` (default 'default')
 */
private onConnection(socket: Socket) {
  const profile = (socket.handshake.query?.profile as string) || 'default'
  const roomOf = (sessionId: string) => `session:${sessionId}`
  // Builds a rejection handler that logs which handler failed for which session.
  const logFailure = (what: string, sessionId: string | undefined) => (err: unknown) => {
    logger.warn(err, '[chat-run-socket] %s failed for session %s', what, sessionId)
  }
  const broadcastQueueLength = (sessionId: string, queueLength: number) => {
    this.nsp.to(roomOf(sessionId)).emit('run.queued', {
      event: 'run.queued',
      session_id: sessionId,
      queue_length: queueLength,
    })
  }

  socket.on('run', async (data?: {
    input: string | ContentBlock[]
    session_id?: string
    model?: string
    instructions?: string
    queue_id?: string
  }) => {
    if (!data || typeof data !== 'object') return
    try {
      if (data.session_id) {
        const state = this.getOrCreateSession(data.session_id)
        if (state.isWorking) {
          state.queue.push({
            queue_id: data.queue_id || `queue_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`,
            input: data.input,
            model: data.model,
            instructions: data.instructions,
            profile,
          })
          broadcastQueueLength(data.session_id, state.queue.length)
          logger.info('[chat-run-socket] queued run for session %s (queue: %d)', data.session_id, state.queue.length)
          return
        }
      }
      await this.handleRun(socket, data, profile)
    } catch (err) {
      logFailure('run', data.session_id)(err)
    }
  })

  socket.on('cancel_queued_run', (data?: { session_id?: string; queue_id?: string }) => {
    if (!data?.session_id || !data.queue_id) return
    const state = this.sessionMap.get(data.session_id)
    if (!state?.queue.length) return
    const before = state.queue.length
    state.queue = state.queue.filter(item => item.queue_id !== data.queue_id)
    if (state.queue.length === before) return
    broadcastQueueLength(data.session_id, state.queue.length)
    logger.info('[chat-run-socket] cancelled queued run %s for session %s (queue: %d)',
      data.queue_id, data.session_id, state.queue.length)
  })

  socket.on('resume', (data?: { session_id?: string }) => {
    if (!data?.session_id) return
    const sid = data.session_id
    socket.join(roomOf(sid))
    this.resumeSession(socket, sid).catch(logFailure('resume', sid))
  })

  socket.on('abort', (data?: { session_id?: string }) => {
    if (!data?.session_id) return
    const sid = data.session_id
    void Promise.resolve(this.handleAbort(socket, sid)).catch(logFailure('abort', sid))
  })

  socket.on('approval.respond', (data?: { session_id?: string; choice?: ApprovalChoice; all?: boolean }) => {
    if (!data?.session_id) return
    const sid = data.session_id
    const choice = data.choice
    if (choice !== 'once' && choice !== 'session' && choice !== 'always' && choice !== 'deny') return
    void Promise.resolve(this.handleApprovalRespond(socket, sid, choice, data.all === true))
      .catch(logFailure('approval.respond', sid))
  })
}
2026-05-01 08:13:55 +08:00
/**
 * Normalizes stored session messages into the shape sent to clients in
 * 'resumed' (also the input to the token estimates in loadSessionStateFromDb).
 *
 * - Keeps only user / assistant / tool messages whose content is defined.
 * - Assistant content holding Anthropic content blocks, either serialized into
 *   a string (possibly wrapped in an extra pair of quotes) or stored as an
 *   array, is flattened into `content` text, `tool_calls` and `reasoning`.
 * - Stored tool_calls without a usable id are dropped; the rest are reduced to
 *   { id, type, function } and take precedence over parsed ones.
 * - A tool result missing its tool_call_id is matched by tool name against
 *   the tool calls of the latest assistant message (also across several
 *   consecutive tool results), using each id at most once. Tool results
 *   that still have no id are skipped.
 *
 * @param messages raw messages from the session store / DB
 * @param sid session id stamped on every returned message
 * @returns normalized messages, or [] (with a warning logged) if normalization throws
 */
private handleMessage(messages: SessionMessage[], sid: string): any[] {
  try {
    const normalized: any[] = []
    // Tool calls of the latest assistant message, used to recover missing
    // tool_call_ids on the tool results that follow it.
    let openToolCalls: any[] = []
    const claimedCallIds = new Set<string>()

    for (const m of messages) {
      const isChatRole = m.role === 'user' || m.role === 'assistant' || m.role === 'tool'
      if (!isChatRole || m.content === undefined) continue

      const msg: any = {
        id: m.id,
        session_id: sid,
        role: m.role,
        content: m.content || '',
        reasoning: m.reasoning || '',
        timestamp: m.timestamp,
      }

      // Convert Anthropic-format content to OpenAI format.
      if (m.role === 'assistant' && typeof m.content === 'string') {
        this.expandSerializedAssistantContent(msg, m)
      } else if (Array.isArray(m.content)) {
        this.applyAnthropicContentBlocks(msg, m.content)
      }

      const storedToolCalls = this.normalizeStoredToolCalls(m.tool_calls)
      if (storedToolCalls.length > 0) msg.tool_calls = storedToolCalls

      if (m.role === 'tool') {
        let callId: string | undefined = m.tool_call_id || undefined
        if (!callId) {
          const match = openToolCalls.find((tc: any) =>
            tc?.id && !claimedCallIds.has(tc.id) && tc.function?.name === m.tool_name)
          callId = match?.id
        }
        // A tool result that cannot be tied to a call is dropped.
        if (!callId) continue
        claimedCallIds.add(callId)
        msg.tool_call_id = callId
      } else {
        // A user or assistant message starts a new turn.
        openToolCalls = m.role === 'assistant' ? (msg.tool_calls ?? []) : []
        claimedCallIds.clear()
      }

      if (m.tool_name) msg.tool_name = m.tool_name
      if (m.reasoning) msg.reasoning = m.reasoning
      normalized.push(msg)
    }
    return normalized
  } catch (err) {
    logger.warn(err, '[chat-run-socket] failed to normalize messages for session %s', sid)
    return []
  }
}

/**
 * Parses assistant text that holds a serialized content-block array
 * (Hermes Gateway behavior), optionally wrapped in an extra pair of quotes.
 * Leading/trailing whitespace is ignored. On parse failure msg.content keeps
 * the raw stored text.
 */
private expandSerializedAssistantContent(msg: any, m: SessionMessage): void {
  const trimmed = m.content.trim()
  let serialized = trimmed
  // Handle double-serialized content: "\"[{...}]\"" -> "[{...}]"
  if (trimmed.length >= 2 && trimmed.startsWith('"') && trimmed.endsWith('"')) {
    serialized = trimmed.slice(1, -1)
    logger.info('[chat-run-socket] resume message %s: double-serialized, removed outer quotes', m.id)
  }
  if (!serialized.startsWith('[') || !serialized.endsWith(']')) return
  try {
    this.applyAnthropicContentBlocks(msg, parseAnthropicContentArray(serialized))
  } catch (e) {
    logger.warn(e, '[chat-run-socket] failed to parse array content for message %s, keeping original', m.id)
    msg.content = m.content
  }
}

/**
 * Flattens Anthropic content blocks onto `msg`. Text blocks are concatenated
 * into `content`. tool_use blocks become OpenAI-style `tool_calls` (string
 * input kept as-is, missing input → '{}', anything else JSON-encoded). The
 * last thinking block's text becomes `reasoning` when non-empty. `msg` is
 * only modified after all blocks were read.
 */
private applyAnthropicContentBlocks(msg: any, blocks: Iterable<any>): void {
  const texts: string[] = []
  const toolCalls: any[] = []
  let thinking: string | null = null
  for (const block of blocks) {
    switch (block?.type) {
      case 'thinking':
        thinking = block.thinking || null
        break
      case 'text':
        texts.push(block.text || '')
        break
      case 'tool_use': {
        const input = block.input
        toolCalls.push({
          id: block.id,
          type: 'function',
          function: {
            name: block.name,
            arguments: input == null ? '{}' : typeof input === 'string' ? input : JSON.stringify(input),
          },
        })
        break
      }
    }
  }
  msg.content = texts.join('')
  if (toolCalls.length > 0) msg.tool_calls = toolCalls
  if (thinking) msg.reasoning = thinking
}

/** Keeps stored tool calls with a non-empty string id, reduced to { id, type, function }. */
private normalizeStoredToolCalls(toolCalls: unknown): any[] {
  if (!Array.isArray(toolCalls)) return []
  return toolCalls
    .filter((tc: any) => typeof tc?.id === 'string' && tc.id.length > 0)
    .map((tc: any) => ({ id: tc.id, type: tc.type, function: tc.function }))
}
/**
 * Replays a session's state to one socket (the 'resume' handler has already
 * joined it to the session room).
 *
 * A session that is not in memory is loaded from the DB. The socket receives
 * 'resumed' with the transcript, the working/aborting flags, the buffered run
 * events (only while a run is in progress), token estimates and the queue length.
 */
private async resumeSession(socket: Socket, sid: string) {
  let state = this.sessionMap.get(sid)
  if (!state) {
    const loaded = await this.loadSessionStateFromDb(sid)
    // The DB load yields to the event loop; a 'run' (or another resume) may
    // have registered live state for this session meanwhile. Keep that state
    // instead of replacing it with the idle copy loaded from the DB.
    state = this.sessionMap.get(sid)
    if (!state) {
      state = loaded
      this.sessionMap.set(sid, state)
    }
  }
  socket.emit('resumed', {
    session_id: sid,
    messages: state.messages,
    isWorking: state.isWorking,
    isAborting: state.isAborting || false,
    events: state.isWorking ? state.events : [],
    inputTokens: state.inputTokens,
    outputTokens: state.outputTokens,
    queueLength: state.queue?.length || 0,
  })
  logger.info('[chat-run-socket] socket %s resumed session %s (working: %s, messages: %d)',
    socket.id, sid, state.isWorking, state.messages.length)
}
2026-05-07 10:34:58 +08:00
/**
 * Builds a fresh, idle SessionState for `sid` from persisted history.
 *
 * Messages come from the local session store (getSessionDetailPaginated) or
 * the sessions DB (getSessionDetailFromDb) and are normalized by
 * handleMessage. Token counts are countTokens estimates: user messages count
 * as input; assistant and tool messages (text plus JSON-serialized tool_calls)
 * count as output. With a compression snapshot, input starts from the summary
 * and only messages after snapshot.lastMessageIndex are counted.
 *
 * Never throws: on failure it logs a warning and returns an empty idle state.
 */
private async loadSessionStateFromDb(sid: string): Promise<SessionState> {
  try {
    const detail = useLocalSessionStore()
      ? getSessionDetailPaginated(sid)
      : await getSessionDetailFromDb(sid)
    const messages = detail?.messages ? this.handleMessage(detail.messages, sid) : []

    // Serialize tool_calls explicitly: `tool_calls + ''` produced "undefined"
    // or "[object Object]" and skewed the estimate.
    const toolCallsText = (toolCalls: unknown): string =>
      toolCalls == null ? '' : typeof toolCalls === 'string' ? toolCalls : JSON.stringify(toolCalls)
    const inputTokensOf = (list: any[]): number => list
      .filter(m => m.role === 'user')
      .reduce((sum: number, m) => sum + countTokens(m.content || ''), 0)
    const outputTokensOf = (list: any[]): number => list
      .filter(m => m.role === 'assistant' || m.role === 'tool')
      .reduce((sum: number, m) => sum + countTokens(m.content || '') + countTokens(toolCallsText(m.tool_calls)), 0)

    let inputTokens: number
    let outputTokens: number
    const snapshot = getCompressionSnapshot(sid)
    if (snapshot) {
      const newMessages = messages.slice(snapshot.lastMessageIndex + 1)
      inputTokens = countTokens(SUMMARY_PREFIX + snapshot.summary) + inputTokensOf(newMessages)
      outputTokens = outputTokensOf(newMessages)
    } else {
      inputTokens = inputTokensOf(messages)
      outputTokens = outputTokensOf(messages)
    }

    logger.info('[chat-run-socket] loaded session %s from DB (%d messages)', sid, messages.length)
    return {
      messages,
      isWorking: false,
      events: [],
      inputTokens,
      outputTokens,
      queue: [],
    }
  } catch (err) {
    logger.warn(err, '[chat-run-socket] failed to load session %s from DB', sid)
    return { messages: [], isWorking: false, events: [], queue: [] }
  }
}
2026-04-29 16:26:24 +08:00
// --- Run handler ---
private async handleRun (
socket : Socket ,
2026-05-02 15:39:01 +08:00
data : { input : string | ContentBlock [ ] ; session_id? : string ; model? : string ; instructions? : string } ,
2026-04-29 16:26:24 +08:00
profile : string ,
2026-05-07 10:34:58 +08:00
skipUserMessage = false ,
2026-04-29 16:26:24 +08:00
) {
const { input , session_id , model , instructions } = data
const upstream = this . gatewayManager . getUpstream ( profile ) . replace ( /\/$/ , '' )
const apiKey = this . gatewayManager . getApiKey ( profile ) || undefined
// Generate ephemeral session ID for Hermes (fresh session per run)
const hermesSessionId = session_id
? ` eph_ ${ Date . now ( ) . toString ( 36 ) } _ ${ Math . random ( ) . toString ( 36 ) . slice ( 2 , 8 ) } `
: undefined
const now = Math . floor ( Date . now ( ) / 1000 )
// Mark working immediately on run start, and append user message
if ( session_id ) {
2026-05-07 10:34:58 +08:00
let state = this . sessionMap . get ( session_id )
if ( ! state ) {
state = getSession ( session_id )
? await this . loadSessionStateFromDb ( session_id )
: { messages : [ ] , isWorking : false , events : [ ] , queue : [ ] }
this . sessionMap . set ( session_id , state )
}
2026-05-01 08:13:55 +08:00
this . hermesSessionIds . set ( session_id , hermesSessionId )
2026-04-29 16:26:24 +08:00
state . isWorking = true
state . profile = profile
2026-05-02 15:39:01 +08:00
2026-05-07 10:34:58 +08:00
if ( ! skipUserMessage ) {
// Convert ContentBlock[] to string for storage
const inputStr = contentBlocksToString ( input )
state . messages . push ( {
id : state.messages.length + 1 ,
session_id ,
hermesSessionId ,
role : 'user' ,
content : inputStr ,
timestamp : now ,
} )
2026-04-29 16:26:24 +08:00
2026-05-07 10:34:58 +08:00
// Create session in local DB if it doesn't exist
if ( ! getSession ( session_id ) ) {
const previewText = extractTextForPreview ( input )
const preview = previewText . replace ( /[\r\n]/g , ' ' ) . substring ( 0 , 100 )
createSession ( { id : session_id , profile , model , title : preview } )
}
2026-04-29 16:26:24 +08:00
2026-05-07 10:34:58 +08:00
// Write user message to local DB immediately
addMessage ( {
session_id ,
role : 'user' ,
content : inputStr ,
timestamp : now ,
} )
} else {
// Dequeued: write the user message into both memory and DB so the
// backend transcript keeps the same run boundary as the client.
const inputStr = contentBlocksToString ( input )
state . messages . push ( {
id : state.messages.length + 1 ,
session_id ,
hermesSessionId ,
role : 'user' ,
content : inputStr ,
timestamp : now ,
} )
if ( ! getSession ( session_id ) ) {
const previewText = extractTextForPreview ( input )
const preview = previewText . replace ( /[\r\n]/g , ' ' ) . substring ( 0 , 100 )
createSession ( { id : session_id , profile , model , title : preview } )
}
addMessage ( {
session_id ,
role : 'user' ,
content : inputStr ,
timestamp : now ,
} )
}
2026-04-29 16:26:24 +08:00
socket . join ( ` session: ${ session_id } ` )
}
// Emit helper: tag every payload with session_id
const emit = ( event : string , payload : any ) = > {
const tagged = session_id ? { . . . payload , session_id } : payload
if ( session_id ) {
this . nsp . to ( ` session: ${ session_id } ` ) . emit ( event , tagged )
} else if ( socket . connected ) {
socket . emit ( event , tagged )
}
}
try {
// Build upstream request body
const body : Record < string , any > = { input }
if ( hermesSessionId ) body . session_id = hermesSessionId
if ( model ) body . model = model
2026-05-04 19:48:40 +08:00
if ( instructions ) {
body . instructions = ` ${ getSystemPrompt ( ) } \ n ${ instructions } `
} else {
body . instructions = getSystemPrompt ( )
}
2026-04-30 20:17:38 +08:00
// Inject workspace context if set for this session
if ( session_id ) {
const sessionRow = getSession ( session_id )
if ( sessionRow ? . workspace ) {
const workspaceCtx = ` [Current working directory: ${ sessionRow . workspace } ] `
body . instructions = body . instructions
2026-05-04 19:48:40 +08:00
? ` \ n ${ workspaceCtx } \ n ${ body . instructions } `
: ` \ n ${ workspaceCtx } `
2026-04-30 20:17:38 +08:00
}
}
2026-04-29 16:26:24 +08:00
// Build conversation_history from DB if session_id is provided
if ( session_id ) {
try {
const detail = useLocalSessionStore ( )
? getSessionDetail ( session_id )
: await getSessionDetailFromDb ( session_id )
if ( detail ? . messages ? . length ) {
// Filter valid messages
const validMessages = detail . messages . filter ( m = >
( m . role === 'user' || m . role === 'assistant' || m . role === 'tool' ) && m . content !== undefined
)
// Exclude the last user message (just added in handleRun)
const lastUserMsgIndex = [ . . . validMessages ] . reverse ( ) . findIndex ( m = > m . role === 'user' )
let history : Array < {
role : string
content : string
tool_calls? : any [ ]
tool_call_id? : string
name? : string
2026-04-30 16:40:37 +08:00
reasoning_content? : string | null
2026-04-29 16:26:24 +08:00
} > = ( lastUserMsgIndex >= 0
2026-04-29 19:54:51 +08:00
? validMessages . slice ( 0 , validMessages . length - lastUserMsgIndex - 1 )
: validMessages
) . map ( ( m , idx , arr ) = > {
2026-05-01 08:13:55 +08:00
const msg : any = { role : m.role , content : m.content || '' }
2026-04-30 16:40:37 +08:00
if ( m . reasoning_content ) msg . reasoning_content = m . reasoning_content
if ( m . tool_calls ? . length ) {
// Filter out tool_calls with empty/invalid id and remove internal fields
const cleanedToolCalls = m . tool_calls
. filter ( ( tc : any ) = > tc . id && tc . id . length > 0 )
. map ( ( tc : any ) = > ( {
id : tc.id ,
type : tc . type ,
function : tc . function
} ) )
if ( cleanedToolCalls . length > 0 ) {
msg . tool_calls = cleanedToolCalls
}
}
2026-04-29 19:54:51 +08:00
// For tool messages, ensure tool_call_id exists
if ( m . role === 'tool' ) {
2026-04-30 16:40:37 +08:00
let callId = m . tool_call_id
if ( ! callId || callId . length === 0 ) {
2026-04-29 19:54:51 +08:00
// Try to reconstruct tool_call_id from previous assistant message
const prevMsg = arr [ idx - 1 ]
if ( prevMsg ? . role === 'assistant' && prevMsg . tool_calls ? . length ) {
const tc = prevMsg . tool_calls . find ( ( t : any ) = > t . function ? . name === m . tool_name )
if ( tc ? . id ) {
2026-04-30 16:40:37 +08:00
callId = tc . id
2026-04-29 19:54:51 +08:00
}
}
}
2026-04-30 16:40:37 +08:00
// Skip tool message if no valid tool_call_id
if ( ! callId || callId . length === 0 ) {
return null
}
msg . tool_call_id = callId
2026-04-29 19:54:51 +08:00
}
if ( m . tool_name ) msg . name = m . tool_name
return msg
} )
. filter ( m = > m !== null )
2026-04-29 16:26:24 +08:00
// Context compression with snapshot awareness
const contextLength = getModelContextLength ( profile )
const triggerTokens = Math . floor ( contextLength / 2 )
const cState = this . getOrCreateSession ( session_id )
// Calculate inputTokens + outputTokens from DB (unified method)
const assembledTokens = await this . calcAndUpdateUsage ( session_id , cState , emit )
const totalTokens = assembledTokens . inputTokens + assembledTokens . outputTokens
// Step 1: Check existing snapshot — if present, assemble summary + new messages
const snapshot = session_id ? getCompressionSnapshot ( session_id ) : null
if ( snapshot ) {
const newMessages = history . slice ( snapshot . lastMessageIndex + 1 )
logger . info ( '[context-compress] session=%s: snapshot at %d, %d new messages, assembled ~%d tokens (threshold %d)' ,
session_id , snapshot . lastMessageIndex , newMessages . length , totalTokens , triggerTokens )
2026-05-02 20:12:58 +08:00
// triggerTokens
2026-05-06 14:07:13 +08:00
if ( totalTokens <= triggerTokens && newMessages . length <= 200 ) {
2026-04-29 16:26:24 +08:00
// Under threshold — use assembled context directly, no LLM call needed
history = [
{ role : 'user' , content : SUMMARY_PREFIX + '\n\n' + snapshot . summary } ,
. . . newMessages ,
]
} else {
this . pushState ( session_id , 'compression.started' , {
event : 'compression.started' ,
message_count : newMessages.length ,
token_count : totalTokens ,
} )
emit ( 'compression.started' , {
event : 'compression.started' ,
message_count : newMessages.length ,
token_count : totalTokens ,
} )
try {
const result = await compressor . compress (
2026-04-30 16:40:37 +08:00
history , upstream , apiKey , session_id ,
2026-04-29 16:26:24 +08:00
)
const afterTokens = await this . calcAndUpdateUsage ( session_id , cState , emit )
this . replaceState ( session_id , 'compression.completed' , {
event : 'compression.completed' ,
compressed : result.meta.compressed ,
llmCompressed : result.meta.llmCompressed ,
totalMessages : result.meta.totalMessages ,
resultMessages : result.messages.length ,
beforeTokens : totalTokens ,
afterTokens : afterTokens.inputTokens + afterTokens . outputTokens ,
summaryTokens : result.meta.summaryTokenEstimate ,
verbatimCount : result.meta.verbatimCount ,
compressedStartIndex : result.meta.compressedStartIndex ,
} )
logger . info ( '[context-compress] AFTER session=%s: %d messages, ~%d tokens (was %d)' , session_id , result . messages . length , afterTokens . inputTokens + afterTokens . outputTokens , totalTokens )
emit ( 'compression.completed' , {
event : 'compression.completed' ,
compressed : result.meta.compressed ,
llmCompressed : result.meta.llmCompressed ,
totalMessages : result.meta.totalMessages ,
resultMessages : result.messages.length ,
beforeTokens : totalTokens ,
afterTokens : afterTokens.inputTokens + afterTokens . outputTokens ,
summaryTokens : result.meta.summaryTokenEstimate ,
verbatimCount : result.meta.verbatimCount ,
compressedStartIndex : result.meta.compressedStartIndex ,
} )
2026-04-30 16:40:37 +08:00
history = result . messages . map ( m = > {
const msg : any = {
role : m.role ,
content : m.content ,
tool_call_id : m.tool_call_id ,
name : m.name ,
}
if ( m . reasoning_content ) msg . reasoning_content = m . reasoning_content
// Filter tool_calls if present, remove internal fields
if ( m . tool_calls ? . length ) {
const cleanedToolCalls = m . tool_calls
. filter ( ( tc : any ) = > tc . id && tc . id . length > 0 )
. map ( ( tc : any ) = > ( {
id : tc.id ,
type : tc . type ,
function : tc . function
} ) )
if ( cleanedToolCalls . length > 0 ) {
msg . tool_calls = cleanedToolCalls
}
}
return msg
} )
2026-04-29 16:26:24 +08:00
// Update usage from DB (snapshot now updated by compressor)
await this . calcAndUpdateUsage ( session_id , cState , emit )
} catch ( err : any ) {
this . replaceState ( session_id , 'compression.completed' , {
event : 'compression.completed' ,
compressed : false ,
totalMessages : newMessages.length ,
resultMessages : newMessages.length ,
beforeTokens : totalTokens ,
afterTokens : totalTokens ,
summaryTokens : 0 ,
verbatimCount : newMessages.length ,
compressedStartIndex : - 1 ,
error : err.message ,
} )
logger . warn ( err , '[chat-run-socket] compression failed for session %s, using assembled context' , session_id )
emit ( 'compression.completed' , {
event : 'compression.completed' ,
compressed : false ,
totalMessages : newMessages.length ,
resultMessages : newMessages.length ,
beforeTokens : totalTokens ,
afterTokens : totalTokens ,
summaryTokens : 0 ,
verbatimCount : newMessages.length ,
compressedStartIndex : - 1 ,
error : err.message ,
} )
}
}
} else if ( history . length > 4 ) {
// No snapshot — check if raw history exceeds threshold
2026-05-06 14:07:13 +08:00
if ( totalTokens <= triggerTokens && history . length <= 200 ) {
2026-04-29 16:26:24 +08:00
// Under threshold — use raw history as-is
logger . info ( '[context-compress] session=%s: %d messages, ~%d tokens — under threshold, skip' , session_id , history . length , totalTokens )
} else {
// Over threshold — full LLM compression
logger . info ( '[context-compress] BEFORE session=%s: %d messages, ~%d tokens (threshold %d)' , session_id , history . length , totalTokens , triggerTokens )
this . pushState ( session_id , 'compression.started' , {
event : 'compression.started' ,
message_count : history.length ,
token_count : totalTokens ,
} )
emit ( 'compression.started' , {
event : 'compression.started' ,
message_count : history.length ,
token_count : totalTokens ,
} )
try {
const result = await compressor . compress (
2026-04-30 16:40:37 +08:00
history , upstream , apiKey , session_id ,
2026-04-29 16:26:24 +08:00
)
const cState = this . getOrCreateSession ( session_id )
const afterTokens = await this . calcAndUpdateUsage ( session_id , cState , emit )
this . replaceState ( session_id , 'compression.completed' , {
event : 'compression.completed' ,
compressed : result.meta.compressed ,
llmCompressed : result.meta.llmCompressed ,
totalMessages : result.meta.totalMessages ,
resultMessages : result.messages.length ,
beforeTokens : totalTokens ,
afterTokens : afterTokens.inputTokens + afterTokens . outputTokens ,
summaryTokens : result.meta.summaryTokenEstimate ,
verbatimCount : result.meta.verbatimCount ,
compressedStartIndex : result.meta.compressedStartIndex ,
} )
logger . info ( '[context-compress] AFTER session=%s: %d messages, ~%d tokens (was %d)' , session_id , result . messages . length , afterTokens . inputTokens + afterTokens . outputTokens , totalTokens )
emit ( 'compression.completed' , {
event : 'compression.completed' ,
compressed : result.meta.compressed ,
llmCompressed : result.meta.llmCompressed ,
totalMessages : result.meta.totalMessages ,
resultMessages : result.messages.length ,
beforeTokens : totalTokens ,
afterTokens : afterTokens.inputTokens + afterTokens . outputTokens ,
summaryTokens : result.meta.summaryTokenEstimate ,
verbatimCount : result.meta.verbatimCount ,
compressedStartIndex : result.meta.compressedStartIndex ,
} )
2026-04-30 16:40:37 +08:00
history = result . messages . map ( m = > {
const msg : any = {
role : m.role ,
content : m.content ,
tool_call_id : m.tool_call_id ,
name : m.name ,
}
if ( m . reasoning_content ) msg . reasoning_content = m . reasoning_content
// Filter tool_calls if present, remove internal fields
if ( m . tool_calls ? . length ) {
const cleanedToolCalls = m . tool_calls
. filter ( ( tc : any ) = > tc . id && tc . id . length > 0 )
. map ( ( tc : any ) = > ( {
id : tc.id ,
type : tc . type ,
function : tc . function
} ) )
if ( cleanedToolCalls . length > 0 ) {
msg . tool_calls = cleanedToolCalls
}
}
return msg
} )
2026-04-29 16:26:24 +08:00
await this . calcAndUpdateUsage ( session_id , cState , emit )
} catch ( err : any ) {
this . replaceState ( session_id , 'compression.completed' , {
event : 'compression.completed' ,
compressed : false ,
totalMessages : history.length ,
resultMessages : history.length ,
beforeTokens : totalTokens ,
afterTokens : totalTokens ,
summaryTokens : 0 ,
verbatimCount : history.length ,
compressedStartIndex : - 1 ,
error : err.message ,
} )
logger . warn ( err , '[chat-run-socket] compression failed for session %s, using raw history' , session_id )
emit ( 'compression.completed' , {
event : 'compression.completed' ,
compressed : false ,
totalMessages : history.length ,
resultMessages : history.length ,
beforeTokens : totalTokens ,
afterTokens : totalTokens ,
summaryTokens : 0 ,
verbatimCount : history.length ,
compressedStartIndex : - 1 ,
error : err.message ,
} )
}
}
}
body . conversation_history = history
}
} catch ( err ) {
logger . warn ( err , '[chat-run-socket] failed to load conversation history for session %s' , session_id )
}
}
const headers : Record < string , string > = { 'Content-Type' : 'application/json' }
if ( apiKey ) headers [ 'Authorization' ] = ` Bearer ${ apiKey } `
2026-05-08 16:59:36 +02:00
if ( session_id ) headers [ 'X-Hermes-Session-Key' ] = this . getGatewaySessionKey ( session_id )
2026-05-02 15:39:01 +08:00
// Convert input from ContentBlock[] to Anthropic format (with base64 images)
if ( isContentBlockArray ( input ) ) {
body . input = await convertContentBlocks ( input )
}
2026-04-30 16:40:37 +08:00
// Debug: write history to JSON file for analysis (before conversion)
// Convert conversation_history from OpenAI format to Anthropic format
if ( body . conversation_history && Array . isArray ( body . conversation_history ) ) {
2026-05-02 20:12:58 +08:00
body . conversation_history = convertHistoryFormat ( body . conversation_history )
2026-04-30 16:40:37 +08:00
}
2026-04-29 16:26:24 +08:00
const res = await fetch ( ` ${ upstream } /v1/runs ` , {
method : 'POST' ,
headers ,
body : JSON.stringify ( body ) ,
signal : AbortSignal.timeout ( 120 _000 ) ,
} )
if ( ! res . ok ) {
const text = await res . text ( ) . catch ( ( ) = > '' )
2026-05-07 10:34:58 +08:00
const queueLen = session_id ? this . sessionMap . get ( session_id ) ? . queue ? . length ? ? 0 : 0
if ( session_id ) await this . markCompleted ( socket , session_id , { event : 'run.failed' } )
emit ( 'run.failed' , { event : 'run.failed' , error : ` Upstream ${ res . status } : ${ text } ` , queue_remaining : queueLen } )
if ( session_id && queueLen > 0 ) this . dequeueNextQueuedRun ( socket , session_id )
2026-04-29 16:26:24 +08:00
return
}
const runData = await res . json ( ) as any
const runId = runData . run_id
if ( ! runId ) {
2026-05-07 10:34:58 +08:00
const queueLen = session_id ? this . sessionMap . get ( session_id ) ? . queue ? . length ? ? 0 : 0
if ( session_id ) await this . markCompleted ( socket , session_id , { event : 'run.failed' } )
emit ( 'run.failed' , { event : 'run.failed' , error : 'No run_id in upstream response' , queue_remaining : queueLen } )
if ( session_id && queueLen > 0 ) this . dequeueNextQueuedRun ( socket , session_id )
2026-04-29 16:26:24 +08:00
return
}
if ( session_id ) {
setRunSession ( runId , session_id )
}
const abortController = new AbortController ( )
if ( session_id ) {
const state = this . getOrCreateSession ( session_id )
state . isWorking = true
state . runId = runId
state . abortController = abortController
}
2026-05-07 10:34:58 +08:00
emit ( 'run.started' , {
event : 'run.started' ,
run_id : runId ,
status : runData.status ,
queue_length : session_id ? this . sessionMap . get ( session_id ) ? . queue . length || 0 : 0 ,
} )
2026-04-29 16:26:24 +08:00
// Stream upstream events via EventSource — survives socket disconnect
const eventsUrl = new URL ( ` ${ upstream } /v1/runs/ ${ runId } /events ` )
2026-04-29 23:09:18 +08:00
// Use Authorization header instead of query parameter for better compatibility
2026-05-08 16:59:36 +02:00
const eventSourceInit : any = ( apiKey || session_id ) ? {
2026-04-29 23:09:18 +08:00
fetch : ( url : string , init : any = { } ) = > fetch ( url , {
. . . init ,
headers : {
. . . ( init . headers || { } ) ,
2026-05-08 16:59:36 +02:00
. . . ( apiKey ? { Authorization : ` Bearer ${ apiKey } ` } : { } ) ,
. . . ( session_id ? { 'X-Hermes-Session-Key' : this . getGatewaySessionKey ( session_id ) } : { } ) ,
2026-04-29 23:09:18 +08:00
} ,
} ) ,
} : { }
// @ts-ignore - eventsource library types are too strict
const source = new EventSource ( eventsUrl . toString ( ) , eventSourceInit )
2026-05-05 13:03:14 +08:00
if ( session_id ) {
const state = this . getOrCreateSession ( session_id )
state . eventSource = source
}
2026-04-29 16:26:24 +08:00
2026-05-07 10:34:58 +08:00
source . onmessage = async ( event : MessageEvent ) = > {
2026-04-29 16:26:24 +08:00
try {
const parsed = JSON . parse ( event . data as string )
2026-04-30 16:40:37 +08:00
// Debug: log all events from upstream
if ( parsed . event ? . includes ( 'reasoning' ) || parsed . event ? . includes ( 'thinking' ) ) {
logger . info ( '[chat-run-socket] upstream event: %s, data: %j' , parsed . event , parsed )
} else {
logger . info ( '[chat-run-socket] upstream event: %s' , parsed . event )
}
2026-04-29 16:26:24 +08:00
2026-05-08 16:59:36 +02:00
// Surface structured approval protocol events immediately instead of
// letting the client sit in a silent working state. New API-server
// builds emit approval.request; older payloads with approval_required
// are normalized only for compatibility while the upstream rolls out.
const approvalPayload = this . normalizeApprovalRequest ( parsed , runId )
if ( approvalPayload ) {
if ( session_id ) this . replaceState ( session_id , 'approval.request' , approvalPayload )
emit ( 'approval.request' , approvalPayload )
if ( parsed . event === 'approval_required' || parsed . event === 'approval.requested' || parsed . event === 'approval.request' ) {
return
}
}
2026-04-29 16:26:24 +08:00
// Track messages into sessionMap
if ( session_id ) {
const state = this . sessionMap . get ( session_id )
if ( state ) {
const msgs = state . messages
2026-05-07 10:34:58 +08:00
const last = [ . . . msgs ] . reverse ( ) . find ( m = > m . hermesSessionId === hermesSessionId )
2026-04-29 16:26:24 +08:00
switch ( parsed . event ) {
case 'message.delta' : {
2026-05-02 08:58:14 +08:00
let deltaText = parsed . delta || ''
// Try to extract text from JSON delta (e.g., "[{\"type\":\"text\",\"text\":\"hello\"}]")
if ( deltaText . trim ( ) . startsWith ( '[' ) && deltaText . trim ( ) . endsWith ( ']' ) ) {
try {
const parsedDelta = parseAnthropicContentArray ( deltaText )
const textParts = parsedDelta
. filter ( ( b : any ) = > b . type === 'text' )
. map ( ( b : any ) = > b . text || '' )
deltaText = textParts . join ( '' )
} catch {
// If parsing fails, use delta as-is
}
}
2026-04-29 16:26:24 +08:00
if ( last ? . role === 'assistant' && last . finish_reason == null ) {
2026-05-02 08:58:14 +08:00
last . content += deltaText
2026-04-29 16:26:24 +08:00
} else {
msgs . push ( {
id : msgs.length + 1 ,
session_id ,
2026-05-01 08:13:55 +08:00
hermesSessionId ,
2026-04-29 16:26:24 +08:00
role : 'assistant' ,
2026-05-02 08:58:14 +08:00
content : deltaText ,
2026-04-29 16:26:24 +08:00
timestamp : Math.floor ( Date . now ( ) / 1000 ) ,
} )
}
break
}
case 'reasoning.delta' :
case 'thinking.delta' : {
const text = parsed . text || parsed . delta || ''
if ( ! text ) break
if ( last ? . role === 'assistant' && last . finish_reason == null ) {
last . reasoning = ( last . reasoning || '' ) + text
} else {
msgs . push ( {
id : msgs.length + 1 ,
session_id ,
role : 'assistant' ,
2026-05-01 08:13:55 +08:00
hermesSessionId ,
2026-04-29 16:26:24 +08:00
content : '' ,
reasoning : text ,
timestamp : Math.floor ( Date . now ( ) / 1000 ) ,
} )
}
break
}
case 'tool.started' : {
if ( last ? . role === 'assistant' && last . finish_reason == null ) {
last . finish_reason = 'tool_calls'
}
msgs . push ( {
id : msgs.length + 1 ,
session_id ,
role : 'tool' ,
2026-05-01 08:13:55 +08:00
hermesSessionId ,
2026-04-29 16:26:24 +08:00
content : '' ,
tool_call_id : parsed.tool_call_id || null ,
tool_name : parsed.tool || parsed . name || null ,
timestamp : Math.floor ( Date . now ( ) / 1000 ) ,
} )
break
}
case 'tool.completed' : {
2026-05-07 10:34:58 +08:00
const toolMsg = [ . . . msgs ] . reverse ( ) . find ( m = >
m . hermesSessionId === hermesSessionId && m . role === 'tool' && ! m . content
)
2026-04-29 16:26:24 +08:00
if ( toolMsg && parsed . output ) {
toolMsg . content = typeof parsed . output === 'string' ? parsed.output : JSON.stringify ( parsed . output )
}
break
}
case 'run.completed' : {
2026-04-30 16:40:37 +08:00
logger . info ( '[chat-run-socket] ENTER run.completed case, session_id: %s, messages: %d' ,
session_id , msgs . length )
2026-04-29 16:26:24 +08:00
if ( last ? . role === 'assistant' && last . finish_reason == null ) {
last . finish_reason = parsed . finish_reason || 'stop'
}
2026-04-30 16:40:37 +08:00
// Debug: log run.completed to check if reasoning is included
logger . info ( '[chat-run-socket] run.completed keys: %s' , Object . keys ( parsed ) )
2026-04-29 16:26:24 +08:00
// Finalize assistant message — if no content was streamed, use output
2026-05-07 10:34:58 +08:00
if ( parsed . output && ! runProducedAssistantText ( msgs , hermesSessionId ) ) {
2026-05-02 08:58:14 +08:00
let outputContent = parsed . output
// Parse output if it's a stringified array
if ( typeof outputContent === 'string' &&
outputContent . trim ( ) . startsWith ( '[' ) &&
outputContent . trim ( ) . endsWith ( ']' ) ) {
try {
const parsedOutput = parseAnthropicContentArray ( outputContent )
const textParts = parsedOutput
. filter ( ( b : any ) = > b . type === 'text' )
. map ( ( b : any ) = > b . text || '' )
outputContent = textParts . join ( '' )
} catch {
// If parsing fails, use output as-is
}
}
2026-04-29 16:26:24 +08:00
if ( last ? . role === 'assistant' ) {
2026-05-02 08:58:14 +08:00
last . content = outputContent
2026-04-29 16:26:24 +08:00
} else {
msgs . push ( {
id : msgs.length + 1 ,
session_id ,
2026-05-01 08:13:55 +08:00
hermesSessionId ,
2026-04-29 16:26:24 +08:00
role : 'assistant' ,
2026-05-02 08:58:14 +08:00
content : outputContent ,
2026-04-29 16:26:24 +08:00
timestamp : Math.floor ( Date . now ( ) / 1000 ) ,
} )
}
}
2026-04-30 16:40:37 +08:00
2026-05-02 08:58:14 +08:00
// Always parse output if it's an array format (for parsed_content field)
// Only extract text content (tool_calls and reasoning are already sent via other events)
if ( parsed . output && typeof parsed . output === 'string' &&
parsed . output . trim ( ) . startsWith ( '[' ) && parsed . output . trim ( ) . endsWith ( ']' ) ) {
try {
const parsedOutput = parseAnthropicContentArray ( parsed . output )
const textParts = parsedOutput
. filter ( ( b : any ) = > b . type === 'text' )
. map ( ( b : any ) = > b . text || '' )
// Set parsed_content for frontend (only text content)
parsed . parsed_content = textParts . join ( '' ) || ''
logger . info ( '[chat-run-socket] parsed output from run.completed event' )
} catch ( e ) {
logger . error ( e , '[chat-run-socket] failed to parse output from run.completed' )
}
}
2026-04-30 16:40:37 +08:00
// Parse stringified array content for all assistant messages
2026-05-02 08:58:14 +08:00
// Only extract text content (tool_calls and reasoning are already in message fields)
2026-04-30 16:40:37 +08:00
let parsedCount = 0
for ( const msg of msgs ) {
2026-05-07 10:34:58 +08:00
if ( msg . hermesSessionId === hermesSessionId &&
msg . role === 'assistant' && typeof msg . content === 'string' &&
2026-04-30 16:40:37 +08:00
msg . content . trim ( ) . startsWith ( '[' ) && msg . content . trim ( ) . endsWith ( ']' ) ) {
try {
logger . info ( '[chat-run-socket] parsing array content for message %s, content preview: %s' ,
msg . id , msg . content . slice ( 0 , 100 ) )
2026-05-02 08:58:14 +08:00
const parsedContent = parseAnthropicContentArray ( msg . content )
const textBlocks = parsedContent
. filter ( ( b : any ) = > b . type === 'text' )
. map ( ( b : any ) = > b . text || '' )
msg . content = textBlocks . join ( '' ) || ''
parsedCount ++
2026-04-30 16:40:37 +08:00
} catch ( e ) {
logger . error ( e , '[chat-run-socket] failed to parse array content for message %s' , msg . id )
}
}
}
logger . info ( '[chat-run-socket] EXIT run.completed case, parsed %d messages' , parsedCount )
// Attach the last assistant message's parsed content to fix stringified array format
2026-05-07 10:34:58 +08:00
const lastAssistantMsg = msgs . filter ( ( m : any ) = >
m . hermesSessionId === hermesSessionId && m . role === 'assistant'
) . pop ( )
2026-04-30 16:40:37 +08:00
if ( lastAssistantMsg && parsedCount > 0 ) {
parsed . parsed_content = lastAssistantMsg . content || ''
parsed . parsed_tool_calls = lastAssistantMsg . tool_calls || null
parsed . parsed_reasoning = lastAssistantMsg . reasoning || null
logger . info ( '[chat-run-socket] attached parsed content to run.completed event for message %s' , lastAssistantMsg . id )
}
2026-04-29 16:26:24 +08:00
break
}
}
}
}
if ( parsed . event === 'run.completed' || parsed . event === 'run.failed' ) {
source . close ( )
2026-05-05 13:03:14 +08:00
if ( session_id && this . sessionMap . get ( session_id ) ? . isAborting ) {
logger . info ( {
sessionId : session_id ,
runId : parsed.run_id ,
event : parsed.event ,
} , '[chat-run-socket][abort] suppressing upstream terminal event during abort' )
return
}
2026-05-07 10:34:58 +08:00
const queueLen = session_id ? this . sessionMap . get ( session_id ) ? . queue ? . length ? ? 0 : 0
if ( session_id ) await this . markCompleted ( socket , session_id , { event : parsed.event , run_id : parsed.run_id } )
// Tag the event with queue_remaining so frontend knows more runs are pending
parsed . queue_remaining = queueLen
emit ( parsed . event || 'message' , parsed )
if ( session_id && queueLen > 0 ) {
this . dequeueNextQueuedRun ( socket , session_id )
}
return
2026-04-29 16:26:24 +08:00
}
2026-05-05 13:03:14 +08:00
// Usage will be calculated after syncFromHermes completes (in markCompleted)
emit ( parsed . event || 'message' , parsed )
2026-04-29 16:26:24 +08:00
} catch { /* not JSON, skip */ }
}
source . onerror = ( ) = > {
source . close ( )
2026-05-05 13:03:14 +08:00
if ( session_id && this . sessionMap . get ( session_id ) ? . isAborting ) {
logger . info ( { sessionId : session_id } , '[chat-run-socket][abort] event source closed during abort' )
return
}
2026-05-07 10:34:58 +08:00
const queueLen = session_id ? this . sessionMap . get ( session_id ) ? . queue ? . length ? ? 0 : 0
if ( session_id ) {
void this . markCompleted ( socket , session_id , { event : 'run.failed' } ) . then ( ( ) = > {
emit ( 'run.failed' , { event : 'run.failed' , error : 'EventSource connection lost' , queue_remaining : queueLen } )
if ( queueLen > 0 ) this . dequeueNextQueuedRun ( socket , session_id )
} )
} else {
emit ( 'run.failed' , { event : 'run.failed' , error : 'EventSource connection lost' } )
}
2026-04-29 16:26:24 +08:00
}
} catch ( err : any ) {
2026-05-07 10:34:58 +08:00
const queueLen = session_id ? this . sessionMap . get ( session_id ) ? . queue ? . length ? ? 0 : 0
if ( session_id ) {
void this . markCompleted ( socket , session_id , { event : 'run.failed' } ) . then ( ( ) = > {
emit ( 'run.failed' , { event : 'run.failed' , error : err.message , queue_remaining : queueLen } )
if ( queueLen > 0 ) this . dequeueNextQueuedRun ( socket , session_id )
} )
} else {
emit ( 'run.failed' , { event : 'run.failed' , error : err.message } )
}
2026-04-29 16:26:24 +08:00
}
}
// --- Abort handler ---
2026-05-05 13:03:14 +08:00
private async handleAbort(socket: Socket, sessionId: string) {
  // How long the upstream run gets to wind down after it accepts the stop
  // request, before local teardown closes the event stream.
  const graceMs = 5000
  const state = this.sessionMap.get(sessionId)

  // No active run: reset any stale run bookkeeping and tell clients the
  // abort was a no-op so their UI can leave the "stopping" state.
  if (!state?.isWorking || !state.runId) {
    logger.info({ sessionId }, '[chat-run-socket][abort] ignored: no active run')
    if (state) {
      state.isWorking = false
      state.isAborting = false
      state.abortController = undefined
      state.eventSource = undefined
      state.runId = undefined
      state.events = []
    }
    this.emitToSession(socket, sessionId, 'abort.completed', {
      event: 'abort.completed',
      synced: false,
      ignored: true,
    })
    return
  }

  // An abort is already in flight. Running a second one would re-send stop,
  // wait again, and then call markAbortCompleted a second time. By then a
  // queued follow-up run may own state.runId / state.eventSource, and the
  // duplicate teardown would close and clear that new run. The in-flight
  // abort emits abort.completed itself.
  if (state.isAborting) {
    logger.info({ sessionId, runId: state.runId }, '[chat-run-socket][abort] ignored: abort already in progress')
    return
  }

  const runId = state.runId
  state.isAborting = true
  this.replaceState(sessionId, 'abort.started', {
    event: 'abort.started',
    run_id: runId,
    graceMs,
  })
  this.emitToSession(socket, sessionId, 'abort.started', {
    event: 'abort.started',
    run_id: runId,
    graceMs,
  })
  logger.info({ sessionId, runId }, '[chat-run-socket][abort] started')

  // Ask the upstream gateway to stop the run.
  const profile = state.profile || 'default'
  const upstream = this.gatewayManager.getUpstream(profile).replace(/\/$/, '')
  const apiKey = this.gatewayManager.getApiKey(profile) || undefined
  try {
    const headers: Record<string, string> = {
      'Content-Type': 'application/json',
      'X-Hermes-Session-Key': this.getGatewaySessionKey(sessionId),
    }
    if (apiKey) {
      headers['Authorization'] = `Bearer ${apiKey}`
    }
    logger.info({ sessionId, runId, upstream }, '[chat-run-socket][abort] calling upstream stop')
    const res = await fetch(`${upstream}/v1/runs/${runId}/stop`, {
      method: 'POST',
      headers,
      signal: AbortSignal.timeout(10_000),
    })
    if (res.ok) {
      logger.info({ sessionId, runId, graceMs }, '[chat-run-socket][abort] upstream stop accepted, waiting for graceful exit')
      // Give the upstream time to process the stop request.
      await new Promise(resolve => setTimeout(resolve, graceMs))
    } else {
      // The upstream did not accept the stop, so no graceful exit is coming.
      // Handle it the same way as a network failure.
      const text = await res.text().catch(() => '')
      logger.warn(
        { sessionId, runId, status: res.status, body: text },
        '[chat-run-socket][abort] upstream stop rejected, continuing local completion',
      )
    }
  } catch (err: any) {
    logger.warn(
      { err, sessionId, runId, error: err?.message },
      '[chat-run-socket][abort] upstream stop failed, continuing local completion',
    )
  }

  // Close the local EventSource only after the grace period, so events the
  // stopping run still streams are received (terminal events are suppressed
  // while isAborting is set).
  if (state.eventSource) {
    state.eventSource.close()
    state.eventSource = undefined
    logger.info({ sessionId, runId }, '[chat-run-socket][abort] event source closed')
  }
  state.abortController?.abort()
  await this.markAbortCompleted(socket, sessionId, runId)
}
2026-05-08 16:59:36 +02:00
private async handleApprovalRespond(socket: Socket, sessionId: string, choice: ApprovalChoice, all: boolean) {
  const state = this.sessionMap.get(sessionId)
  const runId = state?.runId

  // Without a live run there is nothing upstream to resolve. Answer the
  // requesting clients directly; this reply is not recorded in replay state.
  if (!state?.isWorking || !runId) {
    this.emitToSession(socket, sessionId, 'approval.responded', {
      event: 'approval.responded',
      choice,
      all,
      resolved: 0,
      error: 'No active run for this session',
    })
    return
  }

  // Record the outcome for reconnecting clients and broadcast it. The
  // `error` key is present only when a non-empty error message exists.
  const publish = (resolved: number, error?: string) => {
    const outcome = {
      event: 'approval.responded',
      run_id: runId,
      choice,
      all,
      resolved,
      ...(error ? { error } : {}),
    }
    this.replaceState(sessionId, 'approval.responded', outcome)
    this.emitToSession(socket, sessionId, 'approval.responded', outcome)
  }

  const profile = state.profile || 'default'
  const upstream = this.gatewayManager.getUpstream(profile).replace(/\/$/, '')
  const apiKey = this.gatewayManager.getApiKey(profile) || undefined
  const headers: Record<string, string> = {
    'Content-Type': 'application/json',
    'X-Hermes-Session-Key': this.getGatewaySessionKey(sessionId),
    ...(apiKey ? { Authorization: `Bearer ${apiKey}` } : {}),
  }

  // Bail out early when the upstream build cannot resolve approvals.
  const unsupported = await this.getApprovalCapabilityError(upstream, headers)
  if (unsupported) {
    publish(0, unsupported)
    return
  }

  let resolvedCount = 0
  let failure = ''
  try {
    const res = await fetch(`${upstream}/v1/runs/${runId}/approval`, {
      method: 'POST',
      headers,
      body: JSON.stringify({ choice, all }),
      signal: AbortSignal.timeout(30_000),
    })
    if (res.ok) {
      const payload = await res.json().catch(() => ({})) as any
      resolvedCount = Number(payload.resolved ?? 0) || 0
    } else {
      const text = await res.text().catch(() => '')
      failure = `Upstream ${res.status}: ${text}`
    }
  } catch (err: any) {
    failure = err?.message || 'Approval resolve failed'
  }
  publish(resolvedCount, failure)
}
2026-04-29 16:26:24 +08:00
/**
 * Clear a session's active-run bookkeeping after a terminal upstream event,
 * then, in local-session-store mode, pull the finished Hermes session into
 * the local DB.
 *
 * Does nothing while an abort is in progress; the abort handler finishes
 * cleanup in that case.
 *
 * @param socket Socket used when syncing results back to clients.
 * @param sessionId Local session id whose run ended.
 * @param _info Terminal event metadata (currently unused).
 */
private async markCompleted(socket: Socket, sessionId: string, _info: { event: string; run_id?: string }) {
  const state = this.sessionMap.get(sessionId)
  if (!state) return

  if (state.isAborting) {
    logger.info({
      sessionId,
      runId: state.runId,
    }, '[chat-run-socket][abort] terminal upstream event observed; abort handler will finish cleanup')
    return
  }

  // The run is over: drop live-run handles and replay buffer.
  state.isWorking = false
  state.abortController = undefined
  state.eventSource = undefined
  state.runId = undefined
  state.events = []

  // Only sync when the local store is enabled and a Hermes session was recorded.
  if (!useLocalSessionStore()) return
  const hermesId = this.hermesSessionIds.get(sessionId)
  if (!hermesId) return

  const runProfile = state.profile
  this.hermesSessionIds.delete(sessionId)
  state.profile = undefined
  await this.syncFromHermes(socket, sessionId, hermesId, runProfile, {
    maxAttempts: 4,
    delayMs: 1000,
  })
}
2026-05-07 10:34:58 +08:00
/**
 * Start the next queued run for a session, if any.
 *
 * Broadcasts `run.queued` with the remaining queue length to the session
 * room, then launches the run without awaiting it.
 *
 * @returns true when a queued run was started, false when the queue was empty.
 */
private dequeueNextQueuedRun(socket: Socket, sessionId: string, fallbackProfile = 'default') {
  const state = this.sessionMap.get(sessionId)
  if (!state || state.queue.length === 0) return false

  const next = state.queue.shift()
  if (!next) return false
  const remaining = state.queue.length

  logger.info('[chat-run-socket] dequeuing queued run for session %s (remaining: %d)', sessionId, remaining)
  this.nsp.to(`session:${sessionId}`).emit('run.queued', {
    event: 'run.queued',
    session_id: sessionId,
    queue_length: remaining,
  })

  const runProfile = next.profile || fallbackProfile
  void this.handleRun(socket, {
    input: next.input,
    session_id: sessionId,
    model: next.model,
    instructions: next.instructions,
  }, runProfile, true)
  return true
}
2026-05-05 13:03:14 +08:00
/**
 * Finish an abort: sync whatever the stopped run produced, reset run state,
 * announce `abort.completed`, and start the next queued run if one is waiting.
 *
 * @param socket Socket used for emitting and for launching a queued run.
 * @param sessionId Local session id being aborted.
 * @param runId Upstream run id that was stopped.
 */
private async markAbortCompleted(socket: Socket, sessionId: string, runId: string) {
  const state = this.sessionMap.get(sessionId)
  if (!state) return

  const hermesId = this.hermesSessionIds.get(sessionId)
  const runProfile = state.profile

  let synced = false
  if (useLocalSessionStore() && hermesId) {
    this.hermesSessionIds.delete(sessionId)
    logger.info({ sessionId, hermesId, profile: runProfile || 'default' }, '[chat-run-socket][abort] syncing stopped run from Hermes')
    synced = await this.syncFromHermes(socket, sessionId, hermesId, runProfile, {
      maxAttempts: 4,
      delayMs: 1000,
    })
  }

  // Reset all live-run bookkeeping.
  state.isWorking = false
  state.isAborting = false
  state.profile = undefined
  state.abortController = undefined
  state.eventSource = undefined
  state.runId = undefined

  // Build a fresh abort.completed payload for each consumer.
  const completedPayload = (extra: { queue_length?: number } = {}) => ({
    event: 'abort.completed',
    run_id: runId,
    synced,
    ...extra,
  })

  const next = state.queue.shift()
  if (next) {
    // A message was queued behind the aborted run. Report the queue length
    // as it was before this dequeue, then start the next run.
    const remaining = state.queue.length
    logger.info('[chat-run-socket][abort] dequeuing queued run for session %s (remaining: %d)', sessionId, remaining)
    this.replaceState(sessionId, 'abort.completed', completedPayload({ queue_length: remaining + 1 }))
    this.emitToSession(socket, sessionId, 'abort.completed', completedPayload({ queue_length: remaining + 1 }))
    this.emitToSession(socket, sessionId, 'run.queued', {
      event: 'run.queued',
      queue_length: remaining,
    })
    state.events = []
    void this.handleRun(socket, {
      input: next.input,
      session_id: sessionId,
      model: next.model,
      instructions: next.instructions,
    }, next.profile || runProfile || 'default', true)
    return
  }

  state.events = []
  this.replaceState(sessionId, 'abort.completed', completedPayload())
  this.emitToSession(socket, sessionId, 'abort.completed', completedPayload())
  logger.info({ sessionId, runId, synced }, '[chat-run-socket][abort] completed')
}
2026-04-29 16:26:24 +08:00
/**
 * Estimate a session's token usage from its stored transcript, cache the
 * figures on `state`, and emit `usage.updated`.
 *
 * Input tokens are the user messages. When a compression snapshot exists, they
 * are counted only after the snapshot's `lastMessageIndex`, plus the summary
 * (prefixed with SUMMARY_PREFIX). Output tokens are the content of
 * assistant/tool messages in the same range, plus their serialized tool_calls.
 *
 * @param sid Session id to read from the local store or Hermes DB.
 * @param state In-memory session state that receives the totals.
 * @param emit Emitter used to broadcast `usage.updated`.
 * @returns `{ inputTokens, outputTokens }`; both 0 if the calculation fails
 *   (the failure is logged, not thrown).
 */
private async calcAndUpdateUsage(
  sid: string,
  state: SessionState,
  emit: (event: string, payload: any) => void,
): Promise<{ inputTokens: number; outputTokens: number }> {
  // tool_calls may arrive as a JSON string or as parsed objects. Plain
  // string concatenation turned arrays of objects into "[object Object]" and
  // null/undefined into "null"/"undefined", which miscounted tokens.
  const serializeToolCalls = (toolCalls: unknown): string => {
    if (toolCalls == null) return ''
    if (typeof toolCalls === 'string') return toolCalls
    if (Array.isArray(toolCalls) && toolCalls.length === 0) return ''
    return JSON.stringify(toolCalls) ?? ''
  }

  try {
    const detail = useLocalSessionStore()
      ? getSessionDetail(sid)
      : await getSessionDetailFromDb(sid)
    const msgs = detail?.messages
      ?.filter(m => m.role === 'user' || m.role === 'assistant' || m.role === 'tool') || []
    const snapshot = getCompressionSnapshot(sid)

    // With a compression snapshot, earlier messages are represented by the
    // summary, so only the messages after the snapshot point are counted.
    let inputTokens = 0
    let counted = msgs
    if (snapshot && msgs.length) {
      counted = msgs.slice(snapshot.lastMessageIndex + 1)
      inputTokens += countTokens(SUMMARY_PREFIX + snapshot.summary)
    }

    inputTokens += counted
      .filter(m => m.role === 'user')
      .reduce((sum, m) => sum + countTokens(m.content || ''), 0)
    const outputTokens = counted
      .filter(m => m.role === 'assistant' || m.role === 'tool')
      .reduce((sum, m) => sum + countTokens(m.content || '') + countTokens(serializeToolCalls(m.tool_calls)), 0)

    state.inputTokens = inputTokens
    state.outputTokens = outputTokens
    emit('usage.updated', {
      event: 'usage.updated',
      session_id: sid,
      inputTokens,
      outputTokens,
    })
    return { inputTokens, outputTokens }
  } catch (err: any) {
    logger.warn(err, '[chat-run-socket] failed to calculate usage for session %s', sid)
    return { inputTokens: 0, outputTokens: 0 }
  }
}
/**
* Read complete messages from Hermes state.db for the ephemeral session
* and write to local DB. This gives us tool results that SSE events don't include.
* After sync, enqueues the ephemeral session for deletion.
*/
2026-05-05 13:03:14 +08:00
private async syncFromHermes (
socket : Socket ,
localSessionId : string ,
hermesSessionId : string ,
profile? : string ,
options ? : { maxAttempts? : number ; delayMs? : number } ,
) : Promise < boolean > {
const maxAttempts = options ? . maxAttempts || 1
const delayMs = options ? . delayMs || 0
try {
let detail : Awaited < ReturnType < typeof getSessionDetailFromDb > > | null = null
for ( let attempt = 1 ; attempt <= maxAttempts ; attempt ++ ) {
detail = await getSessionDetailFromDb ( hermesSessionId )
2026-04-29 16:26:24 +08:00
if ( ! detail || ! detail . messages ? . length ) {
2026-05-05 13:03:14 +08:00
logger . warn ( '[chat-run-socket] syncFromHermes: no data for Hermes session %s (attempt %d/%d)' , hermesSessionId , attempt , maxAttempts )
logger . info ( { localSessionId , hermesSessionId , attempt , maxAttempts } , '[chat-run-socket][abort] sync waiting for Hermes data' )
if ( attempt < maxAttempts && delayMs > 0 ) {
await new Promise ( resolve = > setTimeout ( resolve , delayMs ) )
continue
}
this . enqueueEphemeralDelete ( hermesSessionId , profile )
return false
2026-04-29 16:26:24 +08:00
}
2026-05-05 13:03:14 +08:00
break
}
if ( ! detail ) return false
2026-05-07 10:34:58 +08:00
// Skip user messages for DB insert; they are already written in handleRun.
// Keep them in memory replacement so replacing an ephemeral run does not
// delete the queued user message from state.messages.
2026-05-05 13:03:14 +08:00
const toInsert = detail . messages . filter ( m = > m . role !== 'user' )
2026-05-07 10:34:58 +08:00
const toReplaceInMemory = detail . messages
2026-05-05 13:03:14 +08:00
// Build tool_call_id → function.name lookup from assistant messages
// (Hermes stores tool_name as NULL, name lives inside tool_calls JSON)
const toolNameMap = new Map < string , string > ( )
for ( const msg of detail . messages ) {
if ( msg . role === 'assistant' && Array . isArray ( msg . tool_calls ) ) {
for ( const tc of msg . tool_calls ) {
const id = tc . id || tc . call_id || tc . tool_call_id
const name = tc . function ? . name || tc . name
if ( id && name ) toolNameMap . set ( id , name )
2026-04-29 16:26:24 +08:00
}
}
2026-05-05 13:03:14 +08:00
}
2026-04-29 16:26:24 +08:00
2026-05-05 13:03:14 +08:00
if ( toInsert . length > 0 ) {
// Get in-memory messages to preserve reasoning that was streamed via SSE
const state = this . sessionMap . get ( localSessionId )
const memoryMessages = state ? . messages || [ ]
logger . info ( '[chat-run-socket] syncFromHermes: memory has %d messages, DB has %d messages' ,
memoryMessages . length , toInsert . length )
// Match messages by order since Hermes DB and memory should have same sequence
let memoryIdx = 0
let mergedCount = 0
for ( let i = 0 ; i < toInsert . length && memoryIdx < memoryMessages . length ; i ++ ) {
const dbMsg = toInsert [ i ]
// Skip user messages in memory when matching
while ( memoryIdx < memoryMessages . length && memoryMessages [ memoryIdx ] . role === 'user' ) {
2026-04-30 16:40:37 +08:00
memoryIdx ++
}
2026-05-05 13:03:14 +08:00
if ( memoryIdx >= memoryMessages . length ) break
const memoryMsg = memoryMessages [ memoryIdx ]
// Only merge if roles match
if ( dbMsg . role === memoryMsg . role ) {
// Merge reasoning from memory if DB doesn't have it
if ( ! dbMsg . reasoning && memoryMsg . reasoning ) {
dbMsg . reasoning = memoryMsg . reasoning
mergedCount ++
logger . info ( '[chat-run-socket] syncFromHermes: merged reasoning from memory to DB for %s message at index %d' ,
dbMsg . role , i )
2026-05-02 20:12:58 +08:00
}
2026-05-05 13:03:14 +08:00
}
memoryIdx ++
2026-04-29 16:26:24 +08:00
}
2026-05-05 13:03:14 +08:00
if ( mergedCount > 0 ) {
logger . info ( '[chat-run-socket] syncFromHermes: merged reasoning for %d messages' , mergedCount )
}
2026-04-29 16:26:24 +08:00
2026-05-05 13:03:14 +08:00
// Batch insert with transaction for atomicity
addMessages ( toInsert . map ( msg = > {
// Resolve tool_name from assistant's tool_calls if missing
let toolName = msg . tool_name || null
if ( ! toolName && msg . tool_call_id ) {
toolName = toolNameMap . get ( msg . tool_call_id ) || null
2026-05-01 08:13:55 +08:00
}
2026-05-05 13:03:14 +08:00
return {
session_id : localSessionId ,
role : msg.role ,
content : msg.content || '' ,
tool_call_id : msg.tool_call_id || null ,
tool_calls : msg.tool_calls || null ,
tool_name : toolName ,
timestamp : msg.timestamp || Math . floor ( Date . now ( ) / 1000 ) ,
token_count : msg.token_count || null ,
finish_reason : msg.finish_reason || null ,
reasoning : msg.reasoning || null ,
reasoning_details : msg.reasoning_details || null ,
reasoning_content : msg.reasoning_content || null ,
codex_reasoning_items : msg.codex_reasoning_items || null ,
2026-04-29 16:26:24 +08:00
}
2026-05-05 13:03:14 +08:00
} ) )
2026-04-29 16:26:24 +08:00
2026-05-05 13:03:14 +08:00
logger . info ( '[chat-run-socket] syncFromHermes: synced %d messages to local session %s' , toInsert . length , localSessionId )
}
updateSessionStats ( localSessionId )
// Record usage from Hermes session
updateUsage ( localSessionId , {
inputTokens : detail.input_tokens ,
outputTokens : detail.output_tokens ,
cacheReadTokens : detail.cache_read_tokens ,
cacheWriteTokens : detail.cache_write_tokens ,
reasoningTokens : detail.reasoning_tokens ,
model : detail.model ,
profile : profile || 'default' ,
2026-04-29 16:26:24 +08:00
} )
2026-05-05 13:03:14 +08:00
// Calculate usage from DB now that data is complete
// Use inputTokens already set by compression path if available
const state = this . sessionMap . get ( localSessionId )
if ( state ) {
2026-05-07 10:34:58 +08:00
const messages = this . handleMessage ( toReplaceInMemory , localSessionId )
2026-05-05 13:03:14 +08:00
if ( messages . length > 0 ) {
this . replaceByHermesSessionId ( localSessionId , hermesSessionId , messages )
}
const emit = ( event : string , payload : any ) = > {
const tagged = localSessionId ? { . . . payload , localSessionId } : payload
if ( localSessionId ) {
this . nsp . to ( ` session: ${ localSessionId } ` ) . emit ( event , tagged )
} else if ( socket . connected ) {
socket . emit ( event , tagged )
}
}
this . calcAndUpdateUsage ( localSessionId , state , emit )
}
// Enqueue ephemeral session for deferred deletion
this . enqueueEphemeralDelete ( hermesSessionId , profile )
return true
} catch ( err : any ) {
logger . warn ( err , '[chat-run-socket] syncFromHermes failed for session %s (hermesId: %s, profile: %s)' , localSessionId , hermesSessionId , profile || 'default' )
return false
}
2026-04-29 16:26:24 +08:00
}
2026-05-01 08:13:55 +08:00
/**
 * Replace, in place, the first contiguous run of in-memory messages tagged with
 * `hermesSessionId` by `newItems`.
 *
 * If `newItems` contains no user message, the user messages from the replaced run are
 * kept and placed ahead of `newItems`. Nothing happens when the session is unknown or
 * no message carries that Hermes session id.
 *
 * @param session_id Local session whose message list is edited.
 * @param hermesSessionId Hermes session id identifying the run to replace.
 * @param newItems Messages to splice in.
 */
private replaceByHermesSessionId(session_id: string, hermesSessionId: string, newItems: SessionMessage[]) {
  const messages = this.sessionMap.get(session_id)?.messages || []

  // Locate the start of the first run tagged with this Hermes session id.
  const first = messages.findIndex(m => m.hermesSessionId === hermesSessionId)
  if (first < 0) return

  // Extend to the end of that contiguous run; later, separate runs are ignored.
  let last = first
  while (last + 1 < messages.length && messages[last + 1].hermesSessionId === hermesSessionId) {
    last++
  }

  const run = messages.slice(first, last + 1)
  const replacement = newItems.some(item => item.role === 'user')
    ? newItems
    : [...run.filter(item => item.role === 'user'), ...newItems]

  messages.splice(first, run.length, ...replacement)
}
2026-04-29 16:26:24 +08:00
/**
 * Enqueue an ephemeral Hermes session for deferred deletion.
 *
 * Inserts a `pending` row into `gc_pending_session_deletes`. An existing row for the same
 * `session_id` is left untouched (`ON CONFLICT(session_id) DO NOTHING`). Returns without
 * doing anything when no database handle is available.
 *
 * This is best-effort: any failure is logged and never propagated to the caller.
 *
 * @param hermesSessionId Hermes session id to delete later.
 * @param profile Hermes profile name; `'default'` when omitted or empty.
 */
private enqueueEphemeralDelete(hermesSessionId: string, profile?: string) {
  try {
    const db = getDb()
    if (!db) return
    const now = Date.now()
    db.prepare(
      `INSERT INTO gc_pending_session_deletes (session_id, profile_name, status, attempt_count, last_error, created_at, updated_at, next_attempt_at)
       VALUES (?, ?, 'pending', 0, NULL, ?, ?, ?)
       ON CONFLICT(session_id) DO NOTHING`,
    ).run(hermesSessionId, profile || 'default', now, now, now)
    logger.info('[chat-run-socket] enqueued ephemeral session %s for deletion', hermesSessionId)
  } catch (err) {
    // Stay best-effort, but do not fail silently: otherwise a session that never got a
    // deletion record would leave no trace in the logs.
    logger.warn(err, '[chat-run-socket] failed to enqueue ephemeral session %s for deletion', hermesSessionId)
  }
}
2026-05-08 16:59:36 +02:00
/**
 * Check whether the upstream Hermes Agent API supports the run approval control plane.
 *
 * Sends `GET {upstream}/v1/capabilities` with a 10 s timeout. Support is confirmed only when
 * `features.approval_events === true`, `features.run_approval_response === true` and
 * `endpoints.run_approval.path` is truthy.
 *
 * @param upstream Base URL of the Hermes Agent API.
 * @param headers Request headers sent to the capabilities endpoint.
 * @returns `null` when approvals are supported, otherwise a human-readable reason. Network,
 *   timeout and JSON-parse failures are converted into a reason string rather than thrown.
 */
private async getApprovalCapabilityError(upstream: string, headers: Record<string, string>): Promise<string | null> {
  try {
    const res = await fetch(`${upstream}/v1/capabilities`, {
      method: 'GET',
      headers,
      signal: AbortSignal.timeout(10_000),
    })
    if (!res.ok) {
      // The error body is never read. Cancel it so the underlying connection is released
      // now instead of whenever the response is garbage-collected.
      await res.body?.cancel().catch(() => {})
      return `Hermes Agent API does not advertise approval support; upgrade to Hermes Agent main or a build containing NousResearch/hermes-agent#21899. (/v1/capabilities returned ${res.status})`
    }
    // A malformed body is treated as "no capabilities advertised".
    const caps = await res.json().catch(() => ({})) as any
    const features = caps?.features || {}
    const endpoints = caps?.endpoints || {}
    if (features.approval_events === true && features.run_approval_response === true && endpoints.run_approval?.path) {
      return null
    }
    return 'Hermes Agent API does not support run approval control plane; upgrade to Hermes Agent main or a build containing NousResearch/hermes-agent#21899.'
  } catch (err: any) {
    return `Unable to verify Hermes Agent approval capabilities: ${err?.message || 'capability request failed'}`
  }
}
2026-04-29 16:26:24 +08:00
2026-04-30 16:40:37 +08:00
2026-05-08 16:59:36 +02:00
/**
 * Build the stable gateway approval/memory key for a Web UI chat session.
 * The key is `webui:<sessionId>` with every CR, LF and NUL character replaced by `_`.
 */
private getGatewaySessionKey(sessionId: string): string {
  const rawKey = `webui:${sessionId}`
  return rawKey.replace(/[\r\n\x00]/g, '_')
}
/**
 * Normalize an upstream approval event into an `approval.request` payload.
 *
 * An event qualifies when its `event` is `approval_required`, `approval.requested` or
 * `approval.request`, or when its status (top-level `status`, else `data.status`, else
 * `output.status`) is `approval_required`. Detail fields are read from `data` if it is an
 * object, otherwise from `output` if it is an object, otherwise from the event itself.
 *
 * @param parsed Parsed upstream event.
 * @param runId Fallback run id when the event has no `run_id`.
 * @returns The normalized request, or `null` when the event is not an approval request.
 */
private normalizeApprovalRequest(parsed: any, runId?: string): any | null {
  const eventName = parsed?.event
  const status = parsed?.status || parsed?.data?.status || parsed?.output?.status
  const approvalEventNames = ['approval_required', 'approval.requested', 'approval.request']
  if (!approvalEventNames.includes(eventName) && status !== 'approval_required') {
    return null
  }

  let source = parsed
  if (parsed?.data && typeof parsed.data === 'object') {
    source = parsed.data
  } else if (parsed?.output && typeof parsed.output === 'object') {
    source = parsed.output
  }

  const defaultChoices = ['once', 'session', 'always', 'deny']
  return {
    event: 'approval.request',
    run_id: parsed?.run_id || runId,
    timestamp: parsed?.timestamp || Date.now() / 1000,
    command: source?.command,
    description: source?.description,
    pattern_key: source?.pattern_key,
    pattern_keys: source?.pattern_keys,
    choices: Array.isArray(source?.choices) ? source.choices : defaultChoices,
    message: source?.message,
  }
}
2026-04-29 16:26:24 +08:00
/**
 * Return the state registered for `sessionId` in `sessionMap`. On first use, register and
 * return a fresh empty state (no messages, not working, no events, empty queue).
 */
private getOrCreateSession(sessionId: string): SessionState {
  const existing = this.sessionMap.get(sessionId)
  if (existing) return existing

  const created: SessionState = { messages: [], isWorking: false, events: [], queue: [] }
  this.sessionMap.set(sessionId, created)
  return created
}
/**
 * Record a state event for `sessionId` so it can be replayed on reconnect.
 * The session state is created if it does not exist yet.
 */
private pushState(sessionId: string, event: string, data: any) {
  this.getOrCreateSession(sessionId).events.push({ event, data })
}
/**
 * Overwrite the most recent state event named `event` for `sessionId`. If the session has
 * no such event (or no state yet), append one instead; `pushState` creates the session state
 * when needed.
 *
 * The search runs from the end, so the newest matching entry is the one overwritten. Replay
 * therefore ends with `data` even when earlier duplicates were recorded via `pushState`.
 */
private replaceState(sessionId: string, event: string, data: any) {
  const events = this.sessionMap.get(sessionId)?.events
  if (events) {
    for (let i = events.length - 1; i >= 0; i--) {
      if (events[i]?.event === event) {
        events[i] = { event, data }
        return
      }
    }
  }
  this.pushState(sessionId, event, data)
}
2026-05-05 13:03:14 +08:00
/**
 * Emit `event` to the `session:<sessionId>` room, with `session_id` added to the payload.
 * If the room currently has no members, also emit directly on `socket`, provided it is
 * still connected.
 */
private emitToSession(socket: Socket, sessionId: string, event: string, payload: any) {
  const room = `session:${sessionId}`
  const tagged = { ...payload, session_id: sessionId }
  this.nsp.to(room).emit(event, tagged)

  const roomIsEmpty = !this.nsp.adapter.rooms.get(room)?.size
  if (roomIsEmpty && socket.connected) {
    socket.emit(event, tagged)
  }
}
/**
 * Abort the AbortController of every session that has one, then clear `sessionMap` and
 * `hermesSessionIds`. A failing abort is logged and does not stop the rest of the shutdown.
 */
close() {
  this.sessionMap.forEach((state, sessionId) => {
    const controller = state.abortController
    if (!controller) return
    try {
      controller.abort()
    } catch (e) {
      logger.warn(e, '[chat-run-socket] failed to abort controller for session %s', sessionId)
    }
  })
  this.sessionMap.clear()
  this.hermesSessionIds.clear()
  logger.info('[chat-run-socket] closed all connections and cleared state')
}
2026-04-29 16:26:24 +08:00
}
2026-05-07 10:34:58 +08:00
/**
 * Report whether the ephemeral run identified by `hermesSessionId` has already produced an
 * assistant message whose content is non-blank after trimming.
 *
 * Matching uses strict equality. When `hermesSessionId` is undefined, only messages that
 * have no `hermesSessionId` are considered.
 */
function runProducedAssistantText(messages: SessionMessage[], hermesSessionId?: string): boolean {
  for (const m of messages) {
    if (m.hermesSessionId !== hermesSessionId) continue
    if (m.role !== 'assistant') continue
    if (m.content?.trim()) return true
  }
  return false
}