From 4c9f63c4e2a4fd13860c5baf1f0e8c0ed9f188c0 Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Tue, 10 Mar 2026 13:42:42 -0700 Subject: [PATCH] feat(remote): support per-conversation working directories in listener mode (#1323) --- src/agent/approval-execution.ts | 24 +- src/agent/check-approval.ts | 13 +- src/agent/message.ts | 5 +- src/agent/modify.ts | 3 +- src/cli/components/ConversationSelector.tsx | 13 +- src/cli/helpers/approvalClassification.ts | 10 +- src/cli/subcommands/messages.ts | 4 +- src/helpers/diffPreview.ts | 15 +- src/tests/agent/getResumeData.test.ts | 6 +- src/tests/helpers/diffPreview.test.ts | 31 +++ .../websocket/listen-client-protocol.test.ts | 111 ++++++++ src/tools/manager.ts | 41 ++- src/websocket/listen-client.ts | 251 +++++++++++++++++- 13 files changed, 482 insertions(+), 45 deletions(-) diff --git a/src/agent/approval-execution.ts b/src/agent/approval-execution.ts index 8715c34..cf2dd46 100644 --- a/src/agent/approval-execution.ts +++ b/src/agent/approval-execution.ts @@ -10,6 +10,7 @@ import type { ToolReturnMessage } from "@letta-ai/letta-client/resources/tools"; import type { ApprovalRequest } from "../cli/helpers/stream"; import { INTERRUPTED_BY_USER } from "../constants"; import { + captureToolExecutionContext, executeTool, type ToolExecutionResult, type ToolReturnContent, @@ -135,6 +136,7 @@ const GLOBAL_LOCK_TOOLS = new Set([ export function getResourceKey( toolName: string, toolArgs: Record, + workingDirectory: string = process.env.USER_CWD || process.cwd(), ): string { // Global lock tools serialize with everything if (GLOBAL_LOCK_TOOLS.has(toolName)) { @@ -146,10 +148,9 @@ export function getResourceKey( const filePath = toolArgs.file_path; if (typeof filePath === "string") { // Normalize to absolute path for consistent comparison - const userCwd = process.env.USER_CWD || process.cwd(); return path.isAbsolute(filePath) ? path.normalize(filePath) - : path.resolve(userCwd, filePath); + : path.resolve(workingDirectory, filePath); } } @@ -360,8 +361,15 @@ export async function executeApprovalBatch( isStderr?: boolean, ) => void; toolContextId?: string; + workingDirectory?: string; }, ): Promise { + const toolContextId = + options?.toolContextId ?? + (options?.workingDirectory + ? captureToolExecutionContext(options.workingDirectory).contextId + : undefined); + // Pre-allocate results array to maintain original order const results: (ApprovalResult | null)[] = new Array(decisions.length).fill( null, @@ -399,7 +407,11 @@ export async function executeApprovalBatch( } else { args = decision.approval.toolArgs || {}; } - const resourceKey = getResourceKey(toolName, args); + const resourceKey = getResourceKey( + toolName, + args, + options?.workingDirectory, + ); const indices = writeToolsByResource.get(resourceKey) || []; indices.push(i); @@ -411,7 +423,10 @@ export async function executeApprovalBatch( const execute = async (i: number) => { const decision = decisions[i]; if (decision) { - results[i] = await executeSingleDecision(decision, onChunk, options); + results[i] = await executeSingleDecision(decision, onChunk, { + ...options, + toolContextId, + }); } }; @@ -456,6 +471,7 @@ export async function executeAutoAllowedTools( isStderr?: boolean, ) => void; toolContextId?: string; + workingDirectory?: string; }, ): Promise { const decisions: ApprovalDecision[] = autoAllowed.map((ac) => ({ diff --git a/src/agent/check-approval.ts b/src/agent/check-approval.ts index d105c4a..cd2d51e 100644 --- a/src/agent/check-approval.ts +++ b/src/agent/check-approval.ts @@ -479,14 +479,11 @@ export async function getResumeData( // may not support this pattern) if (includeMessageHistory && isBackfillEnabled()) { try { - const messagesPage = await client.conversations.messages.list( - "default", - { - limit: BACKFILL_PAGE_LIMIT, - order: "desc", - agent_id: agent.id, - }, - ); + const messagesPage = await client.agents.messages.list(agent.id, { + conversation_id: "default", + limit: BACKFILL_PAGE_LIMIT, + order: "desc", + }); messages = sortChronological(messagesPage.getPaginatedItems()); if (process.env.DEBUG) { diff --git a/src/agent/message.ts b/src/agent/message.ts index 1691714..1c966f5 100644 --- a/src/agent/message.ts +++ b/src/agent/message.ts @@ -55,6 +55,7 @@ export type SendMessageStreamOptions = { background?: boolean; agentId?: string; // Required when conversationId is "default" approvalNormalization?: ApprovalNormalizationOptions; + workingDirectory?: string; }; export function buildConversationMessagesCreateRequestBody( @@ -118,7 +119,9 @@ export async function sendMessageStream( // Wait for any in-progress toolset switch to complete before reading tools // This prevents sending messages with stale tools during a switch await waitForToolsetReady(); - const { clientTools, contextId } = captureToolExecutionContext(); + const { clientTools, contextId } = captureToolExecutionContext( + opts.workingDirectory, + ); const { clientSkills, errors: clientSkillDiscoveryErrors } = await buildClientSkillsPayload({ agentId: opts.agentId, diff --git a/src/agent/modify.ts b/src/agent/modify.ts index 987e885..514f012 100644 --- a/src/agent/modify.ts +++ b/src/agent/modify.ts @@ -278,7 +278,8 @@ export async function recompileAgentSystemPrompt( options: RecompileAgentSystemPromptOptions = {}, clientOverride?: AgentSystemPromptRecompileClient, ): Promise { - const client = clientOverride ?? (await getClient()); + const client = (clientOverride ?? + (await getClient())) as AgentSystemPromptRecompileClient; return client.agents.recompile(agentId, { dry_run: options.dryRun, diff --git a/src/cli/components/ConversationSelector.tsx b/src/cli/components/ConversationSelector.tsx index 72ecde8..a7537a6 100644 --- a/src/cli/components/ConversationSelector.tsx +++ b/src/cli/components/ConversationSelector.tsx @@ -242,14 +242,11 @@ export function ConversationSelector({ let defaultConversation: EnrichedConversation | null = null; if (!afterCursor) { try { - const defaultMessages = await client.conversations.messages.list( - "default", - { - limit: 20, - order: "desc", - agent_id: agentId, - }, - ); + const defaultMessages = await client.agents.messages.list(agentId, { + conversation_id: "default", + limit: 20, + order: "desc", + }); const defaultMsgItems = defaultMessages.getPaginatedItems(); if (defaultMsgItems.length > 0) { const defaultStats = getMessageStats( diff --git a/src/cli/helpers/approvalClassification.ts b/src/cli/helpers/approvalClassification.ts index 0404e75..1bd1eb6 100644 --- a/src/cli/helpers/approvalClassification.ts +++ b/src/cli/helpers/approvalClassification.ts @@ -24,6 +24,7 @@ export type ClassifyApprovalsOptions = { getContext?: ( toolName: string, parsedArgs: Record, + workingDirectory?: string, ) => Promise; alwaysRequiresUserInput?: (toolName: string) => boolean; treatAskAsDeny?: boolean; @@ -31,6 +32,7 @@ export type ClassifyApprovalsOptions = { missingNameReason?: string; requireArgsForAutoApprove?: boolean; missingArgsReason?: (missing: string[]) => string; + workingDirectory?: string; }; export async function getMissingRequiredArgs( @@ -74,9 +76,13 @@ export async function classifyApprovals( approval.toolArgs || "{}", {}, ); - const permission = await checkToolPermission(toolName, parsedArgs); + const permission = await checkToolPermission( + toolName, + parsedArgs, + opts.workingDirectory, + ); const context = opts.getContext - ? await opts.getContext(toolName, parsedArgs) + ? await opts.getContext(toolName, parsedArgs, opts.workingDirectory) : null; let decision = permission.decision; diff --git a/src/cli/subcommands/messages.ts b/src/cli/subcommands/messages.ts index 193770c..9bc339f 100644 --- a/src/cli/subcommands/messages.ts +++ b/src/cli/subcommands/messages.ts @@ -159,12 +159,12 @@ export async function runMessagesSubcommand(argv: string[]): Promise { return 1; } - const response = await client.conversations.messages.list("default", { + const response = await client.agents.messages.list(agentId, { + conversation_id: "default", limit: parseLimit(parsed.values.limit, 20), after: parsed.values.after, before: parsed.values.before, order, - agent_id: agentId, }); const messages = response.getPaginatedItems() ?? []; diff --git a/src/helpers/diffPreview.ts b/src/helpers/diffPreview.ts index 394de3f..a0cb436 100644 --- a/src/helpers/diffPreview.ts +++ b/src/helpers/diffPreview.ts @@ -4,7 +4,7 @@ * and only sends hunks, which is sufficient for rendering. */ -import { basename } from "node:path"; +import path, { basename } from "node:path"; import type { AdvancedDiffResult, AdvancedHunk } from "../cli/helpers/diff"; import type { DiffHunk, DiffHunkLine, DiffPreview } from "../types/protocol"; @@ -124,6 +124,7 @@ async function getDiffDeps(): Promise { export async function computeDiffPreviews( toolName: string, toolArgs: Record, + workingDirectory: string = process.env.USER_CWD || process.cwd(), ): Promise { const { computeAdvancedDiff, @@ -139,9 +140,12 @@ export async function computeDiffPreviews( if (isFileWriteTool(toolName)) { const filePath = toolArgs.file_path as string | undefined; if (filePath) { + const resolvedFilePath = path.isAbsolute(filePath) + ? filePath + : path.resolve(workingDirectory, filePath); const result = computeAdvancedDiff({ kind: "write", - filePath, + filePath: resolvedFilePath, content: (toolArgs.content as string) || "", }); previews.push(toDiffPreview(result, basename(filePath))); @@ -149,10 +153,13 @@ export async function computeDiffPreviews( } else if (isFileEditTool(toolName)) { const filePath = toolArgs.file_path as string | undefined; if (filePath) { + const resolvedFilePath = path.isAbsolute(filePath) + ? filePath + : path.resolve(workingDirectory, filePath); if (toolArgs.edits && Array.isArray(toolArgs.edits)) { const result = computeAdvancedDiff({ kind: "multi_edit", - filePath, + filePath: resolvedFilePath, edits: toolArgs.edits as Array<{ old_string: string; new_string: string; @@ -163,7 +170,7 @@ export async function computeDiffPreviews( } else { const result = computeAdvancedDiff({ kind: "edit", - filePath, + filePath: resolvedFilePath, oldString: (toolArgs.old_string as string) || "", newString: (toolArgs.new_string as string) || "", replaceAll: toolArgs.replace_all as boolean | undefined, diff --git a/src/tests/agent/getResumeData.test.ts b/src/tests/agent/getResumeData.test.ts index b2aa47b..80e3af5 100644 --- a/src/tests/agent/getResumeData.test.ts +++ b/src/tests/agent/getResumeData.test.ts @@ -103,7 +103,7 @@ describe("getResumeData", () => { const conversationsRetrieve = mock(async () => ({ in_context_message_ids: ["msg-last"], })); - const conversationsList = mock(async () => ({ + const agentsList = mock(async () => ({ getPaginatedItems: () => [ makeUserMessage("msg-a"), makeUserMessage("msg-b"), @@ -114,15 +114,15 @@ describe("getResumeData", () => { const client = { conversations: { retrieve: conversationsRetrieve, - messages: { list: conversationsList }, }, + agents: { messages: { list: agentsList } }, messages: { retrieve: messagesRetrieve }, } as unknown as Letta; const resume = await getResumeData(client, makeAgent(), "default"); expect(messagesRetrieve).toHaveBeenCalledTimes(1); - expect(conversationsList).toHaveBeenCalledTimes(1); + expect(agentsList).toHaveBeenCalledTimes(1); expect(resume.pendingApprovals).toHaveLength(0); expect(resume.messageHistory.length).toBeGreaterThan(0); }); diff --git a/src/tests/helpers/diffPreview.test.ts b/src/tests/helpers/diffPreview.test.ts index 64ae5c2..5bd1803 100644 --- a/src/tests/helpers/diffPreview.test.ts +++ b/src/tests/helpers/diffPreview.test.ts @@ -1,4 +1,7 @@ import { describe, expect, it } from "bun:test"; +import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; import type { AdvancedDiffFallback, AdvancedDiffSuccess, @@ -213,4 +216,32 @@ describe("computeDiffPreviews", () => { expect(previews).toHaveLength(2); expect(previews.map((p) => p.fileName).sort()).toEqual(["a.txt", "b.txt"]); }); + + it("resolves relative file paths against the provided working directory", async () => { + const tempRoot = await mkdtemp( + path.join(os.tmpdir(), "letta-diff-preview-"), + ); + const workspaceDir = path.join(tempRoot, "workspace"); + const nestedDir = path.join(workspaceDir, "nested"); + const targetFile = path.join(nestedDir, "sample.txt"); + await mkdir(nestedDir, { recursive: true }); + await writeFile(targetFile, "old content", "utf8"); + + try { + const previews = await computeDiffPreviews( + "edit", + { + file_path: "nested/sample.txt", + old_string: "old content", + new_string: "new content", + }, + workspaceDir, + ); + expect(previews).toHaveLength(1); + expect(previews[0]?.mode).toBe("advanced"); + expect(previews[0]?.fileName).toBe("sample.txt"); + } finally { + await rm(tempRoot, { recursive: true, force: true }); + } + }); }); diff --git a/src/tests/websocket/listen-client-protocol.test.ts b/src/tests/websocket/listen-client-protocol.test.ts index e7d4f9b..8a5d897 100644 --- a/src/tests/websocket/listen-client-protocol.test.ts +++ b/src/tests/websocket/listen-client-protocol.test.ts @@ -1,4 +1,7 @@ import { describe, expect, test } from "bun:test"; +import { mkdir, mkdtemp, realpath, rm } from "node:fs/promises"; +import os from "node:os"; +import { join } from "node:path"; import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages"; import WebSocket from "ws"; import { buildConversationMessagesCreateRequestBody } from "../../agent/message"; @@ -254,6 +257,114 @@ describe("listen-client state_response control protocol", () => { expect(typeof snapshot.cwd).toBe("string"); expect(snapshot.cwd.length).toBeGreaterThan(0); + expect(snapshot.configured_cwd).toBe(snapshot.cwd); + expect(snapshot.active_turn_cwd).toBeNull(); + expect(snapshot.cwd_agent_id).toBeNull(); + expect(snapshot.cwd_conversation_id).toBe("default"); + }); + + test("scopes configured and active cwd to the requested agent and conversation", () => { + const runtime = __listenClientTestUtils.createRuntime(); + __listenClientTestUtils.setConversationWorkingDirectory( + runtime, + "agent-a", + "conv-a", + "/repo/a", + ); + __listenClientTestUtils.setConversationWorkingDirectory( + runtime, + "agent-b", + "default", + "/repo/b", + ); + runtime.activeAgentId = "agent-a"; + runtime.activeConversationId = "conv-a"; + runtime.activeWorkingDirectory = "/repo/a"; + + const activeSnapshot = __listenClientTestUtils.buildStateResponse( + runtime, + 2, + "agent-a", + "conv-a", + ); + expect(activeSnapshot.configured_cwd).toBe("/repo/a"); + expect(activeSnapshot.active_turn_cwd).toBe("/repo/a"); + expect(activeSnapshot.cwd_agent_id).toBe("agent-a"); + expect(activeSnapshot.cwd_conversation_id).toBe("conv-a"); + + const defaultSnapshot = __listenClientTestUtils.buildStateResponse( + runtime, + 3, + "agent-b", + "default", + ); + expect(defaultSnapshot.configured_cwd).toBe("/repo/b"); + expect(defaultSnapshot.active_turn_cwd).toBeNull(); + expect(defaultSnapshot.cwd_agent_id).toBe("agent-b"); + expect(defaultSnapshot.cwd_conversation_id).toBe("default"); + }); +}); + +describe("listen-client cwd change handling", () => { + test("resolves relative cwd changes against the conversation cwd and preserves active turn cwd", async () => { + const runtime = __listenClientTestUtils.createRuntime(); + const socket = new MockSocket(WebSocket.OPEN); + const tempRoot = await mkdtemp(join(os.tmpdir(), "letta-listen-cwd-")); + const repoDir = join(tempRoot, "repo"); + const serverDir = join(repoDir, "server"); + const clientDir = join(repoDir, "client"); + await mkdir(serverDir, { recursive: true }); + await mkdir(clientDir, { recursive: true }); + const normalizedServerDir = await realpath(serverDir); + const normalizedClientDir = await realpath(clientDir); + + try { + __listenClientTestUtils.setConversationWorkingDirectory( + runtime, + "agent-1", + "conv-1", + normalizedServerDir, + ); + runtime.activeAgentId = "agent-1"; + runtime.activeConversationId = "conv-1"; + runtime.activeWorkingDirectory = normalizedServerDir; + + await __listenClientTestUtils.handleCwdChange( + { + type: "change_cwd", + agentId: "agent-1", + conversationId: "conv-1", + cwd: "../client", + }, + socket as unknown as WebSocket, + runtime, + ); + + expect( + __listenClientTestUtils.getConversationWorkingDirectory( + runtime, + "agent-1", + "conv-1", + ), + ).toBe(normalizedClientDir); + + expect(socket.sentPayloads).toHaveLength(2); + const changed = JSON.parse(socket.sentPayloads[0] as string); + expect(changed.type).toBe("cwd_changed"); + expect(changed.success).toBe(true); + expect(changed.agent_id).toBe("agent-1"); + expect(changed.cwd).toBe(normalizedClientDir); + expect(changed.conversation_id).toBe("conv-1"); + + const snapshot = JSON.parse(socket.sentPayloads[1] as string); + expect(snapshot.type).toBe("state_response"); + expect(snapshot.configured_cwd).toBe(normalizedClientDir); + expect(snapshot.active_turn_cwd).toBe(normalizedServerDir); + expect(snapshot.cwd_agent_id).toBe("agent-1"); + expect(snapshot.cwd_conversation_id).toBe("conv-1"); + } finally { + await rm(tempRoot, { recursive: true, force: true }); + } }); }); diff --git a/src/tools/manager.ts b/src/tools/manager.ts index 74b85f9..082b5f6 100644 --- a/src/tools/manager.ts +++ b/src/tools/manager.ts @@ -278,6 +278,7 @@ type ToolExecutionContextSnapshot = { toolRegistry: ToolRegistry; externalTools: Map; externalExecutor?: ExternalToolExecutor; + workingDirectory: string; }; export type CapturedToolExecutionContext = { @@ -586,11 +587,14 @@ export function getClientToolsFromRegistry(): ClientTool[] { * The returned context id can be used later to execute tool calls against this * exact snapshot even if the global registry changes between dispatch and execute. */ -export function captureToolExecutionContext(): CapturedToolExecutionContext { +export function captureToolExecutionContext( + workingDirectory: string = process.env.USER_CWD || process.cwd(), +): CapturedToolExecutionContext { const snapshot: ToolExecutionContextSnapshot = { toolRegistry: new Map(toolRegistry), externalTools: new Map(getExternalToolsRegistry()), externalExecutor: getExternalToolExecutor(), + workingDirectory, }; const contextId = saveExecutionContext(snapshot); @@ -615,6 +619,27 @@ export function captureToolExecutionContext(): CapturedToolExecutionContext { }; } +async function withExecutionWorkingDirectory( + workingDirectory: string | undefined, + fn: () => Promise, +): Promise { + if (!workingDirectory) { + return fn(); + } + + const previousUserCwd = process.env.USER_CWD; + process.env.USER_CWD = workingDirectory; + try { + return await fn(); + } finally { + if (previousUserCwd === undefined) { + delete process.env.USER_CWD; + } else { + process.env.USER_CWD = previousUserCwd; + } + } +} + /** * Get permissions for a specific tool. * @param toolName - The name of the tool @@ -1158,6 +1183,7 @@ export async function executeTool( context?.externalTools ?? getExternalToolsRegistry(); const activeExternalExecutor = context?.externalExecutor ?? getExternalToolExecutor(); + const workingDirectory = context?.workingDirectory; // Check if this is an external tool (SDK-executed) if (activeExternalTools.has(name)) { @@ -1192,6 +1218,7 @@ export async function executeTool( internalName, args as Record, options?.toolCallId, + workingDirectory, ); if (preHookResult.blocked) { const feedback = preHookResult.feedback.join("\n") || "Blocked by hook"; @@ -1229,7 +1256,9 @@ export async function executeTool( enhancedArgs = { ...enhancedArgs, toolCallId: options.toolCallId }; } - const result = await tool.fn(enhancedArgs); + const result = await withExecutionWorkingDirectory(workingDirectory, () => + tool.fn(enhancedArgs), + ); const duration = Date.now() - startTime; // Extract stdout/stderr if present (for bash tools) @@ -1271,7 +1300,7 @@ export async function executeTool( output: getDisplayableToolReturn(flattenedResponse), }, options?.toolCallId, - undefined, // workingDirectory + workingDirectory, undefined, // agentId undefined, // precedingReasoning - not available in tool manager context undefined, // precedingAssistantMessage - not available in tool manager context @@ -1295,7 +1324,7 @@ export async function executeTool( errorOutput, "tool_error", // error type for returned errors options?.toolCallId, - undefined, // workingDirectory + workingDirectory, undefined, // agentId undefined, // precedingReasoning - not available in tool manager context undefined, // precedingAssistantMessage - not available in tool manager context @@ -1378,7 +1407,7 @@ export async function executeTool( args as Record, { status: "error", output: errorMessage }, options?.toolCallId, - undefined, // workingDirectory + workingDirectory, undefined, // agentId undefined, // precedingReasoning - not available in tool manager context undefined, // precedingAssistantMessage - not available in tool manager context @@ -1397,7 +1426,7 @@ export async function executeTool( errorMessage, errorType, options?.toolCallId, - undefined, // workingDirectory + workingDirectory, undefined, // agentId undefined, // precedingReasoning - not available in tool manager context undefined, // precedingAssistantMessage - not available in tool manager context diff --git a/src/websocket/listen-client.ts b/src/websocket/listen-client.ts index c9ee9fd..a4b5e9a 100644 --- a/src/websocket/listen-client.ts +++ b/src/websocket/listen-client.ts @@ -3,6 +3,8 @@ * Connects to Letta Cloud and receives messages to execute locally */ +import { realpath, stat } from "node:fs/promises"; +import path from "node:path"; import { APIError } from "@letta-ai/letta-client/core/error"; import type { Stream } from "@letta-ai/letta-client/core/streaming"; import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents"; @@ -157,6 +159,15 @@ interface GetStatusMessage { interface GetStateMessage { type: "get_state"; + agentId?: string | null; + conversationId?: string | null; +} + +interface ChangeCwdMessage { + type: "change_cwd"; + agentId?: string | null; + conversationId?: string | null; + cwd: string; } interface CancelRunMessage { @@ -188,6 +199,10 @@ interface StateResponseMessage { generated_at: string; state_seq: number; cwd: string; + configured_cwd: string; + active_turn_cwd: string | null; + cwd_agent_id: string | null; + cwd_conversation_id: string | null; mode: "default" | "acceptEdits" | "plan" | "bypassPermissions"; is_processing: boolean; last_stop_reason: string | null; @@ -223,6 +238,17 @@ interface StateResponseMessage { event_seq?: number; } +interface CwdChangedMessage { + type: "cwd_changed"; + agent_id: string | null; + conversation_id: string; + cwd: string; + success: boolean; + error?: string; + event_seq?: number; + session_id?: string; +} + type ServerMessage = | PongMessage | StatusMessage @@ -230,6 +256,7 @@ type ServerMessage = | ModeChangeMessage | GetStatusMessage | GetStateMessage + | ChangeCwdMessage | CancelRunMessage | RecoverPendingApprovalsMessage | WsControlResponse; @@ -238,6 +265,7 @@ type ClientMessage = | RunStartedMessage | RunRequestErrorMessage | ModeChangedMessage + | CwdChangedMessage | StatusResponseMessage | StateResponseMessage; @@ -266,6 +294,7 @@ type ListenerRuntime = { /** Active run metadata for reconnect snapshot state. */ activeAgentId: string | null; activeConversationId: string | null; + activeWorkingDirectory: string | null; activeRunId: string | null; activeRunStartedAt: string | null; /** Abort controller for the currently active message turn. */ @@ -311,6 +340,8 @@ type ListenerRuntime = { * Threaded into the next send for persistence normalization. */ pendingInterruptedToolCallIds: string[] | null; + bootWorkingDirectory: string; + workingDirectoryByConversation: Map; }; // Listen mode supports one active connection per process. @@ -354,11 +385,94 @@ function handleModeChange(msg: ModeChangeMessage, socket: WebSocket): void { } } +function normalizeCwdAgentId(agentId?: string | null): string | null { + return agentId && agentId.length > 0 ? agentId : null; +} + +function getWorkingDirectoryScopeKey( + agentId?: string | null, + conversationId?: string | null, +): string { + const normalizedConversationId = normalizeConversationId(conversationId); + const normalizedAgentId = normalizeCwdAgentId(agentId); + if (normalizedConversationId === "default") { + return `agent:${normalizedAgentId ?? "__unknown__"}::conversation:default`; + } + + return `conversation:${normalizedConversationId}`; +} + +async function handleCwdChange( + msg: ChangeCwdMessage, + socket: WebSocket, + runtime: ListenerRuntime, +): Promise { + const conversationId = normalizeConversationId(msg.conversationId); + const agentId = normalizeCwdAgentId(msg.agentId); + const currentWorkingDirectory = getConversationWorkingDirectory( + runtime, + agentId, + conversationId, + ); + + try { + const requestedPath = msg.cwd?.trim(); + if (!requestedPath) { + throw new Error("Working directory cannot be empty"); + } + + const resolvedPath = path.isAbsolute(requestedPath) + ? requestedPath + : path.resolve(currentWorkingDirectory, requestedPath); + const normalizedPath = await realpath(resolvedPath); + const stats = await stat(normalizedPath); + if (!stats.isDirectory()) { + throw new Error(`Not a directory: ${normalizedPath}`); + } + + setConversationWorkingDirectory( + runtime, + agentId, + conversationId, + normalizedPath, + ); + sendClientMessage( + socket, + { + type: "cwd_changed", + agent_id: agentId, + conversation_id: conversationId, + cwd: normalizedPath, + success: true, + }, + runtime, + ); + sendStateSnapshot(socket, runtime, agentId, conversationId); + } catch (error) { + sendClientMessage( + socket, + { + type: "cwd_changed", + agent_id: agentId, + conversation_id: conversationId, + cwd: msg.cwd, + success: false, + error: + error instanceof Error + ? error.message + : "Working directory change failed", + }, + runtime, + ); + } +} + const MAX_RETRY_DURATION_MS = 5 * 60 * 1000; // 5 minutes const INITIAL_RETRY_DELAY_MS = 1000; // 1 second const MAX_RETRY_DELAY_MS = 30000; // 30 seconds function createRuntime(): ListenerRuntime { + const bootWorkingDirectory = process.env.USER_CWD || process.cwd(); const runtime: ListenerRuntime = { socket: null, heartbeatInterval: null, @@ -373,6 +487,7 @@ function createRuntime(): ListenerRuntime { isProcessing: false, activeAgentId: null, activeConversationId: null, + activeWorkingDirectory: null, activeRunId: null, activeRunStartedAt: null, activeAbortController: null, @@ -384,6 +499,8 @@ function createRuntime(): ListenerRuntime { continuationEpoch: 0, activeExecutingToolCallIds: [], pendingInterruptedToolCallIds: null, + bootWorkingDirectory, + workingDirectoryByConversation: new Map(), coalescedSkipQueueItemIds: new Set(), pendingTurns: 0, // queueRuntime assigned below — needs runtime ref in callbacks @@ -462,6 +579,39 @@ function createRuntime(): ListenerRuntime { return runtime; } +function normalizeConversationId(conversationId?: string | null): string { + return conversationId && conversationId.length > 0 + ? conversationId + : "default"; +} + +function getConversationWorkingDirectory( + runtime: ListenerRuntime, + agentId?: string | null, + conversationId?: string | null, +): string { + const scopeKey = getWorkingDirectoryScopeKey(agentId, conversationId); + return ( + runtime.workingDirectoryByConversation.get(scopeKey) ?? + runtime.bootWorkingDirectory + ); +} + +function setConversationWorkingDirectory( + runtime: ListenerRuntime, + agentId: string | null, + conversationId: string, + workingDirectory: string, +): void { + const scopeKey = getWorkingDirectoryScopeKey(agentId, conversationId); + if (workingDirectory === runtime.bootWorkingDirectory) { + runtime.workingDirectoryByConversation.delete(scopeKey); + return; + } + + runtime.workingDirectoryByConversation.set(scopeKey, workingDirectory); +} + function clearRuntimeTimers(runtime: ListenerRuntime): void { if (runtime.reconnectTimeout) { clearTimeout(runtime.reconnectTimeout); @@ -476,6 +626,7 @@ function clearRuntimeTimers(runtime: ListenerRuntime): void { function clearActiveRunState(runtime: ListenerRuntime): void { runtime.activeAgentId = null; runtime.activeConversationId = null; + runtime.activeWorkingDirectory = null; runtime.activeRunId = null; runtime.activeRunStartedAt = null; runtime.activeAbortController = null; @@ -615,6 +766,7 @@ export function parseServerMessage( parsed.type === "mode_change" || parsed.type === "get_status" || parsed.type === "get_state" || + parsed.type === "change_cwd" || parsed.type === "cancel_run" || parsed.type === "recover_pending_approvals" ) { @@ -692,7 +844,21 @@ function mergeDequeuedBatchContent( function buildStateResponse( runtime: ListenerRuntime, stateSeq: number, + agentId?: string | null, + conversationId?: string | null, ): StateResponseMessage { + const scopedAgentId = normalizeCwdAgentId(agentId); + const scopedConversationId = normalizeConversationId(conversationId); + const configuredWorkingDirectory = getConversationWorkingDirectory( + runtime, + scopedAgentId, + scopedConversationId, + ); + const activeTurnWorkingDirectory = + runtime.activeAgentId === scopedAgentId && + runtime.activeConversationId === scopedConversationId + ? runtime.activeWorkingDirectory + : null; const queueItems = runtime.queueRuntime.items.map((item) => ({ id: item.id, client_message_id: item.clientMessageId ?? `cm-${item.id}`, @@ -724,7 +890,11 @@ function buildStateResponse( generated_at: new Date().toISOString(), state_seq: stateSeq, event_seq: stateSeq, - cwd: process.env.USER_CWD || process.cwd(), + cwd: configuredWorkingDirectory, + configured_cwd: configuredWorkingDirectory, + active_turn_cwd: activeTurnWorkingDirectory, + cwd_agent_id: scopedAgentId, + cwd_conversation_id: scopedConversationId, mode: permissionMode.getMode(), is_processing: runtime.isProcessing, last_stop_reason: runtime.lastStopReason, @@ -745,12 +915,22 @@ function buildStateResponse( }; } -function sendStateSnapshot(socket: WebSocket, runtime: ListenerRuntime): void { +function sendStateSnapshot( + socket: WebSocket, + runtime: ListenerRuntime, + agentId?: string | null, + conversationId?: string | null, +): void { const stateSeq = nextEventSeq(runtime); if (stateSeq === null) { return; } - const stateResponse = buildStateResponse(runtime, stateSeq); + const stateResponse = buildStateResponse( + runtime, + stateSeq, + agentId, + conversationId, + ); sendClientMessage(socket, stateResponse, runtime); } @@ -1508,6 +1688,13 @@ async function resolveStaleApprovals( agentId: runtime.activeAgentId, streamTokens: true, background: true, + workingDirectory: + runtime.activeWorkingDirectory ?? + getConversationWorkingDirectory( + runtime, + runtime.activeAgentId, + recoveryConversationId, + ), }, { maxRetries: 0, signal: abortSignal }, ); @@ -1751,6 +1938,17 @@ async function recoverPendingApprovals( const requestedConversationId = msg.conversationId || undefined; const conversationId = requestedConversationId ?? "default"; + const recoveryAgentId = normalizeCwdAgentId(agentId); + const recoveryWorkingDirectory = + runtime.activeAgentId === recoveryAgentId && + runtime.activeConversationId === conversationId && + runtime.activeWorkingDirectory + ? runtime.activeWorkingDirectory + : getConversationWorkingDirectory( + runtime, + recoveryAgentId, + conversationId, + ); const client = await getClient(); const agent = await client.agents.retrieve(agentId); @@ -1814,6 +2012,7 @@ async function recoverPendingApprovals( alwaysRequiresUserInput: isInteractiveApprovalTool, treatAskAsDeny: false, requireArgsForAutoApprove: true, + workingDirectory: recoveryWorkingDirectory, }, ); @@ -1858,6 +2057,7 @@ async function recoverPendingApprovals( const diffs = await computeDiffPreviews( ac.approval.toolName, ac.parsedArgs, + recoveryWorkingDirectory, ); const controlRequest: ControlRequest = { @@ -1931,7 +2131,9 @@ async function recoverPendingApprovals( return; } - const executionResults = await executeApprovalBatch(decisions); + const executionResults = await executeApprovalBatch(decisions, undefined, { + workingDirectory: recoveryWorkingDirectory, + }); clearPendingApprovalBatchIds( runtime, decisions.map((decision) => decision.approval), @@ -2115,6 +2317,15 @@ async function connectWithRetry( return; } + if (parsed.type === "change_cwd") { + if (runtime !== activeRuntime || runtime.intentionallyClosed) { + return; + } + + void handleCwdChange(parsed, socket, runtime); + return; + } + // Handle status request from cloud (immediate response) if (parsed.type === "get_status") { if (runtime !== activeRuntime || runtime.intentionallyClosed) { @@ -2219,12 +2430,21 @@ async function connectWithRetry( if (runtime !== activeRuntime || runtime.intentionallyClosed) { return; } + const requestedConversationId = normalizeConversationId( + parsed.conversationId, + ); + const requestedAgentId = normalizeCwdAgentId(parsed.agentId); // If we're blocked on an approval callback, don't queue behind the // pending turn; respond immediately so refreshed clients can render the // approval card needed to unblock execution. if (runtime.pendingApprovalResolvers.size > 0) { - sendStateSnapshot(socket, runtime); + sendStateSnapshot( + socket, + runtime, + requestedAgentId, + requestedConversationId, + ); return; } @@ -2236,7 +2456,12 @@ async function connectWithRetry( return; } - sendStateSnapshot(socket, runtime); + sendStateSnapshot( + socket, + runtime, + requestedAgentId, + requestedConversationId, + ); }) .catch((error: unknown) => { if (process.env.DEBUG) { @@ -2507,6 +2732,12 @@ async function handleIncomingMessage( const agentId = msg.agentId; const requestedConversationId = msg.conversationId || undefined; const conversationId = requestedConversationId ?? "default"; + const normalizedAgentId = normalizeCwdAgentId(agentId); + const turnWorkingDirectory = getConversationWorkingDirectory( + runtime, + normalizedAgentId, + conversationId, + ); const msgStartTime = performance.now(); let msgTurnCount = 0; const msgRunIds: string[] = []; @@ -2523,6 +2754,7 @@ async function handleIncomingMessage( runtime.activeAbortController = new AbortController(); runtime.activeAgentId = agentId ?? null; runtime.activeConversationId = conversationId; + runtime.activeWorkingDirectory = turnWorkingDirectory; runtime.activeRunId = null; runtime.activeRunStartedAt = new Date().toISOString(); runtime.activeExecutingToolCallIds = []; @@ -2566,6 +2798,7 @@ async function handleIncomingMessage( agentId, streamTokens: true, background: true, + workingDirectory: turnWorkingDirectory, ...(queuedInterruptedToolCallIds.length > 0 ? { approvalNormalization: { @@ -2847,6 +3080,7 @@ async function handleIncomingMessage( alwaysRequiresUserInput: isInteractiveApprovalTool, treatAskAsDeny: false, // Let cloud UI handle approvals requireArgsForAutoApprove: true, + workingDirectory: turnWorkingDirectory, }); // Snapshot all tool_call_ids before entering approval wait so cancel can @@ -2917,6 +3151,7 @@ async function handleIncomingMessage( const diffs = await computeDiffPreviews( ac.approval.toolName, ac.parsedArgs, + turnWorkingDirectory, ); const controlRequest: ControlRequest = { @@ -3003,6 +3238,7 @@ async function handleIncomingMessage( { toolContextId: turnToolContextId ?? undefined, abortSignal: runtime.activeAbortController.signal, + workingDirectory: turnWorkingDirectory, }, ); const persistedExecutionResults = @@ -3172,12 +3408,15 @@ export const __listenClientTestUtils = { createRuntime, stopRuntime, buildStateResponse, + handleCwdChange, emitToWS, + getConversationWorkingDirectory, rememberPendingApprovalBatchIds, resolvePendingApprovalBatchId, resolveRecoveryBatchId, clearPendingApprovalBatchIds, populateInterruptQueue, + setConversationWorkingDirectory, consumeInterruptQueue, extractInterruptToolReturns, emitInterruptToolReturnMessage,