fix: harden agent store persistence and startup agent discovery (#357)

This commit is contained in:
Cameron
2026-02-23 12:06:53 -08:00
committed by GitHub
parent 92d8a0cf10
commit 4fa212a9a1
4 changed files with 402 additions and 86 deletions

View File

@@ -461,6 +461,13 @@ export class LettaBot implements AgentSession {
* creating and initializing it if needed.
*/
private async ensureSessionForKey(key: string): Promise<Session> {
// Re-read the store file from disk so we pick up agent/conversation ID
// changes made by other processes (e.g. after a restart or container deploy).
// This costs one synchronous disk read per incoming message, which is fine
// at chat-bot throughput. If this ever becomes a bottleneck, throttle to
// refresh at most once per second.
this.store.refresh();
const existing = this.sessions.get(key);
if (existing) return existing;
@@ -543,6 +550,7 @@ export class LettaBot implements AgentSession {
* Pre-warm the session subprocess at startup. Call after config/agent is loaded.
*/
async warmSession(): Promise<void> {
this.store.refresh();
if (!this.store.agentId && !this.store.conversationId) return;
try {
// In shared mode, warm the single session. In per-channel mode, warm nothing
@@ -1600,6 +1608,7 @@ export class LettaBot implements AgentSession {
}
getStatus(): { agentId: string | null; conversationId: string | null; channels: string[] } {
this.store.refresh();
return {
agentId: this.store.agentId,
conversationId: this.store.conversationId,

View File

@@ -1,6 +1,6 @@
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { Store } from './store.js';
import { existsSync, unlinkSync, mkdirSync, writeFileSync } from 'node:fs';
import { existsSync, unlinkSync, mkdirSync, readFileSync, writeFileSync } from 'node:fs';
import { join } from 'node:path';
import { tmpdir } from 'node:os';
import type { AgentStore } from './types.js';
@@ -8,6 +8,7 @@ import type { AgentStore } from './types.js';
describe('Store', () => {
const testDir = join(tmpdir(), 'lettabot-test-' + Date.now() + '-' + Math.random().toString(36).substring(7));
const testStorePath = join(testDir, 'test-store.json');
const testBackupPath = `${testStorePath}.bak`;
let originalLettaAgentId: string | undefined;
beforeEach(() => {
@@ -24,6 +25,9 @@ describe('Store', () => {
if (existsSync(testStorePath)) {
unlinkSync(testStorePath);
}
if (existsSync(testBackupPath)) {
unlinkSync(testBackupPath);
}
// Restore LETTA_AGENT_ID env var
if (originalLettaAgentId !== undefined) {
@@ -298,4 +302,38 @@ describe('Store', () => {
expect(store1.getConversationId('telegram')).toBe('conv-bot1-tg');
expect(store2.getConversationId('telegram')).toBe('conv-bot2-tg');
});
it('should refresh in-memory state from disk', () => {
const writer = new Store(testStorePath, 'TestBot');
writer.agentId = 'agent-v1';
const reader = new Store(testStorePath, 'TestBot');
expect(reader.agentId).toBe('agent-v1');
writer.agentId = 'agent-v2';
expect(reader.agentId).toBe('agent-v1');
reader.refresh();
expect(reader.agentId).toBe('agent-v2');
});
it('should keep a backup and recover from a corrupted primary store file', () => {
const writer = new Store(testStorePath, 'TestBot');
writer.agentId = 'agent-backup';
writer.conversationId = 'conv-backup';
expect(existsSync(testBackupPath)).toBe(true);
// Simulate a torn/corrupted write on the primary file.
writeFileSync(testStorePath, '{"version":2,"agents":', 'utf-8');
const recovered = new Store(testStorePath, 'TestBot');
expect(recovered.agentId).toBe('agent-backup');
expect(recovered.conversationId).toBe('conv-backup');
// Constructor recovery should also rewrite a valid primary file.
const raw = JSON.parse(readFileSync(testStorePath, 'utf-8'));
expect(raw.version).toBe(2);
expect(raw.agents.TestBot.agentId).toBe('agent-backup');
});
});

View File

@@ -1,94 +1,296 @@
/**
* Agent Store - Persists agent state with multi-agent support
*
*
* V2 format: { version: 2, agents: { [name]: AgentStore } }
* V1 format (legacy): { agentId: ..., ... } - auto-migrated to V2
*/
import { existsSync, readFileSync, writeFileSync, mkdirSync } from 'node:fs';
import { resolve, dirname } from 'node:path';
import {
closeSync,
existsSync,
mkdirSync,
openSync,
readFileSync,
renameSync,
statSync,
unlinkSync,
writeFileSync,
} from 'node:fs';
import { randomUUID } from 'node:crypto';
import { dirname, resolve } from 'node:path';
import type { AgentStore, LastMessageTarget } from './types.js';
import { getDataDir } from '../utils/paths.js';
const DEFAULT_STORE_PATH = 'lettabot-agent.json';
const LOCK_RETRY_MS = 25;
const LOCK_TIMEOUT_MS = 5_000;
const LOCK_STALE_MS = 30_000;
const SLEEP_BUFFER = new Int32Array(new SharedArrayBuffer(4));
interface StoreV2 {
version: 2;
agents: Record<string, AgentStore>;
}
interface ParsedStore {
data: StoreV2;
wasV1: boolean;
}
let warnedAboutBusyWait = false;
function sleepSync(ms: number): void {
if (typeof Atomics.wait === 'function') {
Atomics.wait(SLEEP_BUFFER, 0, 0, ms);
return;
}
if (!warnedAboutBusyWait) {
console.warn('[Store] Atomics.wait unavailable, falling back to busy-wait for lock retries');
warnedAboutBusyWait = true;
}
const end = Date.now() + ms;
while (Date.now() < end) {
// Busy-wait fallback -- should not be reached in standard Node.js (v8+)
}
}
export class Store {
private storePath: string;
private readonly storePath: string;
private readonly lockPath: string;
private readonly backupPath: string;
private data: StoreV2;
private agentName: string;
private readonly agentName: string;
constructor(storePath?: string, agentName?: string) {
this.storePath = resolve(getDataDir(), storePath || DEFAULT_STORE_PATH);
this.lockPath = `${this.storePath}.lock`;
this.backupPath = `${this.storePath}.bak`;
this.agentName = agentName || 'LettaBot';
this.data = this.load();
}
private load(): StoreV2 {
try {
if (existsSync(this.storePath)) {
const raw = readFileSync(this.storePath, 'utf-8');
const rawData = JSON.parse(raw) as any;
// V1 -> V2 auto-migration
if (!rawData.version && rawData.agentId !== undefined) {
const migrated: StoreV2 = {
version: 2,
agents: { [this.agentName]: rawData }
};
// Write back migrated data
this.writeRaw(migrated);
return migrated;
}
// Already V2
if (rawData.version === 2) {
return rawData as StoreV2;
}
}
} catch (e) {
console.error('Failed to load agent store:', e);
/**
* Reload store state from disk.
* Useful before critical operations in long-running multi-instance deployments.
*/
refresh(): void {
// Capture file existence before attempting reads so we can distinguish
// "files don't exist" (safe to reset to empty) from "files exist but are
// unreadable" (keep current in-memory state as best available data).
const hasPrimary = existsSync(this.storePath);
const hasBackup = existsSync(this.backupPath);
const primary = this.tryReadStore(this.storePath, 'primary');
if (primary) {
this.data = primary.data;
return;
}
const backup = this.tryReadStore(this.backupPath, 'backup');
if (backup) {
this.data = backup.data;
// Repair the corrupted/missing primary from backup so the next read
// doesn't have to fall through again.
this.persistStore(backup.data);
console.error(`[Store] Recovered in-memory state for ${this.agentName} from backup store.`);
return;
}
if (!hasPrimary && !hasBackup) {
this.data = { version: 2, agents: {} };
return;
}
// Keep current in-memory state if disk files exist but are unreadable.
console.error(`[Store] Keeping in-memory state for ${this.agentName}; on-disk store could not be read.`);
}
private normalizeStore(rawData: any): ParsedStore {
// V1 -> V2 in-memory migration
if (!rawData?.version && rawData?.agentId !== undefined) {
return {
wasV1: true,
data: {
version: 2,
agents: { [this.agentName]: rawData as AgentStore },
},
};
}
// V2
if (rawData?.version === 2 && rawData.agents && typeof rawData.agents === 'object') {
return { wasV1: false, data: rawData as StoreV2 };
}
// Unknown/empty format -> safe empty V2
return { wasV1: false, data: { version: 2, agents: {} } };
}
private readStoreFromPath(filePath: string): ParsedStore | null {
if (!existsSync(filePath)) return null;
const raw = readFileSync(filePath, 'utf-8');
const rawData = JSON.parse(raw);
return this.normalizeStore(rawData);
}
private tryReadStore(filePath: string, label: string): ParsedStore | null {
try {
return this.readStoreFromPath(filePath);
} catch (error) {
console.error(`[Store] Failed to read ${label} store at ${filePath}:`, error);
return null;
}
}
private load(): StoreV2 {
const primary = this.tryReadStore(this.storePath, 'primary');
if (primary) {
if (primary.wasV1) {
this.persistStore(primary.data);
}
return primary.data;
}
const backup = this.tryReadStore(this.backupPath, 'backup');
if (backup) {
console.error(`[Store] Recovered agent store from backup: ${this.backupPath}`);
this.persistStore(backup.data);
return backup.data;
}
// Return empty V2 structure
return { version: 2, agents: {} };
}
private writeRaw(data: StoreV2): void {
try {
// Ensure directory exists (important for Railway volumes)
mkdirSync(dirname(this.storePath), { recursive: true });
writeFileSync(this.storePath, JSON.stringify(data, null, 2));
} catch (e) {
console.error('Failed to save agent store:', e);
private acquireLock(): number {
const start = Date.now();
while (true) {
try {
const fd = openSync(this.lockPath, 'wx');
try {
writeFileSync(fd, `${process.pid}\n`, { encoding: 'utf-8' });
} catch (error) {
try {
closeSync(fd);
} catch {
// Best-effort close.
}
try {
unlinkSync(this.lockPath);
} catch {
// Best-effort lock cleanup.
}
throw error;
}
return fd;
} catch (error) {
const err = error as NodeJS.ErrnoException;
if (err.code !== 'EEXIST') {
throw error;
}
this.maybeClearStaleLock();
if (Date.now() - start >= LOCK_TIMEOUT_MS) {
throw new Error(`Timed out waiting for store lock: ${this.lockPath}`);
}
sleepSync(LOCK_RETRY_MS);
}
}
}
private save(): void {
// Reload file to get latest data from other Store instances
const current = existsSync(this.storePath)
? (() => {
try {
const raw = readFileSync(this.storePath, 'utf-8');
const data = JSON.parse(raw);
return data.version === 2 ? data : { version: 2, agents: {} };
} catch {
return { version: 2, agents: {} };
}
})()
: { version: 2, agents: {} };
// Merge our agent's data
current.agents[this.agentName] = this.data.agents[this.agentName];
// Write merged data
this.writeRaw(current);
private maybeClearStaleLock(): void {
try {
const stats = statSync(this.lockPath);
if (Date.now() - stats.mtimeMs > LOCK_STALE_MS) {
unlinkSync(this.lockPath);
}
} catch {
// Best-effort stale lock cleanup.
}
}
private releaseLock(fd: number): void {
try {
closeSync(fd);
} catch {
// Best-effort close.
}
try {
unlinkSync(this.lockPath);
} catch (error) {
const err = error as NodeJS.ErrnoException;
if (err.code !== 'ENOENT') {
console.error(`[Store] Failed to release lock ${this.lockPath}:`, error);
}
}
}
private withLock<T>(fn: () => T): T {
const fd = this.acquireLock();
try {
return fn();
} finally {
this.releaseLock(fd);
}
}
private writeRaw(filePath: string, data: StoreV2): void {
const tmpPath = `${filePath}.${randomUUID()}.tmp`;
try {
mkdirSync(dirname(filePath), { recursive: true });
writeFileSync(tmpPath, `${JSON.stringify(data, null, 2)}\n`, { encoding: 'utf-8' });
renameSync(tmpPath, filePath);
} catch (error) {
try {
unlinkSync(tmpPath);
} catch {
// Best-effort cleanup.
}
throw error;
}
}
private writeStoreFiles(data: StoreV2): void {
this.writeRaw(this.storePath, data);
this.writeRaw(this.backupPath, data);
}
private readLatestForSave(): StoreV2 {
const primary = this.tryReadStore(this.storePath, 'primary');
if (primary) return primary.data;
const backup = this.tryReadStore(this.backupPath, 'backup');
if (backup) {
console.error(`[Store] Using backup store for merge due to unreadable primary store.`);
return backup.data;
}
return { version: 2, agents: {} };
}
private persistStore(data: StoreV2): void {
try {
this.withLock(() => this.writeStoreFiles(data));
} catch (error) {
console.error('Failed to persist agent store:', error);
}
}
private save(): void {
try {
this.withLock(() => {
const current = this.readLatestForSave();
current.agents[this.agentName] = { ...this.agentData() };
this.writeStoreFiles(current);
this.data = current;
});
} catch (error) {
console.error('Failed to save agent store:', error);
}
}
/**
* Get agent-specific data (creates entry if doesn't exist)
*/
@@ -98,7 +300,7 @@ export class Store {
}
return this.data.agents[this.agentName];
}
get agentId(): string | null {
// Keep legacy env var override only for default single-agent key.
// In multi-agent mode, a global LETTA_AGENT_ID would leak across agents.
@@ -107,7 +309,7 @@ export class Store {
}
return this.agentData().agentId || null;
}
set agentId(id: string | null) {
const agent = this.agentData();
agent.agentId = id;
@@ -117,11 +319,11 @@ export class Store {
}
this.save();
}
get conversationId(): string | null {
return this.agentData().conversationId || null;
}
set conversationId(id: string | null) {
this.agentData().conversationId = id;
this.save();
@@ -166,16 +368,16 @@ export class Store {
}
this.save();
}
get baseUrl(): string | undefined {
return this.agentData().baseUrl;
}
set baseUrl(url: string | undefined) {
this.agentData().baseUrl = url;
this.save();
}
/**
* Set agent ID and associated server URL together
*/
@@ -190,45 +392,45 @@ export class Store {
}
this.save();
}
/**
* Check if stored agent matches current server
*/
isServerMismatch(currentBaseUrl?: string): boolean {
const agent = this.agentData();
if (!agent.agentId || !agent.baseUrl) return false;
// Normalize URLs for comparison
const stored = agent.baseUrl.replace(/\/$/, '');
const current = (currentBaseUrl || 'https://api.letta.com').replace(/\/$/, '');
return stored !== current;
}
reset(): void {
this.data.agents[this.agentName] = { agentId: null };
this.save();
}
getInfo(): AgentStore {
return { ...this.agentData() };
}
get lastMessageTarget(): LastMessageTarget | null {
return this.agentData().lastMessageTarget || null;
}
set lastMessageTarget(target: LastMessageTarget | null) {
this.agentData().lastMessageTarget = target || undefined;
this.save();
}
// Recovery tracking
get recoveryAttempts(): number {
return this.agentData().recoveryAttempts || 0;
}
incrementRecoveryAttempts(): number {
const agent = this.agentData();
agent.recoveryAttempts = (agent.recoveryAttempts || 0) + 1;
@@ -236,7 +438,7 @@ export class Store {
this.save();
return agent.recoveryAttempts;
}
resetRecoveryAttempts(): void {
this.agentData().recoveryAttempts = 0;
this.save();

View File

@@ -197,6 +197,60 @@ function parseHeartbeatTarget(raw?: string): { channel: string; chatId: string }
const DEFAULT_ATTACHMENTS_MAX_MB = 20;
const DEFAULT_ATTACHMENTS_MAX_AGE_DAYS = 14;
const ATTACHMENTS_PRUNE_INTERVAL_MS = 24 * 60 * 60 * 1000;
const DISCOVERY_LOCK_TIMEOUT_MS = 15_000;
const DISCOVERY_LOCK_STALE_MS = 60_000;
const DISCOVERY_LOCK_RETRY_MS = 100;
function sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
function getDiscoveryLockPath(agentName: string): string {
const safe = agentName
.trim()
.replace(/[^a-zA-Z0-9_-]/g, '-')
.replace(/-+/g, '-')
.replace(/^-|-$/g, '') || 'agent';
return `${STORE_PATH}.${safe}.discover.lock`;
}
async function withDiscoveryLock<T>(agentName: string, fn: () => Promise<T>): Promise<T> {
const lockPath = getDiscoveryLockPath(agentName);
const start = Date.now();
while (true) {
try {
const handle = await fs.open(lockPath, 'wx');
try {
await handle.writeFile(`${process.pid}\n`, { encoding: 'utf-8' });
return await fn();
} finally {
await handle.close().catch(() => {});
await fs.rm(lockPath, { force: true }).catch(() => {});
}
} catch (error) {
const err = error as NodeJS.ErrnoException;
if (err.code !== 'EEXIST') {
throw error;
}
try {
const stats = await fs.stat(lockPath);
if (Date.now() - stats.mtimeMs > DISCOVERY_LOCK_STALE_MS) {
await fs.rm(lockPath, { force: true });
continue;
}
} catch {
// Best-effort stale lock cleanup.
}
if (Date.now() - start >= DISCOVERY_LOCK_TIMEOUT_MS) {
throw new Error(`Timed out waiting for startup discovery lock: ${lockPath}`);
}
await sleep(DISCOVERY_LOCK_RETRY_MS);
}
}
}
function resolveAttachmentsMaxBytes(): number {
const rawBytes = Number(process.env.ATTACHMENTS_MAX_BYTES);
@@ -564,13 +618,26 @@ async function main() {
}
}
// Container deploy: discover by name
// Container deploy: discover by name under an inter-process lock to avoid startup races.
if (!initialStatus.agentId && isContainerDeploy) {
const found = await findAgentByName(agentConfig.name);
if (found) {
console.log(`[Agent:${agentConfig.name}] Found existing agent: ${found.id}`);
bot.setAgentId(found.id);
initialStatus = bot.getStatus();
try {
await withDiscoveryLock(agentConfig.name, async () => {
// Re-read status after lock acquisition in case another instance already set it.
initialStatus = bot.getStatus();
if (initialStatus.agentId) return;
const found = await findAgentByName(agentConfig.name);
if (found) {
console.log(`[Agent:${agentConfig.name}] Found existing agent: ${found.id}`);
bot.setAgentId(found.id);
initialStatus = bot.getStatus();
}
});
} catch (error) {
console.warn(
`[Agent:${agentConfig.name}] Discovery lock failed:`,
error instanceof Error ? error.message : error
);
}
}