-
Notifications
You must be signed in to change notification settings - Fork 14
HYPERFLEET-618: add PostgreSQL advisory locks for migration coordination #72
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,107 @@ | ||
| package db | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "hash/fnv" | ||
| "time" | ||
|
|
||
| "gorm.io/gorm" | ||
| ) | ||
|
|
||
| // LockType represents the type of advisory lock | ||
| type LockType string | ||
|
|
||
| const ( | ||
| // Migrations lock type for database migrations | ||
| Migrations LockType = "Migrations" | ||
| ) | ||
|
|
||
| // AdvisoryLock represents a postgres advisory lock | ||
| // | ||
| // begin # start a Tx | ||
| // select pg_advisory_xact_lock(id, lockType) # obtain the lock (blocking) | ||
| // end # end the Tx and release the lock | ||
| // | ||
| // ownerUUID is a way to own the lock. Only the very first | ||
| // service call that owns the lock will have the correct ownerUUID. This is necessary | ||
| // to allow functions to call other service functions as part of the same lock (id, lockType). | ||
| type AdvisoryLock struct { | ||
| g2 *gorm.DB | ||
| txid int64 | ||
| ownerUUID *string | ||
| id *string | ||
| lockType *LockType | ||
| startTime time.Time | ||
| } | ||
|
|
||
| // newAdvisoryLock constructs a new AdvisoryLock object. | ||
| func newAdvisoryLock(ctx context.Context, connection SessionFactory, ownerUUID *string, id *string, locktype *LockType) (*AdvisoryLock, error) { | ||
| if connection == nil { | ||
| return nil, errors.New("AdvisoryLock: connection factory is missing") | ||
| } | ||
|
|
||
| // it requires a new DB session to start the advisory lock. | ||
| g2 := connection.New(ctx) | ||
|
|
||
| // start a Tx to ensure gorm will obtain/release the lock using a same connection. | ||
| tx := g2.Begin() | ||
| if tx.Error != nil { | ||
| return nil, tx.Error | ||
| } | ||
|
|
||
| // current transaction ID set by postgres. these are *not* distinct across time | ||
| // and do get reset after postgres performs "vacuuming" to reclaim used IDs. | ||
| var txid struct{ ID int64 } | ||
| err := tx.Raw("select txid_current() as id").Scan(&txid).Error | ||
|
|
||
| return &AdvisoryLock{ | ||
| txid: txid.ID, | ||
| ownerUUID: ownerUUID, | ||
| id: id, | ||
| lockType: locktype, | ||
| g2: tx, | ||
| startTime: time.Now(), | ||
| }, err | ||
| } | ||
|
|
||
| // lock calls select pg_advisory_xact_lock(id, lockType) to obtain the lock defined by (id, lockType). | ||
| // it is blocked if some other thread currently is holding the same lock (id, lockType). | ||
| // if blocked, it can be unblocked or timed out when overloaded. | ||
| func (l *AdvisoryLock) lock() error { | ||
| if l.g2 == nil { | ||
| return errors.New("AdvisoryLock: transaction is missing") | ||
| } | ||
| if l.id == nil { | ||
| return errors.New("AdvisoryLock: id is missing") | ||
| } | ||
| if l.lockType == nil { | ||
| return errors.New("AdvisoryLock: lockType is missing") | ||
| } | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. `pg_advisory_xact_lock` blocks indefinitely — if a pod hangs holding the migration lock (stuck migration, network partition), every other pod's init container blocks forever waiting. The Helm chart doesn't set `activeDeadlineSeconds` on the pod spec either, so there's no Kubernetes-level safety net. A couple of options:
I'd lean toward option 2 — it's a one-liner and gives a clean error if the lock can't be acquired within 5 minutes. |
||
| idAsInt := hash(*l.id) | ||
| typeAsInt := hash(string(*l.lockType)) | ||
| err := l.g2.Exec("select pg_advisory_xact_lock(?, ?)", idAsInt, typeAsInt).Error | ||
| return err | ||
| } | ||
|
|
||
| func (l *AdvisoryLock) unlock() error { | ||
| if l.g2 == nil { | ||
| return errors.New("AdvisoryLock: transaction is missing") | ||
| } | ||
|
|
||
| // it ends the Tx and implicitly releases the lock. | ||
| err := l.g2.Commit().Error | ||
| l.g2 = nil | ||
| return err | ||
| } | ||
|
|
||
| // hash string to int32 (postgres integer) | ||
| // https://pkg.go.dev/math#pkg-constants | ||
| // https://www.postgresql.org/docs/12/datatype-numeric.html | ||
| func hash(s string) int32 { | ||
| h := fnv.New32a() | ||
| h.Write([]byte(s)) | ||
| // Sum32() returns uint32. needs conversion. | ||
| return int32(h.Sum32()) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,10 +3,33 @@ package db | |
| import ( | ||
| "context" | ||
|
|
||
| "github.com/google/uuid" | ||
|
|
||
| dbContext "github.com/openshift-hyperfleet/hyperfleet-api/pkg/db/db_context" | ||
| "github.com/openshift-hyperfleet/hyperfleet-api/pkg/logger" | ||
| ) | ||
|
|
||
| type advisoryLockKey string | ||
|
|
||
| const ( | ||
| advisoryLock advisoryLockKey = "advisoryLock" | ||
| ) | ||
|
|
||
| type advisoryLockMap map[string]*AdvisoryLock | ||
|
|
||
| func (m advisoryLockMap) key(id string, lockType LockType) string { | ||
| return id + ":" + string(lockType) | ||
| } | ||
|
|
||
| func (m advisoryLockMap) get(id string, lockType LockType) (*AdvisoryLock, bool) { | ||
| lock, ok := m[m.key(id, lockType)] | ||
| return lock, ok | ||
| } | ||
|
|
||
| func (m advisoryLockMap) set(id string, lockType LockType, lock *AdvisoryLock) { | ||
| m[m.key(id, lockType)] = lock | ||
| } | ||
|
|
||
| // NewContext returns a new context with transaction stored in it. | ||
| // Upon error, the original context is still returned along with an error | ||
| func NewContext(ctx context.Context, connection SessionFactory) (context.Context, error) { | ||
|
|
@@ -53,3 +76,74 @@ func MarkForRollback(ctx context.Context, err error) { | |
| transaction.SetRollbackFlag(true) | ||
| logger.WithError(ctx, err).Info("Marked transaction for rollback") | ||
| } | ||
|
|
||
| // NewAdvisoryLockContext returns a new context with AdvisoryLock stored in it. | ||
| // Upon error, the original context is still returned along with an error | ||
| func NewAdvisoryLockContext(ctx context.Context, connection SessionFactory, id string, lockType LockType) (context.Context, string, error) { | ||
| // lockOwnerID will be different for every service function that attempts to start a lock. | ||
| // only the initial call in the stack must unlock. | ||
| // Unlock() will compare UUIDs and ensure only the top level call succeeds. | ||
| lockOwnerID := uuid.New().String() | ||
|
|
||
| locks, found := ctx.Value(advisoryLock).(advisoryLockMap) | ||
| if found { | ||
| if _, ok := locks.get(id, lockType); ok { | ||
| return ctx, lockOwnerID, nil | ||
| } | ||
| } else { | ||
| locks = make(advisoryLockMap) | ||
| } | ||
|
|
||
| lock, err := newAdvisoryLock(ctx, connection, &lockOwnerID, &id, &lockType) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Priority: Bug The two error paths in NewAdvisoryLockContext (lines 97-100 and 104-107) leak the GORM The cleanest fix is to handle it inside newAdvisoryLock itself — roll back the transaction if var txid struct{ ID int64 }
err := tx.Raw("select txid_current() as id").Scan(&txid).Error
if err != nil {
tx.Rollback()
return nil, err
}
return &AdvisoryLock{
txid: txid.ID,
ownerUUID: ownerUUID,
id: id,
lockType: locktype,
g2: tx,
startTime: time.Now(),
}, nilAnd in NewAdvisoryLockContext, roll back if lock.lock() fails: err = lock.lock()
if err != nil {
logger.WithError(ctx, err).Error("Failed to acquire advisory lock")
lock.g2.Rollback() // clean up the open transaction
return ctx, lockOwnerID, err
}
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. `advisoryLockMap` is a plain Go map stored via `context.WithValue`. The map is retrieved and mutated in place here — since maps are reference types, this mutation is visible to anything holding the same context. Go maps aren't safe for concurrent access and the runtime will panic if two goroutines hit the same map. Current usage is fine (each init container starts from `context.Background()`), but since this is an exported API, a future caller sharing contexts across goroutines could hit a data race. Worth either adding a `sync.Mutex` to the map wrapper or a doc comment on `NewAdvisoryLockContext` noting that the returned context must not be shared across goroutines. |
||
| if err != nil { | ||
| logger.WithError(ctx, err).Error("Failed to create advisory lock") | ||
| return ctx, lockOwnerID, err | ||
| } | ||
|
|
||
| // obtain the advisory lock (blocking) | ||
| err = lock.lock() | ||
| if err != nil { | ||
| logger.WithError(ctx, err).Error("Failed to acquire advisory lock") | ||
| return ctx, lockOwnerID, err | ||
| } | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. `Unlock` returns `context.Context` but every caller discards it (`defer Unlock(ctx, lockOwnerID)`). Since the map is mutated in place via `delete()`, the returned context is the exact same object. The return type suggests immutable context semantics but doesn't deliver them. Simplest fix: drop the return value to match the `defer` usage pattern. |
||
| locks.set(id, lockType, lock) | ||
|
|
||
| ctx = context.WithValue(ctx, advisoryLock, locks) | ||
| logger.With(ctx, "lock_id", id, "lock_type", lockType).Info("Acquired advisory lock") | ||
|
|
||
| return ctx, lockOwnerID, nil | ||
| } | ||
|
|
||
| // Unlock searches current locks and unlocks the one matching its owner id. | ||
| func Unlock(ctx context.Context, callerUUID string) context.Context { | ||
| locks, ok := ctx.Value(advisoryLock).(advisoryLockMap) | ||
| if !ok { | ||
| logger.Error(ctx, "Could not retrieve locks from context") | ||
| return ctx | ||
| } | ||
|
|
||
| for k, lock := range locks { | ||
| if lock.ownerUUID == nil { | ||
| logger.With(ctx, "lock_id", lock.id).Warn("lockOwnerID could not be found in AdvisoryLock") | ||
| } else if *lock.ownerUUID == callerUUID { | ||
| lockID := "<missing>" | ||
| lockType := *lock.lockType | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Priority: Bug Line 131 dereferences lock.lockType without a nil check, but both lock.id (line 132) and lockID := "<missing>"
lockTypeStr := LockType("<missing>")
if lock.lockType != nil {
lockTypeStr = *lock.lockType
}
if lock.id != nil {
lockID = *lock.id
}Minor, but a panic in Unlock during cleanup would be hard to debug in production. |
||
| if lock.id != nil { | ||
| lockID = *lock.id | ||
| } | ||
|
|
||
| if err := lock.unlock(); err != nil { | ||
| logger.With(ctx, "lock_id", lockID, "lock_type", lockType).WithError(err).Error("Could not unlock lock") | ||
| } else { | ||
| logger.With(ctx, "lock_id", lockID, "lock_type", lockType).Info("Unlocked lock") | ||
| } | ||
| delete(locks, k) | ||
| } else { | ||
| // the resolving UUID belongs to a service call that did *not* initiate the lock. | ||
| // it is ignored. | ||
| } | ||
| } | ||
|
|
||
| return ctx | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -24,6 +24,27 @@ func Migrate(g2 *gorm.DB) error { | |||||
| return nil | ||||||
| } | ||||||
|
|
||||||
| // MigrateWithLock runs migrations with an advisory lock to prevent concurrent migrations | ||||||
| func MigrateWithLock(ctx context.Context, factory SessionFactory) error { | ||||||
| // Acquire advisory lock for migrations | ||||||
| ctx, lockOwnerID, err := NewAdvisoryLockContext(ctx, factory, "migrations", Migrations) | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Priority: Pattern The lock ID "migrations" on line 30 is a bare string, while the lock type already uses a const (
// Migrations lock type for database migrations
Migrations LockType = "Migrations"
// MigrationsLockID is the advisory lock ID used for migration coordination
MigrationsLockID = "migrations"
)Then in migrations.go:
Suggested change
|
||||||
| if err != nil { | ||||||
| logger.WithError(ctx, err).Error("Could not lock migrations") | ||||||
| return err | ||||||
| } | ||||||
| defer Unlock(ctx, lockOwnerID) | ||||||
|
|
||||||
| // Run migrations with the locked context | ||||||
| g2 := factory.New(ctx) | ||||||
| if err := Migrate(g2); err != nil { | ||||||
| logger.WithError(ctx, err).Error("Could not migrate") | ||||||
| return err | ||||||
| } | ||||||
|
|
||||||
| logger.Info(ctx, "Migration completed successfully") | ||||||
| return nil | ||||||
| } | ||||||
|
|
||||||
| // MigrateTo a specific migration will not seed the database, seeds are up to date with the latest | ||||||
| // schema based on the most recent migration | ||||||
| // This should be for testing purposes mainly | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`txid` and `startTime` are set but never read anywhere. If `startTime` is intended for future lock-duration logging, it'd be nice to wire it into `unlock()` now (e.g., log the hold duration). Otherwise these are dead fields and the `txid_current()` query is a wasted round-trip per lock.