Files
Redflag/aggregator-agent/internal/event/buffer.go
jpetree331 f97d4845af feat(security): A-1 Ed25519 key rotation + A-2 replay attack fixes
Complete RedFlag codebase with two major security audit implementations.

== A-1: Ed25519 Key Rotation Support ==

Server:
- SignCommand sets SignedAt timestamp and KeyID on every signature
- signing_keys database table (migration 020) for multi-key rotation
- InitializePrimaryKey registers active key at startup
- /api/v1/public-keys endpoint for rotation-aware agents
- SigningKeyQueries for key lifecycle management

Agent:
- Key-ID-aware verification via CheckKeyRotation
- FetchAndCacheAllActiveKeys for rotation pre-caching
- Cache metadata with TTL and staleness fallback
- SecurityLogger events for key rotation and command signing

== A-2: Replay Attack Fixes (F-1 through F-7) ==

F-5 CRITICAL - RetryCommand now signs via signAndCreateCommand
F-1 HIGH     - v3 format: "{agent_id}:{cmd_id}:{type}:{hash}:{ts}"
F-7 HIGH     - Migration 026: expires_at column with partial index
F-6 HIGH     - GetPendingCommands/GetStuckCommands filter by expires_at
F-2 HIGH     - Agent-side executedIDs dedup map with cleanup
F-4 HIGH     - commandMaxAge reduced from 24h to 4h
F-3 CRITICAL - Old-format commands rejected after 48h via CreatedAt

Verification fixes: migration idempotency (ETHOS #4), log format
compliance (ETHOS #1), stale comments updated.

All 24 tests passing. Docker --no-cache build verified.
See docs/ for full audit reports and deviation log (DEV-001 to DEV-019).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 21:25:47 -04:00

135 lines
3.0 KiB
Go

package event
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"github.com/google/uuid"
"sync"
"github.com/Fimeg/RedFlag/aggregator-agent/internal/models"
)
const (
defaultMaxBufferSize = 1000 // Max events to buffer
)
// Buffer handles local event buffering for offline resilience
type Buffer struct {
filePath string
maxSize int
mu sync.Mutex
}
// NewBuffer creates a new event buffer with the specified file path
func NewBuffer(filePath string) *Buffer {
return &Buffer{
filePath: filePath,
maxSize: defaultMaxBufferSize,
}
}
// BufferEvent saves an event to the local buffer file
func (b *Buffer) BufferEvent(event *models.SystemEvent) error {
b.mu.Lock()
defer b.mu.Unlock()
// Ensure event has an ID
if event.ID == uuid.Nil {
return fmt.Errorf("event ID cannot be nil")
}
// Create directory if needed
dir := filepath.Dir(b.filePath)
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("failed to create buffer directory: %w", err)
}
// Read existing buffer
var events []*models.SystemEvent
if data, err := os.ReadFile(b.filePath); err == nil {
if err := json.Unmarshal(data, &events); err != nil {
// If we can't unmarshal, start fresh
events = []*models.SystemEvent{}
}
}
// Append new event
events = append(events, event)
// Keep only last N events if buffer too large (circular buffer)
if len(events) > b.maxSize {
events = events[len(events)-b.maxSize:]
}
// Write back to file
data, err := json.Marshal(events)
if err != nil {
return fmt.Errorf("failed to marshal events: %w", err)
}
if err := os.WriteFile(b.filePath, data, 0644); err != nil {
return fmt.Errorf("failed to write buffer file: %w", err)
}
return nil
}
// GetBufferedEvents retrieves and clears the buffer
func (b *Buffer) GetBufferedEvents() ([]*models.SystemEvent, error) {
b.mu.Lock()
defer b.mu.Unlock()
// Read buffer file
var events []*models.SystemEvent
data, err := os.ReadFile(b.filePath)
if err != nil {
if os.IsNotExist(err) {
return nil, nil // No buffer file means no events
}
return nil, fmt.Errorf("failed to read buffer file: %w", err)
}
if err := json.Unmarshal(data, &events); err != nil {
return nil, fmt.Errorf("failed to unmarshal events: %w", err)
}
// Clear buffer file after reading
if err := os.Remove(b.filePath); err != nil && !os.IsNotExist(err) {
// Log warning but don't fail - events were still retrieved
fmt.Printf("Warning: Failed to clear buffer file: %v\n", err)
}
return events, nil
}
// SetMaxSize sets the maximum number of events to buffer
func (b *Buffer) SetMaxSize(size int) {
b.mu.Lock()
defer b.mu.Unlock()
b.maxSize = size
}
// GetStats returns buffer statistics
func (b *Buffer) GetStats() (int, error) {
b.mu.Lock()
defer b.mu.Unlock()
data, err := os.ReadFile(b.filePath)
if err != nil {
if os.IsNotExist(err) {
return 0, nil
}
return 0, err
}
var events []*models.SystemEvent
if err := json.Unmarshal(data, &events); err != nil {
return 0, err
}
return len(events), nil
}