fix: remove StreamWatchdog that kills long-running agent operations (#204)
The watchdog aborted streams after idle timeouts, which breaks legitimate subagent operations. The SDK stream should throw on connection failures. Written by Cameron ◯ Letta Code "The stream will end when it's ready." - a patient engineer
This commit is contained in:
@@ -14,7 +14,7 @@ import { installSkillsToAgent } from '../skills/loader.js';
|
||||
import { formatMessageEnvelope, type SessionContextOptions } from './formatter.js';
|
||||
import { loadMemoryBlocks } from './memory.js';
|
||||
import { SYSTEM_PROMPT } from './system-prompt.js';
|
||||
import { StreamWatchdog } from './stream-watchdog.js';
|
||||
|
||||
|
||||
/**
|
||||
* Detect if an error is a 409 CONFLICT from an orphaned approval.
|
||||
@@ -426,21 +426,6 @@ export class LettaBot {
|
||||
let receivedAnyData = false; // Track if we got ANY stream data
|
||||
const msgTypeCounts: Record<string, number> = {};
|
||||
|
||||
// Stream watchdog - abort if idle for too long
|
||||
const watchdog = new StreamWatchdog({
|
||||
onAbort: () => {
|
||||
session.abort().catch((err) => {
|
||||
console.error('[Bot] Stream abort failed:', err);
|
||||
});
|
||||
try {
|
||||
session.close();
|
||||
} catch (err) {
|
||||
console.error('[Bot] Stream close failed:', err);
|
||||
}
|
||||
},
|
||||
});
|
||||
watchdog.start();
|
||||
|
||||
// Helper to finalize and send current accumulated response
|
||||
const finalizeMessage = async () => {
|
||||
// Check for silent marker - agent chose not to reply
|
||||
@@ -490,7 +475,6 @@ export class LettaBot {
|
||||
if (toolCallId) seenToolCallIds.add(toolCallId);
|
||||
}
|
||||
const msgUuid = (streamMsg as any).uuid;
|
||||
watchdog.ping();
|
||||
receivedAnyData = true;
|
||||
msgTypeCounts[streamMsg.type] = (msgTypeCounts[streamMsg.type] || 0) + 1;
|
||||
|
||||
@@ -578,7 +562,6 @@ export class LettaBot {
|
||||
console.log('[Bot] Empty result - attempting orphaned approval recovery...');
|
||||
session.close();
|
||||
clearInterval(typingInterval);
|
||||
watchdog.stop();
|
||||
const convResult = await recoverOrphanedConversationApproval(
|
||||
this.store.agentId,
|
||||
this.store.conversationId
|
||||
@@ -617,7 +600,6 @@ export class LettaBot {
|
||||
|
||||
}
|
||||
} finally {
|
||||
watchdog.stop();
|
||||
clearInterval(typingInterval);
|
||||
}
|
||||
|
||||
@@ -819,40 +801,20 @@ export class LettaBot {
|
||||
}
|
||||
|
||||
let response = '';
|
||||
const watchdog = new StreamWatchdog({
|
||||
onAbort: () => {
|
||||
console.warn('[Bot] sendToAgent stream idle timeout, aborting session...');
|
||||
session.abort().catch((err) => {
|
||||
console.error('[Bot] sendToAgent abort failed:', err);
|
||||
});
|
||||
try {
|
||||
session.close();
|
||||
} catch (err) {
|
||||
console.error('[Bot] sendToAgent close failed:', err);
|
||||
}
|
||||
},
|
||||
});
|
||||
watchdog.start();
|
||||
|
||||
try {
|
||||
for await (const msg of session.stream()) {
|
||||
watchdog.ping();
|
||||
if (msg.type === 'assistant') {
|
||||
response += msg.content;
|
||||
}
|
||||
|
||||
if (msg.type === 'result') {
|
||||
if (session.agentId && session.agentId !== this.store.agentId) {
|
||||
const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com';
|
||||
this.store.setAgent(session.agentId, currentBaseUrl, session.conversationId || undefined);
|
||||
} else if (session.conversationId && session.conversationId !== this.store.conversationId) {
|
||||
this.store.conversationId = session.conversationId;
|
||||
}
|
||||
break;
|
||||
}
|
||||
for await (const msg of session.stream()) {
|
||||
if (msg.type === 'assistant') {
|
||||
response += msg.content;
|
||||
}
|
||||
|
||||
if (msg.type === 'result') {
|
||||
if (session.agentId && session.agentId !== this.store.agentId) {
|
||||
const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com';
|
||||
this.store.setAgent(session.agentId, currentBaseUrl, session.conversationId || undefined);
|
||||
} else if (session.conversationId && session.conversationId !== this.store.conversationId) {
|
||||
this.store.conversationId = session.conversationId;
|
||||
}
|
||||
break;
|
||||
}
|
||||
} finally {
|
||||
watchdog.stop();
|
||||
}
|
||||
|
||||
return response;
|
||||
|
||||
@@ -1,204 +0,0 @@
|
||||
import { describe, expect, it, vi, beforeEach, afterEach } from 'vitest';
|
||||
import { StreamWatchdog } from './stream-watchdog.js';
|
||||
|
||||
describe('StreamWatchdog', () => {
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
// Clear env var before each test
|
||||
delete process.env.LETTA_STREAM_IDLE_TIMEOUT_MS;
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
describe('default behavior', () => {
|
||||
it('uses 120s default idle timeout', () => {
|
||||
const onAbort = vi.fn();
|
||||
const watchdog = new StreamWatchdog({ onAbort });
|
||||
watchdog.start();
|
||||
|
||||
// Should not abort before 120s
|
||||
vi.advanceTimersByTime(119000);
|
||||
expect(onAbort).not.toHaveBeenCalled();
|
||||
expect(watchdog.isAborted).toBe(false);
|
||||
|
||||
// Should abort at 120s
|
||||
vi.advanceTimersByTime(1000);
|
||||
expect(onAbort).toHaveBeenCalledTimes(1);
|
||||
expect(watchdog.isAborted).toBe(true);
|
||||
|
||||
watchdog.stop();
|
||||
});
|
||||
|
||||
it('ping() resets the idle timer', () => {
|
||||
const onAbort = vi.fn();
|
||||
const watchdog = new StreamWatchdog({ onAbort });
|
||||
watchdog.start();
|
||||
|
||||
// Advance 100s, then ping
|
||||
vi.advanceTimersByTime(100000);
|
||||
watchdog.ping();
|
||||
|
||||
// Advance another 100s - should not abort (only 100s since ping)
|
||||
vi.advanceTimersByTime(100000);
|
||||
expect(onAbort).not.toHaveBeenCalled();
|
||||
|
||||
// Advance 20 more seconds - now 120s since last ping
|
||||
vi.advanceTimersByTime(20000);
|
||||
expect(onAbort).toHaveBeenCalledTimes(1);
|
||||
|
||||
watchdog.stop();
|
||||
});
|
||||
|
||||
it('stop() prevents abort callback', () => {
|
||||
const onAbort = vi.fn();
|
||||
const watchdog = new StreamWatchdog({ onAbort });
|
||||
watchdog.start();
|
||||
|
||||
vi.advanceTimersByTime(25000);
|
||||
watchdog.stop();
|
||||
|
||||
// Even after full timeout, should not call abort
|
||||
vi.advanceTimersByTime(10000);
|
||||
expect(onAbort).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('custom options', () => {
|
||||
it('respects custom idleTimeoutMs', () => {
|
||||
const onAbort = vi.fn();
|
||||
const watchdog = new StreamWatchdog({ onAbort, idleTimeoutMs: 5000 });
|
||||
watchdog.start();
|
||||
|
||||
vi.advanceTimersByTime(4000);
|
||||
expect(onAbort).not.toHaveBeenCalled();
|
||||
|
||||
vi.advanceTimersByTime(1000);
|
||||
expect(onAbort).toHaveBeenCalledTimes(1);
|
||||
|
||||
watchdog.stop();
|
||||
});
|
||||
});
|
||||
|
||||
describe('environment variable override', () => {
|
||||
it('uses LETTA_STREAM_IDLE_TIMEOUT_MS when set', () => {
|
||||
process.env.LETTA_STREAM_IDLE_TIMEOUT_MS = '10000';
|
||||
|
||||
const onAbort = vi.fn();
|
||||
const watchdog = new StreamWatchdog({ onAbort });
|
||||
watchdog.start();
|
||||
|
||||
vi.advanceTimersByTime(9000);
|
||||
expect(onAbort).not.toHaveBeenCalled();
|
||||
|
||||
vi.advanceTimersByTime(1000);
|
||||
expect(onAbort).toHaveBeenCalledTimes(1);
|
||||
|
||||
watchdog.stop();
|
||||
});
|
||||
|
||||
it('env var takes precedence over options', () => {
|
||||
process.env.LETTA_STREAM_IDLE_TIMEOUT_MS = '5000';
|
||||
|
||||
const onAbort = vi.fn();
|
||||
// Option says 60s, but env says 5s
|
||||
const watchdog = new StreamWatchdog({ onAbort, idleTimeoutMs: 60000 });
|
||||
watchdog.start();
|
||||
|
||||
vi.advanceTimersByTime(5000);
|
||||
expect(onAbort).toHaveBeenCalledTimes(1);
|
||||
|
||||
watchdog.stop();
|
||||
});
|
||||
|
||||
it('ignores invalid env var values', () => {
|
||||
process.env.LETTA_STREAM_IDLE_TIMEOUT_MS = 'invalid';
|
||||
|
||||
const onAbort = vi.fn();
|
||||
const watchdog = new StreamWatchdog({ onAbort, idleTimeoutMs: 5000 });
|
||||
watchdog.start();
|
||||
|
||||
// Should use option value (5s) since env is invalid
|
||||
vi.advanceTimersByTime(5000);
|
||||
expect(onAbort).toHaveBeenCalledTimes(1);
|
||||
|
||||
watchdog.stop();
|
||||
});
|
||||
|
||||
it('ignores zero env var value', () => {
|
||||
process.env.LETTA_STREAM_IDLE_TIMEOUT_MS = '0';
|
||||
|
||||
const onAbort = vi.fn();
|
||||
const watchdog = new StreamWatchdog({ onAbort, idleTimeoutMs: 5000 });
|
||||
watchdog.start();
|
||||
|
||||
// Should use option value (5s) since env is 0
|
||||
vi.advanceTimersByTime(5000);
|
||||
expect(onAbort).toHaveBeenCalledTimes(1);
|
||||
|
||||
watchdog.stop();
|
||||
});
|
||||
});
|
||||
|
||||
describe('logging', () => {
|
||||
it('logs waiting message at logIntervalMs when idle', () => {
|
||||
const consoleSpy = vi.spyOn(console, 'log').mockImplementation(() => {});
|
||||
|
||||
const watchdog = new StreamWatchdog({ logIntervalMs: 1000 });
|
||||
watchdog.start();
|
||||
|
||||
// First interval - 1s idle
|
||||
vi.advanceTimersByTime(1000);
|
||||
expect(consoleSpy).toHaveBeenCalledWith(
|
||||
'[Bot] Stream waiting',
|
||||
expect.objectContaining({ idleMs: expect.any(Number) })
|
||||
);
|
||||
|
||||
consoleSpy.mockRestore();
|
||||
watchdog.stop();
|
||||
});
|
||||
});
|
||||
|
||||
describe('edge cases', () => {
|
||||
it('can be stopped before start', () => {
|
||||
const watchdog = new StreamWatchdog({});
|
||||
expect(() => watchdog.stop()).not.toThrow();
|
||||
});
|
||||
|
||||
it('multiple pings work correctly', () => {
|
||||
const onAbort = vi.fn();
|
||||
const watchdog = new StreamWatchdog({ onAbort, idleTimeoutMs: 1000 });
|
||||
watchdog.start();
|
||||
|
||||
// Rapid pings should keep resetting
|
||||
for (let i = 0; i < 10; i++) {
|
||||
vi.advanceTimersByTime(500);
|
||||
watchdog.ping();
|
||||
}
|
||||
|
||||
expect(onAbort).not.toHaveBeenCalled();
|
||||
|
||||
// Now let it timeout
|
||||
vi.advanceTimersByTime(1000);
|
||||
expect(onAbort).toHaveBeenCalledTimes(1);
|
||||
|
||||
watchdog.stop();
|
||||
});
|
||||
|
||||
it('abort callback only fires once', () => {
|
||||
const onAbort = vi.fn();
|
||||
const watchdog = new StreamWatchdog({ onAbort, idleTimeoutMs: 1000 });
|
||||
watchdog.start();
|
||||
|
||||
vi.advanceTimersByTime(1000);
|
||||
expect(onAbort).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Even if we wait more, should not fire again
|
||||
vi.advanceTimersByTime(5000);
|
||||
expect(onAbort).toHaveBeenCalledTimes(1);
|
||||
|
||||
watchdog.stop();
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,106 +0,0 @@
|
||||
/**
|
||||
* Stream Watchdog
|
||||
*
|
||||
* Monitors streaming responses for idle timeouts and provides
|
||||
* periodic logging when the stream is waiting.
|
||||
*/
|
||||
|
||||
export interface StreamWatchdogOptions {
|
||||
/** Idle timeout in milliseconds. Default: 30000 (30s) */
|
||||
idleTimeoutMs?: number;
|
||||
/** Log interval when idle. Default: 10000 (10s) */
|
||||
logIntervalMs?: number;
|
||||
/** Called when idle timeout triggers abort */
|
||||
onAbort?: () => void;
|
||||
}
|
||||
|
||||
export class StreamWatchdog {
|
||||
private idleTimer: NodeJS.Timeout | null = null;
|
||||
private logTimer: NodeJS.Timeout | null = null;
|
||||
private _aborted = false;
|
||||
private startTime = 0;
|
||||
private lastActivity = 0;
|
||||
|
||||
private readonly idleTimeoutMs: number;
|
||||
private readonly logIntervalMs: number;
|
||||
private readonly onAbort?: () => void;
|
||||
|
||||
constructor(options: StreamWatchdogOptions = {}) {
|
||||
// Allow env override, then option, then default
|
||||
const envTimeout = Number(process.env.LETTA_STREAM_IDLE_TIMEOUT_MS);
|
||||
this.idleTimeoutMs = Number.isFinite(envTimeout) && envTimeout > 0
|
||||
? envTimeout
|
||||
: (options.idleTimeoutMs ?? 120000);
|
||||
this.logIntervalMs = options.logIntervalMs ?? 10000;
|
||||
this.onAbort = options.onAbort;
|
||||
}
|
||||
|
||||
/**
|
||||
* Start watching the stream
|
||||
*/
|
||||
start(): void {
|
||||
this.startTime = Date.now();
|
||||
this.lastActivity = this.startTime;
|
||||
this._aborted = false;
|
||||
|
||||
this.resetIdleTimer();
|
||||
|
||||
// Periodic logging when idle
|
||||
this.logTimer = setInterval(() => {
|
||||
const now = Date.now();
|
||||
const idleMs = now - this.lastActivity;
|
||||
if (idleMs >= this.logIntervalMs) {
|
||||
console.log('[Bot] Stream waiting', {
|
||||
elapsedMs: now - this.startTime,
|
||||
idleMs,
|
||||
});
|
||||
}
|
||||
}, this.logIntervalMs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Call on each stream chunk to reset the idle timer
|
||||
*/
|
||||
ping(): void {
|
||||
this.lastActivity = Date.now();
|
||||
this.resetIdleTimer();
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop watching and cleanup all timers
|
||||
*/
|
||||
stop(): void {
|
||||
if (this.idleTimer) {
|
||||
clearTimeout(this.idleTimer);
|
||||
this.idleTimer = null;
|
||||
}
|
||||
if (this.logTimer) {
|
||||
clearInterval(this.logTimer);
|
||||
this.logTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether the watchdog triggered an abort
|
||||
*/
|
||||
get isAborted(): boolean {
|
||||
return this._aborted;
|
||||
}
|
||||
|
||||
private resetIdleTimer(): void {
|
||||
if (this.idleTimer) {
|
||||
clearTimeout(this.idleTimer);
|
||||
}
|
||||
|
||||
this.idleTimer = setTimeout(() => {
|
||||
if (this._aborted) return;
|
||||
this._aborted = true;
|
||||
|
||||
console.warn(`[Bot] Stream idle timeout after ${this.idleTimeoutMs}ms, aborting...`);
|
||||
|
||||
if (this.onAbort) {
|
||||
this.onAbort();
|
||||
}
|
||||
}, this.idleTimeoutMs);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user