diff --git a/src/core/bot.ts b/src/core/bot.ts index d9208c3..ebafbf7 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -277,6 +277,9 @@ export class LettaBot implements AgentSession { // In shared mode, only the "shared" key is used. In per-channel mode, each // channel (and optionally heartbeat) gets its own subprocess. private sessions: Map = new Map(); + // Coalesces concurrent ensureSessionForKey calls for the same key so the + // second caller waits for the first instead of creating a duplicate session. + private sessionCreationLocks: Map> = new Map(); private currentCanUseTool: CanUseToolCallback | undefined; private conversationOverrides: Set = new Set(); // Stable callback wrapper so the Session options never change, but we can @@ -805,8 +808,33 @@ export class LettaBot implements AgentSession { /** * Return the persistent session for the given conversation key, * creating and initializing it if needed. + * + * After initialization, calls bootstrapState() to detect pending approvals. + * If an orphaned approval is found, recovers proactively before returning + * the session -- preventing the first send() from hitting a 409 CONFLICT. Callers that find a cached session or an in-flight creation promise for the key return it as-is; bootstrapRetried is only forwarded by the call that actually starts creation. */ - private async ensureSessionForKey(key: string): Promise { + private async ensureSessionForKey(key: string, bootstrapRetried = false): Promise { + // Fast path: session already exists + const existing = this.sessions.get(key); + if (existing) return existing; + + // Coalesce concurrent callers: if another call is already creating this + // key (e.g. warmSession running while first message arrives), wait for + // it instead of creating a duplicate session. 
+ const pending = this.sessionCreationLocks.get(key); + if (pending) return pending; + + const promise = this._createSessionForKey(key, bootstrapRetried); + this.sessionCreationLocks.set(key, promise); + try { + return await promise; + } finally { + this.sessionCreationLocks.delete(key); + } + } + + /** Internal session creation -- called via ensureSessionForKey's lock, and once more by itself (with bootstrapRetried = true) after a pending-approval recovery. */ + private async _createSessionForKey(key: string, bootstrapRetried: boolean): Promise { // Re-read the store file from disk so we pick up agent/conversation ID // changes made by other processes (e.g. after a restart or container deploy). // This costs one synchronous disk read per incoming message, which is fine @@ -814,9 +842,6 @@ export class LettaBot implements AgentSession { // refresh at most once per second. this.store.refresh(); - const existing = this.sessions.get(key); - if (existing) return existing; - const opts = this.baseSessionOptions(this.sessionCanUseTool); let session: Session; @@ -857,13 +882,52 @@ export class LettaBot implements AgentSession { try { await this.withSessionTimeout(session.initialize(), `Session initialize (key=${key})`); log.info(`Session subprocess ready (key=${key})`); - this.sessions.set(key, session); - return session; } catch (error) { // Close immediately so failed initialization cannot leak a subprocess. session.close(); throw error; } + + // Proactive approval detection via bootstrapState(). + // Single CLI round-trip that returns hasPendingApproval flag alongside + // session metadata. If an orphaned approval is stuck, recover now so the + // first send() doesn't hit a 409 CONFLICT. 
+ if (!bootstrapRetried && this.store.agentId) { + try { + const bootstrap = await this.withSessionTimeout( + session.bootstrapState(), + `Session bootstrapState (key=${key})`, + ); + if (bootstrap.hasPendingApproval) { + const convId = bootstrap.conversationId || session.conversationId; + log.warn(`Pending approval detected at session startup (key=${key}, conv=${convId}), recovering...`); + session.close(); + if (convId) { + const result = await recoverOrphanedConversationApproval( + this.store.agentId, + convId, + true, /* deepScan */ + ); + if (result.recovered) { + log.info(`Proactive approval recovery succeeded: ${result.details}`); + } else { + log.warn(`Proactive approval recovery did not find resolvable approvals: ${result.details}`); + } + } + // Recreate session after recovery (conversation state changed). + // Call _createSessionForKey directly (not ensureSessionForKey) since + // we're already inside the creation lock for this key. + return this._createSessionForKey(key, true); + } + } catch (err) { + // bootstrapState failure is non-fatal -- the session is still usable. + // The reactive 409 handler in runSession() will catch stuck approvals. NOTE(review): this catch also covers errors thrown by recoverOrphanedConversationApproval() after session.close() above; in that case the already-closed session still appears to be registered and returned below -- confirm and handle. + log.warn(`bootstrapState check failed (key=${key}), continuing:`, err instanceof Error ? err.message : err); + } + } + + this.sessions.set(key, session); + return session; } /** Legacy convenience: resolve key from shared/per-channel mode and delegate. */ @@ -1374,10 +1438,10 @@ export class LettaBot implements AgentSession { } lap('typing indicator'); - // Pre-send approval recovery - // Only run proactive recovery when previous failures were detected. - // Clean-path messages skip straight to session creation (the 409 retry - // in runSession() still catches stuck states reactively). + // Pre-send approval recovery (secondary defense). + // Primary detection is now in ensureSessionForKey() via bootstrapState(). 
+ // This fallback only fires when previous failures incremented recoveryAttempts, + // covering edge cases where a cached session encounters a new stuck approval. const recovery = this.store.recoveryAttempts > 0 ? await this.attemptRecovery() : { recovered: false, shouldReset: false }; diff --git a/src/tools/letta-api.ts b/src/tools/letta-api.ts index 4b38cf1..704af5d 100644 --- a/src/tools/letta-api.ts +++ b/src/tools/letta-api.ts @@ -714,8 +714,13 @@ export async function recoverOrphanedConversationApproval( // No client is going to approve them -- reject and cancel so // lettabot can proceed. const isStuckApproval = status === 'running' && stopReason === 'requires_approval'; + // Letta Cloud uses status "created" with no stop_reason for runs + // that paused on requires_approval but haven't been resumed yet. + // If we found unresolved approval_request_messages for this run, + // it's stuck -- treat it the same as a running/requires_approval. NOTE(review): the flag below tests only status === 'created'; it relies on the enclosing code having already matched unresolved approvals to this run -- confirm. + const isCreatedWithApproval = status === 'created'; - if (isTerminated || isAbandonedApproval || isStuckApproval) { + if (isTerminated || isAbandonedApproval || isStuckApproval || isCreatedWithApproval) { log.info(`Found ${approvals.length} blocking approval(s) from ${status}/${stopReason} run ${runId}`); // Send denial for each unresolved tool call