diff --git a/packages/client/src/components/hermes/group-chat/CreateRoomForm.vue b/packages/client/src/components/hermes/group-chat/CreateRoomForm.vue index 7a52703..4c96147 100644 --- a/packages/client/src/components/hermes/group-chat/CreateRoomForm.vue +++ b/packages/client/src/components/hermes/group-chat/CreateRoomForm.vue @@ -22,7 +22,7 @@ const roomInput = ref(null) const compression = ref({ triggerTokens: 100000, maxHistoryTokens: 32000, - tailMessageCount: 20, + tailMessageCount: 10, }) function generateCode(): string { diff --git a/packages/client/src/components/hermes/group-chat/GroupChatPanel.vue b/packages/client/src/components/hermes/group-chat/GroupChatPanel.vue index dbe0f8f..7d0fb15 100644 --- a/packages/client/src/components/hermes/group-chat/GroupChatPanel.vue +++ b/packages/client/src/components/hermes/group-chat/GroupChatPanel.vue @@ -18,7 +18,7 @@ const showSidebar = ref(window.innerWidth > 768) const showCreateModal = ref(false) const showAddAgentModal = ref(false) const showCompressionModal = ref(false) -const compressionConfig = ref({ triggerTokens: 100000, maxHistoryTokens: 32000, tailMessageCount: 20 }) +const compressionConfig = ref({ triggerTokens: 100000, maxHistoryTokens: 32000, tailMessageCount: 10 }) const isCompressing = ref(false) const selectedProfile = ref(null) const agentName = ref('') @@ -119,7 +119,7 @@ function handleOpenCompressionConfig() { compressionConfig.value = { triggerTokens: room.triggerTokens ?? 100000, maxHistoryTokens: room.maxHistoryTokens ?? 32000, - tailMessageCount: room.tailMessageCount ?? 20, + tailMessageCount: room.tailMessageCount ?? 10, } } showCompressionModal.value = true diff --git a/packages/client/src/stores/hermes/group-chat.ts b/packages/client/src/stores/hermes/group-chat.ts index 9b907f9..2203222 100644 --- a/packages/client/src/stores/hermes/group-chat.ts +++ b/packages/client/src/stores/hermes/group-chat.ts @@ -241,7 +241,7 @@ export const useGroupChatStore = defineStore('groupChat', () => { name, inviteCode, agents: agentList, - compression: compression || { triggerTokens: 100000, maxHistoryTokens: 32000, tailMessageCount: 20 }, + compression: compression || { triggerTokens: 100000, maxHistoryTokens: 32000, tailMessageCount: 10 }, }) rooms.value.push(res.room) return res diff --git a/packages/server/src/controllers/hermes/logs.ts b/packages/server/src/controllers/hermes/logs.ts index c91fee5..a531b02 100644 --- a/packages/server/src/controllers/hermes/logs.ts +++ b/packages/server/src/controllers/hermes/logs.ts @@ -5,6 +5,7 @@ import { homedir } from 'os' import * as hermesCli from '../../services/hermes/hermes-cli' const WEBUI_LOG_FILE = join(homedir(), '.hermes-web-ui', 'logs', 'server.log') +const BRIDGE_LOG_FILE = join(homedir(), '.hermes-web-ui', 'logs', 'bridge.log') interface LogEntry { timestamp: string; level: string; logger: string; message: string; raw: string @@ -15,7 +16,7 @@ function parseLine(line: string): LogEntry { const obj = JSON.parse(line) if (obj.level && obj.time) { const ts = new Date(obj.time).toLocaleString('zh-CN', { hour12: false }).replace(/\//g, '-') - const levelMap: Record = { 10: 'DEBUG', 20: 'INFO', 30: 'WARN', 40: 'ERROR', 50: 'FATAL' } + const levelMap: Record = { 10: 'TRACE', 20: 'DEBUG', 30: 'INFO', 40: 'WARN', 50: 'ERROR', 60: 'FATAL' } // Pino 日志格式: { level, time, msg, name (logger name), hostname, pid, ... } const loggerName = obj.name || obj.logger || 'app' const message = obj.msg || (obj.err ? obj.err.message : '') @@ -39,6 +40,14 @@ export async function list(ctx: any) { files.push({ name: 'webui', size, modified }) } catch { } } + if (existsSync(BRIDGE_LOG_FILE)) { + try { + const stat = statSync(BRIDGE_LOG_FILE) + const size = stat.size > 1024 * 1024 ? `${(stat.size / 1024 / 1024).toFixed(1)}MB` : `${(stat.size / 1024).toFixed(1)}KB` + const modified = stat.mtime.toLocaleString() + files.push({ name: 'bridge', size, modified }) + } catch { } + } ctx.body = { files } } @@ -64,6 +73,21 @@ export async function read(ctx: any) { return } + if (logName === 'bridge') { + try { + if (!existsSync(BRIDGE_LOG_FILE)) { ctx.body = { entries: [] }; return } + const content = await readFile(BRIDGE_LOG_FILE, 'utf-8') + const rawLines = content.split('\n') + const sliced = rawLines.length > lines ? rawLines.slice(-lines) : rawLines + const entries: LogEntry[] = [] + for (const line of sliced) { if (!line.trim()) continue; entries.push(parseLine(line)) } + ctx.body = { entries: entries.reverse() } + } catch (err: any) { + ctx.status = 500; ctx.body = { error: err.message } + } + return + } + try { const content = await hermesCli.readLogs(logName, lines, level, session, since) const rawLines = content.split('\n') diff --git a/packages/server/src/db/hermes/schemas.ts b/packages/server/src/db/hermes/schemas.ts index 97fb5e6..b2c2547 100644 --- a/packages/server/src/db/hermes/schemas.ts +++ b/packages/server/src/db/hermes/schemas.ts @@ -115,7 +115,7 @@ export const GC_ROOMS_SCHEMA: Record = { inviteCode: 'TEXT UNIQUE', triggerTokens: 'INTEGER NOT NULL DEFAULT 100000', maxHistoryTokens: 'INTEGER NOT NULL DEFAULT 32000', - tailMessageCount: 'INTEGER NOT NULL DEFAULT 20', + tailMessageCount: 'INTEGER NOT NULL DEFAULT 10', totalTokens: 'INTEGER NOT NULL DEFAULT 0', } @@ -207,90 +207,6 @@ function tableExists(db: NonNullable>, tableName: strin return !!result } -/** - * 获取表的实际结构(包括主键) - */ -function getTableStructure(db: NonNullable>, tableName: string): { - columns: Map - primaryKey: string | null -} { - // 获取列信息 - const columns = db.prepare(`PRAGMA table_info("${tableName}")`).all() as Array<{ name: string; type: string; pk: number }> - const columnMap = new Map() - - for (const col of columns) { - columnMap.set(col.name, col.type) - } - - // 获取主键信息 - const tableInfo = db.prepare( - `SELECT sql FROM sqlite_master WHERE type='table' AND name=?` - ).get(tableName) as { sql: string } | undefined - - // 从 CREATE TABLE 语句中提取主键定义 - const sql = tableInfo?.sql || '' - const pkMatch = sql.match(/PRIMARY KEY\s*\(([^)]+)\)/i) - const primaryKey = pkMatch ? pkMatch[1].replace(/\s+/g, '') : null - - return { columns: columnMap, primaryKey } -} - -/** - * 提取列类型(从 schema 定义中) - */ -function extractType(schemaDef: string): string { - const types = ['TEXT', 'INTEGER', 'REAL', 'BLOB', 'NUMERIC'] - for (const type of types) { - if (schemaDef.toUpperCase().includes(type)) { - return type - } - } - return 'TEXT' -} - -/** - * 检查表结构是否完全匹配 schema(包括主键和列类型) - */ -function structureMatches( - actual: { columns: Map; primaryKey: string | null }, - schema: Record, - expectedPrimaryKey?: string -): boolean { - // 1. 检查主键 - if (expectedPrimaryKey) { - const expectedPKClean = expectedPrimaryKey.replace(/\s+/g, '') - if (actual.primaryKey !== expectedPKClean) { - return false // 主键不匹配 - } - } else { - if (actual.primaryKey) { - return false // 期望没有主键,但实际有 - } - } - - // 2. 检查列数量 - const columnMap = actual.columns as Map - if (columnMap.size !== Object.keys(schema).length) { - return false - } - - // 3. 检查列名和类型 - for (const [colName, colDef] of Object.entries(schema)) { - if (!columnMap.has(colName)) { - return false // 列不存在 - } - - const actualType = columnMap.get(colName)! - const expectedType = extractType(colDef) - - if (actualType !== expectedType) { - return false // 类型不匹配 - } - } - - return true -} - /** * 创建表(带完整 schema) */ @@ -314,102 +230,35 @@ function createTable( db.exec(`CREATE TABLE ${quoteIdentifier(tableName)} (${colDefs.join(', ')})`) } -/** - * 重建表(保留数据) - */ -function rebuildTable( +function canAddColumnToExistingTable(schemaDef: string): boolean { + const normalized = schemaDef.toUpperCase() + if (normalized.includes('PRIMARY KEY')) return false + if (normalized.includes('NOT NULL') && !normalized.includes('DEFAULT')) return false + return true +} + +function addMissingSafeColumns( db: NonNullable>, tableName: string, schema: Record, - primaryKey?: string ): void { - const tempTable = `${tableName}_rebuild_${Date.now()}` + const columns = db.prepare(`PRAGMA table_info(${quoteIdentifier(tableName)})`).all() as Array<{ name: string }> + const existingColumns = new Set(columns.map(col => col.name)) - // 1. 创建新表 - createTable(db, tempTable, schema, primaryKey) - - // 2. 找出两表共有的列(只复制这些列) - const actual = getTableStructure(db, tableName) - const commonCols = Array.from(actual.columns.keys()).filter((col) => schema[col]) - - // 3. 复制数据 - if (commonCols.length > 0) { - const colList = commonCols.map(c => quoteIdentifier(c)).join(', ') - db.exec(` - INSERT INTO ${quoteIdentifier(tempTable)} (${colList}) - SELECT ${colList} FROM ${quoteIdentifier(tableName)} - `) - } - - // 4. 删除旧表 - db.exec(`DROP TABLE ${quoteIdentifier(tableName)}`) - - // 5. 重命名新表 - db.exec(`ALTER TABLE ${quoteIdentifier(tempTable)} RENAME TO ${quoteIdentifier(tableName)}`) -} - -/** - * 同步表的列(不重建表) - */ -function syncColumns( - db: NonNullable>, - tableName: string, - schema: Record -): void { - const actual = getTableStructure(db, tableName) - const expectedCols = new Set(Object.keys(schema)) - - // 添加缺失的列 - for (const colName of expectedCols) { - if (!actual.columns.has(colName)) { - db.exec(`ALTER TABLE ${quoteIdentifier(tableName)} ADD COLUMN ${quoteIdentifier(colName)} ${schema[colName]}`) - } - } - - // 删除多余的列 - for (const colName of actual.columns.keys()) { - if (!expectedCols.has(colName)) { - db.exec(`ALTER TABLE ${quoteIdentifier(tableName)} DROP COLUMN ${quoteIdentifier(colName)}`) - } - } -} - -/** - * 同步索引 - */ -function syncIndexes( - db: NonNullable>, - tableName: string, - indexes: Record -): void { - const existingIndexes = db.prepare( - `SELECT name FROM sqlite_master WHERE type='index' AND tbl_name=?` - ).all(tableName) as Array<{ name: string }> - - const existingNames = new Set(existingIndexes.map(i => i.name)) - const expectedNames = new Set(Object.keys(indexes)) - - // 删除多余索引 - for (const name of existingNames) { - if (expectedNames.has(name)) { - try { db.exec(`DROP INDEX ${quoteIdentifier(name)}`) } catch { } - } - } - - // 创建新索引 - for (const [name, sql] of Object.entries(indexes)) { - if (!existingNames.has(name)) { - try { db.exec(sql) } catch { } + for (const [columnName, columnDef] of Object.entries(schema)) { + if (existingColumns.has(columnName)) continue + if (!canAddColumnToExistingTable(columnDef)) { + console.warn(`[Schema] ${tableName}.${columnName} cannot be added safely to existing table; skipping`) + continue } + db.exec(`ALTER TABLE ${quoteIdentifier(tableName)} ADD COLUMN ${quoteIdentifier(columnName)} ${columnDef}`) } } /** * 主同步函数 * - 表不存在:创建 - * - 表存在但结构不匹配(主键/类型):重建 - * - 表存在且结构匹配:同步列(增删) - * - 同步索引 + * - 表存在:只追加安全的新列,不删除、不重建、不修改主键/类型 */ export function syncTable( tableName: string, @@ -435,22 +284,7 @@ export function syncTable( return } - // 2. 表存在 → 检查结构 - const actual = getTableStructure(db, tableName) - const matches = structureMatches(actual, schema, options?.primaryKey) - - if (matches) { - // 结构完全匹配 → 同步列(理论上不会做任何事,但确保一致性) - syncColumns(db, tableName, schema) - } else { - // 结构不匹配 → 重建表 - rebuildTable(db, tableName, schema, options?.primaryKey) - } - - // 3. 同步索引(不管是否重建) - if (options?.indexes) { - syncIndexes(db, tableName, options.indexes) - } + addMissingSafeColumns(db, tableName, schema) } // ============================================================================ @@ -458,16 +292,11 @@ export function syncTable( // ============================================================================ /** - * Initialize all Hermes SQLite tables with proper schemas. - * This function automatically syncs all tables to match their schema definitions. + * Initialize missing Hermes SQLite tables with proper schemas. + * Existing tables only receive safe additive columns. * Call this once at application bootstrap. */ -export function initAllHermesTables(retryCount = 0): void { - // 防止无限重试(最多重试 1 次) - if (retryCount > 1) { - throw new Error('[Schema] ❌ Database initialization failed after multiple retry attempts. Please delete the database file manually and restart.') - } - +export function initAllHermesTables(): void { const db = getDb() if (!db) return @@ -511,73 +340,7 @@ export function initAllHermesTables(retryCount = 0): void { }) } catch (e) { console.error('Error initializing Hermes SQLite tables:', e) - - // 自动恢复:备份数据库 → 删除损坏的数据库 → 重新初始化 - console.warn('[Schema] Database initialization failed. Attempting automatic recovery...') - - try { - const dbPath = getStoragePath() - const { unlinkSync, copyFileSync, existsSync } = require('fs') - - if (!existsSync(dbPath)) { - console.log('[Schema] Database file does not exist. Creating new database...') - initAllHermesTables() - console.log('[Schema] Database created successfully!') - return - } - - // 检查是否已经存在备份(避免重复失败时创建多个备份) - const existingBackup = dbPath + '.corrupted.last' - let finalBackupPath: string | undefined - - if (existsSync(existingBackup)) { - console.log(`[Schema] Backup already exists: ${existingBackup}`) - console.log('[Schema] Deleting corrupted database without re-backup...') - try { - unlinkSync(dbPath) - } catch (deleteError) { - console.warn('[Schema] Failed to delete corrupted database:', deleteError) - } - } else { - // 没有备份,创建新备份 - const timestamp = Date.now() - const backupPath = dbPath + '.corrupted.' + timestamp - let backupSuccess = false - - try { - copyFileSync(dbPath, backupPath) - backupSuccess = true - finalBackupPath = backupPath - console.log(`[Schema] Backed up corrupted database to: ${backupPath}`) - } catch (backupError) { - console.warn('[Schema] Failed to backup database:', backupError) - } - - // 只有备份成功后才删除原文件 - if (backupSuccess) { - try { - unlinkSync(dbPath) - } catch (deleteError) { - console.warn('[Schema] Failed to delete corrupted database:', deleteError) - } - } - } - - // 3. 删除 WAL 和 SHM 文件 - try { unlinkSync(dbPath + '-wal') } catch { } - try { unlinkSync(dbPath + '-shm') } catch { } - - // 4. 重新初始化(增加重试计数) - console.log('[Schema] Reinitializing database...') - initAllHermesTables(retryCount + 1) - console.log('[Schema] Database recovered successfully! System is ready to use.') - const backupLocation = finalBackupPath || existingBackup - if (backupLocation) { - console.log(`[Schema] If you need to recover old data, restore from: ${backupLocation}`) - } - } catch (recoveryError) { - console.error('[Schema] Failed to recover database:', recoveryError) - throw recoveryError - } + console.error(`[Schema] Database initialization failed. Existing database was left untouched: ${getStoragePath()}`) + throw e } } diff --git a/packages/server/src/db/index.ts b/packages/server/src/db/index.ts index aa88142..eefec87 100644 --- a/packages/server/src/db/index.ts +++ b/packages/server/src/db/index.ts @@ -4,9 +4,12 @@ import { resolve } from 'path' import { homedir } from 'os' const isDev = process.env.NODE_ENV !== 'production' +const isTest = process.env.VITEST === 'true' || process.env.NODE_ENV === 'test' // In WSL, always use home directory to avoid cross-filesystem issues -const DB_DIR = isDev +const DB_DIR = isTest + ? resolve(process.cwd(), 'packages/server/data/test-runtime') + : isDev ? resolve(process.cwd(), 'packages/server/data') : resolve(homedir(), '.hermes-web-ui') const DB_PATH = resolve(DB_DIR, 'hermes-web-ui.db') diff --git a/packages/server/src/lib/context-compressor/index.ts b/packages/server/src/lib/context-compressor/index.ts index 55f744d..9f84b12 100644 --- a/packages/server/src/lib/context-compressor/index.ts +++ b/packages/server/src/lib/context-compressor/index.ts @@ -8,7 +8,7 @@ * 1. If total tokens < trigger threshold → return as-is * 2. Pre-clean: truncate old tool results (no LLM call) * 3. Load snapshot from SQLite for incremental update - * 4. Keep last 20 messages verbatim (tail protection by message count) + * 4. Keep last 10 messages verbatim (tail protection by message count) * 5. Summarize everything before the tail * 6. Save snapshot: last_message_index = index where compression ends */ @@ -44,7 +44,7 @@ export interface CompressionConfig { triggerTokens: number /** Summary token target (default: 8000) */ summaryBudget: number - /** Number of recent messages to keep verbatim (default: 20) */ + /** Number of recent messages to keep verbatim (default: 10) */ tailMessageCount: number /** Timeout for LLM summarization call (default: 60_000ms) */ summarizationTimeoutMs: number @@ -53,7 +53,7 @@ export interface CompressionConfig { export const DEFAULT_COMPRESSION_CONFIG: CompressionConfig = { triggerTokens: 100_000, summaryBudget: 8_000, - tailMessageCount: 20, + tailMessageCount: 10, summarizationTimeoutMs: 120_000, } @@ -521,6 +521,23 @@ export class ChatContextCompressor { const toCompress = newMessages.slice(0, tailStart) const tail = newMessages.slice(tailStart) + if (toCompress.length === 0) { + return { + messages: [ + { role: 'user', content: SUMMARY_PREFIX + '\n\n' + previousSummary }, + ...newMessages, + ], + meta: { + ...meta, + compressed: true, + llmCompressed: false, + summaryTokenEstimate: countTokens(SUMMARY_PREFIX + previousSummary), + verbatimCount: newMessages.length, + compressedStartIndex: lastMessageIndex, + }, + } + } + logger.info( '[context-compressor] [incremental-llm] compressing %d of %d new messages, keeping %d tail', toCompress.length, newMessages.length, tail.length, @@ -536,8 +553,21 @@ export class ChatContextCompressor { summary = await callSummarizer(upstream, apiKey, prompt, history, this.config.summarizationTimeoutMs, previousSummary, profile) logger.info('[context-compressor] incremental-llm done in %dms, %d chars', Date.now() - t0, summary.length) } catch (err: any) { - logger.warn('[context-compressor] incremental-llm failed: %s — reusing previous summary', err.message) - summary = previousSummary + logger.warn('[context-compressor] incremental-llm failed: %s — keeping new messages verbatim', err.message) + return { + messages: [ + { role: 'user', content: SUMMARY_PREFIX + '\n\n' + previousSummary }, + ...newMessages, + ], + meta: { + ...meta, + compressed: true, + llmCompressed: false, + summaryTokenEstimate: countTokens(SUMMARY_PREFIX + previousSummary), + verbatimCount: newMessages.length, + compressedStartIndex: lastMessageIndex, + }, + } } const result: ChatMessage[] = [ @@ -601,13 +631,15 @@ export class ChatContextCompressor { logger.warn('[context-compressor] full-llm failed: %s', err.message) } + if (!summary) { + return { messages: cleaned, meta } + } + const result: ChatMessage[] = [] - if (summary) { - result.push({ role: 'user', content: SUMMARY_PREFIX + '\n\n' + summary }) - if (sessionId) { - saveCompressionSnapshot(sessionId, summary, tailStart - 1, total) - } + result.push({ role: 'user', content: SUMMARY_PREFIX + '\n\n' + summary }) + if (sessionId) { + saveCompressionSnapshot(sessionId, summary, tailStart - 1, total) } result.push(...tail) diff --git a/packages/server/src/services/hermes/agent-bridge/client.ts b/packages/server/src/services/hermes/agent-bridge/client.ts index a6cbae3..4e6f3c9 100644 --- a/packages/server/src/services/hermes/agent-bridge/client.ts +++ b/packages/server/src/services/hermes/agent-bridge/client.ts @@ -1,6 +1,7 @@ import { setTimeout as delay } from 'timers/promises' import { createConnection, type Socket } from 'net' import { URL } from 'url' +import { bridgeLogger } from '../../logger' export const DEFAULT_AGENT_BRIDGE_ENDPOINT = process.platform === 'win32' ? 'tcp://127.0.0.1:18765' @@ -91,6 +92,36 @@ export class AgentBridgeClient { this.timeoutMs = options.timeoutMs ?? envPositiveInt('HERMES_AGENT_BRIDGE_TIMEOUT_MS') ?? DEFAULT_AGENT_BRIDGE_TIMEOUT_MS } + private summarizePayload(payload: Record): Record { + const action = String(payload.action || '') + const summary: Record = { action } + for (const key of ['session_id', 'run_id', 'request_id', 'approval_id', 'profile']) { + if (payload[key] != null) summary[key] = payload[key] + } + if (Array.isArray(payload.conversation_history)) summary.conversation_history_count = payload.conversation_history.length + if (Array.isArray(payload.messages)) summary.messages_count = payload.messages.length + if (typeof payload.message === 'string') summary.message_chars = payload.message.length + else if (Array.isArray(payload.message)) summary.message_parts = payload.message.length + if (typeof payload.command === 'string') summary.command = payload.command + if (typeof payload.text === 'string') summary.text_chars = payload.text.length + if (typeof payload.error === 'string') summary.error = payload.error + if (payload.force_compress === true) summary.force_compress = true + return summary + } + + private summarizeResponse(response: Record): Record { + const summary: Record = { ok: response.ok === true } + for (const key of ['session_id', 'run_id', 'request_id', 'status', 'cursor', 'event_cursor']) { + if (response[key] != null) summary[key] = response[key] + } + if (typeof response.delta === 'string') summary.delta_chars = response.delta.length + if (typeof response.output === 'string') summary.output_chars = response.output.length + if (Array.isArray(response.events)) summary.events_count = response.events.length + if (typeof response.error === 'string') summary.error = response.error + if (Array.isArray(response.history)) summary.history_count = response.history.length + return summary + } + async connect(): Promise { return this } @@ -191,16 +222,47 @@ export class AgentBridgeClient { ): Promise { const run = async (): Promise => { const timeoutMs = options.timeoutMs || this.timeoutMs - const socket = await this.connectSocket() - socket.write(`${JSON.stringify(payload)}\n`) - const raw = await this.readResponse(socket, timeoutMs) - const response = JSON.parse(raw) as { ok?: boolean; error?: string } - if (!response.ok) { - const error = new AgentBridgeError(response.error || 'Agent bridge request failed') - error.response = response - throw error + const startedAt = Date.now() + const action = String(payload.action || '') + const shouldLogRequest = action !== 'get_output' + if (shouldLogRequest) { + bridgeLogger.info({ + endpoint: this.endpoint, + timeoutMs, + request: this.summarizePayload(payload), + }, '[agent-bridge-client] request') + } + try { + const socket = await this.connectSocket() + socket.write(`${JSON.stringify(payload)}\n`) + const raw = await this.readResponse(socket, timeoutMs) + const response = JSON.parse(raw) as { ok?: boolean; error?: string } + if (!response.ok) { + const error = new AgentBridgeError(response.error || 'Agent bridge request failed') + error.response = response + bridgeLogger.warn({ + durationMs: Date.now() - startedAt, + response: this.summarizeResponse(response as Record), + }, '[agent-bridge-client] request rejected') + throw error + } + if (shouldLogRequest) { + bridgeLogger.info({ + durationMs: Date.now() - startedAt, + response: this.summarizeResponse(response as Record), + }, '[agent-bridge-client] response') + } + return response as T + } catch (err: any) { + if (!(err instanceof AgentBridgeError)) { + bridgeLogger.error({ + durationMs: Date.now() - startedAt, + err: { message: err?.message, name: err?.name }, + request: this.summarizePayload(payload), + }, '[agent-bridge-client] request failed') + } + throw err } - return response as T } const next = this.lock.then(run, run) @@ -218,6 +280,7 @@ export class AgentBridgeClient { conversationHistory?: unknown[], instructions?: string, profile?: string, + options: { force_compress?: boolean } = {}, ): Promise { return this.request({ action: 'chat', @@ -226,6 +289,7 @@ export class AgentBridgeClient { ...(conversationHistory ? { conversation_history: conversationHistory } : {}), ...(instructions ? { instructions } : {}), ...(profile ? { profile } : {}), + ...(options.force_compress ? { force_compress: true } : {}), }) } diff --git a/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py b/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py index 171b30c..6e526e8 100644 --- a/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py +++ b/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py @@ -732,6 +732,7 @@ class AgentPool: instructions: str | None = None, conversation_history: list[dict[str, Any]] | None = None, profile: str | None = None, + force_compress: bool = False, ) -> RunRecord: session = self.get_or_create(session_id, profile=profile) with session.lock: @@ -747,14 +748,14 @@ class AgentPool: thread = threading.Thread( target=self._run_chat, - args=(session, record, message, instructions, conversation_history, profile), + args=(session, record, message, instructions, conversation_history, profile, force_compress), daemon=True, name=f"hermes-bridge-run-{run_id[:8]}", ) thread.start() return record - def _run_chat(self, session: AgentSession, record: RunRecord, message: Any, instructions: str | None = None, conversation_history: list[dict[str, Any]] | None = None, profile: str | None = None) -> None: + def _run_chat(self, session: AgentSession, record: RunRecord, message: Any, instructions: str | None = None, conversation_history: list[dict[str, Any]] | None = None, profile: str | None = None, force_compress: bool = False) -> None: def stream_callback(delta: str) -> None: with self._lock: record.deltas.append(str(delta)) @@ -774,6 +775,19 @@ class AgentPool: except Exception: previous_approval_callback = None self._prepersist_user_message(session, message, conversation_history, profile) + if force_compress: + compress = getattr(session.agent, "_compress_context", None) + if callable(compress): + compressed_history, compressed_system = compress( + conversation_history if isinstance(conversation_history, list) else [], + instructions, + approx_tokens=None, + focus_topic="debug_force_compress", + ) + if isinstance(compressed_history, list): + conversation_history = compressed_history + if isinstance(compressed_system, str): + instructions = compressed_system kwargs: dict[str, Any] = dict( task_id=session.session_id, stream_callback=stream_callback, @@ -996,7 +1010,14 @@ class BridgeServer: instructions = req.get("instructions") or req.get("system_message") conversation_history = req.get("conversation_history") profile = req.get("profile") - record = self.pool.start_chat(session_id, message, instructions, conversation_history, profile) + record = self.pool.start_chat( + session_id, + message, + instructions, + conversation_history, + profile, + bool(req.get("force_compress")), + ) if req.get("wait"): timeout = float(req.get("timeout", 0) or 0) deadline = time.time() + timeout if timeout > 0 else None diff --git a/packages/server/src/services/hermes/chat-run-socket.ts b/packages/server/src/services/hermes/chat-run-socket.ts index 055f549..62fbc9f 100644 --- a/packages/server/src/services/hermes/chat-run-socket.ts +++ b/packages/server/src/services/hermes/chat-run-socket.ts @@ -25,7 +25,7 @@ import { ChatContextCompressor, countTokens, SUMMARY_PREFIX } from '../../lib/co import { getCompressionSnapshot } from '../../db/hermes/compression-snapshot' import { parseAnthropicContentArray } from '../../lib/llm-json' import { updateUsage } from '../../db/hermes/usage-store' -import { logger } from '../logger' +import { bridgeLogger, logger } from '../logger' import { AgentBridgeClient, type AgentBridgeMessage, type AgentBridgeOutput } from './agent-bridge' import { getActiveProfileName } from './hermes-profile' import type { ChatMessage } from '../../lib/context-compressor' @@ -194,6 +194,7 @@ interface SessionState { arguments: string startedAt: number }> + bridgeCompressionResults?: Record } interface ResponseRunState { @@ -205,6 +206,19 @@ interface ResponseRunState { type ChatRunSource = 'api_server' | 'cli' +interface BridgeCompressionResult { + messages: ChatMessage[] + beforeMessages: number + resultMessages: number + beforeTokens: number + afterTokens: number + compressed: boolean + llmCompressed: boolean + summaryTokens: number + verbatimCount: number + compressedStartIndex: number +} + // --- ChatRunSocket --- export class ChatRunSocket { @@ -795,6 +809,54 @@ export class ChatRunSocket { * then apply context compression (snapshot-aware + LLM) identically for both * api_server and CLI bridge runs. */ + private async buildDbHistory( + sessionId: string, + options: { excludeLastUser?: boolean } = {}, + ): Promise { + const detail = useLocalSessionStore() + ? getSessionDetail(sessionId) + : await getSessionDetailFromDb(sessionId) + if (!detail?.messages?.length) return [] + + const validMessages = detail.messages.filter(m => + (m.role === 'user' || m.role === 'assistant' || m.role === 'tool') && m.content !== undefined, + ) + + const sourceMessages = options.excludeLastUser + ? (() => { + const lastUserMsgIndex = [...validMessages].reverse().findIndex(m => m.role === 'user') + return lastUserMsgIndex >= 0 + ? validMessages.slice(0, validMessages.length - lastUserMsgIndex - 1) + : validMessages + })() + : validMessages + + return sourceMessages.map((m, idx, arr) => { + const msg: any = { role: m.role, content: m.content || '' } + if (m.reasoning_content) msg.reasoning_content = m.reasoning_content + if (m.tool_calls?.length) { + const cleanedToolCalls = m.tool_calls + .filter((tc: any) => tc.id && tc.id.length > 0) + .map((tc: any) => ({ id: tc.id, type: tc.type, function: tc.function })) + if (cleanedToolCalls.length > 0) msg.tool_calls = cleanedToolCalls + } + if (m.role === 'tool') { + let callId = m.tool_call_id + if (!callId || callId.length === 0) { + const prevMsg = arr[idx - 1] + if (prevMsg?.role === 'assistant' && prevMsg.tool_calls?.length) { + const tc = prevMsg.tool_calls.find((t: any) => t.function?.name === m.tool_name) + if (tc?.id) callId = tc.id + } + } + if (!callId || callId.length === 0) return null + msg.tool_call_id = callId + } + if (m.tool_name) msg.name = m.tool_name + return msg + }).filter((m): m is ChatMessage => m !== null) + } + private async buildCompressedHistory( sessionId: string, profile: string, @@ -803,44 +865,7 @@ export class ChatRunSocket { emit: (event: string, payload: any) => void, ): Promise { try { - const detail = useLocalSessionStore() - ? getSessionDetail(sessionId) - : await getSessionDetailFromDb(sessionId) - if (!detail?.messages?.length) return [] - - const validMessages = detail.messages.filter(m => - (m.role === 'user' || m.role === 'assistant' || m.role === 'tool') && m.content !== undefined, - ) - - // Exclude the last user message (just added by the caller) - const lastUserMsgIndex = [...validMessages].reverse().findIndex(m => m.role === 'user') - let history: ChatMessage[] = (lastUserMsgIndex >= 0 - ? validMessages.slice(0, validMessages.length - lastUserMsgIndex - 1) - : validMessages - ).map((m, idx, arr) => { - const msg: any = { role: m.role, content: m.content || '' } - if (m.reasoning_content) msg.reasoning_content = m.reasoning_content - if (m.tool_calls?.length) { - const cleanedToolCalls = m.tool_calls - .filter((tc: any) => tc.id && tc.id.length > 0) - .map((tc: any) => ({ id: tc.id, type: tc.type, function: tc.function })) - if (cleanedToolCalls.length > 0) msg.tool_calls = cleanedToolCalls - } - if (m.role === 'tool') { - let callId = m.tool_call_id - if (!callId || callId.length === 0) { - const prevMsg = arr[idx - 1] - if (prevMsg?.role === 'assistant' && prevMsg.tool_calls?.length) { - const tc = prevMsg.tool_calls.find((t: any) => t.function?.name === m.tool_name) - if (tc?.id) callId = tc.id - } - } - if (!callId || callId.length === 0) return null - msg.tool_call_id = callId - } - if (m.tool_name) msg.name = m.tool_name - return msg - }).filter((m): m is ChatMessage => m !== null) + let history = await this.buildDbHistory(sessionId, { excludeLastUser: true }) if (history.length === 0) return [] @@ -954,37 +979,39 @@ export class ChatRunSocket { private async forceCompressBridgeHistory( sessionId: string, profile: string, - messages: ChatMessage[], - ): Promise { - const history = messages - .filter(m => m && (m.role === 'user' || m.role === 'assistant' || m.role === 'tool' || m.role === 'system')) - .map(m => { - const msg: any = { role: m.role, content: m.content || '' } - if (m.reasoning_content) msg.reasoning_content = m.reasoning_content - if (m.tool_calls?.length) { - const cleanedToolCalls = m.tool_calls - .filter((tc: any) => tc.id && tc.id.length > 0) - .map((tc: any) => ({ id: tc.id, type: tc.type, function: tc.function })) - if (cleanedToolCalls.length > 0) msg.tool_calls = cleanedToolCalls - } - if (m.tool_call_id) msg.tool_call_id = m.tool_call_id - if (m.name) msg.name = m.name - return msg as ChatMessage - }) + _messages: ChatMessage[], + ): Promise { + const history = await this.buildDbHistory(sessionId, { excludeLastUser: true }) - if (history.length === 0) return [] + if (history.length === 0) { + return { + messages: [], + beforeMessages: 0, + resultMessages: 0, + beforeTokens: 0, + afterTokens: 0, + compressed: false, + llmCompressed: false, + summaryTokens: 0, + verbatimCount: 0, + compressedStartIndex: -1, + } + } const upstream = this.gatewayManager.getUpstream(profile).replace(/\/$/, '') const apiKey = this.gatewayManager.getApiKey(profile) || undefined const totalTokens = countTokens(JSON.stringify(history)) - logger.info('[context-compress] bridge forced compression session=%s: %d messages, ~%d tokens', - sessionId, history.length, totalTokens) + bridgeLogger.info({ + sessionId, + profile, + historyMessages: history.length, + bridgeProvidedMessages: Array.isArray(_messages) ? _messages.length : 0, + tokenEstimate: totalTokens, + snapshotAware: true, + }, '[chat-run-socket] bridge forced compression started') - const result = await compressor.compress(history, upstream, apiKey, undefined, profile) - logger.info('[context-compress] bridge forced compression done session=%s: %d -> %d messages', - sessionId, history.length, result.messages.length) - - return result.messages.map(m => { + const result = await compressor.compress(history, upstream, apiKey, sessionId, profile) + const compressedMessages = result.messages.map(m => { const msg: any = { role: m.role, content: m.content } if (m.reasoning_content) msg.reasoning_content = m.reasoning_content if (m.tool_calls?.length) { @@ -997,6 +1024,40 @@ export class ChatRunSocket { if (m.name) msg.name = m.name return msg }) + const afterTokens = countTokens(JSON.stringify(compressedMessages)) + bridgeLogger.info({ + sessionId, + profile, + beforeMessages: history.length, + resultMessages: result.messages.length, + beforeTokens: totalTokens, + afterTokens, + compressed: result.meta.compressed, + llmCompressed: result.meta.llmCompressed, + verbatimCount: result.meta.verbatimCount, + compressedStartIndex: result.meta.compressedStartIndex, + compressedHistory: result.messages.map((m) => ({ + role: m.role, + content: m.content, + reasoning_content: m.reasoning_content, + tool_calls: m.tool_calls, + tool_call_id: m.tool_call_id, + name: m.name, + })), + }, '[chat-run-socket] bridge forced compression completed') + + return { + messages: compressedMessages, + beforeMessages: history.length, + resultMessages: compressedMessages.length, + beforeTokens: totalTokens, + afterTokens, + compressed: result.meta.compressed, + llmCompressed: result.meta.llmCompressed, + summaryTokens: result.meta.summaryTokenEstimate, + verbatimCount: result.meta.verbatimCount, + compressedStartIndex: result.meta.compressedStartIndex, + } } private resolveRunSource(source?: string, sessionId?: string): ChatRunSource { @@ -1079,8 +1140,20 @@ export class ChatRunSocket { try { logger.info('[chat-run-socket] starting CLI bridge run for session %s', session_id) + bridgeLogger.info({ + sessionId: session_id, + profile, + inputChars: inputStr.length, + historyMessages: history.length, + hasInstructions: Boolean(instructions), + }, '[chat-run-socket] starting CLI bridge run') const started = await this.bridge.chat(session_id, input as AgentBridgeMessage, history, instructions, profile) state.runId = started.run_id + bridgeLogger.info({ + sessionId: session_id, + runId: started.run_id, + status: started.status, + }, '[chat-run-socket] CLI bridge run started') this.pushState(session_id, 'run.started', { event: 'run.started', run_id: started.run_id, @@ -1224,12 +1297,16 @@ export class ChatRunSocket { this.replaceState(sessionId, 'approval.resolved', payload) emit('approval.resolved', payload) } else if (evType === 'bridge.compression.requested') { + const bridgeHistory = await this.buildDbHistory(sessionId, { excludeLastUser: true }) + const tokenCount = bridgeHistory.length > 0 + ? countTokens(JSON.stringify(bridgeHistory)) + : ev.approx_tokens const payload = { event: 'compression.started', run_id: chunk.run_id, request_id: ev.request_id, - message_count: ev.message_count, - token_count: ev.approx_tokens, + message_count: bridgeHistory.length || ev.message_count, + token_count: tokenCount, source: 'bridge', } this.replaceState(sessionId, 'compression.started', payload) @@ -1241,7 +1318,9 @@ export class ChatRunSocket { profile, ev.messages as ChatMessage[], ) - await this.bridge.compressionRespond(String(ev.request_id), { messages: compressed }) + state.bridgeCompressionResults = state.bridgeCompressionResults || {} + state.bridgeCompressionResults[String(ev.request_id)] = compressed + await this.bridge.compressionRespond(String(ev.request_id), { messages: compressed.messages }) } catch (err: any) { await this.bridge.compressionRespond(String(ev.request_id), { error: err?.message || String(err), @@ -1249,18 +1328,30 @@ export class ChatRunSocket { } } } else if (evType === 'bridge.compression.completed') { + const compressionResult = ev.request_id + ? state.bridgeCompressionResults?.[String(ev.request_id)] + : undefined const payload = { event: 'compression.completed', run_id: chunk.run_id, request_id: ev.request_id, - compressed: ev.compressed !== false, - totalMessages: ev.message_count, - resultMessages: ev.result_messages, - beforeTokens: ev.approx_tokens, + compressed: compressionResult?.compressed ?? ev.compressed !== false, + llmCompressed: compressionResult?.llmCompressed, + totalMessages: compressionResult?.beforeMessages ?? ev.message_count, + resultMessages: compressionResult?.resultMessages ?? ev.result_messages, + beforeTokens: compressionResult?.beforeTokens ?? ev.approx_tokens, + afterTokens: compressionResult?.afterTokens, + summaryTokens: compressionResult?.summaryTokens, + verbatimCount: compressionResult?.verbatimCount, + compressedStartIndex: compressionResult?.compressedStartIndex, source: 'bridge', } + if (ev.request_id && state.bridgeCompressionResults) { + delete state.bridgeCompressionResults[String(ev.request_id)] + } this.replaceState(sessionId, 'compression.completed', payload) emit('compression.completed', payload) + await this.calcAndUpdateUsage(sessionId, state, emit) } else if (evType === 'bridge.compression.failed') { const payload = { event: 'compression.completed', @@ -1273,6 +1364,9 @@ export class ChatRunSocket { error: ev.error, source: 'bridge', } + if (ev.request_id && state.bridgeCompressionResults) { + delete state.bridgeCompressionResults[String(ev.request_id)] + } this.replaceState(sessionId, 'compression.completed', payload) emit('compression.completed', payload) } else if (evType === 'status') { diff --git a/packages/server/src/services/hermes/context-engine/types.ts b/packages/server/src/services/hermes/context-engine/types.ts index ef000de..fd5d05b 100644 --- a/packages/server/src/services/hermes/context-engine/types.ts +++ b/packages/server/src/services/hermes/context-engine/types.ts @@ -28,7 +28,7 @@ export interface CompressionConfig { export const DEFAULT_COMPRESSION_CONFIG: CompressionConfig = { triggerTokens: 100_000, maxHistoryTokens: 32_000, - tailMessageCount: 20, + tailMessageCount: 10, charsPerToken: 6, summarizationTimeoutMs: 30_000, } diff --git a/packages/server/src/services/hermes/group-chat/index.ts b/packages/server/src/services/hermes/group-chat/index.ts index 8dab65d..85d34ef 100644 --- a/packages/server/src/services/hermes/group-chat/index.ts +++ b/packages/server/src/services/hermes/group-chat/index.ts @@ -190,7 +190,7 @@ class ChatStorage { saveRoom(id: string, name: string, inviteCode?: string, config?: { triggerTokens?: number; maxHistoryTokens?: number; tailMessageCount?: number }): void { this.db()?.prepare( 'INSERT OR IGNORE INTO gc_rooms (id, name, inviteCode, triggerTokens, maxHistoryTokens, tailMessageCount) VALUES (?, ?, ?, ?, ?, ?)' - ).run(id, name, inviteCode || null, config?.triggerTokens ?? 100000, config?.maxHistoryTokens ?? 32000, config?.tailMessageCount ?? 20) + ).run(id, name, inviteCode || null, config?.triggerTokens ?? 100000, config?.maxHistoryTokens ?? 32000, config?.tailMessageCount ?? 10) } updateRoomConfig(roomId: string, config: { triggerTokens?: number; maxHistoryTokens?: number; tailMessageCount?: number }): void { diff --git a/packages/server/src/services/hermes/session-sync.ts b/packages/server/src/services/hermes/session-sync.ts index 17a4e2b..a357300 100644 --- a/packages/server/src/services/hermes/session-sync.ts +++ b/packages/server/src/services/hermes/session-sync.ts @@ -1,191 +1,13 @@ /** - * Sync Hermes sessions from all profiles on startup. - * Reads api_server sessions from Hermes state.db and imports into local DB. - * Only runs when local DB is empty (first startup). + * Hermes session import is intentionally disabled. * - * Uses sessions-db.ts query logic to properly aggregate session chains. + * Hermes state.db remains a read-only source for Hermes-specific history APIs. + * The web-ui local sessions/messages tables must not be populated from Hermes + * on startup, because that can mix ownership and make data-loss incidents much + * harder to reason about. */ -import { readdirSync, existsSync } from 'fs' -import { resolve, join } from 'path' -import { homedir } from 'os' -import { randomBytes } from 'crypto' -import { getProfileDir } from './hermes-profile' -import { createSession, addMessage, updateSession } from '../../db/hermes/session-store' -import { getDb } from '../../db/index' import { logger } from '../logger' -import { listSessionSummaries as listHermesSessionSummaries } from '../../db/hermes/sessions-db' -import { detectHermesHome } from './hermes-path' -const HERMES_BASE = detectHermesHome() -const PROFILES_DIR = join(HERMES_BASE, 'profiles') - -/** - * Generate a UUID v4 without external dependencies - */ -function generateUuid(): string { - const bytes = randomBytes(16) - bytes[6] = (bytes[6]! & 0x0f) | 0x40 // Version 4 - bytes[8] = (bytes[8]! & 0x3f) | 0x80 // Variant 10 - return [ - bytes.subarray(0, 4).toString('hex'), - bytes.subarray(4, 6).toString('hex'), - bytes.subarray(6, 8).toString('hex'), - bytes.subarray(8, 10).toString('hex'), - bytes.subarray(10, 16).toString('hex'), - ].join('-') -} - -/** - * Get all available profile names including 'default' - */ -function getAllProfiles(): string[] { - const profiles = ['default'] - - if (existsSync(PROFILES_DIR)) { - const dirs = readdirSync(PROFILES_DIR, { withFileTypes: true }) - .filter(dirent => dirent.isDirectory()) - .map(dirent => dirent.name) - profiles.push(...dirs) - } - - return profiles -} - -/** - * Sync api_server sessions from a single profile. - * Uses sessions-db.ts query logic to properly aggregate session chains. - */ -async function syncProfileSessions(profile: string): Promise<{ - synced: number - errors: string[] -}> { - const result = { synced: 0, errors: [] as string[] } - - try { - // Use listSessionSummaries to get aggregated session chains - // This returns only root sessions with aggregated stats from the entire chain - const summaries = await listHermesSessionSummaries('api_server', 10000, profile) - - logger.info(`[session-sync] profile '${profile}': found ${summaries.length} aggregated session chains`) - - for (const hermesSession of summaries) { - // Skip ephemeral sessions (created internally by chat-run-socket) - if (hermesSession.id.startsWith('eph_')) continue - try { - // Generate new session ID for local DB - const newSessionId = generateUuid() - - // Create session in local DB - createSession({ - id: newSessionId, - profile, - model: hermesSession.model, - title: hermesSession.title || undefined, - }) - - // Get full detail including all messages from the session chain - const { getSessionDetailFromDbWithProfile } = await import('../../db/hermes/sessions-db') - const detail = await getSessionDetailFromDbWithProfile(hermesSession.id, profile) - - if (!detail || !detail.messages) { - result.errors.push(`session ${hermesSession.id}: failed to load messages`) - logger.warn(`[session-sync] failed to load messages for session ${hermesSession.id}`) - continue - } - - // Insert all messages from the entire chain - for (const msg of detail.messages) { - addMessage({ - session_id: newSessionId, - role: msg.role, - content: msg.content, - tool_call_id: msg.tool_call_id, - tool_calls: msg.tool_calls, - tool_name: msg.tool_name, - timestamp: msg.timestamp, - token_count: msg.token_count, - finish_reason: msg.finish_reason, - reasoning: msg.reasoning, - reasoning_details: msg.reasoning_details, - reasoning_content: msg.reasoning_content, - }) - } - - // Update session with aggregated stats from Hermes - updateSession(newSessionId, { - started_at: hermesSession.started_at, - ended_at: hermesSession.ended_at, - end_reason: hermesSession.end_reason, - input_tokens: hermesSession.input_tokens, - output_tokens: hermesSession.output_tokens, - cache_read_tokens: hermesSession.cache_read_tokens, - cache_write_tokens: hermesSession.cache_write_tokens, - reasoning_tokens: hermesSession.reasoning_tokens, - estimated_cost_usd: hermesSession.estimated_cost_usd, - last_active: hermesSession.last_active, - preview: hermesSession.preview, - }) - - result.synced++ - logger.info(`[session-sync] synced Hermes session ${hermesSession.id} -> ${newSessionId} (${detail.messages.length} messages, thread_session_count=${detail.thread_session_count})`) - } catch (err: any) { - result.errors.push(`session ${hermesSession.id}: ${err.message}`) - logger.warn(err, `[session-sync] failed to sync session ${hermesSession.id}`) - } - } - } catch (err: any) { - if (!err.message.includes('state.db not found')) { - result.errors.push(err.message) - logger.warn(err, `[session-sync] failed to open state.db for profile '${profile}'`) - } - } - - return result -} - -/** - * Main entry point: sync all profiles on startup - * Only runs if local DB is empty (first startup or after DB reset) - */ export async function syncAllHermesSessionsOnStartup(): Promise { - // Check if local DB has any sessions - only sync if completely empty - const db = getDb() - if (!db) { - logger.info('[session-sync] SQLite not available, skipping Hermes sync') - return - } - - const countResult = db.prepare('SELECT COUNT(*) as count FROM sessions').get() as { count: number } | undefined - const hasExistingSessions = countResult && countResult.count > 0 - - if (hasExistingSessions) { - logger.info('[session-sync] local DB has %d sessions, skipping Hermes sync', countResult!.count) - return - } - - logger.info('[session-sync] local DB is empty, starting Hermes session sync...') - - const profiles = getAllProfiles() - logger.info(`[session-sync] found ${profiles.length} profiles: ${profiles.join(', ')}`) - - let totalSynced = 0 - let totalErrors = 0 - - for (const profile of profiles) { - const result = await syncProfileSessions(profile) - totalSynced += result.synced - totalErrors += result.errors.length - - if (result.errors.length > 0) { - logger.warn(`[session-sync] profile '${profile}' had ${result.errors.length} errors`) - for (const err of result.errors.slice(0, 5)) { - logger.warn(`[session-sync] - ${err}`) - } - if (result.errors.length > 5) { - logger.warn(`[session-sync] - ... and ${result.errors.length - 5} more errors`) - } - } - } - - logger.info(`[session-sync] sync complete: synced=${totalSynced}, errors=${totalErrors}`) + logger.info('[session-sync] Hermes session import is disabled') } diff --git a/packages/server/src/services/logger.ts b/packages/server/src/services/logger.ts index 042b11a..5421436 100644 --- a/packages/server/src/services/logger.ts +++ b/packages/server/src/services/logger.ts @@ -10,22 +10,28 @@ const logDir = resolve(homedir(), '.hermes-web-ui', 'logs') mkdirSync(logDir, { recursive: true }) const logFile = resolve(logDir, 'server.log') +const bridgeLogFile = resolve(logDir, 'bridge.log') -function rotateIfNeeded() { +function rotateFileIfNeeded(file: string) { try { - const stat = statSync(logFile) + const stat = statSync(file) if (stat.size > MAX_LOG_SIZE) { const keepSize = Math.floor(MAX_LOG_SIZE / 2) - const fd = openSync(logFile, 'r') + const fd = openSync(file, 'r') const buf = Buffer.alloc(keepSize) readSync(fd, buf, 0, keepSize, stat.size - keepSize) closeSync(fd) - truncateSync(logFile, 0) - writeFileSync(logFile, buf) + truncateSync(file, 0) + writeFileSync(file, buf) } } catch { } } +function rotateIfNeeded() { + rotateFileIfNeeded(logFile) + rotateFileIfNeeded(bridgeLogFile) +} + // Rotate on startup rotateIfNeeded() @@ -38,3 +44,11 @@ export const logger = pino({ dest: logFile, sync: true, })) + +export const bridgeLogger = pino({ + level: process.env.BRIDGE_LOG_LEVEL || process.env.LOG_LEVEL || 'info', + name: 'bridge', +}, pino.destination({ + dest: bridgeLogFile, + sync: true, +})) diff --git a/packages/server/src/shared/providers.ts b/packages/server/src/shared/providers.ts index 0c0999a..df88660 100644 --- a/packages/server/src/shared/providers.ts +++ b/packages/server/src/shared/providers.ts @@ -14,7 +14,7 @@ export interface ProviderPreset { export const PROVIDER_PRESETS: ProviderPreset[] = [ { - label: 'FUN-Codex', + label: 'Codex-apikey.fun', value: 'fun-codex', builtin: true, base_url: 'https://api.apikey.fun/v1', @@ -27,7 +27,7 @@ export const PROVIDER_PRESETS: ProviderPreset[] = [ ], }, { - label: 'FUN-Claude', + label: 'Claude-apikey.fun', value: 'fun-claude', builtin: true, base_url: 'https://api.apikey.fun', diff --git a/tests/server/context-compressor.test.ts b/tests/server/context-compressor.test.ts new file mode 100644 index 0000000..45cce66 --- /dev/null +++ b/tests/server/context-compressor.test.ts @@ -0,0 +1,112 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' + +const getCompressionSnapshotMock = vi.fn() +const saveCompressionSnapshotMock = vi.fn() +const deleteCompressionSnapshotMock = vi.fn() + +vi.mock('../../packages/server/src/services/logger', () => ({ + logger: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }, +})) + +vi.mock('../../packages/server/src/db/hermes/compression-snapshot', () => ({ + getCompressionSnapshot: getCompressionSnapshotMock, + saveCompressionSnapshot: saveCompressionSnapshotMock, + deleteCompressionSnapshot: deleteCompressionSnapshotMock, +})) + +describe('ChatContextCompressor', () => { + let originalFetch: typeof global.fetch + + beforeEach(() => { + originalFetch = global.fetch + getCompressionSnapshotMock.mockReset() + saveCompressionSnapshotMock.mockReset() + deleteCompressionSnapshotMock.mockReset() + }) + + afterEach(() => { + global.fetch = originalFetch + }) + + it('keeps full history when full summarization fails', async () => { + const { ChatContextCompressor } = await import('../../packages/server/src/lib/context-compressor') + const compressor = new ChatContextCompressor({ config: { tailMessageCount: 3 } }) + const messages = Array.from({ length: 8 }, (_, i) => ({ + role: i % 2 === 0 ? 'user' : 'assistant', + content: `message ${i}`, + })) + + getCompressionSnapshotMock.mockReturnValue(null) + global.fetch = vi.fn(async () => ({ ok: false, status: 500 })) as any + + const result = await compressor.compress(messages, 'http://upstream', undefined, 's1') + + expect(result.messages).toHaveLength(messages.length) + expect(result.messages.map(m => m.content)).toEqual(messages.map(m => m.content)) + expect(result.meta.compressed).toBe(false) + expect(result.meta.llmCompressed).toBe(false) + expect(saveCompressionSnapshotMock).not.toHaveBeenCalled() + }) + + it('keeps all new messages when incremental summarization fails', async () => { + const { ChatContextCompressor, SUMMARY_PREFIX } = await import('../../packages/server/src/lib/context-compressor') + const compressor = new ChatContextCompressor({ config: { tailMessageCount: 3 } }) + const messages = Array.from({ length: 8 }, (_, i) => ({ + role: i % 2 === 0 ? 'user' : 'assistant', + content: `message ${i}`, + })) + + getCompressionSnapshotMock.mockReturnValue({ + summary: 'previous summary', + lastMessageIndex: 1, + messageCountAtTime: 2, + }) + global.fetch = vi.fn(async () => ({ ok: false, status: 500 })) as any + + const result = await compressor.compress(messages, 'http://upstream', undefined, 's1') + + expect(result.messages).toHaveLength(7) + expect(result.messages[0]).toEqual({ + role: 'user', + content: `${SUMMARY_PREFIX}\n\nprevious summary`, + }) + expect(result.messages.slice(1).map(m => m.content)).toEqual(messages.slice(2).map(m => m.content)) + expect(result.meta.compressed).toBe(true) + expect(result.meta.llmCompressed).toBe(false) + expect(result.meta.compressedStartIndex).toBe(1) + expect(result.meta.verbatimCount).toBe(6) + expect(saveCompressionSnapshotMock).not.toHaveBeenCalled() + }) + + it('does not call the summarizer when snapshot has only tail messages after it', async () => { + const { ChatContextCompressor, SUMMARY_PREFIX } = await import('../../packages/server/src/lib/context-compressor') + const compressor = new ChatContextCompressor({ config: { tailMessageCount: 10 } }) + const messages = Array.from({ length: 6 }, (_, i) => ({ + role: i % 2 === 0 ? 'user' : 'assistant', + content: `message ${i}`, + })) + const fetchMock = vi.fn() + + getCompressionSnapshotMock.mockReturnValue({ + summary: 'previous summary', + lastMessageIndex: 3, + messageCountAtTime: 4, + }) + global.fetch = fetchMock as any + + const result = await compressor.compress(messages, 'http://upstream', undefined, 's1') + + expect(fetchMock).not.toHaveBeenCalled() + expect(result.messages).toHaveLength(3) + expect(result.messages[0].content).toBe(`${SUMMARY_PREFIX}\n\nprevious summary`) + expect(result.messages.slice(1).map(m => m.content)).toEqual(['message 4', 'message 5']) + expect(result.meta.llmCompressed).toBe(false) + expect(result.meta.compressedStartIndex).toBe(3) + expect(saveCompressionSnapshotMock).not.toHaveBeenCalled() + }) +}) diff --git a/tests/server/hermes-schemas.test.ts b/tests/server/hermes-schemas.test.ts index 4faa5e7..0ff3eb3 100644 --- a/tests/server/hermes-schemas.test.ts +++ b/tests/server/hermes-schemas.test.ts @@ -41,7 +41,7 @@ describe('Hermes schema initialization', () => { expect(usageCols.some(c => c.name === 'output_tokens')).toBe(true) }) - it('preserves existing data when syncing schemas', async () => { + it('preserves existing data when adding safe schema columns', async () => { const { initAllHermesTables, USAGE_TABLE, USAGE_SCHEMA } = await import('../../packages/server/src/db/hermes/schemas') @@ -51,7 +51,7 @@ describe('Hermes schema initialization', () => { // Insert test data db.prepare(`INSERT INTO "${USAGE_TABLE}" (session_id, created_at) VALUES (?, ?)`).run('test-session', Date.now()) - // Run initialization (should sync schema) + // Run initialization (should add safe missing columns) expect(() => initAllHermesTables()).not.toThrow() // Verify data is preserved @@ -59,7 +59,7 @@ describe('Hermes schema initialization', () => { expect(row).toBeTruthy() expect(row.session_id).toBe('test-session') - // Verify new columns were added + // Verify safe new columns were added const cols = db.prepare(`PRAGMA table_info("${USAGE_TABLE}")`).all() as Array<{ name: string }> expect(cols.some(c => c.name === 'input_tokens')).toBe(true) expect(cols.some(c => c.name === 'output_tokens')).toBe(true) diff --git a/tests/server/schema-sync.test.ts b/tests/server/schema-sync.test.ts index fed9c10..623c9b0 100644 --- a/tests/server/schema-sync.test.ts +++ b/tests/server/schema-sync.test.ts @@ -136,8 +136,8 @@ describe('Database Schema Synchronization', () => { }) }) - describe('Schema sync with column additions', () => { - it('adds missing columns to existing table without rebuilding', async () => { + describe('Safe additive schema changes', () => { + it('adds missing safe columns to existing table without rebuilding', async () => { const { syncTable, USAGE_TABLE, USAGE_SCHEMA } = await import('../../packages/server/src/db/hermes/schemas') // Create initial table without some columns @@ -150,7 +150,7 @@ describe('Database Schema Synchronization', () => { // Sync with full schema syncTable(USAGE_TABLE, USAGE_SCHEMA, { primaryKey: 'id' }) - // Verify all columns now exist + // Verify safe missing columns now exist const cols = getTableColumns(db, USAGE_TABLE) expect(cols.has('input_tokens')).toBe(true) expect(cols.has('output_tokens')).toBe(true) @@ -209,8 +209,8 @@ describe('Database Schema Synchronization', () => { }) }) - describe('Primary key changes trigger table rebuild', () => { - it('rebuilds table when primary key changes from single column to id column', async () => { + describe('Destructive schema changes are not applied automatically', () => { + it('does not rebuild table when primary key differs', async () => { const { syncTable, GC_ROOM_MEMBERS_TABLE, GC_ROOM_MEMBERS_SCHEMA } = await import('../../packages/server/src/db/hermes/schemas') @@ -228,9 +228,9 @@ describe('Database Schema Synchronization', () => { primaryKey: 'id', }) - // Verify id-based primary key - const pk = getTablePrimaryKey(db, GC_ROOM_MEMBERS_TABLE) - expect(pk).toBe('id') + // Verify existing primary key was left untouched + const tableCols = db.prepare(`PRAGMA table_info("${GC_ROOM_MEMBERS_TABLE}")`).all() as Array<{ name: string; pk: number }> + expect(tableCols.find(c => c.name === 'roomId')?.pk).toBe(1) // Verify data was preserved const row = db.prepare(`SELECT * FROM "${GC_ROOM_MEMBERS_TABLE}" WHERE roomId = ? AND userId = ?`).get('room-1', 'user-1') @@ -238,10 +238,8 @@ describe('Database Schema Synchronization', () => { expect(row.roomId).toBe('room-1') expect(row.userId).toBe('user-1') }) - }) - describe('Schema sync with type changes', () => { - it('rebuilds table when column types change', async () => { + it('does not rebuild table when column types differ', async () => { const { syncTable, USAGE_TABLE, USAGE_SCHEMA } = await import('../../packages/server/src/db/hermes/schemas') const db = getTestDb() @@ -255,17 +253,13 @@ describe('Database Schema Synchronization', () => { // Sync with correct schema syncTable(USAGE_TABLE, USAGE_SCHEMA, { primaryKey: 'id' }) - // Verify column type is correct (should be TEXT now) + // Verify column type was left untouched const cols = getTableColumns(db, USAGE_TABLE) - expect(cols.get('session_id')).toBe('TEXT') + expect(cols.get('session_id')).toBe('INTEGER') - // Verify data was preserved (SQLite can convert INTEGER to TEXT) + // Verify data was preserved const rows = db.prepare(`SELECT COUNT(*) as count FROM "${USAGE_TABLE}"`).get() as { count: number } expect(rows.count).toBe(1) - - // Verify the converted value - const row = db.prepare(`SELECT session_id FROM "${USAGE_TABLE}"`).get() as { session_id: string } - expect(row.session_id).toBe('12345') }) }) @@ -287,7 +281,7 @@ describe('Database Schema Synchronization', () => { expect(indexes).toBeTruthy() }) - it('removes obsolete indexes', async () => { + it('does not alter indexes on existing tables', async () => { const { syncTable, MESSAGES_TABLE, MESSAGES_SCHEMA } = await import('../../packages/server/src/db/hermes/schemas') @@ -304,18 +298,18 @@ describe('Database Schema Synchronization', () => { }, }) - // Verify extra index was removed + // Verify extra index remains const extraIndex = db.prepare(`SELECT name FROM sqlite_master WHERE type='index' AND name=?`).get('idx_extra') - expect(extraIndex).toBeFalsy() + expect(extraIndex).toBeTruthy() - // Verify correct index was created + // Verify expected index was not added to an existing table const correctIndex = db.prepare(`SELECT name FROM sqlite_master WHERE type='index' AND name=?`).get('idx_messages_session_id') - expect(correctIndex).toBeTruthy() + expect(correctIndex).toBeFalsy() }) }) describe('Data preservation during schema sync', () => { - it('preserves data when only adding columns', async () => { + it('preserves data when adding safe columns', async () => { const { syncTable, USAGE_TABLE, USAGE_SCHEMA } = await import('../../packages/server/src/db/hermes/schemas') const db = getTestDb() @@ -327,16 +321,19 @@ describe('Database Schema Synchronization', () => { const sessionId = 'test-session-123' db.prepare(`INSERT INTO "${USAGE_TABLE}" (session_id, created_at) VALUES (?, ?)`).run(sessionId, Date.now()) - // Sync with full schema (should add columns without rebuilding) + // Sync with full schema (should add safe columns only) syncTable(USAGE_TABLE, USAGE_SCHEMA, { primaryKey: 'id' }) // Verify data is still there const row = db.prepare(`SELECT * FROM "${USAGE_TABLE}" WHERE session_id = ?`).get(sessionId) expect(row).toBeTruthy() expect(row.session_id).toBe(sessionId) + + const cols = getTableColumns(db, USAGE_TABLE) + expect(cols.has('input_tokens')).toBe(true) }) - it('preserves data when rebuilding table with compatible columns', async () => { + it('preserves data and existing table definition when primary key is missing', async () => { const { syncTable, GC_ROOM_AGENTS_TABLE, GC_ROOM_AGENTS_SCHEMA } = await import('../../packages/server/src/db/hermes/schemas') @@ -349,11 +346,13 @@ describe('Database Schema Synchronization', () => { db.prepare(`INSERT INTO "${GC_ROOM_AGENTS_TABLE}" (id, roomId, agentId, profile, name, description, invited) VALUES (?, ?, ?, ?, ?, ?, ?)`) .run('agent-1', 'room-1', 'agent-1', 'default', 'Test Agent', '', 0) - // Sync with id primary key (triggers rebuild) + // Sync with id primary key expectation; should not rebuild existing table syncTable(GC_ROOM_AGENTS_TABLE, GC_ROOM_AGENTS_SCHEMA, { primaryKey: 'id', }) + expect(getTablePrimaryKey(db, GC_ROOM_AGENTS_TABLE)).toBe(null) + // Verify data was preserved const row = db.prepare(`SELECT * FROM "${GC_ROOM_AGENTS_TABLE}" WHERE id = ?`) .get('agent-1') @@ -365,8 +364,8 @@ describe('Database Schema Synchronization', () => { }) }) - describe('Column deletion', () => { - it('removes extra columns from existing table', async () => { + describe('Column preservation', () => { + it('keeps extra columns on existing table', async () => { const { syncTable, USAGE_TABLE, USAGE_SCHEMA } = await import('../../packages/server/src/db/hermes/schemas') // Create table with extra columns @@ -377,13 +376,13 @@ describe('Database Schema Synchronization', () => { db.prepare(`INSERT INTO "${USAGE_TABLE}" (session_id, created_at, extra_col, another_extra) VALUES (?, ?, ?, ?)`) .run('test-1', Date.now(), 'value', 123) - // Sync with schema (should remove extra columns) + // Sync with schema (should keep extra columns) syncTable(USAGE_TABLE, USAGE_SCHEMA, { primaryKey: 'id' }) - // Verify extra columns are gone + // Verify extra columns are preserved const cols = getTableColumns(db, USAGE_TABLE) - expect(cols.has('extra_col')).toBe(false) - expect(cols.has('another_extra')).toBe(false) + expect(cols.has('extra_col')).toBe(true) + expect(cols.has('another_extra')).toBe(true) // Verify data is still there const row = db.prepare(`SELECT * FROM "${USAGE_TABLE}" WHERE session_id = ?`).get('test-1') diff --git a/tests/server/session-sync.test.ts b/tests/server/session-sync.test.ts index 780f091..39dbec7 100644 --- a/tests/server/session-sync.test.ts +++ b/tests/server/session-sync.test.ts @@ -1,70 +1,60 @@ /** - * Tests for session-sync service + * Tests for the disabled Hermes session import path. */ -import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest' -import { getDb } from '../../packages/server/src/db/index' -import { initAllStores } from '../../packages/server/src/db/hermes/init' -import { listSessionSummaries } from '../../packages/server/src/db/hermes/sessions-db' -import { syncAllHermesSessionsOnStartup } from '../../packages/server/src/services/hermes/session-sync' - -vi.mock('../../packages/server/src/db/hermes/sessions-db', () => ({ - listSessionSummaries: vi.fn().mockResolvedValue([]), - getSessionDetailFromDbWithProfile: vi.fn(), -})) - -function resetSessionTables(): void { - initAllStores() - - const db = getDb() - if (db) { - db.exec('DELETE FROM messages') - db.exec('DELETE FROM sessions') - } -} +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' describe('session-sync', () => { - beforeEach(() => { - vi.clearAllMocks() - resetSessionTables() + let db: any = null + + beforeEach(async () => { + vi.resetModules() + const { DatabaseSync } = await import('node:sqlite') + db = new DatabaseSync(':memory:') + vi.doMock('../../packages/server/src/db/index', () => ({ + getDb: () => db, + getStoragePath: () => ':memory:', + })) + vi.doMock('../../packages/server/src/db/hermes/sessions-db', () => ({ + listSessionSummaries: vi.fn().mockResolvedValue([]), + getSessionDetailFromDbWithProfile: vi.fn(), + })) }) afterEach(() => { - resetSessionTables() + db?.close() + db = null + vi.doUnmock('../../packages/server/src/db/index') + vi.doUnmock('../../packages/server/src/db/hermes/sessions-db') + vi.resetModules() }) - it('should skip sync when local DB is not empty', async () => { - const db = getDb() - expect(db).not.toBeNull() + async function initTestDb() { + const { initAllStores } = await import('../../packages/server/src/db/hermes/init') + initAllStores() + } - // Insert a test session - db!.prepare(` + it('does not import Hermes sessions when local DB is not empty', async () => { + await initTestDb() + const { syncAllHermesSessionsOnStartup } = await import('../../packages/server/src/services/hermes/session-sync') + + db.prepare(` INSERT INTO sessions (id, profile, source, model, title, started_at, last_active) - VALUES ('test-session-1', 'default', 'api_server', 'gpt-4', 'Test Session', ${Date.now()}, ${Date.now()}) - `).run() + VALUES ('test-session-1', 'default', 'api_server', 'gpt-4', 'Test Session', ?, ?) + `).run(Date.now(), Date.now()) - // Check that session exists - const countResult = db!.prepare('SELECT COUNT(*) as count FROM sessions').get() as { count: number } - expect(countResult.count).toBe(1) - - // Run sync - should skip because DB is not empty await syncAllHermesSessionsOnStartup() - expect(vi.mocked(listSessionSummaries)).not.toHaveBeenCalled() - // Verify session still exists (no changes) - const countAfter = db!.prepare('SELECT COUNT(*) as count FROM sessions').get() as { count: number } + const countAfter = db.prepare('SELECT COUNT(*) as count FROM sessions').get() as { count: number } expect(countAfter.count).toBe(1) }) - it('should attempt sync when local DB is empty', async () => { - const db = getDb() - expect(db).not.toBeNull() + it('does not import Hermes sessions when local DB is empty', async () => { + await initTestDb() + const { syncAllHermesSessionsOnStartup } = await import('../../packages/server/src/services/hermes/session-sync') - // Verify DB is empty - const countBefore = db!.prepare('SELECT COUNT(*) as count FROM sessions').get() as { count: number } - expect(countBefore.count).toBe(0) - - // Run sync - should attempt to sync from Hermes await expect(syncAllHermesSessionsOnStartup()).resolves.toBeUndefined() - expect(vi.mocked(listSessionSummaries)).toHaveBeenCalledWith('api_server', 10000, 'default') + + const countAfter = db.prepare('SELECT COUNT(*) as count FROM sessions').get() as { count: number } + expect(countAfter.count).toBe(0) }) })