From 660c03975dedc4ec3e36c3f5835db00165b6fc59 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Mon, 29 Jul 2024 16:05:49 +0530 Subject: [PATCH] merge blockchain.go --- core/blockchain.go | 388 ++++++++++++++++++++------------------------- 1 file changed, 169 insertions(+), 219 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 8cf4d00d1a..7172475270 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -38,6 +38,7 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state/snapshot" + "github.com/ethereum/go-ethereum/core/tracing" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" @@ -51,7 +52,6 @@ import ( "github.com/ethereum/go-ethereum/triedb" "github.com/ethereum/go-ethereum/triedb/hashdb" "github.com/ethereum/go-ethereum/triedb/pathdb" - "golang.org/x/exp/slices" ) var ( @@ -63,26 +63,26 @@ var ( chainInfoGauge = metrics.NewRegisteredGaugeInfo("chain/info", nil) - accountReadTimer = metrics.NewRegisteredTimer("chain/account/reads", nil) - accountHashTimer = metrics.NewRegisteredTimer("chain/account/hashes", nil) - accountUpdateTimer = metrics.NewRegisteredTimer("chain/account/updates", nil) - accountCommitTimer = metrics.NewRegisteredTimer("chain/account/commits", nil) + accountReadTimer = metrics.NewRegisteredResettingTimer("chain/account/reads", nil) + accountHashTimer = metrics.NewRegisteredResettingTimer("chain/account/hashes", nil) + accountUpdateTimer = metrics.NewRegisteredResettingTimer("chain/account/updates", nil) + accountCommitTimer = metrics.NewRegisteredResettingTimer("chain/account/commits", nil) - storageReadTimer = metrics.NewRegisteredTimer("chain/storage/reads", nil) - storageHashTimer = metrics.NewRegisteredTimer("chain/storage/hashes", nil) - storageUpdateTimer = metrics.NewRegisteredTimer("chain/storage/updates", nil) - storageCommitTimer = metrics.NewRegisteredTimer("chain/storage/commits", nil) + storageReadTimer = metrics.NewRegisteredResettingTimer("chain/storage/reads", nil) + storageHashTimer = metrics.NewRegisteredResettingTimer("chain/storage/hashes", nil) + storageUpdateTimer = metrics.NewRegisteredResettingTimer("chain/storage/updates", nil) + storageCommitTimer = metrics.NewRegisteredResettingTimer("chain/storage/commits", nil) - snapshotAccountReadTimer = metrics.NewRegisteredTimer("chain/snapshot/account/reads", nil) - snapshotStorageReadTimer = metrics.NewRegisteredTimer("chain/snapshot/storage/reads", nil) - snapshotCommitTimer = metrics.NewRegisteredTimer("chain/snapshot/commits", nil) + snapshotAccountReadTimer = metrics.NewRegisteredResettingTimer("chain/snapshot/account/reads", nil) + snapshotStorageReadTimer = metrics.NewRegisteredResettingTimer("chain/snapshot/storage/reads", nil) + snapshotCommitTimer = metrics.NewRegisteredResettingTimer("chain/snapshot/commits", nil) - triedbCommitTimer = metrics.NewRegisteredTimer("chain/triedb/commits", nil) + triedbCommitTimer = metrics.NewRegisteredResettingTimer("chain/triedb/commits", nil) - blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil) - blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil) - blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil) - blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil) + blockInsertTimer = metrics.NewRegisteredResettingTimer("chain/inserts", nil) + blockValidationTimer = 
metrics.NewRegisteredResettingTimer("chain/validation", nil) + blockExecutionTimer = metrics.NewRegisteredResettingTimer("chain/execution", nil) + blockWriteTimer = metrics.NewRegisteredResettingTimer("chain/write", nil) blockReorgMeter = metrics.NewRegisteredMeter("chain/reorg/executes", nil) blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil) @@ -102,8 +102,6 @@ const ( blockCacheLimit = 256 receiptsCacheLimit = 32 txLookupCacheLimit = 1024 - maxFutureBlocks = 256 - maxTimeFutureBlocks = 30 DefaultTriesInMemory = 128 // BlockChainVersion ensures that an incompatible database forces a resync from scratch. @@ -159,13 +157,16 @@ type CacheConfig struct { } // arbitrum: exposing CacheConfig.triedbConfig to be used by Nitro when initializing arbos in database -func (c *CacheConfig) TriedbConfig() *triedb.Config { - return c.triedbConfig() +func (c *CacheConfig) TriedbConfig(sVerkle bool) *triedb.Config { + return c.triedbConfig(sVerkle) } // triedbConfig derives the configures for trie database. -func (c *CacheConfig) triedbConfig() *triedb.Config { - config := &triedb.Config{Preimages: c.Preimages} +func (c *CacheConfig) triedbConfig(isVerkle bool) *triedb.Config { + config := &triedb.Config{ + Preimages: c.Preimages, + IsVerkle: isVerkle, + } if c.StateScheme == rawdb.HashScheme { config.HashDB = &hashdb.Config{ CleanCacheSize: c.TrieCleanLimit * 1024 * 1024, @@ -265,10 +266,9 @@ type BlockChain struct { bodyRLPCache *lru.Cache[common.Hash, rlp.RawValue] receiptsCache *lru.Cache[common.Hash, []*types.Receipt] blockCache *lru.Cache[common.Hash, *types.Block] - txLookupCache *lru.Cache[common.Hash, txLookup] - // future blocks are blocks added for later processing - futureBlocks *lru.Cache[common.Hash, *types.Block] + txLookupLock sync.RWMutex + txLookupCache *lru.Cache[common.Hash, txLookup] wg sync.WaitGroup quit chan struct{} // shutdown signal, closed in Stop. @@ -281,6 +281,7 @@ type BlockChain struct { processor Processor // Block transaction processor interface forker *ForkChoice vmConfig vm.Config + logger *tracing.Hooks numberOfBlocksToSkipStateSaving uint32 amountOfGasInBlocksToSkipStateSaving uint64 @@ -299,7 +300,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par cacheConfig = defaultCacheConfig } // Open trie database with provided config - triedb := triedb.NewDatabase(db, cacheConfig.triedbConfig()) + triedb := triedb.NewDatabase(db, cacheConfig.triedbConfig(genesis != nil && genesis.IsVerkle())) var genesisHash common.Hash var genesisErr error @@ -339,9 +340,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit), blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit), txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit), - futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks), engine: engine, vmConfig: vmConfig, + logger: vmConfig.Tracer, } bc.flushInterval.Store(int64(cacheConfig.TrieTimeLimit)) bc.forker = NewForkChoice(bc, shouldPreserve) @@ -459,19 +460,19 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par // it in advance. 
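The hunks above switch the chain metrics to resetting timers, drop the future-block constants, and thread an explicit verkle flag through CacheConfig.TriedbConfig so Nitro-side callers choose the trie backend deliberately. A minimal sketch of the new call shape; the caller and the concrete config values are illustrative assumptions, not taken from this patch:

    package main

    import (
        "fmt"

        "github.com/ethereum/go-ethereum/core"
        "github.com/ethereum/go-ethereum/core/rawdb"
    )

    func main() {
        // Illustrative cache settings; only the verkle flag is new in this patch.
        cacheConf := &core.CacheConfig{
            TrieCleanLimit: 256,
            Preimages:      true,
            StateScheme:    rawdb.HashScheme,
        }
        // External callers now state the verkle choice explicitly.
        trieConf := cacheConf.TriedbConfig(false)
        fmt.Println("verkle:", trieConf.IsVerkle, "preimages:", trieConf.Preimages)
    }

NewBlockChain makes the same decision internally via genesis != nil && genesis.IsVerkle(), as shown in the constructor hunk above.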
bc.engine.VerifyHeader(bc, bc.CurrentHeader()) - // Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain - for hash := range BadHashes { - if header := bc.GetHeaderByHash(hash); header != nil { - // get the canonical block corresponding to the offending header's number - headerByNumber := bc.GetHeaderByNumber(header.Number.Uint64()) - // make sure the headerByNumber (if present) is in our current canonical chain - if headerByNumber != nil && headerByNumber.Hash() == header.Hash() { - log.Error("Found bad hash, rewinding chain", "number", header.Number, "hash", header.ParentHash) - if err := bc.SetHead(header.Number.Uint64() - 1); err != nil { - return nil, err - } - log.Error("Chain rewind was successful, resuming normal operation") + if bc.logger != nil && bc.logger.OnBlockchainInit != nil { + bc.logger.OnBlockchainInit(chainConfig) + } + if bc.logger != nil && bc.logger.OnGenesisBlock != nil { + if block := bc.CurrentBlock(); block.Number.Uint64() == 0 { + alloc, err := getGenesisState(bc.db, block.Hash()) + if err != nil { + return nil, fmt.Errorf("failed to get genesis state: %w", err) + } + if alloc == nil { + return nil, errors.New("live blockchain tracer requires genesis alloc to be set") } + bc.logger.OnGenesisBlock(bc.genesisBlock, alloc) } } @@ -496,11 +497,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.triedb, head.Root) } - - // Start future block processor. - bc.wg.Add(1) - go bc.updateFutureBlocks() - // Rewind the chain in case of an incompatible config upgrade. if compat, ok := genesisErr.(*params.ConfigCompatError); ok { log.Warn("Rewinding chain to upgrade configuration", "err", compat) @@ -511,6 +507,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } rawdb.WriteChainConfig(db, genesisHash, chainConfig) } + // Start tx indexer if it's enabled. if txLookupLimit != nil { bc.txIndexer = newTxIndexer(*txLookupLimit, bc) @@ -699,10 +696,9 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha var blockNumber uint64 // (no root == always 0) var rootFound bool - // Retrieve the last pivot block to short circuit rollbacks beyond it and the - // current freezer limit to start nuking id underflown + // Retrieve the last pivot block to short circuit rollbacks beyond it + // and the current freezer limit to start nuking it's underflown. pivot := rawdb.ReadLastPivotNumber(bc.db) - frozen, _ := bc.db.Ancients() updateFn := func(db ethdb.KeyValueWriter, header *types.Header) (*types.Header, bool) { // Rewind the blockchain, ensuring we don't end up with a stateless head @@ -785,7 +781,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha // last step, however the direction of SetHead is from high // to low, so it's safe to update in-memory markers directly. bc.currentBlock.Store(newHeadBlock.Header()) - headBlockGauge.Update(int64(newHeadBlock.NumberU64())) + headBlockGauge.Update(newHeadBlock.Number().Int64()) // The head state is missing, which is only possible in the path-based // scheme. This situation occurs when the chain head is rewound below @@ -793,7 +789,10 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha // approach except for rerunning a snap sync. Do nothing here until the // state syncer picks it up. 
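The startup loop over BadHashes is removed here in favour of live-tracer hooks: NewBlockChain now reports chain initialization through OnBlockchainInit and, while the head is still the genesis block, the genesis allocation through OnGenesisBlock (failing if getGenesisState cannot recover the alloc). A minimal sketch of a tracer implementing both hooks, using only the signatures visible in this patch; function and log messages are illustrative:

    package main

    import (
        "log"

        "github.com/ethereum/go-ethereum/core/tracing"
        "github.com/ethereum/go-ethereum/core/types"
        "github.com/ethereum/go-ethereum/params"
    )

    // newInitTracer builds hooks matching the two call sites added above.
    func newInitTracer() *tracing.Hooks {
        return &tracing.Hooks{
            OnBlockchainInit: func(cfg *params.ChainConfig) {
                log.Printf("blockchain init: chainID=%v", cfg.ChainID)
            },
            OnGenesisBlock: func(genesis *types.Block, alloc types.GenesisAlloc) {
                log.Printf("genesis %s with %d pre-funded accounts", genesis.Hash(), len(alloc))
            },
        }
    }

In this fork the hooks reach the chain through vm.Config.Tracer, which the constructor copies into bc.logger.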
if !bc.HasState(newHeadBlock.Root()) { - log.Info("Chain is stateless, wait state sync", "number", newHeadBlock.Number(), "hash", newHeadBlock.Hash()) + if newHeadBlock.Number().Uint64() != 0 { + log.Crit("Chain is stateless at a non-genesis block") + } + log.Info("Chain is stateless, wait state sync", "number", newHeadBlock.Number, "hash", newHeadBlock.Hash()) } } // Rewind the snap block in a simpleton way to the target head @@ -820,6 +819,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha // intent afterwards is full block importing, delete the chain segment // between the stateful-block and the sethead target. var wipe bool + frozen, _ := bc.db.Ancients() if headNumber+1 < frozen { wipe = pivot == nil || headNumber >= *pivot } @@ -850,7 +850,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha // touching the header chain altogether, unless the freezer is broken if repair { if target, force := updateFn(bc.db, bc.CurrentBlock()); force { - bc.hc.SetHead(target.Number.Uint64(), updateFn, delFn) + bc.hc.SetHead(target.Number.Uint64(), nil, delFn) } } else { // Rewind the chain to the requested head and keep going backwards until a @@ -869,7 +869,6 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha bc.receiptsCache.Purge() bc.blockCache.Purge() bc.txLookupCache.Purge() - bc.futureBlocks.Purge() // Clear safe block, finalized block if needed if safe := bc.CurrentSafeBlock(); safe != nil && head < safe.Number.Uint64() { @@ -1115,6 +1114,10 @@ func (bc *BlockChain) Stop() { } } } + // Allow tracers to clean-up and release resources. + if bc.logger != nil && bc.logger.OnClose != nil { + bc.logger.OnClose() + } // Close the trie database, release all the held resources as the last step. if err := bc.triedb.Close(); err != nil { log.Error("Failed to close trie db", "err", err) @@ -1134,24 +1137,6 @@ func (bc *BlockChain) insertStopped() bool { return bc.procInterrupt.Load() } -func (bc *BlockChain) procFutureBlocks() { - blocks := make([]*types.Block, 0, bc.futureBlocks.Len()) - for _, hash := range bc.futureBlocks.Keys() { - if block, exist := bc.futureBlocks.Peek(hash); exist { - blocks = append(blocks, block) - } - } - if len(blocks) > 0 { - slices.SortFunc(blocks, func(a, b *types.Block) int { - return a.Number().Cmp(b.Number()) - }) - // Insert one by one as chain insertion needs contiguous ancestry between blocks - for i := range blocks { - bc.InsertChain(blocks[i : i+1]) - } - } -} - // WriteStatus status of write type WriteStatus byte @@ -1550,17 +1535,6 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. return nil } -// WriteBlockAndSetHead writes the given block and all associated state to the database, -// and applies the block as the new chain head. -func (bc *BlockChain) WriteBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { - if !bc.chainmu.TryLock() { - return NonStatTy, errChainStopped - } - defer bc.chainmu.Unlock() - - return bc.writeBlockAndSetHead(block, receipts, logs, state, emitHeadEvent) -} - // writeBlockAndSetHead is the internal implementation of WriteBlockAndSetHead. // This function expects the chain mutex to be held. 
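Besides hardening the stateless-head path (a non-genesis stateless head is now fatal) and moving the frozen-ancients lookup next to its use, this chunk adds an OnClose call to Stop so live tracers can release resources on shutdown. A sketch under the assumption of a hypothetical file-backed tracer; the fileTracer type is not part of geth:

    package main

    import (
        "log"
        "os"

        "github.com/ethereum/go-ethereum/core/tracing"
    )

    // fileTracer buffers trace output in a file and relies on the OnClose
    // call in BlockChain.Stop to release it.
    type fileTracer struct {
        out *os.File
    }

    func (t *fileTracer) hooks() *tracing.Hooks {
        return &tracing.Hooks{
            OnClose: func() {
                if err := t.out.Close(); err != nil {
                    log.Printf("failed to close trace output: %v", err)
                }
            },
        }
    }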
func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { @@ -1587,8 +1561,6 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types if status == CanonStatTy { bc.writeHeadBlock(block) } - bc.futureBlocks.Remove(block.Hash()) - if status == CanonStatTy { bc.chainFeed.Send(ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) if len(logs) > 0 { @@ -1608,25 +1580,6 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types return status, nil } -// addFutureBlock checks if the block is within the max allowed window to get -// accepted for future processing, and returns an error if the block is too far -// ahead and was not added. -// -// TODO after the transition, the future block shouldn't be kept. Because -// it's not checked in the Geth side anymore. -func (bc *BlockChain) addFutureBlock(block *types.Block) error { - max := uint64(time.Now().Unix() + maxTimeFutureBlocks) - if block.Time() > max { - return fmt.Errorf("future block timestamp %v > allowed %v", block.Time(), max) - } - if block.Difficulty().Cmp(common.Big0) == 0 { - // Never add PoS blocks into the future queue - return nil - } - bc.futureBlocks.Add(block.Hash(), block) - return nil -} - // InsertChain attempts to insert the given batch of blocks in to the canonical // chain or, otherwise, create a fork. If an error is returned it will return // the index number of the failing block as well an error describing what went @@ -1764,26 +1717,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) _, err := bc.recoverAncestors(block) return it.index, err } - // First block is future, shove it (and all children) to the future queue (unknown ancestor) - case errors.Is(err, consensus.ErrFutureBlock) || (errors.Is(err, consensus.ErrUnknownAncestor) && bc.futureBlocks.Contains(it.first().ParentHash())): - for block != nil && (it.index == 0 || errors.Is(err, consensus.ErrUnknownAncestor)) { - log.Debug("Future block, postponing import", "number", block.Number(), "hash", block.Hash()) - if err := bc.addFutureBlock(block); err != nil { - return it.index, err - } - block, err = it.next() - } - stats.queued += it.processed() - stats.ignored += it.remaining() - - // If there are any still remaining, mark as ignored - return it.index, err - // Some other error(except ErrKnownBlock) occurred, abort. // ErrKnownBlock is allowed here since some known blocks // still need re-execution to generate snapshots that are missing case err != nil && !errors.Is(err, ErrKnownBlock): - bc.futureBlocks.Remove(block.Hash()) stats.ignored += len(it.chain) bc.reportBlock(block, nil, err) return it.index, err @@ -1806,11 +1743,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) log.Debug("Abort during block processing") break } - // If the header is a banned one, straight out abort - if BadHashes[block.Hash()] { - bc.reportBlock(block, nil, ErrBannedHash) - return it.index, ErrBannedHash - } // If the block is known (in the middle of the chain), it's a special case for // Clique blocks where they can share state among each other, so importing an // older block might complete the state of the subsequent one. 
In this case, @@ -1844,6 +1776,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) return it.index, err } stats.processed++ + if bc.logger != nil && bc.logger.OnSkippedBlock != nil { + bc.logger.OnSkippedBlock(tracing.BlockEvent{ + Block: block, + TD: bc.GetTd(block.ParentHash(), block.NumberU64()-1), + Finalized: bc.CurrentFinalBlock(), + Safe: bc.CurrentSafeBlock(), + }) + } // We can assume that logs are empty here, since the only way for consecutive // Clique blocks to have the same state is if there are no transactions. @@ -1861,6 +1801,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) if err != nil { return it.index, err } + statedb.SetLogger(bc.logger) // Enable prefetching to pull in trie node paths while processing transactions statedb.StartPrefetcher("chain") @@ -1874,7 +1815,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps) go func(start time.Time, followup *types.Block, throwaway *state.StateDB) { - bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt) + // Disable tracing for prefetcher executions. + vmCfg := bc.vmConfig + vmCfg.Tracer = nil + bc.prefetcher.Prefetch(followup, throwaway, vmCfg, &followupInterrupt) blockPrefetchExecuteTimer.Update(time.Since(start)) if followupInterrupt.Load() { @@ -1884,68 +1828,15 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) } } - // Process block using the parent state as reference point - pstart := time.Now() - receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig) - if err != nil { - bc.reportBlock(block, receipts, err) - followupInterrupt.Store(true) - return it.index, err - } - ptime := time.Since(pstart) - - vstart := time.Now() - if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { - bc.reportBlock(block, receipts, err) - followupInterrupt.Store(true) - return it.index, err - } - vtime := time.Since(vstart) - proctime := time.Since(start) // processing + validation - - // Update the metrics touched during block processing and validation - accountReadTimer.Update(statedb.AccountReads) // Account reads are complete(in processing) - storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete(in processing) - snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete(in processing) - snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete(in processing) - accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete(in validation) - storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete(in validation) - accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete(in validation) - storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete(in validation) - triehash := statedb.AccountHashes + statedb.StorageHashes // The time spent on tries hashing - trieUpdate := statedb.AccountUpdates + statedb.StorageUpdates // The time spent on tries update - trieRead := statedb.SnapshotAccountReads + statedb.AccountReads // The time spent on account read - trieRead += statedb.SnapshotStorageReads + statedb.StorageReads // The time spent on storage read - blockExecutionTimer.Update(ptime - trieRead) // The time spent on EVM processing - blockValidationTimer.Update(vtime - (triehash + trieUpdate)) // The 
time spent on block validation - - // Write the block to the chain and get the status. - var ( - wstart = time.Now() - status WriteStatus - ) - if !setHead { - // Don't set the head, only insert the block - err = bc.writeBlockWithState(block, receipts, statedb) - } else { - status, err = bc.writeBlockAndSetHead(block, receipts, logs, statedb, false) - } + // The traced section of block import. + res, err := bc.processBlock(block, statedb, start, setHead) followupInterrupt.Store(true) if err != nil { return it.index, err } - // Update the metrics touched during block commit - accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them - storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them - snapshotCommitTimer.Update(statedb.SnapshotCommits) // Snapshot commits are complete, we can mark them - triedbCommitTimer.Update(statedb.TrieDBCommits) // Trie database commits are complete, we can mark them - - blockWriteTimer.Update(time.Since(wstart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits - statedb.TrieDBCommits) - blockInsertTimer.UpdateSince(start) - // Report the import stats before returning the various results stats.processed++ - stats.usedGas += usedGas + stats.usedGas += res.usedGas var snapDiffItems, snapBufItems common.StorageSize if bc.snaps != nil { @@ -1957,11 +1848,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) if !setHead { // After merge we expect few side chains. Simply count // all blocks the CL gives us for GC processing time - bc.gcproc += proctime - + bc.gcproc += res.procTime return it.index, nil // Direct block insertion of a single block } - switch status { + switch res.status { case CanonStatTy: log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()), "txs", len(block.Transactions()), "gas", block.GasUsed(), @@ -1971,7 +1861,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) lastCanon = block // Only count canonical blocks for GC processing time - bc.gcproc += proctime + bc.gcproc += res.procTime case SideStatTy: log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), @@ -1988,24 +1878,93 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) "root", block.Root()) } } + stats.ignored += it.remaining() + return it.index, err +} - // Any blocks remaining here? The only ones we care about are the future ones - if block != nil && errors.Is(err, consensus.ErrFutureBlock) { - if err := bc.addFutureBlock(block); err != nil { - return it.index, err - } - block, err = it.next() +// blockProcessingResult is a summary of block processing +// used for updating the stats. +type blockProcessingResult struct { + usedGas uint64 + procTime time.Duration + status WriteStatus +} - for ; block != nil && errors.Is(err, consensus.ErrUnknownAncestor); block, err = it.next() { - if err := bc.addFutureBlock(block); err != nil { - return it.index, err - } - stats.queued++ - } +// processBlock executes and validates the given block. If there was no error +// it writes the block and associated state to database. 
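insertChain is simplified above: the future-block queue and the per-block bad-hash check are gone, skipped known blocks are surfaced via OnSkippedBlock, speculative prefetch now runs with tracing disabled, and execution, validation and write-out move into processBlock, which hands back a blockProcessingResult for the import stats. The prefetch tweak works because vm.Config is copied by value; a minimal sketch of that idea, with an illustrative helper name:

    package main

    import "github.com/ethereum/go-ethereum/core/vm"

    // untracedConfig mirrors the prefetcher change above: clearing Tracer on a
    // copy of vm.Config keeps speculative execution out of live traces while
    // leaving the caller's config untouched.
    func untracedConfig(cfg vm.Config) vm.Config {
        cfg.Tracer = nil
        return cfg
    }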
+func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, start time.Time, setHead bool) (_ *blockProcessingResult, blockEndErr error) { + if bc.logger != nil && bc.logger.OnBlockStart != nil { + td := bc.GetTd(block.ParentHash(), block.NumberU64()-1) + bc.logger.OnBlockStart(tracing.BlockEvent{ + Block: block, + TD: td, + Finalized: bc.CurrentFinalBlock(), + Safe: bc.CurrentSafeBlock(), + }) + } + if bc.logger != nil && bc.logger.OnBlockEnd != nil { + defer func() { + bc.logger.OnBlockEnd(blockEndErr) + }() } - stats.ignored += it.remaining() - return it.index, err + // Process block using the parent state as reference point + pstart := time.Now() + receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig) + if err != nil { + bc.reportBlock(block, receipts, err) + return nil, err + } + ptime := time.Since(pstart) + + vstart := time.Now() + if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { + bc.reportBlock(block, receipts, err) + return nil, err + } + vtime := time.Since(vstart) + proctime := time.Since(start) // processing + validation + + // Update the metrics touched during block processing and validation + accountReadTimer.Update(statedb.AccountReads) // Account reads are complete(in processing) + storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete(in processing) + snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete(in processing) + snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete(in processing) + accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete(in validation) + storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete(in validation) + accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete(in validation) + storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete(in validation) + triehash := statedb.AccountHashes + statedb.StorageHashes // The time spent on tries hashing + trieUpdate := statedb.AccountUpdates + statedb.StorageUpdates // The time spent on tries update + trieRead := statedb.SnapshotAccountReads + statedb.AccountReads // The time spent on account read + trieRead += statedb.SnapshotStorageReads + statedb.StorageReads // The time spent on storage read + blockExecutionTimer.Update(ptime - trieRead) // The time spent on EVM processing + blockValidationTimer.Update(vtime - (triehash + trieUpdate)) // The time spent on block validation + + // Write the block to the chain and get the status. 
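The named return blockEndErr in processBlock is what lets the deferred OnBlockEnd call report the final outcome, whichever return path produces it. A stripped-down illustration of that Go idiom; importOne is a stand-in, not a geth function:

    package main

    import (
        "errors"

        "github.com/ethereum/go-ethereum/core/tracing"
    )

    // importOne shows the pattern: because the error is a named return value,
    // the deferred OnBlockEnd closure observes whatever error the function
    // ultimately returns, from any exit path.
    func importOne(hooks *tracing.Hooks) (err error) {
        if hooks != nil && hooks.OnBlockEnd != nil {
            defer func() { hooks.OnBlockEnd(err) }()
        }
        // ... execute, validate and write the block ...
        return errors.New("validation failed") // OnBlockEnd receives this error
    }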
+ var ( + wstart = time.Now() + status WriteStatus + ) + if !setHead { + // Don't set the head, only insert the block + err = bc.writeBlockWithState(block, receipts, statedb) + } else { + status, err = bc.writeBlockAndSetHead(block, receipts, logs, statedb, false) + } + if err != nil { + return nil, err + } + // Update the metrics touched during block commit + accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them + storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them + snapshotCommitTimer.Update(statedb.SnapshotCommits) // Snapshot commits are complete, we can mark them + triedbCommitTimer.Update(statedb.TrieDBCommits) // Trie database commits are complete, we can mark them + + blockWriteTimer.Update(time.Since(wstart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits - statedb.TrieDBCommits) + blockInsertTimer.UpdateSince(start) + + return &blockProcessingResult{usedGas: usedGas, procTime: proctime, status: status}, nil } // insertSideChain is called when an import batch hits upon a pruned ancestor @@ -2313,14 +2272,14 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { // rewind the canonical chain to a lower point. log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "oldblocks", len(oldChain), "newnum", newBlock.Number(), "newhash", newBlock.Hash(), "newblocks", len(newChain)) } - // Reset the tx lookup cache in case to clear stale txlookups. - // This is done before writing any new chain data to avoid the - // weird scenario that canonical chain is changed while the - // stale lookups are still cached. - bc.txLookupCache.Purge() + // Acquire the tx-lookup lock before mutation. This step is essential + // as the txlookups should be changed atomically, and all subsequent + // reads should be blocked until the mutation is complete. + bc.txLookupLock.Lock() - // Insert the new chain(except the head block(reverse order)), - // taking care of the proper incremental order. + // Insert the new chain segment in incremental order, from the old + // to the new. The new chain head (newChain[0]) is not inserted here, + // as it will be handled separately outside of this function for i := len(newChain) - 1; i >= 1; i-- { // Insert the block in the canonical way, re-writing history bc.writeHeadBlock(newChain[i]) @@ -2357,6 +2316,11 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { if err := indexesBatch.Write(); err != nil { log.Crit("Failed to delete useless indexes", "err", err) } + // Reset the tx lookup cache to clear stale txlookup cache. + bc.txLookupCache.Purge() + + // Release the tx-lookup lock after mutation. + bc.txLookupLock.Unlock() // Send out events for logs from the old canon chain, and 'reborn' // logs from the new canon chain. The number of logs can be very @@ -2459,20 +2423,6 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) { return head.Hash(), nil } -func (bc *BlockChain) updateFutureBlocks() { - futureTimer := time.NewTicker(5 * time.Second) - defer futureTimer.Stop() - defer bc.wg.Done() - for { - select { - case <-futureTimer.C: - bc.procFutureBlocks() - case <-bc.quit: - return - } - } -} - // skipBlock returns 'true', if the block being imported can be skipped over, meaning // that the block does not need to be processed but can be considered already fully 'done'. 
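The reorg changes above replace the eager txLookupCache purge with a lock-then-purge sequence guarded by the new txLookupLock, so readers never observe lookups from the old canonical chain mid-rewrite. A generic sketch of that ordering; lookupIndex and its methods are illustrative, not the actual BlockChain code:

    package main

    import (
        "sync"

        "github.com/ethereum/go-ethereum/common"
    )

    // lookupIndex stands in for the tx-lookup state: a cache that must be
    // invalidated atomically with the canonical-chain mutation it shadows.
    type lookupIndex struct {
        mu    sync.RWMutex
        cache map[common.Hash]uint64
    }

    func (idx *lookupIndex) rewrite(mutate func()) {
        idx.mu.Lock() // block readers while canonical data changes
        defer idx.mu.Unlock()
        mutate()                                 // rewrite the canonical index
        idx.cache = make(map[common.Hash]uint64) // drop stale entries before readers resume
    }

    func (idx *lookupIndex) get(h common.Hash) (uint64, bool) {
        idx.mu.RLock()
        defer idx.mu.RUnlock()
        n, ok := idx.cache[h]
        return n, ok
    }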
func (bc *BlockChain) skipBlock(err error, it *insertIterator) bool {
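skipBlock, whose body continues beyond this excerpt, lets the importer bypass blocks whose state is already complete; with this patch such blocks are reported to live tracers through OnSkippedBlock instead of the OnBlockStart/OnBlockEnd pair. A small sketch, assuming the hook and BlockEvent fields shown earlier in the patch:

    package main

    import (
        "log"

        "github.com/ethereum/go-ethereum/core/tracing"
    )

    // withSkipLogging registers an OnSkippedBlock hook so a tracer still sees
    // known blocks that insertChain skips instead of re-executing.
    func withSkipLogging(hooks *tracing.Hooks) *tracing.Hooks {
        hooks.OnSkippedBlock = func(ev tracing.BlockEvent) {
            log.Printf("skipped known block %d (%s)", ev.Block.NumberU64(), ev.Block.Hash())
        }
        return hooks
    }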