diff --git a/packages/client/src/stores/hermes/chat.ts b/packages/client/src/stores/hermes/chat.ts index d3a4413..7138436 100644 --- a/packages/client/src/stores/hermes/chat.ts +++ b/packages/client/src/stores/hermes/chat.ts @@ -397,6 +397,39 @@ export const useChatStore = defineStore('chat', () => { return rec } + function compareServerMessages(local: Message[], server: Message[]) { + const localUserIndexes = local.map((m, i) => (m.role === 'user' ? i : -1)).filter(i => i >= 0) + const serverUserIndexes = server.map((m, i) => (m.role === 'user' ? i : -1)).filter(i => i >= 0) + const localUsers = localUserIndexes.length + const serverUsers = serverUserIndexes.length + + if (serverUsers > localUsers) return { serverIsCaughtUp: true, serverIsAhead: true } + if (serverUsers < localUsers) return { serverIsCaughtUp: false, serverIsAhead: false } + + const localLastUserIndex = localUserIndexes[localUserIndexes.length - 1] ?? -1 + const serverLastUserIndex = serverUserIndexes[serverUserIndexes.length - 1] ?? -1 + const sameCurrentTurn = + localLastUserIndex < 0 + || serverLastUserIndex < 0 + || local[localLastUserIndex]?.content === server[serverLastUserIndex]?.content + + if (!sameCurrentTurn) return { serverIsCaughtUp: false, serverIsAhead: false } + + const localCurrentAssistantLen = local + .slice(localLastUserIndex + 1) + .filter(m => m.role === 'assistant') + .reduce((total, m) => total + (m.content?.length || 0), 0) + const serverCurrentAssistantLen = server + .slice(serverLastUserIndex + 1) + .filter(m => m.role === 'assistant') + .reduce((total, m) => total + (m.content?.length || 0), 0) + + return { + serverIsCaughtUp: true, + serverIsAhead: serverCurrentAssistantLen >= localCurrentAssistantLen, + } + } + function stopPolling(sid: string) { const t = pollTimers.get(sid) if (t) { @@ -430,23 +463,11 @@ export const useChatStore = defineStore('chat', () => { const mapped = mapHermesMessages(detail.messages || []) const target = sessions.value.find(s => s.id === sid) if (!target) return - // Use the same "content-aware" comparison as switchSession: server - // is ahead iff it knows about at least as many user turns and its - // last assistant text is at least as long as ours. + // Use the same current-turn comparison as switchSession: server is + // ahead only when it has a newer user turn or the assistant output + // after the current user turn has caught up. const local = target.messages - const localLastAssistant = [...local].reverse().find(m => m.role === 'assistant') - const serverLastAssistant = [...mapped].reverse().find(m => m.role === 'assistant') - const localAssistantLen = localLastAssistant?.content?.length ?? 0 - const serverAssistantLen = serverLastAssistant?.content?.length ?? 0 - const localUsers = local.filter(m => m.role === 'user').length - const serverUsers = mapped.filter(m => m.role === 'user').length - const serverIsCaughtUp = serverUsers >= localUsers - // Same rationale as switchSession: strictly more user turns means - // server is ahead (new turn complete). Equal user turns + longer - // assistant means server caught up on the current turn. - const serverIsAhead = - serverUsers > localUsers - || (serverUsers === localUsers && serverAssistantLen >= localAssistantLen) + const { serverIsAhead, serverIsCaughtUp } = compareServerMessages(local, mapped) if (serverIsAhead) { target.messages = mapped if (detail.title && !target.title) target.title = detail.title @@ -466,12 +487,14 @@ export const useChatStore = defineStore('chat', () => { if (prev && prev.sig === sig) { prev.stableTicks += 1 if (prev.stableTicks >= POLL_STABLE_EXITS) { - // Run is done on the server. Force-apply server view even if - // our "don't retreat" guard above skipped it — the server is - // now the authoritative source of truth. - target.messages = mapped - if (detail.title) target.title = detail.title - if (sid === activeSessionId.value) persistActiveMessages() + // The server view has stopped changing. If it is still behind + // the locally streamed assistant reply, end recovery without + // retreating local state; otherwise commit the server view. + if (serverIsAhead) { + target.messages = mapped + if (detail.title) target.title = detail.title + if (sid === activeSessionId.value) persistActiveMessages() + } clearInFlight(sid) stopPolling(sid) } @@ -548,9 +571,10 @@ export const useChatStore = defineStore('chat', () => { } } - // Re-pull active session from server and overwrite local messages. Used on - // SSE drop and on tab-visible events — mobile browsers kill EventSource - // while backgrounded, but the backend run usually completes anyway. + // Re-pull active session from server without retreating newer locally + // streamed output. Used on SSE drop and on tab-visible events — mobile + // browsers kill EventSource while backgrounded, but the backend run usually + // completes anyway. async function refreshActiveSession(): Promise { const sid = activeSessionId.value if (!sid) return false @@ -560,9 +584,12 @@ export const useChatStore = defineStore('chat', () => { const target = sessions.value.find(s => s.id === sid) if (!target) return false const mapped = mapHermesMessages(detail.messages || []) - target.messages = mapped + const { serverIsAhead } = compareServerMessages(target.messages, mapped) + if (serverIsAhead) { + target.messages = mapped + persistActiveMessages() + } if (detail.title) target.title = detail.title - persistActiveMessages() return true } catch (err) { console.error('Failed to refresh active session:', err) @@ -616,33 +643,14 @@ export const useChatStore = defineStore('chat', () => { const detail = await fetchSession(sessionId) if (detail && detail.messages) { const mapped = mapHermesMessages(detail.messages) - // Pick whichever view has more information. Simple length comparison - // is wrong because mapHermesMessages folds tool_call-only assistant - // msgs and matches them with tool-result msgs — so post-fold `mapped` - // can be SHORTER than the raw SSE-built local array even when the - // server is strictly ahead. Instead, compare the last assistant - // message content: if the server's is at least as long, the server - // is up-to-date (and has the final complete text); otherwise keep - // local (in-flight window where server hasn't flushed the new turn). + // Pick whichever view has more information for the current turn. + // Simple message-count comparison is wrong because mapHermesMessages + // folds tool_call-only assistant messages; global last-assistant + // comparison is also wrong across turns. Trust server only when it has + // a newer user turn or its assistant output after the current user turn + // has caught up. const local = activeSession.value.messages - const localLastAssistant = [...local].reverse().find(m => m.role === 'assistant') - const serverLastAssistant = [...mapped].reverse().find(m => m.role === 'assistant') - const localAssistantLen = localLastAssistant?.content?.length ?? 0 - const serverAssistantLen = serverLastAssistant?.content?.length ?? 0 - const localUsers = local.filter(m => m.role === 'user').length - const serverUsers = mapped.filter(m => m.role === 'user').length - // Trust server when: - // - it has STRICTLY MORE user turns than we do (new turn landed), - // OR - // - same user-turn count AND server's last assistant is at least - // as long as ours (same turn, server caught up or further) - // Otherwise keep local (protects against the server-not-yet-flushed - // race during in-flight runs). Length comparison alone is wrong - // across different turns because each turn's last assistant is - // unrelated to the previous turn's. - const serverIsAhead = - serverUsers > localUsers - || (serverUsers === localUsers && serverAssistantLen >= localAssistantLen) + const { serverIsAhead } = compareServerMessages(local, mapped) if (serverIsAhead) { activeSession.value.messages = mapped } diff --git a/packages/server/src/db/hermes/conversations-db.ts b/packages/server/src/db/hermes/conversations-db.ts index 54de656..9b9f89a 100644 --- a/packages/server/src/db/hermes/conversations-db.ts +++ b/packages/server/src/db/hermes/conversations-db.ts @@ -205,15 +205,8 @@ function timingMatchesParent(parent: ConversationSessionRow | undefined, child: return Math.abs(Number(child.started_at || 0) - Number(parent.ended_at || 0)) <= LINEAGE_TOLERANCE_SECONDS } -function isBranchRoot(session: ConversationSessionRow | undefined, byId: Map): boolean { - if (!session?.parent_session_id) return false - const parent = byId.get(session.parent_session_id) - return !!parent && parent.end_reason === 'branched' && timingMatchesParent(parent, session) -} - -function isVisibleRoot(session: ConversationSessionRow | undefined, byId: Map): boolean { - if (!session || session.source === 'tool') return false - return session.parent_session_id == null || isBranchRoot(session, byId) +function isCompressionEndReason(reason: string | null): boolean { + return reason === 'compression' || reason === 'compressed' } function continuationCandidates(parent: ConversationSessionRow, byId: Map, childrenByParent: Map): ConversationSessionRow[] { @@ -233,7 +226,7 @@ function continuationCandidates(parent: ConversationSessionRow, byId: Map, childrenByParent: Map): ConversationSessionRow | null { - if (parent.end_reason !== 'compression') return null + if (!isCompressionEndReason(parent.end_reason)) return null const candidates = continuationCandidates(parent, byId, childrenByParent) if (candidates.length === 1) return candidates[0] @@ -247,6 +240,33 @@ function nextContinuationChild(parent: ConversationSessionRow, byId: Map, childrenByParent: Map): boolean { + if (!session?.parent_session_id) return false + const parent = byId.get(session.parent_session_id) + if (!parent) return false + return nextContinuationChild(parent, byId, childrenByParent)?.id === session.id +} + +function compressionChainRootId(sessionId: string, byId: Map, childrenByParent: Map): string | null { + let current = byId.get(sessionId) || null + if (!current || current.source === 'tool') return null + + const seen = new Set() + while (current?.parent_session_id && !seen.has(current.id)) { + seen.add(current.id) + const parent = byId.get(current.parent_session_id) + if (!parent) break + if (nextContinuationChild(parent, byId, childrenByParent)?.id !== current.id) break + current = parent + } + return current?.id || null +} + +function isVisibleConversationStart(session: ConversationSessionRow | undefined, byId: Map, childrenByParent: Map): boolean { + if (!session || session.source === 'tool') return false + return !isCompressionContinuationChild(session, byId, childrenByParent) +} + function collectConversationChain(rootId: string, byId: Map, childrenByParent: Map): ConversationSessionRow[] { const chain: ConversationSessionRow[] = [] const seen = new Set() @@ -294,10 +314,10 @@ function aggregateSummary(rootId: string, byId: Map safeText(session.cost_status)).filter(Boolean))) return { - ...toSummary(root), - title: root.title || firstPreview || null, - preview: root.preview || firstPreview, - model: safeText(last?.model || root.model), + ...toSummary(last), + title: last.title || root.title || firstPreview || null, + preview: last.preview || root.preview || firstPreview, + started_at: Number(root.started_at || 0), ended_at: last?.ended_at ?? null, last_active: Math.max(...chain.map(session => session.last_active)), is_active: chain.some(session => session.is_active), @@ -427,7 +447,7 @@ export async function listConversationSummariesFromDb(options: ConversationListO } const summaries = sessions - .filter(session => isVisibleRoot(session, byId)) + .filter(session => isVisibleConversationStart(session, byId, childrenByParent)) .map(session => aggregateSummary(session.id, byId, childrenByParent)) .filter((summary): summary is ConversationSummary => !!summary) @@ -452,9 +472,12 @@ export async function getConversationDetailFromDb(sessionId: string, options: Co if (!session || session.source === 'tool') return null chain = [session] } else { - const root = byId.get(sessionId) - if (!isVisibleRoot(root, byId)) return null - chain = collectConversationChain(sessionId, byId, childrenByParent) + const session = byId.get(sessionId) + if (!session || session.source === 'tool') return null + const rootId = compressionChainRootId(sessionId, byId, childrenByParent) + if (!rootId) return null + if (!isVisibleConversationStart(byId.get(rootId), byId, childrenByParent)) return null + chain = collectConversationChain(rootId, byId, childrenByParent) } if (!chain.length) return null diff --git a/tests/client/chat-store.test.ts b/tests/client/chat-store.test.ts index cf1ccb4..4f3bce4 100644 --- a/tests/client/chat-store.test.ts +++ b/tests/client/chat-store.test.ts @@ -113,6 +113,149 @@ describe('Chat Store', () => { expect(store.messages.map(m => m.content)).toEqual(['draft']) }) + it('does not let a stale server refresh erase a newer local assistant reply', async () => { + const cachedMessages = [ + { id: 'u1', role: 'user', content: 'expensive task', timestamp: 1 }, + { id: 'a1', role: 'assistant', content: 'final answer that already streamed', timestamp: 2 }, + ] + + window.localStorage.setItem(ACTIVE_SESSION_KEY, 'sess-stale') + window.localStorage.setItem( + SESSIONS_CACHE_KEY, + JSON.stringify([ + { + id: 'sess-stale', + title: 'Stale refresh', + source: 'api_server', + messages: [], + createdAt: 1, + updatedAt: 2, + }, + ]), + ) + window.localStorage.setItem(sessionMessagesKey('sess-stale'), JSON.stringify(cachedMessages)) + + mockSessionsApi.fetchSessions.mockResolvedValue([makeSummary('sess-stale', 'Stale refresh')]) + mockSessionsApi.fetchSession.mockResolvedValue(makeDetail('sess-stale', [ + { + id: 1, + session_id: 'sess-stale', + role: 'user', + content: 'expensive task', + tool_call_id: null, + tool_calls: null, + tool_name: null, + timestamp: 1710000000, + token_count: null, + finish_reason: null, + reasoning: null, + }, + ])) + + const store = useChatStore() + await store.loadSessions() + expect(store.messages.map(m => m.content)).toEqual(['expensive task', 'final answer that already streamed']) + + await store.refreshActiveSession() + + expect(store.messages.map(m => m.content)).toEqual(['expensive task', 'final answer that already streamed']) + const persistedMessages = JSON.parse(window.localStorage.getItem(sessionMessagesKey('sess-stale')) || '[]') + expect(persistedMessages.map((m: any) => m.content)).toEqual(['expensive task', 'final answer that already streamed']) + }) + + it('does not let stale resume polling erase a newer local assistant reply', async () => { + vi.useFakeTimers() + vi.setSystemTime(new Date('2026-04-22T19:00:00.000Z')) + + const cachedMessages = [ + { id: 'u0', role: 'user', content: 'previous task', timestamp: 1 }, + { id: 'a0', role: 'assistant', content: 'a much longer previous assistant answer', timestamp: 2 }, + { id: 'u1', role: 'user', content: 'long task', timestamp: 3 }, + { id: 'a1', role: 'assistant', content: 'local final answer', timestamp: 4 }, + ] + + window.localStorage.setItem(ACTIVE_SESSION_KEY, 'sess-poll-stale') + window.localStorage.setItem( + SESSIONS_CACHE_KEY, + JSON.stringify([ + { + id: 'sess-poll-stale', + title: 'Polling stale refresh', + source: 'api_server', + messages: [], + createdAt: 1, + updatedAt: 2, + }, + ]), + ) + window.localStorage.setItem(sessionMessagesKey('sess-poll-stale'), JSON.stringify(cachedMessages)) + window.localStorage.setItem(inFlightKey('sess-poll-stale'), JSON.stringify({ runId: 'run-1', startedAt: Date.now() })) + + mockSessionsApi.fetchSessions.mockResolvedValue([makeSummary('sess-poll-stale', 'Polling stale refresh')]) + mockSessionsApi.fetchSession.mockResolvedValue(makeDetail('sess-poll-stale', [ + { + id: 1, + session_id: 'sess-poll-stale', + role: 'user', + content: 'previous task', + tool_call_id: null, + tool_calls: null, + tool_name: null, + timestamp: 1710000000, + token_count: null, + finish_reason: null, + reasoning: null, + }, + { + id: 2, + session_id: 'sess-poll-stale', + role: 'assistant', + content: 'a much longer previous assistant answer', + tool_call_id: null, + tool_calls: null, + tool_name: null, + timestamp: 1710000001, + token_count: null, + finish_reason: 'stop', + reasoning: null, + }, + { + id: 3, + session_id: 'sess-poll-stale', + role: 'user', + content: 'long task', + tool_call_id: null, + tool_calls: null, + tool_name: null, + timestamp: 1710000002, + token_count: null, + finish_reason: null, + reasoning: null, + }, + ])) + + const store = useChatStore() + await store.loadSessions() + expect(store.messages.map(m => m.content)).toEqual([ + 'previous task', + 'a much longer previous assistant answer', + 'long task', + 'local final answer', + ]) + + await vi.advanceTimersByTimeAsync(9000) + await flushPromises() + + expect(store.messages.map(m => m.content)).toEqual([ + 'previous task', + 'a much longer previous assistant answer', + 'long task', + 'local final answer', + ]) + expect(store.isRunActive).toBe(false) + expect(window.localStorage.getItem(inFlightKey('sess-poll-stale'))).toBeNull() + }) + it('persists the user message immediately before any SSE delta arrives', async () => { const store = useChatStore() diff --git a/tests/server/conversations-db.test.ts b/tests/server/conversations-db.test.ts index 14bc253..be85f76 100644 --- a/tests/server/conversations-db.test.ts +++ b/tests/server/conversations-db.test.ts @@ -195,21 +195,29 @@ describe('conversation DB service', () => { const summaries = await mod.listConversationSummariesFromDb({ humanOnly: true }) expect(summaries).toHaveLength(1) expect(summaries[0]).toEqual(expect.objectContaining({ - id: 'root', + id: 'root-cont', + title: 'Continuation', + started_at: 100, thread_session_count: 2, ended_at: 111, cost_status: 'mixed', actual_cost_usd: 0.30000000000000004, })) - const detail = await mod.getConversationDetailFromDb('root', { humanOnly: true }) - expect(detail?.thread_session_count).toBe(2) - expect(detail?.messages.map((message: any) => message.content)).toEqual([ + const detailFromTip = await mod.getConversationDetailFromDb('root-cont', { humanOnly: true }) + expect(detailFromTip?.session_id).toBe('root-cont') + expect(detailFromTip?.thread_session_count).toBe(2) + expect(detailFromTip?.messages.map((message: any) => message.content)).toEqual([ 'Start here', 'Assistant reply', 'Continue with more detail', 'Continued answer', ]) + + const detailFromRoot = await mod.getConversationDetailFromDb('root', { humanOnly: true }) + expect(detailFromRoot?.messages.map((message: any) => message.content)).toEqual( + detailFromTip?.messages.map((message: any) => message.content), + ) }) it('treats branched children as their own visible conversations', async () => { @@ -274,6 +282,69 @@ describe('conversation DB service', () => { expect(detail?.messages.map((message: any) => message.content)).toEqual(['Branch prompt', 'Branch answer']) }) + it('keeps non-compression child sessions visible instead of hiding them under their parent', async () => { + ensureSqliteAvailable() + const { DatabaseSync } = await import('node:sqlite') + const db = new DatabaseSync(join(profileDirState.value, 'state.db')) + createSchema(db) + + insertSession(db, { + id: 'parent', + parent_session_id: null, + source: 'cli', + model: 'openai/gpt-5.4', + title: 'Parent', + started_at: 100, + ended_at: 150, + end_reason: null, + message_count: 1, + tool_call_count: 0, + input_tokens: 0, + output_tokens: 0, + cache_read_tokens: 0, + cache_write_tokens: 0, + reasoning_tokens: 0, + billing_provider: 'openai', + estimated_cost_usd: 0, + actual_cost_usd: 0, + cost_status: 'estimated', + }) + insertSession(db, { + id: 'review-child', + parent_session_id: 'parent', + source: 'cli', + model: 'openai/gpt-5.4', + title: 'Independent review', + started_at: 300, + ended_at: 320, + end_reason: null, + message_count: 2, + tool_call_count: 0, + input_tokens: 0, + output_tokens: 0, + cache_read_tokens: 0, + cache_write_tokens: 0, + reasoning_tokens: 0, + billing_provider: 'openai', + estimated_cost_usd: 0, + actual_cost_usd: 0, + cost_status: 'estimated', + }) + + insertMessage(db, { id: 1, session_id: 'parent', role: 'user', content: 'Parent prompt', timestamp: 101 }) + insertMessage(db, { id: 2, session_id: 'review-child', role: 'user', content: 'Review prompt', timestamp: 301 }) + insertMessage(db, { id: 3, session_id: 'review-child', role: 'assistant', content: 'Review answer', timestamp: 302 }) + db.close() + + const mod = await import('../../packages/server/src/db/hermes/conversations-db') + const summaries = await mod.listConversationSummariesFromDb({ humanOnly: true }) + expect(summaries.map((summary: any) => summary.id)).toEqual(['review-child', 'parent']) + + const detail = await mod.getConversationDetailFromDb('review-child', { humanOnly: true }) + expect(detail?.thread_session_count).toBe(1) + expect(detail?.messages.map((message: any) => message.content)).toEqual(['Review prompt', 'Review answer']) + }) + it('excludes synthetic-only roots from human-only summaries and details', async () => { ensureSqliteAvailable() const { DatabaseSync } = await import('node:sqlite')