From d45758e71a8ed974f4124855a4ba094457d89d78 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Thu, 12 Feb 2026 08:42:58 +0100 Subject: [PATCH 1/3] Re-use p2p-client --- apps/testapp/go.mod | 2 +- apps/testapp/go.sum | 2 - block/components_test.go | 4 +- node/failover.go | 59 ++++++++-------- node/full.go | 27 +++++-- node/helpers_test.go | 51 +++++++++----- node/light.go | 13 +--- node/light_test.go | 6 +- node/node.go | 15 ++-- node/single_sequencer_integration_test.go | 12 ++-- pkg/cmd/run_node.go | 10 ++- pkg/cmd/run_node_test.go | 7 +- pkg/p2p/client.go | 38 +++++++--- pkg/p2p/utils_test.go | 6 +- pkg/store/header_store_adapter_test.go | 86 ++++++++++++++++++++++- pkg/store/store_adapter.go | 29 ++++++-- pkg/sync/sync_service.go | 18 +++-- test/e2e/failover_e2e_test.go | 6 ++ 18 files changed, 282 insertions(+), 109 deletions(-) diff --git a/apps/testapp/go.mod b/apps/testapp/go.mod index befa3aa536..d4e7306a95 100644 --- a/apps/testapp/go.mod +++ b/apps/testapp/go.mod @@ -2,7 +2,7 @@ module github.com/evstack/ev-node/apps/testapp go 1.25.6 -//replace github.com/evstack/ev-node => ../../ +replace github.com/evstack/ev-node => ../../ require ( github.com/evstack/ev-node v1.0.0-rc.4 diff --git a/apps/testapp/go.sum b/apps/testapp/go.sum index f07cb58dc1..13b8f9adce 100644 --- a/apps/testapp/go.sum +++ b/apps/testapp/go.sum @@ -367,8 +367,6 @@ github.com/envoyproxy/protoc-gen-validate v0.10.0/go.mod h1:DRjgyB0I43LtJapqN6Ni github.com/envoyproxy/protoc-gen-validate v0.10.1/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss= github.com/envoyproxy/protoc-gen-validate v1.0.1/go.mod h1:0vj8bNkYbSTNS2PIyH87KZaeN4x9zpL9Qt8fQC7d+vs= github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= -github.com/evstack/ev-node v1.0.0-rc.4 h1:Ju7pSETFdadBZxmAj0//4z7hHkXbSRDy9iTzhF60Dew= -github.com/evstack/ev-node v1.0.0-rc.4/go.mod h1:xGCH5NCdGiYk6v3GVPm4NhzAtcKQgnaVnORg8b4tbOk= github.com/evstack/ev-node/core v1.0.0-rc.1 
h1:Dic2PMUMAYUl5JW6DkDj6HXDEWYzorVJQuuUJOV0FjE= github.com/evstack/ev-node/core v1.0.0-rc.1/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= diff --git a/block/components_test.go b/block/components_test.go index 93c08a655a..b0d307ceb5 100644 --- a/block/components_test.go +++ b/block/components_test.go @@ -175,7 +175,7 @@ func TestNewAggregatorComponents_Creation(t *testing.T) { zerolog.Nop(), NopMetrics(), DefaultBlockOptions(), - nil, + nil, // raftNode ) require.NoError(t, err) @@ -258,7 +258,7 @@ func TestExecutor_RealExecutionClientFailure_StopsNode(t *testing.T) { zerolog.Nop(), NopMetrics(), DefaultBlockOptions(), - nil, + nil, // raftNode ) require.NoError(t, err) diff --git a/node/failover.go b/node/failover.go index 787f627ce6..0dfcb54179 100644 --- a/node/failover.go +++ b/node/failover.go @@ -14,13 +14,11 @@ import ( "github.com/evstack/ev-node/pkg/config" genesispkg "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/p2p" - "github.com/evstack/ev-node/pkg/p2p/key" "github.com/evstack/ev-node/pkg/raft" rpcserver "github.com/evstack/ev-node/pkg/rpc/server" "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/store" evsync "github.com/evstack/ev-node/pkg/sync" - ds "github.com/ipfs/go-datastore" "github.com/rs/zerolog" "golang.org/x/sync/errgroup" ) @@ -38,10 +36,7 @@ type failoverState struct { func newSyncMode( nodeConfig config.Config, - nodeKey *key.NodeKey, genesis genesispkg.Genesis, - rootDB ds.Batching, - daStore store.Store, exec coreexecutor.Executor, da block.DAClient, logger zerolog.Logger, @@ -49,6 +44,7 @@ func newSyncMode( blockMetrics *block.Metrics, nodeOpts NodeOptions, raftNode *raft.Node, + p2pClient *p2p.Client, ) (*failoverState, error) { blockComponentsFn := func(headerSyncService *evsync.HeaderSyncService, dataSyncService *evsync.DataSyncService) (*block.Components, error) { return 
block.NewSyncComponents( @@ -67,16 +63,13 @@ func newSyncMode( raftNode, ) } - return setupFailoverState(nodeConfig, nodeKey, rootDB, daStore, genesis, logger, rktStore, blockComponentsFn, raftNode) + return setupFailoverState(nodeConfig, genesis, logger, rktStore, blockComponentsFn, raftNode, p2pClient) } func newAggregatorMode( nodeConfig config.Config, - nodeKey *key.NodeKey, signer signer.Signer, genesis genesispkg.Genesis, - rootDB ds.Batching, - daStore store.Store, exec coreexecutor.Executor, sequencer coresequencer.Sequencer, da block.DAClient, @@ -85,8 +78,8 @@ func newAggregatorMode( blockMetrics *block.Metrics, nodeOpts NodeOptions, raftNode *raft.Node, + p2pClient *p2p.Client, ) (*failoverState, error) { - blockComponentsFn := func(headerSyncService *evsync.HeaderSyncService, dataSyncService *evsync.DataSyncService) (*block.Components, error) { return block.NewAggregatorComponents( nodeConfig, @@ -105,31 +98,24 @@ func newAggregatorMode( ) } - return setupFailoverState(nodeConfig, nodeKey, rootDB, daStore, genesis, logger, rktStore, blockComponentsFn, raftNode) + return setupFailoverState(nodeConfig, genesis, logger, rktStore, blockComponentsFn, raftNode, p2pClient) } func setupFailoverState( nodeConfig config.Config, - nodeKey *key.NodeKey, - rootDB ds.Batching, - daStore store.Store, genesis genesispkg.Genesis, logger zerolog.Logger, rktStore store.Store, buildComponentsFn func(headerSyncService *evsync.HeaderSyncService, dataSyncService *evsync.DataSyncService) (*block.Components, error), raftNode *raft.Node, + p2pClient *p2p.Client, ) (*failoverState, error) { - p2pClient, err := p2p.NewClient(nodeConfig.P2P, nodeKey.PrivKey, rootDB, genesis.ChainID, logger, nil) - if err != nil { - return nil, err - } - - headerSyncService, err := evsync.NewHeaderSyncService(daStore, nodeConfig, genesis, p2pClient, logger.With().Str("component", "HeaderSyncService").Logger()) + headerSyncService, err := evsync.NewHeaderSyncService(rktStore, nodeConfig, genesis, 
p2pClient, logger.With().Str("component", "HeaderSyncService").Logger()) if err != nil { return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err) } - dataSyncService, err := evsync.NewDataSyncService(daStore, nodeConfig, genesis, p2pClient, logger.With().Str("component", "DataSyncService").Logger()) + dataSyncService, err := evsync.NewDataSyncService(rktStore, nodeConfig, genesis, p2pClient, logger.With().Str("component", "DataSyncService").Logger()) if err != nil { return nil, fmt.Errorf("error while initializing DataSyncService: %w", err) } @@ -203,19 +189,30 @@ func (f *failoverState) Run(pCtx context.Context) (multiErr error) { return nil }) - if err := f.p2pClient.Start(ctx); err != nil { - return fmt.Errorf("start p2p: %w", err) - } - defer f.p2pClient.Close() // nolint: errcheck + // P2P client persists across mode switches (started/closed by FullNode.Run). - if err := f.headerSyncService.Start(ctx); err != nil { - return fmt.Errorf("error while starting header sync service: %w", err) + // NOTE(review): setupFailoverState does not call Reconfigure(); confirm DHT re-bootstrap is handled before relying on it. + // Start header and data sync services concurrently. Each service's + // initFromP2PWithRetry can block up to 30s when peers have no blocks + // (e.g. lazy mode sequencer at height 0). Running them in parallel + // avoids a 60s cumulative startup delay. 
+ syncWg, syncCtx := errgroup.WithContext(ctx) + syncWg.Go(func() error { + if err := f.headerSyncService.Start(syncCtx); err != nil { + return fmt.Errorf("error while starting header sync service: %w", err) + } + return nil + }) + syncWg.Go(func() error { + if err := f.dataSyncService.Start(syncCtx); err != nil { + return fmt.Errorf("error while starting data sync service: %w", err) + } + return nil + }) + if err := syncWg.Wait(); err != nil { + return err } defer stopService(f.headerSyncService.Stop, "header sync") - - if err := f.dataSyncService.Start(ctx); err != nil { - return fmt.Errorf("error while starting data sync service: %w", err) - } defer stopService(f.dataSyncService.Stop, "data sync") wg.Go(func() error { diff --git a/node/full.go b/node/full.go index 4fa2ff7c52..42e4b6349f 100644 --- a/node/full.go +++ b/node/full.go @@ -22,7 +22,7 @@ import ( coresequencer "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/pkg/config" genesispkg "github.com/evstack/ev-node/pkg/genesis" - "github.com/evstack/ev-node/pkg/p2p/key" + "github.com/evstack/ev-node/pkg/p2p" raftpkg "github.com/evstack/ev-node/pkg/raft" "github.com/evstack/ev-node/pkg/service" "github.com/evstack/ev-node/pkg/signer" @@ -53,7 +53,8 @@ type FullNode struct { nodeConfig config.Config - daClient block.DAClient + daClient block.DAClient + p2pClient *p2p.Client Store store.Store raftNode *raftpkg.Node @@ -66,7 +67,7 @@ type FullNode struct { // newFullNode creates a new Rollkit full node. func newFullNode( nodeConfig config.Config, - nodeKey *key.NodeKey, + p2pClient *p2p.Client, signer signer.Signer, genesis genesispkg.Genesis, database ds.Batching, @@ -103,16 +104,17 @@ func newFullNode( } } + // The p2p client is fully configured and started before leader election. + // SyncService.getPeerIDs() gates peer usage on conf.Node.Aggregator. 
leaderFactory := func() (raftpkg.Runnable, error) { logger.Info().Msg("Starting aggregator-MODE") nodeConfig.Node.Aggregator = true - nodeConfig.P2P.Peers = "" // peers are not supported in aggregator mode - return newAggregatorMode(nodeConfig, nodeKey, signer, genesis, database, evstore, exec, sequencer, daClient, logger, evstore, blockMetrics, nodeOpts, raftNode) + return newAggregatorMode(nodeConfig, signer, genesis, exec, sequencer, daClient, logger, evstore, blockMetrics, nodeOpts, raftNode, p2pClient) } followerFactory := func() (raftpkg.Runnable, error) { logger.Info().Msg("Starting sync-MODE") nodeConfig.Node.Aggregator = false - return newSyncMode(nodeConfig, nodeKey, genesis, database, evstore, exec, daClient, logger, evstore, blockMetrics, nodeOpts, raftNode) + return newSyncMode(nodeConfig, genesis, exec, daClient, logger, evstore, blockMetrics, nodeOpts, raftNode, p2pClient) } // Initialize raft node if enabled (for both aggregator and sync nodes) @@ -136,6 +138,7 @@ func newFullNode( genesis: genesis, nodeConfig: nodeConfig, daClient: daClient, + p2pClient: p2pClient, Store: evstore, leaderElection: leaderElection, raftNode: raftNode, @@ -279,6 +282,18 @@ func (n *FullNode) Run(parentCtx context.Context) error { (n.nodeConfig.Instrumentation.IsPrometheusEnabled() || n.nodeConfig.Instrumentation.IsPprofEnabled()) { n.prometheusSrv, n.pprofSrv = n.startInstrumentationServer() } + + // Start the P2P client once. It persists across mode switches so that + // the host and PubSub (including externally registered topics) survive. 
+ if err := n.p2pClient.Start(ctx); err != nil { + return fmt.Errorf("start p2p: %w", err) + } + defer func() { + if err := n.p2pClient.Close(); err != nil { + n.Logger.Error().Err(err).Msg("error closing p2p client") + } + }() + // Start leader election if n.raftNode != nil { if err := n.raftNode.Start(ctx); err != nil { diff --git a/node/helpers_test.go b/node/helpers_test.go index 94989d2f7b..2127ef9aaf 100644 --- a/node/helpers_test.go +++ b/node/helpers_test.go @@ -16,12 +16,13 @@ import ( coresequencer "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/test/testda" "github.com/ipfs/go-datastore" + "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/rs/zerolog" "github.com/stretchr/testify/require" evconfig "github.com/evstack/ev-node/pkg/config" - "github.com/evstack/ev-node/pkg/p2p/key" + "github.com/evstack/ev-node/pkg/p2p" remote_signer "github.com/evstack/ev-node/pkg/signer/noop" "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/types" @@ -67,22 +68,18 @@ func newDummyDAClient(maxBlobSize uint64) *testda.DummyDA { return getSharedDummyDA(maxBlobSize) } -func createTestComponents(t *testing.T, config evconfig.Config) (coreexecutor.Executor, coresequencer.Sequencer, block.DAClient, *key.NodeKey, datastore.Batching, func()) { +func createTestComponents(t *testing.T, config evconfig.Config) (coreexecutor.Executor, coresequencer.Sequencer, block.DAClient, crypto.PrivKey, datastore.Batching, func()) { executor := coreexecutor.NewDummyExecutor() sequencer := coresequencer.NewDummySequencer() daClient := newDummyDAClient(0) // Create genesis and keys for P2P client _, genesisValidatorKey, _ := types.GetGenesisWithPrivkey("test-chain") - p2pKey := &key.NodeKey{ - PrivKey: genesisValidatorKey, - PubKey: genesisValidatorKey.GetPublic(), - } ds, err := store.NewTestInMemoryKVStore() require.NoError(t, err) stop := daClient.StartHeightTicker(config.DA.BlockTime.Duration) - return executor, 
sequencer, daClient, p2pKey, ds, stop + return executor, sequencer, daClient, genesisValidatorKey, ds, stop } func getTestConfig(t *testing.T, n int) evconfig.Config { @@ -120,7 +117,7 @@ func newTestNode( executor coreexecutor.Executor, sequencer coresequencer.Sequencer, daClient block.DAClient, - nodeKey *key.NodeKey, + privKey crypto.PrivKey, ds datastore.Batching, stopDAHeightTicker func(), ) (*FullNode, func()) { @@ -133,13 +130,17 @@ func newTestNode( if testing.Verbose() { logger = zerolog.New(zerolog.NewTestWriter(t)) } + + p2pClient, err := newTestP2PClient(config, privKey, ds, genesis.ChainID, logger) + require.NoError(t, err) + node, err := NewNode( config, executor, sequencer, daClient, remoteSigner, - nodeKey, + p2pClient, genesis, ds, DefaultMetricsProvider(evconfig.DefaultInstrumentationConfig()), @@ -159,8 +160,8 @@ func newTestNode( func createNodeWithCleanup(t *testing.T, config evconfig.Config) (*FullNode, func()) { resetSharedDummyDA() - executor, sequencer, daClient, nodeKey, ds, stopDAHeightTicker := createTestComponents(t, config) - return newTestNode(t, config, executor, sequencer, daClient, nodeKey, ds, stopDAHeightTicker) + executor, sequencer, daClient, privKey, ds, stopDAHeightTicker := createTestComponents(t, config) + return newTestNode(t, config, executor, sequencer, daClient, privKey, ds, stopDAHeightTicker) } func createNodeWithCustomComponents( @@ -169,11 +170,11 @@ func createNodeWithCustomComponents( executor coreexecutor.Executor, sequencer coresequencer.Sequencer, daClient block.DAClient, - nodeKey *key.NodeKey, + privKey crypto.PrivKey, ds datastore.Batching, stopDAHeightTicker func(), ) (*FullNode, func()) { - return newTestNode(t, config, executor, sequencer, daClient, nodeKey, ds, stopDAHeightTicker) + return newTestNode(t, config, executor, sequencer, daClient, privKey, ds, stopDAHeightTicker) } // Creates the given number of nodes the given nodes using the given wait group to synchronize them @@ -192,24 +193,28 @@ func 
createNodesWithCleanup(t *testing.T, num int, config evconfig.Config) ([]*F aggListenAddress := config.P2P.ListenAddress aggPeers := config.P2P.Peers - executor, sequencer, daClient, aggP2PKey, ds, stopDAHeightTicker := createTestComponents(t, config) + executor, sequencer, daClient, aggPrivKey, ds, stopDAHeightTicker := createTestComponents(t, config) if d, ok := daClient.(*testda.DummyDA); ok { d.Reset() } - aggPeerID, err := peer.IDFromPrivateKey(aggP2PKey.PrivKey) + aggPeerID, err := peer.IDFromPrivateKey(aggPrivKey) require.NoError(err) logger := zerolog.Nop() if testing.Verbose() { logger = zerolog.New(zerolog.NewTestWriter(t)) } + + aggP2PClient, err := newTestP2PClient(config, aggPrivKey, ds, genesis.ChainID, logger) + require.NoError(err) + aggNode, err := NewNode( config, executor, sequencer, daClient, remoteSigner, - aggP2PKey, + aggP2PClient, genesis, ds, DefaultMetricsProvider(evconfig.DefaultInstrumentationConfig()), @@ -236,16 +241,19 @@ func createNodesWithCleanup(t *testing.T, num int, config evconfig.Config) ([]*F } config.P2P.ListenAddress = fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 40001+i) config.RPC.Address = fmt.Sprintf("127.0.0.1:%d", 8001+i) - executor, sequencer, daClient, nodeP2PKey, ds, stopDAHeightTicker := createTestComponents(t, config) + executor, sequencer, daClient, nodePrivKey, ds, stopDAHeightTicker := createTestComponents(t, config) stopDAHeightTicker() + nodeP2PClient, err := newTestP2PClient(config, nodePrivKey, ds, genesis.ChainID, logger) + require.NoError(err) + node, err := NewNode( config, executor, sequencer, daClient, nil, - nodeP2PKey, + nodeP2PClient, genesis, ds, DefaultMetricsProvider(evconfig.DefaultInstrumentationConfig()), @@ -257,7 +265,7 @@ func createNodesWithCleanup(t *testing.T, num int, config evconfig.Config) ([]*F // No-op: ticker already stopped } nodes[i], cleanups[i] = node.(*FullNode), cleanup - nodePeerID, err := peer.IDFromPrivateKey(nodeP2PKey.PrivKey) + nodePeerID, err := 
peer.IDFromPrivateKey(nodePrivKey) require.NoError(err) peersList = append(peersList, fmt.Sprintf("%s/p2p/%s", config.P2P.ListenAddress, nodePeerID.Loggable()["peerID"].(string))) } @@ -265,6 +273,11 @@ func createNodesWithCleanup(t *testing.T, num int, config evconfig.Config) ([]*F return nodes, cleanups } +// newTestP2PClient creates a p2p.Client for testing. +func newTestP2PClient(config evconfig.Config, privKey crypto.PrivKey, ds datastore.Batching, chainID string, logger zerolog.Logger) (*p2p.Client, error) { + return p2p.NewClient(config.P2P, privKey, ds, chainID, logger, nil) +} + // Helper to create N contexts and cancel functions func createNodeContexts(n int) ([]context.Context, []context.CancelFunc) { ctxs := make([]context.Context, n) diff --git a/node/light.go b/node/light.go index 8790507a07..7aeb9038b6 100644 --- a/node/light.go +++ b/node/light.go @@ -7,7 +7,6 @@ import ( "net/http" "time" - "github.com/evstack/ev-node/pkg/p2p/key" ds "github.com/ipfs/go-datastore" "github.com/rs/zerolog" @@ -39,15 +38,10 @@ type LightNode struct { func newLightNode( conf config.Config, genesis genesis.Genesis, - nodeKey *key.NodeKey, + p2pClient *p2p.Client, database ds.Batching, logger zerolog.Logger, ) (ln *LightNode, err error) { - p2pClient, err := p2p.NewClient(conf.P2P, nodeKey.PrivKey, database, genesis.ChainID, logger, nil) - if err != nil { - return nil, err - } - componentLogger := logger.With().Str("component", "HeaderSyncService").Logger() baseStore := store.New(database) @@ -166,9 +160,8 @@ func (ln *LightNode) Run(parentCtx context.Context) error { } // Stop P2P Client - err = ln.P2P.Close() - if err != nil { - multiErr = errors.Join(multiErr, fmt.Errorf("closing P2P client: %w", err)) + if err := ln.P2P.Close(); err != nil { + multiErr = errors.Join(multiErr, fmt.Errorf("closing p2p client: %w", err)) } if err = ln.Store.Close(); err != nil { diff --git a/node/light_test.go b/node/light_test.go index f459ad54b0..964cf66041 100644 --- 
a/node/light_test.go +++ b/node/light_test.go @@ -13,6 +13,7 @@ import ( "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/p2p" p2p_key "github.com/evstack/ev-node/pkg/p2p/key" ) @@ -39,7 +40,10 @@ func TestLightNodeLifecycle(t *testing.T) { logger := zerolog.Nop() db := ds_sync.MutexWrap(ds.NewMapDatastore()) - ln, err := newLightNode(conf, gen, p2pKey, db, logger) + p2pClient, err := p2p.NewClient(conf.P2P, p2pKey.PrivKey, db, gen.ChainID, logger, nil) + require.NoError(err) + + ln, err := newLightNode(conf, gen, p2pClient, db, logger) require.NoError(err) require.NotNil(ln) diff --git a/node/node.go b/node/node.go index 7ce087eeeb..139636d644 100644 --- a/node/node.go +++ b/node/node.go @@ -9,7 +9,7 @@ import ( coresequencer "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" - "github.com/evstack/ev-node/pkg/p2p/key" + "github.com/evstack/ev-node/pkg/p2p" "github.com/evstack/ev-node/pkg/service" "github.com/evstack/ev-node/pkg/signer" ) @@ -25,16 +25,19 @@ type NodeOptions struct { BlockOptions block.BlockOptions } -// NewNode returns a new Full or Light Node based on the config -// This is the entry point for composing a node, when compiling a node, you need to provide an executor +// NewNode returns a new Full or Light Node based on the config. +// This is the entry point for composing a node, when compiling a node, you need to provide an executor. // Example executors can be found in apps/ +// +// The p2pClient owns the node identity (private key) and is shared across +// mode switches. It supports in-place reconfiguration via Reconfigure(). 
func NewNode( conf config.Config, exec coreexecutor.Executor, sequencer coresequencer.Sequencer, daClient block.DAClient, signer signer.Signer, - nodeKey *key.NodeKey, + p2pClient *p2p.Client, genesis genesis.Genesis, database ds.Batching, metricsProvider MetricsProvider, @@ -42,7 +45,7 @@ func NewNode( nodeOptions NodeOptions, ) (Node, error) { if conf.Node.Light { - return newLightNode(conf, genesis, nodeKey, database, logger) + return newLightNode(conf, genesis, p2pClient, database, logger) } if err := nodeOptions.BlockOptions.Validate(); err != nil { @@ -51,7 +54,7 @@ func NewNode( return newFullNode( conf, - nodeKey, + p2pClient, signer, genesis, database, diff --git a/node/single_sequencer_integration_test.go b/node/single_sequencer_integration_test.go index 0a798afa6c..130b62e235 100644 --- a/node/single_sequencer_integration_test.go +++ b/node/single_sequencer_integration_test.go @@ -234,10 +234,10 @@ func TestStateRecovery(t *testing.T) { // Set up one sequencer config := getTestConfig(t, 1) - executor, sequencer, dac, nodeKey, _, stopDAHeightTicker := createTestComponents(t, config) + executor, sequencer, dac, privKey, _, stopDAHeightTicker := createTestComponents(t, config) ds, err := store.NewDefaultKVStore(config.RootDir, "db", "test") require.NoError(err) - node, cleanup := createNodeWithCustomComponents(t, config, executor, sequencer, dac, nodeKey, ds, stopDAHeightTicker) + node, cleanup := createNodeWithCustomComponents(t, config, executor, sequencer, dac, privKey, ds, stopDAHeightTicker) defer cleanup() var runningWg sync.WaitGroup @@ -262,10 +262,10 @@ func TestStateRecovery(t *testing.T) { shutdownAndWait(t, []context.CancelFunc{cancel}, &runningWg, 60*time.Second) // Create a new node instance using the same components - executor, sequencer, dac, nodeKey, _, stopDAHeightTicker = createTestComponents(t, config) + executor, sequencer, dac, privKey, _, stopDAHeightTicker = createTestComponents(t, config) ds, err = 
store.NewDefaultKVStore(config.RootDir, "db", "test") require.NoError(err) - node, cleanup = createNodeWithCustomComponents(t, config, executor, sequencer, dac, nodeKey, ds, stopDAHeightTicker) + node, cleanup = createNodeWithCustomComponents(t, config, executor, sequencer, dac, privKey, ds, stopDAHeightTicker) defer cleanup() // Verify state persistence @@ -319,7 +319,7 @@ func TestBatchQueueThrottlingWithDAFailure(t *testing.T) { config.DA.BlockTime = evconfig.DurationWrapper{Duration: 100 * time.Millisecond} // Longer DA time to ensure blocks are produced first // Create test components - executor, sequencer, dummyDA, ds, nodeKey, stopDAHeightTicker := createTestComponents(t, config) + executor, sequencer, dummyDA, privKey, ds, stopDAHeightTicker := createTestComponents(t, config) defer stopDAHeightTicker() // Cast executor to DummyExecutor so we can inject transactions @@ -331,7 +331,7 @@ func TestBatchQueueThrottlingWithDAFailure(t *testing.T) { require.True(ok, "Expected testda.DummyDA implementation") // Create node with components - node, cleanup := createNodeWithCustomComponents(t, config, executor, sequencer, dummyDAImpl, ds, nodeKey, func() {}) + node, cleanup := createNodeWithCustomComponents(t, config, executor, sequencer, dummyDAImpl, privKey, ds, func() {}) defer cleanup() ctx, cancel := context.WithCancel(t.Context()) diff --git a/pkg/cmd/run_node.go b/pkg/cmd/run_node.go index aa6a01b7ae..df96a0e371 100644 --- a/pkg/cmd/run_node.go +++ b/pkg/cmd/run_node.go @@ -23,6 +23,7 @@ import ( rollconf "github.com/evstack/ev-node/pkg/config" blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" genesispkg "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/p2p" "github.com/evstack/ev-node/pkg/p2p/key" "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/signer/file" @@ -163,6 +164,13 @@ func StartNode( executor = telemetry.WithTracingExecutor(executor) } + // Create the P2P client. 
It is long-lived and reconfigured in-place + // on mode switches, avoiding costly teardown of the libp2p stack. + p2pClient, err := p2p.NewClient(nodeConfig.P2P, nodeKey.PrivKey, datastore, genesis.ChainID, logger, nil) + if err != nil { + return fmt.Errorf("create p2p client: %w", err) + } + // Create and start the node rollnode, err := node.NewNode( nodeConfig, @@ -170,7 +178,7 @@ func StartNode( sequencer, daClient, signer, - nodeKey, + p2pClient, genesis, datastore, metrics, diff --git a/pkg/cmd/run_node_test.go b/pkg/cmd/run_node_test.go index 2e58c1c637..39ff4261b6 100644 --- a/pkg/cmd/run_node_test.go +++ b/pkg/cmd/run_node_test.go @@ -20,6 +20,7 @@ import ( "github.com/rs/zerolog" "github.com/spf13/cobra" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func createTestComponents(_ context.Context, t *testing.T) (coreexecutor.Executor, coresequencer.Sequencer, signer.Signer, *key.NodeKey, datastore.Batching, func()) { @@ -33,7 +34,11 @@ func createTestComponents(_ context.Context, t *testing.T) (coreexecutor.Executo // Create a dummy P2P client and datastore for testing ds := datastore.NewMapDatastore() - return executor, sequencer, keyProvider, nil, ds, func() {} + // Generate a dummy node key for the P2P client + nodeKey, err := key.GenerateNodeKey() + require.NoError(t, err) + + return executor, sequencer, keyProvider, nodeKey, ds, func() {} } func TestParseFlags(t *testing.T) { diff --git a/pkg/p2p/client.go b/pkg/p2p/client.go index a1c8de94ba..7859c3cf97 100644 --- a/pkg/p2p/client.go +++ b/pkg/p2p/client.go @@ -49,11 +49,13 @@ type Client struct { chainID string privKey crypto.PrivKey - host host.Host - dht *dht.IpfsDHT - disc *discovery.RoutingDiscovery - gater *conngater.BasicConnectionGater - ps *pubsub.PubSub + rawHost host.Host // unwrapped libp2p host, stored to avoid double-wrapping via routedhost + host host.Host // may be wrapped with routedhost after DHT setup + dht *dht.IpfsDHT + disc 
*discovery.RoutingDiscovery + gater *conngater.BasicConnectionGater + ps *pubsub.PubSub + started bool metrics *Metrics } @@ -121,9 +123,13 @@ func NewClientWithHost( // 3. Setup DHT, establish connection to seed nodes and initialize peer discovery. // 4. Use active peer discovery to look for peers from same ORU network. func (c *Client) Start(ctx context.Context) error { + if c.started { + return nil // already started — called from FullNode.Run() + } c.logger.Debug().Msg("starting P2P client") if c.host != nil { + c.rawHost = c.host return c.startWithHost(ctx, c.host) } @@ -131,6 +137,7 @@ func (c *Client) Start(ctx context.Context) error { if err != nil { return err } + c.rawHost = h return c.startWithHost(ctx, h) } @@ -165,19 +172,32 @@ func (c *Client) startWithHost(ctx context.Context, h host.Host) error { return err } + c.started = true return nil } // Close gently stops Client. func (c *Client) Close() error { - var dhtErr, hostErr error + var err error if c.dht != nil { - dhtErr = c.dht.Close() + err = errors.Join(err, c.dht.Close()) } if c.host != nil { - hostErr = c.host.Close() + err = errors.Join(err, c.host.Close()) } - return errors.Join(dhtErr, hostErr) + return err +} + +// PrivKey returns the node's private key. +func (c *Client) PrivKey() crypto.PrivKey { + return c.privKey +} + +// Reconfigure updates the mutable P2P configuration without tearing down +// the libp2p host, PubSub, or DHT. Currently this only updates the +// stored config; the sync service gates peer usage on conf.Node.Aggregator. +func (c *Client) Reconfigure(conf config.P2PConfig) { + c.conf = conf } // Addrs returns listen addresses of Client. 
diff --git a/pkg/p2p/utils_test.go b/pkg/p2p/utils_test.go index 5bd5664dd2..e14a621233 100644 --- a/pkg/p2p/utils_test.go +++ b/pkg/p2p/utils_test.go @@ -3,7 +3,6 @@ package p2p import ( "context" "crypto/rand" - "errors" "fmt" "net" "path/filepath" @@ -25,11 +24,10 @@ import ( type testNet []*Client -func (tn testNet) Close() (err error) { +func (tn testNet) Close() { for i := range tn { - err = errors.Join(err, tn[i].Close()) + _ = tn[i].Close() } - return } func (tn testNet) WaitForDHT() { diff --git a/pkg/store/header_store_adapter_test.go b/pkg/store/header_store_adapter_test.go index c80374aa71..9cb02c809e 100644 --- a/pkg/store/header_store_adapter_test.go +++ b/pkg/store/header_store_adapter_test.go @@ -599,7 +599,7 @@ func TestHeaderStoreAdapter_HeadPrefersPending(t *testing.T) { func TestHeaderStoreAdapter_GetFromPendingByHash(t *testing.T) { t.Parallel() - ctx := context.Background() + ctx := t.Context() ds, err := NewTestInMemoryKVStore() require.NoError(t, err) @@ -615,3 +615,87 @@ func TestHeaderStoreAdapter_GetFromPendingByHash(t *testing.T) { require.NoError(t, err) assert.Equal(t, h1.Height(), retrieved.Height()) } + +// TestHeaderStoreGetter_HeightGuard verifies that HeaderStoreGetter.GetByHeight +// and HasAt respect the committed store height. Data written to the datastore +// without updating store.Height() (like the executor's crash-recovery early save) +// must NOT be visible through the getter. 
+func TestHeaderStoreGetter_HeightGuard(t *testing.T) { + t.Parallel() + ctx := t.Context() + + ds, err := NewTestInMemoryKVStore() + require.NoError(t, err) + store := New(ds) + getter := NewHeaderStoreGetter(store) + + h1, d1 := types.GetRandomBlock(1, 2, "test-chain") + h2, d2 := types.GetRandomBlock(2, 2, "test-chain") + + specs := map[string]struct { + setup func() + height uint64 + expFound bool + expHasAt bool + }{ + "data at height without height update is invisible": { + setup: func() { + // Simulate the executor's early save: write data but do NOT call SetHeight. + batch, bErr := store.NewBatch(ctx) + require.NoError(t, bErr) + require.NoError(t, batch.SaveBlockData(h1, d1, &types.Signature{})) + require.NoError(t, batch.Commit()) + }, + height: 1, + expFound: false, + expHasAt: false, + }, + "data becomes visible after height is updated": { + setup: func() { + // Now commit the signed version with SetHeight (the final save). + batch, bErr := store.NewBatch(ctx) + require.NoError(t, bErr) + require.NoError(t, batch.SaveBlockData(h1, d1, &h1.Signature)) + require.NoError(t, batch.SetHeight(1)) + require.NoError(t, batch.Commit()) + }, + height: 1, + expFound: true, + expHasAt: true, + }, + "height above committed store height is invisible": { + setup: func() { + // Save h2 data but only set height to 1. + batch, bErr := store.NewBatch(ctx) + require.NoError(t, bErr) + require.NoError(t, batch.SaveBlockData(h2, d2, &types.Signature{})) + require.NoError(t, batch.Commit()) + }, + height: 2, + expFound: false, + expHasAt: false, + }, + } + + // Run in defined order since each step builds on the previous state. 
+ for _, name := range []string{ + "data at height without height update is invisible", + "data becomes visible after height is updated", + "height above committed store height is invisible", + } { + spec := specs[name] + t.Run(name, func(t *testing.T) { + spec.setup() + + got, err := getter.GetByHeight(ctx, spec.height) + if spec.expFound { + require.NoError(t, err) + assert.Equal(t, spec.height, got.Height()) + } else { + require.Error(t, err) + } + + assert.Equal(t, spec.expHasAt, getter.HasAt(ctx, spec.height)) + }) + } +} diff --git a/pkg/store/store_adapter.go b/pkg/store/store_adapter.go index 08743c6eb9..33d8050601 100644 --- a/pkg/store/store_adapter.go +++ b/pkg/store/store_adapter.go @@ -746,7 +746,15 @@ func NewHeaderStoreGetter(store Store) *HeaderStoreGetter { // GetByHeight implements StoreGetter. func (g *HeaderStoreGetter) GetByHeight(ctx context.Context, height uint64) (*types.P2PSignedHeader, error) { - header, err := g.store.GetHeader(ctx, height) + // Guard: only return headers at or below the committed store height. + // The executor's early save writes an unsigned header to the datastore + // before updating store.Height(), so without this check the P2P layer + // could serve an unsigned header to peers. + storeHeight, err := g.store.Height(ctx) + if err != nil || height > storeHeight { + return nil, header.ErrNotFound + } + hdr, err := g.store.GetHeader(ctx, height) if err != nil { return nil, err } @@ -754,7 +762,7 @@ func (g *HeaderStoreGetter) GetByHeight(ctx context.Context, height uint64) (*ty daHint, _ := g.GetDAHint(ctx, height) return &types.P2PSignedHeader{ - SignedHeader: header, + SignedHeader: hdr, DAHeightHint: daHint, }, nil } @@ -800,7 +808,11 @@ func (g *HeaderStoreGetter) Height(ctx context.Context) (uint64, error) { // HasAt implements StoreGetter. 
func (g *HeaderStoreGetter) HasAt(ctx context.Context, height uint64) bool { - _, err := g.store.GetHeader(ctx, height) + storeHeight, err := g.store.Height(ctx) + if err != nil || height > storeHeight { + return false + } + _, err = g.store.GetHeader(ctx, height) return err == nil } @@ -816,6 +828,11 @@ func NewDataStoreGetter(store Store) *DataStoreGetter { // GetByHeight implements StoreGetter. func (g *DataStoreGetter) GetByHeight(ctx context.Context, height uint64) (*types.P2PData, error) { + // Guard: only return data at or below the committed store height. + storeHeight, err := g.store.Height(ctx) + if err != nil || height > storeHeight { + return nil, header.ErrNotFound + } _, data, err := g.store.GetBlockData(ctx, height) if err != nil { return nil, err @@ -870,7 +887,11 @@ func (g *DataStoreGetter) Height(ctx context.Context) (uint64, error) { // HasAt implements StoreGetter. func (g *DataStoreGetter) HasAt(ctx context.Context, height uint64) bool { - _, _, err := g.store.GetBlockData(ctx, height) + storeHeight, err := g.store.Height(ctx) + if err != nil || height > storeHeight { + return false + } + _, _, err = g.store.GetBlockData(ctx, height) return err == nil } diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go index 8567e79764..b65b855a4b 100644 --- a/pkg/sync/sync_service.go +++ b/pkg/sync/sync_service.go @@ -20,7 +20,6 @@ import ( "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" - "github.com/evstack/ev-node/pkg/p2p" "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/types" ) @@ -38,6 +37,15 @@ type HeaderSyncService = SyncService[*types.P2PSignedHeader] // DataSyncService is the P2P Sync Service for blocks. type DataSyncService = SyncService[*types.P2PData] +// P2PClient defines the interface for P2P client operations needed by the sync service. 
+type P2PClient interface { + PubSub() *pubsub.PubSub + Info() (string, string, string, error) + Host() host.Host + ConnectionGater() *conngater.BasicConnectionGater + PeerIDs() []peer.ID +} + // SyncService is the P2P Sync Service for blocks and headers. // // Uses the go-header library for handling all P2P logic. @@ -48,7 +56,7 @@ type SyncService[H store.EntityWithDAHint[H]] struct { genesis genesis.Genesis - p2p *p2p.Client + p2p P2PClient ex *goheaderp2p.Exchange[H] sub *goheaderp2p.Subscriber[H] @@ -66,7 +74,7 @@ func NewDataSyncService( evStore store.Store, conf config.Config, genesis genesis.Genesis, - p2p *p2p.Client, + p2p P2PClient, logger zerolog.Logger, ) (*DataSyncService, error) { storeAdapter := store.NewDataStoreAdapter(evStore, genesis) @@ -78,7 +86,7 @@ func NewHeaderSyncService( evStore store.Store, conf config.Config, genesis genesis.Genesis, - p2p *p2p.Client, + p2p P2PClient, logger zerolog.Logger, ) (*HeaderSyncService, error) { storeAdapter := store.NewHeaderStoreAdapter(evStore, genesis) @@ -90,7 +98,7 @@ func newSyncService[H store.EntityWithDAHint[H]]( syncType syncType, conf config.Config, genesis genesis.Genesis, - p2p *p2p.Client, + p2p P2PClient, logger zerolog.Logger, ) (*SyncService[H], error) { if p2p == nil { diff --git a/test/e2e/failover_e2e_test.go b/test/e2e/failover_e2e_test.go index 580f3141ce..cc0276e43a 100644 --- a/test/e2e/failover_e2e_test.go +++ b/test/e2e/failover_e2e_test.go @@ -384,6 +384,12 @@ func TestHASequencerRollingRestartE2E(t *testing.T) { nodeJWT := getNodeJWT(nodeName) p2pPeers := getP2PPeers(nodeName) + // Kill old process just in case + if nodeDetails.IsRunning() { + _ = nodeDetails.Kill() + time.Sleep(200 * time.Millisecond) + } + restartedProc := setupRaftSequencerNode(t, sut, workDir, nodeName, nodeDetails.raftAddr, nodeJWT, genesisHash, testEndpoints.GetDAAddress(), "", raftCluster, p2pPeers, strings.TrimPrefix(nodeDetails.rpcAddr, "http://"), nodeDetails.p2pAddr, From 
bdca5fbfdcab5c5307ab038734e99bf9d2c81291 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Thu, 12 Feb 2026 15:00:55 +0100 Subject: [PATCH 2/3] x --- apps/evm/go.mod | 8 +- apps/evm/go.sum | 4 - block/internal/executing/executor.go | 21 ++--- .../executing/executor_restart_test.go | 7 +- block/internal/executing/pending.go | 89 +++++++++++++++++++ block/internal/syncing/syncer.go | 15 +++- pkg/store/header_store_adapter_test.go | 84 ----------------- pkg/store/keys.go | 2 +- pkg/store/store.go | 13 +-- pkg/store/store_adapter.go | 29 +----- test/e2e/evm_full_node_e2e_test.go | 2 +- test/e2e/evm_test_common.go | 2 + 12 files changed, 131 insertions(+), 145 deletions(-) create mode 100644 block/internal/executing/pending.go diff --git a/apps/evm/go.mod b/apps/evm/go.mod index ed4b6c5126..4052c3afb2 100644 --- a/apps/evm/go.mod +++ b/apps/evm/go.mod @@ -2,10 +2,10 @@ module github.com/evstack/ev-node/apps/evm go 1.25.6 -//replace ( -// github.com/evstack/ev-node => ../../ -// github.com/evstack/ev-node/execution/evm => ../../execution/evm -//) +replace ( + github.com/evstack/ev-node => ../../ + github.com/evstack/ev-node/execution/evm => ../../execution/evm +) require ( github.com/ethereum/go-ethereum v1.16.8 diff --git a/apps/evm/go.sum b/apps/evm/go.sum index 00e5995e9a..49e723062b 100644 --- a/apps/evm/go.sum +++ b/apps/evm/go.sum @@ -411,12 +411,8 @@ github.com/ethereum/go-ethereum v1.16.8 h1:LLLfkZWijhR5m6yrAXbdlTeXoqontH+Ga2f9i github.com/ethereum/go-ethereum v1.16.8/go.mod h1:Fs6QebQbavneQTYcA39PEKv2+zIjX7rPUZ14DER46wk= github.com/ethereum/go-verkle v0.2.2 h1:I2W0WjnrFUIzzVPwm8ykY+7pL2d4VhlsePn4j7cnFk8= github.com/ethereum/go-verkle v0.2.2/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk= -github.com/evstack/ev-node v1.0.0-rc.4 h1:Ju7pSETFdadBZxmAj0//4z7hHkXbSRDy9iTzhF60Dew= -github.com/evstack/ev-node v1.0.0-rc.4/go.mod h1:xGCH5NCdGiYk6v3GVPm4NhzAtcKQgnaVnORg8b4tbOk= github.com/evstack/ev-node/core v1.0.0-rc.1 
h1:Dic2PMUMAYUl5JW6DkDj6HXDEWYzorVJQuuUJOV0FjE= github.com/evstack/ev-node/core v1.0.0-rc.1/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY= -github.com/evstack/ev-node/execution/evm v1.0.0-rc.3 h1:3o8H1TNywnst56lo2RlS2SXulDfp9yZJtkYYh7ZJrdM= -github.com/evstack/ev-node/execution/evm v1.0.0-rc.3/go.mod h1:VUEEklKoclg45GL7dzLoDwu3UQ4ptT3rF8bw5zUmnRk= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index 94417b9b79..786816d0ab 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -429,12 +429,12 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { // Check if there's an already stored block at the newHeight // If there is use that instead of creating a new block - pendingHeader, pendingData, err := e.store.GetBlockData(ctx, newHeight) - if err == nil { + pendingHeader, pendingData, err := e.getPendingBlock(ctx) + if err == nil && pendingHeader != nil && pendingHeader.Height() == newHeight { e.logger.Info().Uint64("height", newHeight).Msg("using pending block") header = pendingHeader data = pendingData - } else if !errors.Is(err, datastore.ErrNotFound) { + } else if err != nil && !errors.Is(err, datastore.ErrNotFound) { return fmt.Errorf("failed to get block data: %w", err) } else { // get batch from sequencer @@ -452,18 +452,9 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to create block: %w", err) } - - // saved early for crash recovery, will be overwritten later with the final signature - batch, err := e.store.NewBatch(ctx) - if err != nil { - return fmt.Errorf("failed to create batch for early save: %w", err) - } - if err = batch.SaveBlockData(header, 
data, &types.Signature{}); err != nil { + if err := e.savePendingBlock(ctx, header, data); err != nil { return fmt.Errorf("failed to save block data: %w", err) } - if err = batch.Commit(); err != nil { - return fmt.Errorf("failed to commit early save batch: %w", err) - } } if e.raftNode != nil && !e.raftNode.HasQuorum() { @@ -535,6 +526,10 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { } e.logger.Debug().Uint64("height", newHeight).Msg("proposed block to raft") } + if err := e.deletePendingBlock(e.ctx, batch); err != nil { + e.logger.Warn().Err(err).Uint64("height", newHeight).Msg("failed to delete pending block metadata") + } + if err := batch.Commit(); err != nil { return fmt.Errorf("failed to commit batch: %w", err) } diff --git a/block/internal/executing/executor_restart_test.go b/block/internal/executing/executor_restart_test.go index 571bc75214..b72888860b 100644 --- a/block/internal/executing/executor_restart_test.go +++ b/block/internal/executing/executor_restart_test.go @@ -158,11 +158,8 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) { pendingHeader.DataHash = pendingData.DACommitment() // Save pending block data (this is what would happen during a crash) - batch, err := memStore.NewBatch(context.Background()) - require.NoError(t, err) - err = batch.SaveBlockData(pendingHeader, pendingData, &types.Signature{}) - require.NoError(t, err) - err = batch.Commit() + // We use savePendingBlock directly which writes to the metadata keys expected by the executor on restart + err = exec1.savePendingBlock(context.Background(), pendingHeader, pendingData) require.NoError(t, err) // Stop first executor (simulating crash/restart) diff --git a/block/internal/executing/pending.go b/block/internal/executing/pending.go new file mode 100644 index 0000000000..687b5de26c --- /dev/null +++ b/block/internal/executing/pending.go @@ -0,0 +1,89 @@ +package executing + +import ( + "context" + "errors" + "fmt" + + "github.com/evstack/ev-node/pkg/store" + 
"github.com/evstack/ev-node/types" + ds "github.com/ipfs/go-datastore" +) + +const ( + headerKey = "pending_header" + dataKey = "pending_data" +) + +// getPendingBlock retrieves the pending block from metadata if it exists +func (e *Executor) getPendingBlock(ctx context.Context) (*types.SignedHeader, *types.Data, error) { + headerBytes, err := e.store.GetMetadata(ctx, headerKey) + if err != nil { + if errors.Is(err, ds.ErrNotFound) { + return nil, nil, nil + } + return nil, nil, err + } + + dataBytes, err := e.store.GetMetadata(ctx, dataKey) + if err != nil { + if errors.Is(err, ds.ErrNotFound) { + return nil, nil, nil + } + return nil, nil, err + } + + header := new(types.SignedHeader) + if err := header.UnmarshalBinary(headerBytes); err != nil { + return nil, nil, fmt.Errorf("unmarshal pending header: %w", err) + } + + data := new(types.Data) + if err := data.UnmarshalBinary(dataBytes); err != nil { + return nil, nil, fmt.Errorf("unmarshal pending data: %w", err) + } + return header, data, nil +} + +// savePendingBlock saves a block to metadata as pending +func (e *Executor) savePendingBlock(ctx context.Context, header *types.SignedHeader, data *types.Data) error { + headerBytes, err := header.MarshalBinary() + if err != nil { + return fmt.Errorf("marshal header: %w", err) + } + + dataBytes, err := data.MarshalBinary() + if err != nil { + return fmt.Errorf("marshal data: %w", err) + } + + batch, err := e.store.NewBatch(ctx) + if err != nil { + return fmt.Errorf("create batch for early save: %w", err) + } + + if err := batch.Put(ds.NewKey(store.GetMetaKey(headerKey)), headerBytes); err != nil { + return fmt.Errorf("save pending header: %w", err) + } + + if err := batch.Put(ds.NewKey(store.GetMetaKey(dataKey)), dataBytes); err != nil { + return fmt.Errorf("save pending data: %w", err) + } + + if err := batch.Commit(); err != nil { + return fmt.Errorf("commit pending block: %w", err) + } + return nil +} + +// deletePendingBlock removes pending block metadata +func 
(e *Executor) deletePendingBlock(ctx context.Context, batch store.Batch) error { + if err := batch.Delete(ds.NewKey(store.GetMetaKey(headerKey))); err != nil { + return fmt.Errorf("delete pending header: %w", err) + } + + if err := batch.Delete(ds.NewKey(store.GetMetaKey(dataKey))); err != nil { + return fmt.Errorf("delete pending data: %w", err) + } + return nil +} diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 939abb0389..1548b07a55 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -722,7 +722,7 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve currentState := s.getLastState() headerHash := header.Hash().String() - s.logger.Info().Uint64("height", nextHeight).Msg("syncing block") + s.logger.Info().Uint64("height", nextHeight).Msg("syncing block started") // Compared to the executor logic where the current block needs to be applied first, // here only the previous block needs to be applied to proceed to the verification. @@ -732,6 +732,17 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve s.cache.RemoveHeaderDAIncluded(headerHash) s.cache.RemoveDataDAIncluded(data.DACommitment().String()) + s.logger.Warn(). + Err(err). + Uint64("height", header.Height()). + Uint64("time", uint64(header.Time().Unix())). + Hex("proposer", header.ProposerAddress). + Str("data_hash", hex.EncodeToString(header.DataHash)). + Str("app_hash", hex.EncodeToString(header.AppHash)). + Hex("last_header_hash", header.LastHeaderHash). + Int("len signature", len(header.Signature)). 
+ Msg("block validation failed") + if !errors.Is(err, errInvalidState) && !errors.Is(err, errInvalidBlock) { return errors.Join(errInvalidBlock, err) } @@ -799,7 +810,7 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve if s.p2pHandler != nil { s.p2pHandler.SetProcessedHeight(newState.LastBlockHeight) } - + s.logger.Info().Uint64("height", nextHeight).Msg("syncing block completed") return nil } diff --git a/pkg/store/header_store_adapter_test.go b/pkg/store/header_store_adapter_test.go index 9cb02c809e..33f804c676 100644 --- a/pkg/store/header_store_adapter_test.go +++ b/pkg/store/header_store_adapter_test.go @@ -615,87 +615,3 @@ func TestHeaderStoreAdapter_GetFromPendingByHash(t *testing.T) { require.NoError(t, err) assert.Equal(t, h1.Height(), retrieved.Height()) } - -// TestHeaderStoreGetter_HeightGuard verifies that HeaderStoreGetter.GetByHeight -// and HasAt respect the committed store height. Data written to the datastore -// without updating store.Height() (like the executor's crash-recovery early save) -// must NOT be visible through the getter. -func TestHeaderStoreGetter_HeightGuard(t *testing.T) { - t.Parallel() - ctx := t.Context() - - ds, err := NewTestInMemoryKVStore() - require.NoError(t, err) - store := New(ds) - getter := NewHeaderStoreGetter(store) - - h1, d1 := types.GetRandomBlock(1, 2, "test-chain") - h2, d2 := types.GetRandomBlock(2, 2, "test-chain") - - specs := map[string]struct { - setup func() - height uint64 - expFound bool - expHasAt bool - }{ - "data at height without height update is invisible": { - setup: func() { - // Simulate the executor's early save: write data but do NOT call SetHeight. 
- batch, bErr := store.NewBatch(ctx) - require.NoError(t, bErr) - require.NoError(t, batch.SaveBlockData(h1, d1, &types.Signature{})) - require.NoError(t, batch.Commit()) - }, - height: 1, - expFound: false, - expHasAt: false, - }, - "data becomes visible after height is updated": { - setup: func() { - // Now commit the signed version with SetHeight (the final save). - batch, bErr := store.NewBatch(ctx) - require.NoError(t, bErr) - require.NoError(t, batch.SaveBlockData(h1, d1, &h1.Signature)) - require.NoError(t, batch.SetHeight(1)) - require.NoError(t, batch.Commit()) - }, - height: 1, - expFound: true, - expHasAt: true, - }, - "height above committed store height is invisible": { - setup: func() { - // Save h2 data but only set height to 1. - batch, bErr := store.NewBatch(ctx) - require.NoError(t, bErr) - require.NoError(t, batch.SaveBlockData(h2, d2, &types.Signature{})) - require.NoError(t, batch.Commit()) - }, - height: 2, - expFound: false, - expHasAt: false, - }, - } - - // Run in defined order since each step builds on the previous state. 
- for _, name := range []string{ - "data at height without height update is invisible", - "data becomes visible after height is updated", - "height above committed store height is invisible", - } { - spec := specs[name] - t.Run(name, func(t *testing.T) { - spec.setup() - - got, err := getter.GetByHeight(ctx, spec.height) - if spec.expFound { - require.NoError(t, err) - assert.Equal(t, spec.height, got.Height()) - } else { - require.Error(t, err) - } - - assert.Equal(t, spec.expHasAt, getter.HasAt(ctx, spec.height)) - }) - } -} diff --git a/pkg/store/keys.go b/pkg/store/keys.go index dd989c0e82..520567d3e1 100644 --- a/pkg/store/keys.go +++ b/pkg/store/keys.go @@ -50,7 +50,7 @@ func getStateAtHeightKey(height uint64) string { return GenerateKey([]string{statePrefix, strconv.FormatUint(height, 10)}) } -func getMetaKey(key string) string { +func GetMetaKey(key string) string { return GenerateKey([]string{metaPrefix, key}) } diff --git a/pkg/store/store.go b/pkg/store/store.go index eafa47ae75..79c3f1deaf 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -113,6 +113,7 @@ func (s *DefaultStore) GetHeader(ctx context.Context, height uint64) (*types.Sig if err = header.UnmarshalBinary(headerBlob); err != nil { return nil, fmt.Errorf("unmarshal block header: %w", err) } + return header, nil } @@ -176,7 +177,7 @@ func (s *DefaultStore) GetStateAtHeight(ctx context.Context, height uint64) (typ // // Metadata is separated from other data by using prefix in KV. func (s *DefaultStore) SetMetadata(ctx context.Context, key string, value []byte) error { - err := s.db.Put(ctx, ds.NewKey(getMetaKey(key)), value) + err := s.db.Put(ctx, ds.NewKey(GetMetaKey(key)), value) if err != nil { return fmt.Errorf("failed to set metadata for key '%s': %w", key, err) } @@ -185,7 +186,7 @@ func (s *DefaultStore) SetMetadata(ctx context.Context, key string, value []byte // GetMetadata returns values stored for given key with SetMetadata. 
func (s *DefaultStore) GetMetadata(ctx context.Context, key string) ([]byte, error) { - data, err := s.db.Get(ctx, ds.NewKey(getMetaKey(key))) + data, err := s.db.Get(ctx, ds.NewKey(GetMetaKey(key))) if err != nil { return nil, fmt.Errorf("failed to get metadata for key '%s': %w", key, err) } @@ -196,7 +197,7 @@ func (s *DefaultStore) GetMetadata(ctx context.Context, key string) ([]byte, err // This is more efficient than iterating through known keys when the set of keys is unknown. func (s *DefaultStore) GetMetadataByPrefix(ctx context.Context, prefix string) ([]MetadataEntry, error) { // The full key in the datastore includes the meta prefix - fullPrefix := getMetaKey(prefix) + fullPrefix := GetMetaKey(prefix) results, err := s.db.Query(ctx, dsq.Query{Prefix: fullPrefix}) if err != nil { @@ -213,7 +214,7 @@ func (s *DefaultStore) GetMetadataByPrefix(ctx context.Context, prefix string) ( // Extract the original key by removing the meta prefix // The key from datastore is like "/m/cache/header-da-included/hash" // We want to return "cache/header-da-included/hash" - metaKeyPrefix := getMetaKey("") + metaKeyPrefix := GetMetaKey("") key := strings.TrimPrefix(result.Key, metaKeyPrefix) key = strings.TrimPrefix(key, "/") // Remove leading slash for consistency @@ -228,7 +229,7 @@ func (s *DefaultStore) GetMetadataByPrefix(ctx context.Context, prefix string) ( // DeleteMetadata removes a metadata key from the store. func (s *DefaultStore) DeleteMetadata(ctx context.Context, key string) error { - err := s.db.Delete(ctx, ds.NewKey(getMetaKey(key))) + err := s.db.Delete(ctx, ds.NewKey(GetMetaKey(key))) if err != nil { return fmt.Errorf("failed to delete metadata for key '%s': %w", key, err) } @@ -279,7 +280,7 @@ func (s *DefaultStore) Rollback(ctx context.Context, height uint64, aggregator b } else { // in case of syncing issues, rollback the included height is OK. 
bz := make([]byte, 8) binary.LittleEndian.PutUint64(bz, height) - if err := batch.Put(ctx, ds.NewKey(getMetaKey(DAIncludedHeightKey)), bz); err != nil { + if err := batch.Put(ctx, ds.NewKey(GetMetaKey(DAIncludedHeightKey)), bz); err != nil { return fmt.Errorf("failed to update DA included height: %w", err) } } diff --git a/pkg/store/store_adapter.go b/pkg/store/store_adapter.go index 33d8050601..08743c6eb9 100644 --- a/pkg/store/store_adapter.go +++ b/pkg/store/store_adapter.go @@ -746,15 +746,7 @@ func NewHeaderStoreGetter(store Store) *HeaderStoreGetter { // GetByHeight implements StoreGetter. func (g *HeaderStoreGetter) GetByHeight(ctx context.Context, height uint64) (*types.P2PSignedHeader, error) { - // Guard: only return headers at or below the committed store height. - // The executor's early save writes an unsigned header to the datastore - // before updating store.Height(), so without this check the P2P layer - // could serve an unsigned header to peers. - storeHeight, err := g.store.Height(ctx) - if err != nil || height > storeHeight { - return nil, header.ErrNotFound - } - hdr, err := g.store.GetHeader(ctx, height) + header, err := g.store.GetHeader(ctx, height) if err != nil { return nil, err } @@ -762,7 +754,7 @@ func (g *HeaderStoreGetter) GetByHeight(ctx context.Context, height uint64) (*ty daHint, _ := g.GetDAHint(ctx, height) return &types.P2PSignedHeader{ - SignedHeader: hdr, + SignedHeader: header, DAHeightHint: daHint, }, nil } @@ -808,11 +800,7 @@ func (g *HeaderStoreGetter) Height(ctx context.Context) (uint64, error) { // HasAt implements StoreGetter. 
func (g *HeaderStoreGetter) HasAt(ctx context.Context, height uint64) bool { - storeHeight, err := g.store.Height(ctx) - if err != nil || height > storeHeight { - return false - } - _, err = g.store.GetHeader(ctx, height) + _, err := g.store.GetHeader(ctx, height) return err == nil } @@ -828,11 +816,6 @@ func NewDataStoreGetter(store Store) *DataStoreGetter { // GetByHeight implements StoreGetter. func (g *DataStoreGetter) GetByHeight(ctx context.Context, height uint64) (*types.P2PData, error) { - // Guard: only return data at or below the committed store height. - storeHeight, err := g.store.Height(ctx) - if err != nil || height > storeHeight { - return nil, header.ErrNotFound - } _, data, err := g.store.GetBlockData(ctx, height) if err != nil { return nil, err @@ -887,11 +870,7 @@ func (g *DataStoreGetter) Height(ctx context.Context) (uint64, error) { // HasAt implements StoreGetter. func (g *DataStoreGetter) HasAt(ctx context.Context, height uint64) bool { - storeHeight, err := g.store.Height(ctx) - if err != nil || height > storeHeight { - return false - } - _, _, err = g.store.GetBlockData(ctx, height) + _, _, err := g.store.GetBlockData(ctx, height) return err == nil } diff --git a/test/e2e/evm_full_node_e2e_test.go b/test/e2e/evm_full_node_e2e_test.go index 9e302959d2..e1981dbc1b 100644 --- a/test/e2e/evm_full_node_e2e_test.go +++ b/test/e2e/evm_full_node_e2e_test.go @@ -146,7 +146,7 @@ func verifyTransactionSync(t *testing.T, sequencerClient, fullNodeClient *ethcli } } return false - }, 60*time.Second, 500*time.Millisecond, "Full node should sync the block containing the transaction") + }, 3*time.Minute, 500*time.Millisecond, "Full node should sync the block containing the transaction") // Final verification - both nodes should have the transaction in the same block sequencerReceipt, err := sequencerClient.TransactionReceipt(ctx, txHash) diff --git a/test/e2e/evm_test_common.go b/test/e2e/evm_test_common.go index 5dda0421bd..d5a7215168 100644 --- 
a/test/e2e/evm_test_common.go +++ b/test/e2e/evm_test_common.go @@ -378,6 +378,7 @@ func setupSequencerNodeLazy(t *testing.T, sut *SystemUnderTest, sequencerHome, j // Use helper methods to get complete URLs args := []string{ "start", + "--evnode.log.level", "debug", "--evnode.log.format", "json", "--evm.jwt-secret-file", jwtSecretFile, "--evm.genesis-hash", genesisHash, @@ -440,6 +441,7 @@ func setupFullNode(t *testing.T, sut *SystemUnderTest, fullNodeHome, sequencerHo // Use helper methods to get complete URLs args := []string{ "start", + "--evnode.log.level", "debug", "--evnode.log.format", "json", "--home", fullNodeHome, "--evm.jwt-secret-file", fullNodeJwtSecretFile, From 387c3fb9dc67d3449c92796ec7aa4ab9a06ac509 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Thu, 12 Feb 2026 15:23:21 +0100 Subject: [PATCH 3/3] Minor updates --- .../executing/executor_restart_test.go | 17 ++++++++-------- block/internal/syncing/syncer.go | 15 ++------------ node/failover.go | 12 +++-------- node/full.go | 5 +---- node/node.go | 3 --- pkg/cmd/run_node.go | 2 -- pkg/p2p/client.go | 20 +++---------------- 7 files changed, 17 insertions(+), 57 deletions(-) diff --git a/block/internal/executing/executor_restart_test.go b/block/internal/executing/executor_restart_test.go index b72888860b..e5c3b6af40 100644 --- a/block/internal/executing/executor_restart_test.go +++ b/block/internal/executing/executor_restart_test.go @@ -79,7 +79,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) { require.NoError(t, exec1.initializeState()) // Set up context for first executor - exec1.ctx, exec1.cancel = context.WithCancel(context.Background()) + exec1.ctx, exec1.cancel = context.WithCancel(t.Context()) // First executor produces a block normally mockSeq1.EXPECT().GetNextBatch(mock.Anything, mock.AnythingOfType("sequencer.GetNextBatchRequest")). 
@@ -101,12 +101,12 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) { require.NoError(t, err) // Verify first block was produced - h1, err := memStore.Height(context.Background()) + h1, err := memStore.Height(t.Context()) require.NoError(t, err) assert.Equal(t, uint64(1), h1) // Store the produced block data for later verification - originalHeader, originalData, err := memStore.GetBlockData(context.Background(), 1) + originalHeader, originalData, err := memStore.GetBlockData(t.Context(), 1) require.NoError(t, err) assert.Equal(t, 2, len(originalData.Txs), "first block should have 2 transactions") @@ -158,8 +158,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) { pendingHeader.DataHash = pendingData.DACommitment() // Save pending block data (this is what would happen during a crash) - // We use savePendingBlock directly which writes to the metadata keys expected by the executor on restart - err = exec1.savePendingBlock(context.Background(), pendingHeader, pendingData) + err = exec1.savePendingBlock(t.Context(), pendingHeader, pendingData) require.NoError(t, err) // Stop first executor (simulating crash/restart) @@ -196,7 +195,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) { require.NoError(t, exec2.initializeState()) // Set up context for second executor - exec2.ctx, exec2.cancel = context.WithCancel(context.Background()) + exec2.ctx, exec2.cancel = context.WithCancel(t.Context()) defer exec2.cancel() // Verify that the state is at height 1 (pending block at height 2 wasn't committed) @@ -218,12 +217,12 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) { require.NoError(t, err) // Verify height advanced to 2 - h2, err := memStore.Height(context.Background()) + h2, err := memStore.Height(t.Context()) require.NoError(t, err) assert.Equal(t, uint64(2), h2, "height should advance to 2 using pending block") // Verify the block at height 2 matches the pending block data - finalHeader, finalData, err := 
memStore.GetBlockData(context.Background(), 2) + finalHeader, finalData, err := memStore.GetBlockData(t.Context(), 2) require.NoError(t, err) assert.Equal(t, 3, len(finalData.Txs), "should use pending block with 3 transactions") assert.Equal(t, []byte("pending_tx1"), []byte(finalData.Txs[0])) @@ -385,7 +384,7 @@ func TestExecutor_RestartNoPendingHeader(t *testing.T) { require.NoError(t, err) // Verify normal operation - h, err := memStore.Height(context.Background()) + h, err := memStore.Height(t.Context()) require.NoError(t, err) assert.Equal(t, uint64(numBlocks+1), h) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 1548b07a55..939abb0389 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -722,7 +722,7 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve currentState := s.getLastState() headerHash := header.Hash().String() - s.logger.Info().Uint64("height", nextHeight).Msg("syncing block started") + s.logger.Info().Uint64("height", nextHeight).Msg("syncing block") // Compared to the executor logic where the current block needs to be applied first, // here only the previous block needs to be applied to proceed to the verification. @@ -732,17 +732,6 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve s.cache.RemoveHeaderDAIncluded(headerHash) s.cache.RemoveDataDAIncluded(data.DACommitment().String()) - s.logger.Warn(). - Err(err). - Uint64("height", header.Height()). - Uint64("time", uint64(header.Time().Unix())). - Hex("proposer", header.ProposerAddress). - Str("data_hash", hex.EncodeToString(header.DataHash)). - Str("app_hash", hex.EncodeToString(header.AppHash)). - Hex("last_header_hash", header.LastHeaderHash). - Int("len signature", len(header.Signature)). 
- Msg("block validation failed") - if !errors.Is(err, errInvalidState) && !errors.Is(err, errInvalidBlock) { return errors.Join(errInvalidBlock, err) } @@ -810,7 +799,7 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve if s.p2pHandler != nil { s.p2pHandler.SetProcessedHeight(newState.LastBlockHeight) } - s.logger.Info().Uint64("height", nextHeight).Msg("syncing block completed") + return nil } diff --git a/node/failover.go b/node/failover.go index 0dfcb54179..493b28e184 100644 --- a/node/failover.go +++ b/node/failover.go @@ -189,23 +189,17 @@ func (f *failoverState) Run(pCtx context.Context) (multiErr error) { return nil }) - // P2P client persists across mode switches (started/closed by FullNode.Run). - // Reconfigure() was already called in setupFailoverState to re-bootstrap DHT. - - // Start header and data sync services concurrently. Each service's - // initFromP2PWithRetry can block up to 30s when peers have no blocks - // (e.g. lazy mode sequencer at height 0). Running them in parallel - // avoids a 60s cumulative startup delay. + // start header and data sync services concurrently to avoid cumulative startup delay. syncWg, syncCtx := errgroup.WithContext(ctx) syncWg.Go(func() error { if err := f.headerSyncService.Start(syncCtx); err != nil { - return fmt.Errorf("error while starting header sync service: %w", err) + return fmt.Errorf("header sync service: %w", err) } return nil }) syncWg.Go(func() error { if err := f.dataSyncService.Start(syncCtx); err != nil { - return fmt.Errorf("error while starting data sync service: %w", err) + return fmt.Errorf("data sync service: %w", err) } return nil }) diff --git a/node/full.go b/node/full.go index 42e4b6349f..41106de365 100644 --- a/node/full.go +++ b/node/full.go @@ -104,8 +104,6 @@ func newFullNode( } } - // The p2p client is fully configured and started before leader election. - // SyncService.getPeerIDs() gates peer usage on conf.Node.Aggregator. 
leaderFactory := func() (raftpkg.Runnable, error) { logger.Info().Msg("Starting aggregator-MODE") nodeConfig.Node.Aggregator = true @@ -283,8 +281,7 @@ func (n *FullNode) Run(parentCtx context.Context) error { n.prometheusSrv, n.pprofSrv = n.startInstrumentationServer() } - // Start the P2P client once. It persists across mode switches so that - // the host and PubSub (including externally registered topics) survive. + // Start the P2P client once. It persists across mode switches if err := n.p2pClient.Start(ctx); err != nil { return fmt.Errorf("start p2p: %w", err) } diff --git a/node/node.go b/node/node.go index 139636d644..d8aeea333f 100644 --- a/node/node.go +++ b/node/node.go @@ -28,9 +28,6 @@ type NodeOptions struct { // NewNode returns a new Full or Light Node based on the config. // This is the entry point for composing a node, when compiling a node, you need to provide an executor. // Example executors can be found in apps/ -// -// The p2pClient owns the node identity (private key) and is shared across -// mode switches. It supports in-place reconfiguration via Reconfigure(). func NewNode( conf config.Config, exec coreexecutor.Executor, diff --git a/pkg/cmd/run_node.go b/pkg/cmd/run_node.go index df96a0e371..33b1eba006 100644 --- a/pkg/cmd/run_node.go +++ b/pkg/cmd/run_node.go @@ -164,8 +164,6 @@ func StartNode( executor = telemetry.WithTracingExecutor(executor) } - // Create the P2P client. It is long-lived and reconfigured in-place - // on mode switches, avoiding costly teardown of the libp2p stack. 
p2pClient, err := p2p.NewClient(nodeConfig.P2P, nodeKey.PrivKey, datastore, genesis.ChainID, logger, nil) if err != nil { return fmt.Errorf("create p2p client: %w", err) diff --git a/pkg/p2p/client.go b/pkg/p2p/client.go index 7859c3cf97..4288f749f9 100644 --- a/pkg/p2p/client.go +++ b/pkg/p2p/client.go @@ -49,8 +49,7 @@ type Client struct { chainID string privKey crypto.PrivKey - rawHost host.Host // unwrapped libp2p host, stored to avoid double-wrapping via routedhost - host host.Host // may be wrapped with routedhost after DHT setup + host host.Host dht *dht.IpfsDHT disc *discovery.RoutingDiscovery gater *conngater.BasicConnectionGater @@ -124,12 +123,11 @@ func NewClientWithHost( // 4. Use active peer discovery to look for peers from same ORU network. func (c *Client) Start(ctx context.Context) error { if c.started { - return nil // already started — called from FullNode.Run() + return nil } c.logger.Debug().Msg("starting P2P client") if c.host != nil { - c.rawHost = c.host return c.startWithHost(ctx, c.host) } @@ -137,7 +135,6 @@ func (c *Client) Start(ctx context.Context) error { if err != nil { return err } - c.rawHost = h return c.startWithHost(ctx, h) } @@ -185,21 +182,10 @@ func (c *Client) Close() error { if c.host != nil { err = errors.Join(err, c.host.Close()) } + c.started = false return err } -// PrivKey returns the node's private key. -func (c *Client) PrivKey() crypto.PrivKey { - return c.privKey -} - -// Reconfigure updates the mutable P2P configuration without tearing down -// the libp2p host, PubSub, or DHT. Currently this only updates the -// stored config; the sync service gates peer usage on conf.Node.Aggregator. -func (c *Client) Reconfigure(conf config.P2PConfig) { - c.conf = conf -} - // Addrs returns listen addresses of Client. func (c *Client) Addrs() []multiaddr.Multiaddr { return c.host.Addrs()