refactor(matrix): unify !commands with upstream /commands
Matrix !commands that duplicated upstream bot logic (!new, !status, !showreasoning, !restore) are removed. !reset, !cancel, !status, !model, and bare !heartbeat now delegate to the upstream /reset, /cancel, /status, /model, /heartbeat handlers via onCommand callback. This fixes !new (now !reset) which only invalidated the session without clearing the conversation from the store, causing stuck processes to reconnect to poisoned conversations. Matrix-specific commands (!pause, !resume, !ignorebot-*, !turns, !timeout, !heartbeat on/off) remain local since they manage per-room state with no upstream equivalent. Authored-by: Ani Tunturi <ani@wiuf.net>
This commit is contained in:
@@ -80,9 +80,6 @@ export class MatrixAdapter implements ChannelAdapter {
|
|||||||
onHeartbeatStop?: () => void;
|
onHeartbeatStop?: () => void;
|
||||||
onHeartbeatStart?: () => void;
|
onHeartbeatStart?: () => void;
|
||||||
onTimeoutHeartbeat?: () => void;
|
onTimeoutHeartbeat?: () => void;
|
||||||
getAgentId?: () => string | undefined;
|
|
||||||
// Invalidate the session for a conversation key — used by !new to force a fresh conversation
|
|
||||||
onInvalidateSession?: (key?: string) => void;
|
|
||||||
|
|
||||||
constructor(config: MatrixAdapterConfig) {
|
constructor(config: MatrixAdapterConfig) {
|
||||||
if (!config.homeserverUrl) throw new Error("homeserverUrl is required");
|
if (!config.homeserverUrl) throw new Error("homeserverUrl is required");
|
||||||
@@ -145,8 +142,7 @@ export class MatrixAdapter implements ChannelAdapter {
|
|||||||
onHeartbeatStart: () => { this._heartbeatEnabled = true; this.onHeartbeatStart?.(); },
|
onHeartbeatStart: () => { this._heartbeatEnabled = true; this.onHeartbeatStart?.(); },
|
||||||
isHeartbeatEnabled: () => this._heartbeatEnabled,
|
isHeartbeatEnabled: () => this._heartbeatEnabled,
|
||||||
onTimeoutHeartbeat: () => this.onTimeoutHeartbeat?.(),
|
onTimeoutHeartbeat: () => this.onTimeoutHeartbeat?.(),
|
||||||
getAgentId: () => this.getAgentId?.(),
|
onCommand: (cmd, chatId, args) => this.onCommand?.(cmd, chatId, args) ?? Promise.resolve(null),
|
||||||
onInvalidateSession: (key: string) => this.onInvalidateSession?.(key),
|
|
||||||
});
|
});
|
||||||
|
|
||||||
await this.initClient();
|
await this.initClient();
|
||||||
|
|||||||
@@ -1,14 +1,22 @@
|
|||||||
/**
|
/**
|
||||||
* Matrix Bot Command Processor
|
* Matrix Bot Command Processor
|
||||||
*
|
*
|
||||||
* Handles !commands sent by allowed users in Matrix rooms:
|
* Handles !commands sent by allowed users in Matrix rooms.
|
||||||
|
*
|
||||||
|
* Matrix-specific commands (per-room state, bot-loop prevention):
|
||||||
* !commands — list all available commands
|
* !commands — list all available commands
|
||||||
* !pause — silence bot in current room (SQLite persisted)
|
* !pause — silence bot in current room (SQLite persisted)
|
||||||
* !resume — re-enable bot in current room
|
* !resume — re-enable bot in current room
|
||||||
* !status — show paused rooms, ignored bots, heartbeat state
|
|
||||||
* !ignorebot-add @u:s — add user to global ignore list (prevents bot loops)
|
* !ignorebot-add @u:s — add user to global ignore list (prevents bot loops)
|
||||||
* !ignorebot-remove @u:s — remove user from ignore list
|
* !ignorebot-remove @u:s — remove user from ignore list
|
||||||
* !heartbeat on/off — toggle the heartbeat cron (in-memory)
|
* !turns N — respond to bot messages for N turns
|
||||||
|
* !timeout — kill stuck heartbeat run
|
||||||
|
*
|
||||||
|
* Delegated to upstream bot commands (full store + session lifecycle):
|
||||||
|
* !reset — delegates to /reset (clear conversation + new session)
|
||||||
|
* !cancel — delegates to /cancel (abort active run)
|
||||||
|
* !status — delegates to /status (agent info + conversation keys)
|
||||||
|
* !heartbeat — delegates to /heartbeat (trigger) or toggle on/off locally
|
||||||
*
|
*
|
||||||
* Commands run AFTER access control (allowedUsers) but BEFORE the paused-room
|
* Commands run AFTER access control (allowedUsers) but BEFORE the paused-room
|
||||||
* check, so !resume always works even in a paused room.
|
* check, so !resume always works even in a paused room.
|
||||||
@@ -24,8 +32,8 @@ interface CommandCallbacks {
|
|||||||
onHeartbeatStart?: () => void;
|
onHeartbeatStart?: () => void;
|
||||||
isHeartbeatEnabled?: () => boolean;
|
isHeartbeatEnabled?: () => boolean;
|
||||||
onTimeoutHeartbeat?: () => void;
|
onTimeoutHeartbeat?: () => void;
|
||||||
getAgentId?: () => string | undefined;
|
/** Delegate to upstream bot /commands (reset, cancel, status, heartbeat, model) */
|
||||||
onInvalidateSession?: (key: string) => void;
|
onCommand?: (command: string, chatId?: string, args?: string) => Promise<string | null>;
|
||||||
}
|
}
|
||||||
|
|
||||||
export class MatrixCommandProcessor {
|
export class MatrixCommandProcessor {
|
||||||
@@ -48,37 +56,40 @@ export class MatrixCommandProcessor {
|
|||||||
body: string,
|
body: string,
|
||||||
roomId: string,
|
roomId: string,
|
||||||
sender: string,
|
sender: string,
|
||||||
roomMeta?: { isDm: boolean; roomName: string },
|
_roomMeta?: { isDm: boolean; roomName: string },
|
||||||
): Promise<string | undefined> {
|
): Promise<string | undefined> {
|
||||||
const parts = body.slice(1).trim().split(/\s+/);
|
const parts = body.slice(1).trim().split(/\s+/);
|
||||||
const cmd = parts[0]?.toLowerCase();
|
const cmd = parts[0]?.toLowerCase();
|
||||||
const args = parts.slice(1);
|
const args = parts.slice(1);
|
||||||
|
|
||||||
switch (cmd) {
|
switch (cmd) {
|
||||||
|
// Matrix-specific commands (per-room state)
|
||||||
case "commands":
|
case "commands":
|
||||||
return this.doCommands();
|
return this.doCommands();
|
||||||
case "pause":
|
case "pause":
|
||||||
return this.doPause(roomId, sender);
|
return this.doPause(roomId, sender);
|
||||||
case "resume":
|
case "resume":
|
||||||
return this.doResume(roomId);
|
return this.doResume(roomId);
|
||||||
case "status":
|
|
||||||
return this.doStatus(roomId);
|
|
||||||
case "ignorebot-add":
|
case "ignorebot-add":
|
||||||
return this.doBotAdd(args[0], sender);
|
return this.doBotAdd(args[0], sender);
|
||||||
case "ignorebot-remove":
|
case "ignorebot-remove":
|
||||||
return this.doBotRemove(args[0]);
|
return this.doBotRemove(args[0]);
|
||||||
case "heartbeat":
|
|
||||||
return this.doHeartbeat(args[0]);
|
|
||||||
case "restore":
|
|
||||||
return this.doRestore(args[0], roomId, roomMeta?.isDm ?? false, roomMeta?.roomName ?? roomId);
|
|
||||||
case "turns":
|
case "turns":
|
||||||
return this.doTurns(args[0], roomId);
|
return this.doTurns(args[0], roomId);
|
||||||
case "timeout":
|
case "timeout":
|
||||||
return this.doTimeout();
|
return this.doTimeout();
|
||||||
case "new":
|
|
||||||
return await this.doNew(roomId, roomMeta?.isDm ?? false, roomMeta?.roomName ?? roomId);
|
// Heartbeat: on/off toggles locally, bare !heartbeat delegates to /heartbeat (trigger)
|
||||||
case "showreasoning":
|
case "heartbeat":
|
||||||
return this.doShowReasoning();
|
return this.doHeartbeat(args[0], roomId);
|
||||||
|
|
||||||
|
// Delegate to upstream /commands
|
||||||
|
case "reset":
|
||||||
|
case "cancel":
|
||||||
|
case "status":
|
||||||
|
case "model":
|
||||||
|
return await this.delegateToBot(cmd, roomId, args.join(' ') || undefined);
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
@@ -120,29 +131,44 @@ export class MatrixCommandProcessor {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ─── Command implementations ─────────────────────────────────────────────
|
// ─── Delegate to upstream bot commands ──────────────────────────────────
|
||||||
|
|
||||||
|
private async delegateToBot(
|
||||||
|
command: string,
|
||||||
|
roomId: string,
|
||||||
|
args?: string,
|
||||||
|
): Promise<string> {
|
||||||
|
if (!this.callbacks.onCommand) {
|
||||||
|
return `⚠️ !${command} not available (bot command handler not wired)`;
|
||||||
|
}
|
||||||
|
const result = await this.callbacks.onCommand(command, roomId, args);
|
||||||
|
return result ?? `(No response from /${command})`;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── Matrix-specific command implementations ────────────────────────────
|
||||||
|
|
||||||
private doCommands(): string {
|
private doCommands(): string {
|
||||||
const lines = [
|
const lines = [
|
||||||
"📜 **Available Commands**",
|
"📜 **Available Commands**",
|
||||||
"",
|
"",
|
||||||
"**Bot Control**",
|
"**Room Control**",
|
||||||
" `!pause` — Silence bot in current room",
|
" `!pause` — Silence bot in current room",
|
||||||
" `!resume` — Re-enable bot in current room",
|
" `!resume` — Re-enable bot in current room",
|
||||||
" `!status` — Show bot status, paused rooms, heartbeat state",
|
" `!status` — Show agent status and conversation info",
|
||||||
"",
|
"",
|
||||||
"**Bot Loop Prevention**",
|
"**Bot Loop Prevention**",
|
||||||
" `!ignorebot-add @user:server` — Add bot to ignore list",
|
" `!ignorebot-add @user:server` — Add bot to ignore list",
|
||||||
" `!ignorebot-remove @user:server` — Remove from ignore list",
|
" `!ignorebot-remove @user:server` — Remove from ignore list",
|
||||||
" `!turns N` (1-50) — Respond to bot messages for N turns",
|
" `!turns N` (1-50) — Respond to bot messages for N turns",
|
||||||
"",
|
"",
|
||||||
"**Conversation Management**",
|
"**Conversation**",
|
||||||
" `!new` — Create fresh Letta conversation for this room",
|
" `!reset` — Reset conversation for this room (fresh start)",
|
||||||
" `!restore conv-xxxx` — Point room at specific conversation",
|
" `!cancel` — Cancel active run",
|
||||||
" `!showreasoning` — Show current reasoning display status",
|
" `!model [handle]` — View or change LLM model",
|
||||||
"",
|
"",
|
||||||
"**Heartbeat Control**",
|
"**Heartbeat**",
|
||||||
" `!heartbeat on/off` — Toggle heartbeat cron",
|
" `!heartbeat on/off` — Toggle heartbeat cron",
|
||||||
|
" `!heartbeat` — Trigger heartbeat now",
|
||||||
" `!timeout` — Kill stuck heartbeat run",
|
" `!timeout` — Kill stuck heartbeat run",
|
||||||
];
|
];
|
||||||
return lines.join("\n");
|
return lines.join("\n");
|
||||||
@@ -158,28 +184,6 @@ export class MatrixCommandProcessor {
|
|||||||
return "▶️ Bot resumed in this room.";
|
return "▶️ Bot resumed in this room.";
|
||||||
}
|
}
|
||||||
|
|
||||||
private doStatus(roomId: string): string {
|
|
||||||
const paused = this.storage.getPausedRooms();
|
|
||||||
const ignored = this.storage.getIgnoredBots();
|
|
||||||
const hbState = this.callbacks.isHeartbeatEnabled?.() ? "on" : "off";
|
|
||||||
const thisRoomPaused = this.storage.isRoomPaused(roomId);
|
|
||||||
|
|
||||||
const turnsRemaining = this.botTurns.get(roomId);
|
|
||||||
const lines = [
|
|
||||||
"📊 **Bot Status**",
|
|
||||||
`This room: ${thisRoomPaused ? "⏸️ paused" : "▶️ active"}`,
|
|
||||||
`Conversation key: \`matrix:${roomId}\``,
|
|
||||||
turnsRemaining ? `Bot turns: ${turnsRemaining} remaining` : "Bot turns: off (observer mode in multi-bot rooms)",
|
|
||||||
paused.length > 0 ? `Paused rooms: ${paused.length}` : "No rooms paused",
|
|
||||||
ignored.length > 0
|
|
||||||
? `Known bots:\n${ignored.map((u) => ` • ${u}`).join("\n")}`
|
|
||||||
: "No known bots",
|
|
||||||
`Heartbeat: ${hbState}`,
|
|
||||||
];
|
|
||||||
|
|
||||||
return lines.join("\n");
|
|
||||||
}
|
|
||||||
|
|
||||||
private doBotAdd(userId: string | undefined, sender: string): string {
|
private doBotAdd(userId: string | undefined, sender: string): string {
|
||||||
if (!userId?.startsWith("@")) {
|
if (!userId?.startsWith("@")) {
|
||||||
return "⚠️ Usage: !ignorebot-add @user:server";
|
return "⚠️ Usage: !ignorebot-add @user:server";
|
||||||
@@ -196,8 +200,9 @@ export class MatrixCommandProcessor {
|
|||||||
return `✅ Removed ${userId} from ignore list`;
|
return `✅ Removed ${userId} from ignore list`;
|
||||||
}
|
}
|
||||||
|
|
||||||
private doHeartbeat(arg: string | undefined): string {
|
private async doHeartbeat(arg: string | undefined, roomId: string): Promise<string> {
|
||||||
const normalized = arg?.toLowerCase();
|
const normalized = arg?.toLowerCase();
|
||||||
|
// !heartbeat on/off — local toggle
|
||||||
if (normalized === "off" || normalized === "stop") {
|
if (normalized === "off" || normalized === "stop") {
|
||||||
this.callbacks.onHeartbeatStop?.();
|
this.callbacks.onHeartbeatStop?.();
|
||||||
return "⏸️ Heartbeat cron stopped";
|
return "⏸️ Heartbeat cron stopped";
|
||||||
@@ -206,7 +211,8 @@ export class MatrixCommandProcessor {
|
|||||||
this.callbacks.onHeartbeatStart?.();
|
this.callbacks.onHeartbeatStart?.();
|
||||||
return "▶️ Heartbeat cron started";
|
return "▶️ Heartbeat cron started";
|
||||||
}
|
}
|
||||||
return "⚠️ Usage: !heartbeat on | !heartbeat off";
|
// Bare !heartbeat — delegate to /heartbeat (trigger)
|
||||||
|
return await this.delegateToBot('heartbeat', roomId);
|
||||||
}
|
}
|
||||||
|
|
||||||
private doTurns(arg: string | undefined, roomId: string): string {
|
private doTurns(arg: string | undefined, roomId: string): string {
|
||||||
@@ -220,15 +226,6 @@ export class MatrixCommandProcessor {
|
|||||||
return `🔄 Will respond to bot messages for the next ${n} turns in this room`;
|
return `🔄 Will respond to bot messages for the next ${n} turns in this room`;
|
||||||
}
|
}
|
||||||
|
|
||||||
private doRestore(
|
|
||||||
_convId: string | undefined,
|
|
||||||
_roomId: string,
|
|
||||||
_isDm: boolean,
|
|
||||||
_roomName: string,
|
|
||||||
): string {
|
|
||||||
return "ℹ️ !restore is no longer needed — each room has its own persistent conversation via per-chat mode.\nUse !new to start a fresh conversation.";
|
|
||||||
}
|
|
||||||
|
|
||||||
private doTimeout(): string {
|
private doTimeout(): string {
|
||||||
if (this.callbacks.onTimeoutHeartbeat) {
|
if (this.callbacks.onTimeoutHeartbeat) {
|
||||||
this.callbacks.onTimeoutHeartbeat();
|
this.callbacks.onTimeoutHeartbeat();
|
||||||
@@ -236,38 +233,4 @@ export class MatrixCommandProcessor {
|
|||||||
}
|
}
|
||||||
return "⚠️ No heartbeat timeout handler registered";
|
return "⚠️ No heartbeat timeout handler registered";
|
||||||
}
|
}
|
||||||
|
|
||||||
private async doNew(
|
|
||||||
roomId: string,
|
|
||||||
isDm: boolean,
|
|
||||||
roomName: string,
|
|
||||||
): Promise<string> {
|
|
||||||
const agentId = this.callbacks.getAgentId?.();
|
|
||||||
if (!agentId) {
|
|
||||||
return "⚠️ No agent ID available";
|
|
||||||
}
|
|
||||||
if (!this.callbacks.onInvalidateSession) {
|
|
||||||
return "⚠️ Session reset not available (onInvalidateSession not wired)";
|
|
||||||
}
|
|
||||||
// In per-chat mode the conversation key is 'matrix:{roomId}'
|
|
||||||
const key = `matrix:${roomId}`;
|
|
||||||
this.callbacks.onInvalidateSession(key);
|
|
||||||
log.info(`!new: invalidated session for key ${key}`);
|
|
||||||
return `✓ Fresh conversation started for ${isDm ? "this DM" : roomName}. Next message will begin a new session.`;
|
|
||||||
}
|
|
||||||
|
|
||||||
private doShowReasoning(): string {
|
|
||||||
return [
|
|
||||||
"🧠 **Reasoning Text Display**",
|
|
||||||
"",
|
|
||||||
"Controls whether the agent's thinking/reasoning text is shown in chat.",
|
|
||||||
"The 🧠 emoji always appears when reasoning starts — this setting controls the text.",
|
|
||||||
"",
|
|
||||||
"**Configuration:** Set `display.showReasoning` in your `lettabot.yaml`.",
|
|
||||||
" - `true`: Show reasoning text in a collapsible block",
|
|
||||||
" - `false`: Hide reasoning text (only final response shown)",
|
|
||||||
"",
|
|
||||||
"Restart the bot after changing config.",
|
|
||||||
].join('\n');
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -41,7 +41,6 @@ export interface ChannelAdapter {
|
|||||||
// Event handlers (set by bot core)
|
// Event handlers (set by bot core)
|
||||||
onMessage?: (msg: InboundMessage) => Promise<void>;
|
onMessage?: (msg: InboundMessage) => Promise<void>;
|
||||||
onCommand?: (command: string, chatId?: string, args?: string, forcePerChat?: boolean) => Promise<string | null>;
|
onCommand?: (command: string, chatId?: string, args?: string, forcePerChat?: boolean) => Promise<string | null>;
|
||||||
onInvalidateSession?: (key?: string) => void;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -60,9 +60,6 @@ export interface AgentSession {
|
|||||||
|
|
||||||
/** Invalidate a cached session, forcing fresh session on next message */
|
/** Invalidate a cached session, forcing fresh session on next message */
|
||||||
invalidateSession(key?: string): void;
|
invalidateSession(key?: string): void;
|
||||||
|
|
||||||
/** Callback for session invalidation requests (e.g., from !new command) */
|
|
||||||
onInvalidateSession?: (key?: string) => void;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -457,8 +457,6 @@ async function main() {
|
|||||||
matrixAdapter.onHeartbeatStart = () => heartbeatService.start();
|
matrixAdapter.onHeartbeatStart = () => heartbeatService.start();
|
||||||
// Best-effort: stops the timer so no new runs fire; running promise times out on its own
|
// Best-effort: stops the timer so no new runs fire; running promise times out on its own
|
||||||
matrixAdapter.onTimeoutHeartbeat = () => { heartbeatService.stop(); log.warn('Matrix !timeout: heartbeat stopped (abort not yet supported)'); };
|
matrixAdapter.onTimeoutHeartbeat = () => { heartbeatService.stop(); log.warn('Matrix !timeout: heartbeat stopped (abort not yet supported)'); };
|
||||||
matrixAdapter.getAgentId = () => bot.getStatus().agentId ?? undefined;
|
|
||||||
matrixAdapter.onInvalidateSession = (key?: string) => bot.invalidateSession(key);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Per-agent polling -- resolve accounts from polling > integrations.google (legacy) > env
|
// Per-agent polling -- resolve accounts from polling > integrations.google (legacy) > env
|
||||||
|
|||||||
Reference in New Issue
Block a user