From c018369caecaae8f42ebf61b37eb132706caced6 Mon Sep 17 00:00:00 2001 From: Cameron Date: Thu, 5 Mar 2026 17:57:04 -0800 Subject: [PATCH] fix: serialize all sendToAgent calls and use per-key canUseTool callbacks (#501) --- src/core/bot.ts | 7 +- src/core/sdk-session-contract.test.ts | 126 ++++++++++++++++++++++++++ src/core/session-manager.ts | 33 ++++--- 3 files changed, 148 insertions(+), 18 deletions(-) diff --git a/src/core/bot.ts b/src/core/bot.ts index ada8c3f..dae0a18 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -1527,13 +1527,12 @@ export class LettaBot implements AgentSession { /** * Acquire the appropriate lock for a conversation key. - * In per-channel mode with a dedicated key, no lock needed (parallel OK). - * In per-channel mode with a channel key, wait for that key's queue. + * In per-channel mode, wait for that key's queue to drain before proceeding. * In shared mode, use the global processing flag. + * All keys — including 'heartbeat' — are serialized to prevent concurrent + * sends on the same Session object, which the SDK does not support. */ private async acquireLock(convKey: string): Promise { - if (convKey === 'heartbeat') return false; // No lock needed - if (convKey !== 'shared') { while (this.processingKeys.has(convKey)) { await new Promise(resolve => setTimeout(resolve, 1000)); diff --git a/src/core/sdk-session-contract.test.ts b/src/core/sdk-session-contract.test.ts index dc686a9..6487be5 100644 --- a/src/core/sdk-session-contract.test.ts +++ b/src/core/sdk-session-contract.test.ts @@ -38,6 +38,8 @@ vi.mock('./system-prompt.js', () => ({ import { createAgent, createSession, resumeSession } from '@letta-ai/letta-code-sdk'; import { getLatestRunError, recoverOrphanedConversationApproval } from '../tools/letta-api.js'; import { LettaBot } from './bot.js'; +import { SessionManager } from './session-manager.js'; +import { Store } from './store.js'; function deferred() { let resolve!: (value: T | PromiseLike) => void; @@ -487,6 +489,130 @@ describe('SDK session contract', () => { expect(opts).not.toHaveProperty('memfs'); }); + it('keeps canUseTool callbacks isolated for concurrent keyed sessions', async () => { + const store = new Store(undefined, 'LettaBot'); + store.setAgent('agent-contract-test', 'https://api.letta.com'); + + const allowCallbackDispatch = deferred(); + const bothSendsStarted = deferred(); + const callbackResults: Array<{ sessionName: string; answer: string | undefined }> = []; + let createdSessions = 0; + let startedSends = 0; + + vi.mocked(resumeSession).mockImplementation((_id, opts) => { + const sessionName = createdSessions++ === 0 ? 'chat-a' : 'chat-b'; + return { + initialize: vi.fn(async () => undefined), + bootstrapState: vi.fn(async () => ({ hasPendingApproval: false })), + send: vi.fn(async (_message: unknown) => { + startedSends += 1; + if (startedSends === 2) { + bothSendsStarted.resolve(); + } + await bothSendsStarted.promise; + await allowCallbackDispatch.promise; + + const canUseTool = opts?.canUseTool; + if (!canUseTool) { + throw new Error('Expected mocked session options to include canUseTool'); + } + + const result = await canUseTool('AskUserQuestion', { sessionName }); + const updatedInput = 'updatedInput' in result + ? result.updatedInput as Record | undefined + : undefined; + callbackResults.push({ + sessionName, + answer: typeof updatedInput?.answer === 'string' + ? updatedInput.answer + : undefined, + }); + }), + stream: vi.fn(() => + (async function* () { + yield { type: 'result', success: true }; + })() + ), + close: vi.fn(() => undefined), + agentId: 'agent-contract-test', + conversationId: `${sessionName}-conversation`, + } as never; + }); + + const canUseToolA = vi.fn(async () => ({ + behavior: 'allow' as const, + updatedInput: { answer: 'from-chat-a' }, + })); + const canUseToolB = vi.fn(async () => ({ + behavior: 'allow' as const, + updatedInput: { answer: 'from-chat-b' }, + })); + + const sessionManager = new SessionManager( + store, + { + workingDir: join(dataDir, 'working'), + allowedTools: [], + conversationMode: 'per-chat', + }, + new Set(), + new Map(), + ); + + const runA = sessionManager.runSession('message-a', { + convKey: 'slack:C123', + canUseTool: canUseToolA, + }); + const runB = sessionManager.runSession('message-b', { + convKey: 'discord:C456', + canUseTool: canUseToolB, + }); + + await bothSendsStarted.promise; + allowCallbackDispatch.resolve(); + await Promise.all([runA, runB]); + + expect(canUseToolA).toHaveBeenCalledTimes(1); + expect(canUseToolB).toHaveBeenCalledTimes(1); + expect(callbackResults).toEqual([ + { sessionName: 'chat-a', answer: 'from-chat-a' }, + { sessionName: 'chat-b', answer: 'from-chat-b' }, + ]); + }); + + it('treats dedicated heartbeat sends as a keyed lock target', async () => { + vi.useFakeTimers(); + try { + const bot = new LettaBot({ + workingDir: join(dataDir, 'working'), + allowedTools: [], + heartbeatConversation: 'dedicated', + }); + const botInternal = bot as any; + + const acquiredFirst = await botInternal.acquireLock('heartbeat'); + let secondResolved = false; + const secondAcquire = botInternal.acquireLock('heartbeat').then((value: boolean) => { + secondResolved = true; + return value; + }); + + await Promise.resolve(); + + expect(acquiredFirst).toBe(true); + expect(botInternal.processingKeys.has('heartbeat')).toBe(true); + expect(secondResolved).toBe(false); + + botInternal.releaseLock('heartbeat', acquiredFirst); + await vi.advanceTimersByTimeAsync(1000); + + expect(await secondAcquire).toBe(true); + botInternal.releaseLock('heartbeat', true); + } finally { + vi.useRealTimers(); + } + }); + it('restarts a keyed queue after non-shared lock release when backlog exists', async () => { const bot = new LettaBot({ workingDir: join(dataDir, 'working'), diff --git a/src/core/session-manager.ts b/src/core/session-manager.ts index 9afe6ca..2c04d1f 100644 --- a/src/core/session-manager.ts +++ b/src/core/session-manager.ts @@ -35,18 +35,21 @@ export class SessionManager { private sessionCreationLocks: Map; generation: number }> = new Map(); private sessionGenerations: Map = new Map(); - // Per-message tool callback. Updated before each send() so the Session - // options (which hold a stable wrapper) route to the current handler. - private currentCanUseTool: CanUseToolCallback | undefined; + // Per-key tool callbacks. Updated before each send() so the Session + // options (which hold a stable per-key wrapper) route to the current handler. + // Keyed by convKey so concurrent sessions on different keys don't clobber + // each other's callback (e.g. 'discord' and 'heartbeat' running in parallel). + private currentCanUseToolByKey = new Map(); - // Stable callback wrapper so the Session options never change, but we can - // swap out the per-message handler before each send(). - private readonly sessionCanUseTool: CanUseToolCallback = async (toolName, toolInput) => { - if (this.currentCanUseTool) { - return this.currentCanUseTool(toolName, toolInput); - } - return { behavior: 'allow' as const }; - }; + // Returns a stable per-key wrapper so Session options never change, while + // still allowing the per-message handler to be swapped before each send(). + private createSessionCanUseTool(key: string): CanUseToolCallback { + return async (toolName, toolInput) => { + const handler = this.currentCanUseToolByKey.get(key); + if (handler) return handler(toolName, toolInput); + return { behavior: 'allow' as const }; + }; + } constructor( store: Store, @@ -220,7 +223,7 @@ export class SessionManager { // changes made by other processes (e.g. after a restart or container deploy). this.store.refresh(); - const opts = this.baseSessionOptions(this.sessionCanUseTool); + const opts = this.baseSessionOptions(this.createSessionCanUseTool(key)); let session: Session; let sessionAgentId: string | undefined; @@ -412,6 +415,7 @@ export class SessionManager { this.sessionLastUsed.delete(key); } this.lastResultRunFingerprints.delete(key); + this.currentCanUseToolByKey.delete(key); } else { const keys = new Set([ ...this.sessions.keys(), @@ -430,6 +434,7 @@ export class SessionManager { this.sessionCreationLocks.clear(); this.sessionLastUsed.clear(); this.lastResultRunFingerprints.clear(); + this.currentCanUseToolByKey.clear(); } } @@ -497,8 +502,8 @@ export class SessionManager { ): Promise<{ session: Session; stream: () => AsyncGenerator }> { const { retried = false, canUseTool, convKey = 'shared' } = options; - // Update the per-message callback before sending - this.currentCanUseTool = canUseTool; + // Update the per-key callback before sending + this.currentCanUseToolByKey.set(convKey, canUseTool); let session = await this.ensureSessionForKey(convKey);