From f4d89863c7aaf2958173a87a13e82c36c01c9bf6 Mon Sep 17 00:00:00 2001 From: Cameron Date: Wed, 11 Mar 2026 23:33:19 -0700 Subject: [PATCH] fix(headless): support recover_pending_approvals in stream-json mode (#1361) Co-authored-by: Letta Code --- src/headless.ts | 178 +++++++++++++++++- .../headless-input-format.test.ts | 66 +++++++ src/types/protocol.ts | 32 +++- 3 files changed, 274 insertions(+), 2 deletions(-) diff --git a/src/headless.ts b/src/headless.ts index 3569813..c20a229 100644 --- a/src/headless.ts +++ b/src/headless.ts @@ -7,7 +7,10 @@ import type { } from "@letta-ai/letta-client/resources/agents/agents"; import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages"; import type { StopReasonType } from "@letta-ai/letta-client/resources/runs/runs"; -import type { ApprovalResult } from "./agent/approval-execution"; +import type { + ApprovalDecision, + ApprovalResult, +} from "./agent/approval-execution"; import { extractConflictDetail, fetchRunErrorDetail, @@ -109,6 +112,7 @@ import type { ListMessagesControlRequest, MessageWire, QueueLifecycleEvent, + RecoverPendingApprovalsControlRequest, RecoveryMessage, ResultMessage, RetryMessage, @@ -2834,6 +2838,148 @@ async function runBidirectionalMode( return result; } + async function recoverPendingApprovalsFromControlRequest( + request: RecoverPendingApprovalsControlRequest, + ): Promise<{ + recovered: boolean; + pending_approval: boolean; + approvals_processed: number; + }> { + const targetAgentId = request.agent_id ?? agent.id; + const targetConversationId = request.conversation_id ?? conversationId; + + if (targetAgentId !== agent.id) { + throw new Error( + `recover_pending_approvals agent mismatch: ${targetAgentId} != ${agent.id}`, + ); + } + + const { getResumeData } = await import("./agent/check-approval"); + const { executeApprovalBatch } = await import("./agent/approval-execution"); + + let approvalsProcessed = 0; + const MAX_RECOVERY_PASSES = 8; + + for (let pass = 0; pass < MAX_RECOVERY_PASSES; pass += 1) { + const freshAgent = await client.agents.retrieve(agent.id); + + let resume: Awaited>; + try { + resume = await getResumeData(client, freshAgent, targetConversationId, { + includeMessageHistory: false, + }); + } catch (error) { + if ( + error instanceof APIError && + (error.status === 404 || error.status === 422) + ) { + return { + recovered: true, + pending_approval: false, + approvals_processed: approvalsProcessed, + }; + } + throw error; + } + + const pendingApprovals = resume.pendingApprovals || []; + if (pendingApprovals.length === 0) { + return { + recovered: true, + pending_approval: false, + approvals_processed: approvalsProcessed, + }; + } + + const { autoAllowed, autoDenied, needsUserInput } = + await classifyApprovals(pendingApprovals, { + alwaysRequiresUserInput: isInteractiveApprovalTool, + requireArgsForAutoApprove: true, + missingNameReason: "Tool call incomplete - missing name", + }); + + const decisions: ApprovalDecision[] = [ + ...autoAllowed.map((ac) => ({ + type: "approve" as const, + approval: ac.approval, + })), + ...autoDenied.map((ac) => ({ + type: "deny" as const, + approval: ac.approval, + reason: ac.denyReason || ac.permission.reason || "Permission denied", + })), + ]; + + for (const ac of needsUserInput) { + const permResponse = await requestPermission( + ac.approval.toolCallId, + ac.approval.toolName, + ac.parsedArgs, + ); + + if (permResponse.decision === "allow") { + const finalApproval = permResponse.updatedInput + ? { + ...ac.approval, + toolArgs: JSON.stringify(permResponse.updatedInput), + } + : ac.approval; + decisions.push({ type: "approve", approval: finalApproval }); + } else { + decisions.push({ + type: "deny", + approval: ac.approval, + reason: permResponse.reason || "Denied by SDK callback", + }); + } + } + + if (decisions.length === 0) { + return { + recovered: false, + pending_approval: true, + approvals_processed: approvalsProcessed, + }; + } + + const executedResults = await executeApprovalBatch(decisions); + approvalsProcessed += executedResults.length; + + const approvalInput: ApprovalCreate = { + type: "approval", + approvals: executedResults as ApprovalResult[], + }; + const approvalStream = await sendMessageStream( + targetConversationId, + [approvalInput], + { agentId: agent.id }, + ); + + const drainResult = await drainStreamWithResume( + approvalStream, + createBuffers(agent.id), + () => {}, + undefined, + undefined, + undefined, + reminderContextTracker, + ); + + if (drainResult.stopReason === "error") { + throw new Error( + drainResult.fallbackError || + "recover_pending_approvals failed while applying approvals", + ); + } + } + + return { + recovered: false, + pending_approval: true, + approvals_processed: approvalsProcessed, + }; + } + // Main processing loop while (true) { const line = await getNextLine(); @@ -3024,6 +3170,36 @@ async function runBidirectionalMode( client, }); console.log(JSON.stringify(listResp)); + } else if (subtype === "recover_pending_approvals") { + const recoverReq = + message.request as RecoverPendingApprovalsControlRequest; + try { + const recovery = + await recoverPendingApprovalsFromControlRequest(recoverReq); + const recoveryResponse: ControlResponse = { + type: "control_response", + response: { + subtype: "success", + request_id: requestId ?? "", + response: recovery, + }, + session_id: sessionId, + uuid: randomUUID(), + }; + console.log(JSON.stringify(recoveryResponse)); + } catch (error) { + const recoveryError: ControlResponse = { + type: "control_response", + response: { + subtype: "error", + request_id: requestId ?? "", + error: error instanceof Error ? error.message : String(error), + }, + session_id: sessionId, + uuid: randomUUID(), + }; + console.log(JSON.stringify(recoveryError)); + } } else { const errorResponse: ControlResponse = { type: "control_response", diff --git a/src/integration-tests/headless-input-format.test.ts b/src/integration-tests/headless-input-format.test.ts index 61f82fc..9c6353b 100644 --- a/src/integration-tests/headless-input-format.test.ts +++ b/src/integration-tests/headless-input-format.test.ts @@ -393,6 +393,72 @@ describe("input-format stream-json", () => { { timeout: 200000 }, ); + test( + "recover_pending_approvals returns structured recovery payload", + async () => { + const objects = (await runBidirectional([ + JSON.stringify({ + type: "control_request", + request_id: "recover_1", + request: { subtype: "recover_pending_approvals" }, + }), + ])) as WireMessage[]; + + const controlResponse = objects.find( + (o): o is ControlResponse => + o.type === "control_response" && + o.response?.request_id === "recover_1", + ); + expect(controlResponse).toBeDefined(); + expect(controlResponse?.response.subtype).toBe("success"); + + if (controlResponse?.response.subtype === "success") { + const recovery = controlResponse.response.response as + | { + recovered?: boolean; + pending_approval?: boolean; + approvals_processed?: number; + } + | undefined; + expect(recovery?.recovered).toBe(true); + expect(recovery?.pending_approval).toBe(false); + expect(recovery?.approvals_processed).toBe(0); + } + }, + { timeout: 200000 }, + ); + + test( + "recover_pending_approvals agent mismatch returns error response", + async () => { + const objects = (await runBidirectional([ + JSON.stringify({ + type: "control_request", + request_id: "recover_mismatch_1", + request: { + subtype: "recover_pending_approvals", + agent_id: "agent-mismatch", + }, + }), + ])) as WireMessage[]; + + const controlResponse = objects.find( + (o): o is ControlResponse => + o.type === "control_response" && + o.response?.request_id === "recover_mismatch_1", + ); + expect(controlResponse).toBeDefined(); + expect(controlResponse?.response.subtype).toBe("error"); + + if (controlResponse?.response.subtype === "error") { + expect(controlResponse.response.error).toContain( + "recover_pending_approvals agent mismatch", + ); + } + }, + { timeout: 200000 }, + ); + test( "--include-partial-messages emits stream_event in bidirectional mode", async () => { diff --git a/src/types/protocol.ts b/src/types/protocol.ts index 0bc3b2d..dd2bc04 100644 --- a/src/types/protocol.ts +++ b/src/types/protocol.ts @@ -437,6 +437,7 @@ export type SdkToCliControlRequest = | { subtype: "interrupt" } | RegisterExternalToolsRequest | BootstrapSessionStateRequest + | RecoverPendingApprovalsControlRequest | ListMessagesControlRequest; /** @@ -505,6 +506,32 @@ export interface ListMessagesControlRequest { limit?: number; } +/** + * Request to recover pending approvals in the current session context (SDK → CLI). + * + * Optional agent/conversation IDs let callers target a specific thread when + * the transport has enough context to do so. + */ +export interface RecoverPendingApprovalsControlRequest { + subtype: "recover_pending_approvals"; + /** Optional explicit agent ID. Defaults to session agent. */ + agent_id?: string; + /** Optional explicit conversation ID. Defaults to session conversation. */ + conversation_id?: string; +} + +/** + * Successful recover_pending_approvals response payload. + * + * `pending_approval: true` indicates recovery completed without transport + * failure, but unresolved approvals still remain after bounded recovery passes. + */ +export interface RecoverPendingApprovalsResponsePayload { + recovered: boolean; + pending_approval: boolean; + approvals_processed: number; +} + /** * Successful list_messages response payload. */ @@ -597,7 +624,10 @@ export type ControlResponseBody = | { subtype: "success"; request_id: string; - response?: CanUseToolResponse | Record; + response?: + | CanUseToolResponse + | RecoverPendingApprovalsResponsePayload + | Record; } | { subtype: "error"; request_id: string; error: string } | ExternalToolResultResponse;