Fix plan command support in web bridge (#1018)
* fix: support plan command in web bridge * fix: preserve queued bridge messages * fix: avoid duplicate queued plan messages * fix: preserve plan command semantics --------- Co-authored-by: Codex <codex@openai.com>
This commit is contained in:
@@ -109,7 +109,12 @@ export interface AgentBridgeCommandResult extends AgentBridgeResponse {
|
||||
session_id: string
|
||||
command: string
|
||||
handled: boolean
|
||||
type?: string
|
||||
message?: string
|
||||
output?: string
|
||||
notice?: string
|
||||
loaded?: string[]
|
||||
missing?: string[]
|
||||
new_session_id?: string
|
||||
history?: unknown[]
|
||||
retry?: boolean
|
||||
@@ -405,11 +410,12 @@ export class AgentBridgeClient {
|
||||
})
|
||||
}
|
||||
|
||||
command(sessionId: string, command: string): Promise<AgentBridgeCommandResult> {
|
||||
command(sessionId: string, command: string, profile?: string): Promise<AgentBridgeCommandResult> {
|
||||
return this.request<AgentBridgeCommandResult>({
|
||||
action: 'command',
|
||||
session_id: sessionId,
|
||||
command,
|
||||
...(profile ? { profile } : {}),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -1470,6 +1470,80 @@ class AgentPool:
|
||||
with session.lock:
|
||||
return {"session_id": session_id, "history": copy.deepcopy(session.history)}
|
||||
|
||||
def dispatch_command(self, session_id: str, command: str, profile: str | None = None) -> dict[str, Any]:
|
||||
raw = str(command or "").strip()
|
||||
if raw.startswith("/"):
|
||||
raw = raw[1:].strip()
|
||||
if not raw:
|
||||
raise ValueError("command is required")
|
||||
|
||||
parts = raw.split(maxsplit=1)
|
||||
name = parts[0].lstrip("/").strip().lower()
|
||||
arg = parts[1] if len(parts) > 1 else ""
|
||||
|
||||
with _profile_env(profile):
|
||||
try:
|
||||
try:
|
||||
from agent.skill_bundles import (
|
||||
build_bundle_invocation_message,
|
||||
resolve_bundle_command_key,
|
||||
)
|
||||
|
||||
bundle_key = resolve_bundle_command_key(name)
|
||||
if bundle_key:
|
||||
bundle_result = build_bundle_invocation_message(
|
||||
bundle_key,
|
||||
arg,
|
||||
task_id=session_id,
|
||||
)
|
||||
if bundle_result:
|
||||
message, loaded_names, missing_names = bundle_result
|
||||
return {
|
||||
"session_id": session_id,
|
||||
"command": name,
|
||||
"handled": True,
|
||||
"type": "bundle",
|
||||
"message": message,
|
||||
"loaded": loaded_names,
|
||||
"missing": missing_names,
|
||||
}
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
from agent.skill_commands import (
|
||||
build_skill_invocation_message,
|
||||
resolve_skill_command_key,
|
||||
)
|
||||
|
||||
key = resolve_skill_command_key(name)
|
||||
if key:
|
||||
message = build_skill_invocation_message(
|
||||
key,
|
||||
arg,
|
||||
task_id=session_id,
|
||||
runtime_note=(
|
||||
"If you need user clarification, call the clarify tool. "
|
||||
"Do not output raw JSON question/choices payloads as the final response."
|
||||
),
|
||||
)
|
||||
if message:
|
||||
return {
|
||||
"session_id": session_id,
|
||||
"command": name,
|
||||
"handled": True,
|
||||
"type": "skill",
|
||||
"message": message,
|
||||
}
|
||||
except Exception as exc:
|
||||
raise RuntimeError(f"skill command dispatch failed: {exc}") from exc
|
||||
|
||||
return {
|
||||
"session_id": session_id,
|
||||
"command": name,
|
||||
"handled": False,
|
||||
"message": f"not a supported bridge command: /{name}",
|
||||
}
|
||||
|
||||
def get_result(self, run_id: str) -> dict[str, Any]:
|
||||
with self._lock:
|
||||
record = self._runs.get(run_id)
|
||||
@@ -1701,6 +1775,16 @@ class BridgeServer:
|
||||
if action == "get_history":
|
||||
return self.pool.get_history(str(req.get("session_id") or ""))
|
||||
|
||||
if action == "command":
|
||||
session_id = str(req.get("session_id") or "").strip()
|
||||
if not session_id:
|
||||
raise ValueError("session_id is required")
|
||||
return self.pool.dispatch_command(
|
||||
session_id,
|
||||
str(req.get("command") or ""),
|
||||
req.get("profile"),
|
||||
)
|
||||
|
||||
if action == "destroy":
|
||||
return self.pool.destroy(str(req.get("session_id") or ""))
|
||||
|
||||
@@ -2275,7 +2359,7 @@ class BridgeBroker:
|
||||
profile = self._profile_for_run(str(req.get("run_id") or ""))
|
||||
return self._forward(profile, req)
|
||||
|
||||
if action in {"interrupt", "steer", "get_history", "destroy"}:
|
||||
if action in {"interrupt", "steer", "command", "get_history", "destroy"}:
|
||||
session_id = str(req.get("session_id") or "")
|
||||
profile = self._profile_for_session(session_id, req.get("profile"))
|
||||
resp = self._forward(profile, req)
|
||||
|
||||
@@ -126,11 +126,11 @@ function cacheBridgeContext(state: SessionState, data: Record<string, unknown> |
|
||||
export async function handleBridgeRun(
|
||||
nsp: ReturnType<Server['of']>,
|
||||
socket: Socket,
|
||||
data: { input: string | ContentBlock[]; session_id?: string; model?: string; provider?: string; model_groups?: RunModelGroup[]; instructions?: string; source?: string; queue_id?: string; peerExcludeSocketId?: string },
|
||||
data: { input: string | ContentBlock[]; display_input?: string | ContentBlock[] | null; display_role?: 'user' | 'command'; storage_message?: string; session_id?: string; model?: string; provider?: string; model_groups?: RunModelGroup[]; instructions?: string; source?: string; queue_id?: string; peerExcludeSocketId?: string },
|
||||
profile: string,
|
||||
sessionMap: Map<string, SessionState>,
|
||||
bridge: AgentBridgeClient,
|
||||
_skipUserMessage = false,
|
||||
skipUserMessage = false,
|
||||
loadSessionStateFromDbFn: (sid: string, sessionMap: Map<string, SessionState>) => Promise<SessionState>,
|
||||
dequeueNextQueuedRun: (socket: Socket, sessionId: string, fallbackProfile?: string) => void,
|
||||
) {
|
||||
@@ -193,42 +193,55 @@ export async function handleBridgeRun(
|
||||
state.bridgePendingTools = []
|
||||
state.responseRun = undefined
|
||||
|
||||
const inputStr = contentBlocksToString(input)
|
||||
state.messages.push({
|
||||
id: state.messages.length + 1,
|
||||
session_id,
|
||||
runMarker,
|
||||
role: 'user',
|
||||
content: inputStr,
|
||||
timestamp: now,
|
||||
})
|
||||
const displayInput = data.display_input === undefined ? input : data.display_input
|
||||
const inputStr = displayInput == null ? '' : contentBlocksToString(displayInput)
|
||||
const shouldPersistUserMessage = !skipUserMessage && displayInput !== null
|
||||
const displayRole = data.display_role === 'command' ? 'command' : 'user'
|
||||
let messageId: number | string | undefined
|
||||
|
||||
if (!getSession(session_id)) {
|
||||
const previewText = extractTextForPreview(input)
|
||||
if (shouldPersistUserMessage) {
|
||||
state.messages.push({
|
||||
id: state.messages.length + 1,
|
||||
session_id,
|
||||
runMarker,
|
||||
role: displayRole,
|
||||
content: inputStr,
|
||||
timestamp: now,
|
||||
})
|
||||
|
||||
if (!getSession(session_id)) {
|
||||
const previewText = extractTextForPreview(displayInput || input)
|
||||
const preview = previewText.replace(/[\r\n]/g, ' ').substring(0, 100)
|
||||
createSession({ id: session_id, profile, source: 'cli', model: resolvedModel, provider: resolvedProvider, title: preview })
|
||||
}
|
||||
messageId = addMessage({
|
||||
session_id,
|
||||
role: displayRole,
|
||||
content: inputStr,
|
||||
timestamp: now,
|
||||
})
|
||||
} else if (!getSession(session_id)) {
|
||||
const previewText = displayInput === null ? extractTextForPreview(input) : extractTextForPreview(displayInput || input)
|
||||
const preview = previewText.replace(/[\r\n]/g, ' ').substring(0, 100)
|
||||
createSession({ id: session_id, profile, source: 'cli', model: resolvedModel, provider: resolvedProvider, title: preview })
|
||||
}
|
||||
const messageId = addMessage({
|
||||
session_id,
|
||||
role: 'user',
|
||||
content: inputStr,
|
||||
timestamp: now,
|
||||
})
|
||||
|
||||
socket.join(`session:${session_id}`)
|
||||
const peerTarget = data.peerExcludeSocketId
|
||||
? nsp.to(`session:${session_id}`).except(data.peerExcludeSocketId)
|
||||
: socket.to(`session:${session_id}`)
|
||||
peerTarget.emit('run.peer_user_message', {
|
||||
event: 'run.peer_user_message',
|
||||
session_id,
|
||||
message: {
|
||||
id: data.queue_id || messageId,
|
||||
role: 'user',
|
||||
content: inputStr,
|
||||
timestamp: now,
|
||||
},
|
||||
})
|
||||
if (shouldPersistUserMessage) {
|
||||
const peerTarget = data.peerExcludeSocketId
|
||||
? nsp.to(`session:${session_id}`).except(data.peerExcludeSocketId)
|
||||
: socket.to(`session:${session_id}`)
|
||||
peerTarget.emit('run.peer_user_message', {
|
||||
event: 'run.peer_user_message',
|
||||
session_id,
|
||||
message: {
|
||||
id: data.queue_id || messageId,
|
||||
role: displayRole,
|
||||
content: inputStr,
|
||||
timestamp: now,
|
||||
},
|
||||
})
|
||||
}
|
||||
const emit = (event: string, payload: any) => {
|
||||
const tagged = { ...payload, session_id }
|
||||
nsp.to(`session:${session_id}`).emit(event, tagged)
|
||||
@@ -278,9 +291,11 @@ export async function handleBridgeRun(
|
||||
const bridgeInput = isContentBlockArray(input)
|
||||
? await convertContentBlocksForAgent(input)
|
||||
: input
|
||||
const bridgeStorageInput = isContentBlockArray(input)
|
||||
? inputStr
|
||||
: undefined
|
||||
const bridgeStorageInput = data.storage_message !== undefined
|
||||
? data.storage_message
|
||||
: isContentBlockArray(input)
|
||||
? inputStr
|
||||
: undefined
|
||||
logger.info('[chat-run-socket] starting CLI bridge run for session %s', session_id)
|
||||
bridgeLogger.info({
|
||||
sessionId: session_id,
|
||||
@@ -606,6 +621,25 @@ async function applyBridgeChunkAsync(
|
||||
}
|
||||
replaceState(sessionMap, sessionId, 'approval.resolved', payload)
|
||||
emit('approval.resolved', payload)
|
||||
} else if (evType === 'clarify.requested') {
|
||||
const payload = {
|
||||
event: 'clarify.requested',
|
||||
run_id: chunk.run_id,
|
||||
clarify_id: ev.clarify_id,
|
||||
question: ev.question,
|
||||
choices: Array.isArray(ev.choices) ? ev.choices : null,
|
||||
timeout_ms: ev.timeout_ms,
|
||||
}
|
||||
replaceState(sessionMap, sessionId, 'clarify.requested', payload)
|
||||
emit('clarify.requested', payload)
|
||||
} else if (evType === 'clarify.resolved') {
|
||||
const payload = {
|
||||
event: 'clarify.resolved',
|
||||
run_id: chunk.run_id,
|
||||
clarify_id: ev.clarify_id,
|
||||
}
|
||||
replaceState(sessionMap, sessionId, 'clarify.resolved', payload)
|
||||
emit('clarify.resolved', payload)
|
||||
} else if (evType === 'bridge.compression.requested') {
|
||||
const bridgeHistory = await buildDbHistory(sessionId, { excludeLastUser: true })
|
||||
const bridgeUsage = estimateUsageTokensFromMessages(bridgeHistory)
|
||||
|
||||
@@ -99,6 +99,9 @@ export class ChatRunSocket {
|
||||
|
||||
socket.on('run', async (data: {
|
||||
input: string | ContentBlock[]
|
||||
display_input?: string | ContentBlock[] | null
|
||||
display_role?: 'user' | 'command'
|
||||
storage_message?: string
|
||||
session_id?: string
|
||||
model?: string
|
||||
instructions?: string
|
||||
@@ -133,6 +136,7 @@ export class ChatRunSocket {
|
||||
profile: runProfile,
|
||||
model: data.model,
|
||||
instructions: data.instructions,
|
||||
queueId: data.queue_id,
|
||||
runQueuedItem: this.runQueuedItem.bind(this),
|
||||
})
|
||||
} catch (err) {
|
||||
@@ -268,6 +272,9 @@ export class ChatRunSocket {
|
||||
socket: Socket,
|
||||
data: {
|
||||
input: string | ContentBlock[]
|
||||
display_input?: string | ContentBlock[] | null
|
||||
display_role?: 'user' | 'command'
|
||||
storage_message?: string
|
||||
session_id?: string
|
||||
model?: string
|
||||
provider?: string
|
||||
@@ -358,8 +365,12 @@ export class ChatRunSocket {
|
||||
}
|
||||
|
||||
private runQueuedItem(socket: Socket, sessionId: string, next: QueuedRun, fallbackProfile = 'default') {
|
||||
const skipUserMessage = next.displayInput === null
|
||||
void this.handleRun(socket, {
|
||||
input: next.input,
|
||||
display_input: next.displayInput,
|
||||
display_role: next.displayRole,
|
||||
storage_message: next.storageMessage,
|
||||
session_id: sessionId,
|
||||
model: next.model,
|
||||
provider: next.provider,
|
||||
@@ -368,7 +379,7 @@ export class ChatRunSocket {
|
||||
source: next.source,
|
||||
queue_id: next.queue_id,
|
||||
peerExcludeSocketId: next.originSocketId,
|
||||
}, next.profile || fallbackProfile, true)
|
||||
}, next.profile || fallbackProfile, skipUserMessage)
|
||||
}
|
||||
|
||||
// --- Helpers ---
|
||||
@@ -384,8 +395,10 @@ export class ChatRunSocket {
|
||||
private serializeQueuedMessages(queue: QueuedRun[]) {
|
||||
return queue.map(item => ({
|
||||
id: item.queue_id,
|
||||
role: 'user',
|
||||
content: contentBlocksToString(item.input),
|
||||
role: item.displayRole || (typeof item.displayInput === 'string' && item.displayInput.trim().startsWith('/') ? 'command' : 'user'),
|
||||
content: item.displayInput === null
|
||||
? (item.storageMessage || '')
|
||||
: contentBlocksToString(item.displayInput ?? item.input),
|
||||
timestamp: Math.floor(Date.now() / 1000),
|
||||
queued: true,
|
||||
}))
|
||||
|
||||
@@ -14,6 +14,7 @@ type CommandName =
|
||||
| 'status'
|
||||
| 'abort'
|
||||
| 'queue'
|
||||
| 'plan'
|
||||
| 'clear'
|
||||
| 'title'
|
||||
| 'compress'
|
||||
@@ -34,6 +35,7 @@ interface SessionCommandContext {
|
||||
profile: string
|
||||
model?: string
|
||||
instructions?: string
|
||||
queueId?: string
|
||||
runQueuedItem: (socket: Socket, sessionId: string, next: QueuedRun, fallbackProfile?: string) => void
|
||||
}
|
||||
|
||||
@@ -42,6 +44,7 @@ const COMMAND_ALIASES: Record<string, CommandName> = {
|
||||
status: 'status',
|
||||
abort: 'abort',
|
||||
queue: 'queue',
|
||||
plan: 'plan',
|
||||
clear: 'clear',
|
||||
title: 'title',
|
||||
compress: 'compress',
|
||||
@@ -74,7 +77,9 @@ export async function handleSessionCommand(
|
||||
const state = getOrCreateSession(ctx.sessionMap, sessionId)
|
||||
ctx.socket.join(`session:${sessionId}`)
|
||||
ensureCommandSession(sessionId, ctx)
|
||||
persistCommandMessage(sessionId, state, `/${command.rawName}${command.args ? ` ${command.args}` : ''}`)
|
||||
if (command.name !== 'plan') {
|
||||
persistCommandMessage(sessionId, state, `/${command.rawName}${command.args ? ` ${command.args}` : ''}`)
|
||||
}
|
||||
|
||||
const emitCommand = (payload: Record<string, unknown>) => {
|
||||
const message = typeof payload.message === 'string' ? payload.message : ''
|
||||
@@ -182,6 +187,74 @@ export async function handleSessionCommand(
|
||||
return
|
||||
}
|
||||
|
||||
case 'plan': {
|
||||
const bridgeCommand = `plan${command.args ? ` ${command.args}` : ''}`
|
||||
let result
|
||||
try {
|
||||
result = await ctx.bridge.command(sessionId, bridgeCommand, ctx.profile)
|
||||
} catch (err) {
|
||||
emitCommand({
|
||||
ok: false,
|
||||
action: 'plan',
|
||||
terminal: !state.isWorking,
|
||||
message: `Plan command failed: ${err instanceof Error ? err.message : String(err)}`,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
if (!result.handled || !result.message) {
|
||||
emitCommand({
|
||||
ok: false,
|
||||
action: 'plan',
|
||||
terminal: !state.isWorking,
|
||||
message: result.message || 'Plan command is not available.',
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
const queueId = ctx.queueId || `queue_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`
|
||||
const displayCommand = `/${bridgeCommand}`
|
||||
const next: QueuedRun = {
|
||||
queue_id: queueId,
|
||||
input: result.message,
|
||||
displayInput: displayCommand,
|
||||
displayRole: 'command',
|
||||
storageMessage: displayCommand,
|
||||
model: ctx.model,
|
||||
instructions: ctx.instructions,
|
||||
profile: ctx.profile,
|
||||
source: 'cli',
|
||||
originSocketId: ctx.socket.id,
|
||||
}
|
||||
|
||||
if (state.isWorking) {
|
||||
state.queue.push(next)
|
||||
emitToSession(ctx.nsp, ctx.socket, sessionId, 'run.queued', {
|
||||
event: 'run.queued',
|
||||
session_id: sessionId,
|
||||
queue_length: state.queue.length,
|
||||
queued_messages: state.queue.map(item => ({
|
||||
id: item.queue_id,
|
||||
role: typeof item.displayInput === 'string' && item.displayInput.trim().startsWith('/') ? 'command' : 'user',
|
||||
content: item.displayInput === null
|
||||
? (item.storageMessage || '')
|
||||
: contentBlocksToString(item.displayInput ?? item.input),
|
||||
timestamp: Math.floor(Date.now() / 1000),
|
||||
queued: true,
|
||||
})),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
emitCommand({
|
||||
action: 'plan',
|
||||
terminal: false,
|
||||
started: true,
|
||||
})
|
||||
ctx.runQueuedItem(ctx.socket, sessionId, next, ctx.profile)
|
||||
return
|
||||
}
|
||||
|
||||
case 'clear': {
|
||||
if (command.args === '--history') {
|
||||
if (state.isWorking) {
|
||||
|
||||
@@ -28,6 +28,9 @@ export interface SessionMessage {
|
||||
export interface QueuedRun {
|
||||
queue_id: string
|
||||
input: string | ContentBlock[]
|
||||
displayInput?: string | ContentBlock[] | null
|
||||
displayRole?: 'user' | 'command'
|
||||
storageMessage?: string
|
||||
model?: string
|
||||
provider?: string
|
||||
model_groups?: Array<{ provider: string; models: string[] }>
|
||||
|
||||
Reference in New Issue
Block a user