/** * Context compression — build conversation history from DB, * apply snapshot-aware compression and LLM summarization. */ import { getSessionDetail, getSession, } from '../../../db/hermes/session-store' import { getCompressionSnapshot } from '../../../db/hermes/compression-snapshot' import { ChatContextCompressor, SUMMARY_PREFIX } from '../../../lib/context-compressor' import { getModelContextLength } from '../model-context' import { readConfigYamlForProfile } from '../../config-helpers' import { logger } from '../../logger' import { bridgeLogger } from '../../logger' import { calcAndUpdateUsage, estimateUsageTokensFromMessages } from './usage' import { isAssistantMessageSendable } from './message-format' import type { ChatMessage, CompressionConfig as CompressorConfig } from '../../../lib/context-compressor' import type { SessionState, BridgeCompressionResult } from './types' interface RunChatCompressionConfig { enabled: boolean triggerTokens: number compressor: Partial } function isSnapshotUsable( snapshot: { lastMessageIndex: number } | null, history: ChatMessage[], ): boolean { return !!snapshot && snapshot.lastMessageIndex >= 0 && snapshot.lastMessageIndex < history.length } function buildSnapshotHistory( snapshot: { summary: string; lastMessageIndex: number } | null, history: ChatMessage[], compressionConfig?: Partial, ): ChatMessage[] | null { if (!snapshot) return null const headCount = compressionConfig?.headMessageCount || 0 const tailCount = compressionConfig?.tailMessageCount || 0 const protectedHead = headCount > 0 ? history.slice(0, headCount) : [] const summaryMessage = { role: 'user', content: SUMMARY_PREFIX + '\n\n' + snapshot.summary } as ChatMessage if (isSnapshotUsable(snapshot, history)) { return [ ...protectedHead, summaryMessage, ...history.slice(snapshot.lastMessageIndex + 1), ] } const tailStart = Math.max(protectedHead.length, history.length - tailCount) return [ ...protectedHead, summaryMessage, ...history.slice(tailStart), ] } function clampRatio(value: unknown, fallback: number, min: number, max: number): number { const n = typeof value === 'number' && Number.isFinite(value) ? value : fallback return Math.min(max, Math.max(min, n)) } function clampInt(value: unknown, fallback: number, min: number, max: number): number { const n = typeof value === 'number' && Number.isFinite(value) ? Math.floor(value) : fallback return Math.min(max, Math.max(min, n)) } async function getRunChatCompressionConfig(profile: string, contextLength: number): Promise { let raw: Record = {} try { raw = (await readConfigYamlForProfile(profile))?.compression || {} } catch (err) { logger.warn(err, '[context-compress] failed to read compression config for profile %s, using defaults', profile) } const threshold = clampRatio(raw.threshold, 0.5, 0.05, 0.95) const targetRatio = clampRatio(raw.target_ratio, 0.2, 0.01, 0.8) const protectLastN = clampInt(raw.protect_last_n, 20, 0, 500) const protectFirstN = clampInt(raw.protect_first_n, 3, 0, 100) return { enabled: raw.enabled !== false, triggerTokens: Math.floor(contextLength * threshold), compressor: { triggerTokens: Math.floor(contextLength * threshold), summaryBudget: Math.max(1_000, Math.floor(contextLength * targetRatio)), headMessageCount: protectFirstN, tailMessageCount: protectLastN, }, } } /** * Load conversation history from DB with full message structure (user/assistant/tool). */ export async function buildDbHistory( sessionId: string, options: { excludeLastUser?: boolean } = {}, ): Promise { const detail = getSessionDetail(sessionId) if (!detail?.messages?.length) return [] const validMessages = detail.messages.filter(m => (m.role === 'user' || m.role === 'assistant' || m.role === 'tool') && m.content !== undefined, ) const sourceMessages = options.excludeLastUser ? (() => { const lastUserMsgIndex = [...validMessages].reverse().findIndex(m => m.role === 'user') return lastUserMsgIndex >= 0 ? validMessages.slice(0, validMessages.length - lastUserMsgIndex - 1) : validMessages })() : validMessages return sourceMessages.map((m, idx, arr) => { const msg: any = { role: m.role, content: m.content || '' } if (m.reasoning_content) msg.reasoning_content = m.reasoning_content if (m.tool_calls?.length) { const cleanedToolCalls = m.tool_calls .filter((tc: any) => tc.id && tc.id.length > 0) .map((tc: any) => ({ id: tc.id, type: tc.type, function: tc.function })) if (cleanedToolCalls.length > 0) msg.tool_calls = cleanedToolCalls } if (m.role === 'tool') { let callId = m.tool_call_id if (!callId || callId.length === 0) { const prevMsg = arr[idx - 1] if (prevMsg?.role === 'assistant' && prevMsg.tool_calls?.length) { const tc = prevMsg.tool_calls.find((t: any) => t.function?.name === m.tool_name) if (tc?.id) callId = tc.id } } if (!callId || callId.length === 0) return null msg.tool_call_id = callId } if (m.tool_name) msg.name = m.tool_name if (m.role === 'assistant' && !isAssistantMessageSendable(msg)) { logger.warn('[chat-run-socket] skipped empty assistant message while building history for session %s', sessionId) return null } return msg }).filter((m): m is ChatMessage => m !== null) } export function estimateSnapshotAwareHistoryUsage( sessionId: string, history: ChatMessage[], ): { messageCount: number; tokenCount: number } { const snapshot = getCompressionSnapshot(sessionId) const messages = buildSnapshotHistory(snapshot, history) || history const usage = estimateUsageTokensFromMessages(messages) return { messageCount: messages.length, tokenCount: usage.inputTokens + usage.outputTokens, } } export async function buildCompressedHistory( sessionId: string, profile: string, upstream: string, apiKey: string | undefined, emit: (event: string, payload: any) => void, sessionMap: Map, modelContext: { model?: string | null; provider?: string | null } = {}, ): Promise { try { let history = await buildDbHistory(sessionId, { excludeLastUser: true }) if (history.length === 0) return [] const contextLength = getModelContextLength({ profile, model: modelContext.model, provider: modelContext.provider, }) const compressionConfig = await getRunChatCompressionConfig(profile, contextLength) const triggerTokens = compressionConfig.triggerTokens if (!compressionConfig.enabled) { logger.info('[context-compress] session=%s: compression disabled by config', sessionId) return history } const cState = getOrCreateSession(sessionMap, sessionId) const assembledTokens = await calcAndUpdateUsage(sessionId, cState, emit) let totalTokens = assembledTokens.inputTokens + assembledTokens.outputTokens const snapshot = getCompressionSnapshot(sessionId) const staleSnapshot = snapshot && !isSnapshotUsable(snapshot, history) if (staleSnapshot) { logger.warn('[context-compress] session=%s: stale snapshot index %d for %d history messages; using summary plus safe tail', sessionId, snapshot.lastMessageIndex, history.length) const staleHistory = buildSnapshotHistory(snapshot, history, compressionConfig.compressor) || history const staleUsage = estimateUsageTokensFromMessages(staleHistory) totalTokens = staleUsage.inputTokens + staleUsage.outputTokens } if (snapshot && !staleSnapshot) { const newMessages = history.slice(snapshot.lastMessageIndex + 1) logger.info('[context-compress] session=%s: snapshot at %d, %d new messages, assembled ~%d tokens (threshold %d)', sessionId, snapshot.lastMessageIndex, newMessages.length, totalTokens, triggerTokens) if (totalTokens <= triggerTokens) { history = buildSnapshotHistory(snapshot, history, compressionConfig.compressor) || history } else { history = await compressHistory(history, newMessages, sessionId, upstream, apiKey, cState, totalTokens, emit, sessionMap, modelContext, compressionConfig.compressor) } } else if (snapshot && staleSnapshot) { if (totalTokens <= triggerTokens) { history = buildSnapshotHistory(snapshot, history, compressionConfig.compressor) || history } else { history = await compressHistory(history, null, sessionId, upstream, apiKey, cState, totalTokens, emit, sessionMap, modelContext, compressionConfig.compressor) } } else if (history.length > 4) { if (totalTokens <= triggerTokens) { logger.info('[context-compress] session=%s: %d messages, ~%d tokens — under threshold, skip', sessionId, history.length, totalTokens) } else { history = await compressHistory(history, null, sessionId, upstream, apiKey, cState, totalTokens, emit, sessionMap, modelContext, compressionConfig.compressor) } } return history } catch (err) { logger.warn(err, '[chat-run-socket] failed to build compressed history for session %s', sessionId) return [] } } export async function compressHistory( history: ChatMessage[], newMessagesOnly: ChatMessage[] | null, sessionId: string, upstream: string, apiKey: string | undefined, cState: SessionState, totalTokens: number, emit: (event: string, payload: any) => void, sessionMap: Map, modelContext: { model?: string | null; provider?: string | null } = {}, compressionConfig?: Partial, ): Promise { const msgCount = newMessagesOnly ? newMessagesOnly.length : history.length pushState(sessionMap, sessionId, 'compression.started', { event: 'compression.started', message_count: msgCount, token_count: totalTokens, }) emit('compression.started', { event: 'compression.started', message_count: msgCount, token_count: totalTokens, }) try { const session = getSession(sessionId) const compressor = new ChatContextCompressor({ config: compressionConfig }) const result = await compressor.compress(history, upstream, apiKey, sessionId, { profile: session?.profile, model: modelContext.model || session?.model, provider: modelContext.provider || session?.provider, }) const afterTokens = await calcAndUpdateUsage(sessionId, cState, emit) const compressedMeta = { event: 'compression.completed' as const, compressed: result.meta.compressed, llmCompressed: result.meta.llmCompressed, totalMessages: result.meta.totalMessages, resultMessages: result.messages.length, beforeTokens: totalTokens, afterTokens: afterTokens.inputTokens + afterTokens.outputTokens, summaryTokens: result.meta.summaryTokenEstimate, verbatimCount: result.meta.verbatimCount, compressedStartIndex: result.meta.compressedStartIndex, } replaceState(sessionMap, sessionId, 'compression.completed', compressedMeta) logger.info('[context-compress] AFTER session=%s: %d messages, ~%d tokens (was %d)', sessionId, result.messages.length, afterTokens.inputTokens + afterTokens.outputTokens, totalTokens) emit('compression.completed', compressedMeta) const compressed = result.messages.map(m => { const msg: any = { role: m.role, content: m.content, tool_call_id: m.tool_call_id, name: m.name } if (m.reasoning_content) msg.reasoning_content = m.reasoning_content if (m.tool_calls?.length) { const cleanedToolCalls = m.tool_calls .filter((tc: any) => tc.id && tc.id.length > 0) .map((tc: any) => ({ id: tc.id, type: tc.type, function: tc.function })) if (cleanedToolCalls.length > 0) msg.tool_calls = cleanedToolCalls } return msg }) await calcAndUpdateUsage(sessionId, cState, emit) return compressed } catch (err: any) { const failedMeta = { event: 'compression.completed' as const, compressed: false, totalMessages: msgCount, resultMessages: msgCount, beforeTokens: totalTokens, afterTokens: totalTokens, summaryTokens: 0, verbatimCount: msgCount, compressedStartIndex: -1, error: err.message, } replaceState(sessionMap, sessionId, 'compression.completed', failedMeta) logger.warn(err, '[chat-run-socket] compression failed for session %s, using assembled context', sessionId) emit('compression.completed', failedMeta) return history } } export async function forceCompressBridgeHistory( sessionId: string, profile: string, _messages: ChatMessage[], ): Promise { const history = await buildDbHistory(sessionId, { excludeLastUser: true }) if (history.length === 0) { return { messages: [], beforeMessages: 0, resultMessages: 0, beforeTokens: 0, afterTokens: 0, compressed: false, llmCompressed: false, summaryTokens: 0, verbatimCount: 0, compressedStartIndex: -1, } } const upstream = '' const apiKey = undefined const session = getSession(sessionId) const contextLength = getModelContextLength({ profile, model: session?.model, provider: session?.provider }) const compressionConfig = await getRunChatCompressionConfig(session?.profile || profile, contextLength) const beforeUsage = estimateSnapshotAwareHistoryUsage(sessionId, history) const totalTokens = beforeUsage.tokenCount bridgeLogger.info({ sessionId, profile, historyMessages: history.length, snapshotAwareMessages: beforeUsage.messageCount, bridgeProvidedMessages: Array.isArray(_messages) ? _messages.length : 0, tokenEstimate: totalTokens, snapshotAware: true, }, '[chat-run-socket] bridge forced compression started') const compressor = new ChatContextCompressor({ config: compressionConfig.compressor }) const result = await compressor.compress(history, upstream, apiKey, sessionId, { profile: session?.profile || profile, model: session?.model, provider: session?.provider, }) const compressedMessages = result.messages.map(m => { const msg: any = { role: m.role, content: m.content } if (m.reasoning_content) msg.reasoning_content = m.reasoning_content if (m.tool_calls?.length) { const cleanedToolCalls = m.tool_calls .filter((tc: any) => tc.id && tc.id.length > 0) .map((tc: any) => ({ id: tc.id, type: tc.type, function: tc.function })) if (cleanedToolCalls.length > 0) msg.tool_calls = cleanedToolCalls } if (m.tool_call_id) msg.tool_call_id = m.tool_call_id if (m.name) msg.name = m.name return msg }) const afterUsage = estimateUsageTokensFromMessages(compressedMessages) const afterTokens = afterUsage.inputTokens + afterUsage.outputTokens bridgeLogger.info({ sessionId, profile, beforeMessages: history.length, resultMessages: result.messages.length, beforeTokens: totalTokens, afterTokens, compressed: result.meta.compressed, llmCompressed: result.meta.llmCompressed, verbatimCount: result.meta.verbatimCount, compressedStartIndex: result.meta.compressedStartIndex, compressedHistory: result.messages.map((m) => ({ role: m.role, content: m.content, reasoning_content: m.reasoning_content, tool_calls: m.tool_calls, tool_call_id: m.tool_call_id, name: m.name, })), }, '[chat-run-socket] bridge forced compression completed') return { messages: compressedMessages, beforeMessages: history.length, resultMessages: compressedMessages.length, beforeTokens: totalTokens, afterTokens, compressed: result.meta.compressed, llmCompressed: result.meta.llmCompressed, summaryTokens: result.meta.summaryTokenEstimate, verbatimCount: result.meta.verbatimCount, compressedStartIndex: result.meta.compressedStartIndex, } } // --- Shared state helpers (used by compression) --- export function getOrCreateSession(sessionMap: Map, sessionId: string): SessionState { let state = sessionMap.get(sessionId) if (!state) { state = { messages: [], isWorking: false, events: [], queue: [] } sessionMap.set(sessionId, state) } return state } export function pushState(sessionMap: Map, sessionId: string, event: string, data: any) { const state = getOrCreateSession(sessionMap, sessionId) state.events.push({ event, data }) } export function replaceState(sessionMap: Map, sessionId: string, event: string, data: any) { const state = sessionMap.get(sessionId) if (state) { const idx = state.events.findIndex(s => s.event === event) if (idx >= 0) { state.events[idx] = { event, data } return } } pushState(sessionMap, sessionId, event, data) }