4 changes: 2 additions & 2 deletions cmd/hyperfleet-api/migrate/cmd.go
@@ -53,11 +53,11 @@ func runMigrateWithError() error {
		}
	}()

-	if err := db.Migrate(connection.New(ctx)); err != nil {
+	// Use MigrateWithLock to prevent concurrent migrations from multiple pods
+	if err := db.MigrateWithLock(ctx, connection); err != nil {
		logger.WithError(ctx, err).Error("Migration failed")
		return err
	}

	logger.Info(ctx, "Migration completed successfully")
	return nil
}
40 changes: 40 additions & 0 deletions docs/database.md
@@ -61,6 +61,46 @@ Uses GORM AutoMigrate:
- Additive (creates missing tables, columns, indexes)
- Run via `./bin/hyperfleet-api migrate`

### Migration Coordination

**Problem:** During rolling deployments, multiple pods attempt to run migrations simultaneously, causing race conditions and deployment failures.

**Solution:** PostgreSQL advisory locks ensure exclusive migration execution.

#### How It Works

```go
// Only one pod/process acquires the lock and runs migrations
// Others wait until the lock is released
db.MigrateWithLock(ctx, factory)
```

**Implementation:**
1. Pod acquires advisory lock via `pg_advisory_xact_lock(hash("migrations"), hash("Migrations"))`
2. Lock holder runs migrations exclusively
3. Other pods block until lock is released
4. Lock is released automatically when the transaction commits or rolls back
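
The key pair in step 1 can be reproduced outside the database. The following is a minimal sketch, assuming the same FNV-1a hashing as the `hash` helper in `pkg/db/advisory_locks.go`; the name `lockKey` is illustrative and not part of the package.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// lockKey mirrors the hash helper in pkg/db/advisory_locks.go: it folds a
// string into the signed 32-bit range of PostgreSQL's integer type.
// The name lockKey is an assumption made for this sketch.
func lockKey(s string) int32 {
	h := fnv.New32a()
	h.Write([]byte(s)) // hash.Hash.Write never returns an error
	return int32(h.Sum32())
}

func main() {
	// The two-argument form of pg_advisory_xact_lock takes a pair of int4 keys.
	fmt.Printf("select pg_advisory_xact_lock(%d, %d)\n",
		lockKey("migrations"), lockKey("Migrations"))
}
```

Because the hash is only 32 bits wide, two different lock names could in principle collide. With a single lock name in use that is not a practical concern, but it is worth keeping in mind if more lock types are added.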

**Key Features:**
- **Zero infrastructure overhead** - Uses native PostgreSQL locks
- **Automatic cleanup** - Locks released on transaction end or pod crash
- **Nested lock support** - Same lock can be acquired in nested contexts without deadlock
- **UUID-based ownership** - Only original acquirer can unlock

#### Testing Concurrent Migrations

Integration tests validate concurrent behavior:

```bash
make test-integration # Runs TestConcurrentMigrations
```

**Test coverage:**
- `TestConcurrentMigrations` - Multiple pods running migrations simultaneously
- `TestAdvisoryLocksConcurrently` - Lock serialization under race conditions
- `TestAdvisoryLocksWithTransactions` - Lock + transaction interaction
- `TestAdvisoryLockBlocking` - Lock blocking behavior

## Database Setup

```bash
107 changes: 107 additions & 0 deletions pkg/db/advisory_locks.go
@@ -0,0 +1,107 @@
package db

import (
	"context"
	"errors"
	"hash/fnv"
	"time"

	"gorm.io/gorm"
)

// LockType represents the type of advisory lock
type LockType string

const (
	// Migrations lock type for database migrations
	Migrations LockType = "Migrations"
)

// AdvisoryLock represents a postgres advisory lock
//
//	begin                                       # start a Tx
//	select pg_advisory_xact_lock(id, lockType)  # obtain the lock (blocking)
//	end                                         # end the Tx and release the lock
//
// ownerUUID is a way to own the lock. Only the very first
// service call that owns the lock will have the correct ownerUUID. This is necessary
// to allow functions to call other service functions as part of the same lock (id, lockType).
type AdvisoryLock struct {
	g2        *gorm.DB
	txid      int64
	ownerUUID *string
	id        *string
	lockType  *LockType
	startTime time.Time
}

Review comment (Contributor):

`txid` and `startTime` are set but never read anywhere. If `startTime` is intended for future lock-duration logging, it'd be nice to wire it into `unlock()` now (e.g., log the hold duration). Otherwise these are dead fields and the `txid_current()` query is a wasted round-trip per lock.

// newAdvisoryLock constructs a new AdvisoryLock object.
func newAdvisoryLock(ctx context.Context, connection SessionFactory, ownerUUID *string, id *string, locktype *LockType) (*AdvisoryLock, error) {
	if connection == nil {
		return nil, errors.New("AdvisoryLock: connection factory is missing")
	}

	// it requires a new DB session to start the advisory lock.
	g2 := connection.New(ctx)

	// start a Tx to ensure gorm will obtain/release the lock using the same connection.
	tx := g2.Begin()
	if tx.Error != nil {
		return nil, tx.Error
	}

	// current transaction ID set by postgres. these are *not* distinct across time
	// and do get reset after postgres performs "vacuuming" to reclaim used IDs.
	var txid struct{ ID int64 }
	err := tx.Raw("select txid_current() as id").Scan(&txid).Error

	return &AdvisoryLock{
		txid:      txid.ID,
		ownerUUID: ownerUUID,
		id:        id,
		lockType:  locktype,
		g2:        tx,
		startTime: time.Now(),
	}, err
}

// lock calls select pg_advisory_xact_lock(id, lockType) to obtain the lock defined by (id, lockType).
// it is blocked if some other thread currently is holding the same lock (id, lockType).
// if blocked, it can be unblocked or timed out when overloaded.
func (l *AdvisoryLock) lock() error {
	if l.g2 == nil {
		return errors.New("AdvisoryLock: transaction is missing")
	}
	if l.id == nil {
		return errors.New("AdvisoryLock: id is missing")
	}
	if l.lockType == nil {
		return errors.New("AdvisoryLock: lockType is missing")
	}

Review comment (Contributor):

`pg_advisory_xact_lock` blocks indefinitely — if a pod hangs holding the migration lock (stuck migration, network partition), every other pod's init container blocks forever waiting.

The Helm chart doesn't set `activeDeadlineSeconds` on the pod spec either, so there's no Kubernetes-level safety net.

A couple of options:

  1. Use `pg_try_advisory_xact_lock()` in a retry loop with a configurable timeout
  2. `SET LOCAL statement_timeout = '300s'` on the lock session before acquiring
  3. At minimum, add `activeDeadlineSeconds` to the pod spec as a backstop

I'd lean toward option 2 — it's a one-liner and gives a clean error if the lock can't be acquired within 5 minutes.

	idAsInt := hash(*l.id)
	typeAsInt := hash(string(*l.lockType))
	err := l.g2.Exec("select pg_advisory_xact_lock(?, ?)", idAsInt, typeAsInt).Error
	return err
}

func (l *AdvisoryLock) unlock() error {
	if l.g2 == nil {
		return errors.New("AdvisoryLock: transaction is missing")
	}

	// it ends the Tx and implicitly releases the lock.
	err := l.g2.Commit().Error
	l.g2 = nil
	return err
}

// hash string to int32 (postgres integer)
// https://pkg.go.dev/math#pkg-constants
// https://www.postgresql.org/docs/12/datatype-numeric.html
func hash(s string) int32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	// Sum32() returns uint32. needs conversion.
	return int32(h.Sum32())
}
94 changes: 94 additions & 0 deletions pkg/db/context.go
@@ -3,10 +3,33 @@ package db
import (
	"context"

	"github.com/google/uuid"

	dbContext "github.com/openshift-hyperfleet/hyperfleet-api/pkg/db/db_context"
	"github.com/openshift-hyperfleet/hyperfleet-api/pkg/logger"
)

type advisoryLockKey string

const (
	advisoryLock advisoryLockKey = "advisoryLock"
)

type advisoryLockMap map[string]*AdvisoryLock

func (m advisoryLockMap) key(id string, lockType LockType) string {
	return id + ":" + string(lockType)
}

func (m advisoryLockMap) get(id string, lockType LockType) (*AdvisoryLock, bool) {
	lock, ok := m[m.key(id, lockType)]
	return lock, ok
}

func (m advisoryLockMap) set(id string, lockType LockType, lock *AdvisoryLock) {
	m[m.key(id, lockType)] = lock
}

// NewContext returns a new context with transaction stored in it.
// Upon error, the original context is still returned along with an error
func NewContext(ctx context.Context, connection SessionFactory) (context.Context, error) {
@@ -53,3 +76,74 @@ func MarkForRollback(ctx context.Context, err error) {
	transaction.SetRollbackFlag(true)
	logger.WithError(ctx, err).Info("Marked transaction for rollback")
}

// NewAdvisoryLockContext returns a new context with AdvisoryLock stored in it.
// Upon error, the original context is still returned along with an error
func NewAdvisoryLockContext(ctx context.Context, connection SessionFactory, id string, lockType LockType) (context.Context, string, error) {
	// lockOwnerID will be different for every service function that attempts to start a lock.
	// only the initial call in the stack must unlock.
	// Unlock() will compare UUIDs and ensure only the top level call succeeds.
	lockOwnerID := uuid.New().String()

	locks, found := ctx.Value(advisoryLock).(advisoryLockMap)
	if found {
		if _, ok := locks.get(id, lockType); ok {
			return ctx, lockOwnerID, nil
		}
	} else {
		locks = make(advisoryLockMap)
	}

	lock, err := newAdvisoryLock(ctx, connection, &lockOwnerID, &id, &lockType)

Review comment (Contributor):

Priority: Bug

The two error paths in NewAdvisoryLockContext (lines 97-100 and 104-107) leak the GORM
transaction created inside newAdvisoryLock. When either newAdvisoryLock or lock.lock() fails,
lock.g2 holds an open transaction that is never committed or rolled back.

The cleanest fix is to handle it inside newAdvisoryLock itself — roll back the transaction if
txid_current() fails:

  var txid struct{ ID int64 }
  err := tx.Raw("select txid_current() as id").Scan(&txid).Error
  if err != nil {
      tx.Rollback()
      return nil, err
  }

  return &AdvisoryLock{
      txid:      txid.ID,
      ownerUUID: ownerUUID,
      id:        id,
      lockType:  locktype,
      g2:        tx,
      startTime: time.Now(),
  }, nil

And in NewAdvisoryLockContext, roll back if lock.lock() fails:

  err = lock.lock()
  if err != nil {
      logger.WithError(ctx, err).Error("Failed to acquire advisory lock")
      lock.g2.Rollback() // clean up the open transaction
      return ctx, lockOwnerID, err
  }

Review comment (Contributor):

`advisoryLockMap` is a plain Go map stored via `context.WithValue`. The map is retrieved and mutated in place here — since maps are reference types, this mutation is visible to anything holding the same context. Go maps aren't safe for concurrent access and the runtime will panic if two goroutines hit the same map.

Current usage is fine (each init container starts from `context.Background()`), but since this is an exported API, a future caller sharing contexts across goroutines could hit a data race.

Worth either adding a `sync.Mutex` to the map wrapper or a doc comment on `NewAdvisoryLockContext` noting that the returned context must not be shared across goroutines.

	if err != nil {
		logger.WithError(ctx, err).Error("Failed to create advisory lock")
		return ctx, lockOwnerID, err
	}

	// obtain the advisory lock (blocking)
	err = lock.lock()
	if err != nil {
		logger.WithError(ctx, err).Error("Failed to acquire advisory lock")
		return ctx, lockOwnerID, err
	}

Review comment (Contributor):

`Unlock` returns `context.Context` but every caller discards it (`defer Unlock(ctx, lockOwnerID)`). Since the map is mutated in place via `delete()`, the returned context is the exact same object. The return type suggests immutable context semantics but doesn't deliver them.

Simplest fix: drop the return value to match the `defer` usage pattern.

	locks.set(id, lockType, lock)

	ctx = context.WithValue(ctx, advisoryLock, locks)
	logger.With(ctx, "lock_id", id, "lock_type", lockType).Info("Acquired advisory lock")

	return ctx, lockOwnerID, nil
}

// Unlock searches current locks and unlocks the one matching its owner id.
func Unlock(ctx context.Context, callerUUID string) context.Context {
	locks, ok := ctx.Value(advisoryLock).(advisoryLockMap)
	if !ok {
		logger.Error(ctx, "Could not retrieve locks from context")
		return ctx
	}

	for k, lock := range locks {
		if lock.ownerUUID == nil {
			logger.With(ctx, "lock_id", lock.id).Warn("lockOwnerID could not be found in AdvisoryLock")
		} else if *lock.ownerUUID == callerUUID {
			lockID := "<missing>"
			lockType := *lock.lockType

Review comment (Contributor):

Priority: Bug

Line 131 dereferences lock.lockType without a nil check, but both lock.id (line 132) and
lock.ownerUUID (line 127) have nil guards. Should be consistent — if these fields are
pointers, guard them all:

  lockID := "<missing>"
  lockTypeStr := LockType("<missing>")
  if lock.lockType != nil {
      lockTypeStr = *lock.lockType
  }
  if lock.id != nil {
      lockID = *lock.id
  }

Minor, but a panic in Unlock during cleanup would be hard to debug in production.

			if lock.id != nil {
				lockID = *lock.id
			}

			if err := lock.unlock(); err != nil {
				logger.With(ctx, "lock_id", lockID, "lock_type", lockType).WithError(err).Error("Could not unlock lock")
			} else {
				logger.With(ctx, "lock_id", lockID, "lock_type", lockType).Info("Unlocked lock")
			}
			delete(locks, k)
		} else {
			// the resolving UUID belongs to a service call that did *not* initiate the lock.
			// it is ignored.
		}
	}

	return ctx
}
21 changes: 21 additions & 0 deletions pkg/db/migrations.go
@@ -24,6 +24,27 @@ func Migrate(g2 *gorm.DB) error {
	return nil
}

// MigrateWithLock runs migrations with an advisory lock to prevent concurrent migrations
func MigrateWithLock(ctx context.Context, factory SessionFactory) error {
	// Acquire advisory lock for migrations
	ctx, lockOwnerID, err := NewAdvisoryLockContext(ctx, factory, "migrations", Migrations)

Review comment (Contributor):

Priority: Pattern

The lock ID "migrations" on line 30 is a bare string, while the lock type already uses a
constant (Migrations). Since advisory_locks.go already has a constants block for lock types,
consider adding the lock ID there too:

  const (
      // Migrations lock type for database migrations
      Migrations LockType = "Migrations"

      // MigrationsLockID is the advisory lock ID used for migration coordination
      MigrationsLockID = "migrations"
  )

Then in migrations.go:

Suggested change
- ctx, lockOwnerID, err := NewAdvisoryLockContext(ctx, factory, "migrations", Migrations)
+ ctx, lockOwnerID, err := NewAdvisoryLockContext(ctx, factory, MigrationsLockID, Migrations)

	if err != nil {
		logger.WithError(ctx, err).Error("Could not lock migrations")
		return err
	}
	defer Unlock(ctx, lockOwnerID)

	// Run migrations with the locked context
	g2 := factory.New(ctx)
	if err := Migrate(g2); err != nil {
		logger.WithError(ctx, err).Error("Could not migrate")
		return err
	}

	logger.Info(ctx, "Migration completed successfully")
	return nil
}

// MigrateTo a specific migration will not seed the database, seeds are up to date with the latest
// schema based on the most recent migration
// This should be for testing purposes mainly