feat: add LETTA_DEBUG_TIMINGS env var for request timing diagnostics (#502)

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
Charles Packer
2026-01-08 22:56:56 -08:00
committed by GitHub
parent 2610b4594f
commit 1275bce752
6 changed files with 160 additions and 31 deletions

View File

@@ -4,6 +4,7 @@ import packageJson from "../../package.json";
import { LETTA_CLOUD_API_URL, refreshAccessToken } from "../auth/oauth";
import { ensureAnthropicProviderToken } from "../providers/anthropic-provider";
import { settingsManager } from "../settings-manager";
import { createTimingFetch, isTimingsEnabled } from "../utils/timing";
/**
* Get the current Letta server URL from environment or settings.
@@ -86,5 +87,7 @@ export async function getClient() {
"X-Letta-Source": "letta-code",
"User-Agent": `letta-code/${packageJson.version}`,
},
// Use instrumented fetch for timing logs when LETTA_DEBUG_TIMINGS is enabled
...(isTimingsEnabled() && { fetch: createTimingFetch(fetch) }),
});
}

View File

@@ -9,8 +9,12 @@ import type {
LettaStreamingResponse,
} from "@letta-ai/letta-client/resources/agents/messages";
import { getClientToolsFromRegistry } from "../tools/manager";
import { isTimingsEnabled } from "../utils/timing";
import { getClient } from "./client";
// Symbol to store timing info on the stream object
export const STREAM_REQUEST_START_TIME = Symbol("streamRequestStartTime");
export async function sendMessageStream(
agentId: string,
messages: Array<MessageCreate | ApprovalCreate>,
@@ -23,8 +27,11 @@ export async function sendMessageStream(
// so retries would violate idempotency and create race conditions
requestOptions: { maxRetries?: number } = { maxRetries: 0 },
): Promise<Stream<LettaStreamingResponse>> {
// Capture request start time for TTFT measurement when timings are enabled
const requestStartTime = isTimingsEnabled() ? performance.now() : undefined;
const client = await getClient();
return client.agents.messages.create(
const stream = await client.agents.messages.create(
agentId,
{
messages: messages,
@@ -35,4 +42,12 @@ export async function sendMessageStream(
},
requestOptions,
);
// Attach start time to stream for TTFT calculation in drainStream
if (requestStartTime !== undefined) {
(stream as unknown as Record<symbol, number>)[STREAM_REQUEST_START_TIME] =
requestStartTime;
}
return stream;
}

View File

@@ -3,7 +3,9 @@ import type { Stream } from "@letta-ai/letta-client/core/streaming";
import type { LettaStreamingResponse } from "@letta-ai/letta-client/resources/agents/messages";
import type { StopReasonType } from "@letta-ai/letta-client/resources/runs/runs";
import { getClient } from "../../agent/client";
import { STREAM_REQUEST_START_TIME } from "../../agent/message";
import { debugWarn } from "../../utils/debug";
import { formatDuration, logTiming } from "../../utils/timing";
import {
type createBuffers,
@@ -37,6 +39,12 @@ export async function drainStream(
): Promise<DrainResult> {
const startTime = performance.now();
// Extract request start time for TTFT logging (attached by sendMessageStream)
const requestStartTime = (
stream as unknown as Record<symbol, number | undefined>
)[STREAM_REQUEST_START_TIME];
let hasLoggedTTFT = false;
let _approvalRequestId: string | null = null;
const pendingApprovals = new Map<
string,
@@ -129,6 +137,18 @@ export async function drainStream(
queueMicrotask(() => onFirstMessage());
}
// Log TTFT (time-to-first-token) when first content chunk arrives
if (
!hasLoggedTTFT &&
requestStartTime !== undefined &&
(chunk.message_type === "reasoning_message" ||
chunk.message_type === "assistant_message")
) {
hasLoggedTTFT = true;
const ttft = performance.now() - requestStartTime;
logTiming(`TTFT: ${formatDuration(ttft)} (from POST to first content)`);
}
// Remove tool from pending approvals when it completes (server-side execution finished)
// This means the tool was executed server-side and doesn't need approval
if (chunk.message_type === "tool_return_message") {

View File

@@ -25,7 +25,7 @@ const FAST_PROMPT =
async function runBidirectional(
inputs: string[],
extraArgs: string[] = [],
timeoutMs = 90000, // Overall timeout for the entire operation
timeoutMs = 180000, // 180s timeout - CI can be very slow
): Promise<object[]> {
return new Promise((resolve, reject) => {
const proc = spawn(
@@ -238,7 +238,7 @@ describe("input-format stream-json", () => {
expect(initResponse?.agent_id).toBeDefined();
}
},
{ timeout: 120000 },
{ timeout: 200000 },
);
test(
@@ -291,28 +291,33 @@ describe("input-format stream-json", () => {
expect(result?.agent_id).toBeDefined();
expect(result?.duration_ms).toBeGreaterThan(0);
},
{ timeout: 120000 },
{ timeout: 200000 },
);
test(
"multi-turn conversation maintains context",
async () => {
const objects = (await runBidirectional([
JSON.stringify({
type: "user",
message: {
role: "user",
content: "Say hello",
},
}),
JSON.stringify({
type: "user",
message: {
role: "user",
content: "Say goodbye",
},
}),
])) as WireMessage[];
// Multi-turn test needs 2 sequential LLM calls, so allow more time
const objects = (await runBidirectional(
[
JSON.stringify({
type: "user",
message: {
role: "user",
content: "Say hello",
},
}),
JSON.stringify({
type: "user",
message: {
role: "user",
content: "Say goodbye",
},
}),
],
[], // no extra args
300000, // 300s for 2 sequential LLM calls - CI can be very slow
)) as WireMessage[];
// Should have at least two results (one per turn)
const results = objects.filter(
@@ -336,7 +341,7 @@ describe("input-format stream-json", () => {
expect(firstResult.session_id).toBe(lastResult.session_id);
}
},
{ timeout: 180000 },
{ timeout: 320000 },
);
test(
@@ -358,7 +363,7 @@ describe("input-format stream-json", () => {
expect(controlResponse).toBeDefined();
expect(controlResponse?.response.subtype).toBe("success");
},
{ timeout: 120000 },
{ timeout: 200000 },
);
test(
@@ -405,7 +410,7 @@ describe("input-format stream-json", () => {
expect(result).toBeDefined();
expect(result?.subtype).toBe("success");
},
{ timeout: 120000 },
{ timeout: 200000 },
);
test(
@@ -428,7 +433,7 @@ describe("input-format stream-json", () => {
expect(controlResponse).toBeDefined();
expect(controlResponse?.response.subtype).toBe("error");
},
{ timeout: 120000 },
{ timeout: 200000 },
);
test(
@@ -446,6 +451,6 @@ describe("input-format stream-json", () => {
expect(errorMsg).toBeDefined();
expect(errorMsg?.message).toContain("Invalid JSON");
},
{ timeout: 120000 },
{ timeout: 200000 },
);
});

View File

@@ -14,7 +14,7 @@ import type {
async function runHeadlessCommand(
prompt: string,
extraArgs: string[] = [],
timeoutMs = 90000, // 90s timeout for slow CI environments
timeoutMs = 180000, // 180s timeout - CI can be very slow
): Promise<string[]> {
return new Promise((resolve, reject) => {
const proc = spawn(
@@ -105,7 +105,7 @@ describe("stream-json format", () => {
expect(init.cwd).toBeDefined();
expect(init.uuid).toBe(`init-${init.agent_id}`);
},
{ timeout: 120000 },
{ timeout: 200000 },
);
test(
@@ -131,7 +131,7 @@ describe("stream-json format", () => {
// uuid should be otid or id from the Letta SDK chunk
expect(msg.uuid).toBeTruthy();
},
{ timeout: 120000 },
{ timeout: 200000 },
);
test(
@@ -156,7 +156,7 @@ describe("stream-json format", () => {
expect(result.uuid).toContain("result-");
expect(result.result).toBeDefined();
},
{ timeout: 120000 },
{ timeout: 200000 },
);
test(
@@ -183,7 +183,7 @@ describe("stream-json format", () => {
// The event should contain the original Letta SDK chunk
expect("message_type" in event.event).toBe(true);
},
{ timeout: 120000 },
{ timeout: 200000 },
);
test(
@@ -217,6 +217,6 @@ describe("stream-json format", () => {
});
expect(resultLine).toBeDefined();
},
{ timeout: 120000 },
{ timeout: 200000 },
);
});

86
src/utils/timing.ts Normal file
View File

@@ -0,0 +1,86 @@
// src/utils/timing.ts
// Debug timing utilities - only active when LETTA_DEBUG_TIMINGS env var is set
/**
 * Whether request-timing diagnostics are turned on.
 * Enable by exporting LETTA_DEBUG_TIMINGS=1 or LETTA_DEBUG_TIMINGS=true
 * (exact match, case-sensitive); anything else leaves timings off.
 */
export function isTimingsEnabled(): boolean {
  return ["1", "true"].includes(process.env.LETTA_DEBUG_TIMINGS ?? "");
}
/**
 * Format a duration in milliseconds for human-readable logs:
 * sub-second values as whole milliseconds ("245ms"), everything else as
 * seconds with two decimals ("1.52s").
 *
 * Fix: round BEFORE the < 1000 comparison so a value like 999.6 renders
 * as "1.00s" rather than the contradictory "1000ms".
 */
export function formatDuration(ms: number): string {
  const wholeMs = Math.round(ms);
  if (wholeMs < 1000) return `${wholeMs}ms`;
  return `${(ms / 1000).toFixed(2)}s`;
}
/**
 * Render a Date as "HH:MM:SS.mmm", e.g. "12:34:56.789".
 * Note: this is the UTC wall clock (sliced out of the ISO-8601 string),
 * not local time.
 */
export function formatTimestamp(date: Date): string {
  const iso = date.toISOString();
  return iso.substring(11, 23);
}
/**
 * Emit one timing line, prefixed "[timing]", to stderr.
 * Stderr is used so the output never interferes with JSON emitted on
 * stdout in headless mode. No-op unless LETTA_DEBUG_TIMINGS is enabled.
 */
export function logTiming(message: string): void {
  if (!isTimingsEnabled()) return;
  console.error(`[timing] ${message}`);
}
// Minimal fetch signature compatible with what the SDK accepts for its
// `fetch` option; intentionally narrower than the full global fetch type
// so any conforming implementation (including the global) can be wrapped.
type SimpleFetch = (
  input: string | URL | Request,
  init?: RequestInit,
) => Promise<Response>;
/**
 * Wrap a fetch implementation so every request logs timing to stderr
 * (via logTiming, which is a no-op unless LETTA_DEBUG_TIMINGS is set).
 * Emits "<METHOD> <path> started at <ts>" before the request, then either
 * the elapsed time plus HTTP status on success, or FAILED on error
 * (the original error is re-thrown untouched).
 */
export function createTimingFetch(baseFetch: SimpleFetch): SimpleFetch {
  return async (input, init) => {
    const begunAt = performance.now();
    const begunStamp = formatTimestamp(new Date());

    // Normalize the three possible input shapes down to a URL string.
    let url: string;
    if (typeof input === "string") {
      url = input;
    } else if (input instanceof URL) {
      url = input.href;
    } else {
      url = input.url;
    }
    const method = init?.method || "GET";

    // Log just the path; fall back to the raw string when it is not a
    // parseable absolute URL (new URL throws on relative/invalid input).
    let path: string;
    try {
      path = new URL(url).pathname;
    } catch {
      path = url;
    }

    logTiming(`${method} ${path} started at ${begunStamp}`);
    try {
      const response = await baseFetch(input, init);
      const elapsed = performance.now() - begunAt;
      logTiming(
        `${method} ${path} -> ${formatDuration(elapsed)} (status: ${response.status})`,
      );
      return response;
    } catch (error) {
      const elapsed = performance.now() - begunAt;
      logTiming(
        `${method} ${path} -> FAILED after ${formatDuration(elapsed)}`,
      );
      throw error;
    }
  };
}