From c5b9f50767235fc339dc0b027006ff2f2bd53ce1 Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Sun, 2 Nov 2025 22:12:12 -0800 Subject: [PATCH] fix: handle possible case of desync on pending approvals (#53) --- src/agent/check-approval.ts | 89 ++++++++++++++++++++++++++++++++----- src/cli/App.tsx | 53 ++++++++++++++++------ src/headless.ts | 31 ++++++++++++- src/index.ts | 2 +- 4 files changed, 146 insertions(+), 29 deletions(-) diff --git a/src/agent/check-approval.ts b/src/agent/check-approval.ts index ffde657..db9cc82 100644 --- a/src/agent/check-approval.ts +++ b/src/agent/check-approval.ts @@ -2,6 +2,7 @@ // Check for pending approvals and retrieve recent message history when resuming an agent import type Letta from "@letta-ai/letta-client"; +import type { AgentState } from "@letta-ai/letta-client/resources/agents/agents"; import type { LettaMessageUnion } from "@letta-ai/letta-client/resources/agents/messages"; import type { ApprovalRequest } from "../cli/helpers/stream"; @@ -18,34 +19,98 @@ export interface ResumeData { * Checks for pending approvals and retrieves recent message history for backfill. * * @param client - The Letta client - * @param agentId - The agent ID + * @param agent - The agent state (includes in-context messages) * @returns Pending approval (if any) and recent message history */ export async function getResumeData( client: Letta, - agentId: string, + agent: AgentState, ): Promise { try { - const messagesPage = await client.agents.messages.list(agentId); + const messagesPage = await client.agents.messages.list(agent.id); const messages = messagesPage.items; if (!messages || messages.length === 0) { return { pendingApproval: null, messageHistory: [] }; } - // Check for pending approval (last message) + // Compare cursor last message with in-context last message ID + // The backend uses in-context messages for CONFLICT validation, so if they're + // desynced, we need to check the in-context message for pending approvals + const cursorLastMessage = messages[messages.length - 1]; + if (!cursorLastMessage) { + return { pendingApproval: null, messageHistory: [] }; + } + + const inContextLastMessageId = + agent.message_ids && agent.message_ids.length > 0 + ? agent.message_ids[agent.message_ids.length - 1] + : null; + + let messageToCheck = cursorLastMessage; + + // If there's a desync, find the in-context message in the cursor fetch + if ( + inContextLastMessageId && + cursorLastMessage.id !== inContextLastMessageId + ) { + console.warn( + `[check-approval] Desync detected - cursor last: ${cursorLastMessage.id}, in-context last: ${inContextLastMessageId}`, + ); + + // Search for the in-context message in the fetched messages + // NOTE: There might be multiple messages with the same ID (duplicates) + // We want the one with role === "approval" if it exists + const matchingMessages = messages.filter( + (msg) => msg.id === inContextLastMessageId, + ); + + if (matchingMessages.length > 0) { + // Prefer the approval request message if it exists (duplicates can have different types) + const approvalMessage = matchingMessages.find( + (msg) => msg.message_type === "approval_request_message", + ); + const inContextMessage = + approvalMessage || matchingMessages[matchingMessages.length - 1]!; + + messageToCheck = inContextMessage; + } else { + console.warn( + `[check-approval] In-context message ${inContextLastMessageId} not found in cursor fetch.\n` + + ` This likely means the in-context message is older than the cursor window.\n` + + ` Falling back to cursor message - approval state may be incorrect.`, + ); + // Fall back to cursor message if we can't find the in-context one + } + } + + // Check for pending approval using SDK types let pendingApproval: ApprovalRequest | null = null; - const lastMessage = messages[messages.length - 1]; - if (lastMessage?.message_type === "approval_request_message") { + + if (messageToCheck.message_type === "approval_request_message") { + // Cast to access tool_calls with proper typing + const approvalMsg = messageToCheck as LettaMessageUnion & { + tool_calls?: Array<{ + tool_call_id?: string; + name?: string; + arguments?: string; + }>; + tool_call?: { + tool_call_id?: string; + name?: string; + arguments?: string; + }; + }; + // Use tool_calls array (new) or fallback to tool_call (deprecated) - const toolCalls = Array.isArray(lastMessage.tool_calls) - ? lastMessage.tool_calls - : lastMessage.tool_call - ? [lastMessage.tool_call] + const toolCalls = Array.isArray(approvalMsg.tool_calls) + ? approvalMsg.tool_calls + : approvalMsg.tool_call + ? [approvalMsg.tool_call] : []; if (toolCalls.length > 0) { const toolCall = toolCalls[0]; - // Ensure all required fields are present (type guard for ToolCall vs ToolCallDelta) + // Ensure all required fields are present if (toolCall?.tool_call_id && toolCall.name && toolCall.arguments) { pendingApproval = { toolCallId: toolCall.tool_call_id, @@ -56,7 +121,7 @@ export async function getResumeData( } } - // Get last N messages for backfill + // Get last N messages for backfill (always use cursor messages for history) const historyCount = Math.min(MESSAGE_HISTORY_LIMIT, messages.length); let messageHistory = messages.slice(-historyCount); diff --git a/src/cli/App.tsx b/src/cli/App.tsx index 3621d24..0ca40c6 100644 --- a/src/cli/App.tsx +++ b/src/cli/App.tsx @@ -394,12 +394,13 @@ export default function App({ while (true) { // Stream one turn const stream = await sendMessageStream(agentId, currentInput); - const { stopReason, approval, apiDurationMs } = await drainStream( - stream, - buffersRef.current, - refreshDerivedThrottled, - abortControllerRef.current.signal, - ); + const { stopReason, approval, apiDurationMs, lastRunId } = + await drainStream( + stream, + buffersRef.current, + refreshDerivedThrottled, + abortControllerRef.current.signal, + ); // Track API duration sessionStatsRef.current.endTurn(apiDurationMs); @@ -520,17 +521,39 @@ export default function App({ continue; // Loop continues naturally } - // TODO: for error stop reasons, fetch step details - // using lastRunId to get full error message from step.errorData - // Example: client.runs.steps.list(lastRunId, { limit: 1, order: "desc" }) - // Then display step.errorData.message or full error details instead of generic message - // Unexpected stop reason (error, llm_api_error, etc.) // Mark incomplete tool calls as finished to prevent stuck blinking UI markIncompleteToolsAsCancelled(buffersRef.current); - // Show stop reason (mid-stream errors should already be in buffers) - appendError(`Unexpected stop reason: ${stopReason}`); + // Fetch error details from the run if available + let errorDetails = `Unexpected stop reason: ${stopReason}`; + if (lastRunId) { + try { + const client = await getClient(); + const run = await client.runs.retrieve(lastRunId); + + // Check if run has error information in metadata + if (run.metadata?.error) { + const error = run.metadata.error as { + type?: string; + message?: string; + detail?: string; + }; + const errorType = error.type ? `[${error.type}] ` : ""; + const errorMessage = error.message || "An error occurred"; + const errorDetail = error.detail ? `\n${error.detail}` : ""; + errorDetails = `${errorType}${errorMessage}${errorDetail}`; + } + } catch (e) { + // If we can't fetch error details, let user know + appendError( + `${errorDetails}\n(Unable to fetch additional error details from server)`, + ); + return; + } + } + + appendError(errorDetails); setStreaming(false); refreshDerived(); @@ -889,9 +912,11 @@ export default function App({ if (CHECK_PENDING_APPROVALS_BEFORE_SEND) { try { const client = await getClient(); + // Fetch fresh agent state to check for pending approvals with accurate in-context messages + const agent = await client.agents.retrieve(agentId); const { pendingApproval: existingApproval } = await getResumeData( client, - agentId, + agent, ); if (existingApproval) { diff --git a/src/headless.ts b/src/headless.ts index 458d9a1..1dc1e88 100644 --- a/src/headless.ts +++ b/src/headless.ts @@ -146,7 +146,9 @@ export async function handleHeadlessCommand(argv: string[], model?: string) { const resolveAllPendingApprovals = async () => { const { getResumeData } = await import("./agent/check-approval"); while (true) { - const resume = await getResumeData(client, agent.id); + // Re-fetch agent to get latest in-context messages (source of truth for backend) + const freshAgent = await client.agents.retrieve(agent.id); + const resume = await getResumeData(client, freshAgent); if (!resume.pendingApproval) break; const { toolCallId, toolName, toolArgs } = resume.pendingApproval; const parsedArgs = safeJsonParseOr>( @@ -248,6 +250,7 @@ export async function handleHeadlessCommand(argv: string[], model?: string) { toolArgs: string; } | null = null; let apiDurationMs: number; + let lastRunId: string | null = null; if (outputFormat === "stream-json") { const startTime = performance.now(); @@ -435,6 +438,8 @@ export async function handleHeadlessCommand(argv: string[], model?: string) { stopReason = lastStopReason || "error"; apiDurationMs = performance.now() - startTime; + // Use the last run_id we saw (if any) + lastRunId = runIds.size > 0 ? Array.from(runIds).pop() || null : null; // Mark final line as finished const { markCurrentLineAsFinished } = await import( @@ -451,6 +456,7 @@ export async function handleHeadlessCommand(argv: string[], model?: string) { stopReason = result.stopReason; approval = result.approval || null; apiDurationMs = result.apiDurationMs; + lastRunId = result.lastRunId || null; } // Track API duration for this stream @@ -556,11 +562,32 @@ export async function handleHeadlessCommand(argv: string[], model?: string) { .map((line) => ("text" in line ? line.text : "")) .filter(Boolean); - const errorMessage = + let errorMessage = errorMessages.length > 0 ? errorMessages.join("; ") : `Unexpected stop reason: ${stopReason}`; + // Fetch detailed error from run metadata if available + if (lastRunId && errorMessages.length === 0) { + try { + const run = await client.runs.retrieve(lastRunId); + if (run.metadata?.error) { + const error = run.metadata.error as { + type?: string; + message?: string; + detail?: string; + }; + const errorType = error.type ? `[${error.type}] ` : ""; + const errorMsg = error.message || "An error occurred"; + const errorDetail = error.detail ? `: ${error.detail}` : ""; + errorMessage = `${errorType}${errorMsg}${errorDetail}`; + } + } catch (e) { + // If we can't fetch error details, append note to error message + errorMessage = `${errorMessage}\n(Unable to fetch additional error details from server)`; + } + } + if (outputFormat === "stream-json") { // Emit error event console.log( diff --git a/src/index.ts b/src/index.ts index deebe97..1f4b692 100755 --- a/src/index.ts +++ b/src/index.ts @@ -365,7 +365,7 @@ async function main() { // Get resume data (pending approval + message history) if resuming if (resuming) { setLoadingState("checking"); - const data = await getResumeData(client, agent.id); + const data = await getResumeData(client, agent); setResumeData(data); }