[verified] Fix group chat agent member sync (#877)
This commit is contained in:
@@ -186,7 +186,7 @@ export async function listAgents(roomId: string): Promise<{ agents: RoomAgent[]
|
||||
return request(`/api/hermes/group-chat/rooms/${roomId}/agents`)
|
||||
}
|
||||
|
||||
export async function removeAgent(roomId: string, agentId: string): Promise<void> {
|
||||
export async function removeAgent(roomId: string, agentId: string): Promise<{ success: boolean; agents: RoomAgent[]; members: MemberInfo[] }> {
|
||||
return request(`/api/hermes/group-chat/rooms/${roomId}/agents/${agentId}`, {
|
||||
method: 'DELETE',
|
||||
})
|
||||
|
||||
@@ -618,8 +618,9 @@ export const useGroupChatStore = defineStore('groupChat', () => {
|
||||
|
||||
async function removeAgentFromRoom(roomId: string, agentId: string) {
|
||||
try {
|
||||
await removeAgent(roomId, agentId)
|
||||
agents.value = agents.value.filter(a => a.id !== agentId)
|
||||
const res = await removeAgent(roomId, agentId)
|
||||
agents.value = res.agents ?? agents.value.filter(a => a.id !== agentId && a.agentId !== agentId)
|
||||
if (res.members) members.value = res.members
|
||||
} catch (err: any) {
|
||||
error.value = err.message
|
||||
throw err
|
||||
|
||||
@@ -66,6 +66,7 @@ groupChatRoutes.post('/api/hermes/group-chat/rooms', async (ctx) => {
|
||||
|
||||
try {
|
||||
const client = await chatServer.agentClients.createAgent({
|
||||
agentId: agent.agentId,
|
||||
profile: agent.profile,
|
||||
name: agent.name,
|
||||
description: agent.description,
|
||||
@@ -121,6 +122,7 @@ groupChatRoutes.post('/api/hermes/group-chat/rooms/:roomId/clone', async (ctx) =
|
||||
|
||||
try {
|
||||
const client = await chatServer.agentClients.createAgent({
|
||||
agentId: agent.agentId,
|
||||
profile: agent.profile,
|
||||
name: agent.name,
|
||||
description: agent.description,
|
||||
@@ -240,6 +242,7 @@ groupChatRoutes.post('/api/hermes/group-chat/rooms/:roomId/agents', async (ctx)
|
||||
// Auto-connect agent via Socket.IO
|
||||
try {
|
||||
const client = await chatServer.agentClients.createAgent({
|
||||
agentId: agent.agentId,
|
||||
profile: agent.profile,
|
||||
name: agent.name,
|
||||
description: agent.description,
|
||||
@@ -273,9 +276,24 @@ groupChatRoutes.delete('/api/hermes/group-chat/rooms/:roomId/agents/:agentId', a
|
||||
return
|
||||
}
|
||||
|
||||
chatServer.getStorage().removeRoomAgent(ctx.params.agentId)
|
||||
chatServer.agentClients.removeAgentFromRoom(ctx.params.roomId, ctx.params.agentId)
|
||||
ctx.body = { success: true }
|
||||
const roomId = ctx.params.roomId
|
||||
const requestedAgentId = ctx.params.agentId
|
||||
const storage = chatServer.getStorage()
|
||||
const agent = storage.getRoomAgent(roomId, requestedAgentId)
|
||||
if (!agent) {
|
||||
ctx.status = 404
|
||||
ctx.body = { error: 'Agent not found' }
|
||||
return
|
||||
}
|
||||
|
||||
storage.removeRoomMembersForAgent(roomId, agent)
|
||||
storage.removeRoomAgent(roomId, requestedAgentId)
|
||||
chatServer.agentClients.removeAgentFromRoom(roomId, agent.agentId)
|
||||
ctx.body = {
|
||||
success: true,
|
||||
agents: storage.getRoomAgents(roomId),
|
||||
members: storage.getRoomMembers(roomId),
|
||||
}
|
||||
})
|
||||
|
||||
// Delete room
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { io, Socket } from 'socket.io-client'
|
||||
import { randomBytes } from 'crypto'
|
||||
import { getToken } from '../../../services/auth'
|
||||
import { logger } from '../../../services/logger'
|
||||
import { updateUsage } from '../../../db/hermes/usage-store'
|
||||
@@ -11,9 +12,12 @@ import {
|
||||
stripMentionRoutingTokens,
|
||||
} from './mention-routing'
|
||||
|
||||
export const GROUP_CHAT_AGENT_SOCKET_SECRET = randomBytes(32).toString('hex')
|
||||
|
||||
// ─── Types ────────────────────────────────────────────────────
|
||||
|
||||
interface AgentConfig {
|
||||
agentId?: string
|
||||
profile: string
|
||||
name: string
|
||||
description: string
|
||||
@@ -77,7 +81,7 @@ class AgentClient {
|
||||
private pendingToolBaseIds = new Map<string, string>()
|
||||
|
||||
constructor(config: AgentConfig, handlers: AgentEventHandler = {}) {
|
||||
this.agentId = Date.now().toString(36) + Math.random().toString(36).slice(2, 8)
|
||||
this.agentId = config.agentId || Date.now().toString(36) + Math.random().toString(36).slice(2, 8)
|
||||
this.profile = config.profile
|
||||
this.name = config.name
|
||||
this.description = config.description
|
||||
@@ -107,7 +111,11 @@ class AgentClient {
|
||||
this.socket = io(`http://127.0.0.1:${actualPort}/group-chat`, {
|
||||
auth: {
|
||||
token: token || undefined,
|
||||
userId: this.agentId,
|
||||
name: this.name,
|
||||
description: this.description,
|
||||
source: 'agent',
|
||||
agentSocketSecret: GROUP_CHAT_AGENT_SOCKET_SECRET,
|
||||
},
|
||||
transports: ['websocket'],
|
||||
reconnection: true,
|
||||
@@ -243,7 +251,7 @@ class AgentClient {
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Hermes Gateway Integration ────────────────────────────
|
||||
// ─── Hermes Agent Bridge Integration ───────────────────────
|
||||
|
||||
/**
|
||||
* Handle an @mention from the server side.
|
||||
|
||||
@@ -3,7 +3,7 @@ import type { Server as HttpServer } from 'http'
|
||||
import { getToken } from '../../../services/auth'
|
||||
import { logger } from '../../../services/logger'
|
||||
import { getDb } from '../../../db'
|
||||
import { AgentClients } from './agent-clients'
|
||||
import { AgentClients, GROUP_CHAT_AGENT_SOCKET_SECRET } from './agent-clients'
|
||||
import { ContextEngine } from '../context-engine/compressor'
|
||||
import { SessionDeleter } from '../session-deleter'
|
||||
import { countTokens, SUMMARY_PREFIX } from '../../../lib/context-compressor'
|
||||
@@ -75,6 +75,7 @@ interface Member {
|
||||
joinedAt: number
|
||||
online: boolean
|
||||
socketId: string
|
||||
source?: 'human' | 'agent'
|
||||
}
|
||||
|
||||
let _tablesEnsured = false
|
||||
@@ -476,8 +477,20 @@ class ChatStorage {
|
||||
return { id, roomId, agentId, profile, name, description, invited }
|
||||
}
|
||||
|
||||
removeRoomAgent(agentId: string): void {
|
||||
this.db()?.prepare('DELETE FROM gc_room_agents WHERE id = ?').run(agentId)
|
||||
getRoomAgent(roomId: string, agentRef: string): RoomAgent | null {
|
||||
return (this.db()?.prepare(
|
||||
'SELECT id, roomId, agentId, profile, name, description, invited FROM gc_room_agents WHERE roomId = ? AND (id = ? OR agentId = ?)'
|
||||
).get(roomId, agentRef, agentRef) as any) ?? null
|
||||
}
|
||||
|
||||
getRoomAgentByAgentId(roomId: string, agentId: string): RoomAgent | null {
|
||||
return (this.db()?.prepare(
|
||||
'SELECT id, roomId, agentId, profile, name, description, invited FROM gc_room_agents WHERE roomId = ? AND agentId = ?'
|
||||
).get(roomId, agentId) as any) ?? null
|
||||
}
|
||||
|
||||
removeRoomAgent(roomId: string, agentRef: string): void {
|
||||
this.db()?.prepare('DELETE FROM gc_room_agents WHERE roomId = ? AND (id = ? OR agentId = ?)').run(roomId, agentRef, agentRef)
|
||||
}
|
||||
|
||||
// ─── Context Snapshots ──────────────────────────────────
|
||||
@@ -512,10 +525,26 @@ class ChatStorage {
|
||||
|
||||
getRoomMembers(roomId: string): { id: string; userId: string; name: string; description: string; joinedAt: number }[] {
|
||||
return (this.db()?.prepare(
|
||||
'SELECT id, userId, userName as name, description, joinedAt FROM gc_room_members WHERE roomId = ? ORDER BY joinedAt'
|
||||
`SELECT m.id, m.userId, m.userName as name, m.description, m.joinedAt
|
||||
FROM gc_room_members m
|
||||
WHERE m.roomId = ?
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM gc_room_agents a
|
||||
WHERE a.roomId = m.roomId
|
||||
AND (a.agentId = m.userId OR (m.userId NOT GLOB '????????-????-????-????-????????????' AND COALESCE(m.description, '') = '' AND a.name = m.userName))
|
||||
)
|
||||
ORDER BY m.joinedAt`
|
||||
).all(roomId) || []) as unknown as { id: string; userId: string; name: string; description: string; joinedAt: number }[]
|
||||
}
|
||||
|
||||
removeRoomMembersForAgent(roomId: string, agent: Pick<RoomAgent, 'agentId' | 'name'>): void {
|
||||
this.db()?.prepare(
|
||||
`DELETE FROM gc_room_members
|
||||
WHERE roomId = ?
|
||||
AND (userId = ? OR (userId NOT GLOB '????????-????-????-????-????????????' AND COALESCE(description, '') = '' AND userName = ?))`
|
||||
).run(roomId, agent.agentId, agent.name)
|
||||
}
|
||||
|
||||
addRoomMember(roomId: string, userId: string, userName: string, description: string): void {
|
||||
const existing = this.getMemberByUserId(roomId, userId)
|
||||
if (existing) {
|
||||
@@ -565,16 +594,17 @@ class ChatRoom {
|
||||
this.name = name || id
|
||||
}
|
||||
|
||||
addOrUpdateMember(socketId: string, userId: string, name: string, description: string): Member {
|
||||
addOrUpdateMember(socketId: string, userId: string, name: string, description: string, source: 'human' | 'agent' = 'human'): Member {
|
||||
const existing = this.members.get(userId)
|
||||
if (existing) {
|
||||
existing.name = name
|
||||
existing.description = description
|
||||
existing.online = true
|
||||
existing.socketId = socketId
|
||||
existing.source = source
|
||||
return existing
|
||||
}
|
||||
const member: Member = { id: socketId, userId, name, description, joinedAt: Date.now(), online: true, socketId }
|
||||
const member: Member = { id: socketId, userId, name, description, joinedAt: Date.now(), online: true, socketId, source }
|
||||
this.members.set(userId, member)
|
||||
return member
|
||||
}
|
||||
@@ -589,7 +619,7 @@ class ChatRoom {
|
||||
}
|
||||
|
||||
getMembersList(): Member[] {
|
||||
return Array.from(this.members.values())
|
||||
return Array.from(this.members.values()).filter(member => member.source !== 'agent')
|
||||
}
|
||||
|
||||
getOnlineMemberBySocketId(socketId: string): Member | undefined {
|
||||
@@ -615,6 +645,8 @@ export class GroupChatServer {
|
||||
private socketUserMap = new Map<string, string>()
|
||||
/** Map: userId → { name, description } (from auth) */
|
||||
private userInfoMap = new Map<string, { name: string; description: string }>()
|
||||
/** Map: socket.id → requested participant source from handshake */
|
||||
private socketRequestedSourceMap = new Map<string, 'human' | 'agent'>()
|
||||
readonly agentClients = new AgentClients()
|
||||
private _contextEngine: ContextEngine | null = null
|
||||
private _restoreScheduled = false
|
||||
@@ -721,6 +753,7 @@ export class GroupChatServer {
|
||||
for (const agent of agents) {
|
||||
try {
|
||||
const client = await this.agentClients.createAgent({
|
||||
agentId: agent.agentId,
|
||||
profile: agent.profile,
|
||||
name: agent.name,
|
||||
description: agent.description,
|
||||
@@ -755,12 +788,14 @@ export class GroupChatServer {
|
||||
// ─── Connection ─────────────────────────────────────────────
|
||||
|
||||
private onConnection(socket: Socket): void {
|
||||
const auth = socket.handshake.auth as { userId?: string; name?: string; description?: string }
|
||||
const auth = socket.handshake.auth as { userId?: string; name?: string; description?: string; source?: string; agentSocketSecret?: string }
|
||||
const userId = auth.userId || socket.id
|
||||
const userName = auth.name || `User-${userId.slice(0, 6)}`
|
||||
const description = auth.description || ''
|
||||
const requestedSource = auth.source === 'agent' && auth.agentSocketSecret === GROUP_CHAT_AGENT_SOCKET_SECRET ? 'agent' : 'human'
|
||||
|
||||
this.socketUserMap.set(socket.id, userId)
|
||||
this.socketRequestedSourceMap.set(socket.id, requestedSource)
|
||||
this.userInfoMap.set(userId, { name: userName, description })
|
||||
|
||||
logger.debug(`[GroupChat] Connected: ${userName} (socket=${socket.id}, user=${userId})`)
|
||||
@@ -786,7 +821,14 @@ export class GroupChatServer {
|
||||
private handleJoin(socket: Socket, data: { roomId?: string; name?: string; description?: string }, ack?: (res: any) => void): void {
|
||||
const socketId = socket.id
|
||||
const userId = this.socketUserMap.get(socketId) || socketId
|
||||
const requestedSource = this.socketRequestedSourceMap.get(socketId) || 'human'
|
||||
const roomId = data.roomId || 'general'
|
||||
const roomAgent = this.storage.getRoomAgentByAgentId(roomId, userId)
|
||||
const source = requestedSource === 'agent' && roomAgent ? 'agent' : 'human'
|
||||
if (source === 'human' && roomAgent) {
|
||||
ack?.({ error: 'Reserved member identity' })
|
||||
return
|
||||
}
|
||||
const existingMember = this.storage.getMemberByUserId(roomId, userId)
|
||||
const userInfo = this.userInfoMap.get(userId) || {
|
||||
name: existingMember?.name || `User-${userId.slice(0, 6)}`,
|
||||
@@ -805,19 +847,25 @@ export class GroupChatServer {
|
||||
this.storage.saveRoom(roomId, roomId)
|
||||
}
|
||||
|
||||
// Persist member to SQLite
|
||||
this.storage.addRoomMember(roomId, userId, userName, description)
|
||||
// Persist only human members. Agent sockets are runtime participants
|
||||
// tracked through gc_room_agents and AgentClients; storing them in
|
||||
// gc_room_members makes member counts grow on reconnect/restore.
|
||||
if (source !== 'agent') {
|
||||
this.storage.addRoomMember(roomId, userId, userName, description)
|
||||
}
|
||||
|
||||
// Add to in-memory online members (keyed by userId)
|
||||
room.addOrUpdateMember(socketId, userId, userName, description)
|
||||
// Add to in-memory online participants (keyed by userId)
|
||||
room.addOrUpdateMember(socketId, userId, userName, description, source)
|
||||
socket.join(roomId)
|
||||
|
||||
socket.to(roomId).emit('member_joined', {
|
||||
roomId,
|
||||
memberId: userId,
|
||||
memberName: userName,
|
||||
members: room.getMembersList(),
|
||||
})
|
||||
if (source !== 'agent') {
|
||||
socket.to(roomId).emit('member_joined', {
|
||||
roomId,
|
||||
memberId: userId,
|
||||
memberName: userName,
|
||||
members: room.getMembersList(),
|
||||
})
|
||||
}
|
||||
|
||||
// Load history from SQLite
|
||||
const messages = this.storage.getMessages(roomId)
|
||||
@@ -1114,6 +1162,7 @@ export class GroupChatServer {
|
||||
|
||||
this.leaveAllRooms(socket, socketId)
|
||||
this.socketUserMap.delete(socketId)
|
||||
this.socketRequestedSourceMap.delete(socketId)
|
||||
// Don't delete userInfoMap — it persists across reconnects
|
||||
}
|
||||
|
||||
@@ -1137,12 +1186,14 @@ export class GroupChatServer {
|
||||
const member = room.getOnlineMemberBySocketId(socketId)
|
||||
room.removeMember(socketId)
|
||||
socket.leave(rid)
|
||||
this.nsp.to(rid).emit('member_left', {
|
||||
roomId: rid,
|
||||
memberId: member?.userId || socketId,
|
||||
memberName: member?.name || `User-${socketId.slice(0, 6)}`,
|
||||
members: room.getMembersList(),
|
||||
})
|
||||
if (member?.source !== 'agent') {
|
||||
this.nsp.to(rid).emit('member_left', {
|
||||
roomId: rid,
|
||||
memberId: member?.userId || socketId,
|
||||
memberName: member?.name || `User-${socketId.slice(0, 6)}`,
|
||||
members: room.getMembersList(),
|
||||
})
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -0,0 +1,136 @@
|
||||
import { describe, expect, it, vi, beforeEach } from 'vitest'
|
||||
|
||||
const { socketHandlers, mockSocket, mockIo } = vi.hoisted(() => {
|
||||
const socketHandlers = new Map<string, (...args: any[]) => void>()
|
||||
const mockSocket: any = {
|
||||
id: 'socket-1',
|
||||
connected: true,
|
||||
io: { on: vi.fn() },
|
||||
on: vi.fn((event: string, handler: (...args: any[]) => void) => {
|
||||
socketHandlers.set(event, handler)
|
||||
if (event === 'connect') queueMicrotask(() => handler())
|
||||
return mockSocket
|
||||
}),
|
||||
emit: vi.fn(),
|
||||
disconnect: vi.fn(),
|
||||
}
|
||||
const mockIo = vi.fn(() => mockSocket)
|
||||
return { socketHandlers, mockSocket, mockIo }
|
||||
})
|
||||
|
||||
vi.mock('socket.io-client', () => ({
|
||||
io: mockIo,
|
||||
}))
|
||||
|
||||
vi.mock('../../packages/server/src/services/auth', () => ({
|
||||
getToken: vi.fn(async () => 'test-token'),
|
||||
}))
|
||||
|
||||
import { AgentClients } from '../../packages/server/src/services/hermes/group-chat/agent-clients'
|
||||
import { groupChatRoutes, setGroupChatServer } from '../../packages/server/src/routes/hermes/group-chat'
|
||||
|
||||
function routeHandler(path: string, method: string) {
|
||||
const layer = (groupChatRoutes as any).stack.find((item: any) => item.path === path && item.methods.includes(method))
|
||||
if (!layer) throw new Error(`Route not found: ${method} ${path}`)
|
||||
return layer.stack[0]
|
||||
}
|
||||
|
||||
describe('Group Chat member/agent identity sync', () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
socketHandlers.clear()
|
||||
})
|
||||
|
||||
it('uses the persisted group-chat agent id as the runtime agent id and socket user id', async () => {
|
||||
const clients = new AgentClients()
|
||||
|
||||
const client = await clients.createAgent({
|
||||
agentId: 'agent-stable-1',
|
||||
profile: 'default',
|
||||
name: 'Worker',
|
||||
description: '',
|
||||
invited: 0,
|
||||
} as any)
|
||||
|
||||
expect(client.agentId).toBe('agent-stable-1')
|
||||
expect(mockIo).toHaveBeenCalledWith(
|
||||
'http://127.0.0.1:8648/group-chat',
|
||||
expect.objectContaining({
|
||||
auth: expect.objectContaining({
|
||||
token: 'test-token',
|
||||
userId: 'agent-stable-1',
|
||||
name: 'Worker',
|
||||
source: 'agent',
|
||||
agentSocketSecret: expect.any(String),
|
||||
}),
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
it('passes the same persisted agent id into the runtime client when adding an agent', async () => {
|
||||
const addRoomAgent = vi.fn((roomId: string, agentId: string, profile: string, name: string, description: string, invited: number) => ({
|
||||
id: 'row-1', roomId, agentId, profile, name, description, invited,
|
||||
}))
|
||||
const chatServer = {
|
||||
getStorage: () => ({
|
||||
getRoomAgents: vi.fn(() => []),
|
||||
addRoomAgent,
|
||||
}),
|
||||
agentClients: {
|
||||
createAgent: vi.fn(async () => ({ agentId: 'runtime-agent' })),
|
||||
addAgentToRoom: vi.fn(async () => undefined),
|
||||
},
|
||||
}
|
||||
setGroupChatServer(chatServer as any)
|
||||
|
||||
const handler = routeHandler('/api/hermes/group-chat/rooms/:roomId/agents', 'POST')
|
||||
const ctx: any = {
|
||||
params: { roomId: 'room-1' },
|
||||
request: { body: { profile: 'default', name: 'Worker' } },
|
||||
status: 200,
|
||||
body: undefined,
|
||||
}
|
||||
await handler(ctx, async () => {})
|
||||
|
||||
const persisted = ctx.body.agent
|
||||
expect(persisted.agentId).toBeTruthy()
|
||||
expect(chatServer.agentClients.createAgent).toHaveBeenCalledWith(expect.objectContaining({
|
||||
agentId: persisted.agentId,
|
||||
profile: 'default',
|
||||
name: 'Worker',
|
||||
}))
|
||||
})
|
||||
|
||||
it('removes the runtime agent by persisted agentId and returns synchronized room state', async () => {
|
||||
const agentsBefore = [{ id: 'row-1', roomId: 'room-1', agentId: 'agent-stable-1', profile: 'default', name: 'Worker', description: '', invited: 0 }]
|
||||
const storage = {
|
||||
getRoomAgent: vi.fn(() => agentsBefore[0]),
|
||||
getRoomAgents: vi.fn(() => []),
|
||||
removeRoomMembersForAgent: vi.fn(),
|
||||
removeRoomAgent: vi.fn(),
|
||||
getRoomMembers: vi.fn(() => [{ id: 'member-1', userId: 'human-1', name: 'Han', description: '', joinedAt: 1 }]),
|
||||
}
|
||||
const chatServer = {
|
||||
getStorage: () => storage,
|
||||
agentClients: { removeAgentFromRoom: vi.fn() },
|
||||
}
|
||||
setGroupChatServer(chatServer as any)
|
||||
|
||||
const handler = routeHandler('/api/hermes/group-chat/rooms/:roomId/agents/:agentId', 'DELETE')
|
||||
const ctx: any = {
|
||||
params: { roomId: 'room-1', agentId: 'row-1' },
|
||||
status: 200,
|
||||
body: undefined,
|
||||
}
|
||||
await handler(ctx, async () => {})
|
||||
|
||||
expect(chatServer.agentClients.removeAgentFromRoom).toHaveBeenCalledWith('room-1', 'agent-stable-1')
|
||||
expect(storage.removeRoomMembersForAgent).toHaveBeenCalledWith('room-1', agentsBefore[0])
|
||||
expect(storage.removeRoomAgent).toHaveBeenCalledWith('room-1', 'row-1')
|
||||
expect(ctx.body).toEqual({
|
||||
success: true,
|
||||
agents: [],
|
||||
members: [{ id: 'member-1', userId: 'human-1', name: 'Han', description: '', joinedAt: 1 }],
|
||||
})
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user