diff --git a/package-lock.json b/package-lock.json index 577eca7..e73d8d8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,7 +12,7 @@ "@clack/prompts": "^0.11.0", "@hapi/boom": "^10.0.1", "@letta-ai/letta-client": "^1.7.8", - "@letta-ai/letta-code-sdk": "^0.1.8", + "@letta-ai/letta-code-sdk": "^0.1.6", "@types/express": "^5.0.6", "@types/node": "^25.0.10", "@types/node-schedule": "^2.1.8", diff --git a/src/core/bot.ts b/src/core/bot.ts index a8e0c78..ecea6be 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -2233,8 +2233,9 @@ export class LettaBot implements AgentSession { async sendToAgent( text: string, - _context?: TriggerContext + context?: TriggerContext ): Promise<string> { + const isSilent = context?.outputMode === 'silent'; const convKey = this.resolveHeartbeatConversationKey(); const acquired = await this.acquireLock(convKey); @@ -2278,6 +2279,9 @@ export class LettaBot implements AgentSession { break; } } + if (isSilent && response.trim()) { + log.info(`Silent mode: collected ${response.length} chars (not delivered)`); + } return response; } catch (error) { // Invalidate on stream errors so next call gets a fresh subprocess @@ -2295,7 +2299,7 @@ export class LettaBot implements AgentSession { */ async *streamToAgent( text: string, - _context?: TriggerContext + context?: TriggerContext ): AsyncGenerator { const convKey = this.resolveHeartbeatConversationKey(); const acquired = await this.acquireLock(convKey); diff --git a/src/core/sdk-session-contract.test.ts b/src/core/sdk-session-contract.test.ts index 83a244c..b7c5d16 100644 --- a/src/core/sdk-session-contract.test.ts +++ b/src/core/sdk-session-contract.test.ts @@ -559,4 +559,69 @@ describe('SDK session contract', () => { }) ); }); + + it('does not leak stale stream events between consecutive sendToAgent calls', async () => { + // Simulates the real SDK behavior prior to 0.1.8: the shared streamQueue + // retains events that arrive after the result message. 
When the next + // stream() call starts, it reads these stale events first, causing the + // N-1 desync and silent-mode heartbeat leak. + const sharedQueue: Array<{ type: string; content?: string; success?: boolean }> = []; + let sendCount = 0; + + const mockSession = { + initialize: vi.fn(async () => undefined), + send: vi.fn(async () => { + // SDK 0.1.8 fix: clear stale events from previous run on every send(). + // Without this line, stale events from run A leak into run B's stream. + sharedQueue.length = 0; + + if (sendCount === 0) { + // First run: response A, result, then trailing stale events that + // arrive in the background pump AFTER the result has been yielded. + sharedQueue.push( + { type: 'assistant', content: 'response-A' }, + { type: 'result', success: true }, + // Stale event that would leak into next stream() without the fix: + { type: 'assistant', content: 'stale-heartbeat-text' }, + ); + } else { + // Second run: response B + sharedQueue.push( + { type: 'assistant', content: 'response-B' }, + { type: 'result', success: true }, + ); + } + sendCount++; + }), + stream: vi.fn(() => + (async function* () { + while (sharedQueue.length > 0) { + const msg = sharedQueue.shift()!; + yield msg; + if (msg.type === 'result') break; + } + })() + ), + close: vi.fn(() => undefined), + agentId: 'agent-queue-leak-test', + conversationId: 'conversation-queue-leak-test', + }; + + vi.mocked(createSession).mockReturnValue(mockSession as never); + vi.mocked(resumeSession).mockReturnValue(mockSession as never); + + const bot = new LettaBot({ + workingDir: join(dataDir, 'working'), + allowedTools: [], + }); + + const responseA = await bot.sendToAgent('first message'); + expect(responseA).toBe('response-A'); + + const responseB = await bot.sendToAgent('second message'); + // Before the SDK 0.1.8 fix, responseB would be 'stale-heartbeat-text' + // because the sharedQueue still had the trailing event from run A. 
+ // With the fix (queue cleared on send), responseB is 'response-B'. + expect(responseB).toBe('response-B'); + }); });