fix: serialize all sendToAgent calls and use per-key canUseTool callbacks (#501)
This commit is contained in:
@@ -1527,13 +1527,12 @@ export class LettaBot implements AgentSession {
|
||||
|
||||
/**
|
||||
* Acquire the appropriate lock for a conversation key.
|
||||
* In per-channel mode with a dedicated key, no lock needed (parallel OK).
|
||||
* In per-channel mode with a channel key, wait for that key's queue.
|
||||
* In per-channel mode, wait for that key's queue to drain before proceeding.
|
||||
* In shared mode, use the global processing flag.
|
||||
* All keys — including 'heartbeat' — are serialized to prevent concurrent
|
||||
* sends on the same Session object, which the SDK does not support.
|
||||
*/
|
||||
private async acquireLock(convKey: string): Promise<boolean> {
|
||||
if (convKey === 'heartbeat') return false; // No lock needed
|
||||
|
||||
if (convKey !== 'shared') {
|
||||
while (this.processingKeys.has(convKey)) {
|
||||
await new Promise(resolve => setTimeout(resolve, 1000));
|
||||
|
||||
@@ -38,6 +38,8 @@ vi.mock('./system-prompt.js', () => ({
|
||||
import { createAgent, createSession, resumeSession } from '@letta-ai/letta-code-sdk';
|
||||
import { getLatestRunError, recoverOrphanedConversationApproval } from '../tools/letta-api.js';
|
||||
import { LettaBot } from './bot.js';
|
||||
import { SessionManager } from './session-manager.js';
|
||||
import { Store } from './store.js';
|
||||
|
||||
function deferred<T>() {
|
||||
let resolve!: (value: T | PromiseLike<T>) => void;
|
||||
@@ -487,6 +489,130 @@ describe('SDK session contract', () => {
|
||||
expect(opts).not.toHaveProperty('memfs');
|
||||
});
|
||||
|
||||
it('keeps canUseTool callbacks isolated for concurrent keyed sessions', async () => {
|
||||
const store = new Store(undefined, 'LettaBot');
|
||||
store.setAgent('agent-contract-test', 'https://api.letta.com');
|
||||
|
||||
const allowCallbackDispatch = deferred<void>();
|
||||
const bothSendsStarted = deferred<void>();
|
||||
const callbackResults: Array<{ sessionName: string; answer: string | undefined }> = [];
|
||||
let createdSessions = 0;
|
||||
let startedSends = 0;
|
||||
|
||||
vi.mocked(resumeSession).mockImplementation((_id, opts) => {
|
||||
const sessionName = createdSessions++ === 0 ? 'chat-a' : 'chat-b';
|
||||
return {
|
||||
initialize: vi.fn(async () => undefined),
|
||||
bootstrapState: vi.fn(async () => ({ hasPendingApproval: false })),
|
||||
send: vi.fn(async (_message: unknown) => {
|
||||
startedSends += 1;
|
||||
if (startedSends === 2) {
|
||||
bothSendsStarted.resolve();
|
||||
}
|
||||
await bothSendsStarted.promise;
|
||||
await allowCallbackDispatch.promise;
|
||||
|
||||
const canUseTool = opts?.canUseTool;
|
||||
if (!canUseTool) {
|
||||
throw new Error('Expected mocked session options to include canUseTool');
|
||||
}
|
||||
|
||||
const result = await canUseTool('AskUserQuestion', { sessionName });
|
||||
const updatedInput = 'updatedInput' in result
|
||||
? result.updatedInput as Record<string, unknown> | undefined
|
||||
: undefined;
|
||||
callbackResults.push({
|
||||
sessionName,
|
||||
answer: typeof updatedInput?.answer === 'string'
|
||||
? updatedInput.answer
|
||||
: undefined,
|
||||
});
|
||||
}),
|
||||
stream: vi.fn(() =>
|
||||
(async function* () {
|
||||
yield { type: 'result', success: true };
|
||||
})()
|
||||
),
|
||||
close: vi.fn(() => undefined),
|
||||
agentId: 'agent-contract-test',
|
||||
conversationId: `${sessionName}-conversation`,
|
||||
} as never;
|
||||
});
|
||||
|
||||
const canUseToolA = vi.fn(async () => ({
|
||||
behavior: 'allow' as const,
|
||||
updatedInput: { answer: 'from-chat-a' },
|
||||
}));
|
||||
const canUseToolB = vi.fn(async () => ({
|
||||
behavior: 'allow' as const,
|
||||
updatedInput: { answer: 'from-chat-b' },
|
||||
}));
|
||||
|
||||
const sessionManager = new SessionManager(
|
||||
store,
|
||||
{
|
||||
workingDir: join(dataDir, 'working'),
|
||||
allowedTools: [],
|
||||
conversationMode: 'per-chat',
|
||||
},
|
||||
new Set<string>(),
|
||||
new Map<string, string>(),
|
||||
);
|
||||
|
||||
const runA = sessionManager.runSession('message-a', {
|
||||
convKey: 'slack:C123',
|
||||
canUseTool: canUseToolA,
|
||||
});
|
||||
const runB = sessionManager.runSession('message-b', {
|
||||
convKey: 'discord:C456',
|
||||
canUseTool: canUseToolB,
|
||||
});
|
||||
|
||||
await bothSendsStarted.promise;
|
||||
allowCallbackDispatch.resolve();
|
||||
await Promise.all([runA, runB]);
|
||||
|
||||
expect(canUseToolA).toHaveBeenCalledTimes(1);
|
||||
expect(canUseToolB).toHaveBeenCalledTimes(1);
|
||||
expect(callbackResults).toEqual([
|
||||
{ sessionName: 'chat-a', answer: 'from-chat-a' },
|
||||
{ sessionName: 'chat-b', answer: 'from-chat-b' },
|
||||
]);
|
||||
});
|
||||
|
||||
it('treats dedicated heartbeat sends as a keyed lock target', async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const bot = new LettaBot({
|
||||
workingDir: join(dataDir, 'working'),
|
||||
allowedTools: [],
|
||||
heartbeatConversation: 'dedicated',
|
||||
});
|
||||
const botInternal = bot as any;
|
||||
|
||||
const acquiredFirst = await botInternal.acquireLock('heartbeat');
|
||||
let secondResolved = false;
|
||||
const secondAcquire = botInternal.acquireLock('heartbeat').then((value: boolean) => {
|
||||
secondResolved = true;
|
||||
return value;
|
||||
});
|
||||
|
||||
await Promise.resolve();
|
||||
|
||||
expect(acquiredFirst).toBe(true);
|
||||
expect(botInternal.processingKeys.has('heartbeat')).toBe(true);
|
||||
expect(secondResolved).toBe(false);
|
||||
|
||||
botInternal.releaseLock('heartbeat', acquiredFirst);
|
||||
await vi.advanceTimersByTimeAsync(1000);
|
||||
|
||||
expect(await secondAcquire).toBe(true);
|
||||
botInternal.releaseLock('heartbeat', true);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it('restarts a keyed queue after non-shared lock release when backlog exists', async () => {
|
||||
const bot = new LettaBot({
|
||||
workingDir: join(dataDir, 'working'),
|
||||
|
||||
@@ -35,18 +35,21 @@ export class SessionManager {
|
||||
private sessionCreationLocks: Map<string, { promise: Promise<Session>; generation: number }> = new Map();
|
||||
private sessionGenerations: Map<string, number> = new Map();
|
||||
|
||||
// Per-message tool callback. Updated before each send() so the Session
|
||||
// options (which hold a stable wrapper) route to the current handler.
|
||||
private currentCanUseTool: CanUseToolCallback | undefined;
|
||||
// Per-key tool callbacks. Updated before each send() so the Session
|
||||
// options (which hold a stable per-key wrapper) route to the current handler.
|
||||
// Keyed by convKey so concurrent sessions on different keys don't clobber
|
||||
// each other's callback (e.g. 'discord' and 'heartbeat' running in parallel).
|
||||
private currentCanUseToolByKey = new Map<string, CanUseToolCallback | undefined>();
|
||||
|
||||
// Stable callback wrapper so the Session options never change, but we can
|
||||
// swap out the per-message handler before each send().
|
||||
private readonly sessionCanUseTool: CanUseToolCallback = async (toolName, toolInput) => {
|
||||
if (this.currentCanUseTool) {
|
||||
return this.currentCanUseTool(toolName, toolInput);
|
||||
}
|
||||
return { behavior: 'allow' as const };
|
||||
};
|
||||
// Returns a stable per-key wrapper so Session options never change, while
|
||||
// still allowing the per-message handler to be swapped before each send().
|
||||
private createSessionCanUseTool(key: string): CanUseToolCallback {
|
||||
return async (toolName, toolInput) => {
|
||||
const handler = this.currentCanUseToolByKey.get(key);
|
||||
if (handler) return handler(toolName, toolInput);
|
||||
return { behavior: 'allow' as const };
|
||||
};
|
||||
}
|
||||
|
||||
constructor(
|
||||
store: Store,
|
||||
@@ -220,7 +223,7 @@ export class SessionManager {
|
||||
// changes made by other processes (e.g. after a restart or container deploy).
|
||||
this.store.refresh();
|
||||
|
||||
const opts = this.baseSessionOptions(this.sessionCanUseTool);
|
||||
const opts = this.baseSessionOptions(this.createSessionCanUseTool(key));
|
||||
let session: Session;
|
||||
let sessionAgentId: string | undefined;
|
||||
|
||||
@@ -412,6 +415,7 @@ export class SessionManager {
|
||||
this.sessionLastUsed.delete(key);
|
||||
}
|
||||
this.lastResultRunFingerprints.delete(key);
|
||||
this.currentCanUseToolByKey.delete(key);
|
||||
} else {
|
||||
const keys = new Set<string>([
|
||||
...this.sessions.keys(),
|
||||
@@ -430,6 +434,7 @@ export class SessionManager {
|
||||
this.sessionCreationLocks.clear();
|
||||
this.sessionLastUsed.clear();
|
||||
this.lastResultRunFingerprints.clear();
|
||||
this.currentCanUseToolByKey.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -497,8 +502,8 @@ export class SessionManager {
|
||||
): Promise<{ session: Session; stream: () => AsyncGenerator<StreamMsg> }> {
|
||||
const { retried = false, canUseTool, convKey = 'shared' } = options;
|
||||
|
||||
// Update the per-message callback before sending
|
||||
this.currentCanUseTool = canUseTool;
|
||||
// Update the per-key callback before sending
|
||||
this.currentCanUseToolByKey.set(convKey, canUseTool);
|
||||
|
||||
let session = await this.ensureSessionForKey(convKey);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user