|
|
|
@@ -10,6 +10,7 @@ import type {
|
|
|
|
|
import { DEFAULT_COMPRESSION_CONFIG } from './types'
|
|
|
|
|
import { GatewaySummarizer } from './gateway-client'
|
|
|
|
|
import { buildAgentInstructions, buildSummarizationSystemPrompt } from './prompt'
|
|
|
|
|
import { logger } from '../../../services/logger'
|
|
|
|
|
|
|
|
|
|
export class ContextEngine {
|
|
|
|
|
private config: CompressionConfig
|
|
|
|
@@ -78,7 +79,7 @@ export class ContextEngine {
|
|
|
|
|
const messages = allMessages.filter(m => m.timestamp <= input.currentMessage.timestamp)
|
|
|
|
|
const total = messages.length
|
|
|
|
|
|
|
|
|
|
console.log(`[ContextEngine] buildContext START — room=${input.roomId}, agent=${input.agentName}, totalMessagesInDb=${allMessages.length}, afterFilter=${total}`)
|
|
|
|
|
logger.debug(`[ContextEngine] buildContext START — room=${input.roomId}, agent=${input.agentName}, totalMessagesInDb=${allMessages.length}, afterFilter=${total}`)
|
|
|
|
|
|
|
|
|
|
const instructions = buildAgentInstructions({
|
|
|
|
|
agentName: input.agentName,
|
|
|
|
@@ -97,7 +98,7 @@ export class ContextEngine {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const snapshot = this.messageFetcher.getContextSnapshot(input.roomId)
|
|
|
|
|
console.log(`[ContextEngine] snapshot=${snapshot ? `EXISTS (lastMsgId=${snapshot.lastMessageId}, summaryLen=${snapshot.summary.length})` : 'NONE'}`)
|
|
|
|
|
logger.debug(`[ContextEngine] snapshot=${snapshot ? `EXISTS (lastMsgId=${snapshot.lastMessageId}, summaryLen=${snapshot.summary.length})` : 'NONE'}`)
|
|
|
|
|
|
|
|
|
|
// ── Path A: Snapshot exists — incremental ────────────
|
|
|
|
|
if (snapshot) {
|
|
|
|
@@ -117,23 +118,23 @@ export class ContextEngine {
|
|
|
|
|
meta.verbatimCount = newMessages.length
|
|
|
|
|
meta.summaryTokenEstimate = summaryTokens
|
|
|
|
|
|
|
|
|
|
console.log(`[ContextEngine] [Path A] snapshotIdx=${snapshotIdx}, newMessages=${newMessages.length}, summaryTokens=~${summaryTokens}, newTokens=~${newTokens}, totalTokens=~${totalTokens}, threshold=${config.triggerTokens}`)
|
|
|
|
|
console.log(`[ContextEngine] [Path A] EXISTING SUMMARY (${snapshot.summary.length} chars):`, snapshot.summary.slice(0, 300))
|
|
|
|
|
logger.debug(`[ContextEngine] [Path A] snapshotIdx=${snapshotIdx}, newMessages=${newMessages.length}, summaryTokens=~${summaryTokens}, newTokens=~${newTokens}, totalTokens=~${totalTokens}, threshold=${config.triggerTokens}`)
|
|
|
|
|
logger.debug(`[ContextEngine] [Path A] EXISTING SUMMARY (${snapshot.summary.length} chars): ${snapshot.summary.slice(0, 300)}`)
|
|
|
|
|
if (newMessages.length > 0) {
|
|
|
|
|
console.log(`[ContextEngine] [Path A] NEW MESSAGES (${newMessages.length}):`, newMessages.map(m => `[${m.senderName}]: ${m.content.slice(0, 80)}`).join(' | '))
|
|
|
|
|
logger.debug(`[ContextEngine] [Path A] NEW MESSAGES (${newMessages.length}): ${newMessages.map(m => `[${m.senderName}]: ${m.content.slice(0, 80)}`).join(' | ')}`)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Under threshold — return summary + new messages directly
|
|
|
|
|
if (totalTokens <= config.triggerTokens) {
|
|
|
|
|
console.log(`[ContextEngine] [Path A] UNDER threshold — return summary + ${newMessages.length} verbatim msgs directly`)
|
|
|
|
|
logger.debug(`[ContextEngine] [Path A] UNDER threshold — return summary + ${newMessages.length} verbatim msgs directly`)
|
|
|
|
|
const history = this.buildHistory(snapshot.summary, newMessages, input.agentSocketId)
|
|
|
|
|
this.logHistory('Path A (no compress)', history)
|
|
|
|
|
return { conversationHistory: history, instructions, meta }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Over threshold — incremental compress
|
|
|
|
|
console.log(`[ContextEngine] [Path A] OVER threshold — starting INCREMENTAL compression of ${newMessages.length} msgs...`)
|
|
|
|
|
console.log(`[ContextEngine] [Path A] CONTEXT BEFORE COMPRESSION: summary(${snapshot.summary.length} chars) + ${newMessages.length} new msgs`)
|
|
|
|
|
logger.debug(`[ContextEngine] [Path A] OVER threshold — starting INCREMENTAL compression of ${newMessages.length} msgs...`)
|
|
|
|
|
logger.debug(`[ContextEngine] [Path A] CONTEXT BEFORE COMPRESSION: summary(${snapshot.summary.length} chars) + ${newMessages.length} new msgs`)
|
|
|
|
|
meta.compressed = true
|
|
|
|
|
|
|
|
|
|
const t0 = Date.now()
|
|
|
|
@@ -151,8 +152,8 @@ export class ContextEngine {
|
|
|
|
|
this.messageFetcher.saveContextSnapshot(input.roomId, result.summary, lastMsg.id, lastMsg.timestamp)
|
|
|
|
|
|
|
|
|
|
meta.summaryTokenEstimate = this.countTokens(result.summary)
|
|
|
|
|
console.log(`[ContextEngine] [Path A] incremental compression DONE in ${elapsed}ms, newSummaryLen=${result.summary.length}, newLastMsgId=${lastMsg.id}`)
|
|
|
|
|
console.log(`[ContextEngine] [Path A] NEW SUMMARY (${result.summary.length} chars):`, result.summary.slice(0, 300))
|
|
|
|
|
logger.debug(`[ContextEngine] [Path A] incremental compression DONE in ${elapsed}ms, newSummaryLen=${result.summary.length}, newLastMsgId=${lastMsg.id}`)
|
|
|
|
|
logger.debug(`[ContextEngine] [Path A] NEW SUMMARY (${result.summary.length} chars): ${result.summary.slice(0, 300)}`)
|
|
|
|
|
const history = this.buildHistory(result.summary, newMessages, input.agentSocketId)
|
|
|
|
|
this.logHistory('Path A (after incremental compress)', history)
|
|
|
|
|
if (result.sessionId) this.sessionCleaner?.(result.sessionId)
|
|
|
|
@@ -160,7 +161,7 @@ export class ContextEngine {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Compression failed — degrade
|
|
|
|
|
console.warn(`[ContextEngine] [Path A] incremental compression FAILED (${elapsed}ms) — degrading to summary + trimmed verbatim`)
|
|
|
|
|
logger.warn(`[ContextEngine] [Path A] incremental compression FAILED (${elapsed}ms) — degrading to summary + trimmed verbatim`)
|
|
|
|
|
const history = this.buildHistory(snapshot.summary, newMessages, input.agentSocketId)
|
|
|
|
|
this.trimToBudget(history, summaryTokens, config.maxHistoryTokens)
|
|
|
|
|
return { conversationHistory: history, instructions, meta }
|
|
|
|
@@ -170,19 +171,19 @@ export class ContextEngine {
|
|
|
|
|
const totalTokens = this.estimateTokensFromMessages(messages)
|
|
|
|
|
meta.verbatimCount = total
|
|
|
|
|
|
|
|
|
|
console.log(`[ContextEngine] [Path B] no snapshot, totalMessages=${total}, totalTokens=~${totalTokens}, threshold=${config.triggerTokens}`)
|
|
|
|
|
logger.debug(`[ContextEngine] [Path B] no snapshot, totalMessages=${total}, totalTokens=~${totalTokens}, threshold=${config.triggerTokens}`)
|
|
|
|
|
|
|
|
|
|
// Under threshold — pass all messages verbatim
|
|
|
|
|
if (totalTokens <= config.triggerTokens) {
|
|
|
|
|
console.log(`[ContextEngine] [Path B] UNDER threshold — return all ${total} msgs verbatim`)
|
|
|
|
|
logger.debug(`[ContextEngine] [Path B] UNDER threshold — return all ${total} msgs verbatim`)
|
|
|
|
|
const history = messages.map(m => this.mapToHistory(m, input.agentSocketId))
|
|
|
|
|
this.logHistory('Path B (no compress)', history)
|
|
|
|
|
return { conversationHistory: history, instructions, meta }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Over threshold — full compress
|
|
|
|
|
console.log(`[ContextEngine] [Path B] OVER threshold — starting FULL compression of ${total} msgs...`)
|
|
|
|
|
console.log(`[ContextEngine] [Path B] CONTEXT BEFORE COMPRESSION: ${total} msgs, ~${totalTokens} tokens`)
|
|
|
|
|
logger.debug(`[ContextEngine] [Path B] OVER threshold — starting FULL compression of ${total} msgs...`)
|
|
|
|
|
logger.debug(`[ContextEngine] [Path B] CONTEXT BEFORE COMPRESSION: ${total} msgs, ~${totalTokens} tokens`)
|
|
|
|
|
meta.compressed = true
|
|
|
|
|
|
|
|
|
|
const t0 = Date.now()
|
|
|
|
@@ -204,8 +205,8 @@ export class ContextEngine {
|
|
|
|
|
this.messageFetcher.saveContextSnapshot(input.roomId, result.summary, lastCompressedMsg.id, lastCompressedMsg.timestamp)
|
|
|
|
|
|
|
|
|
|
meta.summaryTokenEstimate = this.countTokens(result.summary)
|
|
|
|
|
console.log(`[ContextEngine] [Path B] full compression DONE in ${elapsed}ms, summaryLen=${result.summary.length}, compressed=${toCompress.length} msgs, keptTail=${tail.length} msgs, savedLastMsgId=${lastCompressedMsg.id}`)
|
|
|
|
|
console.log(`[ContextEngine] [Path B] COMPRESSED SUMMARY (${result.summary.length} chars):`, result.summary.slice(0, 300))
|
|
|
|
|
logger.debug(`[ContextEngine] [Path B] full compression DONE in ${elapsed}ms, summaryLen=${result.summary.length}, compressed=${toCompress.length} msgs, keptTail=${tail.length} msgs, savedLastMsgId=${lastCompressedMsg.id}`)
|
|
|
|
|
logger.debug(`[ContextEngine] [Path B] COMPRESSED SUMMARY (${result.summary.length} chars): ${result.summary.slice(0, 300)}`)
|
|
|
|
|
const history = this.buildHistory(result.summary, tail, input.agentSocketId)
|
|
|
|
|
this.logHistory('Path B (after full compress)', history)
|
|
|
|
|
if (result.sessionId) this.sessionCleaner?.(result.sessionId)
|
|
|
|
@@ -213,7 +214,7 @@ export class ContextEngine {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Compression failed — degrade
|
|
|
|
|
console.warn(`[ContextEngine] [Path B] full compression FAILED (${elapsed}ms) — degrading to trimmed verbatim`)
|
|
|
|
|
logger.warn(`[ContextEngine] [Path B] full compression FAILED (${elapsed}ms) — degrading to trimmed verbatim`)
|
|
|
|
|
const history = messages.map(m => this.mapToHistory(m, input.agentSocketId))
|
|
|
|
|
this.trimToBudget(history, 0, config.maxHistoryTokens)
|
|
|
|
|
meta.verbatimCount = history.length
|
|
|
|
@@ -233,7 +234,7 @@ export class ContextEngine {
|
|
|
|
|
if (allMessages.length === 0) return ''
|
|
|
|
|
|
|
|
|
|
const config = { ...this.config }
|
|
|
|
|
console.log(`[ContextEngine] forceCompress room=${roomId}, messages=${allMessages.length}`)
|
|
|
|
|
logger.debug(`[ContextEngine] forceCompress room=${roomId}, messages=${allMessages.length}`)
|
|
|
|
|
|
|
|
|
|
const t0 = Date.now()
|
|
|
|
|
const result = await this.summarize(roomId, allMessages, this._upstream, this._apiKey)
|
|
|
|
@@ -246,7 +247,7 @@ export class ContextEngine {
|
|
|
|
|
const lastCompressedMsg = toCompress[toCompress.length - 1]
|
|
|
|
|
|
|
|
|
|
this.messageFetcher.saveContextSnapshot(roomId, result.summary, lastCompressedMsg.id, lastCompressedMsg.timestamp)
|
|
|
|
|
console.log(`[ContextEngine] forceCompress DONE in ${elapsed}ms`)
|
|
|
|
|
logger.debug(`[ContextEngine] forceCompress DONE in ${elapsed}ms`)
|
|
|
|
|
if (result.sessionId) this.sessionCleaner?.(result.sessionId)
|
|
|
|
|
return result.summary
|
|
|
|
|
}
|
|
|
|
@@ -299,7 +300,7 @@ export class ContextEngine {
|
|
|
|
|
)
|
|
|
|
|
return { summary: result.summary, sessionId: result.sessionId }
|
|
|
|
|
} catch (err: any) {
|
|
|
|
|
console.warn(`[ContextEngine] Summarization failed for room ${roomId}: ${err.message}`)
|
|
|
|
|
logger.warn(`[ContextEngine] Summarization failed for room ${roomId}: ${err.message}`)
|
|
|
|
|
return { summary: null, sessionId: null }
|
|
|
|
|
} finally {
|
|
|
|
|
// Session cleanup handled here if sessionCleaner is provided
|
|
|
|
@@ -348,10 +349,10 @@ export class ContextEngine {
|
|
|
|
|
/** Log assembled history for debugging */
|
|
|
|
|
private logHistory(label: string, history: Array<{ role: string; content: string }>): void {
|
|
|
|
|
const totalTokens = this.estimateTokens(history)
|
|
|
|
|
console.log(`[ContextEngine] ASSEMBLED HISTORY (${label}): ${history.length} entries, ~${totalTokens} tokens`)
|
|
|
|
|
logger.debug(`[ContextEngine] ASSEMBLED HISTORY (${label}): ${history.length} entries, ~${totalTokens} tokens`)
|
|
|
|
|
for (const entry of history) {
|
|
|
|
|
const preview = entry.content.length > 150 ? entry.content.slice(0, 150) + '...' : entry.content
|
|
|
|
|
console.log(` [${entry.role}] ${preview}`)
|
|
|
|
|
logger.debug(` [${entry.role}] ${preview}`)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|