From a7ec74612fd72a95637b8b04b440b75db123b3e8 Mon Sep 17 00:00:00 2001 From: jnjpng Date: Fri, 20 Feb 2026 16:04:38 -0800 Subject: [PATCH] feat: log chunks for feedback (#1068) Co-authored-by: Letta --- src/cli/App.tsx | 19 ++- src/cli/helpers/chunkLog.ts | 250 ++++++++++++++++++++++++++++++++++++ src/cli/helpers/stream.ts | 15 +++ 3 files changed, 280 insertions(+), 4 deletions(-) create mode 100644 src/cli/helpers/chunkLog.ts diff --git a/src/cli/App.tsx b/src/cli/App.tsx index 7acb177..338a87c 100644 --- a/src/cli/App.tsx +++ b/src/cli/App.tsx @@ -184,6 +184,7 @@ import { } from "./helpers/accumulator"; import { classifyApprovals } from "./helpers/approvalClassification"; import { backfillBuffers } from "./helpers/backfill"; +import { chunkLog } from "./helpers/chunkLog"; import { type ContextWindowOverview, renderContextUsage, @@ -1464,6 +1465,14 @@ export default function App({ const sessionStartTimeRef = useRef(Date.now()); const sessionHooksRanRef = useRef(false); + // Initialize chunk log for this agent + session (clears buffer, GCs old files). + // Re-runs when agentId changes (e.g. agent switch via /agents). + useEffect(() => { + if (agentId && agentId !== "loading") { + chunkLog.init(agentId, telemetry.getSessionId()); + } + }, [agentId]); + const syncTrajectoryTokenBase = useCallback(() => { const snapshot = sessionStatsRef.current.getTrajectorySnapshot(); setTrajectoryTokenBase(snapshot?.tokens ?? 0); @@ -10703,11 +10712,13 @@ ${SYSTEM_REMINDER_CLOSE} }; })(), // Agent info - agent_name: agentName, - agent_description: agentDescription, - model: currentModelId, + agent_name: agentName ?? undefined, + agent_description: agentDescription ?? undefined, + model: currentModelId ?? undefined, // Account info - billing_tier: billingTier, + billing_tier: billingTier ?? undefined, + // Recent chunk log for diagnostics + recent_chunks: chunkLog.getEntries(), }), }, ); diff --git a/src/cli/helpers/chunkLog.ts b/src/cli/helpers/chunkLog.ts new file mode 100644 index 0000000..7afa7fc --- /dev/null +++ b/src/cli/helpers/chunkLog.ts @@ -0,0 +1,250 @@ +/** + * ChunkLog - Rolling log of the last N streaming chunks received by the client. + * + * Stores truncated chunks as JSONL on disk, organized per agent per session: + * ~/.letta/logs/chunk-logs/{agent_id}/{session_id}.jsonl + * + * Metadata (message_type, ids, timestamps) is preserved fully; + * large content fields (reasoning, tool_return, arguments, etc.) are + * truncated to keep the file compact. + * + * Old session logs are garbage-collected: only the most recent N sessions + * per agent are kept on disk. + */ + +import { + existsSync, + mkdirSync, + readdirSync, + unlinkSync, + writeFileSync, +} from "node:fs"; +import { homedir } from "node:os"; +import { join } from "node:path"; +import type { LettaStreamingResponse } from "@letta-ai/letta-client/resources/agents/messages"; +import { debugWarn } from "../../utils/debug"; + +const MAX_ENTRIES = 100; +const CONTENT_TRUNCATE_LEN = 200; +const MAX_SESSION_FILES = 5; +const LOG_BASE_DIR = join(homedir(), ".letta", "logs", "chunk-logs"); + +// --------------------------------------------------------------------------- +// Truncation helpers +// --------------------------------------------------------------------------- + +function truncateStr(value: unknown, maxLen: number): string { + if (value === null || value === undefined) return ""; + const s = typeof value === "string" ? value : JSON.stringify(value); + if (s.length <= maxLen) return s; + return `${s.slice(0, maxLen)}...[truncated, was ${s.length}b]`; +} + +/** + * Build a compact representation of a chunk, preserving all metadata but + * truncating large content fields. + */ +function truncateChunk(chunk: LettaStreamingResponse): Record { + const raw = chunk as Record; + const type = raw.message_type as string | undefined; + + switch (type) { + case "reasoning_message": + return { + ...raw, + reasoning: truncateStr(raw.reasoning, CONTENT_TRUNCATE_LEN), + }; + + case "assistant_message": + return { + ...raw, + content: truncateStr(raw.content, CONTENT_TRUNCATE_LEN), + }; + + case "tool_call_message": { + // Truncate arguments inside tool_call / tool_calls + const truncateToolCall = (tc: Record) => ({ + ...tc, + arguments: truncateStr(tc.arguments, CONTENT_TRUNCATE_LEN), + }); + + const result: Record = { ...raw }; + + if (raw.tool_call && typeof raw.tool_call === "object") { + result.tool_call = truncateToolCall( + raw.tool_call as Record, + ); + } + if (Array.isArray(raw.tool_calls)) { + result.tool_calls = (raw.tool_calls as Record[]).map( + truncateToolCall, + ); + } + return result; + } + + case "tool_return_message": + return { + ...raw, + tool_return: truncateStr(raw.tool_return, CONTENT_TRUNCATE_LEN), + }; + + // Small/important chunk types -- keep as-is + case "ping": + case "error_message": + case "stop_reason": + case "usage_statistics": + return raw; + + // Unknown types -- shallow copy, truncate any "content" field + default: { + const result = { ...raw }; + if (typeof result.content === "string" || Array.isArray(result.content)) { + result.content = truncateStr(result.content, CONTENT_TRUNCATE_LEN); + } + return result; + } + } +} + +// --------------------------------------------------------------------------- +// ChunkLog class +// --------------------------------------------------------------------------- + +class ChunkLog { + private buffer: Record[] = []; + private dirty = false; + private logPath: string | null = null; + private agentDir: string | null = null; + private dirCreated = false; + + /** + * Initialize the chunk log for a specific agent + session. + * Must be called before append/flush will write to disk. + * Clears in-memory buffer and garbage-collects old session files. + */ + init(agentId: string, sessionId: string): void { + this.agentDir = join(LOG_BASE_DIR, agentId); + this.logPath = join(this.agentDir, `${sessionId}.jsonl`); + this.buffer = []; + this.dirty = false; + this.dirCreated = false; + + // GC old session files for this agent (keep last N) + this.pruneOldSessions(); + } + + /** + * Append a chunk to the in-memory log. Does NOT write to disk. + * Call flush() after a stream completes to persist. + */ + append(chunk: LettaStreamingResponse): void { + this.buffer.push(truncateChunk(chunk)); + + // Drop oldest entry if over capacity + if (this.buffer.length > MAX_ENTRIES) { + this.buffer.shift(); + } + + this.dirty = true; + } + + /** + * Flush buffered entries to disk. Call once per stream drain, not per chunk. + */ + flush(): void { + if (this.dirty && this.logPath) { + this.writeToDisk(); + this.dirty = false; + } + } + + /** + * Get all entries as an array of objects (for sending in feedback payload). + */ + getEntries(): Record[] { + return this.buffer; + } + + /** + * Number of entries currently in the log. + */ + get size(): number { + return this.buffer.length; + } + + // ----------------------------------------------------------------------- + // Disk I/O + // ----------------------------------------------------------------------- + + private ensureDir(): void { + if (this.dirCreated || !this.agentDir) return; + try { + if (!existsSync(this.agentDir)) { + mkdirSync(this.agentDir, { recursive: true }); + } + this.dirCreated = true; + } catch (e) { + debugWarn( + "chunkLog", + `Failed to create directory ${this.agentDir}: ${e instanceof Error ? e.message : String(e)}`, + ); + } + } + + private writeToDisk(): void { + if (!this.logPath) return; + this.ensureDir(); + try { + const content = this.buffer + .map((entry) => JSON.stringify(entry)) + .join("\n"); + writeFileSync(this.logPath, `${content}\n`, "utf8"); + } catch (e) { + debugWarn( + "chunkLog", + `Failed to write ${this.logPath}: ${e instanceof Error ? e.message : String(e)}`, + ); + } + } + + /** + * Remove old session log files, keeping only the most recent N. + * + * Session filenames start with Date.now() (e.g. "1740000000000-abc123.jsonl"), + * so lexicographic sort orders them chronologically. + */ + private pruneOldSessions(): void { + if (!this.agentDir) return; + try { + if (!existsSync(this.agentDir)) return; + const files = readdirSync(this.agentDir) + .filter((f) => f.endsWith(".jsonl")) + .sort(); + + // Keep the newest MAX_SESSION_FILES (excluding the current one + // which may not exist on disk yet) + if (files.length >= MAX_SESSION_FILES) { + const toDelete = files.slice(0, files.length - MAX_SESSION_FILES + 1); + for (const file of toDelete) { + try { + unlinkSync(join(this.agentDir, file)); + } catch (e) { + debugWarn( + "chunkLog", + `Failed to delete old session log ${file}: ${e instanceof Error ? e.message : String(e)}`, + ); + } + } + } + } catch (e) { + debugWarn( + "chunkLog", + `Failed to prune old sessions: ${e instanceof Error ? e.message : String(e)}`, + ); + } + } +} + +// Singleton instance +export const chunkLog = new ChunkLog(); diff --git a/src/cli/helpers/stream.ts b/src/cli/helpers/stream.ts index b76f506..c209298 100644 --- a/src/cli/helpers/stream.ts +++ b/src/cli/helpers/stream.ts @@ -13,6 +13,7 @@ import { markIncompleteToolsAsCancelled, onChunk, } from "./accumulator"; +import { chunkLog } from "./chunkLog"; import type { ContextTracker } from "./contextTracker"; import type { ErrorInfo } from "./streamProcessor"; import { StreamProcessor } from "./streamProcessor"; @@ -153,6 +154,13 @@ export async function drainStream( const { shouldOutput, errorInfo, updatedApproval } = streamProcessor.processChunk(chunk); + // Log chunk for feedback diagnostics + try { + chunkLog.append(chunk); + } catch { + // Silently ignore -- diagnostics should not break streaming + } + // Check abort signal before processing - don't add data after interrupt if (abortSignal?.aborted) { stopReason = "cancelled"; @@ -227,6 +235,13 @@ export async function drainStream( markIncompleteToolsAsCancelled(buffers, true, "stream_error"); queueMicrotask(refresh); } finally { + // Persist chunk log to disk (one write per stream, not per chunk) + try { + chunkLog.flush(); + } catch { + // Silently ignore -- diagnostics should not break streaming + } + // Clean up abort listener if (abortSignal) { abortSignal.removeEventListener("abort", abortHandler);