fix(listener): inject queued skill content in websocket continuations (#1502)

Co-authored-by: Letta Code <noreply@letta.com>
This commit is contained in:
Charles Packer
2026-03-23 22:51:57 -07:00
committed by GitHub
parent 82b6df9377
commit af28a3f744
5 changed files with 282 additions and 70 deletions

View File

@@ -6,6 +6,9 @@ import type {
MessageQueueItem,
TaskNotificationQueueItem,
} from "../../queue/queueRuntime";
import { queueSkillContent } from "../../tools/impl/skillContentRegistry";
import { resolveRecoveredApprovalResponse } from "../../websocket/listener/recovery";
import { injectQueuedSkillContent } from "../../websocket/listener/skill-injection";
import type { IncomingMessage } from "../../websocket/listener/types";
type MockStream = {
@@ -197,6 +200,8 @@ function makeIncomingMessage(
describe("listen-client multi-worker concurrency", () => {
beforeEach(() => {
queueSkillContent("__test-cleanup__", "__test-cleanup__");
injectQueuedSkillContent([]);
permissionMode.reset();
sendMessageStreamMock.mockClear();
getStreamToolContextIdMock.mockClear();
@@ -753,6 +758,11 @@ describe("listen-client multi-worker concurrency", () => {
throw new Error("Expected stale recovery queued task item");
}
queueSkillContent(
"tool-call-1",
"<searching-messages>stale recovery skill content</searching-messages>",
);
const recoveryPromise = __listenClientTestUtils.resolveStaleApprovals(
runtime,
socket as unknown as WebSocket,
@@ -766,7 +776,7 @@ describe("listen-client multi-worker concurrency", () => {
const continuationMessages = sendMessageStreamMock.mock.calls[0]?.[1] as
| Array<Record<string, unknown>>
| undefined;
expect(continuationMessages).toHaveLength(2);
expect(continuationMessages).toHaveLength(3);
expect(continuationMessages?.[0]).toEqual(
expect.objectContaining({
type: "approval",
@@ -785,6 +795,16 @@ describe("listen-client multi-worker concurrency", () => {
},
],
});
expect(continuationMessages?.[2]).toEqual({
role: "user",
content: [
{
type: "text",
text: "<searching-messages>stale recovery skill content</searching-messages>",
},
],
otid: expect.any(String),
});
expect(runtime.loopStatus as string).toBe("PROCESSING_API_RESPONSE");
expect(runtime.queueRuntime.length).toBe(0);
expect(runtime.queuedMessagesByItemId.size).toBe(0);
@@ -809,6 +829,156 @@ describe("listen-client multi-worker concurrency", () => {
});
});
test("interrupt-queue approval continuation appends skill content as trailing user message", async () => {
const listener = __listenClientTestUtils.createListenerRuntime();
__listenClientTestUtils.setActiveRuntime(listener);
const runtime = __listenClientTestUtils.getOrCreateScopedRuntime(
listener,
"agent-1",
"conv-int",
);
const socket = new MockSocket();
runtime.pendingInterruptedResults = [
{
type: "approval",
tool_call_id: "call-int",
approve: false,
reason: "Interrupted by user",
},
] as never;
runtime.pendingInterruptedContext = {
agentId: "agent-1",
conversationId: "conv-int",
continuationEpoch: runtime.continuationEpoch,
};
runtime.pendingInterruptedToolCallIds = ["call-int"];
queueSkillContent(
"call-int",
"<searching-messages>interrupt path skill content</searching-messages>",
);
await __listenClientTestUtils.handleIncomingMessage(
{
type: "message",
agentId: "agent-1",
conversationId: "conv-int",
messages: [],
} as unknown as IncomingMessage,
socket as unknown as WebSocket,
runtime,
);
expect(sendMessageStreamMock.mock.calls.length).toBeGreaterThan(0);
const firstSendMessages = sendMessageStreamMock.mock.calls[0]?.[1] as
| Array<Record<string, unknown>>
| undefined;
expect(firstSendMessages).toHaveLength(2);
expect(firstSendMessages?.[0]).toMatchObject({
type: "approval",
approvals: [
{
tool_call_id: "call-int",
approve: false,
reason: "Interrupted by user",
},
],
});
expect(firstSendMessages?.[1]).toEqual({
role: "user",
content: [
{
type: "text",
text: "<searching-messages>interrupt path skill content</searching-messages>",
},
],
otid: expect.any(String),
});
});
test("recovered approval replay keeps approval-only routing and appends skill content at send boundary", async () => {
const listener = __listenClientTestUtils.createListenerRuntime();
__listenClientTestUtils.setActiveRuntime(listener);
const runtime = __listenClientTestUtils.getOrCreateScopedRuntime(
listener,
"agent-1",
"conv-recovered",
);
const socket = new MockSocket();
runtime.recoveredApprovalState = {
agentId: "agent-1",
conversationId: "conv-recovered",
approvalsByRequestId: new Map([
[
"perm-recovered-1",
{
approval: {
toolCallId: "tool-call-recovered-1",
toolName: "Write",
toolArgs: '{"file_path":"foo.ts"}',
},
controlRequest: {
type: "control_request",
request_id: "perm-recovered-1",
request: {
subtype: "can_use_tool",
tool_name: "Write",
input: { file_path: "foo.ts" },
tool_call_id: "tool-call-recovered-1",
permission_suggestions: [],
blocked_path: null,
},
agent_id: "agent-1",
conversation_id: "conv-recovered",
},
},
],
]),
pendingRequestIds: new Set(["perm-recovered-1"]),
responsesByRequestId: new Map(),
};
queueSkillContent(
"tool-call-recovered-1",
"<searching-messages>recovered skill content</searching-messages>",
);
await resolveRecoveredApprovalResponse(
runtime,
socket as unknown as WebSocket,
{
request_id: "perm-recovered-1",
decision: { behavior: "allow" },
},
__listenClientTestUtils.handleIncomingMessage,
{},
);
expect(sendMessageStreamMock.mock.calls.length).toBeGreaterThan(0);
const firstSendMessages = sendMessageStreamMock.mock.calls[0]?.[1] as
| Array<Record<string, unknown>>
| undefined;
expect(firstSendMessages).toHaveLength(2);
expect(firstSendMessages?.[0]).toMatchObject({
type: "approval",
approvals: [],
});
expect(firstSendMessages?.[1]).toEqual({
role: "user",
content: [
{
type: "text",
text: "<searching-messages>recovered skill content</searching-messages>",
},
],
otid: expect.any(String),
});
});
test("queue pump status callbacks stay aggregate when another conversation is busy", async () => {
const listener = __listenClientTestUtils.createListenerRuntime();
__listenClientTestUtils.setActiveRuntime(listener);

View File

@@ -55,6 +55,7 @@ import {
getApprovalContinuationRecoveryDisposition,
isApprovalToolCallDesyncError,
} from "./recovery";
import { injectQueuedSkillContent } from "./skill-injection";
import type { ConversationRuntime } from "./types";
export function isApprovalOnlyInput(
@@ -300,9 +301,11 @@ export async function resolveStaleApprovals(
emitDequeuedUserMessage(socket, runtime, queuedTurn, dequeuedBatch);
}
const continuationMessagesWithSkillContent =
injectQueuedSkillContent(continuationMessages);
const recoveryStream = await sendApprovalContinuationWithRetry(
recoveryConversationId,
continuationMessages,
continuationMessagesWithSkillContent,
{
agentId: runtime.agentId ?? undefined,
streamTokens: true,

View File

@@ -0,0 +1,30 @@
import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents";
import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages";
import { consumeQueuedSkillContent } from "../../tools/impl/skillContentRegistry";
/**
* Append queued Skill tool content as a trailing user message.
*
* Ordering is preserved: existing messages stay in place and skill content,
* when present, is appended at the end.
*/
export function injectQueuedSkillContent(
messages: Array<MessageCreate | ApprovalCreate>,
): Array<MessageCreate | ApprovalCreate> {
const skillContents = consumeQueuedSkillContent();
if (skillContents.length === 0) {
return messages;
}
return [
...messages,
{
role: "user",
otid: crypto.randomUUID(),
content: skillContents.map((sc) => ({
type: "text" as const,
text: sc.content,
})),
},
];
}

View File

@@ -42,6 +42,7 @@ import {
markAwaitingAcceptedApprovalContinuationRunId,
sendApprovalContinuationWithRetry,
} from "./send";
import { injectQueuedSkillContent } from "./skill-injection";
import type { ConversationRuntime } from "./types";
type Decision =
@@ -332,13 +333,15 @@ export async function handleApprovalStop(params: {
emitDequeuedUserMessage(socket, runtime, queuedTurn, dequeuedBatch);
}
const nextInputWithSkillContent = injectQueuedSkillContent(nextInput);
setLoopStatus(runtime, "SENDING_API_REQUEST", {
agent_id: agentId,
conversation_id: conversationId,
});
const stream = await sendApprovalContinuationWithRetry(
conversationId,
nextInput,
nextInputWithSkillContent,
buildSendOptions(),
socket,
runtime,
@@ -348,7 +351,7 @@ export async function handleApprovalStop(params: {
return {
terminated: true,
stream: null,
currentInput: nextInput,
currentInput: nextInputWithSkillContent,
dequeuedBatchId: continuationBatchId,
pendingNormalizationInterruptedToolCallIds: [],
turnToolContextId,
@@ -392,7 +395,7 @@ export async function handleApprovalStop(params: {
return {
terminated: false,
stream,
currentInput: nextInput,
currentInput: nextInputWithSkillContent,
dequeuedBatchId: continuationBatchId,
pendingNormalizationInterruptedToolCallIds: [],
turnToolContextId: null,

View File

@@ -75,6 +75,7 @@ import {
sendApprovalContinuationWithRetry,
sendMessageStreamWithRetry,
} from "./send";
import { injectQueuedSkillContent } from "./skill-injection";
import { handleApprovalStop } from "./turn-approval";
import type { ConversationRuntime, IncomingMessage } from "./types";
@@ -235,11 +236,12 @@ export async function handleIncomingMessage(
});
const isPureApprovalContinuation = isApprovalOnlyInput(currentInput);
const currentInputWithSkillContent = injectQueuedSkillContent(currentInput);
let stream = isPureApprovalContinuation
? await sendApprovalContinuationWithRetry(
conversationId,
currentInput,
currentInputWithSkillContent,
buildSendOptions(),
socket,
runtime,
@@ -247,12 +249,13 @@ export async function handleIncomingMessage(
)
: await sendMessageStreamWithRetry(
conversationId,
currentInput,
currentInputWithSkillContent,
buildSendOptions(),
socket,
runtime,
runtime.activeAbortController.signal,
);
currentInput = currentInputWithSkillContent;
if (!stream) {
return;
}
@@ -420,27 +423,28 @@ export async function handleIncomingMessage(
agent_id: agentId,
conversation_id: conversationId,
});
stream =
currentInput.length === 1 &&
currentInput[0] !== undefined &&
"type" in currentInput[0] &&
currentInput[0].type === "approval"
? await sendApprovalContinuationWithRetry(
conversationId,
currentInput,
buildSendOptions(),
socket,
runtime,
runtime.activeAbortController.signal,
)
: await sendMessageStreamWithRetry(
conversationId,
currentInput,
buildSendOptions(),
socket,
runtime,
runtime.activeAbortController.signal,
);
const isPureApprovalContinuationRetry =
isApprovalOnlyInput(currentInput);
const retryInputWithSkillContent =
injectQueuedSkillContent(currentInput);
stream = isPureApprovalContinuationRetry
? await sendApprovalContinuationWithRetry(
conversationId,
retryInputWithSkillContent,
buildSendOptions(),
socket,
runtime,
runtime.activeAbortController.signal,
)
: await sendMessageStreamWithRetry(
conversationId,
retryInputWithSkillContent,
buildSendOptions(),
socket,
runtime,
runtime.activeAbortController.signal,
);
currentInput = retryInputWithSkillContent;
if (!stream) {
return;
}
@@ -503,27 +507,28 @@ export async function handleIncomingMessage(
agent_id: agentId,
conversation_id: conversationId,
});
stream =
currentInput.length === 1 &&
currentInput[0] !== undefined &&
"type" in currentInput[0] &&
currentInput[0].type === "approval"
? await sendApprovalContinuationWithRetry(
conversationId,
currentInput,
buildSendOptions(),
socket,
runtime,
runtime.activeAbortController.signal,
)
: await sendMessageStreamWithRetry(
conversationId,
currentInput,
buildSendOptions(),
socket,
runtime,
runtime.activeAbortController.signal,
);
const isPureApprovalContinuationRetry =
isApprovalOnlyInput(currentInput);
const retryInputWithSkillContent =
injectQueuedSkillContent(currentInput);
stream = isPureApprovalContinuationRetry
? await sendApprovalContinuationWithRetry(
conversationId,
retryInputWithSkillContent,
buildSendOptions(),
socket,
runtime,
runtime.activeAbortController.signal,
)
: await sendMessageStreamWithRetry(
conversationId,
retryInputWithSkillContent,
buildSendOptions(),
socket,
runtime,
runtime.activeAbortController.signal,
);
currentInput = retryInputWithSkillContent;
if (!stream) {
return;
}
@@ -574,27 +579,28 @@ export async function handleIncomingMessage(
agent_id: agentId,
conversation_id: conversationId,
});
stream =
currentInput.length === 1 &&
currentInput[0] !== undefined &&
"type" in currentInput[0] &&
currentInput[0].type === "approval"
? await sendApprovalContinuationWithRetry(
conversationId,
currentInput,
buildSendOptions(),
socket,
runtime,
runtime.activeAbortController.signal,
)
: await sendMessageStreamWithRetry(
conversationId,
currentInput,
buildSendOptions(),
socket,
runtime,
runtime.activeAbortController.signal,
);
const isPureApprovalContinuationRetry =
isApprovalOnlyInput(currentInput);
const retryInputWithSkillContent =
injectQueuedSkillContent(currentInput);
stream = isPureApprovalContinuationRetry
? await sendApprovalContinuationWithRetry(
conversationId,
retryInputWithSkillContent,
buildSendOptions(),
socket,
runtime,
runtime.activeAbortController.signal,
)
: await sendMessageStreamWithRetry(
conversationId,
retryInputWithSkillContent,
buildSendOptions(),
socket,
runtime,
runtime.activeAbortController.signal,
);
currentInput = retryInputWithSkillContent;
if (!stream) {
return;
}