community-ade/docs/ade-redis-queue-design.md
Ani (Annie Tunturi) 00382055c6 Initial commit: Community ADE foundation
- Project structure: docs/, src/, tests/, proto/
- Research synthesis: Letta vs commercial ADEs
- Architecture: Redis Streams queue design
- Phase 1 orchestration design
- Execution plan and project state tracking
- Working subagent system (manager.ts fixes)

This is the foundation for a Community ADE built on Letta's
stateful agent architecture with git-native MemFS.

👾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta Code <noreply@letta.com>
2026-03-18 10:30:20 -04:00

Redis Task Queue Architecture for Letta Community ADE

Executive Summary

This document outlines the architecture for replacing the in-memory QueueRuntime with a Redis-backed persistent task queue. The design prioritizes durability, horizontal scalability, and reliable task execution while maintaining compatibility with the existing Task tool and subagent spawning workflows.

Key Decisions:

  • Use Redis Streams (not Sorted Sets) for the primary task queue to leverage consumer groups and at-least-once delivery guarantees
  • Hybrid approach: Streams for queue semantics, Sorted Sets for scheduling/delays, Hashes for task state
  • Stateless workers with heartbeat-based liveness detection
  • Exponential backoff with jitter for retry logic

1. Redis Data Structures

1.1 Primary Queue: Redis Stream

Key: ade:queue:tasks
Type: Stream
Purpose: Main task ingestion and distribution

Why Streams over Sorted Sets?

| Feature | Sorted Sets | Redis Streams |
|---|---|---|
| Ordering | Score-based (can have ties) | Strict temporal (millisecond ID) |
| Consumer groups | Manual implementation | Built-in XREADGROUP |
| Delivery semantics | At-most-once (easy) / at-least-once (complex) | At-least-once with ACK |
| Pending tracking | Manual | Built-in XPENDING |
| Claim/retry | Custom Lua scripts | Built-in XCLAIM / XAUTOCLAIM |
| Message visibility | Immediate to all | Consumer-group isolated |

Streams provide the exact semantics needed for reliable task processing without custom Lua scripting.

Stream Entries:

XADD ade:queue:tasks * taskId <uuid> payload <json> priority <int>
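
Stream entry IDs returned by XADD have the form `<millisecondTimestamp>-<sequence>`, which is what gives Streams the strict temporal ordering noted above. A minimal sketch of comparing two IDs (the helpers here are illustrative, not part of any Redis client API):

```typescript
// Redis stream entry IDs are "<ms>-<seq>"; comparing the two numeric
// parts reproduces the strict ordering the queue relies on.
function parseStreamId(id: string): [number, number] {
  const [ms, seq] = id.split("-");
  return [Number(ms), Number(seq)];
}

function compareStreamIds(a: string, b: string): number {
  const [aMs, aSeq] = parseStreamId(a);
  const [bMs, bSeq] = parseStreamId(b);
  // Millisecond timestamp first, then the per-millisecond sequence
  return aMs !== bMs ? aMs - bMs : aSeq - bSeq;
}
```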

1.2 Delayed Tasks: Sorted Set

Key: ade:queue:delayed
Type: Sorted Set (ZSET)
Score: scheduled execution timestamp (ms)
Member: taskId

Used for:

  • Tasks with explicit runAfter timestamps
  • Retry scheduling with exponential backoff
  • Rate-limited task release

1.3 Task State Storage: Redis Hash

Key: ade:task:{taskId}
Type: Hash
Fields:
  - id: string (UUID v4)
  - status: pending|running|completed|failed|cancelled
  - payload: JSON (task arguments)
  - createdAt: timestamp (ms)
  - startedAt: timestamp (ms)
  - completedAt: timestamp (ms)
  - workerId: string (nullable)
  - attemptCount: integer
  - maxAttempts: integer (default: 3)
  - error: string (last error message)
  - result: JSON (completed task result)
  - parentTaskId: string (nullable, for task chains)
  - subagentId: string (link to subagent state)
  - streamEntryId: string (stream ID of the last XADD, required for XACK)
  - priority: integer (0-9, default 5)
  - kind: message|task_notification|approval_result|overlay_action
TTL: 7 days (configurable cleanup for completed/failed tasks)
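
Redis hashes hold only flat string fields, so nested values like payload and result must be JSON-stringified on write and revived on read. A sketch of that round trip (StoredTask, toHash, and fromHash are illustrative helpers, not an existing API; only a few fields are shown):

```typescript
// Illustrative subset of the ade:task:{taskId} hash layout above.
interface StoredTask {
  id: string;
  status: string;
  payload: unknown;    // stored as a JSON string in the hash
  attemptCount: number; // stored as a decimal string in the hash
}

function toHash(task: StoredTask): Record<string, string> {
  return {
    id: task.id,
    status: task.status,
    payload: JSON.stringify(task.payload),
    attemptCount: String(task.attemptCount),
  };
}

function fromHash(hash: Record<string, string>): StoredTask {
  return {
    id: hash.id,
    status: hash.status,
    payload: JSON.parse(hash.payload),
    attemptCount: parseInt(hash.attemptCount, 10),
  };
}
```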

1.4 Worker Registry: Redis Hash + Sorted Set

Key: ade:workers:active
Type: Hash
Fields per worker:
  - {workerId}: JSON { hostname, pid, startedAt, lastHeartbeat, version }

Key: ade:workers:heartbeat
Type: Sorted Set
Score: last heartbeat timestamp
Member: workerId

1.5 Consumer Group State

Stream Consumer Group: ade:queue:tasks
Group Name: ade-workers
Consumer Name: {workerId} (unique per process)

Redis Streams automatically track:

  • Pending messages per consumer (XPENDING)
  • Delivery count per message
  • Idle time since last read

2. Task Entity Schema

2.1 TypeScript Interface

// src/queue/redis/types.ts

export type TaskStatus = 
  | "pending"      // Enqueued, not yet claimed
  | "running"      // Claimed by worker, processing
  | "completed"    // Successfully finished
  | "failed"       // Exhausted all retries
  | "cancelled";   // Explicitly cancelled

export type TaskKind =
  | "message"
  | "task_notification" 
  | "approval_result"
  | "overlay_action";

export interface TaskPayload {
  // Task identification
  id: string;                    // UUID v4
  kind: TaskKind;
  
  // Execution context
  agentId?: string;
  conversationId?: string;
  clientMessageId?: string;
  
  // Content (varies by kind)
  content?: unknown;             // For "message" kind
  text?: string;                 // For notification/approval/overlay
  
  // Subagent execution params (for task_notification)
  subagentType?: string;
  prompt?: string;
  description?: string;
  model?: string;
  existingAgentId?: string;
  existingConversationId?: string;
  maxTurns?: number;
  outputFile?: string;           // Background transcript path
  silent?: boolean;              // Suppress completion notification
  
  // Scheduling
  priority: number;              // 0-9, lower = higher priority
  runAfter?: number;             // Timestamp ms (for delayed tasks)
  
  // Retry configuration
  maxAttempts: number;
  backoffMultiplier: number;     // Default: 2
  maxBackoffMs: number;          // Default: 300000 (5 min)
  
  // Metadata
  enqueuedAt: number;
  source: "user" | "system" | "hook";
}

export interface TaskState extends TaskPayload {
  status: TaskStatus;
  workerId?: string;
  attemptCount: number;
  startedAt?: number;
  completedAt?: number;
  error?: string;
  result?: unknown;
  
  // Coalescing support (from QueueRuntime)
  isCoalescable: boolean;
  scopeKey?: string;             // For grouping coalescable items
}

2.2 State Transitions

                    ┌─────────────┐
                    │   PENDING   │◄──────────────────┐
                    │  (queued)   │                   │
                    └──────┬──────┘                   │
                           │ claim                    │ retry
                           ▼                          │ (with delay)
                    ┌─────────────┐                   │
                    │   RUNNING   │───────────────────┘
                    │  (claimed)  │  fail (retryable)
                    └───┬─────┬───┘
               complete │     │ fail (final)
                        ▼     ▼
              ┌─────────────┐ ┌─────────────┐
              │  COMPLETED  │ │   FAILED    │
              │             │ │ (exhausted) │
              └─────────────┘ └─────────────┘
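
The state transitions can be encoded as a lookup table that workers and the orchestrator consult before writing a status change to the task hash. This is a sketch: the "cancelled" edges are an assumption (the diagram omits that status, though the TaskStatus type includes it):

```typescript
type Status = "pending" | "running" | "completed" | "failed" | "cancelled";

// Allowed transitions, mirroring the state diagram. "cancelled" is
// assumed reachable from pending or running.
const TRANSITIONS: Record<Status, Status[]> = {
  pending: ["running", "cancelled"],                        // claim, or cancel before claim
  running: ["completed", "failed", "pending", "cancelled"], // complete / final fail / retry
  completed: [],                                            // terminal
  failed: [],                                               // terminal
  cancelled: [],                                            // terminal
};

function canTransition(from: Status, to: Status): boolean {
  return TRANSITIONS[from].includes(to);
}
```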

3. Worker Pool Registration and Heartbeat

3.1 Worker Lifecycle

// src/queue/redis/worker.ts

class TaskWorker {
  private workerId: string;
  private redis: RedisClient;
  private isRunning: boolean = false;
  private heartbeatInterval?: NodeJS.Timeout;
  private claimInterval?: NodeJS.Timeout;
  
  // Config
  private readonly HEARTBEAT_INTERVAL_MS = 5000;
  private readonly HEARTBEAT_TIMEOUT_MS = 30000;
  private readonly CLAIM_BATCH_SIZE = 10;
  private readonly PROCESSING_TIMEOUT_MS = 300000; // 5 min

  async start(): Promise<void> {
    this.workerId = generateWorkerId(); // {hostname}:{pid}:{uuid}
    
    // Register in worker registry
    await this.redis.hSet("ade:workers:active", this.workerId, JSON.stringify({
      hostname: os.hostname(),
      pid: process.pid,
      startedAt: Date.now(),
      lastHeartbeat: Date.now(),
      version: process.env.npm_package_version || "unknown"
    }));
    
    // Create consumer in stream group (idempotent)
    try {
      await this.redis.xGroupCreate("ade:queue:tasks", "ade-workers", "$", {
        MKSTREAM: true
      });
    } catch (err) {
      // Group already exists - ignore
    }
    
    this.isRunning = true;
    this.startHeartbeat();
    this.startClaimLoop();
  }

  async stop(): Promise<void> {
    this.isRunning = false;
    clearInterval(this.heartbeatInterval);
    clearInterval(this.claimInterval);
    
    // Release pending tasks back to queue
    await this.releasePendingTasks();
    
    // Deregister
    await this.redis.hDel("ade:workers:active", this.workerId);
    await this.redis.zRem("ade:workers:heartbeat", this.workerId);
  }

  private startHeartbeat(): void {
    this.heartbeatInterval = setInterval(async () => {
      await this.redis.zAdd("ade:workers:heartbeat", {
        score: Date.now(),
        value: this.workerId
      });
      // Re-read the registration so fields written elsewhere survive
      const raw = await this.redis.hGet("ade:workers:active", this.workerId);
      const currentInfo = raw ? JSON.parse(raw) : {};
      await this.redis.hSet("ade:workers:active", this.workerId, JSON.stringify({
        ...currentInfo,
        lastHeartbeat: Date.now()
      }));
    }, this.HEARTBEAT_INTERVAL_MS);
  }
}
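
A sketch of the generateWorkerId helper referenced in start(), following the {hostname}:{pid}:{uuid} shape noted there. The exact format is an implementation choice, not something Redis mandates:

```typescript
import * as os from "node:os";
import { randomUUID } from "node:crypto";

// Worker IDs double as stream consumer names, so they must be unique
// per process and stable for the life of that process.
function generateWorkerId(): string {
  return `${os.hostname()}:${process.pid}:${randomUUID()}`;
}
```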

3.2 Dead Worker Detection

// src/queue/redis/orchestrator.ts (singleton, per-deployment)

class QueueOrchestrator {
  async detectAndReclaimDeadWorkerTasks(): Promise<number> {
    const now = Date.now();
    const cutoff = now - this.HEARTBEAT_TIMEOUT_MS;
    
    // Find dead workers
    const deadWorkers = await this.redis.zRangeByScore(
      "ade:workers:heartbeat",
      "-inf",
      cutoff
    );
    
    let reclaimedCount = 0;
    
    for (const workerId of deadWorkers) {
      // Find pending tasks for this worker using XPENDING
      const pending = await this.redis.xPendingRange(
        "ade:queue:tasks",
        "ade-workers",
        "-",
        "+",
        this.CLAIM_BATCH_SIZE
      );
      
      for (const item of pending) {
        if (item.consumer === workerId && item.idle > this.PROCESSING_TIMEOUT_MS) {
          // Use XAUTOCLAIM to atomically claim and retry
          const [nextId, claimed] = await this.redis.xAutoClaim(
            "ade:queue:tasks",
            "ade-workers",
            "orchestrator", // consumer name for cleanup
            this.PROCESSING_TIMEOUT_MS,
            item.id,
            { COUNT: 1 }
          );
          
          // Release back to pending by ACKing (removes from pending list)
          // The orchestrator will re-add to delayed queue for retry
          await this.redis.xAck("ade:queue:tasks", "ade-workers", item.id);
          await this.scheduleRetry(item.id);
          reclaimedCount++;
        }
      }
      
      // Clean up dead worker registration
      await this.redis.hDel("ade:workers:active", workerId);
      await this.redis.zRem("ade:workers:heartbeat", workerId);
    }
    
    return reclaimedCount;
  }
}
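
The dead-worker scan can be simulated without Redis. The entries below mirror the ade:workers:heartbeat sorted set (score = last heartbeat in ms); any worker whose score is at or below the cutoff is considered dead, exactly as the ZRANGEBYSCORE call above selects them:

```typescript
interface HeartbeatEntry {
  workerId: string;
  lastHeartbeat: number; // ZSET score
}

// In-memory equivalent of:
//   ZRANGEBYSCORE ade:workers:heartbeat -inf {now - timeoutMs}
function findDeadWorkers(
  entries: HeartbeatEntry[],
  now: number,
  timeoutMs: number
): string[] {
  const cutoff = now - timeoutMs;
  return entries
    .filter((e) => e.lastHeartbeat <= cutoff)
    .map((e) => e.workerId);
}
```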

4. Retry Logic with Exponential Backoff

4.1 Backoff Calculation

// src/queue/redis/retry.ts

interface RetryConfig {
  attempt: number;           // 0-indexed (0 = first retry)
  baseDelayMs: number;       // Default: 1000
  multiplier: number;        // Default: 2
  maxDelayMs: number;        // Default: 300000 (5 min)
  jitterFactor: number;      // Default: 0.1 (10% randomization)
}

function calculateRetryDelay(config: RetryConfig): number {
  // Exponential backoff: base * (multiplier ^ attempt)
  const exponentialDelay = config.baseDelayMs * 
    Math.pow(config.multiplier, config.attempt);
  
  // Cap at max
  const cappedDelay = Math.min(exponentialDelay, config.maxDelayMs);
  
  // Add jitter to prevent thundering herd: ±jitterFactor
  const jitter = cappedDelay * config.jitterFactor * (Math.random() * 2 - 1);
  
  return Math.floor(cappedDelay + jitter);
}

// Examples with defaults:
// Attempt 0 (first retry): ~1000ms ±100ms
// Attempt 1: ~2000ms ±200ms
// Attempt 2: ~4000ms ±400ms
// Attempt 3: ~8000ms ±800ms
// Attempt 4: ~16000ms ±1600ms
// ...up to max 300000ms (5 min)
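
The schedule in the comments above can be verified by computing the delays with jitterFactor = 0, which makes the values deterministic:

```typescript
// Jitter-free variant of calculateRetryDelay, using the same defaults
// (base 1000ms, multiplier 2, cap 300000ms).
function retryDelayNoJitter(
  attempt: number,
  baseDelayMs = 1000,
  multiplier = 2,
  maxDelayMs = 300_000
): number {
  return Math.min(baseDelayMs * Math.pow(multiplier, attempt), maxDelayMs);
}

// Attempts 0..4 double each time; attempt 10 hits the 5-minute cap.
const schedule = [0, 1, 2, 3, 4, 10].map((a) => retryDelayNoJitter(a));
```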

4.2 Retry Flow

async function handleTaskFailure(
  taskId: string,
  error: Error,
  workerId: string
): Promise<void> {
  const taskKey = `ade:task:${taskId}`;
  const task = await redis.hGetAll(taskKey);
  
  const attemptCount = parseInt(task.attemptCount) + 1;
  const maxAttempts = parseInt(task.maxAttempts);
  
  if (attemptCount >= maxAttempts) {
    // Final failure - mark as failed
    await redis.hSet(taskKey, {
      status: "failed",
      error: error.message,
      completedAt: Date.now(),
      attemptCount: attemptCount.toString()
    });
    
    // Publish failure event for observers
    await redis.publish("ade:events:task-failed", JSON.stringify({
      taskId,
      error: error.message,
      totalAttempts: attemptCount
    }));
    
    // ACK with the stream entry ID recorded at claim time
    // (XACK takes a stream ID, not the task UUID)
    await redis.xAck("ade:queue:tasks", "ade-workers", task.streamEntryId);
  } else {
    // Schedule retry
    const delay = calculateRetryDelay({
      attempt: attemptCount,
      baseDelayMs: 1000,
      multiplier: 2,
      maxDelayMs: 300000,
      jitterFactor: 0.1
    });
    
    const runAfter = Date.now() + delay;
    
    // Update task state
    await redis.hSet(taskKey, {
      status: "pending",
      attemptCount: attemptCount.toString(),
      error: error.message,
      workerId: "" // Clear worker assignment
    });
    
    // Add to delayed queue
    await redis.zAdd("ade:queue:delayed", {
      score: runAfter,
      value: taskId
    });
    
    // ACK the claimed stream entry (by its stream ID, recorded at claim
    // time) to clear it from the pending list
    await redis.xAck("ade:queue:tasks", "ade-workers", task.streamEntryId);
  }
}

4.3 Delayed Task Promoter

// Runs periodically (every 1 second) to move due tasks from delayed set to stream

async function promoteDelayedTasks(): Promise<number> {
  const now = Date.now();
  
  // Select due tasks. Note this read-then-remove pair is NOT atomic;
  // wrap it in MULTI/EXEC or a Lua script if multiple promoters may run.
  const dueTasks = await redis.zRangeByScore(
    "ade:queue:delayed",
    "-inf",
    now,
    { LIMIT: { offset: 0, count: 100 } }
  );
  
  if (dueTasks.length === 0) return 0;
  
  // Remove from delayed queue
  await redis.zRem("ade:queue:delayed", dueTasks);
  
  // Re-add to stream for processing
  for (const taskId of dueTasks) {
    const task = await redis.hGetAll(`ade:task:${taskId}`);
    await redis.xAdd("ade:queue:tasks", "*", {
      taskId,
      payload: task.payload,
      priority: task.priority
    });
  }
  
  return dueTasks.length;
}
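
The promoter's selection step can be simulated in memory. Entries mirror the ade:queue:delayed sorted set (score = runAfter in ms); due tasks are returned in score order, up to the batch limit:

```typescript
interface DelayedEntry {
  taskId: string;
  runAfter: number; // ZSET score: scheduled execution time (ms)
}

// In-memory equivalent of:
//   ZRANGEBYSCORE ade:queue:delayed -inf {now} LIMIT 0 {limit}
function selectDueTasks(
  delayed: DelayedEntry[],
  now: number,
  limit = 100
): string[] {
  return delayed
    .filter((e) => e.runAfter <= now)
    .sort((a, b) => a.runAfter - b.runAfter)
    .slice(0, limit)
    .map((e) => e.taskId);
}
```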

5. Integration with Existing Task.ts

5.1 Adapter Pattern

// src/queue/redis/adapter.ts

import { QueueRuntime, QueueItem, DequeuedBatch, QueueBlockedReason, QueueRuntimeOptions } from "../queueRuntime";
import { RedisQueue } from "./queue";

/**
 * Redis-backed implementation of QueueRuntime interface.
 * Allows drop-in replacement of in-memory queue.
 */
export class RedisQueueAdapter implements QueueRuntime {
  private redisQueue: RedisQueue;
  private localBatchBuffer: Map<string, QueueItem> = new Map();
  
  constructor(redisUrl: string, options?: QueueRuntimeOptions) {
    this.redisQueue = new RedisQueue(redisUrl, {
      ...options,
      onTaskCompleted: this.handleTaskCompleted.bind(this),
      onTaskFailed: this.handleTaskFailed.bind(this)
    });
  }

  async enqueue(input: Omit<QueueItem, "id" | "enqueuedAt">): Promise<QueueItem | null> {
    // Map QueueItem to TaskPayload
    const taskId = generateUUID();
    const enqueuedAt = Date.now();
    
    const payload: TaskPayload = {
      id: taskId,
      kind: input.kind,
      agentId: input.agentId,
      conversationId: input.conversationId,
      clientMessageId: input.clientMessageId,
      text: (input as any).text,
      content: (input as any).content,
      priority: 5, // Default priority
      maxAttempts: 3,
      backoffMultiplier: 2,
      maxBackoffMs: 300000,
      enqueuedAt,
      source: "user",
      isCoalescable: isCoalescable(input.kind)
    };
    
    const success = await this.redisQueue.enqueue(payload);
    if (!success) return null;
    
    return {
      ...input,
      id: taskId,
      enqueuedAt
    } as QueueItem;
  }

  async tryDequeue(blockedReason: QueueBlockedReason | null): Promise<DequeuedBatch | null> {
    if (blockedReason !== null) {
      // Emit blocked event if needed (preserving QueueRuntime behavior)
      return null;
    }
    
    // Claim batch from Redis
    const batch = await this.redisQueue.claimBatch({
      consumerId: this.workerId,
      batchSize: this.getCoalescingBatchSize(),
      coalescingWindowMs: 50 // Small window for coalescing
    });
    
    if (!batch || batch.length === 0) return null;
    
    // Map back to QueueItem format
    const items: QueueItem[] = batch.map(task => this.mapTaskToQueueItem(task));
    
    return {
      batchId: generateBatchId(),
      items,
      mergedCount: items.length,
      queueLenAfter: await this.redisQueue.getQueueLength()
    };
  }

  // ... other QueueRuntime methods
}
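
A sketch of the isCoalescable helper used in enqueue() above. The rule shown here is an assumption drawn from this document (section 5.2 notes that task notifications are not coalescable): plain messages may be merged into one batch, while notifications, approval results, and overlay actions must be delivered individually.

```typescript
type TaskKind = "message" | "task_notification" | "approval_result" | "overlay_action";

// Assumed rule: only plain messages may be coalesced into one batch.
function isCoalescable(kind: TaskKind): boolean {
  return kind === "message";
}
```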

5.2 Task.ts Integration Points

Current Flow (Task.ts line 403+):

// Background task spawning
const { taskId, outputFile, subagentId } = spawnBackgroundSubagentTask({
  subagentType: subagent_type,
  prompt,
  description,
  model,
  toolCallId,
  existingAgentId: args.agent_id,
  existingConversationId: args.conversation_id,
  maxTurns: args.max_turns,
});

Proposed Redis Integration:

// New: Redis-backed task queue integration
interface TaskQueueEnqueueOptions {
  subagentType: string;
  prompt: string;
  description: string;
  model?: string;
  toolCallId?: string;
  existingAgentId?: string;
  existingConversationId?: string;
  maxTurns?: number;
  priority?: number;
  runInBackground?: boolean;
}

// In Task.ts - replace spawnBackgroundSubagentTask with:
export async function enqueueSubagentTask(
  args: TaskQueueEnqueueOptions,
  queue: RedisQueue
): Promise<TaskEnqueueResult> {
  
  const taskId = generateTaskId();
  const subagentId = generateSubagentId();
  
  // Register in subagent state store (for UI)
  registerSubagent(subagentId, args.subagentType, args.description, args.toolCallId, true);
  
  const outputFile = createBackgroundOutputFile(taskId);
  
  // Create task payload
  const payload: TaskPayload = {
    id: taskId,
    kind: "task_notification",
    subagentType: args.subagentType,
    prompt: args.prompt,
    description: args.description,
    model: args.model,
    existingAgentId: args.existingAgentId,
    existingConversationId: args.existingConversationId,
    maxTurns: args.maxTurns,
    subagentId,
    outputFile,
    priority: args.priority ?? 5,
    maxAttempts: 3,
    backoffMultiplier: 2,
    maxBackoffMs: 300000,
    enqueuedAt: Date.now(),
    source: "user",
    isCoalescable: false // Task notifications are not coalescable
  };
  
  // Enqueue to Redis
  await queue.enqueue(payload);
  
  return { taskId, outputFile, subagentId };
}

5.3 Worker Implementation for Subagents

// src/queue/redis/subagent-worker.ts

class SubagentTaskWorker extends TaskWorker {
  protected async processTask(task: TaskState): Promise<void> {
    // Update subagent state to "running"
    updateSubagent(task.subagentId!, { status: "running" });
    
    try {
      // Execute subagent (existing manager.ts logic)
      const result = await spawnSubagent(
        task.subagentType!,
        task.prompt!,
        task.model,
        task.subagentId!,
        undefined, // signal - handled via task cancellation
        task.existingAgentId,
        task.existingConversationId,
        task.maxTurns
      );
      
      // Write transcript
      writeTaskTranscriptResult(task.outputFile!, result, "");
      
      // Complete subagent state
      completeSubagent(task.subagentId!, {
        success: result.success,
        error: result.error,
        totalTokens: result.totalTokens
      });
      
      // Send notification if not silent
      if (!task.silent) {
        const notification = formatTaskNotification({
          taskId: task.id,
          status: result.success ? "completed" : "failed",
          summary: `Agent "${task.description}" ${result.success ? "completed" : "failed"}`,
          result: result.success ? result.report : result.error,
          outputFile: task.outputFile!
        });
        
        // Add to message queue for parent agent
        addToMessageQueue({
          kind: "task_notification",
          text: notification
        });
      }
      
      // Mark task completed
      await this.completeTask(task.id, result);
      
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error);
      
      // Update subagent state
      completeSubagent(task.subagentId!, { success: false, error: errorMessage });
      
      // Fail task (triggers retry logic)
      await this.failTask(task.id, new Error(errorMessage));
    }
  }
}

6. Operational Considerations

6.1 Redis Configuration

# Recommended Redis config for task queue
maxmemory: 1gb
maxmemory-policy: noeviction   # Never silently evict queue keys; an LRU
                               # policy could drop pending tasks or the
                               # stream itself. Cleanup relies on task-hash
                               # TTLs and stream trimming instead.

# Persistence (for durability)
appendonly: yes
appendfsync: everysec

# Stream trimming (prevent unbounded growth)
# Set via XTRIM or MAXLEN on XADD
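
The trimming note above corresponds to these Redis commands; the `~` modifier trims approximately, which is cheaper than an exact MAXLEN, and the 100000 threshold matches the cleanup table in 6.2:

```
# Trim on write: keep roughly the newest 100k entries
XADD ade:queue:tasks MAXLEN ~ 100000 * taskId <uuid> payload <json>

# Or trim periodically out of band
XTRIM ade:queue:tasks MAXLEN ~ 100000
```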

6.2 Key Patterns and Cleanup

| Key Pattern | Type | TTL | Cleanup Strategy |
|---|---|---|---|
| ade:queue:tasks | Stream | - | XTRIM by MAXLEN (keep 100k) |
| ade:queue:delayed | ZSET | - | Processed by promoter |
| ade:task:{id} | Hash | 7 days | Expire completed/failed |
| ade:workers:active | Hash | - | On worker deregistration |
| ade:workers:heartbeat | ZSET | - | On worker timeout |

6.3 Monitoring Metrics

// Metrics to expose via Prometheus/StatsD
interface QueueMetrics {
  // Queue depth
  "ade_queue_pending_total": number;      // XPENDING count
  "ade_queue_delayed_total": number;      // ZCARD ade:queue:delayed
  "ade_queue_stream_length": number;      // XLEN ade:queue:tasks
  
  // Throughput
  "ade_tasks_enqueued_rate": number;      // XADD rate
  "ade_tasks_completed_rate": number;     // Completion rate
  "ade_tasks_failed_rate": number;        // Failure rate
  
  // Worker health
  "ade_workers_active_total": number;     // HLEN ade:workers:active
  "ade_workers_dead_total": number;       // Detected dead workers
  
  // Processing
  "ade_task_duration_ms": Histogram;      // Time from claim to complete
  "ade_task_wait_ms": Histogram;          // Time from enqueue to claim
  "ade_task_attempts": Histogram;         // Distribution of retry counts
}

6.4 Failure Modes

| Scenario | Handling |
|---|---|
| Redis unavailable | Tasks fail immediately; caller responsible for retry |
| Worker crash | Tasks reclaimed via heartbeat timeout (30s) |
| Poison message | Max retries (3), then moved to DLQ |
| Slow task | Processing timeout (5 min) triggers requeue |
| Duplicate task | Idempotent task IDs (UUID) prevent double execution |

7. Migration Strategy

Phase 1: Dual-Write (Week 1)

  • Implement RedisQueueAdapter
  • Write to both in-memory and Redis queues
  • Read from in-memory only (Redis for validation)

Phase 2: Shadow Mode (Week 2)

  • Read from both queues
  • Compare results, log discrepancies
  • Fix any edge cases

Phase 3: Cutover (Week 3)

  • Switch reads to Redis
  • Keep in-memory as fallback
  • Monitor for 1 week

Phase 4: Cleanup (Week 4)

  • Remove in-memory queue code
  • Full Redis dependency

8. Implementation Checklist

  • Redis client configuration (ioredis or node-redis)
  • Task entity schema and serialization
  • Stream consumer group setup
  • Worker registration and heartbeat
  • Task claim and processing loop
  • Retry logic with exponential backoff
  • Delayed task promotion
  • Dead worker detection and reclamation
  • QueueRuntime adapter implementation
  • Task.ts integration
  • Subagent state synchronization
  • Metrics and monitoring
  • Error handling and DLQ
  • Tests (unit, integration, load)
  • Documentation

9. Appendix: Redis Commands Reference

| Operation | Command | Complexity |
|---|---|---|
| Enqueue task | XADD | O(1) |
| Claim tasks | XREADGROUP | O(N), N = count |
| Ack completion | XACK | O(1) |
| Get pending | XPENDING | O(1) |
| Claim pending | XCLAIM / XAUTOCLAIM | O(log N) |
| Delay task | ZADD delayed | O(log N) |
| Promote delayed | ZRANGEBYSCORE + ZREM + XADD | O(log N + M) |
| Register worker | HSET + ZADD | O(1) |
| Heartbeat | ZADD | O(log N) |
| Detect dead | ZRANGEBYSCORE | O(log N + M) |