refactor(cli): unify TUI and headless stream processing (#784)

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
Charles Packer
2026-02-02 09:59:59 -08:00
committed by GitHub
parent 7297e334f0
commit b9eaaa1b5d
4 changed files with 463 additions and 477 deletions

View File

@@ -13,6 +13,7 @@ import {
markIncompleteToolsAsCancelled,
onChunk,
} from "./accumulator";
import type { ErrorInfo } from "./streamProcessor";
import { StreamProcessor } from "./streamProcessor";
export type ApprovalRequest = {
@@ -21,6 +22,27 @@ export type ApprovalRequest = {
toolArgs: string;
};
export type DrainStreamHookContext = {
chunk: LettaStreamingResponse;
shouldOutput: boolean;
errorInfo?: ErrorInfo;
updatedApproval?: ApprovalRequest;
streamProcessor: StreamProcessor;
};
export type DrainStreamHookResult = {
shouldOutput?: boolean;
shouldAccumulate?: boolean;
stopReason?: StopReasonType;
};
export type DrainStreamHook = (
ctx: DrainStreamHookContext,
) =>
| DrainStreamHookResult
| undefined
| Promise<DrainStreamHookResult | undefined>;
type DrainResult = {
stopReason: StopReasonType;
lastRunId?: string | null;
@@ -37,6 +59,7 @@ export async function drainStream(
refresh: () => void,
abortSignal?: AbortSignal,
onFirstMessage?: () => void,
onChunkProcessed?: DrainStreamHook,
): Promise<DrainResult> {
const startTime = performance.now();
@@ -130,7 +153,8 @@ export async function drainStream(
logTiming(`TTFT: ${formatDuration(ttft)} (from POST to first content)`);
}
const { shouldOutput } = streamProcessor.processChunk(chunk);
const { shouldOutput, errorInfo, updatedApproval } =
streamProcessor.processChunk(chunk);
// Check abort signal before processing - don't add data after interrupt
if (abortSignal?.aborted) {
@@ -140,10 +164,40 @@ export async function drainStream(
break;
}
if (shouldOutput) {
let shouldOutputChunk = shouldOutput;
let shouldAccumulate = shouldOutput;
if (onChunkProcessed) {
const hookResult = await onChunkProcessed({
chunk,
shouldOutput: shouldOutputChunk,
errorInfo,
updatedApproval,
streamProcessor,
});
if (hookResult?.shouldOutput !== undefined) {
shouldOutputChunk = hookResult.shouldOutput;
}
if (hookResult?.shouldAccumulate !== undefined) {
shouldAccumulate = hookResult.shouldAccumulate;
} else {
shouldAccumulate = shouldOutputChunk;
}
if (hookResult?.stopReason) {
stopReason = hookResult.stopReason;
}
} else {
shouldAccumulate = shouldOutputChunk;
}
if (shouldAccumulate) {
onChunk(buffers, chunk);
queueMicrotask(refresh);
}
if (stopReason) {
break;
}
}
} catch (e) {
// Handle stream errors (e.g., JSON parse errors from SDK, network issues)
@@ -270,6 +324,7 @@ export async function drainStream(
* @param refresh - Callback to refresh UI
* @param abortSignal - Optional abort signal for cancellation
* @param onFirstMessage - Optional callback to invoke on first message chunk
* @param onChunkProcessed - Optional hook to observe/override per-chunk behavior
* @returns Result with stop_reason, approval info, and timing
*/
export async function drainStreamWithResume(
@@ -278,6 +333,7 @@ export async function drainStreamWithResume(
refresh: () => void,
abortSignal?: AbortSignal,
onFirstMessage?: () => void,
onChunkProcessed?: DrainStreamHook,
): Promise<DrainResult> {
const overallStartTime = performance.now();
@@ -288,6 +344,7 @@ export async function drainStreamWithResume(
refresh,
abortSignal,
onFirstMessage,
onChunkProcessed,
);
// If stream ended without proper stop_reason and we have resume info, try once to reconnect
@@ -333,6 +390,8 @@ export async function drainStreamWithResume(
buffers,
refresh,
abortSignal,
undefined,
onChunkProcessed,
);
// Use the resume result (should have proper stop_reason now)