
feat: add leader election for Discovery and Engine components #1423

Open
thunguo wants to merge 1 commit into apache:develop from thunguo:feat/leader-election

Conversation


@thunguo thunguo commented Mar 7, 2026

Please provide a description of this PR:
Add leader election for Discovery and Engine components.
Issue #1425

To help us figure out who should review this PR, please put an X in all the areas that this PR affects.

  • Docs
  • Installation
  • User Experience
  • Dubboctl
  • Console
  • Core Component

Please check any characteristics that apply to this pull request.


sonarqubecloud bot commented Mar 7, 2026


Copilot AI left a comment


Pull request overview

Adds a DB-backed leader election mechanism so that, in multi-replica deployments, only the elected leader runs Discovery/Engine “business logic” (e.g., list-watch and DB writes), improving consistency (Issue #1425).

Changes:

  • Introduces pkg/core/leader with a GORM leader_leases model and a LeaderElection loop (plus unit tests).
  • Wires leader election into Discovery and Engine components (conditional on non-memory store).
  • Exposes DB access plumbing from the Store component (and adds a Pool() accessor on GormStore) to support leader-election DB usage.

Reviewed changes

Copilot reviewed 9 out of 10 changed files in this pull request and generated 10 comments.

Summary per file:

  • pkg/store/dbcommon/gorm_store.go: Adds Pool() accessor intended to expose the shared DB connection pool.
  • pkg/core/store/component.go: Adds a leader.DBSource implementation to expose a shared *gorm.DB from the store layer.
  • pkg/core/leader/model.go: Adds the LeaderLease GORM model for leader_leases.
  • pkg/core/leader/leader.go: Implements DB-based leader election (acquire/renew/release + run loop).
  • pkg/core/leader/db_source.go: Adds DBSource interface for components that can provide a *gorm.DB.
  • pkg/core/leader/leader_test.go: Adds unit tests for leader election behavior using in-memory SQLite.
  • pkg/core/engine/component.go: Integrates leader election into Engine startup flow.
  • pkg/core/discovery/component.go: Integrates leader election into Discovery startup flow; fixes a log message typo.
  • go.mod / go.sum: Dependency graph changes (tidy/reclassification + zookeeper direct dep).


Comment on lines +22 to +30
// LeaderLease is the GORM model for the leader_leases table
// It uses optimistic locking via the Version field to ensure atomic leader elections
type LeaderLease struct {
	ID         uint      `gorm:"primaryKey;autoIncrement"`
	Component  string    `gorm:"uniqueIndex;size:64;not null"`
	HolderID   string    `gorm:"size:255;not null"`
	AcquiredAt time.Time `gorm:"not null"`
	ExpiresAt  time.Time `gorm:"not null"`
	Version    int64     `gorm:"not null;default:0"`
}

Copilot AI Mar 8, 2026


The struct/doc comment says the Version field is used for optimistic locking to ensure atomic leader elections, but TryAcquire does not include version in its acquisition UPDATE predicate (it only uses it for Renew). Either update the documentation to match the actual behavior, or extend acquisition to use Version as part of the atomicity guarantee.

Comment on lines +219 to +255
// RunLeaderElection runs the leader election loop
// It blocks and runs onStartLeading/onStopLeading callbacks as leadership changes
// This is designed to be run in a separate goroutine
func (le *LeaderElection) RunLeaderElection(ctx context.Context, stopCh <-chan struct{},
	onStartLeading func(), onStopLeading func()) {

	ticker := time.NewTicker(le.acquireRetry)
	defer ticker.Stop()

	renewTicker := time.NewTicker(le.renewInterval)
	renewTicker.Stop() // Don't start renewal ticker yet

	isLeader := false

	for {
		select {
		case <-ctx.Done():
			if isLeader {
				le.Release(context.Background())
				onStopLeading()
			}
			return
		case <-stopCh:
			if isLeader {
				le.Release(context.Background())
				onStopLeading()
			}
			return
		case <-ticker.C:
			// Try to acquire leadership if not already leader
			if !isLeader {
				if le.TryAcquire(ctx) {
					logger.Infof("leader election: component %s acquired leadership (holder: %s)", le.component, le.holderID)
					isLeader = true
					renewTicker.Reset(le.renewInterval)
					onStartLeading()
				}

Copilot AI Mar 8, 2026


RunLeaderElection waits for the first ticker.C before the initial TryAcquire, which can delay leader startup by up to acquireRetry (default 5s). Consider attempting TryAcquire once before starting the ticker loop so a leader can start work immediately on boot.

Comment on lines +41 to +45
// poolProvider is an internal interface for stores that provide DB access
// This avoids circular imports by not referencing dbcommon directly
type poolProvider interface {
	Pool() interface{} // Returns *ConnectionPool, but we don't type it to avoid import
}

Copilot AI Mar 8, 2026


poolProvider is defined as Pool() interface{}, but GormStore.Pool() returns *ConnectionPool. Go method return types are not covariant, so store.(poolProvider) will never succeed and GetDB() will always return (nil, false), effectively disabling leader election for DB stores. Consider exposing a small, import-cycle-safe interface on DB-backed stores (e.g. a GetDB() *gorm.DB method on the store itself) or adjust the Pool() signature to exactly match the interface and then type-assert to a GetDB() *gorm.DB interface instead of using reflection.

Comment on lines +126 to +145
// GetDB returns the shared DB connection if the underlying store is DB-backed
// Implements the leader.DBSource interface
func (sc *storeComponent) GetDB() (*gorm.DB, bool) {
	// Try to get DB from any store that has a Pool() method (all GormStores share the same ConnectionPool)
	for _, store := range sc.stores {
		if pp, ok := store.(poolProvider); ok {
			pool := pp.Pool()
			if pool == nil {
				continue
			}
			// Use reflection to call GetDB() on the pool to avoid importing dbcommon
			poolVal := reflect.ValueOf(pool)
			getDBMethod := poolVal.MethodByName("GetDB")
			if getDBMethod.IsValid() {
				result := getDBMethod.Call(nil)
				if len(result) > 0 {
					if db, ok := result[0].Interface().(*gorm.DB); ok {
						return db, true
					}
				}

Copilot AI Mar 8, 2026


GetDB() relies on the poolProvider type assertion + reflection. Given the current poolProvider signature mismatch, this loop will never find a DB and leader election will never be initialized. Even after fixing the signature, prefer a direct type assertion to a minimal interface (e.g. interface{ GetDB() *gorm.DB }) instead of reflection to avoid runtime surprises and make this easier to test.

Comment on lines +191 to +202
// Run leader election with callbacks for starting/stopping leadership
e.leaderElection.RunLeaderElection(ctx, ch,
	func() { // onStartLeading callback
		logger.Infof("engine: became leader, starting business logic")
		if err := e.startBusinessLogic(ch); err != nil {
			logger.Errorf("engine: failed to start business logic: %v", err)
		}
	},
	func() { // onStopLeading callback
		logger.Warnf("engine: lost leadership, stopping business logic")
	},
)

Copilot AI Mar 8, 2026


When leadership is acquired, startBusinessLogic(ch) starts informers using the global stop channel ch, but on leadership loss the onStopLeading callback only logs. This means informers/subscribers will continue running (and can be started multiple times on leadership flaps), violating the follower contract and potentially duplicating list-watch + DB writes. Use a per-leadership stop channel/context for informers, and in onStopLeading close it and Unsubscribe any subscribers (or otherwise ensure business logic is idempotent and fully stops on leadership loss).

Comment on lines +160 to +171
// Run leader election with callbacks for starting/stopping leadership
d.leaderElection.RunLeaderElection(ctx, ch,
	func() { // onStartLeading callback
		logger.Infof("discovery: became leader, starting business logic")
		if err := d.startBusinessLogic(ch); err != nil {
			logger.Errorf("discovery: failed to start business logic: %v", err)
		}
	},
	func() { // onStopLeading callback
		logger.Warnf("discovery: lost leadership, stopping business logic")
	},
)

Copilot AI Mar 8, 2026


Same issue as engine: startBusinessLogic(ch) launches informers with the global stop channel, but onStopLeading doesn’t stop them or unsubscribe, so a node that loses leadership will keep running leader-only behavior and repeated elections can start duplicate goroutines / duplicate subscription attempts. Use a dedicated stop channel/context per leadership term and stop/unsubscribe in onStopLeading to ensure followers do not execute list-watch or writes.

Comment on lines +150 to +155
if result.Error != nil {
	// If insertion fails, it means another replica just created it
	// This is expected in concurrent scenarios
	logger.Debugf("leader election: failed to insert lease for component %s (probably created by another replica): %v", le.component, result.Error)
	le.isLeader.Store(false)
	return false

Copilot AI Mar 8, 2026


TryAcquire treats any Create error as a benign race ("probably created by another replica") and logs it at debug level. That will also hide real DB failures (connectivity, permission, schema issues) and can leave the component stuck as follower without a visible signal. Consider checking specifically for unique-constraint violations and logging/handling other errors as warnings (or returning an error).

	acquireRetry   time.Duration
	isLeader       atomic.Bool
	currentVersion int64
	stopCh         chan struct{}

Copilot AI Mar 8, 2026


LeaderElection has a stopCh field that is never used (the stop channel is passed into RunLeaderElection instead). Removing the unused field will simplify the struct and avoid confusion about which stop mechanism is authoritative.

Suggested change:
-	stopCh         chan struct{}


// Pool returns the connection pool for this store
// Used by other components (e.g., leader election) that need direct DB access
func (gs *GormStore) Pool() *ConnectionPool {

Copilot AI Mar 8, 2026


This new Pool() accessor returns *ConnectionPool, but the only consumer (storeComponent.GetDB) currently looks for a Pool() interface{} method, so GormStore will not satisfy that interface and the accessor won’t be used. Align the method signature with the consumer approach (or replace this with a GetDB() *gorm.DB method on GormStore to avoid exposing the pool at all).

Suggested change:
-func (gs *GormStore) Pool() *ConnectionPool {
+func (gs *GormStore) Pool() interface{} {

	ticker := time.NewTicker(le.acquireRetry)
	defer ticker.Stop()

	renewTicker := time.NewTicker(le.renewInterval)

Copilot AI Mar 8, 2026


renewTicker is created but not defer-stopped on all exit paths (only stopped in some state transitions). If the function returns while renewTicker is active, the ticker goroutine can leak. Add defer renewTicker.Stop() right after creation (similar to ticker).

Suggested change:
	renewTicker := time.NewTicker(le.renewInterval)
+	defer renewTicker.Stop()

@robocanic robocanic left a comment


Great work! I left some comments and hope you can discuss them with me.

}

// Initialize leader election if using DB store
if ctx.Config().Store.Type != storecfg.Memory {

Suggestion: the chain of if/else branches here could be more elegant; return early in the unexpected case, for example:

if ctx.Config().Store.Type == storecfg.Memory {
	return
}
// ... then the rest of the logic

func (sc *storeComponent) GetDB() (*gorm.DB, bool) {
	// Try to get DB from any store that has a Pool() method (all GormStores share the same ConnectionPool)
	for _, store := range sc.stores {
		if pp, ok := store.(poolProvider); ok {

Suggestion: inside this for-loop, continue early when the if condition is not met; the code will read more cleanly.

	// DefaultLeaseDuration is the default duration for a leader lease
	DefaultLeaseDuration = 30 * time.Second
	// DefaultRenewInterval is the default interval for renewing the lease
	DefaultRenewInterval = 10 * time.Second

Question: renewInterval and leaseDuration are exact multiples of each other here. Could the following happen: a healthy node was the leader in the previous cycle, but at renew time another node preempts the lease due to a concurrent race, so the node becomes a follower; however, the leader-to-follower transition does not stop this node's discovery and engine, so in the next cycle two nodes' discovery and engine are both doing list-watch and writing to the database, producing dirty data.

There are two core problems here:

  1. A node that has become the leader should, under normal circumstances, keep holding the lease; a new election should only be needed if the node itself dies.
  2. In the abnormal case where the leader is healthy but fails to grab the lease in the next cycle because of network issues, the node must stop the work that only the leader is allowed to do.
