From 277574c7feef26bb85cc28a2c149eaf4099ba35a Mon Sep 17 00:00:00 2001 From: cthomas Date: Fri, 23 Jan 2026 12:35:43 -0800 Subject: [PATCH] feat: implement smoother message queueing (#655) --- bun.lock | 5 +- package.json | 2 +- src/cli/App.tsx | 179 +++++++++++++++++++++++++++++++----------------- 3 files changed, 122 insertions(+), 64 deletions(-) diff --git a/bun.lock b/bun.lock index 5f16797..f3571a7 100644 --- a/bun.lock +++ b/bun.lock @@ -1,10 +1,11 @@ { "lockfileVersion": 1, + "configVersion": 0, "workspaces": { "": { "name": "@letta-ai/letta-code", "dependencies": { - "@letta-ai/letta-client": "1.7.5", + "@letta-ai/letta-client": "^1.7.6", "glob": "^13.0.0", "ink-link": "^5.0.0", "open": "^10.2.0", @@ -90,7 +91,7 @@ "@isaacs/brace-expansion": ["@isaacs/brace-expansion@5.0.0", "", { "dependencies": { "@isaacs/balanced-match": "^4.0.1" } }, "sha512-ZT55BDLV0yv0RBm2czMiZ+SqCGO7AvmOM3G/w2xhVPH+te0aKgFjmBvGlL1dH+ql2tgGO3MVrbb3jCKyvpgnxA=="], - "@letta-ai/letta-client": ["@letta-ai/letta-client@1.7.5", "", {}, "sha512-fyzJ9Bj+8Jf/LGDsPoijwKkddXJl3lII8FDUNkQipV6MQS6vgR+7vrL0QtwMgpwXZr1f47MNb5+Y0O1/TDDsJA=="], + "@letta-ai/letta-client": ["@letta-ai/letta-client@1.7.6", "", {}, "sha512-C/f03uE3TJdgfHk/8rRBxzWvY0YHCYAlrePHcTd0CRHMo++0TA1OTcgiCF+EFVDVYGzfPSeMpqgAZTNvD9r9GQ=="], "@types/bun": ["@types/bun@1.3.1", "", { "dependencies": { "bun-types": "1.3.1" } }, "sha512-4jNMk2/K9YJtfqwoAa28c8wK+T7nvJFOjxI4h/7sORWcypRNxBpr+TPNaCfVWq70tLCJsqoFwcf0oI0JU/fvMQ=="], diff --git a/package.json b/package.json index b6ec4a9..4505a45 100644 --- a/package.json +++ b/package.json @@ -30,7 +30,7 @@ "access": "public" }, "dependencies": { - "@letta-ai/letta-client": "1.7.5", + "@letta-ai/letta-client": "^1.7.6", "glob": "^13.0.0", "ink-link": "^5.0.0", "open": "^10.2.0", diff --git a/src/cli/App.tsx b/src/cli/App.tsx index d79e15a..cc45e87 100644 --- a/src/cli/App.tsx +++ b/src/cli/App.tsx @@ -1114,6 +1114,14 @@ export default function App({ }; }, []); + useEffect(() => { + return () => { + if (queueAppendTimeoutRef.current) { + clearTimeout(queueAppendTimeoutRef.current); + } + }; + }, []); + // Show exit stats on exit (double Ctrl+C) const [showExitStats, setShowExitStats] = useState(false); @@ -1150,7 +1158,11 @@ export default function App({ // Message queue state for queueing messages during streaming const [messageQueue, setMessageQueue] = useState([]); - // Queue cancellation: when any message is queued, we send cancel and wait for stream to end + const messageQueueRef = useRef([]); // For synchronous access + useEffect(() => { + messageQueueRef.current = messageQueue; + }, [messageQueue]); + const waitingForQueueCancelRef = useRef(false); const queueSnapshotRef = useRef([]); const [restoreQueueOnCancel, setRestoreQueueOnCancel] = useState(false); @@ -1159,6 +1171,8 @@ export default function App({ restoreQueueOnCancelRef.current = restoreQueueOnCancel; }, [restoreQueueOnCancel]); + const queueAppendTimeoutRef = useRef(null); // 15s append mode timeout + // Epoch counter to force dequeue effect re-run when refs change but state doesn't // Incremented when userCancelledRef is reset while messages are queued const [dequeueEpoch, setDequeueEpoch] = useState(0); @@ -1182,6 +1196,18 @@ export default function App({ ); }, [isExecutingTool]); + // Consume queued messages for appending to tool results (clears queue + timeout) + const consumeQueuedMessages = useCallback((): string[] | null => { + if (messageQueueRef.current.length === 0) return null; + if (queueAppendTimeoutRef.current) { + clearTimeout(queueAppendTimeoutRef.current); + queueAppendTimeoutRef.current = null; + } + const messages = [...messageQueueRef.current]; + setMessageQueue([]); + return messages; + }, []); + // Helper to wrap async handlers that need to close overlay and lock input // Closes overlay and sets commandRunning before executing, releases lock in finally const withCommandLock = useCallback( @@ -2769,7 +2795,37 @@ export default function App({ return; } - // Check if user queued messages during auto-allowed tool execution + // Append queued messages if any (from 15s append mode) + const queuedMessagesToAppend = consumeQueuedMessages(); + if (queuedMessagesToAppend?.length) { + for (const msg of queuedMessagesToAppend) { + const userId = uid("user"); + buffersRef.current.byId.set(userId, { + kind: "user", + id: userId, + text: msg, + }); + buffersRef.current.order.push(userId); + } + setThinkingMessage(getRandomThinkingVerb()); + refreshDerived(); + toolResultsInFlightRef.current = true; + await processConversation( + [ + { type: "approval", approvals: allResults }, + ...queuedMessagesToAppend.map((msg) => ({ + type: "message" as const, + role: "user" as const, + content: msg as unknown as MessageCreate["content"], + })), + ], + { allowReentry: true }, + ); + toolResultsInFlightRef.current = false; + return; + } + + // Cancel mode - queue results and let dequeue effect handle if (waitingForQueueCancelRef.current) { // Queue results - dequeue effect will pick them up via onSubmit if (allResults.length > 0) { @@ -2792,7 +2848,6 @@ export default function App({ return; } - // Rotate to a new thinking message setThinkingMessage(getRandomThinkingVerb()); refreshDerived(); @@ -3393,6 +3448,7 @@ export default function App({ updateStreamingOutput, needsEagerApprovalCheck, queueApprovalResults, + consumeQueuedMessages, ], ); @@ -4382,64 +4438,49 @@ export default function App({ setMessageQueue((prev) => { const newQueue = [...prev, msg]; - // For slash commands, just queue and wait - don't interrupt the agent. - // For regular messages, cancel the stream so the new message can be sent. const isSlashCommand = msg.startsWith("/"); + // Regular messages: use append mode (wait 15s for tools, then append to API call) if ( !isSlashCommand && streamingRef.current && - !waitingForQueueCancelRef.current + !waitingForQueueCancelRef.current && + !queueAppendTimeoutRef.current ) { - waitingForQueueCancelRef.current = true; - queueSnapshotRef.current = [...newQueue]; - debugLog( - "queue", - `Initiating queue-cancel: queueing "${msg.slice(0, 50)}${msg.length > 50 ? "..." : ""}", sending cancel to server`, - ); - - // Abort client-side tool execution if in progress - // This makes tool interruption visible immediately instead of waiting for completion - if (toolAbortControllerRef.current) { - toolAbortControllerRef.current.abort(); - } - - // Send cancel request to backend (fire-and-forget) - getClient() - .then((client) => { - // Use agents API for "default" conversation (primary message history) - if (conversationIdRef.current === "default") { - return client.agents.messages.cancel(agentIdRef.current); - } - return client.conversations.cancel(conversationIdRef.current); - }) - .then(() => {}) - .catch(() => { - // Reset flag if cancel fails - waitingForQueueCancelRef.current = false; - }); - - // Timeout fallback: if server cancel is slow or fails, abort client-side - // after 3 seconds to prevent "Thinking..." from hanging forever - setTimeout(() => { - // Only abort if we're still waiting AND stream is still active - // (If stream ended naturally or user pressed ESC, these will be false/null) - if ( - waitingForQueueCancelRef.current && - abortControllerRef.current - ) { - debugLog( - "queue", - "Timeout fallback: aborting stream after 3s (server cancel was slow/failed)", - ); - abortControllerRef.current.abort(); - - // Reset flags here because the abort may cause early returns - // in processConversation that skip the completion handlers - waitingForQueueCancelRef.current = false; - queueSnapshotRef.current = []; + queueAppendTimeoutRef.current = setTimeout(() => { + if (messageQueueRef.current.length === 0) { + queueAppendTimeoutRef.current = null; + return; } - }, 3000); + queueAppendTimeoutRef.current = null; + + // 15s expired - fall back to cancel + waitingForQueueCancelRef.current = true; + queueSnapshotRef.current = [...messageQueueRef.current]; + if (toolAbortControllerRef.current) { + toolAbortControllerRef.current.abort(); + } + getClient() + .then((client) => { + if (conversationIdRef.current === "default") { + return client.agents.messages.cancel(agentIdRef.current); + } + return client.conversations.cancel(conversationIdRef.current); + }) + .catch(() => { + waitingForQueueCancelRef.current = false; + }); + setTimeout(() => { + if ( + waitingForQueueCancelRef.current && + abortControllerRef.current + ) { + abortControllerRef.current.abort(); + waitingForQueueCancelRef.current = false; + queueSnapshotRef.current = []; + } + }, 3000); + }, 15000); } return newQueue; @@ -7153,14 +7194,29 @@ ${SYSTEM_REMINDER_CLOSE} waitingForQueueCancelRef.current = false; queueSnapshotRef.current = []; } else { - // Continue conversation with all results + const queuedMessagesToAppend = consumeQueuedMessages(); + const input: Array = [ + { type: "approval", approvals: allResults as ApprovalResult[] }, + ]; + if (queuedMessagesToAppend?.length) { + for (const msg of queuedMessagesToAppend) { + const userId = uid("user"); + buffersRef.current.byId.set(userId, { + kind: "user", + id: userId, + text: msg, + }); + buffersRef.current.order.push(userId); + input.push({ + type: "message", + role: "user", + content: msg as unknown as MessageCreate["content"], + }); + } + refreshDerived(); + } toolResultsInFlightRef.current = true; - await processConversation([ - { - type: "approval", - approvals: allResults as ApprovalResult[], - }, - ]); + await processConversation(input); toolResultsInFlightRef.current = false; // Clear any stale queued results from previous interrupts. @@ -7189,6 +7245,7 @@ ${SYSTEM_REMINDER_CLOSE} setStreaming, updateStreamingOutput, queueApprovalResults, + consumeQueuedMessages, ], );