diff --git a/src/core/bot.ts b/src/core/bot.ts index ebafbf7..39f8801 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -279,7 +279,9 @@ export class LettaBot implements AgentSession { private sessions: Map = new Map(); // Coalesces concurrent ensureSessionForKey calls for the same key so the // second caller waits for the first instead of creating a duplicate session. - private sessionCreationLocks: Map> = new Map(); + // generation prevents stale in-flight creations from being reused after reset. + private sessionCreationLocks: Map; generation: number }> = new Map(); + private sessionGenerations: Map = new Map(); private currentCanUseTool: CanUseToolCallback | undefined; private conversationOverrides: Set = new Set(); // Stable callback wrapper so the Session options never change, but we can @@ -814,6 +816,8 @@ export class LettaBot implements AgentSession { * the session -- preventing the first send() from hitting a 409 CONFLICT. */ private async ensureSessionForKey(key: string, bootstrapRetried = false): Promise { + const generation = this.sessionGenerations.get(key) ?? 0; + // Fast path: session already exists const existing = this.sessions.get(key); if (existing) return existing; @@ -822,19 +826,31 @@ export class LettaBot implements AgentSession { // key (e.g. warmSession running while first message arrives), wait for // it instead of creating a duplicate session. const pending = this.sessionCreationLocks.get(key); - if (pending) return pending; + if (pending && pending.generation === generation) return pending.promise; - const promise = this._createSessionForKey(key, bootstrapRetried); - this.sessionCreationLocks.set(key, promise); + const promise = this._createSessionForKey(key, bootstrapRetried, generation); + this.sessionCreationLocks.set(key, { promise, generation }); try { return await promise; } finally { - this.sessionCreationLocks.delete(key); + const current = this.sessionCreationLocks.get(key); + if (current?.promise === promise) { + this.sessionCreationLocks.delete(key); + } } } /** Internal session creation -- called via ensureSessionForKey's lock. */ - private async _createSessionForKey(key: string, bootstrapRetried: boolean): Promise { + private async _createSessionForKey( + key: string, + bootstrapRetried: boolean, + generation: number, + ): Promise { + // Session was invalidated while this creation path was queued. + if ((this.sessionGenerations.get(key) ?? 0) !== generation) { + return this.ensureSessionForKey(key, bootstrapRetried); + } + // Re-read the store file from disk so we pick up agent/conversation ID // changes made by other processes (e.g. after a restart or container deploy). // This costs one synchronous disk read per incoming message, which is fine @@ -888,6 +904,13 @@ export class LettaBot implements AgentSession { throw error; } + // reset/invalidate can happen while initialize() is in-flight. + if ((this.sessionGenerations.get(key) ?? 0) !== generation) { + log.info(`Discarding stale initialized session (key=${key})`); + session.close(); + return this.ensureSessionForKey(key, bootstrapRetried); + } + // Proactive approval detection via bootstrapState(). // Single CLI round-trip that returns hasPendingApproval flag alongside // session metadata. If an orphaned approval is stuck, recover now so the @@ -917,7 +940,7 @@ export class LettaBot implements AgentSession { // Recreate session after recovery (conversation state changed). // Call _createSessionForKey directly (not ensureSessionForKey) since // we're already inside the creation lock for this key. - return this._createSessionForKey(key, true); + return this._createSessionForKey(key, true, generation); } } catch (err) { // bootstrapState failure is non-fatal -- the session is still usable. @@ -926,6 +949,12 @@ export class LettaBot implements AgentSession { } } + if ((this.sessionGenerations.get(key) ?? 0) !== generation) { + log.info(`Discarding stale session after bootstrapState (key=${key})`); + session.close(); + return this.ensureSessionForKey(key, bootstrapRetried); + } + this.sessions.set(key, session); return session; } @@ -941,6 +970,12 @@ export class LettaBot implements AgentSession { */ private invalidateSession(key?: string): void { if (key) { + // Invalidate any in-flight creation for this key so reset can force + // a fresh conversation/session immediately. + const nextGeneration = (this.sessionGenerations.get(key) ?? 0) + 1; + this.sessionGenerations.set(key, nextGeneration); + this.sessionCreationLocks.delete(key); + const session = this.sessions.get(key); if (session) { log.info(`Invalidating session (key=${key})`); @@ -948,11 +983,21 @@ export class LettaBot implements AgentSession { this.sessions.delete(key); } } else { + const keys = new Set([ + ...this.sessions.keys(), + ...this.sessionCreationLocks.keys(), + ]); + for (const k of keys) { + const nextGeneration = (this.sessionGenerations.get(k) ?? 0) + 1; + this.sessionGenerations.set(k, nextGeneration); + } + for (const [k, session] of this.sessions) { log.info(`Invalidating session (key=${k})`); session.close(); } this.sessions.clear(); + this.sessionCreationLocks.clear(); } } @@ -1182,34 +1227,29 @@ export class LettaBot implements AgentSession { return '⏰ Heartbeat triggered (silent mode - check server logs)'; } case 'reset': { - const convKey = channelId ? this.resolveConversationKey(channelId) : undefined; - if (convKey && convKey !== 'shared') { - // Per-channel mode: only clear the conversation for this channel - this.store.clearConversation(convKey); - this.invalidateSession(convKey); - log.info(`/reset - conversation cleared for ${convKey}`); - // Eagerly create the new session so we can report the conversation ID - try { - const session = await this.ensureSessionForKey(convKey); - const newConvId = session.conversationId || '(pending)'; - this.persistSessionState(session, convKey); - return `Conversation reset for this channel. New conversation: ${newConvId}\nOther channels are unaffected. (Agent memory is preserved.)`; - } catch { - return `Conversation reset for this channel. Other channels are unaffected. (Agent memory is preserved.)`; - } - } - // Shared mode or no channel context: clear everything - this.store.clearConversation(); + // Always scope the reset to the caller's conversation key so that + // other channels' conversations are never silently destroyed. + // resolveConversationKey returns 'shared' for non-override channels, + // or the channel id for per-channel / override channels. + const convKey = channelId ? this.resolveConversationKey(channelId) : 'shared'; + this.store.clearConversation(convKey); this.store.resetRecoveryAttempts(); - this.invalidateSession(); - log.info('/reset - all conversations cleared'); + this.invalidateSession(convKey); + log.info(`/reset - conversation cleared for key="${convKey}"`); + // Eagerly create the new session so we can report the conversation ID. try { - const session = await this.ensureSessionForKey('shared'); + const session = await this.ensureSessionForKey(convKey); const newConvId = session.conversationId || '(pending)'; - this.persistSessionState(session, 'shared'); - return `Conversation reset. New conversation: ${newConvId}\n(Agent memory is preserved.)`; + this.persistSessionState(session, convKey); + if (convKey === 'shared') { + return `Conversation reset. New conversation: ${newConvId}\n(Agent memory is preserved.)`; + } + return `Conversation reset for this channel. New conversation: ${newConvId}\nOther channels are unaffected. (Agent memory is preserved.)`; } catch { - return 'Conversation reset. Send a message to start a new conversation. (Agent memory is preserved.)'; + if (convKey === 'shared') { + return 'Conversation reset. Send a message to start a new conversation. (Agent memory is preserved.)'; + } + return `Conversation reset for this channel. Other channels are unaffected. (Agent memory is preserved.)`; } } default: diff --git a/src/core/sdk-session-contract.test.ts b/src/core/sdk-session-contract.test.ts index e188991..8c4ed6e 100644 --- a/src/core/sdk-session-contract.test.ts +++ b/src/core/sdk-session-contract.test.ts @@ -14,6 +14,16 @@ vi.mock('@letta-ai/letta-code-sdk', () => ({ import { createSession, resumeSession } from '@letta-ai/letta-code-sdk'; import { LettaBot } from './bot.js'; +function deferred() { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + describe('SDK session contract', () => { let dataDir: string; let originalDataDir: string | undefined; @@ -165,6 +175,68 @@ describe('SDK session contract', () => { expect(vi.mocked(createSession)).toHaveBeenCalledTimes(2); }); + it('reset ignores stale in-flight warm session and creates a fresh one', async () => { + const init = deferred(); + + const warmSession = { + initialize: vi.fn(() => init.promise), + bootstrapState: vi.fn(async () => ({ hasPendingApproval: false, conversationId: 'conv-old' })), + send: vi.fn(async (_message: unknown) => undefined), + stream: vi.fn(() => + (async function* () { + yield { type: 'result', success: true }; + })() + ), + close: vi.fn(() => undefined), + agentId: 'agent-contract-test', + conversationId: 'conv-old', + }; + + const resetSession = { + initialize: vi.fn(async () => undefined), + bootstrapState: vi.fn(async () => ({ hasPendingApproval: false, conversationId: 'conv-new' })), + send: vi.fn(async (_message: unknown) => undefined), + stream: vi.fn(() => + (async function* () { + yield { type: 'result', success: true }; + })() + ), + close: vi.fn(() => undefined), + agentId: 'agent-contract-test', + conversationId: 'conv-new', + }; + + vi.mocked(resumeSession).mockReturnValue(warmSession as never); + vi.mocked(createSession).mockReturnValue(resetSession as never); + + const bot = new LettaBot({ + workingDir: join(dataDir, 'working'), + allowedTools: [], + }); + + // Simulate an existing shared conversation being pre-warmed. + bot.setAgentId('agent-contract-test'); + const botInternal = bot as unknown as { + store: { conversationId: string | null }; + handleCommand: (command: string, channelId?: string) => Promise; + }; + botInternal.store.conversationId = 'conv-old'; + + const warmPromise = bot.warmSession(); + await Promise.resolve(); + + const resetPromise = botInternal.handleCommand('reset'); + + init.resolve(); + await warmPromise; + const resetMessage = await resetPromise; + + expect(resetMessage).toContain('New conversation: conv-new'); + expect(warmSession.close).toHaveBeenCalledTimes(1); + expect(resetSession.initialize).toHaveBeenCalledTimes(1); + expect(vi.mocked(createSession)).toHaveBeenCalledTimes(1); + }); + it('passes memfs: true to createSession when config sets memfs true', async () => { const mockSession = { initialize: vi.fn(async () => undefined), diff --git a/src/core/store.test.ts b/src/core/store.test.ts index 15721da..aa9716e 100644 --- a/src/core/store.test.ts +++ b/src/core/store.test.ts @@ -282,6 +282,23 @@ describe('Store', () => { expect(store.getConversationId('discord')).toBeNull(); }); + it('clearConversation("shared") only clears the legacy conversationId, not per-channel overrides', () => { + const store = new Store(testStorePath, 'TestBot'); + + store.conversationId = 'conv-shared'; + store.setConversationId('slack', 'conv-slack-override'); + store.setConversationId('discord', 'conv-discord-override'); + + // Simulate /reset from a shared-mode channel (e.g. Telegram) + store.clearConversation('shared'); + + // Only the shared conversation should be wiped + expect(store.conversationId).toBeNull(); + // Per-channel override conversations must survive + expect(store.getConversationId('slack')).toBe('conv-slack-override'); + expect(store.getConversationId('discord')).toBe('conv-discord-override'); + }); + it('should persist per-key conversations across reloads', () => { const store1 = new Store(testStorePath, 'TestBot'); store1.setConversationId('telegram', 'conv-tg-persist'); diff --git a/src/core/store.ts b/src/core/store.ts index 7c8fd5f..baa1cbf 100644 --- a/src/core/store.ts +++ b/src/core/store.ts @@ -356,12 +356,17 @@ export class Store { } /** - * Clear conversation(s). If key is provided, clears only that key. - * If key is undefined, clears the legacy conversationId AND all per-key conversations. + * Clear conversation(s). + * - key === 'shared': clears only the legacy shared conversationId (per-channel conversations are untouched). + * - key is a channel name: clears only that channel's per-key conversation entry. + * - key is undefined: clears the legacy conversationId AND all per-key conversations (full wipe). */ clearConversation(key?: string): void { const agent = this.agentData(); - if (key) { + if (key === 'shared') { + // Only wipe the legacy shared conversation; leave per-channel overrides intact. + agent.conversationId = null; + } else if (key) { if (agent.conversations) { delete agent.conversations[key]; }