feat: v0.5.16 - migrate to Responses API (#586)

* refactor: migrate from /v1/runs to /v1/responses streaming API

Replace EventSource-based polling with direct SSE streaming via the
/v1/responses endpoint across all server-side callers (chat-run-socket,
context-compressor, gateway-client, agent-clients). Messages are now
written to DB in real-time during streaming, eliminating post-run sync.
Frontend chat store adds tool_call_id tracking for deduplication.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* chore: bump version to 0.5.16 and add changelog

- Persist real API usage to usage table on response.completed
- Remove unused codex_reasoning_items field from message schema
- Fix unused variable warnings in chat-run-socket
- Bump version to 0.5.16
- Add changelog entries for 0.5.16 (8 locales)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
ekko
2026-05-10 02:49:58 +08:00
committed by GitHub
parent a36c0a3095
commit 50122c5ff8
19 changed files with 843 additions and 866 deletions
File diff suppressed because it is too large Load Diff
@@ -1,4 +1,3 @@
import { EventSource } from 'eventsource'
import type { StoredMessage, GatewayCaller } from './types'
import {
buildSummarizationSystemPrompt,
@@ -6,12 +5,11 @@ import {
buildIncrementalUpdatePrompt,
} from './prompt'
import { updateUsage } from '../../../db/hermes/usage-store'
import { getSessionDetailFromDbWithProfile } from '../../../db/hermes/sessions-db'
import { logger } from '../../logger'
/**
* Calls Hermes /v1/runs to produce LLM-generated summaries.
* Uses non-streaming EventSource to wait for run.completed.
* Calls Hermes /v1/responses to produce LLM-generated summaries.
* The context engine owns history assembly; Responses storage/chaining is not used.
*/
export class GatewaySummarizer implements GatewayCaller {
private timeoutMs: number
@@ -29,13 +27,11 @@ export class GatewaySummarizer implements GatewayCaller {
profile: string,
previousSummary?: string,
): Promise<{ summary: string; sessionId: string }> {
// Build conversation_history from messages
const history: Array<{ role: string; content: string }> = messages.map(m => ({
role: 'user',
content: `[${m.senderName}]: ${m.content}`,
}))
// Inject previous summary for incremental update
if (previousSummary) {
history.unshift(
{ role: 'user', content: `[Previous summary]\n${previousSummary}` },
@@ -47,10 +43,7 @@ export class GatewaySummarizer implements GatewayCaller {
? buildIncrementalUpdatePrompt()
: buildFullSummaryPrompt()
const sessionId = Date.now().toString(36) + Math.random().toString(36).slice(2, 8)
// POST /v1/runs
const res = await fetch(`${upstream}/v1/runs`, {
const res = await fetch(`${upstream.replace(/\/$/, '')}/v1/responses`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
@@ -60,98 +53,122 @@ export class GatewaySummarizer implements GatewayCaller {
input: userPrompt,
instructions: systemPrompt || buildSummarizationSystemPrompt(),
conversation_history: history,
session_id: sessionId,
stream: true,
store: false,
}),
signal: AbortSignal.timeout(this.timeoutMs),
})
if (!res.ok) {
throw new Error(`Summarization run failed: ${res.status}`)
throw new Error(`Summarization response failed: ${res.status}`)
}
if (!res.body) {
throw new Error('Summarization response stream missing')
}
const { run_id } = await res.json() as { run_id: string }
let output = ''
for await (const frame of readSseFrames(res.body)) {
let parsed: any
try {
parsed = JSON.parse(frame.data)
} catch {
continue
}
const eventType = parsed.type || frame.event || parsed.event
try {
const output = await this.pollForResult(upstream, apiKey, run_id, sessionId, roomId, profile)
return { summary: output, sessionId }
} finally {
// Note: session cleanup is handled by the caller (compressor.ts)
if (eventType === 'response.output_text.delta' && parsed.delta) {
output += parsed.delta
continue
}
if (eventType === 'response.completed') {
const response = parsed.response || parsed
const finalText = extractResponseText(response)
if (!output && finalText) output = finalText
const usage = response.usage || {}
updateUsage(roomId, {
inputTokens: usage.input_tokens ?? usage.inputTokens ?? 0,
outputTokens: usage.output_tokens ?? usage.outputTokens ?? 0,
cacheReadTokens: usage.cache_read_tokens ?? usage.cacheReadTokens ?? 0,
cacheWriteTokens: usage.cache_write_tokens ?? usage.cacheWriteTokens ?? 0,
reasoningTokens: usage.reasoning_tokens ?? usage.reasoningTokens ?? 0,
model: response.model || '',
profile,
})
logger.debug(`[GatewaySummarizer] Recorded response usage for compression room ${roomId} (profile=${profile}): input=${usage.input_tokens ?? 0}, output=${usage.output_tokens ?? 0}`)
if (!output || output.trim() === '') {
throw new Error('Empty summarization response')
}
return { summary: output.trim(), sessionId: '' }
}
if (eventType === 'response.failed') {
throw new Error(parsed.error?.message || parsed.error || 'Summarization response failed')
}
}
throw new Error('Summarization response stream ended without a terminal event')
}
private pollForResult(upstream: string, apiKey: string | null, runId: string, sessionId: string, roomId: string, profile: string): Promise<string> {
return new Promise<string>((resolve, reject) => {
const timer = setTimeout(() => {
source.close()
reject(new Error('Summarization timed out'))
}, this.timeoutMs)
const eventsUrl = new URL(`${upstream}/v1/runs/${runId}/events`)
// Use Authorization header instead of query parameter for better compatibility
const eventSourceInit: any = apiKey ? {
fetch: (url: string, init: any = {}) => fetch(url, {
...init,
headers: {
...(init.headers || {}),
Authorization: `Bearer ${apiKey}`,
},
}),
} : {}
// @ts-ignore - eventsource library types are too strict
const source = new EventSource(eventsUrl.toString(), eventSourceInit)
source.onmessage = async (event: MessageEvent) => {
try {
const parsed = JSON.parse(event.data)
if (parsed.event === 'run.completed') {
clearTimeout(timer)
// Record usage data from Hermes state.db BEFORE closing source
// This ensures we fetch usage before sessionCleaner can delete it
try {
const detail = await getSessionDetailFromDbWithProfile(sessionId, profile)
if (detail) {
updateUsage(roomId, {
inputTokens: detail.input_tokens,
outputTokens: detail.output_tokens,
cacheReadTokens: detail.cache_read_tokens,
cacheWriteTokens: detail.cache_write_tokens,
reasoningTokens: detail.reasoning_tokens,
model: detail.model,
profile,
})
logger.debug(`[GatewaySummarizer] Recorded usage for compression room ${roomId} (session ${sessionId}, profile=${profile}): input=${detail.input_tokens}, output=${detail.output_tokens}`)
} else {
logger.warn(`[GatewaySummarizer] Failed to get session detail for ${sessionId} (profile=${profile})`)
}
} catch (err: any) {
logger.warn(err, '[GatewaySummarizer] Failed to record usage from DB')
}
source.close()
const output = parsed.output
if (!output || typeof output !== 'string' || output.trim() === '') {
reject(new Error('Empty summarization response'))
return
}
resolve(output.trim())
} else if (parsed.event === 'run.failed') {
clearTimeout(timer)
source.close()
reject(new Error(parsed.error || 'Summarization run failed'))
}
} catch { /* ignore parse errors for non-JSON events */ }
}
source.onerror = () => {
clearTimeout(timer)
source.close()
reject(new Error('Summarization SSE connection error'))
}
})
}
}
async function* readSseFrames(stream: ReadableStream<Uint8Array>): AsyncGenerator<{ event?: string; data: string }> {
const decoder = new TextDecoder()
const reader = stream.getReader()
let buffer = ''
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
let boundary = buffer.indexOf('\n\n')
while (boundary >= 0) {
const raw = buffer.slice(0, boundary)
buffer = buffer.slice(boundary + 2)
const frame = parseSseFrame(raw)
if (frame?.data) yield frame
boundary = buffer.indexOf('\n\n')
}
}
buffer += decoder.decode()
const frame = parseSseFrame(buffer)
if (frame?.data) yield frame
} finally {
reader.releaseLock()
}
}
function parseSseFrame(raw: string): { event?: string; data: string } | null {
let event: string | undefined
const data: string[] = []
for (const line of raw.split(/\r?\n/)) {
if (!line || line.startsWith(':')) continue
if (line.startsWith('event:')) {
event = line.slice(6).trim()
} else if (line.startsWith('data:')) {
data.push(line.slice(5).trimStart())
}
}
if (data.length === 0) return null
return { event, data: data.join('\n') }
}
function extractResponseText(response: any): string {
const output = Array.isArray(response?.output) ? response.output : []
const parts: string[] = []
for (const item of output) {
if (item.type !== 'message') continue
const content = Array.isArray(item.content) ? item.content : []
for (const part of content) {
if (part.type === 'output_text' || part.type === 'text') {
parts.push(part.text || '')
}
}
}
if (parts.length > 0) return parts.join('')
return typeof response?.output_text === 'string' ? response.output_text : ''
}
@@ -1,12 +1,8 @@
import { io, Socket } from 'socket.io-client'
import { EventSource } from 'eventsource'
import { getToken } from '../../../services/auth'
import type { GatewayManager } from '../gateway-manager'
import { deleteSession as hermesDeleteSession } from '../hermes-cli'
import { getActiveProfileName } from '../hermes-profile'
import { logger } from '../../../services/logger'
import { updateUsage } from '../../../db/hermes/usage-store'
import { getSessionDetailFromDbWithProfile } from '../../../db/hermes/sessions-db'
// ─── Types ────────────────────────────────────────────────────
@@ -186,29 +182,6 @@ class AgentClient {
}
}
private async deleteSession(sessionId: string): Promise<void> {
try {
const sessionProfile = this.storage?.getSessionProfile?.(sessionId)
const currentProfile = getActiveProfileName()
if (sessionProfile && sessionProfile.profile_name !== currentProfile) {
// Cross-profile: enqueue deferred delete, don't switch profile
this.storage?.enqueuePendingSessionDelete?.(sessionId, sessionProfile.profile_name)
logger.info(`[AgentClients] ${this.name}: cross-profile deferred delete session ${sessionId} (session=${sessionProfile.profile_name}, active=${currentProfile})`)
return
}
// Same profile or no mapping: delete directly
const ok = await hermesDeleteSession(sessionId)
if (ok) {
this.storage?.deleteSessionProfile?.(sessionId)
}
logger.debug(`[AgentClients] ${this.name}: delete session ${sessionId} (profile=${this.profile}) → ${ok ? 'ok' : 'failed'}`)
} catch (err: any) {
logger.warn(`[AgentClients] ${this.name}: failed to delete session ${sessionId}: ${err.message}`)
}
}
// ─── Hermes Gateway Integration ────────────────────────────
/**
@@ -235,8 +208,6 @@ class AgentClient {
return
}
const sessionId = Date.now().toString(36) + Math.random().toString(36).slice(2, 8)
try {
// Notify room that agent is typing
this.startTyping(roomId)
@@ -290,8 +261,7 @@ class AgentClient {
// Strip @mention from input — agent already knows it was mentioned
const input = msg.content.replace(new RegExp(`@${this.name.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')}\\s*`, 'gi'), '').trim() || msg.content
// Start a run on Hermes gateway
const runRes = await fetch(`${upstream}/v1/runs`, {
const responseRes = await fetch(`${upstream.replace(/\/$/, '')}/v1/responses`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
@@ -299,126 +269,81 @@ class AgentClient {
},
body: JSON.stringify({
input,
session_id: sessionId,
...(conversationHistory.length > 0 ? { conversation_history: conversationHistory } : {}),
...(instructions ? { instructions } : {}),
stream: true,
store: false,
}),
signal: AbortSignal.timeout(120000),
})
if (!runRes.ok) {
const text = await runRes.text().catch(() => '')
logger.error(`[AgentClients] ${this.name}: gateway run failed (${runRes.status}): ${text}`)
if (!responseRes.ok) {
const text = await responseRes.text().catch(() => '')
logger.error(`[AgentClients] ${this.name}: gateway response failed (${responseRes.status}): ${text}`)
this.stopTyping(roomId)
onStatus?.('ready')
return
}
const runData = await runRes.json() as any
const run_id = runData.run_id
logger.debug(`[AgentClients] ${this.name}: run started, response=%j`, runData)
if (!run_id) {
logger.error(`[AgentClients] ${this.name}: no run_id in response`)
if (!responseRes.body) {
logger.error(`[AgentClients] ${this.name}: gateway response stream missing`)
this.stopTyping(roomId)
onStatus?.('ready')
return
}
// Save session-to-profile mapping after gateway confirms the run
const actualSessionId = runData.session_id || sessionId
if (!this.storage) {
logger.warn(`[AgentClients] ${this.name}: storage is null, cannot save session profile for ${actualSessionId}`)
} else {
this.storage.saveSessionProfile(actualSessionId, roomId, this.agentId, this.profile)
logger.debug(`[AgentClients] ${this.name}: saved session profile ${actualSessionId} → profile=${this.profile}`)
}
// Stream events from Hermes
const eventsUrl = new URL(`${upstream}/v1/runs/${run_id}/events`)
logger.debug(`[AgentClients] ${this.name}: streaming events from ${eventsUrl}`)
// Use Authorization header instead of query parameter for better compatibility
const eventSourceInit: any = apiKey ? {
fetch: (url: string, init: any = {}) => fetch(url, {
...init,
headers: {
...(init.headers || {}),
Authorization: `Bearer ${apiKey}`,
},
}),
} : {}
// @ts-ignore - eventsource library types are too strict
const source = new EventSource(eventsUrl.toString(), eventSourceInit)
let fullContent = ''
source.onmessage = async (e: any) => {
for await (const frame of readSseFrames(responseRes.body)) {
let parsed: any
try {
const parsed = JSON.parse(e.data)
logger.debug(`[AgentClients] ${this.name}: event=${parsed.event}`)
if (parsed.event === 'run.completed') {
// Record usage data from Hermes state.db BEFORE closing source
// This ensures we fetch usage before deleteSession can delete it
try {
const detail = await getSessionDetailFromDbWithProfile(actualSessionId, this.profile)
if (detail) {
updateUsage(roomId, {
inputTokens: detail.input_tokens,
outputTokens: detail.output_tokens,
cacheReadTokens: detail.cache_read_tokens,
cacheWriteTokens: detail.cache_write_tokens,
reasoningTokens: detail.reasoning_tokens,
model: detail.model,
profile: this.profile,
})
logger.debug(`[AgentClients] Recorded usage for room ${roomId} (session ${actualSessionId}, profile=${this.profile}): input=${detail.input_tokens}, output=${detail.output_tokens}`)
} else {
logger.warn(`[AgentClients] Failed to get session detail for ${actualSessionId} (profile=${this.profile})`)
}
} catch (err: any) {
logger.warn(err, '[AgentClients] Failed to record usage from DB')
}
source.close()
logger.debug(`[AgentClients] ${this.name}: run completed, content length=${fullContent.length}`)
if (fullContent) {
this.stopTyping(roomId)
this.sendMessage(roomId, fullContent)
}
this.deleteSession(actualSessionId).catch(() => { })
onStatus?.('ready')
return
}
if (parsed.event === 'run.failed') {
source.close()
logger.error(`[AgentClients] ${this.name}: run failed`)
this.stopTyping(roomId)
this.deleteSession(actualSessionId).catch(() => { })
onStatus?.('ready')
return
}
// Accumulate message deltas
if (parsed.event === 'message.delta' && parsed.delta) {
fullContent += parsed.delta
}
parsed = JSON.parse(frame.data)
} catch {
// ignore parse errors
continue
}
const eventType = parsed.type || frame.event || parsed.event
logger.debug(`[AgentClients] ${this.name}: event=${eventType}`)
if (eventType === 'response.output_text.delta' && parsed.delta) {
fullContent += parsed.delta
continue
}
if (eventType === 'response.completed') {
const response = parsed.response || parsed
const finalText = extractResponseText(response)
if (!fullContent && finalText) fullContent = finalText
const usage = response.usage || {}
updateUsage(roomId, {
inputTokens: usage.input_tokens ?? usage.inputTokens ?? 0,
outputTokens: usage.output_tokens ?? usage.outputTokens ?? 0,
cacheReadTokens: usage.cache_read_tokens ?? usage.cacheReadTokens ?? 0,
cacheWriteTokens: usage.cache_write_tokens ?? usage.cacheWriteTokens ?? 0,
reasoningTokens: usage.reasoning_tokens ?? usage.reasoningTokens ?? 0,
model: response.model || '',
profile: this.profile,
})
logger.debug(`[AgentClients] ${this.name}: response completed, content length=${fullContent.length}`)
if (fullContent) {
this.stopTyping(roomId)
this.sendMessage(roomId, fullContent)
}
onStatus?.('ready')
return
}
if (eventType === 'response.failed') {
logger.error(`[AgentClients] ${this.name}: response failed`)
this.stopTyping(roomId)
onStatus?.('ready')
return
}
}
source.onerror = (err: any) => {
logger.error(err, `[AgentClients] ${this.name}: EventSource error`)
source.close()
this.stopTyping(roomId)
this.deleteSession(actualSessionId).catch(() => { })
onStatus?.('ready')
}
logger.warn(`[AgentClients] ${this.name}: response stream ended without terminal event`)
this.stopTyping(roomId)
onStatus?.('ready')
} catch (err: any) {
logger.error(`[AgentClients] ${this.name}: error handling message: ${err.message}`)
this.stopTyping(roomId)
this.deleteSession(sessionId).catch(() => { })
onStatus?.('ready')
}
}
@@ -460,6 +385,66 @@ class AgentClient {
}
}
async function* readSseFrames(stream: ReadableStream<Uint8Array>): AsyncGenerator<{ event?: string; data: string }> {
const decoder = new TextDecoder()
const reader = stream.getReader()
let buffer = ''
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
let boundary = buffer.indexOf('\n\n')
while (boundary >= 0) {
const raw = buffer.slice(0, boundary)
buffer = buffer.slice(boundary + 2)
const frame = parseSseFrame(raw)
if (frame?.data) yield frame
boundary = buffer.indexOf('\n\n')
}
}
buffer += decoder.decode()
const frame = parseSseFrame(buffer)
if (frame?.data) yield frame
} finally {
reader.releaseLock()
}
}
function parseSseFrame(raw: string): { event?: string; data: string } | null {
let event: string | undefined
const data: string[] = []
for (const line of raw.split(/\r?\n/)) {
if (!line || line.startsWith(':')) continue
if (line.startsWith('event:')) {
event = line.slice(6).trim()
} else if (line.startsWith('data:')) {
data.push(line.slice(5).trimStart())
}
}
if (data.length === 0) return null
return { event, data: data.join('\n') }
}
function extractResponseText(response: any): string {
const output = Array.isArray(response?.output) ? response.output : []
const parts: string[] = []
for (const item of output) {
if (item.type !== 'message') continue
const content = Array.isArray(item.content) ? item.content : []
for (const part of content) {
if (part.type === 'output_text' || part.type === 'text') {
parts.push(part.text || '')
}
}
}
if (parts.length > 0) return parts.join('')
return typeof response?.output_text === 'string' ? response.output_text : ''
}
// ─── AgentClients (roomId -> agents) ──────────────────────────
export class AgentClients {
@@ -107,7 +107,6 @@ async function syncProfileSessions(profile: string): Promise<{
reasoning: msg.reasoning,
reasoning_details: msg.reasoning_details,
reasoning_content: msg.reasoning_content,
codex_reasoning_items: msg.codex_reasoning_items,
})
}