feat(listen): make listener runtime conversation-scoped (#1418)

This commit is contained in:
Charles Packer
2026-03-16 22:36:52 -07:00
committed by GitHub
parent 32e042d528
commit 3797560cb2
22 changed files with 2161 additions and 435 deletions

View File

@@ -8,8 +8,10 @@
import type { Letta } from "@letta-ai/letta-client";
import type { AgentState } from "@letta-ai/letta-client/resources/agents/agents";
import { settingsManager } from "../settings-manager";
import { getServerUrl } from "./client";
import { type CreateAgentOptions, createAgent } from "./create";
import { parseMdxFrontmatter } from "./memory";
import { getDefaultModel, resolveModel } from "./model";
import { MEMORY_PROMPTS } from "./promptAssets";
// Tags used to identify default agents
@@ -47,6 +49,80 @@ export const DEFAULT_AGENT_CONFIGS: Record<string, CreateAgentOptions> = {
},
};
function isSelfHostedServer(): boolean {
return !getServerUrl().includes("api.letta.com");
}
export function selectDefaultAgentModel(params: {
preferredModel?: string;
isSelfHosted: boolean;
availableHandles?: Iterable<string>;
}): string | undefined {
const { preferredModel, isSelfHosted, availableHandles } = params;
const resolvedPreferred =
typeof preferredModel === "string" && preferredModel.length > 0
? (resolveModel(preferredModel) ?? preferredModel)
: undefined;
if (!isSelfHosted) {
return resolvedPreferred;
}
const handles = availableHandles ? new Set(availableHandles) : null;
if (!handles) {
return resolvedPreferred;
}
if (resolvedPreferred && handles.has(resolvedPreferred)) {
return resolvedPreferred;
}
const firstNonAutoHandle = Array.from(handles).find(
(handle) => handle !== "letta/auto" && handle !== "letta/auto-fast",
);
if (firstNonAutoHandle) {
return firstNonAutoHandle;
}
const defaultHandle = getDefaultModel();
if (handles.has(defaultHandle)) {
return defaultHandle;
}
return Array.from(handles)[0];
}
async function resolveDefaultAgentModel(
client: Letta,
preferredModel?: string,
): Promise<string | undefined> {
if (!isSelfHostedServer()) {
return selectDefaultAgentModel({
preferredModel,
isSelfHosted: false,
});
}
try {
const availableHandles = new Set(
(await client.models.list())
.map((model) => model.handle)
.filter((handle): handle is string => typeof handle === "string"),
);
return selectDefaultAgentModel({
preferredModel,
isSelfHosted: true,
availableHandles,
});
} catch {
return selectDefaultAgentModel({
preferredModel,
isSelfHosted: true,
});
}
}
/**
* Add a tag to an existing agent.
*/
@@ -81,6 +157,9 @@ async function addTagToAgent(
*/
export async function ensureDefaultAgents(
client: Letta,
options?: {
preferredModel?: string;
},
): Promise<AgentState | null> {
if (!settingsManager.shouldCreateDefaultAgents()) {
return null;
@@ -95,6 +174,7 @@ export async function ensureDefaultAgents(
const { agent } = await createAgent({
...DEFAULT_AGENT_CONFIGS.memo,
model: await resolveDefaultAgentModel(client, options?.preferredModel),
memoryPromptMode: willAutoEnableMemfs ? "memfs" : undefined,
});
await addTagToAgent(client, agent.id, MEMO_TAG);

View File

@@ -1,5 +1,5 @@
// Machine god AI themed thinking verbs
const THINKING_VERBS = [
const THINKING_VERBS = Object.freeze([
"thinking",
"processing",
"computing",
@@ -41,23 +41,23 @@ const THINKING_VERBS = [
"remembering",
"absorbing",
"internalizing",
] as const;
] as const);
export const SYSTEM_PROMPT_UPGRADE_TIP =
"Use /system to upgrade to the latest default prompt.";
export const THINKING_TIPS = [
export const THINKING_TIPS = Object.freeze([
"Use /remember [instructions] to remember something from the conversation.",
"Use /palace to inspect your agent's memory palace.",
"Use /reflect to launch a background reflection agent to update memory.",
"Use /search [query] to search messages across all agents.",
"Use /init to initialize (or re-init) your agent's memory.",
] as const;
] as const);
const THINKING_TIPS_WITH_SYSTEM_UPGRADE = [
const THINKING_TIPS_WITH_SYSTEM_UPGRADE = Object.freeze([
...THINKING_TIPS,
SYSTEM_PROMPT_UPGRADE_TIP,
];
] as const);
type ThinkingVerb = (typeof THINKING_VERBS)[number];
@@ -141,9 +141,6 @@ export function getRandomThinkingTip(options?: {
(options?.includeSystemPromptUpgradeTip ?? true)
? THINKING_TIPS_WITH_SYSTEM_UPGRADE
: THINKING_TIPS;
if (tipPool.length === 0) {
return "";
}
const index = Math.floor(Math.random() * tipPool.length);
return tipPool[index] ?? "";
return tipPool[index] ?? tipPool[0] ?? THINKING_TIPS[0] ?? "";
}

View File

@@ -870,7 +870,9 @@ export async function handleHeadlessCommand(
// Priority 7: Fresh user with no LRU - create default agent
if (!agent) {
const { ensureDefaultAgents } = await import("./agent/defaults");
const defaultAgent = await ensureDefaultAgents(client);
const defaultAgent = await ensureDefaultAgents(client, {
preferredModel: model,
});
if (defaultAgent) {
agent = defaultAgent;
}

View File

@@ -1401,7 +1401,9 @@ async function main(): Promise<void> {
case "create": {
const { ensureDefaultAgents } = await import("./agent/defaults");
try {
const defaultAgent = await ensureDefaultAgents(client);
const defaultAgent = await ensureDefaultAgents(client, {
preferredModel: model,
});
if (defaultAgent) {
setSelectedGlobalAgentId(defaultAgent.id);
setLoadingState("assembling");

View File

@@ -0,0 +1,33 @@
import { describe, expect, test } from "bun:test";
import { selectDefaultAgentModel } from "../../agent/defaults";
describe("selectDefaultAgentModel", () => {
test("uses the caller's preferred model when it is available on self-hosted", () => {
const result = selectDefaultAgentModel({
preferredModel: "haiku",
isSelfHosted: true,
availableHandles: ["anthropic/claude-haiku-4-5"],
});
expect(result).toBe("anthropic/claude-haiku-4-5");
});
test("falls back to a server-available non-auto handle on self-hosted", () => {
const result = selectDefaultAgentModel({
isSelfHosted: true,
availableHandles: ["letta/auto", "anthropic/claude-haiku-4-5"],
});
expect(result).toBe("anthropic/claude-haiku-4-5");
});
test("passes through the preferred model on cloud", () => {
const result = selectDefaultAgentModel({
preferredModel: "haiku",
isSelfHosted: false,
availableHandles: ["letta/auto"],
});
expect(result).toBe("anthropic/claude-haiku-4-5");
});
});

View File

@@ -50,7 +50,9 @@ describe("Thinking messages", () => {
});
test("returns a tip from the configured tip list", () => {
const tip = getRandomThinkingTip();
const tip = getRandomThinkingTip({
includeSystemPromptUpgradeTip: false,
});
expect(tip.length).toBeGreaterThan(0);
expect((THINKING_TIPS as readonly string[]).includes(tip)).toBe(true);

View File

@@ -93,52 +93,105 @@ async function runCLI(
return { stdout: out, code };
}
const REQUIRED_MARKERS = ["BANANA"];
const MAX_ATTEMPTS = 2;
function assertContainsAll(hay: string, needles: string[]) {
for (const n of needles) {
if (!hay.includes(n)) throw new Error(`Missing expected output: ${n}`);
}
}
function extractStreamJsonAssistantText(stdout: string): string {
const parts: string[] = [];
for (const line of stdout.split(/\r?\n/)) {
if (!line.trim()) continue;
try {
const event = JSON.parse(line) as {
type?: string;
message_type?: string;
content?: unknown;
result?: unknown;
};
if (
event.type === "message" &&
event.message_type === "assistant_message" &&
typeof event.content === "string"
) {
parts.push(event.content);
}
if (event.type === "result" && typeof event.result === "string") {
parts.push(event.result);
}
} catch {
// Ignore malformed lines; validation will fail if we never find the marker.
}
}
return parts.join("");
}
function validateOutput(stdout: string, output: Args["output"]) {
if (output === "text") {
assertContainsAll(stdout, REQUIRED_MARKERS);
return;
}
if (output === "json") {
try {
const obj = JSON.parse(stdout);
const result = String(obj?.result ?? "");
assertContainsAll(result, REQUIRED_MARKERS);
return;
} catch (e) {
throw new Error(`Invalid JSON output: ${(e as Error).message}`);
}
}
const streamText = extractStreamJsonAssistantText(stdout);
if (!streamText) {
throw new Error("No assistant/result content found in stream-json output");
}
assertContainsAll(streamText, REQUIRED_MARKERS);
}
async function main() {
const { model, output } = parseArgs(process.argv.slice(2));
const prereq = await ensurePrereqs(model);
if (prereq === "skip") return;
const { stdout, code } = await runCLI(model, output);
if (code !== 0) {
process.exit(code);
let stdout = "";
let code = 0;
let lastError: Error | null = null;
for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt += 1) {
({ stdout, code } = await runCLI(model, output));
if (code !== 0) {
lastError = new Error(`CLI exited with code ${code}`);
} else {
try {
validateOutput(stdout, output);
console.log(`OK: ${model} / ${output}`);
return;
} catch (error) {
lastError = error as Error;
}
}
if (attempt < MAX_ATTEMPTS) {
console.error(
`[headless-scenario] attempt ${attempt}/${MAX_ATTEMPTS} failed for ${model} / ${output}: ${lastError?.message ?? "unknown error"}`,
);
await Bun.sleep(500);
}
}
try {
// Validate by output mode
if (output === "text") {
assertContainsAll(stdout, ["BANANA"]);
} else if (output === "json") {
try {
const obj = JSON.parse(stdout);
const result = String(obj?.result ?? "");
assertContainsAll(result, ["BANANA"]);
} catch (e) {
throw new Error(`Invalid JSON output: ${(e as Error).message}`);
}
} else if (output === "stream-json") {
// stream-json prints one JSON object per line; find the final result event
const lines = stdout.split(/\r?\n/).filter(Boolean);
const resultLine = lines.find((l) => {
try {
const o = JSON.parse(l);
return o?.type === "result";
} catch {
return false;
}
});
if (!resultLine) throw new Error("No final result event in stream-json");
const evt = JSON.parse(resultLine);
const result = String(evt?.result ?? "");
assertContainsAll(result, ["BANANA"]);
if (code !== 0) {
process.exit(code);
}
if (lastError) {
throw lastError;
}
console.log(`OK: ${model} / ${output}`);
} catch (e) {
// Dump full stdout to aid debugging
console.error(`\n===== BEGIN STDOUT (${model} / ${output}) =====`);

View File

@@ -0,0 +1,631 @@
import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test";
import WebSocket from "ws";
import { permissionMode } from "../../permissions/mode";
import type { MessageQueueItem } from "../../queue/queueRuntime";
type MockStream = {
conversationId: string;
agentId?: string;
};
type DrainResult = {
stopReason: string;
approvals?: Array<{
toolCallId: string;
toolName: string;
toolArgs: string;
}>;
apiDurationMs: number;
};
const defaultDrainResult: DrainResult = {
stopReason: "end_turn",
approvals: [],
apiDurationMs: 0,
};
const sendMessageStreamMock = mock(
async (
conversationId: string,
_messages: unknown[],
opts?: { agentId?: string },
): Promise<MockStream> => ({
conversationId,
agentId: opts?.agentId,
}),
);
const getStreamToolContextIdMock = mock(() => null);
const drainHandlers = new Map<
string,
(abortSignal?: AbortSignal) => Promise<DrainResult>
>();
const drainStreamWithResumeMock = mock(
async (
stream: MockStream,
_buffers: unknown,
_refresh: () => void,
abortSignal?: AbortSignal,
) => {
const handler = drainHandlers.get(stream.conversationId);
if (handler) {
return handler(abortSignal);
}
return defaultDrainResult;
},
);
const cancelConversationMock = mock(async (_conversationId: string) => {});
const getClientMock = mock(async () => ({
conversations: {
cancel: cancelConversationMock,
},
}));
const fetchRunErrorDetailMock = mock(async () => null);
const realStreamModule = await import("../../cli/helpers/stream");
mock.module("../../agent/message", () => ({
sendMessageStream: sendMessageStreamMock,
getStreamToolContextId: getStreamToolContextIdMock,
getStreamRequestContext: () => undefined,
getStreamRequestStartTime: () => undefined,
buildConversationMessagesCreateRequestBody: (
conversationId: string,
messages: unknown[],
opts?: { agentId?: string; streamTokens?: boolean; background?: boolean },
clientTools?: unknown[],
clientSkills?: unknown[],
) => ({
messages,
streaming: true,
stream_tokens: opts?.streamTokens ?? true,
include_pings: true,
background: opts?.background ?? true,
client_skills: clientSkills ?? [],
client_tools: clientTools ?? [],
include_compaction_messages: true,
...(conversationId === "default" && opts?.agentId
? { agent_id: opts.agentId }
: {}),
}),
}));
mock.module("../../cli/helpers/stream", () => ({
...realStreamModule,
drainStreamWithResume: drainStreamWithResumeMock,
}));
mock.module("../../agent/client", () => ({
getClient: getClientMock,
getServerUrl: () => "https://example.test",
clearLastSDKDiagnostic: () => {},
consumeLastSDKDiagnostic: () => null,
}));
mock.module("../../agent/approval-recovery", () => ({
fetchRunErrorDetail: fetchRunErrorDetailMock,
}));
const listenClientModule = await import("../../websocket/listen-client");
const {
__listenClientTestUtils,
requestApprovalOverWS,
resolvePendingApprovalResolver,
} = listenClientModule;
class MockSocket {
readyState: number;
sentPayloads: string[] = [];
constructor(readyState: number = WebSocket.OPEN) {
this.readyState = readyState;
}
send(data: string): void {
this.sentPayloads.push(data);
}
close(): void {}
removeAllListeners(): this {
return this;
}
}
function createDeferredDrain() {
let resolve!: (value: DrainResult) => void;
let reject!: (error: Error) => void;
const promise = new Promise<DrainResult>((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve, reject };
}
async function waitFor(
predicate: () => boolean,
attempts: number = 20,
): Promise<void> {
for (let i = 0; i < attempts; i += 1) {
if (predicate()) {
return;
}
await new Promise((resolve) => setTimeout(resolve, 0));
}
}
function makeIncomingMessage(
agentId: string,
conversationId: string,
text: string,
) {
return {
type: "message" as const,
agentId,
conversationId,
messages: [{ role: "user" as const, content: text }],
};
}
describe("listen-client multi-worker concurrency", () => {
beforeEach(() => {
permissionMode.reset();
sendMessageStreamMock.mockClear();
getStreamToolContextIdMock.mockClear();
drainStreamWithResumeMock.mockClear();
getClientMock.mockClear();
cancelConversationMock.mockClear();
fetchRunErrorDetailMock.mockClear();
drainHandlers.clear();
__listenClientTestUtils.setActiveRuntime(null);
});
afterEach(() => {
permissionMode.reset();
__listenClientTestUtils.setActiveRuntime(null);
});
test("processes simultaneous turns for two named conversations under one agent", async () => {
const listener = __listenClientTestUtils.createListenerRuntime();
const runtimeA = __listenClientTestUtils.getOrCreateConversationRuntime(
listener,
"agent-1",
"conv-a",
);
const runtimeB = __listenClientTestUtils.getOrCreateConversationRuntime(
listener,
"agent-1",
"conv-b",
);
const socket = new MockSocket();
const drainA = createDeferredDrain();
const drainB = createDeferredDrain();
drainHandlers.set("conv-a", () => drainA.promise);
drainHandlers.set("conv-b", () => drainB.promise);
const turnA = __listenClientTestUtils.handleIncomingMessage(
makeIncomingMessage("agent-1", "conv-a", "hello a"),
socket as unknown as WebSocket,
runtimeA,
);
const turnB = __listenClientTestUtils.handleIncomingMessage(
makeIncomingMessage("agent-1", "conv-b", "hello b"),
socket as unknown as WebSocket,
runtimeB,
);
await waitFor(() => sendMessageStreamMock.mock.calls.length === 2);
expect(runtimeA.isProcessing).toBe(true);
expect(runtimeB.isProcessing).toBe(true);
expect(__listenClientTestUtils.getListenerStatus(listener)).toBe(
"processing",
);
expect(sendMessageStreamMock.mock.calls.map((call) => call[0])).toEqual([
"conv-a",
"conv-b",
]);
drainB.resolve(defaultDrainResult);
await turnB;
expect(runtimeB.isProcessing).toBe(false);
expect(runtimeA.isProcessing).toBe(true);
drainA.resolve(defaultDrainResult);
await turnA;
expect(runtimeA.isProcessing).toBe(false);
expect(__listenClientTestUtils.getListenerStatus(listener)).toBe("idle");
});
test("keeps default conversations separate for different agents during concurrent turns", async () => {
const listener = __listenClientTestUtils.createListenerRuntime();
const runtimeA = __listenClientTestUtils.getOrCreateConversationRuntime(
listener,
"agent-a",
"default",
);
const runtimeB = __listenClientTestUtils.getOrCreateConversationRuntime(
listener,
"agent-b",
"default",
);
const socket = new MockSocket();
await Promise.all([
__listenClientTestUtils.handleIncomingMessage(
makeIncomingMessage("agent-a", "default", "from a"),
socket as unknown as WebSocket,
runtimeA,
),
__listenClientTestUtils.handleIncomingMessage(
makeIncomingMessage("agent-b", "default", "from b"),
socket as unknown as WebSocket,
runtimeB,
),
]);
expect(sendMessageStreamMock.mock.calls).toHaveLength(2);
expect(sendMessageStreamMock.mock.calls[0]?.[0]).toBe("default");
expect(sendMessageStreamMock.mock.calls[1]?.[0]).toBe("default");
expect(sendMessageStreamMock.mock.calls[0]?.[2]).toMatchObject({
agentId: "agent-a",
});
expect(sendMessageStreamMock.mock.calls[1]?.[2]).toMatchObject({
agentId: "agent-b",
});
});
test("cancelling one conversation runtime does not cancel another", async () => {
const listener = __listenClientTestUtils.createListenerRuntime();
const runtimeA = __listenClientTestUtils.getOrCreateConversationRuntime(
listener,
"agent-1",
"conv-a",
);
const runtimeB = __listenClientTestUtils.getOrCreateConversationRuntime(
listener,
"agent-1",
"conv-b",
);
runtimeA.isProcessing = true;
runtimeA.activeAbortController = new AbortController();
runtimeB.isProcessing = true;
runtimeB.activeAbortController = new AbortController();
runtimeA.cancelRequested = true;
runtimeA.activeAbortController.abort();
expect(runtimeA.activeAbortController.signal.aborted).toBe(true);
expect(runtimeB.activeAbortController.signal.aborted).toBe(false);
expect(runtimeB.cancelRequested).toBe(false);
});
test("approval waits and resolver routing stay isolated per conversation", async () => {
const listener = __listenClientTestUtils.createListenerRuntime();
const runtimeA = __listenClientTestUtils.getOrCreateConversationRuntime(
listener,
"agent-1",
"conv-a",
);
const runtimeB = __listenClientTestUtils.getOrCreateConversationRuntime(
listener,
"agent-1",
"conv-b",
);
const socket = new MockSocket();
const pendingA = requestApprovalOverWS(
runtimeA,
socket as unknown as WebSocket,
"perm-a",
{
type: "control_request",
request_id: "perm-a",
request: {
subtype: "can_use_tool",
tool_name: "Bash",
input: {},
tool_call_id: "call-a",
permission_suggestions: [],
blocked_path: null,
},
},
);
const pendingB = requestApprovalOverWS(
runtimeB,
socket as unknown as WebSocket,
"perm-b",
{
type: "control_request",
request_id: "perm-b",
request: {
subtype: "can_use_tool",
tool_name: "Bash",
input: {},
tool_call_id: "call-b",
permission_suggestions: [],
blocked_path: null,
},
},
);
expect(listener.approvalRuntimeKeyByRequestId.get("perm-a")).toBe(
runtimeA.key,
);
expect(listener.approvalRuntimeKeyByRequestId.get("perm-b")).toBe(
runtimeB.key,
);
const statusAWhilePending = __listenClientTestUtils.buildLoopStatus(
listener,
{
agent_id: "agent-1",
conversation_id: "conv-a",
},
);
const statusBWhilePending = __listenClientTestUtils.buildLoopStatus(
listener,
{
agent_id: "agent-1",
conversation_id: "conv-b",
},
);
expect(statusAWhilePending.status).toBe("WAITING_ON_APPROVAL");
expect(statusBWhilePending.status).toBe("WAITING_ON_APPROVAL");
expect(
resolvePendingApprovalResolver(runtimeA, {
request_id: "perm-a",
decision: { behavior: "allow" },
}),
).toBe(true);
await expect(pendingA).resolves.toMatchObject({
request_id: "perm-a",
decision: { behavior: "allow" },
});
expect(runtimeA.pendingApprovalResolvers.size).toBe(0);
expect(runtimeB.pendingApprovalResolvers.size).toBe(1);
expect(listener.approvalRuntimeKeyByRequestId.has("perm-a")).toBe(false);
expect(listener.approvalRuntimeKeyByRequestId.get("perm-b")).toBe(
runtimeB.key,
);
const statusAAfterResolve = __listenClientTestUtils.buildLoopStatus(
listener,
{
agent_id: "agent-1",
conversation_id: "conv-a",
},
);
const statusBAfterResolve = __listenClientTestUtils.buildLoopStatus(
listener,
{
agent_id: "agent-1",
conversation_id: "conv-b",
},
);
expect(statusAAfterResolve.status).toBe("WAITING_ON_INPUT");
expect(statusBAfterResolve.status).toBe("WAITING_ON_APPROVAL");
expect(
resolvePendingApprovalResolver(runtimeB, {
request_id: "perm-b",
decision: { behavior: "allow" },
}),
).toBe(true);
await expect(pendingB).resolves.toMatchObject({
request_id: "perm-b",
decision: { behavior: "allow" },
});
});
test("recovered approval state does not leak across conversation scopes", () => {
const listener = __listenClientTestUtils.createListenerRuntime();
const runtimeA = __listenClientTestUtils.getOrCreateConversationRuntime(
listener,
"agent-1",
"conv-a",
);
__listenClientTestUtils.getOrCreateConversationRuntime(
listener,
"agent-1",
"conv-b",
);
runtimeA.recoveredApprovalState = {
agentId: "agent-1",
conversationId: "conv-a",
approvalsByRequestId: new Map([
[
"perm-a",
{
approval: {
toolCallId: "call-a",
toolName: "Bash",
toolArgs: "{}",
},
controlRequest: {
type: "control_request",
request_id: "perm-a",
request: {
subtype: "can_use_tool",
tool_name: "Bash",
input: {},
tool_call_id: "call-a",
permission_suggestions: [],
blocked_path: null,
},
},
},
],
]),
pendingRequestIds: new Set(["perm-a"]),
responsesByRequestId: new Map(),
};
const loopStatusA = __listenClientTestUtils.buildLoopStatus(listener, {
agent_id: "agent-1",
conversation_id: "conv-a",
});
const loopStatusB = __listenClientTestUtils.buildLoopStatus(listener, {
agent_id: "agent-1",
conversation_id: "conv-b",
});
const deviceStatusA = __listenClientTestUtils.buildDeviceStatus(listener, {
agent_id: "agent-1",
conversation_id: "conv-a",
});
const deviceStatusB = __listenClientTestUtils.buildDeviceStatus(listener, {
agent_id: "agent-1",
conversation_id: "conv-b",
});
expect(loopStatusA.status).toBe("WAITING_ON_APPROVAL");
expect(loopStatusB.status).toBe("WAITING_ON_INPUT");
expect(deviceStatusA.pending_control_requests).toHaveLength(1);
expect(deviceStatusA.pending_control_requests[0]?.request_id).toBe(
"perm-a",
);
expect(deviceStatusB.pending_control_requests).toHaveLength(0);
});
test("queue dispatch respects conversation runtime boundaries", async () => {
const listener = __listenClientTestUtils.createListenerRuntime();
__listenClientTestUtils.setActiveRuntime(listener);
const runtimeA = __listenClientTestUtils.getOrCreateScopedRuntime(
listener,
"agent-1",
"conv-a",
);
const runtimeB = __listenClientTestUtils.getOrCreateScopedRuntime(
listener,
"agent-1",
"conv-b",
);
const socket = new MockSocket();
const processed: string[] = [];
const enqueueTurn = (
runtime: (typeof runtimeA | typeof runtimeB) & {
queueRuntime: {
enqueue: (item: {
kind: "message";
source: "user";
content: string;
clientMessageId: string;
agentId: string;
conversationId: string;
}) => { id: string } | null;
};
},
conversationId: string,
text: string,
) => {
const item = runtime.queueRuntime.enqueue({
kind: "message",
source: "user",
content: text,
clientMessageId: `cm-${conversationId}`,
agentId: "agent-1",
conversationId,
});
if (!item) {
throw new Error("Expected queued item to be created");
}
runtime.queuedMessagesByItemId.set(
item.id,
makeIncomingMessage("agent-1", conversationId, text),
);
};
enqueueTurn(runtimeA, "conv-a", "queued a");
enqueueTurn(runtimeB, "conv-b", "queued b");
const processQueuedTurn = mock(
async (queuedTurn: { conversationId?: string }) => {
processed.push(queuedTurn.conversationId ?? "missing");
},
);
const opts = {
connectionId: "conn-1",
onStatusChange: undefined,
} as never;
__listenClientTestUtils.scheduleQueuePump(
runtimeA,
socket as unknown as WebSocket,
opts,
processQueuedTurn,
);
__listenClientTestUtils.scheduleQueuePump(
runtimeB,
socket as unknown as WebSocket,
opts,
processQueuedTurn,
);
await waitFor(() => processed.length === 2);
expect(processed.sort()).toEqual(["conv-a", "conv-b"]);
expect(runtimeA.queueRuntime.length).toBe(0);
expect(runtimeB.queueRuntime.length).toBe(0);
expect(runtimeA.queuedMessagesByItemId.size).toBe(0);
expect(runtimeB.queuedMessagesByItemId.size).toBe(0);
});
test("queue pump status callbacks stay aggregate when another conversation is busy", async () => {
const listener = __listenClientTestUtils.createListenerRuntime();
__listenClientTestUtils.setActiveRuntime(listener);
const runtimeA = __listenClientTestUtils.getOrCreateScopedRuntime(
listener,
"agent-1",
"conv-a",
);
const runtimeB = __listenClientTestUtils.getOrCreateScopedRuntime(
listener,
"agent-1",
"conv-b",
);
const socket = new MockSocket();
const statuses: string[] = [];
runtimeA.isProcessing = true;
runtimeA.loopStatus = "PROCESSING_API_RESPONSE";
const queueInput = {
kind: "message",
source: "user",
content: "queued b",
clientMessageId: "cm-b",
agentId: "agent-1",
conversationId: "conv-b",
} satisfies Omit<MessageQueueItem, "id" | "enqueuedAt">;
const item = runtimeB.queueRuntime.enqueue(queueInput);
if (!item) {
throw new Error("Expected queued item to be created");
}
runtimeB.queuedMessagesByItemId.set(
item.id,
makeIncomingMessage("agent-1", "conv-b", "queued b"),
);
__listenClientTestUtils.scheduleQueuePump(
runtimeB,
socket as unknown as WebSocket,
{
connectionId: "conn-1",
onStatusChange: (status: "idle" | "receiving" | "processing") => {
statuses.push(status);
},
} as never,
async () => {},
);
await waitFor(() => runtimeB.queueRuntime.length === 0);
expect(statuses).not.toContain("idle");
expect(statuses.every((status) => status === "processing")).toBe(true);
expect(listener.conversationRuntimes.has(runtimeB.key)).toBe(false);
expect(listener.conversationRuntimes.has(runtimeA.key)).toBe(true);
});
});

View File

@@ -1,4 +1,4 @@
import { describe, expect, test } from "bun:test";
import { describe, expect, mock, test } from "bun:test";
import { mkdir, mkdtemp, realpath, rm } from "node:fs/promises";
import os from "node:os";
import { join } from "node:path";
@@ -332,31 +332,39 @@ describe("listen-client requestApprovalOverWS", () => {
expect(runtime.pendingApprovalResolvers.size).toBe(0);
});
test("cleans up resolver when send throws", async () => {
test("registers a pending resolver until an approval response arrives", async () => {
const runtime = __listenClientTestUtils.createRuntime();
const socket = new MockSocket(WebSocket.OPEN);
socket.sendImpl = () => {
throw new Error("send failed");
};
const requestId = "perm-send-fail";
await expect(
requestApprovalOverWS(
runtime,
socket as unknown as WebSocket,
requestId,
makeControlRequest(requestId),
),
).rejects.toThrow("send failed");
const pending = requestApprovalOverWS(
runtime,
socket as unknown as WebSocket,
requestId,
makeControlRequest(requestId),
);
expect(runtime.pendingApprovalResolvers.size).toBe(1);
expect(
runtime.pendingApprovalResolvers.get(requestId)?.controlRequest,
).toEqual(makeControlRequest(requestId));
rejectPendingApprovalResolvers(runtime, "cleanup");
await expect(pending).rejects.toThrow("cleanup");
expect(runtime.pendingApprovalResolvers.size).toBe(0);
});
});
describe("listen-client conversation-scoped protocol events", () => {
test("queue enqueue/block updates loop status with runtime scope instead of stream_delta", async () => {
const runtime = __listenClientTestUtils.createRuntime();
const listener = __listenClientTestUtils.createListenerRuntime();
const runtime = __listenClientTestUtils.getOrCreateScopedRuntime(
listener,
"agent-default",
"default",
);
const socket = new MockSocket(WebSocket.OPEN);
runtime.socket = socket as unknown as WebSocket;
listener.socket = socket as unknown as WebSocket;
const input: Omit<MessageQueueItem, "id" | "enqueuedAt"> = {
kind: "message",
@@ -396,9 +404,14 @@ describe("listen-client conversation-scoped protocol events", () => {
});
test("queue dequeue keeps scope through update_queue runtime envelope", async () => {
const runtime = __listenClientTestUtils.createRuntime();
const listener = __listenClientTestUtils.createListenerRuntime();
const runtime = __listenClientTestUtils.getOrCreateScopedRuntime(
listener,
"agent-xyz",
"conv-xyz",
);
const socket = new MockSocket(WebSocket.OPEN);
runtime.socket = socket as unknown as WebSocket;
listener.socket = socket as unknown as WebSocket;
const input: Omit<MessageQueueItem, "id" | "enqueuedAt"> = {
kind: "message",
@@ -465,6 +478,24 @@ describe("listen-client v2 status builders", () => {
});
});
test("resolveRuntimeScope does not guess another conversation when multiple runtimes exist", () => {
const listener = __listenClientTestUtils.createListenerRuntime();
const runtimeA = __listenClientTestUtils.getOrCreateConversationRuntime(
listener,
"agent-1",
"conv-a",
);
__listenClientTestUtils.getOrCreateConversationRuntime(
listener,
"agent-1",
"conv-b",
);
runtimeA.isProcessing = true;
expect(__listenClientTestUtils.resolveRuntimeScope(listener)).toBeNull();
});
test("does not emit bootstrap status updates with __unknown_agent__ runtime", () => {
const runtime = __listenClientTestUtils.createRuntime();
const socket = new MockSocket(WebSocket.OPEN);
@@ -507,7 +538,12 @@ describe("listen-client v2 status builders", () => {
});
test("sync replays device, loop, and queue state for the requested runtime", () => {
const runtime = __listenClientTestUtils.createRuntime();
const listener = __listenClientTestUtils.createListenerRuntime();
const runtime = __listenClientTestUtils.getOrCreateScopedRuntime(
listener,
"agent-1",
"default",
);
const socket = new MockSocket(WebSocket.OPEN);
const queueInput = {
clientMessageId: "cm-1",
@@ -555,7 +591,12 @@ describe("listen-client v2 status builders", () => {
});
test("recovered approvals surface as pending control requests and WAITING_ON_APPROVAL", () => {
const runtime = __listenClientTestUtils.createRuntime();
const listener = __listenClientTestUtils.createListenerRuntime();
const runtime = __listenClientTestUtils.getOrCreateScopedRuntime(
listener,
"agent-1",
"default",
);
const socket = new MockSocket(WebSocket.OPEN);
const requestId = "perm-tool-call-1";
@@ -630,7 +671,12 @@ describe("listen-client v2 status builders", () => {
});
test("starting a live turn clears stale recovered approvals for the same scope", () => {
const runtime = __listenClientTestUtils.createRuntime();
const listener = __listenClientTestUtils.createListenerRuntime();
const runtime = __listenClientTestUtils.getOrCreateScopedRuntime(
listener,
"agent-1",
"default",
);
runtime.recoveredApprovalState = {
agentId: "agent-1",
conversationId: "default",
@@ -682,6 +728,72 @@ describe("listen-client v2 status builders", () => {
});
expect(defaultStatus.current_working_directory).toBe("/repo/b");
});
test("scoped loop status is not suppressed just because another conversation is processing", () => {
const listener = __listenClientTestUtils.createListenerRuntime();
const runtimeA = __listenClientTestUtils.getOrCreateConversationRuntime(
listener,
"agent-1",
"conv-a",
);
const runtimeB = __listenClientTestUtils.getOrCreateConversationRuntime(
listener,
"agent-1",
"conv-b",
);
runtimeA.isProcessing = true;
runtimeA.loopStatus = "PROCESSING_API_RESPONSE";
runtimeB.loopStatus = "WAITING_ON_APPROVAL";
expect(
__listenClientTestUtils.buildLoopStatus(listener, {
agent_id: "agent-1",
conversation_id: "conv-b",
}),
).toEqual({
status: "WAITING_ON_APPROVAL",
active_run_ids: [],
});
});
test("scoped queue snapshots are not suppressed just because another conversation is processing", () => {
const listener = __listenClientTestUtils.createListenerRuntime();
const runtimeA = __listenClientTestUtils.getOrCreateScopedRuntime(
listener,
"agent-1",
"conv-a",
);
const runtimeB = __listenClientTestUtils.getOrCreateScopedRuntime(
listener,
"agent-1",
"conv-b",
);
runtimeA.isProcessing = true;
runtimeA.loopStatus = "PROCESSING_API_RESPONSE";
const queueInput = {
kind: "message",
source: "user",
content: "queued b",
clientMessageId: "cm-b",
agentId: "agent-1",
conversationId: "conv-b",
} satisfies Omit<MessageQueueItem, "id" | "enqueuedAt">;
runtimeB.queueRuntime.enqueue(queueInput);
expect(
__listenClientTestUtils.buildQueueSnapshot(listener, {
agent_id: "agent-1",
conversation_id: "conv-b",
}),
).toEqual([
expect.objectContaining({
client_message_id: "cm-b",
kind: "message",
}),
]);
});
});
describe("listen-client cwd change handling", () => {
@@ -985,8 +1097,14 @@ describe("listen-client capability-gated approval flow", () => {
});
test("requestApprovalOverWS exposes the control request through device status instead of stream_delta", () => {
const runtime = __listenClientTestUtils.createRuntime();
const listener = __listenClientTestUtils.createListenerRuntime();
const runtime = __listenClientTestUtils.getOrCreateScopedRuntime(
listener,
"agent-1",
"default",
);
const socket = new MockSocket(WebSocket.OPEN);
listener.socket = socket as unknown as WebSocket;
const requestId = "perm-adapter-test";
void requestApprovalOverWS(
@@ -996,10 +1114,18 @@ describe("listen-client capability-gated approval flow", () => {
makeControlRequest(requestId),
).catch(() => {});
expect(socket.sentPayloads).toHaveLength(2);
const [loopStatus, deviceStatus] = socket.sentPayloads.map((payload) =>
expect(socket.sentPayloads.length).toBeGreaterThanOrEqual(2);
const outbound = socket.sentPayloads.map((payload) =>
JSON.parse(payload as string),
);
const loopStatus = outbound.find(
(payload) => payload.type === "update_loop_status",
);
const deviceStatus = outbound.find(
(payload) => payload.type === "update_device_status",
);
expect(loopStatus).toBeDefined();
expect(deviceStatus).toBeDefined();
expect(loopStatus.type).toBe("update_loop_status");
expect(loopStatus.loop_status.status).toBe("WAITING_ON_APPROVAL");
expect(deviceStatus.type).toBe("update_device_status");
@@ -1013,6 +1139,64 @@ describe("listen-client capability-gated approval flow", () => {
// Cleanup
rejectPendingApprovalResolvers(runtime, "test cleanup");
});
test("handled recovered approval responses reschedule queue pumping for the fallback scoped runtime", async () => {
const listener = __listenClientTestUtils.createListenerRuntime();
const targetRuntime =
__listenClientTestUtils.getOrCreateConversationRuntime(
listener,
"agent-1",
"default",
);
const socket = new MockSocket(WebSocket.OPEN);
const scheduleQueuePumpMock = mock(() => {});
const resolveRecoveredApprovalResponseMock = mock(async () => true);
const handled = await __listenClientTestUtils.handleApprovalResponseInput(
listener,
{
runtime: { agent_id: "agent-1", conversation_id: "default" },
response: {
request_id: "perm-recovered",
decision: { behavior: "allow" },
},
socket: socket as unknown as WebSocket,
opts: {
onStatusChange: undefined,
connectionId: "conn-1",
},
processQueuedTurn: async () => {},
},
{
resolveRuntimeForApprovalRequest: () => null,
resolvePendingApprovalResolver: () => false,
getOrCreateScopedRuntime: () => targetRuntime,
resolveRecoveredApprovalResponse: resolveRecoveredApprovalResponseMock,
scheduleQueuePump: scheduleQueuePumpMock,
},
);
expect(handled).toBe(true);
expect(resolveRecoveredApprovalResponseMock).toHaveBeenCalledWith(
targetRuntime,
socket,
{
request_id: "perm-recovered",
decision: { behavior: "allow" },
},
expect.any(Function),
{
onStatusChange: undefined,
connectionId: "conn-1",
},
);
expect(scheduleQueuePumpMock).toHaveBeenCalledWith(
targetRuntime,
socket,
expect.objectContaining({ connectionId: "conn-1" }),
expect.any(Function),
);
});
});
describe("listen-client approval recovery batch correlation", () => {

View File

@@ -20,6 +20,8 @@ import {
const {
createRuntime,
createListenerRuntime,
getOrCreateConversationRuntime,
stopRuntime,
rememberPendingApprovalBatchIds,
populateInterruptQueue,
@@ -100,17 +102,28 @@ describe("stopRuntime teardown", () => {
});
test("increments continuationEpoch on each stop", () => {
const runtime = createRuntime();
runtime.socket = new MockSocket(WebSocket.OPEN) as unknown as WebSocket;
const listener = createListenerRuntime();
listener.socket = new MockSocket(WebSocket.OPEN) as unknown as WebSocket;
expect(runtime.continuationEpoch).toBe(0);
stopRuntime(runtime, true);
expect(runtime.continuationEpoch).toBe(1);
const runtimeA = getOrCreateConversationRuntime(
listener,
"agent-1",
"conv-1",
);
const runtimeB = getOrCreateConversationRuntime(
listener,
"agent-2",
"conv-2",
);
runtime.socket = new MockSocket(WebSocket.OPEN) as unknown as WebSocket;
runtime.intentionallyClosed = false;
stopRuntime(runtime, true);
expect(runtime.continuationEpoch).toBe(2);
expect(runtimeA.continuationEpoch).toBe(0);
expect(runtimeB.continuationEpoch).toBe(0);
stopRuntime(listener, true);
expect(runtimeA.continuationEpoch).toBe(1);
expect(runtimeB.continuationEpoch).toBe(1);
expect(listener.conversationRuntimes.size).toBe(0);
});
});

View File

@@ -9,10 +9,11 @@ import {
emitLoopStatusIfOpen,
setLoopStatus,
} from "./protocol-outbound";
import type { ListenerRuntime } from "./types";
import { evictConversationRuntimeIfIdle } from "./runtime";
import type { ConversationRuntime } from "./types";
export function rememberPendingApprovalBatchIds(
runtime: ListenerRuntime,
runtime: ConversationRuntime,
pendingApprovals: Array<{ toolCallId: string }>,
batchId: string,
): void {
@@ -27,7 +28,7 @@ export function rememberPendingApprovalBatchIds(
}
export function resolvePendingApprovalBatchId(
runtime: ListenerRuntime,
runtime: ConversationRuntime,
pendingApprovals: Array<{ toolCallId: string }>,
): string | null {
const batchIds = new Set<string>();
@@ -47,7 +48,7 @@ export function resolvePendingApprovalBatchId(
}
export function resolveRecoveryBatchId(
runtime: ListenerRuntime,
runtime: ConversationRuntime,
pendingApprovals: Array<{ toolCallId: string }>,
): string | null {
if (runtime.pendingApprovalBatchByToolCallId.size === 0) {
@@ -57,7 +58,7 @@ export function resolveRecoveryBatchId(
}
export function clearPendingApprovalBatchIds(
runtime: ListenerRuntime,
runtime: ConversationRuntime,
approvals: Array<{ toolCallId: string }>,
): void {
for (const approval of approvals) {
@@ -178,7 +179,7 @@ export function validateApprovalResultIds(
}
export function resolvePendingApprovalResolver(
runtime: ListenerRuntime,
runtime: ConversationRuntime,
response: ApprovalResponseBody,
): boolean {
const requestId = response.request_id;
@@ -192,6 +193,7 @@ export function resolvePendingApprovalResolver(
}
runtime.pendingApprovalResolvers.delete(requestId);
runtime.listener.approvalRuntimeKeyByRequestId.delete(requestId);
if (runtime.pendingApprovalResolvers.size === 0) {
setLoopStatus(
runtime,
@@ -199,29 +201,49 @@ export function resolvePendingApprovalResolver(
);
}
pending.resolve(response);
emitLoopStatusIfOpen(runtime);
emitDeviceStatusIfOpen(runtime);
emitLoopStatusIfOpen(runtime.listener, {
agent_id: runtime.agentId,
conversation_id: runtime.conversationId,
});
emitDeviceStatusIfOpen(runtime.listener, {
agent_id: runtime.agentId,
conversation_id: runtime.conversationId,
});
evictConversationRuntimeIfIdle(runtime);
return true;
}
export function rejectPendingApprovalResolvers(
runtime: ListenerRuntime,
runtime: ConversationRuntime,
reason: string,
): void {
for (const [, pending] of runtime.pendingApprovalResolvers) {
pending.reject(new Error(reason));
}
runtime.pendingApprovalResolvers.clear();
for (const [requestId, runtimeKey] of runtime.listener
.approvalRuntimeKeyByRequestId) {
if (runtimeKey === runtime.key) {
runtime.listener.approvalRuntimeKeyByRequestId.delete(requestId);
}
}
setLoopStatus(
runtime,
runtime.isProcessing ? "PROCESSING_API_RESPONSE" : "WAITING_ON_INPUT",
);
emitLoopStatusIfOpen(runtime);
emitDeviceStatusIfOpen(runtime);
emitLoopStatusIfOpen(runtime.listener, {
agent_id: runtime.agentId,
conversation_id: runtime.conversationId,
});
emitDeviceStatusIfOpen(runtime.listener, {
agent_id: runtime.agentId,
conversation_id: runtime.conversationId,
});
evictConversationRuntimeIfIdle(runtime);
}
export function requestApprovalOverWS(
runtime: ListenerRuntime,
runtime: ConversationRuntime,
socket: WebSocket,
requestId: string,
controlRequest: ControlRequest,
@@ -236,9 +258,16 @@ export function requestApprovalOverWS(
reject,
controlRequest,
});
runtime.listener.approvalRuntimeKeyByRequestId.set(requestId, runtime.key);
setLoopStatus(runtime, "WAITING_ON_APPROVAL");
emitLoopStatusIfOpen(runtime);
emitDeviceStatusIfOpen(runtime);
emitLoopStatusIfOpen(runtime.listener, {
agent_id: runtime.agentId,
conversation_id: runtime.conversationId,
});
emitDeviceStatusIfOpen(runtime.listener, {
agent_id: runtime.agentId,
conversation_id: runtime.conversationId,
});
});
}

View File

@@ -16,6 +16,7 @@ import { type DequeuedBatch, QueueRuntime } from "../../queue/queueRuntime";
import { createSharedReminderState } from "../../reminders/state";
import { settingsManager } from "../../settings-manager";
import { loadTools } from "../../tools/manager";
import type { ApprovalResponseBody } from "../../types/protocol_v2";
import { isDebugEnabled } from "../../utils/debug";
import { killAllTerminals } from "../terminalHandler";
import {
@@ -75,9 +76,14 @@ import {
shouldAttemptPostStopApprovalRecovery,
} from "./recovery";
import {
clearConversationRuntimeState,
clearRecoveredApprovalStateForScope,
clearRuntimeTimers,
emitListenerStatus,
evictConversationRuntimeIfIdle,
getActiveRuntime,
getListenerStatus,
getOrCreateConversationRuntime,
getPendingControlRequestCount,
getRecoveredApprovalStateForScope,
safeEmitWsEvent,
@@ -92,6 +98,7 @@ import { markAwaitingAcceptedApprovalContinuationRunId } from "./send";
import { handleIncomingMessage } from "./turn";
import type {
ChangeCwdMessage,
ConversationRuntime,
IncomingMessage,
ListenerRuntime,
ModeChangePayload,
@@ -139,15 +146,185 @@ function handleModeChange(
}
}
function ensureConversationQueueRuntime(
listener: ListenerRuntime,
runtime: ConversationRuntime,
): ConversationRuntime {
if (runtime.queueRuntime) {
return runtime;
}
runtime.queueRuntime = new QueueRuntime({
callbacks: {
onEnqueued: (item, queueLen) => {
runtime.pendingTurns = queueLen;
scheduleQueueEmit(listener, getQueueItemScope(item));
},
onDequeued: (batch) => {
runtime.pendingTurns = batch.queueLenAfter;
scheduleQueueEmit(listener, getQueueItemsScope(batch.items));
},
onBlocked: () => {
scheduleQueueEmit(listener, {
agent_id: runtime.agentId,
conversation_id: runtime.conversationId,
});
},
onCleared: (_reason, _clearedCount, items) => {
runtime.pendingTurns = 0;
scheduleQueueEmit(listener, getQueueItemsScope(items));
evictConversationRuntimeIfIdle(runtime);
},
onDropped: (item, _reason, queueLen) => {
runtime.pendingTurns = queueLen;
runtime.queuedMessagesByItemId.delete(item.id);
scheduleQueueEmit(listener, getQueueItemScope(item));
evictConversationRuntimeIfIdle(runtime);
},
},
});
return runtime;
}
function getOrCreateScopedRuntime(
listener: ListenerRuntime,
agentId?: string | null,
conversationId?: string | null,
): ConversationRuntime {
return ensureConversationQueueRuntime(
listener,
getOrCreateConversationRuntime(listener, agentId, conversationId),
);
}
function resolveRuntimeForApprovalRequest(
listener: ListenerRuntime,
requestId?: string | null,
): ConversationRuntime | null {
if (!requestId) {
return null;
}
const runtimeKey = listener.approvalRuntimeKeyByRequestId.get(requestId);
if (!runtimeKey) {
return null;
}
return listener.conversationRuntimes.get(runtimeKey) ?? null;
}
type ProcessQueuedTurn = (
queuedTurn: IncomingMessage,
dequeuedBatch: DequeuedBatch,
) => Promise<void>;
async function handleApprovalResponseInput(
listener: ListenerRuntime,
params: {
runtime: {
agent_id?: string | null;
conversation_id?: string | null;
};
response: ApprovalResponseBody;
socket: WebSocket;
opts: {
onStatusChange?: StartListenerOptions["onStatusChange"];
connectionId?: string;
};
processQueuedTurn: ProcessQueuedTurn;
},
deps: {
resolveRuntimeForApprovalRequest: (
listener: ListenerRuntime,
requestId?: string | null,
) => ConversationRuntime | null;
resolvePendingApprovalResolver: (
runtime: ConversationRuntime,
response: ApprovalResponseBody,
) => boolean;
getOrCreateScopedRuntime: (
listener: ListenerRuntime,
agentId?: string | null,
conversationId?: string | null,
) => ConversationRuntime;
resolveRecoveredApprovalResponse: (
runtime: ConversationRuntime,
socket: WebSocket,
response: ApprovalResponseBody,
processTurn: typeof handleIncomingMessage,
opts?: {
onStatusChange?: StartListenerOptions["onStatusChange"];
connectionId?: string;
},
) => Promise<boolean>;
scheduleQueuePump: (
runtime: ConversationRuntime,
socket: WebSocket,
opts: StartListenerOptions,
processQueuedTurn: ProcessQueuedTurn,
) => void;
} = {
resolveRuntimeForApprovalRequest,
resolvePendingApprovalResolver,
getOrCreateScopedRuntime,
resolveRecoveredApprovalResponse,
scheduleQueuePump,
},
): Promise<boolean> {
const approvalRuntime = deps.resolveRuntimeForApprovalRequest(
listener,
params.response.request_id,
);
if (
approvalRuntime &&
deps.resolvePendingApprovalResolver(approvalRuntime, params.response)
) {
deps.scheduleQueuePump(
approvalRuntime,
params.socket,
params.opts as StartListenerOptions,
params.processQueuedTurn,
);
return true;
}
const targetRuntime =
approvalRuntime ??
deps.getOrCreateScopedRuntime(
listener,
params.runtime.agent_id,
params.runtime.conversation_id,
);
if (
await deps.resolveRecoveredApprovalResponse(
targetRuntime,
params.socket,
params.response,
handleIncomingMessage,
{
onStatusChange: params.opts.onStatusChange,
connectionId: params.opts.connectionId,
},
)
) {
deps.scheduleQueuePump(
targetRuntime,
params.socket,
params.opts as StartListenerOptions,
params.processQueuedTurn,
);
return true;
}
return false;
}
async function handleCwdChange(
msg: ChangeCwdMessage,
socket: WebSocket,
runtime: ListenerRuntime,
runtime: ConversationRuntime,
): Promise<void> {
const conversationId = normalizeConversationId(msg.conversationId);
const agentId = normalizeCwdAgentId(msg.agentId);
const currentWorkingDirectory = getConversationWorkingDirectory(
runtime,
runtime.listener,
agentId,
conversationId,
);
@@ -168,7 +345,7 @@ async function handleCwdChange(
}
setConversationWorkingDirectory(
runtime,
runtime.listener,
agentId,
conversationId,
normalizedPath,
@@ -193,78 +370,27 @@ async function handleCwdChange(
function createRuntime(): ListenerRuntime {
const bootWorkingDirectory = process.env.USER_CWD || process.cwd();
const runtime: ListenerRuntime = {
return {
socket: null,
heartbeatInterval: null,
reconnectTimeout: null,
intentionallyClosed: false,
hasSuccessfulConnection: false,
messageQueue: Promise.resolve(),
pendingApprovalResolvers: new Map(),
recoveredApprovalState: null,
sessionId: `listen-${crypto.randomUUID()}`,
eventSeqCounter: 0,
lastStopReason: null,
isProcessing: false,
activeAgentId: null,
activeConversationId: null,
activeWorkingDirectory: null,
activeRunId: null,
activeRunStartedAt: null,
activeAbortController: null,
cancelRequested: false,
isRecoveringApprovals: false,
loopStatus: "WAITING_ON_INPUT",
pendingApprovalBatchByToolCallId: new Map<string, string>(),
pendingInterruptedResults: null,
pendingInterruptedContext: null,
continuationEpoch: 0,
activeExecutingToolCallIds: [],
pendingInterruptedToolCallIds: null,
queueEmitScheduled: false,
pendingQueueEmitScope: undefined,
onWsEvent: undefined,
reminderState: createSharedReminderState(),
bootWorkingDirectory,
workingDirectoryByConversation: loadPersistedCwdMap(),
connectionId: null,
connectionName: null,
queuedMessagesByItemId: new Map<string, IncomingMessage>(),
queuePumpActive: false,
queuePumpScheduled: false,
queueEmitScheduled: false,
pendingQueueEmitScope: undefined,
pendingTurns: 0,
// queueRuntime assigned below — needs runtime ref in callbacks
queueRuntime: null as unknown as QueueRuntime,
conversationRuntimes: new Map(),
approvalRuntimeKeyByRequestId: new Map(),
lastEmittedStatus: null,
};
runtime.queueRuntime = new QueueRuntime({
callbacks: {
onEnqueued: (item, queueLen) => {
runtime.pendingTurns = queueLen;
const scope = getQueueItemScope(item);
scheduleQueueEmit(runtime, scope);
},
onDequeued: (batch) => {
runtime.pendingTurns = batch.queueLenAfter;
const scope = getQueueItemsScope(batch.items);
scheduleQueueEmit(runtime, scope);
},
onBlocked: (_reason, _queueLen) => {
const scope = getQueueItemScope(runtime.queueRuntime.items[0]);
scheduleQueueEmit(runtime, scope);
},
onCleared: (_reason, _clearedCount, items) => {
runtime.pendingTurns = 0;
const scope = getQueueItemsScope(items);
scheduleQueueEmit(runtime, scope);
},
onDropped: (item, _reason, queueLen) => {
runtime.pendingTurns = queueLen;
runtime.queuedMessagesByItemId.delete(item.id);
const scope = getQueueItemScope(item);
scheduleQueueEmit(runtime, scope);
},
},
});
return runtime;
}
function stopRuntime(
@@ -272,24 +398,20 @@ function stopRuntime(
suppressCallbacks: boolean,
): void {
runtime.intentionallyClosed = true;
runtime.cancelRequested = true;
if (
runtime.activeAbortController &&
!runtime.activeAbortController.signal.aborted
) {
runtime.activeAbortController.abort();
}
clearRuntimeTimers(runtime);
rejectPendingApprovalResolvers(runtime, "Listener runtime stopped");
runtime.pendingApprovalBatchByToolCallId.clear();
// Clear interrupted queue on true teardown to prevent cross-session leakage.
runtime.pendingInterruptedResults = null;
runtime.pendingInterruptedContext = null;
runtime.pendingInterruptedToolCallIds = null;
runtime.activeExecutingToolCallIds = [];
runtime.loopStatus = "WAITING_ON_INPUT";
runtime.continuationEpoch++;
for (const conversationRuntime of runtime.conversationRuntimes.values()) {
rejectPendingApprovalResolvers(
conversationRuntime,
"Listener runtime stopped",
);
clearConversationRuntimeState(conversationRuntime);
if (conversationRuntime.queueRuntime) {
conversationRuntime.queuedMessagesByItemId.clear();
conversationRuntime.queueRuntime.clear("shutdown");
}
}
runtime.conversationRuntimes.clear();
runtime.approvalRuntimeKeyByRequestId.clear();
if (!runtime.socket) {
return;
@@ -397,14 +519,19 @@ async function connectWithRetry(
});
runtime.socket = socket;
const processQueuedTurn = async (
const processQueuedTurn: ProcessQueuedTurn = async (
queuedTurn: IncomingMessage,
dequeuedBatch: DequeuedBatch,
): Promise<void> => {
const scopedRuntime = getOrCreateScopedRuntime(
runtime,
queuedTurn.agentId,
queuedTurn.conversationId,
);
await handleIncomingMessage(
queuedTurn,
socket,
runtime,
scopedRuntime,
opts.onStatusChange,
opts.connectionId,
dequeuedBatch.batchId,
@@ -420,8 +547,19 @@ async function connectWithRetry(
runtime.hasSuccessfulConnection = true;
opts.onConnected(opts.connectionId);
emitDeviceStatusUpdate(socket, runtime);
emitLoopStatusUpdate(socket, runtime);
if (runtime.conversationRuntimes.size === 0) {
emitDeviceStatusUpdate(socket, runtime);
emitLoopStatusUpdate(socket, runtime);
} else {
for (const conversationRuntime of runtime.conversationRuntimes.values()) {
const scope = {
agent_id: conversationRuntime.agentId,
conversation_id: conversationRuntime.conversationId,
};
emitDeviceStatusUpdate(socket, conversationRuntime, scope);
emitLoopStatusUpdate(socket, conversationRuntime, scope);
}
}
runtime.heartbeatInterval = setInterval(() => {
if (socket.readyState === WebSocket.OPEN) {
@@ -471,7 +609,14 @@ async function connectWithRetry(
console.log(`[Listen V2] Dropping sync: runtime mismatch or closed`);
return;
}
await recoverApprovalStateForSync(runtime, parsed.runtime);
await recoverApprovalStateForSync(
getOrCreateScopedRuntime(
runtime,
parsed.runtime.agent_id,
parsed.runtime.conversation_id,
),
parsed.runtime,
);
emitStateSync(socket, runtime, parsed.runtime);
return;
}
@@ -486,23 +631,19 @@ async function connectWithRetry(
}
if (parsed.payload.kind === "approval_response") {
if (resolvePendingApprovalResolver(runtime, parsed.payload)) {
scheduleQueuePump(runtime, socket, opts, processQueuedTurn);
return;
}
if (
await resolveRecoveredApprovalResponse(
runtime,
await handleApprovalResponseInput(runtime, {
runtime: parsed.runtime,
response: parsed.payload,
socket,
parsed.payload,
handleIncomingMessage,
{
opts: {
onStatusChange: opts.onStatusChange,
connectionId: opts.connectionId,
},
)
processQueuedTurn,
})
) {
scheduleQueuePump(runtime, socket, opts, processQueuedTurn);
return;
}
return;
}
@@ -541,6 +682,12 @@ async function connectWithRetry(
return;
}
const scopedRuntime = getOrCreateScopedRuntime(
runtime,
incoming.agentId,
incoming.conversationId,
);
if (shouldQueueInboundMessage(incoming)) {
const firstUserPayload = incoming.messages.find(
(
@@ -549,7 +696,7 @@ async function connectWithRetry(
"content" in payload,
);
if (firstUserPayload) {
const enqueuedItem = runtime.queueRuntime.enqueue({
const enqueuedItem = scopedRuntime.queueRuntime.enqueue({
kind: "message",
source: "user",
content: firstUserPayload.content,
@@ -558,37 +705,37 @@ async function connectWithRetry(
`cm-submit-${crypto.randomUUID()}`,
agentId: parsed.runtime.agent_id,
conversationId: parsed.runtime.conversation_id || "default",
} as Parameters<typeof runtime.queueRuntime.enqueue>[0]);
} as Parameters<typeof scopedRuntime.queueRuntime.enqueue>[0]);
if (enqueuedItem) {
runtime.queuedMessagesByItemId.set(enqueuedItem.id, incoming);
scopedRuntime.queuedMessagesByItemId.set(enqueuedItem.id, incoming);
}
}
scheduleQueuePump(runtime, socket, opts, processQueuedTurn);
scheduleQueuePump(scopedRuntime, socket, opts, processQueuedTurn);
return;
}
runtime.messageQueue = runtime.messageQueue
scopedRuntime.messageQueue = scopedRuntime.messageQueue
.then(async () => {
if (runtime !== getActiveRuntime() || runtime.intentionallyClosed) {
return;
}
opts.onStatusChange?.("receiving", opts.connectionId);
emitListenerStatus(runtime, opts.onStatusChange, opts.connectionId);
await handleIncomingMessage(
incoming,
socket,
runtime,
scopedRuntime,
opts.onStatusChange,
opts.connectionId,
);
opts.onStatusChange?.("idle", opts.connectionId);
scheduleQueuePump(runtime, socket, opts, processQueuedTurn);
emitListenerStatus(runtime, opts.onStatusChange, opts.connectionId);
scheduleQueuePump(scopedRuntime, socket, opts, processQueuedTurn);
})
.catch((error: unknown) => {
if (process.env.DEBUG) {
console.error("[Listen] Error handling queued input:", error);
}
opts.onStatusChange?.("idle", opts.connectionId);
scheduleQueuePump(runtime, socket, opts, processQueuedTurn);
emitListenerStatus(runtime, opts.onStatusChange, opts.connectionId);
scheduleQueuePump(scopedRuntime, socket, opts, processQueuedTurn);
});
return;
}
@@ -605,11 +752,16 @@ async function connectWithRetry(
parsed.runtime.conversation_id ??
undefined,
};
const scopedRuntime = getOrCreateScopedRuntime(
runtime,
scope.agent_id,
scope.conversation_id,
);
const shouldTrackCommand =
!runtime.isProcessing &&
!scopedRuntime.isProcessing &&
getPendingControlRequestCount(runtime, scope) === 0;
if (shouldTrackCommand) {
setLoopStatus(runtime, "EXECUTING_COMMAND", scope);
setLoopStatus(scopedRuntime, "EXECUTING_COMMAND", scope);
}
try {
if (parsed.payload.mode) {
@@ -628,14 +780,14 @@ async function connectWithRetry(
cwd: parsed.payload.cwd,
},
socket,
runtime,
scopedRuntime,
);
} else if (!parsed.payload.mode) {
emitDeviceStatusUpdate(socket, runtime, scope);
}
} finally {
if (shouldTrackCommand) {
setLoopStatus(runtime, "WAITING_ON_INPUT", scope);
setLoopStatus(scopedRuntime, "WAITING_ON_INPUT", scope);
}
}
return;
@@ -651,42 +803,47 @@ async function connectWithRetry(
agent_id: parsed.runtime.agent_id,
conversation_id: parsed.runtime.conversation_id,
}) > 0;
const hasActiveTurn = runtime.isProcessing;
const scopedRuntime = getOrCreateScopedRuntime(
runtime,
parsed.runtime.agent_id,
parsed.runtime.conversation_id,
);
const hasActiveTurn = scopedRuntime.isProcessing;
if (!hasActiveTurn && !hasPendingApprovals) {
return;
}
runtime.cancelRequested = true;
scopedRuntime.cancelRequested = true;
// Eager interrupt capture parity with App/headless:
// if tool execution is currently in-flight, queue explicit interrupted
// tool results immediately at cancel time (before async catch paths).
if (
runtime.activeExecutingToolCallIds.length > 0 &&
(!runtime.pendingInterruptedResults ||
runtime.pendingInterruptedResults.length === 0)
scopedRuntime.activeExecutingToolCallIds.length > 0 &&
(!scopedRuntime.pendingInterruptedResults ||
scopedRuntime.pendingInterruptedResults.length === 0)
) {
runtime.pendingInterruptedResults =
runtime.activeExecutingToolCallIds.map((toolCallId) => ({
scopedRuntime.pendingInterruptedResults =
scopedRuntime.activeExecutingToolCallIds.map((toolCallId) => ({
type: "tool",
tool_call_id: toolCallId,
tool_return: INTERRUPTED_BY_USER,
status: "error",
}));
runtime.pendingInterruptedContext = {
agentId: runtime.activeAgentId || "",
conversationId: runtime.activeConversationId || "default",
continuationEpoch: runtime.continuationEpoch,
scopedRuntime.pendingInterruptedContext = {
agentId: scopedRuntime.agentId || "",
conversationId: scopedRuntime.conversationId,
continuationEpoch: scopedRuntime.continuationEpoch,
};
runtime.pendingInterruptedToolCallIds = [
...runtime.activeExecutingToolCallIds,
scopedRuntime.pendingInterruptedToolCallIds = [
...scopedRuntime.activeExecutingToolCallIds,
];
}
if (
runtime.activeAbortController &&
!runtime.activeAbortController.signal.aborted
scopedRuntime.activeAbortController &&
!scopedRuntime.activeAbortController.signal.aborted
) {
runtime.activeAbortController.abort();
scopedRuntime.activeAbortController.abort();
}
const recoveredApprovalState = getRecoveredApprovalStateForScope(
runtime,
@@ -696,15 +853,15 @@ async function connectWithRetry(
},
);
if (recoveredApprovalState && !hasActiveTurn) {
stashRecoveredApprovalInterrupts(runtime, recoveredApprovalState);
stashRecoveredApprovalInterrupts(scopedRuntime, recoveredApprovalState);
}
if (hasPendingApprovals) {
rejectPendingApprovalResolvers(runtime, "Cancelled by user");
rejectPendingApprovalResolvers(scopedRuntime, "Cancelled by user");
}
if (!hasActiveTurn && hasPendingApprovals) {
emitInterruptedStatusDelta(socket, runtime, {
runId: runtime.activeRunId,
emitInterruptedStatusDelta(socket, scopedRuntime, {
runId: scopedRuntime.activeRunId,
agentId: parsed.runtime.agent_id,
conversationId: parsed.runtime.conversation_id,
});
@@ -712,8 +869,8 @@ async function connectWithRetry(
// Backend cancel parity with TUI (App.tsx:5932-5941).
// Fire-and-forget — local cancel + queued results are the primary mechanism.
const cancelConversationId = runtime.activeConversationId;
const cancelAgentId = runtime.activeAgentId;
const cancelConversationId = scopedRuntime.conversationId;
const cancelAgentId = scopedRuntime.agentId;
if (cancelAgentId) {
getClient()
.then((client) => {
@@ -728,7 +885,7 @@ async function connectWithRetry(
});
}
scheduleQueuePump(runtime, socket, opts, processQueuedTurn);
scheduleQueuePump(scopedRuntime, socket, opts, processQueuedTurn);
return;
}
});
@@ -746,8 +903,12 @@ async function connectWithRetry(
// Single authoritative queue clear for all close paths
// (intentional and unintentional). Must fire before early returns.
runtime.queuedMessagesByItemId.clear();
runtime.queueRuntime.clear("shutdown");
for (const conversationRuntime of runtime.conversationRuntimes.values()) {
conversationRuntime.queuedMessagesByItemId.clear();
if (conversationRuntime.queueRuntime) {
conversationRuntime.queueRuntime.clear("shutdown");
}
}
if (isDebugEnabled()) {
console.log(
@@ -758,7 +919,14 @@ async function connectWithRetry(
clearRuntimeTimers(runtime);
killAllTerminals();
runtime.socket = null;
rejectPendingApprovalResolvers(runtime, "WebSocket disconnected");
for (const conversationRuntime of runtime.conversationRuntimes.values()) {
rejectPendingApprovalResolvers(
conversationRuntime,
"WebSocket disconnected",
);
clearConversationRuntimeState(conversationRuntime);
evictConversationRuntimeIfIdle(conversationRuntime);
}
if (runtime.intentionallyClosed) {
opts.onDisconnected();
@@ -825,6 +993,203 @@ export function stopListenerClient(): void {
stopRuntime(runtime, true);
}
function asListenerRuntimeForTests(
runtime: ListenerRuntime | ConversationRuntime,
): ListenerRuntime {
return "listener" in runtime ? runtime.listener : runtime;
}
function createLegacyTestRuntime(): ConversationRuntime & {
activeAgentId: string | null;
activeConversationId: string;
socket: WebSocket | null;
workingDirectoryByConversation: Map<string, string>;
bootWorkingDirectory: string;
connectionId: string | null;
connectionName: string | null;
sessionId: string;
eventSeqCounter: number;
queueEmitScheduled: boolean;
pendingQueueEmitScope?: {
agent_id?: string | null;
conversation_id?: string | null;
};
onWsEvent?: StartListenerOptions["onWsEvent"];
reminderState: ListenerRuntime["reminderState"];
reconnectTimeout: NodeJS.Timeout | null;
heartbeatInterval: NodeJS.Timeout | null;
intentionallyClosed: boolean;
hasSuccessfulConnection: boolean;
conversationRuntimes: ListenerRuntime["conversationRuntimes"];
approvalRuntimeKeyByRequestId: ListenerRuntime["approvalRuntimeKeyByRequestId"];
lastEmittedStatus: ListenerRuntime["lastEmittedStatus"];
} {
const listener = createRuntime();
const runtime = getOrCreateScopedRuntime(listener, null, "default");
const bridge = runtime as ConversationRuntime & {
activeAgentId: string | null;
activeConversationId: string;
socket: WebSocket | null;
workingDirectoryByConversation: Map<string, string>;
bootWorkingDirectory: string;
connectionId: string | null;
connectionName: string | null;
sessionId: string;
eventSeqCounter: number;
queueEmitScheduled: boolean;
pendingQueueEmitScope?: {
agent_id?: string | null;
conversation_id?: string | null;
};
onWsEvent?: StartListenerOptions["onWsEvent"];
reminderState: ListenerRuntime["reminderState"];
reconnectTimeout: NodeJS.Timeout | null;
heartbeatInterval: NodeJS.Timeout | null;
intentionallyClosed: boolean;
hasSuccessfulConnection: boolean;
conversationRuntimes: ListenerRuntime["conversationRuntimes"];
approvalRuntimeKeyByRequestId: ListenerRuntime["approvalRuntimeKeyByRequestId"];
lastEmittedStatus: ListenerRuntime["lastEmittedStatus"];
};
for (const [prop, getSet] of Object.entries({
socket: {
get: () => listener.socket,
set: (value: WebSocket | null) => {
listener.socket = value;
},
},
workingDirectoryByConversation: {
get: () => listener.workingDirectoryByConversation,
set: (value: Map<string, string>) => {
listener.workingDirectoryByConversation = value;
},
},
bootWorkingDirectory: {
get: () => listener.bootWorkingDirectory,
set: (value: string) => {
listener.bootWorkingDirectory = value;
},
},
connectionId: {
get: () => listener.connectionId,
set: (value: string | null) => {
listener.connectionId = value;
},
},
connectionName: {
get: () => listener.connectionName,
set: (value: string | null) => {
listener.connectionName = value;
},
},
sessionId: {
get: () => listener.sessionId,
set: (value: string) => {
listener.sessionId = value;
},
},
eventSeqCounter: {
get: () => listener.eventSeqCounter,
set: (value: number) => {
listener.eventSeqCounter = value;
},
},
queueEmitScheduled: {
get: () => listener.queueEmitScheduled,
set: (value: boolean) => {
listener.queueEmitScheduled = value;
},
},
pendingQueueEmitScope: {
get: () => listener.pendingQueueEmitScope,
set: (
value:
| {
agent_id?: string | null;
conversation_id?: string | null;
}
| undefined,
) => {
listener.pendingQueueEmitScope = value;
},
},
onWsEvent: {
get: () => listener.onWsEvent,
set: (value: StartListenerOptions["onWsEvent"] | undefined) => {
listener.onWsEvent = value;
},
},
reminderState: {
get: () => listener.reminderState,
set: (value: ListenerRuntime["reminderState"]) => {
listener.reminderState = value;
},
},
reconnectTimeout: {
get: () => listener.reconnectTimeout,
set: (value: NodeJS.Timeout | null) => {
listener.reconnectTimeout = value;
},
},
heartbeatInterval: {
get: () => listener.heartbeatInterval,
set: (value: NodeJS.Timeout | null) => {
listener.heartbeatInterval = value;
},
},
intentionallyClosed: {
get: () => listener.intentionallyClosed,
set: (value: boolean) => {
listener.intentionallyClosed = value;
},
},
hasSuccessfulConnection: {
get: () => listener.hasSuccessfulConnection,
set: (value: boolean) => {
listener.hasSuccessfulConnection = value;
},
},
conversationRuntimes: {
get: () => listener.conversationRuntimes,
set: (value: ListenerRuntime["conversationRuntimes"]) => {
listener.conversationRuntimes = value;
},
},
approvalRuntimeKeyByRequestId: {
get: () => listener.approvalRuntimeKeyByRequestId,
set: (value: ListenerRuntime["approvalRuntimeKeyByRequestId"]) => {
listener.approvalRuntimeKeyByRequestId = value;
},
},
lastEmittedStatus: {
get: () => listener.lastEmittedStatus,
set: (value: ListenerRuntime["lastEmittedStatus"]) => {
listener.lastEmittedStatus = value;
},
},
activeAgentId: {
get: () => runtime.agentId,
set: (value: string | null) => {
runtime.agentId = value;
},
},
activeConversationId: {
get: () => runtime.conversationId,
set: (value: string) => {
runtime.conversationId = value;
},
},
})) {
Object.defineProperty(bridge, prop, {
configurable: true,
enumerable: false,
get: getSet.get,
set: getSet.set,
});
}
return bridge;
}
export {
rejectPendingApprovalResolvers,
requestApprovalOverWS,
@@ -834,8 +1199,16 @@ export { parseServerMessage } from "./protocol-inbound";
export { emitInterruptedStatusDelta } from "./protocol-outbound";
export const __listenClientTestUtils = {
createRuntime,
stopRuntime,
createRuntime: createLegacyTestRuntime,
createListenerRuntime: createRuntime,
getOrCreateScopedRuntime,
stopRuntime: (
runtime: ListenerRuntime | ConversationRuntime,
suppressCallbacks: boolean,
) => stopRuntime(asListenerRuntimeForTests(runtime), suppressCallbacks),
setActiveRuntime,
getListenerStatus,
getOrCreateConversationRuntime,
resolveRuntimeScope,
buildDeviceStatus,
buildLoopStatus,
@@ -864,7 +1237,20 @@ export const __listenClientTestUtils = {
markAwaitingAcceptedApprovalContinuationRunId,
normalizeMessageContentImages,
normalizeInboundMessages,
handleIncomingMessage,
handleApprovalResponseInput,
scheduleQueuePump,
recoverApprovalStateForSync,
clearRecoveredApprovalStateForScope,
clearRecoveredApprovalStateForScope: (
runtime: ListenerRuntime | ConversationRuntime,
scope?: {
agent_id?: string | null;
conversation_id?: string | null;
},
) =>
clearRecoveredApprovalStateForScope(
asListenerRuntimeForTests(runtime),
scope,
),
emitStateSync,
};

View File

@@ -14,9 +14,9 @@ import {
} from "./protocol-outbound";
import { clearRecoveredApprovalState } from "./runtime";
import type {
ConversationRuntime,
InterruptPopulateInput,
InterruptToolReturn,
ListenerRuntime,
RecoveredApprovalState,
} from "./types";
@@ -87,7 +87,7 @@ export function normalizeInterruptedApprovalsForQueue(
}
export function normalizeExecutionResultsForInterruptParity(
runtime: ListenerRuntime,
runtime: ConversationRuntime,
executionResults: ApprovalResult[],
executingToolCallIds: string[],
): ApprovalResult[] {
@@ -273,7 +273,7 @@ export function extractInterruptToolReturns(
export function emitInterruptToolReturnMessage(
socket: WebSocket,
runtime: ListenerRuntime,
runtime: ConversationRuntime,
approvals: ApprovalResult[] | null,
runId?: string | null,
uuidPrefix: string = "interrupt-tool-return",
@@ -308,8 +308,8 @@ export function emitInterruptToolReturnMessage(
],
},
{
agent_id: runtime.activeAgentId ?? undefined,
conversation_id: runtime.activeConversationId ?? undefined,
agent_id: runtime.agentId ?? undefined,
conversation_id: runtime.conversationId,
},
);
}
@@ -317,7 +317,7 @@ export function emitInterruptToolReturnMessage(
export function emitToolExecutionStartedEvents(
socket: WebSocket,
runtime: ListenerRuntime,
runtime: ConversationRuntime,
params: {
toolCallIds: string[];
runId?: string | null;
@@ -339,7 +339,7 @@ export function emitToolExecutionStartedEvents(
export function emitToolExecutionFinishedEvents(
socket: WebSocket,
runtime: ListenerRuntime,
runtime: ConversationRuntime,
params: {
approvals: ApprovalResult[] | null;
runId?: string | null;
@@ -362,7 +362,7 @@ export function emitToolExecutionFinishedEvents(
}
export function getInterruptApprovalsForEmission(
runtime: ListenerRuntime,
runtime: ConversationRuntime,
params: {
lastExecutionResults: ApprovalResult[] | null;
agentId: string;
@@ -391,7 +391,7 @@ export function getInterruptApprovalsForEmission(
}
export function populateInterruptQueue(
runtime: ListenerRuntime,
runtime: ConversationRuntime,
input: InterruptPopulateInput,
): boolean {
const shouldPopulate =
@@ -467,7 +467,7 @@ export function populateInterruptQueue(
}
export function consumeInterruptQueue(
runtime: ListenerRuntime,
runtime: ConversationRuntime,
agentId: string,
conversationId: string,
): {
@@ -519,7 +519,7 @@ export function consumeInterruptQueue(
}
export function stashRecoveredApprovalInterrupts(
runtime: ListenerRuntime,
runtime: ConversationRuntime,
recovered: RecoveredApprovalState,
): boolean {
const approvals = [...recovered.approvalsByRequestId.values()].map(

View File

@@ -23,21 +23,51 @@ import type {
import { SYSTEM_REMINDER_RE } from "./constants";
import { getConversationWorkingDirectory } from "./cwd";
import {
getConversationRuntime,
getPendingControlRequests,
getRecoveredApprovalStateForScope,
nextEventSeq,
safeEmitWsEvent,
} from "./runtime";
import {
isScopeCurrentlyActive,
resolveRuntimeScope,
resolveScopedAgentId,
resolveScopedConversationId,
} from "./scope";
import type { IncomingMessage, ListenerRuntime } from "./types";
import type {
ConversationRuntime,
IncomingMessage,
ListenerRuntime,
} from "./types";
type RuntimeCarrier = ListenerRuntime | ConversationRuntime | null;
function getListenerRuntime(runtime: RuntimeCarrier): ListenerRuntime | null {
if (!runtime) return null;
return "listener" in runtime ? runtime.listener : runtime;
}
function getScopeForRuntime(
runtime: RuntimeCarrier,
scope?: {
agent_id?: string | null;
conversation_id?: string | null;
},
): {
agent_id?: string | null;
conversation_id?: string | null;
} {
if (runtime && "listener" in runtime) {
return {
agent_id: scope?.agent_id ?? runtime.agentId,
conversation_id: scope?.conversation_id ?? runtime.conversationId,
};
}
return scope ?? {};
}
export function emitRuntimeStateUpdates(
runtime: ListenerRuntime,
runtime: RuntimeCarrier,
scope?: {
agent_id?: string | null;
conversation_id?: string | null;
@@ -48,16 +78,35 @@ export function emitRuntimeStateUpdates(
}
export function buildDeviceStatus(
runtime: ListenerRuntime,
runtime: RuntimeCarrier,
params?: {
agent_id?: string | null;
conversation_id?: string | null;
},
): DeviceStatus {
const scopedAgentId = resolveScopedAgentId(runtime, params);
const scopedConversationId = resolveScopedConversationId(runtime, params);
const scopeActive = isScopeCurrentlyActive(
runtime,
const listener = getListenerRuntime(runtime);
if (!listener) {
return {
current_connection_id: null,
connection_name: null,
is_online: false,
is_processing: false,
current_permission_mode: permissionMode.getMode(),
current_working_directory: process.cwd(),
letta_code_version: process.env.npm_package_version || null,
current_toolset: null,
current_toolset_preference: "auto",
current_loaded_tools: getToolNames(),
current_available_skills: [],
background_processes: [],
pending_control_requests: [],
};
}
const scope = getScopeForRuntime(runtime, params);
const scopedAgentId = resolveScopedAgentId(listener, scope);
const scopedConversationId = resolveScopedConversationId(listener, scope);
const conversationRuntime = getConversationRuntime(
listener,
scopedAgentId,
scopedConversationId,
);
@@ -72,13 +121,13 @@ export function buildDeviceStatus(
}
})();
return {
current_connection_id: runtime.connectionId,
connection_name: runtime.connectionName,
is_online: runtime.socket?.readyState === WebSocket.OPEN,
is_processing: scopeActive && runtime.isProcessing,
current_connection_id: listener.connectionId,
connection_name: listener.connectionName,
is_online: listener.socket?.readyState === WebSocket.OPEN,
is_processing: !!conversationRuntime?.isProcessing,
current_permission_mode: permissionMode.getMode(),
current_working_directory: getConversationWorkingDirectory(
runtime,
listener,
scopedAgentId,
scopedConversationId,
),
@@ -88,44 +137,62 @@ export function buildDeviceStatus(
current_loaded_tools: getToolNames(),
current_available_skills: [],
background_processes: [],
pending_control_requests: getPendingControlRequests(runtime, params),
pending_control_requests: getPendingControlRequests(listener, scope),
};
}
export function buildLoopStatus(
runtime: ListenerRuntime,
runtime: RuntimeCarrier,
params?: {
agent_id?: string | null;
conversation_id?: string | null;
},
): LoopState {
const scopedAgentId = resolveScopedAgentId(runtime, params);
const scopedConversationId = resolveScopedConversationId(runtime, params);
const scopeActive = isScopeCurrentlyActive(
runtime,
const listener = getListenerRuntime(runtime);
if (!listener) {
return { status: "WAITING_ON_INPUT", active_run_ids: [] };
}
const scope = getScopeForRuntime(runtime, params);
const scopedAgentId = resolveScopedAgentId(listener, scope);
const scopedConversationId = resolveScopedConversationId(listener, scope);
const conversationRuntime = getConversationRuntime(
listener,
scopedAgentId,
scopedConversationId,
);
if (!scopeActive) {
return { status: "WAITING_ON_INPUT", active_run_ids: [] };
}
const recovered = getRecoveredApprovalStateForScope(runtime, params);
const recovered = getRecoveredApprovalStateForScope(listener, scope);
const status =
recovered &&
recovered.pendingRequestIds.size > 0 &&
runtime.loopStatus === "WAITING_ON_INPUT"
conversationRuntime?.loopStatus === "WAITING_ON_INPUT"
? "WAITING_ON_APPROVAL"
: runtime.loopStatus;
: (conversationRuntime?.loopStatus ?? "WAITING_ON_INPUT");
return {
status,
active_run_ids: runtime.activeRunId ? [runtime.activeRunId] : [],
active_run_ids: conversationRuntime?.activeRunId
? [conversationRuntime.activeRunId]
: [],
};
}
export function buildQueueSnapshot(runtime: ListenerRuntime): QueueMessage[] {
return runtime.queueRuntime.items.map((item) => ({
export function buildQueueSnapshot(
runtime: RuntimeCarrier,
params?: {
agent_id?: string | null;
conversation_id?: string | null;
},
): QueueMessage[] {
const listener = getListenerRuntime(runtime);
if (!listener) {
return [];
}
const scope = getScopeForRuntime(runtime, params);
const conversationRuntime = getConversationRuntime(
listener,
resolveScopedAgentId(listener, scope),
resolveScopedConversationId(listener, scope),
);
return (conversationRuntime?.queueRuntime.items ?? []).map((item) => ({
id: item.id,
client_message_id: item.clientMessageId ?? `cm-${item.id}`,
kind: item.kind,
@@ -136,7 +203,7 @@ export function buildQueueSnapshot(runtime: ListenerRuntime): QueueMessage[] {
}
export function setLoopStatus(
runtime: ListenerRuntime,
runtime: ConversationRuntime,
status: LoopStatus,
scope?: {
agent_id?: string | null;
@@ -152,7 +219,7 @@ export function setLoopStatus(
export function emitProtocolV2Message(
socket: WebSocket,
runtime: ListenerRuntime | null,
runtime: RuntimeCarrier,
message: Omit<
WsProtocolMessage,
"runtime" | "event_seq" | "emitted_at" | "idempotency_key"
@@ -165,11 +232,15 @@ export function emitProtocolV2Message(
if (socket.readyState !== WebSocket.OPEN) {
return;
}
const runtimeScope = resolveRuntimeScope(runtime, scope);
const listener = getListenerRuntime(runtime);
const runtimeScope = resolveRuntimeScope(
listener,
getScopeForRuntime(runtime, scope),
);
if (!runtimeScope) {
return;
}
const eventSeq = nextEventSeq(runtime);
const eventSeq = nextEventSeq(listener);
if (eventSeq === null) {
return;
}
@@ -201,7 +272,7 @@ export function emitProtocolV2Message(
export function emitDeviceStatusUpdate(
socket: WebSocket,
runtime: ListenerRuntime,
runtime: RuntimeCarrier,
scope?: {
agent_id?: string | null;
conversation_id?: string | null;
@@ -219,7 +290,7 @@ export function emitDeviceStatusUpdate(
export function emitLoopStatusUpdate(
socket: WebSocket,
runtime: ListenerRuntime,
runtime: RuntimeCarrier,
scope?: {
agent_id?: string | null;
conversation_id?: string | null;
@@ -236,53 +307,52 @@ export function emitLoopStatusUpdate(
}
export function emitLoopStatusIfOpen(
runtime: ListenerRuntime,
runtime: RuntimeCarrier,
scope?: {
agent_id?: string | null;
conversation_id?: string | null;
},
): void {
if (runtime.socket?.readyState === WebSocket.OPEN) {
emitLoopStatusUpdate(runtime.socket, runtime, scope);
const listener = getListenerRuntime(runtime);
if (listener?.socket?.readyState === WebSocket.OPEN) {
emitLoopStatusUpdate(listener.socket, runtime, scope);
}
}
export function emitDeviceStatusIfOpen(
runtime: ListenerRuntime,
runtime: RuntimeCarrier,
scope?: {
agent_id?: string | null;
conversation_id?: string | null;
},
): void {
if (runtime.socket?.readyState === WebSocket.OPEN) {
emitDeviceStatusUpdate(runtime.socket, runtime, scope);
const listener = getListenerRuntime(runtime);
if (listener?.socket?.readyState === WebSocket.OPEN) {
emitDeviceStatusUpdate(listener.socket, runtime, scope);
}
}
export function emitQueueUpdate(
socket: WebSocket,
runtime: ListenerRuntime,
runtime: RuntimeCarrier,
scope?: {
agent_id?: string | null;
conversation_id?: string | null;
},
): void {
const scopedAgentId = resolveScopedAgentId(runtime, scope);
const scopedConversationId = resolveScopedConversationId(runtime, scope);
const scopeActive = isScopeCurrentlyActive(
runtime,
scopedAgentId,
scopedConversationId,
);
const listener = getListenerRuntime(runtime);
if (!listener) {
return;
}
const resolvedScope = getScopeForRuntime(runtime, scope);
const message: Omit<
QueueUpdateMessage,
"runtime" | "event_seq" | "emitted_at" | "idempotency_key"
> = {
type: "update_queue",
queue: scopeActive ? buildQueueSnapshot(runtime) : [],
queue: buildQueueSnapshot(runtime, resolvedScope),
};
emitProtocolV2Message(socket, runtime, message, scope);
emitProtocolV2Message(socket, runtime, message, resolvedScope);
}
export function isSystemReminderPart(part: unknown): boolean {
@@ -305,7 +375,7 @@ export function isSystemReminderPart(part: unknown): boolean {
export function emitDequeuedUserMessage(
socket: WebSocket,
runtime: ListenerRuntime,
runtime: RuntimeCarrier,
incoming: IncomingMessage,
batch: DequeuedBatch,
): void {
@@ -353,20 +423,21 @@ export function emitDequeuedUserMessage(
}
export function emitQueueUpdateIfOpen(
runtime: ListenerRuntime,
runtime: RuntimeCarrier,
scope?: {
agent_id?: string | null;
conversation_id?: string | null;
},
): void {
if (runtime.socket?.readyState === WebSocket.OPEN) {
emitQueueUpdate(runtime.socket, runtime, scope);
const listener = getListenerRuntime(runtime);
if (listener?.socket?.readyState === WebSocket.OPEN) {
emitQueueUpdate(listener.socket, runtime, scope);
}
}
export function emitStateSync(
socket: WebSocket,
runtime: ListenerRuntime,
runtime: RuntimeCarrier,
scope: RuntimeScope,
): void {
emitDeviceStatusUpdate(socket, runtime, scope);
@@ -413,7 +484,7 @@ export function createLifecycleMessageBase<TMessageType extends string>(
export function emitCanonicalMessageDelta(
socket: WebSocket,
runtime: ListenerRuntime | null,
runtime: RuntimeCarrier,
delta: StreamDelta,
scope?: {
agent_id?: string | null;
@@ -425,7 +496,7 @@ export function emitCanonicalMessageDelta(
export function emitLoopErrorDelta(
socket: WebSocket,
runtime: ListenerRuntime | null,
runtime: RuntimeCarrier,
params: {
message: string;
stopReason: StopReasonType;
@@ -453,7 +524,7 @@ export function emitLoopErrorDelta(
export function emitRetryDelta(
socket: WebSocket,
runtime: ListenerRuntime,
runtime: RuntimeCarrier,
params: {
message: string;
reason: StopReasonType;
@@ -481,7 +552,7 @@ export function emitRetryDelta(
export function emitStatusDelta(
socket: WebSocket,
runtime: ListenerRuntime | null,
runtime: RuntimeCarrier,
params: {
message: string;
level: StatusMessage["level"];
@@ -503,7 +574,7 @@ export function emitStatusDelta(
export function emitInterruptedStatusDelta(
socket: WebSocket,
runtime: ListenerRuntime | null,
runtime: RuntimeCarrier,
params: {
runId?: string | null;
agentId?: string | null;
@@ -521,7 +592,7 @@ export function emitInterruptedStatusDelta(
export function emitStreamDelta(
socket: WebSocket,
runtime: ListenerRuntime | null,
runtime: RuntimeCarrier,
delta: StreamDelta,
scope?: {
agent_id?: string | null;

View File

@@ -9,12 +9,18 @@ import type {
import { mergeQueuedTurnInput } from "../../queue/turnQueueRuntime";
import { getListenerBlockedReason } from "../helpers/listenerQueueAdapter";
import { emitDequeuedUserMessage } from "./protocol-outbound";
import { getActiveRuntime, getPendingControlRequestCount } from "./runtime";
import {
emitListenerStatus,
evictConversationRuntimeIfIdle,
getActiveRuntime,
getListenerStatus,
getPendingControlRequestCount,
} from "./runtime";
import { resolveRuntimeScope } from "./scope";
import type {
ConversationRuntime,
InboundMessagePayload,
IncomingMessage,
ListenerRuntime,
StartListenerOptions,
} from "./types";
@@ -188,7 +194,7 @@ function getPrimaryQueueMessageItem(items: QueueItem[]): QueueItem | null {
}
function buildQueuedTurnMessage(
runtime: ListenerRuntime,
runtime: ConversationRuntime,
batch: DequeuedBatch,
): IncomingMessage | null {
const primaryItem = getPrimaryQueueMessageItem(batch.items);
@@ -241,13 +247,16 @@ export function shouldQueueInboundMessage(parsed: IncomingMessage): boolean {
}
function computeListenerQueueBlockedReason(
runtime: ListenerRuntime,
runtime: ConversationRuntime,
): QueueBlockedReason | null {
const activeScope = resolveRuntimeScope(runtime);
const activeScope = resolveRuntimeScope(runtime.listener, {
agent_id: runtime.agentId,
conversation_id: runtime.conversationId,
});
return getListenerBlockedReason({
isProcessing: runtime.isProcessing,
pendingApprovalsLen: activeScope
? getPendingControlRequestCount(runtime, activeScope)
? getPendingControlRequestCount(runtime.listener, activeScope)
: 0,
cancelRequested: runtime.cancelRequested,
isRecoveringApprovals: runtime.isRecoveringApprovals,
@@ -255,7 +264,7 @@ function computeListenerQueueBlockedReason(
}
async function drainQueuedMessages(
runtime: ListenerRuntime,
runtime: ConversationRuntime,
socket: WebSocket,
opts: StartListenerOptions,
processQueuedTurn: (
@@ -270,7 +279,10 @@ async function drainQueuedMessages(
runtime.queuePumpActive = true;
try {
while (true) {
if (runtime !== getActiveRuntime() || runtime.intentionallyClosed) {
if (
runtime.listener !== getActiveRuntime() ||
runtime.listener.intentionallyClosed
) {
return;
}
@@ -297,17 +309,33 @@ async function drainQueuedMessages(
emitDequeuedUserMessage(socket, runtime, queuedTurn, dequeuedBatch);
opts.onStatusChange?.("receiving", opts.connectionId);
const preTurnStatus =
getListenerStatus(runtime.listener) === "processing"
? "processing"
: "receiving";
if (
opts.connectionId &&
runtime.listener.lastEmittedStatus !== preTurnStatus
) {
runtime.listener.lastEmittedStatus = preTurnStatus;
opts.onStatusChange?.(preTurnStatus, opts.connectionId);
}
await processQueuedTurn(queuedTurn, dequeuedBatch);
opts.onStatusChange?.("idle", opts.connectionId);
emitListenerStatus(
runtime.listener,
opts.onStatusChange,
opts.connectionId,
);
evictConversationRuntimeIfIdle(runtime);
}
} finally {
runtime.queuePumpActive = false;
evictConversationRuntimeIfIdle(runtime);
}
}
export function scheduleQueuePump(
runtime: ListenerRuntime,
runtime: ConversationRuntime,
socket: WebSocket,
opts: StartListenerOptions,
processQueuedTurn: (
@@ -322,7 +350,10 @@ export function scheduleQueuePump(
runtime.messageQueue = runtime.messageQueue
.then(async () => {
runtime.queuePumpScheduled = false;
if (runtime !== getActiveRuntime() || runtime.intentionallyClosed) {
if (
runtime.listener !== getActiveRuntime() ||
runtime.listener.intentionallyClosed
) {
return;
}
await drainQueuedMessages(runtime, socket, opts, processQueuedTurn);
@@ -330,6 +361,11 @@ export function scheduleQueuePump(
.catch((error: unknown) => {
runtime.queuePumpScheduled = false;
console.error("[Listen] Error in queue pump:", error);
opts.onStatusChange?.("idle", opts.connectionId);
emitListenerStatus(
runtime.listener,
opts.onStatusChange,
opts.connectionId,
);
evictConversationRuntimeIfIdle(runtime);
});
}

View File

@@ -44,8 +44,8 @@ import {
} from "./protocol-outbound";
import { clearActiveRunState, clearRecoveredApprovalState } from "./runtime";
import type {
ConversationRuntime,
IncomingMessage,
ListenerRuntime,
RecoveredPendingApproval,
} from "./types";
@@ -128,7 +128,7 @@ export async function isRetriablePostStopError(
export async function drainRecoveryStreamWithEmission(
recoveryStream: Stream<LettaStreamingResponse>,
socket: WebSocket,
runtime: ListenerRuntime,
runtime: ConversationRuntime,
params: {
agentId?: string | null;
conversationId: string;
@@ -195,7 +195,7 @@ export async function drainRecoveryStreamWithEmission(
}
export function finalizeHandledRecoveryTurn(
runtime: ListenerRuntime,
runtime: ConversationRuntime,
socket: WebSocket,
params: {
drainResult: Awaited<ReturnType<typeof drainStreamWithResume>>;
@@ -256,7 +256,7 @@ export function getApprovalContinuationRecoveryDisposition(
}
export async function debugLogApprovalResumeState(
runtime: ListenerRuntime,
runtime: ConversationRuntime,
params: {
agentId: string;
conversationId: string;
@@ -319,12 +319,12 @@ export async function debugLogApprovalResumeState(
}
export async function recoverApprovalStateForSync(
runtime: ListenerRuntime,
runtime: ConversationRuntime,
scope: { agent_id: string; conversation_id: string },
): Promise<void> {
const sameActiveScope =
runtime.activeAgentId === scope.agent_id &&
runtime.activeConversationId === scope.conversation_id;
runtime.agentId === scope.agent_id &&
runtime.conversationId === scope.conversation_id;
if (
sameActiveScope &&
@@ -385,7 +385,7 @@ export async function recoverApprovalStateForSync(
approval.toolName,
input,
getConversationWorkingDirectory(
runtime,
runtime.listener,
scope.agent_id,
scope.conversation_id,
),
@@ -422,13 +422,13 @@ export async function recoverApprovalStateForSync(
}
export async function resolveRecoveredApprovalResponse(
runtime: ListenerRuntime,
runtime: ConversationRuntime,
socket: WebSocket,
response: ApprovalResponseBody,
processTurn: (
msg: IncomingMessage,
socket: WebSocket,
runtime: ListenerRuntime,
runtime: ConversationRuntime,
onStatusChange?: (
status: "idle" | "receiving" | "processing",
connectionId: string,
@@ -515,10 +515,8 @@ export async function resolveRecoveredApprovalResponse(
emitRuntimeStateUpdates(runtime, scope);
runtime.isProcessing = true;
runtime.activeAgentId = recovered.agentId;
runtime.activeConversationId = recovered.conversationId;
runtime.activeWorkingDirectory = getConversationWorkingDirectory(
runtime,
runtime.listener,
recovered.agentId,
recovered.conversationId,
);
@@ -537,7 +535,7 @@ export async function resolveRecoveredApprovalResponse(
const approvalResults = await executeApprovalBatch(decisions, undefined, {
abortSignal: recoveryAbortController.signal,
workingDirectory: getConversationWorkingDirectory(
runtime,
runtime.listener,
recovered.agentId,
recovered.conversationId,
),

View File

@@ -1,6 +1,16 @@
import type { PendingControlRequest } from "../../types/protocol_v2";
import { resolveScopedAgentId, resolveScopedConversationId } from "./scope";
import type { ListenerRuntime, RecoveredApprovalState } from "./types";
import {
normalizeConversationId,
normalizeCwdAgentId,
resolveScopedAgentId,
resolveScopedConversationId,
} from "./scope";
import type {
ConversationRuntime,
ListenerRuntime,
RecoveredApprovalState,
StartListenerOptions,
} from "./types";
let activeRuntime: ListenerRuntime | null = null;
@@ -43,17 +53,195 @@ export function clearRuntimeTimers(runtime: ListenerRuntime): void {
}
}
export function clearActiveRunState(runtime: ListenerRuntime): void {
runtime.activeAgentId = null;
runtime.activeConversationId = null;
export function evictConversationRuntimeIfIdle(
runtime: ConversationRuntime,
): boolean {
if (
runtime.isProcessing ||
runtime.isRecoveringApprovals ||
runtime.queuePumpActive ||
runtime.queuePumpScheduled ||
runtime.pendingTurns > 0 ||
runtime.pendingApprovalResolvers.size > 0 ||
runtime.pendingApprovalBatchByToolCallId.size > 0 ||
runtime.recoveredApprovalState !== null ||
runtime.pendingInterruptedResults !== null ||
runtime.pendingInterruptedContext !== null ||
runtime.activeExecutingToolCallIds.length > 0 ||
(runtime.pendingInterruptedToolCallIds?.length ?? 0) > 0 ||
runtime.activeRunId !== null ||
runtime.activeRunStartedAt !== null ||
runtime.activeAbortController !== null ||
runtime.cancelRequested ||
runtime.queuedMessagesByItemId.size > 0 ||
runtime.queueRuntime?.length > 0
) {
return false;
}
if (runtime.listener.conversationRuntimes.get(runtime.key) !== runtime) {
return false;
}
runtime.listener.conversationRuntimes.delete(runtime.key);
for (const [requestId, runtimeKey] of runtime.listener
.approvalRuntimeKeyByRequestId) {
if (runtimeKey === runtime.key) {
runtime.listener.approvalRuntimeKeyByRequestId.delete(requestId);
}
}
if (
runtime.listener.pendingQueueEmitScope?.agent_id === runtime.agentId &&
normalizeConversationId(
runtime.listener.pendingQueueEmitScope?.conversation_id,
) === runtime.conversationId
) {
runtime.listener.pendingQueueEmitScope = undefined;
}
return true;
}
export function getListenerStatus(
listener: ListenerRuntime,
): "idle" | "receiving" | "processing" {
let hasPendingTurns = false;
for (const runtime of listener.conversationRuntimes.values()) {
if (runtime.isProcessing || runtime.isRecoveringApprovals) {
return "processing";
}
if (runtime.pendingTurns > 0) {
hasPendingTurns = true;
}
}
return hasPendingTurns ? "receiving" : "idle";
}
export function emitListenerStatus(
listener: ListenerRuntime,
onStatusChange: StartListenerOptions["onStatusChange"] | undefined,
connectionId: string | undefined,
): void {
if (!connectionId) {
return;
}
const status = getListenerStatus(listener);
if (listener.lastEmittedStatus === status) {
return;
}
listener.lastEmittedStatus = status;
onStatusChange?.(status, connectionId);
}
export function getConversationRuntimeKey(
agentId?: string | null,
conversationId?: string | null,
): string {
const normalizedConversationId = normalizeConversationId(conversationId);
const normalizedAgentId = normalizeCwdAgentId(agentId);
return `agent:${normalizedAgentId ?? "__unknown__"}::conversation:${normalizedConversationId}`;
}
export function createConversationRuntime(
listener: ListenerRuntime,
agentId?: string | null,
conversationId?: string | null,
): ConversationRuntime {
const normalizedAgentId = normalizeCwdAgentId(agentId);
const normalizedConversationId = normalizeConversationId(conversationId);
const conversationRuntime: ConversationRuntime = {
listener,
key: getConversationRuntimeKey(normalizedAgentId, normalizedConversationId),
agentId: normalizedAgentId,
conversationId: normalizedConversationId,
messageQueue: Promise.resolve(),
pendingApprovalResolvers: new Map(),
recoveredApprovalState: null,
lastStopReason: null,
isProcessing: false,
activeWorkingDirectory: null,
activeRunId: null,
activeRunStartedAt: null,
activeAbortController: null,
cancelRequested: false,
queueRuntime: null as unknown as ConversationRuntime["queueRuntime"],
queuedMessagesByItemId: new Map(),
queuePumpActive: false,
queuePumpScheduled: false,
pendingTurns: 0,
isRecoveringApprovals: false,
loopStatus: "WAITING_ON_INPUT",
pendingApprovalBatchByToolCallId: new Map(),
pendingInterruptedResults: null,
pendingInterruptedContext: null,
continuationEpoch: 0,
activeExecutingToolCallIds: [],
pendingInterruptedToolCallIds: null,
};
listener.conversationRuntimes.set(
conversationRuntime.key,
conversationRuntime,
);
return conversationRuntime;
}
export function getConversationRuntime(
listener: ListenerRuntime,
agentId?: string | null,
conversationId?: string | null,
): ConversationRuntime | null {
return (
listener.conversationRuntimes.get(
getConversationRuntimeKey(agentId, conversationId),
) ?? null
);
}
export function getOrCreateConversationRuntime(
listener: ListenerRuntime,
agentId?: string | null,
conversationId?: string | null,
): ConversationRuntime {
return (
getConversationRuntime(listener, agentId, conversationId) ??
createConversationRuntime(listener, agentId, conversationId)
);
}
export function clearActiveRunState(runtime: ConversationRuntime): void {
runtime.activeWorkingDirectory = null;
runtime.activeRunId = null;
runtime.activeRunStartedAt = null;
runtime.activeAbortController = null;
}
export function clearRecoveredApprovalState(runtime: ListenerRuntime): void {
export function clearRecoveredApprovalState(
runtime: ConversationRuntime,
): void {
runtime.recoveredApprovalState = null;
evictConversationRuntimeIfIdle(runtime);
}
export function clearConversationRuntimeState(
runtime: ConversationRuntime,
): void {
runtime.cancelRequested = true;
if (
runtime.activeAbortController &&
!runtime.activeAbortController.signal.aborted
) {
runtime.activeAbortController.abort();
}
runtime.pendingApprovalBatchByToolCallId.clear();
runtime.pendingInterruptedResults = null;
runtime.pendingInterruptedContext = null;
runtime.pendingInterruptedToolCallIds = null;
runtime.activeExecutingToolCallIds = [];
runtime.loopStatus = "WAITING_ON_INPUT";
runtime.continuationEpoch += 1;
runtime.pendingTurns = 0;
runtime.queuePumpActive = false;
runtime.queuePumpScheduled = false;
clearActiveRunState(runtime);
}
export function getRecoveredApprovalStateForScope(
@@ -68,7 +256,12 @@ export function getRecoveredApprovalStateForScope(
return null;
}
const scopedConversationId = resolveScopedConversationId(runtime, params);
const recovered = runtime.recoveredApprovalState;
const conversationRuntime = getConversationRuntime(
runtime,
scopedAgentId,
scopedConversationId,
);
const recovered = conversationRuntime?.recoveredApprovalState;
if (!recovered) {
return null;
}
@@ -85,9 +278,18 @@ export function clearRecoveredApprovalStateForScope(
conversation_id?: string | null;
},
): void {
const recovered = getRecoveredApprovalStateForScope(runtime, params);
if (recovered) {
clearRecoveredApprovalState(runtime);
const scopedAgentId = resolveScopedAgentId(runtime, params);
if (!scopedAgentId) {
return;
}
const scopedConversationId = resolveScopedConversationId(runtime, params);
const conversationRuntime = getConversationRuntime(
runtime,
scopedAgentId,
scopedConversationId,
);
if (conversationRuntime?.recoveredApprovalState) {
clearRecoveredApprovalState(conversationRuntime);
}
}
@@ -100,30 +302,27 @@ export function getPendingControlRequests(
): PendingControlRequest[] {
const scopedAgentId = resolveScopedAgentId(runtime, params);
const scopedConversationId = resolveScopedConversationId(runtime, params);
const conversationRuntime = getConversationRuntime(
runtime,
scopedAgentId,
scopedConversationId,
);
const requests: PendingControlRequest[] = [];
for (const pending of runtime.pendingApprovalResolvers.values()) {
if (!conversationRuntime) {
return requests;
}
for (const pending of conversationRuntime.pendingApprovalResolvers.values()) {
const request = pending.controlRequest;
if (!request) continue;
if (
scopedAgentId &&
(request.agent_id ?? scopedAgentId) !== scopedAgentId
) {
continue;
}
if (
scopedConversationId &&
(request.conversation_id ?? scopedConversationId) !== scopedConversationId
) {
continue;
}
requests.push({
request_id: request.request_id,
request: request.request,
});
}
const recovered = getRecoveredApprovalStateForScope(runtime, params);
const recovered = conversationRuntime.recoveredApprovalState;
if (recovered) {
for (const requestId of recovered.pendingRequestIds) {
const entry = recovered.approvalsByRequestId.get(requestId);

View File

@@ -1,5 +1,14 @@
import type { RuntimeScope } from "../../types/protocol_v2";
import type { ListenerRuntime } from "./types";
import type { ConversationRuntime, ListenerRuntime } from "./types";
function getOnlyConversationRuntime(
runtime: ListenerRuntime | null,
): ConversationRuntime | null {
if (!runtime || runtime.conversationRuntimes.size !== 1) {
return null;
}
return runtime.conversationRuntimes.values().next().value ?? null;
}
export function normalizeCwdAgentId(agentId?: string | null): string | null {
return agentId && agentId.length > 0 ? agentId : null;
@@ -19,9 +28,14 @@ export function resolveScopedAgentId(
agent_id?: string | null;
},
): string | null {
return (
normalizeCwdAgentId(params?.agent_id) ?? runtime?.activeAgentId ?? null
);
if (!runtime) {
return normalizeCwdAgentId(params?.agent_id) ?? null;
}
const explicitAgentId = normalizeCwdAgentId(params?.agent_id);
if (explicitAgentId) {
return explicitAgentId;
}
return getOnlyConversationRuntime(runtime)?.agentId ?? null;
}
export function resolveScopedConversationId(
@@ -30,8 +44,15 @@ export function resolveScopedConversationId(
conversation_id?: string | null;
},
): string {
return normalizeConversationId(
params?.conversation_id ?? runtime?.activeConversationId,
if (!runtime) {
return normalizeConversationId(params?.conversation_id);
}
if (params?.conversation_id) {
return normalizeConversationId(params.conversation_id);
}
return (
getOnlyConversationRuntime(runtime)?.conversationId ??
normalizeConversationId(params?.conversation_id)
);
}
@@ -52,19 +73,3 @@ export function resolveRuntimeScope(
conversation_id: resolvedConversationId,
};
}
export function isScopeCurrentlyActive(
runtime: ListenerRuntime,
agentId: string | null,
conversationId: string,
): boolean {
if (!runtime.isProcessing) return true;
const activeAgent = runtime.activeAgentId;
const activeConvo = normalizeConversationId(runtime.activeConversationId);
if (agentId && activeAgent && agentId !== activeAgent) return false;
if (conversationId !== activeConvo) return false;
return true;
}

View File

@@ -52,7 +52,7 @@ import {
getApprovalContinuationRecoveryDisposition,
isApprovalToolCallDesyncError,
} from "./recovery";
import type { ListenerRuntime } from "./types";
import type { ConversationRuntime } from "./types";
export function isApprovalOnlyInput(
input: Array<MessageCreate | ApprovalCreate>,
@@ -66,7 +66,7 @@ export function isApprovalOnlyInput(
}
export function markAwaitingAcceptedApprovalContinuationRunId(
runtime: ListenerRuntime,
runtime: ConversationRuntime,
input: Array<MessageCreate | ApprovalCreate>,
): void {
if (isApprovalOnlyInput(input)) {
@@ -80,16 +80,16 @@ export function markAwaitingAcceptedApprovalContinuationRunId(
* touch pendingInterruptedResults (that's exclusively owned by handleIncomingMessage).
*/
async function resolveStaleApprovals(
runtime: ListenerRuntime,
runtime: ConversationRuntime,
socket: WebSocket,
abortSignal: AbortSignal,
): Promise<Awaited<ReturnType<typeof drainRecoveryStreamWithEmission>> | null> {
if (!runtime.activeAgentId) return null;
if (!runtime.agentId) return null;
const client = await getClient();
let agent: Awaited<ReturnType<typeof client.agents.retrieve>>;
try {
agent = await client.agents.retrieve(runtime.activeAgentId);
agent = await client.agents.retrieve(runtime.agentId);
} catch (err) {
if (err instanceof APIError && (err.status === 404 || err.status === 422)) {
return null;
@@ -97,9 +97,7 @@ async function resolveStaleApprovals(
throw err;
}
const requestedConversationId =
runtime.activeConversationId && runtime.activeConversationId !== "default"
? runtime.activeConversationId
: undefined;
runtime.conversationId !== "default" ? runtime.conversationId : undefined;
let resumeData: Awaited<ReturnType<typeof getResumeData>>;
try {
@@ -117,16 +115,16 @@ async function resolveStaleApprovals(
if (pendingApprovals.length === 0) return null;
if (abortSignal.aborted) throw new Error("Cancelled");
const recoveryConversationId = runtime.activeConversationId || "default";
const recoveryConversationId = runtime.conversationId;
const recoveryWorkingDirectory =
runtime.activeWorkingDirectory ??
getConversationWorkingDirectory(
runtime,
runtime.activeAgentId,
runtime.listener,
runtime.agentId,
recoveryConversationId,
);
const scope = {
agent_id: runtime.activeAgentId,
agent_id: runtime.agentId,
conversation_id: recoveryConversationId,
} as const;
@@ -187,7 +185,7 @@ async function resolveStaleApprovals(
blocked_path: null,
...(diffs.length > 0 ? { diffs } : {}),
},
agent_id: runtime.activeAgentId,
agent_id: runtime.agentId,
conversation_id: recoveryConversationId,
};
@@ -246,7 +244,7 @@ async function resolveStaleApprovals(
emitToolExecutionStartedEvents(socket, runtime, {
toolCallIds: approvedToolCallIds,
runId: runtime.activeRunId ?? undefined,
agentId: runtime.activeAgentId,
agentId: runtime.agentId ?? undefined,
conversationId: recoveryConversationId,
});
@@ -258,7 +256,7 @@ async function resolveStaleApprovals(
emitToolExecutionFinishedEvents(socket, runtime, {
approvals: approvalResults,
runId: runtime.activeRunId ?? undefined,
agentId: runtime.activeAgentId,
agentId: runtime.agentId ?? undefined,
conversationId: recoveryConversationId,
});
emitInterruptToolReturnMessage(
@@ -273,7 +271,7 @@ async function resolveStaleApprovals(
recoveryConversationId,
[{ type: "approval", approvals: approvalResults }],
{
agentId: runtime.activeAgentId,
agentId: runtime.agentId ?? undefined,
streamTokens: true,
background: true,
workingDirectory: recoveryWorkingDirectory,
@@ -294,7 +292,7 @@ async function resolveStaleApprovals(
socket,
runtime,
{
agentId: runtime.activeAgentId,
agentId: runtime.agentId ?? undefined,
conversationId: recoveryConversationId,
abortSignal,
},
@@ -324,7 +322,7 @@ export async function sendMessageStreamWithRetry(
messages: Parameters<typeof sendMessageStream>[1],
opts: Parameters<typeof sendMessageStream>[2],
socket: WebSocket,
runtime: ListenerRuntime,
runtime: ConversationRuntime,
abortSignal?: AbortSignal,
): Promise<Awaited<ReturnType<typeof sendMessageStream>>> {
let transientRetries = 0;
@@ -340,7 +338,7 @@ export async function sendMessageStreamWithRetry(
}
runtime.isRecoveringApprovals = false;
setLoopStatus(runtime, "WAITING_FOR_API_RESPONSE", {
agent_id: runtime.activeAgentId,
agent_id: runtime.agentId,
conversation_id: conversationId,
});
@@ -380,7 +378,7 @@ export async function sendMessageStreamWithRetry(
if (approvalConflictDetected) {
runtime.isRecoveringApprovals = true;
setLoopStatus(runtime, "RETRYING_API_REQUEST", {
agent_id: runtime.activeAgentId,
agent_id: runtime.agentId,
conversation_id: conversationId,
});
if (abortSignal?.aborted) throw new Error("Cancelled by user");
@@ -408,7 +406,7 @@ export async function sendMessageStreamWithRetry(
if (action === "retry_transient") {
runtime.isRecoveringApprovals = true;
setLoopStatus(runtime, "RETRYING_API_REQUEST", {
agent_id: runtime.activeAgentId,
agent_id: runtime.agentId,
conversation_id: conversationId,
});
const attempt = transientRetries + 1;
@@ -434,7 +432,7 @@ export async function sendMessageStreamWithRetry(
attempt,
maxAttempts: LLM_API_ERROR_MAX_RETRIES,
delayMs,
agentId: runtime.activeAgentId ?? undefined,
agentId: runtime.agentId ?? undefined,
conversationId,
});
}
@@ -449,7 +447,7 @@ export async function sendMessageStreamWithRetry(
if (action === "retry_conversation_busy") {
runtime.isRecoveringApprovals = true;
setLoopStatus(runtime, "RETRYING_API_REQUEST", {
agent_id: runtime.activeAgentId,
agent_id: runtime.agentId,
conversation_id: conversationId,
});
try {
@@ -459,7 +457,7 @@ export async function sendMessageStreamWithRetry(
{
conversationId,
resolvedConversationId: conversationId,
agentId: runtime.activeAgentId,
agentId: runtime.agentId ?? null,
requestStartedAtMs,
},
);
@@ -500,7 +498,7 @@ export async function sendMessageStreamWithRetry(
attempt,
maxAttempts: MAX_CONVERSATION_BUSY_RETRIES,
delayMs,
agentId: runtime.activeAgentId ?? undefined,
agentId: runtime.agentId ?? undefined,
conversationId,
});
@@ -521,7 +519,7 @@ export async function sendApprovalContinuationWithRetry(
messages: Parameters<typeof sendMessageStream>[1],
opts: Parameters<typeof sendMessageStream>[2],
socket: WebSocket,
runtime: ListenerRuntime,
runtime: ConversationRuntime,
abortSignal?: AbortSignal,
retryOptions: {
allowApprovalRecovery?: boolean;
@@ -541,7 +539,7 @@ export async function sendApprovalContinuationWithRetry(
}
runtime.isRecoveringApprovals = false;
setLoopStatus(runtime, "WAITING_FOR_API_RESPONSE", {
agent_id: runtime.activeAgentId,
agent_id: runtime.agentId,
conversation_id: conversationId,
});
@@ -581,7 +579,7 @@ export async function sendApprovalContinuationWithRetry(
if (approvalConflictDetected) {
runtime.isRecoveringApprovals = true;
setLoopStatus(runtime, "RETRYING_API_REQUEST", {
agent_id: runtime.activeAgentId,
agent_id: runtime.agentId,
conversation_id: conversationId,
});
@@ -603,7 +601,7 @@ export async function sendApprovalContinuationWithRetry(
) {
finalizeHandledRecoveryTurn(runtime, socket, {
drainResult,
agentId: runtime.activeAgentId,
agentId: runtime.agentId ?? undefined,
conversationId,
});
return null;
@@ -621,7 +619,7 @@ export async function sendApprovalContinuationWithRetry(
if (action === "retry_transient") {
runtime.isRecoveringApprovals = true;
setLoopStatus(runtime, "RETRYING_API_REQUEST", {
agent_id: runtime.activeAgentId,
agent_id: runtime.agentId,
conversation_id: conversationId,
});
const attempt = transientRetries + 1;
@@ -649,7 +647,7 @@ export async function sendApprovalContinuationWithRetry(
conversationBusyRetries += 1;
runtime.isRecoveringApprovals = true;
setLoopStatus(runtime, "RETRYING_API_REQUEST", {
agent_id: runtime.activeAgentId,
agent_id: runtime.agentId,
conversation_id: conversationId,
});
@@ -660,7 +658,7 @@ export async function sendApprovalContinuationWithRetry(
{
conversationId,
resolvedConversationId: conversationId,
agentId: runtime.activeAgentId,
agentId: runtime.agentId ?? null,
requestStartedAtMs,
},
);

View File

@@ -40,7 +40,7 @@ import {
markAwaitingAcceptedApprovalContinuationRunId,
sendApprovalContinuationWithRetry,
} from "./send";
import type { ListenerRuntime } from "./types";
import type { ConversationRuntime } from "./types";
type Decision =
| {
@@ -79,7 +79,7 @@ export async function handleApprovalStop(params: {
toolName: string;
toolArgs: string;
}>;
runtime: ListenerRuntime;
runtime: ConversationRuntime;
socket: WebSocket;
agentId: string;
conversationId: string;
@@ -121,8 +121,6 @@ export async function handleApprovalStop(params: {
agent_id: agentId,
conversation_id: conversationId,
});
runtime.activeAgentId = null;
runtime.activeConversationId = null;
runtime.activeWorkingDirectory = null;
runtime.activeRunId = null;
runtime.activeRunStartedAt = null;

View File

@@ -59,6 +59,7 @@ import {
import {
clearActiveRunState,
clearRecoveredApprovalStateForScope,
evictConversationRuntimeIfIdle,
} from "./runtime";
import { normalizeCwdAgentId } from "./scope";
import {
@@ -68,12 +69,12 @@ import {
sendMessageStreamWithRetry,
} from "./send";
import { handleApprovalStop } from "./turn-approval";
import type { IncomingMessage, ListenerRuntime } from "./types";
import type { ConversationRuntime, IncomingMessage } from "./types";
export async function handleIncomingMessage(
msg: IncomingMessage,
socket: WebSocket,
runtime: ListenerRuntime,
runtime: ConversationRuntime,
onStatusChange?: (
status: "idle" | "receiving" | "processing",
connectionId: string,
@@ -86,7 +87,7 @@ export async function handleIncomingMessage(
const conversationId = requestedConversationId ?? "default";
const normalizedAgentId = normalizeCwdAgentId(agentId);
const turnWorkingDirectory = getConversationWorkingDirectory(
runtime,
runtime.listener,
normalizedAgentId,
conversationId,
);
@@ -103,8 +104,6 @@ export async function handleIncomingMessage(
runtime.isProcessing = true;
runtime.cancelRequested = false;
runtime.activeAbortController = new AbortController();
runtime.activeAgentId = agentId ?? null;
runtime.activeConversationId = conversationId;
runtime.activeWorkingDirectory = turnWorkingDirectory;
runtime.activeRunId = null;
runtime.activeRunStartedAt = new Date().toISOString();
@@ -113,7 +112,7 @@ export async function handleIncomingMessage(
agent_id: agentId ?? null,
conversation_id: conversationId,
});
clearRecoveredApprovalStateForScope(runtime, {
clearRecoveredApprovalStateForScope(runtime.listener, {
agent_id: agentId ?? null,
conversation_id: conversationId,
});
@@ -174,7 +173,7 @@ export async function handleIncomingMessage(
const { parts: reminderParts } = await buildSharedReminderParts(
buildListenReminderContext({
agentId: agentId || "",
state: runtime.reminderState,
state: runtime.listener.reminderState,
resolvePlanModeReminder: getPlanModeReminder,
}),
);
@@ -747,5 +746,6 @@ export async function handleIncomingMessage(
runtime.cancelRequested = false;
runtime.isRecoveringApprovals = false;
runtime.activeExecutingToolCallIds = [];
evictConversationRuntimeIfIdle(runtime);
}
}

View File

@@ -96,21 +96,16 @@ export type RecoveredApprovalState = {
responsesByRequestId: Map<string, ApprovalResponseBody>;
};
export type ListenerRuntime = {
socket: WebSocket | null;
heartbeatInterval: NodeJS.Timeout | null;
reconnectTimeout: NodeJS.Timeout | null;
intentionallyClosed: boolean;
hasSuccessfulConnection: boolean;
export type ConversationRuntime = {
listener: ListenerRuntime;
key: string;
agentId: string | null;
conversationId: string;
messageQueue: Promise<void>;
pendingApprovalResolvers: Map<string, PendingApprovalResolver>;
recoveredApprovalState: RecoveredApprovalState | null;
sessionId: string;
eventSeqCounter: number;
lastStopReason: string | null;
isProcessing: boolean;
activeAgentId: string | null;
activeConversationId: string | null;
activeWorkingDirectory: string | null;
activeRunId: string | null;
activeRunStartedAt: string | null;
@@ -120,13 +115,7 @@ export type ListenerRuntime = {
queuedMessagesByItemId: Map<string, IncomingMessage>;
queuePumpActive: boolean;
queuePumpScheduled: boolean;
queueEmitScheduled: boolean;
pendingQueueEmitScope?: {
agent_id?: string | null;
conversation_id?: string | null;
};
pendingTurns: number;
onWsEvent?: StartListenerOptions["onWsEvent"];
isRecoveringApprovals: boolean;
loopStatus: LoopStatus;
pendingApprovalBatchByToolCallId: Map<string, string>;
@@ -139,11 +128,31 @@ export type ListenerRuntime = {
continuationEpoch: number;
activeExecutingToolCallIds: string[];
pendingInterruptedToolCallIds: string[] | null;
};
export type ListenerRuntime = {
socket: WebSocket | null;
heartbeatInterval: NodeJS.Timeout | null;
reconnectTimeout: NodeJS.Timeout | null;
intentionallyClosed: boolean;
hasSuccessfulConnection: boolean;
sessionId: string;
eventSeqCounter: number;
lastStopReason: string | null;
queueEmitScheduled: boolean;
pendingQueueEmitScope?: {
agent_id?: string | null;
conversation_id?: string | null;
};
onWsEvent?: StartListenerOptions["onWsEvent"];
reminderState: SharedReminderState;
bootWorkingDirectory: string;
workingDirectoryByConversation: Map<string, string>;
connectionId: string | null;
connectionName: string | null;
conversationRuntimes: Map<string, ConversationRuntime>;
approvalRuntimeKeyByRequestId: Map<string, string>;
lastEmittedStatus: "idle" | "receiving" | "processing" | null;
};
export interface InterruptPopulateInput {