diff --git a/src/channels/discord.ts b/src/channels/discord.ts index a926342..10bb4d5 100644 --- a/src/channels/discord.ts +++ b/src/channels/discord.ts @@ -254,7 +254,7 @@ Ask the bot owner to approve with: return; } if (this.onCommand) { - if (command === 'status' || command === 'reset' || command === 'heartbeat' || command === 'model') { + if (command === 'status' || command === 'reset' || command === 'heartbeat' || command === 'cancel' || command === 'model') { const result = await this.onCommand(command, message.channel.id, cmdArgs); if (result) { await message.channel.send(result); diff --git a/src/channels/telegram.ts b/src/channels/telegram.ts index 609ce37..c9fae73 100644 --- a/src/channels/telegram.ts +++ b/src/channels/telegram.ts @@ -259,6 +259,13 @@ export class TelegramAdapter implements ChannelAdapter { } }); + this.bot.command('cancel', async (ctx) => { + if (this.onCommand) { + const result = await this.onCommand('cancel', String(ctx.chat.id)); + if (result) await ctx.reply(result); + } + }); + // Handle /model [handle] this.bot.command('model', async (ctx) => { if (this.onCommand) { diff --git a/src/core/bot.ts b/src/core/bot.ts index 0687cce..f32ac20 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -13,7 +13,7 @@ import type { ChannelAdapter } from '../channels/types.js'; import type { BotConfig, InboundMessage, TriggerContext } from './types.js'; import type { AgentSession } from './interfaces.js'; import { Store } from './store.js'; -import { updateAgentName, getPendingApprovals, rejectApproval, cancelRuns, recoverOrphanedConversationApproval, getLatestRunError, getAgentModel, updateAgentModel } from '../tools/letta-api.js'; +import { updateAgentName, getPendingApprovals, rejectApproval, cancelRuns, cancelConversation, recoverOrphanedConversationApproval, getLatestRunError, getAgentModel, updateAgentModel } from '../tools/letta-api.js'; import { installSkillsToAgent, withAgentSkillsOnPath, getAgentSkillExecutableDirs, isVoiceMemoConfigured } from '../skills/loader.js'; import { formatMessageEnvelope, formatGroupBatchEnvelope, type SessionContextOptions } from './formatter.js'; import type { GroupBatcher } from './group-batcher.js'; @@ -292,6 +292,7 @@ export class LettaBot implements AgentSession { private listeningGroupIds: Set = new Set(); private processing = false; // Global lock for shared mode private processingKeys: Set = new Set(); // Per-key locks for per-channel mode + private cancelledKeys: Set = new Set(); // Tracks keys where /cancel was issued // AskUserQuestion support: resolves when the next user message arrives. // In per-chat mode, keyed by convKey so each chat resolves independently. @@ -1470,6 +1471,38 @@ export class LettaBot implements AgentSession { return `Conversation reset for ${scope}. Other conversations are unaffected. (Agent memory is preserved.)`; } } + case 'cancel': { + const convKey = channelId ? this.resolveConversationKey(channelId, chatId) : 'shared'; + + // Check if there's actually an active run for this conversation key + if (!this.processingKeys.has(convKey) && !this.processing) { + return '(Nothing to cancel -- no active run.)'; + } + + // Signal the stream loop to break + this.cancelledKeys.add(convKey); + + // Abort client-side stream + const session = this.sessions.get(convKey); + if (session) { + session.abort().catch(() => {}); + log.info(`/cancel - aborted session stream (key=${convKey})`); + } + + // Cancel server-side run (conversation-scoped) + const convId = convKey === 'shared' + ? this.store.conversationId + : this.store.getConversationId(convKey); + if (convId) { + const ok = await cancelConversation(convId); + if (!ok) { + return '(Run cancelled locally, but server-side cancellation failed.)'; + } + } + + log.info(`/cancel - run cancelled (key=${convKey})`); + return '(Run cancelled.)'; + } case 'model': { const agentId = this.store.agentId; if (!agentId) return 'No agent configured.'; @@ -1871,6 +1904,11 @@ export class LettaBot implements AgentSession { try { let firstChunkLogged = false; for await (const streamMsg of run.stream()) { + // Check for /cancel before processing each chunk + if (this.cancelledKeys.has(convKey)) { + log.info(`Stream cancelled by /cancel (key=${convKey})`); + break; + } if (!firstChunkLogged) { lap('first stream chunk'); firstChunkLogged = true; } receivedAnyData = true; msgTypeCounts[streamMsg.type] = (msgTypeCounts[streamMsg.type] || 0) + 1; @@ -2000,7 +2038,7 @@ export class LettaBot implements AgentSession { || (trimmed.startsWith('')); // Strip any completed block from the streaming text const streamText = stripActionsBlock(response).trim(); - if (canEdit && !mayBeHidden && !suppressDelivery && streamText.length > 0 && Date.now() - lastUpdate > 500) { + if (canEdit && !mayBeHidden && !suppressDelivery && !this.cancelledKeys.has(convKey) && streamText.length > 0 && Date.now() - lastUpdate > 500) { try { const prefixedStream = this.prefixResponse(streamText); if (messageId) { @@ -2018,6 +2056,19 @@ export class LettaBot implements AgentSession { } if (streamMsg.type === 'result') { + // Discard cancelled run results -- the server flushes accumulated + // content from a previously cancelled run as the result for the + // next message. Discard it and retry so the message gets processed. + if (streamMsg.stopReason === 'cancelled') { + log.info(`Discarding cancelled run result (len=${typeof streamMsg.result === 'string' ? streamMsg.result.length : 0})`); + this.invalidateSession(convKey); + session = null; + if (!retried) { + return this.processMessage(msg, adapter, true); + } + break; + } + const resultText = typeof streamMsg.result === 'string' ? streamMsg.result : ''; if (resultText.trim().length > 0) { response = resultText; @@ -2158,6 +2209,17 @@ export class LettaBot implements AgentSession { } lap('stream complete'); + // If cancelled, clean up partial state and return early + if (this.cancelledKeys.has(convKey)) { + if (messageId) { + try { + await adapter.editMessage(msg.chatId, messageId, '(Run cancelled.)'); + } catch { /* best effort */ } + } + log.info(`Skipping post-stream delivery -- cancelled (key=${convKey})`); + return; + } + // Parse and execute XML directives (e.g. ) await parseAndHandleDirectives(); @@ -2239,6 +2301,7 @@ export class LettaBot implements AgentSession { } } finally { // Session stays alive for reuse -- only invalidated on errors + this.cancelledKeys.delete(this.resolveConversationKey(msg.channel, msg.chatId)); } } diff --git a/src/core/commands.test.ts b/src/core/commands.test.ts index 1fc11ae..71f35f0 100644 --- a/src/core/commands.test.ts +++ b/src/core/commands.test.ts @@ -26,6 +26,10 @@ describe('parseCommand', () => { it('returns { command, args } for /model', () => { expect(parseCommand('/model')).toEqual({ command: 'model', args: '' }); }); + + it('returns { command, args } for /cancel', () => { + expect(parseCommand('/cancel')).toEqual({ command: 'cancel', args: '' }); + }); }); describe('invalid input', () => { @@ -93,7 +97,7 @@ describe('COMMANDS', () => { }); it('has exactly 6 commands', () => { - expect(COMMANDS).toHaveLength(6); + expect(COMMANDS).toHaveLength(7); }); }); diff --git a/src/core/commands.ts b/src/core/commands.ts index 8701da5..94e5f98 100644 --- a/src/core/commands.ts +++ b/src/core/commands.ts @@ -4,7 +4,7 @@ * Shared command parsing and help text for all channels. */ -export const COMMANDS = ['status', 'heartbeat', 'reset', 'help', 'start', 'model'] as const; +export const COMMANDS = ['status', 'heartbeat', 'reset', 'cancel', 'help', 'start', 'model'] as const; export type Command = typeof COMMANDS[number]; export interface ParsedCommand { @@ -18,6 +18,7 @@ Commands: /status - Show current status /heartbeat - Trigger heartbeat /reset - Reset conversation (keeps agent memory) +/cancel - Abort the current agent run /model - Show current model and list available models /model - Switch to a different model /help - Show this message diff --git a/src/tools/letta-api.ts b/src/tools/letta-api.ts index 0cd91a2..3f63098 100644 --- a/src/tools/letta-api.ts +++ b/src/tools/letta-api.ts @@ -464,6 +464,30 @@ export async function cancelRuns( } } +/** + * Cancel active runs for a specific conversation. + * Scoped to a single conversation -- won't affect other channels/conversations. + */ +export async function cancelConversation( + conversationId: string +): Promise { + try { + const client = getClient(); + await client.conversations.cancel(conversationId); + log.info(`Cancelled runs for conversation ${conversationId}`); + return true; + } catch (e) { + // 409 "No active runs to cancel" is expected when cancel fires before run starts + const err = e as { status?: number }; + if (err?.status === 409) { + log.info(`No active runs to cancel for conversation ${conversationId} (409)`); + return true; + } + log.error(`Failed to cancel conversation ${conversationId}:`, e); + return false; + } +} + /** * Fetch the error detail from the latest failed run on an agent. * Returns the actual error detail from run metadata (which is more