fix: Parallel tool execution race condition (#377)
This commit is contained in:
@@ -9,6 +9,64 @@ import type { ApprovalRequest } from "../cli/helpers/stream";
|
||||
import { INTERRUPTED_BY_USER } from "../constants";
|
||||
import { executeTool, type ToolExecutionResult } from "../tools/manager";
|
||||
|
||||
/**
|
||||
* Tools that are safe to execute in parallel (read-only or independent).
|
||||
* These tools don't modify files or shared state, so they can't race with each other.
|
||||
* Note: Bash/shell tools are intentionally excluded - they can run arbitrary commands that may write files.
|
||||
*
|
||||
* Includes equivalent tools across all toolsets (Anthropic, Codex/OpenAI, Gemini).
|
||||
*/
|
||||
const PARALLEL_SAFE_TOOLS = new Set([
|
||||
// === Anthropic toolset (default) ===
|
||||
"Read",
|
||||
"Grep",
|
||||
"Glob",
|
||||
|
||||
// === Codex/OpenAI toolset ===
|
||||
// snake_case variants
|
||||
"read_file",
|
||||
"list_dir",
|
||||
"grep_files",
|
||||
// PascalCase variants
|
||||
"ReadFile",
|
||||
"ListDir",
|
||||
"GrepFiles",
|
||||
|
||||
// === Gemini toolset ===
|
||||
// snake_case variants
|
||||
"read_file_gemini",
|
||||
"list_directory",
|
||||
"glob_gemini",
|
||||
"search_file_content",
|
||||
"read_many_files",
|
||||
// PascalCase variants
|
||||
"ReadFileGemini",
|
||||
"ListDirectory",
|
||||
"GlobGemini",
|
||||
"SearchFileContent",
|
||||
"ReadManyFiles",
|
||||
|
||||
// === Cross-toolset tools ===
|
||||
// Search/fetch tools (external APIs or read-only queries)
|
||||
"conversation_search",
|
||||
"web_search",
|
||||
"fetch_webpage",
|
||||
// Background shell output (read-only check)
|
||||
"BashOutput",
|
||||
// Task spawns independent subagents
|
||||
"Task",
|
||||
]);
|
||||
|
||||
function isParallelSafe(toolName: string): boolean {
|
||||
return PARALLEL_SAFE_TOOLS.has(toolName);
|
||||
}
|
||||
|
||||
/** Result format expected by App.tsx for auto-allowed tools */
|
||||
export type AutoAllowedResult = {
|
||||
toolCallId: string;
|
||||
result: ToolExecutionResult;
|
||||
};
|
||||
|
||||
export type ApprovalDecision =
|
||||
| {
|
||||
type: "approve";
|
||||
@@ -156,7 +214,8 @@ async function executeSingleDecision(
|
||||
* - Executing approved tools (with error handling)
|
||||
* - Formatting denials
|
||||
* - Combining all results into a single batch
|
||||
* - Task tools are executed in parallel for better performance
|
||||
* - Parallel-safe tools (read-only + Task) are executed in parallel for performance
|
||||
* - Write/stateful tools (Edit, Write, Bash, etc.) are executed sequentially to avoid race conditions
|
||||
*
|
||||
* Used by both interactive (App.tsx) and headless (headless.ts) modes.
|
||||
*
|
||||
@@ -174,41 +233,41 @@ export async function executeApprovalBatch(
|
||||
null,
|
||||
);
|
||||
|
||||
// Identify Task tools for parallel execution
|
||||
const taskIndices: number[] = [];
|
||||
// Identify parallel-safe tools (read-only + Task)
|
||||
const parallelIndices: number[] = [];
|
||||
for (let i = 0; i < decisions.length; i++) {
|
||||
const decision = decisions[i];
|
||||
if (
|
||||
decision &&
|
||||
decision.type === "approve" &&
|
||||
decision.approval.toolName === "Task"
|
||||
isParallelSafe(decision.approval.toolName)
|
||||
) {
|
||||
taskIndices.push(i);
|
||||
parallelIndices.push(i);
|
||||
}
|
||||
}
|
||||
|
||||
// Execute non-Task tools sequentially (existing behavior)
|
||||
// Execute write/stateful tools sequentially to avoid race conditions
|
||||
for (let i = 0; i < decisions.length; i++) {
|
||||
const decision = decisions[i];
|
||||
if (!decision || taskIndices.includes(i)) continue; // Skip Task tools for now
|
||||
if (!decision || parallelIndices.includes(i)) continue;
|
||||
results[i] = await executeSingleDecision(decision, onChunk, options);
|
||||
}
|
||||
|
||||
// Execute Task tools in parallel
|
||||
if (taskIndices.length > 0) {
|
||||
const taskDecisions = taskIndices
|
||||
// Execute parallel-safe tools (read-only + Task) in parallel
|
||||
if (parallelIndices.length > 0) {
|
||||
const parallelDecisions = parallelIndices
|
||||
.map((i) => decisions[i])
|
||||
.filter((d): d is ApprovalDecision => d !== undefined);
|
||||
const taskResults = await Promise.all(
|
||||
taskDecisions.map((decision) =>
|
||||
const parallelResults = await Promise.all(
|
||||
parallelDecisions.map((decision) =>
|
||||
executeSingleDecision(decision, onChunk, options),
|
||||
),
|
||||
);
|
||||
|
||||
// Place Task results in original positions
|
||||
for (let j = 0; j < taskIndices.length; j++) {
|
||||
const idx = taskIndices[j];
|
||||
const result = taskResults[j];
|
||||
// Place parallel results in original positions
|
||||
for (let j = 0; j < parallelIndices.length; j++) {
|
||||
const idx = parallelIndices[j];
|
||||
const result = parallelResults[j];
|
||||
if (idx !== undefined && result !== undefined) {
|
||||
results[idx] = result;
|
||||
}
|
||||
@@ -218,3 +277,38 @@ export async function executeApprovalBatch(
|
||||
// Filter out nulls (shouldn't happen, but TypeScript needs this)
|
||||
return results.filter((r): r is ApprovalResult => r !== null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to execute auto-allowed tools and map results to the format expected by App.tsx.
|
||||
* Consolidates the common pattern of converting approvals to decisions, executing them,
|
||||
* and mapping the results back.
|
||||
*
|
||||
* @param autoAllowed - Array of auto-allowed approval contexts (must have .approval property)
|
||||
* @param onChunk - Callback to update UI with tool results
|
||||
* @param options - Optional abort signal for cancellation
|
||||
* @returns Array of results with toolCallId and ToolExecutionResult
|
||||
*/
|
||||
export async function executeAutoAllowedTools(
|
||||
autoAllowed: Array<{ approval: ApprovalRequest }>,
|
||||
onChunk: (chunk: ToolReturnMessage) => void,
|
||||
options?: { abortSignal?: AbortSignal },
|
||||
): Promise<AutoAllowedResult[]> {
|
||||
const decisions: ApprovalDecision[] = autoAllowed.map((ac) => ({
|
||||
type: "approve" as const,
|
||||
approval: ac.approval,
|
||||
}));
|
||||
|
||||
const batchResults = await executeApprovalBatch(decisions, onChunk, options);
|
||||
|
||||
return batchResults
|
||||
.filter((r): r is ApprovalResult & { type: "tool" } => r.type === "tool")
|
||||
.map((r) => ({
|
||||
toolCallId: r.tool_call_id,
|
||||
result: {
|
||||
toolReturn: r.tool_return,
|
||||
status: r.status,
|
||||
stdout: r.stdout,
|
||||
stderr: r.stderr,
|
||||
} as ToolExecutionResult,
|
||||
}));
|
||||
}
|
||||
|
||||
107
src/cli/App.tsx
107
src/cli/App.tsx
@@ -13,7 +13,10 @@ import type {
|
||||
import type { LlmConfig } from "@letta-ai/letta-client/resources/models/models";
|
||||
import { Box, Static, Text } from "ink";
|
||||
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
|
||||
import type { ApprovalResult } from "../agent/approval-execution";
|
||||
import {
|
||||
type ApprovalResult,
|
||||
executeAutoAllowedTools,
|
||||
} from "../agent/approval-execution";
|
||||
import { prefetchAvailableModelHandles } from "../agent/available-models";
|
||||
import { getResumeData } from "../agent/check-approval";
|
||||
import { getClient } from "../agent/client";
|
||||
@@ -1191,36 +1194,10 @@ export default function App({
|
||||
}
|
||||
}
|
||||
|
||||
// Execute auto-allowed tools
|
||||
const autoAllowedResults = await Promise.all(
|
||||
autoAllowed.map(async (ac) => {
|
||||
const parsedArgs = safeJsonParseOr<Record<string, unknown>>(
|
||||
ac.approval.toolArgs,
|
||||
{},
|
||||
);
|
||||
const result = await executeTool(
|
||||
ac.approval.toolName,
|
||||
parsedArgs,
|
||||
{ toolCallId: ac.approval.toolCallId },
|
||||
);
|
||||
|
||||
// Update buffers with tool return for UI
|
||||
onChunk(buffersRef.current, {
|
||||
message_type: "tool_return_message",
|
||||
id: "dummy",
|
||||
date: new Date().toISOString(),
|
||||
tool_call_id: ac.approval.toolCallId,
|
||||
tool_return: result.toolReturn,
|
||||
status: result.status,
|
||||
stdout: result.stdout,
|
||||
stderr: result.stderr,
|
||||
});
|
||||
|
||||
return {
|
||||
toolCallId: ac.approval.toolCallId,
|
||||
result,
|
||||
};
|
||||
}),
|
||||
// Execute auto-allowed tools (sequential for writes, parallel for reads)
|
||||
const autoAllowedResults = await executeAutoAllowedTools(
|
||||
autoAllowed,
|
||||
(chunk) => onChunk(buffersRef.current, chunk),
|
||||
);
|
||||
|
||||
// Create denial results for auto-denied tools and update buffers
|
||||
@@ -3516,36 +3493,10 @@ DO NOT respond to these messages or otherwise consider them in your response unl
|
||||
|
||||
// If all approvals can be auto-handled (yolo mode), process them immediately
|
||||
if (needsUserInput.length === 0) {
|
||||
// Execute auto-allowed tools
|
||||
const autoAllowedResults = await Promise.all(
|
||||
autoAllowed.map(async (ac) => {
|
||||
const parsedArgs = safeJsonParseOr<Record<string, unknown>>(
|
||||
ac.approval.toolArgs,
|
||||
{},
|
||||
);
|
||||
const result = await executeTool(
|
||||
ac.approval.toolName,
|
||||
parsedArgs,
|
||||
{ toolCallId: ac.approval.toolCallId },
|
||||
);
|
||||
|
||||
// Update buffers with tool return for UI
|
||||
onChunk(buffersRef.current, {
|
||||
message_type: "tool_return_message",
|
||||
id: "dummy",
|
||||
date: new Date().toISOString(),
|
||||
tool_call_id: ac.approval.toolCallId,
|
||||
tool_return: result.toolReturn,
|
||||
status: result.status,
|
||||
stdout: result.stdout,
|
||||
stderr: result.stderr,
|
||||
});
|
||||
|
||||
return {
|
||||
toolCallId: ac.approval.toolCallId,
|
||||
result,
|
||||
};
|
||||
}),
|
||||
// Execute auto-allowed tools (sequential for writes, parallel for reads)
|
||||
const autoAllowedResults = await executeAutoAllowedTools(
|
||||
autoAllowed,
|
||||
(chunk) => onChunk(buffersRef.current, chunk),
|
||||
);
|
||||
|
||||
// Create denial results for auto-denied and update UI
|
||||
@@ -3623,36 +3574,10 @@ DO NOT respond to these messages or otherwise consider them in your response unl
|
||||
.filter(Boolean) as ApprovalContext[],
|
||||
);
|
||||
|
||||
// Execute auto-allowed tools and store results
|
||||
const autoAllowedWithResults = await Promise.all(
|
||||
autoAllowed.map(async (ac) => {
|
||||
const parsedArgs = safeJsonParseOr<Record<string, unknown>>(
|
||||
ac.approval.toolArgs,
|
||||
{},
|
||||
);
|
||||
const result = await executeTool(
|
||||
ac.approval.toolName,
|
||||
parsedArgs,
|
||||
{ toolCallId: ac.approval.toolCallId },
|
||||
);
|
||||
|
||||
// Update buffers with tool return for UI
|
||||
onChunk(buffersRef.current, {
|
||||
message_type: "tool_return_message",
|
||||
id: "dummy",
|
||||
date: new Date().toISOString(),
|
||||
tool_call_id: ac.approval.toolCallId,
|
||||
tool_return: result.toolReturn,
|
||||
status: result.status,
|
||||
stdout: result.stdout,
|
||||
stderr: result.stderr,
|
||||
});
|
||||
|
||||
return {
|
||||
toolCallId: ac.approval.toolCallId,
|
||||
result,
|
||||
};
|
||||
}),
|
||||
// Execute auto-allowed tools (sequential for writes, parallel for reads)
|
||||
const autoAllowedWithResults = await executeAutoAllowedTools(
|
||||
autoAllowed,
|
||||
(chunk) => onChunk(buffersRef.current, chunk),
|
||||
);
|
||||
|
||||
// Create denial reasons for auto-denied and update UI
|
||||
|
||||
Reference in New Issue
Block a user