From 25950133b8829bfa55956e3bbb6345219f45cab7 Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Wed, 24 Dec 2025 01:00:35 -0800 Subject: [PATCH] fix: handle JSON parse errors in stream gracefully (#382) Co-authored-by: Letta --- src/cli/App.tsx | 41 +++++--- src/cli/helpers/stream.ts | 209 +++++++++++++++++++++----------------- 2 files changed, 145 insertions(+), 105 deletions(-) diff --git a/src/cli/App.tsx b/src/cli/App.tsx index 4318aaf..b69ea00 100644 --- a/src/cli/App.tsx +++ b/src/cli/App.tsx @@ -979,14 +979,20 @@ export default function App({ } }; - const { stopReason, approval, approvals, apiDurationMs, lastRunId } = - await drainStreamWithResume( - stream, - buffersRef.current, - refreshDerivedThrottled, - signal, // Use captured signal, not ref (which may be nulled by handleInterrupt) - syncAgentState, - ); + const { + stopReason, + approval, + approvals, + apiDurationMs, + lastRunId, + streamError, + } = await drainStreamWithResume( + stream, + buffersRef.current, + refreshDerivedThrottled, + signal, // Use captured signal, not ref (which may be nulled by handleInterrupt) + syncAgentState, + ); // Update currentRunId for error reporting in catch block currentRunId = lastRunId ?? undefined; @@ -1403,10 +1409,10 @@ export default function App({ // Mark incomplete tool calls as finished to prevent stuck blinking UI markIncompleteToolsAsCancelled(buffersRef.current); - // Track the server-side error in telemetry + // Track the error in telemetry telemetry.trackError( - stopReason || "unknown_stop_reason", - `Stream stopped with reason: ${stopReason}`, + streamError ? "StreamError" : stopReason || "unknown_stop_reason", + streamError || `Stream stopped with reason: ${stopReason}`, "message_stream", { modelId: currentModelId || undefined, @@ -1414,7 +1420,18 @@ export default function App({ }, ); - // Fetch error details from the run if available + // If we have a client-side stream error (e.g., JSON parse error), show it directly + if (streamError) { + const errorMsg = lastRunId + ? `Stream error: ${streamError}\n(run_id: ${lastRunId})` + : `Stream error: ${streamError}`; + appendError(errorMsg, true); // Skip telemetry - already tracked above + setStreaming(false); + refreshDerived(); + return; + } + + // Fetch error details from the run if available (server-side errors) if (lastRunId) { try { const client = await getClient(); diff --git a/src/cli/helpers/stream.ts b/src/cli/helpers/stream.ts index d61a3d3..9f5504a 100644 --- a/src/cli/helpers/stream.ts +++ b/src/cli/helpers/stream.ts @@ -24,6 +24,7 @@ type DrainResult = { approval?: ApprovalRequest | null; // DEPRECATED: kept for backward compat approvals?: ApprovalRequest[]; // NEW: supports parallel approvals apiDurationMs: number; // time spent in API call + streamError?: string | null; // Client-side error message (e.g., JSON parse error) }; export async function drainStream( @@ -49,113 +50,129 @@ export async function drainStream( let lastRunId: string | null = null; let lastSeqId: number | null = null; let hasCalledFirstMessage = false; + let streamError: string | null = null; - for await (const chunk of stream) { - // console.log("chunk", chunk); + try { + for await (const chunk of stream) { + // console.log("chunk", chunk); - // Check if stream was aborted - if (abortSignal?.aborted) { - stopReason = "cancelled"; - markIncompleteToolsAsCancelled(buffers); - queueMicrotask(refresh); - break; - } - // Store the run_id and seq_id to re-connect if stream is interrupted - if ( - "run_id" in chunk && - "seq_id" in chunk && - chunk.run_id && - chunk.seq_id - ) { - lastRunId = chunk.run_id; - lastSeqId = chunk.seq_id; - } - - if (chunk.message_type === "ping") continue; - - // Call onFirstMessage callback on the first agent response chunk - if ( - !hasCalledFirstMessage && - onFirstMessage && - (chunk.message_type === "reasoning_message" || - chunk.message_type === "assistant_message") - ) { - hasCalledFirstMessage = true; - // Call async in background - don't block stream processing - queueMicrotask(() => onFirstMessage()); - } - - // Remove tool from pending approvals when it completes (server-side execution finished) - // This means the tool was executed server-side and doesn't need approval - if (chunk.message_type === "tool_return_message") { - if (chunk.tool_call_id) { - pendingApprovals.delete(chunk.tool_call_id); + // Check if stream was aborted + if (abortSignal?.aborted) { + stopReason = "cancelled"; + markIncompleteToolsAsCancelled(buffers); + queueMicrotask(refresh); + break; + } + // Store the run_id and seq_id to re-connect if stream is interrupted + if ( + "run_id" in chunk && + "seq_id" in chunk && + chunk.run_id && + chunk.seq_id + ) { + lastRunId = chunk.run_id; + lastSeqId = chunk.seq_id; } - // Continue processing this chunk (for UI display) - } - // Need to store the approval request ID to send an approval in a new run - if (chunk.message_type === "approval_request_message") { - _approvalRequestId = chunk.id; - } + if (chunk.message_type === "ping") continue; - // Accumulate approval request state across streaming chunks - // Support parallel tool calls by tracking each tool_call_id separately - // NOTE: Only track approval_request_message, NOT tool_call_message - // tool_call_message = auto-executed server-side (e.g., web_search) - // approval_request_message = needs user approval (e.g., Bash) - if (chunk.message_type === "approval_request_message") { - // console.log( - // "[drainStream] approval_request_message chunk:", - // JSON.stringify(chunk, null, 2), - // ); + // Call onFirstMessage callback on the first agent response chunk + if ( + !hasCalledFirstMessage && + onFirstMessage && + (chunk.message_type === "reasoning_message" || + chunk.message_type === "assistant_message") + ) { + hasCalledFirstMessage = true; + // Call async in background - don't block stream processing + queueMicrotask(() => onFirstMessage()); + } - // Normalize tool calls: support both legacy tool_call and new tool_calls array - const toolCalls = Array.isArray(chunk.tool_calls) - ? chunk.tool_calls - : chunk.tool_call - ? [chunk.tool_call] - : []; - - for (const toolCall of toolCalls) { - if (!toolCall?.tool_call_id) continue; // strict: require id - - // Get or create entry for this tool_call_id - const existing = pendingApprovals.get(toolCall.tool_call_id) || { - toolCallId: toolCall.tool_call_id, - toolName: "", - toolArgs: "", - }; - - // Update name if provided - if (toolCall.name) { - existing.toolName = toolCall.name; + // Remove tool from pending approvals when it completes (server-side execution finished) + // This means the tool was executed server-side and doesn't need approval + if (chunk.message_type === "tool_return_message") { + if (chunk.tool_call_id) { + pendingApprovals.delete(chunk.tool_call_id); } + // Continue processing this chunk (for UI display) + } - // Accumulate arguments (may arrive across multiple chunks) - if (toolCall.arguments) { - existing.toolArgs += toolCall.arguments; + // Need to store the approval request ID to send an approval in a new run + if (chunk.message_type === "approval_request_message") { + _approvalRequestId = chunk.id; + } + + // Accumulate approval request state across streaming chunks + // Support parallel tool calls by tracking each tool_call_id separately + // NOTE: Only track approval_request_message, NOT tool_call_message + // tool_call_message = auto-executed server-side (e.g., web_search) + // approval_request_message = needs user approval (e.g., Bash) + if (chunk.message_type === "approval_request_message") { + // console.log( + // "[drainStream] approval_request_message chunk:", + // JSON.stringify(chunk, null, 2), + // ); + + // Normalize tool calls: support both legacy tool_call and new tool_calls array + const toolCalls = Array.isArray(chunk.tool_calls) + ? chunk.tool_calls + : chunk.tool_call + ? [chunk.tool_call] + : []; + + for (const toolCall of toolCalls) { + if (!toolCall?.tool_call_id) continue; // strict: require id + + // Get or create entry for this tool_call_id + const existing = pendingApprovals.get(toolCall.tool_call_id) || { + toolCallId: toolCall.tool_call_id, + toolName: "", + toolArgs: "", + }; + + // Update name if provided + if (toolCall.name) { + existing.toolName = toolCall.name; + } + + // Accumulate arguments (may arrive across multiple chunks) + if (toolCall.arguments) { + existing.toolArgs += toolCall.arguments; + } + + pendingApprovals.set(toolCall.tool_call_id, existing); } + } - pendingApprovals.set(toolCall.tool_call_id, existing); + // Check abort signal before processing - don't add data after interrupt + if (abortSignal?.aborted) { + stopReason = "cancelled"; + markIncompleteToolsAsCancelled(buffers); + queueMicrotask(refresh); + break; + } + + onChunk(buffers, chunk); + queueMicrotask(refresh); + + if (chunk.message_type === "stop_reason") { + stopReason = chunk.stop_reason; + // Continue reading stream to get usage_statistics that may come after } } + } catch (e) { + // Handle stream errors (e.g., JSON parse errors from SDK, network issues) + // This can happen when the stream ends with incomplete data + const errorMessage = e instanceof Error ? e.message : String(e); + debugWarn("drainStream", "Stream error caught:", errorMessage); - // Check abort signal before processing - don't add data after interrupt - if (abortSignal?.aborted) { - stopReason = "cancelled"; - markIncompleteToolsAsCancelled(buffers); - queueMicrotask(refresh); - break; - } + // Capture the error message for display + streamError = errorMessage; - onChunk(buffers, chunk); + // Set error stop reason so drainStreamWithResume can try to reconnect + stopReason = "error"; + markIncompleteToolsAsCancelled(buffers); queueMicrotask(refresh); - - if (chunk.message_type === "stop_reason") { - stopReason = chunk.stop_reason; - // Continue reading stream to get usage_statistics that may come after - } } // Stream has ended, check if we captured a stop reason @@ -218,6 +235,7 @@ export async function drainStream( lastRunId, lastSeqId, apiDurationMs, + streamError, }; } @@ -260,6 +278,9 @@ export async function drainStreamWithResume( result.lastSeqId !== null && !abortSignal?.aborted ) { + // Preserve the original error in case resume fails + const originalStreamError = result.streamError; + try { const client = await getClient(); // Resume from Redis where we left off @@ -278,10 +299,12 @@ export async function drainStreamWithResume( ); // Use the resume result (should have proper stop_reason now) + // Clear the original stream error since we recovered result = resumeResult; } catch (_e) { // Resume failed - stick with the error stop_reason - // The original error result will be returned + // Restore the original stream error for display + result.streamError = originalStreamError; } }