fix: execute tools against dispatch-time snapshot (#1018)

This commit is contained in:
Charles Packer
2026-02-18 16:49:54 -08:00
committed by GitHub
parent f3aa3291db
commit 96396f023d
8 changed files with 368 additions and 34 deletions

View File

@@ -51,7 +51,7 @@ import {
ensureMemoryFilesystemDirs,
getMemoryFilesystemRoot,
} from "../agent/memoryFilesystem";
import { sendMessageStream } from "../agent/message";
import { getStreamToolContextId, sendMessageStream } from "../agent/message";
import {
getModelInfo,
getModelShortName,
@@ -95,6 +95,7 @@ import {
analyzeToolApproval,
checkToolPermission,
executeTool,
releaseToolExecutionContext,
savePermissionRule,
type ToolExecutionResult,
} from "../tools/manager";
@@ -1570,6 +1571,13 @@ export default function App({
const lastSentInputRef = useRef<Array<MessageCreate | ApprovalCreate> | null>(
null,
);
const approvalToolContextIdRef = useRef<string | null>(null);
const clearApprovalToolContext = useCallback(() => {
const contextId = approvalToolContextIdRef.current;
if (!contextId) return;
approvalToolContextIdRef.current = null;
releaseToolExecutionContext(contextId);
}, []);
// Non-null only when the previous turn was explicitly interrupted by the user.
// Used to gate recovery alert injection to true user-interrupt retries.
const pendingInterruptRecoveryConversationIdRef = useRef<string | null>(null);
@@ -3173,12 +3181,14 @@ export default function App({
// throws before streaming begins, e.g., retry after LLM error when backend
// already cleared the approval)
let stream: Awaited<ReturnType<typeof sendMessageStream>>;
let turnToolContextId: string | null = null;
try {
stream = await sendMessageStream(
conversationIdRef.current,
currentInput,
{ agentId: agentIdRef.current },
);
turnToolContextId = getStreamToolContextId(stream);
} catch (preStreamError) {
// Extract error detail using shared helper (handles nested/direct/message shapes)
const errorDetail = extractConflictDetail(preStreamError);
@@ -3599,6 +3609,7 @@ export default function App({
// Case 1: Turn ended normally
if (stopReasonToHandle === "end_turn") {
clearApprovalToolContext();
setStreaming(false);
const liveElapsedMs = (() => {
const snapshot = sessionStatsRef.current.getTrajectorySnapshot();
@@ -3775,6 +3786,7 @@ export default function App({
// Case 1.5: Stream was cancelled by user
if (stopReasonToHandle === "cancelled") {
clearApprovalToolContext();
setStreaming(false);
closeTrajectorySegment();
syncTrajectoryElapsedBase();
@@ -3824,6 +3836,8 @@ export default function App({
// Case 2: Requires approval
if (stopReasonToHandle === "requires_approval") {
clearApprovalToolContext();
approvalToolContextIdRef.current = turnToolContextId;
// Clear stale state immediately to prevent ID mismatch bugs
setAutoHandledResults([]);
setAutoDeniedApprovals([]);
@@ -3839,6 +3853,7 @@ export default function App({
: [];
if (approvalsToProcess.length === 0) {
clearApprovalToolContext();
appendError(
`Unexpected empty approvals with stop reason: ${stopReason}`,
);
@@ -3851,6 +3866,7 @@ export default function App({
// If in quietCancel mode (user queued messages), auto-reject all approvals
// and send denials + queued messages together
if (waitingForQueueCancelRef.current) {
clearApprovalToolContext();
// Create denial results for all approvals
const denialResults = approvalsToProcess.map((approvalItem) => ({
type: "approval" as const,
@@ -3898,6 +3914,7 @@ export default function App({
userCancelledRef.current ||
abortControllerRef.current?.signal.aborted
) {
clearApprovalToolContext();
setStreaming(false);
closeTrajectorySegment();
syncTrajectoryElapsedBase();
@@ -4034,6 +4051,8 @@ export default function App({
{
abortSignal: autoAllowedAbortController.signal,
onStreamingOutput: updateStreamingOutput,
toolContextId:
approvalToolContextIdRef.current ?? undefined,
},
)
: [];
@@ -4744,6 +4763,7 @@ export default function App({
consumeQueuedMessages,
appendTaskNotificationEvents,
maybeCheckMemoryGitStatus,
clearApprovalToolContext,
openTrajectorySegment,
syncTrajectoryTokenBase,
syncTrajectoryElapsedBase,
@@ -5550,6 +5570,7 @@ export default function App({
{
abortSignal: autoAllowedAbortController.signal,
onStreamingOutput: updateStreamingOutput,
toolContextId: approvalToolContextIdRef.current ?? undefined,
},
);
// Map to ApprovalResult format (ToolReturn)
@@ -8572,6 +8593,8 @@ ${SYSTEM_REMINDER_CLOSE}
{
abortSignal: autoAllowedAbortController.signal,
onStreamingOutput: updateStreamingOutput,
toolContextId:
approvalToolContextIdRef.current ?? undefined,
},
)
: [];
@@ -8816,6 +8839,8 @@ ${SYSTEM_REMINDER_CLOSE}
{
abortSignal: autoAllowedAbortController.signal,
onStreamingOutput: updateStreamingOutput,
toolContextId:
approvalToolContextIdRef.current ?? undefined,
},
)
: [];
@@ -9002,6 +9027,7 @@ ${SYSTEM_REMINDER_CLOSE}
if (
!streaming &&
hasAnythingQueued &&
!queuedOverlayAction && // Prioritize queued model/toolset/system switches before dequeuing messages
pendingApprovals.length === 0 &&
!commandRunning &&
!isExecutingTool &&
@@ -9035,7 +9061,7 @@ ${SYSTEM_REMINDER_CLOSE}
// Log why dequeue was blocked (useful for debugging stuck queues)
debugLog(
"queue",
`Dequeue blocked: streaming=${streaming}, pendingApprovals=${pendingApprovals.length}, commandRunning=${commandRunning}, isExecutingTool=${isExecutingTool}, anySelectorOpen=${anySelectorOpen}, waitingForQueueCancel=${waitingForQueueCancelRef.current}, userCancelled=${userCancelledRef.current}, abortController=${!!abortControllerRef.current}`,
`Dequeue blocked: streaming=${streaming}, queuedOverlayAction=${!!queuedOverlayAction}, pendingApprovals=${pendingApprovals.length}, commandRunning=${commandRunning}, isExecutingTool=${isExecutingTool}, anySelectorOpen=${anySelectorOpen}, waitingForQueueCancel=${waitingForQueueCancelRef.current}, userCancelled=${userCancelledRef.current}, abortController=${!!abortControllerRef.current}`,
);
}
}, [
@@ -9045,6 +9071,7 @@ ${SYSTEM_REMINDER_CLOSE}
commandRunning,
isExecutingTool,
anySelectorOpen,
queuedOverlayAction,
dequeueEpoch, // Triggered when userCancelledRef is reset OR task notifications added
]);
@@ -9155,6 +9182,7 @@ ${SYSTEM_REMINDER_CLOSE}
{
abortSignal: approvalAbortController.signal,
onStreamingOutput: updateStreamingOutput,
toolContextId: approvalToolContextIdRef.current ?? undefined,
},
);
} finally {
@@ -9281,6 +9309,7 @@ ${SYSTEM_REMINDER_CLOSE}
}
} finally {
// Always release the execution guard, even if an error occurred
clearApprovalToolContext();
setIsExecutingTool(false);
toolAbortControllerRef.current = null;
executingToolCallIdsRef.current = [];
@@ -9301,6 +9330,7 @@ ${SYSTEM_REMINDER_CLOSE}
queueApprovalResults,
consumeQueuedMessages,
appendTaskNotificationEvents,
clearApprovalToolContext,
syncTrajectoryElapsedBase,
closeTrajectorySegment,
openTrajectorySegment,
@@ -9488,7 +9518,10 @@ ${SYSTEM_REMINDER_CLOSE}
onChunk(buffersRef.current, chunk);
refreshDerived();
},
{ onStreamingOutput: updateStreamingOutput },
{
onStreamingOutput: updateStreamingOutput,
toolContextId: approvalToolContextIdRef.current ?? undefined,
},
);
// Combine with auto-handled and auto-denied results (from initial check)

View File

@@ -3,7 +3,7 @@ import type { Stream } from "@letta-ai/letta-client/core/streaming";
import type { LettaStreamingResponse } from "@letta-ai/letta-client/resources/agents/messages";
import type { StopReasonType } from "@letta-ai/letta-client/resources/runs/runs";
import { getClient } from "../../agent/client";
import { STREAM_REQUEST_START_TIME } from "../../agent/message";
import { getStreamRequestStartTime } from "../../agent/message";
import { debugWarn } from "../../utils/debug";
import { formatDuration, logTiming } from "../../utils/timing";
@@ -64,11 +64,7 @@ export async function drainStream(
contextTracker?: ContextTracker,
): Promise<DrainResult> {
const startTime = performance.now();
// Extract request start time for TTFT logging (attached by sendMessageStream)
const requestStartTime = (
stream as unknown as Record<symbol, number | undefined>
)[STREAM_REQUEST_START_TIME];
const requestStartTime = getStreamRequestStartTime(stream) ?? startTime;
let hasLoggedTTFT = false;
const streamProcessor = new StreamProcessor();
@@ -146,7 +142,6 @@ export async function drainStream(
// Log TTFT (time-to-first-token) when first content chunk arrives
if (
!hasLoggedTTFT &&
requestStartTime !== undefined &&
(chunk.message_type === "reasoning_message" ||
chunk.message_type === "assistant_message")
) {