feat: implement smoother message queueing (#655)

This commit is contained in:
cthomas
2026-01-23 12:35:43 -08:00
committed by GitHub
parent 4648ef1211
commit 277574c7fe
3 changed files with 122 additions and 64 deletions

View File

@@ -1,10 +1,11 @@
{
"lockfileVersion": 1,
"configVersion": 0,
"workspaces": {
"": {
"name": "@letta-ai/letta-code",
"dependencies": {
"@letta-ai/letta-client": "1.7.5",
"@letta-ai/letta-client": "^1.7.6",
"glob": "^13.0.0",
"ink-link": "^5.0.0",
"open": "^10.2.0",
@@ -90,7 +91,7 @@
"@isaacs/brace-expansion": ["@isaacs/brace-expansion@5.0.0", "", { "dependencies": { "@isaacs/balanced-match": "^4.0.1" } }, "sha512-ZT55BDLV0yv0RBm2czMiZ+SqCGO7AvmOM3G/w2xhVPH+te0aKgFjmBvGlL1dH+ql2tgGO3MVrbb3jCKyvpgnxA=="],
"@letta-ai/letta-client": ["@letta-ai/letta-client@1.7.5", "", {}, "sha512-fyzJ9Bj+8Jf/LGDsPoijwKkddXJl3lII8FDUNkQipV6MQS6vgR+7vrL0QtwMgpwXZr1f47MNb5+Y0O1/TDDsJA=="],
"@letta-ai/letta-client": ["@letta-ai/letta-client@1.7.6", "", {}, "sha512-C/f03uE3TJdgfHk/8rRBxzWvY0YHCYAlrePHcTd0CRHMo++0TA1OTcgiCF+EFVDVYGzfPSeMpqgAZTNvD9r9GQ=="],
"@types/bun": ["@types/bun@1.3.1", "", { "dependencies": { "bun-types": "1.3.1" } }, "sha512-4jNMk2/K9YJtfqwoAa28c8wK+T7nvJFOjxI4h/7sORWcypRNxBpr+TPNaCfVWq70tLCJsqoFwcf0oI0JU/fvMQ=="],

View File

@@ -30,7 +30,7 @@
"access": "public"
},
"dependencies": {
"@letta-ai/letta-client": "1.7.5",
"@letta-ai/letta-client": "^1.7.6",
"glob": "^13.0.0",
"ink-link": "^5.0.0",
"open": "^10.2.0",

View File

@@ -1114,6 +1114,14 @@ export default function App({
};
}, []);
// Unmount cleanup: cancel the queue-append timer if one is still armed.
useEffect(
  () => () => {
    const pendingAppendTimer = queueAppendTimeoutRef.current;
    if (pendingAppendTimer) clearTimeout(pendingAppendTimer);
  },
  [],
);
// Show exit stats on exit (double Ctrl+C)
const [showExitStats, setShowExitStats] = useState(false);
@@ -1150,7 +1158,11 @@ export default function App({
// Message queue state for queueing messages during streaming
const [messageQueue, setMessageQueue] = useState<string[]>([]);
// Queue cancellation: if queued messages are not appended within the 15s window, we send cancel and wait for stream to end
// Ref copy of messageQueue so callbacks and timers can read the queue without
// capturing a stale closure. It is refreshed by the effect below whenever
// messageQueue changes, so it trails a setMessageQueue call until that effect runs.
const messageQueueRef = useRef<string[]>([]);
useEffect(() => {
messageQueueRef.current = messageQueue;
}, [messageQueue]);
const waitingForQueueCancelRef = useRef(false);
const queueSnapshotRef = useRef<string[]>([]);
const [restoreQueueOnCancel, setRestoreQueueOnCancel] = useState(false);
@@ -1159,6 +1171,8 @@ export default function App({
restoreQueueOnCancelRef.current = restoreQueueOnCancel;
}, [restoreQueueOnCancel]);
const queueAppendTimeoutRef = useRef<NodeJS.Timeout | null>(null); // 15s append mode timeout
// Epoch counter to force dequeue effect re-run when refs change but state doesn't
// Incremented when userCancelledRef is reset while messages are queued
const [dequeueEpoch, setDequeueEpoch] = useState(0);
@@ -1182,6 +1196,18 @@ export default function App({
);
}, [isExecutingTool]);
// Consume queued messages for appending to tool results (clears queue + timeout).
// Returns a copy of the pending messages in queue order, or null when the
// queue is empty. Any armed append-mode timer is cancelled so it cannot fall
// back to cancelling the stream after the messages have been handed off.
const consumeQueuedMessages = useCallback((): string[] | null => {
  const pending = messageQueueRef.current;
  if (pending.length === 0) return null;
  if (queueAppendTimeoutRef.current) {
    clearTimeout(queueAppendTimeoutRef.current);
    queueAppendTimeoutRef.current = null;
  }
  const messages = [...pending];
  // Empty the ref immediately: the effect that mirrors messageQueue into it
  // only runs after a later render, and until then a second call would
  // return (and re-send) the same messages.
  messageQueueRef.current = [];
  setMessageQueue([]);
  return messages;
}, []);
// Helper to wrap async handlers that need to close overlay and lock input
// Closes overlay and sets commandRunning before executing, releases lock in finally
const withCommandLock = useCallback(
@@ -2769,7 +2795,37 @@ export default function App({
return;
}
// Check if user queued messages during auto-allowed tool execution
// Append queued messages if any (from 15s append mode)
const queuedMessagesToAppend = consumeQueuedMessages();
if (queuedMessagesToAppend?.length) {
for (const msg of queuedMessagesToAppend) {
const userId = uid("user");
buffersRef.current.byId.set(userId, {
kind: "user",
id: userId,
text: msg,
});
buffersRef.current.order.push(userId);
}
setThinkingMessage(getRandomThinkingVerb());
refreshDerived();
toolResultsInFlightRef.current = true;
await processConversation(
[
{ type: "approval", approvals: allResults },
...queuedMessagesToAppend.map((msg) => ({
type: "message" as const,
role: "user" as const,
content: msg as unknown as MessageCreate["content"],
})),
],
{ allowReentry: true },
);
toolResultsInFlightRef.current = false;
return;
}
// Cancel mode - queue results and let dequeue effect handle
if (waitingForQueueCancelRef.current) {
// Queue results - dequeue effect will pick them up via onSubmit
if (allResults.length > 0) {
@@ -2792,7 +2848,6 @@ export default function App({
return;
}
// Rotate to a new thinking message
setThinkingMessage(getRandomThinkingVerb());
refreshDerived();
@@ -3393,6 +3448,7 @@ export default function App({
updateStreamingOutput,
needsEagerApprovalCheck,
queueApprovalResults,
consumeQueuedMessages,
],
);
@@ -4382,64 +4438,49 @@ export default function App({
setMessageQueue((prev) => {
const newQueue = [...prev, msg];
// For slash commands, just queue and wait - don't interrupt the agent.
// For regular messages, cancel the stream so the new message can be sent.
const isSlashCommand = msg.startsWith("/");
// Regular messages: use append mode (wait 15s for tools, then append to API call)
if (
!isSlashCommand &&
streamingRef.current &&
!waitingForQueueCancelRef.current
!waitingForQueueCancelRef.current &&
!queueAppendTimeoutRef.current
) {
waitingForQueueCancelRef.current = true;
queueSnapshotRef.current = [...newQueue];
debugLog(
"queue",
`Initiating queue-cancel: queueing "${msg.slice(0, 50)}${msg.length > 50 ? "..." : ""}", sending cancel to server`,
);
// Abort client-side tool execution if in progress
// This makes tool interruption visible immediately instead of waiting for completion
if (toolAbortControllerRef.current) {
toolAbortControllerRef.current.abort();
}
// Send cancel request to backend (fire-and-forget)
getClient()
.then((client) => {
// Use agents API for "default" conversation (primary message history)
if (conversationIdRef.current === "default") {
return client.agents.messages.cancel(agentIdRef.current);
}
return client.conversations.cancel(conversationIdRef.current);
})
.then(() => {})
.catch(() => {
// Reset flag if cancel fails
waitingForQueueCancelRef.current = false;
});
// Timeout fallback: if server cancel is slow or fails, abort client-side
// after 3 seconds to prevent "Thinking..." from hanging forever
setTimeout(() => {
// Only abort if we're still waiting AND stream is still active
// (If stream ended naturally or user pressed ESC, these will be false/null)
if (
waitingForQueueCancelRef.current &&
abortControllerRef.current
) {
debugLog(
"queue",
"Timeout fallback: aborting stream after 3s (server cancel was slow/failed)",
);
abortControllerRef.current.abort();
// Reset flags here because the abort may cause early returns
// in processConversation that skip the completion handlers
waitingForQueueCancelRef.current = false;
queueSnapshotRef.current = [];
queueAppendTimeoutRef.current = setTimeout(() => {
if (messageQueueRef.current.length === 0) {
queueAppendTimeoutRef.current = null;
return;
}
}, 3000);
queueAppendTimeoutRef.current = null;
// 15s expired - fall back to cancel
waitingForQueueCancelRef.current = true;
queueSnapshotRef.current = [...messageQueueRef.current];
if (toolAbortControllerRef.current) {
toolAbortControllerRef.current.abort();
}
getClient()
.then((client) => {
if (conversationIdRef.current === "default") {
return client.agents.messages.cancel(agentIdRef.current);
}
return client.conversations.cancel(conversationIdRef.current);
})
.catch(() => {
waitingForQueueCancelRef.current = false;
});
setTimeout(() => {
if (
waitingForQueueCancelRef.current &&
abortControllerRef.current
) {
abortControllerRef.current.abort();
waitingForQueueCancelRef.current = false;
queueSnapshotRef.current = [];
}
}, 3000);
}, 15000);
}
return newQueue;
@@ -7153,14 +7194,29 @@ ${SYSTEM_REMINDER_CLOSE}
waitingForQueueCancelRef.current = false;
queueSnapshotRef.current = [];
} else {
// Continue conversation with all results
const queuedMessagesToAppend = consumeQueuedMessages();
const input: Array<MessageCreate | ApprovalCreate> = [
{ type: "approval", approvals: allResults as ApprovalResult[] },
];
if (queuedMessagesToAppend?.length) {
for (const msg of queuedMessagesToAppend) {
const userId = uid("user");
buffersRef.current.byId.set(userId, {
kind: "user",
id: userId,
text: msg,
});
buffersRef.current.order.push(userId);
input.push({
type: "message",
role: "user",
content: msg as unknown as MessageCreate["content"],
});
}
refreshDerived();
}
toolResultsInFlightRef.current = true;
await processConversation([
{
type: "approval",
approvals: allResults as ApprovalResult[],
},
]);
await processConversation(input);
toolResultsInFlightRef.current = false;
// Clear any stale queued results from previous interrupts.
@@ -7189,6 +7245,7 @@ ${SYSTEM_REMINDER_CLOSE}
setStreaming,
updateStreamingOutput,
queueApprovalResults,
consumeQueuedMessages,
],
);