feat: /cancel slash command to abort active agent runs (#422)
This commit is contained in:
@@ -254,7 +254,7 @@ Ask the bot owner to approve with:
|
||||
return;
|
||||
}
|
||||
if (this.onCommand) {
|
||||
if (command === 'status' || command === 'reset' || command === 'heartbeat' || command === 'model') {
|
||||
if (command === 'status' || command === 'reset' || command === 'heartbeat' || command === 'cancel' || command === 'model') {
|
||||
const result = await this.onCommand(command, message.channel.id, cmdArgs);
|
||||
if (result) {
|
||||
await message.channel.send(result);
|
||||
|
||||
@@ -259,6 +259,13 @@ export class TelegramAdapter implements ChannelAdapter {
|
||||
}
|
||||
});
|
||||
|
||||
this.bot.command('cancel', async (ctx) => {
|
||||
if (this.onCommand) {
|
||||
const result = await this.onCommand('cancel', String(ctx.chat.id));
|
||||
if (result) await ctx.reply(result);
|
||||
}
|
||||
});
|
||||
|
||||
// Handle /model [handle]
|
||||
this.bot.command('model', async (ctx) => {
|
||||
if (this.onCommand) {
|
||||
|
||||
@@ -13,7 +13,7 @@ import type { ChannelAdapter } from '../channels/types.js';
|
||||
import type { BotConfig, InboundMessage, TriggerContext } from './types.js';
|
||||
import type { AgentSession } from './interfaces.js';
|
||||
import { Store } from './store.js';
|
||||
import { updateAgentName, getPendingApprovals, rejectApproval, cancelRuns, recoverOrphanedConversationApproval, getLatestRunError, getAgentModel, updateAgentModel } from '../tools/letta-api.js';
|
||||
import { updateAgentName, getPendingApprovals, rejectApproval, cancelRuns, cancelConversation, recoverOrphanedConversationApproval, getLatestRunError, getAgentModel, updateAgentModel } from '../tools/letta-api.js';
|
||||
import { installSkillsToAgent, withAgentSkillsOnPath, getAgentSkillExecutableDirs, isVoiceMemoConfigured } from '../skills/loader.js';
|
||||
import { formatMessageEnvelope, formatGroupBatchEnvelope, type SessionContextOptions } from './formatter.js';
|
||||
import type { GroupBatcher } from './group-batcher.js';
|
||||
@@ -292,6 +292,7 @@ export class LettaBot implements AgentSession {
|
||||
private listeningGroupIds: Set<string> = new Set();
|
||||
private processing = false; // Global lock for shared mode
|
||||
private processingKeys: Set<string> = new Set(); // Per-key locks for per-channel mode
|
||||
private cancelledKeys: Set<string> = new Set(); // Tracks keys where /cancel was issued
|
||||
|
||||
// AskUserQuestion support: resolves when the next user message arrives.
|
||||
// In per-chat mode, keyed by convKey so each chat resolves independently.
|
||||
@@ -1470,6 +1471,38 @@ export class LettaBot implements AgentSession {
|
||||
return `Conversation reset for ${scope}. Other conversations are unaffected. (Agent memory is preserved.)`;
|
||||
}
|
||||
}
|
||||
case 'cancel': {
|
||||
const convKey = channelId ? this.resolveConversationKey(channelId, chatId) : 'shared';
|
||||
|
||||
// Check if there's actually an active run for this conversation key
|
||||
if (!this.processingKeys.has(convKey) && !this.processing) {
|
||||
return '(Nothing to cancel -- no active run.)';
|
||||
}
|
||||
|
||||
// Signal the stream loop to break
|
||||
this.cancelledKeys.add(convKey);
|
||||
|
||||
// Abort client-side stream
|
||||
const session = this.sessions.get(convKey);
|
||||
if (session) {
|
||||
session.abort().catch(() => {});
|
||||
log.info(`/cancel - aborted session stream (key=${convKey})`);
|
||||
}
|
||||
|
||||
// Cancel server-side run (conversation-scoped)
|
||||
const convId = convKey === 'shared'
|
||||
? this.store.conversationId
|
||||
: this.store.getConversationId(convKey);
|
||||
if (convId) {
|
||||
const ok = await cancelConversation(convId);
|
||||
if (!ok) {
|
||||
return '(Run cancelled locally, but server-side cancellation failed.)';
|
||||
}
|
||||
}
|
||||
|
||||
log.info(`/cancel - run cancelled (key=${convKey})`);
|
||||
return '(Run cancelled.)';
|
||||
}
|
||||
case 'model': {
|
||||
const agentId = this.store.agentId;
|
||||
if (!agentId) return 'No agent configured.';
|
||||
@@ -1871,6 +1904,11 @@ export class LettaBot implements AgentSession {
|
||||
try {
|
||||
let firstChunkLogged = false;
|
||||
for await (const streamMsg of run.stream()) {
|
||||
// Check for /cancel before processing each chunk
|
||||
if (this.cancelledKeys.has(convKey)) {
|
||||
log.info(`Stream cancelled by /cancel (key=${convKey})`);
|
||||
break;
|
||||
}
|
||||
if (!firstChunkLogged) { lap('first stream chunk'); firstChunkLogged = true; }
|
||||
receivedAnyData = true;
|
||||
msgTypeCounts[streamMsg.type] = (msgTypeCounts[streamMsg.type] || 0) + 1;
|
||||
@@ -2000,7 +2038,7 @@ export class LettaBot implements AgentSession {
|
||||
|| (trimmed.startsWith('<actions') && !trimmed.includes('</actions>'));
|
||||
// Strip any completed <actions> block from the streaming text
|
||||
const streamText = stripActionsBlock(response).trim();
|
||||
if (canEdit && !mayBeHidden && !suppressDelivery && streamText.length > 0 && Date.now() - lastUpdate > 500) {
|
||||
if (canEdit && !mayBeHidden && !suppressDelivery && !this.cancelledKeys.has(convKey) && streamText.length > 0 && Date.now() - lastUpdate > 500) {
|
||||
try {
|
||||
const prefixedStream = this.prefixResponse(streamText);
|
||||
if (messageId) {
|
||||
@@ -2018,6 +2056,19 @@ export class LettaBot implements AgentSession {
|
||||
}
|
||||
|
||||
if (streamMsg.type === 'result') {
|
||||
// Discard cancelled run results -- the server flushes accumulated
|
||||
// content from a previously cancelled run as the result for the
|
||||
// next message. Discard it and retry so the message gets processed.
|
||||
if (streamMsg.stopReason === 'cancelled') {
|
||||
log.info(`Discarding cancelled run result (len=${typeof streamMsg.result === 'string' ? streamMsg.result.length : 0})`);
|
||||
this.invalidateSession(convKey);
|
||||
session = null;
|
||||
if (!retried) {
|
||||
return this.processMessage(msg, adapter, true);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
const resultText = typeof streamMsg.result === 'string' ? streamMsg.result : '';
|
||||
if (resultText.trim().length > 0) {
|
||||
response = resultText;
|
||||
@@ -2158,6 +2209,17 @@ export class LettaBot implements AgentSession {
|
||||
}
|
||||
lap('stream complete');
|
||||
|
||||
// If cancelled, clean up partial state and return early
|
||||
if (this.cancelledKeys.has(convKey)) {
|
||||
if (messageId) {
|
||||
try {
|
||||
await adapter.editMessage(msg.chatId, messageId, '(Run cancelled.)');
|
||||
} catch { /* best effort */ }
|
||||
}
|
||||
log.info(`Skipping post-stream delivery -- cancelled (key=${convKey})`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Parse and execute XML directives (e.g. <actions><react emoji="eyes" /></actions>)
|
||||
await parseAndHandleDirectives();
|
||||
|
||||
@@ -2239,6 +2301,7 @@ export class LettaBot implements AgentSession {
|
||||
}
|
||||
} finally {
|
||||
// Session stays alive for reuse -- only invalidated on errors
|
||||
this.cancelledKeys.delete(this.resolveConversationKey(msg.channel, msg.chatId));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -26,6 +26,10 @@ describe('parseCommand', () => {
|
||||
it('returns { command, args } for /model', () => {
|
||||
expect(parseCommand('/model')).toEqual({ command: 'model', args: '' });
|
||||
});
|
||||
|
||||
it('returns { command, args } for /cancel', () => {
|
||||
expect(parseCommand('/cancel')).toEqual({ command: 'cancel', args: '' });
|
||||
});
|
||||
});
|
||||
|
||||
describe('invalid input', () => {
|
||||
@@ -93,7 +97,7 @@ describe('COMMANDS', () => {
|
||||
});
|
||||
|
||||
it('has exactly 6 commands', () => {
|
||||
expect(COMMANDS).toHaveLength(6);
|
||||
expect(COMMANDS).toHaveLength(7);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
* Shared command parsing and help text for all channels.
|
||||
*/
|
||||
|
||||
export const COMMANDS = ['status', 'heartbeat', 'reset', 'help', 'start', 'model'] as const;
|
||||
export const COMMANDS = ['status', 'heartbeat', 'reset', 'cancel', 'help', 'start', 'model'] as const;
|
||||
export type Command = typeof COMMANDS[number];
|
||||
|
||||
export interface ParsedCommand {
|
||||
@@ -18,6 +18,7 @@ Commands:
|
||||
/status - Show current status
|
||||
/heartbeat - Trigger heartbeat
|
||||
/reset - Reset conversation (keeps agent memory)
|
||||
/cancel - Abort the current agent run
|
||||
/model - Show current model and list available models
|
||||
/model <handle> - Switch to a different model
|
||||
/help - Show this message
|
||||
|
||||
@@ -464,6 +464,30 @@ export async function cancelRuns(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel active runs for a specific conversation.
|
||||
* Scoped to a single conversation -- won't affect other channels/conversations.
|
||||
*/
|
||||
export async function cancelConversation(
|
||||
conversationId: string
|
||||
): Promise<boolean> {
|
||||
try {
|
||||
const client = getClient();
|
||||
await client.conversations.cancel(conversationId);
|
||||
log.info(`Cancelled runs for conversation ${conversationId}`);
|
||||
return true;
|
||||
} catch (e) {
|
||||
// 409 "No active runs to cancel" is expected when cancel fires before run starts
|
||||
const err = e as { status?: number };
|
||||
if (err?.status === 409) {
|
||||
log.info(`No active runs to cancel for conversation ${conversationId} (409)`);
|
||||
return true;
|
||||
}
|
||||
log.error(`Failed to cancel conversation ${conversationId}:`, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch the error detail from the latest failed run on an agent.
|
||||
* Returns the actual error detail from run metadata (which is more
|
||||
|
||||
Reference in New Issue
Block a user