From 210b0ee6c28157a0eba4a95c0552528f7c9e634f Mon Sep 17 00:00:00 2001 From: ekko <152005280+EKKOLearnAI@users.noreply.github.com> Date: Wed, 20 May 2026 10:02:15 +0800 Subject: [PATCH] Harden bridge broker restart (#862) --- docs/cli-chat-sessions.md | 5 + .../server/src/controllers/hermes/config.ts | 2 +- .../server/src/controllers/hermes/profiles.ts | 8 +- packages/server/src/index.ts | 3 +- .../services/hermes/agent-bridge/client.ts | 24 ++- .../hermes/agent-bridge/hermes_bridge.py | 77 ++++++++++ .../services/hermes/agent-bridge/manager.ts | 137 +++++++++++++++++- tests/server/agent-bridge-manager.test.ts | 33 +++++ 8 files changed, 280 insertions(+), 9 deletions(-) diff --git a/docs/cli-chat-sessions.md b/docs/cli-chat-sessions.md index 343ac40..1adbcd4 100644 --- a/docs/cli-chat-sessions.md +++ b/docs/cli-chat-sessions.md @@ -437,7 +437,10 @@ chatRunServer.init() |------|------| | `HERMES_AGENT_BRIDGE_ENDPOINT` | Bridge endpoint;Windows 默认 `tcp://127.0.0.1:18765`,macOS/Linux 默认 `ipc:///tmp/hermes-agent-bridge.sock` | | `HERMES_AGENT_BRIDGE_TIMEOUT_MS` | Node 等待 bridge 请求响应的超时,默认 `120000` ms | +| `HERMES_AGENT_BRIDGE_CONNECT_RETRY_MS` | Node 连接 bridge socket 失败时的短重试窗口,默认 `5000` ms | | `HERMES_AGENT_BRIDGE_STARTUP_TIMEOUT_MS` | Node 等待 Python bridge ready 的超时,默认 `120000` ms | +| `HERMES_AGENT_BRIDGE_AUTO_RESTART` | bridge broker 意外退出后是否自动重启;设为 `0`/`false`/`no`/`off` 可关闭,默认开启 | +| `HERMES_AGENT_BRIDGE_RESTART_DELAY_MS` | bridge broker 自动重启基础延迟,默认 `1000` ms,连续失败时最多退避到 `30000` ms | | `HERMES_AGENT_BRIDGE_PYTHON` | 指定 Python 解释器路径 | | `HERMES_AGENT_ROOT` | hermes-agent 安装目录 | | `HERMES_AGENT_BRIDGE_UV` | 指定 uv 可执行文件路径 | @@ -446,6 +449,8 @@ chatRunServer.init() | `HERMES_BRIDGE_MAX_TURNS` | 覆盖 bridge 最大轮数 | | `UV` | uv 可执行文件路径 fallback | +正常使用不需要配置这些变量。Windows 下如果默认 TCP 端口被旧 bridge/broker/worker 占用,新 bridge 会先按端口杀掉旧进程树,再用同一个 endpoint 重建。 + Windows 首次启动慢时可以临时放大: ```powershell diff --git a/packages/server/src/controllers/hermes/config.ts b/packages/server/src/controllers/hermes/config.ts index ae4e460..9047f3d 100644 --- a/packages/server/src/controllers/hermes/config.ts +++ b/packages/server/src/controllers/hermes/config.ts @@ -81,7 +81,7 @@ function deepMerge(target: Record, source: Record): Re async function destroyBridgeProfile(profile: string): Promise { try { - const result = await new AgentBridgeClient().destroyProfile(profile) + const result = await new AgentBridgeClient({ connectRetryMs: 0, timeoutMs: 5000 }).destroyProfile(profile) logger.info('[config] destroyed bridge sessions after gateway restart profile=%s destroyed=%s', profile, result.destroyed) } catch (err) { logger.warn(err, '[config] failed to destroy bridge sessions after gateway restart profile=%s', profile) diff --git a/packages/server/src/controllers/hermes/profiles.ts b/packages/server/src/controllers/hermes/profiles.ts index b7004c6..f151fcc 100644 --- a/packages/server/src/controllers/hermes/profiles.ts +++ b/packages/server/src/controllers/hermes/profiles.ts @@ -11,6 +11,8 @@ import { detectHermesRootHome } from '../../services/hermes/hermes-path' import { getActiveProfileName } from '../../services/hermes/hermes-profile' import type { HermesProfile } from '../../services/hermes/hermes-cli' +const bridgeCleanupClient = () => new AgentBridgeClient({ connectRetryMs: 0, timeoutMs: 5000 }) + const RESERVED_PROFILE_NAMES = new Set([ 'hermes', 'default', 'test', 'tmp', 'root', 'sudo', ]) @@ -216,7 +218,7 @@ export async function remove(ctx: any) { } try { try { - const result = await new AgentBridgeClient().destroyProfile(name) + const result = await bridgeCleanupClient().destroyProfile(name) logger.info('[profiles] destroyed bridge sessions for deleted profile "%s" destroyed=%s', name, result.destroyed) } catch (err) { logger.warn(err, '[profiles] failed to destroy bridge sessions for deleted profile "%s"', name) @@ -294,9 +296,7 @@ export async function switchProfile(ctx: any) { // Destroy all bridge sessions so they get recreated with the new profile config try { - const { AgentBridgeClient } = await import('../../services/hermes/agent-bridge') - const bridge = new AgentBridgeClient() - await bridge.destroyAll() + await bridgeCleanupClient().destroyAll() logger.info('[switchProfile] destroyed all bridge sessions for profile "%s"', name) } catch (err: any) { logger.warn(err, '[switchProfile] failed to destroy bridge sessions') diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 0e4d5b1..8e9b4ec 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -39,10 +39,9 @@ process.on('uncaughtException', (err) => { }) process.on('unhandledRejection', (reason) => { - console.error('FATAL: Unhandled rejection') + console.error('Unhandled rejection') console.error(reason) logger.error(reason, 'Unhandled rejection') - process.exit(1) }) let server: any = null diff --git a/packages/server/src/services/hermes/agent-bridge/client.ts b/packages/server/src/services/hermes/agent-bridge/client.ts index 1c303ef..b3dfa8b 100644 --- a/packages/server/src/services/hermes/agent-bridge/client.ts +++ b/packages/server/src/services/hermes/agent-bridge/client.ts @@ -32,6 +32,7 @@ export type AgentBridgeStatus = 'running' | 'complete' | 'interrupted' | 'error' export interface AgentBridgeOptions { endpoint?: string timeoutMs?: number + connectRetryMs?: number } export interface AgentBridgeRequestOptions { @@ -108,11 +109,13 @@ export class AgentBridgeError extends Error { export class AgentBridgeClient { readonly endpoint: string readonly timeoutMs: number + readonly connectRetryMs: number private lock: Promise = Promise.resolve() constructor(options: AgentBridgeOptions = {}) { this.endpoint = options.endpoint || process.env.HERMES_AGENT_BRIDGE_ENDPOINT || DEFAULT_AGENT_BRIDGE_ENDPOINT this.timeoutMs = options.timeoutMs ?? envPositiveInt('HERMES_AGENT_BRIDGE_TIMEOUT_MS') ?? DEFAULT_AGENT_BRIDGE_TIMEOUT_MS + this.connectRetryMs = options.connectRetryMs ?? envPositiveInt('HERMES_AGENT_BRIDGE_CONNECT_RETRY_MS') ?? 5000 } private summarizePayload(payload: Record): Record { @@ -172,7 +175,7 @@ export class AgentBridgeClient { return undefined } - private connectSocket(): Promise { + private connectSocketOnce(): Promise { return new Promise((resolveConnect, rejectConnect) => { const endpoint = this.endpoint let socket: Socket @@ -207,6 +210,25 @@ export class AgentBridgeClient { }) } + private isRetryableConnectError(err: any): boolean { + const code = String(err?.code || '') + return ['ECONNREFUSED', 'ENOENT', 'ECONNRESET', 'EPIPE', 'ETIMEDOUT'].includes(code) + } + + private async connectSocket(): Promise { + const deadline = Date.now() + Math.max(0, this.connectRetryMs) + for (;;) { + try { + return await this.connectSocketOnce() + } catch (err) { + if (!this.isRetryableConnectError(err) || Date.now() >= deadline) { + throw err + } + await delay(100) + } + } + } + private readResponse(socket: Socket, timeoutMs: number): Promise { return new Promise((resolveRead, rejectRead) => { let buffer = '' diff --git a/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py b/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py index 20999d7..d806405 100644 --- a/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py +++ b/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py @@ -1637,7 +1637,84 @@ def _send_bridge_request(endpoint: str, req: dict[str, Any], timeout: float) -> pass +def _tcp_endpoint_port(endpoint: str) -> int | None: + parsed = urlparse(endpoint) + if parsed.scheme != "tcp": + return None + try: + port = int(parsed.port or 0) + return port if port > 0 else None + except (TypeError, ValueError): + return None + + +def _windows_listening_pids_on_port(port: int) -> list[int]: + if os.name != "nt": + return [] + try: + result = subprocess.run( + ["netstat.exe", "-ano", "-p", "tcp"], + check=False, + capture_output=True, + text=True, + timeout=5, + ) + except Exception: + return [] + pids: set[int] = set() + for line in result.stdout.splitlines(): + parts = line.strip().split() + if len(parts) < 5: + continue + proto, local_address, _remote_address, state, pid_raw = parts[:5] + if proto.upper() != "TCP" or state.upper() != "LISTENING": + continue + if not local_address.endswith(f":{port}"): + continue + try: + pid = int(pid_raw) + except ValueError: + continue + if pid > 0 and pid != os.getpid(): + pids.add(pid) + return sorted(pids) + + +def _kill_windows_endpoint_occupants(endpoint: str) -> None: + if os.name != "nt": + return + port = _tcp_endpoint_port(endpoint) + if not port: + return + for pid in _windows_listening_pids_on_port(port): + try: + print( + f"[hermes-bridge] killing stale process tree pid={pid} port={port}", + file=sys.stderr, + flush=True, + ) + subprocess.run( + ["taskkill.exe", "/PID", str(pid), "/T", "/F"], + check=False, + capture_output=True, + text=True, + timeout=10, + ) + except Exception as exc: + print( + f"[hermes-bridge] failed to kill stale process pid={pid}: {exc}", + file=sys.stderr, + flush=True, + ) + deadline = time.time() + 3 + while time.time() < deadline: + if not _windows_listening_pids_on_port(port): + return + time.sleep(0.1) + + def _make_listen_socket(endpoint: str) -> socket.socket: + _kill_windows_endpoint_occupants(endpoint) if endpoint.startswith("ipc://"): if not hasattr(socket, "AF_UNIX"): raise RuntimeError("ipc:// endpoints require Unix domain socket support; use tcp://host:port on this platform") diff --git a/packages/server/src/services/hermes/agent-bridge/manager.ts b/packages/server/src/services/hermes/agent-bridge/manager.ts index 6644e4b..0be26c2 100644 --- a/packages/server/src/services/hermes/agent-bridge/manager.ts +++ b/packages/server/src/services/hermes/agent-bridge/manager.ts @@ -1,11 +1,14 @@ import { execFileSync, spawn, type ChildProcess } from 'child_process' import { existsSync, readFileSync } from 'fs' +import { createServer } from 'net' import { dirname, isAbsolute, join, resolve } from 'path' import { logger } from '../../logger' import { detectHermesHome, getHermesBin } from '../hermes-path' import { DEFAULT_AGENT_BRIDGE_ENDPOINT } from './client' const DEFAULT_AGENT_BRIDGE_STARTUP_TIMEOUT_MS = 120000 +const DEFAULT_AGENT_BRIDGE_RESTART_DELAY_MS = 1000 +const MAX_AGENT_BRIDGE_RESTART_DELAY_MS = 30000 export interface AgentBridgeManagerOptions { endpoint?: string @@ -204,15 +207,94 @@ function bridgeScriptPath(): string { return found } +function isTcpEndpoint(endpoint: string): boolean { + return endpoint.startsWith('tcp://') +} + +async function canListenTcpEndpoint(endpoint: string): Promise { + const url = new URL(endpoint) + const host = url.hostname || '127.0.0.1' + const port = Number(url.port) + if (!Number.isFinite(port) || port <= 0) return false + + return await new Promise((resolveAvailable) => { + const probe = createServer() + const done = (available: boolean) => { + probe.removeAllListeners() + resolveAvailable(available) + } + probe.once('error', () => done(false)) + probe.listen(port, host, () => { + probe.close(() => done(true)) + }) + }) +} + +function tcpEndpointPort(endpoint: string): number | undefined { + if (!isTcpEndpoint(endpoint)) return undefined + const url = new URL(endpoint) + const port = Number(url.port) + return Number.isFinite(port) && port > 0 ? port : undefined +} + +function windowsListeningPidsOnPort(port: number): number[] { + try { + const output = execFileSync('netstat.exe', ['-ano', '-p', 'tcp'], { encoding: 'utf-8', windowsHide: true }) + const pids = new Set() + for (const line of output.split(/\r?\n/)) { + const parts = line.trim().split(/\s+/) + if (parts.length < 5) continue + const [proto, localAddress, , state, pidRaw] = parts + if (proto.toUpperCase() !== 'TCP' || state.toUpperCase() !== 'LISTENING') continue + if (!localAddress.endsWith(`:${port}`)) continue + const pid = Number(pidRaw) + if (Number.isFinite(pid) && pid > 0 && pid !== process.pid) pids.add(pid) + } + return [...pids] + } catch { + return [] + } +} + +async function waitForTcpEndpoint(endpoint: string, timeoutMs: number): Promise { + const deadline = Date.now() + timeoutMs + while (Date.now() < deadline) { + if (await canListenTcpEndpoint(endpoint)) return true + await new Promise(resolve => setTimeout(resolve, 100)) + } + return canListenTcpEndpoint(endpoint) +} + +async function killWindowsEndpointOccupants(endpoint: string): Promise { + const port = tcpEndpointPort(endpoint) + if (!port) return + const pids = windowsListeningPidsOnPort(port) + if (!pids.length) return + for (const pid of pids) { + try { + logger.warn('[agent-bridge] killing stale process tree pid=%d on bridge port %d', pid, port) + execFileSync('taskkill.exe', ['/PID', String(pid), '/T', '/F'], { encoding: 'utf-8', windowsHide: true }) + } catch (err) { + logger.warn(err, '[agent-bridge] failed to kill stale bridge process pid=%d', pid) + } + } + await waitForTcpEndpoint(endpoint, 3000) +} + export class AgentBridgeManager { - readonly endpoint: string + endpoint: string private readonly options: AgentBridgeManagerOptions + private readonly explicitEndpoint: boolean private child: ChildProcess | null = null private starting: Promise | null = null private ready = false + private stopping = false + private restartTimer: NodeJS.Timeout | null = null + private restartAttempts = 0 constructor(options: AgentBridgeManagerOptions = {}) { this.options = options + this.explicitEndpoint = Boolean(options.endpoint || process.env.HERMES_AGENT_BRIDGE_ENDPOINT) this.endpoint = options.endpoint || process.env.HERMES_AGENT_BRIDGE_ENDPOINT || DEFAULT_AGENT_BRIDGE_ENDPOINT } @@ -223,6 +305,11 @@ export class AgentBridgeManager { async start(): Promise { if (this.running) return if (this.starting) return this.starting + this.stopping = false + if (this.restartTimer) { + clearTimeout(this.restartTimer) + this.restartTimer = null + } this.starting = this.startProcess() try { await this.starting @@ -234,6 +321,7 @@ export class AgentBridgeManager { private async startProcess(): Promise { const script = bridgeScriptPath() const command = resolveAgentBridgeCommand(this.options) + await this.prepareEndpoint() const args = [...command.argsPrefix, script, '--endpoint', this.endpoint] const agentRoot = command.agentRoot const hermesHome = command.hermesHome @@ -258,9 +346,11 @@ export class AgentBridgeManager { this.ready = false child.once('exit', (code, signal) => { + const shouldRestart = this.ready && !this.stopping && this.child === child && this.autoRestartEnabled() logger.warn('[agent-bridge] exited code=%s signal=%s', code, signal) this.ready = false if (this.child === child) this.child = null + if (shouldRestart) this.scheduleRestart(code, signal) }) child.stderr?.on('data', chunk => { @@ -312,6 +402,7 @@ export class AgentBridgeManager { const parsed = JSON.parse(line) if (parsed?.event === 'ready') { this.ready = true + this.restartAttempts = 0 readyResolved = true cleanup() resolveReady() @@ -330,7 +421,51 @@ export class AgentBridgeManager { logger.info('[agent-bridge] ready at %s', this.endpoint) } + private async prepareEndpoint(): Promise { + if (!this.explicitEndpoint && process.platform === 'win32' && isTcpEndpoint(this.endpoint)) { + if (!(await canListenTcpEndpoint(this.endpoint))) { + await killWindowsEndpointOccupants(this.endpoint) + } + } + process.env.HERMES_AGENT_BRIDGE_ENDPOINT = this.endpoint + } + + private autoRestartEnabled(): boolean { + const raw = String(process.env.HERMES_AGENT_BRIDGE_AUTO_RESTART || '').trim().toLowerCase() + return !['0', 'false', 'no', 'off'].includes(raw) + } + + private scheduleRestart(code: number | null, signal: NodeJS.Signals | null): void { + if (this.restartTimer || this.stopping) return + this.restartAttempts += 1 + const envDelay = envPositiveInt('HERMES_AGENT_BRIDGE_RESTART_DELAY_MS') ?? DEFAULT_AGENT_BRIDGE_RESTART_DELAY_MS + const delayMs = Math.min( + MAX_AGENT_BRIDGE_RESTART_DELAY_MS, + envDelay * Math.max(1, this.restartAttempts), + ) + logger.warn( + '[agent-bridge] broker exited unexpectedly code=%s signal=%s; restarting in %dms (attempt %d)', + code, + signal, + delayMs, + this.restartAttempts, + ) + this.restartTimer = setTimeout(() => { + this.restartTimer = null + if (this.stopping) return + this.start().catch((err) => { + logger.warn(err, '[agent-bridge] automatic restart failed') + if (!this.stopping) this.scheduleRestart(null, null) + }) + }, delayMs) + } + async stop(): Promise { + this.stopping = true + if (this.restartTimer) { + clearTimeout(this.restartTimer) + this.restartTimer = null + } const child = this.child if (!child) return this.ready = false diff --git a/tests/server/agent-bridge-manager.test.ts b/tests/server/agent-bridge-manager.test.ts index d7ac7ef..5dad236 100644 --- a/tests/server/agent-bridge-manager.test.ts +++ b/tests/server/agent-bridge-manager.test.ts @@ -1,4 +1,5 @@ import { chmodSync, mkdirSync, mkdtempSync, rmSync, writeFileSync } from 'fs' +import { createServer, type Server } from 'net' import { tmpdir } from 'os' import { join } from 'path' import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' @@ -74,4 +75,36 @@ describe('agent bridge manager command resolution', () => { expect(DEFAULT_AGENT_BRIDGE_ENDPOINT).toContain(`hermes-agent-bridge-test-${process.pid}`) expect(DEFAULT_AGENT_BRIDGE_ENDPOINT).not.toBe('ipc:///tmp/hermes-agent-bridge.sock') }) + + it('waits briefly for a restarting bridge socket before failing', async () => { + const endpoint = process.platform === 'win32' + ? `tcp://127.0.0.1:${32000 + (process.pid % 10000)}` + : `ipc://${join(tempDir, 'late-bridge.sock')}` + let server: Server | undefined + + const ready = new Promise((resolve) => { + setTimeout(() => { + server = createServer((socket) => { + socket.once('data', () => { + socket.end(`${JSON.stringify({ ok: true, pong: true })}\n`) + }) + }) + if (endpoint.startsWith('ipc://')) { + server.listen(endpoint.slice('ipc://'.length), resolve) + } else { + const url = new URL(endpoint) + server.listen(Number(url.port), url.hostname, resolve) + } + }, 150) + }) + + try { + const { AgentBridgeClient } = await import('../../packages/server/src/services/hermes/agent-bridge/client') + const client = new AgentBridgeClient({ endpoint, connectRetryMs: 1000, timeoutMs: 1000 }) + await expect(client.ping()).resolves.toMatchObject({ ok: true, pong: true }) + await ready + } finally { + await new Promise((resolve) => server?.close(() => resolve()) ?? resolve()) + } + }) })