Complete RedFlag codebase with two major security audit implementations.
== A-1: Ed25519 Key Rotation Support ==
Server:
- SignCommand sets SignedAt timestamp and KeyID on every signature
- signing_keys database table (migration 020) for multi-key rotation
- InitializePrimaryKey registers active key at startup
- /api/v1/public-keys endpoint for rotation-aware agents
- SigningKeyQueries for key lifecycle management
Agent:
- Key-ID-aware verification via CheckKeyRotation
- FetchAndCacheAllActiveKeys for rotation pre-caching
- Cache metadata with TTL and staleness fallback
- SecurityLogger events for key rotation and command signing
== A-2: Replay Attack Fixes (F-1 through F-7) ==
F-5 CRITICAL - RetryCommand now signs via signAndCreateCommand
F-1 HIGH - v3 format: "{agent_id}:{cmd_id}:{type}:{hash}:{ts}"
F-7 HIGH - Migration 026: expires_at column with partial index
F-6 HIGH - GetPendingCommands/GetStuckCommands filter by expires_at
F-2 HIGH - Agent-side executedIDs dedup map with cleanup
F-4 HIGH - commandMaxAge reduced from 24h to 4h
F-3 CRITICAL - Old-format commands rejected after 48h via CreatedAt
Verification fixes: migration idempotency (ETHOS #4), log format
compliance (ETHOS #1), stale comments updated.
All 24 tests passing. Docker --no-cache build verified.
See docs/ for full audit reports and deviation log (DEV-001 to DEV-019).
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
324 lines
7.1 KiB
Go
324 lines
7.1 KiB
Go
package scheduler
|
|
|
|
import (
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
func TestScheduler_NewScheduler(t *testing.T) {
|
|
config := DefaultConfig()
|
|
s := NewScheduler(config, nil, nil, nil)
|
|
|
|
if s == nil {
|
|
t.Fatal("NewScheduler returned nil")
|
|
}
|
|
|
|
if s.config.NumWorkers != 10 {
|
|
t.Fatalf("expected 10 workers, got %d", s.config.NumWorkers)
|
|
}
|
|
|
|
if s.queue == nil {
|
|
t.Fatal("queue not initialized")
|
|
}
|
|
|
|
if len(s.workers) != config.NumWorkers {
|
|
t.Fatalf("expected %d workers, got %d", config.NumWorkers, len(s.workers))
|
|
}
|
|
}
|
|
|
|
func TestScheduler_DefaultConfig(t *testing.T) {
|
|
config := DefaultConfig()
|
|
|
|
if config.CheckInterval != 10*time.Second {
|
|
t.Fatalf("expected check interval 10s, got %v", config.CheckInterval)
|
|
}
|
|
|
|
if config.LookaheadWindow != 60*time.Second {
|
|
t.Fatalf("expected lookahead 60s, got %v", config.LookaheadWindow)
|
|
}
|
|
|
|
if config.MaxJitter != 30*time.Second {
|
|
t.Fatalf("expected max jitter 30s, got %v", config.MaxJitter)
|
|
}
|
|
|
|
if config.NumWorkers != 10 {
|
|
t.Fatalf("expected 10 workers, got %d", config.NumWorkers)
|
|
}
|
|
|
|
if config.BackpressureThreshold != 5 {
|
|
t.Fatalf("expected backpressure threshold 5, got %d", config.BackpressureThreshold)
|
|
}
|
|
|
|
if config.RateLimitPerSecond != 100 {
|
|
t.Fatalf("expected rate limit 100/s, got %d", config.RateLimitPerSecond)
|
|
}
|
|
}
|
|
|
|
func TestScheduler_QueueIntegration(t *testing.T) {
|
|
config := DefaultConfig()
|
|
s := NewScheduler(config, nil, nil, nil)
|
|
|
|
// Add jobs to queue
|
|
agent1 := uuid.New()
|
|
agent2 := uuid.New()
|
|
|
|
job1 := &SubsystemJob{
|
|
AgentID: agent1,
|
|
AgentHostname: "agent-01",
|
|
Subsystem: "updates",
|
|
IntervalMinutes: 15,
|
|
NextRunAt: time.Now().Add(5 * time.Minute),
|
|
}
|
|
|
|
job2 := &SubsystemJob{
|
|
AgentID: agent2,
|
|
AgentHostname: "agent-02",
|
|
Subsystem: "storage",
|
|
IntervalMinutes: 15,
|
|
NextRunAt: time.Now().Add(10 * time.Minute),
|
|
}
|
|
|
|
s.queue.Push(job1)
|
|
s.queue.Push(job2)
|
|
|
|
if s.queue.Len() != 2 {
|
|
t.Fatalf("expected queue len 2, got %d", s.queue.Len())
|
|
}
|
|
|
|
// Get stats
|
|
stats := s.GetQueueStats()
|
|
if stats.Size != 2 {
|
|
t.Fatalf("expected stats size 2, got %d", stats.Size)
|
|
}
|
|
}
|
|
|
|
func TestScheduler_GetStats(t *testing.T) {
|
|
config := DefaultConfig()
|
|
s := NewScheduler(config, nil, nil, nil)
|
|
|
|
// Initial stats should be zero
|
|
stats := s.GetStats()
|
|
|
|
if stats.JobsProcessed != 0 {
|
|
t.Fatalf("expected 0 jobs processed, got %d", stats.JobsProcessed)
|
|
}
|
|
|
|
if stats.CommandsCreated != 0 {
|
|
t.Fatalf("expected 0 commands created, got %d", stats.CommandsCreated)
|
|
}
|
|
|
|
if stats.BackpressureSkips != 0 {
|
|
t.Fatalf("expected 0 backpressure skips, got %d", stats.BackpressureSkips)
|
|
}
|
|
|
|
// Manually update stats (simulating processing)
|
|
s.mu.Lock()
|
|
s.stats.JobsProcessed = 100
|
|
s.stats.CommandsCreated = 95
|
|
s.stats.BackpressureSkips = 5
|
|
s.mu.Unlock()
|
|
|
|
stats = s.GetStats()
|
|
|
|
if stats.JobsProcessed != 100 {
|
|
t.Fatalf("expected 100 jobs processed, got %d", stats.JobsProcessed)
|
|
}
|
|
|
|
if stats.CommandsCreated != 95 {
|
|
t.Fatalf("expected 95 commands created, got %d", stats.CommandsCreated)
|
|
}
|
|
|
|
if stats.BackpressureSkips != 5 {
|
|
t.Fatalf("expected 5 backpressure skips, got %d", stats.BackpressureSkips)
|
|
}
|
|
}
|
|
|
|
func TestScheduler_StartStop(t *testing.T) {
|
|
config := Config{
|
|
CheckInterval: 100 * time.Millisecond, // Fast for testing
|
|
LookaheadWindow: 60 * time.Second,
|
|
MaxJitter: 1 * time.Second,
|
|
NumWorkers: 2,
|
|
BackpressureThreshold: 5,
|
|
RateLimitPerSecond: 0, // Disable rate limiting for test
|
|
}
|
|
|
|
s := NewScheduler(config, nil, nil, nil)
|
|
|
|
// Start scheduler
|
|
err := s.Start()
|
|
if err != nil {
|
|
t.Fatalf("failed to start scheduler: %v", err)
|
|
}
|
|
|
|
// Let it run for a bit
|
|
time.Sleep(500 * time.Millisecond)
|
|
|
|
// Stop scheduler
|
|
err = s.Stop()
|
|
if err != nil {
|
|
t.Fatalf("failed to stop scheduler: %v", err)
|
|
}
|
|
|
|
// Should stop cleanly
|
|
}
|
|
|
|
func TestScheduler_ProcessQueueEmpty(t *testing.T) {
|
|
config := DefaultConfig()
|
|
s := NewScheduler(config, nil, nil, nil)
|
|
|
|
// Process empty queue should not panic
|
|
s.processQueue()
|
|
|
|
stats := s.GetStats()
|
|
if stats.JobsProcessed != 0 {
|
|
t.Fatalf("expected 0 jobs processed on empty queue, got %d", stats.JobsProcessed)
|
|
}
|
|
}
|
|
|
|
func TestScheduler_ProcessQueueWithJobs(t *testing.T) {
|
|
config := Config{
|
|
CheckInterval: 1 * time.Second,
|
|
LookaheadWindow: 60 * time.Second,
|
|
MaxJitter: 5 * time.Second,
|
|
NumWorkers: 2,
|
|
BackpressureThreshold: 5,
|
|
RateLimitPerSecond: 0, // Disable for test
|
|
}
|
|
|
|
s := NewScheduler(config, nil, nil, nil)
|
|
|
|
// Add jobs that are due now
|
|
for i := 0; i < 5; i++ {
|
|
job := &SubsystemJob{
|
|
AgentID: uuid.New(),
|
|
AgentHostname: "test-agent",
|
|
Subsystem: "updates",
|
|
IntervalMinutes: 15,
|
|
NextRunAt: time.Now(), // Due now
|
|
}
|
|
s.queue.Push(job)
|
|
}
|
|
|
|
if s.queue.Len() != 5 {
|
|
t.Fatalf("expected 5 jobs in queue, got %d", s.queue.Len())
|
|
}
|
|
|
|
// Process the queue
|
|
s.processQueue()
|
|
|
|
// Jobs should be dispatched to job channel
|
|
// Note: Without database, workers can't actually process them
|
|
// But we can verify they were dispatched
|
|
|
|
stats := s.GetStats()
|
|
if stats.JobsProcessed == 0 {
|
|
t.Fatal("expected some jobs to be processed")
|
|
}
|
|
}
|
|
|
|
func TestScheduler_RateLimiterRefill(t *testing.T) {
|
|
config := Config{
|
|
CheckInterval: 1 * time.Second,
|
|
LookaheadWindow: 60 * time.Second,
|
|
MaxJitter: 1 * time.Second,
|
|
NumWorkers: 2,
|
|
BackpressureThreshold: 5,
|
|
RateLimitPerSecond: 10, // 10 tokens per second
|
|
}
|
|
|
|
s := NewScheduler(config, nil, nil, nil)
|
|
|
|
if s.rateLimiter == nil {
|
|
t.Fatal("rate limiter not initialized")
|
|
}
|
|
|
|
// Start refill goroutine
|
|
go s.refillRateLimiter()
|
|
|
|
// Wait for some tokens to be added
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
// Should have some tokens available
|
|
tokensAvailable := 0
|
|
for i := 0; i < 15; i++ {
|
|
select {
|
|
case <-s.rateLimiter:
|
|
tokensAvailable++
|
|
default:
|
|
break
|
|
}
|
|
}
|
|
|
|
if tokensAvailable == 0 {
|
|
t.Fatal("expected some tokens to be available after refill")
|
|
}
|
|
|
|
// Should not exceed buffer size (10)
|
|
if tokensAvailable > 10 {
|
|
t.Fatalf("token bucket overflowed: got %d tokens, max is 10", tokensAvailable)
|
|
}
|
|
}
|
|
|
|
func TestScheduler_ConcurrentQueueAccess(t *testing.T) {
|
|
config := DefaultConfig()
|
|
s := NewScheduler(config, nil, nil, nil)
|
|
|
|
done := make(chan bool)
|
|
|
|
// Concurrent pushes
|
|
go func() {
|
|
for i := 0; i < 100; i++ {
|
|
job := &SubsystemJob{
|
|
AgentID: uuid.New(),
|
|
Subsystem: "updates",
|
|
IntervalMinutes: 15,
|
|
NextRunAt: time.Now(),
|
|
}
|
|
s.queue.Push(job)
|
|
}
|
|
done <- true
|
|
}()
|
|
|
|
// Concurrent stats reads
|
|
go func() {
|
|
for i := 0; i < 100; i++ {
|
|
s.GetStats()
|
|
s.GetQueueStats()
|
|
}
|
|
done <- true
|
|
}()
|
|
|
|
// Wait for both
|
|
<-done
|
|
<-done
|
|
|
|
// Should not panic and should have queued jobs
|
|
if s.queue.Len() <= 0 {
|
|
t.Fatal("expected jobs in queue after concurrent pushes")
|
|
}
|
|
}
|
|
|
|
func BenchmarkScheduler_ProcessQueue(b *testing.B) {
|
|
config := DefaultConfig()
|
|
s := NewScheduler(config, nil, nil, nil)
|
|
|
|
// Pre-fill queue with jobs
|
|
for i := 0; i < 1000; i++ {
|
|
job := &SubsystemJob{
|
|
AgentID: uuid.New(),
|
|
Subsystem: "updates",
|
|
IntervalMinutes: 15,
|
|
NextRunAt: time.Now(),
|
|
}
|
|
s.queue.Push(job)
|
|
}
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
s.processQueue()
|
|
}
|
|
}
|