From 7cc8729e570778589c583c274eae6477a797e624 Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Tue, 17 Feb 2026 11:38:29 -0800 Subject: [PATCH] feat: align headless bidirectional queueing with TUI runtime (#990) --- src/cli/helpers/messageQueueBridge.ts | 2 + src/headless.ts | 149 +++++++++++++++++++++++++- src/tests/headless/queueing.test.ts | 66 ++++++++++++ 3 files changed, 216 insertions(+), 1 deletion(-) create mode 100644 src/tests/headless/queueing.test.ts diff --git a/src/cli/helpers/messageQueueBridge.ts b/src/cli/helpers/messageQueueBridge.ts index 6d96166..018c3d4 100644 --- a/src/cli/helpers/messageQueueBridge.ts +++ b/src/cli/helpers/messageQueueBridge.ts @@ -15,6 +15,8 @@ export type QueuedMessage = { type QueueAdder = (message: QueuedMessage) => void; +// Global bridge is intentionally single-consumer. Each process runs either +// one TUI App instance or one headless bidirectional loop. let queueAdder: QueueAdder | null = null; const pendingMessages: QueuedMessage[] = []; const MAX_PENDING_MESSAGES = 10; diff --git a/src/headless.ts b/src/headless.ts index b3508b1..a6b04e8 100644 --- a/src/headless.ts +++ b/src/headless.ts @@ -41,6 +41,10 @@ import { type ReflectionTrigger, reflectionSettingsToLegacyMode, } from "./cli/helpers/memoryReminder"; +import { + type QueuedMessage, + setMessageQueueAdder, +} from "./cli/helpers/messageQueueBridge"; import { type DrainStreamHook, drainStreamWithResume, @@ -121,6 +125,54 @@ export function shouldReinjectSkillsAfterCompaction(lines: Line[]): boolean { ); } +type MessageContentParts = Exclude; + +export type BidirectionalQueuedInput = + | { + kind: "user"; + content: MessageCreate["content"]; + } + | { + kind: "task_notification"; + text: string; + }; + +export function mergeBidirectionalQueuedInput( + queued: BidirectionalQueuedInput[], +): MessageCreate["content"] | null { + if (queued.length === 0) { + return null; + } + + const mergedParts: MessageContentParts = []; + let isFirst = true; + + for (const item of queued) { + if (!isFirst) { + mergedParts.push({ type: "text", text: "\n" }); + } + isFirst = false; + + if (item.kind === "task_notification") { + mergedParts.push({ type: "text", text: item.text }); + continue; + } + + if (typeof item.content === "string") { + mergedParts.push({ type: "text", text: item.content }); + continue; + } + + mergedParts.push(...item.content); + } + + if (mergedParts.length === 0) { + return null; + } + + return mergedParts as MessageCreate["content"]; +} + type ReflectionOverrides = { trigger?: ReflectionTrigger; behavior?: ReflectionBehavior; @@ -2393,6 +2445,29 @@ async function runBidirectionalMode( const lineQueue: string[] = []; let lineResolver: ((line: string | null) => void) | null = null; + const serializeQueuedMessageAsUserLine = (queuedMessage: QueuedMessage) => + JSON.stringify({ + type: "user", + message: { + role: "user", + content: queuedMessage.text, + }, + _queuedKind: queuedMessage.kind, + }); + + // Connect Task/subagent background notifications to the same queueing path + // used by user input so bidirectional mode inherits TUI-style queue behavior. + setMessageQueueAdder((queuedMessage) => { + const syntheticUserLine = serializeQueuedMessageAsUserLine(queuedMessage); + if (lineResolver) { + const resolve = lineResolver; + lineResolver = null; + resolve(syntheticUserLine); + return; + } + lineQueue.push(syntheticUserLine); + }); + // Feed lines into queue or resolver rl.on("line", (line) => { if (lineResolver) { @@ -2405,6 +2480,7 @@ async function runBidirectionalMode( }); rl.on("close", () => { + setMessageQueueAdder(null); if (lineResolver) { const resolve = lineResolver; lineResolver = null; @@ -2669,7 +2745,77 @@ async function runBidirectionalMode( // Handle user messages if (message.type === "user" && message.message?.content !== undefined) { - const userContent = message.message.content; + const queuedInputs: BidirectionalQueuedInput[] = [ + { + kind: "user", + content: message.message.content, + }, + ]; + + // Batch any already-buffered user lines into the same turn, mirroring + // TUI queue dequeue behavior (single coalesced submit when idle). + while (lineQueue.length > 0) { + const candidate = lineQueue[0]; + if (!candidate?.trim()) { + lineQueue.shift(); + continue; + } + + let parsedCandidate: { + type?: string; + message?: { content?: MessageCreate["content"] }; + _queuedKind?: QueuedMessage["kind"]; + }; + try { + parsedCandidate = JSON.parse(candidate); + } catch { + // Leave malformed lines for the main loop to surface as parse errors. + break; + } + + if ( + parsedCandidate.type === "user" && + parsedCandidate.message?.content !== undefined + ) { + lineQueue.shift(); + if (parsedCandidate._queuedKind === "task_notification") { + const notificationText = + typeof parsedCandidate.message.content === "string" + ? parsedCandidate.message.content + : parsedCandidate.message.content + .reduce((texts: string[], part) => { + if ( + part.type === "text" && + "text" in part && + typeof part.text === "string" + ) { + texts.push(part.text); + } + return texts; + }, []) + .join(""); + queuedInputs.push({ + kind: "task_notification", + text: notificationText, + }); + } else { + queuedInputs.push({ + kind: "user", + content: parsedCandidate.message.content, + }); + } + continue; + } + + // Stop coalescing when the queue head is not a user-input line. + // The outer loop must process control/error/system lines in-order. + break; + } + + const userContent = mergeBidirectionalQueuedInput(queuedInputs); + if (userContent === null) { + continue; + } // Create abort controller for this operation currentAbortController = new AbortController(); @@ -3180,5 +3326,6 @@ async function runBidirectionalMode( } // Stdin closed, exit gracefully + setMessageQueueAdder(null); process.exit(0); } diff --git a/src/tests/headless/queueing.test.ts b/src/tests/headless/queueing.test.ts new file mode 100644 index 0000000..ce0ad39 --- /dev/null +++ b/src/tests/headless/queueing.test.ts @@ -0,0 +1,66 @@ +import { describe, expect, test } from "bun:test"; +import { readFileSync } from "node:fs"; +import { fileURLToPath } from "node:url"; +import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents"; +import { + type BidirectionalQueuedInput, + mergeBidirectionalQueuedInput, +} from "../../headless"; + +describe("headless bidirectional queue merging", () => { + test("merges queued user and task notification inputs into one content payload", () => { + const queued: BidirectionalQueuedInput[] = [ + { kind: "user", content: "first user message" }, + { + kind: "task_notification", + text: "done", + }, + { kind: "user", content: "second user message" }, + ]; + + const merged = mergeBidirectionalQueuedInput(queued); + expect(Array.isArray(merged)).toBe(true); + if (!Array.isArray(merged)) return; + + const textParts = merged.flatMap((part) => + part.type === "text" ? [part.text] : [], + ); + expect(textParts.join("")).toContain("first user message"); + expect(textParts.join("")).toContain(""); + expect(textParts.join("")).toContain("second user message"); + }); + + test("preserves multimodal user content parts", () => { + const multimodal = [ + { type: "text", text: "describe image" }, + { + type: "image", + source: { type: "base64", media_type: "image/png", data: "abc" }, + }, + ] as unknown as Exclude; + + const queued: BidirectionalQueuedInput[] = [ + { kind: "user", content: multimodal }, + ]; + + const merged = mergeBidirectionalQueuedInput(queued); + expect(Array.isArray(merged)).toBe(true); + if (!Array.isArray(merged)) return; + expect(merged[0]).toEqual(multimodal[0]); + expect(merged[1]).toEqual(multimodal[1]); + }); +}); + +describe("headless bidirectional queue wiring", () => { + test("registers and clears messageQueueBridge adder in bidirectional mode", () => { + const headlessPath = fileURLToPath( + new URL("../../headless.ts", import.meta.url), + ); + const source = readFileSync(headlessPath, "utf-8"); + + expect(source).toContain("setMessageQueueAdder((queuedMessage) =>"); + expect(source).toContain("serializeQueuedMessageAsUserLine"); + expect(source).toContain("_queuedKind"); + expect(source).toContain("setMessageQueueAdder(null)"); + }); +});