From 8c4c6d668177a5b19b4417c7144b31787fa71f4e Mon Sep 17 00:00:00 2001 From: ekko Date: Sat, 23 May 2026 09:17:28 +0800 Subject: [PATCH] fix performance monitor worker blocking --- .../hermes/agent-bridge/hermes_bridge.py | 14 ++++ .../server/src/services/hermes/ops-monitor.ts | 82 +++++++++++++++---- .../agent-bridge-python-concurrency.test.ts | 34 ++++++++ 3 files changed, 115 insertions(+), 15 deletions(-) diff --git a/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py b/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py index a867772..a531941 100755 --- a/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py +++ b/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py @@ -2072,6 +2072,7 @@ class BridgeBroker: self.hermes_home = hermes_home self._workers: dict[str, WorkerProcess] = {} self._run_profile: dict[str, str] = {} + self._running_run_profile: dict[str, str] = {} self._session_profile: dict[str, str] = {} self._approval_profile: dict[str, str] = {} self._compression_profile: dict[str, str] = {} @@ -2115,6 +2116,10 @@ class BridgeBroker: with self._lock: if run_id: self._run_profile[run_id] = profile + if resp.get("status") == "running": + self._running_run_profile[run_id] = profile + else: + self._running_run_profile.pop(run_id, None) if session_id: self._session_profile[session_id] = profile for event in resp.get("events") or []: @@ -2135,6 +2140,7 @@ class BridgeBroker: workers = list(self._workers.values()) self._workers.clear() self._run_profile.clear() + self._running_run_profile.clear() self._session_profile.clear() self._approval_profile.clear() self._compression_profile.clear() @@ -2169,7 +2175,11 @@ class BridgeBroker: sessions_by_profile: dict[str, int] = {} for profile in self._session_profile.values(): sessions_by_profile[profile] = sessions_by_profile.get(profile, 0) + 1 + running_sessions_by_profile: dict[str, int] = {} + for profile in self._running_run_profile.values(): + running_sessions_by_profile[profile] = running_sessions_by_profile.get(profile, 0) + 1 active_sessions = len(self._session_profile) + running_sessions = len(self._running_run_profile) return { "pong": True, "time": time.time(), @@ -2181,7 +2191,9 @@ class BridgeBroker: "workers": workers, "worker_details": worker_details, "active_sessions": active_sessions, + "running_sessions": running_sessions, "sessions_by_profile": sessions_by_profile, + "running_sessions_by_profile": running_sessions_by_profile, } if action == "worker_ping": @@ -2236,6 +2248,7 @@ class BridgeBroker: workers = list(self._workers.values()) self._workers.clear() self._run_profile.clear() + self._running_run_profile.clear() self._session_profile.clear() self._approval_profile.clear() self._compression_profile.clear() @@ -2256,6 +2269,7 @@ class BridgeBroker: with self._lock: worker = self._workers.pop(profile, None) self._run_profile = {key: value for key, value in self._run_profile.items() if value != profile} + self._running_run_profile = {key: value for key, value in self._running_run_profile.items() if value != profile} self._session_profile = {key: value for key, value in self._session_profile.items() if value != profile} self._approval_profile = {key: value for key, value in self._approval_profile.items() if value != profile} self._compression_profile = {key: value for key, value in self._compression_profile.items() if value != profile} diff --git a/packages/server/src/services/hermes/ops-monitor.ts b/packages/server/src/services/hermes/ops-monitor.ts index 1d36f8b..6521f6d 100644 --- a/packages/server/src/services/hermes/ops-monitor.ts +++ b/packages/server/src/services/hermes/ops-monitor.ts @@ -72,6 +72,11 @@ interface WebCpuSample { usage: NodeJS.CpuUsage } +interface ProcessCpuSample { + at: number + cpuSeconds: number +} + interface SystemMemoryUsage { totalMemoryBytes: number freeMemoryBytes: number @@ -81,6 +86,7 @@ interface SystemMemoryUsage { let previousSystemCpu: CpuTimesSample | null = null let previousWebCpu: WebCpuSample | null = null +const previousWindowsProcessCpu = new Map() function safeCpus(): ReturnType { try { @@ -305,12 +311,61 @@ function parseWindowsJson(output: string): any[] { return Array.isArray(parsed) ? parsed : [parsed] } +function sampleWindowsProcessCpuPercent(pid: number, cpuSeconds: number): number { + const current = { at: Date.now(), cpuSeconds } + const previous = previousWindowsProcessCpu.get(pid) + previousWindowsProcessCpu.set(pid, current) + if (!previous) return 0 + + const elapsedSeconds = (current.at - previous.at) / 1000 + const cpuDelta = current.cpuSeconds - previous.cpuSeconds + if (elapsedSeconds <= 0 || cpuDelta < 0) return 0 + return clampPercent((cpuDelta / elapsedSeconds / Math.max(safeCpus().length, 1)) * 100) +} + function collectWindowsProcessMetrics(pids: number[]): Map> { if (!pids.length) return new Map() const idList = pids.join(',') try { const script = [ - `$ids=@(${idList})`, + `$ids=@(${idList});`, + '$all=Get-CimInstance Win32_Process | Select-Object ProcessId,ParentProcessId;', + '$byParent=@{};', + 'foreach($p in $all){$parent=[int]$p.ParentProcessId;if(-not $byParent.ContainsKey($parent)){$byParent[$parent]=@()};$byParent[$parent]+=[int]$p.ProcessId};', + '$result=@();', + 'foreach($root in $ids){', + '$seen=@{};$queue=New-Object System.Collections.Queue;$queue.Enqueue([int]$root);$tree=@();', + 'while($queue.Count -gt 0){$current=[int]$queue.Dequeue();if($seen.ContainsKey($current)){continue};$seen[$current]=$true;$tree+=$current;if($byParent.ContainsKey($current)){foreach($child in $byParent[$current]){$queue.Enqueue([int]$child)}}};', + '$procs=Get-Process -Id $tree -ErrorAction SilentlyContinue;', + '$mem=0.0;$cpu=0.0;$names=@();', + 'foreach($proc in $procs){$mem+=[double]$proc.WorkingSet64;if($null -ne $proc.CPU){$cpu+=[double]$proc.CPU};$names+=$proc.ProcessName};', + '$result+=[pscustomobject]@{pid=[int]$root;cpuSeconds=[double]$cpu;memoryRssBytes=[double]$mem;command=($names -join "+")}', + '};', + '$result', + '| ConvertTo-Json -Compress', + ].join(' ') + const output = execFileSync('powershell.exe', ['-NoProfile', '-Command', script], { + encoding: 'utf-8', + timeout: 5000, + windowsHide: true, + }) + const metrics = new Map>() + for (const item of parseWindowsJson(output)) { + const pid = Number(item?.pid) + if (!Number.isFinite(pid)) continue + const cpuSeconds = numberOrNull(item?.cpuSeconds) ?? 0 + metrics.set(pid, { + cpuPercent: sampleWindowsProcessCpuPercent(pid, cpuSeconds), + memoryRssBytes: numberOrNull(item?.memoryRssBytes) ?? 0, + command: typeof item?.command === 'string' ? item.command : undefined, + }) + } + return metrics + } catch {} + + try { + const script = [ + `$ids=@(${idList});`, 'Get-CimInstance Win32_PerfFormattedData_PerfProc_Process', '| Where-Object { $ids -contains [int]$_.IDProcess }', '| Select-Object @{Name="pid";Expression={[int]$_.IDProcess}},@{Name="cpuPercent";Expression={[double]$_.PercentProcessorTime}},@{Name="memoryRssBytes";Expression={[double]$_.WorkingSet}},@{Name="command";Expression={$_.Name}}', @@ -449,16 +504,11 @@ export async function getOpsRuntimeSnapshot(): Promise { let bridgeReachable = false let bridgeError: string | undefined let bridgePing: Record = {} - let sessions: Array> = [] try { const client = new AgentBridgeClient({ endpoint: managerState.endpoint, timeoutMs: 2000, connectRetryMs: 0 }) bridgePing = await client.ping() as Record bridgeReachable = true - try { - const list = await client.list() - sessions = Array.isArray((list as any).sessions) ? (list as any).sessions : [] - } catch {} } catch (err: any) { bridgeError = err?.message || 'Agent bridge is not reachable' } @@ -474,18 +524,20 @@ export async function getOpsRuntimeSnapshot(): Promise { const processMetrics = collectProcessMetrics(pids) const sessionCountsByProfile: Record = {} - let runningSessions = 0 - for (const session of sessions) { - const profileName = String(session.profile || 'default') - sessionCountsByProfile[profileName] = (sessionCountsByProfile[profileName] || 0) + 1 - if (session.running) runningSessions += 1 - } - if (!sessions.length && bridgePing.sessions_by_profile && typeof bridgePing.sessions_by_profile === 'object') { + if (bridgePing.sessions_by_profile && typeof bridgePing.sessions_by_profile === 'object') { for (const [profileName, count] of Object.entries(bridgePing.sessions_by_profile)) { const value = Number(count) if (Number.isFinite(value)) sessionCountsByProfile[profileName] = value } } + const runningSessionCountsByProfile: Record = {} + if (bridgePing.running_sessions_by_profile && typeof bridgePing.running_sessions_by_profile === 'object') { + for (const [profileName, count] of Object.entries(bridgePing.running_sessions_by_profile)) { + const value = Number(count) + if (Number.isFinite(value)) runningSessionCountsByProfile[profileName] = value + } + } + const runningSessions = Number(bridgePing.running_sessions || 0) const workers = workerEntries.map(([profileName, worker]) => { const usage = processUsage(worker.pid, 'worker', processMetrics, profileName) @@ -500,7 +552,7 @@ export async function getOpsRuntimeSnapshot(): Promise { endpoint: worker.endpoint, lastUsedAt: worker.lastUsedAt, sessionCount: sessionCountsByProfile[profileName] || 0, - runningSessionCount: sessions.filter(session => String(session.profile || 'default') === profileName && session.running).length, + runningSessionCount: runningSessionCountsByProfile[profileName] || 0, } }) @@ -543,7 +595,7 @@ export async function getOpsRuntimeSnapshot(): Promise { totalWorkerMemoryRssBytes: totalWorkerMemory, }, sessions: { - active: sessions.length || Number(bridgePing.active_sessions || 0), + active: Number(bridgePing.active_sessions || 0), running: runningSessions, byProfile: sessionCountsByProfile, }, diff --git a/tests/server/agent-bridge-python-concurrency.test.ts b/tests/server/agent-bridge-python-concurrency.test.ts index 8f0e3c0..2a2ba42 100644 --- a/tests/server/agent-bridge-python-concurrency.test.ts +++ b/tests/server/agent-bridge-python-concurrency.test.ts @@ -309,6 +309,7 @@ broker = bridge.BridgeBroker("ipc:///tmp/unused.sock") profile_worker = FakeWorker(2) broker._workers["default"] = profile_worker broker._run_profile["run-session-a"] = "default" +broker._running_run_profile["run-session-a"] = "default" broker._session_profile["session-a"] = "default" broker._approval_profile["approval-a"] = "default" broker._compression_profile["compression-a"] = "default" @@ -318,6 +319,7 @@ assert destroy_profile_result == {"profile": "default", "destroyed": 2} assert profile_worker.stopped assert "default" not in broker._workers assert broker._run_profile == {} +assert broker._running_run_profile == {} assert broker._session_profile == {} assert broker._approval_profile == {} assert broker._compression_profile == {} @@ -327,6 +329,7 @@ worker_b = FakeWorker(3) broker._workers["a"] = worker_a broker._workers["b"] = worker_b broker._run_profile["run-a"] = "a" +broker._running_run_profile["run-a"] = "a" broker._session_profile["session-b"] = "b" destroy_all_result = broker.handle({"action": "destroy_all"}) @@ -335,10 +338,39 @@ assert worker_a.stopped assert worker_b.stopped assert broker._workers == {} assert broker._run_profile == {} +assert broker._running_run_profile == {} assert broker._session_profile == {} `) }) + it('builds broker ping metrics without calling profile workers', () => { + runPython(String.raw` +${harness} + +class PingWorker: + running = True + pid = 12345 + endpoint = "ipc:///tmp/worker.sock" + last_used_at = 12.5 + + def request(self, req): + raise AssertionError("broker ping must not forward to worker") + +broker = bridge.BridgeBroker("ipc:///tmp/broker.sock") +broker._workers["default"] = PingWorker() +broker._session_profile["session-a"] = "default" +broker._running_run_profile["run-a"] = "default" + +resp = broker.handle({"action": "ping"}) +assert resp["workers"] == {"default": True} +assert resp["worker_details"]["default"]["pid"] == 12345 +assert resp["active_sessions"] == 1 +assert resp["running_sessions"] == 1 +assert resp["sessions_by_profile"] == {"default": 1} +assert resp["running_sessions_by_profile"] == {"default": 1} +`) + }) + it('restores approval env and clears handlers when a run fails', () => { runPython(String.raw` ${harness} @@ -413,6 +445,7 @@ broker = bridge.BridgeBroker("ipc:///tmp/unused.sock") worker = FakeWorker() broker._workers["default"] = worker broker._run_profile["run-a"] = "default" +broker._running_run_profile["run-a"] = "default" broker._session_profile["session-a"] = "default" broker._approval_profile["approval-a"] = "default" broker._compression_profile["compression-a"] = "default" @@ -422,6 +455,7 @@ assert broker._stop.is_set() assert worker.stopped assert broker._workers == {} assert broker._run_profile == {} +assert broker._running_run_profile == {} assert broker._session_profile == {} assert broker._approval_profile == {} assert broker._compression_profile == {}