From e80c5b4dd6d3591f2331ac87aff5ee88fc2d1ecb Mon Sep 17 00:00:00 2001 From: Cameron Date: Fri, 6 Mar 2026 11:51:05 -0800 Subject: [PATCH] fix: prevent non-foreground run events leaking into user response stream (#513) --- package-lock.json | 2 +- package.json | 2 +- src/core/bot.ts | 194 ++++++++++++++++++++++------------ src/core/result-guard.test.ts | 142 +++++++++++++++++++++++++ src/core/types.ts | 1 + 5 files changed, 269 insertions(+), 72 deletions(-) diff --git a/package-lock.json b/package-lock.json index 54aa0ff..03c81bf 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,7 +12,7 @@ "@clack/prompts": "^0.11.0", "@hapi/boom": "^10.0.1", "@letta-ai/letta-client": "^1.7.11", - "@letta-ai/letta-code-sdk": "^0.1.10", + "@letta-ai/letta-code-sdk": "^0.1.9", "@types/express": "^5.0.6", "@types/node": "^25.0.10", "@types/node-schedule": "^2.1.8", diff --git a/package.json b/package.json index 87b6a2f..47d8ec1 100644 --- a/package.json +++ b/package.json @@ -67,7 +67,7 @@ "@clack/prompts": "^0.11.0", "@hapi/boom": "^10.0.1", "@letta-ai/letta-client": "^1.7.11", - "@letta-ai/letta-code-sdk": "^0.1.10", + "@letta-ai/letta-code-sdk": "^0.1.9", "@types/express": "^5.0.6", "@types/node": "^25.0.10", "@types/node-schedule": "^2.1.8", diff --git a/src/core/bot.ts b/src/core/bot.ts index 7365f94..05c17a0 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -187,7 +187,7 @@ export function resolveHeartbeatConversationKey( } export class LettaBot implements AgentSession { - readonly store: Store; + private store: Store; private config: BotConfig; private channels: Map = new Map(); private messageQueue: Array<{ msg: InboundMessage; adapter: ChannelAdapter }> = []; @@ -269,6 +269,7 @@ export class LettaBot implements AgentSession { private normalizeResultRunIds(msg: StreamMsg): string[] { const runIds = this.normalizeStreamRunIds(msg); if (runIds.length === 0) return []; + return [...new Set(runIds)].sort(); } @@ -484,14 +485,6 @@ export class LettaBot implements AgentSession { return this.sessionManager.warmSession(); } - /** - * Invalidate the live session for a conversation key. - * The next message will create a fresh session using the current store state. - */ - invalidateSession(key?: string): void { - this.sessionManager.invalidateSession(key); - } - // ========================================================================= // Channel management // ========================================================================= @@ -675,26 +668,6 @@ export class LettaBot implements AgentSession { lines.push('', 'Use `/model ` to switch.'); return lines.join('\n'); } - case 'setconv': { - if (!args?.trim()) { - return 'Usage: /setconv '; - } - const newConvId = args.trim(); - const convKey = channelId ? this.resolveConversationKey(channelId, chatId) : 'shared'; - - if (convKey === 'default') { - return 'Conversations are disabled -- cannot set conversation ID.'; - } - - if (convKey === 'shared') { - this.store.conversationId = newConvId; - } else { - this.store.setConversationId(convKey, newConvId); - } - this.sessionManager.invalidateSession(convKey); - log.info(`/setconv - conversation set to ${newConvId} for key="${convKey}"`); - return `Conversation set to: ${newConvId}`; - } default: return null; } @@ -1022,8 +995,17 @@ export class LettaBot implements AgentSession { let reasoningBuffer = ''; let expectedForegroundRunId: string | null = null; let expectedForegroundRunSource: 'assistant' | 'result' | null = null; - let foregroundRunSwitchCount = 0; + let sawCompetingRunEvent = false; + let sawForegroundResult = false; let filteredRunEventCount = 0; + let ignoredNonForegroundResultCount = 0; + let bufferedDisplayFlushed = false; + let bufferedDisplayFlushCount = 0; + let bufferedDisplayDropCount = 0; + const bufferedDisplayEvents: Array< + | { kind: 'reasoning'; runId: string; content: string } + | { kind: 'tool_call'; runId: string; msg: StreamMsg } + > = []; const msgTypeCounts: Record = {}; const parseAndHandleDirectives = async () => { @@ -1041,6 +1023,72 @@ export class LettaBot implements AgentSession { sentAnyMessage = true; } }; + + const sendReasoningDisplay = async (content: string) => { + if (!this.config.display?.showReasoning || suppressDelivery || !content.trim()) return; + try { + const reasoning = formatReasoningDisplay(content, adapter.id, this.config.display?.reasoningMaxChars); + await adapter.sendMessage({ + chatId: msg.chatId, + text: reasoning.text, + threadId: msg.threadId, + parseMode: reasoning.parseMode, + }); + // Note: display messages don't set sentAnyMessage -- they're informational, + // not a substitute for an assistant response. + } catch (err) { + log.warn('Failed to send reasoning display:', err instanceof Error ? err.message : err); + } + }; + + const sendToolCallDisplay = async (toolMsg: StreamMsg) => { + if (!this.config.display?.showToolCalls || suppressDelivery) return; + try { + const text = formatToolCallDisplay(toolMsg); + await adapter.sendMessage({ chatId: msg.chatId, text, threadId: msg.threadId }); + } catch (err) { + log.warn('Failed to send tool call display:', err instanceof Error ? err.message : err); + } + }; + + const bufferRunScopedDisplayEvent = (runId: string, streamMsg: StreamMsg) => { + if (streamMsg.type === 'reasoning') { + if (!this.config.display?.showReasoning) return; + const chunk = typeof streamMsg.content === 'string' ? streamMsg.content : ''; + if (!chunk) return; + const lastEvent = bufferedDisplayEvents[bufferedDisplayEvents.length - 1]; + if (lastEvent && lastEvent.kind === 'reasoning' && lastEvent.runId === runId) { + lastEvent.content += chunk; + } else { + bufferedDisplayEvents.push({ kind: 'reasoning', runId, content: chunk }); + } + return; + } + + if (streamMsg.type === 'tool_call') { + if (!this.config.display?.showToolCalls) return; + bufferedDisplayEvents.push({ kind: 'tool_call', runId, msg: streamMsg }); + } + }; + + const flushBufferedDisplayEventsForRun = async (runId: string) => { + for (const event of bufferedDisplayEvents) { + if (event.runId !== runId) { + bufferedDisplayDropCount += 1; + continue; + } + if (event.kind === 'reasoning') { + await sendReasoningDisplay(event.content); + bufferedDisplayFlushCount += 1; + continue; + } + + this.sessionManager.syncTodoToolCall(event.msg); + await sendToolCallDisplay(event.msg); + bufferedDisplayFlushCount += 1; + } + bufferedDisplayEvents.length = 0; + }; const finalizeMessage = async () => { // Parse and execute XML directives before sending @@ -1100,44 +1148,41 @@ export class LettaBot implements AgentSession { break; } if (!firstChunkLogged) { lap('first stream chunk'); firstChunkLogged = true; } - const eventRunIds = this.normalizeStreamRunIds(streamMsg); if (expectedForegroundRunId === null && eventRunIds.length > 0) { if (streamMsg.type === 'assistant' || streamMsg.type === 'result') { expectedForegroundRunId = eventRunIds[0]; expectedForegroundRunSource = streamMsg.type === 'assistant' ? 'assistant' : 'result'; log.info(`Selected foreground run for stream delivery (seq=${seq}, key=${convKey}, runId=${expectedForegroundRunId}, source=${streamMsg.type})`); + if (!bufferedDisplayFlushed && bufferedDisplayEvents.length > 0) { + await flushBufferedDisplayEventsForRun(expectedForegroundRunId); + bufferedDisplayFlushed = true; + } } else { // Do not lock to a run based on pre-assistant non-terminal events; // these can belong to a concurrent background run. + const runId = eventRunIds[0]; + if (runId && (streamMsg.type === 'reasoning' || streamMsg.type === 'tool_call')) { + bufferRunScopedDisplayEvent(runId, streamMsg); + filteredRunEventCount++; + log.info(`Buffering run-scoped pre-foreground display event (seq=${seq}, key=${convKey}, type=${streamMsg.type}, runId=${runId})`); + continue; + } filteredRunEventCount++; log.info(`Deferring run-scoped pre-foreground event (seq=${seq}, key=${convKey}, type=${streamMsg.type}, runIds=${eventRunIds.join(',')})`); continue; } } else if (expectedForegroundRunId && eventRunIds.length > 0 && !eventRunIds.includes(expectedForegroundRunId)) { - const canSafelySwitchForeground = !sentAnyMessage || messageId !== null; - if (streamMsg.type === 'result' - && foregroundRunSwitchCount === 0 - && canSafelySwitchForeground) { - const previousRunId = expectedForegroundRunId; - const previousRunSource = expectedForegroundRunSource; - expectedForegroundRunId = eventRunIds[0]; - expectedForegroundRunSource = 'result'; - foregroundRunSwitchCount += 1; - // Drop any state collected from the previous run so it cannot - // flush to user-facing delivery after the switch. - response = ''; - reasoningBuffer = ''; - streamedAssistantText = ''; - lastMsgType = null; - lastAssistantUuid = null; - sawNonAssistantSinceLastUuid = false; - log.warn(`Switching foreground run at result boundary (seq=${seq}, key=${convKey}, from=${previousRunId}, to=${expectedForegroundRunId}, prevSource=${previousRunSource || 'unknown'})`); + // Strict no-rebind policy: once foreground is selected, never switch. + sawCompetingRunEvent = true; + filteredRunEventCount++; + if (streamMsg.type === 'result') { + ignoredNonForegroundResultCount++; + log.warn(`Ignoring non-foreground result event (seq=${seq}, key=${convKey}, runIds=${eventRunIds.join(',')}, expected=${expectedForegroundRunId}, source=${expectedForegroundRunSource || 'unknown'})`); } else { - filteredRunEventCount++; log.info(`Skipping non-foreground stream event (seq=${seq}, key=${convKey}, type=${streamMsg.type}, runIds=${eventRunIds.join(',')}, expected=${expectedForegroundRunId})`); - continue; } + continue; } receivedAnyData = true; @@ -1163,17 +1208,7 @@ export class LettaBot implements AgentSession { // Flush reasoning buffer when type changes away from reasoning if (isSemanticType && lastMsgType === 'reasoning' && streamMsg.type !== 'reasoning' && reasoningBuffer.trim()) { log.info(`Reasoning: ${reasoningBuffer.trim()}`); - if (this.config.display?.showReasoning && !suppressDelivery) { - try { - const reasoning = formatReasoningDisplay(reasoningBuffer, adapter.id, this.config.display?.reasoningMaxChars); - await adapter.sendMessage({ chatId: msg.chatId, text: reasoning.text, threadId: msg.threadId, parseMode: reasoning.parseMode }); - // Note: display messages don't set sentAnyMessage -- they're informational, - // not a substitute for an assistant response. Error handling and retry must - // still fire even if reasoning was displayed. - } catch (err) { - log.warn('Failed to send reasoning display:', err instanceof Error ? err.message : err); - } - } + await sendReasoningDisplay(reasoningBuffer); reasoningBuffer = ''; } @@ -1196,14 +1231,7 @@ export class LettaBot implements AgentSession { log.info(`>>> TOOL CALL: ${tcName} (id: ${tcId})`); sawNonAssistantSinceLastUuid = true; // Display tool call (args are fully accumulated by dedupedStream buffer-and-flush) - if (this.config.display?.showToolCalls && !suppressDelivery) { - try { - const text = formatToolCallDisplay(streamMsg); - await adapter.sendMessage({ chatId: msg.chatId, text, threadId: msg.threadId }); - } catch (err) { - log.warn('Failed to send tool call display:', err instanceof Error ? err.message : err); - } - } + await sendToolCallDisplay(streamMsg); } else if (streamMsg.type === 'tool_result') { log.info(`<<< TOOL RESULT: error=${streamMsg.isError}, len=${(streamMsg as any).content?.length || 0}`); sawNonAssistantSinceLastUuid = true; @@ -1321,6 +1349,8 @@ export class LettaBot implements AgentSession { break; } + sawForegroundResult = true; + const resultText = typeof streamMsg.result === 'string' ? streamMsg.result : ''; if (resultText.trim().length > 0) { const streamedTextTrimmed = streamedAssistantText.trim(); @@ -1359,6 +1389,12 @@ export class LettaBot implements AgentSession { if (filteredRunEventCount > 0) { log.info(`Filtered ${filteredRunEventCount} non-foreground event(s) from stream (seq=${seq}, key=${convKey}, expectedRunId=${expectedForegroundRunId || 'unknown'})`); } + if (ignoredNonForegroundResultCount > 0) { + log.info(`Ignored ${ignoredNonForegroundResultCount} non-foreground result event(s) (seq=${seq}, key=${convKey}, expectedRunId=${expectedForegroundRunId || 'unknown'})`); + } + if (bufferedDisplayFlushCount > 0 || bufferedDisplayDropCount > 0) { + log.info(`Buffered display events: flushed=${bufferedDisplayFlushCount}, dropped=${bufferedDisplayDropCount} (seq=${seq}, key=${convKey}, expectedRunId=${expectedForegroundRunId || 'unknown'})`); + } if (streamMsg.error) { const detail = resultText.trim(); const parts = [`error=${streamMsg.error}`]; @@ -1512,6 +1548,24 @@ export class LettaBot implements AgentSession { return; } + const missingForegroundTerminalResult = + expectedForegroundRunId !== null && + !sawForegroundResult && + sawCompetingRunEvent && + !sentAnyMessage; + + if (missingForegroundTerminalResult) { + log.warn(`Foreground run ended without terminal result after competing run activity (seq=${seq}, key=${convKey}, expectedRunId=${expectedForegroundRunId})`); + this.sessionManager.invalidateSession(convKey); + session = null; + response = ''; + reasoningBuffer = ''; + if (!retried) { + return this.processMessage(msg, adapter, true); + } + response = '(The agent stream ended before a foreground result was received. Please try again.)'; + } + // Parse and execute XML directives (e.g. ) await parseAndHandleDirectives(); diff --git a/src/core/result-guard.test.ts b/src/core/result-guard.test.ts index 2bfd5ec..d537f15 100644 --- a/src/core/result-guard.test.ts +++ b/src/core/result-guard.test.ts @@ -158,4 +158,146 @@ describe('result divergence guard', () => { expect(lastSent).not.toContain('Evaluating response protocol'); expect(lastSent).toMatch(/\(.*\)/); // Parenthesized system message }); + + it('ignores non-foreground result events and waits for the foreground result', async () => { + const bot = new LettaBot({ + workingDir: workDir, + allowedTools: [], + }); + + const adapter = { + id: 'mock', + name: 'Mock', + start: vi.fn(async () => {}), + stop: vi.fn(async () => {}), + isRunning: vi.fn(() => true), + sendMessage: vi.fn(async (_msg: OutboundMessage) => ({ messageId: 'msg-1' })), + editMessage: vi.fn(async () => {}), + sendTypingIndicator: vi.fn(async () => {}), + stopTypingIndicator: vi.fn(async () => {}), + supportsEditing: vi.fn(() => false), + sendFile: vi.fn(async () => ({ messageId: 'file-1' })), + }; + + (bot as any).sessionManager.runSession = vi.fn(async () => ({ + session: { abort: vi.fn(async () => {}) }, + stream: async function* () { + yield { type: 'assistant', content: 'main ', runId: 'run-main' }; + yield { type: 'assistant', content: 'background', runId: 'run-bg' }; + yield { type: 'result', success: true, result: 'background final', runIds: ['run-bg'] }; + yield { type: 'assistant', content: 'reply', runId: 'run-main' }; + yield { type: 'result', success: true, result: 'main reply', runIds: ['run-main'] }; + }, + })); + + const msg: InboundMessage = { + channel: 'discord', + chatId: 'chat-1', + userId: 'user-1', + text: 'hello', + timestamp: new Date(), + }; + + await (bot as any).processMessage(msg, adapter); + + const sentTexts = adapter.sendMessage.mock.calls.map(([payload]) => payload.text); + expect(sentTexts).toEqual(['main reply']); + }); + + it('buffers pre-foreground run-scoped display events and drops non-foreground buffers', async () => { + const bot = new LettaBot({ + workingDir: workDir, + allowedTools: [], + display: { showReasoning: true, showToolCalls: true }, + }); + + const adapter = { + id: 'mock', + name: 'Mock', + start: vi.fn(async () => {}), + stop: vi.fn(async () => {}), + isRunning: vi.fn(() => true), + sendMessage: vi.fn(async (_msg: OutboundMessage) => ({ messageId: 'msg-1' })), + editMessage: vi.fn(async () => {}), + sendTypingIndicator: vi.fn(async () => {}), + stopTypingIndicator: vi.fn(async () => {}), + supportsEditing: vi.fn(() => false), + sendFile: vi.fn(async () => ({ messageId: 'file-1' })), + }; + + (bot as any).sessionManager.runSession = vi.fn(async () => ({ + session: { abort: vi.fn(async () => {}) }, + stream: async function* () { + yield { type: 'reasoning', content: 'background-thinking', runId: 'run-bg' }; + yield { type: 'tool_call', toolCallId: 'tc-bg', toolName: 'Bash', toolInput: { command: 'echo leak' }, runId: 'run-bg' }; + yield { type: 'assistant', content: 'main reply', runId: 'run-main' }; + yield { type: 'result', success: true, result: 'main reply', runIds: ['run-main'] }; + }, + })); + + const msg: InboundMessage = { + channel: 'discord', + chatId: 'chat-1', + userId: 'user-1', + text: 'hello', + timestamp: new Date(), + }; + + await (bot as any).processMessage(msg, adapter); + + const sentTexts = adapter.sendMessage.mock.calls.map(([payload]) => payload.text); + expect(sentTexts).toEqual(['main reply']); + }); + + it('retries once when a competing result arrives before any foreground terminal result', async () => { + const bot = new LettaBot({ + workingDir: workDir, + allowedTools: [], + }); + + const adapter = { + id: 'mock', + name: 'Mock', + start: vi.fn(async () => {}), + stop: vi.fn(async () => {}), + isRunning: vi.fn(() => true), + sendMessage: vi.fn(async (_msg: OutboundMessage) => ({ messageId: 'msg-1' })), + editMessage: vi.fn(async () => {}), + sendTypingIndicator: vi.fn(async () => {}), + stopTypingIndicator: vi.fn(async () => {}), + supportsEditing: vi.fn(() => false), + sendFile: vi.fn(async () => ({ messageId: 'file-1' })), + }; + + const runSession = vi.fn(); + runSession.mockResolvedValueOnce({ + session: { abort: vi.fn(async () => {}) }, + stream: async function* () { + yield { type: 'assistant', content: 'partial foreground', runId: 'run-main' }; + yield { type: 'result', success: true, result: 'background final', runIds: ['run-bg'] }; + }, + }); + runSession.mockResolvedValueOnce({ + session: { abort: vi.fn(async () => {}) }, + stream: async function* () { + yield { type: 'assistant', content: 'main reply', runId: 'run-main' }; + yield { type: 'result', success: true, result: 'main reply', runIds: ['run-main'] }; + }, + }); + (bot as any).sessionManager.runSession = runSession; + + const msg: InboundMessage = { + channel: 'discord', + chatId: 'chat-1', + userId: 'user-1', + text: 'hello', + timestamp: new Date(), + }; + + await (bot as any).processMessage(msg, adapter); + + expect(runSession).toHaveBeenCalledTimes(2); + const sentTexts = adapter.sendMessage.mock.calls.map(([payload]) => payload.text); + expect(sentTexts).toEqual(['main reply']); + }); }); diff --git a/src/core/types.ts b/src/core/types.ts index 44aff4d..8092195 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -196,6 +196,7 @@ export interface StreamMsg { toolCallId?: string; toolName?: string; uuid?: string; + runId?: string; isError?: boolean; result?: string; runIds?: string[];