diff --git a/src/tests/websocket/listen-client-concurrency.test.ts b/src/tests/websocket/listen-client-concurrency.test.ts
index 4e117d2..8901a2c 100644
--- a/src/tests/websocket/listen-client-concurrency.test.ts
+++ b/src/tests/websocket/listen-client-concurrency.test.ts
@@ -6,6 +6,9 @@ import type {
MessageQueueItem,
TaskNotificationQueueItem,
} from "../../queue/queueRuntime";
+import { queueSkillContent } from "../../tools/impl/skillContentRegistry";
+import { resolveRecoveredApprovalResponse } from "../../websocket/listener/recovery";
+import { injectQueuedSkillContent } from "../../websocket/listener/skill-injection";
import type { IncomingMessage } from "../../websocket/listener/types";
type MockStream = {
@@ -197,6 +200,8 @@ function makeIncomingMessage(
describe("listen-client multi-worker concurrency", () => {
beforeEach(() => {
+ queueSkillContent("__test-cleanup__", "__test-cleanup__");
+ injectQueuedSkillContent([]);
permissionMode.reset();
sendMessageStreamMock.mockClear();
getStreamToolContextIdMock.mockClear();
@@ -753,6 +758,11 @@ describe("listen-client multi-worker concurrency", () => {
throw new Error("Expected stale recovery queued task item");
}
+ queueSkillContent(
+ "tool-call-1",
+ "stale recovery skill content",
+ );
+
const recoveryPromise = __listenClientTestUtils.resolveStaleApprovals(
runtime,
socket as unknown as WebSocket,
@@ -766,7 +776,7 @@ describe("listen-client multi-worker concurrency", () => {
const continuationMessages = sendMessageStreamMock.mock.calls[0]?.[1] as
| Array>
| undefined;
- expect(continuationMessages).toHaveLength(2);
+ expect(continuationMessages).toHaveLength(3);
expect(continuationMessages?.[0]).toEqual(
expect.objectContaining({
type: "approval",
@@ -785,6 +795,16 @@ describe("listen-client multi-worker concurrency", () => {
},
],
});
+ expect(continuationMessages?.[2]).toEqual({
+ role: "user",
+ content: [
+ {
+ type: "text",
+ text: "stale recovery skill content",
+ },
+ ],
+ otid: expect.any(String),
+ });
expect(runtime.loopStatus as string).toBe("PROCESSING_API_RESPONSE");
expect(runtime.queueRuntime.length).toBe(0);
expect(runtime.queuedMessagesByItemId.size).toBe(0);
@@ -809,6 +829,156 @@ describe("listen-client multi-worker concurrency", () => {
});
});
+ test("interrupt-queue approval continuation appends skill content as trailing user message", async () => {
+ const listener = __listenClientTestUtils.createListenerRuntime();
+ __listenClientTestUtils.setActiveRuntime(listener);
+ const runtime = __listenClientTestUtils.getOrCreateScopedRuntime(
+ listener,
+ "agent-1",
+ "conv-int",
+ );
+ const socket = new MockSocket();
+
+ runtime.pendingInterruptedResults = [
+ {
+ type: "approval",
+ tool_call_id: "call-int",
+ approve: false,
+ reason: "Interrupted by user",
+ },
+ ] as never;
+ runtime.pendingInterruptedContext = {
+ agentId: "agent-1",
+ conversationId: "conv-int",
+ continuationEpoch: runtime.continuationEpoch,
+ };
+ runtime.pendingInterruptedToolCallIds = ["call-int"];
+
+ queueSkillContent(
+ "call-int",
+ "interrupt path skill content",
+ );
+
+ await __listenClientTestUtils.handleIncomingMessage(
+ {
+ type: "message",
+ agentId: "agent-1",
+ conversationId: "conv-int",
+ messages: [],
+ } as unknown as IncomingMessage,
+ socket as unknown as WebSocket,
+ runtime,
+ );
+
+ expect(sendMessageStreamMock.mock.calls.length).toBeGreaterThan(0);
+ const firstSendMessages = sendMessageStreamMock.mock.calls[0]?.[1] as
+ | Array>
+ | undefined;
+
+ expect(firstSendMessages).toHaveLength(2);
+ expect(firstSendMessages?.[0]).toMatchObject({
+ type: "approval",
+ approvals: [
+ {
+ tool_call_id: "call-int",
+ approve: false,
+ reason: "Interrupted by user",
+ },
+ ],
+ });
+ expect(firstSendMessages?.[1]).toEqual({
+ role: "user",
+ content: [
+ {
+ type: "text",
+ text: "interrupt path skill content",
+ },
+ ],
+ otid: expect.any(String),
+ });
+ });
+
+ test("recovered approval replay keeps approval-only routing and appends skill content at send boundary", async () => {
+ const listener = __listenClientTestUtils.createListenerRuntime();
+ __listenClientTestUtils.setActiveRuntime(listener);
+ const runtime = __listenClientTestUtils.getOrCreateScopedRuntime(
+ listener,
+ "agent-1",
+ "conv-recovered",
+ );
+ const socket = new MockSocket();
+
+ runtime.recoveredApprovalState = {
+ agentId: "agent-1",
+ conversationId: "conv-recovered",
+ approvalsByRequestId: new Map([
+ [
+ "perm-recovered-1",
+ {
+ approval: {
+ toolCallId: "tool-call-recovered-1",
+ toolName: "Write",
+ toolArgs: '{"file_path":"foo.ts"}',
+ },
+ controlRequest: {
+ type: "control_request",
+ request_id: "perm-recovered-1",
+ request: {
+ subtype: "can_use_tool",
+ tool_name: "Write",
+ input: { file_path: "foo.ts" },
+ tool_call_id: "tool-call-recovered-1",
+ permission_suggestions: [],
+ blocked_path: null,
+ },
+ agent_id: "agent-1",
+ conversation_id: "conv-recovered",
+ },
+ },
+ ],
+ ]),
+ pendingRequestIds: new Set(["perm-recovered-1"]),
+ responsesByRequestId: new Map(),
+ };
+
+ queueSkillContent(
+ "tool-call-recovered-1",
+ "recovered skill content",
+ );
+
+ await resolveRecoveredApprovalResponse(
+ runtime,
+ socket as unknown as WebSocket,
+ {
+ request_id: "perm-recovered-1",
+ decision: { behavior: "allow" },
+ },
+ __listenClientTestUtils.handleIncomingMessage,
+ {},
+ );
+
+ expect(sendMessageStreamMock.mock.calls.length).toBeGreaterThan(0);
+ const firstSendMessages = sendMessageStreamMock.mock.calls[0]?.[1] as
+ | Array>
+ | undefined;
+
+ expect(firstSendMessages).toHaveLength(2);
+ expect(firstSendMessages?.[0]).toMatchObject({
+ type: "approval",
+ approvals: [],
+ });
+ expect(firstSendMessages?.[1]).toEqual({
+ role: "user",
+ content: [
+ {
+ type: "text",
+ text: "recovered skill content",
+ },
+ ],
+ otid: expect.any(String),
+ });
+ });
+
test("queue pump status callbacks stay aggregate when another conversation is busy", async () => {
const listener = __listenClientTestUtils.createListenerRuntime();
__listenClientTestUtils.setActiveRuntime(listener);
diff --git a/src/websocket/listener/send.ts b/src/websocket/listener/send.ts
index e6f9394..733699b 100644
--- a/src/websocket/listener/send.ts
+++ b/src/websocket/listener/send.ts
@@ -55,6 +55,7 @@ import {
getApprovalContinuationRecoveryDisposition,
isApprovalToolCallDesyncError,
} from "./recovery";
+import { injectQueuedSkillContent } from "./skill-injection";
import type { ConversationRuntime } from "./types";
export function isApprovalOnlyInput(
@@ -300,9 +301,11 @@ export async function resolveStaleApprovals(
emitDequeuedUserMessage(socket, runtime, queuedTurn, dequeuedBatch);
}
+ const continuationMessagesWithSkillContent =
+ injectQueuedSkillContent(continuationMessages);
const recoveryStream = await sendApprovalContinuationWithRetry(
recoveryConversationId,
- continuationMessages,
+ continuationMessagesWithSkillContent,
{
agentId: runtime.agentId ?? undefined,
streamTokens: true,
diff --git a/src/websocket/listener/skill-injection.ts b/src/websocket/listener/skill-injection.ts
new file mode 100644
index 0000000..796be78
--- /dev/null
+++ b/src/websocket/listener/skill-injection.ts
@@ -0,0 +1,30 @@
+import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents";
+import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages";
+import { consumeQueuedSkillContent } from "../../tools/impl/skillContentRegistry";
+
+/**
+ * Append queued Skill tool content as a trailing user message.
+ *
+ * Ordering is preserved: existing messages stay in place and skill content,
+ * when present, is appended at the end.
+ */
+export function injectQueuedSkillContent(
+ messages: Array,
+): Array {
+ const skillContents = consumeQueuedSkillContent();
+ if (skillContents.length === 0) {
+ return messages;
+ }
+
+ return [
+ ...messages,
+ {
+ role: "user",
+ otid: crypto.randomUUID(),
+ content: skillContents.map((sc) => ({
+ type: "text" as const,
+ text: sc.content,
+ })),
+ },
+ ];
+}
diff --git a/src/websocket/listener/turn-approval.ts b/src/websocket/listener/turn-approval.ts
index 586a843..096f713 100644
--- a/src/websocket/listener/turn-approval.ts
+++ b/src/websocket/listener/turn-approval.ts
@@ -42,6 +42,7 @@ import {
markAwaitingAcceptedApprovalContinuationRunId,
sendApprovalContinuationWithRetry,
} from "./send";
+import { injectQueuedSkillContent } from "./skill-injection";
import type { ConversationRuntime } from "./types";
type Decision =
@@ -332,13 +333,15 @@ export async function handleApprovalStop(params: {
emitDequeuedUserMessage(socket, runtime, queuedTurn, dequeuedBatch);
}
+ const nextInputWithSkillContent = injectQueuedSkillContent(nextInput);
+
setLoopStatus(runtime, "SENDING_API_REQUEST", {
agent_id: agentId,
conversation_id: conversationId,
});
const stream = await sendApprovalContinuationWithRetry(
conversationId,
- nextInput,
+ nextInputWithSkillContent,
buildSendOptions(),
socket,
runtime,
@@ -348,7 +351,7 @@ export async function handleApprovalStop(params: {
return {
terminated: true,
stream: null,
- currentInput: nextInput,
+ currentInput: nextInputWithSkillContent,
dequeuedBatchId: continuationBatchId,
pendingNormalizationInterruptedToolCallIds: [],
turnToolContextId,
@@ -392,7 +395,7 @@ export async function handleApprovalStop(params: {
return {
terminated: false,
stream,
- currentInput: nextInput,
+ currentInput: nextInputWithSkillContent,
dequeuedBatchId: continuationBatchId,
pendingNormalizationInterruptedToolCallIds: [],
turnToolContextId: null,
diff --git a/src/websocket/listener/turn.ts b/src/websocket/listener/turn.ts
index 9dc08db..6f48c28 100644
--- a/src/websocket/listener/turn.ts
+++ b/src/websocket/listener/turn.ts
@@ -75,6 +75,7 @@ import {
sendApprovalContinuationWithRetry,
sendMessageStreamWithRetry,
} from "./send";
+import { injectQueuedSkillContent } from "./skill-injection";
import { handleApprovalStop } from "./turn-approval";
import type { ConversationRuntime, IncomingMessage } from "./types";
@@ -235,11 +236,12 @@ export async function handleIncomingMessage(
});
const isPureApprovalContinuation = isApprovalOnlyInput(currentInput);
+ const currentInputWithSkillContent = injectQueuedSkillContent(currentInput);
let stream = isPureApprovalContinuation
? await sendApprovalContinuationWithRetry(
conversationId,
- currentInput,
+ currentInputWithSkillContent,
buildSendOptions(),
socket,
runtime,
@@ -247,12 +249,13 @@ export async function handleIncomingMessage(
)
: await sendMessageStreamWithRetry(
conversationId,
- currentInput,
+ currentInputWithSkillContent,
buildSendOptions(),
socket,
runtime,
runtime.activeAbortController.signal,
);
+ currentInput = currentInputWithSkillContent;
if (!stream) {
return;
}
@@ -420,27 +423,28 @@ export async function handleIncomingMessage(
agent_id: agentId,
conversation_id: conversationId,
});
- stream =
- currentInput.length === 1 &&
- currentInput[0] !== undefined &&
- "type" in currentInput[0] &&
- currentInput[0].type === "approval"
- ? await sendApprovalContinuationWithRetry(
- conversationId,
- currentInput,
- buildSendOptions(),
- socket,
- runtime,
- runtime.activeAbortController.signal,
- )
- : await sendMessageStreamWithRetry(
- conversationId,
- currentInput,
- buildSendOptions(),
- socket,
- runtime,
- runtime.activeAbortController.signal,
- );
+ const isPureApprovalContinuationRetry =
+ isApprovalOnlyInput(currentInput);
+ const retryInputWithSkillContent =
+ injectQueuedSkillContent(currentInput);
+ stream = isPureApprovalContinuationRetry
+ ? await sendApprovalContinuationWithRetry(
+ conversationId,
+ retryInputWithSkillContent,
+ buildSendOptions(),
+ socket,
+ runtime,
+ runtime.activeAbortController.signal,
+ )
+ : await sendMessageStreamWithRetry(
+ conversationId,
+ retryInputWithSkillContent,
+ buildSendOptions(),
+ socket,
+ runtime,
+ runtime.activeAbortController.signal,
+ );
+ currentInput = retryInputWithSkillContent;
if (!stream) {
return;
}
@@ -503,27 +507,28 @@ export async function handleIncomingMessage(
agent_id: agentId,
conversation_id: conversationId,
});
- stream =
- currentInput.length === 1 &&
- currentInput[0] !== undefined &&
- "type" in currentInput[0] &&
- currentInput[0].type === "approval"
- ? await sendApprovalContinuationWithRetry(
- conversationId,
- currentInput,
- buildSendOptions(),
- socket,
- runtime,
- runtime.activeAbortController.signal,
- )
- : await sendMessageStreamWithRetry(
- conversationId,
- currentInput,
- buildSendOptions(),
- socket,
- runtime,
- runtime.activeAbortController.signal,
- );
+ const isPureApprovalContinuationRetry =
+ isApprovalOnlyInput(currentInput);
+ const retryInputWithSkillContent =
+ injectQueuedSkillContent(currentInput);
+ stream = isPureApprovalContinuationRetry
+ ? await sendApprovalContinuationWithRetry(
+ conversationId,
+ retryInputWithSkillContent,
+ buildSendOptions(),
+ socket,
+ runtime,
+ runtime.activeAbortController.signal,
+ )
+ : await sendMessageStreamWithRetry(
+ conversationId,
+ retryInputWithSkillContent,
+ buildSendOptions(),
+ socket,
+ runtime,
+ runtime.activeAbortController.signal,
+ );
+ currentInput = retryInputWithSkillContent;
if (!stream) {
return;
}
@@ -574,27 +579,28 @@ export async function handleIncomingMessage(
agent_id: agentId,
conversation_id: conversationId,
});
- stream =
- currentInput.length === 1 &&
- currentInput[0] !== undefined &&
- "type" in currentInput[0] &&
- currentInput[0].type === "approval"
- ? await sendApprovalContinuationWithRetry(
- conversationId,
- currentInput,
- buildSendOptions(),
- socket,
- runtime,
- runtime.activeAbortController.signal,
- )
- : await sendMessageStreamWithRetry(
- conversationId,
- currentInput,
- buildSendOptions(),
- socket,
- runtime,
- runtime.activeAbortController.signal,
- );
+ const isPureApprovalContinuationRetry =
+ isApprovalOnlyInput(currentInput);
+ const retryInputWithSkillContent =
+ injectQueuedSkillContent(currentInput);
+ stream = isPureApprovalContinuationRetry
+ ? await sendApprovalContinuationWithRetry(
+ conversationId,
+ retryInputWithSkillContent,
+ buildSendOptions(),
+ socket,
+ runtime,
+ runtime.activeAbortController.signal,
+ )
+ : await sendMessageStreamWithRetry(
+ conversationId,
+ retryInputWithSkillContent,
+ buildSendOptions(),
+ socket,
+ runtime,
+ runtime.activeAbortController.signal,
+ );
+ currentInput = retryInputWithSkillContent;
if (!stream) {
return;
}