feat: introduce common stream processor (#533)

This commit is contained in:
cthomas
2026-01-14 12:03:42 -08:00
committed by GitHub
parent 12c84c6d0a
commit caab9faef2
3 changed files with 282 additions and 289 deletions

View File

@@ -13,6 +13,7 @@ import {
markIncompleteToolsAsCancelled,
onChunk,
} from "./accumulator";
import { StreamProcessor } from "./streamProcessor";
export type ApprovalRequest = {
toolCallId: string;
@@ -45,19 +46,9 @@ export async function drainStream(
)[STREAM_REQUEST_START_TIME];
let hasLoggedTTFT = false;
let _approvalRequestId: string | null = null;
const pendingApprovals = new Map<
string,
{
toolCallId: string;
toolName: string;
toolArgs: string;
}
>();
const streamProcessor = new StreamProcessor();
let stopReason: StopReasonType | null = null;
let lastRunId: string | null = null;
let lastSeqId: number | null = null;
let hasCalledFirstMessage = false;
let fallbackError: string | null = null;
@@ -114,16 +105,6 @@ export async function drainStream(
queueMicrotask(refresh);
break;
}
// Store the run_id (for error reporting) and seq_id (for stream resumption)
// Capture run_id even if seq_id is missing - we need it for error details
if ("run_id" in chunk && chunk.run_id) {
lastRunId = chunk.run_id;
}
if ("seq_id" in chunk && chunk.seq_id) {
lastSeqId = chunk.seq_id;
}
if (chunk.message_type === "ping") continue;
// Call onFirstMessage callback on the first agent response chunk
if (
@@ -149,61 +130,7 @@ export async function drainStream(
logTiming(`TTFT: ${formatDuration(ttft)} (from POST to first content)`);
}
// Remove tool from pending approvals when it completes (server-side execution finished)
// This means the tool was executed server-side and doesn't need approval
if (chunk.message_type === "tool_return_message") {
if (chunk.tool_call_id) {
pendingApprovals.delete(chunk.tool_call_id);
}
// Continue processing this chunk (for UI display)
}
// Need to store the approval request ID to send an approval in a new run
if (chunk.message_type === "approval_request_message") {
_approvalRequestId = chunk.id;
}
// Accumulate approval request state across streaming chunks
// Support parallel tool calls by tracking each tool_call_id separately
// NOTE: Only track approval_request_message, NOT tool_call_message
// tool_call_message = auto-executed server-side (e.g., web_search)
// approval_request_message = needs user approval (e.g., Bash)
if (chunk.message_type === "approval_request_message") {
// console.log(
// "[drainStream] approval_request_message chunk:",
// JSON.stringify(chunk, null, 2),
// );
// Normalize tool calls: support both legacy tool_call and new tool_calls array
const toolCalls = Array.isArray(chunk.tool_calls)
? chunk.tool_calls
: chunk.tool_call
? [chunk.tool_call]
: [];
for (const toolCall of toolCalls) {
if (!toolCall?.tool_call_id) continue; // strict: require id
// Get or create entry for this tool_call_id
const existing = pendingApprovals.get(toolCall.tool_call_id) || {
toolCallId: toolCall.tool_call_id,
toolName: "",
toolArgs: "",
};
// Update name if provided
if (toolCall.name) {
existing.toolName = toolCall.name;
}
// Accumulate arguments (may arrive across multiple chunks)
if (toolCall.arguments) {
existing.toolArgs += toolCall.arguments;
}
pendingApprovals.set(toolCall.tool_call_id, existing);
}
}
const { shouldOutput } = streamProcessor.processChunk(chunk);
// Check abort signal before processing - don't add data after interrupt
if (abortSignal?.aborted) {
@@ -213,24 +140,9 @@ export async function drainStream(
break;
}
// Suppress mid-stream desync errors (match headless behavior)
// These are transient and will be handled by end-of-turn desync recovery
const errObj = (chunk as unknown as { error?: { detail?: string } })
.error;
if (
errObj?.detail?.includes("No tool call is currently awaiting approval")
) {
// Server isn't ready for approval yet; let the stream continue
// Suppress the error frame from output
continue;
}
onChunk(buffers, chunk);
queueMicrotask(refresh);
if (chunk.message_type === "stop_reason") {
stopReason = chunk.stop_reason;
// Continue reading stream to get usage_statistics that may come after
if (shouldOutput) {
onChunk(buffers, chunk);
queueMicrotask(refresh);
}
}
} catch (e) {
@@ -240,17 +152,21 @@ export async function drainStream(
debugWarn("drainStream", "Stream error caught:", errorMessage);
// Try to extract run_id from APIError if we don't have one yet
if (!lastRunId && e instanceof APIError && e.error) {
if (!streamProcessor.lastRunId && e instanceof APIError && e.error) {
const errorObj = e.error as Record<string, unknown>;
if ("run_id" in errorObj && typeof errorObj.run_id === "string") {
lastRunId = errorObj.run_id;
debugWarn("drainStream", "Extracted run_id from error:", lastRunId);
streamProcessor.lastRunId = errorObj.run_id;
debugWarn(
"drainStream",
"Extracted run_id from error:",
streamProcessor.lastRunId,
);
}
}
// Only set fallbackError if we don't have a run_id - if we have a run_id,
// App.tsx will fetch detailed error info from the server which is better
if (!lastRunId) {
if (!streamProcessor.lastRunId) {
fallbackError = errorMessage;
}
@@ -265,6 +181,10 @@ export async function drainStream(
}
}
if (!stopReason && streamProcessor.stopReason) {
stopReason = streamProcessor.stopReason;
}
// If we aborted via listener but loop exited without setting stopReason
// (SDK returns gracefully on abort), mark as cancelled
if (abortedViaListener && !stopReason) {
@@ -294,7 +214,7 @@ export async function drainStream(
if (stopReason === "requires_approval") {
// Convert map to array, including ALL tool_call_ids (even incomplete ones)
// Incomplete entries will be denied at the business logic layer
const allPending = Array.from(pendingApprovals.values());
const allPending = Array.from(streamProcessor.pendingApprovals.values());
// console.log(
// "[drainStream] All pending approvals before processing:",
// JSON.stringify(allPending, null, 2),
@@ -322,8 +242,7 @@ export async function drainStream(
}
// Clear the map for next turn
pendingApprovals.clear();
_approvalRequestId = null;
streamProcessor.pendingApprovals.clear();
}
const apiDurationMs = performance.now() - startTime;
@@ -332,8 +251,8 @@ export async function drainStream(
stopReason,
approval,
approvals,
lastRunId,
lastSeqId,
lastRunId: streamProcessor.lastRunId,
lastSeqId: streamProcessor.lastSeqId,
apiDurationMs,
fallbackError,
};

View File

@@ -0,0 +1,194 @@
import type { LettaStreamingResponse } from "@letta-ai/letta-client/resources/agents/messages";
import type { StopReasonType } from "@letta-ai/letta-client/resources/runs/runs";
// ============================================================================
// TYPES
// ============================================================================
/** A tool call awaiting user approval, accumulated across streaming chunks. */
export interface ApprovalRequest {
  /** Server-assigned id of the tool call. */
  toolCallId: string;
  /** Tool name; may arrive in a later chunk than the id, so starts empty. */
  toolName: string;
  /** Argument string, concatenated from fragments streamed across chunks. */
  toolArgs: string;
}

/** Normalized description of a mid-stream error frame. */
export interface ErrorInfo {
  /** Human-readable error text (detail appended when present). */
  message: string;
  /** Machine-readable error category, when the server provides one. */
  error_type?: string;
  /** Extra detail from the server, when provided. */
  detail?: string;
  /** Last run_id seen on the stream, for server-side error lookup. */
  run_id?: string;
}

/** Outcome of feeding one chunk through StreamProcessor.processChunk(). */
export interface ChunkProcessingResult {
  /** Whether this chunk should be output to the user */
  shouldOutput: boolean;
  /** If this is an error chunk, formatted error message */
  errorInfo?: ErrorInfo;
  /** If this chunk updated an approval, the current state */
  updatedApproval?: ApprovalRequest;
}
// ============================================================================
// STREAM PROCESSOR
// ============================================================================
/**
 * Accumulates cross-chunk state while draining a Letta message stream:
 * run/seq ids (for error reporting and stream resumption), pending
 * tool-approval requests, and the final stop reason. The wrapper feeds every
 * chunk through processChunk() and reads the public fields when needed.
 */
export class StreamProcessor {
  // State tracking (public for easy access - wrapper decides usage)
  /** tool_call_id -> approval request accumulated across chunks. */
  public pendingApprovals = new Map<string, ApprovalRequest>();
  /** Every run_id observed on this stream. */
  public runIds = new Set<string>();
  /** Most recent run_id - captured even without a seq_id, for error details. */
  public lastRunId: string | null = null;
  /** Most recent seq_id - used to resume an interrupted stream. */
  public lastSeqId: number | null = null;
  /** stop_reason from the last stop_reason chunk, if one has arrived. */
  public stopReason: StopReasonType | null = null;

  // Approval ID fallback (for backends that don't include tool_call_id in every chunk)
  private lastApprovalId: string | null = null;

  /**
   * Inspect one streaming chunk, fold it into the accumulated state, and
   * report whether the caller should forward the chunk to output.
   *
   * @param chunk - Raw chunk from the streaming response.
   * @returns shouldOutput plus, when applicable, a normalized errorInfo and
   *   the approval entry this chunk updated.
   */
  processChunk(chunk: LettaStreamingResponse): ChunkProcessingResult {
    let errorInfo: ErrorInfo | undefined;
    let updatedApproval: ApprovalRequest | undefined;

    // Store the run_id (for error reporting) and seq_id (for stream resumption)
    // Capture run_id even if seq_id is missing - we need it for error details
    if ("run_id" in chunk && chunk.run_id) {
      this.runIds.add(chunk.run_id);
      this.lastRunId = chunk.run_id;
    }
    // Track seq_id for stream resumption
    // NOTE(review): truthiness check skips seq_id === 0 - assumes seq ids
    // start at 1; confirm against the server's numbering
    if ("seq_id" in chunk && chunk.seq_id) {
      this.lastSeqId = chunk.seq_id;
    }

    // Keep-alive pings carry no content; drop them
    if (chunk.message_type === "ping") {
      return { shouldOutput: false };
    }

    // Detect mid-stream errors
    // Case 1: LettaErrorMessage from the API (has message_type: "error_message")
    if ("message_type" in chunk && chunk.message_type === "error_message") {
      // This is a LettaErrorMessage
      const apiError = chunk as LettaStreamingResponse.LettaErrorMessage;
      errorInfo = {
        message: apiError.message,
        error_type: apiError.error_type,
        detail: apiError.detail,
        run_id: this.lastRunId || undefined,
      };
    }

    // Case 2: Generic error object without message_type
    const chunkWithError = chunk as typeof chunk & {
      error?: { message?: string; detail?: string };
    };
    if (chunkWithError.error && !("message_type" in chunk)) {
      const errorText = chunkWithError.error.message || "An error occurred";
      const errorDetail = chunkWithError.error.detail || "";
      errorInfo = {
        message: errorDetail ? `${errorText}: ${errorDetail}` : errorText,
        run_id: this.lastRunId || undefined,
      };
    }

    // Suppress mid-stream desync errors (match headless behavior)
    // These are transient and will be handled by end-of-turn desync recovery
    if (
      errorInfo?.message?.includes(
        "No tool call is currently awaiting approval",
      )
    ) {
      // Server isn't ready for approval yet; let the stream continue until it is
      // Suppress the error frame from output (errorInfo still surfaced to caller)
      return { shouldOutput: false, errorInfo };
    }

    // Remove tool from pending approvals when it completes (server-side execution finished)
    // This means the tool was executed server-side and doesn't need approval
    if (chunk.message_type === "tool_return_message") {
      if (chunk.tool_call_id) {
        this.pendingApprovals.delete(chunk.tool_call_id);
      }
      // Continue processing this chunk (for UI display)
    }

    // Need to store the approval request ID to send an approval in a new run
    if (chunk.message_type === "approval_request_message") {
      this.lastApprovalId = chunk.id;
    }

    // Accumulate approval request state across streaming chunks
    // Support parallel tool calls by tracking each tool_call_id separately
    // NOTE: Only track approval_request_message, NOT tool_call_message
    //   tool_call_message = auto-executed server-side (e.g., web_search)
    //   approval_request_message = needs user approval (e.g., Bash)
    if (chunk.message_type === "approval_request_message") {
      // Normalize tool calls: support both legacy tool_call and new tool_calls array
      const toolCalls = Array.isArray(chunk.tool_calls)
        ? chunk.tool_calls
        : chunk.tool_call
          ? [chunk.tool_call]
          : [];
      for (const toolCall of toolCalls) {
        // Many backends stream tool_call chunks where only the first frame
        // carries the tool_call_id; subsequent argument deltas omit it.
        // Fall back to the last seen id within this turn so we can
        // properly accumulate args.
        let id: string | null = toolCall?.tool_call_id ?? this.lastApprovalId;
        if (!id) {
          // As an additional guard, if exactly one approval is being
          // tracked already, use that id for continued argument deltas.
          if (this.pendingApprovals.size === 1) {
            id = Array.from(this.pendingApprovals.keys())[0] ?? null;
          }
        }
        if (!id) continue; // cannot safely attribute this chunk
        this.lastApprovalId = id;
        // Get or create entry for this tool_call_id
        const existing = this.pendingApprovals.get(id) || {
          toolCallId: id,
          toolName: "",
          toolArgs: "",
        };
        // Update name if provided
        if (toolCall.name) {
          existing.toolName = toolCall.name;
        }
        // Accumulate arguments (may arrive across multiple chunks)
        if (toolCall.arguments) {
          existing.toolArgs += toolCall.arguments;
        }
        this.pendingApprovals.set(id, existing);
        updatedApproval = existing;
      }
    }

    if (chunk.message_type === "stop_reason") {
      this.stopReason = chunk.stop_reason;
      // Continue reading stream to get usage_statistics that may come after
    }

    // Default: output this chunk
    return { shouldOutput: true, errorInfo, updatedApproval };
  }

  /**
   * Get accumulated approvals as an array of defensive copies, so callers
   * cannot mutate the entries still tracked in pendingApprovals.
   */
  getApprovals(): ApprovalRequest[] {
    return Array.from(this.pendingApprovals.values()).map((a) => ({
      toolCallId: a.toolCallId,
      toolName: a.toolName,
      toolArgs: a.toolArgs,
    }));
  }
}