10 changes: 10 additions & 0 deletions block/internal/syncing/syncer.go
@@ -122,6 +122,9 @@ type Syncer struct {
// P2P wait coordination
p2pWaitState atomic.Value // stores p2pWaitState

// DA head-reached signal for recovery mode (stays true once DA head is seen)
daHeadReached atomic.Bool

// blockSyncer is the interface used for block sync operations.
// defaults to self, but can be wrapped with tracing.
blockSyncer BlockSyncer
@@ -400,6 +403,7 @@ func (s *Syncer) daWorkerLoop() {
var backoff time.Duration
if err == nil {
// No error, means we are caught up.
s.daHeadReached.Store(true)
backoff = s.config.DA.BlockTime.Duration
} else {
// Error, back off for a shorter duration.
@@ -417,6 +421,12 @@
}
}

// HasReachedDAHead returns true once the DA worker has reached the DA head.
// Once set, it stays true.
func (s *Syncer) HasReachedDAHead() bool {
return s.daHeadReached.Load()
}

func (s *Syncer) fetchDAUntilCaughtUp() error {
for {
select {
172 changes: 172 additions & 0 deletions node/failover.go
@@ -286,3 +286,175 @@ func (a *singleRoleElector) state() *failoverState {
}
return nil
}

var _ leaderElection = &sequencerRecoveryElector{}
var _ testSupportElection = &sequencerRecoveryElector{}

// sequencerRecoveryElector implements leaderElection for disaster recovery.
// It starts in sync mode (follower), catches up from DA and P2P, then switches to aggregator (leader) mode.
// This is for single-sequencer setups that don't use raft.
Contributor comment:
This feels like a code smell. I don't follow why this needs to be its own flow. It should merely be: do I have a signing key, is the aggregator flag on, and what is the head block, which we can grab from P2P or by checking where DA sync is in relation to the head of the DA chain.

This feature should be combined with sync vs. follow. They tie together but aren't blocked on each other.
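A minimal sketch of the decision that comment describes, with the inputs passed in explicitly; the type and function names (recoveryStatus, canStartAggregating) are illustrative placeholders, not existing APIs in this repo:

```go
// Sketch only: the suggested check collapsed into one predicate. The field
// names are hypothetical; in the real node these values would come from the
// signer, the node config, the P2P sync services, and the DA syncer.
type recoveryStatus struct {
	aggregatorFlag bool   // is the aggregator flag on?
	hasSigningKey  bool   // do we hold the sequencer signing key?
	storeHeight    uint64 // current height of the local store
	p2pHeadHeight  uint64 // best head height observed via P2P
	daCaughtUp     bool   // has DA sync reached the head of the DA chain?
}

// canStartAggregating reports whether the node may switch to producing blocks:
// it must be allowed to sequence and be caught up with both P2P and DA.
func canStartAggregating(s recoveryStatus) bool {
	if !s.aggregatorFlag || !s.hasSigningKey {
		return false
	}
	caughtUpWithP2P := s.p2pHeadHeight == 0 || s.storeHeight >= s.p2pHeadHeight
	return caughtUpWithP2P && s.daCaughtUp
}
```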

type sequencerRecoveryElector struct {
running atomic.Bool
logger zerolog.Logger
followerFactory func() (raft.Runnable, error)
leaderFactory func() (raft.Runnable, error)
store store.Store
daBlockTime time.Duration
p2pTimeout time.Duration

// activeState tracks the current failoverState for test access
activeState atomic.Pointer[failoverState]
}

func newSequencerRecoveryElector(
logger zerolog.Logger,
leaderFactory func() (raft.Runnable, error),
followerFactory func() (raft.Runnable, error),
store store.Store,
daBlockTime time.Duration,
p2pTimeout time.Duration,
) (*sequencerRecoveryElector, error) {
return &sequencerRecoveryElector{
logger: logger.With().Str("component", "sequencer-recovery").Logger(),
followerFactory: followerFactory,
leaderFactory: leaderFactory,
store: store,
daBlockTime: daBlockTime,
p2pTimeout: p2pTimeout,
}, nil
}

func (s *sequencerRecoveryElector) Run(pCtx context.Context) error {
s.running.Store(true)
defer s.running.Store(false)

syncCtx, cancel := context.WithCancel(pCtx)
defer cancel()
syncState, syncErrCh, err := s.startSyncPhase(syncCtx)
if err != nil {
return err
}

s.logger.Info().Msg("monitoring catchup status from DA and P2P")
caughtUp, err := s.waitForCatchup(syncCtx, syncState, syncErrCh)
if err != nil {
return err
}
if !caughtUp {
return <-syncErrCh
}
s.logger.Info().Msg("caught up with DA and P2P, stopping sync mode")
cancel()

if err := <-syncErrCh; err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("sync mode failed before switchover completed: %w", err)
}

return s.startAggregatorPhase(pCtx)
}

func (s *sequencerRecoveryElector) startSyncPhase(ctx context.Context) (*failoverState, <-chan error, error) {
s.logger.Info().Msg("starting sequencer recovery: syncing from DA and P2P")

syncRunnable, err := s.followerFactory()
if err != nil {
return nil, nil, fmt.Errorf("create sync mode: %w", err)
}

syncState, ok := syncRunnable.(*failoverState)
if !ok {
return nil, nil, fmt.Errorf("unexpected runnable type from follower factory")
}

s.activeState.Store(syncState)

syncErrCh := make(chan error, 1)
go func() {
syncErrCh <- syncState.Run(ctx)
}()

return syncState, syncErrCh, nil
}

func (s *sequencerRecoveryElector) startAggregatorPhase(ctx context.Context) error {
s.logger.Info().Msg("starting aggregator mode after recovery")

aggRunnable, err := s.leaderFactory()
if err != nil {
return fmt.Errorf("create aggregator mode after recovery: %w", err)
}

if aggState, ok := aggRunnable.(*failoverState); ok {
s.activeState.Store(aggState)
}

return aggRunnable.Run(ctx)
}

// waitForCatchup polls DA and P2P catchup status until both sources indicate the node is caught up.
// Returns (true, nil) when caught up, (false, nil) if context cancelled, or (false, err) on error.
func (s *sequencerRecoveryElector) waitForCatchup(ctx context.Context, syncState *failoverState, syncErrCh <-chan error) (bool, error) {
pollInterval := s.daBlockTime
if pollInterval <= 0 {
pollInterval = 2 * time.Second
}

ticker := time.NewTicker(pollInterval)
defer ticker.Stop()

var timeoutCh <-chan time.Time
if s.p2pTimeout > 0 {
s.logger.Debug().Dur("p2p_timeout", s.p2pTimeout).Msg("P2P catchup timeout configured")
timeoutCh = time.After(s.p2pTimeout)
} else {
s.logger.Debug().Msg("P2P catchup timeout disabled, relying on DA only")
}
ignoreP2P := false

for {
select {
case <-ctx.Done():
return false, nil
case err := <-syncErrCh:
return false, fmt.Errorf("sync mode exited during recovery: %w", err)
case <-timeoutCh:
s.logger.Info().Msg("sequencer recovery: P2P catchup timeout reached, ignoring P2P status")
ignoreP2P = true
timeoutCh = nil
case <-ticker.C:
// Check DA caught up
daCaughtUp := syncState.bc.Syncer != nil && syncState.bc.Syncer.HasReachedDAHead()

// Check P2P caught up: store height >= best known height from P2P
storeHeight, err := s.store.Height(ctx)
if err != nil {
s.logger.Warn().Err(err).Msg("failed to get store height during recovery")
continue
}

maxP2PHeight := max(
syncState.headerSyncService.Store().Height(),
syncState.dataSyncService.Store().Height(),
)

p2pCaughtUp := ignoreP2P || (maxP2PHeight == 0 || storeHeight >= maxP2PHeight)

if daCaughtUp && p2pCaughtUp && storeHeight > 0 {
s.logger.Info().
Uint64("store_height", storeHeight).
Uint64("max_p2p_height", maxP2PHeight).
Msg("sequencer recovery: fully caught up")
return true, nil
}
}
}
}

func (s *sequencerRecoveryElector) IsRunning() bool {
return s.running.Load()
}

// for testing purposes only
func (s *sequencerRecoveryElector) state() *failoverState {
return s.activeState.Load()
}
4 changes: 4 additions & 0 deletions node/full.go
@@ -120,6 +120,10 @@ func newFullNode(
switch {
case nodeConfig.Node.Aggregator && nodeConfig.Raft.Enable:
leaderElection = raftpkg.NewDynamicLeaderElection(logger, leaderFactory, followerFactory, raftNode)
case nodeConfig.Node.Aggregator && nodeConfig.Node.SequencerRecovery.Duration > 0 && !nodeConfig.Raft.Enable:
if leaderElection, err = newSequencerRecoveryElector(logger, leaderFactory, followerFactory, evstore, nodeConfig.DA.BlockTime.Duration, nodeConfig.Node.SequencerRecovery.Duration); err != nil {
return nil, err
}
case nodeConfig.Node.Aggregator && !nodeConfig.Raft.Enable:
if leaderElection, err = newSingleRoleElector(leaderFactory); err != nil {
return nil, err