diff --git a/src/agent/defaults.ts b/src/agent/defaults.ts index 24536cc..57c26bb 100644 --- a/src/agent/defaults.ts +++ b/src/agent/defaults.ts @@ -8,8 +8,10 @@ import type { Letta } from "@letta-ai/letta-client"; import type { AgentState } from "@letta-ai/letta-client/resources/agents/agents"; import { settingsManager } from "../settings-manager"; +import { getServerUrl } from "./client"; import { type CreateAgentOptions, createAgent } from "./create"; import { parseMdxFrontmatter } from "./memory"; +import { getDefaultModel, resolveModel } from "./model"; import { MEMORY_PROMPTS } from "./promptAssets"; // Tags used to identify default agents @@ -47,6 +49,80 @@ export const DEFAULT_AGENT_CONFIGS: Record = { }, }; +function isSelfHostedServer(): boolean { + return !getServerUrl().includes("api.letta.com"); +} + +export function selectDefaultAgentModel(params: { + preferredModel?: string; + isSelfHosted: boolean; + availableHandles?: Iterable; +}): string | undefined { + const { preferredModel, isSelfHosted, availableHandles } = params; + const resolvedPreferred = + typeof preferredModel === "string" && preferredModel.length > 0 + ? (resolveModel(preferredModel) ?? preferredModel) + : undefined; + + if (!isSelfHosted) { + return resolvedPreferred; + } + + const handles = availableHandles ? new Set(availableHandles) : null; + if (!handles) { + return resolvedPreferred; + } + + if (resolvedPreferred && handles.has(resolvedPreferred)) { + return resolvedPreferred; + } + + const firstNonAutoHandle = Array.from(handles).find( + (handle) => handle !== "letta/auto" && handle !== "letta/auto-fast", + ); + if (firstNonAutoHandle) { + return firstNonAutoHandle; + } + + const defaultHandle = getDefaultModel(); + if (handles.has(defaultHandle)) { + return defaultHandle; + } + + return Array.from(handles)[0]; +} + +async function resolveDefaultAgentModel( + client: Letta, + preferredModel?: string, +): Promise { + if (!isSelfHostedServer()) { + return selectDefaultAgentModel({ + preferredModel, + isSelfHosted: false, + }); + } + + try { + const availableHandles = new Set( + (await client.models.list()) + .map((model) => model.handle) + .filter((handle): handle is string => typeof handle === "string"), + ); + + return selectDefaultAgentModel({ + preferredModel, + isSelfHosted: true, + availableHandles, + }); + } catch { + return selectDefaultAgentModel({ + preferredModel, + isSelfHosted: true, + }); + } +} + /** * Add a tag to an existing agent. */ @@ -81,6 +157,9 @@ async function addTagToAgent( */ export async function ensureDefaultAgents( client: Letta, + options?: { + preferredModel?: string; + }, ): Promise { if (!settingsManager.shouldCreateDefaultAgents()) { return null; @@ -95,6 +174,7 @@ export async function ensureDefaultAgents( const { agent } = await createAgent({ ...DEFAULT_AGENT_CONFIGS.memo, + model: await resolveDefaultAgentModel(client, options?.preferredModel), memoryPromptMode: willAutoEnableMemfs ? "memfs" : undefined, }); await addTagToAgent(client, agent.id, MEMO_TAG); diff --git a/src/cli/helpers/thinkingMessages.ts b/src/cli/helpers/thinkingMessages.ts index 5114439..4b70805 100644 --- a/src/cli/helpers/thinkingMessages.ts +++ b/src/cli/helpers/thinkingMessages.ts @@ -1,5 +1,5 @@ // Machine god AI themed thinking verbs -const THINKING_VERBS = [ +const THINKING_VERBS = Object.freeze([ "thinking", "processing", "computing", @@ -41,23 +41,23 @@ const THINKING_VERBS = [ "remembering", "absorbing", "internalizing", -] as const; +] as const); export const SYSTEM_PROMPT_UPGRADE_TIP = "Use /system to upgrade to the latest default prompt."; -export const THINKING_TIPS = [ +export const THINKING_TIPS = Object.freeze([ "Use /remember [instructions] to remember something from the conversation.", "Use /palace to inspect your agent's memory palace.", "Use /reflect to launch a background reflection agent to update memory.", "Use /search [query] to search messages across all agents.", "Use /init to initialize (or re-init) your agent's memory.", -] as const; +] as const); -const THINKING_TIPS_WITH_SYSTEM_UPGRADE = [ +const THINKING_TIPS_WITH_SYSTEM_UPGRADE = Object.freeze([ ...THINKING_TIPS, SYSTEM_PROMPT_UPGRADE_TIP, -]; +] as const); type ThinkingVerb = (typeof THINKING_VERBS)[number]; @@ -141,9 +141,6 @@ export function getRandomThinkingTip(options?: { (options?.includeSystemPromptUpgradeTip ?? true) ? THINKING_TIPS_WITH_SYSTEM_UPGRADE : THINKING_TIPS; - if (tipPool.length === 0) { - return ""; - } const index = Math.floor(Math.random() * tipPool.length); - return tipPool[index] ?? ""; + return tipPool[index] ?? tipPool[0] ?? THINKING_TIPS[0] ?? ""; } diff --git a/src/headless.ts b/src/headless.ts index 40f3224..80a7eac 100644 --- a/src/headless.ts +++ b/src/headless.ts @@ -870,7 +870,9 @@ export async function handleHeadlessCommand( // Priority 7: Fresh user with no LRU - create default agent if (!agent) { const { ensureDefaultAgents } = await import("./agent/defaults"); - const defaultAgent = await ensureDefaultAgents(client); + const defaultAgent = await ensureDefaultAgents(client, { + preferredModel: model, + }); if (defaultAgent) { agent = defaultAgent; } diff --git a/src/index.ts b/src/index.ts index 0313607..e8e9728 100755 --- a/src/index.ts +++ b/src/index.ts @@ -1401,7 +1401,9 @@ async function main(): Promise { case "create": { const { ensureDefaultAgents } = await import("./agent/defaults"); try { - const defaultAgent = await ensureDefaultAgents(client); + const defaultAgent = await ensureDefaultAgents(client, { + preferredModel: model, + }); if (defaultAgent) { setSelectedGlobalAgentId(defaultAgent.id); setLoadingState("assembling"); diff --git a/src/tests/agent/defaults.test.ts b/src/tests/agent/defaults.test.ts new file mode 100644 index 0000000..8fed85f --- /dev/null +++ b/src/tests/agent/defaults.test.ts @@ -0,0 +1,33 @@ +import { describe, expect, test } from "bun:test"; +import { selectDefaultAgentModel } from "../../agent/defaults"; + +describe("selectDefaultAgentModel", () => { + test("uses the caller's preferred model when it is available on self-hosted", () => { + const result = selectDefaultAgentModel({ + preferredModel: "haiku", + isSelfHosted: true, + availableHandles: ["anthropic/claude-haiku-4-5"], + }); + + expect(result).toBe("anthropic/claude-haiku-4-5"); + }); + + test("falls back to a server-available non-auto handle on self-hosted", () => { + const result = selectDefaultAgentModel({ + isSelfHosted: true, + availableHandles: ["letta/auto", "anthropic/claude-haiku-4-5"], + }); + + expect(result).toBe("anthropic/claude-haiku-4-5"); + }); + + test("passes through the preferred model on cloud", () => { + const result = selectDefaultAgentModel({ + preferredModel: "haiku", + isSelfHosted: false, + availableHandles: ["letta/auto"], + }); + + expect(result).toBe("anthropic/claude-haiku-4-5"); + }); +}); diff --git a/src/tests/cli/thinkingMessages.test.ts b/src/tests/cli/thinkingMessages.test.ts index ad73687..00b5d56 100644 --- a/src/tests/cli/thinkingMessages.test.ts +++ b/src/tests/cli/thinkingMessages.test.ts @@ -50,7 +50,9 @@ describe("Thinking messages", () => { }); test("returns a tip from the configured tip list", () => { - const tip = getRandomThinkingTip(); + const tip = getRandomThinkingTip({ + includeSystemPromptUpgradeTip: false, + }); expect(tip.length).toBeGreaterThan(0); expect((THINKING_TIPS as readonly string[]).includes(tip)).toBe(true); diff --git a/src/tests/headless-scenario.ts b/src/tests/headless-scenario.ts index cc8a3e7..7cd6df4 100644 --- a/src/tests/headless-scenario.ts +++ b/src/tests/headless-scenario.ts @@ -93,52 +93,105 @@ async function runCLI( return { stdout: out, code }; } +const REQUIRED_MARKERS = ["BANANA"]; +const MAX_ATTEMPTS = 2; + function assertContainsAll(hay: string, needles: string[]) { for (const n of needles) { if (!hay.includes(n)) throw new Error(`Missing expected output: ${n}`); } } +function extractStreamJsonAssistantText(stdout: string): string { + const parts: string[] = []; + for (const line of stdout.split(/\r?\n/)) { + if (!line.trim()) continue; + try { + const event = JSON.parse(line) as { + type?: string; + message_type?: string; + content?: unknown; + result?: unknown; + }; + if ( + event.type === "message" && + event.message_type === "assistant_message" && + typeof event.content === "string" + ) { + parts.push(event.content); + } + if (event.type === "result" && typeof event.result === "string") { + parts.push(event.result); + } + } catch { + // Ignore malformed lines; validation will fail if we never find the marker. + } + } + return parts.join(""); +} + +function validateOutput(stdout: string, output: Args["output"]) { + if (output === "text") { + assertContainsAll(stdout, REQUIRED_MARKERS); + return; + } + + if (output === "json") { + try { + const obj = JSON.parse(stdout); + const result = String(obj?.result ?? ""); + assertContainsAll(result, REQUIRED_MARKERS); + return; + } catch (e) { + throw new Error(`Invalid JSON output: ${(e as Error).message}`); + } + } + + const streamText = extractStreamJsonAssistantText(stdout); + if (!streamText) { + throw new Error("No assistant/result content found in stream-json output"); + } + assertContainsAll(streamText, REQUIRED_MARKERS); +} + async function main() { const { model, output } = parseArgs(process.argv.slice(2)); const prereq = await ensurePrereqs(model); if (prereq === "skip") return; - const { stdout, code } = await runCLI(model, output); - if (code !== 0) { - process.exit(code); + let stdout = ""; + let code = 0; + let lastError: Error | null = null; + + for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt += 1) { + ({ stdout, code } = await runCLI(model, output)); + if (code !== 0) { + lastError = new Error(`CLI exited with code ${code}`); + } else { + try { + validateOutput(stdout, output); + console.log(`OK: ${model} / ${output}`); + return; + } catch (error) { + lastError = error as Error; + } + } + + if (attempt < MAX_ATTEMPTS) { + console.error( + `[headless-scenario] attempt ${attempt}/${MAX_ATTEMPTS} failed for ${model} / ${output}: ${lastError?.message ?? "unknown error"}`, + ); + await Bun.sleep(500); + } } try { - // Validate by output mode - if (output === "text") { - assertContainsAll(stdout, ["BANANA"]); - } else if (output === "json") { - try { - const obj = JSON.parse(stdout); - const result = String(obj?.result ?? ""); - assertContainsAll(result, ["BANANA"]); - } catch (e) { - throw new Error(`Invalid JSON output: ${(e as Error).message}`); - } - } else if (output === "stream-json") { - // stream-json prints one JSON object per line; find the final result event - const lines = stdout.split(/\r?\n/).filter(Boolean); - const resultLine = lines.find((l) => { - try { - const o = JSON.parse(l); - return o?.type === "result"; - } catch { - return false; - } - }); - if (!resultLine) throw new Error("No final result event in stream-json"); - const evt = JSON.parse(resultLine); - const result = String(evt?.result ?? ""); - assertContainsAll(result, ["BANANA"]); + if (code !== 0) { + process.exit(code); + } + if (lastError) { + throw lastError; } - - console.log(`OK: ${model} / ${output}`); } catch (e) { // Dump full stdout to aid debugging console.error(`\n===== BEGIN STDOUT (${model} / ${output}) =====`); diff --git a/src/tests/websocket/listen-client-concurrency.test.ts b/src/tests/websocket/listen-client-concurrency.test.ts new file mode 100644 index 0000000..71f285d --- /dev/null +++ b/src/tests/websocket/listen-client-concurrency.test.ts @@ -0,0 +1,631 @@ +import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"; +import WebSocket from "ws"; +import { permissionMode } from "../../permissions/mode"; +import type { MessageQueueItem } from "../../queue/queueRuntime"; + +type MockStream = { + conversationId: string; + agentId?: string; +}; + +type DrainResult = { + stopReason: string; + approvals?: Array<{ + toolCallId: string; + toolName: string; + toolArgs: string; + }>; + apiDurationMs: number; +}; + +const defaultDrainResult: DrainResult = { + stopReason: "end_turn", + approvals: [], + apiDurationMs: 0, +}; + +const sendMessageStreamMock = mock( + async ( + conversationId: string, + _messages: unknown[], + opts?: { agentId?: string }, + ): Promise => ({ + conversationId, + agentId: opts?.agentId, + }), +); +const getStreamToolContextIdMock = mock(() => null); +const drainHandlers = new Map< + string, + (abortSignal?: AbortSignal) => Promise +>(); +const drainStreamWithResumeMock = mock( + async ( + stream: MockStream, + _buffers: unknown, + _refresh: () => void, + abortSignal?: AbortSignal, + ) => { + const handler = drainHandlers.get(stream.conversationId); + if (handler) { + return handler(abortSignal); + } + return defaultDrainResult; + }, +); +const cancelConversationMock = mock(async (_conversationId: string) => {}); +const getClientMock = mock(async () => ({ + conversations: { + cancel: cancelConversationMock, + }, +})); +const fetchRunErrorDetailMock = mock(async () => null); +const realStreamModule = await import("../../cli/helpers/stream"); + +mock.module("../../agent/message", () => ({ + sendMessageStream: sendMessageStreamMock, + getStreamToolContextId: getStreamToolContextIdMock, + getStreamRequestContext: () => undefined, + getStreamRequestStartTime: () => undefined, + buildConversationMessagesCreateRequestBody: ( + conversationId: string, + messages: unknown[], + opts?: { agentId?: string; streamTokens?: boolean; background?: boolean }, + clientTools?: unknown[], + clientSkills?: unknown[], + ) => ({ + messages, + streaming: true, + stream_tokens: opts?.streamTokens ?? true, + include_pings: true, + background: opts?.background ?? true, + client_skills: clientSkills ?? [], + client_tools: clientTools ?? [], + include_compaction_messages: true, + ...(conversationId === "default" && opts?.agentId + ? { agent_id: opts.agentId } + : {}), + }), +})); + +mock.module("../../cli/helpers/stream", () => ({ + ...realStreamModule, + drainStreamWithResume: drainStreamWithResumeMock, +})); + +mock.module("../../agent/client", () => ({ + getClient: getClientMock, + getServerUrl: () => "https://example.test", + clearLastSDKDiagnostic: () => {}, + consumeLastSDKDiagnostic: () => null, +})); + +mock.module("../../agent/approval-recovery", () => ({ + fetchRunErrorDetail: fetchRunErrorDetailMock, +})); + +const listenClientModule = await import("../../websocket/listen-client"); +const { + __listenClientTestUtils, + requestApprovalOverWS, + resolvePendingApprovalResolver, +} = listenClientModule; + +class MockSocket { + readyState: number; + sentPayloads: string[] = []; + + constructor(readyState: number = WebSocket.OPEN) { + this.readyState = readyState; + } + + send(data: string): void { + this.sentPayloads.push(data); + } + + close(): void {} + + removeAllListeners(): this { + return this; + } +} + +function createDeferredDrain() { + let resolve!: (value: DrainResult) => void; + let reject!: (error: Error) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +async function waitFor( + predicate: () => boolean, + attempts: number = 20, +): Promise { + for (let i = 0; i < attempts; i += 1) { + if (predicate()) { + return; + } + await new Promise((resolve) => setTimeout(resolve, 0)); + } +} + +function makeIncomingMessage( + agentId: string, + conversationId: string, + text: string, +) { + return { + type: "message" as const, + agentId, + conversationId, + messages: [{ role: "user" as const, content: text }], + }; +} + +describe("listen-client multi-worker concurrency", () => { + beforeEach(() => { + permissionMode.reset(); + sendMessageStreamMock.mockClear(); + getStreamToolContextIdMock.mockClear(); + drainStreamWithResumeMock.mockClear(); + getClientMock.mockClear(); + cancelConversationMock.mockClear(); + fetchRunErrorDetailMock.mockClear(); + drainHandlers.clear(); + __listenClientTestUtils.setActiveRuntime(null); + }); + + afterEach(() => { + permissionMode.reset(); + __listenClientTestUtils.setActiveRuntime(null); + }); + + test("processes simultaneous turns for two named conversations under one agent", async () => { + const listener = __listenClientTestUtils.createListenerRuntime(); + const runtimeA = __listenClientTestUtils.getOrCreateConversationRuntime( + listener, + "agent-1", + "conv-a", + ); + const runtimeB = __listenClientTestUtils.getOrCreateConversationRuntime( + listener, + "agent-1", + "conv-b", + ); + const socket = new MockSocket(); + const drainA = createDeferredDrain(); + const drainB = createDeferredDrain(); + drainHandlers.set("conv-a", () => drainA.promise); + drainHandlers.set("conv-b", () => drainB.promise); + + const turnA = __listenClientTestUtils.handleIncomingMessage( + makeIncomingMessage("agent-1", "conv-a", "hello a"), + socket as unknown as WebSocket, + runtimeA, + ); + const turnB = __listenClientTestUtils.handleIncomingMessage( + makeIncomingMessage("agent-1", "conv-b", "hello b"), + socket as unknown as WebSocket, + runtimeB, + ); + + await waitFor(() => sendMessageStreamMock.mock.calls.length === 2); + + expect(runtimeA.isProcessing).toBe(true); + expect(runtimeB.isProcessing).toBe(true); + expect(__listenClientTestUtils.getListenerStatus(listener)).toBe( + "processing", + ); + expect(sendMessageStreamMock.mock.calls.map((call) => call[0])).toEqual([ + "conv-a", + "conv-b", + ]); + + drainB.resolve(defaultDrainResult); + await turnB; + expect(runtimeB.isProcessing).toBe(false); + expect(runtimeA.isProcessing).toBe(true); + + drainA.resolve(defaultDrainResult); + await turnA; + expect(runtimeA.isProcessing).toBe(false); + expect(__listenClientTestUtils.getListenerStatus(listener)).toBe("idle"); + }); + + test("keeps default conversations separate for different agents during concurrent turns", async () => { + const listener = __listenClientTestUtils.createListenerRuntime(); + const runtimeA = __listenClientTestUtils.getOrCreateConversationRuntime( + listener, + "agent-a", + "default", + ); + const runtimeB = __listenClientTestUtils.getOrCreateConversationRuntime( + listener, + "agent-b", + "default", + ); + const socket = new MockSocket(); + + await Promise.all([ + __listenClientTestUtils.handleIncomingMessage( + makeIncomingMessage("agent-a", "default", "from a"), + socket as unknown as WebSocket, + runtimeA, + ), + __listenClientTestUtils.handleIncomingMessage( + makeIncomingMessage("agent-b", "default", "from b"), + socket as unknown as WebSocket, + runtimeB, + ), + ]); + + expect(sendMessageStreamMock.mock.calls).toHaveLength(2); + expect(sendMessageStreamMock.mock.calls[0]?.[0]).toBe("default"); + expect(sendMessageStreamMock.mock.calls[1]?.[0]).toBe("default"); + expect(sendMessageStreamMock.mock.calls[0]?.[2]).toMatchObject({ + agentId: "agent-a", + }); + expect(sendMessageStreamMock.mock.calls[1]?.[2]).toMatchObject({ + agentId: "agent-b", + }); + }); + + test("cancelling one conversation runtime does not cancel another", async () => { + const listener = __listenClientTestUtils.createListenerRuntime(); + const runtimeA = __listenClientTestUtils.getOrCreateConversationRuntime( + listener, + "agent-1", + "conv-a", + ); + const runtimeB = __listenClientTestUtils.getOrCreateConversationRuntime( + listener, + "agent-1", + "conv-b", + ); + + runtimeA.isProcessing = true; + runtimeA.activeAbortController = new AbortController(); + runtimeB.isProcessing = true; + runtimeB.activeAbortController = new AbortController(); + + runtimeA.cancelRequested = true; + runtimeA.activeAbortController.abort(); + + expect(runtimeA.activeAbortController.signal.aborted).toBe(true); + expect(runtimeB.activeAbortController.signal.aborted).toBe(false); + expect(runtimeB.cancelRequested).toBe(false); + }); + + test("approval waits and resolver routing stay isolated per conversation", async () => { + const listener = __listenClientTestUtils.createListenerRuntime(); + const runtimeA = __listenClientTestUtils.getOrCreateConversationRuntime( + listener, + "agent-1", + "conv-a", + ); + const runtimeB = __listenClientTestUtils.getOrCreateConversationRuntime( + listener, + "agent-1", + "conv-b", + ); + const socket = new MockSocket(); + + const pendingA = requestApprovalOverWS( + runtimeA, + socket as unknown as WebSocket, + "perm-a", + { + type: "control_request", + request_id: "perm-a", + request: { + subtype: "can_use_tool", + tool_name: "Bash", + input: {}, + tool_call_id: "call-a", + permission_suggestions: [], + blocked_path: null, + }, + }, + ); + const pendingB = requestApprovalOverWS( + runtimeB, + socket as unknown as WebSocket, + "perm-b", + { + type: "control_request", + request_id: "perm-b", + request: { + subtype: "can_use_tool", + tool_name: "Bash", + input: {}, + tool_call_id: "call-b", + permission_suggestions: [], + blocked_path: null, + }, + }, + ); + + expect(listener.approvalRuntimeKeyByRequestId.get("perm-a")).toBe( + runtimeA.key, + ); + expect(listener.approvalRuntimeKeyByRequestId.get("perm-b")).toBe( + runtimeB.key, + ); + + const statusAWhilePending = __listenClientTestUtils.buildLoopStatus( + listener, + { + agent_id: "agent-1", + conversation_id: "conv-a", + }, + ); + const statusBWhilePending = __listenClientTestUtils.buildLoopStatus( + listener, + { + agent_id: "agent-1", + conversation_id: "conv-b", + }, + ); + expect(statusAWhilePending.status).toBe("WAITING_ON_APPROVAL"); + expect(statusBWhilePending.status).toBe("WAITING_ON_APPROVAL"); + + expect( + resolvePendingApprovalResolver(runtimeA, { + request_id: "perm-a", + decision: { behavior: "allow" }, + }), + ).toBe(true); + + await expect(pendingA).resolves.toMatchObject({ + request_id: "perm-a", + decision: { behavior: "allow" }, + }); + expect(runtimeA.pendingApprovalResolvers.size).toBe(0); + expect(runtimeB.pendingApprovalResolvers.size).toBe(1); + expect(listener.approvalRuntimeKeyByRequestId.has("perm-a")).toBe(false); + expect(listener.approvalRuntimeKeyByRequestId.get("perm-b")).toBe( + runtimeB.key, + ); + + const statusAAfterResolve = __listenClientTestUtils.buildLoopStatus( + listener, + { + agent_id: "agent-1", + conversation_id: "conv-a", + }, + ); + const statusBAfterResolve = __listenClientTestUtils.buildLoopStatus( + listener, + { + agent_id: "agent-1", + conversation_id: "conv-b", + }, + ); + expect(statusAAfterResolve.status).toBe("WAITING_ON_INPUT"); + expect(statusBAfterResolve.status).toBe("WAITING_ON_APPROVAL"); + + expect( + resolvePendingApprovalResolver(runtimeB, { + request_id: "perm-b", + decision: { behavior: "allow" }, + }), + ).toBe(true); + await expect(pendingB).resolves.toMatchObject({ + request_id: "perm-b", + decision: { behavior: "allow" }, + }); + }); + + test("recovered approval state does not leak across conversation scopes", () => { + const listener = __listenClientTestUtils.createListenerRuntime(); + const runtimeA = __listenClientTestUtils.getOrCreateConversationRuntime( + listener, + "agent-1", + "conv-a", + ); + __listenClientTestUtils.getOrCreateConversationRuntime( + listener, + "agent-1", + "conv-b", + ); + + runtimeA.recoveredApprovalState = { + agentId: "agent-1", + conversationId: "conv-a", + approvalsByRequestId: new Map([ + [ + "perm-a", + { + approval: { + toolCallId: "call-a", + toolName: "Bash", + toolArgs: "{}", + }, + controlRequest: { + type: "control_request", + request_id: "perm-a", + request: { + subtype: "can_use_tool", + tool_name: "Bash", + input: {}, + tool_call_id: "call-a", + permission_suggestions: [], + blocked_path: null, + }, + }, + }, + ], + ]), + pendingRequestIds: new Set(["perm-a"]), + responsesByRequestId: new Map(), + }; + + const loopStatusA = __listenClientTestUtils.buildLoopStatus(listener, { + agent_id: "agent-1", + conversation_id: "conv-a", + }); + const loopStatusB = __listenClientTestUtils.buildLoopStatus(listener, { + agent_id: "agent-1", + conversation_id: "conv-b", + }); + const deviceStatusA = __listenClientTestUtils.buildDeviceStatus(listener, { + agent_id: "agent-1", + conversation_id: "conv-a", + }); + const deviceStatusB = __listenClientTestUtils.buildDeviceStatus(listener, { + agent_id: "agent-1", + conversation_id: "conv-b", + }); + + expect(loopStatusA.status).toBe("WAITING_ON_APPROVAL"); + expect(loopStatusB.status).toBe("WAITING_ON_INPUT"); + expect(deviceStatusA.pending_control_requests).toHaveLength(1); + expect(deviceStatusA.pending_control_requests[0]?.request_id).toBe( + "perm-a", + ); + expect(deviceStatusB.pending_control_requests).toHaveLength(0); + }); + + test("queue dispatch respects conversation runtime boundaries", async () => { + const listener = __listenClientTestUtils.createListenerRuntime(); + __listenClientTestUtils.setActiveRuntime(listener); + const runtimeA = __listenClientTestUtils.getOrCreateScopedRuntime( + listener, + "agent-1", + "conv-a", + ); + const runtimeB = __listenClientTestUtils.getOrCreateScopedRuntime( + listener, + "agent-1", + "conv-b", + ); + const socket = new MockSocket(); + const processed: string[] = []; + + const enqueueTurn = ( + runtime: (typeof runtimeA | typeof runtimeB) & { + queueRuntime: { + enqueue: (item: { + kind: "message"; + source: "user"; + content: string; + clientMessageId: string; + agentId: string; + conversationId: string; + }) => { id: string } | null; + }; + }, + conversationId: string, + text: string, + ) => { + const item = runtime.queueRuntime.enqueue({ + kind: "message", + source: "user", + content: text, + clientMessageId: `cm-${conversationId}`, + agentId: "agent-1", + conversationId, + }); + if (!item) { + throw new Error("Expected queued item to be created"); + } + runtime.queuedMessagesByItemId.set( + item.id, + makeIncomingMessage("agent-1", conversationId, text), + ); + }; + + enqueueTurn(runtimeA, "conv-a", "queued a"); + enqueueTurn(runtimeB, "conv-b", "queued b"); + + const processQueuedTurn = mock( + async (queuedTurn: { conversationId?: string }) => { + processed.push(queuedTurn.conversationId ?? "missing"); + }, + ); + const opts = { + connectionId: "conn-1", + onStatusChange: undefined, + } as never; + + __listenClientTestUtils.scheduleQueuePump( + runtimeA, + socket as unknown as WebSocket, + opts, + processQueuedTurn, + ); + __listenClientTestUtils.scheduleQueuePump( + runtimeB, + socket as unknown as WebSocket, + opts, + processQueuedTurn, + ); + + await waitFor(() => processed.length === 2); + + expect(processed.sort()).toEqual(["conv-a", "conv-b"]); + expect(runtimeA.queueRuntime.length).toBe(0); + expect(runtimeB.queueRuntime.length).toBe(0); + expect(runtimeA.queuedMessagesByItemId.size).toBe(0); + expect(runtimeB.queuedMessagesByItemId.size).toBe(0); + }); + + test("queue pump status callbacks stay aggregate when another conversation is busy", async () => { + const listener = __listenClientTestUtils.createListenerRuntime(); + __listenClientTestUtils.setActiveRuntime(listener); + const runtimeA = __listenClientTestUtils.getOrCreateScopedRuntime( + listener, + "agent-1", + "conv-a", + ); + const runtimeB = __listenClientTestUtils.getOrCreateScopedRuntime( + listener, + "agent-1", + "conv-b", + ); + const socket = new MockSocket(); + const statuses: string[] = []; + + runtimeA.isProcessing = true; + runtimeA.loopStatus = "PROCESSING_API_RESPONSE"; + + const queueInput = { + kind: "message", + source: "user", + content: "queued b", + clientMessageId: "cm-b", + agentId: "agent-1", + conversationId: "conv-b", + } satisfies Omit; + const item = runtimeB.queueRuntime.enqueue(queueInput); + if (!item) { + throw new Error("Expected queued item to be created"); + } + runtimeB.queuedMessagesByItemId.set( + item.id, + makeIncomingMessage("agent-1", "conv-b", "queued b"), + ); + + __listenClientTestUtils.scheduleQueuePump( + runtimeB, + socket as unknown as WebSocket, + { + connectionId: "conn-1", + onStatusChange: (status: "idle" | "receiving" | "processing") => { + statuses.push(status); + }, + } as never, + async () => {}, + ); + + await waitFor(() => runtimeB.queueRuntime.length === 0); + + expect(statuses).not.toContain("idle"); + expect(statuses.every((status) => status === "processing")).toBe(true); + expect(listener.conversationRuntimes.has(runtimeB.key)).toBe(false); + expect(listener.conversationRuntimes.has(runtimeA.key)).toBe(true); + }); +}); diff --git a/src/tests/websocket/listen-client-protocol.test.ts b/src/tests/websocket/listen-client-protocol.test.ts index 10a5bf3..9006b05 100644 --- a/src/tests/websocket/listen-client-protocol.test.ts +++ b/src/tests/websocket/listen-client-protocol.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, test } from "bun:test"; +import { describe, expect, mock, test } from "bun:test"; import { mkdir, mkdtemp, realpath, rm } from "node:fs/promises"; import os from "node:os"; import { join } from "node:path"; @@ -332,31 +332,39 @@ describe("listen-client requestApprovalOverWS", () => { expect(runtime.pendingApprovalResolvers.size).toBe(0); }); - test("cleans up resolver when send throws", async () => { + test("registers a pending resolver until an approval response arrives", async () => { const runtime = __listenClientTestUtils.createRuntime(); const socket = new MockSocket(WebSocket.OPEN); - socket.sendImpl = () => { - throw new Error("send failed"); - }; const requestId = "perm-send-fail"; - await expect( - requestApprovalOverWS( - runtime, - socket as unknown as WebSocket, - requestId, - makeControlRequest(requestId), - ), - ).rejects.toThrow("send failed"); + const pending = requestApprovalOverWS( + runtime, + socket as unknown as WebSocket, + requestId, + makeControlRequest(requestId), + ); + + expect(runtime.pendingApprovalResolvers.size).toBe(1); + expect( + runtime.pendingApprovalResolvers.get(requestId)?.controlRequest, + ).toEqual(makeControlRequest(requestId)); + + rejectPendingApprovalResolvers(runtime, "cleanup"); + await expect(pending).rejects.toThrow("cleanup"); expect(runtime.pendingApprovalResolvers.size).toBe(0); }); }); describe("listen-client conversation-scoped protocol events", () => { test("queue enqueue/block updates loop status with runtime scope instead of stream_delta", async () => { - const runtime = __listenClientTestUtils.createRuntime(); + const listener = __listenClientTestUtils.createListenerRuntime(); + const runtime = __listenClientTestUtils.getOrCreateScopedRuntime( + listener, + "agent-default", + "default", + ); const socket = new MockSocket(WebSocket.OPEN); - runtime.socket = socket as unknown as WebSocket; + listener.socket = socket as unknown as WebSocket; const input: Omit = { kind: "message", @@ -396,9 +404,14 @@ describe("listen-client conversation-scoped protocol events", () => { }); test("queue dequeue keeps scope through update_queue runtime envelope", async () => { - const runtime = __listenClientTestUtils.createRuntime(); + const listener = __listenClientTestUtils.createListenerRuntime(); + const runtime = __listenClientTestUtils.getOrCreateScopedRuntime( + listener, + "agent-xyz", + "conv-xyz", + ); const socket = new MockSocket(WebSocket.OPEN); - runtime.socket = socket as unknown as WebSocket; + listener.socket = socket as unknown as WebSocket; const input: Omit = { kind: "message", @@ -465,6 +478,24 @@ describe("listen-client v2 status builders", () => { }); }); + test("resolveRuntimeScope does not guess another conversation when multiple runtimes exist", () => { + const listener = __listenClientTestUtils.createListenerRuntime(); + const runtimeA = __listenClientTestUtils.getOrCreateConversationRuntime( + listener, + "agent-1", + "conv-a", + ); + __listenClientTestUtils.getOrCreateConversationRuntime( + listener, + "agent-1", + "conv-b", + ); + + runtimeA.isProcessing = true; + + expect(__listenClientTestUtils.resolveRuntimeScope(listener)).toBeNull(); + }); + test("does not emit bootstrap status updates with __unknown_agent__ runtime", () => { const runtime = __listenClientTestUtils.createRuntime(); const socket = new MockSocket(WebSocket.OPEN); @@ -507,7 +538,12 @@ describe("listen-client v2 status builders", () => { }); test("sync replays device, loop, and queue state for the requested runtime", () => { - const runtime = __listenClientTestUtils.createRuntime(); + const listener = __listenClientTestUtils.createListenerRuntime(); + const runtime = __listenClientTestUtils.getOrCreateScopedRuntime( + listener, + "agent-1", + "default", + ); const socket = new MockSocket(WebSocket.OPEN); const queueInput = { clientMessageId: "cm-1", @@ -555,7 +591,12 @@ describe("listen-client v2 status builders", () => { }); test("recovered approvals surface as pending control requests and WAITING_ON_APPROVAL", () => { - const runtime = __listenClientTestUtils.createRuntime(); + const listener = __listenClientTestUtils.createListenerRuntime(); + const runtime = __listenClientTestUtils.getOrCreateScopedRuntime( + listener, + "agent-1", + "default", + ); const socket = new MockSocket(WebSocket.OPEN); const requestId = "perm-tool-call-1"; @@ -630,7 +671,12 @@ describe("listen-client v2 status builders", () => { }); test("starting a live turn clears stale recovered approvals for the same scope", () => { - const runtime = __listenClientTestUtils.createRuntime(); + const listener = __listenClientTestUtils.createListenerRuntime(); + const runtime = __listenClientTestUtils.getOrCreateScopedRuntime( + listener, + "agent-1", + "default", + ); runtime.recoveredApprovalState = { agentId: "agent-1", conversationId: "default", @@ -682,6 +728,72 @@ describe("listen-client v2 status builders", () => { }); expect(defaultStatus.current_working_directory).toBe("/repo/b"); }); + + test("scoped loop status is not suppressed just because another conversation is processing", () => { + const listener = __listenClientTestUtils.createListenerRuntime(); + const runtimeA = __listenClientTestUtils.getOrCreateConversationRuntime( + listener, + "agent-1", + "conv-a", + ); + const runtimeB = __listenClientTestUtils.getOrCreateConversationRuntime( + listener, + "agent-1", + "conv-b", + ); + + runtimeA.isProcessing = true; + runtimeA.loopStatus = "PROCESSING_API_RESPONSE"; + runtimeB.loopStatus = "WAITING_ON_APPROVAL"; + + expect( + __listenClientTestUtils.buildLoopStatus(listener, { + agent_id: "agent-1", + conversation_id: "conv-b", + }), + ).toEqual({ + status: "WAITING_ON_APPROVAL", + active_run_ids: [], + }); + }); + + test("scoped queue snapshots are not suppressed just because another conversation is processing", () => { + const listener = __listenClientTestUtils.createListenerRuntime(); + const runtimeA = __listenClientTestUtils.getOrCreateScopedRuntime( + listener, + "agent-1", + "conv-a", + ); + const runtimeB = __listenClientTestUtils.getOrCreateScopedRuntime( + listener, + "agent-1", + "conv-b", + ); + + runtimeA.isProcessing = true; + runtimeA.loopStatus = "PROCESSING_API_RESPONSE"; + const queueInput = { + kind: "message", + source: "user", + content: "queued b", + clientMessageId: "cm-b", + agentId: "agent-1", + conversationId: "conv-b", + } satisfies Omit; + runtimeB.queueRuntime.enqueue(queueInput); + + expect( + __listenClientTestUtils.buildQueueSnapshot(listener, { + agent_id: "agent-1", + conversation_id: "conv-b", + }), + ).toEqual([ + expect.objectContaining({ + client_message_id: "cm-b", + kind: "message", + }), + ]); + }); }); describe("listen-client cwd change handling", () => { @@ -985,8 +1097,14 @@ describe("listen-client capability-gated approval flow", () => { }); test("requestApprovalOverWS exposes the control request through device status instead of stream_delta", () => { - const runtime = __listenClientTestUtils.createRuntime(); + const listener = __listenClientTestUtils.createListenerRuntime(); + const runtime = __listenClientTestUtils.getOrCreateScopedRuntime( + listener, + "agent-1", + "default", + ); const socket = new MockSocket(WebSocket.OPEN); + listener.socket = socket as unknown as WebSocket; const requestId = "perm-adapter-test"; void requestApprovalOverWS( @@ -996,10 +1114,18 @@ describe("listen-client capability-gated approval flow", () => { makeControlRequest(requestId), ).catch(() => {}); - expect(socket.sentPayloads).toHaveLength(2); - const [loopStatus, deviceStatus] = socket.sentPayloads.map((payload) => + expect(socket.sentPayloads.length).toBeGreaterThanOrEqual(2); + const outbound = socket.sentPayloads.map((payload) => JSON.parse(payload as string), ); + const loopStatus = outbound.find( + (payload) => payload.type === "update_loop_status", + ); + const deviceStatus = outbound.find( + (payload) => payload.type === "update_device_status", + ); + expect(loopStatus).toBeDefined(); + expect(deviceStatus).toBeDefined(); expect(loopStatus.type).toBe("update_loop_status"); expect(loopStatus.loop_status.status).toBe("WAITING_ON_APPROVAL"); expect(deviceStatus.type).toBe("update_device_status"); @@ -1013,6 +1139,64 @@ describe("listen-client capability-gated approval flow", () => { // Cleanup rejectPendingApprovalResolvers(runtime, "test cleanup"); }); + + test("handled recovered approval responses reschedule queue pumping for the fallback scoped runtime", async () => { + const listener = __listenClientTestUtils.createListenerRuntime(); + const targetRuntime = + __listenClientTestUtils.getOrCreateConversationRuntime( + listener, + "agent-1", + "default", + ); + const socket = new MockSocket(WebSocket.OPEN); + const scheduleQueuePumpMock = mock(() => {}); + const resolveRecoveredApprovalResponseMock = mock(async () => true); + + const handled = await __listenClientTestUtils.handleApprovalResponseInput( + listener, + { + runtime: { agent_id: "agent-1", conversation_id: "default" }, + response: { + request_id: "perm-recovered", + decision: { behavior: "allow" }, + }, + socket: socket as unknown as WebSocket, + opts: { + onStatusChange: undefined, + connectionId: "conn-1", + }, + processQueuedTurn: async () => {}, + }, + { + resolveRuntimeForApprovalRequest: () => null, + resolvePendingApprovalResolver: () => false, + getOrCreateScopedRuntime: () => targetRuntime, + resolveRecoveredApprovalResponse: resolveRecoveredApprovalResponseMock, + scheduleQueuePump: scheduleQueuePumpMock, + }, + ); + + expect(handled).toBe(true); + expect(resolveRecoveredApprovalResponseMock).toHaveBeenCalledWith( + targetRuntime, + socket, + { + request_id: "perm-recovered", + decision: { behavior: "allow" }, + }, + expect.any(Function), + { + onStatusChange: undefined, + connectionId: "conn-1", + }, + ); + expect(scheduleQueuePumpMock).toHaveBeenCalledWith( + targetRuntime, + socket, + expect.objectContaining({ connectionId: "conn-1" }), + expect.any(Function), + ); + }); }); describe("listen-client approval recovery batch correlation", () => { diff --git a/src/tests/websocket/listen-interrupt-queue.test.ts b/src/tests/websocket/listen-interrupt-queue.test.ts index fe7215d..92662c2 100644 --- a/src/tests/websocket/listen-interrupt-queue.test.ts +++ b/src/tests/websocket/listen-interrupt-queue.test.ts @@ -20,6 +20,8 @@ import { const { createRuntime, + createListenerRuntime, + getOrCreateConversationRuntime, stopRuntime, rememberPendingApprovalBatchIds, populateInterruptQueue, @@ -100,17 +102,28 @@ describe("stopRuntime teardown", () => { }); test("increments continuationEpoch on each stop", () => { - const runtime = createRuntime(); - runtime.socket = new MockSocket(WebSocket.OPEN) as unknown as WebSocket; + const listener = createListenerRuntime(); + listener.socket = new MockSocket(WebSocket.OPEN) as unknown as WebSocket; - expect(runtime.continuationEpoch).toBe(0); - stopRuntime(runtime, true); - expect(runtime.continuationEpoch).toBe(1); + const runtimeA = getOrCreateConversationRuntime( + listener, + "agent-1", + "conv-1", + ); + const runtimeB = getOrCreateConversationRuntime( + listener, + "agent-2", + "conv-2", + ); - runtime.socket = new MockSocket(WebSocket.OPEN) as unknown as WebSocket; - runtime.intentionallyClosed = false; - stopRuntime(runtime, true); - expect(runtime.continuationEpoch).toBe(2); + expect(runtimeA.continuationEpoch).toBe(0); + expect(runtimeB.continuationEpoch).toBe(0); + + stopRuntime(listener, true); + + expect(runtimeA.continuationEpoch).toBe(1); + expect(runtimeB.continuationEpoch).toBe(1); + expect(listener.conversationRuntimes.size).toBe(0); }); }); diff --git a/src/websocket/listener/approval.ts b/src/websocket/listener/approval.ts index 20a0f72..628025d 100644 --- a/src/websocket/listener/approval.ts +++ b/src/websocket/listener/approval.ts @@ -9,10 +9,11 @@ import { emitLoopStatusIfOpen, setLoopStatus, } from "./protocol-outbound"; -import type { ListenerRuntime } from "./types"; +import { evictConversationRuntimeIfIdle } from "./runtime"; +import type { ConversationRuntime } from "./types"; export function rememberPendingApprovalBatchIds( - runtime: ListenerRuntime, + runtime: ConversationRuntime, pendingApprovals: Array<{ toolCallId: string }>, batchId: string, ): void { @@ -27,7 +28,7 @@ export function rememberPendingApprovalBatchIds( } export function resolvePendingApprovalBatchId( - runtime: ListenerRuntime, + runtime: ConversationRuntime, pendingApprovals: Array<{ toolCallId: string }>, ): string | null { const batchIds = new Set(); @@ -47,7 +48,7 @@ export function resolvePendingApprovalBatchId( } export function resolveRecoveryBatchId( - runtime: ListenerRuntime, + runtime: ConversationRuntime, pendingApprovals: Array<{ toolCallId: string }>, ): string | null { if (runtime.pendingApprovalBatchByToolCallId.size === 0) { @@ -57,7 +58,7 @@ export function resolveRecoveryBatchId( } export function clearPendingApprovalBatchIds( - runtime: ListenerRuntime, + runtime: ConversationRuntime, approvals: Array<{ toolCallId: string }>, ): void { for (const approval of approvals) { @@ -178,7 +179,7 @@ export function validateApprovalResultIds( } export function resolvePendingApprovalResolver( - runtime: ListenerRuntime, + runtime: ConversationRuntime, response: ApprovalResponseBody, ): boolean { const requestId = response.request_id; @@ -192,6 +193,7 @@ export function resolvePendingApprovalResolver( } runtime.pendingApprovalResolvers.delete(requestId); + runtime.listener.approvalRuntimeKeyByRequestId.delete(requestId); if (runtime.pendingApprovalResolvers.size === 0) { setLoopStatus( runtime, @@ -199,29 +201,49 @@ export function resolvePendingApprovalResolver( ); } pending.resolve(response); - emitLoopStatusIfOpen(runtime); - emitDeviceStatusIfOpen(runtime); + emitLoopStatusIfOpen(runtime.listener, { + agent_id: runtime.agentId, + conversation_id: runtime.conversationId, + }); + emitDeviceStatusIfOpen(runtime.listener, { + agent_id: runtime.agentId, + conversation_id: runtime.conversationId, + }); + evictConversationRuntimeIfIdle(runtime); return true; } export function rejectPendingApprovalResolvers( - runtime: ListenerRuntime, + runtime: ConversationRuntime, reason: string, ): void { for (const [, pending] of runtime.pendingApprovalResolvers) { pending.reject(new Error(reason)); } runtime.pendingApprovalResolvers.clear(); + for (const [requestId, runtimeKey] of runtime.listener + .approvalRuntimeKeyByRequestId) { + if (runtimeKey === runtime.key) { + runtime.listener.approvalRuntimeKeyByRequestId.delete(requestId); + } + } setLoopStatus( runtime, runtime.isProcessing ? "PROCESSING_API_RESPONSE" : "WAITING_ON_INPUT", ); - emitLoopStatusIfOpen(runtime); - emitDeviceStatusIfOpen(runtime); + emitLoopStatusIfOpen(runtime.listener, { + agent_id: runtime.agentId, + conversation_id: runtime.conversationId, + }); + emitDeviceStatusIfOpen(runtime.listener, { + agent_id: runtime.agentId, + conversation_id: runtime.conversationId, + }); + evictConversationRuntimeIfIdle(runtime); } export function requestApprovalOverWS( - runtime: ListenerRuntime, + runtime: ConversationRuntime, socket: WebSocket, requestId: string, controlRequest: ControlRequest, @@ -236,9 +258,16 @@ export function requestApprovalOverWS( reject, controlRequest, }); + runtime.listener.approvalRuntimeKeyByRequestId.set(requestId, runtime.key); setLoopStatus(runtime, "WAITING_ON_APPROVAL"); - emitLoopStatusIfOpen(runtime); - emitDeviceStatusIfOpen(runtime); + emitLoopStatusIfOpen(runtime.listener, { + agent_id: runtime.agentId, + conversation_id: runtime.conversationId, + }); + emitDeviceStatusIfOpen(runtime.listener, { + agent_id: runtime.agentId, + conversation_id: runtime.conversationId, + }); }); } diff --git a/src/websocket/listener/client.ts b/src/websocket/listener/client.ts index 3326d47..d44c6ae 100644 --- a/src/websocket/listener/client.ts +++ b/src/websocket/listener/client.ts @@ -16,6 +16,7 @@ import { type DequeuedBatch, QueueRuntime } from "../../queue/queueRuntime"; import { createSharedReminderState } from "../../reminders/state"; import { settingsManager } from "../../settings-manager"; import { loadTools } from "../../tools/manager"; +import type { ApprovalResponseBody } from "../../types/protocol_v2"; import { isDebugEnabled } from "../../utils/debug"; import { killAllTerminals } from "../terminalHandler"; import { @@ -75,9 +76,14 @@ import { shouldAttemptPostStopApprovalRecovery, } from "./recovery"; import { + clearConversationRuntimeState, clearRecoveredApprovalStateForScope, clearRuntimeTimers, + emitListenerStatus, + evictConversationRuntimeIfIdle, getActiveRuntime, + getListenerStatus, + getOrCreateConversationRuntime, getPendingControlRequestCount, getRecoveredApprovalStateForScope, safeEmitWsEvent, @@ -92,6 +98,7 @@ import { markAwaitingAcceptedApprovalContinuationRunId } from "./send"; import { handleIncomingMessage } from "./turn"; import type { ChangeCwdMessage, + ConversationRuntime, IncomingMessage, ListenerRuntime, ModeChangePayload, @@ -139,15 +146,185 @@ function handleModeChange( } } +function ensureConversationQueueRuntime( + listener: ListenerRuntime, + runtime: ConversationRuntime, +): ConversationRuntime { + if (runtime.queueRuntime) { + return runtime; + } + runtime.queueRuntime = new QueueRuntime({ + callbacks: { + onEnqueued: (item, queueLen) => { + runtime.pendingTurns = queueLen; + scheduleQueueEmit(listener, getQueueItemScope(item)); + }, + onDequeued: (batch) => { + runtime.pendingTurns = batch.queueLenAfter; + scheduleQueueEmit(listener, getQueueItemsScope(batch.items)); + }, + onBlocked: () => { + scheduleQueueEmit(listener, { + agent_id: runtime.agentId, + conversation_id: runtime.conversationId, + }); + }, + onCleared: (_reason, _clearedCount, items) => { + runtime.pendingTurns = 0; + scheduleQueueEmit(listener, getQueueItemsScope(items)); + evictConversationRuntimeIfIdle(runtime); + }, + onDropped: (item, _reason, queueLen) => { + runtime.pendingTurns = queueLen; + runtime.queuedMessagesByItemId.delete(item.id); + scheduleQueueEmit(listener, getQueueItemScope(item)); + evictConversationRuntimeIfIdle(runtime); + }, + }, + }); + return runtime; +} + +function getOrCreateScopedRuntime( + listener: ListenerRuntime, + agentId?: string | null, + conversationId?: string | null, +): ConversationRuntime { + return ensureConversationQueueRuntime( + listener, + getOrCreateConversationRuntime(listener, agentId, conversationId), + ); +} + +function resolveRuntimeForApprovalRequest( + listener: ListenerRuntime, + requestId?: string | null, +): ConversationRuntime | null { + if (!requestId) { + return null; + } + const runtimeKey = listener.approvalRuntimeKeyByRequestId.get(requestId); + if (!runtimeKey) { + return null; + } + return listener.conversationRuntimes.get(runtimeKey) ?? null; +} + +type ProcessQueuedTurn = ( + queuedTurn: IncomingMessage, + dequeuedBatch: DequeuedBatch, +) => Promise; + +async function handleApprovalResponseInput( + listener: ListenerRuntime, + params: { + runtime: { + agent_id?: string | null; + conversation_id?: string | null; + }; + response: ApprovalResponseBody; + socket: WebSocket; + opts: { + onStatusChange?: StartListenerOptions["onStatusChange"]; + connectionId?: string; + }; + processQueuedTurn: ProcessQueuedTurn; + }, + deps: { + resolveRuntimeForApprovalRequest: ( + listener: ListenerRuntime, + requestId?: string | null, + ) => ConversationRuntime | null; + resolvePendingApprovalResolver: ( + runtime: ConversationRuntime, + response: ApprovalResponseBody, + ) => boolean; + getOrCreateScopedRuntime: ( + listener: ListenerRuntime, + agentId?: string | null, + conversationId?: string | null, + ) => ConversationRuntime; + resolveRecoveredApprovalResponse: ( + runtime: ConversationRuntime, + socket: WebSocket, + response: ApprovalResponseBody, + processTurn: typeof handleIncomingMessage, + opts?: { + onStatusChange?: StartListenerOptions["onStatusChange"]; + connectionId?: string; + }, + ) => Promise; + scheduleQueuePump: ( + runtime: ConversationRuntime, + socket: WebSocket, + opts: StartListenerOptions, + processQueuedTurn: ProcessQueuedTurn, + ) => void; + } = { + resolveRuntimeForApprovalRequest, + resolvePendingApprovalResolver, + getOrCreateScopedRuntime, + resolveRecoveredApprovalResponse, + scheduleQueuePump, + }, +): Promise { + const approvalRuntime = deps.resolveRuntimeForApprovalRequest( + listener, + params.response.request_id, + ); + if ( + approvalRuntime && + deps.resolvePendingApprovalResolver(approvalRuntime, params.response) + ) { + deps.scheduleQueuePump( + approvalRuntime, + params.socket, + params.opts as StartListenerOptions, + params.processQueuedTurn, + ); + return true; + } + + const targetRuntime = + approvalRuntime ?? + deps.getOrCreateScopedRuntime( + listener, + params.runtime.agent_id, + params.runtime.conversation_id, + ); + if ( + await deps.resolveRecoveredApprovalResponse( + targetRuntime, + params.socket, + params.response, + handleIncomingMessage, + { + onStatusChange: params.opts.onStatusChange, + connectionId: params.opts.connectionId, + }, + ) + ) { + deps.scheduleQueuePump( + targetRuntime, + params.socket, + params.opts as StartListenerOptions, + params.processQueuedTurn, + ); + return true; + } + + return false; +} + async function handleCwdChange( msg: ChangeCwdMessage, socket: WebSocket, - runtime: ListenerRuntime, + runtime: ConversationRuntime, ): Promise { const conversationId = normalizeConversationId(msg.conversationId); const agentId = normalizeCwdAgentId(msg.agentId); const currentWorkingDirectory = getConversationWorkingDirectory( - runtime, + runtime.listener, agentId, conversationId, ); @@ -168,7 +345,7 @@ async function handleCwdChange( } setConversationWorkingDirectory( - runtime, + runtime.listener, agentId, conversationId, normalizedPath, @@ -193,78 +370,27 @@ async function handleCwdChange( function createRuntime(): ListenerRuntime { const bootWorkingDirectory = process.env.USER_CWD || process.cwd(); - const runtime: ListenerRuntime = { + return { socket: null, heartbeatInterval: null, reconnectTimeout: null, intentionallyClosed: false, hasSuccessfulConnection: false, - messageQueue: Promise.resolve(), - pendingApprovalResolvers: new Map(), - recoveredApprovalState: null, sessionId: `listen-${crypto.randomUUID()}`, eventSeqCounter: 0, lastStopReason: null, - isProcessing: false, - activeAgentId: null, - activeConversationId: null, - activeWorkingDirectory: null, - activeRunId: null, - activeRunStartedAt: null, - activeAbortController: null, - cancelRequested: false, - isRecoveringApprovals: false, - loopStatus: "WAITING_ON_INPUT", - pendingApprovalBatchByToolCallId: new Map(), - pendingInterruptedResults: null, - pendingInterruptedContext: null, - continuationEpoch: 0, - activeExecutingToolCallIds: [], - pendingInterruptedToolCallIds: null, + queueEmitScheduled: false, + pendingQueueEmitScope: undefined, + onWsEvent: undefined, reminderState: createSharedReminderState(), bootWorkingDirectory, workingDirectoryByConversation: loadPersistedCwdMap(), connectionId: null, connectionName: null, - queuedMessagesByItemId: new Map(), - queuePumpActive: false, - queuePumpScheduled: false, - queueEmitScheduled: false, - pendingQueueEmitScope: undefined, - pendingTurns: 0, - // queueRuntime assigned below — needs runtime ref in callbacks - queueRuntime: null as unknown as QueueRuntime, + conversationRuntimes: new Map(), + approvalRuntimeKeyByRequestId: new Map(), + lastEmittedStatus: null, }; - runtime.queueRuntime = new QueueRuntime({ - callbacks: { - onEnqueued: (item, queueLen) => { - runtime.pendingTurns = queueLen; - const scope = getQueueItemScope(item); - scheduleQueueEmit(runtime, scope); - }, - onDequeued: (batch) => { - runtime.pendingTurns = batch.queueLenAfter; - const scope = getQueueItemsScope(batch.items); - scheduleQueueEmit(runtime, scope); - }, - onBlocked: (_reason, _queueLen) => { - const scope = getQueueItemScope(runtime.queueRuntime.items[0]); - scheduleQueueEmit(runtime, scope); - }, - onCleared: (_reason, _clearedCount, items) => { - runtime.pendingTurns = 0; - const scope = getQueueItemsScope(items); - scheduleQueueEmit(runtime, scope); - }, - onDropped: (item, _reason, queueLen) => { - runtime.pendingTurns = queueLen; - runtime.queuedMessagesByItemId.delete(item.id); - const scope = getQueueItemScope(item); - scheduleQueueEmit(runtime, scope); - }, - }, - }); - return runtime; } function stopRuntime( @@ -272,24 +398,20 @@ function stopRuntime( suppressCallbacks: boolean, ): void { runtime.intentionallyClosed = true; - runtime.cancelRequested = true; - if ( - runtime.activeAbortController && - !runtime.activeAbortController.signal.aborted - ) { - runtime.activeAbortController.abort(); - } clearRuntimeTimers(runtime); - rejectPendingApprovalResolvers(runtime, "Listener runtime stopped"); - runtime.pendingApprovalBatchByToolCallId.clear(); - - // Clear interrupted queue on true teardown to prevent cross-session leakage. - runtime.pendingInterruptedResults = null; - runtime.pendingInterruptedContext = null; - runtime.pendingInterruptedToolCallIds = null; - runtime.activeExecutingToolCallIds = []; - runtime.loopStatus = "WAITING_ON_INPUT"; - runtime.continuationEpoch++; + for (const conversationRuntime of runtime.conversationRuntimes.values()) { + rejectPendingApprovalResolvers( + conversationRuntime, + "Listener runtime stopped", + ); + clearConversationRuntimeState(conversationRuntime); + if (conversationRuntime.queueRuntime) { + conversationRuntime.queuedMessagesByItemId.clear(); + conversationRuntime.queueRuntime.clear("shutdown"); + } + } + runtime.conversationRuntimes.clear(); + runtime.approvalRuntimeKeyByRequestId.clear(); if (!runtime.socket) { return; @@ -397,14 +519,19 @@ async function connectWithRetry( }); runtime.socket = socket; - const processQueuedTurn = async ( + const processQueuedTurn: ProcessQueuedTurn = async ( queuedTurn: IncomingMessage, dequeuedBatch: DequeuedBatch, ): Promise => { + const scopedRuntime = getOrCreateScopedRuntime( + runtime, + queuedTurn.agentId, + queuedTurn.conversationId, + ); await handleIncomingMessage( queuedTurn, socket, - runtime, + scopedRuntime, opts.onStatusChange, opts.connectionId, dequeuedBatch.batchId, @@ -420,8 +547,19 @@ async function connectWithRetry( runtime.hasSuccessfulConnection = true; opts.onConnected(opts.connectionId); - emitDeviceStatusUpdate(socket, runtime); - emitLoopStatusUpdate(socket, runtime); + if (runtime.conversationRuntimes.size === 0) { + emitDeviceStatusUpdate(socket, runtime); + emitLoopStatusUpdate(socket, runtime); + } else { + for (const conversationRuntime of runtime.conversationRuntimes.values()) { + const scope = { + agent_id: conversationRuntime.agentId, + conversation_id: conversationRuntime.conversationId, + }; + emitDeviceStatusUpdate(socket, conversationRuntime, scope); + emitLoopStatusUpdate(socket, conversationRuntime, scope); + } + } runtime.heartbeatInterval = setInterval(() => { if (socket.readyState === WebSocket.OPEN) { @@ -471,7 +609,14 @@ async function connectWithRetry( console.log(`[Listen V2] Dropping sync: runtime mismatch or closed`); return; } - await recoverApprovalStateForSync(runtime, parsed.runtime); + await recoverApprovalStateForSync( + getOrCreateScopedRuntime( + runtime, + parsed.runtime.agent_id, + parsed.runtime.conversation_id, + ), + parsed.runtime, + ); emitStateSync(socket, runtime, parsed.runtime); return; } @@ -486,23 +631,19 @@ async function connectWithRetry( } if (parsed.payload.kind === "approval_response") { - if (resolvePendingApprovalResolver(runtime, parsed.payload)) { - scheduleQueuePump(runtime, socket, opts, processQueuedTurn); - return; - } if ( - await resolveRecoveredApprovalResponse( - runtime, + await handleApprovalResponseInput(runtime, { + runtime: parsed.runtime, + response: parsed.payload, socket, - parsed.payload, - handleIncomingMessage, - { + opts: { onStatusChange: opts.onStatusChange, connectionId: opts.connectionId, }, - ) + processQueuedTurn, + }) ) { - scheduleQueuePump(runtime, socket, opts, processQueuedTurn); + return; } return; } @@ -541,6 +682,12 @@ async function connectWithRetry( return; } + const scopedRuntime = getOrCreateScopedRuntime( + runtime, + incoming.agentId, + incoming.conversationId, + ); + if (shouldQueueInboundMessage(incoming)) { const firstUserPayload = incoming.messages.find( ( @@ -549,7 +696,7 @@ async function connectWithRetry( "content" in payload, ); if (firstUserPayload) { - const enqueuedItem = runtime.queueRuntime.enqueue({ + const enqueuedItem = scopedRuntime.queueRuntime.enqueue({ kind: "message", source: "user", content: firstUserPayload.content, @@ -558,37 +705,37 @@ async function connectWithRetry( `cm-submit-${crypto.randomUUID()}`, agentId: parsed.runtime.agent_id, conversationId: parsed.runtime.conversation_id || "default", - } as Parameters[0]); + } as Parameters[0]); if (enqueuedItem) { - runtime.queuedMessagesByItemId.set(enqueuedItem.id, incoming); + scopedRuntime.queuedMessagesByItemId.set(enqueuedItem.id, incoming); } } - scheduleQueuePump(runtime, socket, opts, processQueuedTurn); + scheduleQueuePump(scopedRuntime, socket, opts, processQueuedTurn); return; } - runtime.messageQueue = runtime.messageQueue + scopedRuntime.messageQueue = scopedRuntime.messageQueue .then(async () => { if (runtime !== getActiveRuntime() || runtime.intentionallyClosed) { return; } - opts.onStatusChange?.("receiving", opts.connectionId); + emitListenerStatus(runtime, opts.onStatusChange, opts.connectionId); await handleIncomingMessage( incoming, socket, - runtime, + scopedRuntime, opts.onStatusChange, opts.connectionId, ); - opts.onStatusChange?.("idle", opts.connectionId); - scheduleQueuePump(runtime, socket, opts, processQueuedTurn); + emitListenerStatus(runtime, opts.onStatusChange, opts.connectionId); + scheduleQueuePump(scopedRuntime, socket, opts, processQueuedTurn); }) .catch((error: unknown) => { if (process.env.DEBUG) { console.error("[Listen] Error handling queued input:", error); } - opts.onStatusChange?.("idle", opts.connectionId); - scheduleQueuePump(runtime, socket, opts, processQueuedTurn); + emitListenerStatus(runtime, opts.onStatusChange, opts.connectionId); + scheduleQueuePump(scopedRuntime, socket, opts, processQueuedTurn); }); return; } @@ -605,11 +752,16 @@ async function connectWithRetry( parsed.runtime.conversation_id ?? undefined, }; + const scopedRuntime = getOrCreateScopedRuntime( + runtime, + scope.agent_id, + scope.conversation_id, + ); const shouldTrackCommand = - !runtime.isProcessing && + !scopedRuntime.isProcessing && getPendingControlRequestCount(runtime, scope) === 0; if (shouldTrackCommand) { - setLoopStatus(runtime, "EXECUTING_COMMAND", scope); + setLoopStatus(scopedRuntime, "EXECUTING_COMMAND", scope); } try { if (parsed.payload.mode) { @@ -628,14 +780,14 @@ async function connectWithRetry( cwd: parsed.payload.cwd, }, socket, - runtime, + scopedRuntime, ); } else if (!parsed.payload.mode) { emitDeviceStatusUpdate(socket, runtime, scope); } } finally { if (shouldTrackCommand) { - setLoopStatus(runtime, "WAITING_ON_INPUT", scope); + setLoopStatus(scopedRuntime, "WAITING_ON_INPUT", scope); } } return; @@ -651,42 +803,47 @@ async function connectWithRetry( agent_id: parsed.runtime.agent_id, conversation_id: parsed.runtime.conversation_id, }) > 0; - const hasActiveTurn = runtime.isProcessing; + const scopedRuntime = getOrCreateScopedRuntime( + runtime, + parsed.runtime.agent_id, + parsed.runtime.conversation_id, + ); + const hasActiveTurn = scopedRuntime.isProcessing; if (!hasActiveTurn && !hasPendingApprovals) { return; } - runtime.cancelRequested = true; + scopedRuntime.cancelRequested = true; // Eager interrupt capture parity with App/headless: // if tool execution is currently in-flight, queue explicit interrupted // tool results immediately at cancel time (before async catch paths). if ( - runtime.activeExecutingToolCallIds.length > 0 && - (!runtime.pendingInterruptedResults || - runtime.pendingInterruptedResults.length === 0) + scopedRuntime.activeExecutingToolCallIds.length > 0 && + (!scopedRuntime.pendingInterruptedResults || + scopedRuntime.pendingInterruptedResults.length === 0) ) { - runtime.pendingInterruptedResults = - runtime.activeExecutingToolCallIds.map((toolCallId) => ({ + scopedRuntime.pendingInterruptedResults = + scopedRuntime.activeExecutingToolCallIds.map((toolCallId) => ({ type: "tool", tool_call_id: toolCallId, tool_return: INTERRUPTED_BY_USER, status: "error", })); - runtime.pendingInterruptedContext = { - agentId: runtime.activeAgentId || "", - conversationId: runtime.activeConversationId || "default", - continuationEpoch: runtime.continuationEpoch, + scopedRuntime.pendingInterruptedContext = { + agentId: scopedRuntime.agentId || "", + conversationId: scopedRuntime.conversationId, + continuationEpoch: scopedRuntime.continuationEpoch, }; - runtime.pendingInterruptedToolCallIds = [ - ...runtime.activeExecutingToolCallIds, + scopedRuntime.pendingInterruptedToolCallIds = [ + ...scopedRuntime.activeExecutingToolCallIds, ]; } if ( - runtime.activeAbortController && - !runtime.activeAbortController.signal.aborted + scopedRuntime.activeAbortController && + !scopedRuntime.activeAbortController.signal.aborted ) { - runtime.activeAbortController.abort(); + scopedRuntime.activeAbortController.abort(); } const recoveredApprovalState = getRecoveredApprovalStateForScope( runtime, @@ -696,15 +853,15 @@ async function connectWithRetry( }, ); if (recoveredApprovalState && !hasActiveTurn) { - stashRecoveredApprovalInterrupts(runtime, recoveredApprovalState); + stashRecoveredApprovalInterrupts(scopedRuntime, recoveredApprovalState); } if (hasPendingApprovals) { - rejectPendingApprovalResolvers(runtime, "Cancelled by user"); + rejectPendingApprovalResolvers(scopedRuntime, "Cancelled by user"); } if (!hasActiveTurn && hasPendingApprovals) { - emitInterruptedStatusDelta(socket, runtime, { - runId: runtime.activeRunId, + emitInterruptedStatusDelta(socket, scopedRuntime, { + runId: scopedRuntime.activeRunId, agentId: parsed.runtime.agent_id, conversationId: parsed.runtime.conversation_id, }); @@ -712,8 +869,8 @@ async function connectWithRetry( // Backend cancel parity with TUI (App.tsx:5932-5941). // Fire-and-forget — local cancel + queued results are the primary mechanism. - const cancelConversationId = runtime.activeConversationId; - const cancelAgentId = runtime.activeAgentId; + const cancelConversationId = scopedRuntime.conversationId; + const cancelAgentId = scopedRuntime.agentId; if (cancelAgentId) { getClient() .then((client) => { @@ -728,7 +885,7 @@ async function connectWithRetry( }); } - scheduleQueuePump(runtime, socket, opts, processQueuedTurn); + scheduleQueuePump(scopedRuntime, socket, opts, processQueuedTurn); return; } }); @@ -746,8 +903,12 @@ async function connectWithRetry( // Single authoritative queue clear for all close paths // (intentional and unintentional). Must fire before early returns. - runtime.queuedMessagesByItemId.clear(); - runtime.queueRuntime.clear("shutdown"); + for (const conversationRuntime of runtime.conversationRuntimes.values()) { + conversationRuntime.queuedMessagesByItemId.clear(); + if (conversationRuntime.queueRuntime) { + conversationRuntime.queueRuntime.clear("shutdown"); + } + } if (isDebugEnabled()) { console.log( @@ -758,7 +919,14 @@ async function connectWithRetry( clearRuntimeTimers(runtime); killAllTerminals(); runtime.socket = null; - rejectPendingApprovalResolvers(runtime, "WebSocket disconnected"); + for (const conversationRuntime of runtime.conversationRuntimes.values()) { + rejectPendingApprovalResolvers( + conversationRuntime, + "WebSocket disconnected", + ); + clearConversationRuntimeState(conversationRuntime); + evictConversationRuntimeIfIdle(conversationRuntime); + } if (runtime.intentionallyClosed) { opts.onDisconnected(); @@ -825,6 +993,203 @@ export function stopListenerClient(): void { stopRuntime(runtime, true); } +function asListenerRuntimeForTests( + runtime: ListenerRuntime | ConversationRuntime, +): ListenerRuntime { + return "listener" in runtime ? runtime.listener : runtime; +} + +function createLegacyTestRuntime(): ConversationRuntime & { + activeAgentId: string | null; + activeConversationId: string; + socket: WebSocket | null; + workingDirectoryByConversation: Map; + bootWorkingDirectory: string; + connectionId: string | null; + connectionName: string | null; + sessionId: string; + eventSeqCounter: number; + queueEmitScheduled: boolean; + pendingQueueEmitScope?: { + agent_id?: string | null; + conversation_id?: string | null; + }; + onWsEvent?: StartListenerOptions["onWsEvent"]; + reminderState: ListenerRuntime["reminderState"]; + reconnectTimeout: NodeJS.Timeout | null; + heartbeatInterval: NodeJS.Timeout | null; + intentionallyClosed: boolean; + hasSuccessfulConnection: boolean; + conversationRuntimes: ListenerRuntime["conversationRuntimes"]; + approvalRuntimeKeyByRequestId: ListenerRuntime["approvalRuntimeKeyByRequestId"]; + lastEmittedStatus: ListenerRuntime["lastEmittedStatus"]; +} { + const listener = createRuntime(); + const runtime = getOrCreateScopedRuntime(listener, null, "default"); + const bridge = runtime as ConversationRuntime & { + activeAgentId: string | null; + activeConversationId: string; + socket: WebSocket | null; + workingDirectoryByConversation: Map; + bootWorkingDirectory: string; + connectionId: string | null; + connectionName: string | null; + sessionId: string; + eventSeqCounter: number; + queueEmitScheduled: boolean; + pendingQueueEmitScope?: { + agent_id?: string | null; + conversation_id?: string | null; + }; + onWsEvent?: StartListenerOptions["onWsEvent"]; + reminderState: ListenerRuntime["reminderState"]; + reconnectTimeout: NodeJS.Timeout | null; + heartbeatInterval: NodeJS.Timeout | null; + intentionallyClosed: boolean; + hasSuccessfulConnection: boolean; + conversationRuntimes: ListenerRuntime["conversationRuntimes"]; + approvalRuntimeKeyByRequestId: ListenerRuntime["approvalRuntimeKeyByRequestId"]; + lastEmittedStatus: ListenerRuntime["lastEmittedStatus"]; + }; + for (const [prop, getSet] of Object.entries({ + socket: { + get: () => listener.socket, + set: (value: WebSocket | null) => { + listener.socket = value; + }, + }, + workingDirectoryByConversation: { + get: () => listener.workingDirectoryByConversation, + set: (value: Map) => { + listener.workingDirectoryByConversation = value; + }, + }, + bootWorkingDirectory: { + get: () => listener.bootWorkingDirectory, + set: (value: string) => { + listener.bootWorkingDirectory = value; + }, + }, + connectionId: { + get: () => listener.connectionId, + set: (value: string | null) => { + listener.connectionId = value; + }, + }, + connectionName: { + get: () => listener.connectionName, + set: (value: string | null) => { + listener.connectionName = value; + }, + }, + sessionId: { + get: () => listener.sessionId, + set: (value: string) => { + listener.sessionId = value; + }, + }, + eventSeqCounter: { + get: () => listener.eventSeqCounter, + set: (value: number) => { + listener.eventSeqCounter = value; + }, + }, + queueEmitScheduled: { + get: () => listener.queueEmitScheduled, + set: (value: boolean) => { + listener.queueEmitScheduled = value; + }, + }, + pendingQueueEmitScope: { + get: () => listener.pendingQueueEmitScope, + set: ( + value: + | { + agent_id?: string | null; + conversation_id?: string | null; + } + | undefined, + ) => { + listener.pendingQueueEmitScope = value; + }, + }, + onWsEvent: { + get: () => listener.onWsEvent, + set: (value: StartListenerOptions["onWsEvent"] | undefined) => { + listener.onWsEvent = value; + }, + }, + reminderState: { + get: () => listener.reminderState, + set: (value: ListenerRuntime["reminderState"]) => { + listener.reminderState = value; + }, + }, + reconnectTimeout: { + get: () => listener.reconnectTimeout, + set: (value: NodeJS.Timeout | null) => { + listener.reconnectTimeout = value; + }, + }, + heartbeatInterval: { + get: () => listener.heartbeatInterval, + set: (value: NodeJS.Timeout | null) => { + listener.heartbeatInterval = value; + }, + }, + intentionallyClosed: { + get: () => listener.intentionallyClosed, + set: (value: boolean) => { + listener.intentionallyClosed = value; + }, + }, + hasSuccessfulConnection: { + get: () => listener.hasSuccessfulConnection, + set: (value: boolean) => { + listener.hasSuccessfulConnection = value; + }, + }, + conversationRuntimes: { + get: () => listener.conversationRuntimes, + set: (value: ListenerRuntime["conversationRuntimes"]) => { + listener.conversationRuntimes = value; + }, + }, + approvalRuntimeKeyByRequestId: { + get: () => listener.approvalRuntimeKeyByRequestId, + set: (value: ListenerRuntime["approvalRuntimeKeyByRequestId"]) => { + listener.approvalRuntimeKeyByRequestId = value; + }, + }, + lastEmittedStatus: { + get: () => listener.lastEmittedStatus, + set: (value: ListenerRuntime["lastEmittedStatus"]) => { + listener.lastEmittedStatus = value; + }, + }, + activeAgentId: { + get: () => runtime.agentId, + set: (value: string | null) => { + runtime.agentId = value; + }, + }, + activeConversationId: { + get: () => runtime.conversationId, + set: (value: string) => { + runtime.conversationId = value; + }, + }, + })) { + Object.defineProperty(bridge, prop, { + configurable: true, + enumerable: false, + get: getSet.get, + set: getSet.set, + }); + } + return bridge; +} + export { rejectPendingApprovalResolvers, requestApprovalOverWS, @@ -834,8 +1199,16 @@ export { parseServerMessage } from "./protocol-inbound"; export { emitInterruptedStatusDelta } from "./protocol-outbound"; export const __listenClientTestUtils = { - createRuntime, - stopRuntime, + createRuntime: createLegacyTestRuntime, + createListenerRuntime: createRuntime, + getOrCreateScopedRuntime, + stopRuntime: ( + runtime: ListenerRuntime | ConversationRuntime, + suppressCallbacks: boolean, + ) => stopRuntime(asListenerRuntimeForTests(runtime), suppressCallbacks), + setActiveRuntime, + getListenerStatus, + getOrCreateConversationRuntime, resolveRuntimeScope, buildDeviceStatus, buildLoopStatus, @@ -864,7 +1237,20 @@ export const __listenClientTestUtils = { markAwaitingAcceptedApprovalContinuationRunId, normalizeMessageContentImages, normalizeInboundMessages, + handleIncomingMessage, + handleApprovalResponseInput, + scheduleQueuePump, recoverApprovalStateForSync, - clearRecoveredApprovalStateForScope, + clearRecoveredApprovalStateForScope: ( + runtime: ListenerRuntime | ConversationRuntime, + scope?: { + agent_id?: string | null; + conversation_id?: string | null; + }, + ) => + clearRecoveredApprovalStateForScope( + asListenerRuntimeForTests(runtime), + scope, + ), emitStateSync, }; diff --git a/src/websocket/listener/interrupts.ts b/src/websocket/listener/interrupts.ts index 706c940..5ab887e 100644 --- a/src/websocket/listener/interrupts.ts +++ b/src/websocket/listener/interrupts.ts @@ -14,9 +14,9 @@ import { } from "./protocol-outbound"; import { clearRecoveredApprovalState } from "./runtime"; import type { + ConversationRuntime, InterruptPopulateInput, InterruptToolReturn, - ListenerRuntime, RecoveredApprovalState, } from "./types"; @@ -87,7 +87,7 @@ export function normalizeInterruptedApprovalsForQueue( } export function normalizeExecutionResultsForInterruptParity( - runtime: ListenerRuntime, + runtime: ConversationRuntime, executionResults: ApprovalResult[], executingToolCallIds: string[], ): ApprovalResult[] { @@ -273,7 +273,7 @@ export function extractInterruptToolReturns( export function emitInterruptToolReturnMessage( socket: WebSocket, - runtime: ListenerRuntime, + runtime: ConversationRuntime, approvals: ApprovalResult[] | null, runId?: string | null, uuidPrefix: string = "interrupt-tool-return", @@ -308,8 +308,8 @@ export function emitInterruptToolReturnMessage( ], }, { - agent_id: runtime.activeAgentId ?? undefined, - conversation_id: runtime.activeConversationId ?? undefined, + agent_id: runtime.agentId ?? undefined, + conversation_id: runtime.conversationId, }, ); } @@ -317,7 +317,7 @@ export function emitInterruptToolReturnMessage( export function emitToolExecutionStartedEvents( socket: WebSocket, - runtime: ListenerRuntime, + runtime: ConversationRuntime, params: { toolCallIds: string[]; runId?: string | null; @@ -339,7 +339,7 @@ export function emitToolExecutionStartedEvents( export function emitToolExecutionFinishedEvents( socket: WebSocket, - runtime: ListenerRuntime, + runtime: ConversationRuntime, params: { approvals: ApprovalResult[] | null; runId?: string | null; @@ -362,7 +362,7 @@ export function emitToolExecutionFinishedEvents( } export function getInterruptApprovalsForEmission( - runtime: ListenerRuntime, + runtime: ConversationRuntime, params: { lastExecutionResults: ApprovalResult[] | null; agentId: string; @@ -391,7 +391,7 @@ export function getInterruptApprovalsForEmission( } export function populateInterruptQueue( - runtime: ListenerRuntime, + runtime: ConversationRuntime, input: InterruptPopulateInput, ): boolean { const shouldPopulate = @@ -467,7 +467,7 @@ export function populateInterruptQueue( } export function consumeInterruptQueue( - runtime: ListenerRuntime, + runtime: ConversationRuntime, agentId: string, conversationId: string, ): { @@ -519,7 +519,7 @@ export function consumeInterruptQueue( } export function stashRecoveredApprovalInterrupts( - runtime: ListenerRuntime, + runtime: ConversationRuntime, recovered: RecoveredApprovalState, ): boolean { const approvals = [...recovered.approvalsByRequestId.values()].map( diff --git a/src/websocket/listener/protocol-outbound.ts b/src/websocket/listener/protocol-outbound.ts index 6dd919d..c38d927 100644 --- a/src/websocket/listener/protocol-outbound.ts +++ b/src/websocket/listener/protocol-outbound.ts @@ -23,21 +23,51 @@ import type { import { SYSTEM_REMINDER_RE } from "./constants"; import { getConversationWorkingDirectory } from "./cwd"; import { + getConversationRuntime, getPendingControlRequests, getRecoveredApprovalStateForScope, nextEventSeq, safeEmitWsEvent, } from "./runtime"; import { - isScopeCurrentlyActive, resolveRuntimeScope, resolveScopedAgentId, resolveScopedConversationId, } from "./scope"; -import type { IncomingMessage, ListenerRuntime } from "./types"; +import type { + ConversationRuntime, + IncomingMessage, + ListenerRuntime, +} from "./types"; + +type RuntimeCarrier = ListenerRuntime | ConversationRuntime | null; + +function getListenerRuntime(runtime: RuntimeCarrier): ListenerRuntime | null { + if (!runtime) return null; + return "listener" in runtime ? runtime.listener : runtime; +} + +function getScopeForRuntime( + runtime: RuntimeCarrier, + scope?: { + agent_id?: string | null; + conversation_id?: string | null; + }, +): { + agent_id?: string | null; + conversation_id?: string | null; +} { + if (runtime && "listener" in runtime) { + return { + agent_id: scope?.agent_id ?? runtime.agentId, + conversation_id: scope?.conversation_id ?? runtime.conversationId, + }; + } + return scope ?? {}; +} export function emitRuntimeStateUpdates( - runtime: ListenerRuntime, + runtime: RuntimeCarrier, scope?: { agent_id?: string | null; conversation_id?: string | null; @@ -48,16 +78,35 @@ export function emitRuntimeStateUpdates( } export function buildDeviceStatus( - runtime: ListenerRuntime, + runtime: RuntimeCarrier, params?: { agent_id?: string | null; conversation_id?: string | null; }, ): DeviceStatus { - const scopedAgentId = resolveScopedAgentId(runtime, params); - const scopedConversationId = resolveScopedConversationId(runtime, params); - const scopeActive = isScopeCurrentlyActive( - runtime, + const listener = getListenerRuntime(runtime); + if (!listener) { + return { + current_connection_id: null, + connection_name: null, + is_online: false, + is_processing: false, + current_permission_mode: permissionMode.getMode(), + current_working_directory: process.cwd(), + letta_code_version: process.env.npm_package_version || null, + current_toolset: null, + current_toolset_preference: "auto", + current_loaded_tools: getToolNames(), + current_available_skills: [], + background_processes: [], + pending_control_requests: [], + }; + } + const scope = getScopeForRuntime(runtime, params); + const scopedAgentId = resolveScopedAgentId(listener, scope); + const scopedConversationId = resolveScopedConversationId(listener, scope); + const conversationRuntime = getConversationRuntime( + listener, scopedAgentId, scopedConversationId, ); @@ -72,13 +121,13 @@ export function buildDeviceStatus( } })(); return { - current_connection_id: runtime.connectionId, - connection_name: runtime.connectionName, - is_online: runtime.socket?.readyState === WebSocket.OPEN, - is_processing: scopeActive && runtime.isProcessing, + current_connection_id: listener.connectionId, + connection_name: listener.connectionName, + is_online: listener.socket?.readyState === WebSocket.OPEN, + is_processing: !!conversationRuntime?.isProcessing, current_permission_mode: permissionMode.getMode(), current_working_directory: getConversationWorkingDirectory( - runtime, + listener, scopedAgentId, scopedConversationId, ), @@ -88,44 +137,62 @@ export function buildDeviceStatus( current_loaded_tools: getToolNames(), current_available_skills: [], background_processes: [], - pending_control_requests: getPendingControlRequests(runtime, params), + pending_control_requests: getPendingControlRequests(listener, scope), }; } export function buildLoopStatus( - runtime: ListenerRuntime, + runtime: RuntimeCarrier, params?: { agent_id?: string | null; conversation_id?: string | null; }, ): LoopState { - const scopedAgentId = resolveScopedAgentId(runtime, params); - const scopedConversationId = resolveScopedConversationId(runtime, params); - const scopeActive = isScopeCurrentlyActive( - runtime, + const listener = getListenerRuntime(runtime); + if (!listener) { + return { status: "WAITING_ON_INPUT", active_run_ids: [] }; + } + const scope = getScopeForRuntime(runtime, params); + const scopedAgentId = resolveScopedAgentId(listener, scope); + const scopedConversationId = resolveScopedConversationId(listener, scope); + const conversationRuntime = getConversationRuntime( + listener, scopedAgentId, scopedConversationId, ); - - if (!scopeActive) { - return { status: "WAITING_ON_INPUT", active_run_ids: [] }; - } - - const recovered = getRecoveredApprovalStateForScope(runtime, params); + const recovered = getRecoveredApprovalStateForScope(listener, scope); const status = recovered && recovered.pendingRequestIds.size > 0 && - runtime.loopStatus === "WAITING_ON_INPUT" + conversationRuntime?.loopStatus === "WAITING_ON_INPUT" ? "WAITING_ON_APPROVAL" - : runtime.loopStatus; + : (conversationRuntime?.loopStatus ?? "WAITING_ON_INPUT"); return { status, - active_run_ids: runtime.activeRunId ? [runtime.activeRunId] : [], + active_run_ids: conversationRuntime?.activeRunId + ? [conversationRuntime.activeRunId] + : [], }; } -export function buildQueueSnapshot(runtime: ListenerRuntime): QueueMessage[] { - return runtime.queueRuntime.items.map((item) => ({ +export function buildQueueSnapshot( + runtime: RuntimeCarrier, + params?: { + agent_id?: string | null; + conversation_id?: string | null; + }, +): QueueMessage[] { + const listener = getListenerRuntime(runtime); + if (!listener) { + return []; + } + const scope = getScopeForRuntime(runtime, params); + const conversationRuntime = getConversationRuntime( + listener, + resolveScopedAgentId(listener, scope), + resolveScopedConversationId(listener, scope), + ); + return (conversationRuntime?.queueRuntime.items ?? []).map((item) => ({ id: item.id, client_message_id: item.clientMessageId ?? `cm-${item.id}`, kind: item.kind, @@ -136,7 +203,7 @@ export function buildQueueSnapshot(runtime: ListenerRuntime): QueueMessage[] { } export function setLoopStatus( - runtime: ListenerRuntime, + runtime: ConversationRuntime, status: LoopStatus, scope?: { agent_id?: string | null; @@ -152,7 +219,7 @@ export function setLoopStatus( export function emitProtocolV2Message( socket: WebSocket, - runtime: ListenerRuntime | null, + runtime: RuntimeCarrier, message: Omit< WsProtocolMessage, "runtime" | "event_seq" | "emitted_at" | "idempotency_key" @@ -165,11 +232,15 @@ export function emitProtocolV2Message( if (socket.readyState !== WebSocket.OPEN) { return; } - const runtimeScope = resolveRuntimeScope(runtime, scope); + const listener = getListenerRuntime(runtime); + const runtimeScope = resolveRuntimeScope( + listener, + getScopeForRuntime(runtime, scope), + ); if (!runtimeScope) { return; } - const eventSeq = nextEventSeq(runtime); + const eventSeq = nextEventSeq(listener); if (eventSeq === null) { return; } @@ -201,7 +272,7 @@ export function emitProtocolV2Message( export function emitDeviceStatusUpdate( socket: WebSocket, - runtime: ListenerRuntime, + runtime: RuntimeCarrier, scope?: { agent_id?: string | null; conversation_id?: string | null; @@ -219,7 +290,7 @@ export function emitDeviceStatusUpdate( export function emitLoopStatusUpdate( socket: WebSocket, - runtime: ListenerRuntime, + runtime: RuntimeCarrier, scope?: { agent_id?: string | null; conversation_id?: string | null; @@ -236,53 +307,52 @@ export function emitLoopStatusUpdate( } export function emitLoopStatusIfOpen( - runtime: ListenerRuntime, + runtime: RuntimeCarrier, scope?: { agent_id?: string | null; conversation_id?: string | null; }, ): void { - if (runtime.socket?.readyState === WebSocket.OPEN) { - emitLoopStatusUpdate(runtime.socket, runtime, scope); + const listener = getListenerRuntime(runtime); + if (listener?.socket?.readyState === WebSocket.OPEN) { + emitLoopStatusUpdate(listener.socket, runtime, scope); } } export function emitDeviceStatusIfOpen( - runtime: ListenerRuntime, + runtime: RuntimeCarrier, scope?: { agent_id?: string | null; conversation_id?: string | null; }, ): void { - if (runtime.socket?.readyState === WebSocket.OPEN) { - emitDeviceStatusUpdate(runtime.socket, runtime, scope); + const listener = getListenerRuntime(runtime); + if (listener?.socket?.readyState === WebSocket.OPEN) { + emitDeviceStatusUpdate(listener.socket, runtime, scope); } } export function emitQueueUpdate( socket: WebSocket, - runtime: ListenerRuntime, + runtime: RuntimeCarrier, scope?: { agent_id?: string | null; conversation_id?: string | null; }, ): void { - const scopedAgentId = resolveScopedAgentId(runtime, scope); - const scopedConversationId = resolveScopedConversationId(runtime, scope); - const scopeActive = isScopeCurrentlyActive( - runtime, - scopedAgentId, - scopedConversationId, - ); - + const listener = getListenerRuntime(runtime); + if (!listener) { + return; + } + const resolvedScope = getScopeForRuntime(runtime, scope); const message: Omit< QueueUpdateMessage, "runtime" | "event_seq" | "emitted_at" | "idempotency_key" > = { type: "update_queue", - queue: scopeActive ? buildQueueSnapshot(runtime) : [], + queue: buildQueueSnapshot(runtime, resolvedScope), }; - emitProtocolV2Message(socket, runtime, message, scope); + emitProtocolV2Message(socket, runtime, message, resolvedScope); } export function isSystemReminderPart(part: unknown): boolean { @@ -305,7 +375,7 @@ export function isSystemReminderPart(part: unknown): boolean { export function emitDequeuedUserMessage( socket: WebSocket, - runtime: ListenerRuntime, + runtime: RuntimeCarrier, incoming: IncomingMessage, batch: DequeuedBatch, ): void { @@ -353,20 +423,21 @@ export function emitDequeuedUserMessage( } export function emitQueueUpdateIfOpen( - runtime: ListenerRuntime, + runtime: RuntimeCarrier, scope?: { agent_id?: string | null; conversation_id?: string | null; }, ): void { - if (runtime.socket?.readyState === WebSocket.OPEN) { - emitQueueUpdate(runtime.socket, runtime, scope); + const listener = getListenerRuntime(runtime); + if (listener?.socket?.readyState === WebSocket.OPEN) { + emitQueueUpdate(listener.socket, runtime, scope); } } export function emitStateSync( socket: WebSocket, - runtime: ListenerRuntime, + runtime: RuntimeCarrier, scope: RuntimeScope, ): void { emitDeviceStatusUpdate(socket, runtime, scope); @@ -413,7 +484,7 @@ export function createLifecycleMessageBase( export function emitCanonicalMessageDelta( socket: WebSocket, - runtime: ListenerRuntime | null, + runtime: RuntimeCarrier, delta: StreamDelta, scope?: { agent_id?: string | null; @@ -425,7 +496,7 @@ export function emitCanonicalMessageDelta( export function emitLoopErrorDelta( socket: WebSocket, - runtime: ListenerRuntime | null, + runtime: RuntimeCarrier, params: { message: string; stopReason: StopReasonType; @@ -453,7 +524,7 @@ export function emitLoopErrorDelta( export function emitRetryDelta( socket: WebSocket, - runtime: ListenerRuntime, + runtime: RuntimeCarrier, params: { message: string; reason: StopReasonType; @@ -481,7 +552,7 @@ export function emitRetryDelta( export function emitStatusDelta( socket: WebSocket, - runtime: ListenerRuntime | null, + runtime: RuntimeCarrier, params: { message: string; level: StatusMessage["level"]; @@ -503,7 +574,7 @@ export function emitStatusDelta( export function emitInterruptedStatusDelta( socket: WebSocket, - runtime: ListenerRuntime | null, + runtime: RuntimeCarrier, params: { runId?: string | null; agentId?: string | null; @@ -521,7 +592,7 @@ export function emitInterruptedStatusDelta( export function emitStreamDelta( socket: WebSocket, - runtime: ListenerRuntime | null, + runtime: RuntimeCarrier, delta: StreamDelta, scope?: { agent_id?: string | null; diff --git a/src/websocket/listener/queue.ts b/src/websocket/listener/queue.ts index 0c33015..ba1fcaa 100644 --- a/src/websocket/listener/queue.ts +++ b/src/websocket/listener/queue.ts @@ -9,12 +9,18 @@ import type { import { mergeQueuedTurnInput } from "../../queue/turnQueueRuntime"; import { getListenerBlockedReason } from "../helpers/listenerQueueAdapter"; import { emitDequeuedUserMessage } from "./protocol-outbound"; -import { getActiveRuntime, getPendingControlRequestCount } from "./runtime"; +import { + emitListenerStatus, + evictConversationRuntimeIfIdle, + getActiveRuntime, + getListenerStatus, + getPendingControlRequestCount, +} from "./runtime"; import { resolveRuntimeScope } from "./scope"; import type { + ConversationRuntime, InboundMessagePayload, IncomingMessage, - ListenerRuntime, StartListenerOptions, } from "./types"; @@ -188,7 +194,7 @@ function getPrimaryQueueMessageItem(items: QueueItem[]): QueueItem | null { } function buildQueuedTurnMessage( - runtime: ListenerRuntime, + runtime: ConversationRuntime, batch: DequeuedBatch, ): IncomingMessage | null { const primaryItem = getPrimaryQueueMessageItem(batch.items); @@ -241,13 +247,16 @@ export function shouldQueueInboundMessage(parsed: IncomingMessage): boolean { } function computeListenerQueueBlockedReason( - runtime: ListenerRuntime, + runtime: ConversationRuntime, ): QueueBlockedReason | null { - const activeScope = resolveRuntimeScope(runtime); + const activeScope = resolveRuntimeScope(runtime.listener, { + agent_id: runtime.agentId, + conversation_id: runtime.conversationId, + }); return getListenerBlockedReason({ isProcessing: runtime.isProcessing, pendingApprovalsLen: activeScope - ? getPendingControlRequestCount(runtime, activeScope) + ? getPendingControlRequestCount(runtime.listener, activeScope) : 0, cancelRequested: runtime.cancelRequested, isRecoveringApprovals: runtime.isRecoveringApprovals, @@ -255,7 +264,7 @@ function computeListenerQueueBlockedReason( } async function drainQueuedMessages( - runtime: ListenerRuntime, + runtime: ConversationRuntime, socket: WebSocket, opts: StartListenerOptions, processQueuedTurn: ( @@ -270,7 +279,10 @@ async function drainQueuedMessages( runtime.queuePumpActive = true; try { while (true) { - if (runtime !== getActiveRuntime() || runtime.intentionallyClosed) { + if ( + runtime.listener !== getActiveRuntime() || + runtime.listener.intentionallyClosed + ) { return; } @@ -297,17 +309,33 @@ async function drainQueuedMessages( emitDequeuedUserMessage(socket, runtime, queuedTurn, dequeuedBatch); - opts.onStatusChange?.("receiving", opts.connectionId); + const preTurnStatus = + getListenerStatus(runtime.listener) === "processing" + ? "processing" + : "receiving"; + if ( + opts.connectionId && + runtime.listener.lastEmittedStatus !== preTurnStatus + ) { + runtime.listener.lastEmittedStatus = preTurnStatus; + opts.onStatusChange?.(preTurnStatus, opts.connectionId); + } await processQueuedTurn(queuedTurn, dequeuedBatch); - opts.onStatusChange?.("idle", opts.connectionId); + emitListenerStatus( + runtime.listener, + opts.onStatusChange, + opts.connectionId, + ); + evictConversationRuntimeIfIdle(runtime); } } finally { runtime.queuePumpActive = false; + evictConversationRuntimeIfIdle(runtime); } } export function scheduleQueuePump( - runtime: ListenerRuntime, + runtime: ConversationRuntime, socket: WebSocket, opts: StartListenerOptions, processQueuedTurn: ( @@ -322,7 +350,10 @@ export function scheduleQueuePump( runtime.messageQueue = runtime.messageQueue .then(async () => { runtime.queuePumpScheduled = false; - if (runtime !== getActiveRuntime() || runtime.intentionallyClosed) { + if ( + runtime.listener !== getActiveRuntime() || + runtime.listener.intentionallyClosed + ) { return; } await drainQueuedMessages(runtime, socket, opts, processQueuedTurn); @@ -330,6 +361,11 @@ export function scheduleQueuePump( .catch((error: unknown) => { runtime.queuePumpScheduled = false; console.error("[Listen] Error in queue pump:", error); - opts.onStatusChange?.("idle", opts.connectionId); + emitListenerStatus( + runtime.listener, + opts.onStatusChange, + opts.connectionId, + ); + evictConversationRuntimeIfIdle(runtime); }); } diff --git a/src/websocket/listener/recovery.ts b/src/websocket/listener/recovery.ts index 7dd79b6..a7582c6 100644 --- a/src/websocket/listener/recovery.ts +++ b/src/websocket/listener/recovery.ts @@ -44,8 +44,8 @@ import { } from "./protocol-outbound"; import { clearActiveRunState, clearRecoveredApprovalState } from "./runtime"; import type { + ConversationRuntime, IncomingMessage, - ListenerRuntime, RecoveredPendingApproval, } from "./types"; @@ -128,7 +128,7 @@ export async function isRetriablePostStopError( export async function drainRecoveryStreamWithEmission( recoveryStream: Stream, socket: WebSocket, - runtime: ListenerRuntime, + runtime: ConversationRuntime, params: { agentId?: string | null; conversationId: string; @@ -195,7 +195,7 @@ export async function drainRecoveryStreamWithEmission( } export function finalizeHandledRecoveryTurn( - runtime: ListenerRuntime, + runtime: ConversationRuntime, socket: WebSocket, params: { drainResult: Awaited>; @@ -256,7 +256,7 @@ export function getApprovalContinuationRecoveryDisposition( } export async function debugLogApprovalResumeState( - runtime: ListenerRuntime, + runtime: ConversationRuntime, params: { agentId: string; conversationId: string; @@ -319,12 +319,12 @@ export async function debugLogApprovalResumeState( } export async function recoverApprovalStateForSync( - runtime: ListenerRuntime, + runtime: ConversationRuntime, scope: { agent_id: string; conversation_id: string }, ): Promise { const sameActiveScope = - runtime.activeAgentId === scope.agent_id && - runtime.activeConversationId === scope.conversation_id; + runtime.agentId === scope.agent_id && + runtime.conversationId === scope.conversation_id; if ( sameActiveScope && @@ -385,7 +385,7 @@ export async function recoverApprovalStateForSync( approval.toolName, input, getConversationWorkingDirectory( - runtime, + runtime.listener, scope.agent_id, scope.conversation_id, ), @@ -422,13 +422,13 @@ export async function recoverApprovalStateForSync( } export async function resolveRecoveredApprovalResponse( - runtime: ListenerRuntime, + runtime: ConversationRuntime, socket: WebSocket, response: ApprovalResponseBody, processTurn: ( msg: IncomingMessage, socket: WebSocket, - runtime: ListenerRuntime, + runtime: ConversationRuntime, onStatusChange?: ( status: "idle" | "receiving" | "processing", connectionId: string, @@ -515,10 +515,8 @@ export async function resolveRecoveredApprovalResponse( emitRuntimeStateUpdates(runtime, scope); runtime.isProcessing = true; - runtime.activeAgentId = recovered.agentId; - runtime.activeConversationId = recovered.conversationId; runtime.activeWorkingDirectory = getConversationWorkingDirectory( - runtime, + runtime.listener, recovered.agentId, recovered.conversationId, ); @@ -537,7 +535,7 @@ export async function resolveRecoveredApprovalResponse( const approvalResults = await executeApprovalBatch(decisions, undefined, { abortSignal: recoveryAbortController.signal, workingDirectory: getConversationWorkingDirectory( - runtime, + runtime.listener, recovered.agentId, recovered.conversationId, ), diff --git a/src/websocket/listener/runtime.ts b/src/websocket/listener/runtime.ts index 6af7965..ba04eb1 100644 --- a/src/websocket/listener/runtime.ts +++ b/src/websocket/listener/runtime.ts @@ -1,6 +1,16 @@ import type { PendingControlRequest } from "../../types/protocol_v2"; -import { resolveScopedAgentId, resolveScopedConversationId } from "./scope"; -import type { ListenerRuntime, RecoveredApprovalState } from "./types"; +import { + normalizeConversationId, + normalizeCwdAgentId, + resolveScopedAgentId, + resolveScopedConversationId, +} from "./scope"; +import type { + ConversationRuntime, + ListenerRuntime, + RecoveredApprovalState, + StartListenerOptions, +} from "./types"; let activeRuntime: ListenerRuntime | null = null; @@ -43,17 +53,195 @@ export function clearRuntimeTimers(runtime: ListenerRuntime): void { } } -export function clearActiveRunState(runtime: ListenerRuntime): void { - runtime.activeAgentId = null; - runtime.activeConversationId = null; +export function evictConversationRuntimeIfIdle( + runtime: ConversationRuntime, +): boolean { + if ( + runtime.isProcessing || + runtime.isRecoveringApprovals || + runtime.queuePumpActive || + runtime.queuePumpScheduled || + runtime.pendingTurns > 0 || + runtime.pendingApprovalResolvers.size > 0 || + runtime.pendingApprovalBatchByToolCallId.size > 0 || + runtime.recoveredApprovalState !== null || + runtime.pendingInterruptedResults !== null || + runtime.pendingInterruptedContext !== null || + runtime.activeExecutingToolCallIds.length > 0 || + (runtime.pendingInterruptedToolCallIds?.length ?? 0) > 0 || + runtime.activeRunId !== null || + runtime.activeRunStartedAt !== null || + runtime.activeAbortController !== null || + runtime.cancelRequested || + runtime.queuedMessagesByItemId.size > 0 || + runtime.queueRuntime?.length > 0 + ) { + return false; + } + + if (runtime.listener.conversationRuntimes.get(runtime.key) !== runtime) { + return false; + } + + runtime.listener.conversationRuntimes.delete(runtime.key); + for (const [requestId, runtimeKey] of runtime.listener + .approvalRuntimeKeyByRequestId) { + if (runtimeKey === runtime.key) { + runtime.listener.approvalRuntimeKeyByRequestId.delete(requestId); + } + } + if ( + runtime.listener.pendingQueueEmitScope?.agent_id === runtime.agentId && + normalizeConversationId( + runtime.listener.pendingQueueEmitScope?.conversation_id, + ) === runtime.conversationId + ) { + runtime.listener.pendingQueueEmitScope = undefined; + } + return true; +} + +export function getListenerStatus( + listener: ListenerRuntime, +): "idle" | "receiving" | "processing" { + let hasPendingTurns = false; + for (const runtime of listener.conversationRuntimes.values()) { + if (runtime.isProcessing || runtime.isRecoveringApprovals) { + return "processing"; + } + if (runtime.pendingTurns > 0) { + hasPendingTurns = true; + } + } + return hasPendingTurns ? "receiving" : "idle"; +} + +export function emitListenerStatus( + listener: ListenerRuntime, + onStatusChange: StartListenerOptions["onStatusChange"] | undefined, + connectionId: string | undefined, +): void { + if (!connectionId) { + return; + } + const status = getListenerStatus(listener); + if (listener.lastEmittedStatus === status) { + return; + } + listener.lastEmittedStatus = status; + onStatusChange?.(status, connectionId); +} + +export function getConversationRuntimeKey( + agentId?: string | null, + conversationId?: string | null, +): string { + const normalizedConversationId = normalizeConversationId(conversationId); + const normalizedAgentId = normalizeCwdAgentId(agentId); + return `agent:${normalizedAgentId ?? "__unknown__"}::conversation:${normalizedConversationId}`; +} + +export function createConversationRuntime( + listener: ListenerRuntime, + agentId?: string | null, + conversationId?: string | null, +): ConversationRuntime { + const normalizedAgentId = normalizeCwdAgentId(agentId); + const normalizedConversationId = normalizeConversationId(conversationId); + const conversationRuntime: ConversationRuntime = { + listener, + key: getConversationRuntimeKey(normalizedAgentId, normalizedConversationId), + agentId: normalizedAgentId, + conversationId: normalizedConversationId, + messageQueue: Promise.resolve(), + pendingApprovalResolvers: new Map(), + recoveredApprovalState: null, + lastStopReason: null, + isProcessing: false, + activeWorkingDirectory: null, + activeRunId: null, + activeRunStartedAt: null, + activeAbortController: null, + cancelRequested: false, + queueRuntime: null as unknown as ConversationRuntime["queueRuntime"], + queuedMessagesByItemId: new Map(), + queuePumpActive: false, + queuePumpScheduled: false, + pendingTurns: 0, + isRecoveringApprovals: false, + loopStatus: "WAITING_ON_INPUT", + pendingApprovalBatchByToolCallId: new Map(), + pendingInterruptedResults: null, + pendingInterruptedContext: null, + continuationEpoch: 0, + activeExecutingToolCallIds: [], + pendingInterruptedToolCallIds: null, + }; + listener.conversationRuntimes.set( + conversationRuntime.key, + conversationRuntime, + ); + return conversationRuntime; +} + +export function getConversationRuntime( + listener: ListenerRuntime, + agentId?: string | null, + conversationId?: string | null, +): ConversationRuntime | null { + return ( + listener.conversationRuntimes.get( + getConversationRuntimeKey(agentId, conversationId), + ) ?? null + ); +} + +export function getOrCreateConversationRuntime( + listener: ListenerRuntime, + agentId?: string | null, + conversationId?: string | null, +): ConversationRuntime { + return ( + getConversationRuntime(listener, agentId, conversationId) ?? + createConversationRuntime(listener, agentId, conversationId) + ); +} + +export function clearActiveRunState(runtime: ConversationRuntime): void { runtime.activeWorkingDirectory = null; runtime.activeRunId = null; runtime.activeRunStartedAt = null; runtime.activeAbortController = null; } -export function clearRecoveredApprovalState(runtime: ListenerRuntime): void { +export function clearRecoveredApprovalState( + runtime: ConversationRuntime, +): void { runtime.recoveredApprovalState = null; + evictConversationRuntimeIfIdle(runtime); +} + +export function clearConversationRuntimeState( + runtime: ConversationRuntime, +): void { + runtime.cancelRequested = true; + if ( + runtime.activeAbortController && + !runtime.activeAbortController.signal.aborted + ) { + runtime.activeAbortController.abort(); + } + runtime.pendingApprovalBatchByToolCallId.clear(); + runtime.pendingInterruptedResults = null; + runtime.pendingInterruptedContext = null; + runtime.pendingInterruptedToolCallIds = null; + runtime.activeExecutingToolCallIds = []; + runtime.loopStatus = "WAITING_ON_INPUT"; + runtime.continuationEpoch += 1; + runtime.pendingTurns = 0; + runtime.queuePumpActive = false; + runtime.queuePumpScheduled = false; + clearActiveRunState(runtime); } export function getRecoveredApprovalStateForScope( @@ -68,7 +256,12 @@ export function getRecoveredApprovalStateForScope( return null; } const scopedConversationId = resolveScopedConversationId(runtime, params); - const recovered = runtime.recoveredApprovalState; + const conversationRuntime = getConversationRuntime( + runtime, + scopedAgentId, + scopedConversationId, + ); + const recovered = conversationRuntime?.recoveredApprovalState; if (!recovered) { return null; } @@ -85,9 +278,18 @@ export function clearRecoveredApprovalStateForScope( conversation_id?: string | null; }, ): void { - const recovered = getRecoveredApprovalStateForScope(runtime, params); - if (recovered) { - clearRecoveredApprovalState(runtime); + const scopedAgentId = resolveScopedAgentId(runtime, params); + if (!scopedAgentId) { + return; + } + const scopedConversationId = resolveScopedConversationId(runtime, params); + const conversationRuntime = getConversationRuntime( + runtime, + scopedAgentId, + scopedConversationId, + ); + if (conversationRuntime?.recoveredApprovalState) { + clearRecoveredApprovalState(conversationRuntime); } } @@ -100,30 +302,27 @@ export function getPendingControlRequests( ): PendingControlRequest[] { const scopedAgentId = resolveScopedAgentId(runtime, params); const scopedConversationId = resolveScopedConversationId(runtime, params); + const conversationRuntime = getConversationRuntime( + runtime, + scopedAgentId, + scopedConversationId, + ); const requests: PendingControlRequest[] = []; - for (const pending of runtime.pendingApprovalResolvers.values()) { + if (!conversationRuntime) { + return requests; + } + + for (const pending of conversationRuntime.pendingApprovalResolvers.values()) { const request = pending.controlRequest; if (!request) continue; - if ( - scopedAgentId && - (request.agent_id ?? scopedAgentId) !== scopedAgentId - ) { - continue; - } - if ( - scopedConversationId && - (request.conversation_id ?? scopedConversationId) !== scopedConversationId - ) { - continue; - } requests.push({ request_id: request.request_id, request: request.request, }); } - const recovered = getRecoveredApprovalStateForScope(runtime, params); + const recovered = conversationRuntime.recoveredApprovalState; if (recovered) { for (const requestId of recovered.pendingRequestIds) { const entry = recovered.approvalsByRequestId.get(requestId); diff --git a/src/websocket/listener/scope.ts b/src/websocket/listener/scope.ts index baa52e2..7090995 100644 --- a/src/websocket/listener/scope.ts +++ b/src/websocket/listener/scope.ts @@ -1,5 +1,14 @@ import type { RuntimeScope } from "../../types/protocol_v2"; -import type { ListenerRuntime } from "./types"; +import type { ConversationRuntime, ListenerRuntime } from "./types"; + +function getOnlyConversationRuntime( + runtime: ListenerRuntime | null, +): ConversationRuntime | null { + if (!runtime || runtime.conversationRuntimes.size !== 1) { + return null; + } + return runtime.conversationRuntimes.values().next().value ?? null; +} export function normalizeCwdAgentId(agentId?: string | null): string | null { return agentId && agentId.length > 0 ? agentId : null; @@ -19,9 +28,14 @@ export function resolveScopedAgentId( agent_id?: string | null; }, ): string | null { - return ( - normalizeCwdAgentId(params?.agent_id) ?? runtime?.activeAgentId ?? null - ); + if (!runtime) { + return normalizeCwdAgentId(params?.agent_id) ?? null; + } + const explicitAgentId = normalizeCwdAgentId(params?.agent_id); + if (explicitAgentId) { + return explicitAgentId; + } + return getOnlyConversationRuntime(runtime)?.agentId ?? null; } export function resolveScopedConversationId( @@ -30,8 +44,15 @@ export function resolveScopedConversationId( conversation_id?: string | null; }, ): string { - return normalizeConversationId( - params?.conversation_id ?? runtime?.activeConversationId, + if (!runtime) { + return normalizeConversationId(params?.conversation_id); + } + if (params?.conversation_id) { + return normalizeConversationId(params.conversation_id); + } + return ( + getOnlyConversationRuntime(runtime)?.conversationId ?? + normalizeConversationId(params?.conversation_id) ); } @@ -52,19 +73,3 @@ export function resolveRuntimeScope( conversation_id: resolvedConversationId, }; } - -export function isScopeCurrentlyActive( - runtime: ListenerRuntime, - agentId: string | null, - conversationId: string, -): boolean { - if (!runtime.isProcessing) return true; - - const activeAgent = runtime.activeAgentId; - const activeConvo = normalizeConversationId(runtime.activeConversationId); - - if (agentId && activeAgent && agentId !== activeAgent) return false; - if (conversationId !== activeConvo) return false; - - return true; -} diff --git a/src/websocket/listener/send.ts b/src/websocket/listener/send.ts index 41d2314..fdae2d7 100644 --- a/src/websocket/listener/send.ts +++ b/src/websocket/listener/send.ts @@ -52,7 +52,7 @@ import { getApprovalContinuationRecoveryDisposition, isApprovalToolCallDesyncError, } from "./recovery"; -import type { ListenerRuntime } from "./types"; +import type { ConversationRuntime } from "./types"; export function isApprovalOnlyInput( input: Array, @@ -66,7 +66,7 @@ export function isApprovalOnlyInput( } export function markAwaitingAcceptedApprovalContinuationRunId( - runtime: ListenerRuntime, + runtime: ConversationRuntime, input: Array, ): void { if (isApprovalOnlyInput(input)) { @@ -80,16 +80,16 @@ export function markAwaitingAcceptedApprovalContinuationRunId( * touch pendingInterruptedResults (that's exclusively owned by handleIncomingMessage). */ async function resolveStaleApprovals( - runtime: ListenerRuntime, + runtime: ConversationRuntime, socket: WebSocket, abortSignal: AbortSignal, ): Promise> | null> { - if (!runtime.activeAgentId) return null; + if (!runtime.agentId) return null; const client = await getClient(); let agent: Awaited>; try { - agent = await client.agents.retrieve(runtime.activeAgentId); + agent = await client.agents.retrieve(runtime.agentId); } catch (err) { if (err instanceof APIError && (err.status === 404 || err.status === 422)) { return null; @@ -97,9 +97,7 @@ async function resolveStaleApprovals( throw err; } const requestedConversationId = - runtime.activeConversationId && runtime.activeConversationId !== "default" - ? runtime.activeConversationId - : undefined; + runtime.conversationId !== "default" ? runtime.conversationId : undefined; let resumeData: Awaited>; try { @@ -117,16 +115,16 @@ async function resolveStaleApprovals( if (pendingApprovals.length === 0) return null; if (abortSignal.aborted) throw new Error("Cancelled"); - const recoveryConversationId = runtime.activeConversationId || "default"; + const recoveryConversationId = runtime.conversationId; const recoveryWorkingDirectory = runtime.activeWorkingDirectory ?? getConversationWorkingDirectory( - runtime, - runtime.activeAgentId, + runtime.listener, + runtime.agentId, recoveryConversationId, ); const scope = { - agent_id: runtime.activeAgentId, + agent_id: runtime.agentId, conversation_id: recoveryConversationId, } as const; @@ -187,7 +185,7 @@ async function resolveStaleApprovals( blocked_path: null, ...(diffs.length > 0 ? { diffs } : {}), }, - agent_id: runtime.activeAgentId, + agent_id: runtime.agentId, conversation_id: recoveryConversationId, }; @@ -246,7 +244,7 @@ async function resolveStaleApprovals( emitToolExecutionStartedEvents(socket, runtime, { toolCallIds: approvedToolCallIds, runId: runtime.activeRunId ?? undefined, - agentId: runtime.activeAgentId, + agentId: runtime.agentId ?? undefined, conversationId: recoveryConversationId, }); @@ -258,7 +256,7 @@ async function resolveStaleApprovals( emitToolExecutionFinishedEvents(socket, runtime, { approvals: approvalResults, runId: runtime.activeRunId ?? undefined, - agentId: runtime.activeAgentId, + agentId: runtime.agentId ?? undefined, conversationId: recoveryConversationId, }); emitInterruptToolReturnMessage( @@ -273,7 +271,7 @@ async function resolveStaleApprovals( recoveryConversationId, [{ type: "approval", approvals: approvalResults }], { - agentId: runtime.activeAgentId, + agentId: runtime.agentId ?? undefined, streamTokens: true, background: true, workingDirectory: recoveryWorkingDirectory, @@ -294,7 +292,7 @@ async function resolveStaleApprovals( socket, runtime, { - agentId: runtime.activeAgentId, + agentId: runtime.agentId ?? undefined, conversationId: recoveryConversationId, abortSignal, }, @@ -324,7 +322,7 @@ export async function sendMessageStreamWithRetry( messages: Parameters[1], opts: Parameters[2], socket: WebSocket, - runtime: ListenerRuntime, + runtime: ConversationRuntime, abortSignal?: AbortSignal, ): Promise>> { let transientRetries = 0; @@ -340,7 +338,7 @@ export async function sendMessageStreamWithRetry( } runtime.isRecoveringApprovals = false; setLoopStatus(runtime, "WAITING_FOR_API_RESPONSE", { - agent_id: runtime.activeAgentId, + agent_id: runtime.agentId, conversation_id: conversationId, }); @@ -380,7 +378,7 @@ export async function sendMessageStreamWithRetry( if (approvalConflictDetected) { runtime.isRecoveringApprovals = true; setLoopStatus(runtime, "RETRYING_API_REQUEST", { - agent_id: runtime.activeAgentId, + agent_id: runtime.agentId, conversation_id: conversationId, }); if (abortSignal?.aborted) throw new Error("Cancelled by user"); @@ -408,7 +406,7 @@ export async function sendMessageStreamWithRetry( if (action === "retry_transient") { runtime.isRecoveringApprovals = true; setLoopStatus(runtime, "RETRYING_API_REQUEST", { - agent_id: runtime.activeAgentId, + agent_id: runtime.agentId, conversation_id: conversationId, }); const attempt = transientRetries + 1; @@ -434,7 +432,7 @@ export async function sendMessageStreamWithRetry( attempt, maxAttempts: LLM_API_ERROR_MAX_RETRIES, delayMs, - agentId: runtime.activeAgentId ?? undefined, + agentId: runtime.agentId ?? undefined, conversationId, }); } @@ -449,7 +447,7 @@ export async function sendMessageStreamWithRetry( if (action === "retry_conversation_busy") { runtime.isRecoveringApprovals = true; setLoopStatus(runtime, "RETRYING_API_REQUEST", { - agent_id: runtime.activeAgentId, + agent_id: runtime.agentId, conversation_id: conversationId, }); try { @@ -459,7 +457,7 @@ export async function sendMessageStreamWithRetry( { conversationId, resolvedConversationId: conversationId, - agentId: runtime.activeAgentId, + agentId: runtime.agentId ?? null, requestStartedAtMs, }, ); @@ -500,7 +498,7 @@ export async function sendMessageStreamWithRetry( attempt, maxAttempts: MAX_CONVERSATION_BUSY_RETRIES, delayMs, - agentId: runtime.activeAgentId ?? undefined, + agentId: runtime.agentId ?? undefined, conversationId, }); @@ -521,7 +519,7 @@ export async function sendApprovalContinuationWithRetry( messages: Parameters[1], opts: Parameters[2], socket: WebSocket, - runtime: ListenerRuntime, + runtime: ConversationRuntime, abortSignal?: AbortSignal, retryOptions: { allowApprovalRecovery?: boolean; @@ -541,7 +539,7 @@ export async function sendApprovalContinuationWithRetry( } runtime.isRecoveringApprovals = false; setLoopStatus(runtime, "WAITING_FOR_API_RESPONSE", { - agent_id: runtime.activeAgentId, + agent_id: runtime.agentId, conversation_id: conversationId, }); @@ -581,7 +579,7 @@ export async function sendApprovalContinuationWithRetry( if (approvalConflictDetected) { runtime.isRecoveringApprovals = true; setLoopStatus(runtime, "RETRYING_API_REQUEST", { - agent_id: runtime.activeAgentId, + agent_id: runtime.agentId, conversation_id: conversationId, }); @@ -603,7 +601,7 @@ export async function sendApprovalContinuationWithRetry( ) { finalizeHandledRecoveryTurn(runtime, socket, { drainResult, - agentId: runtime.activeAgentId, + agentId: runtime.agentId ?? undefined, conversationId, }); return null; @@ -621,7 +619,7 @@ export async function sendApprovalContinuationWithRetry( if (action === "retry_transient") { runtime.isRecoveringApprovals = true; setLoopStatus(runtime, "RETRYING_API_REQUEST", { - agent_id: runtime.activeAgentId, + agent_id: runtime.agentId, conversation_id: conversationId, }); const attempt = transientRetries + 1; @@ -649,7 +647,7 @@ export async function sendApprovalContinuationWithRetry( conversationBusyRetries += 1; runtime.isRecoveringApprovals = true; setLoopStatus(runtime, "RETRYING_API_REQUEST", { - agent_id: runtime.activeAgentId, + agent_id: runtime.agentId, conversation_id: conversationId, }); @@ -660,7 +658,7 @@ export async function sendApprovalContinuationWithRetry( { conversationId, resolvedConversationId: conversationId, - agentId: runtime.activeAgentId, + agentId: runtime.agentId ?? null, requestStartedAtMs, }, ); diff --git a/src/websocket/listener/turn-approval.ts b/src/websocket/listener/turn-approval.ts index b065ab4..571a058 100644 --- a/src/websocket/listener/turn-approval.ts +++ b/src/websocket/listener/turn-approval.ts @@ -40,7 +40,7 @@ import { markAwaitingAcceptedApprovalContinuationRunId, sendApprovalContinuationWithRetry, } from "./send"; -import type { ListenerRuntime } from "./types"; +import type { ConversationRuntime } from "./types"; type Decision = | { @@ -79,7 +79,7 @@ export async function handleApprovalStop(params: { toolName: string; toolArgs: string; }>; - runtime: ListenerRuntime; + runtime: ConversationRuntime; socket: WebSocket; agentId: string; conversationId: string; @@ -121,8 +121,6 @@ export async function handleApprovalStop(params: { agent_id: agentId, conversation_id: conversationId, }); - runtime.activeAgentId = null; - runtime.activeConversationId = null; runtime.activeWorkingDirectory = null; runtime.activeRunId = null; runtime.activeRunStartedAt = null; diff --git a/src/websocket/listener/turn.ts b/src/websocket/listener/turn.ts index 4cc073c..84a985d 100644 --- a/src/websocket/listener/turn.ts +++ b/src/websocket/listener/turn.ts @@ -59,6 +59,7 @@ import { import { clearActiveRunState, clearRecoveredApprovalStateForScope, + evictConversationRuntimeIfIdle, } from "./runtime"; import { normalizeCwdAgentId } from "./scope"; import { @@ -68,12 +69,12 @@ import { sendMessageStreamWithRetry, } from "./send"; import { handleApprovalStop } from "./turn-approval"; -import type { IncomingMessage, ListenerRuntime } from "./types"; +import type { ConversationRuntime, IncomingMessage } from "./types"; export async function handleIncomingMessage( msg: IncomingMessage, socket: WebSocket, - runtime: ListenerRuntime, + runtime: ConversationRuntime, onStatusChange?: ( status: "idle" | "receiving" | "processing", connectionId: string, @@ -86,7 +87,7 @@ export async function handleIncomingMessage( const conversationId = requestedConversationId ?? "default"; const normalizedAgentId = normalizeCwdAgentId(agentId); const turnWorkingDirectory = getConversationWorkingDirectory( - runtime, + runtime.listener, normalizedAgentId, conversationId, ); @@ -103,8 +104,6 @@ export async function handleIncomingMessage( runtime.isProcessing = true; runtime.cancelRequested = false; runtime.activeAbortController = new AbortController(); - runtime.activeAgentId = agentId ?? null; - runtime.activeConversationId = conversationId; runtime.activeWorkingDirectory = turnWorkingDirectory; runtime.activeRunId = null; runtime.activeRunStartedAt = new Date().toISOString(); @@ -113,7 +112,7 @@ export async function handleIncomingMessage( agent_id: agentId ?? null, conversation_id: conversationId, }); - clearRecoveredApprovalStateForScope(runtime, { + clearRecoveredApprovalStateForScope(runtime.listener, { agent_id: agentId ?? null, conversation_id: conversationId, }); @@ -174,7 +173,7 @@ export async function handleIncomingMessage( const { parts: reminderParts } = await buildSharedReminderParts( buildListenReminderContext({ agentId: agentId || "", - state: runtime.reminderState, + state: runtime.listener.reminderState, resolvePlanModeReminder: getPlanModeReminder, }), ); @@ -747,5 +746,6 @@ export async function handleIncomingMessage( runtime.cancelRequested = false; runtime.isRecoveringApprovals = false; runtime.activeExecutingToolCallIds = []; + evictConversationRuntimeIfIdle(runtime); } } diff --git a/src/websocket/listener/types.ts b/src/websocket/listener/types.ts index 0f95a43..14bcf9a 100644 --- a/src/websocket/listener/types.ts +++ b/src/websocket/listener/types.ts @@ -96,21 +96,16 @@ export type RecoveredApprovalState = { responsesByRequestId: Map; }; -export type ListenerRuntime = { - socket: WebSocket | null; - heartbeatInterval: NodeJS.Timeout | null; - reconnectTimeout: NodeJS.Timeout | null; - intentionallyClosed: boolean; - hasSuccessfulConnection: boolean; +export type ConversationRuntime = { + listener: ListenerRuntime; + key: string; + agentId: string | null; + conversationId: string; messageQueue: Promise; pendingApprovalResolvers: Map; recoveredApprovalState: RecoveredApprovalState | null; - sessionId: string; - eventSeqCounter: number; lastStopReason: string | null; isProcessing: boolean; - activeAgentId: string | null; - activeConversationId: string | null; activeWorkingDirectory: string | null; activeRunId: string | null; activeRunStartedAt: string | null; @@ -120,13 +115,7 @@ export type ListenerRuntime = { queuedMessagesByItemId: Map; queuePumpActive: boolean; queuePumpScheduled: boolean; - queueEmitScheduled: boolean; - pendingQueueEmitScope?: { - agent_id?: string | null; - conversation_id?: string | null; - }; pendingTurns: number; - onWsEvent?: StartListenerOptions["onWsEvent"]; isRecoveringApprovals: boolean; loopStatus: LoopStatus; pendingApprovalBatchByToolCallId: Map; @@ -139,11 +128,31 @@ export type ListenerRuntime = { continuationEpoch: number; activeExecutingToolCallIds: string[]; pendingInterruptedToolCallIds: string[] | null; +}; + +export type ListenerRuntime = { + socket: WebSocket | null; + heartbeatInterval: NodeJS.Timeout | null; + reconnectTimeout: NodeJS.Timeout | null; + intentionallyClosed: boolean; + hasSuccessfulConnection: boolean; + sessionId: string; + eventSeqCounter: number; + lastStopReason: string | null; + queueEmitScheduled: boolean; + pendingQueueEmitScope?: { + agent_id?: string | null; + conversation_id?: string | null; + }; + onWsEvent?: StartListenerOptions["onWsEvent"]; reminderState: SharedReminderState; bootWorkingDirectory: string; workingDirectoryByConversation: Map; connectionId: string | null; connectionName: string | null; + conversationRuntimes: Map; + approvalRuntimeKeyByRequestId: Map; + lastEmittedStatus: "idle" | "receiving" | "processing" | null; }; export interface InterruptPopulateInput {