-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement beacon db pruner #14687
base: develop
Are you sure you want to change the base?
Implement beacon db pruner #14687
Changes from all commits
b001104
e466cb4
bf5e898
9c30fdb
eddcfdb
4aa0bad
1cb73b6
d854375
9cb2195
1c8ca8d
2b3a2d7
aee4684
b373d0d
073e633
11b7ee8
a776e55
742fcd4
a510979
a2038ce
599f8be
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -102,7 +102,7 @@ func (s *Store) Blocks(ctx context.Context, f *filters.QueryFilter) ([]interface | |
err := s.db.View(func(tx *bolt.Tx) error { | ||
bkt := tx.Bucket(blocksBucket) | ||
|
||
keys, err := blockRootsByFilter(ctx, tx, f) | ||
keys, _, err := blockRootsByFilter(ctx, tx, f) | ||
if err != nil { | ||
return err | ||
} | ||
|
@@ -131,7 +131,7 @@ func (s *Store) BlockRoots(ctx context.Context, f *filters.QueryFilter) ([][32]b | |
defer span.End() | ||
blockRoots := make([][32]byte, 0) | ||
err := s.db.View(func(tx *bolt.Tx) error { | ||
keys, err := blockRootsByFilter(ctx, tx, f) | ||
keys, _, err := blockRootsByFilter(ctx, tx, f) | ||
if err != nil { | ||
return err | ||
} | ||
|
@@ -238,6 +238,52 @@ func (s *Store) DeleteBlock(ctx context.Context, root [32]byte) error { | |
}) | ||
} | ||
|
||
// DeleteBlocksAndStatesBeforeSlot deletes all blocks and states before the given slot. | ||
func (s *Store) DeleteBlocksAndStatesBeforeSlot(ctx context.Context, cutoffSlot primitives.Slot) error { | ||
ctx, span := trace.StartSpan(ctx, "BeaconDB.DeleteBlocksAndStatesBeforeSlot") | ||
defer span.End() | ||
|
||
// Perform all deletions in a single transaction for atomicity | ||
return s.db.Update(func(tx *bolt.Tx) error { | ||
filter := filters.NewFilter().SetStartSlot(0).SetEndSlot(cutoffSlot) | ||
_, rootsBySlot, err := blockRootsByFilter(ctx, tx, filter) | ||
if err != nil { | ||
return errors.Wrap(err, "could not retrieve block roots by filter") | ||
} | ||
|
||
for slot, roots := range rootsBySlot { | ||
for _, root := range roots { | ||
// Delete Block | ||
if err = tx.Bucket(blocksBucket).Delete(root); err != nil { | ||
return errors.Wrap(err, "could not delete block") | ||
} | ||
|
||
// Delete State | ||
if err = tx.Bucket(stateBucket).Delete(root); err != nil { | ||
return errors.Wrap(err, "could not delete state") | ||
} | ||
|
||
// Delete state summary | ||
if err = tx.Bucket(stateSummaryBucket).Delete(root); err != nil { | ||
return errors.Wrap(err, "could not delete state summary") | ||
} | ||
|
||
// Delete block from cache | ||
s.blockCache.Del(string(root)) | ||
// Delete state summary from cache | ||
s.stateSummaryCache.delete([32]byte(root)) | ||
Comment on lines
+271
to
+274
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This might not be very important to fix, but if the transaction fails these deletions will not roll back. Maybe it would be better to store a slice of roots and then clear caches after the transaction completes? The downside of this is allocating the slice. |
||
} | ||
|
||
// Delete slot indices of a block root | ||
if err = tx.Bucket(blockSlotIndicesBucket).Delete(bytesutil.SlotToBytesBigEndian(slot)); err != nil { | ||
return errors.Wrap(err, "could not delete block slot index") | ||
} | ||
} | ||
|
||
return nil | ||
}) | ||
} | ||
|
||
// SaveBlock to the db. | ||
func (s *Store) SaveBlock(ctx context.Context, signed interfaces.ReadOnlySignedBeaconBlock) error { | ||
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveBlock") | ||
|
@@ -590,26 +636,26 @@ func (s *Store) SaveRegistrationsByValidatorIDs(ctx context.Context, ids []primi | |
} | ||
|
||
// blockRootsByFilter retrieves the block roots given the filter criteria. | ||
func blockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFilter) ([][]byte, error) { | ||
func blockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFilter) ([][]byte, map[primitives.Slot][][]byte, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems a bit wasteful to return the roots in a slice and map. Can you make this work with just the map? |
||
ctx, span := trace.StartSpan(ctx, "BeaconDB.blockRootsByFilter") | ||
defer span.End() | ||
|
||
// If no filter criteria are specified, return an error. | ||
if f == nil { | ||
return nil, errors.New("must specify a filter criteria for retrieving blocks") | ||
return nil, nil, errors.New("must specify a filter criteria for retrieving blocks") | ||
} | ||
|
||
// Creates a list of indices from the passed in filter values, such as: | ||
// []byte("0x2093923") in the parent root indices bucket to be used for looking up | ||
// block roots that were stored under each of those indices for O(1) lookup. | ||
indicesByBucket, err := createBlockIndicesFromFilters(ctx, f) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "could not determine lookup indices") | ||
return nil, nil, errors.Wrap(err, "could not determine lookup indices") | ||
} | ||
|
||
// We retrieve block roots that match a filter criteria of slot ranges, if specified. | ||
filtersMap := f.Filters() | ||
rootsBySlotRange, err := blockRootsBySlotRange( | ||
rootsBySlotRange, rootsBySlot, err := blockRootsBySlotRange( | ||
ctx, | ||
tx.Bucket(blockSlotIndicesBucket), | ||
filtersMap[filters.StartSlot], | ||
|
@@ -619,7 +665,7 @@ func blockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFilter | |
filtersMap[filters.SlotStep], | ||
) | ||
if err != nil { | ||
return nil, err | ||
return nil, nil, err | ||
} | ||
|
||
// Once we have a list of block roots that correspond to each | ||
|
@@ -643,7 +689,7 @@ func blockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFilter | |
} | ||
} | ||
|
||
return keys, nil | ||
return keys, rootsBySlot, nil | ||
} | ||
|
||
// blockRootsBySlotRange looks into a boltDB bucket and performs a binary search | ||
|
@@ -653,13 +699,13 @@ func blockRootsBySlotRange( | |
ctx context.Context, | ||
bkt *bolt.Bucket, | ||
startSlotEncoded, endSlotEncoded, startEpochEncoded, endEpochEncoded, slotStepEncoded interface{}, | ||
) ([][]byte, error) { | ||
) ([][]byte, map[primitives.Slot][][]byte, error) { | ||
_, span := trace.StartSpan(ctx, "BeaconDB.blockRootsBySlotRange") | ||
defer span.End() | ||
|
||
// Return nothing when all slot parameters are missing | ||
if startSlotEncoded == nil && endSlotEncoded == nil && startEpochEncoded == nil && endEpochEncoded == nil { | ||
return [][]byte{}, nil | ||
return [][]byte{}, nil, nil | ||
} | ||
|
||
var startSlot, endSlot primitives.Slot | ||
|
@@ -680,11 +726,11 @@ func blockRootsBySlotRange( | |
if startEpochOk && endEpochOk { | ||
startSlot, err = slots.EpochStart(startEpoch) | ||
if err != nil { | ||
return nil, err | ||
return nil, nil, err | ||
} | ||
endSlot, err = slots.EpochStart(endEpoch) | ||
if err != nil { | ||
return nil, err | ||
return nil, nil, err | ||
} | ||
endSlot = endSlot + params.BeaconConfig().SlotsPerEpoch - 1 | ||
} | ||
|
@@ -695,10 +741,11 @@ func blockRootsBySlotRange( | |
return key != nil && bytes.Compare(key, max) <= 0 | ||
} | ||
if endSlot < startSlot { | ||
return nil, errInvalidSlotRange | ||
return nil, nil, errInvalidSlotRange | ||
} | ||
rootsRange := endSlot.SubSlot(startSlot).Div(step) | ||
roots := make([][]byte, 0, rootsRange) | ||
rootsBySlot := make(map[primitives.Slot][][]byte) | ||
c := bkt.Cursor() | ||
for k, v := c.Seek(min); conditional(k, max); k, v = c.Next() { | ||
if step > 1 { | ||
|
@@ -713,8 +760,9 @@ func blockRootsBySlotRange( | |
splitRoots = append(splitRoots, v[i:i+32]) | ||
} | ||
roots = append(roots, splitRoots...) | ||
rootsBySlot[bytesutil.BytesToSlotBigEndian(k)] = roots | ||
} | ||
return roots, nil | ||
return roots, rootsBySlot, nil | ||
} | ||
|
||
// blockRootsBySlot retrieves the block roots by slot | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
load("@prysm//tools/go:def.bzl", "go_library", "go_test") | ||
|
||
go_library( | ||
name = "go_default_library", | ||
srcs = ["pruner.go"], | ||
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/pruner", | ||
visibility = [ | ||
"//beacon-chain:__subpackages__", | ||
], | ||
deps = [ | ||
"//beacon-chain/core/helpers:go_default_library", | ||
"//beacon-chain/db:go_default_library", | ||
"//beacon-chain/db/iface:go_default_library", | ||
"//config/params:go_default_library", | ||
"//consensus-types/primitives:go_default_library", | ||
"//time/slots:go_default_library", | ||
"@com_github_pkg_errors//:go_default_library", | ||
"@com_github_sirupsen_logrus//:go_default_library", | ||
], | ||
) | ||
|
||
go_test( | ||
name = "go_default_test", | ||
srcs = ["pruner_test.go"], | ||
embed = [":go_default_library"], | ||
deps = [ | ||
"//beacon-chain/db/testing:go_default_library", | ||
"//config/params:go_default_library", | ||
"//consensus-types/blocks:go_default_library", | ||
"//consensus-types/primitives:go_default_library", | ||
"//proto/prysm/v1alpha1:go_default_library", | ||
"//testing/require:go_default_library", | ||
"//testing/util:go_default_library", | ||
"//time/slots/testing:go_default_library", | ||
"@com_github_sirupsen_logrus//:go_default_library", | ||
"@com_github_sirupsen_logrus//hooks/test:go_default_library", | ||
], | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you are not deleting everything you should. Looking at bucket names, there are also:
checkpointBucket
(if this bucket stores what the name suggests, then if we remove an old checkpoint block, this bucket will still contain the checkpoint)stateValidatorsBucket
(I think this one is very important because validators are a big chunk of the state)blockParentRootIndicesBucket
stateSlotIndicesBucket
finalizedBlockRootsIndexBucket
blockRootValidatorHashesBucket
originCheckpointBlockRootKey
(I don't know the implications of removing this, it's best to ask @kasey about it. The bucket is used in a few places, but after the first pruning the origin checkpoint will "move". Shouldn't we update the value in the bucket accordingly?)