diff --git a/src/agent/turn-recovery-policy.ts b/src/agent/turn-recovery-policy.ts index e9faeb0..1732ba3 100644 --- a/src/agent/turn-recovery-policy.ts +++ b/src/agent/turn-recovery-policy.ts @@ -6,6 +6,7 @@ * action. No network calls, no React, no stream-json output. */ +import { randomUUID } from "node:crypto"; import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents"; import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages"; import { isCloudflareEdge52xHtmlError } from "../cli/helpers/errorFormatter"; @@ -368,7 +369,10 @@ export function rebuildInputWithFreshDenials( serverApprovals: PendingApprovalInfo[], denialReason: string, ): Array { - const stripped = currentInput.filter((item) => item?.type !== "approval"); + // Refresh OTIDs on all stripped messages — this is a new request, not a retry + const stripped = currentInput + .filter((item) => item?.type !== "approval") + .map((item) => ({ ...item, otid: randomUUID() })); if (serverApprovals.length > 0) { const denials: ApprovalCreate = { @@ -379,6 +383,7 @@ export function rebuildInputWithFreshDenials( approve: false, reason: denialReason, })), + otid: randomUUID(), }; return [denials, ...stripped]; } diff --git a/src/cli/App.tsx b/src/cli/App.tsx index fb608d6..3915cc4 100644 --- a/src/cli/App.tsx +++ b/src/cli/App.tsx @@ -1,5 +1,6 @@ // src/cli/App.tsx +import { randomUUID } from "node:crypto"; import { existsSync, readFileSync, renameSync, writeFileSync } from "node:fs"; import { homedir, tmpdir } from "node:os"; import { join, relative } from "node:path"; @@ -281,13 +282,7 @@ import { import { formatStatusLineHelp } from "./helpers/statusLineHelp"; import { buildStatusLinePayload } from "./helpers/statusLinePayload"; import { executeStatusLineCommand } from "./helpers/statusLineRuntime"; -import { - type ApprovalRequest, - type DrainResult, - discoverFallbackRunIdWithTimeout, - drainStream, - drainStreamWithResume, -} from "./helpers/stream"; +import { type ApprovalRequest, drainStreamWithResume } from "./helpers/stream"; import { collectFinishedTaskToolCalls, createSubagentGroupItem, @@ -3866,6 +3861,7 @@ export default function App({ type: "message", role: "user", content: `${systemMsg}\n\n${newState.originalPrompt}`, + otid: randomUUID(), }, ], { allowReentry: true }, @@ -3875,6 +3871,13 @@ export default function App({ // Copy so we can safely mutate for retry recovery flows let currentInput = [...initialInput]; + const refreshCurrentInputOtids = () => { + // Terminal stop-reason retries are NEW requests and must not reuse OTIDs. + currentInput = currentInput.map((item) => ({ + ...item, + otid: randomUUID(), + })); + }; const allowReentry = options?.allowReentry ?? false; const hasApprovalInput = initialInput.some( (item) => item.type === "approval", @@ -3957,11 +3960,16 @@ export default function App({ canInjectInterruptRecovery ) { currentInput = [ - ...lastSentInputRef.current, + // Refresh OTIDs — this is a new request, not a retry of the interrupted one + ...lastSentInputRef.current.map((m) => ({ + ...m, + otid: randomUUID(), + })), ...currentInput.map((m) => m.type === "message" && m.role === "user" ? { ...m, + otid: randomUUID(), content: [ { type: "text" as const, text: INTERRUPT_RECOVERY_ALERT }, ...(typeof m.content === "string" @@ -3969,7 +3977,7 @@ export default function App({ : m.content), ], } - : m, + : { ...m, otid: randomUUID() }, ), ]; pendingInterruptRecoveryConversationIdRef.current = null; @@ -4035,23 +4043,22 @@ export default function App({ // Inject queued skill content as user message parts (LET-7353) // This centralizes skill content injection so all approval-send paths // automatically get skill SKILL.md content alongside tool results. - { - const { consumeQueuedSkillContent } = await import( - "../tools/impl/skillContentRegistry" - ); - const skillContents = consumeQueuedSkillContent(); - if (skillContents.length > 0) { - currentInput = [ - ...currentInput, - { - role: "user", - content: skillContents.map((sc) => ({ - type: "text" as const, - text: sc.content, - })), - }, - ]; - } + const { consumeQueuedSkillContent } = await import( + "../tools/impl/skillContentRegistry" + ); + const skillContents = consumeQueuedSkillContent(); + if (skillContents.length > 0) { + currentInput = [ + ...currentInput, + { + role: "user", + content: skillContents.map((sc) => ({ + type: "text" as const, + text: sc.content, + })), + otid: randomUUID(), + }, + ]; } // Stream one turn - use ref to always get the latest conversationId @@ -4061,7 +4068,6 @@ export default function App({ let stream: Awaited> | null = null; let turnToolContextId: string | null = null; - let preStreamResumeResult: DrainResult | null = null; try { const nextStream = await sendMessageStream( conversationIdRef.current, @@ -4158,135 +4164,42 @@ export default function App({ }, ); - // Attempt to discover and resume the in-flight run before waiting - try { - const resumeCtx: StreamRequestContext = { - conversationId: conversationIdRef.current, - resolvedConversationId: conversationIdRef.current, - agentId: agentIdRef.current, - requestStartedAtMs, - }; - debugLog( - "stream", - "Conversation busy: attempting run discovery for resume (conv=%s, agent=%s)", - resumeCtx.conversationId, - resumeCtx.agentId, - ); - const client = await getClient(); - const discoveredRunId = await discoverFallbackRunIdWithTimeout( - client, - resumeCtx, - ); - debugLog( - "stream", - "Run discovery result: %s", - discoveredRunId ?? "none", - ); + // Show status message + const statusId = uid("status"); + buffersRef.current.byId.set(statusId, { + kind: "status", + id: statusId, + lines: ["Conversation is busy, waiting and retrying…"], + }); + buffersRef.current.order.push(statusId); + refreshDerived(); - if (discoveredRunId) { - if (signal?.aborted || userCancelledRef.current) { - const isStaleAtAbort = - myGeneration !== conversationGenerationRef.current; - if (!isStaleAtAbort) { - setStreaming(false); - } - return; - } - - // Found a running run — resume its stream - buffersRef.current.interrupted = false; - buffersRef.current.commitGeneration = - (buffersRef.current.commitGeneration || 0) + 1; - - const resumeStream = await client.runs.messages.stream( - discoveredRunId, - { - starting_after: 0, - batch_size: 1000, - }, - ); - - preStreamResumeResult = await drainStream( - resumeStream, - buffersRef.current, - refreshDerivedThrottled, - signal, - undefined, // no handleFirstMessage on resume - undefined, - contextTrackerRef.current, - highestSeqIdSeen, - ); - // Attach the discovered run ID - if (!preStreamResumeResult.lastRunId) { - preStreamResumeResult.lastRunId = discoveredRunId; - } - debugLog( - "stream", - "Pre-stream resume succeeded (runId=%s, stopReason=%s)", - discoveredRunId, - preStreamResumeResult.stopReason, - ); - // Fall through — preStreamResumeResult will short-circuit drainStreamWithResume + // Wait with abort checking (same pattern as LLM API error retry) + let cancelled = false; + const startTime = Date.now(); + while (Date.now() - startTime < retryDelayMs) { + if ( + abortControllerRef.current?.signal.aborted || + userCancelledRef.current + ) { + cancelled = true; + break; } - } catch (resumeError) { - if (signal?.aborted || userCancelledRef.current) { - const isStaleAtAbort = - myGeneration !== conversationGenerationRef.current; - if (!isStaleAtAbort) { - setStreaming(false); - } - return; - } - - debugLog( - "stream", - "Pre-stream resume failed, falling back to wait/retry: %s", - resumeError instanceof Error - ? resumeError.message - : String(resumeError), - ); - // Fall through to existing wait/retry behavior + await new Promise((resolve) => setTimeout(resolve, 100)); } - // If resume succeeded, skip the wait/retry loop - if (!preStreamResumeResult) { - // Show status message - const statusId = uid("status"); - buffersRef.current.byId.set(statusId, { - kind: "status", - id: statusId, - lines: ["Conversation is busy, waiting and retrying…"], - }); - buffersRef.current.order.push(statusId); - refreshDerived(); + // Remove status message + buffersRef.current.byId.delete(statusId); + buffersRef.current.order = buffersRef.current.order.filter( + (id) => id !== statusId, + ); + refreshDerived(); - // Wait with abort checking (same pattern as LLM API error retry) - let cancelled = false; - const startTime = Date.now(); - while (Date.now() - startTime < retryDelayMs) { - if ( - abortControllerRef.current?.signal.aborted || - userCancelledRef.current - ) { - cancelled = true; - break; - } - await new Promise((resolve) => setTimeout(resolve, 100)); - } - - // Remove status message - buffersRef.current.byId.delete(statusId); - buffersRef.current.order = buffersRef.current.order.filter( - (id) => id !== statusId, - ); - refreshDerived(); - - if (!cancelled) { - // Reset interrupted flag so retry stream chunks are processed - buffersRef.current.interrupted = false; - restorePinnedPermissionMode(); - continue; - } + if (!cancelled) { + // Reset interrupted flag so retry stream chunks are processed + buffersRef.current.interrupted = false; + restorePinnedPermissionMode(); + continue; } // User pressed ESC - fall through to error handling } @@ -4466,10 +4379,7 @@ export default function App({ } // Not a recoverable desync - re-throw to outer catch - // (unless pre-stream resume already succeeded) - if (!preStreamResumeResult) { - throw preStreamError; - } + throw preStreamError; } // Check again after network call - user may have pressed Escape during sendMessageStream @@ -4575,25 +4485,19 @@ export default function App({ contextTrackerRef.current.currentTurnId++; } - const drainResult = preStreamResumeResult - ? preStreamResumeResult - : (() => { - if (!stream) { - throw new Error( - "Expected stream when pre-stream resume did not succeed", - ); - } - return drainStreamWithResume( - stream, - buffersRef.current, - refreshDerivedThrottled, - signal, // Use captured signal, not ref (which may be nulled by handleInterrupt) - handleFirstMessage, - undefined, - contextTrackerRef.current, - highestSeqIdSeen, - ); - })(); + if (!stream) { + throw new Error("Expected stream to be set before drain"); + } + const drainResult = drainStreamWithResume( + stream, + buffersRef.current, + refreshDerivedThrottled, + signal, // Use captured signal, not ref (which may be nulled by handleInterrupt) + handleFirstMessage, + undefined, + contextTrackerRef.current, + highestSeqIdSeen, + ); const { stopReason, @@ -4751,6 +4655,7 @@ export default function App({ refreshDerived(); // Continue conversation with the hook feedback + const hookMessageOtid = randomUUID(); setTimeout(() => { processConversation( [ @@ -4758,6 +4663,7 @@ export default function App({ type: "message", role: "user", content: hookMessage, + otid: hookMessageOtid, }, ], { allowReentry: true }, @@ -5250,11 +5156,16 @@ export default function App({ toolResultsInFlightRef.current = true; await processConversation( [ - { type: "approval", approvals: allResults }, + { + type: "approval", + approvals: allResults, + otid: randomUUID(), + }, { type: "message", role: "user", content: queuedContentParts, + otid: randomUUID(), }, ], { allowReentry: true }, @@ -5300,6 +5211,7 @@ export default function App({ { type: "approval", approvals: allResults, + otid: randomUUID(), }, ], { allowReentry: true }, @@ -5570,6 +5482,7 @@ export default function App({ type: "message" as const, role: "system" as const, content: `The previous response was empty. Please provide a response with either text content or a tool call.`, + otid: randomUUID(), }, ]; } @@ -5593,6 +5506,8 @@ export default function App({ ); refreshDerived(); + // Empty-response retry starts a new request/run, so refresh OTIDs. + refreshCurrentInputOtids(); buffersRef.current.interrupted = false; continue; } @@ -5607,6 +5522,9 @@ export default function App({ retriable && llmApiErrorRetriesRef.current < LLM_API_ERROR_MAX_RETRIES ) { + // Do NOT replay the same run for terminal post-stream errors + // (e.g. llm_api_error). A retry should create a new run. + llmApiErrorRetriesRef.current += 1; const attempt = llmApiErrorRetriesRef.current; const delayMs = getRetryDelayMs({ @@ -5674,9 +5592,11 @@ export default function App({ } if (!cancelled) { + // Post-stream retry is a new request/run, so refresh OTIDs. + refreshCurrentInputOtids(); // Reset interrupted flag so retry stream chunks are processed buffersRef.current.interrupted = false; - // Retry by continuing the while loop (same currentInput) + // Retry by continuing the while loop with fresh OTIDs. continue; } // User pressed ESC - fall through to error handling @@ -6939,7 +6859,7 @@ export default function App({ if (allResults.length > 0) { toolResultsInFlightRef.current = true; await processConversation([ - { type: "approval", approvals: allResults }, + { type: "approval", approvals: allResults, otid: randomUUID() }, ]); toolResultsInFlightRef.current = false; @@ -8043,6 +7963,7 @@ export default function App({ type: "message", role: "user", content: buildTextParts(systemMsg, prompt), + otid: randomUUID(), }, ]); } else { @@ -9464,6 +9385,7 @@ export default function App({ type: "message", role: "user", content: buildTextParts(skillMessage), + otid: randomUUID(), }, ]); } catch (error) { @@ -9527,6 +9449,7 @@ export default function App({ type: "message", role: "user", content: rememberParts, + otid: randomUUID(), }, ]); } catch (error) { @@ -9683,6 +9606,7 @@ export default function App({ type: "message", role: "user", content: buildTextParts(initMessage), + otid: randomUUID(), }, ]); } catch (error) { @@ -9799,6 +9723,7 @@ export default function App({ content: buildTextParts( `${SYSTEM_REMINDER_OPEN}\n${prompt}\n${SYSTEM_REMINDER_CLOSE}`, ), + otid: randomUUID(), }, ]); } catch (error) { @@ -10417,12 +10342,14 @@ ${SYSTEM_REMINDER_CLOSE} { type: "approval", approvals: recoveryApprovalResults, + otid: randomUUID(), }, { type: "message", role: "user", content: messageContent as unknown as MessageCreate["content"], + otid: randomUUID(), }, ]; @@ -10698,6 +10625,7 @@ ${SYSTEM_REMINDER_CLOSE} type: "message", role: "user", content: messageContent as unknown as MessageCreate["content"], + otid: randomUUID(), }); await processConversation(initialInput, { @@ -11046,6 +10974,7 @@ ${SYSTEM_REMINDER_CLOSE} type: "message", role: "user", content: buildQueuedContentParts(queuedItemsToAppend), + otid: randomUUID(), }); refreshDerived(); } else if (hadNotifications) { @@ -11315,6 +11244,7 @@ ${SYSTEM_REMINDER_CLOSE} { type: "approval", approvals: allResults as ApprovalResult[], + otid: randomUUID(), }, ]); } finally { diff --git a/src/headless.ts b/src/headless.ts index 14a52be..ef12eef 100644 --- a/src/headless.ts +++ b/src/headless.ts @@ -1316,6 +1316,7 @@ export async function handleHeadlessCommand( const approvalInput: ApprovalCreate = { type: "approval", approvals: executedResults as ApprovalResult[], + otid: randomUUID(), }; // Inject queued skill content as user message parts (LET-7353) @@ -1335,6 +1336,7 @@ export async function handleHeadlessCommand( type: "text" as const, text: sc.content, })), + otid: randomUUID(), }); } } @@ -1462,8 +1464,16 @@ ${SYSTEM_REMINDER_CLOSE} { role: "user", content: contentParts, + otid: randomUUID(), }, ]; + const refreshCurrentInputOtids = () => { + // Terminal stop-reason retries are NEW requests and must not reuse OTIDs. + currentInput = currentInput.map((item) => ({ + ...item, + otid: randomUUID(), + })); + }; // Track lastRunId outside the while loop so it's available in catch block let lastKnownRunId: string | null = null; @@ -1514,6 +1524,7 @@ ${SYSTEM_REMINDER_CLOSE} type: "text" as const, text: sc.content, })), + otid: randomUUID(), }, ]; } @@ -1569,12 +1580,11 @@ ${SYSTEM_REMINDER_CLOSE} continue; } - // Check for 409 "conversation busy" error - retry once with delay - // TODO: Add pre-stream resume logic for parity with App.tsx. - // Before waiting, attempt to discover the in-flight run via - // discoverFallbackRunIdWithTimeout() and resume its stream with - // client.runs.messages.stream() + drainStream(). See App.tsx - // retry_conversation_busy handler for reference implementation. + // Check for 409 "conversation busy" - wait and retry. + // Stream resume is not attempted here: without OTID validation we + // cannot confirm the in-flight run belongs to this request (e.g. two + // terminals on the same agent). App.tsx handles resume with proper + // context via discoverFallbackRunIdWithTimeout. if (preStreamAction === "retry_conversation_busy") { conversationBusyRetries += 1; const retryDelayMs = getRetryDelayMs({ @@ -1582,11 +1592,10 @@ ${SYSTEM_REMINDER_CLOSE} attempt: conversationBusyRetries, }); - // Emit retry message for stream-json mode if (outputFormat === "stream-json") { const retryMsg: RetryMessage = { type: "retry", - reason: "error", // 409 conversation busy is a pre-stream error + reason: "error", attempt: conversationBusyRetries, max_attempts: CONVERSATION_BUSY_MAX_RETRIES, delay_ms: retryDelayMs, @@ -1600,7 +1609,6 @@ ${SYSTEM_REMINDER_CLOSE} ); } - // Wait before retry await new Promise((resolve) => setTimeout(resolve, retryDelayMs)); continue; } @@ -1918,12 +1926,12 @@ ${SYSTEM_REMINDER_CLOSE} ); // Send all results in one batch - currentInput = [ - { - type: "approval", - approvals: executedResults as ApprovalResult[], - }, - ]; + const approvalInputWithOtid = { + type: "approval" as const, + approvals: executedResults as ApprovalResult[], + otid: randomUUID(), + }; + currentInput = [approvalInputWithOtid]; continue; } @@ -1979,6 +1987,8 @@ ${SYSTEM_REMINDER_CLOSE} // Exponential backoff before retrying the same input await new Promise((resolve) => setTimeout(resolve, delayMs)); + // Post-stream retry creates a new run/request. + refreshCurrentInputOtids(); continue; } } @@ -2092,6 +2102,7 @@ ${SYSTEM_REMINDER_CLOSE} const nudgeMessage: MessageCreate = { role: "system", content: `The previous response was empty. Please provide a response with either text content or a tool call.`, + otid: randomUUID(), }; currentInput = [...currentInput, nudgeMessage]; } @@ -2115,6 +2126,8 @@ ${SYSTEM_REMINDER_CLOSE} } await new Promise((resolve) => setTimeout(resolve, delayMs)); + // Empty-response retry creates a new run/request. + refreshCurrentInputOtids(); continue; } @@ -2148,6 +2161,8 @@ ${SYSTEM_REMINDER_CLOSE} } await new Promise((resolve) => setTimeout(resolve, delayMs)); + // Post-stream retry creates a new run/request. + refreshCurrentInputOtids(); continue; } } catch (_e) { @@ -2481,6 +2496,7 @@ async function runBidirectionalMode( const approvalInput: ApprovalCreate = { type: "approval", approvals: executedResults as ApprovalResult[], + otid: randomUUID(), }; const approvalMessages: Array< @@ -2500,6 +2516,7 @@ async function runBidirectionalMode( type: "text" as const, text: sc.content, })), + otid: randomUUID(), }); } } @@ -2902,6 +2919,7 @@ async function runBidirectionalMode( const approvalInput: ApprovalCreate = { type: "approval", approvals: executedResults as ApprovalResult[], + otid: randomUUID(), }; const approvalStream = await sendMessageStream( targetConversationId, @@ -3628,12 +3646,12 @@ async function runBidirectionalMode( ); // Send approval results back to continue - currentInput = [ - { - type: "approval", - approvals: executedResults, - } as unknown as MessageCreate, - ]; + const approvalInputWithOtid = { + type: "approval" as const, + approvals: executedResults, + otid: randomUUID(), + }; + currentInput = [approvalInputWithOtid as unknown as MessageCreate]; // Continue the loop to process the next stream continue; diff --git a/src/tests/websocket/listen-client-concurrency.test.ts b/src/tests/websocket/listen-client-concurrency.test.ts index a323419..28b1b56 100644 --- a/src/tests/websocket/listen-client-concurrency.test.ts +++ b/src/tests/websocket/listen-client-concurrency.test.ts @@ -767,10 +767,13 @@ describe("listen-client multi-worker concurrency", () => { | Array> | undefined; expect(continuationMessages).toHaveLength(2); - expect(continuationMessages?.[0]).toEqual({ - type: "approval", - approvals: [approvalResult], - }); + expect(continuationMessages?.[0]).toEqual( + expect.objectContaining({ + type: "approval", + approvals: [approvalResult], + otid: expect.any(String), + }), + ); expect(continuationMessages?.[1]).toEqual({ role: "user", content: [ diff --git a/src/websocket/listener/interrupts.ts b/src/websocket/listener/interrupts.ts index 5ab887e..d67bf9d 100644 --- a/src/websocket/listener/interrupts.ts +++ b/src/websocket/listener/interrupts.ts @@ -471,7 +471,11 @@ export function consumeInterruptQueue( agentId: string, conversationId: string, ): { - approvalMessage: { type: "approval"; approvals: ApprovalResult[] }; + approvalMessage: { + type: "approval"; + approvals: ApprovalResult[]; + otid?: string; + }; interruptedToolCallIds: string[]; } | null { if ( @@ -483,7 +487,11 @@ export function consumeInterruptQueue( const ctx = runtime.pendingInterruptedContext; let result: { - approvalMessage: { type: "approval"; approvals: ApprovalResult[] }; + approvalMessage: { + type: "approval"; + approvals: ApprovalResult[]; + otid?: string; + }; interruptedToolCallIds: string[]; } | null = null; @@ -497,6 +505,7 @@ export function consumeInterruptQueue( approvalMessage: { type: "approval", approvals: runtime.pendingInterruptedResults, + otid: crypto.randomUUID(), }, interruptedToolCallIds: runtime.pendingInterruptedToolCallIds ? [...runtime.pendingInterruptedToolCallIds] diff --git a/src/websocket/listener/send.ts b/src/websocket/listener/send.ts index f103996..af0bee8 100644 --- a/src/websocket/listener/send.ts +++ b/src/websocket/listener/send.ts @@ -290,6 +290,7 @@ export async function resolveStaleApprovals( { type: "approval", approvals: approvalResults, + otid: crypto.randomUUID(), }, ]; const consumedQueuedTurn = consumeQueuedTurn(runtime); diff --git a/src/websocket/listener/turn.ts b/src/websocket/listener/turn.ts index 3530e61..8ceb225 100644 --- a/src/websocket/listener/turn.ts +++ b/src/websocket/listener/turn.ts @@ -183,7 +183,11 @@ export async function handleIncomingMessage( queuedInterruptedToolCallIds = consumed.interruptedToolCallIds; } - messagesToSend.push(...normalizedMessages); + messagesToSend.push( + ...normalizedMessages.map((m) => + "content" in m && !m.otid ? { ...m, otid: crypto.randomUUID() } : m, + ), + ); const firstMessage = normalizedMessages[0]; const isApprovalMessage =