[verified] Fix group chat final stream rendering (#859)
This commit is contained in:
@@ -70,10 +70,35 @@ function uid(): string {
|
|||||||
return Date.now().toString(36) + Math.random().toString(36).slice(2, 8)
|
return Date.now().toString(36) + Math.random().toString(36).slice(2, 8)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const STREAM_FINAL_CONTENT_RECOVERY_DELAY_MS = 300
|
||||||
|
|
||||||
function normalizeLocalFilePath(path: string): string {
|
function normalizeLocalFilePath(path: string): string {
|
||||||
return /^[a-zA-Z]:\\/.test(path) ? path.replace(/\\/g, '/') : path
|
return /^[a-zA-Z]:\\/.test(path) ? path.replace(/\\/g, '/') : path
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function hasText(value?: string | null): boolean {
|
||||||
|
return !!value?.trim()
|
||||||
|
}
|
||||||
|
|
||||||
|
function hasToolCalls(message: ChatMessage): boolean {
|
||||||
|
return !!message.tool_calls?.length
|
||||||
|
}
|
||||||
|
|
||||||
|
function needsFinalContentRecovery(message: ChatMessage): boolean {
|
||||||
|
return message.role === 'assistant' && !hasText(message.content) && hasText(message.reasoning) && !hasToolCalls(message)
|
||||||
|
}
|
||||||
|
|
||||||
|
function mergeFinalMessage(existing: ChatMessage | null, msg: ChatMessage): ChatMessage {
|
||||||
|
return {
|
||||||
|
...msg,
|
||||||
|
content: hasText(msg.content) ? msg.content : existing?.content || msg.content || '',
|
||||||
|
reasoning: hasText(msg.reasoning) ? msg.reasoning : existing?.reasoning ?? msg.reasoning ?? null,
|
||||||
|
reasoning_content: hasText(msg.reasoning_content) ? msg.reasoning_content : existing?.reasoning_content ?? msg.reasoning_content ?? null,
|
||||||
|
isStreaming: false,
|
||||||
|
attachments: existing?.attachments || msg.attachments,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export interface GroupPendingApproval {
|
export interface GroupPendingApproval {
|
||||||
roomId: string
|
roomId: string
|
||||||
agentName: string
|
agentName: string
|
||||||
@@ -111,6 +136,31 @@ export const useGroupChatStore = defineStore('groupChat', () => {
|
|||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function recoverMissingFinalContent(roomId: string, messageId: string) {
|
||||||
|
if (currentRoomId.value !== roomId) return
|
||||||
|
const idx = messages.value.findIndex(m => m.id === messageId)
|
||||||
|
if (idx < 0 || !needsFinalContentRecovery(messages.value[idx])) return
|
||||||
|
|
||||||
|
try {
|
||||||
|
const res = await getRoomDetail(roomId)
|
||||||
|
const recovered = res.messages.find(m => m.id === messageId)
|
||||||
|
if (!recovered || !hasText(recovered.content)) return
|
||||||
|
|
||||||
|
const currentIdx = messages.value.findIndex(m => m.id === messageId)
|
||||||
|
if (currentIdx < 0 || !needsFinalContentRecovery(messages.value[currentIdx])) return
|
||||||
|
messages.value[currentIdx] = mergeFinalMessage(messages.value[currentIdx], recovered)
|
||||||
|
messages.value = [...messages.value]
|
||||||
|
} catch {
|
||||||
|
// Keep the reasoning-only bubble visible; a later final message event can still merge it.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function scheduleMissingFinalContentRecovery(roomId: string, messageId: string) {
|
||||||
|
setTimeout(() => {
|
||||||
|
void recoverMissingFinalContent(roomId, messageId)
|
||||||
|
}, STREAM_FINAL_CONTENT_RECOVERY_DELAY_MS)
|
||||||
|
}
|
||||||
|
|
||||||
// Computed: returns first active status for backward compat
|
// Computed: returns first active status for backward compat
|
||||||
const contextStatus = computed(() => {
|
const contextStatus = computed(() => {
|
||||||
for (const [, status] of contextStatuses.value) {
|
for (const [, status] of contextStatuses.value) {
|
||||||
@@ -176,11 +226,7 @@ export const useGroupChatStore = defineStore('groupChat', () => {
|
|||||||
if (msg.roomId === currentRoomId.value) {
|
if (msg.roomId === currentRoomId.value) {
|
||||||
const idx = messages.value.findIndex(m => m.id === msg.id)
|
const idx = messages.value.findIndex(m => m.id === msg.id)
|
||||||
const existing = idx >= 0 ? messages.value[idx] : null
|
const existing = idx >= 0 ? messages.value[idx] : null
|
||||||
const resolvedMsg = {
|
const resolvedMsg = mergeFinalMessage(existing, msg)
|
||||||
...msg,
|
|
||||||
isStreaming: false,
|
|
||||||
attachments: existing?.attachments,
|
|
||||||
}
|
|
||||||
if (idx >= 0) {
|
if (idx >= 0) {
|
||||||
messages.value[idx] = resolvedMsg
|
messages.value[idx] = resolvedMsg
|
||||||
messages.value = [...messages.value]
|
messages.value = [...messages.value]
|
||||||
@@ -207,7 +253,16 @@ export const useGroupChatStore = defineStore('groupChat', () => {
|
|||||||
msg.isStreaming = true
|
msg.isStreaming = true
|
||||||
const idx = messages.value.findIndex(m => m.id === msg.id)
|
const idx = messages.value.findIndex(m => m.id === msg.id)
|
||||||
if (idx >= 0) {
|
if (idx >= 0) {
|
||||||
messages.value[idx] = { ...messages.value[idx], ...msg, isStreaming: true }
|
const existing = messages.value[idx]
|
||||||
|
if (!existing.isStreaming) return
|
||||||
|
messages.value[idx] = {
|
||||||
|
...existing,
|
||||||
|
...msg,
|
||||||
|
content: hasText(msg.content) ? msg.content : existing.content || '',
|
||||||
|
reasoning: hasText(msg.reasoning) ? msg.reasoning : existing.reasoning,
|
||||||
|
reasoning_content: hasText(msg.reasoning_content) ? msg.reasoning_content : existing.reasoning_content,
|
||||||
|
isStreaming: true,
|
||||||
|
}
|
||||||
messages.value = [...messages.value]
|
messages.value = [...messages.value]
|
||||||
} else {
|
} else {
|
||||||
messages.value.push(msg)
|
messages.value.push(msg)
|
||||||
@@ -217,7 +272,7 @@ export const useGroupChatStore = defineStore('groupChat', () => {
|
|||||||
socket.on('message_stream_delta', (data: { roomId: string; id: string; delta: string }) => {
|
socket.on('message_stream_delta', (data: { roomId: string; id: string; delta: string }) => {
|
||||||
if (data.roomId !== currentRoomId.value) return
|
if (data.roomId !== currentRoomId.value) return
|
||||||
const idx = messages.value.findIndex(m => m.id === data.id)
|
const idx = messages.value.findIndex(m => m.id === data.id)
|
||||||
if (idx < 0) return
|
if (idx < 0 || !messages.value[idx].isStreaming) return
|
||||||
messages.value[idx] = {
|
messages.value[idx] = {
|
||||||
...messages.value[idx],
|
...messages.value[idx],
|
||||||
content: messages.value[idx].content + data.delta,
|
content: messages.value[idx].content + data.delta,
|
||||||
@@ -228,7 +283,7 @@ export const useGroupChatStore = defineStore('groupChat', () => {
|
|||||||
socket.on('message_reasoning_delta', (data: { roomId: string; id: string; delta: string }) => {
|
socket.on('message_reasoning_delta', (data: { roomId: string; id: string; delta: string }) => {
|
||||||
if (data.roomId !== currentRoomId.value) return
|
if (data.roomId !== currentRoomId.value) return
|
||||||
const idx = messages.value.findIndex(m => m.id === data.id)
|
const idx = messages.value.findIndex(m => m.id === data.id)
|
||||||
if (idx < 0) return
|
if (idx < 0 || !messages.value[idx].isStreaming) return
|
||||||
messages.value[idx] = {
|
messages.value[idx] = {
|
||||||
...messages.value[idx],
|
...messages.value[idx],
|
||||||
reasoning: (messages.value[idx].reasoning || '') + data.delta,
|
reasoning: (messages.value[idx].reasoning || '') + data.delta,
|
||||||
@@ -254,6 +309,9 @@ export const useGroupChatStore = defineStore('groupChat', () => {
|
|||||||
isStreaming: false,
|
isStreaming: false,
|
||||||
}
|
}
|
||||||
messages.value = [...messages.value]
|
messages.value = [...messages.value]
|
||||||
|
if (needsFinalContentRecovery(messages.value[idx])) {
|
||||||
|
scheduleMissingFinalContentRecovery(data.roomId, data.id)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,224 @@
|
|||||||
|
// @vitest-environment jsdom
|
||||||
|
import { beforeEach, describe, expect, it, vi } from 'vitest'
|
||||||
|
import { createPinia, setActivePinia } from 'pinia'
|
||||||
|
import type { ChatMessage, RoomInfo } from '@/api/hermes/group-chat'
|
||||||
|
|
||||||
|
const groupChatApiMock = vi.hoisted(() => {
|
||||||
|
const handlers = new Map<string, Function[]>()
|
||||||
|
const socket: any = {
|
||||||
|
connected: true,
|
||||||
|
id: 'socket-1',
|
||||||
|
on: vi.fn((event: string, cb: Function) => {
|
||||||
|
const existing = handlers.get(event) || []
|
||||||
|
existing.push(cb)
|
||||||
|
handlers.set(event, existing)
|
||||||
|
return socket
|
||||||
|
}),
|
||||||
|
emit: vi.fn((event: string, _data?: unknown, ack?: Function) => {
|
||||||
|
if (event === 'join' && ack) ack({ members: [], agents: [], typingUsers: [], contextStatuses: [] })
|
||||||
|
return socket
|
||||||
|
}),
|
||||||
|
disconnect: vi.fn(),
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
handlers,
|
||||||
|
socket,
|
||||||
|
connectGroupChat: vi.fn(() => socket),
|
||||||
|
disconnectGroupChat: vi.fn(),
|
||||||
|
getSocket: vi.fn(() => socket),
|
||||||
|
getStoredUserId: vi.fn(() => 'user-1'),
|
||||||
|
getStoredUserName: vi.fn(() => 'tester'),
|
||||||
|
createRoom: vi.fn(),
|
||||||
|
listRooms: vi.fn(),
|
||||||
|
getRoomDetail: vi.fn(),
|
||||||
|
joinRoomByCode: vi.fn(),
|
||||||
|
addAgent: vi.fn(),
|
||||||
|
listAgents: vi.fn(),
|
||||||
|
removeAgent: vi.fn(),
|
||||||
|
cloneRoom: vi.fn(),
|
||||||
|
deleteRoom: vi.fn(),
|
||||||
|
clearRoomContext: vi.fn(),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
vi.mock('@/api/hermes/group-chat', () => groupChatApiMock)
|
||||||
|
vi.mock('@/api/client', () => ({ getApiKey: vi.fn(() => 'test-token') }))
|
||||||
|
vi.mock('@/api/hermes/download', () => ({ getDownloadUrl: vi.fn((path: string) => `/download?path=${path}`) }))
|
||||||
|
|
||||||
|
function emitSocket(event: string, payload: unknown) {
|
||||||
|
for (const cb of groupChatApiMock.handlers.get(event) || []) cb(payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
const room: RoomInfo = {
|
||||||
|
id: 'room-1',
|
||||||
|
name: 'Test Room',
|
||||||
|
inviteCode: 'ROOM1',
|
||||||
|
}
|
||||||
|
|
||||||
|
function assistantMessage(overrides: Partial<ChatMessage>): ChatMessage {
|
||||||
|
return {
|
||||||
|
id: 'msg-1',
|
||||||
|
roomId: 'room-1',
|
||||||
|
senderId: 'agent-1',
|
||||||
|
senderName: 'bot',
|
||||||
|
content: '',
|
||||||
|
timestamp: 1,
|
||||||
|
role: 'assistant',
|
||||||
|
...overrides,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function createJoinedStore(initialMessages: ChatMessage[] = []) {
|
||||||
|
groupChatApiMock.getRoomDetail.mockResolvedValue({
|
||||||
|
room,
|
||||||
|
messages: initialMessages,
|
||||||
|
agents: [],
|
||||||
|
members: [],
|
||||||
|
})
|
||||||
|
const { useGroupChatStore } = await import('@/stores/hermes/group-chat')
|
||||||
|
const store = useGroupChatStore()
|
||||||
|
store.connect()
|
||||||
|
await store.joinRoom('room-1')
|
||||||
|
groupChatApiMock.getRoomDetail.mockClear()
|
||||||
|
return store
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('group chat store streaming merge', () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
vi.useRealTimers()
|
||||||
|
setActivePinia(createPinia())
|
||||||
|
groupChatApiMock.handlers.clear()
|
||||||
|
for (const key of Object.keys(groupChatApiMock)) {
|
||||||
|
const value = (groupChatApiMock as any)[key]
|
||||||
|
if (value?.mockReset && key !== 'socket') value.mockReset()
|
||||||
|
}
|
||||||
|
groupChatApiMock.connectGroupChat.mockReturnValue(groupChatApiMock.socket)
|
||||||
|
groupChatApiMock.getSocket.mockReturnValue(groupChatApiMock.socket)
|
||||||
|
groupChatApiMock.getStoredUserId.mockReturnValue('user-1')
|
||||||
|
groupChatApiMock.getStoredUserName.mockReturnValue('tester')
|
||||||
|
groupChatApiMock.socket.on.mockClear()
|
||||||
|
groupChatApiMock.socket.emit.mockClear()
|
||||||
|
groupChatApiMock.socket.disconnect.mockClear()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('preserves streamed reasoning when the final message supplies content only', async () => {
|
||||||
|
const store = await createJoinedStore()
|
||||||
|
|
||||||
|
emitSocket('message_stream_start', assistantMessage({ id: 'msg-1' }))
|
||||||
|
emitSocket('message_reasoning_delta', { roomId: 'room-1', id: 'msg-1', delta: 'thinking...' })
|
||||||
|
emitSocket('message', assistantMessage({ id: 'msg-1', content: '收到', reasoning: null, reasoning_content: null }))
|
||||||
|
|
||||||
|
expect(store.messages).toHaveLength(1)
|
||||||
|
expect(store.messages[0]).toMatchObject({
|
||||||
|
id: 'msg-1',
|
||||||
|
content: '收到',
|
||||||
|
reasoning: 'thinking...',
|
||||||
|
reasoning_content: 'thinking...',
|
||||||
|
isStreaming: false,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
it('preserves streamed content when the final message payload is blank', async () => {
|
||||||
|
const store = await createJoinedStore()
|
||||||
|
|
||||||
|
emitSocket('message_stream_start', assistantMessage({ id: 'msg-1' }))
|
||||||
|
emitSocket('message_stream_delta', { roomId: 'room-1', id: 'msg-1', delta: 'final' })
|
||||||
|
emitSocket('message_stream_delta', { roomId: 'room-1', id: 'msg-1', delta: ' answer' })
|
||||||
|
emitSocket('message', assistantMessage({ id: 'msg-1', content: '', reasoning: 'thinking...' }))
|
||||||
|
|
||||||
|
expect(store.messages).toHaveLength(1)
|
||||||
|
expect(store.messages[0]).toMatchObject({
|
||||||
|
id: 'msg-1',
|
||||||
|
content: 'final answer',
|
||||||
|
reasoning: 'thinking...',
|
||||||
|
isStreaming: false,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
it('ignores late content deltas for a completed message', async () => {
|
||||||
|
const store = await createJoinedStore()
|
||||||
|
|
||||||
|
emitSocket('message', assistantMessage({ id: 'msg-1', content: 'final answer', reasoning: 'thinking...' }))
|
||||||
|
emitSocket('message_stream_delta', { roomId: 'room-1', id: 'msg-1', delta: ' stale' })
|
||||||
|
|
||||||
|
expect(store.messages).toHaveLength(1)
|
||||||
|
expect(store.messages[0]).toMatchObject({
|
||||||
|
id: 'msg-1',
|
||||||
|
content: 'final answer',
|
||||||
|
reasoning: 'thinking...',
|
||||||
|
isStreaming: false,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
it('ignores late reasoning deltas for a completed message', async () => {
|
||||||
|
const store = await createJoinedStore()
|
||||||
|
|
||||||
|
emitSocket('message', assistantMessage({ id: 'msg-1', content: 'final answer', reasoning: 'thinking...' }))
|
||||||
|
emitSocket('message_reasoning_delta', { roomId: 'room-1', id: 'msg-1', delta: ' stale' })
|
||||||
|
|
||||||
|
expect(store.messages).toHaveLength(1)
|
||||||
|
expect(store.messages[0]).toMatchObject({
|
||||||
|
id: 'msg-1',
|
||||||
|
content: 'final answer',
|
||||||
|
reasoning: 'thinking...',
|
||||||
|
isStreaming: false,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
it('ignores a late empty stream start for a completed message', async () => {
|
||||||
|
const store = await createJoinedStore()
|
||||||
|
|
||||||
|
emitSocket('message', assistantMessage({ id: 'msg-1', content: 'final answer', reasoning: 'thinking...' }))
|
||||||
|
emitSocket('message_stream_start', assistantMessage({ id: 'msg-1', content: '', timestamp: 2 }))
|
||||||
|
|
||||||
|
expect(store.messages).toHaveLength(1)
|
||||||
|
expect(store.messages[0]).toMatchObject({
|
||||||
|
id: 'msg-1',
|
||||||
|
content: 'final answer',
|
||||||
|
reasoning: 'thinking...',
|
||||||
|
isStreaming: false,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
it('ignores a late stream start for a completed empty tool-call message', async () => {
|
||||||
|
const store = await createJoinedStore()
|
||||||
|
const toolCalls = [{ id: 'tool-1', type: 'function', function: { name: 'lookup', arguments: '{}' } }]
|
||||||
|
|
||||||
|
emitSocket('message', assistantMessage({ id: 'msg-1', content: '', tool_calls: toolCalls }))
|
||||||
|
emitSocket('message_stream_start', assistantMessage({ id: 'msg-1', content: '', timestamp: 2 }))
|
||||||
|
emitSocket('message_stream_delta', { roomId: 'room-1', id: 'msg-1', delta: ' stale' })
|
||||||
|
|
||||||
|
expect(store.messages).toHaveLength(1)
|
||||||
|
expect(store.messages[0]).toMatchObject({
|
||||||
|
id: 'msg-1',
|
||||||
|
content: '',
|
||||||
|
tool_calls: toolCalls,
|
||||||
|
isStreaming: false,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
it('refetches room detail when a stream ends with reasoning but no final content', async () => {
|
||||||
|
vi.useFakeTimers()
|
||||||
|
const store = await createJoinedStore()
|
||||||
|
groupChatApiMock.getRoomDetail.mockResolvedValue({
|
||||||
|
room,
|
||||||
|
agents: [],
|
||||||
|
members: [],
|
||||||
|
messages: [assistantMessage({ id: 'msg-1', content: 'final from db', reasoning: 'thinking...' })],
|
||||||
|
})
|
||||||
|
|
||||||
|
emitSocket('message_stream_start', assistantMessage({ id: 'msg-1' }))
|
||||||
|
emitSocket('message_reasoning_delta', { roomId: 'room-1', id: 'msg-1', delta: 'thinking...' })
|
||||||
|
emitSocket('message_stream_end', { roomId: 'room-1', id: 'msg-1' })
|
||||||
|
|
||||||
|
await vi.runAllTimersAsync()
|
||||||
|
|
||||||
|
expect(groupChatApiMock.getRoomDetail).toHaveBeenCalledWith('room-1')
|
||||||
|
expect(store.messages[0]).toMatchObject({
|
||||||
|
id: 'msg-1',
|
||||||
|
content: 'final from db',
|
||||||
|
reasoning: 'thinking...',
|
||||||
|
isStreaming: false,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
Reference in New Issue
Block a user