diff --git a/cmd/hyperfleet-api/migrate/cmd.go b/cmd/hyperfleet-api/migrate/cmd.go index 89b8d34..0ad9a9e 100755 --- a/cmd/hyperfleet-api/migrate/cmd.go +++ b/cmd/hyperfleet-api/migrate/cmd.go @@ -53,11 +53,11 @@ func runMigrateWithError() error { } }() - if err := db.Migrate(connection.New(ctx)); err != nil { + // Use MigrateWithLock to prevent concurrent migrations from multiple pods + if err := db.MigrateWithLock(ctx, connection); err != nil { logger.WithError(ctx, err).Error("Migration failed") return err } - logger.Info(ctx, "Migration completed successfully") return nil } diff --git a/docs/database.md b/docs/database.md index e5b3d6a..f6f5e8b 100644 --- a/docs/database.md +++ b/docs/database.md @@ -61,6 +61,46 @@ Uses GORM AutoMigrate: - Additive (creates missing tables, columns, indexes) - Run via `./bin/hyperfleet-api migrate` +### Migration Coordination + +**Problem:** During rolling deployments, multiple pods attempt to run migrations simultaneously, causing race conditions and deployment failures. + +**Solution:** PostgreSQL advisory locks ensure exclusive migration execution. + +#### How It Works + +```go +// Only one pod/process acquires the lock and runs migrations +// Others wait until the lock is released +db.MigrateWithLock(ctx, factory) +``` + +**Implementation:** +1. Pod acquires advisory lock via `pg_advisory_xact_lock(hash("migrations"), hash("Migrations"))` +2. Lock holder runs migrations exclusively +3. Other pods block until lock is released +4. 
Lock automatically released on transaction commit
+
+**Key Features:**
+- **Zero infrastructure overhead** - Uses native PostgreSQL locks
+- **Automatic cleanup** - Locks released on transaction end or pod crash
+- **Nested lock support** - Same lock can be acquired in nested contexts without deadlock
+- **UUID-based ownership** - Only original acquirer can unlock
+
+#### Testing Concurrent Migrations
+
+Integration tests validate concurrent behavior:
+
+```bash
+make test-integration # Runs TestConcurrentMigrations
+```
+
+**Test coverage:**
+- `TestConcurrentMigrations` - Multiple pods running migrations simultaneously
+- `TestAdvisoryLocksConcurrently` - Lock serialization under race conditions
+- `TestAdvisoryLocksWithTransactions` - Lock + transaction interaction
+- `TestAdvisoryLockBlocking` - Lock blocking behavior
+
 ## Database Setup
 
 ```bash
diff --git a/pkg/db/advisory_locks.go b/pkg/db/advisory_locks.go
new file mode 100644
index 0000000..4fd4c8e
--- /dev/null
+++ b/pkg/db/advisory_locks.go
@@ -0,0 +1,107 @@
+package db
+
+import (
+	"context"
+	"errors"
+	"hash/fnv"
+	"time"
+
+	"gorm.io/gorm"
+)
+
+// LockType represents the type of advisory lock
+type LockType string
+
+const (
+	// Migrations lock type for database migrations
+	Migrations LockType = "Migrations"
+)
+
+// AdvisoryLock represents a postgres advisory lock
+//
+//	begin                                      # start a Tx
+//	select pg_advisory_xact_lock(id, lockType) # obtain the lock (blocking)
+//	end                                        # end the Tx and release the lock
+//
+// ownerUUID is a way to own the lock. Only the very first
+// service call that owns the lock will have the correct ownerUUID. This is necessary
+// to allow functions to call other service functions as part of the same lock (id, lockType).
+type AdvisoryLock struct {
+	g2        *gorm.DB  // Tx-scoped session; the lock lives for the duration of this Tx
+	txid      int64     // postgres txid_current() at acquisition; informational only — not read elsewhere in this file
+	ownerUUID *string   // UUID of the call that created the lock; only this owner may Unlock
+	id        *string   // caller-supplied lock id, hashed into the first pg_advisory_xact_lock argument
+	lockType  *LockType // lock category, hashed into the second pg_advisory_xact_lock argument
+	startTime time.Time // acquisition timestamp; informational only — not read elsewhere in this file
+}
+
+// newAdvisoryLock constructs a new AdvisoryLock object.
+func newAdvisoryLock(ctx context.Context, connection SessionFactory, ownerUUID *string, id *string, locktype *LockType) (*AdvisoryLock, error) { + if connection == nil { + return nil, errors.New("AdvisoryLock: connection factory is missing") + } + + // it requires a new DB session to start the advisory lock. + g2 := connection.New(ctx) + + // start a Tx to ensure gorm will obtain/release the lock using a same connection. + tx := g2.Begin() + if tx.Error != nil { + return nil, tx.Error + } + + // current transaction ID set by postgres. these are *not* distinct across time + // and do get reset after postgres performs "vacuuming" to reclaim used IDs. + var txid struct{ ID int64 } + err := tx.Raw("select txid_current() as id").Scan(&txid).Error + + return &AdvisoryLock{ + txid: txid.ID, + ownerUUID: ownerUUID, + id: id, + lockType: locktype, + g2: tx, + startTime: time.Now(), + }, err +} + +// lock calls select pg_advisory_xact_lock(id, lockType) to obtain the lock defined by (id, lockType). +// it is blocked if some other thread currently is holding the same lock (id, lockType). +// if blocked, it can be unblocked or timed out when overloaded. +func (l *AdvisoryLock) lock() error { + if l.g2 == nil { + return errors.New("AdvisoryLock: transaction is missing") + } + if l.id == nil { + return errors.New("AdvisoryLock: id is missing") + } + if l.lockType == nil { + return errors.New("AdvisoryLock: lockType is missing") + } + + idAsInt := hash(*l.id) + typeAsInt := hash(string(*l.lockType)) + err := l.g2.Exec("select pg_advisory_xact_lock(?, ?)", idAsInt, typeAsInt).Error + return err +} + +func (l *AdvisoryLock) unlock() error { + if l.g2 == nil { + return errors.New("AdvisoryLock: transaction is missing") + } + + // it ends the Tx and implicitly releases the lock. 
+	err := l.g2.Commit().Error
+	l.g2 = nil
+	return err
+}
+
+// hash string to int32 (postgres integer)
+// https://pkg.go.dev/math#pkg-constants
+// https://www.postgresql.org/docs/12/datatype-numeric.html
+// NOTE: fnv-32a truncated to int32 — distinct strings can collide, in which
+// case two logically different (id, lockType) pairs would share one pg lock.
+// That only over-serializes; it never loses mutual exclusion.
+func hash(s string) int32 {
+	h := fnv.New32a()
+	h.Write([]byte(s))
+	// Sum32() returns uint32. needs conversion.
+	return int32(h.Sum32())
+}
diff --git a/pkg/db/context.go b/pkg/db/context.go
index 06c5114..8727839 100755
--- a/pkg/db/context.go
+++ b/pkg/db/context.go
@@ -3,10 +3,33 @@ package db
 import (
 	"context"
 
+	"github.com/google/uuid"
+
 	dbContext "github.com/openshift-hyperfleet/hyperfleet-api/pkg/db/db_context"
 	"github.com/openshift-hyperfleet/hyperfleet-api/pkg/logger"
 )
 
+// advisoryLockKey is the private context-key type for the lock map,
+// preventing collisions with keys set by other packages.
+type advisoryLockKey string
+
+const (
+	advisoryLock advisoryLockKey = "advisoryLock"
+)
+
+// advisoryLockMap indexes held locks by the composite "id:lockType" key.
+type advisoryLockMap map[string]*AdvisoryLock
+
+// key builds the composite map key for an (id, lockType) pair.
+func (m advisoryLockMap) key(id string, lockType LockType) string {
+	return id + ":" + string(lockType)
+}
+
+// get looks up the lock held for (id, lockType), if any.
+func (m advisoryLockMap) get(id string, lockType LockType) (*AdvisoryLock, bool) {
+	lock, ok := m[m.key(id, lockType)]
+	return lock, ok
+}
+
+// set records the lock held for (id, lockType).
+func (m advisoryLockMap) set(id string, lockType LockType, lock *AdvisoryLock) {
+	m[m.key(id, lockType)] = lock
+}
+
 // NewContext returns a new context with transaction stored in it.
 // Upon error, the original context is still returned along with an error
 func NewContext(ctx context.Context, connection SessionFactory) (context.Context, error) {
@@ -53,3 +76,74 @@ func MarkForRollback(ctx context.Context, err error) {
 	transaction.SetRollbackFlag(true)
 	logger.WithError(ctx, err).Info("Marked transaction for rollback")
 }
+
+// NewAdvisoryLockContext returns a new context with AdvisoryLock stored in it.
+// Upon error, the original context is still returned along with an error
+func NewAdvisoryLockContext(ctx context.Context, connection SessionFactory, id string, lockType LockType) (context.Context, string, error) {
+	// lockOwnerID will be different for every service function that attempts to start a lock.
+	// only the initial call in the stack must unlock.
+	// Unlock() will compare UUIDs and ensure only the top level call succeeds.
+	lockOwnerID := uuid.New().String()
+
+	locks, found := ctx.Value(advisoryLock).(advisoryLockMap)
+	if found {
+		// nested call on an already-held (id, lockType): no new lock, and the
+		// fresh UUID guarantees this caller's Unlock is a no-op.
+		if _, ok := locks.get(id, lockType); ok {
+			return ctx, lockOwnerID, nil
+		}
+	} else {
+		locks = make(advisoryLockMap)
+	}
+
+	lock, err := newAdvisoryLock(ctx, connection, &lockOwnerID, &id, &lockType)
+	if err != nil {
+		logger.WithError(ctx, err).Error("Failed to create advisory lock")
+		return ctx, lockOwnerID, err
+	}
+
+	// obtain the advisory lock (blocking)
+	err = lock.lock()
+	if err != nil {
+		logger.WithError(ctx, err).Error("Failed to acquire advisory lock")
+		// end the Tx opened by newAdvisoryLock so the DB session is not
+		// leaked; no advisory lock was obtained, so this is purely cleanup.
+		if unlockErr := lock.unlock(); unlockErr != nil {
+			logger.WithError(ctx, unlockErr).Error("Failed to clean up advisory lock transaction")
+		}
+		return ctx, lockOwnerID, err
+	}
+
+	locks.set(id, lockType, lock)
+
+	ctx = context.WithValue(ctx, advisoryLock, locks)
+	logger.With(ctx, "lock_id", id, "lock_type", lockType).Info("Acquired advisory lock")
+
+	return ctx, lockOwnerID, nil
+}
+
+// Unlock searches current locks and unlocks the one matching its owner id.
+func Unlock(ctx context.Context, callerUUID string) context.Context { + locks, ok := ctx.Value(advisoryLock).(advisoryLockMap) + if !ok { + logger.Error(ctx, "Could not retrieve locks from context") + return ctx + } + + for k, lock := range locks { + if lock.ownerUUID == nil { + logger.With(ctx, "lock_id", lock.id).Warn("lockOwnerID could not be found in AdvisoryLock") + } else if *lock.ownerUUID == callerUUID { + lockID := "" + lockType := *lock.lockType + if lock.id != nil { + lockID = *lock.id + } + + if err := lock.unlock(); err != nil { + logger.With(ctx, "lock_id", lockID, "lock_type", lockType).WithError(err).Error("Could not unlock lock") + } else { + logger.With(ctx, "lock_id", lockID, "lock_type", lockType).Info("Unlocked lock") + } + delete(locks, k) + } else { + // the resolving UUID belongs to a service call that did *not* initiate the lock. + // it is ignored. + } + } + + return ctx +} diff --git a/pkg/db/migrations.go b/pkg/db/migrations.go index 63fa6e0..008b24c 100755 --- a/pkg/db/migrations.go +++ b/pkg/db/migrations.go @@ -24,6 +24,27 @@ func Migrate(g2 *gorm.DB) error { return nil } +// MigrateWithLock runs migrations with an advisory lock to prevent concurrent migrations +func MigrateWithLock(ctx context.Context, factory SessionFactory) error { + // Acquire advisory lock for migrations + ctx, lockOwnerID, err := NewAdvisoryLockContext(ctx, factory, "migrations", Migrations) + if err != nil { + logger.WithError(ctx, err).Error("Could not lock migrations") + return err + } + defer Unlock(ctx, lockOwnerID) + + // Run migrations with the locked context + g2 := factory.New(ctx) + if err := Migrate(g2); err != nil { + logger.WithError(ctx, err).Error("Could not migrate") + return err + } + + logger.Info(ctx, "Migration completed successfully") + return nil +} + // MigrateTo a specific migration will not seed the database, seeds are up to date with the latest // schema based on the most recent migration // This should be for testing purposes mainly 
diff --git a/test/integration/advisory_locks_test.go b/test/integration/advisory_locks_test.go
new file mode 100644
index 0000000..07e6d64
--- /dev/null
+++ b/test/integration/advisory_locks_test.go
@@ -0,0 +1,321 @@
+package integration
+
+import (
+	"context"
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/openshift-hyperfleet/hyperfleet-api/pkg/db"
+	"github.com/openshift-hyperfleet/hyperfleet-api/test"
+)
+
+// TestAdvisoryLocksConcurrently validates that advisory locks properly serialize
+// concurrent access to shared resources. This simulates a race condition where
+// multiple threads try to access and modify the same variable.
+func TestAdvisoryLocksConcurrently(t *testing.T) {
+	helper := test.NewHelper(t)
+
+	total := 10
+	var waiter sync.WaitGroup
+	waiter.Add(total)
+
+	// Simulate a race condition where multiple threads are trying to access and modify the "total" var.
+	// The acquireLock func uses an advisory lock so the accesses to "total" should be properly serialized.
+	for i := 0; i < total; i++ {
+		go acquireLock(helper, &total, &waiter)
+	}
+
+	// Wait for all goroutines to complete
+	waiter.Wait()
+
+	// All goroutines should have decremented total by 1, resulting in 0
+	if total != 0 {
+		t.Errorf("Expected total to be 0, got %d", total)
+	}
+}
+
+func acquireLock(helper *test.Helper, total *int, waiter *sync.WaitGroup) {
+	ctx := context.Background()
+
+	// Acquire advisory lock
+	ctx, lockOwnerID, err := db.NewAdvisoryLockContext(ctx, helper.DBFactory, "test-resource", db.Migrations)
+	if err != nil {
+		helper.T.Errorf("Failed to acquire lock: %v", err)
+		waiter.Done()
+		return
+	}
+	defer db.Unlock(ctx, lockOwnerID)
+
+	// Pretend loading "total" from DB
+	initTotal := *total
+
+	// Some slow work to increase the likelihood of race conditions
+	time.Sleep(20 * time.Millisecond)
+
+	// Pretend saving "total" to DB
+	finalTotal := initTotal - 1
+	*total = finalTotal
+
+	waiter.Done()
+}
+
+// TestAdvisoryLocksWithTransactions validates that advisory locks work correctly
+// when combined with database transactions in various orders
+func TestAdvisoryLocksWithTransactions(t *testing.T) {
+	helper := test.NewHelper(t)
+
+	total := 10
+	var waiter sync.WaitGroup
+	waiter.Add(total)
+
+	for i := 0; i < total; i++ {
+		go acquireLockWithTransaction(helper, &total, &waiter)
+	}
+
+	waiter.Wait()
+
+	if total != 0 {
+		t.Errorf("Expected total to be 0, got %d", total)
+	}
+}
+
+func acquireLockWithTransaction(helper *test.Helper, total *int, waiter *sync.WaitGroup) {
+	ctx := context.Background()
+
+	// Lock and Tx can be stored within the same context. They should be independent of each other.
+	// It doesn't matter if a Tx coexists or not, nor does it matter if it occurs before or after the lock
+	r := rand.Intn(3) // no Tx if r == 2
+	txBeforeLock := r == 0
+	txAfterLock := r == 1
+
+	var dberr error
+
+	// Randomly add Tx before lock to demonstrate it works
+	if txBeforeLock {
+		ctx, dberr = db.NewContext(ctx, helper.DBFactory)
+		if dberr != nil {
+			helper.T.Errorf("Failed to create transaction context: %v", dberr)
+			waiter.Done()
+			return
+		}
+		defer db.Resolve(ctx)
+	}
+
+	// Acquire advisory lock
+	ctx, lockOwnerID, dberr := db.NewAdvisoryLockContext(ctx, helper.DBFactory, "test-resource-tx", db.Migrations)
+	if dberr != nil {
+		helper.T.Errorf("Failed to acquire lock: %v", dberr)
+		waiter.Done()
+		return
+	}
+	defer db.Unlock(ctx, lockOwnerID)
+
+	// Randomly add Tx after lock to demonstrate it works
+	if txAfterLock {
+		ctx, dberr = db.NewContext(ctx, helper.DBFactory)
+		if dberr != nil {
+			helper.T.Errorf("Failed to create transaction context: %v", dberr)
+			waiter.Done()
+			return
+		}
+		defer db.Resolve(ctx)
+	}
+
+	// Pretend loading "total" from DB
+	initTotal := *total
+
+	// Some slow work
+	time.Sleep(20 * time.Millisecond)
+
+	// Pretend saving "total" to DB
+	finalTotal := initTotal - 1
+	*total = finalTotal
+
+	waiter.Done()
+}
+
+// TestLocksAndExpectedWaits validates the behavior of advisory locks:
+// - Nested locks with the same (id, lockType) should not create additional locks
+// - Different (id, lockType) combinations should create separate locks
+// - Unlocking should only affect the lock matching the owner ID
+func TestLocksAndExpectedWaits(t *testing.T) {
+	helper := test.NewHelper(t)
+
+	// Start lock
+	ctx := context.Background()
+	ctx, lockOwnerID, err := db.NewAdvisoryLockContext(ctx, helper.DBFactory, "system", db.Migrations)
+	if err != nil {
+		t.Fatalf("Failed to acquire lock: %v", err)
+	}
+
+	// It should have 1 lock
+	g2 := helper.DBFactory.New(ctx)
+	var pgLocks []struct{ Granted bool }
+	g2.Raw("select granted from pg_locks WHERE locktype = 'advisory' and granted = true").Scan(&pgLocks)
+	if len(pgLocks) != 1 {
+		t.Errorf("Expected 1 lock, got %d", len(pgLocks))
+	}
+
+	// Successive locking should have no effect (nested lock with same id/type)
+	// Pretend this runs in a nested func
+	ctx, lockOwnerID2, err := db.NewAdvisoryLockContext(ctx, helper.DBFactory, "system", db.Migrations)
+	if err != nil {
+		t.Fatalf("Failed to acquire nested lock: %v", err)
+	}
+	// It should still have 1 lock
+	pgLocks = nil
+	g2.Raw("select granted from pg_locks WHERE locktype = 'advisory' and granted = true").Scan(&pgLocks)
+	if len(pgLocks) != 1 {
+		t.Errorf("Expected 1 lock after nested acquire, got %d", len(pgLocks))
+	}
+
+	// Unlock should have no effect either (unlocking nested lock)
+	// Pretend this runs in the nested func
+	db.Unlock(ctx, lockOwnerID2)
+	// It should still have 1 lock
+	pgLocks = nil
+	g2.Raw("select granted from pg_locks WHERE locktype = 'advisory' and granted = true").Scan(&pgLocks)
+	if len(pgLocks) != 1 {
+		t.Errorf("Expected 1 lock after nested unlock, got %d", len(pgLocks))
+	}
+
+	// Lock on a different (id, lockType) should work
+	// Pretend this runs in a nested func
+	ctx, lockOwnerID3, err := db.NewAdvisoryLockContext(ctx, helper.DBFactory, "diff_system", db.Migrations)
+	if err != nil {
+		t.Fatalf("Failed to acquire different lock: %v", err)
+	}
+	// It should have 2 locks
+	pgLocks = nil
+	g2.Raw("select granted from pg_locks WHERE locktype = 'advisory' and granted = true").Scan(&pgLocks)
+	if len(pgLocks) != 2 {
+		t.Errorf("Expected 2 locks, got %d", len(pgLocks))
+	}
+
+	// Pretend it releases the new lock in the nested func
+	db.Unlock(ctx, lockOwnerID3)
+	// It should have 1 lock
+	pgLocks = nil
+	g2.Raw("select granted from pg_locks WHERE locktype = 'advisory' and granted = true").Scan(&pgLocks)
+	if len(pgLocks) != 1 {
+		t.Errorf("Expected 1 lock after releasing different lock, got %d", len(pgLocks))
+	}
+
+	// Unlock the topmost lock
+	// Pretend it returns back to the parent func
+	db.Unlock(ctx, lockOwnerID)
+	// The lock should be gone
+	pgLocks = nil
+	g2.Raw("select granted from pg_locks WHERE locktype = 'advisory' and granted = true").Scan(&pgLocks)
+	if len(pgLocks) != 0 {
+		t.Errorf("Expected 0 locks after final unlock, got %d", len(pgLocks))
+	}
+}
+
+// TestConcurrentMigrations validates that the MigrateWithLock function
+// properly serializes concurrent migration attempts, ensuring only one
+// instance actually runs migrations at a time.
+func TestConcurrentMigrations(t *testing.T) {
+	helper := test.NewHelper(t)
+
+	// First, reset the database to a clean state
+	if err := helper.ResetDB(); err != nil {
+		t.Fatalf("Failed to reset database: %v", err)
+	}
+
+	total := 5
+	var waiter sync.WaitGroup
+	waiter.Add(total)
+
+	// Track which goroutines successfully acquired the lock
+	var successCount int
+	var mu sync.Mutex
+	errors := make([]error, 0)
+
+	// Simulate multiple pods trying to run migrations concurrently
+	for i := 0; i < total; i++ {
+		go func(id int) {
+			defer waiter.Done()
+
+			ctx := context.Background()
+			err := db.MigrateWithLock(ctx, helper.DBFactory)
+
+			mu.Lock()
+			defer mu.Unlock()
+
+			if err != nil {
+				errors = append(errors, err)
+			} else {
+				successCount++
+			}
+		}(i)
+	}
+
+	waiter.Wait()
+
+	// All migrations should succeed (they're idempotent)
+	if len(errors) > 0 {
+		t.Errorf("Expected no errors, but got %d: %v", len(errors), errors)
+	}
+
+	// All goroutines should complete successfully
+	if successCount != total {
+		t.Errorf("Expected %d successful migrations, got %d", total, successCount)
+	}
+}
+
+// TestAdvisoryLockBlocking validates that a second goroutine trying to acquire
+// the same lock will block until the first goroutine releases it.
+func TestAdvisoryLockBlocking(t *testing.T) {
+	helper := test.NewHelper(t)
+
+	ctx := context.Background()
+
+	// First goroutine acquires the lock
+	ctx1, lockOwnerID1, err := db.NewAdvisoryLockContext(ctx, helper.DBFactory, "blocking-test", db.Migrations)
+	if err != nil {
+		t.Fatalf("Failed to acquire first lock: %v", err)
+	}
+
+	// Track when the second goroutine acquires the lock
+	acquired := make(chan bool, 1)
+	released := make(chan bool, 1)
+
+	// Second goroutine tries to acquire the same lock
+	go func() {
+		ctx2, lockOwnerID2, err := db.NewAdvisoryLockContext(context.Background(), helper.DBFactory, "blocking-test", db.Migrations)
+		if err != nil {
+			t.Errorf("Failed to acquire second lock: %v", err)
+			return
+		}
+		defer db.Unlock(ctx2, lockOwnerID2)
+
+		acquired <- true
+		<-released // Wait for signal to release
+	}()
+
+	// Give the second goroutine time to start waiting
+	time.Sleep(100 * time.Millisecond)
+
+	// The second goroutine should still be blocked
+	select {
+	case <-acquired:
+		// BUG FIX: previously this branch consumed the single "acquired"
+		// message and fell through, so the final select below could never
+		// receive and the goroutine parked on <-released leaked. Unblock the
+		// goroutine, then fail hard — the invariant under test is broken.
+		released <- true
+		t.Fatal("Second goroutine acquired lock while first still holds it")
+	default:
+		// Expected: second goroutine is still blocked
+	}
+
+	// Release the first lock
+	db.Unlock(ctx1, lockOwnerID1)
+
+	// Now the second goroutine should acquire the lock
+	select {
+	case <-acquired:
+		// Expected: second goroutine acquired the lock
+		released <- true
+	case <-time.After(5 * time.Second):
+		t.Error("Second goroutine did not acquire lock after first was released")
+	}
+}