fix: recover legacy session_usage migration (#345)
Quote SQL defaults when rebuilding legacy usage tables and recover rows left in session_usage_old by failed migrations.
This commit is contained in:
@@ -178,6 +178,105 @@ export const GC_SESSION_PROFILES_SCHEMA: Record<string, string> = {
|
|||||||
|
|
||||||
import { ensureTable, getDb } from '../index'
|
import { ensureTable, getDb } from '../index'
|
||||||
|
|
||||||
|
function quoteIdentifier(identifier: string): string {
|
||||||
|
return `"${identifier.replace(/"/g, '""')}"`
|
||||||
|
}
|
||||||
|
|
||||||
|
function sqlLiteral(value: string | number): string {
|
||||||
|
if (typeof value === 'number') return String(value)
|
||||||
|
return `'${value.replace(/'/g, "''")}'`
|
||||||
|
}
|
||||||
|
|
||||||
|
function usageSchemaDefinitionSql(): string {
|
||||||
|
return Object.entries(USAGE_SCHEMA)
|
||||||
|
.map(([col, def]) => `${quoteIdentifier(col)} ${def}`)
|
||||||
|
.join(', ')
|
||||||
|
}
|
||||||
|
|
||||||
|
function sqliteTableExists(db: NonNullable<ReturnType<typeof getDb>>, tableName: string): boolean {
|
||||||
|
return Boolean(db.prepare(`SELECT name FROM sqlite_master WHERE type='table' AND name=?`).get(tableName))
|
||||||
|
}
|
||||||
|
|
||||||
|
function sqliteTableColumns(db: NonNullable<ReturnType<typeof getDb>>, tableName: string): Set<string> {
|
||||||
|
const rows = db.prepare(`PRAGMA table_info(${quoteIdentifier(tableName)})`).all() as Array<{ name: string }>
|
||||||
|
return new Set(rows.map(row => row.name))
|
||||||
|
}
|
||||||
|
|
||||||
|
function legacyUsageValueSql(
|
||||||
|
sourceAlias: string,
|
||||||
|
oldCols: Set<string>,
|
||||||
|
col: string,
|
||||||
|
defaults: Record<string, string | number>,
|
||||||
|
): string {
|
||||||
|
const sourceColumn = (sourceCol: string) => `${quoteIdentifier(sourceAlias)}.${quoteIdentifier(sourceCol)}`
|
||||||
|
|
||||||
|
if (col === 'created_at' && oldCols.has('updated_at')) {
|
||||||
|
return `COALESCE(${sourceColumn('updated_at')}, ${sqlLiteral(defaults.created_at)})`
|
||||||
|
}
|
||||||
|
|
||||||
|
if (oldCols.has(col)) {
|
||||||
|
return `COALESCE(${sourceColumn(col)}, ${sqlLiteral(defaults[col] ?? 0)})`
|
||||||
|
}
|
||||||
|
|
||||||
|
return sqlLiteral(defaults[col] ?? 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
function insertUsageRowsFromLegacyTable(
|
||||||
|
db: NonNullable<ReturnType<typeof getDb>>,
|
||||||
|
oldTableName: string,
|
||||||
|
oldCols: Set<string>,
|
||||||
|
skipExistingSessionIds = false,
|
||||||
|
): void {
|
||||||
|
const defaults: Record<string, string | number> = {
|
||||||
|
session_id: '',
|
||||||
|
input_tokens: 0,
|
||||||
|
output_tokens: 0,
|
||||||
|
cache_read_tokens: 0,
|
||||||
|
cache_write_tokens: 0,
|
||||||
|
reasoning_tokens: 0,
|
||||||
|
created_at: Date.now(),
|
||||||
|
model: '',
|
||||||
|
profile: 'default',
|
||||||
|
}
|
||||||
|
const sourceAlias = 'old_usage'
|
||||||
|
const sourceColumn = (col: string) => `${quoteIdentifier(sourceAlias)}.${quoteIdentifier(col)}`
|
||||||
|
const insertValues: string[] = []
|
||||||
|
const selectValues: string[] = []
|
||||||
|
|
||||||
|
for (const col of Object.keys(USAGE_SCHEMA)) {
|
||||||
|
if (col === 'id') continue
|
||||||
|
|
||||||
|
insertValues.push(quoteIdentifier(col))
|
||||||
|
selectValues.push(legacyUsageValueSql(sourceAlias, oldCols, col, defaults))
|
||||||
|
}
|
||||||
|
|
||||||
|
const skipExistingWhere = skipExistingSessionIds && oldCols.has('session_id')
|
||||||
|
? ` WHERE NOT EXISTS (SELECT 1 FROM ${quoteIdentifier(USAGE_TABLE)} WHERE ${quoteIdentifier(USAGE_TABLE)}.${quoteIdentifier('session_id')} = ${sourceColumn('session_id')})`
|
||||||
|
: ''
|
||||||
|
|
||||||
|
db.exec(
|
||||||
|
`INSERT INTO ${quoteIdentifier(USAGE_TABLE)} (${insertValues.join(', ')}) ` +
|
||||||
|
`SELECT ${selectValues.join(', ')} FROM ${quoteIdentifier(oldTableName)} AS ${quoteIdentifier(sourceAlias)}` +
|
||||||
|
skipExistingWhere,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
function recoverInterruptedUsageMigration(db: NonNullable<ReturnType<typeof getDb>>): void {
|
||||||
|
const oldUsageTable = `${USAGE_TABLE}_old`
|
||||||
|
if (!sqliteTableExists(db, oldUsageTable)) return
|
||||||
|
|
||||||
|
const oldCols = sqliteTableColumns(db, oldUsageTable)
|
||||||
|
db.exec('BEGIN')
|
||||||
|
try {
|
||||||
|
insertUsageRowsFromLegacyTable(db, oldUsageTable, oldCols, true)
|
||||||
|
db.exec(`DROP TABLE ${quoteIdentifier(oldUsageTable)}`)
|
||||||
|
db.exec('COMMIT')
|
||||||
|
} catch (error) {
|
||||||
|
db.exec('ROLLBACK')
|
||||||
|
throw error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize all Hermes SQLite tables with proper schemas.
|
* Initialize all Hermes SQLite tables with proper schemas.
|
||||||
* This function creates tables and adds missing columns if schemas change.
|
* This function creates tables and adds missing columns if schemas change.
|
||||||
@@ -188,38 +287,29 @@ export function initAllHermesTables(): void {
|
|||||||
if (!db) return
|
if (!db) return
|
||||||
|
|
||||||
// Usage store - with special migration logic
|
// Usage store - with special migration logic
|
||||||
const tableExists = db.prepare(`SELECT name FROM sqlite_master WHERE type='table' AND name=?`).get(USAGE_TABLE)
|
const tableExists = sqliteTableExists(db, USAGE_TABLE)
|
||||||
const cols = (tableExists
|
const cols = tableExists
|
||||||
? db.prepare(`PRAGMA table_info("${USAGE_TABLE}")`).all() as Array<{ name: string; pk: number }>
|
? db.prepare(`PRAGMA table_info(${quoteIdentifier(USAGE_TABLE)})`).all() as Array<{ name: string; pk: number }>
|
||||||
: [])
|
: []
|
||||||
const hasId = cols.some(c => c.name === 'id')
|
const hasId = cols.some(c => c.name === 'id')
|
||||||
if (!hasId && tableExists) {
|
if (!hasId && tableExists) {
|
||||||
// Migration: if session_id is still PRIMARY KEY (no separate id column), recreate table
|
// Migration: if session_id is still PRIMARY KEY (no separate id column), recreate table
|
||||||
const oldCols = new Set(cols.map(c => c.name))
|
const oldCols = new Set(cols.map(c => c.name))
|
||||||
const insertCols = ['session_id', 'input_tokens', 'output_tokens']
|
const oldUsageTable = `${USAGE_TABLE}_old`
|
||||||
const selectCols = [...insertCols]
|
|
||||||
if (oldCols.has('cache_read_tokens')) { insertCols.push('cache_read_tokens'); selectCols.push('cache_read_tokens') }
|
db.exec('BEGIN')
|
||||||
if (oldCols.has('cache_write_tokens')) { insertCols.push('cache_write_tokens'); selectCols.push('cache_write_tokens') }
|
try {
|
||||||
if (oldCols.has('reasoning_tokens')) { insertCols.push('reasoning_tokens'); selectCols.push('reasoning_tokens') }
|
db.exec(`ALTER TABLE ${quoteIdentifier(USAGE_TABLE)} RENAME TO ${quoteIdentifier(oldUsageTable)}`)
|
||||||
if (oldCols.has('created_at')) { insertCols.push('created_at'); selectCols.push('created_at') }
|
db.exec(`CREATE TABLE ${quoteIdentifier(USAGE_TABLE)} (${usageSchemaDefinitionSql()})`)
|
||||||
if (oldCols.has('model')) { insertCols.push('model'); selectCols.push('model') }
|
insertUsageRowsFromLegacyTable(db, oldUsageTable, oldCols)
|
||||||
const defaults = {
|
db.exec(`DROP TABLE ${quoteIdentifier(oldUsageTable)}`)
|
||||||
cache_read_tokens: 0, cache_write_tokens: 0, reasoning_tokens: 0,
|
db.exec('COMMIT')
|
||||||
created_at: Date.now(), model: '', profile: 'default',
|
} catch (error) {
|
||||||
|
db.exec('ROLLBACK')
|
||||||
|
throw error
|
||||||
}
|
}
|
||||||
const insertValues = insertCols.map(c => c)
|
} else if (hasId) {
|
||||||
const selectValues = selectCols.map(c => c)
|
recoverInterruptedUsageMigration(db)
|
||||||
// Columns in new schema but not in old table — use defaults
|
|
||||||
for (const [col] of Object.entries(USAGE_SCHEMA)) {
|
|
||||||
if (!oldCols.has(col) && col !== 'id') {
|
|
||||||
insertValues.push(col)
|
|
||||||
selectValues.push(String(defaults[col as keyof typeof defaults] ?? 0))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
db.exec(`ALTER TABLE "${USAGE_TABLE}" RENAME TO "${USAGE_TABLE}_old"`)
|
|
||||||
db.exec(`CREATE TABLE "${USAGE_TABLE}" (${Object.entries(USAGE_SCHEMA).map(([col, def]) => `"${col}" ${def}`).join(', ')})`)
|
|
||||||
db.exec(`INSERT INTO "${USAGE_TABLE}" (${insertValues.join(', ')}) SELECT ${selectValues.join(', ')} FROM "${USAGE_TABLE}_old"`)
|
|
||||||
db.exec(`DROP TABLE "${USAGE_TABLE}_old"`)
|
|
||||||
}
|
}
|
||||||
ensureTable(USAGE_TABLE, USAGE_SCHEMA)
|
ensureTable(USAGE_TABLE, USAGE_SCHEMA)
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,119 @@
|
|||||||
|
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
|
||||||
|
|
||||||
|
function quoteIdentifier(identifier: string): string {
|
||||||
|
return `"${identifier.replace(/"/g, '""')}"`
|
||||||
|
}
|
||||||
|
|
||||||
|
function ensureTableForTest(db: any, tableName: string, schema: Record<string, string>): void {
|
||||||
|
const colDefs = Object.entries(schema)
|
||||||
|
.map(([col, def]) => `${quoteIdentifier(col)} ${def}`)
|
||||||
|
.join(', ')
|
||||||
|
db.exec(`CREATE TABLE IF NOT EXISTS ${quoteIdentifier(tableName)} (${colDefs})`)
|
||||||
|
|
||||||
|
const rows = db.prepare(`PRAGMA table_info(${quoteIdentifier(tableName)})`).all() as Array<{ name: string }>
|
||||||
|
const existingCols = new Set(rows.map(row => row.name))
|
||||||
|
|
||||||
|
for (const [col, def] of Object.entries(schema)) {
|
||||||
|
if (!existingCols.has(col)) {
|
||||||
|
db.exec(`ALTER TABLE ${quoteIdentifier(tableName)} ADD COLUMN ${quoteIdentifier(col)} ${def}`)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('Hermes schema migrations', () => {
|
||||||
|
let db: any = null
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
vi.resetModules()
|
||||||
|
const { DatabaseSync } = await import('node:sqlite')
|
||||||
|
db = new DatabaseSync(':memory:')
|
||||||
|
vi.doMock('../../packages/server/src/db/index', () => ({
|
||||||
|
getDb: () => db,
|
||||||
|
ensureTable: (tableName: string, schema: Record<string, string>) => ensureTableForTest(db, tableName, schema),
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
db?.close()
|
||||||
|
db = null
|
||||||
|
vi.doUnmock('../../packages/server/src/db/index')
|
||||||
|
vi.resetModules()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('migrates legacy session_usage rows with SQL-safe defaults', async () => {
|
||||||
|
const updatedAt = Date.UTC(2026, 3, 29)
|
||||||
|
db.exec(`CREATE TABLE "session_usage" (
|
||||||
|
"session_id" TEXT PRIMARY KEY,
|
||||||
|
"input_tokens" INTEGER NOT NULL DEFAULT 0,
|
||||||
|
"output_tokens" INTEGER NOT NULL DEFAULT 0,
|
||||||
|
"updated_at" INTEGER NOT NULL
|
||||||
|
)`)
|
||||||
|
db.prepare(
|
||||||
|
`INSERT INTO "session_usage" (session_id, input_tokens, output_tokens, updated_at) VALUES (?, ?, ?, ?)`,
|
||||||
|
).run('legacy-session', 123, 45, updatedAt)
|
||||||
|
|
||||||
|
const { initAllHermesTables } = await import('../../packages/server/src/db/hermes/schemas')
|
||||||
|
|
||||||
|
expect(() => initAllHermesTables()).not.toThrow()
|
||||||
|
|
||||||
|
const row = db.prepare(
|
||||||
|
`SELECT session_id, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens,
|
||||||
|
reasoning_tokens, model, profile, created_at
|
||||||
|
FROM "session_usage"`,
|
||||||
|
).get() as any
|
||||||
|
expect(row).toMatchObject({
|
||||||
|
session_id: 'legacy-session',
|
||||||
|
input_tokens: 123,
|
||||||
|
output_tokens: 45,
|
||||||
|
cache_read_tokens: 0,
|
||||||
|
cache_write_tokens: 0,
|
||||||
|
reasoning_tokens: 0,
|
||||||
|
model: '',
|
||||||
|
profile: 'default',
|
||||||
|
created_at: updatedAt,
|
||||||
|
})
|
||||||
|
expect(db.prepare(`SELECT name FROM sqlite_master WHERE type='table' AND name='session_usage_old'`).get()).toBeUndefined()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('recovers rows left in session_usage_old by a failed previous migration', async () => {
|
||||||
|
const updatedAt = Date.UTC(2026, 3, 30)
|
||||||
|
db.exec(`CREATE TABLE "session_usage" (
|
||||||
|
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
"session_id" TEXT NOT NULL,
|
||||||
|
"input_tokens" INTEGER NOT NULL DEFAULT 0,
|
||||||
|
"output_tokens" INTEGER NOT NULL DEFAULT 0,
|
||||||
|
"cache_read_tokens" INTEGER NOT NULL DEFAULT 0,
|
||||||
|
"cache_write_tokens" INTEGER NOT NULL DEFAULT 0,
|
||||||
|
"reasoning_tokens" INTEGER NOT NULL DEFAULT 0,
|
||||||
|
"model" TEXT NOT NULL DEFAULT '',
|
||||||
|
"profile" TEXT NOT NULL DEFAULT 'default',
|
||||||
|
"created_at" INTEGER NOT NULL
|
||||||
|
)`)
|
||||||
|
db.exec(`CREATE TABLE "session_usage_old" (
|
||||||
|
"session_id" TEXT PRIMARY KEY,
|
||||||
|
"input_tokens" INTEGER NOT NULL DEFAULT 0,
|
||||||
|
"output_tokens" INTEGER NOT NULL DEFAULT 0,
|
||||||
|
"updated_at" INTEGER NOT NULL
|
||||||
|
)`)
|
||||||
|
db.prepare(
|
||||||
|
`INSERT INTO "session_usage_old" (session_id, input_tokens, output_tokens, updated_at) VALUES (?, ?, ?, ?)`,
|
||||||
|
).run('stranded-session', 200, 80, updatedAt)
|
||||||
|
|
||||||
|
const { initAllHermesTables } = await import('../../packages/server/src/db/hermes/schemas')
|
||||||
|
|
||||||
|
expect(() => initAllHermesTables()).not.toThrow()
|
||||||
|
|
||||||
|
const row = db.prepare(
|
||||||
|
`SELECT session_id, input_tokens, output_tokens, model, profile, created_at FROM "session_usage"`,
|
||||||
|
).get() as any
|
||||||
|
expect(row).toMatchObject({
|
||||||
|
session_id: 'stranded-session',
|
||||||
|
input_tokens: 200,
|
||||||
|
output_tokens: 80,
|
||||||
|
model: '',
|
||||||
|
profile: 'default',
|
||||||
|
created_at: updatedAt,
|
||||||
|
})
|
||||||
|
expect(db.prepare(`SELECT name FROM sqlite_master WHERE type='table' AND name='session_usage_old'`).get()).toBeUndefined()
|
||||||
|
})
|
||||||
|
})
|
||||||
Reference in New Issue
Block a user