This reverts commit 96a2984ad2, reversing
changes made to 12e61d5b18.
This commit is contained in:
christinatong01
2026-03-23 17:31:32 -07:00
parent 96a2984ad2
commit 524893174e
19 changed files with 74 additions and 448 deletions

View File

@@ -185,7 +185,6 @@ get_fm_value() {
# Skip skill SKILL.md files — they use a different frontmatter format.
for file in $(git diff --cached --name-only --diff-filter=ACM | grep -E '^(memory/)?(system|reference)/.*\\.md$'); do
staged=$(git show ":$file")
staged=$(printf '%s' "$staged" | tr -d '\r')
# Frontmatter is required
first_line=$(echo "$staged" | head -1)

View File

@@ -6,13 +6,6 @@ import { shell } from "../tools/impl/Shell.js";
const isWindows = process.platform === "win32";
function getEchoCommand(...args: string[]): string[] {
if (isWindows) {
return ["cmd.exe", "/c", "echo", ...args];
}
return ["/usr/bin/env", "echo", ...args];
}
describe("shell codex tool", () => {
let tempDir: string;
@@ -25,13 +18,11 @@ describe("shell codex tool", () => {
test("executes simple command with execvp-style args", async () => {
const result = await shell({
command: getEchoCommand("hello", "world"),
command: ["echo", "hello", "world"],
});
expect(result.output.replaceAll('"', "")).toBe("hello world");
expect(result.stdout.join(" ").replaceAll('"', "")).toContain(
"hello world",
);
expect(result.output).toBe("hello world");
expect(result.stdout).toContain("hello world");
expect(result.stderr.length).toBe(0);
});
@@ -63,10 +54,10 @@ describe("shell codex tool", () => {
// This is the key test for execvp semantics - args with spaces
// should NOT be split
const result = await shell({
command: getEchoCommand("hello world", "foo bar"),
command: ["echo", "hello world", "foo bar"],
});
expect(result.output.replaceAll('"', "")).toBe("hello world foo bar");
expect(result.output).toBe("hello world foo bar");
});
test.skipIf(isWindows)("respects workdir parameter", async () => {
@@ -189,7 +180,7 @@ describe("shell codex tool", () => {
test("handles special characters in arguments", async () => {
const result = await shell({
command: getEchoCommand("$HOME", "$(whoami)", "`date`"),
command: ["echo", "$HOME", "$(whoami)", "`date`"],
});
// Since we're using execvp-style (not shell expansion),

View File

@@ -1,6 +1,5 @@
import { afterEach, describe, expect, test } from "bun:test";
import { glob } from "../../tools/impl/Glob";
import { executeTool, loadSpecificTools } from "../../tools/manager";
import { TestDirectory } from "../helpers/testFs";
describe("Glob tool", () => {
@@ -45,60 +44,4 @@ describe("Glob tool", () => {
expect(result.files).toEqual([]);
});
test("aborts promptly when signal is already aborted", async () => {
testDir = new TestDirectory();
testDir.createFile("a.ts", "");
const abortController = new AbortController();
abortController.abort();
await expect(
glob({
pattern: "**/*.ts",
path: testDir.path,
signal: abortController.signal,
}),
).rejects.toMatchObject({ name: "AbortError" });
});
test("manager passes signal through to Glob execution", async () => {
testDir = new TestDirectory();
testDir.createFile("a.ts", "");
await loadSpecificTools(["Glob"]);
const abortController = new AbortController();
abortController.abort();
const result = await executeTool(
"Glob",
{ pattern: "**/*.ts", path: testDir.path },
{ signal: abortController.signal },
);
expect(result.status).toBe("error");
expect(typeof result.toolReturn).toBe("string");
expect(result.toolReturn).toContain("Interrupted by user");
});
test("manager passes signal through to GlobGemini execution", async () => {
testDir = new TestDirectory();
testDir.createFile("a.ts", "");
await loadSpecificTools(["GlobGemini"]);
const abortController = new AbortController();
abortController.abort();
const result = await executeTool(
"GlobGemini",
{ pattern: "**/*.ts", dir_path: testDir.path },
{ signal: abortController.signal },
);
expect(result.status).toBe("error");
expect(typeof result.toolReturn).toBe("string");
expect(result.toolReturn).toContain("Interrupted by user");
});
});

View File

@@ -1,6 +1,5 @@
import { afterEach, describe, expect, test } from "bun:test";
import { grep } from "../../tools/impl/Grep";
import { executeTool, loadSpecificTools } from "../../tools/manager";
import { TestDirectory } from "../helpers/testFs";
describe("Grep tool", () => {
@@ -145,61 +144,4 @@ describe("Grep tool", () => {
}
}
});
test("aborts promptly when signal is already aborted", async () => {
testDir = new TestDirectory();
testDir.createFile("test.txt", "Hello World");
const abortController = new AbortController();
abortController.abort();
await expect(
grep({
pattern: "World",
path: testDir.path,
output_mode: "content",
signal: abortController.signal,
}),
).rejects.toMatchObject({ name: "AbortError" });
});
test("manager passes signal through to Grep execution", async () => {
testDir = new TestDirectory();
testDir.createFile("test.txt", "Hello World");
await loadSpecificTools(["Grep"]);
const abortController = new AbortController();
abortController.abort();
const result = await executeTool(
"Grep",
{ pattern: "World", path: testDir.path, output_mode: "content" },
{ signal: abortController.signal },
);
expect(result.status).toBe("error");
expect(typeof result.toolReturn).toBe("string");
expect(result.toolReturn).toContain("Interrupted by user");
});
test("manager passes signal through to GrepFiles execution", async () => {
testDir = new TestDirectory();
testDir.createFile("test.txt", "Hello World");
await loadSpecificTools(["GrepFiles"]);
const abortController = new AbortController();
abortController.abort();
const result = await executeTool(
"GrepFiles",
{ pattern: "World", path: testDir.path },
{ signal: abortController.signal },
);
expect(result.status).toBe("error");
expect(typeof result.toolReturn).toBe("string");
expect(result.toolReturn).toContain("Interrupted by user");
});
});

View File

@@ -1,6 +1,5 @@
import { afterEach, describe, expect, test } from "bun:test";
import { search_file_content } from "../../tools/impl/SearchFileContentGemini";
import { executeTool, loadSpecificTools } from "../../tools/manager";
import { TestDirectory } from "../helpers/testFs";
describe("SearchFileContent tool", () => {
@@ -75,40 +74,4 @@ describe("SearchFileContent tool", () => {
expect(result.message).toContain("Hello World");
});
test("aborts promptly when signal is already aborted", async () => {
testDir = new TestDirectory();
testDir.createFile("test.txt", "Hello World");
const abortController = new AbortController();
abortController.abort();
await expect(
search_file_content({
pattern: "Hello",
dir_path: testDir.path,
signal: abortController.signal,
}),
).rejects.toMatchObject({ name: "AbortError" });
});
test("manager passes signal through to SearchFileContent execution", async () => {
testDir = new TestDirectory();
testDir.createFile("test.txt", "Hello World");
await loadSpecificTools(["SearchFileContent"]);
const abortController = new AbortController();
abortController.abort();
const result = await executeTool(
"SearchFileContent",
{ pattern: "Hello", dir_path: testDir.path },
{ signal: abortController.signal },
);
expect(result.status).toBe("error");
expect(typeof result.toolReturn).toBe("string");
expect(result.toolReturn).toContain("Interrupted by user");
});
});

View File

@@ -1,5 +1,6 @@
import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test";
import WebSocket from "ws";
import type { ResumeData } from "../../agent/check-approval";
import { permissionMode } from "../../permissions/mode";
import type {
MessageQueueItem,
@@ -67,6 +68,13 @@ const getClientMock = mock(async () => ({
cancel: cancelConversationMock,
},
}));
const getResumeDataMock = mock(
async (): Promise<ResumeData> => ({
pendingApproval: null,
pendingApprovals: [],
messageHistory: [],
}),
);
const classifyApprovalsMock = mock(async () => ({
autoAllowed: [],
autoDenied: [],
@@ -195,6 +203,7 @@ describe("listen-client multi-worker concurrency", () => {
drainStreamWithResumeMock.mockClear();
getClientMock.mockClear();
retrieveAgentMock.mockClear();
getResumeDataMock.mockClear();
classifyApprovalsMock.mockClear();
executeApprovalBatchMock.mockClear();
cancelConversationMock.mockClear();
@@ -697,6 +706,11 @@ describe("listen-client multi-worker concurrency", () => {
status: "success",
};
getResumeDataMock.mockResolvedValueOnce({
pendingApproval: approval,
pendingApprovals: [approval],
messageHistory: [],
});
classifyApprovalsMock.mockResolvedValueOnce({
autoAllowed: [
{
@@ -743,13 +757,7 @@ describe("listen-client multi-worker concurrency", () => {
runtime,
socket as unknown as WebSocket,
new AbortController().signal,
{
getResumeData: async () => ({
pendingApproval: approval,
pendingApprovals: [approval],
messageHistory: [],
}),
},
{ getResumeData: getResumeDataMock },
);
await waitFor(() => sendMessageStreamMock.mock.calls.length === 1);
@@ -854,77 +862,6 @@ describe("listen-client multi-worker concurrency", () => {
expect(listener.conversationRuntimes.has(runtimeA.key)).toBe(true);
});
test("stale approval response after approval-only interrupt unlatches cancelRequested and allows queue pump", async () => {
const listener = __listenClientTestUtils.createListenerRuntime();
__listenClientTestUtils.setActiveRuntime(listener);
const runtime = __listenClientTestUtils.getOrCreateScopedRuntime(
listener,
"agent-1",
"conv-a",
);
const socket = new MockSocket();
runtime.cancelRequested = true;
runtime.isProcessing = false;
runtime.loopStatus = "WAITING_ON_INPUT";
const queueInput = {
kind: "message",
source: "user",
content: "queued after stale approval",
clientMessageId: "cm-stale-approval",
agentId: "agent-1",
conversationId: "conv-a",
} satisfies Omit<MessageQueueItem, "id" | "enqueuedAt">;
const item = runtime.queueRuntime.enqueue(queueInput);
if (!item) {
throw new Error("Expected queued item to be created");
}
runtime.queuedMessagesByItemId.set(
item.id,
makeIncomingMessage("agent-1", "conv-a", "queued after stale approval"),
);
const scheduleQueuePumpMock = mock(() => {
__listenClientTestUtils.scheduleQueuePump(
runtime,
socket as unknown as WebSocket,
{} as never,
async () => {},
);
});
const handled = await __listenClientTestUtils.handleApprovalResponseInput(
listener,
{
runtime: { agent_id: "agent-1", conversation_id: "conv-a" },
response: {
request_id: "perm-stale-after-approval-only-interrupt",
decision: { behavior: "allow" },
},
socket: socket as unknown as WebSocket,
opts: {
onStatusChange: undefined,
connectionId: "conn-1",
},
processQueuedTurn: async () => {},
},
{
resolveRuntimeForApprovalRequest: () => null,
resolvePendingApprovalResolver: () => false,
getOrCreateScopedRuntime: () => runtime,
resolveRecoveredApprovalResponse: async () => false,
scheduleQueuePump: scheduleQueuePumpMock,
},
);
expect(handled).toBe(false);
expect(runtime.cancelRequested).toBe(false);
expect(scheduleQueuePumpMock).toHaveBeenCalledTimes(1);
await waitFor(() => runtime.queuePumpScheduled === false);
expect(runtime.queueRuntime.length).toBe(0);
});
test("change_device_state command holds queued input until the tracked command completes", async () => {
const listener = __listenClientTestUtils.createListenerRuntime();
__listenClientTestUtils.setActiveRuntime(listener);

View File

@@ -277,35 +277,6 @@ describe("listen-client approval resolver wiring", () => {
expect(runtime.pendingApprovalResolvers.size).toBe(0);
});
test("resolving final approval response restores WAITING_ON_INPUT even while processing stays true", async () => {
const runtime = __listenClientTestUtils.createRuntime();
const socket = new MockSocket(WebSocket.OPEN);
const requestId = "perm-processing";
runtime.isProcessing = true;
runtime.loopStatus = "WAITING_ON_APPROVAL" as never;
const pending = requestApprovalOverWS(
runtime,
socket as unknown as WebSocket,
requestId,
makeControlRequest(requestId),
);
const resolved = resolvePendingApprovalResolver(
runtime,
makeSuccessResponse(requestId),
);
expect(resolved).toBe(true);
await expect(pending).resolves.toMatchObject({
request_id: requestId,
decision: { behavior: "allow" },
});
expect(String(runtime.loopStatus)).toBe("WAITING_ON_INPUT");
expect(runtime.isProcessing).toBe(true);
});
test("ignores non-matching request_id and keeps pending resolver", async () => {
const runtime = __listenClientTestUtils.createRuntime();
const socket = new MockSocket(WebSocket.OPEN);
@@ -358,9 +329,8 @@ describe("listen-client approval resolver wiring", () => {
await expect(second).rejects.toThrow("socket closed");
});
test("cleanup resets loop status to WAITING_ON_INPUT even while processing stays true", async () => {
test("cleanup resets WAITING_ON_INPUT instead of restoring fake processing", async () => {
const runtime = __listenClientTestUtils.createRuntime();
runtime.isProcessing = true;
runtime.loopStatus = "WAITING_ON_APPROVAL";
@@ -371,7 +341,6 @@ describe("listen-client approval resolver wiring", () => {
rejectPendingApprovalResolvers(runtime, "socket closed");
expect(runtime.loopStatus as string).toBe("WAITING_ON_INPUT");
expect(runtime.isProcessing).toBe(true);
await expect(pending).rejects.toThrow("socket closed");
});
@@ -1333,59 +1302,6 @@ describe("listen-client capability-gated approval flow", () => {
expect.any(Function),
);
});
test("stale approval responses after interrupt are benign and do not mutate runtime state", async () => {
const listener = __listenClientTestUtils.createListenerRuntime();
const targetRuntime =
__listenClientTestUtils.getOrCreateConversationRuntime(
listener,
"agent-1",
"default",
);
targetRuntime.cancelRequested = true;
targetRuntime.loopStatus = "WAITING_ON_INPUT";
targetRuntime.isProcessing = false;
const socket = new MockSocket(WebSocket.OPEN);
const scheduleQueuePumpMock = mock(() => {});
const resolveRecoveredApprovalResponseMock = mock(async () => false);
const handled = await __listenClientTestUtils.handleApprovalResponseInput(
listener,
{
runtime: { agent_id: "agent-1", conversation_id: "default" },
response: {
request_id: "perm-stale-after-interrupt",
decision: { behavior: "allow" },
},
socket: socket as unknown as WebSocket,
opts: {
onStatusChange: undefined,
connectionId: "conn-1",
},
processQueuedTurn: async () => {},
},
{
resolveRuntimeForApprovalRequest: () => null,
resolvePendingApprovalResolver: () => false,
getOrCreateScopedRuntime: () => targetRuntime,
resolveRecoveredApprovalResponse: resolveRecoveredApprovalResponseMock,
scheduleQueuePump: scheduleQueuePumpMock,
},
);
expect(handled).toBe(false);
expect(resolveRecoveredApprovalResponseMock).not.toHaveBeenCalled();
expect(scheduleQueuePumpMock).toHaveBeenCalledWith(
targetRuntime,
socket,
expect.objectContaining({ connectionId: "conn-1" }),
expect.any(Function),
);
expect(targetRuntime.cancelRequested).toBe(false);
expect(targetRuntime.loopStatus).toBe("WAITING_ON_INPUT");
expect(targetRuntime.isProcessing).toBe(false);
});
});
describe("listen-client approval recovery batch correlation", () => {

View File

@@ -24,7 +24,6 @@ const rgPath = getRipgrepPath();
interface GlobArgs {
pattern: string;
path?: string;
signal?: AbortSignal;
}
interface GlobResult {
@@ -60,7 +59,7 @@ function applyFileLimit(files: string[], workingDirectory: string): GlobResult {
export async function glob(args: GlobArgs): Promise<GlobResult> {
validateRequiredParams(args, ["pattern"], "Glob");
const { pattern, path: searchPath, signal } = args;
const { pattern, path: searchPath } = args;
// Explicit check for undefined/empty pattern (validateRequiredParams only checks key existence)
if (!pattern) {
@@ -94,7 +93,6 @@ export async function glob(args: GlobArgs): Promise<GlobResult> {
const { stdout } = await execFileAsync(rgPath, rgArgs, {
maxBuffer: 50 * 1024 * 1024, // 50MB buffer for large file lists
cwd: userCwd,
signal,
});
const files = stdout.trim().split("\n").filter(Boolean).sort();
@@ -106,16 +104,6 @@ export async function glob(args: GlobArgs): Promise<GlobResult> {
code?: string | number;
};
const isAbortError =
err.name === "AbortError" ||
err.code === "ABORT_ERR" ||
err.message === "The operation was aborted";
if (isAbortError) {
throw Object.assign(new Error("The operation was aborted"), {
name: "AbortError",
});
}
// ripgrep exits with code 1 when no files match - that's not an error
if (err.code === 1 || err.code === "1") {
return { files: [] };

View File

@@ -11,7 +11,6 @@ interface GlobGeminiArgs {
case_sensitive?: boolean;
respect_git_ignore?: boolean;
respect_gemini_ignore?: boolean;
signal?: AbortSignal;
}
export async function glob_gemini(
@@ -21,7 +20,6 @@ export async function glob_gemini(
const lettaArgs = {
pattern: args.pattern,
path: args.dir_path,
signal: args.signal,
};
const result = await lettaGlob(lettaArgs);

View File

@@ -47,7 +47,6 @@ export interface GrepArgs {
head_limit?: number;
offset?: number;
multiline?: boolean;
signal?: AbortSignal;
}
interface GrepResult {
@@ -72,7 +71,6 @@ export async function grep(args: GrepArgs): Promise<GrepResult> {
head_limit = 100,
offset = 0,
multiline,
signal,
} = args;
const userCwd = process.env.USER_CWD || process.cwd();
@@ -104,7 +102,6 @@ export async function grep(args: GrepArgs): Promise<GrepResult> {
const { stdout } = await execFileAsync(rgPath, rgArgs, {
maxBuffer: 10 * 1024 * 1024,
cwd: userCwd,
signal,
});
if (output_mode === "files_with_matches") {
const allFiles = stdout.trim().split("\n").filter(Boolean);
@@ -181,21 +178,12 @@ export async function grep(args: GrepArgs): Promise<GrepResult> {
} catch (error) {
const err = error as NodeJS.ErrnoException & {
stdout?: string;
code?: string | number;
};
const code = err.code !== undefined ? String(err.code) : undefined;
const code = typeof err.code === "number" ? err.code : undefined;
const _stdout = typeof err.stdout === "string" ? err.stdout : "";
const message =
typeof err.message === "string" ? err.message : "Unknown error";
const isAbortError =
err.name === "AbortError" ||
err.code === "ABORT_ERR" ||
err.message === "The operation was aborted";
if (isAbortError) {
throw Object.assign(new Error("The operation was aborted"), {
name: "AbortError",
});
}
if (code === "1") {
if (code === 1) {
if (output_mode === "files_with_matches")
return { output: "No files found", files: 0 };
if (output_mode === "count")

View File

@@ -6,7 +6,6 @@ interface GrepFilesArgs {
include?: string;
path?: string;
limit?: number;
signal?: AbortSignal;
}
interface GrepFilesResult {
@@ -27,14 +26,13 @@ export async function grep_files(
): Promise<GrepFilesResult> {
validateRequiredParams(args, ["pattern"], "grep_files");
const { pattern, include, path, limit = DEFAULT_LIMIT, signal } = args;
const { pattern, include, path, limit = DEFAULT_LIMIT } = args;
const grepArgs: GrepArgs = {
pattern,
path,
glob: include,
output_mode: "files_with_matches",
signal,
};
const result = await grep(grepArgs);

View File

@@ -9,7 +9,6 @@ interface SearchFileContentGeminiArgs {
pattern: string;
dir_path?: string;
include?: string;
signal?: AbortSignal;
}
export async function search_file_content(
@@ -21,7 +20,6 @@ export async function search_file_content(
path: args.dir_path,
glob: args.include,
output_mode: "content" as const, // Return actual matching lines, not just file paths
signal: args.signal,
};
const result = await grep(lettaArgs);

View File

@@ -46,7 +46,7 @@ const FILE_MODIFYING_TOOLS = new Set([
]);
export const TOOL_NAMES = Object.keys(TOOL_DEFINITIONS) as ToolName[];
const SIGNAL_AWARE_TOOLS = new Set([
const STREAMING_SHELL_TOOLS = new Set([
"Bash",
"BashOutput",
"TaskOutput",
@@ -56,14 +56,6 @@ const SIGNAL_AWARE_TOOLS = new Set([
"Shell",
"run_shell_command",
"RunShellCommand",
"Glob",
"Grep",
"grep_files",
"GrepFiles",
"glob_gemini",
"GlobGemini",
"search_file_content",
"SearchFileContent",
]);
// Maps internal tool names to server/model-facing tool names
@@ -1332,22 +1324,13 @@ export async function executeTool(
// Inject options for tools that support them without altering schemas
let enhancedArgs = args;
if (SIGNAL_AWARE_TOOLS.has(internalName) && options?.signal) {
enhancedArgs = { ...enhancedArgs, signal: options.signal };
}
if (
(internalName === "Bash" ||
internalName === "BashOutput" ||
internalName === "shell_command" ||
internalName === "ShellCommand" ||
internalName === "shell" ||
internalName === "Shell" ||
internalName === "run_shell_command" ||
internalName === "RunShellCommand") &&
options?.onOutput
) {
enhancedArgs = { ...enhancedArgs, onOutput: options.onOutput };
if (STREAMING_SHELL_TOOLS.has(internalName)) {
if (options?.signal) {
enhancedArgs = { ...enhancedArgs, signal: options.signal };
}
if (options?.onOutput) {
enhancedArgs = { ...enhancedArgs, onOutput: options.onOutput };
}
}
// Inject toolCallId and abort signal for Task tool

View File

@@ -12,14 +12,7 @@ export type ListenerQueueGatingConditions = {
export function getListenerBlockedReason(
c: ListenerQueueGatingConditions,
): QueueBlockedReason | null {
if (
c.cancelRequested &&
(c.isProcessing ||
c.isRecoveringApprovals ||
c.loopStatus !== "WAITING_ON_INPUT")
) {
return "interrupt_in_progress";
}
if (c.cancelRequested) return "interrupt_in_progress";
if (c.pendingApprovalsLen > 0) return "pending_approvals";
if (c.isRecoveringApprovals) return "runtime_busy";
if (c.loopStatus === "WAITING_ON_APPROVAL") return "pending_approvals";

View File

@@ -339,18 +339,6 @@ async function handleApprovalResponseInput(
params.runtime.agent_id,
params.runtime.conversation_id,
);
if (targetRuntime.cancelRequested && !targetRuntime.isProcessing) {
targetRuntime.cancelRequested = false;
deps.scheduleQueuePump(
targetRuntime,
params.socket,
params.opts as StartListenerOptions,
params.processQueuedTurn,
);
return false;
}
if (
await deps.resolveRecoveredApprovalResponse(
targetRuntime,
@@ -434,9 +422,11 @@ async function handleChangeDeviceStateInput(
const shouldTrackCommand =
!scopedRuntime.isProcessing &&
resolvedDeps.getPendingControlRequestCount(listener, scope) === 0;
if (shouldTrackCommand) {
resolvedDeps.setLoopStatus(scopedRuntime, "EXECUTING_COMMAND", scope);
}
try {
if (params.command.payload.mode) {
resolvedDeps.handleModeChange(
@@ -446,6 +436,7 @@ async function handleChangeDeviceStateInput(
scope,
);
}
if (params.command.payload.cwd) {
await resolvedDeps.handleCwdChange(
{
@@ -958,9 +949,11 @@ async function connectWithRetry(
...scopedRuntime.activeExecutingToolCallIds,
];
}
const activeAbortController = scopedRuntime.activeAbortController;
if (activeAbortController && !activeAbortController.signal.aborted) {
activeAbortController.abort();
if (
scopedRuntime.activeAbortController &&
!scopedRuntime.activeAbortController.signal.aborted
) {
scopedRuntime.activeAbortController.abort();
}
const recoveredApprovalState = getRecoveredApprovalStateForScope(
runtime,
@@ -984,10 +977,6 @@ async function connectWithRetry(
});
}
if (!hasActiveTurn) {
scopedRuntime.cancelRequested = false;
}
// Backend cancel parity with TUI (App.tsx:5932-5941).
// Fire-and-forget — local cancel + queued results are the primary mechanism.
const cancelConversationId = scopedRuntime.conversationId;

View File

@@ -250,6 +250,10 @@ function buildQueuedTurnMessage(
};
}
export function shouldQueueInboundMessage(parsed: IncomingMessage): boolean {
return parsed.messages.some((payload) => "content" in payload);
}
export function consumeQueuedTurn(runtime: ConversationRuntime): {
dequeuedBatch: DequeuedBatch;
queuedTurn: IncomingMessage;
@@ -275,10 +279,7 @@ export function consumeQueuedTurn(runtime: ConversationRuntime): {
}
}
if (!hasMessage) {
return null;
}
if (queueLen === 0) {
if (!hasMessage || queueLen === 0) {
return null;
}
@@ -298,10 +299,6 @@ export function consumeQueuedTurn(runtime: ConversationRuntime): {
};
}
export function shouldQueueInboundMessage(parsed: IncomingMessage): boolean {
return parsed.messages.some((payload) => "content" in payload);
}
function computeListenerQueueBlockedReason(
runtime: ConversationRuntime,
): QueueBlockedReason | null {

View File

@@ -225,9 +225,11 @@ export function clearConversationRuntimeState(
runtime: ConversationRuntime,
): void {
runtime.cancelRequested = true;
const activeAbortController = runtime.activeAbortController;
if (activeAbortController && !activeAbortController.signal.aborted) {
activeAbortController.abort();
if (
runtime.activeAbortController &&
!runtime.activeAbortController.signal.aborted
) {
runtime.activeAbortController.abort();
}
runtime.pendingApprovalBatchByToolCallId.clear();
runtime.pendingInterruptedResults = null;

View File

@@ -95,7 +95,6 @@ export async function handleApprovalStop(params: {
currentInput: Array<MessageCreate | ApprovalCreate>;
pendingNormalizationInterruptedToolCallIds: string[];
turnToolContextId: string | null;
abortSignal: AbortSignal;
buildSendOptions: () => Parameters<
typeof sendApprovalContinuationWithRetry
>[2];
@@ -113,9 +112,13 @@ export async function handleApprovalStop(params: {
msgRunIds,
currentInput,
turnToolContextId,
abortSignal,
buildSendOptions,
} = params;
const abortController = runtime.activeAbortController;
if (!abortController) {
throw new Error("Missing active abort controller during approval handling");
}
if (approvals.length === 0) {
runtime.lastStopReason = "error";
@@ -280,7 +283,7 @@ export async function handleApprovalStop(params: {
const executionResults = await executeApprovalBatch(decisions, undefined, {
toolContextId: turnToolContextId ?? undefined,
abortSignal,
abortSignal: abortController.signal,
workingDirectory: turnWorkingDirectory,
});
const persistedExecutionResults = normalizeExecutionResultsForInterruptParity(
@@ -328,6 +331,7 @@ export async function handleApprovalStop(params: {
nextInput.push(...queuedTurn.messages);
emitDequeuedUserMessage(socket, runtime, queuedTurn, dequeuedBatch);
}
setLoopStatus(runtime, "SENDING_API_REQUEST", {
agent_id: agentId,
conversation_id: conversationId,
@@ -338,7 +342,7 @@ export async function handleApprovalStop(params: {
buildSendOptions(),
socket,
runtime,
abortSignal,
abortController.signal,
);
if (!stream) {
return {

View File

@@ -122,9 +122,7 @@ export async function handleIncomingMessage(
runtime.isProcessing = true;
runtime.cancelRequested = false;
const turnAbortController = new AbortController();
runtime.activeAbortController = turnAbortController;
const turnAbortSignal = turnAbortController.signal;
runtime.activeAbortController = new AbortController();
runtime.activeWorkingDirectory = turnWorkingDirectory;
runtime.activeRunId = null;
runtime.activeRunStartedAt = new Date().toISOString();
@@ -242,7 +240,7 @@ export async function handleIncomingMessage(
buildSendOptions(),
socket,
runtime,
turnAbortSignal,
runtime.activeAbortController.signal,
)
: await sendMessageStreamWithRetry(
conversationId,
@@ -250,7 +248,7 @@ export async function handleIncomingMessage(
buildSendOptions(),
socket,
runtime,
turnAbortSignal,
runtime.activeAbortController.signal,
);
if (!stream) {
return;
@@ -277,7 +275,7 @@ export async function handleIncomingMessage(
stream as Stream<LettaStreamingResponse>,
buffers,
() => {},
turnAbortSignal,
runtime.activeAbortController.signal,
undefined,
({ chunk, shouldOutput, errorInfo }) => {
const maybeRunId = (chunk as { run_id?: unknown }).run_id;
@@ -430,7 +428,7 @@ export async function handleIncomingMessage(
buildSendOptions(),
socket,
runtime,
turnAbortSignal,
runtime.activeAbortController.signal,
)
: await sendMessageStreamWithRetry(
conversationId,
@@ -438,7 +436,7 @@ export async function handleIncomingMessage(
buildSendOptions(),
socket,
runtime,
turnAbortSignal,
runtime.activeAbortController.signal,
);
if (!stream) {
return;
@@ -494,7 +492,7 @@ export async function handleIncomingMessage(
});
await new Promise((resolve) => setTimeout(resolve, delayMs));
if (turnAbortSignal.aborted) {
if (runtime.activeAbortController.signal.aborted) {
throw new Error("Cancelled by user");
}
@@ -513,7 +511,7 @@ export async function handleIncomingMessage(
buildSendOptions(),
socket,
runtime,
turnAbortSignal,
runtime.activeAbortController.signal,
)
: await sendMessageStreamWithRetry(
conversationId,
@@ -521,7 +519,7 @@ export async function handleIncomingMessage(
buildSendOptions(),
socket,
runtime,
turnAbortSignal,
runtime.activeAbortController.signal,
);
if (!stream) {
return;
@@ -565,7 +563,7 @@ export async function handleIncomingMessage(
});
await new Promise((resolve) => setTimeout(resolve, delayMs));
if (turnAbortSignal.aborted) {
if (runtime.activeAbortController.signal.aborted) {
throw new Error("Cancelled by user");
}
@@ -584,7 +582,7 @@ export async function handleIncomingMessage(
buildSendOptions(),
socket,
runtime,
turnAbortSignal,
runtime.activeAbortController.signal,
)
: await sendMessageStreamWithRetry(
conversationId,
@@ -592,7 +590,7 @@ export async function handleIncomingMessage(
buildSendOptions(),
socket,
runtime,
turnAbortSignal,
runtime.activeAbortController.signal,
);
if (!stream) {
return;
@@ -674,7 +672,6 @@ export async function handleIncomingMessage(
currentInput,
pendingNormalizationInterruptedToolCallIds,
turnToolContextId,
abortSignal: turnAbortSignal,
buildSendOptions,
});
if (approvalResult.terminated || !approvalResult.stream) {