fix chat session lineage visibility (#228)

This commit is contained in:
Zhicheng Han
2026-04-26 04:29:17 +02:00
committed by GitHub
parent f1a6d97c8b
commit b68ba8bcb9
4 changed files with 320 additions and 75 deletions
+61 -53
View File
@@ -397,6 +397,39 @@ export const useChatStore = defineStore('chat', () => {
return rec
}
function compareServerMessages(local: Message[], server: Message[]) {
const localUserIndexes = local.map((m, i) => (m.role === 'user' ? i : -1)).filter(i => i >= 0)
const serverUserIndexes = server.map((m, i) => (m.role === 'user' ? i : -1)).filter(i => i >= 0)
const localUsers = localUserIndexes.length
const serverUsers = serverUserIndexes.length
if (serverUsers > localUsers) return { serverIsCaughtUp: true, serverIsAhead: true }
if (serverUsers < localUsers) return { serverIsCaughtUp: false, serverIsAhead: false }
const localLastUserIndex = localUserIndexes[localUserIndexes.length - 1] ?? -1
const serverLastUserIndex = serverUserIndexes[serverUserIndexes.length - 1] ?? -1
const sameCurrentTurn =
localLastUserIndex < 0
|| serverLastUserIndex < 0
|| local[localLastUserIndex]?.content === server[serverLastUserIndex]?.content
if (!sameCurrentTurn) return { serverIsCaughtUp: false, serverIsAhead: false }
const localCurrentAssistantLen = local
.slice(localLastUserIndex + 1)
.filter(m => m.role === 'assistant')
.reduce((total, m) => total + (m.content?.length || 0), 0)
const serverCurrentAssistantLen = server
.slice(serverLastUserIndex + 1)
.filter(m => m.role === 'assistant')
.reduce((total, m) => total + (m.content?.length || 0), 0)
return {
serverIsCaughtUp: true,
serverIsAhead: serverCurrentAssistantLen >= localCurrentAssistantLen,
}
}
function stopPolling(sid: string) {
const t = pollTimers.get(sid)
if (t) {
@@ -430,23 +463,11 @@ export const useChatStore = defineStore('chat', () => {
const mapped = mapHermesMessages(detail.messages || [])
const target = sessions.value.find(s => s.id === sid)
if (!target) return
// Use the same "content-aware" comparison as switchSession: server
// is ahead iff it knows about at least as many user turns and its
// last assistant text is at least as long as ours.
// Use the same current-turn comparison as switchSession: server is
// ahead only when it has a newer user turn or the assistant output
// after the current user turn has caught up.
const local = target.messages
const localLastAssistant = [...local].reverse().find(m => m.role === 'assistant')
const serverLastAssistant = [...mapped].reverse().find(m => m.role === 'assistant')
const localAssistantLen = localLastAssistant?.content?.length ?? 0
const serverAssistantLen = serverLastAssistant?.content?.length ?? 0
const localUsers = local.filter(m => m.role === 'user').length
const serverUsers = mapped.filter(m => m.role === 'user').length
const serverIsCaughtUp = serverUsers >= localUsers
// Same rationale as switchSession: strictly more user turns means
// server is ahead (new turn complete). Equal user turns + longer
// assistant means server caught up on the current turn.
const serverIsAhead =
serverUsers > localUsers
|| (serverUsers === localUsers && serverAssistantLen >= localAssistantLen)
const { serverIsAhead, serverIsCaughtUp } = compareServerMessages(local, mapped)
if (serverIsAhead) {
target.messages = mapped
if (detail.title && !target.title) target.title = detail.title
@@ -466,12 +487,14 @@ export const useChatStore = defineStore('chat', () => {
if (prev && prev.sig === sig) {
prev.stableTicks += 1
if (prev.stableTicks >= POLL_STABLE_EXITS) {
// Run is done on the server. Force-apply server view even if
// our "don't retreat" guard above skipped it — the server is
// now the authoritative source of truth.
target.messages = mapped
if (detail.title) target.title = detail.title
if (sid === activeSessionId.value) persistActiveMessages()
// The server view has stopped changing. If it is still behind
// the locally streamed assistant reply, end recovery without
// retreating local state; otherwise commit the server view.
if (serverIsAhead) {
target.messages = mapped
if (detail.title) target.title = detail.title
if (sid === activeSessionId.value) persistActiveMessages()
}
clearInFlight(sid)
stopPolling(sid)
}
@@ -548,9 +571,10 @@ export const useChatStore = defineStore('chat', () => {
}
}
// Re-pull active session from server and overwrite local messages. Used on
// SSE drop and on tab-visible events — mobile browsers kill EventSource
// while backgrounded, but the backend run usually completes anyway.
// Re-pull active session from server without retreating newer locally
// streamed output. Used on SSE drop and on tab-visible events — mobile
// browsers kill EventSource while backgrounded, but the backend run usually
// completes anyway.
async function refreshActiveSession(): Promise<boolean> {
const sid = activeSessionId.value
if (!sid) return false
@@ -560,9 +584,12 @@ export const useChatStore = defineStore('chat', () => {
const target = sessions.value.find(s => s.id === sid)
if (!target) return false
const mapped = mapHermesMessages(detail.messages || [])
target.messages = mapped
const { serverIsAhead } = compareServerMessages(target.messages, mapped)
if (serverIsAhead) {
target.messages = mapped
persistActiveMessages()
}
if (detail.title) target.title = detail.title
persistActiveMessages()
return true
} catch (err) {
console.error('Failed to refresh active session:', err)
@@ -616,33 +643,14 @@ export const useChatStore = defineStore('chat', () => {
const detail = await fetchSession(sessionId)
if (detail && detail.messages) {
const mapped = mapHermesMessages(detail.messages)
// Pick whichever view has more information. Simple length comparison
// is wrong because mapHermesMessages folds tool_call-only assistant
// msgs and matches them with tool-result msgs — so post-fold `mapped`
// can be SHORTER than the raw SSE-built local array even when the
// server is strictly ahead. Instead, compare the last assistant
// message content: if the server's is at least as long, the server
// is up-to-date (and has the final complete text); otherwise keep
// local (in-flight window where server hasn't flushed the new turn).
// Pick whichever view has more information for the current turn.
// Simple message-count comparison is wrong because mapHermesMessages
// folds tool_call-only assistant messages; global last-assistant
// comparison is also wrong across turns. Trust server only when it has
// a newer user turn or its assistant output after the current user turn
// has caught up.
const local = activeSession.value.messages
const localLastAssistant = [...local].reverse().find(m => m.role === 'assistant')
const serverLastAssistant = [...mapped].reverse().find(m => m.role === 'assistant')
const localAssistantLen = localLastAssistant?.content?.length ?? 0
const serverAssistantLen = serverLastAssistant?.content?.length ?? 0
const localUsers = local.filter(m => m.role === 'user').length
const serverUsers = mapped.filter(m => m.role === 'user').length
// Trust server when:
// - it has STRICTLY MORE user turns than we do (new turn landed),
// OR
// - same user-turn count AND server's last assistant is at least
// as long as ours (same turn, server caught up or further)
// Otherwise keep local (protects against the server-not-yet-flushed
// race during in-flight runs). Length comparison alone is wrong
// across different turns because each turn's last assistant is
// unrelated to the previous turn's.
const serverIsAhead =
serverUsers > localUsers
|| (serverUsers === localUsers && serverAssistantLen >= localAssistantLen)
const { serverIsAhead } = compareServerMessages(local, mapped)
if (serverIsAhead) {
activeSession.value.messages = mapped
}
@@ -205,15 +205,8 @@ function timingMatchesParent(parent: ConversationSessionRow | undefined, child:
return Math.abs(Number(child.started_at || 0) - Number(parent.ended_at || 0)) <= LINEAGE_TOLERANCE_SECONDS
}
function isBranchRoot(session: ConversationSessionRow | undefined, byId: Map<string, ConversationSessionRow>): boolean {
if (!session?.parent_session_id) return false
const parent = byId.get(session.parent_session_id)
return !!parent && parent.end_reason === 'branched' && timingMatchesParent(parent, session)
}
function isVisibleRoot(session: ConversationSessionRow | undefined, byId: Map<string, ConversationSessionRow>): boolean {
if (!session || session.source === 'tool') return false
return session.parent_session_id == null || isBranchRoot(session, byId)
function isCompressionEndReason(reason: string | null): boolean {
return reason === 'compression' || reason === 'compressed'
}
function continuationCandidates(parent: ConversationSessionRow, byId: Map<string, ConversationSessionRow>, childrenByParent: Map<string | null, string[]>): ConversationSessionRow[] {
@@ -233,7 +226,7 @@ function continuationCandidates(parent: ConversationSessionRow, byId: Map<string
}
function nextContinuationChild(parent: ConversationSessionRow, byId: Map<string, ConversationSessionRow>, childrenByParent: Map<string | null, string[]>): ConversationSessionRow | null {
if (parent.end_reason !== 'compression') return null
if (!isCompressionEndReason(parent.end_reason)) return null
const candidates = continuationCandidates(parent, byId, childrenByParent)
if (candidates.length === 1) return candidates[0]
@@ -247,6 +240,33 @@ function nextContinuationChild(parent: ConversationSessionRow, byId: Map<string,
return null
}
function isCompressionContinuationChild(session: ConversationSessionRow | undefined, byId: Map<string, ConversationSessionRow>, childrenByParent: Map<string | null, string[]>): boolean {
if (!session?.parent_session_id) return false
const parent = byId.get(session.parent_session_id)
if (!parent) return false
return nextContinuationChild(parent, byId, childrenByParent)?.id === session.id
}
function compressionChainRootId(sessionId: string, byId: Map<string, ConversationSessionRow>, childrenByParent: Map<string | null, string[]>): string | null {
let current = byId.get(sessionId) || null
if (!current || current.source === 'tool') return null
const seen = new Set<string>()
while (current?.parent_session_id && !seen.has(current.id)) {
seen.add(current.id)
const parent = byId.get(current.parent_session_id)
if (!parent) break
if (nextContinuationChild(parent, byId, childrenByParent)?.id !== current.id) break
current = parent
}
return current?.id || null
}
function isVisibleConversationStart(session: ConversationSessionRow | undefined, byId: Map<string, ConversationSessionRow>, childrenByParent: Map<string | null, string[]>): boolean {
if (!session || session.source === 'tool') return false
return !isCompressionContinuationChild(session, byId, childrenByParent)
}
function collectConversationChain(rootId: string, byId: Map<string, ConversationSessionRow>, childrenByParent: Map<string | null, string[]>): ConversationSessionRow[] {
const chain: ConversationSessionRow[] = []
const seen = new Set<string>()
@@ -294,10 +314,10 @@ function aggregateSummary(rootId: string, byId: Map<string, ConversationSessionR
const costStatuses = Array.from(new Set(chain.map(session => safeText(session.cost_status)).filter(Boolean)))
return {
...toSummary(root),
title: root.title || firstPreview || null,
preview: root.preview || firstPreview,
model: safeText(last?.model || root.model),
...toSummary(last),
title: last.title || root.title || firstPreview || null,
preview: last.preview || root.preview || firstPreview,
started_at: Number(root.started_at || 0),
ended_at: last?.ended_at ?? null,
last_active: Math.max(...chain.map(session => session.last_active)),
is_active: chain.some(session => session.is_active),
@@ -427,7 +447,7 @@ export async function listConversationSummariesFromDb(options: ConversationListO
}
const summaries = sessions
.filter(session => isVisibleRoot(session, byId))
.filter(session => isVisibleConversationStart(session, byId, childrenByParent))
.map(session => aggregateSummary(session.id, byId, childrenByParent))
.filter((summary): summary is ConversationSummary => !!summary)
@@ -452,9 +472,12 @@ export async function getConversationDetailFromDb(sessionId: string, options: Co
if (!session || session.source === 'tool') return null
chain = [session]
} else {
const root = byId.get(sessionId)
if (!isVisibleRoot(root, byId)) return null
chain = collectConversationChain(sessionId, byId, childrenByParent)
const session = byId.get(sessionId)
if (!session || session.source === 'tool') return null
const rootId = compressionChainRootId(sessionId, byId, childrenByParent)
if (!rootId) return null
if (!isVisibleConversationStart(byId.get(rootId), byId, childrenByParent)) return null
chain = collectConversationChain(rootId, byId, childrenByParent)
}
if (!chain.length) return null
+143
View File
@@ -113,6 +113,149 @@ describe('Chat Store', () => {
expect(store.messages.map(m => m.content)).toEqual(['draft'])
})
it('does not let a stale server refresh erase a newer local assistant reply', async () => {
const cachedMessages = [
{ id: 'u1', role: 'user', content: 'expensive task', timestamp: 1 },
{ id: 'a1', role: 'assistant', content: 'final answer that already streamed', timestamp: 2 },
]
window.localStorage.setItem(ACTIVE_SESSION_KEY, 'sess-stale')
window.localStorage.setItem(
SESSIONS_CACHE_KEY,
JSON.stringify([
{
id: 'sess-stale',
title: 'Stale refresh',
source: 'api_server',
messages: [],
createdAt: 1,
updatedAt: 2,
},
]),
)
window.localStorage.setItem(sessionMessagesKey('sess-stale'), JSON.stringify(cachedMessages))
mockSessionsApi.fetchSessions.mockResolvedValue([makeSummary('sess-stale', 'Stale refresh')])
mockSessionsApi.fetchSession.mockResolvedValue(makeDetail('sess-stale', [
{
id: 1,
session_id: 'sess-stale',
role: 'user',
content: 'expensive task',
tool_call_id: null,
tool_calls: null,
tool_name: null,
timestamp: 1710000000,
token_count: null,
finish_reason: null,
reasoning: null,
},
]))
const store = useChatStore()
await store.loadSessions()
expect(store.messages.map(m => m.content)).toEqual(['expensive task', 'final answer that already streamed'])
await store.refreshActiveSession()
expect(store.messages.map(m => m.content)).toEqual(['expensive task', 'final answer that already streamed'])
const persistedMessages = JSON.parse(window.localStorage.getItem(sessionMessagesKey('sess-stale')) || '[]')
expect(persistedMessages.map((m: any) => m.content)).toEqual(['expensive task', 'final answer that already streamed'])
})
it('does not let stale resume polling erase a newer local assistant reply', async () => {
vi.useFakeTimers()
vi.setSystemTime(new Date('2026-04-22T19:00:00.000Z'))
const cachedMessages = [
{ id: 'u0', role: 'user', content: 'previous task', timestamp: 1 },
{ id: 'a0', role: 'assistant', content: 'a much longer previous assistant answer', timestamp: 2 },
{ id: 'u1', role: 'user', content: 'long task', timestamp: 3 },
{ id: 'a1', role: 'assistant', content: 'local final answer', timestamp: 4 },
]
window.localStorage.setItem(ACTIVE_SESSION_KEY, 'sess-poll-stale')
window.localStorage.setItem(
SESSIONS_CACHE_KEY,
JSON.stringify([
{
id: 'sess-poll-stale',
title: 'Polling stale refresh',
source: 'api_server',
messages: [],
createdAt: 1,
updatedAt: 2,
},
]),
)
window.localStorage.setItem(sessionMessagesKey('sess-poll-stale'), JSON.stringify(cachedMessages))
window.localStorage.setItem(inFlightKey('sess-poll-stale'), JSON.stringify({ runId: 'run-1', startedAt: Date.now() }))
mockSessionsApi.fetchSessions.mockResolvedValue([makeSummary('sess-poll-stale', 'Polling stale refresh')])
mockSessionsApi.fetchSession.mockResolvedValue(makeDetail('sess-poll-stale', [
{
id: 1,
session_id: 'sess-poll-stale',
role: 'user',
content: 'previous task',
tool_call_id: null,
tool_calls: null,
tool_name: null,
timestamp: 1710000000,
token_count: null,
finish_reason: null,
reasoning: null,
},
{
id: 2,
session_id: 'sess-poll-stale',
role: 'assistant',
content: 'a much longer previous assistant answer',
tool_call_id: null,
tool_calls: null,
tool_name: null,
timestamp: 1710000001,
token_count: null,
finish_reason: 'stop',
reasoning: null,
},
{
id: 3,
session_id: 'sess-poll-stale',
role: 'user',
content: 'long task',
tool_call_id: null,
tool_calls: null,
tool_name: null,
timestamp: 1710000002,
token_count: null,
finish_reason: null,
reasoning: null,
},
]))
const store = useChatStore()
await store.loadSessions()
expect(store.messages.map(m => m.content)).toEqual([
'previous task',
'a much longer previous assistant answer',
'long task',
'local final answer',
])
await vi.advanceTimersByTimeAsync(9000)
await flushPromises()
expect(store.messages.map(m => m.content)).toEqual([
'previous task',
'a much longer previous assistant answer',
'long task',
'local final answer',
])
expect(store.isRunActive).toBe(false)
expect(window.localStorage.getItem(inFlightKey('sess-poll-stale'))).toBeNull()
})
it('persists the user message immediately before any SSE delta arrives', async () => {
const store = useChatStore()
+75 -4
View File
@@ -195,21 +195,29 @@ describe('conversation DB service', () => {
const summaries = await mod.listConversationSummariesFromDb({ humanOnly: true })
expect(summaries).toHaveLength(1)
expect(summaries[0]).toEqual(expect.objectContaining({
id: 'root',
id: 'root-cont',
title: 'Continuation',
started_at: 100,
thread_session_count: 2,
ended_at: 111,
cost_status: 'mixed',
actual_cost_usd: 0.30000000000000004,
}))
const detail = await mod.getConversationDetailFromDb('root', { humanOnly: true })
expect(detail?.thread_session_count).toBe(2)
expect(detail?.messages.map((message: any) => message.content)).toEqual([
const detailFromTip = await mod.getConversationDetailFromDb('root-cont', { humanOnly: true })
expect(detailFromTip?.session_id).toBe('root-cont')
expect(detailFromTip?.thread_session_count).toBe(2)
expect(detailFromTip?.messages.map((message: any) => message.content)).toEqual([
'Start here',
'Assistant reply',
'Continue with more detail',
'Continued answer',
])
const detailFromRoot = await mod.getConversationDetailFromDb('root', { humanOnly: true })
expect(detailFromRoot?.messages.map((message: any) => message.content)).toEqual(
detailFromTip?.messages.map((message: any) => message.content),
)
})
it('treats branched children as their own visible conversations', async () => {
@@ -274,6 +282,69 @@ describe('conversation DB service', () => {
expect(detail?.messages.map((message: any) => message.content)).toEqual(['Branch prompt', 'Branch answer'])
})
it('keeps non-compression child sessions visible instead of hiding them under their parent', async () => {
ensureSqliteAvailable()
const { DatabaseSync } = await import('node:sqlite')
const db = new DatabaseSync(join(profileDirState.value, 'state.db'))
createSchema(db)
insertSession(db, {
id: 'parent',
parent_session_id: null,
source: 'cli',
model: 'openai/gpt-5.4',
title: 'Parent',
started_at: 100,
ended_at: 150,
end_reason: null,
message_count: 1,
tool_call_count: 0,
input_tokens: 0,
output_tokens: 0,
cache_read_tokens: 0,
cache_write_tokens: 0,
reasoning_tokens: 0,
billing_provider: 'openai',
estimated_cost_usd: 0,
actual_cost_usd: 0,
cost_status: 'estimated',
})
insertSession(db, {
id: 'review-child',
parent_session_id: 'parent',
source: 'cli',
model: 'openai/gpt-5.4',
title: 'Independent review',
started_at: 300,
ended_at: 320,
end_reason: null,
message_count: 2,
tool_call_count: 0,
input_tokens: 0,
output_tokens: 0,
cache_read_tokens: 0,
cache_write_tokens: 0,
reasoning_tokens: 0,
billing_provider: 'openai',
estimated_cost_usd: 0,
actual_cost_usd: 0,
cost_status: 'estimated',
})
insertMessage(db, { id: 1, session_id: 'parent', role: 'user', content: 'Parent prompt', timestamp: 101 })
insertMessage(db, { id: 2, session_id: 'review-child', role: 'user', content: 'Review prompt', timestamp: 301 })
insertMessage(db, { id: 3, session_id: 'review-child', role: 'assistant', content: 'Review answer', timestamp: 302 })
db.close()
const mod = await import('../../packages/server/src/db/hermes/conversations-db')
const summaries = await mod.listConversationSummariesFromDb({ humanOnly: true })
expect(summaries.map((summary: any) => summary.id)).toEqual(['review-child', 'parent'])
const detail = await mod.getConversationDetailFromDb('review-child', { humanOnly: true })
expect(detail?.thread_session_count).toBe(1)
expect(detail?.messages.map((message: any) => message.content)).toEqual(['Review prompt', 'Review answer'])
})
it('excludes synthetic-only roots from human-only summaries and details', async () => {
ensureSqliteAvailable()
const { DatabaseSync } = await import('node:sqlite')