
HYPERFLEET-618: add PostgreSQL advisory locks for migration coordination#72

Open
ldornele wants to merge 1 commit into openshift-hyperfleet:main from ldornele:HYPERFLEET-618

Conversation


@ldornele ldornele commented Mar 8, 2026

Summary

Adds PostgreSQL advisory locks to coordinate database migrations during rolling deployments, preventing race conditions when multiple pods start simultaneously.

Problem

During rolling deployments (e.g., scaling from 0→3 replicas), each pod's init container attempts to run database migrations concurrently. The current implementation using gormigrate lacks coordination for concurrent execution, leading to:

  • Race conditions between migration attempts
  • Duplicate migration executions causing database errors
  • Failed deployments and pod crashes
  • Inconsistent database state

Solution: PostgreSQL Advisory Locks

We chose PostgreSQL advisory locks over other coordination mechanisms for the following reasons:

Why Advisory Locks?

Zero infrastructure overhead

  • No additional Kubernetes resources (Jobs, ConfigMaps, leader election)
  • No external coordination service required
  • Pure PostgreSQL native functionality

Automatic cleanup

  • Locks automatically released on transaction commit/rollback
  • Pod crashes don't leave stale locks
  • No manual cleanup or timeout management needed

Simple implementation

  • Single pg_advisory_xact_lock(key1, key2) call, with the lock ID and lock type hashed into the two int keys
  • Transaction-scoped lock lifecycle
  • Minimal code complexity (~180 lines total)

Battle-tested pattern

  • Used successfully in uhc-account-manager for years
  • PostgreSQL advisory locks are well-documented and stable
  • Proven solution for this exact use case

Why NOT Other Approaches?

Kubernetes Job

  • Requires separate Job definition and lifecycle management
  • Adds complexity to deployment pipeline (pre-install hooks, job cleanup)
  • Need to handle Job failures, retries, and cleanup
  • Tightly couples migration to deployment orchestration

Helm Hooks (pre-upgrade/pre-install)

  • Only works with Helm deployments (limits flexibility)
  • Hooks run sequentially, slowing down deployments
  • Debugging hook failures is more complex
  • Still requires locking if the hook runs across multiple replicas

Leader Election

  • Requires additional RBAC permissions for pods
  • More complex implementation (leader election libraries, health checks)
  • Potential leader churn during network issues
  • Overkill for simple migration coordination

References

Summary by CodeRabbit

  • New Features

    • Database migrations now use advisory locks to coordinate execution safely across multiple pods.
  • Documentation

    • Added Migration Coordination section describing the locking mechanism and test procedures.
  • Tests

    • Added integration tests validating concurrent migration scenarios and advisory lock behavior.

@openshift-ci openshift-ci bot requested review from aredenba-rh and vkareh March 8, 2026 00:37

openshift-ci bot commented Mar 8, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign tirthct for approval. For more information see the Code Review Process.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment


openshift-ci bot commented Mar 8, 2026

Hi @ldornele. Thanks for your PR.

I'm waiting for an openshift-hyperfleet member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work.

Tip

We noticed you've done this a few times! Consider joining the org to skip this step and gain /lgtm and other bot rights. We recommend asking approvers on your previous PRs to sponsor you.

Once the patch is verified, the new status will be reflected by the ok-to-test label.

I understand the commands that are listed here.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.


coderabbitai bot commented Mar 8, 2026

Walkthrough

This pull request introduces a PostgreSQL advisory lock mechanism to coordinate database migrations across multiple pods. The changes include a new advisory lock implementation using PostgreSQL's pg_advisory_xact_lock, context-scoped lock management, a new MigrateWithLock function, and comprehensive integration tests. The migration command is updated to use the lock-based migration function instead of the direct migration call. Documentation explains the coordination mechanism, its zero-infrastructure-overhead design, and automatic cleanup via transaction completion.

Sequence Diagram(s)

sequenceDiagram
    participant Pod as Hyperfleet Pod
    participant Ctx as Context Manager
    participant Lock as Advisory Lock
    participant TX as DB Transaction
    participant Migration as Migration Engine
    participant PG as PostgreSQL

    Pod->>Ctx: MigrateWithLock(ctx, factory)
    Ctx->>Lock: NewAdvisoryLockContext(ctx, factory, id, Migrations)
    Lock->>TX: Begin Transaction
    TX->>PG: START TRANSACTION
    Lock->>Lock: Retrieve txid_current()
    Lock->>PG: pg_advisory_xact_lock(hash(id), hash(Migrations))
    PG-->>Lock: Lock acquired or wait
    Lock-->>Ctx: Return ctx with lock, ownerID, error
    Ctx->>Migration: Migrate(ctx with lock)
    Migration->>PG: Run pending migrations
    PG-->>Migration: Complete migrations
    Ctx->>Lock: Unlock(ctx, ownerID)
    Lock->>TX: Commit Transaction
    TX->>PG: COMMIT
    PG-->>TX: Lock released automatically
    Lock-->>Ctx: Acknowledged
    Ctx-->>Pod: Success or error

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks — ✅ 3 passed
  • Description check: ✅ Passed — check skipped; CodeRabbit's high-level summary is enabled.
  • Title check: ✅ Passed — the title clearly and specifically summarizes the main change: adding PostgreSQL advisory locks for migration coordination to address concurrent migration issues during pod scaling.
  • Docstring coverage: ✅ Passed — docstring coverage is 81.25%, above the required 80.00% threshold.



@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (3)
pkg/db/context.go (1)

118-149: Consider documenting the context usage pattern.

The advisoryLockMap is shared by reference when stored in context. The implementation is safe when each goroutine starts with its own context.Background() (as the tests demonstrate), but could lead to subtle map race conditions if callers share a single context across goroutines that concurrently call Unlock.

Consider adding a doc comment clarifying that each concurrent operation should derive from a fresh context.

test/integration/advisory_locks_test.go (1)

288-297: Minor: Consider using error channel instead of t.Errorf in goroutine.

Calling t.Errorf from a goroutine can be fragile if the test function exits before the goroutine completes. While this test correctly synchronizes via channels, using an error channel would make the pattern more robust.

♻️ Alternative pattern using error channel
 	// Track when the second goroutine acquires the lock
 	acquired := make(chan bool, 1)
 	released := make(chan bool, 1)
+	errChan := make(chan error, 1)

 	// Second goroutine tries to acquire the same lock
 	go func() {
 		ctx2, lockOwnerID2, err := db.NewAdvisoryLockContext(context.Background(), helper.DBFactory, "blocking-test", db.Migrations)
 		if err != nil {
-			t.Errorf("Failed to acquire second lock: %v", err)
+			errChan <- err
 			return
 		}
 		defer db.Unlock(ctx2, lockOwnerID2)

+		errChan <- nil
 		acquired <- true
 		<-released // Wait for signal to release
 	}()

Then check errChan after the test completes to report any errors.

pkg/db/advisory_locks.go (1)

56-66: Consider returning nil on error for defensive clarity.

Returning a non-nil AdvisoryLock alongside an error is an unusual pattern. While the caller (context.go) correctly checks the error before using the lock, this could lead to accidental misuse if future callers forget to check.

♻️ Proposed fix
 	var txid struct{ ID int64 }
 	err := tx.Raw("select txid_current() as id").Scan(&txid).Error
+	if err != nil {
+		_ = tx.Rollback() // Clean up on error
+		return nil, err
+	}
 
 	return &AdvisoryLock{
 		txid:      txid.ID,
 		ownerUUID: ownerUUID,
 		id:        id,
 		lockType:  locktype,
 		g2:        tx,
 		startTime: time.Now(),
 	}, err
 }


📥 Commits

Reviewing files that changed from the base of the PR and between 97284d9 and d0aa71f.

📒 Files selected for processing (6)
  • cmd/hyperfleet-api/migrate/cmd.go
  • docs/database.md
  • pkg/db/advisory_locks.go
  • pkg/db/context.go
  • pkg/db/migrations.go
  • test/integration/advisory_locks_test.go

locks = make(advisoryLockMap)
}

lock, err := newAdvisoryLock(ctx, connection, &lockOwnerID, &id, &lockType)

Priority: Bug

The two error paths in NewAdvisoryLockContext (lines 97-100 and 104-107) leak the GORM
transaction created inside newAdvisoryLock. When either newAdvisoryLock or lock.lock() fails,
lock.g2 holds an open transaction that is never committed or rolled back.

The cleanest fix is to handle it inside newAdvisoryLock itself — roll back the transaction if
txid_current() fails:

  var txid struct{ ID int64 }
  err := tx.Raw("select txid_current() as id").Scan(&txid).Error
  if err != nil {
      tx.Rollback()
      return nil, err
  }

  return &AdvisoryLock{
      txid:      txid.ID,
      ownerUUID: ownerUUID,
      id:        id,
      lockType:  locktype,
      g2:        tx,
      startTime: time.Now(),
  }, nil

And in NewAdvisoryLockContext, roll back if lock.lock() fails:

err = lock.lock()
if err != nil {
    logger.WithError(ctx, err).Error("Failed to acquire advisory lock")
    lock.g2.Rollback() // clean up the open transaction
    return ctx, lockOwnerID, err
}

logger.With(ctx, "lock_id", lock.id).Warn("lockOwnerID could not be found in AdvisoryLock")
} else if *lock.ownerUUID == callerUUID {
lockID := "<missing>"
lockType := *lock.lockType

Priority: Bug

Line 131 dereferences lock.lockType without a nil check, but both lock.id (line 132) and
lock.ownerUUID (line 127) have nil guards. Should be consistent — if these fields are
pointers, guard them all:

  lockID := "<missing>"
  lockTypeStr := LockType("<missing>")
  if lock.lockType != nil {
      lockTypeStr = *lock.lockType
  }
  if lock.id != nil {
      lockID = *lock.id
  }

Minor, but a panic in Unlock during cleanup would be hard to debug in production.

// MigrateWithLock runs migrations with an advisory lock to prevent concurrent migrations
func MigrateWithLock(ctx context.Context, factory SessionFactory) error {
// Acquire advisory lock for migrations
ctx, lockOwnerID, err := NewAdvisoryLockContext(ctx, factory, "migrations", Migrations)

Priority: Pattern

The lock ID "migrations" on line 30 is a bare string, while the lock type already uses a
constant (Migrations). Since advisory_locks.go already has a constants block for lock types,
consider adding the lock ID there too:

  const (
      // Migrations lock type for database migrations
      Migrations LockType = "Migrations"

      // MigrationsLockID is the advisory lock ID used for migration coordination
      MigrationsLockID = "migrations"
  )

Then in migrations.go:

Suggested change
ctx, lockOwnerID, err := NewAdvisoryLockContext(ctx, factory, "migrations", Migrations)
ctx, lockOwnerID, err := NewAdvisoryLockContext(ctx, factory, MigrationsLockID, Migrations)

if l.lockType == nil {
return errors.New("AdvisoryLock: lockType is missing")
}


`pg_advisory_xact_lock` blocks indefinitely — if a pod hangs holding the migration lock (stuck migration, network partition), every other pod's init container blocks forever waiting.

The Helm chart doesn't set `activeDeadlineSeconds` on the pod spec either, so there's no Kubernetes-level safety net.

A couple of options:

  1. Use `pg_try_advisory_xact_lock()` in a retry loop with a configurable timeout
  2. `SET LOCAL statement_timeout = '300s'` on the lock session before acquiring
  3. At minimum, add `activeDeadlineSeconds` to the pod spec as a backstop

I'd lean toward option 2 — it's a one-liner and gives a clean error if the lock can't be acquired within 5 minutes.

locks = make(advisoryLockMap)
}

lock, err := newAdvisoryLock(ctx, connection, &lockOwnerID, &id, &lockType)

`advisoryLockMap` is a plain Go map stored via `context.WithValue`. The map is retrieved and mutated in place here — since maps are reference types, this mutation is visible to anything holding the same context. Go maps aren't safe for concurrent access and the runtime will panic if two goroutines hit the same map.

Current usage is fine (each init container starts from `context.Background()`), but since this is an exported API, a future caller sharing contexts across goroutines could hit a data race.

Worth either adding a `sync.Mutex` to the map wrapper or a doc comment on `NewAdvisoryLockContext` noting that the returned context must not be shared across goroutines.

go acquireLock(helper, &total, &waiter)
}

// Wait for all goroutines to complete

This test reads and writes `*total` from multiple goroutines without any Go-level synchronization. The advisory lock serializes execution at the DB level, but Go's memory model still requires explicit synchronization (mutex, atomic, channel) for visibility guarantees across goroutines. Running with `-race` would flag this.

Two options:

  1. Add a `sync.Mutex` around the `*total` read/write (quick fix, still proves DB-level serialization)
  2. Move the "work" into the database — read a row, sleep, update the row, verify final value. This would be a true end-to-end proof of lock serialization.

lockType *LockType
startTime time.Time
}


`txid` and `startTime` are set but never read anywhere. If `startTime` is intended for future lock-duration logging, it'd be nice to wire it into `unlock()` now (e.g., log the hold duration). Otherwise these are dead fields and the `txid_current()` query is a wasted round-trip per lock.

logger.WithError(ctx, err).Error("Failed to acquire advisory lock")
return ctx, lockOwnerID, err
}


`Unlock` returns `context.Context` but every caller discards it (`defer Unlock(ctx, lockOwnerID)`). Since the map is mutated in place via `delete()`, the returned context is the exact same object. The return type suggests immutable context semantics but doesn't deliver them.

Simplest fix: drop the return value to match the `defer` usage pattern.

}

// Track when the second goroutine acquires the lock
acquired := make(chan bool, 1)

The 100ms sleep here assumes the second goroutine has reached the `pg_advisory_xact_lock` call by that point. In CI with slow testcontainers under load, the goroutine might not have even started the lock call yet — the test would still pass but wouldn't actually be testing blocking behavior.

A more reliable approach: query `pg_locks` to confirm the second connection is actively waiting on the advisory lock before checking the `acquired` channel.
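A sketch of such a readiness probe, assuming the standard `pg_locks` system-view columns — the test would poll until this count is non-zero before asserting on the `acquired` channel:

```sql
-- Count backends blocked waiting on an advisory lock (requested, not yet granted).
SELECT count(*)
FROM pg_locks
WHERE locktype = 'advisory'
  AND NOT granted;
```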

case <-time.After(5 * time.Second):
t.Error("Second goroutine did not acquire lock after first was released")
}
}

A couple of edge cases worth covering:

  1. Context cancellation while waiting — the current impl passes `ctx` to `connection.New(ctx)` but not to the blocking `pg_advisory_xact_lock` call. So context cancellation has no effect on a waiting lock. Even if we don't fix that now, a test documenting this behavior would be useful.

  2. Migration failure under lock — `TestConcurrentMigrations` only tests the happy path. A test that injects a migration error and verifies the lock is released (so other waiters proceed) would strengthen confidence in the defer cleanup.

