diff --git a/core/state/pruner/pruner.go b/core/state/pruner/pruner.go
index 3604f6c94c..ce53715f27 100644
--- a/core/state/pruner/pruner.go
+++ b/core/state/pruner/pruner.go
@@ -24,8 +24,8 @@ import (
 	"math"
 	"os"
 	"path/filepath"
-	"runtime"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -38,6 +38,7 @@ import (
 	"github.com/ethereum/go-ethereum/params"
 	"github.com/ethereum/go-ethereum/rlp"
 	"github.com/ethereum/go-ethereum/trie"
+	"github.com/ethereum/go-ethereum/trie/triedb/hashdb"
 )
 
 const (
@@ -56,8 +57,10 @@ const (
 
 // Config includes all the configurations for pruning.
 type Config struct {
-	Datadir   string // The directory of the state database
-	BloomSize uint64 // The Megabytes of memory allocated to bloom-filter
+	Datadir        string // The directory of the state database
+	BloomSize      uint64 // The Megabytes of memory allocated to bloom-filter
+	Threads        int    // The maximum number of threads spawned in dumpRawTrieDescendants and removeOtherRoots
+	CleanCacheSize int    // The Megabytes of clean cache size used in dumpRawTrieDescendants
 }
 
 // Pruner is an offline tool to prune the stale state with the
@@ -107,6 +110,10 @@ func NewPruner(db ethdb.Database, config Config) (*Pruner, error) {
 	if err != nil {
 		return nil, err
 	}
+	// sanitize threads number, if set too low
+	if config.Threads <= 0 {
+		config.Threads = 1
+	}
 	return &Pruner{
 		config:      config,
 		chainHeader: headBlock.Header(),
@@ -124,7 +131,7 @@ func readStoredChainConfig(db ethdb.Database) *params.ChainConfig {
 	return rawdb.ReadChainConfig(db, block0Hash)
 }
 
-func removeOtherRoots(db ethdb.Database, rootsList []common.Hash, stateBloom *stateBloom) error {
+func removeOtherRoots(db ethdb.Database, rootsList []common.Hash, stateBloom *stateBloom, threads int) error {
 	chainConfig := readStoredChainConfig(db)
 	var genesisBlockNum uint64
 	if chainConfig != nil {
@@ -139,7 +146,6 @@ func removeOtherRoots(db ethdb.Database, rootsList []common.Hash, stateBloom *st
 		return errors.New("failed to load head block")
 	}
 	blockRange := headBlock.NumberU64() - genesisBlockNum
-	threads := runtime.NumCPU()
 	var wg sync.WaitGroup
 	errors := make(chan error, threads)
 	for thread := 0; thread < threads; thread++ {
@@ -207,7 +213,7 @@ func removeOtherRoots(db ethdb.Database, rootsList []common.Hash, stateBloom *st
 }
 
 // Arbitrum: snaptree and root are for the final snapshot kept
-func prune(snaptree *snapshot.Tree, allRoots []common.Hash, maindb ethdb.Database, stateBloom *stateBloom, bloomPath string, start time.Time) error {
+func prune(snaptree *snapshot.Tree, allRoots []common.Hash, maindb ethdb.Database, stateBloom *stateBloom, bloomPath string, start time.Time, threads int) error {
 	// Delete all stale trie nodes in the disk. With the help of state bloom
 	// the trie nodes(and codes) belong to the active state will be filtered
 	// out. A very small part of stale tries will also be filtered because of
@@ -297,7 +303,7 @@ func prune(snaptree *snapshot.Tree, allRoots []common.Hash, maindb ethdb.Databas
 	}
 
 	// Clean up any false positives that are top-level state roots.
-	err := removeOtherRoots(maindb, allRoots, stateBloom)
+	err := removeOtherRoots(maindb, allRoots, stateBloom, threads)
 	if err != nil {
 		return err
 	}
@@ -333,8 +339,16 @@ func prune(snaptree *snapshot.Tree, allRoots []common.Hash, maindb ethdb.Databas
 }
 
 // We assume state blooms do not need the value, only the key
-func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBloom) error {
-	sdb := state.NewDatabase(db)
+func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBloom, config *Config) error {
+	// Offline pruning is only supported in legacy hash based scheme.
+	hashConfig := *hashdb.Defaults
+	hashConfig.CleanCacheSize = config.CleanCacheSize * 1024 * 1024
+	trieConfig := &trie.Config{
+		Preimages: false,
+		HashDB:    &hashConfig,
+	}
+	sdb := state.NewDatabaseWithConfig(db, trieConfig)
+	defer sdb.TrieDB().Close()
 	tr, err := sdb.OpenTrie(root)
 	if err != nil {
 		return err
 	}
@@ -350,11 +364,12 @@ func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBl
 	// To do so, we create a semaphore out of a channel's buffer.
 	// Before launching a new goroutine, we acquire the semaphore by taking an entry from this channel.
 	// This channel doubles as a mechanism for the background goroutine to report an error on release.
-	threads := runtime.NumCPU()
+	threads := config.Threads
 	results := make(chan error, threads)
 	for i := 0; i < threads; i++ {
 		results <- nil
 	}
+	var threadsRunning atomic.Int32
 
 	for accountIt.Next(true) {
 		accountTrieHash := accountIt.Hash()
@@ -385,7 +400,10 @@ func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBl
 				output.Put(data.CodeHash, nil)
 			}
 			if data.Root != (common.Hash{}) {
-				storageTr, err := trie.NewStateTrie(trie.StorageTrieID(root, key, data.Root), sdb.TrieDB())
+				// note: we are passing data.Root as stateRoot here, to skip the check for stateRoot existence in trie.newTrieReader,
+				// we already check that when opening state trie and reading the account node
+				trieID := trie.StorageTrieID(data.Root, key, data.Root)
+				storageTr, err := trie.NewStateTrie(trieID, sdb.TrieDB())
 				if err != nil {
 					return err
 				}
@@ -394,14 +412,20 @@ func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBl
 					return err
 				}
 				go func() {
+					threadsRunning.Add(1)
+					defer threadsRunning.Add(-1)
 					var err error
 					defer func() {
 						results <- err
 					}()
+					threadStartedAt := time.Now()
+					threadLastLog := time.Now()
+
 					storageIt, err := storageTr.NodeIterator(nil)
 					if err != nil {
 						return
 					}
+					var processedNodes uint64
 					for storageIt.Next(true) {
 						storageTrieHash := storageIt.Hash()
 						if storageTrieHash != (common.Hash{}) {
@@ -411,6 +435,13 @@ func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBl
 								return
 							}
 						}
+						processedNodes++
+						if time.Since(threadLastLog) > 5*time.Minute {
+							elapsedTotal := time.Since(startedAt)
+							elapsedThread := time.Since(threadStartedAt)
+							log.Info("traversing trie database - traversing storage trie taking long", "key", key, "elapsedTotal", elapsedTotal, "elapsedThread", elapsedThread, "processedNodes", processedNodes, "threadsRunning", threadsRunning.Load())
+							threadLastLog = time.Now()
+						}
 					}
 					err = storageIt.Error()
 					if err != nil {
@@ -445,7 +476,7 @@ func (p *Pruner) Prune(inputRoots []common.Hash) error {
 		return err
 	}
 	if bloomExists {
-		return RecoverPruning(p.config.Datadir, p.db)
+		return RecoverPruning(p.config.Datadir, p.db, p.config.Threads)
 	}
 	// Retrieve all snapshot layers from the current HEAD.
 	// In theory there are 128 difflayers + 1 disk layer present,
@@ -511,14 +542,14 @@ func (p *Pruner) Prune(inputRoots []common.Hash) error {
 				return err
 			}
 		} else {
-			if err := dumpRawTrieDescendants(p.db, root, p.stateBloom); err != nil {
+			if err := dumpRawTrieDescendants(p.db, root, p.stateBloom, &p.config); err != nil {
 				return err
 			}
 		}
 	}
 
 	// Traverse the genesis, put all genesis state entries into the
 	// bloom filter too.
-	if err := extractGenesis(p.db, p.stateBloom); err != nil {
+	if err := extractGenesis(p.db, p.stateBloom, &p.config); err != nil {
 		return err
 	}
@@ -529,7 +560,7 @@ func (p *Pruner) Prune(inputRoots []common.Hash) error {
 		return err
 	}
 	log.Info("State bloom filter committed", "name", filterName, "roots", roots)
-	return prune(p.snaptree, roots, p.db, p.stateBloom, filterName, start)
+	return prune(p.snaptree, roots, p.db, p.stateBloom, filterName, start, p.config.Threads)
 }
 
 // RecoverPruning will resume the pruning procedure during the system restart.
@@ -539,7 +570,7 @@ func (p *Pruner) Prune(inputRoots []common.Hash) error {
 // pruning can be resumed. What's more if the bloom filter is constructed, the
 // pruning **has to be resumed**. Otherwise a lot of dangling nodes may be left
 // in the disk.
-func RecoverPruning(datadir string, db ethdb.Database) error {
+func RecoverPruning(datadir string, db ethdb.Database, threads int) error {
 	exists, err := bloomFilterExists(datadir)
 	if err != nil {
 		return err
@@ -578,12 +609,12 @@ func RecoverPruning(datadir string, db ethdb.Database) error {
 	}
 	log.Info("Loaded state bloom filter", "path", stateBloomPath, "roots", stateBloomRoots)
-	return prune(snaptree, stateBloomRoots, db, stateBloom, stateBloomPath, time.Now())
+	return prune(snaptree, stateBloomRoots, db, stateBloom, stateBloomPath, time.Now(), threads)
 }
 
 // extractGenesis loads the genesis state and commits all the state entries
 // into the given bloomfilter.
-func extractGenesis(db ethdb.Database, stateBloom *stateBloom) error {
+func extractGenesis(db ethdb.Database, stateBloom *stateBloom, config *Config) error {
 	genesisHash := rawdb.ReadCanonicalHash(db, 0)
 	if genesisHash == (common.Hash{}) {
 		return errors.New("missing genesis hash")
 	}
@@ -593,7 +624,7 @@ func extractGenesis(db ethdb.Database, stateBloom *stateBloom) error {
 		return errors.New("missing genesis block")
 	}
 
-	return dumpRawTrieDescendants(db, genesis.Root(), stateBloom)
+	return dumpRawTrieDescendants(db, genesis.Root(), stateBloom, config)
 }
 
 func bloomFilterPath(datadir string) string {
diff --git a/eth/backend.go b/eth/backend.go
index a8046dacd7..924aa71de1 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -140,7 +140,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
 	}
 	// Try to recover offline state pruning only in hash-based.
 	if scheme == rawdb.HashScheme {
-		if err := pruner.RecoverPruning(stack.ResolvePath(""), chainDb); err != nil {
+		if err := pruner.RecoverPruning(stack.ResolvePath(""), chainDb, 1); err != nil {
 			log.Error("Failed to recover state", "error", err)
 		}
 	}
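
Note on the concurrency pattern (commentary, not part of the patch): dumpRawTrieDescendants bounds its goroutine count with a buffered channel used as a semaphore. Each buffered entry is a permit, and the value of the entry doubles as the error report of the goroutine that released it, which is why switching from runtime.NumCPU() to config.Threads only needs to change the buffer size. A minimal standalone sketch of that pattern follows; process, firstErr, and the item count are illustrative names that do not appear in the patch.

// sketch.go: channel-as-semaphore pattern, capped at `threads` goroutines.
package main

import (
	"errors"
	"fmt"
)

// process stands in for the per-storage-trie work; it is made up here.
func process(item int) error {
	if item == 3 {
		return errors.New("boom")
	}
	return nil
}

func main() {
	threads := 4
	// Buffered channel doubling as a semaphore: every entry is a permit,
	// and its value carries the error of the goroutine that released it.
	results := make(chan error, threads)
	for i := 0; i < threads; i++ {
		results <- nil
	}

	var firstErr error
	for item := 0; item < 10; item++ {
		// Acquire a permit; a non-nil value is an error reported by an
		// earlier goroutine on release.
		if err := <-results; err != nil && firstErr == nil {
			firstErr = err
		}
		go func(item int) {
			var err error
			defer func() { results <- err }() // release the permit and report
			err = process(item)
		}(item)
	}
	// Drain every permit so all outstanding goroutines have finished.
	for i := 0; i < threads; i++ {
		if err := <-results; err != nil && firstErr == nil {
			firstErr = err
		}
	}
	fmt.Println("first error:", firstErr)
}

Draining all permits at the end is what guarantees every outstanding goroutine has completed before returning, so the same channel serves both as throttle and as error transport.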
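On the caller side, the new Config fields are plain values, and NewPruner clamps Threads to a minimum of 1. A hypothetical caller might wire them up as below; the helper name, package name, and sizes are illustrative and not taken from the patch (note that eth/backend.go above deliberately passes a conservative threads value of 1 to RecoverPruning at startup).

// prunersetup.go: hypothetical wiring of the new Config fields.
package prunersetup

import (
	"runtime"

	"github.com/ethereum/go-ethereum/core/state/pruner"
	"github.com/ethereum/go-ethereum/ethdb"
)

// newOfflinePruner fills the new fields; the numbers are examples only.
func newOfflinePruner(db ethdb.Database, datadir string) (*pruner.Pruner, error) {
	cfg := pruner.Config{
		Datadir:        datadir,          // directory of the state database
		BloomSize:      2048,             // MB allocated to the state bloom
		Threads:        runtime.NumCPU(), // matches the pre-patch default
		CleanCacheSize: 600,              // MB of hashdb clean cache
	}
	// NewPruner sanitizes Threads <= 0 to 1, so a zero value is also safe.
	return pruner.NewPruner(db, cfg)
}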