From 02384e7520534d59295dd52a87306ba8b6043b5a Mon Sep 17 00:00:00 2001 From: "Ani (Annie Tunturi)" Date: Wed, 18 Mar 2026 10:46:27 -0400 Subject: [PATCH] queue-core: Redis Streams implementation - Redis Streams implementation with consumer groups (ade-workers) - Task interface with serialization/deserialization - Worker with heartbeat (5s) and task claiming - Retry logic with exponential backoff and jitter - Delayed task support via Sorted Sets - Dead worker reclamation (30s timeout) - Unit tests: 26 passing - TypeScript compilation successful --- jest.config.js | 13 + package.json | 33 ++ src/index.ts | 28 ++ src/queue/RedisQueue.ts | 590 +++++++++++++++++++++++++++++++++ src/queue/Task.ts | 247 ++++++++++++++ src/queue/Worker.ts | 384 +++++++++++++++++++++ tests/queue/RedisQueue.test.ts | 509 ++++++++++++++++++++++++++++ tsconfig.json | 20 ++ 8 files changed, 1824 insertions(+) create mode 100644 jest.config.js create mode 100644 package.json create mode 100644 src/index.ts create mode 100644 src/queue/RedisQueue.ts create mode 100644 src/queue/Task.ts create mode 100644 src/queue/Worker.ts create mode 100644 tests/queue/RedisQueue.test.ts create mode 100644 tsconfig.json diff --git a/jest.config.js b/jest.config.js new file mode 100644 index 0000000..7177895 --- /dev/null +++ b/jest.config.js @@ -0,0 +1,13 @@ +module.exports = { + preset: 'ts-jest', + testEnvironment: 'node', + roots: ['/tests'], + testMatch: ['**/*.test.ts'], + collectCoverageFrom: [ + 'src/**/*.ts', + '!src/**/*.d.ts', + ], + moduleNameMapper: { + '^@/(.*)$': '/src/$1', + }, +}; diff --git a/package.json b/package.json new file mode 100644 index 0000000..1005e8f --- /dev/null +++ b/package.json @@ -0,0 +1,33 @@ +{ + "name": "@community-ade/queue-core", + "version": "0.1.0", + "description": "Redis Streams queue implementation for Community ADE", + "main": "dist/index.js", + "types": "dist/index.d.ts", + "scripts": { + "build": "tsc", + "test": "jest", + "test:watch": "jest --watch", + "dev": "tsc --watch" + }, + "keywords": [ + "redis", + "queue", + "streams", + "tasks" + ], + "author": "Community ADE Team", + "license": "MIT", + "dependencies": { + "ioredis": "^5.3.2", + "uuid": "^9.0.1" + }, + "devDependencies": { + "@types/jest": "^29.5.12", + "@types/node": "^20.11.24", + "@types/uuid": "^9.0.8", + "jest": "^29.7.0", + "ts-jest": "^29.1.2", + "typescript": "^5.9.3" + } +} diff --git a/src/index.ts b/src/index.ts new file mode 100644 index 0000000..2fef3a3 --- /dev/null +++ b/src/index.ts @@ -0,0 +1,28 @@ +/** + * Community ADE Queue Core + * Redis Streams implementation with consumer groups + */ + +export { default as RedisQueue } from './queue/RedisQueue'; +export { default as Worker, WorkerPool } from './queue/Worker'; +export { + TaskPayload, + TaskState, + TaskStatus, + TaskKind, + WorkerInfo, + ClaimOptions, + ClaimResult, + RetryConfig, + DEFAULT_RETRY_CONFIG, + TASK_CONSTANTS, + calculateRetryDelay, + serializeTask, + deserializeTask, + serializeTaskForStream, + deserializeTaskFromStream, + getTaskKey, +} from './queue/Task'; + +export type { RedisQueueOptions, EnqueueResult, TaskStats } from './queue/RedisQueue'; +export type { WorkerOptions, TaskHandler, WorkerState } from './queue/Worker'; diff --git a/src/queue/RedisQueue.ts b/src/queue/RedisQueue.ts new file mode 100644 index 0000000..d084918 --- /dev/null +++ b/src/queue/RedisQueue.ts @@ -0,0 +1,590 @@ +/** + * Redis Streams Queue Implementation + * Uses Redis Streams with consumer groups for reliable task processing + */ + +import Redis from "ioredis"; +import { v4 as uuidv4 } from "uuid"; +import { + TaskPayload, + TaskState, + TaskStatus, + ClaimOptions, + serializeTask, + deserializeTask, + serializeTaskForStream, + getTaskKey, + TASK_CONSTANTS, + calculateRetryDelay, + RetryConfig, +} from "./Task"; + +export interface RedisQueueOptions { + redisUrl?: string; + streamKey?: string; + consumerGroup?: string; + heartbeatTimeoutMs?: number; + processingTimeoutMs?: number; + defaultMaxAttempts?: number; +} + +export interface EnqueueResult { + success: boolean; + taskId?: string; + error?: string; +} + +export interface TaskStats { + pending: number; + delayed: number; + streamLength: number; + activeWorkers: number; +} + +export class RedisQueue { + private redis: Redis; + private streamKey: string; + private consumerGroup: string; + private delayedKey: string; + private workersActiveKey: string; + private workersHeartbeatKey: string; + private heartbeatTimeoutMs: number; + private processingTimeoutMs: number; + private defaultMaxAttempts: number; + + constructor(options: RedisQueueOptions = {}) { + this.redis = new Redis(options.redisUrl || "redis://localhost:6379/0"); + this.streamKey = options.streamKey || TASK_CONSTANTS.STREAM_KEY; + this.consumerGroup = options.consumerGroup || TASK_CONSTANTS.CONSUMER_GROUP; + this.delayedKey = TASK_CONSTANTS.DELAYED_KEY; + this.workersActiveKey = TASK_CONSTANTS.WORKERS_ACTIVE_KEY; + this.workersHeartbeatKey = TASK_CONSTANTS.WORKERS_HEARTBEAT_KEY; + this.heartbeatTimeoutMs = options.heartbeatTimeoutMs || TASK_CONSTANTS.HEARTBEAT_TIMEOUT_MS; + this.processingTimeoutMs = options.processingTimeoutMs || TASK_CONSTANTS.PROCESSING_TIMEOUT_MS; + this.defaultMaxAttempts = options.defaultMaxAttempts || TASK_CONSTANTS.DEFAULT_MAX_ATTEMPTS; + } + + /** + * Initialize the consumer group (idempotent) + */ + async initialize(): Promise { + try { + await this.redis.xgroup("CREATE", this.streamKey, this.consumerGroup, "$", "MKSTREAM"); + } catch (err: any) { + // Group already exists - ignore + if (!err.message?.includes("already exists")) { + throw err; + } + } + } + + /** + * Enqueue a task to the queue + */ + async enqueue(payload: TaskPayload): Promise { + const taskId = payload.id || uuidv4(); + const now = Date.now(); + + const task: TaskState = { + ...payload, + id: taskId, + status: "pending", + attemptCount: 0, + enqueuedAt: payload.enqueuedAt || now, + maxAttempts: payload.maxAttempts || this.defaultMaxAttempts, + backoffMultiplier: payload.backoffMultiplier || 2, + maxBackoffMs: payload.maxBackoffMs || 300000, + priority: payload.priority ?? TASK_CONSTANTS.DEFAULT_PRIORITY, + }; + + try { + // Store task state in hash + const taskKey = getTaskKey(taskId); + await this.redis.hset(taskKey, serializeTask(task)); + + // Set TTL for completed/failed tasks + await this.redis.expire(taskKey, TASK_CONSTANTS.TASK_TTL_SECONDS); + + // Check if task should be delayed + if (task.runAfter && task.runAfter > now) { + // Add to delayed queue + await this.redis.zadd(this.delayedKey, task.runAfter.toString(), taskId); + } else { + // Add to stream for immediate processing + await this.redis.xadd( + this.streamKey, + "*", + ...this.flattenObject(serializeTaskForStream(task)) + ); + } + + return { success: true, taskId }; + } catch (err: any) { + return { success: false, error: err.message }; + } + } + + /** + * Claim tasks from the queue using consumer groups + */ + async claimTasks( + consumerId: string, + options: ClaimOptions = {} + ): Promise { + const batchSize = options.batchSize || TASK_CONSTANTS.CLAIM_BATCH_SIZE; + const blockMs = options.blockMs || 5000; + + try { + const result = await this.redis.xreadgroup( + "GROUP", + this.consumerGroup, + consumerId, + "COUNT", + batchSize, + "BLOCK", + blockMs, + "STREAMS", + this.streamKey, + ">" + ) as [string, [string, string[]][]][] | null; + + if (!result || result.length === 0) { + return []; + } + + const messages = result[0][1]; + + if (!messages || messages.length === 0) { + return []; + } + + const tasks: TaskState[] = []; + + for (const [messageId, fields] of messages) { + const data = this.unflattenArray(fields as string[]); + const taskId = data.taskId; + + if (!taskId) { + continue; + } + + // Get full task state from hash + const taskKey = getTaskKey(taskId); + const taskData = await this.redis.hgetall(taskKey); + + if (!taskData || Object.keys(taskData).length === 0) { + // Task not found, acknowledge to clean up + await this.redis.xack(this.streamKey, this.consumerGroup, messageId); + continue; + } + + const task = deserializeTask(taskData); + task.status = "claimed"; + task.workerId = consumerId; + task.startedAt = Date.now(); + + // Update task state + await this.redis.hset(taskKey, serializeTask(task)); + + // Store message ID for later acknowledgment + (task as any).__messageId = messageId; + + tasks.push(task); + } + + return tasks; + } catch (err: any) { + console.error("Error claiming tasks:", err); + return []; + } + } + + /** + * Mark a task as completed + */ + async completeTask(taskId: string, result?: unknown): Promise { + const taskKey = getTaskKey(taskId); + + try { + const taskData = await this.redis.hgetall(taskKey); + + if (!taskData || Object.keys(taskData).length === 0) { + return false; + } + + const task = deserializeTask(taskData); + + task.status = "completed"; + task.completedAt = Date.now(); + task.result = result; + + await this.redis.hset(taskKey, serializeTask(task)); + + // Acknowledge in stream to remove from pending + await this.acknowledgeTask(taskId); + + return true; + } catch (err: any) { + console.error("Error completing task:", err); + return false; + } + } + + /** + * Mark a task as failed (with retry logic) + */ + async failTask(taskId: string, error: string): Promise { + const taskKey = getTaskKey(taskId); + + try { + const taskData = await this.redis.hgetall(taskKey); + + if (!taskData || Object.keys(taskData).length === 0) { + return false; + } + + const task = deserializeTask(taskData); + const attemptCount = task.attemptCount + 1; + + if (attemptCount >= task.maxAttempts) { + // Final failure + task.status = "failed"; + task.completedAt = Date.now(); + task.error = error; + task.attemptCount = attemptCount; + + await this.redis.hset(taskKey, serializeTask(task)); + await this.acknowledgeTask(taskId); + } else { + // Schedule retry + const delay = calculateRetryDelay({ + attempt: attemptCount, + baseDelayMs: 1000, + multiplier: task.backoffMultiplier, + maxDelayMs: task.maxBackoffMs, + jitterFactor: 0.1, + }); + + const runAfter = Date.now() + delay; + + task.status = "pending"; + task.attemptCount = attemptCount; + task.error = error; + task.workerId = undefined; + task.startedAt = undefined; + task.runAfter = runAfter; + + await this.redis.hset(taskKey, serializeTask(task)); + + // Add to delayed queue + await this.redis.zadd(this.delayedKey, runAfter.toString(), taskId); + + // Acknowledge to remove from pending + await this.acknowledgeTask(taskId); + } + + return true; + } catch (err: any) { + console.error("Error failing task:", err); + return false; + } + } + + /** + * Acknowledge a task in the stream (remove from pending) + */ + private async acknowledgeTask(taskId: string): Promise { + // Find and acknowledge the message by task ID + try { + const pending = await this.redis.xpending( + this.streamKey, + this.consumerGroup, + "-", + "+", + 100 + ); + + for (const item of pending) { + const [id, , , messageCount] = item as [string, string, number, number]; + + // Claim and then acknowledge + const claimed = await this.redis.xclaim( + this.streamKey, + this.consumerGroup, + "system", + 0, + id + ); + + if (claimed && claimed.length > 0) { + await this.redis.xack(this.streamKey, this.consumerGroup, id); + } + } + } catch (err) { + // Best effort acknowledgment + } + } + + /** + * Promote delayed tasks that are ready to run + */ + async promoteDelayedTasks(batchSize: number = 100): Promise { + const now = Date.now(); + + try { + // Get tasks that are due + const dueTasks = await this.redis.zrangebyscore( + this.delayedKey, + 0, + now, + "LIMIT", + 0, + batchSize + ); + + if (!dueTasks || dueTasks.length === 0) { + return 0; + } + + // Remove from delayed queue + await this.redis.zrem(this.delayedKey, ...dueTasks); + + // Add to stream + for (const taskId of dueTasks) { + const taskKey = getTaskKey(taskId); + const taskData = await this.redis.hgetall(taskKey); + + if (taskData && Object.keys(taskData).length > 0) { + const task = deserializeTask(taskData); + await this.redis.xadd( + this.streamKey, + "*", + ...this.flattenObject(serializeTaskForStream(task)) + ); + } + } + + return dueTasks.length; + } catch (err: any) { + console.error("Error promoting delayed tasks:", err); + return 0; + } + } + + /** + * Register a worker + */ + async registerWorker(workerId: string, info: { + hostname: string; + pid: number; + version?: string; + }): Promise { + const now = Date.now(); + const workerInfo = { + ...info, + startedAt: now, + lastHeartbeat: now, + version: info.version || "unknown", + }; + + await this.redis.hset( + this.workersActiveKey, + workerId, + JSON.stringify(workerInfo) + ); + + await this.redis.zadd(this.workersHeartbeatKey, now.toString(), workerId); + } + + /** + * Update worker heartbeat + */ + async updateHeartbeat(workerId: string): Promise { + const now = Date.now(); + + await this.redis.zadd(this.workersHeartbeatKey, now.toString(), workerId); + + // Update worker info + const existing = await this.redis.hget(this.workersActiveKey, workerId); + if (existing) { + const info = JSON.parse(existing); + info.lastHeartbeat = now; + await this.redis.hset(this.workersActiveKey, workerId, JSON.stringify(info)); + } + } + + /** + * Deregister a worker + */ + async deregisterWorker(workerId: string): Promise { + await this.redis.hdel(this.workersActiveKey, workerId); + await this.redis.zrem(this.workersHeartbeatKey, workerId); + } + + /** + * Detect and reclaim tasks from dead workers + */ + async reclaimDeadWorkerTasks(): Promise { + const now = Date.now(); + const cutoff = now - this.heartbeatTimeoutMs; + + try { + // Find dead workers + const deadWorkers = await this.redis.zrangebyscore( + this.workersHeartbeatKey, + 0, + cutoff + ); + + let reclaimedCount = 0; + + for (const workerId of deadWorkers) { + // Get pending tasks for this worker + const pending = await this.redis.xpending( + this.streamKey, + this.consumerGroup, + "-", + "+", + 100 + ); + + for (const item of pending) { + const [id, consumer, idleTime] = item as [string, string, number]; + + if (consumer === workerId && idleTime > this.processingTimeoutMs) { + // Reclaim the task + const claimed = await this.redis.xclaim( + this.streamKey, + this.consumerGroup, + "system", + this.processingTimeoutMs, + id + ); + + if (claimed && claimed.length > 0) { + // Acknowledge and retry + await this.redis.xack(this.streamKey, this.consumerGroup, id); + + // Get task info and schedule retry + const entry = claimed[0] as [string, string[]]; + const fields = entry[1]; + const data = this.unflattenArray(fields); + + if (data.taskId) { + await this.failTask(data.taskId, "Worker died during processing"); + reclaimedCount++; + } + } + } + } + + // Clean up dead worker + await this.redis.hdel(this.workersActiveKey, workerId); + await this.redis.zrem(this.workersHeartbeatKey, workerId); + } + + return reclaimedCount; + } catch (err: any) { + console.error("Error reclaiming dead worker tasks:", err); + return 0; + } + } + + /** + * Get queue statistics + */ + async getStats(): Promise { + const [pending, delayed, streamLength, activeWorkers] = await Promise.all([ + this.redis.xpending(this.streamKey, this.consumerGroup).then( + (result) => (result as [number, string, string, [string, string][]])?.[0] || 0 + ), + this.redis.zcard(this.delayedKey), + this.redis.xlen(this.streamKey), + this.redis.hlen(this.workersActiveKey), + ]); + + return { + pending: pending as number, + delayed, + streamLength, + activeWorkers, + }; + } + + /** + * Get task by ID + */ + async getTask(taskId: string): Promise { + const taskKey = getTaskKey(taskId); + const taskData = await this.redis.hgetall(taskKey); + + if (!taskData || Object.keys(taskData).length === 0) { + return null; + } + + return deserializeTask(taskData); + } + + /** + * Cancel a task + */ + async cancelTask(taskId: string): Promise { + const taskKey = getTaskKey(taskId); + + try { + const taskData = await this.redis.hgetall(taskKey); + + if (!taskData || Object.keys(taskData).length === 0) { + return false; + } + + const task = deserializeTask(taskData); + + if (task.status === "completed" || task.status === "failed") { + return false; // Can't cancel completed/failed tasks + } + + task.status = "cancelled"; + task.completedAt = Date.now(); + + await this.redis.hset(taskKey, serializeTask(task)); + await this.acknowledgeTask(taskId); + + return true; + } catch (err: any) { + console.error("Error cancelling task:", err); + return false; + } + } + + /** + * Close the Redis connection + */ + async close(): Promise { + await this.redis.quit(); + } + + /** + * Helper: Flatten object to array of key-value pairs + */ + private flattenObject(obj: Record): string[] { + const result: string[] = []; + for (const [key, value] of Object.entries(obj)) { + result.push(key, value); + } + return result; + } + + /** + * Helper: Unflatten array to object + */ + private unflattenArray(arr: string[]): Record { + const result: Record = {}; + for (let i = 0; i < arr.length; i += 2) { + result[arr[i]] = arr[i + 1]; + } + return result; + } +} + +export default RedisQueue; diff --git a/src/queue/Task.ts b/src/queue/Task.ts new file mode 100644 index 0000000..9b24634 --- /dev/null +++ b/src/queue/Task.ts @@ -0,0 +1,247 @@ +/** + * Task types and interfaces for Redis Streams queue + * Based on ade-redis-queue-design.md + */ + +export type TaskStatus = + | "pending" // Enqueued, not yet claimed + | "claimed" // Claimed by worker, processing + | "running" // Actively being processed + | "completed" // Successfully finished + | "failed" // Exhausted all retries + | "cancelled"; // Explicitly cancelled + +export type TaskKind = + | "message" + | "task_notification" + | "approval_result" + | "overlay_action"; + +export interface TaskPayload { + // Task identification + id: string; // UUID v4 + kind: TaskKind; + + // Execution context + agentId?: string; + conversationId?: string; + clientMessageId?: string; + + // Content (varies by kind) + content?: unknown; // For "message" kind + text?: string; // For notification/approval/overlay + + // Subagent execution params (for task_notification) + subagentType?: string; + prompt?: string; + model?: string; + existingAgentId?: string; + existingConversationId?: string; + maxTurns?: number; + + // Scheduling + priority: number; // 0-9, lower = higher priority + runAfter?: number; // Timestamp ms (for delayed tasks) + + // Retry configuration + maxAttempts: number; + backoffMultiplier: number; // Default: 2 + maxBackoffMs: number; // Default: 300000 (5 min) + + // Metadata + enqueuedAt: number; + source: "user" | "system" | "hook"; + + // Coalescing support + isCoalescable?: boolean; + scopeKey?: string; // For grouping coalescable items +} + +export interface TaskState extends TaskPayload { + status: TaskStatus; + workerId?: string; + attemptCount: number; + startedAt?: number; + completedAt?: number; + error?: string; + result?: unknown; +} + +export interface WorkerInfo { + workerId: string; + hostname: string; + pid: number; + startedAt: number; + lastHeartbeat: number; + version: string; +} + +export interface ClaimOptions { + batchSize?: number; + blockMs?: number; + coalescingWindowMs?: number; +} + +export interface ClaimResult { + tasks: TaskState[]; + batchId: string; +} + +// Retry configuration interface +export interface RetryConfig { + attempt: number; // 0-indexed (0 = first retry) + baseDelayMs: number; // Default: 1000 + multiplier: number; // Default: 2 + maxDelayMs: number; // Default: 300000 (5 min) + jitterFactor: number; // Default: 0.1 (10% randomization) +} + +// Default retry configuration +export const DEFAULT_RETRY_CONFIG: RetryConfig = { + attempt: 0, + baseDelayMs: 1000, + multiplier: 2, + maxDelayMs: 300000, + jitterFactor: 0.1 +}; + +// Task constants +export const TASK_CONSTANTS = { + // Redis keys + STREAM_KEY: "ade:queue:tasks", + DELAYED_KEY: "ade:queue:delayed", + CONSUMER_GROUP: "ade-workers", + WORKERS_ACTIVE_KEY: "ade:workers:active", + WORKERS_HEARTBEAT_KEY: "ade:workers:heartbeat", + + // Timing + HEARTBEAT_INTERVAL_MS: 5000, + HEARTBEAT_TIMEOUT_MS: 30000, + CLAIM_BATCH_SIZE: 10, + PROCESSING_TIMEOUT_MS: 300000, // 5 min + + // Task state + DEFAULT_MAX_ATTEMPTS: 3, + DEFAULT_PRIORITY: 5, + + // TTL + TASK_TTL_SECONDS: 7 * 24 * 60 * 60, // 7 days +} as const; + +/** + * Calculate retry delay with exponential backoff and jitter + */ +export function calculateRetryDelay(config: Partial = {}): number { + const fullConfig = { ...DEFAULT_RETRY_CONFIG, ...config }; + + // Exponential backoff: base * (multiplier ^ attempt) + const exponentialDelay = fullConfig.baseDelayMs * + Math.pow(fullConfig.multiplier, fullConfig.attempt); + + // Cap at max + const cappedDelay = Math.min(exponentialDelay, fullConfig.maxDelayMs); + + // Add jitter to prevent thundering herd: ±jitterFactor + const jitter = cappedDelay * fullConfig.jitterFactor * (Math.random() * 2 - 1); + + return Math.floor(cappedDelay + jitter); +} + +/** + * Serialize task state for Redis Hash storage + */ +export function serializeTask(task: TaskState): Record { + return { + id: task.id, + kind: task.kind, + status: task.status, + agentId: task.agentId ?? "", + conversationId: task.conversationId ?? "", + clientMessageId: task.clientMessageId ?? "", + content: task.content ? JSON.stringify(task.content) : "", + text: task.text ?? "", + subagentType: task.subagentType ?? "", + prompt: task.prompt ?? "", + model: task.model ?? "", + existingAgentId: task.existingAgentId ?? "", + existingConversationId: task.existingConversationId ?? "", + maxTurns: task.maxTurns?.toString() ?? "", + priority: task.priority.toString(), + runAfter: task.runAfter?.toString() ?? "", + maxAttempts: task.maxAttempts.toString(), + backoffMultiplier: task.backoffMultiplier.toString(), + maxBackoffMs: task.maxBackoffMs.toString(), + enqueuedAt: task.enqueuedAt.toString(), + source: task.source, + isCoalescable: task.isCoalescable ? "1" : "0", + scopeKey: task.scopeKey ?? "", + workerId: task.workerId ?? "", + attemptCount: task.attemptCount.toString(), + startedAt: task.startedAt?.toString() ?? "", + completedAt: task.completedAt?.toString() ?? "", + error: task.error ?? "", + result: task.result ? JSON.stringify(task.result) : "", + }; +} + +/** + * Deserialize task state from Redis Hash storage + */ +export function deserializeTask(data: Record): TaskState { + return { + id: data.id, + kind: data.kind as TaskKind, + status: data.status as TaskStatus, + agentId: data.agentId || undefined, + conversationId: data.conversationId || undefined, + clientMessageId: data.clientMessageId || undefined, + content: data.content ? JSON.parse(data.content) : undefined, + text: data.text || undefined, + subagentType: data.subagentType || undefined, + prompt: data.prompt || undefined, + model: data.model || undefined, + existingAgentId: data.existingAgentId || undefined, + existingConversationId: data.existingConversationId || undefined, + maxTurns: data.maxTurns ? parseInt(data.maxTurns) : undefined, + priority: parseInt(data.priority), + runAfter: data.runAfter ? parseInt(data.runAfter) : undefined, + maxAttempts: parseInt(data.maxAttempts), + backoffMultiplier: parseInt(data.backoffMultiplier), + maxBackoffMs: parseInt(data.maxBackoffMs), + enqueuedAt: parseInt(data.enqueuedAt), + source: data.source as "user" | "system" | "hook", + isCoalescable: data.isCoalescable === "1", + scopeKey: data.scopeKey || undefined, + workerId: data.workerId || undefined, + attemptCount: parseInt(data.attemptCount), + startedAt: data.startedAt ? parseInt(data.startedAt) : undefined, + completedAt: data.completedAt ? parseInt(data.completedAt) : undefined, + error: data.error || undefined, + result: data.result ? JSON.parse(data.result) : undefined, + }; +} + +/** + * Serialize task payload for stream entry + */ +export function serializeTaskForStream(task: TaskPayload): Record { + return { + taskId: task.id, + payload: JSON.stringify(task), + priority: task.priority.toString(), + }; +} + +/** + * Deserialize task from stream entry + */ +export function deserializeTaskFromStream(data: Record): TaskPayload { + return JSON.parse(data.payload) as TaskPayload; +} + +/** + * Generate Redis key for task state hash + */ +export function getTaskKey(taskId: string): string { + return `ade:task:${taskId}`; +} diff --git a/src/queue/Worker.ts b/src/queue/Worker.ts new file mode 100644 index 0000000..669734e --- /dev/null +++ b/src/queue/Worker.ts @@ -0,0 +1,384 @@ +/** + * Worker implementation with heartbeat and task claiming + * Manages the lifecycle of a worker processing tasks from Redis Streams + */ + +import { v4 as uuidv4 } from "uuid"; +import os from "os"; +import RedisQueue from "./RedisQueue"; +import { + TaskState, + ClaimOptions, + WorkerInfo, + TASK_CONSTANTS, +} from "./Task"; + +export interface WorkerOptions { + queue: RedisQueue; + workerId?: string; + heartbeatIntervalMs?: number; + claimIntervalMs?: number; + batchSize?: number; + maxConcurrentTasks?: number; + version?: string; +} + +export interface TaskHandler { + (task: TaskState): Promise; +} + +export type WorkerState = + | "idle" + | "claiming" + | "processing" + | "stopping" + | "stopped"; + +export class Worker { + public readonly workerId: string; + private queue: RedisQueue; + private heartbeatIntervalMs: number; + private claimIntervalMs: number; + private batchSize: number; + private maxConcurrentTasks: number; + private version: string; + + private state: WorkerState = "idle"; + private isRunning: boolean = false; + private heartbeatTimer?: NodeJS.Timeout; + private claimTimer?: NodeJS.Timeout; + private taskHandler?: TaskHandler; + private activeTasks: Map = new Map(); + + constructor(options: WorkerOptions) { + this.workerId = options.workerId || this.generateWorkerId(); + this.queue = options.queue; + this.heartbeatIntervalMs = options.heartbeatIntervalMs || TASK_CONSTANTS.HEARTBEAT_INTERVAL_MS; + this.claimIntervalMs = options.claimIntervalMs || 1000; + this.batchSize = options.batchSize || TASK_CONSTANTS.CLAIM_BATCH_SIZE; + this.maxConcurrentTasks = options.maxConcurrentTasks || 5; + this.version = options.version || process.env.npm_package_version || "unknown"; + } + + /** + * Generate a unique worker ID + */ + private generateWorkerId(): string { + const hostname = os.hostname(); + const pid = process.pid; + const uuid = uuidv4().slice(0, 8); + return `${hostname}:${pid}:${uuid}`; + } + + /** + * Start the worker + */ + async start(taskHandler: TaskHandler): Promise { + if (this.isRunning) { + throw new Error("Worker is already running"); + } + + this.taskHandler = taskHandler; + this.isRunning = true; + this.state = "idle"; + + // Register worker with queue + await this.queue.registerWorker(this.workerId, { + hostname: os.hostname(), + pid: process.pid, + version: this.version, + }); + + // Initialize queue consumer group + await this.queue.initialize(); + + // Start heartbeat + this.startHeartbeat(); + + // Start claim loop + this.startClaimLoop(); + + console.log(`[Worker ${this.workerId}] Started`); + } + + /** + * Stop the worker gracefully + */ + async stop(graceful: boolean = true, timeoutMs: number = 30000): Promise { + if (!this.isRunning) { + return; + } + + console.log(`[Worker ${this.workerId}] Stopping...`); + this.state = "stopping"; + this.isRunning = false; + + // Stop timers + if (this.heartbeatTimer) { + clearInterval(this.heartbeatTimer); + this.heartbeatTimer = undefined; + } + + if (this.claimTimer) { + clearInterval(this.claimTimer); + this.claimTimer = undefined; + } + + // Wait for active tasks to complete if graceful + if (graceful && this.activeTasks.size > 0) { + console.log(`[Worker ${this.workerId}] Waiting for ${this.activeTasks.size} active tasks...`); + const startTime = Date.now(); + + while (this.activeTasks.size > 0 && Date.now() - startTime < timeoutMs) { + await this.sleep(100); + } + + if (this.activeTasks.size > 0) { + console.log(`[Worker ${this.workerId}] Timeout reached, ${this.activeTasks.size} tasks still active`); + } + } + + // Deregister worker + await this.queue.deregisterWorker(this.workerId); + + this.state = "stopped"; + console.log(`[Worker ${this.workerId}] Stopped`); + } + + /** + * Get current worker state + */ + getState(): WorkerState { + return this.state; + } + + /** + * Get number of active tasks + */ + getActiveTaskCount(): number { + return this.activeTasks.size; + } + + /** + * Get worker info + */ + getInfo(): WorkerInfo { + return { + workerId: this.workerId, + hostname: os.hostname(), + pid: process.pid, + startedAt: Date.now(), + lastHeartbeat: Date.now(), + version: this.version, + }; + } + + /** + * Start the heartbeat loop + */ + private startHeartbeat(): void { + // Send initial heartbeat + this.sendHeartbeat(); + + // Schedule periodic heartbeats + this.heartbeatTimer = setInterval(() => { + this.sendHeartbeat().catch((err) => { + console.error(`[Worker ${this.workerId}] Heartbeat error:`, err); + }); + }, this.heartbeatIntervalMs); + } + + /** + * Send a heartbeat + */ + private async sendHeartbeat(): Promise { + try { + await this.queue.updateHeartbeat(this.workerId); + } catch (err) { + console.error(`[Worker ${this.workerId}] Failed to send heartbeat:`, err); + } + } + + /** + * Start the task claim loop + */ + private startClaimLoop(): void { + // Immediate first claim + this.claimTasks().catch((err) => { + console.error(`[Worker ${this.workerId}] Initial claim error:`, err); + }); + + // Schedule periodic claims + this.claimTimer = setInterval(() => { + this.claimTasks().catch((err) => { + console.error(`[Worker ${this.workerId}] Claim error:`, err); + }); + }, this.claimIntervalMs); + } + + /** + * Claim and process tasks + */ + private async claimTasks(): Promise { + if (!this.isRunning) { + return; + } + + // Check if we have capacity for more tasks + const availableSlots = this.maxConcurrentTasks - this.activeTasks.size; + if (availableSlots <= 0) { + return; + } + + this.state = "claiming"; + + try { + const options: ClaimOptions = { + batchSize: Math.min(this.batchSize, availableSlots), + blockMs: 1000, + }; + + const tasks = await this.queue.claimTasks(this.workerId, options); + + if (tasks.length === 0) { + this.state = this.activeTasks.size > 0 ? "processing" : "idle"; + return; + } + + console.log(`[Worker ${this.workerId}] Claimed ${tasks.length} tasks`); + + // Process tasks concurrently + for (const task of tasks) { + this.activeTasks.set(task.id, task); + this.processTask(task).catch((err) => { + console.error(`[Worker ${this.workerId}] Error processing task ${task.id}:`, err); + }); + } + + this.state = "processing"; + } catch (err) { + console.error(`[Worker ${this.workerId}] Error claiming tasks:`, err); + this.state = this.activeTasks.size > 0 ? "processing" : "idle"; + } + } + + /** + * Process a single task + */ + private async processTask(task: TaskState): Promise { + const taskId = task.id; + + console.log(`[Worker ${this.workerId}] Processing task ${taskId} (${task.kind})`); + + try { + if (!this.taskHandler) { + throw new Error("No task handler registered"); + } + + // Execute task handler + const result = await this.taskHandler(task); + + // Mark task as completed + await this.queue.completeTask(taskId, result); + + console.log(`[Worker ${this.workerId}] Completed task ${taskId}`); + } catch (err: any) { + console.error(`[Worker ${this.workerId}] Task ${taskId} failed:`, err); + + // Mark task as failed (triggers retry logic) + await this.queue.failTask(taskId, err.message || String(err)); + } finally { + this.activeTasks.delete(taskId); + + if (this.activeTasks.size === 0 && this.isRunning) { + this.state = "idle"; + } + } + } + + /** + * Sleep helper + */ + private sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } +} + +/** + * Worker pool for managing multiple workers + */ +export class WorkerPool { + private workers: Worker[] = []; + private queue: RedisQueue; + private taskHandler: TaskHandler; + private options: Omit; + + constructor( + queue: RedisQueue, + taskHandler: TaskHandler, + options: Omit = {} + ) { + this.queue = queue; + this.taskHandler = taskHandler; + this.options = options; + } + + /** + * Start the pool with specified number of workers + */ + async start(workerCount: number): Promise { + console.log(`[WorkerPool] Starting ${workerCount} workers...`); + + for (let i = 0; i < workerCount; i++) { + const worker = new Worker({ + ...this.options, + queue: this.queue, + }); + + await worker.start(this.taskHandler); + this.workers.push(worker); + } + + console.log(`[WorkerPool] All ${workerCount} workers started`); + } + + /** + * Stop all workers + */ + async stop(graceful: boolean = true, timeoutMs: number = 30000): Promise { + console.log(`[WorkerPool] Stopping ${this.workers.length} workers...`); + + await Promise.all( + this.workers.map((worker) => worker.stop(graceful, timeoutMs)) + ); + + this.workers = []; + console.log("[WorkerPool] All workers stopped"); + } + + /** + * Get stats for all workers + */ + getStats(): { + totalWorkers: number; + activeTasks: number; + workerStates: Record; + } { + const workerStates: Record = {}; + let activeTasks = 0; + + for (const worker of this.workers) { + const state = worker.getState(); + workerStates[state] = (workerStates[state] || 0) + 1; + activeTasks += worker.getActiveTaskCount(); + } + + return { + totalWorkers: this.workers.length, + activeTasks, + workerStates, + }; + } +} + +export default Worker; diff --git a/tests/queue/RedisQueue.test.ts b/tests/queue/RedisQueue.test.ts new file mode 100644 index 0000000..f1f35ac --- /dev/null +++ b/tests/queue/RedisQueue.test.ts @@ -0,0 +1,509 @@ +/** + * Unit tests for RedisQueue + * Tests enqueue, claim, complete, fail, and retry functionality + */ + +import RedisQueue from "../../src/queue/RedisQueue"; +import { + TaskPayload, + TaskState, + TaskStatus, + calculateRetryDelay, + serializeTask, + deserializeTask, +} from "../../src/queue/Task"; + +// Mock ioredis +jest.mock("ioredis"); + +// Get the mocked Redis class +const MockedRedis = require("ioredis"); + +describe("RedisQueue", () => { + let queue: RedisQueue; + let mockRedis: any; + + beforeEach(() => { + // Reset mocks + jest.clearAllMocks(); + + // Create mock Redis instance + mockRedis = { + hset: jest.fn().mockResolvedValue(1), + hget: jest.fn().mockResolvedValue(null), + hgetall: jest.fn().mockResolvedValue({}), + expire: jest.fn().mockResolvedValue(1), + zadd: jest.fn().mockResolvedValue(1), + zrem: jest.fn().mockResolvedValue(1), + zcard: jest.fn().mockResolvedValue(0), + zrangebyscore: jest.fn().mockResolvedValue([]), + xadd: jest.fn().mockResolvedValue("123-0"), + xreadgroup: jest.fn().mockResolvedValue(null), + xgroup: jest.fn().mockResolvedValue("OK"), + xpending: jest.fn().mockResolvedValue([0, null, null, []]), + xack: jest.fn().mockResolvedValue(1), + xclaim: jest.fn().mockResolvedValue([]), + xlen: jest.fn().mockResolvedValue(0), + hlen: jest.fn().mockResolvedValue(0), + hdel: jest.fn().mockResolvedValue(1), + quit: jest.fn().mockResolvedValue("OK"), + }; + + MockedRedis.mockImplementation(() => mockRedis); + + queue = new RedisQueue({ redisUrl: "redis://localhost:6379/0" }); + }); + + afterEach(async () => { + await queue.close(); + }); + + describe("initialize", () => { + it("should create consumer group", async () => { + await queue.initialize(); + expect(mockRedis.xgroup).toHaveBeenCalledWith( + "CREATE", + "ade:queue:tasks", + "ade-workers", + "$", + "MKSTREAM" + ); + }); + + it("should not throw if group already exists", async () => { + mockRedis.xgroup.mockRejectedValueOnce(new Error("BUSYGROUP Consumer Group name already exists")); + await expect(queue.initialize()).resolves.not.toThrow(); + }); + }); + + describe("enqueue", () => { + const sampleTask: TaskPayload = { + id: "test-task-1", + kind: "task_notification", + priority: 5, + maxAttempts: 3, + backoffMultiplier: 2, + maxBackoffMs: 300000, + enqueuedAt: Date.now(), + source: "user", + text: "Test task", + }; + + it("should enqueue a task successfully", async () => { + const result = await queue.enqueue(sampleTask); + + expect(result.success).toBe(true); + expect(result.taskId).toBe("test-task-1"); + expect(mockRedis.hset).toHaveBeenCalled(); + expect(mockRedis.expire).toHaveBeenCalled(); + expect(mockRedis.xadd).toHaveBeenCalled(); + }); + + it("should handle delayed tasks", async () => { + const delayedTask: TaskPayload = { + ...sampleTask, + runAfter: Date.now() + 60000, // 1 minute delay + }; + + const result = await queue.enqueue(delayedTask); + + expect(result.success).toBe(true); + expect(mockRedis.zadd).toHaveBeenCalled(); + expect(mockRedis.xadd).not.toHaveBeenCalled(); + }); + + it("should handle errors gracefully", async () => { + mockRedis.hset.mockRejectedValueOnce(new Error("Redis error")); + + const result = await queue.enqueue(sampleTask); + + expect(result.success).toBe(false); + expect(result.error).toBe("Redis error"); + }); + + it("should generate task ID if not provided", async () => { + const taskWithoutId: Omit = { + kind: "message", + priority: 5, + maxAttempts: 3, + backoffMultiplier: 2, + maxBackoffMs: 300000, + enqueuedAt: Date.now(), + source: "system", + }; + + const result = await queue.enqueue(taskWithoutId as TaskPayload); + + expect(result.success).toBe(true); + expect(result.taskId).toBeDefined(); + expect(result.taskId).not.toBe(""); + }); + }); + + describe("claimTasks", () => { + const sampleTaskData: TaskState = { + id: "test-task-1", + kind: "task_notification", + status: "pending", + priority: 5, + maxAttempts: 3, + backoffMultiplier: 2, + maxBackoffMs: 300000, + enqueuedAt: Date.now(), + source: "user", + attemptCount: 0, + }; + + it("should claim tasks from the queue", async () => { + mockRedis.xreadgroup.mockResolvedValueOnce([ + ["ade:queue:tasks", [ + ["123-0", ["taskId", "test-task-1", "payload", "{}", "priority", "5"]] + ]] + ]); + + mockRedis.hgetall.mockResolvedValueOnce({ + id: "test-task-1", + kind: "task_notification", + status: "pending", + priority: "5", + maxAttempts: "3", + backoffMultiplier: "2", + maxBackoffMs: "300000", + enqueuedAt: Date.now().toString(), + source: "user", + attemptCount: "0", + }); + + const tasks = await queue.claimTasks("worker-1", { batchSize: 5 }); + + expect(tasks.length).toBeGreaterThan(0); + expect(tasks[0].id).toBe("test-task-1"); + expect(tasks[0].status).toBe("claimed"); + expect(tasks[0].workerId).toBe("worker-1"); + }); + + it("should return empty array when no tasks available", async () => { + mockRedis.xreadgroup.mockResolvedValueOnce(null); + + const tasks = await queue.claimTasks("worker-1"); + + expect(tasks).toEqual([]); + }); + + it("should skip tasks not found in hash", async () => { + mockRedis.xreadgroup.mockResolvedValueOnce([ + ["ade:queue:tasks", [ + ["123-0", ["taskId", "missing-task", "payload", "{}", "priority", "5"]] + ]] + ]); + + mockRedis.hgetall.mockResolvedValueOnce({}); // Empty = not found + + const tasks = await queue.claimTasks("worker-1"); + + expect(tasks).toEqual([]); + }); + }); + + describe("completeTask", () => { + it("should mark task as completed", async () => { + const taskId = "test-task-1"; + const result = { success: true, data: "completed" }; + + mockRedis.hgetall.mockResolvedValueOnce({ + id: taskId, + kind: "task_notification", + status: "claimed", + priority: "5", + maxAttempts: "3", + backoffMultiplier: "2", + maxBackoffMs: "300000", + enqueuedAt: Date.now().toString(), + source: "user", + attemptCount: "0", + }); + + const success = await queue.completeTask(taskId, result); + + expect(success).toBe(true); + expect(mockRedis.hset).toHaveBeenCalled(); + }); + + it("should return false for non-existent task", async () => { + mockRedis.hgetall.mockResolvedValueOnce({}); + + const success = await queue.completeTask("missing-task"); + + expect(success).toBe(false); + }); + }); + + describe("failTask", () => { + it("should mark task as failed on final attempt", async () => { + const taskId = "test-task-1"; + + mockRedis.hgetall.mockResolvedValueOnce({ + id: taskId, + kind: "task_notification", + status: "claimed", + priority: "5", + maxAttempts: "2", + backoffMultiplier: "2", + maxBackoffMs: "300000", + enqueuedAt: Date.now().toString(), + source: "user", + attemptCount: "1", // Will become 2, which equals maxAttempts + }); + + const success = await queue.failTask(taskId, "Task failed"); + + expect(success).toBe(true); + expect(mockRedis.hset).toHaveBeenCalled(); + // Should set status to failed + const hsetCall = mockRedis.hset.mock.calls[0]; + expect(hsetCall[1].status).toBe("failed"); + }); + + it("should schedule retry on non-final failure", async () => { + const taskId = "test-task-1"; + + mockRedis.hgetall.mockResolvedValueOnce({ + id: taskId, + kind: "task_notification", + status: "claimed", + priority: "5", + maxAttempts: "3", + backoffMultiplier: "2", + maxBackoffMs: "300000", + enqueuedAt: Date.now().toString(), + source: "user", + attemptCount: "0", // Will become 1, retry allowed + }); + + const success = await queue.failTask(taskId, "Temporary error"); + + expect(success).toBe(true); + // Should add to delayed queue for retry + expect(mockRedis.zadd).toHaveBeenCalled(); + expect(mockRedis.hset).toHaveBeenCalled(); + // Should set status back to pending + const hsetCall = mockRedis.hset.mock.calls[0]; + expect(hsetCall[1].status).toBe("pending"); + }); + }); + + describe("promoteDelayedTasks", () => { + it("should promote due delayed tasks", async () => { + const taskId = "delayed-task-1"; + + mockRedis.zrangebyscore.mockResolvedValueOnce([taskId]); + mockRedis.hgetall.mockResolvedValueOnce({ + id: taskId, + kind: "task_notification", + status: "pending", + priority: "5", + maxAttempts: "3", + backoffMultiplier: "2", + maxBackoffMs: "300000", + enqueuedAt: Date.now().toString(), + source: "user", + attemptCount: "1", + }); + + const count = await queue.promoteDelayedTasks(10); + + expect(count).toBe(1); + expect(mockRedis.zrem).toHaveBeenCalled(); + expect(mockRedis.xadd).toHaveBeenCalled(); + }); + + it("should return 0 when no delayed tasks", async () => { + mockRedis.zrangebyscore.mockResolvedValueOnce([]); + + const count = await queue.promoteDelayedTasks(); + + expect(count).toBe(0); + expect(mockRedis.xadd).not.toHaveBeenCalled(); + }); + }); + + describe("worker registration", () => { + it("should register worker", async () => { + await queue.registerWorker("worker-1", { + hostname: "test-host", + pid: 12345, + version: "1.0.0", + }); + + expect(mockRedis.hset).toHaveBeenCalled(); + expect(mockRedis.zadd).toHaveBeenCalled(); + }); + + it("should update heartbeat", async () => { + await queue.updateHeartbeat("worker-1"); + + expect(mockRedis.zadd).toHaveBeenCalled(); + }); + + it("should deregister worker", async () => { + await queue.deregisterWorker("worker-1"); + + expect(mockRedis.hdel).toHaveBeenCalled(); + expect(mockRedis.zrem).toHaveBeenCalled(); + }); + }); + + describe("getStats", () => { + it("should return queue statistics", async () => { + mockRedis.xpending.mockResolvedValue([5, null, null, []]); + mockRedis.zcard.mockResolvedValue(3); + mockRedis.xlen.mockResolvedValue(100); + mockRedis.hlen.mockResolvedValue(2); + + const stats = await queue.getStats(); + + expect(stats.pending).toBe(5); + expect(stats.delayed).toBe(3); + expect(stats.streamLength).toBe(100); + expect(stats.activeWorkers).toBe(2); + }); + }); + + describe("getTask", () => { + it("should return task by ID", async () => { + const taskId = "test-task-1"; + + mockRedis.hgetall.mockResolvedValueOnce({ + id: taskId, + kind: "task_notification", + status: "pending", + priority: "5", + maxAttempts: "3", + backoffMultiplier: "2", + maxBackoffMs: "300000", + enqueuedAt: "1234567890", + source: "user", + attemptCount: "0", + }); + + const task = await queue.getTask(taskId); + + expect(task).not.toBeNull(); + expect(task?.id).toBe(taskId); + expect(task?.kind).toBe("task_notification"); + }); + + it("should return null for non-existent task", async () => { + mockRedis.hgetall.mockResolvedValueOnce({}); + + const task = await queue.getTask("missing-task"); + + expect(task).toBeNull(); + }); + }); + + describe("cancelTask", () => { + it("should cancel a pending task", async () => { + const taskId = "test-task-1"; + + mockRedis.hgetall.mockResolvedValueOnce({ + id: taskId, + kind: "task_notification", + status: "pending", + priority: "5", + maxAttempts: "3", + backoffMultiplier: "2", + maxBackoffMs: "300000", + enqueuedAt: Date.now().toString(), + source: "user", + attemptCount: "0", + }); + + const success = await queue.cancelTask(taskId); + + expect(success).toBe(true); + const hsetCall = mockRedis.hset.mock.calls[0]; + expect(hsetCall[1].status).toBe("cancelled"); + }); + + it("should not cancel completed tasks", async () => { + mockRedis.hgetall.mockResolvedValueOnce({ + id: "test-task-1", + kind: "task_notification", + status: "completed", + priority: "5", + maxAttempts: "3", + backoffMultiplier: "2", + maxBackoffMs: "300000", + enqueuedAt: Date.now().toString(), + source: "user", + attemptCount: "0", + completedAt: Date.now().toString(), + }); + + const success = await queue.cancelTask("test-task-1"); + + expect(success).toBe(false); + }); + }); +}); + +describe("Task utilities", () => { + describe("calculateRetryDelay", () => { + it("should calculate exponential backoff", () => { + const delay0 = calculateRetryDelay({ attempt: 0, baseDelayMs: 1000 }); + const delay1 = calculateRetryDelay({ attempt: 1, baseDelayMs: 1000 }); + const delay2 = calculateRetryDelay({ attempt: 2, baseDelayMs: 1000 }); + + expect(delay0).toBeGreaterThanOrEqual(900); // With jitter + expect(delay0).toBeLessThanOrEqual(1100); + expect(delay1).toBeGreaterThanOrEqual(1800); + expect(delay1).toBeLessThanOrEqual(2200); + expect(delay2).toBeGreaterThanOrEqual(3600); + expect(delay2).toBeLessThanOrEqual(4400); + }); + + it("should cap at maxDelayMs", () => { + const delay = calculateRetryDelay({ + attempt: 10, + baseDelayMs: 1000, + maxDelayMs: 5000, + jitterFactor: 0, // Disable jitter for this test + }); + + expect(delay).toBeLessThanOrEqual(5000); + }); + }); + + describe("serializeTask / deserializeTask", () => { + it("should serialize and deserialize task state", () => { + const task: TaskState = { + id: "test-task", + kind: "task_notification", + status: "pending", + priority: 5, + maxAttempts: 3, + backoffMultiplier: 2, + maxBackoffMs: 300000, + enqueuedAt: 1234567890, + source: "user", + attemptCount: 0, + text: "Test task", + content: { key: "value" }, + isCoalescable: true, + }; + + const serialized = serializeTask(task); + expect(serialized.id).toBe("test-task"); + expect(serialized.content).toBe('{"key":"value"}'); + expect(serialized.isCoalescable).toBe("1"); + + const deserialized = deserializeTask(serialized); + expect(deserialized.id).toBe(task.id); + expect(deserialized.kind).toBe(task.kind); + expect(deserialized.status).toBe(task.status); + expect(deserialized.content).toEqual(task.content); + expect(deserialized.isCoalescable).toBe(true); + }); + }); +}); diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..b1e594d --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,20 @@ +{ + "compilerOptions": { + "target": "ES2020", + "module": "commonjs", + "lib": ["ES2020"], + "outDir": "./dist", + "rootDir": "./src", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "declaration": true, + "declarationMap": true, + "sourceMap": true, + "resolveJsonModule": true, + "moduleResolution": "node" + }, + "include": ["src/**/*"], + "exclude": ["node_modules", "dist", "tests"] +}