From 4b759c4d8aab70b60ad349795d8e5fce16265f30 Mon Sep 17 00:00:00 2001 From: ekko <152005280+EKKOLearnAI@users.noreply.github.com> Date: Fri, 22 May 2026 10:20:39 +0800 Subject: [PATCH] cache group chat fixed context usage (#925) --- .../hermes/context-engine/compressor.ts | 12 ++ .../services/hermes/context-engine/types.ts | 8 + .../hermes/group-chat/agent-clients.ts | 171 ++++++++++++++---- tests/server/context-engine.test.ts | 17 ++ tests/server/group-chat-context-cache.test.ts | 25 +++ 5 files changed, 200 insertions(+), 33 deletions(-) create mode 100644 tests/server/group-chat-context-cache.test.ts diff --git a/packages/server/src/services/hermes/context-engine/compressor.ts b/packages/server/src/services/hermes/context-engine/compressor.ts index 813847d..c31b5d1 100644 --- a/packages/server/src/services/hermes/context-engine/compressor.ts +++ b/packages/server/src/services/hermes/context-engine/compressor.ts @@ -175,6 +175,12 @@ export class ContextEngine { logger.debug(`[ContextEngine] [Path A] OVER threshold — starting INCREMENTAL compression of ${newMessages.length} msgs...`) logger.debug(`[ContextEngine] [Path A] CONTEXT BEFORE COMPRESSION: summary(${snapshot.summary.length} chars) + ${newMessages.length} new msgs`) meta.compressed = true + input.onProgress?.({ + status: 'compressing', + path: 'snapshot', + messageCount: newMessages.length, + tokenCount: totalTokens, + }) const t0 = Date.now() const result = await this.summarize( @@ -233,6 +239,12 @@ export class ContextEngine { logger.debug(`[ContextEngine] [Path B] OVER threshold — starting FULL compression of ${total} msgs...`) logger.debug(`[ContextEngine] [Path B] CONTEXT BEFORE COMPRESSION: ${total} msgs, ~${totalTokens} tokens`) meta.compressed = true + input.onProgress?.({ + status: 'compressing', + path: 'full', + messageCount: total, + tokenCount: totalTokens, + }) const t0 = Date.now() const result = await this.summarize( diff --git a/packages/server/src/services/hermes/context-engine/types.ts b/packages/server/src/services/hermes/context-engine/types.ts index 71fc3d4..5376427 100644 --- a/packages/server/src/services/hermes/context-engine/types.ts +++ b/packages/server/src/services/hermes/context-engine/types.ts @@ -96,6 +96,13 @@ export interface GatewayCaller { export type SessionCleaner = (sessionId: string) => void +export type ContextProgress = (event: { + status: 'compressing' + path: 'snapshot' | 'full' + messageCount: number + tokenCount: number +}) => void + // ─── Build Context Input ─────────────────────────────────── export interface MemberInfo { @@ -122,4 +129,5 @@ export interface BuildContextInput { history: Array<{ role: 'user' | 'assistant'; content: string }>, instructions: string, ) => Promise + onProgress?: ContextProgress } diff --git a/packages/server/src/services/hermes/group-chat/agent-clients.ts b/packages/server/src/services/hermes/group-chat/agent-clients.ts index c5d95da..66fc698 100644 --- a/packages/server/src/services/hermes/group-chat/agent-clients.ts +++ b/packages/server/src/services/hermes/group-chat/agent-clients.ts @@ -3,7 +3,8 @@ import { randomBytes } from 'crypto' import { getToken } from '../../../services/auth' import { logger } from '../../../services/logger' import { updateUsage } from '../../../db/hermes/usage-store' -import { AgentBridgeClient, type AgentBridgeMessage, type AgentBridgeOutput } from '../agent-bridge' +import { countTokens } from '../../../lib/context-compressor' +import { AgentBridgeClient, type AgentBridgeContextEstimate, type AgentBridgeMessage, type AgentBridgeOutput } from '../agent-bridge' import { convertContentBlocksForAgent, isContentBlockArray } from '../run-chat/content-blocks' import type { ContentBlock } from '../run-chat/types' import { @@ -42,6 +43,35 @@ type MentionMessage = { mentionDepth?: number } +type GroupEstimateMessage = { role: 'user' | 'assistant'; content: string } + +interface BridgeContextCache { + fixedContextTokens: number + instructions?: string + systemPromptTokens?: number + toolTokens?: number + systemPromptChars?: number + toolCount?: number + toolNames?: string[] + profile?: string + model?: string + provider?: string +} + +export function estimateGroupHistoryMessageTokens(history: Array<{ content?: unknown }>): number { + return history.reduce((sum, message) => sum + countTokens(String(message.content || '')), 0) +} + +export function groupContextTokensWithFixedOverhead( + fixedContextTokens: number | null | undefined, + history: Array<{ content?: unknown }>, +): number | undefined { + if (typeof fixedContextTokens !== 'number' || !Number.isFinite(fixedContextTokens) || fixedContextTokens < 0) { + return undefined + } + return Math.floor(fixedContextTokens) + estimateGroupHistoryMessageTokens(history) +} + interface MemberData { id: string name: string @@ -79,6 +109,7 @@ class AgentClient { private storage: any = null private pendingToolCallIds = new Map() private pendingToolBaseIds = new Map() + private bridgeContextCache = new Map() constructor(config: AgentConfig, handlers: AgentEventHandler = {}) { this.agentId = config.agentId || Date.now().toString(36) + Math.random().toString(36).slice(2, 8) @@ -150,6 +181,7 @@ class AgentClient { this.socket.disconnect() this.socket = null this.joinedRooms.clear() + this.bridgeContextCache.clear() } } @@ -245,6 +277,89 @@ class AgentClient { return Array.from(this.joinedRooms) } + private finiteToken(value: unknown): number | undefined { + return typeof value === 'number' && Number.isFinite(value) && value >= 0 + ? Math.floor(value) + : undefined + } + + private cacheBridgeContext(sessionId: string, data: Record | AgentBridgeContextEstimate, instructions?: string): void { + const fixedContextTokens = this.finiteToken(data.fixed_context_tokens) + if (fixedContextTokens == null) return + this.bridgeContextCache.set(sessionId, { + fixedContextTokens, + instructions, + systemPromptTokens: this.finiteToken(data.system_prompt_tokens), + toolTokens: this.finiteToken(data.tool_tokens), + systemPromptChars: this.finiteToken(data.system_prompt_chars), + toolCount: this.finiteToken(data.tool_count), + toolNames: Array.isArray(data.tool_names) ? data.tool_names.map(String) : undefined, + profile: typeof data.profile === 'string' ? data.profile : undefined, + model: typeof data.model === 'string' ? data.model : undefined, + provider: typeof data.provider === 'string' ? data.provider : undefined, + }) + } + + private estimateHistoryMessageTokens(history: GroupEstimateMessage[]): number { + return estimateGroupHistoryMessageTokens(history) + } + + private estimateWithCachedBridgeContext(sessionId: string, history: GroupEstimateMessage[], instructions?: string): number | undefined { + const cache = this.bridgeContextCache.get(sessionId) + if (!cache) return undefined + if (cache.instructions !== instructions) return undefined + return groupContextTokensWithFixedOverhead(cache.fixedContextTokens, history) + } + + private async estimateGroupContextTokens( + roomId: string, + sessionId: string, + bridge: AgentBridgeClient, + history: GroupEstimateMessage[], + instructions: string | undefined, + phase: string, + ): Promise { + const cachedTokens = this.estimateWithCachedBridgeContext(sessionId, history, instructions) + if (cachedTokens != null) { + logger.info({ + roomId, + agentName: this.name, + profile: this.profile, + sessionId, + messages: history.length, + fixedContextTokens: this.bridgeContextCache.get(sessionId)?.fixedContextTokens, + messageTokens: cachedTokens - (this.bridgeContextCache.get(sessionId)?.fixedContextTokens || 0), + fullContextTokens: cachedTokens, + phase, + source: 'cache', + }, '[GroupChat] full context estimate') + return cachedTokens + } + + const estimate = await bridge.contextEstimate( + sessionId, + history, + instructions, + this.profile, + ) + this.cacheBridgeContext(sessionId, estimate, instructions) + const totalTokens = Number(estimate.token_count || 0) + logger.info({ + roomId, + agentName: this.name, + profile: this.profile, + sessionId, + messages: estimate.message_count, + toolCount: estimate.tool_count, + systemPromptChars: estimate.system_prompt_chars, + fixedContextTokens: estimate.fixed_context_tokens, + fullContextTokens: estimate.token_count, + phase, + source: 'bridge', + }, '[GroupChat] full context estimate') + return Number.isFinite(totalTokens) && totalTokens > 0 ? Math.floor(totalTokens) : undefined + } + private ensureConnected(): void { if (!this.socket?.connected) { throw new Error(`Agent "${this.name}" is not connected`) @@ -285,7 +400,6 @@ class AgentClient { if (this.contextEngine && this.storage) { try { logger.debug(`[AgentClients] ${this.name}: building context...`) - onStatus?.('compressing') // Get room members with descriptions for context const roomMembers: Array<{ userId: string; name: string; description: string }> = this.storage.getRoomMembers(roomId) || [] const memberNames = roomMembers.map((m: any) => m.name) @@ -313,24 +427,21 @@ class AgentClient { currentMessage: msg, compression, profile: this.profile, + onProgress: (event: { status: 'compressing'; messageCount: number; tokenCount: number }) => { + onStatus?.('compressing', { + messageCount: event.messageCount, + totalTokens: event.tokenCount, + }) + }, contextTokenEstimator: async (history: Array<{ role: 'user' | 'assistant'; content: string }>, estimateInstructions: string) => { - const estimate = await bridge.contextEstimate( + return this.estimateGroupContextTokens( + roomId, sessionId, + bridge, history, estimateInstructions, - this.profile, + 'build', ) - logger.info({ - roomId, - agentName: this.name, - profile: this.profile, - sessionId, - messages: estimate.message_count, - toolCount: estimate.tool_count, - systemPromptChars: estimate.system_prompt_chars, - fullContextTokens: estimate.token_count, - }, '[GroupChat] full context estimate') - return estimate.token_count }, }) conversationHistory = ctx.conversationHistory @@ -382,7 +493,7 @@ class AgentClient { streamStarted = true for await (const chunk of bridge.streamOutput(started.run_id, { timeoutMs: 120000 })) { lastChunk = chunk - reasoningContent += await this.recordBridgeEvents(roomId, chunk, () => streamMessageId, async () => { + reasoningContent += await this.recordBridgeEvents(roomId, sessionId, instructions, chunk, () => streamMessageId, async () => { const toolBaseId = streamMessageId if (currentContent.trim()) { await this.sendMessage(roomId, currentContent, streamMessageId, { @@ -462,28 +573,18 @@ class AgentClient { if (!this.storage?.getMessages) return try { const history = this.buildRoomEstimateHistory(roomId) - const estimate = await bridge.contextEstimate( + const cachedTokens = await this.estimateGroupContextTokens( + roomId, sessionId, + bridge, history, instructions, - this.profile, + 'final', ) - const totalTokens = Number(estimate.token_count || 0) - if (!Number.isFinite(totalTokens) || totalTokens <= 0) return - const rounded = Math.floor(totalTokens) + if (cachedTokens == null || cachedTokens <= 0) return + const rounded = Math.floor(cachedTokens) this.storage.updateRoomTotalTokens?.(roomId, rounded) this.emitContextStatus(roomId, 'replying', { totalTokens: rounded }) - logger.info({ - roomId, - agentName: this.name, - profile: this.profile, - sessionId, - messages: estimate.message_count, - toolCount: estimate.tool_count, - systemPromptChars: estimate.system_prompt_chars, - fullContextTokens: rounded, - phase: 'final', - }, '[GroupChat] full context estimate') } catch (err: any) { logger.warn(`[GroupChat] failed to refresh final context estimate room=${roomId} agent=${this.name}: ${err.message}`) } @@ -561,6 +662,8 @@ class AgentClient { private async recordBridgeEvents( roomId: string, + sessionId: string, + instructions: string | undefined, chunk: AgentBridgeOutput, getCurrentMessageId: () => string, beforeToolStarted: () => Promise, @@ -568,7 +671,9 @@ class AgentClient { let reasoning = '' for (const ev of chunk.events || []) { const eventType = String((ev as any)?.event || '') - if (eventType === 'tool.started') { + if (eventType === 'bridge.context.ready') { + this.cacheBridgeContext(sessionId, ev as Record, instructions) + } else if (eventType === 'tool.started') { const toolBaseId = await beforeToolStarted() this.recordToolStarted(roomId, ev as Record, toolBaseId) } else if (eventType === 'tool.completed') { diff --git a/tests/server/context-engine.test.ts b/tests/server/context-engine.test.ts index f782a91..56b6bf5 100644 --- a/tests/server/context-engine.test.ts +++ b/tests/server/context-engine.test.ts @@ -207,6 +207,7 @@ describe('ContextEngine.buildContext', () => { const messages = makeMessages(3) mockFetcher.getMessages = vi.fn().mockReturnValue(messages) const contextTokenEstimator = vi.fn().mockResolvedValue(19_379) + const onProgress = vi.fn() const result = await engine.buildContext({ roomId: 'room-1', @@ -221,6 +222,7 @@ describe('ContextEngine.buildContext', () => { apiKey: null, currentMessage: messages[messages.length - 1], contextTokenEstimator, + onProgress, }) expect(result.meta.compressed).toBe(false) @@ -231,11 +233,13 @@ describe('ContextEngine.buildContext', () => { expect.stringContaining('"Claude"'), ) expect(mockSummarize).not.toHaveBeenCalled() + expect(onProgress).not.toHaveBeenCalled() }) it('uses full context token estimates to trigger group compression', async () => { const messages = makeMessages(20) mockFetcher.getMessages = vi.fn().mockReturnValue(messages) + const onProgress = vi.fn() const result = await engine.buildContext({ roomId: 'room-1', @@ -250,12 +254,19 @@ describe('ContextEngine.buildContext', () => { apiKey: null, currentMessage: messages[messages.length - 1], contextTokenEstimator: vi.fn().mockResolvedValue(120_000), + onProgress, }) expect(result.meta.compressed).toBe(true) expect(result.meta.contextTokenEstimate).toBe(120_000) expect(mockSummarize).toHaveBeenCalledTimes(1) expect(mockFetcher.saveContextSnapshot).toHaveBeenCalledTimes(1) + expect(onProgress).toHaveBeenCalledWith({ + status: 'compressing', + path: 'full', + messageCount: 20, + tokenCount: 120_000, + }) }) it('throws when group prompt and tools exceed threshold with too little history to compress', async () => { @@ -413,6 +424,7 @@ describe('ContextEngine.buildContext', () => { const updatedMessages = [...messages.slice(0, 9), middleInsert, ...messages.slice(9)] mockFetcher.getMessages = vi.fn().mockReturnValue(updatedMessages) + const onProgress = vi.fn() // Second call — incremental update await engine.buildContext({ roomId: 'room-1', agentId: 'agent-1', agentName: 'Claude', @@ -420,12 +432,17 @@ describe('ContextEngine.buildContext', () => { memberNames: [], members: [], upstream: 'http://localhost:8642', apiKey: null, currentMessage: updatedMessages[updatedMessages.length - 1], compression: { triggerTokens: 10 }, + onProgress, }) expect(mockSummarize).toHaveBeenCalledTimes(2) // Second call: has previousSummary const secondCallArgs = mockSummarize.mock.calls[1] expect(secondCallArgs[6]).toBe('Summary of conversation.') + expect(onProgress).toHaveBeenCalledWith(expect.objectContaining({ + status: 'compressing', + path: 'snapshot', + })) }) it('falls back to no-summary on LLM failure', async () => { diff --git a/tests/server/group-chat-context-cache.test.ts b/tests/server/group-chat-context-cache.test.ts new file mode 100644 index 0000000..1988504 --- /dev/null +++ b/tests/server/group-chat-context-cache.test.ts @@ -0,0 +1,25 @@ +import { describe, expect, it } from 'vitest' +import { countTokens } from '../../packages/server/src/lib/context-compressor' +import { + estimateGroupHistoryMessageTokens, + groupContextTokensWithFixedOverhead, +} from '../../packages/server/src/services/hermes/group-chat/agent-clients' + +describe('group chat fixed context cache helpers', () => { + it('adds cached fixed context to group chat message tokens', () => { + const history = [ + { role: 'user', content: '[Alice]: hello' }, + { role: 'assistant', content: '[Bot]: hi there' }, + ] + + const messageTokens = estimateGroupHistoryMessageTokens(history) + + expect(messageTokens).toBe(countTokens('[Alice]: hello') + countTokens('[Bot]: hi there')) + expect(groupContextTokensWithFixedOverhead(20_000, history)).toBe(20_000 + messageTokens) + }) + + it('signals fallback when fixed context is unavailable', () => { + expect(groupContextTokensWithFixedOverhead(undefined, [{ content: 'hello' }])).toBeUndefined() + expect(groupContextTokensWithFixedOverhead(null, [{ content: 'hello' }])).toBeUndefined() + }) +})