diff --git a/packages/client/src/api/hermes/chat.ts b/packages/client/src/api/hermes/chat.ts index 2e9c6c8..773c9ea 100644 --- a/packages/client/src/api/hermes/chat.ts +++ b/packages/client/src/api/hermes/chat.ts @@ -105,6 +105,8 @@ const sessionEventHandlers = new Map void onApprovalResolved?: (event: RunEvent) => void onPeerUserMessage?: (event: RunEvent) => void + onClarifyRequested?: (event: RunEvent) => void + onClarifyResolved?: (event: RunEvent) => void }>() const peerUserMessageHandlers = new Set<(event: RunEvent) => void>() @@ -372,6 +374,26 @@ function globalPeerUserMessageHandler(event: RunEvent): void { } } +function globalClarifyRequestedHandler(event: RunEvent): void { + const sid = event.session_id + if (!sid) return + + const handlers = sessionEventHandlers.get(sid) + if (handlers?.onClarifyRequested) { + handlers.onClarifyRequested(event) + } +} + +function globalClarifyResolvedHandler(event: RunEvent): void { + const sid = event.session_id + if (!sid) return + + const handlers = sessionEventHandlers.get(sid) + if (handlers?.onClarifyResolved) { + handlers.onClarifyResolved(event) + } +} + /** * Register event handlers for a session * @param sessionId - Session ID @@ -401,6 +423,8 @@ export function registerSessionHandlers( onApprovalRequested?: (event: RunEvent) => void onApprovalResolved?: (event: RunEvent) => void onPeerUserMessage?: (event: RunEvent) => void + onClarifyRequested?: (event: RunEvent) => void + onClarifyResolved?: (event: RunEvent) => void } ): () => void { sessionEventHandlers.set(sessionId, handlers) @@ -426,6 +450,19 @@ export function onPeerUserMessage(handler: (event: RunEvent) => void): () => voi } } +export function respondClarify( + sessionId: string, + clarifyId: string, + response: string, +): void { + const socket = connectChatRun() + socket.emit('clarify.respond', { + session_id: sessionId, + clarify_id: clarifyId, + response, + }) +} + export function respondToolApproval( sessionId: string, approvalId: string, @@ -510,6 +547,8 @@ export function connectChatRun(requestedProfile?: string | null): Socket { chatRunSocket.on('approval.requested', globalApprovalRequestedHandler) chatRunSocket.on('approval.resolved', globalApprovalResolvedHandler) chatRunSocket.on('run.peer_user_message', globalPeerUserMessageHandler) + chatRunSocket.on('clarify.requested', globalClarifyRequestedHandler) + chatRunSocket.on('clarify.resolved', globalClarifyResolvedHandler) // Compression events chatRunSocket.on('compression.started', globalCompressionStartedHandler) @@ -708,6 +747,14 @@ export function startRunViaSocket( if (closed) return onEvent(evt) }, + onClarifyRequested: (evt: RunEvent) => { + if (closed) return + onEvent(evt) + }, + onClarifyResolved: (evt: RunEvent) => { + if (closed) return + onEvent(evt) + }, } // Register handlers in the global session map diff --git a/packages/client/src/components/hermes/chat/ChatPanel.vue b/packages/client/src/components/hermes/chat/ChatPanel.vue index 9478688..ef646bb 100644 --- a/packages/client/src/components/hermes/chat/ChatPanel.vue +++ b/packages/client/src/components/hermes/chat/ChatPanel.vue @@ -154,6 +154,17 @@ const headerTitle = computed(() => const activeApproval = computed(() => chatStore.activePendingApproval); const visibleApproval = computed(() => activeApproval.value); + +const activeClarify = computed(() => chatStore.activePendingClarify); +const visibleClarify = computed(() => activeClarify.value); +const clarifyResponse = ref(''); + +function handleClarify(response?: string) { + const finalResponse = response !== undefined ? response : clarifyResponse.value.trim(); + chatStore.respondToClarify(finalResponse); + clarifyResponse.value = ''; +} + const showNewChatModal = ref(false); const newChatProfile = ref("default"); const newChatProvider = ref(""); @@ -1230,6 +1241,63 @@ async function handleSessionModelCustomSubmit() { +
+ +
+
+
{{ t('chat.clarifyKicker') }}
+
{{ t('chat.clarifyTitle') }}
+
{{ visibleClarify.question }}
+
+
+ + {{ choice }} + + + {{ t('chat.clarifyDismiss') }} + +
+
+
+ + + {{ t('chat.clarifySubmit') }} + +
+
+
+
{ return sid ? pendingApprovals.value.get(sid) || null : null }) + const pendingClarifies = ref>(new Map()) + const activePendingClarify = computed(() => { + const sid = activeSessionId.value + return sid ? pendingClarifies.value.get(sid) || null : null + }) + // 自动播放语音开关 const autoPlaySpeechEnabled = ref(false) @@ -623,6 +638,10 @@ export const useChatStore = defineStore('chat', () => { setPendingApproval({ ...e, session_id: sessionId } as RunEvent) } else if (e.event === 'approval.resolved') { clearPendingApproval({ ...e, session_id: sessionId } as RunEvent) + } else if (e.event === 'clarify.requested') { + setPendingClarify({ ...e, session_id: sessionId } as RunEvent) + } else if (e.event === 'clarify.resolved') { + clearPendingClarify({ ...e, session_id: sessionId } as RunEvent) } else if (e.event === 'run.failed') { addAgentErrorMessage(sessionId, e.error) serverWorking.value.delete(sessionId) @@ -1048,6 +1067,41 @@ export const useChatStore = defineStore('chat', () => { pendingApprovals.value = new Map(pendingApprovals.value) } + function setPendingClarify(evt: RunEvent) { + const sid = evt.session_id + const clarifyId = (evt as any).clarify_id as string | undefined + if (!sid || !clarifyId) return + pendingClarifies.value.set(sid, { + sessionId: sid, + clarifyId, + question: String((evt as any).question || ''), + choices: Array.isArray((evt as any).choices) ? (evt as any).choices : null, + timeoutMs: Number((evt as any).timeout_ms) || 300000, + requestedAt: Date.now(), + }) + pendingClarifies.value = new Map(pendingClarifies.value) + } + + function clearPendingClarify(evt: RunEvent) { + const sid = evt.session_id + if (!sid) return + const current = pendingClarifies.value.get(sid) + if (!current) return + const clarifyId = (evt as any).clarify_id + if (clarifyId && current.clarifyId !== clarifyId) return + pendingClarifies.value.delete(sid) + pendingClarifies.value = new Map(pendingClarifies.value) + } + + function respondToClarify(response: string) { + const pending = activePendingClarify.value + if (!pending) return + respondClarify(pending.sessionId, pending.clarifyId, response) + pendingClarifies.value.delete(pending.sessionId) + pendingClarifies.value = new Map(pendingClarifies.value) + } + + function respondApproval(choice: PendingApproval['choices'][number]) { const pending = activePendingApproval.value if (!pending) return @@ -1469,6 +1523,16 @@ export const useChatStore = defineStore('chat', () => { break } + case 'clarify.requested': { + setPendingClarify(evt) + break + } + + case 'clarify.resolved': { + clearPendingClarify(evt) + break + } + case 'run.completed': { const msgs = getSessionMsgs(sid) const lastMsg = activeAssistantMessageId @@ -1919,6 +1983,16 @@ export const useChatStore = defineStore('chat', () => { break } + case 'clarify.requested': { + setPendingClarify(evt) + break + } + + case 'clarify.resolved': { + clearPendingClarify(evt) + break + } + case 'run.completed': { const hasQueue = (evt as any).queue_remaining > 0 if (hasQueue) { @@ -2067,6 +2141,8 @@ export const useChatStore = defineStore('chat', () => { onUsageUpdated: (evt) => handleEvent(evt), onSessionCommand: (evt) => handleEvent(evt), onRunQueued: (evt) => handleEvent(evt), + onClarifyRequested: (evt) => handleEvent(evt), + onClarifyResolved: (evt) => handleEvent(evt), }) // No need to emit resume here — switchSession already did it. @@ -2259,6 +2335,7 @@ export const useChatStore = defineStore('chat', () => { queuedUserMessages, pendingApprovals, activePendingApproval, + activePendingClarify, removeQueuedMessage, isLoadingSessions, sessionsLoaded, @@ -2274,6 +2351,7 @@ export const useChatStore = defineStore('chat', () => { sendMessage, stopStreaming, respondApproval, + respondToClarify, loadSessions, refreshActiveSession, getThinkingObservation, diff --git a/packages/server/src/services/hermes/agent-bridge/client.ts b/packages/server/src/services/hermes/agent-bridge/client.ts index 70aabd7..2798f86 100644 --- a/packages/server/src/services/hermes/agent-bridge/client.ts +++ b/packages/server/src/services/hermes/agent-bridge/client.ts @@ -481,6 +481,10 @@ export class AgentBridgeClient { return this.request({ action: 'approval_respond', approval_id: approvalId, choice }) } + clarifyRespond(clarifyId: string, response: string): Promise { + return this.request({ action: 'clarify_respond', clarify_id: clarifyId, response }) + } + compressionRespond( requestId: string, payload: { messages?: unknown[]; system_message?: string; error?: string }, diff --git a/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py b/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py index 9964f71..f77dc0d 100755 --- a/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py +++ b/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py @@ -598,6 +598,7 @@ class AgentPool: self._approval_requests: dict[str, queue.Queue[str]] = {} self._gateway_approval_requests: dict[str, str] = {} self._compression_requests: dict[str, queue.Queue[dict[str, Any]]] = {} + self._clarify_requests: dict[str, queue.Queue[str]] = {} self._run_context = threading.local() self._approval_handlers: dict[str, Callable[..., str]] = {} self._exec_ask_depth = 0 @@ -667,6 +668,7 @@ class AgentPool: tool_progress_callback=self._tool_progress_callback(session_id), tool_start_callback=self._tool_start_callback(session_id), tool_complete_callback=self._tool_complete_callback(session_id), + clarify_callback=self._clarify_callback(session_id), ) agent.compression_enabled = False self._install_compression_hook(agent, session_id) @@ -1053,6 +1055,30 @@ class AgentPool: return callback + def _clarify_callback(self, session_id: str): + def callback(question: str, choices: list[str] | None = None) -> str: + clarify_id = uuid.uuid4().hex + response_queue: queue.Queue[str] = queue.Queue(maxsize=1) + with self._lock: + self._clarify_requests[clarify_id] = response_queue + self._append_event(session_id, { + "event": "clarify.requested", + "clarify_id": clarify_id, + "question": str(question or ""), + "choices": list(choices) if choices else None, + "timeout_ms": 300_000, + }) + try: + user_response = response_queue.get(timeout=300) + except queue.Empty: + user_response = "[user did not respond within 5m]" + finally: + with self._lock: + self._clarify_requests.pop(clarify_id, None) + return user_response + + return callback + def _approval_dispatcher(self, command: str, description: str, *, allow_permanent: bool = True) -> str: session_id = str(getattr(self._run_context, "session_id", "") or "") if not session_id: @@ -1425,6 +1451,17 @@ class AgentPool: pass return {"approval_id": approval_id, "resolved": True, "choice": cleaned} + def respond_clarify(self, clarify_id: str, response: str) -> dict[str, Any]: + with self._lock: + response_queue = self._clarify_requests.get(clarify_id) + if response_queue is None: + return {"clarify_id": clarify_id, "resolved": False} + try: + response_queue.put_nowait(response) + except queue.Full: + pass + return {"clarify_id": clarify_id, "resolved": True} + def get_history(self, session_id: str) -> dict[str, Any]: with self._lock: session = self._sessions.get(session_id) @@ -1640,6 +1677,13 @@ class BridgeServer: raise ValueError("approval_id is required") return self.pool.respond_approval(approval_id, str(req.get("choice") or "deny")) + if action == "clarify_respond": + clarify_id = str(req.get("clarify_id") or "").strip() + if not clarify_id: + raise ValueError("clarify_id is required") + response = str(req.get("response") or "").strip() + return self.pool.respond_clarify(clarify_id, response) + if action == "compression_respond": request_id = str(req.get("request_id") or "").strip() if not request_id: @@ -2087,6 +2131,7 @@ class BridgeBroker: self._running_run_profile: dict[str, str] = {} self._session_profile: dict[str, str] = {} self._approval_profile: dict[str, str] = {} + self._clarify_profile: dict[str, str] = {} self._compression_profile: dict[str, str] = {} self._lock = threading.RLock() self._stop = threading.Event() @@ -2140,6 +2185,9 @@ class BridgeBroker: approval_id = str(event.get("approval_id") or "") if approval_id: self._approval_profile[approval_id] = profile + clarify_id = str(event.get("clarify_id") or "") + if clarify_id: + self._clarify_profile[clarify_id] = profile request_id = str(event.get("request_id") or "") if event.get("event") == "bridge.compression.requested" and request_id: self._compression_profile[request_id] = profile @@ -2155,6 +2203,7 @@ class BridgeBroker: self._running_run_profile.clear() self._session_profile.clear() self._approval_profile.clear() + self._clarify_profile.clear() self._compression_profile.clear() for worker in workers: worker.stop() @@ -2245,6 +2294,16 @@ class BridgeBroker: raise KeyError(f"unknown approval request: {approval_id}") return self._forward(profile, req) + if action == "clarify_respond": + clarify_id = str(req.get("clarify_id") or "").strip() + if not clarify_id: + raise ValueError("clarify_id is required") + with self._lock: + profile = self._clarify_profile.get(clarify_id) + if not profile: + raise KeyError(f"unknown clarify request: {clarify_id}") + return self._forward(profile, req) + if action == "compression_respond": request_id = str(req.get("request_id") or "").strip() if not request_id: @@ -2263,6 +2322,7 @@ class BridgeBroker: self._running_run_profile.clear() self._session_profile.clear() self._approval_profile.clear() + self._clarify_profile.clear() self._compression_profile.clear() destroyed = 0 for worker in workers: @@ -2284,6 +2344,7 @@ class BridgeBroker: self._running_run_profile = {key: value for key, value in self._running_run_profile.items() if value != profile} self._session_profile = {key: value for key, value in self._session_profile.items() if value != profile} self._approval_profile = {key: value for key, value in self._approval_profile.items() if value != profile} + self._clarify_profile = {key: value for key, value in self._clarify_profile.items() if value != profile} self._compression_profile = {key: value for key, value in self._compression_profile.items() if value != profile} if worker is None or not worker.running: diff --git a/packages/server/src/services/hermes/run-chat/index.ts b/packages/server/src/services/hermes/run-chat/index.ts index 4af5e76..f2a35eb 100644 --- a/packages/server/src/services/hermes/run-chat/index.ts +++ b/packages/server/src/services/hermes/run-chat/index.ts @@ -241,6 +241,25 @@ export class ChatRunSocket { }) } }) + + socket.on('clarify.respond', async (data: { session_id?: string; clarify_id?: string; response?: string }) => { + if (!data.session_id || !data.clarify_id) return + try { + const result = await this.bridge.clarifyRespond(data.clarify_id, data.response || '') + this.emitToSession(socket, data.session_id, 'clarify.resolved', { + event: 'clarify.resolved', + clarify_id: data.clarify_id, + resolved: Boolean((result as any)?.resolved), + }) + } catch (err) { + this.emitToSession(socket, data.session_id, 'clarify.resolved', { + event: 'clarify.resolved', + clarify_id: data.clarify_id, + resolved: false, + error: err instanceof Error ? err.message : String(err), + }) + } + }) } // --- Run dispatcher --- diff --git a/tests/server/agent-bridge-client-clarify.test.ts b/tests/server/agent-bridge-client-clarify.test.ts new file mode 100644 index 0000000..648c2cc --- /dev/null +++ b/tests/server/agent-bridge-client-clarify.test.ts @@ -0,0 +1,20 @@ +import { describe, expect, it, vi } from 'vitest' + +describe('AgentBridgeClient clarify responses', () => { + it('sends clarify_respond requests to the bridge', async () => { + const { AgentBridgeClient } = await import('../../packages/server/src/services/hermes/agent-bridge/client') + const client = new AgentBridgeClient({ endpoint: 'tcp://127.0.0.1:1', connectRetryMs: 0, timeoutMs: 1 }) + const request = vi.spyOn(client, 'request').mockResolvedValue({ ok: true, resolved: true }) + + await expect(client.clarifyRespond('clarify-1', 'Use the first option')).resolves.toEqual({ + ok: true, + resolved: true, + }) + + expect(request).toHaveBeenCalledWith({ + action: 'clarify_respond', + clarify_id: 'clarify-1', + response: 'Use the first option', + }) + }) +}) diff --git a/tests/server/run-chat-clarify-respond.test.ts b/tests/server/run-chat-clarify-respond.test.ts new file mode 100644 index 0000000..c5af0a3 --- /dev/null +++ b/tests/server/run-chat-clarify-respond.test.ts @@ -0,0 +1,120 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const bridgeMock = vi.hoisted(() => ({ + clarifyRespond: vi.fn(), +})) + +vi.mock('../../packages/server/src/services/hermes/agent-bridge', () => ({ + AgentBridgeClient: vi.fn(() => bridgeMock), +})) + +vi.mock('../../packages/server/src/services/logger', () => ({ + logger: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }, +})) + +vi.mock('../../packages/server/src/db/hermes/session-store', () => ({ + getSession: vi.fn(() => null), +})) + +vi.mock('../../packages/server/src/services/hermes/hermes-profile', () => ({ + getActiveProfileName: vi.fn(() => 'default'), + getProfileDir: vi.fn(() => '/tmp/hermes-default'), + listProfileNamesFromDisk: vi.fn(() => ['default']), +})) + +vi.mock('../../packages/server/src/middleware/user-auth', () => ({ + authenticateUserToken: vi.fn(), + isAuthEnabled: vi.fn(async () => false), +})) + +vi.mock('../../packages/server/src/db/hermes/users-store', () => ({ + userCanAccessProfile: vi.fn(() => true), +})) + +function createSocketHarness() { + const handlers = new Map() + const namespaceEmit = vi.fn() + const namespace = { + adapter: { rooms: new Map([['session:session-1', new Set(['socket-1'])]]) }, + to: vi.fn(() => ({ emit: namespaceEmit })), + use: vi.fn(), + on: vi.fn(), + } + const io = { + of: vi.fn(() => namespace), + } + const socket = { + id: 'socket-1', + connected: true, + data: {}, + handshake: { auth: {}, query: { profile: 'default' } }, + on: vi.fn((event: string, handler: Function) => { + handlers.set(event, handler) + }), + join: vi.fn(), + emit: vi.fn(), + } + return { handlers, io, namespace, namespaceEmit, socket } +} + +describe('ChatRunSocket clarify responses', () => { + beforeEach(() => { + vi.resetModules() + bridgeMock.clarifyRespond.mockReset() + }) + + it('forwards clarify.respond events to the bridge and emits clarify.resolved', async () => { + bridgeMock.clarifyRespond.mockResolvedValue({ ok: true, resolved: true }) + const { ChatRunSocket } = await import('../../packages/server/src/services/hermes/run-chat') + const { handlers, io, namespace, namespaceEmit, socket } = createSocketHarness() + const server = new ChatRunSocket(io as any) + + ;(server as any).onConnection(socket) + await handlers.get('clarify.respond')?.({ + session_id: 'session-1', + clarify_id: 'clarify-1', + response: 'Use option A', + }) + + expect(bridgeMock.clarifyRespond).toHaveBeenCalledWith('clarify-1', 'Use option A') + expect(namespace.to).toHaveBeenCalledWith('session:session-1') + expect(namespaceEmit).toHaveBeenCalledWith('clarify.resolved', { + event: 'clarify.resolved', + session_id: 'session-1', + clarify_id: 'clarify-1', + resolved: true, + }) + }) + + it('emits an unresolved clarify result when the bridge rejects the response', async () => { + bridgeMock.clarifyRespond.mockRejectedValue(new Error('unknown clarify request')) + const { ChatRunSocket } = await import('../../packages/server/src/services/hermes/run-chat') + const { handlers, namespaceEmit, socket } = createSocketHarness() + const namespace = { + adapter: { rooms: new Map([['session:session-1', new Set(['socket-1'])]]) }, + to: vi.fn(() => ({ emit: namespaceEmit })), + use: vi.fn(), + on: vi.fn(), + } + const server = new ChatRunSocket({ of: vi.fn(() => namespace) } as any) + + ;(server as any).onConnection(socket) + await handlers.get('clarify.respond')?.({ + session_id: 'session-1', + clarify_id: 'clarify-1', + response: 'Use option B', + }) + + expect(namespaceEmit).toHaveBeenCalledWith('clarify.resolved', { + event: 'clarify.resolved', + session_id: 'session-1', + clarify_id: 'clarify-1', + resolved: false, + error: 'unknown clarify request', + }) + }) +})