fix: prevent non-foreground run events leaking into user response stream (#513)

This commit is contained in:
Cameron
2026-03-06 11:51:05 -08:00
committed by GitHub
parent 761de6d716
commit e80c5b4dd6
5 changed files with 269 additions and 72 deletions

2
package-lock.json generated
View File

@@ -12,7 +12,7 @@
"@clack/prompts": "^0.11.0",
"@hapi/boom": "^10.0.1",
"@letta-ai/letta-client": "^1.7.11",
"@letta-ai/letta-code-sdk": "^0.1.10",
"@letta-ai/letta-code-sdk": "^0.1.9",
"@types/express": "^5.0.6",
"@types/node": "^25.0.10",
"@types/node-schedule": "^2.1.8",

View File

@@ -67,7 +67,7 @@
"@clack/prompts": "^0.11.0",
"@hapi/boom": "^10.0.1",
"@letta-ai/letta-client": "^1.7.11",
"@letta-ai/letta-code-sdk": "^0.1.10",
"@letta-ai/letta-code-sdk": "^0.1.9",
"@types/express": "^5.0.6",
"@types/node": "^25.0.10",
"@types/node-schedule": "^2.1.8",

View File

@@ -187,7 +187,7 @@ export function resolveHeartbeatConversationKey(
}
export class LettaBot implements AgentSession {
readonly store: Store;
private store: Store;
private config: BotConfig;
private channels: Map<string, ChannelAdapter> = new Map();
private messageQueue: Array<{ msg: InboundMessage; adapter: ChannelAdapter }> = [];
@@ -269,6 +269,7 @@ export class LettaBot implements AgentSession {
private normalizeResultRunIds(msg: StreamMsg): string[] {
  // De-duplicate whatever run IDs normalizeStreamRunIds reports for this message.
  const uniqueRunIds = new Set(this.normalizeStreamRunIds(msg));
  if (uniqueRunIds.size === 0) {
    return [];
  }
  // Default sort order, so the same set of IDs always yields the same list.
  return Array.from(uniqueRunIds).sort();
}
@@ -484,14 +485,6 @@ export class LettaBot implements AgentSession {
return this.sessionManager.warmSession();
}
/**
* Invalidate the live session for a conversation key.
* The next message will create a fresh session using the current store state.
*
* Delegates directly to `sessionManager.invalidateSession`.
*
* @param key - Optional conversation key, forwarded unchanged to the session manager.
*   NOTE(review): what happens when `key` is omitted is decided by the session
*   manager and is not visible from here -- confirm before relying on it.
*/
invalidateSession(key?: string): void {
this.sessionManager.invalidateSession(key);
}
// =========================================================================
// Channel management
// =========================================================================
@@ -675,26 +668,6 @@ export class LettaBot implements AgentSession {
lines.push('', 'Use `/model <handle>` to switch.');
return lines.join('\n');
}
case 'setconv': {
if (!args?.trim()) {
return 'Usage: /setconv <conversation-id>';
}
const newConvId = args.trim();
const convKey = channelId ? this.resolveConversationKey(channelId, chatId) : 'shared';
if (convKey === 'default') {
return 'Conversations are disabled -- cannot set conversation ID.';
}
if (convKey === 'shared') {
this.store.conversationId = newConvId;
} else {
this.store.setConversationId(convKey, newConvId);
}
this.sessionManager.invalidateSession(convKey);
log.info(`/setconv - conversation set to ${newConvId} for key="${convKey}"`);
return `Conversation set to: ${newConvId}`;
}
default:
return null;
}
@@ -1022,8 +995,17 @@ export class LettaBot implements AgentSession {
let reasoningBuffer = '';
let expectedForegroundRunId: string | null = null;
let expectedForegroundRunSource: 'assistant' | 'result' | null = null;
let foregroundRunSwitchCount = 0;
let sawCompetingRunEvent = false;
let sawForegroundResult = false;
let filteredRunEventCount = 0;
let ignoredNonForegroundResultCount = 0;
let bufferedDisplayFlushed = false;
let bufferedDisplayFlushCount = 0;
let bufferedDisplayDropCount = 0;
const bufferedDisplayEvents: Array<
| { kind: 'reasoning'; runId: string; content: string }
| { kind: 'tool_call'; runId: string; msg: StreamMsg }
> = [];
const msgTypeCounts: Record<string, number> = {};
const parseAndHandleDirectives = async () => {
@@ -1041,6 +1023,72 @@ export class LettaBot implements AgentSession {
sentAnyMessage = true;
}
};
const sendReasoningDisplay = async (content: string) => {
  // Reasoning is shown only when the display option is enabled, delivery is
  // not suppressed, and there is non-whitespace text to show.
  if (!this.config.display?.showReasoning) return;
  if (suppressDelivery) return;
  if (!content.trim()) return;

  try {
    const { text, parseMode } = formatReasoningDisplay(
      content,
      adapter.id,
      this.config.display?.reasoningMaxChars,
    );
    // Display messages deliberately leave sentAnyMessage untouched: they are
    // informational and do not count as an assistant response.
    await adapter.sendMessage({ chatId: msg.chatId, text, threadId: msg.threadId, parseMode });
  } catch (err) {
    // Best-effort: a failed display message is logged and otherwise ignored.
    const detail = err instanceof Error ? err.message : err;
    log.warn('Failed to send reasoning display:', detail);
  }
};
const sendToolCallDisplay = async (toolMsg: StreamMsg) => {
  // Tool-call display is opt-in and is skipped while delivery is suppressed.
  const displayEnabled = this.config.display?.showToolCalls;
  if (!displayEnabled || suppressDelivery) {
    return;
  }

  try {
    const rendered = formatToolCallDisplay(toolMsg);
    await adapter.sendMessage({
      chatId: msg.chatId,
      text: rendered,
      threadId: msg.threadId,
    });
  } catch (err) {
    // Best-effort: a failed display message is logged and otherwise ignored.
    const detail = err instanceof Error ? err.message : err;
    log.warn('Failed to send tool call display:', detail);
  }
};
const bufferRunScopedDisplayEvent = (runId: string, streamMsg: StreamMsg) => {
  // Holds reasoning / tool_call display events, tagged with their run ID, in
  // bufferedDisplayEvents. Any other message type is ignored.
  switch (streamMsg.type) {
    case 'reasoning': {
      if (!this.config.display?.showReasoning) return;
      const text = typeof streamMsg.content === 'string' ? streamMsg.content : '';
      if (!text) return;

      // Consecutive reasoning chunks from the same run are merged into one entry.
      const tail = bufferedDisplayEvents[bufferedDisplayEvents.length - 1];
      if (tail?.kind === 'reasoning' && tail.runId === runId) {
        tail.content += text;
      } else {
        bufferedDisplayEvents.push({ kind: 'reasoning', runId, content: text });
      }
      return;
    }
    case 'tool_call': {
      if (this.config.display?.showToolCalls) {
        bufferedDisplayEvents.push({ kind: 'tool_call', runId, msg: streamMsg });
      }
      return;
    }
    default:
      return;
  }
};
const flushBufferedDisplayEventsForRun = async (runId: string) => {
  // Walk the buffer in arrival order: entries tagged with `runId` are
  // delivered, entries from any other run are only counted as dropped.
  for (const buffered of bufferedDisplayEvents) {
    if (buffered.runId === runId) {
      if (buffered.kind === 'tool_call') {
        this.sessionManager.syncTodoToolCall(buffered.msg);
        await sendToolCallDisplay(buffered.msg);
      } else {
        await sendReasoningDisplay(buffered.content);
      }
      bufferedDisplayFlushCount += 1;
    } else {
      bufferedDisplayDropCount += 1;
    }
  }
  // Empty the buffer in place so both delivered and dropped entries are gone.
  bufferedDisplayEvents.length = 0;
};
const finalizeMessage = async () => {
// Parse and execute XML directives before sending
@@ -1100,44 +1148,41 @@ export class LettaBot implements AgentSession {
break;
}
if (!firstChunkLogged) { lap('first stream chunk'); firstChunkLogged = true; }
const eventRunIds = this.normalizeStreamRunIds(streamMsg);
if (expectedForegroundRunId === null && eventRunIds.length > 0) {
if (streamMsg.type === 'assistant' || streamMsg.type === 'result') {
expectedForegroundRunId = eventRunIds[0];
expectedForegroundRunSource = streamMsg.type === 'assistant' ? 'assistant' : 'result';
log.info(`Selected foreground run for stream delivery (seq=${seq}, key=${convKey}, runId=${expectedForegroundRunId}, source=${streamMsg.type})`);
if (!bufferedDisplayFlushed && bufferedDisplayEvents.length > 0) {
await flushBufferedDisplayEventsForRun(expectedForegroundRunId);
bufferedDisplayFlushed = true;
}
} else {
// Do not lock to a run based on pre-assistant non-terminal events;
// these can belong to a concurrent background run.
const runId = eventRunIds[0];
if (runId && (streamMsg.type === 'reasoning' || streamMsg.type === 'tool_call')) {
bufferRunScopedDisplayEvent(runId, streamMsg);
filteredRunEventCount++;
log.info(`Buffering run-scoped pre-foreground display event (seq=${seq}, key=${convKey}, type=${streamMsg.type}, runId=${runId})`);
continue;
}
filteredRunEventCount++;
log.info(`Deferring run-scoped pre-foreground event (seq=${seq}, key=${convKey}, type=${streamMsg.type}, runIds=${eventRunIds.join(',')})`);
continue;
}
} else if (expectedForegroundRunId && eventRunIds.length > 0 && !eventRunIds.includes(expectedForegroundRunId)) {
const canSafelySwitchForeground = !sentAnyMessage || messageId !== null;
if (streamMsg.type === 'result'
&& foregroundRunSwitchCount === 0
&& canSafelySwitchForeground) {
const previousRunId = expectedForegroundRunId;
const previousRunSource = expectedForegroundRunSource;
expectedForegroundRunId = eventRunIds[0];
expectedForegroundRunSource = 'result';
foregroundRunSwitchCount += 1;
// Drop any state collected from the previous run so it cannot
// flush to user-facing delivery after the switch.
response = '';
reasoningBuffer = '';
streamedAssistantText = '';
lastMsgType = null;
lastAssistantUuid = null;
sawNonAssistantSinceLastUuid = false;
log.warn(`Switching foreground run at result boundary (seq=${seq}, key=${convKey}, from=${previousRunId}, to=${expectedForegroundRunId}, prevSource=${previousRunSource || 'unknown'})`);
// Strict no-rebind policy: once foreground is selected, never switch.
sawCompetingRunEvent = true;
filteredRunEventCount++;
if (streamMsg.type === 'result') {
ignoredNonForegroundResultCount++;
log.warn(`Ignoring non-foreground result event (seq=${seq}, key=${convKey}, runIds=${eventRunIds.join(',')}, expected=${expectedForegroundRunId}, source=${expectedForegroundRunSource || 'unknown'})`);
} else {
filteredRunEventCount++;
log.info(`Skipping non-foreground stream event (seq=${seq}, key=${convKey}, type=${streamMsg.type}, runIds=${eventRunIds.join(',')}, expected=${expectedForegroundRunId})`);
continue;
}
continue;
}
receivedAnyData = true;
@@ -1163,17 +1208,7 @@ export class LettaBot implements AgentSession {
// Flush reasoning buffer when type changes away from reasoning
if (isSemanticType && lastMsgType === 'reasoning' && streamMsg.type !== 'reasoning' && reasoningBuffer.trim()) {
log.info(`Reasoning: ${reasoningBuffer.trim()}`);
if (this.config.display?.showReasoning && !suppressDelivery) {
try {
const reasoning = formatReasoningDisplay(reasoningBuffer, adapter.id, this.config.display?.reasoningMaxChars);
await adapter.sendMessage({ chatId: msg.chatId, text: reasoning.text, threadId: msg.threadId, parseMode: reasoning.parseMode });
// Note: display messages don't set sentAnyMessage -- they're informational,
// not a substitute for an assistant response. Error handling and retry must
// still fire even if reasoning was displayed.
} catch (err) {
log.warn('Failed to send reasoning display:', err instanceof Error ? err.message : err);
}
}
await sendReasoningDisplay(reasoningBuffer);
reasoningBuffer = '';
}
@@ -1196,14 +1231,7 @@ export class LettaBot implements AgentSession {
log.info(`>>> TOOL CALL: ${tcName} (id: ${tcId})`);
sawNonAssistantSinceLastUuid = true;
// Display tool call (args are fully accumulated by dedupedStream buffer-and-flush)
if (this.config.display?.showToolCalls && !suppressDelivery) {
try {
const text = formatToolCallDisplay(streamMsg);
await adapter.sendMessage({ chatId: msg.chatId, text, threadId: msg.threadId });
} catch (err) {
log.warn('Failed to send tool call display:', err instanceof Error ? err.message : err);
}
}
await sendToolCallDisplay(streamMsg);
} else if (streamMsg.type === 'tool_result') {
log.info(`<<< TOOL RESULT: error=${streamMsg.isError}, len=${(streamMsg as any).content?.length || 0}`);
sawNonAssistantSinceLastUuid = true;
@@ -1321,6 +1349,8 @@ export class LettaBot implements AgentSession {
break;
}
sawForegroundResult = true;
const resultText = typeof streamMsg.result === 'string' ? streamMsg.result : '';
if (resultText.trim().length > 0) {
const streamedTextTrimmed = streamedAssistantText.trim();
@@ -1359,6 +1389,12 @@ export class LettaBot implements AgentSession {
if (filteredRunEventCount > 0) {
log.info(`Filtered ${filteredRunEventCount} non-foreground event(s) from stream (seq=${seq}, key=${convKey}, expectedRunId=${expectedForegroundRunId || 'unknown'})`);
}
if (ignoredNonForegroundResultCount > 0) {
log.info(`Ignored ${ignoredNonForegroundResultCount} non-foreground result event(s) (seq=${seq}, key=${convKey}, expectedRunId=${expectedForegroundRunId || 'unknown'})`);
}
if (bufferedDisplayFlushCount > 0 || bufferedDisplayDropCount > 0) {
log.info(`Buffered display events: flushed=${bufferedDisplayFlushCount}, dropped=${bufferedDisplayDropCount} (seq=${seq}, key=${convKey}, expectedRunId=${expectedForegroundRunId || 'unknown'})`);
}
if (streamMsg.error) {
const detail = resultText.trim();
const parts = [`error=${streamMsg.error}`];
@@ -1512,6 +1548,24 @@ export class LettaBot implements AgentSession {
return;
}
const missingForegroundTerminalResult =
expectedForegroundRunId !== null &&
!sawForegroundResult &&
sawCompetingRunEvent &&
!sentAnyMessage;
if (missingForegroundTerminalResult) {
log.warn(`Foreground run ended without terminal result after competing run activity (seq=${seq}, key=${convKey}, expectedRunId=${expectedForegroundRunId})`);
this.sessionManager.invalidateSession(convKey);
session = null;
response = '';
reasoningBuffer = '';
if (!retried) {
return this.processMessage(msg, adapter, true);
}
response = '(The agent stream ended before a foreground result was received. Please try again.)';
}
// Parse and execute XML directives (e.g. <actions><react emoji="eyes" /></actions>)
await parseAndHandleDirectives();

View File

@@ -158,4 +158,146 @@ describe('result divergence guard', () => {
expect(lastSent).not.toContain('Evaluating response protocol');
expect(lastSent).toMatch(/\(.*\)/); // Parenthesized system message
});
// Scenario: events tagged with a second run ID (`run-bg`) are interleaved with
// the run that produced the first assistant event (`run-main`). The test
// expects only the `run-main` result text to be delivered to the adapter.
it('ignores non-foreground result events and waits for the foreground result', async () => {
const bot = new LettaBot({
workingDir: workDir,
allowedTools: [],
});
// Minimal mock channel adapter; every method is a spy so sent payloads can be inspected.
const adapter = {
id: 'mock',
name: 'Mock',
start: vi.fn(async () => {}),
stop: vi.fn(async () => {}),
isRunning: vi.fn(() => true),
sendMessage: vi.fn(async (_msg: OutboundMessage) => ({ messageId: 'msg-1' })),
editMessage: vi.fn(async () => {}),
sendTypingIndicator: vi.fn(async () => {}),
stopTypingIndicator: vi.fn(async () => {}),
supportsEditing: vi.fn(() => false),
sendFile: vi.fn(async () => ({ messageId: 'file-1' })),
};
// Replace the session manager's runSession with a canned stream of events.
(bot as any).sessionManager.runSession = vi.fn(async () => ({
session: { abort: vi.fn(async () => {}) },
stream: async function* () {
// First assistant event carries `run-main`.
yield { type: 'assistant', content: 'main ', runId: 'run-main' };
// Assistant text and a result tagged with a different run ID.
yield { type: 'assistant', content: 'background', runId: 'run-bg' };
yield { type: 'result', success: true, result: 'background final', runIds: ['run-bg'] };
// `run-main` continues and then ends with its own result.
yield { type: 'assistant', content: 'reply', runId: 'run-main' };
yield { type: 'result', success: true, result: 'main reply', runIds: ['run-main'] };
},
}));
const msg: InboundMessage = {
channel: 'discord',
chatId: 'chat-1',
userId: 'user-1',
text: 'hello',
timestamp: new Date(),
};
await (bot as any).processMessage(msg, adapter);
// Exactly one message is expected, containing no `run-bg` text.
const sentTexts = adapter.sendMessage.mock.calls.map(([payload]) => payload.text);
expect(sentTexts).toEqual(['main reply']);
});
// Scenario: reasoning and tool_call events tagged `run-bg` arrive before any
// assistant event, with both display options enabled. The first assistant
// event is tagged `run-main`, and the test expects neither `run-bg` display
// event to be sent to the adapter.
it('buffers pre-foreground run-scoped display events and drops non-foreground buffers', async () => {
const bot = new LettaBot({
workingDir: workDir,
allowedTools: [],
// Both display options on, so a leaked display event would show up as an extra sendMessage call.
display: { showReasoning: true, showToolCalls: true },
});
// Minimal mock channel adapter; every method is a spy so sent payloads can be inspected.
const adapter = {
id: 'mock',
name: 'Mock',
start: vi.fn(async () => {}),
stop: vi.fn(async () => {}),
isRunning: vi.fn(() => true),
sendMessage: vi.fn(async (_msg: OutboundMessage) => ({ messageId: 'msg-1' })),
editMessage: vi.fn(async () => {}),
sendTypingIndicator: vi.fn(async () => {}),
stopTypingIndicator: vi.fn(async () => {}),
supportsEditing: vi.fn(() => false),
sendFile: vi.fn(async () => ({ messageId: 'file-1' })),
};
// Replace the session manager's runSession with a canned stream of events.
(bot as any).sessionManager.runSession = vi.fn(async () => ({
session: { abort: vi.fn(async () => {}) },
stream: async function* () {
// Display-only events tagged with `run-bg`, emitted before any assistant event.
yield { type: 'reasoning', content: 'background-thinking', runId: 'run-bg' };
yield { type: 'tool_call', toolCallId: 'tc-bg', toolName: 'Bash', toolInput: { command: 'echo leak' }, runId: 'run-bg' };
// First assistant event and the result are tagged with `run-main`.
yield { type: 'assistant', content: 'main reply', runId: 'run-main' };
yield { type: 'result', success: true, result: 'main reply', runIds: ['run-main'] };
},
}));
const msg: InboundMessage = {
channel: 'discord',
chatId: 'chat-1',
userId: 'user-1',
text: 'hello',
timestamp: new Date(),
};
await (bot as any).processMessage(msg, adapter);
// Only the assistant reply is expected; no reasoning or tool-call display messages.
const sentTexts = adapter.sendMessage.mock.calls.map(([payload]) => payload.text);
expect(sentTexts).toEqual(['main reply']);
});
// Scenario: the first stream yields an assistant event for `run-main`, then a
// result tagged `run-bg`, and ends without a `run-main` result. The test
// expects processMessage to call runSession a second time and to deliver only
// the text from that second, clean stream.
it('retries once when a competing result arrives before any foreground terminal result', async () => {
const bot = new LettaBot({
workingDir: workDir,
allowedTools: [],
});
// Minimal mock channel adapter; every method is a spy so sent payloads can be inspected.
const adapter = {
id: 'mock',
name: 'Mock',
start: vi.fn(async () => {}),
stop: vi.fn(async () => {}),
isRunning: vi.fn(() => true),
sendMessage: vi.fn(async (_msg: OutboundMessage) => ({ messageId: 'msg-1' })),
editMessage: vi.fn(async () => {}),
sendTypingIndicator: vi.fn(async () => {}),
stopTypingIndicator: vi.fn(async () => {}),
supportsEditing: vi.fn(() => false),
sendFile: vi.fn(async () => ({ messageId: 'file-1' })),
};
const runSession = vi.fn();
// First call: `run-main` text followed only by a `run-bg` result.
runSession.mockResolvedValueOnce({
session: { abort: vi.fn(async () => {}) },
stream: async function* () {
yield { type: 'assistant', content: 'partial foreground', runId: 'run-main' };
yield { type: 'result', success: true, result: 'background final', runIds: ['run-bg'] };
},
});
// Second call: a stream containing only `run-main` events, ending with its result.
runSession.mockResolvedValueOnce({
session: { abort: vi.fn(async () => {}) },
stream: async function* () {
yield { type: 'assistant', content: 'main reply', runId: 'run-main' };
yield { type: 'result', success: true, result: 'main reply', runIds: ['run-main'] };
},
});
(bot as any).sessionManager.runSession = runSession;
const msg: InboundMessage = {
channel: 'discord',
chatId: 'chat-1',
userId: 'user-1',
text: 'hello',
timestamp: new Date(),
};
await (bot as any).processMessage(msg, adapter);
// Two runSession calls means exactly one retry happened.
expect(runSession).toHaveBeenCalledTimes(2);
// Neither the partial first-attempt text nor the `run-bg` result is expected to be delivered.
const sentTexts = adapter.sendMessage.mock.calls.map(([payload]) => payload.text);
expect(sentTexts).toEqual(['main reply']);
});
});

View File

@@ -196,6 +196,7 @@ export interface StreamMsg {
toolCallId?: string;
toolName?: string;
uuid?: string;
runId?: string;
isError?: boolean;
result?: string;
runIds?: string[];