From 725a2a7cadad2530c0d405d0c07cdd691ad2bd5a Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Wed, 31 Dec 2025 19:19:35 -0800 Subject: [PATCH] fix: update stream-json output format for SDK compatibility (#442) Co-authored-by: Letta --- .github/workflows/ci.yml | 2 + src/headless.ts | 66 +++++- src/index.ts | 3 + src/tests/headless-stream-json-format.test.ts | 190 ++++++++++++++++++ 4 files changed, 254 insertions(+), 7 deletions(-) create mode 100644 src/tests/headless-stream-json-format.test.ts diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 17b107b..7a80ec3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -71,6 +71,8 @@ jobs: uses: t1m0thyj/unlock-keyring@v1 - name: Run tests (extended timeout) + env: + LETTA_API_KEY: ${{ secrets.LETTA_API_KEY }} run: bun test --timeout 15000 - name: Build bundle diff --git a/src/headless.ts b/src/headless.ts index 79f5023..9f88d6c 100644 --- a/src/headless.ts +++ b/src/headless.ts @@ -55,6 +55,7 @@ export async function handleHeadlessCommand( toolset: { type: "string" }, prompt: { type: "boolean", short: "p" }, "output-format": { type: "string" }, + "include-partial-messages": { type: "boolean" }, // Additional flags from index.ts that need to be filtered out help: { type: "boolean", short: "h" }, version: { type: "boolean", short: "v" }, @@ -391,6 +392,7 @@ export async function handleHeadlessCommand( // Validate output format const outputFormat = (values["output-format"] as string | undefined) || "text"; + const includePartialMessages = Boolean(values["include-partial-messages"]); if (!["text", "json", "stream-json"].includes(outputFormat)) { console.error( `Error: Invalid output format "${outputFormat}". Valid formats: text, json, stream-json`, @@ -404,13 +406,20 @@ export async function handleHeadlessCommand( // Initialize session stats const sessionStats = new SessionStats(); + // Use agent.id as session_id for all stream-json messages + const sessionId = agent.id; + // Output init event for stream-json format if (outputFormat === "stream-json") { const initEvent = { - type: "init", + type: "system", + subtype: "init", + session_id: sessionId, agent_id: agent.id, model: agent.llm_config?.model, tools: agent.tools?.map((t) => t.name) || [], + cwd: process.cwd(), + uuid: `init-${agent.id}`, }; console.log(JSON.stringify(initEvent)); } @@ -509,6 +518,8 @@ export async function handleHeadlessCommand( tool_name: decision.approval.toolName, tool_call_id: decision.approval.toolCallId, tool_args: decision.approval.toolArgs, + session_id: sessionId, + uuid: `auto-approval-${decision.approval.toolCallId}`, }), ); } @@ -624,6 +635,8 @@ export async function handleHeadlessCommand( type: "error", message: fullErrorText, detail: errorDetail, + session_id: sessionId, + uuid: crypto.randomUUID(), }), ); @@ -758,6 +771,8 @@ export async function handleHeadlessCommand( tool_args: incomingArgs, reason: permission.reason, matched_rule: permission.matchedRule, + session_id: sessionId, + uuid: `auto-approval-${id}`, }), ); autoApprovalEmitted.add(id); @@ -769,12 +784,34 @@ export async function handleHeadlessCommand( // Output chunk as message event (unless filtered) if (shouldOutputChunk) { - console.log( - JSON.stringify({ - type: "message", - ...chunk, - }), - ); + // Use existing otid or id from the Letta SDK chunk + const chunkWithIds = chunk as typeof chunk & { + otid?: string; + id?: string; + }; + const uuid = chunkWithIds.otid || chunkWithIds.id; + + if (includePartialMessages) { + // Emit as stream_event wrapper (like Claude Code with --include-partial-messages) + console.log( + JSON.stringify({ + type: "stream_event", + event: chunk, + session_id: sessionId, + uuid, + }), + ); + } else { + // Emit as regular message (default) + console.log( + JSON.stringify({ + type: "message", + ...chunk, + session_id: sessionId, + uuid, + }), + ); + } } // Still accumulate for approval tracking @@ -939,6 +976,8 @@ export async function handleHeadlessCommand( max_attempts: LLM_API_ERROR_MAX_RETRIES, delay_ms: delayMs, run_id: lastRunId, + session_id: sessionId, + uuid: `retry-${lastRunId || crypto.randomUUID()}`, }), ); } else { @@ -1011,6 +1050,8 @@ export async function handleHeadlessCommand( max_attempts: LLM_API_ERROR_MAX_RETRIES, delay_ms: delayMs, run_id: lastRunId, + session_id: sessionId, + uuid: `retry-${lastRunId || crypto.randomUUID()}`, }), ); } else { @@ -1077,6 +1118,8 @@ export async function handleHeadlessCommand( message: errorMessage, stop_reason: stopReason, run_id: lastRunId, + session_id: sessionId, + uuid: `error-${lastRunId || crypto.randomUUID()}`, }), ); } else { @@ -1097,6 +1140,8 @@ export async function handleHeadlessCommand( type: "error", message: errorDetails, run_id: lastKnownRunId, + session_id: sessionId, + uuid: `error-${lastKnownRunId || crypto.randomUUID()}`, }), ); } else { @@ -1177,10 +1222,16 @@ export async function handleHeadlessCommand( } } + // Use the last run_id as the result uuid if available, otherwise derive from agent_id + const resultUuid = + allRunIds.size > 0 + ? `result-${Array.from(allRunIds).pop()}` + : `result-${agent.id}`; const resultEvent = { type: "result", subtype: "success", is_error: false, + session_id: sessionId, duration_ms: Math.round(stats.totalWallMs), duration_api_ms: Math.round(stats.totalApiMs), num_turns: stats.usage.stepCount, @@ -1192,6 +1243,7 @@ export async function handleHeadlessCommand( completion_tokens: stats.usage.completionTokens, total_tokens: stats.usage.totalTokens, }, + uuid: resultUuid, }; console.log(JSON.stringify(resultEvent)); } else { diff --git a/src/index.ts b/src/index.ts index d4d2e96..ab1f544 100755 --- a/src/index.ts +++ b/src/index.ts @@ -57,6 +57,8 @@ OPTIONS -p, --prompt Headless prompt mode --output-format Output format for headless mode (text, json, stream-json) Default: text + --include-partial-messages + Emit stream_event wrappers for each chunk (stream-json only) --skills Custom path to skills directory (default: .skills in current directory) --sleeptime Enable sleeptime memory management (only for new agents) --from-af Create agent from an AgentFile (.af) template @@ -337,6 +339,7 @@ async function main(): Promise { "permission-mode": { type: "string" }, yolo: { type: "boolean" }, "output-format": { type: "string" }, + "include-partial-messages": { type: "boolean" }, skills: { type: "string" }, link: { type: "boolean" }, unlink: { type: "boolean" }, diff --git a/src/tests/headless-stream-json-format.test.ts b/src/tests/headless-stream-json-format.test.ts new file mode 100644 index 0000000..a84c8ff --- /dev/null +++ b/src/tests/headless-stream-json-format.test.ts @@ -0,0 +1,190 @@ +import { describe, expect, test } from "bun:test"; +import { spawn } from "node:child_process"; + +/** + * Tests for stream-json output format. + * These verify the message structure matches the SDK-compatible format. + */ + +async function runHeadlessCommand( + prompt: string, + extraArgs: string[] = [], +): Promise { + return new Promise((resolve, reject) => { + const proc = spawn( + "bun", + [ + "run", + "dev", + "--new", + "-p", + prompt, + "--output-format", + "stream-json", + "--yolo", + "-m", + "haiku", + ...extraArgs, + ], + { + cwd: process.cwd(), + env: { ...process.env }, + }, + ); + + let stdout = ""; + let stderr = ""; + + proc.stdout.on("data", (data) => { + stdout += data.toString(); + }); + + proc.stderr.on("data", (data) => { + stderr += data.toString(); + }); + + proc.on("close", (code) => { + if (code !== 0 && !stdout.includes('"type":"result"')) { + reject(new Error(`Process exited with code ${code}: ${stderr}`)); + } else { + // Parse line-delimited JSON + const lines = stdout + .split("\n") + .filter((line) => line.trim()) + .filter((line) => { + try { + JSON.parse(line); + return true; + } catch { + return false; + } + }); + resolve(lines); + } + }); + }); +} + +// Prescriptive prompt to ensure single-step response without tool use +const FAST_PROMPT = + "This is a test. Do not call any tools. Just respond with the word OK and nothing else."; + +describe("stream-json format", () => { + test( + "init message has type 'system' with subtype 'init'", + async () => { + const lines = await runHeadlessCommand(FAST_PROMPT); + const initLine = lines.find((line) => { + const obj = JSON.parse(line); + return obj.type === "system" && obj.subtype === "init"; + }); + + expect(initLine).toBeDefined(); + + const init = JSON.parse(initLine!); + expect(init.type).toBe("system"); + expect(init.subtype).toBe("init"); + expect(init.agent_id).toBeDefined(); + expect(init.session_id).toBe(init.agent_id); // session_id should equal agent_id + expect(init.model).toBeDefined(); + expect(init.tools).toBeInstanceOf(Array); + expect(init.cwd).toBeDefined(); + expect(init.uuid).toBe(`init-${init.agent_id}`); + }, + { timeout: 60000 }, + ); + + test( + "messages have session_id and uuid", + async () => { + const lines = await runHeadlessCommand(FAST_PROMPT); + + // Find a message line + const messageLine = lines.find((line) => { + const obj = JSON.parse(line); + return obj.type === "message"; + }); + + expect(messageLine).toBeDefined(); + + const msg = JSON.parse(messageLine!); + expect(msg.session_id).toBeDefined(); + expect(msg.uuid).toBeDefined(); + // uuid should be otid or id from the Letta SDK chunk + expect(msg.uuid).toBeTruthy(); + }, + { timeout: 60000 }, + ); + + test( + "result message has correct format", + async () => { + const lines = await runHeadlessCommand(FAST_PROMPT); + const resultLine = lines.find((line) => { + const obj = JSON.parse(line); + return obj.type === "result"; + }); + + expect(resultLine).toBeDefined(); + + const result = JSON.parse(resultLine!); + expect(result.type).toBe("result"); + expect(result.subtype).toBe("success"); + expect(result.session_id).toBeDefined(); + expect(result.agent_id).toBeDefined(); + expect(result.session_id).toBe(result.agent_id); + expect(result.duration_ms).toBeGreaterThan(0); + expect(result.uuid).toContain("result-"); + expect(result.result).toBeDefined(); + }, + { timeout: 60000 }, + ); + + test( + "--include-partial-messages wraps chunks in stream_event", + async () => { + const lines = await runHeadlessCommand(FAST_PROMPT, [ + "--include-partial-messages", + ]); + + // Find a stream_event line + const streamEventLine = lines.find((line) => { + const obj = JSON.parse(line); + return obj.type === "stream_event"; + }); + + expect(streamEventLine).toBeDefined(); + + const event = JSON.parse(streamEventLine!); + expect(event.type).toBe("stream_event"); + expect(event.event).toBeDefined(); + expect(event.session_id).toBeDefined(); + expect(event.uuid).toBeDefined(); + // The event should contain the original Letta SDK chunk + expect(event.event.message_type).toBeDefined(); + }, + { timeout: 60000 }, + ); + + test( + "without --include-partial-messages, messages are type 'message'", + async () => { + const lines = await runHeadlessCommand(FAST_PROMPT); + + // Should have message lines, not stream_event + const messageLine = lines.find((line) => { + const obj = JSON.parse(line); + return obj.type === "message"; + }); + + const streamEventLine = lines.find((line) => { + const obj = JSON.parse(line); + return obj.type === "stream_event"; + }); + + expect(messageLine).toBeDefined(); + expect(streamEventLine).toBeUndefined(); + }, + { timeout: 60000 }, + ); +});