From 13426396ac46a14d8c89a578271652bfe5fd6430 Mon Sep 17 00:00:00 2001
From: Cameron
Date: Wed, 11 Mar 2026 16:22:26 -0700
Subject: [PATCH] refactor: extract DisplayPipeline from processMessage stream loop (#550)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Extracts a DisplayPipeline async generator that wraps the raw SDK stream and
yields clean DisplayEvent types. Refactors processMessage() to consume
pipeline events instead of raw StreamMsg objects.

- Locks foreground on first substantive event (reasoning/tool_call/etc),
  eliminating buffering delay for real-time display
- Filters pre-foreground error/retry events to prevent false approval recovery
- Re-throws 429 in rejectApproval to prevent rate-limit loops
- Gates reasoning log on display config
- 12 pipeline unit tests + updated integration tests (56 total)
- Net -224 lines from bot.ts

Written by Cameron ◯ Letta Code

"The purpose of abstraction is not to be vague, but to create a new semantic
level in which one can be absolutely precise."
-- Edsger Dijkstra --- src/core/bot-logging.test.ts | 15 +- src/core/bot.ts | 896 ++++++++++---------------- src/core/display-pipeline.test.ts | 211 ++++++ src/core/display-pipeline.ts | 360 +++++++++++ src/core/result-guard.test.ts | 68 +- src/core/sdk-session-contract.test.ts | 75 +++ src/tools/letta-api.ts | 6 + 7 files changed, 1062 insertions(+), 569 deletions(-) create mode 100644 src/core/display-pipeline.test.ts create mode 100644 src/core/display-pipeline.ts diff --git a/src/core/bot-logging.test.ts b/src/core/bot-logging.test.ts index 1cf2a73..d577688 100644 --- a/src/core/bot-logging.test.ts +++ b/src/core/bot-logging.test.ts @@ -82,18 +82,9 @@ describe('stream logging levels', () => { const infoMessages = loggerSpies.info.mock.calls.map(([message]) => String(message)); const traceMessages = loggerSpies.trace.mock.calls.map(([message]) => String(message)); - expect(debugMessages.some((m) => m.includes('Buffering run-scoped pre-foreground display event'))).toBe(false); - expect(debugMessages.some((m) => m.includes('Deferring run-scoped pre-foreground event'))).toBe(false); - expect(debugMessages.some((m) => m.includes('Skipping non-foreground stream event'))).toBe(false); - - expect(infoMessages.some((m) => m.includes('type=tool_call'))).toBe(false); - expect(infoMessages.some((m) => m.includes('type=tool_result'))).toBe(false); - - expect(traceMessages.some((m) => m.includes('Buffering run-scoped pre-foreground display event'))).toBe(true); - expect(traceMessages.some((m) => m.includes('Skipping non-foreground stream event'))).toBe(true); - expect(traceMessages.some((m) => m.includes('type=tool_call'))).toBe(true); - expect(traceMessages.some((m) => m.includes('type=tool_result'))).toBe(true); - + // Run ID filtering now handled by DisplayPipeline; verify summary log is emitted at info level expect(infoMessages.some((m) => m.includes('Filtered') && m.includes('non-foreground event(s)'))).toBe(true); + // Foreground run locking is logged at info level + 
expect(infoMessages.some((m) => m.includes('Foreground run locked'))).toBe(true); }); }); \ No newline at end of file diff --git a/src/core/bot.ts b/src/core/bot.ts index b51f831..622a191 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -23,6 +23,7 @@ import { redactOutbound } from './redact.js'; import { parseDirectives, stripActionsBlock, type Directive } from './directives.js'; import { resolveEmoji } from './emoji.js'; import { SessionManager } from './session-manager.js'; +import { createDisplayPipeline, type DisplayEvent, type CompleteEvent, type ErrorEvent } from './display-pipeline.js'; import { createLogger } from '../logger.js'; @@ -1181,38 +1182,26 @@ export class LettaBot implements AgentSession { lap('session send'); session = run.session; - // Stream response with delivery + // Stream response with delivery via DisplayPipeline let response = ''; - let lastUpdate = 0; // Start at 0 so the first streaming edit fires immediately - let rateLimitedUntil = 0; // Timestamp until which we should avoid API calls (429 backoff) + let lastUpdate = 0; + let rateLimitedUntil = 0; let messageId: string | null = null; - let lastMsgType: string | null = null; let lastAssistantUuid: string | null = null; let sentAnyMessage = false; let receivedAnyData = false; let sawNonAssistantSinceLastUuid = false; let lastErrorDetail: StreamErrorDetail | null = null; let retryInfo: { attempt: number; maxAttempts: number; reason: string } | null = null; - let reasoningBuffer = ''; - let expectedForegroundRunId: string | null = null; - let expectedForegroundRunSource: 'assistant' | 'result' | null = null; - let sawCompetingRunEvent = false; let sawForegroundResult = false; - let filteredRunEventCount = 0; - let ignoredNonForegroundResultCount = 0; - let bufferedDisplayFlushed = false; - let bufferedDisplayFlushCount = 0; - let bufferedDisplayDropCount = 0; - const bufferedDisplayEvents: Array< - | { kind: 'reasoning'; runId: string; content: string } - | { kind: 'tool_call'; 
runId: string; msg: StreamMsg } - > = []; const msgTypeCounts: Record = {}; const bashCommandByToolCallId = new Map(); let lastBashCommand = ''; let repeatedBashFailureKey: string | null = null; let repeatedBashFailureCount = 0; const maxRepeatedBashFailures = 3; + let lastEventType: string | null = null; + let abortedWithMessage = false; const parseAndHandleDirectives = async () => { if (!response.trim()) return; @@ -1230,77 +1219,9 @@ export class LettaBot implements AgentSession { } }; - const sendReasoningDisplay = async (content: string) => { - if (!this.config.display?.showReasoning || suppressDelivery || !content.trim()) return; - try { - const reasoning = formatReasoningDisplay(content, adapter.id, this.config.display?.reasoningMaxChars); - await adapter.sendMessage({ - chatId: msg.chatId, - text: reasoning.text, - threadId: msg.threadId, - parseMode: reasoning.parseMode, - }); - // Note: display messages don't set sentAnyMessage -- they're informational, - // not a substitute for an assistant response. - } catch (err) { - log.warn('Failed to send reasoning display:', err instanceof Error ? err.message : err); - } - }; - - const sendToolCallDisplay = async (toolMsg: StreamMsg) => { - if (!this.config.display?.showToolCalls || suppressDelivery) return; - try { - const text = formatToolCallDisplay(toolMsg); - await adapter.sendMessage({ chatId: msg.chatId, text, threadId: msg.threadId }); - } catch (err) { - log.warn('Failed to send tool call display:', err instanceof Error ? err.message : err); - } - }; - - const bufferRunScopedDisplayEvent = (runId: string, streamMsg: StreamMsg) => { - if (streamMsg.type === 'reasoning') { - if (!this.config.display?.showReasoning) return; - const chunk = typeof streamMsg.content === 'string' ? 
streamMsg.content : ''; - if (!chunk) return; - const lastEvent = bufferedDisplayEvents[bufferedDisplayEvents.length - 1]; - if (lastEvent && lastEvent.kind === 'reasoning' && lastEvent.runId === runId) { - lastEvent.content += chunk; - } else { - bufferedDisplayEvents.push({ kind: 'reasoning', runId, content: chunk }); - } - return; - } - - if (streamMsg.type === 'tool_call') { - if (!this.config.display?.showToolCalls) return; - bufferedDisplayEvents.push({ kind: 'tool_call', runId, msg: streamMsg }); - } - }; - - const flushBufferedDisplayEventsForRun = async (runId: string) => { - for (const event of bufferedDisplayEvents) { - if (event.runId !== runId) { - bufferedDisplayDropCount += 1; - continue; - } - if (event.kind === 'reasoning') { - await sendReasoningDisplay(event.content); - bufferedDisplayFlushCount += 1; - continue; - } - - this.sessionManager.syncTodoToolCall(event.msg); - await sendToolCallDisplay(event.msg); - bufferedDisplayFlushCount += 1; - } - bufferedDisplayEvents.length = 0; - }; - const finalizeMessage = async () => { - // Parse and execute XML directives before sending await parseAndHandleDirectives(); - // Check for no-reply AFTER directive parsing if (response.trim() === '') { log.info('Agent chose not to reply (no-reply marker)'); sentAnyMessage = true; @@ -1311,7 +1232,6 @@ export class LettaBot implements AgentSession { } if (!suppressDelivery && response.trim()) { - // Wait out any active rate limit before sending const rlRemaining = rateLimitedUntil - Date.now(); if (rlRemaining > 0) { const waitMs = Math.min(rlRemaining, 30_000); @@ -1328,7 +1248,6 @@ export class LettaBot implements AgentSession { sentAnyMessage = true; } catch (finalizeErr) { if (messageId) { - // Edit failed but original message was already visible sentAnyMessage = true; } else { log.warn('finalizeMessage send failed:', finalizeErr instanceof Error ? 
finalizeErr.message : finalizeErr); @@ -1339,477 +1258,354 @@ export class LettaBot implements AgentSession { messageId = null; lastUpdate = Date.now(); }; - + const typingInterval = setInterval(() => { adapter.sendTypingIndicator(msg.chatId).catch(() => {}); }, 4000); - + try { let firstChunkLogged = false; - let streamedAssistantText = ''; - for await (const streamMsg of run.stream()) { - // Check for /cancel before processing each chunk + const pipeline = createDisplayPipeline(run.stream(), { + convKey, + resultFingerprints: this.lastResultRunFingerprints, + }); + + for await (const event of pipeline) { + // Check for /cancel before processing each event if (this.cancelledKeys.has(convKey)) { log.info(`Stream cancelled by /cancel (key=${convKey})`); break; } if (!firstChunkLogged) { lap('first stream chunk'); firstChunkLogged = true; } - const eventRunIds = this.normalizeStreamRunIds(streamMsg); - if (expectedForegroundRunId === null && eventRunIds.length > 0) { - if (streamMsg.type === 'assistant' || streamMsg.type === 'result') { - expectedForegroundRunId = eventRunIds[0]; - expectedForegroundRunSource = streamMsg.type === 'assistant' ? 'assistant' : 'result'; - log.info(`Selected foreground run for stream delivery (seq=${seq}, key=${convKey}, runId=${expectedForegroundRunId}, source=${streamMsg.type})`); - if (!bufferedDisplayFlushed && bufferedDisplayEvents.length > 0) { - await flushBufferedDisplayEventsForRun(expectedForegroundRunId); - bufferedDisplayFlushed = true; - } - } else { - // Do not lock to a run based on pre-assistant non-terminal events; - // these can belong to a concurrent background run. 
- const runId = eventRunIds[0]; - if (runId && (streamMsg.type === 'reasoning' || streamMsg.type === 'tool_call')) { - bufferRunScopedDisplayEvent(runId, streamMsg); - filteredRunEventCount++; - log.trace(`Buffering run-scoped pre-foreground display event (seq=${seq}, key=${convKey}, type=${streamMsg.type}, runId=${runId})`); - continue; - } - filteredRunEventCount++; - log.trace(`Deferring run-scoped pre-foreground event (seq=${seq}, key=${convKey}, type=${streamMsg.type}, runIds=${eventRunIds.join(',')})`); - continue; - } - } else if (expectedForegroundRunId && eventRunIds.length > 0 && !eventRunIds.includes(expectedForegroundRunId)) { - // After a tool call the Letta server may assign a new run ID for - // the continuation. Rebind on assistant events -- background Task - // agents run in separate sessions and don't produce assistant - // events in the foreground stream. Other event types (reasoning, - // tool_call, result) from different runs are still filtered to - // prevent background Task output leaking into user display (#482). - if (streamMsg.type === 'assistant') { - const newRunId = eventRunIds[0]; - log.info(`Rebinding foreground run: ${expectedForegroundRunId} -> ${newRunId} (seq=${seq}, key=${convKey}, type=${streamMsg.type})`); - expectedForegroundRunId = newRunId; - expectedForegroundRunSource = 'assistant'; - // Flush any buffered display events for the new run. 
- if (bufferedDisplayEvents.length > 0) { - await flushBufferedDisplayEventsForRun(newRunId); - } - // Fall through to normal processing - } else { - sawCompetingRunEvent = true; - filteredRunEventCount++; - if (streamMsg.type === 'result') { - ignoredNonForegroundResultCount++; - log.warn(`Ignoring non-foreground result event (seq=${seq}, key=${convKey}, runIds=${eventRunIds.join(',')}, expected=${expectedForegroundRunId})`); - } else { - log.trace(`Skipping non-foreground stream event (seq=${seq}, key=${convKey}, type=${streamMsg.type}, runIds=${eventRunIds.join(',')}, expected=${expectedForegroundRunId})`); - } - continue; - } - } - receivedAnyData = true; - msgTypeCounts[streamMsg.type] = (msgTypeCounts[streamMsg.type] || 0) + 1; - - log.trace(`type=${streamMsg.type} ${JSON.stringify(streamMsg).slice(0, 300)}`); - - // stream_event is a low-level streaming primitive (partial deltas), not a - // semantic type change. Skip it for type-transition logic so it doesn't - // prematurely flush reasoning buffers or finalize assistant messages. - const isSemanticType = streamMsg.type !== 'stream_event'; + msgTypeCounts[event.type] = (msgTypeCounts[event.type] || 0) + 1; - // Finalize on type change (avoid double-handling when result provides full response) - if (isSemanticType && lastMsgType && lastMsgType !== streamMsg.type && response.trim() && streamMsg.type !== 'result') { - await finalizeMessage(); - } - - // Flush reasoning buffer when type changes away from reasoning - if (isSemanticType && lastMsgType === 'reasoning' && streamMsg.type !== 'reasoning' && reasoningBuffer.trim()) { - log.info(`Reasoning: ${reasoningBuffer.trim()}`); - await sendReasoningDisplay(reasoningBuffer); - reasoningBuffer = ''; - } - - // (Tool call displays fire immediately in the tool_call handler below.) - - // Tool loop detection - const maxToolCalls = this.config.maxToolCalls ?? 
100; - if (streamMsg.type === 'tool_call' && (msgTypeCounts['tool_call'] || 0) >= maxToolCalls) { - log.error(`Agent stuck in tool loop (${msgTypeCounts['tool_call']} calls), aborting`); - session.abort().catch(() => {}); - response = '(Agent got stuck in a tool loop and was stopped. Try sending your message again.)'; - break; - } - - // Log meaningful events with structured summaries - if (streamMsg.type === 'tool_call') { - this.sessionManager.syncTodoToolCall(streamMsg); - const tcName = streamMsg.toolName || 'unknown'; - const tcId = streamMsg.toolCallId?.slice(0, 12) || '?'; - log.info(`>>> TOOL CALL: ${tcName} (id: ${tcId})`); - - if (tcName === 'Bash') { - const toolInput = (streamMsg.toolInput && typeof streamMsg.toolInput === 'object') - ? streamMsg.toolInput as Record - : null; - const command = typeof toolInput?.command === 'string' ? toolInput.command : ''; - if (command) { - lastBashCommand = command; - if (streamMsg.toolCallId) { - bashCommandByToolCallId.set(streamMsg.toolCallId, command); - } - } - } - - sawNonAssistantSinceLastUuid = true; - // Display tool call (args are fully accumulated by dedupedStream buffer-and-flush) - await sendToolCallDisplay(streamMsg); - } else if (streamMsg.type === 'tool_result') { - log.info(`<<< TOOL RESULT: error=${streamMsg.isError}, len=${(streamMsg as any).content?.length || 0}`); - sawNonAssistantSinceLastUuid = true; - - const toolCallId = typeof streamMsg.toolCallId === 'string' ? streamMsg.toolCallId : ''; - const mappedCommand = toolCallId ? bashCommandByToolCallId.get(toolCallId) : undefined; - if (toolCallId) { - bashCommandByToolCallId.delete(toolCallId); - } - const bashCommand = (mappedCommand || lastBashCommand || '').trim(); - const toolResultContent = typeof (streamMsg as any).content === 'string' - ? (streamMsg as any).content - : typeof (streamMsg as any).result === 'string' - ? 
(streamMsg as any).result - : ''; - const lowerContent = toolResultContent.toLowerCase(); - const isLettabotCliCall = /^lettabot(?:-[a-z0-9-]+)?\b/i.test(bashCommand); - const looksCliCommandError = lowerContent.includes('unknown command') - || lowerContent.includes('command not found') - || lowerContent.includes('usage: lettabot') - || lowerContent.includes('usage: lettabot-bluesky') - || lowerContent.includes('error: --agent is required for bluesky commands'); - - if (streamMsg.isError && bashCommand && isLettabotCliCall && looksCliCommandError) { - const errorKind = lowerContent.includes('unknown command') || lowerContent.includes('command not found') - ? 'unknown-command' - : 'usage-error'; - const failureKey = `${bashCommand.toLowerCase()}::${errorKind}`; - if (repeatedBashFailureKey === failureKey) { - repeatedBashFailureCount += 1; - } else { - repeatedBashFailureKey = failureKey; - repeatedBashFailureCount = 1; - } - - if (repeatedBashFailureCount >= maxRepeatedBashFailures) { - log.error(`Stopping run after repeated Bash command failures (${repeatedBashFailureCount}) for: ${bashCommand}`); - session.abort().catch(() => {}); - response = `(I stopped after repeated CLI command failures while running: ${bashCommand}. The command path appears mismatched. Please confirm Bluesky CLI commands are available, then resend your request.)`; - break; - } - } else { - repeatedBashFailureKey = null; - repeatedBashFailureCount = 0; - } - } else if (streamMsg.type === 'assistant' && lastMsgType !== 'assistant') { - log.info(`Generating response...`); - } else if (streamMsg.type === 'reasoning') { - if (lastMsgType !== 'reasoning') { - log.info(`Reasoning...`); - } - sawNonAssistantSinceLastUuid = true; - // Accumulate reasoning content for display - if (this.config.display?.showReasoning) { - reasoningBuffer += streamMsg.content || ''; - } - } else if (streamMsg.type === 'error') { - // SDK now surfaces error detail that was previously dropped. 
- // Store for use in the user-facing error message. - lastErrorDetail = { - message: (streamMsg as any).message || 'unknown', - stopReason: (streamMsg as any).stopReason || 'error', - apiError: (streamMsg as any).apiError, - }; - log.error(`Stream error detail: ${lastErrorDetail.message} [${lastErrorDetail.stopReason}]`); - sawNonAssistantSinceLastUuid = true; - } else if (streamMsg.type === 'retry') { - const rm = streamMsg as any; - retryInfo = { attempt: rm.attempt, maxAttempts: rm.maxAttempts, reason: rm.reason }; - log.info(`Retrying (${rm.attempt}/${rm.maxAttempts}): ${rm.reason}`); - sawNonAssistantSinceLastUuid = true; - } else if (streamMsg.type !== 'assistant') { - sawNonAssistantSinceLastUuid = true; - } - // Don't let stream_event overwrite lastMsgType -- it's noise between - // semantic types and would cause false type-transition triggers. - if (isSemanticType) lastMsgType = streamMsg.type; - - if (streamMsg.type === 'assistant') { - const msgUuid = streamMsg.uuid; - if (msgUuid && lastAssistantUuid && msgUuid !== lastAssistantUuid) { - if (response.trim()) { - if (!sawNonAssistantSinceLastUuid) { - log.warn(`WARNING: Assistant UUID changed (${lastAssistantUuid.slice(0, 8)} -> ${msgUuid.slice(0, 8)}) with no visible tool_call/reasoning events between them. Tool call events may have been dropped by SDK transformMessage().`); - } + switch (event.type) { + case 'reasoning': { + // Finalize any pending assistant text on type transition + if (lastEventType === 'text' && response.trim()) { await finalizeMessage(); } - // Start tracking tool/reasoning visibility for the new assistant UUID. - sawNonAssistantSinceLastUuid = false; - } else if (msgUuid && !lastAssistantUuid) { - // Clear any pre-assistant noise so the first UUID becomes a clean baseline. 
- sawNonAssistantSinceLastUuid = false; - } - lastAssistantUuid = msgUuid || lastAssistantUuid; - - const assistantChunk = streamMsg.content || ''; - response += assistantChunk; - streamedAssistantText += assistantChunk; - - // Live-edit streaming for channels that support it - // Hold back streaming edits while response could still be or block - const canEdit = adapter.supportsEditing?.() ?? false; - const trimmed = response.trim(); - const mayBeHidden = ''.startsWith(trimmed) - || ''.startsWith(trimmed) - || (trimmed.startsWith('')); - // Strip any completed block from the streaming text - const streamText = stripActionsBlock(response).trim(); - if (canEdit && !mayBeHidden && !suppressDelivery && !this.cancelledKeys.has(convKey) && streamText.length > 0 && Date.now() - lastUpdate > 1500 && Date.now() > rateLimitedUntil) { - try { - const prefixedStream = this.prefixResponse(streamText); - if (messageId) { - await adapter.editMessage(msg.chatId, messageId, prefixedStream); - } else { - const result = await adapter.sendMessage({ chatId: msg.chatId, text: prefixedStream, threadId: msg.threadId }); - messageId = result.messageId; - sentAnyMessage = true; - } - } catch (editErr: any) { - log.warn('Streaming edit failed:', editErr instanceof Error ? editErr.message : editErr); - // Detect 429 rate limit and suppress further streaming edits - const errStr = String(editErr?.message ?? editErr); - const retryMatch = errStr.match(/retry after (\d+)/i); - if (errStr.includes('429') || retryMatch) { - const retryAfter = retryMatch ? 
Number(retryMatch[1]) : 30; - rateLimitedUntil = Date.now() + retryAfter * 1000; - log.warn(`Rate limited -- suppressing streaming edits for ${retryAfter}s`); + lastEventType = 'reasoning'; + sawNonAssistantSinceLastUuid = true; + if (this.config.display?.showReasoning && !suppressDelivery && event.content.trim()) { + log.info(`Reasoning: ${event.content.trim().slice(0, 100)}`); + try { + const reasoning = formatReasoningDisplay(event.content, adapter.id, this.config.display?.reasoningMaxChars); + await adapter.sendMessage({ + chatId: msg.chatId, + text: reasoning.text, + threadId: msg.threadId, + parseMode: reasoning.parseMode, + }); + } catch (err) { + log.warn('Failed to send reasoning display:', err instanceof Error ? err.message : err); } } - lastUpdate = Date.now(); + break; + } + + case 'tool_call': { + // Finalize any pending assistant text on type transition + if (lastEventType === 'text' && response.trim()) { + await finalizeMessage(); + } + lastEventType = 'tool_call'; + this.sessionManager.syncTodoToolCall(event.raw); + log.info(`>>> TOOL CALL: ${event.name} (id: ${event.id.slice(0, 12) || '?'})`); + sawNonAssistantSinceLastUuid = true; + + // Tool loop detection + const maxToolCalls = this.config.maxToolCalls ?? 100; + if ((msgTypeCounts['tool_call'] || 0) >= maxToolCalls) { + log.error(`Agent stuck in tool loop (${msgTypeCounts['tool_call']} calls), aborting`); + session?.abort().catch(() => {}); + response = '(Agent got stuck in a tool loop and was stopped. Try sending your message again.)'; + abortedWithMessage = true; + break; + } + + // Bash command tracking for repeated failure detection + if (event.name === 'Bash') { + const command = typeof event.args?.command === 'string' ? 
event.args.command : ''; + if (command) { + lastBashCommand = command; + if (event.id) bashCommandByToolCallId.set(event.id, command); + } + } + + // Display + if (this.config.display?.showToolCalls && !suppressDelivery) { + try { + const text = formatToolCallDisplay(event.raw); + await adapter.sendMessage({ chatId: msg.chatId, text, threadId: msg.threadId }); + } catch (err) { + log.warn('Failed to send tool call display:', err instanceof Error ? err.message : err); + } + } + break; + } + + case 'tool_result': { + lastEventType = 'tool_result'; + log.info(`<<< TOOL RESULT: error=${event.isError}, len=${event.content.length}`); + sawNonAssistantSinceLastUuid = true; + + // Repeated Bash failure detection + const mappedCommand = event.toolCallId ? bashCommandByToolCallId.get(event.toolCallId) : undefined; + if (event.toolCallId) bashCommandByToolCallId.delete(event.toolCallId); + const bashCommand = (mappedCommand || lastBashCommand || '').trim(); + const lowerContent = event.content.toLowerCase(); + const isLettabotCliCall = /^lettabot(?:-[a-z0-9-]+)?\b/i.test(bashCommand); + const looksCliCommandError = lowerContent.includes('unknown command') + || lowerContent.includes('command not found') + || lowerContent.includes('usage: lettabot') + || lowerContent.includes('usage: lettabot-bluesky') + || lowerContent.includes('error: --agent is required for bluesky commands'); + + if (event.isError && bashCommand && isLettabotCliCall && looksCliCommandError) { + const errorKind = lowerContent.includes('unknown command') || lowerContent.includes('command not found') + ? 
'unknown-command' : 'usage-error'; + const failureKey = `${bashCommand.toLowerCase()}::${errorKind}`; + if (repeatedBashFailureKey === failureKey) { + repeatedBashFailureCount += 1; + } else { + repeatedBashFailureKey = failureKey; + repeatedBashFailureCount = 1; + } + if (repeatedBashFailureCount >= maxRepeatedBashFailures) { + log.error(`Stopping run after repeated Bash command failures (${repeatedBashFailureCount}) for: ${bashCommand}`); + session?.abort().catch(() => {}); + response = `(I stopped after repeated CLI command failures while running: ${bashCommand}. The command path appears mismatched. Please confirm Bluesky CLI commands are available, then resend your request.)`; + abortedWithMessage = true; + break; + } + } else { + repeatedBashFailureKey = null; + repeatedBashFailureCount = 0; + } + break; + } + + case 'text': { + lastEventType = 'text'; + // Detect assistant UUID change (multi-turn response boundary) + if (event.uuid && lastAssistantUuid && event.uuid !== lastAssistantUuid) { + if (response.trim()) { + if (!sawNonAssistantSinceLastUuid) { + log.warn(`WARNING: Assistant UUID changed (${lastAssistantUuid.slice(0, 8)} -> ${event.uuid.slice(0, 8)}) with no visible tool_call/reasoning events between them.`); + } + await finalizeMessage(); + } + sawNonAssistantSinceLastUuid = false; + } else if (event.uuid && !lastAssistantUuid) { + sawNonAssistantSinceLastUuid = false; + } + lastAssistantUuid = event.uuid || lastAssistantUuid; + + response += event.delta; + + // Live-edit streaming for channels that support it + const canEdit = adapter.supportsEditing?.() ?? 
false; + const trimmed = response.trim(); + const mayBeHidden = ''.startsWith(trimmed) + || ''.startsWith(trimmed) + || (trimmed.startsWith('')); + const streamText = stripActionsBlock(response).trim(); + if (canEdit && !mayBeHidden && !suppressDelivery && !this.cancelledKeys.has(convKey) + && streamText.length > 0 && Date.now() - lastUpdate > 1500 && Date.now() > rateLimitedUntil) { + try { + const prefixedStream = this.prefixResponse(streamText); + if (messageId) { + await adapter.editMessage(msg.chatId, messageId, prefixedStream); + } else { + const result = await adapter.sendMessage({ chatId: msg.chatId, text: prefixedStream, threadId: msg.threadId }); + messageId = result.messageId; + sentAnyMessage = true; + } + } catch (editErr: any) { + log.warn('Streaming edit failed:', editErr instanceof Error ? editErr.message : editErr); + const errStr = String(editErr?.message ?? editErr); + const retryMatch = errStr.match(/retry after (\d+)/i); + if (errStr.includes('429') || retryMatch) { + const retryAfter = retryMatch ? 
Number(retryMatch[1]) : 30; + rateLimitedUntil = Date.now() + retryAfter * 1000; + log.warn(`Rate limited -- suppressing streaming edits for ${retryAfter}s`); + } + } + lastUpdate = Date.now(); + } + break; + } + + case 'complete': { + sawForegroundResult = true; + + // Handle cancelled results + if (event.cancelled) { + log.info(`Discarding cancelled run result (seq=${seq})`); + this.sessionManager.invalidateSession(convKey); + session = null; + if (!retried) { + clearInterval(typingInterval); + return this.processMessage(msg, adapter, true); + } + break; + } + + // Handle stale duplicate results + if (event.stale) { + this.sessionManager.invalidateSession(convKey); + session = null; + if (!retried) { + log.warn(`Retrying message after stale duplicate result (seq=${seq}, key=${convKey})`); + clearInterval(typingInterval); + return this.processMessage(msg, adapter, true); + } + response = ''; + break; + } + + // Result text handling: only use the pipeline's result text as a + // FALLBACK when no assistant text was streamed. If text events were + // already yielded and possibly finalized (sent), don't override. + if (event.text.trim() && !event.hadStreamedText && event.success && !event.error) { + response = event.text; + } else if (!sentAnyMessage && response.trim().length === 0 && event.text.trim() && event.success && !event.error) { + // Safety fallback: if nothing was delivered yet and response is empty, + // allow result-text-based resend. + response = event.text; + } + + const hasResponse = response.trim().length > 0; + const resultText = typeof event.raw.result === 'string' ? 
event.raw.result : ''; + log.info(`Stream result: seq=${seq} success=${event.success}, hasResponse=${hasResponse}, resultLen=${resultText.length}`); + if (event.error) { + const parts = [`error=${event.error}`]; + if (event.stopReason) parts.push(`stopReason=${event.stopReason}`); + if (event.durationMs !== undefined) parts.push(`duration=${event.durationMs}ms`); + if (event.conversationId) parts.push(`conv=${event.conversationId}`); + log.error(`Result error: ${parts.join(', ')}`); + } + + // Retry/recovery logic + const retryConvKey = this.resolveConversationKey(msg.channel, msg.chatId, msg.forcePerChat); + const retryConvIdFromStore = (retryConvKey === 'shared' + ? this.store.conversationId + : this.store.getConversationId(retryConvKey)) ?? undefined; + const retryConvIdRaw = (event.conversationId && event.conversationId.length > 0) + ? event.conversationId + : retryConvIdFromStore; + const retryConvId = isRecoverableConversationId(retryConvIdRaw) + ? retryConvIdRaw + : undefined; + + const initialRetryDecision = this.buildResultRetryDecision( + event.raw, resultText, hasResponse, sentAnyMessage, lastErrorDetail, + ); + + // Enrich opaque error detail from run metadata + if (initialRetryDecision.isTerminalError && this.store.agentId && + (!lastErrorDetail || lastErrorDetail.message === 'Agent stopped: error')) { + const enriched = await getLatestRunError(this.store.agentId, retryConvId); + if (enriched) { + lastErrorDetail = { + message: enriched.message, + stopReason: enriched.stopReason, + isApprovalError: enriched.isApprovalError, + }; + } + } + + const retryDecision = this.buildResultRetryDecision( + event.raw, resultText, hasResponse, sentAnyMessage, lastErrorDetail, + ); + + // Approval conflict recovery + if (retryDecision.isApprovalConflict && !retried && this.store.agentId) { + if (retryConvId) { + log.info('Approval conflict detected -- attempting targeted recovery...'); + this.sessionManager.invalidateSession(retryConvKey); + session = null; + 
clearInterval(typingInterval); + const convResult = await recoverOrphanedConversationApproval( + this.store.agentId, retryConvId, true, + ); + if (convResult.recovered) { + log.info(`Approval recovery succeeded (${convResult.details}), retrying message...`); + return this.processMessage(msg, adapter, true); + } + log.warn(`Approval recovery failed: ${convResult.details}`); + return this.processMessage(msg, adapter, true); + } else { + log.info('Approval conflict in default conversation -- attempting agent-level recovery...'); + this.sessionManager.invalidateSession(retryConvKey); + session = null; + clearInterval(typingInterval); + const agentResult = await recoverPendingApprovalsForAgent(this.store.agentId); + if (agentResult.recovered) { + log.info(`Agent-level recovery succeeded (${agentResult.details}), retrying message...`); + return this.processMessage(msg, adapter, true); + } + log.warn(`Agent-level recovery failed: ${agentResult.details}`); + return this.processMessage(msg, adapter, true); + } + } + + // Empty/error result retry + if (retryDecision.shouldRetryForEmptyResult || retryDecision.shouldRetryForErrorResult) { + if (!retried && this.store.agentId && retryConvId) { + const reason = retryDecision.shouldRetryForErrorResult ? 
'error result' : 'empty result'; + log.info(`${reason} - attempting orphaned approval recovery...`); + this.sessionManager.invalidateSession(retryConvKey); + session = null; + clearInterval(typingInterval); + const convResult = await recoverOrphanedConversationApproval(this.store.agentId, retryConvId); + if (convResult.recovered) { + log.info(`Recovery succeeded (${convResult.details}), retrying message...`); + return this.processMessage(msg, adapter, true); + } + if (retryDecision.shouldRetryForErrorResult) { + log.info('Retrying once after terminal error...'); + return this.processMessage(msg, adapter, true); + } + } + } + + // Terminal error with no response + if (retryDecision.isTerminalError && !hasResponse && !sentAnyMessage) { + if (lastErrorDetail) { + response = formatApiErrorForUser(lastErrorDetail); + } else { + const err = event.error || 'unknown error'; + const reason = event.stopReason ? ` [${event.stopReason}]` : ''; + response = `(Agent run failed: ${err}${reason}. Try sending your message again.)`; + } + } + break; + } + + case 'error': { + lastErrorDetail = { + message: event.message, + stopReason: event.stopReason || 'error', + apiError: event.apiError, + }; + log.error(`Stream error detail: ${event.message} [${event.stopReason || 'error'}]`); + sawNonAssistantSinceLastUuid = true; + break; + } + + case 'retry': { + retryInfo = { attempt: event.attempt, maxAttempts: event.maxAttempts, reason: event.reason }; + log.info(`Retrying (${event.attempt}/${event.maxAttempts}): ${event.reason}`); + sawNonAssistantSinceLastUuid = true; + break; } } - - if (streamMsg.type === 'result') { - // Discard cancelled run results -- the server flushes accumulated - // content from a previously cancelled run as the result for the - // next message. Discard it and retry so the message gets processed. - if (streamMsg.stopReason === 'cancelled') { - log.info(`Discarding cancelled run result (seq=${seq}, len=${typeof streamMsg.result === 'string' ? 
streamMsg.result.length : 0})`); - this.sessionManager.invalidateSession(convKey); - session = null; - if (!retried) { - return this.processMessage(msg, adapter, true); - } - break; - } - const resultRunState = this.classifyResultRun(convKey, streamMsg); - if (resultRunState === 'stale') { - this.sessionManager.invalidateSession(convKey); - session = null; - if (!retried) { - log.warn(`Retrying message after stale duplicate result (seq=${seq}, key=${convKey})`); - return this.processMessage(msg, adapter, true); - } - response = ''; - break; - } - - sawForegroundResult = true; - - const resultText = typeof streamMsg.result === 'string' ? streamMsg.result : ''; - if (resultText.trim().length > 0) { - const streamedTextTrimmed = streamedAssistantText.trim(); - const resultTextTrimmed = resultText.trim(); - // Decision tree: - // 1) Diverged from streamed output -> prefer streamed text (active fix today) - // 2) No streamed assistant text -> use result text as fallback - // 3) Streamed text exists but nothing was delivered -> allow one result resend - // Compare against all streamed assistant text, not the current - // response buffer (which can be reset between assistant turns). - if (streamedTextTrimmed.length > 0 && resultTextTrimmed !== streamedTextTrimmed) { - log.warn( - `Result text diverges from streamed content ` + - `(resultLen=${resultText.length}, streamLen=${streamedAssistantText.length}). ` + - `Preferring streamed content to avoid n-1 desync.` - ); - } else if (streamedTextTrimmed.length === 0 && streamMsg.success !== false && !streamMsg.error) { - // Fallback for models/providers that only populate result text. - // Skip on error results -- the result field may contain reasoning - // text or other non-deliverable content (e.g. llm_api_error). 
- response = resultText; - } else if (!sentAnyMessage && response.trim().length === 0 && streamMsg.success !== false && !streamMsg.error) { - // Safety fallback: if we streamed text but nothing was - // delivered yet, allow a single result-based resend. - // Skip on error results for the same reason as above. - response = resultText; - } - } - const hasResponse = response.trim().length > 0; - log.info(`Stream result: seq=${seq} success=${streamMsg.success}, hasResponse=${hasResponse}, resultLen=${resultText.length}`); - if (response.trim().length > 0) { - log.debug(`Stream result preview: seq=${seq} responsePreview=${response.trim().slice(0, 60)}`); - } - log.info(`Stream message counts:`, msgTypeCounts); - if (filteredRunEventCount > 0) { - log.info(`Filtered ${filteredRunEventCount} non-foreground event(s) from stream (seq=${seq}, key=${convKey}, expectedRunId=${expectedForegroundRunId || 'unknown'})`); - } - if (ignoredNonForegroundResultCount > 0) { - log.info(`Ignored ${ignoredNonForegroundResultCount} non-foreground result event(s) (seq=${seq}, key=${convKey}, expectedRunId=${expectedForegroundRunId || 'unknown'})`); - } - if (bufferedDisplayFlushCount > 0 || bufferedDisplayDropCount > 0) { - log.info(`Buffered display events: flushed=${bufferedDisplayFlushCount}, dropped=${bufferedDisplayDropCount} (seq=${seq}, key=${convKey}, expectedRunId=${expectedForegroundRunId || 'unknown'})`); - } - if (streamMsg.error) { - const detail = resultText.trim(); - const parts = [`error=${streamMsg.error}`]; - if (streamMsg.stopReason) parts.push(`stopReason=${streamMsg.stopReason}`); - if (streamMsg.durationMs !== undefined) parts.push(`duration=${streamMsg.durationMs}ms`); - if (streamMsg.conversationId) parts.push(`conv=${streamMsg.conversationId}`); - if (detail) parts.push(`detail=${detail.slice(0, 300)}`); - log.error(`Result error: ${parts.join(', ')}`); - } - - // Retry once when stream ends without any assistant text. 
- // This catches both empty-success and terminal-error runs. - // TODO(letta-code-sdk#31): Remove once SDK handles HITL approvals in bypassPermissions mode. - // Only retry if we never sent anything to the user. hasResponse tracks - // the current buffer, but finalizeMessage() clears it on type changes. - // sentAnyMessage is the authoritative "did we deliver output" flag. - const retryConvKey = this.resolveConversationKey(msg.channel, msg.chatId, msg.forcePerChat); - const retryConvIdFromStore = (retryConvKey === 'shared' - ? this.store.conversationId - : this.store.getConversationId(retryConvKey)) ?? undefined; - const retryConvIdRaw = (typeof streamMsg.conversationId === 'string' && streamMsg.conversationId.length > 0) - ? streamMsg.conversationId - : retryConvIdFromStore; - const retryConvId = isRecoverableConversationId(retryConvIdRaw) - ? retryConvIdRaw - : undefined; - if (!retryConvId && retryConvIdRaw) { - log.info(`Skipping approval recovery for non-recoverable conversation id: ${retryConvIdRaw}`); - } - const initialRetryDecision = this.buildResultRetryDecision( - streamMsg, - resultText, - hasResponse, - sentAnyMessage, - lastErrorDetail, - ); - - // Enrich opaque error detail from run metadata (single fast API call). - // The wire protocol's stop_reason often just says "error" -- the run - // metadata has the actual detail (e.g. "waiting for approval on a tool call"). 
- if (initialRetryDecision.isTerminalError && this.store.agentId && - (!lastErrorDetail || lastErrorDetail.message === 'Agent stopped: error')) { - const enriched = await getLatestRunError(this.store.agentId, retryConvId); - if (enriched) { - lastErrorDetail = { - message: enriched.message, - stopReason: enriched.stopReason, - isApprovalError: enriched.isApprovalError, - }; - } - } - - const retryDecision = this.buildResultRetryDecision( - streamMsg, - resultText, - hasResponse, - sentAnyMessage, - lastErrorDetail, - ); - - // For approval-specific conflicts, attempt recovery directly (don't - // enter the generic retry path which would just get another CONFLICT). - // Use isApprovalError from run metadata as a fallback when the - // error message doesn't contain the expected strings (e.g. when - // the type=error event was lost and enrichment detected a stuck run). - if (retryDecision.isApprovalConflict && !retried && this.store.agentId) { - if (retryConvId) { - log.info('Approval conflict detected -- attempting targeted recovery...'); - this.sessionManager.invalidateSession(retryConvKey); - session = null; - clearInterval(typingInterval); - const convResult = await recoverOrphanedConversationApproval( - this.store.agentId, retryConvId, true /* deepScan */ - ); - if (convResult.recovered) { - log.info(`Approval recovery succeeded (${convResult.details}), retrying message...`); - return this.processMessage(msg, adapter, true); - } - log.warn(`Approval recovery failed: ${convResult.details}`); - log.info('Retrying once with a fresh session after approval conflict...'); - return this.processMessage(msg, adapter, true); - } else { - log.info('Approval conflict detected in default/alias conversation -- attempting agent-level recovery...'); - this.sessionManager.invalidateSession(retryConvKey); - session = null; - clearInterval(typingInterval); - const agentResult = await recoverPendingApprovalsForAgent(this.store.agentId); - if (agentResult.recovered) { - 
log.info(`Agent-level recovery succeeded (${agentResult.details}), retrying message...`); - return this.processMessage(msg, adapter, true); - } - log.warn(`Agent-level recovery failed: ${agentResult.details}`); - log.info('Retrying once with a fresh session after approval conflict...'); - return this.processMessage(msg, adapter, true); - } - } - - if (retryDecision.shouldRetryForEmptyResult || retryDecision.shouldRetryForErrorResult) { - if (retryDecision.shouldRetryForEmptyResult) { - log.error(`Warning: Agent returned empty result with no response. stopReason=${streamMsg.stopReason || 'N/A'}, conv=${streamMsg.conversationId || 'N/A'}`); - } - if (retryDecision.shouldRetryForErrorResult) { - log.error(`Warning: Agent returned terminal error (error=${streamMsg.error}, stopReason=${streamMsg.stopReason || 'N/A'}) with no response.`); - } - - if (!retried && this.store.agentId && retryConvId) { - const reason = retryDecision.shouldRetryForErrorResult ? 'error result' : 'empty result'; - log.info(`${reason} - attempting orphaned approval recovery...`); - this.sessionManager.invalidateSession(retryConvKey); - session = null; - clearInterval(typingInterval); - const convResult = await recoverOrphanedConversationApproval( - this.store.agentId, - retryConvId - ); - if (convResult.recovered) { - log.info(`Recovery succeeded (${convResult.details}), retrying message...`); - return this.processMessage(msg, adapter, true); - } - log.warn(`No orphaned approvals found: ${convResult.details}`); - - // Some client-side approval failures do not surface as pending approvals. - // Retry once anyway in case the previous run terminated mid-tool cycle. 
- if (retryDecision.shouldRetryForErrorResult) { - log.info('Retrying once after terminal error (no orphaned approvals detected)...'); - return this.processMessage(msg, adapter, true); - } - } else if (!retried && retryDecision.shouldRetryForErrorResult && !retryConvId) { - log.warn('Skipping terminal-error retry because no recoverable conversation id is available.'); - } - } - - if (retryDecision.isTerminalError && !hasResponse && !sentAnyMessage) { - if (lastErrorDetail) { - response = formatApiErrorForUser(lastErrorDetail); - } else { - const err = streamMsg.error || 'unknown error'; - const reason = streamMsg.stopReason ? ` [${streamMsg.stopReason}]` : ''; - response = `(Agent run failed: ${err}${reason}. Try sending your message again.)`; - } - } - + if (abortedWithMessage) { + log.info(`Stopping stream consumption after explicit abort (seq=${seq}, key=${convKey})`); break; } } @@ -1830,22 +1626,18 @@ export class LettaBot implements AgentSession { return; } - const missingForegroundTerminalResult = - expectedForegroundRunId !== null && - !sawForegroundResult && - sawCompetingRunEvent && - !sentAnyMessage; - - if (missingForegroundTerminalResult) { - log.warn(`Foreground run ended without terminal result after competing run activity (seq=${seq}, key=${convKey}, expectedRunId=${expectedForegroundRunId})`); + // If pipeline ended without a complete event, something went wrong. + // Exception: if we aborted with an explicit message (tool loop, bash failure), + // just deliver that message. + if (!sawForegroundResult && !sentAnyMessage && !abortedWithMessage) { + log.warn(`Stream ended without result (seq=${seq}, key=${convKey})`); this.sessionManager.invalidateSession(convKey); session = null; response = ''; - reasoningBuffer = ''; if (!retried) { return this.processMessage(msg, adapter, true); } - response = '(The agent stream ended before a foreground result was received. Please try again.)'; + response = '(The agent stream ended before a result was received. 
Please try again.)'; } // Parse and execute XML directives (e.g. ) @@ -1871,10 +1663,9 @@ export class LettaBot implements AgentSession { lap('directives done'); // Send final response if (response.trim()) { - // Wait out any active rate limit before sending the final message const rateLimitRemaining = rateLimitedUntil - Date.now(); if (rateLimitRemaining > 0) { - const waitMs = Math.min(rateLimitRemaining, 30_000); // Cap at 30s + const waitMs = Math.min(rateLimitRemaining, 30_000); log.info(`Waiting ${(waitMs / 1000).toFixed(1)}s for rate limit before final send`); await new Promise(resolve => setTimeout(resolve, waitMs)); } @@ -1888,7 +1679,6 @@ export class LettaBot implements AgentSession { sentAnyMessage = true; this.store.resetRecoveryAttempts(); } catch (sendErr) { - // Edit failed -- send as new message so user isn't left with truncated text log.warn('Final message delivery failed:', sendErr instanceof Error ? sendErr.message : sendErr); try { await adapter.sendMessage({ chatId: msg.chatId, text: prefixedFinal, threadId: msg.threadId }); @@ -1899,7 +1689,7 @@ export class LettaBot implements AgentSession { } } } - + lap('message delivered'); await this.deliverNoVisibleResponseIfNeeded(msg, adapter, sentAnyMessage, receivedAnyData, msgTypeCounts); diff --git a/src/core/display-pipeline.test.ts b/src/core/display-pipeline.test.ts new file mode 100644 index 0000000..2b351ba --- /dev/null +++ b/src/core/display-pipeline.test.ts @@ -0,0 +1,211 @@ +import { describe, it, expect } from 'vitest'; +import { createDisplayPipeline, type DisplayEvent } from './display-pipeline.js'; +import type { StreamMsg } from './types.js'; + +/** Helper: collect all DisplayEvents from a pipeline fed with the given messages. 
*/ +async function collect( + messages: StreamMsg[], + convKey = 'test', +): Promise { + async function* feed(): AsyncIterable { + for (const msg of messages) yield msg; + } + const events: DisplayEvent[] = []; + for await (const evt of createDisplayPipeline(feed(), { + convKey, + resultFingerprints: new Map(), + })) { + events.push(evt); + } + return events; +} + +describe('createDisplayPipeline', () => { + it('locks foreground on first reasoning event and yields immediately', async () => { + const events = await collect([ + { type: 'reasoning', content: 'thinking...', runId: 'run-1' }, + { type: 'assistant', content: 'reply', runId: 'run-1' }, + { type: 'result', success: true, result: 'reply', runIds: ['run-1'] }, + ]); + + const types = events.map(e => e.type); + // Reasoning should appear BEFORE text -- no buffering + expect(types[0]).toBe('reasoning'); + expect(types[1]).toBe('text'); + expect(types[2]).toBe('complete'); + }); + + it('locks foreground on first tool_call event', async () => { + const events = await collect([ + { type: 'tool_call', toolCallId: 'tc-1', toolName: 'Bash', toolInput: { command: 'echo hi' }, runId: 'run-1' }, + { type: 'assistant', content: 'done', runId: 'run-1' }, + { type: 'result', success: true, result: 'done', runIds: ['run-1'] }, + ]); + + expect(events[0].type).toBe('tool_call'); + expect(events[1].type).toBe('text'); + expect(events[2].type).toBe('complete'); + }); + + it('filters pre-foreground error events to prevent false retry triggers', async () => { + const events = await collect([ + { type: 'error', runId: 'run-bg', message: 'conflict waiting for approval', stopReason: 'error' }, + { type: 'result', success: false, error: 'error', runIds: ['run-main'] }, + ]); + + // Pre-foreground error is filtered (not yielded). Only the result passes through. 
+ const errorEvt = events.find(e => e.type === 'error'); + const completeEvt = events.find(e => e.type === 'complete'); + expect(errorEvt).toBeUndefined(); + expect(completeEvt).toBeDefined(); + if (completeEvt?.type === 'complete') { + expect(completeEvt.runIds).toContain('run-main'); + } + }); + + it('rebinds foreground on assistant event with new run ID', async () => { + const events = await collect([ + { type: 'assistant', content: 'before tool ', runId: 'run-1' }, + { type: 'tool_call', toolCallId: 'tc-1', toolName: 'Bash', toolInput: {}, runId: 'run-1' }, + { type: 'assistant', content: 'after tool', runId: 'run-2' }, + { type: 'result', success: true, result: 'before tool after tool', runIds: ['run-2'] }, + ]); + + const textEvents = events.filter(e => e.type === 'text'); + // Both assistant events should pass through (rebind on run-2) + expect(textEvents.length).toBe(2); + expect(events.find(e => e.type === 'complete')).toBeDefined(); + }); + + it('filters non-foreground events after lock', async () => { + const events = await collect([ + { type: 'reasoning', content: 'foreground thinking', runId: 'run-1' }, + { type: 'reasoning', content: 'background noise', runId: 'run-2' }, + { type: 'assistant', content: 'reply', runId: 'run-1' }, + { type: 'result', success: true, result: 'reply', runIds: ['run-1'] }, + ]); + + const reasoningEvents = events.filter(e => e.type === 'reasoning'); + // Only foreground reasoning should appear (run-2 filtered after lock to run-1) + expect(reasoningEvents.length).toBe(1); + if (reasoningEvents[0].type === 'reasoning') { + expect(reasoningEvents[0].content).toBe('foreground thinking'); + } + }); + + it('accumulates reasoning chunks and flushes on type change', async () => { + const events = await collect([ + { type: 'reasoning', content: 'part 1 ', runId: 'run-1' }, + { type: 'reasoning', content: 'part 2', runId: 'run-1' }, + { type: 'assistant', content: 'reply', runId: 'run-1' }, + { type: 'result', success: true, result: 
'reply', runIds: ['run-1'] }, + ]); + + const reasoningEvents = events.filter(e => e.type === 'reasoning'); + // Multiple reasoning chunks should be accumulated into one event + expect(reasoningEvents.length).toBe(1); + if (reasoningEvents[0].type === 'reasoning') { + expect(reasoningEvents[0].content).toBe('part 1 part 2'); + } + }); + + it('prefers streamed text over result field on divergence', async () => { + const events = await collect([ + { type: 'assistant', content: 'streamed reply', runId: 'run-1' }, + { type: 'result', success: true, result: 'result field reply', runIds: ['run-1'] }, + ]); + + const complete = events.find(e => e.type === 'complete'); + expect(complete).toBeDefined(); + if (complete?.type === 'complete') { + expect(complete.text).toBe('streamed reply'); + } + }); + + it('falls back to result field when no streamed text', async () => { + const events = await collect([ + { type: 'result', success: true, result: 'result only', runIds: ['run-1'] }, + ]); + + const complete = events.find(e => e.type === 'complete'); + expect(complete).toBeDefined(); + if (complete?.type === 'complete') { + expect(complete.text).toBe('result only'); + } + }); + + it('detects stale duplicate results by run fingerprint', async () => { + const fingerprints = new Map(); + + // First call -- fresh + const events1 = await (async () => { + async function* feed(): AsyncIterable { + yield { type: 'result', success: true, result: 'first', runIds: ['run-1'] }; + } + const events: DisplayEvent[] = []; + for await (const evt of createDisplayPipeline(feed(), { convKey: 'test', resultFingerprints: fingerprints })) { + events.push(evt); + } + return events; + })(); + + // Second call with same runIds -- stale + const events2 = await (async () => { + async function* feed(): AsyncIterable { + yield { type: 'result', success: true, result: 'second', runIds: ['run-1'] }; + } + const events: DisplayEvent[] = []; + for await (const evt of createDisplayPipeline(feed(), { convKey: 
'test', resultFingerprints: fingerprints })) { + events.push(evt); + } + return events; + })(); + + const c1 = events1.find(e => e.type === 'complete'); + const c2 = events2.find(e => e.type === 'complete'); + expect(c1?.type === 'complete' && c1.stale).toBe(false); + expect(c2?.type === 'complete' && c2.stale).toBe(true); + }); + + it('marks cancelled results', async () => { + const events = await collect([ + { type: 'result', success: true, result: '', stopReason: 'cancelled', runIds: ['run-1'] }, + ]); + + const complete = events.find(e => e.type === 'complete'); + expect(complete).toBeDefined(); + if (complete?.type === 'complete') { + expect(complete.cancelled).toBe(true); + } + }); + + it('skips stream_event types', async () => { + const events = await collect([ + { type: 'stream_event', content: 'partial delta' }, + { type: 'assistant', content: 'reply', runId: 'run-1' }, + { type: 'result', success: true, result: 'reply', runIds: ['run-1'] }, + ]); + + // stream_event never reaches the output -- only text + complete + expect(events.length).toBe(2); + expect(events[0].type).toBe('text'); + expect(events[1].type).toBe('complete'); + }); + + it('yields tool_result events', async () => { + const events = await collect([ + { type: 'tool_call', toolCallId: 'tc-1', toolName: 'Bash', toolInput: {}, runId: 'run-1' }, + { type: 'tool_result', toolCallId: 'tc-1', content: 'ok', isError: false, runId: 'run-1' }, + { type: 'assistant', content: 'done', runId: 'run-1' }, + { type: 'result', success: true, result: 'done', runIds: ['run-1'] }, + ]); + + const toolResult = events.find(e => e.type === 'tool_result'); + expect(toolResult).toBeDefined(); + if (toolResult?.type === 'tool_result') { + expect(toolResult.toolCallId).toBe('tc-1'); + expect(toolResult.content).toBe('ok'); + expect(toolResult.isError).toBe(false); + } + }); +}); diff --git a/src/core/display-pipeline.ts b/src/core/display-pipeline.ts new file mode 100644 index 0000000..4f03d09 --- /dev/null +++ 
b/src/core/display-pipeline.ts @@ -0,0 +1,360 @@ +/** + * DisplayPipeline — transforms raw SDK stream events into clean, + * high-level display events for channel delivery. + * + * Encapsulates: + * - Run ID filtering (foreground tracking, rebinding) + * - Reasoning chunk accumulation (flushed on type transitions) + * - stream_event skipping + * - Type transition tracking + * - Result text selection (streamed vs result field) + * - Stale/cancelled result classification + */ + +import type { StreamMsg } from './types.js'; +import { createLogger } from '../logger.js'; + +const log = createLogger('DisplayPipeline'); + +// ─── Display event types ──────────────────────────────────────────────────── + +export interface ReasoningEvent { + type: 'reasoning'; + /** Complete accumulated reasoning block. */ + content: string; +} + +export interface ToolCallEvent { + type: 'tool_call'; + name: string; + args: Record; + id: string; + /** The raw StreamMsg for consumers that need extra fields. */ + raw: StreamMsg; +} + +export interface ToolResultEvent { + type: 'tool_result'; + toolCallId: string; + content: string; + isError: boolean; + raw: StreamMsg; +} + +export interface TextEvent { + type: 'text'; + /** Full accumulated assistant text for this turn. */ + content: string; + /** Just this chunk's addition. */ + delta: string; + /** Assistant message UUID (changes on multi-turn responses). */ + uuid: string; +} + +export interface CompleteEvent { + type: 'complete'; + /** Final response text (after streamed-vs-result selection). */ + text: string; + success: boolean; + error?: string; + stopReason?: string; + conversationId?: string; + runIds: string[]; + durationMs?: number; + /** True if this is a stale duplicate result (same run fingerprint as last time). */ + stale: boolean; + /** True if this result came from a cancelled run (should be discarded + retried). */ + cancelled: boolean; + /** Whether any assistant text was accumulated during streaming. 
*/ + hadStreamedText: boolean; + /** The raw StreamMsg for consumers that need extra fields. */ + raw: StreamMsg; +} + +export interface ErrorEvent { + type: 'error'; + message: string; + stopReason?: string; + apiError?: Record; + runId?: string; +} + +export interface RetryEvent { + type: 'retry'; + attempt: number; + maxAttempts: number; + reason: string; + delayMs?: number; +} + +export type DisplayEvent = + | ReasoningEvent + | ToolCallEvent + | ToolResultEvent + | TextEvent + | CompleteEvent + | ErrorEvent + | RetryEvent; + +// ─── Run fingerprinting (stale detection) ─────────────────────────────────── + +function classifyResult( + convKey: string, + runIds: string[], + fingerprints: Map, +): 'fresh' | 'stale' | 'unknown' { + if (runIds.length === 0) return 'unknown'; + const fingerprint = [...new Set(runIds)].sort().join(','); + const previous = fingerprints.get(convKey); + if (previous === fingerprint) { + log.warn(`Stale duplicate result detected (key=${convKey}, runIds=${fingerprint})`); + return 'stale'; + } + fingerprints.set(convKey, fingerprint); + return 'fresh'; +} + +// ─── Helpers ──────────────────────────────────────────────────────────────── + +function extractRunIds(msg: StreamMsg): string[] { + const ids: string[] = []; + const rawId = (msg as StreamMsg & { runId?: unknown; run_id?: unknown }).runId + ?? (msg as StreamMsg & { run_id?: unknown }).run_id; + if (typeof rawId === 'string' && rawId.trim()) ids.push(rawId.trim()); + + const rawIds = (msg as StreamMsg & { runIds?: unknown; run_ids?: unknown }).runIds + ?? (msg as StreamMsg & { run_ids?: unknown }).run_ids; + if (Array.isArray(rawIds)) { + for (const id of rawIds) { + if (typeof id === 'string' && id.trim()) ids.push(id.trim()); + } + } + return ids.length > 0 ? [...new Set(ids)] : []; +} + +// ─── Pipeline ─────────────────────────────────────────────────────────────── + +export interface DisplayPipelineOptions { + /** Conversation key for stale-result detection. 
*/ + convKey: string; + /** Shared fingerprint map for stale-result detection (instance-level, not module-level). */ + resultFingerprints: Map; +} + +/** + * Wraps an SDK stream (already deduped by session-manager) and yields + * clean DisplayEvents. All run-ID filtering, reasoning accumulation, + * and result classification happens inside. + */ +export async function* createDisplayPipeline( + stream: AsyncIterable, + opts: DisplayPipelineOptions, +): AsyncGenerator { + const { convKey, resultFingerprints } = opts; + + // ── Foreground run tracking ── + let foregroundRunId: string | null = null; + let foregroundSource: string | null = null; + + // ── Reasoning accumulation ── + let reasoningBuffer = ''; + + // ── Assistant text accumulation ── + let assistantText = ''; + let lastAssistantUuid: string | null = null; + let lastSemanticType: string | null = null; + + // ── All run IDs seen (for result) ── + const allRunIds = new Set(); + + // ── Stats ── + let filteredCount = 0; + + // ── Helpers ── + function* flushReasoning(): Generator { + if (reasoningBuffer.trim()) { + yield { type: 'reasoning', content: reasoningBuffer }; + reasoningBuffer = ''; + } + } + + // ── Main loop ── + for await (const msg of stream) { + const eventRunIds = extractRunIds(msg); + for (const id of eventRunIds) allRunIds.add(id); + + // Skip stream_event (low-level deltas, not semantic) + if (msg.type === 'stream_event') continue; + + log.trace(`raw: type=${msg.type} runIds=${eventRunIds.join(',') || 'none'} fg=${foregroundRunId || 'unlocked'}`); + + // ── Run ID filtering ── + // Lock types: substantive events that prove this run is the foreground turn. + // Error/retry are excluded -- they're transient signals that could come + // from a failed run before the real foreground starts. 
+ const isLockType = msg.type === 'reasoning' || msg.type === 'tool_call' + || msg.type === 'tool_result' || msg.type === 'assistant' || msg.type === 'result'; + + if (foregroundRunId === null && eventRunIds.length > 0 && isLockType) { + // Lock foreground on the first substantive event with a run ID. + // Background Tasks use separate sessions and cannot produce events in + // this stream, so the first run-scoped event is always from the current + // turn's run. This eliminates buffering delay -- reasoning and tool calls + // display immediately instead of waiting for the first assistant event. + foregroundRunId = eventRunIds[0]; + foregroundSource = msg.type; + log.info(`Foreground run locked: ${foregroundRunId} (source=${foregroundSource})`); + // Fall through to type transitions and dispatch for immediate processing. + } else if (foregroundRunId === null && eventRunIds.length > 0 && !isLockType) { + // Pre-foreground error/retry events are filtered. If passed through, + // they set lastErrorDetail in the consumer and can spuriously trigger + // approval recovery or suppress legitimate retries. + filteredCount++; + continue; + } else if (foregroundRunId && eventRunIds.length > 0 && !eventRunIds.includes(foregroundRunId)) { + // Event from a different run. Rebind on assistant events only + // (background Tasks don't produce assistant events in the foreground stream). + if (msg.type === 'assistant') { + const newRunId = eventRunIds[0]; + log.info(`Foreground run rebind: ${foregroundRunId} -> ${newRunId}`); + foregroundRunId = newRunId; + foregroundSource = 'assistant'; + } else { + filteredCount++; + continue; + } + } + + // ── Type transitions ── + // (stream_event is already `continue`d above, so all events here are semantic.) 
+ if (lastSemanticType && lastSemanticType !== msg.type) { + if (lastSemanticType === 'reasoning') { + yield* flushReasoning(); + } + } + lastSemanticType = msg.type; + + // ── Dispatch by type ── + switch (msg.type) { + case 'reasoning': { + reasoningBuffer += msg.content || ''; + break; + } + + case 'tool_call': { + yield { + type: 'tool_call', + name: msg.toolName || 'unknown', + args: (msg.toolInput && typeof msg.toolInput === 'object' ? msg.toolInput : {}) as Record, + id: msg.toolCallId || '', + raw: msg, + }; + break; + } + + case 'tool_result': { + yield { + type: 'tool_result', + toolCallId: msg.toolCallId || '', + content: typeof (msg as any).content === 'string' + ? (msg as any).content + : typeof (msg as any).result === 'string' + ? (msg as any).result + : '', + isError: !!msg.isError, + raw: msg, + }; + break; + } + + case 'assistant': { + const delta = msg.content || ''; + const uuid = msg.uuid || ''; + lastAssistantUuid = uuid || lastAssistantUuid; + + assistantText += delta; + yield { + type: 'text', + content: assistantText, + delta, + uuid: lastAssistantUuid || '', + }; + break; + } + + case 'result': { + // Flush any remaining reasoning + yield* flushReasoning(); + + const resultText = typeof msg.result === 'string' ? 
msg.result : ''; + const streamedTrimmed = assistantText.trim(); + const resultTrimmed = resultText.trim(); + const runIds = extractRunIds(msg); + + // Result text selection: prefer streamed text over result field + let finalText = assistantText; + if (streamedTrimmed.length > 0 && resultTrimmed !== streamedTrimmed) { + // Diverged — prefer streamed (avoid n-1 desync) + log.warn(`Result diverges from streamed (resultLen=${resultText.length}, streamLen=${assistantText.length}), preferring streamed`); + } else if (streamedTrimmed.length === 0 && msg.success !== false && !msg.error) { + // No streamed text — use result as fallback + finalText = resultText; + } + + // Classify + const cancelled = (msg as any).stopReason === 'cancelled'; + const staleState = classifyResult(convKey, runIds.length > 0 ? runIds : [...allRunIds], resultFingerprints); + const stale = staleState === 'stale'; + + if (filteredCount > 0) { + log.info(`Filtered ${filteredCount} non-foreground event(s) (key=${convKey})`); + } + + yield { + type: 'complete', + text: finalText, + success: msg.success !== false, + error: typeof msg.error === 'string' ? msg.error : undefined, + stopReason: typeof (msg as any).stopReason === 'string' ? (msg as any).stopReason : undefined, + conversationId: typeof (msg as any).conversationId === 'string' ? (msg as any).conversationId : undefined, + runIds: runIds.length > 0 ? runIds : [...allRunIds], + durationMs: typeof (msg as any).durationMs === 'number' ? (msg as any).durationMs : undefined, + stale, + cancelled, + hadStreamedText: streamedTrimmed.length > 0, + raw: msg, + }; + break; + } + + case 'error': { + yield { + type: 'error', + message: (msg as any).message || 'unknown', + stopReason: (msg as any).stopReason, + apiError: (msg as any).apiError, + runId: (msg as any).runId, + }; + break; + } + + case 'retry': { + yield { + type: 'retry', + attempt: (msg as any).attempt ?? 0, + maxAttempts: (msg as any).maxAttempts ?? 
0, + reason: (msg as any).reason || 'unknown', + delayMs: (msg as any).delayMs, + }; + break; + } + + default: + // Unhandled event types — skip + break; + } + } + + // Flush any trailing reasoning that wasn't followed by a type change + yield* flushReasoning(); +} diff --git a/src/core/result-guard.test.ts b/src/core/result-guard.test.ts index 52a97ea..015d33e 100644 --- a/src/core/result-guard.test.ts +++ b/src/core/result-guard.test.ts @@ -154,6 +154,61 @@ describe('result divergence guard', () => { expect(sentTexts.some(text => text.includes('repeated CLI command failures'))).toBe(true); }); + it('stops consuming stream and avoids retry after explicit tool-loop abort', async () => { + const bot = new LettaBot({ + workingDir: workDir, + allowedTools: [], + maxToolCalls: 1, + }); + + const adapter = { + id: 'mock', + name: 'Mock', + start: vi.fn(async () => {}), + stop: vi.fn(async () => {}), + isRunning: vi.fn(() => true), + sendMessage: vi.fn(async (_msg: OutboundMessage) => ({ messageId: 'msg-1' })), + editMessage: vi.fn(async () => {}), + sendTypingIndicator: vi.fn(async () => {}), + stopTypingIndicator: vi.fn(async () => {}), + supportsEditing: vi.fn(() => false), + sendFile: vi.fn(async () => ({ messageId: 'file-1' })), + }; + + const runSession = vi.fn(); + runSession.mockResolvedValueOnce({ + session: { abort: vi.fn(async () => {}) }, + stream: async function* () { + yield { type: 'tool_call', toolCallId: 'tc-1', toolName: 'Bash', toolInput: { command: 'echo hi' } }; + // These trailing events should be ignored because the run was already aborted. 
+ yield { type: 'assistant', content: 'late assistant text' }; + yield { type: 'result', success: false, error: 'error', stopReason: 'cancelled', result: '' }; + }, + }); + runSession.mockResolvedValueOnce({ + session: { abort: vi.fn(async () => {}) }, + stream: async function* () { + yield { type: 'assistant', content: 'retried response' }; + yield { type: 'result', success: true, result: 'retried response' }; + }, + }); + (bot as any).sessionManager.runSession = runSession; + + const msg: InboundMessage = { + channel: 'discord', + chatId: 'chat-1', + userId: 'user-1', + text: 'hello', + timestamp: new Date(), + }; + + await (bot as any).processMessage(msg, adapter); + + expect(runSession).toHaveBeenCalledTimes(1); + const sentTexts = adapter.sendMessage.mock.calls.map(([payload]) => payload.text); + expect(sentTexts).toEqual(['(Agent got stuck in a tool loop and was stopped. Try sending your message again.)']); + }); + it('does not deliver reasoning text from error results as the response', async () => { const bot = new LettaBot({ workingDir: workDir, @@ -255,7 +310,7 @@ describe('result divergence guard', () => { expect(sentTexts).toEqual(['Before tool. ', 'After tool.']); }); - it('buffers pre-foreground run-scoped display events and drops non-foreground buffers', async () => { + it('locks foreground on first event with run ID and displays immediately', async () => { const bot = new LettaBot({ workingDir: workDir, allowedTools: [], @@ -276,11 +331,14 @@ describe('result divergence guard', () => { sendFile: vi.fn(async () => ({ messageId: 'file-1' })), }; + // Reasoning and tool_call arrive before any assistant event. The pipeline + // locks foreground on the first event with a run ID (the reasoning event) + // and processes everything immediately -- no buffering. 
(bot as any).sessionManager.runSession = vi.fn(async () => ({ session: { abort: vi.fn(async () => {}) }, stream: async function* () { - yield { type: 'reasoning', content: 'background-thinking', runId: 'run-bg' }; - yield { type: 'tool_call', toolCallId: 'tc-bg', toolName: 'Bash', toolInput: { command: 'echo leak' }, runId: 'run-bg' }; + yield { type: 'reasoning', content: 'pre-tool thinking', runId: 'run-tool' }; + yield { type: 'tool_call', toolCallId: 'tc-1', toolName: 'Bash', toolInput: { command: 'echo hi' }, runId: 'run-tool' }; yield { type: 'assistant', content: 'main reply', runId: 'run-main' }; yield { type: 'result', success: true, result: 'main reply', runIds: ['run-main'] }; }, @@ -297,7 +355,9 @@ describe('result divergence guard', () => { await (bot as any).processMessage(msg, adapter); const sentTexts = adapter.sendMessage.mock.calls.map(([payload]) => payload.text); - expect(sentTexts).toEqual(['main reply']); + // Reasoning display + tool call display + main reply -- all immediate, no buffering + expect(sentTexts.length).toBe(3); + expect(sentTexts[2]).toBe('main reply'); }); it('retries once when a competing result arrives before any foreground terminal result', async () => { diff --git a/src/core/sdk-session-contract.test.ts b/src/core/sdk-session-contract.test.ts index 9d1e8e9..55b260f 100644 --- a/src/core/sdk-session-contract.test.ts +++ b/src/core/sdk-session-contract.test.ts @@ -1100,6 +1100,81 @@ describe('SDK session contract', () => { expect(sentTexts).toContain('after retry'); }); + it('filters pre-foreground errors so they do not trigger false approval recovery', async () => { + const bot = new LettaBot({ + workingDir: join(dataDir, 'working'), + allowedTools: [], + }); + + let runCall = 0; + (bot as any).sessionManager.runSession = vi.fn(async () => ({ + session: { abort: vi.fn(async () => undefined) }, + stream: async function* () { + if (runCall++ === 0) { + // Pre-foreground error is filtered by the pipeline -- it never + // 
reaches processMessage, so lastErrorDetail stays null and + // isApprovalConflict cannot fire. + yield { + type: 'error', + runId: 'run-bg', + message: 'CONFLICT: Cannot send a new message: waiting for approval', + stopReason: 'error', + }; + yield { type: 'result', success: false, error: 'error', conversationId: 'conv-approval', runIds: ['run-main'] }; + return; + } + // Retry succeeds + yield { type: 'assistant', content: 'after retry' }; + yield { type: 'result', success: true, result: 'after retry', conversationId: 'conv-approval', runIds: ['run-main-2'] }; + }, + })); + + vi.mocked(recoverOrphanedConversationApproval).mockResolvedValueOnce({ + recovered: false, + details: 'No unresolved approval requests found', + }); + + const adapter = { + id: 'mock', + name: 'Mock', + start: vi.fn(async () => {}), + stop: vi.fn(async () => {}), + isRunning: vi.fn(() => true), + sendMessage: vi.fn(async (_payload: unknown) => ({ messageId: 'msg-1' })), + editMessage: vi.fn(async () => {}), + sendTypingIndicator: vi.fn(async () => {}), + stopTypingIndicator: vi.fn(async () => {}), + supportsEditing: vi.fn(() => false), + sendFile: vi.fn(async () => ({ messageId: 'file-1' })), + }; + + const msg = { + channel: 'discord', + chatId: 'chat-1', + userId: 'user-1', + text: 'hello', + timestamp: new Date(), + }; + + await (bot as any).processMessage(msg, adapter); + + // The pre-foreground error is filtered, so lastErrorDetail is null. + // The result (success=false, nothing delivered) triggers shouldRetryForErrorResult, + // NOT isApprovalConflict. The retry goes through the error-result path with + // orphaned approval recovery, then retries and succeeds. 
+ expect((bot as any).sessionManager.runSession).toHaveBeenCalledTimes(2); + // Approval recovery should have been attempted via the error-result path + expect(recoverOrphanedConversationApproval).toHaveBeenCalledWith( + 'agent-contract-test', + 'conv-approval', + ); + const sentTexts = adapter.sendMessage.mock.calls.map((call) => { + const payload = call[0] as { text?: string }; + return payload.text; + }); + expect(sentTexts).toContain('after retry'); + }); + it('uses agent-level recovery for default conversation alias on terminal approval conflict', async () => { const bot = new LettaBot({ workingDir: join(dataDir, 'working'), diff --git a/src/tools/letta-api.ts b/src/tools/letta-api.ts index 8a872af..02dd9b9 100644 --- a/src/tools/letta-api.ts +++ b/src/tools/letta-api.ts @@ -542,6 +542,12 @@ export async function rejectApproval( log.warn(`Approval already resolved for tool call ${approval.toolCallId}`); return true; } + // Re-throw rate limit errors so callers can bail out early instead of + // hammering the API in a tight loop. + if (err?.status === 429) { + log.error('Failed to reject approval:', e); + throw e; + } log.error('Failed to reject approval:', e); return false; }