fix performance monitor worker blocking

This commit is contained in:
ekko
2026-05-23 09:17:28 +08:00
committed by ekko
parent c184519c5d
commit 8c4c6d6681
3 changed files with 115 additions and 15 deletions
@@ -2072,6 +2072,7 @@ class BridgeBroker:
self.hermes_home = hermes_home
self._workers: dict[str, WorkerProcess] = {}
self._run_profile: dict[str, str] = {}
self._running_run_profile: dict[str, str] = {}
self._session_profile: dict[str, str] = {}
self._approval_profile: dict[str, str] = {}
self._compression_profile: dict[str, str] = {}
@@ -2115,6 +2116,10 @@ class BridgeBroker:
with self._lock:
if run_id:
self._run_profile[run_id] = profile
if resp.get("status") == "running":
self._running_run_profile[run_id] = profile
else:
self._running_run_profile.pop(run_id, None)
if session_id:
self._session_profile[session_id] = profile
for event in resp.get("events") or []:
@@ -2135,6 +2140,7 @@ class BridgeBroker:
workers = list(self._workers.values())
self._workers.clear()
self._run_profile.clear()
self._running_run_profile.clear()
self._session_profile.clear()
self._approval_profile.clear()
self._compression_profile.clear()
@@ -2169,7 +2175,11 @@ class BridgeBroker:
sessions_by_profile: dict[str, int] = {}
for profile in self._session_profile.values():
sessions_by_profile[profile] = sessions_by_profile.get(profile, 0) + 1
running_sessions_by_profile: dict[str, int] = {}
for profile in self._running_run_profile.values():
running_sessions_by_profile[profile] = running_sessions_by_profile.get(profile, 0) + 1
active_sessions = len(self._session_profile)
running_sessions = len(self._running_run_profile)
return {
"pong": True,
"time": time.time(),
@@ -2181,7 +2191,9 @@ class BridgeBroker:
"workers": workers,
"worker_details": worker_details,
"active_sessions": active_sessions,
"running_sessions": running_sessions,
"sessions_by_profile": sessions_by_profile,
"running_sessions_by_profile": running_sessions_by_profile,
}
if action == "worker_ping":
@@ -2236,6 +2248,7 @@ class BridgeBroker:
workers = list(self._workers.values())
self._workers.clear()
self._run_profile.clear()
self._running_run_profile.clear()
self._session_profile.clear()
self._approval_profile.clear()
self._compression_profile.clear()
@@ -2256,6 +2269,7 @@ class BridgeBroker:
with self._lock:
worker = self._workers.pop(profile, None)
self._run_profile = {key: value for key, value in self._run_profile.items() if value != profile}
self._running_run_profile = {key: value for key, value in self._running_run_profile.items() if value != profile}
self._session_profile = {key: value for key, value in self._session_profile.items() if value != profile}
self._approval_profile = {key: value for key, value in self._approval_profile.items() if value != profile}
self._compression_profile = {key: value for key, value in self._compression_profile.items() if value != profile}
@@ -72,6 +72,11 @@ interface WebCpuSample {
usage: NodeJS.CpuUsage
}
/** A single CPU-time reading for one process, kept so the next reading can be converted into a usage rate. */
interface ProcessCpuSample {
/** Time the reading was taken, as returned by Date.now() (milliseconds). */
at: number
/** CPU time, in seconds, reported for the process at that reading. */
cpuSeconds: number
}
interface SystemMemoryUsage {
totalMemoryBytes: number
freeMemoryBytes: number
@@ -81,6 +86,7 @@ interface SystemMemoryUsage {
let previousSystemCpu: CpuTimesSample | null = null
let previousWebCpu: WebCpuSample | null = null
const previousWindowsProcessCpu = new Map<number, ProcessCpuSample>()
function safeCpus(): ReturnType<typeof cpus> {
try {
@@ -305,12 +311,61 @@ function parseWindowsJson(output: string): any[] {
return Array.isArray(parsed) ? parsed : [parsed]
}
/**
 * Turns a CPU-seconds reading for `pid` into a CPU usage percentage by
 * comparing it with the previous reading remembered for the same pid.
 *
 * The new reading always replaces the remembered one, even when 0 is returned.
 *
 * @param pid Process id the reading belongs to; used as the key of the sample cache.
 * @param cpuSeconds CPU seconds reported for the process right now.
 * @returns 0 when there is no earlier reading, when no time has elapsed, or
 *   when the CPU counter went backwards; otherwise the CPU time consumed per
 *   elapsed second, divided by the CPU count and passed through clampPercent.
 */
function sampleWindowsProcessCpuPercent(pid: number, cpuSeconds: number): number {
  const now = Date.now()
  const last = previousWindowsProcessCpu.get(pid)
  // Store the fresh reading first so every call advances the baseline.
  previousWindowsProcessCpu.set(pid, { at: now, cpuSeconds })

  // First reading for this pid: nothing to diff against yet.
  if (!last) return 0

  const wallSeconds = (now - last.at) / 1000
  if (wallSeconds <= 0) return 0

  const cpuSecondsUsed = cpuSeconds - last.cpuSeconds
  // A shrinking counter cannot be converted into a meaningful rate.
  if (cpuSecondsUsed < 0) return 0

  // Guard against an empty CPU list so the division below never divides by zero.
  const coreCount = Math.max(safeCpus().length, 1)
  const perCoreShare = cpuSecondsUsed / wallSeconds / coreCount
  return clampPercent(perCoreShare * 100)
}
function collectWindowsProcessMetrics(pids: number[]): Map<number, Partial<ProcessUsage>> {
if (!pids.length) return new Map()
const idList = pids.join(',')
try {
const script = [
`$ids=@(${idList})`,
`$ids=@(${idList});`,
'$all=Get-CimInstance Win32_Process | Select-Object ProcessId,ParentProcessId;',
'$byParent=@{};',
'foreach($p in $all){$parent=[int]$p.ParentProcessId;if(-not $byParent.ContainsKey($parent)){$byParent[$parent]=@()};$byParent[$parent]+=[int]$p.ProcessId};',
'$result=@();',
'foreach($root in $ids){',
'$seen=@{};$queue=New-Object System.Collections.Queue;$queue.Enqueue([int]$root);$tree=@();',
'while($queue.Count -gt 0){$current=[int]$queue.Dequeue();if($seen.ContainsKey($current)){continue};$seen[$current]=$true;$tree+=$current;if($byParent.ContainsKey($current)){foreach($child in $byParent[$current]){$queue.Enqueue([int]$child)}}};',
'$procs=Get-Process -Id $tree -ErrorAction SilentlyContinue;',
'$mem=0.0;$cpu=0.0;$names=@();',
'foreach($proc in $procs){$mem+=[double]$proc.WorkingSet64;if($null -ne $proc.CPU){$cpu+=[double]$proc.CPU};$names+=$proc.ProcessName};',
'$result+=[pscustomobject]@{pid=[int]$root;cpuSeconds=[double]$cpu;memoryRssBytes=[double]$mem;command=($names -join "+")}',
'};',
'$result',
'| ConvertTo-Json -Compress',
].join(' ')
const output = execFileSync('powershell.exe', ['-NoProfile', '-Command', script], {
encoding: 'utf-8',
timeout: 5000,
windowsHide: true,
})
const metrics = new Map<number, Partial<ProcessUsage>>()
for (const item of parseWindowsJson(output)) {
const pid = Number(item?.pid)
if (!Number.isFinite(pid)) continue
const cpuSeconds = numberOrNull(item?.cpuSeconds) ?? 0
metrics.set(pid, {
cpuPercent: sampleWindowsProcessCpuPercent(pid, cpuSeconds),
memoryRssBytes: numberOrNull(item?.memoryRssBytes) ?? 0,
command: typeof item?.command === 'string' ? item.command : undefined,
})
}
return metrics
} catch {}
try {
const script = [
`$ids=@(${idList});`,
'Get-CimInstance Win32_PerfFormattedData_PerfProc_Process',
'| Where-Object { $ids -contains [int]$_.IDProcess }',
'| Select-Object @{Name="pid";Expression={[int]$_.IDProcess}},@{Name="cpuPercent";Expression={[double]$_.PercentProcessorTime}},@{Name="memoryRssBytes";Expression={[double]$_.WorkingSet}},@{Name="command";Expression={$_.Name}}',
@@ -449,16 +504,11 @@ export async function getOpsRuntimeSnapshot(): Promise<OpsRuntimeSnapshot> {
let bridgeReachable = false
let bridgeError: string | undefined
let bridgePing: Record<string, any> = {}
let sessions: Array<Record<string, any>> = []
try {
const client = new AgentBridgeClient({ endpoint: managerState.endpoint, timeoutMs: 2000, connectRetryMs: 0 })
bridgePing = await client.ping() as Record<string, any>
bridgeReachable = true
try {
const list = await client.list()
sessions = Array.isArray((list as any).sessions) ? (list as any).sessions : []
} catch {}
} catch (err: any) {
bridgeError = err?.message || 'Agent bridge is not reachable'
}
@@ -474,18 +524,20 @@ export async function getOpsRuntimeSnapshot(): Promise<OpsRuntimeSnapshot> {
const processMetrics = collectProcessMetrics(pids)
const sessionCountsByProfile: Record<string, number> = {}
let runningSessions = 0
for (const session of sessions) {
const profileName = String(session.profile || 'default')
sessionCountsByProfile[profileName] = (sessionCountsByProfile[profileName] || 0) + 1
if (session.running) runningSessions += 1
}
if (!sessions.length && bridgePing.sessions_by_profile && typeof bridgePing.sessions_by_profile === 'object') {
if (bridgePing.sessions_by_profile && typeof bridgePing.sessions_by_profile === 'object') {
for (const [profileName, count] of Object.entries(bridgePing.sessions_by_profile)) {
const value = Number(count)
if (Number.isFinite(value)) sessionCountsByProfile[profileName] = value
}
}
const runningSessionCountsByProfile: Record<string, number> = {}
if (bridgePing.running_sessions_by_profile && typeof bridgePing.running_sessions_by_profile === 'object') {
for (const [profileName, count] of Object.entries(bridgePing.running_sessions_by_profile)) {
const value = Number(count)
if (Number.isFinite(value)) runningSessionCountsByProfile[profileName] = value
}
}
const runningSessions = Number(bridgePing.running_sessions || 0)
const workers = workerEntries.map(([profileName, worker]) => {
const usage = processUsage(worker.pid, 'worker', processMetrics, profileName)
@@ -500,7 +552,7 @@ export async function getOpsRuntimeSnapshot(): Promise<OpsRuntimeSnapshot> {
endpoint: worker.endpoint,
lastUsedAt: worker.lastUsedAt,
sessionCount: sessionCountsByProfile[profileName] || 0,
runningSessionCount: sessions.filter(session => String(session.profile || 'default') === profileName && session.running).length,
runningSessionCount: runningSessionCountsByProfile[profileName] || 0,
}
})
@@ -543,7 +595,7 @@ export async function getOpsRuntimeSnapshot(): Promise<OpsRuntimeSnapshot> {
totalWorkerMemoryRssBytes: totalWorkerMemory,
},
sessions: {
active: sessions.length || Number(bridgePing.active_sessions || 0),
active: Number(bridgePing.active_sessions || 0),
running: runningSessions,
byProfile: sessionCountsByProfile,
},
@@ -309,6 +309,7 @@ broker = bridge.BridgeBroker("ipc:///tmp/unused.sock")
profile_worker = FakeWorker(2)
broker._workers["default"] = profile_worker
broker._run_profile["run-session-a"] = "default"
broker._running_run_profile["run-session-a"] = "default"
broker._session_profile["session-a"] = "default"
broker._approval_profile["approval-a"] = "default"
broker._compression_profile["compression-a"] = "default"
@@ -318,6 +319,7 @@ assert destroy_profile_result == {"profile": "default", "destroyed": 2}
assert profile_worker.stopped
assert "default" not in broker._workers
assert broker._run_profile == {}
assert broker._running_run_profile == {}
assert broker._session_profile == {}
assert broker._approval_profile == {}
assert broker._compression_profile == {}
@@ -327,6 +329,7 @@ worker_b = FakeWorker(3)
broker._workers["a"] = worker_a
broker._workers["b"] = worker_b
broker._run_profile["run-a"] = "a"
broker._running_run_profile["run-a"] = "a"
broker._session_profile["session-b"] = "b"
destroy_all_result = broker.handle({"action": "destroy_all"})
@@ -335,10 +338,39 @@ assert worker_a.stopped
assert worker_b.stopped
assert broker._workers == {}
assert broker._run_profile == {}
assert broker._running_run_profile == {}
assert broker._session_profile == {}
`)
})
// Verifies that the broker answers a "ping" from its own bookkeeping: the fake
// worker's request() raises if it is ever called, and the response must report
// worker details plus total and per-profile counts for sessions and running runs.
it('builds broker ping metrics without calling profile workers', () => {
runPython(String.raw`
${harness}
class PingWorker:
running = True
pid = 12345
endpoint = "ipc:///tmp/worker.sock"
last_used_at = 12.5
def request(self, req):
raise AssertionError("broker ping must not forward to worker")
broker = bridge.BridgeBroker("ipc:///tmp/broker.sock")
broker._workers["default"] = PingWorker()
broker._session_profile["session-a"] = "default"
broker._running_run_profile["run-a"] = "default"
resp = broker.handle({"action": "ping"})
assert resp["workers"] == {"default": True}
assert resp["worker_details"]["default"]["pid"] == 12345
assert resp["active_sessions"] == 1
assert resp["running_sessions"] == 1
assert resp["sessions_by_profile"] == {"default": 1}
assert resp["running_sessions_by_profile"] == {"default": 1}
`)
})
it('restores approval env and clears handlers when a run fails', () => {
runPython(String.raw`
${harness}
@@ -413,6 +445,7 @@ broker = bridge.BridgeBroker("ipc:///tmp/unused.sock")
worker = FakeWorker()
broker._workers["default"] = worker
broker._run_profile["run-a"] = "default"
broker._running_run_profile["run-a"] = "default"
broker._session_profile["session-a"] = "default"
broker._approval_profile["approval-a"] = "default"
broker._compression_profile["compression-a"] = "default"
@@ -422,6 +455,7 @@ assert broker._stop.is_set()
assert worker.stopped
assert broker._workers == {}
assert broker._run_profile == {}
assert broker._running_run_profile == {}
assert broker._session_profile == {}
assert broker._approval_profile == {}
assert broker._compression_profile == {}