fix performance monitor worker blocking
This commit is contained in:
@@ -69,7 +69,7 @@ function setAutoRefresh(enabled: boolean) {
|
|||||||
timer = undefined
|
timer = undefined
|
||||||
}
|
}
|
||||||
if (enabled) {
|
if (enabled) {
|
||||||
timer = setInterval(() => loadRuntime(false), 3000)
|
timer = setInterval(() => loadRuntime(false), 5000)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2320,6 +2320,27 @@ class BridgeBroker:
|
|||||||
def _write_response(self, conn: socket.socket, resp: dict[str, Any]) -> None:
|
def _write_response(self, conn: socket.socket, resp: dict[str, Any]) -> None:
|
||||||
_write_json_response(conn, resp)
|
_write_json_response(conn, resp)
|
||||||
|
|
||||||
|
def _handle_connection(self, conn: socket.socket) -> None:
|
||||||
|
try:
|
||||||
|
try:
|
||||||
|
req = self._read_request(conn)
|
||||||
|
data = self.handle(req)
|
||||||
|
resp = {"ok": True, **_jsonable(data)}
|
||||||
|
except Exception as exc:
|
||||||
|
resp = {
|
||||||
|
"ok": False,
|
||||||
|
"error": str(exc),
|
||||||
|
"error_type": exc.__class__.__name__,
|
||||||
|
}
|
||||||
|
self._write_response(conn, resp)
|
||||||
|
except Exception as exc:
|
||||||
|
print(f"[hermes-bridge-broker] connection error: {exc}", file=sys.stderr, flush=True)
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
conn.close()
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
def _gc_idle_workers(self) -> None:
|
def _gc_idle_workers(self) -> None:
|
||||||
now = time.time()
|
now = time.time()
|
||||||
if now - self._last_gc < self.GC_INTERVAL_SECONDS:
|
if now - self._last_gc < self.GC_INTERVAL_SECONDS:
|
||||||
@@ -2346,34 +2367,22 @@ class BridgeBroker:
|
|||||||
print(json.dumps({"event": "ready", "endpoint": self.endpoint, "mode": "broker"}), flush=True)
|
print(json.dumps({"event": "ready", "endpoint": self.endpoint, "mode": "broker"}), flush=True)
|
||||||
|
|
||||||
while not self._stop.is_set():
|
while not self._stop.is_set():
|
||||||
conn: socket.socket | None = None
|
|
||||||
try:
|
try:
|
||||||
try:
|
try:
|
||||||
conn, _addr = server.accept()
|
conn, _addr = server.accept()
|
||||||
except socket.timeout:
|
except socket.timeout:
|
||||||
self._gc_idle_workers()
|
self._gc_idle_workers()
|
||||||
continue
|
continue
|
||||||
try:
|
threading.Thread(
|
||||||
req = self._read_request(conn)
|
target=self._handle_connection,
|
||||||
data = self.handle(req)
|
args=(conn,),
|
||||||
resp = {"ok": True, **_jsonable(data)}
|
daemon=True,
|
||||||
except Exception as exc:
|
name="hermes-bridge-broker-connection",
|
||||||
resp = {
|
).start()
|
||||||
"ok": False,
|
|
||||||
"error": str(exc),
|
|
||||||
"error_type": exc.__class__.__name__,
|
|
||||||
}
|
|
||||||
self._write_response(conn, resp)
|
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
break
|
break
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
print(f"[hermes-bridge-broker] server loop error: {exc}", file=sys.stderr, flush=True)
|
print(f"[hermes-bridge-broker] server loop error: {exc}", file=sys.stderr, flush=True)
|
||||||
finally:
|
|
||||||
if conn is not None:
|
|
||||||
try:
|
|
||||||
conn.close()
|
|
||||||
except OSError:
|
|
||||||
pass
|
|
||||||
finally:
|
finally:
|
||||||
restore_signals()
|
restore_signals()
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ function runPython(script: string): void {
|
|||||||
const harness = String.raw`
|
const harness = String.raw`
|
||||||
import contextvars
|
import contextvars
|
||||||
import importlib.util
|
import importlib.util
|
||||||
|
import json
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import threading
|
import threading
|
||||||
@@ -501,6 +502,57 @@ finally:
|
|||||||
bridge._process_exists = original_process_exists
|
bridge._process_exists = original_process_exists
|
||||||
|
|
||||||
assert seen_pids == [12345]
|
assert seen_pids == [12345]
|
||||||
|
`)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('handles broker ping while another broker request is blocked', () => {
|
||||||
|
runPython(String.raw`
|
||||||
|
${harness}
|
||||||
|
|
||||||
|
class BlockingBroker(bridge.BridgeBroker):
|
||||||
|
def handle(self, req):
|
||||||
|
if req.get("action") == "block":
|
||||||
|
time.sleep(0.4)
|
||||||
|
return {"blocked": True}
|
||||||
|
return super().handle(req)
|
||||||
|
|
||||||
|
class MemoryConn:
|
||||||
|
def __init__(self, req):
|
||||||
|
self.request = (json.dumps(req) + "\n").encode("utf-8")
|
||||||
|
self.response = b""
|
||||||
|
self.closed = False
|
||||||
|
|
||||||
|
def recv(self, size):
|
||||||
|
if not self.request:
|
||||||
|
return b""
|
||||||
|
chunk = self.request[:size]
|
||||||
|
self.request = self.request[size:]
|
||||||
|
return chunk
|
||||||
|
|
||||||
|
def sendall(self, payload):
|
||||||
|
self.response += payload
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.closed = True
|
||||||
|
|
||||||
|
broker = BlockingBroker("ipc:///tmp/unused.sock")
|
||||||
|
blocking_conn = MemoryConn({"action": "block"})
|
||||||
|
thread = threading.Thread(target=broker._handle_connection, args=(blocking_conn,))
|
||||||
|
thread.start()
|
||||||
|
time.sleep(0.05)
|
||||||
|
|
||||||
|
ping_conn = MemoryConn({"action": "ping"})
|
||||||
|
broker._handle_connection(ping_conn)
|
||||||
|
ping_resp = json.loads(ping_conn.response.decode("utf-8"))
|
||||||
|
assert ping_resp["ok"] is True, ping_resp
|
||||||
|
assert ping_resp["pong"] is True, ping_resp
|
||||||
|
assert ping_conn.closed is True, ping_conn.closed
|
||||||
|
|
||||||
|
thread.join(timeout=2)
|
||||||
|
assert not thread.is_alive(), blocking_conn.response
|
||||||
|
blocked_resp = json.loads(blocking_conn.response.decode("utf-8"))
|
||||||
|
assert blocked_resp["ok"] is True, blocked_resp
|
||||||
|
assert blocked_resp["blocked"] is True, blocked_resp
|
||||||
`)
|
`)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user