fix: resolve stream queue contamination causing N-1 desync and silent mode leak (#411)
This commit is contained in:
2
package-lock.json
generated
2
package-lock.json
generated
@@ -12,7 +12,7 @@
|
||||
"@clack/prompts": "^0.11.0",
|
||||
"@hapi/boom": "^10.0.1",
|
||||
"@letta-ai/letta-client": "^1.7.8",
|
||||
"@letta-ai/letta-code-sdk": "^0.1.8",
|
||||
"@letta-ai/letta-code-sdk": "^0.1.6",
|
||||
"@types/express": "^5.0.6",
|
||||
"@types/node": "^25.0.10",
|
||||
"@types/node-schedule": "^2.1.8",
|
||||
|
||||
@@ -2233,8 +2233,9 @@ export class LettaBot implements AgentSession {
|
||||
|
||||
async sendToAgent(
|
||||
text: string,
|
||||
_context?: TriggerContext
|
||||
context?: TriggerContext
|
||||
): Promise<string> {
|
||||
const isSilent = context?.outputMode === 'silent';
|
||||
const convKey = this.resolveHeartbeatConversationKey();
|
||||
const acquired = await this.acquireLock(convKey);
|
||||
|
||||
@@ -2278,6 +2279,9 @@ export class LettaBot implements AgentSession {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (isSilent && response.trim()) {
|
||||
log.info(`Silent mode: collected ${response.length} chars (not delivered)`);
|
||||
}
|
||||
return response;
|
||||
} catch (error) {
|
||||
// Invalidate on stream errors so next call gets a fresh subprocess
|
||||
@@ -2295,7 +2299,7 @@ export class LettaBot implements AgentSession {
|
||||
*/
|
||||
async *streamToAgent(
|
||||
text: string,
|
||||
_context?: TriggerContext
|
||||
context?: TriggerContext
|
||||
): AsyncGenerator<StreamMsg> {
|
||||
const convKey = this.resolveHeartbeatConversationKey();
|
||||
const acquired = await this.acquireLock(convKey);
|
||||
|
||||
@@ -559,4 +559,69 @@ describe('SDK session contract', () => {
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it('does not leak stale stream events between consecutive sendToAgent calls', async () => {
  // Contract under test: events left over from one run must never be read by
  // the next run's stream(). The mock below models an SDK session whose
  // send() and stream() share a single queue. According to the scenario this
  // test documents, SDK versions before 0.1.8 kept events that arrived after
  // the result message, which produced the N-1 desync and the silent-mode
  // heartbeat leak.
  type QueuedEvent = { type: string; content?: string; success?: boolean };

  const pendingEvents: QueuedEvent[] = [];
  let runsStarted = 0;

  // Yields queued events in FIFO order and stops right after a 'result'
  // event, leaving anything queued behind it untouched.
  async function* drainUntilResult() {
    for (
      let next = pendingEvents.shift();
      next !== undefined;
      next = pendingEvents.shift()
    ) {
      yield next;
      if (next.type === 'result') {
        return;
      }
    }
  }

  const mockSession = {
    initialize: vi.fn(async () => undefined),
    send: vi.fn(async () => {
      // Mirrors the SDK 0.1.8 behavior: every send() starts from an empty
      // queue. Remove this reset and run A's trailing event is still queued
      // when run B starts streaming.
      pendingEvents.length = 0;

      const isFirstRun = runsStarted === 0;
      const scriptedEvents: QueuedEvent[] = isFirstRun
        ? [
            // Run A: the reply, its result, and one trailing event that the
            // background pump delivers AFTER the result was already yielded.
            { type: 'assistant', content: 'response-A' },
            { type: 'result', success: true },
            { type: 'assistant', content: 'stale-heartbeat-text' },
          ]
        : [
            // Run B: just the reply and its result.
            { type: 'assistant', content: 'response-B' },
            { type: 'result', success: true },
          ];

      pendingEvents.push(...scriptedEvents);
      runsStarted += 1;
    }),
    stream: vi.fn(() => drainUntilResult()),
    close: vi.fn(() => undefined),
    agentId: 'agent-queue-leak-test',
    conversationId: 'conversation-queue-leak-test',
  };

  // Both session factories hand back the same mock so the two calls below
  // talk to the same shared queue.
  vi.mocked(createSession).mockReturnValue(mockSession as never);
  vi.mocked(resumeSession).mockReturnValue(mockSession as never);

  const bot = new LettaBot({
    workingDir: join(dataDir, 'working'),
    allowedTools: [],
  });

  const responseA = await bot.sendToAgent('first message');
  expect(responseA).toBe('response-A');

  // If the queue were not cleared on send(), the leftover
  // 'stale-heartbeat-text' event from run A would be the first thing run B's
  // stream reads. With the reset in place, run B sees only its own events.
  const responseB = await bot.sendToAgent('second message');
  expect(responseB).toBe('response-B');
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user