fix: client-side defensive recovery for orphaned approval_request_messages (#182)

When a conversation has an orphaned approval_request_message from a
cancelled/failed run, every subsequent message fails with 409 CONFLICT.
The existing attemptRecovery() can't find these because it checks
agent.pending_approval (null) and scans runs with stop_reason=requires_approval
(but the orphaned run is cancelled/failed).

Adds recoverOrphanedConversationApproval() which directly inspects conversation
messages, finds unresolved approval requests, verifies the originating run is
terminated, and sends denials to clear the stuck state. Both processMessage()
and sendToAgent() now catch CONFLICT errors and retry once after recovery.

Fixes #180

Written by Cameron ◯ Letta Code

"It is not the strongest of the species that survives, nor the most intelligent,
but the one most responsive to change." - Charles Darwin
This commit is contained in:
Cameron
2026-02-05 17:46:44 -08:00
committed by GitHub
parent 3b7150013c
commit 3ff33fee87
2 changed files with 177 additions and 3 deletions

View File

@@ -9,13 +9,29 @@ import { mkdirSync } from 'node:fs';
import type { ChannelAdapter } from '../channels/types.js';
import type { BotConfig, InboundMessage, TriggerContext } from './types.js';
import { Store } from './store.js';
import { updateAgentName, getPendingApprovals, rejectApproval, cancelRuns, disableAllToolApprovals } from '../tools/letta-api.js';
import { updateAgentName, getPendingApprovals, rejectApproval, cancelRuns, disableAllToolApprovals, recoverOrphanedConversationApproval } from '../tools/letta-api.js';
import { installSkillsToAgent } from '../skills/loader.js';
import { formatMessageEnvelope, type SessionContextOptions } from './formatter.js';
import { loadMemoryBlocks } from './memory.js';
import { SYSTEM_PROMPT } from './system-prompt.js';
import { StreamWatchdog } from './stream-watchdog.js';
/**
* Detect if an error is a 409 CONFLICT from an orphaned approval.
* The error may come as a ConflictError from the Letta client (status 409)
* or as an error message string through the CLI transport.
*/
function isApprovalConflictError(error: unknown): boolean {
if (error instanceof Error) {
const msg = error.message.toLowerCase();
if (msg.includes('waiting for approval')) return true;
if (msg.includes('conflict') && msg.includes('approval')) return true;
}
const statusError = error as { status?: number };
if (statusError?.status === 409) return true;
return false;
}
export class LettaBot {
private store: Store;
private config: BotConfig;
@@ -237,7 +253,7 @@ export class LettaBot {
/**
* Process a single message
*/
private async processMessage(msg: InboundMessage, adapter: ChannelAdapter): Promise<void> {
private async processMessage(msg: InboundMessage, adapter: ChannelAdapter, retried = false): Promise<void> {
console.log('[Bot] Starting processMessage');
// Track when user last sent a message (for heartbeat skip logic)
this.lastUserMessageTime = new Date();
@@ -373,6 +389,20 @@ export class LettaBot {
try {
await withTimeout(session.send(formattedMessage), 'Session send');
} catch (sendError) {
// Check for 409 CONFLICT from orphaned approval_request_message
if (!retried && isApprovalConflictError(sendError) && this.store.agentId && this.store.conversationId) {
console.log('[Bot] CONFLICT detected - attempting orphaned approval recovery...');
session.close();
const result = await recoverOrphanedConversationApproval(
this.store.agentId,
this.store.conversationId
);
if (result.recovered) {
console.log(`[Bot] Recovery succeeded (${result.details}), retrying message...`);
return this.processMessage(msg, adapter, /* retried */ true);
}
console.error(`[Bot] Orphaned approval recovery failed: ${result.details}`);
}
console.error('[Bot] Error sending message:', sendError);
throw sendError;
}
@@ -638,7 +668,8 @@ export class LettaBot {
private async _sendToAgentInternal(
text: string,
_context?: TriggerContext
_context?: TriggerContext,
retried = false
): Promise<string> {
// Base options for sessions (systemPrompt/memory set via createAgent for new agents)
const baseOptions = {
@@ -679,6 +710,21 @@ export class LettaBot {
try {
await session.send(text);
} catch (error) {
// Check for 409 CONFLICT from orphaned approval_request_message
if (!retried && isApprovalConflictError(error) && this.store.agentId && this.store.conversationId) {
console.log('[Bot] CONFLICT in sendToAgent - attempting orphaned approval recovery...');
session.close();
const result = await recoverOrphanedConversationApproval(
this.store.agentId,
this.store.conversationId
);
if (result.recovered) {
console.log(`[Bot] Recovery succeeded (${result.details}), retrying sendToAgent...`);
return this._sendToAgentInternal(text, _context, /* retried */ true);
}
console.error(`[Bot] Orphaned approval recovery failed: ${result.details}`);
throw error;
}
if (usedSpecificConversation && this.store.agentId) {
console.warn('[Bot] Conversation missing, creating a new conversation...');
session.close();

View File

@@ -509,6 +509,134 @@ export async function ensureNoToolApprovals(agentId: string): Promise<void> {
* Disable approval requirement for ALL tools on an agent.
* Useful for ensuring a headless deployment doesn't get stuck.
*/
/**
* Recover from orphaned approval_request_messages by directly inspecting the conversation.
*
* Unlike getPendingApprovals() which relies on agent.pending_approval or run stop_reason,
* this function looks at the actual conversation messages to find unresolved approval requests
* from terminated (failed/cancelled) runs.
*
* Returns { recovered: true } if orphaned approvals were found and resolved.
*/
export async function recoverOrphanedConversationApproval(
agentId: string,
conversationId: string
): Promise<{ recovered: boolean; details: string }> {
try {
const client = getClient();
// List recent messages from the conversation to find orphaned approvals
const messagesPage = await client.conversations.messages.list(conversationId, { limit: 50 });
const messages: Array<Record<string, unknown>> = [];
for await (const msg of messagesPage) {
messages.push(msg as unknown as Record<string, unknown>);
}
if (messages.length === 0) {
return { recovered: false, details: 'No messages in conversation' };
}
// Build set of tool_call_ids that already have approval responses
const resolvedToolCalls = new Set<string>();
for (const msg of messages) {
if (msg.message_type === 'approval_response_message') {
const approvals = (msg.approvals as Array<{ tool_call_id?: string }>) || [];
for (const a of approvals) {
if (a.tool_call_id) resolvedToolCalls.add(a.tool_call_id);
}
}
}
// Find unresolved approval_request_messages
interface UnresolvedApproval {
toolCallId: string;
toolName: string;
runId: string;
}
const unresolvedByRun = new Map<string, UnresolvedApproval[]>();
for (const msg of messages) {
if (msg.message_type !== 'approval_request_message') continue;
const toolCalls = (msg.tool_calls as Array<{ tool_call_id: string; name: string }>)
|| (msg.tool_call ? [msg.tool_call as { tool_call_id: string; name: string }] : []);
const runId = msg.run_id as string | undefined;
for (const tc of toolCalls) {
if (!tc.tool_call_id || resolvedToolCalls.has(tc.tool_call_id)) continue;
const key = runId || 'unknown';
if (!unresolvedByRun.has(key)) unresolvedByRun.set(key, []);
unresolvedByRun.get(key)!.push({
toolCallId: tc.tool_call_id,
toolName: tc.name || 'unknown',
runId: key,
});
}
}
if (unresolvedByRun.size === 0) {
return { recovered: false, details: 'No unresolved approval requests found' };
}
// Check each run's status - only resolve orphaned approvals from terminated runs
let recoveredCount = 0;
const details: string[] = [];
for (const [runId, approvals] of unresolvedByRun) {
if (runId === 'unknown') {
// No run_id on the approval message - can't verify, skip
details.push(`Skipped ${approvals.length} approval(s) with no run_id`);
continue;
}
try {
const run = await client.runs.retrieve(runId);
const status = run.status;
if (status === 'failed' || status === 'cancelled') {
console.log(`[Letta API] Found ${approvals.length} orphaned approval(s) from ${status} run ${runId}`);
// Send denial for each unresolved tool call
const approvalResponses = approvals.map(a => ({
approve: false as const,
tool_call_id: a.toolCallId,
type: 'approval' as const,
reason: `Auto-denied: originating run was ${status}`,
}));
await client.conversations.messages.create(conversationId, {
messages: [{
type: 'approval',
approvals: approvalResponses,
}],
streaming: false,
});
recoveredCount += approvals.length;
details.push(`Denied ${approvals.length} approval(s) from ${status} run ${runId}`);
} else {
details.push(`Run ${runId} is ${status} - not orphaned`);
}
} catch (runError) {
console.warn(`[Letta API] Failed to check run ${runId}:`, runError);
details.push(`Failed to check run ${runId}`);
}
}
const detailStr = details.join('; ');
if (recoveredCount > 0) {
console.log(`[Letta API] Recovered ${recoveredCount} orphaned approval(s): ${detailStr}`);
return { recovered: true, details: detailStr };
}
return { recovered: false, details: detailStr };
} catch (e) {
console.error('[Letta API] Failed to recover orphaned conversation approval:', e);
return { recovered: false, details: `Error: ${e instanceof Error ? e.message : String(e)}` };
}
}
export async function disableAllToolApprovals(agentId: string): Promise<number> {
try {
const tools = await getAgentTools(agentId);