import type { Stream } from "@letta-ai/letta-client/core/streaming";
import type { LettaStreamingResponse } from "@letta-ai/letta-client/resources/agents/messages";
import type { StopReasonType } from "@letta-ai/letta-client/resources/runs/runs";
import { getClient } from "../../agent/client";
import { debugWarn } from "../../utils/debug";
import {
  type createBuffers,
  markCurrentLineAsFinished,
  markIncompleteToolsAsCancelled,
  onChunk,
} from "./accumulator";

export type ApprovalRequest = {
  toolCallId: string;
  toolName: string;
  toolArgs: string;
};

type DrainResult = {
  stopReason: StopReasonType;
  lastRunId?: string | null;
  lastSeqId?: number | null;
  approval?: ApprovalRequest | null; // DEPRECATED: kept for backward compat
  approvals?: ApprovalRequest[]; // NEW: supports parallel approvals
  apiDurationMs: number; // time spent in API call
  streamError?: string | null; // Client-side error message (e.g., JSON parse error)
};

/**
 * State that must survive across the initial drain and a resumed drain of the
 * same run, so data received before a disconnect is not lost.
 */
type DrainState = {
  /** Approval requests accumulated so far, keyed by tool_call_id. */
  pendingApprovals: Map<string, ApprovalRequest>;
  /** Whether onFirstMessage has already been scheduled for this run. */
  firstMessageNotified: boolean;
};

/** Create an empty DrainState for a new run. */
function createDrainState(): DrainState {
  return { pendingApprovals: new Map(), firstMessageNotified: false };
}

/**
 * Drain a single stream into `buffers`, accumulating approval requests into
 * the (possibly shared) `state`.
 *
 * Never throws for stream errors: they are reported via `stopReason: "error"`
 * and `streamError`.
 */
async function drainStreamWithState(
  stream: Stream<LettaStreamingResponse>,
  buffers: ReturnType<typeof createBuffers>,
  refresh: () => void,
  state: DrainState,
  abortSignal?: AbortSignal,
  onFirstMessage?: () => void,
): Promise<DrainResult> {
  const startTime = performance.now();
  const { pendingApprovals } = state;

  let stopReason: StopReasonType | null = null;
  let lastRunId: string | null = null;
  let lastSeqId: number | null = null;
  let streamError: string | null = null;

  try {
    for await (const chunk of stream) {
      // Stop as soon as the caller aborts - don't add data after an interrupt.
      // (Nothing between here and onChunk below can flip the signal, so this
      // single check covers both the approval bookkeeping and the buffers.)
      if (abortSignal?.aborted) {
        stopReason = "cancelled";
        markIncompleteToolsAsCancelled(buffers);
        queueMicrotask(refresh);
        break;
      }

      // Store the run_id and seq_id to re-connect if the stream is interrupted.
      // seq_id is type-checked instead of truthiness-checked so that a
      // seq_id of 0 is still recorded.
      if (
        "run_id" in chunk &&
        "seq_id" in chunk &&
        chunk.run_id &&
        typeof chunk.seq_id === "number"
      ) {
        lastRunId = chunk.run_id;
        lastSeqId = chunk.seq_id;
      }

      if (chunk.message_type === "ping") continue;

      // Notify the caller once per run, on the first agent response chunk.
      // The flag lives in `state` so a resumed drain does not fire it twice.
      if (
        !state.firstMessageNotified &&
        onFirstMessage &&
        (chunk.message_type === "reasoning_message" ||
          chunk.message_type === "assistant_message")
      ) {
        state.firstMessageNotified = true;
        // Call async in background - don't block stream processing
        queueMicrotask(() => onFirstMessage());
      }

      // A tool_return_message means the tool was executed server-side, so it
      // no longer needs approval. The chunk is still forwarded to the UI below.
      if (chunk.message_type === "tool_return_message" && chunk.tool_call_id) {
        pendingApprovals.delete(chunk.tool_call_id);
      }

      // Accumulate approval request state across streaming chunks, one entry
      // per tool_call_id (parallel tool calls).
      // NOTE: only approval_request_message is tracked, NOT tool_call_message:
      //   tool_call_message        = auto-executed server-side (e.g., web_search)
      //   approval_request_message = needs user approval (e.g., Bash)
      if (chunk.message_type === "approval_request_message") {
        // Normalize tool calls: support both legacy tool_call and new tool_calls array
        const toolCalls = Array.isArray(chunk.tool_calls)
          ? chunk.tool_calls
          : chunk.tool_call
            ? [chunk.tool_call]
            : [];

        for (const toolCall of toolCalls) {
          if (!toolCall?.tool_call_id) continue; // strict: require id

          const entry = pendingApprovals.get(toolCall.tool_call_id) ?? {
            toolCallId: toolCall.tool_call_id,
            toolName: "",
            toolArgs: "",
          };
          if (toolCall.name) {
            entry.toolName = toolCall.name;
          }
          // Arguments may arrive in fragments across multiple chunks.
          if (toolCall.arguments) {
            entry.toolArgs += toolCall.arguments;
          }
          pendingApprovals.set(toolCall.tool_call_id, entry);
        }
      }

      onChunk(buffers, chunk);
      queueMicrotask(refresh);

      if (chunk.message_type === "stop_reason") {
        stopReason = chunk.stop_reason;
        // Continue reading stream to get usage_statistics that may come after
      }
    }
  } catch (e) {
    // Handle stream errors (e.g., JSON parse errors from SDK, network issues).
    // This can happen when the stream ends with incomplete data.
    const errorMessage = e instanceof Error ? e.message : String(e);
    debugWarn("drainStream", "Stream error caught:", errorMessage);
    streamError = errorMessage;
    // "error" lets drainStreamWithResume try to reconnect.
    stopReason = "error";
    // NOTE(review): tools are marked cancelled here even when a resume may
    // follow; confirm the accumulator handles later chunks for those tools.
    markIncompleteToolsAsCancelled(buffers);
    queueMicrotask(refresh);
  }

  // A stream that ended without a stop_reason chunk is treated as an error.
  if (!stopReason) {
    stopReason = "error";
  }

  if (stopReason === "cancelled") {
    markIncompleteToolsAsCancelled(buffers);
  }

  // Mark the final line as finished now that the stream has ended.
  markCurrentLineAsFinished(buffers);
  queueMicrotask(refresh);

  // Package the approval request(s). On any other stop reason the pending map
  // is left intact so a resumed drain can keep accumulating into it.
  let approval: ApprovalRequest | null = null;
  let approvals: ApprovalRequest[] = [];
  if (stopReason === "requires_approval") {
    const allPending = Array.from(pendingApprovals.values());

    // Include ALL tool_call_ids, even incomplete ones; missing name/args are
    // handled by the denial logic in the caller.
    approvals = allPending.map((a) => ({
      toolCallId: a.toolCallId,
      toolName: a.toolName || "",
      toolArgs: a.toolArgs || "",
    }));

    if (approvals.length === 0) {
      debugWarn(
        "drainStream",
        "No approvals collected despite requires_approval stop reason",
      );
      debugWarn("drainStream", "Pending approvals map:", allPending);
    } else {
      // Set legacy singular field for backward compatibility
      approval = approvals[0] ?? null;
    }

    // Clear the map for the next turn.
    pendingApprovals.clear();
  }

  return {
    stopReason,
    approval,
    approvals,
    lastRunId,
    lastSeqId,
    apiDurationMs: performance.now() - startTime,
    streamError,
  };
}

/**
 * Drain an agent message stream into `buffers`, refreshing the UI as chunks
 * arrive.
 *
 * @param stream - Stream from agent.messages.stream()
 * @param buffers - Buffer to accumulate chunks
 * @param refresh - Callback to refresh UI (scheduled via queueMicrotask)
 * @param abortSignal - Optional abort signal; checked as each chunk arrives
 * @param onFirstMessage - Optional callback invoked once on the first
 *   reasoning/assistant chunk
 * @returns Result with stop_reason, approval info, resume ids, and timing.
 *   A stream that ends without a stop_reason chunk (or throws) reports
 *   `stopReason: "error"`.
 */
export async function drainStream(
  stream: Stream<LettaStreamingResponse>,
  buffers: ReturnType<typeof createBuffers>,
  refresh: () => void,
  abortSignal?: AbortSignal,
  onFirstMessage?: () => void,
): Promise<DrainResult> {
  return drainStreamWithState(
    stream,
    buffers,
    refresh,
    createDrainState(),
    abortSignal,
    onFirstMessage,
  );
}

/**
 * Drain a stream with automatic resume on disconnect.
 *
 * If the stream ends without receiving a proper stop_reason chunk (indicating
 * an unexpected disconnect), this will attempt once to resume from Redis
 * using the last received run_id and seq_id. Approval requests accumulated
 * before the disconnect are carried over into the resumed drain.
 *
 * @param stream - Initial stream from agent.messages.stream()
 * @param buffers - Buffer to accumulate chunks
 * @param refresh - Callback to refresh UI
 * @param abortSignal - Optional abort signal for cancellation
 * @param onFirstMessage - Optional callback invoked at most once, on the first
 *   reasoning/assistant chunk of either the initial or the resumed stream
 * @returns Result with stop_reason, approval info, and total timing
 */
export async function drainStreamWithResume(
  stream: Stream<LettaStreamingResponse>,
  buffers: ReturnType<typeof createBuffers>,
  refresh: () => void,
  abortSignal?: AbortSignal,
  onFirstMessage?: () => void,
): Promise<DrainResult> {
  const overallStartTime = performance.now();
  // Shared by both drains so pre-disconnect approvals and the first-message
  // notification survive the reconnect.
  const state = createDrainState();

  let result = await drainStreamWithState(
    stream,
    buffers,
    refresh,
    state,
    abortSignal,
    onFirstMessage,
  );

  const { lastRunId, lastSeqId } = result;
  if (
    result.stopReason === "error" &&
    lastRunId &&
    lastSeqId != null &&
    !abortSignal?.aborted
  ) {
    try {
      const client = await getClient();
      // Resume from Redis where we left off.
      const resumeStream = await client.runs.messages.stream(lastRunId, {
        starting_after: lastSeqId,
        batch_size: 1000, // Fetch buffered chunks quickly
      });

      const resumeResult = await drainStreamWithState(
        resumeStream,
        buffers,
        refresh,
        state,
        abortSignal,
        onFirstMessage,
      );

      // Use the resume result (clearing the original stream error), but keep
      // the known resume ids if the resumed stream carried none of its own.
      result = {
        ...resumeResult,
        lastRunId: resumeResult.lastRunId ?? lastRunId,
        lastSeqId: resumeResult.lastSeqId ?? lastSeqId,
      };
    } catch (e) {
      // Resume failed: `result` still holds the original "error" stop reason
      // and the original streamError for display.
      debugWarn(
        "drainStreamWithResume",
        "Resume failed:",
        e instanceof Error ? e.message : String(e),
      );
    }
  }

  // Report total time, including any resume attempt.
  result.apiDurationMs = performance.now() - overallStartTime;
  return result;
}