diff --git a/packages/client/src/api/hermes/chat.ts b/packages/client/src/api/hermes/chat.ts index 8bf7823..e1365ff 100644 --- a/packages/client/src/api/hermes/chat.ts +++ b/packages/client/src/api/hermes/chat.ts @@ -488,7 +488,7 @@ function removeSocketListener(socket: Socket, event: string, handler: (...args: */ export function resumeSession( sessionId: string, - onResumed: (data: { session_id: string; messages: any[]; isWorking: boolean; isAborting?: boolean; events: any[]; inputTokens?: number; outputTokens?: number; queueLength?: number }) => void, + onResumed: (data: { session_id: string; messages: any[]; isWorking: boolean; isAborting?: boolean; events: any[]; inputTokens?: number; outputTokens?: number; contextTokens?: number; queueLength?: number }) => void, ): Socket { const socket = connectChatRun() diff --git a/packages/client/src/components/hermes/chat/ChatInput.vue b/packages/client/src/components/hermes/chat/ChatInput.vue index 3a7e077..43bebe1 100644 --- a/packages/client/src/components/hermes/chat/ChatInput.vue +++ b/packages/client/src/components/hermes/chat/ChatInput.vue @@ -204,6 +204,8 @@ watch(() => chatStore.activeSession?.provider, loadContextLength) watch(() => chatStore.activeSession?.model, loadContextLength) const totalTokens = computed(() => { + const context = chatStore.activeSession?.contextTokens + if (typeof context === 'number' && Number.isFinite(context) && context > 0) return context const input = chatStore.activeSession?.inputTokens ?? 0 const output = chatStore.activeSession?.outputTokens ?? 0 return input + output diff --git a/packages/client/src/stores/hermes/chat.ts b/packages/client/src/stores/hermes/chat.ts index ab2c78e..f6c7f0b 100644 --- a/packages/client/src/stores/hermes/chat.ts +++ b/packages/client/src/stores/hermes/chat.ts @@ -69,6 +69,7 @@ export interface Session { messageCount?: number inputTokens?: number outputTokens?: number + contextTokens?: number endedAt?: number | null lastActiveAt?: number workspace?: string | null @@ -529,6 +530,7 @@ export const useChatStore = defineStore('chat', () => { } if (data.inputTokens != null) activeSession.value!.inputTokens = data.inputTokens if (data.outputTokens != null) activeSession.value!.outputTokens = data.outputTokens + if ((data as any).contextTokens != null) activeSession.value!.contextTokens = (data as any).contextTokens if (data.messages?.length) { activeSession.value!.messages = mapHermesMessages(data.messages as any[]) } @@ -755,6 +757,7 @@ export const useChatStore = defineStore('chat', () => { if (action === 'usage' && target) { target.inputTokens = (evt as any).inputTokens target.outputTokens = (evt as any).outputTokens + if ((evt as any).contextTokens != null) target.contextTokens = (evt as any).contextTokens } if (action === 'destroy') { @@ -1258,6 +1261,7 @@ export const useChatStore = defineStore('chat', () => { if (target) { target.inputTokens = (evt as any).inputTokens target.outputTokens = (evt as any).outputTokens + if ((evt as any).contextTokens != null) target.contextTokens = (evt as any).contextTokens } } // Belt-and-suspenders: some providers may deliver the final @@ -1367,6 +1371,7 @@ export const useChatStore = defineStore('chat', () => { if (target) { target.inputTokens = (evt as any).inputTokens target.outputTokens = (evt as any).outputTokens + if ((evt as any).contextTokens != null) target.contextTokens = (evt as any).contextTokens } break } @@ -1689,6 +1694,7 @@ export const useChatStore = defineStore('chat', () => { if (target) { target.inputTokens = (evt as any).inputTokens target.outputTokens = (evt as any).outputTokens + if ((evt as any).contextTokens != null) target.contextTokens = (evt as any).contextTokens } } // Check if backend provided parsed content (from stringified array format) @@ -1782,6 +1788,7 @@ export const useChatStore = defineStore('chat', () => { if (target) { target.inputTokens = (evt as any).inputTokens target.outputTokens = (evt as any).outputTokens + if ((evt as any).contextTokens != null) target.contextTokens = (evt as any).contextTokens } break } diff --git a/packages/server/src/services/hermes/agent-bridge/client.ts b/packages/server/src/services/hermes/agent-bridge/client.ts index b3dfa8b..6965682 100644 --- a/packages/server/src/services/hermes/agent-bridge/client.ts +++ b/packages/server/src/services/hermes/agent-bridge/client.ts @@ -90,6 +90,14 @@ export interface AgentBridgeRunResult extends AgentBridgeResponse { error?: string | null } +export interface AgentBridgeContextEstimate extends AgentBridgeResponse { + session_id: string + token_count?: number | null + message_count: number + tool_count: number + system_prompt_chars: number +} + export interface AgentBridgeCommandResult extends AgentBridgeResponse { session_id: string command: string @@ -372,6 +380,24 @@ export class AgentBridgeClient { }) } + contextEstimate( + sessionId: string, + messages: unknown[], + instructions?: string, + profile?: string, + options: Pick = {}, + ): Promise { + return this.request({ + action: 'context_estimate', + session_id: sessionId, + messages, + ...(instructions ? { instructions } : {}), + ...(profile ? { profile } : {}), + ...(options.model ? { model: options.model } : {}), + ...(options.provider ? { provider: options.provider } : {}), + }) + } + command(sessionId: string, command: string): Promise { return this.request({ action: 'command', diff --git a/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py b/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py index 98c615c..ba7bd75 100755 --- a/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py +++ b/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py @@ -602,6 +602,17 @@ class AgentPool: def wrapped_compress_context(messages, system_message, **kwargs): before_count = len(messages) if isinstance(messages, list) else 0 + approx_tokens = kwargs.get("approx_tokens") + if not isinstance(approx_tokens, int) or approx_tokens <= 0: + approx_tokens = self._estimate_context_tokens(agent, messages, system_message) + print( + "[hermes_bridge] compression requested " + f"session={session_id} messages={before_count} " + f"tokens={approx_tokens if approx_tokens is not None else 'unknown'} " + f"focus={kwargs.get('focus_topic') or ''}", + file=sys.stderr, + flush=True, + ) request_id = uuid.uuid4().hex response_queue: queue.Queue[dict[str, Any]] = queue.Queue(maxsize=1) with self._lock: @@ -610,7 +621,7 @@ class AgentPool: "event": "bridge.compression.requested", "request_id": request_id, "message_count": before_count, - "approx_tokens": kwargs.get("approx_tokens"), + "approx_tokens": approx_tokens, "focus_topic": kwargs.get("focus_topic"), "messages": _jsonable(messages), }) @@ -622,12 +633,14 @@ class AgentPool: if not isinstance(compressed_messages, list): raise RuntimeError("bridge compression response missing messages") next_system_message = response.get("system_message", system_message) + result_approx_tokens = self._estimate_context_tokens(agent, compressed_messages, next_system_message) self._append_event(session_id, { "event": "bridge.compression.completed", "request_id": request_id, "message_count": before_count, "result_messages": len(compressed_messages), - "approx_tokens": kwargs.get("approx_tokens"), + "approx_tokens": approx_tokens, + "result_approx_tokens": result_approx_tokens, "compressed": True, }) return compressed_messages, next_system_message @@ -636,7 +649,7 @@ class AgentPool: "event": "bridge.compression.failed", "request_id": request_id, "message_count": before_count, - "approx_tokens": kwargs.get("approx_tokens"), + "approx_tokens": approx_tokens, "error": "bridge compression timed out", }) raise RuntimeError("bridge compression timed out") @@ -645,7 +658,7 @@ class AgentPool: "event": "bridge.compression.failed", "request_id": request_id, "message_count": before_count, - "approx_tokens": kwargs.get("approx_tokens"), + "approx_tokens": approx_tokens, "error": str(exc), }) raise @@ -655,6 +668,61 @@ class AgentPool: agent._compress_context = wrapped_compress_context + def _estimate_context_tokens(self, agent: Any, messages: Any, system_message: Any = None) -> int | None: + try: + from agent.model_metadata import estimate_request_tokens_rough + except Exception: + return None + + prompt = str(getattr(agent, "_cached_system_prompt", "") or "") + if not prompt: + try: + build_prompt = getattr(agent, "_build_system_prompt", None) + if callable(build_prompt): + prompt = str(build_prompt(system_message) or "") + except Exception: + prompt = str(system_message or "") + + try: + estimate = estimate_request_tokens_rough( + messages if isinstance(messages, list) else [], + system_prompt=prompt, + tools=getattr(agent, "tools", None) or None, + ) + return int(estimate) if isinstance(estimate, (int, float)) and estimate > 0 else None + except Exception: + return None + + def estimate_context( + self, + session_id: str, + messages: list[dict[str, Any]] | None = None, + instructions: str | None = None, + profile: str | None = None, + model: str | None = None, + provider: str | None = None, + ) -> dict[str, Any]: + session = self.get_or_create(session_id, profile=profile, model=model, provider=provider) + token_count = self._estimate_context_tokens(session.agent, messages or [], instructions) + tools = getattr(session.agent, "tools", None) or [] + prompt = str(getattr(session.agent, "_cached_system_prompt", "") or "") + print( + "[hermes_bridge] context estimate " + f"session={session_id} profile={profile or 'default'} " + f"messages={len(messages or [])} system_prompt_chars={len(prompt)} " + f"tools={len(tools) if isinstance(tools, list) else 0} " + f"tokens={token_count if token_count is not None else 'unknown'}", + file=sys.stderr, + flush=True, + ) + return { + "session_id": session_id, + "token_count": token_count, + "message_count": len(messages or []), + "tool_count": len(tools) if isinstance(tools, list) else 0, + "system_prompt_chars": len(prompt), + } + def respond_compression( self, request_id: str, @@ -1329,6 +1397,20 @@ class BridgeServer: return self.pool.get_result(record.run_id) return {"run_id": record.run_id, "session_id": session_id, "status": record.status} + if action == "context_estimate": + session_id = str(req.get("session_id") or "").strip() or uuid.uuid4().hex + messages = req.get("messages") or req.get("conversation_history") or [] + if not isinstance(messages, list): + raise ValueError("messages must be a list") + return self.pool.estimate_context( + session_id, + messages=messages, + instructions=req.get("instructions") or req.get("system_message"), + profile=req.get("profile"), + model=req.get("model"), + provider=req.get("provider"), + ) + if action == "get_result": return self.pool.get_result(str(req.get("run_id") or "")) @@ -1870,6 +1952,10 @@ class BridgeBroker: profile = self._normalize_profile(req.get("profile")) return self._forward(profile, req) + if action == "context_estimate": + profile = self._normalize_profile(req.get("profile")) + return self._forward(profile, req) + if action in {"get_result", "get_output"}: profile = self._profile_for_run(str(req.get("run_id") or "")) return self._forward(profile, req) diff --git a/packages/server/src/services/hermes/context-engine/compressor.ts b/packages/server/src/services/hermes/context-engine/compressor.ts index 3d3f80b..813847d 100644 --- a/packages/server/src/services/hermes/context-engine/compressor.ts +++ b/packages/server/src/services/hermes/context-engine/compressor.ts @@ -100,6 +100,37 @@ export class ContextEngine { const snapshot = this.messageFetcher.getContextSnapshot(input.roomId) logger.debug(`[ContextEngine] snapshot=${snapshot ? `EXISTS (lastMsgId=${snapshot.lastMessageId}, summaryLen=${snapshot.summary.length})` : 'NONE'}`) + const estimateFullContextTokens = async ( + history: Array<{ role: 'user' | 'assistant'; content: string }>, + messageTokenEstimate: number, + ): Promise => { + try { + const estimate = await input.contextTokenEstimator?.(history, instructions) + if (typeof estimate === 'number' && Number.isFinite(estimate) && estimate > 0) { + return Math.floor(estimate) + } + } catch (err: any) { + logger.warn(`[ContextEngine] full context estimate failed room=${input.roomId}, agent=${input.agentName}: ${err.message}`) + } + return messageTokenEstimate + } + + const logThresholdCheck = (path: string, messageTokens: number, fullTokens: number): void => { + meta.messageTokenEstimate = messageTokens + meta.contextTokenEstimate = fullTokens + logger.info({ + roomId: input.roomId, + agentName: input.agentName, + profile: input.profile || 'default', + path, + messages: total, + messageOnlyTokens: messageTokens, + fullContextTokens: fullTokens, + triggerTokens: config.triggerTokens, + decision: fullTokens > config.triggerTokens ? 'compress' : 'skip', + }, '[ContextEngine] threshold check') + } + // ── Path A: Snapshot exists — incremental ──────────── if (snapshot) { meta.hadSnapshot = true @@ -113,11 +144,15 @@ export class ContextEngine { const summaryTokens = this.countTokens(snapshot.summary) const newTokens = this.estimateTokensFromMessages(newMessages) - const totalTokens = summaryTokens + newTokens + const messageOnlyTokens = summaryTokens + newTokens meta.verbatimCount = newMessages.length meta.summaryTokenEstimate = summaryTokens + const snapshotHistory = this.buildHistory(snapshot.summary, newMessages, input.agentSocketId, input.agentName) + const totalTokens = await estimateFullContextTokens(snapshotHistory, messageOnlyTokens) + logThresholdCheck('snapshot', messageOnlyTokens, totalTokens) + logger.debug(`[ContextEngine] [Path A] snapshotIdx=${snapshotIdx}, newMessages=${newMessages.length}, summaryTokens=~${summaryTokens}, newTokens=~${newTokens}, totalTokens=~${totalTokens}, threshold=${config.triggerTokens}`) logger.debug(`[ContextEngine] [Path A] EXISTING SUMMARY (${snapshot.summary.length} chars): ${snapshot.summary.slice(0, 300)}`) if (newMessages.length > 0) { @@ -127,12 +162,16 @@ export class ContextEngine { // Under threshold — return summary + new messages directly if (totalTokens <= config.triggerTokens) { logger.debug(`[ContextEngine] [Path A] UNDER threshold — return summary + ${newMessages.length} verbatim msgs directly`) - const history = this.buildHistory(snapshot.summary, newMessages, input.agentSocketId, input.agentName) - this.logHistory('Path A (no compress)', history) - return { conversationHistory: history, instructions, meta } + this.logHistory('Path A (no compress)', snapshotHistory) + return { conversationHistory: snapshotHistory, instructions, meta } } // Over threshold — incremental compress + if (totalTokens > messageOnlyTokens && newMessages.length <= config.tailMessageCount) { + throw new Error( + `Context window is too small for group chat agent ${input.agentName}: fixed prompt/tool overhead plus ${newMessages.length} new messages uses ~${totalTokens} tokens, exceeding trigger ${config.triggerTokens}, and there is not enough history to compress.`, + ) + } logger.debug(`[ContextEngine] [Path A] OVER threshold — starting INCREMENTAL compression of ${newMessages.length} msgs...`) logger.debug(`[ContextEngine] [Path A] CONTEXT BEFORE COMPRESSION: summary(${snapshot.summary.length} chars) + ${newMessages.length} new msgs`) meta.compressed = true @@ -156,6 +195,7 @@ export class ContextEngine { logger.debug(`[ContextEngine] [Path A] incremental compression DONE in ${elapsed}ms, newSummaryLen=${result.summary.length}, newLastMsgId=${lastMsg.id}`) logger.debug(`[ContextEngine] [Path A] NEW SUMMARY (${result.summary.length} chars): ${result.summary.slice(0, 300)}`) const history = this.buildHistory(result.summary, newMessages, input.agentSocketId, input.agentName) + meta.contextTokenEstimate = await estimateFullContextTokens(history, this.estimateTokens(history)) this.logHistory('Path A (after incremental compress)', history) if (result.sessionId) this.sessionCleaner?.(result.sessionId) return { conversationHistory: history, instructions, meta } @@ -169,20 +209,27 @@ export class ContextEngine { } // ── Path B: No snapshot — full context ─────────────── - const totalTokens = this.estimateTokensFromMessages(messages) + const messageOnlyTokens = this.estimateTokensFromMessages(messages) meta.verbatimCount = total + const fullHistory = messages.map(m => this.mapToHistory(m, input.agentSocketId, input.agentName)) + const totalTokens = await estimateFullContextTokens(fullHistory, messageOnlyTokens) + logThresholdCheck('full', messageOnlyTokens, totalTokens) logger.debug(`[ContextEngine] [Path B] no snapshot, totalMessages=${total}, totalTokens=~${totalTokens}, threshold=${config.triggerTokens}`) // Under threshold — pass all messages verbatim if (totalTokens <= config.triggerTokens) { logger.debug(`[ContextEngine] [Path B] UNDER threshold — return all ${total} msgs verbatim`) - const history = messages.map(m => this.mapToHistory(m, input.agentSocketId, input.agentName)) - this.logHistory('Path B (no compress)', history) - return { conversationHistory: history, instructions, meta } + this.logHistory('Path B (no compress)', fullHistory) + return { conversationHistory: fullHistory, instructions, meta } } // Over threshold — full compress + if (totalTokens > messageOnlyTokens && messages.length <= config.tailMessageCount) { + throw new Error( + `Context window is too small for group chat agent ${input.agentName}: fixed prompt/tool overhead plus ${messages.length} messages uses ~${totalTokens} tokens, exceeding trigger ${config.triggerTokens}, and there is not enough history to compress.`, + ) + } logger.debug(`[ContextEngine] [Path B] OVER threshold — starting FULL compression of ${total} msgs...`) logger.debug(`[ContextEngine] [Path B] CONTEXT BEFORE COMPRESSION: ${total} msgs, ~${totalTokens} tokens`) meta.compressed = true @@ -210,6 +257,7 @@ export class ContextEngine { logger.debug(`[ContextEngine] [Path B] full compression DONE in ${elapsed}ms, summaryLen=${result.summary.length}, compressed=${toCompress.length} msgs, keptTail=${tail.length} msgs, savedLastMsgId=${lastCompressedMsg.id}`) logger.debug(`[ContextEngine] [Path B] COMPRESSED SUMMARY (${result.summary.length} chars): ${result.summary.slice(0, 300)}`) const history = this.buildHistory(result.summary, tail, input.agentSocketId, input.agentName) + meta.contextTokenEstimate = await estimateFullContextTokens(history, this.estimateTokens(history)) this.logHistory('Path B (after full compress)', history) if (result.sessionId) this.sessionCleaner?.(result.sessionId) return { conversationHistory: history, instructions, meta } diff --git a/packages/server/src/services/hermes/context-engine/types.ts b/packages/server/src/services/hermes/context-engine/types.ts index 6dd4a32..71fc3d4 100644 --- a/packages/server/src/services/hermes/context-engine/types.ts +++ b/packages/server/src/services/hermes/context-engine/types.ts @@ -49,6 +49,8 @@ export interface CompressedContext { hadSnapshot: boolean compressed: boolean summaryTokenEstimate: number + contextTokenEstimate?: number + messageTokenEstimate?: number } } @@ -116,4 +118,8 @@ export interface BuildContextInput { currentMessage: StoredMessage compression?: Partial profile?: string + contextTokenEstimator?: ( + history: Array<{ role: 'user' | 'assistant'; content: string }>, + instructions: string, + ) => Promise } diff --git a/packages/server/src/services/hermes/group-chat/agent-clients.ts b/packages/server/src/services/hermes/group-chat/agent-clients.ts index 26d9aed..c5d95da 100644 --- a/packages/server/src/services/hermes/group-chat/agent-clients.ts +++ b/packages/server/src/services/hermes/group-chat/agent-clients.ts @@ -190,9 +190,9 @@ class AgentClient { this.socket!.emit('stop_typing', { roomId }) } - emitContextStatus(roomId: string, status: 'compressing' | 'replying' | 'ready'): void { + emitContextStatus(roomId: string, status: 'compressing' | 'replying' | 'ready', extra?: Record): void { this.ensureConnected() - this.socket!.emit('context_status', { roomId, agentName: this.name, status }) + this.socket!.emit('context_status', { roomId, agentName: this.name, status, ...extra }) } emitApprovalRequested(roomId: string, payload: Record): void { @@ -261,7 +261,7 @@ class AgentClient { async replyToMention( roomId: string, msg: MentionMessage, - onStatus?: (status: 'compressing' | 'replying' | 'ready') => void, + onStatus?: (status: 'compressing' | 'replying' | 'ready', extra?: Record) => void, ): Promise { logger.debug(`[AgentClients] ${this.name} mentioned by ${msg.senderName}: "${msg.content.slice(0, 50)}"`) const runMessageId = groupMessageId(roomId, this.profile, this.name) @@ -278,6 +278,9 @@ class AgentClient { // Build compressed context if context engine is available let conversationHistory: Array<{ role: string; content: string }> = [] let instructions: string | undefined + const bridge = new AgentBridgeClient() + const sessionSeed = String(this.storage?.getRoom?.(roomId)?.sessionSeed || '0') + const sessionId = groupBridgeSessionId(roomId, this.profile, this.name, sessionSeed) if (this.contextEngine && this.storage) { try { @@ -310,9 +313,32 @@ class AgentClient { currentMessage: msg, compression, profile: this.profile, + contextTokenEstimator: async (history: Array<{ role: 'user' | 'assistant'; content: string }>, estimateInstructions: string) => { + const estimate = await bridge.contextEstimate( + sessionId, + history, + estimateInstructions, + this.profile, + ) + logger.info({ + roomId, + agentName: this.name, + profile: this.profile, + sessionId, + messages: estimate.message_count, + toolCount: estimate.tool_count, + systemPromptChars: estimate.system_prompt_chars, + fullContextTokens: estimate.token_count, + }, '[GroupChat] full context estimate') + return estimate.token_count + }, }) conversationHistory = ctx.conversationHistory instructions = ctx.instructions + if (typeof ctx.meta.contextTokenEstimate === 'number' && Number.isFinite(ctx.meta.contextTokenEstimate)) { + this.storage.updateRoomTotalTokens?.(roomId, ctx.meta.contextTokenEstimate) + onStatus?.('replying', { totalTokens: ctx.meta.contextTokenEstimate }) + } logger.debug(`[AgentClients] ${this.name}: context built — historyLen=${conversationHistory.length}, meta=%j`, ctx.meta) onStatus?.('replying') } catch (err: any) { @@ -339,9 +365,6 @@ class AgentClient { const bridgeInput: AgentBridgeMessage = isContentBlockArray(input) ? await convertContentBlocksForAgent(input) : input - const bridge = new AgentBridgeClient() - const sessionSeed = String(this.storage?.getRoom?.(roomId)?.sessionSeed || '0') - const sessionId = groupBridgeSessionId(roomId, this.profile, this.name, sessionSeed) const flushedAssistantParts = new Set() let lastChunk: AgentBridgeOutput | null = null const started = await bridge.chat( @@ -409,6 +432,7 @@ class AgentClient { reasoning_content: reasoningContent || null, }) this.emitMessageStreamEnd(roomId, streamMessageId) + await this.refreshRoomFullContextEstimate(roomId, sessionId, bridge, instructions) onStatus?.('ready') return } @@ -429,6 +453,94 @@ class AgentClient { } } + private async refreshRoomFullContextEstimate( + roomId: string, + sessionId: string, + bridge: AgentBridgeClient, + instructions?: string, + ): Promise { + if (!this.storage?.getMessages) return + try { + const history = this.buildRoomEstimateHistory(roomId) + const estimate = await bridge.contextEstimate( + sessionId, + history, + instructions, + this.profile, + ) + const totalTokens = Number(estimate.token_count || 0) + if (!Number.isFinite(totalTokens) || totalTokens <= 0) return + const rounded = Math.floor(totalTokens) + this.storage.updateRoomTotalTokens?.(roomId, rounded) + this.emitContextStatus(roomId, 'replying', { totalTokens: rounded }) + logger.info({ + roomId, + agentName: this.name, + profile: this.profile, + sessionId, + messages: estimate.message_count, + toolCount: estimate.tool_count, + systemPromptChars: estimate.system_prompt_chars, + fullContextTokens: rounded, + phase: 'final', + }, '[GroupChat] full context estimate') + } catch (err: any) { + logger.warn(`[GroupChat] failed to refresh final context estimate room=${roomId} agent=${this.name}: ${err.message}`) + } + } + + private buildRoomEstimateHistory(roomId: string): Array<{ role: 'user' | 'assistant'; content: string }> { + const messages = this.storage?.getMessages?.(roomId) || [] + return messages.map((message: any) => this.mapRoomMessageForEstimate(message)) + } + + private mapRoomMessageForEstimate(message: any): { role: 'user' | 'assistant'; content: string } { + const senderName = String(message?.senderName || 'unknown') + const role = String(message?.role || 'user') + const isOwnAgent = message?.senderId === this.socket?.id || senderName === this.name + + if (role === 'tool') { + const label = message?.tool_name ? `Tool result: ${message.tool_name}` : 'Tool result' + return { role: 'user', content: `[${senderName}] [${label}]\n${message?.content || ''}` } + } + + if (role === 'assistant' && Array.isArray(message?.tool_calls) && message.tool_calls.length > 0) { + const toolsInfo = message.tool_calls.map((toolCall: any) => { + const name = toolCall?.function?.name || 'unknown' + let args = String(toolCall?.function?.arguments || '{}') + if (args.length > 4000) args = `${args.slice(0, 4000)}...` + return `[Calling tool: ${name} with arguments: ${args}]` + }).join('\n') + const content = String(message?.content || '').trim() + return { + role: isOwnAgent ? 'assistant' : 'user', + content: content + ? `${this.formatAttributedContent(senderName, content)}\n${this.formatAttributionPrefix(senderName)}${toolsInfo}` + : `${this.formatAttributionPrefix(senderName)}${toolsInfo}`, + } + } + + return { + role: isOwnAgent ? 'assistant' : 'user', + content: this.formatAttributedContent(senderName, String(message?.content || '')), + } + } + + private formatAttributedContent(senderName: string, content: string): string { + return `${this.formatAttributionPrefix(senderName)}${this.stripMentions(content)}` + } + + private formatAttributionPrefix(senderName: string): string { + return `[${senderName}]: ` + } + + private stripMentions(content: string): string { + return String(content || '') + .replace(/@([^\s@]+)/g, '') + .replace(/[ \t]{2,}/g, ' ') + .replace(/^\s+/, '') + } + private async sendAgentErrorMessage( roomId: string, messageId: string, @@ -897,8 +1009,8 @@ export class AgentClients { } this._processingRooms.add(agentKey) - const onStatus = (status: 'compressing' | 'replying' | 'ready') => { - agent.emitContextStatus(roomId, status) + const onStatus = (status: 'compressing' | 'replying' | 'ready', extra?: Record) => { + agent.emitContextStatus(roomId, status, extra) logger.debug(`[AgentClients] room ${roomId} agent ${agent.name} status: ${status}`) } diff --git a/packages/server/src/services/hermes/group-chat/index.ts b/packages/server/src/services/hermes/group-chat/index.ts index 7c9459b..2b858f8 100644 --- a/packages/server/src/services/hermes/group-chat/index.ts +++ b/packages/server/src/services/hermes/group-chat/index.ts @@ -925,9 +925,7 @@ export class GroupChatServer { ack?.({ id: savedMsg.id }) const mentionDepth = normalizeMentionDepth(data.mentionDepth) - const shouldRouteMentions = - savedMsg.role === 'user' || - (savedMsg.role === 'assistant' && mentionDepth < 2) + const shouldRouteMentions = savedMsg.role === 'user' if (shouldRouteMentions) { // Server-side @mention routing — parse user mentions and invoke agents directly. @@ -1046,7 +1044,7 @@ export class GroupChatServer { }) } - private handleContextStatus(socket: Socket, data: { roomId?: string; agentName?: string; status?: string }): void { + private handleContextStatus(socket: Socket, data: { roomId?: string; agentName?: string; status?: string; totalTokens?: number }): void { const roomId = data.roomId || 'general' const agentName = data.agentName || '' const status = data.status || '' @@ -1072,6 +1070,11 @@ export class GroupChatServer { agentName, status, }) + + if (typeof data.totalTokens === 'number' && Number.isFinite(data.totalTokens) && data.totalTokens >= 0) { + this.storage.updateRoomTotalTokens(roomId, Math.floor(data.totalTokens)) + this.nsp.to(roomId).emit('room_updated', { roomId, totalTokens: Math.floor(data.totalTokens) }) + } } private async handleInterruptAgent(socket: Socket, data: { roomId?: string; agentName?: string }, ack?: (response?: unknown) => void): Promise { diff --git a/packages/server/src/services/hermes/run-chat/compression.ts b/packages/server/src/services/hermes/run-chat/compression.ts index f203237..9e8b8a0 100644 --- a/packages/server/src/services/hermes/run-chat/compression.ts +++ b/packages/server/src/services/hermes/run-chat/compression.ts @@ -24,6 +24,17 @@ interface RunChatCompressionConfig { compressor: Partial } +export class ContextWindowTooSmallError extends Error { + constructor(message: string) { + super(message) + this.name = 'ContextWindowTooSmallError' + } +} + +function isContextWindowTooSmallError(err: unknown): err is ContextWindowTooSmallError { + return err instanceof ContextWindowTooSmallError || (err instanceof Error && err.name === 'ContextWindowTooSmallError') +} + function isSnapshotUsable( snapshot: { lastMessageIndex: number } | null, history: ChatMessage[], @@ -167,10 +178,10 @@ export async function buildCompressedHistory( emit: (event: string, payload: any) => void, sessionMap: Map, modelContext: { model?: string | null; provider?: string | null } = {}, + contextTokenEstimator?: (messages: ChatMessage[]) => Promise, ): Promise { try { let history = await buildDbHistory(sessionId, { excludeLastUser: true }) - if (history.length === 0) return [] const contextLength = getModelContextLength({ profile, @@ -185,7 +196,40 @@ export async function buildCompressedHistory( } const cState = getOrCreateSession(sessionMap, sessionId) const assembledTokens = await calcAndUpdateUsage(sessionId, cState, emit) - let totalTokens = assembledTokens.inputTokens + assembledTokens.outputTokens + const estimateFullContextTokens = async (messages: ChatMessage[], fallback: number) => { + try { + const estimate = await contextTokenEstimator?.(messages) + if (typeof estimate === 'number' && Number.isFinite(estimate) && estimate > 0) return Math.floor(estimate) + } catch (err) { + logger.warn(err, '[context-compress] session=%s: full context token estimate failed; using message-only estimate', sessionId) + } + return fallback + } + const emitContextUsage = (contextTokens: number) => { + cState.contextTokens = contextTokens + emit('usage.updated', { + event: 'usage.updated', + session_id: sessionId, + inputTokens: cState.inputTokens ?? assembledTokens.inputTokens, + outputTokens: cState.outputTokens ?? assembledTokens.outputTokens, + contextTokens, + }) + } + const messageOnlyTotalTokens = assembledTokens.inputTokens + assembledTokens.outputTokens + let totalTokens = messageOnlyTotalTokens + + if (history.length === 0) { + totalTokens = await estimateFullContextTokens([], 0) + if (totalTokens > triggerTokens) { + throw new ContextWindowTooSmallError( + `Context window is too small: system prompt and tool schemas already use ~${totalTokens} tokens, exceeding compression threshold ${triggerTokens}. Increase model context length, raise compression.threshold, or disable some tools.`, + ) + } + if (totalTokens > 0) emitContextUsage(totalTokens) + return [] + } + + const canCompressHistory = history.length > 4 const snapshot = getCompressionSnapshot(sessionId) const staleSnapshot = snapshot && !isSnapshotUsable(snapshot, history) if (staleSnapshot) { @@ -193,15 +237,40 @@ export async function buildCompressedHistory( sessionId, snapshot.lastMessageIndex, history.length) const staleHistory = buildSnapshotHistory(snapshot, history, compressionConfig.compressor) || history const staleUsage = estimateUsageTokensFromMessages(staleHistory) - totalTokens = staleUsage.inputTokens + staleUsage.outputTokens + totalTokens = await estimateFullContextTokens(staleHistory, staleUsage.inputTokens + staleUsage.outputTokens) + emitContextUsage(totalTokens) + logger.info({ + sessionId, + profile, + messages: staleHistory.length, + messageOnlyTokens: staleUsage.inputTokens + staleUsage.outputTokens, + fullContextTokens: totalTokens, + triggerTokens, + decision: totalTokens > triggerTokens ? 'compress' : 'skip', + snapshot: 'stale', + }, '[context-compress] threshold check') } if (snapshot && !staleSnapshot) { const newMessages = history.slice(snapshot.lastMessageIndex + 1) + const snapshotHistory = buildSnapshotHistory(snapshot, history, compressionConfig.compressor) || history + const snapshotUsage = estimateUsageTokensFromMessages(snapshotHistory) + totalTokens = await estimateFullContextTokens(snapshotHistory, snapshotUsage.inputTokens + snapshotUsage.outputTokens) + emitContextUsage(totalTokens) + logger.info({ + sessionId, + profile, + messages: snapshotHistory.length, + messageOnlyTokens: snapshotUsage.inputTokens + snapshotUsage.outputTokens, + fullContextTokens: totalTokens, + triggerTokens, + decision: totalTokens > triggerTokens ? 'compress' : 'skip', + snapshot: 'usable', + }, '[context-compress] threshold check') logger.info('[context-compress] session=%s: snapshot at %d, %d new messages, assembled ~%d tokens (threshold %d)', sessionId, snapshot.lastMessageIndex, newMessages.length, totalTokens, triggerTokens) if (totalTokens <= triggerTokens) { - history = buildSnapshotHistory(snapshot, history, compressionConfig.compressor) || history + history = snapshotHistory } else { history = await compressHistory(history, newMessages, sessionId, upstream, apiKey, cState, totalTokens, emit, sessionMap, modelContext, compressionConfig.compressor) } @@ -211,7 +280,24 @@ export async function buildCompressedHistory( } else { history = await compressHistory(history, null, sessionId, upstream, apiKey, cState, totalTokens, emit, sessionMap, modelContext, compressionConfig.compressor) } - } else if (history.length > 4) { + } else { + totalTokens = await estimateFullContextTokens(history, totalTokens) + emitContextUsage(totalTokens) + logger.info({ + sessionId, + profile, + messages: history.length, + messageOnlyTokens: messageOnlyTotalTokens, + fullContextTokens: totalTokens, + triggerTokens, + decision: totalTokens > triggerTokens ? 'compress' : 'skip', + snapshot: 'none', + }, '[context-compress] threshold check') + if (!canCompressHistory && totalTokens > triggerTokens) { + throw new ContextWindowTooSmallError( + `Context window is too small: fixed prompt/tool overhead plus ${history.length} history messages uses ~${totalTokens} tokens, exceeding compression threshold ${triggerTokens}, and there is not enough history to compress. Increase model context length, raise compression.threshold, or disable some tools.`, + ) + } if (totalTokens <= triggerTokens) { logger.info('[context-compress] session=%s: %d messages, ~%d tokens — under threshold, skip', sessionId, history.length, totalTokens) } else { @@ -221,6 +307,7 @@ export async function buildCompressedHistory( return history } catch (err) { + if (isContextWindowTooSmallError(err)) throw err logger.warn(err, '[chat-run-socket] failed to build compressed history for session %s', sessionId) return [] } @@ -310,6 +397,7 @@ export async function forceCompressBridgeHistory( sessionId: string, profile: string, _messages: ChatMessage[], + beforeTokenOverride?: number | null, ): Promise { const history = await buildDbHistory(sessionId, { excludeLastUser: true }) @@ -334,7 +422,9 @@ export async function forceCompressBridgeHistory( const contextLength = getModelContextLength({ profile, model: session?.model, provider: session?.provider }) const compressionConfig = await getRunChatCompressionConfig(session?.profile || profile, contextLength) const beforeUsage = estimateSnapshotAwareHistoryUsage(sessionId, history) - const totalTokens = beforeUsage.tokenCount + const totalTokens = typeof beforeTokenOverride === 'number' && Number.isFinite(beforeTokenOverride) && beforeTokenOverride > 0 + ? Math.floor(beforeTokenOverride) + : beforeUsage.tokenCount bridgeLogger.info({ sessionId, profile, diff --git a/packages/server/src/services/hermes/run-chat/handle-bridge-run.ts b/packages/server/src/services/hermes/run-chat/handle-bridge-run.ts index ea7e8ee..a2cbfd9 100644 --- a/packages/server/src/services/hermes/run-chat/handle-bridge-run.ts +++ b/packages/server/src/services/hermes/run-chat/handle-bridge-run.ts @@ -135,6 +135,26 @@ export async function handleBridgeRun( emit, sessionMap, { model: resolvedModel, provider: resolvedProvider }, + async (messages) => { + const estimate = await bridge.contextEstimate( + session_id, + messages, + fullInstructions, + profile, + { model: resolvedModel, provider: resolvedProvider }, + ) + bridgeLogger.info({ + sessionId: session_id, + profile, + model: resolvedModel, + provider: resolvedProvider, + messages: estimate.message_count, + toolCount: estimate.tool_count, + systemPromptChars: estimate.system_prompt_chars, + fullContextTokens: estimate.token_count, + }, '[chat-run-socket] full context estimate') + return estimate.token_count + }, ) const bridgeHistory = history @@ -315,9 +335,19 @@ async function applyBridgeChunkAsync( } else if (evType === 'bridge.compression.requested') { const bridgeHistory = await buildDbHistory(sessionId, { excludeLastUser: true }) const bridgeUsage = estimateUsageTokensFromMessages(bridgeHistory) - const tokenCount = bridgeHistory.length > 0 - ? bridgeUsage.inputTokens + bridgeUsage.outputTokens - : ev.approx_tokens + const messageOnlyTokens = bridgeUsage.inputTokens + bridgeUsage.outputTokens + const tokenCount = typeof ev.approx_tokens === 'number' && Number.isFinite(ev.approx_tokens) && ev.approx_tokens > 0 + ? ev.approx_tokens + : messageOnlyTokens + bridgeLogger.info({ + sessionId, + profile, + bridgeMessages: ev.message_count, + dbMessages: bridgeHistory.length, + messageOnlyTokens, + fullContextTokens: tokenCount, + source: typeof ev.approx_tokens === 'number' ? 'bridge' : 'message-only-fallback', + }, '[chat-run-socket] bridge compression token estimate') const payload = { event: 'compression.started', run_id: chunk.run_id, @@ -334,6 +364,7 @@ async function applyBridgeChunkAsync( sessionId, profile, ev.messages as ChatMessage[], + typeof ev.approx_tokens === 'number' ? ev.approx_tokens : undefined, ) state.bridgeCompressionResults = state.bridgeCompressionResults || {} state.bridgeCompressionResults[String(ev.request_id)] = compressed @@ -357,7 +388,9 @@ async function applyBridgeChunkAsync( totalMessages: compressionResult?.beforeMessages ?? ev.message_count, resultMessages: compressionResult?.resultMessages ?? ev.result_messages, beforeTokens: compressionResult?.beforeTokens ?? ev.approx_tokens, - afterTokens: compressionResult?.afterTokens, + afterTokens: typeof ev.result_approx_tokens === 'number' && Number.isFinite(ev.result_approx_tokens) && ev.result_approx_tokens > 0 + ? ev.result_approx_tokens + : compressionResult?.afterTokens, summaryTokens: compressionResult?.summaryTokens, verbatimCount: compressionResult?.verbatimCount, compressedStartIndex: compressionResult?.compressedStartIndex, diff --git a/packages/server/src/services/hermes/run-chat/index.ts b/packages/server/src/services/hermes/run-chat/index.ts index 839ff43..32a12ae 100644 --- a/packages/server/src/services/hermes/run-chat/index.ts +++ b/packages/server/src/services/hermes/run-chat/index.ts @@ -266,6 +266,7 @@ export class ChatRunSocket { events: state.isWorking ? state.events : [], inputTokens: state.inputTokens, outputTokens: state.outputTokens, + contextTokens: state.contextTokens, queueLength: state.queue?.length || 0, }) diff --git a/packages/server/src/services/hermes/run-chat/types.ts b/packages/server/src/services/hermes/run-chat/types.ts index cb52513..a843b22 100644 --- a/packages/server/src/services/hermes/run-chat/types.ts +++ b/packages/server/src/services/hermes/run-chat/types.ts @@ -46,6 +46,7 @@ export interface SessionState { profile?: string inputTokens?: number outputTokens?: number + contextTokens?: number isAborting?: boolean queue: QueuedRun[] responseRun?: ResponseRunState diff --git a/tests/server/context-engine.test.ts b/tests/server/context-engine.test.ts index 8dba8e5..f782a91 100644 --- a/tests/server/context-engine.test.ts +++ b/tests/server/context-engine.test.ts @@ -203,6 +203,114 @@ describe('ContextEngine.buildContext', () => { expect(mockSummarize).not.toHaveBeenCalled() }) + it('records full context token estimates without compressing when under threshold', async () => { + const messages = makeMessages(3) + mockFetcher.getMessages = vi.fn().mockReturnValue(messages) + const contextTokenEstimator = vi.fn().mockResolvedValue(19_379) + + const result = await engine.buildContext({ + roomId: 'room-1', + agentId: 'agent-1', + agentName: 'Claude', + agentDescription: 'Helper', + agentSocketId: 'agent-socket', + roomName: 'general', + memberNames: ['Alice'], + members: [{ userId: 'u1', name: 'Alice', description: '' }], + upstream: 'http://localhost:8642', + apiKey: null, + currentMessage: messages[messages.length - 1], + contextTokenEstimator, + }) + + expect(result.meta.compressed).toBe(false) + expect(result.meta.contextTokenEstimate).toBe(19_379) + expect(result.meta.messageTokenEstimate).toBeGreaterThan(0) + expect(contextTokenEstimator).toHaveBeenCalledWith( + expect.arrayContaining([{ role: 'assistant', content: expect.stringContaining('[Claude]') }]), + expect.stringContaining('"Claude"'), + ) + expect(mockSummarize).not.toHaveBeenCalled() + }) + + it('uses full context token estimates to trigger group compression', async () => { + const messages = makeMessages(20) + mockFetcher.getMessages = vi.fn().mockReturnValue(messages) + + const result = await engine.buildContext({ + roomId: 'room-1', + agentId: 'agent-1', + agentName: 'Claude', + agentDescription: 'Helper', + agentSocketId: 'agent-socket', + roomName: 'general', + memberNames: [], + members: [], + upstream: 'http://localhost:8642', + apiKey: null, + currentMessage: messages[messages.length - 1], + contextTokenEstimator: vi.fn().mockResolvedValue(120_000), + }) + + expect(result.meta.compressed).toBe(true) + expect(result.meta.contextTokenEstimate).toBe(120_000) + expect(mockSummarize).toHaveBeenCalledTimes(1) + expect(mockFetcher.saveContextSnapshot).toHaveBeenCalledTimes(1) + }) + + it('throws when group prompt and tools exceed threshold with too little history to compress', async () => { + const messages = makeMessages(4) + mockFetcher.getMessages = vi.fn().mockReturnValue(messages) + + await expect(engine.buildContext({ + roomId: 'room-1', + agentId: 'agent-1', + agentName: 'Claude', + agentDescription: 'Helper', + agentSocketId: 'agent-socket', + roomName: 'general', + memberNames: [], + members: [], + upstream: 'http://localhost:8642', + apiKey: null, + currentMessage: messages[messages.length - 1], + contextTokenEstimator: vi.fn().mockResolvedValue(120_000), + })).rejects.toThrow('Context window is too small') + + expect(mockSummarize).not.toHaveBeenCalled() + expect(mockFetcher.saveContextSnapshot).not.toHaveBeenCalled() + }) + + it('throws on snapshot path when overhead plus new messages exceed threshold without compressible history', async () => { + const messages = makeMessages(12) + mockFetcher.getMessages = vi.fn().mockReturnValue(messages) + mockFetcher.getContextSnapshot = vi.fn().mockReturnValue({ + roomId: 'room-1', + summary: 'Existing summary', + lastMessageId: 'msg-9', + lastMessageTimestamp: messages[9].timestamp, + updatedAt: Date.now(), + }) + + await expect(engine.buildContext({ + roomId: 'room-1', + agentId: 'agent-1', + agentName: 'Claude', + agentDescription: 'Helper', + agentSocketId: 'agent-socket', + roomName: 'general', + memberNames: [], + members: [], + upstream: 'http://localhost:8642', + apiKey: null, + currentMessage: messages[messages.length - 1], + contextTokenEstimator: vi.fn().mockResolvedValue(120_000), + })).rejects.toThrow('Context window is too small') + + expect(mockSummarize).not.toHaveBeenCalled() + expect(mockFetcher.saveContextSnapshot).not.toHaveBeenCalled() + }) + it('splits into head/tail and compresses middle when over threshold', async () => { const messages = makeMessages(20) mockFetcher.getMessages = vi.fn().mockReturnValue(messages) diff --git a/tests/server/group-chat-member-sync.test.ts b/tests/server/group-chat-member-sync.test.ts index 5be2731..6b144b5 100644 --- a/tests/server/group-chat-member-sync.test.ts +++ b/tests/server/group-chat-member-sync.test.ts @@ -27,6 +27,7 @@ vi.mock('../../packages/server/src/services/auth', () => ({ })) import { AgentClients } from '../../packages/server/src/services/hermes/group-chat/agent-clients' +import { GroupChatServer } from '../../packages/server/src/services/hermes/group-chat' import { groupChatRoutes, setGroupChatServer } from '../../packages/server/src/routes/hermes/group-chat' function routeHandler(path: string, method: string) { @@ -222,4 +223,37 @@ describe('Group Chat member/agent identity sync', () => { members: [{ id: 'member-1', userId: 'human-1', name: 'Han', description: '', joinedAt: 1 }], }) }) + + it('routes @mentions only from user messages, not agent replies', () => { + const server = Object.create(GroupChatServer.prototype) as any + const emit = vi.fn() + server.rooms = new Map([ + ['room-1', { + hasOnlineMember: vi.fn(() => true), + getOnlineMemberBySocketId: vi.fn((socketId: string) => socketId === 'agent-socket' + ? { userId: 'agent-1', name: '丫鬟' } + : { userId: 'human-1', name: 'Human' }), + }], + ]) + server.socketUserMap = new Map([ + ['human-socket', 'human-1'], + ['agent-socket', 'agent-1'], + ]) + server.userInfoMap = new Map([ + ['human-1', { name: 'Human', description: '' }], + ['agent-1', { name: '丫鬟', description: '' }], + ]) + server.agentClients = { processMentions: vi.fn(async () => undefined) } + server.storage = { + saveMessageAndRefreshRoom: vi.fn((msg: any) => ({ message: msg, totalTokens: 123 })), + } + server.nsp = { to: vi.fn(() => ({ emit })) } + + server.handleMessage({ id: 'human-socket' }, { roomId: 'room-1', content: '@all hi', role: 'user' }, vi.fn()) + expect(server.agentClients.processMentions).toHaveBeenCalledTimes(1) + + server.agentClients.processMentions.mockClear() + server.handleMessage({ id: 'agent-socket' }, { roomId: 'room-1', content: '@all agent says hi', role: 'assistant', mentionDepth: 1 }, vi.fn()) + expect(server.agentClients.processMentions).not.toHaveBeenCalled() + }) }) diff --git a/tests/server/run-chat-compression.test.ts b/tests/server/run-chat-compression.test.ts index 182b55e..a702945 100644 --- a/tests/server/run-chat-compression.test.ts +++ b/tests/server/run-chat-compression.test.ts @@ -162,6 +162,144 @@ describe('run chat compression trigger', () => { ) }) + it('uses full context estimates for compression threshold decisions', async () => { + const messages = Array.from({ length: 10 }, (_, index) => ({ + id: index + 1, + session_id: 'session-1', + role: index === 9 ? 'user' : index % 2 === 0 ? 'user' : 'assistant', + content: `message ${index}`, + timestamp: index + 1, + tool_call_id: null, + tool_calls: null, + tool_name: null, + finish_reason: null, + reasoning_content: null, + })) + getSessionDetailMock.mockReturnValue({ messages }) + calcAndUpdateUsageMock.mockResolvedValue({ inputTokens: 1_000, outputTokens: 0 }) + compressorCompressMock.mockResolvedValue({ + messages: [{ role: 'user', content: 'compressed by full context estimate' }], + meta: { + compressed: true, + llmCompressed: true, + totalMessages: 9, + summaryTokenEstimate: 1, + verbatimCount: 0, + compressedStartIndex: 0, + }, + }) + + const { buildCompressedHistory } = await import('../../packages/server/src/services/hermes/run-chat/compression') + const history = await buildCompressedHistory( + 'session-1', + 'default', + 'http://upstream', + undefined, + vi.fn(), + new Map(), + {}, + vi.fn(async () => 120_000), + ) + + expect(history).toEqual([{ role: 'user', content: 'compressed by full context estimate' }]) + expect(compressorCompressMock).toHaveBeenCalledTimes(1) + }) + + it('emits full context token usage when the full estimate is under threshold', async () => { + const messages = Array.from({ length: 10 }, (_, index) => ({ + id: index + 1, + session_id: 'session-1', + role: index === 9 ? 'user' : index % 2 === 0 ? 'user' : 'assistant', + content: `message ${index}`, + timestamp: index + 1, + tool_call_id: null, + tool_calls: null, + tool_name: null, + finish_reason: null, + reasoning_content: null, + })) + getSessionDetailMock.mockReturnValue({ messages }) + calcAndUpdateUsageMock.mockResolvedValue({ inputTokens: 1_000, outputTokens: 900 }) + const emit = vi.fn() + const contextTokenEstimator = vi.fn(async () => 19_379) + + const { buildCompressedHistory } = await import('../../packages/server/src/services/hermes/run-chat/compression') + const history = await buildCompressedHistory( + 'session-1', + 'default', + 'http://upstream', + undefined, + emit, + new Map(), + {}, + contextTokenEstimator, + ) + + expect(history).toHaveLength(9) + expect(contextTokenEstimator).toHaveBeenCalledWith(expect.arrayContaining([{ role: 'user', content: 'message 0' }])) + expect(emit).toHaveBeenCalledWith('usage.updated', expect.objectContaining({ + event: 'usage.updated', + session_id: 'session-1', + inputTokens: 1_000, + outputTokens: 900, + contextTokens: 19_379, + })) + expect(compressorCompressMock).not.toHaveBeenCalled() + }) + + it('throws when fixed prompt and tool schemas exceed threshold before any history exists', async () => { + getSessionDetailMock.mockReturnValue({ messages: [] }) + const emit = vi.fn() + + const { buildCompressedHistory, ContextWindowTooSmallError } = await import('../../packages/server/src/services/hermes/run-chat/compression') + + await expect(buildCompressedHistory( + 'session-1', + 'default', + 'http://upstream', + undefined, + emit, + new Map(), + {}, + vi.fn(async () => 120_000), + )).rejects.toBeInstanceOf(ContextWindowTooSmallError) + + expect(emit).not.toHaveBeenCalledWith('usage.updated', expect.anything()) + expect(compressorCompressMock).not.toHaveBeenCalled() + }) + + it('throws instead of compressing when full context is over threshold but history is too short', async () => { + const messages = Array.from({ length: 5 }, (_, index) => ({ + id: index + 1, + session_id: 'session-1', + role: index === 4 ? 'user' : index % 2 === 0 ? 'user' : 'assistant', + content: `message ${index}`, + timestamp: index + 1, + tool_call_id: null, + tool_calls: null, + tool_name: null, + finish_reason: null, + reasoning_content: null, + })) + getSessionDetailMock.mockReturnValue({ messages }) + calcAndUpdateUsageMock.mockResolvedValue({ inputTokens: 1_000, outputTokens: 0 }) + + const { buildCompressedHistory, ContextWindowTooSmallError } = await import('../../packages/server/src/services/hermes/run-chat/compression') + + await expect(buildCompressedHistory( + 'session-1', + 'default', + 'http://upstream', + undefined, + vi.fn(), + new Map(), + {}, + vi.fn(async () => 120_000), + )).rejects.toBeInstanceOf(ContextWindowTooSmallError) + + expect(compressorCompressMock).not.toHaveBeenCalled() + }) + it('merges partial compression config with defaults', async () => { const messages = Array.from({ length: 10 }, (_, index) => ({ id: index + 1,