diff --git a/apps/evm/cmd/rollback.go b/apps/evm/cmd/rollback.go
index 3f11ef8d4f..911c330dd6 100644
--- a/apps/evm/cmd/rollback.go
+++ b/apps/evm/cmd/rollback.go
@@ -8,6 +8,7 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 	ds "github.com/ipfs/go-datastore"
+	"github.com/rs/zerolog"
 	"github.com/spf13/cobra"
 
 	"github.com/evstack/ev-node/execution/evm"
@@ -30,6 +31,7 @@ func NewRollbackCmd() *cobra.Command {
 			if err != nil {
 				return err
 			}
+			logger := rollcmd.SetupLogger(nodeConfig.Log)
 
 			goCtx := cmd.Context()
 			if goCtx == nil {
@@ -69,7 +71,7 @@ func NewRollbackCmd() *cobra.Command {
 			}
 
 			// rollback execution layer via EngineClient
-			engineClient, err := createRollbackEngineClient(cmd, rawEvolveDB)
+			engineClient, err := createRollbackEngineClient(cmd, rawEvolveDB, logger.With().Str("module", "engine_client").Logger())
 			if err != nil {
 				cmd.Printf("Warning: failed to create engine client, skipping EL rollback: %v\n", err)
 			} else {
@@ -99,7 +101,7 @@ func NewRollbackCmd() *cobra.Command {
 	return cmd
 }
 
-func createRollbackEngineClient(cmd *cobra.Command, db ds.Batching) (*evm.EngineClient, error) {
+func createRollbackEngineClient(cmd *cobra.Command, db ds.Batching, logger zerolog.Logger) (*evm.EngineClient, error) {
 	ethURL, err := cmd.Flags().GetString(evm.FlagEvmEthURL)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get '%s' flag: %w", evm.FlagEvmEthURL, err)
 	}
@@ -128,5 +130,5 @@ func createRollbackEngineClient(cmd *cobra.Command, db ds.Batching) (*evm.Engine
 		return nil, fmt.Errorf("JWT secret file '%s' is empty", jwtSecretFile)
 	}
 
-	return evm.NewEngineExecutionClient(ethURL, engineURL, jwtSecret, common.Hash{}, common.Address{}, db, false)
+	return evm.NewEngineExecutionClient(ethURL, engineURL, jwtSecret, common.Hash{}, common.Address{}, db, false, logger)
 }
diff --git a/apps/evm/cmd/run.go b/apps/evm/cmd/run.go
index 6690d02d52..5ae854037e 100644
--- a/apps/evm/cmd/run.go
+++ b/apps/evm/cmd/run.go
@@ -55,7 +55,7 @@ var RunCmd = &cobra.Command{
 		}
 		tracingEnabled := nodeConfig.Instrumentation.IsTracingEnabled()
 
-		executor, err := createExecutionClient(cmd, datastore, tracingEnabled)
+		executor, err := createExecutionClient(cmd, datastore, tracingEnabled, logger.With().Str("module", "engine_client").Logger())
 		if err != nil {
 			return err
 		}
@@ -67,11 +67,6 @@ var RunCmd = &cobra.Command{
 
 		daClient := block.NewDAClient(blobClient, nodeConfig, logger)
 
-		// Attach logger to the EVM engine client if available
-		if ec, ok := executor.(*evm.EngineClient); ok {
-			ec.SetLogger(logger.With().Str("module", "engine_client").Logger())
-		}
-
 		headerNamespace := da.NamespaceFromString(nodeConfig.DA.GetNamespace())
 		dataNamespace := da.NamespaceFromString(nodeConfig.DA.GetDataNamespace())
 
@@ -192,7 +187,7 @@ func createSequencer(
 	return sequencer, nil
 }
 
-func createExecutionClient(cmd *cobra.Command, db datastore.Batching, tracingEnabled bool) (execution.Executor, error) {
+func createExecutionClient(cmd *cobra.Command, db datastore.Batching, tracingEnabled bool, logger zerolog.Logger) (execution.Executor, error) {
 	// Read execution client parameters from flags
 	ethURL, err := cmd.Flags().GetString(evm.FlagEvmEthURL)
 	if err != nil {
@@ -237,7 +232,7 @@ func createExecutionClient(cmd *cobra.Command, db datastore.Batching, tracingEna
 	genesisHash := common.HexToHash(genesisHashStr)
 	feeRecipient := common.HexToAddress(feeRecipientStr)
 
-	return evm.NewEngineExecutionClient(ethURL, engineURL, jwtSecret, genesisHash, feeRecipient, db, tracingEnabled)
+	return evm.NewEngineExecutionClient(ethURL, engineURL, jwtSecret, genesisHash, feeRecipient, db, tracingEnabled, logger)
 }
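Note: the logger is now injected at construction instead of attached afterwards via the removed `SetLogger`, so the client never runs with a `Nop` logger between creation and wiring. A minimal sketch of the new call site, assuming the signature above; the URLs and JWT secret are placeholders:

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	"github.com/rs/zerolog"

	"github.com/evstack/ev-node/execution/evm"
)

func main() {
	db := dssync.MutexWrap(ds.NewMapDatastore())
	logger := zerolog.New(zerolog.NewConsoleWriter()).With().Str("module", "engine_client").Logger()

	// Placeholder endpoints/secret; zero hash/address mirror the rollback path.
	client, err := evm.NewEngineExecutionClient(
		"http://localhost:8545", // ethURL
		"http://localhost:8551", // engineURL
		"0x0123",                // hex-encoded JWT secret (placeholder)
		common.Hash{},           // genesis hash
		common.Address{},        // fee recipient
		db,
		false, // tracing disabled
		logger,
	)
	if err != nil {
		fmt.Println("create engine client:", err)
		return
	}
	_ = client
}
```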
diff --git a/block/components.go b/block/components.go
index d5af466ef6..2956a98d81 100644
--- a/block/components.go
+++ b/block/components.go
@@ -12,6 +12,7 @@ import (
 	"github.com/evstack/ev-node/block/internal/common"
 	da "github.com/evstack/ev-node/block/internal/da"
 	"github.com/evstack/ev-node/block/internal/executing"
+	"github.com/evstack/ev-node/block/internal/pruner"
 	"github.com/evstack/ev-node/block/internal/reaping"
 	"github.com/evstack/ev-node/block/internal/submitting"
 	"github.com/evstack/ev-node/block/internal/syncing"
@@ -29,6 +30,7 @@ import (
 // Components represents the block-related components
 type Components struct {
 	Executor  *executing.Executor
+	Pruner    *pruner.Pruner
 	Reaper    *reaping.Reaper
 	Syncer    *syncing.Syncer
 	Submitter *submitting.Submitter
@@ -60,6 +62,11 @@ func (bc *Components) Start(ctx context.Context) error {
 			return fmt.Errorf("failed to start executor: %w", err)
 		}
 	}
+	if bc.Pruner != nil {
+		if err := bc.Pruner.Start(ctxWithCancel); err != nil {
+			return fmt.Errorf("failed to start pruner: %w", err)
+		}
+	}
 	if bc.Reaper != nil {
 		if err := bc.Reaper.Start(ctxWithCancel); err != nil {
 			return fmt.Errorf("failed to start reaper: %w", err)
@@ -96,6 +103,11 @@ func (bc *Components) Stop() error {
 			errs = errors.Join(errs, fmt.Errorf("failed to stop executor: %w", err))
 		}
 	}
+	if bc.Pruner != nil {
+		if err := bc.Pruner.Stop(); err != nil {
+			errs = errors.Join(errs, fmt.Errorf("failed to stop pruner: %w", err))
+		}
+	}
 	if bc.Reaper != nil {
 		if err := bc.Reaper.Stop(); err != nil {
 			errs = errors.Join(errs, fmt.Errorf("failed to stop reaper: %w", err))
@@ -166,6 +178,12 @@ func NewSyncComponents(
 		syncer.SetBlockSyncer(syncing.WithTracingBlockSyncer(syncer))
 	}
 
+	var execPruner coreexecutor.ExecPruner
+	if p, ok := exec.(coreexecutor.ExecPruner); ok {
+		execPruner = p
+	}
+	pruner := pruner.New(logger, store, execPruner, config.Node)
+
 	// Create submitter for sync nodes (no signer, only DA inclusion processing)
 	var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger, headerDAHintAppender, dataDAHintAppender)
 	if config.Instrumentation.IsTracingEnabled() {
@@ -189,6 +207,7 @@ func NewSyncComponents(
 		Syncer:    syncer,
 		Submitter: submitter,
 		Cache:     cacheManager,
+		Pruner:    pruner,
 		errorCh:   errorCh,
 	}, nil
 }
@@ -248,6 +267,12 @@ func NewAggregatorComponents(
 		executor.SetBlockProducer(executing.WithTracingBlockProducer(executor))
 	}
 
+	var execPruner coreexecutor.ExecPruner
+	if p, ok := exec.(coreexecutor.ExecPruner); ok {
+		execPruner = p
+	}
+	pruner := pruner.New(logger, store, execPruner, config.Node)
+
 	reaper, err := reaping.NewReaper(
 		exec,
 		sequencer,
@@ -264,6 +289,7 @@ func NewAggregatorComponents(
 	if config.Node.BasedSequencer { // no submissions needed for based sequencer
 		return &Components{
 			Executor: executor,
+			Pruner:   pruner,
 			Reaper:   reaper,
 			Cache:    cacheManager,
 			errorCh:  errorCh,
@@ -290,6 +316,7 @@ func NewAggregatorComponents(
 
 	return &Components{
 		Executor:  executor,
+		Pruner:    pruner,
 		Reaper:    reaper,
 		Submitter: submitter,
 		Cache:     cacheManager,
diff --git a/block/components_test.go b/block/components_test.go
index 93c08a655a..3d1a1f4a1e 100644
--- a/block/components_test.go
+++ b/block/components_test.go
@@ -127,6 +127,7 @@ func TestNewSyncComponents_Creation(t *testing.T) {
 	assert.NotNil(t, components.Syncer)
 	assert.NotNil(t, components.Submitter)
 	assert.NotNil(t, components.Cache)
+	assert.NotNil(t, components.Pruner)
 	assert.NotNil(t, components.errorCh)
 	assert.Nil(t, components.Executor) // Sync nodes don't have executors
 }
@@ -183,6 +184,7 @@ func TestNewAggregatorComponents_Creation(t *testing.T) {
 	assert.NotNil(t, components.Executor)
 	assert.NotNil(t, components.Submitter)
 	assert.NotNil(t, components.Cache)
+	assert.NotNil(t, components.Pruner)
 	assert.NotNil(t, components.errorCh)
 	assert.Nil(t, components.Syncer) // Aggregator nodes currently don't create syncers in this constructor
 }
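The type assertion above makes execution-metadata pruning an opt-in capability: an executor that implements `coreexecutor.ExecPruner` gets wired into the pruner automatically, while all others yield a nil `execPruner` and only block/state pruning runs. A minimal sketch of the pattern, assuming the single-method interface shape implied by this diff:

```go
package main

import (
	"context"
	"fmt"
)

// ExecPruner mirrors the shape of coreexecutor.ExecPruner implied by this diff.
type ExecPruner interface {
	PruneExec(ctx context.Context, height uint64) error
}

// myExecutor is a hypothetical executor that opts into metadata pruning.
type myExecutor struct{}

func (e *myExecutor) PruneExec(ctx context.Context, height uint64) error {
	// Drop executor-side metadata at the given height. Implementations should
	// be idempotent, since the pruner may revisit heights after a restart.
	return nil
}

func main() {
	var exec any = &myExecutor{}

	// The same optional-capability check used in NewSyncComponents and
	// NewAggregatorComponents: executors without PruneExec yield nil.
	var execPruner ExecPruner
	if p, ok := exec.(ExecPruner); ok {
		execPruner = p
	}
	fmt.Println("exec prunes metadata:", execPruner != nil)
}
```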
diff --git a/block/internal/pruner/pruner.go b/block/internal/pruner/pruner.go
new file mode 100644
index 0000000000..14e23bc7a4
--- /dev/null
+++ b/block/internal/pruner/pruner.go
@@ -0,0 +1,195 @@
+package pruner
+
+import (
+	"context"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"sync"
+	"time"
+
+	ds "github.com/ipfs/go-datastore"
+	"github.com/rs/zerolog"
+
+	coreexecutor "github.com/evstack/ev-node/core/execution"
+
+	"github.com/evstack/ev-node/pkg/config"
+	"github.com/evstack/ev-node/pkg/store"
+)
+
+const defaultPruneInterval = 15 * time.Minute
+
+// Pruner periodically removes old state and execution metadata entries.
+type Pruner struct {
+	store      store.Store
+	execPruner coreexecutor.ExecPruner
+	cfg        config.NodeConfig
+	logger     zerolog.Logger
+	lastPruned uint64
+
+	// Lifecycle
+	ctx    context.Context
+	wg     sync.WaitGroup
+	cancel context.CancelFunc
+}
+
+// New creates a new Pruner instance.
+func New(
+	logger zerolog.Logger,
+	store store.Store,
+	execPruner coreexecutor.ExecPruner,
+	cfg config.NodeConfig,
+) *Pruner {
+	return &Pruner{
+		store:      store,
+		execPruner: execPruner,
+		cfg:        cfg,
+		logger:     logger.With().Str("component", "prune").Logger(),
+	}
+}
+
+// Start begins the pruning loop.
+func (p *Pruner) Start(ctx context.Context) error {
+	p.ctx, p.cancel = context.WithCancel(ctx)
+
+	// Start pruner loop
+	p.wg.Go(p.pruneLoop)
+
+	p.logger.Info().Msg("pruner started")
+	return nil
+}
+
+// Stop stops the pruning loop.
+func (p *Pruner) Stop() error {
+	if p.cancel != nil {
+		p.cancel()
+	}
+	p.wg.Wait()
+
+	p.logger.Info().Msg("pruner stopped")
+	return nil
+}
+
+func (p *Pruner) pruneLoop() {
+	ticker := time.NewTicker(defaultPruneInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			if err := p.pruneRecoveryHistory(p.ctx, p.cfg.RecoveryHistoryDepth); err != nil {
+				p.logger.Error().Err(err).Msg("failed to prune recovery history")
+			}
+
+			if err := p.pruneBlocks(); err != nil {
+				p.logger.Error().Err(err).Msg("failed to prune old blocks")
+			}
+		case <-p.ctx.Done():
+			return
+		}
+	}
+}
+
+func (p *Pruner) pruneBlocks() error {
+	if !p.cfg.PruningEnabled || p.cfg.PruningKeepRecent == 0 || p.cfg.PruningInterval == 0 {
+		return nil
+	}
+
+	var currentDAIncluded uint64
+	currentDAIncludedBz, err := p.store.GetMetadata(p.ctx, store.DAIncludedHeightKey)
+	if err == nil && len(currentDAIncludedBz) == 8 {
+		currentDAIncluded = binary.LittleEndian.Uint64(currentDAIncludedBz)
+	} else {
+		// If we cannot determine the current DA-included height, we cannot prune safely; skip until it is available.
+		return nil
+	}
+
+	var lastPruned uint64
+	if bz, err := p.store.GetMetadata(p.ctx, store.LastPrunedBlockHeightKey); err == nil && len(bz) == 8 {
+		lastPruned = binary.LittleEndian.Uint64(bz)
+	}
+
+	storeHeight, err := p.store.Height(p.ctx)
+	if err != nil {
+		return fmt.Errorf("failed to get store height for pruning: %w", err)
+	}
+	if storeHeight <= lastPruned+p.cfg.PruningInterval {
+		return nil
+	}
+
+	// Never prune blocks that are not DA included
+	upperBound := min(storeHeight, currentDAIncluded)
+	if upperBound <= p.cfg.PruningKeepRecent {
+		// Not enough fully included blocks to prune
+		return nil
+	}
+
+	targetHeight := upperBound - p.cfg.PruningKeepRecent
+
+	if err := p.store.PruneBlocks(p.ctx, targetHeight); err != nil {
+		p.logger.Error().Err(err).Uint64("target_height", targetHeight).Msg("failed to prune old block data")
+	}
+
+	if p.execPruner != nil {
+		if err := p.execPruner.PruneExec(p.ctx, targetHeight); err != nil && !errors.Is(err, ds.ErrNotFound) {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// pruneRecoveryHistory prunes old state and execution metadata entries based on the configured retention depth.
+// It does not prune old blocks; those are handled by pruneBlocks.
+// Pruning old state does not lose block history, but it limits the ability to recover (replay or rollback) to the last HEAD-N blocks, where N is the retention depth.
+func (p *Pruner) pruneRecoveryHistory(ctx context.Context, retention uint64) error {
+	if p.cfg.RecoveryHistoryDepth == 0 {
+		return nil
+	}
+
+	height, err := p.store.Height(ctx)
+	if err != nil {
+		return err
+	}
+
+	if height <= retention {
+		return nil
+	}
+
+	target := height - retention
+	if target <= p.lastPruned {
+		return nil
+	}
+
+	// maxPruneBatch limits how many heights we prune per cycle to bound work.
+	// It is calibrated so one cycle can clear roughly the number of blocks produced during defaultPruneInterval.
+	blockTime := p.cfg.BlockTime.Duration
+	if blockTime == 0 {
+		blockTime = 1
+	}
+
+	maxPruneBatch := max(uint64(defaultPruneInterval/blockTime), (target-p.lastPruned)/5)
+
+	start := p.lastPruned + 1
+	end := target
+	if end-start+1 > maxPruneBatch {
+		end = start + maxPruneBatch - 1
+	}
+
+	for h := start; h <= end; h++ {
+		if err := p.store.DeleteStateAtHeight(ctx, h); err != nil && !errors.Is(err, ds.ErrNotFound) {
+			return err
+		}
+
+		if p.execPruner != nil {
+			if err := p.execPruner.PruneExec(ctx, h); err != nil && !errors.Is(err, ds.ErrNotFound) {
+				return err
+			}
+		}
+	}
+
+	p.lastPruned = end
+	return nil
+}
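To make the thresholds concrete, here is a self-contained sketch of the arithmetic used by pruneBlocks and pruneRecoveryHistory, with hypothetical sample values:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Block pruning with illustrative values: keep the 100 most recent blocks.
	storeHeight := uint64(250) // local chain head
	daIncluded := uint64(240)  // highest DA-included height
	keepRecent := uint64(100)

	upperBound := min(storeHeight, daIncluded) // 240: never prune past DA inclusion
	targetHeight := upperBound - keepRecent    // 140: heights <= 140 become prunable
	fmt.Println("prune block data up to height", targetHeight)

	// Recovery-history batch size: with 1s blocks, one 15-minute cycle can
	// clear ~900 heights, roughly the number produced since the last run.
	blockTime := 1 * time.Second
	maxPruneBatch := uint64(15 * time.Minute / blockTime)
	fmt.Println("max heights pruned per cycle:", maxPruneBatch) // 900
}
```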
diff --git a/block/internal/pruner/pruner_test.go b/block/internal/pruner/pruner_test.go
new file mode 100644
index 0000000000..2d6b73d406
--- /dev/null
+++ b/block/internal/pruner/pruner_test.go
@@ -0,0 +1,52 @@
+package pruner
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	ds "github.com/ipfs/go-datastore"
+	dssync "github.com/ipfs/go-datastore/sync"
+	"github.com/rs/zerolog"
+	"github.com/stretchr/testify/require"
+
+	"github.com/evstack/ev-node/pkg/config"
+	"github.com/evstack/ev-node/pkg/store"
+	"github.com/evstack/ev-node/types"
+)
+
+type execMetaAdapter struct {
+	existing map[uint64]struct{}
+}
+
+func (e *execMetaAdapter) PruneExec(ctx context.Context, height uint64) error {
+	delete(e.existing, height)
+	return nil
+}
+
+func TestPrunerPrunesRecoveryHistory(t *testing.T) {
+	t.Parallel()
+
+	ctx := context.Background()
+	kv := dssync.MutexWrap(ds.NewMapDatastore())
+	stateStore := store.New(kv)
+
+	for height := uint64(1); height <= 3; height++ {
+		batch, err := stateStore.NewBatch(ctx)
+		require.NoError(t, err)
+		require.NoError(t, batch.SetHeight(height))
+		require.NoError(t, batch.UpdateState(types.State{LastBlockHeight: height}))
+		require.NoError(t, batch.Commit())
+	}
+
+	execAdapter := &execMetaAdapter{existing: map[uint64]struct{}{1: {}, 2: {}, 3: {}}}
+
+	recoveryPruner := New(zerolog.Nop(), stateStore, execAdapter, config.NodeConfig{RecoveryHistoryDepth: 2, BlockTime: config.DurationWrapper{Duration: 10 * time.Second}})
+	require.NoError(t, recoveryPruner.pruneRecoveryHistory(ctx, recoveryPruner.cfg.RecoveryHistoryDepth))
+
+	_, err := stateStore.GetStateAtHeight(ctx, 1)
+	require.ErrorIs(t, err, ds.ErrNotFound)
+
+	_, exists := execAdapter.existing[1]
+	require.False(t, exists)
+}
diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go
index 6a40bd818e..1c5b034c19 100644
--- a/block/internal/submitting/submitter.go
+++ b/block/internal/submitting/submitter.go
@@ -364,50 +364,6 @@ func (s *Submitter) processDAInclusionLoop() {
 				// This can only be performed after the height has been persisted to store
 				s.cache.DeleteHeight(nextHeight)
 			}
-
-			// Run height-based pruning if enabled.
-			s.pruneBlocks()
-		}
-	}
-}
-
-func (s *Submitter) pruneBlocks() {
-	if !s.config.Node.PruningEnabled || s.config.Node.PruningKeepRecent == 0 || s.config.Node.PruningInterval == 0 {
-		return
-	}
-
-	currentDAIncluded := s.GetDAIncludedHeight()
-
-	var lastPruned uint64
-	if bz, err := s.store.GetMetadata(s.ctx, store.LastPrunedBlockHeightKey); err == nil && len(bz) == 8 {
-		lastPruned = binary.LittleEndian.Uint64(bz)
-	}
-
-	storeHeight, err := s.store.Height(s.ctx)
-	if err != nil {
-		s.logger.Error().Err(err).Msg("failed to get store height for pruning")
-		return
-	}
-	if storeHeight <= lastPruned+s.config.Node.PruningInterval {
-		return
-	}
-
-	// Never prune blocks that are not DA included
-	upperBound := min(storeHeight, currentDAIncluded)
-	if upperBound <= s.config.Node.PruningKeepRecent {
-		// Not enough fully included blocks to prune
-		return
-	}
-
-	targetHeight := upperBound - s.config.Node.PruningKeepRecent
-
-	if err := s.store.PruneBlocks(s.ctx, targetHeight); err != nil {
-		s.logger.Error().Err(err).Uint64("target_height", targetHeight).Msg("failed to prune old block data")
-	}
-
-	if pruner, ok := s.exec.(coreexecutor.ExecPruner); ok {
-		if err := pruner.PruneExec(s.ctx, targetHeight); err != nil {
-			s.logger.Error().Err(err).Uint64("target_height", targetHeight).Msg("failed to prune execution metadata")
 		}
 	}
 }
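With the inline pruning removed from the Submitter's DA-inclusion loop, the dedicated component owns the schedule. A compact sketch of driving it directly, using the constructor and lifecycle from pruner.go (illustrative only: `block/internal` packages cannot be imported from outside the ev-node module):

```go
package prunerexample

import (
	"context"

	"github.com/rs/zerolog"

	"github.com/evstack/ev-node/block/internal/pruner"
	"github.com/evstack/ev-node/pkg/config"
	"github.com/evstack/ev-node/pkg/store"
)

// runPruner starts the pruner and blocks until ctx is cancelled.
func runPruner(ctx context.Context, st store.Store, cfg config.Config) error {
	// A nil ExecPruner disables execution-metadata pruning; block/state
	// pruning still runs according to cfg.Node.
	p := pruner.New(zerolog.Nop(), st, nil, cfg.Node)
	if err := p.Start(ctx); err != nil {
		return err
	}
	defer p.Stop()

	<-ctx.Done() // prune cycles run every 15 minutes until cancellation
	return nil
}
```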
diff --git a/execution/evm/execution.go b/execution/evm/execution.go
index 399f7c91ee..a25fbd3567 100644
--- a/execution/evm/execution.go
+++ b/execution/evm/execution.go
@@ -202,6 +202,7 @@ func NewEngineExecutionClient(
 	feeRecipient common.Address,
 	db ds.Batching,
 	tracingEnabled bool,
+	logger zerolog.Logger,
 ) (*EngineClient, error) {
 	if db == nil {
 		return nil, errors.New("db is required for EVM execution client")
@@ -265,22 +266,10 @@ func NewEngineExecutionClient(
 		currentSafeBlockHash:      genesisHash,
 		currentFinalizedBlockHash: genesisHash,
 		blockHashCache:            make(map[uint64]common.Hash),
-		logger:                    zerolog.Nop(),
+		logger:                    logger,
 	}, nil
 }
 
-// PruneExec implements execution.ExecPruner by delegating to the
-// underlying EVMStore. It is safe to call this multiple times with the same
-// or increasing heights; the store tracks its own last-pruned height.
-func (c *EngineClient) PruneExec(ctx context.Context, height uint64) error {
-	return c.store.PruneExec(ctx, height)
-}
-
-// SetLogger allows callers to attach a structured logger.
-func (c *EngineClient) SetLogger(l zerolog.Logger) {
-	c.logger = l
-}
-
 // InitChain initializes the blockchain with the given genesis parameters
 func (c *EngineClient) InitChain(ctx context.Context, genesisTime time.Time, initialHeight uint64, chainID string) ([]byte, error) {
 	if initialHeight != 1 {
@@ -1103,6 +1092,13 @@ func (c *EngineClient) Rollback(ctx context.Context, targetHeight uint64) error
 	return nil
 }
 
+// PruneExec implements execution.ExecPruner by delegating to the
+// underlying EVMStore. It is safe to call this multiple times with the same
+// or increasing heights; the store tracks its own last-pruned height.
+func (c *EngineClient) PruneExec(ctx context.Context, height uint64) error {
+	return c.store.PruneExec(ctx, height)
+}
+
 // decodeSecret decodes a hex-encoded JWT secret string into a byte slice.
 func decodeSecret(jwtSecret string) ([]byte, error) {
 	secret, err := hex.DecodeString(strings.TrimPrefix(jwtSecret, "0x"))
diff --git a/execution/evm/test/execution_test.go b/execution/evm/test/execution_test.go
index aa6f2db174..867b99b77d 100644
--- a/execution/evm/test/execution_test.go
+++ b/execution/evm/test/execution_test.go
@@ -12,6 +12,7 @@ import (
 	"github.com/ethereum/go-ethereum/ethclient"
 	ds "github.com/ipfs/go-datastore"
 	dssync "github.com/ipfs/go-datastore/sync"
+	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/require"
 
 	"github.com/evstack/ev-node/execution/evm"
@@ -75,6 +76,7 @@ func TestEngineExecution(t *testing.T) {
 				common.Address{},
 				store,
 				false,
+				zerolog.Nop(),
 			)
 			require.NoError(tt, err)
@@ -172,6 +174,7 @@ func TestEngineExecution(t *testing.T) {
 				common.Address{},
 				store,
 				false,
+				zerolog.Nop(),
 			)
 			require.NoError(tt, err)
diff --git a/execution/evm/test/go.mod b/execution/evm/test/go.mod
index 1eaa13d1d7..744c263775 100644
--- a/execution/evm/test/go.mod
+++ b/execution/evm/test/go.mod
@@ -8,6 +8,7 @@ require (
 	github.com/evstack/ev-node/execution/evm v0.0.0-00010101000000-000000000000
 	github.com/golang-jwt/jwt/v5 v5.3.1
 	github.com/ipfs/go-datastore v0.9.0
+	github.com/rs/zerolog v1.34.0
 	github.com/stretchr/testify v1.11.1
 )
 
@@ -147,7 +148,6 @@ require (
 	github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
 	github.com/rivo/uniseg v0.2.0 // indirect
 	github.com/rogpeppe/go-internal v1.14.1 // indirect
-	github.com/rs/zerolog v1.34.0 // indirect
 	github.com/sagikazarmark/locafero v0.11.0 // indirect
 	github.com/sasha-s/go-deadlock v0.3.5 // indirect
 	github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
diff --git a/pkg/config/config.go b/pkg/config/config.go
index b964ac7bf2..d48501b63d 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -51,6 +51,8 @@ const (
 	FlagReadinessMaxBlocksBehind = FlagPrefixEvnode + "node.readiness_max_blocks_behind"
 	// FlagScrapeInterval is a flag for specifying the reaper scrape interval
 	FlagScrapeInterval = FlagPrefixEvnode + "node.scrape_interval"
+	// FlagRecoveryHistoryDepth is a flag for specifying how much recovery history to keep
+	FlagRecoveryHistoryDepth = FlagPrefixEvnode + "node.recovery_history_depth"
 
 	// FlagClearCache is a flag for clearing the cache
 	FlagClearCache = FlagPrefixEvnode + "clear_cache"
@@ -265,9 +267,10 @@ type NodeConfig struct {
 	// Pruning configuration
 	// When enabled, the node will periodically prune old block data (headers, data,
 	// signatures, and hash index) from the local store while keeping recent history.
-	PruningEnabled    bool   `mapstructure:"pruning_enabled" yaml:"pruning_enabled" comment:"Enable height-based pruning of stored block data. When disabled, all blocks are kept (archive mode)."`
-	PruningKeepRecent uint64 `mapstructure:"pruning_keep_recent" yaml:"pruning_keep_recent" comment:"Number of most recent blocks to retain when pruning is enabled. Must be > 0 when pruning is enabled; set pruning_enabled=false to keep all blocks (archive mode)."`
-	PruningInterval   uint64 `mapstructure:"pruning_interval" yaml:"pruning_interval" comment:"Run pruning every N blocks. Must be >= 1 when pruning is enabled."`
+	PruningEnabled       bool   `mapstructure:"pruning_enabled" yaml:"pruning_enabled" comment:"Enable height-based pruning of stored block data. When disabled, all blocks are kept (archive mode)."`
+	PruningKeepRecent    uint64 `mapstructure:"pruning_keep_recent" yaml:"pruning_keep_recent" comment:"Number of most recent blocks to retain when pruning is enabled. Must be > 0 when pruning is enabled; set pruning_enabled=false to keep all blocks (archive mode)."`
+	PruningInterval      uint64 `mapstructure:"pruning_interval" yaml:"pruning_interval" comment:"Run pruning every N blocks. Must be >= 1 when pruning is enabled."`
+	RecoveryHistoryDepth uint64 `mapstructure:"recovery_history_depth" yaml:"recovery_history_depth" comment:"Number of recent heights to keep state and execution metadata indexed for recovery (0 keeps all)."`
 }
 
 // LogConfig contains all logging configuration parameters
@@ -458,6 +461,7 @@ func AddFlags(cmd *cobra.Command) {
 	cmd.Flags().Bool(FlagPrefixEvnode+"node.pruning_enabled", def.Node.PruningEnabled, "enable height-based pruning of stored block data (headers, data, signatures, index)")
 	cmd.Flags().Uint64(FlagPrefixEvnode+"node.pruning_keep_recent", def.Node.PruningKeepRecent, "number of most recent blocks to retain when pruning is enabled (must be > 0; disable pruning to keep all blocks)")
 	cmd.Flags().Uint64(FlagPrefixEvnode+"node.pruning_interval", def.Node.PruningInterval, "run pruning every N blocks (must be >= 1 when pruning is enabled)")
+	cmd.Flags().Uint64(FlagRecoveryHistoryDepth, def.Node.RecoveryHistoryDepth, "number of recent heights to keep state and execution metadata indexed for recovery (0 keeps all)")
 
 	// Data Availability configuration flags
 	cmd.Flags().String(FlagDAAddress, def.DA.Address, "DA address (host:port)")
diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go
index 9189aa57e5..d8a3ac6c73 100644
--- a/pkg/config/config_test.go
+++ b/pkg/config/config_test.go
@@ -33,6 +33,7 @@ func TestDefaultConfig(t *testing.T) {
 	assert.Equal(t, uint64(0), def.Node.MaxPendingHeadersAndData)
 	assert.Equal(t, false, def.Node.LazyMode)
 	assert.Equal(t, 60*time.Second, def.Node.LazyBlockInterval.Duration)
+	assert.Equal(t, uint64(0), def.Node.RecoveryHistoryDepth)
 	assert.Equal(t, "file", def.Signer.SignerType)
 	assert.Equal(t, "config", def.Signer.SignerPath)
 	assert.Equal(t, "127.0.0.1:7331", def.RPC.Address)
@@ -64,6 +65,7 @@ func TestAddFlags(t *testing.T) {
 	assertFlagValue(t, flags, FlagReadinessWindowSeconds, DefaultConfig().Node.ReadinessWindowSeconds)
 	assertFlagValue(t, flags, FlagReadinessMaxBlocksBehind, DefaultConfig().Node.ReadinessMaxBlocksBehind)
 	assertFlagValue(t, flags, FlagScrapeInterval, DefaultConfig().Node.ScrapeInterval)
+	assertFlagValue(t, flags, FlagRecoveryHistoryDepth, DefaultConfig().Node.RecoveryHistoryDepth)
 
 	// DA flags
 	assertFlagValue(t, flags, FlagDAAddress, DefaultConfig().DA.Address)
@@ -112,7 +114,7 @@ func TestAddFlags(t *testing.T) {
 	assertFlagValue(t, flags, FlagRPCEnableDAVisualization, DefaultConfig().RPC.EnableDAVisualization)
 
 	// Count the number of flags we're explicitly checking
-	expectedFlagCount := 66 // Update this number if you add more flag checks above
+	expectedFlagCount := 67 // Update this number if you add more flag checks above
 
 	// Get the actual number of flags (both regular and persistent)
 	actualFlagCount := 0
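The two retention knobs compose: block pruning bounds on-disk block data, while `recovery_history_depth` bounds how far back replay/rollback can reach. A short sketch of enabling both, with illustrative values:

```go
package main

import (
	"fmt"

	"github.com/evstack/ev-node/pkg/config"
)

func main() {
	// Illustrative values: with 1s blocks this keeps roughly one day of block
	// data while allowing rollback/replay over the most recent 1,000 heights.
	cfg := config.DefaultConfig()
	cfg.Node.PruningEnabled = true
	cfg.Node.PruningKeepRecent = 86_400
	cfg.Node.PruningInterval = 100 // prune only after 100+ new blocks since the last prune
	cfg.Node.RecoveryHistoryDepth = 1_000
	fmt.Printf("%+v\n", cfg.Node)
}
```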
AppConfigDir = "config" + + defaultRecoveryHistoryDepth = uint64(0) ) // DefaultRootDir returns the default root directory for evolve @@ -66,6 +68,7 @@ func DefaultConfig() Config { LazyMode: false, LazyBlockInterval: DurationWrapper{60 * time.Second}, Light: false, + RecoveryHistoryDepth: defaultRecoveryHistoryDepth, ReadinessWindowSeconds: defaultReadinessWindowSeconds, ReadinessMaxBlocksBehind: calculateReadinessMaxBlocksBehind(defaultBlockTime.Duration, defaultReadinessWindowSeconds), ScrapeInterval: DurationWrapper{1 * time.Second}, diff --git a/pkg/store/cached_store.go b/pkg/store/cached_store.go index 8a5cca5b38..490f44411a 100644 --- a/pkg/store/cached_store.go +++ b/pkg/store/cached_store.go @@ -169,6 +169,12 @@ func (cs *CachedStore) PruneBlocks(ctx context.Context, height uint64) error { return nil } +// DeleteStateAtHeight removes the state entry at the given height from the underlying store. +// This value is not cached, so nothing to invalidate. +func (cs *CachedStore) DeleteStateAtHeight(ctx context.Context, height uint64) error { + return cs.DeleteStateAtHeight(ctx, height) +} + // Close closes the underlying store. func (cs *CachedStore) Close() error { cs.ClearCache() diff --git a/pkg/store/store.go b/pkg/store/store.go index d981400f07..908f42dbf7 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -172,6 +172,14 @@ func (s *DefaultStore) GetStateAtHeight(ctx context.Context, height uint64) (typ return state, nil } +// DeleteStateAtHeight removes the state entry at the given height. +func (s *DefaultStore) DeleteStateAtHeight(ctx context.Context, height uint64) error { + if err := s.db.Delete(ctx, ds.NewKey(getStateAtHeightKey(height))); err != nil && !errors.Is(err, ds.ErrNotFound) { + return fmt.Errorf("failed to delete state at height %d: %w", height, err) + } + return nil +} + // SetMetadata saves arbitrary value in the store. // // Metadata is separated from other data by using prefix in KV. diff --git a/pkg/store/store_test.go b/pkg/store/store_test.go index f1b9a131cc..08ead91074 100644 --- a/pkg/store/store_test.go +++ b/pkg/store/store_test.go @@ -591,6 +591,27 @@ func TestUpdateStateError(t *testing.T) { require.Contains(err.Error(), mockErrPut.Error()) } +func TestDeleteStateAtHeight(t *testing.T) { + t.Parallel() + require := require.New(t) + + kv, err := NewTestInMemoryKVStore() + require.NoError(err) + + store := New(kv) + + batch, err := store.NewBatch(t.Context()) + require.NoError(err) + require.NoError(batch.SetHeight(1)) + require.NoError(batch.UpdateState(types.State{LastBlockHeight: 1})) + require.NoError(batch.Commit()) + + require.NoError(store.(*DefaultStore).DeleteStateAtHeight(t.Context(), 1)) + + _, err = store.GetStateAtHeight(t.Context(), 1) + require.ErrorIs(err, ds.ErrNotFound) +} + func TestGetStateError(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/pkg/store/tracing.go b/pkg/store/tracing.go index 42d686d612..ea1d5c8842 100644 --- a/pkg/store/tracing.go +++ b/pkg/store/tracing.go @@ -228,6 +228,22 @@ func (t *tracedStore) DeleteMetadata(ctx context.Context, key string) error { return nil } +// DeleteStateAtHeight removes the state entry at the given height from the underlying store. 
diff --git a/pkg/store/tracing.go b/pkg/store/tracing.go
index 42d686d612..ea1d5c8842 100644
--- a/pkg/store/tracing.go
+++ b/pkg/store/tracing.go
@@ -228,6 +228,22 @@ func (t *tracedStore) DeleteMetadata(ctx context.Context, key string) error {
 	return nil
 }
 
+// DeleteStateAtHeight removes the state entry at the given height from the underlying store.
+func (t *tracedStore) DeleteStateAtHeight(ctx context.Context, height uint64) error {
+	ctx, span := t.tracer.Start(ctx, "Store.DeleteStateAtHeight",
+		trace.WithAttributes(attribute.Int64("height", int64(height))),
+	)
+	defer span.End()
+
+	if err := t.inner.DeleteStateAtHeight(ctx, height); err != nil {
+		span.RecordError(err)
+		span.SetStatus(codes.Error, err.Error())
+		return err
+	}
+
+	return nil
+}
+
 func (t *tracedStore) Rollback(ctx context.Context, height uint64, aggregator bool) error {
 	ctx, span := t.tracer.Start(ctx, "Store.Rollback",
 		trace.WithAttributes(
diff --git a/pkg/store/tracing_test.go b/pkg/store/tracing_test.go
index 5477a66985..1109791c76 100644
--- a/pkg/store/tracing_test.go
+++ b/pkg/store/tracing_test.go
@@ -30,6 +30,8 @@ type tracingMockStore struct {
 	setMetadataFn         func(ctx context.Context, key string, value []byte) error
 	deleteMetadataFn      func(ctx context.Context, key string) error
 	rollbackFn            func(ctx context.Context, height uint64, aggregator bool) error
+	pruneBlocksFn         func(ctx context.Context, height uint64) error
+	deleteStateAtHeightFn func(ctx context.Context, height uint64) error
 	newBatchFn            func(ctx context.Context) (Batch, error)
 }
 
@@ -125,8 +127,16 @@ func (m *tracingMockStore) Rollback(ctx context.Context, height uint64, aggregat
 }
 
 func (m *tracingMockStore) PruneBlocks(ctx context.Context, height uint64) error {
-	// For tracing tests we don't need pruning behavior; just satisfy the Store
-	// interface. Specific pruning behavior is tested separately in store_test.go.
+	if m.pruneBlocksFn != nil {
+		return m.pruneBlocksFn(ctx, height)
+	}
+	return nil
+}
+
+func (m *tracingMockStore) DeleteStateAtHeight(ctx context.Context, height uint64) error {
+	if m.deleteStateAtHeightFn != nil {
+		return m.deleteStateAtHeightFn(ctx, height)
+	}
 	return nil
 }
 
diff --git a/pkg/store/types.go b/pkg/store/types.go
index a5635feec1..f785850b2c 100644
--- a/pkg/store/types.go
+++ b/pkg/store/types.go
@@ -99,4 +99,8 @@ type Pruner interface {
 	// up to and including the given height from the store, without modifying
 	// state snapshots or the current chain height.
 	PruneBlocks(ctx context.Context, height uint64) error
+
+	// DeleteStateAtHeight removes the state at the given height from the store.
+	// It does not affect the current state or any states at other heights, allowing for targeted pruning of historical state snapshots.
+	DeleteStateAtHeight(ctx context.Context, height uint64) error
 }
diff --git a/test/mocks/store.go b/test/mocks/store.go
index efd1939940..2cde50c543 100644
--- a/test/mocks/store.go
+++ b/test/mocks/store.go
@@ -39,14 +39,6 @@ func (_m *MockStore) EXPECT() *MockStore_Expecter {
 	return &MockStore_Expecter{mock: &_m.Mock}
 }
 
-// PruneBlocks provides a mock implementation for the Store's pruning method.
-// Tests using MockStore currently do not exercise pruning behavior, so this
-// method simply satisfies the interface and can be extended with expectations
-// later if needed.
-func (_mock *MockStore) PruneBlocks(ctx context.Context, height uint64) error {
-	return nil
-}
-
 // Close provides a mock function for the type MockStore
 func (_mock *MockStore) Close() error {
 	ret := _mock.Called()
@@ -148,6 +140,63 @@ func (_c *MockStore_DeleteMetadata_Call) RunAndReturn(run func(ctx context.Conte
 	return _c
 }
 
+// DeleteStateAtHeight provides a mock function for the type MockStore
+func (_mock *MockStore) DeleteStateAtHeight(ctx context.Context, height uint64) error {
+	ret := _mock.Called(ctx, height)
+
+	if len(ret) == 0 {
+		panic("no return value specified for DeleteStateAtHeight")
+	}
+
+	var r0 error
+	if returnFunc, ok := ret.Get(0).(func(context.Context, uint64) error); ok {
+		r0 = returnFunc(ctx, height)
+	} else {
+		r0 = ret.Error(0)
+	}
+	return r0
+}
+
+// MockStore_DeleteStateAtHeight_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteStateAtHeight'
+type MockStore_DeleteStateAtHeight_Call struct {
+	*mock.Call
+}
+
+// DeleteStateAtHeight is a helper method to define mock.On call
+//   - ctx context.Context
+//   - height uint64
+func (_e *MockStore_Expecter) DeleteStateAtHeight(ctx interface{}, height interface{}) *MockStore_DeleteStateAtHeight_Call {
+	return &MockStore_DeleteStateAtHeight_Call{Call: _e.mock.On("DeleteStateAtHeight", ctx, height)}
+}
+
+func (_c *MockStore_DeleteStateAtHeight_Call) Run(run func(ctx context.Context, height uint64)) *MockStore_DeleteStateAtHeight_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		var arg0 context.Context
+		if args[0] != nil {
+			arg0 = args[0].(context.Context)
+		}
+		var arg1 uint64
+		if args[1] != nil {
+			arg1 = args[1].(uint64)
+		}
+		run(
+			arg0,
+			arg1,
+		)
+	})
+	return _c
+}
+
+func (_c *MockStore_DeleteStateAtHeight_Call) Return(err error) *MockStore_DeleteStateAtHeight_Call {
+	_c.Call.Return(err)
+	return _c
+}
+
+func (_c *MockStore_DeleteStateAtHeight_Call) RunAndReturn(run func(ctx context.Context, height uint64) error) *MockStore_DeleteStateAtHeight_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // GetBlockByHash provides a mock function for the type MockStore
 func (_mock *MockStore) GetBlockByHash(ctx context.Context, hash []byte) (*types.SignedHeader, *types.Data, error) {
 	ret := _mock.Called(ctx, hash)
@@ -888,6 +937,63 @@ func (_c *MockStore_NewBatch_Call) RunAndReturn(run func(ctx context.Context) (s
 	return _c
 }
 
+// PruneBlocks provides a mock function for the type MockStore
+func (_mock *MockStore) PruneBlocks(ctx context.Context, height uint64) error {
+	ret := _mock.Called(ctx, height)
+
+	if len(ret) == 0 {
+		panic("no return value specified for PruneBlocks")
+	}
+
+	var r0 error
+	if returnFunc, ok := ret.Get(0).(func(context.Context, uint64) error); ok {
+		r0 = returnFunc(ctx, height)
+	} else {
+		r0 = ret.Error(0)
+	}
+	return r0
+}
+
+// MockStore_PruneBlocks_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PruneBlocks'
+type MockStore_PruneBlocks_Call struct {
+	*mock.Call
+}
+
+// PruneBlocks is a helper method to define mock.On call
+//   - ctx context.Context
+//   - height uint64
+func (_e *MockStore_Expecter) PruneBlocks(ctx interface{}, height interface{}) *MockStore_PruneBlocks_Call {
+	return &MockStore_PruneBlocks_Call{Call: _e.mock.On("PruneBlocks", ctx, height)}
+}
+
+func (_c *MockStore_PruneBlocks_Call) Run(run func(ctx context.Context, height uint64)) *MockStore_PruneBlocks_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		var arg0 context.Context
+		if args[0] != nil {
+			arg0 = args[0].(context.Context)
+		}
+		var arg1 uint64
+		if args[1] != nil {
+			arg1 = args[1].(uint64)
+		}
+		run(
+			arg0,
+			arg1,
+		)
+	})
+	return _c
+}
+
+func (_c *MockStore_PruneBlocks_Call) Return(err error) *MockStore_PruneBlocks_Call {
+	_c.Call.Return(err)
+	return _c
+}
+
+func (_c *MockStore_PruneBlocks_Call) RunAndReturn(run func(ctx context.Context, height uint64) error) *MockStore_PruneBlocks_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // Rollback provides a mock function for the type MockStore
 func (_mock *MockStore) Rollback(ctx context.Context, height uint64, aggregator bool) error {
 	ret := _mock.Called(ctx, height, aggregator)
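With the regenerated mock, tests can set explicit expectations on pruning calls instead of relying on the old silent no-op. A brief usage sketch; the `NewMockStore(t)` constructor is mockery's usual generated helper and is assumed here, and the heights are arbitrary:

```go
package example

import (
	"context"
	"testing"

	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"

	"github.com/evstack/ev-node/test/mocks"
)

func TestPruneExpectations(t *testing.T) {
	// NewMockStore registers cleanup assertions; unmet expectations fail the test.
	mockStore := mocks.NewMockStore(t)

	mockStore.EXPECT().PruneBlocks(mock.Anything, uint64(140)).Return(nil)
	mockStore.EXPECT().DeleteStateAtHeight(mock.Anything, uint64(141)).Return(nil)

	// Exercise the expectations directly; in real tests the pruner makes these calls.
	require.NoError(t, mockStore.PruneBlocks(context.Background(), 140))
	require.NoError(t, mockStore.DeleteStateAtHeight(context.Background(), 141))
}
```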