refactor: extract shared turn queue runtime (#991)

This commit is contained in:
Charles Packer
2026-02-17 12:58:33 -08:00
committed by GitHub
parent 44d4cc87c1
commit 3728d1ec0c
4 changed files with 148 additions and 57 deletions

View File

@@ -1,4 +1,5 @@
import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents";
import { mergeQueuedTurnInput } from "../../queue/turnQueueRuntime";
import type { QueuedMessage } from "./messageQueueBridge";
import { buildMessageContentFromDisplay } from "./pasteRegistry";
import { extractTaskNotificationsForDisplay } from "./taskNotifications";
@@ -18,21 +19,20 @@ export function getQueuedNotificationSummaries(
export function buildQueuedContentParts(
queued: QueuedMessage[],
): MessageCreate["content"] {
const parts: MessageCreate["content"] = [];
let isFirst = true;
for (const item of queued) {
if (!isFirst) {
parts.push({ type: "text", text: "\n" });
}
isFirst = false;
if (item.kind === "task_notification") {
parts.push({ type: "text", text: item.text });
continue;
}
const userParts = buildMessageContentFromDisplay(item.text);
parts.push(...userParts);
const queueInput = queued.map((item) =>
item.kind === "task_notification"
? ({ kind: "task_notification", text: item.text } as const)
: ({ kind: "user", content: item.text } as const),
);
const merged = mergeQueuedTurnInput(queueInput, {
normalizeUserContent: buildMessageContentFromDisplay,
});
if (merged === null) {
return [];
}
return parts;
return merged;
}
export function buildQueuedUserText(queued: QueuedMessage[]): string {

View File

@@ -50,6 +50,10 @@ import {
drainStreamWithResume,
} from "./cli/helpers/stream";
import { SYSTEM_REMINDER_CLOSE, SYSTEM_REMINDER_OPEN } from "./constants";
import {
mergeQueuedTurnInput,
type QueuedTurnInput,
} from "./queue/turnQueueRuntime";
import { settingsManager } from "./settings-manager";
import {
isHeadlessAutoAllowTool,
@@ -124,53 +128,16 @@ export function shouldReinjectSkillsAfterCompaction(lines: Line[]): boolean {
(line.summary !== undefined || line.stats !== undefined),
);
}
type MessageContentParts = Exclude<MessageCreate["content"], string>;
export type BidirectionalQueuedInput =
| {
kind: "user";
content: MessageCreate["content"];
}
| {
kind: "task_notification";
text: string;
};
export type BidirectionalQueuedInput = QueuedTurnInput<
MessageCreate["content"]
>;
export function mergeBidirectionalQueuedInput(
queued: BidirectionalQueuedInput[],
): MessageCreate["content"] | null {
if (queued.length === 0) {
return null;
}
const mergedParts: MessageContentParts = [];
let isFirst = true;
for (const item of queued) {
if (!isFirst) {
mergedParts.push({ type: "text", text: "\n" });
}
isFirst = false;
if (item.kind === "task_notification") {
mergedParts.push({ type: "text", text: item.text });
continue;
}
if (typeof item.content === "string") {
mergedParts.push({ type: "text", text: item.content });
continue;
}
mergedParts.push(...item.content);
}
if (mergedParts.length === 0) {
return null;
}
return mergedParts as MessageCreate["content"];
return mergeQueuedTurnInput(queued, {
normalizeUserContent: (content) => content,
});
}
type ReflectionOverrides = {

View File

@@ -0,0 +1,61 @@
import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents";
type MessageContentParts = Exclude<MessageCreate["content"], string>;
export type QueuedTurnInput<TUserContent> =
| {
kind: "user";
content: TUserContent;
}
| {
kind: "task_notification";
text: string;
};
type MergeQueuedTurnInputOptions<TUserContent> = {
normalizeUserContent: (content: TUserContent) => MessageCreate["content"];
separatorText?: string;
};
function appendContentParts(
target: MessageContentParts,
content: MessageCreate["content"],
): void {
if (typeof content === "string") {
target.push({ type: "text", text: content });
return;
}
target.push(...content);
}
export function mergeQueuedTurnInput<TUserContent>(
queued: QueuedTurnInput<TUserContent>[],
options: MergeQueuedTurnInputOptions<TUserContent>,
): MessageCreate["content"] | null {
if (queued.length === 0) {
return null;
}
const separatorText = options.separatorText ?? "\n";
const mergedParts: MessageContentParts = [];
let isFirst = true;
for (const item of queued) {
if (!isFirst) {
mergedParts.push({ type: "text", text: separatorText });
}
isFirst = false;
if (item.kind === "task_notification") {
mergedParts.push({ type: "text", text: item.text });
continue;
}
appendContentParts(mergedParts, options.normalizeUserContent(item.content));
}
return mergedParts.length > 0
? (mergedParts as MessageCreate["content"])
: null;
}

View File

@@ -0,0 +1,63 @@
import { describe, expect, test } from "bun:test";
import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents";
import {
mergeQueuedTurnInput,
type QueuedTurnInput,
} from "../../queue/turnQueueRuntime";
describe("turnQueueRuntime", () => {
test("merges user and task notification entries with separators", () => {
const queued: QueuedTurnInput<string>[] = [
{ kind: "user", content: "hello" },
{
kind: "task_notification",
text: "<task-notification>done</task-notification>",
},
{ kind: "user", content: "world" },
];
const merged = mergeQueuedTurnInput(queued, {
normalizeUserContent: (content) => content,
});
expect(Array.isArray(merged)).toBe(true);
if (!Array.isArray(merged)) return;
const text = merged.flatMap((part) =>
part.type === "text" ? [part.text] : [],
);
expect(text.join("")).toBe(
"hello\n<task-notification>done</task-notification>\nworld",
);
});
test("preserves multimodal user content", () => {
const content = [
{ type: "text", text: "describe this" },
{
type: "image",
source: { type: "base64", media_type: "image/png", data: "abc" },
},
] as unknown as Exclude<MessageCreate["content"], string>;
const queued: QueuedTurnInput<MessageCreate["content"]>[] = [
{ kind: "user", content },
];
const merged = mergeQueuedTurnInput(queued, {
normalizeUserContent: (userContent) => userContent,
});
expect(Array.isArray(merged)).toBe(true);
if (!Array.isArray(merged)) return;
expect(merged[0]).toEqual(content[0]);
expect(merged[1]).toEqual(content[1]);
});
test("returns null when no queued items exist", () => {
expect(
mergeQueuedTurnInput([], {
normalizeUserContent: (content: string) => content,
}),
).toBeNull();
});
});