[codex] Harden context compression history handling (#848)
* Use token threshold for chat compression * Add compression settings controls * Use config for chat compression * Cover protected messages in compression tests * Remove message-count compression limit * Harden compression window fallback * Rebuild stale compression snapshots * Harden stale compression snapshots * Update changelog for compression hardening * Prefer local history session details
This commit is contained in:
@@ -10,14 +10,88 @@ import {
|
||||
import { getCompressionSnapshot } from '../../../db/hermes/compression-snapshot'
|
||||
import { ChatContextCompressor, SUMMARY_PREFIX } from '../../../lib/context-compressor'
|
||||
import { getModelContextLength } from '../model-context'
|
||||
import { readConfigYamlForProfile } from '../../config-helpers'
|
||||
import { logger } from '../../logger'
|
||||
import { bridgeLogger } from '../../logger'
|
||||
import { calcAndUpdateUsage, estimateUsageTokensFromMessages } from './usage'
|
||||
import { isAssistantMessageSendable } from './message-format'
|
||||
import type { ChatMessage } from '../../../lib/context-compressor'
|
||||
import type { ChatMessage, CompressionConfig as CompressorConfig } from '../../../lib/context-compressor'
|
||||
import type { SessionState, BridgeCompressionResult } from './types'
|
||||
|
||||
const compressor = new ChatContextCompressor()
|
||||
interface RunChatCompressionConfig {
|
||||
enabled: boolean
|
||||
triggerTokens: number
|
||||
compressor: Partial<CompressorConfig>
|
||||
}
|
||||
|
||||
function isSnapshotUsable(
|
||||
snapshot: { lastMessageIndex: number } | null,
|
||||
history: ChatMessage[],
|
||||
): boolean {
|
||||
return !!snapshot && snapshot.lastMessageIndex >= 0 && snapshot.lastMessageIndex < history.length
|
||||
}
|
||||
|
||||
function buildSnapshotHistory(
|
||||
snapshot: { summary: string; lastMessageIndex: number } | null,
|
||||
history: ChatMessage[],
|
||||
compressionConfig?: Partial<CompressorConfig>,
|
||||
): ChatMessage[] | null {
|
||||
if (!snapshot) return null
|
||||
const headCount = compressionConfig?.headMessageCount || 0
|
||||
const tailCount = compressionConfig?.tailMessageCount || 0
|
||||
const protectedHead = headCount > 0 ? history.slice(0, headCount) : []
|
||||
const summaryMessage = { role: 'user', content: SUMMARY_PREFIX + '\n\n' + snapshot.summary } as ChatMessage
|
||||
|
||||
if (isSnapshotUsable(snapshot, history)) {
|
||||
return [
|
||||
...protectedHead,
|
||||
summaryMessage,
|
||||
...history.slice(snapshot.lastMessageIndex + 1),
|
||||
]
|
||||
}
|
||||
|
||||
const tailStart = Math.max(protectedHead.length, history.length - tailCount)
|
||||
return [
|
||||
...protectedHead,
|
||||
summaryMessage,
|
||||
...history.slice(tailStart),
|
||||
]
|
||||
}
|
||||
|
||||
function clampRatio(value: unknown, fallback: number, min: number, max: number): number {
|
||||
const n = typeof value === 'number' && Number.isFinite(value) ? value : fallback
|
||||
return Math.min(max, Math.max(min, n))
|
||||
}
|
||||
|
||||
function clampInt(value: unknown, fallback: number, min: number, max: number): number {
|
||||
const n = typeof value === 'number' && Number.isFinite(value) ? Math.floor(value) : fallback
|
||||
return Math.min(max, Math.max(min, n))
|
||||
}
|
||||
|
||||
async function getRunChatCompressionConfig(profile: string, contextLength: number): Promise<RunChatCompressionConfig> {
|
||||
let raw: Record<string, any> = {}
|
||||
try {
|
||||
raw = (await readConfigYamlForProfile(profile))?.compression || {}
|
||||
} catch (err) {
|
||||
logger.warn(err, '[context-compress] failed to read compression config for profile %s, using defaults', profile)
|
||||
}
|
||||
|
||||
const threshold = clampRatio(raw.threshold, 0.5, 0.05, 0.95)
|
||||
const targetRatio = clampRatio(raw.target_ratio, 0.2, 0.01, 0.8)
|
||||
const protectLastN = clampInt(raw.protect_last_n, 20, 0, 500)
|
||||
const protectFirstN = clampInt(raw.protect_first_n, 3, 0, 100)
|
||||
|
||||
return {
|
||||
enabled: raw.enabled !== false,
|
||||
triggerTokens: Math.floor(contextLength * threshold),
|
||||
compressor: {
|
||||
triggerTokens: Math.floor(contextLength * threshold),
|
||||
summaryBudget: Math.max(1_000, Math.floor(contextLength * targetRatio)),
|
||||
headMessageCount: protectFirstN,
|
||||
tailMessageCount: protectLastN,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Load conversation history from DB with full message structure (user/assistant/tool).
|
||||
@@ -77,12 +151,7 @@ export function estimateSnapshotAwareHistoryUsage(
|
||||
history: ChatMessage[],
|
||||
): { messageCount: number; tokenCount: number } {
|
||||
const snapshot = getCompressionSnapshot(sessionId)
|
||||
const messages = snapshot
|
||||
? [
|
||||
{ role: 'user', content: SUMMARY_PREFIX + snapshot.summary },
|
||||
...history.slice(snapshot.lastMessageIndex + 1),
|
||||
]
|
||||
: history
|
||||
const messages = buildSnapshotHistory(snapshot, history) || history
|
||||
const usage = estimateUsageTokensFromMessages(messages)
|
||||
return {
|
||||
messageCount: messages.length,
|
||||
@@ -108,29 +177,45 @@ export async function buildCompressedHistory(
|
||||
model: modelContext.model,
|
||||
provider: modelContext.provider,
|
||||
})
|
||||
const triggerTokens = Math.floor(contextLength / 2)
|
||||
const compressionConfig = await getRunChatCompressionConfig(profile, contextLength)
|
||||
const triggerTokens = compressionConfig.triggerTokens
|
||||
if (!compressionConfig.enabled) {
|
||||
logger.info('[context-compress] session=%s: compression disabled by config', sessionId)
|
||||
return history
|
||||
}
|
||||
const cState = getOrCreateSession(sessionMap, sessionId)
|
||||
const assembledTokens = await calcAndUpdateUsage(sessionId, cState, emit)
|
||||
const totalTokens = assembledTokens.inputTokens + assembledTokens.outputTokens
|
||||
let totalTokens = assembledTokens.inputTokens + assembledTokens.outputTokens
|
||||
const snapshot = getCompressionSnapshot(sessionId)
|
||||
const staleSnapshot = snapshot && !isSnapshotUsable(snapshot, history)
|
||||
if (staleSnapshot) {
|
||||
logger.warn('[context-compress] session=%s: stale snapshot index %d for %d history messages; using summary plus safe tail',
|
||||
sessionId, snapshot.lastMessageIndex, history.length)
|
||||
const staleHistory = buildSnapshotHistory(snapshot, history, compressionConfig.compressor) || history
|
||||
const staleUsage = estimateUsageTokensFromMessages(staleHistory)
|
||||
totalTokens = staleUsage.inputTokens + staleUsage.outputTokens
|
||||
}
|
||||
|
||||
if (snapshot) {
|
||||
if (snapshot && !staleSnapshot) {
|
||||
const newMessages = history.slice(snapshot.lastMessageIndex + 1)
|
||||
logger.info('[context-compress] session=%s: snapshot at %d, %d new messages, assembled ~%d tokens (threshold %d)',
|
||||
sessionId, snapshot.lastMessageIndex, newMessages.length, totalTokens, triggerTokens)
|
||||
if (totalTokens <= triggerTokens && newMessages.length <= 150) {
|
||||
history = [
|
||||
{ role: 'user', content: SUMMARY_PREFIX + '\n\n' + snapshot.summary },
|
||||
...newMessages,
|
||||
] as ChatMessage[]
|
||||
if (totalTokens <= triggerTokens) {
|
||||
history = buildSnapshotHistory(snapshot, history, compressionConfig.compressor) || history
|
||||
} else {
|
||||
history = await compressHistory(history, newMessages, sessionId, upstream, apiKey, cState, totalTokens, emit, sessionMap, modelContext)
|
||||
history = await compressHistory(history, newMessages, sessionId, upstream, apiKey, cState, totalTokens, emit, sessionMap, modelContext, compressionConfig.compressor)
|
||||
}
|
||||
} else if (snapshot && staleSnapshot) {
|
||||
if (totalTokens <= triggerTokens) {
|
||||
history = buildSnapshotHistory(snapshot, history, compressionConfig.compressor) || history
|
||||
} else {
|
||||
history = await compressHistory(history, null, sessionId, upstream, apiKey, cState, totalTokens, emit, sessionMap, modelContext, compressionConfig.compressor)
|
||||
}
|
||||
} else if (history.length > 4) {
|
||||
if (totalTokens <= triggerTokens && history.length <= 150) {
|
||||
if (totalTokens <= triggerTokens) {
|
||||
logger.info('[context-compress] session=%s: %d messages, ~%d tokens — under threshold, skip', sessionId, history.length, totalTokens)
|
||||
} else {
|
||||
history = await compressHistory(history, null, sessionId, upstream, apiKey, cState, totalTokens, emit, sessionMap, modelContext)
|
||||
history = await compressHistory(history, null, sessionId, upstream, apiKey, cState, totalTokens, emit, sessionMap, modelContext, compressionConfig.compressor)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -152,6 +237,7 @@ export async function compressHistory(
|
||||
emit: (event: string, payload: any) => void,
|
||||
sessionMap: Map<string, SessionState>,
|
||||
modelContext: { model?: string | null; provider?: string | null } = {},
|
||||
compressionConfig?: Partial<CompressorConfig>,
|
||||
): Promise<ChatMessage[]> {
|
||||
const msgCount = newMessagesOnly ? newMessagesOnly.length : history.length
|
||||
pushState(sessionMap, sessionId, 'compression.started', {
|
||||
@@ -163,6 +249,7 @@ export async function compressHistory(
|
||||
|
||||
try {
|
||||
const session = getSession(sessionId)
|
||||
const compressor = new ChatContextCompressor({ config: compressionConfig })
|
||||
const result = await compressor.compress(history, upstream, apiKey, sessionId, {
|
||||
profile: session?.profile,
|
||||
model: modelContext.model || session?.model,
|
||||
@@ -244,6 +331,8 @@ export async function forceCompressBridgeHistory(
|
||||
const upstream = ''
|
||||
const apiKey = undefined
|
||||
const session = getSession(sessionId)
|
||||
const contextLength = getModelContextLength({ profile, model: session?.model, provider: session?.provider })
|
||||
const compressionConfig = await getRunChatCompressionConfig(session?.profile || profile, contextLength)
|
||||
const beforeUsage = estimateSnapshotAwareHistoryUsage(sessionId, history)
|
||||
const totalTokens = beforeUsage.tokenCount
|
||||
bridgeLogger.info({
|
||||
@@ -256,6 +345,7 @@ export async function forceCompressBridgeHistory(
|
||||
snapshotAware: true,
|
||||
}, '[chat-run-socket] bridge forced compression started')
|
||||
|
||||
const compressor = new ChatContextCompressor({ config: compressionConfig.compressor })
|
||||
const result = await compressor.compress(history, upstream, apiKey, sessionId, {
|
||||
profile: session?.profile || profile,
|
||||
model: session?.model,
|
||||
|
||||
@@ -38,7 +38,7 @@ export async function loadSessionStateFromDb(sid: string, _sessionMap: Map<strin
|
||||
let inputTokens: number
|
||||
let outputTokens: number
|
||||
const snapshot = getCompressionSnapshot(sid)
|
||||
if (snapshot) {
|
||||
if (snapshot && snapshot.lastMessageIndex >= 0 && snapshot.lastMessageIndex < messages.length) {
|
||||
const newMessages = messages.slice(snapshot.lastMessageIndex + 1)
|
||||
const newUsage = estimateUsageTokensFromMessages(newMessages)
|
||||
inputTokens = countTokens(SUMMARY_PREFIX + snapshot.summary) +
|
||||
|
||||
@@ -53,7 +53,7 @@ export async function calcAndUpdateUsage(
|
||||
const snapshot = getCompressionSnapshot(sid)
|
||||
let inputTokens: number
|
||||
let outputTokens: number
|
||||
if (snapshot && msgs.length) {
|
||||
if (snapshot && msgs.length && snapshot.lastMessageIndex >= 0 && snapshot.lastMessageIndex < msgs.length) {
|
||||
const newMessages = msgs.slice(snapshot.lastMessageIndex + 1)
|
||||
const newUsage = estimateUsageTokensFromMessages(newMessages)
|
||||
inputTokens = countTokens(SUMMARY_PREFIX + snapshot.summary) +
|
||||
|
||||
Reference in New Issue
Block a user