feat: group chat session lifecycle, typing recovery, mention highlighting (#186)

* feat: restore group chat system with Socket.IO and SQLite persistence

- GroupChatServer: Socket.IO server with room management, message history, typing indicators
- SQLite storage for rooms, messages, and agent configuration
- AgentClients: manages AI agent connections via socket.io-client, forwards @mentions to Hermes gateway
- REST API: room CRUD, agent management, invite codes
- Agent auto-restoration on server restart
- Tests for all REST endpoints

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: add context-engine design document for group chat compression

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: handle special-character session search

* fix: keep unicode dotted session search on quoted FTS path

* feat: add context engine and group chat frontend UI

- Context engine: three-zone compression (head/tail/summary) with LLM
  summarization, incremental updates, TTL cache, and graceful degradation
- Frontend: group chat page with Socket.IO client, room sidebar, message
  list, agent/member display, create/join-by-code modals
- Integration: wire context engine into agent-clients before /v1/runs
- Refactor ChatStorage to use global DB (getDb/ensureTable) with gc_ prefix
- Add i18n keys for group chat to all 8 locales
- Add sidebar nav entry and router for group chat page

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: remove leftover main branch code from merge conflict resolution

The `isNumericQuery`, `hasUnsafeChars`, and `runLikeContentSearch` functions
no longer exist — they were replaced by HEAD's `shouldUseLiteralContentSearch`
and `runLiteralContentSearch`. This dead code block caused a TypeScript
compile error after the merge.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: install missing socket.io dep and type ack params

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: enable WebSocket proxy and fix socket.io transport for group chat

- Add ws: true to Vite proxy config so WebSocket upgrade requests
  are forwarded to the backend
- Allow both polling and websocket transports on server and client
  (polling as fallback when WebSocket upgrade fails through proxy)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: separate socket.io path from REST routes for group chat

socket.io was mounted at /api/hermes/group-chat which intercepted all
REST requests to /api/hermes/group-chat/rooms etc, returning
"Transport unknown". Changed socket.io path to /api/hermes/group-chat/ws
to avoid conflicts.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: improve group chat UI, agent management, and socket.io reliability

- Redesign GroupChatPanel with Naive UI, stacked agent avatars, and popover management
- Match GroupChatInput style with single chat input, add IME composition handling
- Add agent add/remove per room with profile selection and duplicate prevention
- Use @multiavatar for SVG avatar generation with caching
- Decouple joinRoom from socket.io, use REST API for data loading
- Switch socket.io to default path with /group-chat namespace to avoid proxy conflicts
- Restore agent connections after server is listening
- Add getRoomDetail REST endpoint and duplicate agent prevention (409)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: server-side @mention routing with context compression status and queue

- Move @mention detection from agent socket listeners to server-side processMentions()
- Add per-room processing lock to block mention dispatch during compression
- Queue mentions during processing, drain only the latest when ready
- Emit context_status events (compressing/replying/ready) to room via Socket.IO
- Frontend displays compression status indicator above input
- Token-based compression trigger (100k threshold) with CJK-aware estimation
- Fix compressor type errors (countTokens parameter type)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: improve group chat profile handling and session sync

Refine group chat room/session behavior with per-room compression controls, sidebar updates, and better stale session cleanup so multi-profile group chat state stays consistent.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: group chat improvements — session lifecycle, typing recovery, mention highlighting

- Fix cross-profile session deletion with deferred delete queue
- Move saveSessionProfile to after gateway response confirmation
- Replace all console.log with logger in group-chat modules
- Add server-side typing/context_status state tracking for room rejoin
- Fix @ mention popup position to follow cursor
- Add @ mention highlighting (blue) in chat message content
- Fix mention regex to match all occurrences after HTML tags
- Enable esbuild minify and treeShaking
- Move @multiavatar/multiavatar to devDependencies
- Add i18n keys for group chat features
- Update tests for new functionality

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* chore: bump version to 0.4.5 and move @multiavatar to devDependencies

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Zhicheng Han <zhicheng.han@mathematik.uni-goettingen.de>
This commit is contained in:
ekko
2026-04-24 20:41:14 +08:00
committed by GitHub
parent 82965ae6e2
commit ba72264542
47 changed files with 7590 additions and 141 deletions
@@ -0,0 +1,669 @@
import { io, Socket } from 'socket.io-client'
import { EventSource } from 'eventsource'
import { getToken } from '../../../services/auth'
import type { GatewayManager } from '../gateway-manager'
import { deleteSession as hermesDeleteSession } from '../hermes-cli'
import { getActiveProfileName } from '../hermes-profile'
import { logger } from '../../../services/logger'
// ─── Types ────────────────────────────────────────────────────
interface AgentConfig {
profile: string
name: string
description: string
invited: number
}
interface MessageData {
id: string
roomId: string
senderId: string
senderName: string
content: string
timestamp: number
}
interface MemberData {
id: string
name: string
joinedAt: number
}
interface JoinResult {
roomId: string
roomName: string
members: MemberData[]
messages: MessageData[]
rooms: string[]
}
export interface AgentEventHandler {
onMessage?: (data: { roomId: string; msg: MessageData }) => void
onTyping?: (data: { roomId: string; userId: string; userName: string }) => void
onStopTyping?: (data: { roomId: string; userId: string; userName: string }) => void
onMemberJoined?: (data: { roomId: string; memberId: string; memberName: string; members: MemberData[] }) => void
onMemberLeft?: (data: { roomId: string; memberId: string; memberName: string; members: MemberData[] }) => void
}
// ─── Agent Client (single connection) ─────────────────────────
class AgentClient {
readonly agentId: string
readonly profile: string
readonly name: string
readonly description: string
private socket: Socket | null = null
private joinedRooms = new Set<string>()
private handlers: AgentEventHandler
private _reconnecting = false
private gatewayManager: GatewayManager | null = null
private contextEngine: any = null
private storage: any = null
constructor(config: AgentConfig, handlers: AgentEventHandler = {}) {
this.agentId = Date.now().toString(36) + Math.random().toString(36).slice(2, 8)
this.profile = config.profile
this.name = config.name
this.description = config.description
this.handlers = handlers
}
get connected(): boolean {
return this.socket?.connected ?? false
}
get id(): string | undefined {
return this.socket?.id
}
setGatewayManager(manager: GatewayManager): void {
this.gatewayManager = manager
}
setContextEngine(engine: any): void {
this.contextEngine = engine
}
setStorage(storage: any): void {
this.storage = storage
}
async connect(port = 8648): Promise<void> {
const token = await getToken()
this.socket = io(`http://127.0.0.1:${port}/group-chat`, {
auth: {
token: token || undefined,
name: this.name,
},
transports: ['websocket'],
reconnection: true,
reconnectionAttempts: Infinity,
reconnectionDelay: 1000,
reconnectionDelayMax: 30000,
})
this.bindEvents()
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error('Connection timeout')), 10000)
this.socket!.on('connect', () => {
clearTimeout(timeout)
logger.debug(`[AgentClient] ${this.name} connected, socket id: ${this.socket!.id}`)
resolve()
})
this.socket!.on('connect_error', (err) => {
clearTimeout(timeout)
logger.error(err, `[AgentClient] ${this.name} connect_error`)
reject(err)
})
})
}
disconnect(): void {
if (this.socket) {
this.socket.disconnect()
this.socket = null
this.joinedRooms.clear()
}
}
async joinRoom(roomId: string): Promise<JoinResult> {
this.ensureConnected()
return new Promise((resolve, reject) => {
this.socket!.emit('join', { roomId }, (res: JoinResult | { error: string }) => {
if ('error' in res) {
reject(new Error(res.error))
} else {
this.joinedRooms.add(roomId)
resolve(res)
}
})
})
}
sendMessage(roomId: string, content: string): Promise<string> {
this.ensureConnected()
return new Promise((resolve, reject) => {
this.socket!.emit('message', { roomId, content }, (res: { id?: string; error?: string }) => {
if (res.error) {
reject(new Error(res.error))
} else {
resolve(res.id!)
}
})
})
}
startTyping(roomId: string): void {
this.ensureConnected()
this.socket!.emit('typing', { roomId })
}
stopTyping(roomId: string): void {
this.ensureConnected()
this.socket!.emit('stop_typing', { roomId })
}
emitContextStatus(roomId: string, status: 'compressing' | 'replying' | 'ready'): void {
this.ensureConnected()
this.socket!.emit('context_status', { roomId, agentName: this.name, status })
}
getJoinedRooms(): string[] {
return Array.from(this.joinedRooms)
}
private ensureConnected(): void {
if (!this.socket?.connected) {
throw new Error(`Agent "${this.name}" is not connected`)
}
}
private async deleteSession(sessionId: string): Promise<void> {
try {
const sessionProfile = this.storage?.getSessionProfile?.(sessionId)
const currentProfile = getActiveProfileName()
if (sessionProfile && sessionProfile.profile_name !== currentProfile) {
// Cross-profile: enqueue deferred delete, don't switch profile
this.storage?.enqueuePendingSessionDelete?.(sessionId, sessionProfile.profile_name)
logger.info(`[AgentClients] ${this.name}: cross-profile deferred delete session ${sessionId} (session=${sessionProfile.profile_name}, active=${currentProfile})`)
return
}
// Same profile or no mapping: delete directly
const ok = await hermesDeleteSession(sessionId)
if (ok) {
this.storage?.deleteSessionProfile?.(sessionId)
}
logger.debug(`[AgentClients] ${this.name}: delete session ${sessionId} (profile=${this.profile}) → ${ok ? 'ok' : 'failed'}`)
} catch (err: any) {
logger.warn(`[AgentClients] ${this.name}: failed to delete session ${sessionId}: ${err.message}`)
}
}
// ─── Hermes Gateway Integration ────────────────────────────
/**
* Handle an @mention from the server side.
* Called by AgentClients.processMentions() — no socket round-trip needed.
* onStatus is called to report context compression progress.
*/
async replyToMention(
roomId: string,
msg: { content: string; senderName: string; senderId: string; timestamp: number },
onStatus?: (status: 'compressing' | 'replying' | 'ready') => void,
): Promise<void> {
logger.debug(`[AgentClients] ${this.name} mentioned by ${msg.senderName}: "${msg.content.slice(0, 50)}"`)
if (!this.gatewayManager) {
logger.debug(`[AgentClients] ${this.name}: gatewayManager is null, skipping`)
return
}
const upstream = this.gatewayManager.getUpstream(this.profile)
const apiKey = this.gatewayManager.getApiKey(this.profile)
logger.debug(`[AgentClients] ${this.name}: upstream=${upstream}, profile=${this.profile}`)
if (!upstream) {
logger.error(`[AgentClients] ${this.name}: no gateway upstream for profile "${this.profile}"`)
return
}
const sessionId = Date.now().toString(36) + Math.random().toString(36).slice(2, 8)
try {
// Notify room that agent is typing
this.startTyping(roomId)
// Build compressed context if context engine is available
let conversationHistory: Array<{ role: string; content: string }> = []
let instructions: string | undefined
if (this.contextEngine && this.storage) {
try {
logger.debug(`[AgentClients] ${this.name}: building context...`)
onStatus?.('compressing')
// Get room members with descriptions for context
const roomMembers: Array<{ userId: string; name: string; description: string }> = this.storage.getRoomMembers(roomId) || []
const memberNames = roomMembers.map((m: any) => m.name)
const members = roomMembers.map((m: any) => ({ userId: m.userId, name: m.name, description: m.description }))
// Get room compression config
const roomInfo = this.storage.getRoom(roomId)
const compression = roomInfo ? {
triggerTokens: roomInfo.triggerTokens,
maxHistoryTokens: roomInfo.maxHistoryTokens,
tailMessageCount: roomInfo.tailMessageCount,
} : undefined
const ctx = await this.contextEngine.buildContext({
roomId,
agentId: this.agentId,
agentName: this.name,
agentDescription: this.description,
agentSocketId: this.socket?.id || '',
roomName: roomId,
memberNames,
members,
upstream,
apiKey,
currentMessage: msg,
compression,
})
conversationHistory = ctx.conversationHistory
instructions = ctx.instructions
logger.debug(`[AgentClients] ${this.name}: context built — historyLen=${conversationHistory.length}, meta=%j`, ctx.meta)
onStatus?.('replying')
} catch (err: any) {
logger.warn(`[AgentClients] ${this.name}: context engine failed: ${err.message}`)
onStatus?.('replying')
// Degrade: continue without context
}
}
// Strip @mention from input — agent already knows it was mentioned
const input = msg.content.replace(new RegExp(`@${this.name.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')}\\s*`, 'gi'), '').trim() || msg.content
// Start a run on Hermes gateway
const runRes = await fetch(`${upstream}/v1/runs`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...(apiKey ? { Authorization: `Bearer ${apiKey}` } : {}),
},
body: JSON.stringify({
input,
session_id: sessionId,
...(conversationHistory.length > 0 ? { conversation_history: conversationHistory } : {}),
...(instructions ? { instructions } : {}),
}),
signal: AbortSignal.timeout(120000),
})
if (!runRes.ok) {
const text = await runRes.text().catch(() => '')
logger.error(`[AgentClients] ${this.name}: gateway run failed (${runRes.status}): ${text}`)
this.stopTyping(roomId)
return
}
const runData = await runRes.json() as any
const run_id = runData.run_id
logger.debug(`[AgentClients] ${this.name}: run started, response=%j`, runData)
if (!run_id) {
logger.error(`[AgentClients] ${this.name}: no run_id in response`)
this.stopTyping(roomId)
return
}
// Save session-to-profile mapping after gateway confirms the run
const actualSessionId = runData.session_id || sessionId
if (!this.storage) {
logger.warn(`[AgentClients] ${this.name}: storage is null, cannot save session profile for ${actualSessionId}`)
} else {
this.storage.saveSessionProfile(actualSessionId, roomId, this.agentId, this.profile)
logger.debug(`[AgentClients] ${this.name}: saved session profile ${actualSessionId} → profile=${this.profile}`)
}
// Stream events from Hermes
const eventsUrl = new URL(`${upstream}/v1/runs/${run_id}/events`)
if (apiKey) eventsUrl.searchParams.set('token', apiKey)
logger.debug(`[AgentClients] ${this.name}: streaming events from ${eventsUrl}`)
const source = new EventSource(eventsUrl.toString())
let fullContent = ''
source.onmessage = (e: any) => {
try {
const parsed = JSON.parse(e.data)
logger.debug(`[AgentClients] ${this.name}: event=${parsed.event}`)
if (parsed.event === 'run.completed') {
source.close()
logger.debug(`[AgentClients] ${this.name}: run completed, content length=${fullContent.length}`)
if (fullContent) {
this.stopTyping(roomId)
this.sendMessage(roomId, fullContent)
}
this.deleteSession(actualSessionId).catch(() => { })
onStatus?.('ready')
return
}
if (parsed.event === 'run.failed') {
source.close()
logger.error(`[AgentClients] ${this.name}: run failed`)
this.stopTyping(roomId)
this.deleteSession(actualSessionId).catch(() => { })
onStatus?.('ready')
return
}
// Accumulate message deltas
if (parsed.event === 'message.delta' && parsed.delta) {
fullContent += parsed.delta
}
} catch {
// ignore parse errors
}
}
source.onerror = (err: any) => {
logger.error(err, `[AgentClients] ${this.name}: EventSource error`)
source.close()
this.stopTyping(roomId)
this.deleteSession(actualSessionId).catch(() => { })
onStatus?.('ready')
}
} catch (err: any) {
logger.error(`[AgentClients] ${this.name}: error handling message: ${err.message}`)
this.stopTyping(roomId)
this.deleteSession(sessionId).catch(() => { })
onStatus?.('ready')
}
}
private bindEvents(): void {
const s = this.socket!
s.on('typing', (data: any) => {
this.handlers.onTyping?.(data)
})
s.on('stop_typing', (data: any) => {
this.handlers.onStopTyping?.(data)
})
s.on('member_joined', (data: any) => {
this.handlers.onMemberJoined?.(data)
})
s.on('member_left', (data: any) => {
this.handlers.onMemberLeft?.(data)
})
// Auto rejoin rooms on reconnect
s.io.on('reconnect', async () => {
if (this._reconnecting) return
this._reconnecting = true
logger.info(`[AgentClients] ${this.name} reconnecting, rejoining ${this.joinedRooms.size} rooms...`)
const rooms = Array.from(this.joinedRooms)
for (const roomId of rooms) {
try {
await this.joinRoom(roomId)
} catch (err: any) {
logger.error(`[AgentClients] ${this.name} failed to rejoin room ${roomId}: ${err.message}`)
}
}
this._reconnecting = false
})
}
}
// ─── AgentClients (roomId -> agents) ──────────────────────────
export class AgentClients {
private rooms = new Map<string, Map<string, AgentClient>>()
private _gatewayManager: GatewayManager | null = null
private _contextEngine: any = null
private _storage: any = null
// Per-room processing lock + mention queue
private _processingRooms = new Set<string>()
private _mentionQueue = new Map<string, Array<{ agent: AgentClient; msg: { content: string; senderName: string; senderId: string; timestamp: number } }>>()
/**
* Create an agent client and connect it to the server.
* The agent will NOT auto-join any room — call addAgentToRoom separately.
*/
async createAgent(config: AgentConfig, handlers?: AgentEventHandler, port?: number): Promise<AgentClient> {
const client = new AgentClient(config, handlers)
await client.connect(port)
// Auto-apply stored references (fixes propagation for agents created after set*)
if (this._gatewayManager) client.setGatewayManager(this._gatewayManager)
if (this._contextEngine) client.setContextEngine(this._contextEngine)
if (this._storage) client.setStorage(this._storage)
logger.info(`[AgentClients] Connected: ${client.name} (${client.agentId})`)
return client
}
/**
* Connect an agent to a room.
*/
async addAgentToRoom(roomId: string, client: AgentClient): Promise<JoinResult> {
let room = this.rooms.get(roomId)
if (!room) {
room = new Map()
this.rooms.set(roomId, room)
}
room.set(client.agentId, client)
const result = await client.joinRoom(roomId)
logger.info(`[AgentClients] ${client.name} joined room: ${roomId}`)
return result
}
/**
* Remove an agent from a room and disconnect it.
*/
removeAgentFromRoom(roomId: string, agentId: string): void {
const room = this.rooms.get(roomId)
if (!room) return
const client = room.get(agentId)
if (client) {
client.disconnect()
room.delete(agentId)
logger.info(`[AgentClients] ${client.name} left room: ${roomId}`)
// Invalidate context engine cache for this agent
if (this._contextEngine) {
try { this._contextEngine.invalidateRoom(roomId) } catch { /* ignore */ }
}
}
if (room.size === 0) {
this.rooms.delete(roomId)
}
}
/**
* Get all agents in a room.
*/
getAgents(roomId: string): AgentClient[] {
const room = this.rooms.get(roomId)
return room ? Array.from(room.values()) : []
}
/**
* Get a specific agent in a room.
*/
getAgent(roomId: string, agentId: string): AgentClient | undefined {
return this.rooms.get(roomId)?.get(agentId)
}
/**
* Get all room IDs that have agents.
*/
getRoomIds(): string[] {
return Array.from(this.rooms.keys())
}
/**
* Send a message from a specific agent in a room.
*/
async sendMessage(roomId: string, agentId: string, content: string): Promise<string> {
const client = this.getAgent(roomId, agentId)
if (!client) {
throw new Error(`Agent "${agentId}" not found in room "${roomId}"`)
}
return client.sendMessage(roomId, content)
}
/**
* Broadcast a message from all agents in a room.
*/
async broadcastFromRoom(roomId: string, content: string): Promise<string[]> {
const agents = this.getAgents(roomId)
return Promise.all(agents.map((agent) => agent.sendMessage(roomId, content)))
}
/**
* Disconnect all agents in a room.
*/
disconnectRoom(roomId: string): void {
const room = this.rooms.get(roomId)
if (!room) return
room.forEach((client) => client.disconnect())
this.rooms.delete(roomId)
logger.info(`[AgentClients] All agents disconnected from room: ${roomId}`)
// Invalidate context engine cache for this room
if (this._contextEngine) {
try { this._contextEngine.invalidateRoom(roomId) } catch { /* ignore */ }
}
}
/**
* Disconnect all agents in all rooms.
*/
disconnectAll(): void {
this.rooms.forEach((room) => {
room.forEach((client) => client.disconnect())
})
this.rooms.clear()
logger.info('[AgentClients] All agents disconnected')
}
/**
* Set gateway manager for all existing and future agents.
*/
setGatewayManager(manager: GatewayManager): void {
this._gatewayManager = manager
this.rooms.forEach((room) => {
room.forEach((client) => client.setGatewayManager(manager))
})
}
/**
* Set context engine for all existing and future agents.
*/
setContextEngine(engine: any): void {
this._contextEngine = engine
this.rooms.forEach((room) => {
room.forEach((client) => client.setContextEngine(engine))
})
}
/**
* Set message storage for all existing and future agents.
*/
setStorage(storage: any): void {
this._storage = storage
this.rooms.forEach((room) => {
room.forEach((client) => client.setStorage(storage))
})
}
/**
* Server-side: parse @mentions and forward to matching agents directly.
* If the room is already processing (compressing/replying), queue the mention.
*/
async processMentions(roomId: string, msg: { content: string; senderName: string; senderId: string; timestamp: number }): Promise<void> {
if (!this._gatewayManager) return
const content = msg.content.toLowerCase()
const agents = this.getAgents(roomId)
const mentioned = agents.filter(a => content.includes(`@${a.name.toLowerCase()}`))
if (mentioned.length === 0) return
logger.debug(`[AgentClients] ${mentioned.map(a => a.name).join(', ')} mentioned by ${msg.senderName}`)
for (const agent of mentioned) {
this._processAgentMention(roomId, agent, msg).catch((err) => {
logger.error(`[AgentClients] error processing mention for ${agent.name}: ${err.message}`)
})
}
}
/**
* Process a single agent mention with status reporting and queue drain.
*/
private async _processAgentMention(
roomId: string,
agent: AgentClient,
msg: { content: string; senderName: string; senderId: string; timestamp: number },
): Promise<void> {
const agentKey = `${roomId}:${agent.name}`
if (this._processingRooms.has(agentKey)) {
// Queue for this specific agent
let queue = this._mentionQueue.get(agentKey)
if (!queue) {
queue = []
this._mentionQueue.set(agentKey, queue)
}
queue.push({ agent, msg })
logger.debug(`[AgentClients] agent ${agent.name} is processing, queued mention in room ${roomId}`)
return
}
this._processingRooms.add(agentKey)
const onStatus = (status: 'compressing' | 'replying' | 'ready') => {
agent.emitContextStatus(roomId, status)
logger.debug(`[AgentClients] room ${roomId} agent ${agent.name} status: ${status}`)
}
try {
await agent.replyToMention(roomId, msg, onStatus)
} finally {
this._processingRooms.delete(agentKey)
await this._drainQueue(agentKey, roomId)
}
}
/**
* Drain queued mentions for a room after processing completes.
*/
private async _drainQueue(agentKey: string, roomId: string): Promise<void> {
const queue = this._mentionQueue.get(agentKey)
if (!queue || queue.length === 0) return
this._mentionQueue.delete(agentKey)
logger.debug(`[AgentClients] draining ${queue.length} queued mention(s) for ${agentKey}`)
// Process the last queued mention only (most recent, discards stale intermediate ones)
const last = queue[queue.length - 1]
this._processingRooms.add(agentKey)
this._processAgentMention(roomId, last.agent, last.msg).catch((err) => {
logger.error(`[AgentClients] error processing queued mention: ${err.message}`)
})
}
}
@@ -0,0 +1,866 @@
import { Server, Socket, Namespace } from 'socket.io'
import type { Server as HttpServer } from 'http'
import { getToken } from '../../../services/auth'
import { logger } from '../../../services/logger'
import { getDb, ensureTable } from '../../../db'
import { AgentClients } from './agent-clients'
import { deleteSession as hermesDeleteSession } from '../hermes-cli'
import { ContextEngine } from '../context-engine/compressor'
// ─── Types ────────────────────────────────────────────────────
interface ChatMessage {
id: string
roomId: string
senderId: string
senderName: string
content: string
timestamp: number
}
interface RoomAgent {
id: string
roomId: string
agentId: string
profile: string
name: string
description: string
invited: number
}
interface Member {
id: string
userId: string
name: string
description: string
joinedAt: number
online: boolean
socketId: string
}
// ─── SQLite Storage (global DB) ──────────────────────────────
const GC_PENDING_SESSION_DELETES_SCHEMA: Record<string, string> = {
session_id: 'TEXT PRIMARY KEY',
profile_name: 'TEXT NOT NULL',
status: "TEXT NOT NULL DEFAULT 'pending'",
attempt_count: 'INTEGER NOT NULL DEFAULT 0',
last_error: 'TEXT',
created_at: 'INTEGER NOT NULL',
updated_at: 'INTEGER NOT NULL',
next_attempt_at: 'INTEGER NOT NULL DEFAULT 0',
}
const GC_SESSION_PROFILES_SCHEMA: Record<string, string> = {
session_id: 'TEXT PRIMARY KEY',
room_id: 'TEXT NOT NULL',
agent_id: 'TEXT NOT NULL',
profile_name: 'TEXT NOT NULL',
created_at: 'INTEGER NOT NULL',
}
const GC_ROOMS_SCHEMA: Record<string, string> = {
id: 'TEXT PRIMARY KEY',
name: 'TEXT NOT NULL',
inviteCode: 'TEXT UNIQUE',
triggerTokens: 'INTEGER NOT NULL DEFAULT 100000',
maxHistoryTokens: 'INTEGER NOT NULL DEFAULT 32000',
tailMessageCount: 'INTEGER NOT NULL DEFAULT 20',
totalTokens: 'INTEGER NOT NULL DEFAULT 0',
}
const GC_MESSAGES_SCHEMA: Record<string, string> = {
id: 'TEXT PRIMARY KEY',
roomId: 'TEXT NOT NULL',
senderId: 'TEXT NOT NULL',
senderName: 'TEXT NOT NULL',
content: 'TEXT NOT NULL',
timestamp: 'INTEGER NOT NULL',
}
const GC_ROOM_AGENTS_SCHEMA: Record<string, string> = {
id: 'TEXT PRIMARY KEY',
roomId: 'TEXT NOT NULL',
agentId: 'TEXT NOT NULL',
profile: 'TEXT NOT NULL',
name: 'TEXT NOT NULL',
description: "TEXT NOT NULL DEFAULT ''",
invited: 'INTEGER NOT NULL DEFAULT 0',
}
const GC_CONTEXT_SNAPSHOTS_SCHEMA: Record<string, string> = {
roomId: 'TEXT PRIMARY KEY',
summary: 'TEXT NOT NULL DEFAULT \'\'',
lastMessageId: 'TEXT NOT NULL',
lastMessageTimestamp: 'INTEGER NOT NULL',
updatedAt: 'INTEGER NOT NULL',
}
const GC_ROOM_MEMBERS_SCHEMA: Record<string, string> = {
id: 'TEXT PRIMARY KEY',
roomId: 'TEXT NOT NULL',
userId: 'TEXT NOT NULL',
userName: 'TEXT NOT NULL',
description: "TEXT NOT NULL DEFAULT ''",
joinedAt: 'INTEGER NOT NULL',
updatedAt: 'INTEGER NOT NULL',
}
let _tablesEnsured = false
interface PendingSessionDelete {
session_id: string
profile_name: string
status: string
attempt_count: number
last_error: string | null
created_at: number
updated_at: number
next_attempt_at: number
}
interface GroupChatSessionProfile {
session_id: string
room_id: string
agent_id: string
profile_name: string
created_at: number
}
export interface PendingSessionDeleteDrainResult {
deleted: string[]
failed: Array<{ sessionId: string; error: string }>
}
class ChatStorage {
private db() { return getDb() }
init(): void {
if (_tablesEnsured) return
const db = this.db()
if (!db) return
ensureTable('gc_rooms', GC_ROOMS_SCHEMA)
ensureTable('gc_messages', GC_MESSAGES_SCHEMA)
ensureTable('gc_room_agents', GC_ROOM_AGENTS_SCHEMA)
ensureTable('gc_context_snapshots', GC_CONTEXT_SNAPSHOTS_SCHEMA)
ensureTable('gc_room_members', GC_ROOM_MEMBERS_SCHEMA)
ensureTable('gc_pending_session_deletes', GC_PENDING_SESSION_DELETES_SCHEMA)
ensureTable('gc_session_profiles', GC_SESSION_PROFILES_SCHEMA)
// Indexes (safe to run multiple times — CREATE INDEX IF NOT EXISTS)
try { db.exec('CREATE INDEX IF NOT EXISTS idx_gc_messages_room ON gc_messages(roomId, timestamp)') } catch { /* ignore */ }
try { db.exec('CREATE INDEX IF NOT EXISTS idx_gc_room_agents_room ON gc_room_agents(roomId)') } catch { /* ignore */ }
try { db.exec('CREATE UNIQUE INDEX IF NOT EXISTS idx_gc_room_members_unique ON gc_room_members(roomId, userId)') } catch { /* ignore */ }
try { db.exec('CREATE INDEX IF NOT EXISTS idx_gc_pending_session_deletes_profile ON gc_pending_session_deletes(profile_name, status, next_attempt_at, created_at)') } catch { /* ignore */ }
try { db.exec('CREATE INDEX IF NOT EXISTS idx_gc_session_profiles_profile ON gc_session_profiles(profile_name, created_at)') } catch { /* ignore */ }
_tablesEnsured = true
}
saveSessionProfile(sessionId: string, roomId: string, agentId: string, profileName: string): void {
this.db()?.prepare(
'INSERT INTO gc_session_profiles (session_id, room_id, agent_id, profile_name, created_at) VALUES (?, ?, ?, ?, ?) ON CONFLICT(session_id) DO UPDATE SET room_id = excluded.room_id, agent_id = excluded.agent_id, profile_name = excluded.profile_name'
).run(sessionId, roomId, agentId, profileName, Date.now())
}
getSessionProfile(sessionId: string): GroupChatSessionProfile | null {
return (this.db()?.prepare(
'SELECT session_id, room_id, agent_id, profile_name, created_at FROM gc_session_profiles WHERE session_id = ?'
).get(sessionId) as GroupChatSessionProfile | undefined) ?? null
}
deleteSessionProfile(sessionId: string): void {
this.db()?.prepare('DELETE FROM gc_session_profiles WHERE session_id = ?').run(sessionId)
}
listPendingSessionDeletes(profileName: string, limit = 50): PendingSessionDelete[] {
const rows = this.db()?.prepare(
`SELECT session_id, profile_name, status, attempt_count, last_error, created_at, updated_at, next_attempt_at
FROM gc_pending_session_deletes
WHERE profile_name = ? AND status = 'pending' AND next_attempt_at <= ?
ORDER BY created_at ASC
LIMIT ?`
).all(profileName, Date.now(), limit) || []
return rows.map((row: any) => ({
session_id: String(row.session_id || ''),
profile_name: String(row.profile_name || ''),
status: String(row.status || 'pending'),
attempt_count: Number(row.attempt_count || 0),
last_error: row.last_error == null ? null : String(row.last_error),
created_at: Number(row.created_at || 0),
updated_at: Number(row.updated_at || 0),
next_attempt_at: Number(row.next_attempt_at || 0),
}))
}
enqueuePendingSessionDelete(sessionId: string, profileName: string): void {
const now = Date.now()
this.db()?.prepare(
`INSERT INTO gc_pending_session_deletes (session_id, profile_name, status, attempt_count, last_error, created_at, updated_at, next_attempt_at)
VALUES (?, ?, 'pending', 0, NULL, ?, ?, 0)
ON CONFLICT(session_id) DO UPDATE SET
profile_name = excluded.profile_name,
status = 'pending',
updated_at = excluded.updated_at,
next_attempt_at = 0`
).run(sessionId, profileName, now, now)
}
claimPendingSessionDeletes(profileName: string, limit = 50): PendingSessionDelete[] {
const rows = this.listPendingSessionDeletes(profileName, limit)
if (rows.length === 0) return []
const now = Date.now()
const stmt = this.db()?.prepare(
`UPDATE gc_pending_session_deletes
SET status = 'processing', updated_at = ?
WHERE session_id = ? AND status = 'pending'`
)
const claimed: PendingSessionDelete[] = []
for (const row of rows) {
const result = stmt?.run(now, row.session_id)
if (result?.changes) {
claimed.push({ ...row, status: 'processing', updated_at: now })
}
}
return claimed
}
markPendingSessionDeleteFailed(sessionId: string, error: string): void {
const now = Date.now()
this.db()?.prepare(
`UPDATE gc_pending_session_deletes
SET status = 'pending',
attempt_count = attempt_count + 1,
last_error = ?,
updated_at = ?,
next_attempt_at = ?
WHERE session_id = ?`
).run(error, now, now + 60_000, sessionId)
}
removePendingSessionDelete(sessionId: string): void {
this.db()?.prepare('DELETE FROM gc_pending_session_deletes WHERE session_id = ?').run(sessionId)
}
getPendingDeletedSessionIds(): Set<string> {
const rows = (this.db()?.prepare(
`SELECT session_id FROM gc_pending_session_deletes WHERE status IN ('pending', 'processing')`
).all() || []) as Array<{ session_id: string }>
return new Set(rows.map(row => row.session_id))
}
// ─── Rooms ────────────────────────────────────────────────
getRoom(roomId: string): { id: string; name: string; inviteCode: string | null; triggerTokens: number; maxHistoryTokens: number; tailMessageCount: number; totalTokens: number } | undefined {
return this.db()?.prepare('SELECT id, name, inviteCode, triggerTokens, maxHistoryTokens, tailMessageCount, totalTokens FROM gc_rooms WHERE id = ?').get(roomId) as any
}
getRoomByInviteCode(code: string): { id: string; name: string; inviteCode: string | null; triggerTokens: number; maxHistoryTokens: number; tailMessageCount: number; totalTokens: number } | undefined {
return this.db()?.prepare('SELECT id, name, inviteCode, triggerTokens, maxHistoryTokens, tailMessageCount, totalTokens FROM gc_rooms WHERE inviteCode = ?').get(code) as any
}
getAllRooms(): { id: string; name: string; inviteCode: string | null; triggerTokens: number; maxHistoryTokens: number; tailMessageCount: number; totalTokens: number }[] {
return (this.db()?.prepare('SELECT id, name, inviteCode, triggerTokens, maxHistoryTokens, tailMessageCount, totalTokens FROM gc_rooms ORDER BY id').all() || []) as any[]
}
saveRoom(id: string, name: string, inviteCode?: string, config?: { triggerTokens?: number; maxHistoryTokens?: number; tailMessageCount?: number }): void {
this.db()?.prepare(
'INSERT OR IGNORE INTO gc_rooms (id, name, inviteCode, triggerTokens, maxHistoryTokens, tailMessageCount) VALUES (?, ?, ?, ?, ?, ?)'
).run(id, name, inviteCode || null, config?.triggerTokens ?? 100000, config?.maxHistoryTokens ?? 32000, config?.tailMessageCount ?? 20)
}
updateRoomConfig(roomId: string, config: { triggerTokens?: number; maxHistoryTokens?: number; tailMessageCount?: number }): void {
const sets: string[] = []
const vals: any[] = []
if (config.triggerTokens !== undefined) { sets.push('triggerTokens = ?'); vals.push(config.triggerTokens) }
if (config.maxHistoryTokens !== undefined) { sets.push('maxHistoryTokens = ?'); vals.push(config.maxHistoryTokens) }
if (config.tailMessageCount !== undefined) { sets.push('tailMessageCount = ?'); vals.push(config.tailMessageCount) }
if (sets.length === 0) return
vals.push(roomId)
this.db()?.prepare(`UPDATE gc_rooms SET ${sets.join(', ')} WHERE id = ?`).run(...vals)
}
updateRoomInviteCode(roomId: string, inviteCode: string): void {
this.db()?.prepare('UPDATE gc_rooms SET inviteCode = ? WHERE id = ?').run(inviteCode, roomId)
}
updateRoomTotalTokens(roomId: string, tokens: number): void {
this.db()?.prepare('UPDATE gc_rooms SET totalTokens = ? WHERE id = ?').run(tokens, roomId)
}
estimateTokens(text: string): number {
const cjk = (text.match(/[\u2e80-\u9fff\uac00-\ud7af\u3000-\u303f\uff00-\uffef]/g) || []).length
const other = text.length - cjk
return Math.ceil(cjk * 1.5 + other / 4)
}
// ─── Messages ─────────────────────────────────────────────
getMessages(roomId: string, limit = 500): ChatMessage[] {
const rows = (this.db()?.prepare(
'SELECT id, roomId, senderId, senderName, content, timestamp FROM gc_messages WHERE roomId = ? ORDER BY timestamp DESC LIMIT ?'
).all(roomId, limit) || []) as any[]
return rows.reverse()
}
addMessage(msg: ChatMessage): void {
this.db()?.prepare(
'INSERT INTO gc_messages (id, roomId, senderId, senderName, content, timestamp) VALUES (?, ?, ?, ?, ?, ?)'
).run(msg.id, msg.roomId, msg.senderId, msg.senderName, msg.content, msg.timestamp)
}
pruneMessages(roomId: string, keep = 500): void {
const db = this.db()
if (!db) return
const count = (db.prepare('SELECT COUNT(*) as c FROM gc_messages WHERE roomId = ?').get(roomId) as any)?.c
if (count > keep) {
const cutoff = db.prepare(
'SELECT timestamp FROM gc_messages WHERE roomId = ? ORDER BY timestamp DESC LIMIT 1 OFFSET ?'
).get(roomId, keep - 1) as any
if (cutoff) {
const result = db.prepare('DELETE FROM gc_messages WHERE roomId = ? AND timestamp < ?').run(roomId, cutoff.timestamp)
logger.info(`[GroupChat] pruned ${result.changes} messages from room ${roomId} (had ${count}, keeping ${keep})`)
}
}
}
// ─── Room Agents ──────────────────────────────────────────
getRoomAgents(roomId: string): RoomAgent[] {
return (this.db()?.prepare(
'SELECT id, roomId, agentId, profile, name, description, invited FROM gc_room_agents WHERE roomId = ?'
).all(roomId) || []) as unknown as RoomAgent[]
}
addRoomAgent(roomId: string, agentId: string, profile: string, name: string, description: string, invited: number): RoomAgent {
const id = Date.now().toString(36) + Math.random().toString(36).slice(2, 8)
this.db()?.prepare(
'INSERT INTO gc_room_agents (id, roomId, agentId, profile, name, description, invited) VALUES (?, ?, ?, ?, ?, ?, ?)'
).run(id, roomId, agentId, profile, name, description, invited)
return { id, roomId, agentId, profile, name, description, invited }
}
removeRoomAgent(agentId: string): void {
this.db()?.prepare('DELETE FROM gc_room_agents WHERE id = ?').run(agentId)
}
// ─── Context Snapshots ──────────────────────────────────
getContextSnapshot(roomId: string): { roomId: string; summary: string; lastMessageId: string; lastMessageTimestamp: number; updatedAt: number } | null {
return (this.db()?.prepare(
'SELECT roomId, summary, lastMessageId, lastMessageTimestamp, updatedAt FROM gc_context_snapshots WHERE roomId = ?'
).get(roomId) as any) ?? null
}
saveContextSnapshot(roomId: string, summary: string, lastMessageId: string, lastMessageTimestamp: number): void {
this.db()?.prepare(
'INSERT INTO gc_context_snapshots (roomId, summary, lastMessageId, lastMessageTimestamp, updatedAt) VALUES (?, ?, ?, ?, ?) ON CONFLICT(roomId) DO UPDATE SET summary = excluded.summary, lastMessageId = excluded.lastMessageId, lastMessageTimestamp = excluded.lastMessageTimestamp, updatedAt = excluded.updatedAt'
).run(roomId, summary, lastMessageId, lastMessageTimestamp, Date.now())
}
deleteContextSnapshot(roomId: string): void {
this.db()?.prepare('DELETE FROM gc_context_snapshots WHERE roomId = ?').run(roomId)
}
deleteRoom(roomId: string): void {
const db = this.db()
if (!db) return
db.prepare('DELETE FROM gc_messages WHERE roomId = ?').run(roomId)
db.prepare('DELETE FROM gc_room_agents WHERE roomId = ?').run(roomId)
db.prepare('DELETE FROM gc_room_members WHERE roomId = ?').run(roomId)
db.prepare('DELETE FROM gc_context_snapshots WHERE roomId = ?').run(roomId)
db.prepare('DELETE FROM gc_rooms WHERE id = ?').run(roomId)
}
// ─── Room Members ──────────────────────────────────────
getRoomMembers(roomId: string): { id: string; userId: string; name: string; description: string; joinedAt: number }[] {
return (this.db()?.prepare(
'SELECT id, userId, userName as name, description, joinedAt FROM gc_room_members WHERE roomId = ? ORDER BY joinedAt'
).all(roomId) || []) as unknown as { id: string; userId: string; name: string; description: string; joinedAt: number }[]
}
addRoomMember(roomId: string, userId: string, userName: string, description: string): void {
const existing = this.getMemberByUserId(roomId, userId)
if (existing) {
// Update name/description on rejoin, refresh updatedAt
this.db()?.prepare(
'UPDATE gc_room_members SET userName = ?, description = ?, updatedAt = ? WHERE roomId = ? AND userId = ?'
).run(userName, description, Date.now(), roomId, userId)
return
}
const id = Date.now().toString(36) + Math.random().toString(36).slice(2, 8)
const now = Date.now()
this.db()?.prepare(
'INSERT INTO gc_room_members (id, roomId, userId, userName, description, joinedAt, updatedAt) VALUES (?, ?, ?, ?, ?, ?, ?)'
).run(id, roomId, userId, userName, description, now, now)
}
getMemberByUserId(roomId: string, userId: string): Member | null {
return (this.db()?.prepare(
'SELECT id, userId, userName as name, description, joinedAt FROM gc_room_members WHERE roomId = ? AND userId = ?'
).get(roomId, userId) as any) ?? null
}
updateMemberActivity(roomId: string, userId: string): void {
this.db()?.prepare(
'UPDATE gc_room_members SET updatedAt = ? WHERE roomId = ? AND userId = ?'
).run(Date.now(), roomId, userId)
}
}
export async function drainPendingSessionDeletes(profileName: string): Promise<PendingSessionDeleteDrainResult> {
const storage = new ChatStorage()
storage.init()
const claimed = storage.claimPendingSessionDeletes(profileName)
const result: PendingSessionDeleteDrainResult = { deleted: [], failed: [] }
for (const item of claimed) {
try {
const ok = await hermesDeleteSession(item.session_id)
if (!ok) {
throw new Error('Failed to delete session')
}
storage.removePendingSessionDelete(item.session_id)
storage.deleteSessionProfile(item.session_id)
result.deleted.push(item.session_id)
} catch (err: any) {
const message = err?.message || 'Failed to delete session'
storage.markPendingSessionDeleteFailed(item.session_id, message)
result.failed.push({ sessionId: item.session_id, error: message })
}
}
return result
}
// ─── ChatRoom (in-memory, for online members) ─────────────────
class ChatRoom {
readonly id: string
name: string
readonly members = new Map<string, Member>()
constructor(id: string, name?: string) {
this.id = id
this.name = name || id
}
addOrUpdateMember(socketId: string, userId: string, name: string, description: string): Member {
const existing = this.members.get(userId)
if (existing) {
existing.name = name
existing.description = description
existing.online = true
existing.socketId = socketId
return existing
}
const member: Member = { id: socketId, userId, name, description, joinedAt: Date.now(), online: true, socketId }
this.members.set(userId, member)
return member
}
removeMember(socketId: string): void {
for (const member of this.members.values()) {
if (member.socketId === socketId) {
member.online = false
break
}
}
}
getMembersList(): Member[] {
return Array.from(this.members.values())
}
getOnlineMemberBySocketId(socketId: string): Member | undefined {
for (const member of this.members.values()) {
if (member.socketId === socketId && member.online) return member
}
return undefined
}
hasOnlineMember(socketId: string): boolean {
return this.getOnlineMemberBySocketId(socketId) !== undefined
}
}
// ─── GroupChat Server ────────────────────────────────────────
export class GroupChatServer {
private io: Server
private nsp: Namespace
private storage: ChatStorage
private rooms = new Map<string, ChatRoom>()
/** Map: socket.id → persistent userId */
private socketUserMap = new Map<string, string>()
/** Map: userId → { name, description } (from auth) */
private userInfoMap = new Map<string, { name: string; description: string }>()
readonly agentClients = new AgentClients()
private _contextEngine: ContextEngine | null = null
private _restoreScheduled = false
/** roomId -> (userId -> { userName, timer }) */
private typingState = new Map<string, Map<string, { userName: string; timer: ReturnType<typeof setTimeout> }>>()
/** roomId -> (agentName -> { agentName, status }) */
private contextStatusState = new Map<string, Map<string, { agentName: string; status: string }>>()
setGatewayManager(manager: any): void {
this.agentClients.setGatewayManager(manager)
if (this._contextEngine && manager) {
this._contextEngine.setUpstream(manager.getUpstream(''), manager.getApiKey(''))
}
}
constructor(httpServer: HttpServer) {
this.storage = new ChatStorage()
this.storage.init()
this.io = new Server(httpServer, {
cors: { origin: '*' }
})
this.nsp = this.io.of('/group-chat')
this.nsp.use(this.authMiddleware.bind(this))
this.nsp.on('connection', this.onConnection.bind(this))
// Restore persisted rooms into memory
this.storage.getAllRooms().forEach((row) => {
this.rooms.set(row.id, new ChatRoom(row.id, row.name))
})
logger.info('[GroupChat] Socket.IO ready at /group-chat')
// Initialize context engine for group chat compression
const contextEngine = new ContextEngine({
messageFetcher: this.storage,
sessionCleaner: async (sessionId: string) => {
try {
await hermesDeleteSession(sessionId)
} catch (err: any) {
logger.warn(`[GroupChat] failed to delete compression session ${sessionId}: ${err.message}`)
}
},
})
this.agentClients.setContextEngine(contextEngine)
this.agentClients.setStorage(this.storage)
this._contextEngine = contextEngine
// Restore agent connections — call restoreAgents() after server is listening
this._restoreScheduled = false
}
getIO(): Server {
return this.io
}
getStorage(): ChatStorage {
return this.storage
}
getContextEngine(): ContextEngine | null {
return this._contextEngine || null
}
getRoomIds(): string[] {
return Array.from(this.rooms.keys())
}
// ─── Restore Agents ─────────────────────────────────────────
/**
* Restore persisted agent connections. Safe to call multiple times;
* will only execute once.
*/
async restoreWhenReady(): Promise<void> {
if (this._restoreScheduled) return
this._restoreScheduled = true
await this.restoreAgents()
}
private async restoreAgents(): Promise<void> {
const rooms = this.storage.getAllRooms()
let total = 0
for (const room of rooms) {
const agents = this.storage.getRoomAgents(room.id)
for (const agent of agents) {
try {
const client = await this.agentClients.createAgent({
profile: agent.profile,
name: agent.name,
description: agent.description,
invited: agent.invited,
})
await this.agentClients.addAgentToRoom(room.id, client)
total++
} catch (err: any) {
logger.error(`[GroupChat] Failed to restore agent ${agent.name} in room ${room.id}: ${err.message}`)
}
}
}
if (total > 0) {
logger.info(`[GroupChat] Restored ${total} agent(s) across ${rooms.length} room(s)`)
}
}
// ─── Auth ───────────────────────────────────────────────────
private async authMiddleware(socket: Socket, next: (err?: Error) => void): Promise<void> {
const authToken = await getToken()
const token = socket.handshake.auth.token || socket.handshake.query.token || ''
if (authToken) {
if (token !== authToken) {
return next(new Error('Unauthorized'))
}
}
next()
}
// ─── Connection ─────────────────────────────────────────────
private onConnection(socket: Socket): void {
const auth = socket.handshake.auth as { userId?: string; name?: string; description?: string }
const userId = auth.userId || socket.id
const userName = auth.name || `User-${userId.slice(0, 6)}`
const description = auth.description || ''
this.socketUserMap.set(socket.id, userId)
this.userInfoMap.set(userId, { name: userName, description })
logger.debug(`[GroupChat] Connected: ${userName} (socket=${socket.id}, user=${userId})`)
socket.on('join', (data: { roomId?: string; name?: string }, ack?: (response?: unknown) => void) => this.handleJoin(socket, data, ack))
socket.on('message', (data: { roomId?: string; content: string }, ack?: (response?: unknown) => void) => this.handleMessage(socket, data, ack))
socket.on('typing', (data: { roomId?: string }) => this.handleTyping(socket, data))
socket.on('stop_typing', (data: { roomId?: string }) => this.handleStopTyping(socket, data))
socket.on('context_status', (data: { roomId?: string; agentName?: string; status?: string }) => this.handleContextStatus(socket, data))
socket.on('disconnect', () => this.handleDisconnect(socket))
}
// ─── Handlers ───────────────────────────────────────────────
private handleJoin(socket: Socket, data: { roomId?: string; name?: string; description?: string }, ack?: (res: any) => void): void {
const socketId = socket.id
const userId = this.socketUserMap.get(socketId) || socketId
const userInfo = this.userInfoMap.get(userId) || { name: `User-${userId.slice(0, 6)}`, description: '' }
const userName = data.name || userInfo.name
const description = data.description || userInfo.description
// Update stored user info
this.userInfoMap.set(userId, { name: userName, description })
const roomId = data.roomId || 'general'
let room = this.rooms.get(roomId)
if (!room) {
room = new ChatRoom(roomId)
this.rooms.set(roomId, room)
this.storage.saveRoom(roomId, roomId)
}
// Persist member to SQLite
this.storage.addRoomMember(roomId, userId, userName, description)
// Add to in-memory online members (keyed by userId)
room.addOrUpdateMember(socketId, userId, userName, description)
socket.join(roomId)
socket.to(roomId).emit('member_joined', {
roomId,
memberId: userId,
memberName: userName,
members: room.getMembersList(),
})
// Load history from SQLite
const messages = this.storage.getMessages(roomId)
const agents = this.storage.getRoomAgents(roomId)
ack?.({
roomId,
roomName: room.name,
members: room.getMembersList(),
messages,
agents,
rooms: this.getRoomIds(),
typingUsers: this.getTypingUsers(roomId),
contextStatuses: this.getContextStatuses(roomId),
})
logger.debug(`[GroupChat] ${userName} (user=${userId}) joined room: ${roomId}`)
}
private handleMessage(socket: Socket, data: { roomId?: string; content: string }, ack?: (res: any) => void): void {
const socketId = socket.id
const roomId = data.roomId || 'general'
const room = this.rooms.get(roomId)
if (!room || !room.hasOnlineMember(socketId)) {
ack?.({ error: 'Not in room' })
return
}
const member = room.getOnlineMemberBySocketId(socketId)
const userId = member?.userId || socketId
const userName = member?.name || `User-${socketId.slice(0, 6)}`
const msg: ChatMessage = {
id: this.generateId(),
roomId,
senderId: userId,
senderName: userName,
content: data.content,
timestamp: Date.now(),
}
this.storage.addMessage(msg)
this.storage.pruneMessages(roomId)
// Recalculate total tokens for the room
const messages = this.storage.getMessages(roomId)
const totalTokens = this.storage.estimateTokens(messages.map(m => m.content + m.senderName).join(''))
this.storage.updateRoomTotalTokens(roomId, totalTokens)
this.nsp.to(roomId).emit('message', msg)
this.nsp.to(roomId).emit('room_updated', { roomId, totalTokens })
ack?.({ id: msg.id })
// Server-side @mention routing — parse mentions and invoke agents directly
this.agentClients.processMentions(roomId, {
content: msg.content,
senderName: msg.senderName,
senderId: msg.senderId,
timestamp: msg.timestamp,
}).catch((err) => {
logger.error(`[GroupChat] processMentions error: ${err.message}`)
})
}
private handleTyping(socket: Socket, data: { roomId?: string }): void {
const roomId = data.roomId || 'general'
const userId = this.socketUserMap.get(socket.id) || socket.id
const userName = this.userInfoMap.get(userId)?.name || `User-${socket.id.slice(0, 6)}`
// Track typing state for rejoin recovery
let roomTyping = this.typingState.get(roomId)
if (!roomTyping) {
roomTyping = new Map()
this.typingState.set(roomId, roomTyping)
}
const existing = roomTyping.get(userId)
if (existing) clearTimeout(existing.timer)
roomTyping.set(userId, {
userName,
timer: setTimeout(() => {
roomTyping!.delete(userId)
if (roomTyping!.size === 0) this.typingState.delete(roomId)
}, 30000),
})
socket.to(roomId).emit('typing', {
roomId,
userId,
userName,
})
}
private handleStopTyping(socket: Socket, data: { roomId?: string }): void {
const roomId = data.roomId || 'general'
const userId = this.socketUserMap.get(socket.id) || socket.id
// Remove from typing state
const roomTyping = this.typingState.get(roomId)
if (roomTyping) {
const entry = roomTyping.get(userId)
if (entry) clearTimeout(entry.timer)
roomTyping.delete(userId)
if (roomTyping.size === 0) this.typingState.delete(roomId)
}
socket.to(roomId).emit('stop_typing', {
roomId,
userId,
})
}
private handleContextStatus(socket: Socket, data: { roomId?: string; agentName?: string; status?: string }): void {
const roomId = data.roomId || 'general'
const agentName = data.agentName || ''
const status = data.status || ''
if (!agentName) return
let roomStatuses = this.contextStatusState.get(roomId)
if (!roomStatuses) {
roomStatuses = new Map()
this.contextStatusState.set(roomId, roomStatuses)
}
if (status === 'ready') {
roomStatuses.delete(agentName)
if (roomStatuses.size === 0) this.contextStatusState.delete(roomId)
} else {
roomStatuses.set(agentName, { agentName, status })
}
// Relay to all other sockets in the room
socket.to(roomId).emit('context_status', {
roomId,
agentName,
status,
})
}
private handleDisconnect(socket: Socket): void {
const socketId = socket.id
const userId = this.socketUserMap.get(socketId)
const userName = userId ? this.userInfoMap.get(userId)?.name : undefined
logger.debug(`[GroupChat] Disconnected: ${userName || socketId} (socket=${socketId}, user=${userId || socketId})`)
// Clean up typing state for this socket
for (const [roomId, roomTyping] of this.typingState) {
const entry = roomTyping.get(userId || socketId)
if (entry) {
clearTimeout(entry.timer)
roomTyping.delete(userId || socketId)
if (roomTyping.size === 0) this.typingState.delete(roomId)
}
}
this.leaveAllRooms(socket, socketId)
this.socketUserMap.delete(socketId)
// Don't delete userInfoMap — it persists across reconnects
}
// ─── Helpers ────────────────────────────────────────────────
private getTypingUsers(roomId: string): Array<{ userId: string; userName: string }> {
const roomTyping = this.typingState.get(roomId)
if (!roomTyping) return []
return Array.from(roomTyping.entries()).map(([userId, entry]) => ({ userId, userName: entry.userName }))
}
private getContextStatuses(roomId: string): Array<{ agentName: string; status: string }> {
const roomStatuses = this.contextStatusState.get(roomId)
if (!roomStatuses) return []
return Array.from(roomStatuses.values())
}
private leaveAllRooms(socket: Socket, socketId: string): void {
this.rooms.forEach((room, rid) => {
if (room.hasOnlineMember(socketId)) {
const member = room.getOnlineMemberBySocketId(socketId)
room.removeMember(socketId)
socket.leave(rid)
this.nsp.to(rid).emit('member_left', {
roomId: rid,
memberId: member?.userId || socketId,
memberName: member?.name || `User-${socketId.slice(0, 6)}`,
members: room.getMembersList(),
})
}
})
}
private generateId(): string {
return Date.now().toString(36) + Math.random().toString(36).slice(2, 8)
}
}