Complete RedFlag codebase with two major security audit implementations.
== A-1: Ed25519 Key Rotation Support ==
Server:
- SignCommand sets SignedAt timestamp and KeyID on every signature
- signing_keys database table (migration 020) for multi-key rotation
- InitializePrimaryKey registers active key at startup
- /api/v1/public-keys endpoint for rotation-aware agents
- SigningKeyQueries for key lifecycle management
Agent:
- Key-ID-aware verification via CheckKeyRotation
- FetchAndCacheAllActiveKeys for rotation pre-caching
- Cache metadata with TTL and staleness fallback
- SecurityLogger events for key rotation and command signing
== A-2: Replay Attack Fixes (F-1 through F-7) ==
F-5 CRITICAL - RetryCommand now signs via signAndCreateCommand
F-1 HIGH - v3 format: "{agent_id}:{cmd_id}:{type}:{hash}:{ts}"
F-7 HIGH - Migration 026: expires_at column with partial index
F-6 HIGH - GetPendingCommands/GetStuckCommands filter by expires_at
F-2 HIGH - Agent-side executedIDs dedup map with cleanup
F-4 HIGH - commandMaxAge reduced from 24h to 4h
F-3 CRITICAL - Old-format commands rejected after 48h via CreatedAt
Verification fixes: migration idempotency (ETHOS #4), log format
compliance (ETHOS #1), stale comments updated.
All 24 tests passing. Docker --no-cache build verified.
See docs/ for full audit reports and deviation log (DEV-001 to DEV-019).
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
141 lines
4.0 KiB
Go
141 lines
4.0 KiB
Go
package database
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
|
|
"github.com/jmoiron/sqlx"
|
|
_ "github.com/lib/pq"
|
|
)
|
|
|
|
// DB wraps the database connection
|
|
type DB struct {
|
|
*sqlx.DB
|
|
}
|
|
|
|
// Connect establishes a connection to the PostgreSQL database
|
|
func Connect(databaseURL string) (*DB, error) {
|
|
db, err := sqlx.Connect("postgres", databaseURL)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to connect to database: %w", err)
|
|
}
|
|
|
|
// Configure connection pool
|
|
db.SetMaxOpenConns(25)
|
|
db.SetMaxIdleConns(5)
|
|
|
|
// Test the connection
|
|
if err := db.Ping(); err != nil {
|
|
return nil, fmt.Errorf("failed to ping database: %w", err)
|
|
}
|
|
|
|
return &DB{db}, nil
|
|
}
|
|
|
|
// Migrate runs database migrations with proper tracking
|
|
func (db *DB) Migrate(migrationsPath string) error {
|
|
// Create migrations table if it doesn't exist
|
|
createTableSQL := `
|
|
CREATE TABLE IF NOT EXISTS schema_migrations (
|
|
version VARCHAR(255) PRIMARY KEY,
|
|
applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
|
|
)`
|
|
if _, err := db.Exec(createTableSQL); err != nil {
|
|
return fmt.Errorf("failed to create migrations table: %w", err)
|
|
}
|
|
|
|
// Read migration files
|
|
files, err := os.ReadDir(migrationsPath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read migrations directory: %w", err)
|
|
}
|
|
|
|
// Filter and sort .up.sql files
|
|
var migrationFiles []string
|
|
for _, file := range files {
|
|
if strings.HasSuffix(file.Name(), ".up.sql") {
|
|
migrationFiles = append(migrationFiles, file.Name())
|
|
}
|
|
}
|
|
sort.Strings(migrationFiles)
|
|
|
|
// Execute migrations that haven't been applied yet
|
|
for _, filename := range migrationFiles {
|
|
// Check if migration has already been applied
|
|
var count int
|
|
err := db.Get(&count, "SELECT COUNT(*) FROM schema_migrations WHERE version = $1", filename)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to check migration status for %s: %w", filename, err)
|
|
}
|
|
|
|
if count > 0 {
|
|
fmt.Printf("→ Skipping migration (already applied): %s\n", filename)
|
|
continue
|
|
}
|
|
|
|
// Read migration file
|
|
path := filepath.Join(migrationsPath, filename)
|
|
content, err := os.ReadFile(path)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read migration %s: %w", filename, err)
|
|
}
|
|
|
|
// Execute migration in a transaction
|
|
tx, err := db.Beginx()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to begin transaction for migration %s: %w", filename, err)
|
|
}
|
|
|
|
// Execute the migration SQL
|
|
if _, err := tx.Exec(string(content)); err != nil {
|
|
// Check if it's an "already exists" error
|
|
if strings.Contains(err.Error(), "already exists") ||
|
|
strings.Contains(err.Error(), "duplicate key") ||
|
|
strings.Contains(err.Error(), "relation") && strings.Contains(err.Error(), "already exists") {
|
|
|
|
// Rollback the failed transaction
|
|
tx.Rollback()
|
|
|
|
// Check if this migration was already recorded as applied
|
|
var count int
|
|
checkErr := db.Get(&count, "SELECT COUNT(*) FROM schema_migrations WHERE version = $1", filename)
|
|
if checkErr == nil && count > 0 {
|
|
// Migration was already applied, just skip it
|
|
fmt.Printf("⚠ Migration %s already applied, skipping\n", filename)
|
|
} else {
|
|
// Migration failed and wasn't applied - this is a real error
|
|
return fmt.Errorf("migration %s failed with 'already exists' but migration not recorded: %w", filename, err)
|
|
}
|
|
continue
|
|
}
|
|
|
|
// For any other error, rollback and fail
|
|
tx.Rollback()
|
|
return fmt.Errorf("failed to execute migration %s: %w", filename, err)
|
|
}
|
|
|
|
// Record the migration as applied (normal success path)
|
|
if _, err := tx.Exec("INSERT INTO schema_migrations (version) VALUES ($1)", filename); err != nil {
|
|
tx.Rollback()
|
|
return fmt.Errorf("failed to record migration %s: %w", filename, err)
|
|
}
|
|
|
|
// Commit the transaction
|
|
if err := tx.Commit(); err != nil {
|
|
return fmt.Errorf("failed to commit migration %s: %w", filename, err)
|
|
}
|
|
|
|
fmt.Printf("✓ Successfully executed migration: %s\n", filename)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close closes the database connection
|
|
func (db *DB) Close() error {
|
|
return db.DB.Close()
|
|
}
|