From 648cadf8cd5a04ca054c99a4c601ea25d8b7e278 Mon Sep 17 00:00:00 2001 From: Devansh Jain <31609257+devanshrj@users.noreply.github.com> Date: Tue, 23 Dec 2025 18:31:19 -0800 Subject: [PATCH] feat: Per-resource queues for parallel tool execution (#379) --- src/agent/approval-execution.ts | 156 +++++++++++++++++++++++++------- 1 file changed, 123 insertions(+), 33 deletions(-) diff --git a/src/agent/approval-execution.ts b/src/agent/approval-execution.ts index a8dee4d..994e61d 100644 --- a/src/agent/approval-execution.ts +++ b/src/agent/approval-execution.ts @@ -1,5 +1,6 @@ // src/agent/approval-execution.ts // Shared logic for executing approval batches (used by both interactive and headless modes) +import * as path from "node:path"; import type { ApprovalReturn, ToolReturn, @@ -61,6 +62,76 @@ function isParallelSafe(toolName: string): boolean { return PARALLEL_SAFE_TOOLS.has(toolName); } +/** + * Tools that modify a single file and use `file_path` as their resource identifier. + * These can run in parallel when targeting different files. + */ +const FILE_PATH_TOOLS = new Set([ + // Anthropic toolset + "Edit", + "Write", + "MultiEdit", + // Gemini toolset + "replace", + "write_file_gemini", + "Replace", + "WriteFileGemini", +]); + +/** + * Tools that use a global lock (can touch multiple resources or have arbitrary side effects). + * These must serialize with ALL other write tools to prevent race conditions. + */ +const GLOBAL_LOCK_TOOLS = new Set([ + // Shell tools (arbitrary side effects) + "Bash", + "KillBash", + "run_shell_command", + "RunShellCommand", + "shell_command", + "shell", + "ShellCommand", + "Shell", + // Patch tools (can touch multiple files in a single operation) + "apply_patch", + "ApplyPatch", +]); + +/** + * Extract the resource key for a tool execution. + * Tools with the same resource key must be serialized to avoid race conditions. + * + * Note: Only call this for non-parallel-safe tools. Use isParallelSafe() first. + * + * @param toolName - The name of the tool being executed + * @param toolArgs - The arguments passed to the tool + * @returns Resource key string for grouping + */ +export function getResourceKey( + toolName: string, + toolArgs: Record, +): string { + // Global lock tools serialize with everything + if (GLOBAL_LOCK_TOOLS.has(toolName)) { + return "__global__"; + } + + // File-based tools use the file path as resource key + if (FILE_PATH_TOOLS.has(toolName)) { + const filePath = toolArgs.file_path; + if (typeof filePath === "string") { + // Normalize to absolute path for consistent comparison + const userCwd = process.env.USER_CWD || process.cwd(); + return path.isAbsolute(filePath) + ? path.normalize(filePath) + : path.resolve(userCwd, filePath); + } + } + + // Unknown tools or missing file_path get global lock for safety + return "__global__"; +} + /** Result format expected by App.tsx for auto-allowed tools */ export type AutoAllowedResult = { toolCallId: string; @@ -214,8 +285,12 @@ async function executeSingleDecision( * - Executing approved tools (with error handling) * - Formatting denials * - Combining all results into a single batch - * - Parallel-safe tools (read-only + Task) are executed in parallel for performance - * - Write/stateful tools (Edit, Write, Bash, etc.) are executed sequentially to avoid race conditions + * + * Execution strategy for performance: + * - Parallel-safe tools (read-only + Task) are executed in parallel + * - Write tools are grouped by resource (file path) and executed with per-resource queuing: + * - Different resources → parallel execution + * - Same resource → sequential execution to avoid race conditions * * Used by both interactive (App.tsx) and headless (headless.ts) modes. * @@ -233,46 +308,61 @@ export async function executeApprovalBatch( null, ); - // Identify parallel-safe tools (read-only + Task) + // Categorize decisions by execution strategy const parallelIndices: number[] = []; + const writeToolsByResource = new Map(); + const denyIndices: number[] = []; + for (let i = 0; i < decisions.length; i++) { const decision = decisions[i]; - if ( - decision && - decision.type === "approve" && - isParallelSafe(decision.approval.toolName) - ) { + if (!decision) continue; + + if (decision.type === "deny") { + denyIndices.push(i); + continue; + } + + const toolName = decision.approval.toolName; + + if (isParallelSafe(toolName)) { parallelIndices.push(i); + } else { + // Get resource key for write tools + const args = + typeof decision.approval.toolArgs === "string" + ? JSON.parse(decision.approval.toolArgs) + : decision.approval.toolArgs || {}; + const resourceKey = getResourceKey(toolName, args); + + const indices = writeToolsByResource.get(resourceKey) || []; + indices.push(i); + writeToolsByResource.set(resourceKey, indices); } } - // Execute write/stateful tools sequentially to avoid race conditions - for (let i = 0; i < decisions.length; i++) { + // Helper to execute a decision and store result + const execute = async (i: number) => { const decision = decisions[i]; - if (!decision || parallelIndices.includes(i)) continue; - results[i] = await executeSingleDecision(decision, onChunk, options); - } - - // Execute parallel-safe tools (read-only + Task) in parallel - if (parallelIndices.length > 0) { - const parallelDecisions = parallelIndices - .map((i) => decisions[i]) - .filter((d): d is ApprovalDecision => d !== undefined); - const parallelResults = await Promise.all( - parallelDecisions.map((decision) => - executeSingleDecision(decision, onChunk, options), - ), - ); - - // Place parallel results in original positions - for (let j = 0; j < parallelIndices.length; j++) { - const idx = parallelIndices[j]; - const result = parallelResults[j]; - if (idx !== undefined && result !== undefined) { - results[idx] = result; - } + if (decision) { + results[i] = await executeSingleDecision(decision, onChunk, options); } - } + }; + + // Execute all categories concurrently: + // 1. Parallel-safe tools (all in parallel) + // 2. Write tools grouped by resource (sequential within each group, parallel across groups) + // 3. Denials (no actual execution needed, but process for UI updates) + await Promise.all([ + // Parallel-safe tools + denials: all run in parallel + ...parallelIndices.map(execute), + ...denyIndices.map(execute), + // Write tools: sequential within each resource group, parallel across groups + ...Array.from(writeToolsByResource.values()).map(async (indices) => { + for (const i of indices) { + await execute(i); + } + }), + ]); // Filter out nulls (shouldn't happen, but TypeScript needs this) return results.filter((r): r is ApprovalResult => r !== null);