From 55163c6e3d7b14b8508d424fc902d0f74295bb98 Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Mon, 27 Oct 2025 21:57:20 -0700 Subject: [PATCH] feat: add stream-json output format and improve error handling (#15) Co-authored-by: Letta --- README.md | 29 ++++++++++--- src/headless.ts | 107 +++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 120 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 30d0b3d..d82e388 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,10 @@ letta -p "run tests" --disallowedTools "Bash" # Control tool permissions echo "Explain this code" | letta -p cat file.txt | letta -p gh pr diff 123 | letta -p --yolo # Review PR changes + +# Output formats +letta -p "analyze code" --output-format json # Structured JSON at end +letta -p "analyze code" --output-format stream-json # JSONL stream (one event per line) ``` You can also use the `--tools` flag to control the underlying *attachment* of tools (not just the permissions). @@ -92,13 +96,13 @@ letta -p "run tests" --tools "Bash,Read" # Only load specific tools letta -p "analyze code" --tools "" # No tools (analysis only) ``` -Use `--output-format json` to get additional information, including the agent ID ("session_id"): +Use `--output-format json` to get structured output with metadata: ```bash # regular text output $ letta -p "hi there" Hi! How can I help you today? -# structured output +# structured output (single JSON object at end) $ letta -p "hi there" --output-format json { "type": "result", @@ -108,12 +112,27 @@ $ letta -p "hi there" --output-format json "duration_api_ms": 2098, "num_turns": 1, "result": "Hi! How can I help you today?", - "session_id": "agent-8ab431ca-63e0-4ca1-ba83-b64d66d95a0f", + "agent_id": "agent-8ab431ca-63e0-4ca1-ba83-b64d66d95a0f", "usage": { - "input_tokens": 294, - "output_tokens": 97 + "prompt_tokens": 294, + "completion_tokens": 97, + "total_tokens": 391 } } + +# streaming JSON output (JSONL - one event per line, token-level streaming) +$ letta -p "hi there" --output-format stream-json +{"type":"init","agent_id":"agent-...","model":"claude-sonnet-4-5-20250929","tools":[...]} +{"type":"message","messageType":"reasoning_message","reasoning":"The user is asking","otid":"...","seqId":1} +{"type":"message","messageType":"reasoning_message","reasoning":" me to say hello","otid":"...","seqId":2} +{"type":"message","messageType":"reasoning_message","reasoning":". This is a simple","otid":"...","seqId":3} +{"type":"message","messageType":"reasoning_message","reasoning":" greeting.","otid":"...","seqId":4} +{"type":"message","messageType":"assistant_message","content":"Hi! How can I help you today?","otid":"...","seqId":5} +{"type":"message","messageType":"stop_reason","stopReason":"end_turn"} +{"type":"message","messageType":"usage_statistics","promptTokens":294,"completionTokens":97,"totalTokens":391} +{"type":"result","subtype":"success","result":"Hi! How can I help you today?","agent_id":"agent-...","usage":{...}} + +Note: Messages are streamed at the token level - each chunk has the same otid and incrementing seqId. ``` ### Permissions diff --git a/src/headless.ts b/src/headless.ts index 0e88729..c6787c0 100644 --- a/src/headless.ts +++ b/src/headless.ts @@ -91,6 +91,17 @@ export async function handleHeadlessCommand(argv: string[]) { // Initialize session stats const sessionStats = new SessionStats(); + // Output init event for stream-json format + if (outputFormat === "stream-json") { + const initEvent = { + type: "init", + agent_id: agent.id, + model: agent.llmConfig?.model, + tools: agent.tools?.map((t) => t.name) || [], + }; + console.log(JSON.stringify(initEvent)); + } + // Send message and process stream loop let currentInput: Array = [ { @@ -103,12 +114,69 @@ export async function handleHeadlessCommand(argv: string[]) { while (true) { const stream = await sendMessageStream(agent.id, currentInput); - // Drain stream and collect approval requests - const { stopReason, approval, apiDurationMs } = await drainStream( - stream, - buffers, - () => {}, // No UI refresh needed in headless mode - ); + // For stream-json, output each chunk as it arrives + let stopReason: Letta.StopReasonType; + let approval: { + toolCallId: string; + toolName: string; + toolArgs: string; + } | null = null; + let apiDurationMs: number; + + if (outputFormat === "stream-json") { + const startTime = performance.now(); + let lastStopReason: Letta.StopReasonType | null = null; + + for await (const chunk of stream) { + // Output chunk as message event + console.log( + JSON.stringify({ + type: "message", + ...chunk, + }), + ); + + // Still accumulate for approval tracking + const { onChunk } = await import("./cli/helpers/accumulator"); + onChunk(buffers, chunk); + + // Track stop reason and approval + if (chunk.messageType === "stop_reason") { + lastStopReason = chunk.stopReason; + } + + // Track approval requests + if (chunk.messageType === "approval_request_message") { + const toolCall = (chunk as any).toolCall; + if (toolCall?.toolCallId && toolCall?.name) { + approval = { + toolCallId: toolCall.toolCallId, + toolName: toolCall.name, + toolArgs: toolCall.arguments || "{}", + }; + } + } + } + + stopReason = lastStopReason || Letta.StopReasonType.Error; + apiDurationMs = performance.now() - startTime; + + // Mark final line as finished + const { markCurrentLineAsFinished } = await import( + "./cli/helpers/accumulator" + ); + markCurrentLineAsFinished(buffers); + } else { + // Normal mode: use drainStream + const result = await drainStream( + stream, + buffers, + () => {}, // No UI refresh needed in headless mode + ); + stopReason = result.stopReason; + approval = result.approval || null; + apiDurationMs = result.apiDurationMs; + } // Track API duration for this stream sessionStats.endTurn(apiDurationMs); @@ -220,16 +288,33 @@ export async function handleHeadlessCommand(argv: string[]) { duration_api_ms: Math.round(stats.totalApiMs), num_turns: stats.usage.stepCount, result: resultText, - session_id: agent.id, + agent_id: agent.id, usage: { - input_tokens: stats.usage.promptTokens, - output_tokens: stats.usage.completionTokens, + prompt_tokens: stats.usage.promptTokens, + completion_tokens: stats.usage.completionTokens, + total_tokens: stats.usage.totalTokens, }, }; console.log(JSON.stringify(output, null, 2)); } else if (outputFormat === "stream-json") { - console.error("stream-json format not yet implemented"); - process.exit(1); + // Output final result event + const stats = sessionStats.getSnapshot(); + const resultEvent = { + type: "result", + subtype: "success", + is_error: false, + duration_ms: Math.round(stats.totalWallMs), + duration_api_ms: Math.round(stats.totalApiMs), + num_turns: stats.usage.stepCount, + result: resultText, + agent_id: agent.id, + usage: { + prompt_tokens: stats.usage.promptTokens, + completion_tokens: stats.usage.completionTokens, + total_tokens: stats.usage.totalTokens, + }, + }; + console.log(JSON.stringify(resultEvent)); } else { // text format (default) if (!lastAssistant || !("text" in lastAssistant)) {