From f04e56d16517d85a226651da329a6c2488da38b3 Mon Sep 17 00:00:00 2001 From: Cameron Date: Sun, 15 Mar 2026 14:37:14 -0700 Subject: [PATCH 1/3] fix: kill session subprocess on /cancel to break stuck streams (#600) --- src/core/bot.ts | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/core/bot.ts b/src/core/bot.ts index 86024e5..8556448 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -825,12 +825,18 @@ export class LettaBot implements AgentSession { // Signal the stream loop to break this.cancelledKeys.add(convKey); - // Abort client-side stream + // Abort client-side stream and kill the session subprocess. + // abort() sends an interrupt control_request, but the CLI may not + // handle it if blocked on a long-running tool (e.g., Task subagent). + // invalidateSession() calls session.close() which kills the subprocess, + // closes the transport pump, and resolves all stream waiters with null + // -- guaranteeing the for-await loop in processMessage breaks. const session = this.sessionManager.getSession(convKey); if (session) { session.abort().catch(() => {}); log.info(`/cancel - aborted session stream (key=${convKey})`); } + this.sessionManager.invalidateSession(convKey); // Cancel server-side run (conversation-scoped) const convId = convKey === 'shared' @@ -1750,8 +1756,12 @@ export class LettaBot implements AgentSession { } lap('stream complete'); - // If cancelled, clean up partial state and return early + // If cancelled, clean up partial state and return early. + // Invalidate defensively in case the cancel handler's invalidation + // didn't fire (e.g., race with command dispatch). if (this.cancelledKeys.has(convKey)) { + this.sessionManager.invalidateSession(convKey); + session = null; if (messageId) { try { await adapter.editMessage(msg.chatId, messageId, '(Run cancelled.)'); From c50ce657d8116847e3879aedd5580ed8875859b6 Mon Sep 17 00:00:00 2001 From: Jason Carreira Date: Mon, 16 Mar 2026 01:22:43 -0400 Subject: [PATCH 2/3] fix(logging): set correct trigger type for cron/heartbeat turns MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Set the correct trigger type ('cron' and 'heartbeat') for cron/heartbeat turns instead of defaulting to 'message', and remove the spurious channel property from heartbeat turns. Written by Cameron ◯ Letta Code "The measure of intelligence is the ability to change." -- Albert Einstein --- src/cron/heartbeat.ts | 3 --- src/cron/service.ts | 7 +++++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/cron/heartbeat.ts b/src/cron/heartbeat.ts index a8307f7..82c004e 100644 --- a/src/cron/heartbeat.ts +++ b/src/cron/heartbeat.ts @@ -278,12 +278,9 @@ export class HeartbeatService { }); // Build trigger context for silent mode - const lastTarget = this.bot.getLastMessageTarget(); const triggerContext: TriggerContext = { type: 'heartbeat', outputMode: 'silent', - sourceChannel: lastTarget?.channel, - sourceChatId: lastTarget?.chatId, }; try { diff --git a/src/cron/service.ts b/src/cron/service.ts index 22a6216..b36455a 100644 --- a/src/cron/service.ts +++ b/src/cron/service.ts @@ -8,6 +8,7 @@ import { existsSync, readFileSync, writeFileSync, appendFileSync, mkdirSync, copyFileSync, renameSync, watch, type FSWatcher } from 'node:fs'; import { resolve, dirname, basename } from 'node:path'; import type { AgentSession } from '../core/interfaces.js'; +import type { TriggerContext } from '../core/types.js'; import type { CronJob, CronJobCreate, CronSchedule, CronConfig, HeartbeatConfig } from './types.js'; import { DEFAULT_HEARTBEAT_MESSAGES } from './types.js'; import { getCronDataDir, getCronLogPath, getCronStorePath, getLegacyCronStorePath } from '../utils/paths.js'; @@ -317,7 +318,8 @@ export class CronService { try { // SILENT MODE - response NOT auto-delivered // Agent must use `lettabot-message` CLI to send messages - const response = await this.bot.sendToAgent(config.message); + const triggerContext: TriggerContext = { type: 'heartbeat', outputMode: 'silent' }; + const response = await this.bot.sendToAgent(config.message, triggerContext); log.info(`Heartbeat finished (SILENT MODE)`); log.info(` - Response: ${response?.slice(0, 100)}${(response?.length || 0) > 100 ? '...' : ''}`); @@ -446,7 +448,8 @@ export class CronService { ].join('\n'); // Send message to agent - const response = await this.bot.sendToAgent(messageWithMetadata); + const triggerContext: TriggerContext = { type: 'cron', outputMode: 'silent' }; + const response = await this.bot.sendToAgent(messageWithMetadata, triggerContext); // Resolve delivery target: explicit config > last message target fallback > silent let deliverTarget: { channel: string; chatId: string } | null = job.deliver ?? null; From 695c5bc66577ae5170b850f559ed2c0422f40c4d Mon Sep 17 00:00:00 2001 From: Cameron Date: Sun, 15 Mar 2026 22:45:15 -0700 Subject: [PATCH 3/3] fix: deduplicate approval requests across runs (#601) (#602) --- src/tools/letta-api.ts | 124 ++++++++++++++++++++++------------------- 1 file changed, 66 insertions(+), 58 deletions(-) diff --git a/src/tools/letta-api.ts b/src/tools/letta-api.ts index 9a572f6..6efd5c4 100644 --- a/src/tools/letta-api.ts +++ b/src/tools/letta-api.ts @@ -90,8 +90,13 @@ export async function recoverPendingApprovalsForAgent( }; } + // Deduplicate by tool_call_id defensively (getPendingApprovals should + // already dedup, but this guards against any upstream regression). + const rejectedIds = new Set(); let rejectedCount = 0; for (const approval of pending) { + if (rejectedIds.has(approval.toolCallId)) continue; + rejectedIds.add(approval.toolCallId); const ok = await rejectApproval(agentId, { toolCallId: approval.toolCallId, reason, @@ -433,70 +438,73 @@ export async function getPendingApprovals( stop_reason: 'requires_approval', limit: 10, }); - - const pendingApprovals: PendingApproval[] = []; - + + // Collect qualifying run IDs (avoid re-fetching messages per run) + const qualifyingRunIds: string[] = []; for await (const run of runsPage) { if (run.status === 'running' || run.stop_reason === 'requires_approval') { - // Get recent messages to find approval_request_message - const messagesPage = await client.agents.messages.list(agentId, { - conversation_id: conversationId, - limit: 100, - }); - - const messages: Array<{ message_type?: string }> = []; - for await (const msg of messagesPage) { - messages.push(msg as { message_type?: string }); - } - - const resolvedToolCalls = new Set(); - for (const msg of messages) { - if ('message_type' in msg && msg.message_type === 'approval_response_message') { - const approvalMsg = msg as { - approvals?: Array<{ tool_call_id?: string | null }>; - }; - const approvals = approvalMsg.approvals || []; - for (const approval of approvals) { - if (approval.tool_call_id) { - resolvedToolCalls.add(approval.tool_call_id); - } - } - } - } - - const seenToolCalls = new Set(); - for (const msg of messages) { - // Check for approval_request_message type - if ('message_type' in msg && msg.message_type === 'approval_request_message') { - const approvalMsg = msg as { - id: string; - tool_calls?: Array<{ tool_call_id: string; name: string }>; - tool_call?: { tool_call_id: string; name: string }; - run_id?: string; - }; - - // Extract tool call info - const toolCalls = approvalMsg.tool_calls || (approvalMsg.tool_call ? [approvalMsg.tool_call] : []); - for (const tc of toolCalls) { - if (resolvedToolCalls.has(tc.tool_call_id)) { - continue; - } - if (seenToolCalls.has(tc.tool_call_id)) { - continue; - } - seenToolCalls.add(tc.tool_call_id); - pendingApprovals.push({ - runId: approvalMsg.run_id || run.id, - toolCallId: tc.tool_call_id, - toolName: tc.name, - messageId: approvalMsg.id, - }); - } + qualifyingRunIds.push(run.id); + } + } + + if (qualifyingRunIds.length === 0) { + return []; + } + + // Fetch messages ONCE and scan for resolved + pending approvals + const messagesPage = await client.agents.messages.list(agentId, { + conversation_id: conversationId, + limit: 100, + }); + + const messages: Array<{ message_type?: string }> = []; + for await (const msg of messagesPage) { + messages.push(msg as { message_type?: string }); + } + + // Build set of already-resolved tool_call_ids + const resolvedToolCalls = new Set(); + for (const msg of messages) { + if ('message_type' in msg && msg.message_type === 'approval_response_message') { + const approvalMsg = msg as { + approvals?: Array<{ tool_call_id?: string | null }>; + }; + const approvals = approvalMsg.approvals || []; + for (const approval of approvals) { + if (approval.tool_call_id) { + resolvedToolCalls.add(approval.tool_call_id); } } } } - + + // Collect unresolved approval requests, deduplicating across all runs + const pendingApprovals: PendingApproval[] = []; + const seenToolCalls = new Set(); + for (const msg of messages) { + if ('message_type' in msg && msg.message_type === 'approval_request_message') { + const approvalMsg = msg as { + id: string; + tool_calls?: Array<{ tool_call_id: string; name: string }>; + tool_call?: { tool_call_id: string; name: string }; + run_id?: string; + }; + + const toolCalls = approvalMsg.tool_calls || (approvalMsg.tool_call ? [approvalMsg.tool_call] : []); + for (const tc of toolCalls) { + if (resolvedToolCalls.has(tc.tool_call_id)) continue; + if (seenToolCalls.has(tc.tool_call_id)) continue; + seenToolCalls.add(tc.tool_call_id); + pendingApprovals.push({ + runId: approvalMsg.run_id || qualifyingRunIds[0], + toolCallId: tc.tool_call_id, + toolName: tc.name, + messageId: approvalMsg.id, + }); + } + } + } + return pendingApprovals; } catch (e) { log.error('Failed to get pending approvals:', e);