diff --git a/src/tests/websocket/listen-client-protocol.test.ts b/src/tests/websocket/listen-client-protocol.test.ts index ca9f60a..348c6c1 100644 --- a/src/tests/websocket/listen-client-protocol.test.ts +++ b/src/tests/websocket/listen-client-protocol.test.ts @@ -241,23 +241,11 @@ describe("listen-client requestApprovalOverWS", () => { }); }); -describe("listen-client controlResponseCapable latch", () => { - test("runtime initializes with controlResponseCapable = false", () => { +describe("listen-client state_response control protocol", () => { + test("always advertises control_response capability", () => { const runtime = __listenClientTestUtils.createRuntime(); - expect(runtime.controlResponseCapable).toBe(false); - }); - - test("latch stays true after being set once", () => { - const runtime = __listenClientTestUtils.createRuntime(); - expect(runtime.controlResponseCapable).toBe(false); - - runtime.controlResponseCapable = true; - expect(runtime.controlResponseCapable).toBe(true); - - // Simulates second message without the flag — latch should persist - // (actual latching happens in handleIncomingMessage, but the runtime - // field itself should hold the value) - expect(runtime.controlResponseCapable).toBe(true); + const snapshot = __listenClientTestUtils.buildStateResponse(runtime, 1); + expect(snapshot.control_response_capable).toBe(true); }); }); diff --git a/src/websocket/listen-client.ts b/src/websocket/listen-client.ts index 0fd8a34..fa6ddc8 100644 --- a/src/websocket/listen-client.ts +++ b/src/websocket/listen-client.ts @@ -9,11 +9,9 @@ import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agen import type { ApprovalCreate, LettaStreamingResponse, - ToolReturn, } from "@letta-ai/letta-client/resources/agents/messages"; import WebSocket from "ws"; import { - type ApprovalDecision, type ApprovalResult, executeApprovalBatch, } from "../agent/approval-execution"; @@ -111,16 +109,6 @@ interface IncomingMessage { messages: Array< (MessageCreate & { client_message_id?: string }) | ApprovalCreate >; - /** Cloud sets this when it supports can_use_tool / control_response protocol. */ - supportsControlResponse?: boolean; -} - -interface ResultMessage { - type: "result"; - success: boolean; - stopReason?: string; - event_seq?: number; - session_id?: string; } interface RunStartedMessage { @@ -245,7 +233,6 @@ type ServerMessage = | WsControlResponse; type ClientMessage = | PingMessage - | ResultMessage | RunStartedMessage | RunRequestErrorMessage | ModeChangedMessage @@ -266,8 +253,6 @@ type ListenerRuntime = { hasSuccessfulConnection: boolean; messageQueue: Promise; pendingApprovalResolvers: Map; - /** Latched once supportsControlResponse is seen on any message. */ - controlResponseCapable: boolean; /** Stable session ID for MessageEnvelope-based emissions (scoped to runtime lifecycle). */ sessionId: string; /** Monotonic event sequence for all outbound status/protocol events. */ @@ -326,10 +311,6 @@ type ListenerRuntime = { pendingInterruptedToolCallIds: string[] | null; }; -type ApprovalSlot = - | { type: "result"; value: ApprovalResult } - | { type: "decision" }; - // Listen mode supports one active connection per process. let activeRuntime: ListenerRuntime | null = null; @@ -384,7 +365,6 @@ function createRuntime(): ListenerRuntime { hasSuccessfulConnection: false, messageQueue: Promise.resolve(), pendingApprovalResolvers: new Map(), - controlResponseCapable: false, sessionId: `listen-${crypto.randomUUID()}`, eventSeqCounter: 0, lastStopReason: null, @@ -745,7 +725,7 @@ function buildStateResponse( mode: permissionMode.getMode(), is_processing: runtime.isProcessing, last_stop_reason: runtime.lastStopReason, - control_response_capable: runtime.controlResponseCapable, + control_response_capable: true, active_run: { run_id: runtime.activeRunId, agent_id: runtime.activeAgentId, @@ -792,6 +772,36 @@ function emitCancelAck( } as CancelAckMessage); } +function emitTurnResult( + socket: WebSocket, + runtime: ListenerRuntime, + params: { + subtype: ProtocolResultMessage["subtype"]; + agentId: string; + conversationId: string; + durationMs: number; + numTurns: number; + runIds: string[]; + stopReason?: StopReasonType; + }, +): void { + emitToWS(socket, { + type: "result", + subtype: params.subtype, + agent_id: params.agentId, + conversation_id: params.conversationId, + duration_ms: params.durationMs, + duration_api_ms: 0, + num_turns: params.numTurns, + result: null, + run_ids: params.runIds, + usage: null, + ...(params.stopReason ? { stop_reason: params.stopReason } : {}), + session_id: runtime.sessionId, + uuid: `result-${crypto.randomUUID()}`, + }); +} + function sendClientMessage( socket: WebSocket, payload: ClientMessage, @@ -1701,89 +1711,6 @@ export function requestApprovalOverWS( }); } -function buildApprovalExecutionPlan( - approvalMessage: ApprovalCreate, - pendingApprovals: Array<{ - toolCallId: string; - toolName: string; - toolArgs: string; - }>, -): { - slots: ApprovalSlot[]; - decisions: ApprovalDecision[]; -} { - const pendingByToolCallId = new Map( - pendingApprovals.map((approval) => [approval.toolCallId, approval]), - ); - - const slots: ApprovalSlot[] = []; - const decisions: ApprovalDecision[] = []; - - for (const approval of approvalMessage.approvals ?? []) { - if (approval.type === "tool") { - slots.push({ type: "result", value: approval as ToolReturn }); - continue; - } - - if (approval.type !== "approval") { - slots.push({ - type: "result", - value: { - type: "tool", - tool_call_id: "unknown", - tool_return: "Error: Unsupported approval payload", - status: "error", - }, - }); - continue; - } - - const pending = pendingByToolCallId.get(approval.tool_call_id); - - if (approval.approve) { - if (!pending) { - slots.push({ - type: "result", - value: { - type: "tool", - tool_call_id: approval.tool_call_id, - tool_return: "Error: Pending approval not found", - status: "error", - }, - }); - continue; - } - - decisions.push({ - type: "approve", - approval: { - toolCallId: pending.toolCallId, - toolName: pending.toolName, - toolArgs: pending.toolArgs || "{}", - }, - }); - slots.push({ type: "decision" }); - continue; - } - - decisions.push({ - type: "deny", - approval: { - toolCallId: approval.tool_call_id, - toolName: pending?.toolName ?? "", - toolArgs: pending?.toolArgs ?? "{}", - }, - reason: - typeof approval.reason === "string" && approval.reason.length > 0 - ? approval.reason - : "Tool execution denied", - }); - slots.push({ type: "decision" }); - } - - return { slots, decisions }; -} - async function recoverPendingApprovals( runtime: ListenerRuntime, socket: WebSocket, @@ -1910,11 +1837,6 @@ async function recoverPendingApprovals( ]; if (needsUserInput.length > 0) { - if (!runtime.controlResponseCapable) { - runtime.lastStopReason = "requires_approval"; - return; - } - // Reflect approval-wait state in runtime snapshot while control // requests are pending, so state_response queries see // requires_approval even during the WS round-trip. @@ -2015,7 +1937,6 @@ async function recoverPendingApprovals( approvals: executionResults, }, ], - supportsControlResponse: runtime.controlResponseCapable, }, socket, runtime, @@ -2319,9 +2240,6 @@ async function connectWithRetry( return; } - // Recovery requests are only sent by the modern cloud listener protocol. - runtime.controlResponseCapable = true; - // Serialize recovery with normal message handling to avoid concurrent // handleIncomingMessage execution when user messages arrive concurrently. runtime.pendingTurns++; @@ -2363,9 +2281,23 @@ async function connectWithRetry( // Handle incoming messages (queued for sequential processing) if (parsed.type === "message") { - // Queue lifecycle tracking: only enqueue if first payload is a - // MessageCreate (has `content`). ApprovalCreate payloads (legacy - // approval path) do not represent user-initiated messages. + const hasApprovalPayload = parsed.messages.some( + (payload): payload is ApprovalCreate => + "type" in payload && payload.type === "approval", + ); + if (hasApprovalPayload) { + emitToWS(socket, { + type: "error", + message: + "Protocol violation: device websocket no longer accepts approval payloads inside message frames. Send control_response instead.", + stop_reason: "error", + session_id: runtime.sessionId, + uuid: `error-${crypto.randomUUID()}`, + }); + return; + } + + // Queue lifecycle tracking: only enqueue user MessageCreate payloads. const firstPayload = parsed.messages.at(0); const isUserMessage = firstPayload !== undefined && "content" in firstPayload; @@ -2585,11 +2517,6 @@ async function handleIncomingMessage( runtime.activeExecutingToolCallIds = []; try { - // Latch capability: once seen, always use blocking path (strict check to avoid truthy strings) - if (msg.supportsControlResponse === true) { - runtime.controlResponseCapable = true; - } - if (!agentId) { runtime.isProcessing = false; clearActiveRunState(runtime); @@ -2609,7 +2536,6 @@ async function handleIncomingMessage( let messagesToSend: Array = []; let turnToolContextId: string | null = null; let queuedInterruptedToolCallIds: string[] = []; - let shouldClearSubmittedApprovalTracking = false; // Prepend queued interrupted results from a prior cancelled turn. const consumed = consumeInterruptQueue( @@ -2624,100 +2550,6 @@ async function handleIncomingMessage( messagesToSend.push(...msg.messages); - const firstMessage = msg.messages[0]; - const isApprovalMessage = - firstMessage && - "type" in firstMessage && - firstMessage.type === "approval" && - "approvals" in firstMessage; - - if (isApprovalMessage) { - if (runtime.controlResponseCapable && process.env.DEBUG) { - console.warn( - "[Listen] Protocol violation: controlResponseCapable is latched but received legacy ApprovalCreate message. " + - "The cloud should send control_response instead. This may cause the current turn to stall.", - ); - } - const approvalMessage = firstMessage as ApprovalCreate; - const client = await getClient(); - const agent = await client.agents.retrieve(agentId); - const resumeData = await getResumeData( - client, - agent, - requestedConversationId, - ); - - const { slots, decisions } = buildApprovalExecutionPlan( - approvalMessage, - resumeData.pendingApprovals, - ); - lastExecutingToolCallIds = decisions - .filter( - ( - decision, - ): decision is Extract => - decision.type === "approve", - ) - .map((decision) => decision.approval.toolCallId); - runtime.activeExecutingToolCallIds = [...lastExecutingToolCallIds]; - - const decisionResults = - decisions.length > 0 - ? await executeApprovalBatch(decisions, undefined, { - toolContextId: turnToolContextId ?? undefined, - abortSignal: runtime.activeAbortController.signal, - }) - : []; - const persistedDecisionResults = - normalizeExecutionResultsForInterruptParity( - runtime, - decisionResults, - lastExecutingToolCallIds, - ); - - const rebuiltApprovals: ApprovalResult[] = []; - let decisionResultIndex = 0; - - for (const slot of slots) { - if (slot.type === "result") { - rebuiltApprovals.push(slot.value); - continue; - } - - const next = persistedDecisionResults[decisionResultIndex]; - if (next) { - rebuiltApprovals.push(next); - decisionResultIndex++; - continue; - } - - rebuiltApprovals.push({ - type: "tool", - tool_call_id: "unknown", - tool_return: "Error: Missing approval execution result", - status: "error", - }); - } - - lastExecutionResults = rebuiltApprovals; - shouldClearSubmittedApprovalTracking = true; - messagesToSend = [ - { - type: "approval", - approvals: rebuiltApprovals, - }, - ]; - // Emit terminal tool outcomes immediately so WS consumers can close - // tool-call UI state without waiting for follow-up hydration. - emitInterruptToolReturnMessage( - socket, - runtime, - rebuiltApprovals, - runtime.activeRunId ?? undefined, - "tool-return", - ); - } - let currentInput = messagesToSend; const sendOptions: Parameters[2] = { agentId, @@ -2740,12 +2572,6 @@ async function handleIncomingMessage( runtime, runtime.activeAbortController.signal, ); - if (shouldClearSubmittedApprovalTracking) { - lastExecutionResults = null; - lastExecutingToolCallIds = []; - lastNeedsUserInputToolCallIds = []; - runtime.activeExecutingToolCallIds = []; - } turnToolContextId = getStreamToolContextId( stream as Stream, @@ -2830,28 +2656,14 @@ async function handleIncomingMessage( runtime.isProcessing = false; clearActiveRunState(runtime); - if (runtime.controlResponseCapable) { - emitToWS(socket, { - type: "result", - subtype: "success", - agent_id: agentId, - conversation_id: conversationId, - duration_ms: performance.now() - msgStartTime, - duration_api_ms: 0, - num_turns: msgTurnCount, - result: null, - run_ids: msgRunIds, - usage: null, - session_id: runtime.sessionId, - uuid: `result-${crypto.randomUUID()}`, - }); - } else { - sendClientMessage(socket, { - type: "result", - success: true, - stopReason: "end_turn", - }); - } + emitTurnResult(socket, runtime, { + subtype: "success", + agentId, + conversationId, + durationMs: performance.now() - msgStartTime, + numTurns: msgTurnCount, + runIds: msgRunIds, + }); break; } @@ -2861,29 +2673,15 @@ async function handleIncomingMessage( runtime.isProcessing = false; clearActiveRunState(runtime); - if (runtime.controlResponseCapable) { - emitToWS(socket, { - type: "result", - subtype: "interrupted", - agent_id: agentId, - conversation_id: conversationId, - duration_ms: performance.now() - msgStartTime, - duration_api_ms: 0, - num_turns: msgTurnCount, - result: null, - run_ids: msgRunIds, - usage: null, - stop_reason: "cancelled", - session_id: runtime.sessionId, - uuid: `result-${crypto.randomUUID()}`, - }); - } else { - sendClientMessage(socket, { - type: "result", - success: false, - stopReason: "cancelled", - }); - } + emitTurnResult(socket, runtime, { + subtype: "interrupted", + agentId, + conversationId, + durationMs: performance.now() - msgStartTime, + numTurns: msgTurnCount, + runIds: msgRunIds, + stopReason: "cancelled", + }); break; } @@ -2961,29 +2759,15 @@ async function handleIncomingMessage( runtime.isProcessing = false; clearActiveRunState(runtime); - if (runtime.controlResponseCapable) { - emitToWS(socket, { - type: "result", - subtype: "interrupted", - agent_id: agentId, - conversation_id: conversationId, - duration_ms: performance.now() - msgStartTime, - duration_api_ms: 0, - num_turns: msgTurnCount, - result: null, - run_ids: msgRunIds, - usage: null, - stop_reason: "cancelled", - session_id: runtime.sessionId, - uuid: `result-${crypto.randomUUID()}`, - }); - } else { - sendClientMessage(socket, { - type: "result", - success: false, - stopReason: "cancelled", - }); - } + emitTurnResult(socket, runtime, { + subtype: "interrupted", + agentId, + conversationId, + durationMs: performance.now() - msgStartTime, + numTurns: msgTurnCount, + runIds: msgRunIds, + stopReason: "cancelled", + }); break; } @@ -3002,29 +2786,15 @@ async function handleIncomingMessage( session_id: runtime.sessionId, uuid: `error-${crypto.randomUUID()}`, }); - if (runtime.controlResponseCapable) { - emitToWS(socket, { - type: "result", - subtype: "error", - agent_id: agentId, - conversation_id: conversationId, - duration_ms: performance.now() - msgStartTime, - duration_api_ms: 0, - num_turns: msgTurnCount, - result: null, - run_ids: msgRunIds, - usage: null, - stop_reason: effectiveStopReason, - session_id: runtime.sessionId, - uuid: `result-${crypto.randomUUID()}`, - }); - } else { - sendClientMessage(socket, { - type: "result", - success: false, - stopReason: effectiveStopReason, - }); - } + emitTurnResult(socket, runtime, { + subtype: "error", + agentId, + conversationId, + durationMs: performance.now() - msgStartTime, + numTurns: msgTurnCount, + runIds: msgRunIds, + stopReason: effectiveStopReason, + }); break; } @@ -3035,9 +2805,20 @@ async function handleIncomingMessage( runtime.isProcessing = false; clearActiveRunState(runtime); - sendClientMessage(socket, { - type: "result", - success: false, + emitToWS(socket, { + type: "error", + message: "requires_approval stop returned no approvals", + stop_reason: "error", + session_id: runtime.sessionId, + uuid: `error-${crypto.randomUUID()}`, + }); + emitTurnResult(socket, runtime, { + subtype: "error", + agentId, + conversationId, + durationMs: performance.now() - msgStartTime, + numTurns: msgTurnCount, + runIds: msgRunIds, stopReason: "error", }); break; @@ -3119,20 +2900,7 @@ async function handleIncomingMessage( if (needsUserInput.length > 0) { runtime.lastStopReason = "requires_approval"; - if (!runtime.controlResponseCapable) { - // Legacy path: break out, let cloud re-enter with ApprovalCreate - runtime.isProcessing = false; - clearActiveRunState(runtime); - - sendClientMessage(socket, { - type: "result", - success: false, - stopReason: "requires_approval", - }); - break; - } - - // New path: blocking-in-loop via WS control protocol + // Block in-loop via the control protocol for all device approvals. for (const ac of needsUserInput) { const requestId = `perm-${ac.approval.toolCallId}`; const diffs = await computeDiffPreviews( @@ -3306,29 +3074,15 @@ async function handleIncomingMessage( runtime.isProcessing = false; clearActiveRunState(runtime); - if (runtime.controlResponseCapable) { - emitToWS(socket, { - type: "result", - subtype: "interrupted", - agent_id: agentId || "", - conversation_id: conversationId, - duration_ms: performance.now() - msgStartTime, - duration_api_ms: 0, - num_turns: msgTurnCount, - result: null, - run_ids: msgRunIds, - usage: null, - stop_reason: "cancelled", - session_id: runtime.sessionId, - uuid: `result-${crypto.randomUUID()}`, - }); - } else { - sendClientMessage(socket, { - type: "result", - success: false, - stopReason: "cancelled", - }); - } + emitTurnResult(socket, runtime, { + subtype: "interrupted", + agentId: agentId || "", + conversationId, + durationMs: performance.now() - msgStartTime, + numTurns: msgTurnCount, + runIds: msgRunIds, + stopReason: "cancelled", + }); return; } @@ -3363,29 +3117,15 @@ async function handleIncomingMessage( session_id: runtime.sessionId, uuid: `error-${crypto.randomUUID()}`, }); - if (runtime.controlResponseCapable) { - emitToWS(socket, { - type: "result", - subtype: "error", - agent_id: agentId || "", - conversation_id: conversationId, - duration_ms: performance.now() - msgStartTime, - duration_api_ms: 0, - num_turns: msgTurnCount, - result: null, - run_ids: msgRunIds, - usage: null, - stop_reason: "error", - session_id: runtime.sessionId, - uuid: `result-${crypto.randomUUID()}`, - }); - } else { - sendClientMessage(socket, { - type: "result", - success: false, - stopReason: "error", - }); - } + emitTurnResult(socket, runtime, { + subtype: "error", + agentId: agentId || "", + conversationId, + durationMs: performance.now() - msgStartTime, + numTurns: msgTurnCount, + runIds: msgRunIds, + stopReason: "error", + }); if (process.env.DEBUG) { console.error("[Listen] Error handling message:", error);