From e5b77cd882be504f24fac0c66bac2e4f95d4a80a Mon Sep 17 00:00:00 2001 From: cthomas Date: Wed, 14 Jan 2026 12:51:44 -0800 Subject: [PATCH] refactor: use stream processor for bidirectional mode (#539) --- src/headless.ts | 87 +++++++++++++++++-------------------------------- 1 file changed, 30 insertions(+), 57 deletions(-) diff --git a/src/headless.ts b/src/headless.ts index ebcbe1b..bd16a21 100644 --- a/src/headless.ts +++ b/src/headless.ts @@ -1648,12 +1648,7 @@ async function runBidirectionalMode( // Send message to agent const stream = await sendMessageStream(conversationId, currentInput); - // Track stop reason and approvals during this stream - let stopReason: StopReasonType = "error"; - const approvalRequests = new Map< - string, - { toolName: string; args: string } - >(); + const streamProcessor = new StreamProcessor(); // Process stream for await (const chunk of stream) { @@ -1662,60 +1657,44 @@ async function runBidirectionalMode( break; } - // Track stop reason - if (chunk.message_type === "stop_reason") { - stopReason = chunk.stop_reason; - } + // Process chunk through StreamProcessor + const { shouldOutput } = streamProcessor.processChunk(chunk); - // Track approval requests - if (chunk.message_type === "approval_request_message") { - const chunkWithTools = chunk as typeof chunk & { - tool_call?: { - tool_call_id?: string; - name?: string; - arguments?: string; + // Output chunk if not suppressed + if (shouldOutput) { + const chunkWithIds = chunk as typeof chunk & { + otid?: string; + id?: string; + }; + const uuid = chunkWithIds.otid || chunkWithIds.id; + + if (includePartialMessages) { + const streamEvent: StreamEvent = { + type: "stream_event", + event: chunk, + session_id: sessionId, + uuid: uuid || crypto.randomUUID(), }; - }; - const toolCall = chunkWithTools.tool_call; - if (toolCall?.tool_call_id && toolCall?.name) { - const existing = approvalRequests.get(toolCall.tool_call_id); - approvalRequests.set(toolCall.tool_call_id, { - toolName: toolCall.name, - args: (existing?.args || "") + (toolCall.arguments || ""), - }); + console.log(JSON.stringify(streamEvent)); + } else { + const msg: MessageWire = { + type: "message", + ...chunk, + session_id: sessionId, + uuid: uuid || crypto.randomUUID(), + }; + console.log(JSON.stringify(msg)); } } - // Output chunk - const chunkWithIds = chunk as typeof chunk & { - otid?: string; - id?: string; - }; - const uuid = chunkWithIds.otid || chunkWithIds.id; - - if (includePartialMessages) { - const streamEvent: StreamEvent = { - type: "stream_event", - event: chunk, - session_id: sessionId, - uuid: uuid || crypto.randomUUID(), - }; - console.log(JSON.stringify(streamEvent)); - } else { - const msg: MessageWire = { - type: "message", - ...chunk, - session_id: sessionId, - uuid: uuid || crypto.randomUUID(), - }; - console.log(JSON.stringify(msg)); - } - // Accumulate for result const { onChunk } = await import("./cli/helpers/accumulator"); onChunk(buffers, chunk); } + // Get stop reason from processor + const stopReason = streamProcessor.stopReason || "error"; + // Case 1: Turn ended normally - break out of loop if (stopReason === "end_turn") { break; @@ -1728,13 +1707,7 @@ async function runBidirectionalMode( // Case 3: Requires approval - process approvals and continue if (stopReason === "requires_approval") { - const approvals = Array.from(approvalRequests.entries()).map( - ([toolCallId, { toolName, args }]) => ({ - toolCallId, - toolName, - toolArgs: args, - }), - ); + const approvals = streamProcessor.getApprovals(); if (approvals.length === 0) { // No approvals to process - break