diff --git a/src/core/bot.ts b/src/core/bot.ts index 4f77d04..b74b6cd 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -830,12 +830,18 @@ export class LettaBot implements AgentSession { // Signal the stream loop to break this.cancelledKeys.add(convKey); - // Abort client-side stream + // Abort client-side stream and kill the session subprocess. + // abort() sends an interrupt control_request, but the CLI may not + // handle it if blocked on a long-running tool (e.g., Task subagent). + // invalidateSession() calls session.close() which kills the subprocess, + // closes the transport pump, and resolves all stream waiters with null + // -- guaranteeing the for-await loop in processMessage breaks. const session = this.sessionManager.getSession(convKey); if (session) { session.abort().catch(() => {}); log.info(`/cancel - aborted session stream (key=${convKey})`); } + this.sessionManager.invalidateSession(convKey); // Cancel server-side run (conversation-scoped) const convId = convKey === 'shared' @@ -1815,8 +1821,12 @@ export class LettaBot implements AgentSession { eyesAdded = false; } - // If cancelled, clean up partial state and return early + // If cancelled, clean up partial state and return early. + // Invalidate defensively in case the cancel handler's invalidation + // didn't fire (e.g., race with command dispatch). if (this.cancelledKeys.has(convKey)) { + this.sessionManager.invalidateSession(convKey); + session = null; if (messageId) { try { await adapter.editMessage(msg.chatId, messageId, '(Run cancelled.)'); diff --git a/src/cron/heartbeat.ts b/src/cron/heartbeat.ts index 148cba8..5c8c965 100644 --- a/src/cron/heartbeat.ts +++ b/src/cron/heartbeat.ts @@ -278,12 +278,9 @@ export class HeartbeatService { }); // Build trigger context for silent mode - const lastTarget = this.bot.getLastMessageTarget(); const triggerContext: TriggerContext = { type: 'heartbeat', outputMode: 'silent', - sourceChannel: lastTarget?.channel, - sourceChatId: lastTarget?.chatId, }; try { diff --git a/src/cron/service.ts b/src/cron/service.ts index 22a6216..b36455a 100644 --- a/src/cron/service.ts +++ b/src/cron/service.ts @@ -8,6 +8,7 @@ import { existsSync, readFileSync, writeFileSync, appendFileSync, mkdirSync, copyFileSync, renameSync, watch, type FSWatcher } from 'node:fs'; import { resolve, dirname, basename } from 'node:path'; import type { AgentSession } from '../core/interfaces.js'; +import type { TriggerContext } from '../core/types.js'; import type { CronJob, CronJobCreate, CronSchedule, CronConfig, HeartbeatConfig } from './types.js'; import { DEFAULT_HEARTBEAT_MESSAGES } from './types.js'; import { getCronDataDir, getCronLogPath, getCronStorePath, getLegacyCronStorePath } from '../utils/paths.js'; @@ -317,7 +318,8 @@ export class CronService { try { // SILENT MODE - response NOT auto-delivered // Agent must use `lettabot-message` CLI to send messages - const response = await this.bot.sendToAgent(config.message); + const triggerContext: TriggerContext = { type: 'heartbeat', outputMode: 'silent' }; + const response = await this.bot.sendToAgent(config.message, triggerContext); log.info(`Heartbeat finished (SILENT MODE)`); log.info(` - Response: ${response?.slice(0, 100)}${(response?.length || 0) > 100 ? '...' : ''}`); @@ -446,7 +448,8 @@ export class CronService { ].join('\n'); // Send message to agent - const response = await this.bot.sendToAgent(messageWithMetadata); + const triggerContext: TriggerContext = { type: 'cron', outputMode: 'silent' }; + const response = await this.bot.sendToAgent(messageWithMetadata, triggerContext); // Resolve delivery target: explicit config > last message target fallback > silent let deliverTarget: { channel: string; chatId: string } | null = job.deliver ?? null; diff --git a/src/tools/letta-api.ts b/src/tools/letta-api.ts index b0d847d..c86315b 100644 --- a/src/tools/letta-api.ts +++ b/src/tools/letta-api.ts @@ -90,8 +90,13 @@ export async function recoverPendingApprovalsForAgent( }; } + // Deduplicate by tool_call_id defensively (getPendingApprovals should + // already dedup, but this guards against any upstream regression). + const rejectedIds = new Set(); let rejectedCount = 0; for (const approval of pending) { + if (rejectedIds.has(approval.toolCallId)) continue; + rejectedIds.add(approval.toolCallId); const ok = await rejectApproval(agentId, { toolCallId: approval.toolCallId, reason, @@ -433,70 +438,73 @@ export async function getPendingApprovals( stop_reason: 'requires_approval', limit: 10, }); - - const pendingApprovals: PendingApproval[] = []; - + + // Collect qualifying run IDs (avoid re-fetching messages per run) + const qualifyingRunIds: string[] = []; for await (const run of runsPage) { if (run.status === 'running' || run.stop_reason === 'requires_approval') { - // Get recent messages to find approval_request_message - const messagesPage = await client.agents.messages.list(agentId, { - conversation_id: conversationId, - limit: 100, - }); - - const messages: Array<{ message_type?: string }> = []; - for await (const msg of messagesPage) { - messages.push(msg as { message_type?: string }); - } - - const resolvedToolCalls = new Set(); - for (const msg of messages) { - if ('message_type' in msg && msg.message_type === 'approval_response_message') { - const approvalMsg = msg as { - approvals?: Array<{ tool_call_id?: string | null }>; - }; - const approvals = approvalMsg.approvals || []; - for (const approval of approvals) { - if (approval.tool_call_id) { - resolvedToolCalls.add(approval.tool_call_id); - } - } - } - } - - const seenToolCalls = new Set(); - for (const msg of messages) { - // Check for approval_request_message type - if ('message_type' in msg && msg.message_type === 'approval_request_message') { - const approvalMsg = msg as { - id: string; - tool_calls?: Array<{ tool_call_id: string; name: string }>; - tool_call?: { tool_call_id: string; name: string }; - run_id?: string; - }; - - // Extract tool call info - const toolCalls = approvalMsg.tool_calls || (approvalMsg.tool_call ? [approvalMsg.tool_call] : []); - for (const tc of toolCalls) { - if (resolvedToolCalls.has(tc.tool_call_id)) { - continue; - } - if (seenToolCalls.has(tc.tool_call_id)) { - continue; - } - seenToolCalls.add(tc.tool_call_id); - pendingApprovals.push({ - runId: approvalMsg.run_id || run.id, - toolCallId: tc.tool_call_id, - toolName: tc.name, - messageId: approvalMsg.id, - }); - } + qualifyingRunIds.push(run.id); + } + } + + if (qualifyingRunIds.length === 0) { + return []; + } + + // Fetch messages ONCE and scan for resolved + pending approvals + const messagesPage = await client.agents.messages.list(agentId, { + conversation_id: conversationId, + limit: 100, + }); + + const messages: Array<{ message_type?: string }> = []; + for await (const msg of messagesPage) { + messages.push(msg as { message_type?: string }); + } + + // Build set of already-resolved tool_call_ids + const resolvedToolCalls = new Set(); + for (const msg of messages) { + if ('message_type' in msg && msg.message_type === 'approval_response_message') { + const approvalMsg = msg as { + approvals?: Array<{ tool_call_id?: string | null }>; + }; + const approvals = approvalMsg.approvals || []; + for (const approval of approvals) { + if (approval.tool_call_id) { + resolvedToolCalls.add(approval.tool_call_id); } } } } - + + // Collect unresolved approval requests, deduplicating across all runs + const pendingApprovals: PendingApproval[] = []; + const seenToolCalls = new Set(); + for (const msg of messages) { + if ('message_type' in msg && msg.message_type === 'approval_request_message') { + const approvalMsg = msg as { + id: string; + tool_calls?: Array<{ tool_call_id: string; name: string }>; + tool_call?: { tool_call_id: string; name: string }; + run_id?: string; + }; + + const toolCalls = approvalMsg.tool_calls || (approvalMsg.tool_call ? [approvalMsg.tool_call] : []); + for (const tc of toolCalls) { + if (resolvedToolCalls.has(tc.tool_call_id)) continue; + if (seenToolCalls.has(tc.tool_call_id)) continue; + seenToolCalls.add(tc.tool_call_id); + pendingApprovals.push({ + runId: approvalMsg.run_id || qualifyingRunIds[0], + toolCallId: tc.tool_call_id, + toolName: tc.name, + messageId: approvalMsg.id, + }); + } + } + } + return pendingApprovals; } catch (e) { log.error('Failed to get pending approvals:', e);