feat: cancel client-side tools (#113)
This commit is contained in:
@@ -33,10 +33,34 @@ export type ApprovalResult = ToolReturn | ApprovalCreate.ApprovalReturn;
|
||||
export async function executeApprovalBatch(
|
||||
decisions: ApprovalDecision[],
|
||||
onChunk?: (chunk: ToolReturnMessage) => void,
|
||||
options?: { abortSignal?: AbortSignal },
|
||||
): Promise<ApprovalResult[]> {
|
||||
const results: ApprovalResult[] = [];
|
||||
|
||||
for (const decision of decisions) {
|
||||
// If aborted before starting this decision, record an interrupted result
|
||||
if (options?.abortSignal?.aborted) {
|
||||
// Emit an interrupted chunk for visibility if callback provided
|
||||
if (onChunk) {
|
||||
onChunk({
|
||||
message_type: "tool_return_message",
|
||||
id: "dummy",
|
||||
date: new Date().toISOString(),
|
||||
tool_call_id: decision.approval.toolCallId,
|
||||
tool_return: "User interrupted tool execution",
|
||||
status: "error",
|
||||
});
|
||||
}
|
||||
|
||||
results.push({
|
||||
type: "tool",
|
||||
tool_call_id: decision.approval.toolCallId,
|
||||
tool_return: "User interrupted tool execution",
|
||||
status: "error",
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
if (decision.type === "approve") {
|
||||
// Execute the approved tool
|
||||
try {
|
||||
@@ -48,6 +72,7 @@ export async function executeApprovalBatch(
|
||||
const toolResult = await executeTool(
|
||||
decision.approval.toolName,
|
||||
parsedArgs,
|
||||
{ signal: options?.abortSignal },
|
||||
);
|
||||
|
||||
// Update UI if callback provided (interactive mode)
|
||||
@@ -73,9 +98,15 @@ export async function executeApprovalBatch(
|
||||
stderr: toolResult.stderr,
|
||||
});
|
||||
} catch (e) {
|
||||
// Still need to send error result to backend for this tool
|
||||
const errorMessage = `Error executing tool: ${String(e)}`;
|
||||
const isAbortError =
|
||||
e instanceof Error &&
|
||||
(e.name === "AbortError" ||
|
||||
e.message === "The operation was aborted");
|
||||
const errorMessage = isAbortError
|
||||
? "User interrupted tool execution"
|
||||
: `Error executing tool: ${String(e)}`;
|
||||
|
||||
// Still need to send error result to backend for this tool
|
||||
// Update UI if callback provided
|
||||
if (onChunk) {
|
||||
onChunk({
|
||||
|
||||
@@ -176,6 +176,10 @@ export default function App({
|
||||
>
|
||||
>([]);
|
||||
const [isExecutingTool, setIsExecutingTool] = useState(false);
|
||||
const [queuedApprovalResults, setQueuedApprovalResults] = useState<
|
||||
ApprovalResult[] | null
|
||||
>(null);
|
||||
const toolAbortControllerRef = useRef<AbortController | null>(null);
|
||||
|
||||
// Track auto-handled results to combine with user decisions
|
||||
const [autoHandledResults, setAutoHandledResults] = useState<
|
||||
@@ -732,6 +736,14 @@ export default function App({
|
||||
}, []);
|
||||
|
||||
const handleInterrupt = useCallback(async () => {
|
||||
// If we're executing client-side tools, abort them locally instead of hitting the backend
|
||||
if (isExecutingTool && toolAbortControllerRef.current) {
|
||||
toolAbortControllerRef.current.abort();
|
||||
setStreaming(false);
|
||||
setIsExecutingTool(false);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!streaming || interruptRequested) return;
|
||||
|
||||
setInterruptRequested(true);
|
||||
@@ -751,7 +763,7 @@ export default function App({
|
||||
appendError(`Failed to interrupt stream: ${String(e)}`);
|
||||
setInterruptRequested(false);
|
||||
}
|
||||
}, [agentId, streaming, interruptRequested, appendError]);
|
||||
}, [agentId, streaming, interruptRequested, appendError, isExecutingTool]);
|
||||
|
||||
// Reset interrupt flag when streaming ends
|
||||
useEffect(() => {
|
||||
@@ -1304,8 +1316,9 @@ export default function App({
|
||||
setStreaming(true);
|
||||
refreshDerived();
|
||||
|
||||
// Check for pending approvals before sending message
|
||||
if (CHECK_PENDING_APPROVALS_BEFORE_SEND) {
|
||||
// Check for pending approvals before sending message (skip if we already have
|
||||
// a queued approval response to send first).
|
||||
if (CHECK_PENDING_APPROVALS_BEFORE_SEND && !queuedApprovalResults) {
|
||||
try {
|
||||
const client = await getClient();
|
||||
// Fetch fresh agent state to check for pending approvals with accurate in-context messages
|
||||
@@ -1343,14 +1356,25 @@ export default function App({
|
||||
}
|
||||
}
|
||||
|
||||
// Start the conversation loop
|
||||
await processConversation([
|
||||
{
|
||||
type: "message",
|
||||
role: "user",
|
||||
content: messageContent as unknown as MessageCreate["content"],
|
||||
},
|
||||
]);
|
||||
// Start the conversation loop. If we have queued approval results from an interrupted
|
||||
// client-side execution, send them first before the new user message.
|
||||
const initialInput: Array<MessageCreate | ApprovalCreate> = [];
|
||||
|
||||
if (queuedApprovalResults) {
|
||||
initialInput.push({
|
||||
type: "approval",
|
||||
approvals: queuedApprovalResults,
|
||||
});
|
||||
setQueuedApprovalResults(null);
|
||||
}
|
||||
|
||||
initialInput.push({
|
||||
type: "message",
|
||||
role: "user",
|
||||
content: messageContent as unknown as MessageCreate["content"],
|
||||
});
|
||||
|
||||
await processConversation(initialInput);
|
||||
|
||||
// Clean up placeholders after submission
|
||||
clearPlaceholdersInText(msg);
|
||||
@@ -1368,6 +1392,7 @@ export default function App({
|
||||
columns,
|
||||
commitEligibleLines,
|
||||
isExecutingTool,
|
||||
queuedApprovalResults,
|
||||
],
|
||||
);
|
||||
|
||||
@@ -1395,6 +1420,9 @@ export default function App({
|
||||
// Show "thinking" state and lock input while executing approved tools client-side
|
||||
setStreaming(true);
|
||||
|
||||
const approvalAbortController = new AbortController();
|
||||
toolAbortControllerRef.current = approvalAbortController;
|
||||
|
||||
// Combine all decisions using snapshots
|
||||
const allDecisions = [
|
||||
...approvalResultsSnapshot,
|
||||
@@ -1421,7 +1449,10 @@ export default function App({
|
||||
appendError(chunk.tool_return);
|
||||
}
|
||||
}
|
||||
// Flush UI so completed tools show up while the batch continues
|
||||
refreshDerived();
|
||||
},
|
||||
{ abortSignal: approvalAbortController.signal },
|
||||
);
|
||||
|
||||
// Combine with auto-handled and auto-denied results using snapshots
|
||||
@@ -1472,16 +1503,25 @@ export default function App({
|
||||
setThinkingMessage(getRandomThinkingMessage());
|
||||
refreshDerived();
|
||||
|
||||
// Continue conversation with all results
|
||||
await processConversation([
|
||||
{
|
||||
type: "approval",
|
||||
approvals: allResults as ApprovalResult[],
|
||||
},
|
||||
]);
|
||||
const wasAborted = approvalAbortController.signal.aborted;
|
||||
|
||||
if (wasAborted) {
|
||||
// Queue results to send alongside the next user message
|
||||
setQueuedApprovalResults(allResults as ApprovalResult[]);
|
||||
setStreaming(false);
|
||||
} else {
|
||||
// Continue conversation with all results
|
||||
await processConversation([
|
||||
{
|
||||
type: "approval",
|
||||
approvals: allResults as ApprovalResult[],
|
||||
},
|
||||
]);
|
||||
}
|
||||
} finally {
|
||||
// Always release the execution guard, even if an error occurred
|
||||
setIsExecutingTool(false);
|
||||
toolAbortControllerRef.current = null;
|
||||
}
|
||||
},
|
||||
[
|
||||
|
||||
@@ -10,7 +10,7 @@ describe("Bash tool", () => {
|
||||
|
||||
expect(result.content).toBeDefined();
|
||||
expect(result.content[0]?.text).toContain("Hello, World!");
|
||||
expect(result.isError).toBeUndefined();
|
||||
expect(result.status).toBe("success");
|
||||
});
|
||||
|
||||
test("captures stderr in output", async () => {
|
||||
@@ -28,7 +28,7 @@ describe("Bash tool", () => {
|
||||
description: "Test exit code",
|
||||
});
|
||||
|
||||
expect(result.isError).toBe(true);
|
||||
expect(result.status).toBe("error");
|
||||
expect(result.content[0]?.text).toContain("Exit code");
|
||||
});
|
||||
|
||||
@@ -39,7 +39,7 @@ describe("Bash tool", () => {
|
||||
timeout: 100,
|
||||
});
|
||||
|
||||
expect(result.isError).toBe(true);
|
||||
expect(result.status).toBe("error");
|
||||
expect(result.content[0]?.text).toContain("timed out");
|
||||
}, 2000);
|
||||
|
||||
|
||||
@@ -65,7 +65,7 @@ describe("tool truncation integration tests", () => {
|
||||
|
||||
const output = result.content[0]?.text || "";
|
||||
expect(output).toContain("[Output truncated after 30,000 characters");
|
||||
expect(result.isError).toBe(true);
|
||||
expect(result.status).toBe("error");
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
19
src/tools/README.md
Normal file
19
src/tools/README.md
Normal file
@@ -0,0 +1,19 @@
|
||||
# Client-side tool guidelines
|
||||
|
||||
How to implement tools that run locally in Letta Code.
|
||||
|
||||
## Contract
|
||||
- Function signature: `(args, opts?) => Promise<{ toolReturn: string; status: "success" | "error"; stdout?: string[]; stderr?: string[] }>`
|
||||
- Optional `opts.signal?: AbortSignal`. If you spawn a subprocess, wire this signal to it so Esc/abort can kill it cleanly. If you’re pure in-process, you can ignore it.
|
||||
|
||||
## Subprocess tools (e.g., Bash)
|
||||
- Pass the provided `AbortSignal` to `exec`/`spawn` so abort kills the child. Normalize abort errors to `toolReturn: "User interrupted tool execution", status: "error"`.
|
||||
- Avoid running multiple subprocesses unless you also expose a cancel hook; we execute tools serially to avoid races.
|
||||
|
||||
## In-process tools (read/write/edit)
|
||||
- You can ignore the signal, but still return a clear `toolReturn` and `status`.
|
||||
- Be deterministic and side-effect aware; the runner keeps tools serial to avoid file races.
|
||||
|
||||
## Errors
|
||||
- Return a concise error message in `toolReturn` and set `status: "error"`.
|
||||
- Don’t `console.error` from tools; the UI surfaces the returned message.
|
||||
@@ -12,6 +12,7 @@ interface BashArgs {
|
||||
timeout?: number;
|
||||
description?: string;
|
||||
run_in_background?: boolean;
|
||||
signal?: AbortSignal;
|
||||
}
|
||||
|
||||
interface BashResult {
|
||||
@@ -19,7 +20,7 @@ interface BashResult {
|
||||
type: string;
|
||||
text: string;
|
||||
}>;
|
||||
isError?: boolean;
|
||||
status: "success" | "error";
|
||||
}
|
||||
|
||||
export async function bash(args: BashArgs): Promise<BashResult> {
|
||||
@@ -29,13 +30,17 @@ export async function bash(args: BashArgs): Promise<BashResult> {
|
||||
timeout = 120000,
|
||||
description: _description,
|
||||
run_in_background = false,
|
||||
signal,
|
||||
} = args;
|
||||
const userCwd = process.env.USER_CWD || process.cwd();
|
||||
|
||||
if (command === "/bashes") {
|
||||
const processes = Array.from(backgroundProcesses.entries());
|
||||
if (processes.length === 0) {
|
||||
return { content: [{ type: "text", text: "(no content)" }] };
|
||||
return {
|
||||
content: [{ type: "text", text: "(no content)" }],
|
||||
status: "success",
|
||||
};
|
||||
}
|
||||
let output = "";
|
||||
for (const [id, proc] of processes) {
|
||||
@@ -44,7 +49,10 @@ export async function bash(args: BashArgs): Promise<BashResult> {
|
||||
: "unknown";
|
||||
output += `${id}: ${proc.command} (${proc.status}, runtime: ${runtime})\n`;
|
||||
}
|
||||
return { content: [{ type: "text", text: output.trim() }] };
|
||||
return {
|
||||
content: [{ type: "text", text: output.trim() }],
|
||||
status: "success",
|
||||
};
|
||||
}
|
||||
|
||||
if (run_in_background) {
|
||||
@@ -100,6 +108,7 @@ export async function bash(args: BashArgs): Promise<BashResult> {
|
||||
text: `Command running in background with ID: ${bashId}`,
|
||||
},
|
||||
],
|
||||
status: "success",
|
||||
};
|
||||
}
|
||||
|
||||
@@ -110,6 +119,7 @@ export async function bash(args: BashArgs): Promise<BashResult> {
|
||||
maxBuffer: 10 * 1024 * 1024,
|
||||
cwd: userCwd,
|
||||
env: { ...process.env },
|
||||
signal,
|
||||
};
|
||||
const { stdout, stderr } = await execAsync(command, options);
|
||||
const stdoutStr = typeof stdout === "string" ? stdout : stdout.toString();
|
||||
@@ -126,6 +136,7 @@ export async function bash(args: BashArgs): Promise<BashResult> {
|
||||
|
||||
return {
|
||||
content: [{ type: "text", text: truncatedOutput }],
|
||||
status: "success",
|
||||
};
|
||||
} catch (error) {
|
||||
const err = error as NodeJS.ErrnoException & {
|
||||
@@ -133,14 +144,26 @@ export async function bash(args: BashArgs): Promise<BashResult> {
|
||||
stderr?: string;
|
||||
killed?: boolean;
|
||||
signal?: string;
|
||||
code?: string;
|
||||
name?: string;
|
||||
};
|
||||
const isAbort =
|
||||
signal?.aborted ||
|
||||
err.code === "ABORT_ERR" ||
|
||||
err.name === "AbortError" ||
|
||||
err.message === "The operation was aborted";
|
||||
|
||||
let errorMessage = "";
|
||||
if (err.killed && err.signal === "SIGTERM")
|
||||
errorMessage = `Command timed out after ${effectiveTimeout}ms\n`;
|
||||
if (err.code) errorMessage += `Exit code: ${err.code}\n`;
|
||||
if (err.stderr) errorMessage += err.stderr;
|
||||
else if (err.message) errorMessage += err.message;
|
||||
if (err.stdout) errorMessage = `${err.stdout}\n${errorMessage}`;
|
||||
if (isAbort) {
|
||||
errorMessage = "User interrupted tool execution";
|
||||
} else {
|
||||
if (err.killed && err.signal === "SIGTERM")
|
||||
errorMessage = `Command timed out after ${effectiveTimeout}ms\n`;
|
||||
if (err.code) errorMessage += `Exit code: ${err.code}\n`;
|
||||
if (err.stderr) errorMessage += err.stderr;
|
||||
else if (err.message) errorMessage += err.message;
|
||||
if (err.stdout) errorMessage = `${err.stdout}\n${errorMessage}`;
|
||||
}
|
||||
|
||||
// Apply character limit even to error messages
|
||||
const { content: truncatedError } = truncateByChars(
|
||||
@@ -151,7 +174,7 @@ export async function bash(args: BashArgs): Promise<BashResult> {
|
||||
|
||||
return {
|
||||
content: [{ type: "text", text: truncatedError }],
|
||||
isError: true,
|
||||
status: "error",
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -459,6 +459,7 @@ function flattenToolResponse(result: unknown): string {
|
||||
export async function executeTool(
|
||||
name: string,
|
||||
args: ToolArgs,
|
||||
options?: { signal?: AbortSignal },
|
||||
): Promise<ToolExecutionResult> {
|
||||
const tool = toolRegistry.get(name);
|
||||
|
||||
@@ -470,7 +471,13 @@ export async function executeTool(
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await tool.fn(args);
|
||||
// Inject abort signal for tools that support it (currently Bash) without altering schemas
|
||||
const argsWithSignal =
|
||||
name === "Bash" && options?.signal
|
||||
? { ...args, signal: options.signal }
|
||||
: args;
|
||||
|
||||
const result = await tool.fn(argsWithSignal);
|
||||
|
||||
// Extract stdout/stderr if present (for bash tools)
|
||||
const recordResult = isRecord(result) ? result : undefined;
|
||||
@@ -478,7 +485,6 @@ export async function executeTool(
|
||||
const stderrValue = recordResult?.stderr;
|
||||
const stdout = isStringArray(stdoutValue) ? stdoutValue : undefined;
|
||||
const stderr = isStringArray(stderrValue) ? stderrValue : undefined;
|
||||
|
||||
// Flatten the response to plain text
|
||||
const flattenedResponse = flattenToolResponse(result);
|
||||
|
||||
@@ -490,6 +496,21 @@ export async function executeTool(
|
||||
...(stderr && { stderr }),
|
||||
};
|
||||
} catch (error) {
|
||||
const isAbort =
|
||||
error instanceof Error &&
|
||||
(error.name === "AbortError" ||
|
||||
error.message === "The operation was aborted" ||
|
||||
// node:child_process AbortError may include code/message variants
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
(error as any).code === "ABORT_ERR");
|
||||
|
||||
if (isAbort) {
|
||||
return {
|
||||
toolReturn: "User interrupted tool execution",
|
||||
status: "error",
|
||||
};
|
||||
}
|
||||
|
||||
// Don't console.error here - it pollutes the TUI
|
||||
// The error message is already returned in toolReturn
|
||||
return {
|
||||
|
||||
Reference in New Issue
Block a user