Harden bridge broker restart (#862)

This commit is contained in:
ekko
2026-05-20 10:02:15 +08:00
committed by GitHub
parent 0547fd6b6a
commit 210b0ee6c2
8 changed files with 280 additions and 9 deletions
@@ -32,6 +32,7 @@ export type AgentBridgeStatus = 'running' | 'complete' | 'interrupted' | 'error'
/**
 * Construction options for AgentBridgeClient. Every field is optional; the
 * client constructor supplies environment-variable and built-in fallbacks.
 */
export interface AgentBridgeOptions {
/** Bridge endpoint. Falls back to HERMES_AGENT_BRIDGE_ENDPOINT, then to the default endpoint. */
endpoint?: string
/** Client timeout in milliseconds. Falls back to HERMES_AGENT_BRIDGE_TIMEOUT_MS, then to the default timeout. */
timeoutMs?: number
/**
 * Time budget in milliseconds during which retryable connect failures are
 * retried. Falls back to HERMES_AGENT_BRIDGE_CONNECT_RETRY_MS, then to 5000.
 */
connectRetryMs?: number
}
export interface AgentBridgeRequestOptions {
@@ -108,11 +109,13 @@ export class AgentBridgeError extends Error {
export class AgentBridgeClient {
readonly endpoint: string
readonly timeoutMs: number
readonly connectRetryMs: number
private lock: Promise<unknown> = Promise.resolve()
/**
 * Resolves each setting in priority order: explicit option, then the matching
 * HERMES_AGENT_BRIDGE_* environment variable, then the built-in default.
 */
constructor(options: AgentBridgeOptions = {}) {
  const { endpoint, timeoutMs, connectRetryMs } = options
  // `||` rather than `??`: an empty-string endpoint also falls through to the next source.
  this.endpoint = endpoint || process.env.HERMES_AGENT_BRIDGE_ENDPOINT || DEFAULT_AGENT_BRIDGE_ENDPOINT
  this.timeoutMs = timeoutMs ?? envPositiveInt('HERMES_AGENT_BRIDGE_TIMEOUT_MS') ?? DEFAULT_AGENT_BRIDGE_TIMEOUT_MS
  this.connectRetryMs = connectRetryMs ?? envPositiveInt('HERMES_AGENT_BRIDGE_CONNECT_RETRY_MS') ?? 5000
}
private summarizePayload(payload: Record<string, unknown>): Record<string, unknown> {
@@ -172,7 +175,7 @@ export class AgentBridgeClient {
return undefined
}
private connectSocket(): Promise<Socket> {
private connectSocketOnce(): Promise<Socket> {
return new Promise((resolveConnect, rejectConnect) => {
const endpoint = this.endpoint
let socket: Socket
@@ -207,6 +210,25 @@ export class AgentBridgeClient {
})
}
/**
 * Reports whether a connect failure carries one of the error codes that the
 * client treats as transient and therefore worth retrying.
 *
 * @param err - The value thrown by a connect attempt; only its `code` is inspected.
 * @returns `true` for ECONNREFUSED, ENOENT, ECONNRESET, EPIPE and ETIMEDOUT.
 */
private isRetryableConnectError(err: any): boolean {
  switch (String(err?.code || '')) {
    case 'ECONNREFUSED':
    case 'ENOENT':
    case 'ECONNRESET':
    case 'EPIPE':
    case 'ETIMEDOUT':
      return true
    default:
      return false
  }
}
/**
 * Opens a socket to the bridge, retrying retryable connect failures until the
 * `connectRetryMs` budget is used up.
 *
 * @returns The connected socket.
 * @throws The last connect error when it is not retryable or the budget is exhausted.
 */
private async connectSocket(): Promise<Socket> {
  // A NaN budget would make every deadline comparison false and retry forever,
  // so it is treated as "no retries".
  const budgetMs = Number.isNaN(this.connectRetryMs) ? 0 : Math.max(0, this.connectRetryMs)
  const deadline = Date.now() + budgetMs
  for (;;) {
    try {
      return await this.connectSocketOnce()
    } catch (err) {
      const remainingMs = deadline - Date.now()
      if (!this.isRetryableConnectError(err) || remainingMs <= 0) {
        throw err
      }
      // Never sleep past the deadline; the final attempt lands at the budget boundary.
      await delay(Math.min(100, remainingMs))
    }
  }
}
private readResponse(socket: Socket, timeoutMs: number): Promise<string> {
return new Promise((resolveRead, rejectRead) => {
let buffer = ''
@@ -1637,7 +1637,84 @@ def _send_bridge_request(endpoint: str, req: dict[str, Any], timeout: float) ->
pass
def _tcp_endpoint_port(endpoint: str) -> int | None:
parsed = urlparse(endpoint)
if parsed.scheme != "tcp":
return None
try:
port = int(parsed.port or 0)
return port if port > 0 else None
except (TypeError, ValueError):
return None
def _windows_listening_pids_on_port(port: int) -> list[int]:
if os.name != "nt":
return []
try:
result = subprocess.run(
["netstat.exe", "-ano", "-p", "tcp"],
check=False,
capture_output=True,
text=True,
timeout=5,
)
except Exception:
return []
pids: set[int] = set()
for line in result.stdout.splitlines():
parts = line.strip().split()
if len(parts) < 5:
continue
proto, local_address, _remote_address, state, pid_raw = parts[:5]
if proto.upper() != "TCP" or state.upper() != "LISTENING":
continue
if not local_address.endswith(f":{port}"):
continue
try:
pid = int(pid_raw)
except ValueError:
continue
if pid > 0 and pid != os.getpid():
pids.add(pid)
return sorted(pids)
def _kill_windows_endpoint_occupants(endpoint: str) -> None:
if os.name != "nt":
return
port = _tcp_endpoint_port(endpoint)
if not port:
return
for pid in _windows_listening_pids_on_port(port):
try:
print(
f"[hermes-bridge] killing stale process tree pid={pid} port={port}",
file=sys.stderr,
flush=True,
)
subprocess.run(
["taskkill.exe", "/PID", str(pid), "/T", "/F"],
check=False,
capture_output=True,
text=True,
timeout=10,
)
except Exception as exc:
print(
f"[hermes-bridge] failed to kill stale process pid={pid}: {exc}",
file=sys.stderr,
flush=True,
)
deadline = time.time() + 3
while time.time() < deadline:
if not _windows_listening_pids_on_port(port):
return
time.sleep(0.1)
def _make_listen_socket(endpoint: str) -> socket.socket:
_kill_windows_endpoint_occupants(endpoint)
if endpoint.startswith("ipc://"):
if not hasattr(socket, "AF_UNIX"):
raise RuntimeError("ipc:// endpoints require Unix domain socket support; use tcp://host:port on this platform")
@@ -1,11 +1,14 @@
import { execFileSync, spawn, type ChildProcess } from 'child_process'
import { existsSync, readFileSync } from 'fs'
import { createServer } from 'net'
import { dirname, isAbsolute, join, resolve } from 'path'
import { logger } from '../../logger'
import { detectHermesHome, getHermesBin } from '../hermes-path'
import { DEFAULT_AGENT_BRIDGE_ENDPOINT } from './client'
// Startup timeout in milliseconds (120 s). Its use is outside this view — presumably
// the wait for the bridge's ready event; confirm against startProcess.
const DEFAULT_AGENT_BRIDGE_STARTUP_TIMEOUT_MS = 120000
// Base delay before an automatic restart when HERMES_AGENT_BRIDGE_RESTART_DELAY_MS
// is not set; scheduleRestart multiplies it by the attempt count.
const DEFAULT_AGENT_BRIDGE_RESTART_DELAY_MS = 1000
// Upper bound applied to the computed restart delay.
const MAX_AGENT_BRIDGE_RESTART_DELAY_MS = 30000
export interface AgentBridgeManagerOptions {
endpoint?: string
@@ -204,15 +207,94 @@ function bridgeScriptPath(): string {
return found
}
/** Reports whether `endpoint` begins with the exact, case-sensitive `tcp://` prefix. */
function isTcpEndpoint(endpoint: string): boolean {
  const tcpPrefix = 'tcp://'
  return endpoint.slice(0, tcpPrefix.length) === tcpPrefix
}
/**
 * Probes whether this process could bind the endpoint's host and port by
 * briefly opening, then closing, a throwaway server on it.
 *
 * @param endpoint - Endpoint URL such as `tcp://127.0.0.1:8765`.
 * @returns `true` when the probe server managed to listen; `false` when the
 *   endpoint is unparseable, has no positive port, or the bind failed.
 */
async function canListenTcpEndpoint(endpoint: string): Promise<boolean> {
  let url: URL
  try {
    url = new URL(endpoint)
  } catch {
    // An endpoint that cannot be parsed cannot be listened on.
    return false
  }
  // URL.hostname keeps the brackets around IPv6 literals ("[::1]"), while
  // listen() expects the bare address; with brackets the bind always fails.
  const host = url.hostname.replace(/^\[(.*)\]$/, '$1') || '127.0.0.1'
  const port = Number(url.port)
  if (!Number.isFinite(port) || port <= 0) return false
  return await new Promise<boolean>((resolveAvailable) => {
    const probe = createServer()
    const done = (available: boolean) => {
      probe.removeAllListeners()
      resolveAvailable(available)
    }
    probe.once('error', () => done(false))
    probe.listen(port, host, () => {
      // Release the port before reporting it as available.
      probe.close(() => done(true))
    })
  })
}
/**
 * Extracts the TCP port from a `tcp://host:port` endpoint.
 *
 * @param endpoint - Endpoint string to inspect.
 * @returns The positive port number, or `undefined` when the endpoint is not
 *   a TCP endpoint, cannot be parsed, or carries no positive port.
 */
function tcpEndpointPort(endpoint: string): number | undefined {
  if (!isTcpEndpoint(endpoint)) return undefined
  let portText: string
  try {
    // new URL() throws on e.g. an out-of-range or non-numeric port; such an
    // endpoint simply has no usable port.
    portText = new URL(endpoint).port
  } catch {
    return undefined
  }
  const port = Number(portText)
  return Number.isFinite(port) && port > 0 ? port : undefined
}
/**
 * Lists the PIDs, other than this process, that `netstat.exe` reports as
 * listening on TCP `port`.
 *
 * @param port - Local TCP port to look for.
 * @returns De-duplicated PIDs; an empty array when netstat cannot be run,
 *   fails, or does not finish within five seconds.
 */
function windowsListeningPidsOnPort(port: number): number[] {
  let output: string
  try {
    output = execFileSync('netstat.exe', ['-ano', '-p', 'tcp'], {
      encoding: 'utf-8',
      windowsHide: true,
      // execFileSync blocks the event loop; bound it so a hung netstat cannot stall the process.
      timeout: 5000,
    })
  } catch {
    // Best effort: no netstat output means no known occupants.
    return []
  }
  const addressSuffix = `:${port}`
  const pids = new Set<number>()
  for (const line of output.split(/\r?\n/)) {
    const parts = line.trim().split(/\s+/)
    if (parts.length < 5) continue
    const [proto, localAddress, , state, pidRaw] = parts
    if (proto.toUpperCase() !== 'TCP' || state.toUpperCase() !== 'LISTENING') continue
    if (!localAddress.endsWith(addressSuffix)) continue
    const pid = Number(pidRaw)
    if (Number.isFinite(pid) && pid > 0 && pid !== process.pid) pids.add(pid)
  }
  return [...pids]
}
/**
 * Polls every 100 ms until the endpoint can be listened on or `timeoutMs`
 * elapses, then performs one last probe.
 *
 * @param endpoint - Endpoint URL handed to `canListenTcpEndpoint`.
 * @param timeoutMs - Polling budget in milliseconds.
 * @returns Whether the endpoint was found to be listenable.
 */
async function waitForTcpEndpoint(endpoint: string, timeoutMs: number): Promise<boolean> {
  const pollIntervalMs = 100
  const pause = (ms: number) => new Promise((wake) => setTimeout(wake, ms))
  const stopAt = Date.now() + timeoutMs
  while (Date.now() < stopAt) {
    const listenable = await canListenTcpEndpoint(endpoint)
    if (listenable) return true
    await pause(pollIntervalMs)
  }
  // Final probe once the budget is spent (also the only probe when timeoutMs <= 0).
  return canListenTcpEndpoint(endpoint)
}
/**
 * Force-kills the process trees that are listening on the endpoint's TCP port
 * and then waits up to three seconds for the port to become listenable.
 *
 * Does nothing when the endpoint has no TCP port or nothing else is listening
 * on it. Kill failures are logged and never thrown.
 *
 * @param endpoint - Endpoint URL whose port should be freed.
 */
async function killWindowsEndpointOccupants(endpoint: string): Promise<void> {
  const port = tcpEndpointPort(endpoint)
  if (!port) return
  const pids = windowsListeningPidsOnPort(port)
  if (!pids.length) return
  for (const pid of pids) {
    try {
      logger.warn('[agent-bridge] killing stale process tree pid=%d on bridge port %d', pid, port)
      execFileSync('taskkill.exe', ['/PID', String(pid), '/T', '/F'], {
        encoding: 'utf-8',
        windowsHide: true,
        // execFileSync blocks the event loop; a hung taskkill must not stall startup forever.
        timeout: 10000,
      })
    } catch (err) {
      logger.warn(err, '[agent-bridge] failed to kill stale bridge process pid=%d', pid)
    }
  }
  await waitForTcpEndpoint(endpoint, 3000)
}
export class AgentBridgeManager {
readonly endpoint: string
endpoint: string
private readonly options: AgentBridgeManagerOptions
private readonly explicitEndpoint: boolean
private child: ChildProcess | null = null
private starting: Promise<void> | null = null
private ready = false
private stopping = false
private restartTimer: NodeJS.Timeout | null = null
private restartAttempts = 0
/**
 * Stores the options and resolves the endpoint from the explicit option, the
 * HERMES_AGENT_BRIDGE_ENDPOINT environment variable, or the default.
 * `explicitEndpoint` records whether either of the first two supplied it.
 */
constructor(options: AgentBridgeManagerOptions = {}) {
  const configuredEndpoint = options.endpoint || process.env.HERMES_AGENT_BRIDGE_ENDPOINT
  this.options = options
  this.explicitEndpoint = Boolean(configuredEndpoint)
  this.endpoint = configuredEndpoint || DEFAULT_AGENT_BRIDGE_ENDPOINT
}
@@ -223,6 +305,11 @@ export class AgentBridgeManager {
async start(): Promise<void> {
if (this.running) return
if (this.starting) return this.starting
this.stopping = false
if (this.restartTimer) {
clearTimeout(this.restartTimer)
this.restartTimer = null
}
this.starting = this.startProcess()
try {
await this.starting
@@ -234,6 +321,7 @@ export class AgentBridgeManager {
private async startProcess(): Promise<void> {
const script = bridgeScriptPath()
const command = resolveAgentBridgeCommand(this.options)
await this.prepareEndpoint()
const args = [...command.argsPrefix, script, '--endpoint', this.endpoint]
const agentRoot = command.agentRoot
const hermesHome = command.hermesHome
@@ -258,9 +346,11 @@ export class AgentBridgeManager {
this.ready = false
child.once('exit', (code, signal) => {
const shouldRestart = this.ready && !this.stopping && this.child === child && this.autoRestartEnabled()
logger.warn('[agent-bridge] exited code=%s signal=%s', code, signal)
this.ready = false
if (this.child === child) this.child = null
if (shouldRestart) this.scheduleRestart(code, signal)
})
child.stderr?.on('data', chunk => {
@@ -312,6 +402,7 @@ export class AgentBridgeManager {
const parsed = JSON.parse(line)
if (parsed?.event === 'ready') {
this.ready = true
this.restartAttempts = 0
readyResolved = true
cleanup()
resolveReady()
@@ -330,7 +421,51 @@ export class AgentBridgeManager {
logger.info('[agent-bridge] ready at %s', this.endpoint)
}
/**
 * Gets the endpoint ready before the bridge process is spawned.
 *
 * On Windows, for a TCP endpoint that was not explicitly configured, a port
 * that cannot be listened on has its occupants killed first. The endpoint is
 * then published through HERMES_AGENT_BRIDGE_ENDPOINT.
 *
 * NOTE(review): the constructor treats a set HERMES_AGENT_BRIDGE_ENDPOINT as an
 * explicit endpoint, so managers created after this call in the same process
 * will skip the cleanup — confirm that is intended.
 */
private async prepareEndpoint(): Promise<void> {
  const mayReclaimPort =
    !this.explicitEndpoint && process.platform === 'win32' && isTcpEndpoint(this.endpoint)
  if (mayReclaimPort) {
    const portIsFree = await canListenTcpEndpoint(this.endpoint)
    if (!portIsFree) {
      await killWindowsEndpointOccupants(this.endpoint)
    }
  }
  process.env.HERMES_AGENT_BRIDGE_ENDPOINT = this.endpoint
}
/**
 * Reads HERMES_AGENT_BRIDGE_AUTO_RESTART. Automatic restart stays enabled
 * unless the trimmed, lower-cased value is one of `0`, `false`, `no` or `off`.
 */
private autoRestartEnabled(): boolean {
  const setting = String(process.env.HERMES_AGENT_BRIDGE_AUTO_RESTART || '').trim().toLowerCase()
  switch (setting) {
    case '0':
    case 'false':
    case 'no':
    case 'off':
      return false
    default:
      return true
  }
}
/**
 * Arms a one-shot timer that restarts the bridge after an unexpected exit.
 *
 * The delay is the base delay (HERMES_AGENT_BRIDGE_RESTART_DELAY_MS or the
 * default) multiplied by the attempt number, capped at the maximum restart
 * delay. If the restart fails, another restart is scheduled. Nothing happens
 * while the manager is stopping or a restart is already pending.
 *
 * @param code - Exit code of the process that ended, used for logging only.
 * @param signal - Signal that ended the process, used for logging only.
 */
private scheduleRestart(code: number | null, signal: NodeJS.Signals | null): void {
  if (this.stopping || this.restartTimer) return

  this.restartAttempts += 1
  const attempt = this.restartAttempts
  const baseDelayMs =
    envPositiveInt('HERMES_AGENT_BRIDGE_RESTART_DELAY_MS') ?? DEFAULT_AGENT_BRIDGE_RESTART_DELAY_MS
  const delayMs = Math.min(MAX_AGENT_BRIDGE_RESTART_DELAY_MS, baseDelayMs * Math.max(1, attempt))

  logger.warn(
    '[agent-bridge] broker exited unexpectedly code=%s signal=%s; restarting in %dms (attempt %d)',
    code,
    signal,
    delayMs,
    attempt,
  )

  const restartNow = (): void => {
    // Clear the handle first so a failed restart is able to schedule the next one.
    this.restartTimer = null
    if (this.stopping) return
    this.start().catch((err) => {
      logger.warn(err, '[agent-bridge] automatic restart failed')
      if (!this.stopping) this.scheduleRestart(null, null)
    })
  }
  this.restartTimer = setTimeout(restartNow, delayMs)
}
async stop(): Promise<void> {
this.stopping = true
if (this.restartTimer) {
clearTimeout(this.restartTimer)
this.restartTimer = null
}
const child = this.child
if (!child) return
this.ready = false