feat: add OTID wiring for request deduplication (#1497)

Co-authored-by: Letta Code <noreply@letta.com>
This commit is contained in:
cthomas
2026-03-23 18:36:55 -07:00
committed by GitHub
parent ad56c3d273
commit b445da0ce0
7 changed files with 176 additions and 206 deletions

View File

@@ -6,6 +6,7 @@
* action. No network calls, no React, no stream-json output.
*/
import { randomUUID } from "node:crypto";
import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents";
import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages";
import { isCloudflareEdge52xHtmlError } from "../cli/helpers/errorFormatter";
@@ -368,7 +369,10 @@ export function rebuildInputWithFreshDenials(
serverApprovals: PendingApprovalInfo[],
denialReason: string,
): Array<MessageCreate | ApprovalCreate> {
const stripped = currentInput.filter((item) => item?.type !== "approval");
// Refresh OTIDs on all stripped messages — this is a new request, not a retry
const stripped = currentInput
.filter((item) => item?.type !== "approval")
.map((item) => ({ ...item, otid: randomUUID() }));
if (serverApprovals.length > 0) {
const denials: ApprovalCreate = {
@@ -379,6 +383,7 @@ export function rebuildInputWithFreshDenials(
approve: false,
reason: denialReason,
})),
otid: randomUUID(),
};
return [denials, ...stripped];
}

View File

@@ -1,5 +1,6 @@
// src/cli/App.tsx
import { randomUUID } from "node:crypto";
import { existsSync, readFileSync, renameSync, writeFileSync } from "node:fs";
import { homedir, tmpdir } from "node:os";
import { join, relative } from "node:path";
@@ -281,13 +282,7 @@ import {
import { formatStatusLineHelp } from "./helpers/statusLineHelp";
import { buildStatusLinePayload } from "./helpers/statusLinePayload";
import { executeStatusLineCommand } from "./helpers/statusLineRuntime";
import {
type ApprovalRequest,
type DrainResult,
discoverFallbackRunIdWithTimeout,
drainStream,
drainStreamWithResume,
} from "./helpers/stream";
import { type ApprovalRequest, drainStreamWithResume } from "./helpers/stream";
import {
collectFinishedTaskToolCalls,
createSubagentGroupItem,
@@ -3866,6 +3861,7 @@ export default function App({
type: "message",
role: "user",
content: `${systemMsg}\n\n${newState.originalPrompt}`,
otid: randomUUID(),
},
],
{ allowReentry: true },
@@ -3875,6 +3871,13 @@ export default function App({
// Copy so we can safely mutate for retry recovery flows
let currentInput = [...initialInput];
const refreshCurrentInputOtids = () => {
// Terminal stop-reason retries are NEW requests and must not reuse OTIDs.
currentInput = currentInput.map((item) => ({
...item,
otid: randomUUID(),
}));
};
const allowReentry = options?.allowReentry ?? false;
const hasApprovalInput = initialInput.some(
(item) => item.type === "approval",
@@ -3957,11 +3960,16 @@ export default function App({
canInjectInterruptRecovery
) {
currentInput = [
...lastSentInputRef.current,
// Refresh OTIDs — this is a new request, not a retry of the interrupted one
...lastSentInputRef.current.map((m) => ({
...m,
otid: randomUUID(),
})),
...currentInput.map((m) =>
m.type === "message" && m.role === "user"
? {
...m,
otid: randomUUID(),
content: [
{ type: "text" as const, text: INTERRUPT_RECOVERY_ALERT },
...(typeof m.content === "string"
@@ -3969,7 +3977,7 @@ export default function App({
: m.content),
],
}
: m,
: { ...m, otid: randomUUID() },
),
];
pendingInterruptRecoveryConversationIdRef.current = null;
@@ -4035,23 +4043,22 @@ export default function App({
// Inject queued skill content as user message parts (LET-7353)
// This centralizes skill content injection so all approval-send paths
// automatically get skill SKILL.md content alongside tool results.
{
const { consumeQueuedSkillContent } = await import(
"../tools/impl/skillContentRegistry"
);
const skillContents = consumeQueuedSkillContent();
if (skillContents.length > 0) {
currentInput = [
...currentInput,
{
role: "user",
content: skillContents.map((sc) => ({
type: "text" as const,
text: sc.content,
})),
},
];
}
const { consumeQueuedSkillContent } = await import(
"../tools/impl/skillContentRegistry"
);
const skillContents = consumeQueuedSkillContent();
if (skillContents.length > 0) {
currentInput = [
...currentInput,
{
role: "user",
content: skillContents.map((sc) => ({
type: "text" as const,
text: sc.content,
})),
otid: randomUUID(),
},
];
}
// Stream one turn - use ref to always get the latest conversationId
@@ -4061,7 +4068,6 @@ export default function App({
let stream: Awaited<ReturnType<typeof sendMessageStream>> | null =
null;
let turnToolContextId: string | null = null;
let preStreamResumeResult: DrainResult | null = null;
try {
const nextStream = await sendMessageStream(
conversationIdRef.current,
@@ -4158,135 +4164,42 @@ export default function App({
},
);
// Attempt to discover and resume the in-flight run before waiting
try {
const resumeCtx: StreamRequestContext = {
conversationId: conversationIdRef.current,
resolvedConversationId: conversationIdRef.current,
agentId: agentIdRef.current,
requestStartedAtMs,
};
debugLog(
"stream",
"Conversation busy: attempting run discovery for resume (conv=%s, agent=%s)",
resumeCtx.conversationId,
resumeCtx.agentId,
);
const client = await getClient();
const discoveredRunId = await discoverFallbackRunIdWithTimeout(
client,
resumeCtx,
);
debugLog(
"stream",
"Run discovery result: %s",
discoveredRunId ?? "none",
);
// Show status message
const statusId = uid("status");
buffersRef.current.byId.set(statusId, {
kind: "status",
id: statusId,
lines: ["Conversation is busy, waiting and retrying…"],
});
buffersRef.current.order.push(statusId);
refreshDerived();
if (discoveredRunId) {
if (signal?.aborted || userCancelledRef.current) {
const isStaleAtAbort =
myGeneration !== conversationGenerationRef.current;
if (!isStaleAtAbort) {
setStreaming(false);
}
return;
}
// Found a running run — resume its stream
buffersRef.current.interrupted = false;
buffersRef.current.commitGeneration =
(buffersRef.current.commitGeneration || 0) + 1;
const resumeStream = await client.runs.messages.stream(
discoveredRunId,
{
starting_after: 0,
batch_size: 1000,
},
);
preStreamResumeResult = await drainStream(
resumeStream,
buffersRef.current,
refreshDerivedThrottled,
signal,
undefined, // no handleFirstMessage on resume
undefined,
contextTrackerRef.current,
highestSeqIdSeen,
);
// Attach the discovered run ID
if (!preStreamResumeResult.lastRunId) {
preStreamResumeResult.lastRunId = discoveredRunId;
}
debugLog(
"stream",
"Pre-stream resume succeeded (runId=%s, stopReason=%s)",
discoveredRunId,
preStreamResumeResult.stopReason,
);
// Fall through — preStreamResumeResult will short-circuit drainStreamWithResume
// Wait with abort checking (same pattern as LLM API error retry)
let cancelled = false;
const startTime = Date.now();
while (Date.now() - startTime < retryDelayMs) {
if (
abortControllerRef.current?.signal.aborted ||
userCancelledRef.current
) {
cancelled = true;
break;
}
} catch (resumeError) {
if (signal?.aborted || userCancelledRef.current) {
const isStaleAtAbort =
myGeneration !== conversationGenerationRef.current;
if (!isStaleAtAbort) {
setStreaming(false);
}
return;
}
debugLog(
"stream",
"Pre-stream resume failed, falling back to wait/retry: %s",
resumeError instanceof Error
? resumeError.message
: String(resumeError),
);
// Fall through to existing wait/retry behavior
await new Promise((resolve) => setTimeout(resolve, 100));
}
// If resume succeeded, skip the wait/retry loop
if (!preStreamResumeResult) {
// Show status message
const statusId = uid("status");
buffersRef.current.byId.set(statusId, {
kind: "status",
id: statusId,
lines: ["Conversation is busy, waiting and retrying…"],
});
buffersRef.current.order.push(statusId);
refreshDerived();
// Remove status message
buffersRef.current.byId.delete(statusId);
buffersRef.current.order = buffersRef.current.order.filter(
(id) => id !== statusId,
);
refreshDerived();
// Wait with abort checking (same pattern as LLM API error retry)
let cancelled = false;
const startTime = Date.now();
while (Date.now() - startTime < retryDelayMs) {
if (
abortControllerRef.current?.signal.aborted ||
userCancelledRef.current
) {
cancelled = true;
break;
}
await new Promise((resolve) => setTimeout(resolve, 100));
}
// Remove status message
buffersRef.current.byId.delete(statusId);
buffersRef.current.order = buffersRef.current.order.filter(
(id) => id !== statusId,
);
refreshDerived();
if (!cancelled) {
// Reset interrupted flag so retry stream chunks are processed
buffersRef.current.interrupted = false;
restorePinnedPermissionMode();
continue;
}
if (!cancelled) {
// Reset interrupted flag so retry stream chunks are processed
buffersRef.current.interrupted = false;
restorePinnedPermissionMode();
continue;
}
// User pressed ESC - fall through to error handling
}
@@ -4466,10 +4379,7 @@ export default function App({
}
// Not a recoverable desync - re-throw to outer catch
// (unless pre-stream resume already succeeded)
if (!preStreamResumeResult) {
throw preStreamError;
}
throw preStreamError;
}
// Check again after network call - user may have pressed Escape during sendMessageStream
@@ -4575,25 +4485,19 @@ export default function App({
contextTrackerRef.current.currentTurnId++;
}
const drainResult = preStreamResumeResult
? preStreamResumeResult
: (() => {
if (!stream) {
throw new Error(
"Expected stream when pre-stream resume did not succeed",
);
}
return drainStreamWithResume(
stream,
buffersRef.current,
refreshDerivedThrottled,
signal, // Use captured signal, not ref (which may be nulled by handleInterrupt)
handleFirstMessage,
undefined,
contextTrackerRef.current,
highestSeqIdSeen,
);
})();
if (!stream) {
throw new Error("Expected stream to be set before drain");
}
const drainResult = drainStreamWithResume(
stream,
buffersRef.current,
refreshDerivedThrottled,
signal, // Use captured signal, not ref (which may be nulled by handleInterrupt)
handleFirstMessage,
undefined,
contextTrackerRef.current,
highestSeqIdSeen,
);
const {
stopReason,
@@ -4751,6 +4655,7 @@ export default function App({
refreshDerived();
// Continue conversation with the hook feedback
const hookMessageOtid = randomUUID();
setTimeout(() => {
processConversation(
[
@@ -4758,6 +4663,7 @@ export default function App({
type: "message",
role: "user",
content: hookMessage,
otid: hookMessageOtid,
},
],
{ allowReentry: true },
@@ -5250,11 +5156,16 @@ export default function App({
toolResultsInFlightRef.current = true;
await processConversation(
[
{ type: "approval", approvals: allResults },
{
type: "approval",
approvals: allResults,
otid: randomUUID(),
},
{
type: "message",
role: "user",
content: queuedContentParts,
otid: randomUUID(),
},
],
{ allowReentry: true },
@@ -5300,6 +5211,7 @@ export default function App({
{
type: "approval",
approvals: allResults,
otid: randomUUID(),
},
],
{ allowReentry: true },
@@ -5570,6 +5482,7 @@ export default function App({
type: "message" as const,
role: "system" as const,
content: `<system-reminder>The previous response was empty. Please provide a response with either text content or a tool call.</system-reminder>`,
otid: randomUUID(),
},
];
}
@@ -5593,6 +5506,8 @@ export default function App({
);
refreshDerived();
// Empty-response retry starts a new request/run, so refresh OTIDs.
refreshCurrentInputOtids();
buffersRef.current.interrupted = false;
continue;
}
@@ -5607,6 +5522,9 @@ export default function App({
retriable &&
llmApiErrorRetriesRef.current < LLM_API_ERROR_MAX_RETRIES
) {
// Do NOT replay the same run for terminal post-stream errors
// (e.g. llm_api_error). A retry should create a new run.
llmApiErrorRetriesRef.current += 1;
const attempt = llmApiErrorRetriesRef.current;
const delayMs = getRetryDelayMs({
@@ -5674,9 +5592,11 @@ export default function App({
}
if (!cancelled) {
// Post-stream retry is a new request/run, so refresh OTIDs.
refreshCurrentInputOtids();
// Reset interrupted flag so retry stream chunks are processed
buffersRef.current.interrupted = false;
// Retry by continuing the while loop (same currentInput)
// Retry by continuing the while loop with fresh OTIDs.
continue;
}
// User pressed ESC - fall through to error handling
@@ -6939,7 +6859,7 @@ export default function App({
if (allResults.length > 0) {
toolResultsInFlightRef.current = true;
await processConversation([
{ type: "approval", approvals: allResults },
{ type: "approval", approvals: allResults, otid: randomUUID() },
]);
toolResultsInFlightRef.current = false;
@@ -8043,6 +7963,7 @@ export default function App({
type: "message",
role: "user",
content: buildTextParts(systemMsg, prompt),
otid: randomUUID(),
},
]);
} else {
@@ -9464,6 +9385,7 @@ export default function App({
type: "message",
role: "user",
content: buildTextParts(skillMessage),
otid: randomUUID(),
},
]);
} catch (error) {
@@ -9527,6 +9449,7 @@ export default function App({
type: "message",
role: "user",
content: rememberParts,
otid: randomUUID(),
},
]);
} catch (error) {
@@ -9683,6 +9606,7 @@ export default function App({
type: "message",
role: "user",
content: buildTextParts(initMessage),
otid: randomUUID(),
},
]);
} catch (error) {
@@ -9799,6 +9723,7 @@ export default function App({
content: buildTextParts(
`${SYSTEM_REMINDER_OPEN}\n${prompt}\n${SYSTEM_REMINDER_CLOSE}`,
),
otid: randomUUID(),
},
]);
} catch (error) {
@@ -10417,12 +10342,14 @@ ${SYSTEM_REMINDER_CLOSE}
{
type: "approval",
approvals: recoveryApprovalResults,
otid: randomUUID(),
},
{
type: "message",
role: "user",
content:
messageContent as unknown as MessageCreate["content"],
otid: randomUUID(),
},
];
@@ -10698,6 +10625,7 @@ ${SYSTEM_REMINDER_CLOSE}
type: "message",
role: "user",
content: messageContent as unknown as MessageCreate["content"],
otid: randomUUID(),
});
await processConversation(initialInput, {
@@ -11046,6 +10974,7 @@ ${SYSTEM_REMINDER_CLOSE}
type: "message",
role: "user",
content: buildQueuedContentParts(queuedItemsToAppend),
otid: randomUUID(),
});
refreshDerived();
} else if (hadNotifications) {
@@ -11315,6 +11244,7 @@ ${SYSTEM_REMINDER_CLOSE}
{
type: "approval",
approvals: allResults as ApprovalResult[],
otid: randomUUID(),
},
]);
} finally {

View File

@@ -1316,6 +1316,7 @@ export async function handleHeadlessCommand(
const approvalInput: ApprovalCreate = {
type: "approval",
approvals: executedResults as ApprovalResult[],
otid: randomUUID(),
};
// Inject queued skill content as user message parts (LET-7353)
@@ -1335,6 +1336,7 @@ export async function handleHeadlessCommand(
type: "text" as const,
text: sc.content,
})),
otid: randomUUID(),
});
}
}
@@ -1462,8 +1464,16 @@ ${SYSTEM_REMINDER_CLOSE}
{
role: "user",
content: contentParts,
otid: randomUUID(),
},
];
const refreshCurrentInputOtids = () => {
// Terminal stop-reason retries are NEW requests and must not reuse OTIDs.
currentInput = currentInput.map((item) => ({
...item,
otid: randomUUID(),
}));
};
// Track lastRunId outside the while loop so it's available in catch block
let lastKnownRunId: string | null = null;
@@ -1514,6 +1524,7 @@ ${SYSTEM_REMINDER_CLOSE}
type: "text" as const,
text: sc.content,
})),
otid: randomUUID(),
},
];
}
@@ -1569,12 +1580,11 @@ ${SYSTEM_REMINDER_CLOSE}
continue;
}
// Check for 409 "conversation busy" error - retry once with delay
// TODO: Add pre-stream resume logic for parity with App.tsx.
// Before waiting, attempt to discover the in-flight run via
// discoverFallbackRunIdWithTimeout() and resume its stream with
// client.runs.messages.stream() + drainStream(). See App.tsx
// retry_conversation_busy handler for reference implementation.
// Check for 409 "conversation busy" - wait and retry.
// Stream resume is not attempted here: without OTID validation we
// cannot confirm the in-flight run belongs to this request (e.g. two
// terminals on the same agent). App.tsx handles resume with proper
// context via discoverFallbackRunIdWithTimeout.
if (preStreamAction === "retry_conversation_busy") {
conversationBusyRetries += 1;
const retryDelayMs = getRetryDelayMs({
@@ -1582,11 +1592,10 @@ ${SYSTEM_REMINDER_CLOSE}
attempt: conversationBusyRetries,
});
// Emit retry message for stream-json mode
if (outputFormat === "stream-json") {
const retryMsg: RetryMessage = {
type: "retry",
reason: "error", // 409 conversation busy is a pre-stream error
reason: "error",
attempt: conversationBusyRetries,
max_attempts: CONVERSATION_BUSY_MAX_RETRIES,
delay_ms: retryDelayMs,
@@ -1600,7 +1609,6 @@ ${SYSTEM_REMINDER_CLOSE}
);
}
// Wait before retry
await new Promise((resolve) => setTimeout(resolve, retryDelayMs));
continue;
}
@@ -1918,12 +1926,12 @@ ${SYSTEM_REMINDER_CLOSE}
);
// Send all results in one batch
currentInput = [
{
type: "approval",
approvals: executedResults as ApprovalResult[],
},
];
const approvalInputWithOtid = {
type: "approval" as const,
approvals: executedResults as ApprovalResult[],
otid: randomUUID(),
};
currentInput = [approvalInputWithOtid];
continue;
}
@@ -1979,6 +1987,8 @@ ${SYSTEM_REMINDER_CLOSE}
// Exponential backoff before retrying the same input
await new Promise((resolve) => setTimeout(resolve, delayMs));
// Post-stream retry creates a new run/request.
refreshCurrentInputOtids();
continue;
}
}
@@ -2092,6 +2102,7 @@ ${SYSTEM_REMINDER_CLOSE}
const nudgeMessage: MessageCreate = {
role: "system",
content: `<system-reminder>The previous response was empty. Please provide a response with either text content or a tool call.</system-reminder>`,
otid: randomUUID(),
};
currentInput = [...currentInput, nudgeMessage];
}
@@ -2115,6 +2126,8 @@ ${SYSTEM_REMINDER_CLOSE}
}
await new Promise((resolve) => setTimeout(resolve, delayMs));
// Empty-response retry creates a new run/request.
refreshCurrentInputOtids();
continue;
}
@@ -2148,6 +2161,8 @@ ${SYSTEM_REMINDER_CLOSE}
}
await new Promise((resolve) => setTimeout(resolve, delayMs));
// Post-stream retry creates a new run/request.
refreshCurrentInputOtids();
continue;
}
} catch (_e) {
@@ -2481,6 +2496,7 @@ async function runBidirectionalMode(
const approvalInput: ApprovalCreate = {
type: "approval",
approvals: executedResults as ApprovalResult[],
otid: randomUUID(),
};
const approvalMessages: Array<
@@ -2500,6 +2516,7 @@ async function runBidirectionalMode(
type: "text" as const,
text: sc.content,
})),
otid: randomUUID(),
});
}
}
@@ -2902,6 +2919,7 @@ async function runBidirectionalMode(
const approvalInput: ApprovalCreate = {
type: "approval",
approvals: executedResults as ApprovalResult[],
otid: randomUUID(),
};
const approvalStream = await sendMessageStream(
targetConversationId,
@@ -3628,12 +3646,12 @@ async function runBidirectionalMode(
);
// Send approval results back to continue
currentInput = [
{
type: "approval",
approvals: executedResults,
} as unknown as MessageCreate,
];
const approvalInputWithOtid = {
type: "approval" as const,
approvals: executedResults,
otid: randomUUID(),
};
currentInput = [approvalInputWithOtid as unknown as MessageCreate];
// Continue the loop to process the next stream
continue;

View File

@@ -767,10 +767,13 @@ describe("listen-client multi-worker concurrency", () => {
| Array<Record<string, unknown>>
| undefined;
expect(continuationMessages).toHaveLength(2);
expect(continuationMessages?.[0]).toEqual({
type: "approval",
approvals: [approvalResult],
});
expect(continuationMessages?.[0]).toEqual(
expect.objectContaining({
type: "approval",
approvals: [approvalResult],
otid: expect.any(String),
}),
);
expect(continuationMessages?.[1]).toEqual({
role: "user",
content: [

View File

@@ -471,7 +471,11 @@ export function consumeInterruptQueue(
agentId: string,
conversationId: string,
): {
approvalMessage: { type: "approval"; approvals: ApprovalResult[] };
approvalMessage: {
type: "approval";
approvals: ApprovalResult[];
otid?: string;
};
interruptedToolCallIds: string[];
} | null {
if (
@@ -483,7 +487,11 @@ export function consumeInterruptQueue(
const ctx = runtime.pendingInterruptedContext;
let result: {
approvalMessage: { type: "approval"; approvals: ApprovalResult[] };
approvalMessage: {
type: "approval";
approvals: ApprovalResult[];
otid?: string;
};
interruptedToolCallIds: string[];
} | null = null;
@@ -497,6 +505,7 @@ export function consumeInterruptQueue(
approvalMessage: {
type: "approval",
approvals: runtime.pendingInterruptedResults,
otid: crypto.randomUUID(),
},
interruptedToolCallIds: runtime.pendingInterruptedToolCallIds
? [...runtime.pendingInterruptedToolCallIds]

View File

@@ -290,6 +290,7 @@ export async function resolveStaleApprovals(
{
type: "approval",
approvals: approvalResults,
otid: crypto.randomUUID(),
},
];
const consumedQueuedTurn = consumeQueuedTurn(runtime);

View File

@@ -183,7 +183,11 @@ export async function handleIncomingMessage(
queuedInterruptedToolCallIds = consumed.interruptedToolCallIds;
}
messagesToSend.push(...normalizedMessages);
messagesToSend.push(
...normalizedMessages.map((m) =>
"content" in m && !m.otid ? { ...m, otid: crypto.randomUUID() } : m,
),
);
const firstMessage = normalizedMessages[0];
const isApprovalMessage =