Merge queue-core: Redis Streams implementation

This commit is contained in:
Ani (Annie Tunturi)
2026-03-18 11:26:40 -04:00
8 changed files with 1824 additions and 0 deletions

13
jest.config.js Normal file
View File

@@ -0,0 +1,13 @@
module.exports = {
preset: 'ts-jest',
testEnvironment: 'node',
roots: ['<rootDir>/tests'],
testMatch: ['**/*.test.ts'],
collectCoverageFrom: [
'src/**/*.ts',
'!src/**/*.d.ts',
],
moduleNameMapper: {
'^@/(.*)$': '<rootDir>/src/$1',
},
};

33
package.json Normal file
View File

@@ -0,0 +1,33 @@
{
"name": "@community-ade/queue-core",
"version": "0.1.0",
"description": "Redis Streams queue implementation for Community ADE",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"build": "tsc",
"test": "jest",
"test:watch": "jest --watch",
"dev": "tsc --watch"
},
"keywords": [
"redis",
"queue",
"streams",
"tasks"
],
"author": "Community ADE Team",
"license": "MIT",
"dependencies": {
"ioredis": "^5.3.2",
"uuid": "^9.0.1"
},
"devDependencies": {
"@types/jest": "^29.5.12",
"@types/node": "^20.11.24",
"@types/uuid": "^9.0.8",
"jest": "^29.7.0",
"ts-jest": "^29.1.2",
"typescript": "^5.9.3"
}
}

28
src/index.ts Normal file
View File

@@ -0,0 +1,28 @@
/**
* Community ADE Queue Core
* Redis Streams implementation with consumer groups
*/
export { default as RedisQueue } from './queue/RedisQueue';
export { default as Worker, WorkerPool } from './queue/Worker';
export {
TaskPayload,
TaskState,
TaskStatus,
TaskKind,
WorkerInfo,
ClaimOptions,
ClaimResult,
RetryConfig,
DEFAULT_RETRY_CONFIG,
TASK_CONSTANTS,
calculateRetryDelay,
serializeTask,
deserializeTask,
serializeTaskForStream,
deserializeTaskFromStream,
getTaskKey,
} from './queue/Task';
export type { RedisQueueOptions, EnqueueResult, TaskStats } from './queue/RedisQueue';
export type { WorkerOptions, TaskHandler, WorkerState } from './queue/Worker';

590
src/queue/RedisQueue.ts Normal file
View File

@@ -0,0 +1,590 @@
/**
* Redis Streams Queue Implementation
* Uses Redis Streams with consumer groups for reliable task processing
*/
import Redis from "ioredis";
import { v4 as uuidv4 } from "uuid";
import {
TaskPayload,
TaskState,
TaskStatus,
ClaimOptions,
serializeTask,
deserializeTask,
serializeTaskForStream,
getTaskKey,
TASK_CONSTANTS,
calculateRetryDelay,
RetryConfig,
} from "./Task";
export interface RedisQueueOptions {
redisUrl?: string;
streamKey?: string;
consumerGroup?: string;
heartbeatTimeoutMs?: number;
processingTimeoutMs?: number;
defaultMaxAttempts?: number;
}
export interface EnqueueResult {
success: boolean;
taskId?: string;
error?: string;
}
export interface TaskStats {
pending: number;
delayed: number;
streamLength: number;
activeWorkers: number;
}
export class RedisQueue {
private redis: Redis;
private streamKey: string;
private consumerGroup: string;
private delayedKey: string;
private workersActiveKey: string;
private workersHeartbeatKey: string;
private heartbeatTimeoutMs: number;
private processingTimeoutMs: number;
private defaultMaxAttempts: number;
constructor(options: RedisQueueOptions = {}) {
this.redis = new Redis(options.redisUrl || "redis://localhost:6379/0");
this.streamKey = options.streamKey || TASK_CONSTANTS.STREAM_KEY;
this.consumerGroup = options.consumerGroup || TASK_CONSTANTS.CONSUMER_GROUP;
this.delayedKey = TASK_CONSTANTS.DELAYED_KEY;
this.workersActiveKey = TASK_CONSTANTS.WORKERS_ACTIVE_KEY;
this.workersHeartbeatKey = TASK_CONSTANTS.WORKERS_HEARTBEAT_KEY;
this.heartbeatTimeoutMs = options.heartbeatTimeoutMs || TASK_CONSTANTS.HEARTBEAT_TIMEOUT_MS;
this.processingTimeoutMs = options.processingTimeoutMs || TASK_CONSTANTS.PROCESSING_TIMEOUT_MS;
this.defaultMaxAttempts = options.defaultMaxAttempts || TASK_CONSTANTS.DEFAULT_MAX_ATTEMPTS;
}
/**
* Initialize the consumer group (idempotent)
*/
async initialize(): Promise<void> {
try {
await this.redis.xgroup("CREATE", this.streamKey, this.consumerGroup, "$", "MKSTREAM");
} catch (err: any) {
// Group already exists - ignore
if (!err.message?.includes("already exists")) {
throw err;
}
}
}
/**
* Enqueue a task to the queue
*/
async enqueue(payload: TaskPayload): Promise<EnqueueResult> {
const taskId = payload.id || uuidv4();
const now = Date.now();
const task: TaskState = {
...payload,
id: taskId,
status: "pending",
attemptCount: 0,
enqueuedAt: payload.enqueuedAt || now,
maxAttempts: payload.maxAttempts || this.defaultMaxAttempts,
backoffMultiplier: payload.backoffMultiplier || 2,
maxBackoffMs: payload.maxBackoffMs || 300000,
priority: payload.priority ?? TASK_CONSTANTS.DEFAULT_PRIORITY,
};
try {
// Store task state in hash
const taskKey = getTaskKey(taskId);
await this.redis.hset(taskKey, serializeTask(task));
// Set TTL for completed/failed tasks
await this.redis.expire(taskKey, TASK_CONSTANTS.TASK_TTL_SECONDS);
// Check if task should be delayed
if (task.runAfter && task.runAfter > now) {
// Add to delayed queue
await this.redis.zadd(this.delayedKey, task.runAfter.toString(), taskId);
} else {
// Add to stream for immediate processing
await this.redis.xadd(
this.streamKey,
"*",
...this.flattenObject(serializeTaskForStream(task))
);
}
return { success: true, taskId };
} catch (err: any) {
return { success: false, error: err.message };
}
}
/**
* Claim tasks from the queue using consumer groups
*/
async claimTasks(
consumerId: string,
options: ClaimOptions = {}
): Promise<TaskState[]> {
const batchSize = options.batchSize || TASK_CONSTANTS.CLAIM_BATCH_SIZE;
const blockMs = options.blockMs || 5000;
try {
const result = await this.redis.xreadgroup(
"GROUP",
this.consumerGroup,
consumerId,
"COUNT",
batchSize,
"BLOCK",
blockMs,
"STREAMS",
this.streamKey,
">"
) as [string, [string, string[]][]][] | null;
if (!result || result.length === 0) {
return [];
}
const messages = result[0][1];
if (!messages || messages.length === 0) {
return [];
}
const tasks: TaskState[] = [];
for (const [messageId, fields] of messages) {
const data = this.unflattenArray(fields as string[]);
const taskId = data.taskId;
if (!taskId) {
continue;
}
// Get full task state from hash
const taskKey = getTaskKey(taskId);
const taskData = await this.redis.hgetall(taskKey);
if (!taskData || Object.keys(taskData).length === 0) {
// Task not found, acknowledge to clean up
await this.redis.xack(this.streamKey, this.consumerGroup, messageId);
continue;
}
const task = deserializeTask(taskData);
task.status = "claimed";
task.workerId = consumerId;
task.startedAt = Date.now();
// Update task state
await this.redis.hset(taskKey, serializeTask(task));
// Store message ID for later acknowledgment
(task as any).__messageId = messageId;
tasks.push(task);
}
return tasks;
} catch (err: any) {
console.error("Error claiming tasks:", err);
return [];
}
}
/**
* Mark a task as completed
*/
async completeTask(taskId: string, result?: unknown): Promise<boolean> {
const taskKey = getTaskKey(taskId);
try {
const taskData = await this.redis.hgetall(taskKey);
if (!taskData || Object.keys(taskData).length === 0) {
return false;
}
const task = deserializeTask(taskData);
task.status = "completed";
task.completedAt = Date.now();
task.result = result;
await this.redis.hset(taskKey, serializeTask(task));
// Acknowledge in stream to remove from pending
await this.acknowledgeTask(taskId);
return true;
} catch (err: any) {
console.error("Error completing task:", err);
return false;
}
}
/**
* Mark a task as failed (with retry logic)
*/
async failTask(taskId: string, error: string): Promise<boolean> {
const taskKey = getTaskKey(taskId);
try {
const taskData = await this.redis.hgetall(taskKey);
if (!taskData || Object.keys(taskData).length === 0) {
return false;
}
const task = deserializeTask(taskData);
const attemptCount = task.attemptCount + 1;
if (attemptCount >= task.maxAttempts) {
// Final failure
task.status = "failed";
task.completedAt = Date.now();
task.error = error;
task.attemptCount = attemptCount;
await this.redis.hset(taskKey, serializeTask(task));
await this.acknowledgeTask(taskId);
} else {
// Schedule retry
const delay = calculateRetryDelay({
attempt: attemptCount,
baseDelayMs: 1000,
multiplier: task.backoffMultiplier,
maxDelayMs: task.maxBackoffMs,
jitterFactor: 0.1,
});
const runAfter = Date.now() + delay;
task.status = "pending";
task.attemptCount = attemptCount;
task.error = error;
task.workerId = undefined;
task.startedAt = undefined;
task.runAfter = runAfter;
await this.redis.hset(taskKey, serializeTask(task));
// Add to delayed queue
await this.redis.zadd(this.delayedKey, runAfter.toString(), taskId);
// Acknowledge to remove from pending
await this.acknowledgeTask(taskId);
}
return true;
} catch (err: any) {
console.error("Error failing task:", err);
return false;
}
}
/**
* Acknowledge a task in the stream (remove from pending)
*/
private async acknowledgeTask(taskId: string): Promise<void> {
// Find and acknowledge the message by task ID
try {
const pending = await this.redis.xpending(
this.streamKey,
this.consumerGroup,
"-",
"+",
100
);
for (const item of pending) {
const [id, , , messageCount] = item as [string, string, number, number];
// Claim and then acknowledge
const claimed = await this.redis.xclaim(
this.streamKey,
this.consumerGroup,
"system",
0,
id
);
if (claimed && claimed.length > 0) {
await this.redis.xack(this.streamKey, this.consumerGroup, id);
}
}
} catch (err) {
// Best effort acknowledgment
}
}
/**
* Promote delayed tasks that are ready to run
*/
async promoteDelayedTasks(batchSize: number = 100): Promise<number> {
const now = Date.now();
try {
// Get tasks that are due
const dueTasks = await this.redis.zrangebyscore(
this.delayedKey,
0,
now,
"LIMIT",
0,
batchSize
);
if (!dueTasks || dueTasks.length === 0) {
return 0;
}
// Remove from delayed queue
await this.redis.zrem(this.delayedKey, ...dueTasks);
// Add to stream
for (const taskId of dueTasks) {
const taskKey = getTaskKey(taskId);
const taskData = await this.redis.hgetall(taskKey);
if (taskData && Object.keys(taskData).length > 0) {
const task = deserializeTask(taskData);
await this.redis.xadd(
this.streamKey,
"*",
...this.flattenObject(serializeTaskForStream(task))
);
}
}
return dueTasks.length;
} catch (err: any) {
console.error("Error promoting delayed tasks:", err);
return 0;
}
}
/**
* Register a worker
*/
async registerWorker(workerId: string, info: {
hostname: string;
pid: number;
version?: string;
}): Promise<void> {
const now = Date.now();
const workerInfo = {
...info,
startedAt: now,
lastHeartbeat: now,
version: info.version || "unknown",
};
await this.redis.hset(
this.workersActiveKey,
workerId,
JSON.stringify(workerInfo)
);
await this.redis.zadd(this.workersHeartbeatKey, now.toString(), workerId);
}
/**
* Update worker heartbeat
*/
async updateHeartbeat(workerId: string): Promise<void> {
const now = Date.now();
await this.redis.zadd(this.workersHeartbeatKey, now.toString(), workerId);
// Update worker info
const existing = await this.redis.hget(this.workersActiveKey, workerId);
if (existing) {
const info = JSON.parse(existing);
info.lastHeartbeat = now;
await this.redis.hset(this.workersActiveKey, workerId, JSON.stringify(info));
}
}
/**
* Deregister a worker
*/
async deregisterWorker(workerId: string): Promise<void> {
await this.redis.hdel(this.workersActiveKey, workerId);
await this.redis.zrem(this.workersHeartbeatKey, workerId);
}
/**
* Detect and reclaim tasks from dead workers
*/
async reclaimDeadWorkerTasks(): Promise<number> {
const now = Date.now();
const cutoff = now - this.heartbeatTimeoutMs;
try {
// Find dead workers
const deadWorkers = await this.redis.zrangebyscore(
this.workersHeartbeatKey,
0,
cutoff
);
let reclaimedCount = 0;
for (const workerId of deadWorkers) {
// Get pending tasks for this worker
const pending = await this.redis.xpending(
this.streamKey,
this.consumerGroup,
"-",
"+",
100
);
for (const item of pending) {
const [id, consumer, idleTime] = item as [string, string, number];
if (consumer === workerId && idleTime > this.processingTimeoutMs) {
// Reclaim the task
const claimed = await this.redis.xclaim(
this.streamKey,
this.consumerGroup,
"system",
this.processingTimeoutMs,
id
);
if (claimed && claimed.length > 0) {
// Acknowledge and retry
await this.redis.xack(this.streamKey, this.consumerGroup, id);
// Get task info and schedule retry
const entry = claimed[0] as [string, string[]];
const fields = entry[1];
const data = this.unflattenArray(fields);
if (data.taskId) {
await this.failTask(data.taskId, "Worker died during processing");
reclaimedCount++;
}
}
}
}
// Clean up dead worker
await this.redis.hdel(this.workersActiveKey, workerId);
await this.redis.zrem(this.workersHeartbeatKey, workerId);
}
return reclaimedCount;
} catch (err: any) {
console.error("Error reclaiming dead worker tasks:", err);
return 0;
}
}
/**
* Get queue statistics
*/
async getStats(): Promise<TaskStats> {
const [pending, delayed, streamLength, activeWorkers] = await Promise.all([
this.redis.xpending(this.streamKey, this.consumerGroup).then(
(result) => (result as [number, string, string, [string, string][]])?.[0] || 0
),
this.redis.zcard(this.delayedKey),
this.redis.xlen(this.streamKey),
this.redis.hlen(this.workersActiveKey),
]);
return {
pending: pending as number,
delayed,
streamLength,
activeWorkers,
};
}
/**
* Get task by ID
*/
async getTask(taskId: string): Promise<TaskState | null> {
const taskKey = getTaskKey(taskId);
const taskData = await this.redis.hgetall(taskKey);
if (!taskData || Object.keys(taskData).length === 0) {
return null;
}
return deserializeTask(taskData);
}
/**
* Cancel a task
*/
async cancelTask(taskId: string): Promise<boolean> {
const taskKey = getTaskKey(taskId);
try {
const taskData = await this.redis.hgetall(taskKey);
if (!taskData || Object.keys(taskData).length === 0) {
return false;
}
const task = deserializeTask(taskData);
if (task.status === "completed" || task.status === "failed") {
return false; // Can't cancel completed/failed tasks
}
task.status = "cancelled";
task.completedAt = Date.now();
await this.redis.hset(taskKey, serializeTask(task));
await this.acknowledgeTask(taskId);
return true;
} catch (err: any) {
console.error("Error cancelling task:", err);
return false;
}
}
/**
* Close the Redis connection
*/
async close(): Promise<void> {
await this.redis.quit();
}
/**
* Helper: Flatten object to array of key-value pairs
*/
private flattenObject(obj: Record<string, string>): string[] {
const result: string[] = [];
for (const [key, value] of Object.entries(obj)) {
result.push(key, value);
}
return result;
}
/**
* Helper: Unflatten array to object
*/
private unflattenArray(arr: string[]): Record<string, string> {
const result: Record<string, string> = {};
for (let i = 0; i < arr.length; i += 2) {
result[arr[i]] = arr[i + 1];
}
return result;
}
}
export default RedisQueue;

247
src/queue/Task.ts Normal file
View File

@@ -0,0 +1,247 @@
/**
* Task types and interfaces for Redis Streams queue
* Based on ade-redis-queue-design.md
*/
export type TaskStatus =
| "pending" // Enqueued, not yet claimed
| "claimed" // Claimed by worker, processing
| "running" // Actively being processed
| "completed" // Successfully finished
| "failed" // Exhausted all retries
| "cancelled"; // Explicitly cancelled
export type TaskKind =
| "message"
| "task_notification"
| "approval_result"
| "overlay_action";
export interface TaskPayload {
// Task identification
id: string; // UUID v4
kind: TaskKind;
// Execution context
agentId?: string;
conversationId?: string;
clientMessageId?: string;
// Content (varies by kind)
content?: unknown; // For "message" kind
text?: string; // For notification/approval/overlay
// Subagent execution params (for task_notification)
subagentType?: string;
prompt?: string;
model?: string;
existingAgentId?: string;
existingConversationId?: string;
maxTurns?: number;
// Scheduling
priority: number; // 0-9, lower = higher priority
runAfter?: number; // Timestamp ms (for delayed tasks)
// Retry configuration
maxAttempts: number;
backoffMultiplier: number; // Default: 2
maxBackoffMs: number; // Default: 300000 (5 min)
// Metadata
enqueuedAt: number;
source: "user" | "system" | "hook";
// Coalescing support
isCoalescable?: boolean;
scopeKey?: string; // For grouping coalescable items
}
export interface TaskState extends TaskPayload {
status: TaskStatus;
workerId?: string;
attemptCount: number;
startedAt?: number;
completedAt?: number;
error?: string;
result?: unknown;
}
export interface WorkerInfo {
workerId: string;
hostname: string;
pid: number;
startedAt: number;
lastHeartbeat: number;
version: string;
}
export interface ClaimOptions {
batchSize?: number;
blockMs?: number;
coalescingWindowMs?: number;
}
export interface ClaimResult {
tasks: TaskState[];
batchId: string;
}
// Retry configuration interface
export interface RetryConfig {
attempt: number; // 0-indexed (0 = first retry)
baseDelayMs: number; // Default: 1000
multiplier: number; // Default: 2
maxDelayMs: number; // Default: 300000 (5 min)
jitterFactor: number; // Default: 0.1 (10% randomization)
}
// Default retry configuration
export const DEFAULT_RETRY_CONFIG: RetryConfig = {
attempt: 0,
baseDelayMs: 1000,
multiplier: 2,
maxDelayMs: 300000,
jitterFactor: 0.1
};
// Task constants
export const TASK_CONSTANTS = {
// Redis keys
STREAM_KEY: "ade:queue:tasks",
DELAYED_KEY: "ade:queue:delayed",
CONSUMER_GROUP: "ade-workers",
WORKERS_ACTIVE_KEY: "ade:workers:active",
WORKERS_HEARTBEAT_KEY: "ade:workers:heartbeat",
// Timing
HEARTBEAT_INTERVAL_MS: 5000,
HEARTBEAT_TIMEOUT_MS: 30000,
CLAIM_BATCH_SIZE: 10,
PROCESSING_TIMEOUT_MS: 300000, // 5 min
// Task state
DEFAULT_MAX_ATTEMPTS: 3,
DEFAULT_PRIORITY: 5,
// TTL
TASK_TTL_SECONDS: 7 * 24 * 60 * 60, // 7 days
} as const;
/**
* Calculate retry delay with exponential backoff and jitter
*/
export function calculateRetryDelay(config: Partial<RetryConfig> = {}): number {
const fullConfig = { ...DEFAULT_RETRY_CONFIG, ...config };
// Exponential backoff: base * (multiplier ^ attempt)
const exponentialDelay = fullConfig.baseDelayMs *
Math.pow(fullConfig.multiplier, fullConfig.attempt);
// Cap at max
const cappedDelay = Math.min(exponentialDelay, fullConfig.maxDelayMs);
// Add jitter to prevent thundering herd: ±jitterFactor
const jitter = cappedDelay * fullConfig.jitterFactor * (Math.random() * 2 - 1);
return Math.floor(cappedDelay + jitter);
}
/**
* Serialize task state for Redis Hash storage
*/
export function serializeTask(task: TaskState): Record<string, string> {
return {
id: task.id,
kind: task.kind,
status: task.status,
agentId: task.agentId ?? "",
conversationId: task.conversationId ?? "",
clientMessageId: task.clientMessageId ?? "",
content: task.content ? JSON.stringify(task.content) : "",
text: task.text ?? "",
subagentType: task.subagentType ?? "",
prompt: task.prompt ?? "",
model: task.model ?? "",
existingAgentId: task.existingAgentId ?? "",
existingConversationId: task.existingConversationId ?? "",
maxTurns: task.maxTurns?.toString() ?? "",
priority: task.priority.toString(),
runAfter: task.runAfter?.toString() ?? "",
maxAttempts: task.maxAttempts.toString(),
backoffMultiplier: task.backoffMultiplier.toString(),
maxBackoffMs: task.maxBackoffMs.toString(),
enqueuedAt: task.enqueuedAt.toString(),
source: task.source,
isCoalescable: task.isCoalescable ? "1" : "0",
scopeKey: task.scopeKey ?? "",
workerId: task.workerId ?? "",
attemptCount: task.attemptCount.toString(),
startedAt: task.startedAt?.toString() ?? "",
completedAt: task.completedAt?.toString() ?? "",
error: task.error ?? "",
result: task.result ? JSON.stringify(task.result) : "",
};
}
/**
* Deserialize task state from Redis Hash storage
*/
export function deserializeTask(data: Record<string, string>): TaskState {
return {
id: data.id,
kind: data.kind as TaskKind,
status: data.status as TaskStatus,
agentId: data.agentId || undefined,
conversationId: data.conversationId || undefined,
clientMessageId: data.clientMessageId || undefined,
content: data.content ? JSON.parse(data.content) : undefined,
text: data.text || undefined,
subagentType: data.subagentType || undefined,
prompt: data.prompt || undefined,
model: data.model || undefined,
existingAgentId: data.existingAgentId || undefined,
existingConversationId: data.existingConversationId || undefined,
maxTurns: data.maxTurns ? parseInt(data.maxTurns) : undefined,
priority: parseInt(data.priority),
runAfter: data.runAfter ? parseInt(data.runAfter) : undefined,
maxAttempts: parseInt(data.maxAttempts),
backoffMultiplier: parseInt(data.backoffMultiplier),
maxBackoffMs: parseInt(data.maxBackoffMs),
enqueuedAt: parseInt(data.enqueuedAt),
source: data.source as "user" | "system" | "hook",
isCoalescable: data.isCoalescable === "1",
scopeKey: data.scopeKey || undefined,
workerId: data.workerId || undefined,
attemptCount: parseInt(data.attemptCount),
startedAt: data.startedAt ? parseInt(data.startedAt) : undefined,
completedAt: data.completedAt ? parseInt(data.completedAt) : undefined,
error: data.error || undefined,
result: data.result ? JSON.parse(data.result) : undefined,
};
}
/**
* Serialize task payload for stream entry
*/
export function serializeTaskForStream(task: TaskPayload): Record<string, string> {
return {
taskId: task.id,
payload: JSON.stringify(task),
priority: task.priority.toString(),
};
}
/**
* Deserialize task from stream entry
*/
export function deserializeTaskFromStream(data: Record<string, string>): TaskPayload {
return JSON.parse(data.payload) as TaskPayload;
}
/**
* Generate Redis key for task state hash
*/
export function getTaskKey(taskId: string): string {
return `ade:task:${taskId}`;
}

384
src/queue/Worker.ts Normal file
View File

@@ -0,0 +1,384 @@
/**
* Worker implementation with heartbeat and task claiming
* Manages the lifecycle of a worker processing tasks from Redis Streams
*/
import { v4 as uuidv4 } from "uuid";
import os from "os";
import RedisQueue from "./RedisQueue";
import {
TaskState,
ClaimOptions,
WorkerInfo,
TASK_CONSTANTS,
} from "./Task";
export interface WorkerOptions {
queue: RedisQueue;
workerId?: string;
heartbeatIntervalMs?: number;
claimIntervalMs?: number;
batchSize?: number;
maxConcurrentTasks?: number;
version?: string;
}
export interface TaskHandler {
(task: TaskState): Promise<unknown>;
}
export type WorkerState =
| "idle"
| "claiming"
| "processing"
| "stopping"
| "stopped";
export class Worker {
public readonly workerId: string;
private queue: RedisQueue;
private heartbeatIntervalMs: number;
private claimIntervalMs: number;
private batchSize: number;
private maxConcurrentTasks: number;
private version: string;
private state: WorkerState = "idle";
private isRunning: boolean = false;
private heartbeatTimer?: NodeJS.Timeout;
private claimTimer?: NodeJS.Timeout;
private taskHandler?: TaskHandler;
private activeTasks: Map<string, TaskState> = new Map();
constructor(options: WorkerOptions) {
this.workerId = options.workerId || this.generateWorkerId();
this.queue = options.queue;
this.heartbeatIntervalMs = options.heartbeatIntervalMs || TASK_CONSTANTS.HEARTBEAT_INTERVAL_MS;
this.claimIntervalMs = options.claimIntervalMs || 1000;
this.batchSize = options.batchSize || TASK_CONSTANTS.CLAIM_BATCH_SIZE;
this.maxConcurrentTasks = options.maxConcurrentTasks || 5;
this.version = options.version || process.env.npm_package_version || "unknown";
}
/**
* Generate a unique worker ID
*/
private generateWorkerId(): string {
const hostname = os.hostname();
const pid = process.pid;
const uuid = uuidv4().slice(0, 8);
return `${hostname}:${pid}:${uuid}`;
}
/**
* Start the worker
*/
async start(taskHandler: TaskHandler): Promise<void> {
if (this.isRunning) {
throw new Error("Worker is already running");
}
this.taskHandler = taskHandler;
this.isRunning = true;
this.state = "idle";
// Register worker with queue
await this.queue.registerWorker(this.workerId, {
hostname: os.hostname(),
pid: process.pid,
version: this.version,
});
// Initialize queue consumer group
await this.queue.initialize();
// Start heartbeat
this.startHeartbeat();
// Start claim loop
this.startClaimLoop();
console.log(`[Worker ${this.workerId}] Started`);
}
/**
* Stop the worker gracefully
*/
async stop(graceful: boolean = true, timeoutMs: number = 30000): Promise<void> {
if (!this.isRunning) {
return;
}
console.log(`[Worker ${this.workerId}] Stopping...`);
this.state = "stopping";
this.isRunning = false;
// Stop timers
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = undefined;
}
if (this.claimTimer) {
clearInterval(this.claimTimer);
this.claimTimer = undefined;
}
// Wait for active tasks to complete if graceful
if (graceful && this.activeTasks.size > 0) {
console.log(`[Worker ${this.workerId}] Waiting for ${this.activeTasks.size} active tasks...`);
const startTime = Date.now();
while (this.activeTasks.size > 0 && Date.now() - startTime < timeoutMs) {
await this.sleep(100);
}
if (this.activeTasks.size > 0) {
console.log(`[Worker ${this.workerId}] Timeout reached, ${this.activeTasks.size} tasks still active`);
}
}
// Deregister worker
await this.queue.deregisterWorker(this.workerId);
this.state = "stopped";
console.log(`[Worker ${this.workerId}] Stopped`);
}
/**
* Get current worker state
*/
getState(): WorkerState {
return this.state;
}
/**
* Get number of active tasks
*/
getActiveTaskCount(): number {
return this.activeTasks.size;
}
/**
* Get worker info
*/
getInfo(): WorkerInfo {
return {
workerId: this.workerId,
hostname: os.hostname(),
pid: process.pid,
startedAt: Date.now(),
lastHeartbeat: Date.now(),
version: this.version,
};
}
/**
* Start the heartbeat loop
*/
private startHeartbeat(): void {
// Send initial heartbeat
this.sendHeartbeat();
// Schedule periodic heartbeats
this.heartbeatTimer = setInterval(() => {
this.sendHeartbeat().catch((err) => {
console.error(`[Worker ${this.workerId}] Heartbeat error:`, err);
});
}, this.heartbeatIntervalMs);
}
/**
* Send a heartbeat
*/
private async sendHeartbeat(): Promise<void> {
try {
await this.queue.updateHeartbeat(this.workerId);
} catch (err) {
console.error(`[Worker ${this.workerId}] Failed to send heartbeat:`, err);
}
}
/**
* Start the task claim loop
*/
private startClaimLoop(): void {
// Immediate first claim
this.claimTasks().catch((err) => {
console.error(`[Worker ${this.workerId}] Initial claim error:`, err);
});
// Schedule periodic claims
this.claimTimer = setInterval(() => {
this.claimTasks().catch((err) => {
console.error(`[Worker ${this.workerId}] Claim error:`, err);
});
}, this.claimIntervalMs);
}
/**
* Claim and process tasks
*/
private async claimTasks(): Promise<void> {
if (!this.isRunning) {
return;
}
// Check if we have capacity for more tasks
const availableSlots = this.maxConcurrentTasks - this.activeTasks.size;
if (availableSlots <= 0) {
return;
}
this.state = "claiming";
try {
const options: ClaimOptions = {
batchSize: Math.min(this.batchSize, availableSlots),
blockMs: 1000,
};
const tasks = await this.queue.claimTasks(this.workerId, options);
if (tasks.length === 0) {
this.state = this.activeTasks.size > 0 ? "processing" : "idle";
return;
}
console.log(`[Worker ${this.workerId}] Claimed ${tasks.length} tasks`);
// Process tasks concurrently
for (const task of tasks) {
this.activeTasks.set(task.id, task);
this.processTask(task).catch((err) => {
console.error(`[Worker ${this.workerId}] Error processing task ${task.id}:`, err);
});
}
this.state = "processing";
} catch (err) {
console.error(`[Worker ${this.workerId}] Error claiming tasks:`, err);
this.state = this.activeTasks.size > 0 ? "processing" : "idle";
}
}
/**
* Process a single task
*/
private async processTask(task: TaskState): Promise<void> {
const taskId = task.id;
console.log(`[Worker ${this.workerId}] Processing task ${taskId} (${task.kind})`);
try {
if (!this.taskHandler) {
throw new Error("No task handler registered");
}
// Execute task handler
const result = await this.taskHandler(task);
// Mark task as completed
await this.queue.completeTask(taskId, result);
console.log(`[Worker ${this.workerId}] Completed task ${taskId}`);
} catch (err: any) {
console.error(`[Worker ${this.workerId}] Task ${taskId} failed:`, err);
// Mark task as failed (triggers retry logic)
await this.queue.failTask(taskId, err.message || String(err));
} finally {
this.activeTasks.delete(taskId);
if (this.activeTasks.size === 0 && this.isRunning) {
this.state = "idle";
}
}
}
/**
* Sleep helper
*/
private sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}
/**
* Worker pool for managing multiple workers
*/
export class WorkerPool {
private workers: Worker[] = [];
private queue: RedisQueue;
private taskHandler: TaskHandler;
private options: Omit<WorkerOptions, "queue" | "taskHandler">;
constructor(
queue: RedisQueue,
taskHandler: TaskHandler,
options: Omit<WorkerOptions, "queue" | "taskHandler"> = {}
) {
this.queue = queue;
this.taskHandler = taskHandler;
this.options = options;
}
/**
* Start the pool with specified number of workers
*/
async start(workerCount: number): Promise<void> {
console.log(`[WorkerPool] Starting ${workerCount} workers...`);
for (let i = 0; i < workerCount; i++) {
const worker = new Worker({
...this.options,
queue: this.queue,
});
await worker.start(this.taskHandler);
this.workers.push(worker);
}
console.log(`[WorkerPool] All ${workerCount} workers started`);
}
/**
* Stop all workers
*/
async stop(graceful: boolean = true, timeoutMs: number = 30000): Promise<void> {
console.log(`[WorkerPool] Stopping ${this.workers.length} workers...`);
await Promise.all(
this.workers.map((worker) => worker.stop(graceful, timeoutMs))
);
this.workers = [];
console.log("[WorkerPool] All workers stopped");
}
/**
* Get stats for all workers
*/
getStats(): {
totalWorkers: number;
activeTasks: number;
workerStates: Record<string, number>;
} {
const workerStates: Record<string, number> = {};
let activeTasks = 0;
for (const worker of this.workers) {
const state = worker.getState();
workerStates[state] = (workerStates[state] || 0) + 1;
activeTasks += worker.getActiveTaskCount();
}
return {
totalWorkers: this.workers.length,
activeTasks,
workerStates,
};
}
}
export default Worker;

View File

@@ -0,0 +1,509 @@
/**
* Unit tests for RedisQueue
* Tests enqueue, claim, complete, fail, and retry functionality
*/
import RedisQueue from "../../src/queue/RedisQueue";
import {
TaskPayload,
TaskState,
TaskStatus,
calculateRetryDelay,
serializeTask,
deserializeTask,
} from "../../src/queue/Task";
// Mock ioredis
jest.mock("ioredis");
// Get the mocked Redis class
const MockedRedis = require("ioredis");
describe("RedisQueue", () => {
let queue: RedisQueue;
let mockRedis: any;
beforeEach(() => {
// Reset mocks
jest.clearAllMocks();
// Create mock Redis instance
mockRedis = {
hset: jest.fn().mockResolvedValue(1),
hget: jest.fn().mockResolvedValue(null),
hgetall: jest.fn().mockResolvedValue({}),
expire: jest.fn().mockResolvedValue(1),
zadd: jest.fn().mockResolvedValue(1),
zrem: jest.fn().mockResolvedValue(1),
zcard: jest.fn().mockResolvedValue(0),
zrangebyscore: jest.fn().mockResolvedValue([]),
xadd: jest.fn().mockResolvedValue("123-0"),
xreadgroup: jest.fn().mockResolvedValue(null),
xgroup: jest.fn().mockResolvedValue("OK"),
xpending: jest.fn().mockResolvedValue([0, null, null, []]),
xack: jest.fn().mockResolvedValue(1),
xclaim: jest.fn().mockResolvedValue([]),
xlen: jest.fn().mockResolvedValue(0),
hlen: jest.fn().mockResolvedValue(0),
hdel: jest.fn().mockResolvedValue(1),
quit: jest.fn().mockResolvedValue("OK"),
};
MockedRedis.mockImplementation(() => mockRedis);
queue = new RedisQueue({ redisUrl: "redis://localhost:6379/0" });
});
afterEach(async () => {
await queue.close();
});
describe("initialize", () => {
it("should create consumer group", async () => {
await queue.initialize();
expect(mockRedis.xgroup).toHaveBeenCalledWith(
"CREATE",
"ade:queue:tasks",
"ade-workers",
"$",
"MKSTREAM"
);
});
it("should not throw if group already exists", async () => {
mockRedis.xgroup.mockRejectedValueOnce(new Error("BUSYGROUP Consumer Group name already exists"));
await expect(queue.initialize()).resolves.not.toThrow();
});
});
describe("enqueue", () => {
const sampleTask: TaskPayload = {
id: "test-task-1",
kind: "task_notification",
priority: 5,
maxAttempts: 3,
backoffMultiplier: 2,
maxBackoffMs: 300000,
enqueuedAt: Date.now(),
source: "user",
text: "Test task",
};
it("should enqueue a task successfully", async () => {
const result = await queue.enqueue(sampleTask);
expect(result.success).toBe(true);
expect(result.taskId).toBe("test-task-1");
expect(mockRedis.hset).toHaveBeenCalled();
expect(mockRedis.expire).toHaveBeenCalled();
expect(mockRedis.xadd).toHaveBeenCalled();
});
it("should handle delayed tasks", async () => {
const delayedTask: TaskPayload = {
...sampleTask,
runAfter: Date.now() + 60000, // 1 minute delay
};
const result = await queue.enqueue(delayedTask);
expect(result.success).toBe(true);
expect(mockRedis.zadd).toHaveBeenCalled();
expect(mockRedis.xadd).not.toHaveBeenCalled();
});
it("should handle errors gracefully", async () => {
mockRedis.hset.mockRejectedValueOnce(new Error("Redis error"));
const result = await queue.enqueue(sampleTask);
expect(result.success).toBe(false);
expect(result.error).toBe("Redis error");
});
it("should generate task ID if not provided", async () => {
const taskWithoutId: Omit<TaskPayload, "id"> = {
kind: "message",
priority: 5,
maxAttempts: 3,
backoffMultiplier: 2,
maxBackoffMs: 300000,
enqueuedAt: Date.now(),
source: "system",
};
const result = await queue.enqueue(taskWithoutId as TaskPayload);
expect(result.success).toBe(true);
expect(result.taskId).toBeDefined();
expect(result.taskId).not.toBe("");
});
});
describe("claimTasks", () => {
const sampleTaskData: TaskState = {
id: "test-task-1",
kind: "task_notification",
status: "pending",
priority: 5,
maxAttempts: 3,
backoffMultiplier: 2,
maxBackoffMs: 300000,
enqueuedAt: Date.now(),
source: "user",
attemptCount: 0,
};
it("should claim tasks from the queue", async () => {
mockRedis.xreadgroup.mockResolvedValueOnce([
["ade:queue:tasks", [
["123-0", ["taskId", "test-task-1", "payload", "{}", "priority", "5"]]
]]
]);
mockRedis.hgetall.mockResolvedValueOnce({
id: "test-task-1",
kind: "task_notification",
status: "pending",
priority: "5",
maxAttempts: "3",
backoffMultiplier: "2",
maxBackoffMs: "300000",
enqueuedAt: Date.now().toString(),
source: "user",
attemptCount: "0",
});
const tasks = await queue.claimTasks("worker-1", { batchSize: 5 });
expect(tasks.length).toBeGreaterThan(0);
expect(tasks[0].id).toBe("test-task-1");
expect(tasks[0].status).toBe("claimed");
expect(tasks[0].workerId).toBe("worker-1");
});
it("should return empty array when no tasks available", async () => {
mockRedis.xreadgroup.mockResolvedValueOnce(null);
const tasks = await queue.claimTasks("worker-1");
expect(tasks).toEqual([]);
});
it("should skip tasks not found in hash", async () => {
mockRedis.xreadgroup.mockResolvedValueOnce([
["ade:queue:tasks", [
["123-0", ["taskId", "missing-task", "payload", "{}", "priority", "5"]]
]]
]);
mockRedis.hgetall.mockResolvedValueOnce({}); // Empty = not found
const tasks = await queue.claimTasks("worker-1");
expect(tasks).toEqual([]);
});
});
describe("completeTask", () => {
it("should mark task as completed", async () => {
const taskId = "test-task-1";
const result = { success: true, data: "completed" };
mockRedis.hgetall.mockResolvedValueOnce({
id: taskId,
kind: "task_notification",
status: "claimed",
priority: "5",
maxAttempts: "3",
backoffMultiplier: "2",
maxBackoffMs: "300000",
enqueuedAt: Date.now().toString(),
source: "user",
attemptCount: "0",
});
const success = await queue.completeTask(taskId, result);
expect(success).toBe(true);
expect(mockRedis.hset).toHaveBeenCalled();
});
it("should return false for non-existent task", async () => {
mockRedis.hgetall.mockResolvedValueOnce({});
const success = await queue.completeTask("missing-task");
expect(success).toBe(false);
});
});
describe("failTask", () => {
it("should mark task as failed on final attempt", async () => {
const taskId = "test-task-1";
mockRedis.hgetall.mockResolvedValueOnce({
id: taskId,
kind: "task_notification",
status: "claimed",
priority: "5",
maxAttempts: "2",
backoffMultiplier: "2",
maxBackoffMs: "300000",
enqueuedAt: Date.now().toString(),
source: "user",
attemptCount: "1", // Will become 2, which equals maxAttempts
});
const success = await queue.failTask(taskId, "Task failed");
expect(success).toBe(true);
expect(mockRedis.hset).toHaveBeenCalled();
// Should set status to failed
const hsetCall = mockRedis.hset.mock.calls[0];
expect(hsetCall[1].status).toBe("failed");
});
it("should schedule retry on non-final failure", async () => {
const taskId = "test-task-1";
mockRedis.hgetall.mockResolvedValueOnce({
id: taskId,
kind: "task_notification",
status: "claimed",
priority: "5",
maxAttempts: "3",
backoffMultiplier: "2",
maxBackoffMs: "300000",
enqueuedAt: Date.now().toString(),
source: "user",
attemptCount: "0", // Will become 1, retry allowed
});
const success = await queue.failTask(taskId, "Temporary error");
expect(success).toBe(true);
// Should add to delayed queue for retry
expect(mockRedis.zadd).toHaveBeenCalled();
expect(mockRedis.hset).toHaveBeenCalled();
// Should set status back to pending
const hsetCall = mockRedis.hset.mock.calls[0];
expect(hsetCall[1].status).toBe("pending");
});
});
describe("promoteDelayedTasks", () => {
it("should promote due delayed tasks", async () => {
const taskId = "delayed-task-1";
mockRedis.zrangebyscore.mockResolvedValueOnce([taskId]);
mockRedis.hgetall.mockResolvedValueOnce({
id: taskId,
kind: "task_notification",
status: "pending",
priority: "5",
maxAttempts: "3",
backoffMultiplier: "2",
maxBackoffMs: "300000",
enqueuedAt: Date.now().toString(),
source: "user",
attemptCount: "1",
});
const count = await queue.promoteDelayedTasks(10);
expect(count).toBe(1);
expect(mockRedis.zrem).toHaveBeenCalled();
expect(mockRedis.xadd).toHaveBeenCalled();
});
it("should return 0 when no delayed tasks", async () => {
mockRedis.zrangebyscore.mockResolvedValueOnce([]);
const count = await queue.promoteDelayedTasks();
expect(count).toBe(0);
expect(mockRedis.xadd).not.toHaveBeenCalled();
});
});
describe("worker registration", () => {
it("should register worker", async () => {
await queue.registerWorker("worker-1", {
hostname: "test-host",
pid: 12345,
version: "1.0.0",
});
expect(mockRedis.hset).toHaveBeenCalled();
expect(mockRedis.zadd).toHaveBeenCalled();
});
it("should update heartbeat", async () => {
await queue.updateHeartbeat("worker-1");
expect(mockRedis.zadd).toHaveBeenCalled();
});
it("should deregister worker", async () => {
await queue.deregisterWorker("worker-1");
expect(mockRedis.hdel).toHaveBeenCalled();
expect(mockRedis.zrem).toHaveBeenCalled();
});
});
describe("getStats", () => {
it("should return queue statistics", async () => {
mockRedis.xpending.mockResolvedValue([5, null, null, []]);
mockRedis.zcard.mockResolvedValue(3);
mockRedis.xlen.mockResolvedValue(100);
mockRedis.hlen.mockResolvedValue(2);
const stats = await queue.getStats();
expect(stats.pending).toBe(5);
expect(stats.delayed).toBe(3);
expect(stats.streamLength).toBe(100);
expect(stats.activeWorkers).toBe(2);
});
});
describe("getTask", () => {
it("should return task by ID", async () => {
const taskId = "test-task-1";
mockRedis.hgetall.mockResolvedValueOnce({
id: taskId,
kind: "task_notification",
status: "pending",
priority: "5",
maxAttempts: "3",
backoffMultiplier: "2",
maxBackoffMs: "300000",
enqueuedAt: "1234567890",
source: "user",
attemptCount: "0",
});
const task = await queue.getTask(taskId);
expect(task).not.toBeNull();
expect(task?.id).toBe(taskId);
expect(task?.kind).toBe("task_notification");
});
it("should return null for non-existent task", async () => {
mockRedis.hgetall.mockResolvedValueOnce({});
const task = await queue.getTask("missing-task");
expect(task).toBeNull();
});
});
describe("cancelTask", () => {
it("should cancel a pending task", async () => {
const taskId = "test-task-1";
mockRedis.hgetall.mockResolvedValueOnce({
id: taskId,
kind: "task_notification",
status: "pending",
priority: "5",
maxAttempts: "3",
backoffMultiplier: "2",
maxBackoffMs: "300000",
enqueuedAt: Date.now().toString(),
source: "user",
attemptCount: "0",
});
const success = await queue.cancelTask(taskId);
expect(success).toBe(true);
const hsetCall = mockRedis.hset.mock.calls[0];
expect(hsetCall[1].status).toBe("cancelled");
});
it("should not cancel completed tasks", async () => {
mockRedis.hgetall.mockResolvedValueOnce({
id: "test-task-1",
kind: "task_notification",
status: "completed",
priority: "5",
maxAttempts: "3",
backoffMultiplier: "2",
maxBackoffMs: "300000",
enqueuedAt: Date.now().toString(),
source: "user",
attemptCount: "0",
completedAt: Date.now().toString(),
});
const success = await queue.cancelTask("test-task-1");
expect(success).toBe(false);
});
});
});
describe("Task utilities", () => {
describe("calculateRetryDelay", () => {
it("should calculate exponential backoff", () => {
const delay0 = calculateRetryDelay({ attempt: 0, baseDelayMs: 1000 });
const delay1 = calculateRetryDelay({ attempt: 1, baseDelayMs: 1000 });
const delay2 = calculateRetryDelay({ attempt: 2, baseDelayMs: 1000 });
expect(delay0).toBeGreaterThanOrEqual(900); // With jitter
expect(delay0).toBeLessThanOrEqual(1100);
expect(delay1).toBeGreaterThanOrEqual(1800);
expect(delay1).toBeLessThanOrEqual(2200);
expect(delay2).toBeGreaterThanOrEqual(3600);
expect(delay2).toBeLessThanOrEqual(4400);
});
it("should cap at maxDelayMs", () => {
const delay = calculateRetryDelay({
attempt: 10,
baseDelayMs: 1000,
maxDelayMs: 5000,
jitterFactor: 0, // Disable jitter for this test
});
expect(delay).toBeLessThanOrEqual(5000);
});
});
describe("serializeTask / deserializeTask", () => {
it("should serialize and deserialize task state", () => {
const task: TaskState = {
id: "test-task",
kind: "task_notification",
status: "pending",
priority: 5,
maxAttempts: 3,
backoffMultiplier: 2,
maxBackoffMs: 300000,
enqueuedAt: 1234567890,
source: "user",
attemptCount: 0,
text: "Test task",
content: { key: "value" },
isCoalescable: true,
};
const serialized = serializeTask(task);
expect(serialized.id).toBe("test-task");
expect(serialized.content).toBe('{"key":"value"}');
expect(serialized.isCoalescable).toBe("1");
const deserialized = deserializeTask(serialized);
expect(deserialized.id).toBe(task.id);
expect(deserialized.kind).toBe(task.kind);
expect(deserialized.status).toBe(task.status);
expect(deserialized.content).toEqual(task.content);
expect(deserialized.isCoalescable).toBe(true);
});
});
});

20
tsconfig.json Normal file
View File

@@ -0,0 +1,20 @@
{
"compilerOptions": {
"target": "ES2020",
"module": "commonjs",
"lib": ["ES2020"],
"outDir": "./dist",
"rootDir": "./src",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"declaration": true,
"declarationMap": true,
"sourceMap": true,
"resolveJsonModule": true,
"moduleResolution": "node"
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist", "tests"]
}