Initial commit: Community ADE foundation
- Project structure: docs/, src/, tests/, proto/
- Research synthesis: Letta vs commercial ADEs
- Architecture: Redis Streams queue design
- Phase 1 orchestration design
- Execution plan and project state tracking
- Working subagent system (manager.ts fixes)

This is the foundation for a Community ADE built on Letta's stateful agent architecture with git-native MemFS.

👾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta Code <noreply@letta.com>
docs/ade-redis-queue-design.md
# Redis Task Queue Architecture for Letta Community ADE

## Executive Summary

This document outlines the architecture for replacing the in-memory `QueueRuntime` with a Redis-backed persistent task queue. The design prioritizes durability, horizontal scalability, and reliable task execution while maintaining compatibility with the existing Task tool and subagent spawning workflows.

**Key Decisions:**

- Use **Redis Streams** (not Sorted Sets) for the primary task queue to leverage consumer groups and at-least-once delivery guarantees
- Hybrid approach: Streams for queue semantics, Sorted Sets for scheduling/delays, Hashes for task state
- Stateless workers with heartbeat-based liveness detection
- Exponential backoff with jitter for retry logic

---

## 1. Redis Data Structures

### 1.1 Primary Queue: Redis Stream

```
Key: ade:queue:tasks
Type: Stream
Purpose: Main task ingestion and distribution
```

**Why Streams over Sorted Sets?**

| Feature | Sorted Sets | Redis Streams |
|---------|-------------|---------------|
| Ordering | Score-based (can have ties) | Strict temporal (millisecond ID) |
| Consumer Groups | Manual implementation | Built-in XREADGROUP |
| Delivery Semantics | At-most-once (easy) / At-least-once (complex) | At-least-once with ACK |
| Pending Tracking | Manual | Built-in XPENDING |
| Claim/Retry | Custom Lua scripts | Built-in XCLAIM/XAUTOCLAIM |
| Message Visibility | Immediate to all | Consumer-group isolated |

Streams provide the exact semantics needed for reliable task processing without custom Lua scripting.

**Stream Entries:**

```
XADD ade:queue:tasks * taskId <uuid> payload <json> priority <int>
```
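Stream entries carry flat string fields, so the task round-trips through JSON. A minimal sketch of the field mapping (helper names are illustrative, not from the codebase):

```typescript
// Hypothetical helpers showing how a task maps to flat XADD fields and back.
interface StreamFields { taskId: string; payload: string; priority: string; }

function toStreamFields(task: { id: string; priority: number; [k: string]: unknown }): StreamFields {
  return {
    taskId: task.id,
    payload: JSON.stringify(task), // full task serialized as a single JSON field
    priority: String(task.priority),
  };
}

function fromStreamFields(fields: StreamFields): { id: string; priority: number } {
  return JSON.parse(fields.payload);
}

const fields = toStreamFields({ id: "t-1", kind: "message", priority: 5 });
const roundTripped = fromStreamFields(fields);
```

Keeping the whole task in one `payload` field keeps the entry schema stable as task fields evolve.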
### 1.2 Delayed Tasks: Sorted Set

```
Key: ade:queue:delayed
Type: Sorted Set (ZSET)
Score: scheduled execution timestamp (ms)
Member: taskId
```

Used for:

- Tasks with explicit `runAfter` timestamps
- Retry scheduling with exponential backoff
- Rate-limited task release

### 1.3 Task State Storage: Redis Hash

```
Key: ade:task:{taskId}
Type: Hash
Fields:
- id: string (UUID v4)
- status: pending|running|completed|failed|cancelled
- payload: JSON (task arguments)
- createdAt: timestamp (ms)
- startedAt: timestamp (ms)
- completedAt: timestamp (ms)
- workerId: string (nullable)
- attemptCount: integer
- maxAttempts: integer (default: 3)
- error: string (last error message)
- result: JSON (completed task result)
- parentTaskId: string (nullable, for task chains)
- subagentId: string (link to subagent state)
- priority: integer (0-9, default 5)
- kind: message|task_notification|approval_result|overlay_action
TTL: 7 days (configurable cleanup for completed/failed tasks)
```
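Redis hash values are strings, so writing this record means building the key and flattening each field. A small sketch (helper names are illustrative, not from the codebase):

```typescript
// Hypothetical sketch: building the hash key and flattening a task record
// into the string fields HSET expects.
function taskKey(taskId: string): string {
  return `ade:task:${taskId}`;
}

function toHashFields(state: Record<string, unknown>): Record<string, string> {
  const fields: Record<string, string> = {};
  for (const [k, v] of Object.entries(state)) {
    if (v === undefined || v === null) continue; // omit empty fields entirely
    fields[k] = typeof v === "string" ? v : JSON.stringify(v);
  }
  return fields;
}

const key = taskKey("3f2a");
const flat = toHashFields({ id: "3f2a", status: "pending", attemptCount: 0, workerId: null });
```

Omitting null fields (rather than storing `"null"`) keeps `HGETALL` results clean to parse.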
### 1.4 Worker Registry: Redis Hash + Sorted Set

```
Key: ade:workers:active
Type: Hash
Fields per worker:
- {workerId}: JSON { hostname, pid, startedAt, lastHeartbeat, version }

Key: ade:workers:heartbeat
Type: Sorted Set
Score: last heartbeat timestamp
Member: workerId
```

### 1.5 Consumer Group State

```
Stream Consumer Group: ade:queue:tasks
Group Name: ade-workers
Consumer Name: {workerId} (unique per process)
```

Redis Streams automatically track:

- Pending messages per consumer (XPENDING)
- Delivery count per message
- Idle time since last read
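This bookkeeping can be inspected directly from the CLI; the summary form reports totals per consumer, and the extended form lists individual pending entries with idle times and delivery counts:

```
XPENDING ade:queue:tasks ade-workers
XPENDING ade:queue:tasks ade-workers - + 10 {workerId}
```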
---

## 2. Task Entity Schema

### 2.1 TypeScript Interface

```typescript
// src/queue/redis/types.ts

export type TaskStatus =
  | "pending"    // Enqueued, not yet claimed
  | "running"    // Claimed by worker, processing
  | "completed"  // Successfully finished
  | "failed"     // Exhausted all retries
  | "cancelled"; // Explicitly cancelled

export type TaskKind =
  | "message"
  | "task_notification"
  | "approval_result"
  | "overlay_action";

export interface TaskPayload {
  // Task identification
  id: string; // UUID v4
  kind: TaskKind;

  // Execution context
  agentId?: string;
  conversationId?: string;
  clientMessageId?: string;

  // Content (varies by kind)
  content?: unknown; // For "message" kind
  text?: string;     // For notification/approval/overlay

  // Subagent execution params (for task_notification)
  subagentType?: string;
  prompt?: string;
  description?: string; // Human-readable task label (see §5)
  model?: string;
  existingAgentId?: string;
  existingConversationId?: string;
  maxTurns?: number;
  subagentId?: string;  // Link to subagent state (see §1.3, §5)
  outputFile?: string;  // Background transcript destination (see §5)
  silent?: boolean;     // Suppress completion notification (see §5.3)

  // Scheduling
  priority: number;  // 0-9, lower = higher priority
  runAfter?: number; // Timestamp ms (for delayed tasks)

  // Retry configuration
  maxAttempts: number;
  backoffMultiplier: number; // Default: 2
  maxBackoffMs: number;      // Default: 300000 (5 min)

  // Coalescing support (from QueueRuntime), set at enqueue time
  isCoalescable: boolean;
  scopeKey?: string; // For grouping coalescable items

  // Metadata
  enqueuedAt: number;
  source: "user" | "system" | "hook";
}

export interface TaskState extends TaskPayload {
  status: TaskStatus;
  workerId?: string;
  attemptCount: number;
  startedAt?: number;
  completedAt?: number;
  error?: string;
  result?: unknown;
}
```

### 2.2 State Transitions

```
              ┌─────────────┐
              │   PENDING   │◄──────────────┐
              │  (queued)   │               │
              └──────┬──────┘               │
                     │ claim                │ retry
                     ▼                      │ (with delay)
              ┌─────────────┐               │
              │   RUNNING   │───────────────┘
              │  (claimed)  │  fail (retryable)
              └──────┬──────┘
        complete     │     fail (final)
           ┌─────────┴─────────┐
           ▼                   ▼
    ┌─────────────┐     ┌─────────────┐
    │  COMPLETED  │     │   FAILED    │
    │             │     │ (exhausted) │
    └─────────────┘     └─────────────┘
```
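The legal transitions can be enforced with a small guard so workers reject out-of-order updates. A sketch (not in the codebase):

```typescript
// Hypothetical guard encoding the diagram's legal transitions.
type Status = "pending" | "running" | "completed" | "failed" | "cancelled";

const legalTransitions: Record<Status, Status[]> = {
  pending: ["running", "cancelled"],
  running: ["completed", "failed", "pending", "cancelled"], // "pending" = retry
  completed: [],
  failed: [],
  cancelled: [],
};

function canTransition(from: Status, to: Status): boolean {
  return legalTransitions[from].includes(to);
}
```

Terminal states map to empty lists, so a stale worker can never resurrect a completed or cancelled task.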
---

## 3. Worker Pool Registration and Heartbeat

### 3.1 Worker Lifecycle

```typescript
// src/queue/redis/worker.ts

import os from "node:os";

class TaskWorker {
  private workerId: string;
  private redis: RedisClient;
  private isRunning: boolean = false;
  private heartbeatInterval?: NodeJS.Timeout;
  private claimInterval?: NodeJS.Timeout;

  // Config
  private readonly HEARTBEAT_INTERVAL_MS = 5000;
  private readonly HEARTBEAT_TIMEOUT_MS = 30000;
  private readonly CLAIM_BATCH_SIZE = 10;
  private readonly PROCESSING_TIMEOUT_MS = 300000; // 5 min

  async start(): Promise<void> {
    this.workerId = generateWorkerId(); // {hostname}:{pid}:{uuid}

    // Register in worker registry
    await this.redis.hSet("ade:workers:active", this.workerId, JSON.stringify({
      hostname: os.hostname(),
      pid: process.pid,
      startedAt: Date.now(),
      lastHeartbeat: Date.now(),
      version: process.env.npm_package_version || "unknown"
    }));

    // Create consumer in stream group (idempotent)
    try {
      await this.redis.xGroupCreate("ade:queue:tasks", "ade-workers", "$", {
        MKSTREAM: true
      });
    } catch (err) {
      // Group already exists - ignore
    }

    this.isRunning = true;
    this.startHeartbeat();
    this.startClaimLoop();
  }

  async stop(): Promise<void> {
    this.isRunning = false;
    clearInterval(this.heartbeatInterval);
    clearInterval(this.claimInterval);

    // Release pending tasks back to queue
    await this.releasePendingTasks();

    // Deregister
    await this.redis.hDel("ade:workers:active", this.workerId);
    await this.redis.zRem("ade:workers:heartbeat", this.workerId);
  }

  private startHeartbeat(): void {
    this.heartbeatInterval = setInterval(async () => {
      await this.redis.zAdd("ade:workers:heartbeat", {
        score: Date.now(),
        value: this.workerId
      });
      // Read the current registry entry, then refresh lastHeartbeat
      const raw = await this.redis.hGet("ade:workers:active", this.workerId);
      const currentInfo = raw ? JSON.parse(raw) : {};
      await this.redis.hSet("ade:workers:active", this.workerId, JSON.stringify({
        ...currentInfo,
        lastHeartbeat: Date.now()
      }));
    }, this.HEARTBEAT_INTERVAL_MS);
  }
}
```
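`generateWorkerId` is referenced but not defined above; one possible shape matching the `{hostname}:{pid}:{uuid}` comment (an assumption, not the codebase's implementation):

```typescript
import os from "node:os";
import { randomUUID } from "node:crypto";

// Sketch of the worker-ID format referenced above: {hostname}:{pid}:{uuid}.
// The UUID suffix keeps IDs unique even if a host reuses a PID after restart,
// which matters because consumer names must be unique within the group.
function generateWorkerId(): string {
  return `${os.hostname()}:${process.pid}:${randomUUID()}`;
}

const id = generateWorkerId();
```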
### 3.2 Dead Worker Detection

```typescript
// src/queue/redis/orchestrator.ts (singleton, per-deployment)

class QueueOrchestrator {
  async detectAndReclaimDeadWorkerTasks(): Promise<number> {
    const now = Date.now();
    const cutoff = now - this.HEARTBEAT_TIMEOUT_MS;

    // Find dead workers
    const deadWorkers = await this.redis.zRangeByScore(
      "ade:workers:heartbeat",
      "-inf",
      cutoff
    );

    let reclaimedCount = 0;

    for (const workerId of deadWorkers) {
      // Find pending tasks for this worker using XPENDING
      const pending = await this.redis.xPendingRange(
        "ade:queue:tasks",
        "ade-workers",
        "-",
        "+",
        this.CLAIM_BATCH_SIZE
      );

      for (const item of pending) {
        if (item.consumer === workerId && item.idle > this.PROCESSING_TIMEOUT_MS) {
          // Use XAUTOCLAIM to atomically claim the entry away from the dead consumer
          await this.redis.xAutoClaim(
            "ade:queue:tasks",
            "ade-workers",
            "orchestrator", // consumer name for cleanup
            this.PROCESSING_TIMEOUT_MS,
            item.id,
            { COUNT: 1 }
          );

          // Release back to pending by ACKing (removes from pending list);
          // the orchestrator then re-adds the task to the delayed queue for retry
          await this.redis.xAck("ade:queue:tasks", "ade-workers", item.id);
          await this.scheduleRetry(item.id);
          reclaimedCount++;
        }
      }

      // Clean up dead worker registration
      await this.redis.hDel("ade:workers:active", workerId);
      await this.redis.zRem("ade:workers:heartbeat", workerId);
    }

    return reclaimedCount;
  }
}
```
---

## 4. Retry Logic with Exponential Backoff

### 4.1 Backoff Calculation

```typescript
// src/queue/redis/retry.ts

interface RetryConfig {
  attempt: number;      // 0-indexed (0 = first retry)
  baseDelayMs: number;  // Default: 1000
  multiplier: number;   // Default: 2
  maxDelayMs: number;   // Default: 300000 (5 min)
  jitterFactor: number; // Default: 0.1 (10% randomization)
}

function calculateRetryDelay(config: RetryConfig): number {
  // Exponential backoff: base * (multiplier ^ attempt)
  const exponentialDelay = config.baseDelayMs *
    Math.pow(config.multiplier, config.attempt);

  // Cap at max
  const cappedDelay = Math.min(exponentialDelay, config.maxDelayMs);

  // Add jitter to prevent thundering herd: ±jitterFactor
  const jitter = cappedDelay * config.jitterFactor * (Math.random() * 2 - 1);

  return Math.floor(cappedDelay + jitter);
}

// Examples with defaults:
// Attempt 0 (first retry): ~1000ms ±100ms
// Attempt 1: ~2000ms ±200ms
// Attempt 2: ~4000ms ±400ms
// Attempt 3: ~8000ms ±800ms
// Attempt 4: ~16000ms ±1600ms
// ...up to max 300000ms (5 min)
```
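With `jitterFactor` set to 0 the schedule in the comment above becomes exact and can be checked directly. A self-contained check (the function is repeated here so the snippet runs standalone):

```typescript
// Self-contained copy of calculateRetryDelay for a deterministic check.
function calculateRetryDelay(config: {
  attempt: number; baseDelayMs: number; multiplier: number;
  maxDelayMs: number; jitterFactor: number;
}): number {
  const exponentialDelay = config.baseDelayMs * Math.pow(config.multiplier, config.attempt);
  const cappedDelay = Math.min(exponentialDelay, config.maxDelayMs);
  const jitter = cappedDelay * config.jitterFactor * (Math.random() * 2 - 1);
  return Math.floor(cappedDelay + jitter);
}

// jitterFactor 0 makes the schedule exact: 1s, 2s, 4s, 8s, ... capped at 5 min.
const delays = [0, 1, 2, 3, 9].map(attempt =>
  calculateRetryDelay({ attempt, baseDelayMs: 1000, multiplier: 2, maxDelayMs: 300000, jitterFactor: 0 })
);
// delays → [1000, 2000, 4000, 8000, 300000]
```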
### 4.2 Retry Flow

```typescript
async function handleTaskFailure(
  taskId: string,
  streamEntryId: string, // stream entry ID (XACK takes entry IDs, not task UUIDs)
  error: Error,
  workerId: string
): Promise<void> {
  const taskKey = `ade:task:${taskId}`;
  const task = await redis.hGetAll(taskKey);

  const attemptCount = parseInt(task.attemptCount, 10) + 1;
  const maxAttempts = parseInt(task.maxAttempts, 10);

  if (attemptCount >= maxAttempts) {
    // Final failure - mark as failed
    await redis.hSet(taskKey, {
      status: "failed",
      error: error.message,
      completedAt: Date.now(),
      attemptCount: attemptCount.toString()
    });

    // Publish failure event for observers
    await redis.publish("ade:events:task-failed", JSON.stringify({
      taskId,
      error: error.message,
      totalAttempts: attemptCount
    }));

    // ACK to remove from pending
    await redis.xAck("ade:queue:tasks", "ade-workers", streamEntryId);
  } else {
    // Schedule retry
    const delay = calculateRetryDelay({
      attempt: attemptCount,
      baseDelayMs: 1000,
      multiplier: 2,
      maxDelayMs: 300000,
      jitterFactor: 0.1
    });

    const runAfter = Date.now() + delay;

    // Update task state
    await redis.hSet(taskKey, {
      status: "pending",
      attemptCount: attemptCount.toString(),
      error: error.message,
      workerId: "" // Clear worker assignment
    });

    // Add to delayed queue
    await redis.zAdd("ade:queue:delayed", {
      score: runAfter,
      value: taskId
    });

    // ACK to remove from stream pending
    await redis.xAck("ade:queue:tasks", "ade-workers", streamEntryId);
  }
}
```
### 4.3 Delayed Task Promoter

```typescript
// Runs periodically (every 1 second) to move due tasks from the delayed set
// to the stream. Run a single promoter instance (or wrap the read-and-remove
// in a Lua script) so two promoters cannot double-promote the same task.

async function promoteDelayedTasks(): Promise<number> {
  const now = Date.now();

  // Get due tasks (read, then remove below)
  const dueTasks = await redis.zRangeByScore(
    "ade:queue:delayed",
    "-inf",
    now,
    { LIMIT: { offset: 0, count: 100 } }
  );

  if (dueTasks.length === 0) return 0;

  // Remove from delayed queue
  await redis.zRem("ade:queue:delayed", dueTasks);

  // Re-add to stream for processing
  for (const taskId of dueTasks) {
    const task = await redis.hGetAll(`ade:task:${taskId}`);
    await redis.xAdd("ade:queue:tasks", "*", {
      taskId,
      payload: task.payload,
      priority: task.priority
    });
  }

  return dueTasks.length;
}
```
---

## 5. Integration with Existing Task.ts

### 5.1 Adapter Pattern

```typescript
// src/queue/redis/adapter.ts

import { QueueRuntime, QueueItem, DequeuedBatch } from "../queueRuntime";
import { RedisQueue } from "./queue";

/**
 * Redis-backed implementation of the QueueRuntime interface.
 * Allows drop-in replacement of the in-memory queue.
 */
export class RedisQueueAdapter implements QueueRuntime {
  private redisQueue: RedisQueue;
  private workerId: string;
  private localBatchBuffer: Map<string, QueueItem> = new Map();

  constructor(redisUrl: string, options?: QueueRuntimeOptions) {
    this.redisQueue = new RedisQueue(redisUrl, {
      ...options,
      onTaskCompleted: this.handleTaskCompleted.bind(this),
      onTaskFailed: this.handleTaskFailed.bind(this)
    });
  }

  async enqueue(input: Omit<QueueItem, "id" | "enqueuedAt">): Promise<QueueItem | null> {
    // Map QueueItem to TaskPayload
    const taskId = generateUUID();
    const enqueuedAt = Date.now();

    const payload: TaskPayload = {
      id: taskId,
      kind: input.kind,
      agentId: input.agentId,
      conversationId: input.conversationId,
      clientMessageId: input.clientMessageId,
      text: (input as any).text,
      content: (input as any).content,
      priority: 5, // Default priority
      maxAttempts: 3,
      backoffMultiplier: 2,
      maxBackoffMs: 300000,
      enqueuedAt,
      source: "user",
      isCoalescable: isCoalescable(input.kind)
    };

    const success = await this.redisQueue.enqueue(payload);
    if (!success) return null;

    return {
      ...input,
      id: taskId,
      enqueuedAt
    } as QueueItem;
  }

  async tryDequeue(blockedReason: QueueBlockedReason | null): Promise<DequeuedBatch | null> {
    if (blockedReason !== null) {
      // Emit blocked event if needed (preserving QueueRuntime behavior)
      return null;
    }

    // Claim batch from Redis
    const batch = await this.redisQueue.claimBatch({
      consumerId: this.workerId,
      batchSize: this.getCoalescingBatchSize(),
      coalescingWindowMs: 50 // Small window for coalescing
    });

    if (!batch || batch.length === 0) return null;

    // Map back to QueueItem format
    const items: QueueItem[] = batch.map(task => this.mapTaskToQueueItem(task));

    return {
      batchId: generateBatchId(),
      items,
      mergedCount: items.length,
      queueLenAfter: await this.redisQueue.getQueueLength()
    };
  }

  // ... other QueueRuntime methods
}
```
### 5.2 Task.ts Integration Points

**Current Flow (Task.ts line 403+):**

```typescript
// Background task spawning
const { taskId, outputFile, subagentId } = spawnBackgroundSubagentTask({
  subagentType: subagent_type,
  prompt,
  description,
  model,
  toolCallId,
  existingAgentId: args.agent_id,
  existingConversationId: args.conversation_id,
  maxTurns: args.max_turns,
});
```

**Proposed Redis Integration:**

```typescript
// New: Redis-backed task queue integration
interface TaskQueueEnqueueOptions {
  subagentType: string;
  prompt: string;
  description: string;
  model?: string;
  toolCallId?: string;
  existingAgentId?: string;
  existingConversationId?: string;
  maxTurns?: number;
  priority?: number;
  runInBackground?: boolean;
}

// In Task.ts - replace spawnBackgroundSubagentTask with:
export async function enqueueSubagentTask(
  args: TaskQueueEnqueueOptions,
  queue: RedisQueue
): Promise<TaskEnqueueResult> {
  const taskId = generateTaskId();
  const subagentId = generateSubagentId();

  // Register in subagent state store (for UI)
  registerSubagent(subagentId, args.subagentType, args.description, args.toolCallId, true);

  const outputFile = createBackgroundOutputFile(taskId);

  // Create task payload
  const payload: TaskPayload = {
    id: taskId,
    kind: "task_notification",
    subagentType: args.subagentType,
    prompt: args.prompt,
    description: args.description,
    model: args.model,
    existingAgentId: args.existingAgentId,
    existingConversationId: args.existingConversationId,
    maxTurns: args.maxTurns,
    subagentId,
    outputFile,
    priority: args.priority ?? 5,
    maxAttempts: 3,
    backoffMultiplier: 2,
    maxBackoffMs: 300000,
    enqueuedAt: Date.now(),
    source: "user",
    isCoalescable: false // Task notifications are not coalescable
  };

  // Enqueue to Redis
  await queue.enqueue(payload);

  return { taskId, outputFile, subagentId };
}
```

### 5.3 Worker Implementation for Subagents

```typescript
// src/queue/redis/subagent-worker.ts

class SubagentTaskWorker extends TaskWorker {
  protected async processTask(task: TaskState): Promise<void> {
    // Update subagent state to "running"
    updateSubagent(task.subagentId!, { status: "running" });

    try {
      // Execute subagent (existing manager.ts logic)
      const result = await spawnSubagent(
        task.subagentType!,
        task.prompt!,
        task.model,
        task.subagentId!,
        undefined, // signal - handled via task cancellation
        task.existingAgentId,
        task.existingConversationId,
        task.maxTurns
      );

      // Write transcript
      writeTaskTranscriptResult(task.outputFile!, result, "");

      // Complete subagent state
      completeSubagent(task.subagentId!, {
        success: result.success,
        error: result.error,
        totalTokens: result.totalTokens
      });

      // Send notification if not silent
      if (!task.silent) {
        const notification = formatTaskNotification({
          taskId: task.id,
          status: result.success ? "completed" : "failed",
          summary: `Agent "${task.description}" ${result.success ? "completed" : "failed"}`,
          result: result.success ? result.report : result.error,
          outputFile: task.outputFile!
        });

        // Add to message queue for parent agent
        addToMessageQueue({
          kind: "task_notification",
          text: notification
        });
      }

      // Mark task completed
      await this.completeTask(task.id, result);

    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error);

      // Update subagent state
      completeSubagent(task.subagentId!, { success: false, error: errorMessage });

      // Fail task (triggers retry logic)
      await this.failTask(task.id, new Error(errorMessage));
    }
  }
}
```
---

## 6. Operational Considerations

### 6.1 Redis Configuration

```yaml
# Recommended Redis config for task queue
maxmemory: 1gb
maxmemory-policy: volatile-lru # Evict only TTL'd keys (completed/failed task hashes), never the live queue

# Persistence (for durability)
appendonly: yes
appendfsync: everysec

# Stream trimming (prevent unbounded growth)
# Set via XTRIM or MAXLEN on XADD
```
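The trimming mentioned above can be applied at write time or out of band; the approximate form (`~`) lets Redis trim in whole macro-nodes, which is cheaper than exact trimming:

```
XADD ade:queue:tasks MAXLEN ~ 100000 * taskId <uuid> payload <json> priority <int>
XTRIM ade:queue:tasks MAXLEN ~ 100000
```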
### 6.2 Key Patterns and Cleanup

| Key Pattern | Type | TTL | Cleanup Strategy |
|-------------|------|-----|------------------|
| `ade:queue:tasks` | Stream | - | XTRIM by MAXLEN (keep 100k) |
| `ade:queue:delayed` | ZSET | - | Processed by promoter |
| `ade:task:{id}` | Hash | 7 days | Expire completed/failed |
| `ade:workers:active` | Hash | - | On worker deregistration |
| `ade:workers:heartbeat` | ZSET | - | On worker timeout |

### 6.3 Monitoring Metrics

```typescript
// Metrics to expose via Prometheus/StatsD
interface QueueMetrics {
  // Queue depth
  "ade_queue_pending_total": number;  // XPENDING count
  "ade_queue_delayed_total": number;  // ZCARD ade:queue:delayed
  "ade_queue_stream_length": number;  // XLEN ade:queue:tasks

  // Throughput
  "ade_tasks_enqueued_rate": number;  // XADD rate
  "ade_tasks_completed_rate": number; // Completion rate
  "ade_tasks_failed_rate": number;    // Failure rate

  // Worker health
  "ade_workers_active_total": number; // HLEN ade:workers:active
  "ade_workers_dead_total": number;   // Detected dead workers

  // Processing
  "ade_task_duration_ms": Histogram;  // Time from claim to complete
  "ade_task_wait_ms": Histogram;      // Time from enqueue to claim
  "ade_task_attempts": Histogram;     // Distribution of retry counts
}
```
### 6.4 Failure Modes

| Scenario | Handling |
|----------|----------|
| Redis unavailable | Tasks fail immediately; caller responsible for retry |
| Worker crash | Tasks reclaimed via heartbeat timeout (30s) |
| Poison message | Max retries (3) then moved to DLQ |
| Slow task | Processing timeout (5 min) triggers requeue |
| Duplicate delivery | At-least-once semantics; handlers must be idempotent, keyed by task UUID |

---
## 7. Migration Strategy

### Phase 1: Dual-Write (Week 1)

- Implement RedisQueueAdapter
- Write to both in-memory and Redis queues
- Read from in-memory only (Redis for validation)
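A minimal sketch of the Phase 1 wrapper (interface and class names are illustrative, not from the codebase): writes fan out to both queues, reads come only from the in-memory primary.

```typescript
// Hypothetical Phase 1 dual-write wrapper: the in-memory queue stays
// authoritative; Redis receives a shadow copy for validation only.
interface SimpleQueue<T> {
  enqueue(item: T): Promise<void>;
  dequeue(): Promise<T | undefined>;
}

class DualWriteQueue<T> implements SimpleQueue<T> {
  constructor(
    private primary: SimpleQueue<T>, // in-memory (reads + writes)
    private shadow: SimpleQueue<T>   // Redis (writes only, for validation)
  ) {}

  async enqueue(item: T): Promise<void> {
    await this.primary.enqueue(item);
    // Shadow failures are logged, never surfaced: Redis is not yet authoritative
    await this.shadow.enqueue(item).catch(err => console.warn("shadow write failed", err));
  }

  dequeue(): Promise<T | undefined> {
    return this.primary.dequeue(); // Phase 1: read from in-memory only
  }
}
```

Swallowing shadow-write errors is deliberate: during Phase 1 a Redis outage must not affect the production path.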
### Phase 2: Shadow Mode (Week 2)

- Read from both queues
- Compare results, log discrepancies
- Fix any edge cases

### Phase 3: Cutover (Week 3)

- Switch reads to Redis
- Keep in-memory as fallback
- Monitor for 1 week

### Phase 4: Cleanup (Week 4)

- Remove in-memory queue code
- Full Redis dependency

---

## 8. Implementation Checklist

- [ ] Redis client configuration (ioredis or node-redis)
- [ ] Task entity schema and serialization
- [ ] Stream consumer group setup
- [ ] Worker registration and heartbeat
- [ ] Task claim and processing loop
- [ ] Retry logic with exponential backoff
- [ ] Delayed task promotion
- [ ] Dead worker detection and reclamation
- [ ] QueueRuntime adapter implementation
- [ ] Task.ts integration
- [ ] Subagent state synchronization
- [ ] Metrics and monitoring
- [ ] Error handling and DLQ
- [ ] Tests (unit, integration, load)
- [ ] Documentation

---

## 9. Appendix: Redis Commands Reference

| Operation | Command | Complexity |
|-----------|---------|------------|
| Enqueue task | `XADD` | O(1) |
| Claim tasks | `XREADGROUP` | O(N), N = count |
| Ack completion | `XACK` | O(1) |
| Get pending | `XPENDING` | O(1) |
| Claim pending | `XCLAIM` / `XAUTOCLAIM` | O(log N) |
| Delay task | `ZADD` delayed | O(log N) |
| Promote delayed | `ZRANGEBYSCORE` + `ZREM` + `XADD` | O(log N + M) |
| Register worker | `HSET` + `ZADD` | O(1) |
| Heartbeat | `ZADD` | O(log N) |
| Detect dead | `ZRANGEBYSCORE` | O(log N + M) |