# Redis Task Queue Architecture for Letta Community ADE

## Executive Summary

This document outlines the architecture for replacing the in-memory `QueueRuntime` with a Redis-backed persistent task queue. The design prioritizes durability, horizontal scalability, and reliable task execution while maintaining compatibility with the existing Task tool and subagent spawning workflows.

**Key Decisions:**

- Use **Redis Streams** (not Sorted Sets) for the primary task queue to leverage consumer groups and at-least-once delivery guarantees
- Hybrid approach: Streams for queue semantics, Sorted Sets for scheduling/delays, Hashes for task state
- Stateless workers with heartbeat-based liveness detection
- Exponential backoff with jitter for retry logic

---

## 1. Redis Data Structures

### 1.1 Primary Queue: Redis Stream

```
Key:     ade:queue:tasks
Type:    Stream
Purpose: Main task ingestion and distribution
```

**Why Streams over Sorted Sets?**

| Feature | Sorted Sets | Redis Streams |
|---------|-------------|---------------|
| Ordering | Score-based (can have ties) | Strict temporal (millisecond ID) |
| Consumer Groups | Manual implementation | Built-in XREADGROUP |
| Delivery Semantics | At-most-once (easy) / At-least-once (complex) | At-least-once with ACK |
| Pending Tracking | Manual | Built-in XPENDING |
| Claim/Retry | Custom Lua scripts | Built-in XCLAIM/XAUTOCLAIM |
| Message Visibility | Immediate to all | Consumer-group isolated |

Streams provide the exact semantics needed for reliable task processing without custom Lua scripting.
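The "strict temporal" ordering in the table comes from the stream entry-ID format `<millisecondTime>-<sequence>`: entries compare first by timestamp, then by a per-millisecond sequence number, so two `XADD`s in the same millisecond still have a well-defined order. A minimal comparator (a sketch, independent of any Redis client) makes the guarantee concrete:

```typescript
// Redis stream entry IDs have the form "<ms>-<seq>", e.g. "1700000000000-0".
// Entries are totally ordered: first by the millisecond timestamp, then by
// the per-millisecond sequence number.
export function compareStreamIds(a: string, b: string): number {
  const [aMs, aSeq] = a.split("-").map(Number);
  const [bMs, bSeq] = b.split("-").map(Number);
  if (aMs !== bMs) return aMs < bMs ? -1 : 1;
  if (aSeq !== bSeq) return aSeq < bSeq ? -1 : 1;
  return 0;
}
```

This is why the stream needs no tie-breaking logic of its own, unlike a score-based sorted set.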
**Stream Entries:**

```
XADD ade:queue:tasks * taskId {taskId} payload {json} priority {priority}
```

(`XADD` takes field/value pairs, so each field name is followed by its value.)

### 1.2 Delayed Tasks: Sorted Set

```
Key:    ade:queue:delayed
Type:   Sorted Set (ZSET)
Score:  scheduled execution timestamp (ms)
Member: taskId
```

Used for:

- Tasks with explicit `runAfter` timestamps
- Retry scheduling with exponential backoff
- Rate-limited task release

### 1.3 Task State Storage: Redis Hash

```
Key:  ade:task:{taskId}
Type: Hash
Fields:
  - id: string (UUID v4)
  - status: pending|running|completed|failed|cancelled
  - payload: JSON (task arguments)
  - createdAt: timestamp (ms)
  - startedAt: timestamp (ms)
  - completedAt: timestamp (ms)
  - workerId: string (nullable)
  - attemptCount: integer
  - maxAttempts: integer (default: 3)
  - error: string (last error message)
  - result: JSON (completed task result)
  - parentTaskId: string (nullable, for task chains)
  - subagentId: string (link to subagent state)
  - priority: integer (0-9, default 5)
  - kind: message|task_notification|approval_result|overlay_action
TTL: 7 days (configurable cleanup for completed/failed tasks)
```

### 1.4 Worker Registry: Redis Hash + Sorted Set

```
Key:  ade:workers:active
Type: Hash
Fields per worker:
  - {workerId}: JSON { hostname, pid, startedAt, lastHeartbeat, version }

Key:    ade:workers:heartbeat
Type:   Sorted Set
Score:  last heartbeat timestamp
Member: workerId
```

### 1.5 Consumer Group State

```
Stream Consumer Group: ade:queue:tasks
Group Name:            ade-workers
Consumer Name:         {workerId} (unique per process)
```

Redis Streams automatically track:

- Pending messages per consumer (XPENDING)
- Delivery count per message
- Idle time since last read

---
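Hash fields are flat strings, so the queue needs a small (de)serialization layer between task objects and the `ade:task:{taskId}` hash. A sketch, assuming node-redis-style `Record<string, string>` hashes and using a minimal subset of the §1.3 fields:

```typescript
// Minimal subset of the task state, for illustration only; the full schema
// is defined in section 2.1.
interface TaskRecord {
  id: string;
  status: string;
  payload: { kind: string; text?: string };
  attemptCount: number;
  maxAttempts: number;
  createdAt: number;
}

// Flatten to the string-only field/value pairs a Redis hash can store.
function toHashFields(t: TaskRecord): Record<string, string> {
  return {
    id: t.id,
    status: t.status,
    payload: JSON.stringify(t.payload),
    attemptCount: String(t.attemptCount),
    maxAttempts: String(t.maxAttempts),
    createdAt: String(t.createdAt),
  };
}

// Rehydrate from HGETALL output (every value comes back as a string).
function fromHashFields(h: Record<string, string>): TaskRecord {
  return {
    id: h.id,
    status: h.status,
    payload: JSON.parse(h.payload),
    attemptCount: parseInt(h.attemptCount, 10),
    maxAttempts: parseInt(h.maxAttempts, 10),
    createdAt: parseInt(h.createdAt, 10),
  };
}
```

Keeping this mapping in one place avoids the scattered `parseInt`/`JSON.parse` calls that otherwise accumulate around every `HGETALL`.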
## 2. Task Entity Schema

### 2.1 TypeScript Interface

```typescript
// src/queue/redis/types.ts

export type TaskStatus =
  | "pending"    // Enqueued, not yet claimed
  | "running"    // Claimed by worker, processing
  | "completed"  // Successfully finished
  | "failed"     // Exhausted all retries
  | "cancelled"; // Explicitly cancelled

export type TaskKind =
  | "message"
  | "task_notification"
  | "approval_result"
  | "overlay_action";

export interface TaskPayload {
  // Task identification
  id: string; // UUID v4
  kind: TaskKind;

  // Execution context
  agentId?: string;
  conversationId?: string;
  clientMessageId?: string;

  // Content (varies by kind)
  content?: unknown; // For "message" kind
  text?: string;     // For notification/approval/overlay

  // Subagent execution params (for task_notification)
  subagentType?: string;
  prompt?: string;
  model?: string;
  existingAgentId?: string;
  existingConversationId?: string;
  maxTurns?: number;

  // Scheduling
  priority: number;  // 0-9, lower = higher priority
  runAfter?: number; // Timestamp ms (for delayed tasks)

  // Retry configuration
  maxAttempts: number;
  backoffMultiplier: number; // Default: 2
  maxBackoffMs: number;      // Default: 300000 (5 min)

  // Metadata
  enqueuedAt: number;
  source: "user" | "system" | "hook";
}

export interface TaskState extends TaskPayload {
  status: TaskStatus;
  workerId?: string;
  attemptCount: number;
  startedAt?: number;
  completedAt?: number;
  error?: string;
  result?: unknown;

  // Coalescing support (from QueueRuntime)
  isCoalescable: boolean;
  scopeKey?: string; // For grouping coalescable items
}
```

### 2.2 State Transitions

```
        ┌─────────────┐
        │   PENDING   │◄─────────────────┐
        │  (queued)   │                  │
        └──────┬──────┘                  │
               │ claim                   │ retry
               ▼                         │ (with delay)
        ┌─────────────┐                  │
        │   RUNNING   │──────────────────┘
        │  (claimed)  │   fail (retryable)
        └──────┬──────┘
               │
     complete  │  fail (final)
      ┌────────┴─────────┐
      ▼                  ▼
┌─────────────┐   ┌─────────────┐
│  COMPLETED  │   │   FAILED    │
│             │   │ (exhausted) │
└─────────────┘   └─────────────┘
```

---
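The transition diagram above can be enforced with a small guard table, so that workers reject any hash update that is not a legal edge. A sketch (the `cancelled` edges from the two non-terminal states are an assumption; the diagram only draws the four main states):

```typescript
type TaskStatus = "pending" | "running" | "completed" | "failed" | "cancelled";

// Legal edges from the state diagram; terminal states allow no transitions.
const TRANSITIONS: Record<TaskStatus, TaskStatus[]> = {
  pending:   ["running", "cancelled"],
  running:   ["completed", "failed", "pending", "cancelled"], // "pending" = retry requeue
  completed: [],
  failed:    [],
  cancelled: [],
};

function canTransition(from: TaskStatus, to: TaskStatus): boolean {
  return TRANSITIONS[from].includes(to);
}
```

Checking transitions at write time turns subtle race bugs (e.g. a dead-worker reclaim racing a late completion) into loud, loggable rejections.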
## 3. Worker Pool Registration and Heartbeat

### 3.1 Worker Lifecycle

```typescript
// src/queue/redis/worker.ts

class TaskWorker {
  private workerId: string;
  private redis: RedisClient;
  private isRunning: boolean = false;
  private heartbeatInterval?: NodeJS.Timeout;
  private claimInterval?: NodeJS.Timeout;

  // Config
  private readonly HEARTBEAT_INTERVAL_MS = 5000;
  private readonly HEARTBEAT_TIMEOUT_MS = 30000;
  private readonly CLAIM_BATCH_SIZE = 10;
  private readonly PROCESSING_TIMEOUT_MS = 300000; // 5 min

  async start(): Promise<void> {
    this.workerId = generateWorkerId(); // {hostname}:{pid}:{uuid}

    // Register in worker registry
    await this.redis.hSet("ade:workers:active", this.workerId, JSON.stringify({
      hostname: os.hostname(),
      pid: process.pid,
      startedAt: Date.now(),
      lastHeartbeat: Date.now(),
      version: process.env.npm_package_version || "unknown"
    }));

    // Create consumer group on the stream (idempotent)
    try {
      await this.redis.xGroupCreate("ade:queue:tasks", "ade-workers", "$", {
        MKSTREAM: true
      });
    } catch (err) {
      // Group already exists - ignore
    }

    this.isRunning = true;
    this.startHeartbeat();
    this.startClaimLoop();
  }

  async stop(): Promise<void> {
    this.isRunning = false;
    clearInterval(this.heartbeatInterval);
    clearInterval(this.claimInterval);

    // Release pending tasks back to queue
    await this.releasePendingTasks();

    // Deregister
    await this.redis.hDel("ade:workers:active", this.workerId);
    await this.redis.zRem("ade:workers:heartbeat", this.workerId);
  }

  private startHeartbeat(): void {
    this.heartbeatInterval = setInterval(async () => {
      const now = Date.now();
      await this.redis.zAdd("ade:workers:heartbeat", {
        score: now,
        value: this.workerId
      });
      // Refresh lastHeartbeat in the registry entry
      const raw = await this.redis.hGet("ade:workers:active", this.workerId);
      const info = raw ? JSON.parse(raw) : {};
      await this.redis.hSet("ade:workers:active", this.workerId,
        JSON.stringify({ ...info, lastHeartbeat: now }));
    }, this.HEARTBEAT_INTERVAL_MS);
  }
}
```

### 3.2 Dead Worker Detection

```typescript
// src/queue/redis/orchestrator.ts (singleton, per-deployment)

class QueueOrchestrator {
  async detectAndReclaimDeadWorkerTasks(): Promise<number> {
    const now = Date.now();
    const cutoff = now - this.HEARTBEAT_TIMEOUT_MS;

    // Find dead workers
    const deadWorkers = await this.redis.zRangeByScore(
      "ade:workers:heartbeat", "-inf", cutoff
    );

    let reclaimedCount = 0;
    for (const workerId of deadWorkers) {
      // Find pending tasks for this worker using XPENDING
      const pending = await this.redis.xPendingRange(
        "ade:queue:tasks", "ade-workers",
        "-", "+", this.CLAIM_BATCH_SIZE
      );

      for (const item of pending) {
        if (item.consumer === workerId && item.idle > this.PROCESSING_TIMEOUT_MS) {
          // Use XAUTOCLAIM to atomically claim entries idle past the timeout
          await this.redis.xAutoClaim(
            "ade:queue:tasks",
            "ade-workers",
            "orchestrator", // consumer name for cleanup
            this.PROCESSING_TIMEOUT_MS,
            item.id,
            { COUNT: 1 }
          );

          // ACK to remove the entry from the pending list, then schedule the
          // retry. NOTE: item.id is the stream entry ID; scheduleRetry must
          // resolve the entry's taskId field before requeueing the task hash.
          await this.redis.xAck("ade:queue:tasks", "ade-workers", item.id);
          await this.scheduleRetry(item.id);
          reclaimedCount++;
        }
      }

      // Clean up dead worker registration
      await this.redis.hDel("ade:workers:active", workerId);
      await this.redis.zRem("ade:workers:heartbeat", workerId);
    }

    return reclaimedCount;
  }
}
```

---
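The orchestrator's scan reduces to one rule: a worker is dead once its last heartbeat is older than `HEARTBEAT_TIMEOUT_MS`. The cutoff arithmetic can be checked in isolation with a pure equivalent of the `ZRANGEBYSCORE` query (a sketch, no Redis connection required):

```typescript
interface HeartbeatEntry {
  workerId: string;
  lastHeartbeat: number; // ms epoch; this is the ZSET score
}

// Pure equivalent of: ZRANGEBYSCORE ade:workers:heartbeat -inf (now - timeout)
// ZRANGEBYSCORE bounds are inclusive, so a score exactly at the cutoff counts
// as dead.
function findDeadWorkers(
  entries: HeartbeatEntry[],
  now: number,
  timeoutMs: number
): string[] {
  const cutoff = now - timeoutMs;
  return entries
    .filter((e) => e.lastHeartbeat <= cutoff)
    .map((e) => e.workerId);
}
```

With the defaults above (5 s heartbeat interval, 30 s timeout), a worker must miss roughly six consecutive heartbeats before reclamation, which tolerates brief GC pauses and network blips.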
## 4. Retry Logic with Exponential Backoff

### 4.1 Backoff Calculation

```typescript
// src/queue/redis/retry.ts

interface RetryConfig {
  attempt: number;      // 0-indexed (0 = first retry)
  baseDelayMs: number;  // Default: 1000
  multiplier: number;   // Default: 2
  maxDelayMs: number;   // Default: 300000 (5 min)
  jitterFactor: number; // Default: 0.1 (10% randomization)
}

function calculateRetryDelay(config: RetryConfig): number {
  // Exponential backoff: base * (multiplier ^ attempt)
  const exponentialDelay =
    config.baseDelayMs * Math.pow(config.multiplier, config.attempt);

  // Cap at max
  const cappedDelay = Math.min(exponentialDelay, config.maxDelayMs);

  // Add jitter to prevent thundering herd: ±jitterFactor
  const jitter = cappedDelay * config.jitterFactor * (Math.random() * 2 - 1);

  return Math.floor(cappedDelay + jitter);
}

// Examples with defaults:
// Attempt 0 (first retry): ~1000ms ±100ms
// Attempt 1: ~2000ms ±200ms
// Attempt 2: ~4000ms ±400ms
// Attempt 3: ~8000ms ±800ms
// Attempt 4: ~16000ms ±1600ms
// ...up to max 300000ms (5 min)
```

### 4.2 Retry Flow

```typescript
async function handleTaskFailure(
  taskId: string,
  streamEntryId: string, // entry ID from XREADGROUP; XACK needs this, not the taskId
  error: Error,
  workerId: string
): Promise<void> {
  const taskKey = `ade:task:${taskId}`;
  const task = await redis.hGetAll(taskKey);

  const attemptCount = parseInt(task.attemptCount, 10) + 1;
  const maxAttempts = parseInt(task.maxAttempts, 10);

  if (attemptCount >= maxAttempts) {
    // Final failure - mark as failed
    await redis.hSet(taskKey, {
      status: "failed",
      error: error.message,
      completedAt: Date.now(),
      attemptCount: attemptCount.toString()
    });

    // Publish failure event for observers
    await redis.publish("ade:events:task-failed", JSON.stringify({
      taskId,
      error: error.message,
      totalAttempts: attemptCount
    }));

    // ACK to remove from pending
    await redis.xAck("ade:queue:tasks", "ade-workers", streamEntryId);
  } else {
    // Schedule retry
    const delay = calculateRetryDelay({
      attempt: attemptCount,
      baseDelayMs: 1000,
      multiplier: 2,
      maxDelayMs: 300000,
      jitterFactor: 0.1
    });
    const runAfter = Date.now() + delay;

    // Update task state
    await redis.hSet(taskKey, {
      status: "pending",
      attemptCount: attemptCount.toString(),
      error: error.message,
      workerId: "" // Clear worker assignment
    });

    // Add to delayed queue
    await redis.zAdd("ade:queue:delayed", { score: runAfter, value: taskId });

    // ACK to remove from stream pending
    await redis.xAck("ade:queue:tasks", "ade-workers", streamEntryId);
  }
}
```

### 4.3 Delayed Task Promoter

```typescript
// Runs periodically (every 1 second) to move due tasks from the delayed set
// to the stream. NOTE: ZRANGEBYSCORE followed by ZREM is not atomic; run a
// single promoter, or wrap the read-and-remove in a Lua script to avoid
// double promotion.
async function promoteDelayedTasks(): Promise<number> {
  const now = Date.now();

  // Get due tasks
  const dueTasks = await redis.zRangeByScore(
    "ade:queue:delayed", "-inf", now,
    { LIMIT: { offset: 0, count: 100 } }
  );

  if (dueTasks.length === 0) return 0;

  // Remove from delayed queue
  await redis.zRem("ade:queue:delayed", dueTasks);

  // Re-add to stream for processing
  for (const taskId of dueTasks) {
    const task = await redis.hGetAll(`ade:task:${taskId}`);
    await redis.xAdd("ade:queue:tasks", "*", {
      taskId,
      payload: task.payload,
      priority: task.priority
    });
  }

  return dueTasks.length;
}
```

---

## 5. Integration with Existing Task.ts

### 5.1 Adapter Pattern

```typescript
// src/queue/redis/adapter.ts
import { QueueRuntime, QueueItem, DequeuedBatch } from "../queueRuntime";
import { RedisQueue } from "./queue";

/**
 * Redis-backed implementation of the QueueRuntime interface.
 * Allows drop-in replacement of the in-memory queue.
 */
export class RedisQueueAdapter implements QueueRuntime {
  private redisQueue: RedisQueue;
  private localBatchBuffer = new Map();

  constructor(redisUrl: string, options?: QueueRuntimeOptions) {
    this.redisQueue = new RedisQueue(redisUrl, {
      ...options,
      onTaskCompleted: this.handleTaskCompleted.bind(this),
      onTaskFailed: this.handleTaskFailed.bind(this)
    });
  }

  async enqueue(
    input: Omit<QueueItem, "id" | "enqueuedAt">
  ): Promise<QueueItem | null> {
    // Map QueueItem to TaskPayload
    const taskId = generateUUID();
    const enqueuedAt = Date.now();

    const payload: TaskPayload = {
      id: taskId,
      kind: input.kind,
      agentId: input.agentId,
      conversationId: input.conversationId,
      clientMessageId: input.clientMessageId,
      text: (input as any).text,
      content: (input as any).content,
      priority: 5, // Default priority
      maxAttempts: 3,
      backoffMultiplier: 2,
      maxBackoffMs: 300000,
      enqueuedAt,
      source: "user",
      isCoalescable: isCoalescable(input.kind)
    };

    const success = await this.redisQueue.enqueue(payload);
    if (!success) return null;

    return { ...input, id: taskId, enqueuedAt } as QueueItem;
  }

  async tryDequeue(
    blockedReason: QueueBlockedReason | null
  ): Promise<DequeuedBatch | null> {
    if (blockedReason !== null) {
      // Emit blocked event if needed (preserving QueueRuntime behavior)
      return null;
    }

    // Claim batch from Redis
    const batch = await this.redisQueue.claimBatch({
      consumerId: this.workerId,
      batchSize: this.getCoalescingBatchSize(),
      coalescingWindowMs: 50 // Small window for coalescing
    });

    if (!batch || batch.length === 0) return null;

    // Map back to QueueItem format
    const items: QueueItem[] = batch.map(task => this.mapTaskToQueueItem(task));

    return {
      batchId: generateBatchId(),
      items,
      mergedCount: items.length,
      queueLenAfter: await this.redisQueue.getQueueLength()
    };
  }

  // ... other QueueRuntime methods
}
```

### 5.2 Task.ts Integration Points

**Current Flow (Task.ts line 403+):**

```typescript
// Background task spawning
const { taskId, outputFile, subagentId } = spawnBackgroundSubagentTask({
  subagentType: subagent_type,
  prompt,
  description,
  model,
  toolCallId,
  existingAgentId: args.agent_id,
  existingConversationId: args.conversation_id,
  maxTurns: args.max_turns,
});
```

**Proposed Redis Integration:**

```typescript
// New: Redis-backed task queue integration
interface TaskQueueEnqueueOptions {
  subagentType: string;
  prompt: string;
  description: string;
  model?: string;
  toolCallId?: string;
  existingAgentId?: string;
  existingConversationId?: string;
  maxTurns?: number;
  priority?: number;
  runInBackground?: boolean;
}

// In Task.ts - replace spawnBackgroundSubagentTask with:
export async function enqueueSubagentTask(
  args: TaskQueueEnqueueOptions,
  queue: RedisQueue
): Promise<{ taskId: string; outputFile: string; subagentId: string }> {
  const taskId = generateTaskId();
  const subagentId = generateSubagentId();

  // Register in subagent state store (for UI)
  registerSubagent(subagentId, args.subagentType, args.description,
    args.toolCallId, true);

  const outputFile = createBackgroundOutputFile(taskId);

  // Create task payload (description/subagentId/outputFile extend the base
  // TaskPayload for "task_notification" tasks)
  const payload: TaskPayload = {
    id: taskId,
    kind: "task_notification",
    subagentType: args.subagentType,
    prompt: args.prompt,
    description: args.description,
    model: args.model,
    existingAgentId: args.existingAgentId,
    existingConversationId: args.existingConversationId,
    maxTurns: args.maxTurns,
    subagentId,
    outputFile,
    priority: args.priority ?? 5,
    maxAttempts: 3,
    backoffMultiplier: 2,
    maxBackoffMs: 300000,
    enqueuedAt: Date.now(),
    source: "user",
    isCoalescable: false // Task notifications are not coalescable
  };

  // Enqueue to Redis
  await queue.enqueue(payload);

  return { taskId, outputFile, subagentId };
}
```

### 5.3 Worker Implementation for Subagents

```typescript
// src/queue/redis/subagent-worker.ts

class SubagentTaskWorker extends TaskWorker {
  protected async processTask(task: TaskState): Promise<void> {
    // Update subagent state to "running"
    updateSubagent(task.subagentId!, { status: "running" });

    try {
      // Execute subagent (existing manager.ts logic)
      const result = await spawnSubagent(
        task.subagentType!,
        task.prompt!,
        task.model,
        task.subagentId!,
        undefined, // signal - handled via task cancellation
        task.existingAgentId,
        task.existingConversationId,
        task.maxTurns
      );

      // Write transcript
      writeTaskTranscriptResult(task.outputFile!, result, "");

      // Complete subagent state
      completeSubagent(task.subagentId!, {
        success: result.success,
        error: result.error,
        totalTokens: result.totalTokens
      });

      // Send notification if not silent
      if (!task.silent) {
        const notification = formatTaskNotification({
          taskId: task.id,
          status: result.success ? "completed" : "failed",
          summary: `Agent "${task.description}" ${result.success ? "completed" : "failed"}`,
          result: result.success ? result.report : result.error,
          outputFile: task.outputFile!
        });

        // Add to message queue for parent agent
        addToMessageQueue({ kind: "task_notification", text: notification });
      }

      // Mark task completed
      await this.completeTask(task.id, result);
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error);

      // Update subagent state
      completeSubagent(task.subagentId!, { success: false, error: errorMessage });

      // Fail task (triggers retry logic)
      await this.failTask(task.id, new Error(errorMessage));
    }
  }
}
```

---
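`formatTaskNotification` above is assumed from the existing Task tooling; its real shape may differ. A hypothetical sketch, for illustration only, of the message the worker hands back to the parent agent:

```typescript
// Hypothetical sketch of the notification formatter referenced in 5.3.
// The field names mirror the call site above; the actual implementation
// lives with the existing Task tooling.
interface TaskNotification {
  taskId: string;
  status: "completed" | "failed";
  summary: string;
  result?: string;
  outputFile: string;
}

function formatTaskNotification(n: TaskNotification): string {
  const lines = [
    `[task ${n.taskId}] ${n.status}: ${n.summary}`,
    n.result ? `Result: ${n.result}` : undefined,
    `Full transcript: ${n.outputFile}`,
  ];
  return lines.filter((l): l is string => l !== undefined).join("\n");
}
```

Whatever the real formatter does, it should stay short: the full transcript lives in `outputFile`, so the notification only needs the verdict and a pointer.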
## 6. Operational Considerations

### 6.1 Redis Configuration

```yaml
# Recommended Redis config for task queue
maxmemory: 1gb
maxmemory-policy: noeviction  # Never silently evict queue data; cleanup is
                              # handled by task-hash TTLs and stream trimming

# Persistence (for durability)
appendonly: yes
appendfsync: everysec

# Stream trimming (prevent unbounded growth)
# Set via XTRIM or MAXLEN on XADD
```

(An eviction policy such as `allkeys-lru` would contradict the durability goal: under memory pressure it can drop pending task hashes or the stream itself.)

### 6.2 Key Patterns and Cleanup

| Key Pattern | Type | TTL | Cleanup Strategy |
|-------------|------|-----|------------------|
| `ade:queue:tasks` | Stream | - | XTRIM by MAXLEN (keep 100k) |
| `ade:queue:delayed` | ZSET | - | Processed by promoter |
| `ade:task:{id}` | Hash | 7 days | Expire completed/failed |
| `ade:workers:active` | Hash | - | On worker deregistration |
| `ade:workers:heartbeat` | ZSET | - | On worker timeout |

### 6.3 Monitoring Metrics

```typescript
// Metrics to expose via Prometheus/StatsD
interface QueueMetrics {
  // Queue depth
  "ade_queue_pending_total": number;  // XPENDING count
  "ade_queue_delayed_total": number;  // ZCARD ade:queue:delayed
  "ade_queue_stream_length": number;  // XLEN ade:queue:tasks

  // Throughput
  "ade_tasks_enqueued_rate": number;  // XADD rate
  "ade_tasks_completed_rate": number; // Completion rate
  "ade_tasks_failed_rate": number;    // Failure rate

  // Worker health
  "ade_workers_active_total": number; // HLEN ade:workers:active
  "ade_workers_dead_total": number;   // Detected dead workers

  // Processing
  "ade_task_duration_ms": Histogram;  // Time from claim to complete
  "ade_task_wait_ms": Histogram;      // Time from enqueue to claim
  "ade_task_attempts": Histogram;     // Distribution of retry counts
}
```

### 6.4 Failure Modes

| Scenario | Handling |
|----------|----------|
| Redis unavailable | Tasks fail immediately; caller responsible for retry |
| Worker crash | Tasks reclaimed via heartbeat timeout (30s) |
| Poison message | Max retries (3) then moved to DLQ |
| Slow task | Processing timeout (5 min) triggers requeue |
| Duplicate task | Idempotent task IDs (UUID) prevent double execution |

---
## 7. Migration Strategy

### Phase 1: Dual-Write (Week 1)
- Implement RedisQueueAdapter
- Write to both in-memory and Redis queues
- Read from in-memory only (Redis for validation)

### Phase 2: Shadow Mode (Week 2)
- Read from both queues
- Compare results, log discrepancies
- Fix any edge cases

### Phase 3: Cutover (Week 3)
- Switch reads to Redis
- Keep in-memory as fallback
- Monitor for 1 week

### Phase 4: Cleanup (Week 4)
- Remove in-memory queue code
- Full Redis dependency

---

## 8. Implementation Checklist

- [ ] Redis client configuration (ioredis or node-redis)
- [ ] Task entity schema and serialization
- [ ] Stream consumer group setup
- [ ] Worker registration and heartbeat
- [ ] Task claim and processing loop
- [ ] Retry logic with exponential backoff
- [ ] Delayed task promotion
- [ ] Dead worker detection and reclamation
- [ ] QueueRuntime adapter implementation
- [ ] Task.ts integration
- [ ] Subagent state synchronization
- [ ] Metrics and monitoring
- [ ] Error handling and DLQ
- [ ] Tests (unit, integration, load)
- [ ] Documentation

---

## 9. Appendix: Redis Commands Reference

| Operation | Command | Complexity |
|-----------|---------|------------|
| Enqueue task | `XADD` | O(1) |
| Claim tasks | `XREADGROUP` | O(N), N = count |
| Ack completion | `XACK` | O(1) |
| Get pending | `XPENDING` | O(1) |
| Claim pending | `XCLAIM` / `XAUTOCLAIM` | O(log N) |
| Delay task | `ZADD` delayed | O(log N) |
| Promote delayed | `ZRANGEBYSCORE` + `ZREM` + `XADD` | O(log N + M) |
| Register worker | `HSET` + `ZADD` | O(log N) |
| Heartbeat | `ZADD` | O(log N) |
| Detect dead | `ZRANGEBYSCORE` | O(log N + M) |