From 4968fe04a8806723e5ed68dc13b8cabaac246aec Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Tue, 10 Feb 2026 13:08:04 -0800 Subject: [PATCH] fix: recover pre-stream approval conflicts in headless flows (#896) Co-authored-by: Letta --- src/agent/approval-recovery.ts | 52 +++- src/headless.ts | 215 +++++++++++++- .../prestream-approval-recovery.test.ts | 277 ++++++++++++++++++ src/tests/approval-recovery.test.ts | 65 +++- 4 files changed, 593 insertions(+), 16 deletions(-) create mode 100644 src/integration-tests/prestream-approval-recovery.test.ts diff --git a/src/agent/approval-recovery.ts b/src/agent/approval-recovery.ts index f4856f6..cc45355 100644 --- a/src/agent/approval-recovery.ts +++ b/src/agent/approval-recovery.ts @@ -5,9 +5,10 @@ import { getClient } from "./client"; // This is a specific subtype of desync - server HAS approvals but with different IDs const INVALID_TOOL_CALL_IDS_FRAGMENT = "invalid tool call ids"; -// Error when trying to SEND message but server has pending approval waiting -// This is the CONFLICT error - opposite of desync -const APPROVAL_PENDING_DETAIL_FRAGMENT = "cannot send a new message"; +// Error when trying to SEND message but server has pending approval waiting. +// Use an approval-specific fragment to avoid matching conversation-busy errors, +// which may also include "cannot send a new message". +const APPROVAL_PENDING_DETAIL_FRAGMENT = "waiting for approval"; // Error when conversation is busy (another request is being processed) // This is a 409 CONFLICT when trying to send while a run is active @@ -65,6 +66,51 @@ export function isConversationBusyError(detail: unknown): boolean { return detail.toLowerCase().includes(CONVERSATION_BUSY_DETAIL_FRAGMENT); } +export type PreStreamConflictKind = + | "approval_pending" + | "conversation_busy" + | null; + +export type PreStreamErrorAction = + | "resolve_approval_pending" + | "retry_conversation_busy" + | "rethrow"; + +/** + * Classify pre-stream 409 conflict details so callers can route recovery logic. + */ +export function classifyPreStreamConflict( + detail: unknown, +): PreStreamConflictKind { + if (isApprovalPendingError(detail)) return "approval_pending"; + if (isConversationBusyError(detail)) return "conversation_busy"; + return null; +} + +/** + * Determine pre-stream recovery action for one-shot headless sends. + */ +export function getPreStreamErrorAction( + detail: unknown, + conversationBusyRetries: number, + maxConversationBusyRetries: number, +): PreStreamErrorAction { + const kind = classifyPreStreamConflict(detail); + + if (kind === "approval_pending") { + return "resolve_approval_pending"; + } + + if ( + kind === "conversation_busy" && + conversationBusyRetries < maxConversationBusyRetries + ) { + return "retry_conversation_busy"; + } + + return "rethrow"; +} + export async function fetchRunErrorDetail( runId: string | null | undefined, ): Promise { diff --git a/src/headless.ts b/src/headless.ts index 8af5bdd..e95454e 100644 --- a/src/headless.ts +++ b/src/headless.ts @@ -10,8 +10,8 @@ import type { StopReasonType } from "@letta-ai/letta-client/resources/runs/runs" import type { ApprovalResult } from "./agent/approval-execution"; import { fetchRunErrorDetail, + getPreStreamErrorAction, isApprovalPendingError, - isConversationBusyError, isInvalidToolCallIdsError, } from "./agent/approval-recovery"; import { getClient } from "./agent/client"; @@ -1206,11 +1206,38 @@ ${SYSTEM_REMINDER_CLOSE} errorDetail = preStreamError.message; } + const preStreamAction = getPreStreamErrorAction( + errorDetail, + conversationBusyRetries, + CONVERSATION_BUSY_MAX_RETRIES, + ); + + // Check for pending approval blocking new messages - resolve and retry. + // This is distinct from "conversation busy" and needs approval resolution, + // not just a timed delay. + if (preStreamAction === "resolve_approval_pending") { + if (outputFormat === "stream-json") { + const recoveryMsg: RecoveryMessage = { + type: "recovery", + recovery_type: "approval_pending", + message: + "Detected pending approval conflict on send; resolving before retry", + session_id: sessionId, + uuid: `recovery-pre-stream-${crypto.randomUUID()}`, + }; + console.log(JSON.stringify(recoveryMsg)); + } else { + console.error( + "Pending approval detected, resolving before retry...", + ); + } + + await resolveAllPendingApprovals(); + continue; + } + // Check for 409 "conversation busy" error - retry once with delay - if ( - isConversationBusyError(errorDetail) && - conversationBusyRetries < CONVERSATION_BUSY_MAX_RETRIES - ) { + if (preStreamAction === "retry_conversation_busy") { conversationBusyRetries += 1; // Emit retry message for stream-json mode @@ -1884,7 +1911,7 @@ ${SYSTEM_REMINDER_CLOSE} async function runBidirectionalMode( agent: AgentState, conversationId: string, - _client: Letta, + client: Letta, _outputFormat: string, includePartialMessages: boolean, ): Promise { @@ -1908,6 +1935,130 @@ async function runBidirectionalMode( // Track current operation for interrupt support let currentAbortController: AbortController | null = null; + // Resolve pending approvals for this conversation before retrying user input. + const resolveAllPendingApprovals = async () => { + const { getResumeData } = await import("./agent/check-approval"); + while (true) { + // Re-fetch agent to get latest in-context messages (source of truth for backend) + const freshAgent = await client.agents.retrieve(agent.id); + + let resume: Awaited>; + try { + resume = await getResumeData(client, freshAgent, conversationId); + } catch (error) { + // Treat 404/422 as "no approvals" - stale message/conversation state + if ( + error instanceof APIError && + (error.status === 404 || error.status === 422) + ) { + break; + } + throw error; + } + + const pendingApprovals = resume.pendingApprovals || []; + if (pendingApprovals.length === 0) break; + + type Decision = + | { + type: "approve"; + approval: { + toolCallId: string; + toolName: string; + toolArgs: string; + }; + reason: string; + matchedRule: string; + } + | { + type: "deny"; + approval: { + toolCallId: string; + toolName: string; + toolArgs: string; + }; + reason: string; + }; + + const { autoAllowed, autoDenied } = await classifyApprovals( + pendingApprovals, + { + treatAskAsDeny: true, + denyReasonForAsk: "Tool requires approval (headless mode)", + requireArgsForAutoApprove: true, + missingNameReason: "Tool call incomplete - missing name", + }, + ); + + const decisions: Decision[] = [ + ...autoAllowed.map((ac) => ({ + type: "approve" as const, + approval: ac.approval, + reason: ac.permission.reason || "Allowed by permission rule", + matchedRule: + "matchedRule" in ac.permission && ac.permission.matchedRule + ? ac.permission.matchedRule + : "auto-approved", + })), + ...autoDenied.map((ac) => { + const fallback = + "matchedRule" in ac.permission && ac.permission.matchedRule + ? `Permission denied: ${ac.permission.matchedRule}` + : ac.permission.reason + ? `Permission denied: ${ac.permission.reason}` + : "Permission denied: Unknown reason"; + return { + type: "deny" as const, + approval: ac.approval, + reason: ac.denyReason ?? fallback, + }; + }), + ]; + + const { executeApprovalBatch } = await import( + "./agent/approval-execution" + ); + const executedResults = await executeApprovalBatch(decisions); + + const approvalInput: ApprovalCreate = { + type: "approval", + approvals: executedResults as ApprovalResult[], + }; + + const approvalMessages: Array< + | import("@letta-ai/letta-client/resources/agents/agents").MessageCreate + | import("@letta-ai/letta-client/resources/agents/messages").ApprovalCreate + > = [approvalInput]; + + { + const { consumeQueuedSkillContent } = await import( + "./tools/impl/skillContentRegistry" + ); + const skillContents = consumeQueuedSkillContent(); + if (skillContents.length > 0) { + approvalMessages.push({ + role: "user" as const, + content: skillContents.map((sc) => ({ + type: "text" as const, + text: sc.content, + })), + }); + } + } + + const approvalStream = await sendMessageStream( + conversationId, + approvalMessages, + { agentId: agent.id }, + ); + await drainStreamWithResume( + approvalStream, + createBuffers(agent.id), + () => {}, + ); + } + }; + // Create readline interface for stdin const rl = readline.createInterface({ input: process.stdin, @@ -2257,10 +2408,54 @@ async function runBidirectionalMode( } } - // Send message to agent - const stream = await sendMessageStream(conversationId, currentInput, { - agentId: agent.id, - }); + // Send message to agent. + // Wrap in try-catch to handle pre-stream 409 approval-pending errors. + let stream: Awaited>; + try { + stream = await sendMessageStream(conversationId, currentInput, { + agentId: agent.id, + }); + } catch (preStreamError) { + let errorDetail = ""; + if ( + preStreamError instanceof APIError && + preStreamError.error && + typeof preStreamError.error === "object" + ) { + const errObj = preStreamError.error as Record; + if ( + errObj.error && + typeof errObj.error === "object" && + "detail" in errObj.error + ) { + const nested = errObj.error as Record; + errorDetail = + typeof nested.detail === "string" ? nested.detail : ""; + } + if (!errorDetail && typeof errObj.detail === "string") { + errorDetail = errObj.detail; + } + } + if (!errorDetail && preStreamError instanceof Error) { + errorDetail = preStreamError.message; + } + + if (isApprovalPendingError(errorDetail)) { + const recoveryMsg: RecoveryMessage = { + type: "recovery", + recovery_type: "approval_pending", + message: + "Detected pending approval conflict on send; resolving before retry", + session_id: sessionId, + uuid: `recovery-bidir-${crypto.randomUUID()}`, + }; + console.log(JSON.stringify(recoveryMsg)); + await resolveAllPendingApprovals(); + continue; + } + + throw preStreamError; + } const streamJsonHook: DrainStreamHook = ({ chunk, shouldOutput, diff --git a/src/integration-tests/prestream-approval-recovery.test.ts b/src/integration-tests/prestream-approval-recovery.test.ts new file mode 100644 index 0000000..c06c466 --- /dev/null +++ b/src/integration-tests/prestream-approval-recovery.test.ts @@ -0,0 +1,277 @@ +import { describe, expect, test } from "bun:test"; +import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process"; + +const TOOL_TRIGGER_PROMPT = + "Use the Bash tool exactly once with command: echo test123. Do not ask clarifying questions."; +const FOLLOWUP_PROMPT = "Say OK only. Do not call tools."; + +interface StreamMessage { + type?: string; + subtype?: string; + message_type?: string; + recovery_type?: string; + conversation_id?: string; + request?: { subtype?: string }; + [key: string]: unknown; +} + +interface PendingApprovalSession { + conversationId: string; + stop: () => void; + messages: StreamMessage[]; +} + +function parseJsonLines(text: string): StreamMessage[] { + return text + .split(/\r?\n/) + .map((line) => line.trim()) + .filter(Boolean) + .flatMap((line) => { + try { + return [JSON.parse(line) as StreamMessage]; + } catch { + return []; + } + }); +} + +async function startPendingApprovalSession( + timeoutMs = 180000, +): Promise { + return new Promise((resolve, reject) => { + const proc: ChildProcessWithoutNullStreams = spawn( + "bun", + [ + "run", + "dev", + "--input-format", + "stream-json", + "--output-format", + "stream-json", + "--new-agent", + "--new", + "-m", + "haiku", + ], + { + cwd: process.cwd(), + env: { ...process.env, LETTA_CODE_AGENT_ROLE: "subagent" }, + }, + ); + + let stdoutBuffer = ""; + let stderrBuffer = ""; + const messages: StreamMessage[] = []; + + let settled = false; + let conversationId: string | undefined; + let promptAttempts = 0; + + const sendPrompt = () => { + if (promptAttempts >= 3) return; + promptAttempts += 1; + proc.stdin.write( + `${JSON.stringify({ + type: "user", + message: { role: "user", content: TOOL_TRIGGER_PROMPT }, + })}\n`, + ); + }; + + const stop = () => { + proc.stdin.end(); + proc.kill(); + }; + + const timeout = setTimeout(() => { + if (settled) return; + settled = true; + stop(); + reject( + new Error( + `Timed out waiting for pending approval after ${timeoutMs}ms\nSTDERR:\n${stderrBuffer}`, + ), + ); + }, timeoutMs); + + const complete = () => { + if (!conversationId) { + settled = true; + clearTimeout(timeout); + stop(); + reject( + new Error( + "Pending approval detected before conversation ID was known", + ), + ); + return; + } + settled = true; + clearTimeout(timeout); + resolve({ conversationId, stop, messages }); + }; + + const onMessage = (msg: StreamMessage) => { + messages.push(msg); + + if ( + msg.type === "system" && + msg.subtype === "init" && + typeof msg.conversation_id === "string" + ) { + conversationId = msg.conversation_id; + sendPrompt(); + return; + } + + // If model responded without tool call, retry prompt up to max attempts. + if (msg.type === "result" && promptAttempts < 3) { + sendPrompt(); + return; + } + + // Pending approval is active when bidirectional mode asks for permission. + if ( + msg.type === "control_request" && + msg.request?.subtype === "can_use_tool" + ) { + complete(); + } + }; + + proc.stdout.on("data", (data) => { + stdoutBuffer += data.toString(); + const lines = stdoutBuffer.split(/\r?\n/); + stdoutBuffer = lines.pop() || ""; + + for (const line of lines) { + try { + onMessage(JSON.parse(line)); + } catch { + // Ignore non-JSON output lines + } + } + }); + + proc.stderr.on("data", (data) => { + stderrBuffer += data.toString(); + }); + + proc.on("close", (code) => { + if (settled) return; + settled = true; + clearTimeout(timeout); + reject( + new Error( + `Pending-approval process exited early (code=${code ?? "null"})\nSTDERR:\n${stderrBuffer}`, + ), + ); + }); + + proc.on("error", (error) => { + if (settled) return; + settled = true; + clearTimeout(timeout); + reject(error); + }); + }); +} + +async function runOneShotAgainstConversation( + conversationId: string, + timeoutMs = 180000, +): Promise<{ code: number | null; messages: StreamMessage[]; stderr: string }> { + return new Promise((resolve, reject) => { + const proc = spawn( + "bun", + [ + "run", + "dev", + "-p", + FOLLOWUP_PROMPT, + "--conversation", + conversationId, + "--output-format", + "stream-json", + ], + { + cwd: process.cwd(), + env: { ...process.env, LETTA_CODE_AGENT_ROLE: "subagent" }, + }, + ); + + let stdout = ""; + let stderr = ""; + let settled = false; + + const timeout = setTimeout(() => { + if (settled) return; + settled = true; + proc.kill(); + reject( + new Error(`Timed out waiting for one-shot run after ${timeoutMs}ms`), + ); + }, timeoutMs); + + proc.stdout.on("data", (data) => { + stdout += data.toString(); + }); + + proc.stderr.on("data", (data) => { + stderr += data.toString(); + }); + + proc.on("close", (code) => { + if (settled) return; + settled = true; + clearTimeout(timeout); + resolve({ code, messages: parseJsonLines(stdout), stderr }); + }); + + proc.on("error", (error) => { + if (settled) return; + settled = true; + clearTimeout(timeout); + reject(error); + }); + }); +} + +describe("pre-stream approval recovery", () => { + const maybeTest = + process.env.LETTA_RUN_PRESTREAM_APPROVAL_RECOVERY_TEST === "1" + ? test + : test.skip; + + maybeTest( + "recovers from pre-stream approval conflict and retries successfully", + async () => { + const pending = await startPendingApprovalSession(); + + try { + const result = await runOneShotAgainstConversation( + pending.conversationId, + ); + + if (result.code !== 0) { + throw new Error( + `Expected one-shot run to succeed, got exit code ${result.code}\nSTDERR:\n${result.stderr}`, + ); + } + + const recoveryEvent = result.messages.find( + (m) => + m.type === "recovery" && m.recovery_type === "approval_pending", + ); + expect(recoveryEvent).toBeDefined(); + + const resultEvent = result.messages.find((m) => m.type === "result"); + expect(resultEvent).toBeDefined(); + expect(resultEvent?.subtype).toBe("success"); + } finally { + pending.stop(); + } + }, + 240000, + ); +}); diff --git a/src/tests/approval-recovery.test.ts b/src/tests/approval-recovery.test.ts index 29a2554..90f68e0 100644 --- a/src/tests/approval-recovery.test.ts +++ b/src/tests/approval-recovery.test.ts @@ -1,6 +1,8 @@ import { describe, expect, test } from "bun:test"; import type { Message } from "@letta-ai/letta-client/resources/agents/messages"; import { + classifyPreStreamConflict, + getPreStreamErrorAction, isApprovalPendingError, isConversationBusyError, isInvalidToolCallIdsError, @@ -63,15 +65,23 @@ describe("isApprovalPendingError", () => { }); test("detects approval pending error case-insensitively", () => { - expect(isApprovalPendingError("CANNOT SEND A NEW MESSAGE")).toBe(true); - expect(isApprovalPendingError("cannot send a new message")).toBe(true); + expect(isApprovalPendingError("WAITING FOR APPROVAL")).toBe(true); + expect(isApprovalPendingError("waiting for approval")).toBe(true); }); test("detects partial match in longer message", () => { - const detail = "Error occurred: Cannot send a new message while processing"; + const detail = + "Error occurred: agent is waiting for approval while processing"; expect(isApprovalPendingError(detail)).toBe(true); }); + test("does not misclassify conversation busy conflict as approval pending", () => { + const busyDetail = + "CONFLICT: Cannot send a new message: Another request is currently being processed for this conversation."; + expect(isConversationBusyError(busyDetail)).toBe(true); + expect(isApprovalPendingError(busyDetail)).toBe(false); + }); + test("returns false for desync errors (opposite case)", () => { // These are the OPPOSITE error - when we send approval but there's nothing pending expect( @@ -138,6 +148,55 @@ describe("isConversationBusyError", () => { }); }); +describe("classifyPreStreamConflict", () => { + test("classifies approval-pending conflict distinctly from busy conflict", () => { + const approvalDetail = + "CONFLICT: Cannot send a new message: The agent is waiting for approval on a tool call."; + const busyDetail = + "CONFLICT: Cannot send a new message: Another request is currently being processed for this conversation."; + + expect(classifyPreStreamConflict(approvalDetail)).toBe("approval_pending"); + expect(classifyPreStreamConflict(busyDetail)).toBe("conversation_busy"); + }); + + test("returns null for non-conflict errors", () => { + expect(classifyPreStreamConflict("Rate limit exceeded")).toBeNull(); + }); +}); + +describe("getPreStreamErrorAction", () => { + test("returns resolve_approval_pending for approval conflict details", () => { + const detail = + "CONFLICT: Cannot send a new message: The agent is waiting for approval on a tool call."; + + expect(getPreStreamErrorAction(detail, 0, 1)).toBe( + "resolve_approval_pending", + ); + }); + + test("returns retry_conversation_busy when busy and retries remain", () => { + const detail = + "CONFLICT: Cannot send a new message: Another request is currently being processed for this conversation."; + + expect(getPreStreamErrorAction(detail, 0, 1)).toBe( + "retry_conversation_busy", + ); + }); + + test("returns rethrow when conversation busy retries are exhausted", () => { + const detail = + "CONFLICT: Cannot send a new message: Another request is currently being processed for this conversation."; + + expect(getPreStreamErrorAction(detail, 1, 1)).toBe("rethrow"); + }); + + test("returns rethrow for unrelated errors", () => { + expect(getPreStreamErrorAction("Rate limit exceeded", 0, 1)).toBe( + "rethrow", + ); + }); +}); + /** * Tests for parallel tool call approval extraction. * Ensures lazy recovery handles multiple simultaneous tool calls correctly.