diff --git a/packages/client/src/api/hermes/performance-monitor.ts b/packages/client/src/api/hermes/performance-monitor.ts new file mode 100644 index 0000000..ecce62b --- /dev/null +++ b/packages/client/src/api/hermes/performance-monitor.ts @@ -0,0 +1,63 @@ +import { request } from '../client' + +export interface ProcessUsage { + pid: number + role: 'web' | 'broker' | 'worker' + profile?: string + running: boolean + cpuPercent: number + memoryRssBytes: number + command?: string + error?: string +} + +export interface PerformanceRuntimeSnapshot { + timestamp: number + system: { + platform: string + arch: string + uptimeSeconds: number + cpuCount: number + cpuPercent: number + loadAverage: number[] + totalMemoryBytes: number + freeMemoryBytes: number + usedMemoryBytes: number + memoryPercent: number + } + web: { + pid: number + uptimeSeconds: number + memory: Record + cpuPercent: number + } + bridge: { + endpoint: string + reachable: boolean + error?: string + broker: { + running: boolean + ready: boolean + pid?: number + process?: ProcessUsage + restartScheduled: boolean + restartAttempts: number + } + workers: Array + totalWorkerMemoryRssBytes: number + } + sessions: { + active: number + running: number + byProfile: Record + } +} + +export async function fetchPerformanceRuntime(): Promise { + return request('/api/hermes/performance/runtime') +} diff --git a/packages/client/src/components/layout/AppSidebar.vue b/packages/client/src/components/layout/AppSidebar.vue index 265e7ec..cc38632 100644 --- a/packages/client/src/components/layout/AppSidebar.vue +++ b/packages/client/src/components/layout/AppSidebar.vue @@ -226,10 +226,17 @@ function openChangelog() { {{ t("sidebar.usage") }} - + diff --git a/packages/client/src/i18n/locales/de.ts b/packages/client/src/i18n/locales/de.ts index abfb770..f559832 100644 --- a/packages/client/src/i18n/locales/de.ts +++ b/packages/client/src/i18n/locales/de.ts @@ -82,6 +82,7 @@ export default { memory: 'Gedachtnis', logs: 'Protokolle', usage: 'Nutzung', + performance: 'Leistung', skillsUsage: 'Skill-Nutzung', channels: 'Kanale', terminal: 'Konsole', @@ -116,6 +117,36 @@ export default { collapse: 'Menü einklappen', }, + performance: { + title: 'Leistung', + subtitle: 'Systemressourcen, Bridge Broker, Workers und aktive Sitzungen überwachen', + refresh: 'Aktualisieren', + autoRefreshOn: 'Automatisch aktualisieren', + autoRefreshOff: 'Manuell aktualisieren', + loadFailed: 'Leistungsdaten konnten nicht geladen werden', + systemCpu: 'System-CPU', + systemMemory: 'Systemspeicher', + activeSessions: 'Aktive Sitzungen', + runningSessions: 'Laufend {count}', + workers: 'Workers', + totalWorkerMemory: 'Worker-Gesamtspeicher', + processes: 'Prozesse', + uptime: 'Laufzeit', + running: 'Läuft', + stopped: 'Gestoppt', + workerMemory: 'Worker-Speicher', + lastUpdated: 'Aktualisiert', + profile: 'Profile', + memory: 'Speicher', + sessions: 'Sitzungen', + runningActiveSessions: 'Laufend / Aktiv', + lastUsed: 'Zuletzt verwendet', + status: 'Status', + noWorkers: 'Keine Workers', + sessionsByProfile: 'Sitzungen nach Profile', + noActiveSessions: 'Keine aktiven Sitzungen', + }, + // Drawer drawer: { terminal: 'Konsole', diff --git a/packages/client/src/i18n/locales/en.ts b/packages/client/src/i18n/locales/en.ts index 85df13a..98b9f2b 100644 --- a/packages/client/src/i18n/locales/en.ts +++ b/packages/client/src/i18n/locales/en.ts @@ -83,6 +83,7 @@ export default { memory: 'Memory', logs: 'Logs', usage: 'Usage', + performance: 'Performance', skillsUsage: 'Skills Usage', channels: 'Channels', gateways: 'Gateways', @@ -116,6 +117,36 @@ export default { noChangelog: 'No changelog available', }, + performance: { + title: 'Performance', + subtitle: 'Inspect system resources, bridge broker, workers, and active sessions', + refresh: 'Refresh', + autoRefreshOn: 'Auto refresh', + autoRefreshOff: 'Manual refresh', + loadFailed: 'Failed to load performance metrics', + systemCpu: 'System CPU', + systemMemory: 'System Memory', + activeSessions: 'Active Sessions', + runningSessions: 'Running {count}', + workers: 'Workers', + totalWorkerMemory: 'Worker memory', + processes: 'Processes', + uptime: 'Uptime', + running: 'Running', + stopped: 'Stopped', + workerMemory: 'Worker Memory', + lastUpdated: 'Updated', + profile: 'Profile', + memory: 'Memory', + sessions: 'Sessions', + runningActiveSessions: 'Running / Active', + lastUsed: 'Last Used', + status: 'Status', + noWorkers: 'No workers', + sessionsByProfile: 'Sessions by Profile', + noActiveSessions: 'No active sessions', + }, + // Drawer drawer: { terminal: 'Terminal', diff --git a/packages/client/src/i18n/locales/es.ts b/packages/client/src/i18n/locales/es.ts index 679f4ab..8a4928e 100644 --- a/packages/client/src/i18n/locales/es.ts +++ b/packages/client/src/i18n/locales/es.ts @@ -82,6 +82,7 @@ export default { memory: 'Memoria', logs: 'Registros', usage: 'Uso', + performance: 'Rendimiento', skillsUsage: 'Uso de habilidades', channels: 'Canales', terminal: 'Terminal', @@ -116,6 +117,36 @@ export default { collapse: 'Contraer menú', }, + performance: { + title: 'Rendimiento', + subtitle: 'Supervisa recursos del sistema, Bridge Broker, Workers y sesiones activas', + refresh: 'Actualizar', + autoRefreshOn: 'Actualización automática', + autoRefreshOff: 'Actualización manual', + loadFailed: 'No se pudieron cargar las métricas de rendimiento', + systemCpu: 'CPU del sistema', + systemMemory: 'Memoria del sistema', + activeSessions: 'Sesiones activas', + runningSessions: 'En ejecución {count}', + workers: 'Workers', + totalWorkerMemory: 'Memoria total de Worker', + processes: 'Procesos', + uptime: 'Tiempo activo', + running: 'En ejecución', + stopped: 'Detenido', + workerMemory: 'Memoria de Worker', + lastUpdated: 'Actualizado', + profile: 'Profile', + memory: 'Memoria', + sessions: 'Sesiones', + runningActiveSessions: 'En ejecución / Activas', + lastUsed: 'Último uso', + status: 'Estado', + noWorkers: 'Sin Workers', + sessionsByProfile: 'Sesiones por Profile', + noActiveSessions: 'No hay sesiones activas', + }, + // Drawer drawer: { terminal: 'Terminal', diff --git a/packages/client/src/i18n/locales/fr.ts b/packages/client/src/i18n/locales/fr.ts index dc300f7..8d6785b 100644 --- a/packages/client/src/i18n/locales/fr.ts +++ b/packages/client/src/i18n/locales/fr.ts @@ -82,6 +82,7 @@ export default { memory: 'Memoire', logs: 'Journaux', usage: 'Utilisation', + performance: 'Performance', skillsUsage: 'Utilisation des compétences', channels: 'Canaux', terminal: 'Terminal', @@ -116,6 +117,36 @@ export default { collapse: 'Replier le menu', }, + performance: { + title: 'Performance', + subtitle: 'Surveiller les ressources système, Bridge Broker, Workers et sessions actives', + refresh: 'Actualiser', + autoRefreshOn: 'Actualisation auto', + autoRefreshOff: 'Actualisation manuelle', + loadFailed: 'Échec du chargement des métriques de performance', + systemCpu: 'CPU système', + systemMemory: 'Mémoire système', + activeSessions: 'Sessions actives', + runningSessions: 'En cours {count}', + workers: 'Workers', + totalWorkerMemory: 'Mémoire totale Worker', + processes: 'Processus', + uptime: 'Disponibilité', + running: 'En cours', + stopped: 'Arrêté', + workerMemory: 'Mémoire Worker', + lastUpdated: 'Mis à jour', + profile: 'Profile', + memory: 'Mémoire', + sessions: 'Sessions', + runningActiveSessions: 'En cours / Actives', + lastUsed: 'Dernière utilisation', + status: 'Statut', + noWorkers: 'Aucun Worker', + sessionsByProfile: 'Sessions par Profile', + noActiveSessions: 'Aucune session active', + }, + // Drawer drawer: { terminal: 'Terminal', diff --git a/packages/client/src/i18n/locales/ja.ts b/packages/client/src/i18n/locales/ja.ts index fd7da5c..cbf3268 100644 --- a/packages/client/src/i18n/locales/ja.ts +++ b/packages/client/src/i18n/locales/ja.ts @@ -82,6 +82,7 @@ export default { memory: 'メモリ', logs: 'ログ', usage: '使用量', + performance: 'パフォーマンス', skillsUsage: 'スキル使用状況', channels: 'チャンネル', terminal: 'ターミナル', @@ -116,6 +117,36 @@ export default { collapse: 'メニューを折りたたむ', }, + performance: { + title: 'パフォーマンス', + subtitle: 'システムリソース、Bridge Broker、Workers、アクティブセッションを確認', + refresh: '更新', + autoRefreshOn: '自動更新', + autoRefreshOff: '手動更新', + loadFailed: 'パフォーマンスデータの読み込みに失敗しました', + systemCpu: 'システム CPU', + systemMemory: 'システムメモリ', + activeSessions: 'アクティブセッション', + runningSessions: '実行中 {count}', + workers: 'Workers', + totalWorkerMemory: 'Worker 合計メモリ', + processes: 'プロセス', + uptime: '稼働時間', + running: '実行中', + stopped: '停止', + workerMemory: 'Worker メモリ', + lastUpdated: '更新時刻', + profile: 'Profile', + memory: 'メモリ', + sessions: 'セッション', + runningActiveSessions: '実行中 / アクティブ', + lastUsed: '最終使用', + status: '状態', + noWorkers: 'Worker はありません', + sessionsByProfile: 'Profile 別セッション', + noActiveSessions: 'アクティブセッションはありません', + }, + // ドロワー drawer: { terminal: 'ターミナル', diff --git a/packages/client/src/i18n/locales/ko.ts b/packages/client/src/i18n/locales/ko.ts index c349ba8..92194ba 100644 --- a/packages/client/src/i18n/locales/ko.ts +++ b/packages/client/src/i18n/locales/ko.ts @@ -82,6 +82,7 @@ export default { memory: '메모리', logs: '로그', usage: '사용량', + performance: '성능 모니터링', skillsUsage: '스킬 사용량', channels: '채널', terminal: '터미널', @@ -116,6 +117,36 @@ export default { collapse: '메뉴 접기', }, + performance: { + title: '성능 모니터링', + subtitle: '시스템 리소스, Bridge Broker, Workers, 활성 세션 확인', + refresh: '새로고침', + autoRefreshOn: '자동 새로고침', + autoRefreshOff: '수동 새로고침', + loadFailed: '성능 데이터를 불러오지 못했습니다', + systemCpu: '시스템 CPU', + systemMemory: '시스템 메모리', + activeSessions: '활성 세션', + runningSessions: '실행 중 {count}', + workers: 'Workers', + totalWorkerMemory: 'Worker 총 메모리', + processes: '프로세스', + uptime: '실행 시간', + running: '실행 중', + stopped: '중지됨', + workerMemory: 'Worker 메모리', + lastUpdated: '업데이트 시간', + profile: 'Profile', + memory: '메모리', + sessions: '세션', + runningActiveSessions: '실행 중 / 활성', + lastUsed: '마지막 사용', + status: '상태', + noWorkers: 'Worker 없음', + sessionsByProfile: 'Profile별 세션', + noActiveSessions: '활성 세션 없음', + }, + // 서랍 drawer: { terminal: '터미널', diff --git a/packages/client/src/i18n/locales/pt.ts b/packages/client/src/i18n/locales/pt.ts index d133fec..92c6b40 100644 --- a/packages/client/src/i18n/locales/pt.ts +++ b/packages/client/src/i18n/locales/pt.ts @@ -82,6 +82,7 @@ export default { memory: 'Memoria', logs: 'Logs', usage: 'Uso', + performance: 'Desempenho', skillsUsage: 'Uso de habilidades', channels: 'Canais', terminal: 'Terminal', @@ -116,6 +117,36 @@ export default { collapse: 'Recolher menu', }, + performance: { + title: 'Desempenho', + subtitle: 'Monitore recursos do sistema, Bridge Broker, Workers e sessões ativas', + refresh: 'Atualizar', + autoRefreshOn: 'Atualização automática', + autoRefreshOff: 'Atualização manual', + loadFailed: 'Falha ao carregar métricas de desempenho', + systemCpu: 'CPU do sistema', + systemMemory: 'Memória do sistema', + activeSessions: 'Sessões ativas', + runningSessions: 'Em execução {count}', + workers: 'Workers', + totalWorkerMemory: 'Memória total de Worker', + processes: 'Processos', + uptime: 'Tempo ativo', + running: 'Em execução', + stopped: 'Parado', + workerMemory: 'Memória de Worker', + lastUpdated: 'Atualizado', + profile: 'Profile', + memory: 'Memória', + sessions: 'Sessões', + runningActiveSessions: 'Em execução / Ativas', + lastUsed: 'Último uso', + status: 'Status', + noWorkers: 'Nenhum Worker', + sessionsByProfile: 'Sessões por Profile', + noActiveSessions: 'Nenhuma sessão ativa', + }, + // Gaveta drawer: { terminal: 'Terminal', diff --git a/packages/client/src/i18n/locales/zh-TW.ts b/packages/client/src/i18n/locales/zh-TW.ts index 4da2417..642cf6b 100644 --- a/packages/client/src/i18n/locales/zh-TW.ts +++ b/packages/client/src/i18n/locales/zh-TW.ts @@ -83,6 +83,7 @@ export default { memory: '記憶', logs: '日誌', usage: '用量', + performance: '效能監控', skillsUsage: '技能用量', channels: '頻道', gateways: '閘道', @@ -116,6 +117,36 @@ export default { noChangelog: '目前無更新日誌', }, + performance: { + title: '效能監控', + subtitle: '查看系統資源、Bridge Broker、Workers 和活躍會話', + refresh: '重新整理', + autoRefreshOn: '自動重新整理', + autoRefreshOff: '手動重新整理', + loadFailed: '效能資料載入失敗', + systemCpu: '系統 CPU', + systemMemory: '系統記憶體', + activeSessions: '活躍會話', + runningSessions: '執行中 {count}', + workers: 'Workers', + totalWorkerMemory: 'Worker 總記憶體', + processes: '程序', + uptime: '執行', + running: '執行中', + stopped: '已停止', + workerMemory: 'Worker 記憶體', + lastUpdated: '更新時間', + profile: 'Profile', + memory: '記憶體', + sessions: '會話', + runningActiveSessions: '執行中 / 活躍', + lastUsed: '最後使用', + status: '狀態', + noWorkers: '暫無 Worker', + sessionsByProfile: '按 Profile 統計會話', + noActiveSessions: '暫無活躍會話', + }, + // 抽屜 drawer: { terminal: '終端機', diff --git a/packages/client/src/i18n/locales/zh.ts b/packages/client/src/i18n/locales/zh.ts index 00e51d9..f1218ec 100644 --- a/packages/client/src/i18n/locales/zh.ts +++ b/packages/client/src/i18n/locales/zh.ts @@ -83,6 +83,7 @@ export default { memory: '记忆', logs: '日志', usage: '用量', + performance: '性能监控', skillsUsage: '技能用量', channels: '频道', gateways: '网关', @@ -116,6 +117,36 @@ export default { noChangelog: '暂无更新日志', }, + performance: { + title: '性能监控', + subtitle: '查看系统资源、Bridge Broker、Workers 和活跃会话', + refresh: '刷新', + autoRefreshOn: '自动刷新', + autoRefreshOff: '手动刷新', + loadFailed: '性能数据加载失败', + systemCpu: '系统 CPU', + systemMemory: '系统内存', + activeSessions: '活跃会话', + runningSessions: '运行中 {count}', + workers: 'Workers', + totalWorkerMemory: 'Worker 总内存', + processes: '进程', + uptime: '运行', + running: '运行中', + stopped: '已停止', + workerMemory: 'Worker 内存', + lastUpdated: '更新时间', + profile: 'Profile', + memory: '内存', + sessions: '会话', + runningActiveSessions: '运行中 / 活跃', + lastUsed: '最后使用', + status: '状态', + noWorkers: '暂无 Worker', + sessionsByProfile: '按 Profile 统计会话', + noActiveSessions: '暂无活跃会话', + }, + // 抽屉 drawer: { terminal: '终端', diff --git a/packages/client/src/router/index.ts b/packages/client/src/router/index.ts index 246071b..ce77c95 100644 --- a/packages/client/src/router/index.ts +++ b/packages/client/src/router/index.ts @@ -50,6 +50,11 @@ const router = createRouter({ name: 'hermes.usage', component: () => import('@/views/hermes/UsageView.vue'), }, + { + path: '/hermes/performance', + name: 'hermes.performance', + component: () => import('@/views/hermes/PerformanceView.vue'), + }, { path: '/hermes/skills-usage', name: 'hermes.skillsUsage', diff --git a/packages/client/src/views/hermes/PerformanceView.vue b/packages/client/src/views/hermes/PerformanceView.vue new file mode 100644 index 0000000..790fe45 --- /dev/null +++ b/packages/client/src/views/hermes/PerformanceView.vue @@ -0,0 +1,486 @@ + + + + + diff --git a/packages/server/src/controllers/hermes/performance-monitor.ts b/packages/server/src/controllers/hermes/performance-monitor.ts new file mode 100644 index 0000000..74dfb8d --- /dev/null +++ b/packages/server/src/controllers/hermes/performance-monitor.ts @@ -0,0 +1,9 @@ +import { createEmptyOpsRuntimeSnapshot, getOpsRuntimeSnapshot } from '../../services/hermes/ops-monitor' + +export async function runtime(ctx: any) { + try { + ctx.body = await getOpsRuntimeSnapshot() + } catch (err: any) { + ctx.body = createEmptyOpsRuntimeSnapshot(err?.message || 'Failed to read performance metrics') + } +} diff --git a/packages/server/src/routes/hermes/performance-monitor.ts b/packages/server/src/routes/hermes/performance-monitor.ts new file mode 100644 index 0000000..ca5c1a6 --- /dev/null +++ b/packages/server/src/routes/hermes/performance-monitor.ts @@ -0,0 +1,6 @@ +import Router from '@koa/router' +import * as ctrl from '../../controllers/hermes/performance-monitor' + +export const performanceMonitorRoutes = new Router() + +performanceMonitorRoutes.get('/api/hermes/performance/runtime', ctrl.runtime) diff --git a/packages/server/src/routes/index.ts b/packages/server/src/routes/index.ts index 2538f4a..35112fc 100644 --- a/packages/server/src/routes/index.ts +++ b/packages/server/src/routes/index.ts @@ -31,6 +31,7 @@ import { ttsRoutes } from './hermes/tts' import { mediaRoutes } from './hermes/media' import { proxyRoutes, proxyMiddleware } from './hermes/proxy' import { groupChatRoutes, setGroupChatServer } from './hermes/group-chat' +import { performanceMonitorRoutes } from './hermes/performance-monitor' /** * Register all routes on the Koa app. @@ -72,6 +73,7 @@ export function registerRoutes(app: any, requireAuth: (ctx: Context, next: Next) app.use(cronHistoryRoutes.routes()) // Must be before proxy app.use(kanbanRoutes.routes()) // Must be before proxy app.use(mediaRoutes.routes()) // Must be before proxy + app.use(performanceMonitorRoutes.routes()) // Must be before proxy app.use(proxyRoutes.routes()) // Proxy catch-all middleware (must be last) diff --git a/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py b/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py index 87500a9..a867772 100755 --- a/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py +++ b/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py @@ -10,13 +10,16 @@ delimited JSON request/response protocol over a local socket. from __future__ import annotations import argparse +import atexit import copy +import errno import hashlib import importlib.util import json import locale import os import queue +import signal import shutil import socket import subprocess @@ -38,12 +41,100 @@ DEFAULT_AGENT_ROOT = "~/.hermes/hermes-agent" DEFAULT_HERMES_HOME = "~/.hermes" APPROVAL_TIMEOUT_SECONDS = 120 APPROVAL_TIMEOUT_MS = APPROVAL_TIMEOUT_SECONDS * 1000 +PARENT_WATCHDOG_INTERVAL_SECONDS = 2.0 def _bridge_platform() -> str: return os.environ.get("HERMES_AGENT_BRIDGE_PLATFORM", "cli").strip() or "cli" +def _positive_int(value: str | None) -> int | None: + if not value: + return None + try: + parsed = int(value) + except (TypeError, ValueError): + return None + return parsed if parsed > 0 else None + + +def _process_exists(pid: int) -> bool: + if pid <= 0: + return False + if os.name == "nt": + try: + result = subprocess.run( + ["tasklist.exe", "/FI", f"PID eq {pid}", "/NH"], + check=False, + capture_output=True, + text=True, + timeout=5, + ) + return str(pid) in (result.stdout or "") + except Exception: + return True + try: + os.kill(pid, 0) + return True + except ProcessLookupError: + return False + except PermissionError: + return True + except OSError as exc: + return exc.errno != errno.ESRCH + + +def _start_parent_process_watchdog( + parent_pid: int | None, + stop_event: threading.Event, + label: str, + interval: float = PARENT_WATCHDOG_INTERVAL_SECONDS, +) -> None: + if not parent_pid or parent_pid == os.getpid(): + return + + def run() -> None: + while not stop_event.wait(interval): + if _process_exists(parent_pid): + continue + print( + f"[hermes-bridge] parent pid {parent_pid} exited; stopping {label}", + file=sys.stderr, + flush=True, + ) + stop_event.set() + return + + threading.Thread(target=run, daemon=True, name=f"hermes-bridge-parent-watchdog-{label}").start() + + +def _install_stop_signal_handlers(stop_event: threading.Event) -> Callable[[], None]: + if threading.current_thread() is not threading.main_thread(): + return lambda: None + + previous: list[tuple[signal.Signals, Any]] = [] + + def handle_signal(signum: int, _frame: Any) -> None: + stop_event.set() + + for signum in (signal.SIGINT, signal.SIGTERM): + try: + sig = signal.Signals(signum) + previous.append((sig, signal.getsignal(sig))) + signal.signal(sig, handle_signal) + except Exception: + pass + + def restore() -> None: + for sig, handler in previous: + try: + signal.signal(sig, handler) + except Exception: + pass + + return restore + + def _suppress_bridge_platform_hint() -> None: raw = os.environ.get("HERMES_BRIDGE_SUPPRESS_PLATFORM_HINT", "cli").strip() if raw.lower() in {"0", "false", "no", "off"}: @@ -1452,12 +1543,18 @@ class BridgeServer: raise ValueError("action is required") if action == "ping": + with self.pool._lock: + sessions = list(self.pool._sessions.values()) + running_sessions = sum(1 for session in sessions if session.running) return { "pong": True, "time": time.time(), + "pid": os.getpid(), "agent_root": str(_agent_root()), "profile": _worker_profile() or "default", "hermes_home": str(_hermes_home()), + "session_count": len(sessions), + "running_session_count": running_sessions, } if action == "chat": @@ -1588,46 +1685,54 @@ class BridgeServer: def serve_forever(self) -> None: server = self._make_server_socket() - server.listen(16) - server.settimeout(0.2) - print(json.dumps({"event": "ready", "endpoint": self.endpoint}), flush=True) + restore_signals = _install_stop_signal_handlers(self._stop) + _start_parent_process_watchdog( + _positive_int(os.environ.get("HERMES_AGENT_BRIDGE_BROKER_PID")), + self._stop, + f"worker:{_worker_profile() or 'default'}", + ) + try: + server.listen(16) + server.settimeout(0.2) + print(json.dumps({"event": "ready", "endpoint": self.endpoint}), flush=True) - while not self._stop.is_set(): - conn: socket.socket | None = None - try: + while not self._stop.is_set(): + conn: socket.socket | None = None try: - conn, _addr = server.accept() - except socket.timeout: - self._gc_idle_sessions() - continue - try: - req = self._read_request(conn) - data = self.handle(req) - resp = {"ok": True, **_jsonable(data)} - except Exception as exc: - resp = { - "ok": False, - "error": str(exc), - "error_type": exc.__class__.__name__, - } - self._write_response(conn, resp) - except KeyboardInterrupt: - break - except Exception as exc: - print(f"[hermes-bridge] server loop error: {exc}", file=sys.stderr, flush=True) - finally: - if conn is not None: try: - conn.close() - except OSError: - pass - - server.close() - if self.endpoint.startswith("ipc://"): - try: - Path(self.endpoint.removeprefix("ipc://")).unlink(missing_ok=True) - except OSError: - pass + conn, _addr = server.accept() + except socket.timeout: + self._gc_idle_sessions() + continue + try: + req = self._read_request(conn) + data = self.handle(req) + resp = {"ok": True, **_jsonable(data)} + except Exception as exc: + resp = { + "ok": False, + "error": str(exc), + "error_type": exc.__class__.__name__, + } + self._write_response(conn, resp) + except KeyboardInterrupt: + break + except Exception as exc: + print(f"[hermes-bridge] server loop error: {exc}", file=sys.stderr, flush=True) + finally: + if conn is not None: + try: + conn.close() + except OSError: + pass + finally: + restore_signals() + server.close() + if self.endpoint.startswith("ipc://"): + try: + Path(self.endpoint.removeprefix("ipc://")).unlink(missing_ok=True) + except OSError: + pass class WorkerProcess: @@ -1647,6 +1752,10 @@ class WorkerProcess: def running(self) -> bool: return self.process is not None and self.process.poll() is None + @property + def pid(self) -> int | None: + return self.process.pid if self.process is not None else None + def start(self) -> None: with self._lock: if self.running: @@ -1668,6 +1777,7 @@ class WorkerProcess: **os.environ, "HERMES_AGENT_BRIDGE_ENDPOINT": self.endpoint, "HERMES_AGENT_BRIDGE_WORKER_PROFILE": self.profile, + "HERMES_AGENT_BRIDGE_BROKER_PID": str(os.getpid()), } self.process = subprocess.Popen( args, @@ -2019,6 +2129,18 @@ class BridgeBroker: if event.get("event") in {"bridge.compression.completed", "bridge.compression.failed"} and request_id: self._compression_profile.pop(request_id, None) + def stop(self) -> None: + self._stop.set() + with self._lock: + workers = list(self._workers.values()) + self._workers.clear() + self._run_profile.clear() + self._session_profile.clear() + self._approval_profile.clear() + self._compression_profile.clear() + for worker in workers: + worker.stop() + def _forward(self, profile: str, req: dict[str, Any]) -> dict[str, Any]: worker = self._worker_for_profile(profile) forwarded = dict(req) @@ -2034,8 +2156,33 @@ class BridgeBroker: if action == "ping": with self._lock: - workers = {profile: worker.running for profile, worker in self._workers.items()} - return {"pong": True, "time": time.time(), "mode": "broker", "workers": workers} + worker_details = { + profile: { + "running": worker.running, + "pid": worker.pid, + "endpoint": worker.endpoint, + "last_used_at": worker.last_used_at, + } + for profile, worker in self._workers.items() + } + workers = {profile: details["running"] for profile, details in worker_details.items()} + sessions_by_profile: dict[str, int] = {} + for profile in self._session_profile.values(): + sessions_by_profile[profile] = sessions_by_profile.get(profile, 0) + 1 + active_sessions = len(self._session_profile) + return { + "pong": True, + "time": time.time(), + "mode": "broker", + "broker": { + "pid": os.getpid(), + "endpoint": self.endpoint, + }, + "workers": workers, + "worker_details": worker_details, + "active_sessions": active_sessions, + "sessions_by_profile": sessions_by_profile, + } if action == "worker_ping": profile = self._normalize_profile(req.get("profile")) @@ -2145,17 +2292,7 @@ class BridgeBroker: return {"sessions": sessions} if action == "shutdown": - self._stop.set() - with self._lock: - workers = list(self._workers.values()) - for worker in workers: - if not worker.running: - worker.stop() - continue - try: - worker.request({"action": "shutdown"}) - except Exception: - worker.stop() + self.stop() return {"status": "shutting_down"} raise ValueError(f"unknown action: {action}") @@ -2187,51 +2324,55 @@ class BridgeBroker: def serve_forever(self) -> None: server = self._make_server_socket() - server.listen(64) - server.settimeout(0.2) - print(json.dumps({"event": "ready", "endpoint": self.endpoint, "mode": "broker"}), flush=True) + restore_signals = _install_stop_signal_handlers(self._stop) + atexit.register(self.stop) + try: + server.listen(64) + server.settimeout(0.2) + print(json.dumps({"event": "ready", "endpoint": self.endpoint, "mode": "broker"}), flush=True) - while not self._stop.is_set(): - conn: socket.socket | None = None - try: + while not self._stop.is_set(): + conn: socket.socket | None = None try: - conn, _addr = server.accept() - except socket.timeout: - self._gc_idle_workers() - continue - try: - req = self._read_request(conn) - data = self.handle(req) - resp = {"ok": True, **_jsonable(data)} - except Exception as exc: - resp = { - "ok": False, - "error": str(exc), - "error_type": exc.__class__.__name__, - } - self._write_response(conn, resp) - except KeyboardInterrupt: - break - except Exception as exc: - print(f"[hermes-bridge-broker] server loop error: {exc}", file=sys.stderr, flush=True) - finally: - if conn is not None: try: - conn.close() - except OSError: - pass - - with self._lock: - workers = list(self._workers.values()) - self._workers.clear() - for worker in workers: - worker.stop() - server.close() - if self.endpoint.startswith("ipc://"): + conn, _addr = server.accept() + except socket.timeout: + self._gc_idle_workers() + continue + try: + req = self._read_request(conn) + data = self.handle(req) + resp = {"ok": True, **_jsonable(data)} + except Exception as exc: + resp = { + "ok": False, + "error": str(exc), + "error_type": exc.__class__.__name__, + } + self._write_response(conn, resp) + except KeyboardInterrupt: + break + except Exception as exc: + print(f"[hermes-bridge-broker] server loop error: {exc}", file=sys.stderr, flush=True) + finally: + if conn is not None: + try: + conn.close() + except OSError: + pass + finally: + restore_signals() try: - Path(self.endpoint.removeprefix("ipc://")).unlink(missing_ok=True) - except OSError: + atexit.unregister(self.stop) + except Exception: pass + self.stop() + server.close() + if self.endpoint.startswith("ipc://"): + try: + Path(self.endpoint.removeprefix("ipc://")).unlink(missing_ok=True) + except OSError: + pass def main(argv: list[str] | None = None) -> int: diff --git a/packages/server/src/services/hermes/agent-bridge/manager.ts b/packages/server/src/services/hermes/agent-bridge/manager.ts index a6d49c4..bb13a3c 100644 --- a/packages/server/src/services/hermes/agent-bridge/manager.ts +++ b/packages/server/src/services/hermes/agent-bridge/manager.ts @@ -25,6 +25,17 @@ export interface BridgeCommand { hermesHome: string } +export interface AgentBridgeManagerRuntimeState { + endpoint: string + running: boolean + ready: boolean + pid?: number + starting: boolean + stopping: boolean + restartScheduled: boolean + restartAttempts: number +} + function envPositiveInt(name: string): number | undefined { const raw = process.env[name] if (!raw) return undefined @@ -308,6 +319,19 @@ export class AgentBridgeManager { return !!this.child && !this.child.killed && this.ready } + getRuntimeState(): AgentBridgeManagerRuntimeState { + return { + endpoint: this.endpoint, + running: this.running, + ready: this.ready, + pid: this.child?.pid, + starting: !!this.starting, + stopping: this.stopping, + restartScheduled: !!this.restartTimer, + restartAttempts: this.restartAttempts, + } + } + async start(): Promise { if (this.running) return if (this.starting) return this.starting diff --git a/packages/server/src/services/hermes/ops-monitor.ts b/packages/server/src/services/hermes/ops-monitor.ts new file mode 100644 index 0000000..1d36f8b --- /dev/null +++ b/packages/server/src/services/hermes/ops-monitor.ts @@ -0,0 +1,551 @@ +import { execFileSync } from 'child_process' +import { readFileSync } from 'fs' +import { cpus, freemem, loadavg, platform, totalmem, uptime } from 'os' +import { AgentBridgeClient } from './agent-bridge' +import { getAgentBridgeManager } from './agent-bridge/manager' + +export interface ProcessUsage { + pid: number + role: 'web' | 'broker' | 'worker' + profile?: string + running: boolean + cpuPercent: number + memoryRssBytes: number + command?: string + error?: string +} + +export interface OpsRuntimeSnapshot { + timestamp: number + system: { + platform: NodeJS.Platform + arch: string + uptimeSeconds: number + cpuCount: number + cpuPercent: number + loadAverage: number[] + totalMemoryBytes: number + freeMemoryBytes: number + usedMemoryBytes: number + memoryPercent: number + } + web: { + pid: number + uptimeSeconds: number + memory: NodeJS.MemoryUsage + cpuPercent: number + } + bridge: { + endpoint: string + reachable: boolean + error?: string + broker: { + running: boolean + ready: boolean + pid?: number + process?: ProcessUsage + restartScheduled: boolean + restartAttempts: number + } + workers: Array + totalWorkerMemoryRssBytes: number + } + sessions: { + active: number + running: number + byProfile: Record + } +} + +interface CpuTimesSample { + idle: number + total: number +} + +interface WebCpuSample { + at: number + usage: NodeJS.CpuUsage +} + +interface SystemMemoryUsage { + totalMemoryBytes: number + freeMemoryBytes: number + usedMemoryBytes: number + memoryPercent: number +} + +let previousSystemCpu: CpuTimesSample | null = null +let previousWebCpu: WebCpuSample | null = null + +function safeCpus(): ReturnType { + try { + return cpus() + } catch { + return [] + } +} + +function safeLoadAverage(): number[] { + try { + return loadavg() + } catch { + return [0, 0, 0] + } +} + +function safeUptime(): number { + try { + return uptime() + } catch { + return 0 + } +} + +function safeProcessUptime(): number { + try { + return process.uptime() + } catch { + return 0 + } +} + +function safeProcessMemoryUsage(): NodeJS.MemoryUsage { + try { + return process.memoryUsage() + } catch { + return { + rss: 0, + heapTotal: 0, + heapUsed: 0, + external: 0, + arrayBuffers: 0, + } + } +} + +function readCpuTimes(): CpuTimesSample { + let idle = 0 + let total = 0 + for (const cpu of safeCpus()) { + idle += cpu.times.idle + total += Object.values(cpu.times).reduce((sum, value) => sum + value, 0) + } + return { idle, total } +} + +function sampleSystemCpuPercent(): number | null { + try { + const current = readCpuTimes() + const previous = previousSystemCpu + previousSystemCpu = current + if (!previous) return null + + const idleDelta = current.idle - previous.idle + const totalDelta = current.total - previous.total + if (totalDelta <= 0) return null + return clampPercent(((totalDelta - idleDelta) / totalDelta) * 100) + } catch { + return null + } +} + +function sampleWebCpuPercent(): number | null { + try { + const current = { + at: Date.now(), + usage: process.cpuUsage(), + } + const previous = previousWebCpu + previousWebCpu = current + if (!previous) return null + + const elapsedMicros = (current.at - previous.at) * 1000 + const used = (current.usage.user - previous.usage.user) + (current.usage.system - previous.usage.system) + if (elapsedMicros <= 0 || used < 0) return null + return clampPercent((used / elapsedMicros / Math.max(safeCpus().length, 1)) * 100) + } catch { + return null + } +} + +function clampPercent(value: number): number { + return Math.max(0, Math.min(100, Math.round(value * 10) / 10)) +} + +function numberOrNull(value: unknown): number | null { + const parsed = Number(value) + return Number.isFinite(parsed) ? parsed : null +} + +function fallbackSystemMemoryUsage(): SystemMemoryUsage { + let memoryTotal = 0 + let memoryFree = 0 + try { + memoryTotal = totalmem() + memoryFree = freemem() + } catch {} + const usedMemory = memoryTotal - memoryFree + return { + totalMemoryBytes: memoryTotal, + freeMemoryBytes: memoryFree, + usedMemoryBytes: usedMemory, + memoryPercent: memoryTotal > 0 ? clampPercent((usedMemory / memoryTotal) * 100) : 0, + } +} + +function parseVmStatPageCount(line: string): number | null { + const match = line.match(/:\s+([\d.]+)\.?$/) + if (!match) return null + const value = Number(match[1].replace(/\./g, '')) + return Number.isFinite(value) ? value : null +} + +export function parseMacVmStatMemory(vmStatOutput: string, totalMemoryBytes: number): SystemMemoryUsage | null { + const pageSize = Number(vmStatOutput.match(/page size of\s+(\d+)\s+bytes/i)?.[1]) + if (!Number.isFinite(pageSize) || pageSize <= 0 || totalMemoryBytes <= 0) return null + + const pages: Record = {} + for (const line of vmStatOutput.split(/\r?\n/)) { + const count = parseVmStatPageCount(line.trim()) + if (count == null) continue + if (line.includes('Pages active')) pages.active = count + else if (line.includes('Pages wired down')) pages.wired = count + else if (line.includes('Pages occupied by compressor')) pages.compressed = count + } + + const usedPages = (pages.active || 0) + (pages.wired || 0) + (pages.compressed || 0) + if (usedPages <= 0) return null + const usedMemory = Math.min(totalMemoryBytes, usedPages * pageSize) + const freeMemory = Math.max(0, totalMemoryBytes - usedMemory) + + return { + totalMemoryBytes, + freeMemoryBytes: freeMemory, + usedMemoryBytes: usedMemory, + memoryPercent: clampPercent((usedMemory / totalMemoryBytes) * 100), + } +} + +function collectMacSystemMemoryUsage(): SystemMemoryUsage | null { + try { + const totalRaw = execFileSync('sysctl', ['-n', 'hw.memsize'], { + encoding: 'utf-8', + timeout: 3000, + }).trim() + const totalMemoryBytes = Number(totalRaw) + const vmStatOutput = execFileSync('vm_stat', { + encoding: 'utf-8', + timeout: 3000, + }) + return parseMacVmStatMemory(vmStatOutput, totalMemoryBytes) + } catch { + return null + } +} + +function collectSystemMemoryUsage(): SystemMemoryUsage { + if (platform() === 'darwin') { + return collectMacSystemMemoryUsage() || fallbackSystemMemoryUsage() + } + return fallbackSystemMemoryUsage() +} + +function collectPosixProcessMetrics(pids: number[]): Map> { + const metrics = collectProcfsProcessMetrics(pids) + if (!pids.length) return metrics + try { + const output = execFileSync('ps', ['-o', 'pid=,pcpu=,rss=,comm=', '-p', pids.join(',')], { + encoding: 'utf-8', + timeout: 3000, + }) + for (const line of output.split(/\r?\n/)) { + const trimmed = line.trim() + if (!trimmed) continue + const [pidRaw, cpuRaw, rssRaw, ...commandParts] = trimmed.split(/\s+/) + const pid = Number(pidRaw) + if (!Number.isFinite(pid)) continue + const rssKb = numberOrNull(rssRaw) + metrics.set(pid, { + cpuPercent: numberOrNull(cpuRaw) ?? 0, + memoryRssBytes: rssKb == null ? metrics.get(pid)?.memoryRssBytes : rssKb * 1024, + command: commandParts.join(' ') || undefined, + }) + } + return metrics + } catch { + return metrics + } +} + +function collectProcfsProcessMetrics(pids: number[]): Map> { + const metrics = new Map>() + for (const pid of pids) { + try { + const status = readFileSync(`/proc/${pid}/status`, 'utf-8') + const rssKb = Number(status.match(/^VmRSS:\s+(\d+)\s+kB/im)?.[1]) + const name = status.match(/^Name:\s+(.+)$/im)?.[1]?.trim() + metrics.set(pid, { + cpuPercent: 0, + memoryRssBytes: Number.isFinite(rssKb) ? rssKb * 1024 : 0, + command: name, + }) + } catch {} + } + return metrics +} + +function parseWindowsJson(output: string): any[] { + if (!output.trim()) return [] + const parsed = JSON.parse(output) + return Array.isArray(parsed) ? parsed : [parsed] +} + +function collectWindowsProcessMetrics(pids: number[]): Map> { + if (!pids.length) return new Map() + const idList = pids.join(',') + try { + const script = [ + `$ids=@(${idList})`, + 'Get-CimInstance Win32_PerfFormattedData_PerfProc_Process', + '| Where-Object { $ids -contains [int]$_.IDProcess }', + '| Select-Object @{Name="pid";Expression={[int]$_.IDProcess}},@{Name="cpuPercent";Expression={[double]$_.PercentProcessorTime}},@{Name="memoryRssBytes";Expression={[double]$_.WorkingSet}},@{Name="command";Expression={$_.Name}}', + '| ConvertTo-Json -Compress', + ].join(' ') + const output = execFileSync('powershell.exe', ['-NoProfile', '-Command', script], { + encoding: 'utf-8', + timeout: 5000, + windowsHide: true, + }) + const metrics = new Map>() + for (const item of parseWindowsJson(output)) { + const pid = Number(item?.pid) + if (!Number.isFinite(pid)) continue + metrics.set(pid, { + cpuPercent: numberOrNull(item?.cpuPercent) ?? 0, + memoryRssBytes: numberOrNull(item?.memoryRssBytes) ?? 0, + command: typeof item?.command === 'string' ? item.command : undefined, + }) + } + return metrics + } catch {} + + const metrics = new Map>() + for (const pid of pids) { + try { + const output = execFileSync('tasklist.exe', ['/FI', `PID eq ${pid}`, '/FO', 'CSV', '/NH'], { + encoding: 'utf-8', + timeout: 3000, + windowsHide: true, + }) + const line = output.split(/\r?\n/).find(item => item.includes(`"${pid}"`)) + if (!line) continue + const columns = line.match(/(".*?"|[^",]+)(?=\s*,|\s*$)/g)?.map(value => value.replace(/^"|"$/g, '')) || [] + const memoryKb = Number(columns[4]?.replace(/[^\d]/g, '')) + metrics.set(pid, { + cpuPercent: 0, + memoryRssBytes: Number.isFinite(memoryKb) ? memoryKb * 1024 : 0, + command: columns[0], + }) + } catch {} + } + return metrics +} + +function collectProcessMetrics(pids: number[]): Map> { + const uniquePids = [...new Set(pids.filter(pid => Number.isFinite(pid) && pid > 0))] + return platform() === 'win32' + ? collectWindowsProcessMetrics(uniquePids) + : collectPosixProcessMetrics(uniquePids) +} + +function processUsage( + pid: number | undefined, + role: ProcessUsage['role'], + metrics: Map>, + profile?: string, +): ProcessUsage | undefined { + if (!pid) return undefined + const metric = metrics.get(pid) + return { + pid, + role, + profile, + running: !!metric, + cpuPercent: metric?.cpuPercent ?? 0, + memoryRssBytes: metric?.memoryRssBytes ?? 0, + command: metric?.command, + } +} + +function normalizeWorker(raw: unknown): { + running: boolean + pid?: number + endpoint?: string + lastUsedAt?: number +} { + if (typeof raw === 'boolean') return { running: raw } + if (!raw || typeof raw !== 'object') return { running: false } + const record = raw as Record + const pid = Number(record.pid) + const lastUsedAt = Number(record.last_used_at) + return { + running: !!record.running, + pid: Number.isFinite(pid) && pid > 0 ? pid : undefined, + endpoint: typeof record.endpoint === 'string' ? record.endpoint : undefined, + lastUsedAt: Number.isFinite(lastUsedAt) ? lastUsedAt : undefined, + } +} + +export function createEmptyOpsRuntimeSnapshot(error?: string): OpsRuntimeSnapshot { + return { + timestamp: Date.now(), + system: { + platform: process.platform, + arch: process.arch, + uptimeSeconds: safeUptime(), + cpuCount: safeCpus().length, + cpuPercent: 0, + loadAverage: safeLoadAverage(), + totalMemoryBytes: 0, + freeMemoryBytes: 0, + usedMemoryBytes: 0, + memoryPercent: 0, + }, + web: { + pid: process.pid, + uptimeSeconds: safeProcessUptime(), + memory: safeProcessMemoryUsage(), + cpuPercent: 0, + }, + bridge: { + endpoint: '', + reachable: false, + error, + broker: { + running: false, + ready: false, + restartScheduled: false, + restartAttempts: 0, + }, + workers: [], + totalWorkerMemoryRssBytes: 0, + }, + sessions: { + active: 0, + running: 0, + byProfile: {}, + }, + } +} + +export async function getOpsRuntimeSnapshot(): Promise { + const manager = getAgentBridgeManager() + const managerState = manager.getRuntimeState() + let bridgeReachable = false + let bridgeError: string | undefined + let bridgePing: Record = {} + let sessions: Array> = [] + + try { + const client = new AgentBridgeClient({ endpoint: managerState.endpoint, timeoutMs: 2000, connectRetryMs: 0 }) + bridgePing = await client.ping() as Record + bridgeReachable = true + try { + const list = await client.list() + sessions = Array.isArray((list as any).sessions) ? (list as any).sessions : [] + } catch {} + } catch (err: any) { + bridgeError = err?.message || 'Agent bridge is not reachable' + } + + const workerEntries = Object.entries((bridgePing.worker_details || {}) as Record) + .map(([profile, value]) => [profile, normalizeWorker(value)] as const) + const brokerPid = Number(bridgePing.broker?.pid || managerState.pid) + const pids = [ + process.pid, + Number.isFinite(brokerPid) ? brokerPid : undefined, + ...workerEntries.map(([, worker]) => worker.pid), + ].filter((pid): pid is number => typeof pid === 'number' && pid > 0) + const processMetrics = collectProcessMetrics(pids) + + const sessionCountsByProfile: Record = {} + let runningSessions = 0 + for (const session of sessions) { + const profileName = String(session.profile || 'default') + sessionCountsByProfile[profileName] = (sessionCountsByProfile[profileName] || 0) + 1 + if (session.running) runningSessions += 1 + } + if (!sessions.length && bridgePing.sessions_by_profile && typeof bridgePing.sessions_by_profile === 'object') { + for (const [profileName, count] of Object.entries(bridgePing.sessions_by_profile)) { + const value = Number(count) + if (Number.isFinite(value)) sessionCountsByProfile[profileName] = value + } + } + + const workers = workerEntries.map(([profileName, worker]) => { + const usage = processUsage(worker.pid, 'worker', processMetrics, profileName) + return { + pid: worker.pid || 0, + role: 'worker' as const, + profile: profileName, + running: worker.running, + cpuPercent: usage?.cpuPercent ?? 0, + memoryRssBytes: usage?.memoryRssBytes ?? 0, + command: usage?.command, + endpoint: worker.endpoint, + lastUsedAt: worker.lastUsedAt, + sessionCount: sessionCountsByProfile[profileName] || 0, + runningSessionCount: sessions.filter(session => String(session.profile || 'default') === profileName && session.running).length, + } + }) + + const systemMemory = collectSystemMemoryUsage() + const totalWorkerMemory = workers.reduce((sum, worker) => sum + (worker.memoryRssBytes || 0), 0) + + return { + timestamp: Date.now(), + system: { + platform: process.platform, + arch: process.arch, + uptimeSeconds: safeUptime(), + cpuCount: safeCpus().length, + cpuPercent: sampleSystemCpuPercent() ?? 0, + loadAverage: safeLoadAverage(), + totalMemoryBytes: systemMemory.totalMemoryBytes, + freeMemoryBytes: systemMemory.freeMemoryBytes, + usedMemoryBytes: systemMemory.usedMemoryBytes, + memoryPercent: systemMemory.memoryPercent, + }, + web: { + pid: process.pid, + uptimeSeconds: safeProcessUptime(), + memory: safeProcessMemoryUsage(), + cpuPercent: sampleWebCpuPercent() ?? 0, + }, + bridge: { + endpoint: managerState.endpoint, + reachable: bridgeReachable, + error: bridgeError, + broker: { + running: managerState.running, + ready: managerState.ready, + pid: Number.isFinite(brokerPid) && brokerPid > 0 ? brokerPid : undefined, + process: processUsage(Number.isFinite(brokerPid) ? brokerPid : undefined, 'broker', processMetrics), + restartScheduled: managerState.restartScheduled, + restartAttempts: managerState.restartAttempts, + }, + workers, + totalWorkerMemoryRssBytes: totalWorkerMemory, + }, + sessions: { + active: sessions.length || Number(bridgePing.active_sessions || 0), + running: runningSessions, + byProfile: sessionCountsByProfile, + }, + } +} diff --git a/tests/server/agent-bridge-python-concurrency.test.ts b/tests/server/agent-bridge-python-concurrency.test.ts index d736d4d..8f0e3c0 100644 --- a/tests/server/agent-bridge-python-concurrency.test.ts +++ b/tests/server/agent-bridge-python-concurrency.test.ts @@ -393,6 +393,80 @@ assert calls == [] pool._run_context.session_id = "session-a" assert pool._approval_dispatcher("cmd", "desc", allow_permanent=False) == "once" assert calls == [("cmd", "desc", False)] +`) + }) + + it('cleans broker workers and wires worker parent watchdog state', () => { + runPython(String.raw` +${harness} + +class FakeWorker: + def __init__(self): + self.running = True + self.stopped = False + + def stop(self): + self.running = False + self.stopped = True + +broker = bridge.BridgeBroker("ipc:///tmp/unused.sock") +worker = FakeWorker() +broker._workers["default"] = worker +broker._run_profile["run-a"] = "default" +broker._session_profile["session-a"] = "default" +broker._approval_profile["approval-a"] = "default" +broker._compression_profile["compression-a"] = "default" + +broker.stop() +assert broker._stop.is_set() +assert worker.stopped +assert broker._workers == {} +assert broker._run_profile == {} +assert broker._session_profile == {} +assert broker._approval_profile == {} +assert broker._compression_profile == {} + +created = {} + +class FakeProcess: + stdout = None + stderr = None + + def poll(self): + return None + +def fake_popen(args, **kwargs): + created["args"] = args + created["env"] = kwargs["env"] + return FakeProcess() + +original_popen = bridge.subprocess.Popen +original_getpid = bridge.os.getpid +try: + bridge.subprocess.Popen = fake_popen + bridge.os.getpid = lambda: 4242 + proc_worker = bridge.WorkerProcess("default", "ipc:///tmp/worker.sock", "/agent", "/home") + proc_worker._pipe_stderr = lambda: None + proc_worker._wait_ready = lambda: None + proc_worker.start() +finally: + bridge.subprocess.Popen = original_popen + bridge.os.getpid = original_getpid + +assert created["env"]["HERMES_AGENT_BRIDGE_BROKER_PID"] == "4242" +assert created["env"]["HERMES_AGENT_BRIDGE_WORKER_PROFILE"] == "default" + +stop_event = threading.Event() +seen_pids = [] +original_process_exists = bridge._process_exists +try: + bridge._process_exists = lambda pid: seen_pids.append(pid) and False + bridge._start_parent_process_watchdog(12345, stop_event, "test", interval=0.01) + assert wait_for(stop_event.is_set, timeout=2) +finally: + bridge._process_exists = original_process_exists + +assert seen_pids == [12345] `) }) }) diff --git a/tests/server/performance-monitor-controller.test.ts b/tests/server/performance-monitor-controller.test.ts new file mode 100644 index 0000000..b8e6e15 --- /dev/null +++ b/tests/server/performance-monitor-controller.test.ts @@ -0,0 +1,40 @@ +import { afterEach, describe, expect, it, vi } from 'vitest' + +const getOpsRuntimeSnapshot = vi.fn() + +vi.mock('../../packages/server/src/services/hermes/ops-monitor', () => ({ + createEmptyOpsRuntimeSnapshot: (error?: string) => ({ timestamp: 0, error }), + getOpsRuntimeSnapshot, +})) + +describe('performance monitor controller', () => { + afterEach(() => { + vi.clearAllMocks() + }) + + it('returns the runtime snapshot from the performance service', async () => { + const snapshot = { + timestamp: 1, + bridge: { workers: [] }, + sessions: { active: 0 }, + } + getOpsRuntimeSnapshot.mockResolvedValue(snapshot) + const ctx: any = {} + + const { runtime } = await import('../../packages/server/src/controllers/hermes/performance-monitor') + await runtime(ctx) + + expect(ctx.body).toBe(snapshot) + }) + + it('returns a zero snapshot when metrics collection fails', async () => { + getOpsRuntimeSnapshot.mockRejectedValue(new Error('boom')) + const ctx: any = {} + + const { runtime } = await import('../../packages/server/src/controllers/hermes/performance-monitor') + await runtime(ctx) + + expect(ctx.status).toBeUndefined() + expect(ctx.body).toEqual({ timestamp: 0, error: 'boom' }) + }) +})