From e3df025bd8798ed6cdfb68875c113780287a63e1 Mon Sep 17 00:00:00 2001 From: Cameron Date: Mon, 9 Feb 2026 15:19:13 -0800 Subject: [PATCH] refactor: unify bot loop with runSession(), drop initialize/timeout (#238) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * refactor: unify bot loop with runSession(), drop initialize/timeout Unify the duplicated session lifecycle in processMessage() and sendToAgent() into shared helpers: - baseSessionOptions: computed once, not duplicated - getSession(): 3-way create/resume/fallback in one place - persistSessionState(): save agentId/conversationId/skills - runSession(): send with CONFLICT retry, deduplicated stream Also: - Drop session.initialize() -- SDK auto-initializes on send() - Drop withTimeout() wrapper -- SDK should own timeouts - sendToAgent() shrinks from 98 to 20 lines - processMessage() shrinks from 437 to ~250 lines (delivery stays) - Net -187 lines (1013 -> 825) All recovery logic preserved: pre-send attemptRecovery(), CONFLICT catch + retry, empty-result orphan recovery. Fixes #197 Written by Cameron ◯ Letta Code "Make it work, make it right, make it fast." -- Kent Beck * fix: narrow conversation-not-found fallback to 404/missing errors Codex review caught that runSession() was retrying with createSession() on ANY send error when agentId exists, not just conversation-missing cases. Auth/network/protocol errors would incorrectly fork conversations. Now only retries on 404 or error messages containing "not found" / "missing" / "does not exist". Other errors propagate immediately. Written by Cameron ◯ Letta Code "Be conservative in what you send, be liberal in what you accept." -- Postel's Law * fix: persist agent ID eagerly on creation, not deferred to result Codex review caught that agent/conversation IDs were only saved in the stream result handler. If createAgent() succeeded but send/stream failed, the ID was lost and the next message would create a duplicate agent. Now: getSession() persists the agent ID + runs first-run setup (name, skills) immediately after createAgent(). persistSessionState() only updates conversation ID on result. Written by Cameron ◯ Letta Code "Fail fast, but don't forget what you learned." -- unknown * fix: persist conversation ID after send, before streaming Codex review caught that conversationId was only saved on stream result. If streaming disconnected or aborted before result, the next turn would fall back to resumeSession(agentId) (default conversation) instead of resuming the actual conversation -- forking context. Now saved immediately after successful send(), matching the pre-refactor behavior where it was saved after initialize(). Written by Cameron ◯ Letta Code "The best time to save state was before the failure. The second best time is now." -- adapted proverb --- src/core/bot.ts | 743 +++++++++++++++++++----------------------------- 1 file changed, 293 insertions(+), 450 deletions(-) diff --git a/src/core/bot.ts b/src/core/bot.ts index 47b250d..2170bbe 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -20,8 +20,6 @@ import { SYSTEM_PROMPT } from './system-prompt.js'; /** * Detect if an error is a 409 CONFLICT from an orphaned approval. - * The error may come as a ConflictError from the Letta client (status 409) - * or as an error message string through the CLI transport. */ function isApprovalConflictError(error: unknown): boolean { if (error instanceof Error) { @@ -34,6 +32,23 @@ function isApprovalConflictError(error: unknown): boolean { return false; } +/** + * Detect if an error indicates a missing conversation or agent. + * Only these errors should trigger the "create new conversation" fallback. + * Auth, network, and protocol errors should NOT be retried. + */ +function isConversationMissingError(error: unknown): boolean { + if (error instanceof Error) { + const msg = error.message.toLowerCase(); + if (msg.includes('not found')) return true; + if (msg.includes('conversation') && (msg.includes('missing') || msg.includes('does not exist'))) return true; + if (msg.includes('agent') && msg.includes('not found')) return true; + } + const statusError = error as { status?: number }; + if (statusError?.status === 404) return true; + return false; +} + const SUPPORTED_IMAGE_MIMES = new Set([ 'image/png', 'image/jpeg', 'image/gif', 'image/webp', ]); @@ -42,7 +57,6 @@ async function buildMultimodalMessage( formattedText: string, msg: InboundMessage, ): Promise { - // Respect opt-out: when INLINE_IMAGES=false, skip multimodal and only send file paths in envelope if (process.env.INLINE_IMAGES === 'false') { return formattedText; } @@ -80,6 +94,22 @@ async function buildMultimodalMessage( return content.length > 1 ? content : formattedText; } +// --------------------------------------------------------------------------- +// Stream message type with toolCallId/uuid for dedup +// --------------------------------------------------------------------------- +interface StreamMsg { + type: string; + content?: string; + toolCallId?: string; + toolName?: string; + uuid?: string; + isError?: boolean; + result?: string; + success?: boolean; + error?: string; + [key: string]: unknown; +} + export class LettaBot implements AgentSession { private store: Store; private config: BotConfig; @@ -90,25 +120,184 @@ export class LettaBot implements AgentSession { // Callback to trigger heartbeat (set by main.ts) public onTriggerHeartbeat?: () => Promise; private groupBatcher?: GroupBatcher; - private groupIntervals: Map = new Map(); // channel -> intervalMin - private instantGroupIds: Set = new Set(); // channel:id keys for instant processing + private groupIntervals: Map = new Map(); + private instantGroupIds: Set = new Set(); private processing = false; constructor(config: BotConfig) { this.config = config; - - // Ensure working directory exists mkdirSync(config.workingDir, { recursive: true }); - - // Store in project root (same as main.ts reads for LETTA_AGENT_ID) this.store = new Store('lettabot-agent.json', config.agentName); - console.log(`LettaBot initialized. Agent ID: ${this.store.agentId || '(new)'}`); } - + + // ========================================================================= + // Session options (shared by processMessage and sendToAgent) + // ========================================================================= + + private get baseSessionOptions() { + return { + permissionMode: 'bypassPermissions' as const, + allowedTools: this.config.allowedTools, + cwd: this.config.workingDir, + canUseTool: (toolName: string, _toolInput: Record) => { + console.log(`[Bot] Tool approval requested: ${toolName} (should be auto-approved by bypassPermissions)`); + return { behavior: 'allow' as const }; + }, + }; + } + + // ========================================================================= + // Session lifecycle helpers + // ========================================================================= + /** - * Register a channel adapter + * Create or resume a session with automatic fallback. + * + * Priority: conversationId → agentId (default conv) → createAgent + * If resume fails (conversation missing), falls back to createSession. */ + private async getSession(): Promise { + const opts = this.baseSessionOptions; + + if (this.store.conversationId) { + process.env.LETTA_AGENT_ID = this.store.agentId || undefined; + return resumeSession(this.store.conversationId, opts); + } + if (this.store.agentId) { + process.env.LETTA_AGENT_ID = this.store.agentId; + return resumeSession(this.store.agentId, opts); + } + + // Create new agent -- persist immediately so we don't orphan it on later failures + console.log('[Bot] Creating new agent'); + const newAgentId = await createAgent({ + systemPrompt: SYSTEM_PROMPT, + memory: loadMemoryBlocks(this.config.agentName), + }); + const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com'; + this.store.setAgent(newAgentId, currentBaseUrl); + console.log('[Bot] Saved new agent ID:', newAgentId); + + // First-run setup: name and skills + if (this.config.agentName) { + updateAgentName(newAgentId, this.config.agentName).catch(() => {}); + } + installSkillsToAgent(newAgentId, this.config.skills); + + return createSession(newAgentId, opts); + } + + /** + * Persist conversation ID after a successful session result. + * Agent ID and first-run setup are handled eagerly in getSession(). + */ + private persistSessionState(session: Session): void { + // Agent ID already persisted in getSession() on creation. + // Here we only update if the server returned a different one (shouldn't happen). + if (session.agentId && session.agentId !== this.store.agentId) { + const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com'; + this.store.setAgent(session.agentId, currentBaseUrl, session.conversationId || undefined); + console.log('[Bot] Agent ID updated:', session.agentId); + } else if (session.conversationId && session.conversationId !== this.store.conversationId) { + this.store.conversationId = session.conversationId; + console.log('[Bot] Conversation ID updated:', session.conversationId); + } + } + + /** + * Send a message and return a deduplicated stream. + * + * Handles: + * - Session creation with fallback chain + * - CONFLICT recovery from orphaned approvals (retry once) + * - Conversation-not-found fallback (create new conversation) + * - Tool call deduplication + * - Session persistence after result + * + * Caller is responsible for consuming the stream and closing the session. + */ + private async runSession( + message: SendMessage, + options: { retried?: boolean } = {}, + ): Promise<{ session: Session; stream: () => AsyncGenerator }> { + const { retried = false } = options; + + let session = await this.getSession(); + + // Send message with fallback chain + try { + await session.send(message); + } catch (error) { + // 409 CONFLICT from orphaned approval + if (!retried && isApprovalConflictError(error) && this.store.agentId && this.store.conversationId) { + console.log('[Bot] CONFLICT detected - attempting orphaned approval recovery...'); + session.close(); + const result = await recoverOrphanedConversationApproval( + this.store.agentId, + this.store.conversationId + ); + if (result.recovered) { + console.log(`[Bot] Recovery succeeded (${result.details}), retrying...`); + return this.runSession(message, { retried: true }); + } + console.error(`[Bot] Orphaned approval recovery failed: ${result.details}`); + throw error; + } + + // Conversation/agent not found - try creating a new conversation. + // Only retry on errors that indicate missing conversation/agent, not + // on auth, network, or protocol errors (which would just fail again). + if (this.store.agentId && isConversationMissingError(error)) { + console.warn('[Bot] Conversation not found, creating a new conversation...'); + session.close(); + session = createSession(this.store.agentId, this.baseSessionOptions); + await session.send(message); + } else { + throw error; + } + } + + // Persist conversation ID immediately after successful send, before streaming. + // If streaming disconnects/aborts before result, the next turn will still + // resume the correct conversation instead of forking a new one. + if (session.conversationId && session.conversationId !== this.store.conversationId) { + this.store.conversationId = session.conversationId; + console.log('[Bot] Saved conversation ID:', session.conversationId); + } + + // Return session and a deduplicated stream generator + const seenToolCallIds = new Set(); + const self = this; + + async function* dedupedStream(): AsyncGenerator { + for await (const raw of session.stream()) { + const msg = raw as StreamMsg; + + // Deduplicate tool_call chunks (server streams token-by-token) + if (msg.type === 'tool_call') { + const id = msg.toolCallId; + if (id && seenToolCallIds.has(id)) continue; + if (id) seenToolCallIds.add(id); + } + + yield msg; + + // Persist state on result + if (msg.type === 'result') { + self.persistSessionState(session); + break; + } + } + } + + return { session, stream: dedupedStream }; + } + + // ========================================================================= + // Channel management + // ========================================================================= + registerChannel(adapter: ChannelAdapter): void { adapter.onMessage = (msg) => this.handleMessage(msg, adapter); adapter.onCommand = (cmd) => this.handleCommand(cmd); @@ -116,9 +305,6 @@ export class LettaBot implements AgentSession { console.log(`Registered channel: ${adapter.name}`); } - /** - * Set the group batcher and per-channel intervals. - */ setGroupBatcher(batcher: GroupBatcher, intervals: Map, instantGroupIds?: Set): void { this.groupBatcher = batcher; this.groupIntervals = intervals; @@ -128,16 +314,9 @@ export class LettaBot implements AgentSession { console.log('[Bot] Group batcher configured'); } - /** - * Inject a batched group message into the queue and trigger processing. - * Called by GroupBatcher's onFlush callback. - */ processGroupBatch(msg: InboundMessage, adapter: ChannelAdapter): void { const count = msg.batchedMessages?.length || 0; console.log(`[Bot] Group batch: ${count} messages from ${msg.channel}:${msg.chatId}`); - - // Unwrap single-message batches so they use formatMessageEnvelope (DM-style) - // instead of the chat-log batch format const effective = (count === 1 && msg.batchedMessages) ? msg.batchedMessages[0] : msg; @@ -147,9 +326,10 @@ export class LettaBot implements AgentSession { } } - /** - * Handle slash commands - */ + // ========================================================================= + // Commands + // ========================================================================= + private async handleCommand(command: string): Promise { console.log(`[Command] Received: /${command}`); switch (command) { @@ -165,13 +345,9 @@ export class LettaBot implements AgentSession { return lines.join('\n'); } case 'heartbeat': { - console.log('[Command] /heartbeat received'); if (!this.onTriggerHeartbeat) { - console.log('[Command] /heartbeat - no trigger callback configured'); return '⚠️ Heartbeat service not configured'; } - console.log('[Command] /heartbeat - triggering heartbeat...'); - // Trigger heartbeat asynchronously this.onTriggerHeartbeat().catch(err => { console.error('[Heartbeat] Manual trigger failed:', err); }); @@ -188,10 +364,11 @@ export class LettaBot implements AgentSession { return null; } } + + // ========================================================================= + // Start / Stop + // ========================================================================= - /** - * Start all registered channels - */ async start(): Promise { const startPromises = Array.from(this.channels.entries()).map(async ([id, adapter]) => { try { @@ -202,13 +379,9 @@ export class LettaBot implements AgentSession { console.error(`Failed to start channel ${id}:`, e); } }); - await Promise.all(startPromises); } - /** - * Stop all channels - */ async stop(): Promise { for (const adapter of this.channels.values()) { try { @@ -218,12 +391,11 @@ export class LettaBot implements AgentSession { } } } + + // ========================================================================= + // Approval recovery + // ========================================================================= - /** - * Attempt to recover from stuck approval state. - * Returns true if recovery was attempted, false if no recovery needed. - * @param maxAttempts Maximum recovery attempts before giving up (default: 2) - */ private async attemptRecovery(maxAttempts = 2): Promise<{ recovered: boolean; shouldReset: boolean }> { if (!this.store.agentId) { return { recovered: false, shouldReset: false }; @@ -232,16 +404,12 @@ export class LettaBot implements AgentSession { console.log('[Bot] Checking for pending approvals...'); try { - // Check for pending approvals FIRST, before checking attempt counter const pendingApprovals = await getPendingApprovals( this.store.agentId, this.store.conversationId || undefined ); if (pendingApprovals.length === 0) { - // Standard check found nothing - try conversation-level inspection as fallback. - // This catches cases where agent.pending_approval is null but the conversation - // has an unresolved approval_request_message from a terminated run. if (this.store.conversationId) { const convResult = await recoverOrphanedConversationApproval( this.store.agentId!, @@ -252,23 +420,19 @@ export class LettaBot implements AgentSession { return { recovered: true, shouldReset: false }; } } - // No pending approvals found by either method this.store.resetRecoveryAttempts(); return { recovered: false, shouldReset: false }; } - // There ARE pending approvals - check if we've exceeded max attempts const attempts = this.store.recoveryAttempts; if (attempts >= maxAttempts) { console.error(`[Bot] Recovery failed after ${attempts} attempts. Still have ${pendingApprovals.length} pending approval(s).`); - console.error('[Bot] Try running: lettabot reset-conversation'); return { recovered: false, shouldReset: true }; } console.log(`[Bot] Found ${pendingApprovals.length} pending approval(s), attempting recovery (attempt ${attempts + 1}/${maxAttempts})...`); this.store.incrementRecoveryAttempts(); - // Reject all pending approvals for (const approval of pendingApprovals) { console.log(`[Bot] Rejecting approval for ${approval.toolName} (${approval.toolCallId})`); await rejectApproval( @@ -278,7 +442,6 @@ export class LettaBot implements AgentSession { ); } - // Cancel any active runs const runIds = [...new Set(pendingApprovals.map(a => a.runId))]; if (runIds.length > 0) { console.log(`[Bot] Cancelling ${runIds.length} active run(s)...`); @@ -294,16 +457,15 @@ export class LettaBot implements AgentSession { return { recovered: false, shouldReset: this.store.recoveryAttempts >= maxAttempts }; } } + + // ========================================================================= + // Message queue + // ========================================================================= - /** - * Queue incoming message for processing (prevents concurrent SDK sessions) - */ private async handleMessage(msg: InboundMessage, adapter: ChannelAdapter): Promise { console.log(`[${msg.channel}] Message from ${msg.userId}: ${msg.text}`); - // Route group messages to batcher if configured if (msg.isGroup && this.groupBatcher) { - // Check if this group is configured for instant processing const isInstant = this.instantGroupIds.has(`${msg.channel}:${msg.chatId}`) || (msg.serverId && this.instantGroupIds.has(`${msg.channel}:${msg.serverId}`)); const debounceMs = isInstant ? 0 : (this.groupIntervals.get(msg.channel) ?? 5000); @@ -312,35 +474,19 @@ export class LettaBot implements AgentSession { return; } - // Add to queue this.messageQueue.push({ msg, adapter }); - console.log(`[Queue] Added to queue, length: ${this.messageQueue.length}, processing: ${this.processing}`); - - // Process queue if not already processing if (!this.processing) { - console.log('[Queue] Starting queue processing'); this.processQueue().catch(err => console.error('[Queue] Fatal error in processQueue:', err)); - } else { - console.log('[Queue] Already processing, will process when current message finishes'); } } - /** - * Process messages one at a time - */ private async processQueue(): Promise { - console.log(`[Queue] processQueue called: processing=${this.processing}, queueLength=${this.messageQueue.length}`); - if (this.processing || this.messageQueue.length === 0) { - console.log('[Queue] Exiting early: already processing or empty queue'); - return; - } + if (this.processing || this.messageQueue.length === 0) return; this.processing = true; - console.log('[Queue] Started processing'); while (this.messageQueue.length > 0) { const { msg, adapter } = this.messageQueue.shift()!; - console.log(`[Queue] Processing message from ${msg.userId} (${this.messageQueue.length} remaining)`); try { await this.processMessage(msg, adapter); } catch (error) { @@ -351,16 +497,14 @@ export class LettaBot implements AgentSession { console.log('[Queue] Finished processing all messages'); this.processing = false; } + + // ========================================================================= + // processMessage - User-facing message handling + // ========================================================================= - /** - * Process a single message - */ private async processMessage(msg: InboundMessage, adapter: ChannelAdapter, retried = false): Promise { - console.log('[Bot] Starting processMessage'); - // Track when user last sent a message (for heartbeat skip logic) + // Track timing and last target this.lastUserMessageTime = new Date(); - - // Track last message target for heartbeat delivery this.store.lastMessageTarget = { channel: msg.channel, chatId: msg.chatId, @@ -368,12 +512,9 @@ export class LettaBot implements AgentSession { updatedAt: new Date().toISOString(), }; - console.log('[Bot] Sending typing indicator'); - // Start typing indicator await adapter.sendTypingIndicator(msg.chatId); - console.log('[Bot] Typing indicator sent'); - // Attempt recovery from stuck approval state before starting session + // Pre-send approval recovery const recovery = await this.attemptRecovery(); if (recovery.shouldReset) { await adapter.sendMessage({ @@ -383,147 +524,37 @@ export class LettaBot implements AgentSession { }); return; } - if (recovery.recovered) { - console.log('[Bot] Recovered from stuck approval, continuing with message processing'); - } - - // Create or resume session - let session: Session; - let usedDefaultConversation = false; - let usedSpecificConversation = false; - // Base options for sessions (systemPrompt/memory set via createAgent for new agents) - const baseOptions = { - permissionMode: 'bypassPermissions' as const, - allowedTools: this.config.allowedTools, - cwd: this.config.workingDir, - // bypassPermissions mode auto-allows all tools, no canUseTool callback needed - // But add logging callback to diagnose approval flow issues (#132) - canUseTool: (toolName: string, toolInput: Record) => { - console.log(`[Bot] Tool approval requested: ${toolName} (should be auto-approved by bypassPermissions)`); - console.log(`[Bot] WARNING: canUseTool callback should NOT be called when permissionMode=bypassPermissions`); - // Return allow anyway as fallback - return { behavior: 'allow' as const }; - }, - }; - console.log('[Bot] Session options:', { permissionMode: baseOptions.permissionMode, allowedTools: baseOptions.allowedTools?.length }); - - console.log('[Bot] Creating/resuming session'); + + // Format message with metadata envelope + const prevTarget = this.store.lastMessageTarget; + const isNewChatSession = !prevTarget || prevTarget.chatId !== msg.chatId || prevTarget.channel !== msg.channel; + const sessionContext: SessionContextOptions | undefined = isNewChatSession ? { + agentId: this.store.agentId || undefined, + serverUrl: process.env.LETTA_BASE_URL || this.store.baseUrl || 'https://api.letta.com', + } : undefined; + + const formattedText = msg.isBatch && msg.batchedMessages + ? formatGroupBatchEnvelope(msg.batchedMessages) + : formatMessageEnvelope(msg, {}, sessionContext); + const messageToSend = await buildMultimodalMessage(formattedText, msg); + + // Run session + let session: Session | null = null; try { - if (this.store.conversationId) { - // Resume the specific conversation we've been using - console.log(`[Bot] Resuming conversation: ${this.store.conversationId}`); - process.env.LETTA_AGENT_ID = this.store.agentId || undefined; - usedSpecificConversation = true; - session = resumeSession(this.store.conversationId, baseOptions); - } else if (this.store.agentId) { - // Agent exists but no conversation - try default conversation - console.log(`[Bot] Resuming agent default conversation: ${this.store.agentId}`); - process.env.LETTA_AGENT_ID = this.store.agentId; - usedDefaultConversation = true; - session = resumeSession(this.store.agentId, baseOptions); - } else { - // Create new agent with default conversation - console.log('[Bot] Creating new agent'); - const newAgentId = await createAgent({ - systemPrompt: SYSTEM_PROMPT, - memory: loadMemoryBlocks(this.config.agentName), - }); - session = createSession(newAgentId, baseOptions); - } - console.log('[Bot] Session created/resumed'); - - const defaultTimeoutMs = 30000; // 30s timeout - const envTimeoutMs = Number(process.env.LETTA_SESSION_TIMEOUT_MS); - const initTimeoutMs = Number.isFinite(envTimeoutMs) && envTimeoutMs > 0 - ? envTimeoutMs - : defaultTimeoutMs; - const withTimeout = async (promise: Promise, label: string): Promise => { - let timeoutId: NodeJS.Timeout; - const timeoutPromise = new Promise((_, reject) => { - timeoutId = setTimeout(() => { - reject(new Error(`${label} timed out after ${initTimeoutMs}ms`)); - }, initTimeoutMs); - }); - try { - return await Promise.race([promise, timeoutPromise]); - } finally { - clearTimeout(timeoutId!); - } - }; - let initInfo; - try { - initInfo = await withTimeout(session.initialize(), 'Session initialize'); - } catch (error) { - if (usedSpecificConversation && this.store.agentId) { - console.warn('[Bot] Conversation missing, creating a new conversation...'); - session.close(); - session = createSession(this.store.agentId, baseOptions); - initInfo = await withTimeout(session.initialize(), 'Session initialize (new conversation)'); - usedSpecificConversation = false; - usedDefaultConversation = false; - } else if (usedDefaultConversation && this.store.agentId) { - console.warn('[Bot] Default conversation missing, creating a new conversation...'); - session.close(); - session = createSession(this.store.agentId, baseOptions); - initInfo = await withTimeout(session.initialize(), 'Session initialize (new conversation)'); - usedDefaultConversation = false; - } else { - throw error; - } - } - if (initInfo.conversationId && initInfo.conversationId !== this.store.conversationId) { - this.store.conversationId = initInfo.conversationId; - console.log('[Bot] Saved conversation ID:', initInfo.conversationId); - } + const run = await this.runSession(messageToSend, { retried }); + session = run.session; - // Determine if this is the first message in a new chat session - // (different chatId from last message target = new session context) - const prevTarget = this.store.lastMessageTarget; - const isNewChatSession = !prevTarget || prevTarget.chatId !== msg.chatId || prevTarget.channel !== msg.channel; - const sessionContext: SessionContextOptions | undefined = isNewChatSession ? { - agentId: this.store.agentId || undefined, - serverUrl: process.env.LETTA_BASE_URL || this.store.baseUrl || 'https://api.letta.com', - } : undefined; - - // Send message to agent with metadata envelope - const formattedText = msg.isBatch && msg.batchedMessages - ? formatGroupBatchEnvelope(msg.batchedMessages) - : formatMessageEnvelope(msg, {}, sessionContext); - const messageToSend = await buildMultimodalMessage(formattedText, msg); - try { - await withTimeout(session.send(messageToSend), 'Session send'); - } catch (sendError) { - // Check for 409 CONFLICT from orphaned approval_request_message - if (!retried && isApprovalConflictError(sendError) && this.store.agentId && this.store.conversationId) { - console.log('[Bot] CONFLICT detected - attempting orphaned approval recovery...'); - session.close(); - const result = await recoverOrphanedConversationApproval( - this.store.agentId, - this.store.conversationId - ); - if (result.recovered) { - console.log(`[Bot] Recovery succeeded (${result.details}), retrying message...`); - return this.processMessage(msg, adapter, /* retried */ true); - } - console.error(`[Bot] Orphaned approval recovery failed: ${result.details}`); - } - console.error('[Bot] Error sending message:', sendError); - throw sendError; - } - - // Stream response + // Stream response with delivery let response = ''; let lastUpdate = Date.now(); let messageId: string | null = null; let lastMsgType: string | null = null; let lastAssistantUuid: string | null = null; let sentAnyMessage = false; - let receivedAnyData = false; // Track if we got ANY stream data + let receivedAnyData = false; const msgTypeCounts: Record = {}; - // Helper to finalize and send current accumulated response const finalizeMessage = async () => { - // Check for silent marker - agent chose not to reply if (response.trim() === '') { console.log('[Bot] Agent chose not to reply (no-reply marker)'); sentAnyMessage = true; @@ -540,87 +571,63 @@ export class LettaBot implements AgentSession { await adapter.sendMessage({ chatId: msg.chatId, text: response, threadId: msg.threadId }); } sentAnyMessage = true; - const preview = response.length > 50 ? response.slice(0, 50) + '...' : response; - console.log(`[Bot] Sent: "${preview}"`); } catch { - // Edit failures (e.g. "message not modified") are OK if we already sent the message if (messageId) sentAnyMessage = true; } } - // Reset for next message bubble response = ''; messageId = null; lastUpdate = Date.now(); }; - // Keep typing indicator alive const typingInterval = setInterval(() => { adapter.sendTypingIndicator(msg.chatId).catch(() => {}); }, 4000); - const seenToolCallIds = new Set(); try { - for await (const streamMsg of session.stream()) { - // Deduplicate tool_call chunks: the server streams tool_call_message - // events token-by-token as arguments are generated, so a single tool - // call produces many wire events with the same toolCallId. - // Only count/log the first chunk per unique toolCallId. - if (streamMsg.type === 'tool_call') { - const toolCallId = (streamMsg as any).toolCallId; - if (toolCallId && seenToolCallIds.has(toolCallId)) continue; - if (toolCallId) seenToolCallIds.add(toolCallId); - } - const msgUuid = (streamMsg as any).uuid; + for await (const streamMsg of run.stream()) { receivedAnyData = true; msgTypeCounts[streamMsg.type] = (msgTypeCounts[streamMsg.type] || 0) + 1; - // Always log every stream message type for debugging approval issues const preview = JSON.stringify(streamMsg).slice(0, 300); console.log(`[Stream] type=${streamMsg.type} ${preview}`); - // When message type changes, finalize the current message - // This ensures different message types appear as separate bubbles + // Finalize on type change if (lastMsgType && lastMsgType !== streamMsg.type && response.trim()) { await finalizeMessage(); } - // Detect tool-call loops: abort if agent calls too many tools without producing a result + // Tool loop detection const maxToolCalls = this.config.maxToolCalls ?? 100; if (streamMsg.type === 'tool_call' && (msgTypeCounts['tool_call'] || 0) >= maxToolCalls) { - console.error(`[Bot] Agent stuck in tool loop (${msgTypeCounts['tool_call']} tool calls, limit=${maxToolCalls}), aborting`); + console.error(`[Bot] Agent stuck in tool loop (${msgTypeCounts['tool_call']} calls), aborting`); session.abort().catch(() => {}); response = '(Agent got stuck in a tool loop and was stopped. Try sending your message again.)'; break; } - // Log meaningful events (always, not just on type change for tools) + // Log meaningful events if (streamMsg.type === 'tool_call') { - const toolName = (streamMsg as any).toolName || 'unknown'; - console.log(`[Bot] Calling tool: ${toolName}`); + console.log(`[Bot] Calling tool: ${streamMsg.toolName || 'unknown'}`); } else if (streamMsg.type === 'tool_result') { - const isError = (streamMsg as any).isError; - const contentLen = (streamMsg as any).content?.length || 0; - console.log(`[Bot] Tool completed: error=${isError}, resultLen=${contentLen}`); + console.log(`[Bot] Tool completed: error=${streamMsg.isError}, resultLen=${(streamMsg as any).content?.length || 0}`); } else if (streamMsg.type === 'assistant' && lastMsgType !== 'assistant') { console.log(`[Bot] Generating response...`); } else if (streamMsg.type === 'reasoning' && lastMsgType !== 'reasoning') { console.log(`[Bot] Reasoning...`); - } else if (streamMsg.type === 'init' && lastMsgType !== 'init') { - console.log(`[Bot] Session initialized`); } lastMsgType = streamMsg.type; if (streamMsg.type === 'assistant') { - // Check if this is a new assistant message (different UUID) + const msgUuid = streamMsg.uuid; if (msgUuid && lastAssistantUuid && msgUuid !== lastAssistantUuid && response.trim()) { await finalizeMessage(); } lastAssistantUuid = msgUuid || lastAssistantUuid; - response += streamMsg.content; + response += streamMsg.content || ''; - // Stream updates only for channels that support editing (Telegram, Slack) - // Hold back streaming edits while response could still become + // Live-edit streaming for channels that support it const canEdit = adapter.supportsEditing?.() ?? true; const mayBeNoReply = ''.startsWith(response.trim()); if (canEdit && !mayBeNoReply && Date.now() - lastUpdate > 500 && response.length > 0) { @@ -633,8 +640,6 @@ export class LettaBot implements AgentSession { sentAnyMessage = true; } } catch (editErr) { - // Log but don't fail - streaming edits are best-effort - // (e.g. rate limits, MarkdownV2 formatting issues mid-stream) console.warn('[Bot] Streaming edit failed:', editErr instanceof Error ? editErr.message : editErr); } lastUpdate = Date.now(); @@ -642,24 +647,19 @@ export class LettaBot implements AgentSession { } if (streamMsg.type === 'result') { - // Log result details for debugging (#132) - const resultMsg = streamMsg as { result?: string; success?: boolean; error?: string }; - console.log(`[Bot] Stream result: success=${resultMsg.success}, hasResponse=${response.trim().length > 0}, resultLen=${resultMsg.result?.length || 0}`); + console.log(`[Bot] Stream result: success=${streamMsg.success}, hasResponse=${response.trim().length > 0}`); console.log(`[Bot] Stream message counts:`, msgTypeCounts); - if (resultMsg.error) { - console.error(`[Bot] Result error: ${resultMsg.error}`); + if (streamMsg.error) { + console.error(`[Bot] Result error: ${streamMsg.error}`); } - // Check for potential stuck state (empty result usually means pending approval or error) - if (resultMsg.success && resultMsg.result === '' && !response.trim()) { + // Empty result recovery + if (streamMsg.success && streamMsg.result === '' && !response.trim()) { console.error('[Bot] Warning: Agent returned empty result with no response.'); - console.error('[Bot] Agent ID:', this.store.agentId); - console.error('[Bot] Conversation ID:', this.store.conversationId); - - // Attempt conversation-level recovery and retry once if (!retried && this.store.agentId && this.store.conversationId) { console.log('[Bot] Empty result - attempting orphaned approval recovery...'); session.close(); + session = null; clearInterval(typingInterval); const convResult = await recoverOrphanedConversationApproval( this.store.agentId, @@ -667,52 +667,28 @@ export class LettaBot implements AgentSession { ); if (convResult.recovered) { console.log(`[Bot] Recovery succeeded (${convResult.details}), retrying message...`); - return this.processMessage(msg, adapter, /* retried */ true); + return this.processMessage(msg, adapter, true); } console.warn(`[Bot] No orphaned approvals found: ${convResult.details}`); } } - // Save agent ID and conversation ID - if (session.agentId && session.agentId !== this.store.agentId) { - const isNewAgent = !this.store.agentId; - // Save agent ID along with the current server URL - const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com'; - this.store.setAgent(session.agentId, currentBaseUrl, session.conversationId || undefined); - console.log('Saved agent ID:', session.agentId, 'conversation ID:', session.conversationId, 'on server:', currentBaseUrl); - - // Setup new agents: set name, install skills - if (isNewAgent) { - if (this.config.agentName && session.agentId) { - updateAgentName(session.agentId, this.config.agentName).catch(() => {}); - } - if (session.agentId) { - installSkillsToAgent(session.agentId, this.config.skills); - } - } - } else if (session.conversationId && session.conversationId !== this.store.conversationId) { - // Update conversation ID if it changed - this.store.conversationId = session.conversationId; - } break; } - } } finally { clearInterval(typingInterval); } - // Check for silent marker - agent chose not to reply + // Handle no-reply marker if (response.trim() === '') { - console.log('[Bot] Agent chose not to reply (no-reply marker)'); sentAnyMessage = true; response = ''; } - // Detect unsupported multimodal: images were sent but server replaced them - const sentImages = Array.isArray(messageToSend); - if (sentImages && response.includes('[Image omitted]')) { - console.warn('[Bot] Model does not support images — server replaced inline images with "[Image omitted]". Consider using a vision-capable model or setting features.inlineImages: false in config.'); + // Detect unsupported multimodal + if (Array.isArray(messageToSend) && response.includes('[Image omitted]')) { + console.warn('[Bot] Model does not support images -- consider a vision-capable model or features.inlineImages: false'); } // Send final response @@ -724,18 +700,12 @@ export class LettaBot implements AgentSession { await adapter.sendMessage({ chatId: msg.chatId, text: response, threadId: msg.threadId }); } sentAnyMessage = true; - // Reset recovery counter on successful response this.store.resetRecoveryAttempts(); - const preview = response.length > 50 ? response.slice(0, 50) + '...' : response; - console.log(`[Bot] Sent: "${preview}"`); - } catch (sendError) { - console.error('[Bot] Error sending response:', sendError); - // If edit failed (messageId exists), send the complete response as a new message - // so the user isn't left with a truncated streaming edit + } catch { + // Edit failed -- send as new message so user isn't left with truncated text try { await adapter.sendMessage({ chatId: msg.chatId, text: response, threadId: msg.threadId }); sentAnyMessage = true; - // Reset recovery counter on successful response this.store.resetRecoveryAttempts(); } catch (retryError) { console.error('[Bot] Retry send also failed:', retryError); @@ -743,30 +713,20 @@ export class LettaBot implements AgentSession { } } - // Only show "no response" if we never sent anything + // Handle no response if (!sentAnyMessage) { if (!receivedAnyData) { - // Stream timed out with NO data at all - likely stuck state console.error('[Bot] Stream received NO DATA - possible stuck state'); - console.error('[Bot] Agent:', this.store.agentId); - console.error('[Bot] Conversation:', this.store.conversationId); - console.error('[Bot] This can happen when a previous session disconnected mid-tool-approval'); await adapter.sendMessage({ chatId: msg.chatId, text: '(Session interrupted. Try: lettabot reset-conversation)', threadId: msg.threadId }); } else { - console.warn('[Bot] Stream received data but no assistant message'); - console.warn('[Bot] Message types received:', msgTypeCounts); - // If the stream had tool activity, the agent was working and likely - // sent messages via tools (e.g. lettabot-message send). Don't alarm the user. const hadToolActivity = (msgTypeCounts['tool_call'] || 0) > 0 || (msgTypeCounts['tool_result'] || 0) > 0; if (hadToolActivity) { console.log('[Bot] Agent had tool activity but no assistant message - likely sent via tool'); } else { - console.warn('[Bot] Agent:', this.store.agentId); - console.warn('[Bot] Conversation:', this.store.conversationId); const convIdShort = this.store.conversationId?.slice(0, 8) || 'none'; await adapter.sendMessage({ chatId: msg.chatId, @@ -789,147 +749,50 @@ export class LettaBot implements AgentSession { console.error('[Bot] Failed to send error message to channel:', sendError); } } finally { - session!?.close(); + session?.close(); } } + + // ========================================================================= + // sendToAgent - Background triggers (heartbeats, cron, webhooks) + // ========================================================================= - /** - * Send a message to the agent (for cron jobs, webhooks, etc.) - * - * In silent mode (heartbeats, cron), the agent's text response is NOT auto-delivered. - * The agent must use `lettabot-message` CLI via Bash to send messages explicitly. - * - * @param text - The prompt/message to send - * @param context - Optional trigger context (for logging/tracking) - * @returns The agent's response text - */ async sendToAgent( text: string, _context?: TriggerContext ): Promise { - // Wait for any in-progress message processing to complete - // This prevents 409 conflicts when heartbeats overlap with user messages + // Serialize with message queue to prevent 409 conflicts while (this.processing) { - console.log('[Bot] Waiting for message queue to finish before sendToAgent...'); await new Promise(resolve => setTimeout(resolve, 1000)); } - // Mark as processing to prevent queue from starting this.processing = true; - console.log('[Bot] sendToAgent acquired processing lock'); try { - return await this._sendToAgentInternal(text, _context); + const { session, stream } = await this.runSession(text); + + try { + let response = ''; + for await (const msg of stream()) { + if (msg.type === 'assistant') { + response += msg.content || ''; + } + if (msg.type === 'result') break; + } + return response; + } finally { + session.close(); + } } finally { this.processing = false; - console.log('[Bot] sendToAgent released processing lock'); - // Trigger queue processing in case messages arrived while we were busy this.processQueue(); } } + + // ========================================================================= + // Channel delivery + status + // ========================================================================= - private async _sendToAgentInternal( - text: string, - _context?: TriggerContext, - retried = false - ): Promise { - // Base options for sessions (systemPrompt/memory set via createAgent for new agents) - const baseOptions = { - permissionMode: 'bypassPermissions' as const, - allowedTools: this.config.allowedTools, - cwd: this.config.workingDir, - // bypassPermissions mode auto-allows all tools, no canUseTool callback needed - // But add logging callback to diagnose approval flow issues (#132) - canUseTool: (toolName: string, _toolInput: Record) => { - console.log(`[Bot] Tool approval requested in sendToAgent: ${toolName}`); - console.log(`[Bot] WARNING: canUseTool callback should NOT be called when permissionMode=bypassPermissions`); - return { behavior: 'allow' as const }; - }, - }; - - let session: Session; - let usedDefaultConversation = false; - let usedSpecificConversation = false; - if (this.store.conversationId) { - // Resume the specific conversation we've been using - usedSpecificConversation = true; - session = resumeSession(this.store.conversationId, baseOptions); - } else if (this.store.agentId) { - // Agent exists but no conversation - try default conversation - usedDefaultConversation = true; - session = resumeSession(this.store.agentId, baseOptions); - } else { - // Create new agent with default conversation - const newAgentId = await createAgent({ - systemPrompt: SYSTEM_PROMPT, - memory: loadMemoryBlocks(this.config.agentName), - }); - session = createSession(newAgentId, baseOptions); - } - - try { - try { - await session.send(text); - } catch (error) { - // Check for 409 CONFLICT from orphaned approval_request_message - if (!retried && isApprovalConflictError(error) && this.store.agentId && this.store.conversationId) { - console.log('[Bot] CONFLICT in sendToAgent - attempting orphaned approval recovery...'); - session.close(); - const result = await recoverOrphanedConversationApproval( - this.store.agentId, - this.store.conversationId - ); - if (result.recovered) { - console.log(`[Bot] Recovery succeeded (${result.details}), retrying sendToAgent...`); - return this._sendToAgentInternal(text, _context, /* retried */ true); - } - console.error(`[Bot] Orphaned approval recovery failed: ${result.details}`); - throw error; - } - if (usedSpecificConversation && this.store.agentId) { - console.warn('[Bot] Conversation missing, creating a new conversation...'); - session.close(); - session = createSession(this.store.agentId, baseOptions); - await session.send(text); - usedSpecificConversation = false; - usedDefaultConversation = false; - } else if (usedDefaultConversation && this.store.agentId) { - console.warn('[Bot] Default conversation missing, creating a new conversation...'); - session.close(); - session = createSession(this.store.agentId, baseOptions); - await session.send(text); - usedDefaultConversation = false; - } else { - throw error; - } - } - - let response = ''; - for await (const msg of session.stream()) { - if (msg.type === 'assistant') { - response += msg.content; - } - - if (msg.type === 'result') { - if (session.agentId && session.agentId !== this.store.agentId) { - const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com'; - this.store.setAgent(session.agentId, currentBaseUrl, session.conversationId || undefined); - } else if (session.conversationId && session.conversationId !== this.store.conversationId) { - this.store.conversationId = session.conversationId; - } - break; - } - } - - return response; - } finally { - session.close(); - } - } - - /** - * Deliver a message or file to a specific channel - */ async deliverToChannel( channelId: string, chatId: string, @@ -941,27 +804,22 @@ export class LettaBot implements AgentSession { ): Promise { const adapter = this.channels.get(channelId); if (!adapter) { - console.error(`Channel not found: ${channelId}`); throw new Error(`Channel not found: ${channelId}`); } - // Send file if provided if (options.filePath) { if (typeof adapter.sendFile !== 'function') { throw new Error(`Channel ${channelId} does not support file sending`); } - const result = await adapter.sendFile({ chatId, filePath: options.filePath, - caption: options.text, // text becomes caption for files + caption: options.text, kind: options.kind, }); - return result.messageId; } - // Send text message if (options.text) { const result = await adapter.sendMessage({ chatId, text: options.text }); return result.messageId; @@ -970,9 +828,6 @@ export class LettaBot implements AgentSession { throw new Error('Either text or filePath must be provided'); } - /** - * Get bot status - */ getStatus(): { agentId: string | null; channels: string[] } { return { agentId: this.store.agentId, @@ -980,32 +835,20 @@ export class LettaBot implements AgentSession { }; } - /** - * Set agent ID (for container deploys that discover existing agents) - */ setAgentId(agentId: string): void { this.store.agentId = agentId; console.log(`[Bot] Agent ID set to: ${agentId}`); } - /** - * Reset agent (clear memory) - */ reset(): void { this.store.reset(); console.log('Agent reset'); } - /** - * Get the last message target (for heartbeat delivery) - */ getLastMessageTarget(): { channel: string; chatId: string } | null { return this.store.lastMessageTarget || null; } - /** - * Get the time of the last user message (for heartbeat skip logic) - */ getLastUserMessageTime(): Date | null { return this.lastUserMessageTime; }