fix(queue): dequeueInFlightRef lock to prevent duplicate dequeue submissions (#1479)
Co-authored-by: Letta Code <noreply@letta.com>
This commit is contained in:
@@ -1899,6 +1899,8 @@ export default function App({
|
|||||||
// Epoch counter to force dequeue effect re-run when refs change but state doesn't
|
// Epoch counter to force dequeue effect re-run when refs change but state doesn't
|
||||||
// Incremented when userCancelledRef is reset while messages are queued
|
// Incremented when userCancelledRef is reset while messages are queued
|
||||||
const [dequeueEpoch, setDequeueEpoch] = useState(0);
|
const [dequeueEpoch, setDequeueEpoch] = useState(0);
|
||||||
|
// Strict lock to ensure dequeue submit path is at-most-once while onSubmit is in flight.
|
||||||
|
const dequeueInFlightRef = useRef(false);
|
||||||
|
|
||||||
// Track last dequeued message for restoration on error
|
// Track last dequeued message for restoration on error
|
||||||
// If an error occurs after dequeue, we restore this to the input field (if input is empty)
|
// If an error occurs after dequeue, we restore this to the input field (if input is empty)
|
||||||
@@ -10695,7 +10697,8 @@ ${SYSTEM_REMINDER_CLOSE}
|
|||||||
!anySelectorOpen && // Don't dequeue while a selector/overlay is open
|
!anySelectorOpen && // Don't dequeue while a selector/overlay is open
|
||||||
!waitingForQueueCancelRef.current && // Don't dequeue while waiting for cancel
|
!waitingForQueueCancelRef.current && // Don't dequeue while waiting for cancel
|
||||||
!userCancelledRef.current && // Don't dequeue if user just cancelled
|
!userCancelledRef.current && // Don't dequeue if user just cancelled
|
||||||
!abortControllerRef.current // Don't dequeue while processConversation is still active
|
!abortControllerRef.current && // Don't dequeue while processConversation is still active
|
||||||
|
!dequeueInFlightRef.current // Don't dequeue while previous dequeue submit is still in flight
|
||||||
) {
|
) {
|
||||||
// consumeItems(n) fires onDequeued → setQueueDisplay(prev => prev.slice(n)).
|
// consumeItems(n) fires onDequeued → setQueueDisplay(prev => prev.slice(n)).
|
||||||
const batch = tuiQueueRef.current?.consumeItems(queueLen);
|
const batch = tuiQueueRef.current?.consumeItems(queueLen);
|
||||||
@@ -10725,7 +10728,16 @@ ${SYSTEM_REMINDER_CLOSE}
|
|||||||
|
|
||||||
// Submit via normal flow — overrideContentPartsRef carries rich content parts.
|
// Submit via normal flow — overrideContentPartsRef carries rich content parts.
|
||||||
overrideContentPartsRef.current = queuedContentParts;
|
overrideContentPartsRef.current = queuedContentParts;
|
||||||
onSubmitRef.current(concatenatedMessage);
|
// Lock prevents re-entrant dequeue if deps churn before processConversation
|
||||||
|
// sets abortControllerRef (which is the normal long-term gate).
|
||||||
|
dequeueInFlightRef.current = true;
|
||||||
|
void onSubmitRef.current(concatenatedMessage).finally(() => {
|
||||||
|
dequeueInFlightRef.current = false;
|
||||||
|
// If more items arrived while in-flight, bump epoch so the effect re-runs.
|
||||||
|
if ((tuiQueueRef.current?.length ?? 0) > 0) {
|
||||||
|
setDequeueEpoch((e) => e + 1);
|
||||||
|
}
|
||||||
|
});
|
||||||
} else if (hasAnythingQueued) {
|
} else if (hasAnythingQueued) {
|
||||||
// Log why dequeue was blocked (useful for debugging stuck queues)
|
// Log why dequeue was blocked (useful for debugging stuck queues)
|
||||||
debugLog(
|
debugLog(
|
||||||
|
|||||||
@@ -33,7 +33,8 @@ describe("queue ordering wiring", () => {
|
|||||||
// Queue is now drained via QueueRuntime.consumeItems; setQueueDisplay is
|
// Queue is now drained via QueueRuntime.consumeItems; setQueueDisplay is
|
||||||
// updated automatically via the onDequeued callback — no direct setState here.
|
// updated automatically via the onDequeued callback — no direct setState here.
|
||||||
expect(segment).toContain("tuiQueueRef.current?.consumeItems(queueLen)");
|
expect(segment).toContain("tuiQueueRef.current?.consumeItems(queueLen)");
|
||||||
expect(segment).toContain("onSubmitRef.current(concatenatedMessage);");
|
expect(segment).toContain("onSubmitRef.current(concatenatedMessage)");
|
||||||
|
expect(segment).toContain("!dequeueInFlightRef.current");
|
||||||
expect(segment).toContain("queuedOverlayAction,");
|
expect(segment).toContain("queuedOverlayAction,");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user