diff --git a/packages/client/src/stores/hermes/group-chat.ts b/packages/client/src/stores/hermes/group-chat.ts index fa03c01..5b05e0e 100644 --- a/packages/client/src/stores/hermes/group-chat.ts +++ b/packages/client/src/stores/hermes/group-chat.ts @@ -70,10 +70,35 @@ function uid(): string { return Date.now().toString(36) + Math.random().toString(36).slice(2, 8) } +const STREAM_FINAL_CONTENT_RECOVERY_DELAY_MS = 300 + function normalizeLocalFilePath(path: string): string { return /^[a-zA-Z]:\\/.test(path) ? path.replace(/\\/g, '/') : path } +function hasText(value?: string | null): boolean { + return !!value?.trim() +} + +function hasToolCalls(message: ChatMessage): boolean { + return !!message.tool_calls?.length +} + +function needsFinalContentRecovery(message: ChatMessage): boolean { + return message.role === 'assistant' && !hasText(message.content) && hasText(message.reasoning) && !hasToolCalls(message) +} + +function mergeFinalMessage(existing: ChatMessage | null, msg: ChatMessage): ChatMessage { + return { + ...msg, + content: hasText(msg.content) ? msg.content : existing?.content || msg.content || '', + reasoning: hasText(msg.reasoning) ? msg.reasoning : existing?.reasoning ?? msg.reasoning ?? null, + reasoning_content: hasText(msg.reasoning_content) ? msg.reasoning_content : existing?.reasoning_content ?? msg.reasoning_content ?? null, + isStreaming: false, + attachments: existing?.attachments || msg.attachments, + } +} + export interface GroupPendingApproval { roomId: string agentName: string @@ -111,6 +136,31 @@ export const useGroupChatStore = defineStore('groupChat', () => { })) } + async function recoverMissingFinalContent(roomId: string, messageId: string) { + if (currentRoomId.value !== roomId) return + const idx = messages.value.findIndex(m => m.id === messageId) + if (idx < 0 || !needsFinalContentRecovery(messages.value[idx])) return + + try { + const res = await getRoomDetail(roomId) + const recovered = res.messages.find(m => m.id === messageId) + if (!recovered || !hasText(recovered.content)) return + + const currentIdx = messages.value.findIndex(m => m.id === messageId) + if (currentIdx < 0 || !needsFinalContentRecovery(messages.value[currentIdx])) return + messages.value[currentIdx] = mergeFinalMessage(messages.value[currentIdx], recovered) + messages.value = [...messages.value] + } catch { + // Keep the reasoning-only bubble visible; a later final message event can still merge it. + } + } + + function scheduleMissingFinalContentRecovery(roomId: string, messageId: string) { + setTimeout(() => { + void recoverMissingFinalContent(roomId, messageId) + }, STREAM_FINAL_CONTENT_RECOVERY_DELAY_MS) + } + // Computed: returns first active status for backward compat const contextStatus = computed(() => { for (const [, status] of contextStatuses.value) { @@ -176,11 +226,7 @@ export const useGroupChatStore = defineStore('groupChat', () => { if (msg.roomId === currentRoomId.value) { const idx = messages.value.findIndex(m => m.id === msg.id) const existing = idx >= 0 ? messages.value[idx] : null - const resolvedMsg = { - ...msg, - isStreaming: false, - attachments: existing?.attachments, - } + const resolvedMsg = mergeFinalMessage(existing, msg) if (idx >= 0) { messages.value[idx] = resolvedMsg messages.value = [...messages.value] @@ -207,7 +253,16 @@ export const useGroupChatStore = defineStore('groupChat', () => { msg.isStreaming = true const idx = messages.value.findIndex(m => m.id === msg.id) if (idx >= 0) { - messages.value[idx] = { ...messages.value[idx], ...msg, isStreaming: true } + const existing = messages.value[idx] + if (!existing.isStreaming) return + messages.value[idx] = { + ...existing, + ...msg, + content: hasText(msg.content) ? msg.content : existing.content || '', + reasoning: hasText(msg.reasoning) ? msg.reasoning : existing.reasoning, + reasoning_content: hasText(msg.reasoning_content) ? msg.reasoning_content : existing.reasoning_content, + isStreaming: true, + } messages.value = [...messages.value] } else { messages.value.push(msg) @@ -217,7 +272,7 @@ export const useGroupChatStore = defineStore('groupChat', () => { socket.on('message_stream_delta', (data: { roomId: string; id: string; delta: string }) => { if (data.roomId !== currentRoomId.value) return const idx = messages.value.findIndex(m => m.id === data.id) - if (idx < 0) return + if (idx < 0 || !messages.value[idx].isStreaming) return messages.value[idx] = { ...messages.value[idx], content: messages.value[idx].content + data.delta, @@ -228,7 +283,7 @@ export const useGroupChatStore = defineStore('groupChat', () => { socket.on('message_reasoning_delta', (data: { roomId: string; id: string; delta: string }) => { if (data.roomId !== currentRoomId.value) return const idx = messages.value.findIndex(m => m.id === data.id) - if (idx < 0) return + if (idx < 0 || !messages.value[idx].isStreaming) return messages.value[idx] = { ...messages.value[idx], reasoning: (messages.value[idx].reasoning || '') + data.delta, @@ -254,6 +309,9 @@ export const useGroupChatStore = defineStore('groupChat', () => { isStreaming: false, } messages.value = [...messages.value] + if (needsFinalContentRecovery(messages.value[idx])) { + scheduleMissingFinalContentRecovery(data.roomId, data.id) + } } }) diff --git a/tests/client/group-chat-store-streaming.test.ts b/tests/client/group-chat-store-streaming.test.ts new file mode 100644 index 0000000..2c3d213 --- /dev/null +++ b/tests/client/group-chat-store-streaming.test.ts @@ -0,0 +1,224 @@ +// @vitest-environment jsdom +import { beforeEach, describe, expect, it, vi } from 'vitest' +import { createPinia, setActivePinia } from 'pinia' +import type { ChatMessage, RoomInfo } from '@/api/hermes/group-chat' + +const groupChatApiMock = vi.hoisted(() => { + const handlers = new Map() + const socket: any = { + connected: true, + id: 'socket-1', + on: vi.fn((event: string, cb: Function) => { + const existing = handlers.get(event) || [] + existing.push(cb) + handlers.set(event, existing) + return socket + }), + emit: vi.fn((event: string, _data?: unknown, ack?: Function) => { + if (event === 'join' && ack) ack({ members: [], agents: [], typingUsers: [], contextStatuses: [] }) + return socket + }), + disconnect: vi.fn(), + } + return { + handlers, + socket, + connectGroupChat: vi.fn(() => socket), + disconnectGroupChat: vi.fn(), + getSocket: vi.fn(() => socket), + getStoredUserId: vi.fn(() => 'user-1'), + getStoredUserName: vi.fn(() => 'tester'), + createRoom: vi.fn(), + listRooms: vi.fn(), + getRoomDetail: vi.fn(), + joinRoomByCode: vi.fn(), + addAgent: vi.fn(), + listAgents: vi.fn(), + removeAgent: vi.fn(), + cloneRoom: vi.fn(), + deleteRoom: vi.fn(), + clearRoomContext: vi.fn(), + } +}) + +vi.mock('@/api/hermes/group-chat', () => groupChatApiMock) +vi.mock('@/api/client', () => ({ getApiKey: vi.fn(() => 'test-token') })) +vi.mock('@/api/hermes/download', () => ({ getDownloadUrl: vi.fn((path: string) => `/download?path=${path}`) })) + +function emitSocket(event: string, payload: unknown) { + for (const cb of groupChatApiMock.handlers.get(event) || []) cb(payload) +} + +const room: RoomInfo = { + id: 'room-1', + name: 'Test Room', + inviteCode: 'ROOM1', +} + +function assistantMessage(overrides: Partial): ChatMessage { + return { + id: 'msg-1', + roomId: 'room-1', + senderId: 'agent-1', + senderName: 'bot', + content: '', + timestamp: 1, + role: 'assistant', + ...overrides, + } +} + +async function createJoinedStore(initialMessages: ChatMessage[] = []) { + groupChatApiMock.getRoomDetail.mockResolvedValue({ + room, + messages: initialMessages, + agents: [], + members: [], + }) + const { useGroupChatStore } = await import('@/stores/hermes/group-chat') + const store = useGroupChatStore() + store.connect() + await store.joinRoom('room-1') + groupChatApiMock.getRoomDetail.mockClear() + return store +} + +describe('group chat store streaming merge', () => { + beforeEach(() => { + vi.useRealTimers() + setActivePinia(createPinia()) + groupChatApiMock.handlers.clear() + for (const key of Object.keys(groupChatApiMock)) { + const value = (groupChatApiMock as any)[key] + if (value?.mockReset && key !== 'socket') value.mockReset() + } + groupChatApiMock.connectGroupChat.mockReturnValue(groupChatApiMock.socket) + groupChatApiMock.getSocket.mockReturnValue(groupChatApiMock.socket) + groupChatApiMock.getStoredUserId.mockReturnValue('user-1') + groupChatApiMock.getStoredUserName.mockReturnValue('tester') + groupChatApiMock.socket.on.mockClear() + groupChatApiMock.socket.emit.mockClear() + groupChatApiMock.socket.disconnect.mockClear() + }) + + it('preserves streamed reasoning when the final message supplies content only', async () => { + const store = await createJoinedStore() + + emitSocket('message_stream_start', assistantMessage({ id: 'msg-1' })) + emitSocket('message_reasoning_delta', { roomId: 'room-1', id: 'msg-1', delta: 'thinking...' }) + emitSocket('message', assistantMessage({ id: 'msg-1', content: '收到', reasoning: null, reasoning_content: null })) + + expect(store.messages).toHaveLength(1) + expect(store.messages[0]).toMatchObject({ + id: 'msg-1', + content: '收到', + reasoning: 'thinking...', + reasoning_content: 'thinking...', + isStreaming: false, + }) + }) + + it('preserves streamed content when the final message payload is blank', async () => { + const store = await createJoinedStore() + + emitSocket('message_stream_start', assistantMessage({ id: 'msg-1' })) + emitSocket('message_stream_delta', { roomId: 'room-1', id: 'msg-1', delta: 'final' }) + emitSocket('message_stream_delta', { roomId: 'room-1', id: 'msg-1', delta: ' answer' }) + emitSocket('message', assistantMessage({ id: 'msg-1', content: '', reasoning: 'thinking...' })) + + expect(store.messages).toHaveLength(1) + expect(store.messages[0]).toMatchObject({ + id: 'msg-1', + content: 'final answer', + reasoning: 'thinking...', + isStreaming: false, + }) + }) + + it('ignores late content deltas for a completed message', async () => { + const store = await createJoinedStore() + + emitSocket('message', assistantMessage({ id: 'msg-1', content: 'final answer', reasoning: 'thinking...' })) + emitSocket('message_stream_delta', { roomId: 'room-1', id: 'msg-1', delta: ' stale' }) + + expect(store.messages).toHaveLength(1) + expect(store.messages[0]).toMatchObject({ + id: 'msg-1', + content: 'final answer', + reasoning: 'thinking...', + isStreaming: false, + }) + }) + + it('ignores late reasoning deltas for a completed message', async () => { + const store = await createJoinedStore() + + emitSocket('message', assistantMessage({ id: 'msg-1', content: 'final answer', reasoning: 'thinking...' })) + emitSocket('message_reasoning_delta', { roomId: 'room-1', id: 'msg-1', delta: ' stale' }) + + expect(store.messages).toHaveLength(1) + expect(store.messages[0]).toMatchObject({ + id: 'msg-1', + content: 'final answer', + reasoning: 'thinking...', + isStreaming: false, + }) + }) + + it('ignores a late empty stream start for a completed message', async () => { + const store = await createJoinedStore() + + emitSocket('message', assistantMessage({ id: 'msg-1', content: 'final answer', reasoning: 'thinking...' })) + emitSocket('message_stream_start', assistantMessage({ id: 'msg-1', content: '', timestamp: 2 })) + + expect(store.messages).toHaveLength(1) + expect(store.messages[0]).toMatchObject({ + id: 'msg-1', + content: 'final answer', + reasoning: 'thinking...', + isStreaming: false, + }) + }) + + it('ignores a late stream start for a completed empty tool-call message', async () => { + const store = await createJoinedStore() + const toolCalls = [{ id: 'tool-1', type: 'function', function: { name: 'lookup', arguments: '{}' } }] + + emitSocket('message', assistantMessage({ id: 'msg-1', content: '', tool_calls: toolCalls })) + emitSocket('message_stream_start', assistantMessage({ id: 'msg-1', content: '', timestamp: 2 })) + emitSocket('message_stream_delta', { roomId: 'room-1', id: 'msg-1', delta: ' stale' }) + + expect(store.messages).toHaveLength(1) + expect(store.messages[0]).toMatchObject({ + id: 'msg-1', + content: '', + tool_calls: toolCalls, + isStreaming: false, + }) + }) + + it('refetches room detail when a stream ends with reasoning but no final content', async () => { + vi.useFakeTimers() + const store = await createJoinedStore() + groupChatApiMock.getRoomDetail.mockResolvedValue({ + room, + agents: [], + members: [], + messages: [assistantMessage({ id: 'msg-1', content: 'final from db', reasoning: 'thinking...' })], + }) + + emitSocket('message_stream_start', assistantMessage({ id: 'msg-1' })) + emitSocket('message_reasoning_delta', { roomId: 'room-1', id: 'msg-1', delta: 'thinking...' }) + emitSocket('message_stream_end', { roomId: 'room-1', id: 'msg-1' }) + + await vi.runAllTimersAsync() + + expect(groupChatApiMock.getRoomDetail).toHaveBeenCalledWith('room-1') + expect(store.messages[0]).toMatchObject({ + id: 'msg-1', + content: 'final from db', + reasoning: 'thinking...', + isStreaming: false, + }) + }) +})