feat: add token usage tracking, context display, and dynamic context length (#132)

* fix: specify TS_NODE_PROJECT for dev:server script

ts-node/register resolves tsconfig from the entry file upward,
finding the root solution-style tsconfig.json (no compilerOptions).
This causes target to default to ES3, breaking MapIterator spread
syntax (TS2802). Set TS_NODE_PROJECT env var to point to the server
tsconfig which targets ES2024.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add token usage tracking, context display, and dynamic context length

- Intercept SSE proxy to capture run.completed events and persist token
  usage (input_tokens, output_tokens) per session to SQLite/JSON store
- Display context usage bar in ChatInput showing used/total/remaining tokens
- Resolve actual context length from Hermes models_dev_cache.json based
  on the active profile's default model (fallback 200K), with 5min in-memory cache
- Move sessions-db.ts to db/hermes/ for unified database layer
- Add usage store with SQLite + JSON fallback (auto-migration via ensureTable)
- Fix proxy SSE path regex to match rewritten upstream path
- Fix route ordering: /sessions/usage before /sessions/:id to avoid 404
- Fetch per-session usage on session enter instead of batch
- Add unit tests for usage-store, db index, and proxy SSE interception

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
ekko
2026-04-22 16:14:50 +08:00
committed by GitHub
parent ce3bf5f3eb
commit 6f69c69802
26 changed files with 1203 additions and 144 deletions
@@ -1,6 +1,8 @@
import * as hermesCli from '../../services/hermes/hermes-cli'
import { getConversationDetail, listConversationSummaries } from '../../services/hermes/conversations'
import { listSessionSummaries, searchSessionSummaries } from '../../services/hermes/sessions-db'
import { listSessionSummaries, searchSessionSummaries } from '../../db/hermes/sessions-db'
import { deleteUsage, getUsage, getUsageBatch } from '../../db/hermes/usage-store'
import { getModelContextLength } from '../../services/hermes/model-context'
import { logger } from '../../services/logger'
function parseHumanOnly(value: unknown): boolean {
@@ -84,9 +86,29 @@ export async function remove(ctx: any) {
ctx.body = { error: 'Failed to delete session' }
return
}
deleteUsage(ctx.params.id)
ctx.body = { ok: true }
}
/**
 * Batch usage lookup: responds with a map of session id → stored token usage.
 *
 * Session ids come from the `ids` query parameter as a comma-separated list.
 * A repeated key (`?ids=a&ids=b`) reaches the handler as a string array rather
 * than a string, so both shapes are accepted instead of throwing on `.split`.
 * Blank entries are dropped, surrounding whitespace is trimmed and duplicates
 * are collapsed. Responds with `{}` when no usable id is supplied.
 *
 * @param ctx Koa context; reads `ctx.query.ids`, writes `ctx.body`.
 */
export async function usageBatch(ctx: any) {
  const raw: unknown = ctx.query.ids
  const parts: unknown[] = Array.isArray(raw) ? raw : [raw]

  const idList = Array.from(
    new Set(
      parts
        .filter((part): part is string => typeof part === 'string')
        .flatMap(part => part.split(','))
        .map(id => id.trim())
        .filter(Boolean),
    ),
  )

  if (idList.length === 0) {
    ctx.body = {}
    return
  }
  ctx.body = getUsageBatch(idList)
}
/**
 * Single-session usage lookup.
 *
 * Responds with the stored usage for `ctx.params.id`, or with zeroed counters
 * when nothing has been recorded for that session.
 *
 * @param ctx Koa context; reads `ctx.params.id`, writes `ctx.body`.
 */
export async function usageSingle(ctx: any) {
  const usage = getUsage(ctx.params.id)
  ctx.body = usage ? usage : { input_tokens: 0, output_tokens: 0 }
}
export async function rename(ctx: any) {
const { title } = ctx.request.body as { title?: string }
if (!title || typeof title !== 'string') {
@@ -102,3 +124,8 @@ export async function rename(ctx: any) {
}
ctx.body = { ok: true }
}
/**
 * Responds with `{ context_length }` for the requested profile's default model.
 *
 * The optional `profile` query parameter selects the profile. A repeated key
 * arrives as an array; only the first value is used so that a non-string never
 * reaches the path handling inside `getModelContextLength`. An empty or missing
 * value means "no profile specified".
 *
 * @param ctx Koa context; reads `ctx.query.profile`, writes `ctx.body`.
 */
export async function contextLength(ctx: any) {
  const raw: unknown = ctx.query.profile
  const first: unknown = Array.isArray(raw) ? raw[0] : raw
  const profile = typeof first === 'string' && first !== '' ? first : undefined
  ctx.body = { context_length: getModelContextLength(profile) }
}
@@ -1,4 +1,4 @@
import { getActiveProfileDir } from './hermes-profile'
import { getActiveProfileDir } from '../../services/hermes/hermes-profile'
const SQLITE_AVAILABLE = (() => {
const [major, minor] = process.versions.node.split('.').map(Number)
@@ -0,0 +1,75 @@
import { isSqliteAvailable, ensureTable, getDb, jsonSet, jsonGet, jsonGetAll, jsonDelete } from '../index'
// Name used both as the SQLite table and as the namespace key in the JSON fallback store.
const TABLE = 'session_usage'
// Column name → SQL column definition, passed to ensureTable() by initUsageStore().
// One row per session, keyed by session_id.
const SCHEMA = {
session_id: 'TEXT PRIMARY KEY',
input_tokens: 'INTEGER NOT NULL DEFAULT 0',
output_tokens: 'INTEGER NOT NULL DEFAULT 0',
// Written by updateUsage() from Date.now(), i.e. epoch milliseconds.
updated_at: 'INTEGER NOT NULL',
}
/**
 * Prepare the usage table at startup.
 *
 * Only the SQLite backend needs setup; when SQLite is unavailable this does
 * nothing.
 */
export function initUsageStore(): void {
  if (!isSqliteAvailable()) return
  ensureTable(TABLE, SCHEMA)
}
/**
 * Store the token counters for a session, replacing any previous values.
 *
 * Uses an upsert on the SQLite backend and overwrites the record in the JSON
 * fallback store otherwise. `updated_at` is stamped with the current time.
 *
 * @param sessionId    Session the counters belong to.
 * @param inputTokens  Input token count to store.
 * @param outputTokens Output token count to store.
 */
export function updateUsage(sessionId: string, inputTokens: number, outputTokens: number): void {
  const updatedAt = Date.now()

  if (!isSqliteAvailable()) {
    jsonSet(TABLE, sessionId, {
      input_tokens: inputTokens,
      output_tokens: outputTokens,
      updated_at: updatedAt,
    })
    return
  }

  // The statement text is kept exactly as before, including its line breaks.
  const upsert = getDb()!.prepare(
    `INSERT INTO ${TABLE} (session_id, input_tokens, output_tokens, updated_at)
VALUES (?, ?, ?, ?)
ON CONFLICT(session_id) DO UPDATE SET
input_tokens = excluded.input_tokens,
output_tokens = excluded.output_tokens,
updated_at = excluded.updated_at`,
  )
  upsert.run(sessionId, inputTokens, outputTokens, updatedAt)
}
/**
 * Fetch the stored token counters for one session.
 *
 * @param sessionId Session to look up.
 * @returns The counters, or `undefined` when the session has no record.
 *          In the JSON fallback, missing counter fields are reported as 0.
 */
export function getUsage(sessionId: string): { input_tokens: number; output_tokens: number } | undefined {
  if (!isSqliteAvailable()) {
    const stored = jsonGet(TABLE, sessionId)
    return stored
      ? { input_tokens: stored.input_tokens ?? 0, output_tokens: stored.output_tokens ?? 0 }
      : undefined
  }

  const select = getDb()!.prepare(
    `SELECT input_tokens, output_tokens FROM ${TABLE} WHERE session_id = ?`,
  )
  return select.get(sessionId) as { input_tokens: number; output_tokens: number } | undefined
}
/**
 * Fetch stored token counters for several sessions at once.
 *
 * @param sessionIds Sessions to look up.
 * @returns Map of session id → counters. Sessions without a record are simply
 *          absent from the map; an empty input yields an empty map.
 */
export function getUsageBatch(
  sessionIds: string[],
): Record<string, { input_tokens: number; output_tokens: number }> {
  const usageById: Record<string, { input_tokens: number; output_tokens: number }> = {}
  if (sessionIds.length === 0) return usageById

  if (!isSqliteAvailable()) {
    const stored = jsonGetAll(TABLE)
    for (const id of sessionIds) {
      const row = stored[id]
      if (!row) continue
      usageById[id] = { input_tokens: row.input_tokens ?? 0, output_tokens: row.output_tokens ?? 0 }
    }
    return usageById
  }

  // One bound parameter per requested id.
  const marks = Array.from(sessionIds, () => '?').join(',')
  const select = getDb()!.prepare(
    `SELECT session_id, input_tokens, output_tokens FROM ${TABLE} WHERE session_id IN (${marks})`,
  )
  const found = select.all(...sessionIds) as Array<{
    session_id: string
    input_tokens: number
    output_tokens: number
  }>
  found.forEach(({ session_id, input_tokens, output_tokens }) => {
    usageById[session_id] = { input_tokens, output_tokens }
  })
  return usageById
}
/**
 * Remove the usage record of a session from whichever backend is active.
 *
 * @param sessionId Session whose record should be removed.
 */
export function deleteUsage(sessionId: string): void {
  if (!isSqliteAvailable()) {
    jsonDelete(TABLE, sessionId)
    return
  }
  const remove = getDb()!.prepare(`DELETE FROM ${TABLE} WHERE session_id = ?`)
  remove.run(sessionId)
}
+136
View File
@@ -0,0 +1,136 @@
import { DatabaseSync } from 'node:sqlite'
import { mkdirSync, readFileSync, writeFileSync, existsSync, renameSync } from 'fs'
import { resolve } from 'path'
import { homedir } from 'os'
// Directory under the user's home that holds the web UI's own persistent data.
const DB_DIR = resolve(homedir(), '.hermes-web-ui')
// SQLite database file, used when isSqliteAvailable() is true.
const DB_PATH = resolve(DB_DIR, 'hermes-web-ui.db')
// JSON file used by the json* helpers as the fallback store.
const JSON_PATH = resolve(DB_DIR, 'hermes-web-ui.json')
// --- SQLite availability check ---
// Evaluated once at module load: true when the running Node.js version is 22.5 or newer.
const SQLITE_AVAILABLE = (() => {
  const parts = process.versions.node.split('.')
  const major = Number(parts[0])
  const minor = Number(parts[1])
  // Any major other than 22 is decided by the major alone.
  if (major !== 22) return major > 22
  return minor >= 5
})()
/**
 * Report whether the SQLite backend is used.
 * Returns the version check computed once at module load; callers fall back
 * to the JSON store helpers when this is false.
 */
export function isSqliteAvailable(): boolean {
return SQLITE_AVAILABLE
}
// --- SQLite backend ---
// Lazily opened connection shared by every caller of getDb().
let _db: DatabaseSync | null = null

/**
 * Return the shared SQLite connection, opening it on first use.
 *
 * On first call the data directory is created, the database file is opened,
 * and WAL journaling plus foreign-key enforcement are switched on.
 *
 * NOTE(review): this file imports `node:sqlite` statically, so on a runtime
 * without that module the import itself may fail before this guard is ever
 * reached — confirm the JSON fallback is actually reachable there.
 *
 * @returns The connection, or `null` when SQLite is not available.
 */
export function getDb(): DatabaseSync | null {
  if (!SQLITE_AVAILABLE) return null
  if (_db) return _db

  mkdirSync(DB_DIR, { recursive: true })
  const opened = new DatabaseSync(DB_PATH)
  // Published before the pragmas run, exactly as the connection was before.
  _db = opened
  opened.exec('PRAGMA journal_mode=WAL')
  opened.exec('PRAGMA foreign_keys=ON')
  return opened
}
/**
 * Ensure a table's schema matches the expected definition.
 * - Creates the table if it does not exist
 * - Adds missing columns (ALTER TABLE ADD COLUMN)
 * - Drops extra columns (ALTER TABLE DROP COLUMN, SQLite 3.35+)
 *
 * All steps run inside one transaction. SQLite rejects some alterations (for
 * example adding a NOT NULL column without a default, or dropping a PRIMARY
 * KEY column); when any step fails the whole migration is rolled back and the
 * error is rethrown, so the table is never left half-migrated.
 *
 * No-op when SQLite is not available.
 *
 * @param tableName Table to create or migrate.
 * @param schema    Column name → SQL column definition, in column order.
 * @throws Whatever the SQLite driver throws for a failing statement.
 */
export function ensureTable(tableName: string, schema: Record<string, string>): void {
  const db = getDb()
  if (!db) return

  // Double embedded quotes so a name can never terminate its quoted identifier.
  const quote = (identifier: string): string => `"${identifier.replace(/"/g, '""')}"`
  const table = quote(tableName)
  const colDefs = Object.entries(schema)
    .map(([col, def]) => `${quote(col)} ${def}`)
    .join(', ')

  db.exec('BEGIN')
  try {
    db.exec(`CREATE TABLE IF NOT EXISTS ${table} (${colDefs})`)

    const rows = db.prepare(`PRAGMA table_info(${table})`).all() as Array<{ name: string }>
    const existingCols = new Set(rows.map(r => r.name))
    const expectedCols = new Set(Object.keys(schema))

    for (const col of expectedCols) {
      if (!existingCols.has(col)) {
        db.exec(`ALTER TABLE ${table} ADD COLUMN ${quote(col)} ${schema[col]}`)
      }
    }
    for (const col of existingCols) {
      if (!expectedCols.has(col)) {
        db.exec(`ALTER TABLE ${table} DROP COLUMN ${quote(col)}`)
      }
    }

    db.exec('COMMIT')
  } catch (err) {
    try {
      db.exec('ROLLBACK')
    } catch {
      // The transaction may already be gone; the original error is what matters.
    }
    throw err
  }
}
// --- JSON fallback backend ---
// Layout of the JSON fallback file: table name → primary key → record.
type JsonData = Record<string, Record<string, Record<string, any>>>

/**
 * Load the whole JSON fallback store from disk.
 *
 * Best-effort: a missing, unreadable or malformed file yields an empty store.
 * A file holding valid JSON that is not an object (`null`, an array, a scalar)
 * is also treated as empty, because callers index the result by table name
 * and would otherwise throw on `null`.
 *
 * @returns The parsed store, or `{}` when nothing usable is on disk.
 */
function readJsonStore(): JsonData {
  if (!existsSync(JSON_PATH)) return {}
  try {
    const parsed: unknown = JSON.parse(readFileSync(JSON_PATH, 'utf-8'))
    if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed)) return {}
    return parsed as JsonData
  } catch {
    return {}
  }
}
/**
 * Persist the whole JSON fallback store.
 *
 * The data is written to a temporary file in the same directory and then
 * renamed over the real file. Writing in place could leave a truncated file
 * if the process died mid-write, and readJsonStore() treats an unparsable
 * file as empty — which would silently discard every record.
 *
 * @param data Complete store contents to write.
 */
function writeJsonStore(data: JsonData): void {
  mkdirSync(DB_DIR, { recursive: true })
  // The pid suffix keeps concurrent processes from sharing one temp file.
  const tmpPath = `${JSON_PATH}.${process.pid}.tmp`
  writeFileSync(tmpPath, JSON.stringify(data, null, 2), 'utf-8')
  renameSync(tmpPath, JSON_PATH)
}
/**
 * Look up one record in the JSON store.
 * @param table Table name (namespace)
 * @param key Primary key
 * @returns The record, or `undefined` when the table or key is absent.
 */
export function jsonGet(table: string, key: string): Record<string, any> | undefined {
  const namespace = readJsonStore()[table]
  return namespace?.[key]
}
/**
 * Write one record to the JSON store, creating its table on demand.
 * The whole store is read, modified and written back.
 * @param table Table name (namespace)
 * @param key Primary key
 * @param value Record data
 */
export function jsonSet(table: string, key: string, value: Record<string, any>): void {
  const store = readJsonStore()
  const namespace = store[table] || (store[table] = {})
  namespace[key] = value
  writeJsonStore(store)
}
/**
 * Return every record of a table in the JSON store, keyed by primary key.
 * An unknown table yields an empty object.
 */
export function jsonGetAll(table: string): Record<string, Record<string, any>> {
  const namespace = readJsonStore()[table]
  if (namespace) return namespace
  return {}
}
/**
 * Remove one record from the JSON store.
 * Nothing is written when the table does not exist.
 */
export function jsonDelete(table: string, key: string): void {
  const store = readJsonStore()
  const namespace = store[table]
  if (!namespace) return
  delete namespace[key]
  writeJsonStore(store)
}
/**
 * Path of the file backing the active store (for debugging):
 * the SQLite database when SQLite is available, the JSON file otherwise.
 */
export function getStoragePath(): string {
  if (SQLITE_AVAILABLE) return DB_PATH
  return JSON_PATH
}
+7 -1
View File
@@ -20,7 +20,7 @@ import { logger } from './services/logger'
declare const __APP_VERSION__: string
const APP_VERSION = typeof __APP_VERSION__ !== 'undefined'
? __APP_VERSION__
: (() => { try { return JSON.parse(readFileSync(resolve(__dirname, '../../package.json'), 'utf-8')).version } catch { return 'dev' } } )()
: (() => { try { return JSON.parse(readFileSync(resolve(__dirname, '../../package.json'), 'utf-8')).version } catch { return 'dev' } })()
// Global error handlers
process.on('uncaughtException', (err) => {
@@ -44,6 +44,12 @@ export async function bootstrap() {
await initGatewayManager()
console.log('[bootstrap] gateway manager initialized')
// Initialize web-ui SQLite tables
const { initUsageStore } = await import('./db/hermes/usage-store')
initUsageStore()
console.log('[bootstrap] usage store initialized')
app.use(cors({ origin: config.corsOrigins }))
app.use(bodyParser())
console.log('[bootstrap] cors + bodyParser registered')
@@ -1,9 +1,26 @@
import type { Context } from 'koa'
import { config } from '../../config'
import { getGatewayManagerInstance } from '../../services/gateway-bootstrap'
import { updateUsage } from '../../db/hermes/usage-store'
/** Thin accessor that returns whatever `getGatewayManagerInstance()` yields at call time. */
function getGatewayManager() {
  const manager = getGatewayManagerInstance()
  return manager
}
// --- run_id → session_id mapping (in-memory, ephemeral) ---

// How long a mapping is kept before it is dropped automatically.
const RUN_SESSION_TTL_MS = 30 * 60 * 1000
const runSessionMap = new Map<string, string>()
// Pending cleanup timer per run id, so re-registering a run can cancel the old one.
const runSessionTimers = new Map<string, ReturnType<typeof setTimeout>>()

/**
 * Remember which session a run belongs to, for 30 minutes.
 *
 * Registering the same run id again restarts the 30-minute window: the
 * earlier timer is cancelled so it cannot delete the newer mapping early.
 * Cleanup timers are unref'd so pending entries never keep the process alive.
 *
 * @param runId     Run identifier.
 * @param sessionId Session the run belongs to.
 */
export function setRunSession(runId: string, sessionId: string): void {
  const previous = runSessionTimers.get(runId)
  if (previous !== undefined) clearTimeout(previous)

  runSessionMap.set(runId, sessionId)

  // Auto-cleanup after 30 minutes
  const timer = setTimeout(() => {
    runSessionMap.delete(runId)
    runSessionTimers.delete(runId)
  }, RUN_SESSION_TTL_MS)
  runSessionTimers.set(runId, timer)

  // Node timer handles expose unref(); probe for it rather than assume the handle type.
  const handle: unknown = timer
  if (
    typeof handle === 'object' &&
    handle !== null &&
    'unref' in handle &&
    typeof handle.unref === 'function'
  ) {
    handle.unref()
  }
}

/**
 * Look up the session registered for a run.
 *
 * @param runId Run identifier.
 * @returns The session id, or `undefined` when the run is unknown or expired.
 */
function getSessionForRun(runId: string): string | undefined {
  return runSessionMap.get(runId)
}
// --- Helpers ---
function isTransientGatewayError(err: any): boolean {
const msg = String(err?.message || '')
const causeCode = String(err?.cause?.code || '')
@@ -48,19 +65,7 @@ function resolveUpstream(ctx: Context): string {
return config.upstream.replace(/\/$/, '')
}
export async function proxy(ctx: Context) {
const profile = resolveProfile(ctx)
const upstream = resolveUpstream(ctx)
// Rewrite path for upstream gateway:
// /api/hermes/v1/* -> /v1/* (upstream uses /v1/ prefix)
// /api/hermes/* -> /api/* (upstream uses /api/ prefix)
const upstreamPath = ctx.path.replace(/^\/api\/hermes\/v1/, '/v1').replace(/^\/api\/hermes/, '/api')
const params = new URLSearchParams(ctx.search || '')
params.delete('token')
const search = params.toString()
const url = `${upstream}${upstreamPath}${search ? `?${search}` : ''}`
// Build headers — forward most, strip browser/web-ui specific ones
function buildProxyHeaders(ctx: Context, upstream: string): Record<string, string> {
const headers: Record<string, string> = {}
for (const [key, value] of Object.entries(ctx.headers)) {
if (value == null) continue
@@ -75,33 +80,118 @@ export async function proxy(ctx: Context) {
}
}
// Inject Hermes gateway API key from profile's .env
const mgr = getGatewayManager()
if (mgr) {
const apiKey = mgr.getApiKey(profile)
const apiKey = mgr.getApiKey(resolveProfile(ctx))
if (apiKey) {
headers['authorization'] = `Bearer ${apiKey}`
}
}
return headers
}
// --- SSE stream interception ---
// Tested against the rewritten upstream path (not the incoming /api/hermes/... path).
// Matches exactly /v1/runs/<segment>/events; capture group 1 is that single path segment.
const SSE_EVENTS_PATH = /^\/v1\/runs\/([^/]+)\/events$/
/**
 * Scan one SSE event block for a `run.completed` payload and persist its usage.
 *
 * Each `data:` line is parsed as JSON; lines that are not JSON are skipped.
 * A payload counts only when it carries `event: "run.completed"`, a `usage`
 * object and a `run_id` that maps to a known session. Token counts are coerced
 * to non-negative integers (anything unusable becomes 0) before being stored.
 *
 * A failure while persisting is logged and does not interrupt the stream;
 * it is no longer mistaken for "not JSON" and silently dropped.
 *
 * @param chunk Text of one SSE event block (one or more lines).
 * @returns The run_id whose usage was stored, or `null` when nothing was stored.
 */
function extractRunCompletedFromChunk(chunk: string): string | null {
  const toTokenCount = (value: unknown): number => {
    const n = Number(value)
    return Number.isFinite(n) && n > 0 ? Math.floor(n) : 0
  }

  for (const line of chunk.split('\n')) {
    // The space after "data:" is optional in the SSE format; JSON.parse ignores
    // leading/trailing whitespace (including a stray "\r"), so slicing past the
    // colon covers both forms.
    if (!line.startsWith('data:')) continue

    // Payload shape is owned by the upstream gateway, hence the loose type.
    let data: any
    try {
      data = JSON.parse(line.slice(5))
    } catch {
      continue // not JSON, skip
    }

    if (!data || data.event !== 'run.completed' || !data.usage || !data.run_id) continue

    const sessionId = getSessionForRun(data.run_id)
    if (!sessionId) continue

    try {
      updateUsage(
        sessionId,
        toTokenCount(data.usage.input_tokens),
        toTokenCount(data.usage.output_tokens),
      )
    } catch (err) {
      console.error('[proxy] failed to persist token usage', err)
      continue
    }
    return data.run_id
  }
  return null
}
/**
* Stream an SSE response while intercepting run.completed events.
*/
async function streamSSE(ctx: Context, res: Response): Promise<void> {
if (!res.body) {
ctx.res.end()
return
}
const reader = res.body.getReader()
const decoder = new TextDecoder()
let buffer = ''
try {
// Build request body from raw body
let body: string | undefined
if (ctx.req.method !== 'GET' && ctx.req.method !== 'HEAD') {
body = (ctx as any).request.rawBody as string | undefined
while (true) {
const { done, value } = await reader.read()
if (done) break
// Forward raw bytes to client immediately
ctx.res.write(value)
// Also decode for interception
buffer += decoder.decode(value, { stream: true })
// Process complete SSE lines (delimited by double newline)
let newlineIdx: number
while ((newlineIdx = buffer.indexOf('\n\n')) !== -1) {
const eventBlock = buffer.slice(0, newlineIdx)
buffer = buffer.slice(newlineIdx + 2)
extractRunCompletedFromChunk(eventBlock)
}
}
const requestInit: RequestInit = {
method: ctx.req.method,
headers,
body,
// Process remaining buffer
if (buffer.trim()) {
extractRunCompletedFromChunk(buffer)
}
} finally {
ctx.res.end()
}
}
// --- Main proxy function ---
export async function proxy(ctx: Context) {
const profile = resolveProfile(ctx)
const upstream = resolveUpstream(ctx)
const upstreamPath = ctx.path.replace(/^\/api\/hermes\/v1/, '/v1').replace(/^\/api\/hermes/, '/api')
const params = new URLSearchParams(ctx.search || '')
params.delete('token')
const search = params.toString()
const url = `${upstream}${upstreamPath}${search ? `?${search}` : ''}`
const headers = buildProxyHeaders(ctx, upstream)
try {
let body: string | undefined
if (ctx.req.method !== 'GET' && ctx.req.method !== 'HEAD') {
// @koa/bodyparser parses JSON into ctx.request.body but doesn't store rawBody
// by default. Re-serialize the parsed body to get the string form.
const parsed = (ctx as any).request.body
if (typeof parsed === 'string') {
body = parsed
} else if (parsed && typeof parsed === 'object') {
body = JSON.stringify(parsed)
}
}
const requestInit: RequestInit = { method: ctx.req.method, headers, body }
let res: Response
try {
res = await fetch(url, requestInit)
} catch (err: any) {
// Gateway may be restarting; wait briefly and retry once.
if (isTransientGatewayError(err) && await waitForGatewayReady(upstream)) {
res = await fetch(url, requestInit)
} else {
@@ -116,10 +206,37 @@ export async function proxy(ctx: Context) {
ctx.set(key, value)
}
})
ctx.status = res.status
// Stream response body
// Intercept POST /v1/runs to capture run_id → session_id mapping
if (ctx.req.method === 'POST' && /\/v1\/runs$/.test(upstreamPath) && body) {
try {
const parsed = JSON.parse(body)
if (parsed.session_id) {
const resBody = await res.text()
ctx.res.write(resBody)
ctx.res.end()
try {
const result = JSON.parse(resBody)
if (result.run_id) {
setRunSession(result.run_id, parsed.session_id)
}
} catch { /* response not JSON, ignore */ }
return
}
} catch { /* body not JSON, fall through to normal stream */ }
// No session_id in body — fall through to normal response handling below
}
// Intercept SSE streams for /v1/runs/{id}/events
const sseMatch = upstreamPath.match(SSE_EVENTS_PATH)
if (sseMatch) {
await streamSSE(ctx, res)
return
}
// Default: pipe response body directly
if (res.body) {
const reader = res.body.getReader()
const pump = async () => {
@@ -8,6 +8,9 @@ sessionRoutes.get('/api/hermes/sessions/conversations/:id/messages', ctrl.getCon
sessionRoutes.get('/api/hermes/sessions', ctrl.list)
sessionRoutes.get('/api/hermes/search/sessions', ctrl.search)
sessionRoutes.get('/api/hermes/sessions/search', ctrl.search)
// Keep the literal /sessions/usage and /sessions/context-length routes ahead of
// /sessions/:id; placed after it they would be handled as an :id lookup.
// NOTE(review): assumes the router matches in registration order — confirm.
sessionRoutes.get('/api/hermes/sessions/usage', ctrl.usageBatch)
sessionRoutes.get('/api/hermes/sessions/context-length', ctrl.contextLength)
sessionRoutes.get('/api/hermes/sessions/:id', ctrl.get)
// Per-session usage; served by usageSingle rather than the batch handler above.
sessionRoutes.get('/api/hermes/sessions/:id/usage', ctrl.usageSingle)
sessionRoutes.delete('/api/hermes/sessions/:id', ctrl.remove)
sessionRoutes.post('/api/hermes/sessions/:id/rename', ctrl.rename)
@@ -0,0 +1,106 @@
import { resolve, join } from 'path'
import { homedir } from 'os'
import { readFileSync, existsSync, statSync } from 'fs'
// Root Hermes directory in the user's home; also used as the directory of the default profile.
const HERMES_BASE = resolve(homedir(), '.hermes')
// File that lookupContextFromCache() reads model context limits from.
const MODELS_DEV_CACHE = resolve(HERMES_BASE, 'models_dev_cache.json')
// Returned by getModelContextLength() when the model or its limit cannot be determined.
const DEFAULT_CONTEXT_LENGTH = 200_000
// Shapes of the parts of models_dev_cache.json this file reads. Every field is
// optional because the file is external data and is not validated after parsing.

// Per-model limits; only `context` is consumed in this file.
// NOTE(review): `output` / `input` are presumably token limits as well — confirm against the cache file.
interface ModelLimit {
context?: number
output?: number
input?: number
}
// One model entry; looked up by its key in ProviderEntry.models, not by `id`.
interface ModelEntry {
id?: string
limit?: ModelLimit
}
// One top-level entry of the cache file: model name → model entry.
interface ProviderEntry {
models?: Record<string, ModelEntry>
}
// --- In-memory cache: parsed models_dev_cache (1.7MB), invalidated by mtime ---
let _cache: Record<string, ProviderEntry> | null = null
let _cacheMtime = 0
const CACHE_TTL_MS = 5 * 60 * 1000 // 5 minutes
let _cacheLoadedAt = 0

/**
 * Return the parsed models cache file, re-reading it only when needed.
 *
 * The in-memory copy is reused while the file's mtime is unchanged and the
 * copy is younger than the TTL. If statting, reading or parsing fails, the
 * previous copy (possibly `null`) is returned instead.
 *
 * @returns The parsed cache, or `null` when the file does not exist.
 */
function loadModelsDevCache(): Record<string, ProviderEntry> | null {
  if (!existsSync(MODELS_DEV_CACHE)) return null

  try {
    const { mtimeMs } = statSync(MODELS_DEV_CACHE)
    const now = Date.now()

    const unchanged = mtimeMs === _cacheMtime
    const withinTtl = now - _cacheLoadedAt < CACHE_TTL_MS
    if (_cache && unchanged && withinTtl) return _cache

    const parsed = JSON.parse(readFileSync(MODELS_DEV_CACHE, 'utf-8')) as Record<string, ProviderEntry>
    _cache = parsed
    _cacheMtime = mtimeMs
    _cacheLoadedAt = now
    return parsed
  } catch {
    // Fall back to whatever was loaded last.
    return _cache
  }
}
// --- Profile helpers ---
/**
 * Resolve the directory holding a profile's files.
 *
 * The profile name can originate from a request query string, so it is
 * treated as untrusted: a name that is not a single plain path segment
 * (contains a path separator or NUL, or is `.` / `..`) could point outside
 * the profiles directory and is answered with the base directory instead.
 *
 * @param profile Profile name; missing, empty or `'default'` selects the base directory.
 * @returns The profile's directory when it exists, otherwise the base Hermes directory.
 */
function getProfileDir(profile?: string): string {
  if (typeof profile !== 'string' || profile === '' || profile === 'default') return HERMES_BASE

  const isSingleSegment = profile !== '.' && profile !== '..' && !/[\\/\0]/.test(profile)
  if (!isSingleSegment) return HERMES_BASE

  const dir = join(HERMES_BASE, 'profiles', profile)
  return existsSync(dir) ? dir : HERMES_BASE
}
/**
 * Read `model.default` from a profile's `config.yaml`.
 *
 * This is a small line-based reader, not a YAML parser. It finds the top-level
 * `model:` key and scans its direct children for `default:`. Compared with a
 * single regex over the file this:
 * - finds `default:` even when it is not the first key under `model:`,
 * - returns `null` for an empty `default:` instead of capturing the next line,
 * - strips surrounding quotes and a trailing `# comment` from the value.
 *
 * @param profileDir Directory expected to contain `config.yaml`.
 * @returns The configured model name, or `null` when the file is missing,
 *          unreadable, or has no non-empty `model.default`.
 */
function getDefaultModel(profileDir: string): string | null {
  const configPath = join(profileDir, 'config.yaml')
  if (!existsSync(configPath)) return null

  let content: string
  try {
    content = readFileSync(configPath, 'utf-8')
  } catch {
    return null
  }

  const lines = content.split(/\r?\n/)
  const modelLine = lines.findIndex(line => /^model:\s*(#.*)?$/.test(line))
  if (modelLine === -1) return null

  // Indentation of the first child of `model:`; deeper lines belong to nested maps.
  let childIndent: number | null = null
  for (const line of lines.slice(modelLine + 1)) {
    const trimmed = line.trimStart()
    if (trimmed === '' || trimmed.startsWith('#')) continue

    const indent = line.length - trimmed.length
    if (indent === 0) break // next top-level key: the `model:` block is over
    if (childIndent === null) childIndent = indent
    if (indent !== childIndent) continue

    const match = line.match(/^\s+default:\s*(.*)$/)
    if (!match) continue

    const raw = (match[1] ?? '').trim()
    const quoted = raw.match(/^(["'])(.*?)\1/)
    const value = quoted ? (quoted[2] ?? '') : raw.replace(/(^|\s+)#.*$/, '').trim()
    return value ? value : null
  }
  return null
}
// --- Context lookup ---
/**
 * Find a model's context limit in the models cache.
 *
 * Every provider is searched for an exact model-name match first; only if
 * none yields a limit is the search repeated ignoring case.
 *
 * @param modelName Model name to look up.
 * @returns The context limit, or `null` when the cache is unavailable or
 *          holds no non-zero limit for the model.
 */
function lookupContextFromCache(modelName: string): number | null {
  const providers = loadModelsDevCache()
  if (!providers) return null

  const entries = Object.values(providers)

  // Pass 1: exact key.
  for (const provider of entries) {
    const context = (provider.models || {})[modelName]?.limit?.context
    if (context) return context
  }

  // Pass 2: same key ignoring case.
  const wanted = modelName.toLowerCase()
  for (const provider of entries) {
    for (const [name, entry] of Object.entries(provider.models || {})) {
      if (name.toLowerCase() !== wanted) continue
      const context = entry?.limit?.context
      if (context) return context
    }
  }

  return null
}
/**
 * Context length of the default model configured for a profile.
 *
 * The profile's config is consulted for the model name, then the models cache
 * for that model's limit. Falls back to the default length whenever either
 * step comes up empty.
 *
 * @param profile Profile name; omitted means the base profile directory.
 * @returns The model's context length, or the default when it cannot be resolved.
 */
export function getModelContextLength(profile?: string): number {
  const model = getDefaultModel(getProfileDir(profile))
  if (model) {
    const known = lookupContextFromCache(model)
    if (known) return known
  }
  return DEFAULT_CONTEXT_LENGTH
}