HYPERFLEET-618: add PostgreSQL advisory locks for migration coordination #72
ldornele wants to merge 1 commit into openshift-hyperfleet:main from
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED. Needs approval from an approver in each of the owning files; approvers can indicate their approval by writing an approve command. The full list of commands accepted by this bot can be found here.

Hi @ldornele. Thanks for your PR. I'm waiting for an openshift-hyperfleet member to verify that this patch is reasonable to test. Once the patch is verified, the new status will be reflected by the bot. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
Walkthrough

This pull request introduces a PostgreSQL advisory lock mechanism to coordinate database migrations across multiple pods. The changes include a new advisory lock implementation using PostgreSQL's `pg_advisory_xact_lock`.

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Pod as Hyperfleet Pod
    participant Ctx as Context Manager
    participant Lock as Advisory Lock
    participant TX as DB Transaction
    participant Migration as Migration Engine
    participant PG as PostgreSQL
    Pod->>Ctx: MigrateWithLock(ctx, factory)
    Ctx->>Lock: NewAdvisoryLockContext(ctx, factory, id, Migrations)
    Lock->>TX: Begin Transaction
    TX->>PG: START TRANSACTION
    Lock->>Lock: Retrieve txid_current()
    Lock->>PG: pg_advisory_xact_lock(hash(id), hash(Migrations))
    PG-->>Lock: Lock acquired or wait
    Lock-->>Ctx: Return ctx with lock, ownerID, error
    Ctx->>Migration: Migrate(ctx with lock)
    Migration->>PG: Run pending migrations
    PG-->>Migration: Complete migrations
    Ctx->>Lock: Unlock(ctx, ownerID)
    Lock->>TX: Commit Transaction
    TX->>PG: COMMIT
    PG-->>TX: Lock released automatically
    Lock-->>Ctx: Acknowledged
    Ctx-->>Pod: Success or error
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
🧹 Nitpick comments (3)
pkg/db/context.go (1)
118-149: Consider documenting the context usage pattern.

The `advisoryLockMap` is shared by reference when stored in context. The implementation is safe when each goroutine starts with its own `context.Background()` (as the tests demonstrate), but could lead to subtle map race conditions if callers share a single context across goroutines that concurrently call `Unlock`. Consider adding a doc comment clarifying that each concurrent operation should derive from a fresh context.
🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `pkg/db/context.go` around lines 118-149: the advisoryLockMap stored in context is shared by reference, which can cause map race conditions if a single context is reused across goroutines; add a doc comment near Unlock (and/or the advisoryLockMap type) explaining the usage pattern: callers must not share the same context with advisory locks across concurrent goroutines and should instead derive a fresh context (e.g., context.Background() or context.WithCancel/WithTimeout) per concurrent operation to avoid concurrent mutation of the map; reference advisoryLockMap and Unlock in the comment so callers know which API has this constraint.

test/integration/advisory_locks_test.go (1)
288-297: Minor: Consider using an error channel instead of t.Errorf in the goroutine.

Calling `t.Errorf` from a goroutine can be fragile if the test function exits before the goroutine completes. While this test correctly synchronizes via channels, using an error channel would make the pattern more robust.

♻️ Alternative pattern using an error channel:

```diff
 // Track when the second goroutine acquires the lock
 acquired := make(chan bool, 1)
 released := make(chan bool, 1)
+errChan := make(chan error, 1)

 // Second goroutine tries to acquire the same lock
 go func() {
 	ctx2, lockOwnerID2, err := db.NewAdvisoryLockContext(context.Background(), helper.DBFactory, "blocking-test", db.Migrations)
 	if err != nil {
-		t.Errorf("Failed to acquire second lock: %v", err)
+		errChan <- err
 		return
 	}
 	defer db.Unlock(ctx2, lockOwnerID2)
+	errChan <- nil
 	acquired <- true
 	<-released // Wait for signal to release
 }()
```

Then check
`errChan` after the test completes to report any errors.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `test/integration/advisory_locks_test.go` around lines 288-297: replace the goroutine's direct t.Errorf calls with sending errors into an error channel (e.g., errChan := make(chan error, 1)) so the main test goroutine can assert them after synchronization; specifically, in the block that calls db.NewAdvisoryLockContext (producing ctx2, lockOwnerID2), send any non-nil err into errChan instead of calling t.Errorf, defer db.Unlock(ctx2, lockOwnerID2) as before, and after the goroutine finishes (after receiving on released) read from errChan and fail the test with t.Fatal or t.Errorf if an error was reported.

pkg/db/advisory_locks.go (1)
56-66: Consider returning nil on error for defensive clarity.

Returning a non-nil `AdvisoryLock` alongside an error is an unusual pattern. While the caller (context.go) correctly checks the error before using the lock, this could lead to accidental misuse if future callers forget to check.

♻️ Proposed fix:

```diff
 var txid struct{ ID int64 }
 err := tx.Raw("select txid_current() as id").Scan(&txid).Error
+if err != nil {
+	_ = tx.Rollback() // Clean up on error
+	return nil, err
+}
 return &AdvisoryLock{
 	txid:      txid.ID,
 	ownerUUID: ownerUUID,
 	id:        id,
 	lockType:  locktype,
 	g2:        tx,
 	startTime: time.Now(),
 }, err
}
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `pkg/db/advisory_locks.go` around lines 56-66: the function returns a non-nil *AdvisoryLock together with err (created via tx.Raw("select txid_current() as id").Scan(&txid).Error), which is fragile; instead, after executing tx.Raw(...).Scan(&txid), check if err != nil and immediately return nil, err; only construct and return &AdvisoryLock{...} (using txid.ID, ownerUUID, id, locktype, g2: tx, startTime: time.Now()) when err is nil so callers never receive a non-nil lock on error.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: a77cb951-fff5-462c-b81b-486f29aee60b
📒 Files selected for processing (6)
cmd/hyperfleet-api/migrate/cmd.go
docs/database.md
pkg/db/advisory_locks.go
pkg/db/context.go
pkg/db/migrations.go
test/integration/advisory_locks_test.go
```go
locks = make(advisoryLockMap)
}

lock, err := newAdvisoryLock(ctx, connection, &lockOwnerID, &id, &lockType)
```
Priority: Bug
The two error paths in NewAdvisoryLockContext (lines 97-100 and 104-107) leak the GORM
transaction created inside newAdvisoryLock. When either newAdvisoryLock or lock.lock() fails,
lock.g2 holds an open transaction that is never committed or rolled back.
The cleanest fix is to handle it inside newAdvisoryLock itself — roll back the transaction if
txid_current() fails:
```go
var txid struct{ ID int64 }
err := tx.Raw("select txid_current() as id").Scan(&txid).Error
if err != nil {
	tx.Rollback()
	return nil, err
}
return &AdvisoryLock{
	txid:      txid.ID,
	ownerUUID: ownerUUID,
	id:        id,
	lockType:  locktype,
	g2:        tx,
	startTime: time.Now(),
}, nil
```

And in NewAdvisoryLockContext, roll back if lock.lock() fails:

```go
err = lock.lock()
if err != nil {
	logger.WithError(ctx, err).Error("Failed to acquire advisory lock")
	lock.g2.Rollback() // clean up the open transaction
	return ctx, lockOwnerID, err
}
```

```go
logger.With(ctx, "lock_id", lock.id).Warn("lockOwnerID could not be found in AdvisoryLock")
} else if *lock.ownerUUID == callerUUID {
lockID := "<missing>"
lockType := *lock.lockType
```
Priority: Bug
Line 131 dereferences lock.lockType without a nil check, but both lock.id (line 132) and
lock.ownerUUID (line 127) have nil guards. Should be consistent — if these fields are
pointers, guard them all:
```go
lockID := "<missing>"
lockTypeStr := LockType("<missing>")
if lock.lockType != nil {
	lockTypeStr = *lock.lockType
}
if lock.id != nil {
	lockID = *lock.id
}
```

Minor, but a panic in `Unlock` during cleanup would be hard to debug in production.
```go
// MigrateWithLock runs migrations with an advisory lock to prevent concurrent migrations
func MigrateWithLock(ctx context.Context, factory SessionFactory) error {
	// Acquire advisory lock for migrations
	ctx, lockOwnerID, err := NewAdvisoryLockContext(ctx, factory, "migrations", Migrations)
```
Priority: Pattern
The lock ID "migrations" on line 30 is a bare string, while the lock type already uses a
constant (Migrations). Since advisory_locks.go already has a constants block for lock types,
consider adding the lock ID there too:
```go
const (
	// Migrations lock type for database migrations
	Migrations LockType = "Migrations"
	// MigrationsLockID is the advisory lock ID used for migration coordination
	MigrationsLockID = "migrations"
)
```

Then in migrations.go:
```diff
-ctx, lockOwnerID, err := NewAdvisoryLockContext(ctx, factory, "migrations", Migrations)
+ctx, lockOwnerID, err := NewAdvisoryLockContext(ctx, factory, MigrationsLockID, Migrations)
```
```go
if l.lockType == nil {
	return errors.New("AdvisoryLock: lockType is missing")
}
```
`pg_advisory_xact_lock` blocks indefinitely — if a pod hangs holding the migration lock (stuck migration, network partition), every other pod's init container blocks forever waiting.
The Helm chart doesn't set `activeDeadlineSeconds` on the pod spec either, so there's no Kubernetes-level safety net.
A couple of options:
- Use `pg_try_advisory_xact_lock()` in a retry loop with a configurable timeout
- `SET LOCAL statement_timeout = '300s'` on the lock session before acquiring
- At minimum, add `activeDeadlineSeconds` to the pod spec as a backstop
I'd lean toward option 2 — it's a one-liner and gives a clean error if the lock can't be acquired within 5 minutes.
```go
locks = make(advisoryLockMap)
}

lock, err := newAdvisoryLock(ctx, connection, &lockOwnerID, &id, &lockType)
```
`advisoryLockMap` is a plain Go map stored via `context.WithValue`. The map is retrieved and mutated in place here — since maps are reference types, this mutation is visible to anything holding the same context. Go maps aren't safe for concurrent access and the runtime will panic if two goroutines hit the same map.
Current usage is fine (each init container starts from `context.Background()`), but since this is an exported API, a future caller sharing contexts across goroutines could hit a data race.
Worth either adding a `sync.Mutex` to the map wrapper or a doc comment on `NewAdvisoryLockContext` noting that the returned context must not be shared across goroutines.
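If we go the mutex route, a minimal sketch of the wrapper might look like the following. Names like `lockRegistry` are illustrative, not from the PR, and the real map stores `*AdvisoryLock` values rather than owner strings; the point is only that every read/write/delete goes through one mutex.

```go
package main

import (
	"fmt"
	"sync"
)

// lockRegistry is an illustrative mutex-guarded replacement for the plain
// advisoryLockMap, making in-place mutation safe across goroutines.
type lockRegistry struct {
	mu    sync.Mutex
	locks map[string]string // lock ID -> owner UUID (simplified)
}

func newLockRegistry() *lockRegistry {
	return &lockRegistry{locks: make(map[string]string)}
}

func (r *lockRegistry) add(id, owner string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.locks[id] = owner
}

func (r *lockRegistry) remove(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.locks, id)
}

func (r *lockRegistry) len() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.locks)
}

func main() {
	reg := newLockRegistry()
	var wg sync.WaitGroup
	// Without the mutex, this concurrent add/remove would panic under -race.
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			id := fmt.Sprintf("lock-%d", n)
			reg.add(id, "owner")
			reg.remove(id)
		}(i)
	}
	wg.Wait()
	fmt.Println(reg.len()) // prints "0"
}
```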
```go
go acquireLock(helper, &total, &waiter)
}

// Wait for all goroutines to complete
```
This test reads and writes `*total` from multiple goroutines without any Go-level synchronization. The advisory lock serializes execution at the DB level, but Go's memory model still requires explicit synchronization (mutex, atomic, channel) for visibility guarantees across goroutines. Running with `-race` would flag this.
Two options:
- Add a `sync.Mutex` around the `*total` read/write (quick fix, still proves DB-level serialization)
- Move the "work" into the database — read a row, sleep, update the row, verify final value. This would be a true end-to-end proof of lock serialization.
```go
lockType *LockType
startTime time.Time
}
```
`txid` and `startTime` are set but never read anywhere. If `startTime` is intended for future lock-duration logging, it'd be nice to wire it into `unlock()` now (e.g., log the hold duration). Otherwise these are dead fields and the `txid_current()` query is a wasted round-trip per lock.
```go
logger.WithError(ctx, err).Error("Failed to acquire advisory lock")
return ctx, lockOwnerID, err
}
```
`Unlock` returns `context.Context` but every caller discards it (`defer Unlock(ctx, lockOwnerID)`). Since the map is mutated in place via `delete()`, the returned context is the exact same object. The return type suggests immutable context semantics but doesn't deliver them.
Simplest fix: drop the return value to match the `defer` usage pattern.
```go
}

// Track when the second goroutine acquires the lock
acquired := make(chan bool, 1)
```
The 100ms sleep here assumes the second goroutine has reached the `pg_advisory_xact_lock` call by that point. In CI with slow testcontainers under load, the goroutine might not have even started the lock call yet — the test would still pass but wouldn't actually be testing blocking behavior.
A more reliable approach: query `pg_locks` to confirm the second connection is actively waiting on the advisory lock before checking the `acquired` channel.
```go
case <-time.After(5 * time.Second):
	t.Error("Second goroutine did not acquire lock after first was released")
}
}
```
A couple of edge cases worth covering:
- Context cancellation while waiting — the current impl passes `ctx` to `connection.New(ctx)` but not to the blocking `pg_advisory_xact_lock` call. So context cancellation has no effect on a waiting lock. Even if we don't fix that now, a test documenting this behavior would be useful.
- Migration failure under lock — `TestConcurrentMigrations` only tests the happy path. A test that injects a migration error and verifies the lock is released (so other waiters proceed) would strengthen confidence in the defer cleanup.
Summary
Adds PostgreSQL advisory locks to coordinate database migrations during rolling deployments, preventing race conditions when multiple pods start simultaneously.
Problem
During rolling deployments (e.g., scaling from 0→3 replicas), each pod's init container attempts to run database migrations concurrently. The current implementation using gormigrate lacks coordination for concurrent execution, leading to:
Solution: PostgreSQL Advisory Locks
We chose PostgreSQL advisory locks over other coordination mechanisms for the following reasons:
Why Advisory Locks?
✅ Zero infrastructure overhead
✅ Automatic cleanup
✅ Simple implementation: one `pg_advisory_xact_lock(id, type)` call
✅ Battle-tested pattern
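As a sketch of what the `hash(id)` / `hash(Migrations)` keys from the sequence diagram could look like on the Go side: the two-key form of `pg_advisory_xact_lock` takes two 32-bit integers, so string IDs need a stable mapping into that range. FNV-1a here is an assumption, not necessarily the hash the PR uses; any deterministic hash works as long as every pod computes the same keys.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashKey maps a string lock ID into the int32 range expected by the
// two-key form pg_advisory_xact_lock(key1 int4, key2 int4).
// FNV-1a is illustrative; the only requirement is that all pods agree.
func hashKey(s string) int32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return int32(h.Sum32())
}

func main() {
	id, lockType := "migrations", "Migrations"
	// Every pod computing these two keys contends on the same advisory lock,
	// which is what serializes concurrent migration runs.
	fmt.Println(hashKey(id), hashKey(lockType))
}
```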
Why NOT Other Approaches?
❌ Kubernetes Job
❌ Helm Hooks (pre-upgrade/pre-install)
❌ Leader Election
References
Summary by CodeRabbit
New Features
Documentation
Tests