Account for full context tokens in compression (#908)

* Account for full context tokens in compression

* Fix group chat final context updates

---------

Co-authored-by: Codex <codex@openai.com>
This commit is contained in:
ekko
2026-05-21 19:40:52 +08:00
committed by GitHub
parent b2ec321990
commit 39ead94352
16 changed files with 730 additions and 35 deletions
+1 -1
View File
@@ -488,7 +488,7 @@ function removeSocketListener(socket: Socket, event: string, handler: (...args:
*/
export function resumeSession(
sessionId: string,
onResumed: (data: { session_id: string; messages: any[]; isWorking: boolean; isAborting?: boolean; events: any[]; inputTokens?: number; outputTokens?: number; queueLength?: number }) => void,
onResumed: (data: { session_id: string; messages: any[]; isWorking: boolean; isAborting?: boolean; events: any[]; inputTokens?: number; outputTokens?: number; contextTokens?: number; queueLength?: number }) => void,
): Socket {
const socket = connectChatRun()
@@ -204,6 +204,8 @@ watch(() => chatStore.activeSession?.provider, loadContextLength)
watch(() => chatStore.activeSession?.model, loadContextLength)
const totalTokens = computed(() => {
const context = chatStore.activeSession?.contextTokens
if (typeof context === 'number' && Number.isFinite(context) && context > 0) return context
const input = chatStore.activeSession?.inputTokens ?? 0
const output = chatStore.activeSession?.outputTokens ?? 0
return input + output
@@ -69,6 +69,7 @@ export interface Session {
messageCount?: number
inputTokens?: number
outputTokens?: number
contextTokens?: number
endedAt?: number | null
lastActiveAt?: number
workspace?: string | null
@@ -529,6 +530,7 @@ export const useChatStore = defineStore('chat', () => {
}
if (data.inputTokens != null) activeSession.value!.inputTokens = data.inputTokens
if (data.outputTokens != null) activeSession.value!.outputTokens = data.outputTokens
if ((data as any).contextTokens != null) activeSession.value!.contextTokens = (data as any).contextTokens
if (data.messages?.length) {
activeSession.value!.messages = mapHermesMessages(data.messages as any[])
}
@@ -755,6 +757,7 @@ export const useChatStore = defineStore('chat', () => {
if (action === 'usage' && target) {
target.inputTokens = (evt as any).inputTokens
target.outputTokens = (evt as any).outputTokens
if ((evt as any).contextTokens != null) target.contextTokens = (evt as any).contextTokens
}
if (action === 'destroy') {
@@ -1258,6 +1261,7 @@ export const useChatStore = defineStore('chat', () => {
if (target) {
target.inputTokens = (evt as any).inputTokens
target.outputTokens = (evt as any).outputTokens
if ((evt as any).contextTokens != null) target.contextTokens = (evt as any).contextTokens
}
}
// Belt-and-suspenders: some providers may deliver the final
@@ -1367,6 +1371,7 @@ export const useChatStore = defineStore('chat', () => {
if (target) {
target.inputTokens = (evt as any).inputTokens
target.outputTokens = (evt as any).outputTokens
if ((evt as any).contextTokens != null) target.contextTokens = (evt as any).contextTokens
}
break
}
@@ -1689,6 +1694,7 @@ export const useChatStore = defineStore('chat', () => {
if (target) {
target.inputTokens = (evt as any).inputTokens
target.outputTokens = (evt as any).outputTokens
if ((evt as any).contextTokens != null) target.contextTokens = (evt as any).contextTokens
}
}
// Check if backend provided parsed content (from stringified array format)
@@ -1782,6 +1788,7 @@ export const useChatStore = defineStore('chat', () => {
if (target) {
target.inputTokens = (evt as any).inputTokens
target.outputTokens = (evt as any).outputTokens
if ((evt as any).contextTokens != null) target.contextTokens = (evt as any).contextTokens
}
break
}
@@ -90,6 +90,14 @@ export interface AgentBridgeRunResult extends AgentBridgeResponse {
error?: string | null
}
export interface AgentBridgeContextEstimate extends AgentBridgeResponse {
session_id: string
token_count?: number | null
message_count: number
tool_count: number
system_prompt_chars: number
}
export interface AgentBridgeCommandResult extends AgentBridgeResponse {
session_id: string
command: string
@@ -372,6 +380,24 @@ export class AgentBridgeClient {
})
}
contextEstimate(
sessionId: string,
messages: unknown[],
instructions?: string,
profile?: string,
options: Pick<AgentBridgeChatOptions, 'model' | 'provider'> = {},
): Promise<AgentBridgeContextEstimate> {
return this.request<AgentBridgeContextEstimate>({
action: 'context_estimate',
session_id: sessionId,
messages,
...(instructions ? { instructions } : {}),
...(profile ? { profile } : {}),
...(options.model ? { model: options.model } : {}),
...(options.provider ? { provider: options.provider } : {}),
})
}
command(sessionId: string, command: string): Promise<AgentBridgeCommandResult> {
return this.request<AgentBridgeCommandResult>({
action: 'command',
@@ -602,6 +602,17 @@ class AgentPool:
def wrapped_compress_context(messages, system_message, **kwargs):
before_count = len(messages) if isinstance(messages, list) else 0
approx_tokens = kwargs.get("approx_tokens")
if not isinstance(approx_tokens, int) or approx_tokens <= 0:
approx_tokens = self._estimate_context_tokens(agent, messages, system_message)
print(
"[hermes_bridge] compression requested "
f"session={session_id} messages={before_count} "
f"tokens={approx_tokens if approx_tokens is not None else 'unknown'} "
f"focus={kwargs.get('focus_topic') or ''}",
file=sys.stderr,
flush=True,
)
request_id = uuid.uuid4().hex
response_queue: queue.Queue[dict[str, Any]] = queue.Queue(maxsize=1)
with self._lock:
@@ -610,7 +621,7 @@ class AgentPool:
"event": "bridge.compression.requested",
"request_id": request_id,
"message_count": before_count,
"approx_tokens": kwargs.get("approx_tokens"),
"approx_tokens": approx_tokens,
"focus_topic": kwargs.get("focus_topic"),
"messages": _jsonable(messages),
})
@@ -622,12 +633,14 @@ class AgentPool:
if not isinstance(compressed_messages, list):
raise RuntimeError("bridge compression response missing messages")
next_system_message = response.get("system_message", system_message)
result_approx_tokens = self._estimate_context_tokens(agent, compressed_messages, next_system_message)
self._append_event(session_id, {
"event": "bridge.compression.completed",
"request_id": request_id,
"message_count": before_count,
"result_messages": len(compressed_messages),
"approx_tokens": kwargs.get("approx_tokens"),
"approx_tokens": approx_tokens,
"result_approx_tokens": result_approx_tokens,
"compressed": True,
})
return compressed_messages, next_system_message
@@ -636,7 +649,7 @@ class AgentPool:
"event": "bridge.compression.failed",
"request_id": request_id,
"message_count": before_count,
"approx_tokens": kwargs.get("approx_tokens"),
"approx_tokens": approx_tokens,
"error": "bridge compression timed out",
})
raise RuntimeError("bridge compression timed out")
@@ -645,7 +658,7 @@ class AgentPool:
"event": "bridge.compression.failed",
"request_id": request_id,
"message_count": before_count,
"approx_tokens": kwargs.get("approx_tokens"),
"approx_tokens": approx_tokens,
"error": str(exc),
})
raise
@@ -655,6 +668,61 @@ class AgentPool:
agent._compress_context = wrapped_compress_context
def _estimate_context_tokens(self, agent: Any, messages: Any, system_message: Any = None) -> int | None:
try:
from agent.model_metadata import estimate_request_tokens_rough
except Exception:
return None
prompt = str(getattr(agent, "_cached_system_prompt", "") or "")
if not prompt:
try:
build_prompt = getattr(agent, "_build_system_prompt", None)
if callable(build_prompt):
prompt = str(build_prompt(system_message) or "")
except Exception:
prompt = str(system_message or "")
try:
estimate = estimate_request_tokens_rough(
messages if isinstance(messages, list) else [],
system_prompt=prompt,
tools=getattr(agent, "tools", None) or None,
)
return int(estimate) if isinstance(estimate, (int, float)) and estimate > 0 else None
except Exception:
return None
def estimate_context(
self,
session_id: str,
messages: list[dict[str, Any]] | None = None,
instructions: str | None = None,
profile: str | None = None,
model: str | None = None,
provider: str | None = None,
) -> dict[str, Any]:
session = self.get_or_create(session_id, profile=profile, model=model, provider=provider)
token_count = self._estimate_context_tokens(session.agent, messages or [], instructions)
tools = getattr(session.agent, "tools", None) or []
prompt = str(getattr(session.agent, "_cached_system_prompt", "") or "")
print(
"[hermes_bridge] context estimate "
f"session={session_id} profile={profile or 'default'} "
f"messages={len(messages or [])} system_prompt_chars={len(prompt)} "
f"tools={len(tools) if isinstance(tools, list) else 0} "
f"tokens={token_count if token_count is not None else 'unknown'}",
file=sys.stderr,
flush=True,
)
return {
"session_id": session_id,
"token_count": token_count,
"message_count": len(messages or []),
"tool_count": len(tools) if isinstance(tools, list) else 0,
"system_prompt_chars": len(prompt),
}
def respond_compression(
self,
request_id: str,
@@ -1329,6 +1397,20 @@ class BridgeServer:
return self.pool.get_result(record.run_id)
return {"run_id": record.run_id, "session_id": session_id, "status": record.status}
if action == "context_estimate":
session_id = str(req.get("session_id") or "").strip() or uuid.uuid4().hex
messages = req.get("messages") or req.get("conversation_history") or []
if not isinstance(messages, list):
raise ValueError("messages must be a list")
return self.pool.estimate_context(
session_id,
messages=messages,
instructions=req.get("instructions") or req.get("system_message"),
profile=req.get("profile"),
model=req.get("model"),
provider=req.get("provider"),
)
if action == "get_result":
return self.pool.get_result(str(req.get("run_id") or ""))
@@ -1870,6 +1952,10 @@ class BridgeBroker:
profile = self._normalize_profile(req.get("profile"))
return self._forward(profile, req)
if action == "context_estimate":
profile = self._normalize_profile(req.get("profile"))
return self._forward(profile, req)
if action in {"get_result", "get_output"}:
profile = self._profile_for_run(str(req.get("run_id") or ""))
return self._forward(profile, req)
@@ -100,6 +100,37 @@ export class ContextEngine {
const snapshot = this.messageFetcher.getContextSnapshot(input.roomId)
logger.debug(`[ContextEngine] snapshot=${snapshot ? `EXISTS (lastMsgId=${snapshot.lastMessageId}, summaryLen=${snapshot.summary.length})` : 'NONE'}`)
const estimateFullContextTokens = async (
history: Array<{ role: 'user' | 'assistant'; content: string }>,
messageTokenEstimate: number,
): Promise<number> => {
try {
const estimate = await input.contextTokenEstimator?.(history, instructions)
if (typeof estimate === 'number' && Number.isFinite(estimate) && estimate > 0) {
return Math.floor(estimate)
}
} catch (err: any) {
logger.warn(`[ContextEngine] full context estimate failed room=${input.roomId}, agent=${input.agentName}: ${err.message}`)
}
return messageTokenEstimate
}
const logThresholdCheck = (path: string, messageTokens: number, fullTokens: number): void => {
meta.messageTokenEstimate = messageTokens
meta.contextTokenEstimate = fullTokens
logger.info({
roomId: input.roomId,
agentName: input.agentName,
profile: input.profile || 'default',
path,
messages: total,
messageOnlyTokens: messageTokens,
fullContextTokens: fullTokens,
triggerTokens: config.triggerTokens,
decision: fullTokens > config.triggerTokens ? 'compress' : 'skip',
}, '[ContextEngine] threshold check')
}
// ── Path A: Snapshot exists — incremental ────────────
if (snapshot) {
meta.hadSnapshot = true
@@ -113,11 +144,15 @@ export class ContextEngine {
const summaryTokens = this.countTokens(snapshot.summary)
const newTokens = this.estimateTokensFromMessages(newMessages)
const totalTokens = summaryTokens + newTokens
const messageOnlyTokens = summaryTokens + newTokens
meta.verbatimCount = newMessages.length
meta.summaryTokenEstimate = summaryTokens
const snapshotHistory = this.buildHistory(snapshot.summary, newMessages, input.agentSocketId, input.agentName)
const totalTokens = await estimateFullContextTokens(snapshotHistory, messageOnlyTokens)
logThresholdCheck('snapshot', messageOnlyTokens, totalTokens)
logger.debug(`[ContextEngine] [Path A] snapshotIdx=${snapshotIdx}, newMessages=${newMessages.length}, summaryTokens=~${summaryTokens}, newTokens=~${newTokens}, totalTokens=~${totalTokens}, threshold=${config.triggerTokens}`)
logger.debug(`[ContextEngine] [Path A] EXISTING SUMMARY (${snapshot.summary.length} chars): ${snapshot.summary.slice(0, 300)}`)
if (newMessages.length > 0) {
@@ -127,12 +162,16 @@ export class ContextEngine {
// Under threshold — return summary + new messages directly
if (totalTokens <= config.triggerTokens) {
logger.debug(`[ContextEngine] [Path A] UNDER threshold — return summary + ${newMessages.length} verbatim msgs directly`)
const history = this.buildHistory(snapshot.summary, newMessages, input.agentSocketId, input.agentName)
this.logHistory('Path A (no compress)', history)
return { conversationHistory: history, instructions, meta }
this.logHistory('Path A (no compress)', snapshotHistory)
return { conversationHistory: snapshotHistory, instructions, meta }
}
// Over threshold — incremental compress
if (totalTokens > messageOnlyTokens && newMessages.length <= config.tailMessageCount) {
throw new Error(
`Context window is too small for group chat agent ${input.agentName}: fixed prompt/tool overhead plus ${newMessages.length} new messages uses ~${totalTokens} tokens, exceeding trigger ${config.triggerTokens}, and there is not enough history to compress.`,
)
}
logger.debug(`[ContextEngine] [Path A] OVER threshold — starting INCREMENTAL compression of ${newMessages.length} msgs...`)
logger.debug(`[ContextEngine] [Path A] CONTEXT BEFORE COMPRESSION: summary(${snapshot.summary.length} chars) + ${newMessages.length} new msgs`)
meta.compressed = true
@@ -156,6 +195,7 @@ export class ContextEngine {
logger.debug(`[ContextEngine] [Path A] incremental compression DONE in ${elapsed}ms, newSummaryLen=${result.summary.length}, newLastMsgId=${lastMsg.id}`)
logger.debug(`[ContextEngine] [Path A] NEW SUMMARY (${result.summary.length} chars): ${result.summary.slice(0, 300)}`)
const history = this.buildHistory(result.summary, newMessages, input.agentSocketId, input.agentName)
meta.contextTokenEstimate = await estimateFullContextTokens(history, this.estimateTokens(history))
this.logHistory('Path A (after incremental compress)', history)
if (result.sessionId) this.sessionCleaner?.(result.sessionId)
return { conversationHistory: history, instructions, meta }
@@ -169,20 +209,27 @@ export class ContextEngine {
}
// ── Path B: No snapshot — full context ───────────────
const totalTokens = this.estimateTokensFromMessages(messages)
const messageOnlyTokens = this.estimateTokensFromMessages(messages)
meta.verbatimCount = total
const fullHistory = messages.map(m => this.mapToHistory(m, input.agentSocketId, input.agentName))
const totalTokens = await estimateFullContextTokens(fullHistory, messageOnlyTokens)
logThresholdCheck('full', messageOnlyTokens, totalTokens)
logger.debug(`[ContextEngine] [Path B] no snapshot, totalMessages=${total}, totalTokens=~${totalTokens}, threshold=${config.triggerTokens}`)
// Under threshold — pass all messages verbatim
if (totalTokens <= config.triggerTokens) {
logger.debug(`[ContextEngine] [Path B] UNDER threshold — return all ${total} msgs verbatim`)
const history = messages.map(m => this.mapToHistory(m, input.agentSocketId, input.agentName))
this.logHistory('Path B (no compress)', history)
return { conversationHistory: history, instructions, meta }
this.logHistory('Path B (no compress)', fullHistory)
return { conversationHistory: fullHistory, instructions, meta }
}
// Over threshold — full compress
if (totalTokens > messageOnlyTokens && messages.length <= config.tailMessageCount) {
throw new Error(
`Context window is too small for group chat agent ${input.agentName}: fixed prompt/tool overhead plus ${messages.length} messages uses ~${totalTokens} tokens, exceeding trigger ${config.triggerTokens}, and there is not enough history to compress.`,
)
}
logger.debug(`[ContextEngine] [Path B] OVER threshold — starting FULL compression of ${total} msgs...`)
logger.debug(`[ContextEngine] [Path B] CONTEXT BEFORE COMPRESSION: ${total} msgs, ~${totalTokens} tokens`)
meta.compressed = true
@@ -210,6 +257,7 @@ export class ContextEngine {
logger.debug(`[ContextEngine] [Path B] full compression DONE in ${elapsed}ms, summaryLen=${result.summary.length}, compressed=${toCompress.length} msgs, keptTail=${tail.length} msgs, savedLastMsgId=${lastCompressedMsg.id}`)
logger.debug(`[ContextEngine] [Path B] COMPRESSED SUMMARY (${result.summary.length} chars): ${result.summary.slice(0, 300)}`)
const history = this.buildHistory(result.summary, tail, input.agentSocketId, input.agentName)
meta.contextTokenEstimate = await estimateFullContextTokens(history, this.estimateTokens(history))
this.logHistory('Path B (after full compress)', history)
if (result.sessionId) this.sessionCleaner?.(result.sessionId)
return { conversationHistory: history, instructions, meta }
@@ -49,6 +49,8 @@ export interface CompressedContext {
hadSnapshot: boolean
compressed: boolean
summaryTokenEstimate: number
contextTokenEstimate?: number
messageTokenEstimate?: number
}
}
@@ -116,4 +118,8 @@ export interface BuildContextInput {
currentMessage: StoredMessage
compression?: Partial<CompressionConfig>
profile?: string
contextTokenEstimator?: (
history: Array<{ role: 'user' | 'assistant'; content: string }>,
instructions: string,
) => Promise<number | null | undefined>
}
@@ -190,9 +190,9 @@ class AgentClient {
this.socket!.emit('stop_typing', { roomId })
}
emitContextStatus(roomId: string, status: 'compressing' | 'replying' | 'ready'): void {
emitContextStatus(roomId: string, status: 'compressing' | 'replying' | 'ready', extra?: Record<string, unknown>): void {
this.ensureConnected()
this.socket!.emit('context_status', { roomId, agentName: this.name, status })
this.socket!.emit('context_status', { roomId, agentName: this.name, status, ...extra })
}
emitApprovalRequested(roomId: string, payload: Record<string, unknown>): void {
@@ -261,7 +261,7 @@ class AgentClient {
async replyToMention(
roomId: string,
msg: MentionMessage,
onStatus?: (status: 'compressing' | 'replying' | 'ready') => void,
onStatus?: (status: 'compressing' | 'replying' | 'ready', extra?: Record<string, unknown>) => void,
): Promise<void> {
logger.debug(`[AgentClients] ${this.name} mentioned by ${msg.senderName}: "${msg.content.slice(0, 50)}"`)
const runMessageId = groupMessageId(roomId, this.profile, this.name)
@@ -278,6 +278,9 @@ class AgentClient {
// Build compressed context if context engine is available
let conversationHistory: Array<{ role: string; content: string }> = []
let instructions: string | undefined
const bridge = new AgentBridgeClient()
const sessionSeed = String(this.storage?.getRoom?.(roomId)?.sessionSeed || '0')
const sessionId = groupBridgeSessionId(roomId, this.profile, this.name, sessionSeed)
if (this.contextEngine && this.storage) {
try {
@@ -310,9 +313,32 @@ class AgentClient {
currentMessage: msg,
compression,
profile: this.profile,
contextTokenEstimator: async (history: Array<{ role: 'user' | 'assistant'; content: string }>, estimateInstructions: string) => {
const estimate = await bridge.contextEstimate(
sessionId,
history,
estimateInstructions,
this.profile,
)
logger.info({
roomId,
agentName: this.name,
profile: this.profile,
sessionId,
messages: estimate.message_count,
toolCount: estimate.tool_count,
systemPromptChars: estimate.system_prompt_chars,
fullContextTokens: estimate.token_count,
}, '[GroupChat] full context estimate')
return estimate.token_count
},
})
conversationHistory = ctx.conversationHistory
instructions = ctx.instructions
if (typeof ctx.meta.contextTokenEstimate === 'number' && Number.isFinite(ctx.meta.contextTokenEstimate)) {
this.storage.updateRoomTotalTokens?.(roomId, ctx.meta.contextTokenEstimate)
onStatus?.('replying', { totalTokens: ctx.meta.contextTokenEstimate })
}
logger.debug(`[AgentClients] ${this.name}: context built — historyLen=${conversationHistory.length}, meta=%j`, ctx.meta)
onStatus?.('replying')
} catch (err: any) {
@@ -339,9 +365,6 @@ class AgentClient {
const bridgeInput: AgentBridgeMessage = isContentBlockArray(input)
? await convertContentBlocksForAgent(input)
: input
const bridge = new AgentBridgeClient()
const sessionSeed = String(this.storage?.getRoom?.(roomId)?.sessionSeed || '0')
const sessionId = groupBridgeSessionId(roomId, this.profile, this.name, sessionSeed)
const flushedAssistantParts = new Set<string>()
let lastChunk: AgentBridgeOutput | null = null
const started = await bridge.chat(
@@ -409,6 +432,7 @@ class AgentClient {
reasoning_content: reasoningContent || null,
})
this.emitMessageStreamEnd(roomId, streamMessageId)
await this.refreshRoomFullContextEstimate(roomId, sessionId, bridge, instructions)
onStatus?.('ready')
return
}
@@ -429,6 +453,94 @@ class AgentClient {
}
}
private async refreshRoomFullContextEstimate(
roomId: string,
sessionId: string,
bridge: AgentBridgeClient,
instructions?: string,
): Promise<void> {
if (!this.storage?.getMessages) return
try {
const history = this.buildRoomEstimateHistory(roomId)
const estimate = await bridge.contextEstimate(
sessionId,
history,
instructions,
this.profile,
)
const totalTokens = Number(estimate.token_count || 0)
if (!Number.isFinite(totalTokens) || totalTokens <= 0) return
const rounded = Math.floor(totalTokens)
this.storage.updateRoomTotalTokens?.(roomId, rounded)
this.emitContextStatus(roomId, 'replying', { totalTokens: rounded })
logger.info({
roomId,
agentName: this.name,
profile: this.profile,
sessionId,
messages: estimate.message_count,
toolCount: estimate.tool_count,
systemPromptChars: estimate.system_prompt_chars,
fullContextTokens: rounded,
phase: 'final',
}, '[GroupChat] full context estimate')
} catch (err: any) {
logger.warn(`[GroupChat] failed to refresh final context estimate room=${roomId} agent=${this.name}: ${err.message}`)
}
}
private buildRoomEstimateHistory(roomId: string): Array<{ role: 'user' | 'assistant'; content: string }> {
const messages = this.storage?.getMessages?.(roomId) || []
return messages.map((message: any) => this.mapRoomMessageForEstimate(message))
}
private mapRoomMessageForEstimate(message: any): { role: 'user' | 'assistant'; content: string } {
const senderName = String(message?.senderName || 'unknown')
const role = String(message?.role || 'user')
const isOwnAgent = message?.senderId === this.socket?.id || senderName === this.name
if (role === 'tool') {
const label = message?.tool_name ? `Tool result: ${message.tool_name}` : 'Tool result'
return { role: 'user', content: `[${senderName}] [${label}]\n${message?.content || ''}` }
}
if (role === 'assistant' && Array.isArray(message?.tool_calls) && message.tool_calls.length > 0) {
const toolsInfo = message.tool_calls.map((toolCall: any) => {
const name = toolCall?.function?.name || 'unknown'
let args = String(toolCall?.function?.arguments || '{}')
if (args.length > 4000) args = `${args.slice(0, 4000)}...`
return `[Calling tool: ${name} with arguments: ${args}]`
}).join('\n')
const content = String(message?.content || '').trim()
return {
role: isOwnAgent ? 'assistant' : 'user',
content: content
? `${this.formatAttributedContent(senderName, content)}\n${this.formatAttributionPrefix(senderName)}${toolsInfo}`
: `${this.formatAttributionPrefix(senderName)}${toolsInfo}`,
}
}
return {
role: isOwnAgent ? 'assistant' : 'user',
content: this.formatAttributedContent(senderName, String(message?.content || '')),
}
}
private formatAttributedContent(senderName: string, content: string): string {
return `${this.formatAttributionPrefix(senderName)}${this.stripMentions(content)}`
}
private formatAttributionPrefix(senderName: string): string {
return `[${senderName}]: `
}
private stripMentions(content: string): string {
return String(content || '')
.replace(/@([^\s@]+)/g, '')
.replace(/[ \t]{2,}/g, ' ')
.replace(/^\s+/, '')
}
private async sendAgentErrorMessage(
roomId: string,
messageId: string,
@@ -897,8 +1009,8 @@ export class AgentClients {
}
this._processingRooms.add(agentKey)
const onStatus = (status: 'compressing' | 'replying' | 'ready') => {
agent.emitContextStatus(roomId, status)
const onStatus = (status: 'compressing' | 'replying' | 'ready', extra?: Record<string, unknown>) => {
agent.emitContextStatus(roomId, status, extra)
logger.debug(`[AgentClients] room ${roomId} agent ${agent.name} status: ${status}`)
}
@@ -925,9 +925,7 @@ export class GroupChatServer {
ack?.({ id: savedMsg.id })
const mentionDepth = normalizeMentionDepth(data.mentionDepth)
const shouldRouteMentions =
savedMsg.role === 'user' ||
(savedMsg.role === 'assistant' && mentionDepth < 2)
const shouldRouteMentions = savedMsg.role === 'user'
if (shouldRouteMentions) {
// Server-side @mention routing — parse user mentions and invoke agents directly.
@@ -1046,7 +1044,7 @@ export class GroupChatServer {
})
}
private handleContextStatus(socket: Socket, data: { roomId?: string; agentName?: string; status?: string }): void {
private handleContextStatus(socket: Socket, data: { roomId?: string; agentName?: string; status?: string; totalTokens?: number }): void {
const roomId = data.roomId || 'general'
const agentName = data.agentName || ''
const status = data.status || ''
@@ -1072,6 +1070,11 @@ export class GroupChatServer {
agentName,
status,
})
if (typeof data.totalTokens === 'number' && Number.isFinite(data.totalTokens) && data.totalTokens >= 0) {
this.storage.updateRoomTotalTokens(roomId, Math.floor(data.totalTokens))
this.nsp.to(roomId).emit('room_updated', { roomId, totalTokens: Math.floor(data.totalTokens) })
}
}
private async handleInterruptAgent(socket: Socket, data: { roomId?: string; agentName?: string }, ack?: (response?: unknown) => void): Promise<void> {
@@ -24,6 +24,17 @@ interface RunChatCompressionConfig {
compressor: Partial<CompressorConfig>
}
export class ContextWindowTooSmallError extends Error {
constructor(message: string) {
super(message)
this.name = 'ContextWindowTooSmallError'
}
}
function isContextWindowTooSmallError(err: unknown): err is ContextWindowTooSmallError {
return err instanceof ContextWindowTooSmallError || (err instanceof Error && err.name === 'ContextWindowTooSmallError')
}
function isSnapshotUsable(
snapshot: { lastMessageIndex: number } | null,
history: ChatMessage[],
@@ -167,10 +178,10 @@ export async function buildCompressedHistory(
emit: (event: string, payload: any) => void,
sessionMap: Map<string, SessionState>,
modelContext: { model?: string | null; provider?: string | null } = {},
contextTokenEstimator?: (messages: ChatMessage[]) => Promise<number | null | undefined>,
): Promise<ChatMessage[]> {
try {
let history = await buildDbHistory(sessionId, { excludeLastUser: true })
if (history.length === 0) return []
const contextLength = getModelContextLength({
profile,
@@ -185,7 +196,40 @@ export async function buildCompressedHistory(
}
const cState = getOrCreateSession(sessionMap, sessionId)
const assembledTokens = await calcAndUpdateUsage(sessionId, cState, emit)
let totalTokens = assembledTokens.inputTokens + assembledTokens.outputTokens
const estimateFullContextTokens = async (messages: ChatMessage[], fallback: number) => {
try {
const estimate = await contextTokenEstimator?.(messages)
if (typeof estimate === 'number' && Number.isFinite(estimate) && estimate > 0) return Math.floor(estimate)
} catch (err) {
logger.warn(err, '[context-compress] session=%s: full context token estimate failed; using message-only estimate', sessionId)
}
return fallback
}
const emitContextUsage = (contextTokens: number) => {
cState.contextTokens = contextTokens
emit('usage.updated', {
event: 'usage.updated',
session_id: sessionId,
inputTokens: cState.inputTokens ?? assembledTokens.inputTokens,
outputTokens: cState.outputTokens ?? assembledTokens.outputTokens,
contextTokens,
})
}
const messageOnlyTotalTokens = assembledTokens.inputTokens + assembledTokens.outputTokens
let totalTokens = messageOnlyTotalTokens
if (history.length === 0) {
totalTokens = await estimateFullContextTokens([], 0)
if (totalTokens > triggerTokens) {
throw new ContextWindowTooSmallError(
`Context window is too small: system prompt and tool schemas already use ~${totalTokens} tokens, exceeding compression threshold ${triggerTokens}. Increase model context length, raise compression.threshold, or disable some tools.`,
)
}
if (totalTokens > 0) emitContextUsage(totalTokens)
return []
}
const canCompressHistory = history.length > 4
const snapshot = getCompressionSnapshot(sessionId)
const staleSnapshot = snapshot && !isSnapshotUsable(snapshot, history)
if (staleSnapshot) {
@@ -193,15 +237,40 @@ export async function buildCompressedHistory(
sessionId, snapshot.lastMessageIndex, history.length)
const staleHistory = buildSnapshotHistory(snapshot, history, compressionConfig.compressor) || history
const staleUsage = estimateUsageTokensFromMessages(staleHistory)
totalTokens = staleUsage.inputTokens + staleUsage.outputTokens
totalTokens = await estimateFullContextTokens(staleHistory, staleUsage.inputTokens + staleUsage.outputTokens)
emitContextUsage(totalTokens)
logger.info({
sessionId,
profile,
messages: staleHistory.length,
messageOnlyTokens: staleUsage.inputTokens + staleUsage.outputTokens,
fullContextTokens: totalTokens,
triggerTokens,
decision: totalTokens > triggerTokens ? 'compress' : 'skip',
snapshot: 'stale',
}, '[context-compress] threshold check')
}
if (snapshot && !staleSnapshot) {
const newMessages = history.slice(snapshot.lastMessageIndex + 1)
const snapshotHistory = buildSnapshotHistory(snapshot, history, compressionConfig.compressor) || history
const snapshotUsage = estimateUsageTokensFromMessages(snapshotHistory)
totalTokens = await estimateFullContextTokens(snapshotHistory, snapshotUsage.inputTokens + snapshotUsage.outputTokens)
emitContextUsage(totalTokens)
logger.info({
sessionId,
profile,
messages: snapshotHistory.length,
messageOnlyTokens: snapshotUsage.inputTokens + snapshotUsage.outputTokens,
fullContextTokens: totalTokens,
triggerTokens,
decision: totalTokens > triggerTokens ? 'compress' : 'skip',
snapshot: 'usable',
}, '[context-compress] threshold check')
logger.info('[context-compress] session=%s: snapshot at %d, %d new messages, assembled ~%d tokens (threshold %d)',
sessionId, snapshot.lastMessageIndex, newMessages.length, totalTokens, triggerTokens)
if (totalTokens <= triggerTokens) {
history = buildSnapshotHistory(snapshot, history, compressionConfig.compressor) || history
history = snapshotHistory
} else {
history = await compressHistory(history, newMessages, sessionId, upstream, apiKey, cState, totalTokens, emit, sessionMap, modelContext, compressionConfig.compressor)
}
@@ -211,7 +280,24 @@ export async function buildCompressedHistory(
} else {
history = await compressHistory(history, null, sessionId, upstream, apiKey, cState, totalTokens, emit, sessionMap, modelContext, compressionConfig.compressor)
}
} else if (history.length > 4) {
} else {
totalTokens = await estimateFullContextTokens(history, totalTokens)
emitContextUsage(totalTokens)
logger.info({
sessionId,
profile,
messages: history.length,
messageOnlyTokens: messageOnlyTotalTokens,
fullContextTokens: totalTokens,
triggerTokens,
decision: totalTokens > triggerTokens ? 'compress' : 'skip',
snapshot: 'none',
}, '[context-compress] threshold check')
if (!canCompressHistory && totalTokens > triggerTokens) {
throw new ContextWindowTooSmallError(
`Context window is too small: fixed prompt/tool overhead plus ${history.length} history messages uses ~${totalTokens} tokens, exceeding compression threshold ${triggerTokens}, and there is not enough history to compress. Increase model context length, raise compression.threshold, or disable some tools.`,
)
}
if (totalTokens <= triggerTokens) {
logger.info('[context-compress] session=%s: %d messages, ~%d tokens — under threshold, skip', sessionId, history.length, totalTokens)
} else {
@@ -221,6 +307,7 @@ export async function buildCompressedHistory(
return history
} catch (err) {
if (isContextWindowTooSmallError(err)) throw err
logger.warn(err, '[chat-run-socket] failed to build compressed history for session %s', sessionId)
return []
}
@@ -310,6 +397,7 @@ export async function forceCompressBridgeHistory(
sessionId: string,
profile: string,
_messages: ChatMessage[],
beforeTokenOverride?: number | null,
): Promise<BridgeCompressionResult> {
const history = await buildDbHistory(sessionId, { excludeLastUser: true })
@@ -334,7 +422,9 @@ export async function forceCompressBridgeHistory(
const contextLength = getModelContextLength({ profile, model: session?.model, provider: session?.provider })
const compressionConfig = await getRunChatCompressionConfig(session?.profile || profile, contextLength)
const beforeUsage = estimateSnapshotAwareHistoryUsage(sessionId, history)
const totalTokens = beforeUsage.tokenCount
const totalTokens = typeof beforeTokenOverride === 'number' && Number.isFinite(beforeTokenOverride) && beforeTokenOverride > 0
? Math.floor(beforeTokenOverride)
: beforeUsage.tokenCount
bridgeLogger.info({
sessionId,
profile,
@@ -135,6 +135,26 @@ export async function handleBridgeRun(
emit,
sessionMap,
{ model: resolvedModel, provider: resolvedProvider },
async (messages) => {
const estimate = await bridge.contextEstimate(
session_id,
messages,
fullInstructions,
profile,
{ model: resolvedModel, provider: resolvedProvider },
)
bridgeLogger.info({
sessionId: session_id,
profile,
model: resolvedModel,
provider: resolvedProvider,
messages: estimate.message_count,
toolCount: estimate.tool_count,
systemPromptChars: estimate.system_prompt_chars,
fullContextTokens: estimate.token_count,
}, '[chat-run-socket] full context estimate')
return estimate.token_count
},
)
const bridgeHistory = history
@@ -315,9 +335,19 @@ async function applyBridgeChunkAsync(
} else if (evType === 'bridge.compression.requested') {
const bridgeHistory = await buildDbHistory(sessionId, { excludeLastUser: true })
const bridgeUsage = estimateUsageTokensFromMessages(bridgeHistory)
const tokenCount = bridgeHistory.length > 0
? bridgeUsage.inputTokens + bridgeUsage.outputTokens
: ev.approx_tokens
const messageOnlyTokens = bridgeUsage.inputTokens + bridgeUsage.outputTokens
const tokenCount = typeof ev.approx_tokens === 'number' && Number.isFinite(ev.approx_tokens) && ev.approx_tokens > 0
? ev.approx_tokens
: messageOnlyTokens
bridgeLogger.info({
sessionId,
profile,
bridgeMessages: ev.message_count,
dbMessages: bridgeHistory.length,
messageOnlyTokens,
fullContextTokens: tokenCount,
source: typeof ev.approx_tokens === 'number' ? 'bridge' : 'message-only-fallback',
}, '[chat-run-socket] bridge compression token estimate')
const payload = {
event: 'compression.started',
run_id: chunk.run_id,
@@ -334,6 +364,7 @@ async function applyBridgeChunkAsync(
sessionId,
profile,
ev.messages as ChatMessage[],
typeof ev.approx_tokens === 'number' ? ev.approx_tokens : undefined,
)
state.bridgeCompressionResults = state.bridgeCompressionResults || {}
state.bridgeCompressionResults[String(ev.request_id)] = compressed
@@ -357,7 +388,9 @@ async function applyBridgeChunkAsync(
totalMessages: compressionResult?.beforeMessages ?? ev.message_count,
resultMessages: compressionResult?.resultMessages ?? ev.result_messages,
beforeTokens: compressionResult?.beforeTokens ?? ev.approx_tokens,
afterTokens: compressionResult?.afterTokens,
afterTokens: typeof ev.result_approx_tokens === 'number' && Number.isFinite(ev.result_approx_tokens) && ev.result_approx_tokens > 0
? ev.result_approx_tokens
: compressionResult?.afterTokens,
summaryTokens: compressionResult?.summaryTokens,
verbatimCount: compressionResult?.verbatimCount,
compressedStartIndex: compressionResult?.compressedStartIndex,
@@ -266,6 +266,7 @@ export class ChatRunSocket {
events: state.isWorking ? state.events : [],
inputTokens: state.inputTokens,
outputTokens: state.outputTokens,
contextTokens: state.contextTokens,
queueLength: state.queue?.length || 0,
})
@@ -46,6 +46,7 @@ export interface SessionState {
profile?: string
inputTokens?: number
outputTokens?: number
contextTokens?: number
isAborting?: boolean
queue: QueuedRun[]
responseRun?: ResponseRunState
+108
View File
@@ -203,6 +203,114 @@ describe('ContextEngine.buildContext', () => {
expect(mockSummarize).not.toHaveBeenCalled()
})
it('records full context token estimates without compressing when under threshold', async () => {
const messages = makeMessages(3)
mockFetcher.getMessages = vi.fn().mockReturnValue(messages)
const contextTokenEstimator = vi.fn().mockResolvedValue(19_379)
const result = await engine.buildContext({
roomId: 'room-1',
agentId: 'agent-1',
agentName: 'Claude',
agentDescription: 'Helper',
agentSocketId: 'agent-socket',
roomName: 'general',
memberNames: ['Alice'],
members: [{ userId: 'u1', name: 'Alice', description: '' }],
upstream: 'http://localhost:8642',
apiKey: null,
currentMessage: messages[messages.length - 1],
contextTokenEstimator,
})
expect(result.meta.compressed).toBe(false)
expect(result.meta.contextTokenEstimate).toBe(19_379)
expect(result.meta.messageTokenEstimate).toBeGreaterThan(0)
expect(contextTokenEstimator).toHaveBeenCalledWith(
expect.arrayContaining([{ role: 'assistant', content: expect.stringContaining('[Claude]') }]),
expect.stringContaining('"Claude"'),
)
expect(mockSummarize).not.toHaveBeenCalled()
})
it('uses full context token estimates to trigger group compression', async () => {
const messages = makeMessages(20)
mockFetcher.getMessages = vi.fn().mockReturnValue(messages)
const result = await engine.buildContext({
roomId: 'room-1',
agentId: 'agent-1',
agentName: 'Claude',
agentDescription: 'Helper',
agentSocketId: 'agent-socket',
roomName: 'general',
memberNames: [],
members: [],
upstream: 'http://localhost:8642',
apiKey: null,
currentMessage: messages[messages.length - 1],
contextTokenEstimator: vi.fn().mockResolvedValue(120_000),
})
expect(result.meta.compressed).toBe(true)
expect(result.meta.contextTokenEstimate).toBe(120_000)
expect(mockSummarize).toHaveBeenCalledTimes(1)
expect(mockFetcher.saveContextSnapshot).toHaveBeenCalledTimes(1)
})
it('throws when group prompt and tools exceed threshold with too little history to compress', async () => {
const messages = makeMessages(4)
mockFetcher.getMessages = vi.fn().mockReturnValue(messages)
await expect(engine.buildContext({
roomId: 'room-1',
agentId: 'agent-1',
agentName: 'Claude',
agentDescription: 'Helper',
agentSocketId: 'agent-socket',
roomName: 'general',
memberNames: [],
members: [],
upstream: 'http://localhost:8642',
apiKey: null,
currentMessage: messages[messages.length - 1],
contextTokenEstimator: vi.fn().mockResolvedValue(120_000),
})).rejects.toThrow('Context window is too small')
expect(mockSummarize).not.toHaveBeenCalled()
expect(mockFetcher.saveContextSnapshot).not.toHaveBeenCalled()
})
it('throws on snapshot path when overhead plus new messages exceed threshold without compressible history', async () => {
const messages = makeMessages(12)
mockFetcher.getMessages = vi.fn().mockReturnValue(messages)
mockFetcher.getContextSnapshot = vi.fn().mockReturnValue({
roomId: 'room-1',
summary: 'Existing summary',
lastMessageId: 'msg-9',
lastMessageTimestamp: messages[9].timestamp,
updatedAt: Date.now(),
})
await expect(engine.buildContext({
roomId: 'room-1',
agentId: 'agent-1',
agentName: 'Claude',
agentDescription: 'Helper',
agentSocketId: 'agent-socket',
roomName: 'general',
memberNames: [],
members: [],
upstream: 'http://localhost:8642',
apiKey: null,
currentMessage: messages[messages.length - 1],
contextTokenEstimator: vi.fn().mockResolvedValue(120_000),
})).rejects.toThrow('Context window is too small')
expect(mockSummarize).not.toHaveBeenCalled()
expect(mockFetcher.saveContextSnapshot).not.toHaveBeenCalled()
})
it('splits into head/tail and compresses middle when over threshold', async () => {
const messages = makeMessages(20)
mockFetcher.getMessages = vi.fn().mockReturnValue(messages)
@@ -27,6 +27,7 @@ vi.mock('../../packages/server/src/services/auth', () => ({
}))
import { AgentClients } from '../../packages/server/src/services/hermes/group-chat/agent-clients'
import { GroupChatServer } from '../../packages/server/src/services/hermes/group-chat'
import { groupChatRoutes, setGroupChatServer } from '../../packages/server/src/routes/hermes/group-chat'
function routeHandler(path: string, method: string) {
@@ -222,4 +223,37 @@ describe('Group Chat member/agent identity sync', () => {
members: [{ id: 'member-1', userId: 'human-1', name: 'Han', description: '', joinedAt: 1 }],
})
})
it('routes @mentions only from user messages, not agent replies', () => {
const server = Object.create(GroupChatServer.prototype) as any
const emit = vi.fn()
server.rooms = new Map([
['room-1', {
hasOnlineMember: vi.fn(() => true),
getOnlineMemberBySocketId: vi.fn((socketId: string) => socketId === 'agent-socket'
? { userId: 'agent-1', name: '丫鬟' }
: { userId: 'human-1', name: 'Human' }),
}],
])
server.socketUserMap = new Map([
['human-socket', 'human-1'],
['agent-socket', 'agent-1'],
])
server.userInfoMap = new Map([
['human-1', { name: 'Human', description: '' }],
['agent-1', { name: '丫鬟', description: '' }],
])
server.agentClients = { processMentions: vi.fn(async () => undefined) }
server.storage = {
saveMessageAndRefreshRoom: vi.fn((msg: any) => ({ message: msg, totalTokens: 123 })),
}
server.nsp = { to: vi.fn(() => ({ emit })) }
server.handleMessage({ id: 'human-socket' }, { roomId: 'room-1', content: '@all hi', role: 'user' }, vi.fn())
expect(server.agentClients.processMentions).toHaveBeenCalledTimes(1)
server.agentClients.processMentions.mockClear()
server.handleMessage({ id: 'agent-socket' }, { roomId: 'room-1', content: '@all agent says hi', role: 'assistant', mentionDepth: 1 }, vi.fn())
expect(server.agentClients.processMentions).not.toHaveBeenCalled()
})
})
+138
View File
@@ -162,6 +162,144 @@ describe('run chat compression trigger', () => {
)
})
it('uses full context estimates for compression threshold decisions', async () => {
const messages = Array.from({ length: 10 }, (_, index) => ({
id: index + 1,
session_id: 'session-1',
role: index === 9 ? 'user' : index % 2 === 0 ? 'user' : 'assistant',
content: `message ${index}`,
timestamp: index + 1,
tool_call_id: null,
tool_calls: null,
tool_name: null,
finish_reason: null,
reasoning_content: null,
}))
getSessionDetailMock.mockReturnValue({ messages })
calcAndUpdateUsageMock.mockResolvedValue({ inputTokens: 1_000, outputTokens: 0 })
compressorCompressMock.mockResolvedValue({
messages: [{ role: 'user', content: 'compressed by full context estimate' }],
meta: {
compressed: true,
llmCompressed: true,
totalMessages: 9,
summaryTokenEstimate: 1,
verbatimCount: 0,
compressedStartIndex: 0,
},
})
const { buildCompressedHistory } = await import('../../packages/server/src/services/hermes/run-chat/compression')
const history = await buildCompressedHistory(
'session-1',
'default',
'http://upstream',
undefined,
vi.fn(),
new Map(),
{},
vi.fn(async () => 120_000),
)
expect(history).toEqual([{ role: 'user', content: 'compressed by full context estimate' }])
expect(compressorCompressMock).toHaveBeenCalledTimes(1)
})
it('emits full context token usage when the full estimate is under threshold', async () => {
const messages = Array.from({ length: 10 }, (_, index) => ({
id: index + 1,
session_id: 'session-1',
role: index === 9 ? 'user' : index % 2 === 0 ? 'user' : 'assistant',
content: `message ${index}`,
timestamp: index + 1,
tool_call_id: null,
tool_calls: null,
tool_name: null,
finish_reason: null,
reasoning_content: null,
}))
getSessionDetailMock.mockReturnValue({ messages })
calcAndUpdateUsageMock.mockResolvedValue({ inputTokens: 1_000, outputTokens: 900 })
const emit = vi.fn()
const contextTokenEstimator = vi.fn(async () => 19_379)
const { buildCompressedHistory } = await import('../../packages/server/src/services/hermes/run-chat/compression')
const history = await buildCompressedHistory(
'session-1',
'default',
'http://upstream',
undefined,
emit,
new Map(),
{},
contextTokenEstimator,
)
expect(history).toHaveLength(9)
expect(contextTokenEstimator).toHaveBeenCalledWith(expect.arrayContaining([{ role: 'user', content: 'message 0' }]))
expect(emit).toHaveBeenCalledWith('usage.updated', expect.objectContaining({
event: 'usage.updated',
session_id: 'session-1',
inputTokens: 1_000,
outputTokens: 900,
contextTokens: 19_379,
}))
expect(compressorCompressMock).not.toHaveBeenCalled()
})
it('throws when fixed prompt and tool schemas exceed threshold before any history exists', async () => {
getSessionDetailMock.mockReturnValue({ messages: [] })
const emit = vi.fn()
const { buildCompressedHistory, ContextWindowTooSmallError } = await import('../../packages/server/src/services/hermes/run-chat/compression')
await expect(buildCompressedHistory(
'session-1',
'default',
'http://upstream',
undefined,
emit,
new Map(),
{},
vi.fn(async () => 120_000),
)).rejects.toBeInstanceOf(ContextWindowTooSmallError)
expect(emit).not.toHaveBeenCalledWith('usage.updated', expect.anything())
expect(compressorCompressMock).not.toHaveBeenCalled()
})
it('throws instead of compressing when full context is over threshold but history is too short', async () => {
const messages = Array.from({ length: 5 }, (_, index) => ({
id: index + 1,
session_id: 'session-1',
role: index === 4 ? 'user' : index % 2 === 0 ? 'user' : 'assistant',
content: `message ${index}`,
timestamp: index + 1,
tool_call_id: null,
tool_calls: null,
tool_name: null,
finish_reason: null,
reasoning_content: null,
}))
getSessionDetailMock.mockReturnValue({ messages })
calcAndUpdateUsageMock.mockResolvedValue({ inputTokens: 1_000, outputTokens: 0 })
const { buildCompressedHistory, ContextWindowTooSmallError } = await import('../../packages/server/src/services/hermes/run-chat/compression')
await expect(buildCompressedHistory(
'session-1',
'default',
'http://upstream',
undefined,
vi.fn(),
new Map(),
{},
vi.fn(async () => 120_000),
)).rejects.toBeInstanceOf(ContextWindowTooSmallError)
expect(compressorCompressMock).not.toHaveBeenCalled()
})
it('merges partial compression config with defaults', async () => {
const messages = Array.from({ length: 10 }, (_, index) => ({
id: index + 1,