From 5db4d19bb5596daded9a6e8b49ad6540f10f42b6 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Tue, 10 Feb 2026 10:56:07 +0100 Subject: [PATCH 1/2] Recover sequencer --- block/internal/syncing/syncer.go | 10 + node/failover.go | 177 +++++++++++ node/full.go | 4 + node/sequencer_recovery_integration_test.go | 317 ++++++++++++++++++++ pkg/config/config.go | 9 + pkg/config/config_test.go | 2 +- pkg/config/defaults.go | 1 + test/testda/dummy.go | 13 + 8 files changed, 532 insertions(+), 1 deletion(-) create mode 100644 node/sequencer_recovery_integration_test.go diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 6df6b2c22..ad5407343 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -122,6 +122,9 @@ type Syncer struct { // P2P wait coordination p2pWaitState atomic.Value // stores p2pWaitState + // DA head-reached signal for recovery mode (stays true once DA head is seen) + daHeadReached atomic.Bool + // blockSyncer is the interface used for block sync operations. // defaults to self, but can be wrapped with tracing. blockSyncer BlockSyncer @@ -400,6 +403,7 @@ func (s *Syncer) daWorkerLoop() { var backoff time.Duration if err == nil { // No error, means we are caught up. + s.daHeadReached.Store(true) backoff = s.config.DA.BlockTime.Duration } else { // Error, back off for a shorter duration. @@ -417,6 +421,12 @@ func (s *Syncer) daWorkerLoop() { } } +// HasReachedDAHead returns true once the DA worker has reached the DA head. +// Once set, it stays true. +func (s *Syncer) HasReachedDAHead() bool { + return s.daHeadReached.Load() +} + func (s *Syncer) fetchDAUntilCaughtUp() error { for { select { diff --git a/node/failover.go b/node/failover.go index 787f627ce..82a22be5c 100644 --- a/node/failover.go +++ b/node/failover.go @@ -286,3 +286,180 @@ func (a *singleRoleElector) state() *failoverState { } return nil } + +var _ leaderElection = &sequencerRecoveryElector{} +var _ testSupportElection = &sequencerRecoveryElector{} + +// sequencerRecoveryElector implements leaderElection for disaster recovery. +// It starts in sync mode (follower), catches up from DA and P2P, then switches to aggregator (leader) mode. +// This is for single-sequencer setups that don't use raft. 
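+// The switch is one-way: once caught up, the elector moves to aggregator mode and never drops back to sync mode.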
+type sequencerRecoveryElector struct { + running atomic.Bool + logger zerolog.Logger + followerFactory func() (raft.Runnable, error) + leaderFactory func() (raft.Runnable, error) + store store.Store + daBlockTime time.Duration + p2pTimeout time.Duration + + // activeState tracks the current failoverState for test access + activeState atomic.Pointer[failoverState] +} + +func newSequencerRecoveryElector( + logger zerolog.Logger, + leaderFactory func() (raft.Runnable, error), + followerFactory func() (raft.Runnable, error), + store store.Store, + daBlockTime time.Duration, + p2pTimeout time.Duration, +) (*sequencerRecoveryElector, error) { + return &sequencerRecoveryElector{ + logger: logger.With().Str("component", "sequencer-recovery").Logger(), + followerFactory: followerFactory, + leaderFactory: leaderFactory, + store: store, + daBlockTime: daBlockTime, + p2pTimeout: p2pTimeout, + }, nil +} + +func (s *sequencerRecoveryElector) Run(pCtx context.Context) error { + s.running.Store(true) + defer s.running.Store(false) + + syncCtx, cancel := context.WithCancel(pCtx) + defer cancel() + syncState, syncErrCh, err := s.startSyncPhase(syncCtx) + if err != nil { + return err + } + + s.logger.Info().Msg("monitoring catchup status from DA and P2P") + caughtUp, err := s.waitForCatchup(syncCtx, syncState, syncErrCh) + if err != nil { + return err + } + if !caughtUp { + return <-syncErrCh + } + s.logger.Info().Msg("caught up with DA and P2P, stopping sync mode") + cancel() + + if err := <-syncErrCh; err != nil && !errors.Is(err, context.Canceled) { + return fmt.Errorf("sync mode stopped with error during recovery switchover: %w", err) + } + + return s.startAggregatorPhase(pCtx) +} + +func (s *sequencerRecoveryElector) startSyncPhase(ctx context.Context) (*failoverState, <-chan error, error) { + s.logger.Info().Msg("starting sequencer recovery: syncing from DA and P2P") + + syncRunnable, err := s.followerFactory() + if err != nil { + return nil, nil, fmt.Errorf("create sync mode: %w", err) + } + + syncState, ok := syncRunnable.(*failoverState) + if !ok { + return nil, nil, fmt.Errorf("unexpected runnable type from follower factory") + } + + s.activeState.Store(syncState) + + syncErrCh := make(chan error, 1) + go func() { + syncErrCh <- syncState.Run(ctx) + }() + + return syncState, syncErrCh, nil +} + +func (s *sequencerRecoveryElector) startAggregatorPhase(ctx context.Context) error { + s.logger.Info().Msg("starting aggregator mode after recovery") + + aggRunnable, err := s.leaderFactory() + if err != nil { + return fmt.Errorf("create aggregator mode after recovery: %w", err) + } + + if aggState, ok := aggRunnable.(*failoverState); ok { + s.activeState.Store(aggState) + } + + return aggRunnable.Run(ctx) +} + +// waitForCatchup polls DA and P2P catchup status until both sources indicate the node is caught up. +// Returns (true, nil) when caught up, (false, nil) if context cancelled, or (false, err) on error. 
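+// If p2pTimeout is positive and elapses before P2P catch-up, P2P status is ignored and DA catch-up alone (with a non-zero store height) completes the wait.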
+func (s *sequencerRecoveryElector) waitForCatchup(ctx context.Context, syncState *failoverState, syncErrCh <-chan error) (bool, error) { + pollInterval := s.daBlockTime + if pollInterval <= 0 { + pollInterval = 2 * time.Second + } + + ticker := time.NewTicker(pollInterval) + defer ticker.Stop() + + var timeoutCh <-chan time.Time + if s.p2pTimeout > 0 { + timeoutCh = time.After(s.p2pTimeout) + } + ignoreP2P := false + + for { + select { + case <-ctx.Done(): + return false, nil + case err := <-syncErrCh: + return false, fmt.Errorf("sync mode exited during recovery: %w", err) + case <-timeoutCh: + s.logger.Info().Msg("sequencer recovery: P2P catchup timeout reached, ignoring P2P status") + ignoreP2P = true + timeoutCh = nil + case <-ticker.C: + // Check DA caught up + daCaughtUp := syncState.bc.Syncer != nil && syncState.bc.Syncer.HasReachedDAHead() + + // Check P2P caught up: store height >= best known height from P2P + storeHeight, err := s.store.Height(ctx) + if err != nil { + s.logger.Warn().Err(err).Msg("failed to get store height during recovery") + continue + } + + maxP2PHeight := max( + syncState.headerSyncService.Store().Height(), + syncState.dataSyncService.Store().Height(), + ) + + p2pCaughtUp := ignoreP2P || (maxP2PHeight == 0 || storeHeight >= maxP2PHeight) + + s.logger.Debug(). + Bool("da_caught_up", daCaughtUp). + Bool("p2p_caught_up", p2pCaughtUp). + Bool("ignore_p2p", ignoreP2P). + Uint64("store_height", storeHeight). + Uint64("max_p2p_height", maxP2PHeight). + Msg("recovery catchup status") + + if daCaughtUp && p2pCaughtUp && storeHeight > 0 { + s.logger.Info(). + Uint64("store_height", storeHeight). + Uint64("max_p2p_height", maxP2PHeight). + Msg("sequencer recovery: fully caught up") + return true, nil + } + } + } +} + +func (s *sequencerRecoveryElector) IsRunning() bool { + return s.running.Load() +} + +// for testing purposes only +func (s *sequencerRecoveryElector) state() *failoverState { + return s.activeState.Load() +} diff --git a/node/full.go b/node/full.go index 4fa2ff7c5..d2daf3bd3 100644 --- a/node/full.go +++ b/node/full.go @@ -120,6 +120,10 @@ func newFullNode( switch { case nodeConfig.Node.Aggregator && nodeConfig.Raft.Enable: leaderElection = raftpkg.NewDynamicLeaderElection(logger, leaderFactory, followerFactory, raftNode) + case nodeConfig.Node.Aggregator && nodeConfig.Node.SequencerRecovery.Duration > 0 && !nodeConfig.Raft.Enable: + if leaderElection, err = newSequencerRecoveryElector(logger, leaderFactory, followerFactory, evstore, nodeConfig.DA.BlockTime.Duration, nodeConfig.Node.SequencerRecovery.Duration); err != nil { + return nil, err + } case nodeConfig.Node.Aggregator && !nodeConfig.Raft.Enable: if leaderElection, err = newSingleRoleElector(leaderFactory); err != nil { return nil, err diff --git a/node/sequencer_recovery_integration_test.go b/node/sequencer_recovery_integration_test.go new file mode 100644 index 000000000..723e507e9 --- /dev/null +++ b/node/sequencer_recovery_integration_test.go @@ -0,0 +1,317 @@ +//go:build integration + +package node + +import ( + "context" + "errors" + "fmt" + "sync" + "testing" + "time" + + "github.com/evstack/ev-node/pkg/genesis" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + + coreexecutor "github.com/evstack/ev-node/core/execution" + coresequencer "github.com/evstack/ev-node/core/sequencer" + evconfig "github.com/evstack/ev-node/pkg/config" + "github.com/evstack/ev-node/pkg/p2p/key" + signer 
"github.com/evstack/ev-node/pkg/signer/noop" + "github.com/evstack/ev-node/pkg/store" + "github.com/evstack/ev-node/types" +) + +// TestSequencerRecoveryFromDA verifies that a new sequencer node can recover from DA. +// +// This test: +// 1. Starts a normal sequencer and waits for it to produce blocks and submit them to DA +// 2. Stops the sequencer +// 3. Starts a NEW sequencer with SequencerRecovery=true using the same DA but a fresh store +// 4. Verifies the recovery node syncs all blocks from DA +// 5. Verifies the recovery node switches to aggregator mode and produces NEW blocks +func TestSequencerRecoveryFromDA(t *testing.T) { + config := getTestConfig(t, 1) + config.Node.BlockTime = evconfig.DurationWrapper{Duration: 100 * time.Millisecond} + config.DA.BlockTime = evconfig.DurationWrapper{Duration: 200 * time.Millisecond} + config.P2P.Peers = "none" // DA-only recovery + + // Shared genesis and keys for both nodes + genesis, genesisValidatorKey, _ := types.GetGenesisWithPrivkey("test-chain") + // Common components setup + resetSharedDummyDA() + + // 1. Start original sequencer + executor, sequencer, daClient, nodeKey, ds, stopDAHeightTicker := createTestComponents(t, config) + + signer, err := signer.NewNoopSigner(genesisValidatorKey) + require.NoError(t, err) + + originalNode, err := NewNode(config, executor, sequencer, daClient, signer, nodeKey, genesis, ds, + DefaultMetricsProvider(evconfig.DefaultInstrumentationConfig()), testLogger(t), NodeOptions{}) + require.NoError(t, err) + + originalCleanup := func() { + stopDAHeightTicker() + } + + errChan := make(chan error, 1) + cancel1 := runNodeInBackground(t, originalNode.(*FullNode), errChan) + + blocksProduced := uint64(5) + require.NoError(t, waitForAtLeastNDAIncludedHeight(originalNode, blocksProduced), + "original sequencer should produce and DA-include blocks") + requireEmptyChan(t, errChan) + + originalHashes := collectBlockHashes(t, originalNode.(*FullNode), blocksProduced) + + // Stop original sequencer + cancel1() + originalCleanup() + + verifyBlobsInDA(t) + + // Start recovery sequencer with fresh store but same DA + recoveryConfig := getTestConfig(t, 2) + recoveryConfig.Node.BlockTime = evconfig.DurationWrapper{Duration: 100 * time.Millisecond} + recoveryConfig.DA.BlockTime = evconfig.DurationWrapper{Duration: 200 * time.Millisecond} + recoveryConfig.Node.SequencerRecovery = evconfig.DurationWrapper{Duration: 500 * time.Millisecond} + recoveryConfig.P2P.Peers = "" + + recoveryNode, recNodeCleanup := setupRecoveryNode(t, recoveryConfig, genesis, genesisValidatorKey, testLogger(t)) + defer recNodeCleanup() + + errChan2 := make(chan error, 1) + cancel2 := runNodeInBackground(t, recoveryNode, errChan2) + defer cancel2() + + // Give the node a moment to start (or fail), then check for early errors + requireNodeStartedSuccessfully(t, errChan2, 500*time.Millisecond) + + // Recovery node must sync all DA blocks then produce new ones + newBlocksTarget := blocksProduced + 3 + require.NoError(t, waitForAtLeastNBlocks(recoveryNode, newBlocksTarget, Store), + "recovery node should sync from DA and then produce new blocks") + requireEmptyChan(t, errChan2) + + assertBlockHashesMatch(t, recoveryNode, originalHashes) +} + +// TestSequencerRecoveryFromP2P verifies recovery when some blocks are only on P2P, not yet DA-included. +// +// This test: +// 1. Starts a sequencer with fast block time but slow DA, so blocks outpace DA inclusion +// 2. Starts a fullnode connected via P2P that syncs those blocks +// 3. 
Stops the sequencer (some blocks exist only on fullnode via P2P, not on DA) +// 4. Starts a recovery sequencer with P2P peer pointing to the fullnode +// 5. Verifies the recovery node catches up from both DA and P2P before producing new blocks +func TestSequencerRecoveryFromP2P(t *testing.T) { + genesis, genesisValidatorKey, _ := types.GetGenesisWithPrivkey("test-chain") + remoteSigner, err := signer.NewNoopSigner(genesisValidatorKey) + require.NoError(t, err) + + logger := testLogger(t) + + // Phase 1: Start sequencer with fast blocks, slow DA + seqConfig := getTestConfig(t, 1) + seqConfig.Node.BlockTime = evconfig.DurationWrapper{Duration: 100 * time.Millisecond} + seqConfig.DA.BlockTime = evconfig.DurationWrapper{Duration: 10 * time.Second} // very slow DA + resetSharedDummyDA() + + seqExecutor, seqSequencer, daClient, seqP2PKey, seqDS, stopTicker := createTestComponents(t, seqConfig) + defer stopTicker() + + seqPeerAddr := peerAddress(t, seqP2PKey, seqConfig.P2P.ListenAddress) + seqNode, err := NewNode(seqConfig, seqExecutor, seqSequencer, daClient, remoteSigner, seqP2PKey, genesis, seqDS, + DefaultMetricsProvider(evconfig.DefaultInstrumentationConfig()), logger, NodeOptions{}) + require.NoError(t, err) + sequencer := seqNode.(*FullNode) + + errChan := make(chan error, 3) + seqCancel := runNodeInBackground(t, sequencer, errChan) + + blocksViaP2P := uint64(5) + require.NoError(t, waitForAtLeastNBlocks(sequencer, blocksViaP2P, Store), + "sequencer should produce blocks") + + // Phase 2: Start fullnode connected to sequencer via P2P + fnConfig := getTestConfig(t, 2) + fnConfig.Node.Aggregator = false + fnConfig.Node.BlockTime = evconfig.DurationWrapper{Duration: 100 * time.Millisecond} + fnConfig.DA.BlockTime = evconfig.DurationWrapper{Duration: 10 * time.Second} + fnConfig.P2P.ListenAddress = "/ip4/127.0.0.1/tcp/40002" + fnConfig.P2P.Peers = seqPeerAddr + fnConfig.RPC.Address = "127.0.0.1:8002" + + fnP2PKey := &key.NodeKey{PrivKey: genesisValidatorKey, PubKey: genesisValidatorKey.GetPublic()} + fnDS, err := store.NewTestInMemoryKVStore() + require.NoError(t, err) + + fnNode, err := NewNode(fnConfig, coreexecutor.NewDummyExecutor(), coresequencer.NewDummySequencer(), + daClient, nil, fnP2PKey, genesis, fnDS, + DefaultMetricsProvider(evconfig.DefaultInstrumentationConfig()), logger, NodeOptions{}) + require.NoError(t, err) + fullnode := fnNode.(*FullNode) + + fnCancel := runNodeInBackground(t, fullnode, errChan) + defer fnCancel() + + require.NoError(t, waitForAtLeastNBlocks(fullnode, blocksViaP2P, Store), + "fullnode should sync blocks via P2P") + requireEmptyChan(t, errChan) + + fnHeight, err := fullnode.Store.Height(t.Context()) + require.NoError(t, err) + originalHashes := collectBlockHashes(t, fullnode, fnHeight) + + // Stop sequencer (fullnode keeps running with P2P-only blocks) + seqCancel() + + // Phase 3: Start recovery sequencer connected to surviving fullnode via P2P + fnPeerAddr := peerAddress(t, fnP2PKey, fnConfig.P2P.ListenAddress) + + recConfig := getTestConfig(t, 3) + recConfig.Node.BlockTime = evconfig.DurationWrapper{Duration: 100 * time.Millisecond} + recConfig.DA.BlockTime = evconfig.DurationWrapper{Duration: 200 * time.Millisecond} + recConfig.Node.SequencerRecovery = evconfig.DurationWrapper{Duration: 3 * time.Minute} + recConfig.P2P.ListenAddress = "/ip4/127.0.0.1/tcp/40003" + recConfig.P2P.Peers = fnPeerAddr + recConfig.RPC.Address = "127.0.0.1:8003" + + recoveryNode, recStopTicker := setupRecoveryNode(t, recConfig, genesis, genesisValidatorKey, logger) + defer 
recStopTicker() + + recCancel := runNodeInBackground(t, recoveryNode, errChan) + defer recCancel() + + newTarget := fnHeight + 3 + require.NoError(t, waitForAtLeastNBlocks(recoveryNode, newTarget, Store), + "recovery node should catch up via P2P and produce new blocks") + requireEmptyChan(t, errChan) + + assertBlockHashesMatch(t, recoveryNode, originalHashes) + + // Shutdown + recCancel() + fnCancel() +} + +// collectBlockHashes returns a map of height→hash for blocks 1..maxHeight from the given node. +func collectBlockHashes(t *testing.T, node *FullNode, maxHeight uint64) map[uint64]types.Hash { + t.Helper() + hashes := make(map[uint64]types.Hash, maxHeight) + for h := uint64(1); h <= maxHeight; h++ { + header, _, err := node.Store.GetBlockData(t.Context(), h) + require.NoError(t, err, "failed to get block %d", h) + hashes[h] = header.Hash() + } + return hashes +} + +// assertBlockHashesMatch verifies that the node's blocks match the expected hashes. +func assertBlockHashesMatch(t *testing.T, node *FullNode, expected map[uint64]types.Hash) { + t.Helper() + for h, expHash := range expected { + header, _, err := node.Store.GetBlockData(t.Context(), h) + require.NoError(t, err, "node should have block %d", h) + require.Equal(t, expHash, header.Hash(), "block hash mismatch at height %d", h) + } +} + +// peerAddress returns the P2P multiaddr string for a given node key and listen address. +func peerAddress(t *testing.T, nodeKey *key.NodeKey, listenAddr string) string { + t.Helper() + peerID, err := peer.IDFromPrivateKey(nodeKey.PrivKey) + require.NoError(t, err) + return fmt.Sprintf("%s/p2p/%s", listenAddr, peerID.Loggable()["peerID"].(string)) +} + +// testLogger returns a zerolog.Logger that writes to testing.T if verbose, nop otherwise. +func testLogger(t *testing.T) zerolog.Logger { + t.Helper() + if testing.Verbose() { + return zerolog.New(zerolog.NewTestWriter(t)) + } + return zerolog.Nop() +} + +// runNodeInBackground starts a node in a goroutine and returns a cancel function. +// Errors from node.Run (except context.Canceled) are sent to errChan. +func runNodeInBackground(t *testing.T, node *FullNode, errChan chan error) context.CancelFunc { + t.Helper() + ctx, cancel := context.WithCancel(t.Context()) + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + if err := node.Run(ctx); err != nil && !errors.Is(err, context.Canceled) { + t.Logf("node Run() returned error: %v", err) + errChan <- err + } + }() + + return func() { + cancel() + shutdownAndWait(t, []context.CancelFunc{func() {}}, &wg, 10*time.Second) + } +} + +// setupRecoveryNode creates and configures a recovery sequencer node. +// Returns the node and a cleanup function for stopping the ticker. +func setupRecoveryNode(t *testing.T, config evconfig.Config, genesis genesis.Genesis, genesisValidatorKey crypto.PrivKey, logger zerolog.Logger) (*FullNode, func()) { + t.Helper() + + recExecutor, recSequencer, recDAClient, recKey, recDS, recStopTicker := createTestComponents(t, config) + + // Create recovery signer (same key as validator) + recSigner, err := signer.NewNoopSigner(genesisValidatorKey) + require.NoError(t, err) + + recNode, err := NewNode(config, recExecutor, recSequencer, recDAClient, recSigner, recKey, genesis, recDS, + DefaultMetricsProvider(evconfig.DefaultInstrumentationConfig()), logger, NodeOptions{}) + require.NoError(t, err) + + return recNode.(*FullNode), recStopTicker +}
+ +// requireNodeStartedSuccessfully checks that a node started without early errors. +// It waits for the specified duration and checks the error channel. +func requireNodeStartedSuccessfully(t *testing.T, errChan chan error, waitTime time.Duration) { + t.Helper() + time.Sleep(waitTime) + select { + case err := <-errChan: + require.NoError(t, err, "recovery node failed to start") + default: + // Node is still running, good + } +} + +// verifyBlobsInDA is a diagnostic helper that verifies blobs are still in shared DA. +func verifyBlobsInDA(t *testing.T) { + t.Helper() + sharedDA := getSharedDummyDA(0) + daHeight := sharedDA.Height() + t.Logf("DIAG: sharedDA height after original node stopped: %d", daHeight) + blobsFound := 0 + for h := uint64(0); h <= daHeight; h++ { + res := sharedDA.Retrieve(context.Background(), h, sharedDA.GetHeaderNamespace()) + if res.Code == 1 { // StatusSuccess + blobsFound += len(res.Data) + t.Logf("DIAG: found %d header blob(s) at DA height %d (Success)", len(res.Data), h) + } + res = sharedDA.Retrieve(context.Background(), h, sharedDA.GetDataNamespace()) + if res.Code == 1 { // StatusSuccess + blobsFound += len(res.Data) + t.Logf("DIAG: found %d data blob(s) at DA height %d (Success)", len(res.Data), h) + } + } + t.Logf("DIAG: total blobs found in DA: %d", blobsFound) + require.Greater(t, blobsFound, 0, "shared DA should contain blobs from original sequencer") +} diff --git a/pkg/config/config.go b/pkg/config/config.go index e03a277ce..c9a0831de 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -51,6 +51,8 @@ const ( FlagReadinessMaxBlocksBehind = FlagPrefixEvnode + "node.readiness_max_blocks_behind" // FlagScrapeInterval is a flag for specifying the reaper scrape interval FlagScrapeInterval = FlagPrefixEvnode + "node.scrape_interval" + // FlagSequencerRecovery is a flag for starting in sync mode first, then switching to aggregator after catchup + FlagSequencerRecovery = FlagPrefixEvnode + "node.sequencer_recovery" // FlagClearCache is a flag for clearing the cache FlagClearCache = FlagPrefixEvnode + "clear_cache" @@ -257,6 +259,7 @@ type NodeConfig struct { LazyMode bool `mapstructure:"lazy_mode" yaml:"lazy_mode" comment:"Enables lazy aggregation mode, where blocks are only produced when transactions are available or after LazyBlockTime. Optimizes resources by avoiding empty block creation during periods of inactivity."` LazyBlockInterval DurationWrapper `mapstructure:"lazy_block_interval" yaml:"lazy_block_interval" comment:"Maximum interval between blocks in lazy aggregation mode (LazyAggregator). Ensures blocks are produced periodically even without transactions to keep the chain active. Generally larger than BlockTime."` ScrapeInterval DurationWrapper `mapstructure:"scrape_interval" yaml:"scrape_interval" comment:"Interval at which the reaper polls the execution layer for new transactions. Lower values reduce transaction detection latency but increase RPC load. Examples: \"250ms\", \"500ms\", \"1s\"."` + SequencerRecovery DurationWrapper `mapstructure:"sequencer_recovery" yaml:"sequencer_recovery" comment:"Start in sync mode first, catch up from DA and P2P, then switch to aggregator mode. Requires aggregator mode. Value specifies time to wait for P2P reconnections. 
Use for disaster recovery of a sequencer that lost its data."` // Readiness / health configuration ReadinessWindowSeconds uint64 `mapstructure:"readiness_window_seconds" yaml:"readiness_window_seconds" comment:"Time window in seconds used to calculate ReadinessMaxBlocksBehind based on block time. Default: 15 seconds."` @@ -351,6 +354,11 @@ func (c *Config) Validate() error { return fmt.Errorf("based sequencer mode requires aggregator mode to be enabled") } + // Validate sequencer recovery requires aggregator mode + if c.Node.SequencerRecovery.Duration > 0 && !c.Node.Aggregator { + return fmt.Errorf("sequencer recovery mode requires aggregator mode to be enabled") + } + // Validate namespaces if err := validateNamespace(c.DA.GetNamespace()); err != nil { return fmt.Errorf("could not validate namespace (%s): %w", c.DA.GetNamespace(), err) @@ -436,6 +444,7 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().Uint64(FlagReadinessWindowSeconds, def.Node.ReadinessWindowSeconds, "time window in seconds for calculating readiness threshold based on block time (default: 15s)") cmd.Flags().Uint64(FlagReadinessMaxBlocksBehind, def.Node.ReadinessMaxBlocksBehind, "how many blocks behind best-known head the node can be and still be considered ready (0 = must be at head)") cmd.Flags().Duration(FlagScrapeInterval, def.Node.ScrapeInterval.Duration, "interval at which the reaper polls the execution layer for new transactions") + cmd.Flags().Duration(FlagSequencerRecovery, def.Node.SequencerRecovery.Duration, "start in sync mode first, catch up from DA/P2P, then switch to aggregator (disaster recovery). Value specifies time to wait for in-flight P2P blocks.") // Data Availability configuration flags cmd.Flags().String(FlagDAAddress, def.DA.Address, "DA address (host:port)") diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 1834e1b40..9d4532b90 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -112,7 +112,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagRPCEnableDAVisualization, DefaultConfig().RPC.EnableDAVisualization) // Count the number of flags we're explicitly checking - expectedFlagCount := 63 // Update this number if you add more flag checks above + expectedFlagCount := 64 // Update this number if you add more flag checks above // Get the actual number of flags (both regular and persistent) actualFlagCount := 0 diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index 0de2f4bc2..14277acbd 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -69,6 +69,7 @@ func DefaultConfig() Config { ReadinessWindowSeconds: defaultReadinessWindowSeconds, ReadinessMaxBlocksBehind: calculateReadinessMaxBlocksBehind(defaultBlockTime.Duration, defaultReadinessWindowSeconds), ScrapeInterval: DurationWrapper{1 * time.Second}, + SequencerRecovery: DurationWrapper{0}, }, DA: DAConfig{ Address: "http://localhost:7980", diff --git a/test/testda/dummy.go b/test/testda/dummy.go index 684d3fcee..f4e0c4397 100644 --- a/test/testda/dummy.go +++ b/test/testda/dummy.go @@ -72,6 +72,19 @@ func New(opts ...Option) *DummyDA { return d } +// BlobCount returns the total number of blobs stored across all heights and namespaces. +func (d *DummyDA) BlobCount() int { + d.mu.Lock() + defer d.mu.Unlock() + count := 0 + for _, nss := range d.blobs { + for _, blobs := range nss { + count += len(blobs) + } + } + return count +} + // Submit stores blobs and returns success or simulated failure. 
func (d *DummyDA) Submit(_ context.Context, data [][]byte, _ float64, namespace []byte, _ []byte) datypes.ResultSubmit { if d.failSubmit.Load() { From 7aa2b745a2120166618fe1c6f48de5b3bbe77f8a Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Tue, 10 Feb 2026 11:32:59 +0100 Subject: [PATCH 2/2] Review feedback --- node/failover.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/node/failover.go b/node/failover.go index 82a22be5c..9e430ea28 100644 --- a/node/failover.go +++ b/node/failover.go @@ -347,7 +347,7 @@ func (s *sequencerRecoveryElector) Run(pCtx context.Context) error { cancel() if err := <-syncErrCh; err != nil && !errors.Is(err, context.Canceled) { - return fmt.Errorf("sync mode stopped with error during recovery switchover: %w", err) + return fmt.Errorf("sync mode failed before switchover completed: %w", err) } return s.startAggregatorPhase(pCtx) @@ -404,7 +404,10 @@ func (s *sequencerRecoveryElector) waitForCatchup(ctx context.Context, syncState var timeoutCh <-chan time.Time if s.p2pTimeout > 0 { + s.logger.Debug().Dur("p2p_timeout", s.p2pTimeout).Msg("P2P catchup timeout configured") timeoutCh = time.After(s.p2pTimeout) + } else { + s.logger.Debug().Msg("P2P catchup timeout disabled, relying on DA only") } ignoreP2P := false @@ -436,14 +439,6 @@ func (s *sequencerRecoveryElector) waitForCatchup(ctx context.Context, syncState p2pCaughtUp := ignoreP2P || (maxP2PHeight == 0 || storeHeight >= maxP2PHeight) - s.logger.Debug(). - Bool("da_caught_up", daCaughtUp). - Bool("p2p_caught_up", p2pCaughtUp). - Bool("ignore_p2p", ignoreP2P). - Uint64("store_height", storeHeight). - Uint64("max_p2p_height", maxP2PHeight). - Msg("recovery catchup status") - if daCaughtUp && p2pCaughtUp && storeHeight > 0 { s.logger.Info(). Uint64("store_height", storeHeight).