fix: harden Hermes stream recovery around tool boundaries (#189)

This commit is contained in:
Zhicheng Han
2026-04-24 15:42:42 +02:00
committed by GitHub
parent edd41e6eb7
commit 009acc1c28
6 changed files with 496 additions and 114 deletions
+101 -88
View File
@@ -1,5 +1,5 @@
import { startRun, streamRunEvents, type ChatMessage, type RunEvent } from '@/api/hermes/chat'
import { deleteSession as deleteSessionApi, fetchSession, fetchSessions, fetchSessionUsageSingle, type HermesMessage, type SessionSummary } from '@/api/hermes/sessions'
import { deleteSession as deleteSessionApi, fetchSession, fetchSessions, fetchSessionUsageSingle, type HermesMessage, type SessionDetail, type SessionSummary } from '@/api/hermes/sessions'
import { defineStore } from 'pinia'
import { ref, computed } from 'vue'
import { useAppStore } from './app'
@@ -162,6 +162,42 @@ function mapHermesSession(s: SessionSummary): Session {
}
}
function assistantTranscriptLength(msgs: Message[]): number {
return msgs.reduce((total, m) => total + (m.role === 'assistant' ? (m.content?.length ?? 0) : 0), 0)
}
function userTurnCount(msgs: Message[]): number {
return msgs.filter(m => m.role === 'user').length
}
function serverMessagesAreAheadOrEqual(local: Message[], server: Message[]): boolean {
const localUsers = userTurnCount(local)
const serverUsers = userTurnCount(server)
return serverUsers > localUsers
|| (serverUsers === localUsers && assistantTranscriptLength(server) >= assistantTranscriptLength(local))
}
function serverHasCaughtUpToLocalTurn(local: Message[], server: Message[]): boolean {
return userTurnCount(server) >= userTurnCount(local)
}
function applySessionMetaFromDetail(target: Session, detail: SessionDetail) {
if (detail.title) target.title = detail.title
target.endedAt = detail.ended_at != null ? Math.round(detail.ended_at * 1000) : null
target.lastActiveAt = detail.last_active != null ? Math.round(detail.last_active * 1000) : target.lastActiveAt
target.updatedAt = Math.round((detail.last_active || detail.ended_at || target.updatedAt / 1000) * 1000)
}
function applyServerMessagesIfAhead(target: Session, detail: SessionDetail): boolean {
const mapped = mapHermesMessages(detail.messages || [])
applySessionMetaFromDetail(target, detail)
if (serverMessagesAreAheadOrEqual(target.messages, mapped)) {
target.messages = mapped
return true
}
return false
}
// Cache keys for stale-while-revalidate loading of sessions / messages.
// All keys include the active profile name to isolate cache between profiles.
// Rendering from cache on boot avoids the multi-round-trip wait the user sees
@@ -356,8 +392,10 @@ export const useChatStore = defineStore('chat', () => {
saveJsonWithLegacy(inFlightKey(sid), { runId, startedAt: Date.now() } as InFlightRun, legacyInFlightKey(sid))
}
function clearInFlight(sid: string) {
function clearInFlight(sid: string, runId?: string): boolean {
if (runId && readInFlight(sid)?.runId !== runId) return false
removeItemWithLegacy(inFlightKey(sid), legacyInFlightKey(sid))
return true
}
function readInFlight(sid: string): InFlightRun | null {
@@ -381,8 +419,8 @@ export const useChatStore = defineStore('chat', () => {
}
// Poll fetchSession while an in-flight run is recovering. Exits when the
// server's message signature is stable for POLL_STABLE_EXITS ticks (run
// presumed done), TTL elapses, or the user explicitly starts streaming.
// server reports a terminal session and its message signature is stable for
// POLL_STABLE_EXITS ticks, TTL elapses, or the user explicitly starts streaming.
function startPolling(sid: string) {
if (pollTimers.has(sid)) return
resumingRuns.value = new Set([...resumingRuns.value, sid])
@@ -403,34 +441,21 @@ export const useChatStore = defineStore('chat', () => {
const mapped = mapHermesMessages(detail.messages || [])
const target = sessions.value.find(s => s.id === sid)
if (!target) return
// Use the same "content-aware" comparison as switchSession: server
// is ahead iff it knows about at least as many user turns and its
// last assistant text is at least as long as ours.
const local = target.messages
const localLastAssistant = [...local].reverse().find(m => m.role === 'assistant')
const serverLastAssistant = [...mapped].reverse().find(m => m.role === 'assistant')
const localAssistantLen = localLastAssistant?.content?.length ?? 0
const serverAssistantLen = serverLastAssistant?.content?.length ?? 0
const localUsers = local.filter(m => m.role === 'user').length
const serverUsers = mapped.filter(m => m.role === 'user').length
const serverIsCaughtUp = serverUsers >= localUsers
// Same rationale as switchSession: strictly more user turns means
// server is ahead (new turn complete). Equal user turns + longer
// assistant means server caught up on the current turn.
const serverIsAhead =
serverUsers > localUsers
|| (serverUsers === localUsers && serverAssistantLen >= localAssistantLen)
const serverIsCaughtUp = serverHasCaughtUpToLocalTurn(target.messages, mapped)
const serverIsAhead = serverMessagesAreAheadOrEqual(target.messages, mapped)
const serverIsTerminal = detail.ended_at != null
applySessionMetaFromDetail(target, detail)
if (serverIsAhead) {
target.messages = mapped
if (detail.title && !target.title) target.title = detail.title
if (sid === activeSessionId.value) persistActiveMessages()
}
// Stability detection ONLY matters when the server has at least as
// many user turns as we do. Otherwise the server is still catching
// up (e.g. the new turn we just sent hasn't been flushed server-side
// yet) and a "stable" signature is a false positive — the stability
// is the server NOT having our latest turn, not the run being done.
if (!serverIsCaughtUp) {
// Stability detection ONLY matters when the server has caught up to
// our latest user turn AND the session is terminal. During long tool
// calls the persisted transcript may be stable while the run is still
// active; treating that as completion is the truncation failure mode.
if (!serverIsCaughtUp || !serverIsTerminal) {
pollSignatures.delete(sid)
} else {
const last = mapped[mapped.length - 1]
@@ -439,14 +464,13 @@ export const useChatStore = defineStore('chat', () => {
if (prev && prev.sig === sig) {
prev.stableTicks += 1
if (prev.stableTicks >= POLL_STABLE_EXITS) {
// Run is done on the server. Force-apply server view even if
// our "don't retreat" guard above skipped it — the server is
// now the authoritative source of truth.
target.messages = mapped
if (detail.title) target.title = detail.title
// Server confirms this run is terminal. Keep any longer local
// stream text if the final session export lags behind.
if (serverIsAhead) target.messages = mapped
if (sid === activeSessionId.value) persistActiveMessages()
clearInFlight(sid)
stopPolling(sid)
if (clearInFlight(sid, inFlight.runId)) {
stopPolling(sid)
}
}
} else {
pollSignatures.set(sid, { sig, stableTicks: 0 })
@@ -521,9 +545,9 @@ export const useChatStore = defineStore('chat', () => {
}
}
// Re-pull active session from server and overwrite local messages. Used on
// SSE drop and on tab-visible events — mobile browsers kill EventSource
// while backgrounded, but the backend run usually completes anyway.
// Re-pull active session from server without retreating from longer local
// streamed text. Used on SSE drop and tab-visible events — mobile browsers
// can kill EventSource while the backend run continues.
async function refreshActiveSession(): Promise<boolean> {
const sid = activeSessionId.value
if (!sid) return false
@@ -532,17 +556,28 @@ export const useChatStore = defineStore('chat', () => {
if (!detail) return false
const target = sessions.value.find(s => s.id === sid)
if (!target) return false
const mapped = mapHermesMessages(detail.messages || [])
target.messages = mapped
if (detail.title) target.title = detail.title
persistActiveMessages()
return true
const applied = applyServerMessagesIfAhead(target, detail)
if (applied && sid === activeSessionId.value) persistActiveMessages()
return applied
} catch (err) {
console.error('Failed to refresh active session:', err)
return false
}
}
async function reconcileSessionAfterCompletion(sid: string): Promise<void> {
try {
const detail = await fetchSession(sid)
if (!detail) return
const target = sessions.value.find(s => s.id === sid)
if (!target) return
const applied = applyServerMessagesIfAhead(target, detail)
if (applied && sid === activeSessionId.value) persistActiveMessages()
} catch (err) {
console.error('Failed to reconcile completed session:', err)
}
}
function createSession(): Session {
const session: Session = {
@@ -588,34 +623,14 @@ export const useChatStore = defineStore('chat', () => {
const detail = await fetchSession(sessionId)
if (detail && detail.messages) {
const mapped = mapHermesMessages(detail.messages)
// Pick whichever view has more information. Simple length comparison
// is wrong because mapHermesMessages folds tool_call-only assistant
// msgs and matches them with tool-result msgs — so post-fold `mapped`
// can be SHORTER than the raw SSE-built local array even when the
// server is strictly ahead. Instead, compare the last assistant
// message content: if the server's is at least as long, the server
// is up-to-date (and has the final complete text); otherwise keep
// local (in-flight window where server hasn't flushed the new turn).
const local = activeSession.value.messages
const localLastAssistant = [...local].reverse().find(m => m.role === 'assistant')
const serverLastAssistant = [...mapped].reverse().find(m => m.role === 'assistant')
const localAssistantLen = localLastAssistant?.content?.length ?? 0
const serverAssistantLen = serverLastAssistant?.content?.length ?? 0
const localUsers = local.filter(m => m.role === 'user').length
const serverUsers = mapped.filter(m => m.role === 'user').length
// Trust server when:
// - it has STRICTLY MORE user turns than we do (new turn landed),
// OR
// - same user-turn count AND server's last assistant is at least
// as long as ours (same turn, server caught up or further)
// Otherwise keep local (protects against the server-not-yet-flushed
// race during in-flight runs). Length comparison alone is wrong
// across different turns because each turn's last assistant is
// unrelated to the previous turn's.
const serverIsAhead =
serverUsers > localUsers
|| (serverUsers === localUsers && serverAssistantLen >= localAssistantLen)
if (serverIsAhead) {
// Pick whichever view has more information. Simple array length
// comparison is wrong because mapHermesMessages folds tool-call-only
// assistant msgs into tool-result msgs. Also, tool boundaries can
// split one assistant turn into pre-tool and post-tool assistant
// segments, so comparing only the last assistant segment can retreat
// a fuller local transcript to stale pre-tool server text. Compare
// user-turn count plus total assistant transcript length instead.
if (serverMessagesAreAheadOrEqual(activeSession.value.messages, mapped)) {
activeSession.value.messages = mapped
}
// Update title: use Hermes title, or fallback to first user message
@@ -823,7 +838,8 @@ export const useChatStore = defineStore('chat', () => {
case 'run.started':
break
case 'message.delta': {
case 'message.delta':
case 'message': {
const msgs = getSessionMsgs(sid)
const last = msgs[msgs.length - 1]
if (last?.role === 'assistant' && last.isStreaming) {
@@ -888,14 +904,14 @@ export const useChatStore = defineStore('chat', () => {
}
cleanup()
updateSessionTitle(sid)
// the in-flight marker. If the browser is reloading right now
// and kills us between the two localStorage writes, we want
// the next page load to still see in-flight === true (so
// polling kicks in and recovers) rather than the other way
// around (cleared in-flight + stale streaming cache = UI stuck).
// Persist the terminal local view before clearing the in-flight
// marker. If final SSE deltas were missed, reconcile once from
// the authoritative session export without retreating from
// longer local text.
if (sid === activeSessionId.value) persistActiveMessages()
clearInFlight(sid)
stopPolling(sid)
void reconcileSessionAfterCompletion(sid).finally(() => {
if (readInFlight(sid)?.runId === runId) startPolling(sid)
})
break
}
@@ -923,8 +939,9 @@ export const useChatStore = defineStore('chat', () => {
})
cleanup()
if (sid === activeSessionId.value) persistActiveMessages()
clearInFlight(sid)
stopPolling(sid)
if (clearInFlight(sid, runId)) {
stopPolling(sid)
}
break
}
}
@@ -953,13 +970,9 @@ export const useChatStore = defineStore('chat', () => {
if (last?.isStreaming) {
updateMessage(sid, last.id, { isStreaming: false })
}
// Any tool messages still marked 'running' will be replaced by the
// server's view after refresh; clear their spinner state now.
msgs.forEach((m, i) => {
if (m.role === 'tool' && m.toolStatus === 'running') {
msgs[i] = { ...m, toolStatus: 'done' }
}
})
// Keep running tool state until refresh/polling sees the server's
// terminal transcript. A dropped SSE connection is not proof the
// tool completed.
cleanup()
if (sid === activeSessionId.value) {
void refreshActiveSession()