feat: resume server-side run on pre-stream 409 conversation busy (#1370)

Co-authored-by: Letta Code <noreply@letta.com>
This commit is contained in:
jnjpng
2026-03-12 17:59:45 -06:00
committed by GitHub
parent af45355617
commit 0992c00a03
4 changed files with 231 additions and 49 deletions

View File

@@ -52,7 +52,11 @@ import {
ensureMemoryFilesystemDirs, ensureMemoryFilesystemDirs,
getMemoryFilesystemRoot, getMemoryFilesystemRoot,
} from "../agent/memoryFilesystem"; } from "../agent/memoryFilesystem";
import { getStreamToolContextId, sendMessageStream } from "../agent/message"; import {
getStreamToolContextId,
type StreamRequestContext,
sendMessageStream,
} from "../agent/message";
import { import {
getModelInfo, getModelInfo,
getModelInfoForLlmConfig, getModelInfoForLlmConfig,
@@ -268,7 +272,13 @@ import {
import { formatStatusLineHelp } from "./helpers/statusLineHelp"; import { formatStatusLineHelp } from "./helpers/statusLineHelp";
import { buildStatusLinePayload } from "./helpers/statusLinePayload"; import { buildStatusLinePayload } from "./helpers/statusLinePayload";
import { executeStatusLineCommand } from "./helpers/statusLineRuntime"; import { executeStatusLineCommand } from "./helpers/statusLineRuntime";
import { type ApprovalRequest, drainStreamWithResume } from "./helpers/stream"; import {
type ApprovalRequest,
type DrainResult,
discoverFallbackRunIdWithTimeout,
drainStream,
drainStreamWithResume,
} from "./helpers/stream";
import { import {
collectFinishedTaskToolCalls, collectFinishedTaskToolCalls,
createSubagentGroupItem, createSubagentGroupItem,
@@ -3942,6 +3952,10 @@ export default function App({
clearCompletedSubagents(); clearCompletedSubagents();
} }
// Capture once before the retry loop so the temporal filter in
// discoverFallbackRunIdWithTimeout covers runs created by any attempt.
const requestStartedAtMs = Date.now();
while (true) { while (true) {
// Capture the signal BEFORE any async operations // Capture the signal BEFORE any async operations
// This prevents a race where handleInterrupt nulls the ref during await // This prevents a race where handleInterrupt nulls the ref during await
@@ -3985,15 +3999,18 @@ export default function App({
// Wrap in try-catch to handle pre-stream desync errors (when sendMessageStream // Wrap in try-catch to handle pre-stream desync errors (when sendMessageStream
// throws before streaming begins, e.g., retry after LLM error when backend // throws before streaming begins, e.g., retry after LLM error when backend
// already cleared the approval) // already cleared the approval)
let stream: Awaited<ReturnType<typeof sendMessageStream>>; let stream: Awaited<ReturnType<typeof sendMessageStream>> | null =
null;
let turnToolContextId: string | null = null; let turnToolContextId: string | null = null;
let preStreamResumeResult: DrainResult | null = null;
try { try {
stream = await sendMessageStream( const nextStream = await sendMessageStream(
conversationIdRef.current, conversationIdRef.current,
currentInput, currentInput,
{ agentId: agentIdRef.current }, { agentId: agentIdRef.current },
); );
turnToolContextId = getStreamToolContextId(stream); stream = nextStream;
turnToolContextId = getStreamToolContextId(nextStream);
} catch (preStreamError) { } catch (preStreamError) {
debugLog( debugLog(
"stream", "stream",
@@ -4082,6 +4099,97 @@ export default function App({
}, },
); );
// Attempt to discover and resume the in-flight run before waiting
try {
const resumeCtx: StreamRequestContext = {
conversationId: conversationIdRef.current,
resolvedConversationId: conversationIdRef.current,
agentId: agentIdRef.current,
requestStartedAtMs,
};
debugLog(
"stream",
"Conversation busy: attempting run discovery for resume (conv=%s, agent=%s)",
resumeCtx.conversationId,
resumeCtx.agentId,
);
const client = await getClient();
const discoveredRunId = await discoverFallbackRunIdWithTimeout(
client,
resumeCtx,
);
debugLog(
"stream",
"Run discovery result: %s",
discoveredRunId ?? "none",
);
if (discoveredRunId) {
if (signal?.aborted || userCancelledRef.current) {
const isStaleAtAbort =
myGeneration !== conversationGenerationRef.current;
if (!isStaleAtAbort) {
setStreaming(false);
}
return;
}
// Found a running run — resume its stream
buffersRef.current.interrupted = false;
buffersRef.current.commitGeneration =
(buffersRef.current.commitGeneration || 0) + 1;
const resumeStream = await client.runs.messages.stream(
discoveredRunId,
{
starting_after: 0,
batch_size: 1000,
},
);
preStreamResumeResult = await drainStream(
resumeStream,
buffersRef.current,
refreshDerivedThrottled,
signal,
undefined, // no handleFirstMessage on resume
undefined,
contextTrackerRef.current,
);
// Attach the discovered run ID
if (!preStreamResumeResult.lastRunId) {
preStreamResumeResult.lastRunId = discoveredRunId;
}
debugLog(
"stream",
"Pre-stream resume succeeded (runId=%s, stopReason=%s)",
discoveredRunId,
preStreamResumeResult.stopReason,
);
// Fall through — preStreamResumeResult will short-circuit drainStreamWithResume
}
} catch (resumeError) {
if (signal?.aborted || userCancelledRef.current) {
const isStaleAtAbort =
myGeneration !== conversationGenerationRef.current;
if (!isStaleAtAbort) {
setStreaming(false);
}
return;
}
debugLog(
"stream",
"Pre-stream resume failed, falling back to wait/retry: %s",
resumeError instanceof Error
? resumeError.message
: String(resumeError),
);
// Fall through to existing wait/retry behavior
}
// If resume succeeded, skip the wait/retry loop
if (!preStreamResumeResult) {
// Show status message // Show status message
const statusId = uid("status"); const statusId = uid("status");
buffersRef.current.byId.set(statusId, { buffersRef.current.byId.set(statusId, {
@@ -4119,6 +4227,7 @@ export default function App({
restorePinnedPermissionMode(); restorePinnedPermissionMode();
continue; continue;
} }
}
// User pressed ESC - fall through to error handling // User pressed ESC - fall through to error handling
} }
@@ -4297,8 +4406,11 @@ export default function App({
} }
// Not a recoverable desync - re-throw to outer catch // Not a recoverable desync - re-throw to outer catch
// (unless pre-stream resume already succeeded)
if (!preStreamResumeResult) {
throw preStreamError; throw preStreamError;
} }
}
// Check again after network call - user may have pressed Escape during sendMessageStream // Check again after network call - user may have pressed Escape during sendMessageStream
if (signal?.aborted) { if (signal?.aborted) {
@@ -4403,14 +4515,15 @@ export default function App({
contextTrackerRef.current.currentTurnId++; contextTrackerRef.current.currentTurnId++;
} }
const { const drainResult = preStreamResumeResult
stopReason, ? preStreamResumeResult
approval, : (() => {
approvals, if (!stream) {
apiDurationMs, throw new Error(
lastRunId, "Expected stream when pre-stream resume did not succeed",
fallbackError, );
} = await drainStreamWithResume( }
return drainStreamWithResume(
stream, stream,
buffersRef.current, buffersRef.current,
refreshDerivedThrottled, refreshDerivedThrottled,
@@ -4419,6 +4532,16 @@ export default function App({
undefined, undefined,
contextTrackerRef.current, contextTrackerRef.current,
); );
})();
const {
stopReason,
approval,
approvals,
apiDurationMs,
lastRunId,
fallbackError,
} = await drainResult;
// Update currentRunId for error reporting in catch block // Update currentRunId for error reporting in catch block
currentRunId = lastRunId ?? undefined; currentRunId = lastRunId ?? undefined;

View File

@@ -16,7 +16,7 @@ import {
type StreamRequestContext, type StreamRequestContext,
} from "../../agent/message"; } from "../../agent/message";
import { telemetry } from "../../telemetry"; import { telemetry } from "../../telemetry";
import { debugWarn } from "../../utils/debug"; import { debugLog, debugWarn } from "../../utils/debug";
import { formatDuration, logTiming } from "../../utils/timing"; import { formatDuration, logTiming } from "../../utils/timing";
import { import {
@@ -57,7 +57,7 @@ export type DrainStreamHook = (
| undefined | undefined
| Promise<DrainStreamHookResult | undefined>; | Promise<DrainStreamHookResult | undefined>;
type DrainResult = { export type DrainResult = {
stopReason: StopReasonType; stopReason: StopReasonType;
lastRunId?: string | null; lastRunId?: string | null;
lastSeqId?: number | null; lastSeqId?: number | null;
@@ -101,7 +101,7 @@ function parseRunCreatedAtMs(run: Run): number {
return Number.isFinite(parsed) ? parsed : 0; return Number.isFinite(parsed) ? parsed : 0;
} }
async function discoverFallbackRunIdWithTimeout( export async function discoverFallbackRunIdWithTimeout(
client: RunsListClient, client: RunsListClient,
ctx: StreamRequestContext, ctx: StreamRequestContext,
): Promise<string | null> { ): Promise<string | null> {
@@ -512,6 +512,9 @@ export async function drainStreamWithResume(
); );
let runIdToResume = result.lastRunId ?? null; let runIdToResume = result.lastRunId ?? null;
let runIdSource: "stream_chunk" | "discovery" | null = result.lastRunId
? "stream_chunk"
: null;
// If the stream failed before exposing run_id, try to discover the latest // If the stream failed before exposing run_id, try to discover the latest
// running/created run for this conversation that was created after send start. // running/created run for this conversation that was created after send start.
@@ -523,13 +526,25 @@ export async function drainStreamWithResume(
!abortSignal.aborted !abortSignal.aborted
) { ) {
try { try {
debugLog(
"stream",
"Mid-stream resume: attempting run discovery (conv=%s, agent=%s)",
streamRequestContext.conversationId,
streamRequestContext.agentId,
);
const client = await lazyClient(); const client = await lazyClient();
runIdToResume = await discoverFallbackRunIdWithTimeout( runIdToResume = await discoverFallbackRunIdWithTimeout(
client, client,
streamRequestContext, streamRequestContext,
); );
debugLog(
"stream",
"Mid-stream resume: run discovery result: %s",
runIdToResume ?? "none",
);
if (runIdToResume) { if (runIdToResume) {
result.lastRunId = runIdToResume; result.lastRunId = runIdToResume;
runIdSource = "discovery";
} }
} catch (lookupError) { } catch (lookupError) {
const lookupErrorMsg = const lookupErrorMsg =
@@ -574,6 +589,21 @@ export async function drainStreamWithResume(
}, },
); );
debugLog(
"stream",
"Mid-stream resume: fetching run stream (source=%s, runId=%s, lastSeqId=%s)",
runIdSource ?? "unknown",
runIdToResume,
result.lastSeqId ?? 0,
);
debugLog(
"stream",
"Mid-stream resume: attempting resume (runId=%s, lastSeqId=%s)",
runIdToResume,
result.lastSeqId ?? 0,
);
try { try {
const client = await lazyClient(); const client = await lazyClient();
@@ -613,6 +643,12 @@ export async function drainStreamWithResume(
// Use the resume result (should have proper stop_reason now) // Use the resume result (should have proper stop_reason now)
// Clear the original stream error since we recovered // Clear the original stream error since we recovered
debugLog(
"stream",
"Mid-stream resume succeeded (runId=%s, stopReason=%s)",
runIdToResume,
resumeResult.stopReason,
);
result = resumeResult; result = resumeResult;
// The resumed stream uses a fresh streamProcessor that won't have // The resumed stream uses a fresh streamProcessor that won't have
@@ -635,6 +671,12 @@ export async function drainStreamWithResume(
resumeError instanceof Error resumeError instanceof Error
? resumeError.message ? resumeError.message
: String(resumeError); : String(resumeError);
debugLog(
"stream",
"Mid-stream resume failed (runId=%s): %s",
runIdToResume,
resumeErrorMsg,
);
telemetry.trackError( telemetry.trackError(
"stream_resume_failed", "stream_resume_failed",
resumeErrorMsg, resumeErrorMsg,
@@ -655,6 +697,11 @@ export async function drainStreamWithResume(
// Only log if we actually skipped for a reason (i.e., we didn't enter the resume branch above) // Only log if we actually skipped for a reason (i.e., we didn't enter the resume branch above)
if (skipReasons.length > 0) { if (skipReasons.length > 0) {
debugLog(
"stream",
"Mid-stream resume skipped: %s",
skipReasons.join(", "),
);
telemetry.trackError( telemetry.trackError(
"stream_resume_skipped", "stream_resume_skipped",
`${result.fallbackError || "Stream error (no client-side detail)"} [skip: ${skipReasons.join(", ")}]`, `${result.fallbackError || "Stream error (no client-side detail)"} [skip: ${skipReasons.join(", ")}]`,

View File

@@ -1610,6 +1610,11 @@ ${SYSTEM_REMINDER_CLOSE}
} }
// Check for 409 "conversation busy" error - retry once with delay // Check for 409 "conversation busy" error - retry once with delay
// TODO: Add pre-stream resume logic for parity with App.tsx.
// Before waiting, attempt to discover the in-flight run via
// discoverFallbackRunIdWithTimeout() and resume its stream with
// client.runs.messages.stream() + drainStream(). See App.tsx
// retry_conversation_busy handler for reference implementation.
if (preStreamAction === "retry_conversation_busy") { if (preStreamAction === "retry_conversation_busy") {
conversationBusyRetries += 1; conversationBusyRetries += 1;
const retryDelayMs = getRetryDelayMs({ const retryDelayMs = getRetryDelayMs({

View File

@@ -2216,6 +2216,13 @@ async function sendMessageStreamWithRetry(
} }
if (action === "retry_conversation_busy") { if (action === "retry_conversation_busy") {
// TODO: Add pre-stream resume logic for parity with App.tsx.
// Before waiting, attempt to discover the in-flight run via
// discoverFallbackRunIdWithTimeout() and resume its stream with
// client.runs.messages.stream() + drainStream(). This avoids
// blind wait/retry cycles when the server already created a run
// from the original request. See App.tsx retry_conversation_busy
// handler for reference implementation.
const attempt = conversationBusyRetries + 1; const attempt = conversationBusyRetries + 1;
const delayMs = getRetryDelayMs({ const delayMs = getRetryDelayMs({
category: "conversation_busy", category: "conversation_busy",