From b79d705a99dd92ad21cde95c334b8c122f06da38 Mon Sep 17 00:00:00 2001 From: Cameron Date: Thu, 12 Feb 2026 11:24:58 -0800 Subject: [PATCH] perf: reuse SDK session subprocess across messages (#289) --- src/core/bot.ts | 162 ++++++++++++++++++++++++++++++++++-------------- src/main.ts | 3 + 2 files changed, 117 insertions(+), 48 deletions(-) diff --git a/src/core/bot.ts b/src/core/bot.ts index 32d6a8b..881559b 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -130,6 +130,18 @@ export class LettaBot implements AgentSession { // AskUserQuestion support: resolves when the next user message arrives private pendingQuestionResolver: ((text: string) => void) | null = null; + + // Persistent session: reuse a single CLI subprocess across messages + private persistentSession: Session | null = null; + private currentCanUseTool: CanUseToolCallback | undefined; + // Stable callback wrapper so the Session options never change, but we can + // swap out the per-message handler before each send(). + private readonly sessionCanUseTool: CanUseToolCallback = async (toolName, toolInput) => { + if (this.currentCanUseTool) { + return this.currentCanUseTool(toolName, toolInput); + } + return { behavior: 'allow' as const }; + }; constructor(config: BotConfig) { this.config = config; @@ -292,51 +304,79 @@ export class LettaBot implements AgentSession { } /** - * Create or resume a session with automatic fallback. - * - * Priority: conversationId → agentId (default conv) → createAgent - * If resume fails (conversation missing), falls back to createSession. + * Return the persistent session, creating and initializing it if needed. + * The subprocess stays alive across messages -- only recreated on failure. */ - private async getSession(canUseTool?: CanUseToolCallback): Promise { - const opts = this.baseSessionOptions(canUseTool); + private async ensureSession(): Promise { + if (this.persistentSession) { + return this.persistentSession; + } + + const opts = this.baseSessionOptions(this.sessionCanUseTool); + let session: Session; if (this.store.conversationId) { process.env.LETTA_AGENT_ID = this.store.agentId || undefined; - return resumeSession(this.store.conversationId, opts); - } - if (this.store.agentId) { + session = resumeSession(this.store.conversationId, opts); + } else if (this.store.agentId) { process.env.LETTA_AGENT_ID = this.store.agentId; - // Create a new conversation instead of resuming the default. - // This handles the case where the default conversation was deleted - // or never created (e.g., after migrations). - return createSession(this.store.agentId, opts); + session = createSession(this.store.agentId, opts); + } else { + // Create new agent -- persist immediately so we don't orphan it on later failures + console.log('[Bot] Creating new agent'); + const newAgentId = await createAgent({ + systemPrompt: SYSTEM_PROMPT, + memory: loadMemoryBlocks(this.config.agentName), + }); + const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com'; + this.store.setAgent(newAgentId, currentBaseUrl); + console.log('[Bot] Saved new agent ID:', newAgentId); + + if (this.config.agentName) { + updateAgentName(newAgentId, this.config.agentName).catch(() => {}); + } + installSkillsToAgent(newAgentId, this.config.skills); + + session = createSession(newAgentId, opts); } - // Create new agent -- persist immediately so we don't orphan it on later failures - console.log('[Bot] Creating new agent'); - const newAgentId = await createAgent({ - systemPrompt: SYSTEM_PROMPT, - memory: loadMemoryBlocks(this.config.agentName), - }); - const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com'; - this.store.setAgent(newAgentId, currentBaseUrl); - console.log('[Bot] Saved new agent ID:', newAgentId); + // Initialize eagerly so the subprocess is ready before the first send() + console.log('[Bot] Initializing session subprocess...'); + await session.initialize(); + console.log('[Bot] Session subprocess ready'); + this.persistentSession = session; + return session; + } - // First-run setup: name and skills - if (this.config.agentName) { - updateAgentName(newAgentId, this.config.agentName).catch(() => {}); + /** + * Destroy the persistent session so the next ensureSession() spawns a fresh one. + */ + private invalidateSession(): void { + if (this.persistentSession) { + console.log('[Bot] Invalidating persistent session'); + this.persistentSession.close(); + this.persistentSession = null; } - installSkillsToAgent(newAgentId, this.config.skills); + } - return createSession(newAgentId, opts); + /** + * Pre-warm the session subprocess at startup. Call after config/agent is loaded. + */ + async warmSession(): Promise { + if (!this.store.agentId && !this.store.conversationId) return; + try { + await this.ensureSession(); + } catch (err) { + console.warn('[Bot] Session pre-warm failed:', err instanceof Error ? err.message : err); + } } /** * Persist conversation ID after a successful session result. - * Agent ID and first-run setup are handled eagerly in getSession(). + * Agent ID and first-run setup are handled eagerly in ensureSession(). */ private persistSessionState(session: Session): void { - // Agent ID already persisted in getSession() on creation. + // Agent ID already persisted in ensureSession() on creation. // Here we only update if the server returned a different one (shouldn't happen). if (session.agentId && session.agentId !== this.store.agentId) { const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com'; @@ -352,13 +392,11 @@ export class LettaBot implements AgentSession { * Send a message and return a deduplicated stream. * * Handles: - * - Session creation with fallback chain + * - Persistent session reuse (subprocess stays alive across messages) * - CONFLICT recovery from orphaned approvals (retry once) * - Conversation-not-found fallback (create new conversation) * - Tool call deduplication * - Session persistence after result - * - * Caller is responsible for consuming the stream and closing the session. */ private async runSession( message: SendMessage, @@ -366,7 +404,10 @@ export class LettaBot implements AgentSession { ): Promise<{ session: Session; stream: () => AsyncGenerator }> { const { retried = false, canUseTool } = options; - let session = await this.getSession(canUseTool); + // Update the per-message callback before sending + this.currentCanUseTool = canUseTool; + + let session = await this.ensureSession(); // Send message with fallback chain try { @@ -375,7 +416,7 @@ export class LettaBot implements AgentSession { // 409 CONFLICT from orphaned approval if (!retried && isApprovalConflictError(error) && this.store.agentId && this.store.conversationId) { console.log('[Bot] CONFLICT detected - attempting orphaned approval recovery...'); - session.close(); + this.invalidateSession(); const result = await recoverOrphanedConversationApproval( this.store.agentId, this.store.conversationId @@ -393,10 +434,12 @@ export class LettaBot implements AgentSession { // on auth, network, or protocol errors (which would just fail again). if (this.store.agentId && isConversationMissingError(error)) { console.warn('[Bot] Conversation not found, creating a new conversation...'); - session.close(); - session = createSession(this.store.agentId, this.baseSessionOptions(canUseTool)); + this.invalidateSession(); + session = await this.ensureSession(); await session.send(message); } else { + // Unknown error -- invalidate so we get a fresh subprocess next time + this.invalidateSession(); throw error; } } @@ -513,6 +556,7 @@ export class LettaBot implements AgentSession { const oldConversationId = this.store.conversationId; this.store.conversationId = null; this.store.resetRecoveryAttempts(); + this.invalidateSession(); // Subprocess has old conversation baked in console.log(`[Command] /reset - conversation cleared (was: ${oldConversationId})`); return 'Conversation reset. Send a message to start a new conversation. (Agent memory is preserved.)'; } @@ -672,6 +716,11 @@ export class LettaBot implements AgentSession { private async processMessage(msg: InboundMessage, adapter: ChannelAdapter, retried = false): Promise { // Track timing and last target + const debugTiming = !!process.env.LETTABOT_DEBUG_TIMING; + const t0 = debugTiming ? performance.now() : 0; + const lap = (label: string) => { + if (debugTiming) console.log(`[Timing] ${label}: ${(performance.now() - t0).toFixed(0)}ms`); + }; this.lastUserMessageTime = new Date(); // Skip heartbeat target update for listening mode (don't redirect heartbeats) @@ -684,13 +733,20 @@ export class LettaBot implements AgentSession { }; } - // Skip typing indicator for listening mode + // Fire-and-forget typing indicator so session creation starts immediately if (!msg.isListeningMode) { - await adapter.sendTypingIndicator(msg.chatId); + adapter.sendTypingIndicator(msg.chatId).catch(() => {}); } + lap('typing indicator'); // Pre-send approval recovery - const recovery = await this.attemptRecovery(); + // Only run proactive recovery when previous failures were detected. + // Clean-path messages skip straight to session creation (the 409 retry + // in runSession() still catches stuck states reactively). + const recovery = this.store.recoveryAttempts > 0 + ? await this.attemptRecovery() + : { recovered: false, shouldReset: false }; + lap('recovery check'); if (recovery.shouldReset) { if (!msg.isListeningMode) { await adapter.sendMessage({ @@ -714,6 +770,7 @@ export class LettaBot implements AgentSession { ? formatGroupBatchEnvelope(msg.batchedMessages, {}, msg.isListeningMode) : formatMessageEnvelope(msg, {}, sessionContext); const messageToSend = await buildMultimodalMessage(formattedText, msg); + lap('format message'); // Build AskUserQuestion-aware canUseTool callback with channel context. // In bypassPermissions mode, this callback is only invoked for interactive @@ -754,11 +811,12 @@ export class LettaBot implements AgentSession { let session: Session | null = null; try { const run = await this.runSession(messageToSend, { retried, canUseTool }); + lap('session send'); session = run.session; // Stream response with delivery let response = ''; - let lastUpdate = Date.now(); + let lastUpdate = 0; // Start at 0 so the first streaming edit fires immediately let messageId: string | null = null; let lastMsgType: string | null = null; let lastAssistantUuid: string | null = null; @@ -807,7 +865,9 @@ export class LettaBot implements AgentSession { }, 4000); try { + let firstChunkLogged = false; for await (const streamMsg of run.stream()) { + if (!firstChunkLogged) { lap('first stream chunk'); firstChunkLogged = true; } receivedAnyData = true; msgTypeCounts[streamMsg.type] = (msgTypeCounts[streamMsg.type] || 0) + 1; @@ -930,7 +990,7 @@ export class LettaBot implements AgentSession { if (!retried && this.store.agentId && this.store.conversationId) { const reason = shouldRetryForErrorResult ? 'error result' : 'empty result'; console.log(`[Bot] ${reason} - attempting orphaned approval recovery...`); - session.close(); + this.invalidateSession(); session = null; clearInterval(typingInterval); const convResult = await recoverOrphanedConversationApproval( @@ -965,6 +1025,7 @@ export class LettaBot implements AgentSession { clearInterval(typingInterval); adapter.stopTypingIndicator?.(msg.chatId)?.catch(() => {}); } + lap('stream complete'); // Handle no-reply marker if (response.trim() === '') { @@ -992,6 +1053,7 @@ export class LettaBot implements AgentSession { return; } + lap('directives done'); // Send final response if (response.trim()) { const prefixedFinal = this.prefixResponse(response); @@ -1015,6 +1077,7 @@ export class LettaBot implements AgentSession { } } + lap('message delivered'); // Handle no response if (!sentAnyMessage) { if (!receivedAnyData) { @@ -1051,7 +1114,7 @@ export class LettaBot implements AgentSession { console.error('[Bot] Failed to send error message to channel:', sendError); } } finally { - session?.close(); + // Session stays alive for reuse -- only invalidated on errors } } @@ -1071,7 +1134,7 @@ export class LettaBot implements AgentSession { this.processing = true; try { - const { session, stream } = await this.runSession(text); + const { stream } = await this.runSession(text); try { let response = ''; @@ -1092,8 +1155,10 @@ export class LettaBot implements AgentSession { } } return response; - } finally { - session.close(); + } catch (error) { + // Invalidate on stream errors so next call gets a fresh subprocess + this.invalidateSession(); + throw error; } } finally { this.processing = false; @@ -1117,12 +1182,13 @@ export class LettaBot implements AgentSession { this.processing = true; try { - const { session, stream } = await this.runSession(text); + const { stream } = await this.runSession(text); try { yield* stream(); - } finally { - session.close(); + } catch (error) { + this.invalidateSession(); + throw error; } } finally { this.processing = false; diff --git a/src/main.ts b/src/main.ts index e08ace9..c233013 100644 --- a/src/main.ts +++ b/src/main.ts @@ -585,6 +585,9 @@ async function main() { services.groupBatchers.push(batcher); } + // Pre-warm the SDK session subprocess so the first message doesn't pay startup cost + bot.warmSession().catch(() => {}); + // Per-agent cron if (agentConfig.features?.cron ?? globalConfig.cronEnabled) { const cronService = new CronService(bot);