Fix bridge compression history handling (#726)

* feat(bridge): refactor compression to use DB history and add structured logging

- Extract buildDbHistory() to share message loading between buildCompressedHistory and forceCompressBridgeHistory
- forceCompressBridgeHistory now reads from local DB instead of using Python-provided messages, ensuring consistency with api_server path
- Pass sessionId to compressor for snapshot-aware compression
- Add force_compress flag to bridge chat requests
- Add bridgeLogger structured logging for compression lifecycle
- Simplify schemas, session-sync, and providers

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix bridge compression history handling

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
ekko
2026-05-14 21:02:59 +08:00
committed by GitHub
parent 7420f7aad5
commit d0f1e7d1f2
19 changed files with 576 additions and 638 deletions
@@ -1,6 +1,7 @@
import { setTimeout as delay } from 'timers/promises'
import { createConnection, type Socket } from 'net'
import { URL } from 'url'
import { bridgeLogger } from '../../logger'
export const DEFAULT_AGENT_BRIDGE_ENDPOINT = process.platform === 'win32'
? 'tcp://127.0.0.1:18765'
@@ -91,6 +92,36 @@ export class AgentBridgeClient {
this.timeoutMs = options.timeoutMs ?? envPositiveInt('HERMES_AGENT_BRIDGE_TIMEOUT_MS') ?? DEFAULT_AGENT_BRIDGE_TIMEOUT_MS
}
private summarizePayload(payload: Record<string, unknown>): Record<string, unknown> {
const action = String(payload.action || '')
const summary: Record<string, unknown> = { action }
for (const key of ['session_id', 'run_id', 'request_id', 'approval_id', 'profile']) {
if (payload[key] != null) summary[key] = payload[key]
}
if (Array.isArray(payload.conversation_history)) summary.conversation_history_count = payload.conversation_history.length
if (Array.isArray(payload.messages)) summary.messages_count = payload.messages.length
if (typeof payload.message === 'string') summary.message_chars = payload.message.length
else if (Array.isArray(payload.message)) summary.message_parts = payload.message.length
if (typeof payload.command === 'string') summary.command = payload.command
if (typeof payload.text === 'string') summary.text_chars = payload.text.length
if (typeof payload.error === 'string') summary.error = payload.error
if (payload.force_compress === true) summary.force_compress = true
return summary
}
private summarizeResponse(response: Record<string, unknown>): Record<string, unknown> {
const summary: Record<string, unknown> = { ok: response.ok === true }
for (const key of ['session_id', 'run_id', 'request_id', 'status', 'cursor', 'event_cursor']) {
if (response[key] != null) summary[key] = response[key]
}
if (typeof response.delta === 'string') summary.delta_chars = response.delta.length
if (typeof response.output === 'string') summary.output_chars = response.output.length
if (Array.isArray(response.events)) summary.events_count = response.events.length
if (typeof response.error === 'string') summary.error = response.error
if (Array.isArray(response.history)) summary.history_count = response.history.length
return summary
}
async connect(): Promise<this> {
return this
}
@@ -191,16 +222,47 @@ export class AgentBridgeClient {
): Promise<T> {
const run = async (): Promise<T> => {
const timeoutMs = options.timeoutMs || this.timeoutMs
const socket = await this.connectSocket()
socket.write(`${JSON.stringify(payload)}\n`)
const raw = await this.readResponse(socket, timeoutMs)
const response = JSON.parse(raw) as { ok?: boolean; error?: string }
if (!response.ok) {
const error = new AgentBridgeError(response.error || 'Agent bridge request failed')
error.response = response
throw error
const startedAt = Date.now()
const action = String(payload.action || '')
const shouldLogRequest = action !== 'get_output'
if (shouldLogRequest) {
bridgeLogger.info({
endpoint: this.endpoint,
timeoutMs,
request: this.summarizePayload(payload),
}, '[agent-bridge-client] request')
}
try {
const socket = await this.connectSocket()
socket.write(`${JSON.stringify(payload)}\n`)
const raw = await this.readResponse(socket, timeoutMs)
const response = JSON.parse(raw) as { ok?: boolean; error?: string }
if (!response.ok) {
const error = new AgentBridgeError(response.error || 'Agent bridge request failed')
error.response = response
bridgeLogger.warn({
durationMs: Date.now() - startedAt,
response: this.summarizeResponse(response as Record<string, unknown>),
}, '[agent-bridge-client] request rejected')
throw error
}
if (shouldLogRequest) {
bridgeLogger.info({
durationMs: Date.now() - startedAt,
response: this.summarizeResponse(response as Record<string, unknown>),
}, '[agent-bridge-client] response')
}
return response as T
} catch (err: any) {
if (!(err instanceof AgentBridgeError)) {
bridgeLogger.error({
durationMs: Date.now() - startedAt,
err: { message: err?.message, name: err?.name },
request: this.summarizePayload(payload),
}, '[agent-bridge-client] request failed')
}
throw err
}
return response as T
}
const next = this.lock.then(run, run)
@@ -218,6 +280,7 @@ export class AgentBridgeClient {
conversationHistory?: unknown[],
instructions?: string,
profile?: string,
options: { force_compress?: boolean } = {},
): Promise<AgentBridgeChatStarted> {
return this.request<AgentBridgeChatStarted>({
action: 'chat',
@@ -226,6 +289,7 @@ export class AgentBridgeClient {
...(conversationHistory ? { conversation_history: conversationHistory } : {}),
...(instructions ? { instructions } : {}),
...(profile ? { profile } : {}),
...(options.force_compress ? { force_compress: true } : {}),
})
}
@@ -732,6 +732,7 @@ class AgentPool:
instructions: str | None = None,
conversation_history: list[dict[str, Any]] | None = None,
profile: str | None = None,
force_compress: bool = False,
) -> RunRecord:
session = self.get_or_create(session_id, profile=profile)
with session.lock:
@@ -747,14 +748,14 @@ class AgentPool:
thread = threading.Thread(
target=self._run_chat,
args=(session, record, message, instructions, conversation_history, profile),
args=(session, record, message, instructions, conversation_history, profile, force_compress),
daemon=True,
name=f"hermes-bridge-run-{run_id[:8]}",
)
thread.start()
return record
def _run_chat(self, session: AgentSession, record: RunRecord, message: Any, instructions: str | None = None, conversation_history: list[dict[str, Any]] | None = None, profile: str | None = None) -> None:
def _run_chat(self, session: AgentSession, record: RunRecord, message: Any, instructions: str | None = None, conversation_history: list[dict[str, Any]] | None = None, profile: str | None = None, force_compress: bool = False) -> None:
def stream_callback(delta: str) -> None:
with self._lock:
record.deltas.append(str(delta))
@@ -774,6 +775,19 @@ class AgentPool:
except Exception:
previous_approval_callback = None
self._prepersist_user_message(session, message, conversation_history, profile)
if force_compress:
compress = getattr(session.agent, "_compress_context", None)
if callable(compress):
compressed_history, compressed_system = compress(
conversation_history if isinstance(conversation_history, list) else [],
instructions,
approx_tokens=None,
focus_topic="debug_force_compress",
)
if isinstance(compressed_history, list):
conversation_history = compressed_history
if isinstance(compressed_system, str):
instructions = compressed_system
kwargs: dict[str, Any] = dict(
task_id=session.session_id,
stream_callback=stream_callback,
@@ -996,7 +1010,14 @@ class BridgeServer:
instructions = req.get("instructions") or req.get("system_message")
conversation_history = req.get("conversation_history")
profile = req.get("profile")
record = self.pool.start_chat(session_id, message, instructions, conversation_history, profile)
record = self.pool.start_chat(
session_id,
message,
instructions,
conversation_history,
profile,
bool(req.get("force_compress")),
)
if req.get("wait"):
timeout = float(req.get("timeout", 0) or 0)
deadline = time.time() + timeout if timeout > 0 else None