From ed69d64f787d0311a80da91cf25abbc0e5ad6e35 Mon Sep 17 00:00:00 2001 From: cthomas Date: Mon, 23 Mar 2026 18:39:33 -0700 Subject: [PATCH] fix: defer markCurrentLineAsFinished when resume follows (#1498) Co-authored-by: Letta Code --- src/cli/helpers/accumulator.ts | 14 ++++++++++++-- src/cli/helpers/stream.ts | 28 ++++++++++++++++++++++------ 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/src/cli/helpers/accumulator.ts b/src/cli/helpers/accumulator.ts index ef9ef03..3908a7f 100644 --- a/src/cli/helpers/accumulator.ts +++ b/src/cli/helpers/accumulator.ts @@ -383,6 +383,11 @@ export function markCurrentLineAsFinished(b: Buffers) { * @param setInterruptedFlag - Whether to set the interrupted flag (default true). * Pass false when clearing stale tool calls at stream startup to avoid race conditions * with concurrent processConversation calls reading the flag. + * @param reason - Why the cancellation is happening. + * @param skipMarkCurrentLine - When true, do NOT call markCurrentLineAsFinished. + * Use this when a stream resume will follow: the resume stream will finalize the + * streaming line with its full text, so prematurely marking it finished would + * cause it to be committed to static with truncated content. * @returns true if any tool calls were marked as cancelled */ export type CancelReason = @@ -402,6 +407,7 @@ export function markIncompleteToolsAsCancelled( b: Buffers, setInterruptedFlag = true, reason: CancelReason = "internal_cancel", + skipMarkCurrentLine = false, ): boolean { // Mark buffer as interrupted to skip stale throttled refreshes // (only when actually interrupting, not when clearing stale state at startup) @@ -422,8 +428,12 @@ export function markIncompleteToolsAsCancelled( anyToolsCancelled = true; } } - // Also mark any streaming assistant/reasoning lines as finished - markCurrentLineAsFinished(b); + // Mark any streaming assistant/reasoning lines as finished, unless a resume + // is about to follow (in which case the resume stream will finalize it with + // full text — marking it now would freeze truncated content in static). + if (!skipMarkCurrentLine) { + markCurrentLineAsFinished(b); + } return anyToolsCancelled; } diff --git a/src/cli/helpers/stream.ts b/src/cli/helpers/stream.ts index 85244c1..da46ea6 100644 --- a/src/cli/helpers/stream.ts +++ b/src/cli/helpers/stream.ts @@ -213,6 +213,7 @@ export async function drainStream( onChunkProcessed?: DrainStreamHook, contextTracker?: ContextTracker, seenSeqIdThreshold?: number | null, + isResumeStream?: boolean, ): Promise { const startTime = performance.now(); const requestStartTime = getStreamRequestStartTime(stream) ?? startTime; @@ -259,8 +260,6 @@ export async function drainStream( try { for await (const chunk of stream) { - // console.log("chunk", chunk); - // Check if abort generation changed (handleInterrupt ran while we were waiting) // This catches cases where the abort signal might not propagate correctly if ((buffers.abortGeneration || 0) !== startAbortGen) { @@ -386,7 +385,11 @@ export async function drainStream( // Preserve a stop reason already parsed from stream chunks (e.g. llm_api_error) // and only fall back to generic "error" when none is available. stopReason = streamProcessor.stopReason || "error"; - markIncompleteToolsAsCancelled(buffers, true, "stream_error"); + // skipMarkCurrentLine=true: if a resume follows, the resume stream will + // finalize the streaming line with full text. Marking it finished now would + // commit truncated content to static (emittedIdsRef) before resume can append. + // drainStreamWithResume calls markCurrentLineAsFinished if no resume happens. + markIncompleteToolsAsCancelled(buffers, true, "stream_error", true); queueMicrotask(refresh); } finally { // Persist chunk log to disk (one write per stream, not per chunk) @@ -446,7 +449,13 @@ export async function drainStream( const approval: ApprovalRequest | null = approvals[0] || null; streamProcessor.pendingApprovals.clear(); - if (stopReason === "requires_approval" && approvals.length === 0) { + if ( + stopReason === "requires_approval" && + approvals.length === 0 && + !isResumeStream + ) { + // On resume streams, approval chunks are before starting_after and won't be replayed. + // drainStreamWithResume carries them over from the original drain — this is expected. debugWarn( "drainStream", "No approvals collected despite requires_approval stop reason", @@ -577,6 +586,9 @@ export async function drainStreamWithResume( abortSignal && !abortSignal.aborted ) { + // Resume path: markCurrentLineAsFinished was skipped in the catch block. + // If resume fails below, we call it in the catch. If no resume condition is + // met (else branch), we call it there instead. // Preserve original state in case resume needs to merge or fails const originalFallbackError = result.fallbackError; const originalApprovals = result.approvals; @@ -643,6 +655,7 @@ export async function drainStreamWithResume( onChunkProcessed, contextTracker, seenSeqIdThreshold, + true, // isResumeStream ); // Use the resume result (should have proper stop_reason now) @@ -667,8 +680,9 @@ export async function drainStreamWithResume( result.approval = originalApproval; } } catch (resumeError) { - // Resume failed - stick with the error stop_reason - // Restore the original stream error for display + // Resume failed - finalize the streaming line now (skipped in catch block above) + markCurrentLineAsFinished(buffers); + // Stick with the error stop_reason and restore the original stream error for display result.fallbackError = originalFallbackError; const resumeErrorMsg = @@ -701,6 +715,8 @@ export async function drainStreamWithResume( // Only log if we actually skipped for a reason (i.e., we didn't enter the resume branch above) if (skipReasons.length > 0) { + // No resume — finalize the streaming line now (was skipped in catch block) + markCurrentLineAsFinished(buffers); debugLog( "stream", "Mid-stream resume skipped: %s",