fix: support parallel tool calling in headless mode (#81)

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
Charles Packer
2025-11-07 18:05:27 -08:00
committed by GitHub
parent 2845781811
commit ac0b929119

View File

@@ -185,26 +185,54 @@ export async function handleHeadlessCommand(
// Re-fetch agent to get latest in-context messages (source of truth for backend)
const freshAgent = await client.agents.retrieve(agent.id);
const resume = await getResumeData(client, freshAgent);
if (!resume.pendingApproval) break;
const { toolCallId, toolName, toolArgs } = resume.pendingApproval;
const parsedArgs = safeJsonParseOr<Record<string, unknown>>(
toolArgs || "{}",
{},
);
const permission = await checkToolPermission(toolName, parsedArgs);
let approvalInput: ApprovalCreate;
if (permission.decision === "deny" || permission.decision === "ask") {
const denyReason =
permission.decision === "ask"
? "Tool requires approval (headless mode)"
: `Permission denied: ${permission.matchedRule || permission.reason}`;
approvalInput = {
type: "approval",
approval_request_id: toolCallId,
approve: false,
reason: denyReason,
};
} else {
// Use plural field for parallel tool calls
const pendingApprovals = resume.pendingApprovals || [];
if (pendingApprovals.length === 0) break;
// Phase 1: Collect decisions for all approvals
type Decision =
| {
type: "approve";
approval: {
toolCallId: string;
toolName: string;
toolArgs: string;
};
}
| {
type: "deny";
approval: {
toolCallId: string;
toolName: string;
toolArgs: string;
};
reason: string;
};
const decisions: Decision[] = [];
for (const currentApproval of pendingApprovals) {
const { toolCallId, toolName, toolArgs } = currentApproval;
const parsedArgs = safeJsonParseOr<Record<string, unknown>>(
toolArgs || "{}",
{},
);
const permission = await checkToolPermission(toolName, parsedArgs);
if (permission.decision === "deny" || permission.decision === "ask") {
const denyReason =
permission.decision === "ask"
? "Tool requires approval (headless mode)"
: `Permission denied: ${permission.matchedRule || permission.reason}`;
decisions.push({
type: "deny",
approval: currentApproval,
reason: denyReason,
});
continue;
}
// Verify required args present; if missing, deny so the model retries with args
const { getToolSchema } = await import("./tools/manager");
const schema = getToolSchema(toolName);
@@ -215,41 +243,89 @@ export async function handleHeadlessCommand(
!(key in parsedArgs) || String(parsedArgs[key] ?? "").length === 0,
);
if (missing.length > 0) {
approvalInput = {
type: "approval",
approval_request_id: toolCallId,
approve: false,
decisions.push({
type: "deny",
approval: currentApproval,
reason: `Missing required parameter${missing.length > 1 ? "s" : ""}: ${missing.join(", ")}`,
};
} else {
const toolResult = await executeTool(toolName, parsedArgs);
// Emit auto_approval event for stream-json for visibility
if (outputFormat === "stream-json") {
console.log(
JSON.stringify({
type: "auto_approval",
tool_name: toolName,
tool_call_id: toolCallId,
reason: permission.reason,
matched_rule: permission.matchedRule,
}),
});
continue;
}
// Approve for execution
decisions.push({
type: "approve",
approval: currentApproval,
});
}
// Phase 2: Execute approved tools and format results
const executedResults: Array<{
type: "tool" | "approval";
tool_call_id: string;
tool_return?: string;
status?: "success" | "error";
stdout?: string[];
stderr?: string[];
approve?: boolean;
reason?: string;
}> = [];
for (const decision of decisions) {
if (decision.type === "approve") {
try {
const parsedArgs = safeJsonParseOr<Record<string, unknown>>(
decision.approval.toolArgs || "{}",
{},
);
const toolResult = await executeTool(
decision.approval.toolName,
parsedArgs,
);
// Emit auto_approval event for stream-json for visibility
if (outputFormat === "stream-json") {
console.log(
JSON.stringify({
type: "auto_approval",
tool_name: decision.approval.toolName,
tool_call_id: decision.approval.toolCallId,
}),
);
}
executedResults.push({
type: "tool",
tool_call_id: decision.approval.toolCallId,
tool_return: toolResult.toolReturn,
status: toolResult.status,
stdout: toolResult.stdout,
stderr: toolResult.stderr,
});
} catch (e) {
const errorMessage = `Error executing tool: ${String(e)}`;
executedResults.push({
type: "tool",
tool_call_id: decision.approval.toolCallId,
tool_return: errorMessage,
status: "error",
});
}
approvalInput = {
} else {
executedResults.push({
type: "approval",
approvals: [
{
type: "tool",
tool_call_id: toolCallId,
tool_return: toolResult.toolReturn,
status: toolResult.status,
stdout: toolResult.stdout,
stderr: toolResult.stderr,
},
],
};
tool_call_id: decision.approval.toolCallId,
approve: false,
reason: decision.reason,
});
}
}
// Send all results in one batch
const approvalInput: ApprovalCreate = {
type: "approval",
approvals: executedResults as any,
};
// Send the approval to clear the pending state; drain the stream without output
const approvalStream = await sendMessageStream(agent.id, [approvalInput]);
if (outputFormat === "stream-json") {
@@ -288,11 +364,11 @@ export async function handleHeadlessCommand(
// For stream-json, output each chunk as it arrives
let stopReason: StopReasonType;
let approval: {
let approvals: Array<{
toolCallId: string;
toolName: string;
toolArgs: string;
} | null = null;
}> = [];
let apiDurationMs: number;
let lastRunId: string | null = null;
@@ -409,12 +485,21 @@ export async function handleHeadlessCommand(
args: incomingArgs,
});
// Keep an up-to-date approval object for downstream handling
approval = {
// Keep an up-to-date approvals array for downstream handling
// Update existing approval if present, otherwise add new one
const existingIndex = approvals.findIndex(
(a) => a.toolCallId === id,
);
const approvalObj = {
toolCallId: id,
toolName: toolCall.name,
toolArgs: incomingArgs,
};
if (existingIndex >= 0) {
approvals[existingIndex] = approvalObj;
} else {
approvals.push(approvalObj);
}
// Check if this approval will be auto-approved. Dedup per tool_call_id
if (!autoApprovalEmitted.has(id)) {
@@ -498,7 +583,7 @@ export async function handleHeadlessCommand(
() => {}, // No UI refresh needed in headless mode
);
stopReason = result.stopReason;
approval = result.approval || null;
approvals = result.approvals || [];
apiDurationMs = result.apiDurationMs;
lastRunId = result.lastRunId || null;
}
@@ -511,87 +596,152 @@ export async function handleHeadlessCommand(
break;
}
// Case 2: Requires approval
// Case 2: Requires approval - batch process all approvals
if (stopReason === "requires_approval") {
if (!approval) {
console.error("Unexpected null approval");
if (approvals.length === 0) {
console.error("Unexpected empty approvals array");
process.exit(1);
}
const { toolCallId, toolName, toolArgs } = approval;
// Phase 1: Collect decisions for all approvals
type Decision =
| {
type: "approve";
approval: {
toolCallId: string;
toolName: string;
toolArgs: string;
};
}
| {
type: "deny";
approval: {
toolCallId: string;
toolName: string;
toolArgs: string;
};
reason: string;
};
// Check permission using existing permission system
const parsedArgs = safeJsonParseOr<Record<string, unknown>>(
toolArgs,
{},
);
const permission = await checkToolPermission(toolName, parsedArgs);
const decisions: Decision[] = [];
// Handle deny decision
if (permission.decision === "deny") {
const denyReason = `Permission denied: ${permission.matchedRule || permission.reason}`;
currentInput = [
{
type: "approval",
approval_request_id: toolCallId,
approve: false,
for (const currentApproval of approvals) {
const { toolCallId, toolName, toolArgs } = currentApproval;
// Check permission using existing permission system
const parsedArgs = safeJsonParseOr<Record<string, unknown>>(
toolArgs,
{},
);
const permission = await checkToolPermission(toolName, parsedArgs);
// Handle deny decision
if (permission.decision === "deny") {
const denyReason = `Permission denied: ${permission.matchedRule || permission.reason}`;
decisions.push({
type: "deny",
approval: currentApproval,
reason: denyReason,
},
];
continue;
}
});
continue;
}
// Handle ask decision - in headless mode, auto-deny
if (permission.decision === "ask") {
currentInput = [
{
type: "approval",
approval_request_id: toolCallId,
approve: false,
// Handle ask decision - in headless mode, auto-deny
if (permission.decision === "ask") {
decisions.push({
type: "deny",
approval: currentApproval,
reason: "Tool requires approval (headless mode)",
},
];
continue;
}
});
continue;
}
// Permission is "allow" - verify we have required arguments before executing
const { getToolSchema } = await import("./tools/manager");
const schema = getToolSchema(toolName);
const required =
(schema?.input_schema?.required as string[] | undefined) || [];
const missing = required.filter(
(key) =>
!(key in parsedArgs) || String(parsedArgs[key] ?? "").length === 0,
);
if (missing.length > 0) {
// Auto-deny with a clear reason so the model can retry with arguments
currentInput = [
{
type: "approval",
approval_request_id: toolCallId,
approve: false,
// Permission is "allow" - verify we have required arguments before executing
const { getToolSchema } = await import("./tools/manager");
const schema = getToolSchema(toolName);
const required =
(schema?.input_schema?.required as string[] | undefined) || [];
const missing = required.filter(
(key) =>
!(key in parsedArgs) ||
String(parsedArgs[key] ?? "").length === 0,
);
if (missing.length > 0) {
// Auto-deny with a clear reason so the model can retry with arguments
decisions.push({
type: "deny",
approval: currentApproval,
reason: `Missing required parameter${missing.length > 1 ? "s" : ""}: ${missing.join(", ")}`,
},
];
continue;
});
continue;
}
// Approve this tool for execution
decisions.push({
type: "approve",
approval: currentApproval,
});
}
// Execute tool and continue loop
const toolResult = await executeTool(toolName, parsedArgs);
// Phase 2: Execute all approved tools and format results
const executedResults: Array<{
type: "tool" | "approval";
tool_call_id: string;
tool_return?: string;
status?: "success" | "error";
stdout?: string[];
stderr?: string[];
approve?: boolean;
reason?: string;
}> = [];
currentInput = [
{
type: "approval",
approvals: [
{
for (const decision of decisions) {
if (decision.type === "approve") {
// Execute the approved tool
try {
const parsedArgs = safeJsonParseOr<Record<string, unknown>>(
decision.approval.toolArgs,
{},
);
const toolResult = await executeTool(
decision.approval.toolName,
parsedArgs,
);
executedResults.push({
type: "tool",
tool_call_id: toolCallId,
tool_call_id: decision.approval.toolCallId,
tool_return: toolResult.toolReturn,
status: toolResult.status,
stdout: toolResult.stdout,
stderr: toolResult.stderr,
},
],
});
} catch (e) {
// Still need to send error result to backend for this tool
const errorMessage = `Error executing tool: ${String(e)}`;
executedResults.push({
type: "tool",
tool_call_id: decision.approval.toolCallId,
tool_return: errorMessage,
status: "error",
});
}
} else {
// Format denial for backend
executedResults.push({
type: "approval",
tool_call_id: decision.approval.toolCallId,
approve: false,
reason: decision.reason,
});
}
}
// Send all results in one batch
currentInput = [
{
type: "approval",
approvals: executedResults as any,
},
];
continue;