Files
lettabot/src/core/bot.ts
Gabriele Sarti 110681e979 feat: pass images to the LLM via multimodal API (#184)
feat: pass images to the LLM via multimodal API

When users send images through any channel, the actual image content is now passed to the LLM via the SDK's multimodal API (imageFromFile/imageFromURL) instead of just text metadata.

- Graceful fallback for unsupported MIME types, missing files, and load errors
- Opt-out via features.inlineImages: false in config
- Warns when model doesn't support vision (detects [Image omitted] in response)
2026-02-08 20:22:32 -08:00

1010 lines
41 KiB
TypeScript

/**
* LettaBot Core - Handles agent communication
*
* Single agent, single conversation - chat continues across all channels.
*/
import { createAgent, createSession, resumeSession, imageFromFile, imageFromURL, type Session, type MessageContentItem, type SendMessage } from '@letta-ai/letta-code-sdk';
import { mkdirSync } from 'node:fs';
import type { ChannelAdapter } from '../channels/types.js';
import type { BotConfig, InboundMessage, TriggerContext } from './types.js';
import { Store } from './store.js';
import { updateAgentName, getPendingApprovals, rejectApproval, cancelRuns, recoverOrphanedConversationApproval } from '../tools/letta-api.js';
import { installSkillsToAgent } from '../skills/loader.js';
import { formatMessageEnvelope, formatGroupBatchEnvelope, type SessionContextOptions } from './formatter.js';
import type { GroupBatcher } from './group-batcher.js';
import { loadMemoryBlocks } from './memory.js';
import { SYSTEM_PROMPT } from './system-prompt.js';
/**
* Detect if an error is a 409 CONFLICT from an orphaned approval.
* The error may come as a ConflictError from the Letta client (status 409)
* or as an error message string through the CLI transport.
*/
function isApprovalConflictError(error: unknown): boolean {
if (error instanceof Error) {
const msg = error.message.toLowerCase();
if (msg.includes('waiting for approval')) return true;
if (msg.includes('conflict') && msg.includes('approval')) return true;
}
const statusError = error as { status?: number };
if (statusError?.status === 409) return true;
return false;
}
const SUPPORTED_IMAGE_MIMES = new Set([
'image/png', 'image/jpeg', 'image/gif', 'image/webp',
]);
async function buildMultimodalMessage(
formattedText: string,
msg: InboundMessage,
): Promise<SendMessage> {
// Respect opt-out: when INLINE_IMAGES=false, skip multimodal and only send file paths in envelope
if (process.env.INLINE_IMAGES === 'false') {
return formattedText;
}
const imageAttachments = (msg.attachments ?? []).filter(
(a) => a.kind === 'image'
&& (a.localPath || a.url)
&& (!a.mimeType || SUPPORTED_IMAGE_MIMES.has(a.mimeType))
);
if (imageAttachments.length === 0) {
return formattedText;
}
const content: MessageContentItem[] = [
{ type: 'text', text: formattedText },
];
for (const attachment of imageAttachments) {
try {
if (attachment.localPath) {
content.push(imageFromFile(attachment.localPath));
} else if (attachment.url) {
content.push(await imageFromURL(attachment.url));
}
} catch (err) {
console.warn(`[Bot] Failed to load image ${attachment.name || 'unknown'}: ${err instanceof Error ? err.message : err}`);
}
}
if (content.length > 1) {
console.log(`[Bot] Sending ${content.length - 1} inline image(s) to LLM`);
}
return content.length > 1 ? content : formattedText;
}
export class LettaBot {
private store: Store;
private config: BotConfig;
private channels: Map<string, ChannelAdapter> = new Map();
private messageQueue: Array<{ msg: InboundMessage; adapter: ChannelAdapter }> = [];
private lastUserMessageTime: Date | null = null;
// Callback to trigger heartbeat (set by main.ts)
public onTriggerHeartbeat?: () => Promise<void>;
private groupBatcher?: GroupBatcher;
private groupIntervals: Map<string, number> = new Map(); // channel -> intervalMin
private instantGroupIds: Set<string> = new Set(); // channel:id keys for instant processing
private processing = false;
constructor(config: BotConfig) {
this.config = config;
// Ensure working directory exists
mkdirSync(config.workingDir, { recursive: true });
// Store in project root (same as main.ts reads for LETTA_AGENT_ID)
this.store = new Store('lettabot-agent.json');
console.log(`LettaBot initialized. Agent ID: ${this.store.agentId || '(new)'}`);
}
/**
* Register a channel adapter
*/
registerChannel(adapter: ChannelAdapter): void {
adapter.onMessage = (msg) => this.handleMessage(msg, adapter);
adapter.onCommand = (cmd) => this.handleCommand(cmd);
this.channels.set(adapter.id, adapter);
console.log(`Registered channel: ${adapter.name}`);
}
/**
* Set the group batcher and per-channel intervals.
*/
setGroupBatcher(batcher: GroupBatcher, intervals: Map<string, number>, instantGroupIds?: Set<string>): void {
this.groupBatcher = batcher;
this.groupIntervals = intervals;
if (instantGroupIds) {
this.instantGroupIds = instantGroupIds;
}
console.log('[Bot] Group batcher configured');
}
/**
* Inject a batched group message into the queue and trigger processing.
* Called by GroupBatcher's onFlush callback.
*/
processGroupBatch(msg: InboundMessage, adapter: ChannelAdapter): void {
const count = msg.batchedMessages?.length || 0;
console.log(`[Bot] Group batch: ${count} messages from ${msg.channel}:${msg.chatId}`);
// Unwrap single-message batches so they use formatMessageEnvelope (DM-style)
// instead of the chat-log batch format
const effective = (count === 1 && msg.batchedMessages)
? msg.batchedMessages[0]
: msg;
this.messageQueue.push({ msg: effective, adapter });
if (!this.processing) {
this.processQueue().catch(err => console.error('[Queue] Fatal error in processQueue:', err));
}
}
/**
* Handle slash commands
*/
private async handleCommand(command: string): Promise<string | null> {
console.log(`[Command] Received: /${command}`);
switch (command) {
case 'status': {
const info = this.store.getInfo();
const lines = [
`*Status*`,
`Agent ID: \`${info.agentId || '(none)'}\``,
`Created: ${info.createdAt || 'N/A'}`,
`Last used: ${info.lastUsedAt || 'N/A'}`,
`Channels: ${Array.from(this.channels.keys()).join(', ')}`,
];
return lines.join('\n');
}
case 'heartbeat': {
console.log('[Command] /heartbeat received');
if (!this.onTriggerHeartbeat) {
console.log('[Command] /heartbeat - no trigger callback configured');
return '⚠️ Heartbeat service not configured';
}
console.log('[Command] /heartbeat - triggering heartbeat...');
// Trigger heartbeat asynchronously
this.onTriggerHeartbeat().catch(err => {
console.error('[Heartbeat] Manual trigger failed:', err);
});
return '⏰ Heartbeat triggered (silent mode - check server logs)';
}
case 'reset': {
const oldConversationId = this.store.conversationId;
this.store.conversationId = null;
this.store.resetRecoveryAttempts();
console.log(`[Command] /reset - conversation cleared (was: ${oldConversationId})`);
return 'Conversation reset. Send a message to start a new conversation. (Agent memory is preserved.)';
}
default:
return null;
}
}
/**
* Start all registered channels
*/
async start(): Promise<void> {
const startPromises = Array.from(this.channels.entries()).map(async ([id, adapter]) => {
try {
console.log(`Starting channel: ${adapter.name}...`);
await adapter.start();
console.log(`Started channel: ${adapter.name}`);
} catch (e) {
console.error(`Failed to start channel ${id}:`, e);
}
});
await Promise.all(startPromises);
}
/**
* Stop all channels
*/
async stop(): Promise<void> {
for (const adapter of this.channels.values()) {
try {
await adapter.stop();
} catch (e) {
console.error(`Failed to stop channel ${adapter.id}:`, e);
}
}
}
/**
* Attempt to recover from stuck approval state.
* Returns true if recovery was attempted, false if no recovery needed.
* @param maxAttempts Maximum recovery attempts before giving up (default: 2)
*/
private async attemptRecovery(maxAttempts = 2): Promise<{ recovered: boolean; shouldReset: boolean }> {
if (!this.store.agentId) {
return { recovered: false, shouldReset: false };
}
console.log('[Bot] Checking for pending approvals...');
try {
// Check for pending approvals FIRST, before checking attempt counter
const pendingApprovals = await getPendingApprovals(
this.store.agentId,
this.store.conversationId || undefined
);
if (pendingApprovals.length === 0) {
// Standard check found nothing - try conversation-level inspection as fallback.
// This catches cases where agent.pending_approval is null but the conversation
// has an unresolved approval_request_message from a terminated run.
if (this.store.conversationId) {
const convResult = await recoverOrphanedConversationApproval(
this.store.agentId!,
this.store.conversationId
);
if (convResult.recovered) {
console.log(`[Bot] Conversation-level recovery succeeded: ${convResult.details}`);
return { recovered: true, shouldReset: false };
}
}
// No pending approvals found by either method
this.store.resetRecoveryAttempts();
return { recovered: false, shouldReset: false };
}
// There ARE pending approvals - check if we've exceeded max attempts
const attempts = this.store.recoveryAttempts;
if (attempts >= maxAttempts) {
console.error(`[Bot] Recovery failed after ${attempts} attempts. Still have ${pendingApprovals.length} pending approval(s).`);
console.error('[Bot] Try running: lettabot reset-conversation');
return { recovered: false, shouldReset: true };
}
console.log(`[Bot] Found ${pendingApprovals.length} pending approval(s), attempting recovery (attempt ${attempts + 1}/${maxAttempts})...`);
this.store.incrementRecoveryAttempts();
// Reject all pending approvals
for (const approval of pendingApprovals) {
console.log(`[Bot] Rejecting approval for ${approval.toolName} (${approval.toolCallId})`);
await rejectApproval(
this.store.agentId,
{ toolCallId: approval.toolCallId, reason: 'Session was interrupted - retrying request' },
this.store.conversationId || undefined
);
}
// Cancel any active runs
const runIds = [...new Set(pendingApprovals.map(a => a.runId))];
if (runIds.length > 0) {
console.log(`[Bot] Cancelling ${runIds.length} active run(s)...`);
await cancelRuns(this.store.agentId, runIds);
}
console.log('[Bot] Recovery completed');
return { recovered: true, shouldReset: false };
} catch (error) {
console.error('[Bot] Recovery failed:', error);
this.store.incrementRecoveryAttempts();
return { recovered: false, shouldReset: this.store.recoveryAttempts >= maxAttempts };
}
}
/**
* Queue incoming message for processing (prevents concurrent SDK sessions)
*/
private async handleMessage(msg: InboundMessage, adapter: ChannelAdapter): Promise<void> {
console.log(`[${msg.channel}] Message from ${msg.userId}: ${msg.text}`);
// Route group messages to batcher if configured
if (msg.isGroup && this.groupBatcher) {
// Check if this group is configured for instant processing
const isInstant = this.instantGroupIds.has(`${msg.channel}:${msg.chatId}`)
|| (msg.serverId && this.instantGroupIds.has(`${msg.channel}:${msg.serverId}`));
const intervalMin = isInstant ? 0 : (this.groupIntervals.get(msg.channel) ?? 10);
console.log(`[Bot] Group message routed to batcher (interval=${intervalMin}min, mentioned=${msg.wasMentioned}, instant=${!!isInstant})`);
this.groupBatcher.enqueue(msg, adapter, intervalMin);
return;
}
// Add to queue
this.messageQueue.push({ msg, adapter });
console.log(`[Queue] Added to queue, length: ${this.messageQueue.length}, processing: ${this.processing}`);
// Process queue if not already processing
if (!this.processing) {
console.log('[Queue] Starting queue processing');
this.processQueue().catch(err => console.error('[Queue] Fatal error in processQueue:', err));
} else {
console.log('[Queue] Already processing, will process when current message finishes');
}
}
/**
* Process messages one at a time
*/
private async processQueue(): Promise<void> {
console.log(`[Queue] processQueue called: processing=${this.processing}, queueLength=${this.messageQueue.length}`);
if (this.processing || this.messageQueue.length === 0) {
console.log('[Queue] Exiting early: already processing or empty queue');
return;
}
this.processing = true;
console.log('[Queue] Started processing');
while (this.messageQueue.length > 0) {
const { msg, adapter } = this.messageQueue.shift()!;
console.log(`[Queue] Processing message from ${msg.userId} (${this.messageQueue.length} remaining)`);
try {
await this.processMessage(msg, adapter);
} catch (error) {
console.error('[Queue] Error processing message:', error);
}
}
console.log('[Queue] Finished processing all messages');
this.processing = false;
}
/**
* Process a single message
*/
private async processMessage(msg: InboundMessage, adapter: ChannelAdapter, retried = false): Promise<void> {
console.log('[Bot] Starting processMessage');
// Track when user last sent a message (for heartbeat skip logic)
this.lastUserMessageTime = new Date();
// Track last message target for heartbeat delivery
this.store.lastMessageTarget = {
channel: msg.channel,
chatId: msg.chatId,
messageId: msg.messageId,
updatedAt: new Date().toISOString(),
};
console.log('[Bot] Sending typing indicator');
// Start typing indicator
await adapter.sendTypingIndicator(msg.chatId);
console.log('[Bot] Typing indicator sent');
// Attempt recovery from stuck approval state before starting session
const recovery = await this.attemptRecovery();
if (recovery.shouldReset) {
await adapter.sendMessage({
chatId: msg.chatId,
text: '(Session recovery failed after multiple attempts. Try: lettabot reset-conversation)',
threadId: msg.threadId,
});
return;
}
if (recovery.recovered) {
console.log('[Bot] Recovered from stuck approval, continuing with message processing');
}
// Create or resume session
let session: Session;
let usedDefaultConversation = false;
let usedSpecificConversation = false;
// Base options for sessions (systemPrompt/memory set via createAgent for new agents)
const baseOptions = {
permissionMode: 'bypassPermissions' as const,
allowedTools: this.config.allowedTools,
cwd: this.config.workingDir,
// bypassPermissions mode auto-allows all tools, no canUseTool callback needed
// But add logging callback to diagnose approval flow issues (#132)
canUseTool: (toolName: string, toolInput: Record<string, unknown>) => {
console.log(`[Bot] Tool approval requested: ${toolName} (should be auto-approved by bypassPermissions)`);
console.log(`[Bot] WARNING: canUseTool callback should NOT be called when permissionMode=bypassPermissions`);
// Return allow anyway as fallback
return { behavior: 'allow' as const };
},
};
console.log('[Bot] Session options:', { permissionMode: baseOptions.permissionMode, allowedTools: baseOptions.allowedTools?.length });
console.log('[Bot] Creating/resuming session');
try {
if (this.store.conversationId) {
// Resume the specific conversation we've been using
console.log(`[Bot] Resuming conversation: ${this.store.conversationId}`);
process.env.LETTA_AGENT_ID = this.store.agentId || undefined;
usedSpecificConversation = true;
session = resumeSession(this.store.conversationId, baseOptions);
} else if (this.store.agentId) {
// Agent exists but no conversation - try default conversation
console.log(`[Bot] Resuming agent default conversation: ${this.store.agentId}`);
process.env.LETTA_AGENT_ID = this.store.agentId;
usedDefaultConversation = true;
session = resumeSession(this.store.agentId, baseOptions);
} else {
// Create new agent with default conversation
console.log('[Bot] Creating new agent');
const newAgentId = await createAgent({
model: this.config.model,
systemPrompt: SYSTEM_PROMPT,
memory: loadMemoryBlocks(this.config.agentName),
});
session = createSession(newAgentId, baseOptions);
}
console.log('[Bot] Session created/resumed');
const defaultTimeoutMs = 30000; // 30s timeout
const envTimeoutMs = Number(process.env.LETTA_SESSION_TIMEOUT_MS);
const initTimeoutMs = Number.isFinite(envTimeoutMs) && envTimeoutMs > 0
? envTimeoutMs
: defaultTimeoutMs;
const withTimeout = async <T>(promise: Promise<T>, label: string): Promise<T> => {
let timeoutId: NodeJS.Timeout;
const timeoutPromise = new Promise<T>((_, reject) => {
timeoutId = setTimeout(() => {
reject(new Error(`${label} timed out after ${initTimeoutMs}ms`));
}, initTimeoutMs);
});
try {
return await Promise.race([promise, timeoutPromise]);
} finally {
clearTimeout(timeoutId!);
}
};
let initInfo;
try {
initInfo = await withTimeout(session.initialize(), 'Session initialize');
} catch (error) {
if (usedSpecificConversation && this.store.agentId) {
console.warn('[Bot] Conversation missing, creating a new conversation...');
session.close();
session = createSession(this.store.agentId, baseOptions);
initInfo = await withTimeout(session.initialize(), 'Session initialize (new conversation)');
usedSpecificConversation = false;
usedDefaultConversation = false;
} else if (usedDefaultConversation && this.store.agentId) {
console.warn('[Bot] Default conversation missing, creating a new conversation...');
session.close();
session = createSession(this.store.agentId, baseOptions);
initInfo = await withTimeout(session.initialize(), 'Session initialize (new conversation)');
usedDefaultConversation = false;
} else {
throw error;
}
}
if (initInfo.conversationId && initInfo.conversationId !== this.store.conversationId) {
this.store.conversationId = initInfo.conversationId;
console.log('[Bot] Saved conversation ID:', initInfo.conversationId);
}
// Determine if this is the first message in a new chat session
// (different chatId from last message target = new session context)
const prevTarget = this.store.lastMessageTarget;
const isNewChatSession = !prevTarget || prevTarget.chatId !== msg.chatId || prevTarget.channel !== msg.channel;
const sessionContext: SessionContextOptions | undefined = isNewChatSession ? {
agentId: this.store.agentId || undefined,
serverUrl: process.env.LETTA_BASE_URL || this.store.baseUrl || 'https://api.letta.com',
} : undefined;
// Send message to agent with metadata envelope
const formattedText = msg.isBatch && msg.batchedMessages
? formatGroupBatchEnvelope(msg.batchedMessages)
: formatMessageEnvelope(msg, {}, sessionContext);
const messageToSend = await buildMultimodalMessage(formattedText, msg);
try {
await withTimeout(session.send(messageToSend), 'Session send');
} catch (sendError) {
// Check for 409 CONFLICT from orphaned approval_request_message
if (!retried && isApprovalConflictError(sendError) && this.store.agentId && this.store.conversationId) {
console.log('[Bot] CONFLICT detected - attempting orphaned approval recovery...');
session.close();
const result = await recoverOrphanedConversationApproval(
this.store.agentId,
this.store.conversationId
);
if (result.recovered) {
console.log(`[Bot] Recovery succeeded (${result.details}), retrying message...`);
return this.processMessage(msg, adapter, /* retried */ true);
}
console.error(`[Bot] Orphaned approval recovery failed: ${result.details}`);
}
console.error('[Bot] Error sending message:', sendError);
throw sendError;
}
// Stream response
let response = '';
let lastUpdate = Date.now();
let messageId: string | null = null;
let lastMsgType: string | null = null;
let lastAssistantUuid: string | null = null;
let sentAnyMessage = false;
let receivedAnyData = false; // Track if we got ANY stream data
const msgTypeCounts: Record<string, number> = {};
// Helper to finalize and send current accumulated response
const finalizeMessage = async () => {
// Check for silent marker - agent chose not to reply
if (response.trim() === '<no-reply/>') {
console.log('[Bot] Agent chose not to reply (no-reply marker)');
sentAnyMessage = true;
response = '';
messageId = null;
lastUpdate = Date.now();
return;
}
if (response.trim()) {
try {
if (messageId) {
await adapter.editMessage(msg.chatId, messageId, response);
} else {
await adapter.sendMessage({ chatId: msg.chatId, text: response, threadId: msg.threadId });
}
sentAnyMessage = true;
const preview = response.length > 50 ? response.slice(0, 50) + '...' : response;
console.log(`[Bot] Sent: "${preview}"`);
} catch {
// Ignore send errors
}
}
// Reset for next message bubble
response = '';
messageId = null;
lastUpdate = Date.now();
};
// Keep typing indicator alive
const typingInterval = setInterval(() => {
adapter.sendTypingIndicator(msg.chatId).catch(() => {});
}, 4000);
const seenToolCallIds = new Set<string>();
try {
for await (const streamMsg of session.stream()) {
// Deduplicate tool_call chunks: the server streams tool_call_message
// events token-by-token as arguments are generated, so a single tool
// call produces many wire events with the same toolCallId.
// Only count/log the first chunk per unique toolCallId.
if (streamMsg.type === 'tool_call') {
const toolCallId = (streamMsg as any).toolCallId;
if (toolCallId && seenToolCallIds.has(toolCallId)) continue;
if (toolCallId) seenToolCallIds.add(toolCallId);
}
const msgUuid = (streamMsg as any).uuid;
receivedAnyData = true;
msgTypeCounts[streamMsg.type] = (msgTypeCounts[streamMsg.type] || 0) + 1;
// Always log every stream message type for debugging approval issues
const preview = JSON.stringify(streamMsg).slice(0, 300);
console.log(`[Stream] type=${streamMsg.type} ${preview}`);
// When message type changes, finalize the current message
// This ensures different message types appear as separate bubbles
if (lastMsgType && lastMsgType !== streamMsg.type && response.trim()) {
await finalizeMessage();
}
// Detect tool-call loops: abort if agent calls too many tools without producing a result
const maxToolCalls = this.config.maxToolCalls ?? 100;
if (streamMsg.type === 'tool_call' && (msgTypeCounts['tool_call'] || 0) >= maxToolCalls) {
console.error(`[Bot] Agent stuck in tool loop (${msgTypeCounts['tool_call']} tool calls, limit=${maxToolCalls}), aborting`);
session.abort().catch(() => {});
response = '(Agent got stuck in a tool loop and was stopped. Try sending your message again.)';
break;
}
// Log meaningful events (always, not just on type change for tools)
if (streamMsg.type === 'tool_call') {
const toolName = (streamMsg as any).toolName || 'unknown';
console.log(`[Bot] Calling tool: ${toolName}`);
} else if (streamMsg.type === 'tool_result') {
const isError = (streamMsg as any).isError;
const contentLen = (streamMsg as any).content?.length || 0;
console.log(`[Bot] Tool completed: error=${isError}, resultLen=${contentLen}`);
} else if (streamMsg.type === 'assistant' && lastMsgType !== 'assistant') {
console.log(`[Bot] Generating response...`);
} else if (streamMsg.type === 'reasoning' && lastMsgType !== 'reasoning') {
console.log(`[Bot] Reasoning...`);
} else if (streamMsg.type === 'init' && lastMsgType !== 'init') {
console.log(`[Bot] Session initialized`);
}
lastMsgType = streamMsg.type;
if (streamMsg.type === 'assistant') {
// Check if this is a new assistant message (different UUID)
if (msgUuid && lastAssistantUuid && msgUuid !== lastAssistantUuid && response.trim()) {
await finalizeMessage();
}
lastAssistantUuid = msgUuid || lastAssistantUuid;
response += streamMsg.content;
// Stream updates only for channels that support editing (Telegram, Slack)
// Hold back streaming edits while response could still become <no-reply/>
const canEdit = adapter.supportsEditing?.() ?? true;
const mayBeNoReply = '<no-reply/>'.startsWith(response.trim());
if (canEdit && !mayBeNoReply && Date.now() - lastUpdate > 500 && response.length > 0) {
try {
if (messageId) {
await adapter.editMessage(msg.chatId, messageId, response);
} else {
const result = await adapter.sendMessage({ chatId: msg.chatId, text: response, threadId: msg.threadId });
messageId = result.messageId;
}
} catch {
// Ignore edit errors
}
lastUpdate = Date.now();
}
}
if (streamMsg.type === 'result') {
// Log result details for debugging (#132)
const resultMsg = streamMsg as { result?: string; success?: boolean; error?: string };
console.log(`[Bot] Stream result: success=${resultMsg.success}, hasResponse=${response.trim().length > 0}, resultLen=${resultMsg.result?.length || 0}`);
console.log(`[Bot] Stream message counts:`, msgTypeCounts);
if (resultMsg.error) {
console.error(`[Bot] Result error: ${resultMsg.error}`);
}
// Check for potential stuck state (empty result usually means pending approval or error)
if (resultMsg.success && resultMsg.result === '' && !response.trim()) {
console.error('[Bot] Warning: Agent returned empty result with no response.');
console.error('[Bot] Agent ID:', this.store.agentId);
console.error('[Bot] Conversation ID:', this.store.conversationId);
// Attempt conversation-level recovery and retry once
if (!retried && this.store.agentId && this.store.conversationId) {
console.log('[Bot] Empty result - attempting orphaned approval recovery...');
session.close();
clearInterval(typingInterval);
const convResult = await recoverOrphanedConversationApproval(
this.store.agentId,
this.store.conversationId
);
if (convResult.recovered) {
console.log(`[Bot] Recovery succeeded (${convResult.details}), retrying message...`);
return this.processMessage(msg, adapter, /* retried */ true);
}
console.warn(`[Bot] No orphaned approvals found: ${convResult.details}`);
}
}
// Save agent ID and conversation ID
if (session.agentId && session.agentId !== this.store.agentId) {
const isNewAgent = !this.store.agentId;
// Save agent ID along with the current server URL
const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com';
this.store.setAgent(session.agentId, currentBaseUrl, session.conversationId || undefined);
console.log('Saved agent ID:', session.agentId, 'conversation ID:', session.conversationId, 'on server:', currentBaseUrl);
// Setup new agents: set name, install skills
if (isNewAgent) {
if (this.config.agentName && session.agentId) {
updateAgentName(session.agentId, this.config.agentName).catch(() => {});
}
if (session.agentId) {
installSkillsToAgent(session.agentId, this.config.skills);
}
}
} else if (session.conversationId && session.conversationId !== this.store.conversationId) {
// Update conversation ID if it changed
this.store.conversationId = session.conversationId;
}
break;
}
}
} finally {
clearInterval(typingInterval);
}
// Check for silent marker - agent chose not to reply
if (response.trim() === '<no-reply/>') {
console.log('[Bot] Agent chose not to reply (no-reply marker)');
sentAnyMessage = true;
response = '';
}
// Detect unsupported multimodal: images were sent but server replaced them
const sentImages = Array.isArray(messageToSend);
if (sentImages && response.includes('[Image omitted]')) {
console.warn('[Bot] Model does not support images — server replaced inline images with "[Image omitted]". Consider using a vision-capable model or setting features.inlineImages: false in config.');
}
// Send final response
if (response.trim()) {
try {
if (messageId) {
await adapter.editMessage(msg.chatId, messageId, response);
} else {
await adapter.sendMessage({ chatId: msg.chatId, text: response, threadId: msg.threadId });
}
sentAnyMessage = true;
// Reset recovery counter on successful response
this.store.resetRecoveryAttempts();
const preview = response.length > 50 ? response.slice(0, 50) + '...' : response;
console.log(`[Bot] Sent: "${preview}"`);
} catch (sendError) {
console.error('[Bot] Error sending response:', sendError);
if (!messageId) {
try {
await adapter.sendMessage({ chatId: msg.chatId, text: response, threadId: msg.threadId });
sentAnyMessage = true;
// Reset recovery counter on successful response
this.store.resetRecoveryAttempts();
} catch (retryError) {
console.error('[Bot] Retry send also failed:', retryError);
}
}
}
}
// Only show "no response" if we never sent anything
if (!sentAnyMessage) {
if (!receivedAnyData) {
// Stream timed out with NO data at all - likely stuck state
console.error('[Bot] Stream received NO DATA - possible stuck state');
console.error('[Bot] Agent:', this.store.agentId);
console.error('[Bot] Conversation:', this.store.conversationId);
console.error('[Bot] This can happen when a previous session disconnected mid-tool-approval');
await adapter.sendMessage({
chatId: msg.chatId,
text: '(Session interrupted. Try: lettabot reset-conversation)',
threadId: msg.threadId
});
} else {
console.warn('[Bot] Stream received data but no assistant message');
console.warn('[Bot] Message types received:', msgTypeCounts);
// If the stream had tool activity, the agent was working and likely
// sent messages via tools (e.g. lettabot-message send). Don't alarm the user.
const hadToolActivity = (msgTypeCounts['tool_call'] || 0) > 0 || (msgTypeCounts['tool_result'] || 0) > 0;
if (hadToolActivity) {
console.log('[Bot] Agent had tool activity but no assistant message - likely sent via tool');
} else {
console.warn('[Bot] Agent:', this.store.agentId);
console.warn('[Bot] Conversation:', this.store.conversationId);
const convIdShort = this.store.conversationId?.slice(0, 8) || 'none';
await adapter.sendMessage({
chatId: msg.chatId,
text: `(No response. Conversation: ${convIdShort}... Try: lettabot reset-conversation)`,
threadId: msg.threadId
});
}
}
}
} catch (error) {
console.error('[Bot] Error processing message:', error);
try {
await adapter.sendMessage({
chatId: msg.chatId,
text: `Error: ${error instanceof Error ? error.message : 'Unknown error'}`,
threadId: msg.threadId,
});
} catch (sendError) {
console.error('[Bot] Failed to send error message to channel:', sendError);
}
} finally {
session!?.close();
}
}
/**
* Send a message to the agent (for cron jobs, webhooks, etc.)
*
* In silent mode (heartbeats, cron), the agent's text response is NOT auto-delivered.
* The agent must use `lettabot-message` CLI via Bash to send messages explicitly.
*
* @param text - The prompt/message to send
* @param context - Optional trigger context (for logging/tracking)
* @returns The agent's response text
*/
async sendToAgent(
text: string,
_context?: TriggerContext
): Promise<string> {
// Wait for any in-progress message processing to complete
// This prevents 409 conflicts when heartbeats overlap with user messages
while (this.processing) {
console.log('[Bot] Waiting for message queue to finish before sendToAgent...');
await new Promise(resolve => setTimeout(resolve, 1000));
}
// Mark as processing to prevent queue from starting
this.processing = true;
console.log('[Bot] sendToAgent acquired processing lock');
try {
return await this._sendToAgentInternal(text, _context);
} finally {
this.processing = false;
console.log('[Bot] sendToAgent released processing lock');
// Trigger queue processing in case messages arrived while we were busy
this.processQueue();
}
}
private async _sendToAgentInternal(
text: string,
_context?: TriggerContext,
retried = false
): Promise<string> {
// Base options for sessions (systemPrompt/memory set via createAgent for new agents)
const baseOptions = {
permissionMode: 'bypassPermissions' as const,
allowedTools: this.config.allowedTools,
cwd: this.config.workingDir,
// bypassPermissions mode auto-allows all tools, no canUseTool callback needed
// But add logging callback to diagnose approval flow issues (#132)
canUseTool: (toolName: string, _toolInput: Record<string, unknown>) => {
console.log(`[Bot] Tool approval requested in sendToAgent: ${toolName}`);
console.log(`[Bot] WARNING: canUseTool callback should NOT be called when permissionMode=bypassPermissions`);
return { behavior: 'allow' as const };
},
};
let session: Session;
let usedDefaultConversation = false;
let usedSpecificConversation = false;
if (this.store.conversationId) {
// Resume the specific conversation we've been using
usedSpecificConversation = true;
session = resumeSession(this.store.conversationId, baseOptions);
} else if (this.store.agentId) {
// Agent exists but no conversation - try default conversation
usedDefaultConversation = true;
session = resumeSession(this.store.agentId, baseOptions);
} else {
// Create new agent with default conversation
const newAgentId = await createAgent({
model: this.config.model,
systemPrompt: SYSTEM_PROMPT,
memory: loadMemoryBlocks(this.config.agentName),
});
session = createSession(newAgentId, baseOptions);
}
try {
try {
await session.send(text);
} catch (error) {
// Check for 409 CONFLICT from orphaned approval_request_message
if (!retried && isApprovalConflictError(error) && this.store.agentId && this.store.conversationId) {
console.log('[Bot] CONFLICT in sendToAgent - attempting orphaned approval recovery...');
session.close();
const result = await recoverOrphanedConversationApproval(
this.store.agentId,
this.store.conversationId
);
if (result.recovered) {
console.log(`[Bot] Recovery succeeded (${result.details}), retrying sendToAgent...`);
return this._sendToAgentInternal(text, _context, /* retried */ true);
}
console.error(`[Bot] Orphaned approval recovery failed: ${result.details}`);
throw error;
}
if (usedSpecificConversation && this.store.agentId) {
console.warn('[Bot] Conversation missing, creating a new conversation...');
session.close();
session = createSession(this.store.agentId, baseOptions);
await session.send(text);
usedSpecificConversation = false;
usedDefaultConversation = false;
} else if (usedDefaultConversation && this.store.agentId) {
console.warn('[Bot] Default conversation missing, creating a new conversation...');
session.close();
session = createSession(this.store.agentId, baseOptions);
await session.send(text);
usedDefaultConversation = false;
} else {
throw error;
}
}
let response = '';
for await (const msg of session.stream()) {
if (msg.type === 'assistant') {
response += msg.content;
}
if (msg.type === 'result') {
if (session.agentId && session.agentId !== this.store.agentId) {
const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com';
this.store.setAgent(session.agentId, currentBaseUrl, session.conversationId || undefined);
} else if (session.conversationId && session.conversationId !== this.store.conversationId) {
this.store.conversationId = session.conversationId;
}
break;
}
}
return response;
} finally {
session.close();
}
}
/**
* Deliver a message or file to a specific channel
*/
async deliverToChannel(
channelId: string,
chatId: string,
options: {
text?: string;
filePath?: string;
kind?: 'image' | 'file';
}
): Promise<string | undefined> {
const adapter = this.channels.get(channelId);
if (!adapter) {
console.error(`Channel not found: ${channelId}`);
throw new Error(`Channel not found: ${channelId}`);
}
// Send file if provided
if (options.filePath) {
if (typeof adapter.sendFile !== 'function') {
throw new Error(`Channel ${channelId} does not support file sending`);
}
const result = await adapter.sendFile({
chatId,
filePath: options.filePath,
caption: options.text, // text becomes caption for files
kind: options.kind,
});
return result.messageId;
}
// Send text message
if (options.text) {
const result = await adapter.sendMessage({ chatId, text: options.text });
return result.messageId;
}
throw new Error('Either text or filePath must be provided');
}
/**
* Get bot status
*/
getStatus(): { agentId: string | null; channels: string[] } {
return {
agentId: this.store.agentId,
channels: Array.from(this.channels.keys()),
};
}
/**
* Set agent ID (for container deploys that discover existing agents)
*/
setAgentId(agentId: string): void {
this.store.agentId = agentId;
console.log(`[Bot] Agent ID set to: ${agentId}`);
}
/**
* Reset agent (clear memory)
*/
reset(): void {
this.store.reset();
console.log('Agent reset');
}
/**
* Get the last message target (for heartbeat delivery)
*/
getLastMessageTarget(): { channel: string; chatId: string } | null {
return this.store.lastMessageTarget || null;
}
/**
* Get the time of the last user message (for heartbeat skip logic)
*/
getLastUserMessageTime(): Date | null {
return this.lastUserMessageTime;
}
}