- Fix migration conflicts and duplicate key errors
- Remove duplicate scan logging from agents
- Fix AgentHealth UI and Storage page triggers
- Prevent scans from appearing on wrong pages

Fixes duplicate key violations on fresh installs and storage scans appearing on the Updates page.
141 lines
4.0 KiB
Go
141 lines
4.0 KiB
Go
package database
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
|
|
"github.com/jmoiron/sqlx"
|
|
_ "github.com/lib/pq"
|
|
)
|
|
|
|
// DB wraps the database connection
|
|
type DB struct {
|
|
*sqlx.DB
|
|
}
|
|
|
|
// Connect establishes a connection to the PostgreSQL database
|
|
func Connect(databaseURL string) (*DB, error) {
|
|
db, err := sqlx.Connect("postgres", databaseURL)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to connect to database: %w", err)
|
|
}
|
|
|
|
// Configure connection pool
|
|
db.SetMaxOpenConns(25)
|
|
db.SetMaxIdleConns(5)
|
|
|
|
// Test the connection
|
|
if err := db.Ping(); err != nil {
|
|
return nil, fmt.Errorf("failed to ping database: %w", err)
|
|
}
|
|
|
|
return &DB{db}, nil
|
|
}
|
|
|
|
// Migrate runs database migrations with proper tracking
|
|
func (db *DB) Migrate(migrationsPath string) error {
|
|
// Create migrations table if it doesn't exist
|
|
createTableSQL := `
|
|
CREATE TABLE IF NOT EXISTS schema_migrations (
|
|
version VARCHAR(255) PRIMARY KEY,
|
|
applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
|
|
)`
|
|
if _, err := db.Exec(createTableSQL); err != nil {
|
|
return fmt.Errorf("failed to create migrations table: %w", err)
|
|
}
|
|
|
|
// Read migration files
|
|
files, err := os.ReadDir(migrationsPath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read migrations directory: %w", err)
|
|
}
|
|
|
|
// Filter and sort .up.sql files
|
|
var migrationFiles []string
|
|
for _, file := range files {
|
|
if strings.HasSuffix(file.Name(), ".up.sql") {
|
|
migrationFiles = append(migrationFiles, file.Name())
|
|
}
|
|
}
|
|
sort.Strings(migrationFiles)
|
|
|
|
// Execute migrations that haven't been applied yet
|
|
for _, filename := range migrationFiles {
|
|
// Check if migration has already been applied
|
|
var count int
|
|
err := db.Get(&count, "SELECT COUNT(*) FROM schema_migrations WHERE version = $1", filename)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to check migration status for %s: %w", filename, err)
|
|
}
|
|
|
|
if count > 0 {
|
|
fmt.Printf("→ Skipping migration (already applied): %s\n", filename)
|
|
continue
|
|
}
|
|
|
|
// Read migration file
|
|
path := filepath.Join(migrationsPath, filename)
|
|
content, err := os.ReadFile(path)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read migration %s: %w", filename, err)
|
|
}
|
|
|
|
// Execute migration in a transaction
|
|
tx, err := db.Beginx()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to begin transaction for migration %s: %w", filename, err)
|
|
}
|
|
|
|
// Execute the migration SQL
|
|
if _, err := tx.Exec(string(content)); err != nil {
|
|
// Check if it's an "already exists" error
|
|
if strings.Contains(err.Error(), "already exists") ||
|
|
strings.Contains(err.Error(), "duplicate key") ||
|
|
strings.Contains(err.Error(), "relation") && strings.Contains(err.Error(), "already exists") {
|
|
|
|
// Rollback the failed transaction
|
|
tx.Rollback()
|
|
|
|
// Check if this migration was already recorded as applied
|
|
var count int
|
|
checkErr := db.Get(&count, "SELECT COUNT(*) FROM schema_migrations WHERE version = $1", filename)
|
|
if checkErr == nil && count > 0 {
|
|
// Migration was already applied, just skip it
|
|
fmt.Printf("⚠ Migration %s already applied, skipping\n", filename)
|
|
} else {
|
|
// Migration failed and wasn't applied - this is a real error
|
|
return fmt.Errorf("migration %s failed with 'already exists' but migration not recorded: %w", filename, err)
|
|
}
|
|
continue
|
|
}
|
|
|
|
// For any other error, rollback and fail
|
|
tx.Rollback()
|
|
return fmt.Errorf("failed to execute migration %s: %w", filename, err)
|
|
}
|
|
|
|
// Record the migration as applied (normal success path)
|
|
if _, err := tx.Exec("INSERT INTO schema_migrations (version) VALUES ($1)", filename); err != nil {
|
|
tx.Rollback()
|
|
return fmt.Errorf("failed to record migration %s: %w", filename, err)
|
|
}
|
|
|
|
// Commit the transaction
|
|
if err := tx.Commit(); err != nil {
|
|
return fmt.Errorf("failed to commit migration %s: %w", filename, err)
|
|
}
|
|
|
|
fmt.Printf("✓ Successfully executed migration: %s\n", filename)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close closes the database connection
|
|
func (db *DB) Close() error {
|
|
return db.DB.Close()
|
|
}
|