From 3ff33fee871f628523ed2b7a690c4a77ac7e6f31 Mon Sep 17 00:00:00 2001 From: Cameron Date: Thu, 5 Feb 2026 17:46:44 -0800 Subject: [PATCH] fix: client-side defensive recovery for orphaned approval_request_messages (#182) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a conversation has an orphaned approval_request_message from a cancelled/failed run, every subsequent message fails with 409 CONFLICT. The existing attemptRecovery() can't find these because it checks agent.pending_approval (null) and scans runs with stop_reason=requires_approval (but the orphaned run is cancelled/failed). Adds recoverOrphanedConversationApproval() which directly inspects conversation messages, finds unresolved approval requests, verifies the originating run is terminated, and sends denials to clear the stuck state. Both processMessage() and sendToAgent() now catch CONFLICT errors and retry once after recovery. Fixes #180 Written by Cameron ◯ Letta Code "It is not the strongest of the species that survives, nor the most intelligent, but the one most responsive to change." - Charles Darwin --- src/core/bot.ts | 52 ++++++++++++++++- src/tools/letta-api.ts | 128 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 177 insertions(+), 3 deletions(-) diff --git a/src/core/bot.ts b/src/core/bot.ts index e932869..c29fdef 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -9,13 +9,29 @@ import { mkdirSync } from 'node:fs'; import type { ChannelAdapter } from '../channels/types.js'; import type { BotConfig, InboundMessage, TriggerContext } from './types.js'; import { Store } from './store.js'; -import { updateAgentName, getPendingApprovals, rejectApproval, cancelRuns, disableAllToolApprovals } from '../tools/letta-api.js'; +import { updateAgentName, getPendingApprovals, rejectApproval, cancelRuns, disableAllToolApprovals, recoverOrphanedConversationApproval } from '../tools/letta-api.js'; import { installSkillsToAgent } from '../skills/loader.js'; import { formatMessageEnvelope, type SessionContextOptions } from './formatter.js'; import { loadMemoryBlocks } from './memory.js'; import { SYSTEM_PROMPT } from './system-prompt.js'; import { StreamWatchdog } from './stream-watchdog.js'; +/** + * Detect if an error is a 409 CONFLICT from an orphaned approval. + * The error may come as a ConflictError from the Letta client (status 409) + * or as an error message string through the CLI transport. + */ +function isApprovalConflictError(error: unknown): boolean { + if (error instanceof Error) { + const msg = error.message.toLowerCase(); + if (msg.includes('waiting for approval')) return true; + if (msg.includes('conflict') && msg.includes('approval')) return true; + } + const statusError = error as { status?: number }; + if (statusError?.status === 409) return true; + return false; +} + export class LettaBot { private store: Store; private config: BotConfig; @@ -237,7 +253,7 @@ export class LettaBot { /** * Process a single message */ - private async processMessage(msg: InboundMessage, adapter: ChannelAdapter): Promise { + private async processMessage(msg: InboundMessage, adapter: ChannelAdapter, retried = false): Promise { console.log('[Bot] Starting processMessage'); // Track when user last sent a message (for heartbeat skip logic) this.lastUserMessageTime = new Date(); @@ -373,6 +389,20 @@ export class LettaBot { try { await withTimeout(session.send(formattedMessage), 'Session send'); } catch (sendError) { + // Check for 409 CONFLICT from orphaned approval_request_message + if (!retried && isApprovalConflictError(sendError) && this.store.agentId && this.store.conversationId) { + console.log('[Bot] CONFLICT detected - attempting orphaned approval recovery...'); + session.close(); + const result = await recoverOrphanedConversationApproval( + this.store.agentId, + this.store.conversationId + ); + if (result.recovered) { + console.log(`[Bot] Recovery succeeded (${result.details}), retrying message...`); + return this.processMessage(msg, adapter, /* retried */ true); + } + console.error(`[Bot] Orphaned approval recovery failed: ${result.details}`); + } console.error('[Bot] Error sending message:', sendError); throw sendError; } @@ -638,7 +668,8 @@ export class LettaBot { private async _sendToAgentInternal( text: string, - _context?: TriggerContext + _context?: TriggerContext, + retried = false ): Promise { // Base options for sessions (systemPrompt/memory set via createAgent for new agents) const baseOptions = { @@ -679,6 +710,21 @@ export class LettaBot { try { await session.send(text); } catch (error) { + // Check for 409 CONFLICT from orphaned approval_request_message + if (!retried && isApprovalConflictError(error) && this.store.agentId && this.store.conversationId) { + console.log('[Bot] CONFLICT in sendToAgent - attempting orphaned approval recovery...'); + session.close(); + const result = await recoverOrphanedConversationApproval( + this.store.agentId, + this.store.conversationId + ); + if (result.recovered) { + console.log(`[Bot] Recovery succeeded (${result.details}), retrying sendToAgent...`); + return this._sendToAgentInternal(text, _context, /* retried */ true); + } + console.error(`[Bot] Orphaned approval recovery failed: ${result.details}`); + throw error; + } if (usedSpecificConversation && this.store.agentId) { console.warn('[Bot] Conversation missing, creating a new conversation...'); session.close(); diff --git a/src/tools/letta-api.ts b/src/tools/letta-api.ts index 0fa6b99..24a7f4f 100644 --- a/src/tools/letta-api.ts +++ b/src/tools/letta-api.ts @@ -509,6 +509,134 @@ export async function ensureNoToolApprovals(agentId: string): Promise { * Disable approval requirement for ALL tools on an agent. * Useful for ensuring a headless deployment doesn't get stuck. */ +/** + * Recover from orphaned approval_request_messages by directly inspecting the conversation. + * + * Unlike getPendingApprovals() which relies on agent.pending_approval or run stop_reason, + * this function looks at the actual conversation messages to find unresolved approval requests + * from terminated (failed/cancelled) runs. + * + * Returns { recovered: true } if orphaned approvals were found and resolved. + */ +export async function recoverOrphanedConversationApproval( + agentId: string, + conversationId: string +): Promise<{ recovered: boolean; details: string }> { + try { + const client = getClient(); + + // List recent messages from the conversation to find orphaned approvals + const messagesPage = await client.conversations.messages.list(conversationId, { limit: 50 }); + const messages: Array> = []; + for await (const msg of messagesPage) { + messages.push(msg as unknown as Record); + } + + if (messages.length === 0) { + return { recovered: false, details: 'No messages in conversation' }; + } + + // Build set of tool_call_ids that already have approval responses + const resolvedToolCalls = new Set(); + for (const msg of messages) { + if (msg.message_type === 'approval_response_message') { + const approvals = (msg.approvals as Array<{ tool_call_id?: string }>) || []; + for (const a of approvals) { + if (a.tool_call_id) resolvedToolCalls.add(a.tool_call_id); + } + } + } + + // Find unresolved approval_request_messages + interface UnresolvedApproval { + toolCallId: string; + toolName: string; + runId: string; + } + const unresolvedByRun = new Map(); + + for (const msg of messages) { + if (msg.message_type !== 'approval_request_message') continue; + + const toolCalls = (msg.tool_calls as Array<{ tool_call_id: string; name: string }>) + || (msg.tool_call ? [msg.tool_call as { tool_call_id: string; name: string }] : []); + const runId = msg.run_id as string | undefined; + + for (const tc of toolCalls) { + if (!tc.tool_call_id || resolvedToolCalls.has(tc.tool_call_id)) continue; + + const key = runId || 'unknown'; + if (!unresolvedByRun.has(key)) unresolvedByRun.set(key, []); + unresolvedByRun.get(key)!.push({ + toolCallId: tc.tool_call_id, + toolName: tc.name || 'unknown', + runId: key, + }); + } + } + + if (unresolvedByRun.size === 0) { + return { recovered: false, details: 'No unresolved approval requests found' }; + } + + // Check each run's status - only resolve orphaned approvals from terminated runs + let recoveredCount = 0; + const details: string[] = []; + + for (const [runId, approvals] of unresolvedByRun) { + if (runId === 'unknown') { + // No run_id on the approval message - can't verify, skip + details.push(`Skipped ${approvals.length} approval(s) with no run_id`); + continue; + } + + try { + const run = await client.runs.retrieve(runId); + const status = run.status; + + if (status === 'failed' || status === 'cancelled') { + console.log(`[Letta API] Found ${approvals.length} orphaned approval(s) from ${status} run ${runId}`); + + // Send denial for each unresolved tool call + const approvalResponses = approvals.map(a => ({ + approve: false as const, + tool_call_id: a.toolCallId, + type: 'approval' as const, + reason: `Auto-denied: originating run was ${status}`, + })); + + await client.conversations.messages.create(conversationId, { + messages: [{ + type: 'approval', + approvals: approvalResponses, + }], + streaming: false, + }); + + recoveredCount += approvals.length; + details.push(`Denied ${approvals.length} approval(s) from ${status} run ${runId}`); + } else { + details.push(`Run ${runId} is ${status} - not orphaned`); + } + } catch (runError) { + console.warn(`[Letta API] Failed to check run ${runId}:`, runError); + details.push(`Failed to check run ${runId}`); + } + } + + const detailStr = details.join('; '); + if (recoveredCount > 0) { + console.log(`[Letta API] Recovered ${recoveredCount} orphaned approval(s): ${detailStr}`); + return { recovered: true, details: detailStr }; + } + + return { recovered: false, details: detailStr }; + } catch (e) { + console.error('[Letta API] Failed to recover orphaned conversation approval:', e); + return { recovered: false, details: `Error: ${e instanceof Error ? e.message : String(e)}` }; + } +} + export async function disableAllToolApprovals(agentId: string): Promise { try { const tools = await getAgentTools(agentId);