From 1275bce75295e728cd91d7accb7a8098a418672d Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Thu, 8 Jan 2026 22:56:56 -0800 Subject: [PATCH] feat: add LETTA_DEBUG_TIMINGS env var for request timing diagnostics (#502) Co-authored-by: Letta --- src/agent/client.ts | 3 + src/agent/message.ts | 17 +++- src/cli/helpers/stream.ts | 20 +++++ src/tests/headless-input-format.test.ts | 53 ++++++------ src/tests/headless-stream-json-format.test.ts | 12 +-- src/utils/timing.ts | 86 +++++++++++++++++++ 6 files changed, 160 insertions(+), 31 deletions(-) create mode 100644 src/utils/timing.ts diff --git a/src/agent/client.ts b/src/agent/client.ts index 0092273..eab8bcc 100644 --- a/src/agent/client.ts +++ b/src/agent/client.ts @@ -4,6 +4,7 @@ import packageJson from "../../package.json"; import { LETTA_CLOUD_API_URL, refreshAccessToken } from "../auth/oauth"; import { ensureAnthropicProviderToken } from "../providers/anthropic-provider"; import { settingsManager } from "../settings-manager"; +import { createTimingFetch, isTimingsEnabled } from "../utils/timing"; /** * Get the current Letta server URL from environment or settings. 
@@ -86,5 +87,7 @@ export async function getClient() { "X-Letta-Source": "letta-code", "User-Agent": `letta-code/${packageJson.version}`, }, + // Use instrumented fetch for timing logs when LETTA_DEBUG_TIMINGS is enabled + ...(isTimingsEnabled() && { fetch: createTimingFetch(fetch) }), }); } diff --git a/src/agent/message.ts b/src/agent/message.ts index 24b9ce7..282da08 100644 --- a/src/agent/message.ts +++ b/src/agent/message.ts @@ -9,8 +9,12 @@ import type { LettaStreamingResponse, } from "@letta-ai/letta-client/resources/agents/messages"; import { getClientToolsFromRegistry } from "../tools/manager"; +import { isTimingsEnabled } from "../utils/timing"; import { getClient } from "./client"; +// Symbol to store timing info on the stream object +export const STREAM_REQUEST_START_TIME = Symbol("streamRequestStartTime"); + export async function sendMessageStream( agentId: string, messages: Array, @@ -23,8 +27,11 @@ export async function sendMessageStream( // so retries would violate idempotency and create race conditions requestOptions: { maxRetries?: number } = { maxRetries: 0 }, ): Promise> { + // Capture request start time for TTFT measurement when timings are enabled + const requestStartTime = isTimingsEnabled() ? 
performance.now() : undefined; + const client = await getClient(); - return client.agents.messages.create( + const stream = await client.agents.messages.create( agentId, { messages: messages, @@ -35,4 +42,12 @@ export async function sendMessageStream( }, requestOptions, ); + + // Attach start time to stream for TTFT calculation in drainStream + if (requestStartTime !== undefined) { + (stream as unknown as Record)[STREAM_REQUEST_START_TIME] = + requestStartTime; + } + + return stream; } diff --git a/src/cli/helpers/stream.ts b/src/cli/helpers/stream.ts index d4c447a..299ba08 100644 --- a/src/cli/helpers/stream.ts +++ b/src/cli/helpers/stream.ts @@ -3,7 +3,9 @@ import type { Stream } from "@letta-ai/letta-client/core/streaming"; import type { LettaStreamingResponse } from "@letta-ai/letta-client/resources/agents/messages"; import type { StopReasonType } from "@letta-ai/letta-client/resources/runs/runs"; import { getClient } from "../../agent/client"; +import { STREAM_REQUEST_START_TIME } from "../../agent/message"; import { debugWarn } from "../../utils/debug"; +import { formatDuration, logTiming } from "../../utils/timing"; import { type createBuffers, @@ -37,6 +39,12 @@ export async function drainStream( ): Promise { const startTime = performance.now(); + // Extract request start time for TTFT logging (attached by sendMessageStream) + const requestStartTime = ( + stream as unknown as Record + )[STREAM_REQUEST_START_TIME]; + let hasLoggedTTFT = false; + let _approvalRequestId: string | null = null; const pendingApprovals = new Map< string, @@ -129,6 +137,18 @@ export async function drainStream( queueMicrotask(() => onFirstMessage()); } + // Log TTFT (time-to-first-token) when first content chunk arrives + if ( + !hasLoggedTTFT && + requestStartTime !== undefined && + (chunk.message_type === "reasoning_message" || + chunk.message_type === "assistant_message") + ) { + hasLoggedTTFT = true; + const ttft = performance.now() - requestStartTime; + logTiming(`TTFT: 
${formatDuration(ttft)} (from POST to first content)`); + } + // Remove tool from pending approvals when it completes (server-side execution finished) // This means the tool was executed server-side and doesn't need approval if (chunk.message_type === "tool_return_message") { diff --git a/src/tests/headless-input-format.test.ts b/src/tests/headless-input-format.test.ts index 297a3eb..69e549b 100644 --- a/src/tests/headless-input-format.test.ts +++ b/src/tests/headless-input-format.test.ts @@ -25,7 +25,7 @@ const FAST_PROMPT = async function runBidirectional( inputs: string[], extraArgs: string[] = [], - timeoutMs = 90000, // Overall timeout for the entire operation + timeoutMs = 180000, // 180s timeout - CI can be very slow ): Promise { return new Promise((resolve, reject) => { const proc = spawn( @@ -238,7 +238,7 @@ describe("input-format stream-json", () => { expect(initResponse?.agent_id).toBeDefined(); } }, - { timeout: 120000 }, + { timeout: 200000 }, ); test( @@ -291,28 +291,33 @@ describe("input-format stream-json", () => { expect(result?.agent_id).toBeDefined(); expect(result?.duration_ms).toBeGreaterThan(0); }, - { timeout: 120000 }, + { timeout: 200000 }, ); test( "multi-turn conversation maintains context", async () => { - const objects = (await runBidirectional([ - JSON.stringify({ - type: "user", - message: { - role: "user", - content: "Say hello", - }, - }), - JSON.stringify({ - type: "user", - message: { - role: "user", - content: "Say goodbye", - }, - }), - ])) as WireMessage[]; + // Multi-turn test needs 2 sequential LLM calls, so allow more time + const objects = (await runBidirectional( + [ + JSON.stringify({ + type: "user", + message: { + role: "user", + content: "Say hello", + }, + }), + JSON.stringify({ + type: "user", + message: { + role: "user", + content: "Say goodbye", + }, + }), + ], + [], // no extra args + 300000, // 300s for 2 sequential LLM calls - CI can be very slow + )) as WireMessage[]; // Should have at least two results (one per 
turn) const results = objects.filter( @@ -336,7 +341,7 @@ describe("input-format stream-json", () => { expect(firstResult.session_id).toBe(lastResult.session_id); } }, - { timeout: 180000 }, + { timeout: 320000 }, ); test( @@ -358,7 +363,7 @@ describe("input-format stream-json", () => { expect(controlResponse).toBeDefined(); expect(controlResponse?.response.subtype).toBe("success"); }, - { timeout: 120000 }, + { timeout: 200000 }, ); test( @@ -405,7 +410,7 @@ describe("input-format stream-json", () => { expect(result).toBeDefined(); expect(result?.subtype).toBe("success"); }, - { timeout: 120000 }, + { timeout: 200000 }, ); test( @@ -428,7 +433,7 @@ describe("input-format stream-json", () => { expect(controlResponse).toBeDefined(); expect(controlResponse?.response.subtype).toBe("error"); }, - { timeout: 120000 }, + { timeout: 200000 }, ); test( @@ -446,6 +451,6 @@ describe("input-format stream-json", () => { expect(errorMsg).toBeDefined(); expect(errorMsg?.message).toContain("Invalid JSON"); }, - { timeout: 120000 }, + { timeout: 200000 }, ); }); diff --git a/src/tests/headless-stream-json-format.test.ts b/src/tests/headless-stream-json-format.test.ts index fcef111..e21ad5c 100644 --- a/src/tests/headless-stream-json-format.test.ts +++ b/src/tests/headless-stream-json-format.test.ts @@ -14,7 +14,7 @@ import type { async function runHeadlessCommand( prompt: string, extraArgs: string[] = [], - timeoutMs = 90000, // 90s timeout for slow CI environments + timeoutMs = 180000, // 180s timeout - CI can be very slow ): Promise { return new Promise((resolve, reject) => { const proc = spawn( @@ -105,7 +105,7 @@ describe("stream-json format", () => { expect(init.cwd).toBeDefined(); expect(init.uuid).toBe(`init-${init.agent_id}`); }, - { timeout: 120000 }, + { timeout: 200000 }, ); test( @@ -131,7 +131,7 @@ describe("stream-json format", () => { // uuid should be otid or id from the Letta SDK chunk expect(msg.uuid).toBeTruthy(); }, - { timeout: 120000 }, + { timeout: 200000 }, 
); test( @@ -156,7 +156,7 @@ describe("stream-json format", () => { expect(result.uuid).toContain("result-"); expect(result.result).toBeDefined(); }, - { timeout: 120000 }, + { timeout: 200000 }, ); test( @@ -183,7 +183,7 @@ describe("stream-json format", () => { // The event should contain the original Letta SDK chunk expect("message_type" in event.event).toBe(true); }, - { timeout: 120000 }, + { timeout: 200000 }, ); test( @@ -217,6 +217,6 @@ describe("stream-json format", () => { }); expect(resultLine).toBeDefined(); }, - { timeout: 120000 }, + { timeout: 200000 }, ); }); diff --git a/src/utils/timing.ts b/src/utils/timing.ts new file mode 100644 index 0000000..1a75e8b --- /dev/null +++ b/src/utils/timing.ts @@ -0,0 +1,86 @@ +// src/utils/timing.ts +// Debug timing utilities - only active when LETTA_DEBUG_TIMINGS env var is set + +/** + * Check if debug timings are enabled via LETTA_DEBUG_TIMINGS env var + * Set LETTA_DEBUG_TIMINGS=1 or LETTA_DEBUG_TIMINGS=true to enable timing logs + */ +export function isTimingsEnabled(): boolean { + const val = process.env.LETTA_DEBUG_TIMINGS; + return val === "1" || val === "true"; +} + +/** + * Format duration nicely: "245ms" or "1.52s" + */ +export function formatDuration(ms: number): string { + if (ms < 1000) return `${Math.round(ms)}ms`; + return `${(ms / 1000).toFixed(2)}s`; +} + +/** + * Format timestamp: "12:34:56.789" + */ +export function formatTimestamp(date: Date): string { + return date.toISOString().slice(11, 23); +} + +/** + * Log timing message to stderr (won't interfere with stdout JSON in headless mode) + */ +export function logTiming(message: string): void { + if (isTimingsEnabled()) { + console.error(`[timing] ${message}`); + } +} + +// Simple fetch type that matches the SDK's expected signature +type SimpleFetch = ( + input: string | URL | Request, + init?: RequestInit, +) => Promise; + +/** + * Create an instrumented fetch that logs timing for every request. 
/**
 * Wrap a fetch implementation so that every request is reported through
 * `logTiming`: one message when the request starts, and one when it settles
 * (duration plus HTTP status on success, or "FAILED" plus duration when the
 * underlying call rejects).
 *
 * The HTTP method shown in the log is resolved in the same order `fetch`
 * uses: `init.method` wins, otherwise the method carried by a `Request`
 * input is used, and only then does it default to "GET".
 *
 * @param baseFetch - The fetch function that actually performs the request.
 * @returns A function with the same signature that forwards `input` and
 *   `init` to `baseFetch` untouched and returns its response (or rethrows
 *   its error) unchanged.
 */
export function createTimingFetch(baseFetch: SimpleFetch): SimpleFetch {
  return async (input, init) => {
    const start = performance.now();
    const startTime = formatTimestamp(new Date());

    // Resolve the target URL and, for Request inputs, the method the Request
    // itself carries (previously ignored, so such calls were logged as GET).
    let url: string;
    let requestMethod: string | undefined;
    if (typeof input === "string") {
      url = input;
    } else if (input instanceof URL) {
      url = input.href;
    } else {
      url = input.url;
      requestMethod = input.method;
    }
    const method = init?.method || requestMethod || "GET";

    // Log only the pathname when the URL parses; otherwise fall back to the
    // raw string (e.g. a relative URL, which `new URL` rejects without a base).
    let path: string;
    try {
      path = new URL(url).pathname;
    } catch {
      path = url;
    }

    logTiming(`${method} ${path} started at ${startTime}`);

    try {
      const response = await baseFetch(input, init);
      const duration = performance.now() - start;
      logTiming(
        `${method} ${path} -> ${formatDuration(duration)} (status: ${response.status})`,
      );
      return response;
    } catch (error) {
      // Report the failure timing, then let the caller see the original error.
      const duration = performance.now() - start;
      logTiming(
        `${method} ${path} -> FAILED after ${formatDuration(duration)}`,
      );
      throw error;
    }
  };
}