// src/agent/approval-execution.ts // Shared logic for executing approval batches (used by both interactive and headless modes) import * as path from "node:path"; import type { ApprovalReturn, TextContent, ToolReturn, } from "@letta-ai/letta-client/resources/agents/messages"; import type { ToolReturnMessage } from "@letta-ai/letta-client/resources/tools"; import type { ApprovalRequest } from "../cli/helpers/stream"; import { INTERRUPTED_BY_USER } from "../constants"; import { captureToolExecutionContext, executeTool, type ToolExecutionResult, type ToolReturnContent, } from "../tools/manager"; /** * Extract displayable text from tool return content (for UI display). * Multimodal content returns the text parts concatenated. */ export function getDisplayableToolReturn(content: ToolReturnContent): string { if (typeof content === "string") { return content; } // Extract text from multimodal content return content .filter((part): part is TextContent => part.type === "text") .map((part) => part.text) .join("\n"); } /** * Tools that are safe to execute in parallel (read-only or independent). * These tools don't modify files or shared state, so they can't race with each other. * Note: Bash/shell tools are intentionally excluded - they can run arbitrary commands that may write files. * * Includes equivalent tools across all toolsets (Anthropic, Codex/OpenAI, Gemini). */ const PARALLEL_SAFE_TOOLS = new Set([ // === Anthropic toolset (default) === "Read", "view_image", "ViewImage", "Grep", "Glob", // === Codex/OpenAI toolset === // snake_case variants "read_file", "list_dir", "grep_files", // PascalCase variants "ReadFile", "ListDir", "GrepFiles", // === Gemini toolset === // snake_case variants "read_file_gemini", "list_directory", "glob_gemini", "search_file_content", "read_many_files", // PascalCase variants "ReadFileGemini", "ListDirectory", "GlobGemini", "SearchFileContent", "ReadManyFiles", // === Cross-toolset tools === // Search/fetch tools (external APIs or read-only queries) "conversation_search", "web_search", "fetch_webpage", // Background task output (read-only check) "TaskOutput", // Task spawns independent subagents "Task", // Plan mode tools (no parameters, no file operations) "EnterPlanMode", "ExitPlanMode", ]); function isParallelSafe(toolName: string): boolean { return PARALLEL_SAFE_TOOLS.has(toolName); } /** * Tools that modify a single file and use `file_path` as their resource identifier. * These can run in parallel when targeting different files. */ const FILE_PATH_TOOLS = new Set([ // Anthropic toolset "Edit", "Write", "MultiEdit", // Gemini toolset "replace", "write_file_gemini", "Replace", "WriteFileGemini", ]); /** * Tools that use a global lock (can touch multiple resources or have arbitrary side effects). * These must serialize with ALL other write tools to prevent race conditions. */ const GLOBAL_LOCK_TOOLS = new Set([ // Shell tools (arbitrary side effects) "Bash", "KillBash", "run_shell_command", "RunShellCommand", // Memory tool (file + git side effects) "memory", "shell_command", "shell", "ShellCommand", "Shell", // Patch tools (can touch multiple files in a single operation) "apply_patch", "ApplyPatch", ]); /** * Extract the resource key for a tool execution. * Tools with the same resource key must be serialized to avoid race conditions. * * Note: Only call this for non-parallel-safe tools. Use isParallelSafe() first. * * @param toolName - The name of the tool being executed * @param toolArgs - The arguments passed to the tool * @returns Resource key string for grouping */ export function getResourceKey( toolName: string, toolArgs: Record, workingDirectory: string = process.env.USER_CWD || process.cwd(), ): string { // Global lock tools serialize with everything if (GLOBAL_LOCK_TOOLS.has(toolName)) { return "__global__"; } // File-based tools use the file path as resource key if (FILE_PATH_TOOLS.has(toolName)) { const filePath = toolArgs.file_path; if (typeof filePath === "string") { // Normalize to absolute path for consistent comparison return path.isAbsolute(filePath) ? path.normalize(filePath) : path.resolve(workingDirectory, filePath); } } // Unknown tools or missing file_path get global lock for safety return "__global__"; } /** Result format expected by App.tsx for auto-allowed tools */ export type AutoAllowedResult = { toolCallId: string; result: ToolExecutionResult; }; export type ApprovalDecision = | { type: "approve"; approval: ApprovalRequest; // If set, skip executeTool and use this result (for fancy UI tools) precomputedResult?: ToolExecutionResult; } | { type: "deny"; approval: ApprovalRequest; reason: string }; // Align result type with the SDK's expected union for approvals payloads export type ApprovalResult = ToolReturn | ApprovalReturn; /** * Execute a single approval decision and return the result. * Extracted to allow parallel execution of Task tools. */ async function executeSingleDecision( decision: ApprovalDecision, onChunk?: (chunk: ToolReturnMessage) => void, options?: { abortSignal?: AbortSignal; onStreamingOutput?: ( toolCallId: string, chunk: string, isStderr?: boolean, ) => void; toolContextId?: string; }, ): Promise { // If aborted, record an interrupted result if (options?.abortSignal?.aborted) { if (onChunk) { onChunk({ message_type: "tool_return_message", id: "dummy", date: new Date().toISOString(), tool_call_id: decision.approval.toolCallId, tool_return: INTERRUPTED_BY_USER, status: "error", }); } return { type: "tool", tool_call_id: decision.approval.toolCallId, tool_return: INTERRUPTED_BY_USER, status: "error", }; } if (decision.type === "approve") { // If fancy UI already computed the result, use it directly if (decision.precomputedResult) { return { type: "tool", tool_call_id: decision.approval.toolCallId, tool_return: decision.precomputedResult.toolReturn, status: decision.precomputedResult.status, stdout: decision.precomputedResult.stdout, stderr: decision.precomputedResult.stderr, }; } // Execute the approved tool try { // Safe parse - toolArgs should be "{}" but handle edge cases let parsedArgs: Record = {}; if (typeof decision.approval.toolArgs === "string") { try { parsedArgs = JSON.parse(decision.approval.toolArgs); } catch { // Empty or malformed args - use empty object parsedArgs = {}; } } else { parsedArgs = decision.approval.toolArgs || {}; } const toolResult = await executeTool( decision.approval.toolName, parsedArgs, { signal: options?.abortSignal, toolCallId: decision.approval.toolCallId, toolContextId: options?.toolContextId, onOutput: options?.onStreamingOutput ? (chunk, stream) => options.onStreamingOutput?.( decision.approval.toolCallId, chunk, stream === "stderr", ) : undefined, }, ); // Update UI if callback provided (interactive mode) // Note: UI display uses text-only version, backend gets full multimodal content if (onChunk) { onChunk({ message_type: "tool_return_message", id: "dummy", date: new Date().toISOString(), tool_call_id: decision.approval.toolCallId, tool_return: getDisplayableToolReturn(toolResult.toolReturn), status: toolResult.status, stdout: toolResult.stdout, stderr: toolResult.stderr, }); } return { type: "tool", tool_call_id: decision.approval.toolCallId, tool_return: toolResult.toolReturn, // Full multimodal content for backend status: toolResult.status, stdout: toolResult.stdout, stderr: toolResult.stderr, }; } catch (e) { const isAbortError = e instanceof Error && (e.name === "AbortError" || e.message === "The operation was aborted"); const errorMessage = isAbortError ? INTERRUPTED_BY_USER : `Error executing tool: ${String(e)}`; if (onChunk) { onChunk({ message_type: "tool_return_message", id: "dummy", date: new Date().toISOString(), tool_call_id: decision.approval.toolCallId, tool_return: errorMessage, status: "error", }); } return { type: "tool", tool_call_id: decision.approval.toolCallId, tool_return: errorMessage, status: "error", }; } } // Format denial for backend if (onChunk) { onChunk({ message_type: "tool_return_message", id: "dummy", date: new Date().toISOString(), tool_call_id: decision.approval.toolCallId, tool_return: `Error: request to call tool denied. User reason: ${decision.reason}`, status: "error", }); } return { type: "approval", tool_call_id: decision.approval.toolCallId, approve: false, reason: decision.reason, }; } /** * Execute a batch of approval decisions and format results for the backend. * * This function handles: * - Executing approved tools (with error handling) * - Formatting denials * - Combining all results into a single batch * * Execution strategy for performance: * - Parallel-safe tools (read-only + Task) are executed in parallel * - Write tools are grouped by resource (file path) and executed with per-resource queuing: * - Different resources → parallel execution * - Same resource → sequential execution to avoid race conditions * * Used by both interactive (App.tsx) and headless (headless.ts) modes. * * @param decisions - Array of approve/deny decisions for each tool * @param onChunk - Optional callback to update UI with tool results (for interactive mode) * @returns Array of formatted results ready to send to backend (maintains original order) */ export async function executeApprovalBatch( decisions: ApprovalDecision[], onChunk?: (chunk: ToolReturnMessage) => void, options?: { abortSignal?: AbortSignal; onStreamingOutput?: ( toolCallId: string, chunk: string, isStderr?: boolean, ) => void; toolContextId?: string; workingDirectory?: string; }, ): Promise { const toolContextId = options?.toolContextId ?? (options?.workingDirectory ? captureToolExecutionContext(options.workingDirectory).contextId : undefined); // Pre-allocate results array to maintain original order const results: (ApprovalResult | null)[] = new Array(decisions.length).fill( null, ); // Categorize decisions by execution strategy const parallelIndices: number[] = []; const writeToolsByResource = new Map(); const denyIndices: number[] = []; for (let i = 0; i < decisions.length; i++) { const decision = decisions[i]; if (!decision) continue; if (decision.type === "deny") { denyIndices.push(i); continue; } const toolName = decision.approval.toolName; if (isParallelSafe(toolName)) { parallelIndices.push(i); } else { // Get resource key for write tools // Safe parse - handle empty or malformed toolArgs let args: Record = {}; if (typeof decision.approval.toolArgs === "string") { try { args = JSON.parse(decision.approval.toolArgs); } catch { // Empty or malformed args - use empty object (will use global lock) args = {}; } } else { args = decision.approval.toolArgs || {}; } const resourceKey = getResourceKey( toolName, args, options?.workingDirectory, ); const indices = writeToolsByResource.get(resourceKey) || []; indices.push(i); writeToolsByResource.set(resourceKey, indices); } } // Helper to execute a decision and store result const execute = async (i: number) => { const decision = decisions[i]; if (decision) { results[i] = await executeSingleDecision(decision, onChunk, { ...options, toolContextId, }); } }; // Execute all categories concurrently: // 1. Parallel-safe tools (all in parallel) // 2. Write tools grouped by resource (sequential within each group, parallel across groups) // 3. Denials (no actual execution needed, but process for UI updates) await Promise.all([ // Parallel-safe tools + denials: all run in parallel ...parallelIndices.map(execute), ...denyIndices.map(execute), // Write tools: sequential within each resource group, parallel across groups ...Array.from(writeToolsByResource.values()).map(async (indices) => { for (const i of indices) { await execute(i); } }), ]); // Filter out nulls (shouldn't happen, but TypeScript needs this) return results.filter((r): r is ApprovalResult => r !== null); } /** * Helper to execute auto-allowed tools and map results to the format expected by App.tsx. * Consolidates the common pattern of converting approvals to decisions, executing them, * and mapping the results back. * * @param autoAllowed - Array of auto-allowed approval contexts (must have .approval property) * @param onChunk - Callback to update UI with tool results * @param options - Optional abort signal for cancellation * @returns Array of results with toolCallId and ToolExecutionResult */ export async function executeAutoAllowedTools( autoAllowed: Array<{ approval: ApprovalRequest }>, onChunk: (chunk: ToolReturnMessage) => void, options?: { abortSignal?: AbortSignal; onStreamingOutput?: ( toolCallId: string, chunk: string, isStderr?: boolean, ) => void; toolContextId?: string; workingDirectory?: string; }, ): Promise { const decisions: ApprovalDecision[] = autoAllowed.map((ac) => ({ type: "approve" as const, approval: ac.approval, })); const batchResults = await executeApprovalBatch(decisions, onChunk, options); return batchResults .filter((r): r is ApprovalResult & { type: "tool" } => r.type === "tool") .map((r) => ({ toolCallId: r.tool_call_id, result: { toolReturn: r.tool_return, status: r.status, stdout: r.stdout, stderr: r.stderr, } as ToolExecutionResult, })); }