diff --git a/package.json b/package.json index 8a8551a..5e9b14a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "hermes-web-ui", - "version": "0.5.15", + "version": "0.5.16", "description": "Self-hosted AI chat dashboard for Hermes Agent — multi-model (Claude, GPT, Gemini, DeepSeek) web UI with Telegram, Discord, Slack, WhatsApp integration", "repository": { "type": "git", diff --git a/packages/client/src/data/changelog.ts b/packages/client/src/data/changelog.ts index 2e0142a..bc72ebe 100644 --- a/packages/client/src/data/changelog.ts +++ b/packages/client/src/data/changelog.ts @@ -5,6 +5,16 @@ export interface ChangelogEntry { } export const changelog: ChangelogEntry[] = [ + { + version: '0.5.16', + date: '2026-05-10', + changes: [ + 'changelog.new_0_5_16_1', + 'changelog.new_0_5_16_2', + 'changelog.new_0_5_16_3', + 'changelog.new_0_5_16_4', + ], + }, { version: '0.5.15', date: '2026-05-09', diff --git a/packages/client/src/i18n/locales/de.ts b/packages/client/src/i18n/locales/de.ts index c0a817a..d35a091 100644 --- a/packages/client/src/i18n/locales/de.ts +++ b/packages/client/src/i18n/locales/de.ts @@ -665,6 +665,10 @@ jobTriggered: 'Job ausgelost', new_0_5_15_9: 'Hermes Markdown-Medien-Rendering und Sync-Wiederholung korrigiert', new_0_5_15_10: 'Upstream-Umgebungsvariablenabhängigkeit entfernt', new_0_5_15_11: 'Wenn die Kanban-Funktion nicht verfügbar ist, updaten Sie bitte hermes-agent', + new_0_5_16_1: 'Chat-Streaming von /v1/runs auf /v1/responses API migriert für geringere Latenz', + new_0_5_16_2: 'Echte API-Nutzung (Tokens, Cache, Reasoning) in Nutzungsstatistik-Tabelle speichern', + new_0_5_16_3: 'QQ-Gruppen-QR-Code zur Website-Navigationsleiste hinzugefügt', + new_0_5_16_4: 'Unbenutztes codex_reasoning_items-Feld aus dem Nachrichtenschema entfernt', new_0_5_13_1: 'Nachrichtenwarteschlange für sequenzielle Run-Verarbeitung hinzugefügt, um gleichzeitige Konflikte zu vermeiden', new_0_5_13_2: 'Zwei-Ebenen-Skills-Verzeichnisstruktur mit Sonstige-Kategorie für flache Skills unterstützt', new_0_5_13_3: 'Temporäre Sitzungen (eph_*) beim Start-Sync filtern, um interne Sitzungen nicht zu importieren', diff --git a/packages/client/src/i18n/locales/en.ts b/packages/client/src/i18n/locales/en.ts index 3dc24c9..3ef8f98 100644 --- a/packages/client/src/i18n/locales/en.ts +++ b/packages/client/src/i18n/locales/en.ts @@ -912,6 +912,10 @@ export default { new_0_5_15_9: 'Fix Hermes markdown media rendering and sync retry', new_0_5_15_10: 'Refactor to remove upstream env dependency', new_0_5_15_11: 'If the Kanban feature is not available, please upgrade hermes-agent', + new_0_5_16_1: 'Migrate chat streaming from /v1/runs to /v1/responses API for lower latency', + new_0_5_16_2: 'Persist real API usage (tokens, cache, reasoning) to usage table', + new_0_5_16_3: 'Add QQ group QR code to website navigation bar', + new_0_5_16_4: 'Remove unused codex_reasoning_items field from message schema', new_0_5_13_1: 'Add message queue for sequential run processing to prevent concurrent request conflicts', new_0_5_13_2: 'Support two-level skills directory structure with misc category for flat skills', new_0_5_13_3: 'Filter out ephemeral sessions during startup sync to avoid importing internal sessions', diff --git a/packages/client/src/i18n/locales/es.ts b/packages/client/src/i18n/locales/es.ts index 8f6cfaf..ce396f2 100644 --- a/packages/client/src/i18n/locales/es.ts +++ b/packages/client/src/i18n/locales/es.ts @@ -661,6 +661,10 @@ jobTriggered: 'Job ejecutado', new_0_5_15_9: 'Corregido renderizado de medios Markdown y reintento de sincronización', new_0_5_15_10: 'Eliminada dependencia de variables de entorno upstream', new_0_5_15_11: 'Si la función Kanban no está disponible, actualice hermes-agent', + new_0_5_16_1: 'Migrar streaming de chat de /v1/runs a /v1/responses API para menor latencia', + new_0_5_16_2: 'Persistir uso real de API (tokens, caché, razonamiento) en tabla de estadísticas', + new_0_5_16_3: 'Añadir código QR del grupo QQ a la barra de navegación del sitio web', + new_0_5_16_4: 'Eliminar campo codex_reasoning_items no utilizado del esquema de mensajes', new_0_5_13_1: 'Cola de mensajes para procesamiento secuencial de ejecuciones, evitando conflictos concurrentes', new_0_5_13_2: 'Soporte para estructura de directorios de skills de dos niveles con categoría misc', new_0_5_13_3: 'Filtrar sesiones efímeras (eph_*) durante la sincronización de inicio', diff --git a/packages/client/src/i18n/locales/fr.ts b/packages/client/src/i18n/locales/fr.ts index 6a25079..3b04589 100644 --- a/packages/client/src/i18n/locales/fr.ts +++ b/packages/client/src/i18n/locales/fr.ts @@ -660,6 +660,10 @@ jobTriggered: 'Job declenche', new_0_5_15_9: 'Correction rendu média Markdown et retry synchronisation', new_0_5_15_10: 'Suppression dépendance variable d\'environnement amont', new_0_5_15_11: 'Si la fonction Kanban n\'est pas disponible, veuillez mettre à niveau hermes-agent', + new_0_5_16_1: 'Migration du streaming de chat de /v1/runs vers l\'API /v1/responses pour une latence réduite', + new_0_5_16_2: 'Persistance de l\'utilisation réelle de l\'API (tokens, cache, raisonnement) dans la table des statistiques', + new_0_5_16_3: 'Ajout du code QR du groupe QQ dans la barre de navigation du site', + new_0_5_16_4: 'Suppression du champ codex_reasoning_items inutilisé du schéma de messages', new_0_5_13_1: 'File d\'attente de messages pour le traitement séquentiel des exécutions, évitant les conflits concurrents', new_0_5_13_2: 'Prise en charge de la structure de répertoire de skills à deux niveaux avec catégorie divers', new_0_5_13_3: 'Filtrer les sessions éphémères (eph_*) lors de la synchronisation au démarrage', diff --git a/packages/client/src/i18n/locales/ja.ts b/packages/client/src/i18n/locales/ja.ts index 61adcb8..49c2423 100644 --- a/packages/client/src/i18n/locales/ja.ts +++ b/packages/client/src/i18n/locales/ja.ts @@ -661,6 +661,10 @@ export default { new_0_5_15_9: 'Hermes Markdownメディアレンダリングと同期リトライを修正', new_0_5_15_10: 'アップストリーム環境変数依存をリファクタリングで削除', new_0_5_15_11: 'カンバン機能が使用できない場合は、hermes-agent をアップグレードしてください', + new_0_5_16_1: 'チャットストリーミングを /v1/runs から /v1/responses API に移行し、レイテンシを削減', + new_0_5_16_2: '実際の API 使用量(トークン、キャッシュ、推論)を統計テーブルに保存', + new_0_5_16_3: 'ウェブサイトのナビゲーションバーにQQグループのQRコードを追加', + new_0_5_16_4: 'メッセージスキーマから未使用の codex_reasoning_items フィールドを削除', new_0_5_13_1: 'メッセージキューによる順次実行処理で同時リクエストの競合を防止', new_0_5_13_2: '2階層スキルディレクトリ構造をサポート、フラットスキルは「その他」カテゴリに分類', new_0_5_13_3: '起動同期時に一時セッション(eph_*)をフィルタリング', diff --git a/packages/client/src/i18n/locales/ko.ts b/packages/client/src/i18n/locales/ko.ts index ecdaf58..46baa8e 100644 --- a/packages/client/src/i18n/locales/ko.ts +++ b/packages/client/src/i18n/locales/ko.ts @@ -661,6 +661,10 @@ export default { new_0_5_15_9: 'Hermes Markdown 미디어 렌더링 및 동기화 재시도 수정', new_0_5_15_10: '업스트림 환경 변수 종속성 제거', new_0_5_15_11: '칸반 기능을 사용할 수 없는 경우 hermes-agent를 업그레이드하세요', + new_0_5_16_1: '채팅 스트리밍을 /v1/runs에서 /v1/responses API로 마이그레이션하여 지연 시간 단축', + new_0_5_16_2: '실제 API 사용량(토큰, 캐시, 추론)을 사용량 통계 테이블에 저장', + new_0_5_16_3: '웹사이트 내비게이션 바에 QQ 그룹 QR 코드 추가', + new_0_5_16_4: '메시지 스키마에서 사용하지 않는 codex_reasoning_items 필드 제거', new_0_5_13_1: '메시지 큐를 통한 순차 실행 처리로 동시 요청 충돌 방지', new_0_5_13_2: '2단계 스킬 디렉토리 구조 지원, 플랫 스킬은 기타 카테고리로 분류', new_0_5_13_3: '시작 동기화 시 임시 세션(eph_*) 필터링', diff --git a/packages/client/src/i18n/locales/pt.ts b/packages/client/src/i18n/locales/pt.ts index 465dcfa..24ae093 100644 --- a/packages/client/src/i18n/locales/pt.ts +++ b/packages/client/src/i18n/locales/pt.ts @@ -661,6 +661,10 @@ jobTriggered: 'Job acionado', new_0_5_15_9: 'Corrigido renderização de mídia Markdown e retry de sincronização', new_0_5_15_10: 'Removida dependência de variável de ambiente upstream', new_0_5_15_11: 'Se o recurso Kanban não estiver disponível, atualize o hermes-agent', + new_0_5_16_1: 'Migrar streaming de chat de /v1/runs para /v1/responses API para menor latência', + new_0_5_16_2: 'Persistir uso real da API (tokens, cache, raciocínio) na tabela de estatísticas', + new_0_5_16_3: 'Adicionar código QR do grupo QQ à barra de navegação do site', + new_0_5_16_4: 'Remover campo codex_reasoning_items não utilizado do esquema de mensagens', new_0_5_13_1: 'Fila de mensagens para processamento sequencial de execuções, evitando conflitos concorrentes', new_0_5_13_2: 'Suporte à estrutura de diretório de skills de dois níveis com categoria diversos', new_0_5_13_3: 'Filtrar sessões efêmeras (eph_*) durante a sincronização na inicialização', diff --git a/packages/client/src/i18n/locales/zh.ts b/packages/client/src/i18n/locales/zh.ts index 591104d..cd85163 100644 --- a/packages/client/src/i18n/locales/zh.ts +++ b/packages/client/src/i18n/locales/zh.ts @@ -914,6 +914,10 @@ export default { new_0_5_15_9: '修复 Hermes Markdown 媒体渲染与同步重试', new_0_5_15_10: '重构移除上游环境变量依赖', new_0_5_15_11: '如果看板功能无法使用,请升级 hermes-agent', + new_0_5_16_1: '聊天流式接口从 /v1/runs 迁移至 /v1/responses,降低延迟', + new_0_5_16_2: '持久化真实 API 用量(token、缓存、推理)到用量统计表', + new_0_5_16_3: '官网导航栏新增 QQ 群二维码', + new_0_5_16_4: '移除消息 schema 中未使用的 codex_reasoning_items 字段', new_0_5_13_1: '新增消息队列,顺序处理运行请求,避免并发冲突', new_0_5_13_2: '支持二级 Skills 目录结构,扁平化 Skill 归入"杂项"分类', new_0_5_13_3: '启动同步时过滤临时会话(eph_*),避免导入内部会话', diff --git a/packages/client/src/stores/hermes/chat.ts b/packages/client/src/stores/hermes/chat.ts index f064435..f45f6c5 100644 --- a/packages/client/src/stores/hermes/chat.ts +++ b/packages/client/src/stores/hermes/chat.ts @@ -27,6 +27,7 @@ export interface Message { content: string timestamp: number toolName?: string + toolCallId?: string toolPreview?: string toolArgs?: string toolResult?: string @@ -156,6 +157,7 @@ function mapHermesMessages(msgs: HermesMessage[]): Message[] { content: '', timestamp: Math.round(msg.timestamp * 1000), toolName: tc.function?.name || 'tool', + toolCallId: tc.id, toolArgs: tc.function?.arguments || undefined, toolStatus: 'done', }) @@ -191,6 +193,7 @@ function mapHermesMessages(msgs: HermesMessage[]): Message[] { content: '', timestamp: Math.round(msg.timestamp * 1000), toolName, + toolCallId: tcId || undefined, toolArgs, toolPreview: typeof preview === 'string' ? preview.slice(0, 100) || undefined : undefined, toolResult: msg.content || undefined, @@ -910,6 +913,7 @@ export const useChatStore = defineStore('chat', () => { case 'tool.started': { runHadToolActivity = true const msgs = getSessionMsgs(sid) + const toolCallId = (evt as any).tool_call_id as string | undefined const last = activeAssistantMessageId ? msgs.find(m => m.id === activeAssistantMessageId) : msgs[msgs.length - 1] @@ -917,13 +921,27 @@ export const useChatStore = defineStore('chat', () => { updateMessage(sid, last.id, { isStreaming: false }) } activeAssistantMessageId = null + const existingTool = toolCallId + ? msgs.find(m => m.role === 'tool' && m.toolCallId === toolCallId) + : null + if (existingTool) { + updateMessage(sid, existingTool.id, { + toolName: evt.tool || evt.name, + toolArgs: typeof (evt as any).arguments === 'string' ? (evt as any).arguments : existingTool.toolArgs, + toolPreview: evt.preview || existingTool.toolPreview, + toolStatus: existingTool.toolStatus || 'running', + }) + break + } addMessage(sid, { id: uid(), role: 'tool', content: '', timestamp: Date.now(), toolName: evt.tool || evt.name, + toolCallId, toolPreview: evt.preview, + toolArgs: typeof (evt as any).arguments === 'string' ? (evt as any).arguments : undefined, toolStatus: 'running', }) @@ -933,9 +951,10 @@ export const useChatStore = defineStore('chat', () => { case 'tool.completed': { runHadToolActivity = true const msgs = getSessionMsgs(sid) - const toolMsgs = msgs.filter( - m => m.role === 'tool' && m.toolStatus === 'running', - ) + const toolCallId = (evt as any).tool_call_id as string | undefined + const toolMsgs = toolCallId + ? msgs.filter(m => m.role === 'tool' && m.toolCallId === toolCallId) + : msgs.filter(m => m.role === 'tool' && m.toolStatus === 'running') if (toolMsgs.length > 0) { const last = toolMsgs[toolMsgs.length - 1] // Check if tool errored @@ -944,6 +963,7 @@ export const useChatStore = defineStore('chat', () => { updateMessage(sid, last.id, { toolStatus: hasError ? 'error' : 'done', toolDuration: duration, + toolResult: typeof (evt as any).output === 'string' ? (evt as any).output : undefined, }) } @@ -1326,6 +1346,7 @@ export const useChatStore = defineStore('chat', () => { case 'tool.started': { runHadToolActivity = true const msgs = getSessionMsgs(sid) + const toolCallId = (evt as any).tool_call_id as string | undefined const last = activeAssistantMessageId ? msgs.find(m => m.id === activeAssistantMessageId) : msgs[msgs.length - 1] @@ -1333,13 +1354,27 @@ export const useChatStore = defineStore('chat', () => { updateMessage(sid, last.id, { isStreaming: false }) } activeAssistantMessageId = null + const existingTool = toolCallId + ? msgs.find(m => m.role === 'tool' && m.toolCallId === toolCallId) + : null + if (existingTool) { + updateMessage(sid, existingTool.id, { + toolName: evt.tool || evt.name, + toolArgs: typeof (evt as any).arguments === 'string' ? (evt as any).arguments : existingTool.toolArgs, + toolPreview: evt.preview || existingTool.toolPreview, + toolStatus: existingTool.toolStatus || 'running', + }) + break + } addMessage(sid, { id: uid(), role: 'tool', content: '', timestamp: Date.now(), toolName: evt.tool || evt.name, + toolCallId, toolPreview: evt.preview, + toolArgs: typeof (evt as any).arguments === 'string' ? (evt as any).arguments : undefined, toolStatus: 'running', }) @@ -1349,12 +1384,16 @@ export const useChatStore = defineStore('chat', () => { case 'tool.completed': { runHadToolActivity = true const msgs = getSessionMsgs(sid) - const toolMsgs = msgs.filter(m => m.role === 'tool' && m.toolStatus === 'running') + const toolCallId = (evt as any).tool_call_id as string | undefined + const toolMsgs = toolCallId + ? msgs.filter(m => m.role === 'tool' && m.toolCallId === toolCallId) + : msgs.filter(m => m.role === 'tool' && m.toolStatus === 'running') if (toolMsgs.length > 0) { const hasError = (evt as any).error === true updateMessage(sid, toolMsgs[toolMsgs.length - 1].id, { toolStatus: hasError ? 'error' : 'done', toolDuration: (evt as any).duration, + toolResult: typeof (evt as any).output === 'string' ? (evt as any).output : undefined, }) } diff --git a/packages/server/src/db/hermes/schemas.ts b/packages/server/src/db/hermes/schemas.ts index 7c5f6d6..97fb5e6 100644 --- a/packages/server/src/db/hermes/schemas.ts +++ b/packages/server/src/db/hermes/schemas.ts @@ -70,7 +70,6 @@ export const MESSAGES_SCHEMA: Record = { reasoning: 'TEXT', reasoning_details: 'TEXT', reasoning_content: 'TEXT', - codex_reasoning_items: 'TEXT', } export const MESSAGES_INDEX = 'CREATE INDEX IF NOT EXISTS idx_messages_session_id ON messages(session_id)' diff --git a/packages/server/src/db/hermes/session-store.ts b/packages/server/src/db/hermes/session-store.ts index 2bb7dd3..7669f50 100644 --- a/packages/server/src/db/hermes/session-store.ts +++ b/packages/server/src/db/hermes/session-store.ts @@ -45,7 +45,6 @@ export interface HermesMessageRow { finish_reason: string | null reasoning: string | null reasoning_details?: string | null - codex_reasoning_items?: string | null reasoning_content?: string | null } @@ -121,7 +120,6 @@ function mapMessageRow(row: Record): HermesMessageRow { finish_reason: row.finish_reason != null ? String(row.finish_reason) : null, reasoning: row.reasoning != null ? String(row.reasoning) : null, reasoning_details: row.reasoning_details != null ? String(row.reasoning_details) : null, - codex_reasoning_items: row.codex_reasoning_items != null ? String(row.codex_reasoning_items) : null, reasoning_content: row.reasoning_content != null ? String(row.reasoning_content) : null, } } @@ -343,21 +341,20 @@ export function addMessage(msg: { reasoning?: string | null reasoning_details?: string | null reasoning_content?: string | null - codex_reasoning_items?: string | null }): number | undefined { if (!isSqliteAvailable()) return undefined const db = getDb()! const toolCallsJson = msg.tool_calls ? JSON.stringify(msg.tool_calls) : null const result = db.prepare( - `INSERT INTO ${MESSAGES_TABLE} (session_id, role, content, tool_call_id, tool_calls, tool_name, timestamp, token_count, finish_reason, reasoning, reasoning_details, reasoning_content, codex_reasoning_items) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + `INSERT INTO ${MESSAGES_TABLE} (session_id, role, content, tool_call_id, tool_calls, tool_name, timestamp, token_count, finish_reason, reasoning, reasoning_details, reasoning_content) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, ).run( msg.session_id, msg.role, msg.content, msg.tool_call_id ?? null, toolCallsJson, msg.tool_name ?? null, msg.timestamp ?? Math.floor(Date.now() / 1000), msg.token_count ?? null, msg.finish_reason ?? null, msg.reasoning ?? null, msg.reasoning_details ?? null, - msg.reasoning_content ?? null, msg.codex_reasoning_items ?? null, + msg.reasoning_content ?? null, ) return result.lastInsertRowid as number } @@ -375,13 +372,12 @@ export function addMessages(msgs: Array<{ reasoning?: string | null reasoning_details?: string | null reasoning_content?: string | null - codex_reasoning_items?: string | null }>): void { if (!isSqliteAvailable() || msgs.length === 0) return const db = getDb()! const insert = db.prepare( - `INSERT INTO ${MESSAGES_TABLE} (session_id, role, content, tool_call_id, tool_calls, tool_name, timestamp, token_count, finish_reason, reasoning, reasoning_details, reasoning_content, codex_reasoning_items) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + `INSERT INTO ${MESSAGES_TABLE} (session_id, role, content, tool_call_id, tool_calls, tool_name, timestamp, token_count, finish_reason, reasoning, reasoning_details, reasoning_content) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, ) db.exec('BEGIN') try { @@ -393,7 +389,7 @@ export function addMessages(msgs: Array<{ msg.timestamp ?? Math.floor(Date.now() / 1000), msg.token_count ?? null, msg.finish_reason ?? null, msg.reasoning ?? null, msg.reasoning_details ?? null, - msg.reasoning_content ?? null, msg.codex_reasoning_items ?? null, + msg.reasoning_content ?? null, ) } db.exec('COMMIT') diff --git a/packages/server/src/db/hermes/sessions-db.ts b/packages/server/src/db/hermes/sessions-db.ts index f9929b7..d68c126 100644 --- a/packages/server/src/db/hermes/sessions-db.ts +++ b/packages/server/src/db/hermes/sessions-db.ts @@ -53,7 +53,6 @@ export interface HermesMessageRow { finish_reason: string | null reasoning: string | null reasoning_details?: string | null - codex_reasoning_items?: string | null reasoning_content?: string | null } @@ -350,7 +349,6 @@ function mapMessageRow(row: Record): HermesMessageRow { finish_reason: normalizeNullableString(row.finish_reason), reasoning, reasoning_details: normalizeNullableString(row.reasoning_details), - codex_reasoning_items: normalizeNullableString(row.codex_reasoning_items), reasoning_content: normalizeNullableString(row.reasoning_content), } } diff --git a/packages/server/src/lib/context-compressor/index.ts b/packages/server/src/lib/context-compressor/index.ts index 8b8bba3..55f744d 100644 --- a/packages/server/src/lib/context-compressor/index.ts +++ b/packages/server/src/lib/context-compressor/index.ts @@ -13,7 +13,6 @@ * 6. Save snapshot: last_message_index = index where compression ends */ -import { EventSource } from 'eventsource' import { encodingForModel, getEncoding } from 'js-tiktoken' import { logger } from '../../services/logger' import { @@ -21,7 +20,6 @@ import { saveCompressionSnapshot, deleteCompressionSnapshot, } from '../../db/hermes/compression-snapshot' -import { getDb } from '../../db/index' // ─── Types ─────────────────────────────────────────────── @@ -376,8 +374,6 @@ export async function callSummarizer( previousSummary?: string, profile?: string, ): Promise { - const sessionId = `compress_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}` - const convHistory: Array<{ role: string; content: string }> = [...history] if (previousSummary) { @@ -390,88 +386,57 @@ export async function callSummarizer( const headers: Record = { 'Content-Type': 'application/json' } if (apiKey) headers['Authorization'] = `Bearer ${apiKey}` - const res = await fetch(`${upstream}/v1/runs`, { + const res = await fetch(`${upstream.replace(/\/$/, '')}/v1/responses`, { method: 'POST', headers, body: JSON.stringify({ input: prompt, conversation_history: convHistory, - session_id: sessionId, + stream: true, + store: false, }), signal: AbortSignal.timeout(timeoutMs), }) if (!res.ok) { - throw new Error(`Summarization run failed: ${res.status}`) + throw new Error(`Summarization response failed: ${res.status}`) } - const { run_id } = await res.json() as { run_id: string } + if (!res.body) { + throw new Error('Summarization response stream missing') + } - return new Promise((resolve, reject) => { - const timer = setTimeout(() => { - source.close() - reject(new Error('Summarization timed out')) - }, timeoutMs) + let output = '' + for await (const frame of readSseFrames(res.body)) { + let parsed: any + try { + parsed = JSON.parse(frame.data) + } catch { + continue + } + const eventType = parsed.type || frame.event || parsed.event - const eventsUrl = new URL(`${upstream}/v1/runs/${run_id}/events`) - - // Use Authorization header instead of query parameter for better compatibility - const eventSourceInit: any = apiKey ? { - fetch: (url: string, init: any = {}) => fetch(url, { - ...init, - headers: { - ...(init.headers || {}), - Authorization: `Bearer ${apiKey}`, - }, - }), - } : {} - - // @ts-ignore - eventsource library types are too strict - const source = new EventSource(eventsUrl.toString(), eventSourceInit) - - source.onmessage = (event: MessageEvent) => { - try { - const parsed = JSON.parse(event.data) - if (parsed.event === 'run.completed') { - clearTimeout(timer) - source.close() - deleteCompressSession(sessionId, profile).catch(() => { }) - const output = parsed.output - if (!output || typeof output !== 'string' || output.trim() === '') { - reject(new Error('Empty summarization response')) - return - } - resolve(output.trim()) - } else if (parsed.event === 'run.failed') { - clearTimeout(timer) - source.close() - deleteCompressSession(sessionId, profile).catch(() => { }) - reject(new Error(parsed.error || 'Summarization run failed')) - } - } catch { /* ignore parse errors */ } + if (eventType === 'response.output_text.delta' && parsed.delta) { + output += parsed.delta + continue } - source.onerror = () => { - clearTimeout(timer) - source.close() - deleteCompressSession(sessionId, profile).catch(() => { }) - reject(new Error('Summarization SSE connection error')) + if (eventType === 'response.completed') { + const response = parsed.response || parsed + const finalText = extractResponseText(response) + if (!output && finalText) output = finalText + if (!output || output.trim() === '') { + throw new Error('Empty summarization response') + } + return output.trim() } - }) -} -/** Enqueue compression session for later deletion instead of deleting immediately */ -async function deleteCompressSession(sessionId: string, profile?: string): Promise { - try { - const db = getDb() - if (!db) return - const now = Date.now() - db.prepare( - `INSERT INTO gc_pending_session_deletes (session_id, profile_name, status, attempt_count, last_error, created_at, updated_at, next_attempt_at) - VALUES (?, ?, 'pending', 0, NULL, ?, ?, 0) - ON CONFLICT(session_id) DO NOTHING`, - ).run(sessionId, profile || 'default', now, now) - } catch { /* best-effort */ } + if (eventType === 'response.failed') { + throw new Error(parsed.error?.message || parsed.error || 'Summarization response failed') + } + } + + throw new Error('Summarization response stream ended without a terminal event') } // ─── Main Compressor ──────────────────────────────────── @@ -665,3 +630,63 @@ export class ChatContextCompressor { deleteCompressionSnapshot(sessionId) } } + +async function* readSseFrames(stream: ReadableStream): AsyncGenerator<{ event?: string; data: string }> { + const decoder = new TextDecoder() + const reader = stream.getReader() + let buffer = '' + + try { + while (true) { + const { done, value } = await reader.read() + if (done) break + buffer += decoder.decode(value, { stream: true }) + + let boundary = buffer.indexOf('\n\n') + while (boundary >= 0) { + const raw = buffer.slice(0, boundary) + buffer = buffer.slice(boundary + 2) + const frame = parseSseFrame(raw) + if (frame?.data) yield frame + boundary = buffer.indexOf('\n\n') + } + } + + buffer += decoder.decode() + const frame = parseSseFrame(buffer) + if (frame?.data) yield frame + } finally { + reader.releaseLock() + } +} + +function parseSseFrame(raw: string): { event?: string; data: string } | null { + let event: string | undefined + const data: string[] = [] + for (const line of raw.split(/\r?\n/)) { + if (!line || line.startsWith(':')) continue + if (line.startsWith('event:')) { + event = line.slice(6).trim() + } else if (line.startsWith('data:')) { + data.push(line.slice(5).trimStart()) + } + } + if (data.length === 0) return null + return { event, data: data.join('\n') } +} + +function extractResponseText(response: any): string { + const output = Array.isArray(response?.output) ? response.output : [] + const parts: string[] = [] + for (const item of output) { + if (item.type !== 'message') continue + const content = Array.isArray(item.content) ? item.content : [] + for (const part of content) { + if (part.type === 'output_text' || part.type === 'text') { + parts.push(part.text || '') + } + } + } + if (parts.length > 0) return parts.join('') + return typeof response?.output_text === 'string' ? response.output_text : '' +} diff --git a/packages/server/src/services/hermes/chat-run-socket.ts b/packages/server/src/services/hermes/chat-run-socket.ts index f0fc2aa..e347559 100644 --- a/packages/server/src/services/hermes/chat-run-socket.ts +++ b/packages/server/src/services/hermes/chat-run-socket.ts @@ -9,9 +9,6 @@ * the client emits 'resume' to rejoin its session room. */ import type { Server, Socket } from 'socket.io' -import { EventSource } from 'eventsource' -import { setRunSession } from '../../routes/hermes/proxy-handler' -import { updateUsage } from '../../db/hermes/usage-store' import { getSystemPrompt } from '../../lib/llm-prompt' import { getSession, @@ -19,16 +16,15 @@ import { getSessionDetailPaginated, createSession, addMessage, - addMessages, updateSessionStats, useLocalSessionStore, } from '../../db/hermes/session-store' -import { getDb } from '../../db/index' import { getSessionDetailFromDb } from '../../db/hermes/sessions-db' import { getModelContextLength } from './model-context' import { ChatContextCompressor, countTokens, SUMMARY_PREFIX } from '../../lib/context-compressor' import { getCompressionSnapshot } from '../../db/hermes/compression-snapshot' import { parseLLMJSON, parseToolArguments, parseAnthropicContentArray } from '../../lib/llm-json' +import { updateUsage } from '../../db/hermes/usage-store' import { logger } from '../logger' /** @@ -136,7 +132,7 @@ interface SessionMessage { session_id: string role: string content: string - hermesSessionId?: string + runMarker?: string tool_call_id?: string | null tool_calls?: any[] | null tool_name?: string | null @@ -146,7 +142,6 @@ interface SessionMessage { reasoning?: string | null reasoning_details?: string | null reasoning_content?: string | null - codex_reasoning_items?: string | null } interface QueuedRun { @@ -162,13 +157,22 @@ interface SessionState { isWorking: boolean events: Array<{ event: string; data: any }> abortController?: AbortController - eventSource?: EventSource runId?: string profile?: string inputTokens?: number outputTokens?: number isAborting?: boolean queue: QueuedRun[] + responseRun?: ResponseRunState +} + +interface ResponseRunState { + runMarker?: string + responseId?: string + text: string + textInserted: boolean + insertedKeys: Set + toolCalls: Map } // --- ChatRunSocket --- @@ -178,7 +182,6 @@ export class ChatRunSocket { private gatewayManager: any /** sessionId → session state (messages, working status, events, run tracking) */ private sessionMap = new Map() - private hermesSessionIds = new Map() constructor(io: Server, gatewayManager: any) { this.nsp = io.of('/chat-run') @@ -479,9 +482,9 @@ export class ChatRunSocket { const upstream = this.gatewayManager.getUpstream(profile).replace(/\/$/, '') const apiKey = this.gatewayManager.getApiKey(profile) || undefined - // Generate ephemeral session ID for Hermes (fresh session per run) - const hermesSessionId = session_id - ? `eph_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}` + // Local marker used only to group in-memory messages for this streamed response. + const runMarker = session_id + ? `resp_run_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}` : undefined const now = Math.floor(Date.now() / 1000) @@ -494,7 +497,6 @@ export class ChatRunSocket { : { messages: [], isWorking: false, events: [], queue: [] } this.sessionMap.set(session_id, state) } - this.hermesSessionIds.set(session_id, hermesSessionId) state.isWorking = true state.profile = profile @@ -504,7 +506,7 @@ export class ChatRunSocket { state.messages.push({ id: state.messages.length + 1, session_id, - hermesSessionId, + runMarker, role: 'user', content: inputStr, timestamp: now, @@ -531,7 +533,7 @@ export class ChatRunSocket { state.messages.push({ id: state.messages.length + 1, session_id, - hermesSessionId, + runMarker, role: 'user', content: inputStr, timestamp: now, @@ -564,7 +566,6 @@ export class ChatRunSocket { try { // Build upstream request body const body: Record = { input } - if (hermesSessionId) body.session_id = hermesSessionId if (model) body.model = model if (instructions) { body.instructions = `${getSystemPrompt()}\n${instructions}` @@ -892,11 +893,22 @@ export class ChatRunSocket { if (body.conversation_history && Array.isArray(body.conversation_history)) { body.conversation_history = convertHistoryFormat(body.conversation_history) } - const res = await fetch(`${upstream}/v1/runs`, { + body.stream = true + body.store = false + + const abortController = new AbortController() + if (session_id) { + const state = this.getOrCreateSession(session_id) + state.isWorking = true + state.runId = undefined + state.abortController = abortController + } + + const res = await fetch(`${upstream}/v1/responses`, { method: 'POST', headers, body: JSON.stringify(body), - signal: AbortSignal.timeout(120_000), + signal: abortController.signal, }) if (!res.ok) { const text = await res.text().catch(() => '') @@ -906,295 +918,93 @@ export class ChatRunSocket { if (session_id && queueLen > 0) this.dequeueNextQueuedRun(socket, session_id) return } - - const runData = await res.json() as any - const runId = runData.run_id - if (!runId) { + if (!res.body) { const queueLen = session_id ? this.sessionMap.get(session_id)?.queue?.length ?? 0 : 0 if (session_id) await this.markCompleted(socket, session_id, { event: 'run.failed' }) - emit('run.failed', { event: 'run.failed', error: 'No run_id in upstream response', queue_remaining: queueLen }) + emit('run.failed', { event: 'run.failed', error: 'Upstream response stream missing', queue_remaining: queueLen }) if (session_id && queueLen > 0) this.dequeueNextQueuedRun(socket, session_id) return } - if (session_id) { - setRunSession(runId, session_id) - } - - const abortController = new AbortController() - if (session_id) { - const state = this.getOrCreateSession(session_id) - state.isWorking = true - state.runId = runId - state.abortController = abortController - } - - emit('run.started', { - event: 'run.started', - run_id: runId, - status: runData.status, - queue_length: session_id ? this.sessionMap.get(session_id)?.queue.length || 0 : 0, - }) - - // Stream upstream events via EventSource — survives socket disconnect - const eventsUrl = new URL(`${upstream}/v1/runs/${runId}/events`) - - // Use Authorization header instead of query parameter for better compatibility - const eventSourceInit: any = apiKey ? { - fetch: (url: string, init: any = {}) => fetch(url, { - ...init, - headers: { - ...(init.headers || {}), - Authorization: `Bearer ${apiKey}`, - }, - }), - } : {} - - // @ts-ignore - eventsource library types are too strict - const source = new EventSource(eventsUrl.toString(), eventSourceInit) - if (session_id) { - const state = this.getOrCreateSession(session_id) - state.eventSource = source - } - - source.onmessage = async (event: MessageEvent) => { + let responseId: string | undefined + for await (const frame of readSseFrames(res.body)) { + let parsed: any try { - const parsed = JSON.parse(event.data as string) - // Debug: log all events from upstream - if (parsed.event?.includes('reasoning') || parsed.event?.includes('thinking')) { - logger.info('[chat-run-socket] upstream event: %s, data: %j', parsed.event, parsed) - } else { - logger.info('[chat-run-socket] upstream event: %s', parsed.event) - } + parsed = JSON.parse(frame.data) + } catch { + continue + } + const upstreamEvent = parsed.type || frame.event || parsed.event + logger.info('[chat-run-socket] upstream response event: %s', upstreamEvent) - // Track messages into sessionMap - if (session_id) { - const state = this.sessionMap.get(session_id) - if (state) { - const msgs = state.messages - const last = [...msgs].reverse().find(m => m.hermesSessionId === hermesSessionId) - - switch (parsed.event) { - case 'message.delta': { - let deltaText = parsed.delta || '' - - // Try to extract text from JSON delta (e.g., "[{\"type\":\"text\",\"text\":\"hello\"}]") - if (deltaText.trim().startsWith('[') && deltaText.trim().endsWith(']')) { - try { - const parsedDelta = parseAnthropicContentArray(deltaText) - const textParts = parsedDelta - .filter((b: any) => b.type === 'text') - .map((b: any) => b.text || '') - deltaText = textParts.join('') - } catch { - // If parsing fails, use delta as-is - } - } - - if (last?.role === 'assistant' && last.finish_reason == null) { - last.content += deltaText - } else { - msgs.push({ - id: msgs.length + 1, - session_id, - hermesSessionId, - role: 'assistant', - content: deltaText, - timestamp: Math.floor(Date.now() / 1000), - }) - } - break - } - case 'reasoning.delta': - case 'thinking.delta': { - const text = parsed.text || parsed.delta || '' - if (!text) break - if (last?.role === 'assistant' && last.finish_reason == null) { - last.reasoning = (last.reasoning || '') + text - } else { - msgs.push({ - id: msgs.length + 1, - session_id, - role: 'assistant', - hermesSessionId, - content: '', - reasoning: text, - timestamp: Math.floor(Date.now() / 1000), - }) - } - break - } - case 'tool.started': { - if (last?.role === 'assistant' && last.finish_reason == null) { - last.finish_reason = 'tool_calls' - } - msgs.push({ - id: msgs.length + 1, - session_id, - role: 'tool', - hermesSessionId, - content: '', - tool_call_id: parsed.tool_call_id || null, - tool_name: parsed.tool || parsed.name || null, - timestamp: Math.floor(Date.now() / 1000), - }) - break - } - case 'tool.completed': { - const toolMsg = [...msgs].reverse().find(m => - m.hermesSessionId === hermesSessionId && m.role === 'tool' && !m.content - ) - if (toolMsg && parsed.output) { - toolMsg.content = typeof parsed.output === 'string' ? parsed.output : JSON.stringify(parsed.output) - } - break - } - case 'run.completed': { - logger.info('[chat-run-socket] ENTER run.completed case, session_id: %s, messages: %d', - session_id, msgs.length) - - if (last?.role === 'assistant' && last.finish_reason == null) { - last.finish_reason = parsed.finish_reason || 'stop' - } - - // Debug: log run.completed to check if reasoning is included - logger.info('[chat-run-socket] run.completed keys: %s', Object.keys(parsed)) - // Finalize assistant message — if no content was streamed, use output - if (parsed.output && !runProducedAssistantText(msgs, hermesSessionId)) { - let outputContent = parsed.output - - // Parse output if it's a stringified array - if (typeof outputContent === 'string' && - outputContent.trim().startsWith('[') && - outputContent.trim().endsWith(']')) { - try { - const parsedOutput = parseAnthropicContentArray(outputContent) - const textParts = parsedOutput - .filter((b: any) => b.type === 'text') - .map((b: any) => b.text || '') - outputContent = textParts.join('') - } catch { - // If parsing fails, use output as-is - } - } - - if (last?.role === 'assistant') { - last.content = outputContent - } else { - msgs.push({ - id: msgs.length + 1, - session_id, - hermesSessionId, - role: 'assistant', - content: outputContent, - timestamp: Math.floor(Date.now() / 1000), - }) - } - } - - // Always parse output if it's an array format (for parsed_content field) - // Only extract text content (tool_calls and reasoning are already sent via other events) - if (parsed.output && typeof parsed.output === 'string' && - parsed.output.trim().startsWith('[') && parsed.output.trim().endsWith(']')) { - try { - const parsedOutput = parseAnthropicContentArray(parsed.output) - const textParts = parsedOutput - .filter((b: any) => b.type === 'text') - .map((b: any) => b.text || '') - - // Set parsed_content for frontend (only text content) - parsed.parsed_content = textParts.join('') || '' - logger.info('[chat-run-socket] parsed output from run.completed event') - } catch (e) { - logger.error(e, '[chat-run-socket] failed to parse output from run.completed') - } - } - - // Parse stringified array content for all assistant messages - // Only extract text content (tool_calls and reasoning are already in message fields) - let parsedCount = 0 - for (const msg of msgs) { - if (msg.hermesSessionId === hermesSessionId && - msg.role === 'assistant' && typeof msg.content === 'string' && - msg.content.trim().startsWith('[') && msg.content.trim().endsWith(']')) { - try { - logger.info('[chat-run-socket] parsing array content for message %s, content preview: %s', - msg.id, msg.content.slice(0, 100)) - const parsedContent = parseAnthropicContentArray(msg.content) - const textBlocks = parsedContent - .filter((b: any) => b.type === 'text') - .map((b: any) => b.text || '') - - msg.content = textBlocks.join('') || '' - parsedCount++ - } catch (e) { - logger.error(e, '[chat-run-socket] failed to parse array content for message %s', msg.id) - } - } - } - - logger.info('[chat-run-socket] EXIT run.completed case, parsed %d messages', parsedCount) - - // Attach the last assistant message's parsed content to fix stringified array format - const lastAssistantMsg = msgs.filter((m: any) => - m.hermesSessionId === hermesSessionId && m.role === 'assistant' - ).pop() - if (lastAssistantMsg && parsedCount > 0) { - parsed.parsed_content = lastAssistantMsg.content || '' - parsed.parsed_tool_calls = lastAssistantMsg.tool_calls || null - parsed.parsed_reasoning = lastAssistantMsg.reasoning || null - logger.info('[chat-run-socket] attached parsed content to run.completed event for message %s', lastAssistantMsg.id) - } - - break - } + if (session_id) { + const state = this.sessionMap.get(session_id) + if (state) { + const mapped = this.applyResponseStreamEvent(state, session_id, runMarker, upstreamEvent, parsed) + if (mapped) { + if (mapped.runId) { + responseId = mapped.runId + state.runId = responseId } + emit(mapped.event, mapped.payload) } } + } - if (parsed.event === 'run.completed' || parsed.event === 'run.failed') { - source.close() - if (session_id && this.sessionMap.get(session_id)?.isAborting) { - logger.info({ - sessionId: session_id, - runId: parsed.run_id, - event: parsed.event, - }, '[chat-run-socket][abort] suppressing upstream terminal event during abort') - return - } - const queueLen = session_id ? this.sessionMap.get(session_id)?.queue?.length ?? 0 : 0 - if (session_id) await this.markCompleted(socket, session_id, { event: parsed.event, run_id: parsed.run_id }) - // Tag the event with queue_remaining so frontend knows more runs are pending - parsed.queue_remaining = queueLen - emit(parsed.event || 'message', parsed) - if (session_id && queueLen > 0) { - this.dequeueNextQueuedRun(socket, session_id) - } + if (upstreamEvent === 'response.completed' || upstreamEvent === 'response.failed') { + if (session_id && this.sessionMap.get(session_id)?.isAborting) { + logger.info({ + sessionId: session_id, + runId: responseId, + event: upstreamEvent, + }, '[chat-run-socket][abort] suppressing upstream terminal event during abort') return } - - // Usage will be calculated after syncFromHermes completes (in markCompleted) - - emit(parsed.event || 'message', parsed) - } catch { /* not JSON, skip */ } - } - - source.onerror = () => { - source.close() - if (session_id && this.sessionMap.get(session_id)?.isAborting) { - logger.info({ sessionId: session_id }, '[chat-run-socket][abort] event source closed during abort') + const queueLen = session_id ? this.sessionMap.get(session_id)?.queue?.length ?? 0 : 0 + if (session_id) await this.markCompleted(socket, session_id, { + event: upstreamEvent === 'response.completed' ? 'run.completed' : 'run.failed', + run_id: responseId, + }) + const finalOutput = parsed.response || parsed + const finalText = extractResponseText(finalOutput) + if (upstreamEvent === 'response.completed' && session_id) { + const usage = finalOutput.usage || {} + updateUsage(session_id, { + inputTokens: usage.input_tokens ?? usage.inputTokens ?? 0, + outputTokens: usage.output_tokens ?? usage.outputTokens ?? 0, + cacheReadTokens: usage.cache_read_tokens ?? usage.cacheReadTokens ?? 0, + cacheWriteTokens: usage.cache_write_tokens ?? usage.cacheWriteTokens ?? 0, + reasoningTokens: usage.reasoning_tokens ?? usage.reasoningTokens ?? 0, + model: finalOutput.model || '', + profile: this.sessionMap.get(session_id)?.profile, + }) + } + const eventName = upstreamEvent === 'response.completed' ? 'run.completed' : 'run.failed' + emit(eventName, { + event: eventName, + run_id: responseId || finalOutput.id, + response_id: responseId || finalOutput.id, + output: finalText, + usage: finalOutput.usage, + error: finalOutput.error || parsed.error, + queue_remaining: queueLen, + }) + if (session_id && queueLen > 0) { + this.dequeueNextQueuedRun(socket, session_id) + } return } - const queueLen = session_id ? this.sessionMap.get(session_id)?.queue?.length ?? 0 : 0 - if (session_id) { - void this.markCompleted(socket, session_id, { event: 'run.failed' }).then(() => { - emit('run.failed', { event: 'run.failed', error: 'EventSource connection lost', queue_remaining: queueLen }) - if (queueLen > 0) this.dequeueNextQueuedRun(socket, session_id) - }) - } else { - emit('run.failed', { event: 'run.failed', error: 'EventSource connection lost' }) - } } + const queueLen = session_id ? this.sessionMap.get(session_id)?.queue?.length ?? 0 : 0 + if (session_id) await this.markCompleted(socket, session_id, { event: 'run.failed', run_id: responseId }) + emit('run.failed', { + event: 'run.failed', + run_id: responseId, + response_id: responseId, + error: 'Response stream ended without a terminal event', + queue_remaining: queueLen, + }) + if (session_id && queueLen > 0) this.dequeueNextQueuedRun(socket, session_id) } catch (err: any) { const queueLen = session_id ? this.sessionMap.get(session_id)?.queue?.length ?? 0 : 0 if (session_id) { @@ -1208,17 +1018,237 @@ export class ChatRunSocket { } } + private applyResponseStreamEvent( + state: SessionState, + sessionId: string, + runMarker: string | undefined, + eventType: string, + parsed: any, + ): { event: string; payload: any; runId?: string } | null { + const run = this.getResponseRunState(state, runMarker) + const now = () => Math.floor(Date.now() / 1000) + + if (eventType === 'response.created') { + const response = parsed.response || parsed + run.responseId = response.id || run.responseId + return { + event: 'run.started', + runId: run.responseId, + payload: { + event: 'run.started', + run_id: run.responseId, + response_id: run.responseId, + status: response.status || 'in_progress', + queue_length: state.queue.length || 0, + }, + } + } + + if (eventType === 'response.output_text.delta') { + const deltaText = parsed.delta || parsed.text || '' + if (!deltaText) return null + run.text += deltaText + + const last = [...state.messages].reverse().find(m => m.runMarker === runMarker) + if (last?.role === 'assistant' && last.finish_reason == null && !last.tool_calls?.length) { + last.content += deltaText + } else { + state.messages.push({ + id: state.messages.length + 1, + session_id: sessionId, + runMarker, + role: 'assistant', + content: deltaText, + timestamp: now(), + }) + } + return { + event: 'message.delta', + payload: { + event: 'message.delta', + run_id: run.responseId, + response_id: run.responseId, + delta: deltaText, + }, + } + } + + if (eventType === 'response.output_text.done') { + const text = parsed.text || run.text + this.insertResponseTextOnce(state, sessionId, runMarker, text) + return null + } + + if (eventType === 'response.output_item.added') { + const item = parsed.item || parsed.output_item || parsed + if (item.type !== 'function_call') return null + const callId = item.call_id || item.id + if (!callId) return null + run.toolCalls.set(callId, responseFunctionCallToToolCall(item)) + return null + } + + if (eventType === 'response.output_item.done') { + const item = parsed.item || parsed.output_item || parsed + if (item.type === 'function_call') { + const callId = item.call_id || item.id + if (!callId) return null + const toolCall = responseFunctionCallToToolCall(item) + run.toolCalls.set(callId, toolCall) + + const key = `assistant:${callId}` + if (!run.insertedKeys.has(key)) { + run.insertedKeys.add(key) + state.messages.push({ + id: state.messages.length + 1, + session_id: sessionId, + runMarker, + role: 'assistant', + content: '', + tool_calls: [toolCall], + finish_reason: 'tool_calls', + timestamp: now(), + }) + addMessage({ + session_id: sessionId, + role: 'assistant', + content: '', + tool_calls: [toolCall], + finish_reason: 'tool_calls', + timestamp: now(), + }) + } + return { + event: 'tool.started', + payload: { + event: 'tool.started', + run_id: run.responseId, + response_id: run.responseId, + tool_call_id: callId, + tool: toolCall.function.name, + name: toolCall.function.name, + arguments: toolCall.function.arguments, + preview: summarizeToolArguments(toolCall.function.arguments), + }, + } + } + + if (item.type === 'function_call_output') { + const callId = item.call_id || item.id + if (!callId) return null + const key = `tool:${callId}` + const output = typeof item.output === 'string' ? item.output : JSON.stringify(item.output ?? '') + const toolName = run.toolCalls.get(callId)?.function?.name || null + if (!run.insertedKeys.has(key)) { + run.insertedKeys.add(key) + state.messages.push({ + id: state.messages.length + 1, + session_id: sessionId, + runMarker, + role: 'tool', + content: output, + tool_call_id: callId, + tool_name: toolName, + timestamp: now(), + }) + addMessage({ + session_id: sessionId, + role: 'tool', + content: output, + tool_call_id: callId, + tool_name: toolName, + timestamp: now(), + }) + } + return { + event: 'tool.completed', + payload: { + event: 'tool.completed', + run_id: run.responseId, + response_id: run.responseId, + tool_call_id: callId, + tool: toolName, + name: toolName, + output, + }, + } + } + } + + if (eventType === 'response.completed') { + const response = parsed.response || parsed + run.responseId = response.id || run.responseId + const output = Array.isArray(response.output) ? response.output : [] + for (const item of output) { + if (item.type === 'function_call') { + this.applyResponseStreamEvent(state, sessionId, runMarker, 'response.output_item.done', { item }) + } else if (item.type === 'function_call_output') { + this.applyResponseStreamEvent(state, sessionId, runMarker, 'response.output_item.done', { item }) + } + } + this.insertResponseTextOnce(state, sessionId, runMarker, extractResponseText(response)) + } + + return null + } + + private getResponseRunState(state: SessionState, runMarker?: string): ResponseRunState { + if (!state.responseRun || state.responseRun.runMarker !== runMarker) { + state.responseRun = { + runMarker, + text: '', + textInserted: false, + insertedKeys: new Set(), + toolCalls: new Map(), + } + } + return state.responseRun + } + + private insertResponseTextOnce( + state: SessionState, + sessionId: string, + runMarker: string | undefined, + text: string, + ) { + const run = this.getResponseRunState(state, runMarker) + if (run.textInserted || !text?.trim()) return + run.textInserted = true + + const last = [...state.messages].reverse().find(m => m.runMarker === runMarker) + if (last?.role === 'assistant' && !last.tool_calls?.length) { + last.content = text + last.finish_reason = 'stop' + } else { + state.messages.push({ + id: state.messages.length + 1, + session_id: sessionId, + runMarker, + role: 'assistant', + content: text, + finish_reason: 'stop', + timestamp: Math.floor(Date.now() / 1000), + }) + } + addMessage({ + session_id: sessionId, + role: 'assistant', + content: text, + finish_reason: 'stop', + timestamp: Math.floor(Date.now() / 1000), + }) + } + // --- Abort handler --- private async handleAbort(socket: Socket, sessionId: string) { const state = this.sessionMap.get(sessionId) - if (!state?.isWorking || !state.runId) { + if (!state?.isWorking || (!state.runId && !state.abortController)) { logger.info({ sessionId }, '[chat-run-socket][abort] ignored: no active run') if (state) { state.isWorking = false state.isAborting = false state.abortController = undefined - state.eventSource = undefined state.runId = undefined state.events = [] } @@ -1244,46 +1274,15 @@ export class ChatRunSocket { }) logger.info({ sessionId, runId }, '[chat-run-socket][abort] started') - // Call upstream stop endpoint - const profile = state.profile || 'default' - const upstream = this.gatewayManager.getUpstream(profile).replace(/\/$/, '') - const apiKey = this.gatewayManager.getApiKey(profile) || undefined - - try { - const headers: Record = { 'Content-Type': 'application/json' } - if (apiKey) headers['Authorization'] = `Bearer ${apiKey}` - - logger.info({ sessionId, runId, upstream }, '[chat-run-socket][abort] calling upstream stop') - await fetch(`${upstream}/v1/runs/${runId}/stop`, { - method: 'POST', - headers, - signal: AbortSignal.timeout(10_000), - }) - logger.info('[chat-run-socket] called upstream stop for run %s (session: %s)', runId, sessionId) - logger.info({ sessionId, runId, graceMs: 5000 }, '[chat-run-socket][abort] upstream stop accepted, waiting for graceful exit') - - // Wait for upstream to process the stop request - await new Promise(resolve => setTimeout(resolve, 5000)) - } catch (err: any) { - logger.warn(err, '[chat-run-socket] failed to call upstream stop for run %s (session: %s)', runId, sessionId) - logger.warn({ sessionId, runId, error: err?.message }, '[chat-run-socket][abort] upstream stop failed, continuing local completion') - } - - // Close local EventSource connection after the upstream grace period. - if (state.eventSource) { - state.eventSource.close() - state.eventSource = undefined - logger.info({ sessionId, runId }, '[chat-run-socket][abort] event source closed') - } if (state.abortController) { state.abortController.abort() } - await this.markAbortCompleted(socket, sessionId, runId) + await this.markAbortCompleted(socket, sessionId, runId || 'response_stream') } /** Mark a session run as completed/failed so reconnecting clients get notified */ - private async markCompleted(socket: Socket, sessionId: string, _info: { event: string; run_id?: string }) { + private async markCompleted(_socket: Socket, sessionId: string, _info: { event: string; run_id?: string }) { const state = this.sessionMap.get(sessionId) if (state) { if (state.isAborting) { @@ -1295,21 +1294,15 @@ export class ChatRunSocket { } state.isWorking = false state.abortController = undefined - state.eventSource = undefined state.runId = undefined state.events = [] - // Sync messages from Hermes ephemeral session to local DB - if (useLocalSessionStore() && this.hermesSessionIds.get(sessionId)) { - const hermesId = this.hermesSessionIds.get(sessionId) - const prof = state.profile - this.hermesSessionIds.delete(sessionId) - state.profile = undefined - await this.syncFromHermes(socket, sessionId, hermesId, prof, { - maxAttempts: 4, - delayMs: 1000, - }) + state.responseRun = undefined + state.profile = undefined + updateSessionStats(sessionId) + const emit = (event: string, payload: any) => { + this.nsp.to(`session:${sessionId}`).emit(event, { ...payload, session_id: sessionId }) } - + await this.calcAndUpdateUsage(sessionId, state, emit) } } @@ -1337,24 +1330,15 @@ export class ChatRunSocket { const state = this.sessionMap.get(sessionId) if (!state) return - const hermesId = this.hermesSessionIds.get(sessionId) const profile = state.profile - let synced = false - if (useLocalSessionStore() && hermesId) { - this.hermesSessionIds.delete(sessionId) - logger.info({ sessionId, hermesId, profile: profile || 'default' }, '[chat-run-socket][abort] syncing stopped run from Hermes') - synced = await this.syncFromHermes(socket, sessionId, hermesId, profile, { - maxAttempts: 4, - delayMs: 1000, - }) - } + updateSessionStats(sessionId) state.isWorking = false state.isAborting = false state.profile = undefined state.abortController = undefined - state.eventSource = undefined state.runId = undefined + state.responseRun = undefined // Process queued messages after abort completes if (state.queue.length > 0) { @@ -1363,13 +1347,13 @@ export class ChatRunSocket { this.replaceState(sessionId, 'abort.completed', { event: 'abort.completed', run_id: runId, - synced, + synced: true, queue_length: state.queue.length + 1, }) this.emitToSession(socket, sessionId, 'abort.completed', { event: 'abort.completed', run_id: runId, - synced, + synced: true, queue_length: state.queue.length + 1, }) this.emitToSession(socket, sessionId, 'run.queued', { @@ -1390,14 +1374,14 @@ export class ChatRunSocket { this.replaceState(sessionId, 'abort.completed', { event: 'abort.completed', run_id: runId, - synced, + synced: true, }) this.emitToSession(socket, sessionId, 'abort.completed', { event: 'abort.completed', run_id: runId, - synced, + synced: true, }) - logger.info({ sessionId, runId, synced }, '[chat-run-socket][abort] completed') + logger.info({ sessionId, runId, synced: true }, '[chat-run-socket][abort] completed') } /** @@ -1445,200 +1429,6 @@ export class ChatRunSocket { } } - /** - * Read complete messages from Hermes state.db for the ephemeral session - * and write to local DB. This gives us tool results that SSE events don't include. - * After sync, enqueues the ephemeral session for deletion. - */ - private async syncFromHermes( - socket: Socket, - localSessionId: string, - hermesSessionId: string, - profile?: string, - options?: { maxAttempts?: number; delayMs?: number }, - ): Promise { - const maxAttempts = options?.maxAttempts || 1 - const delayMs = options?.delayMs || 0 - try { - let detail: Awaited> | null = null - for (let attempt = 1; attempt <= maxAttempts; attempt++) { - detail = await getSessionDetailFromDb(hermesSessionId) - if (!detail || !detail.messages?.length) { - logger.warn('[chat-run-socket] syncFromHermes: no data for Hermes session %s (attempt %d/%d)', hermesSessionId, attempt, maxAttempts) - logger.info({ localSessionId, hermesSessionId, attempt, maxAttempts }, '[chat-run-socket][abort] sync waiting for Hermes data') - if (attempt < maxAttempts && delayMs > 0) { - await new Promise(resolve => setTimeout(resolve, delayMs)) - continue - } - this.enqueueEphemeralDelete(hermesSessionId, profile) - return false - } - break - } - if (!detail) return false - - // Skip user messages for DB insert; they are already written in handleRun. - // Keep them in memory replacement so replacing an ephemeral run does not - // delete the queued user message from state.messages. - const toInsert = detail.messages.filter(m => m.role !== 'user') - const toReplaceInMemory = detail.messages - - // Build tool_call_id → function.name lookup from assistant messages - // (Hermes stores tool_name as NULL, name lives inside tool_calls JSON) - const toolNameMap = new Map() - for (const msg of detail.messages) { - if (msg.role === 'assistant' && Array.isArray(msg.tool_calls)) { - for (const tc of msg.tool_calls) { - const id = tc.id || tc.call_id || tc.tool_call_id - const name = tc.function?.name || tc.name - if (id && name) toolNameMap.set(id, name) - } - } - } - - if (toInsert.length > 0) { - // Get in-memory messages to preserve reasoning that was streamed via SSE - const state = this.sessionMap.get(localSessionId) - const memoryMessages = state?.messages || [] - logger.info('[chat-run-socket] syncFromHermes: memory has %d messages, DB has %d messages', - memoryMessages.length, toInsert.length) - - // Match messages by order since Hermes DB and memory should have same sequence - let memoryIdx = 0 - let mergedCount = 0 - for (let i = 0; i < toInsert.length && memoryIdx < memoryMessages.length; i++) { - const dbMsg = toInsert[i] - // Skip user messages in memory when matching - while (memoryIdx < memoryMessages.length && memoryMessages[memoryIdx].role === 'user') { - memoryIdx++ - } - if (memoryIdx >= memoryMessages.length) break - const memoryMsg = memoryMessages[memoryIdx] - // Only merge if roles match - if (dbMsg.role === memoryMsg.role) { - // Merge reasoning from memory if DB doesn't have it - if (!dbMsg.reasoning && memoryMsg.reasoning) { - dbMsg.reasoning = memoryMsg.reasoning - mergedCount++ - logger.info('[chat-run-socket] syncFromHermes: merged reasoning from memory to DB for %s message at index %d', - dbMsg.role, i) - } - } - memoryIdx++ - } - - if (mergedCount > 0) { - logger.info('[chat-run-socket] syncFromHermes: merged reasoning for %d messages', mergedCount) - } - - // Batch insert with transaction for atomicity - addMessages(toInsert.map(msg => { - // Resolve tool_name from assistant's tool_calls if missing - let toolName = msg.tool_name || null - if (!toolName && msg.tool_call_id) { - toolName = toolNameMap.get(msg.tool_call_id) || null - } - return { - session_id: localSessionId, - role: msg.role, - content: msg.content || '', - tool_call_id: msg.tool_call_id || null, - tool_calls: msg.tool_calls || null, - tool_name: toolName, - timestamp: msg.timestamp || Math.floor(Date.now() / 1000), - token_count: msg.token_count || null, - finish_reason: msg.finish_reason || null, - reasoning: msg.reasoning || null, - reasoning_details: msg.reasoning_details || null, - reasoning_content: msg.reasoning_content || null, - codex_reasoning_items: msg.codex_reasoning_items || null, - } - })) - - logger.info('[chat-run-socket] syncFromHermes: synced %d messages to local session %s', toInsert.length, localSessionId) - } - - updateSessionStats(localSessionId) - - // Record usage from Hermes session - updateUsage(localSessionId, { - inputTokens: detail.input_tokens, - outputTokens: detail.output_tokens, - cacheReadTokens: detail.cache_read_tokens, - cacheWriteTokens: detail.cache_write_tokens, - reasoningTokens: detail.reasoning_tokens, - model: detail.model, - profile: profile || 'default', - }) - - // Calculate usage from DB now that data is complete - // Use inputTokens already set by compression path if available - const state = this.sessionMap.get(localSessionId) - if (state) { - const messages = this.handleMessage(toReplaceInMemory, localSessionId) - if (messages.length > 0) { - this.replaceByHermesSessionId(localSessionId, hermesSessionId, messages) - } - const emit = (event: string, payload: any) => { - const tagged = localSessionId ? { ...payload, localSessionId } : payload - if (localSessionId) { - this.nsp.to(`session:${localSessionId}`).emit(event, tagged) - } else if (socket.connected) { - socket.emit(event, tagged) - } - } - this.calcAndUpdateUsage(localSessionId, state, emit) - } - - // Enqueue ephemeral session for deferred deletion - this.enqueueEphemeralDelete(hermesSessionId, profile) - return true - } catch (err: any) { - logger.warn(err, '[chat-run-socket] syncFromHermes failed for session %s (hermesId: %s, profile: %s): %s', localSessionId, hermesSessionId, profile || 'default', err?.message) - return false - } - } - private replaceByHermesSessionId(session_id: string, hermesSessionId: string, newItems: SessionMessage[]) { - let start = -1 - let end = -1 - const state = this.sessionMap.get(session_id) - const msg = state?.messages || [] - // 找区间 - for (let i = 0; i < msg.length; i++) { - if (msg[i].hermesSessionId === hermesSessionId) { - if (start === -1) start = i - end = i - } else if (start !== -1) { - // 已经找到一段,后面断了就可以结束 - break - } - } - - // 没找到 - if (start === -1) return - if (!newItems.some(item => item.role === 'user')) { - const existingUsers = msg.slice(start, end + 1).filter(item => item.role === 'user') - newItems = [...existingUsers, ...newItems] - } - // 替换 - msg.splice(start, end - start + 1, ...newItems) - } - /** Enqueue an ephemeral Hermes session for deferred deletion */ - private enqueueEphemeralDelete(hermesSessionId: string, profile?: string) { - try { - const db = getDb() - if (!db) return - const now = Date.now() - db.prepare( - `INSERT INTO gc_pending_session_deletes (session_id, profile_name, status, attempt_count, last_error, created_at, updated_at, next_attempt_at) - VALUES (?, ?, 'pending', 0, NULL, ?, ?, ?) - ON CONFLICT(session_id) DO NOTHING`, - ).run(hermesSessionId, profile || 'default', now, now, now) - logger.info('[chat-run-socket] enqueued ephemeral session %s for deletion', hermesSessionId) - } catch { /* best-effort */ } - } - - /** Get or create session state in sessionMap */ private getOrCreateSession(sessionId: string): SessionState { let state = this.sessionMap.get(sessionId) @@ -1676,7 +1466,7 @@ export class ChatRunSocket { } } - /** Close all active EventSource connections and abort controllers */ + /** Close all active upstream response streams */ close() { for (const [sessionId, state] of this.sessionMap.entries()) { if (state.abortController) { @@ -1688,16 +1478,103 @@ export class ChatRunSocket { } } this.sessionMap.clear() - this.hermesSessionIds.clear() logger.info('[chat-run-socket] closed all connections and cleared state') } } -/** Check if the current ephemeral run has already produced assistant text. */ -function runProducedAssistantText(messages: SessionMessage[], hermesSessionId?: string): boolean { - return messages.some(m => - m.hermesSessionId === hermesSessionId && - m.role === 'assistant' && - !!m.content?.trim() - ) +async function* readSseFrames(stream: ReadableStream): AsyncGenerator<{ event?: string; data: string }> { + const decoder = new TextDecoder() + const reader = stream.getReader() + let buffer = '' + + try { + while (true) { + const { done, value } = await reader.read() + if (done) break + buffer += decoder.decode(value, { stream: true }) + + let boundary = buffer.indexOf('\n\n') + while (boundary >= 0) { + const raw = buffer.slice(0, boundary) + buffer = buffer.slice(boundary + 2) + const frame = parseSseFrame(raw) + if (frame?.data) yield frame + boundary = buffer.indexOf('\n\n') + } + } + + buffer += decoder.decode() + const frame = parseSseFrame(buffer) + if (frame?.data) yield frame + } finally { + reader.releaseLock() + } +} + +function parseSseFrame(raw: string): { event?: string; data: string } | null { + let event: string | undefined + const data: string[] = [] + for (const line of raw.split(/\r?\n/)) { + if (!line || line.startsWith(':')) continue + if (line.startsWith('event:')) { + event = line.slice(6).trim() + } else if (line.startsWith('data:')) { + data.push(line.slice(5).trimStart()) + } + } + if (data.length === 0) return null + return { event, data: data.join('\n') } +} + +function responseFunctionCallToToolCall(item: any): any { + const callId = item.call_id || item.id || '' + const name = item.name || item.function?.name || '' + let args = item.arguments ?? item.function?.arguments ?? '{}' + if (typeof args !== 'string') { + args = JSON.stringify(args ?? {}) + } + return { + id: callId, + type: 'function', + function: { + name, + arguments: args || '{}', + }, + } +} + +function summarizeToolArguments(args: string): string | undefined { + if (!args) return undefined + try { + const parsed = JSON.parse(args) + if (!parsed || typeof parsed !== 'object') return args.slice(0, 120) + const preferredKeys = ['cmd', 'command', 'code', 'query', 'path', 'url', 'prompt'] + for (const key of preferredKeys) { + const value = parsed[key] + if (typeof value === 'string' && value.trim()) { + return value.replace(/\s+/g, ' ').slice(0, 160) + } + } + const first = Object.entries(parsed).find(([, value]) => typeof value === 'string' && value.trim()) + if (first) return String(first[1]).replace(/\s+/g, ' ').slice(0, 160) + return JSON.stringify(parsed).slice(0, 160) + } catch { + return args.replace(/\s+/g, ' ').slice(0, 160) + } +} + +function extractResponseText(response: any): string { + const output = Array.isArray(response?.output) ? response.output : [] + const parts: string[] = [] + for (const item of output) { + if (item.type !== 'message') continue + const content = Array.isArray(item.content) ? item.content : [] + for (const part of content) { + if (part.type === 'output_text' || part.type === 'text') { + parts.push(part.text || '') + } + } + } + if (parts.length > 0) return parts.join('') + return typeof response?.output_text === 'string' ? response.output_text : '' } diff --git a/packages/server/src/services/hermes/context-engine/gateway-client.ts b/packages/server/src/services/hermes/context-engine/gateway-client.ts index 55e145e..b824c24 100644 --- a/packages/server/src/services/hermes/context-engine/gateway-client.ts +++ b/packages/server/src/services/hermes/context-engine/gateway-client.ts @@ -1,4 +1,3 @@ -import { EventSource } from 'eventsource' import type { StoredMessage, GatewayCaller } from './types' import { buildSummarizationSystemPrompt, @@ -6,12 +5,11 @@ import { buildIncrementalUpdatePrompt, } from './prompt' import { updateUsage } from '../../../db/hermes/usage-store' -import { getSessionDetailFromDbWithProfile } from '../../../db/hermes/sessions-db' import { logger } from '../../logger' /** - * Calls Hermes /v1/runs to produce LLM-generated summaries. - * Uses non-streaming EventSource to wait for run.completed. + * Calls Hermes /v1/responses to produce LLM-generated summaries. + * The context engine owns history assembly; Responses storage/chaining is not used. */ export class GatewaySummarizer implements GatewayCaller { private timeoutMs: number @@ -29,13 +27,11 @@ export class GatewaySummarizer implements GatewayCaller { profile: string, previousSummary?: string, ): Promise<{ summary: string; sessionId: string }> { - // Build conversation_history from messages const history: Array<{ role: string; content: string }> = messages.map(m => ({ role: 'user', content: `[${m.senderName}]: ${m.content}`, })) - // Inject previous summary for incremental update if (previousSummary) { history.unshift( { role: 'user', content: `[Previous summary]\n${previousSummary}` }, @@ -47,10 +43,7 @@ export class GatewaySummarizer implements GatewayCaller { ? buildIncrementalUpdatePrompt() : buildFullSummaryPrompt() - const sessionId = Date.now().toString(36) + Math.random().toString(36).slice(2, 8) - - // POST /v1/runs - const res = await fetch(`${upstream}/v1/runs`, { + const res = await fetch(`${upstream.replace(/\/$/, '')}/v1/responses`, { method: 'POST', headers: { 'Content-Type': 'application/json', @@ -60,98 +53,122 @@ export class GatewaySummarizer implements GatewayCaller { input: userPrompt, instructions: systemPrompt || buildSummarizationSystemPrompt(), conversation_history: history, - session_id: sessionId, + stream: true, + store: false, }), signal: AbortSignal.timeout(this.timeoutMs), }) if (!res.ok) { - throw new Error(`Summarization run failed: ${res.status}`) + throw new Error(`Summarization response failed: ${res.status}`) + } + if (!res.body) { + throw new Error('Summarization response stream missing') } - const { run_id } = await res.json() as { run_id: string } + let output = '' + for await (const frame of readSseFrames(res.body)) { + let parsed: any + try { + parsed = JSON.parse(frame.data) + } catch { + continue + } + const eventType = parsed.type || frame.event || parsed.event - try { - const output = await this.pollForResult(upstream, apiKey, run_id, sessionId, roomId, profile) - return { summary: output, sessionId } - } finally { - // Note: session cleanup is handled by the caller (compressor.ts) + if (eventType === 'response.output_text.delta' && parsed.delta) { + output += parsed.delta + continue + } + + if (eventType === 'response.completed') { + const response = parsed.response || parsed + const finalText = extractResponseText(response) + if (!output && finalText) output = finalText + + const usage = response.usage || {} + updateUsage(roomId, { + inputTokens: usage.input_tokens ?? usage.inputTokens ?? 0, + outputTokens: usage.output_tokens ?? usage.outputTokens ?? 0, + cacheReadTokens: usage.cache_read_tokens ?? usage.cacheReadTokens ?? 0, + cacheWriteTokens: usage.cache_write_tokens ?? usage.cacheWriteTokens ?? 0, + reasoningTokens: usage.reasoning_tokens ?? usage.reasoningTokens ?? 0, + model: response.model || '', + profile, + }) + logger.debug(`[GatewaySummarizer] Recorded response usage for compression room ${roomId} (profile=${profile}): input=${usage.input_tokens ?? 0}, output=${usage.output_tokens ?? 0}`) + + if (!output || output.trim() === '') { + throw new Error('Empty summarization response') + } + return { summary: output.trim(), sessionId: '' } + } + + if (eventType === 'response.failed') { + throw new Error(parsed.error?.message || parsed.error || 'Summarization response failed') + } } + + throw new Error('Summarization response stream ended without a terminal event') } - - private pollForResult(upstream: string, apiKey: string | null, runId: string, sessionId: string, roomId: string, profile: string): Promise { - return new Promise((resolve, reject) => { - const timer = setTimeout(() => { - source.close() - reject(new Error('Summarization timed out')) - }, this.timeoutMs) - - const eventsUrl = new URL(`${upstream}/v1/runs/${runId}/events`) - - // Use Authorization header instead of query parameter for better compatibility - const eventSourceInit: any = apiKey ? { - fetch: (url: string, init: any = {}) => fetch(url, { - ...init, - headers: { - ...(init.headers || {}), - Authorization: `Bearer ${apiKey}`, - }, - }), - } : {} - - // @ts-ignore - eventsource library types are too strict - const source = new EventSource(eventsUrl.toString(), eventSourceInit) - - source.onmessage = async (event: MessageEvent) => { - try { - const parsed = JSON.parse(event.data) - if (parsed.event === 'run.completed') { - clearTimeout(timer) - - // Record usage data from Hermes state.db BEFORE closing source - // This ensures we fetch usage before sessionCleaner can delete it - try { - const detail = await getSessionDetailFromDbWithProfile(sessionId, profile) - if (detail) { - updateUsage(roomId, { - inputTokens: detail.input_tokens, - outputTokens: detail.output_tokens, - cacheReadTokens: detail.cache_read_tokens, - cacheWriteTokens: detail.cache_write_tokens, - reasoningTokens: detail.reasoning_tokens, - model: detail.model, - profile, - }) - logger.debug(`[GatewaySummarizer] Recorded usage for compression room ${roomId} (session ${sessionId}, profile=${profile}): input=${detail.input_tokens}, output=${detail.output_tokens}`) - } else { - logger.warn(`[GatewaySummarizer] Failed to get session detail for ${sessionId} (profile=${profile})`) - } - } catch (err: any) { - logger.warn(err, '[GatewaySummarizer] Failed to record usage from DB') - } - - source.close() - - const output = parsed.output - if (!output || typeof output !== 'string' || output.trim() === '') { - reject(new Error('Empty summarization response')) - return - } - resolve(output.trim()) - } else if (parsed.event === 'run.failed') { - clearTimeout(timer) - source.close() - reject(new Error(parsed.error || 'Summarization run failed')) - } - } catch { /* ignore parse errors for non-JSON events */ } - } - - source.onerror = () => { - clearTimeout(timer) - source.close() - reject(new Error('Summarization SSE connection error')) - } - }) - } - +} + +async function* readSseFrames(stream: ReadableStream): AsyncGenerator<{ event?: string; data: string }> { + const decoder = new TextDecoder() + const reader = stream.getReader() + let buffer = '' + + try { + while (true) { + const { done, value } = await reader.read() + if (done) break + buffer += decoder.decode(value, { stream: true }) + + let boundary = buffer.indexOf('\n\n') + while (boundary >= 0) { + const raw = buffer.slice(0, boundary) + buffer = buffer.slice(boundary + 2) + const frame = parseSseFrame(raw) + if (frame?.data) yield frame + boundary = buffer.indexOf('\n\n') + } + } + + buffer += decoder.decode() + const frame = parseSseFrame(buffer) + if (frame?.data) yield frame + } finally { + reader.releaseLock() + } +} + +function parseSseFrame(raw: string): { event?: string; data: string } | null { + let event: string | undefined + const data: string[] = [] + for (const line of raw.split(/\r?\n/)) { + if (!line || line.startsWith(':')) continue + if (line.startsWith('event:')) { + event = line.slice(6).trim() + } else if (line.startsWith('data:')) { + data.push(line.slice(5).trimStart()) + } + } + if (data.length === 0) return null + return { event, data: data.join('\n') } +} + +function extractResponseText(response: any): string { + const output = Array.isArray(response?.output) ? response.output : [] + const parts: string[] = [] + for (const item of output) { + if (item.type !== 'message') continue + const content = Array.isArray(item.content) ? item.content : [] + for (const part of content) { + if (part.type === 'output_text' || part.type === 'text') { + parts.push(part.text || '') + } + } + } + if (parts.length > 0) return parts.join('') + return typeof response?.output_text === 'string' ? response.output_text : '' } diff --git a/packages/server/src/services/hermes/group-chat/agent-clients.ts b/packages/server/src/services/hermes/group-chat/agent-clients.ts index 8f2bd5a..4e659d5 100644 --- a/packages/server/src/services/hermes/group-chat/agent-clients.ts +++ b/packages/server/src/services/hermes/group-chat/agent-clients.ts @@ -1,12 +1,8 @@ import { io, Socket } from 'socket.io-client' -import { EventSource } from 'eventsource' import { getToken } from '../../../services/auth' import type { GatewayManager } from '../gateway-manager' -import { deleteSession as hermesDeleteSession } from '../hermes-cli' -import { getActiveProfileName } from '../hermes-profile' import { logger } from '../../../services/logger' import { updateUsage } from '../../../db/hermes/usage-store' -import { getSessionDetailFromDbWithProfile } from '../../../db/hermes/sessions-db' // ─── Types ──────────────────────────────────────────────────── @@ -186,29 +182,6 @@ class AgentClient { } } - private async deleteSession(sessionId: string): Promise { - try { - const sessionProfile = this.storage?.getSessionProfile?.(sessionId) - const currentProfile = getActiveProfileName() - - if (sessionProfile && sessionProfile.profile_name !== currentProfile) { - // Cross-profile: enqueue deferred delete, don't switch profile - this.storage?.enqueuePendingSessionDelete?.(sessionId, sessionProfile.profile_name) - logger.info(`[AgentClients] ${this.name}: cross-profile deferred delete session ${sessionId} (session=${sessionProfile.profile_name}, active=${currentProfile})`) - return - } - - // Same profile or no mapping: delete directly - const ok = await hermesDeleteSession(sessionId) - if (ok) { - this.storage?.deleteSessionProfile?.(sessionId) - } - logger.debug(`[AgentClients] ${this.name}: delete session ${sessionId} (profile=${this.profile}) → ${ok ? 'ok' : 'failed'}`) - } catch (err: any) { - logger.warn(`[AgentClients] ${this.name}: failed to delete session ${sessionId}: ${err.message}`) - } - } - // ─── Hermes Gateway Integration ──────────────────────────── /** @@ -235,8 +208,6 @@ class AgentClient { return } - const sessionId = Date.now().toString(36) + Math.random().toString(36).slice(2, 8) - try { // Notify room that agent is typing this.startTyping(roomId) @@ -290,8 +261,7 @@ class AgentClient { // Strip @mention from input — agent already knows it was mentioned const input = msg.content.replace(new RegExp(`@${this.name.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')}\\s*`, 'gi'), '').trim() || msg.content - // Start a run on Hermes gateway - const runRes = await fetch(`${upstream}/v1/runs`, { + const responseRes = await fetch(`${upstream.replace(/\/$/, '')}/v1/responses`, { method: 'POST', headers: { 'Content-Type': 'application/json', @@ -299,126 +269,81 @@ class AgentClient { }, body: JSON.stringify({ input, - session_id: sessionId, ...(conversationHistory.length > 0 ? { conversation_history: conversationHistory } : {}), ...(instructions ? { instructions } : {}), + stream: true, + store: false, }), signal: AbortSignal.timeout(120000), }) - if (!runRes.ok) { - const text = await runRes.text().catch(() => '') - logger.error(`[AgentClients] ${this.name}: gateway run failed (${runRes.status}): ${text}`) + if (!responseRes.ok) { + const text = await responseRes.text().catch(() => '') + logger.error(`[AgentClients] ${this.name}: gateway response failed (${responseRes.status}): ${text}`) this.stopTyping(roomId) + onStatus?.('ready') return } - const runData = await runRes.json() as any - const run_id = runData.run_id - logger.debug(`[AgentClients] ${this.name}: run started, response=%j`, runData) - if (!run_id) { - logger.error(`[AgentClients] ${this.name}: no run_id in response`) + if (!responseRes.body) { + logger.error(`[AgentClients] ${this.name}: gateway response stream missing`) this.stopTyping(roomId) + onStatus?.('ready') return } - // Save session-to-profile mapping after gateway confirms the run - const actualSessionId = runData.session_id || sessionId - if (!this.storage) { - logger.warn(`[AgentClients] ${this.name}: storage is null, cannot save session profile for ${actualSessionId}`) - } else { - this.storage.saveSessionProfile(actualSessionId, roomId, this.agentId, this.profile) - logger.debug(`[AgentClients] ${this.name}: saved session profile ${actualSessionId} → profile=${this.profile}`) - } - - // Stream events from Hermes - const eventsUrl = new URL(`${upstream}/v1/runs/${run_id}/events`) - logger.debug(`[AgentClients] ${this.name}: streaming events from ${eventsUrl}`) - - // Use Authorization header instead of query parameter for better compatibility - const eventSourceInit: any = apiKey ? { - fetch: (url: string, init: any = {}) => fetch(url, { - ...init, - headers: { - ...(init.headers || {}), - Authorization: `Bearer ${apiKey}`, - }, - }), - } : {} - - // @ts-ignore - eventsource library types are too strict - const source = new EventSource(eventsUrl.toString(), eventSourceInit) - let fullContent = '' - - source.onmessage = async (e: any) => { + for await (const frame of readSseFrames(responseRes.body)) { + let parsed: any try { - const parsed = JSON.parse(e.data) - logger.debug(`[AgentClients] ${this.name}: event=${parsed.event}`) - - if (parsed.event === 'run.completed') { - // Record usage data from Hermes state.db BEFORE closing source - // This ensures we fetch usage before deleteSession can delete it - try { - const detail = await getSessionDetailFromDbWithProfile(actualSessionId, this.profile) - if (detail) { - updateUsage(roomId, { - inputTokens: detail.input_tokens, - outputTokens: detail.output_tokens, - cacheReadTokens: detail.cache_read_tokens, - cacheWriteTokens: detail.cache_write_tokens, - reasoningTokens: detail.reasoning_tokens, - model: detail.model, - profile: this.profile, - }) - logger.debug(`[AgentClients] Recorded usage for room ${roomId} (session ${actualSessionId}, profile=${this.profile}): input=${detail.input_tokens}, output=${detail.output_tokens}`) - } else { - logger.warn(`[AgentClients] Failed to get session detail for ${actualSessionId} (profile=${this.profile})`) - } - } catch (err: any) { - logger.warn(err, '[AgentClients] Failed to record usage from DB') - } - - source.close() - logger.debug(`[AgentClients] ${this.name}: run completed, content length=${fullContent.length}`) - if (fullContent) { - this.stopTyping(roomId) - this.sendMessage(roomId, fullContent) - } - this.deleteSession(actualSessionId).catch(() => { }) - onStatus?.('ready') - return - } - - if (parsed.event === 'run.failed') { - source.close() - logger.error(`[AgentClients] ${this.name}: run failed`) - this.stopTyping(roomId) - this.deleteSession(actualSessionId).catch(() => { }) - onStatus?.('ready') - return - } - - // Accumulate message deltas - if (parsed.event === 'message.delta' && parsed.delta) { - fullContent += parsed.delta - } + parsed = JSON.parse(frame.data) } catch { - // ignore parse errors + continue + } + const eventType = parsed.type || frame.event || parsed.event + logger.debug(`[AgentClients] ${this.name}: event=${eventType}`) + + if (eventType === 'response.output_text.delta' && parsed.delta) { + fullContent += parsed.delta + continue + } + + if (eventType === 'response.completed') { + const response = parsed.response || parsed + const finalText = extractResponseText(response) + if (!fullContent && finalText) fullContent = finalText + const usage = response.usage || {} + updateUsage(roomId, { + inputTokens: usage.input_tokens ?? usage.inputTokens ?? 0, + outputTokens: usage.output_tokens ?? usage.outputTokens ?? 0, + cacheReadTokens: usage.cache_read_tokens ?? usage.cacheReadTokens ?? 0, + cacheWriteTokens: usage.cache_write_tokens ?? usage.cacheWriteTokens ?? 0, + reasoningTokens: usage.reasoning_tokens ?? usage.reasoningTokens ?? 0, + model: response.model || '', + profile: this.profile, + }) + logger.debug(`[AgentClients] ${this.name}: response completed, content length=${fullContent.length}`) + if (fullContent) { + this.stopTyping(roomId) + this.sendMessage(roomId, fullContent) + } + onStatus?.('ready') + return + } + + if (eventType === 'response.failed') { + logger.error(`[AgentClients] ${this.name}: response failed`) + this.stopTyping(roomId) + onStatus?.('ready') + return } } - - source.onerror = (err: any) => { - logger.error(err, `[AgentClients] ${this.name}: EventSource error`) - source.close() - this.stopTyping(roomId) - this.deleteSession(actualSessionId).catch(() => { }) - onStatus?.('ready') - } + logger.warn(`[AgentClients] ${this.name}: response stream ended without terminal event`) + this.stopTyping(roomId) + onStatus?.('ready') } catch (err: any) { logger.error(`[AgentClients] ${this.name}: error handling message: ${err.message}`) this.stopTyping(roomId) - this.deleteSession(sessionId).catch(() => { }) onStatus?.('ready') } } @@ -460,6 +385,66 @@ class AgentClient { } } +async function* readSseFrames(stream: ReadableStream): AsyncGenerator<{ event?: string; data: string }> { + const decoder = new TextDecoder() + const reader = stream.getReader() + let buffer = '' + + try { + while (true) { + const { done, value } = await reader.read() + if (done) break + buffer += decoder.decode(value, { stream: true }) + + let boundary = buffer.indexOf('\n\n') + while (boundary >= 0) { + const raw = buffer.slice(0, boundary) + buffer = buffer.slice(boundary + 2) + const frame = parseSseFrame(raw) + if (frame?.data) yield frame + boundary = buffer.indexOf('\n\n') + } + } + + buffer += decoder.decode() + const frame = parseSseFrame(buffer) + if (frame?.data) yield frame + } finally { + reader.releaseLock() + } +} + +function parseSseFrame(raw: string): { event?: string; data: string } | null { + let event: string | undefined + const data: string[] = [] + for (const line of raw.split(/\r?\n/)) { + if (!line || line.startsWith(':')) continue + if (line.startsWith('event:')) { + event = line.slice(6).trim() + } else if (line.startsWith('data:')) { + data.push(line.slice(5).trimStart()) + } + } + if (data.length === 0) return null + return { event, data: data.join('\n') } +} + +function extractResponseText(response: any): string { + const output = Array.isArray(response?.output) ? response.output : [] + const parts: string[] = [] + for (const item of output) { + if (item.type !== 'message') continue + const content = Array.isArray(item.content) ? item.content : [] + for (const part of content) { + if (part.type === 'output_text' || part.type === 'text') { + parts.push(part.text || '') + } + } + } + if (parts.length > 0) return parts.join('') + return typeof response?.output_text === 'string' ? response.output_text : '' +} + // ─── AgentClients (roomId -> agents) ────────────────────────── export class AgentClients { diff --git a/packages/server/src/services/hermes/session-sync.ts b/packages/server/src/services/hermes/session-sync.ts index 9ab773d..0e49f21 100644 --- a/packages/server/src/services/hermes/session-sync.ts +++ b/packages/server/src/services/hermes/session-sync.ts @@ -107,7 +107,6 @@ async function syncProfileSessions(profile: string): Promise<{ reasoning: msg.reasoning, reasoning_details: msg.reasoning_details, reasoning_content: msg.reasoning_content, - codex_reasoning_items: msg.codex_reasoning_items, }) }