Files
jpetree331 ec0d880036 fix(database): B-1 schema integrity and migration fixes
- Fix migration 024 self-insert and bad column reference (F-B1-1, F-B1-2)
  Uses the existing enabled/auto_run columns instead of the non-existent `deprecated` column
- Abort server on migration failure instead of warning (F-B1-11)
  main.go now calls log.Fatalf, prints [INFO] only on success
- Fix migration 018 scanner_config filename suffix (F-B1-3)
  Renumbered to 027 with .up.sql suffix
- Remove GRANT to non-existent role in scanner_config (F-B1-4)
- Resolve duplicate migration numbers 009 and 012 (F-B1-13)
  Renamed to 009b and 012b for unique lexical sorting
- Add IF NOT EXISTS to all non-idempotent migrations (F-B1-15)
  Fixed: 011, 012, 017, 023, 023a
- Replace N+1 dashboard stats loop with GetAllUpdateStats (F-B1-6)
  Single aggregate query replaces per-agent loop
- Add composite index on agent_commands(status, sent_at) (F-B1-5)
  New migration 028 with partial index for timeout service
- Add background refresh token cleanup goroutine (F-B1-10)
  24-hour ticker calls CleanupExpiredTokens
- ETHOS log format in migration runner (no emojis)

All 55 tests pass (41 server + 14 agent). No regressions.
See docs/B1_Fix_Implementation.md and DEV-025 through DEV-028.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 07:03:35 -04:00

142 lines
4.0 KiB
Go

package database
import (
"fmt"
"log"
"os"
"path/filepath"
"sort"
"strings"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
)
// DB is this package's database handle. It embeds *sqlx.DB, so the sqlx
// methods are promoted and can be called directly on a *DB; Migrate and
// Close are defined on top of it.
type DB struct {
*sqlx.DB
}
// Connect opens a PostgreSQL connection pool for databaseURL, applies the
// pool limits used by the server (25 open, 5 idle connections) and verifies
// the database is reachable.
//
// It returns the wrapped handle, or an error if the connection cannot be
// opened or the ping fails. On a failed ping the pool is closed before
// returning, so no connections are leaked.
func Connect(databaseURL string) (*DB, error) {
	db, err := sqlx.Connect("postgres", databaseURL)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to database: %w", err)
	}

	// Configure connection pool
	db.SetMaxOpenConns(25)
	db.SetMaxIdleConns(5)

	// NOTE(review): sqlx.Connect is documented to ping on its own, so this
	// second check is likely redundant; it is kept to preserve behavior.
	if err := db.Ping(); err != nil {
		// Release the pool we just opened. The ping error is the one worth
		// reporting, so a failure to close is deliberately ignored.
		_ = db.Close()
		return nil, fmt.Errorf("failed to ping database: %w", err)
	}

	return &DB{db}, nil
}
// Migrate applies every pending *.up.sql migration found in migrationsPath.
//
// Applied migrations are tracked by file name in the schema_migrations table,
// which is created on first use. Files are processed in lexical order. Each
// file's SQL and the INSERT that records it run in the same transaction, so a
// migration is only recorded if its SQL was executed and committed.
//
// Migrate returns the first error it encounters; migrations committed before
// that point stay applied.
func (db *DB) Migrate(migrationsPath string) error {
	// Create migrations table if it doesn't exist
	createTableSQL := `
CREATE TABLE IF NOT EXISTS schema_migrations (
version VARCHAR(255) PRIMARY KEY,
applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
)`
	if _, err := db.Exec(createTableSQL); err != nil {
		return fmt.Errorf("failed to create migrations table: %w", err)
	}

	// Read migration files
	files, err := os.ReadDir(migrationsPath)
	if err != nil {
		return fmt.Errorf("failed to read migrations directory: %w", err)
	}

	// Keep regular *.up.sql entries only. A directory that happens to carry
	// the suffix cannot be a migration and would only fail at read time.
	var migrationFiles []string
	for _, file := range files {
		if file.IsDir() || !strings.HasSuffix(file.Name(), ".up.sql") {
			continue
		}
		migrationFiles = append(migrationFiles, file.Name())
	}
	sort.Strings(migrationFiles)

	// Execute migrations that haven't been applied yet
	for _, filename := range migrationFiles {
		// Check if migration has already been applied
		var count int
		if err := db.Get(&count, "SELECT COUNT(*) FROM schema_migrations WHERE version = $1", filename); err != nil {
			return fmt.Errorf("failed to check migration status for %s: %w", filename, err)
		}
		if count > 0 {
			log.Printf("[INFO] [server] [database] migration_skipped version=%s already_applied=true", filename)
			continue
		}

		// Read migration file
		content, err := os.ReadFile(filepath.Join(migrationsPath, filename))
		if err != nil {
			return fmt.Errorf("failed to read migration %s: %w", filename, err)
		}

		// Execute migration in a transaction
		tx, err := db.Beginx()
		if err != nil {
			return fmt.Errorf("failed to begin transaction for migration %s: %w", filename, err)
		}

		if _, err := tx.Exec(string(content)); err != nil {
			// Roll back before anything else. The Exec error is the one that
			// gets reported, so a rollback failure is deliberately ignored.
			_ = tx.Rollback()

			// "already exists" also covers the `relation "x" already exists`
			// form, so no separate check for it is needed.
			msg := err.Error()
			if !strings.Contains(msg, "already exists") && !strings.Contains(msg, "duplicate key") {
				return fmt.Errorf("failed to execute migration %s: %w", filename, err)
			}

			// The objects already exist. That is only acceptable if the
			// migration has meanwhile been recorded as applied (it was not
			// when checked at the top of this iteration).
			var recorded int
			checkErr := db.Get(&recorded, "SELECT COUNT(*) FROM schema_migrations WHERE version = $1", filename)
			if checkErr != nil {
				// Report the failed re-check instead of silently treating it
				// as "not recorded".
				return fmt.Errorf("migration %s failed with 'already exists' and its status could not be re-checked (%v): %w", filename, checkErr, err)
			}
			if recorded == 0 {
				// Migration failed and wasn't applied - this is a real error
				return fmt.Errorf("migration %s failed with 'already exists' but migration not recorded: %w", filename, err)
			}
			log.Printf("[INFO] [server] [database] migration_already_applied version=%s", filename)
			continue
		}

		// Record the migration as applied (normal success path)
		if _, err := tx.Exec("INSERT INTO schema_migrations (version) VALUES ($1)", filename); err != nil {
			_ = tx.Rollback()
			return fmt.Errorf("failed to record migration %s: %w", filename, err)
		}

		// Commit the transaction
		if err := tx.Commit(); err != nil {
			return fmt.Errorf("failed to commit migration %s: %w", filename, err)
		}
		log.Printf("[INFO] [server] [database] migration_applied version=%s", filename)
	}

	return nil
}
// Close closes the underlying database handle and returns whatever error the
// embedded *sqlx.DB reports.
func (db *DB) Close() error {
// The embedded field is selected explicitly: a plain db.Close() here would
// resolve to this method itself and recurse.
return db.DB.Close()
}