fix: recover pre-stream approval conflicts in headless flows (#896)

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
Charles Packer
2026-02-10 13:08:04 -08:00
committed by GitHub
parent 08d8d46fdb
commit 4968fe04a8
4 changed files with 593 additions and 16 deletions

View File

@@ -5,9 +5,10 @@ import { getClient } from "./client";
// This is a specific subtype of desync - server HAS approvals but with different IDs
const INVALID_TOOL_CALL_IDS_FRAGMENT = "invalid tool call ids";
// Error when trying to SEND message but server has pending approval waiting
// This is the CONFLICT error - opposite of desync
const APPROVAL_PENDING_DETAIL_FRAGMENT = "cannot send a new message";
// Error when trying to SEND message but server has pending approval waiting.
// Use an approval-specific fragment to avoid matching conversation-busy errors,
// which may also include "cannot send a new message".
const APPROVAL_PENDING_DETAIL_FRAGMENT = "waiting for approval";
// Error when conversation is busy (another request is being processed)
// This is a 409 CONFLICT when trying to send while a run is active
@@ -65,6 +66,51 @@ export function isConversationBusyError(detail: unknown): boolean {
return detail.toLowerCase().includes(CONVERSATION_BUSY_DETAIL_FRAGMENT);
}
/**
 * Result of classifying a pre-stream conflict detail:
 * - "approval_pending": the detail matched the approval-pending predicate.
 * - "conversation_busy": the detail matched the conversation-busy predicate.
 * - null: the detail matched neither predicate.
 */
export type PreStreamConflictKind =
| "approval_pending"
| "conversation_busy"
| null;
/**
 * Recovery step a caller should take after a send fails before streaming:
 * - "resolve_approval_pending": resolve outstanding approvals, then retry.
 * - "retry_conversation_busy": retry the send (busy conflict, retries remain).
 * - "rethrow": no recovery applies; surface the original error.
 */
export type PreStreamErrorAction =
| "resolve_approval_pending"
| "retry_conversation_busy"
| "rethrow";
/**
 * Map a pre-stream 409 conflict detail onto the kind of conflict it
 * describes, so callers can pick the matching recovery path.
 *
 * The approval-pending predicate is consulted first; the conversation-busy
 * predicate is only evaluated when the first one does not match.
 *
 * @param detail - Error detail from the failed send; forwarded unchanged to
 *   the predicates.
 * @returns "approval_pending", "conversation_busy", or null when neither
 *   predicate matches.
 */
export function classifyPreStreamConflict(
  detail: unknown,
): PreStreamConflictKind {
  const kind: PreStreamConflictKind = isApprovalPendingError(detail)
    ? "approval_pending"
    : isConversationBusyError(detail)
      ? "conversation_busy"
      : null;
  return kind;
}
/**
 * Decide how a one-shot headless send should react to a pre-stream error.
 *
 * @param detail - Error detail from the failed send.
 * @param conversationBusyRetries - Busy-conflict retries already performed.
 * @param maxConversationBusyRetries - Upper bound on busy-conflict retries.
 * @returns "resolve_approval_pending" for an approval-pending conflict,
 *   "retry_conversation_busy" for a busy conflict while
 *   `conversationBusyRetries < maxConversationBusyRetries`, and "rethrow" in
 *   every other case.
 */
export function getPreStreamErrorAction(
  detail: unknown,
  conversationBusyRetries: number,
  maxConversationBusyRetries: number,
): PreStreamErrorAction {
  switch (classifyPreStreamConflict(detail)) {
    case "approval_pending":
      return "resolve_approval_pending";
    case "conversation_busy": {
      // A busy conflict is only retried while the retry budget is not spent.
      const retriesRemain =
        conversationBusyRetries < maxConversationBusyRetries;
      return retriesRemain ? "retry_conversation_busy" : "rethrow";
    }
    default:
      return "rethrow";
  }
}
export async function fetchRunErrorDetail(
runId: string | null | undefined,
): Promise<string | null> {

View File

@@ -10,8 +10,8 @@ import type { StopReasonType } from "@letta-ai/letta-client/resources/runs/runs"
import type { ApprovalResult } from "./agent/approval-execution";
import {
fetchRunErrorDetail,
getPreStreamErrorAction,
isApprovalPendingError,
isConversationBusyError,
isInvalidToolCallIdsError,
} from "./agent/approval-recovery";
import { getClient } from "./agent/client";
@@ -1206,11 +1206,38 @@ ${SYSTEM_REMINDER_CLOSE}
errorDetail = preStreamError.message;
}
const preStreamAction = getPreStreamErrorAction(
errorDetail,
conversationBusyRetries,
CONVERSATION_BUSY_MAX_RETRIES,
);
// Check for pending approval blocking new messages - resolve and retry.
// This is distinct from "conversation busy" and needs approval resolution,
// not just a timed delay.
if (preStreamAction === "resolve_approval_pending") {
if (outputFormat === "stream-json") {
const recoveryMsg: RecoveryMessage = {
type: "recovery",
recovery_type: "approval_pending",
message:
"Detected pending approval conflict on send; resolving before retry",
session_id: sessionId,
uuid: `recovery-pre-stream-${crypto.randomUUID()}`,
};
console.log(JSON.stringify(recoveryMsg));
} else {
console.error(
"Pending approval detected, resolving before retry...",
);
}
await resolveAllPendingApprovals();
continue;
}
// Check for 409 "conversation busy" error - retry once with delay
if (
isConversationBusyError(errorDetail) &&
conversationBusyRetries < CONVERSATION_BUSY_MAX_RETRIES
) {
if (preStreamAction === "retry_conversation_busy") {
conversationBusyRetries += 1;
// Emit retry message for stream-json mode
@@ -1884,7 +1911,7 @@ ${SYSTEM_REMINDER_CLOSE}
async function runBidirectionalMode(
agent: AgentState,
conversationId: string,
_client: Letta,
client: Letta,
_outputFormat: string,
includePartialMessages: boolean,
): Promise<void> {
@@ -1908,6 +1935,130 @@ async function runBidirectionalMode(
// Track current operation for interrupt support
let currentAbortController: AbortController | null = null;
// Resolve pending approvals for this conversation before retrying user input.
//
// Each pass: re-fetch the agent, load resume data, classify every pending
// approval into approve/deny decisions, execute the batch, and send the results
// back as an approval message whose stream is drained. The loop ends when no
// pending approvals remain, or when loading resume data fails with 404/422.
// Any other error from loading resume data is rethrown to the caller.
//
// NOTE(review): the loop has no iteration cap; if the backend keeps reporting
// pending approvals after each pass this would spin indefinitely - confirm
// whether a bound is needed.
const resolveAllPendingApprovals = async () => {
// Loaded lazily via dynamic import rather than at module top level.
const { getResumeData } = await import("./agent/check-approval");
while (true) {
// Re-fetch agent to get latest in-context messages (source of truth for backend)
const freshAgent = await client.agents.retrieve(agent.id);
let resume: Awaited<ReturnType<typeof getResumeData>>;
try {
resume = await getResumeData(client, freshAgent, conversationId);
} catch (error) {
// Treat 404/422 as "no approvals" - stale message/conversation state
if (
error instanceof APIError &&
(error.status === 404 || error.status === 422)
) {
break;
}
throw error;
}
// Nothing left to resolve: exit the loop.
const pendingApprovals = resume.pendingApprovals || [];
if (pendingApprovals.length === 0) break;
// Shape of one decision handed to executeApprovalBatch: approvals carry the
// rule that matched, denials carry only a reason.
type Decision =
| {
type: "approve";
approval: {
toolCallId: string;
toolName: string;
toolArgs: string;
};
reason: string;
matchedRule: string;
}
| {
type: "deny";
approval: {
toolCallId: string;
toolName: string;
toolArgs: string;
};
reason: string;
};
// Split approvals into auto-allowed and auto-denied sets. The options below
// presumably turn "ask" outcomes into denials because nobody can answer a
// prompt here (see denyReasonForAsk) - verify against classifyApprovals.
const { autoAllowed, autoDenied } = await classifyApprovals(
pendingApprovals,
{
treatAskAsDeny: true,
denyReasonForAsk: "Tool requires approval (headless mode)",
requireArgsForAutoApprove: true,
missingNameReason: "Tool call incomplete - missing name",
},
);
const decisions: Decision[] = [
// Approved calls: fall back to generic reason / rule labels when the
// permission result does not provide them.
...autoAllowed.map((ac) => ({
type: "approve" as const,
approval: ac.approval,
reason: ac.permission.reason || "Allowed by permission rule",
matchedRule:
"matchedRule" in ac.permission && ac.permission.matchedRule
? ac.permission.matchedRule
: "auto-approved",
})),
// Denied calls: prefer the classifier's denyReason; otherwise build a
// message from the matched rule, then the permission reason, then a
// generic fallback.
...autoDenied.map((ac) => {
const fallback =
"matchedRule" in ac.permission && ac.permission.matchedRule
? `Permission denied: ${ac.permission.matchedRule}`
: ac.permission.reason
? `Permission denied: ${ac.permission.reason}`
: "Permission denied: Unknown reason";
return {
type: "deny" as const,
approval: ac.approval,
reason: ac.denyReason ?? fallback,
};
}),
];
const { executeApprovalBatch } = await import(
"./agent/approval-execution"
);
// Execute the decisions and wrap the results in a single approval message.
const executedResults = await executeApprovalBatch(decisions);
const approvalInput: ApprovalCreate = {
type: "approval",
approvals: executedResults as ApprovalResult[],
};
const approvalMessages: Array<
| import("@letta-ai/letta-client/resources/agents/agents").MessageCreate
| import("@letta-ai/letta-client/resources/agents/messages").ApprovalCreate
> = [approvalInput];
{
// If any skill content was queued, append it as one extra user message
// made of text parts, sent together with the approval results.
const { consumeQueuedSkillContent } = await import(
"./tools/impl/skillContentRegistry"
);
const skillContents = consumeQueuedSkillContent();
if (skillContents.length > 0) {
approvalMessages.push({
role: "user" as const,
content: skillContents.map((sc) => ({
type: "text" as const,
text: sc.content,
})),
});
}
}
// Send the approval results and drain the resulting stream with fresh
// buffers and a no-op callback before checking for further approvals.
const approvalStream = await sendMessageStream(
conversationId,
approvalMessages,
{ agentId: agent.id },
);
await drainStreamWithResume(
approvalStream,
createBuffers(agent.id),
() => {},
);
}
};
// Create readline interface for stdin
const rl = readline.createInterface({
input: process.stdin,
@@ -2257,10 +2408,54 @@ async function runBidirectionalMode(
}
}
// Send message to agent
const stream = await sendMessageStream(conversationId, currentInput, {
agentId: agent.id,
});
// Send message to agent.
// Wrap in try-catch to handle pre-stream 409 approval-pending errors.
let stream: Awaited<ReturnType<typeof sendMessageStream>>;
try {
stream = await sendMessageStream(conversationId, currentInput, {
agentId: agent.id,
});
} catch (preStreamError) {
let errorDetail = "";
if (
preStreamError instanceof APIError &&
preStreamError.error &&
typeof preStreamError.error === "object"
) {
const errObj = preStreamError.error as Record<string, unknown>;
if (
errObj.error &&
typeof errObj.error === "object" &&
"detail" in errObj.error
) {
const nested = errObj.error as Record<string, unknown>;
errorDetail =
typeof nested.detail === "string" ? nested.detail : "";
}
if (!errorDetail && typeof errObj.detail === "string") {
errorDetail = errObj.detail;
}
}
if (!errorDetail && preStreamError instanceof Error) {
errorDetail = preStreamError.message;
}
if (isApprovalPendingError(errorDetail)) {
const recoveryMsg: RecoveryMessage = {
type: "recovery",
recovery_type: "approval_pending",
message:
"Detected pending approval conflict on send; resolving before retry",
session_id: sessionId,
uuid: `recovery-bidir-${crypto.randomUUID()}`,
};
console.log(JSON.stringify(recoveryMsg));
await resolveAllPendingApprovals();
continue;
}
throw preStreamError;
}
const streamJsonHook: DrainStreamHook = ({
chunk,
shouldOutput,

View File

@@ -0,0 +1,277 @@
import { describe, expect, test } from "bun:test";
import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process";
const TOOL_TRIGGER_PROMPT =
"Use the Bash tool exactly once with command: echo test123. Do not ask clarifying questions.";
const FOLLOWUP_PROMPT = "Say OK only. Do not call tools.";
/**
 * Loosely-typed shape of one JSON object parsed from a child process's stdout.
 * Only the fields this test file inspects are named; every field is optional
 * and any other keys are permitted through the index signature.
 */
interface StreamMessage {
type?: string;
subtype?: string;
message_type?: string;
recovery_type?: string;
conversation_id?: string;
request?: { subtype?: string };
[key: string]: unknown;
}
/**
 * Handle returned by startPendingApprovalSession once a tool-permission
 * request has been observed.
 */
interface PendingApprovalSession {
// Conversation ID taken from the child's "system"/"init" message.
conversationId: string;
// Ends the child's stdin and kills the child process.
stop: () => void;
// Messages parsed from the child's stdout so far.
messages: StreamMessage[];
}
/**
 * Parse newline-delimited JSON text into stream messages.
 *
 * Lines are split on LF or CRLF and trimmed. Blank lines and lines that are
 * not valid JSON are skipped. JSON values that are not plain objects (`null`,
 * numbers, strings, booleans, arrays) are skipped as well, so callers can read
 * properties such as `type` on every returned element without a null check.
 *
 * @param text - Raw output containing zero or more JSON lines.
 * @returns The parsed object messages, in input order.
 */
function parseJsonLines(text: string): StreamMessage[] {
  const messages: StreamMessage[] = [];
  for (const rawLine of text.split(/\r?\n/)) {
    const line = rawLine.trim();
    if (!line) continue;

    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch {
      // Not JSON (e.g. plain log output): ignore the line.
      continue;
    }

    // JSON.parse also accepts primitives, null and arrays; none of those are
    // messages, and a null entry would crash property access in callers.
    if (
      typeof parsed === "object" &&
      parsed !== null &&
      !Array.isArray(parsed)
    ) {
      messages.push(parsed as StreamMessage);
    }
  }
  return messages;
}
/**
 * Spawn a bidirectional (`stream-json` in / `stream-json` out) CLI session and
 * drive it until it asks for tool permission, leaving that request unanswered.
 *
 * Flow:
 * 1. On the "system"/"init" message, record `conversation_id` and send the
 *    tool-trigger prompt.
 * 2. On a "result" message (the turn ended without a permission request),
 *    re-send the prompt, up to three prompts in total.
 * 3. On a "control_request" whose request subtype is "can_use_tool", resolve
 *    with the session handle.
 *
 * @param timeoutMs - Maximum time to wait for the permission request.
 * @returns A handle with the conversation ID, a `stop` function that ends
 *   stdin and kills the child, and the messages parsed so far.
 * @throws Rejects on timeout, if the child exits or fails to spawn before the
 *   permission request, or if the request arrives before the conversation ID
 *   is known.
 */
async function startPendingApprovalSession(
  timeoutMs = 180000,
): Promise<PendingApprovalSession> {
  // Total number of times the tool-trigger prompt may be sent.
  const maxPromptAttempts = 3;

  return new Promise((resolve, reject) => {
    const proc: ChildProcessWithoutNullStreams = spawn(
      "bun",
      [
        "run",
        "dev",
        "--input-format",
        "stream-json",
        "--output-format",
        "stream-json",
        "--new-agent",
        "--new",
        "-m",
        "haiku",
      ],
      {
        cwd: process.cwd(),
        env: { ...process.env, LETTA_CODE_AGENT_ROLE: "subagent" },
      },
    );

    let stdoutBuffer = "";
    let stderrBuffer = "";
    const messages: StreamMessage[] = [];
    let settled = false;
    let stopped = false;
    let conversationId: string | undefined;
    let promptAttempts = 0;

    const sendPrompt = () => {
      // Never write once stdin has been ended by stop(): output that was
      // still buffered when the child was stopped could otherwise trigger a
      // write-after-end on the closed stream.
      if (stopped || promptAttempts >= maxPromptAttempts) return;
      promptAttempts += 1;
      proc.stdin.write(
        `${JSON.stringify({
          type: "user",
          message: { role: "user", content: TOOL_TRIGGER_PROMPT },
        })}\n`,
      );
    };

    const stop = () => {
      stopped = true;
      proc.stdin.end();
      proc.kill();
    };

    const timeout = setTimeout(() => {
      if (settled) return;
      settled = true;
      stop();
      reject(
        new Error(
          `Timed out waiting for pending approval after ${timeoutMs}ms\nSTDERR:\n${stderrBuffer}`,
        ),
      );
    }, timeoutMs);

    const complete = () => {
      // Only the first outcome counts; later permission requests (or ones
      // arriving after a timeout) must not stop or settle again.
      if (settled) return;
      settled = true;
      clearTimeout(timeout);
      if (!conversationId) {
        stop();
        reject(
          new Error(
            "Pending approval detected before conversation ID was known",
          ),
        );
        return;
      }
      resolve({ conversationId, stop, messages });
    };

    const onMessage = (msg: StreamMessage) => {
      messages.push(msg);

      if (
        msg.type === "system" &&
        msg.subtype === "init" &&
        typeof msg.conversation_id === "string"
      ) {
        conversationId = msg.conversation_id;
        sendPrompt();
        return;
      }

      // If model responded without tool call, retry prompt up to max attempts.
      if (msg.type === "result" && promptAttempts < maxPromptAttempts) {
        sendPrompt();
        return;
      }

      // Pending approval is active when bidirectional mode asks for permission.
      if (
        msg.type === "control_request" &&
        msg.request?.subtype === "can_use_tool"
      ) {
        complete();
      }
    };

    proc.stdout.on("data", (data) => {
      stdoutBuffer += data.toString();
      const lines = stdoutBuffer.split(/\r?\n/);
      // Keep the trailing partial line for the next chunk.
      stdoutBuffer = lines.pop() || "";
      for (const line of lines) {
        try {
          const parsed: unknown = JSON.parse(line);
          // JSON.parse also accepts null, primitives and arrays; only plain
          // objects are messages, and nothing else may reach `messages`.
          if (
            typeof parsed === "object" &&
            parsed !== null &&
            !Array.isArray(parsed)
          ) {
            onMessage(parsed as StreamMessage);
          }
        } catch {
          // Ignore non-JSON output lines
        }
      }
    });

    proc.stderr.on("data", (data) => {
      stderrBuffer += data.toString();
    });

    proc.on("close", (code) => {
      if (settled) return;
      settled = true;
      clearTimeout(timeout);
      reject(
        new Error(
          `Pending-approval process exited early (code=${code ?? "null"})\nSTDERR:\n${stderrBuffer}`,
        ),
      );
    });

    proc.on("error", (error) => {
      if (settled) return;
      settled = true;
      clearTimeout(timeout);
      reject(error);
    });
  });
}
/**
 * Run the CLI once in prompt mode (`-p`) against an existing conversation,
 * with `stream-json` output, and collect what it printed.
 *
 * @param conversationId - Conversation passed via `--conversation`.
 * @param timeoutMs - Maximum time to wait for the child to close.
 * @returns The child's exit code, the JSON messages parsed from its stdout,
 *   and its raw stderr.
 * @throws Rejects when the child does not close within `timeoutMs` (the child
 *   is killed first) or when the process emits an "error" event.
 */
async function runOneShotAgainstConversation(
  conversationId: string,
  timeoutMs = 180000,
): Promise<{ code: number | null; messages: StreamMessage[]; stderr: string }> {
  return new Promise((resolve, reject) => {
    const cliArgs = [
      "run",
      "dev",
      "-p",
      FOLLOWUP_PROMPT,
      "--conversation",
      conversationId,
      "--output-format",
      "stream-json",
    ];
    const child = spawn("bun", cliArgs, {
      cwd: process.cwd(),
      env: { ...process.env, LETTA_CODE_AGENT_ROLE: "subagent" },
    });

    const stdoutChunks: string[] = [];
    const stderrChunks: string[] = [];
    let finished = false;

    // Runs `outcome` for the first caller only and cancels the watchdog.
    const settleOnce = (outcome: () => void) => {
      if (finished) return;
      finished = true;
      clearTimeout(watchdog);
      outcome();
    };

    const watchdog = setTimeout(() => {
      settleOnce(() => {
        child.kill();
        reject(
          new Error(`Timed out waiting for one-shot run after ${timeoutMs}ms`),
        );
      });
    }, timeoutMs);

    child.stdout.on("data", (chunk) => {
      stdoutChunks.push(chunk.toString());
    });

    child.stderr.on("data", (chunk) => {
      stderrChunks.push(chunk.toString());
    });

    child.on("close", (code) => {
      settleOnce(() => {
        resolve({
          code,
          messages: parseJsonLines(stdoutChunks.join("")),
          stderr: stderrChunks.join(""),
        });
      });
    });

    child.on("error", (error) => {
      settleOnce(() => {
        reject(error);
      });
    });
  });
}
describe("pre-stream approval recovery", () => {
  // Opt-in: the test only runs when the env var is exactly "1"; otherwise it
  // is registered as skipped.
  const optedIn =
    process.env.LETTA_RUN_PRESTREAM_APPROVAL_RECOVERY_TEST === "1";
  const maybeTest = optedIn ? test : test.skip;

  maybeTest(
    "recovers from pre-stream approval conflict and retries successfully",
    async () => {
      // Leave a conversation blocked on an unanswered tool-permission request.
      const pending = await startPendingApprovalSession();

      try {
        // A one-shot send against that conversation must recover and succeed.
        const { code, messages, stderr } = await runOneShotAgainstConversation(
          pending.conversationId,
        );

        if (code !== 0) {
          throw new Error(
            `Expected one-shot run to succeed, got exit code ${code}\nSTDERR:\n${stderr}`,
          );
        }

        const isApprovalRecovery = (m: StreamMessage) =>
          m.type === "recovery" && m.recovery_type === "approval_pending";
        expect(messages.find(isApprovalRecovery)).toBeDefined();

        const resultEvent = messages.find((m) => m.type === "result");
        expect(resultEvent).toBeDefined();
        expect(resultEvent?.subtype).toBe("success");
      } finally {
        // Always tear down the background session, even on failure.
        pending.stop();
      }
    },
    240000,
  );
});

View File

@@ -1,6 +1,8 @@
import { describe, expect, test } from "bun:test";
import type { Message } from "@letta-ai/letta-client/resources/agents/messages";
import {
classifyPreStreamConflict,
getPreStreamErrorAction,
isApprovalPendingError,
isConversationBusyError,
isInvalidToolCallIdsError,
@@ -63,15 +65,23 @@ describe("isApprovalPendingError", () => {
});
test("detects approval pending error case-insensitively", () => {
expect(isApprovalPendingError("CANNOT SEND A NEW MESSAGE")).toBe(true);
expect(isApprovalPendingError("cannot send a new message")).toBe(true);
expect(isApprovalPendingError("WAITING FOR APPROVAL")).toBe(true);
expect(isApprovalPendingError("waiting for approval")).toBe(true);
});
test("detects partial match in longer message", () => {
const detail = "Error occurred: Cannot send a new message while processing";
const detail =
"Error occurred: agent is waiting for approval while processing";
expect(isApprovalPendingError(detail)).toBe(true);
});
test("does not misclassify conversation busy conflict as approval pending", () => {
const busyDetail =
"CONFLICT: Cannot send a new message: Another request is currently being processed for this conversation.";
expect(isConversationBusyError(busyDetail)).toBe(true);
expect(isApprovalPendingError(busyDetail)).toBe(false);
});
test("returns false for desync errors (opposite case)", () => {
// These are the OPPOSITE error - when we send approval but there's nothing pending
expect(
@@ -138,6 +148,55 @@ describe("isConversationBusyError", () => {
});
});
describe("classifyPreStreamConflict", () => {
  test("classifies approval-pending conflict distinctly from busy conflict", () => {
    // Both details share the "Cannot send a new message" prefix; only the
    // trailing part tells the two conflicts apart.
    const cases = [
      [
        "CONFLICT: Cannot send a new message: The agent is waiting for approval on a tool call.",
        "approval_pending",
      ],
      [
        "CONFLICT: Cannot send a new message: Another request is currently being processed for this conversation.",
        "conversation_busy",
      ],
    ] as const;

    for (const [detail, expectedKind] of cases) {
      expect(classifyPreStreamConflict(detail)).toBe(expectedKind);
    }
  });

  test("returns null for non-conflict errors", () => {
    const unrelatedDetail = "Rate limit exceeded";
    expect(classifyPreStreamConflict(unrelatedDetail)).toBeNull();
  });
});
describe("getPreStreamErrorAction", () => {
  // Conflict details shared by the cases below.
  const approvalPendingDetail =
    "CONFLICT: Cannot send a new message: The agent is waiting for approval on a tool call.";
  const conversationBusyDetail =
    "CONFLICT: Cannot send a new message: Another request is currently being processed for this conversation.";

  test("returns resolve_approval_pending for approval conflict details", () => {
    const action = getPreStreamErrorAction(approvalPendingDetail, 0, 1);
    expect(action).toBe("resolve_approval_pending");
  });

  test("returns retry_conversation_busy when busy and retries remain", () => {
    // 0 retries used out of a maximum of 1.
    const action = getPreStreamErrorAction(conversationBusyDetail, 0, 1);
    expect(action).toBe("retry_conversation_busy");
  });

  test("returns rethrow when conversation busy retries are exhausted", () => {
    // 1 retry used out of a maximum of 1.
    const action = getPreStreamErrorAction(conversationBusyDetail, 1, 1);
    expect(action).toBe("rethrow");
  });

  test("returns rethrow for unrelated errors", () => {
    const action = getPreStreamErrorAction("Rate limit exceeded", 0, 1);
    expect(action).toBe("rethrow");
  });
});
/**
* Tests for parallel tool call approval extraction.
* Ensures lazy recovery handles multiple simultaneous tool calls correctly.