From 397560ef004fa00d7b5be1c4f7052292dc881dc6 Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Thu, 1 Jan 2026 20:05:41 -0800 Subject: [PATCH] feat: add typed wire format for stream-json protocol (#445) Co-authored-by: Letta --- build.js | 5 + package.json | 9 +- src/headless.ts | 424 ++++++++++-------- src/tests/headless-input-format.test.ts | 146 +++--- src/tests/headless-stream-json-format.test.ts | 24 +- src/types/wire.ts | 255 +++++++++++ tsconfig.types.json | 11 + 7 files changed, 618 insertions(+), 256 deletions(-) create mode 100644 src/types/wire.ts create mode 100644 tsconfig.types.json diff --git a/build.js b/build.js index b68a06c..a602ff2 100644 --- a/build.js +++ b/build.js @@ -75,6 +75,11 @@ if (existsSync(bundledSkillsSrc)) { console.log("📂 Copied bundled skills to skills/"); } +// Generate type declarations for wire types export +console.log("📝 Generating type declarations..."); +await Bun.$`bunx tsc -p tsconfig.types.json`; +console.log(" Output: dist/types/wire.d.ts"); + console.log("✅ Build complete!"); console.log(` Output: letta.js`); console.log(` Size: ${(Bun.file(outputPath).size / 1024).toFixed(0)}KB`); diff --git a/package.json b/package.json index 1ec31f2..f868e1f 100644 --- a/package.json +++ b/package.json @@ -12,8 +12,15 @@ "letta.js", "scripts", "skills", - "vendor" + "vendor", + "dist/types" ], + "exports": { + ".": "./letta.js", + "./wire-types": { + "types": "./dist/types/wire.d.ts" + } + }, "repository": { "type": "git", "url": "https://github.com/letta-ai/letta-code.git" diff --git a/src/headless.ts b/src/headless.ts index 1e65335..7e24e45 100644 --- a/src/headless.ts +++ b/src/headless.ts @@ -4,7 +4,10 @@ import type { AgentState, MessageCreate, } from "@letta-ai/letta-client/resources/agents/agents"; -import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages"; +import type { + ApprovalCreate, + LettaStreamingResponse, +} from "@letta-ai/letta-client/resources/agents/messages"; import type { StopReasonType } from "@letta-ai/letta-client/resources/runs/runs"; import type { ApprovalResult } from "./agent/approval-execution"; import { getClient } from "./agent/client"; @@ -28,6 +31,16 @@ import { forceUpsertTools, isToolsNotFoundError, } from "./tools/manager"; +import type { + AutoApprovalMessage, + ControlResponse, + ErrorMessage, + MessageWire, + ResultMessage, + RetryMessage, + StreamEvent, + SystemInitMessage, +} from "./types/wire"; // Maximum number of times to retry a turn when the backend // reports an `llm_api_error` stop reason. This helps smooth @@ -434,14 +447,18 @@ export async function handleHeadlessCommand( // Output init event for stream-json format if (outputFormat === "stream-json") { - const initEvent = { + const initEvent: SystemInitMessage = { type: "system", subtype: "init", session_id: sessionId, agent_id: agent.id, - model: agent.llm_config?.model, - tools: agent.tools?.map((t) => t.name) || [], + model: agent.llm_config?.model ?? "", + tools: + agent.tools?.map((t) => t.name).filter((n): n is string => !!n) || [], cwd: process.cwd(), + mcp_servers: [], + permission_mode: "", + slash_commands: [], uuid: `init-${agent.id}`, }; console.log(JSON.stringify(initEvent)); @@ -468,6 +485,8 @@ export async function handleHeadlessCommand( toolName: string; toolArgs: string; }; + reason: string; + matchedRule: string; } | { type: "deny"; @@ -523,6 +542,8 @@ export async function handleHeadlessCommand( decisions.push({ type: "approve", approval: currentApproval, + reason: permission.reason || "Allowed by permission rule", + matchedRule: permission.matchedRule || "auto-approved", }); } @@ -535,16 +556,19 @@ export async function handleHeadlessCommand( if (outputFormat === "stream-json") { for (const decision of decisions) { if (decision.type === "approve") { - console.log( - JSON.stringify({ - type: "auto_approval", - tool_name: decision.approval.toolName, + const autoApprovalMsg: AutoApprovalMessage = { + type: "auto_approval", + tool_call: { + name: decision.approval.toolName, tool_call_id: decision.approval.toolCallId, - tool_args: decision.approval.toolArgs, - session_id: sessionId, - uuid: `auto-approval-${decision.approval.toolCallId}`, - }), - ); + arguments: decision.approval.toolArgs, + }, + reason: decision.reason, + matched_rule: decision.matchedRule, + session_id: sessionId, + uuid: `auto-approval-${decision.approval.toolCallId}`, + }; + console.log(JSON.stringify(autoApprovalMsg)); } } } @@ -640,28 +664,54 @@ export async function handleHeadlessCommand( runIds.add(chunk.run_id); } - // Detect mid-stream errors (errors without message_type) + // Detect mid-stream errors + // Case 1: LettaErrorMessage from the API (has message_type: "error_message") + if ( + "message_type" in chunk && + chunk.message_type === "error_message" + ) { + // This is a LettaErrorMessage - nest it in our wire format + const apiError = chunk as LettaStreamingResponse.LettaErrorMessage; + const errorEvent: ErrorMessage = { + type: "error", + message: apiError.message, + stop_reason: "error", + run_id: apiError.run_id, + api_error: apiError, + session_id: sessionId, + uuid: crypto.randomUUID(), + }; + console.log(JSON.stringify(errorEvent)); + + // Still accumulate for tracking + const { onChunk: accumulatorOnChunk } = await import( + "./cli/helpers/accumulator" + ); + accumulatorOnChunk(buffers, chunk); + continue; + } + + // Case 2: Generic error object without message_type const chunkWithError = chunk as typeof chunk & { error?: { message?: string; detail?: string }; }; - if (chunkWithError.error && !chunk.message_type) { + if (chunkWithError.error && !("message_type" in chunk)) { // Emit as error event - const errorMsg = + const errorText = chunkWithError.error.message || "An error occurred"; const errorDetail = chunkWithError.error.detail || ""; const fullErrorText = errorDetail - ? `${errorMsg}: ${errorDetail}` - : errorMsg; + ? `${errorText}: ${errorDetail}` + : errorText; - console.log( - JSON.stringify({ - type: "error", - message: fullErrorText, - detail: errorDetail, - session_id: sessionId, - uuid: crypto.randomUUID(), - }), - ); + const errorEvent: ErrorMessage = { + type: "error", + message: fullErrorText, + stop_reason: "error", + session_id: sessionId, + uuid: crypto.randomUUID(), + }; + console.log(JSON.stringify(errorEvent)); // Still accumulate for tracking const { onChunk: accumulatorOnChunk } = await import( @@ -786,18 +836,19 @@ export async function handleHeadlessCommand( ); if (missing.length === 0) { shouldOutputChunk = false; - console.log( - JSON.stringify({ - type: "auto_approval", - tool_name: nextName, + const autoApprovalMsg: AutoApprovalMessage = { + type: "auto_approval", + tool_call: { + name: nextName, tool_call_id: id, - tool_args: incomingArgs, - reason: permission.reason, - matched_rule: permission.matchedRule, - session_id: sessionId, - uuid: `auto-approval-${id}`, - }), - ); + arguments: incomingArgs || "{}", + }, + reason: permission.reason || "Allowed by permission rule", + matched_rule: permission.matchedRule || "auto-approved", + session_id: sessionId, + uuid: `auto-approval-${id}`, + }; + console.log(JSON.stringify(autoApprovalMsg)); autoApprovalEmitted.add(id); } } @@ -816,24 +867,22 @@ export async function handleHeadlessCommand( if (includePartialMessages) { // Emit as stream_event wrapper (like Claude Code with --include-partial-messages) - console.log( - JSON.stringify({ - type: "stream_event", - event: chunk, - session_id: sessionId, - uuid, - }), - ); + const streamEvent: StreamEvent = { + type: "stream_event", + event: chunk, + session_id: sessionId, + uuid: uuid || crypto.randomUUID(), + }; + console.log(JSON.stringify(streamEvent)); } else { // Emit as regular message (default) - console.log( - JSON.stringify({ - type: "message", - ...chunk, - session_id: sessionId, - uuid, - }), - ); + const msg: MessageWire = { + type: "message", + ...chunk, + session_id: sessionId, + uuid: uuid || crypto.randomUUID(), + }; + console.log(JSON.stringify(msg)); } } @@ -991,18 +1040,17 @@ export async function handleHeadlessCommand( llmApiErrorRetries = attempt; if (outputFormat === "stream-json") { - console.log( - JSON.stringify({ - type: "retry", - reason: "llm_api_error", - attempt, - max_attempts: LLM_API_ERROR_MAX_RETRIES, - delay_ms: delayMs, - run_id: lastRunId, - session_id: sessionId, - uuid: `retry-${lastRunId || crypto.randomUUID()}`, - }), - ); + const retryMsg: RetryMessage = { + type: "retry", + reason: "llm_api_error", + attempt, + max_attempts: LLM_API_ERROR_MAX_RETRIES, + delay_ms: delayMs, + run_id: lastRunId ?? undefined, + session_id: sessionId, + uuid: `retry-${lastRunId || crypto.randomUUID()}`, + }; + console.log(JSON.stringify(retryMsg)); } else { const delaySeconds = Math.round(delayMs / 1000); console.error( @@ -1065,18 +1113,17 @@ export async function handleHeadlessCommand( llmApiErrorRetries = attempt; if (outputFormat === "stream-json") { - console.log( - JSON.stringify({ - type: "retry", - reason: "llm_api_error", - attempt, - max_attempts: LLM_API_ERROR_MAX_RETRIES, - delay_ms: delayMs, - run_id: lastRunId, - session_id: sessionId, - uuid: `retry-${lastRunId || crypto.randomUUID()}`, - }), - ); + const retryMsg: RetryMessage = { + type: "retry", + reason: "llm_api_error", + attempt, + max_attempts: LLM_API_ERROR_MAX_RETRIES, + delay_ms: delayMs, + run_id: lastRunId ?? undefined, + session_id: sessionId, + uuid: `retry-${lastRunId || crypto.randomUUID()}`, + }; + console.log(JSON.stringify(retryMsg)); } else { const delaySeconds = Math.round(delayMs / 1000); console.error( @@ -1135,16 +1182,15 @@ export async function handleHeadlessCommand( if (outputFormat === "stream-json") { // Emit error event - console.log( - JSON.stringify({ - type: "error", - message: errorMessage, - stop_reason: stopReason, - run_id: lastRunId, - session_id: sessionId, - uuid: `error-${lastRunId || crypto.randomUUID()}`, - }), - ); + const errorMsg: ErrorMessage = { + type: "error", + message: errorMessage, + stop_reason: stopReason, + run_id: lastRunId ?? undefined, + session_id: sessionId, + uuid: `error-${lastRunId || crypto.randomUUID()}`, + }; + console.log(JSON.stringify(errorMsg)); } else { console.error(`Error: ${errorMessage}`); } @@ -1158,15 +1204,15 @@ export async function handleHeadlessCommand( const errorDetails = formatErrorDetails(error, agent.id); if (outputFormat === "stream-json") { - console.log( - JSON.stringify({ - type: "error", - message: errorDetails, - run_id: lastKnownRunId, - session_id: sessionId, - uuid: `error-${lastKnownRunId || crypto.randomUUID()}`, - }), - ); + const errorMsg: ErrorMessage = { + type: "error", + message: errorDetails, + stop_reason: "error", + run_id: lastKnownRunId ?? undefined, + session_id: sessionId, + uuid: `error-${lastKnownRunId || crypto.randomUUID()}`, + }; + console.log(JSON.stringify(errorMsg)); } else { console.error(`Error: ${errorDetails}`); } @@ -1250,10 +1296,9 @@ export async function handleHeadlessCommand( allRunIds.size > 0 ? `result-${Array.from(allRunIds).pop()}` : `result-${agent.id}`; - const resultEvent = { + const resultEvent: ResultMessage = { type: "result", subtype: "success", - is_error: false, session_id: sessionId, duration_ms: Math.round(stats.totalWallMs), duration_api_ms: Math.round(stats.totalApiMs), @@ -1330,14 +1375,14 @@ async function runBidirectionalMode( try { message = JSON.parse(line); } catch { - console.log( - JSON.stringify({ - type: "error", - message: "Invalid JSON input", - session_id: sessionId, - uuid: crypto.randomUUID(), - }), - ); + const errorMsg: ErrorMessage = { + type: "error", + message: "Invalid JSON input", + stop_reason: "error", + session_id: sessionId, + uuid: crypto.randomUUID(), + }; + console.log(JSON.stringify(errorMsg)); continue; } @@ -1348,52 +1393,49 @@ async function runBidirectionalMode( if (subtype === "initialize") { // Return session info - console.log( - JSON.stringify({ - type: "control_response", + const initResponse: ControlResponse = { + type: "control_response", + response: { + subtype: "success", + request_id: requestId ?? "", response: { - subtype: "success", - request_id: requestId, - response: { - agent_id: agent.id, - model: agent.llm_config?.model, - tools: agent.tools?.map((t) => t.name) || [], - }, + agent_id: agent.id, + model: agent.llm_config?.model, + tools: agent.tools?.map((t) => t.name) || [], }, - session_id: sessionId, - uuid: crypto.randomUUID(), - }), - ); + }, + session_id: sessionId, + uuid: crypto.randomUUID(), + }; + console.log(JSON.stringify(initResponse)); } else if (subtype === "interrupt") { // Abort current operation if any if (currentAbortController !== null) { (currentAbortController as AbortController).abort(); currentAbortController = null; } - console.log( - JSON.stringify({ - type: "control_response", - response: { - subtype: "success", - request_id: requestId, - }, - session_id: sessionId, - uuid: crypto.randomUUID(), - }), - ); + const interruptResponse: ControlResponse = { + type: "control_response", + response: { + subtype: "success", + request_id: requestId ?? "", + }, + session_id: sessionId, + uuid: crypto.randomUUID(), + }; + console.log(JSON.stringify(interruptResponse)); } else { - console.log( - JSON.stringify({ - type: "control_response", - response: { - subtype: "error", - request_id: requestId, - message: `Unknown control request subtype: ${subtype}`, - }, - session_id: sessionId, - uuid: crypto.randomUUID(), - }), - ); + const errorResponse: ControlResponse = { + type: "control_response", + response: { + subtype: "error", + request_id: requestId ?? "", + error: `Unknown control request subtype: ${subtype}`, + }, + session_id: sessionId, + uuid: crypto.randomUUID(), + }; + console.log(JSON.stringify(errorResponse)); } continue; } @@ -1429,23 +1471,21 @@ async function runBidirectionalMode( const uuid = chunkWithIds.otid || chunkWithIds.id; if (includePartialMessages) { - console.log( - JSON.stringify({ - type: "stream_event", - event: chunk, - session_id: sessionId, - uuid, - }), - ); + const streamEvent: StreamEvent = { + type: "stream_event", + event: chunk, + session_id: sessionId, + uuid: uuid || crypto.randomUUID(), + }; + console.log(JSON.stringify(streamEvent)); } else { - console.log( - JSON.stringify({ - type: "message", - ...chunk, - session_id: sessionId, - uuid, - }), - ); + const msg: MessageWire = { + type: "message", + ...chunk, + session_id: sessionId, + uuid: uuid || crypto.randomUUID(), + }; + console.log(JSON.stringify(msg)); } // Accumulate for result @@ -1466,30 +1506,32 @@ async function runBidirectionalMode( ) as Extract | undefined; const resultText = lastAssistant?.text || ""; - console.log( - JSON.stringify({ - type: "result", - subtype: currentAbortController?.signal.aborted - ? "interrupted" - : "success", - is_error: false, - session_id: sessionId, - duration_ms: Math.round(durationMs), - result: resultText, - agent_id: agent.id, - uuid: `result-${agent.id}-${Date.now()}`, - }), - ); + const resultMsg: ResultMessage = { + type: "result", + subtype: currentAbortController?.signal.aborted + ? "interrupted" + : "success", + session_id: sessionId, + duration_ms: Math.round(durationMs), + duration_api_ms: 0, // Not tracked in bidirectional mode + num_turns: 1, + result: resultText, + agent_id: agent.id, + run_ids: [], + usage: null, + uuid: `result-${agent.id}-${Date.now()}`, + }; + console.log(JSON.stringify(resultMsg)); } catch (error) { - console.log( - JSON.stringify({ - type: "error", - message: - error instanceof Error ? error.message : "Unknown error occurred", - session_id: sessionId, - uuid: crypto.randomUUID(), - }), - ); + const errorMsg: ErrorMessage = { + type: "error", + message: + error instanceof Error ? error.message : "Unknown error occurred", + stop_reason: "error", + session_id: sessionId, + uuid: crypto.randomUUID(), + }; + console.log(JSON.stringify(errorMsg)); } finally { currentAbortController = null; } @@ -1497,14 +1539,14 @@ async function runBidirectionalMode( } // Unknown message type - console.log( - JSON.stringify({ - type: "error", - message: `Unknown message type: ${message.type}`, - session_id: sessionId, - uuid: crypto.randomUUID(), - }), - ); + const errorMsg: ErrorMessage = { + type: "error", + message: `Unknown message type: ${message.type}`, + stop_reason: "error", + session_id: sessionId, + uuid: crypto.randomUUID(), + }; + console.log(JSON.stringify(errorMsg)); } // Stdin closed, exit gracefully diff --git a/src/tests/headless-input-format.test.ts b/src/tests/headless-input-format.test.ts index d4142d4..92a6361 100644 --- a/src/tests/headless-input-format.test.ts +++ b/src/tests/headless-input-format.test.ts @@ -1,9 +1,17 @@ import { describe, expect, test } from "bun:test"; import { spawn } from "node:child_process"; +import type { + ControlResponse, + ErrorMessage, + ResultMessage, + StreamEvent, + SystemInitMessage, + WireMessage, +} from "../types/wire"; /** * Tests for --input-format stream-json bidirectional communication. - * These verify the SDK can communicate with the CLI via stdin/stdout. + * These verify the CLI's wire format for bidirectional communication. */ // Prescriptive prompt to ensure single-step response without tool use @@ -57,7 +65,7 @@ async function runBidirectional( let inputIndex = 0; const writeNextInput = () => { if (inputIndex < inputs.length) { - proc.stdin?.write(inputs[inputIndex] + "\n"); + proc.stdin?.write(`${inputs[inputIndex]}\n`); inputIndex++; setTimeout(writeNextInput, 1000); // 1s between inputs } else { @@ -108,32 +116,35 @@ describe("input-format stream-json", () => { test( "initialize control request returns session info", async () => { - const objects = await runBidirectional([ + const objects = (await runBidirectional([ JSON.stringify({ type: "control_request", request_id: "init_1", request: { subtype: "initialize" }, }), - ]); + ])) as WireMessage[]; // Should have init event const initEvent = objects.find( - (o: any) => o.type === "system" && o.subtype === "init", + (o): o is SystemInitMessage => + o.type === "system" && "subtype" in o && o.subtype === "init", ); expect(initEvent).toBeDefined(); - expect((initEvent as any).agent_id).toBeDefined(); - expect((initEvent as any).session_id).toBeDefined(); - expect((initEvent as any).model).toBeDefined(); - expect((initEvent as any).tools).toBeInstanceOf(Array); + expect(initEvent?.agent_id).toBeDefined(); + expect(initEvent?.session_id).toBeDefined(); + expect(initEvent?.model).toBeDefined(); + expect(initEvent?.tools).toBeInstanceOf(Array); // Should have control_response const controlResponse = objects.find( - (o: any) => o.type === "control_response", + (o): o is ControlResponse => o.type === "control_response", ); expect(controlResponse).toBeDefined(); - expect((controlResponse as any).response.subtype).toBe("success"); - expect((controlResponse as any).response.request_id).toBe("init_1"); - expect((controlResponse as any).response.response.agent_id).toBeDefined(); + expect(controlResponse?.response.subtype).toBe("success"); + expect(controlResponse?.response.request_id).toBe("init_1"); + if (controlResponse?.response.subtype === "success") { + expect(controlResponse.response.response?.agent_id).toBeDefined(); + } }, { timeout: 30000 }, ); @@ -141,7 +152,7 @@ describe("input-format stream-json", () => { test( "user message returns assistant response and result", async () => { - const objects = await runBidirectional( + const objects = (await runBidirectional( [ JSON.stringify({ type: "user", @@ -150,41 +161,47 @@ describe("input-format stream-json", () => { ], [], 10000, - ); + )) as WireMessage[]; // Should have init event const initEvent = objects.find( - (o: any) => o.type === "system" && o.subtype === "init", + (o): o is SystemInitMessage => + o.type === "system" && "subtype" in o && o.subtype === "init", ); expect(initEvent).toBeDefined(); // Should have message events - const messageEvents = objects.filter((o: any) => o.type === "message"); + const messageEvents = objects.filter( + (o): o is WireMessage & { type: "message" } => o.type === "message", + ); expect(messageEvents.length).toBeGreaterThan(0); // All messages should have session_id // uuid is present on content messages (reasoning, assistant) but not meta messages (stop_reason, usage_statistics) for (const msg of messageEvents) { - expect((msg as any).session_id).toBeDefined(); + expect(msg.session_id).toBeDefined(); } // Content messages should have uuid const contentMessages = messageEvents.filter( - (m: any) => - m.message_type === "reasoning_message" || - m.message_type === "assistant_message", + (m) => + "message_type" in m && + (m.message_type === "reasoning_message" || + m.message_type === "assistant_message"), ); for (const msg of contentMessages) { - expect((msg as any).uuid).toBeDefined(); + expect(msg.uuid).toBeDefined(); } // Should have result - const result = objects.find((o: any) => o.type === "result"); + const result = objects.find( + (o): o is ResultMessage => o.type === "result", + ); expect(result).toBeDefined(); - expect((result as any).subtype).toBe("success"); - expect((result as any).session_id).toBeDefined(); - expect((result as any).agent_id).toBeDefined(); - expect((result as any).duration_ms).toBeGreaterThan(0); + expect(result?.subtype).toBe("success"); + expect(result?.session_id).toBeDefined(); + expect(result?.agent_id).toBeDefined(); + expect(result?.duration_ms).toBeGreaterThan(0); }, { timeout: 60000 }, ); @@ -192,7 +209,7 @@ describe("input-format stream-json", () => { test( "multi-turn conversation maintains context", async () => { - const objects = await runBidirectional( + const objects = (await runBidirectional( [ JSON.stringify({ type: "user", @@ -211,23 +228,29 @@ describe("input-format stream-json", () => { ], [], 20000, - ); + )) as WireMessage[]; // Should have at least two results (one per turn) - const results = objects.filter((o: any) => o.type === "result"); + const results = objects.filter( + (o): o is ResultMessage => o.type === "result", + ); expect(results.length).toBeGreaterThanOrEqual(2); // Both results should be successful for (const result of results) { - expect((result as any).subtype).toBe("success"); - expect((result as any).session_id).toBeDefined(); - expect((result as any).agent_id).toBeDefined(); + expect(result.subtype).toBe("success"); + expect(result.session_id).toBeDefined(); + expect(result.agent_id).toBeDefined(); } // The session_id should be consistent across turns (same agent) - const firstSessionId = (results[0] as any).session_id; - const lastSessionId = (results[results.length - 1] as any).session_id; - expect(firstSessionId).toBe(lastSessionId); + const firstResult = results[0]; + const lastResult = results[results.length - 1]; + expect(firstResult).toBeDefined(); + expect(lastResult).toBeDefined(); + if (firstResult && lastResult) { + expect(firstResult.session_id).toBe(lastResult.session_id); + } }, { timeout: 120000 }, ); @@ -235,7 +258,7 @@ describe("input-format stream-json", () => { test( "interrupt control request is acknowledged", async () => { - const objects = await runBidirectional( + const objects = (await runBidirectional( [ JSON.stringify({ type: "control_request", @@ -245,15 +268,15 @@ describe("input-format stream-json", () => { ], [], 8000, // Longer wait for CI - ); + )) as WireMessage[]; // Should have control_response for interrupt const controlResponse = objects.find( - (o: any) => + (o): o is ControlResponse => o.type === "control_response" && o.response?.request_id === "int_1", ); expect(controlResponse).toBeDefined(); - expect((controlResponse as any).response.subtype).toBe("success"); + expect(controlResponse?.response.subtype).toBe("success"); }, { timeout: 30000 }, ); @@ -261,7 +284,7 @@ describe("input-format stream-json", () => { test( "--include-partial-messages emits stream_event in bidirectional mode", async () => { - const objects = await runBidirectional( + const objects = (await runBidirectional( [ JSON.stringify({ type: "user", @@ -270,35 +293,38 @@ describe("input-format stream-json", () => { ], ["--include-partial-messages"], 10000, - ); + )) as WireMessage[]; // Should have stream_event messages (not just "message" type) const streamEvents = objects.filter( - (o: any) => o.type === "stream_event", + (o): o is StreamEvent => o.type === "stream_event", ); expect(streamEvents.length).toBeGreaterThan(0); // Each stream_event should have the event payload and session_id // uuid is present on content events but not meta events (stop_reason, usage_statistics) for (const event of streamEvents) { - expect((event as any).event).toBeDefined(); - expect((event as any).session_id).toBeDefined(); + expect(event.event).toBeDefined(); + expect(event.session_id).toBeDefined(); } // Content events should have uuid const contentEvents = streamEvents.filter( - (e: any) => - e.event?.message_type === "reasoning_message" || - e.event?.message_type === "assistant_message", + (e) => + "message_type" in e.event && + (e.event.message_type === "reasoning_message" || + e.event.message_type === "assistant_message"), ); for (const event of contentEvents) { - expect((event as any).uuid).toBeDefined(); + expect(event.uuid).toBeDefined(); } // Should still have result - const result = objects.find((o: any) => o.type === "result"); + const result = objects.find( + (o): o is ResultMessage => o.type === "result", + ); expect(result).toBeDefined(); - expect((result as any).subtype).toBe("success"); + expect(result?.subtype).toBe("success"); }, { timeout: 60000 }, ); @@ -306,22 +332,22 @@ describe("input-format stream-json", () => { test( "unknown control request returns error", async () => { - const objects = await runBidirectional([ + const objects = (await runBidirectional([ JSON.stringify({ type: "control_request", request_id: "unknown_1", request: { subtype: "unknown_subtype" }, }), - ]); + ])) as WireMessage[]; // Should have control_response with error const controlResponse = objects.find( - (o: any) => + (o): o is ControlResponse => o.type === "control_response" && o.response?.request_id === "unknown_1", ); expect(controlResponse).toBeDefined(); - expect((controlResponse as any).response.subtype).toBe("error"); + expect(controlResponse?.response.subtype).toBe("error"); }, { timeout: 30000 }, ); @@ -330,12 +356,16 @@ describe("input-format stream-json", () => { "invalid JSON input returns error message", async () => { // Use raw string instead of JSON - const objects = await runBidirectional(["not valid json"]); + const objects = (await runBidirectional([ + "not valid json", + ])) as WireMessage[]; // Should have error message - const errorMsg = objects.find((o: any) => o.type === "error"); + const errorMsg = objects.find( + (o): o is ErrorMessage => o.type === "error", + ); expect(errorMsg).toBeDefined(); - expect((errorMsg as any).message).toContain("Invalid JSON"); + expect(errorMsg?.message).toContain("Invalid JSON"); }, { timeout: 30000 }, ); diff --git a/src/tests/headless-stream-json-format.test.ts b/src/tests/headless-stream-json-format.test.ts index 6b60435..67fb580 100644 --- a/src/tests/headless-stream-json-format.test.ts +++ b/src/tests/headless-stream-json-format.test.ts @@ -1,9 +1,14 @@ import { describe, expect, test } from "bun:test"; import { spawn } from "node:child_process"; +import type { + ResultMessage, + StreamEvent, + SystemInitMessage, +} from "../types/wire"; /** * Tests for stream-json output format. - * These verify the message structure matches the SDK-compatible format. + * These verify the message structure matches the wire format types. */ async function runHeadlessCommand( @@ -80,8 +85,9 @@ describe("stream-json format", () => { }); expect(initLine).toBeDefined(); + if (!initLine) throw new Error("initLine not found"); - const init = JSON.parse(initLine!); + const init = JSON.parse(initLine) as SystemInitMessage; expect(init.type).toBe("system"); expect(init.subtype).toBe("init"); expect(init.agent_id).toBeDefined(); @@ -106,8 +112,12 @@ describe("stream-json format", () => { }); expect(messageLine).toBeDefined(); + if (!messageLine) throw new Error("messageLine not found"); - const msg = JSON.parse(messageLine!); + const msg = JSON.parse(messageLine) as { + session_id: string; + uuid: string; + }; expect(msg.session_id).toBeDefined(); expect(msg.uuid).toBeDefined(); // uuid should be otid or id from the Letta SDK chunk @@ -126,8 +136,9 @@ describe("stream-json format", () => { }); expect(resultLine).toBeDefined(); + if (!resultLine) throw new Error("resultLine not found"); - const result = JSON.parse(resultLine!); + const result = JSON.parse(resultLine) as ResultMessage & { uuid: string }; expect(result.type).toBe("result"); expect(result.subtype).toBe("success"); expect(result.session_id).toBeDefined(); @@ -154,14 +165,15 @@ describe("stream-json format", () => { }); expect(streamEventLine).toBeDefined(); + if (!streamEventLine) throw new Error("streamEventLine not found"); - const event = JSON.parse(streamEventLine!); + const event = JSON.parse(streamEventLine) as StreamEvent; expect(event.type).toBe("stream_event"); expect(event.event).toBeDefined(); expect(event.session_id).toBeDefined(); expect(event.uuid).toBeDefined(); // The event should contain the original Letta SDK chunk - expect(event.event.message_type).toBeDefined(); + expect("message_type" in event.event).toBe(true); }, { timeout: 60000 }, ); diff --git a/src/types/wire.ts b/src/types/wire.ts new file mode 100644 index 0000000..63481a8 --- /dev/null +++ b/src/types/wire.ts @@ -0,0 +1,255 @@ +/** + * Wire Format Types + * + * These types define the JSON structure emitted by headless.ts when running + * in stream-json mode. They enable typed consumption of the bidirectional + * JSON protocol. + * + * Design principle: Compose from @letta-ai/letta-client types where possible. + */ + +import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents"; +import type { + AssistantMessage as LettaAssistantMessage, + ReasoningMessage as LettaReasoningMessage, + LettaStreamingResponse, + ToolCallMessage as LettaToolCallMessage, + ToolCall, +} from "@letta-ai/letta-client/resources/agents/messages"; +import type { StopReasonType } from "@letta-ai/letta-client/resources/runs/runs"; +import type { ToolReturnMessage as LettaToolReturnMessage } from "@letta-ai/letta-client/resources/tools"; + +// Re-export letta-client types that consumers may need +export type { + LettaStreamingResponse, + ToolCall, + StopReasonType, + MessageCreate, + LettaToolReturnMessage, +}; + +// ═══════════════════════════════════════════════════════════════ +// BASE ENVELOPE +// All wire messages include these fields +// ═══════════════════════════════════════════════════════════════ + +export interface MessageEnvelope { + session_id: string; + uuid: string; +} + +// ═══════════════════════════════════════════════════════════════ +// SYSTEM MESSAGES +// ═══════════════════════════════════════════════════════════════ + +export interface SystemInitMessage extends MessageEnvelope { + type: "system"; + subtype: "init"; + agent_id: string; + model: string; + tools: string[]; + cwd: string; + mcp_servers: Array<{ name: string; status: string }>; + permission_mode: string; + slash_commands: string[]; + // output_style omitted - Letta Code doesn't have output styles feature +} + +export type SystemMessage = SystemInitMessage; + +// ═══════════════════════════════════════════════════════════════ +// CONTENT MESSAGES +// These wrap letta-client message types with the wire envelope +// ═══════════════════════════════════════════════════════════════ + +/** + * Wire format for assistant messages. + * Extends LettaAssistantMessage with wire envelope fields. + */ +export interface AssistantMessageWire + extends LettaAssistantMessage, + MessageEnvelope { + type: "message"; +} + +/** + * Wire format for tool call messages. + * Extends LettaToolCallMessage with wire envelope fields. + */ +export interface ToolCallMessageWire + extends LettaToolCallMessage, + MessageEnvelope { + type: "message"; +} + +/** + * Wire format for reasoning messages. + * Extends LettaReasoningMessage with wire envelope fields. + */ +export interface ReasoningMessageWire + extends LettaReasoningMessage, + MessageEnvelope { + type: "message"; +} + +/** + * Wire format for tool return messages. + * Extends LettaToolReturnMessage with wire envelope fields. + */ +export interface ToolReturnMessageWire + extends LettaToolReturnMessage, + MessageEnvelope { + type: "message"; +} + +export type ContentMessage = + | AssistantMessageWire + | ToolCallMessageWire + | ReasoningMessageWire + | ToolReturnMessageWire; + +/** + * Generic message wrapper for spreading LettaStreamingResponse chunks. + * Used when the exact message type is determined at runtime. + */ +export type MessageWire = { + type: "message"; + session_id: string; + uuid: string; +} & LettaStreamingResponse; + +// ═══════════════════════════════════════════════════════════════ +// STREAM EVENTS (partial message updates) +// ═══════════════════════════════════════════════════════════════ + +export interface StreamEvent extends MessageEnvelope { + type: "stream_event"; + event: LettaStreamingResponse; +} + +// ═══════════════════════════════════════════════════════════════ +// AUTO APPROVAL +// ═══════════════════════════════════════════════════════════════ + +export interface AutoApprovalMessage extends MessageEnvelope { + type: "auto_approval"; + tool_call: ToolCall; + reason: string; + matched_rule: string; +} + +// ═══════════════════════════════════════════════════════════════ +// ERROR & RETRY +// ═══════════════════════════════════════════════════════════════ + +export interface ErrorMessage extends MessageEnvelope { + type: "error"; + /** High-level error message from the CLI */ + message: string; + stop_reason: StopReasonType; + run_id?: string; + /** Nested API error when the error originated from Letta API */ + api_error?: LettaStreamingResponse.LettaErrorMessage; +} + +export interface RetryMessage extends MessageEnvelope { + type: "retry"; + /** The stop reason that triggered the retry. Uses StopReasonType from letta-client. */ + reason: StopReasonType; + attempt: number; + max_attempts: number; + delay_ms: number; + run_id?: string; +} + +// ═══════════════════════════════════════════════════════════════ +// RESULT +// ═══════════════════════════════════════════════════════════════ + +/** + * Result subtypes. + * For errors, use stop_reason field with StopReasonType from letta-client. + */ +export type ResultSubtype = "success" | "interrupted" | "error"; + +/** + * Usage statistics from letta-client. + * Re-exported for convenience. + */ +export type UsageStatistics = LettaStreamingResponse.LettaUsageStatistics; + +export interface ResultMessage extends MessageEnvelope { + type: "result"; + subtype: ResultSubtype; + agent_id: string; + duration_ms: number; + duration_api_ms: number; + num_turns: number; + result: string | null; + run_ids: string[]; + usage: UsageStatistics | null; + /** + * Present when subtype is "error". + * Uses StopReasonType from letta-client (e.g., 'error', 'max_steps', 'llm_api_error'). + */ + stop_reason?: StopReasonType; +} + +// ═══════════════════════════════════════════════════════════════ +// CONTROL PROTOCOL +// ═══════════════════════════════════════════════════════════════ + +// Requests (external → CLI) +export interface ControlRequest { + type: "control_request"; + request_id: string; + request: ControlRequestBody; +} + +export type ControlRequestBody = + | { subtype: "initialize" } + | { subtype: "interrupt" }; + +// Responses (CLI → external) +export interface ControlResponse extends MessageEnvelope { + type: "control_response"; + response: ControlResponseBody; +} + +export type ControlResponseBody = + | { + subtype: "success"; + request_id: string; + response?: Record; + } + | { subtype: "error"; request_id: string; error: string }; + +// ═══════════════════════════════════════════════════════════════ +// USER INPUT +// ═══════════════════════════════════════════════════════════════ + +/** + * User input message for bidirectional communication. + * Uses MessageCreate from letta-client for multimodal content support. + */ +export interface UserInput { + type: "user"; + message: MessageCreate; +} + +// ═══════════════════════════════════════════════════════════════ +// UNION TYPE +// ═══════════════════════════════════════════════════════════════ + +/** + * Union of all wire message types that can be emitted by headless.ts + */ +export type WireMessage = + | SystemMessage + | ContentMessage + | StreamEvent + | AutoApprovalMessage + | ErrorMessage + | RetryMessage + | ResultMessage + | ControlResponse; diff --git a/tsconfig.types.json b/tsconfig.types.json new file mode 100644 index 0000000..effe174 --- /dev/null +++ b/tsconfig.types.json @@ -0,0 +1,11 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "noEmit": false, + "emitDeclarationOnly": true, + "declaration": true, + "declarationMap": true, + "outDir": "./dist/types" + }, + "include": ["src/types/wire.ts"] +}