From 0992c00a039e81f29dadf3bdc6eebea07aca5087 Mon Sep 17 00:00:00 2001 From: jnjpng Date: Thu, 12 Mar 2026 17:59:45 -0600 Subject: [PATCH] feat: resume server-side run on pre-stream 409 conversation busy (#1370) Co-authored-by: Letta Code --- src/cli/App.tsx | 215 ++++++++++++++++++++++++++------- src/cli/helpers/stream.ts | 53 +++++++- src/headless.ts | 5 + src/websocket/listen-client.ts | 7 ++ 4 files changed, 231 insertions(+), 49 deletions(-) diff --git a/src/cli/App.tsx b/src/cli/App.tsx index 0296da7..f5e824d 100644 --- a/src/cli/App.tsx +++ b/src/cli/App.tsx @@ -52,7 +52,11 @@ import { ensureMemoryFilesystemDirs, getMemoryFilesystemRoot, } from "../agent/memoryFilesystem"; -import { getStreamToolContextId, sendMessageStream } from "../agent/message"; +import { + getStreamToolContextId, + type StreamRequestContext, + sendMessageStream, +} from "../agent/message"; import { getModelInfo, getModelInfoForLlmConfig, @@ -268,7 +272,13 @@ import { import { formatStatusLineHelp } from "./helpers/statusLineHelp"; import { buildStatusLinePayload } from "./helpers/statusLinePayload"; import { executeStatusLineCommand } from "./helpers/statusLineRuntime"; -import { type ApprovalRequest, drainStreamWithResume } from "./helpers/stream"; +import { + type ApprovalRequest, + type DrainResult, + discoverFallbackRunIdWithTimeout, + drainStream, + drainStreamWithResume, +} from "./helpers/stream"; import { collectFinishedTaskToolCalls, createSubagentGroupItem, @@ -3942,6 +3952,10 @@ export default function App({ clearCompletedSubagents(); } + // Capture once before the retry loop so the temporal filter in + // discoverFallbackRunIdForResume covers runs created by any attempt. + const requestStartedAtMs = Date.now(); + while (true) { // Capture the signal BEFORE any async operations // This prevents a race where handleInterrupt nulls the ref during await @@ -3985,15 +3999,18 @@ export default function App({ // Wrap in try-catch to handle pre-stream desync errors (when sendMessageStream // throws before streaming begins, e.g., retry after LLM error when backend // already cleared the approval) - let stream: Awaited>; + let stream: Awaited> | null = + null; let turnToolContextId: string | null = null; + let preStreamResumeResult: DrainResult | null = null; try { - stream = await sendMessageStream( + const nextStream = await sendMessageStream( conversationIdRef.current, currentInput, { agentId: agentIdRef.current }, ); - turnToolContextId = getStreamToolContextId(stream); + stream = nextStream; + turnToolContextId = getStreamToolContextId(nextStream); } catch (preStreamError) { debugLog( "stream", @@ -4082,42 +4099,134 @@ export default function App({ }, ); - // Show status message - const statusId = uid("status"); - buffersRef.current.byId.set(statusId, { - kind: "status", - id: statusId, - lines: ["Conversation is busy, waiting and retrying…"], - }); - buffersRef.current.order.push(statusId); - refreshDerived(); + // Attempt to discover and resume the in-flight run before waiting + try { + const resumeCtx: StreamRequestContext = { + conversationId: conversationIdRef.current, + resolvedConversationId: conversationIdRef.current, + agentId: agentIdRef.current, + requestStartedAtMs, + }; + debugLog( + "stream", + "Conversation busy: attempting run discovery for resume (conv=%s, agent=%s)", + resumeCtx.conversationId, + resumeCtx.agentId, + ); + const client = await getClient(); + const discoveredRunId = await discoverFallbackRunIdWithTimeout( + client, + resumeCtx, + ); + debugLog( + "stream", + "Run discovery result: %s", + discoveredRunId ?? "none", + ); - // Wait with abort checking (same pattern as LLM API error retry) - let cancelled = false; - const startTime = Date.now(); - while (Date.now() - startTime < retryDelayMs) { - if ( - abortControllerRef.current?.signal.aborted || - userCancelledRef.current - ) { - cancelled = true; - break; + if (discoveredRunId) { + if (signal?.aborted || userCancelledRef.current) { + const isStaleAtAbort = + myGeneration !== conversationGenerationRef.current; + if (!isStaleAtAbort) { + setStreaming(false); + } + return; + } + + // Found a running run — resume its stream + buffersRef.current.interrupted = false; + buffersRef.current.commitGeneration = + (buffersRef.current.commitGeneration || 0) + 1; + + const resumeStream = await client.runs.messages.stream( + discoveredRunId, + { + starting_after: 0, + batch_size: 1000, + }, + ); + + preStreamResumeResult = await drainStream( + resumeStream, + buffersRef.current, + refreshDerivedThrottled, + signal, + undefined, // no handleFirstMessage on resume + undefined, + contextTrackerRef.current, + ); + // Attach the discovered run ID + if (!preStreamResumeResult.lastRunId) { + preStreamResumeResult.lastRunId = discoveredRunId; + } + debugLog( + "stream", + "Pre-stream resume succeeded (runId=%s, stopReason=%s)", + discoveredRunId, + preStreamResumeResult.stopReason, + ); + // Fall through — preStreamResumeResult will short-circuit drainStreamWithResume } - await new Promise((resolve) => setTimeout(resolve, 100)); + } catch (resumeError) { + if (signal?.aborted || userCancelledRef.current) { + const isStaleAtAbort = + myGeneration !== conversationGenerationRef.current; + if (!isStaleAtAbort) { + setStreaming(false); + } + return; + } + + debugLog( + "stream", + "Pre-stream resume failed, falling back to wait/retry: %s", + resumeError instanceof Error + ? resumeError.message + : String(resumeError), + ); + // Fall through to existing wait/retry behavior } - // Remove status message - buffersRef.current.byId.delete(statusId); - buffersRef.current.order = buffersRef.current.order.filter( - (id) => id !== statusId, - ); - refreshDerived(); + // If resume succeeded, skip the wait/retry loop + if (!preStreamResumeResult) { + // Show status message + const statusId = uid("status"); + buffersRef.current.byId.set(statusId, { + kind: "status", + id: statusId, + lines: ["Conversation is busy, waiting and retrying…"], + }); + buffersRef.current.order.push(statusId); + refreshDerived(); - if (!cancelled) { - // Reset interrupted flag so retry stream chunks are processed - buffersRef.current.interrupted = false; - restorePinnedPermissionMode(); - continue; + // Wait with abort checking (same pattern as LLM API error retry) + let cancelled = false; + const startTime = Date.now(); + while (Date.now() - startTime < retryDelayMs) { + if ( + abortControllerRef.current?.signal.aborted || + userCancelledRef.current + ) { + cancelled = true; + break; + } + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + // Remove status message + buffersRef.current.byId.delete(statusId); + buffersRef.current.order = buffersRef.current.order.filter( + (id) => id !== statusId, + ); + refreshDerived(); + + if (!cancelled) { + // Reset interrupted flag so retry stream chunks are processed + buffersRef.current.interrupted = false; + restorePinnedPermissionMode(); + continue; + } } // User pressed ESC - fall through to error handling } @@ -4297,7 +4406,10 @@ export default function App({ } // Not a recoverable desync - re-throw to outer catch - throw preStreamError; + // (unless pre-stream resume already succeeded) + if (!preStreamResumeResult) { + throw preStreamError; + } } // Check again after network call - user may have pressed Escape during sendMessageStream @@ -4403,6 +4515,25 @@ export default function App({ contextTrackerRef.current.currentTurnId++; } + const drainResult = preStreamResumeResult + ? preStreamResumeResult + : (() => { + if (!stream) { + throw new Error( + "Expected stream when pre-stream resume did not succeed", + ); + } + return drainStreamWithResume( + stream, + buffersRef.current, + refreshDerivedThrottled, + signal, // Use captured signal, not ref (which may be nulled by handleInterrupt) + handleFirstMessage, + undefined, + contextTrackerRef.current, + ); + })(); + const { stopReason, approval, @@ -4410,15 +4541,7 @@ export default function App({ apiDurationMs, lastRunId, fallbackError, - } = await drainStreamWithResume( - stream, - buffersRef.current, - refreshDerivedThrottled, - signal, // Use captured signal, not ref (which may be nulled by handleInterrupt) - handleFirstMessage, - undefined, - contextTrackerRef.current, - ); + } = await drainResult; // Update currentRunId for error reporting in catch block currentRunId = lastRunId ?? undefined; diff --git a/src/cli/helpers/stream.ts b/src/cli/helpers/stream.ts index e8bcb30..8b0c8c0 100644 --- a/src/cli/helpers/stream.ts +++ b/src/cli/helpers/stream.ts @@ -16,7 +16,7 @@ import { type StreamRequestContext, } from "../../agent/message"; import { telemetry } from "../../telemetry"; -import { debugWarn } from "../../utils/debug"; +import { debugLog, debugWarn } from "../../utils/debug"; import { formatDuration, logTiming } from "../../utils/timing"; import { @@ -57,7 +57,7 @@ export type DrainStreamHook = ( | undefined | Promise; -type DrainResult = { +export type DrainResult = { stopReason: StopReasonType; lastRunId?: string | null; lastSeqId?: number | null; @@ -101,7 +101,7 @@ function parseRunCreatedAtMs(run: Run): number { return Number.isFinite(parsed) ? parsed : 0; } -async function discoverFallbackRunIdWithTimeout( +export async function discoverFallbackRunIdWithTimeout( client: RunsListClient, ctx: StreamRequestContext, ): Promise { @@ -512,6 +512,9 @@ export async function drainStreamWithResume( ); let runIdToResume = result.lastRunId ?? null; + let runIdSource: "stream_chunk" | "discovery" | null = result.lastRunId + ? "stream_chunk" + : null; // If the stream failed before exposing run_id, try to discover the latest // running/created run for this conversation that was created after send start. @@ -523,13 +526,25 @@ export async function drainStreamWithResume( !abortSignal.aborted ) { try { + debugLog( + "stream", + "Mid-stream resume: attempting run discovery (conv=%s, agent=%s)", + streamRequestContext.conversationId, + streamRequestContext.agentId, + ); const client = await lazyClient(); runIdToResume = await discoverFallbackRunIdWithTimeout( client, streamRequestContext, ); + debugLog( + "stream", + "Mid-stream resume: run discovery result: %s", + runIdToResume ?? "none", + ); if (runIdToResume) { result.lastRunId = runIdToResume; + runIdSource = "discovery"; } } catch (lookupError) { const lookupErrorMsg = @@ -574,6 +589,21 @@ export async function drainStreamWithResume( }, ); + debugLog( + "stream", + "Mid-stream resume: fetching run stream (source=%s, runId=%s, lastSeqId=%s)", + runIdSource ?? "unknown", + runIdToResume, + result.lastSeqId ?? 0, + ); + + debugLog( + "stream", + "Mid-stream resume: attempting resume (runId=%s, lastSeqId=%s)", + runIdToResume, + result.lastSeqId ?? 0, + ); + try { const client = await lazyClient(); @@ -613,6 +643,12 @@ export async function drainStreamWithResume( // Use the resume result (should have proper stop_reason now) // Clear the original stream error since we recovered + debugLog( + "stream", + "Mid-stream resume succeeded (runId=%s, stopReason=%s)", + runIdToResume, + resumeResult.stopReason, + ); result = resumeResult; // The resumed stream uses a fresh streamProcessor that won't have @@ -635,6 +671,12 @@ export async function drainStreamWithResume( resumeError instanceof Error ? resumeError.message : String(resumeError); + debugLog( + "stream", + "Mid-stream resume failed (runId=%s): %s", + runIdToResume, + resumeErrorMsg, + ); telemetry.trackError( "stream_resume_failed", resumeErrorMsg, @@ -655,6 +697,11 @@ export async function drainStreamWithResume( // Only log if we actually skipped for a reason (i.e., we didn't enter the resume branch above) if (skipReasons.length > 0) { + debugLog( + "stream", + "Mid-stream resume skipped: %s", + skipReasons.join(", "), + ); telemetry.trackError( "stream_resume_skipped", `${result.fallbackError || "Stream error (no client-side detail)"} [skip: ${skipReasons.join(", ")}]`, diff --git a/src/headless.ts b/src/headless.ts index c20a229..5d1eb4c 100644 --- a/src/headless.ts +++ b/src/headless.ts @@ -1610,6 +1610,11 @@ ${SYSTEM_REMINDER_CLOSE} } // Check for 409 "conversation busy" error - retry once with delay + // TODO: Add pre-stream resume logic for parity with App.tsx. + // Before waiting, attempt to discover the in-flight run via + // discoverFallbackRunIdWithTimeout() and resume its stream with + // client.runs.messages.stream() + drainStream(). See App.tsx + // retry_conversation_busy handler for reference implementation. if (preStreamAction === "retry_conversation_busy") { conversationBusyRetries += 1; const retryDelayMs = getRetryDelayMs({ diff --git a/src/websocket/listen-client.ts b/src/websocket/listen-client.ts index 064bc8b..4f2c913 100644 --- a/src/websocket/listen-client.ts +++ b/src/websocket/listen-client.ts @@ -2216,6 +2216,13 @@ async function sendMessageStreamWithRetry( } if (action === "retry_conversation_busy") { + // TODO: Add pre-stream resume logic for parity with App.tsx. + // Before waiting, attempt to discover the in-flight run via + // discoverFallbackRunIdWithTimeout() and resume its stream with + // client.runs.messages.stream() + drainStream(). This avoids + // blind wait/retry cycles when the server already created a run + // from the original request. See App.tsx retry_conversation_busy + // handler for reference implementation. const attempt = conversationBusyRetries + 1; const delayMs = getRetryDelayMs({ category: "conversation_busy",