Merge branch 'main' of github.com:letta-ai/letta-code
This commit is contained in:
120
src/cli/App.tsx
120
src/cli/App.tsx
@@ -3,6 +3,8 @@
|
||||
import { Letta } from "@letta-ai/letta-client";
|
||||
import { Box, Static } from "ink";
|
||||
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
|
||||
import { getResumeData } from "../agent/check-approval";
|
||||
import { getClient } from "../agent/client";
|
||||
import { sendMessageStream } from "../agent/message";
|
||||
import { SessionStats } from "../agent/stats";
|
||||
import type { ApprovalContext } from "../permissions/analyzer";
|
||||
@@ -50,6 +52,11 @@ import { useTerminalWidth } from "./hooks/useTerminalWidth";
|
||||
|
||||
const CLEAR_SCREEN_AND_HOME = "\u001B[2J\u001B[H";
|
||||
|
||||
// Feature flag: Check for pending approvals before sending messages
|
||||
// This prevents infinite thinking state when there's an orphaned approval
|
||||
// Can be disabled if the latency check adds too much overhead
|
||||
const CHECK_PENDING_APPROVALS_BEFORE_SEND = true;
|
||||
|
||||
// tiny helper for unique ids (avoid overwriting prior user lines)
|
||||
function uid(prefix: string) {
|
||||
return `${prefix}-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 8)}`;
|
||||
@@ -98,6 +105,9 @@ export default function App({
|
||||
// Whether a stream is in flight (disables input)
|
||||
const [streaming, setStreaming] = useState(false);
|
||||
|
||||
// Whether an interrupt has been requested for the current stream
|
||||
const [interruptRequested, setInterruptRequested] = useState(false);
|
||||
|
||||
// Whether a command is running (disables input but no streaming UI)
|
||||
const [commandRunning, setCommandRunning] = useState(false);
|
||||
|
||||
@@ -145,6 +155,9 @@ export default function App({
|
||||
// Guard to append welcome snapshot only once
|
||||
const welcomeCommittedRef = useRef(false);
|
||||
|
||||
// AbortController for stream cancellation
|
||||
const abortControllerRef = useRef<AbortController | null>(null);
|
||||
|
||||
// Track terminal shrink events to refresh static output (prevents wrapped leftovers)
|
||||
const columns = useTerminalWidth();
|
||||
const prevColumnsRef = useRef(columns);
|
||||
@@ -357,6 +370,7 @@ export default function App({
|
||||
|
||||
try {
|
||||
setStreaming(true);
|
||||
abortControllerRef.current = new AbortController();
|
||||
|
||||
while (true) {
|
||||
// Stream one turn
|
||||
@@ -365,6 +379,7 @@ export default function App({
|
||||
stream,
|
||||
buffersRef.current,
|
||||
refreshDerivedThrottled,
|
||||
abortControllerRef.current.signal,
|
||||
);
|
||||
|
||||
// Track API duration
|
||||
@@ -380,6 +395,13 @@ export default function App({
|
||||
return;
|
||||
}
|
||||
|
||||
// Case 1.5: Stream was cancelled by user
|
||||
if (stopReason === "cancelled") {
|
||||
appendError("Stream interrupted by user");
|
||||
setStreaming(false);
|
||||
return;
|
||||
}
|
||||
|
||||
// Case 2: Requires approval
|
||||
if (stopReason === Letta.StopReasonType.RequiresApproval) {
|
||||
if (!approval) {
|
||||
@@ -487,6 +509,8 @@ export default function App({
|
||||
} catch (e) {
|
||||
appendError(String(e));
|
||||
setStreaming(false);
|
||||
} finally {
|
||||
abortControllerRef.current = null;
|
||||
}
|
||||
},
|
||||
[agentId, appendError, refreshDerived, refreshDerivedThrottled],
|
||||
@@ -500,17 +524,55 @@ export default function App({
|
||||
}, 100);
|
||||
}, []);
|
||||
|
||||
const handleInterrupt = useCallback(async () => {
|
||||
if (!streaming || interruptRequested) return;
|
||||
|
||||
setInterruptRequested(true);
|
||||
try {
|
||||
const client = getClient();
|
||||
|
||||
// Send cancel request to backend
|
||||
await client.agents.messages.cancel(agentId);
|
||||
|
||||
// WORKAROUND: Also abort the stream immediately since backend cancellation is buggy
|
||||
// TODO: Once backend is fixed, comment out the immediate abort below and uncomment the timeout version
|
||||
if (abortControllerRef.current) {
|
||||
abortControllerRef.current.abort();
|
||||
}
|
||||
|
||||
// FUTURE: Use this timeout-based abort once backend properly sends "cancelled" stop reason
|
||||
// This gives the backend 5 seconds to gracefully close the stream before forcing abort
|
||||
// const abortTimeout = setTimeout(() => {
|
||||
// if (abortControllerRef.current) {
|
||||
// abortControllerRef.current.abort();
|
||||
// }
|
||||
// }, 5000);
|
||||
//
|
||||
// // The timeout will be cleared in processConversation's finally block when stream ends
|
||||
} catch (e) {
|
||||
appendError(`Failed to interrupt stream: ${String(e)}`);
|
||||
setInterruptRequested(false);
|
||||
}
|
||||
}, [agentId, streaming, interruptRequested, appendError]);
|
||||
|
||||
// Reset interrupt flag when streaming ends
|
||||
useEffect(() => {
|
||||
if (!streaming) {
|
||||
setInterruptRequested(false);
|
||||
}
|
||||
}, [streaming]);
|
||||
|
||||
const onSubmit = useCallback(
|
||||
async (message?: string) => {
|
||||
async (message?: string): Promise<{ submitted: boolean }> => {
|
||||
const msg = message?.trim() ?? "";
|
||||
if (!msg || streaming || commandRunning) return;
|
||||
if (!msg || streaming || commandRunning) return { submitted: false };
|
||||
|
||||
// Handle commands (messages starting with "/")
|
||||
if (msg.startsWith("/")) {
|
||||
// Special handling for /model command - opens selector
|
||||
if (msg.trim() === "/model") {
|
||||
setModelSelectorOpen(true);
|
||||
return;
|
||||
return { submitted: true };
|
||||
}
|
||||
|
||||
// Special handling for /agent command - show agent link
|
||||
@@ -527,13 +589,13 @@ export default function App({
|
||||
});
|
||||
buffersRef.current.order.push(cmdId);
|
||||
refreshDerived();
|
||||
return;
|
||||
return { submitted: true };
|
||||
}
|
||||
|
||||
// Special handling for /exit command - show stats and exit
|
||||
if (msg.trim() === "/exit") {
|
||||
handleExit();
|
||||
return;
|
||||
return { submitted: true };
|
||||
}
|
||||
|
||||
// Special handling for /stream command - toggle and save
|
||||
@@ -587,7 +649,7 @@ export default function App({
|
||||
// Unlock input
|
||||
setCommandRunning(false);
|
||||
}
|
||||
return;
|
||||
return { submitted: true };
|
||||
}
|
||||
|
||||
// Immediately add command to transcript with "running" phase
|
||||
@@ -634,7 +696,7 @@ export default function App({
|
||||
// Unlock input
|
||||
setCommandRunning(false);
|
||||
}
|
||||
return; // Don't send commands to Letta agent
|
||||
return { submitted: true }; // Don't send commands to Letta agent
|
||||
}
|
||||
|
||||
// Build message content from display value (handles placeholders for text/images)
|
||||
@@ -652,7 +714,7 @@ export default function App({
|
||||
]
|
||||
: contentParts;
|
||||
|
||||
// Append the user message to transcript (keep placeholders as-is for display)
|
||||
// Append the user message to transcript IMMEDIATELY (optimistic update)
|
||||
const userId = uid("user");
|
||||
buffersRef.current.byId.set(userId, {
|
||||
kind: "user",
|
||||
@@ -665,8 +727,46 @@ export default function App({
|
||||
buffersRef.current.tokenCount = 0;
|
||||
// Rotate to a new thinking message for this turn
|
||||
setThinkingMessage(getRandomThinkingMessage());
|
||||
// Show streaming state immediately for responsiveness
|
||||
setStreaming(true);
|
||||
refreshDerived();
|
||||
|
||||
// Check for pending approvals before sending message
|
||||
if (CHECK_PENDING_APPROVALS_BEFORE_SEND) {
|
||||
try {
|
||||
const client = getClient();
|
||||
const { pendingApproval: existingApproval } = await getResumeData(
|
||||
client,
|
||||
agentId,
|
||||
);
|
||||
|
||||
if (existingApproval) {
|
||||
// There's a pending approval - show it and DON'T send the message yet
|
||||
// The message will be restored to the input field for the user to decide
|
||||
// Note: The user message is already in the transcript (optimistic update)
|
||||
setStreaming(false); // Stop streaming indicator
|
||||
setPendingApproval(existingApproval);
|
||||
|
||||
// Analyze approval context
|
||||
const parsedArgs = safeJsonParseOr<Record<string, unknown>>(
|
||||
existingApproval.toolArgs,
|
||||
{},
|
||||
);
|
||||
const context = await analyzeToolApproval(
|
||||
existingApproval.toolName,
|
||||
parsedArgs,
|
||||
);
|
||||
setApprovalContext(context);
|
||||
|
||||
// Return false = message NOT submitted, will be restored to input
|
||||
return { submitted: false };
|
||||
}
|
||||
} catch (error) {
|
||||
// If check fails, proceed anyway (don't block user)
|
||||
console.error("Failed to check pending approvals:", error);
|
||||
}
|
||||
}
|
||||
|
||||
// Start the conversation loop
|
||||
await processConversation([
|
||||
{
|
||||
@@ -677,6 +777,8 @@ export default function App({
|
||||
|
||||
// Clean up placeholders after submission
|
||||
clearPlaceholdersInText(msg);
|
||||
|
||||
return { submitted: true };
|
||||
},
|
||||
[
|
||||
streaming,
|
||||
@@ -1114,6 +1216,8 @@ export default function App({
|
||||
permissionMode={uiPermissionMode}
|
||||
onPermissionModeChange={setUiPermissionMode}
|
||||
onExit={handleExit}
|
||||
onInterrupt={handleInterrupt}
|
||||
interruptRequested={interruptRequested}
|
||||
/>
|
||||
|
||||
{/* Model Selector - conditionally mounted as overlay */}
|
||||
|
||||
@@ -12,7 +12,7 @@ import { PasteAwareTextInput } from "./PasteAwareTextInput";
|
||||
import { ShimmerText } from "./ShimmerText";
|
||||
|
||||
// Type assertion for ink-spinner compatibility
|
||||
const Spinner = SpinnerLib as ComponentType;
|
||||
const Spinner = SpinnerLib as ComponentType<{ type?: string }>;
|
||||
|
||||
// Only show token count when it exceeds this threshold
|
||||
const COUNTER_VISIBLE_THRESHOLD = 1000;
|
||||
@@ -27,16 +27,20 @@ export function Input({
|
||||
permissionMode: externalMode,
|
||||
onPermissionModeChange,
|
||||
onExit,
|
||||
onInterrupt,
|
||||
interruptRequested = false,
|
||||
}: {
|
||||
visible?: boolean;
|
||||
streaming: boolean;
|
||||
commandRunning?: boolean;
|
||||
tokenCount: number;
|
||||
thinkingMessage: string;
|
||||
onSubmit: (message?: string) => void;
|
||||
onSubmit: (message?: string) => Promise<{ submitted: boolean }>;
|
||||
permissionMode?: PermissionMode;
|
||||
onPermissionModeChange?: (mode: PermissionMode) => void;
|
||||
onExit?: () => void;
|
||||
onInterrupt?: () => void;
|
||||
interruptRequested?: boolean;
|
||||
}) {
|
||||
const [value, setValue] = useState("");
|
||||
const [escapePressed, setEscapePressed] = useState(false);
|
||||
@@ -62,22 +66,30 @@ export function Input({
|
||||
const columns = useTerminalWidth();
|
||||
const contentWidth = Math.max(0, columns - 2);
|
||||
|
||||
// Handle escape key for double-escape-to-clear
|
||||
// Handle escape key for interrupt (when streaming) or double-escape-to-clear (when not)
|
||||
useInput((_input, key) => {
|
||||
if (key.escape && value) {
|
||||
// Only work when input is non-empty
|
||||
if (escapePressed) {
|
||||
// Second escape - clear input
|
||||
setValue("");
|
||||
setEscapePressed(false);
|
||||
if (escapeTimerRef.current) clearTimeout(escapeTimerRef.current);
|
||||
} else {
|
||||
// First escape - start 1-second timer
|
||||
setEscapePressed(true);
|
||||
if (escapeTimerRef.current) clearTimeout(escapeTimerRef.current);
|
||||
escapeTimerRef.current = setTimeout(() => {
|
||||
if (key.escape) {
|
||||
// When streaming, use Esc to interrupt
|
||||
if (streaming && onInterrupt && !interruptRequested) {
|
||||
onInterrupt();
|
||||
return;
|
||||
}
|
||||
|
||||
// When input is non-empty, use double-escape to clear
|
||||
if (value) {
|
||||
if (escapePressed) {
|
||||
// Second escape - clear input
|
||||
setValue("");
|
||||
setEscapePressed(false);
|
||||
}, 1000);
|
||||
if (escapeTimerRef.current) clearTimeout(escapeTimerRef.current);
|
||||
} else {
|
||||
// First escape - start 1-second timer
|
||||
setEscapePressed(true);
|
||||
if (escapeTimerRef.current) clearTimeout(escapeTimerRef.current);
|
||||
escapeTimerRef.current = setTimeout(() => {
|
||||
setEscapePressed(false);
|
||||
}, 1000);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -159,12 +171,17 @@ export function Input({
|
||||
return () => clearInterval(id);
|
||||
}, [streaming, thinkingMessage]);
|
||||
|
||||
const handleSubmit = () => {
|
||||
const handleSubmit = async () => {
|
||||
if (streaming || commandRunning) {
|
||||
return;
|
||||
}
|
||||
onSubmit(value);
|
||||
setValue("");
|
||||
const previousValue = value;
|
||||
setValue(""); // Clear immediately for responsiveness
|
||||
const result = await onSubmit(previousValue);
|
||||
// If message was NOT submitted (e.g. pending approval), restore it
|
||||
if (!result.submitted) {
|
||||
setValue(previousValue);
|
||||
}
|
||||
};
|
||||
|
||||
// Get display name and color for permission mode
|
||||
@@ -212,7 +229,12 @@ export function Input({
|
||||
message={thinkingMessage}
|
||||
shimmerOffset={shimmerOffset}
|
||||
/>
|
||||
{shouldShowTokenCount && <Text dimColor> ({tokenCount} ↑)</Text>}
|
||||
<Text dimColor>
|
||||
{" ("}
|
||||
{interruptRequested ? "interrupting" : "esc to interrupt"}
|
||||
{shouldShowTokenCount && ` · ${tokenCount} ↑`}
|
||||
{")"}
|
||||
</Text>
|
||||
</Box>
|
||||
</Box>
|
||||
)}
|
||||
|
||||
@@ -150,6 +150,26 @@ export function markCurrentLineAsFinished(b: Buffers) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark any incomplete tool calls as cancelled when stream is interrupted.
|
||||
* This prevents blinking tool calls from staying in progress state.
|
||||
*/
|
||||
export function markIncompleteToolsAsCancelled(b: Buffers) {
|
||||
for (const [id, line] of b.byId.entries()) {
|
||||
if (line.kind === "tool_call" && line.phase !== "finished") {
|
||||
const updatedLine = {
|
||||
...line,
|
||||
phase: "finished" as const,
|
||||
resultOk: false,
|
||||
resultText: "Interrupted by user",
|
||||
};
|
||||
b.byId.set(id, updatedLine);
|
||||
}
|
||||
}
|
||||
// Also mark any streaming assistant/reasoning lines as finished
|
||||
markCurrentLineAsFinished(b);
|
||||
}
|
||||
|
||||
type ToolCallLine = Extract<Line, { kind: "tool_call" }>;
|
||||
|
||||
// Flatten common SDK "parts" → text
|
||||
|
||||
@@ -2,6 +2,7 @@ import { Letta } from "@letta-ai/letta-client";
|
||||
import {
|
||||
type createBuffers,
|
||||
markCurrentLineAsFinished,
|
||||
markIncompleteToolsAsCancelled,
|
||||
onChunk,
|
||||
} from "./accumulator";
|
||||
|
||||
@@ -23,6 +24,7 @@ export async function drainStream(
|
||||
stream: AsyncIterable<Letta.LettaStreamingResponse>,
|
||||
buffers: ReturnType<typeof createBuffers>,
|
||||
refresh: () => void,
|
||||
abortSignal?: AbortSignal,
|
||||
): Promise<DrainResult> {
|
||||
const startTime = performance.now();
|
||||
|
||||
@@ -36,6 +38,14 @@ export async function drainStream(
|
||||
let lastSeqId: number | null = null;
|
||||
|
||||
for await (const chunk of stream) {
|
||||
// Check if stream was aborted
|
||||
if (abortSignal?.aborted) {
|
||||
stopReason = "cancelled" as Letta.StopReasonType;
|
||||
// Mark incomplete tool calls as cancelled to prevent stuck blinking UI
|
||||
markIncompleteToolsAsCancelled(buffers);
|
||||
queueMicrotask(refresh);
|
||||
break;
|
||||
}
|
||||
// Store the runId and seqId to re-connect if stream is interrupted
|
||||
if ("runId" in chunk && "seqId" in chunk && chunk.runId && chunk.seqId) {
|
||||
lastRunId = chunk.runId;
|
||||
|
||||
Reference in New Issue
Block a user