From 3728d1ec0c2fd269594c491a050df6bf6e57b8b2 Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Tue, 17 Feb 2026 12:58:33 -0800 Subject: [PATCH] refactor: extract shared turn queue runtime (#991) --- src/cli/helpers/queuedMessageParts.ts | 28 +++++------ src/headless.ts | 53 ++++---------------- src/queue/turnQueueRuntime.ts | 61 +++++++++++++++++++++++ src/tests/queue/turnQueueRuntime.test.ts | 63 ++++++++++++++++++++++++ 4 files changed, 148 insertions(+), 57 deletions(-) create mode 100644 src/queue/turnQueueRuntime.ts create mode 100644 src/tests/queue/turnQueueRuntime.test.ts diff --git a/src/cli/helpers/queuedMessageParts.ts b/src/cli/helpers/queuedMessageParts.ts index 9a54a08..2ddf3f2 100644 --- a/src/cli/helpers/queuedMessageParts.ts +++ b/src/cli/helpers/queuedMessageParts.ts @@ -1,4 +1,5 @@ import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents"; +import { mergeQueuedTurnInput } from "../../queue/turnQueueRuntime"; import type { QueuedMessage } from "./messageQueueBridge"; import { buildMessageContentFromDisplay } from "./pasteRegistry"; import { extractTaskNotificationsForDisplay } from "./taskNotifications"; @@ -18,21 +19,20 @@ export function getQueuedNotificationSummaries( export function buildQueuedContentParts( queued: QueuedMessage[], ): MessageCreate["content"] { - const parts: MessageCreate["content"] = []; - let isFirst = true; - for (const item of queued) { - if (!isFirst) { - parts.push({ type: "text", text: "\n" }); - } - isFirst = false; - if (item.kind === "task_notification") { - parts.push({ type: "text", text: item.text }); - continue; - } - const userParts = buildMessageContentFromDisplay(item.text); - parts.push(...userParts); + const queueInput = queued.map((item) => + item.kind === "task_notification" + ? ({ kind: "task_notification", text: item.text } as const) + : ({ kind: "user", content: item.text } as const), + ); + + const merged = mergeQueuedTurnInput(queueInput, { + normalizeUserContent: buildMessageContentFromDisplay, + }); + + if (merged === null) { + return []; } - return parts; + return merged; } export function buildQueuedUserText(queued: QueuedMessage[]): string { diff --git a/src/headless.ts b/src/headless.ts index a6b04e8..07189f3 100644 --- a/src/headless.ts +++ b/src/headless.ts @@ -50,6 +50,10 @@ import { drainStreamWithResume, } from "./cli/helpers/stream"; import { SYSTEM_REMINDER_CLOSE, SYSTEM_REMINDER_OPEN } from "./constants"; +import { + mergeQueuedTurnInput, + type QueuedTurnInput, +} from "./queue/turnQueueRuntime"; import { settingsManager } from "./settings-manager"; import { isHeadlessAutoAllowTool, @@ -124,53 +128,16 @@ export function shouldReinjectSkillsAfterCompaction(lines: Line[]): boolean { (line.summary !== undefined || line.stats !== undefined), ); } - -type MessageContentParts = Exclude; - -export type BidirectionalQueuedInput = - | { - kind: "user"; - content: MessageCreate["content"]; - } - | { - kind: "task_notification"; - text: string; - }; +export type BidirectionalQueuedInput = QueuedTurnInput< + MessageCreate["content"] +>; export function mergeBidirectionalQueuedInput( queued: BidirectionalQueuedInput[], ): MessageCreate["content"] | null { - if (queued.length === 0) { - return null; - } - - const mergedParts: MessageContentParts = []; - let isFirst = true; - - for (const item of queued) { - if (!isFirst) { - mergedParts.push({ type: "text", text: "\n" }); - } - isFirst = false; - - if (item.kind === "task_notification") { - mergedParts.push({ type: "text", text: item.text }); - continue; - } - - if (typeof item.content === "string") { - mergedParts.push({ type: "text", text: item.content }); - continue; - } - - mergedParts.push(...item.content); - } - - if (mergedParts.length === 0) { - return null; - } - - return mergedParts as MessageCreate["content"]; + return mergeQueuedTurnInput(queued, { + normalizeUserContent: (content) => content, + }); } type ReflectionOverrides = { diff --git a/src/queue/turnQueueRuntime.ts b/src/queue/turnQueueRuntime.ts new file mode 100644 index 0000000..0f5c8b1 --- /dev/null +++ b/src/queue/turnQueueRuntime.ts @@ -0,0 +1,61 @@ +import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents"; + +type MessageContentParts = Exclude; + +export type QueuedTurnInput = + | { + kind: "user"; + content: TUserContent; + } + | { + kind: "task_notification"; + text: string; + }; + +type MergeQueuedTurnInputOptions = { + normalizeUserContent: (content: TUserContent) => MessageCreate["content"]; + separatorText?: string; +}; + +function appendContentParts( + target: MessageContentParts, + content: MessageCreate["content"], +): void { + if (typeof content === "string") { + target.push({ type: "text", text: content }); + return; + } + target.push(...content); +} + +export function mergeQueuedTurnInput( + queued: QueuedTurnInput[], + options: MergeQueuedTurnInputOptions, +): MessageCreate["content"] | null { + if (queued.length === 0) { + return null; + } + + const separatorText = options.separatorText ?? "\n"; + + const mergedParts: MessageContentParts = []; + let isFirst = true; + + for (const item of queued) { + if (!isFirst) { + mergedParts.push({ type: "text", text: separatorText }); + } + isFirst = false; + + if (item.kind === "task_notification") { + mergedParts.push({ type: "text", text: item.text }); + continue; + } + + appendContentParts(mergedParts, options.normalizeUserContent(item.content)); + } + + return mergedParts.length > 0 + ? (mergedParts as MessageCreate["content"]) + : null; +} diff --git a/src/tests/queue/turnQueueRuntime.test.ts b/src/tests/queue/turnQueueRuntime.test.ts new file mode 100644 index 0000000..b30b3ec --- /dev/null +++ b/src/tests/queue/turnQueueRuntime.test.ts @@ -0,0 +1,63 @@ +import { describe, expect, test } from "bun:test"; +import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents"; +import { + mergeQueuedTurnInput, + type QueuedTurnInput, +} from "../../queue/turnQueueRuntime"; + +describe("turnQueueRuntime", () => { + test("merges user and task notification entries with separators", () => { + const queued: QueuedTurnInput[] = [ + { kind: "user", content: "hello" }, + { + kind: "task_notification", + text: "done", + }, + { kind: "user", content: "world" }, + ]; + + const merged = mergeQueuedTurnInput(queued, { + normalizeUserContent: (content) => content, + }); + + expect(Array.isArray(merged)).toBe(true); + if (!Array.isArray(merged)) return; + const text = merged.flatMap((part) => + part.type === "text" ? [part.text] : [], + ); + expect(text.join("")).toBe( + "hello\ndone\nworld", + ); + }); + + test("preserves multimodal user content", () => { + const content = [ + { type: "text", text: "describe this" }, + { + type: "image", + source: { type: "base64", media_type: "image/png", data: "abc" }, + }, + ] as unknown as Exclude; + + const queued: QueuedTurnInput[] = [ + { kind: "user", content }, + ]; + + const merged = mergeQueuedTurnInput(queued, { + normalizeUserContent: (userContent) => userContent, + }); + + expect(Array.isArray(merged)).toBe(true); + if (!Array.isArray(merged)) return; + expect(merged[0]).toEqual(content[0]); + expect(merged[1]).toEqual(content[1]); + }); + + test("returns null when no queued items exist", () => { + expect( + mergeQueuedTurnInput([], { + normalizeUserContent: (content: string) => content, + }), + ).toBeNull(); + }); +});