
Commit

Merge branch 'master' into add-stopopcode-flatcalltracer
ganeshvanahalli authored Jun 17, 2024
2 parents abd6c47 + 8516902 commit b466858
Showing 2 changed files with 51 additions and 20 deletions.
69 changes: 50 additions & 19 deletions core/state/pruner/pruner.go
@@ -24,8 +24,8 @@ import (
"math"
"os"
"path/filepath"
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
@@ -38,6 +38,7 @@ import (
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/trie/triedb/hashdb"
)

const (
@@ -56,8 +57,10 @@ const (

// Config includes all the configurations for pruning.
type Config struct {
Datadir string // The directory of the state database
BloomSize uint64 // The Megabytes of memory allocated to bloom-filter
Datadir string // The directory of the state database
BloomSize uint64 // The Megabytes of memory allocated to bloom-filter
Threads int // The maximum number of threads spawned in dumpRawTrieDescendants and removeOtherRoots
CleanCacheSize int // The Megabytes of clean cache size used in dumpRawTrieDescendants
}

// Pruner is an offline tool to prune the stale state with the
@@ -107,6 +110,10 @@ func NewPruner(db ethdb.Database, config Config) (*Pruner, error) {
if err != nil {
return nil, err
}
// sanitize the thread count if it is set too low
if config.Threads <= 0 {
config.Threads = 1
}
return &Pruner{
config: config,
chainHeader: headBlock.Header(),
@@ -124,7 +131,7 @@ func readStoredChainConfig(db ethdb.Database) *params.ChainConfig {
return rawdb.ReadChainConfig(db, block0Hash)
}

func removeOtherRoots(db ethdb.Database, rootsList []common.Hash, stateBloom *stateBloom) error {
func removeOtherRoots(db ethdb.Database, rootsList []common.Hash, stateBloom *stateBloom, threads int) error {
chainConfig := readStoredChainConfig(db)
var genesisBlockNum uint64
if chainConfig != nil {
@@ -139,7 +146,6 @@ func removeOtherRoots(db ethdb.Database, rootsList []common.Hash, stateBloom *st
return errors.New("failed to load head block")
}
blockRange := headBlock.NumberU64() - genesisBlockNum
threads := runtime.NumCPU()
var wg sync.WaitGroup
errors := make(chan error, threads)
for thread := 0; thread < threads; thread++ {
@@ -207,7 +213,7 @@ func removeOtherRoots(db ethdb.Database, rootsList []common.Hash, stateBloom *st
}

// Arbitrum: snaptree and root are for the final snapshot kept
func prune(snaptree *snapshot.Tree, allRoots []common.Hash, maindb ethdb.Database, stateBloom *stateBloom, bloomPath string, start time.Time) error {
func prune(snaptree *snapshot.Tree, allRoots []common.Hash, maindb ethdb.Database, stateBloom *stateBloom, bloomPath string, start time.Time, threads int) error {
// Delete all stale trie nodes on disk. With the help of the state bloom,
// the trie nodes (and codes) belonging to the active state will be filtered
// out. A very small part of stale tries will also be filtered because of
@@ -297,7 +303,7 @@ func prune(snaptree *snapshot.Tree, allRoots []common.Hash, maindb ethdb.Databas
}

// Clean up any false positives that are top-level state roots.
err := removeOtherRoots(maindb, allRoots, stateBloom)
err := removeOtherRoots(maindb, allRoots, stateBloom, threads)
if err != nil {
return err
}
@@ -333,8 +339,16 @@ func prune(snaptree *snapshot.Tree, allRoots []common.Hash, maindb ethdb.Databas
}

// We assume state blooms do not need the value, only the key
func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBloom) error {
sdb := state.NewDatabase(db)
func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBloom, config *Config) error {
// Offline pruning is only supported in the legacy hash-based scheme.
hashConfig := *hashdb.Defaults
hashConfig.CleanCacheSize = config.CleanCacheSize * 1024 * 1024
trieConfig := &trie.Config{
Preimages: false,
HashDB: &hashConfig,
}
sdb := state.NewDatabaseWithConfig(db, trieConfig)
defer sdb.TrieDB().Close()
tr, err := sdb.OpenTrie(root)
if err != nil {
return err
@@ -350,11 +364,12 @@ func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBl
// To do so, we create a semaphore out of a channel's buffer.
// Before launching a new goroutine, we acquire the semaphore by taking an entry from this channel.
// This channel doubles as a mechanism for the background goroutine to report an error on release.
threads := runtime.NumCPU()
threads := config.Threads
results := make(chan error, threads)
for i := 0; i < threads; i++ {
results <- nil
}
var threadsRunning atomic.Int32

for accountIt.Next(true) {
accountTrieHash := accountIt.Hash()
@@ -385,7 +400,10 @@ func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBl
output.Put(data.CodeHash, nil)
}
if data.Root != (common.Hash{}) {
storageTr, err := trie.NewStateTrie(trie.StorageTrieID(root, key, data.Root), sdb.TrieDB())
// note: we pass data.Root as the stateRoot here to skip the check for stateRoot existence in trie.newTrieReader,
// since that is already verified when opening the state trie and reading the account node
trieID := trie.StorageTrieID(data.Root, key, data.Root)
storageTr, err := trie.NewStateTrie(trieID, sdb.TrieDB())
if err != nil {
return err
}
@@ -394,14 +412,20 @@ func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBl
return err
}
go func() {
threadsRunning.Add(1)
defer threadsRunning.Add(-1)
var err error
defer func() {
results <- err
}()
threadStartedAt := time.Now()
threadLastLog := time.Now()

storageIt, err := storageTr.NodeIterator(nil)
if err != nil {
return
}
var processedNodes uint64
for storageIt.Next(true) {
storageTrieHash := storageIt.Hash()
if storageTrieHash != (common.Hash{}) {
@@ -411,6 +435,13 @@ func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBl
return
}
}
processedNodes++
if time.Since(threadLastLog) > 5*time.Minute {
elapsedTotal := time.Since(startedAt)
elapsedThread := time.Since(threadStartedAt)
log.Info("traversing trie database - traversing storage trie taking long", "key", key, "elapsedTotal", elapsedTotal, "elapsedThread", elapsedThread, "processedNodes", processedNodes, "threadsRunning", threadsRunning.Load())
threadLastLog = time.Now()
}
}
err = storageIt.Error()
if err != nil {
@@ -445,7 +476,7 @@ func (p *Pruner) Prune(inputRoots []common.Hash) error {
return err
}
if bloomExists {
return RecoverPruning(p.config.Datadir, p.db)
return RecoverPruning(p.config.Datadir, p.db, p.config.Threads)
}
// Retrieve all snapshot layers from the current HEAD.
// In theory there are 128 difflayers + 1 disk layer present,
@@ -511,14 +542,14 @@ func (p *Pruner) Prune(inputRoots []common.Hash) error {
return err
}
} else {
if err := dumpRawTrieDescendants(p.db, root, p.stateBloom); err != nil {
if err := dumpRawTrieDescendants(p.db, root, p.stateBloom, &p.config); err != nil {
return err
}
}
}
// Traverse the genesis, put all genesis state entries into the
// bloom filter too.
if err := extractGenesis(p.db, p.stateBloom); err != nil {
if err := extractGenesis(p.db, p.stateBloom, &p.config); err != nil {
return err
}

@@ -529,7 +560,7 @@ func (p *Pruner) Prune(inputRoots []common.Hash) error {
return err
}
log.Info("State bloom filter committed", "name", filterName, "roots", roots)
return prune(p.snaptree, roots, p.db, p.stateBloom, filterName, start)
return prune(p.snaptree, roots, p.db, p.stateBloom, filterName, start, p.config.Threads)
}

// RecoverPruning will resume the pruning procedure during the system restart.
@@ -539,7 +570,7 @@ func (p *Pruner) Prune(inputRoots []common.Hash) error {
// pruning can be resumed. What's more, if the bloom filter is constructed, the
// pruning **has to be resumed**. Otherwise a lot of dangling nodes may be left
// on disk.
func RecoverPruning(datadir string, db ethdb.Database) error {
func RecoverPruning(datadir string, db ethdb.Database, threads int) error {
exists, err := bloomFilterExists(datadir)
if err != nil {
return err
@@ -578,12 +609,12 @@ func RecoverPruning(datadir string, db ethdb.Database) error {
}
log.Info("Loaded state bloom filter", "path", stateBloomPath, "roots", stateBloomRoots)

return prune(snaptree, stateBloomRoots, db, stateBloom, stateBloomPath, time.Now())
return prune(snaptree, stateBloomRoots, db, stateBloom, stateBloomPath, time.Now(), threads)
}

// extractGenesis loads the genesis state and commits all the state entries
// into the given bloomfilter.
func extractGenesis(db ethdb.Database, stateBloom *stateBloom) error {
func extractGenesis(db ethdb.Database, stateBloom *stateBloom, config *Config) error {
genesisHash := rawdb.ReadCanonicalHash(db, 0)
if genesisHash == (common.Hash{}) {
return errors.New("missing genesis hash")
@@ -593,7 +624,7 @@ func extractGenesis(db ethdb.Database, stateBloom *stateBloom) error {
return errors.New("missing genesis block")
}

return dumpRawTrieDescendants(db, genesis.Root(), stateBloom)
return dumpRawTrieDescendants(db, genesis.Root(), stateBloom, config)
}

func bloomFilterPath(datadir string) string {
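The heart of this file's change is the bounded-concurrency traversal in dumpRawTrieDescendants: a buffered channel pre-filled with nil values acts both as a semaphore and as the path by which each worker hands its error back, while the new atomic threadsRunning counter feeds the periodic progress log. The sketch below is illustrative only (standard library, not part of the commit; all names are made up) and reduces that pattern to its essentials:

// Illustrative sketch only (not from the commit): the channel-as-semaphore
// pattern used by dumpRawTrieDescendants, reduced to the standard library.
package main

import (
	"fmt"
	"log"
	"sync/atomic"
	"time"
)

// processItem stands in for walking one storage trie.
func processItem(i int) error {
	time.Sleep(10 * time.Millisecond)
	return nil
}

func main() {
	const threads = 4
	const items = 100

	// One nil per slot: receiving acquires the semaphore, sending releases it
	// and doubles as the worker's error report.
	results := make(chan error, threads)
	for i := 0; i < threads; i++ {
		results <- nil
	}
	var running atomic.Int32
	startedAt := time.Now()

	for i := 0; i < items; i++ {
		// Acquire a slot; stop scheduling work if an earlier worker failed.
		if err := <-results; err != nil {
			log.Fatal(err)
		}
		i := i
		go func() {
			running.Add(1)
			defer running.Add(-1)
			var err error
			defer func() { results <- err }() // release the slot, report the error
			err = processItem(i)
		}()
	}

	// Drain all slots: once every slot is back, every worker has finished.
	for i := 0; i < threads; i++ {
		if err := <-results; err != nil {
			log.Fatal(err)
		}
	}
	fmt.Println("done", "elapsed", time.Since(startedAt), "stillRunning", running.Load())
}

Draining the channel at the end both waits for every worker and surfaces the first error, so a single primitive covers what would otherwise need a sync.WaitGroup plus a separate error channel.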
2 changes: 1 addition & 1 deletion eth/backend.go
@@ -140,7 +140,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
}
// Try to recover offline state pruning only in hash-based.
if scheme == rawdb.HashScheme {
if err := pruner.RecoverPruning(stack.ResolvePath(""), chainDb); err != nil {
if err := pruner.RecoverPruning(stack.ResolvePath(""), chainDb, 1); err != nil {
log.Error("Failed to recover state", "error", err)
}
}
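For orientation, here is a hypothetical caller-side sketch (not part of this commit) of how the new Config fields and the thread plumbing fit together. The package path follows the imports shown in the diff, and the concrete BloomSize, Threads, and CleanCacheSize values are assumptions, not recommendations:

// Hypothetical caller, not part of the commit: wires up the new Threads and
// CleanCacheSize fields introduced in core/state/pruner.
package prunedemo

import (
	"runtime"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/state/pruner"
	"github.com/ethereum/go-ethereum/ethdb"
)

// pruneOffline prunes all state except the given roots from db.
func pruneOffline(db ethdb.Database, datadir string, roots []common.Hash) error {
	p, err := pruner.NewPruner(db, pruner.Config{
		Datadir:        datadir,
		BloomSize:      2048,             // MB for the state bloom filter (assumed value)
		Threads:        runtime.NumCPU(), // traversal workers; values <= 0 are bumped to 1
		CleanCacheSize: 512,              // MB of hashdb clean cache for dumpRawTrieDescendants (assumed value)
	})
	if err != nil {
		return err
	}
	// Prune resumes a previously interrupted run itself: if a bloom filter is
	// already on disk it calls RecoverPruning(datadir, db, config.Threads).
	return p.Prune(roots)
}

Note that the eth/backend.go hunk above passes a thread count of 1 to RecoverPruning, so a prune resumed at node startup runs single-threaded, while Prune itself resumes with config.Threads when it finds an existing bloom filter.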
