feat: Per-resource queues for parallel tool execution (#379)
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
// src/agent/approval-execution.ts
|
||||
// Shared logic for executing approval batches (used by both interactive and headless modes)
|
||||
import * as path from "node:path";
|
||||
import type {
|
||||
ApprovalReturn,
|
||||
ToolReturn,
|
||||
@@ -61,6 +62,76 @@ function isParallelSafe(toolName: string): boolean {
|
||||
return PARALLEL_SAFE_TOOLS.has(toolName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tools that modify a single file and use `file_path` as their resource identifier.
|
||||
* These can run in parallel when targeting different files.
|
||||
*/
|
||||
const FILE_PATH_TOOLS = new Set([
|
||||
// Anthropic toolset
|
||||
"Edit",
|
||||
"Write",
|
||||
"MultiEdit",
|
||||
// Gemini toolset
|
||||
"replace",
|
||||
"write_file_gemini",
|
||||
"Replace",
|
||||
"WriteFileGemini",
|
||||
]);
|
||||
|
||||
/**
|
||||
* Tools that use a global lock (can touch multiple resources or have arbitrary side effects).
|
||||
* These must serialize with ALL other write tools to prevent race conditions.
|
||||
*/
|
||||
const GLOBAL_LOCK_TOOLS = new Set([
|
||||
// Shell tools (arbitrary side effects)
|
||||
"Bash",
|
||||
"KillBash",
|
||||
"run_shell_command",
|
||||
"RunShellCommand",
|
||||
"shell_command",
|
||||
"shell",
|
||||
"ShellCommand",
|
||||
"Shell",
|
||||
// Patch tools (can touch multiple files in a single operation)
|
||||
"apply_patch",
|
||||
"ApplyPatch",
|
||||
]);
|
||||
|
||||
/**
|
||||
* Extract the resource key for a tool execution.
|
||||
* Tools with the same resource key must be serialized to avoid race conditions.
|
||||
*
|
||||
* Note: Only call this for non-parallel-safe tools. Use isParallelSafe() first.
|
||||
*
|
||||
* @param toolName - The name of the tool being executed
|
||||
* @param toolArgs - The arguments passed to the tool
|
||||
* @returns Resource key string for grouping
|
||||
*/
|
||||
export function getResourceKey(
|
||||
toolName: string,
|
||||
toolArgs: Record<string, unknown>,
|
||||
): string {
|
||||
// Global lock tools serialize with everything
|
||||
if (GLOBAL_LOCK_TOOLS.has(toolName)) {
|
||||
return "__global__";
|
||||
}
|
||||
|
||||
// File-based tools use the file path as resource key
|
||||
if (FILE_PATH_TOOLS.has(toolName)) {
|
||||
const filePath = toolArgs.file_path;
|
||||
if (typeof filePath === "string") {
|
||||
// Normalize to absolute path for consistent comparison
|
||||
const userCwd = process.env.USER_CWD || process.cwd();
|
||||
return path.isAbsolute(filePath)
|
||||
? path.normalize(filePath)
|
||||
: path.resolve(userCwd, filePath);
|
||||
}
|
||||
}
|
||||
|
||||
// Unknown tools or missing file_path get global lock for safety
|
||||
return "__global__";
|
||||
}
|
||||
|
||||
/** Result format expected by App.tsx for auto-allowed tools */
|
||||
export type AutoAllowedResult = {
|
||||
toolCallId: string;
|
||||
@@ -214,8 +285,12 @@ async function executeSingleDecision(
|
||||
* - Executing approved tools (with error handling)
|
||||
* - Formatting denials
|
||||
* - Combining all results into a single batch
|
||||
* - Parallel-safe tools (read-only + Task) are executed in parallel for performance
|
||||
* - Write/stateful tools (Edit, Write, Bash, etc.) are executed sequentially to avoid race conditions
|
||||
*
|
||||
* Execution strategy for performance:
|
||||
* - Parallel-safe tools (read-only + Task) are executed in parallel
|
||||
* - Write tools are grouped by resource (file path) and executed with per-resource queuing:
|
||||
* - Different resources → parallel execution
|
||||
* - Same resource → sequential execution to avoid race conditions
|
||||
*
|
||||
* Used by both interactive (App.tsx) and headless (headless.ts) modes.
|
||||
*
|
||||
@@ -233,46 +308,61 @@ export async function executeApprovalBatch(
|
||||
null,
|
||||
);
|
||||
|
||||
// Identify parallel-safe tools (read-only + Task)
|
||||
// Categorize decisions by execution strategy
|
||||
const parallelIndices: number[] = [];
|
||||
const writeToolsByResource = new Map<string, number[]>();
|
||||
const denyIndices: number[] = [];
|
||||
|
||||
for (let i = 0; i < decisions.length; i++) {
|
||||
const decision = decisions[i];
|
||||
if (
|
||||
decision &&
|
||||
decision.type === "approve" &&
|
||||
isParallelSafe(decision.approval.toolName)
|
||||
) {
|
||||
if (!decision) continue;
|
||||
|
||||
if (decision.type === "deny") {
|
||||
denyIndices.push(i);
|
||||
continue;
|
||||
}
|
||||
|
||||
const toolName = decision.approval.toolName;
|
||||
|
||||
if (isParallelSafe(toolName)) {
|
||||
parallelIndices.push(i);
|
||||
} else {
|
||||
// Get resource key for write tools
|
||||
const args =
|
||||
typeof decision.approval.toolArgs === "string"
|
||||
? JSON.parse(decision.approval.toolArgs)
|
||||
: decision.approval.toolArgs || {};
|
||||
const resourceKey = getResourceKey(toolName, args);
|
||||
|
||||
const indices = writeToolsByResource.get(resourceKey) || [];
|
||||
indices.push(i);
|
||||
writeToolsByResource.set(resourceKey, indices);
|
||||
}
|
||||
}
|
||||
|
||||
// Execute write/stateful tools sequentially to avoid race conditions
|
||||
for (let i = 0; i < decisions.length; i++) {
|
||||
// Helper to execute a decision and store result
|
||||
const execute = async (i: number) => {
|
||||
const decision = decisions[i];
|
||||
if (!decision || parallelIndices.includes(i)) continue;
|
||||
results[i] = await executeSingleDecision(decision, onChunk, options);
|
||||
}
|
||||
|
||||
// Execute parallel-safe tools (read-only + Task) in parallel
|
||||
if (parallelIndices.length > 0) {
|
||||
const parallelDecisions = parallelIndices
|
||||
.map((i) => decisions[i])
|
||||
.filter((d): d is ApprovalDecision => d !== undefined);
|
||||
const parallelResults = await Promise.all(
|
||||
parallelDecisions.map((decision) =>
|
||||
executeSingleDecision(decision, onChunk, options),
|
||||
),
|
||||
);
|
||||
|
||||
// Place parallel results in original positions
|
||||
for (let j = 0; j < parallelIndices.length; j++) {
|
||||
const idx = parallelIndices[j];
|
||||
const result = parallelResults[j];
|
||||
if (idx !== undefined && result !== undefined) {
|
||||
results[idx] = result;
|
||||
}
|
||||
if (decision) {
|
||||
results[i] = await executeSingleDecision(decision, onChunk, options);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Execute all categories concurrently:
|
||||
// 1. Parallel-safe tools (all in parallel)
|
||||
// 2. Write tools grouped by resource (sequential within each group, parallel across groups)
|
||||
// 3. Denials (no actual execution needed, but process for UI updates)
|
||||
await Promise.all([
|
||||
// Parallel-safe tools + denials: all run in parallel
|
||||
...parallelIndices.map(execute),
|
||||
...denyIndices.map(execute),
|
||||
// Write tools: sequential within each resource group, parallel across groups
|
||||
...Array.from(writeToolsByResource.values()).map(async (indices) => {
|
||||
for (const i of indices) {
|
||||
await execute(i);
|
||||
}
|
||||
}),
|
||||
]);
|
||||
|
||||
// Filter out nulls (shouldn't happen, but TypeScript needs this)
|
||||
return results.filter((r): r is ApprovalResult => r !== null);
|
||||
|
||||
Reference in New Issue
Block a user