feat: message queueing [LET-4669] (#171)

This commit is contained in:
Christina Tong
2025-12-10 11:08:55 -08:00
committed by GitHub
parent 9fb0b68a40
commit b6c387b9ec
4 changed files with 146 additions and 23 deletions

View File

@@ -2,14 +2,12 @@
import { existsSync, readFileSync } from "node:fs"; import { existsSync, readFileSync } from "node:fs";
import { APIError, APIUserAbortError } from "@letta-ai/letta-client/core/error"; import { APIError, APIUserAbortError } from "@letta-ai/letta-client/core/error";
import type { Stream } from "@letta-ai/letta-client/core/streaming";
import type { import type {
AgentState, AgentState,
MessageCreate, MessageCreate,
} from "@letta-ai/letta-client/resources/agents/agents"; } from "@letta-ai/letta-client/resources/agents/agents";
import type { import type {
ApprovalCreate, ApprovalCreate,
LettaStreamingResponse,
Message, Message,
} from "@letta-ai/letta-client/resources/agents/messages"; } from "@letta-ai/letta-client/resources/agents/messages";
import type { LlmConfig } from "@letta-ai/letta-client/resources/models/models"; import type { LlmConfig } from "@letta-ai/letta-client/resources/models/models";
@@ -33,28 +31,21 @@ import {
savePermissionRule, savePermissionRule,
} from "../tools/manager"; } from "../tools/manager";
import { AgentSelector } from "./components/AgentSelector"; import { AgentSelector } from "./components/AgentSelector";
// import { ApprovalDialog } from "./components/ApprovalDialog";
import { ApprovalDialog } from "./components/ApprovalDialogRich"; import { ApprovalDialog } from "./components/ApprovalDialogRich";
// import { AssistantMessage } from "./components/AssistantMessage";
import { AssistantMessage } from "./components/AssistantMessageRich"; import { AssistantMessage } from "./components/AssistantMessageRich";
import { CommandMessage } from "./components/CommandMessage"; import { CommandMessage } from "./components/CommandMessage";
import { EnterPlanModeDialog } from "./components/EnterPlanModeDialog"; import { EnterPlanModeDialog } from "./components/EnterPlanModeDialog";
// import { ErrorMessage } from "./components/ErrorMessage";
import { ErrorMessage } from "./components/ErrorMessageRich"; import { ErrorMessage } from "./components/ErrorMessageRich";
// import { Input } from "./components/Input";
import { Input } from "./components/InputRich"; import { Input } from "./components/InputRich";
import { ModelSelector } from "./components/ModelSelector"; import { ModelSelector } from "./components/ModelSelector";
import { PlanModeDialog } from "./components/PlanModeDialog"; import { PlanModeDialog } from "./components/PlanModeDialog";
import { QuestionDialog } from "./components/QuestionDialog"; import { QuestionDialog } from "./components/QuestionDialog";
// import { ReasoningMessage } from "./components/ReasoningMessage";
import { ReasoningMessage } from "./components/ReasoningMessageRich"; import { ReasoningMessage } from "./components/ReasoningMessageRich";
import { SessionStats as SessionStatsComponent } from "./components/SessionStats"; import { SessionStats as SessionStatsComponent } from "./components/SessionStats";
import { StatusMessage } from "./components/StatusMessage"; import { StatusMessage } from "./components/StatusMessage";
import { SystemPromptSelector } from "./components/SystemPromptSelector"; import { SystemPromptSelector } from "./components/SystemPromptSelector";
// import { ToolCallMessage } from "./components/ToolCallMessage";
import { ToolCallMessage } from "./components/ToolCallMessageRich"; import { ToolCallMessage } from "./components/ToolCallMessageRich";
import { ToolsetSelector } from "./components/ToolsetSelector"; import { ToolsetSelector } from "./components/ToolsetSelector";
// import { UserMessage } from "./components/UserMessage";
import { UserMessage } from "./components/UserMessageRich"; import { UserMessage } from "./components/UserMessageRich";
import { WelcomeScreen } from "./components/WelcomeScreen"; import { WelcomeScreen } from "./components/WelcomeScreen";
import { import {
@@ -365,6 +356,9 @@ export default function App({
// Track if user wants to cancel (persists across state updates) // Track if user wants to cancel (persists across state updates)
const userCancelledRef = useRef(false); const userCancelledRef = useRef(false);
// Message queue state for queueing messages during streaming
const [messageQueue, setMessageQueue] = useState<string[]>([]);
// Track terminal shrink events to refresh static output (prevents wrapped leftovers) // Track terminal shrink events to refresh static output (prevents wrapped leftovers)
const columns = useTerminalWidth(); const columns = useTerminalWidth();
const prevColumnsRef = useRef(columns); const prevColumnsRef = useRef(columns);
@@ -938,12 +932,19 @@ export default function App({
}, 100); }, 100);
}, []); }, []);
// Handler when user presses UP/ESC to load queue into input for editing
const handleEnterQueueEditMode = useCallback(() => {
setMessageQueue([]);
}, []);
const handleInterrupt = useCallback(async () => { const handleInterrupt = useCallback(async () => {
// If we're executing client-side tools, abort them locally instead of hitting the backend // If we're executing client-side tools, abort them locally instead of hitting the backend
if (isExecutingTool && toolAbortControllerRef.current) { if (isExecutingTool && toolAbortControllerRef.current) {
toolAbortControllerRef.current.abort(); toolAbortControllerRef.current.abort();
setStreaming(false); setStreaming(false);
setIsExecutingTool(false); setIsExecutingTool(false);
appendError("Stream interrupted by user");
refreshDerived();
return; return;
} }
@@ -997,6 +998,12 @@ export default function App({
refreshDerived, refreshDerived,
]); ]);
// Keep ref to latest processConversation to avoid circular deps in useEffect
const processConversationRef = useRef(processConversation);
useEffect(() => {
processConversationRef.current = processConversation;
}, [processConversation]);
// Reset interrupt flag when streaming ends // Reset interrupt flag when streaming ends
useEffect(() => { useEffect(() => {
if (!streaming) { if (!streaming) {
@@ -1007,10 +1014,26 @@ export default function App({
const onSubmit = useCallback( const onSubmit = useCallback(
async (message?: string): Promise<{ submitted: boolean }> => { async (message?: string): Promise<{ submitted: boolean }> => {
const msg = message?.trim() ?? ""; const msg = message?.trim() ?? "";
// Block submission while a stream is in flight, a command is running, or an approval batch if (!msg) return { submitted: false };
// is currently executing tools (prevents re-surfacing pending approvals mid-execution).
if (!msg || streaming || commandRunning || isExecutingTool) // Block submission if waiting for explicit user action (approvals)
// In this case, input is hidden anyway, so this shouldn't happen
if (pendingApprovals.length > 0) {
return { submitted: false }; return { submitted: false };
}
// Queue message if agent is busy (streaming, executing tool, or running command)
// This allows messages to queue up while agent is working
const agentBusy = streaming || isExecutingTool || commandRunning;
if (agentBusy) {
setMessageQueue((prev) => [...prev, msg]);
return { submitted: true }; // Clears input
}
// Reset cancellation flag when starting new submission
// This ensures that after an interrupt, new messages can be sent
userCancelledRef.current = false;
// Handle commands (messages starting with "/") // Handle commands (messages starting with "/")
if (msg.startsWith("/")) { if (msg.startsWith("/")) {
@@ -1919,9 +1942,39 @@ ${recentCommits}
commitEligibleLines, commitEligibleLines,
isExecutingTool, isExecutingTool,
queuedApprovalResults, queuedApprovalResults,
pendingApprovals,
], ],
); );
const onSubmitRef = useRef(onSubmit);
useEffect(() => {
onSubmitRef.current = onSubmit;
}, [onSubmit]);
// Process queued messages when streaming ends
useEffect(() => {
if (
!streaming &&
messageQueue.length > 0 &&
pendingApprovals.length === 0 &&
!commandRunning &&
!isExecutingTool
) {
const [firstMessage, ...rest] = messageQueue;
setMessageQueue(rest);
// Submit the first message using the normal submit flow
// This ensures all setup (reminders, UI updates, etc.) happens correctly
onSubmitRef.current(firstMessage);
}
}, [
streaming,
messageQueue,
pendingApprovals,
commandRunning,
isExecutingTool,
]);
// Helper to send all approval results when done // Helper to send all approval results when done
const sendAllResults = useCallback( const sendAllResults = useCallback(
async ( async (
@@ -2936,7 +2989,6 @@ Plan file path: ${planFilePath}`;
streaming={ streaming={
streaming && !abortControllerRef.current?.signal.aborted streaming && !abortControllerRef.current?.signal.aborted
} }
commandRunning={commandRunning}
tokenCount={tokenCount} tokenCount={tokenCount}
thinkingMessage={thinkingMessage} thinkingMessage={thinkingMessage}
onSubmit={onSubmit} onSubmit={onSubmit}
@@ -2948,6 +3000,8 @@ Plan file path: ${planFilePath}`;
agentId={agentId} agentId={agentId}
agentName={agentName} agentName={agentName}
currentModel={currentModelDisplay} currentModel={currentModelDisplay}
messageQueue={messageQueue}
onEnterQueueEditMode={handleEnterQueueEditMode}
/> />
{/* Model Selector - conditionally mounted as overlay */} {/* Model Selector - conditionally mounted as overlay */}

View File

@@ -11,6 +11,7 @@ import { useTerminalWidth } from "../hooks/useTerminalWidth";
import { colors } from "./colors"; import { colors } from "./colors";
import { InputAssist } from "./InputAssist"; import { InputAssist } from "./InputAssist";
import { PasteAwareTextInput } from "./PasteAwareTextInput"; import { PasteAwareTextInput } from "./PasteAwareTextInput";
import { QueuedMessages } from "./QueuedMessages";
import { ShimmerText } from "./ShimmerText"; import { ShimmerText } from "./ShimmerText";
// Type assertion for ink-spinner compatibility // Type assertion for ink-spinner compatibility
@@ -23,7 +24,6 @@ const COUNTER_VISIBLE_THRESHOLD = 1000;
export function Input({ export function Input({
visible = true, visible = true,
streaming, streaming,
commandRunning = false,
tokenCount, tokenCount,
thinkingMessage, thinkingMessage,
onSubmit, onSubmit,
@@ -35,10 +35,11 @@ export function Input({
agentId, agentId,
agentName, agentName,
currentModel, currentModel,
messageQueue,
onEnterQueueEditMode,
}: { }: {
visible?: boolean; visible?: boolean;
streaming: boolean; streaming: boolean;
commandRunning?: boolean;
tokenCount: number; tokenCount: number;
thinkingMessage: string; thinkingMessage: string;
onSubmit: (message?: string) => Promise<{ submitted: boolean }>; onSubmit: (message?: string) => Promise<{ submitted: boolean }>;
@@ -50,6 +51,8 @@ export function Input({
agentId?: string; agentId?: string;
agentName?: string | null; agentName?: string | null;
currentModel?: string | null; currentModel?: string | null;
messageQueue?: string[];
onEnterQueueEditMode?: () => void;
}) { }) {
const [value, setValue] = useState(""); const [value, setValue] = useState("");
const [escapePressed, setEscapePressed] = useState(false); const [escapePressed, setEscapePressed] = useState(false);
@@ -119,6 +122,16 @@ export function Input({
// When streaming, use Esc to interrupt // When streaming, use Esc to interrupt
if (streaming && onInterrupt && !interruptRequested) { if (streaming && onInterrupt && !interruptRequested) {
onInterrupt(); onInterrupt();
// If there are queued messages, load them into the input box
if (messageQueue && messageQueue.length > 0) {
const queueText = messageQueue.join("\n");
setValue(queueText);
// Signal to App.tsx to clear the queue
if (onEnterQueueEditMode) {
onEnterQueueEditMode();
}
}
return; return;
} }
@@ -226,7 +239,7 @@ export function Input({
} }
// On first wrapped line // On first wrapped line
// First press: move to start, second press: navigate history // First press: move to start, second press: queue edit or history
if (currentCursorPosition > 0 && !atStartBoundary) { if (currentCursorPosition > 0 && !atStartBoundary) {
// First press - move cursor to start // First press - move cursor to start
setCursorPos(0); setCursorPos(0);
@@ -234,7 +247,25 @@ export function Input({
return; return;
} }
// Second press or already at start - trigger history navigation // Check if we should load queue (streaming with queued messages)
if (
streaming &&
messageQueue &&
messageQueue.length > 0 &&
atStartBoundary
) {
setAtStartBoundary(false);
// Clear the queue and load into input as one multi-line message
const queueText = messageQueue.join("\n");
setValue(queueText);
// Signal to App.tsx to clear the queue
if (onEnterQueueEditMode) {
onEnterQueueEditMode();
}
return;
}
// Otherwise, trigger history navigation
if (history.length === 0) return; if (history.length === 0) return;
setAtStartBoundary(false); // Reset for next time setAtStartBoundary(false); // Reset for next time
@@ -352,9 +383,6 @@ export function Input({
return; return;
} }
if (streaming || commandRunning) {
return;
}
const previousValue = value; const previousValue = value;
// Add to history if not empty and not a duplicate of the last entry // Add to history if not empty and not a duplicate of the last entry
@@ -458,6 +486,11 @@ export function Input({
</Box> </Box>
)} )}
{/* Queue display - show when streaming with queued messages */}
{streaming && messageQueue && messageQueue.length > 0 && (
<QueuedMessages messages={messageQueue} />
)}
<Box flexDirection="column"> <Box flexDirection="column">
{/* Top horizontal divider */} {/* Top horizontal divider */}
<Text dimColor>{horizontalLine}</Text> <Text dimColor>{horizontalLine}</Text>

View File

@@ -0,0 +1,36 @@
import { Box, Text } from "ink";
import { memo } from "react";
interface QueuedMessagesProps {
messages: string[];
}
export const QueuedMessages = memo(({ messages }: QueuedMessagesProps) => {
const maxDisplay = 5;
return (
<Box flexDirection="column" marginBottom={1}>
{messages.slice(0, maxDisplay).map((msg) => (
<Box key={msg} flexDirection="row">
<Box width={2} flexShrink={0}>
<Text dimColor>{">"}</Text>
</Box>
<Box flexGrow={1}>
<Text dimColor>{msg}</Text>
</Box>
</Box>
))}
{messages.length > maxDisplay && (
<Box flexDirection="row">
<Box width={2} flexShrink={0} />
<Box flexGrow={1}>
<Text dimColor>...and {messages.length - maxDisplay} more</Text>
</Box>
</Box>
)}
</Box>
);
});
QueuedMessages.displayName = "QueuedMessages";

View File

@@ -905,9 +905,9 @@ export async function handleHeadlessCommand(
) as Extract<Line, { kind: "tool_call" }> | undefined; ) as Extract<Line, { kind: "tool_call" }> | undefined;
const resultText = const resultText =
(lastAssistant && lastAssistant.text) || lastAssistant?.text ||
(lastReasoning && lastReasoning.text) || lastReasoning?.text ||
(lastToolResult && lastToolResult.resultText) || lastToolResult?.resultText ||
"No assistant response found"; "No assistant response found";
// Output based on format // Output based on format