diff --git a/src/core/bot.ts b/src/core/bot.ts index e932869..c29fdef 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -9,13 +9,29 @@ import { mkdirSync } from 'node:fs'; import type { ChannelAdapter } from '../channels/types.js'; import type { BotConfig, InboundMessage, TriggerContext } from './types.js'; import { Store } from './store.js'; -import { updateAgentName, getPendingApprovals, rejectApproval, cancelRuns, disableAllToolApprovals } from '../tools/letta-api.js'; +import { updateAgentName, getPendingApprovals, rejectApproval, cancelRuns, disableAllToolApprovals, recoverOrphanedConversationApproval } from '../tools/letta-api.js'; import { installSkillsToAgent } from '../skills/loader.js'; import { formatMessageEnvelope, type SessionContextOptions } from './formatter.js'; import { loadMemoryBlocks } from './memory.js'; import { SYSTEM_PROMPT } from './system-prompt.js'; import { StreamWatchdog } from './stream-watchdog.js'; +/** + * Detect if an error is a 409 CONFLICT from an orphaned approval. + * The error may come as a ConflictError from the Letta client (status 409) + * or as an error message string through the CLI transport. + */ +function isApprovalConflictError(error: unknown): boolean { + if (error instanceof Error) { + const msg = error.message.toLowerCase(); + if (msg.includes('waiting for approval')) return true; + if (msg.includes('conflict') && msg.includes('approval')) return true; + } + const statusError = error as { status?: number }; + if (statusError?.status === 409) return true; + return false; +} + export class LettaBot { private store: Store; private config: BotConfig; @@ -237,7 +253,7 @@ export class LettaBot { /** * Process a single message */ - private async processMessage(msg: InboundMessage, adapter: ChannelAdapter): Promise { + private async processMessage(msg: InboundMessage, adapter: ChannelAdapter, retried = false): Promise { console.log('[Bot] Starting processMessage'); // Track when user last sent a message (for heartbeat skip logic) this.lastUserMessageTime = new Date(); @@ -373,6 +389,20 @@ export class LettaBot { try { await withTimeout(session.send(formattedMessage), 'Session send'); } catch (sendError) { + // Check for 409 CONFLICT from orphaned approval_request_message + if (!retried && isApprovalConflictError(sendError) && this.store.agentId && this.store.conversationId) { + console.log('[Bot] CONFLICT detected - attempting orphaned approval recovery...'); + session.close(); + const result = await recoverOrphanedConversationApproval( + this.store.agentId, + this.store.conversationId + ); + if (result.recovered) { + console.log(`[Bot] Recovery succeeded (${result.details}), retrying message...`); + return this.processMessage(msg, adapter, /* retried */ true); + } + console.error(`[Bot] Orphaned approval recovery failed: ${result.details}`); + } console.error('[Bot] Error sending message:', sendError); throw sendError; } @@ -638,7 +668,8 @@ export class LettaBot { private async _sendToAgentInternal( text: string, - _context?: TriggerContext + _context?: TriggerContext, + retried = false ): Promise { // Base options for sessions (systemPrompt/memory set via createAgent for new agents) const baseOptions = { @@ -679,6 +710,21 @@ export class LettaBot { try { await session.send(text); } catch (error) { + // Check for 409 CONFLICT from orphaned approval_request_message + if (!retried && isApprovalConflictError(error) && this.store.agentId && this.store.conversationId) { + console.log('[Bot] CONFLICT in sendToAgent - attempting orphaned approval recovery...'); + session.close(); + const result = await recoverOrphanedConversationApproval( + this.store.agentId, + this.store.conversationId + ); + if (result.recovered) { + console.log(`[Bot] Recovery succeeded (${result.details}), retrying sendToAgent...`); + return this._sendToAgentInternal(text, _context, /* retried */ true); + } + console.error(`[Bot] Orphaned approval recovery failed: ${result.details}`); + throw error; + } if (usedSpecificConversation && this.store.agentId) { console.warn('[Bot] Conversation missing, creating a new conversation...'); session.close(); diff --git a/src/tools/letta-api.ts b/src/tools/letta-api.ts index 0fa6b99..24a7f4f 100644 --- a/src/tools/letta-api.ts +++ b/src/tools/letta-api.ts @@ -509,6 +509,134 @@ export async function ensureNoToolApprovals(agentId: string): Promise { * Disable approval requirement for ALL tools on an agent. * Useful for ensuring a headless deployment doesn't get stuck. */ +/** + * Recover from orphaned approval_request_messages by directly inspecting the conversation. + * + * Unlike getPendingApprovals() which relies on agent.pending_approval or run stop_reason, + * this function looks at the actual conversation messages to find unresolved approval requests + * from terminated (failed/cancelled) runs. + * + * Returns { recovered: true } if orphaned approvals were found and resolved. + */ +export async function recoverOrphanedConversationApproval( + agentId: string, + conversationId: string +): Promise<{ recovered: boolean; details: string }> { + try { + const client = getClient(); + + // List recent messages from the conversation to find orphaned approvals + const messagesPage = await client.conversations.messages.list(conversationId, { limit: 50 }); + const messages: Array> = []; + for await (const msg of messagesPage) { + messages.push(msg as unknown as Record); + } + + if (messages.length === 0) { + return { recovered: false, details: 'No messages in conversation' }; + } + + // Build set of tool_call_ids that already have approval responses + const resolvedToolCalls = new Set(); + for (const msg of messages) { + if (msg.message_type === 'approval_response_message') { + const approvals = (msg.approvals as Array<{ tool_call_id?: string }>) || []; + for (const a of approvals) { + if (a.tool_call_id) resolvedToolCalls.add(a.tool_call_id); + } + } + } + + // Find unresolved approval_request_messages + interface UnresolvedApproval { + toolCallId: string; + toolName: string; + runId: string; + } + const unresolvedByRun = new Map(); + + for (const msg of messages) { + if (msg.message_type !== 'approval_request_message') continue; + + const toolCalls = (msg.tool_calls as Array<{ tool_call_id: string; name: string }>) + || (msg.tool_call ? [msg.tool_call as { tool_call_id: string; name: string }] : []); + const runId = msg.run_id as string | undefined; + + for (const tc of toolCalls) { + if (!tc.tool_call_id || resolvedToolCalls.has(tc.tool_call_id)) continue; + + const key = runId || 'unknown'; + if (!unresolvedByRun.has(key)) unresolvedByRun.set(key, []); + unresolvedByRun.get(key)!.push({ + toolCallId: tc.tool_call_id, + toolName: tc.name || 'unknown', + runId: key, + }); + } + } + + if (unresolvedByRun.size === 0) { + return { recovered: false, details: 'No unresolved approval requests found' }; + } + + // Check each run's status - only resolve orphaned approvals from terminated runs + let recoveredCount = 0; + const details: string[] = []; + + for (const [runId, approvals] of unresolvedByRun) { + if (runId === 'unknown') { + // No run_id on the approval message - can't verify, skip + details.push(`Skipped ${approvals.length} approval(s) with no run_id`); + continue; + } + + try { + const run = await client.runs.retrieve(runId); + const status = run.status; + + if (status === 'failed' || status === 'cancelled') { + console.log(`[Letta API] Found ${approvals.length} orphaned approval(s) from ${status} run ${runId}`); + + // Send denial for each unresolved tool call + const approvalResponses = approvals.map(a => ({ + approve: false as const, + tool_call_id: a.toolCallId, + type: 'approval' as const, + reason: `Auto-denied: originating run was ${status}`, + })); + + await client.conversations.messages.create(conversationId, { + messages: [{ + type: 'approval', + approvals: approvalResponses, + }], + streaming: false, + }); + + recoveredCount += approvals.length; + details.push(`Denied ${approvals.length} approval(s) from ${status} run ${runId}`); + } else { + details.push(`Run ${runId} is ${status} - not orphaned`); + } + } catch (runError) { + console.warn(`[Letta API] Failed to check run ${runId}:`, runError); + details.push(`Failed to check run ${runId}`); + } + } + + const detailStr = details.join('; '); + if (recoveredCount > 0) { + console.log(`[Letta API] Recovered ${recoveredCount} orphaned approval(s): ${detailStr}`); + return { recovered: true, details: detailStr }; + } + + return { recovered: false, details: detailStr }; + } catch (e) { + console.error('[Letta API] Failed to recover orphaned conversation approval:', e); + return { recovered: false, details: `Error: ${e instanceof Error ? e.message : String(e)}` }; + } +} + export async function disableAllToolApprovals(agentId: string): Promise { try { const tools = await getAgentTools(agentId);