Merge branch 'main' of https://github.com/letta-ai/letta-code
This commit is contained in:
@@ -185,6 +185,7 @@ get_fm_value() {
|
||||
# Skip skill SKILL.md files — they use a different frontmatter format.
|
||||
for file in $(git diff --cached --name-only --diff-filter=ACM | grep -E '^(memory/)?(system|reference)/.*\\.md$'); do
|
||||
staged=$(git show ":$file")
|
||||
staged=$(printf '%s' "$staged" | tr -d '\r')
|
||||
|
||||
# Frontmatter is required
|
||||
first_line=$(echo "$staged" | head -1)
|
||||
|
||||
@@ -6,6 +6,13 @@ import { shell } from "../tools/impl/Shell.js";
|
||||
|
||||
const isWindows = process.platform === "win32";
|
||||
|
||||
function getEchoCommand(...args: string[]): string[] {
|
||||
if (isWindows) {
|
||||
return ["cmd.exe", "/c", "echo", ...args];
|
||||
}
|
||||
return ["/usr/bin/env", "echo", ...args];
|
||||
}
|
||||
|
||||
describe("shell codex tool", () => {
|
||||
let tempDir: string;
|
||||
|
||||
@@ -18,11 +25,13 @@ describe("shell codex tool", () => {
|
||||
|
||||
test("executes simple command with execvp-style args", async () => {
|
||||
const result = await shell({
|
||||
command: ["echo", "hello", "world"],
|
||||
command: getEchoCommand("hello", "world"),
|
||||
});
|
||||
|
||||
expect(result.output).toBe("hello world");
|
||||
expect(result.stdout).toContain("hello world");
|
||||
expect(result.output.replaceAll('"', "")).toBe("hello world");
|
||||
expect(result.stdout.join(" ").replaceAll('"', "")).toContain(
|
||||
"hello world",
|
||||
);
|
||||
expect(result.stderr.length).toBe(0);
|
||||
});
|
||||
|
||||
@@ -54,10 +63,10 @@ describe("shell codex tool", () => {
|
||||
// This is the key test for execvp semantics - args with spaces
|
||||
// should NOT be split
|
||||
const result = await shell({
|
||||
command: ["echo", "hello world", "foo bar"],
|
||||
command: getEchoCommand("hello world", "foo bar"),
|
||||
});
|
||||
|
||||
expect(result.output).toBe("hello world foo bar");
|
||||
expect(result.output.replaceAll('"', "")).toBe("hello world foo bar");
|
||||
});
|
||||
|
||||
test.skipIf(isWindows)("respects workdir parameter", async () => {
|
||||
@@ -180,7 +189,7 @@ describe("shell codex tool", () => {
|
||||
|
||||
test("handles special characters in arguments", async () => {
|
||||
const result = await shell({
|
||||
command: ["echo", "$HOME", "$(whoami)", "`date`"],
|
||||
command: getEchoCommand("$HOME", "$(whoami)", "`date`"),
|
||||
});
|
||||
|
||||
// Since we're using execvp-style (not shell expansion),
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { afterEach, describe, expect, test } from "bun:test";
|
||||
import { glob } from "../../tools/impl/Glob";
|
||||
import { executeTool, loadSpecificTools } from "../../tools/manager";
|
||||
import { TestDirectory } from "../helpers/testFs";
|
||||
|
||||
describe("Glob tool", () => {
|
||||
@@ -44,4 +45,60 @@ describe("Glob tool", () => {
|
||||
|
||||
expect(result.files).toEqual([]);
|
||||
});
|
||||
|
||||
test("aborts promptly when signal is already aborted", async () => {
|
||||
testDir = new TestDirectory();
|
||||
testDir.createFile("a.ts", "");
|
||||
|
||||
const abortController = new AbortController();
|
||||
abortController.abort();
|
||||
|
||||
await expect(
|
||||
glob({
|
||||
pattern: "**/*.ts",
|
||||
path: testDir.path,
|
||||
signal: abortController.signal,
|
||||
}),
|
||||
).rejects.toMatchObject({ name: "AbortError" });
|
||||
});
|
||||
|
||||
test("manager passes signal through to Glob execution", async () => {
|
||||
testDir = new TestDirectory();
|
||||
testDir.createFile("a.ts", "");
|
||||
|
||||
await loadSpecificTools(["Glob"]);
|
||||
|
||||
const abortController = new AbortController();
|
||||
abortController.abort();
|
||||
|
||||
const result = await executeTool(
|
||||
"Glob",
|
||||
{ pattern: "**/*.ts", path: testDir.path },
|
||||
{ signal: abortController.signal },
|
||||
);
|
||||
|
||||
expect(result.status).toBe("error");
|
||||
expect(typeof result.toolReturn).toBe("string");
|
||||
expect(result.toolReturn).toContain("Interrupted by user");
|
||||
});
|
||||
|
||||
test("manager passes signal through to GlobGemini execution", async () => {
|
||||
testDir = new TestDirectory();
|
||||
testDir.createFile("a.ts", "");
|
||||
|
||||
await loadSpecificTools(["GlobGemini"]);
|
||||
|
||||
const abortController = new AbortController();
|
||||
abortController.abort();
|
||||
|
||||
const result = await executeTool(
|
||||
"GlobGemini",
|
||||
{ pattern: "**/*.ts", dir_path: testDir.path },
|
||||
{ signal: abortController.signal },
|
||||
);
|
||||
|
||||
expect(result.status).toBe("error");
|
||||
expect(typeof result.toolReturn).toBe("string");
|
||||
expect(result.toolReturn).toContain("Interrupted by user");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { afterEach, describe, expect, test } from "bun:test";
|
||||
import { grep } from "../../tools/impl/Grep";
|
||||
import { executeTool, loadSpecificTools } from "../../tools/manager";
|
||||
import { TestDirectory } from "../helpers/testFs";
|
||||
|
||||
describe("Grep tool", () => {
|
||||
@@ -144,4 +145,61 @@ describe("Grep tool", () => {
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
test("aborts promptly when signal is already aborted", async () => {
|
||||
testDir = new TestDirectory();
|
||||
testDir.createFile("test.txt", "Hello World");
|
||||
|
||||
const abortController = new AbortController();
|
||||
abortController.abort();
|
||||
|
||||
await expect(
|
||||
grep({
|
||||
pattern: "World",
|
||||
path: testDir.path,
|
||||
output_mode: "content",
|
||||
signal: abortController.signal,
|
||||
}),
|
||||
).rejects.toMatchObject({ name: "AbortError" });
|
||||
});
|
||||
|
||||
test("manager passes signal through to Grep execution", async () => {
|
||||
testDir = new TestDirectory();
|
||||
testDir.createFile("test.txt", "Hello World");
|
||||
|
||||
await loadSpecificTools(["Grep"]);
|
||||
|
||||
const abortController = new AbortController();
|
||||
abortController.abort();
|
||||
|
||||
const result = await executeTool(
|
||||
"Grep",
|
||||
{ pattern: "World", path: testDir.path, output_mode: "content" },
|
||||
{ signal: abortController.signal },
|
||||
);
|
||||
|
||||
expect(result.status).toBe("error");
|
||||
expect(typeof result.toolReturn).toBe("string");
|
||||
expect(result.toolReturn).toContain("Interrupted by user");
|
||||
});
|
||||
|
||||
test("manager passes signal through to GrepFiles execution", async () => {
|
||||
testDir = new TestDirectory();
|
||||
testDir.createFile("test.txt", "Hello World");
|
||||
|
||||
await loadSpecificTools(["GrepFiles"]);
|
||||
|
||||
const abortController = new AbortController();
|
||||
abortController.abort();
|
||||
|
||||
const result = await executeTool(
|
||||
"GrepFiles",
|
||||
{ pattern: "World", path: testDir.path },
|
||||
{ signal: abortController.signal },
|
||||
);
|
||||
|
||||
expect(result.status).toBe("error");
|
||||
expect(typeof result.toolReturn).toBe("string");
|
||||
expect(result.toolReturn).toContain("Interrupted by user");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { afterEach, describe, expect, test } from "bun:test";
|
||||
import { search_file_content } from "../../tools/impl/SearchFileContentGemini";
|
||||
import { executeTool, loadSpecificTools } from "../../tools/manager";
|
||||
import { TestDirectory } from "../helpers/testFs";
|
||||
|
||||
describe("SearchFileContent tool", () => {
|
||||
@@ -74,4 +75,40 @@ describe("SearchFileContent tool", () => {
|
||||
|
||||
expect(result.message).toContain("Hello World");
|
||||
});
|
||||
|
||||
test("aborts promptly when signal is already aborted", async () => {
|
||||
testDir = new TestDirectory();
|
||||
testDir.createFile("test.txt", "Hello World");
|
||||
|
||||
const abortController = new AbortController();
|
||||
abortController.abort();
|
||||
|
||||
await expect(
|
||||
search_file_content({
|
||||
pattern: "Hello",
|
||||
dir_path: testDir.path,
|
||||
signal: abortController.signal,
|
||||
}),
|
||||
).rejects.toMatchObject({ name: "AbortError" });
|
||||
});
|
||||
|
||||
test("manager passes signal through to SearchFileContent execution", async () => {
|
||||
testDir = new TestDirectory();
|
||||
testDir.createFile("test.txt", "Hello World");
|
||||
|
||||
await loadSpecificTools(["SearchFileContent"]);
|
||||
|
||||
const abortController = new AbortController();
|
||||
abortController.abort();
|
||||
|
||||
const result = await executeTool(
|
||||
"SearchFileContent",
|
||||
{ pattern: "Hello", dir_path: testDir.path },
|
||||
{ signal: abortController.signal },
|
||||
);
|
||||
|
||||
expect(result.status).toBe("error");
|
||||
expect(typeof result.toolReturn).toBe("string");
|
||||
expect(result.toolReturn).toContain("Interrupted by user");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test";
|
||||
import WebSocket from "ws";
|
||||
import type { ResumeData } from "../../agent/check-approval";
|
||||
import { permissionMode } from "../../permissions/mode";
|
||||
import type {
|
||||
MessageQueueItem,
|
||||
@@ -68,13 +67,6 @@ const getClientMock = mock(async () => ({
|
||||
cancel: cancelConversationMock,
|
||||
},
|
||||
}));
|
||||
const getResumeDataMock = mock(
|
||||
async (): Promise<ResumeData> => ({
|
||||
pendingApproval: null,
|
||||
pendingApprovals: [],
|
||||
messageHistory: [],
|
||||
}),
|
||||
);
|
||||
const classifyApprovalsMock = mock(async () => ({
|
||||
autoAllowed: [],
|
||||
autoDenied: [],
|
||||
@@ -203,7 +195,6 @@ describe("listen-client multi-worker concurrency", () => {
|
||||
drainStreamWithResumeMock.mockClear();
|
||||
getClientMock.mockClear();
|
||||
retrieveAgentMock.mockClear();
|
||||
getResumeDataMock.mockClear();
|
||||
classifyApprovalsMock.mockClear();
|
||||
executeApprovalBatchMock.mockClear();
|
||||
cancelConversationMock.mockClear();
|
||||
@@ -706,11 +697,6 @@ describe("listen-client multi-worker concurrency", () => {
|
||||
status: "success",
|
||||
};
|
||||
|
||||
getResumeDataMock.mockResolvedValueOnce({
|
||||
pendingApproval: approval,
|
||||
pendingApprovals: [approval],
|
||||
messageHistory: [],
|
||||
});
|
||||
classifyApprovalsMock.mockResolvedValueOnce({
|
||||
autoAllowed: [
|
||||
{
|
||||
@@ -757,7 +743,13 @@ describe("listen-client multi-worker concurrency", () => {
|
||||
runtime,
|
||||
socket as unknown as WebSocket,
|
||||
new AbortController().signal,
|
||||
{ getResumeData: getResumeDataMock },
|
||||
{
|
||||
getResumeData: async () => ({
|
||||
pendingApproval: approval,
|
||||
pendingApprovals: [approval],
|
||||
messageHistory: [],
|
||||
}),
|
||||
},
|
||||
);
|
||||
|
||||
await waitFor(() => sendMessageStreamMock.mock.calls.length === 1);
|
||||
@@ -862,6 +854,77 @@ describe("listen-client multi-worker concurrency", () => {
|
||||
expect(listener.conversationRuntimes.has(runtimeA.key)).toBe(true);
|
||||
});
|
||||
|
||||
test("stale approval response after approval-only interrupt unlatches cancelRequested and allows queue pump", async () => {
|
||||
const listener = __listenClientTestUtils.createListenerRuntime();
|
||||
__listenClientTestUtils.setActiveRuntime(listener);
|
||||
const runtime = __listenClientTestUtils.getOrCreateScopedRuntime(
|
||||
listener,
|
||||
"agent-1",
|
||||
"conv-a",
|
||||
);
|
||||
const socket = new MockSocket();
|
||||
|
||||
runtime.cancelRequested = true;
|
||||
runtime.isProcessing = false;
|
||||
runtime.loopStatus = "WAITING_ON_INPUT";
|
||||
|
||||
const queueInput = {
|
||||
kind: "message",
|
||||
source: "user",
|
||||
content: "queued after stale approval",
|
||||
clientMessageId: "cm-stale-approval",
|
||||
agentId: "agent-1",
|
||||
conversationId: "conv-a",
|
||||
} satisfies Omit<MessageQueueItem, "id" | "enqueuedAt">;
|
||||
const item = runtime.queueRuntime.enqueue(queueInput);
|
||||
if (!item) {
|
||||
throw new Error("Expected queued item to be created");
|
||||
}
|
||||
runtime.queuedMessagesByItemId.set(
|
||||
item.id,
|
||||
makeIncomingMessage("agent-1", "conv-a", "queued after stale approval"),
|
||||
);
|
||||
|
||||
const scheduleQueuePumpMock = mock(() => {
|
||||
__listenClientTestUtils.scheduleQueuePump(
|
||||
runtime,
|
||||
socket as unknown as WebSocket,
|
||||
{} as never,
|
||||
async () => {},
|
||||
);
|
||||
});
|
||||
|
||||
const handled = await __listenClientTestUtils.handleApprovalResponseInput(
|
||||
listener,
|
||||
{
|
||||
runtime: { agent_id: "agent-1", conversation_id: "conv-a" },
|
||||
response: {
|
||||
request_id: "perm-stale-after-approval-only-interrupt",
|
||||
decision: { behavior: "allow" },
|
||||
},
|
||||
socket: socket as unknown as WebSocket,
|
||||
opts: {
|
||||
onStatusChange: undefined,
|
||||
connectionId: "conn-1",
|
||||
},
|
||||
processQueuedTurn: async () => {},
|
||||
},
|
||||
{
|
||||
resolveRuntimeForApprovalRequest: () => null,
|
||||
resolvePendingApprovalResolver: () => false,
|
||||
getOrCreateScopedRuntime: () => runtime,
|
||||
resolveRecoveredApprovalResponse: async () => false,
|
||||
scheduleQueuePump: scheduleQueuePumpMock,
|
||||
},
|
||||
);
|
||||
|
||||
expect(handled).toBe(false);
|
||||
expect(runtime.cancelRequested).toBe(false);
|
||||
expect(scheduleQueuePumpMock).toHaveBeenCalledTimes(1);
|
||||
await waitFor(() => runtime.queuePumpScheduled === false);
|
||||
expect(runtime.queueRuntime.length).toBe(0);
|
||||
});
|
||||
|
||||
test("change_device_state command holds queued input until the tracked command completes", async () => {
|
||||
const listener = __listenClientTestUtils.createListenerRuntime();
|
||||
__listenClientTestUtils.setActiveRuntime(listener);
|
||||
|
||||
@@ -277,6 +277,35 @@ describe("listen-client approval resolver wiring", () => {
|
||||
expect(runtime.pendingApprovalResolvers.size).toBe(0);
|
||||
});
|
||||
|
||||
test("resolving final approval response restores WAITING_ON_INPUT even while processing stays true", async () => {
|
||||
const runtime = __listenClientTestUtils.createRuntime();
|
||||
const socket = new MockSocket(WebSocket.OPEN);
|
||||
const requestId = "perm-processing";
|
||||
|
||||
runtime.isProcessing = true;
|
||||
runtime.loopStatus = "WAITING_ON_APPROVAL" as never;
|
||||
|
||||
const pending = requestApprovalOverWS(
|
||||
runtime,
|
||||
socket as unknown as WebSocket,
|
||||
requestId,
|
||||
makeControlRequest(requestId),
|
||||
);
|
||||
|
||||
const resolved = resolvePendingApprovalResolver(
|
||||
runtime,
|
||||
makeSuccessResponse(requestId),
|
||||
);
|
||||
|
||||
expect(resolved).toBe(true);
|
||||
await expect(pending).resolves.toMatchObject({
|
||||
request_id: requestId,
|
||||
decision: { behavior: "allow" },
|
||||
});
|
||||
expect(String(runtime.loopStatus)).toBe("WAITING_ON_INPUT");
|
||||
expect(runtime.isProcessing).toBe(true);
|
||||
});
|
||||
|
||||
test("ignores non-matching request_id and keeps pending resolver", async () => {
|
||||
const runtime = __listenClientTestUtils.createRuntime();
|
||||
const socket = new MockSocket(WebSocket.OPEN);
|
||||
@@ -329,8 +358,9 @@ describe("listen-client approval resolver wiring", () => {
|
||||
await expect(second).rejects.toThrow("socket closed");
|
||||
});
|
||||
|
||||
test("cleanup resets WAITING_ON_INPUT instead of restoring fake processing", async () => {
|
||||
test("cleanup resets loop status to WAITING_ON_INPUT even while processing stays true", async () => {
|
||||
const runtime = __listenClientTestUtils.createRuntime();
|
||||
|
||||
runtime.isProcessing = true;
|
||||
runtime.loopStatus = "WAITING_ON_APPROVAL";
|
||||
|
||||
@@ -341,6 +371,7 @@ describe("listen-client approval resolver wiring", () => {
|
||||
rejectPendingApprovalResolvers(runtime, "socket closed");
|
||||
|
||||
expect(runtime.loopStatus as string).toBe("WAITING_ON_INPUT");
|
||||
expect(runtime.isProcessing).toBe(true);
|
||||
await expect(pending).rejects.toThrow("socket closed");
|
||||
});
|
||||
|
||||
@@ -1302,6 +1333,59 @@ describe("listen-client capability-gated approval flow", () => {
|
||||
expect.any(Function),
|
||||
);
|
||||
});
|
||||
|
||||
test("stale approval responses after interrupt are benign and do not mutate runtime state", async () => {
|
||||
const listener = __listenClientTestUtils.createListenerRuntime();
|
||||
const targetRuntime =
|
||||
__listenClientTestUtils.getOrCreateConversationRuntime(
|
||||
listener,
|
||||
"agent-1",
|
||||
"default",
|
||||
);
|
||||
targetRuntime.cancelRequested = true;
|
||||
targetRuntime.loopStatus = "WAITING_ON_INPUT";
|
||||
targetRuntime.isProcessing = false;
|
||||
|
||||
const socket = new MockSocket(WebSocket.OPEN);
|
||||
const scheduleQueuePumpMock = mock(() => {});
|
||||
const resolveRecoveredApprovalResponseMock = mock(async () => false);
|
||||
|
||||
const handled = await __listenClientTestUtils.handleApprovalResponseInput(
|
||||
listener,
|
||||
{
|
||||
runtime: { agent_id: "agent-1", conversation_id: "default" },
|
||||
response: {
|
||||
request_id: "perm-stale-after-interrupt",
|
||||
decision: { behavior: "allow" },
|
||||
},
|
||||
socket: socket as unknown as WebSocket,
|
||||
opts: {
|
||||
onStatusChange: undefined,
|
||||
connectionId: "conn-1",
|
||||
},
|
||||
processQueuedTurn: async () => {},
|
||||
},
|
||||
{
|
||||
resolveRuntimeForApprovalRequest: () => null,
|
||||
resolvePendingApprovalResolver: () => false,
|
||||
getOrCreateScopedRuntime: () => targetRuntime,
|
||||
resolveRecoveredApprovalResponse: resolveRecoveredApprovalResponseMock,
|
||||
scheduleQueuePump: scheduleQueuePumpMock,
|
||||
},
|
||||
);
|
||||
|
||||
expect(handled).toBe(false);
|
||||
expect(resolveRecoveredApprovalResponseMock).not.toHaveBeenCalled();
|
||||
expect(scheduleQueuePumpMock).toHaveBeenCalledWith(
|
||||
targetRuntime,
|
||||
socket,
|
||||
expect.objectContaining({ connectionId: "conn-1" }),
|
||||
expect.any(Function),
|
||||
);
|
||||
expect(targetRuntime.cancelRequested).toBe(false);
|
||||
expect(targetRuntime.loopStatus).toBe("WAITING_ON_INPUT");
|
||||
expect(targetRuntime.isProcessing).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("listen-client approval recovery batch correlation", () => {
|
||||
|
||||
@@ -24,6 +24,7 @@ const rgPath = getRipgrepPath();
|
||||
interface GlobArgs {
|
||||
pattern: string;
|
||||
path?: string;
|
||||
signal?: AbortSignal;
|
||||
}
|
||||
|
||||
interface GlobResult {
|
||||
@@ -59,7 +60,7 @@ function applyFileLimit(files: string[], workingDirectory: string): GlobResult {
|
||||
|
||||
export async function glob(args: GlobArgs): Promise<GlobResult> {
|
||||
validateRequiredParams(args, ["pattern"], "Glob");
|
||||
const { pattern, path: searchPath } = args;
|
||||
const { pattern, path: searchPath, signal } = args;
|
||||
|
||||
// Explicit check for undefined/empty pattern (validateRequiredParams only checks key existence)
|
||||
if (!pattern) {
|
||||
@@ -93,6 +94,7 @@ export async function glob(args: GlobArgs): Promise<GlobResult> {
|
||||
const { stdout } = await execFileAsync(rgPath, rgArgs, {
|
||||
maxBuffer: 50 * 1024 * 1024, // 50MB buffer for large file lists
|
||||
cwd: userCwd,
|
||||
signal,
|
||||
});
|
||||
|
||||
const files = stdout.trim().split("\n").filter(Boolean).sort();
|
||||
@@ -104,6 +106,16 @@ export async function glob(args: GlobArgs): Promise<GlobResult> {
|
||||
code?: string | number;
|
||||
};
|
||||
|
||||
const isAbortError =
|
||||
err.name === "AbortError" ||
|
||||
err.code === "ABORT_ERR" ||
|
||||
err.message === "The operation was aborted";
|
||||
if (isAbortError) {
|
||||
throw Object.assign(new Error("The operation was aborted"), {
|
||||
name: "AbortError",
|
||||
});
|
||||
}
|
||||
|
||||
// ripgrep exits with code 1 when no files match - that's not an error
|
||||
if (err.code === 1 || err.code === "1") {
|
||||
return { files: [] };
|
||||
|
||||
@@ -11,6 +11,7 @@ interface GlobGeminiArgs {
|
||||
case_sensitive?: boolean;
|
||||
respect_git_ignore?: boolean;
|
||||
respect_gemini_ignore?: boolean;
|
||||
signal?: AbortSignal;
|
||||
}
|
||||
|
||||
export async function glob_gemini(
|
||||
@@ -20,6 +21,7 @@ export async function glob_gemini(
|
||||
const lettaArgs = {
|
||||
pattern: args.pattern,
|
||||
path: args.dir_path,
|
||||
signal: args.signal,
|
||||
};
|
||||
|
||||
const result = await lettaGlob(lettaArgs);
|
||||
|
||||
@@ -47,6 +47,7 @@ export interface GrepArgs {
|
||||
head_limit?: number;
|
||||
offset?: number;
|
||||
multiline?: boolean;
|
||||
signal?: AbortSignal;
|
||||
}
|
||||
|
||||
interface GrepResult {
|
||||
@@ -71,6 +72,7 @@ export async function grep(args: GrepArgs): Promise<GrepResult> {
|
||||
head_limit = 100,
|
||||
offset = 0,
|
||||
multiline,
|
||||
signal,
|
||||
} = args;
|
||||
|
||||
const userCwd = process.env.USER_CWD || process.cwd();
|
||||
@@ -102,6 +104,7 @@ export async function grep(args: GrepArgs): Promise<GrepResult> {
|
||||
const { stdout } = await execFileAsync(rgPath, rgArgs, {
|
||||
maxBuffer: 10 * 1024 * 1024,
|
||||
cwd: userCwd,
|
||||
signal,
|
||||
});
|
||||
if (output_mode === "files_with_matches") {
|
||||
const allFiles = stdout.trim().split("\n").filter(Boolean);
|
||||
@@ -178,12 +181,21 @@ export async function grep(args: GrepArgs): Promise<GrepResult> {
|
||||
} catch (error) {
|
||||
const err = error as NodeJS.ErrnoException & {
|
||||
stdout?: string;
|
||||
code?: string | number;
|
||||
};
|
||||
const code = typeof err.code === "number" ? err.code : undefined;
|
||||
const _stdout = typeof err.stdout === "string" ? err.stdout : "";
|
||||
const code = err.code !== undefined ? String(err.code) : undefined;
|
||||
const message =
|
||||
typeof err.message === "string" ? err.message : "Unknown error";
|
||||
if (code === 1) {
|
||||
const isAbortError =
|
||||
err.name === "AbortError" ||
|
||||
err.code === "ABORT_ERR" ||
|
||||
err.message === "The operation was aborted";
|
||||
if (isAbortError) {
|
||||
throw Object.assign(new Error("The operation was aborted"), {
|
||||
name: "AbortError",
|
||||
});
|
||||
}
|
||||
if (code === "1") {
|
||||
if (output_mode === "files_with_matches")
|
||||
return { output: "No files found", files: 0 };
|
||||
if (output_mode === "count")
|
||||
|
||||
@@ -6,6 +6,7 @@ interface GrepFilesArgs {
|
||||
include?: string;
|
||||
path?: string;
|
||||
limit?: number;
|
||||
signal?: AbortSignal;
|
||||
}
|
||||
|
||||
interface GrepFilesResult {
|
||||
@@ -26,13 +27,14 @@ export async function grep_files(
|
||||
): Promise<GrepFilesResult> {
|
||||
validateRequiredParams(args, ["pattern"], "grep_files");
|
||||
|
||||
const { pattern, include, path, limit = DEFAULT_LIMIT } = args;
|
||||
const { pattern, include, path, limit = DEFAULT_LIMIT, signal } = args;
|
||||
|
||||
const grepArgs: GrepArgs = {
|
||||
pattern,
|
||||
path,
|
||||
glob: include,
|
||||
output_mode: "files_with_matches",
|
||||
signal,
|
||||
};
|
||||
|
||||
const result = await grep(grepArgs);
|
||||
|
||||
@@ -9,6 +9,7 @@ interface SearchFileContentGeminiArgs {
|
||||
pattern: string;
|
||||
dir_path?: string;
|
||||
include?: string;
|
||||
signal?: AbortSignal;
|
||||
}
|
||||
|
||||
export async function search_file_content(
|
||||
@@ -20,6 +21,7 @@ export async function search_file_content(
|
||||
path: args.dir_path,
|
||||
glob: args.include,
|
||||
output_mode: "content" as const, // Return actual matching lines, not just file paths
|
||||
signal: args.signal,
|
||||
};
|
||||
|
||||
const result = await grep(lettaArgs);
|
||||
|
||||
@@ -46,7 +46,7 @@ const FILE_MODIFYING_TOOLS = new Set([
|
||||
]);
|
||||
|
||||
export const TOOL_NAMES = Object.keys(TOOL_DEFINITIONS) as ToolName[];
|
||||
const STREAMING_SHELL_TOOLS = new Set([
|
||||
const SIGNAL_AWARE_TOOLS = new Set([
|
||||
"Bash",
|
||||
"BashOutput",
|
||||
"TaskOutput",
|
||||
@@ -56,6 +56,14 @@ const STREAMING_SHELL_TOOLS = new Set([
|
||||
"Shell",
|
||||
"run_shell_command",
|
||||
"RunShellCommand",
|
||||
"Glob",
|
||||
"Grep",
|
||||
"grep_files",
|
||||
"GrepFiles",
|
||||
"glob_gemini",
|
||||
"GlobGemini",
|
||||
"search_file_content",
|
||||
"SearchFileContent",
|
||||
]);
|
||||
|
||||
// Maps internal tool names to server/model-facing tool names
|
||||
@@ -1324,13 +1332,22 @@ export async function executeTool(
|
||||
// Inject options for tools that support them without altering schemas
|
||||
let enhancedArgs = args;
|
||||
|
||||
if (STREAMING_SHELL_TOOLS.has(internalName)) {
|
||||
if (options?.signal) {
|
||||
enhancedArgs = { ...enhancedArgs, signal: options.signal };
|
||||
}
|
||||
if (options?.onOutput) {
|
||||
enhancedArgs = { ...enhancedArgs, onOutput: options.onOutput };
|
||||
}
|
||||
if (SIGNAL_AWARE_TOOLS.has(internalName) && options?.signal) {
|
||||
enhancedArgs = { ...enhancedArgs, signal: options.signal };
|
||||
}
|
||||
|
||||
if (
|
||||
(internalName === "Bash" ||
|
||||
internalName === "BashOutput" ||
|
||||
internalName === "shell_command" ||
|
||||
internalName === "ShellCommand" ||
|
||||
internalName === "shell" ||
|
||||
internalName === "Shell" ||
|
||||
internalName === "run_shell_command" ||
|
||||
internalName === "RunShellCommand") &&
|
||||
options?.onOutput
|
||||
) {
|
||||
enhancedArgs = { ...enhancedArgs, onOutput: options.onOutput };
|
||||
}
|
||||
|
||||
// Inject toolCallId and abort signal for Task tool
|
||||
|
||||
@@ -12,7 +12,14 @@ export type ListenerQueueGatingConditions = {
|
||||
export function getListenerBlockedReason(
|
||||
c: ListenerQueueGatingConditions,
|
||||
): QueueBlockedReason | null {
|
||||
if (c.cancelRequested) return "interrupt_in_progress";
|
||||
if (
|
||||
c.cancelRequested &&
|
||||
(c.isProcessing ||
|
||||
c.isRecoveringApprovals ||
|
||||
c.loopStatus !== "WAITING_ON_INPUT")
|
||||
) {
|
||||
return "interrupt_in_progress";
|
||||
}
|
||||
if (c.pendingApprovalsLen > 0) return "pending_approvals";
|
||||
if (c.isRecoveringApprovals) return "runtime_busy";
|
||||
if (c.loopStatus === "WAITING_ON_APPROVAL") return "pending_approvals";
|
||||
|
||||
@@ -339,6 +339,18 @@ async function handleApprovalResponseInput(
|
||||
params.runtime.agent_id,
|
||||
params.runtime.conversation_id,
|
||||
);
|
||||
|
||||
if (targetRuntime.cancelRequested && !targetRuntime.isProcessing) {
|
||||
targetRuntime.cancelRequested = false;
|
||||
deps.scheduleQueuePump(
|
||||
targetRuntime,
|
||||
params.socket,
|
||||
params.opts as StartListenerOptions,
|
||||
params.processQueuedTurn,
|
||||
);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (
|
||||
await deps.resolveRecoveredApprovalResponse(
|
||||
targetRuntime,
|
||||
@@ -422,11 +434,9 @@ async function handleChangeDeviceStateInput(
|
||||
const shouldTrackCommand =
|
||||
!scopedRuntime.isProcessing &&
|
||||
resolvedDeps.getPendingControlRequestCount(listener, scope) === 0;
|
||||
|
||||
if (shouldTrackCommand) {
|
||||
resolvedDeps.setLoopStatus(scopedRuntime, "EXECUTING_COMMAND", scope);
|
||||
}
|
||||
|
||||
try {
|
||||
if (params.command.payload.mode) {
|
||||
resolvedDeps.handleModeChange(
|
||||
@@ -436,7 +446,6 @@ async function handleChangeDeviceStateInput(
|
||||
scope,
|
||||
);
|
||||
}
|
||||
|
||||
if (params.command.payload.cwd) {
|
||||
await resolvedDeps.handleCwdChange(
|
||||
{
|
||||
@@ -949,11 +958,9 @@ async function connectWithRetry(
|
||||
...scopedRuntime.activeExecutingToolCallIds,
|
||||
];
|
||||
}
|
||||
if (
|
||||
scopedRuntime.activeAbortController &&
|
||||
!scopedRuntime.activeAbortController.signal.aborted
|
||||
) {
|
||||
scopedRuntime.activeAbortController.abort();
|
||||
const activeAbortController = scopedRuntime.activeAbortController;
|
||||
if (activeAbortController && !activeAbortController.signal.aborted) {
|
||||
activeAbortController.abort();
|
||||
}
|
||||
const recoveredApprovalState = getRecoveredApprovalStateForScope(
|
||||
runtime,
|
||||
@@ -977,6 +984,10 @@ async function connectWithRetry(
|
||||
});
|
||||
}
|
||||
|
||||
if (!hasActiveTurn) {
|
||||
scopedRuntime.cancelRequested = false;
|
||||
}
|
||||
|
||||
// Backend cancel parity with TUI (App.tsx:5932-5941).
|
||||
// Fire-and-forget — local cancel + queued results are the primary mechanism.
|
||||
const cancelConversationId = scopedRuntime.conversationId;
|
||||
|
||||
@@ -250,10 +250,6 @@ function buildQueuedTurnMessage(
|
||||
};
|
||||
}
|
||||
|
||||
export function shouldQueueInboundMessage(parsed: IncomingMessage): boolean {
|
||||
return parsed.messages.some((payload) => "content" in payload);
|
||||
}
|
||||
|
||||
export function consumeQueuedTurn(runtime: ConversationRuntime): {
|
||||
dequeuedBatch: DequeuedBatch;
|
||||
queuedTurn: IncomingMessage;
|
||||
@@ -279,7 +275,10 @@ export function consumeQueuedTurn(runtime: ConversationRuntime): {
|
||||
}
|
||||
}
|
||||
|
||||
if (!hasMessage || queueLen === 0) {
|
||||
if (!hasMessage) {
|
||||
return null;
|
||||
}
|
||||
if (queueLen === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -299,6 +298,10 @@ export function consumeQueuedTurn(runtime: ConversationRuntime): {
|
||||
};
|
||||
}
|
||||
|
||||
export function shouldQueueInboundMessage(parsed: IncomingMessage): boolean {
|
||||
return parsed.messages.some((payload) => "content" in payload);
|
||||
}
|
||||
|
||||
function computeListenerQueueBlockedReason(
|
||||
runtime: ConversationRuntime,
|
||||
): QueueBlockedReason | null {
|
||||
|
||||
@@ -225,11 +225,9 @@ export function clearConversationRuntimeState(
|
||||
runtime: ConversationRuntime,
|
||||
): void {
|
||||
runtime.cancelRequested = true;
|
||||
if (
|
||||
runtime.activeAbortController &&
|
||||
!runtime.activeAbortController.signal.aborted
|
||||
) {
|
||||
runtime.activeAbortController.abort();
|
||||
const activeAbortController = runtime.activeAbortController;
|
||||
if (activeAbortController && !activeAbortController.signal.aborted) {
|
||||
activeAbortController.abort();
|
||||
}
|
||||
runtime.pendingApprovalBatchByToolCallId.clear();
|
||||
runtime.pendingInterruptedResults = null;
|
||||
|
||||
@@ -95,6 +95,7 @@ export async function handleApprovalStop(params: {
|
||||
currentInput: Array<MessageCreate | ApprovalCreate>;
|
||||
pendingNormalizationInterruptedToolCallIds: string[];
|
||||
turnToolContextId: string | null;
|
||||
abortSignal: AbortSignal;
|
||||
buildSendOptions: () => Parameters<
|
||||
typeof sendApprovalContinuationWithRetry
|
||||
>[2];
|
||||
@@ -112,13 +113,9 @@ export async function handleApprovalStop(params: {
|
||||
msgRunIds,
|
||||
currentInput,
|
||||
turnToolContextId,
|
||||
abortSignal,
|
||||
buildSendOptions,
|
||||
} = params;
|
||||
const abortController = runtime.activeAbortController;
|
||||
|
||||
if (!abortController) {
|
||||
throw new Error("Missing active abort controller during approval handling");
|
||||
}
|
||||
|
||||
if (approvals.length === 0) {
|
||||
runtime.lastStopReason = "error";
|
||||
@@ -283,7 +280,7 @@ export async function handleApprovalStop(params: {
|
||||
|
||||
const executionResults = await executeApprovalBatch(decisions, undefined, {
|
||||
toolContextId: turnToolContextId ?? undefined,
|
||||
abortSignal: abortController.signal,
|
||||
abortSignal,
|
||||
workingDirectory: turnWorkingDirectory,
|
||||
});
|
||||
const persistedExecutionResults = normalizeExecutionResultsForInterruptParity(
|
||||
@@ -331,7 +328,6 @@ export async function handleApprovalStop(params: {
|
||||
nextInput.push(...queuedTurn.messages);
|
||||
emitDequeuedUserMessage(socket, runtime, queuedTurn, dequeuedBatch);
|
||||
}
|
||||
|
||||
setLoopStatus(runtime, "SENDING_API_REQUEST", {
|
||||
agent_id: agentId,
|
||||
conversation_id: conversationId,
|
||||
@@ -342,7 +338,7 @@ export async function handleApprovalStop(params: {
|
||||
buildSendOptions(),
|
||||
socket,
|
||||
runtime,
|
||||
abortController.signal,
|
||||
abortSignal,
|
||||
);
|
||||
if (!stream) {
|
||||
return {
|
||||
|
||||
@@ -122,7 +122,9 @@ export async function handleIncomingMessage(
|
||||
|
||||
runtime.isProcessing = true;
|
||||
runtime.cancelRequested = false;
|
||||
runtime.activeAbortController = new AbortController();
|
||||
const turnAbortController = new AbortController();
|
||||
runtime.activeAbortController = turnAbortController;
|
||||
const turnAbortSignal = turnAbortController.signal;
|
||||
runtime.activeWorkingDirectory = turnWorkingDirectory;
|
||||
runtime.activeRunId = null;
|
||||
runtime.activeRunStartedAt = new Date().toISOString();
|
||||
@@ -240,7 +242,7 @@ export async function handleIncomingMessage(
|
||||
buildSendOptions(),
|
||||
socket,
|
||||
runtime,
|
||||
runtime.activeAbortController.signal,
|
||||
turnAbortSignal,
|
||||
)
|
||||
: await sendMessageStreamWithRetry(
|
||||
conversationId,
|
||||
@@ -248,7 +250,7 @@ export async function handleIncomingMessage(
|
||||
buildSendOptions(),
|
||||
socket,
|
||||
runtime,
|
||||
runtime.activeAbortController.signal,
|
||||
turnAbortSignal,
|
||||
);
|
||||
if (!stream) {
|
||||
return;
|
||||
@@ -275,7 +277,7 @@ export async function handleIncomingMessage(
|
||||
stream as Stream<LettaStreamingResponse>,
|
||||
buffers,
|
||||
() => {},
|
||||
runtime.activeAbortController.signal,
|
||||
turnAbortSignal,
|
||||
undefined,
|
||||
({ chunk, shouldOutput, errorInfo }) => {
|
||||
const maybeRunId = (chunk as { run_id?: unknown }).run_id;
|
||||
@@ -428,7 +430,7 @@ export async function handleIncomingMessage(
|
||||
buildSendOptions(),
|
||||
socket,
|
||||
runtime,
|
||||
runtime.activeAbortController.signal,
|
||||
turnAbortSignal,
|
||||
)
|
||||
: await sendMessageStreamWithRetry(
|
||||
conversationId,
|
||||
@@ -436,7 +438,7 @@ export async function handleIncomingMessage(
|
||||
buildSendOptions(),
|
||||
socket,
|
||||
runtime,
|
||||
runtime.activeAbortController.signal,
|
||||
turnAbortSignal,
|
||||
);
|
||||
if (!stream) {
|
||||
return;
|
||||
@@ -492,7 +494,7 @@ export async function handleIncomingMessage(
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, delayMs));
|
||||
if (runtime.activeAbortController.signal.aborted) {
|
||||
if (turnAbortSignal.aborted) {
|
||||
throw new Error("Cancelled by user");
|
||||
}
|
||||
|
||||
@@ -511,7 +513,7 @@ export async function handleIncomingMessage(
|
||||
buildSendOptions(),
|
||||
socket,
|
||||
runtime,
|
||||
runtime.activeAbortController.signal,
|
||||
turnAbortSignal,
|
||||
)
|
||||
: await sendMessageStreamWithRetry(
|
||||
conversationId,
|
||||
@@ -519,7 +521,7 @@ export async function handleIncomingMessage(
|
||||
buildSendOptions(),
|
||||
socket,
|
||||
runtime,
|
||||
runtime.activeAbortController.signal,
|
||||
turnAbortSignal,
|
||||
);
|
||||
if (!stream) {
|
||||
return;
|
||||
@@ -563,7 +565,7 @@ export async function handleIncomingMessage(
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, delayMs));
|
||||
if (runtime.activeAbortController.signal.aborted) {
|
||||
if (turnAbortSignal.aborted) {
|
||||
throw new Error("Cancelled by user");
|
||||
}
|
||||
|
||||
@@ -582,7 +584,7 @@ export async function handleIncomingMessage(
|
||||
buildSendOptions(),
|
||||
socket,
|
||||
runtime,
|
||||
runtime.activeAbortController.signal,
|
||||
turnAbortSignal,
|
||||
)
|
||||
: await sendMessageStreamWithRetry(
|
||||
conversationId,
|
||||
@@ -590,7 +592,7 @@ export async function handleIncomingMessage(
|
||||
buildSendOptions(),
|
||||
socket,
|
||||
runtime,
|
||||
runtime.activeAbortController.signal,
|
||||
turnAbortSignal,
|
||||
);
|
||||
if (!stream) {
|
||||
return;
|
||||
@@ -672,6 +674,7 @@ export async function handleIncomingMessage(
|
||||
currentInput,
|
||||
pendingNormalizationInterruptedToolCallIds,
|
||||
turnToolContextId,
|
||||
abortSignal: turnAbortSignal,
|
||||
buildSendOptions,
|
||||
});
|
||||
if (approvalResult.terminated || !approvalResult.stream) {
|
||||
|
||||
Reference in New Issue
Block a user