From af28a3f7440332d239d027b46be9116bc42211f6 Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Mon, 23 Mar 2026 22:51:57 -0700 Subject: [PATCH] fix(listener): inject queued skill content in websocket continuations (#1502) Co-authored-by: Letta Code --- .../listen-client-concurrency.test.ts | 172 +++++++++++++++++- src/websocket/listener/send.ts | 5 +- src/websocket/listener/skill-injection.ts | 30 +++ src/websocket/listener/turn-approval.ts | 9 +- src/websocket/listener/turn.ts | 136 +++++++------- 5 files changed, 282 insertions(+), 70 deletions(-) create mode 100644 src/websocket/listener/skill-injection.ts diff --git a/src/tests/websocket/listen-client-concurrency.test.ts b/src/tests/websocket/listen-client-concurrency.test.ts index 4e117d2..8901a2c 100644 --- a/src/tests/websocket/listen-client-concurrency.test.ts +++ b/src/tests/websocket/listen-client-concurrency.test.ts @@ -6,6 +6,9 @@ import type { MessageQueueItem, TaskNotificationQueueItem, } from "../../queue/queueRuntime"; +import { queueSkillContent } from "../../tools/impl/skillContentRegistry"; +import { resolveRecoveredApprovalResponse } from "../../websocket/listener/recovery"; +import { injectQueuedSkillContent } from "../../websocket/listener/skill-injection"; import type { IncomingMessage } from "../../websocket/listener/types"; type MockStream = { @@ -197,6 +200,8 @@ function makeIncomingMessage( describe("listen-client multi-worker concurrency", () => { beforeEach(() => { + queueSkillContent("__test-cleanup__", "__test-cleanup__"); + injectQueuedSkillContent([]); permissionMode.reset(); sendMessageStreamMock.mockClear(); getStreamToolContextIdMock.mockClear(); @@ -753,6 +758,11 @@ describe("listen-client multi-worker concurrency", () => { throw new Error("Expected stale recovery queued task item"); } + queueSkillContent( + "tool-call-1", + "stale recovery skill content", + ); + const recoveryPromise = __listenClientTestUtils.resolveStaleApprovals( runtime, socket as unknown as WebSocket, @@ -766,7 +776,7 @@ describe("listen-client multi-worker concurrency", () => { const continuationMessages = sendMessageStreamMock.mock.calls[0]?.[1] as | Array> | undefined; - expect(continuationMessages).toHaveLength(2); + expect(continuationMessages).toHaveLength(3); expect(continuationMessages?.[0]).toEqual( expect.objectContaining({ type: "approval", @@ -785,6 +795,16 @@ describe("listen-client multi-worker concurrency", () => { }, ], }); + expect(continuationMessages?.[2]).toEqual({ + role: "user", + content: [ + { + type: "text", + text: "stale recovery skill content", + }, + ], + otid: expect.any(String), + }); expect(runtime.loopStatus as string).toBe("PROCESSING_API_RESPONSE"); expect(runtime.queueRuntime.length).toBe(0); expect(runtime.queuedMessagesByItemId.size).toBe(0); @@ -809,6 +829,156 @@ describe("listen-client multi-worker concurrency", () => { }); }); + test("interrupt-queue approval continuation appends skill content as trailing user message", async () => { + const listener = __listenClientTestUtils.createListenerRuntime(); + __listenClientTestUtils.setActiveRuntime(listener); + const runtime = __listenClientTestUtils.getOrCreateScopedRuntime( + listener, + "agent-1", + "conv-int", + ); + const socket = new MockSocket(); + + runtime.pendingInterruptedResults = [ + { + type: "approval", + tool_call_id: "call-int", + approve: false, + reason: "Interrupted by user", + }, + ] as never; + runtime.pendingInterruptedContext = { + agentId: "agent-1", + conversationId: "conv-int", + continuationEpoch: runtime.continuationEpoch, + }; + runtime.pendingInterruptedToolCallIds = ["call-int"]; + + queueSkillContent( + "call-int", + "interrupt path skill content", + ); + + await __listenClientTestUtils.handleIncomingMessage( + { + type: "message", + agentId: "agent-1", + conversationId: "conv-int", + messages: [], + } as unknown as IncomingMessage, + socket as unknown as WebSocket, + runtime, + ); + + expect(sendMessageStreamMock.mock.calls.length).toBeGreaterThan(0); + const firstSendMessages = sendMessageStreamMock.mock.calls[0]?.[1] as + | Array> + | undefined; + + expect(firstSendMessages).toHaveLength(2); + expect(firstSendMessages?.[0]).toMatchObject({ + type: "approval", + approvals: [ + { + tool_call_id: "call-int", + approve: false, + reason: "Interrupted by user", + }, + ], + }); + expect(firstSendMessages?.[1]).toEqual({ + role: "user", + content: [ + { + type: "text", + text: "interrupt path skill content", + }, + ], + otid: expect.any(String), + }); + }); + + test("recovered approval replay keeps approval-only routing and appends skill content at send boundary", async () => { + const listener = __listenClientTestUtils.createListenerRuntime(); + __listenClientTestUtils.setActiveRuntime(listener); + const runtime = __listenClientTestUtils.getOrCreateScopedRuntime( + listener, + "agent-1", + "conv-recovered", + ); + const socket = new MockSocket(); + + runtime.recoveredApprovalState = { + agentId: "agent-1", + conversationId: "conv-recovered", + approvalsByRequestId: new Map([ + [ + "perm-recovered-1", + { + approval: { + toolCallId: "tool-call-recovered-1", + toolName: "Write", + toolArgs: '{"file_path":"foo.ts"}', + }, + controlRequest: { + type: "control_request", + request_id: "perm-recovered-1", + request: { + subtype: "can_use_tool", + tool_name: "Write", + input: { file_path: "foo.ts" }, + tool_call_id: "tool-call-recovered-1", + permission_suggestions: [], + blocked_path: null, + }, + agent_id: "agent-1", + conversation_id: "conv-recovered", + }, + }, + ], + ]), + pendingRequestIds: new Set(["perm-recovered-1"]), + responsesByRequestId: new Map(), + }; + + queueSkillContent( + "tool-call-recovered-1", + "recovered skill content", + ); + + await resolveRecoveredApprovalResponse( + runtime, + socket as unknown as WebSocket, + { + request_id: "perm-recovered-1", + decision: { behavior: "allow" }, + }, + __listenClientTestUtils.handleIncomingMessage, + {}, + ); + + expect(sendMessageStreamMock.mock.calls.length).toBeGreaterThan(0); + const firstSendMessages = sendMessageStreamMock.mock.calls[0]?.[1] as + | Array> + | undefined; + + expect(firstSendMessages).toHaveLength(2); + expect(firstSendMessages?.[0]).toMatchObject({ + type: "approval", + approvals: [], + }); + expect(firstSendMessages?.[1]).toEqual({ + role: "user", + content: [ + { + type: "text", + text: "recovered skill content", + }, + ], + otid: expect.any(String), + }); + }); + test("queue pump status callbacks stay aggregate when another conversation is busy", async () => { const listener = __listenClientTestUtils.createListenerRuntime(); __listenClientTestUtils.setActiveRuntime(listener); diff --git a/src/websocket/listener/send.ts b/src/websocket/listener/send.ts index e6f9394..733699b 100644 --- a/src/websocket/listener/send.ts +++ b/src/websocket/listener/send.ts @@ -55,6 +55,7 @@ import { getApprovalContinuationRecoveryDisposition, isApprovalToolCallDesyncError, } from "./recovery"; +import { injectQueuedSkillContent } from "./skill-injection"; import type { ConversationRuntime } from "./types"; export function isApprovalOnlyInput( @@ -300,9 +301,11 @@ export async function resolveStaleApprovals( emitDequeuedUserMessage(socket, runtime, queuedTurn, dequeuedBatch); } + const continuationMessagesWithSkillContent = + injectQueuedSkillContent(continuationMessages); const recoveryStream = await sendApprovalContinuationWithRetry( recoveryConversationId, - continuationMessages, + continuationMessagesWithSkillContent, { agentId: runtime.agentId ?? undefined, streamTokens: true, diff --git a/src/websocket/listener/skill-injection.ts b/src/websocket/listener/skill-injection.ts new file mode 100644 index 0000000..796be78 --- /dev/null +++ b/src/websocket/listener/skill-injection.ts @@ -0,0 +1,30 @@ +import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents"; +import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages"; +import { consumeQueuedSkillContent } from "../../tools/impl/skillContentRegistry"; + +/** + * Append queued Skill tool content as a trailing user message. + * + * Ordering is preserved: existing messages stay in place and skill content, + * when present, is appended at the end. + */ +export function injectQueuedSkillContent( + messages: Array, +): Array { + const skillContents = consumeQueuedSkillContent(); + if (skillContents.length === 0) { + return messages; + } + + return [ + ...messages, + { + role: "user", + otid: crypto.randomUUID(), + content: skillContents.map((sc) => ({ + type: "text" as const, + text: sc.content, + })), + }, + ]; +} diff --git a/src/websocket/listener/turn-approval.ts b/src/websocket/listener/turn-approval.ts index 586a843..096f713 100644 --- a/src/websocket/listener/turn-approval.ts +++ b/src/websocket/listener/turn-approval.ts @@ -42,6 +42,7 @@ import { markAwaitingAcceptedApprovalContinuationRunId, sendApprovalContinuationWithRetry, } from "./send"; +import { injectQueuedSkillContent } from "./skill-injection"; import type { ConversationRuntime } from "./types"; type Decision = @@ -332,13 +333,15 @@ export async function handleApprovalStop(params: { emitDequeuedUserMessage(socket, runtime, queuedTurn, dequeuedBatch); } + const nextInputWithSkillContent = injectQueuedSkillContent(nextInput); + setLoopStatus(runtime, "SENDING_API_REQUEST", { agent_id: agentId, conversation_id: conversationId, }); const stream = await sendApprovalContinuationWithRetry( conversationId, - nextInput, + nextInputWithSkillContent, buildSendOptions(), socket, runtime, @@ -348,7 +351,7 @@ export async function handleApprovalStop(params: { return { terminated: true, stream: null, - currentInput: nextInput, + currentInput: nextInputWithSkillContent, dequeuedBatchId: continuationBatchId, pendingNormalizationInterruptedToolCallIds: [], turnToolContextId, @@ -392,7 +395,7 @@ export async function handleApprovalStop(params: { return { terminated: false, stream, - currentInput: nextInput, + currentInput: nextInputWithSkillContent, dequeuedBatchId: continuationBatchId, pendingNormalizationInterruptedToolCallIds: [], turnToolContextId: null, diff --git a/src/websocket/listener/turn.ts b/src/websocket/listener/turn.ts index 9dc08db..6f48c28 100644 --- a/src/websocket/listener/turn.ts +++ b/src/websocket/listener/turn.ts @@ -75,6 +75,7 @@ import { sendApprovalContinuationWithRetry, sendMessageStreamWithRetry, } from "./send"; +import { injectQueuedSkillContent } from "./skill-injection"; import { handleApprovalStop } from "./turn-approval"; import type { ConversationRuntime, IncomingMessage } from "./types"; @@ -235,11 +236,12 @@ export async function handleIncomingMessage( }); const isPureApprovalContinuation = isApprovalOnlyInput(currentInput); + const currentInputWithSkillContent = injectQueuedSkillContent(currentInput); let stream = isPureApprovalContinuation ? await sendApprovalContinuationWithRetry( conversationId, - currentInput, + currentInputWithSkillContent, buildSendOptions(), socket, runtime, @@ -247,12 +249,13 @@ export async function handleIncomingMessage( ) : await sendMessageStreamWithRetry( conversationId, - currentInput, + currentInputWithSkillContent, buildSendOptions(), socket, runtime, runtime.activeAbortController.signal, ); + currentInput = currentInputWithSkillContent; if (!stream) { return; } @@ -420,27 +423,28 @@ export async function handleIncomingMessage( agent_id: agentId, conversation_id: conversationId, }); - stream = - currentInput.length === 1 && - currentInput[0] !== undefined && - "type" in currentInput[0] && - currentInput[0].type === "approval" - ? await sendApprovalContinuationWithRetry( - conversationId, - currentInput, - buildSendOptions(), - socket, - runtime, - runtime.activeAbortController.signal, - ) - : await sendMessageStreamWithRetry( - conversationId, - currentInput, - buildSendOptions(), - socket, - runtime, - runtime.activeAbortController.signal, - ); + const isPureApprovalContinuationRetry = + isApprovalOnlyInput(currentInput); + const retryInputWithSkillContent = + injectQueuedSkillContent(currentInput); + stream = isPureApprovalContinuationRetry + ? await sendApprovalContinuationWithRetry( + conversationId, + retryInputWithSkillContent, + buildSendOptions(), + socket, + runtime, + runtime.activeAbortController.signal, + ) + : await sendMessageStreamWithRetry( + conversationId, + retryInputWithSkillContent, + buildSendOptions(), + socket, + runtime, + runtime.activeAbortController.signal, + ); + currentInput = retryInputWithSkillContent; if (!stream) { return; } @@ -503,27 +507,28 @@ export async function handleIncomingMessage( agent_id: agentId, conversation_id: conversationId, }); - stream = - currentInput.length === 1 && - currentInput[0] !== undefined && - "type" in currentInput[0] && - currentInput[0].type === "approval" - ? await sendApprovalContinuationWithRetry( - conversationId, - currentInput, - buildSendOptions(), - socket, - runtime, - runtime.activeAbortController.signal, - ) - : await sendMessageStreamWithRetry( - conversationId, - currentInput, - buildSendOptions(), - socket, - runtime, - runtime.activeAbortController.signal, - ); + const isPureApprovalContinuationRetry = + isApprovalOnlyInput(currentInput); + const retryInputWithSkillContent = + injectQueuedSkillContent(currentInput); + stream = isPureApprovalContinuationRetry + ? await sendApprovalContinuationWithRetry( + conversationId, + retryInputWithSkillContent, + buildSendOptions(), + socket, + runtime, + runtime.activeAbortController.signal, + ) + : await sendMessageStreamWithRetry( + conversationId, + retryInputWithSkillContent, + buildSendOptions(), + socket, + runtime, + runtime.activeAbortController.signal, + ); + currentInput = retryInputWithSkillContent; if (!stream) { return; } @@ -574,27 +579,28 @@ export async function handleIncomingMessage( agent_id: agentId, conversation_id: conversationId, }); - stream = - currentInput.length === 1 && - currentInput[0] !== undefined && - "type" in currentInput[0] && - currentInput[0].type === "approval" - ? await sendApprovalContinuationWithRetry( - conversationId, - currentInput, - buildSendOptions(), - socket, - runtime, - runtime.activeAbortController.signal, - ) - : await sendMessageStreamWithRetry( - conversationId, - currentInput, - buildSendOptions(), - socket, - runtime, - runtime.activeAbortController.signal, - ); + const isPureApprovalContinuationRetry = + isApprovalOnlyInput(currentInput); + const retryInputWithSkillContent = + injectQueuedSkillContent(currentInput); + stream = isPureApprovalContinuationRetry + ? await sendApprovalContinuationWithRetry( + conversationId, + retryInputWithSkillContent, + buildSendOptions(), + socket, + runtime, + runtime.activeAbortController.signal, + ) + : await sendMessageStreamWithRetry( + conversationId, + retryInputWithSkillContent, + buildSendOptions(), + socket, + runtime, + runtime.activeAbortController.signal, + ); + currentInput = retryInputWithSkillContent; if (!stream) { return; }