// File: lettabot/src/channels/signal.ts (TypeScript, 1020 lines, 34 KiB)

/**
* Signal Channel Adapter
*
* Uses signal-cli in daemon mode for Signal messaging.
* Based on moltbot's implementation.
*/
import type { ChannelAdapter } from './types.js';
import type { InboundAttachment, InboundMessage, OutboundFile, OutboundMessage } from '../core/types.js';
import { applySignalGroupGating } from './signal/group-gating.js';
import { resolveDailyLimits, checkDailyLimit } from './group-mode.js';
import type { DmPolicy } from '../pairing/types.js';
import {
isUserAllowed,
upsertPairingRequest,
} from '../pairing/store.js';
import { buildAttachmentPath } from './attachments.js';
import { parseCommand, HELP_TEXT } from '../core/commands.js';
import { spawn, type ChildProcess } from 'node:child_process';
import { randomUUID } from 'node:crypto';
import { homedir } from 'node:os';
import { join } from 'node:path';
import { copyFile, stat, access } from 'node:fs/promises';
import { constants } from 'node:fs';
import type { GroupModeConfig } from './group-mode.js';
import { createLogger } from '../logger.js';
// Module-wide logger; every message emitted from this file is tagged 'Signal'.
const log = createLogger('Signal');
/**
 * Per-group settings for the Signal channel.
 * Adds no fields of its own: it is the shared GroupModeConfig under a Signal-specific name.
 */
export interface SignalGroupConfig extends GroupModeConfig {}
/**
 * Configuration accepted by SignalAdapter.
 * Only phoneNumber is required; the adapter fills in the documented defaults for the rest.
 */
export interface SignalConfig {
/** Bot's phone number (E.164 format, e.g., +15551234567). Also the target used for Note to Self. */
phoneNumber: string;
/** Path to signal-cli binary (default: "signal-cli"). */
cliPath?: string;
/** Daemon HTTP host (default: "127.0.0.1"). */
httpHost?: string;
/** Daemon HTTP port (default: 8090). */
httpPort?: number;
/** Max time to wait for daemon startup, in milliseconds (default: 30000). */
startupTimeoutMs?: number;
// Security
/** Direct-message policy: 'pairing' (default), 'allowlist', or 'open'. */
dmPolicy?: DmPolicy;
/** Phone numbers (config allowlist). */
allowedUsers?: string[];
/** Respond to Note to Self (default: true). */
selfChatMode?: boolean;
/** Directory that inbound attachments are copied into; when unset, non-voice attachments are not collected. */
attachmentsDir?: string;
/** Size limit in bytes for copied attachments; unset or 0 disables the check. */
attachmentsMaxBytes?: number;
// Group gating
/** Regex patterns for mention detection (e.g., ["@bot"]). */
mentionPatterns?: string[];
/** Per-group settings, "*" for defaults. */
groups?: Record<string, SignalGroupConfig>;
/** For scoping daily limit counters in multi-agent mode. */
agentName?: string;
}
/** Error member of a JSON-RPC response; both fields are optional on the wire. */
type SignalRpcError = {
  code?: number;
  message?: string;
};

/**
 * JSON-RPC response envelope as parsed by rpcRequest.
 * `result` carries the payload on success, `error` is set on failure.
 */
type SignalRpcResponse<T> = {
  jsonrpc?: string;
  result?: T;
  error?: SignalRpcError;
  id?: string | number | null;
};
/** Group reference attached to a message. */
type SignalGroupInfo = {
  groupId?: string;
  groupName?: string;
};

/** Metadata describing one attachment of a message. */
type SignalAttachmentMeta = {
  contentType?: string;
  filename?: string;
  id?: string;
  size?: number;
  width?: number;
  height?: number;
  caption?: string;
};

/** A mention range inside the message text. */
type SignalMention = {
  start?: number;
  length?: number;
  uuid?: string;
  number?: string;
};

/** The message being quoted (replied to), if any. */
type SignalQuote = {
  id?: number;
  author?: string;
  authorUuid?: string;
  text?: string;
};

/** Fields shared by incoming data messages and synced sent messages. */
type SignalMessageBody = {
  message?: string;
  timestamp?: number;
  groupInfo?: SignalGroupInfo;
  attachments?: Array<SignalAttachmentMeta>;
  mentions?: Array<SignalMention>;
  quote?: SignalQuote;
};

/**
 * Shape of one event received on the daemon's SSE stream.
 * Every field is optional because the payload is parsed from untyped JSON.
 */
type SignalSseEvent = {
  envelope?: {
    source?: string;
    sourceUuid?: string;
    timestamp?: number;
    /** Message sent to us by someone else. */
    dataMessage?: SignalMessageBody;
    /** Message sent from our own account (Note to Self, or another linked device). */
    syncMessage?: {
      sentMessage?: SignalMessageBody & {
        destination?: string;
        destinationUuid?: string;
      };
    };
    typingMessage?: {
      action?: string;
    };
  };
};
/**
 * Wait for a file to become readable, polling with exponential backoff.
 * Signal-cli may still be downloading attachments when the SSE event fires.
 *
 * The file is always checked at least once (even when `maxWaitMs` is 0 or
 * negative), and a final check is made when the deadline is reached. Sleeps are
 * clipped to the time remaining, so the call does not overshoot `maxWaitMs` by a
 * whole polling interval.
 *
 * @param filePath - Path to check
 * @param maxWaitMs - Maximum time to wait (default: 5000ms)
 * @param intervalMs - Initial polling interval (default: 100ms); doubles after
 *   every miss and is capped at 500ms
 * @returns true if the file is readable, false if the deadline passed first
 */
async function waitForFile(filePath: string, maxWaitMs = 5000, intervalMs = 100): Promise<boolean> {
  const deadline = Date.now() + maxWaitMs;
  // Work on a local copy instead of mutating the parameter; clamp to 1ms so a
  // zero/negative interval cannot degrade into a hot polling loop.
  let delayMs = Math.max(1, intervalMs);

  for (;;) {
    try {
      await access(filePath, constants.R_OK);
      return true;
    } catch {
      // File not ready yet; fall through to the wait below.
    }

    const remainingMs = deadline - Date.now();
    if (remainingMs <= 0) {
      return false;
    }

    // Never sleep past the deadline.
    const sleepMs = Math.min(delayMs, remainingMs);
    await new Promise((resolve) => setTimeout(resolve, sleepMs));

    // Exponential backoff: double the interval, cap at 500ms
    delayMs = Math.min(delayMs * 2, 500);
  }
}
export class SignalAdapter implements ChannelAdapter {
readonly id = 'signal' as const;
readonly name = 'Signal';
private config: SignalConfig;
private running = false;
private daemonProcess: ChildProcess | null = null;
private sseAbortController: AbortController | null = null;
private baseUrl: string;
onMessage?: (msg: InboundMessage) => Promise<void>;
onCommand?: (command: string, chatId?: string, args?: string) => Promise<string | null>;
/**
 * Store the configuration with defaults applied and derive the daemon base URL.
 *
 * @param config - Adapter configuration; `dmPolicy` falls back to 'pairing',
 *   `selfChatMode` is true unless explicitly set to false, and the daemon
 *   address falls back to 127.0.0.1:8090.
 */
constructor(config: SignalConfig) {
  const selfChatEnabled = config.selfChatMode !== false; // Default true
  const effectivePolicy = config.dmPolicy || 'pairing';
  this.config = { ...config, dmPolicy: effectivePolicy, selfChatMode: selfChatEnabled };

  const daemonHost = config.httpHost || '127.0.0.1';
  const daemonPort = config.httpPort || 8090;
  this.baseUrl = `http://${daemonHost}:${daemonPort}`;
}
/**
 * Decide how a direct message from `userId` should be treated under the
 * configured dmPolicy.
 *
 * @param userId - Sender identifier passed to the pairing store lookup
 * @returns 'allowed' when the policy is 'open' or the user is already allowed
 *   (config allowlist or store); otherwise 'blocked' under the 'allowlist'
 *   policy and 'pairing' under any other policy
 */
private async checkAccess(userId: string): Promise<'allowed' | 'blocked' | 'pairing'> {
  const policy = this.config.dmPolicy || 'pairing';

  // Open policy short-circuits before the store is consulted.
  if (policy === 'open') return 'allowed';

  const alreadyAllowed = await isUserAllowed('signal', userId, this.config.allowedUsers);
  if (alreadyAllowed) return 'allowed';

  // Unknown user: the allowlist policy rejects, every other policy starts pairing.
  return policy === 'allowlist' ? 'blocked' : 'pairing';
}
/**
 * Build the text sent to an unpaired user on first contact.
 *
 * The message is a multi-line template literal, so its line breaks are part of
 * the text that is sent.
 *
 * @param code - Pairing code to show; it appears both as the code itself and
 *   inside the approve command
 * @returns The pairing instructions
 */
private formatPairingMessage(code: string): string {
return `Hi! This bot requires pairing.
Your code: *${code}*
Ask the owner to run:
\`lettabot pairing approve signal ${code}\`
This code expires in 1 hour.`;
}
/**
 * Start the adapter: spawn the signal-cli daemon, wait until its HTTP API
 * answers, then start the background SSE loop for incoming messages.
 *
 * Calling this while already running is a no-op.
 *
 * @throws If the daemon does not become ready in time. In that case the spawned
 *   daemon process is terminated before the error is rethrown, so a failed
 *   start does not leave an orphaned signal-cli process behind.
 */
async start(): Promise<void> {
  if (this.running) return;
  log.info('Starting adapter...');

  // Spawn signal-cli daemon
  await this.startDaemon();

  // Wait for daemon to be ready
  try {
    await this.waitForDaemon();
  } catch (err) {
    // `running` is still false here, so clean up the child ourselves.
    const daemon = this.daemonProcess;
    if (daemon && !daemon.killed) {
      daemon.kill('SIGTERM');
    }
    this.daemonProcess = null;
    throw err;
  }

  // Start SSE event loop for incoming messages
  this.startEventLoop();
  this.running = true;
  log.info('Adapter started successfully');
}
/**
 * Stop the adapter: abort the SSE loop and terminate the daemon process.
 *
 * Cleanup is driven by the resources that actually exist, not by the `running`
 * flag alone, because the daemon's exit handler clears `running` on an
 * unexpected exit while the SSE loop is still alive.
 *
 * Calling this when nothing is active is a no-op.
 */
async stop(): Promise<void> {
  const sse = this.sseAbortController;
  const daemon = this.daemonProcess;
  const sseActive = sse !== null && !sse.signal.aborted;
  if (!this.running && !sseActive && daemon === null) return;

  log.info('Stopping adapter...');

  // Stop SSE loop. The aborted controller is intentionally left in place: the
  // loop re-reads this.sseAbortController to decide whether to exit, and a null
  // field would read as "not aborted" and make it reconnect forever.
  // startEventLoop() installs a fresh controller on the next start.
  sse?.abort();

  // Stop daemon; always drop the reference, even if it was already killed.
  if (daemon && !daemon.killed) {
    daemon.kill('SIGTERM');
  }
  this.daemonProcess = null;

  this.running = false;
  log.info('Adapter stopped');
}
/**
 * Report the adapter's running flag.
 * The flag is set at the end of start() and cleared by stop() or when the
 * daemon process exits.
 */
isRunning(): boolean {
return this.running;
}
/**
 * Send a text message through the daemon's `send` RPC.
 *
 * The text is converted with markdownToSignal and any resulting style ranges
 * are attached. A chatId of 'note-to-self' is redirected to the bot's own
 * number; a chatId starting with 'group:' is sent to that group; anything else
 * is used as a single direct recipient.
 *
 * @param msg - Outbound message; `text` must contain non-whitespace characters
 * @returns The RPC timestamp as `messageId`, or 'unknown' when none is returned
 * @throws If the text is empty or the RPC fails
 */
async sendMessage(msg: OutboundMessage): Promise<{ messageId: string }> {
  const { markdownToSignal, formatStylesForCli } = await import('./signal-format.js');

  const rawText = msg.text;
  if (!rawText?.trim()) {
    throw new Error('Signal requires message text');
  }

  // Handle Note to Self - send to our own number
  const target = msg.chatId === 'note-to-self' ? this.config.phoneNumber : msg.chatId;

  // Convert markdown to Signal formatted text with style ranges
  const converted = markdownToSignal(rawText);
  const params: Record<string, unknown> = { message: converted.text };
  if (converted.styles.length > 0) {
    params['text-style'] = formatStylesForCli(converted.styles);
  }
  if (this.config.phoneNumber) {
    params.account = this.config.phoneNumber;
  }

  // Group chats are addressed by id, direct chats by phone number or UUID.
  const groupPrefix = 'group:';
  if (target.startsWith(groupPrefix)) {
    params.groupId = target.slice(groupPrefix.length);
  } else {
    params.recipient = [target];
  }

  const response = await this.rpcRequest<{ timestamp?: number }>('send', params);
  const sentAt = response?.timestamp;
  return { messageId: sentAt ? String(sentAt) : 'unknown' };
}
/**
 * Send a file as an attachment through the daemon's `send` RPC.
 *
 * The caption, when present, becomes the message text. Target resolution
 * matches sendMessage: 'note-to-self' goes to the bot's own number, 'group:'
 * ids go to the group, everything else is a direct recipient.
 *
 * @param file - File path, target chat and optional caption
 * @returns The RPC timestamp as `messageId`, or 'unknown' when none is returned
 */
async sendFile(file: OutboundFile): Promise<{ messageId: string }> {
  const params: Record<string, unknown> = { attachment: [file.filePath] };

  // Include caption as the message text
  if (file.caption) params.message = file.caption;
  if (this.config.phoneNumber) params.account = this.config.phoneNumber;

  const target = file.chatId === 'note-to-self' ? this.config.phoneNumber : file.chatId;
  const groupPrefix = 'group:';
  if (target.startsWith(groupPrefix)) {
    params.groupId = target.slice(groupPrefix.length);
  } else {
    params.recipient = [target];
  }

  const response = await this.rpcRequest<{ timestamp?: number }>('send', params);
  const sentAt = response?.timestamp;
  return { messageId: sentAt ? String(sentAt) : 'unknown' };
}
/** Return the configured direct-message policy, falling back to 'pairing' when none is set. */
getDmPolicy(): string {
return this.config.dmPolicy || 'pairing';
}
/**
 * Return the static formatting hints for this channel.
 * The same object shape is attached to every inbound message as `formatterHints`.
 * NOTE(review): supportsFiles is false although sendFile() is implemented — confirm this is intended.
 */
getFormatterHints() {
return {
supportsReactions: true,
supportsFiles: false,
formatHint: 'ONLY: *bold* _italic_ `code` — NO: headers, code fences, links, quotes, tables',
};
}
/** Message editing is not offered by this adapter; always returns false. */
supportsEditing(): boolean {
return false;
}
/**
 * No-op: resolves without doing anything.
 * All three parameters are ignored (see supportsEditing()).
 */
async editMessage(_chatId: string, _messageId: string, _text: string): Promise<void> {
// Signal doesn't support editing messages - no-op
}
/**
 * React to a message with an emoji through the daemon's `sendReaction` RPC.
 *
 * @param chatId - 'group:<id>', 'note-to-self', or a direct recipient
 * @param messageId - Identifier in the "timestamp:author" form produced by the
 *   inbound handler; split at the first colon
 * @param emoji - Emoji to send
 * @throws If `messageId` has no colon, or its timestamp/author part is empty
 *   or not a non-zero number
 */
async addReaction(chatId: string, messageId: string, emoji: string): Promise<void> {
  const separatorAt = messageId.indexOf(':');
  if (separatorAt === -1) {
    throw new Error(`Signal addReaction: invalid messageId format (expected "timestamp:author"): ${messageId}`);
  }

  const targetAuthor = messageId.slice(separatorAt + 1);
  const targetTimestamp = Number(messageId.slice(0, separatorAt));
  if (!targetTimestamp || !targetAuthor) {
    throw new Error(`Signal addReaction: could not parse timestamp/author from "${messageId}"`);
  }

  const params: Record<string, unknown> = {
    emoji,
    'target-author': targetAuthor,
    'target-timestamp': targetTimestamp,
  };
  if (this.config.phoneNumber) params.account = this.config.phoneNumber;

  const groupPrefix = 'group:';
  if (chatId.startsWith(groupPrefix)) {
    params.groupId = chatId.slice(groupPrefix.length);
  } else {
    const recipient = chatId === 'note-to-self' ? this.config.phoneNumber : chatId;
    params.recipient = [recipient];
  }

  await this.rpcRequest('sendReaction', params);
}
/**
 * Send a typing indicator through the daemon's `sendTyping` RPC.
 * Best-effort: any failure is logged as a warning and never thrown.
 *
 * @param chatId - 'group:<id>', 'note-to-self', or a direct recipient
 */
async sendTypingIndicator(chatId: string): Promise<void> {
  try {
    // Handle Note to Self
    const target = chatId === 'note-to-self' ? this.config.phoneNumber : chatId;

    const params: Record<string, unknown> = {};
    if (this.config.phoneNumber) params.account = this.config.phoneNumber;

    const groupPrefix = 'group:';
    if (target.startsWith(groupPrefix)) {
      params.groupId = target.slice(groupPrefix.length);
    } else {
      params.recipient = [target];
    }

    await this.rpcRequest('sendTyping', params);
  } catch (err) {
    // Typing indicators are best-effort
    log.warn('Failed to send typing indicator:', err);
  }
}
// --- Private methods ---
/**
 * Spawn signal-cli in daemon mode with its HTTP interface enabled and wire its
 * output into the logger.
 *
 * The child is stored in `daemonProcess`. Its stdout is logged at info level;
 * stderr lines containing ERROR/WARN/FAILED/SEVERE are logged as errors and the
 * rest as info. When the process exits, the adapter's running flag is cleared.
 * This method returns right after spawning; it does not wait for readiness.
 */
private async startDaemon(): Promise<void> {
  const binary = this.config.cliPath || 'signal-cli';
  const bindAddress = `${this.config.httpHost || '127.0.0.1'}:${this.config.httpPort || 8090}`;
  const accountArgs = this.config.phoneNumber ? ['-a', this.config.phoneNumber] : [];
  const args: string[] = [...accountArgs, 'daemon', '--http', bindAddress, '--no-receive-stdout'];

  log.info(`Spawning: ${binary} ${args.join(' ')}`);
  const child = spawn(binary, args, {
    stdio: ['ignore', 'pipe', 'pipe'],
  });
  this.daemonProcess = child;

  // Split an output chunk into its non-blank lines.
  const toLines = (chunk: { toString(): string }): string[] =>
    chunk
      .toString()
      .split(/\r?\n/)
      .filter((entry: string) => entry.trim());

  const problemPattern = /\b(ERROR|WARN|FAILED|SEVERE)\b/i;

  child.stdout?.on('data', (chunk) => {
    toLines(chunk).forEach((entry) => log.info(entry));
  });

  child.stderr?.on('data', (chunk) => {
    // signal-cli writes most logs to stderr
    toLines(chunk).forEach((entry) => {
      if (problemPattern.test(entry)) {
        log.error(entry);
      } else {
        log.info(entry);
      }
    });
  });

  child.on('error', (err) => {
    log.error('Daemon spawn error:', err);
  });

  child.on('exit', (code) => {
    log.info(`Daemon exited with code ${code}`);
    // Unexpected exit - mark as not running
    if (this.running) this.running = false;
  });
}
/**
 * Poll the daemon's `/api/v1/check` endpoint until it answers with an OK status.
 *
 * Each probe is aborted after 2 seconds, probes are spaced 500ms apart, and
 * failed probes are ignored. The overall budget is `startupTimeoutMs`
 * (default 30000).
 *
 * @throws If no probe succeeds within the budget
 */
private async waitForDaemon(): Promise<void> {
  const timeoutMs = this.config.startupTimeoutMs || 30000;
  const pollIntervalMs = 500;
  const deadline = Date.now() + timeoutMs;
  const checkUrl = `${this.baseUrl}/api/v1/check`;

  log.info('Waiting for daemon to be ready...');

  while (Date.now() < deadline) {
    const probe = new AbortController();
    const probeTimer = setTimeout(() => probe.abort(), 2000);
    try {
      const res = await fetch(checkUrl, { method: 'GET', signal: probe.signal });
      if (res.ok) {
        log.info('Daemon is ready');
        return;
      }
    } catch {
      // Daemon not ready yet
    } finally {
      clearTimeout(probeTimer);
    }
    await new Promise((resolve) => setTimeout(resolve, pollIntervalMs));
  }

  throw new Error(`Signal daemon did not become ready within ${timeoutMs}ms`);
}
/**
 * Install a fresh abort controller and start the SSE loop in the background.
 *
 * The loop is not awaited. If it rejects, the error is logged unless this
 * loop's own controller was aborted. The controller is captured locally so the
 * check stays valid even after the `sseAbortController` field is cleared or
 * replaced.
 */
private startEventLoop(): void {
  const controller = new AbortController();
  this.sseAbortController = controller;

  // Run SSE loop in background
  this.runSseLoop().catch((err) => {
    if (!controller.signal.aborted) {
      log.error('SSE loop error:', err);
    }
  });
}
/**
 * Consume the daemon's `/api/v1/events` SSE stream until the adapter is stopped.
 *
 * Each complete event (terminated by a blank line) has its `data:` lines
 * concatenated and handed to handleSseData without awaiting, so one slow
 * message does not stall the stream. When the stream ends the loop reconnects
 * after 2 seconds; after a connection error it reconnects after 5 seconds.
 *
 * The stop signal is captured once when the loop starts. Re-reading
 * `this.sseAbortController` is unsafe because the field can be cleared or
 * replaced while the loop runs, in which case `?.signal.aborted` evaluates to
 * `undefined` and the loop would never terminate. Reconnect delays are also
 * cut short when the stop signal fires.
 */
private async runSseLoop(): Promise<void> {
  const url = new URL(`${this.baseUrl}/api/v1/events`);
  if (this.config.phoneNumber) {
    url.searchParams.set('account', this.config.phoneNumber);
  }

  const stopSignal = this.sseAbortController?.signal;
  const stopped = (): boolean => stopSignal?.aborted ?? false;

  // Sleep for `ms`, waking early if the stop signal fires.
  const pause = (ms: number): Promise<void> =>
    new Promise((resolve) => {
      if (stopSignal?.aborted) {
        resolve();
        return;
      }
      const finish = (): void => {
        clearTimeout(timer);
        stopSignal?.removeEventListener('abort', finish);
        resolve();
      };
      const timer = setTimeout(finish, ms);
      stopSignal?.addEventListener('abort', finish, { once: true });
    });

  log.info('Starting SSE event loop:', url.toString());

  while (!stopped()) {
    // Create a new controller for this connection attempt
    const connectionController = new AbortController();
    // Abort this connection if the main controller is aborted
    const onMainAbort = (): void => connectionController.abort();
    stopSignal?.addEventListener('abort', onMainAbort, { once: true });

    try {
      log.info('Connecting to SSE...');
      const res = await fetch(url, {
        method: 'GET',
        headers: { Accept: 'text/event-stream' },
        signal: connectionController.signal,
      });
      if (!res.ok || !res.body) {
        throw new Error(`SSE failed: ${res.status} ${res.statusText}`);
      }
      log.info('SSE connected');

      const reader = res.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';

      while (!stopped()) {
        const { value, done } = await reader.read();
        if (done) {
          log.info('SSE stream ended');
          break;
        }
        buffer += decoder.decode(value, { stream: true });

        // Process complete events (separated by double newline)
        const events = buffer.split('\n\n');
        buffer = events.pop() || ''; // Keep incomplete event in buffer

        for (const event of events) {
          if (!event.trim()) continue;

          // Extract data from SSE event (may be multiline)
          let data = '';
          for (const line of event.split('\n')) {
            if (line.startsWith('data:')) {
              data += line.slice(5).trim();
            }
          }

          if (data) {
            this.handleSseData(data).catch((err) => {
              log.error('Error handling SSE data:', err);
            });
          }
        }
      }

      if (stopped()) {
        return;
      }

      // Stream ended normally - wait before reconnecting
      log.info('SSE disconnected, reconnecting in 2s...');
      await pause(2000);
    } catch (err) {
      if (stopped()) {
        return;
      }
      log.error('SSE connection error, reconnecting in 5s:', err);
      await pause(5000);
    } finally {
      // Clean up the listener and make sure this attempt's connection is closed.
      stopSignal?.removeEventListener('abort', onMainAbort);
      connectionController.abort();
    }
  }
}
/**
 * Handle one SSE payload from the daemon and, if it is a processable message,
 * forward it to `onMessage`.
 *
 * Steps, in order:
 *  1. Parse the JSON and pick either the incoming data message or the synced
 *     sent message (Note to Self / sent from another device).
 *  2. Transcribe a voice attachment (first `audio/*` attachment with an id) and
 *     collect the remaining attachments.
 *  3. Apply access control: Note to Self honours `selfChatMode`, group messages
 *     bypass pairing, direct messages go through checkAccess / pairing.
 *  4. Answer slash commands directly.
 *  5. For groups, apply group gating and the daily limit.
 *  6. Build the InboundMessage and hand it to `onMessage`.
 *
 * Never throws: every failure is caught and logged.
 *
 * @param data - Concatenated `data:` content of one SSE event (JSON text)
 */
private async handleSseData(data: string): Promise<void> {
  try {
    const event = JSON.parse(data) as SignalSseEvent;
    const envelope = event.envelope;
    if (!envelope) return;

    // Debug: log when we receive any message
    if (envelope.dataMessage || envelope.syncMessage) {
      log.info('Received envelope:', JSON.stringify(envelope, null, 2));
    }

    // Handle incoming data messages (from others)
    const dataMessage = envelope.dataMessage;
    // Handle sync messages (Note to Self, messages we sent from another device)
    const syncMessage = envelope.syncMessage?.sentMessage;

    // Get the message text and source from either type
    let messageText: string | undefined;
    let source: string | undefined;
    let chatId: string | undefined;
    let groupInfo: { groupId?: string; groupName?: string } | undefined;
    let attachments: Array<{ contentType?: string; filename?: string; id?: string }> | undefined;

    if (dataMessage?.message || dataMessage?.attachments?.length) {
      // Regular incoming message
      messageText = dataMessage.message;
      source = envelope.source || envelope.sourceUuid;
      groupInfo = dataMessage.groupInfo;
      attachments = dataMessage.attachments;
      if (groupInfo?.groupId) {
        chatId = `group:${groupInfo.groupId}`;
      } else {
        chatId = source;
      }
    } else if (syncMessage?.message || syncMessage?.attachments?.length) {
      // Sync message (Note to Self or sent from another device)
      messageText = syncMessage.message;
      source = syncMessage.destination || syncMessage.destinationUuid;
      groupInfo = syncMessage.groupInfo;
      attachments = syncMessage.attachments;
      // For Note to Self, destination is our own number
      const isNoteToSelf = source === this.config.phoneNumber ||
        source === envelope.source ||
        source === envelope.sourceUuid;
      if (isNoteToSelf) {
        chatId = 'note-to-self';
      } else if (groupInfo?.groupId) {
        chatId = `group:${groupInfo.groupId}`;
      } else {
        chatId = source;
      }
    }

    // Check if we have a valid message before attachment processing
    if (!source || !chatId) {
      return;
    }

    // Log all attachments for debugging
    if (attachments?.length) {
      log.info(`Attachments received: ${JSON.stringify(attachments.map(a => ({ type: a.contentType, id: a.id })))}`);
    }

    // NOTE(review): voice transcription and attachment copying below run before
    // the DM access check and group gating further down, so senders who are not
    // (yet) authorized can still trigger them. Consider moving the checks
    // earlier; the order is kept as-is here to preserve behavior.
    const voiceAttachment = attachments?.find(a => a.contentType?.startsWith('audio/'));
    if (voiceAttachment?.id) {
      messageText = await this.transcribeVoiceAttachment(
        { id: voiceAttachment.id, contentType: voiceAttachment.contentType },
        chatId,
        messageText,
      );
    } else if (voiceAttachment) {
      // Audio attachment exists but has no ID
      log.warn(`Audio attachment found but missing ID: ${JSON.stringify(voiceAttachment)}`);
    }

    // Collect non-voice attachments (images, files, etc.)
    const collectedAttachments = await this.collectSignalAttachments(attachments, chatId);

    // After processing attachments, check if we have any message content.
    // If this was a voice-only message and transcription failed/was disabled,
    // still forward a placeholder so the user knows we got it.
    if (!messageText && voiceAttachment?.id) {
      messageText = '[Voice message received]';
    }
    if (!messageText && collectedAttachments.length === 0) {
      return;
    }

    // Handle Note to Self - check selfChatMode
    log.info(`Processing message: chatId=${chatId}, source=${source}, selfChatMode=${this.config.selfChatMode}`);
    if (chatId === 'note-to-self') {
      if (!this.config.selfChatMode) {
        // selfChatMode disabled - ignore Note to Self messages
        log.info('Note to Self ignored (selfChatMode disabled)');
        return;
      }
      // selfChatMode enabled - allow the message through
      log.info('Note to Self allowed (selfChatMode enabled)');
    } else if (chatId.startsWith('group:')) {
      // Group messages bypass pairing - anyone in the group can interact
      log.info('Group message - bypassing access control');
    } else {
      // External DM - check access control
      log.info('Checking access for external message');
      // Named accessResult so it does not shadow the fs `access` import.
      const accessResult = await this.checkAccess(source);
      log.info(`Access result: ${accessResult}`);

      if (accessResult === 'blocked') {
        log.info(`Blocked message from unauthorized user: ${source}`);
        await this.sendMessage({ chatId: source, text: "Sorry, you're not authorized to use this bot." });
        return;
      }

      if (accessResult === 'pairing') {
        // Create pairing request
        const { code, created } = await upsertPairingRequest('signal', source, {
          firstName: source, // Use phone number as name
        });
        if (!code) {
          await this.sendMessage({
            chatId: source,
            text: "Too many pending pairing requests. Please try again later."
          });
          return;
        }
        // Send pairing message on first contact
        if (created) {
          log.info(`New pairing request from ${source}: ${code}`);
          await this.sendMessage({ chatId: source, text: this.formatPairingMessage(code) });
        }
        // Don't process the message
        return;
      }
    }

    // Handle slash commands
    const parsed = parseCommand(messageText);
    if (parsed) {
      if (parsed.command === 'help' || parsed.command === 'start') {
        await this.sendMessage({ chatId, text: HELP_TEXT });
      } else if (this.onCommand) {
        const result = await this.onCommand(parsed.command, chatId, parsed.args || undefined);
        if (result) await this.sendMessage({ chatId, text: result });
      }
      return; // Don't pass commands to agent
    }

    const isGroup = chatId.startsWith('group:');

    // Apply group gating mode
    let wasMentioned: boolean | undefined;
    let isListeningMode = false;
    if (isGroup && groupInfo?.groupId) {
      const mentions = dataMessage?.mentions || syncMessage?.mentions;
      const quote = dataMessage?.quote || syncMessage?.quote;
      const gatingResult = applySignalGroupGating({
        text: messageText || '',
        groupId: groupInfo.groupId,
        senderId: source,
        mentions,
        quote,
        selfPhoneNumber: this.config.phoneNumber,
        groupsConfig: this.config.groups,
        mentionPatterns: this.config.mentionPatterns,
      });
      if (!gatingResult.shouldProcess) {
        log.info(`Group message filtered: ${gatingResult.reason}`);
        return;
      }

      // Daily rate limit check
      const groupKeys = [groupInfo.groupId, `group:${groupInfo.groupId}`];
      const limits = resolveDailyLimits(this.config.groups, groupKeys);
      const counterKey = `${this.config.agentName ?? ''}:signal:${limits.matchedKey ?? groupInfo.groupId}`;
      const limitResult = checkDailyLimit(counterKey, source, limits);
      if (!limitResult.allowed) {
        log.info(`Daily limit reached for ${counterKey} (${limitResult.reason})`);
        return;
      }

      wasMentioned = gatingResult.wasMentioned;
      isListeningMode = gatingResult.mode === 'listen' && !wasMentioned;
      if (wasMentioned) {
        log.info(`Bot mentioned via ${gatingResult.method}`);
      }
    }

    // Signal uses timestamps as message IDs. Encode as "timestamp:author" so
    // addReaction() can extract the target-author for sendReaction.
    const signalTimestamp = envelope.timestamp || Date.now();
    const msg: InboundMessage = {
      channel: 'signal',
      chatId,
      userId: source,
      messageId: `${signalTimestamp}:${source}`,
      text: messageText || '',
      timestamp: new Date(signalTimestamp),
      isGroup,
      groupName: groupInfo?.groupName,
      wasMentioned,
      isListeningMode,
      attachments: collectedAttachments.length > 0 ? collectedAttachments : undefined,
      formatterHints: this.getFormatterHints(),
    };

    this.onMessage?.(msg).catch((err) => {
      log.error('Error handling message:', err);
    });
  } catch (err) {
    log.error('Failed to parse SSE event:', err, data);
  }
}

/**
 * Persist and transcribe one voice attachment, returning the updated message text.
 *
 * The audio is read from signal-cli's attachments directory under the user's
 * home. When `attachmentsDir` is configured the file is first copied there
 * (best-effort). If transcription is not configured, a notice is sent to the
 * chat and the text is returned unchanged. Otherwise the transcript, or a
 * bracketed note describing why there is none, is appended to the text.
 *
 * The file extension derived from the content type has codec parameters
 * stripped (e.g. "ogg;codecs=opus" becomes "ogg") for both the saved copy and
 * the name passed to the transcriber.
 *
 * @param voice - Attachment id and content type
 * @param chatId - Chat the message belongs to (used for the saved path and the notice)
 * @param messageText - Text accompanying the voice message, if any
 * @returns The text with the voice note appended, or the original text when
 *   transcription is not configured
 */
private async transcribeVoiceAttachment(
  voice: { id: string; contentType?: string },
  chatId: string,
  messageText: string | undefined,
): Promise<string | undefined> {
  log.info(`Voice attachment detected: ${voice.contentType}, id: ${voice.id}`);

  const sourcePath = join(homedir(), '.local/share/signal-cli/attachments', voice.id);
  // strip codec params like "ogg;codecs=opus"
  const ext = (voice.contentType?.split('/')[1] || 'ogg').replace(/;.*$/, '');

  // Always persist voice audio to attachments directory (when one is configured)
  let savedAudioPath: string | undefined;
  if (this.config.attachmentsDir) {
    const voiceFileName = `voice-${voice.id}.${ext}`;
    const voiceTargetPath = buildAttachmentPath(this.config.attachmentsDir, 'signal', chatId, voiceFileName);
    try {
      const voiceFileReady = await waitForFile(sourcePath, 5000);
      if (voiceFileReady) {
        await copyFile(sourcePath, voiceTargetPath);
        savedAudioPath = voiceTargetPath;
        log.info(`Voice audio saved to ${voiceTargetPath}`);
      }
    } catch (err) {
      log.warn('Failed to save voice audio:', err);
    }
  }

  // Append a note to whatever text came with the message.
  const withNote = (note: string): string => (messageText ? messageText + '\n' : '') + note;

  try {
    const { isTranscriptionConfigured, transcribeAudio } = await import('../transcription/index.js');
    if (!isTranscriptionConfigured()) {
      await this.sendMessage({
        chatId,
        text: 'Voice messages require a transcription API key. See: https://github.com/letta-ai/lettabot#voice'
      });
      return messageText;
    }

    // Note: signal-cli may still be downloading when SSE event fires, so we wait
    log.info(`Waiting for attachment: ${sourcePath}`);
    const fileReady = await waitForFile(sourcePath, 5000);
    if (!fileReady) {
      log.error(`Attachment file not found after waiting: ${sourcePath}`);
      throw new Error(`Attachment file not found after waiting: ${sourcePath}`);
    }
    log.info(`Attachment file ready: ${sourcePath}`);

    // Non-blocking read so a large audio file does not stall the event loop.
    const { readFile } = await import('node:fs/promises');
    const buffer = await readFile(sourcePath);
    log.info(`Read ${buffer.length} bytes`);

    const result = await transcribeAudio(buffer, `voice.${ext}`, { audioPath: sourcePath });
    const audioRef = savedAudioPath ? ` (audio: ${savedAudioPath})` : '';

    if (result.success) {
      if (result.text) {
        log.info(`Transcribed voice message: "${result.text.slice(0, 50)}..."`);
        return withNote(`[Voice message]: ${result.text}`);
      }
      log.warn(`Transcription returned empty text`);
      return withNote(`[Voice message - transcription returned empty${audioRef}]`);
    }

    const errorMsg = result.error || 'Unknown transcription error';
    log.error(`Transcription failed: ${errorMsg}`);
    return withNote(`[Voice message - transcription failed: ${errorMsg}${audioRef}]`);
  } catch (error) {
    log.error('Error transcribing voice message:', error);
    const audioRef = savedAudioPath ? ` Audio saved to: ${savedAudioPath}` : '';
    return withNote(`[Voice message - error: ${error instanceof Error ? error.message : 'unknown error'}.${audioRef}]`);
  }
}
/**
 * Call a JSON-RPC method on the daemon's `/api/v1/rpc` endpoint.
 *
 * The 10 second timeout covers both the request and reading the response body,
 * so a stalled body cannot hang the caller.
 *
 * @param method - JSON-RPC method name (e.g. 'send', 'sendReaction', 'sendTyping')
 * @param params - Method parameters
 * @returns The `result` member of the response; `undefined` when the daemon
 *   answers with HTTP 201
 * @throws If the request fails or times out, the body is empty or not a valid
 *   JSON object, or the response carries an `error` member
 */
private async rpcRequest<T = unknown>(
  method: string,
  params: Record<string, unknown>,
): Promise<T> {
  const id = randomUUID();
  const body = JSON.stringify({
    jsonrpc: '2.0',
    method,
    params,
    id,
  });

  const controller = new AbortController();
  const timeout = setTimeout(() => controller.abort(), 10000);

  let status: number;
  let text: string;
  try {
    const res = await fetch(`${this.baseUrl}/api/v1/rpc`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body,
      signal: controller.signal,
    });
    status = res.status;
    if (status === 201) {
      return undefined as T;
    }
    // Read the body while the abort timer is still armed.
    text = await res.text();
  } finally {
    clearTimeout(timeout);
  }

  if (!text) {
    throw new Error(`Signal RPC empty response (status ${status})`);
  }

  let parsed: SignalRpcResponse<T> | null;
  try {
    parsed = JSON.parse(text) as SignalRpcResponse<T> | null;
  } catch {
    // Surface the status and a snippet instead of a bare SyntaxError.
    throw new Error(`Signal RPC invalid JSON response (status ${status}): ${text.slice(0, 200)}`);
  }
  if (parsed === null || typeof parsed !== 'object') {
    throw new Error(`Signal RPC invalid JSON response (status ${status}): ${text.slice(0, 200)}`);
  }

  if (parsed.error) {
    const code = parsed.error.code ?? 'unknown';
    const msg = parsed.error.message ?? 'Signal RPC error';
    throw new Error(`Signal RPC ${code}: ${msg}`);
  }
  return parsed.result as T;
}
/**
 * Collect the non-audio attachments of a Signal message.
 *
 * Each attachment is looked up by id in signal-cli's attachments directory
 * under the user's home and copied into the configured `attachmentsDir`.
 * Audio attachments and attachments without an id are skipped. An entry is
 * still returned (without `localPath`) when the file never appears, exceeds
 * `attachmentsMaxBytes`, or cannot be copied.
 *
 * The size limit is checked after the file is present on disk. Checking
 * earlier would silently bypass the limit whenever signal-cli is still
 * downloading, because the file cannot be stat'ed yet.
 *
 * @param attachments - Attachment metadata from the SSE event
 * @param chatId - Chat the message belongs to (used to build the target path)
 * @returns One entry per processed attachment; empty when there are no
 *   attachments or `attachmentsDir` is not configured
 */
private async collectSignalAttachments(
  attachments: Array<{ contentType?: string; filename?: string; id?: string; size?: number; width?: number; height?: number; caption?: string }> | undefined,
  chatId: string
): Promise<InboundAttachment[]> {
  if (!attachments || attachments.length === 0) return [];
  if (!this.config.attachmentsDir) return [];

  const results: InboundAttachment[] = [];
  const signalAttachmentsDir = join(homedir(), '.local/share/signal-cli/attachments');
  const maxBytes = this.config.attachmentsMaxBytes;

  for (const attachment of attachments) {
    // Skip voice attachments - handled separately by transcription
    if (attachment.contentType?.startsWith('audio/')) continue;
    if (!attachment.id) continue;

    const sourcePath = join(signalAttachmentsDir, attachment.id);
    // NOTE(review): filename presumably comes from the sender; confirm that
    // buildAttachmentPath sanitizes it before it is used as a path component.
    const name = attachment.filename || attachment.id;
    const entry: InboundAttachment = {
      id: attachment.id,
      name,
      mimeType: attachment.contentType,
      size: attachment.size,
      kind: attachment.contentType?.startsWith('image/') ? 'image'
        : attachment.contentType?.startsWith('video/') ? 'video'
        : 'file',
    };

    // Wait for file to be available (signal-cli may still be downloading)
    const fileReady = await waitForFile(sourcePath, 5000);
    if (!fileReady) {
      log.warn(`Attachment ${name} not found after waiting, skipping.`);
      results.push(entry);
      continue;
    }

    // Check size limit now that the file exists on disk
    if (maxBytes && maxBytes > 0) {
      try {
        const stats = await stat(sourcePath);
        if (stats.size > maxBytes) {
          log.warn(`Attachment ${name} exceeds size limit, skipping download.`);
          results.push(entry);
          continue;
        }
      } catch (err) {
        // Best-effort, as before: a failed stat does not block the copy attempt.
        log.warn(`Could not determine size of attachment ${name}:`, err);
      }
    }

    // Copy to our attachments directory
    const target = buildAttachmentPath(this.config.attachmentsDir, 'signal', chatId, name);
    try {
      await copyFile(sourcePath, target);
      entry.localPath = target;
      log.info(`Attachment saved to ${target}`);
    } catch (err) {
      log.warn('Failed to copy attachment:', err);
    }
    results.push(entry);
  }

  return results;
}
}