cache group chat fixed context usage (#925)
This commit is contained in:
@@ -175,6 +175,12 @@ export class ContextEngine {
|
||||
logger.debug(`[ContextEngine] [Path A] OVER threshold — starting INCREMENTAL compression of ${newMessages.length} msgs...`)
|
||||
logger.debug(`[ContextEngine] [Path A] CONTEXT BEFORE COMPRESSION: summary(${snapshot.summary.length} chars) + ${newMessages.length} new msgs`)
|
||||
meta.compressed = true
|
||||
input.onProgress?.({
|
||||
status: 'compressing',
|
||||
path: 'snapshot',
|
||||
messageCount: newMessages.length,
|
||||
tokenCount: totalTokens,
|
||||
})
|
||||
|
||||
const t0 = Date.now()
|
||||
const result = await this.summarize(
|
||||
@@ -233,6 +239,12 @@ export class ContextEngine {
|
||||
logger.debug(`[ContextEngine] [Path B] OVER threshold — starting FULL compression of ${total} msgs...`)
|
||||
logger.debug(`[ContextEngine] [Path B] CONTEXT BEFORE COMPRESSION: ${total} msgs, ~${totalTokens} tokens`)
|
||||
meta.compressed = true
|
||||
input.onProgress?.({
|
||||
status: 'compressing',
|
||||
path: 'full',
|
||||
messageCount: total,
|
||||
tokenCount: totalTokens,
|
||||
})
|
||||
|
||||
const t0 = Date.now()
|
||||
const result = await this.summarize(
|
||||
|
||||
@@ -96,6 +96,13 @@ export interface GatewayCaller {
|
||||
|
||||
export type SessionCleaner = (sessionId: string) => void
|
||||
|
||||
export type ContextProgress = (event: {
|
||||
status: 'compressing'
|
||||
path: 'snapshot' | 'full'
|
||||
messageCount: number
|
||||
tokenCount: number
|
||||
}) => void
|
||||
|
||||
// ─── Build Context Input ───────────────────────────────────
|
||||
|
||||
export interface MemberInfo {
|
||||
@@ -122,4 +129,5 @@ export interface BuildContextInput {
|
||||
history: Array<{ role: 'user' | 'assistant'; content: string }>,
|
||||
instructions: string,
|
||||
) => Promise<number | null | undefined>
|
||||
onProgress?: ContextProgress
|
||||
}
|
||||
|
||||
@@ -3,7 +3,8 @@ import { randomBytes } from 'crypto'
|
||||
import { getToken } from '../../../services/auth'
|
||||
import { logger } from '../../../services/logger'
|
||||
import { updateUsage } from '../../../db/hermes/usage-store'
|
||||
import { AgentBridgeClient, type AgentBridgeMessage, type AgentBridgeOutput } from '../agent-bridge'
|
||||
import { countTokens } from '../../../lib/context-compressor'
|
||||
import { AgentBridgeClient, type AgentBridgeContextEstimate, type AgentBridgeMessage, type AgentBridgeOutput } from '../agent-bridge'
|
||||
import { convertContentBlocksForAgent, isContentBlockArray } from '../run-chat/content-blocks'
|
||||
import type { ContentBlock } from '../run-chat/types'
|
||||
import {
|
||||
@@ -42,6 +43,35 @@ type MentionMessage = {
|
||||
mentionDepth?: number
|
||||
}
|
||||
|
||||
type GroupEstimateMessage = { role: 'user' | 'assistant'; content: string }
|
||||
|
||||
interface BridgeContextCache {
|
||||
fixedContextTokens: number
|
||||
instructions?: string
|
||||
systemPromptTokens?: number
|
||||
toolTokens?: number
|
||||
systemPromptChars?: number
|
||||
toolCount?: number
|
||||
toolNames?: string[]
|
||||
profile?: string
|
||||
model?: string
|
||||
provider?: string
|
||||
}
|
||||
|
||||
export function estimateGroupHistoryMessageTokens(history: Array<{ content?: unknown }>): number {
|
||||
return history.reduce((sum, message) => sum + countTokens(String(message.content || '')), 0)
|
||||
}
|
||||
|
||||
export function groupContextTokensWithFixedOverhead(
|
||||
fixedContextTokens: number | null | undefined,
|
||||
history: Array<{ content?: unknown }>,
|
||||
): number | undefined {
|
||||
if (typeof fixedContextTokens !== 'number' || !Number.isFinite(fixedContextTokens) || fixedContextTokens < 0) {
|
||||
return undefined
|
||||
}
|
||||
return Math.floor(fixedContextTokens) + estimateGroupHistoryMessageTokens(history)
|
||||
}
|
||||
|
||||
interface MemberData {
|
||||
id: string
|
||||
name: string
|
||||
@@ -79,6 +109,7 @@ class AgentClient {
|
||||
private storage: any = null
|
||||
private pendingToolCallIds = new Map<string, string[]>()
|
||||
private pendingToolBaseIds = new Map<string, string>()
|
||||
private bridgeContextCache = new Map<string, BridgeContextCache>()
|
||||
|
||||
constructor(config: AgentConfig, handlers: AgentEventHandler = {}) {
|
||||
this.agentId = config.agentId || Date.now().toString(36) + Math.random().toString(36).slice(2, 8)
|
||||
@@ -150,6 +181,7 @@ class AgentClient {
|
||||
this.socket.disconnect()
|
||||
this.socket = null
|
||||
this.joinedRooms.clear()
|
||||
this.bridgeContextCache.clear()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -245,6 +277,89 @@ class AgentClient {
|
||||
return Array.from(this.joinedRooms)
|
||||
}
|
||||
|
||||
private finiteToken(value: unknown): number | undefined {
|
||||
return typeof value === 'number' && Number.isFinite(value) && value >= 0
|
||||
? Math.floor(value)
|
||||
: undefined
|
||||
}
|
||||
|
||||
private cacheBridgeContext(sessionId: string, data: Record<string, unknown> | AgentBridgeContextEstimate, instructions?: string): void {
|
||||
const fixedContextTokens = this.finiteToken(data.fixed_context_tokens)
|
||||
if (fixedContextTokens == null) return
|
||||
this.bridgeContextCache.set(sessionId, {
|
||||
fixedContextTokens,
|
||||
instructions,
|
||||
systemPromptTokens: this.finiteToken(data.system_prompt_tokens),
|
||||
toolTokens: this.finiteToken(data.tool_tokens),
|
||||
systemPromptChars: this.finiteToken(data.system_prompt_chars),
|
||||
toolCount: this.finiteToken(data.tool_count),
|
||||
toolNames: Array.isArray(data.tool_names) ? data.tool_names.map(String) : undefined,
|
||||
profile: typeof data.profile === 'string' ? data.profile : undefined,
|
||||
model: typeof data.model === 'string' ? data.model : undefined,
|
||||
provider: typeof data.provider === 'string' ? data.provider : undefined,
|
||||
})
|
||||
}
|
||||
|
||||
private estimateHistoryMessageTokens(history: GroupEstimateMessage[]): number {
|
||||
return estimateGroupHistoryMessageTokens(history)
|
||||
}
|
||||
|
||||
private estimateWithCachedBridgeContext(sessionId: string, history: GroupEstimateMessage[], instructions?: string): number | undefined {
|
||||
const cache = this.bridgeContextCache.get(sessionId)
|
||||
if (!cache) return undefined
|
||||
if (cache.instructions !== instructions) return undefined
|
||||
return groupContextTokensWithFixedOverhead(cache.fixedContextTokens, history)
|
||||
}
|
||||
|
||||
private async estimateGroupContextTokens(
|
||||
roomId: string,
|
||||
sessionId: string,
|
||||
bridge: AgentBridgeClient,
|
||||
history: GroupEstimateMessage[],
|
||||
instructions: string | undefined,
|
||||
phase: string,
|
||||
): Promise<number | undefined> {
|
||||
const cachedTokens = this.estimateWithCachedBridgeContext(sessionId, history, instructions)
|
||||
if (cachedTokens != null) {
|
||||
logger.info({
|
||||
roomId,
|
||||
agentName: this.name,
|
||||
profile: this.profile,
|
||||
sessionId,
|
||||
messages: history.length,
|
||||
fixedContextTokens: this.bridgeContextCache.get(sessionId)?.fixedContextTokens,
|
||||
messageTokens: cachedTokens - (this.bridgeContextCache.get(sessionId)?.fixedContextTokens || 0),
|
||||
fullContextTokens: cachedTokens,
|
||||
phase,
|
||||
source: 'cache',
|
||||
}, '[GroupChat] full context estimate')
|
||||
return cachedTokens
|
||||
}
|
||||
|
||||
const estimate = await bridge.contextEstimate(
|
||||
sessionId,
|
||||
history,
|
||||
instructions,
|
||||
this.profile,
|
||||
)
|
||||
this.cacheBridgeContext(sessionId, estimate, instructions)
|
||||
const totalTokens = Number(estimate.token_count || 0)
|
||||
logger.info({
|
||||
roomId,
|
||||
agentName: this.name,
|
||||
profile: this.profile,
|
||||
sessionId,
|
||||
messages: estimate.message_count,
|
||||
toolCount: estimate.tool_count,
|
||||
systemPromptChars: estimate.system_prompt_chars,
|
||||
fixedContextTokens: estimate.fixed_context_tokens,
|
||||
fullContextTokens: estimate.token_count,
|
||||
phase,
|
||||
source: 'bridge',
|
||||
}, '[GroupChat] full context estimate')
|
||||
return Number.isFinite(totalTokens) && totalTokens > 0 ? Math.floor(totalTokens) : undefined
|
||||
}
|
||||
|
||||
private ensureConnected(): void {
|
||||
if (!this.socket?.connected) {
|
||||
throw new Error(`Agent "${this.name}" is not connected`)
|
||||
@@ -285,7 +400,6 @@ class AgentClient {
|
||||
if (this.contextEngine && this.storage) {
|
||||
try {
|
||||
logger.debug(`[AgentClients] ${this.name}: building context...`)
|
||||
onStatus?.('compressing')
|
||||
// Get room members with descriptions for context
|
||||
const roomMembers: Array<{ userId: string; name: string; description: string }> = this.storage.getRoomMembers(roomId) || []
|
||||
const memberNames = roomMembers.map((m: any) => m.name)
|
||||
@@ -313,24 +427,21 @@ class AgentClient {
|
||||
currentMessage: msg,
|
||||
compression,
|
||||
profile: this.profile,
|
||||
onProgress: (event: { status: 'compressing'; messageCount: number; tokenCount: number }) => {
|
||||
onStatus?.('compressing', {
|
||||
messageCount: event.messageCount,
|
||||
totalTokens: event.tokenCount,
|
||||
})
|
||||
},
|
||||
contextTokenEstimator: async (history: Array<{ role: 'user' | 'assistant'; content: string }>, estimateInstructions: string) => {
|
||||
const estimate = await bridge.contextEstimate(
|
||||
return this.estimateGroupContextTokens(
|
||||
roomId,
|
||||
sessionId,
|
||||
bridge,
|
||||
history,
|
||||
estimateInstructions,
|
||||
this.profile,
|
||||
'build',
|
||||
)
|
||||
logger.info({
|
||||
roomId,
|
||||
agentName: this.name,
|
||||
profile: this.profile,
|
||||
sessionId,
|
||||
messages: estimate.message_count,
|
||||
toolCount: estimate.tool_count,
|
||||
systemPromptChars: estimate.system_prompt_chars,
|
||||
fullContextTokens: estimate.token_count,
|
||||
}, '[GroupChat] full context estimate')
|
||||
return estimate.token_count
|
||||
},
|
||||
})
|
||||
conversationHistory = ctx.conversationHistory
|
||||
@@ -382,7 +493,7 @@ class AgentClient {
|
||||
streamStarted = true
|
||||
for await (const chunk of bridge.streamOutput(started.run_id, { timeoutMs: 120000 })) {
|
||||
lastChunk = chunk
|
||||
reasoningContent += await this.recordBridgeEvents(roomId, chunk, () => streamMessageId, async () => {
|
||||
reasoningContent += await this.recordBridgeEvents(roomId, sessionId, instructions, chunk, () => streamMessageId, async () => {
|
||||
const toolBaseId = streamMessageId
|
||||
if (currentContent.trim()) {
|
||||
await this.sendMessage(roomId, currentContent, streamMessageId, {
|
||||
@@ -462,28 +573,18 @@ class AgentClient {
|
||||
if (!this.storage?.getMessages) return
|
||||
try {
|
||||
const history = this.buildRoomEstimateHistory(roomId)
|
||||
const estimate = await bridge.contextEstimate(
|
||||
const cachedTokens = await this.estimateGroupContextTokens(
|
||||
roomId,
|
||||
sessionId,
|
||||
bridge,
|
||||
history,
|
||||
instructions,
|
||||
this.profile,
|
||||
'final',
|
||||
)
|
||||
const totalTokens = Number(estimate.token_count || 0)
|
||||
if (!Number.isFinite(totalTokens) || totalTokens <= 0) return
|
||||
const rounded = Math.floor(totalTokens)
|
||||
if (cachedTokens == null || cachedTokens <= 0) return
|
||||
const rounded = Math.floor(cachedTokens)
|
||||
this.storage.updateRoomTotalTokens?.(roomId, rounded)
|
||||
this.emitContextStatus(roomId, 'replying', { totalTokens: rounded })
|
||||
logger.info({
|
||||
roomId,
|
||||
agentName: this.name,
|
||||
profile: this.profile,
|
||||
sessionId,
|
||||
messages: estimate.message_count,
|
||||
toolCount: estimate.tool_count,
|
||||
systemPromptChars: estimate.system_prompt_chars,
|
||||
fullContextTokens: rounded,
|
||||
phase: 'final',
|
||||
}, '[GroupChat] full context estimate')
|
||||
} catch (err: any) {
|
||||
logger.warn(`[GroupChat] failed to refresh final context estimate room=${roomId} agent=${this.name}: ${err.message}`)
|
||||
}
|
||||
@@ -561,6 +662,8 @@ class AgentClient {
|
||||
|
||||
private async recordBridgeEvents(
|
||||
roomId: string,
|
||||
sessionId: string,
|
||||
instructions: string | undefined,
|
||||
chunk: AgentBridgeOutput,
|
||||
getCurrentMessageId: () => string,
|
||||
beforeToolStarted: () => Promise<string>,
|
||||
@@ -568,7 +671,9 @@ class AgentClient {
|
||||
let reasoning = ''
|
||||
for (const ev of chunk.events || []) {
|
||||
const eventType = String((ev as any)?.event || '')
|
||||
if (eventType === 'tool.started') {
|
||||
if (eventType === 'bridge.context.ready') {
|
||||
this.cacheBridgeContext(sessionId, ev as Record<string, unknown>, instructions)
|
||||
} else if (eventType === 'tool.started') {
|
||||
const toolBaseId = await beforeToolStarted()
|
||||
this.recordToolStarted(roomId, ev as Record<string, unknown>, toolBaseId)
|
||||
} else if (eventType === 'tool.completed') {
|
||||
|
||||
@@ -207,6 +207,7 @@ describe('ContextEngine.buildContext', () => {
|
||||
const messages = makeMessages(3)
|
||||
mockFetcher.getMessages = vi.fn().mockReturnValue(messages)
|
||||
const contextTokenEstimator = vi.fn().mockResolvedValue(19_379)
|
||||
const onProgress = vi.fn()
|
||||
|
||||
const result = await engine.buildContext({
|
||||
roomId: 'room-1',
|
||||
@@ -221,6 +222,7 @@ describe('ContextEngine.buildContext', () => {
|
||||
apiKey: null,
|
||||
currentMessage: messages[messages.length - 1],
|
||||
contextTokenEstimator,
|
||||
onProgress,
|
||||
})
|
||||
|
||||
expect(result.meta.compressed).toBe(false)
|
||||
@@ -231,11 +233,13 @@ describe('ContextEngine.buildContext', () => {
|
||||
expect.stringContaining('"Claude"'),
|
||||
)
|
||||
expect(mockSummarize).not.toHaveBeenCalled()
|
||||
expect(onProgress).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('uses full context token estimates to trigger group compression', async () => {
|
||||
const messages = makeMessages(20)
|
||||
mockFetcher.getMessages = vi.fn().mockReturnValue(messages)
|
||||
const onProgress = vi.fn()
|
||||
|
||||
const result = await engine.buildContext({
|
||||
roomId: 'room-1',
|
||||
@@ -250,12 +254,19 @@ describe('ContextEngine.buildContext', () => {
|
||||
apiKey: null,
|
||||
currentMessage: messages[messages.length - 1],
|
||||
contextTokenEstimator: vi.fn().mockResolvedValue(120_000),
|
||||
onProgress,
|
||||
})
|
||||
|
||||
expect(result.meta.compressed).toBe(true)
|
||||
expect(result.meta.contextTokenEstimate).toBe(120_000)
|
||||
expect(mockSummarize).toHaveBeenCalledTimes(1)
|
||||
expect(mockFetcher.saveContextSnapshot).toHaveBeenCalledTimes(1)
|
||||
expect(onProgress).toHaveBeenCalledWith({
|
||||
status: 'compressing',
|
||||
path: 'full',
|
||||
messageCount: 20,
|
||||
tokenCount: 120_000,
|
||||
})
|
||||
})
|
||||
|
||||
it('throws when group prompt and tools exceed threshold with too little history to compress', async () => {
|
||||
@@ -413,6 +424,7 @@ describe('ContextEngine.buildContext', () => {
|
||||
const updatedMessages = [...messages.slice(0, 9), middleInsert, ...messages.slice(9)]
|
||||
mockFetcher.getMessages = vi.fn().mockReturnValue(updatedMessages)
|
||||
|
||||
const onProgress = vi.fn()
|
||||
// Second call — incremental update
|
||||
await engine.buildContext({
|
||||
roomId: 'room-1', agentId: 'agent-1', agentName: 'Claude',
|
||||
@@ -420,12 +432,17 @@ describe('ContextEngine.buildContext', () => {
|
||||
memberNames: [], members: [], upstream: 'http://localhost:8642', apiKey: null,
|
||||
currentMessage: updatedMessages[updatedMessages.length - 1],
|
||||
compression: { triggerTokens: 10 },
|
||||
onProgress,
|
||||
})
|
||||
|
||||
expect(mockSummarize).toHaveBeenCalledTimes(2)
|
||||
// Second call: has previousSummary
|
||||
const secondCallArgs = mockSummarize.mock.calls[1]
|
||||
expect(secondCallArgs[6]).toBe('Summary of conversation.')
|
||||
expect(onProgress).toHaveBeenCalledWith(expect.objectContaining({
|
||||
status: 'compressing',
|
||||
path: 'snapshot',
|
||||
}))
|
||||
})
|
||||
|
||||
it('falls back to no-summary on LLM failure', async () => {
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
import { describe, expect, it } from 'vitest'
|
||||
import { countTokens } from '../../packages/server/src/lib/context-compressor'
|
||||
import {
|
||||
estimateGroupHistoryMessageTokens,
|
||||
groupContextTokensWithFixedOverhead,
|
||||
} from '../../packages/server/src/services/hermes/group-chat/agent-clients'
|
||||
|
||||
describe('group chat fixed context cache helpers', () => {
|
||||
it('adds cached fixed context to group chat message tokens', () => {
|
||||
const history = [
|
||||
{ role: 'user', content: '[Alice]: hello' },
|
||||
{ role: 'assistant', content: '[Bot]: hi there' },
|
||||
]
|
||||
|
||||
const messageTokens = estimateGroupHistoryMessageTokens(history)
|
||||
|
||||
expect(messageTokens).toBe(countTokens('[Alice]: hello') + countTokens('[Bot]: hi there'))
|
||||
expect(groupContextTokensWithFixedOverhead(20_000, history)).toBe(20_000 + messageTokens)
|
||||
})
|
||||
|
||||
it('signals fallback when fixed context is unavailable', () => {
|
||||
expect(groupContextTokensWithFixedOverhead(undefined, [{ content: 'hello' }])).toBeUndefined()
|
||||
expect(groupContextTokensWithFixedOverhead(null, [{ content: 'hello' }])).toBeUndefined()
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user