fix(stream): dedup replayed seq_id chunks across retries (#1478)
Co-authored-by: Letta Code <noreply@letta.com>
This commit is contained in:
@@ -4006,6 +4006,7 @@ export default function App({
|
||||
// Capture once before the retry loop so the temporal filter in
|
||||
// discoverFallbackRunIdForResume covers runs created by any attempt.
|
||||
const requestStartedAtMs = Date.now();
|
||||
let highestSeqIdSeen: number | null = null;
|
||||
|
||||
while (true) {
|
||||
// Capture the signal BEFORE any async operations
|
||||
@@ -4206,6 +4207,7 @@ export default function App({
|
||||
undefined, // no handleFirstMessage on resume
|
||||
undefined,
|
||||
contextTrackerRef.current,
|
||||
highestSeqIdSeen,
|
||||
);
|
||||
// Attach the discovered run ID
|
||||
if (!preStreamResumeResult.lastRunId) {
|
||||
@@ -4582,6 +4584,7 @@ export default function App({
|
||||
handleFirstMessage,
|
||||
undefined,
|
||||
contextTrackerRef.current,
|
||||
highestSeqIdSeen,
|
||||
);
|
||||
})();
|
||||
|
||||
@@ -4591,9 +4594,14 @@ export default function App({
|
||||
approvals,
|
||||
apiDurationMs,
|
||||
lastRunId,
|
||||
lastSeqId,
|
||||
fallbackError,
|
||||
} = await drainResult;
|
||||
|
||||
if (lastSeqId != null) {
|
||||
highestSeqIdSeen = Math.max(highestSeqIdSeen ?? 0, lastSeqId);
|
||||
}
|
||||
|
||||
// Update currentRunId for error reporting in catch block
|
||||
currentRunId = lastRunId ?? undefined;
|
||||
// Expose to statusline
|
||||
|
||||
@@ -212,12 +212,13 @@ export async function drainStream(
|
||||
onFirstMessage?: () => void,
|
||||
onChunkProcessed?: DrainStreamHook,
|
||||
contextTracker?: ContextTracker,
|
||||
seenSeqIdThreshold?: number | null,
|
||||
): Promise<DrainResult> {
|
||||
const startTime = performance.now();
|
||||
const requestStartTime = getStreamRequestStartTime(stream) ?? startTime;
|
||||
let hasLoggedTTFT = false;
|
||||
|
||||
const streamProcessor = new StreamProcessor();
|
||||
const streamProcessor = new StreamProcessor(seenSeqIdThreshold ?? null);
|
||||
|
||||
let stopReason: StopReasonType | null = null;
|
||||
let hasCalledFirstMessage = false;
|
||||
@@ -488,6 +489,7 @@ export async function drainStreamWithResume(
|
||||
onFirstMessage?: () => void,
|
||||
onChunkProcessed?: DrainStreamHook,
|
||||
contextTracker?: ContextTracker,
|
||||
seenSeqIdThreshold?: number | null,
|
||||
): Promise<DrainResult> {
|
||||
const overallStartTime = performance.now();
|
||||
const streamRequestContext = getStreamRequestContext(stream);
|
||||
@@ -509,6 +511,7 @@ export async function drainStreamWithResume(
|
||||
onFirstMessage,
|
||||
onChunkProcessed,
|
||||
contextTracker,
|
||||
seenSeqIdThreshold,
|
||||
);
|
||||
|
||||
let runIdToResume = result.lastRunId ?? null;
|
||||
@@ -639,6 +642,7 @@ export async function drainStreamWithResume(
|
||||
undefined,
|
||||
onChunkProcessed,
|
||||
contextTracker,
|
||||
seenSeqIdThreshold,
|
||||
);
|
||||
|
||||
// Use the resume result (should have proper stop_reason now)
|
||||
|
||||
@@ -41,9 +41,21 @@ export class StreamProcessor {
|
||||
public lastSeqId: number | null = null;
|
||||
public stopReason: StopReasonType | null = null;
|
||||
|
||||
/**
 * @param seenSeqIdThreshold Replay cutoff consulted by `processChunk`: a
 *   chunk carrying a non-null `seq_id` that is less than or equal to this
 *   value is returned as `{ shouldOutput: false }` before any other
 *   handling. Defaults to `null`, in which case that check never triggers.
 */
constructor(private readonly seenSeqIdThreshold: number | null = null) {}
|
||||
|
||||
processChunk(chunk: LettaStreamingResponse): ChunkProcessingResult {
|
||||
let errorInfo: ErrorInfo | undefined;
|
||||
let updatedApproval: ApprovalRequest | undefined;
|
||||
|
||||
if (
|
||||
"seq_id" in chunk &&
|
||||
chunk.seq_id != null &&
|
||||
this.seenSeqIdThreshold != null &&
|
||||
chunk.seq_id <= this.seenSeqIdThreshold
|
||||
) {
|
||||
return { shouldOutput: false };
|
||||
}
|
||||
|
||||
// Store the run_id (for error reporting) and seq_id (for stream resumption)
|
||||
// Capture run_id even if seq_id is missing - we need it for error details
|
||||
if ("run_id" in chunk && chunk.run_id) {
|
||||
|
||||
Reference in New Issue
Block a user