add bridge performance monitoring

This commit is contained in:
ekko
2026-05-23 09:05:03 +08:00
committed by ekko
parent 4223014e0c
commit c184519c5d
21 changed files with 1778 additions and 91 deletions
@@ -0,0 +1,9 @@
import { createEmptyOpsRuntimeSnapshot, getOpsRuntimeSnapshot } from '../../services/hermes/ops-monitor'
/**
 * Koa handler for the runtime performance endpoint.
 *
 * Responds with the live ops snapshot. If collecting it throws, the response
 * is an empty snapshot carrying the error message (or a generic fallback
 * text), so the handler itself never rejects because of a collection error.
 */
export async function runtime(ctx: any) {
  try {
    const snapshot = await getOpsRuntimeSnapshot()
    ctx.body = snapshot
  } catch (err: any) {
    const reason = err?.message || 'Failed to read performance metrics'
    ctx.body = createEmptyOpsRuntimeSnapshot(reason)
  }
}
@@ -0,0 +1,6 @@
import Router from '@koa/router'
import * as ctrl from '../../controllers/hermes/performance-monitor'
// Router for the performance monitoring API; currently a single GET endpoint
// that returns the runtime snapshot.
const router = new Router()
router.get('/api/hermes/performance/runtime', ctrl.runtime)

export const performanceMonitorRoutes = router
+2
View File
@@ -31,6 +31,7 @@ import { ttsRoutes } from './hermes/tts'
import { mediaRoutes } from './hermes/media'
import { proxyRoutes, proxyMiddleware } from './hermes/proxy'
import { groupChatRoutes, setGroupChatServer } from './hermes/group-chat'
import { performanceMonitorRoutes } from './hermes/performance-monitor'
/**
* Register all routes on the Koa app.
@@ -72,6 +73,7 @@ export function registerRoutes(app: any, requireAuth: (ctx: Context, next: Next)
app.use(cronHistoryRoutes.routes()) // Must be before proxy
app.use(kanbanRoutes.routes()) // Must be before proxy
app.use(mediaRoutes.routes()) // Must be before proxy
app.use(performanceMonitorRoutes.routes()) // Must be before proxy
app.use(proxyRoutes.routes())
// Proxy catch-all middleware (must be last)
@@ -10,13 +10,16 @@ delimited JSON request/response protocol over a local socket.
from __future__ import annotations
import argparse
import atexit
import copy
import errno
import hashlib
import importlib.util
import json
import locale
import os
import queue
import signal
import shutil
import socket
import subprocess
@@ -38,12 +41,100 @@ DEFAULT_AGENT_ROOT = "~/.hermes/hermes-agent"
DEFAULT_HERMES_HOME = "~/.hermes"
APPROVAL_TIMEOUT_SECONDS = 120
APPROVAL_TIMEOUT_MS = APPROVAL_TIMEOUT_SECONDS * 1000
PARENT_WATCHDOG_INTERVAL_SECONDS = 2.0
def _bridge_platform() -> str:
return os.environ.get("HERMES_AGENT_BRIDGE_PLATFORM", "cli").strip() or "cli"
def _positive_int(value: str | None) -> int | None:
if not value:
return None
try:
parsed = int(value)
except (TypeError, ValueError):
return None
return parsed if parsed > 0 else None
def _process_exists(pid: int) -> bool:
if pid <= 0:
return False
if os.name == "nt":
try:
result = subprocess.run(
["tasklist.exe", "/FI", f"PID eq {pid}", "/NH"],
check=False,
capture_output=True,
text=True,
timeout=5,
)
return str(pid) in (result.stdout or "")
except Exception:
return True
try:
os.kill(pid, 0)
return True
except ProcessLookupError:
return False
except PermissionError:
return True
except OSError as exc:
return exc.errno != errno.ESRCH
def _start_parent_process_watchdog(
    parent_pid: int | None,
    stop_event: threading.Event,
    label: str,
    interval: float = PARENT_WATCHDOG_INTERVAL_SECONDS,
) -> None:
    """Start a daemon thread that sets ``stop_event`` once ``parent_pid`` is gone.

    Does nothing when ``parent_pid`` is missing/zero or equals this process's
    own pid. Otherwise the thread polls ``_process_exists(parent_pid)`` every
    ``interval`` seconds; when the parent has disappeared it logs a line to
    stderr (tagged with ``label``), sets ``stop_event`` and exits. The thread
    also exits as soon as ``stop_event`` is set by anyone else.
    """
    if not parent_pid or parent_pid == os.getpid():
        return

    def run() -> None:
        # Event.wait() returns True once the event is set, which ends the loop.
        while not stop_event.wait(interval):
            if _process_exists(parent_pid):
                continue
            print(
                f"[hermes-bridge] parent pid {parent_pid} exited; stopping {label}",
                file=sys.stderr,
                flush=True,
            )
            stop_event.set()
            return

    threading.Thread(target=run, daemon=True, name=f"hermes-bridge-parent-watchdog-{label}").start()
def _install_stop_signal_handlers(stop_event: threading.Event) -> Callable[[], None]:
if threading.current_thread() is not threading.main_thread():
return lambda: None
previous: list[tuple[signal.Signals, Any]] = []
def handle_signal(signum: int, _frame: Any) -> None:
stop_event.set()
for signum in (signal.SIGINT, signal.SIGTERM):
try:
sig = signal.Signals(signum)
previous.append((sig, signal.getsignal(sig)))
signal.signal(sig, handle_signal)
except Exception:
pass
def restore() -> None:
for sig, handler in previous:
try:
signal.signal(sig, handler)
except Exception:
pass
return restore
def _suppress_bridge_platform_hint() -> None:
raw = os.environ.get("HERMES_BRIDGE_SUPPRESS_PLATFORM_HINT", "cli").strip()
if raw.lower() in {"0", "false", "no", "off"}:
@@ -1452,12 +1543,18 @@ class BridgeServer:
raise ValueError("action is required")
if action == "ping":
with self.pool._lock:
sessions = list(self.pool._sessions.values())
running_sessions = sum(1 for session in sessions if session.running)
return {
"pong": True,
"time": time.time(),
"pid": os.getpid(),
"agent_root": str(_agent_root()),
"profile": _worker_profile() or "default",
"hermes_home": str(_hermes_home()),
"session_count": len(sessions),
"running_session_count": running_sessions,
}
if action == "chat":
@@ -1588,46 +1685,54 @@ class BridgeServer:
def serve_forever(self) -> None:
server = self._make_server_socket()
server.listen(16)
server.settimeout(0.2)
print(json.dumps({"event": "ready", "endpoint": self.endpoint}), flush=True)
restore_signals = _install_stop_signal_handlers(self._stop)
_start_parent_process_watchdog(
_positive_int(os.environ.get("HERMES_AGENT_BRIDGE_BROKER_PID")),
self._stop,
f"worker:{_worker_profile() or 'default'}",
)
try:
server.listen(16)
server.settimeout(0.2)
print(json.dumps({"event": "ready", "endpoint": self.endpoint}), flush=True)
while not self._stop.is_set():
conn: socket.socket | None = None
try:
while not self._stop.is_set():
conn: socket.socket | None = None
try:
conn, _addr = server.accept()
except socket.timeout:
self._gc_idle_sessions()
continue
try:
req = self._read_request(conn)
data = self.handle(req)
resp = {"ok": True, **_jsonable(data)}
except Exception as exc:
resp = {
"ok": False,
"error": str(exc),
"error_type": exc.__class__.__name__,
}
self._write_response(conn, resp)
except KeyboardInterrupt:
break
except Exception as exc:
print(f"[hermes-bridge] server loop error: {exc}", file=sys.stderr, flush=True)
finally:
if conn is not None:
try:
conn.close()
except OSError:
pass
server.close()
if self.endpoint.startswith("ipc://"):
try:
Path(self.endpoint.removeprefix("ipc://")).unlink(missing_ok=True)
except OSError:
pass
conn, _addr = server.accept()
except socket.timeout:
self._gc_idle_sessions()
continue
try:
req = self._read_request(conn)
data = self.handle(req)
resp = {"ok": True, **_jsonable(data)}
except Exception as exc:
resp = {
"ok": False,
"error": str(exc),
"error_type": exc.__class__.__name__,
}
self._write_response(conn, resp)
except KeyboardInterrupt:
break
except Exception as exc:
print(f"[hermes-bridge] server loop error: {exc}", file=sys.stderr, flush=True)
finally:
if conn is not None:
try:
conn.close()
except OSError:
pass
finally:
restore_signals()
server.close()
if self.endpoint.startswith("ipc://"):
try:
Path(self.endpoint.removeprefix("ipc://")).unlink(missing_ok=True)
except OSError:
pass
class WorkerProcess:
@@ -1647,6 +1752,10 @@ class WorkerProcess:
def running(self) -> bool:
return self.process is not None and self.process.poll() is None
@property
def pid(self) -> int | None:
    """Pid of the worker subprocess, or ``None`` when no process object is held."""
    return self.process.pid if self.process is not None else None
def start(self) -> None:
with self._lock:
if self.running:
@@ -1668,6 +1777,7 @@ class WorkerProcess:
**os.environ,
"HERMES_AGENT_BRIDGE_ENDPOINT": self.endpoint,
"HERMES_AGENT_BRIDGE_WORKER_PROFILE": self.profile,
"HERMES_AGENT_BRIDGE_BROKER_PID": str(os.getpid()),
}
self.process = subprocess.Popen(
args,
@@ -2019,6 +2129,18 @@ class BridgeBroker:
if event.get("event") in {"bridge.compression.completed", "bridge.compression.failed"} and request_id:
self._compression_profile.pop(request_id, None)
def stop(self) -> None:
    """Signal shutdown, drop all routing state and stop every worker.

    Sets the stop event, then (under the lock) takes a copy of the current
    workers and clears the worker table and the run/session/approval/
    compression profile maps. The workers are stopped after the lock has been
    released. A second call finds no workers left and stops nothing.
    """
    self._stop.set()
    with self._lock:
        workers = list(self._workers.values())
        self._workers.clear()
        self._run_profile.clear()
        self._session_profile.clear()
        self._approval_profile.clear()
        self._compression_profile.clear()
    for worker in workers:
        worker.stop()
def _forward(self, profile: str, req: dict[str, Any]) -> dict[str, Any]:
worker = self._worker_for_profile(profile)
forwarded = dict(req)
@@ -2034,8 +2156,33 @@ class BridgeBroker:
if action == "ping":
with self._lock:
workers = {profile: worker.running for profile, worker in self._workers.items()}
return {"pong": True, "time": time.time(), "mode": "broker", "workers": workers}
worker_details = {
profile: {
"running": worker.running,
"pid": worker.pid,
"endpoint": worker.endpoint,
"last_used_at": worker.last_used_at,
}
for profile, worker in self._workers.items()
}
workers = {profile: details["running"] for profile, details in worker_details.items()}
sessions_by_profile: dict[str, int] = {}
for profile in self._session_profile.values():
sessions_by_profile[profile] = sessions_by_profile.get(profile, 0) + 1
active_sessions = len(self._session_profile)
return {
"pong": True,
"time": time.time(),
"mode": "broker",
"broker": {
"pid": os.getpid(),
"endpoint": self.endpoint,
},
"workers": workers,
"worker_details": worker_details,
"active_sessions": active_sessions,
"sessions_by_profile": sessions_by_profile,
}
if action == "worker_ping":
profile = self._normalize_profile(req.get("profile"))
@@ -2145,17 +2292,7 @@ class BridgeBroker:
return {"sessions": sessions}
if action == "shutdown":
self._stop.set()
with self._lock:
workers = list(self._workers.values())
for worker in workers:
if not worker.running:
worker.stop()
continue
try:
worker.request({"action": "shutdown"})
except Exception:
worker.stop()
self.stop()
return {"status": "shutting_down"}
raise ValueError(f"unknown action: {action}")
@@ -2187,51 +2324,55 @@ class BridgeBroker:
def serve_forever(self) -> None:
server = self._make_server_socket()
server.listen(64)
server.settimeout(0.2)
print(json.dumps({"event": "ready", "endpoint": self.endpoint, "mode": "broker"}), flush=True)
restore_signals = _install_stop_signal_handlers(self._stop)
atexit.register(self.stop)
try:
server.listen(64)
server.settimeout(0.2)
print(json.dumps({"event": "ready", "endpoint": self.endpoint, "mode": "broker"}), flush=True)
while not self._stop.is_set():
conn: socket.socket | None = None
try:
while not self._stop.is_set():
conn: socket.socket | None = None
try:
conn, _addr = server.accept()
except socket.timeout:
self._gc_idle_workers()
continue
try:
req = self._read_request(conn)
data = self.handle(req)
resp = {"ok": True, **_jsonable(data)}
except Exception as exc:
resp = {
"ok": False,
"error": str(exc),
"error_type": exc.__class__.__name__,
}
self._write_response(conn, resp)
except KeyboardInterrupt:
break
except Exception as exc:
print(f"[hermes-bridge-broker] server loop error: {exc}", file=sys.stderr, flush=True)
finally:
if conn is not None:
try:
conn.close()
except OSError:
pass
with self._lock:
workers = list(self._workers.values())
self._workers.clear()
for worker in workers:
worker.stop()
server.close()
if self.endpoint.startswith("ipc://"):
conn, _addr = server.accept()
except socket.timeout:
self._gc_idle_workers()
continue
try:
req = self._read_request(conn)
data = self.handle(req)
resp = {"ok": True, **_jsonable(data)}
except Exception as exc:
resp = {
"ok": False,
"error": str(exc),
"error_type": exc.__class__.__name__,
}
self._write_response(conn, resp)
except KeyboardInterrupt:
break
except Exception as exc:
print(f"[hermes-bridge-broker] server loop error: {exc}", file=sys.stderr, flush=True)
finally:
if conn is not None:
try:
conn.close()
except OSError:
pass
finally:
restore_signals()
try:
Path(self.endpoint.removeprefix("ipc://")).unlink(missing_ok=True)
except OSError:
atexit.unregister(self.stop)
except Exception:
pass
self.stop()
server.close()
if self.endpoint.startswith("ipc://"):
try:
Path(self.endpoint.removeprefix("ipc://")).unlink(missing_ok=True)
except OSError:
pass
def main(argv: list[str] | None = None) -> int:
@@ -25,6 +25,17 @@ export interface BridgeCommand {
hermesHome: string
}
/**
 * Point-in-time view of the bridge manager's lifecycle, as returned by
 * `AgentBridgeManager.getRuntimeState()`.
 */
export interface AgentBridgeManagerRuntimeState {
  /** Endpoint the manager is configured with. */
  endpoint: string
  /** Value of the manager's `running` accessor when the state was read. */
  running: boolean
  /** Value of the manager's `ready` flag when the state was read. */
  ready: boolean
  /** Pid of the managed child process; absent when there is no child. */
  pid?: number
  /** True while a start is in flight. */
  starting: boolean
  /** Value of the manager's `stopping` flag. */
  stopping: boolean
  /** True while a restart timer is pending. */
  restartScheduled: boolean
  /** The manager's restart attempt counter. */
  restartAttempts: number
}
function envPositiveInt(name: string): number | undefined {
const raw = process.env[name]
if (!raw) return undefined
@@ -308,6 +319,19 @@ export class AgentBridgeManager {
return !!this.child && !this.child.killed && this.ready
}
/**
 * Capture the manager's current lifecycle flags in a plain object.
 *
 * `starting` and `restartScheduled` are reported as booleans derived from
 * whether the corresponding internal handle is set.
 */
getRuntimeState(): AgentBridgeManagerRuntimeState {
  const state: AgentBridgeManagerRuntimeState = {
    endpoint: this.endpoint,
    running: this.running,
    ready: this.ready,
    pid: this.child?.pid,
    starting: Boolean(this.starting),
    stopping: this.stopping,
    restartScheduled: Boolean(this.restartTimer),
    restartAttempts: this.restartAttempts,
  }
  return state
}
async start(): Promise<void> {
if (this.running) return
if (this.starting) return this.starting
@@ -0,0 +1,551 @@
import { execFileSync } from 'child_process'
import { readFileSync } from 'fs'
import { cpus, freemem, loadavg, platform, totalmem, uptime } from 'os'
import { AgentBridgeClient } from './agent-bridge'
import { getAgentBridgeManager } from './agent-bridge/manager'
/** Resource usage of one process tracked by the ops monitor. */
export interface ProcessUsage {
  /** Operating-system process id. */
  pid: number
  /** Which part of the deployment the process belongs to. */
  role: 'web' | 'broker' | 'worker'
  /** Profile name; set for worker entries. */
  profile?: string
  /** Whether the process is considered running. */
  running: boolean
  /** CPU usage in percent as reported by the platform collector (0 when unavailable). */
  cpuPercent: number
  /** Resident set size in bytes (0 when unavailable). */
  memoryRssBytes: number
  /** Command or image name reported by the platform collector. */
  command?: string
  /** Optional error text for this entry. */
  error?: string
}
/**
 * Full payload of the runtime performance endpoint: host, web process,
 * bridge processes and session counts captured at one point in time.
 */
export interface OpsRuntimeSnapshot {
  /** Capture time in milliseconds since the epoch (`Date.now()`). */
  timestamp: number
  /** Host-level figures. */
  system: {
    platform: NodeJS.Platform
    arch: string
    uptimeSeconds: number
    cpuCount: number
    /** Host CPU usage between this and the previous snapshot; 0 when no delta is available. */
    cpuPercent: number
    loadAverage: number[]
    totalMemoryBytes: number
    freeMemoryBytes: number
    usedMemoryBytes: number
    memoryPercent: number
  }
  /** Figures for the web server process itself. */
  web: {
    pid: number
    uptimeSeconds: number
    memory: NodeJS.MemoryUsage
    /** Web process CPU usage between this and the previous snapshot; 0 when no delta is available. */
    cpuPercent: number
  }
  /** Agent bridge status: broker process plus one entry per worker. */
  bridge: {
    endpoint: string
    /** True when the bridge answered a ping while the snapshot was built. */
    reachable: boolean
    /** Reason the bridge could not be reached or the snapshot could not be collected. */
    error?: string
    broker: {
      running: boolean
      ready: boolean
      pid?: number
      process?: ProcessUsage
      restartScheduled: boolean
      restartAttempts: number
    }
    workers: Array<ProcessUsage & {
      endpoint?: string
      lastUsedAt?: number
      sessionCount: number
      runningSessionCount: number
    }>
    /** Sum of `memoryRssBytes` over all worker entries. */
    totalWorkerMemoryRssBytes: number
  }
  /** Session counts, overall and per profile. */
  sessions: {
    active: number
    running: number
    byProfile: Record<string, number>
  }
}
/** Aggregate CPU time counters summed over all cores at one instant. */
interface CpuTimesSample {
  /** Sum of the cores' `idle` counters. */
  idle: number
  /** Sum of every time counter of every core. */
  total: number
}
/** One reading of this process's CPU usage together with when it was taken. */
interface WebCpuSample {
  /** Wall-clock time of the reading in milliseconds (`Date.now()`). */
  at: number
  /** Cumulative user/system CPU time as returned by `process.cpuUsage()`. */
  usage: NodeJS.CpuUsage
}
/** Host memory figures in bytes plus the used share as a percentage. */
interface SystemMemoryUsage {
  totalMemoryBytes: number
  freeMemoryBytes: number
  usedMemoryBytes: number
  /** Used memory as a percentage of the total. */
  memoryPercent: number
}
// Last aggregate CPU-times reading; sampleSystemCpuPercent() diffs the new
// reading against it and then replaces it. null until the first sample.
let previousSystemCpu: CpuTimesSample | null = null
// Last process.cpuUsage() reading with its timestamp; used the same way by
// sampleWebCpuPercent(). null until the first sample.
let previousWebCpu: WebCpuSample | null = null
/** `os.cpus()` that yields an empty list instead of throwing. */
function safeCpus(): ReturnType<typeof cpus> {
  let cores: ReturnType<typeof cpus> = []
  try {
    cores = cpus()
  } catch {
    // keep the empty list
  }
  return cores
}
/** `os.loadavg()` that yields `[0, 0, 0]` instead of throwing. */
function safeLoadAverage(): number[] {
  let averages: number[] = [0, 0, 0]
  try {
    averages = loadavg()
  } catch {
    // keep the zeroed triple
  }
  return averages
}
/** Host uptime in seconds via `os.uptime()`, or 0 when the call throws. */
function safeUptime(): number {
  let seconds = 0
  try {
    seconds = uptime()
  } catch {
    // keep 0
  }
  return seconds
}
/** This process's uptime in seconds via `process.uptime()`, or 0 when the call throws. */
function safeProcessUptime(): number {
  let seconds = 0
  try {
    seconds = process.uptime()
  } catch {
    // keep 0
  }
  return seconds
}
/** `process.memoryUsage()` that yields an all-zero record instead of throwing. */
function safeProcessMemoryUsage(): NodeJS.MemoryUsage {
  const zeroed: NodeJS.MemoryUsage = {
    rss: 0,
    heapTotal: 0,
    heapUsed: 0,
    external: 0,
    arrayBuffers: 0,
  }
  try {
    return process.memoryUsage()
  } catch {
    return zeroed
  }
}
/**
 * Sum the per-core CPU time counters into one sample.
 *
 * `idle` is the sum of each core's idle counter; `total` is the sum of every
 * counter of every core. With no cores available both values are 0.
 */
function readCpuTimes(): CpuTimesSample {
  return safeCpus().reduce<CpuTimesSample>(
    (acc, core) => {
      const coreTotal = Object.values(core.times).reduce((sum, value) => sum + value, 0)
      return { idle: acc.idle + core.times.idle, total: acc.total + coreTotal }
    },
    { idle: 0, total: 0 },
  )
}
/**
 * Host CPU usage (percent) since the previous call.
 *
 * Every call stores the current counters as the new baseline. Returns null on
 * the first call, when the total counter did not advance, or when reading the
 * counters throws.
 */
function sampleSystemCpuPercent(): number | null {
  try {
    const now = readCpuTimes()
    const last = previousSystemCpu
    previousSystemCpu = now
    if (!last) return null

    const elapsed = now.total - last.total
    if (elapsed <= 0) return null

    const idle = now.idle - last.idle
    return clampPercent(((elapsed - idle) / elapsed) * 100)
  } catch {
    return null
  }
}
/**
 * CPU usage (percent) of this process since the previous call, divided by
 * the number of cores.
 *
 * Every call stores the current reading as the new baseline. Returns null on
 * the first call, when no wall-clock time elapsed, when the CPU counter went
 * backwards, or when sampling throws.
 */
function sampleWebCpuPercent(): number | null {
  try {
    const now: WebCpuSample = { at: Date.now(), usage: process.cpuUsage() }
    const last = previousWebCpu
    previousWebCpu = now
    if (!last) return null

    // Date.now() is in milliseconds, process.cpuUsage() in microseconds.
    const wallMicros = (now.at - last.at) * 1000
    const userMicros = now.usage.user - last.usage.user
    const systemMicros = now.usage.system - last.usage.system
    const cpuMicros = userMicros + systemMicros
    if (wallMicros <= 0 || cpuMicros < 0) return null

    const coreCount = Math.max(safeCpus().length, 1)
    return clampPercent((cpuMicros / wallMicros / coreCount) * 100)
  } catch {
    return null
  }
}
/**
 * Round a percentage to one decimal place and clamp it to the range 0–100.
 *
 * NaN is mapped to 0: `Math.min`/`Math.max` would otherwise propagate it and
 * the result would leave the documented range (and serialise to `null` in
 * JSON). `Infinity` clamps to 100 and `-Infinity` to 0.
 */
function clampPercent(value: number): number {
  if (Number.isNaN(value)) return 0
  const rounded = Math.round(value * 10) / 10
  return Math.max(0, Math.min(100, rounded))
}
/**
 * Convert a raw metric value to a finite number, or null when there is none.
 *
 * Only numbers and non-blank strings are accepted. `Number()` alone would
 * coerce `null`, `''`, whitespace and booleans to 0/1, which would make a
 * missing reading indistinguishable from a real zero.
 */
function numberOrNull(value: unknown): number | null {
  if (typeof value === 'number') {
    return Number.isFinite(value) ? value : null
  }
  if (typeof value === 'string' && value.trim() !== '') {
    const parsed = Number(value)
    return Number.isFinite(parsed) ? parsed : null
  }
  return null
}
/**
 * Host memory usage from `os.totalmem()` / `os.freemem()`.
 *
 * A value that cannot be read stays 0; the percentage is 0 when the total is
 * not positive.
 */
function fallbackSystemMemoryUsage(): SystemMemoryUsage {
  const reading = { total: 0, free: 0 }
  try {
    reading.total = totalmem()
    reading.free = freemem()
  } catch {}

  const used = reading.total - reading.free
  const percent = reading.total > 0 ? clampPercent((used / reading.total) * 100) : 0
  return {
    totalMemoryBytes: reading.total,
    freeMemoryBytes: reading.free,
    usedMemoryBytes: used,
    memoryPercent: percent,
  }
}
/**
 * Extract the page count from one `vm_stat` line such as
 * `Pages free:      12345.`.
 *
 * The value is the run of digits and dots after the colon at the end of the
 * line; all dots are dropped before conversion. Returns null when the line
 * has no such trailing value.
 */
function parseVmStatPageCount(line: string): number | null {
  const captured = /:\s+([\d.]+)\.?$/.exec(line)
  if (captured === null) return null
  const digits = captured[1].split('.').join('')
  const count = Number(digits)
  return Number.isFinite(count) ? count : null
}
/**
 * Derive memory usage from macOS `vm_stat` output.
 *
 * Used memory is (active + wired down + compressor-occupied pages) × page
 * size, capped at `totalMemoryBytes`; free memory is the remainder.
 *
 * @param vmStatOutput Raw text printed by `vm_stat`.
 * @param totalMemoryBytes Physical memory size in bytes.
 * @returns The usage figures, or null when the page size is missing or not
 *   positive, `totalMemoryBytes` is not positive, or no used pages were found.
 */
export function parseMacVmStatMemory(vmStatOutput: string, totalMemoryBytes: number): SystemMemoryUsage | null {
  const pageSizeMatch = /page size of\s+(\d+)\s+bytes/i.exec(vmStatOutput)
  const pageSize = Number(pageSizeMatch?.[1])
  if (!Number.isFinite(pageSize) || pageSize <= 0) return null
  if (totalMemoryBytes <= 0) return null

  // First matching label wins for a line; a later line with the same label
  // overwrites an earlier one.
  const labels: Array<[string, 'active' | 'wired' | 'compressed']> = [
    ['Pages active', 'active'],
    ['Pages wired down', 'wired'],
    ['Pages occupied by compressor', 'compressed'],
  ]
  const tally = { active: 0, wired: 0, compressed: 0 }
  for (const rawLine of vmStatOutput.split(/\r?\n/)) {
    const count = parseVmStatPageCount(rawLine.trim())
    if (count == null) continue
    const hit = labels.find(([label]) => rawLine.includes(label))
    if (hit) tally[hit[1]] = count
  }

  const usedPages = tally.active + tally.wired + tally.compressed
  if (usedPages <= 0) return null

  const usedBytes = Math.min(totalMemoryBytes, usedPages * pageSize)
  return {
    totalMemoryBytes,
    freeMemoryBytes: Math.max(0, totalMemoryBytes - usedBytes),
    usedMemoryBytes: usedBytes,
    memoryPercent: clampPercent((usedBytes / totalMemoryBytes) * 100),
  }
}
/**
 * Read host memory usage on macOS by running `sysctl -n hw.memsize` and
 * `vm_stat` (3 s timeout each).
 *
 * @returns The parsed usage, or null when either command fails or the output
 *   cannot be interpreted.
 */
function collectMacSystemMemoryUsage(): SystemMemoryUsage | null {
  const execOptions = { encoding: 'utf-8', timeout: 3000 } as const
  try {
    const memsizeText = execFileSync('sysctl', ['-n', 'hw.memsize'], execOptions).trim()
    const vmStatText = execFileSync('vm_stat', execOptions)
    return parseMacVmStatMemory(vmStatText, Number(memsizeText))
  } catch {
    return null
  }
}
/**
 * Host memory usage for the current platform: the `vm_stat`-based reading on
 * macOS when it succeeds, otherwise the generic `os` module figures.
 */
function collectSystemMemoryUsage(): SystemMemoryUsage {
  const macUsage = platform() === 'darwin' ? collectMacSystemMemoryUsage() : null
  return macUsage ?? fallbackSystemMemoryUsage()
}
/**
 * Collect CPU and memory figures for `pids` on POSIX systems.
 *
 * Starts from the procfs readings and overlays what `ps` reports (pid, %CPU,
 * RSS in kB, command). When a `ps` row has no usable RSS the procfs value for
 * that pid is kept. If `ps` fails, the procfs readings are returned as is.
 */
function collectPosixProcessMetrics(pids: number[]): Map<number, Partial<ProcessUsage>> {
  const metrics = collectProcfsProcessMetrics(pids)
  if (pids.length === 0) return metrics

  let psOutput: string
  try {
    psOutput = execFileSync('ps', ['-o', 'pid=,pcpu=,rss=,comm=', '-p', pids.join(',')], {
      encoding: 'utf-8',
      timeout: 3000,
    })
  } catch {
    return metrics
  }

  const rows = psOutput
    .split(/\r?\n/)
    .map(row => row.trim())
    .filter(row => row !== '')
  for (const row of rows) {
    // The command may contain spaces, so everything after the third column is the command.
    const [pidText, cpuText, rssText, ...commandWords] = row.split(/\s+/)
    const pid = Number(pidText)
    if (!Number.isFinite(pid)) continue

    const rssKb = numberOrNull(rssText)
    const memoryRssBytes = rssKb == null ? metrics.get(pid)?.memoryRssBytes : rssKb * 1024
    metrics.set(pid, {
      cpuPercent: numberOrNull(cpuText) ?? 0,
      memoryRssBytes,
      command: commandWords.join(' ') || undefined,
    })
  }
  return metrics
}
/**
 * Read resident memory and process name for `pids` from `/proc/<pid>/status`.
 *
 * Pids whose status file cannot be read are left out of the result. procfs
 * gives no CPU percentage here, so `cpuPercent` is always 0.
 */
function collectProcfsProcessMetrics(pids: number[]): Map<number, Partial<ProcessUsage>> {
  const collected = new Map<number, Partial<ProcessUsage>>()
  pids.forEach(pid => {
    let statusText: string
    try {
      statusText = readFileSync(`/proc/${pid}/status`, 'utf-8')
    } catch {
      return
    }
    const rssMatch = /^VmRSS:\s+(\d+)\s+kB/im.exec(statusText)
    const nameMatch = /^Name:\s+(.+)$/im.exec(statusText)
    const rssKb = Number(rssMatch?.[1])
    collected.set(pid, {
      cpuPercent: 0,
      memoryRssBytes: Number.isFinite(rssKb) ? rssKb * 1024 : 0,
      command: nameMatch?.[1]?.trim(),
    })
  })
  return collected
}
/**
 * Parse the JSON text produced by PowerShell `ConvertTo-Json` into a list.
 *
 * Blank output yields an empty list and a single value is wrapped in an
 * array. Malformed JSON makes `JSON.parse` throw.
 */
function parseWindowsJson(output: string): any[] {
  if (output.trim() === '') return []
  const decoded = JSON.parse(output)
  if (Array.isArray(decoded)) return decoded
  return [decoded]
}
/**
 * Collect CPU and memory figures for `pids` on Windows.
 *
 * First tries one PowerShell query against
 * `Win32_PerfFormattedData_PerfProc_Process`. If that throws (including on
 * unparsable output), falls back to one `tasklist.exe` call per pid, which
 * supplies memory and image name but no CPU figure.
 */
function collectWindowsProcessMetrics(pids: number[]): Map<number, Partial<ProcessUsage>> {
  if (pids.length === 0) return new Map()
  const idList = pids.join(',')

  try {
    const script = [
      `$ids=@(${idList})`,
      'Get-CimInstance Win32_PerfFormattedData_PerfProc_Process',
      '| Where-Object { $ids -contains [int]$_.IDProcess }',
      '| Select-Object @{Name="pid";Expression={[int]$_.IDProcess}},@{Name="cpuPercent";Expression={[double]$_.PercentProcessorTime}},@{Name="memoryRssBytes";Expression={[double]$_.WorkingSet}},@{Name="command";Expression={$_.Name}}',
      '| ConvertTo-Json -Compress',
    ].join(' ')
    const raw = execFileSync('powershell.exe', ['-NoProfile', '-Command', script], {
      encoding: 'utf-8',
      timeout: 5000,
      windowsHide: true,
    })

    const fromCim = new Map<number, Partial<ProcessUsage>>()
    for (const entry of parseWindowsJson(raw)) {
      const pid = Number(entry?.pid)
      if (!Number.isFinite(pid)) continue
      fromCim.set(pid, {
        cpuPercent: numberOrNull(entry?.cpuPercent) ?? 0,
        memoryRssBytes: numberOrNull(entry?.memoryRssBytes) ?? 0,
        command: typeof entry?.command === 'string' ? entry.command : undefined,
      })
    }
    return fromCim
  } catch {}

  // Fallback: query each pid through tasklist's CSV output.
  const fromTasklist = new Map<number, Partial<ProcessUsage>>()
  for (const pid of pids) {
    try {
      const csv = execFileSync('tasklist.exe', ['/FI', `PID eq ${pid}`, '/FO', 'CSV', '/NH'], {
        encoding: 'utf-8',
        timeout: 3000,
        windowsHide: true,
      })
      const row = csv.split(/\r?\n/).find(item => item.includes(`"${pid}"`))
      if (!row) continue

      const cells = row.match(/(".*?"|[^",]+)(?=\s*,|\s*$)/g)?.map(value => value.replace(/^"|"$/g, '')) || []
      // Fifth column is the memory usage text; keep only its digits (kB).
      const memoryKb = Number(cells[4]?.replace(/[^\d]/g, ''))
      fromTasklist.set(pid, {
        cpuPercent: 0,
        memoryRssBytes: Number.isFinite(memoryKb) ? memoryKb * 1024 : 0,
        command: cells[0],
      })
    } catch {}
  }
  return fromTasklist
}
/**
 * Collect per-process metrics for `pids` using the collector for the current
 * platform. Non-finite and non-positive pids are dropped and duplicates are
 * removed before the collector runs.
 */
function collectProcessMetrics(pids: number[]): Map<number, Partial<ProcessUsage>> {
  const seen = new Set<number>()
  for (const pid of pids) {
    if (Number.isFinite(pid) && pid > 0) seen.add(pid)
  }
  const uniquePids = [...seen]
  if (platform() === 'win32') return collectWindowsProcessMetrics(uniquePids)
  return collectPosixProcessMetrics(uniquePids)
}
/**
 * Build the `ProcessUsage` entry for one pid from collected metrics.
 *
 * @param pid Process id; a missing or zero pid yields `undefined`.
 * @param role Role to record on the entry.
 * @param metrics Metrics keyed by pid.
 * @param profile Optional profile name to record on the entry.
 * @returns The entry. `running` is true only when `metrics` has the pid;
 *   missing CPU and memory figures default to 0.
 */
function processUsage(
  pid: number | undefined,
  role: ProcessUsage['role'],
  metrics: Map<number, Partial<ProcessUsage>>,
  profile?: string,
): ProcessUsage | undefined {
  if (!pid) return undefined

  const sample = metrics.get(pid)
  const entry: ProcessUsage = {
    pid,
    role,
    profile,
    running: Boolean(sample),
    cpuPercent: sample?.cpuPercent ?? 0,
    memoryRssBytes: sample?.memoryRssBytes ?? 0,
    command: sample?.command,
  }
  return entry
}
/**
 * Normalise one worker entry from the bridge ping response.
 *
 * A bare boolean is taken as the running flag. Any other non-object value
 * means "not running". For an object, `pid` is kept only when it is a
 * positive finite number, `endpoint` only when it is a string, and
 * `last_used_at` only when it converts to a finite number.
 */
function normalizeWorker(raw: unknown): {
  running: boolean
  pid?: number
  endpoint?: string
  lastUsedAt?: number
} {
  if (typeof raw === 'boolean') return { running: raw }
  if (typeof raw !== 'object' || raw === null) return { running: false }

  const fields = raw as Record<string, unknown>
  const pidValue = Number(fields.pid)
  const lastUsedValue = Number(fields.last_used_at)
  const hasValidPid = Number.isFinite(pidValue) && pidValue > 0
  return {
    running: Boolean(fields.running),
    pid: hasValidPid ? pidValue : undefined,
    endpoint: typeof fields.endpoint === 'string' ? fields.endpoint : undefined,
    lastUsedAt: Number.isFinite(lastUsedValue) ? lastUsedValue : undefined,
  }
}
/**
 * Build a snapshot for the case where real metrics could not be collected.
 *
 * Host identity, uptimes, load average and this process's memory are still
 * filled in through the non-throwing helpers; CPU percentages, host memory,
 * bridge and session figures are zeroed or empty.
 *
 * @param error Optional message stored as `bridge.error`.
 */
export function createEmptyOpsRuntimeSnapshot(error?: string): OpsRuntimeSnapshot {
  const timestamp = Date.now()

  const system: OpsRuntimeSnapshot['system'] = {
    platform: process.platform,
    arch: process.arch,
    uptimeSeconds: safeUptime(),
    cpuCount: safeCpus().length,
    cpuPercent: 0,
    loadAverage: safeLoadAverage(),
    totalMemoryBytes: 0,
    freeMemoryBytes: 0,
    usedMemoryBytes: 0,
    memoryPercent: 0,
  }

  const web: OpsRuntimeSnapshot['web'] = {
    pid: process.pid,
    uptimeSeconds: safeProcessUptime(),
    memory: safeProcessMemoryUsage(),
    cpuPercent: 0,
  }

  const bridge: OpsRuntimeSnapshot['bridge'] = {
    endpoint: '',
    reachable: false,
    error,
    broker: {
      running: false,
      ready: false,
      restartScheduled: false,
      restartAttempts: 0,
    },
    workers: [],
    totalWorkerMemoryRssBytes: 0,
  }

  const sessions: OpsRuntimeSnapshot['sessions'] = {
    active: 0,
    running: 0,
    byProfile: {},
  }

  return { timestamp, system, web, bridge, sessions }
}
/**
 * Collect the full runtime snapshot: host, web process, bridge broker and
 * workers, and session counts.
 *
 * The bridge is pinged with a 2 s timeout. When the ping fails the snapshot
 * is still returned with `bridge.reachable === false` and `bridge.error`
 * set. The session list is best effort: if listing fails after a successful
 * ping, the counts reported by the ping itself are used instead.
 *
 * @returns The assembled snapshot. CPU percentages are 0 when no delta to a
 *   previous snapshot is available.
 */
export async function getOpsRuntimeSnapshot(): Promise<OpsRuntimeSnapshot> {
  const manager = getAgentBridgeManager()
  const managerState = manager.getRuntimeState()
  let bridgeReachable = false
  let bridgeError: string | undefined
  let bridgePing: Record<string, any> = {}
  let sessions: Array<Record<string, any>> = []

  try {
    const client = new AgentBridgeClient({ endpoint: managerState.endpoint, timeoutMs: 2000, connectRetryMs: 0 })
    bridgePing = await client.ping() as Record<string, any>
    bridgeReachable = true
    try {
      const list = await client.list()
      sessions = Array.isArray((list as any).sessions) ? (list as any).sessions : []
    } catch {
      // Deliberately ignored: the ping's own session counts are the fallback.
    }
  } catch (err: any) {
    bridgeError = err?.message || 'Agent bridge is not reachable'
  }

  const workerEntries = Object.entries((bridgePing.worker_details || {}) as Record<string, unknown>)
    .map(([profile, value]) => [profile, normalizeWorker(value)] as const)
  // Prefer the pid the broker reports about itself; fall back to the manager's child pid.
  const brokerPid = Number(bridgePing.broker?.pid || managerState.pid)
  const pids = [
    process.pid,
    Number.isFinite(brokerPid) ? brokerPid : undefined,
    ...workerEntries.map(([, worker]) => worker.pid),
  ].filter((pid): pid is number => typeof pid === 'number' && pid > 0)
  const processMetrics = collectProcessMetrics(pids)

  // Tally total and running sessions per profile in a single pass so the
  // per-worker lookup below does not rescan the whole session list.
  const sessionCountsByProfile: Record<string, number> = {}
  const runningCountsByProfile = new Map<string, number>()
  let runningSessions = 0
  for (const session of sessions) {
    const profileName = String(session.profile || 'default')
    sessionCountsByProfile[profileName] = (sessionCountsByProfile[profileName] || 0) + 1
    if (session.running) {
      runningSessions += 1
      runningCountsByProfile.set(profileName, (runningCountsByProfile.get(profileName) ?? 0) + 1)
    }
  }
  if (!sessions.length && bridgePing.sessions_by_profile && typeof bridgePing.sessions_by_profile === 'object') {
    for (const [profileName, count] of Object.entries(bridgePing.sessions_by_profile)) {
      const value = Number(count)
      if (Number.isFinite(value)) sessionCountsByProfile[profileName] = value
    }
  }

  const workers = workerEntries.map(([profileName, worker]) => {
    const usage = processUsage(worker.pid, 'worker', processMetrics, profileName)
    return {
      pid: worker.pid || 0,
      role: 'worker' as const,
      profile: profileName,
      // The running flag is the broker's view, not derived from the OS metrics.
      running: worker.running,
      cpuPercent: usage?.cpuPercent ?? 0,
      memoryRssBytes: usage?.memoryRssBytes ?? 0,
      command: usage?.command,
      endpoint: worker.endpoint,
      lastUsedAt: worker.lastUsedAt,
      sessionCount: sessionCountsByProfile[profileName] || 0,
      runningSessionCount: runningCountsByProfile.get(profileName) ?? 0,
    }
  })

  const systemMemory = collectSystemMemoryUsage()
  const totalWorkerMemory = workers.reduce((sum, worker) => sum + (worker.memoryRssBytes || 0), 0)

  return {
    timestamp: Date.now(),
    system: {
      platform: process.platform,
      arch: process.arch,
      uptimeSeconds: safeUptime(),
      cpuCount: safeCpus().length,
      cpuPercent: sampleSystemCpuPercent() ?? 0,
      loadAverage: safeLoadAverage(),
      totalMemoryBytes: systemMemory.totalMemoryBytes,
      freeMemoryBytes: systemMemory.freeMemoryBytes,
      usedMemoryBytes: systemMemory.usedMemoryBytes,
      memoryPercent: systemMemory.memoryPercent,
    },
    web: {
      pid: process.pid,
      uptimeSeconds: safeProcessUptime(),
      memory: safeProcessMemoryUsage(),
      cpuPercent: sampleWebCpuPercent() ?? 0,
    },
    bridge: {
      endpoint: managerState.endpoint,
      reachable: bridgeReachable,
      error: bridgeError,
      broker: {
        running: managerState.running,
        ready: managerState.ready,
        pid: Number.isFinite(brokerPid) && brokerPid > 0 ? brokerPid : undefined,
        process: processUsage(Number.isFinite(brokerPid) ? brokerPid : undefined, 'broker', processMetrics),
        restartScheduled: managerState.restartScheduled,
        restartAttempts: managerState.restartAttempts,
      },
      workers,
      totalWorkerMemoryRssBytes: totalWorkerMemory,
    },
    sessions: {
      active: sessions.length || Number(bridgePing.active_sessions || 0),
      running: runningSessions,
      byProfile: sessionCountsByProfile,
    },
  }
}