Files
Redflag/aggregator-server/internal/scheduler/queue.go
jpetree331 f97d4845af feat(security): A-1 Ed25519 key rotation + A-2 replay attack fixes
Complete RedFlag codebase with two major security audit implementations.

== A-1: Ed25519 Key Rotation Support ==

Server:
- SignCommand sets SignedAt timestamp and KeyID on every signature
- signing_keys database table (migration 020) for multi-key rotation
- InitializePrimaryKey registers active key at startup
- /api/v1/public-keys endpoint for rotation-aware agents
- SigningKeyQueries for key lifecycle management

Agent:
- Key-ID-aware verification via CheckKeyRotation
- FetchAndCacheAllActiveKeys for rotation pre-caching
- Cache metadata with TTL and staleness fallback
- SecurityLogger events for key rotation and command signing

== A-2: Replay Attack Fixes (F-1 through F-7) ==

F-5 CRITICAL - RetryCommand now signs via signAndCreateCommand
F-1 HIGH     - v3 format: "{agent_id}:{cmd_id}:{type}:{hash}:{ts}"
F-7 HIGH     - Migration 026: expires_at column with partial index
F-6 HIGH     - GetPendingCommands/GetStuckCommands filter by expires_at
F-2 HIGH     - Agent-side executedIDs dedup map with cleanup
F-4 HIGH     - commandMaxAge reduced from 24h to 4h
F-3 CRITICAL - Old-format commands rejected after 48h via CreatedAt

Verification fixes: migration idempotency (ETHOS #4), log format
compliance (ETHOS #1), stale comments updated.

All 24 tests passing. Docker --no-cache build verified.
See docs/ for full audit reports and deviation log (DEV-001 to DEV-019).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 21:25:47 -04:00

287 lines
6.4 KiB
Go

package scheduler
import (
	"container/heap"
	"fmt"
	"sort"
	"sync"
	"time"

	"github.com/google/uuid"
)
// SubsystemJob represents a scheduled subsystem scan
type SubsystemJob struct {
	AgentID         uuid.UUID // Owning agent; forms the lookup key together with Subsystem
	AgentHostname   string    // For logging/debugging
	Subsystem       string    // Subsystem to scan; second half of the lookup key
	IntervalMinutes int       // Scan interval in minutes
	NextRunAt       time.Time // Next scheduled run; the heap's ordering key (earliest first)
	Enabled         bool      // Whether the job is currently enabled
	index           int       // Heap index (managed by heap.Interface)
}
// String returns a human-readable representation of the job
func (j *SubsystemJob) String() string {
	next := j.NextRunAt.Format("15:04:05")
	return fmt.Sprintf("[%s/%s] next_run=%s interval=%dm",
		j.AgentHostname, j.Subsystem, next, j.IntervalMinutes)
}
// jobHeap implements heap.Interface for SubsystemJob priority queue
// Jobs are ordered by NextRunAt (earliest first)
type jobHeap []*SubsystemJob

// Len reports the number of jobs currently in the heap.
func (h jobHeap) Len() int { return len(h) }

// Less orders jobs by NextRunAt so the earliest-due job is at the root.
func (h jobHeap) Less(i, j int) bool {
	return h[i].NextRunAt.Before(h[j].NextRunAt)
}
// Swap exchanges two heap slots and keeps each job's cached heap index in sync.
func (h jobHeap) Swap(i, j int) {
	h[j], h[i] = h[i], h[j]
	h[i].index, h[j].index = i, j
}
// Push appends x to the heap's backing array and records its slot in the
// job's index field. Called by container/heap (via heap.Push); do not call
// directly. Uses `any` (alias of interface{}) per modern Go idiom.
func (h *jobHeap) Push(x any) {
	job := x.(*SubsystemJob)
	job.index = len(*h) // new element always lands at the tail before sift-up
	*h = append(*h, job)
}
// Pop removes and returns the last element of the backing array (container/heap
// has already swapped the minimum to the tail). Called via heap.Pop; do not
// call directly. Uses `any` (alias of interface{}) per modern Go idiom.
func (h *jobHeap) Pop() any {
	old := *h
	n := len(old)
	job := old[n-1]
	old[n-1] = nil // Avoid memory leak: drop the slice's reference to the job
	job.index = -1 // Mark as removed (guards against stale-index misuse)
	*h = old[:n-1]
	return job
}
// PriorityQueue is a thread-safe priority queue for subsystem jobs
// Jobs are ordered by their NextRunAt timestamp (earliest first)
type PriorityQueue struct {
	heap jobHeap      // Min-heap keyed on NextRunAt
	mu   sync.RWMutex // Guards heap and index
	// Index for fast lookups by agent_id + subsystem
	index map[string]*SubsystemJob // key: "agent_id:subsystem"
}
// NewPriorityQueue creates a new empty priority queue
func NewPriorityQueue() *PriorityQueue {
	q := &PriorityQueue{
		index: make(map[string]*SubsystemJob),
		heap:  make(jobHeap, 0),
	}
	// Init on an empty heap is a no-op, but keeps the heap invariant explicit.
	heap.Init(&q.heap)
	return q
}
// Push adds a job to the queue
// If a job with the same agent_id + subsystem already exists, it's updated
func (pq *PriorityQueue) Push(job *SubsystemJob) {
	pq.mu.Lock()
	defer pq.mu.Unlock()

	key := makeKey(job.AgentID, job.Subsystem)
	existing, found := pq.index[key]
	if !found {
		// Brand-new job: insert into both the heap and the lookup index.
		heap.Push(&pq.heap, job)
		pq.index[key] = job
		return
	}

	// Duplicate key: refresh the stored job's mutable fields in place,
	// then restore heap order since NextRunAt may have moved.
	existing.NextRunAt = job.NextRunAt
	existing.IntervalMinutes = job.IntervalMinutes
	existing.Enabled = job.Enabled
	existing.AgentHostname = job.AgentHostname
	heap.Fix(&pq.heap, existing.index)
}
// Pop removes and returns the job with the earliest NextRunAt
// Returns nil if queue is empty
func (pq *PriorityQueue) Pop() *SubsystemJob {
	pq.mu.Lock()
	defer pq.mu.Unlock()

	if len(pq.heap) == 0 {
		return nil
	}
	popped := heap.Pop(&pq.heap).(*SubsystemJob)
	// Keep the lookup index consistent with the heap.
	delete(pq.index, makeKey(popped.AgentID, popped.Subsystem))
	return popped
}
// Peek returns the job with the earliest NextRunAt without removing it
// Returns nil if queue is empty
func (pq *PriorityQueue) Peek() *SubsystemJob {
	pq.mu.RLock()
	defer pq.mu.RUnlock()

	if len(pq.heap) == 0 {
		return nil
	}
	// The heap root is always the earliest-due job.
	return pq.heap[0]
}
// Remove removes a specific job from the queue
// Returns true if job was found and removed, false otherwise
func (pq *PriorityQueue) Remove(agentID uuid.UUID, subsystem string) bool {
	pq.mu.Lock()
	defer pq.mu.Unlock()

	key := makeKey(agentID, subsystem)
	if job, ok := pq.index[key]; ok {
		heap.Remove(&pq.heap, job.index)
		delete(pq.index, key)
		return true
	}
	return false
}
// Get retrieves a specific job without removing it
// Returns nil if not found
func (pq *PriorityQueue) Get(agentID uuid.UUID, subsystem string) *SubsystemJob {
	pq.mu.RLock()
	defer pq.mu.RUnlock()
	// A missing key yields the map's zero value, which is nil for pointers.
	return pq.index[makeKey(agentID, subsystem)]
}
// PopBefore returns all jobs with NextRunAt <= before, up to limit
// Jobs are removed from the queue
// If limit <= 0, all matching jobs are returned
func (pq *PriorityQueue) PopBefore(before time.Time, limit int) []*SubsystemJob {
	pq.mu.Lock()
	defer pq.mu.Unlock()

	var due []*SubsystemJob
	for len(pq.heap) > 0 {
		// Only the root needs inspection: it is the earliest job, so once
		// it is past the cutoff every remaining job is too.
		if pq.heap[0].NextRunAt.After(before) {
			break
		}
		if limit > 0 && len(due) >= limit {
			break
		}
		popped := heap.Pop(&pq.heap).(*SubsystemJob)
		delete(pq.index, makeKey(popped.AgentID, popped.Subsystem))
		due = append(due, popped)
	}
	return due
}
// PeekBefore returns all jobs with NextRunAt <= before without removing them
// Results are sorted earliest-first. If limit <= 0, all matching jobs are
// returned.
//
// BUG FIX: the previous implementation walked the heap's backing array in
// index order and broke at the first job past the cutoff. A binary heap is
// only partially ordered (parent <= children); its array is NOT globally
// sorted, so the early break could skip jobs that were actually due (e.g.
// array [1,5,2] with cutoff 3 returned only [1]). We now scan the whole
// heap and sort the matches, which also makes `limit` keep the soonest-due
// jobs, consistent with PopBefore.
func (pq *PriorityQueue) PeekBefore(before time.Time, limit int) []*SubsystemJob {
	pq.mu.RLock()
	defer pq.mu.RUnlock()

	var jobs []*SubsystemJob
	for _, job := range pq.heap {
		if !job.NextRunAt.After(before) {
			jobs = append(jobs, job)
		}
	}
	sort.Slice(jobs, func(i, j int) bool {
		return jobs[i].NextRunAt.Before(jobs[j].NextRunAt)
	})
	if limit > 0 && len(jobs) > limit {
		jobs = jobs[:limit]
	}
	return jobs
}
// Len returns the number of jobs in the queue
func (pq *PriorityQueue) Len() int {
	pq.mu.RLock()
	n := len(pq.heap)
	pq.mu.RUnlock()
	return n
}
// Clear removes all jobs from the queue
func (pq *PriorityQueue) Clear() {
	pq.mu.Lock()
	defer pq.mu.Unlock()

	// Replace both containers outright so old jobs become garbage-collectable.
	pq.heap = make(jobHeap, 0)
	pq.index = make(map[string]*SubsystemJob)
	heap.Init(&pq.heap) // no-op on an empty heap; kept for symmetry with NewPriorityQueue
}
// GetStats returns statistics about the queue
//
// FIX: NextRunAt now points at a private copy of the root job's timestamp.
// The previous code returned &pq.heap[0].NextRunAt — a pointer into a job
// still owned by the queue. Push mutates that field under the queue lock,
// so a caller dereferencing the stats after GetStats returned (i.e. outside
// the lock) raced with concurrent updates.
func (pq *PriorityQueue) GetStats() QueueStats {
	pq.mu.RLock()
	defer pq.mu.RUnlock()

	stats := QueueStats{
		Size:            pq.heap.Len(),
		JobsBySubsystem: make(map[string]int),
	}
	if pq.heap.Len() > 0 {
		next := pq.heap[0].NextRunAt // copy so the pointer never aliases queue state
		stats.NextRunAt = &next
		stats.OldestJob = pq.heap[0].String()
	}
	// Count jobs by subsystem
	for _, job := range pq.heap {
		stats.JobsBySubsystem[job.Subsystem]++
	}
	return stats
}
// QueueStats holds statistics about the priority queue
type QueueStats struct {
	Size            int            // Total number of queued jobs
	NextRunAt       *time.Time     // Earliest scheduled run, nil when the queue is empty
	OldestJob       string         // String form of the earliest-due job
	JobsBySubsystem map[string]int // Per-subsystem job counts
}

// String returns a human-readable representation of stats
func (s QueueStats) String() string {
	next := "empty"
	if s.NextRunAt != nil {
		next = s.NextRunAt.Format("15:04:05")
	}
	return fmt.Sprintf("size=%d next=%s oldest=%s", s.Size, next, s.OldestJob)
}
// makeKey creates a unique key for agent_id + subsystem
func makeKey(agentID uuid.UUID, subsystem string) string {
	// Plain concatenation avoids fmt overhead on this hot lookup path.
	id := agentID.String()
	return id + ":" + subsystem
}