fix(tools): add Glob cancellation plumbing (#1494)
Co-authored-by: Letta Code <noreply@letta.com>
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import { afterEach, describe, expect, test } from "bun:test";
|
||||
import { glob } from "../../tools/impl/Glob";
|
||||
import { executeTool, loadSpecificTools } from "../../tools/manager";
|
||||
import { TestDirectory } from "../helpers/testFs";
|
||||
|
||||
describe("Glob tool", () => {
|
||||
@@ -44,4 +45,60 @@ describe("Glob tool", () => {
|
||||
|
||||
expect(result.files).toEqual([]);
|
||||
});
|
||||
|
||||
test("aborts promptly when signal is already aborted", async () => {
|
||||
testDir = new TestDirectory();
|
||||
testDir.createFile("a.ts", "");
|
||||
|
||||
const abortController = new AbortController();
|
||||
abortController.abort();
|
||||
|
||||
await expect(
|
||||
glob({
|
||||
pattern: "**/*.ts",
|
||||
path: testDir.path,
|
||||
signal: abortController.signal,
|
||||
}),
|
||||
).rejects.toMatchObject({ name: "AbortError" });
|
||||
});
|
||||
|
||||
test("manager passes signal through to Glob execution", async () => {
|
||||
testDir = new TestDirectory();
|
||||
testDir.createFile("a.ts", "");
|
||||
|
||||
await loadSpecificTools(["Glob"]);
|
||||
|
||||
const abortController = new AbortController();
|
||||
abortController.abort();
|
||||
|
||||
const result = await executeTool(
|
||||
"Glob",
|
||||
{ pattern: "**/*.ts", path: testDir.path },
|
||||
{ signal: abortController.signal },
|
||||
);
|
||||
|
||||
expect(result.status).toBe("error");
|
||||
expect(typeof result.toolReturn).toBe("string");
|
||||
expect(result.toolReturn).toContain("Interrupted by user");
|
||||
});
|
||||
|
||||
test("manager passes signal through to GlobGemini execution", async () => {
|
||||
testDir = new TestDirectory();
|
||||
testDir.createFile("a.ts", "");
|
||||
|
||||
await loadSpecificTools(["GlobGemini"]);
|
||||
|
||||
const abortController = new AbortController();
|
||||
abortController.abort();
|
||||
|
||||
const result = await executeTool(
|
||||
"GlobGemini",
|
||||
{ pattern: "**/*.ts", dir_path: testDir.path },
|
||||
{ signal: abortController.signal },
|
||||
);
|
||||
|
||||
expect(result.status).toBe("error");
|
||||
expect(typeof result.toolReturn).toBe("string");
|
||||
expect(result.toolReturn).toContain("Interrupted by user");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { afterEach, describe, expect, test } from "bun:test";
|
||||
import { grep } from "../../tools/impl/Grep";
|
||||
import { executeTool, loadSpecificTools } from "../../tools/manager";
|
||||
import { TestDirectory } from "../helpers/testFs";
|
||||
|
||||
describe("Grep tool", () => {
|
||||
@@ -144,4 +145,61 @@ describe("Grep tool", () => {
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
test("aborts promptly when signal is already aborted", async () => {
|
||||
testDir = new TestDirectory();
|
||||
testDir.createFile("test.txt", "Hello World");
|
||||
|
||||
const abortController = new AbortController();
|
||||
abortController.abort();
|
||||
|
||||
await expect(
|
||||
grep({
|
||||
pattern: "World",
|
||||
path: testDir.path,
|
||||
output_mode: "content",
|
||||
signal: abortController.signal,
|
||||
}),
|
||||
).rejects.toMatchObject({ name: "AbortError" });
|
||||
});
|
||||
|
||||
test("manager passes signal through to Grep execution", async () => {
|
||||
testDir = new TestDirectory();
|
||||
testDir.createFile("test.txt", "Hello World");
|
||||
|
||||
await loadSpecificTools(["Grep"]);
|
||||
|
||||
const abortController = new AbortController();
|
||||
abortController.abort();
|
||||
|
||||
const result = await executeTool(
|
||||
"Grep",
|
||||
{ pattern: "World", path: testDir.path, output_mode: "content" },
|
||||
{ signal: abortController.signal },
|
||||
);
|
||||
|
||||
expect(result.status).toBe("error");
|
||||
expect(typeof result.toolReturn).toBe("string");
|
||||
expect(result.toolReturn).toContain("Interrupted by user");
|
||||
});
|
||||
|
||||
test("manager passes signal through to GrepFiles execution", async () => {
|
||||
testDir = new TestDirectory();
|
||||
testDir.createFile("test.txt", "Hello World");
|
||||
|
||||
await loadSpecificTools(["GrepFiles"]);
|
||||
|
||||
const abortController = new AbortController();
|
||||
abortController.abort();
|
||||
|
||||
const result = await executeTool(
|
||||
"GrepFiles",
|
||||
{ pattern: "World", path: testDir.path },
|
||||
{ signal: abortController.signal },
|
||||
);
|
||||
|
||||
expect(result.status).toBe("error");
|
||||
expect(typeof result.toolReturn).toBe("string");
|
||||
expect(result.toolReturn).toContain("Interrupted by user");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { afterEach, describe, expect, test } from "bun:test";
|
||||
import { search_file_content } from "../../tools/impl/SearchFileContentGemini";
|
||||
import { executeTool, loadSpecificTools } from "../../tools/manager";
|
||||
import { TestDirectory } from "../helpers/testFs";
|
||||
|
||||
describe("SearchFileContent tool", () => {
|
||||
@@ -74,4 +75,40 @@ describe("SearchFileContent tool", () => {
|
||||
|
||||
expect(result.message).toContain("Hello World");
|
||||
});
|
||||
|
||||
test("aborts promptly when signal is already aborted", async () => {
|
||||
testDir = new TestDirectory();
|
||||
testDir.createFile("test.txt", "Hello World");
|
||||
|
||||
const abortController = new AbortController();
|
||||
abortController.abort();
|
||||
|
||||
await expect(
|
||||
search_file_content({
|
||||
pattern: "Hello",
|
||||
dir_path: testDir.path,
|
||||
signal: abortController.signal,
|
||||
}),
|
||||
).rejects.toMatchObject({ name: "AbortError" });
|
||||
});
|
||||
|
||||
test("manager passes signal through to SearchFileContent execution", async () => {
|
||||
testDir = new TestDirectory();
|
||||
testDir.createFile("test.txt", "Hello World");
|
||||
|
||||
await loadSpecificTools(["SearchFileContent"]);
|
||||
|
||||
const abortController = new AbortController();
|
||||
abortController.abort();
|
||||
|
||||
const result = await executeTool(
|
||||
"SearchFileContent",
|
||||
{ pattern: "Hello", dir_path: testDir.path },
|
||||
{ signal: abortController.signal },
|
||||
);
|
||||
|
||||
expect(result.status).toBe("error");
|
||||
expect(typeof result.toolReturn).toBe("string");
|
||||
expect(result.toolReturn).toContain("Interrupted by user");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -24,6 +24,7 @@ const rgPath = getRipgrepPath();
|
||||
interface GlobArgs {
|
||||
pattern: string;
|
||||
path?: string;
|
||||
signal?: AbortSignal;
|
||||
}
|
||||
|
||||
interface GlobResult {
|
||||
@@ -59,7 +60,7 @@ function applyFileLimit(files: string[], workingDirectory: string): GlobResult {
|
||||
|
||||
export async function glob(args: GlobArgs): Promise<GlobResult> {
|
||||
validateRequiredParams(args, ["pattern"], "Glob");
|
||||
const { pattern, path: searchPath } = args;
|
||||
const { pattern, path: searchPath, signal } = args;
|
||||
|
||||
// Explicit check for undefined/empty pattern (validateRequiredParams only checks key existence)
|
||||
if (!pattern) {
|
||||
@@ -93,6 +94,7 @@ export async function glob(args: GlobArgs): Promise<GlobResult> {
|
||||
const { stdout } = await execFileAsync(rgPath, rgArgs, {
|
||||
maxBuffer: 50 * 1024 * 1024, // 50MB buffer for large file lists
|
||||
cwd: userCwd,
|
||||
signal,
|
||||
});
|
||||
|
||||
const files = stdout.trim().split("\n").filter(Boolean).sort();
|
||||
@@ -104,6 +106,16 @@ export async function glob(args: GlobArgs): Promise<GlobResult> {
|
||||
code?: string | number;
|
||||
};
|
||||
|
||||
const isAbortError =
|
||||
err.name === "AbortError" ||
|
||||
err.code === "ABORT_ERR" ||
|
||||
err.message === "The operation was aborted";
|
||||
if (isAbortError) {
|
||||
throw Object.assign(new Error("The operation was aborted"), {
|
||||
name: "AbortError",
|
||||
});
|
||||
}
|
||||
|
||||
// ripgrep exits with code 1 when no files match - that's not an error
|
||||
if (err.code === 1 || err.code === "1") {
|
||||
return { files: [] };
|
||||
|
||||
@@ -11,6 +11,7 @@ interface GlobGeminiArgs {
|
||||
case_sensitive?: boolean;
|
||||
respect_git_ignore?: boolean;
|
||||
respect_gemini_ignore?: boolean;
|
||||
signal?: AbortSignal;
|
||||
}
|
||||
|
||||
export async function glob_gemini(
|
||||
@@ -20,6 +21,7 @@ export async function glob_gemini(
|
||||
const lettaArgs = {
|
||||
pattern: args.pattern,
|
||||
path: args.dir_path,
|
||||
signal: args.signal,
|
||||
};
|
||||
|
||||
const result = await lettaGlob(lettaArgs);
|
||||
|
||||
@@ -47,6 +47,7 @@ export interface GrepArgs {
|
||||
head_limit?: number;
|
||||
offset?: number;
|
||||
multiline?: boolean;
|
||||
signal?: AbortSignal;
|
||||
}
|
||||
|
||||
interface GrepResult {
|
||||
@@ -71,6 +72,7 @@ export async function grep(args: GrepArgs): Promise<GrepResult> {
|
||||
head_limit = 100,
|
||||
offset = 0,
|
||||
multiline,
|
||||
signal,
|
||||
} = args;
|
||||
|
||||
const userCwd = process.env.USER_CWD || process.cwd();
|
||||
@@ -102,6 +104,7 @@ export async function grep(args: GrepArgs): Promise<GrepResult> {
|
||||
const { stdout } = await execFileAsync(rgPath, rgArgs, {
|
||||
maxBuffer: 10 * 1024 * 1024,
|
||||
cwd: userCwd,
|
||||
signal,
|
||||
});
|
||||
if (output_mode === "files_with_matches") {
|
||||
const allFiles = stdout.trim().split("\n").filter(Boolean);
|
||||
@@ -178,12 +181,21 @@ export async function grep(args: GrepArgs): Promise<GrepResult> {
|
||||
} catch (error) {
|
||||
const err = error as NodeJS.ErrnoException & {
|
||||
stdout?: string;
|
||||
code?: string | number;
|
||||
};
|
||||
const code = typeof err.code === "number" ? err.code : undefined;
|
||||
const _stdout = typeof err.stdout === "string" ? err.stdout : "";
|
||||
const code = err.code !== undefined ? String(err.code) : undefined;
|
||||
const message =
|
||||
typeof err.message === "string" ? err.message : "Unknown error";
|
||||
if (code === 1) {
|
||||
const isAbortError =
|
||||
err.name === "AbortError" ||
|
||||
err.code === "ABORT_ERR" ||
|
||||
err.message === "The operation was aborted";
|
||||
if (isAbortError) {
|
||||
throw Object.assign(new Error("The operation was aborted"), {
|
||||
name: "AbortError",
|
||||
});
|
||||
}
|
||||
if (code === "1") {
|
||||
if (output_mode === "files_with_matches")
|
||||
return { output: "No files found", files: 0 };
|
||||
if (output_mode === "count")
|
||||
|
||||
@@ -6,6 +6,7 @@ interface GrepFilesArgs {
|
||||
include?: string;
|
||||
path?: string;
|
||||
limit?: number;
|
||||
signal?: AbortSignal;
|
||||
}
|
||||
|
||||
interface GrepFilesResult {
|
||||
@@ -26,13 +27,14 @@ export async function grep_files(
|
||||
): Promise<GrepFilesResult> {
|
||||
validateRequiredParams(args, ["pattern"], "grep_files");
|
||||
|
||||
const { pattern, include, path, limit = DEFAULT_LIMIT } = args;
|
||||
const { pattern, include, path, limit = DEFAULT_LIMIT, signal } = args;
|
||||
|
||||
const grepArgs: GrepArgs = {
|
||||
pattern,
|
||||
path,
|
||||
glob: include,
|
||||
output_mode: "files_with_matches",
|
||||
signal,
|
||||
};
|
||||
|
||||
const result = await grep(grepArgs);
|
||||
|
||||
@@ -9,6 +9,7 @@ interface SearchFileContentGeminiArgs {
|
||||
pattern: string;
|
||||
dir_path?: string;
|
||||
include?: string;
|
||||
signal?: AbortSignal;
|
||||
}
|
||||
|
||||
export async function search_file_content(
|
||||
@@ -20,6 +21,7 @@ export async function search_file_content(
|
||||
path: args.dir_path,
|
||||
glob: args.include,
|
||||
output_mode: "content" as const, // Return actual matching lines, not just file paths
|
||||
signal: args.signal,
|
||||
};
|
||||
|
||||
const result = await grep(lettaArgs);
|
||||
|
||||
@@ -46,7 +46,7 @@ const FILE_MODIFYING_TOOLS = new Set([
|
||||
]);
|
||||
|
||||
export const TOOL_NAMES = Object.keys(TOOL_DEFINITIONS) as ToolName[];
|
||||
const STREAMING_SHELL_TOOLS = new Set([
|
||||
const SIGNAL_AWARE_TOOLS = new Set([
|
||||
"Bash",
|
||||
"BashOutput",
|
||||
"TaskOutput",
|
||||
@@ -56,6 +56,14 @@ const STREAMING_SHELL_TOOLS = new Set([
|
||||
"Shell",
|
||||
"run_shell_command",
|
||||
"RunShellCommand",
|
||||
"Glob",
|
||||
"Grep",
|
||||
"grep_files",
|
||||
"GrepFiles",
|
||||
"glob_gemini",
|
||||
"GlobGemini",
|
||||
"search_file_content",
|
||||
"SearchFileContent",
|
||||
]);
|
||||
|
||||
// Maps internal tool names to server/model-facing tool names
|
||||
@@ -1324,13 +1332,22 @@ export async function executeTool(
|
||||
// Inject options for tools that support them without altering schemas
|
||||
let enhancedArgs = args;
|
||||
|
||||
if (STREAMING_SHELL_TOOLS.has(internalName)) {
|
||||
if (options?.signal) {
|
||||
enhancedArgs = { ...enhancedArgs, signal: options.signal };
|
||||
}
|
||||
if (options?.onOutput) {
|
||||
enhancedArgs = { ...enhancedArgs, onOutput: options.onOutput };
|
||||
}
|
||||
if (SIGNAL_AWARE_TOOLS.has(internalName) && options?.signal) {
|
||||
enhancedArgs = { ...enhancedArgs, signal: options.signal };
|
||||
}
|
||||
|
||||
if (
|
||||
(internalName === "Bash" ||
|
||||
internalName === "BashOutput" ||
|
||||
internalName === "shell_command" ||
|
||||
internalName === "ShellCommand" ||
|
||||
internalName === "shell" ||
|
||||
internalName === "Shell" ||
|
||||
internalName === "run_shell_command" ||
|
||||
internalName === "RunShellCommand") &&
|
||||
options?.onOutput
|
||||
) {
|
||||
enhancedArgs = { ...enhancedArgs, onOutput: options.onOutput };
|
||||
}
|
||||
|
||||
// Inject toolCallId and abort signal for Task tool
|
||||
|
||||
Reference in New Issue
Block a user