From c88621574aebe75976ff66ea5ae1504709dae9e2 Mon Sep 17 00:00:00 2001 From: Cameron Date: Sat, 7 Feb 2026 14:43:01 -0800 Subject: [PATCH] fix: remove StreamWatchdog that kills long-running agent operations (#204) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The watchdog aborted streams after idle timeouts, which breaks legitimate subagent operations. The SDK stream should throw on connection failures. Written by Cameron ◯ Letta Code "The stream will end when it's ready." - a patient engineer --- src/core/bot.ts | 66 +++------- src/core/stream-watchdog.test.ts | 204 ------------------------------- src/core/stream-watchdog.ts | 106 ---------------- 3 files changed, 14 insertions(+), 362 deletions(-) delete mode 100644 src/core/stream-watchdog.test.ts delete mode 100644 src/core/stream-watchdog.ts diff --git a/src/core/bot.ts b/src/core/bot.ts index bd4b773..01d354d 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -14,7 +14,7 @@ import { installSkillsToAgent } from '../skills/loader.js'; import { formatMessageEnvelope, type SessionContextOptions } from './formatter.js'; import { loadMemoryBlocks } from './memory.js'; import { SYSTEM_PROMPT } from './system-prompt.js'; -import { StreamWatchdog } from './stream-watchdog.js'; + /** * Detect if an error is a 409 CONFLICT from an orphaned approval. 
@@ -426,21 +426,6 @@ export class LettaBot { let receivedAnyData = false; // Track if we got ANY stream data const msgTypeCounts: Record<string, number> = {}; - // Stream watchdog - abort if idle for too long - const watchdog = new StreamWatchdog({ - onAbort: () => { - session.abort().catch((err) => { - console.error('[Bot] Stream abort failed:', err); - }); - try { - session.close(); - } catch (err) { - console.error('[Bot] Stream close failed:', err); - } - }, - }); - watchdog.start(); - // Helper to finalize and send current accumulated response const finalizeMessage = async () => { // Check for silent marker - agent chose not to reply @@ -490,7 +475,6 @@ export class LettaBot { if (toolCallId) seenToolCallIds.add(toolCallId); } const msgUuid = (streamMsg as any).uuid; - watchdog.ping(); receivedAnyData = true; msgTypeCounts[streamMsg.type] = (msgTypeCounts[streamMsg.type] || 0) + 1; @@ -578,7 +562,6 @@ export class LettaBot { console.log('[Bot] Empty result - attempting orphaned approval recovery...'); session.close(); clearInterval(typingInterval); - watchdog.stop(); const convResult = await recoverOrphanedConversationApproval( this.store.agentId, this.store.conversationId @@ -617,7 +600,6 @@ export class LettaBot { } } finally { - watchdog.stop(); clearInterval(typingInterval); } @@ -819,40 +801,20 @@ export class LettaBot { } let response = ''; - const watchdog = new StreamWatchdog({ - onAbort: () => { - console.warn('[Bot] sendToAgent stream idle timeout, aborting session...'); - session.abort().catch((err) => { - console.error('[Bot] sendToAgent abort failed:', err); - }); - try { - session.close(); - } catch (err) { - console.error('[Bot] sendToAgent close failed:', err); - } - }, - }); - watchdog.start(); - - try { - for await (const msg of session.stream()) { - watchdog.ping(); - if (msg.type === 'assistant') { - response += msg.content; - } - - if (msg.type === 'result') { - if (session.agentId && session.agentId !== this.store.agentId) { - const currentBaseUrl = 
process.env.LETTA_BASE_URL || 'https://api.letta.com'; - this.store.setAgent(session.agentId, currentBaseUrl, session.conversationId || undefined); - } else if (session.conversationId && session.conversationId !== this.store.conversationId) { - this.store.conversationId = session.conversationId; - } - break; - } + for await (const msg of session.stream()) { + if (msg.type === 'assistant') { + response += msg.content; + } + + if (msg.type === 'result') { + if (session.agentId && session.agentId !== this.store.agentId) { + const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com'; + this.store.setAgent(session.agentId, currentBaseUrl, session.conversationId || undefined); + } else if (session.conversationId && session.conversationId !== this.store.conversationId) { + this.store.conversationId = session.conversationId; + } + break; } - } finally { - watchdog.stop(); } return response; diff --git a/src/core/stream-watchdog.test.ts b/src/core/stream-watchdog.test.ts deleted file mode 100644 index a33c8d5..0000000 --- a/src/core/stream-watchdog.test.ts +++ /dev/null @@ -1,204 +0,0 @@ -import { describe, expect, it, vi, beforeEach, afterEach } from 'vitest'; -import { StreamWatchdog } from './stream-watchdog.js'; - -describe('StreamWatchdog', () => { - beforeEach(() => { - vi.useFakeTimers(); - // Clear env var before each test - delete process.env.LETTA_STREAM_IDLE_TIMEOUT_MS; - }); - - afterEach(() => { - vi.useRealTimers(); - }); - - describe('default behavior', () => { - it('uses 120s default idle timeout', () => { - const onAbort = vi.fn(); - const watchdog = new StreamWatchdog({ onAbort }); - watchdog.start(); - - // Should not abort before 120s - vi.advanceTimersByTime(119000); - expect(onAbort).not.toHaveBeenCalled(); - expect(watchdog.isAborted).toBe(false); - - // Should abort at 120s - vi.advanceTimersByTime(1000); - expect(onAbort).toHaveBeenCalledTimes(1); - expect(watchdog.isAborted).toBe(true); - - watchdog.stop(); - }); - - it('ping() 
resets the idle timer', () => { - const onAbort = vi.fn(); - const watchdog = new StreamWatchdog({ onAbort }); - watchdog.start(); - - // Advance 100s, then ping - vi.advanceTimersByTime(100000); - watchdog.ping(); - - // Advance another 100s - should not abort (only 100s since ping) - vi.advanceTimersByTime(100000); - expect(onAbort).not.toHaveBeenCalled(); - - // Advance 20 more seconds - now 120s since last ping - vi.advanceTimersByTime(20000); - expect(onAbort).toHaveBeenCalledTimes(1); - - watchdog.stop(); - }); - - it('stop() prevents abort callback', () => { - const onAbort = vi.fn(); - const watchdog = new StreamWatchdog({ onAbort }); - watchdog.start(); - - vi.advanceTimersByTime(25000); - watchdog.stop(); - - // Even after full timeout, should not call abort - vi.advanceTimersByTime(10000); - expect(onAbort).not.toHaveBeenCalled(); - }); - }); - - describe('custom options', () => { - it('respects custom idleTimeoutMs', () => { - const onAbort = vi.fn(); - const watchdog = new StreamWatchdog({ onAbort, idleTimeoutMs: 5000 }); - watchdog.start(); - - vi.advanceTimersByTime(4000); - expect(onAbort).not.toHaveBeenCalled(); - - vi.advanceTimersByTime(1000); - expect(onAbort).toHaveBeenCalledTimes(1); - - watchdog.stop(); - }); - }); - - describe('environment variable override', () => { - it('uses LETTA_STREAM_IDLE_TIMEOUT_MS when set', () => { - process.env.LETTA_STREAM_IDLE_TIMEOUT_MS = '10000'; - - const onAbort = vi.fn(); - const watchdog = new StreamWatchdog({ onAbort }); - watchdog.start(); - - vi.advanceTimersByTime(9000); - expect(onAbort).not.toHaveBeenCalled(); - - vi.advanceTimersByTime(1000); - expect(onAbort).toHaveBeenCalledTimes(1); - - watchdog.stop(); - }); - - it('env var takes precedence over options', () => { - process.env.LETTA_STREAM_IDLE_TIMEOUT_MS = '5000'; - - const onAbort = vi.fn(); - // Option says 60s, but env says 5s - const watchdog = new StreamWatchdog({ onAbort, idleTimeoutMs: 60000 }); - watchdog.start(); - - 
vi.advanceTimersByTime(5000); - expect(onAbort).toHaveBeenCalledTimes(1); - - watchdog.stop(); - }); - - it('ignores invalid env var values', () => { - process.env.LETTA_STREAM_IDLE_TIMEOUT_MS = 'invalid'; - - const onAbort = vi.fn(); - const watchdog = new StreamWatchdog({ onAbort, idleTimeoutMs: 5000 }); - watchdog.start(); - - // Should use option value (5s) since env is invalid - vi.advanceTimersByTime(5000); - expect(onAbort).toHaveBeenCalledTimes(1); - - watchdog.stop(); - }); - - it('ignores zero env var value', () => { - process.env.LETTA_STREAM_IDLE_TIMEOUT_MS = '0'; - - const onAbort = vi.fn(); - const watchdog = new StreamWatchdog({ onAbort, idleTimeoutMs: 5000 }); - watchdog.start(); - - // Should use option value (5s) since env is 0 - vi.advanceTimersByTime(5000); - expect(onAbort).toHaveBeenCalledTimes(1); - - watchdog.stop(); - }); - }); - - describe('logging', () => { - it('logs waiting message at logIntervalMs when idle', () => { - const consoleSpy = vi.spyOn(console, 'log').mockImplementation(() => {}); - - const watchdog = new StreamWatchdog({ logIntervalMs: 1000 }); - watchdog.start(); - - // First interval - 1s idle - vi.advanceTimersByTime(1000); - expect(consoleSpy).toHaveBeenCalledWith( - '[Bot] Stream waiting', - expect.objectContaining({ idleMs: expect.any(Number) }) - ); - - consoleSpy.mockRestore(); - watchdog.stop(); - }); - }); - - describe('edge cases', () => { - it('can be stopped before start', () => { - const watchdog = new StreamWatchdog({}); - expect(() => watchdog.stop()).not.toThrow(); - }); - - it('multiple pings work correctly', () => { - const onAbort = vi.fn(); - const watchdog = new StreamWatchdog({ onAbort, idleTimeoutMs: 1000 }); - watchdog.start(); - - // Rapid pings should keep resetting - for (let i = 0; i < 10; i++) { - vi.advanceTimersByTime(500); - watchdog.ping(); - } - - expect(onAbort).not.toHaveBeenCalled(); - - // Now let it timeout - vi.advanceTimersByTime(1000); - expect(onAbort).toHaveBeenCalledTimes(1); - 
- watchdog.stop(); - }); - - it('abort callback only fires once', () => { - const onAbort = vi.fn(); - const watchdog = new StreamWatchdog({ onAbort, idleTimeoutMs: 1000 }); - watchdog.start(); - - vi.advanceTimersByTime(1000); - expect(onAbort).toHaveBeenCalledTimes(1); - - // Even if we wait more, should not fire again - vi.advanceTimersByTime(5000); - expect(onAbort).toHaveBeenCalledTimes(1); - - watchdog.stop(); - }); - }); -}); diff --git a/src/core/stream-watchdog.ts b/src/core/stream-watchdog.ts deleted file mode 100644 index 043131d..0000000 --- a/src/core/stream-watchdog.ts +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Stream Watchdog - * - * Monitors streaming responses for idle timeouts and provides - * periodic logging when the stream is waiting. - */ - -export interface StreamWatchdogOptions { - /** Idle timeout in milliseconds. Default: 30000 (30s) */ - idleTimeoutMs?: number; - /** Log interval when idle. Default: 10000 (10s) */ - logIntervalMs?: number; - /** Called when idle timeout triggers abort */ - onAbort?: () => void; -} - -export class StreamWatchdog { - private idleTimer: NodeJS.Timeout | null = null; - private logTimer: NodeJS.Timeout | null = null; - private _aborted = false; - private startTime = 0; - private lastActivity = 0; - - private readonly idleTimeoutMs: number; - private readonly logIntervalMs: number; - private readonly onAbort?: () => void; - - constructor(options: StreamWatchdogOptions = {}) { - // Allow env override, then option, then default - const envTimeout = Number(process.env.LETTA_STREAM_IDLE_TIMEOUT_MS); - this.idleTimeoutMs = Number.isFinite(envTimeout) && envTimeout > 0 - ? envTimeout - : (options.idleTimeoutMs ?? 120000); - this.logIntervalMs = options.logIntervalMs ?? 
10000; - this.onAbort = options.onAbort; - } - - /** - * Start watching the stream - */ - start(): void { - this.startTime = Date.now(); - this.lastActivity = this.startTime; - this._aborted = false; - - this.resetIdleTimer(); - - // Periodic logging when idle - this.logTimer = setInterval(() => { - const now = Date.now(); - const idleMs = now - this.lastActivity; - if (idleMs >= this.logIntervalMs) { - console.log('[Bot] Stream waiting', { - elapsedMs: now - this.startTime, - idleMs, - }); - } - }, this.logIntervalMs); - } - - /** - * Call on each stream chunk to reset the idle timer - */ - ping(): void { - this.lastActivity = Date.now(); - this.resetIdleTimer(); - } - - /** - * Stop watching and cleanup all timers - */ - stop(): void { - if (this.idleTimer) { - clearTimeout(this.idleTimer); - this.idleTimer = null; - } - if (this.logTimer) { - clearInterval(this.logTimer); - this.logTimer = null; - } - } - - /** - * Whether the watchdog triggered an abort - */ - get isAborted(): boolean { - return this._aborted; - } - - private resetIdleTimer(): void { - if (this.idleTimer) { - clearTimeout(this.idleTimer); - } - - this.idleTimer = setTimeout(() => { - if (this._aborted) return; - this._aborted = true; - - console.warn(`[Bot] Stream idle timeout after ${this.idleTimeoutMs}ms, aborting...`); - - if (this.onAbort) { - this.onAbort(); - } - }, this.idleTimeoutMs); - } -}