Files
Redflag/aggregator-agent/internal/acknowledgment/tracker.go
jpetree331 f97d4845af feat(security): A-1 Ed25519 key rotation + A-2 replay attack fixes
Complete RedFlag codebase with two major security audit implementations.

== A-1: Ed25519 Key Rotation Support ==

Server:
- SignCommand sets SignedAt timestamp and KeyID on every signature
- signing_keys database table (migration 020) for multi-key rotation
- InitializePrimaryKey registers active key at startup
- /api/v1/public-keys endpoint for rotation-aware agents
- SigningKeyQueries for key lifecycle management

Agent:
- Key-ID-aware verification via CheckKeyRotation
- FetchAndCacheAllActiveKeys for rotation pre-caching
- Cache metadata with TTL and staleness fallback
- SecurityLogger events for key rotation and command signing

== A-2: Replay Attack Fixes (F-1 through F-7) ==

F-5 CRITICAL - RetryCommand now signs via signAndCreateCommand
F-1 HIGH     - v3 format: "{agent_id}:{cmd_id}:{type}:{hash}:{ts}"
F-7 HIGH     - Migration 026: expires_at column with partial index
F-6 HIGH     - GetPendingCommands/GetStuckCommands filter by expires_at
F-2 HIGH     - Agent-side executedIDs dedup map with cleanup
F-4 HIGH     - commandMaxAge reduced from 24h to 4h
F-3 CRITICAL - Old-format commands rejected after 48h via CreatedAt

Verification fixes: migration idempotency (ETHOS #4), log format
compliance (ETHOS #1), stale comments updated.

All 24 tests passing. Docker --no-cache build verified.
See docs/ for full audit reports and deviation log (DEV-001 to DEV-019).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 21:25:47 -04:00

194 lines
4.0 KiB
Go

package acknowledgment
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"
)
// PendingResult represents a command result awaiting acknowledgment
type PendingResult struct {
CommandID string `json:"command_id"`
SentAt time.Time `json:"sent_at"`
RetryCount int `json:"retry_count"`
}
// Tracker manages pending acknowledgments for command results
type Tracker struct {
pending map[string]*PendingResult
mu sync.RWMutex
filePath string
maxAge time.Duration // Max time to keep pending (default 24h)
maxRetries int // Max retries before giving up (default 10)
}
// NewTracker creates a new acknowledgment tracker
func NewTracker(statePath string) *Tracker {
return &Tracker{
pending: make(map[string]*PendingResult),
filePath: filepath.Join(statePath, "pending_acks.json"),
maxAge: 24 * time.Hour,
maxRetries: 10,
}
}
// Load restores pending acknowledgments from disk
func (t *Tracker) Load() error {
t.mu.Lock()
defer t.mu.Unlock()
// If file doesn't exist, that's fine (fresh start)
if _, err := os.Stat(t.filePath); os.IsNotExist(err) {
return nil
}
data, err := os.ReadFile(t.filePath)
if err != nil {
return fmt.Errorf("failed to read pending acks: %w", err)
}
if len(data) == 0 {
return nil // Empty file
}
var pending map[string]*PendingResult
if err := json.Unmarshal(data, &pending); err != nil {
return fmt.Errorf("failed to parse pending acks: %w", err)
}
t.pending = pending
return nil
}
// Save persists pending acknowledgments to disk
func (t *Tracker) Save() error {
t.mu.RLock()
defer t.mu.RUnlock()
// Ensure directory exists
dir := filepath.Dir(t.filePath)
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("failed to create ack directory: %w", err)
}
data, err := json.MarshalIndent(t.pending, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal pending acks: %w", err)
}
if err := os.WriteFile(t.filePath, data, 0600); err != nil {
return fmt.Errorf("failed to write pending acks: %w", err)
}
return nil
}
// Add marks a command result as pending acknowledgment
func (t *Tracker) Add(commandID string) {
t.mu.Lock()
defer t.mu.Unlock()
t.pending[commandID] = &PendingResult{
CommandID: commandID,
SentAt: time.Now(),
RetryCount: 0,
}
}
// Acknowledge marks command results as acknowledged and removes them
func (t *Tracker) Acknowledge(commandIDs []string) {
t.mu.Lock()
defer t.mu.Unlock()
for _, id := range commandIDs {
delete(t.pending, id)
}
}
// GetPending returns list of command IDs awaiting acknowledgment
func (t *Tracker) GetPending() []string {
t.mu.RLock()
defer t.mu.RUnlock()
ids := make([]string, 0, len(t.pending))
for id := range t.pending {
ids = append(ids, id)
}
return ids
}
// IncrementRetry increments retry count for a command
func (t *Tracker) IncrementRetry(commandID string) {
t.mu.Lock()
defer t.mu.Unlock()
if result, exists := t.pending[commandID]; exists {
result.RetryCount++
}
}
// Cleanup removes old or over-retried pending results
func (t *Tracker) Cleanup() int {
t.mu.Lock()
defer t.mu.Unlock()
now := time.Now()
removed := 0
for id, result := range t.pending {
// Remove if too old
if now.Sub(result.SentAt) > t.maxAge {
delete(t.pending, id)
removed++
continue
}
// Remove if retried too many times
if result.RetryCount >= t.maxRetries {
delete(t.pending, id)
removed++
continue
}
}
return removed
}
// Stats returns statistics about pending acknowledgments
func (t *Tracker) Stats() Stats {
t.mu.RLock()
defer t.mu.RUnlock()
stats := Stats{
Total: len(t.pending),
}
now := time.Now()
for _, result := range t.pending {
age := now.Sub(result.SentAt)
if age > 1*time.Hour {
stats.OlderThan1Hour++
}
if result.RetryCount > 0 {
stats.WithRetries++
}
if result.RetryCount >= 5 {
stats.HighRetries++
}
}
return stats
}
// Stats holds statistics about pending acknowledgments
type Stats struct {
Total int
OlderThan1Hour int
WithRetries int
HighRetries int
}