diff --git a/packages/client/src/views/hermes/PerformanceView.vue b/packages/client/src/views/hermes/PerformanceView.vue index 790fe45..f7e7c38 100644 --- a/packages/client/src/views/hermes/PerformanceView.vue +++ b/packages/client/src/views/hermes/PerformanceView.vue @@ -69,7 +69,7 @@ function setAutoRefresh(enabled: boolean) { timer = undefined } if (enabled) { - timer = setInterval(() => loadRuntime(false), 3000) + timer = setInterval(() => loadRuntime(false), 5000) } } diff --git a/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py b/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py index a531941..594d106 100755 --- a/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py +++ b/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py @@ -2320,6 +2320,27 @@ class BridgeBroker: def _write_response(self, conn: socket.socket, resp: dict[str, Any]) -> None: _write_json_response(conn, resp) + def _handle_connection(self, conn: socket.socket) -> None: + try: + try: + req = self._read_request(conn) + data = self.handle(req) + resp = {"ok": True, **_jsonable(data)} + except Exception as exc: + resp = { + "ok": False, + "error": str(exc), + "error_type": exc.__class__.__name__, + } + self._write_response(conn, resp) + except Exception as exc: + print(f"[hermes-bridge-broker] connection error: {exc}", file=sys.stderr, flush=True) + finally: + try: + conn.close() + except OSError: + pass + def _gc_idle_workers(self) -> None: now = time.time() if now - self._last_gc < self.GC_INTERVAL_SECONDS: @@ -2346,34 +2367,22 @@ class BridgeBroker: print(json.dumps({"event": "ready", "endpoint": self.endpoint, "mode": "broker"}), flush=True) while not self._stop.is_set(): - conn: socket.socket | None = None try: try: conn, _addr = server.accept() except socket.timeout: self._gc_idle_workers() continue - try: - req = self._read_request(conn) - data = self.handle(req) - resp = {"ok": True, **_jsonable(data)} - except Exception as exc: - resp = { - "ok": False, - "error": str(exc), - "error_type": exc.__class__.__name__, - } - self._write_response(conn, resp) + threading.Thread( + target=self._handle_connection, + args=(conn,), + daemon=True, + name="hermes-bridge-broker-connection", + ).start() except KeyboardInterrupt: break except Exception as exc: print(f"[hermes-bridge-broker] server loop error: {exc}", file=sys.stderr, flush=True) - finally: - if conn is not None: - try: - conn.close() - except OSError: - pass finally: restore_signals() try: diff --git a/tests/server/agent-bridge-python-concurrency.test.ts b/tests/server/agent-bridge-python-concurrency.test.ts index 2a2ba42..fcffb52 100644 --- a/tests/server/agent-bridge-python-concurrency.test.ts +++ b/tests/server/agent-bridge-python-concurrency.test.ts @@ -21,6 +21,7 @@ function runPython(script: string): void { const harness = String.raw` import contextvars import importlib.util +import json import os import sys import threading @@ -501,6 +502,57 @@ finally: bridge._process_exists = original_process_exists assert seen_pids == [12345] +`) + }) + + it('handles broker ping while another broker request is blocked', () => { + runPython(String.raw` +${harness} + +class BlockingBroker(bridge.BridgeBroker): + def handle(self, req): + if req.get("action") == "block": + time.sleep(0.4) + return {"blocked": True} + return super().handle(req) + +class MemoryConn: + def __init__(self, req): + self.request = (json.dumps(req) + "\n").encode("utf-8") + self.response = b"" + self.closed = False + + def recv(self, size): + if not self.request: + return b"" + chunk = self.request[:size] + self.request = self.request[size:] + return chunk + + def sendall(self, payload): + self.response += payload + + def close(self): + self.closed = True + +broker = BlockingBroker("ipc:///tmp/unused.sock") +blocking_conn = MemoryConn({"action": "block"}) +thread = threading.Thread(target=broker._handle_connection, args=(blocking_conn,)) +thread.start() +time.sleep(0.05) + +ping_conn = MemoryConn({"action": "ping"}) +broker._handle_connection(ping_conn) +ping_resp = json.loads(ping_conn.response.decode("utf-8")) +assert ping_resp["ok"] is True, ping_resp +assert ping_resp["pong"] is True, ping_resp +assert ping_conn.closed is True, ping_conn.closed + +thread.join(timeout=2) +assert not thread.is_alive(), blocking_conn.response +blocked_resp = json.loads(blocking_conn.response.decode("utf-8")) +assert blocked_resp["ok"] is True, blocked_resp +assert blocked_resp["blocked"] is True, blocked_resp `) }) })