diff --git a/src/cli/helpers/stream.ts b/src/cli/helpers/stream.ts index 67c6327..a9719a6 100644 --- a/src/cli/helpers/stream.ts +++ b/src/cli/helpers/stream.ts @@ -13,6 +13,7 @@ import { markIncompleteToolsAsCancelled, onChunk, } from "./accumulator"; +import { StreamProcessor } from "./streamProcessor"; export type ApprovalRequest = { toolCallId: string; @@ -45,19 +46,9 @@ export async function drainStream( )[STREAM_REQUEST_START_TIME]; let hasLoggedTTFT = false; - let _approvalRequestId: string | null = null; - const pendingApprovals = new Map< - string, - { - toolCallId: string; - toolName: string; - toolArgs: string; - } - >(); + const streamProcessor = new StreamProcessor(); let stopReason: StopReasonType | null = null; - let lastRunId: string | null = null; - let lastSeqId: number | null = null; let hasCalledFirstMessage = false; let fallbackError: string | null = null; @@ -114,16 +105,6 @@ export async function drainStream( queueMicrotask(refresh); break; } - // Store the run_id (for error reporting) and seq_id (for stream resumption) - // Capture run_id even if seq_id is missing - we need it for error details - if ("run_id" in chunk && chunk.run_id) { - lastRunId = chunk.run_id; - } - if ("seq_id" in chunk && chunk.seq_id) { - lastSeqId = chunk.seq_id; - } - - if (chunk.message_type === "ping") continue; // Call onFirstMessage callback on the first agent response chunk if ( @@ -149,61 +130,7 @@ export async function drainStream( logTiming(`TTFT: ${formatDuration(ttft)} (from POST to first content)`); } - // Remove tool from pending approvals when it completes (server-side execution finished) - // This means the tool was executed server-side and doesn't need approval - if (chunk.message_type === "tool_return_message") { - if (chunk.tool_call_id) { - pendingApprovals.delete(chunk.tool_call_id); - } - // Continue processing this chunk (for UI display) - } - - // Need to store the approval request ID to send an approval in a new run - if (chunk.message_type 
=== "approval_request_message") { - _approvalRequestId = chunk.id; - } - - // Accumulate approval request state across streaming chunks - // Support parallel tool calls by tracking each tool_call_id separately - // NOTE: Only track approval_request_message, NOT tool_call_message - // tool_call_message = auto-executed server-side (e.g., web_search) - // approval_request_message = needs user approval (e.g., Bash) - if (chunk.message_type === "approval_request_message") { - // console.log( - // "[drainStream] approval_request_message chunk:", - // JSON.stringify(chunk, null, 2), - // ); - - // Normalize tool calls: support both legacy tool_call and new tool_calls array - const toolCalls = Array.isArray(chunk.tool_calls) - ? chunk.tool_calls - : chunk.tool_call - ? [chunk.tool_call] - : []; - - for (const toolCall of toolCalls) { - if (!toolCall?.tool_call_id) continue; // strict: require id - - // Get or create entry for this tool_call_id - const existing = pendingApprovals.get(toolCall.tool_call_id) || { - toolCallId: toolCall.tool_call_id, - toolName: "", - toolArgs: "", - }; - - // Update name if provided - if (toolCall.name) { - existing.toolName = toolCall.name; - } - - // Accumulate arguments (may arrive across multiple chunks) - if (toolCall.arguments) { - existing.toolArgs += toolCall.arguments; - } - - pendingApprovals.set(toolCall.tool_call_id, existing); - } - } + const { shouldOutput } = streamProcessor.processChunk(chunk); // Check abort signal before processing - don't add data after interrupt if (abortSignal?.aborted) { @@ -213,24 +140,9 @@ export async function drainStream( break; } - // Suppress mid-stream desync errors (match headless behavior) - // These are transient and will be handled by end-of-turn desync recovery - const errObj = (chunk as unknown as { error?: { detail?: string } }) - .error; - if ( - errObj?.detail?.includes("No tool call is currently awaiting approval") - ) { - // Server isn't ready for approval yet; let the stream continue - 
// Suppress the error frame from output - continue; - } - - onChunk(buffers, chunk); - queueMicrotask(refresh); - - if (chunk.message_type === "stop_reason") { - stopReason = chunk.stop_reason; - // Continue reading stream to get usage_statistics that may come after + if (shouldOutput) { + onChunk(buffers, chunk); + queueMicrotask(refresh); + } } } catch (e) { @@ -240,17 +152,21 @@ export async function drainStream( debugWarn("drainStream", "Stream error caught:", errorMessage); // Try to extract run_id from APIError if we don't have one yet - if (!lastRunId && e instanceof APIError && e.error) { + if (!streamProcessor.lastRunId && e instanceof APIError && e.error) { const errorObj = e.error as Record<string, unknown>; if ("run_id" in errorObj && typeof errorObj.run_id === "string") { - lastRunId = errorObj.run_id; - debugWarn("drainStream", "Extracted run_id from error:", lastRunId); + streamProcessor.lastRunId = errorObj.run_id; + debugWarn( + "drainStream", + "Extracted run_id from error:", + streamProcessor.lastRunId, + ); } } // Only set fallbackError if we don't have a run_id - if we have a run_id, // App.tsx will fetch detailed error info from the server which is better - if (!lastRunId) { + if (!streamProcessor.lastRunId) { fallbackError = errorMessage; } @@ -265,6 +181,10 @@ export async function drainStream( } } + if (!stopReason && streamProcessor.stopReason) { + stopReason = streamProcessor.stopReason; + } + // If we aborted via listener but loop exited without setting stopReason // (SDK returns gracefully on abort), mark as cancelled if (abortedViaListener && !stopReason) { @@ -294,7 +214,7 @@ export async function drainStream( if (stopReason === "requires_approval") { // Convert map to array, including ALL tool_call_ids (even incomplete ones) // Incomplete entries will be denied at the business logic layer - const allPending = Array.from(pendingApprovals.values()); + const allPending = Array.from(streamProcessor.pendingApprovals.values()); // console.log( // 
"[drainStream] All pending approvals before processing:", // JSON.stringify(allPending, null, 2), @@ -322,8 +242,7 @@ export async function drainStream( } // Clear the map for next turn - pendingApprovals.clear(); - _approvalRequestId = null; + streamProcessor.pendingApprovals.clear(); } const apiDurationMs = performance.now() - startTime; @@ -332,8 +251,8 @@ export async function drainStream( stopReason, approval, approvals, - lastRunId, - lastSeqId, + lastRunId: streamProcessor.lastRunId, + lastSeqId: streamProcessor.lastSeqId, apiDurationMs, fallbackError, }; diff --git a/src/cli/helpers/streamProcessor.ts b/src/cli/helpers/streamProcessor.ts new file mode 100644 index 0000000..4fb2bda --- /dev/null +++ b/src/cli/helpers/streamProcessor.ts @@ -0,0 +1,194 @@ +import type { LettaStreamingResponse } from "@letta-ai/letta-client/resources/agents/messages"; +import type { StopReasonType } from "@letta-ai/letta-client/resources/runs/runs"; + +// ============================================================================ +// TYPES +// ============================================================================ + +export interface ApprovalRequest { + toolCallId: string; + toolName: string; + toolArgs: string; +} + +export interface ErrorInfo { + message: string; + error_type?: string; + detail?: string; + run_id?: string; +} + +export interface ChunkProcessingResult { + /** Whether this chunk should be output to the user */ + shouldOutput: boolean; + + /** If this is an error chunk, formatted error message */ + errorInfo?: ErrorInfo; + + /** If this chunk updated an approval, the current state */ + updatedApproval?: ApprovalRequest; +} + +// ============================================================================ +// STREAM PROCESSOR +// ============================================================================ + +export class StreamProcessor { + // State tracking (public for easy access - wrapper decides usage) + public pendingApprovals = new Map(); + public 
runIds = new Set(); + public lastRunId: string | null = null; + public lastSeqId: number | null = null; + public stopReason: StopReasonType | null = null; + + // Approval ID fallback (for backends that don't include tool_call_id in every chunk) + private lastApprovalId: string | null = null; + + processChunk(chunk: LettaStreamingResponse): ChunkProcessingResult { + let errorInfo: ErrorInfo | undefined; + let updatedApproval: ApprovalRequest | undefined; + // Store the run_id (for error reporting) and seq_id (for stream resumption) + // Capture run_id even if seq_id is missing - we need it for error details + if ("run_id" in chunk && chunk.run_id) { + this.runIds.add(chunk.run_id); + this.lastRunId = chunk.run_id; + } + + // Track seq_id (drainStream line 122-124) + if ("seq_id" in chunk && chunk.seq_id) { + this.lastSeqId = chunk.seq_id; + } + + // Skip ping messages (drainStream line 126) + if (chunk.message_type === "ping") { + return { shouldOutput: false }; + } + + // Detect mid-stream errors + // Case 1: LettaErrorMessage from the API (has message_type: "error_message") + if ("message_type" in chunk && chunk.message_type === "error_message") { + // This is a LettaErrorMessage + const apiError = chunk as LettaStreamingResponse.LettaErrorMessage; + errorInfo = { + message: apiError.message, + error_type: apiError.error_type, + detail: apiError.detail, + run_id: this.lastRunId || undefined, + }; + } + // Case 2: Generic error object without message_type + const chunkWithError = chunk as typeof chunk & { + error?: { message?: string; detail?: string }; + }; + if (chunkWithError.error && !("message_type" in chunk)) { + const errorText = chunkWithError.error.message || "An error occurred"; + const errorDetail = chunkWithError.error.detail || ""; + errorInfo = { + message: errorDetail ? 
`${errorText}: ${errorDetail}` : errorText, + run_id: this.lastRunId || undefined, + }; + } + + // Suppress mid-stream desync errors (match headless behavior) + // These are transient and will be handled by end-of-turn desync recovery + if ( + errorInfo?.message?.includes( + "No tool call is currently awaiting approval", + ) + ) { + // Server isn't ready for approval yet; let the stream continue until it is + // Suppress the error frame from output + return { shouldOutput: false, errorInfo }; + } + + // Remove tool from pending approvals when it completes (server-side execution finished) + // This means the tool was executed server-side and doesn't need approval + if (chunk.message_type === "tool_return_message") { + if (chunk.tool_call_id) { + this.pendingApprovals.delete(chunk.tool_call_id); + } + // Continue processing this chunk (for UI display) + } + + // Need to store the approval request ID to send an approval in a new run + if (chunk.message_type === "approval_request_message") { + this.lastApprovalId = chunk.id; + } + + // Accumulate approval request state across streaming chunks + // Support parallel tool calls by tracking each tool_call_id separately + // NOTE: Only track approval_request_message, NOT tool_call_message + // tool_call_message = auto-executed server-side (e.g., web_search) + // approval_request_message = needs user approval (e.g., Bash) + if (chunk.message_type === "approval_request_message") { + // console.log( + // "[drainStream] approval_request_message chunk:", + // JSON.stringify(chunk, null, 2), + // ); + + // Normalize tool calls: support both legacy tool_call and new tool_calls array + const toolCalls = Array.isArray(chunk.tool_calls) + ? chunk.tool_calls + : chunk.tool_call + ? [chunk.tool_call] + : []; + + for (const toolCall of toolCalls) { + // Many backends stream tool_call chunks where only the first frame + // carries the tool_call_id; subsequent argument deltas omit it. 
+ // Fall back to the last seen id within this turn so we can + // properly accumulate args. + let id: string | null = toolCall?.tool_call_id ?? this.lastApprovalId; + if (!id) { + // As an additional guard, if exactly one approval is being + // tracked already, use that id for continued argument deltas. + if (this.pendingApprovals.size === 1) { + id = Array.from(this.pendingApprovals.keys())[0] ?? null; + } + } + if (!id) continue; // cannot safely attribute this chunk + + this.lastApprovalId = id; + + // Get or create entry for this tool_call_id + const existing = this.pendingApprovals.get(id) || { + toolCallId: id, + toolName: "", + toolArgs: "", + }; + + // Update name if provided + if (toolCall.name) { + existing.toolName = toolCall.name; + } + + // Accumulate arguments (may arrive across multiple chunks) + if (toolCall.arguments) { + existing.toolArgs += toolCall.arguments; + } + + this.pendingApprovals.set(id, existing); + updatedApproval = existing; + } + } + + if (chunk.message_type === "stop_reason") { + this.stopReason = chunk.stop_reason; + // Continue reading stream to get usage_statistics that may come after + } + + // Default: output this chunk + return { shouldOutput: true, errorInfo, updatedApproval }; + } + + /** + * Get accumulated approvals as array + */ + getApprovals(): ApprovalRequest[] { + return Array.from(this.pendingApprovals.values()).map((a) => ({ + toolCallId: a.toolCallId, + toolName: a.toolName, + toolArgs: a.toolArgs, + })); + } +} diff --git a/src/headless.ts b/src/headless.ts index 11de01c..ebcbe1b 100644 --- a/src/headless.ts +++ b/src/headless.ts @@ -4,10 +4,7 @@ import type { AgentState, MessageCreate, } from "@letta-ai/letta-client/resources/agents/agents"; -import type { - ApprovalCreate, - LettaStreamingResponse, -} from "@letta-ai/letta-client/resources/agents/messages"; +import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages"; import type { StopReasonType } from 
"@letta-ai/letta-client/resources/runs/runs"; import type { ApprovalResult } from "./agent/approval-execution"; import { @@ -30,6 +27,7 @@ import { import { formatErrorDetails } from "./cli/helpers/errorFormatter"; import { safeJsonParseOr } from "./cli/helpers/safeJsonParse"; import { drainStreamWithResume } from "./cli/helpers/stream"; +import { StreamProcessor } from "./cli/helpers/streamProcessor"; import { settingsManager } from "./settings-manager"; import { checkToolPermission } from "./tools/manager"; import type { @@ -729,7 +727,7 @@ export async function handleHeadlessCommand( const stream = await sendMessageStream(conversationId, currentInput); // For stream-json, output each chunk as it arrives - let stopReason: StopReasonType; + let stopReason: StopReasonType | null = null; let approvals: Array<{ toolCallId: string; toolName: string; @@ -740,71 +738,35 @@ export async function handleHeadlessCommand( if (outputFormat === "stream-json") { const startTime = performance.now(); - let lastStopReason: StopReasonType | null = null; // Track approval requests across streamed chunks - const approvalRequests = new Map< - string, - { toolName: string; args: string } - >(); const autoApprovalEmitted = new Set(); - let _lastApprovalId: string | null = null; - // Track all run_ids seen during this turn - const runIds = new Set(); + const streamProcessor = new StreamProcessor(); for await (const chunk of stream) { - // Track run_id if present - if ("run_id" in chunk && chunk.run_id) { - runIds.add(chunk.run_id); - } + const { shouldOutput, errorInfo, updatedApproval } = + streamProcessor.processChunk(chunk); // Detect mid-stream errors - // Case 1: LettaErrorMessage from the API (has message_type: "error_message") - if ( - "message_type" in chunk && - chunk.message_type === "error_message" - ) { - // This is a LettaErrorMessage - nest it in our wire format - const apiError = chunk as LettaStreamingResponse.LettaErrorMessage; + if (errorInfo && shouldOutput) { const 
errorEvent: ErrorMessage = { type: "error", - message: apiError.message, - stop_reason: "error", - run_id: apiError.run_id, - api_error: apiError, - session_id: sessionId, - uuid: crypto.randomUUID(), - }; - console.log(JSON.stringify(errorEvent)); - - // Still accumulate for tracking - const { onChunk: accumulatorOnChunk } = await import( - "./cli/helpers/accumulator" - ); - accumulatorOnChunk(buffers, chunk); - continue; - } - - // Case 2: Generic error object without message_type - const chunkWithError = chunk as typeof chunk & { - error?: { message?: string; detail?: string }; - }; - if (chunkWithError.error && !("message_type" in chunk)) { - // Emit as error event - const errorText = - chunkWithError.error.message || "An error occurred"; - const errorDetail = chunkWithError.error.detail || ""; - const fullErrorText = errorDetail - ? `${errorText}: ${errorDetail}` - : errorText; - - const errorEvent: ErrorMessage = { - type: "error", - message: fullErrorText, + message: errorInfo.message, stop_reason: "error", + run_id: errorInfo.run_id, session_id: sessionId, uuid: crypto.randomUUID(), + ...(errorInfo.error_type && + errorInfo.run_id && { + api_error: { + message_type: "error_message", + message: errorInfo.message, + error_type: errorInfo.error_type, + detail: errorInfo.detail, + run_id: errorInfo.run_id, + }, + }), }; console.log(JSON.stringify(errorEvent)); @@ -817,136 +779,58 @@ export async function handleHeadlessCommand( } // Detect server conflict due to pending approval; handle it and retry - const errObj = (chunk as unknown as { error?: { detail?: string } }) - .error; - if (errObj?.detail?.includes("Cannot send a new message")) { + if (errorInfo?.message?.includes("Cannot send a new message")) { // Don't emit this error; clear approvals and retry outer loop await resolveAllPendingApprovals(); // Reset state and restart turn - lastStopReason = "error" as StopReasonType; + stopReason = "error" as StopReasonType; break; } - if ( - 
errObj?.detail?.includes( - "No tool call is currently awaiting approval", - ) - ) { - // Server isn't ready for an approval yet; let the stream continue until it is - // Suppress the error frame from output - continue; - } + // Check if we should skip outputting approval requests in bypass mode - const isApprovalRequest = - chunk.message_type === "approval_request_message"; - let shouldOutputChunk = true; + let shouldOutputChunk = shouldOutput; - // Track approval requests (stream-aware: accumulate by tool_call_id) - if (isApprovalRequest) { - const chunkWithTools = chunk as typeof chunk & { - tool_call?: { - tool_call_id?: string; - name?: string; - arguments?: string; - }; - tool_calls?: Array<{ - tool_call_id?: string; - name?: string; - arguments?: string; - }>; - }; - - const toolCalls = Array.isArray(chunkWithTools.tool_calls) - ? chunkWithTools.tool_calls - : chunkWithTools.tool_call - ? [chunkWithTools.tool_call] - : []; - - for (const toolCall of toolCalls) { - // Many backends stream tool_call chunks where only the first frame - // carries the tool_call_id; subsequent argument deltas omit it. - // Fall back to the last seen id within this turn so we can - // properly accumulate args. - let id: string | null = toolCall?.tool_call_id ?? _lastApprovalId; - if (!id) { - // As an additional guard, if exactly one approval is being - // tracked already, use that id for continued argument deltas. - if (approvalRequests.size === 1) { - id = Array.from(approvalRequests.keys())[0] ?? null; - } - } - if (!id) continue; // cannot safely attribute this chunk - - _lastApprovalId = id; - - // Concatenate argument deltas; do not inject placeholder JSON - const prev = approvalRequests.get(id); - const base = prev?.args ?? ""; - const incomingArgs = - toolCall?.arguments != null ? 
base + toolCall.arguments : base; - - // Preserve previously seen name; set if provided in this chunk - const nextName = toolCall?.name || prev?.toolName || ""; - approvalRequests.set(id, { - toolName: nextName, - args: incomingArgs, - }); - - // Keep an up-to-date approvals array for downstream handling - // Update existing approval if present, otherwise add new one - const existingIndex = approvals.findIndex( - (a) => a.toolCallId === id, + // Check if this approval will be auto-approved. Dedup per tool_call_id + if ( + updatedApproval && + !autoApprovalEmitted.has(updatedApproval.toolCallId) && + updatedApproval.toolName + ) { + const parsedArgs = safeJsonParseOr<Record<string, unknown> | null>( + updatedApproval.toolArgs || "{}", + null, + ); + const permission = await checkToolPermission( + updatedApproval.toolName, + parsedArgs || {}, + ); + if (permission.decision === "allow" && parsedArgs) { + // Only emit auto_approval if we already have all required params + const { getToolSchema } = await import("./tools/manager"); + const schema = getToolSchema(updatedApproval.toolName); + const required = + (schema?.input_schema?.required as string[] | undefined) || []; + const missing = required.filter( + (key) => + !(key in parsedArgs) || + (parsedArgs as Record<string, unknown>)[key] == null, ); - const approvalObj = { - toolCallId: id, - toolName: nextName, - toolArgs: incomingArgs, - }; - if (existingIndex >= 0) { - approvals[existingIndex] = approvalObj; - } else { - approvals.push(approvalObj); - } - - // Check if this approval will be auto-approved. 
Dedup per tool_call_id - if (!autoApprovalEmitted.has(id) && nextName) { - const parsedArgs = safeJsonParseOr<Record<string, unknown> | null>(incomingArgs || "{}", null); - const permission = await checkToolPermission( - nextName, - parsedArgs || {}, - ); - if (permission.decision === "allow" && parsedArgs) { - // Only emit auto_approval if we already have all required params - const { getToolSchema } = await import("./tools/manager"); - const schema = getToolSchema(nextName); - const required = - (schema?.input_schema?.required as string[] | undefined) || - []; - const missing = required.filter( - (key) => - !(key in parsedArgs) || - (parsedArgs as Record<string, unknown>)[key] == null, - ); - if (missing.length === 0) { - shouldOutputChunk = false; - const autoApprovalMsg: AutoApprovalMessage = { - type: "auto_approval", - tool_call: { - name: nextName, - tool_call_id: id, - arguments: incomingArgs || "{}", - }, - reason: permission.reason || "Allowed by permission rule", - matched_rule: permission.matchedRule || "auto-approved", - session_id: sessionId, - uuid: `auto-approval-${id}`, - }; - console.log(JSON.stringify(autoApprovalMsg)); - autoApprovalEmitted.add(id); - } - } + if (missing.length === 0) { + shouldOutputChunk = false; + const autoApprovalMsg: AutoApprovalMessage = { + type: "auto_approval", + tool_call: { + name: updatedApproval.toolName, + tool_call_id: updatedApproval.toolCallId, + arguments: updatedApproval.toolArgs || "{}", + }, + reason: permission.reason || "Allowed by permission rule", + matched_rule: permission.matchedRule || "auto-approved", + session_id: sessionId, + uuid: `auto-approval-${updatedApproval.toolCallId}`, + }; + console.log(JSON.stringify(autoApprovalMsg)); + autoApprovalEmitted.add(updatedApproval.toolCallId); } } } @@ -984,17 +868,13 @@ export async function handleHeadlessCommand( // Still accumulate for approval tracking const { onChunk } = await import("./cli/helpers/accumulator"); onChunk(buffers, chunk); - - // Track stop reason - if (chunk.message_type === 
"stop_reason") { - lastStopReason = chunk.stop_reason; - } } - stopReason = lastStopReason || "error"; + stopReason = stopReason || streamProcessor.stopReason || "error"; apiDurationMs = performance.now() - startTime; + approvals = streamProcessor.getApprovals(); // Use the last run_id we saw (if any) - lastRunId = runIds.size > 0 ? Array.from(runIds).pop() || null : null; + lastRunId = streamProcessor.lastRunId; if (lastRunId) lastKnownRunId = lastRunId; // Mark final line as finished