fix(core): make /reset safe across conversation keys and in-flight sessions (#386)
This commit is contained in:
@@ -279,7 +279,9 @@ export class LettaBot implements AgentSession {
|
||||
private sessions: Map<string, Session> = new Map();
|
||||
// Coalesces concurrent ensureSessionForKey calls for the same key so the
|
||||
// second caller waits for the first instead of creating a duplicate session.
|
||||
private sessionCreationLocks: Map<string, Promise<Session>> = new Map();
|
||||
// The per-key generation number prevents a stale in-flight creation from being reused after a reset.
|
||||
private sessionCreationLocks: Map<string, { promise: Promise<Session>; generation: number }> = new Map();
|
||||
private sessionGenerations: Map<string, number> = new Map();
|
||||
private currentCanUseTool: CanUseToolCallback | undefined;
|
||||
private conversationOverrides: Set<string> = new Set();
|
||||
// Stable callback wrapper so the Session options never change, but we can
|
||||
@@ -814,6 +816,8 @@ export class LettaBot implements AgentSession {
|
||||
* the session -- preventing the first send() from hitting a 409 CONFLICT.
|
||||
*/
|
||||
private async ensureSessionForKey(key: string, bootstrapRetried = false): Promise<Session> {
|
||||
const generation = this.sessionGenerations.get(key) ?? 0;
|
||||
|
||||
// Fast path: session already exists
|
||||
const existing = this.sessions.get(key);
|
||||
if (existing) return existing;
|
||||
@@ -822,19 +826,31 @@ export class LettaBot implements AgentSession {
|
||||
// key (e.g. warmSession running while first message arrives), wait for
|
||||
// it instead of creating a duplicate session.
|
||||
const pending = this.sessionCreationLocks.get(key);
|
||||
if (pending) return pending;
|
||||
if (pending && pending.generation === generation) return pending.promise;
|
||||
|
||||
const promise = this._createSessionForKey(key, bootstrapRetried);
|
||||
this.sessionCreationLocks.set(key, promise);
|
||||
const promise = this._createSessionForKey(key, bootstrapRetried, generation);
|
||||
this.sessionCreationLocks.set(key, { promise, generation });
|
||||
try {
|
||||
return await promise;
|
||||
} finally {
|
||||
const current = this.sessionCreationLocks.get(key);
|
||||
if (current?.promise === promise) {
|
||||
this.sessionCreationLocks.delete(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Internal session creation -- called via ensureSessionForKey's lock. */
|
||||
private async _createSessionForKey(key: string, bootstrapRetried: boolean): Promise<Session> {
|
||||
private async _createSessionForKey(
|
||||
key: string,
|
||||
bootstrapRetried: boolean,
|
||||
generation: number,
|
||||
): Promise<Session> {
|
||||
// Session was invalidated while this creation path was queued.
|
||||
if ((this.sessionGenerations.get(key) ?? 0) !== generation) {
|
||||
return this.ensureSessionForKey(key, bootstrapRetried);
|
||||
}
|
||||
|
||||
// Re-read the store file from disk so we pick up agent/conversation ID
|
||||
// changes made by other processes (e.g. after a restart or container deploy).
|
||||
// This costs one synchronous disk read per incoming message, which is fine
|
||||
@@ -888,6 +904,13 @@ export class LettaBot implements AgentSession {
|
||||
throw error;
|
||||
}
|
||||
|
||||
// A reset/invalidate can happen while initialize() is still in flight.
|
||||
if ((this.sessionGenerations.get(key) ?? 0) !== generation) {
|
||||
log.info(`Discarding stale initialized session (key=${key})`);
|
||||
session.close();
|
||||
return this.ensureSessionForKey(key, bootstrapRetried);
|
||||
}
|
||||
|
||||
// Proactive approval detection via bootstrapState().
|
||||
// Single CLI round-trip that returns hasPendingApproval flag alongside
|
||||
// session metadata. If an orphaned approval is stuck, recover now so the
|
||||
@@ -917,7 +940,7 @@ export class LettaBot implements AgentSession {
|
||||
// Recreate session after recovery (conversation state changed).
|
||||
// Call _createSessionForKey directly (not ensureSessionForKey) since
|
||||
// we're already inside the creation lock for this key.
|
||||
return this._createSessionForKey(key, true);
|
||||
return this._createSessionForKey(key, true, generation);
|
||||
}
|
||||
} catch (err) {
|
||||
// bootstrapState failure is non-fatal -- the session is still usable.
|
||||
@@ -926,6 +949,12 @@ export class LettaBot implements AgentSession {
|
||||
}
|
||||
}
|
||||
|
||||
if ((this.sessionGenerations.get(key) ?? 0) !== generation) {
|
||||
log.info(`Discarding stale session after bootstrapState (key=${key})`);
|
||||
session.close();
|
||||
return this.ensureSessionForKey(key, bootstrapRetried);
|
||||
}
|
||||
|
||||
this.sessions.set(key, session);
|
||||
return session;
|
||||
}
|
||||
@@ -941,6 +970,12 @@ export class LettaBot implements AgentSession {
|
||||
*/
|
||||
private invalidateSession(key?: string): void {
|
||||
if (key) {
|
||||
// Invalidate any in-flight creation for this key so reset can force
|
||||
// a fresh conversation/session immediately.
|
||||
const nextGeneration = (this.sessionGenerations.get(key) ?? 0) + 1;
|
||||
this.sessionGenerations.set(key, nextGeneration);
|
||||
this.sessionCreationLocks.delete(key);
|
||||
|
||||
const session = this.sessions.get(key);
|
||||
if (session) {
|
||||
log.info(`Invalidating session (key=${key})`);
|
||||
@@ -948,11 +983,21 @@ export class LettaBot implements AgentSession {
|
||||
this.sessions.delete(key);
|
||||
}
|
||||
} else {
|
||||
const keys = new Set<string>([
|
||||
...this.sessions.keys(),
|
||||
...this.sessionCreationLocks.keys(),
|
||||
]);
|
||||
for (const k of keys) {
|
||||
const nextGeneration = (this.sessionGenerations.get(k) ?? 0) + 1;
|
||||
this.sessionGenerations.set(k, nextGeneration);
|
||||
}
|
||||
|
||||
for (const [k, session] of this.sessions) {
|
||||
log.info(`Invalidating session (key=${k})`);
|
||||
session.close();
|
||||
}
|
||||
this.sessions.clear();
|
||||
this.sessionCreationLocks.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1182,35 +1227,30 @@ export class LettaBot implements AgentSession {
|
||||
return '⏰ Heartbeat triggered (silent mode - check server logs)';
|
||||
}
|
||||
case 'reset': {
|
||||
const convKey = channelId ? this.resolveConversationKey(channelId) : undefined;
|
||||
if (convKey && convKey !== 'shared') {
|
||||
// Per-channel mode: only clear the conversation for this channel
|
||||
// Always scope the reset to the caller's conversation key so that
|
||||
// other channels' conversations are never silently destroyed.
|
||||
// resolveConversationKey returns 'shared' for non-override channels,
|
||||
// or the channel id for per-channel / override channels.
|
||||
const convKey = channelId ? this.resolveConversationKey(channelId) : 'shared';
|
||||
this.store.clearConversation(convKey);
|
||||
this.store.resetRecoveryAttempts();
|
||||
this.invalidateSession(convKey);
|
||||
log.info(`/reset - conversation cleared for ${convKey}`);
|
||||
// Eagerly create the new session so we can report the conversation ID
|
||||
log.info(`/reset - conversation cleared for key="${convKey}"`);
|
||||
// Eagerly create the new session so we can report the conversation ID.
|
||||
try {
|
||||
const session = await this.ensureSessionForKey(convKey);
|
||||
const newConvId = session.conversationId || '(pending)';
|
||||
this.persistSessionState(session, convKey);
|
||||
if (convKey === 'shared') {
|
||||
return `Conversation reset. New conversation: ${newConvId}\n(Agent memory is preserved.)`;
|
||||
}
|
||||
return `Conversation reset for this channel. New conversation: ${newConvId}\nOther channels are unaffected. (Agent memory is preserved.)`;
|
||||
} catch {
|
||||
return `Conversation reset for this channel. Other channels are unaffected. (Agent memory is preserved.)`;
|
||||
}
|
||||
}
|
||||
// Shared mode or no channel context: clear everything
|
||||
this.store.clearConversation();
|
||||
this.store.resetRecoveryAttempts();
|
||||
this.invalidateSession();
|
||||
log.info('/reset - all conversations cleared');
|
||||
try {
|
||||
const session = await this.ensureSessionForKey('shared');
|
||||
const newConvId = session.conversationId || '(pending)';
|
||||
this.persistSessionState(session, 'shared');
|
||||
return `Conversation reset. New conversation: ${newConvId}\n(Agent memory is preserved.)`;
|
||||
} catch {
|
||||
if (convKey === 'shared') {
|
||||
return 'Conversation reset. Send a message to start a new conversation. (Agent memory is preserved.)';
|
||||
}
|
||||
return `Conversation reset for this channel. Other channels are unaffected. (Agent memory is preserved.)`;
|
||||
}
|
||||
}
|
||||
default:
|
||||
return null;
|
||||
|
||||
@@ -14,6 +14,16 @@ vi.mock('@letta-ai/letta-code-sdk', () => ({
|
||||
import { createSession, resumeSession } from '@letta-ai/letta-code-sdk';
|
||||
import { LettaBot } from './bot.js';
|
||||
|
||||
/**
 * Builds a promise whose settlement is controlled from the outside.
 *
 * The Promise executor runs synchronously, so both handles are assigned
 * before this function returns.
 *
 * @returns the pending `promise` together with the `resolve` and `reject`
 *          functions that settle it.
 */
function deferred<T>() {
  let settle!: (value: T | PromiseLike<T>) => void;
  let fail!: (reason?: unknown) => void;

  const promise = new Promise<T>((onFulfil, onReject) => {
    // Capture the executor's callbacks so callers can settle the promise later.
    settle = onFulfil;
    fail = onReject;
  });

  return { promise, resolve: settle, reject: fail };
}
|
||||
|
||||
describe('SDK session contract', () => {
|
||||
let dataDir: string;
|
||||
let originalDataDir: string | undefined;
|
||||
@@ -165,6 +175,68 @@ describe('SDK session contract', () => {
|
||||
expect(vi.mocked(createSession)).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it('reset ignores stale in-flight warm session and creates a fresh one', async () => {
  const agentId = 'agent-contract-test';
  // Gate that keeps the warm session's initialize() pending until released.
  const initGate = deferred<void>();

  // One-shot stream yielding a single successful result event.
  async function* singleResult() {
    yield { type: 'result', success: true };
  }

  // Minimal mock session; only the conversation id and the initialize()
  // behaviour differ between the two sessions used below.
  const fakeSession = (conversationId: string, initialize: () => Promise<void>) => ({
    initialize: vi.fn(initialize),
    bootstrapState: vi.fn(async () => ({ hasPendingApproval: false, conversationId })),
    send: vi.fn(async (_message: unknown) => undefined),
    stream: vi.fn(() => singleResult()),
    close: vi.fn(() => undefined),
    agentId,
    conversationId,
  });

  const warmSession = fakeSession('conv-old', () => initGate.promise);
  const resetSession = fakeSession('conv-new', async () => undefined);

  // resumeSession hands back the session that hangs in initialize();
  // createSession hands back the fresh one expected after the reset.
  vi.mocked(resumeSession).mockReturnValue(warmSession as never);
  vi.mocked(createSession).mockReturnValue(resetSession as never);

  const bot = new LettaBot({
    workingDir: join(dataDir, 'working'),
    allowedTools: [],
  });

  // Simulate an existing shared conversation being pre-warmed.
  bot.setAgentId(agentId);
  const internals = bot as unknown as {
    store: { conversationId: string | null };
    handleCommand: (command: string, channelId?: string) => Promise<string | null>;
  };
  internals.store.conversationId = 'conv-old';

  // Start warming, yield one microtask, then issue the reset while the
  // warm session's initialize() is still pending.
  const warming = bot.warmSession();
  await Promise.resolve();

  const resetting = internals.handleCommand('reset');

  // Release the stale initialize() only after the reset has been requested.
  initGate.resolve();
  await warming;
  const reply = await resetting;

  expect(reply).toContain('New conversation: conv-new');
  expect(warmSession.close).toHaveBeenCalledTimes(1);
  expect(resetSession.initialize).toHaveBeenCalledTimes(1);
  expect(vi.mocked(createSession)).toHaveBeenCalledTimes(1);
});
|
||||
|
||||
it('passes memfs: true to createSession when config sets memfs true', async () => {
|
||||
const mockSession = {
|
||||
initialize: vi.fn(async () => undefined),
|
||||
|
||||
@@ -282,6 +282,23 @@ describe('Store', () => {
|
||||
expect(store.getConversationId('discord')).toBeNull();
|
||||
});
|
||||
|
||||
it('clearConversation("shared") only clears the legacy conversationId, not per-channel overrides', () => {
  // Per-channel override conversations that must survive a shared reset.
  const overrides: ReadonlyArray<[string, string]> = [
    ['slack', 'conv-slack-override'],
    ['discord', 'conv-discord-override'],
  ];

  const store = new Store(testStorePath, 'TestBot');
  store.conversationId = 'conv-shared';
  for (const [channel, conversationId] of overrides) {
    store.setConversationId(channel, conversationId);
  }

  // Simulate /reset from a shared-mode channel (e.g. Telegram).
  store.clearConversation('shared');

  // The shared conversation is wiped...
  expect(store.conversationId).toBeNull();
  // ...while every per-channel override is still readable.
  for (const [channel, conversationId] of overrides) {
    expect(store.getConversationId(channel)).toBe(conversationId);
  }
});
|
||||
|
||||
it('should persist per-key conversations across reloads', () => {
|
||||
const store1 = new Store(testStorePath, 'TestBot');
|
||||
store1.setConversationId('telegram', 'conv-tg-persist');
|
||||
|
||||
@@ -356,12 +356,17 @@ export class Store {
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear conversation(s). If key is provided, clears only that key.
|
||||
* If key is undefined, clears the legacy conversationId AND all per-key conversations.
|
||||
* Clear conversation(s).
|
||||
* - key === 'shared': clears only the legacy shared conversationId (per-channel conversations are untouched).
|
||||
* - key is a channel name: clears only that channel's per-key conversation entry.
|
||||
* - key is undefined: clears the legacy conversationId AND all per-key conversations (full wipe).
|
||||
*/
|
||||
clearConversation(key?: string): void {
|
||||
const agent = this.agentData();
|
||||
if (key) {
|
||||
if (key === 'shared') {
|
||||
// Only wipe the legacy shared conversation; leave per-channel overrides intact.
|
||||
agent.conversationId = null;
|
||||
} else if (key) {
|
||||
if (agent.conversations) {
|
||||
delete agent.conversations[key];
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user