From 64a0e4b7d830836a596703069c83f893e887d314 Mon Sep 17 00:00:00 2001 From: Cameron Date: Thu, 26 Feb 2026 11:18:47 -0800 Subject: [PATCH] feat: add sendFile support to Signal channel adapter (#407) --- src/channels/signal.test.ts | 89 +++++++++++++++++++++++++++ src/channels/signal.ts | 32 +++++++++- src/config/types.ts | 2 +- src/core/bot.ts | 16 +++-- src/core/sdk-session-contract.test.ts | 60 ++++++++++++++++++ 5 files changed, 193 insertions(+), 6 deletions(-) create mode 100644 src/channels/signal.test.ts diff --git a/src/channels/signal.test.ts b/src/channels/signal.test.ts new file mode 100644 index 0000000..499eff3 --- /dev/null +++ b/src/channels/signal.test.ts @@ -0,0 +1,89 @@ +import { describe, it, expect, vi } from 'vitest'; +import { SignalAdapter } from './signal.js'; + +describe('SignalAdapter sendFile', () => { + function createAdapter(phone = '+15555555555') { + return new SignalAdapter({ phoneNumber: phone }); + } + + it('sends attachment to a direct message recipient', async () => { + const adapter = createAdapter(); + const rpcSpy = vi.spyOn(adapter as any, 'rpcRequest').mockResolvedValue({ timestamp: 12345 }); + + const result = await adapter.sendFile({ + chatId: '+12223334444', + filePath: '/tmp/voice.ogg', + }); + + expect(rpcSpy).toHaveBeenCalledWith('send', { + attachment: ['/tmp/voice.ogg'], + account: '+15555555555', + recipient: ['+12223334444'], + }); + expect(result.messageId).toBe('12345'); + }); + + it('sends attachment to a group', async () => { + const adapter = createAdapter(); + const rpcSpy = vi.spyOn(adapter as any, 'rpcRequest').mockResolvedValue({ timestamp: 99 }); + + await adapter.sendFile({ + chatId: 'group:abc123', + filePath: '/tmp/photo.png', + kind: 'image', + }); + + expect(rpcSpy).toHaveBeenCalledWith('send', { + attachment: ['/tmp/photo.png'], + account: '+15555555555', + groupId: 'abc123', + }); + }); + + it('includes caption as message text', async () => { + const adapter = createAdapter(); + const rpcSpy = vi.spyOn(adapter as any, 'rpcRequest').mockResolvedValue({ timestamp: 1 }); + + await adapter.sendFile({ + chatId: '+12223334444', + filePath: '/tmp/report.pdf', + caption: 'Here is the report', + }); + + expect(rpcSpy).toHaveBeenCalledWith('send', { + attachment: ['/tmp/report.pdf'], + message: 'Here is the report', + account: '+15555555555', + recipient: ['+12223334444'], + }); + }); + + it('sends to own number for note-to-self', async () => { + const adapter = createAdapter('+19998887777'); + const rpcSpy = vi.spyOn(adapter as any, 'rpcRequest').mockResolvedValue({ timestamp: 42 }); + + await adapter.sendFile({ + chatId: 'note-to-self', + filePath: '/tmp/memo.ogg', + kind: 'audio', + }); + + expect(rpcSpy).toHaveBeenCalledWith('send', { + attachment: ['/tmp/memo.ogg'], + account: '+19998887777', + recipient: ['+19998887777'], + }); + }); + + it('returns unknown when no timestamp in response', async () => { + const adapter = createAdapter(); + vi.spyOn(adapter as any, 'rpcRequest').mockResolvedValue({}); + + const result = await adapter.sendFile({ + chatId: '+12223334444', + filePath: '/tmp/file.txt', + }); + + expect(result.messageId).toBe('unknown'); + }); +}); diff --git a/src/channels/signal.ts b/src/channels/signal.ts index aabce64..77d38a6 100644 --- a/src/channels/signal.ts +++ b/src/channels/signal.ts @@ -6,7 +6,7 @@ */ import type { ChannelAdapter } from './types.js'; -import type { InboundAttachment, InboundMessage, OutboundMessage } from '../core/types.js'; +import type { InboundAttachment, InboundMessage, OutboundFile, OutboundMessage } from '../core/types.js'; import { applySignalGroupGating } from './signal/group-gating.js'; import type { DmPolicy } from '../pairing/types.js'; import { @@ -305,6 +305,36 @@ This code expires in 1 hour.`; }; } + async sendFile(file: OutboundFile): Promise<{ messageId: string }> { + const params: Record = { + attachment: [file.filePath], + }; + + // Include caption as the message text + if (file.caption) { + params.message = file.caption; + } + + if (this.config.phoneNumber) { + params.account = this.config.phoneNumber; + } + + const target = file.chatId === 'note-to-self' ? this.config.phoneNumber : file.chatId; + + if (target.startsWith('group:')) { + params.groupId = target.slice('group:'.length); + } else { + params.recipient = [target]; + } + + const result = await this.rpcRequest<{ timestamp?: number }>('send', params); + const timestamp = result?.timestamp; + + return { + messageId: timestamp ? String(timestamp) : 'unknown', + }; + } + getDmPolicy(): string { return this.config.dmPolicy || 'pairing'; } diff --git a/src/config/types.ts b/src/config/types.ts index 92e1a19..6700510 100644 --- a/src/config/types.ts +++ b/src/config/types.ts @@ -529,7 +529,7 @@ export function normalizeAgents(config: LettaBotConfig): AgentConfig[] { ]; for (const [name, raw, included] of channelCredentials) { if (raw && (raw as Record).enabled !== false && !included) { - console.warn(`[Config] Channel '${name}' is in ${sourcePath} but missing required credentials -- skipping. Check your lettabot.yaml or environment variables.`); + log.warn(`Channel '${name}' is in ${sourcePath} but missing required credentials -- skipping. Check your lettabot.yaml or environment variables.`); } } diff --git a/src/core/bot.ts b/src/core/bot.ts index 50b378a..a8e0c78 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -1065,7 +1065,11 @@ export class LettaBot implements AgentSession { let oldestKey: string | null = null; let oldestTime = Infinity; for (const [k, ts] of this.sessionLastUsed) { - if (k !== key && ts < oldestTime && this.sessions.has(k)) { + if (k === key) continue; + if (!this.sessions.has(k)) continue; + // Never evict an active/in-flight key (can close a live stream). + if (this.processingKeys.has(k) || this.sessionCreationLocks.has(k)) continue; + if (ts < oldestTime) { oldestKey = k; oldestTime = ts; } @@ -1078,6 +1082,9 @@ export class LettaBot implements AgentSession { this.sessionLastUsed.delete(oldestKey); this.sessionGenerations.delete(oldestKey); this.sessionCreationLocks.delete(oldestKey); + } else { + // All existing sessions are active; allow temporary overflow. + log.debug(`LRU session eviction skipped: all ${this.sessions.size} sessions are active/in-flight`); } } @@ -1137,9 +1144,10 @@ export class LettaBot implements AgentSession { this.store.refresh(); if (!this.store.agentId && !this.store.conversationId) return; try { - // In shared mode, warm the single session. In per-channel mode, warm nothing - // (sessions are created on first message per channel). - if (this.config.conversationMode !== 'per-channel') { + const mode = this.config.conversationMode || 'shared'; + // In shared mode, warm the single session. In per-channel/per-chat modes, + // warm nothing (sessions are created on first message per key). + if (mode === 'shared') { await this.ensureSessionForKey('shared'); } } catch (err) { diff --git a/src/core/sdk-session-contract.test.ts b/src/core/sdk-session-contract.test.ts index 7ef8ddb..83a244c 100644 --- a/src/core/sdk-session-contract.test.ts +++ b/src/core/sdk-session-contract.test.ts @@ -262,6 +262,20 @@ describe('SDK session contract', () => { expect(vi.mocked(createSession)).toHaveBeenCalledTimes(1); }); + it('does not pre-warm a shared session in per-chat mode', async () => { + const bot = new LettaBot({ + workingDir: join(dataDir, 'working'), + allowedTools: [], + conversationMode: 'per-chat', + }); + bot.setAgentId('agent-contract-test'); + + await bot.warmSession(); + + expect(vi.mocked(createSession)).not.toHaveBeenCalled(); + expect(vi.mocked(resumeSession)).not.toHaveBeenCalled(); + }); + it('passes memfs: true to createSession when config sets memfs true', async () => { const mockSession = { initialize: vi.fn(async () => undefined), @@ -378,6 +392,52 @@ describe('SDK session contract', () => { expect(processSpy).toHaveBeenCalledWith('slack'); }); + it('LRU eviction in per-chat mode does not close active keys', async () => { + const createdSession = { + initialize: vi.fn(async () => undefined), + send: vi.fn(async (_message: unknown) => undefined), + stream: vi.fn(() => + (async function* () { + yield { type: 'result', success: true }; + })() + ), + close: vi.fn(() => undefined), + agentId: 'agent-contract-test', + conversationId: 'conv-new', + }; + vi.mocked(createSession).mockReturnValue(createdSession as never); + + const activeSession = { + close: vi.fn(() => undefined), + }; + const idleSession = { + close: vi.fn(() => undefined), + }; + + const bot = new LettaBot({ + workingDir: join(dataDir, 'working'), + allowedTools: [], + conversationMode: 'per-chat', + maxSessions: 2, + }); + bot.setAgentId('agent-contract-test'); + + const botInternal = bot as any; + botInternal.sessions.set('telegram:active', activeSession); + botInternal.sessions.set('telegram:idle', idleSession); + botInternal.sessionLastUsed.set('telegram:active', 1); + botInternal.sessionLastUsed.set('telegram:idle', 2); + botInternal.processingKeys.add('telegram:active'); + + await botInternal._createSessionForKey('telegram:new', true, 0); + + expect(activeSession.close).not.toHaveBeenCalled(); + expect(idleSession.close).toHaveBeenCalledTimes(1); + expect(botInternal.sessions.has('telegram:active')).toBe(true); + expect(botInternal.sessions.has('telegram:idle')).toBe(false); + expect(botInternal.sessions.has('telegram:new')).toBe(true); + }); + it('enriches opaque error via stream error event in sendToAgent', async () => { const mockSession = { initialize: vi.fn(async () => undefined),