diff --git a/src/tests/websocket/listen-client-protocol.test.ts b/src/tests/websocket/listen-client-protocol.test.ts index 8881030..ad8debc 100644 --- a/src/tests/websocket/listen-client-protocol.test.ts +++ b/src/tests/websocket/listen-client-protocol.test.ts @@ -330,6 +330,12 @@ describe("listen-client state_response control protocol", () => { expect(snapshot.control_response_capable).toBe(true); }); + test("advertises tool lifecycle capability for device clients", () => { + const runtime = __listenClientTestUtils.createRuntime(); + const snapshot = __listenClientTestUtils.buildStateResponse(runtime, 2); + expect(snapshot.tool_lifecycle_capable).toBe(true); + }); + test("includes the effective working directory", () => { const runtime = __listenClientTestUtils.createRuntime(); const snapshot = __listenClientTestUtils.buildStateResponse(runtime, 1); @@ -818,6 +824,70 @@ describe("listen-client emitToWS adapter", () => { // Verify it's a UUID format after the prefix expect(runtime.sessionId.length).toBeGreaterThan(10); }); + + test("emits approval lifecycle events", () => { + const socket = new MockSocket(WebSocket.OPEN); + const requested = { + type: "approval_requested" as const, + request_id: "perm-tool-1", + tool_call_id: "tool-1", + tool_name: "Bash", + run_id: "run-1", + session_id: "listen-test", + uuid: "approval-requested-1", + }; + const received = { + type: "approval_received" as const, + request_id: "perm-tool-1", + tool_call_id: "tool-1", + decision: "allow" as const, + reason: "Approved via WebSocket", + run_id: "run-1", + session_id: "listen-test", + uuid: "approval-received-1", + }; + + __listenClientTestUtils.emitToWS(socket as unknown as WebSocket, requested); + __listenClientTestUtils.emitToWS(socket as unknown as WebSocket, received); + + expect(socket.sentPayloads).toHaveLength(2); + const sentRequested = JSON.parse(socket.sentPayloads[0] as string); + const sentReceived = JSON.parse(socket.sentPayloads[1] as string); + expect(sentRequested.type).toBe("approval_requested"); + expect(sentRequested.request_id).toBe("perm-tool-1"); + expect(sentReceived.type).toBe("approval_received"); + expect(sentReceived.decision).toBe("allow"); + }); + + test("emits tool execution lifecycle events", () => { + const socket = new MockSocket(WebSocket.OPEN); + const started = { + type: "tool_execution_started" as const, + tool_call_id: "tool-2", + run_id: "run-2", + session_id: "listen-test", + uuid: "tool-exec-started-2", + }; + const finished = { + type: "tool_execution_finished" as const, + tool_call_id: "tool-2", + status: "success" as const, + run_id: "run-2", + session_id: "listen-test", + uuid: "tool-exec-finished-2", + }; + + __listenClientTestUtils.emitToWS(socket as unknown as WebSocket, started); + __listenClientTestUtils.emitToWS(socket as unknown as WebSocket, finished); + + expect(socket.sentPayloads).toHaveLength(2); + const sentStarted = JSON.parse(socket.sentPayloads[0] as string); + const sentFinished = JSON.parse(socket.sentPayloads[1] as string); + expect(sentStarted.type).toBe("tool_execution_started"); + expect(sentStarted.tool_call_id).toBe("tool-2"); + expect(sentFinished.type).toBe("tool_execution_finished"); + expect(sentFinished.status).toBe("success"); + }); }); describe("listen-client post-stop approval recovery policy", () => { diff --git a/src/types/protocol.ts b/src/types/protocol.ts index dd2bc04..b5246fc 100644 --- a/src/types/protocol.ts +++ b/src/types/protocol.ts @@ -176,6 +176,65 @@ export interface StreamEvent extends MessageEnvelope { event: LettaStreamingResponse; } +// ═══════════════════════════════════════════════════════════════ +// TOOL LIFECYCLE EVENTS +// ═══════════════════════════════════════════════════════════════ + +/** + * Informational lifecycle event emitted when the runtime asks for user approval + * for a specific tool call. + * + * NOTE: + * - `control_request` remains the canonical UI trigger for approval state. + * - This event is telemetry/lifecycle only and should not replace + * `control_request` in UI reducers. + */ +export interface ApprovalRequestedMessage extends MessageEnvelope { + type: "approval_requested"; + request_id: string; + tool_call_id: string; + tool_name: string; + run_id?: string; +} + +/** + * Informational lifecycle event emitted after an approval request receives + * a decision. + * + * NOTE: + * - `control_request` + `control_response` remain canonical for approval flow. + * - This event is telemetry/lifecycle only. + */ +export interface ApprovalReceivedMessage extends MessageEnvelope { + type: "approval_received"; + request_id: string; + tool_call_id: string; + decision: "allow" | "deny"; + reason?: string; + run_id?: string; +} + +/** + * Emitted when local execution starts for a previously approved tool call. + * This is authoritative for starting tool-running timers in device clients. + */ +export interface ToolExecutionStartedMessage extends MessageEnvelope { + type: "tool_execution_started"; + tool_call_id: string; + run_id?: string; +} + +/** + * Emitted when local execution finishes for a previously started tool call. + * This is authoritative for stopping tool-running timers in device clients. + */ +export interface ToolExecutionFinishedMessage extends MessageEnvelope { + type: "tool_execution_finished"; + tool_call_id: string; + status: "success" | "error"; + run_id?: string; +} + // ═══════════════════════════════════════════════════════════════ // AUTO APPROVAL // ═══════════════════════════════════════════════════════════════ @@ -778,6 +837,10 @@ export type WireMessage = | SystemMessage | ContentMessage | StreamEvent + | ApprovalRequestedMessage + | ApprovalReceivedMessage + | ToolExecutionStartedMessage + | ToolExecutionFinishedMessage | AutoApprovalMessage | CancelAckMessage | ErrorMessage diff --git a/src/websocket/listen-client.ts b/src/websocket/listen-client.ts index 5ca9bf8..d6336dc 100644 --- a/src/websocket/listen-client.ts +++ b/src/websocket/listen-client.ts @@ -63,6 +63,8 @@ import { settingsManager } from "../settings-manager"; import { isInteractiveApprovalTool } from "../tools/interactivePolicy"; import { loadTools } from "../tools/manager"; import type { + ApprovalReceivedMessage, + ApprovalRequestedMessage, AutoApprovalMessage, CancelAckMessage, CanUseToolResponse, @@ -77,6 +79,8 @@ import type { RetryMessage, StopReasonType, SyncCompleteMessage, + ToolExecutionFinishedMessage, + ToolExecutionStartedMessage, TranscriptBackfillMessage, TranscriptSupplementMessage, } from "../types/protocol"; @@ -286,6 +290,7 @@ interface StateResponseMessage { is_processing: boolean; last_stop_reason: string | null; control_response_capable: boolean; + tool_lifecycle_capable: boolean; active_run: { run_id: string | null; agent_id: string | null; @@ -1385,6 +1390,7 @@ function buildStateResponse( is_processing: runtime.isProcessing, last_stop_reason: runtime.lastStopReason, control_response_capable: true, + tool_lifecycle_capable: true, active_run: { run_id: runtime.activeRunId, agent_id: runtime.activeAgentId, @@ -1536,6 +1542,10 @@ function sendControlMessageOverWebSocket( export type WsProtocolEvent = | MessageWire + | ApprovalRequestedMessage + | ApprovalReceivedMessage + | ToolExecutionStartedMessage + | ToolExecutionFinishedMessage | AutoApprovalMessage | CancelAckMessage | ErrorMessage @@ -1949,6 +1959,54 @@ function emitInterruptToolReturnMessage( } } +function emitToolExecutionStartedEvents( + socket: WebSocket, + runtime: ListenerRuntime, + params: { + toolCallIds: string[]; + runId?: string | null; + agentId?: string; + conversationId?: string; + }, +): void { + for (const toolCallId of params.toolCallIds) { + emitToWS(socket, { + type: "tool_execution_started", + tool_call_id: toolCallId, + ...(params.runId ? { run_id: params.runId } : {}), + session_id: runtime.sessionId, + uuid: `tool-exec-started-${toolCallId}`, + agent_id: params.agentId, + conversation_id: params.conversationId, + }); + } +} + +function emitToolExecutionFinishedEvents( + socket: WebSocket, + runtime: ListenerRuntime, + params: { + approvals: ApprovalResult[] | null; + runId?: string | null; + agentId?: string; + conversationId?: string; + }, +): void { + const toolReturns = extractInterruptToolReturns(params.approvals); + for (const toolReturn of toolReturns) { + emitToWS(socket, { + type: "tool_execution_finished", + tool_call_id: toolReturn.tool_call_id, + status: toolReturn.status, + ...(params.runId ? { run_id: params.runId } : {}), + session_id: runtime.sessionId, + uuid: `tool-exec-finished-${toolReturn.tool_call_id}`, + agent_id: params.agentId, + conversation_id: params.conversationId, + }); + } +} + function getInterruptApprovalsForEmission( runtime: ListenerRuntime, params: { @@ -3690,6 +3748,8 @@ async function handleIncomingMessage( ac.parsedArgs, turnWorkingDirectory, ); + const lifecycleRunId = + runId || runtime.activeRunId || msgRunIds[msgRunIds.length - 1]; const controlRequest: ControlRequest = { type: "control_request", @@ -3707,6 +3767,18 @@ async function handleIncomingMessage( conversation_id: conversationId, }; + emitToWS(socket, { + type: "approval_requested", + request_id: requestId, + tool_call_id: ac.approval.toolCallId, + tool_name: ac.approval.toolName, + ...(lifecycleRunId ? { run_id: lifecycleRunId } : {}), + session_id: runtime.sessionId, + uuid: `approval-requested-${ac.approval.toolCallId}`, + agent_id: agentId, + conversation_id: conversationId, + }); + const responseBody = await requestApprovalOverWS( runtime, socket, @@ -3742,21 +3814,58 @@ async function handleIncomingMessage( agent_id: agentId, conversation_id: conversationId, } as AutoApprovalMessage); + emitToWS(socket, { + type: "approval_received", + request_id: requestId, + tool_call_id: ac.approval.toolCallId, + decision: "allow", + reason: "Approved via WebSocket", + ...(lifecycleRunId ? { run_id: lifecycleRunId } : {}), + session_id: runtime.sessionId, + uuid: `approval-received-${ac.approval.toolCallId}`, + agent_id: agentId, + conversation_id: conversationId, + }); } else { decisions.push({ type: "deny", approval: ac.approval, reason: response?.message || "Denied via WebSocket", }); + emitToWS(socket, { + type: "approval_received", + request_id: requestId, + tool_call_id: ac.approval.toolCallId, + decision: "deny", + reason: response?.message || "Denied via WebSocket", + ...(lifecycleRunId ? { run_id: lifecycleRunId } : {}), + session_id: runtime.sessionId, + uuid: `approval-received-${ac.approval.toolCallId}`, + agent_id: agentId, + conversation_id: conversationId, + }); } } else { + const denyReason = + responseBody.subtype === "error" + ? responseBody.error + : "Unknown error"; decisions.push({ type: "deny", approval: ac.approval, - reason: - responseBody.subtype === "error" - ? responseBody.error - : "Unknown error", + reason: denyReason, + }); + emitToWS(socket, { + type: "approval_received", + request_id: requestId, + tool_call_id: ac.approval.toolCallId, + decision: "deny", + reason: denyReason, + ...(lifecycleRunId ? { run_id: lifecycleRunId } : {}), + session_id: runtime.sessionId, + uuid: `approval-received-${ac.approval.toolCallId}`, + agent_id: agentId, + conversation_id: conversationId, }); } } @@ -3771,6 +3880,14 @@ async function handleIncomingMessage( ) .map((decision) => decision.approval.toolCallId); runtime.activeExecutingToolCallIds = [...lastExecutingToolCallIds]; + const executionRunId = + runId || runtime.activeRunId || msgRunIds[msgRunIds.length - 1]; + emitToolExecutionStartedEvents(socket, runtime, { + toolCallIds: lastExecutingToolCallIds, + runId: executionRunId, + agentId, + conversationId, + }); // Execute approved/denied tools const executionResults = await executeApprovalBatch( @@ -3788,6 +3905,12 @@ async function handleIncomingMessage( executionResults, lastExecutingToolCallIds, ); + emitToolExecutionFinishedEvents(socket, runtime, { + approvals: persistedExecutionResults, + runId: executionRunId, + agentId, + conversationId, + }); lastExecutionResults = persistedExecutionResults; // WS-first parity: publish tool-return terminal outcomes immediately on // normal approval execution, before continuation stream send. @@ -3850,6 +3973,12 @@ async function handleIncomingMessage( conversationId, }); if (approvalsForEmission) { + emitToolExecutionFinishedEvents(socket, runtime, { + approvals: approvalsForEmission, + runId: runtime.activeRunId || msgRunIds[msgRunIds.length - 1], + agentId: agentId || "", + conversationId, + }); emitInterruptToolReturnMessage( socket, runtime,