Fix bridge history, profile models, and Windows gateway handling (#845)

* feat: support profile-aware group chat bridge flows

* feat: route cron jobs through hermes cli

* Fix group chat routing and isolate bridge tests

* Add Grok image-to-video media skill

* Default Grok videos to media directory

* Fix bridge profile fallback and cron repeat clearing

* Refine bridge chat and gateway platform handling

* Filter bridge tool-call text deltas

* Preserve structured bridge chat history

* Prepare beta release build artifacts

* Fix Windows run profile resolution

* Fix Windows path compatibility checks

* Fix profile-scoped model page display

* Hide Windows subprocess windows for jobs and updates

* Hide Windows file backend subprocess windows

* Avoid Windows gateway restart lock conflicts

* Treat Windows gateway lock as running on startup

* Force release Windows gateway lock on restart

* Tighten Windows gateway lock cleanup

* Update chat e2e source expectation

* Bump package version to 0.5.30

---------

Co-authored-by: Codex <codex@openai.com>
This commit is contained in:
ekko
2026-05-19 16:09:59 +08:00
committed by GitHub
parent 3d74d78698
commit 9a9416c99c
129 changed files with 7017 additions and 1838 deletions
@@ -1,8 +1,10 @@
import { io, Socket } from 'socket.io-client'
import { getToken } from '../../../services/auth'
import type { GatewayManager } from '../gateway-manager'
import { logger } from '../../../services/logger'
import { updateUsage } from '../../../db/hermes/usage-store'
import { AgentBridgeClient, type AgentBridgeMessage, type AgentBridgeOutput } from '../agent-bridge'
import { convertContentBlocksForAgent, isContentBlockArray } from '../run-chat/content-blocks'
import type { ContentBlock } from '../run-chat/types'
// ─── Types ────────────────────────────────────────────────────
@@ -22,6 +24,15 @@ interface MessageData {
timestamp: number
}
type MentionMessage = {
content: string
senderName: string
senderId: string
timestamp: number
input?: string | ContentBlock[]
mentionDepth?: number
}
interface MemberData {
id: string
name: string
@@ -55,9 +66,10 @@ class AgentClient {
private joinedRooms = new Set<string>()
private handlers: AgentEventHandler
private _reconnecting = false
private gatewayManager: GatewayManager | null = null
private contextEngine: any = null
private storage: any = null
private pendingToolCallIds = new Map<string, string[]>()
private pendingToolBaseIds = new Map<string, string>()
constructor(config: AgentConfig, handlers: AgentEventHandler = {}) {
this.agentId = Date.now().toString(36) + Math.random().toString(36).slice(2, 8)
@@ -75,10 +87,6 @@ class AgentClient {
return this.socket?.id
}
setGatewayManager(manager: GatewayManager): void {
this.gatewayManager = manager
}
setContextEngine(engine: any): void {
this.contextEngine = engine
}
@@ -146,10 +154,10 @@ class AgentClient {
})
}
sendMessage(roomId: string, content: string): Promise<string> {
sendMessage(roomId: string, content: string, messageId?: string, extra?: Record<string, unknown>): Promise<string> {
this.ensureConnected()
return new Promise((resolve, reject) => {
this.socket!.emit('message', { roomId, content }, (res: { id?: string; error?: string }) => {
this.socket!.emit('message', { roomId, content, id: messageId, ...extra }, (res: { id?: string; error?: string }) => {
if (res.error) {
reject(new Error(res.error))
} else {
@@ -174,6 +182,52 @@ class AgentClient {
this.socket!.emit('context_status', { roomId, agentName: this.name, status })
}
emitApprovalRequested(roomId: string, payload: Record<string, unknown>): void {
this.ensureConnected()
this.socket!.emit('approval.requested', { roomId, agentName: this.name, ...payload })
}
emitApprovalResolved(roomId: string, payload: Record<string, unknown>): void {
this.ensureConnected()
this.socket!.emit('approval.resolved', { roomId, agentName: this.name, ...payload })
}
async interrupt(roomId: string): Promise<void> {
const sessionSeed = String(this.storage?.getRoom?.(roomId)?.sessionSeed || '0')
const sessionId = groupBridgeSessionId(roomId, this.profile, this.name, sessionSeed)
await new AgentBridgeClient().interrupt(sessionId, 'Interrupted by group chat user', this.profile)
this.stopTyping(roomId)
this.emitContextStatus(roomId, 'ready')
}
emitMessageStreamStart(roomId: string, messageId: string): void {
this.ensureConnected()
this.socket!.emit('message_stream_start', {
roomId,
id: messageId,
senderId: this.socket?.id || this.agentId,
senderName: this.name,
timestamp: Date.now(),
})
}
emitMessageStreamDelta(roomId: string, messageId: string, delta: string): void {
if (!delta) return
this.ensureConnected()
this.socket!.emit('message_stream_delta', { roomId, id: messageId, delta })
}
emitMessageReasoningDelta(roomId: string, messageId: string, delta: string): void {
if (!delta) return
this.ensureConnected()
this.socket!.emit('message_reasoning_delta', { roomId, id: messageId, delta })
}
emitMessageStreamEnd(roomId: string, messageId: string): void {
this.ensureConnected()
this.socket!.emit('message_stream_end', { roomId, id: messageId })
}
getJoinedRooms(): string[] {
return Array.from(this.joinedRooms)
}
@@ -193,23 +247,10 @@ class AgentClient {
*/
async replyToMention(
roomId: string,
msg: { content: string; senderName: string; senderId: string; timestamp: number },
msg: MentionMessage,
onStatus?: (status: 'compressing' | 'replying' | 'ready') => void,
): Promise<void> {
logger.debug(`[AgentClients] ${this.name} mentioned by ${msg.senderName}: "${msg.content.slice(0, 50)}"`)
if (!this.gatewayManager) {
logger.debug(`[AgentClients] ${this.name}: gatewayManager is null, skipping`)
return
}
const upstream = this.gatewayManager.getUpstream(this.profile)
const apiKey = this.gatewayManager.getApiKey(this.profile)
logger.debug(`[AgentClients] ${this.name}: upstream=${upstream}, profile=${this.profile}`)
if (!upstream) {
logger.error(`[AgentClients] ${this.name}: no gateway upstream for profile "${this.profile}"`)
return
}
try {
// Notify room that agent is typing
this.startTyping(roomId)
@@ -244,8 +285,8 @@ class AgentClient {
roomName: roomId,
memberNames,
members,
upstream,
apiKey,
upstream: '',
apiKey: null,
currentMessage: msg,
compression,
profile: this.profile,
@@ -261,86 +302,101 @@ class AgentClient {
}
}
// Strip @mention from input — agent already knows it was mentioned
const input = msg.content.replace(new RegExp(`@${this.name.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')}\\s*`, 'gi'), '').trim() || msg.content
const responseRes = await fetch(`${upstream.replace(/\/$/, '')}/v1/responses`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...(apiKey ? { Authorization: `Bearer ${apiKey}` } : {}),
// Keep the original mentions visible and add an explicit routing note.
// When a user mentions multiple agents, stripping only this agent's
// name can make the remaining input look like it was meant for
// someone else.
const routedPrefix = `群聊系统:这条消息已经提及你(${this.name}),请直接回复;即使消息同时提及其他成员,也不要因此输出空回复。`
const ownMentionPattern = new RegExp(`@${this.name.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')}\\s*`, 'gi')
const rawInput = msg.input || msg.content
const input = isContentBlockArray(rawInput)
? rawInput.map((block) => {
if (block.type !== 'text') return block
const text = String(block.text || msg.content).replace(ownMentionPattern, '').trim()
return { ...block, text: `${routedPrefix}\n\n原始消息:${text || msg.content}` }
})
: `${routedPrefix}\n\n原始消息:${msg.content.replace(ownMentionPattern, '').trim() || msg.content}`
const bridgeInput: AgentBridgeMessage = isContentBlockArray(input)
? await convertContentBlocksForAgent(input)
: input
const bridge = new AgentBridgeClient()
const sessionSeed = String(this.storage?.getRoom?.(roomId)?.sessionSeed || '0')
const sessionId = groupBridgeSessionId(roomId, this.profile, this.name, sessionSeed)
const runMessageId = groupMessageId(roomId, this.profile, this.name)
let partIndex = 0
let streamMessageId = groupMessagePartId(runMessageId, partIndex)
let currentContent = ''
let totalContent = ''
let reasoningContent = ''
const flushedAssistantParts = new Set<string>()
let lastChunk: AgentBridgeOutput | null = null
const started = await bridge.chat(
sessionId,
bridgeInput,
conversationHistory,
instructions,
this.profile,
{
source: 'api_server',
},
body: JSON.stringify({
input,
...(conversationHistory.length > 0 ? { conversation_history: conversationHistory } : {}),
...(instructions ? { instructions } : {}),
stream: true,
store: false,
}),
signal: AbortSignal.timeout(120000),
})
)
if (!responseRes.ok) {
const text = await responseRes.text().catch(() => '')
logger.error(`[AgentClients] ${this.name}: gateway response failed (${responseRes.status}): ${text}`)
this.stopTyping(roomId)
onStatus?.('ready')
return
}
if (!responseRes.body) {
logger.error(`[AgentClients] ${this.name}: gateway response stream missing`)
this.stopTyping(roomId)
onStatus?.('ready')
return
}
let fullContent = ''
for await (const frame of readSseFrames(responseRes.body)) {
let parsed: any
try {
parsed = JSON.parse(frame.data)
} catch {
continue
}
const eventType = parsed.type || frame.event || parsed.event
logger.debug(`[AgentClients] ${this.name}: event=${eventType}`)
if (eventType === 'response.output_text.delta' && parsed.delta) {
fullContent += parsed.delta
continue
}
if (eventType === 'response.completed') {
const response = parsed.response || parsed
const finalText = extractResponseText(response)
if (!fullContent && finalText) fullContent = finalText
const usage = response.usage || {}
updateUsage(roomId, {
inputTokens: usage.input_tokens ?? usage.inputTokens ?? 0,
outputTokens: usage.output_tokens ?? usage.outputTokens ?? 0,
cacheReadTokens: usage.cache_read_tokens ?? usage.cacheReadTokens ?? 0,
cacheWriteTokens: usage.cache_write_tokens ?? usage.cacheWriteTokens ?? 0,
reasoningTokens: usage.reasoning_tokens ?? usage.reasoningTokens ?? 0,
model: response.model || '',
profile: this.profile,
})
logger.debug(`[AgentClients] ${this.name}: response completed, content length=${fullContent.length}`)
if (fullContent) {
this.stopTyping(roomId)
this.sendMessage(roomId, fullContent)
this.emitMessageStreamStart(roomId, streamMessageId)
for await (const chunk of bridge.streamOutput(started.run_id, { timeoutMs: 120000 })) {
lastChunk = chunk
reasoningContent += await this.recordBridgeEvents(roomId, chunk, () => streamMessageId, async () => {
const toolBaseId = streamMessageId
if (currentContent.trim()) {
await this.sendMessage(roomId, currentContent, streamMessageId, {
role: 'assistant',
mentionDepth: nextMentionDepth(msg),
reasoning: reasoningContent || null,
reasoning_content: reasoningContent || null,
})
flushedAssistantParts.add(streamMessageId)
currentContent = ''
}
onStatus?.('ready')
return
}
if (eventType === 'response.failed') {
logger.error(`[AgentClients] ${this.name}: response failed`)
this.stopTyping(roomId)
onStatus?.('ready')
return
this.emitMessageStreamEnd(roomId, toolBaseId)
partIndex += 1
streamMessageId = groupMessagePartId(runMessageId, partIndex)
this.emitMessageStreamStart(roomId, streamMessageId)
return toolBaseId
})
if (chunk.delta) {
currentContent += chunk.delta
totalContent += chunk.delta
this.emitMessageStreamDelta(roomId, streamMessageId, chunk.delta)
}
}
logger.warn(`[AgentClients] ${this.name}: response stream ended without terminal event`)
if (lastChunk?.status === 'error') {
logger.error(`[AgentClients] ${this.name}: bridge response failed: ${lastChunk.error || 'unknown error'}`)
this.emitMessageStreamEnd(roomId, streamMessageId)
this.stopTyping(roomId)
onStatus?.('ready')
return
}
if (!totalContent) {
currentContent = extractBridgeFinalText(lastChunk)
totalContent = currentContent
}
recordBridgeUsage(roomId, this.profile, lastChunk?.result)
logger.debug(`[AgentClients] ${this.name}: bridge response completed, content length=${totalContent.length}`)
if (currentContent) {
this.stopTyping(roomId)
await this.sendMessage(roomId, currentContent, streamMessageId, {
role: 'assistant',
mentionDepth: nextMentionDepth(msg),
reasoning: reasoningContent || null,
reasoning_content: reasoningContent || null,
})
this.emitMessageStreamEnd(roomId, streamMessageId)
onStatus?.('ready')
return
}
logger.warn(`[AgentClients] ${this.name}: bridge response completed without content`)
this.emitMessageStreamEnd(roomId, streamMessageId)
this.stopTyping(roomId)
onStatus?.('ready')
} catch (err: any) {
@@ -350,6 +406,132 @@ class AgentClient {
}
}
private async recordBridgeEvents(
roomId: string,
chunk: AgentBridgeOutput,
getCurrentMessageId: () => string,
beforeToolStarted: () => Promise<string>,
): Promise<string> {
let reasoning = ''
for (const ev of chunk.events || []) {
const eventType = String((ev as any)?.event || '')
if (eventType === 'tool.started') {
const toolBaseId = await beforeToolStarted()
this.recordToolStarted(roomId, ev as Record<string, unknown>, toolBaseId)
} else if (eventType === 'tool.completed') {
this.recordToolCompleted(roomId, ev as Record<string, unknown>)
} else if (eventType === 'approval.requested') {
this.emitApprovalRequested(roomId, {
event: 'approval.requested',
approval_id: (ev as any).approval_id,
command: (ev as any).command,
description: (ev as any).description,
choices: Array.isArray((ev as any).choices) ? (ev as any).choices : undefined,
allow_permanent: (ev as any).allow_permanent,
})
} else if (eventType === 'approval.resolved') {
this.emitApprovalResolved(roomId, {
event: 'approval.resolved',
approval_id: (ev as any).approval_id,
choice: (ev as any).choice,
})
} else if (eventType === 'reasoning.delta' || eventType === 'thinking.delta') {
const text = String((ev as any)?.text || '')
reasoning += text
this.emitMessageReasoningDelta(roomId, getCurrentMessageId(), text)
}
}
return reasoning
}
private recordToolStarted(roomId: string, ev: Record<string, unknown>, runMessageId: string): void {
const toolName = String(ev.tool_name || ev.tool || ev.name || '')
const toolCallId = groupToolCallId(ev.tool_call_id, toolName, this.nextToolIndex(roomId, toolName))
this.trackPendingToolCall(roomId, toolName, toolCallId)
this.pendingToolBaseIds.set(toolCallId, runMessageId)
const timestamp = Date.now()
const rawArgs = ev.args ?? ev.arguments ?? ev.input ?? {}
const args = normalizeToolArgs(rawArgs)
const toolCall = {
id: toolCallId,
type: 'function',
function: {
name: toolName,
arguments: JSON.stringify(args),
},
}
const msg: MessageData & Record<string, any> = {
id: `${runMessageId}_toolcall_${safeId(toolCallId)}`,
roomId,
senderId: this.socket?.id || this.agentId,
senderName: this.name,
content: '',
timestamp,
role: 'assistant',
tool_calls: [toolCall],
finish_reason: 'tool_calls',
}
this.sendMessage(roomId, '', msg.id, {
role: 'assistant',
tool_calls: msg.tool_calls,
finish_reason: 'tool_calls',
timestamp,
}).catch((err: any) => logger.warn(`[AgentClients] failed to record tool call: ${err.message}`))
}
private recordToolCompleted(roomId: string, ev: Record<string, unknown>): void {
const toolName = String(ev.tool_name || ev.tool || ev.name || '')
const rawId = String(ev.tool_call_id || '').trim()
const toolCallId = rawId || this.takePendingToolCall(roomId, toolName) || groupToolCallId(null, toolName, this.nextToolIndex(roomId, toolName))
const runMessageId = this.pendingToolBaseIds.get(toolCallId) || groupMessagePartId(groupMessageId(roomId, this.profile, this.name), 0)
this.pendingToolBaseIds.delete(toolCallId)
const output = bridgeToolOutput(ev)
const timestamp = Date.now()
const msg: MessageData & Record<string, any> = {
id: `${runMessageId}_toolresult_${safeId(toolCallId)}_${Date.now()}`,
roomId,
senderId: this.socket?.id || this.agentId,
senderName: this.name,
content: output,
timestamp,
role: 'tool',
tool_call_id: toolCallId,
tool_name: toolName || null,
}
this.sendMessage(roomId, output, msg.id, {
role: 'tool',
tool_call_id: toolCallId,
tool_name: toolName || null,
timestamp,
}).catch((err: any) => logger.warn(`[AgentClients] failed to record tool result: ${err.message}`))
}
private pendingToolKey(roomId: string, toolName: string): string {
return `${roomId}::${toolName || 'tool'}`
}
private trackPendingToolCall(roomId: string, toolName: string, toolCallId: string): void {
const key = this.pendingToolKey(roomId, toolName)
const list = this.pendingToolCallIds.get(key) || []
list.push(toolCallId)
this.pendingToolCallIds.set(key, list)
}
private takePendingToolCall(roomId: string, toolName: string): string | undefined {
const key = this.pendingToolKey(roomId, toolName)
const list = this.pendingToolCallIds.get(key)
if (!list?.length) return undefined
const id = list.shift()
if (list.length) this.pendingToolCallIds.set(key, list)
else this.pendingToolCallIds.delete(key)
return id
}
private nextToolIndex(roomId: string, toolName: string): number {
const key = this.pendingToolKey(roomId, toolName)
return (this.pendingToolCallIds.get(key)?.length || 0) + 1
}
private bindEvents(): void {
const s = this.socket!
@@ -387,77 +569,79 @@ class AgentClient {
}
}
async function* readSseFrames(stream: ReadableStream<Uint8Array>): AsyncGenerator<{ event?: string; data: string }> {
const decoder = new TextDecoder()
const reader = stream.getReader()
let buffer = ''
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
let boundary = buffer.indexOf('\n\n')
while (boundary >= 0) {
const raw = buffer.slice(0, boundary)
buffer = buffer.slice(boundary + 2)
const frame = parseSseFrame(raw)
if (frame?.data) yield frame
boundary = buffer.indexOf('\n\n')
}
}
buffer += decoder.decode()
const frame = parseSseFrame(buffer)
if (frame?.data) yield frame
} finally {
reader.releaseLock()
}
function groupBridgeSessionId(roomId: string, profile: string, name: string, sessionSeed: string): string {
const raw = `gc_${roomId}_${profile}_${name}_${sessionSeed || '0'}`
return raw.replace(/[^a-zA-Z0-9_-]/g, '_').slice(0, 120)
}
function parseSseFrame(raw: string): { event?: string; data: string } | null {
let event: string | undefined
const data: string[] = []
for (const line of raw.split(/\r?\n/)) {
if (!line || line.startsWith(':')) continue
if (line.startsWith('event:')) {
event = line.slice(6).trim()
} else if (line.startsWith('data:')) {
data.push(line.slice(5).trimStart())
}
}
if (data.length === 0) return null
return { event, data: data.join('\n') }
function groupMessageId(roomId: string, profile: string, name: string): string {
const raw = `gcmsg_${safeId(roomId)}_${safeId(profile)}_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`
return raw.replace(/[^a-zA-Z0-9_-]/g, '_').slice(0, 160)
}
function extractResponseText(response: any): string {
const output = Array.isArray(response?.output) ? response.output : []
const parts: string[] = []
for (const item of output) {
if (item.type !== 'message') continue
const content = Array.isArray(item.content) ? item.content : []
for (const part of content) {
if (part.type === 'output_text' || part.type === 'text') {
parts.push(part.text || '')
}
function groupMessagePartId(runMessageId: string, partIndex: number): string {
return `${safeId(runMessageId)}_part_${partIndex}`
}
function groupToolCallId(rawToolCallId: unknown, toolName: string, index: number): string {
const raw = String(rawToolCallId || '').trim()
if (raw) return raw
return `cli_${safeId(toolName || 'tool')}_${Date.now()}_${index}`
}
function safeId(value: string): string {
return String(value || 'item').replace(/[^a-zA-Z0-9_-]/g, '_').slice(0, 80)
}
function bridgeToolOutput(ev: Record<string, unknown>): string {
const value = ev.result ?? ev.output ?? ev.result_preview ?? ev.preview ?? ''
return typeof value === 'string' ? value : JSON.stringify(value ?? '')
}
function normalizeToolArgs(value: unknown): Record<string, unknown> {
if (!value) return {}
if (typeof value === 'string') {
try {
const parsed = JSON.parse(value)
return parsed && typeof parsed === 'object' && !Array.isArray(parsed) ? parsed as Record<string, unknown> : { value }
} catch {
return { value }
}
}
if (parts.length > 0) return parts.join('')
return typeof response?.output_text === 'string' ? response.output_text : ''
return typeof value === 'object' && !Array.isArray(value) ? value as Record<string, unknown> : { value }
}
function extractBridgeFinalText(chunk: AgentBridgeOutput | null): string {
const result = chunk?.result as any
const output = result?.final_response || chunk?.output || ''
return typeof output === 'string' ? output.trim() : ''
}
function recordBridgeUsage(roomId: string, profile: string, result: unknown): void {
const payload = result as any
const usage = payload?.usage || payload?.response?.usage
if (!usage) return
updateUsage(roomId, {
inputTokens: usage.input_tokens ?? usage.inputTokens ?? 0,
outputTokens: usage.output_tokens ?? usage.outputTokens ?? 0,
cacheReadTokens: usage.cache_read_tokens ?? usage.cacheReadTokens ?? 0,
cacheWriteTokens: usage.cache_write_tokens ?? usage.cacheWriteTokens ?? 0,
reasoningTokens: usage.reasoning_tokens ?? usage.reasoningTokens ?? 0,
model: payload?.model || payload?.response?.model || '',
profile,
})
}
// ─── AgentClients (roomId -> agents) ──────────────────────────
export class AgentClients {
private rooms = new Map<string, Map<string, AgentClient>>()
private _gatewayManager: GatewayManager | null = null
private _contextEngine: any = null
private _storage: any = null
// Per-room processing lock + mention queue
private _processingRooms = new Set<string>()
private _mentionQueue = new Map<string, Array<{ agent: AgentClient; msg: { content: string; senderName: string; senderId: string; timestamp: number } }>>()
private _mentionQueue = new Map<string, Array<{ agent: AgentClient; msg: MentionMessage }>>()
/**
* Create an agent client and connect it to the server.
@@ -468,7 +652,6 @@ export class AgentClients {
await client.connect(port)
// Auto-apply stored references (fixes propagation for agents created after set*)
if (this._gatewayManager) client.setGatewayManager(this._gatewayManager)
if (this._contextEngine) client.setContextEngine(this._contextEngine)
if (this._storage) client.setStorage(this._storage)
@@ -557,6 +740,13 @@ export class AgentClients {
return Promise.all(agents.map((agent) => agent.sendMessage(roomId, content)))
}
async interruptAgent(roomId: string, agentName: string): Promise<void> {
const agent = this.getAgents(roomId).find(a => a.name === agentName)
if (!agent) throw new Error(`Agent "${agentName}" not found in room "${roomId}"`)
this._mentionQueue.delete(`${roomId}:${agent.name}`)
await agent.interrupt(roomId)
}
/**
* Disconnect all agents in a room.
*/
@@ -576,7 +766,12 @@ export class AgentClients {
resetRoomContext(roomId: string): void {
this._mentionQueue.delete(roomId)
this._processingRooms.delete(roomId)
for (const key of Array.from(this._mentionQueue.keys())) {
if (key.startsWith(`${roomId}:`)) this._mentionQueue.delete(key)
}
for (const key of Array.from(this._processingRooms)) {
if (key.startsWith(`${roomId}:`)) this._processingRooms.delete(key)
}
if (this._contextEngine) {
try { this._contextEngine.invalidateRoom(roomId) } catch { /* ignore */ }
}
@@ -593,16 +788,6 @@ export class AgentClients {
logger.info('[AgentClients] All agents disconnected')
}
/**
* Set gateway manager for all existing and future agents.
*/
setGatewayManager(manager: GatewayManager): void {
this._gatewayManager = manager
this.rooms.forEach((room) => {
room.forEach((client) => client.setGatewayManager(manager))
})
}
/**
* Set context engine for all existing and future agents.
*/
@@ -628,13 +813,14 @@ export class AgentClients {
* Server-side: parse @mentions and forward to matching agents directly.
* If the room is already processing (compressing/replying), queue the mention.
*/
async processMentions(roomId: string, msg: { content: string; senderName: string; senderId: string; timestamp: number }): Promise<void> {
if (!this._gatewayManager) return
const content = msg.content.toLowerCase()
async processMentions(roomId: string, msg: MentionMessage): Promise<void> {
const agents = this.getAgents(roomId)
const senderName = msg.senderName.toLowerCase()
const mentioned = agents.filter(a => content.includes(`@${a.name.toLowerCase()}`))
const mentioned = agents.filter(a => (
a.name.toLowerCase() !== senderName &&
isAgentMentioned(msg.content, a.name)
))
if (mentioned.length === 0) return
logger.debug(`[AgentClients] ${mentioned.map(a => a.name).join(', ')} mentioned by ${msg.senderName}`)
@@ -652,7 +838,7 @@ export class AgentClients {
private async _processAgentMention(
roomId: string,
agent: AgentClient,
msg: { content: string; senderName: string; senderId: string; timestamp: number },
msg: MentionMessage,
): Promise<void> {
const agentKey = `${roomId}:${agent.name}`
if (this._processingRooms.has(agentKey)) {
@@ -693,9 +879,16 @@ export class AgentClients {
// Process the last queued mention only (most recent, discards stale intermediate ones)
const last = queue[queue.length - 1]
this._processingRooms.add(agentKey)
this._processAgentMention(roomId, last.agent, last.msg).catch((err) => {
logger.error(`[AgentClients] error processing queued mention: ${err.message}`)
})
await this._processAgentMention(roomId, last.agent, last.msg)
}
}
function nextMentionDepth(msg: MentionMessage): number {
return Math.max(0, msg.mentionDepth || 0) + 1
}
function isAgentMentioned(content: string, agentName: string): boolean {
const escaped = agentName.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
const pattern = new RegExp(`@${escaped}(?=$|\\s|[.,!?;:,。!?;:])`, 'i')
return pattern.test(content)
}
@@ -6,6 +6,8 @@ import { getDb } from '../../../db'
import { AgentClients } from './agent-clients'
import { ContextEngine } from '../context-engine/compressor'
import { SessionDeleter } from '../session-deleter'
import { countTokens, SUMMARY_PREFIX } from '../../../lib/context-compressor'
import { AgentBridgeClient } from '../agent-bridge'
// ─── Types ────────────────────────────────────────────────────
@@ -16,6 +18,43 @@ interface ChatMessage {
senderName: string
content: string
timestamp: number
role?: string
tool_call_id?: string | null
tool_calls?: any[] | null
tool_name?: string | null
finish_reason?: string | null
reasoning?: string | null
reasoning_details?: string | null
reasoning_content?: string | null
mentionDepth?: number
}
function contentToStorageString(content: unknown): string {
if (typeof content === 'string') return content
return JSON.stringify(content ?? '')
}
function contentToText(content: unknown): string {
if (typeof content === 'string') {
const trimmed = content.trim()
if (trimmed.startsWith('[') && trimmed.endsWith(']')) {
try {
return contentToText(JSON.parse(trimmed))
} catch {
return content
}
}
return content
}
if (Array.isArray(content)) {
return content.map((block: any) => {
if (block?.type === 'text') return block.text || ''
if (block?.type === 'image') return `[Image: ${block.name || block.path || ''}]`
if (block?.type === 'file') return `[File: ${block.name || block.path || ''}]`
return ''
}).filter(Boolean).join('\n')
}
return content == null ? '' : String(content)
}
interface RoomAgent {
@@ -64,6 +103,64 @@ export interface PendingSessionDeleteDrainResult {
failed: Array<{ sessionId: string; error: string }>
}
function parseJsonArray(value: unknown): any[] | null {
if (value == null || value === '') return null
if (Array.isArray(value)) return value
if (typeof value !== 'string') return null
try {
const parsed = JSON.parse(value)
return Array.isArray(parsed) ? parsed : null
} catch {
return null
}
}
function normalizeMessageRole(role: unknown): string {
const value = String(role || '').trim()
return ['user', 'assistant', 'tool', 'command'].includes(value) ? value : 'user'
}
function normalizeMentionDepth(depth: unknown): number {
const value = Number(depth)
return Number.isFinite(value) && value > 0 ? Math.floor(value) : 0
}
function groupRunOrder(id: string): { baseId: string; phase: number } {
const value = String(id || '')
const partMatch = value.match(/^(.*)_part_(\d+)(?:_(toolcall|toolresult)_.+)?$/)
if (partMatch) {
const part = Number(partMatch[2] || 0)
const kind = partMatch[3] || 'assistant'
const offset = kind === 'toolcall' ? 1 : kind === 'toolresult' ? 2 : 0
return { baseId: partMatch[1], phase: part * 3 + offset }
}
const toolIdx = value.indexOf('_toolcall_')
if (toolIdx >= 0) return { baseId: value.slice(0, toolIdx), phase: 0 }
const resultIdx = value.indexOf('_toolresult_')
if (resultIdx >= 0) return { baseId: value.slice(0, resultIdx), phase: 1 }
return { baseId: value, phase: 2 }
}
function sortGroupMessages<T extends { id: string; timestamp: number }>(messages: T[]): T[] {
const baseMinTimestamp = new Map<string, number>()
for (const msg of messages) {
const { baseId } = groupRunOrder(msg.id)
const existing = baseMinTimestamp.get(baseId)
if (existing == null || msg.timestamp < existing) baseMinTimestamp.set(baseId, msg.timestamp)
}
return [...messages].sort((a, b) => {
const ao = groupRunOrder(a.id)
const bo = groupRunOrder(b.id)
const at = baseMinTimestamp.get(ao.baseId) ?? a.timestamp
const bt = baseMinTimestamp.get(bo.baseId) ?? b.timestamp
if (at !== bt) return at - bt
if (ao.baseId !== bo.baseId) return ao.baseId.localeCompare(bo.baseId)
if (ao.phase !== bo.phase) return ao.phase - bo.phase
if (a.timestamp !== b.timestamp) return a.timestamp - b.timestamp
return a.id.localeCompare(b.id)
})
}
class ChatStorage {
private db() { return getDb() }
@@ -175,16 +272,16 @@ class ChatStorage {
// ─── Rooms ────────────────────────────────────────────────
getRoom(roomId: string): { id: string; name: string; inviteCode: string | null; triggerTokens: number; maxHistoryTokens: number; tailMessageCount: number; totalTokens: number } | undefined {
return this.db()?.prepare('SELECT id, name, inviteCode, triggerTokens, maxHistoryTokens, tailMessageCount, totalTokens FROM gc_rooms WHERE id = ?').get(roomId) as any
getRoom(roomId: string): { id: string; name: string; inviteCode: string | null; triggerTokens: number; maxHistoryTokens: number; tailMessageCount: number; totalTokens: number; sessionSeed: string } | undefined {
return this.db()?.prepare('SELECT id, name, inviteCode, triggerTokens, maxHistoryTokens, tailMessageCount, totalTokens, sessionSeed FROM gc_rooms WHERE id = ?').get(roomId) as any
}
getRoomByInviteCode(code: string): { id: string; name: string; inviteCode: string | null; triggerTokens: number; maxHistoryTokens: number; tailMessageCount: number; totalTokens: number } | undefined {
return this.db()?.prepare('SELECT id, name, inviteCode, triggerTokens, maxHistoryTokens, tailMessageCount, totalTokens FROM gc_rooms WHERE inviteCode = ?').get(code) as any
getRoomByInviteCode(code: string): { id: string; name: string; inviteCode: string | null; triggerTokens: number; maxHistoryTokens: number; tailMessageCount: number; totalTokens: number; sessionSeed: string } | undefined {
return this.db()?.prepare('SELECT id, name, inviteCode, triggerTokens, maxHistoryTokens, tailMessageCount, totalTokens, sessionSeed FROM gc_rooms WHERE inviteCode = ?').get(code) as any
}
getAllRooms(): { id: string; name: string; inviteCode: string | null; triggerTokens: number; maxHistoryTokens: number; tailMessageCount: number; totalTokens: number }[] {
return (this.db()?.prepare('SELECT id, name, inviteCode, triggerTokens, maxHistoryTokens, tailMessageCount, totalTokens FROM gc_rooms ORDER BY id').all() || []) as any[]
getAllRooms(): { id: string; name: string; inviteCode: string | null; triggerTokens: number; maxHistoryTokens: number; tailMessageCount: number; totalTokens: number; sessionSeed: string }[] {
return (this.db()?.prepare('SELECT id, name, inviteCode, triggerTokens, maxHistoryTokens, tailMessageCount, totalTokens, sessionSeed FROM gc_rooms ORDER BY id').all() || []) as any[]
}
saveRoom(id: string, name: string, inviteCode?: string, config?: { triggerTokens?: number; maxHistoryTokens?: number; tailMessageCount?: number }): void {
@@ -212,25 +309,132 @@ class ChatStorage {
this.db()?.prepare('UPDATE gc_rooms SET totalTokens = ? WHERE id = ?').run(tokens, roomId)
}
rotateRoomSessionSeed(roomId: string): string {
const seed = `${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`
this.db()?.prepare('UPDATE gc_rooms SET sessionSeed = ? WHERE id = ?').run(seed, roomId)
return seed
}
estimateTokens(text: string): number {
const cjk = (text.match(/[\u2e80-\u9fff\uac00-\ud7af\u3000-\u303f\uff00-\uffef]/g) || []).length
const other = text.length - cjk
return Math.ceil(cjk * 1.5 + other / 4)
}
private contentToUsageText(content: unknown): string {
if (typeof content === 'string') return content
if (!content) return ''
if (Array.isArray(content)) {
return content.map((block: any) => {
if (typeof block?.text === 'string') return block.text
if (typeof block?.type === 'string') return `[${block.type}]`
return String(block || '')
}).join('\n')
}
return String(content)
}
private estimateUsageTokensFromMessages(messages: ChatMessage[]): { inputTokens: number; outputTokens: number } {
const inputTokens = messages
.filter(m => (m.role || 'user') === 'user')
.reduce((sum, m) => sum + countTokens(this.contentToUsageText(m.content)), 0)
const outputTokens = messages
.filter(m => m.role === 'assistant' || m.role === 'tool')
.reduce((sum, m) => sum + countTokens(this.contentToUsageText(m.content)) + countTokens(String(m.tool_calls || '')), 0)
return { inputTokens, outputTokens }
}
private estimateRoomTotalTokens(roomId: string, messages: ChatMessage[]): number {
const snapshot = this.getContextSnapshot(roomId)
if (snapshot && messages.length) {
const snapshotIdx = messages.findIndex(m => m.id === snapshot.lastMessageId)
const newMessages = snapshotIdx >= 0
? messages.slice(snapshotIdx + 1)
: messages.filter(m => m.timestamp > snapshot.lastMessageTimestamp)
const newUsage = this.estimateUsageTokensFromMessages(newMessages)
return countTokens(SUMMARY_PREFIX + snapshot.summary) + newUsage.inputTokens + newUsage.outputTokens
}
const usage = this.estimateUsageTokensFromMessages(messages)
return usage.inputTokens + usage.outputTokens
}
// ─── Messages ─────────────────────────────────────────────
getMessages(roomId: string, limit = 500): ChatMessage[] {
const rows = (this.db()?.prepare(
'SELECT id, roomId, senderId, senderName, content, timestamp FROM gc_messages WHERE roomId = ? ORDER BY timestamp DESC LIMIT ?'
'SELECT id, roomId, senderId, senderName, content, timestamp, role, tool_call_id, tool_calls, tool_name, finish_reason, reasoning, reasoning_details, reasoning_content FROM gc_messages WHERE roomId = ? ORDER BY timestamp DESC LIMIT ?'
).all(roomId, limit) || []) as any[]
return rows.reverse()
return sortGroupMessages(rows.map(row => ({
...row,
tool_calls: parseJsonArray(row.tool_calls),
})))
}
getMessage(messageId: string): ChatMessage | null {
const row = this.db()?.prepare(
'SELECT id, roomId, senderId, senderName, content, timestamp, role, tool_call_id, tool_calls, tool_name, finish_reason, reasoning, reasoning_details, reasoning_content FROM gc_messages WHERE id = ?'
).get(messageId) as any
if (!row) return null
return {
...row,
tool_calls: parseJsonArray(row.tool_calls),
}
}
addMessage(msg: ChatMessage): void {
this.upsertMessage(msg)
}
upsertMessage(msg: ChatMessage): void {
const toolCallsJson = msg.tool_calls ? JSON.stringify(msg.tool_calls) : null
this.db()?.prepare(
'INSERT INTO gc_messages (id, roomId, senderId, senderName, content, timestamp) VALUES (?, ?, ?, ?, ?, ?)'
).run(msg.id, msg.roomId, msg.senderId, msg.senderName, msg.content, msg.timestamp)
`INSERT INTO gc_messages (id, roomId, senderId, senderName, content, timestamp, role, tool_call_id, tool_calls, tool_name, finish_reason, reasoning, reasoning_details, reasoning_content)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
+ ` ON CONFLICT(id) DO UPDATE SET
roomId = excluded.roomId,
senderId = excluded.senderId,
senderName = excluded.senderName,
content = excluded.content,
timestamp = excluded.timestamp,
role = excluded.role,
tool_call_id = excluded.tool_call_id,
tool_calls = excluded.tool_calls,
tool_name = excluded.tool_name,
finish_reason = excluded.finish_reason,
reasoning = excluded.reasoning,
reasoning_details = excluded.reasoning_details,
reasoning_content = excluded.reasoning_content`
).run(
msg.id, msg.roomId, msg.senderId, msg.senderName, msg.content, msg.timestamp,
msg.role || 'user',
msg.tool_call_id ?? null,
toolCallsJson,
msg.tool_name ?? null,
msg.finish_reason ?? null,
msg.reasoning ?? null,
msg.reasoning_details ?? null,
msg.reasoning_content ?? null,
)
}
saveMessageAndRefreshRoom(msg: ChatMessage, options: { preserveExistingTimestamp?: boolean } = {}): { message: ChatMessage; totalTokens: number } {
const db = this.db()
if (!db) return { message: msg, totalTokens: 0 }
db.exec('BEGIN IMMEDIATE')
try {
const existing = this.getMessage(msg.id)
const message = existing && options.preserveExistingTimestamp ? { ...msg, timestamp: existing.timestamp } : msg
this.upsertMessage(message)
this.pruneMessages(msg.roomId)
const messages = this.getMessages(msg.roomId)
const totalTokens = this.estimateRoomTotalTokens(msg.roomId, messages)
this.updateRoomTotalTokens(msg.roomId, totalTokens)
db.exec('COMMIT')
return { message, totalTokens }
} catch (err) {
try { db.exec('ROLLBACK') } catch { /* ignore */ }
throw err
}
}
clearRoomContext(roomId: string): void {
@@ -238,7 +442,7 @@ class ChatStorage {
if (!db) return
db.prepare('DELETE FROM gc_messages WHERE roomId = ?').run(roomId)
db.prepare('DELETE FROM gc_context_snapshots WHERE roomId = ?').run(roomId)
db.prepare('UPDATE gc_rooms SET totalTokens = 0 WHERE id = ?').run(roomId)
db.prepare('UPDATE gc_rooms SET totalTokens = 0, sessionSeed = ? WHERE id = ?').run(`${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`, roomId)
}
pruneMessages(roomId: string, keep = 500): void {
@@ -419,13 +623,6 @@ export class GroupChatServer {
/** roomId -> (agentName -> { agentName, status }) */
private contextStatusState = new Map<string, Map<string, { agentName: string; status: string }>>()
setGatewayManager(manager: any): void {
this.agentClients.setGatewayManager(manager)
if (this._contextEngine && manager) {
this._contextEngine.setUpstream(manager.getUpstream(''), manager.getApiKey(''))
}
}
constructor(httpServers: HttpServer | HttpServer[]) {
this.storage = new ChatStorage()
this.storage.init()
@@ -569,10 +766,18 @@ export class GroupChatServer {
logger.debug(`[GroupChat] Connected: ${userName} (socket=${socket.id}, user=${userId})`)
socket.on('join', (data: { roomId?: string; name?: string }, ack?: (response?: unknown) => void) => this.handleJoin(socket, data, ack))
socket.on('message', (data: { roomId?: string; content: string }, ack?: (response?: unknown) => void) => this.handleMessage(socket, data, ack))
socket.on('message', (data: Partial<ChatMessage> & { roomId?: string; content: string | Array<Record<string, unknown>>; id?: string; mentionDepth?: number }, ack?: (response?: unknown) => void) => this.handleMessage(socket, data, ack))
socket.on('message_stream_start', (data: { roomId?: string; id?: string; senderId?: string; senderName?: string; timestamp?: number }) => this.handleMessageStreamStart(socket, data))
socket.on('message_stream_delta', (data: { roomId?: string; id?: string; delta?: string }) => this.handleMessageStreamDelta(socket, data))
socket.on('message_reasoning_delta', (data: { roomId?: string; id?: string; delta?: string }) => this.handleMessageReasoningDelta(socket, data))
socket.on('message_stream_end', (data: { roomId?: string; id?: string }) => this.handleMessageStreamEnd(socket, data))
socket.on('typing', (data: { roomId?: string }) => this.handleTyping(socket, data))
socket.on('stop_typing', (data: { roomId?: string }) => this.handleStopTyping(socket, data))
socket.on('context_status', (data: { roomId?: string; agentName?: string; status?: string }) => this.handleContextStatus(socket, data))
socket.on('interrupt_agent', (data: { roomId?: string; agentName?: string }, ack?: (response?: unknown) => void) => this.handleInterruptAgent(socket, data, ack))
socket.on('approval.requested', (data: { roomId?: string; agentName?: string; approval_id?: string; command?: string; description?: string; choices?: string[]; allow_permanent?: boolean }) => this.handleApprovalRequested(socket, data))
socket.on('approval.resolved', (data: { roomId?: string; agentName?: string; approval_id?: string; choice?: string }) => this.handleApprovalResolved(socket, data))
socket.on('approval.respond', (data: { roomId?: string; approval_id?: string; choice?: string }, ack?: (response?: unknown) => void) => this.handleApprovalRespond(socket, data, ack))
socket.on('disconnect', () => this.handleDisconnect(socket))
}
@@ -581,14 +786,18 @@ export class GroupChatServer {
private handleJoin(socket: Socket, data: { roomId?: string; name?: string; description?: string }, ack?: (res: any) => void): void {
const socketId = socket.id
const userId = this.socketUserMap.get(socketId) || socketId
const userInfo = this.userInfoMap.get(userId) || { name: `User-${userId.slice(0, 6)}`, description: '' }
const userName = data.name || userInfo.name
const description = data.description || userInfo.description
const roomId = data.roomId || 'general'
const existingMember = this.storage.getMemberByUserId(roomId, userId)
const userInfo = this.userInfoMap.get(userId) || {
name: existingMember?.name || `User-${userId.slice(0, 6)}`,
description: existingMember?.description || '',
}
const userName = data.name || existingMember?.name || userInfo.name
const description = data.description || existingMember?.description || userInfo.description
// Update stored user info
this.userInfoMap.set(userId, { name: userName, description })
const roomId = data.roomId || 'general'
let room = this.rooms.get(roomId)
if (!room) {
room = new ChatRoom(roomId)
@@ -628,7 +837,7 @@ export class GroupChatServer {
logger.debug(`[GroupChat] ${userName} (user=${userId}) joined room: ${roomId}`)
}
private handleMessage(socket: Socket, data: { roomId?: string; content: string }, ack?: (res: any) => void): void {
private handleMessage(socket: Socket, data: Partial<ChatMessage> & { roomId?: string; content: string | Array<Record<string, unknown>>; id?: string; mentionDepth?: number }, ack?: (res: any) => void): void {
const socketId = socket.id
const roomId = data.roomId || 'general'
const room = this.rooms.get(roomId)
@@ -643,37 +852,105 @@ export class GroupChatServer {
const userName = member?.name || `User-${socketId.slice(0, 6)}`
const msg: ChatMessage = {
id: this.generateId(),
id: this.normalizeClientMessageId(data.id) || this.generateId(),
roomId,
senderId: userId,
senderName: userName,
content: data.content,
timestamp: Date.now(),
content: contentToStorageString(data.content),
timestamp: this.normalizeMessageTimestamp(data.timestamp, data.role),
role: normalizeMessageRole(data.role),
tool_call_id: data.tool_call_id ?? null,
tool_calls: Array.isArray(data.tool_calls) ? data.tool_calls : null,
tool_name: data.tool_name ?? null,
finish_reason: data.finish_reason ?? null,
reasoning: data.reasoning ?? null,
reasoning_details: data.reasoning_details ?? null,
reasoning_content: data.reasoning_content ?? null,
}
this.storage.addMessage(msg)
this.storage.pruneMessages(roomId)
const saved = this.storage.saveMessageAndRefreshRoom(msg)
const savedMsg = saved.message
const totalTokens = saved.totalTokens
// Recalculate total tokens for the room
const messages = this.storage.getMessages(roomId)
const totalTokens = this.storage.estimateTokens(messages.map(m => m.content + m.senderName).join(''))
this.storage.updateRoomTotalTokens(roomId, totalTokens)
this.nsp.to(roomId).emit('message', msg)
this.nsp.to(roomId).emit('message', savedMsg)
this.nsp.to(roomId).emit('room_updated', { roomId, totalTokens })
ack?.({ id: msg.id })
ack?.({ id: savedMsg.id })
// Server-side @mention routing — parse mentions and invoke agents directly
this.agentClients.processMentions(roomId, {
content: msg.content,
senderName: msg.senderName,
senderId: msg.senderId,
timestamp: msg.timestamp,
}).catch((err) => {
logger.error(`[GroupChat] processMentions error: ${err.message}`)
const mentionDepth = normalizeMentionDepth(data.mentionDepth)
const shouldRouteMentions =
savedMsg.role === 'user' ||
(savedMsg.role === 'assistant' && mentionDepth < 2)
if (shouldRouteMentions) {
// Server-side @mention routing — parse user mentions and invoke agents directly.
this.agentClients.processMentions(roomId, {
content: contentToText(savedMsg.content),
input: Array.isArray(data.content) ? data.content : undefined,
senderName: savedMsg.senderName,
senderId: savedMsg.senderId,
timestamp: savedMsg.timestamp,
mentionDepth,
}).catch((err) => {
logger.error(`[GroupChat] processMentions error: ${err.message}`)
})
}
}
private handleMessageStreamStart(socket: Socket, data: { roomId?: string; id?: string; senderId?: string; senderName?: string; timestamp?: number }): void {
const roomId = data.roomId || 'general'
const room = this.rooms.get(roomId)
if (!room || !room.hasOnlineMember(socket.id)) return
const id = this.normalizeClientMessageId(data.id)
if (!id) return
const member = room.getOnlineMemberBySocketId(socket.id)
this.nsp.to(roomId).emit('message_stream_start', {
id,
roomId,
senderId: data.senderId || member?.userId || socket.id,
senderName: data.senderName || member?.name || `User-${socket.id.slice(0, 6)}`,
content: '',
timestamp: data.timestamp || Date.now(),
role: 'assistant',
finish_reason: 'streaming',
})
}
private handleMessageStreamDelta(socket: Socket, data: { roomId?: string; id?: string; delta?: string }): void {
const roomId = data.roomId || 'general'
const room = this.rooms.get(roomId)
if (!room || !room.hasOnlineMember(socket.id)) return
const id = this.normalizeClientMessageId(data.id)
if (!id || !data.delta) return
this.nsp.to(roomId).emit('message_stream_delta', {
roomId,
id,
delta: String(data.delta),
})
}
private handleMessageReasoningDelta(socket: Socket, data: { roomId?: string; id?: string; delta?: string }): void {
const roomId = data.roomId || 'general'
const room = this.rooms.get(roomId)
if (!room || !room.hasOnlineMember(socket.id)) return
const id = this.normalizeClientMessageId(data.id)
if (!id || !data.delta) return
this.nsp.to(roomId).emit('message_reasoning_delta', {
roomId,
id,
delta: String(data.delta),
})
}
private handleMessageStreamEnd(socket: Socket, data: { roomId?: string; id?: string }): void {
const roomId = data.roomId || 'general'
const room = this.rooms.get(roomId)
if (!room || !room.hasOnlineMember(socket.id)) return
const id = this.normalizeClientMessageId(data.id)
if (!id) return
this.nsp.to(roomId).emit('message_stream_end', { roomId, id })
}
private handleTyping(socket: Socket, data: { roomId?: string }): void {
const roomId = data.roomId || 'general'
const userId = this.socketUserMap.get(socket.id) || socket.id
@@ -749,6 +1026,75 @@ export class GroupChatServer {
})
}
private async handleInterruptAgent(socket: Socket, data: { roomId?: string; agentName?: string }, ack?: (response?: unknown) => void): Promise<void> {
const roomId = data.roomId
const agentName = data.agentName
if (!roomId || !agentName) {
ack?.({ error: 'roomId and agentName are required' })
return
}
const room = this.rooms.get(roomId)
if (!room?.hasOnlineMember(socket.id)) {
ack?.({ error: 'Not in room' })
return
}
try {
await this.agentClients.interruptAgent(roomId, agentName)
this.nsp.to(roomId).emit('context_status', { roomId, agentName, status: 'ready' })
ack?.({ ok: true })
} catch (err: any) {
logger.warn(`[GroupChat] failed to interrupt agent ${agentName} in room ${roomId}: ${err.message}`)
ack?.({ error: err.message || 'interrupt failed' })
}
}
private handleApprovalRequested(socket: Socket, data: { roomId?: string; agentName?: string; approval_id?: string; command?: string; description?: string; choices?: string[]; allow_permanent?: boolean }): void {
const roomId = data.roomId
if (!roomId || !data.approval_id) return
this.nsp.to(roomId).emit('approval.requested', {
event: 'approval.requested',
roomId,
agentName: data.agentName || '',
approval_id: data.approval_id,
command: data.command || '',
description: data.description || '',
choices: Array.isArray(data.choices) ? data.choices : ['once', 'session', 'deny'],
allow_permanent: Boolean(data.allow_permanent),
})
}
private handleApprovalResolved(socket: Socket, data: { roomId?: string; agentName?: string; approval_id?: string; choice?: string }): void {
const roomId = data.roomId
if (!roomId || !data.approval_id) return
this.nsp.to(roomId).emit('approval.resolved', {
event: 'approval.resolved',
roomId,
agentName: data.agentName || '',
approval_id: data.approval_id,
choice: data.choice || '',
})
}
private async handleApprovalRespond(socket: Socket, data: { roomId?: string; approval_id?: string; choice?: string }, ack?: (response?: unknown) => void): Promise<void> {
const roomId = data.roomId
if (!roomId || !data.approval_id) {
ack?.({ error: 'roomId and approval_id are required' })
return
}
const room = this.rooms.get(roomId)
if (!room?.hasOnlineMember(socket.id)) {
ack?.({ error: 'Not in room' })
return
}
try {
const result = await new AgentBridgeClient().approvalRespond(data.approval_id, data.choice || 'deny')
ack?.({ ok: true, resolved: Boolean((result as any)?.resolved) })
} catch (err: any) {
logger.warn(`[GroupChat] failed to respond approval ${data.approval_id}: ${err.message}`)
ack?.({ error: err.message || 'approval response failed' })
}
}
private handleDisconnect(socket: Socket): void {
const socketId = socket.id
const userId = this.socketUserMap.get(socketId)
@@ -804,4 +1150,19 @@ export class GroupChatServer {
private generateId(): string {
return Date.now().toString(36) + Math.random().toString(36).slice(2, 8)
}
private normalizeClientMessageId(id?: string): string | null {
const cleaned = String(id || '').trim()
if (!cleaned || cleaned.length > 160) return null
return /^[a-zA-Z0-9_-]+$/.test(cleaned) ? cleaned : null
}
private normalizeMessageTimestamp(timestamp?: unknown, role?: unknown): number {
const normalizedRole = normalizeMessageRole(role)
if (normalizedRole !== 'user') {
const value = Number(timestamp)
if (Number.isFinite(value) && value > 0) return value
}
return Date.now()
}
}