Kanban:补齐看板事件、链接与批量操作闭环 (#634)
* feat(kanban): add board-scoped event stream bridge * test(kanban): align event refresh expectation * feat(kanban): add links and partial bulk bridge * test(kanban): align links bulk refresh expectation * fix(kanban): treat mutation stderr as failed
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
import { request } from '../client'
|
||||
import { request, getApiKey, getBaseUrlValue } from '../client'
|
||||
|
||||
// ─── Types ──────────────────────────────────────────────────────
|
||||
|
||||
@@ -84,6 +84,8 @@ export interface KanbanTaskDetail {
|
||||
comments: KanbanComment[]
|
||||
events: KanbanEvent[]
|
||||
runs: KanbanRun[]
|
||||
parents?: string[]
|
||||
children?: string[]
|
||||
}
|
||||
|
||||
export interface KanbanStats {
|
||||
@@ -198,6 +200,26 @@ export interface KanbanDispatchOptions extends KanbanBoardOptions {
|
||||
failureLimit?: number
|
||||
}
|
||||
|
||||
export interface KanbanLinkRequest {
|
||||
parent_id: string
|
||||
child_id: string
|
||||
}
|
||||
|
||||
export interface KanbanBulkUpdateRequest {
|
||||
ids: string[]
|
||||
status?: KanbanTaskStatus
|
||||
assignee?: string | null
|
||||
archive?: boolean
|
||||
summary?: string
|
||||
reason?: string
|
||||
}
|
||||
|
||||
export interface KanbanBulkTaskResult {
|
||||
id: string
|
||||
ok: boolean
|
||||
error?: string
|
||||
}
|
||||
|
||||
function normalizedBoard(board?: string): string {
|
||||
const trimmed = board?.trim()
|
||||
return trimmed || 'default'
|
||||
@@ -214,6 +236,37 @@ function boardParams(board?: string): URLSearchParams {
|
||||
return params
|
||||
}
|
||||
|
||||
function websocketProtocol(base?: string): string {
|
||||
if (base) return base.startsWith('https') ? 'wss:' : 'ws:'
|
||||
return location.protocol === 'https:' ? 'wss:' : 'ws:'
|
||||
}
|
||||
|
||||
function formatHostForPort(hostname: string, port: number): string {
|
||||
if (hostname.startsWith('[') && hostname.endsWith(']')) return `${hostname}:${port}`
|
||||
return hostname.includes(':') ? `[${hostname}]:${port}` : `${hostname}:${port}`
|
||||
}
|
||||
|
||||
export function buildKanbanEventsWebSocketUrl(opts?: KanbanBoardOptions): string {
|
||||
const base = getBaseUrlValue()
|
||||
const params = boardParams(opts?.board)
|
||||
const token = getApiKey()
|
||||
if (token) params.set('token', token)
|
||||
const path = `/api/hermes/kanban/events?${params.toString()}`
|
||||
|
||||
if (base) {
|
||||
return `${websocketProtocol(base)}//${new URL(base).host}${path}`
|
||||
}
|
||||
|
||||
const host = import.meta.env.DEV
|
||||
? formatHostForPort(location.hostname, 8648)
|
||||
: location.host
|
||||
return `${websocketProtocol()}//${host}${path}`
|
||||
}
|
||||
|
||||
export function openKanbanEventStream(opts?: KanbanBoardOptions): WebSocket {
|
||||
return new WebSocket(buildKanbanEventsWebSocketUrl(opts))
|
||||
}
|
||||
|
||||
// ─── API functions ───────────────────────────────────────────────
|
||||
|
||||
export async function listBoards(opts?: { includeArchived?: boolean }): Promise<KanbanBoard[]> {
|
||||
@@ -299,6 +352,29 @@ export async function addComment(taskId: string, data: KanbanCommentCreateReques
|
||||
})
|
||||
}
|
||||
|
||||
export async function linkTasks(data: KanbanLinkRequest, opts?: KanbanBoardOptions): Promise<{ ok: boolean; output?: string }> {
|
||||
return request<{ ok: boolean; output?: string }>(appendQuery('/api/hermes/kanban/links', boardParams(opts?.board)), {
|
||||
method: 'POST',
|
||||
body: JSON.stringify(data),
|
||||
})
|
||||
}
|
||||
|
||||
export async function unlinkTasks(data: KanbanLinkRequest, opts?: KanbanBoardOptions): Promise<{ ok: boolean; output?: string }> {
|
||||
const params = boardParams(opts?.board)
|
||||
params.set('parent_id', data.parent_id)
|
||||
params.set('child_id', data.child_id)
|
||||
return request<{ ok: boolean; output?: string }>(appendQuery('/api/hermes/kanban/links', params), {
|
||||
method: 'DELETE',
|
||||
})
|
||||
}
|
||||
|
||||
export async function bulkUpdateTasks(data: KanbanBulkUpdateRequest, opts?: KanbanBoardOptions): Promise<{ results: KanbanBulkTaskResult[] }> {
|
||||
return request<{ results: KanbanBulkTaskResult[] }>(appendQuery('/api/hermes/kanban/tasks/bulk', boardParams(opts?.board)), {
|
||||
method: 'POST',
|
||||
body: JSON.stringify(data),
|
||||
})
|
||||
}
|
||||
|
||||
export async function getTaskLog(taskId: string, opts?: KanbanTaskLogOptions): Promise<KanbanTaskLog> {
|
||||
const params = boardParams(opts?.board)
|
||||
if (opts?.tail !== undefined) params.set('tail', String(opts.tail))
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { defineStore } from 'pinia'
|
||||
import { computed, ref } from 'vue'
|
||||
import * as kanbanApi from '@/api/hermes/kanban'
|
||||
import type { KanbanTask, KanbanStats, KanbanAssignee, KanbanBoard, KanbanCapabilities, KanbanDiagnosticsOptions, KanbanDispatchOptions } from '@/api/hermes/kanban'
|
||||
import type { KanbanTask, KanbanStats, KanbanAssignee, KanbanBoard, KanbanCapabilities, KanbanDiagnosticsOptions, KanbanDispatchOptions, KanbanBulkUpdateRequest } from '@/api/hermes/kanban'
|
||||
|
||||
export const KANBAN_SELECTED_BOARD_STORAGE_KEY = 'hermes.kanban.selectedBoard'
|
||||
export const DEFAULT_KANBAN_BOARD = 'default'
|
||||
@@ -53,6 +53,11 @@ export const useKanbanStore = defineStore('kanban', () => {
|
||||
let statsRequestSeq = 0
|
||||
let assigneesRequestSeq = 0
|
||||
let loadingRequestSeq = 0
|
||||
let eventStreamSeq = 0
|
||||
let eventSocket: WebSocket | null = null
|
||||
let eventReconnectTimer: ReturnType<typeof setTimeout> | null = null
|
||||
let eventRefreshTimer: ReturnType<typeof setTimeout> | null = null
|
||||
let eventStreamEnabled = false
|
||||
|
||||
const activeBoards = computed(() => {
|
||||
const visible = boards.value.filter(board => !board.archived)
|
||||
@@ -86,6 +91,19 @@ export const useKanbanStore = defineStore('kanban', () => {
|
||||
}
|
||||
}
|
||||
|
||||
function hasCapabilityStatus(key: string, statuses: Array<'supported' | 'partial' | 'missing'>): boolean {
|
||||
const detail = capabilities.value?.capabilities?.find(capability => capability.key === key)
|
||||
if (detail) return statuses.includes(detail.status)
|
||||
if (statuses.includes('supported')) return isCapabilitySupported(key)
|
||||
return false
|
||||
}
|
||||
|
||||
function assertCapabilityStatus(key: string, statuses: Array<'supported' | 'partial' | 'missing'>): void {
|
||||
if (!hasCapabilityStatus(key, statuses)) {
|
||||
throw new Error(`Kanban capability "${key}" is not available with the required status`)
|
||||
}
|
||||
}
|
||||
|
||||
function boardExists(board: string): boolean {
|
||||
return activeBoards.value.some(item => item.slug === board)
|
||||
}
|
||||
@@ -102,6 +120,97 @@ export const useKanbanStore = defineStore('kanban', () => {
|
||||
assignees.value = []
|
||||
}
|
||||
|
||||
function clearEventTimers() {
|
||||
if (eventReconnectTimer) clearTimeout(eventReconnectTimer)
|
||||
if (eventRefreshTimer) clearTimeout(eventRefreshTimer)
|
||||
eventReconnectTimer = null
|
||||
eventRefreshTimer = null
|
||||
}
|
||||
|
||||
function closeEventSocket() {
|
||||
if (!eventSocket) return
|
||||
const socket = eventSocket
|
||||
eventSocket = null
|
||||
socket.onclose = null
|
||||
socket.onerror = null
|
||||
socket.onmessage = null
|
||||
try { socket.close() } catch { }
|
||||
}
|
||||
|
||||
function stopEventStream() {
|
||||
eventStreamEnabled = false
|
||||
eventStreamSeq++
|
||||
clearEventTimers()
|
||||
closeEventSocket()
|
||||
}
|
||||
|
||||
function scheduleEventRefresh(board: string, generation: number, seq: number) {
|
||||
if (eventRefreshTimer) clearTimeout(eventRefreshTimer)
|
||||
eventRefreshTimer = setTimeout(() => {
|
||||
if (!eventStreamEnabled || seq !== eventStreamSeq || generation !== boardGeneration || board !== selectedBoard.value) return
|
||||
void Promise.all([fetchBoards(), fetchTasks(true), fetchStats(), fetchAssignees()])
|
||||
}, 100)
|
||||
}
|
||||
|
||||
function scheduleEventReconnect(board: string, generation: number, seq: number) {
|
||||
if (eventReconnectTimer) clearTimeout(eventReconnectTimer)
|
||||
eventReconnectTimer = setTimeout(() => {
|
||||
if (!eventStreamEnabled || seq !== eventStreamSeq || generation !== boardGeneration || board !== selectedBoard.value) return
|
||||
connectEventStream(board, generation, seq)
|
||||
}, 3000)
|
||||
}
|
||||
|
||||
function connectEventStream(board: string, generation: number, seq: number) {
|
||||
closeEventSocket()
|
||||
let socket: WebSocket
|
||||
try {
|
||||
socket = kanbanApi.openKanbanEventStream({ board })
|
||||
} catch (err) {
|
||||
console.error('Failed to open kanban event stream:', err)
|
||||
scheduleEventReconnect(board, generation, seq)
|
||||
return
|
||||
}
|
||||
eventSocket = socket
|
||||
socket.onmessage = (event) => {
|
||||
if (!eventStreamEnabled || seq !== eventStreamSeq || generation !== boardGeneration || board !== selectedBoard.value) return
|
||||
try {
|
||||
const payload = JSON.parse(String(event.data))
|
||||
if (payload?.type === 'event') scheduleEventRefresh(board, generation, seq)
|
||||
} catch {
|
||||
scheduleEventRefresh(board, generation, seq)
|
||||
}
|
||||
}
|
||||
socket.onerror = () => {
|
||||
if (eventSocket === socket) console.error('Kanban event stream error')
|
||||
}
|
||||
socket.onclose = () => {
|
||||
if (eventSocket === socket) {
|
||||
eventSocket = null
|
||||
scheduleEventReconnect(board, generation, seq)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function hasEventStreamCapability(): boolean {
|
||||
const status = capabilities.value?.capabilities?.find(capability => capability.key === 'events')?.status
|
||||
return status === 'supported' || status === 'partial' || isCapabilitySupported('events')
|
||||
}
|
||||
|
||||
function startEventStream() {
|
||||
if (!hasEventStreamCapability()) return false
|
||||
eventStreamEnabled = true
|
||||
const seq = ++eventStreamSeq
|
||||
const generation = boardGeneration
|
||||
const board = selectedBoard.value
|
||||
clearEventTimers()
|
||||
connectEventStream(board, generation, seq)
|
||||
return true
|
||||
}
|
||||
|
||||
function restartEventStreamIfActive() {
|
||||
if (eventStreamEnabled) startEventStream()
|
||||
}
|
||||
|
||||
function setSelectedBoard(board?: string | null): string {
|
||||
const resolved = resolveAvailableBoard(board)
|
||||
const changed = selectedBoard.value !== resolved
|
||||
@@ -111,6 +220,7 @@ export const useKanbanStore = defineStore('kanban', () => {
|
||||
if (changed) {
|
||||
clearBoardScopedState()
|
||||
boardGeneration++
|
||||
restartEventStreamIfActive()
|
||||
}
|
||||
return resolved
|
||||
}
|
||||
@@ -276,6 +386,30 @@ export const useKanbanStore = defineStore('kanban', () => {
|
||||
return kanbanApi.addComment(taskId, { body, author }, { board: selectedBoard.value })
|
||||
}
|
||||
|
||||
async function linkTasks(parentId: string, childId: string) {
|
||||
assertCapability('links')
|
||||
const board = selectedBoard.value
|
||||
const result = await kanbanApi.linkTasks({ parent_id: parentId, child_id: childId }, { board })
|
||||
if (board === selectedBoard.value) await Promise.all([fetchTasks(true), fetchStats(), fetchBoards()])
|
||||
return result
|
||||
}
|
||||
|
||||
async function unlinkTasks(parentId: string, childId: string) {
|
||||
assertCapability('links')
|
||||
const board = selectedBoard.value
|
||||
const result = await kanbanApi.unlinkTasks({ parent_id: parentId, child_id: childId }, { board })
|
||||
if (board === selectedBoard.value) await Promise.all([fetchTasks(true), fetchStats(), fetchBoards()])
|
||||
return result
|
||||
}
|
||||
|
||||
async function bulkUpdateTasks(data: Omit<KanbanBulkUpdateRequest, 'ids'> & { ids: string[] }) {
|
||||
assertCapabilityStatus('bulk', ['supported', 'partial'])
|
||||
const board = selectedBoard.value
|
||||
const result = await kanbanApi.bulkUpdateTasks(data, { board })
|
||||
if (board === selectedBoard.value) await Promise.all([fetchTasks(true), fetchStats(), fetchBoards(), fetchAssignees()])
|
||||
return result
|
||||
}
|
||||
|
||||
async function getTaskLog(taskId: string, tail?: number) {
|
||||
assertCapability('taskLog')
|
||||
return kanbanApi.getTaskLog(taskId, { board: selectedBoard.value, tail })
|
||||
@@ -358,6 +492,9 @@ export const useKanbanStore = defineStore('kanban', () => {
|
||||
unblockTasks,
|
||||
assignTask,
|
||||
addComment,
|
||||
linkTasks,
|
||||
unlinkTasks,
|
||||
bulkUpdateTasks,
|
||||
getTaskLog,
|
||||
getDiagnostics,
|
||||
reclaimTask,
|
||||
@@ -366,6 +503,8 @@ export const useKanbanStore = defineStore('kanban', () => {
|
||||
dispatch,
|
||||
setFilter,
|
||||
setSelectedBoard,
|
||||
startEventStream,
|
||||
stopEventStream,
|
||||
recoverSelectedBoard,
|
||||
resolveAvailableBoard,
|
||||
clearBoardScopedState,
|
||||
|
||||
@@ -113,6 +113,7 @@ watch(() => route.query.board, async () => {
|
||||
onMounted(async () => {
|
||||
await Promise.all([kanbanStore.fetchBoards(), kanbanStore.fetchCapabilities()])
|
||||
await applyBoardSelection(routeBoard(), true, true)
|
||||
kanbanStore.startEventStream()
|
||||
routeReady.value = true
|
||||
refreshTimer.value = setInterval(() => {
|
||||
if (document.visibilityState === 'visible') {
|
||||
@@ -122,6 +123,7 @@ onMounted(async () => {
|
||||
})
|
||||
|
||||
onUnmounted(() => {
|
||||
kanbanStore.stopEventStream()
|
||||
if (refreshTimer.value) clearInterval(refreshTimer.value)
|
||||
})
|
||||
|
||||
|
||||
@@ -41,6 +41,7 @@ function validSeverity(value?: string): value is 'warning' | 'error' | 'critical
|
||||
const MAX_LOG_TAIL_BYTES = 1_000_000
|
||||
const MAX_DISPATCH_TASKS = 100
|
||||
const MAX_DISPATCH_FAILURE_LIMIT = 100
|
||||
const MAX_BULK_TASKS = 100
|
||||
|
||||
type PositiveIntegerResult = { value?: number; error?: string }
|
||||
type StringResult = { value?: string; error?: string }
|
||||
@@ -85,6 +86,25 @@ function optionalString(value: unknown, name: string): StringResult {
|
||||
return { value }
|
||||
}
|
||||
|
||||
function optionalNullableString(value: unknown, name: string): { value?: string | null; error?: string } {
|
||||
if (value === undefined) return {}
|
||||
if (value === null) return { value: null }
|
||||
if (typeof value !== 'string') return { error: `${name} must be a string` }
|
||||
return { value }
|
||||
}
|
||||
|
||||
function hasOwn(body: Record<string, unknown>, key: string): boolean {
|
||||
return Object.prototype.hasOwnProperty.call(body, key)
|
||||
}
|
||||
|
||||
function optionalTaskStatus(value: unknown, name: string): { value?: kanbanCli.KanbanTaskStatus; error?: string } {
|
||||
if (value === undefined || value === null) return {}
|
||||
if (value !== 'triage' && value !== 'todo' && value !== 'ready' && value !== 'running' && value !== 'blocked' && value !== 'done' && value !== 'archived') {
|
||||
return { error: `${name} must be a valid kanban task status` }
|
||||
}
|
||||
return { value }
|
||||
}
|
||||
|
||||
function requiredNonEmptyString(value: unknown, name: string): StringResult {
|
||||
if (typeof value !== 'string' || !value.trim()) return { error: `${name} is required` }
|
||||
return { value }
|
||||
@@ -364,6 +384,80 @@ export async function addComment(ctx: Context) {
|
||||
}
|
||||
}
|
||||
|
||||
export async function linkTasks(ctx: Context) {
|
||||
const bodyResult = requestBody(ctx)
|
||||
if (rejectBadRequest(ctx, bodyResult.error)) return
|
||||
const parentId = requiredNonEmptyString(bodyResult.body.parent_id, 'parent_id')
|
||||
const childId = requiredNonEmptyString(bodyResult.body.child_id, 'child_id')
|
||||
if (rejectBadRequest(ctx, parentId.error || childId.error)) return
|
||||
const board = requestBoard(ctx)
|
||||
if (!board) return
|
||||
try {
|
||||
ctx.body = await kanbanCli.linkTasks(parentId.value!.trim(), childId.value!.trim(), { board })
|
||||
} catch (err: any) {
|
||||
ctx.status = 500
|
||||
ctx.body = { error: err.message }
|
||||
}
|
||||
}
|
||||
|
||||
export async function unlinkTasks(ctx: Context) {
|
||||
const parentId = requiredNonEmptyString(firstQueryValue(ctx.query.parent_id as string | string[] | undefined), 'parent_id')
|
||||
const childId = requiredNonEmptyString(firstQueryValue(ctx.query.child_id as string | string[] | undefined), 'child_id')
|
||||
if (rejectBadRequest(ctx, parentId.error || childId.error)) return
|
||||
const board = requestBoard(ctx)
|
||||
if (!board) return
|
||||
try {
|
||||
ctx.body = await kanbanCli.unlinkTasks(parentId.value!.trim(), childId.value!.trim(), { board })
|
||||
} catch (err: any) {
|
||||
ctx.status = 500
|
||||
ctx.body = { error: err.message }
|
||||
}
|
||||
}
|
||||
|
||||
export async function bulkUpdateTasks(ctx: Context) {
|
||||
const bodyResult = requestBody(ctx)
|
||||
if (rejectBadRequest(ctx, bodyResult.error)) return
|
||||
const body = bodyResult.body
|
||||
const ids = requiredNonEmptyStringArray(body.ids, 'ids')
|
||||
const status = optionalTaskStatus(body.status, 'status')
|
||||
const assignee = optionalNullableString(body.assignee, 'assignee')
|
||||
const archive = optionalBoolean(body.archive, 'archive')
|
||||
const summary = optionalString(body.summary, 'summary')
|
||||
const reason = optionalString(body.reason, 'reason')
|
||||
if (rejectBadRequest(ctx, ids.error || status.error || assignee.error || archive.error || summary.error || reason.error)) return
|
||||
if (!archive.value && status.value === undefined && !hasOwn(body, 'assignee')) {
|
||||
ctx.status = 400
|
||||
ctx.body = { error: 'at least one bulk action is required' }
|
||||
return
|
||||
}
|
||||
if (ids.value!.length > MAX_BULK_TASKS) {
|
||||
ctx.status = 400
|
||||
ctx.body = { error: `ids must contain <= ${MAX_BULK_TASKS} tasks` }
|
||||
return
|
||||
}
|
||||
if (archive.value && status.value !== undefined) {
|
||||
ctx.status = 400
|
||||
ctx.body = { error: 'archive cannot be combined with status' }
|
||||
return
|
||||
}
|
||||
const board = requestBoard(ctx)
|
||||
if (!board) return
|
||||
try {
|
||||
ctx.body = await kanbanCli.bulkUpdateTasks({
|
||||
board,
|
||||
ids: ids.value!.map(id => id.trim()),
|
||||
status: status.value,
|
||||
assignee: assignee.value,
|
||||
archive: archive.value,
|
||||
summary: summary.value,
|
||||
reason: reason.value,
|
||||
})
|
||||
} catch (err: any) {
|
||||
ctx.status = 500
|
||||
ctx.body = { error: err.message }
|
||||
}
|
||||
}
|
||||
|
||||
export async function taskLog(ctx: Context) {
|
||||
const board = requestBoard(ctx)
|
||||
if (!board) return
|
||||
|
||||
@@ -13,6 +13,7 @@ import { initLoginLimiter } from './services/login-limiter'
|
||||
import { initGatewayManager, getGatewayManagerInstance } from './services/gateway-bootstrap'
|
||||
import { bindShutdown } from './services/shutdown'
|
||||
import { setupTerminalWebSocket } from './routes/hermes/terminal'
|
||||
import { setupKanbanEventsWebSocket } from './routes/hermes/kanban-events'
|
||||
import { startVersionCheck } from './routes/health'
|
||||
import { registerRoutes } from './routes'
|
||||
import { setGroupChatServer } from './routes/hermes/group-chat'
|
||||
@@ -140,7 +141,8 @@ export async function bootstrap() {
|
||||
console.log('[bootstrap] app.listen called')
|
||||
|
||||
setupTerminalWebSocket(servers)
|
||||
console.log('[bootstrap] terminal websocket setup')
|
||||
setupKanbanEventsWebSocket(servers)
|
||||
console.log('[bootstrap] terminal + kanban websocket setup')
|
||||
|
||||
// Group chat Socket.IO (must be after server is created)
|
||||
const groupChatServer = new GroupChatServer(servers)
|
||||
@@ -163,7 +165,7 @@ export async function bootstrap() {
|
||||
servers.forEach((httpServer) => {
|
||||
httpServer.on('upgrade', (req: any, socket: any) => {
|
||||
const url = new URL(req.url || '', `http://${req.headers.host}`)
|
||||
if (url.pathname !== '/api/hermes/terminal' && !url.pathname.startsWith('/socket.io/')) {
|
||||
if (url.pathname !== '/api/hermes/terminal' && url.pathname !== '/api/hermes/kanban/events' && !url.pathname.startsWith('/socket.io/')) {
|
||||
socket.destroy()
|
||||
}
|
||||
})
|
||||
|
||||
@@ -0,0 +1,100 @@
|
||||
import { WebSocketServer } from 'ws'
|
||||
import type { WebSocket } from 'ws'
|
||||
import type { Server as HttpServer, IncomingMessage } from 'http'
|
||||
import { getToken } from '../../services/auth'
|
||||
import { logger } from '../../services/logger'
|
||||
import * as kanbanCli from '../../services/hermes/hermes-kanban'
|
||||
|
||||
interface KanbanEventsRequest extends IncomingMessage {
|
||||
kanbanBoard?: string
|
||||
}
|
||||
|
||||
function sendJson(ws: WebSocket, payload: Record<string, unknown>) {
|
||||
if (ws.readyState === ws.OPEN) ws.send(JSON.stringify(payload))
|
||||
}
|
||||
|
||||
function streamLines(onLine: (line: string) => void) {
|
||||
let buffer = ''
|
||||
return (chunk: Buffer | string) => {
|
||||
buffer += chunk.toString()
|
||||
const lines = buffer.split(/\r?\n/)
|
||||
buffer = lines.pop() || ''
|
||||
for (const line of lines) {
|
||||
const trimmed = line.trim()
|
||||
if (trimmed) onLine(trimmed)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function setupKanbanEventsWebSocket(httpServers: HttpServer | HttpServer[]) {
|
||||
const wss = new WebSocketServer({ noServer: true })
|
||||
const servers = Array.isArray(httpServers) ? httpServers : [httpServers]
|
||||
|
||||
servers.forEach((httpServer) => {
|
||||
httpServer.on('upgrade', async (req: KanbanEventsRequest, socket, head) => {
|
||||
const url = new URL(req.url || '', `http://${req.headers.host}`)
|
||||
if (url.pathname !== '/api/hermes/kanban/events') return
|
||||
|
||||
const authToken = await getToken()
|
||||
if (authToken) {
|
||||
const token = url.searchParams.get('token') || ''
|
||||
if (token !== authToken) {
|
||||
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n')
|
||||
socket.destroy()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
req.kanbanBoard = kanbanCli.normalizeBoardSlug(url.searchParams.get('board'))
|
||||
} catch {
|
||||
socket.write('HTTP/1.1 400 Bad Request\r\n\r\n')
|
||||
socket.destroy()
|
||||
return
|
||||
}
|
||||
|
||||
wss.handleUpgrade(req, socket, head, (ws) => {
|
||||
wss.emit('connection', ws, req)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
wss.on('connection', (ws, req: KanbanEventsRequest) => {
|
||||
const board = req.kanbanBoard || 'default'
|
||||
const child = kanbanCli.watchEvents({ board, interval: 0.5 })
|
||||
let closed = false
|
||||
|
||||
sendJson(ws, { type: 'connected', board })
|
||||
|
||||
const closeChild = () => {
|
||||
if (closed) return
|
||||
closed = true
|
||||
if (!child.killed) child.kill()
|
||||
}
|
||||
|
||||
child.stdout?.on('data', streamLines((line) => {
|
||||
if (line.toLowerCase().startsWith('watching kanban events')) return
|
||||
sendJson(ws, { type: 'event', board, line })
|
||||
}))
|
||||
|
||||
child.stderr?.on('data', streamLines((line) => {
|
||||
sendJson(ws, { type: 'error', board, message: line })
|
||||
}))
|
||||
|
||||
child.on('error', (err) => {
|
||||
logger.error(err, 'Hermes CLI: kanban watch failed')
|
||||
sendJson(ws, { type: 'error', board, message: err.message })
|
||||
if (ws.readyState === ws.OPEN) ws.close()
|
||||
})
|
||||
|
||||
child.on('exit', (code, signal) => {
|
||||
sendJson(ws, { type: 'stopped', board, code, signal })
|
||||
if (ws.readyState === ws.OPEN) ws.close()
|
||||
})
|
||||
|
||||
ws.on('close', closeChild)
|
||||
ws.on('error', closeChild)
|
||||
})
|
||||
|
||||
logger.info('WebSocket ready at /api/hermes/kanban/events (kanban watch bridge)')
|
||||
}
|
||||
@@ -13,6 +13,9 @@ kanbanRoutes.get('/api/hermes/kanban/diagnostics', ctrl.diagnostics)
|
||||
kanbanRoutes.post('/api/hermes/kanban/dispatch', ctrl.dispatch)
|
||||
kanbanRoutes.get('/api/hermes/kanban/artifact', ctrl.readArtifact)
|
||||
kanbanRoutes.get('/api/hermes/kanban/search-sessions', ctrl.searchSessions)
|
||||
kanbanRoutes.post('/api/hermes/kanban/links', ctrl.linkTasks)
|
||||
kanbanRoutes.delete('/api/hermes/kanban/links', ctrl.unlinkTasks)
|
||||
kanbanRoutes.post('/api/hermes/kanban/tasks/bulk', ctrl.bulkUpdateTasks)
|
||||
kanbanRoutes.get('/api/hermes/kanban', ctrl.list)
|
||||
kanbanRoutes.get('/api/hermes/kanban/:id', ctrl.get)
|
||||
kanbanRoutes.post('/api/hermes/kanban', ctrl.create)
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { execFile } from 'child_process'
|
||||
import { execFile, spawn } from 'child_process'
|
||||
import type { ChildProcess } from 'child_process'
|
||||
import { promisify } from 'util'
|
||||
import { logger } from '../logger'
|
||||
|
||||
@@ -155,6 +156,29 @@ export interface KanbanBoardOptions {
|
||||
board?: string
|
||||
}
|
||||
|
||||
export interface KanbanWatchOptions extends KanbanBoardOptions {
|
||||
interval?: number
|
||||
}
|
||||
|
||||
export interface KanbanBulkTaskUpdateOptions extends KanbanBoardOptions {
|
||||
ids: string[]
|
||||
status?: KanbanTaskStatus
|
||||
assignee?: string | null
|
||||
archive?: boolean
|
||||
summary?: string
|
||||
reason?: string
|
||||
}
|
||||
|
||||
export interface KanbanBulkTaskResult {
|
||||
id: string
|
||||
ok: boolean
|
||||
error?: string
|
||||
}
|
||||
|
||||
export interface KanbanBulkTaskUpdateResult {
|
||||
results: KanbanBulkTaskResult[]
|
||||
}
|
||||
|
||||
// ─── CLI wrappers ───────────────────────────────────────────────
|
||||
|
||||
export async function listBoards(opts?: { includeArchived?: boolean }): Promise<KanbanBoard[]> {
|
||||
@@ -235,9 +259,9 @@ export async function getCapabilities(): Promise<KanbanCapabilities> {
|
||||
{ key: 'reassign', status: 'supported', canonicalRoute: '/tasks/{task_id}/reassign', canonicalCommand: 'reassign', requiresBoard: true },
|
||||
{ key: 'specify', status: 'supported', canonicalRoute: '/tasks/{task_id}/specify', canonicalCommand: 'specify', requiresBoard: true },
|
||||
{ key: 'dispatch', status: 'supported', canonicalRoute: '/dispatch', canonicalCommand: 'dispatch', requiresBoard: true },
|
||||
{ key: 'links', status: 'missing', reason: 'Deferred from current WUI parity batch', canonicalRoute: '/links', canonicalCommand: 'link/unlink', requiresBoard: true },
|
||||
{ key: 'bulk', status: 'missing', reason: 'Deferred from current WUI parity batch', canonicalRoute: '/tasks/bulk', canonicalCommand: 'bulk-equivalent', requiresBoard: true },
|
||||
{ key: 'events', status: 'missing', reason: 'Streaming strategy not selected for WUI yet', canonicalRoute: '/events', canonicalCommand: 'watch', requiresBoard: true },
|
||||
{ key: 'links', status: 'supported', canonicalRoute: '/links', canonicalCommand: 'link/unlink', requiresBoard: true },
|
||||
{ key: 'bulk', status: 'partial', reason: 'WUI applies supported bulk-equivalent CLI transitions per id and returns per-task outcomes; direct priority/status patch parity remains deferred', canonicalRoute: '/tasks/bulk', canonicalCommand: 'bulk-equivalent via complete/block/unblock/archive/assign', requiresBoard: true },
|
||||
{ key: 'events', status: 'partial', reason: 'WUI exposes a board-scoped WebSocket bridge backed by the canonical `kanban watch` stream; payload is currently a refresh invalidation signal, not a typed event model', canonicalRoute: '/events', canonicalCommand: 'watch', requiresBoard: true },
|
||||
{ key: 'homeSubscriptions', status: 'missing', reason: 'Deferred from current WUI parity batch', canonicalRoute: '/home-channels and subscription routes', canonicalCommand: 'notify-*', requiresBoard: true },
|
||||
]
|
||||
const supports = Object.fromEntries(capabilities.map(capability => [capability.key, capability.status === 'supported'])) as Record<string, boolean>
|
||||
@@ -266,6 +290,58 @@ function pushOptional(args: string[], flag: string, value?: string | number | nu
|
||||
if (value !== undefined && value !== null && String(value).trim() !== '') args.push(flag, String(value))
|
||||
}
|
||||
|
||||
function textFromExecValue(value: unknown): string {
|
||||
if (Buffer.isBuffer(value)) return value.toString('utf8')
|
||||
return value === undefined || value === null ? '' : String(value)
|
||||
}
|
||||
|
||||
async function execKanbanMutation(args: string[], logMessage: string, errorPrefix: string): Promise<string> {
|
||||
try {
|
||||
const { stdout, stderr } = await execFileAsync(HERMES_BIN, args, {
|
||||
maxBuffer: 50 * 1024 * 1024,
|
||||
timeout: 30000,
|
||||
...execOpts,
|
||||
})
|
||||
const stderrText = textFromExecValue(stderr).trim()
|
||||
if (stderrText) throw new Error(stderrText)
|
||||
return textFromExecValue(stdout)
|
||||
} catch (err: any) {
|
||||
logger.error(err, logMessage)
|
||||
throw new Error(`${errorPrefix}: ${err.message}`)
|
||||
}
|
||||
}
|
||||
|
||||
export function buildWatchArgs(opts?: KanbanWatchOptions): string[] {
|
||||
const args = [...boardArgs(opts?.board), 'watch']
|
||||
pushOptional(args, '--interval', opts?.interval ?? 0.5)
|
||||
return args
|
||||
}
|
||||
|
||||
export function watchEvents(opts?: KanbanWatchOptions): ChildProcess {
|
||||
return spawn(HERMES_BIN, buildWatchArgs(opts), {
|
||||
stdio: ['ignore', 'pipe', 'pipe'],
|
||||
...execOpts,
|
||||
})
|
||||
}
|
||||
|
||||
export async function linkTasks(parentId: string, childId: string, opts?: KanbanBoardOptions): Promise<{ ok: boolean; output: string }> {
|
||||
const output = await execKanbanMutation(
|
||||
[...boardArgs(opts?.board), 'link', parentId, childId],
|
||||
'Hermes CLI: kanban link failed',
|
||||
'Failed to link kanban tasks',
|
||||
)
|
||||
return { ok: true, output }
|
||||
}
|
||||
|
||||
export async function unlinkTasks(parentId: string, childId: string, opts?: KanbanBoardOptions): Promise<{ ok: boolean; output: string }> {
|
||||
const output = await execKanbanMutation(
|
||||
[...boardArgs(opts?.board), 'unlink', parentId, childId],
|
||||
'Hermes CLI: kanban unlink failed',
|
||||
'Failed to unlink kanban tasks',
|
||||
)
|
||||
return { ok: true, output }
|
||||
}
|
||||
|
||||
export async function addComment(taskId: string, body: string, opts?: KanbanBoardOptions & { author?: string }): Promise<{ ok: boolean; output: string }> {
|
||||
const args = [...boardArgs(opts?.board), 'comment', taskId, body]
|
||||
pushOptional(args, '--author', opts?.author)
|
||||
@@ -476,57 +552,74 @@ export async function completeTasks(taskIds: string[], summary?: string, opts?:
|
||||
const args = [...boardArgs(opts?.board), 'complete', ...taskIds]
|
||||
if (summary) args.push('--summary', summary)
|
||||
|
||||
try {
|
||||
await execFileAsync(HERMES_BIN, args, {
|
||||
maxBuffer: 50 * 1024 * 1024,
|
||||
timeout: 30000,
|
||||
...execOpts,
|
||||
})
|
||||
} catch (err: any) {
|
||||
logger.error(err, 'Hermes CLI: kanban complete failed')
|
||||
throw new Error(`Failed to complete kanban tasks: ${err.message}`)
|
||||
}
|
||||
await execKanbanMutation(args, 'Hermes CLI: kanban complete failed', 'Failed to complete kanban tasks')
|
||||
}
|
||||
|
||||
export async function blockTask(taskId: string, reason: string, opts?: KanbanBoardOptions): Promise<void> {
|
||||
try {
|
||||
await execFileAsync(HERMES_BIN, [...boardArgs(opts?.board), 'block', taskId, reason], {
|
||||
maxBuffer: 50 * 1024 * 1024,
|
||||
timeout: 30000,
|
||||
...execOpts,
|
||||
})
|
||||
} catch (err: any) {
|
||||
logger.error(err, 'Hermes CLI: kanban block failed')
|
||||
throw new Error(`Failed to block kanban task: ${err.message}`)
|
||||
}
|
||||
await execKanbanMutation(
|
||||
[...boardArgs(opts?.board), 'block', taskId, reason],
|
||||
'Hermes CLI: kanban block failed',
|
||||
'Failed to block kanban task',
|
||||
)
|
||||
}
|
||||
|
||||
export async function unblockTasks(taskIds: string[], opts?: KanbanBoardOptions): Promise<void> {
|
||||
try {
|
||||
await execFileAsync(HERMES_BIN, [...boardArgs(opts?.board), 'unblock', ...taskIds], {
|
||||
maxBuffer: 50 * 1024 * 1024,
|
||||
timeout: 30000,
|
||||
...execOpts,
|
||||
})
|
||||
} catch (err: any) {
|
||||
logger.error(err, 'Hermes CLI: kanban unblock failed')
|
||||
throw new Error(`Failed to unblock kanban tasks: ${err.message}`)
|
||||
}
|
||||
await execKanbanMutation(
|
||||
[...boardArgs(opts?.board), 'unblock', ...taskIds],
|
||||
'Hermes CLI: kanban unblock failed',
|
||||
'Failed to unblock kanban tasks',
|
||||
)
|
||||
}
|
||||
|
||||
export async function assignTask(taskId: string, profile: string, opts?: KanbanBoardOptions): Promise<void> {
|
||||
try {
|
||||
await execFileAsync(HERMES_BIN, [...boardArgs(opts?.board), 'assign', taskId, profile], {
|
||||
maxBuffer: 50 * 1024 * 1024,
|
||||
timeout: 30000,
|
||||
...execOpts,
|
||||
})
|
||||
} catch (err: any) {
|
||||
logger.error(err, 'Hermes CLI: kanban assign failed')
|
||||
throw new Error(`Failed to assign kanban task: ${err.message}`)
|
||||
await execKanbanMutation(
|
||||
[...boardArgs(opts?.board), 'assign', taskId, profile],
|
||||
'Hermes CLI: kanban assign failed',
|
||||
'Failed to assign kanban task',
|
||||
)
|
||||
}
|
||||
|
||||
export async function archiveTasks(taskIds: string[], opts?: KanbanBoardOptions): Promise<void> {
|
||||
await execKanbanMutation(
|
||||
[...boardArgs(opts?.board), 'archive', ...taskIds],
|
||||
'Hermes CLI: kanban archive failed',
|
||||
'Failed to archive kanban tasks',
|
||||
)
|
||||
}
|
||||
|
||||
async function applyBulkStatus(taskId: string, opts: KanbanBulkTaskUpdateOptions): Promise<void> {
|
||||
switch (opts.status) {
|
||||
case undefined:
|
||||
return
|
||||
case 'done':
|
||||
return completeTasks([taskId], opts.summary, opts)
|
||||
case 'blocked':
|
||||
return blockTask(taskId, opts.reason?.trim() || 'Bulk update', opts)
|
||||
case 'ready':
|
||||
return unblockTasks([taskId], opts)
|
||||
case 'archived':
|
||||
return archiveTasks([taskId], opts)
|
||||
default:
|
||||
throw new Error(`Bulk status ${opts.status} is not supported by the CLI bridge`)
|
||||
}
|
||||
}
|
||||
|
||||
export async function bulkUpdateTasks(opts: KanbanBulkTaskUpdateOptions): Promise<KanbanBulkTaskUpdateResult> {
|
||||
const ids = opts.ids.map(id => id.trim()).filter(Boolean)
|
||||
const results: KanbanBulkTaskResult[] = []
|
||||
for (const id of ids) {
|
||||
try {
|
||||
if (opts.archive) await archiveTasks([id], opts)
|
||||
else await applyBulkStatus(id, opts)
|
||||
if (opts.assignee !== undefined) await assignTask(id, opts.assignee?.trim() || 'none', opts)
|
||||
results.push({ id, ok: true })
|
||||
} catch (err: any) {
|
||||
results.push({ id, ok: false, error: err?.message || String(err) })
|
||||
}
|
||||
}
|
||||
return { results }
|
||||
}
|
||||
|
||||
export async function getStats(opts?: KanbanBoardOptions): Promise<KanbanStats> {
|
||||
try {
|
||||
const { stdout } = await execFileAsync(HERMES_BIN, [...boardArgs(opts?.board), 'stats', '--json'], {
|
||||
|
||||
Reference in New Issue
Block a user