diff --git a/src/core/bot.ts b/src/core/bot.ts index bf18464..929955a 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -461,6 +461,13 @@ export class LettaBot implements AgentSession { * creating and initializing it if needed. */ private async ensureSessionForKey(key: string): Promise { + // Re-read the store file from disk so we pick up agent/conversation ID + // changes made by other processes (e.g. after a restart or container deploy). + // This costs one synchronous disk read per incoming message, which is fine + // at chat-bot throughput. If this ever becomes a bottleneck, throttle to + // refresh at most once per second. + this.store.refresh(); + const existing = this.sessions.get(key); if (existing) return existing; @@ -543,6 +550,7 @@ export class LettaBot implements AgentSession { * Pre-warm the session subprocess at startup. Call after config/agent is loaded. */ async warmSession(): Promise { + this.store.refresh(); if (!this.store.agentId && !this.store.conversationId) return; try { // In shared mode, warm the single session. In per-channel mode, warm nothing @@ -1600,6 +1608,7 @@ export class LettaBot implements AgentSession { } getStatus(): { agentId: string | null; conversationId: string | null; channels: string[] } { + this.store.refresh(); return { agentId: this.store.agentId, conversationId: this.store.conversationId, diff --git a/src/core/store.test.ts b/src/core/store.test.ts index 18c226d..15721da 100644 --- a/src/core/store.test.ts +++ b/src/core/store.test.ts @@ -1,6 +1,6 @@ import { describe, it, expect, beforeEach, afterEach } from 'vitest'; import { Store } from './store.js'; -import { existsSync, unlinkSync, mkdirSync, writeFileSync } from 'node:fs'; +import { existsSync, unlinkSync, mkdirSync, readFileSync, writeFileSync } from 'node:fs'; import { join } from 'node:path'; import { tmpdir } from 'node:os'; import type { AgentStore } from './types.js'; @@ -8,6 +8,7 @@ import type { AgentStore } from './types.js'; describe('Store', () => { const testDir = join(tmpdir(), 'lettabot-test-' + Date.now() + '-' + Math.random().toString(36).substring(7)); const testStorePath = join(testDir, 'test-store.json'); + const testBackupPath = `${testStorePath}.bak`; let originalLettaAgentId: string | undefined; beforeEach(() => { @@ -24,6 +25,9 @@ describe('Store', () => { if (existsSync(testStorePath)) { unlinkSync(testStorePath); } + if (existsSync(testBackupPath)) { + unlinkSync(testBackupPath); + } // Restore LETTA_AGENT_ID env var if (originalLettaAgentId !== undefined) { @@ -298,4 +302,38 @@ describe('Store', () => { expect(store1.getConversationId('telegram')).toBe('conv-bot1-tg'); expect(store2.getConversationId('telegram')).toBe('conv-bot2-tg'); }); + + it('should refresh in-memory state from disk', () => { + const writer = new Store(testStorePath, 'TestBot'); + writer.agentId = 'agent-v1'; + + const reader = new Store(testStorePath, 'TestBot'); + expect(reader.agentId).toBe('agent-v1'); + + writer.agentId = 'agent-v2'; + expect(reader.agentId).toBe('agent-v1'); + + reader.refresh(); + expect(reader.agentId).toBe('agent-v2'); + }); + + it('should keep a backup and recover from a corrupted primary store file', () => { + const writer = new Store(testStorePath, 'TestBot'); + writer.agentId = 'agent-backup'; + writer.conversationId = 'conv-backup'; + + expect(existsSync(testBackupPath)).toBe(true); + + // Simulate a torn/corrupted write on the primary file. + writeFileSync(testStorePath, '{"version":2,"agents":', 'utf-8'); + + const recovered = new Store(testStorePath, 'TestBot'); + expect(recovered.agentId).toBe('agent-backup'); + expect(recovered.conversationId).toBe('conv-backup'); + + // Constructor recovery should also rewrite a valid primary file. + const raw = JSON.parse(readFileSync(testStorePath, 'utf-8')); + expect(raw.version).toBe(2); + expect(raw.agents.TestBot.agentId).toBe('agent-backup'); + }); }); diff --git a/src/core/store.ts b/src/core/store.ts index 9246244..c5c3805 100644 --- a/src/core/store.ts +++ b/src/core/store.ts @@ -1,94 +1,296 @@ /** * Agent Store - Persists agent state with multi-agent support - * + * * V2 format: { version: 2, agents: { [name]: AgentStore } } * V1 format (legacy): { agentId: ..., ... } - auto-migrated to V2 */ -import { existsSync, readFileSync, writeFileSync, mkdirSync } from 'node:fs'; -import { resolve, dirname } from 'node:path'; +import { + closeSync, + existsSync, + mkdirSync, + openSync, + readFileSync, + renameSync, + statSync, + unlinkSync, + writeFileSync, +} from 'node:fs'; +import { randomUUID } from 'node:crypto'; +import { dirname, resolve } from 'node:path'; import type { AgentStore, LastMessageTarget } from './types.js'; import { getDataDir } from '../utils/paths.js'; const DEFAULT_STORE_PATH = 'lettabot-agent.json'; +const LOCK_RETRY_MS = 25; +const LOCK_TIMEOUT_MS = 5_000; +const LOCK_STALE_MS = 30_000; +const SLEEP_BUFFER = new Int32Array(new SharedArrayBuffer(4)); interface StoreV2 { version: 2; agents: Record; } +interface ParsedStore { + data: StoreV2; + wasV1: boolean; +} + +let warnedAboutBusyWait = false; + +function sleepSync(ms: number): void { + if (typeof Atomics.wait === 'function') { + Atomics.wait(SLEEP_BUFFER, 0, 0, ms); + return; + } + if (!warnedAboutBusyWait) { + console.warn('[Store] Atomics.wait unavailable, falling back to busy-wait for lock retries'); + warnedAboutBusyWait = true; + } + const end = Date.now() + ms; + while (Date.now() < end) { + // Busy-wait fallback -- should not be reached in standard Node.js (v8+) + } +} + export class Store { - private storePath: string; + private readonly storePath: string; + private readonly lockPath: string; + private readonly backupPath: string; private data: StoreV2; - private agentName: string; - + private readonly agentName: string; + constructor(storePath?: string, agentName?: string) { this.storePath = resolve(getDataDir(), storePath || DEFAULT_STORE_PATH); + this.lockPath = `${this.storePath}.lock`; + this.backupPath = `${this.storePath}.bak`; this.agentName = agentName || 'LettaBot'; this.data = this.load(); } - - private load(): StoreV2 { - try { - if (existsSync(this.storePath)) { - const raw = readFileSync(this.storePath, 'utf-8'); - const rawData = JSON.parse(raw) as any; - - // V1 -> V2 auto-migration - if (!rawData.version && rawData.agentId !== undefined) { - const migrated: StoreV2 = { - version: 2, - agents: { [this.agentName]: rawData } - }; - // Write back migrated data - this.writeRaw(migrated); - return migrated; - } - - // Already V2 - if (rawData.version === 2) { - return rawData as StoreV2; - } - } - } catch (e) { - console.error('Failed to load agent store:', e); + + /** + * Reload store state from disk. + * Useful before critical operations in long-running multi-instance deployments. + */ + refresh(): void { + // Capture file existence before attempting reads so we can distinguish + // "files don't exist" (safe to reset to empty) from "files exist but are + // unreadable" (keep current in-memory state as best available data). + const hasPrimary = existsSync(this.storePath); + const hasBackup = existsSync(this.backupPath); + + const primary = this.tryReadStore(this.storePath, 'primary'); + if (primary) { + this.data = primary.data; + return; } - + + const backup = this.tryReadStore(this.backupPath, 'backup'); + if (backup) { + this.data = backup.data; + // Repair the corrupted/missing primary from backup so the next read + // doesn't have to fall through again. + this.persistStore(backup.data); + console.error(`[Store] Recovered in-memory state for ${this.agentName} from backup store.`); + return; + } + + if (!hasPrimary && !hasBackup) { + this.data = { version: 2, agents: {} }; + return; + } + + // Keep current in-memory state if disk files exist but are unreadable. + console.error(`[Store] Keeping in-memory state for ${this.agentName}; on-disk store could not be read.`); + } + + private normalizeStore(rawData: any): ParsedStore { + // V1 -> V2 in-memory migration + if (!rawData?.version && rawData?.agentId !== undefined) { + return { + wasV1: true, + data: { + version: 2, + agents: { [this.agentName]: rawData as AgentStore }, + }, + }; + } + + // V2 + if (rawData?.version === 2 && rawData.agents && typeof rawData.agents === 'object') { + return { wasV1: false, data: rawData as StoreV2 }; + } + + // Unknown/empty format -> safe empty V2 + return { wasV1: false, data: { version: 2, agents: {} } }; + } + + private readStoreFromPath(filePath: string): ParsedStore | null { + if (!existsSync(filePath)) return null; + const raw = readFileSync(filePath, 'utf-8'); + const rawData = JSON.parse(raw); + return this.normalizeStore(rawData); + } + + private tryReadStore(filePath: string, label: string): ParsedStore | null { + try { + return this.readStoreFromPath(filePath); + } catch (error) { + console.error(`[Store] Failed to read ${label} store at ${filePath}:`, error); + return null; + } + } + + private load(): StoreV2 { + const primary = this.tryReadStore(this.storePath, 'primary'); + if (primary) { + if (primary.wasV1) { + this.persistStore(primary.data); + } + return primary.data; + } + + const backup = this.tryReadStore(this.backupPath, 'backup'); + if (backup) { + console.error(`[Store] Recovered agent store from backup: ${this.backupPath}`); + this.persistStore(backup.data); + return backup.data; + } + // Return empty V2 structure return { version: 2, agents: {} }; } - - private writeRaw(data: StoreV2): void { - try { - // Ensure directory exists (important for Railway volumes) - mkdirSync(dirname(this.storePath), { recursive: true }); - writeFileSync(this.storePath, JSON.stringify(data, null, 2)); - } catch (e) { - console.error('Failed to save agent store:', e); + + private acquireLock(): number { + const start = Date.now(); + + while (true) { + try { + const fd = openSync(this.lockPath, 'wx'); + try { + writeFileSync(fd, `${process.pid}\n`, { encoding: 'utf-8' }); + } catch (error) { + try { + closeSync(fd); + } catch { + // Best-effort close. + } + try { + unlinkSync(this.lockPath); + } catch { + // Best-effort lock cleanup. + } + throw error; + } + return fd; + } catch (error) { + const err = error as NodeJS.ErrnoException; + if (err.code !== 'EEXIST') { + throw error; + } + + this.maybeClearStaleLock(); + if (Date.now() - start >= LOCK_TIMEOUT_MS) { + throw new Error(`Timed out waiting for store lock: ${this.lockPath}`); + } + sleepSync(LOCK_RETRY_MS); + } } } - - private save(): void { - // Reload file to get latest data from other Store instances - const current = existsSync(this.storePath) - ? (() => { - try { - const raw = readFileSync(this.storePath, 'utf-8'); - const data = JSON.parse(raw); - return data.version === 2 ? data : { version: 2, agents: {} }; - } catch { - return { version: 2, agents: {} }; - } - })() - : { version: 2, agents: {} }; - - // Merge our agent's data - current.agents[this.agentName] = this.data.agents[this.agentName]; - - // Write merged data - this.writeRaw(current); + + private maybeClearStaleLock(): void { + try { + const stats = statSync(this.lockPath); + if (Date.now() - stats.mtimeMs > LOCK_STALE_MS) { + unlinkSync(this.lockPath); + } + } catch { + // Best-effort stale lock cleanup. + } } - + + private releaseLock(fd: number): void { + try { + closeSync(fd); + } catch { + // Best-effort close. + } + + try { + unlinkSync(this.lockPath); + } catch (error) { + const err = error as NodeJS.ErrnoException; + if (err.code !== 'ENOENT') { + console.error(`[Store] Failed to release lock ${this.lockPath}:`, error); + } + } + } + + private withLock(fn: () => T): T { + const fd = this.acquireLock(); + try { + return fn(); + } finally { + this.releaseLock(fd); + } + } + + private writeRaw(filePath: string, data: StoreV2): void { + const tmpPath = `${filePath}.${randomUUID()}.tmp`; + try { + mkdirSync(dirname(filePath), { recursive: true }); + writeFileSync(tmpPath, `${JSON.stringify(data, null, 2)}\n`, { encoding: 'utf-8' }); + renameSync(tmpPath, filePath); + } catch (error) { + try { + unlinkSync(tmpPath); + } catch { + // Best-effort cleanup. + } + throw error; + } + } + + private writeStoreFiles(data: StoreV2): void { + this.writeRaw(this.storePath, data); + this.writeRaw(this.backupPath, data); + } + + private readLatestForSave(): StoreV2 { + const primary = this.tryReadStore(this.storePath, 'primary'); + if (primary) return primary.data; + + const backup = this.tryReadStore(this.backupPath, 'backup'); + if (backup) { + console.error(`[Store] Using backup store for merge due to unreadable primary store.`); + return backup.data; + } + + return { version: 2, agents: {} }; + } + + private persistStore(data: StoreV2): void { + try { + this.withLock(() => this.writeStoreFiles(data)); + } catch (error) { + console.error('Failed to persist agent store:', error); + } + } + + private save(): void { + try { + this.withLock(() => { + const current = this.readLatestForSave(); + current.agents[this.agentName] = { ...this.agentData() }; + this.writeStoreFiles(current); + this.data = current; + }); + } catch (error) { + console.error('Failed to save agent store:', error); + } + } + /** * Get agent-specific data (creates entry if doesn't exist) */ @@ -98,7 +300,7 @@ export class Store { } return this.data.agents[this.agentName]; } - + get agentId(): string | null { // Keep legacy env var override only for default single-agent key. // In multi-agent mode, a global LETTA_AGENT_ID would leak across agents. @@ -107,7 +309,7 @@ export class Store { } return this.agentData().agentId || null; } - + set agentId(id: string | null) { const agent = this.agentData(); agent.agentId = id; @@ -117,11 +319,11 @@ export class Store { } this.save(); } - + get conversationId(): string | null { return this.agentData().conversationId || null; } - + set conversationId(id: string | null) { this.agentData().conversationId = id; this.save(); @@ -166,16 +368,16 @@ export class Store { } this.save(); } - + get baseUrl(): string | undefined { return this.agentData().baseUrl; } - + set baseUrl(url: string | undefined) { this.agentData().baseUrl = url; this.save(); } - + /** * Set agent ID and associated server URL together */ @@ -190,45 +392,45 @@ export class Store { } this.save(); } - + /** * Check if stored agent matches current server */ isServerMismatch(currentBaseUrl?: string): boolean { const agent = this.agentData(); if (!agent.agentId || !agent.baseUrl) return false; - + // Normalize URLs for comparison const stored = agent.baseUrl.replace(/\/$/, ''); const current = (currentBaseUrl || 'https://api.letta.com').replace(/\/$/, ''); - + return stored !== current; } - + reset(): void { this.data.agents[this.agentName] = { agentId: null }; this.save(); } - + getInfo(): AgentStore { return { ...this.agentData() }; } - + get lastMessageTarget(): LastMessageTarget | null { return this.agentData().lastMessageTarget || null; } - + set lastMessageTarget(target: LastMessageTarget | null) { this.agentData().lastMessageTarget = target || undefined; this.save(); } - + // Recovery tracking - + get recoveryAttempts(): number { return this.agentData().recoveryAttempts || 0; } - + incrementRecoveryAttempts(): number { const agent = this.agentData(); agent.recoveryAttempts = (agent.recoveryAttempts || 0) + 1; @@ -236,7 +438,7 @@ export class Store { this.save(); return agent.recoveryAttempts; } - + resetRecoveryAttempts(): void { this.agentData().recoveryAttempts = 0; this.save(); diff --git a/src/main.ts b/src/main.ts index 936c3a1..48a12bc 100644 --- a/src/main.ts +++ b/src/main.ts @@ -197,6 +197,60 @@ function parseHeartbeatTarget(raw?: string): { channel: string; chatId: string } const DEFAULT_ATTACHMENTS_MAX_MB = 20; const DEFAULT_ATTACHMENTS_MAX_AGE_DAYS = 14; const ATTACHMENTS_PRUNE_INTERVAL_MS = 24 * 60 * 60 * 1000; +const DISCOVERY_LOCK_TIMEOUT_MS = 15_000; +const DISCOVERY_LOCK_STALE_MS = 60_000; +const DISCOVERY_LOCK_RETRY_MS = 100; + +function sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); +} + +function getDiscoveryLockPath(agentName: string): string { + const safe = agentName + .trim() + .replace(/[^a-zA-Z0-9_-]/g, '-') + .replace(/-+/g, '-') + .replace(/^-|-$/g, '') || 'agent'; + return `${STORE_PATH}.${safe}.discover.lock`; +} + +async function withDiscoveryLock(agentName: string, fn: () => Promise): Promise { + const lockPath = getDiscoveryLockPath(agentName); + const start = Date.now(); + + while (true) { + try { + const handle = await fs.open(lockPath, 'wx'); + try { + await handle.writeFile(`${process.pid}\n`, { encoding: 'utf-8' }); + return await fn(); + } finally { + await handle.close().catch(() => {}); + await fs.rm(lockPath, { force: true }).catch(() => {}); + } + } catch (error) { + const err = error as NodeJS.ErrnoException; + if (err.code !== 'EEXIST') { + throw error; + } + + try { + const stats = await fs.stat(lockPath); + if (Date.now() - stats.mtimeMs > DISCOVERY_LOCK_STALE_MS) { + await fs.rm(lockPath, { force: true }); + continue; + } + } catch { + // Best-effort stale lock cleanup. + } + + if (Date.now() - start >= DISCOVERY_LOCK_TIMEOUT_MS) { + throw new Error(`Timed out waiting for startup discovery lock: ${lockPath}`); + } + await sleep(DISCOVERY_LOCK_RETRY_MS); + } + } +} function resolveAttachmentsMaxBytes(): number { const rawBytes = Number(process.env.ATTACHMENTS_MAX_BYTES); @@ -564,13 +618,26 @@ async function main() { } } - // Container deploy: discover by name + // Container deploy: discover by name under an inter-process lock to avoid startup races. if (!initialStatus.agentId && isContainerDeploy) { - const found = await findAgentByName(agentConfig.name); - if (found) { - console.log(`[Agent:${agentConfig.name}] Found existing agent: ${found.id}`); - bot.setAgentId(found.id); - initialStatus = bot.getStatus(); + try { + await withDiscoveryLock(agentConfig.name, async () => { + // Re-read status after lock acquisition in case another instance already set it. + initialStatus = bot.getStatus(); + if (initialStatus.agentId) return; + + const found = await findAgentByName(agentConfig.name); + if (found) { + console.log(`[Agent:${agentConfig.name}] Found existing agent: ${found.id}`); + bot.setAgentId(found.id); + initialStatus = bot.getStatus(); + } + }); + } catch (error) { + console.warn( + `[Agent:${agentConfig.name}] Discovery lock failed:`, + error instanceof Error ? error.message : error + ); } }