feat: add POST /api/v1/chat endpoint for agent messaging (#242)
* feat: add POST /api/v1/chat endpoint for sending messages to agents Adds an HTTP endpoint that accepts a JSON message, sends it to the lettabot agent via sendToAgent(), and returns the agent's response. This enables external systems (e.g. server-side tools in other agents) to communicate with lettabot programmatically. - Add ChatRequest/ChatResponse types - Add AgentRouter interface extending MessageDeliverer with sendToAgent() - Implement AgentRouter on LettaGateway with agent-name routing - Add POST /api/v1/chat route with auth, validation, and JSON body parsing Written by Cameron ◯ Letta Code "The most profound technologies are those that disappear." -- Mark Weiser * feat: add SSE streaming support to /api/v1/chat endpoint When the client sends Accept: text/event-stream, the chat endpoint streams SDK messages as SSE events instead of waiting for the full response. Each event is a JSON StreamMsg (assistant, tool_call, tool_result, reasoning, result). The result event signals end-of-stream. - Export StreamMsg type from bot.ts - Add streamToAgent() to AgentSession interface and LettaBot - Wire streamToAgent() through LettaGateway with agent-name routing - Add SSE path in chat route (Accept header content negotiation) - Handle client disconnect mid-stream gracefully Written by Cameron ◯ Letta Code "Any sufficiently advanced technology is indistinguishable from magic." -- Arthur C. Clarke * test+docs: add chat endpoint tests and API documentation - 10 tests for POST /api/v1/chat: auth, validation, sync response, agent routing, SSE streaming, stream error handling - 6 tests for gateway sendToAgent/streamToAgent routing - Fix timingSafeEqual crash on mismatched key lengths (return 401, not 500) - Document chat endpoint in configuration.md with sync and SSE examples - Add Chat API link to docs/README.md index Written by Cameron ◯ Letta Code "First, solve the problem. Then, write the code." -- John Johnson
This commit is contained in:
@@ -9,6 +9,7 @@ LettaBot is a multi-channel AI assistant powered by [Letta](https://letta.com) t
|
||||
- [Configuration Reference](./configuration.md) - All config options
|
||||
- [Commands Reference](./commands.md) - Bot commands reference
|
||||
- [CLI Tools](./cli-tools.md) - Agent/operator CLI tools
|
||||
- [Chat API](./configuration.md#chat-endpoint) - HTTP endpoint for programmatic agent access
|
||||
- [Scheduling Tasks](./cron-setup.md) - Cron jobs and heartbeats
|
||||
- [Gmail Pub/Sub](./gmail-pubsub.md) - Email notifications integration
|
||||
- [Railway Deployment](./railway-deploy.md) - Deploy to Railway
|
||||
|
||||
@@ -444,7 +444,7 @@ Attachments are stored in `/tmp/lettabot/attachments/`.
|
||||
|
||||
## API Server Configuration
|
||||
|
||||
The built-in API server provides health checks and CLI messaging endpoints.
|
||||
The built-in API server provides health checks, CLI messaging, and a chat endpoint for programmatic agent access.
|
||||
|
||||
```yaml
|
||||
api:
|
||||
@@ -459,6 +459,74 @@ api:
|
||||
| `api.host` | string | `127.0.0.1` | Bind address. Use `0.0.0.0` for Docker/Railway |
|
||||
| `api.corsOrigin` | string | _(none)_ | CORS origin header for cross-origin access |
|
||||
|
||||
### Chat Endpoint
|
||||
|
||||
Send messages to a lettabot agent and get responses via HTTP. Useful for integrating
|
||||
with other services, server-side tools, webhooks, or custom frontends.
|
||||
|
||||
**Synchronous** (default):
|
||||
|
||||
```bash
|
||||
curl -X POST http://localhost:8080/api/v1/chat \
|
||||
-H "Content-Type: application/json" \
|
||||
-H "X-Api-Key: YOUR_API_KEY" \
|
||||
-d '{"message": "What is on my todo list?"}'
|
||||
```
|
||||
|
||||
Response:
|
||||
|
||||
```json
|
||||
{
|
||||
"success": true,
|
||||
"response": "Here are your current tasks...",
|
||||
"agentName": "LettaBot"
|
||||
}
|
||||
```
|
||||
|
||||
**Streaming** (SSE):
|
||||
|
||||
```bash
|
||||
curl -N -X POST http://localhost:8080/api/v1/chat \
|
||||
-H "Content-Type: application/json" \
|
||||
-H "Accept: text/event-stream" \
|
||||
-H "X-Api-Key: YOUR_API_KEY" \
|
||||
-d '{"message": "What is on my todo list?"}'
|
||||
```
|
||||
|
||||
Each SSE event is a JSON object with a `type` field:
|
||||
|
||||
| Event type | Description |
|
||||
|------------|-------------|
|
||||
| `reasoning` | Model thinking/reasoning tokens |
|
||||
| `assistant` | Response text (may arrive in multiple chunks) |
|
||||
| `tool_call` | Agent is calling a tool (`toolName`, `toolCallId`) |
|
||||
| `tool_result` | Tool execution result (`content`, `isError`) |
|
||||
| `result` | End of stream (`success`, optional `error`) |
|
||||
|
||||
Example stream:
|
||||
|
||||
```
|
||||
data: {"type":"reasoning","content":"Let me check..."}
|
||||
|
||||
data: {"type":"assistant","content":"Here are your "}
|
||||
|
||||
data: {"type":"assistant","content":"current tasks."}
|
||||
|
||||
data: {"type":"result","success":true}
|
||||
|
||||
```
|
||||
|
||||
**Request fields:**
|
||||
|
||||
| Field | Type | Required | Description |
|
||||
|-------|------|----------|-------------|
|
||||
| `message` | string | Yes | The message to send to the agent |
|
||||
| `agent` | string | No | Agent name (defaults to first configured agent) |
|
||||
|
||||
**Authentication:** All requests require the `X-Api-Key` header. The API key is auto-generated on first run and saved to `lettabot-api.json`, or set via `LETTABOT_API_KEY` env var.
|
||||
|
||||
**Multi-agent:** In multi-agent configs, use the `agent` field to target a specific agent by name. Omit it to use the first agent. A 404 is returned if the agent name doesn't match any configured agent.
|
||||
|
||||
## Environment Variables
|
||||
|
||||
Environment variables override config file values:
|
||||
|
||||
@@ -75,8 +75,8 @@ export function validateApiKey(headers: IncomingHttpHeaders, expectedKey: string
|
||||
}
|
||||
|
||||
// Use constant-time comparison to prevent timing attacks
|
||||
return crypto.timingSafeEqual(
|
||||
Buffer.from(providedKey),
|
||||
Buffer.from(expectedKey)
|
||||
);
|
||||
const a = Buffer.from(providedKey);
|
||||
const b = Buffer.from(expectedKey);
|
||||
if (a.length !== b.length) return false;
|
||||
return crypto.timingSafeEqual(a, b);
|
||||
}
|
||||
|
||||
212
src/api/server.test.ts
Normal file
212
src/api/server.test.ts
Normal file
@@ -0,0 +1,212 @@
|
||||
import { describe, it, expect, vi, beforeAll, afterAll } from 'vitest';
|
||||
import * as http from 'http';
|
||||
import { createApiServer } from './server.js';
|
||||
import type { AgentRouter } from '../core/interfaces.js';
|
||||
|
||||
const TEST_API_KEY = 'test-key-12345';
|
||||
const TEST_PORT = 0; // Let OS assign a free port
|
||||
|
||||
function createMockRouter(overrides: Partial<AgentRouter> = {}): AgentRouter {
|
||||
return {
|
||||
deliverToChannel: vi.fn().mockResolvedValue('msg-1'),
|
||||
sendToAgent: vi.fn().mockResolvedValue('Agent says hello'),
|
||||
streamToAgent: vi.fn().mockReturnValue((async function* () {
|
||||
yield { type: 'reasoning', content: 'thinking...' };
|
||||
yield { type: 'assistant', content: 'Hello ' };
|
||||
yield { type: 'assistant', content: 'world' };
|
||||
yield { type: 'result', success: true };
|
||||
})()),
|
||||
getAgentNames: vi.fn().mockReturnValue(['LettaBot']),
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
function getPort(server: http.Server): number {
|
||||
const addr = server.address();
|
||||
if (typeof addr === 'object' && addr) return addr.port;
|
||||
throw new Error('Server not listening');
|
||||
}
|
||||
|
||||
async function request(
|
||||
port: number,
|
||||
method: string,
|
||||
path: string,
|
||||
body?: string,
|
||||
headers: Record<string, string> = {},
|
||||
): Promise<{ status: number; headers: http.IncomingHttpHeaders; body: string }> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const req = http.request({ hostname: '127.0.0.1', port, method, path, headers }, (res) => {
|
||||
let data = '';
|
||||
res.on('data', (chunk) => { data += chunk; });
|
||||
res.on('end', () => resolve({ status: res.statusCode!, headers: res.headers, body: data }));
|
||||
});
|
||||
req.on('error', reject);
|
||||
if (body) req.write(body);
|
||||
req.end();
|
||||
});
|
||||
}
|
||||
|
||||
describe('POST /api/v1/chat', () => {
|
||||
let server: http.Server;
|
||||
let port: number;
|
||||
let router: AgentRouter;
|
||||
|
||||
beforeAll(async () => {
|
||||
router = createMockRouter();
|
||||
server = createApiServer(router, {
|
||||
port: TEST_PORT,
|
||||
apiKey: TEST_API_KEY,
|
||||
host: '127.0.0.1',
|
||||
});
|
||||
// Wait for server to start listening
|
||||
await new Promise<void>((resolve) => {
|
||||
if (server.listening) { resolve(); return; }
|
||||
server.once('listening', resolve);
|
||||
});
|
||||
port = getPort(server);
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await new Promise<void>((resolve) => server.close(() => resolve()));
|
||||
});
|
||||
|
||||
it('returns 401 without api key', async () => {
|
||||
const res = await request(port, 'POST', '/api/v1/chat', '{"message":"hi"}', {
|
||||
'content-type': 'application/json',
|
||||
});
|
||||
expect(res.status).toBe(401);
|
||||
});
|
||||
|
||||
it('returns 401 with wrong api key', async () => {
|
||||
const res = await request(port, 'POST', '/api/v1/chat', '{"message":"hi"}', {
|
||||
'content-type': 'application/json',
|
||||
'x-api-key': 'wrong-key',
|
||||
});
|
||||
expect(res.status).toBe(401);
|
||||
});
|
||||
|
||||
it('returns 400 without Content-Type application/json', async () => {
|
||||
const res = await request(port, 'POST', '/api/v1/chat', 'hello', {
|
||||
'content-type': 'text/plain',
|
||||
'x-api-key': TEST_API_KEY,
|
||||
});
|
||||
expect(res.status).toBe(400);
|
||||
expect(JSON.parse(res.body).error).toContain('application/json');
|
||||
});
|
||||
|
||||
it('returns 400 with invalid JSON', async () => {
|
||||
const res = await request(port, 'POST', '/api/v1/chat', 'not json', {
|
||||
'content-type': 'application/json',
|
||||
'x-api-key': TEST_API_KEY,
|
||||
});
|
||||
expect(res.status).toBe(400);
|
||||
expect(JSON.parse(res.body).error).toContain('Invalid JSON');
|
||||
});
|
||||
|
||||
it('returns 400 without message field', async () => {
|
||||
const res = await request(port, 'POST', '/api/v1/chat', '{"agent":"LettaBot"}', {
|
||||
'content-type': 'application/json',
|
||||
'x-api-key': TEST_API_KEY,
|
||||
});
|
||||
expect(res.status).toBe(400);
|
||||
expect(JSON.parse(res.body).error).toContain('message');
|
||||
});
|
||||
|
||||
it('returns 404 for unknown agent name', async () => {
|
||||
const res = await request(port, 'POST', '/api/v1/chat', '{"message":"hi","agent":"unknown"}', {
|
||||
'content-type': 'application/json',
|
||||
'x-api-key': TEST_API_KEY,
|
||||
});
|
||||
expect(res.status).toBe(404);
|
||||
expect(JSON.parse(res.body).error).toContain('Agent not found');
|
||||
expect(JSON.parse(res.body).error).toContain('LettaBot');
|
||||
});
|
||||
|
||||
it('returns sync JSON response by default', async () => {
|
||||
const res = await request(port, 'POST', '/api/v1/chat', '{"message":"Hello"}', {
|
||||
'content-type': 'application/json',
|
||||
'x-api-key': TEST_API_KEY,
|
||||
});
|
||||
expect(res.status).toBe(200);
|
||||
const parsed = JSON.parse(res.body);
|
||||
expect(parsed.success).toBe(true);
|
||||
expect(parsed.response).toBe('Agent says hello');
|
||||
expect(parsed.agentName).toBe('LettaBot');
|
||||
expect(router.sendToAgent).toHaveBeenCalledWith(
|
||||
undefined,
|
||||
'Hello',
|
||||
{ type: 'webhook', outputMode: 'silent' },
|
||||
);
|
||||
});
|
||||
|
||||
it('routes to named agent', async () => {
|
||||
const res = await request(port, 'POST', '/api/v1/chat', '{"message":"Hi","agent":"LettaBot"}', {
|
||||
'content-type': 'application/json',
|
||||
'x-api-key': TEST_API_KEY,
|
||||
});
|
||||
expect(res.status).toBe(200);
|
||||
expect(router.sendToAgent).toHaveBeenCalledWith(
|
||||
'LettaBot',
|
||||
'Hi',
|
||||
{ type: 'webhook', outputMode: 'silent' },
|
||||
);
|
||||
});
|
||||
|
||||
it('returns SSE stream when Accept: text/event-stream', async () => {
|
||||
// Need a fresh mock since the generator is consumed once
|
||||
(router as any).streamToAgent = vi.fn().mockReturnValue((async function* () {
|
||||
yield { type: 'reasoning', content: 'thinking...' };
|
||||
yield { type: 'assistant', content: 'Hello ' };
|
||||
yield { type: 'assistant', content: 'world' };
|
||||
yield { type: 'result', success: true };
|
||||
})());
|
||||
|
||||
const res = await request(port, 'POST', '/api/v1/chat', '{"message":"Stream test"}', {
|
||||
'content-type': 'application/json',
|
||||
'x-api-key': TEST_API_KEY,
|
||||
'accept': 'text/event-stream',
|
||||
});
|
||||
expect(res.status).toBe(200);
|
||||
expect(res.headers['content-type']).toBe('text/event-stream');
|
||||
expect(res.headers['cache-control']).toBe('no-cache');
|
||||
|
||||
// Parse SSE events
|
||||
const events = res.body
|
||||
.split('\n\n')
|
||||
.filter((line) => line.startsWith('data: '))
|
||||
.map((line) => JSON.parse(line.replace('data: ', '')));
|
||||
|
||||
expect(events).toHaveLength(4);
|
||||
expect(events[0].type).toBe('reasoning');
|
||||
expect(events[1].type).toBe('assistant');
|
||||
expect(events[1].content).toBe('Hello ');
|
||||
expect(events[2].type).toBe('assistant');
|
||||
expect(events[2].content).toBe('world');
|
||||
expect(events[3].type).toBe('result');
|
||||
expect(events[3].success).toBe(true);
|
||||
});
|
||||
|
||||
it('handles stream errors gracefully', async () => {
|
||||
(router as any).streamToAgent = vi.fn().mockReturnValue((async function* () {
|
||||
yield { type: 'assistant', content: 'partial' };
|
||||
throw new Error('connection lost');
|
||||
})());
|
||||
|
||||
const res = await request(port, 'POST', '/api/v1/chat', '{"message":"Error test"}', {
|
||||
'content-type': 'application/json',
|
||||
'x-api-key': TEST_API_KEY,
|
||||
'accept': 'text/event-stream',
|
||||
});
|
||||
expect(res.status).toBe(200);
|
||||
|
||||
const events = res.body
|
||||
.split('\n\n')
|
||||
.filter((line) => line.startsWith('data: '))
|
||||
.map((line) => JSON.parse(line.replace('data: ', '')));
|
||||
|
||||
// Should have the partial chunk + error event
|
||||
expect(events.find((e: any) => e.type === 'assistant')).toBeTruthy();
|
||||
expect(events.find((e: any) => e.type === 'error')).toBeTruthy();
|
||||
expect(events.find((e: any) => e.type === 'error').error).toBe('connection lost');
|
||||
});
|
||||
});
|
||||
@@ -6,9 +6,9 @@
|
||||
import * as http from 'http';
|
||||
import * as fs from 'fs';
|
||||
import { validateApiKey } from './auth.js';
|
||||
import type { SendMessageRequest, SendMessageResponse, SendFileResponse } from './types.js';
|
||||
import type { SendMessageRequest, SendMessageResponse, SendFileResponse, ChatRequest, ChatResponse } from './types.js';
|
||||
import { parseMultipart } from './multipart.js';
|
||||
import type { MessageDeliverer } from '../core/interfaces.js';
|
||||
import type { AgentRouter } from '../core/interfaces.js';
|
||||
import type { ChannelId } from '../core/types.js';
|
||||
|
||||
const VALID_CHANNELS: ChannelId[] = ['telegram', 'slack', 'discord', 'whatsapp', 'signal'];
|
||||
@@ -26,7 +26,7 @@ interface ServerOptions {
|
||||
/**
|
||||
* Create and start the HTTP API server
|
||||
*/
|
||||
export function createApiServer(deliverer: MessageDeliverer, options: ServerOptions): http.Server {
|
||||
export function createApiServer(deliverer: AgentRouter, options: ServerOptions): http.Server {
|
||||
const server = http.createServer(async (req, res) => {
|
||||
// Set CORS headers (configurable origin, defaults to same-origin for security)
|
||||
const corsOrigin = options.corsOrigin || req.headers.origin || 'null';
|
||||
@@ -121,6 +121,101 @@ export function createApiServer(deliverer: MessageDeliverer, options: ServerOpti
|
||||
return;
|
||||
}
|
||||
|
||||
// Route: POST /api/v1/chat (send a message to the agent, get response)
|
||||
if (req.url === '/api/v1/chat' && req.method === 'POST') {
|
||||
try {
|
||||
if (!validateApiKey(req.headers, options.apiKey)) {
|
||||
sendError(res, 401, 'Unauthorized');
|
||||
return;
|
||||
}
|
||||
|
||||
const contentType = req.headers['content-type'] || '';
|
||||
if (!contentType.includes('application/json')) {
|
||||
sendError(res, 400, 'Content-Type must be application/json');
|
||||
return;
|
||||
}
|
||||
|
||||
const body = await readBody(req, MAX_BODY_SIZE);
|
||||
let chatReq: ChatRequest;
|
||||
try {
|
||||
chatReq = JSON.parse(body);
|
||||
} catch {
|
||||
sendError(res, 400, 'Invalid JSON body');
|
||||
return;
|
||||
}
|
||||
|
||||
if (!chatReq.message || typeof chatReq.message !== 'string') {
|
||||
sendError(res, 400, 'Missing required field: message');
|
||||
return;
|
||||
}
|
||||
|
||||
if (chatReq.message.length > MAX_TEXT_LENGTH) {
|
||||
sendError(res, 400, `Message too long (max ${MAX_TEXT_LENGTH} chars)`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Resolve agent name (defaults to first agent)
|
||||
const agentName = chatReq.agent;
|
||||
const agentNames = deliverer.getAgentNames();
|
||||
const resolvedName = agentName || agentNames[0];
|
||||
|
||||
if (agentName && !agentNames.includes(agentName)) {
|
||||
sendError(res, 404, `Agent not found: ${agentName}. Available: ${agentNames.join(', ')}`);
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(`[API] Chat request for agent "${resolvedName}": ${chatReq.message.slice(0, 100)}...`);
|
||||
|
||||
const context = { type: 'webhook' as const, outputMode: 'silent' as const };
|
||||
const wantsStream = (req.headers.accept || '').includes('text/event-stream');
|
||||
|
||||
if (wantsStream) {
|
||||
// SSE streaming: forward SDK stream chunks as events
|
||||
res.writeHead(200, {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache',
|
||||
'Connection': 'keep-alive',
|
||||
});
|
||||
|
||||
let clientDisconnected = false;
|
||||
req.on('close', () => { clientDisconnected = true; });
|
||||
|
||||
try {
|
||||
for await (const msg of deliverer.streamToAgent(agentName, chatReq.message, context)) {
|
||||
if (clientDisconnected) break;
|
||||
res.write(`data: ${JSON.stringify(msg)}\n\n`);
|
||||
if (msg.type === 'result') break;
|
||||
}
|
||||
} catch (streamError: any) {
|
||||
if (!clientDisconnected) {
|
||||
res.write(`data: ${JSON.stringify({ type: 'error', error: streamError.message })}\n\n`);
|
||||
}
|
||||
}
|
||||
res.end();
|
||||
} else {
|
||||
// Sync: wait for full response
|
||||
const response = await deliverer.sendToAgent(agentName, chatReq.message, context);
|
||||
|
||||
const chatRes: ChatResponse = {
|
||||
success: true,
|
||||
response,
|
||||
agentName: resolvedName,
|
||||
};
|
||||
res.writeHead(200, { 'Content-Type': 'application/json' });
|
||||
res.end(JSON.stringify(chatRes));
|
||||
}
|
||||
} catch (error: any) {
|
||||
console.error('[API] Chat error:', error);
|
||||
const chatRes: ChatResponse = {
|
||||
success: false,
|
||||
error: error.message || 'Internal server error',
|
||||
};
|
||||
res.writeHead(500, { 'Content-Type': 'application/json' });
|
||||
res.end(JSON.stringify(chatRes));
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Route: 404 Not Found
|
||||
sendError(res, 404, 'Not found');
|
||||
});
|
||||
|
||||
@@ -31,3 +31,18 @@ export interface SendFileResponse {
|
||||
error?: string;
|
||||
field?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* POST /api/v1/chat - Send a message to the agent
|
||||
*/
|
||||
export interface ChatRequest {
|
||||
message: string;
|
||||
agent?: string; // Agent name, defaults to first configured agent
|
||||
}
|
||||
|
||||
export interface ChatResponse {
|
||||
success: boolean;
|
||||
response?: string;
|
||||
agentName?: string;
|
||||
error?: string;
|
||||
}
|
||||
|
||||
@@ -98,7 +98,7 @@ async function buildMultimodalMessage(
|
||||
// ---------------------------------------------------------------------------
|
||||
// Stream message type with toolCallId/uuid for dedup
|
||||
// ---------------------------------------------------------------------------
|
||||
interface StreamMsg {
|
||||
export interface StreamMsg {
|
||||
type: string;
|
||||
content?: string;
|
||||
toolCallId?: string;
|
||||
@@ -846,6 +846,35 @@ export class LettaBot implements AgentSession {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stream a message to the agent, yielding chunks as they arrive.
|
||||
* Same lifecycle as sendToAgent() but yields StreamMsg instead of accumulating.
|
||||
*/
|
||||
async *streamToAgent(
|
||||
text: string,
|
||||
_context?: TriggerContext
|
||||
): AsyncGenerator<StreamMsg> {
|
||||
// Serialize with message queue to prevent 409 conflicts
|
||||
while (this.processing) {
|
||||
await new Promise(resolve => setTimeout(resolve, 1000));
|
||||
}
|
||||
|
||||
this.processing = true;
|
||||
|
||||
try {
|
||||
const { session, stream } = await this.runSession(text);
|
||||
|
||||
try {
|
||||
yield* stream();
|
||||
} finally {
|
||||
session.close();
|
||||
}
|
||||
} finally {
|
||||
this.processing = false;
|
||||
this.processQueue();
|
||||
}
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Channel delivery + status
|
||||
// =========================================================================
|
||||
|
||||
@@ -10,6 +10,7 @@ function createMockSession(channels: string[] = ['telegram']): AgentSession {
|
||||
start: vi.fn().mockResolvedValue(undefined),
|
||||
stop: vi.fn().mockResolvedValue(undefined),
|
||||
sendToAgent: vi.fn().mockResolvedValue('response'),
|
||||
streamToAgent: vi.fn().mockReturnValue((async function* () { yield { type: 'result', success: true }; })()),
|
||||
deliverToChannel: vi.fn().mockResolvedValue('msg-123'),
|
||||
getStatus: vi.fn().mockReturnValue({ agentId: 'agent-123', channels }),
|
||||
setAgentId: vi.fn(),
|
||||
@@ -89,4 +90,65 @@ describe('LettaGateway', () => {
|
||||
await gateway.start();
|
||||
expect(good.start).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
describe('sendToAgent', () => {
|
||||
it('routes by agent name', async () => {
|
||||
const s1 = createMockSession();
|
||||
const s2 = createMockSession();
|
||||
gateway.addAgent('alpha', s1);
|
||||
gateway.addAgent('beta', s2);
|
||||
|
||||
await gateway.sendToAgent('beta', 'hello', { type: 'webhook', outputMode: 'silent' });
|
||||
expect(s2.sendToAgent).toHaveBeenCalledWith('hello', { type: 'webhook', outputMode: 'silent' });
|
||||
expect(s1.sendToAgent).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('defaults to first agent when name is undefined', async () => {
|
||||
const s1 = createMockSession();
|
||||
gateway.addAgent('only', s1);
|
||||
|
||||
await gateway.sendToAgent(undefined, 'hi');
|
||||
expect(s1.sendToAgent).toHaveBeenCalledWith('hi', undefined);
|
||||
});
|
||||
|
||||
it('throws when agent name not found', async () => {
|
||||
gateway.addAgent('a', createMockSession());
|
||||
await expect(gateway.sendToAgent('nope', 'hi')).rejects.toThrow('Agent not found: nope');
|
||||
});
|
||||
|
||||
it('throws when no agents configured', async () => {
|
||||
await expect(gateway.sendToAgent(undefined, 'hi')).rejects.toThrow('No agents configured');
|
||||
});
|
||||
});
|
||||
|
||||
describe('streamToAgent', () => {
|
||||
it('routes by agent name and yields stream chunks', async () => {
|
||||
const chunks = [
|
||||
{ type: 'assistant', content: 'hello' },
|
||||
{ type: 'result', success: true },
|
||||
];
|
||||
const s1 = createMockSession();
|
||||
(s1.streamToAgent as any) = async function* () { for (const c of chunks) yield c; };
|
||||
gateway.addAgent('bot', s1);
|
||||
|
||||
const collected = [];
|
||||
for await (const msg of gateway.streamToAgent('bot', 'test')) {
|
||||
collected.push(msg);
|
||||
}
|
||||
expect(collected).toEqual(chunks);
|
||||
});
|
||||
|
||||
it('defaults to first agent when name is undefined', async () => {
|
||||
const s1 = createMockSession();
|
||||
(s1.streamToAgent as any) = async function* () { yield { type: 'result', success: true }; };
|
||||
gateway.addAgent('default', s1);
|
||||
|
||||
const collected = [];
|
||||
for await (const msg of gateway.streamToAgent(undefined, 'test')) {
|
||||
collected.push(msg);
|
||||
}
|
||||
expect(collected).toHaveLength(1);
|
||||
expect(collected[0].type).toBe('result');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -7,9 +7,11 @@
|
||||
* See: docs/multi-agent-architecture.md
|
||||
*/
|
||||
|
||||
import type { AgentSession, MessageDeliverer } from './interfaces.js';
|
||||
import type { AgentSession, AgentRouter } from './interfaces.js';
|
||||
import type { TriggerContext } from './types.js';
|
||||
import type { StreamMsg } from './bot.js';
|
||||
|
||||
export class LettaGateway implements MessageDeliverer {
|
||||
export class LettaGateway implements AgentRouter {
|
||||
private agents: Map<string, AgentSession> = new Map();
|
||||
|
||||
/**
|
||||
@@ -71,6 +73,37 @@ export class LettaGateway implements MessageDeliverer {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a message to a named agent and return the response.
|
||||
* If no name is given, routes to the first registered agent.
|
||||
*/
|
||||
async sendToAgent(agentName: string | undefined, text: string, context?: TriggerContext): Promise<string> {
|
||||
const agent = this.resolveAgent(agentName);
|
||||
return agent.sendToAgent(text, context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stream a message to a named agent, yielding chunks as they arrive.
|
||||
*/
|
||||
async *streamToAgent(agentName: string | undefined, text: string, context?: TriggerContext): AsyncGenerator<StreamMsg> {
|
||||
const agent = this.resolveAgent(agentName);
|
||||
yield* agent.streamToAgent(text, context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve an agent by name, defaulting to the first registered agent.
|
||||
*/
|
||||
private resolveAgent(name?: string): AgentSession {
|
||||
if (!name) {
|
||||
const first = this.agents.values().next().value;
|
||||
if (!first) throw new Error('No agents configured');
|
||||
return first;
|
||||
}
|
||||
const agent = this.agents.get(name);
|
||||
if (!agent) throw new Error(`Agent not found: ${name}`);
|
||||
return agent;
|
||||
}
|
||||
|
||||
/**
|
||||
* Deliver a message to a channel.
|
||||
* Finds the agent that owns the channel and delegates.
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
import type { ChannelAdapter } from '../channels/types.js';
|
||||
import type { InboundMessage, TriggerContext } from './types.js';
|
||||
import type { GroupBatcher } from './group-batcher.js';
|
||||
import type { StreamMsg } from './bot.js';
|
||||
|
||||
export interface AgentSession {
|
||||
/** Register a channel adapter */
|
||||
@@ -29,6 +30,9 @@ export interface AgentSession {
|
||||
/** Send a message to the agent (used by cron, heartbeat, polling) */
|
||||
sendToAgent(text: string, context?: TriggerContext): Promise<string>;
|
||||
|
||||
/** Stream a message to the agent, yielding chunks as they arrive */
|
||||
streamToAgent(text: string, context?: TriggerContext): AsyncGenerator<StreamMsg>;
|
||||
|
||||
/** Deliver a message/file to a specific channel */
|
||||
deliverToChannel(channelId: string, chatId: string, options: {
|
||||
text?: string;
|
||||
@@ -66,3 +70,17 @@ export interface MessageDeliverer {
|
||||
kind?: 'image' | 'file';
|
||||
}): Promise<string | undefined>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extended interface for the API server.
|
||||
* Supports both outbound delivery (to channels) and inbound chat (to agents).
|
||||
* Satisfied by LettaGateway.
|
||||
*/
|
||||
export interface AgentRouter extends MessageDeliverer {
|
||||
/** Send a message to a named agent and return the response text */
|
||||
sendToAgent(agentName: string | undefined, text: string, context?: TriggerContext): Promise<string>;
|
||||
/** Stream a message to a named agent, yielding chunks as they arrive */
|
||||
streamToAgent(agentName: string | undefined, text: string, context?: TriggerContext): AsyncGenerator<StreamMsg>;
|
||||
/** Get all registered agent names */
|
||||
getAgentNames(): string[];
|
||||
}
|
||||
|
||||
@@ -47,6 +47,7 @@ function createMockBot(): AgentSession {
|
||||
start: vi.fn(),
|
||||
stop: vi.fn(),
|
||||
sendToAgent: vi.fn().mockResolvedValue('ok'),
|
||||
streamToAgent: vi.fn().mockReturnValue((async function* () { yield { type: 'result', success: true }; })()),
|
||||
deliverToChannel: vi.fn(),
|
||||
getStatus: vi.fn().mockReturnValue({ agentId: 'test', channels: [] }),
|
||||
setAgentId: vi.fn(),
|
||||
|
||||
Reference in New Issue
Block a user