[codex] add clarify support with response path tests (#972)

* feat: 新增 clarify(澄清/确认)交互支持

* test clarify response bridge path

---------

Co-authored-by: GoldenFish123321 <golden_fish@foxmail.com>
This commit is contained in:
ekko
2026-05-24 18:09:39 +08:00
committed by GitHub
parent a7f0a92fe6
commit e743c81ad3
17 changed files with 568 additions and 1 deletions
@@ -481,6 +481,10 @@ export class AgentBridgeClient {
return this.request({ action: 'approval_respond', approval_id: approvalId, choice })
}
clarifyRespond(clarifyId: string, response: string): Promise<AgentBridgeResponse> {
return this.request({ action: 'clarify_respond', clarify_id: clarifyId, response })
}
compressionRespond(
requestId: string,
payload: { messages?: unknown[]; system_message?: string; error?: string },
@@ -598,6 +598,7 @@ class AgentPool:
self._approval_requests: dict[str, queue.Queue[str]] = {}
self._gateway_approval_requests: dict[str, str] = {}
self._compression_requests: dict[str, queue.Queue[dict[str, Any]]] = {}
self._clarify_requests: dict[str, queue.Queue[str]] = {}
self._run_context = threading.local()
self._approval_handlers: dict[str, Callable[..., str]] = {}
self._exec_ask_depth = 0
@@ -667,6 +668,7 @@ class AgentPool:
tool_progress_callback=self._tool_progress_callback(session_id),
tool_start_callback=self._tool_start_callback(session_id),
tool_complete_callback=self._tool_complete_callback(session_id),
clarify_callback=self._clarify_callback(session_id),
)
agent.compression_enabled = False
self._install_compression_hook(agent, session_id)
@@ -1053,6 +1055,30 @@ class AgentPool:
return callback
def _clarify_callback(self, session_id: str):
def callback(question: str, choices: list[str] | None = None) -> str:
clarify_id = uuid.uuid4().hex
response_queue: queue.Queue[str] = queue.Queue(maxsize=1)
with self._lock:
self._clarify_requests[clarify_id] = response_queue
self._append_event(session_id, {
"event": "clarify.requested",
"clarify_id": clarify_id,
"question": str(question or ""),
"choices": list(choices) if choices else None,
"timeout_ms": 300_000,
})
try:
user_response = response_queue.get(timeout=300)
except queue.Empty:
user_response = "[user did not respond within 5m]"
finally:
with self._lock:
self._clarify_requests.pop(clarify_id, None)
return user_response
return callback
def _approval_dispatcher(self, command: str, description: str, *, allow_permanent: bool = True) -> str:
session_id = str(getattr(self._run_context, "session_id", "") or "")
if not session_id:
@@ -1425,6 +1451,17 @@ class AgentPool:
pass
return {"approval_id": approval_id, "resolved": True, "choice": cleaned}
def respond_clarify(self, clarify_id: str, response: str) -> dict[str, Any]:
with self._lock:
response_queue = self._clarify_requests.get(clarify_id)
if response_queue is None:
return {"clarify_id": clarify_id, "resolved": False}
try:
response_queue.put_nowait(response)
except queue.Full:
pass
return {"clarify_id": clarify_id, "resolved": True}
def get_history(self, session_id: str) -> dict[str, Any]:
with self._lock:
session = self._sessions.get(session_id)
@@ -1640,6 +1677,13 @@ class BridgeServer:
raise ValueError("approval_id is required")
return self.pool.respond_approval(approval_id, str(req.get("choice") or "deny"))
if action == "clarify_respond":
clarify_id = str(req.get("clarify_id") or "").strip()
if not clarify_id:
raise ValueError("clarify_id is required")
response = str(req.get("response") or "").strip()
return self.pool.respond_clarify(clarify_id, response)
if action == "compression_respond":
request_id = str(req.get("request_id") or "").strip()
if not request_id:
@@ -2087,6 +2131,7 @@ class BridgeBroker:
self._running_run_profile: dict[str, str] = {}
self._session_profile: dict[str, str] = {}
self._approval_profile: dict[str, str] = {}
self._clarify_profile: dict[str, str] = {}
self._compression_profile: dict[str, str] = {}
self._lock = threading.RLock()
self._stop = threading.Event()
@@ -2140,6 +2185,9 @@ class BridgeBroker:
approval_id = str(event.get("approval_id") or "")
if approval_id:
self._approval_profile[approval_id] = profile
clarify_id = str(event.get("clarify_id") or "")
if clarify_id:
self._clarify_profile[clarify_id] = profile
request_id = str(event.get("request_id") or "")
if event.get("event") == "bridge.compression.requested" and request_id:
self._compression_profile[request_id] = profile
@@ -2155,6 +2203,7 @@ class BridgeBroker:
self._running_run_profile.clear()
self._session_profile.clear()
self._approval_profile.clear()
self._clarify_profile.clear()
self._compression_profile.clear()
for worker in workers:
worker.stop()
@@ -2245,6 +2294,16 @@ class BridgeBroker:
raise KeyError(f"unknown approval request: {approval_id}")
return self._forward(profile, req)
if action == "clarify_respond":
clarify_id = str(req.get("clarify_id") or "").strip()
if not clarify_id:
raise ValueError("clarify_id is required")
with self._lock:
profile = self._clarify_profile.get(clarify_id)
if not profile:
raise KeyError(f"unknown clarify request: {clarify_id}")
return self._forward(profile, req)
if action == "compression_respond":
request_id = str(req.get("request_id") or "").strip()
if not request_id:
@@ -2263,6 +2322,7 @@ class BridgeBroker:
self._running_run_profile.clear()
self._session_profile.clear()
self._approval_profile.clear()
self._clarify_profile.clear()
self._compression_profile.clear()
destroyed = 0
for worker in workers:
@@ -2284,6 +2344,7 @@ class BridgeBroker:
self._running_run_profile = {key: value for key, value in self._running_run_profile.items() if value != profile}
self._session_profile = {key: value for key, value in self._session_profile.items() if value != profile}
self._approval_profile = {key: value for key, value in self._approval_profile.items() if value != profile}
self._clarify_profile = {key: value for key, value in self._clarify_profile.items() if value != profile}
self._compression_profile = {key: value for key, value in self._compression_profile.items() if value != profile}
if worker is None or not worker.running:
@@ -241,6 +241,25 @@ export class ChatRunSocket {
})
}
})
socket.on('clarify.respond', async (data: { session_id?: string; clarify_id?: string; response?: string }) => {
if (!data.session_id || !data.clarify_id) return
try {
const result = await this.bridge.clarifyRespond(data.clarify_id, data.response || '')
this.emitToSession(socket, data.session_id, 'clarify.resolved', {
event: 'clarify.resolved',
clarify_id: data.clarify_id,
resolved: Boolean((result as any)?.resolved),
})
} catch (err) {
this.emitToSession(socket, data.session_id, 'clarify.resolved', {
event: 'clarify.resolved',
clarify_id: data.clarify_id,
resolved: false,
error: err instanceof Error ? err.message : String(err),
})
}
})
}
// --- Run dispatcher ---