Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement beacon db pruner #14687

Open
wants to merge 20 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- Log before blob filesystem cache warm-up.
- New design for the attestation pool. [PR](https://github.com/prysmaticlabs/prysm/pull/14324)
- Add field param placeholder for Electra blob target and max to pass spec tests.
- Add Beacon DB pruning service to prune historical data older than the weak subjectivity checkpoint.

### Changed

Expand Down
1 change: 1 addition & 0 deletions beacon-chain/db/iface/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type NoHeadAccessDatabase interface {
SaveLightClientBootstrap(ctx context.Context, blockRoot []byte, bootstrap interfaces.LightClientBootstrap) error

CleanUpDirtyStates(ctx context.Context, slotsPerArchivedPoint primitives.Slot) error
DeleteBlocksAndStatesBeforeSlot(ctx context.Context, slot primitives.Slot) error
}

// HeadAccessDatabase defines a struct with access to reading chain head data.
Expand Down
76 changes: 62 additions & 14 deletions beacon-chain/db/kv/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (s *Store) Blocks(ctx context.Context, f *filters.QueryFilter) ([]interface
err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(blocksBucket)

keys, err := blockRootsByFilter(ctx, tx, f)
keys, _, err := blockRootsByFilter(ctx, tx, f)
if err != nil {
return err
}
Expand Down Expand Up @@ -131,7 +131,7 @@ func (s *Store) BlockRoots(ctx context.Context, f *filters.QueryFilter) ([][32]b
defer span.End()
blockRoots := make([][32]byte, 0)
err := s.db.View(func(tx *bolt.Tx) error {
keys, err := blockRootsByFilter(ctx, tx, f)
keys, _, err := blockRootsByFilter(ctx, tx, f)
if err != nil {
return err
}
Expand Down Expand Up @@ -238,6 +238,52 @@ func (s *Store) DeleteBlock(ctx context.Context, root [32]byte) error {
})
}

// DeleteBlocksAndStatesBeforeSlot deletes all blocks and states before the given slot.
func (s *Store) DeleteBlocksAndStatesBeforeSlot(ctx context.Context, cutoffSlot primitives.Slot) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.DeleteBlocksAndStatesBeforeSlot")
defer span.End()

// Perform all deletions in a single transaction for atomicity
return s.db.Update(func(tx *bolt.Tx) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are not deleting everything you should. Looking at bucket names, there are also:

  • checkpointBucket (if this bucket stores what the name suggests, then if we remove an old checkpoint block, this bucket will still contain the checkpoint)
  • stateValidatorsBucket (I think this one is very important because validators are a big chunk of the state)
  • blockParentRootIndicesBucket
  • stateSlotIndicesBucket
  • finalizedBlockRootsIndexBucket
  • blockRootValidatorHashesBucket
  • originCheckpointBlockRootKey (I don't know the implications of removing this, it's best to ask @kasey about it. The bucket is used in a few places, but after the first pruning the origin checkpoint will "move". Shouldn't we update the value in the bucket accordingly?)

filter := filters.NewFilter().SetStartSlot(0).SetEndSlot(cutoffSlot)
_, rootsBySlot, err := blockRootsByFilter(ctx, tx, filter)
if err != nil {
return errors.Wrap(err, "could not retrieve block roots by filter")
}

for slot, roots := range rootsBySlot {
for _, root := range roots {
// Delete Block
if err = tx.Bucket(blocksBucket).Delete(root); err != nil {
return errors.Wrap(err, "could not delete block")
}

// Delete State
if err = tx.Bucket(stateBucket).Delete(root); err != nil {
return errors.Wrap(err, "could not delete state")
}

// Delete state summary
if err = tx.Bucket(stateSummaryBucket).Delete(root); err != nil {
return errors.Wrap(err, "could not delete state summary")
}

// Delete block from cache
s.blockCache.Del(string(root))
// Delete state summary from cache
s.stateSummaryCache.delete([32]byte(root))
Comment on lines +271 to +274
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might not be very important to fix, but if the transaction fails these deletions will not roll back. Maybe it would be better to store a slice of roots and then clear caches after the transaction completes? The downside of this is allocating the slice.

}

// Delete slot indices of a block root
if err = tx.Bucket(blockSlotIndicesBucket).Delete(bytesutil.SlotToBytesBigEndian(slot)); err != nil {
return errors.Wrap(err, "could not delete block slot index")
}
}

return nil
})
}

// SaveBlock to the db.
func (s *Store) SaveBlock(ctx context.Context, signed interfaces.ReadOnlySignedBeaconBlock) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveBlock")
Expand Down Expand Up @@ -590,26 +636,26 @@ func (s *Store) SaveRegistrationsByValidatorIDs(ctx context.Context, ids []primi
}

// blockRootsByFilter retrieves the block roots given the filter criteria.
func blockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFilter) ([][]byte, error) {
func blockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFilter) ([][]byte, map[primitives.Slot][][]byte, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems a bit wasteful to return the roots in a slice and map. Can you make this work with just the map?

ctx, span := trace.StartSpan(ctx, "BeaconDB.blockRootsByFilter")
defer span.End()

// If no filter criteria are specified, return an error.
if f == nil {
return nil, errors.New("must specify a filter criteria for retrieving blocks")
return nil, nil, errors.New("must specify a filter criteria for retrieving blocks")
}

// Creates a list of indices from the passed in filter values, such as:
// []byte("0x2093923") in the parent root indices bucket to be used for looking up
// block roots that were stored under each of those indices for O(1) lookup.
indicesByBucket, err := createBlockIndicesFromFilters(ctx, f)
if err != nil {
return nil, errors.Wrap(err, "could not determine lookup indices")
return nil, nil, errors.Wrap(err, "could not determine lookup indices")
}

// We retrieve block roots that match a filter criteria of slot ranges, if specified.
filtersMap := f.Filters()
rootsBySlotRange, err := blockRootsBySlotRange(
rootsBySlotRange, rootsBySlot, err := blockRootsBySlotRange(
ctx,
tx.Bucket(blockSlotIndicesBucket),
filtersMap[filters.StartSlot],
Expand All @@ -619,7 +665,7 @@ func blockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFilter
filtersMap[filters.SlotStep],
)
if err != nil {
return nil, err
return nil, nil, err
}

// Once we have a list of block roots that correspond to each
Expand All @@ -643,7 +689,7 @@ func blockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFilter
}
}

return keys, nil
return keys, rootsBySlot, nil
}

// blockRootsBySlotRange looks into a boltDB bucket and performs a binary search
Expand All @@ -653,13 +699,13 @@ func blockRootsBySlotRange(
ctx context.Context,
bkt *bolt.Bucket,
startSlotEncoded, endSlotEncoded, startEpochEncoded, endEpochEncoded, slotStepEncoded interface{},
) ([][]byte, error) {
) ([][]byte, map[primitives.Slot][][]byte, error) {
_, span := trace.StartSpan(ctx, "BeaconDB.blockRootsBySlotRange")
defer span.End()

// Return nothing when all slot parameters are missing
if startSlotEncoded == nil && endSlotEncoded == nil && startEpochEncoded == nil && endEpochEncoded == nil {
return [][]byte{}, nil
return [][]byte{}, nil, nil
}

var startSlot, endSlot primitives.Slot
Expand All @@ -680,11 +726,11 @@ func blockRootsBySlotRange(
if startEpochOk && endEpochOk {
startSlot, err = slots.EpochStart(startEpoch)
if err != nil {
return nil, err
return nil, nil, err
}
endSlot, err = slots.EpochStart(endEpoch)
if err != nil {
return nil, err
return nil, nil, err
}
endSlot = endSlot + params.BeaconConfig().SlotsPerEpoch - 1
}
Expand All @@ -695,10 +741,11 @@ func blockRootsBySlotRange(
return key != nil && bytes.Compare(key, max) <= 0
}
if endSlot < startSlot {
return nil, errInvalidSlotRange
return nil, nil, errInvalidSlotRange
}
rootsRange := endSlot.SubSlot(startSlot).Div(step)
roots := make([][]byte, 0, rootsRange)
rootsBySlot := make(map[primitives.Slot][][]byte)
c := bkt.Cursor()
for k, v := c.Seek(min); conditional(k, max); k, v = c.Next() {
if step > 1 {
Expand All @@ -713,8 +760,9 @@ func blockRootsBySlotRange(
splitRoots = append(splitRoots, v[i:i+32])
}
roots = append(roots, splitRoots...)
rootsBySlot[bytesutil.BytesToSlotBigEndian(k)] = roots
}
return roots, nil
return roots, rootsBySlot, nil
}

// blockRootsBySlot retrieves the block roots by slot
Expand Down
58 changes: 58 additions & 0 deletions beacon-chain/db/kv/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"testing"
"time"

"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"

"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filters"
Expand Down Expand Up @@ -353,6 +355,62 @@ func TestStore_DeleteFinalizedBlock(t *testing.T) {
require.NoError(t, db.SaveFinalizedCheckpoint(ctx, cp))
require.ErrorIs(t, db.DeleteBlock(ctx, root), ErrDeleteJustifiedAndFinalized)
}

// TestStore_DeleteBlocksAndStatesBeforeSlot saves blocks, states and state
// summaries covering four epochs, prunes with a cutoff of one epoch, and then
// verifies that the first epoch's entries are gone while every later entry is
// still present.
func TestStore_DeleteBlocksAndStatesBeforeSlot(t *testing.T) {
	slotsPerEpoch := uint64(params.BeaconConfig().SlotsPerEpoch)
	db := setupDB(t)
	ctx := context.Background()

	require.NoError(t, db.SaveGenesisBlockRoot(ctx, genesisBlockRoot))

	// We have blocks for 4 epochs.
	blks := makeBlocks(t, 0, slotsPerEpoch*4, genesisBlockRoot)
	require.NoError(t, db.SaveBlocks(ctx, blks))
	ss := make([]*ethpb.StateSummary, len(blks))
	// NOTE(review): sts is populated but never read afterwards in this test.
	sts := make([]*state.BeaconState, len(blks))
	for i, blk := range blks {
		r, err := blk.Block().HashTreeRoot()
		require.NoError(t, err)
		ss[i] = &ethpb.StateSummary{
			Slot: blk.Block().Slot(),
			Root: r[:],
		}

		st, err := util.NewBeaconState()
		require.NoError(t, err)
		require.NoError(t, db.SaveState(ctx, st, r))
		sts[i] = &st
	}
	require.NoError(t, db.SaveStateSummaries(ctx, ss))

	// Delete blocks of first epoch.
	require.NoError(t, db.DeleteBlocksAndStatesBeforeSlot(ctx, primitives.Slot(slotsPerEpoch)))

	// Check if we deleted the blocks successfully.
	for i := 0; i < int(slotsPerEpoch); i++ {
		root, err := blks[i].Block().HashTreeRoot()
		require.NoError(t, err)

		res, err := db.BlocksBySlot(ctx, blks[i].Block().Slot())
		require.NoError(t, err)
		require.Equal(t, 0, len(res))

		require.Equal(t, false, db.HasBlock(ctx, root))
		require.Equal(t, false, db.HasState(ctx, root))
		require.Equal(t, false, db.HasStateSummary(ctx, root))
	}

	// Check if we have rest of the blocks. Each iteration must inspect its own
	// block (blks[i]); indexing with a fixed position would only ever verify a
	// single surviving block.
	for i := slotsPerEpoch; i < slotsPerEpoch*4; i++ {
		root, err := blks[i].Block().HashTreeRoot()
		require.NoError(t, err)

		require.Equal(t, true, db.HasBlock(ctx, root))
		require.Equal(t, true, db.HasState(ctx, root))
		require.Equal(t, true, db.HasStateSummary(ctx, root))
	}
}

func TestStore_GenesisBlock(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
Expand Down
38 changes: 38 additions & 0 deletions beacon-chain/db/pruner/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Bazel build definitions for the beacon-chain/db/pruner Go package.
load("@prysm//tools/go:def.bzl", "go_library", "go_test")

# Library target built from pruner.go. Visibility is restricted to packages
# under //beacon-chain.
go_library(
name = "go_default_library",
srcs = ["pruner.go"],
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/pruner",
visibility = [
"//beacon-chain:__subpackages__",
],
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/iface:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//time/slots:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

# Test target for pruner_test.go; it embeds the library target declared above.
go_test(
name = "go_default_test",
srcs = ["pruner_test.go"],
embed = [":go_default_library"],
deps = [
"//beacon-chain/db/testing:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/primitives:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"//time/slots/testing:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],
)
Loading