refactor: use stream processor for bidirectional mode (#539)

This commit is contained in:
cthomas
2026-01-14 12:51:44 -08:00
committed by GitHub
parent 34fbf31528
commit e5b77cd882

View File

@@ -1648,12 +1648,7 @@ async function runBidirectionalMode(
// Send message to agent
const stream = await sendMessageStream(conversationId, currentInput);
// Track stop reason and approvals during this stream
let stopReason: StopReasonType = "error";
const approvalRequests = new Map<
string,
{ toolName: string; args: string }
>();
const streamProcessor = new StreamProcessor();
// Process stream
for await (const chunk of stream) {
@@ -1662,60 +1657,44 @@ async function runBidirectionalMode(
break;
}
// Track stop reason
if (chunk.message_type === "stop_reason") {
stopReason = chunk.stop_reason;
}
// Process chunk through StreamProcessor
const { shouldOutput } = streamProcessor.processChunk(chunk);
// Track approval requests
if (chunk.message_type === "approval_request_message") {
const chunkWithTools = chunk as typeof chunk & {
tool_call?: {
tool_call_id?: string;
name?: string;
arguments?: string;
// Output chunk if not suppressed
if (shouldOutput) {
const chunkWithIds = chunk as typeof chunk & {
otid?: string;
id?: string;
};
const uuid = chunkWithIds.otid || chunkWithIds.id;
if (includePartialMessages) {
const streamEvent: StreamEvent = {
type: "stream_event",
event: chunk,
session_id: sessionId,
uuid: uuid || crypto.randomUUID(),
};
};
const toolCall = chunkWithTools.tool_call;
if (toolCall?.tool_call_id && toolCall?.name) {
const existing = approvalRequests.get(toolCall.tool_call_id);
approvalRequests.set(toolCall.tool_call_id, {
toolName: toolCall.name,
args: (existing?.args || "") + (toolCall.arguments || ""),
});
console.log(JSON.stringify(streamEvent));
} else {
const msg: MessageWire = {
type: "message",
...chunk,
session_id: sessionId,
uuid: uuid || crypto.randomUUID(),
};
console.log(JSON.stringify(msg));
}
}
// Output chunk
const chunkWithIds = chunk as typeof chunk & {
otid?: string;
id?: string;
};
const uuid = chunkWithIds.otid || chunkWithIds.id;
if (includePartialMessages) {
const streamEvent: StreamEvent = {
type: "stream_event",
event: chunk,
session_id: sessionId,
uuid: uuid || crypto.randomUUID(),
};
console.log(JSON.stringify(streamEvent));
} else {
const msg: MessageWire = {
type: "message",
...chunk,
session_id: sessionId,
uuid: uuid || crypto.randomUUID(),
};
console.log(JSON.stringify(msg));
}
// Accumulate for result
const { onChunk } = await import("./cli/helpers/accumulator");
onChunk(buffers, chunk);
}
// Get stop reason from processor
const stopReason = streamProcessor.stopReason || "error";
// Case 1: Turn ended normally - break out of loop
if (stopReason === "end_turn") {
break;
@@ -1728,13 +1707,7 @@ async function runBidirectionalMode(
// Case 3: Requires approval - process approvals and continue
if (stopReason === "requires_approval") {
const approvals = Array.from(approvalRequests.entries()).map(
([toolCallId, { toolName, args }]) => ({
toolCallId,
toolName,
toolArgs: args,
}),
);
const approvals = streamProcessor.getApprovals();
if (approvals.length === 0) {
// No approvals to process - break