diff --git a/src/cli/App.tsx b/src/cli/App.tsx index 1da9267..70146ec 100644 --- a/src/cli/App.tsx +++ b/src/cli/App.tsx @@ -25,6 +25,7 @@ import type { ApprovalContext } from "../permissions/analyzer"; import { permissionMode } from "../permissions/mode"; import { updateProjectSettings } from "../settings"; import { settingsManager } from "../settings-manager"; +import { telemetry } from "../telemetry"; import type { ToolExecutionResult } from "../tools/manager"; import { analyzeToolApproval, @@ -314,6 +315,7 @@ export default function App({ const agentIdRef = useRef(agentId); useEffect(() => { agentIdRef.current = agentId; + telemetry.setCurrentAgentId(agentId); }, [agentId]); const resumeKey = useSuspend(); @@ -473,6 +475,18 @@ export default function App({ // Session stats tracking const sessionStatsRef = useRef(new SessionStats()); + // Wire up session stats to telemetry for safety net handlers + useEffect(() => { + telemetry.setSessionStatsGetter(() => + sessionStatsRef.current.getSnapshot(), + ); + + // Cleanup on unmount (defensive, prevents potential memory leak) + return () => { + telemetry.setSessionStatsGetter(undefined); + }; + }, []); + // Show exit stats on exit (double Ctrl+C) const [showExitStats, setShowExitStats] = useState(false); @@ -1409,6 +1423,25 @@ export default function App({ return; } + // Track error with enhanced context + const errorType = + e instanceof Error ? e.constructor.name : "UnknownError"; + const errorMessage = e instanceof Error ? e.message : String(e); + + // Extract HTTP status code if available (API errors often have this) + const httpStatus = + e && + typeof e === "object" && + "status" in e && + typeof e.status === "number" + ? e.status + : undefined; + + telemetry.trackError(errorType, errorMessage, "message_stream", { + httpStatus, + modelId: currentModelId || undefined, + }); + // Use comprehensive error formatting const errorDetails = formatErrorDetails(e, agentIdRef.current); appendError(errorDetails); @@ -1418,11 +1451,25 @@ export default function App({ abortControllerRef.current = null; } }, - [appendError, refreshDerived, refreshDerivedThrottled, setStreaming], + [ + appendError, + refreshDerived, + refreshDerivedThrottled, + setStreaming, + currentModelId, + ], ); - const handleExit = useCallback(() => { + const handleExit = useCallback(async () => { saveLastAgentBeforeExit(); + + // Track session end explicitly (before exit) with stats + const stats = sessionStatsRef.current.getSnapshot(); + telemetry.trackSessionEnd(stats, "exit_command"); + + // Flush telemetry before exit + await telemetry.flush(); + setShowExitStats(true); // Give React time to render the stats, then exit setTimeout(() => { @@ -1678,6 +1725,9 @@ export default function App({ if (!msg) return { submitted: false }; + // Track user input (agent_id automatically added from telemetry.currentAgentId) + telemetry.trackUserInput(msg, "user", currentModelId || "unknown"); + // Block submission if waiting for explicit user action (approvals) // In this case, input is hidden anyway, so this shouldn't happen if (pendingApprovals.length > 0) { @@ -2000,6 +2050,13 @@ export default function App({ saveLastAgentBeforeExit(); + // Track session end explicitly (before exit) with stats + const stats = sessionStatsRef.current.getSnapshot(); + telemetry.trackSessionEnd(stats, "logout"); + + // Flush telemetry before exit + await telemetry.flush(); + // Exit after a brief delay to show the message setTimeout(() => process.exit(0), 500); } catch (error) { diff --git a/src/index.ts b/src/index.ts index 11f1b5a..9fcf695 100755 --- a/src/index.ts +++ b/src/index.ts @@ -8,6 +8,7 @@ import type { AgentProvenance } from "./agent/create"; import { LETTA_CLOUD_API_URL } from "./auth/oauth"; import { permissionMode } from "./permissions/mode"; import { settingsManager } from "./settings-manager"; +import { telemetry } from "./telemetry"; import { forceUpsertTools, isToolsNotFoundError, @@ -214,6 +215,9 @@ async function main(): Promise { await settingsManager.initialize(); const settings = settingsManager.getSettings(); + // Initialize telemetry (enabled by default, opt-out via LETTA_CODE_TELEM=0) + telemetry.init(); + // Check for updates on startup (non-blocking) const { checkAndAutoUpdate } = await import("./updater/auto-update"); checkAndAutoUpdate().catch(() => { diff --git a/src/telemetry/index.ts b/src/telemetry/index.ts new file mode 100644 index 0000000..f449307 --- /dev/null +++ b/src/telemetry/index.ts @@ -0,0 +1,433 @@ +export interface TelemetryEvent { + type: "session_start" | "session_end" | "tool_usage" | "error" | "user_input"; + timestamp: string; + data: Record; +} + +export interface SessionStartData { + startup_command: string; + version: string; + platform: string; + node_version: string; +} + +export interface SessionEndData { + duration: number; // in seconds + message_count: number; + tool_call_count: number; + exit_reason?: string; // e.g., "exit_command", "logout", "sigint", "process_exit" + total_api_ms?: number; + total_wall_ms?: number; + prompt_tokens?: number; + completion_tokens?: number; + total_tokens?: number; + cached_tokens?: number; + reasoning_tokens?: number; + step_count?: number; +} + +export interface ToolUsageData { + tool_name: string; + success: boolean; + duration: number; + response_length?: number; + error_type?: string; + stderr?: string; +} + +export interface ErrorData { + error_type: string; + error_message: string; + context?: string; + http_status?: number; + model_id?: string; +} + +export interface UserInputData { + input_length: number; + is_command: boolean; + command_name?: string; + message_type: string; + model_id: string; +} + +class TelemetryManager { + private events: TelemetryEvent[] = []; + private sessionId: string; + private currentAgentId: string | null = null; + private sessionStartTime: number; + private messageCount = 0; + private toolCallCount = 0; + private sessionEndTracked = false; + private flushInterval: NodeJS.Timeout | null = null; + private readonly FLUSH_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes + private readonly MAX_BATCH_SIZE = 100; + private sessionStatsGetter?: () => { + totalWallMs: number; + totalApiMs: number; + usage: { + promptTokens: number; + completionTokens: number; + totalTokens: number; + cachedTokens: number; + reasoningTokens: number; + stepCount: number; + }; + }; + + constructor() { + this.sessionId = this.generateSessionId(); + this.sessionStartTime = Date.now(); + } + + private generateSessionId(): string { + return `${Date.now()}-${Math.random().toString(36).substring(2, 15)}`; + } + + /** + * Check if telemetry is enabled based on LETTA_CODE_TELEM env var + * Enabled by default unless explicitly disabled or using self-hosted server + */ + private isTelemetryEnabled(): boolean { + // Check environment variable - must be explicitly set to "0" or "false" to disable + const envValue = process.env.LETTA_CODE_TELEM; + if (envValue === "0" || envValue === "false") { + return false; + } + + // Disable telemetry if using self-hosted server (not api.letta.com) + const baseURL = process.env.LETTA_BASE_URL; + if (baseURL && !baseURL.includes("api.letta.com")) { + return false; + } + + // Disable telemetry if no API key is set + const apiKey = process.env.LETTA_API_KEY; + if (!apiKey) { + return false; + } + + return true; + } + + /** + * Initialize telemetry and start periodic flushing + */ + init() { + if (!this.isTelemetryEnabled()) { + return; + } + + this.trackSessionStart(); + + // Set up periodic flushing + this.flushInterval = setInterval(() => { + this.flush().catch((err) => { + // Silently fail - we don't want telemetry to interfere with user experience + if (process.env.LETTA_DEBUG) { + console.error("Telemetry flush error:", err); + } + }); + }, this.FLUSH_INTERVAL_MS); + + // Don't let the interval prevent process from exiting + this.flushInterval.unref(); + + // Safety net: Handle Ctrl+C interruption + // Note: Normal exits via handleExit flush explicitly + process.on("SIGINT", () => { + try { + this.trackSessionEnd(undefined, "sigint"); + // Fire and forget - try to flush but don't wait (might not complete) + this.flush().catch(() => { + // Silently ignore + }); + } catch { + // Silently ignore - don't prevent process from exiting + } + // Exit immediately - don't wait for flush + process.exit(0); + }); + + // TODO: Add telemetry for crashes and abnormal exits + // Current limitation: We can't reliably flush telemetry on process.on("exit") + // because the event loop is shut down and async operations don't work. + // Potential solution: Write unsent events to ~/.letta/telemetry-queue.json + // and send them on next startup. This would capture crash telemetry without + // risking hangs on exit. + } + + /** + * Track a telemetry event + */ + private track( + type: TelemetryEvent["type"], + data: + | Record + | SessionStartData + | SessionEndData + | ToolUsageData + | ErrorData + | UserInputData, + ) { + if (!this.isTelemetryEnabled()) { + return; + } + + const event: TelemetryEvent = { + type, + timestamp: new Date().toISOString(), + data: { + ...data, + session_id: this.sessionId, + agent_id: this.currentAgentId || undefined, + }, + }; + + this.events.push(event); + + // Flush if batch size is reached + if (this.events.length >= this.MAX_BATCH_SIZE) { + this.flush().catch((err) => { + if (process.env.LETTA_DEBUG) { + console.error("Telemetry flush error:", err); + } + }); + } + } + + /** + * Set the current agent ID (called from App.tsx when agent changes) + * This is automatically added to all telemetry events + */ + setCurrentAgentId(agentId: string | null) { + this.currentAgentId = agentId; + } + + /** + * Set a getter function for session stats (called from App.tsx) + * This allows safety net handlers to access stats even if not explicitly passed + * Pass undefined to clear the getter (for cleanup) + */ + setSessionStatsGetter( + getter?: () => { + totalWallMs: number; + totalApiMs: number; + usage: { + promptTokens: number; + completionTokens: number; + totalTokens: number; + cachedTokens: number; + reasoningTokens: number; + stepCount: number; + }; + }, + ) { + this.sessionStatsGetter = getter; + } + + /** + * Track session start + */ + trackSessionStart() { + // Extract agent ID from startup args if --agent or -a is provided + const args = process.argv.slice(2); + const agentFlagIndex = args.findIndex( + (arg) => arg === "--agent" || arg === "-a", + ); + if (agentFlagIndex !== -1 && agentFlagIndex + 1 < args.length) { + const agentId = args[agentFlagIndex + 1]; + if (agentId) { + this.currentAgentId = agentId; + } + } + + const data: SessionStartData = { + startup_command: args.join(" "), + version: process.env.npm_package_version || "unknown", + platform: process.platform, + node_version: process.version, + }; + this.track("session_start", data); + } + + /** + * Track session end + * @param stats Optional session stats (from sessionStatsRef.current.getSnapshot() in App.tsx) + * @param exitReason Optional reason for exit (e.g., "exit_command", "logout", "sigint", "process_exit") + */ + trackSessionEnd( + stats?: { + totalWallMs: number; + totalApiMs: number; + usage: { + promptTokens: number; + completionTokens: number; + totalTokens: number; + cachedTokens: number; + reasoningTokens: number; + stepCount: number; + }; + }, + exitReason?: string, + ) { + // Prevent double-tracking (can be called from both handleExit and process.on("exit")) + if (this.sessionEndTracked) { + return; + } + this.sessionEndTracked = true; + + // Try to get stats from getter if not provided (for safety net handlers) + let sessionStats = stats; + if (!sessionStats && this.sessionStatsGetter) { + try { + sessionStats = this.sessionStatsGetter(); + } catch { + // Ignore errors - stats will be undefined + } + } + + const duration = Math.floor((Date.now() - this.sessionStartTime) / 1000); + const data: SessionEndData = { + duration, + message_count: this.messageCount, + tool_call_count: this.toolCallCount, + exit_reason: exitReason, + // Include optional stats if available + total_api_ms: sessionStats?.totalApiMs, + total_wall_ms: sessionStats?.totalWallMs, + prompt_tokens: sessionStats?.usage.promptTokens, + completion_tokens: sessionStats?.usage.completionTokens, + total_tokens: sessionStats?.usage.totalTokens, + cached_tokens: sessionStats?.usage.cachedTokens, + reasoning_tokens: sessionStats?.usage.reasoningTokens, + step_count: sessionStats?.usage.stepCount, + }; + this.track("session_end", data); + } + + /** + * Track tool usage + */ + trackToolUsage( + toolName: string, + success: boolean, + duration: number, + responseLength?: number, + errorType?: string, + stderr?: string, + ) { + this.toolCallCount++; + const data: ToolUsageData = { + tool_name: toolName, + success, + duration, + response_length: responseLength, + error_type: errorType, + stderr, + }; + this.track("tool_usage", data); + } + + /** + * Track errors + */ + trackError( + errorType: string, + errorMessage: string, + context?: string, + options?: { + httpStatus?: number; + modelId?: string; + }, + ) { + const data: ErrorData = { + error_type: errorType, + error_message: errorMessage, + context, + http_status: options?.httpStatus, + model_id: options?.modelId, + }; + this.track("error", data); + } + + /** + * Track user input + * Note: agent_id is automatically added from currentAgentId + */ + trackUserInput(input: string, messageType: string, modelId: string) { + this.messageCount++; + + const isCommand = input.trim().startsWith("/"); + const commandName = isCommand ? input.trim().split(/\s+/)[0] : undefined; + + const data: UserInputData = { + input_length: input.length, + is_command: isCommand, + command_name: commandName, + message_type: messageType, + model_id: modelId, + }; + this.track("user_input", data); + } + + /** + * Flush events to the server + */ + async flush(): Promise { + if (this.events.length === 0 || !this.isTelemetryEnabled()) { + return; + } + + const eventsToSend = [...this.events]; + this.events = []; + + const baseURL = process.env.LETTA_BASE_URL || "https://api.letta.com"; + const apiKey = process.env.LETTA_API_KEY; + + try { + // Add 5 second timeout to prevent telemetry from blocking shutdown + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => reject(new Error("Telemetry request timeout")), 5000), + ); + + const fetchPromise = fetch(`${baseURL}/v1/metadata/telemetry`, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${apiKey}`, + "X-Letta-Source": "letta-code", + }, + body: JSON.stringify({ + service: "letta-code", + events: eventsToSend, + }), + }); + + const response = (await Promise.race([ + fetchPromise, + timeoutPromise, + ])) as Response; + + if (!response.ok) { + throw new Error(`Telemetry flush failed: ${response.status}`); + } + } catch { + // If flush fails, put events back in queue, but don't throw error + this.events.unshift(...eventsToSend); + } + } + + /** + * Clean up resources + */ + cleanup() { + if (this.flushInterval) { + clearInterval(this.flushInterval); + this.flushInterval = null; + } + } +} + +// Export singleton instance +export const telemetry = new TelemetryManager(); diff --git a/src/tools/manager.ts b/src/tools/manager.ts index f211fb0..4a838bc 100644 --- a/src/tools/manager.ts +++ b/src/tools/manager.ts @@ -6,6 +6,7 @@ import { } from "@letta-ai/letta-client"; import { getModelInfo } from "../agent/model"; import { getAllSubagentConfigs } from "../agent/subagents"; +import { telemetry } from "../telemetry"; import { TOOL_DEFINITIONS, type ToolName } from "./toolDefinitions"; export const TOOL_NAMES = Object.keys(TOOL_DEFINITIONS) as ToolName[]; @@ -899,6 +900,8 @@ export async function executeTool( }; } + const startTime = Date.now(); + try { // Inject options for tools that support them without altering schemas let enhancedArgs = args; @@ -914,6 +917,7 @@ export async function executeTool( } const result = await tool.fn(enhancedArgs); + const duration = Date.now() - startTime; // Extract stdout/stderr if present (for bash tools) const recordResult = isRecord(result) ? result : undefined; @@ -924,6 +928,16 @@ export async function executeTool( // Flatten the response to plain text const flattenedResponse = flattenToolResponse(result); + // Track tool usage (success path - we're in the try block) + telemetry.trackToolUsage( + internalName, + true, // Hardcoded to true since tool execution succeeded + duration, + flattenedResponse.length, + undefined, // no error_type on success + stderr ? stderr.join("\n") : undefined, + ); + // Return the full response (truncation happens in UI layer only) return { toolReturn: flattenedResponse, @@ -932,24 +946,38 @@ export async function executeTool( ...(stderr && { stderr }), }; } catch (error) { + const duration = Date.now() - startTime; const isAbort = error instanceof Error && (error.name === "AbortError" || error.message === "The operation was aborted" || // node:child_process AbortError may include code/message variants ("code" in error && error.code === "ABORT_ERR")); + const errorType = isAbort + ? "abort" + : error instanceof Error + ? error.name + : "unknown"; + const errorMessage = isAbort + ? "User interrupted tool execution" + : error instanceof Error + ? error.message + : String(error); - if (isAbort) { - return { - toolReturn: "User interrupted tool execution", - status: "error", - }; - } + // Track tool usage error + telemetry.trackToolUsage( + internalName, + false, + duration, + errorMessage.length, + errorType, + errorMessage, + ); // Don't console.error here - it pollutes the TUI // The error message is already returned in toolReturn return { - toolReturn: error instanceof Error ? error.message : String(error), + toolReturn: errorMessage, status: "error", }; }