diff --git a/src/agent/message.ts b/src/agent/message.ts index a67a821..24b9ce7 100644 --- a/src/agent/message.ts +++ b/src/agent/message.ts @@ -19,13 +19,20 @@ export async function sendMessageStream( background?: boolean; // add more later: includePings, request timeouts, etc. } = { streamTokens: true, background: true }, + // Disable SDK retries by default - state management happens outside the stream, + // so retries would violate idempotency and create race conditions + requestOptions: { maxRetries?: number } = { maxRetries: 0 }, ): Promise> { const client = await getClient(); - return client.agents.messages.create(agentId, { - messages: messages, - streaming: true, - stream_tokens: opts.streamTokens ?? true, - background: opts.background ?? true, - client_tools: getClientToolsFromRegistry(), - }); + return client.agents.messages.create( + agentId, + { + messages: messages, + streaming: true, + stream_tokens: opts.streamTokens ?? true, + background: opts.background ?? true, + client_tools: getClientToolsFromRegistry(), + }, + requestOptions, + ); } diff --git a/src/cli/helpers/stream.ts b/src/cli/helpers/stream.ts index e48a244..d4c447a 100644 --- a/src/cli/helpers/stream.ts +++ b/src/cli/helpers/stream.ts @@ -376,10 +376,15 @@ export async function drainStreamWithResume( buffers.interrupted = false; // Resume from Redis where we left off - const resumeStream = await client.runs.messages.stream(result.lastRunId, { - starting_after: result.lastSeqId, - batch_size: 1000, // Fetch buffered chunks quickly - }); + // Disable SDK retries - state management happens outside, retries would create race conditions + const resumeStream = await client.runs.messages.stream( + result.lastRunId, + { + starting_after: result.lastSeqId, + batch_size: 1000, // Fetch buffered chunks quickly + }, + { maxRetries: 0 }, + ); // Continue draining from where we left off // Note: Don't pass onFirstMessage again - already called in initial drain