merge: incorporate upstream fixes (cancel invalidation, approval dedup, cron logging)

This commit is contained in:
Ani Tunturi
2026-03-16 15:01:27 -04:00
committed by Ani
4 changed files with 83 additions and 65 deletions

View File

@@ -830,12 +830,18 @@ export class LettaBot implements AgentSession {
// Signal the stream loop to break
this.cancelledKeys.add(convKey);
// Abort client-side stream
// Abort client-side stream and kill the session subprocess.
// abort() sends an interrupt control_request, but the CLI may not
// handle it if blocked on a long-running tool (e.g., Task subagent).
// invalidateSession() calls session.close() which kills the subprocess,
// closes the transport pump, and resolves all stream waiters with null
// -- guaranteeing the for-await loop in processMessage breaks.
const session = this.sessionManager.getSession(convKey);
if (session) {
session.abort().catch(() => {});
log.info(`/cancel - aborted session stream (key=${convKey})`);
}
this.sessionManager.invalidateSession(convKey);
// Cancel server-side run (conversation-scoped)
const convId = convKey === 'shared'
@@ -1815,8 +1821,12 @@ export class LettaBot implements AgentSession {
eyesAdded = false;
}
// If cancelled, clean up partial state and return early
// If cancelled, clean up partial state and return early.
// Invalidate defensively in case the cancel handler's invalidation
// didn't fire (e.g., race with command dispatch).
if (this.cancelledKeys.has(convKey)) {
this.sessionManager.invalidateSession(convKey);
session = null;
if (messageId) {
try {
await adapter.editMessage(msg.chatId, messageId, '(Run cancelled.)');

View File

@@ -278,12 +278,9 @@ export class HeartbeatService {
});
// Build trigger context for silent mode
const lastTarget = this.bot.getLastMessageTarget();
const triggerContext: TriggerContext = {
type: 'heartbeat',
outputMode: 'silent',
sourceChannel: lastTarget?.channel,
sourceChatId: lastTarget?.chatId,
};
try {

View File

@@ -8,6 +8,7 @@
import { existsSync, readFileSync, writeFileSync, appendFileSync, mkdirSync, copyFileSync, renameSync, watch, type FSWatcher } from 'node:fs';
import { resolve, dirname, basename } from 'node:path';
import type { AgentSession } from '../core/interfaces.js';
import type { TriggerContext } from '../core/types.js';
import type { CronJob, CronJobCreate, CronSchedule, CronConfig, HeartbeatConfig } from './types.js';
import { DEFAULT_HEARTBEAT_MESSAGES } from './types.js';
import { getCronDataDir, getCronLogPath, getCronStorePath, getLegacyCronStorePath } from '../utils/paths.js';
@@ -317,7 +318,8 @@ export class CronService {
try {
// SILENT MODE - response NOT auto-delivered
// Agent must use `lettabot-message` CLI to send messages
const response = await this.bot.sendToAgent(config.message);
const triggerContext: TriggerContext = { type: 'heartbeat', outputMode: 'silent' };
const response = await this.bot.sendToAgent(config.message, triggerContext);
log.info(`Heartbeat finished (SILENT MODE)`);
log.info(` - Response: ${response?.slice(0, 100)}${(response?.length || 0) > 100 ? '...' : ''}`);
@@ -446,7 +448,8 @@ export class CronService {
].join('\n');
// Send message to agent
const response = await this.bot.sendToAgent(messageWithMetadata);
const triggerContext: TriggerContext = { type: 'cron', outputMode: 'silent' };
const response = await this.bot.sendToAgent(messageWithMetadata, triggerContext);
// Resolve delivery target: explicit config > last message target fallback > silent
let deliverTarget: { channel: string; chatId: string } | null = job.deliver ?? null;

View File

@@ -90,8 +90,13 @@ export async function recoverPendingApprovalsForAgent(
};
}
// Deduplicate by tool_call_id defensively (getPendingApprovals should
// already dedup, but this guards against any upstream regression).
const rejectedIds = new Set<string>();
let rejectedCount = 0;
for (const approval of pending) {
if (rejectedIds.has(approval.toolCallId)) continue;
rejectedIds.add(approval.toolCallId);
const ok = await rejectApproval(agentId, {
toolCallId: approval.toolCallId,
reason,
@@ -433,70 +438,73 @@ export async function getPendingApprovals(
stop_reason: 'requires_approval',
limit: 10,
});
const pendingApprovals: PendingApproval[] = [];
// Collect qualifying run IDs (avoid re-fetching messages per run)
const qualifyingRunIds: string[] = [];
for await (const run of runsPage) {
if (run.status === 'running' || run.stop_reason === 'requires_approval') {
// Get recent messages to find approval_request_message
const messagesPage = await client.agents.messages.list(agentId, {
conversation_id: conversationId,
limit: 100,
});
const messages: Array<{ message_type?: string }> = [];
for await (const msg of messagesPage) {
messages.push(msg as { message_type?: string });
}
const resolvedToolCalls = new Set<string>();
for (const msg of messages) {
if ('message_type' in msg && msg.message_type === 'approval_response_message') {
const approvalMsg = msg as {
approvals?: Array<{ tool_call_id?: string | null }>;
};
const approvals = approvalMsg.approvals || [];
for (const approval of approvals) {
if (approval.tool_call_id) {
resolvedToolCalls.add(approval.tool_call_id);
}
}
}
}
const seenToolCalls = new Set<string>();
for (const msg of messages) {
// Check for approval_request_message type
if ('message_type' in msg && msg.message_type === 'approval_request_message') {
const approvalMsg = msg as {
id: string;
tool_calls?: Array<{ tool_call_id: string; name: string }>;
tool_call?: { tool_call_id: string; name: string };
run_id?: string;
};
// Extract tool call info
const toolCalls = approvalMsg.tool_calls || (approvalMsg.tool_call ? [approvalMsg.tool_call] : []);
for (const tc of toolCalls) {
if (resolvedToolCalls.has(tc.tool_call_id)) {
continue;
}
if (seenToolCalls.has(tc.tool_call_id)) {
continue;
}
seenToolCalls.add(tc.tool_call_id);
pendingApprovals.push({
runId: approvalMsg.run_id || run.id,
toolCallId: tc.tool_call_id,
toolName: tc.name,
messageId: approvalMsg.id,
});
}
qualifyingRunIds.push(run.id);
}
}
if (qualifyingRunIds.length === 0) {
return [];
}
// Fetch messages ONCE and scan for resolved + pending approvals
const messagesPage = await client.agents.messages.list(agentId, {
conversation_id: conversationId,
limit: 100,
});
const messages: Array<{ message_type?: string }> = [];
for await (const msg of messagesPage) {
messages.push(msg as { message_type?: string });
}
// Build set of already-resolved tool_call_ids
const resolvedToolCalls = new Set<string>();
for (const msg of messages) {
if ('message_type' in msg && msg.message_type === 'approval_response_message') {
const approvalMsg = msg as {
approvals?: Array<{ tool_call_id?: string | null }>;
};
const approvals = approvalMsg.approvals || [];
for (const approval of approvals) {
if (approval.tool_call_id) {
resolvedToolCalls.add(approval.tool_call_id);
}
}
}
}
// Collect unresolved approval requests, deduplicating across all runs
const pendingApprovals: PendingApproval[] = [];
const seenToolCalls = new Set<string>();
for (const msg of messages) {
if ('message_type' in msg && msg.message_type === 'approval_request_message') {
const approvalMsg = msg as {
id: string;
tool_calls?: Array<{ tool_call_id: string; name: string }>;
tool_call?: { tool_call_id: string; name: string };
run_id?: string;
};
const toolCalls = approvalMsg.tool_calls || (approvalMsg.tool_call ? [approvalMsg.tool_call] : []);
for (const tc of toolCalls) {
if (resolvedToolCalls.has(tc.tool_call_id)) continue;
if (seenToolCalls.has(tc.tool_call_id)) continue;
seenToolCalls.add(tc.tool_call_id);
pendingApprovals.push({
runId: approvalMsg.run_id || qualifyingRunIds[0],
toolCallId: tc.tool_call_id,
toolName: tc.name,
messageId: approvalMsg.id,
});
}
}
}
return pendingApprovals;
} catch (e) {
log.error('Failed to get pending approvals:', e);