diff --git a/src/cli/App.tsx b/src/cli/App.tsx
index 57f913b..d864424 100644
--- a/src/cli/App.tsx
+++ b/src/cli/App.tsx
@@ -4006,6 +4006,7 @@ export default function App({
   // Capture once before the retry loop so the temporal filter in
   // discoverFallbackRunIdForResume covers runs created by any attempt.
   const requestStartedAtMs = Date.now();
+  let highestSeqIdSeen: number | null = null;
 
   while (true) {
     // Capture the signal BEFORE any async operations
@@ -4206,6 +4207,7 @@ export default function App({
           undefined, // no handleFirstMessage on resume
           undefined,
           contextTrackerRef.current,
+          highestSeqIdSeen,
         );
         // Attach the discovered run ID
         if (!preStreamResumeResult.lastRunId) {
@@ -4582,6 +4584,7 @@ export default function App({
          handleFirstMessage,
          undefined,
          contextTrackerRef.current,
+         highestSeqIdSeen,
        );
      })();
 
@@ -4591,9 +4594,14 @@ export default function App({
        approvals,
        apiDurationMs,
        lastRunId,
+       lastSeqId,
        fallbackError,
      } = await drainResult;
 
+     if (lastSeqId != null) {
+       highestSeqIdSeen = Math.max(highestSeqIdSeen ?? 0, lastSeqId);
+     }
+
      // Update currentRunId for error reporting in catch block
      currentRunId = lastRunId ?? undefined;
      // Expose to statusline
diff --git a/src/cli/helpers/stream.ts b/src/cli/helpers/stream.ts
index 8b0c8c0..85244c1 100644
--- a/src/cli/helpers/stream.ts
+++ b/src/cli/helpers/stream.ts
@@ -212,12 +212,13 @@ export async function drainStream(
   onFirstMessage?: () => void,
   onChunkProcessed?: DrainStreamHook,
   contextTracker?: ContextTracker,
+  seenSeqIdThreshold?: number | null,
 ): Promise<DrainStreamResult> {
   const startTime = performance.now();
   const requestStartTime = getStreamRequestStartTime(stream) ?? startTime;
   let hasLoggedTTFT = false;
 
-  const streamProcessor = new StreamProcessor();
+  const streamProcessor = new StreamProcessor(seenSeqIdThreshold ?? null);
   let stopReason: StopReasonType | null = null;
   let hasCalledFirstMessage = false;
@@ -488,6 +489,7 @@ export async function drainStreamWithResume(
   onFirstMessage?: () => void,
   onChunkProcessed?: DrainStreamHook,
   contextTracker?: ContextTracker,
+  seenSeqIdThreshold?: number | null,
 ): Promise<DrainStreamResult> {
   const overallStartTime = performance.now();
   const streamRequestContext = getStreamRequestContext(stream);
@@ -509,6 +511,7 @@ export async function drainStreamWithResume(
     onFirstMessage,
     onChunkProcessed,
     contextTracker,
+    seenSeqIdThreshold,
   );
 
   let runIdToResume = result.lastRunId ?? null;
@@ -639,6 +642,7 @@ export async function drainStreamWithResume(
       undefined,
       onChunkProcessed,
       contextTracker,
+      seenSeqIdThreshold,
     );
 
     // Use the resume result (should have proper stop_reason now)
diff --git a/src/cli/helpers/streamProcessor.ts b/src/cli/helpers/streamProcessor.ts
index 652be0e..023e449 100644
--- a/src/cli/helpers/streamProcessor.ts
+++ b/src/cli/helpers/streamProcessor.ts
@@ -41,9 +41,21 @@ export class StreamProcessor {
   public lastSeqId: number | null = null;
   public stopReason: StopReasonType | null = null;
 
+  constructor(private readonly seenSeqIdThreshold: number | null = null) {}
+
   processChunk(chunk: LettaStreamingResponse): ChunkProcessingResult {
     let errorInfo: ErrorInfo | undefined;
     let updatedApproval: ApprovalRequest | undefined;
+
+    if (
+      "seq_id" in chunk &&
+      chunk.seq_id != null &&
+      this.seenSeqIdThreshold != null &&
+      chunk.seq_id <= this.seenSeqIdThreshold
+    ) {
+      return { shouldOutput: false };
+    }
+
     // Store the run_id (for error reporting) and seq_id (for stream resumption)
     // Capture run_id even if seq_id is missing - we need it for error details
     if ("run_id" in chunk && chunk.run_id) {