feat: add Anthropic format conversion for chat runs and improvements (#347)
* fix: improve chat compression and tool display Context Compression Fixes: - Remove duplicate token calculation in compress() - Simplify compress() to only execute compression, not judge - Add buildConversationHistory() to preserve tool calls in LLM context - Remove unused estimateMessagesTokens() and contextLength parameter - Move all judgment logic to chat-run-socket.ts (uses accurate DB tokens) Tool Call Display Improvements: - Add tool execution duration display (format: 1.272s) - Add success/error status icons with circular backgrounds - Replace text error with SVG icon (X in red circle) - Replace old checkmark with polished green checkmark icon - Add i18n key 'chat.executionDuration' for all locales Bug Fixes: - Fix streaming-indicator stuck by adding try-finally in handleEvent - Add debug logging for compression flow diagnosis - Fix template syntax error in MessageList.vue Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(chat): convert conversation history to Anthropic format before sending to Gateway - Add convertToAnthropicFormat() to transform OpenAI format to Anthropic format - Handle DeepSeek reasoning_content in thinking blocks - Properly convert tool_use and tool_result blocks - Add convertFromAnthropicFormat() for parsing SSE responses - Handle stringified Python arrays in resume messages - Record debug history files for troubleshooting (original vs converted) - Fix tool_call_id validation to prevent empty ID errors - Clean internal Hermes fields (call_id, response_item_id) from tool_calls Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(chat): optimize message parsing and add debug logging - Only check for stringified arrays in assistant messages (performance) - Improve parsing error handling: keep original content on parse failure - Add debug logging for upstream events (reasoning/thinking tracking) - Log run.completed event keys for troubleshooting Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * 
feat(chat): add message pagination and reasoning sync improvements **Message Pagination:** - Add getSessionDetailPaginated() for paginated message loading - Query with DESC order then reverse in code for optimal performance - Remove listSessionsPaginated() (not needed) **Reasoning Sync:** - Add bidirectional reasoning merge in syncFromHermes - Memory → DB: preserve streamed reasoning from SSE events - DB → Memory: restore reasoning if Hermes Gateway fixes storage - Send resumed event after sync completes with complete messages - Fix reasoning field inconsistency: use unified 'reasoning' field **Message Parsing:** - Only parse stringified arrays for assistant messages (performance) - Improve parse error handling: keep original content on failure - Add debug logging for upstream reasoning/thinking events **Bug Fixes:** - Fix reasoning content display: now works on both SSE and resume - Ensure reasoning is preserved across page refreshes via sync + resumed event Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix: increase default pagination limit for messages to 500 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix: remove auto-resumed event trigger and clean up debug code - Remove automatic resumed event trigger in syncFromHermes to avoid timing issues - Clean up unused imports (fs, join) - Remove debug history file logging code - Fix socket parameter passing in handleAbort, markCompleted, and syncFromHermes - Change usage emit from room broadcast to socket-only emit - Remove console.log debug statement Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix: use reasoning field in convertToAnthropicFormat Change convertToAnthropicFormat to read from reasoning field instead of reasoning_content for consistency with database schema and frontend. 
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat: parse stringified array content and improve logs - Parse stringified array format in run.completed to extract thinking/text/tool_use - Send parsed content to frontend via parsed_content/parsed_reasoning/parsed_tool_calls - Frontend updates last assistant message with parsed content - Remove ellipsis from log messages, show full content - Add detailed logging for conversion and parsing Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix: move finalOutputTrimmed outside else block * fix(chat): handle double-serialized content in resumeSession - Remove outer quotes before parsing stringified array format - Updated changelog for v0.5.2 and v0.5.3 with multilingual support - Fixed message pagination with DESC query + array reverse Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(chat): improve error logging for resume parsing - Add detailed logging for double-serialized content parsing - Log content preview when parsing fails to diagnose issues Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * revert(chat): use simple Python-to-JSON replacement - Revert to simple .replace(/'/g, '"') approach - Parsing failures will keep original content as-is Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -15,6 +15,7 @@ import { updateUsage } from '../../db/hermes/usage-store'
|
||||
import {
|
||||
getSession,
|
||||
getSessionDetail,
|
||||
getSessionDetailPaginated,
|
||||
createSession,
|
||||
addMessage,
|
||||
updateSessionStats,
|
||||
@@ -29,6 +30,98 @@ import { logger } from '../logger'
|
||||
|
||||
const compressor = new ChatContextCompressor()
|
||||
|
||||
// --- Helper: Convert OpenAI format to Anthropic format ---
|
||||
/**
 * Convert an OpenAI-style chat history into Anthropic-style messages.
 *
 * - `assistant` messages become a block array: an optional `thinking` block
 *   (taken from `reasoning`, falling back to `reasoning_content`), then the
 *   text / pre-built content blocks, then one `tool_use` block per tool call
 *   that has both an `id` and a `function`.
 * - `tool` messages become `tool_result` blocks inside a `user` message;
 *   consecutive tool results are merged into the same `user` message.
 *   Tool messages without a `tool_call_id` are skipped, because a result with
 *   an empty `tool_use_id` cannot be matched to any `tool_use` block.
 * - `user` messages are passed through (array content is shallow-copied).
 * - Messages with any other role are dropped.
 *
 * The input array and the content arrays inside it are never mutated.
 *
 * @param messages OpenAI-format messages (`role`, `content`, optional
 *   `tool_calls`, `tool_call_id`, `reasoning` / `reasoning_content`).
 * @returns A new array of Anthropic-format messages.
 */
function convertToAnthropicFormat(messages: any[]): any[] {
  const result: any[] = []

  for (const m of messages) {
    if (m == null) continue

    const role = m.role
    const content = m.content || ''

    if (role === 'assistant') {
      const blocks: any[] = []

      // Add thinking block if reasoning exists (either field name is accepted)
      const reasoning = m.reasoning || m.reasoning_content
      if (reasoning) {
        blocks.push({ type: 'thinking', thinking: reasoning })
      }

      // Add text content (string) or already-converted blocks (array)
      if (typeof content === 'string') {
        if (content) blocks.push({ type: 'text', text: content })
      } else if (Array.isArray(content)) {
        blocks.push(...content)
      }

      // Add tool_use blocks
      if (Array.isArray(m.tool_calls)) {
        for (const tc of m.tool_calls) {
          if (!tc?.id || !tc.function) continue

          let input: unknown = tc.function.arguments || '{}'
          if (typeof input === 'string') {
            try {
              input = JSON.parse(input)
            } catch {
              input = {}
            }
          }
          // tool_use input must be a JSON object; arrays/primitives/null are replaced
          if (typeof input !== 'object' || input === null || Array.isArray(input)) {
            input = {}
          }

          blocks.push({
            type: 'tool_use',
            id: tc.id,
            name: tc.function.name,
            input,
          })
        }
      }

      // An assistant turn must carry at least one block
      if (blocks.length === 0) {
        blocks.push({ type: 'text', text: '' })
      }

      result.push({ role: 'assistant', content: blocks })
      continue
    }

    if (role === 'tool') {
      // A result that cannot reference a tool_use id is unusable upstream
      if (!m.tool_call_id) continue

      const toolContent = content || '(no output)'
      const toolResult = {
        type: 'tool_result',
        tool_use_id: m.tool_call_id,
        content: typeof toolContent === 'string' ? toolContent : JSON.stringify(toolContent),
      }

      // Merge with the previous user message if it ends with a tool_result
      const prev = result.length > 0 ? result[result.length - 1] : undefined
      const prevBlocks = prev && prev.role === 'user' && Array.isArray(prev.content) ? prev.content : null
      if (prevBlocks && prevBlocks.length > 0 && prevBlocks[prevBlocks.length - 1]?.type === 'tool_result') {
        prevBlocks.push(toolResult)
      } else {
        result.push({ role: 'user', content: [toolResult] })
      }
      continue
    }

    // Regular user message
    if (role === 'user') {
      if (typeof content === 'string') {
        result.push({ role: 'user', content: content || '(empty message)' })
      } else if (Array.isArray(content)) {
        // Copy so that a later tool_result merge cannot mutate the caller's array
        result.push({ role: 'user', content: [...content] })
      } else {
        // Unexpected shape: keep the turn instead of silently dropping it
        result.push({ role: 'user', content: JSON.stringify(content) ?? '(empty message)' })
      }
      continue
    }
  }

  return result
}
|
||||
|
||||
// --- Session state tracking ---
|
||||
|
||||
interface SessionMessage {
|
||||
@@ -113,108 +206,297 @@ export class ChatRunSocket {
|
||||
const sid = data.session_id
|
||||
const room = `session:${sid}`
|
||||
socket.join(room)
|
||||
|
||||
let state = this.sessionMap.get(sid)
|
||||
|
||||
// Not in memory — load from DB
|
||||
if (!state) {
|
||||
try {
|
||||
const detail = useLocalSessionStore()
|
||||
? getSessionDetail(sid)
|
||||
: await getSessionDetailFromDb(sid)
|
||||
const messages = detail?.messages?.length
|
||||
? detail.messages
|
||||
.filter(m => (m.role === 'user' || m.role === 'assistant' || m.role === 'tool') && m.content !== undefined)
|
||||
.map((m, idx, arr) => {
|
||||
const msg: any = {
|
||||
id: m.id,
|
||||
session_id: sid,
|
||||
role: m.role,
|
||||
content: m.content || '',
|
||||
timestamp: m.timestamp,
|
||||
}
|
||||
if (m.tool_calls?.length) msg.tool_calls = m.tool_calls
|
||||
|
||||
// For tool messages, ensure tool_call_id exists
|
||||
if (m.role === 'tool') {
|
||||
if (m.tool_call_id) {
|
||||
msg.tool_call_id = m.tool_call_id
|
||||
} else {
|
||||
// Try to reconstruct tool_call_id from previous assistant message
|
||||
const prevMsg = arr[idx - 1]
|
||||
if (prevMsg?.role === 'assistant' && prevMsg.tool_calls?.length) {
|
||||
// Find matching tool_call by tool_name
|
||||
const tc = prevMsg.tool_calls.find((t: any) => t.function?.name === m.tool_name)
|
||||
if (tc?.id) {
|
||||
msg.tool_call_id = tc.id
|
||||
} else {
|
||||
// Cannot reconstruct - skip this tool message
|
||||
return null
|
||||
}
|
||||
} else {
|
||||
// No previous assistant message with tool_calls - skip
|
||||
return null
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (m.tool_name) msg.tool_name = m.tool_name
|
||||
if (m.reasoning) msg.reasoning = m.reasoning
|
||||
return msg
|
||||
})
|
||||
.filter(m => m !== null)
|
||||
: []
|
||||
|
||||
// Calculate context tokens — aware of compression snapshot
|
||||
let inputTokens: number
|
||||
const snapshot = getCompressionSnapshot(sid)
|
||||
if (snapshot) {
|
||||
const newMessages = messages.slice(snapshot.lastMessageIndex + 1)
|
||||
inputTokens = countTokens(SUMMARY_PREFIX + snapshot.summary) +
|
||||
newMessages.reduce((sum, m) => sum + countTokens(m.content || ''), 0)
|
||||
} else {
|
||||
inputTokens = messages.reduce((sum, m) => sum + countTokens(m.content || ''), 0)
|
||||
}
|
||||
const outputTokens = messages
|
||||
.filter(m => m.role === 'assistant')
|
||||
.reduce((sum, m) => sum + countTokens(m.content || ''), 0)
|
||||
state = {
|
||||
messages,
|
||||
isWorking: false,
|
||||
events: [],
|
||||
inputTokens,
|
||||
outputTokens,
|
||||
}
|
||||
this.sessionMap.set(sid, state)
|
||||
logger.info('[chat-run-socket] loaded session %s from DB (%d messages)', sid, messages.length)
|
||||
} catch (err) {
|
||||
logger.warn(err, '[chat-run-socket] failed to load session %s from DB on resume', sid)
|
||||
state = { messages: [], isWorking: false, events: [] }
|
||||
this.sessionMap.set(sid, state)
|
||||
}
|
||||
}
|
||||
|
||||
// Reply with messages, working status + events (if working)
|
||||
socket.emit('resumed', {
|
||||
session_id: sid,
|
||||
messages: state.messages,
|
||||
isWorking: state.isWorking,
|
||||
events: state.isWorking ? state.events : [],
|
||||
inputTokens: state.inputTokens,
|
||||
outputTokens: state.outputTokens,
|
||||
})
|
||||
|
||||
logger.info('[chat-run-socket] socket %s resumed session %s (working: %s, messages: %d)',
|
||||
socket.id, sid, state.isWorking, state.messages.length)
|
||||
this.resumeSession(socket, sid)
|
||||
})
|
||||
|
||||
socket.on('abort', (data: { session_id?: string }) => {
|
||||
if (data.session_id) {
|
||||
this.handleAbort(data.session_id)
|
||||
this.handleAbort(socket, data.session_id)
|
||||
}
|
||||
})
|
||||
}
|
||||
/**
 * Answer a resume request by emitting a `resumed` event to `socket` with the
 * session's messages, working flag, buffered events and token counts.
 *
 * State selection:
 * - If the session is in memory and a run is in progress (`isWorking`), that
 *   state is used untouched. Rebuilding it from the DB would reset
 *   `isWorking`/`events` and replace the state object other handlers look up
 *   in `sessionMap`.
 * - Otherwise the session is (re)loaded from the session store and cached in
 *   `sessionMap`. If loading fails, an already cached state is kept; only when
 *   nothing is cached is an empty state created.
 *
 * While loading, assistant content that is an Anthropic block array (or a
 * stringified one) is flattened to text + `tool_calls` + `reasoning`, tool
 * calls with an empty id are removed, and tool messages whose `tool_call_id`
 * cannot be determined are dropped.
 *
 * @param socket Socket that asked to resume; receives the `resumed` event.
 * @param sid Session id.
 */
private async resumeSession(socket: Socket, sid: string) {
  let state = this.sessionMap.get(sid)

  if (!state?.isWorking) {
    try {
      const detail = useLocalSessionStore()
        ? getSessionDetailPaginated(sid)
        : await getSessionDetailFromDb(sid)
      const messages = detail?.messages?.length
        ? detail.messages
          .filter(m => (m.role === 'user' || m.role === 'assistant' || m.role === 'tool') && m.content !== undefined)
          .map((m, idx, arr) => {
            const msg: any = {
              id: m.id,
              session_id: sid,
              role: m.role,
              content: m.content || '',
              reasoning: m.reasoning || '',
              timestamp: m.timestamp,
            }

            // Convert Anthropic format content to OpenAI format.
            // Stringified arrays (Hermes Gateway behavior) are only checked for assistant messages.
            const raw: unknown = m.content
            if (m.role === 'assistant' && typeof raw === 'string') {
              try {
                const blocks = this.parseStringifiedBlocks(raw)
                if (blocks) this.applyAnthropicBlocks(msg, blocks)
              } catch {
                // Parsing failed: msg.content already holds the original text.
                // The client-facing pass below retries and logs the failure.
              }
            } else if (Array.isArray(raw)) {
              this.applyAnthropicBlocks(msg, raw)
            }

            if (m.tool_calls?.length) {
              // Filter out tool_calls with empty/invalid id and remove internal fields
              const cleanedToolCalls = m.tool_calls
                .filter((tc: any) => tc.id && tc.id.length > 0)
                .map((tc: any) => ({
                  id: tc.id,
                  type: tc.type,
                  function: tc.function,
                }))
              if (cleanedToolCalls.length > 0) {
                msg.tool_calls = cleanedToolCalls
              }
            }

            // For tool messages, ensure tool_call_id exists
            if (m.role === 'tool') {
              let callId = m.tool_call_id
              if (!callId || callId.length === 0) {
                // Try to reconstruct tool_call_id from the previous assistant message,
                // matching the tool call by tool_name
                const prevMsg = arr[idx - 1]
                if (prevMsg?.role === 'assistant' && prevMsg.tool_calls?.length) {
                  const tc = prevMsg.tool_calls.find((t: any) => t.function?.name === m.tool_name)
                  if (tc?.id) {
                    callId = tc.id
                  }
                }
              }
              // Skip tool message if no valid tool_call_id
              if (!callId || callId.length === 0) {
                return null
              }
              msg.tool_call_id = callId
            }

            if (m.tool_name) msg.tool_name = m.tool_name
            // A stored reasoning value wins over one extracted from content blocks
            if (m.reasoning) msg.reasoning = m.reasoning
            return msg
          })
          .filter(m => m !== null)
        : []

      // Calculate context tokens — aware of compression snapshot
      const sumTokens = (list: any[]): number =>
        list.reduce((sum, m) => sum + countTokens(m.content || ''), 0)

      let inputTokens: number
      const snapshot = getCompressionSnapshot(sid)
      if (snapshot) {
        const newMessages = messages.slice(snapshot.lastMessageIndex + 1)
        inputTokens = countTokens(SUMMARY_PREFIX + snapshot.summary) + sumTokens(newMessages)
      } else {
        inputTokens = sumTokens(messages)
      }
      const outputTokens = sumTokens(messages.filter(m => m.role === 'assistant'))

      state = {
        messages,
        isWorking: false,
        events: [],
        inputTokens,
        outputTokens,
      }
      this.sessionMap.set(sid, state)
      logger.info('[chat-run-socket] loaded session %s from DB (%d messages)', sid, messages.length)
    } catch (err) {
      logger.warn(err, '[chat-run-socket] failed to load session %s from DB on resume', sid)
      // Keep whatever is cached; only fall back to an empty session when nothing is
      if (!state) {
        state = { messages: [], isWorking: false, events: [] }
        this.sessionMap.set(sid, state)
      }
    }
  }

  // Reply with messages, working status + events (if working)
  // Convert messages from internal storage format to OpenAI format for client
  const clientMessages = state.messages.map((m: any) => {
    const msg: any = { ...m }
    const raw: unknown = m.content
    if (m.role === 'assistant' && typeof raw === 'string') {
      try {
        const blocks = this.parseStringifiedBlocks(raw)
        if (blocks) this.applyAnthropicBlocks(msg, blocks)
      } catch (e) {
        // Parsing failed, keep original content (msg.content is untouched)
        logger.error('[chat-run-socket] resume message %s: failed to parse content, error=%s, content=%s',
          m.id, e instanceof Error ? e.message : String(e), raw.substring(0, 200))
      }
    } else if (Array.isArray(raw)) {
      // If content is an array (Anthropic format), convert to OpenAI format
      this.applyAnthropicBlocks(msg, raw)
    }
    return msg
  })

  socket.emit('resumed', {
    session_id: sid,
    messages: clientMessages,
    isWorking: state.isWorking,
    events: state.isWorking ? state.events : [],
    inputTokens: state.inputTokens,
    outputTokens: state.outputTokens,
  })

  logger.info('[chat-run-socket] socket %s resumed session %s (working: %s, messages: %d)',
    socket.id, sid, state.isWorking, state.messages.length)
}

/**
 * Parse assistant content that was stored as a stringified block array.
 *
 * One pair of wrapping double quotes is stripped first. Text that is not
 * `[` ... `]` shaped returns `null`. Strict JSON is tried before the lossy
 * Python-literal rewrite, so valid JSON containing apostrophes or the words
 * True/False/None is not corrupted.
 *
 * @returns The block array, or `null` when the text is not a block array
 *   (including arrays whose elements are not `{ type: string }` objects).
 * @throws SyntaxError when the text is array-shaped but cannot be parsed.
 */
private parseStringifiedBlocks(raw: string): any[] | null {
  let candidate = raw.trim()
  if (candidate.length >= 2 && candidate.startsWith('"') && candidate.endsWith('"')) {
    candidate = candidate.slice(1, -1).trim()
  }
  if (!candidate.startsWith('[') || !candidate.endsWith(']')) return null

  let parsed: unknown
  try {
    parsed = JSON.parse(candidate)
  } catch {
    // Fall back to the simple Python-repr -> JSON rewrite
    parsed = JSON.parse(
      candidate
        .replace(/'/g, '"') // Python single quotes to JSON double quotes
        .replace(/\bTrue\b/g, 'true')
        .replace(/\bFalse\b/g, 'false')
        .replace(/\bNone\b/g, 'null'),
    )
  }

  if (!Array.isArray(parsed)) return null
  // Plain text such as "[1, 2]" is valid JSON but not a block list; leave it as text
  const looksLikeBlocks = parsed.every((b: any) => b !== null && typeof b === 'object' && typeof b.type === 'string')
  return looksLikeBlocks ? parsed : null
}

/**
 * Flatten Anthropic content blocks onto an OpenAI-style message in place:
 * `text` blocks are concatenated into `content`, `tool_use` blocks become
 * `tool_calls`, and the last `thinking` block becomes `reasoning`.
 * `tool_calls` / `reasoning` are only written when the blocks provide them.
 */
private applyAnthropicBlocks(msg: any, blocks: any[]): void {
  const textParts: string[] = []
  const toolCalls: any[] = []
  let reasoningContent: string | null = null

  for (const block of blocks) {
    switch (block?.type) {
      case 'thinking':
        reasoningContent = block.thinking
        break
      case 'text':
        textParts.push(block.text)
        break
      case 'tool_use':
        toolCalls.push({
          id: block.id,
          type: 'function',
          function: {
            name: block.name,
            // arguments must be a string even when the block carries no input
            arguments: JSON.stringify(block.input ?? {}),
          },
        })
        break
    }
  }

  msg.content = textParts.join('')
  if (toolCalls.length > 0) {
    msg.tool_calls = toolCalls
  }
  if (reasoningContent) {
    msg.reasoning = reasoningContent
  }
}
|
||||
// --- Run handler ---
|
||||
|
||||
private async handleRun(
|
||||
@@ -301,31 +583,45 @@ export class ChatRunSocket {
|
||||
tool_calls?: any[]
|
||||
tool_call_id?: string
|
||||
name?: string
|
||||
reasoning_content?: string | null
|
||||
}> = (lastUserMsgIndex >= 0
|
||||
? validMessages.slice(0, validMessages.length - lastUserMsgIndex - 1)
|
||||
: validMessages
|
||||
).map((m, idx, arr) => {
|
||||
const msg: any = { role: m.role, content: m.content || '' }
|
||||
if (m.tool_calls?.length) msg.tool_calls = m.tool_calls
|
||||
const msg: any = { role: m.role, content: m.content || 'empty message' }
|
||||
if (m.reasoning_content) msg.reasoning_content = m.reasoning_content
|
||||
if (m.tool_calls?.length) {
|
||||
// Filter out tool_calls with empty/invalid id and remove internal fields
|
||||
const cleanedToolCalls = m.tool_calls
|
||||
.filter((tc: any) => tc.id && tc.id.length > 0)
|
||||
.map((tc: any) => ({
|
||||
id: tc.id,
|
||||
type: tc.type,
|
||||
function: tc.function
|
||||
}))
|
||||
if (cleanedToolCalls.length > 0) {
|
||||
msg.tool_calls = cleanedToolCalls
|
||||
}
|
||||
}
|
||||
|
||||
// For tool messages, ensure tool_call_id exists
|
||||
if (m.role === 'tool') {
|
||||
if (m.tool_call_id) {
|
||||
msg.tool_call_id = m.tool_call_id
|
||||
} else {
|
||||
let callId = m.tool_call_id
|
||||
if (!callId || callId.length === 0) {
|
||||
// Try to reconstruct tool_call_id from previous assistant message
|
||||
const prevMsg = arr[idx - 1]
|
||||
if (prevMsg?.role === 'assistant' && prevMsg.tool_calls?.length) {
|
||||
const tc = prevMsg.tool_calls.find((t: any) => t.function?.name === m.tool_name)
|
||||
if (tc?.id) {
|
||||
msg.tool_call_id = tc.id
|
||||
} else {
|
||||
return null // Cannot reconstruct
|
||||
callId = tc.id
|
||||
}
|
||||
} else {
|
||||
return null // No assistant message to reconstruct from
|
||||
}
|
||||
}
|
||||
// Skip tool message if no valid tool_call_id
|
||||
if (!callId || callId.length === 0) {
|
||||
return null
|
||||
}
|
||||
msg.tool_call_id = callId
|
||||
}
|
||||
|
||||
if (m.tool_name) msg.name = m.tool_name
|
||||
@@ -367,7 +663,7 @@ export class ChatRunSocket {
|
||||
|
||||
try {
|
||||
const result = await compressor.compress(
|
||||
history, upstream, apiKey, session_id, contextLength,
|
||||
history, upstream, apiKey, session_id,
|
||||
)
|
||||
const afterTokens = await this.calcAndUpdateUsage(session_id, cState, emit)
|
||||
this.replaceState(session_id, 'compression.completed', {
|
||||
@@ -397,13 +693,29 @@ export class ChatRunSocket {
|
||||
compressedStartIndex: result.meta.compressedStartIndex,
|
||||
})
|
||||
|
||||
history = result.messages.map(m => ({
|
||||
role: m.role,
|
||||
content: m.content,
|
||||
tool_calls: m.tool_calls,
|
||||
tool_call_id: m.tool_call_id,
|
||||
name: m.name,
|
||||
}))
|
||||
history = result.messages.map(m => {
|
||||
const msg: any = {
|
||||
role: m.role,
|
||||
content: m.content,
|
||||
tool_call_id: m.tool_call_id,
|
||||
name: m.name,
|
||||
}
|
||||
if (m.reasoning_content) msg.reasoning_content = m.reasoning_content
|
||||
// Filter tool_calls if present, remove internal fields
|
||||
if (m.tool_calls?.length) {
|
||||
const cleanedToolCalls = m.tool_calls
|
||||
.filter((tc: any) => tc.id && tc.id.length > 0)
|
||||
.map((tc: any) => ({
|
||||
id: tc.id,
|
||||
type: tc.type,
|
||||
function: tc.function
|
||||
}))
|
||||
if (cleanedToolCalls.length > 0) {
|
||||
msg.tool_calls = cleanedToolCalls
|
||||
}
|
||||
}
|
||||
return msg
|
||||
})
|
||||
// Update usage from DB (snapshot now updated by compressor)
|
||||
await this.calcAndUpdateUsage(session_id, cState, emit)
|
||||
} catch (err: any) {
|
||||
@@ -457,7 +769,7 @@ export class ChatRunSocket {
|
||||
|
||||
try {
|
||||
const result = await compressor.compress(
|
||||
history, upstream, apiKey, session_id, contextLength,
|
||||
history, upstream, apiKey, session_id,
|
||||
)
|
||||
const cState = this.getOrCreateSession(session_id)
|
||||
const afterTokens = await this.calcAndUpdateUsage(session_id, cState, emit)
|
||||
@@ -488,13 +800,29 @@ export class ChatRunSocket {
|
||||
compressedStartIndex: result.meta.compressedStartIndex,
|
||||
})
|
||||
|
||||
history = result.messages.map(m => ({
|
||||
role: m.role,
|
||||
content: m.content,
|
||||
tool_calls: m.tool_calls,
|
||||
tool_call_id: m.tool_call_id,
|
||||
name: m.name,
|
||||
}))
|
||||
history = result.messages.map(m => {
|
||||
const msg: any = {
|
||||
role: m.role,
|
||||
content: m.content,
|
||||
tool_call_id: m.tool_call_id,
|
||||
name: m.name,
|
||||
}
|
||||
if (m.reasoning_content) msg.reasoning_content = m.reasoning_content
|
||||
// Filter tool_calls if present, remove internal fields
|
||||
if (m.tool_calls?.length) {
|
||||
const cleanedToolCalls = m.tool_calls
|
||||
.filter((tc: any) => tc.id && tc.id.length > 0)
|
||||
.map((tc: any) => ({
|
||||
id: tc.id,
|
||||
type: tc.type,
|
||||
function: tc.function
|
||||
}))
|
||||
if (cleanedToolCalls.length > 0) {
|
||||
msg.tool_calls = cleanedToolCalls
|
||||
}
|
||||
}
|
||||
return msg
|
||||
})
|
||||
await this.calcAndUpdateUsage(session_id, cState, emit)
|
||||
} catch (err: any) {
|
||||
this.replaceState(session_id, 'compression.completed', {
|
||||
@@ -535,6 +863,16 @@ export class ChatRunSocket {
|
||||
|
||||
const headers: Record<string, string> = { 'Content-Type': 'application/json' }
|
||||
if (apiKey) headers['Authorization'] = `Bearer ${apiKey}`
|
||||
|
||||
// Debug: write history to JSON file for analysis (before conversion)
|
||||
|
||||
// Convert conversation_history from OpenAI format to Anthropic format
|
||||
if (body.conversation_history && Array.isArray(body.conversation_history)) {
|
||||
body.conversation_history = convertToAnthropicFormat(body.conversation_history)
|
||||
logger.info('[chat-run-socket] converted conversation_history to Anthropic format for session %s: %d messages, content: %s',
|
||||
session_id || '(new)', body.conversation_history.length, JSON.stringify(body.conversation_history, null, 2))
|
||||
}
|
||||
|
||||
const res = await fetch(`${upstream}/v1/runs`, {
|
||||
method: 'POST',
|
||||
headers,
|
||||
@@ -589,6 +927,12 @@ export class ChatRunSocket {
|
||||
source.onmessage = (event: MessageEvent) => {
|
||||
try {
|
||||
const parsed = JSON.parse(event.data as string)
|
||||
// Debug: log all events from upstream
|
||||
if (parsed.event?.includes('reasoning') || parsed.event?.includes('thinking')) {
|
||||
logger.info('[chat-run-socket] upstream event: %s, data: %j', parsed.event, parsed)
|
||||
} else {
|
||||
logger.info('[chat-run-socket] upstream event: %s', parsed.event)
|
||||
}
|
||||
|
||||
// Track messages into sessionMap
|
||||
if (session_id) {
|
||||
@@ -653,9 +997,15 @@ export class ChatRunSocket {
|
||||
break
|
||||
}
|
||||
case 'run.completed': {
|
||||
logger.info('[chat-run-socket] ENTER run.completed case, session_id: %s, messages: %d',
|
||||
session_id, msgs.length)
|
||||
|
||||
if (last?.role === 'assistant' && last.finish_reason == null) {
|
||||
last.finish_reason = parsed.finish_reason || 'stop'
|
||||
}
|
||||
|
||||
// Debug: log run.completed to check if reasoning is included
|
||||
logger.info('[chat-run-socket] run.completed keys: %s', Object.keys(parsed))
|
||||
// Finalize assistant message — if no content was streamed, use output
|
||||
if (parsed.output && !runProducedAssistantText(msgs)) {
|
||||
if (last?.role === 'assistant') {
|
||||
@@ -670,6 +1020,70 @@ export class ChatRunSocket {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Parse stringified array content for all assistant messages
|
||||
let parsedCount = 0
|
||||
for (const msg of msgs) {
|
||||
if (msg.role === 'assistant' && typeof msg.content === 'string' &&
|
||||
msg.content.trim().startsWith('[') && msg.content.trim().endsWith(']')) {
|
||||
try {
|
||||
logger.info('[chat-run-socket] parsing array content for message %s, content preview: %s',
|
||||
msg.id, msg.content.slice(0, 100))
|
||||
const parsedContent = JSON.parse(
|
||||
msg.content
|
||||
.replace(/'/g, '"')
|
||||
.replace(/True/g, 'true')
|
||||
.replace(/False/g, 'false')
|
||||
.replace(/None/g, 'null')
|
||||
)
|
||||
if (Array.isArray(parsedContent)) {
|
||||
const textBlocks: string[] = []
|
||||
const toolCalls: any[] = []
|
||||
let reasoningContent: string | null = null
|
||||
|
||||
for (const block of parsedContent) {
|
||||
if (block.type === 'thinking') {
|
||||
reasoningContent = block.thinking
|
||||
} else if (block.type === 'text') {
|
||||
textBlocks.push(block.text)
|
||||
} else if (block.type === 'tool_use') {
|
||||
toolCalls.push({
|
||||
id: block.id,
|
||||
type: 'function',
|
||||
function: {
|
||||
name: block.name,
|
||||
arguments: JSON.stringify(block.input)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
msg.content = textBlocks.join('') || ''
|
||||
if (toolCalls.length > 0) {
|
||||
msg.tool_calls = toolCalls
|
||||
}
|
||||
if (reasoningContent) {
|
||||
msg.reasoning = reasoningContent
|
||||
}
|
||||
parsedCount++
|
||||
}
|
||||
} catch (e) {
|
||||
logger.error(e, '[chat-run-socket] failed to parse array content for message %s', msg.id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.info('[chat-run-socket] EXIT run.completed case, parsed %d messages', parsedCount)
|
||||
|
||||
// Attach the last assistant message's parsed content to fix stringified array format
|
||||
const lastAssistantMsg = msgs.filter((m: any) => m.role === 'assistant').pop()
|
||||
if (lastAssistantMsg && parsedCount > 0) {
|
||||
parsed.parsed_content = lastAssistantMsg.content || ''
|
||||
parsed.parsed_tool_calls = lastAssistantMsg.tool_calls || null
|
||||
parsed.parsed_reasoning = lastAssistantMsg.reasoning || null
|
||||
logger.info('[chat-run-socket] attached parsed content to run.completed event for message %s', lastAssistantMsg.id)
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -682,7 +1096,7 @@ export class ChatRunSocket {
|
||||
|
||||
if (parsed.event === 'run.completed' || parsed.event === 'run.failed') {
|
||||
source.close()
|
||||
if (session_id) this.markCompleted(session_id, { event: parsed.event, run_id: parsed.run_id })
|
||||
if (session_id) this.markCompleted(socket, session_id, { event: parsed.event, run_id: parsed.run_id })
|
||||
}
|
||||
} catch { /* not JSON, skip */ }
|
||||
}
|
||||
@@ -690,26 +1104,26 @@ export class ChatRunSocket {
|
||||
source.onerror = () => {
|
||||
source.close()
|
||||
emit('run.failed', { event: 'run.failed', error: 'EventSource connection lost' })
|
||||
if (session_id) this.markCompleted(session_id, { event: 'run.failed' })
|
||||
if (session_id) this.markCompleted(socket, session_id, { event: 'run.failed' })
|
||||
}
|
||||
} catch (err: any) {
|
||||
emit('run.failed', { event: 'run.failed', error: err.message })
|
||||
if (session_id) this.markCompleted(session_id, { event: 'run.failed' })
|
||||
if (session_id) this.markCompleted(socket, session_id, { event: 'run.failed' })
|
||||
}
|
||||
}
|
||||
|
||||
// --- Abort handler ---
|
||||
|
||||
private handleAbort(sessionId: string) {
|
||||
private handleAbort(socket: Socket, sessionId: string) {
|
||||
const state = this.sessionMap.get(sessionId)
|
||||
if (state?.isWorking && state.abortController) {
|
||||
state.abortController.abort()
|
||||
this.markCompleted(sessionId, { event: 'run.failed', run_id: state.runId })
|
||||
this.markCompleted(socket, sessionId, { event: 'run.failed', run_id: state.runId })
|
||||
}
|
||||
}
|
||||
|
||||
/** Mark a session run as completed/failed so reconnecting clients get notified */
|
||||
private markCompleted(sessionId: string, _info: { event: string; run_id?: string }) {
|
||||
private markCompleted(socket: Socket, sessionId: string, _info: { event: string; run_id?: string }) {
|
||||
const state = this.sessionMap.get(sessionId)
|
||||
if (state) {
|
||||
state.isWorking = false
|
||||
@@ -723,7 +1137,7 @@ export class ChatRunSocket {
|
||||
const prof = state.profile
|
||||
state.hermesSessionId = undefined
|
||||
state.profile = undefined
|
||||
this.syncFromHermes(sessionId, hermesId, prof)
|
||||
this.syncFromHermes(socket, sessionId, hermesId, prof)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -775,7 +1189,7 @@ export class ChatRunSocket {
|
||||
* and write to local DB. This gives us tool results that SSE events don't include.
|
||||
* After sync, enqueues the ephemeral session for deletion.
|
||||
*/
|
||||
private syncFromHermes(localSessionId: string, hermesSessionId: string, profile?: string) {
|
||||
private syncFromHermes(socket: Socket, localSessionId: string, hermesSessionId: string, profile?: string) {
|
||||
getSessionDetailFromDb(hermesSessionId)
|
||||
.then((detail) => {
|
||||
if (!detail || !detail.messages?.length) {
|
||||
@@ -800,6 +1214,40 @@ export class ChatRunSocket {
|
||||
}
|
||||
|
||||
if (toInsert.length > 0) {
|
||||
// Get in-memory messages to preserve reasoning that was streamed via SSE
|
||||
const state = this.sessionMap.get(localSessionId)
|
||||
const memoryMessages = state?.messages || []
|
||||
logger.info('[chat-run-socket] syncFromHermes: memory has %d messages, DB has %d messages',
|
||||
memoryMessages.length, toInsert.length)
|
||||
|
||||
// Match messages by order since Hermes DB and memory should have same sequence
|
||||
let memoryIdx = 0
|
||||
let mergedCount = 0
|
||||
for (let i = 0; i < toInsert.length && memoryIdx < memoryMessages.length; i++) {
|
||||
const dbMsg = toInsert[i]
|
||||
// Skip user messages in memory when matching
|
||||
while (memoryIdx < memoryMessages.length && memoryMessages[memoryIdx].role === 'user') {
|
||||
memoryIdx++
|
||||
}
|
||||
if (memoryIdx >= memoryMessages.length) break
|
||||
const memoryMsg = memoryMessages[memoryIdx]
|
||||
// Only merge if roles match
|
||||
if (dbMsg.role === memoryMsg.role) {
|
||||
// Merge reasoning from memory if DB doesn't have it
|
||||
if (!dbMsg.reasoning && memoryMsg.reasoning) {
|
||||
dbMsg.reasoning = memoryMsg.reasoning
|
||||
mergedCount++
|
||||
logger.info('[chat-run-socket] syncFromHermes: merged reasoning from memory to DB for %s message at index %d',
|
||||
dbMsg.role, i)
|
||||
}
|
||||
}
|
||||
memoryIdx++
|
||||
}
|
||||
|
||||
if (mergedCount > 0) {
|
||||
logger.info('[chat-run-socket] syncFromHermes: merged reasoning for %d messages', mergedCount)
|
||||
}
|
||||
|
||||
for (const msg of toInsert) {
|
||||
// Resolve tool_name from assistant's tool_calls if missing
|
||||
let toolName = msg.tool_name || null
|
||||
@@ -816,7 +1264,7 @@ export class ChatRunSocket {
|
||||
timestamp: msg.timestamp || Math.floor(Date.now() / 1000),
|
||||
token_count: msg.token_count || null,
|
||||
finish_reason: msg.finish_reason || null,
|
||||
reasoning: msg.reasoning || null,
|
||||
reasoning: msg.reasoning || null, // Now includes merged reasoning
|
||||
reasoning_details: msg.reasoning_details || null,
|
||||
reasoning_content: msg.reasoning_content || null,
|
||||
codex_reasoning_items: msg.codex_reasoning_items || null,
|
||||
@@ -843,7 +1291,7 @@ export class ChatRunSocket {
|
||||
const state = this.sessionMap.get(localSessionId)
|
||||
if (state) {
|
||||
const emit = (event: string, payload: any) => {
|
||||
this.nsp.to(`session:${localSessionId}`).emit(event, { ...payload, session_id: localSessionId })
|
||||
socket.emit(event, { ...payload, session_id: localSessionId })
|
||||
}
|
||||
this.calcAndUpdateUsage(localSessionId, state, emit)
|
||||
}
|
||||
@@ -871,6 +1319,7 @@ export class ChatRunSocket {
|
||||
} catch { /* best-effort */ }
|
||||
}
|
||||
|
||||
|
||||
/** Get or create session state in sessionMap */
|
||||
private getOrCreateSession(sessionId: string): SessionState {
|
||||
let state = this.sessionMap.get(sessionId)
|
||||
|
||||
Reference in New Issue
Block a user