diff --git a/src/core/bot.ts b/src/core/bot.ts index 47b250d..2170bbe 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -20,8 +20,6 @@ import { SYSTEM_PROMPT } from './system-prompt.js'; /** * Detect if an error is a 409 CONFLICT from an orphaned approval. - * The error may come as a ConflictError from the Letta client (status 409) - * or as an error message string through the CLI transport. */ function isApprovalConflictError(error: unknown): boolean { if (error instanceof Error) { @@ -34,6 +32,23 @@ function isApprovalConflictError(error: unknown): boolean { return false; } +/** + * Detect if an error indicates a missing conversation or agent. + * Only these errors should trigger the "create new conversation" fallback. + * Auth, network, and protocol errors should NOT be retried. + */ +function isConversationMissingError(error: unknown): boolean { + if (error instanceof Error) { + const msg = error.message.toLowerCase(); + if (msg.includes('not found')) return true; + if (msg.includes('conversation') && (msg.includes('missing') || msg.includes('does not exist'))) return true; + if (msg.includes('agent') && msg.includes('not found')) return true; + } + const statusError = error as { status?: number }; + if (statusError?.status === 404) return true; + return false; +} + const SUPPORTED_IMAGE_MIMES = new Set([ 'image/png', 'image/jpeg', 'image/gif', 'image/webp', ]); @@ -42,7 +57,6 @@ async function buildMultimodalMessage( formattedText: string, msg: InboundMessage, ): Promise { - // Respect opt-out: when INLINE_IMAGES=false, skip multimodal and only send file paths in envelope if (process.env.INLINE_IMAGES === 'false') { return formattedText; } @@ -80,6 +94,22 @@ async function buildMultimodalMessage( return content.length > 1 ? content : formattedText; } +// --------------------------------------------------------------------------- +// Stream message type with toolCallId/uuid for dedup +// --------------------------------------------------------------------------- +interface StreamMsg { + type: string; + content?: string; + toolCallId?: string; + toolName?: string; + uuid?: string; + isError?: boolean; + result?: string; + success?: boolean; + error?: string; + [key: string]: unknown; +} + export class LettaBot implements AgentSession { private store: Store; private config: BotConfig; @@ -90,25 +120,184 @@ export class LettaBot implements AgentSession { // Callback to trigger heartbeat (set by main.ts) public onTriggerHeartbeat?: () => Promise; private groupBatcher?: GroupBatcher; - private groupIntervals: Map = new Map(); // channel -> intervalMin - private instantGroupIds: Set = new Set(); // channel:id keys for instant processing + private groupIntervals: Map = new Map(); + private instantGroupIds: Set = new Set(); private processing = false; constructor(config: BotConfig) { this.config = config; - - // Ensure working directory exists mkdirSync(config.workingDir, { recursive: true }); - - // Store in project root (same as main.ts reads for LETTA_AGENT_ID) this.store = new Store('lettabot-agent.json', config.agentName); - console.log(`LettaBot initialized. Agent ID: ${this.store.agentId || '(new)'}`); } - + + // ========================================================================= + // Session options (shared by processMessage and sendToAgent) + // ========================================================================= + + private get baseSessionOptions() { + return { + permissionMode: 'bypassPermissions' as const, + allowedTools: this.config.allowedTools, + cwd: this.config.workingDir, + canUseTool: (toolName: string, _toolInput: Record) => { + console.log(`[Bot] Tool approval requested: ${toolName} (should be auto-approved by bypassPermissions)`); + return { behavior: 'allow' as const }; + }, + }; + } + + // ========================================================================= + // Session lifecycle helpers + // ========================================================================= + /** - * Register a channel adapter + * Create or resume a session with automatic fallback. + * + * Priority: conversationId → agentId (default conv) → createAgent + * If resume fails (conversation missing), falls back to createSession. */ + private async getSession(): Promise { + const opts = this.baseSessionOptions; + + if (this.store.conversationId) { + process.env.LETTA_AGENT_ID = this.store.agentId || undefined; + return resumeSession(this.store.conversationId, opts); + } + if (this.store.agentId) { + process.env.LETTA_AGENT_ID = this.store.agentId; + return resumeSession(this.store.agentId, opts); + } + + // Create new agent -- persist immediately so we don't orphan it on later failures + console.log('[Bot] Creating new agent'); + const newAgentId = await createAgent({ + systemPrompt: SYSTEM_PROMPT, + memory: loadMemoryBlocks(this.config.agentName), + }); + const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com'; + this.store.setAgent(newAgentId, currentBaseUrl); + console.log('[Bot] Saved new agent ID:', newAgentId); + + // First-run setup: name and skills + if (this.config.agentName) { + updateAgentName(newAgentId, this.config.agentName).catch(() => {}); + } + installSkillsToAgent(newAgentId, this.config.skills); + + return createSession(newAgentId, opts); + } + + /** + * Persist conversation ID after a successful session result. + * Agent ID and first-run setup are handled eagerly in getSession(). + */ + private persistSessionState(session: Session): void { + // Agent ID already persisted in getSession() on creation. + // Here we only update if the server returned a different one (shouldn't happen). + if (session.agentId && session.agentId !== this.store.agentId) { + const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com'; + this.store.setAgent(session.agentId, currentBaseUrl, session.conversationId || undefined); + console.log('[Bot] Agent ID updated:', session.agentId); + } else if (session.conversationId && session.conversationId !== this.store.conversationId) { + this.store.conversationId = session.conversationId; + console.log('[Bot] Conversation ID updated:', session.conversationId); + } + } + + /** + * Send a message and return a deduplicated stream. + * + * Handles: + * - Session creation with fallback chain + * - CONFLICT recovery from orphaned approvals (retry once) + * - Conversation-not-found fallback (create new conversation) + * - Tool call deduplication + * - Session persistence after result + * + * Caller is responsible for consuming the stream and closing the session. + */ + private async runSession( + message: SendMessage, + options: { retried?: boolean } = {}, + ): Promise<{ session: Session; stream: () => AsyncGenerator }> { + const { retried = false } = options; + + let session = await this.getSession(); + + // Send message with fallback chain + try { + await session.send(message); + } catch (error) { + // 409 CONFLICT from orphaned approval + if (!retried && isApprovalConflictError(error) && this.store.agentId && this.store.conversationId) { + console.log('[Bot] CONFLICT detected - attempting orphaned approval recovery...'); + session.close(); + const result = await recoverOrphanedConversationApproval( + this.store.agentId, + this.store.conversationId + ); + if (result.recovered) { + console.log(`[Bot] Recovery succeeded (${result.details}), retrying...`); + return this.runSession(message, { retried: true }); + } + console.error(`[Bot] Orphaned approval recovery failed: ${result.details}`); + throw error; + } + + // Conversation/agent not found - try creating a new conversation. + // Only retry on errors that indicate missing conversation/agent, not + // on auth, network, or protocol errors (which would just fail again). + if (this.store.agentId && isConversationMissingError(error)) { + console.warn('[Bot] Conversation not found, creating a new conversation...'); + session.close(); + session = createSession(this.store.agentId, this.baseSessionOptions); + await session.send(message); + } else { + throw error; + } + } + + // Persist conversation ID immediately after successful send, before streaming. + // If streaming disconnects/aborts before result, the next turn will still + // resume the correct conversation instead of forking a new one. + if (session.conversationId && session.conversationId !== this.store.conversationId) { + this.store.conversationId = session.conversationId; + console.log('[Bot] Saved conversation ID:', session.conversationId); + } + + // Return session and a deduplicated stream generator + const seenToolCallIds = new Set(); + const self = this; + + async function* dedupedStream(): AsyncGenerator { + for await (const raw of session.stream()) { + const msg = raw as StreamMsg; + + // Deduplicate tool_call chunks (server streams token-by-token) + if (msg.type === 'tool_call') { + const id = msg.toolCallId; + if (id && seenToolCallIds.has(id)) continue; + if (id) seenToolCallIds.add(id); + } + + yield msg; + + // Persist state on result + if (msg.type === 'result') { + self.persistSessionState(session); + break; + } + } + } + + return { session, stream: dedupedStream }; + } + + // ========================================================================= + // Channel management + // ========================================================================= + registerChannel(adapter: ChannelAdapter): void { adapter.onMessage = (msg) => this.handleMessage(msg, adapter); adapter.onCommand = (cmd) => this.handleCommand(cmd); @@ -116,9 +305,6 @@ export class LettaBot implements AgentSession { console.log(`Registered channel: ${adapter.name}`); } - /** - * Set the group batcher and per-channel intervals. - */ setGroupBatcher(batcher: GroupBatcher, intervals: Map, instantGroupIds?: Set): void { this.groupBatcher = batcher; this.groupIntervals = intervals; @@ -128,16 +314,9 @@ export class LettaBot implements AgentSession { console.log('[Bot] Group batcher configured'); } - /** - * Inject a batched group message into the queue and trigger processing. - * Called by GroupBatcher's onFlush callback. - */ processGroupBatch(msg: InboundMessage, adapter: ChannelAdapter): void { const count = msg.batchedMessages?.length || 0; console.log(`[Bot] Group batch: ${count} messages from ${msg.channel}:${msg.chatId}`); - - // Unwrap single-message batches so they use formatMessageEnvelope (DM-style) - // instead of the chat-log batch format const effective = (count === 1 && msg.batchedMessages) ? msg.batchedMessages[0] : msg; @@ -147,9 +326,10 @@ export class LettaBot implements AgentSession { } } - /** - * Handle slash commands - */ + // ========================================================================= + // Commands + // ========================================================================= + private async handleCommand(command: string): Promise { console.log(`[Command] Received: /${command}`); switch (command) { @@ -165,13 +345,9 @@ export class LettaBot implements AgentSession { return lines.join('\n'); } case 'heartbeat': { - console.log('[Command] /heartbeat received'); if (!this.onTriggerHeartbeat) { - console.log('[Command] /heartbeat - no trigger callback configured'); return '⚠️ Heartbeat service not configured'; } - console.log('[Command] /heartbeat - triggering heartbeat...'); - // Trigger heartbeat asynchronously this.onTriggerHeartbeat().catch(err => { console.error('[Heartbeat] Manual trigger failed:', err); }); @@ -188,10 +364,11 @@ export class LettaBot implements AgentSession { return null; } } + + // ========================================================================= + // Start / Stop + // ========================================================================= - /** - * Start all registered channels - */ async start(): Promise { const startPromises = Array.from(this.channels.entries()).map(async ([id, adapter]) => { try { @@ -202,13 +379,9 @@ export class LettaBot implements AgentSession { console.error(`Failed to start channel ${id}:`, e); } }); - await Promise.all(startPromises); } - /** - * Stop all channels - */ async stop(): Promise { for (const adapter of this.channels.values()) { try { @@ -218,12 +391,11 @@ export class LettaBot implements AgentSession { } } } + + // ========================================================================= + // Approval recovery + // ========================================================================= - /** - * Attempt to recover from stuck approval state. - * Returns true if recovery was attempted, false if no recovery needed. - * @param maxAttempts Maximum recovery attempts before giving up (default: 2) - */ private async attemptRecovery(maxAttempts = 2): Promise<{ recovered: boolean; shouldReset: boolean }> { if (!this.store.agentId) { return { recovered: false, shouldReset: false }; @@ -232,16 +404,12 @@ export class LettaBot implements AgentSession { console.log('[Bot] Checking for pending approvals...'); try { - // Check for pending approvals FIRST, before checking attempt counter const pendingApprovals = await getPendingApprovals( this.store.agentId, this.store.conversationId || undefined ); if (pendingApprovals.length === 0) { - // Standard check found nothing - try conversation-level inspection as fallback. - // This catches cases where agent.pending_approval is null but the conversation - // has an unresolved approval_request_message from a terminated run. if (this.store.conversationId) { const convResult = await recoverOrphanedConversationApproval( this.store.agentId!, @@ -252,23 +420,19 @@ export class LettaBot implements AgentSession { return { recovered: true, shouldReset: false }; } } - // No pending approvals found by either method this.store.resetRecoveryAttempts(); return { recovered: false, shouldReset: false }; } - // There ARE pending approvals - check if we've exceeded max attempts const attempts = this.store.recoveryAttempts; if (attempts >= maxAttempts) { console.error(`[Bot] Recovery failed after ${attempts} attempts. Still have ${pendingApprovals.length} pending approval(s).`); - console.error('[Bot] Try running: lettabot reset-conversation'); return { recovered: false, shouldReset: true }; } console.log(`[Bot] Found ${pendingApprovals.length} pending approval(s), attempting recovery (attempt ${attempts + 1}/${maxAttempts})...`); this.store.incrementRecoveryAttempts(); - // Reject all pending approvals for (const approval of pendingApprovals) { console.log(`[Bot] Rejecting approval for ${approval.toolName} (${approval.toolCallId})`); await rejectApproval( @@ -278,7 +442,6 @@ export class LettaBot implements AgentSession { ); } - // Cancel any active runs const runIds = [...new Set(pendingApprovals.map(a => a.runId))]; if (runIds.length > 0) { console.log(`[Bot] Cancelling ${runIds.length} active run(s)...`); @@ -294,16 +457,15 @@ export class LettaBot implements AgentSession { return { recovered: false, shouldReset: this.store.recoveryAttempts >= maxAttempts }; } } + + // ========================================================================= + // Message queue + // ========================================================================= - /** - * Queue incoming message for processing (prevents concurrent SDK sessions) - */ private async handleMessage(msg: InboundMessage, adapter: ChannelAdapter): Promise { console.log(`[${msg.channel}] Message from ${msg.userId}: ${msg.text}`); - // Route group messages to batcher if configured if (msg.isGroup && this.groupBatcher) { - // Check if this group is configured for instant processing const isInstant = this.instantGroupIds.has(`${msg.channel}:${msg.chatId}`) || (msg.serverId && this.instantGroupIds.has(`${msg.channel}:${msg.serverId}`)); const debounceMs = isInstant ? 0 : (this.groupIntervals.get(msg.channel) ?? 5000); @@ -312,35 +474,19 @@ export class LettaBot implements AgentSession { return; } - // Add to queue this.messageQueue.push({ msg, adapter }); - console.log(`[Queue] Added to queue, length: ${this.messageQueue.length}, processing: ${this.processing}`); - - // Process queue if not already processing if (!this.processing) { - console.log('[Queue] Starting queue processing'); this.processQueue().catch(err => console.error('[Queue] Fatal error in processQueue:', err)); - } else { - console.log('[Queue] Already processing, will process when current message finishes'); } } - /** - * Process messages one at a time - */ private async processQueue(): Promise { - console.log(`[Queue] processQueue called: processing=${this.processing}, queueLength=${this.messageQueue.length}`); - if (this.processing || this.messageQueue.length === 0) { - console.log('[Queue] Exiting early: already processing or empty queue'); - return; - } + if (this.processing || this.messageQueue.length === 0) return; this.processing = true; - console.log('[Queue] Started processing'); while (this.messageQueue.length > 0) { const { msg, adapter } = this.messageQueue.shift()!; - console.log(`[Queue] Processing message from ${msg.userId} (${this.messageQueue.length} remaining)`); try { await this.processMessage(msg, adapter); } catch (error) { @@ -351,16 +497,14 @@ export class LettaBot implements AgentSession { console.log('[Queue] Finished processing all messages'); this.processing = false; } + + // ========================================================================= + // processMessage - User-facing message handling + // ========================================================================= - /** - * Process a single message - */ private async processMessage(msg: InboundMessage, adapter: ChannelAdapter, retried = false): Promise { - console.log('[Bot] Starting processMessage'); - // Track when user last sent a message (for heartbeat skip logic) + // Track timing and last target this.lastUserMessageTime = new Date(); - - // Track last message target for heartbeat delivery this.store.lastMessageTarget = { channel: msg.channel, chatId: msg.chatId, @@ -368,12 +512,9 @@ export class LettaBot implements AgentSession { updatedAt: new Date().toISOString(), }; - console.log('[Bot] Sending typing indicator'); - // Start typing indicator await adapter.sendTypingIndicator(msg.chatId); - console.log('[Bot] Typing indicator sent'); - // Attempt recovery from stuck approval state before starting session + // Pre-send approval recovery const recovery = await this.attemptRecovery(); if (recovery.shouldReset) { await adapter.sendMessage({ @@ -383,147 +524,37 @@ export class LettaBot implements AgentSession { }); return; } - if (recovery.recovered) { - console.log('[Bot] Recovered from stuck approval, continuing with message processing'); - } - - // Create or resume session - let session: Session; - let usedDefaultConversation = false; - let usedSpecificConversation = false; - // Base options for sessions (systemPrompt/memory set via createAgent for new agents) - const baseOptions = { - permissionMode: 'bypassPermissions' as const, - allowedTools: this.config.allowedTools, - cwd: this.config.workingDir, - // bypassPermissions mode auto-allows all tools, no canUseTool callback needed - // But add logging callback to diagnose approval flow issues (#132) - canUseTool: (toolName: string, toolInput: Record) => { - console.log(`[Bot] Tool approval requested: ${toolName} (should be auto-approved by bypassPermissions)`); - console.log(`[Bot] WARNING: canUseTool callback should NOT be called when permissionMode=bypassPermissions`); - // Return allow anyway as fallback - return { behavior: 'allow' as const }; - }, - }; - console.log('[Bot] Session options:', { permissionMode: baseOptions.permissionMode, allowedTools: baseOptions.allowedTools?.length }); - - console.log('[Bot] Creating/resuming session'); + + // Format message with metadata envelope + const prevTarget = this.store.lastMessageTarget; + const isNewChatSession = !prevTarget || prevTarget.chatId !== msg.chatId || prevTarget.channel !== msg.channel; + const sessionContext: SessionContextOptions | undefined = isNewChatSession ? { + agentId: this.store.agentId || undefined, + serverUrl: process.env.LETTA_BASE_URL || this.store.baseUrl || 'https://api.letta.com', + } : undefined; + + const formattedText = msg.isBatch && msg.batchedMessages + ? formatGroupBatchEnvelope(msg.batchedMessages) + : formatMessageEnvelope(msg, {}, sessionContext); + const messageToSend = await buildMultimodalMessage(formattedText, msg); + + // Run session + let session: Session | null = null; try { - if (this.store.conversationId) { - // Resume the specific conversation we've been using - console.log(`[Bot] Resuming conversation: ${this.store.conversationId}`); - process.env.LETTA_AGENT_ID = this.store.agentId || undefined; - usedSpecificConversation = true; - session = resumeSession(this.store.conversationId, baseOptions); - } else if (this.store.agentId) { - // Agent exists but no conversation - try default conversation - console.log(`[Bot] Resuming agent default conversation: ${this.store.agentId}`); - process.env.LETTA_AGENT_ID = this.store.agentId; - usedDefaultConversation = true; - session = resumeSession(this.store.agentId, baseOptions); - } else { - // Create new agent with default conversation - console.log('[Bot] Creating new agent'); - const newAgentId = await createAgent({ - systemPrompt: SYSTEM_PROMPT, - memory: loadMemoryBlocks(this.config.agentName), - }); - session = createSession(newAgentId, baseOptions); - } - console.log('[Bot] Session created/resumed'); - - const defaultTimeoutMs = 30000; // 30s timeout - const envTimeoutMs = Number(process.env.LETTA_SESSION_TIMEOUT_MS); - const initTimeoutMs = Number.isFinite(envTimeoutMs) && envTimeoutMs > 0 - ? envTimeoutMs - : defaultTimeoutMs; - const withTimeout = async (promise: Promise, label: string): Promise => { - let timeoutId: NodeJS.Timeout; - const timeoutPromise = new Promise((_, reject) => { - timeoutId = setTimeout(() => { - reject(new Error(`${label} timed out after ${initTimeoutMs}ms`)); - }, initTimeoutMs); - }); - try { - return await Promise.race([promise, timeoutPromise]); - } finally { - clearTimeout(timeoutId!); - } - }; - let initInfo; - try { - initInfo = await withTimeout(session.initialize(), 'Session initialize'); - } catch (error) { - if (usedSpecificConversation && this.store.agentId) { - console.warn('[Bot] Conversation missing, creating a new conversation...'); - session.close(); - session = createSession(this.store.agentId, baseOptions); - initInfo = await withTimeout(session.initialize(), 'Session initialize (new conversation)'); - usedSpecificConversation = false; - usedDefaultConversation = false; - } else if (usedDefaultConversation && this.store.agentId) { - console.warn('[Bot] Default conversation missing, creating a new conversation...'); - session.close(); - session = createSession(this.store.agentId, baseOptions); - initInfo = await withTimeout(session.initialize(), 'Session initialize (new conversation)'); - usedDefaultConversation = false; - } else { - throw error; - } - } - if (initInfo.conversationId && initInfo.conversationId !== this.store.conversationId) { - this.store.conversationId = initInfo.conversationId; - console.log('[Bot] Saved conversation ID:', initInfo.conversationId); - } + const run = await this.runSession(messageToSend, { retried }); + session = run.session; - // Determine if this is the first message in a new chat session - // (different chatId from last message target = new session context) - const prevTarget = this.store.lastMessageTarget; - const isNewChatSession = !prevTarget || prevTarget.chatId !== msg.chatId || prevTarget.channel !== msg.channel; - const sessionContext: SessionContextOptions | undefined = isNewChatSession ? { - agentId: this.store.agentId || undefined, - serverUrl: process.env.LETTA_BASE_URL || this.store.baseUrl || 'https://api.letta.com', - } : undefined; - - // Send message to agent with metadata envelope - const formattedText = msg.isBatch && msg.batchedMessages - ? formatGroupBatchEnvelope(msg.batchedMessages) - : formatMessageEnvelope(msg, {}, sessionContext); - const messageToSend = await buildMultimodalMessage(formattedText, msg); - try { - await withTimeout(session.send(messageToSend), 'Session send'); - } catch (sendError) { - // Check for 409 CONFLICT from orphaned approval_request_message - if (!retried && isApprovalConflictError(sendError) && this.store.agentId && this.store.conversationId) { - console.log('[Bot] CONFLICT detected - attempting orphaned approval recovery...'); - session.close(); - const result = await recoverOrphanedConversationApproval( - this.store.agentId, - this.store.conversationId - ); - if (result.recovered) { - console.log(`[Bot] Recovery succeeded (${result.details}), retrying message...`); - return this.processMessage(msg, adapter, /* retried */ true); - } - console.error(`[Bot] Orphaned approval recovery failed: ${result.details}`); - } - console.error('[Bot] Error sending message:', sendError); - throw sendError; - } - - // Stream response + // Stream response with delivery let response = ''; let lastUpdate = Date.now(); let messageId: string | null = null; let lastMsgType: string | null = null; let lastAssistantUuid: string | null = null; let sentAnyMessage = false; - let receivedAnyData = false; // Track if we got ANY stream data + let receivedAnyData = false; const msgTypeCounts: Record = {}; - // Helper to finalize and send current accumulated response const finalizeMessage = async () => { - // Check for silent marker - agent chose not to reply if (response.trim() === '') { console.log('[Bot] Agent chose not to reply (no-reply marker)'); sentAnyMessage = true; @@ -540,87 +571,63 @@ export class LettaBot implements AgentSession { await adapter.sendMessage({ chatId: msg.chatId, text: response, threadId: msg.threadId }); } sentAnyMessage = true; - const preview = response.length > 50 ? response.slice(0, 50) + '...' : response; - console.log(`[Bot] Sent: "${preview}"`); } catch { - // Edit failures (e.g. "message not modified") are OK if we already sent the message if (messageId) sentAnyMessage = true; } } - // Reset for next message bubble response = ''; messageId = null; lastUpdate = Date.now(); }; - // Keep typing indicator alive const typingInterval = setInterval(() => { adapter.sendTypingIndicator(msg.chatId).catch(() => {}); }, 4000); - const seenToolCallIds = new Set(); try { - for await (const streamMsg of session.stream()) { - // Deduplicate tool_call chunks: the server streams tool_call_message - // events token-by-token as arguments are generated, so a single tool - // call produces many wire events with the same toolCallId. - // Only count/log the first chunk per unique toolCallId. - if (streamMsg.type === 'tool_call') { - const toolCallId = (streamMsg as any).toolCallId; - if (toolCallId && seenToolCallIds.has(toolCallId)) continue; - if (toolCallId) seenToolCallIds.add(toolCallId); - } - const msgUuid = (streamMsg as any).uuid; + for await (const streamMsg of run.stream()) { receivedAnyData = true; msgTypeCounts[streamMsg.type] = (msgTypeCounts[streamMsg.type] || 0) + 1; - // Always log every stream message type for debugging approval issues const preview = JSON.stringify(streamMsg).slice(0, 300); console.log(`[Stream] type=${streamMsg.type} ${preview}`); - // When message type changes, finalize the current message - // This ensures different message types appear as separate bubbles + // Finalize on type change if (lastMsgType && lastMsgType !== streamMsg.type && response.trim()) { await finalizeMessage(); } - // Detect tool-call loops: abort if agent calls too many tools without producing a result + // Tool loop detection const maxToolCalls = this.config.maxToolCalls ?? 100; if (streamMsg.type === 'tool_call' && (msgTypeCounts['tool_call'] || 0) >= maxToolCalls) { - console.error(`[Bot] Agent stuck in tool loop (${msgTypeCounts['tool_call']} tool calls, limit=${maxToolCalls}), aborting`); + console.error(`[Bot] Agent stuck in tool loop (${msgTypeCounts['tool_call']} calls), aborting`); session.abort().catch(() => {}); response = '(Agent got stuck in a tool loop and was stopped. Try sending your message again.)'; break; } - // Log meaningful events (always, not just on type change for tools) + // Log meaningful events if (streamMsg.type === 'tool_call') { - const toolName = (streamMsg as any).toolName || 'unknown'; - console.log(`[Bot] Calling tool: ${toolName}`); + console.log(`[Bot] Calling tool: ${streamMsg.toolName || 'unknown'}`); } else if (streamMsg.type === 'tool_result') { - const isError = (streamMsg as any).isError; - const contentLen = (streamMsg as any).content?.length || 0; - console.log(`[Bot] Tool completed: error=${isError}, resultLen=${contentLen}`); + console.log(`[Bot] Tool completed: error=${streamMsg.isError}, resultLen=${(streamMsg as any).content?.length || 0}`); } else if (streamMsg.type === 'assistant' && lastMsgType !== 'assistant') { console.log(`[Bot] Generating response...`); } else if (streamMsg.type === 'reasoning' && lastMsgType !== 'reasoning') { console.log(`[Bot] Reasoning...`); - } else if (streamMsg.type === 'init' && lastMsgType !== 'init') { - console.log(`[Bot] Session initialized`); } lastMsgType = streamMsg.type; if (streamMsg.type === 'assistant') { - // Check if this is a new assistant message (different UUID) + const msgUuid = streamMsg.uuid; if (msgUuid && lastAssistantUuid && msgUuid !== lastAssistantUuid && response.trim()) { await finalizeMessage(); } lastAssistantUuid = msgUuid || lastAssistantUuid; - response += streamMsg.content; + response += streamMsg.content || ''; - // Stream updates only for channels that support editing (Telegram, Slack) - // Hold back streaming edits while response could still become + // Live-edit streaming for channels that support it const canEdit = adapter.supportsEditing?.() ?? true; const mayBeNoReply = ''.startsWith(response.trim()); if (canEdit && !mayBeNoReply && Date.now() - lastUpdate > 500 && response.length > 0) { @@ -633,8 +640,6 @@ export class LettaBot implements AgentSession { sentAnyMessage = true; } } catch (editErr) { - // Log but don't fail - streaming edits are best-effort - // (e.g. rate limits, MarkdownV2 formatting issues mid-stream) console.warn('[Bot] Streaming edit failed:', editErr instanceof Error ? editErr.message : editErr); } lastUpdate = Date.now(); @@ -642,24 +647,19 @@ export class LettaBot implements AgentSession { } if (streamMsg.type === 'result') { - // Log result details for debugging (#132) - const resultMsg = streamMsg as { result?: string; success?: boolean; error?: string }; - console.log(`[Bot] Stream result: success=${resultMsg.success}, hasResponse=${response.trim().length > 0}, resultLen=${resultMsg.result?.length || 0}`); + console.log(`[Bot] Stream result: success=${streamMsg.success}, hasResponse=${response.trim().length > 0}`); console.log(`[Bot] Stream message counts:`, msgTypeCounts); - if (resultMsg.error) { - console.error(`[Bot] Result error: ${resultMsg.error}`); + if (streamMsg.error) { + console.error(`[Bot] Result error: ${streamMsg.error}`); } - // Check for potential stuck state (empty result usually means pending approval or error) - if (resultMsg.success && resultMsg.result === '' && !response.trim()) { + // Empty result recovery + if (streamMsg.success && streamMsg.result === '' && !response.trim()) { console.error('[Bot] Warning: Agent returned empty result with no response.'); - console.error('[Bot] Agent ID:', this.store.agentId); - console.error('[Bot] Conversation ID:', this.store.conversationId); - - // Attempt conversation-level recovery and retry once if (!retried && this.store.agentId && this.store.conversationId) { console.log('[Bot] Empty result - attempting orphaned approval recovery...'); session.close(); + session = null; clearInterval(typingInterval); const convResult = await recoverOrphanedConversationApproval( this.store.agentId, @@ -667,52 +667,28 @@ export class LettaBot implements AgentSession { ); if (convResult.recovered) { console.log(`[Bot] Recovery succeeded (${convResult.details}), retrying message...`); - return this.processMessage(msg, adapter, /* retried */ true); + return this.processMessage(msg, adapter, true); } console.warn(`[Bot] No orphaned approvals found: ${convResult.details}`); } } - // Save agent ID and conversation ID - if (session.agentId && session.agentId !== this.store.agentId) { - const isNewAgent = !this.store.agentId; - // Save agent ID along with the current server URL - const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com'; - this.store.setAgent(session.agentId, currentBaseUrl, session.conversationId || undefined); - console.log('Saved agent ID:', session.agentId, 'conversation ID:', session.conversationId, 'on server:', currentBaseUrl); - - // Setup new agents: set name, install skills - if (isNewAgent) { - if (this.config.agentName && session.agentId) { - updateAgentName(session.agentId, this.config.agentName).catch(() => {}); - } - if (session.agentId) { - installSkillsToAgent(session.agentId, this.config.skills); - } - } - } else if (session.conversationId && session.conversationId !== this.store.conversationId) { - // Update conversation ID if it changed - this.store.conversationId = session.conversationId; - } break; } - } } finally { clearInterval(typingInterval); } - // Check for silent marker - agent chose not to reply + // Handle no-reply marker if (response.trim() === '') { - console.log('[Bot] Agent chose not to reply (no-reply marker)'); sentAnyMessage = true; response = ''; } - // Detect unsupported multimodal: images were sent but server replaced them - const sentImages = Array.isArray(messageToSend); - if (sentImages && response.includes('[Image omitted]')) { - console.warn('[Bot] Model does not support images — server replaced inline images with "[Image omitted]". Consider using a vision-capable model or setting features.inlineImages: false in config.'); + // Detect unsupported multimodal + if (Array.isArray(messageToSend) && response.includes('[Image omitted]')) { + console.warn('[Bot] Model does not support images -- consider a vision-capable model or features.inlineImages: false'); } // Send final response @@ -724,18 +700,12 @@ export class LettaBot implements AgentSession { await adapter.sendMessage({ chatId: msg.chatId, text: response, threadId: msg.threadId }); } sentAnyMessage = true; - // Reset recovery counter on successful response this.store.resetRecoveryAttempts(); - const preview = response.length > 50 ? response.slice(0, 50) + '...' : response; - console.log(`[Bot] Sent: "${preview}"`); - } catch (sendError) { - console.error('[Bot] Error sending response:', sendError); - // If edit failed (messageId exists), send the complete response as a new message - // so the user isn't left with a truncated streaming edit + } catch { + // Edit failed -- send as new message so user isn't left with truncated text try { await adapter.sendMessage({ chatId: msg.chatId, text: response, threadId: msg.threadId }); sentAnyMessage = true; - // Reset recovery counter on successful response this.store.resetRecoveryAttempts(); } catch (retryError) { console.error('[Bot] Retry send also failed:', retryError); @@ -743,30 +713,20 @@ export class LettaBot implements AgentSession { } } - // Only show "no response" if we never sent anything + // Handle no response if (!sentAnyMessage) { if (!receivedAnyData) { - // Stream timed out with NO data at all - likely stuck state console.error('[Bot] Stream received NO DATA - possible stuck state'); - console.error('[Bot] Agent:', this.store.agentId); - console.error('[Bot] Conversation:', this.store.conversationId); - console.error('[Bot] This can happen when a previous session disconnected mid-tool-approval'); await adapter.sendMessage({ chatId: msg.chatId, text: '(Session interrupted. Try: lettabot reset-conversation)', threadId: msg.threadId }); } else { - console.warn('[Bot] Stream received data but no assistant message'); - console.warn('[Bot] Message types received:', msgTypeCounts); - // If the stream had tool activity, the agent was working and likely - // sent messages via tools (e.g. lettabot-message send). Don't alarm the user. const hadToolActivity = (msgTypeCounts['tool_call'] || 0) > 0 || (msgTypeCounts['tool_result'] || 0) > 0; if (hadToolActivity) { console.log('[Bot] Agent had tool activity but no assistant message - likely sent via tool'); } else { - console.warn('[Bot] Agent:', this.store.agentId); - console.warn('[Bot] Conversation:', this.store.conversationId); const convIdShort = this.store.conversationId?.slice(0, 8) || 'none'; await adapter.sendMessage({ chatId: msg.chatId, @@ -789,147 +749,50 @@ export class LettaBot implements AgentSession { console.error('[Bot] Failed to send error message to channel:', sendError); } } finally { - session!?.close(); + session?.close(); } } + + // ========================================================================= + // sendToAgent - Background triggers (heartbeats, cron, webhooks) + // ========================================================================= - /** - * Send a message to the agent (for cron jobs, webhooks, etc.) - * - * In silent mode (heartbeats, cron), the agent's text response is NOT auto-delivered. - * The agent must use `lettabot-message` CLI via Bash to send messages explicitly. - * - * @param text - The prompt/message to send - * @param context - Optional trigger context (for logging/tracking) - * @returns The agent's response text - */ async sendToAgent( text: string, _context?: TriggerContext ): Promise { - // Wait for any in-progress message processing to complete - // This prevents 409 conflicts when heartbeats overlap with user messages + // Serialize with message queue to prevent 409 conflicts while (this.processing) { - console.log('[Bot] Waiting for message queue to finish before sendToAgent...'); await new Promise(resolve => setTimeout(resolve, 1000)); } - // Mark as processing to prevent queue from starting this.processing = true; - console.log('[Bot] sendToAgent acquired processing lock'); try { - return await this._sendToAgentInternal(text, _context); + const { session, stream } = await this.runSession(text); + + try { + let response = ''; + for await (const msg of stream()) { + if (msg.type === 'assistant') { + response += msg.content || ''; + } + if (msg.type === 'result') break; + } + return response; + } finally { + session.close(); + } } finally { this.processing = false; - console.log('[Bot] sendToAgent released processing lock'); - // Trigger queue processing in case messages arrived while we were busy this.processQueue(); } } + + // ========================================================================= + // Channel delivery + status + // ========================================================================= - private async _sendToAgentInternal( - text: string, - _context?: TriggerContext, - retried = false - ): Promise { - // Base options for sessions (systemPrompt/memory set via createAgent for new agents) - const baseOptions = { - permissionMode: 'bypassPermissions' as const, - allowedTools: this.config.allowedTools, - cwd: this.config.workingDir, - // bypassPermissions mode auto-allows all tools, no canUseTool callback needed - // But add logging callback to diagnose approval flow issues (#132) - canUseTool: (toolName: string, _toolInput: Record) => { - console.log(`[Bot] Tool approval requested in sendToAgent: ${toolName}`); - console.log(`[Bot] WARNING: canUseTool callback should NOT be called when permissionMode=bypassPermissions`); - return { behavior: 'allow' as const }; - }, - }; - - let session: Session; - let usedDefaultConversation = false; - let usedSpecificConversation = false; - if (this.store.conversationId) { - // Resume the specific conversation we've been using - usedSpecificConversation = true; - session = resumeSession(this.store.conversationId, baseOptions); - } else if (this.store.agentId) { - // Agent exists but no conversation - try default conversation - usedDefaultConversation = true; - session = resumeSession(this.store.agentId, baseOptions); - } else { - // Create new agent with default conversation - const newAgentId = await createAgent({ - systemPrompt: SYSTEM_PROMPT, - memory: loadMemoryBlocks(this.config.agentName), - }); - session = createSession(newAgentId, baseOptions); - } - - try { - try { - await session.send(text); - } catch (error) { - // Check for 409 CONFLICT from orphaned approval_request_message - if (!retried && isApprovalConflictError(error) && this.store.agentId && this.store.conversationId) { - console.log('[Bot] CONFLICT in sendToAgent - attempting orphaned approval recovery...'); - session.close(); - const result = await recoverOrphanedConversationApproval( - this.store.agentId, - this.store.conversationId - ); - if (result.recovered) { - console.log(`[Bot] Recovery succeeded (${result.details}), retrying sendToAgent...`); - return this._sendToAgentInternal(text, _context, /* retried */ true); - } - console.error(`[Bot] Orphaned approval recovery failed: ${result.details}`); - throw error; - } - if (usedSpecificConversation && this.store.agentId) { - console.warn('[Bot] Conversation missing, creating a new conversation...'); - session.close(); - session = createSession(this.store.agentId, baseOptions); - await session.send(text); - usedSpecificConversation = false; - usedDefaultConversation = false; - } else if (usedDefaultConversation && this.store.agentId) { - console.warn('[Bot] Default conversation missing, creating a new conversation...'); - session.close(); - session = createSession(this.store.agentId, baseOptions); - await session.send(text); - usedDefaultConversation = false; - } else { - throw error; - } - } - - let response = ''; - for await (const msg of session.stream()) { - if (msg.type === 'assistant') { - response += msg.content; - } - - if (msg.type === 'result') { - if (session.agentId && session.agentId !== this.store.agentId) { - const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com'; - this.store.setAgent(session.agentId, currentBaseUrl, session.conversationId || undefined); - } else if (session.conversationId && session.conversationId !== this.store.conversationId) { - this.store.conversationId = session.conversationId; - } - break; - } - } - - return response; - } finally { - session.close(); - } - } - - /** - * Deliver a message or file to a specific channel - */ async deliverToChannel( channelId: string, chatId: string, @@ -941,27 +804,22 @@ export class LettaBot implements AgentSession { ): Promise { const adapter = this.channels.get(channelId); if (!adapter) { - console.error(`Channel not found: ${channelId}`); throw new Error(`Channel not found: ${channelId}`); } - // Send file if provided if (options.filePath) { if (typeof adapter.sendFile !== 'function') { throw new Error(`Channel ${channelId} does not support file sending`); } - const result = await adapter.sendFile({ chatId, filePath: options.filePath, - caption: options.text, // text becomes caption for files + caption: options.text, kind: options.kind, }); - return result.messageId; } - // Send text message if (options.text) { const result = await adapter.sendMessage({ chatId, text: options.text }); return result.messageId; @@ -970,9 +828,6 @@ export class LettaBot implements AgentSession { throw new Error('Either text or filePath must be provided'); } - /** - * Get bot status - */ getStatus(): { agentId: string | null; channels: string[] } { return { agentId: this.store.agentId, @@ -980,32 +835,20 @@ export class LettaBot implements AgentSession { }; } - /** - * Set agent ID (for container deploys that discover existing agents) - */ setAgentId(agentId: string): void { this.store.agentId = agentId; console.log(`[Bot] Agent ID set to: ${agentId}`); } - /** - * Reset agent (clear memory) - */ reset(): void { this.store.reset(); console.log('Agent reset'); } - /** - * Get the last message target (for heartbeat delivery) - */ getLastMessageTarget(): { channel: string; chatId: string } | null { return this.store.lastMessageTarget || null; } - /** - * Get the time of the last user message (for heartbeat skip logic) - */ getLastUserMessageTime(): Date | null { return this.lastUserMessageTime; }