diff --git a/src/core/bot.ts b/src/core/bot.ts
index 0955937..0e1db40 100644
--- a/src/core/bot.ts
+++ b/src/core/bot.ts
@@ -221,6 +221,50 @@ export class LettaBot implements AgentSession {
     }
   }
 
+  /**
+   * Resolve the timeout applied to session operations, in milliseconds.
+   *
+   * Uses LETTA_SESSION_TIMEOUT_MS when it parses to a finite number greater
+   * than zero, otherwise 60000. The value is capped at 2147483647 because
+   * Node's setTimeout falls back to a 1 ms delay for anything larger.
+   */
+  private getSessionTimeoutMs(): number {
+    const envTimeoutMs = Number(process.env.LETTA_SESSION_TIMEOUT_MS);
+    if (Number.isFinite(envTimeoutMs) && envTimeoutMs > 0) {
+      return Math.min(envTimeoutMs, 2147483647);
+    }
+    return 60000;
+  }
+
+  /**
+   * Await `promise`, rejecting if it has not settled within the session timeout.
+   *
+   * The timer is cleared as soon as the race settles. The wrapped operation
+   * is not cancelled on timeout, so the caller still has to clean up the session.
+   *
+   * @param promise Operation to bound.
+   * @param label Prefix used in the timeout error message.
+   * @returns The value `promise` resolves with.
+   * @throws Error when the timeout elapses first; otherwise the rejection of `promise`.
+   */
+  private async withSessionTimeout<T>(
+    promise: Promise<T>,
+    label: string,
+  ): Promise<T> {
+    const timeoutMs = this.getSessionTimeoutMs();
+    let timeoutId: ReturnType<typeof setTimeout> | undefined;
+    const timeoutPromise = new Promise<never>((_, reject) => {
+      timeoutId = setTimeout(() => {
+        reject(new Error(`${label} timed out after ${timeoutMs}ms`));
+      }, timeoutMs);
+    });
+    try {
+      return await Promise.race([promise, timeoutPromise]);
+    } finally {
+      if (timeoutId !== undefined) clearTimeout(timeoutId);
+    }
+  }
+
   private baseSessionOptions(canUseTool?: CanUseToolCallback) {
     return {
       permissionMode: 'bypassPermissions' as const,
@@ -387,10 +431,16 @@ export class LettaBot implements AgentSession {
 
     // Initialize eagerly so the subprocess is ready before the first send()
     console.log(`[Bot] Initializing session subprocess (key=${key})...`);
-    await session.initialize();
-    console.log(`[Bot] Session subprocess ready (key=${key})`);
-    this.sessions.set(key, session);
-    return session;
+    try {
+      await this.withSessionTimeout(session.initialize(), `Session initialize (key=${key})`);
+      console.log(`[Bot] Session subprocess ready (key=${key})`);
+      this.sessions.set(key, session);
+      return session;
+    } catch (error) {
+      // Close immediately so failed initialization cannot leak a subprocess.
+      session.close();
+      throw error;
+    }
   }
 
   /** Legacy convenience: resolve key from shared/per-channel mode and delegate. */
@@ -489,7 +539,7 @@ export class LettaBot implements AgentSession {
 
     // Send message with fallback chain
     try {
-      await session.send(message);
+      await this.withSessionTimeout(session.send(message), `Session send (key=${convKey})`);
     } catch (error) {
       // 409 CONFLICT from orphaned approval
       if (!retried && isApprovalConflictError(error) && this.store.agentId && convId) {
@@ -519,7 +569,12 @@ export class LettaBot implements AgentSession {
           this.store.conversationId = null;
         }
         session = await this.ensureSessionForKey(convKey);
-        await session.send(message);
+        try {
+          await this.withSessionTimeout(session.send(message), `Session send retry (key=${convKey})`);
+        } catch (retryError) {
+          this.invalidateSession(convKey);
+          throw retryError;
+        }
       } else {
         // Unknown error -- invalidate so we get a fresh subprocess next time
         this.invalidateSession(convKey);
@@ -546,11 +601,13 @@ export class LettaBot implements AgentSession {
           if (id) seenToolCallIds.add(id);
         }
 
-        yield msg;
-
-        // Persist state on result
         if (msg.type === 'result') {
           self.persistSessionState(session, capturedConvKey);
+        }
+
+        yield msg;
+
+        if (msg.type === 'result') {
           break;
         }
       }
diff --git a/src/core/sdk-session-contract.test.ts b/src/core/sdk-session-contract.test.ts
index 7231737..4c949b3 100644
--- a/src/core/sdk-session-contract.test.ts
+++ b/src/core/sdk-session-contract.test.ts
@@ -19,16 +19,19 @@ describe('SDK session contract', () => {
   let originalDataDir: string | undefined;
   let originalAgentId: string | undefined;
   let originalRailwayMount: string | undefined;
+  let originalSessionTimeout: string | undefined;
 
   beforeEach(() => {
     dataDir = mkdtempSync(join(tmpdir(), 'lettabot-sdk-contract-'));
     originalDataDir = process.env.DATA_DIR;
     originalAgentId = process.env.LETTA_AGENT_ID;
     originalRailwayMount = process.env.RAILWAY_VOLUME_MOUNT_PATH;
+    originalSessionTimeout = process.env.LETTA_SESSION_TIMEOUT_MS;
 
     process.env.DATA_DIR = dataDir;
     process.env.LETTA_AGENT_ID = 'agent-contract-test';
     delete process.env.RAILWAY_VOLUME_MOUNT_PATH;
+    delete process.env.LETTA_SESSION_TIMEOUT_MS;
 
     vi.clearAllMocks();
   });
@@ -43,6 +46,9 @@ describe('SDK session contract', () => {
     if (originalRailwayMount === undefined) delete process.env.RAILWAY_VOLUME_MOUNT_PATH;
     else process.env.RAILWAY_VOLUME_MOUNT_PATH = originalRailwayMount;
 
+    if (originalSessionTimeout === undefined) delete process.env.LETTA_SESSION_TIMEOUT_MS;
+    else process.env.LETTA_SESSION_TIMEOUT_MS = originalSessionTimeout;
+
     rmSync(dataDir, { recursive: true, force: true });
   });
 
@@ -80,4 +86,82 @@ describe('SDK session contract', () => {
     expect(mockSession.send).toHaveBeenNthCalledWith(2, 'second message');
     expect(mockSession.stream).toHaveBeenCalledTimes(2);
   });
+
+  it('closes session if initialize times out before first send', async () => {
+    process.env.LETTA_SESSION_TIMEOUT_MS = '5';
+
+    const mockSession = {
+      initialize: vi.fn(() => new Promise(() => {})),
+      send: vi.fn(async (_message: unknown) => undefined),
+      stream: vi.fn(() =>
+        (async function* () {
+          yield { type: 'result', success: true };
+        })()
+      ),
+      close: vi.fn(() => undefined),
+      agentId: 'agent-contract-test',
+      conversationId: 'conversation-contract-test',
+    };
+
+    vi.mocked(createSession).mockReturnValue(mockSession as never);
+    vi.mocked(resumeSession).mockReturnValue(mockSession as never);
+
+    const bot = new LettaBot({
+      workingDir: join(dataDir, 'working'),
+      allowedTools: [],
+    });
+
+    await expect(bot.sendToAgent('will timeout')).rejects.toThrow('Session initialize (key=shared) timed out after 5ms');
+    expect(mockSession.close).toHaveBeenCalledTimes(1);
+  });
+
+  it('invalidates retry session when fallback send fails after conversation-missing error', async () => {
+    const missingConversation = new Error('conversation not found');
+    const retryFailure = new Error('network down');
+
+    const firstSession = {
+      initialize: vi.fn(async () => undefined),
+      send: vi.fn(async () => {
+        throw missingConversation;
+      }),
+      stream: vi.fn(() =>
+        (async function* () {
+          yield { type: 'result', success: true };
+        })()
+      ),
+      close: vi.fn(() => undefined),
+      agentId: 'agent-contract-test',
+      conversationId: 'conversation-contract-test-1',
+    };
+
+    const secondSession = {
+      initialize: vi.fn(async () => undefined),
+      send: vi.fn(async () => {
+        throw retryFailure;
+      }),
+      stream: vi.fn(() =>
+        (async function* () {
+          yield { type: 'result', success: true };
+        })()
+      ),
+      close: vi.fn(() => undefined),
+      agentId: 'agent-contract-test',
+      conversationId: 'conversation-contract-test-2',
+    };
+
+    vi.mocked(createSession)
+      .mockReturnValueOnce(firstSession as never)
+      .mockReturnValueOnce(secondSession as never);
+    vi.mocked(resumeSession).mockReturnValue(firstSession as never);
+
+    const bot = new LettaBot({
+      workingDir: join(dataDir, 'working'),
+      allowedTools: [],
+    });
+
+    await expect(bot.sendToAgent('trigger fallback')).rejects.toThrow('network down');
+    expect(firstSession.close).toHaveBeenCalledTimes(1);
+    expect(secondSession.close).toHaveBeenCalledTimes(1);
+    expect(vi.mocked(createSession)).toHaveBeenCalledTimes(2);
+  });
 });