fix: display-reasoning stream fixes and Signal compatibility (#355)

This commit is contained in:
Cameron
2026-02-23 15:00:17 -08:00
committed by GitHub
parent cae5b104b3
commit 7028f042af
5 changed files with 314 additions and 46 deletions

View File

@@ -497,6 +497,39 @@ LETTABOT_MEMFS=true npm start
For more details, see the [Letta Code memory documentation](https://docs.letta.com/letta-code/memory/) and the [Context Repositories blog post](https://www.letta.com/blog/context-repositories).
### Display Tool Calls and Reasoning
Optionally show "what the agent is doing" messages — tool call summaries and model reasoning — directly in channel output.
```yaml
features:
display:
showToolCalls: true
showReasoning: false
reasoningMaxChars: 1200
```
In multi-agent configs, set this per agent:
```yaml
agents:
- name: work-assistant
features:
display:
showToolCalls: true
```
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `features.display.showToolCalls` | boolean | `false` | Show tool invocation summaries in chat output |
| `features.display.showReasoning` | boolean | `false` | Show model reasoning/thinking text in chat output |
| `features.display.reasoningMaxChars` | number | `0` | Truncate reasoning to N chars (`0` = no limit) |
Notes:
- Tool call display filters out empty/null input fields and shows the final args for the tool call.
- Reasoning display uses plain bold/italic markdown for better cross-channel compatibility (including Signal).
- Display messages are informational; they do not replace the assistant response. Normal retry/error handling still applies if no assistant reply is produced.
### No-Reply (Opt-Out)
The agent can choose not to respond to a message by sending exactly:

View File

@@ -150,6 +150,12 @@ curl http://localhost:8283/v1/health
If the bot hangs after sending a message:
**What LettaBot already does automatically**
- For terminal runs with no assistant output, LettaBot attempts one recovery/retry cycle.
- If the failure is an approval-related conflict, LettaBot scans the same conversation for unresolved approvals, auto-denies orphaned ones, cancels active runs for that same conversation, then retries once.
- For generic `409 conflict` ("another request is currently being processed"), LettaBot does not blindly retry; it returns a clear "wait and try again" message.
**1. Check for pending tool approvals**
Some tools may have `requires_approval: true` set. LettaBot disables these on startup, but check:

View File

@@ -10,7 +10,7 @@ import type { ChannelAdapter } from '../channels/types.js';
import type { BotConfig, InboundMessage, TriggerContext } from './types.js';
import type { AgentSession } from './interfaces.js';
import { Store } from './store.js';
import { updateAgentName, getPendingApprovals, rejectApproval, cancelRuns, recoverOrphanedConversationApproval } from '../tools/letta-api.js';
import { updateAgentName, getPendingApprovals, rejectApproval, cancelRuns, recoverOrphanedConversationApproval, getLatestRunError } from '../tools/letta-api.js';
import { installSkillsToAgent } from '../skills/loader.js';
import { formatMessageEnvelope, formatGroupBatchEnvelope, type SessionContextOptions } from './formatter.js';
import type { GroupBatcher } from './group-batcher.js';
@@ -72,6 +72,11 @@ function formatApiErrorForUser(error: { message: string; stopReason: string; api
return `(Rate limited${reasonStr}. Try again in a moment.)`;
}
// 409 CONFLICT (concurrent request on same conversation)
if (msg.includes('conflict') || msg.includes('409') || msg.includes('another request is currently being processed')) {
return '(Another request is still processing on this conversation. Wait a moment and try again.)';
}
// Authentication
if (msg.includes('401') || msg.includes('403') || msg.includes('unauthorized') || msg.includes('forbidden')) {
return '(Authentication failed -- check your API key in lettabot.yaml.)';
@@ -218,7 +223,7 @@ export class LettaBot implements AgentSession {
private formatToolCallDisplay(streamMsg: StreamMsg): string {
const name = streamMsg.toolName || 'unknown';
const params = this.abbreviateToolInput(streamMsg);
return params ? `> **Tool:** ${name} (${params})` : `> **Tool:** ${name}`;
return params ? `**Tool:** ${name} (${params})` : `**Tool:** ${name}`;
}
/**
@@ -227,7 +232,8 @@ export class LettaBot implements AgentSession {
private abbreviateToolInput(streamMsg: StreamMsg): string {
const input = streamMsg.toolInput as Record<string, unknown> | undefined;
if (!input || typeof input !== 'object') return '';
const entries = Object.entries(input).slice(0, 2);
// Filter out undefined/null values (SDK yields {raw: undefined} for partial chunks)
const entries = Object.entries(input).filter(([, v]) => v != null).slice(0, 2);
return entries
.map(([k, v]) => {
let str: string;
@@ -250,9 +256,9 @@ export class LettaBot implements AgentSession {
const truncated = maxChars > 0 && text.length > maxChars
? text.slice(0, maxChars) + '...'
: text;
// Prefix every line with "> " so the whole block renders as a blockquote
const lines = truncated.split('\n').map(line => `> ${line}`);
return `> **Thinking**\n${lines.join('\n')}`;
// Use italic for reasoning -- works across all channels including Signal
// (Signal only supports bold/italic/code, no blockquotes)
return `**Thinking**\n_${truncated}_`;
}
// =========================================================================
@@ -1117,6 +1123,9 @@ export class LettaBot implements AgentSession {
let lastErrorDetail: { message: string; stopReason: string; apiError?: Record<string, unknown> } | null = null;
let retryInfo: { attempt: number; maxAttempts: number; reason: string } | null = null;
let reasoningBuffer = '';
// Buffer the latest tool_call by ID so we display it once with full args
// (the SDK streams multiple tool_call messages per call -- first has empty input).
let pendingToolDisplay: { toolCallId: string; msg: any } | null = null;
const msgTypeCounts: Record<string, number> = {};
const finalizeMessage = async () => {
@@ -1171,24 +1180,47 @@ export class LettaBot implements AgentSession {
const preview = JSON.stringify(streamMsg).slice(0, 300);
console.log(`[Stream] type=${streamMsg.type} ${preview}`);
// stream_event is a low-level streaming primitive (partial deltas), not a
// semantic type change. Skip it for type-transition logic so it doesn't
// prematurely flush reasoning buffers or finalize assistant messages.
const isSemanticType = streamMsg.type !== 'stream_event';
// Finalize on type change (avoid double-handling when result provides full response)
if (lastMsgType && lastMsgType !== streamMsg.type && response.trim() && streamMsg.type !== 'result') {
if (isSemanticType && lastMsgType && lastMsgType !== streamMsg.type && response.trim() && streamMsg.type !== 'result') {
await finalizeMessage();
}
// Flush reasoning buffer when type changes away from reasoning
if (lastMsgType === 'reasoning' && streamMsg.type !== 'reasoning' && reasoningBuffer.trim()) {
if (isSemanticType && lastMsgType === 'reasoning' && streamMsg.type !== 'reasoning' && reasoningBuffer.trim()) {
if (this.config.display?.showReasoning && !suppressDelivery) {
try {
const text = this.formatReasoningDisplay(reasoningBuffer);
await adapter.sendMessage({ chatId: msg.chatId, text, threadId: msg.threadId });
sentAnyMessage = true;
// Note: display messages don't set sentAnyMessage -- they're informational,
// not a substitute for an assistant response. Error handling and retry must
// still fire even if reasoning was displayed.
} catch (err) {
console.warn('[Bot] Failed to send reasoning display:', err instanceof Error ? err.message : err);
}
}
reasoningBuffer = '';
}
// Flush pending tool call display when type changes away from tool_call.
// The SDK streams multiple tool_call messages per call (first has empty args),
// so we buffer and display the last one which has the complete input.
if (isSemanticType && pendingToolDisplay && streamMsg.type !== 'tool_call') {
if (this.config.display?.showToolCalls && !suppressDelivery) {
try {
const text = this.formatToolCallDisplay(pendingToolDisplay.msg);
await adapter.sendMessage({ chatId: msg.chatId, text, threadId: msg.threadId });
// Display messages don't set sentAnyMessage (see reasoning display comment).
} catch (err) {
console.warn('[Bot] Failed to send tool call display:', err instanceof Error ? err.message : err);
}
}
pendingToolDisplay = null;
}
// Tool loop detection
const maxToolCalls = this.config.maxToolCalls ?? 100;
@@ -1202,18 +1234,13 @@ export class LettaBot implements AgentSession {
// Log meaningful events with structured summaries
if (streamMsg.type === 'tool_call') {
this.syncTodoToolCall(streamMsg);
console.log(`[Stream] >>> TOOL CALL: ${streamMsg.toolName || 'unknown'} (id: ${streamMsg.toolCallId?.slice(0, 12) || '?'})`);
const tcName = streamMsg.toolName || 'unknown';
const tcId = streamMsg.toolCallId?.slice(0, 12) || '?';
console.log(`[Stream] >>> TOOL CALL: ${tcName} (id: ${tcId})`);
sawNonAssistantSinceLastUuid = true;
// Display tool call in channel if configured
if (this.config.display?.showToolCalls && !suppressDelivery) {
try {
const text = this.formatToolCallDisplay(streamMsg);
await adapter.sendMessage({ chatId: msg.chatId, text, threadId: msg.threadId });
sentAnyMessage = true;
} catch (err) {
console.warn('[Bot] Failed to send tool call display:', err instanceof Error ? err.message : err);
}
}
// Buffer the tool call -- the SDK streams multiple chunks per call
// (first has empty args). We display the last chunk when type changes.
pendingToolDisplay = { toolCallId: streamMsg.toolCallId || '', msg: streamMsg };
} else if (streamMsg.type === 'tool_result') {
console.log(`[Stream] <<< TOOL RESULT: error=${streamMsg.isError}, len=${(streamMsg as any).content?.length || 0}`);
sawNonAssistantSinceLastUuid = true;
@@ -1246,7 +1273,9 @@ export class LettaBot implements AgentSession {
} else if (streamMsg.type !== 'assistant') {
sawNonAssistantSinceLastUuid = true;
}
lastMsgType = streamMsg.type;
// Don't let stream_event overwrite lastMsgType -- it's noise between
// semantic types and would cause false type-transition triggers.
if (isSemanticType) lastMsgType = streamMsg.type;
if (streamMsg.type === 'assistant') {
const msgUuid = streamMsg.uuid;
@@ -1319,8 +1348,52 @@ export class LettaBot implements AgentSession {
// the current buffer, but finalizeMessage() clears it on type changes.
// sentAnyMessage is the authoritative "did we deliver output" flag.
const nothingDelivered = !hasResponse && !sentAnyMessage;
const retryConvKey = this.resolveConversationKey(msg.channel);
const retryConvIdFromStore = (retryConvKey === 'shared'
? this.store.conversationId
: this.store.getConversationId(retryConvKey)) ?? undefined;
const retryConvId = (typeof streamMsg.conversationId === 'string' && streamMsg.conversationId.length > 0)
? streamMsg.conversationId
: retryConvIdFromStore;
// Enrich opaque error detail from run metadata (single fast API call).
// The wire protocol's stop_reason often just says "error" -- the run
// metadata has the actual detail (e.g. "waiting for approval on a tool call").
if (isTerminalError && this.store.agentId &&
(!lastErrorDetail || lastErrorDetail.message === 'Agent stopped: error')) {
const enriched = await getLatestRunError(this.store.agentId, retryConvId);
if (enriched) {
lastErrorDetail = { message: enriched.message, stopReason: enriched.stopReason };
}
}
// Don't retry on 409 CONFLICT -- the conversation is busy, retrying
// immediately will just get the same error and waste a session.
const isConflictError = lastErrorDetail?.message?.toLowerCase().includes('conflict') || false;
// For approval-specific conflicts, attempt recovery directly (don't
// enter the generic retry path which would just get another CONFLICT).
const isApprovalConflict = isConflictError &&
lastErrorDetail?.message?.toLowerCase().includes('waiting for approval');
if (isApprovalConflict && !retried && this.store.agentId) {
if (retryConvId) {
console.log('[Bot] Approval conflict detected -- attempting targeted recovery...');
this.invalidateSession(retryConvKey);
session = null;
clearInterval(typingInterval);
const convResult = await recoverOrphanedConversationApproval(
this.store.agentId, retryConvId, true /* deepScan */
);
if (convResult.recovered) {
console.log(`[Bot] Approval recovery succeeded (${convResult.details}), retrying message...`);
return this.processMessage(msg, adapter, true);
}
console.warn(`[Bot] Approval recovery failed: ${convResult.details}`);
}
}
const shouldRetryForEmptyResult = streamMsg.success && resultText === '' && nothingDelivered;
const shouldRetryForErrorResult = isTerminalError && nothingDelivered;
const shouldRetryForErrorResult = isTerminalError && nothingDelivered && !isConflictError;
if (shouldRetryForEmptyResult || shouldRetryForErrorResult) {
if (shouldRetryForEmptyResult) {
console.error(`[Bot] Warning: Agent returned empty result with no response. stopReason=${streamMsg.stopReason || 'N/A'}, conv=${streamMsg.conversationId || 'N/A'}`);
@@ -1329,10 +1402,6 @@ export class LettaBot implements AgentSession {
console.error(`[Bot] Warning: Agent returned terminal error (error=${streamMsg.error}, stopReason=${streamMsg.stopReason || 'N/A'}) with no response.`);
}
const retryConvKey = this.resolveConversationKey(msg.channel);
const retryConvId = retryConvKey === 'shared'
? this.store.conversationId
: this.store.getConversationId(retryConvKey);
if (!retried && this.store.agentId && retryConvId) {
const reason = shouldRetryForErrorResult ? 'error result' : 'empty result';
console.log(`[Bot] ${reason} - attempting orphaned approval recovery...`);

View File

@@ -1,9 +1,10 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
// Mock the Letta client before importing the module under test
const mockConversationsMessagesList = vi.fn();
const mockConversationsMessagesCreate = vi.fn();
const mockRunsRetrieve = vi.fn();
const mockRunsList = vi.fn();
const mockAgentsMessagesCancel = vi.fn();
vi.mock('@letta-ai/letta-client', () => {
@@ -15,13 +16,16 @@ vi.mock('@letta-ai/letta-client', () => {
create: mockConversationsMessagesCreate,
},
};
runs = { retrieve: mockRunsRetrieve };
runs = {
retrieve: mockRunsRetrieve,
list: mockRunsList,
};
agents = { messages: { cancel: mockAgentsMessagesCancel } };
},
};
});
import { recoverOrphanedConversationApproval } from './letta-api.js';
import { getLatestRunError, recoverOrphanedConversationApproval } from './letta-api.js';
// Helper to create a mock async iterable from an array (Letta client returns paginated iterators)
function mockPageIterator<T>(items: T[]) {
@@ -35,6 +39,12 @@ function mockPageIterator<T>(items: T[]) {
describe('recoverOrphanedConversationApproval', () => {
beforeEach(() => {
vi.clearAllMocks();
mockRunsList.mockReturnValue(mockPageIterator([]));
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});
it('returns false when no messages in conversation', async () => {
@@ -68,14 +78,22 @@ describe('recoverOrphanedConversationApproval', () => {
]));
mockRunsRetrieve.mockResolvedValue({ status: 'failed', stop_reason: 'error' });
mockConversationsMessagesCreate.mockResolvedValue({});
mockRunsList.mockReturnValue(mockPageIterator([{ id: 'run-denial-1' }]));
mockAgentsMessagesCancel.mockResolvedValue(undefined);
const result = await recoverOrphanedConversationApproval('agent-1', 'conv-1');
// Recovery has a 3s delay after denial; advance fake timers to resolve it
const resultPromise = recoverOrphanedConversationApproval('agent-1', 'conv-1');
await vi.advanceTimersByTimeAsync(3000);
const result = await resultPromise;
expect(result.recovered).toBe(true);
expect(result.details).toContain('Denied 1 approval(s) from failed run run-1');
expect(mockConversationsMessagesCreate).toHaveBeenCalledOnce();
// Should NOT cancel -- run is already terminated
expect(mockAgentsMessagesCancel).not.toHaveBeenCalled();
// Should only cancel runs active in this same conversation
expect(mockAgentsMessagesCancel).toHaveBeenCalledOnce();
expect(mockAgentsMessagesCancel).toHaveBeenCalledWith('agent-1', {
run_ids: ['run-denial-1'],
});
});
it('recovers from stuck running+requires_approval and cancels the run', async () => {
@@ -89,12 +107,15 @@ describe('recoverOrphanedConversationApproval', () => {
]));
mockRunsRetrieve.mockResolvedValue({ status: 'running', stop_reason: 'requires_approval' });
mockConversationsMessagesCreate.mockResolvedValue({});
mockRunsList.mockReturnValue(mockPageIterator([{ id: 'run-2' }]));
mockAgentsMessagesCancel.mockResolvedValue(undefined);
const result = await recoverOrphanedConversationApproval('agent-1', 'conv-1');
const resultPromise = recoverOrphanedConversationApproval('agent-1', 'conv-1');
await vi.advanceTimersByTimeAsync(3000);
const result = await resultPromise;
expect(result.recovered).toBe(true);
expect(result.details).toContain('(cancelled)');
expect(result.details).toContain('(runs cancelled)');
// Should send denial
expect(mockConversationsMessagesCreate).toHaveBeenCalledOnce();
const createCall = mockConversationsMessagesCreate.mock.calls[0];
@@ -104,6 +125,9 @@ describe('recoverOrphanedConversationApproval', () => {
expect(approvals[0].tool_call_id).toBe('tc-2');
// Should cancel the stuck run
expect(mockAgentsMessagesCancel).toHaveBeenCalledOnce();
expect(mockAgentsMessagesCancel).toHaveBeenCalledWith('agent-1', {
run_ids: ['run-2'],
});
});
it('skips already-resolved approvals', async () => {
@@ -157,12 +181,58 @@ describe('recoverOrphanedConversationApproval', () => {
]));
mockRunsRetrieve.mockResolvedValue({ status: 'running', stop_reason: 'requires_approval' });
mockConversationsMessagesCreate.mockResolvedValue({});
mockRunsList.mockReturnValue(mockPageIterator([{ id: 'run-5' }]));
// Cancel fails
mockAgentsMessagesCancel.mockRejectedValue(new Error('cancel failed'));
const result = await recoverOrphanedConversationApproval('agent-1', 'conv-1');
const resultPromise = recoverOrphanedConversationApproval('agent-1', 'conv-1');
await vi.advanceTimersByTimeAsync(3000);
const result = await resultPromise;
expect(result.recovered).toBe(true);
expect(result.details).toContain('(cancel failed)');
// Cancel failure is logged but doesn't change the suffix anymore
expect(result.details).toContain('Denied 1 approval(s) from running run run-5');
});
});
describe('getLatestRunError', () => {
beforeEach(() => {
vi.clearAllMocks();
});
it('scopes latest run lookup to conversation when provided', async () => {
mockRunsList.mockReturnValue(mockPageIterator([
{
id: 'run-err-1',
conversation_id: 'conv-1',
stop_reason: 'error',
metadata: { error: { detail: 'Another request is currently being processed (conflict)' } },
},
]));
const result = await getLatestRunError('agent-1', 'conv-1');
expect(mockRunsList).toHaveBeenCalledWith({
agent_id: 'agent-1',
conversation_id: 'conv-1',
limit: 1,
});
expect(result?.message).toContain('conflict');
expect(result?.stopReason).toBe('error');
});
it('returns null when response is for a different conversation', async () => {
mockRunsList.mockReturnValue(mockPageIterator([
{
id: 'run-other',
conversation_id: 'conv-2',
stop_reason: 'error',
metadata: { error: { detail: 'waiting for approval' } },
},
]));
const result = await getLatestRunError('agent-1', 'conv-1');
expect(result).toBeNull();
});
});

View File

@@ -461,6 +461,85 @@ export async function cancelRuns(
}
}
/**
* Fetch the error detail from the latest failed run on an agent.
* Returns the actual error detail from run metadata (which is more
* descriptive than the opaque `stop_reason=error` wire message).
* Single API call -- fast enough to use on every error.
*/
/**
 * Fetch the error detail from the latest failed run on an agent.
 * Returns the actual error detail from run metadata (which is more
 * descriptive than the opaque `stop_reason=error` wire message).
 * Single API call -- fast enough to use on every error.
 *
 * @param agentId - Agent whose runs are queried.
 * @param conversationId - Optional conversation to scope the lookup to.
 * @returns The run's error message, stop reason, and whether it is an
 *   approval-related error, or null when no usable detail is available.
 */
export async function getLatestRunError(
  agentId: string,
  conversationId?: string
): Promise<{ message: string; stopReason: string; isApprovalError: boolean } | null> {
  try {
    const client = getClient();
    const runs = await client.runs.list({
      agent_id: agentId,
      conversation_id: conversationId,
      limit: 1,
    });
    // Only the first (latest) run is needed -- take it directly from the
    // paginated iterator instead of materializing an intermediate array.
    let run: Record<string, unknown> | undefined;
    for await (const item of runs) {
      run = item as unknown as Record<string, unknown>;
      break;
    }
    if (!run) return null;
    // Defensive check: only trust the run's error if it actually belongs to
    // the requested conversation.
    if (conversationId
      && typeof run.conversation_id === 'string'
      && run.conversation_id !== conversationId) {
      console.warn('[Letta API] Latest run lookup returned a different conversation, skipping enrichment');
      return null;
    }
    const meta = run.metadata as Record<string, unknown> | undefined;
    const err = meta?.error as Record<string, unknown> | undefined;
    const detail = typeof err?.detail === 'string' ? err.detail : '';
    const stopReason = typeof run.stop_reason === 'string' ? run.stop_reason : 'error';
    if (!detail) return null;
    // Lowercase once for both approval-phrase checks.
    const detailLower = detail.toLowerCase();
    const isApprovalError = detailLower.includes('waiting for approval')
      || detailLower.includes('approve or deny');
    console.log(`[Letta API] Latest run error: ${detail.slice(0, 150)}${isApprovalError ? ' [approval]' : ''}`);
    return { message: detail, stopReason, isApprovalError };
  } catch (e) {
    // Enrichment is best-effort: a failed lookup must never mask the
    // original error path, so log and return null.
    console.warn('[Letta API] Failed to fetch latest run error:', e instanceof Error ? e.message : e);
    return null;
  }
}
async function listActiveConversationRunIds(
agentId: string,
conversationId: string,
limit = 25
): Promise<string[]> {
try {
const client = getClient();
const runs = await client.runs.list({
agent_id: agentId,
conversation_id: conversationId,
active: true,
limit,
});
const runIds: string[] = [];
for await (const run of runs) {
const id = (run as { id?: unknown }).id;
if (typeof id === 'string' && id.length > 0) {
runIds.push(id);
}
if (runIds.length >= limit) break;
}
return runIds;
} catch (e) {
console.warn('[Letta API] Failed to list active conversation runs:', e instanceof Error ? e.message : e);
return [];
}
}
/**
* Disable tool approval requirement for a specific tool on an agent.
* This sets requires_approval: false at the server level.
@@ -548,13 +627,17 @@ export async function ensureNoToolApprovals(agentId: string): Promise<void> {
*/
export async function recoverOrphanedConversationApproval(
agentId: string,
conversationId: string
conversationId: string,
deepScan = false
): Promise<{ recovered: boolean; details: string }> {
try {
const client = getClient();
// List recent messages from the conversation to find orphaned approvals
const messagesPage = await client.conversations.messages.list(conversationId, { limit: 50 });
// List recent messages from the conversation to find orphaned approvals.
// Default: 50 (fast path). Deep scan: 500 (for conversations with many approvals).
const scanLimit = deepScan ? 500 : 50;
console.log(`[Letta API] Scanning ${scanLimit} messages for orphaned approvals...`);
const messagesPage = await client.conversations.messages.list(conversationId, { limit: scanLimit });
const messages: Array<Record<string, unknown>> = [];
for await (const msg of messagesPage) {
messages.push(msg as unknown as Record<string, unknown>);
@@ -648,19 +731,26 @@ export async function recoverOrphanedConversationApproval(
streaming: false,
});
// Cancel active stuck runs after rejecting their approvals
// The denial triggers a new agent run server-side. Wait for it to
// settle before returning, otherwise the caller retries immediately
// and hits a 409 because the denial's run is still processing.
await new Promise(resolve => setTimeout(resolve, 3000));
// Cancel only active runs for this conversation to avoid interrupting
// unrelated in-flight requests on other conversations.
const activeRunIds = await listActiveConversationRunIds(agentId, conversationId);
let cancelled = false;
if (isStuckApproval) {
cancelled = await cancelRuns(agentId, [runId]);
if (activeRunIds.length > 0) {
cancelled = await cancelRuns(agentId, activeRunIds);
if (cancelled) {
console.log(`[Letta API] Cancelled stuck run ${runId}`);
} else {
console.warn(`[Letta API] Failed to cancel stuck run ${runId}`);
console.log(`[Letta API] Cancelled ${activeRunIds.length} active conversation run(s) after approval denial`);
}
} else {
console.log(`[Letta API] No active runs to cancel for conversation ${conversationId}`);
}
recoveredCount += approvals.length;
const suffix = isStuckApproval ? (cancelled ? ' (cancelled)' : ' (cancel failed)') : '';
const suffix = cancelled ? ' (runs cancelled)' : '';
details.push(`Denied ${approvals.length} approval(s) from ${status} run ${runId}${suffix}`);
} else {
details.push(`Run ${runId} is ${status}/${stopReason} - not orphaned`);