fix: defer markCurrentLineAsFinished when resume follows (#1498)

Co-authored-by: Letta Code <noreply@letta.com>
This commit is contained in:
cthomas
2026-03-23 18:39:33 -07:00
committed by GitHub
parent b445da0ce0
commit ed69d64f78
2 changed files with 34 additions and 8 deletions

View File

@@ -383,6 +383,11 @@ export function markCurrentLineAsFinished(b: Buffers) {
* @param setInterruptedFlag - Whether to set the interrupted flag (default true). * @param setInterruptedFlag - Whether to set the interrupted flag (default true).
* Pass false when clearing stale tool calls at stream startup to avoid race conditions * Pass false when clearing stale tool calls at stream startup to avoid race conditions
* with concurrent processConversation calls reading the flag. * with concurrent processConversation calls reading the flag.
* @param reason - Why the cancellation is happening.
* @param skipMarkCurrentLine - When true, do NOT call markCurrentLineAsFinished.
* Use this when a stream resume will follow: the resume stream will finalize the
* streaming line with its full text, so prematurely marking it finished would
* cause it to be committed to static with truncated content.
* @returns true if any tool calls were marked as cancelled * @returns true if any tool calls were marked as cancelled
*/ */
export type CancelReason = export type CancelReason =
@@ -402,6 +407,7 @@ export function markIncompleteToolsAsCancelled(
b: Buffers, b: Buffers,
setInterruptedFlag = true, setInterruptedFlag = true,
reason: CancelReason = "internal_cancel", reason: CancelReason = "internal_cancel",
skipMarkCurrentLine = false,
): boolean { ): boolean {
// Mark buffer as interrupted to skip stale throttled refreshes // Mark buffer as interrupted to skip stale throttled refreshes
// (only when actually interrupting, not when clearing stale state at startup) // (only when actually interrupting, not when clearing stale state at startup)
@@ -422,8 +428,12 @@ export function markIncompleteToolsAsCancelled(
anyToolsCancelled = true; anyToolsCancelled = true;
} }
} }
// Also mark any streaming assistant/reasoning lines as finished // Mark any streaming assistant/reasoning lines as finished, unless a resume
// is about to follow (in which case the resume stream will finalize it with
// full text — marking it now would freeze truncated content in static).
if (!skipMarkCurrentLine) {
markCurrentLineAsFinished(b); markCurrentLineAsFinished(b);
}
return anyToolsCancelled; return anyToolsCancelled;
} }

View File

@@ -213,6 +213,7 @@ export async function drainStream(
onChunkProcessed?: DrainStreamHook, onChunkProcessed?: DrainStreamHook,
contextTracker?: ContextTracker, contextTracker?: ContextTracker,
seenSeqIdThreshold?: number | null, seenSeqIdThreshold?: number | null,
isResumeStream?: boolean,
): Promise<DrainResult> { ): Promise<DrainResult> {
const startTime = performance.now(); const startTime = performance.now();
const requestStartTime = getStreamRequestStartTime(stream) ?? startTime; const requestStartTime = getStreamRequestStartTime(stream) ?? startTime;
@@ -259,8 +260,6 @@ export async function drainStream(
try { try {
for await (const chunk of stream) { for await (const chunk of stream) {
// console.log("chunk", chunk);
// Check if abort generation changed (handleInterrupt ran while we were waiting) // Check if abort generation changed (handleInterrupt ran while we were waiting)
// This catches cases where the abort signal might not propagate correctly // This catches cases where the abort signal might not propagate correctly
if ((buffers.abortGeneration || 0) !== startAbortGen) { if ((buffers.abortGeneration || 0) !== startAbortGen) {
@@ -386,7 +385,11 @@ export async function drainStream(
// Preserve a stop reason already parsed from stream chunks (e.g. llm_api_error) // Preserve a stop reason already parsed from stream chunks (e.g. llm_api_error)
// and only fall back to generic "error" when none is available. // and only fall back to generic "error" when none is available.
stopReason = streamProcessor.stopReason || "error"; stopReason = streamProcessor.stopReason || "error";
markIncompleteToolsAsCancelled(buffers, true, "stream_error"); // skipMarkCurrentLine=true: if a resume follows, the resume stream will
// finalize the streaming line with full text. Marking it finished now would
// commit truncated content to static (emittedIdsRef) before resume can append.
// drainStreamWithResume calls markCurrentLineAsFinished if no resume happens.
markIncompleteToolsAsCancelled(buffers, true, "stream_error", true);
queueMicrotask(refresh); queueMicrotask(refresh);
} finally { } finally {
// Persist chunk log to disk (one write per stream, not per chunk) // Persist chunk log to disk (one write per stream, not per chunk)
@@ -446,7 +449,13 @@ export async function drainStream(
const approval: ApprovalRequest | null = approvals[0] || null; const approval: ApprovalRequest | null = approvals[0] || null;
streamProcessor.pendingApprovals.clear(); streamProcessor.pendingApprovals.clear();
if (stopReason === "requires_approval" && approvals.length === 0) { if (
stopReason === "requires_approval" &&
approvals.length === 0 &&
!isResumeStream
) {
// On resume streams, approval chunks are before starting_after and won't be replayed.
// drainStreamWithResume carries them over from the original drain — this is expected.
debugWarn( debugWarn(
"drainStream", "drainStream",
"No approvals collected despite requires_approval stop reason", "No approvals collected despite requires_approval stop reason",
@@ -577,6 +586,9 @@ export async function drainStreamWithResume(
abortSignal && abortSignal &&
!abortSignal.aborted !abortSignal.aborted
) { ) {
// Resume path: markCurrentLineAsFinished was skipped in the catch block.
// If resume fails below, we call it in the catch. If no resume condition is
// met (else branch), we call it there instead.
// Preserve original state in case resume needs to merge or fails // Preserve original state in case resume needs to merge or fails
const originalFallbackError = result.fallbackError; const originalFallbackError = result.fallbackError;
const originalApprovals = result.approvals; const originalApprovals = result.approvals;
@@ -643,6 +655,7 @@ export async function drainStreamWithResume(
onChunkProcessed, onChunkProcessed,
contextTracker, contextTracker,
seenSeqIdThreshold, seenSeqIdThreshold,
true, // isResumeStream
); );
// Use the resume result (should have proper stop_reason now) // Use the resume result (should have proper stop_reason now)
@@ -667,8 +680,9 @@ export async function drainStreamWithResume(
result.approval = originalApproval; result.approval = originalApproval;
} }
} catch (resumeError) { } catch (resumeError) {
// Resume failed - stick with the error stop_reason // Resume failed - finalize the streaming line now (skipped in catch block above)
// Restore the original stream error for display markCurrentLineAsFinished(buffers);
// Stick with the error stop_reason and restore the original stream error for display
result.fallbackError = originalFallbackError; result.fallbackError = originalFallbackError;
const resumeErrorMsg = const resumeErrorMsg =
@@ -701,6 +715,8 @@ export async function drainStreamWithResume(
// Only log if we actually skipped for a reason (i.e., we didn't enter the resume branch above) // Only log if we actually skipped for a reason (i.e., we didn't enter the resume branch above)
if (skipReasons.length > 0) { if (skipReasons.length > 0) {
// No resume — finalize the streaming line now (was skipped in catch block)
markCurrentLineAsFinished(buffers);
debugLog( debugLog(
"stream", "stream",
"Mid-stream resume skipped: %s", "Mid-stream resume skipped: %s",