Files
community-ade/tests/queue/RedisQueue.test.ts
Ani (Annie Tunturi) 02384e7520 queue-core: Redis Streams implementation
- Redis Streams implementation with consumer groups (ade-workers)
- Task interface with serialization/deserialization
- Worker with heartbeat (5s) and task claiming
- Retry logic with exponential backoff and jitter
- Delayed task support via Sorted Sets
- Dead worker reclamation (30s timeout)
- Unit tests: 26 passing
- TypeScript compilation successful
2026-03-18 10:46:27 -04:00

510 lines
15 KiB
TypeScript

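/**
* Unit tests for RedisQueue and the Task helper utilities.
* Covers initialization, enqueue, claim, complete, fail/retry, delayed-task
* promotion, worker registration, stats, task lookup, and cancellation, plus
* calculateRetryDelay and task serialization/deserialization.
*/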
import RedisQueue from "../../src/queue/RedisQueue";
import {
TaskPayload,
TaskState,
TaskStatus,
calculateRetryDelay,
serializeTask,
deserializeTask,
} from "../../src/queue/Task";
// Mock ioredis
jest.mock("ioredis");
// Get the mocked Redis class
const MockedRedis = require("ioredis");
describe("RedisQueue", () => {
let queue: RedisQueue;
let mockRedis: any;
// Every test starts from a brand-new Redis stub and a brand-new queue so that
// recorded calls and one-off overrides never leak from one case into the next.
beforeEach(() => {
  // Forget call history recorded by earlier tests.
  jest.clearAllMocks();

  // Builds a stub command whose returned promise resolves to `value`.
  const resolvesTo = (value: unknown) => jest.fn().mockResolvedValue(value);

  // Stand-in for the ioredis client. Each command resolves to a neutral
  // default; individual tests override a command with mockResolvedValueOnce /
  // mockRejectedValueOnce when they need a specific reply.
  mockRedis = {
    // Hash commands
    hset: resolvesTo(1),
    hget: resolvesTo(null),
    hgetall: resolvesTo({}),
    hlen: resolvesTo(0),
    hdel: resolvesTo(1),
    // Key expiry
    expire: resolvesTo(1),
    // Sorted-set commands
    zadd: resolvesTo(1),
    zrem: resolvesTo(1),
    zcard: resolvesTo(0),
    zrangebyscore: resolvesTo([]),
    // Stream commands
    xadd: resolvesTo("123-0"),
    xreadgroup: resolvesTo(null),
    xgroup: resolvesTo("OK"),
    xpending: resolvesTo([0, null, null, []]),
    xack: resolvesTo(1),
    xclaim: resolvesTo([]),
    xlen: resolvesTo(0),
    // Connection teardown
    quit: resolvesTo("OK"),
  };

  // Any `new Redis(...)` performed while constructing the queue yields the stub.
  MockedRedis.mockImplementation(() => mockRedis);

  queue = new RedisQueue({ redisUrl: "redis://localhost:6379/0" });
});

// Shut the queue down after each test so nothing stays open between cases.
afterEach(async () => {
  await queue.close();
});
// initialize(): consumer-group creation on the task stream.
describe("initialize", () => {
  it("should create consumer group", async () => {
    // Exact XGROUP arguments the queue is expected to send.
    const expectedArgs = ["CREATE", "ade:queue:tasks", "ade-workers", "$", "MKSTREAM"];

    await queue.initialize();

    expect(mockRedis.xgroup).toHaveBeenCalledWith(...expectedArgs);
  });

  it("should not throw if group already exists", async () => {
    // Simulate the group-already-exists error on the first XGROUP call.
    const busyGroupError = new Error("BUSYGROUP Consumer Group name already exists");
    mockRedis.xgroup.mockRejectedValueOnce(busyGroupError);

    const initializing = queue.initialize();

    await expect(initializing).resolves.not.toThrow();
  });
});
// enqueue(): immediate tasks, delayed tasks, error reporting, and ID generation.
describe("enqueue", () => {
  // Baseline task with no runAfter; the cases below reuse or extend it.
  const sampleTask: TaskPayload = {
    id: "test-task-1",
    kind: "task_notification",
    priority: 5,
    maxAttempts: 3,
    backoffMultiplier: 2,
    maxBackoffMs: 300000,
    enqueuedAt: Date.now(),
    source: "user",
    text: "Test task",
  };

  it("should enqueue a task successfully", async () => {
    const outcome = await queue.enqueue(sampleTask);

    expect(outcome.success).toBe(true);
    expect(outcome.taskId).toBe("test-task-1");

    // The hash write, the expiry, and the stream append must all have been issued.
    expect(mockRedis.hset).toHaveBeenCalled();
    expect(mockRedis.expire).toHaveBeenCalled();
    expect(mockRedis.xadd).toHaveBeenCalled();
  });

  it("should handle delayed tasks", async () => {
    const oneMinuteMs = 60000;
    const delayedTask: TaskPayload = {
      ...sampleTask,
      runAfter: Date.now() + oneMinuteMs, // 1 minute delay
    };

    const outcome = await queue.enqueue(delayedTask);

    expect(outcome.success).toBe(true);
    // A delayed task is expected in the sorted set and NOT on the stream.
    expect(mockRedis.zadd).toHaveBeenCalled();
    expect(mockRedis.xadd).not.toHaveBeenCalled();
  });

  it("should handle errors gracefully", async () => {
    // First hash write fails; enqueue should report it instead of throwing.
    const redisFailure = new Error("Redis error");
    mockRedis.hset.mockRejectedValueOnce(redisFailure);

    const outcome = await queue.enqueue(sampleTask);

    expect(outcome.success).toBe(false);
    expect(outcome.error).toBe("Redis error");
  });

  it("should generate task ID if not provided", async () => {
    // Deliberately built without an `id`; the cast below lets it through the type check.
    const idlessTask: Omit<TaskPayload, "id"> = {
      kind: "message",
      priority: 5,
      maxAttempts: 3,
      backoffMultiplier: 2,
      maxBackoffMs: 300000,
      enqueuedAt: Date.now(),
      source: "system",
    };

    const outcome = await queue.enqueue(idlessTask as TaskPayload);

    expect(outcome.success).toBe(true);
    expect(outcome.taskId).toBeDefined();
    expect(outcome.taskId).not.toBe("");
  });
});
// claimTasks(): reading entries from the stream and resolving them to task state.
describe("claimTasks", () => {
  /**
   * Builds the reply the stubbed XREADGROUP resolves to: one stream
   * ("ade:queue:tasks") holding a single entry "123-0" that points at `taskId`.
   */
  const streamReplyFor = (taskId: string) => [
    ["ade:queue:tasks", [
      ["123-0", ["taskId", taskId, "payload", "{}", "priority", "5"]],
    ]],
  ];

  it("should claim tasks from the queue", async () => {
    mockRedis.xreadgroup.mockResolvedValueOnce(streamReplyFor("test-task-1"));
    // Stored hash for the task; every value is a string, as returned by the hgetall stub.
    mockRedis.hgetall.mockResolvedValueOnce({
      id: "test-task-1",
      kind: "task_notification",
      status: "pending",
      priority: "5",
      maxAttempts: "3",
      backoffMultiplier: "2",
      maxBackoffMs: "300000",
      enqueuedAt: Date.now().toString(),
      source: "user",
      attemptCount: "0",
    });

    const tasks = await queue.claimTasks("worker-1", { batchSize: 5 });

    expect(tasks.length).toBeGreaterThan(0);
    expect(tasks[0].id).toBe("test-task-1");
    // The claimed task must be stamped with the new status and the claiming worker.
    expect(tasks[0].status).toBe("claimed");
    expect(tasks[0].workerId).toBe("worker-1");
  });

  it("should return empty array when no tasks available", async () => {
    // The stub answers null when there is nothing to read.
    mockRedis.xreadgroup.mockResolvedValueOnce(null);

    const tasks = await queue.claimTasks("worker-1");

    expect(tasks).toEqual([]);
  });

  it("should skip tasks not found in hash", async () => {
    mockRedis.xreadgroup.mockResolvedValueOnce(streamReplyFor("missing-task"));
    mockRedis.hgetall.mockResolvedValueOnce({}); // Empty = not found

    const tasks = await queue.claimTasks("worker-1");

    expect(tasks).toEqual([]);
  });
});
// completeTask(): marking a claimed task as done.
describe("completeTask", () => {
  it("should mark task as completed", async () => {
    const taskId = "test-task-1";
    const outcome = { success: true, data: "completed" };

    // Stored hash of a task that is currently claimed.
    const claimedHash = {
      id: taskId,
      kind: "task_notification",
      status: "claimed",
      priority: "5",
      maxAttempts: "3",
      backoffMultiplier: "2",
      maxBackoffMs: "300000",
      enqueuedAt: Date.now().toString(),
      source: "user",
      attemptCount: "0",
    };
    mockRedis.hgetall.mockResolvedValueOnce(claimedHash);

    const completed = await queue.completeTask(taskId, outcome);

    expect(completed).toBe(true);
    expect(mockRedis.hset).toHaveBeenCalled();
  });

  it("should return false for non-existent task", async () => {
    // An empty hash means the task does not exist.
    mockRedis.hgetall.mockResolvedValueOnce({});

    const completed = await queue.completeTask("missing-task");

    expect(completed).toBe(false);
  });
});
// failTask(): terminal failure versus scheduling a retry.
describe("failTask", () => {
  const taskId = "test-task-1";

  /**
   * Stored hash of a claimed task; only the attempt budget and the attempts
   * already used differ between the two cases below.
   */
  const claimedHash = (maxAttempts: string, attemptCount: string) => ({
    id: taskId,
    kind: "task_notification",
    status: "claimed",
    priority: "5",
    maxAttempts,
    backoffMultiplier: "2",
    maxBackoffMs: "300000",
    enqueuedAt: Date.now().toString(),
    source: "user",
    attemptCount,
  });

  it("should mark task as failed on final attempt", async () => {
    // attemptCount "1" will become 2, which equals maxAttempts "2".
    mockRedis.hgetall.mockResolvedValueOnce(claimedHash("2", "1"));

    const handled = await queue.failTask(taskId, "Task failed");

    expect(handled).toBe(true);
    expect(mockRedis.hset).toHaveBeenCalled();
    // Second argument of the first hset call carries the written fields.
    const [firstHsetCall] = mockRedis.hset.mock.calls;
    expect(firstHsetCall[1].status).toBe("failed");
  });

  it("should schedule retry on non-final failure", async () => {
    // attemptCount "0" will become 1, so a retry is still allowed with maxAttempts "3".
    mockRedis.hgetall.mockResolvedValueOnce(claimedHash("3", "0"));

    const handled = await queue.failTask(taskId, "Temporary error");

    expect(handled).toBe(true);
    // The retry goes to the delayed queue ...
    expect(mockRedis.zadd).toHaveBeenCalled();
    expect(mockRedis.hset).toHaveBeenCalled();
    // ... and the stored status is set back to pending.
    const [firstHsetCall] = mockRedis.hset.mock.calls;
    expect(firstHsetCall[1].status).toBe("pending");
  });
});
// promoteDelayedTasks(): moving due tasks from the delayed set onto the stream.
describe("promoteDelayedTasks", () => {
  it("should promote due delayed tasks", async () => {
    const taskId = "delayed-task-1";

    // Stored hash of the task waiting in the delayed set (one attempt already used).
    const pendingHash = {
      id: taskId,
      kind: "task_notification",
      status: "pending",
      priority: "5",
      maxAttempts: "3",
      backoffMultiplier: "2",
      maxBackoffMs: "300000",
      enqueuedAt: Date.now().toString(),
      source: "user",
      attemptCount: "1",
    };
    // The range query reports exactly one due task.
    mockRedis.zrangebyscore.mockResolvedValueOnce([taskId]);
    mockRedis.hgetall.mockResolvedValueOnce(pendingHash);

    const promoted = await queue.promoteDelayedTasks(10);

    expect(promoted).toBe(1);
    // Removed from the delayed set and appended to the stream.
    expect(mockRedis.zrem).toHaveBeenCalled();
    expect(mockRedis.xadd).toHaveBeenCalled();
  });

  it("should return 0 when no delayed tasks", async () => {
    mockRedis.zrangebyscore.mockResolvedValueOnce([]);

    const promoted = await queue.promoteDelayedTasks();

    expect(promoted).toBe(0);
    expect(mockRedis.xadd).not.toHaveBeenCalled();
  });
});
// Worker lifecycle: register, heartbeat, deregister.
describe("worker registration", () => {
  const workerId = "worker-1";

  it("should register worker", async () => {
    const workerInfo = {
      hostname: "test-host",
      pid: 12345,
      version: "1.0.0",
    };

    await queue.registerWorker(workerId, workerInfo);

    // Registration is expected to write a hash field and a sorted-set entry.
    expect(mockRedis.hset).toHaveBeenCalled();
    expect(mockRedis.zadd).toHaveBeenCalled();
  });

  it("should update heartbeat", async () => {
    await queue.updateHeartbeat(workerId);

    expect(mockRedis.zadd).toHaveBeenCalled();
  });

  it("should deregister worker", async () => {
    await queue.deregisterWorker(workerId);

    // Deregistration is expected to remove both the hash field and the sorted-set entry.
    expect(mockRedis.hdel).toHaveBeenCalled();
    expect(mockRedis.zrem).toHaveBeenCalled();
  });
});
// getStats(): aggregation of the individual Redis counters.
describe("getStats", () => {
  it("should return queue statistics", async () => {
    // Command name -> reply the stub resolves to for this test.
    const cannedReplies: Array<[string, unknown]> = [
      ["xpending", [5, null, null, []]],
      ["zcard", 3],
      ["xlen", 100],
      ["hlen", 2],
    ];
    for (const [command, reply] of cannedReplies) {
      mockRedis[command].mockResolvedValue(reply);
    }

    const stats = await queue.getStats();

    // Each statistic must mirror the counter stubbed above.
    expect(stats.pending).toBe(5);
    expect(stats.delayed).toBe(3);
    expect(stats.streamLength).toBe(100);
    expect(stats.activeWorkers).toBe(2);
  });
});
// getTask(): lookup of a single task by its ID.
describe("getTask", () => {
  it("should return task by ID", async () => {
    const taskId = "test-task-1";

    // Stored hash as the hgetall stub returns it (all values are strings).
    const storedHash = {
      id: taskId,
      kind: "task_notification",
      status: "pending",
      priority: "5",
      maxAttempts: "3",
      backoffMultiplier: "2",
      maxBackoffMs: "300000",
      enqueuedAt: "1234567890",
      source: "user",
      attemptCount: "0",
    };
    mockRedis.hgetall.mockResolvedValueOnce(storedHash);

    const found = await queue.getTask(taskId);

    expect(found).not.toBeNull();
    expect(found?.id).toBe(taskId);
    expect(found?.kind).toBe("task_notification");
  });

  it("should return null for non-existent task", async () => {
    // An empty hash means the task does not exist.
    mockRedis.hgetall.mockResolvedValueOnce({});

    const found = await queue.getTask("missing-task");

    expect(found).toBeNull();
  });
});
// cancelTask(): pending tasks can be cancelled, completed ones cannot.
describe("cancelTask", () => {
  const taskId = "test-task-1";

  /**
   * Stored hash for `taskId` in the given status; `extraFields` are appended
   * after the common fields.
   */
  const storedHash = (status: string, extraFields: Record<string, string> = {}) => ({
    id: taskId,
    kind: "task_notification",
    status,
    priority: "5",
    maxAttempts: "3",
    backoffMultiplier: "2",
    maxBackoffMs: "300000",
    enqueuedAt: Date.now().toString(),
    source: "user",
    attemptCount: "0",
    ...extraFields,
  });

  it("should cancel a pending task", async () => {
    mockRedis.hgetall.mockResolvedValueOnce(storedHash("pending"));

    const cancelled = await queue.cancelTask(taskId);

    expect(cancelled).toBe(true);
    // Second argument of the first hset call carries the written fields.
    const [firstHsetCall] = mockRedis.hset.mock.calls;
    expect(firstHsetCall[1].status).toBe("cancelled");
  });

  it("should not cancel completed tasks", async () => {
    const completedHash = storedHash("completed", { completedAt: Date.now().toString() });
    mockRedis.hgetall.mockResolvedValueOnce(completedHash);

    const cancelled = await queue.cancelTask("test-task-1");

    expect(cancelled).toBe(false);
  });
});
});
describe("Task utilities", () => {
// calculateRetryDelay(): exponential growth and the upper cap.
describe("calculateRetryDelay", () => {
  it("should calculate exponential backoff", () => {
    // [attempt, lowest accepted delay, highest accepted delay].
    // The windows are 1000 / 2000 / 4000 ms widened by 10% each way to allow for jitter.
    const expectations: Array<[number, number, number]> = [
      [0, 900, 1100],
      [1, 1800, 2200],
      [2, 3600, 4400],
    ];

    for (const [attempt, lowerBound, upperBound] of expectations) {
      const delay = calculateRetryDelay({ attempt, baseDelayMs: 1000 });
      expect(delay).toBeGreaterThanOrEqual(lowerBound);
      expect(delay).toBeLessThanOrEqual(upperBound);
    }
  });

  it("should cap at maxDelayMs", () => {
    const maxDelayMs = 5000;

    const cappedDelay = calculateRetryDelay({
      attempt: 10,
      baseDelayMs: 1000,
      maxDelayMs,
      jitterFactor: 0, // Disable jitter for this test
    });

    expect(cappedDelay).toBeLessThanOrEqual(5000);
  });
});
// serializeTask / deserializeTask: round trip through the flat string form.
describe("serializeTask / deserializeTask", () => {
  it("should serialize and deserialize task state", () => {
    // Includes an object field and a boolean field, whose flat encodings are checked below.
    const original: TaskState = {
      id: "test-task",
      kind: "task_notification",
      status: "pending",
      priority: 5,
      maxAttempts: 3,
      backoffMultiplier: 2,
      maxBackoffMs: 300000,
      enqueuedAt: 1234567890,
      source: "user",
      attemptCount: 0,
      text: "Test task",
      content: { key: "value" },
      isCoalescable: true,
    };

    // Forward direction: the object becomes JSON text and `true` becomes "1".
    const flat = serializeTask(original);
    expect(flat.id).toBe("test-task");
    expect(flat.content).toBe('{"key":"value"}');
    expect(flat.isCoalescable).toBe("1");

    // Reverse direction: the flat form is restored to the original values.
    const restored = deserializeTask(flat);
    expect(restored.id).toBe(original.id);
    expect(restored.kind).toBe(original.kind);
    expect(restored.status).toBe(original.status);
    expect(restored.content).toEqual(original.content);
    expect(restored.isCoalescable).toBe(true);
  });
});
});