diff --git a/.gitignore b/.gitignore index 6902b1e..39483ff 100644 --- a/.gitignore +++ b/.gitignore @@ -60,3 +60,4 @@ bun.lock # Telegram MTProto session data (contains auth secrets) data/telegram-mtproto/ +logs/ diff --git a/package.json b/package.json index c69b273..ce63683 100644 --- a/package.json +++ b/package.json @@ -16,7 +16,7 @@ "setup": "tsx src/setup.ts", "dev": "tsx src/main.ts", "build": "tsc", - "postbuild": "cp -r src/looms/*.txt dist/looms/ && cp src/api/portal.html dist/api/portal.html && node scripts/fix-bin-permissions.mjs", + "postbuild": "cp -r src/looms/*.txt dist/looms/ && cp src/api/portal.html dist/api/portal.html && cp src/core/turn-viewer.html dist/core/ && node scripts/fix-bin-permissions.mjs", "prepare": "npx patch-package || true", "prepublishOnly": "npm run build && npm run test:run", "start": "node dist/main.js", diff --git a/src/api/server.ts b/src/api/server.ts index 9d263a5..38f76db 100644 --- a/src/api/server.ts +++ b/src/api/server.ts @@ -5,6 +5,8 @@ import * as http from 'http'; import * as fs from 'fs'; +import { readFile } from 'node:fs/promises'; +import * as crypto from 'node:crypto'; import { validateApiKey } from './auth.js'; import type { SendMessageResponse, ChatRequest, ChatResponse, AsyncChatResponse, PairingListResponse, PairingApproveRequest, PairingApproveResponse } from './types.js'; import { listPairingRequests, approvePairingCode } from '../pairing/store.js'; @@ -18,6 +20,7 @@ import { buildErrorResponse, buildModelList, validateChatRequest, } from './openai-compat.js'; import type { OpenAIChatRequest } from './openai-compat.js'; +import { getTurnViewerHtml } from '../core/turn-viewer.js'; import { createLogger } from '../logger.js'; @@ -38,8 +41,9 @@ type ResolvedChatRequest = { interface ServerOptions { port: number; apiKey: string; - host?: string; // Bind address (default: 127.0.0.1 for security) + host?: string; // Bind address (default: 127.0.0.1 for security) corsOrigin?: string; // CORS origin (default: same-origin only) + turnLogFiles?: Record; // agentName -> filePath; enables GET /turns viewer stores?: Map; // Agent stores for management endpoints agentChannels?: Map; // Channel IDs per agent name sessionInvalidators?: Map void>; // Invalidate live sessions after store writes @@ -49,6 +53,155 @@ interface ServerOptions { * Create and start the HTTP API server */ export function createApiServer(deliverer: AgentRouter, options: ServerOptions): http.Server { + // ── Turn viewer SSE infrastructure ────────────────────────────────────── + interface SSEClient { + res: http.ServerResponse; + sentCount: number; + lastTurnId?: string; + } + const sseClientsByAgent = new Map>(); + const broadcastQueues = new Map>(); + + function getTurnId(turn: unknown): string | undefined { + if (!turn || typeof turn !== 'object') return undefined; + const id = (turn as { turnId?: unknown }).turnId; + return typeof id === 'string' && id.trim() ? id : undefined; + } + + async function readTurns(filePath: string): Promise { + try { + const content = await readFile(filePath, 'utf8'); + return content.split('\n').filter(l => l.trim()).map(l => { + try { return JSON.parse(l); } catch { return null; } + }).filter(Boolean); + } catch { return []; } + } + + async function broadcastNewTurns(agentName: string, filePath: string): Promise { + const clients = sseClientsByAgent.get(agentName); + if (!clients || clients.size === 0) return; + const allTurns = await readTurns(filePath); + const currentLastTurnId = getTurnId(allTurns[allTurns.length - 1]); + + for (const client of clients) { + // If the log was cleared, reset the client snapshot and UI. + if (allTurns.length === 0) { + const shouldReset = client.sentCount !== 0 || !!client.lastTurnId; + client.sentCount = 0; + client.lastTurnId = undefined; + if (shouldReset) { + const payload = `data: ${JSON.stringify({ type: 'init', turns: [] })}\n\n`; + try { client.res.write(payload); } catch { clients.delete(client); } + } + continue; + } + + if (client.lastTurnId === currentLastTurnId && client.sentCount === allTurns.length) { + continue; + } + + let turnsToAppend: unknown[] | null = null; + if (client.lastTurnId) { + let previousIndex = -1; + for (let i = allTurns.length - 1; i >= 0; i--) { + if (getTurnId(allTurns[i]) === client.lastTurnId) { + previousIndex = i; + break; + } + } + if (previousIndex >= 0) { + turnsToAppend = allTurns.slice(previousIndex + 1); + } + } else if (client.sentCount < allTurns.length) { + // Fallback for older records without turnId. + turnsToAppend = allTurns.slice(client.sentCount); + } + + const appendTurns = turnsToAppend ?? []; + const shouldResync = turnsToAppend === null + || (appendTurns.length === 0 && (client.sentCount !== allTurns.length || client.lastTurnId !== currentLastTurnId)); + + const payload = shouldResync + ? `data: ${JSON.stringify({ type: 'init', turns: allTurns })}\n\n` + : appendTurns.length > 0 + ? `data: ${JSON.stringify({ type: 'append', turns: appendTurns })}\n\n` + : null; + + if (payload) { + try { + client.res.write(payload); + } catch { + clients.delete(client); + continue; + } + } + + client.sentCount = allTurns.length; + client.lastTurnId = currentLastTurnId; + } + } + + function enqueueBroadcast(agentName: string, filePath: string): void { + const prev = broadcastQueues.get(filePath) ?? Promise.resolve(); + const next = prev.then(() => broadcastNewTurns(agentName, filePath)).catch(() => {}); + broadcastQueues.set(filePath, next); + } + + const watchers = new Map(); + + function ensureWatching(agentName: string, filePath: string): void { + if (watchers.has(filePath)) return; + let watcher: fs.FSWatcher; + try { + watcher = fs.watch(filePath, { persistent: false }, (eventType) => { + if (eventType === 'rename') { + // Inode replaced (trim via atomic rename on Linux). Restart watcher. + watcher.close(); + watchers.delete(filePath); + setTimeout(() => { + const clients = sseClientsByAgent.get(agentName); + if (clients && clients.size > 0) { + enqueueBroadcast(agentName, filePath); + ensureWatching(agentName, filePath); + } + }, 200); + return; + } + enqueueBroadcast(agentName, filePath); + }); + } catch { + setTimeout(() => { + const clients = sseClientsByAgent.get(agentName); + if (clients && clients.size > 0) ensureWatching(agentName, filePath); + }, 2000); + return; + } + watcher.on('error', () => { + watcher.close(); + watchers.delete(filePath); + // Auto-restart watcher after trim (inode replacement on Linux) + setTimeout(() => { + const clients = sseClientsByAgent.get(agentName); + if (clients && clients.size > 0) ensureWatching(agentName, filePath); + }, 500); + }); + watchers.set(filePath, watcher); + } + + function maybeUnwatch(filePath: string, clients: Set): void { + if (clients.size === 0 && watchers.has(filePath)) { + watchers.get(filePath)!.close(); + watchers.delete(filePath); + broadcastQueues.delete(filePath); + } + } + + if (options.turnLogFiles) { + for (const agentName of Object.keys(options.turnLogFiles)) { + sseClientsByAgent.set(agentName, new Set()); + } + } + const server = http.createServer(async (req, res) => { // Set CORS headers (configurable origin, defaults to same-origin for security) const corsOrigin = options.corsOrigin || req.headers.origin || 'null'; @@ -70,6 +223,63 @@ export function createApiServer(deliverer: AgentRouter, options: ServerOptions): return; } + // Turn viewer routes + if (options.turnLogFiles && req.method === 'GET') { + const agentNames = Object.keys(options.turnLogFiles); + const parsedUrl = new URL(req.url ?? '/', `http://localhost`); + + const validateTurnAuth = (): boolean => { + if (validateApiKey(req.headers, options.apiKey)) return true; + const qKey = parsedUrl.searchParams.get('key') || ''; + if (!qKey) return false; + const a = Buffer.from(qKey); + const b = Buffer.from(options.apiKey); + return a.length === b.length && crypto.timingSafeEqual(a, b); + }; + + if (parsedUrl.pathname === '/turns') { + res.writeHead(200, { 'Content-Type': 'text/html; charset=utf-8' }); + res.end(getTurnViewerHtml(agentNames)); + return; + } + if (parsedUrl.pathname === '/turns/data') { + if (!validateTurnAuth()) { sendError(res, 401, 'Unauthorized'); return; } + const agentName = parsedUrl.searchParams.get('agent') || agentNames[0]; + const filePath = options.turnLogFiles[agentName]; + if (!filePath) { res.writeHead(404); res.end('Unknown agent'); return; } + res.writeHead(200, { 'Content-Type': 'application/json', 'Cache-Control': 'no-store' }); + res.end(JSON.stringify(await readTurns(filePath))); + return; + } + if (parsedUrl.pathname === '/turns/stream') { + if (!validateTurnAuth()) { sendError(res, 401, 'Unauthorized'); return; } + const agentName = parsedUrl.searchParams.get('agent') || agentNames[0]; + const filePath = options.turnLogFiles[agentName]; + if (!filePath) { res.writeHead(404); res.end('Unknown agent'); return; } + res.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-store', + 'Connection': 'keep-alive', + 'X-Accel-Buffering': 'no', + }); + const allTurns = await readTurns(filePath); + res.write(`data: ${JSON.stringify({ type: 'init', turns: allTurns })}\n\n`); + const clients = sseClientsByAgent.get(agentName)!; + const client: SSEClient = { + res, + sentCount: allTurns.length, + lastTurnId: getTurnId(allTurns[allTurns.length - 1]), + }; + clients.add(client); + ensureWatching(agentName, filePath); + req.on('close', () => { + clients.delete(client); + maybeUnwatch(filePath, clients); + }); + return; + } + } + // Route: POST /api/v1/messages (unified: supports both text and files) if (req.url === '/api/v1/messages' && req.method === 'POST') { try { diff --git a/src/config/types.ts b/src/config/types.ts index b752626..3cdea95 100644 --- a/src/config/types.ts +++ b/src/config/types.ts @@ -102,6 +102,10 @@ export interface AgentConfig { display?: DisplayConfig; allowedTools?: string[]; // Per-agent tool whitelist (overrides global/env ALLOWED_TOOLS) disallowedTools?: string[]; // Per-agent tool blocklist (overrides global/env DISALLOWED_TOOLS) + logging?: { + turnLogFile?: string; // Path to JSONL file for turn logging (one record per agent turn) + maxTurns?: number; // Max turns to retain in the log file (default: 1000, oldest trimmed) + }; }; /** Security settings */ security?: { @@ -195,6 +199,10 @@ export interface LettaBotConfig { display?: DisplayConfig; // Show tool calls / reasoning in channel output allowedTools?: string[]; // Global tool whitelist (overridden by per-agent, falls back to ALLOWED_TOOLS env) disallowedTools?: string[]; // Global tool blocklist (overridden by per-agent, falls back to DISALLOWED_TOOLS env) + logging?: { + turnLogFile?: string; // Global turn log file (overridden by per-agent) + maxTurns?: number; // Global maxTurns default (overridden by per-agent) + }; }; // Polling - system-level background checks (Gmail, etc.) diff --git a/src/core/bot.ts b/src/core/bot.ts index 8a4c094..3312b33 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -10,7 +10,7 @@ import { access, unlink, realpath, stat, constants } from 'node:fs/promises'; import { execFile } from 'node:child_process'; import { extname, resolve, join } from 'node:path'; import type { ChannelAdapter } from '../channels/types.js'; -import type { BotConfig, InboundMessage, TriggerContext, StreamMsg } from './types.js'; +import type { BotConfig, InboundMessage, TriggerContext, TriggerType, StreamMsg } from './types.js'; import { formatApiErrorForUser } from './errors.js'; import { formatToolCallDisplay, formatReasoningDisplay, formatQuestionsForChannel } from './display.js'; import type { AgentSession } from './interfaces.js'; @@ -24,6 +24,7 @@ import { parseDirectives, stripActionsBlock, type Directive } from './directives import { resolveEmoji } from './emoji.js'; import { SessionManager } from './session-manager.js'; import { createDisplayPipeline, type DisplayEvent, type CompleteEvent, type ErrorEvent } from './display-pipeline.js'; +import { TurnLogger, TurnAccumulator, generateTurnId, type TurnRecord } from './turn-logger.js'; import { createLogger } from '../logger.js'; @@ -233,11 +234,15 @@ export class LettaBot implements AgentSession { private conversationOverrides: Set = new Set(); private readonly sessionManager: SessionManager; + private readonly turnLogger: TurnLogger | null; constructor(config: BotConfig) { this.config = config; mkdirSync(config.workingDir, { recursive: true }); this.store = new Store('lettabot-agent.json', config.agentName); + this.turnLogger = config.logging?.turnLogFile + ? new TurnLogger(config.logging.turnLogFile, config.logging.maxTurns) + : null; if (config.reuseSession === false) { log.warn('Session reuse disabled (conversations.reuseSession=false): each foreground/background message uses a fresh SDK subprocess (~5s overhead per turn).'); } @@ -1172,7 +1177,7 @@ export class LettaBot implements AgentSession { private async processMessage(msg: InboundMessage, adapter: ChannelAdapter, retried = false): Promise { // Track timing and last target const debugTiming = !!process.env.LETTABOT_DEBUG_TIMING; - const t0 = debugTiming ? performance.now() : 0; + const t0 = performance.now(); const lap = (label: string) => { log.debug(`${label}: ${(performance.now() - t0).toFixed(0)}ms`); }; @@ -1278,6 +1283,10 @@ export class LettaBot implements AgentSession { adapter.sendTypingIndicator(msg.chatId).catch(() => {}); }, 4000); + const turnId = this.turnLogger ? generateTurnId() : ''; + const turnAcc = this.turnLogger ? new TurnAccumulator() : null; + let turnWritten = false; + try { let firstChunkLogged = false; const pipeline = createDisplayPipeline(run.stream(), { @@ -1286,6 +1295,7 @@ export class LettaBot implements AgentSession { }); for await (const event of pipeline) { + turnAcc?.feed(event); // Check for /cancel before processing each event if (this.cancelledKeys.has(convKey)) { log.info(`Stream cancelled by /cancel (key=${convKey})`); @@ -1627,6 +1637,24 @@ export class LettaBot implements AgentSession { } finally { clearInterval(typingInterval); adapter.stopTypingIndicator?.(msg.chatId)?.catch(() => {}); + // Write turn record (even partial turns on cancel/error) + if (this.turnLogger && turnAcc && !turnWritten) { + turnWritten = true; + const { events, output } = turnAcc.finalize(); + this.turnLogger.write({ + ts: new Date().toISOString(), + turnId, + trigger: 'user_message' as const, + channel: msg.channel, + chatId: msg.chatId, + userId: msg.userId, + input: typeof messageToSend === 'string' ? messageToSend : '[multimodal]', + events, + output: output || response, + durationMs: Math.round(performance.now() - t0), + error: lastErrorDetail?.message, + }).catch(() => {}); + } } lap('stream complete'); @@ -1784,8 +1812,14 @@ export class LettaBot implements AgentSession { const convKey = this.resolveHeartbeatConversationKey(); const acquired = await this.acquireLock(convKey); + const sendT0 = performance.now(); + const sendTurnId = this.turnLogger ? generateTurnId() : ''; + const sendTurnAcc = this.turnLogger ? new TurnAccumulator() : null; + let sendTurnWritten = false; + try { let retried = false; + while (true) { const { stream } = await this.sessionManager.runSession(text, { convKey, retried }); @@ -1796,6 +1830,7 @@ export class LettaBot implements AgentSession { let usedMessageCli = false; let lastErrorDetail: StreamErrorDetail | undefined; for await (const msg of stream()) { + sendTurnAcc?.feedRaw(msg); if (msg.type === 'tool_call') { this.sessionManager.syncTodoToolCall(msg); if (isSilent && msg.toolName === 'Bash') { @@ -1930,6 +1965,21 @@ export class LettaBot implements AgentSession { if (this.config.reuseSession === false) { this.sessionManager.invalidateSession(convKey); } + // Write turn record + if (this.turnLogger && sendTurnAcc && !sendTurnWritten) { + sendTurnWritten = true; + const { events, output } = sendTurnAcc.finalize(); + this.turnLogger.write({ + ts: new Date().toISOString(), + turnId: sendTurnId, + trigger: context?.type ?? 'heartbeat', + channel: context?.sourceChannel, + input: text, + events, + output, + durationMs: Math.round(performance.now() - sendT0), + }).catch(() => {}); + } this.releaseLock(convKey, acquired); } } @@ -1944,20 +1994,42 @@ export class LettaBot implements AgentSession { ): AsyncGenerator { const convKey = this.resolveHeartbeatConversationKey(); const acquired = await this.acquireLock(convKey); + const streamT0 = performance.now(); + const streamTurnId = this.turnLogger ? generateTurnId() : ''; + const streamTurnAcc = this.turnLogger ? new TurnAccumulator() : null; + let streamTurnError: string | undefined; try { const { stream } = await this.sessionManager.runSession(text, { convKey }); try { - yield* stream(); + for await (const msg of stream()) { + streamTurnAcc?.feedRaw(msg); + yield msg; + } } catch (error) { this.sessionManager.invalidateSession(convKey); + streamTurnError = error instanceof Error ? error.message : String(error); throw error; } } finally { if (this.config.reuseSession === false) { this.sessionManager.invalidateSession(convKey); } + if (this.turnLogger && streamTurnAcc) { + const { events, output } = streamTurnAcc.finalize(); + this.turnLogger.write({ + ts: new Date().toISOString(), + turnId: streamTurnId, + trigger: context?.type ?? 'heartbeat', + channel: context?.sourceChannel, + input: text, + events, + output, + durationMs: Math.round(performance.now() - streamT0), + error: streamTurnError, + }).catch(() => {}); + } this.releaseLock(convKey, acquired); } } diff --git a/src/core/turn-logger.ts b/src/core/turn-logger.ts new file mode 100644 index 0000000..289c1b1 --- /dev/null +++ b/src/core/turn-logger.ts @@ -0,0 +1,222 @@ +/** + * Turn logger -- writes one JSONL record per agent turn. + * + * TurnLogger handles file I/O with write serialization and bounded retention. + * TurnAccumulator collects DisplayEvents during a turn and produces a TurnRecord. + */ + +import { appendFile, readFile, writeFile, rename } from 'node:fs/promises'; +import { mkdirSync, readFileSync } from 'node:fs'; +import { dirname } from 'node:path'; +import { randomUUID } from 'node:crypto'; +import { createLogger } from '../logger.js'; +import type { DisplayEvent } from './display-pipeline.js'; +import type { TriggerType, StreamMsg } from './types.js'; + +const log = createLogger('TurnLogger'); + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +export type TurnEvent = + | { type: 'reasoning'; content: string } + | { type: 'tool_call'; id: string; name: string; args: unknown } + | { type: 'tool_result'; id: string; content: string; isError: boolean }; + +export interface TurnRecord { + ts: string; + turnId: string; + trigger: TriggerType; + channel?: string; + chatId?: string; + userId?: string; + input: string; + events: TurnEvent[]; + output: string; + durationMs?: number; + error?: string; +} + +// --------------------------------------------------------------------------- +// Constants +// --------------------------------------------------------------------------- + +const DEFAULT_MAX_TURNS = 1000; +const MAX_TOOL_RESULT_BYTES = 4 * 1024; + +// --------------------------------------------------------------------------- +// TurnAccumulator +// --------------------------------------------------------------------------- + +/** + * Collects DisplayEvents during a single agent turn and produces a TurnRecord. + * Works with the DisplayPipeline's event types -- no raw StreamMsg handling. + */ +export class TurnAccumulator { + private _events: TurnEvent[] = []; + private _output = ''; + // For feedRaw() only (sendToAgent/streamToAgent paths) + private _reasoningAcc = ''; + private _lastRawType: string | null = null; + + /** Feed a DisplayEvent from the pipeline. */ + feed(event: DisplayEvent): void { + switch (event.type) { + case 'reasoning': + // Pipeline already accumulates and flushes reasoning chunks, + // so each reasoning event is a complete block. + this._events.push({ type: 'reasoning', content: event.content }); + break; + case 'tool_call': + this._events.push({ + type: 'tool_call', + id: event.id, + name: event.name, + args: event.args, + }); + break; + case 'tool_result': { + const content = event.content.length > MAX_TOOL_RESULT_BYTES + ? event.content.slice(0, MAX_TOOL_RESULT_BYTES) + '…[truncated]' + : event.content; + this._events.push({ + type: 'tool_result', + id: event.toolCallId, + content, + isError: event.isError, + }); + break; + } + case 'text': + this._output += event.delta; + break; + } + } + + /** + * Feed a raw StreamMsg (for sendToAgent/streamToAgent which don't use DisplayPipeline). + * Handles reasoning accumulation inline since there's no pipeline to do it. + */ + feedRaw(msg: StreamMsg): void { + // Flush reasoning on transition away from reasoning + if (this._lastRawType === 'reasoning' && msg.type !== 'reasoning' && this._reasoningAcc.trim()) { + this._events.push({ type: 'reasoning', content: this._reasoningAcc.trim() }); + this._reasoningAcc = ''; + } + switch (msg.type) { + case 'reasoning': + this._reasoningAcc += msg.content || ''; + break; + case 'tool_call': + this._events.push({ + type: 'tool_call', + id: msg.toolCallId || '', + name: msg.toolName || 'unknown', + args: (msg as any).toolInput ?? (msg as any).rawArguments ?? null, + }); + break; + case 'tool_result': { + const raw = (msg as any).content ?? ''; + const str = typeof raw === 'string' ? raw : JSON.stringify(raw); + this._events.push({ + type: 'tool_result', + id: msg.toolCallId || '', + content: str.length > MAX_TOOL_RESULT_BYTES + ? str.slice(0, MAX_TOOL_RESULT_BYTES) + '…[truncated]' + : str, + isError: !!msg.isError, + }); + break; + } + case 'assistant': + this._output += msg.content || ''; + break; + } + if (msg.type !== 'stream_event') this._lastRawType = msg.type; + } + + /** Return the accumulated events and output text. */ + finalize(): { events: TurnEvent[]; output: string } { + // Flush trailing reasoning + if (this._reasoningAcc.trim()) { + this._events.push({ type: 'reasoning', content: this._reasoningAcc.trim() }); + this._reasoningAcc = ''; + } + return { events: this._events, output: this._output }; + } +} + +// --------------------------------------------------------------------------- +// TurnLogger +// --------------------------------------------------------------------------- + +export class TurnLogger { + private filePath: string; + private maxTurns: number; + private ready = false; + private lineCount = 0; + private writeQueue = Promise.resolve(); + + constructor(filePath: string, maxTurns = DEFAULT_MAX_TURNS) { + this.filePath = filePath; + if (!Number.isInteger(maxTurns) || maxTurns <= 0) { + throw new Error(`TurnLogger: maxTurns must be a positive integer (got ${maxTurns})`); + } + this.maxTurns = maxTurns; + try { + mkdirSync(dirname(filePath), { recursive: true }); + this.ready = true; + try { + const existing = readFileSync(filePath, 'utf8'); + this.lineCount = existing.split('\n').filter(l => l.trim()).length; + } catch { + this.lineCount = 0; + } + } catch (err) { + log.warn(`Failed to create log directory for ${filePath}:`, err instanceof Error ? err.message : err); + } + } + + /** Serialized write -- prevents concurrent trim races. */ + async write(record: TurnRecord): Promise { + if (!this.ready) return; + this.writeQueue = this.writeQueue.then(() => this._write(record)).catch(() => {}); + return this.writeQueue; + } + + private async _write(record: TurnRecord): Promise { + try { + await appendFile(this.filePath, JSON.stringify(record) + '\n'); + this.lineCount++; + if (this.lineCount > this.maxTurns) { + await this.trim(); + } + } catch (err) { + log.warn(`Failed to write turn record:`, err instanceof Error ? err.message : err); + } + } + + private async trim(): Promise { + try { + const content = await readFile(this.filePath, 'utf8'); + const lines = content.split('\n').filter(l => l.trim()); + const trimmed = lines.slice(lines.length - this.maxTurns).join('\n') + '\n'; + const tmp = this.filePath + '.tmp'; + await writeFile(tmp, trimmed); + await rename(tmp, this.filePath); + this.lineCount = this.maxTurns; + } catch (err) { + log.warn(`Failed to trim turn log:`, err instanceof Error ? err.message : err); + } + } +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** Generate a unique turn ID. */ +export function generateTurnId(): string { + return randomUUID(); +} diff --git a/src/core/turn-viewer.html b/src/core/turn-viewer.html new file mode 100644 index 0000000..9a6bf1f --- /dev/null +++ b/src/core/turn-viewer.html @@ -0,0 +1,510 @@ + + + + + +Turn Viewer + + + +
+
+

Turn Viewer

+ + + +
Invalid API key
+
+
+ +
+

Turn Viewer

+ __DEFAULT_AGENT__ + + +
+
+ +
+ Trigger +
+ + + + + + +
+ +
+ +
+
+ +
+ + + + + + + + + + + + + +
TimeTriggerChannelEventsDurationInputOutput
+ +
+ +
+
+
+
+
+
+ + + + +
+
+
+ +
+
+
+ +
+
+
+ +
+
+
+ +
+
+
+
+ + + + diff --git a/src/core/turn-viewer.ts b/src/core/turn-viewer.ts new file mode 100644 index 0000000..e2ad610 --- /dev/null +++ b/src/core/turn-viewer.ts @@ -0,0 +1,38 @@ +/** + * Turn viewer SPA — served by the API server at GET /turns. + * + * The HTML lives in turn-viewer.html (same directory). + * getTurnViewerHtml() injects agent metadata via simple token substitution + * so the template file can be edited and linted independently. + */ + +import { readFileSync } from 'node:fs'; +import { fileURLToPath } from 'node:url'; +import { dirname, join } from 'node:path'; + +const __dirname = dirname(fileURLToPath(import.meta.url)); +let TEMPLATE: string; +try { + TEMPLATE = readFileSync(join(__dirname, 'turn-viewer.html'), 'utf8'); +} catch { + throw new Error('turn-viewer.html not found -- did you run npm run build?'); +} + +function escHtml(s: string): string { + return s.replace(/&/g, '&').replace(//g, '>').replace(/"/g, '"'); +} + +export function getTurnViewerHtml(agentNames: string[]): string { + const multiAgent = agentNames.length > 1; + const defaultAgent = agentNames[0] ?? ''; + // Escape to prevent XSS if an agent name contains it + const agentNamesJson = JSON.stringify(agentNames).replace(/<\/script>/gi, '<\\/script>'); + const agentOptions = agentNames.map(n => ``).join(''); + + return TEMPLATE + .replaceAll('__AGENT_NAMES_JSON__', agentNamesJson) + .replaceAll('__DEFAULT_AGENT__', escHtml(defaultAgent)) + .replaceAll('__LABEL_DISPLAY__', multiAgent ? 'none' : 'inline') + .replaceAll('__SELECT_DISPLAY__', multiAgent ? 'inline-block' : 'none') + .replaceAll('__AGENT_OPTIONS__', agentOptions); +} diff --git a/src/core/types.ts b/src/core/types.ts index ae11c05..b54e584 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -192,6 +192,12 @@ export interface BotConfig { sendFileMaxSize?: number; // Max file size in bytes for (default: 50MB) sendFileCleanup?: boolean; // Allow to delete files after send (default: false) + // Logging + logging?: { + turnLogFile?: string; // Path to JSONL file for turn logging (one record per agent turn) + maxTurns?: number; // Max turns to retain in the log file (default: 1000, oldest trimmed) + }; + // Cron cronStorePath?: string; // Resolved cron store path (per-agent in multi-agent mode) diff --git a/src/main.ts b/src/main.ts index 81aa8bf..ce1480b 100644 --- a/src/main.ts +++ b/src/main.ts @@ -259,6 +259,25 @@ async function main() { const isMultiAgent = agents.length > 1; log.info(`${agents.length} agent(s) configured: ${agents.map(a => a.name).join(', ')}`); + // Validate agent names are unique + const agentNames = agents.map(a => a.name); + const duplicateAgentName = agentNames.find((n, i) => agentNames.indexOf(n) !== i); + if (duplicateAgentName) { + log.error(`Multiple agents share the same name: "${duplicateAgentName}". Each agent must have a unique name.`); + process.exit(1); + } + + // Validate no two agents share the same turnLogFile + const turnLogFilePaths = agents + .map(a => (a.features?.logging ?? yamlConfig.features?.logging)?.turnLogFile) + .filter((p): p is string => !!p) + .map(p => resolve(p)); + const duplicateTurnLog = turnLogFilePaths.find((p, i) => turnLogFilePaths.indexOf(p) !== i); + if (duplicateTurnLog) { + log.error(`Multiple agents share the same turnLogFile: "${duplicateTurnLog}". Each agent must use a unique log file path.`); + process.exit(1); + } + // Validate at least one agent has channels const totalChannels = agents.reduce((sum, a) => sum + Object.keys(a.channels).length, 0); if (totalChannels === 0) { @@ -351,6 +370,7 @@ async function main() { maxSessions: agentConfig.conversations?.maxSessions, reuseSession: agentConfig.conversations?.reuseSession, redaction: agentConfig.security?.redaction, + logging: agentConfig.features?.logging ?? yamlConfig.features?.logging, cronStorePath, skills: { cronEnabled: agentConfig.features?.cron ?? globalConfig.cronEnabled, @@ -530,11 +550,17 @@ async function main() { const apiPort = parseInt(process.env.PORT || '8080', 10); const apiHost = process.env.API_HOST || (isContainerDeploy ? '0.0.0.0' : undefined); // Container platforms need 0.0.0.0 for health checks const apiCorsOrigin = process.env.API_CORS_ORIGIN; // undefined = same-origin only + const turnLogFiles: Record = {}; + for (const a of agents) { + const logging = a.features?.logging ?? yamlConfig.features?.logging; + if (logging?.turnLogFile) turnLogFiles[a.name] = logging.turnLogFile; + } const apiServer = createApiServer(gateway, { port: apiPort, apiKey: apiKey, host: apiHost, corsOrigin: apiCorsOrigin, + turnLogFiles: Object.keys(turnLogFiles).length > 0 ? turnLogFiles : undefined, stores: agentStores, agentChannels: agentChannelMap, sessionInvalidators,