diff --git a/core/blockchain.go b/core/blockchain.go
index b6d328cc642b..66b6fdcddc67 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -48,6 +48,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/ethclient"
"github.com/XinFinOrg/XDPoSChain/ethdb"
"github.com/XinFinOrg/XDPoSChain/event"
+ "github.com/XinFinOrg/XDPoSChain/internal/syncx"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/metrics"
"github.com/XinFinOrg/XDPoSChain/params"
@@ -81,6 +82,9 @@ var (
blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil)
blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil)
+ errInsertionInterrupted = errors.New("insertion is interrupted")
+ errChainStopped = errors.New("blockchain is stopped")
+
CheckpointCh = make(chan int)
)
@@ -149,9 +153,11 @@ type BlockChain struct {
scope event.SubscriptionScope
genesisBlock *types.Block
- mu sync.RWMutex // global mutex for locking chain operations
- chainmu sync.RWMutex // blockchain insertion lock
- procmu sync.RWMutex // block processor lock
+ // This mutex synchronizes chain write operations.
+ // Readers don't need to take it, they can just read the database.
+ chainmu *syncx.ClosableMutex
+
+ procmu sync.RWMutex // block processor lock
currentBlock atomic.Value // Current head of the block chain
currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!)
@@ -170,10 +176,10 @@ type BlockChain struct {
// future blocks are blocks added for later processing
futureBlocks *lru.Cache[common.Hash, *types.Block]
- wg sync.WaitGroup // chain processing wait group for shutting down
- quit chan struct{} // shutdown signal, closed in Stop.
- running int32 // 0 if chain is running, 1 when stopped
- procInterrupt int32 // interrupt signaler for block processing
+ wg sync.WaitGroup
+ quit chan struct{} // shutdown signal, closed in Stop.
+ running int32 // 0 if chain is running, 1 when stopped
+ procInterrupt int32 // interrupt signaler for block processing
engine consensus.Engine
processor Processor // block processor interface
@@ -212,6 +218,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
triegc: prque.New[int64, common.Hash](nil),
stateCache: state.NewDatabase(db),
quit: make(chan struct{}),
+ chainmu: syncx.NewClosableMutex(),
bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit),
bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit),
receiptsCache: lru.NewCache[common.Hash, types.Receipts](receiptsCacheLimit),
@@ -262,8 +269,11 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
}
}
}
- // Take ownership of this particular state
- go bc.update()
+
+ // Start future block processor.
+ bc.wg.Add(1)
+ go bc.futureBlocksLoop()
+
return bc, nil
}
@@ -411,15 +421,74 @@ func (bc *BlockChain) loadLastState() error {
func (bc *BlockChain) SetHead(head uint64) error {
log.Warn("Rewinding blockchain", "target", head)
- bc.mu.Lock()
- defer bc.mu.Unlock()
+ if !bc.chainmu.TryLock() {
+ return errChainStopped
+ }
+ defer bc.chainmu.Unlock()
+
+ updateFn := func(db ethdb.KeyValueWriter, header *types.Header) {
+ // Rewind the block chain, ensuring we don't end up with a stateless head block
+ if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() < currentBlock.NumberU64() {
+ newHeadBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
+ if newHeadBlock == nil {
+ newHeadBlock = bc.genesisBlock
+ } else {
+ if _, err := state.New(newHeadBlock.Root(), bc.stateCache); err != nil {
+ // Rewound state missing, rolled back to before pivot, reset to genesis
+ newHeadBlock = bc.genesisBlock
+ }
+ }
+ rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash())
+
+ // Degrade the chain markers if they are explicitly reverted.
+ // In theory we should update all in-memory markers in the
+ // last step, however the direction of SetHead is from high
+ // to low, so it's safe the update in-memory markers directly.
+ bc.currentBlock.Store(newHeadBlock)
+ headBlockGauge.Update(int64(newHeadBlock.NumberU64()))
+ }
+
+ // Rewind the fast block in a simpleton way to the target head
+ if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && header.Number.Uint64() < currentFastBlock.NumberU64() {
+ newHeadFastBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
+ // If either blocks reached nil, reset to the genesis state
+ if newHeadFastBlock == nil {
+ newHeadFastBlock = bc.genesisBlock
+ }
+ rawdb.WriteHeadFastBlockHash(db, newHeadFastBlock.Hash())
+
+ // Degrade the chain markers if they are explicitly reverted.
+ // In theory we should update all in-memory markers in the
+ // last step, however the direction of SetHead is from high
+ // to low, so it's safe the update in-memory markers directly.
+ bc.currentFastBlock.Store(newHeadFastBlock)
+ headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64()))
+ }
+ }
// Rewind the header chain, deleting all block bodies until then
- delFn := func(hash common.Hash, num uint64) {
- DeleteBody(bc.db, hash, num)
+ delFn := func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) {
+ // Ignore the error here since light client won't hit this path
+ frozen, _ := bc.db.Ancients()
+ if num+1 <= frozen {
+ // Truncate all relative data(header, total difficulty, body, receipt
+ // and canonical hash) from ancient store.
+ if err := bc.db.TruncateAncients(num + 1); err != nil {
+ log.Crit("Failed to truncate ancient data", "number", num, "err", err)
+ }
+
+ // Remove the hash <-> number mapping from the active store.
+ rawdb.DeleteHeaderNumber(db, hash)
+ } else {
+ // Remove relative body and receipts from the active store.
+ // The header, total difficulty and canonical hash will be
+ // removed in the hc.SetHead function.
+ rawdb.DeleteBody(db, hash, num)
+ rawdb.DeleteReceipts(db, hash, num)
+ }
+ // Todo(rjl493456442) txlookup, bloombits, etc
}
- bc.hc.SetHead(head, delFn)
- currentHeader := bc.hc.CurrentHeader()
+ bc.hc.SetHead(head, updateFn, delFn)
// Clear out any stale content from the caches
bc.bodyCache.Purge()
@@ -429,38 +498,6 @@ func (bc *BlockChain) SetHead(head uint64) error {
bc.futureBlocks.Purge()
bc.blocksHashCache.Purge()
- // Rewind the block chain, ensuring we don't end up with a stateless head block
- if currentBlock := bc.CurrentBlock(); currentBlock != nil && currentHeader.Number.Uint64() < currentBlock.NumberU64() {
- bc.currentBlock.Store(bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64()))
- headBlockGauge.Update(int64(currentHeader.Number.Uint64()))
- }
- if currentBlock := bc.CurrentBlock(); currentBlock != nil {
- if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil {
- // Rewound state missing, rolled back to before pivot, reset to genesis
- bc.currentBlock.Store(bc.genesisBlock)
- headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
- }
- }
- // Rewind the fast block in a simpleton way to the target head
- if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && currentHeader.Number.Uint64() < currentFastBlock.NumberU64() {
- bc.currentFastBlock.Store(bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64()))
- headFastBlockGauge.Update(int64(currentHeader.Number.Uint64()))
- }
- // If either blocks reached nil, reset to the genesis state
- if currentBlock := bc.CurrentBlock(); currentBlock == nil {
- bc.currentBlock.Store(bc.genesisBlock)
- headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
- }
- if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock == nil {
- bc.currentFastBlock.Store(bc.genesisBlock)
- headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
- }
- currentBlock := bc.CurrentBlock()
- currentFastBlock := bc.CurrentFastBlock()
- rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash())
- if err := WriteHeadFastBlockHash(bc.db, currentFastBlock.Hash()); err != nil {
- log.Crit("Failed to reset head fast block", "err", err)
- }
return bc.loadLastState()
}
@@ -475,11 +512,14 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
if _, err := trie.NewSecure(block.Root(), bc.stateCache.TrieDB()); err != nil {
return err
}
- // If all checks out, manually set the head block
- bc.mu.Lock()
+
+ // If all checks out, manually set the head block.
+ if !bc.chainmu.TryLock() {
+ return errChainStopped
+ }
bc.currentBlock.Store(block)
headBlockGauge.Update(int64(block.NumberU64()))
- bc.mu.Unlock()
+ bc.chainmu.Unlock()
log.Info("Committed new head block", "number", block.Number(), "hash", hash)
return nil
@@ -598,24 +638,28 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
if err := bc.SetHead(0); err != nil {
return err
}
- bc.mu.Lock()
- defer bc.mu.Unlock()
+ if !bc.chainmu.TryLock() {
+ return errChainStopped
+ }
+ defer bc.chainmu.Unlock()
// Prepare the genesis block and reinitialise the chain
- if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil {
- log.Crit("Failed to write genesis block TD", "err", err)
+ batch := bc.db.NewBatch()
+ rawdb.WriteTd(batch, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty())
+ rawdb.WriteBlock(batch, genesis)
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to write genesis block", "err", err)
}
- rawdb.WriteBlock(bc.db, genesis)
+ bc.writeHeadBlock(genesis, false)
+
+ // Last update all in-memory chain markers
bc.genesisBlock = genesis
- bc.insert(bc.genesisBlock, false)
bc.currentBlock.Store(bc.genesisBlock)
headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
-
bc.hc.SetGenesis(bc.genesisBlock.Header())
bc.hc.SetCurrentHeader(bc.genesisBlock.Header())
bc.currentFastBlock.Store(bc.genesisBlock)
headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
-
return nil
}
@@ -672,8 +716,10 @@ func (bc *BlockChain) Export(w io.Writer) error {
// ExportN writes a subset of the active chain to the given writer.
func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
- bc.mu.RLock()
- defer bc.mu.RUnlock()
+ if !bc.chainmu.TryLock() {
+ return errChainStopped
+ }
+ defer bc.chainmu.Unlock()
if first > last {
return fmt.Errorf("export failed: first (%d) is greater than last (%d)", first, last)
@@ -690,31 +736,41 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
return err
}
}
-
return nil
}
-// insert injects a new head block into the current block chain. This method
+// writeHeadBlock injects a new head block into the current block chain. This method
// assumes that the block is indeed a true head. It will also reset the head
// header and the head fast sync block to this very same block if they are older
// or if they are on a different side chain.
//
// Note, this function assumes that the `mu` mutex is held!
-func (bc *BlockChain) insert(block *types.Block, writeBlock bool) {
-
+func (bc *BlockChain) writeHeadBlock(block *types.Block, writeBlock bool) {
blockHash := block.Hash()
blockNumberU64 := block.NumberU64()
- // If the block is on a side chain or an unknown one, force other heads onto it too
- updateHeads := GetCanonicalHash(bc.db, blockNumberU64) != blockHash
-
// Add the block to the canonical chain number scheme and mark as the head
- rawdb.WriteCanonicalHash(bc.db, blockHash, blockNumberU64)
- rawdb.WriteHeadBlockHash(bc.db, blockHash)
+ batch := bc.db.NewBatch()
+ rawdb.WriteHeadHeaderHash(batch, blockHash)
+ rawdb.WriteHeadFastBlockHash(batch, blockHash)
+ rawdb.WriteCanonicalHash(batch, blockHash, blockNumberU64)
+ rawdb.WriteTxLookupEntriesByBlock(batch, block)
+ rawdb.WriteHeadBlockHash(batch, blockHash)
if writeBlock {
- rawdb.WriteBlock(bc.db, block)
+ rawdb.WriteBlock(batch, block)
}
+ // Flush the whole batch into the disk, exit the node if failed
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to update chain indexes and markers", "err", err)
+ }
+
+ // Update all in-memory chain markers in the last step
+ bc.hc.SetCurrentHeader(block.Header())
+
+ bc.currentFastBlock.Store(block)
+ headFastBlockGauge.Update(int64(blockNumberU64))
+
bc.currentBlock.Store(block)
headBlockGauge.Update(int64(block.NumberU64()))
@@ -725,17 +781,6 @@ func (bc *BlockChain) insert(block *types.Block, writeBlock bool) {
engine.CacheNoneTIPSigningTxs(block.Header(), block.Transactions(), bc.GetReceiptsByHash(blockHash))
}
}
-
- // If the block is better than our head or is on a different chain, force update heads
- if updateHeads {
- bc.hc.SetCurrentHeader(block.Header())
-
- if err := WriteHeadFastBlockHash(bc.db, blockHash); err != nil {
- log.Crit("Failed to insert head fast block hash", "err", err)
- }
- bc.currentFastBlock.Store(block)
- headFastBlockGauge.Update(int64(block.NumberU64()))
- }
}
// Genesis retrieves the chain's genesis block.
@@ -1001,15 +1046,38 @@ func (bc *BlockChain) Stop() {
if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
return
}
- // Unsubscribe all subscriptions registered from blockchain
+
+ // Unsubscribe all subscriptions registered from blockchain.
bc.scope.Close()
+
+ // Signal shutdown to all goroutines.
close(bc.quit)
- atomic.StoreInt32(&bc.procInterrupt, 1)
+ bc.StopInsert()
+
+ // Now wait for all chain modifications to end and persistent goroutines to exit.
+ //
+ // Note: Close waits for the mutex to become available, i.e. any running chain
+ // modification will have exited when Close returns. Since we also called StopInsert,
+ // the mutex should become available quickly. It cannot be taken again after Close has
+ // returned.
+ bc.chainmu.Close()
bc.wg.Wait()
bc.saveData()
log.Info("Blockchain manager stopped")
}
+// StopInsert interrupts all insertion methods, causing them to return
+// errInsertionInterrupted as soon as possible. Insertion is permanently disabled after
+// calling this method.
+func (bc *BlockChain) StopInsert() {
+ atomic.StoreInt32(&bc.procInterrupt, 1)
+}
+
+// insertStopped returns true after StopInsert has been called.
+func (bc *BlockChain) insertStopped() bool {
+ return atomic.LoadInt32(&bc.procInterrupt) == 1
+}
+
func (bc *BlockChain) procFutureBlocks() {
blocks := make([]*types.Block, 0, bc.futureBlocks.Len())
for _, hash := range bc.futureBlocks.Keys() {
@@ -1053,34 +1121,49 @@ const (
// Rollback is designed to remove a chain of links from the database that aren't
// certain enough to be valid.
func (bc *BlockChain) Rollback(chain []common.Hash) {
- bc.mu.Lock()
- defer bc.mu.Unlock()
+ if !bc.chainmu.TryLock() {
+ return
+ }
+ defer bc.chainmu.Unlock()
+ batch := bc.db.NewBatch()
for i := len(chain) - 1; i >= 0; i-- {
hash := chain[i]
+ // Degrade the chain markers if they are explicitly reverted.
+ // In theory we should update all in-memory markers in the
+ // last step, however the direction of rollback is from high
+ // to low, so it's safe the update in-memory markers directly.
currentHeader := bc.hc.CurrentHeader()
if currentHeader.Hash() == hash {
- bc.hc.SetCurrentHeader(bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1))
+ newHeadHeader := bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1)
+ rawdb.WriteHeadHeaderHash(batch, currentHeader.ParentHash)
+ bc.hc.SetCurrentHeader(newHeadHeader)
}
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock.Hash() == hash {
newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1)
+ rawdb.WriteHeadFastBlockHash(batch, currentFastBlock.ParentHash())
bc.currentFastBlock.Store(newFastBlock)
- WriteHeadFastBlockHash(bc.db, newFastBlock.Hash())
headFastBlockGauge.Update(int64(newFastBlock.NumberU64()))
}
if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash {
newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1)
+ rawdb.WriteHeadBlockHash(batch, currentBlock.ParentHash())
bc.currentBlock.Store(newBlock)
headBlockGauge.Update(int64(newBlock.NumberU64()))
- rawdb.WriteHeadBlockHash(bc.db, newBlock.Hash())
}
}
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to rollback chain markers", "err", err)
+ }
+ // TODO: Truncate ancient data which exceeds the current header.
}
// InsertReceiptChain attempts to complete an already existing header chain with
// transaction and receipt data.
func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
+ // We don't require the chainMu here since we want to maximize the
+ // concurrency of header insertion and receipt insertion.
bc.wg.Add(1)
defer bc.wg.Done()
@@ -1128,8 +1211,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if err := WriteTxLookupEntries(batch, block); err != nil {
return i, fmt.Errorf("failed to write lookup metadata: %v", err)
}
- stats.processed++
+ // Write everything belongs to the blocks into the database. So that
+ // we can ensure all components of body is completed(body, receipts,
+ // tx indexes)
if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
return 0, err
@@ -1137,7 +1222,11 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
bytes += batch.ValueSize()
batch.Reset()
}
+ stats.processed++
}
+ // Write everything belongs to the blocks into the database. So that
+ // we can ensure all components of body is completed(body, receipts,
+ // tx indexes)
if batch.ValueSize() > 0 {
bytes += batch.ValueSize()
if err := batch.Write(); err != nil {
@@ -1146,7 +1235,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}
// Update the head fast sync block if better
- bc.mu.Lock()
+ if !bc.chainmu.TryLock() {
+ return 0, errChainStopped
+ }
head := blockChain[len(blockChain)-1]
if td := bc.GetTd(head.Hash(), head.NumberU64()); td != nil { // Rewind may have occurred, skip in that case
currentFastBlock := bc.CurrentFastBlock()
@@ -1158,7 +1249,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
headFastBlockGauge.Update(int64(head.NumberU64()))
}
}
- bc.mu.Unlock()
+ bc.chainmu.Unlock()
log.Info("Imported new block receipts",
"count", stats.processed,
@@ -1172,24 +1263,38 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
var lastWrite uint64
-// WriteBlockWithoutState writes only the block and its metadata to the database,
+// writeBlockWithoutState writes only the block and its metadata to the database,
// but does not write any state. This is used to construct competing side forks
// up to the point where they exceed the canonical total difficulty.
-func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (err error) {
- bc.wg.Add(1)
- defer bc.wg.Done()
+func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (err error) {
+ if bc.insertStopped() {
+ return errInsertionInterrupted
+ }
- if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), td); err != nil {
- return err
+ batch := bc.db.NewBatch()
+ rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td)
+ rawdb.WriteBlock(batch, block)
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to write block into disk", "err", err)
}
- rawdb.WriteBlock(bc.db, block)
return nil
}
// WriteBlockWithState writes the block and all associated state to the database.
func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB, tradingState *tradingstate.TradingStateDB, lendingState *lendingstate.LendingStateDB) (status WriteStatus, err error) {
- bc.wg.Add(1)
- defer bc.wg.Done()
+ if !bc.chainmu.TryLock() {
+ return NonStatTy, errInsertionInterrupted
+ }
+ defer bc.chainmu.Unlock()
+ return bc.writeBlockWithState(block, receipts, state, tradingState, lendingState)
+}
+
+// writeBlockWithState writes the block and all associated state to the database,
+// but is expects the chain mutex to be held.
+func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB, tradingState *tradingstate.TradingStateDB, lendingState *lendingstate.LendingStateDB) (status WriteStatus, err error) {
+ if bc.insertStopped() {
+ return NonStatTy, errInsertionInterrupted
+ }
// Calculate the total difficulty of the block
ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
@@ -1197,24 +1302,29 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
return NonStatTy, consensus.ErrUnknownAncestor
}
// Make sure no inconsistent state is leaked during insertion
- bc.mu.Lock()
- defer bc.mu.Unlock()
-
currentBlock := bc.CurrentBlock()
localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
externTd := new(big.Int).Add(block.Difficulty(), ptd)
- // Irrelevant of the canonical status, write the block itself to the database
- if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), externTd); err != nil {
- return NonStatTy, err
- }
- // Write other block data using a batch.
- batch := bc.db.NewBatch()
- rawdb.WriteBlock(batch, block)
+ // Irrelevant of the canonical status, write the block itself to the database.
+ //
+ // Note all the components of block(td, hash->number map, header, body, receipts)
+ // should be written atomically. BlockBatch is used for containing all components.
+ blockBatch := bc.db.NewBatch()
+ rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd)
+ rawdb.WriteBlock(blockBatch, block)
+ rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts)
+ rawdb.WritePreimages(blockBatch, state.Preimages())
+ if err := blockBatch.Write(); err != nil {
+ log.Crit("Failed to write block into disk", "err", err)
+ }
+ // Commit all cached state changes into underlying memory database.
root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number()))
if err != nil {
return NonStatTy, err
}
+ triedb := bc.stateCache.TrieDB()
+
tradingRoot := common.Hash{}
if tradingState != nil {
tradingRoot, err = tradingState.Commit()
@@ -1229,6 +1339,7 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
return NonStatTy, err
}
}
+
engine, _ := bc.Engine().(*XDPoS.XDPoS)
var tradingTrieDb *trie.Database
var tradingService utils.TradingService
@@ -1244,7 +1355,7 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
lendingTrieDb = lendingService.GetStateCache().TrieDB()
}
}
- triedb := bc.stateCache.TrieDB()
+
// If we're running an archive node, always flush
if bc.cacheConfig.Disabled {
if err := triedb.Commit(root, false); err != nil {
@@ -1352,9 +1463,7 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
}
}
}
- if err := WriteBlockReceipts(batch, block.Hash(), block.NumberU64(), receipts); err != nil {
- return NonStatTy, err
- }
+
// If the total difficulty is higher than our known, add it to the canonical chain
// Second clause in the if statement reduces the vulnerability to selfish mining.
// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
@@ -1364,24 +1473,6 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
// Split same-difficulty blocks by number
reorg = block.NumberU64() > currentBlock.NumberU64()
}
-
- // This is the ETH fix. We shall ultimately have this workflow,
- // but due to below code has diverged significantly between ETH and XDC, and current issue we have,
- // it's best to have it in a different PR with more investigations.
- // if reorg {
- // // Write the positional metadata for transaction and receipt lookups
- // if err := WriteTxLookupEntries(batch, block); err != nil {
- // return NonStatTy, err
- // }
- // // Write hash preimages
- // if err := WritePreimages(bc.db, block.NumberU64(), state.Preimages()); err != nil {
- // return NonStatTy, err
- // }
- // }
- // if err := batch.Write(); err != nil {
- // return NonStatTy, err
- // }
-
if reorg {
// Reorganise the chain if the parent is not the head block
if block.ParentHash() != currentBlock.Hash() {
@@ -1389,26 +1480,15 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
return NonStatTy, err
}
}
- // Write the positional metadata for transaction and receipt lookups
- if err := WriteTxLookupEntries(batch, block); err != nil {
- return NonStatTy, err
- }
- // Write hash preimages
- if err := WritePreimages(bc.db, block.NumberU64(), state.Preimages()); err != nil {
- return NonStatTy, err
- }
status = CanonStatTy
} else {
status = SideStatTy
}
- if err := batch.Write(); err != nil {
- return NonStatTy, err
- }
// Set new head.
if status == CanonStatTy {
// WriteBlock has already been called, no need to write again
- bc.insert(block, false)
+ bc.writeHeadBlock(block, false)
// prepare set of masternodes for the next epoch
if bc.chainConfig.XDPoS != nil && ((block.NumberU64() % bc.chainConfig.XDPoS.Epoch) == (bc.chainConfig.XDPoS.Epoch - bc.chainConfig.XDPoS.Gap)) {
err := bc.UpdateM1()
@@ -1435,38 +1515,51 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
//
// After insertion is done, all accumulated events will be fired.
func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
- n, events, logs, err := bc.insertChain(chain, true)
- bc.PostChainEvents(events, logs)
- return n, err
-}
-
-// insertChain will execute the actual chain insertion and event aggregation. The
-// only reason this method exists as a separate one is to make locking cleaner
-// with deferred statements.
-func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) {
// Sanity check that we have something meaningful to import
if len(chain) == 0 {
- return 0, nil, nil, nil
+ return 0, nil
}
- engine, _ := bc.Engine().(*XDPoS.XDPoS)
// Do a sanity check that the provided chain is actually ordered and linked
for i := 1; i < len(chain); i++ {
- if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() {
+ block, prev := chain[i], chain[i-1]
+ if block.NumberU64() != chain[i-1].NumberU64()+1 || block.ParentHash() != chain[i-1].Hash() {
// Chain broke ancestry, log a messge (programming error) and skip insertion
- log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(),
- "parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash())
+ log.Error("Non contiguous block insert",
+ "number", block.Number(),
+ "hash", block.Hash(),
+ "parent", block.ParentHash(),
+ "prevnumber", prev.Number(),
+ "prevhash", prev.Hash())
- return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x..], item %d is #%d [%x..] (parent [%x..])", i-1, chain[i-1].NumberU64(),
- chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4])
+ return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x..], item %d is #%d [%x..] (parent [%x..])", i-1, prev.NumberU64(),
+ prev.Hash().Bytes()[:4], i, block.NumberU64(), block.Hash().Bytes()[:4], block.ParentHash().Bytes()[:4])
}
}
- // Pre-checks passed, start the full block imports
- bc.wg.Add(1)
- defer bc.wg.Done()
- bc.chainmu.Lock()
+ // Pre-check passed, start the full block imports.
+ if !bc.chainmu.TryLock() {
+ return 0, errChainStopped
+ }
defer bc.chainmu.Unlock()
+ n, events, logs, err := bc.insertChain(chain, true)
+ bc.PostChainEvents(events, logs)
+ return n, err
+}
+
+// insertChain is the internal implementation of InsertChain, which assumes that
+// 1) chains are contiguous, and 2) The chain mutex is held.
+//
+// This method is split out so that import batches that require re-injecting
+// historical blocks can do so without releasing the lock, which could lead to
+// racey behaviour. If a sidechain import is in progress, and the historic state
+// is imported, but then new canon-head is added before the actual sidechain
+// completes, then the historic state could be pruned again
+func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) {
+ // If the chain is terminating, don't even bother starting up.
+ if bc.insertStopped() {
+ return 0, nil, nil, nil
+ }
// A queued approach to delivering events. This is generally
// faster than direct delivery and requires much less mutex
@@ -1543,7 +1636,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
externTd := new(big.Int).Add(bc.GetTd(block.ParentHash(), block.NumberU64()-1), block.Difficulty())
if localTd.Cmp(externTd) > 0 {
- if err = bc.WriteBlockWithoutState(block, externTd); err != nil {
+ if err = bc.writeBlockWithoutState(block, externTd); err != nil {
return i, events, coalescedLogs, err
}
continue
@@ -1561,10 +1654,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
}
log.Debug("Number block need calculated again", "number", block.NumberU64(), "hash", block.Hash().Hex(), "winners", len(winner))
// Import all the pruned blocks to make the state available
- bc.chainmu.Unlock()
// During reorg, we use verifySeals=false
_, evs, logs, err := bc.insertChain(winner, false)
- bc.chainmu.Lock()
events, coalescedLogs = evs, logs
if err != nil {
@@ -1593,6 +1684,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
var tradingService utils.TradingService
var lendingService utils.LendingService
isSDKNode := false
+ engine, _ := bc.Engine().(*XDPoS.XDPoS)
if bc.Config().IsTIPXDCXReceiver(block.Number()) && bc.chainConfig.XDPoS != nil && engine != nil && block.NumberU64() > bc.chainConfig.XDPoS.Epoch {
author, err := bc.Engine().Author(block.Header()) // Ignore error, we're past header validation
if err != nil {
@@ -1710,7 +1802,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
proctime := time.Since(bstart)
// Write the block to the chain and get the status.
- status, err := bc.WriteBlockWithState(block, receipts, statedb, tradingState, lendingState)
+ status, err := bc.writeBlockWithState(block, receipts, statedb, tradingState, lendingState)
t3 := time.Now()
if err != nil {
return i, events, coalescedLogs, err
@@ -2056,12 +2148,14 @@ func (bc *BlockChain) insertBlock(block *types.Block) ([]interface{}, []*types.L
bc.wg.Add(1)
defer bc.wg.Done()
// Write the block to the chain and get the status.
- bc.chainmu.Lock()
+ if !bc.chainmu.TryLock() {
+ return nil, nil, errChainStopped
+ }
defer bc.chainmu.Unlock()
if bc.HasBlockAndFullState(block.Hash(), block.NumberU64()) {
return events, coalescedLogs, nil
}
- status, err := bc.WriteBlockWithState(block, result.receipts, result.state, result.tradingState, result.lendingState)
+ status, err := bc.writeBlockWithState(block, result.receipts, result.state, result.tradingState, result.lendingState)
if err != nil {
return events, coalescedLogs, err
@@ -2169,23 +2263,25 @@ func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log {
var logs []*types.Log
for _, receipt := range receipts {
for _, log := range receipt.Logs {
- l := *log
- l.Removed = removed
- logs = append(logs, &l)
+ if removed {
+ log.Removed = true
+ }
+ logs = append(logs, log)
}
}
return logs
}
-// reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them
-// to be part of the new canonical chain and accumulates potential missing transactions and post an
-// event about them
+// reorg takes two blocks, an old chain and a new chain and will reconstruct the
+// blocks and inserts them to be part of the new canonical chain and accumulates
+// potential missing transactions and post an event about them.
func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
var (
newChain types.Blocks
oldChain types.Blocks
commonBlock *types.Block
deletedTxs types.Transactions
+ addedTxs types.Transactions
deletedLogs []*types.Log
)
log.Warn("Reorg", "oldBlock hash", oldBlock.Hash().Hex(), "number", oldBlock.NumberU64(), "newBlock hash", newBlock.Hash().Hex(), "number", newBlock.NumberU64())
@@ -2234,6 +2330,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
return errors.New("invalid new chain")
}
}
+
// Ensure XDPoS engine committed block will be not reverted
if xdpos, ok := bc.Engine().(*XDPoS.XDPoS); ok {
latestCommittedBlock := xdpos.EngineV2.GetLatestCommittedBlockInfo()
@@ -2259,6 +2356,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
}
}
}
+
// Ensure the user sees large reorgs
if len(oldChain) > 0 && len(newChain) > 0 {
logFn := log.Warn
@@ -2273,16 +2371,16 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
} else {
log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "newnum", newBlock.Number(), "newhash", newBlock.Hash())
}
- // Insert the new chain, taking care of the proper incremental order
- var addedTxs types.Transactions
+
+ // Insert the new chain(except the head block(reverse order)),
+ // taking care of the proper incremental order.
for i := len(newChain) - 1; i >= 0; i-- {
// insert the block in the canonical way, re-writing history
- bc.insert(newChain[i], true)
- // write lookup entries for hash based transaction/receipt searches
- if err := WriteTxLookupEntries(bc.db, newChain[i]); err != nil {
- return err
- }
+ bc.writeHeadBlock(newChain[i], true)
+
+ // Collect the new added transactions.
addedTxs = append(addedTxs, newChain[i].Transactions()...)
+
// prepare set of masternodes for the next epoch
if bc.chainConfig.XDPoS != nil && ((newChain[i].NumberU64() % bc.chainConfig.XDPoS.Epoch) == (bc.chainConfig.XDPoS.Epoch - bc.chainConfig.XDPoS.Gap)) {
err := bc.UpdateM1()
@@ -2291,20 +2389,36 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
}
}
}
- // calculate the difference between deleted and added transactions
- diff := types.TxDifference(deletedTxs, addedTxs)
- // When transactions get deleted from the database that means the
- // receipts that were created in the fork must also be deleted
- for _, tx := range diff {
- DeleteTxLookupEntry(bc.db, tx.Hash())
+
+ // Delete useless indexes right now which includes the non-canonical
+ // transaction indexes, canonical chain indexes which above the head.
+ indexesBatch := bc.db.NewBatch()
+ for _, tx := range types.TxDifference(deletedTxs, addedTxs) {
+ rawdb.DeleteTxLookupEntry(indexesBatch, tx.Hash())
}
+ // Delete any canonical number assignments above the new head
+ number := bc.CurrentBlock().NumberU64()
+ for i := number + 1; ; i++ {
+ hash := rawdb.ReadCanonicalHash(bc.db, i)
+ if hash == (common.Hash{}) {
+ break
+ }
+ rawdb.DeleteCanonicalHash(indexesBatch, i)
+ }
+ if err := indexesBatch.Write(); err != nil {
+ log.Crit("Failed to delete useless indexes", "err", err)
+ }
+ // If any logs need to be fired, do it now. In theory we could avoid creating
+ // this goroutine if there are no events to fire, but realistcally that only
+ // ever happens if we're reorging empty blocks, which will only happen on idle
+ // networks where performance is not an issue either way.
if len(deletedLogs) > 0 {
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
}
if len(oldChain) > 0 {
go func() {
- for _, block := range oldChain {
- bc.chainSideFeed.Send(ChainSideEvent{Block: block})
+ for i := len(oldChain) - 1; i >= 0; i-- {
+ bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]})
}
}()
}
@@ -2333,7 +2447,10 @@ func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
}
}
-func (bc *BlockChain) update() {
+// futureBlocksLoop processes the 'future block' queue.
+func (bc *BlockChain) futureBlocksLoop() {
+ defer bc.wg.Done()
+
futureTimer := time.NewTicker(10 * time.Millisecond)
defer futureTimer.Stop()
for {
@@ -2417,17 +2534,12 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i
return i, err
}
- // Make sure only one thread manipulates the chain at once
- bc.chainmu.Lock()
+ if !bc.chainmu.TryLock() {
+ return 0, errChainStopped
+ }
defer bc.chainmu.Unlock()
- bc.wg.Add(1)
- defer bc.wg.Done()
-
whFunc := func(header *types.Header) error {
- bc.mu.Lock()
- defer bc.mu.Unlock()
-
_, err := bc.hc.WriteHeader(header)
return err
}
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index de15d9ca6c5a..31ba9bb4a701 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -25,10 +25,9 @@ import (
"testing"
"time"
- "github.com/XinFinOrg/XDPoSChain/core/rawdb"
-
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/consensus/ethash"
+ "github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/core/vm"
@@ -129,11 +128,11 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
blockchain.reportBlock(block, receipts, err)
return err
}
- blockchain.mu.Lock()
+ blockchain.chainmu.MustLock()
WriteTd(blockchain.db, block.Hash(), block.NumberU64(), new(big.Int).Add(block.Difficulty(), blockchain.GetTdByHash(block.ParentHash())))
rawdb.WriteBlock(blockchain.db, block)
statedb.Commit(true)
- blockchain.mu.Unlock()
+ blockchain.chainmu.Unlock()
}
return nil
}
@@ -147,10 +146,10 @@ func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error
return err
}
// Manually insert the header into the database, but don't reorganise (allows subsequent testing)
- blockchain.mu.Lock()
+ blockchain.chainmu.MustLock()
WriteTd(blockchain.db, header.Hash(), header.Number.Uint64(), new(big.Int).Add(header.Difficulty, blockchain.GetTdByHash(header.ParentHash)))
rawdb.WriteHeader(blockchain.db, header)
- blockchain.mu.Unlock()
+ blockchain.chainmu.Unlock()
}
return nil
}
diff --git a/core/database_util.go b/core/database_util.go
index e65ac0ff8e2c..d25e06ac3a6f 100644
--- a/core/database_util.go
+++ b/core/database_util.go
@@ -487,11 +487,6 @@ func DeleteBlockReceipts(db DatabaseDeleter, hash common.Hash, number uint64) {
db.Delete(append(append(blockReceiptsPrefix, encodeBlockNumber(number)...), hash.Bytes()...))
}
-// DeleteTxLookupEntry removes all transaction data associated with a hash.
-func DeleteTxLookupEntry(db DatabaseDeleter, hash common.Hash) {
- db.Delete(append(lookupPrefix, hash.Bytes()...))
-}
-
// PreimageTable returns a Database instance with the key prefix for preimage entries.
func PreimageTable(db ethdb.Database) ethdb.Database {
return rawdb.NewTable(db, preimagePrefix)
diff --git a/core/database_util_test.go b/core/database_util_test.go
index 1f29908b963f..0c1494ba527e 100644
--- a/core/database_util_test.go
+++ b/core/database_util_test.go
@@ -24,8 +24,8 @@ import (
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/core/types"
- "golang.org/x/crypto/sha3"
"github.com/XinFinOrg/XDPoSChain/rlp"
+ "golang.org/x/crypto/sha3"
)
// Tests block header storage and retrieval operations.
@@ -304,7 +304,7 @@ func TestLookupStorage(t *testing.T) {
}
// Delete the transactions and check purge
for i, tx := range txs {
- DeleteTxLookupEntry(db, tx.Hash())
+ rawdb.DeleteTxLookupEntry(db, tx.Hash())
if txn, _, _, _ := GetTransaction(db, tx.Hash()); txn != nil {
t.Fatalf("tx #%d [%x]: deleted transaction returned: %v", i, tx.Hash(), txn)
}
diff --git a/core/headerchain.go b/core/headerchain.go
index 51a876d2ecac..633803f2faab 100644
--- a/core/headerchain.go
+++ b/core/headerchain.go
@@ -45,6 +45,14 @@ const (
// HeaderChain implements the basic block header chain logic that is shared by
// core.BlockChain and light.LightChain. It is not usable in itself, only as
// a part of either structure.
+//
+// HeaderChain is responsible for maintaining the header chain including the
+// header query and updating.
+//
+// The components maintained by headerchain includes: (1) total difficult
+// (2) header (3) block hash -> number mapping (4) canonical number -> hash mapping
+// and (5) head header flag.
+//
// It is not thread safe either, the encapsulating chain structures should do
// the necessary mutex locking/unlocking.
type HeaderChain struct {
@@ -66,11 +74,8 @@ type HeaderChain struct {
engine consensus.Engine
}
-// NewHeaderChain creates a new HeaderChain structure.
-//
-// getValidator should return the parent's validator
-// procInterrupt points to the parent's interrupt semaphore
-// wg points to the parent's shutdown wait group
+// NewHeaderChain creates a new HeaderChain structure. ProcInterrupt points
+// to the parent's interrupt semaphore.
func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, procInterrupt func() bool) (*HeaderChain, error) {
// Seed a fast but crypto originating random generator
seed, err := crand.Int(crand.Reader, big.NewInt(math.MaxInt64))
@@ -143,41 +148,54 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er
externTd := new(big.Int).Add(header.Difficulty, ptd)
// Irrelevant of the canonical status, write the td and header to the database
- if err := hc.WriteTd(hash, number, externTd); err != nil {
- log.Crit("Failed to write header total difficulty", "err", err)
+ //
+ // Note all the components of header(td, hash->number index and header) should
+ // be written atomically.
+ headerBatch := hc.chainDb.NewBatch()
+ rawdb.WriteTd(headerBatch, hash, number, externTd)
+ rawdb.WriteHeader(headerBatch, header)
+ if err := headerBatch.Write(); err != nil {
+ log.Crit("Failed to write header into disk", "err", err)
}
- rawdb.WriteHeader(hc.chainDb, header)
// If the total difficulty is higher than our known, add it to the canonical chain
// Second clause in the if statement reduces the vulnerability to selfish mining.
// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
if externTd.Cmp(localTd) > 0 || (externTd.Cmp(localTd) == 0 && mrand.Float64() < 0.5) {
+ // If the header can be added into canonical chain, adjust the
+ // header chain markers(canonical indexes and head header flag).
+ //
+ // Note all markers should be written atomically.
+
// Delete any canonical number assignments above the new head
+ markerBatch := hc.chainDb.NewBatch()
for i := number + 1; ; i++ {
- hash := GetCanonicalHash(hc.chainDb, i)
+ hash := rawdb.ReadCanonicalHash(hc.chainDb, i)
if hash == (common.Hash{}) {
break
}
- DeleteCanonicalHash(hc.chainDb, i)
+ rawdb.DeleteCanonicalHash(markerBatch, i)
}
+
// Overwrite any stale canonical number assignments
var (
headHash = header.ParentHash
headNumber = header.Number.Uint64() - 1
headHeader = hc.GetHeader(headHash, headNumber)
)
- for GetCanonicalHash(hc.chainDb, headNumber) != headHash {
- rawdb.WriteCanonicalHash(hc.chainDb, headHash, headNumber)
+ for rawdb.ReadCanonicalHash(hc.chainDb, headNumber) != headHash {
+ rawdb.WriteCanonicalHash(markerBatch, headHash, headNumber)
headHash = headHeader.ParentHash
headNumber = headHeader.Number.Uint64() - 1
headHeader = hc.GetHeader(headHash, headNumber)
}
// Extend the canonical chain with the new header
- rawdb.WriteCanonicalHash(hc.chainDb, hash, number)
- if err := WriteHeadHeaderHash(hc.chainDb, hash); err != nil {
- log.Crit("Failed to insert head header hash", "err", err)
+ rawdb.WriteCanonicalHash(markerBatch, hash, number)
+ rawdb.WriteHeadHeaderHash(markerBatch, hash)
+ if err := markerBatch.Write(); err != nil {
+ log.Crit("Failed to write header markers into disk", "err", err)
}
-
+ // Last step update all in-memory head header markers
hc.currentHeaderHash = hash
hc.currentHeader.Store(types.CopyHeader(header))
headHeaderGauge.Update(header.Number.Int64())
@@ -186,10 +204,9 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er
} else {
status = SideStatTy
}
-
+ hc.tdCache.Add(hash, externTd)
hc.headerCache.Add(hash, header)
hc.numberCache.Add(hash, number)
-
return
}
@@ -328,16 +345,6 @@ func (hc *HeaderChain) GetTdByHash(hash common.Hash) *big.Int {
return hc.GetTd(hash, hc.GetBlockNumber(hash))
}
-// WriteTd stores a block's total difficulty into the database, also caching it
-// along the way.
-func (hc *HeaderChain) WriteTd(hash common.Hash, number uint64, td *big.Int) error {
- if err := WriteTd(hc.chainDb, hash, number, td); err != nil {
- return err
- }
- hc.tdCache.Add(hash, new(big.Int).Set(td))
- return nil
-}
-
// GetHeader retrieves a block header from the database by hash and number,
// caching it if found.
func (hc *HeaderChain) GetHeader(hash common.Hash, number uint64) *types.Header {
@@ -361,12 +368,13 @@ func (hc *HeaderChain) GetHeaderByHash(hash common.Hash) *types.Header {
}
// HasHeader checks if a block header is present in the database or not.
+// In theory, if header is present in the database, all relative components
+// like td and hash->number should be present too.
func (hc *HeaderChain) HasHeader(hash common.Hash, number uint64) bool {
if hc.numberCache.Contains(hash) || hc.headerCache.Contains(hash) {
return true
}
- ok, _ := hc.chainDb.Has(headerKey(hash, number))
- return ok
+ return rawdb.HasHeader(hc.chainDb, hash, number)
}
// GetHeaderByNumber retrieves a block header from the database by number,
@@ -390,58 +398,79 @@ func (hc *HeaderChain) CurrentHeader() *types.Header {
return hc.currentHeader.Load().(*types.Header)
}
-// SetCurrentHeader sets the current head header of the canonical chain.
+// SetCurrentHeader sets the in-memory head header marker of the canonical chan
+// as the given header.
func (hc *HeaderChain) SetCurrentHeader(head *types.Header) {
- if err := WriteHeadHeaderHash(hc.chainDb, head.Hash()); err != nil {
- log.Crit("Failed to insert head header hash", "err", err)
- }
-
hc.currentHeader.Store(head)
hc.currentHeaderHash = head.Hash()
headHeaderGauge.Update(head.Number.Int64())
}
-// DeleteCallback is a callback function that is called by SetHead before
-// each header is deleted.
-type DeleteCallback func(common.Hash, uint64)
-
-// SetHead rewinds the local chain to a new head. Everything above the new head
-// will be deleted and the new one set.
-func (hc *HeaderChain) SetHead(head uint64, delFn DeleteCallback) {
- height := uint64(0)
+type (
+ // UpdateHeadBlocksCallback is a callback function that is called by SetHead
+ // before head header is updated.
+ UpdateHeadBlocksCallback func(ethdb.KeyValueWriter, *types.Header)
- if hdr := hc.CurrentHeader(); hdr != nil {
- height = hdr.Number.Uint64()
- }
+ // DeleteBlockContentCallback is a callback function that is called by SetHead
+ // before each header is deleted.
+ DeleteBlockContentCallback func(ethdb.KeyValueWriter, common.Hash, uint64)
+)
+// SetHead rewinds the local chain to a new head. In the case of headers, everything
+// above the new head will be deleted and the new one set. In the case of blocks
+// though, the head may be further rewound if block bodies are missing (non-archive
+// nodes after a fast sync).
+func (hc *HeaderChain) SetHead(head uint64, updateFn UpdateHeadBlocksCallback, delFn DeleteBlockContentCallback) {
+ var (
+ parentHash common.Hash
+ batch = hc.chainDb.NewBatch()
+ )
for hdr := hc.CurrentHeader(); hdr != nil && hdr.Number.Uint64() > head; hdr = hc.CurrentHeader() {
- hash := hdr.Hash()
- num := hdr.Number.Uint64()
+ hash, num := hdr.Hash(), hdr.Number.Uint64()
+
+ // Rewind block chain to new head.
+ parent := hc.GetHeader(hdr.ParentHash, num-1)
+ if parent == nil {
+ parent = hc.genesisHeader
+ }
+ parentHash = hdr.ParentHash
+ // Notably, since geth has the possibility for setting the head to a low
+ // height which is even lower than ancient head.
+ // In order to ensure that the head is always no higher than the data in
+ // the database(ancient store or active store), we need to update head
+ // first then remove the relative data from the database.
+ //
+ // Update head first(head fast block, head full block) before deleting the data.
+ markerBatch := hc.chainDb.NewBatch()
+ if updateFn != nil {
+ updateFn(markerBatch, parent)
+ }
+ // Update head header then.
+ rawdb.WriteHeadHeaderHash(markerBatch, parentHash)
+ if err := markerBatch.Write(); err != nil {
+ log.Crit("Failed to update chain markers", "error", err)
+ }
+ hc.currentHeader.Store(parent)
+ hc.currentHeaderHash = parentHash
+ headHeaderGauge.Update(parent.Number.Int64())
+
+ // Remove the relative data from the database.
if delFn != nil {
- delFn(hash, num)
+ delFn(batch, hash, num)
}
- DeleteHeader(hc.chainDb, hash, num)
- DeleteTd(hc.chainDb, hash, num)
- hc.currentHeader.Store(hc.GetHeader(hdr.ParentHash, hdr.Number.Uint64()-1))
+ // Rewind header chain to new head.
+ rawdb.DeleteHeader(batch, hash, num)
+ rawdb.DeleteTd(batch, hash, num)
+ rawdb.DeleteCanonicalHash(batch, num)
}
- // Roll back the canonical chain numbering
- for i := height; i > head; i-- {
- DeleteCanonicalHash(hc.chainDb, i)
+ // Flush all accumulated deletions.
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to rewind block", "error", err)
}
// Clear out any stale content from the caches
hc.headerCache.Purge()
hc.tdCache.Purge()
hc.numberCache.Purge()
-
- if hc.CurrentHeader() == nil {
- hc.currentHeader.Store(hc.genesisHeader)
- }
- hc.currentHeaderHash = hc.CurrentHeader().Hash()
- headHeaderGauge.Update(hc.CurrentHeader().Number.Int64())
-
- if err := WriteHeadHeaderHash(hc.chainDb, hc.currentHeaderHash); err != nil {
- log.Crit("Failed to reset head header hash", "err", err)
- }
}
// SetGenesis sets a new genesis block header for the chain
diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go
index 7073e8431a60..b8cb9cbd7650 100644
--- a/core/rawdb/accessors_chain.go
+++ b/core/rawdb/accessors_chain.go
@@ -24,12 +24,32 @@ import (
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/core/types"
+ "github.com/XinFinOrg/XDPoSChain/crypto"
"github.com/XinFinOrg/XDPoSChain/ethdb"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/params"
"github.com/XinFinOrg/XDPoSChain/rlp"
)
+// ReadCanonicalHash retrieves the hash assigned to a canonical block number.
+func ReadCanonicalHash(db ethdb.Reader, number uint64) common.Hash {
+ data, _ := db.Ancient(freezerHashTable, number)
+ if len(data) == 0 {
+ data, _ = db.Get(headerHashKey(number))
+ // In the background freezer is moving data from leveldb to flatten files.
+ // So during the first check for ancient db, the data is not yet in there,
+ // but when we reach into leveldb, the data was already moved. That would
+ // result in a not found error.
+ if len(data) == 0 {
+ data, _ = db.Ancient(freezerHashTable, number)
+ }
+ }
+ if len(data) == 0 {
+ return common.Hash{}
+ }
+ return common.BytesToHash(data)
+}
+
// WriteCanonicalHash stores the hash assigned to a canonical block number.
func WriteCanonicalHash(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
if err := db.Put(headerHashKey(number), hash.Bytes()); err != nil {
@@ -37,6 +57,13 @@ func WriteCanonicalHash(db ethdb.KeyValueWriter, hash common.Hash, number uint64
}
}
+// DeleteCanonicalHash removes the number to hash canonical mapping.
+func DeleteCanonicalHash(db ethdb.KeyValueWriter, number uint64) {
+ if err := db.Delete(headerHashKey(number)); err != nil {
+ log.Crit("Failed to delete number to hash mapping", "err", err)
+ }
+}
+
// ReadHeaderNumber returns the header number assigned to a hash.
func ReadHeaderNumber(db ethdb.KeyValueReader, hash common.Hash) *uint64 {
data, _ := db.Get(headerNumberKey(hash))
@@ -56,6 +83,20 @@ func WriteHeaderNumber(db ethdb.KeyValueWriter, hash common.Hash, number uint64)
}
}
+// DeleteHeaderNumber removes hash->number mapping.
+func DeleteHeaderNumber(db ethdb.KeyValueWriter, hash common.Hash) {
+ if err := db.Delete(headerNumberKey(hash)); err != nil {
+ log.Crit("Failed to delete hash to number mapping", "err", err)
+ }
+}
+
+// WriteHeadHeaderHash stores the hash of the current canonical head header.
+func WriteHeadHeaderHash(db ethdb.KeyValueWriter, hash common.Hash) {
+ if err := db.Put(headHeaderKey, hash.Bytes()); err != nil {
+ log.Crit("Failed to store last header's hash", "err", err)
+ }
+}
+
// WriteHeadBlockHash stores the head block's hash.
func WriteHeadBlockHash(db ethdb.KeyValueWriter, hash common.Hash) {
if err := db.Put(headBlockKey, hash.Bytes()); err != nil {
@@ -63,10 +104,47 @@ func WriteHeadBlockHash(db ethdb.KeyValueWriter, hash common.Hash) {
}
}
+// WriteHeadFastBlockHash stores the hash of the current fast-sync head block.
+func WriteHeadFastBlockHash(db ethdb.KeyValueWriter, hash common.Hash) {
+ if err := db.Put(headFastBlockKey, hash.Bytes()); err != nil {
+ log.Crit("Failed to store last fast block's hash", "err", err)
+ }
+}
+
// ReadHeaderRLP retrieves a block header in its raw RLP database encoding.
func ReadHeaderRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
- data, _ := db.Get(headerKey(number, hash))
- return data
+ // First try to look up the data in ancient database. Extra hash
+ // comparison is necessary since ancient database only maintains
+ // the canonical data.
+ data, _ := db.Ancient(freezerHeaderTable, number)
+ if len(data) > 0 && crypto.Keccak256Hash(data) == hash {
+ return data
+ }
+ // Then try to look up the data in leveldb.
+ data, _ = db.Get(headerKey(number, hash))
+ if len(data) > 0 {
+ return data
+ }
+ // In the background freezer is moving data from leveldb to flatten files.
+ // So during the first check for ancient db, the data is not yet in there,
+ // but when we reach into leveldb, the data was already moved. That would
+ // result in a not found error.
+ data, _ = db.Ancient(freezerHeaderTable, number)
+ if len(data) > 0 && crypto.Keccak256Hash(data) == hash {
+ return data
+ }
+ return nil // Can't find the data anywhere.
+}
+
+// HasHeader verifies the existence of a block header corresponding to the hash.
+func HasHeader(db ethdb.Reader, hash common.Hash, number uint64) bool {
+ if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash {
+ return true
+ }
+ if has, err := db.Has(headerKey(number, hash)); !has || err != nil {
+ return false
+ }
+ return true
}
// ReadHeader retrieves the block header corresponding to the hash.
@@ -104,6 +182,22 @@ func WriteHeader(db ethdb.KeyValueWriter, header *types.Header) {
}
}
+// DeleteHeader removes all block header data associated with a hash.
+func DeleteHeader(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
+ deleteHeaderWithoutNumber(db, hash, number)
+ if err := db.Delete(headerNumberKey(hash)); err != nil {
+ log.Crit("Failed to delete hash to number mapping", "err", err)
+ }
+}
+
+// deleteHeaderWithoutNumber removes only the block header but does not remove
+// the hash to number mapping.
+func deleteHeaderWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
+ if err := db.Delete(headerKey(number, hash)); err != nil {
+ log.Crit("Failed to delete header", "err", err)
+ }
+}
+
// ReadBodyRLP retrieves the block body (transactions and uncles) in RLP encoding.
func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
// First try to look up the data in ancient database. Extra hash
@@ -165,6 +259,31 @@ func WriteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64, body *t
WriteBodyRLP(db, hash, number, data)
}
+// DeleteBody removes all block body data associated with a hash.
+func DeleteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
+ if err := db.Delete(blockBodyKey(number, hash)); err != nil {
+ log.Crit("Failed to delete block body", "err", err)
+ }
+}
+
+// WriteTd stores the total difficulty of a block into the database.
+func WriteTd(db ethdb.KeyValueWriter, hash common.Hash, number uint64, td *big.Int) {
+ data, err := rlp.EncodeToBytes(td)
+ if err != nil {
+ log.Crit("Failed to RLP encode block total difficulty", "err", err)
+ }
+ if err := db.Put(headerTDKey(number, hash), data); err != nil {
+ log.Crit("Failed to store block total difficulty", "err", err)
+ }
+}
+
+// DeleteTd removes all block total difficulty data associated with a hash.
+func DeleteTd(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
+ if err := db.Delete(headerTDKey(number, hash)); err != nil {
+ log.Crit("Failed to delete block total difficulty", "err", err)
+ }
+}
+
// ReadReceiptsRLP retrieves all the transaction receipts belonging to a block in RLP encoding.
func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
// First try to look up the data in ancient database. Extra hash
@@ -270,6 +389,13 @@ func WriteReceipts(db ethdb.KeyValueWriter, hash common.Hash, number uint64, rec
}
}
+// DeleteReceipts removes all receipt data associated with a block hash.
+func DeleteReceipts(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
+ if err := db.Delete(blockReceiptsKey(number, hash)); err != nil {
+ log.Crit("Failed to delete block receipts", "err", err)
+ }
+}
+
// storedReceiptRLP is the storage encoding of a receipt.
// Re-definition in core/types/receipt.go.
type storedReceiptRLP struct {
diff --git a/core/rawdb/accessors_indexes.go b/core/rawdb/accessors_indexes.go
new file mode 100644
index 000000000000..7f9409875193
--- /dev/null
+++ b/core/rawdb/accessors_indexes.go
@@ -0,0 +1,56 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package rawdb
+
+import (
+ "github.com/XinFinOrg/XDPoSChain/common"
+ "github.com/XinFinOrg/XDPoSChain/core/types"
+ "github.com/XinFinOrg/XDPoSChain/ethdb"
+ "github.com/XinFinOrg/XDPoSChain/log"
+ "github.com/XinFinOrg/XDPoSChain/rlp"
+)
+
+type TxLookupEntry struct {
+ BlockHash common.Hash
+ BlockIndex uint64
+ Index uint64
+}
+
+// WriteTxLookupEntriesByBlock stores a positional metadata for every transaction from
+// a block, enabling hash based transaction and receipt lookups.
+func WriteTxLookupEntriesByBlock(db ethdb.KeyValueWriter, block *types.Block) {
+ // Iterate over each transaction and encode its metadata
+ for i, tx := range block.Transactions() {
+ entry := TxLookupEntry{
+ BlockHash: block.Hash(),
+ BlockIndex: block.NumberU64(),
+ Index: uint64(i),
+ }
+ data, err := rlp.EncodeToBytes(entry)
+ if err != nil {
+ log.Crit("Failed to RLP encode TxLookupEntry", "err", err)
+ }
+ if err := db.Put(txLookupKey(tx.Hash()), data); err != nil {
+ log.Crit("Failed to store tx lookup entry", "err", err)
+ }
+ }
+}
+
+// DeleteTxLookupEntry removes all transaction data associated with a hash.
+func DeleteTxLookupEntry(db ethdb.KeyValueWriter, hash common.Hash) {
+ db.Delete(txLookupKey(hash))
+}
diff --git a/core/rawdb/accessors_metadata.go b/core/rawdb/accessors_metadata.go
new file mode 100644
index 000000000000..a56ab22d7b4c
--- /dev/null
+++ b/core/rawdb/accessors_metadata.go
@@ -0,0 +1,34 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package rawdb
+
+import (
+ "github.com/XinFinOrg/XDPoSChain/common"
+ "github.com/XinFinOrg/XDPoSChain/ethdb"
+ "github.com/XinFinOrg/XDPoSChain/log"
+)
+
+// WritePreimages writes the provided set of preimages to the database.
+func WritePreimages(db ethdb.KeyValueWriter, preimages map[common.Hash][]byte) {
+ for hash, preimage := range preimages {
+ if err := db.Put(preimageKey(hash), preimage); err != nil {
+ log.Crit("Failed to store trie preimage", "err", err)
+ }
+ }
+ preimageCounter.Inc(int64(len(preimages)))
+ preimageHitCounter.Inc(int64(len(preimages)))
+}
diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go
index 26d098347e74..a0a1852c7550 100644
--- a/core/rawdb/schema.go
+++ b/core/rawdb/schema.go
@@ -21,23 +21,41 @@ import (
"encoding/binary"
"github.com/XinFinOrg/XDPoSChain/common"
+ "github.com/XinFinOrg/XDPoSChain/metrics"
)
// The fields below define the low level database schema prefixing.
var (
+ // headHeaderKey tracks the latest known header's hash.
+ headHeaderKey = []byte("LastHeader")
+
// headBlockKey tracks the latest known full block's hash.
headBlockKey = []byte("LastBlock")
+ // headFastBlockKey tracks the latest known incomplete block's hash during fast sync.
+ headFastBlockKey = []byte("LastFast")
+
// Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes).
headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header
+ headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td
headerHashSuffix = []byte("n") // headerPrefix + num (uint64 big endian) + headerHashSuffix -> hash
headerNumberPrefix = []byte("H") // headerNumberPrefix + hash -> num (uint64 big endian)
blockBodyPrefix = []byte("b") // blockBodyPrefix + num (uint64 big endian) + hash -> block body
blockReceiptsPrefix = []byte("r") // blockReceiptsPrefix + num (uint64 big endian) + hash -> block receipts
+
+ txLookupPrefix = []byte("l") // txLookupPrefix + hash -> transaction/receipt lookup metadata
+
+ preimagePrefix = []byte("secure-key-") // preimagePrefix + hash -> preimage
+
+ preimageCounter = metrics.NewRegisteredCounter("db/preimage/total", nil)
+ preimageHitCounter = metrics.NewRegisteredCounter("db/preimage/hits", nil)
)
const (
+ // freezerHeaderTable indicates the name of the freezer header table.
+ freezerHeaderTable = "headers"
+
// freezerHashTable indicates the name of the freezer canonical hash table.
freezerHashTable = "hashes"
@@ -60,6 +78,11 @@ func headerKey(number uint64, hash common.Hash) []byte {
return append(append(headerPrefix, encodeBlockNumber(number)...), hash.Bytes()...)
}
+// headerTDKey = headerPrefix + num (uint64 big endian) + hash + headerTDSuffix
+func headerTDKey(number uint64, hash common.Hash) []byte {
+ return append(headerKey(number, hash), headerTDSuffix...)
+}
+
// headerHashKey = headerPrefix + num (uint64 big endian) + headerHashSuffix
func headerHashKey(number uint64) []byte {
return append(append(headerPrefix, encodeBlockNumber(number)...), headerHashSuffix...)
@@ -79,3 +102,13 @@ func blockBodyKey(number uint64, hash common.Hash) []byte {
func blockReceiptsKey(number uint64, hash common.Hash) []byte {
return append(append(blockReceiptsPrefix, encodeBlockNumber(number)...), hash.Bytes()...)
}
+
+// txLookupKey = txLookupPrefix + hash
+func txLookupKey(hash common.Hash) []byte {
+ return append(txLookupPrefix, hash.Bytes()...)
+}
+
+// preimageKey = preimagePrefix + hash
+func preimageKey(hash common.Hash) []byte {
+ return append(preimagePrefix, hash.Bytes()...)
+}
diff --git a/internal/syncx/mutex.go b/internal/syncx/mutex.go
new file mode 100644
index 000000000000..96a21986c60c
--- /dev/null
+++ b/internal/syncx/mutex.go
@@ -0,0 +1,64 @@
+// Copyright 2021 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+// Package syncx contains exotic synchronization primitives.
+package syncx
+
+// ClosableMutex is a mutex that can also be closed.
+// Once closed, it can never be taken again.
+type ClosableMutex struct {
+ ch chan struct{}
+}
+
+func NewClosableMutex() *ClosableMutex {
+ ch := make(chan struct{}, 1)
+ ch <- struct{}{}
+ return &ClosableMutex{ch}
+}
+
+// TryLock attempts to lock cm.
+// If the mutex is closed, TryLock returns false.
+func (cm *ClosableMutex) TryLock() bool {
+ _, ok := <-cm.ch
+ return ok
+}
+
+// MustLock locks cm.
+// If the mutex is closed, MustLock panics.
+func (cm *ClosableMutex) MustLock() {
+ _, ok := <-cm.ch
+ if !ok {
+ panic("mutex closed")
+ }
+}
+
+// Unlock unlocks cm.
+func (cm *ClosableMutex) Unlock() {
+ select {
+ case cm.ch <- struct{}{}:
+ default:
+ panic("Unlock of already-unlocked ClosableMutex")
+ }
+}
+
+// Close locks the mutex, then closes it.
+func (cm *ClosableMutex) Close() {
+ _, ok := <-cm.ch
+ if !ok {
+ panic("Close of already-closed ClosableMutex")
+ }
+ close(cm.ch)
+}
diff --git a/light/lightchain.go b/light/lightchain.go
index 9cbf95ddf9c6..4b90cf3cca7a 100644
--- a/light/lightchain.go
+++ b/light/lightchain.go
@@ -56,7 +56,6 @@ type LightChain struct {
scope event.SubscriptionScope
genesisBlock *types.Block
- mu sync.RWMutex
chainmu sync.RWMutex
bodyCache *lru.Cache[common.Hash, *types.Body]
@@ -147,7 +146,6 @@ func (lc *LightChain) loadLastState() error {
lc.hc.SetCurrentHeader(header)
}
}
-
// Issue a status log and return
header := lc.hc.CurrentHeader()
headerTd := lc.GetTd(header.Hash(), header.Number.Uint64())
@@ -159,10 +157,10 @@ func (lc *LightChain) loadLastState() error {
// SetHead rewinds the local chain to a new head. Everything above the new
// head will be deleted and the new one set.
func (lc *LightChain) SetHead(head uint64) {
- lc.mu.Lock()
- defer lc.mu.Unlock()
+ lc.chainmu.Lock()
+ defer lc.chainmu.Unlock()
- lc.hc.SetHead(head, nil)
+ lc.hc.SetHead(head, nil, nil)
lc.loadLastState()
}
@@ -182,14 +180,17 @@ func (lc *LightChain) ResetWithGenesisBlock(genesis *types.Block) {
// Dump the entire block chain and purge the caches
lc.SetHead(0)
- lc.mu.Lock()
- defer lc.mu.Unlock()
+ lc.chainmu.Lock()
+ defer lc.chainmu.Unlock()
// Prepare the genesis block and reinitialise the chain
- if err := core.WriteTd(lc.chainDb, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil {
- log.Crit("Failed to write genesis block TD", "err", err)
+ batch := lc.chainDb.NewBatch()
+ rawdb.WriteTd(batch, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty())
+ rawdb.WriteBlock(batch, genesis)
+ rawdb.WriteHeadHeaderHash(batch, genesis.Hash())
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to reset genesis block", "err", err)
}
- rawdb.WriteBlock(lc.chainDb, genesis)
lc.genesisBlock = genesis
lc.hc.SetGenesis(lc.genesisBlock.Header())
lc.hc.SetCurrentHeader(lc.genesisBlock.Header())
@@ -297,16 +298,25 @@ func (lc *LightChain) Stop() {
// Rollback is designed to remove a chain of links from the database that aren't
// certain enough to be valid.
func (lc *LightChain) Rollback(chain []common.Hash) {
- lc.mu.Lock()
- defer lc.mu.Unlock()
+ lc.chainmu.Lock()
+ defer lc.chainmu.Unlock()
+ batch := lc.chainDb.NewBatch()
for i := len(chain) - 1; i >= 0; i-- {
hash := chain[i]
+ // Degrade the chain markers if they are explicitly reverted.
+ // In theory we should update all in-memory markers in the
+ // last step, however the direction of rollback is from high
+ // to low, so it's safe the update in-memory markers directly.
if head := lc.hc.CurrentHeader(); head.Hash() == hash {
+ rawdb.WriteHeadHeaderHash(batch, head.ParentHash)
lc.hc.SetCurrentHeader(lc.GetHeader(head.ParentHash, head.Number.Uint64()-1))
}
}
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to rollback light chain", "error", err)
+ }
}
// postChainEvents iterates over the events generated by a chain insertion and
@@ -344,19 +354,13 @@ func (lc *LightChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i
// Make sure only one thread manipulates the chain at once
lc.chainmu.Lock()
- defer func() {
- lc.chainmu.Unlock()
- time.Sleep(time.Millisecond * 10) // ugly hack; do not hog chain lock in case syncing is CPU-limited by validation
- }()
+ defer lc.chainmu.Unlock()
lc.wg.Add(1)
defer lc.wg.Done()
var events []interface{}
whFunc := func(header *types.Header) error {
- lc.mu.Lock()
- defer lc.mu.Unlock()
-
status, err := lc.hc.WriteHeader(header)
switch status {
@@ -448,13 +452,17 @@ func (lc *LightChain) SyncCht(ctx context.Context) bool {
chtCount, _, _ := lc.odr.ChtIndexer().Sections()
if headNum+1 < chtCount*CHTFrequencyClient {
num := chtCount*CHTFrequencyClient - 1
- header, err := GetHeaderByNumber(ctx, lc.odr, num)
- if header != nil && err == nil {
- lc.mu.Lock()
+ // Retrieve the latest useful header and update to it
+ if header, err := GetHeaderByNumber(ctx, lc.odr, num); header != nil && err == nil {
+ lc.chainmu.Lock()
+ defer lc.chainmu.Unlock()
+
+ // Ensure the chain didn't move past the latest block while retrieving it
if lc.hc.CurrentHeader().Number.Uint64() < header.Number.Uint64() {
+ log.Info("Updated latest header based on CHT", "number", header.Number, "hash", header.Hash())
+ rawdb.WriteHeadHeaderHash(lc.chainDb, header.Hash())
lc.hc.SetCurrentHeader(header)
}
- lc.mu.Unlock()
return true
}
}
diff --git a/light/lightchain_test.go b/light/lightchain_test.go
index 7d762e2e3f73..4733810e94d9 100644
--- a/light/lightchain_test.go
+++ b/light/lightchain_test.go
@@ -18,10 +18,11 @@ package light
import (
"context"
- "github.com/XinFinOrg/XDPoSChain/core/rawdb"
"math/big"
"testing"
+ "github.com/XinFinOrg/XDPoSChain/core/rawdb"
+
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/consensus/ethash"
"github.com/XinFinOrg/XDPoSChain/core"
@@ -122,10 +123,10 @@ func testHeaderChainImport(chain []*types.Header, lightchain *LightChain) error
return err
}
// Manually insert the header into the database, but don't reorganize (allows subsequent testing)
- lightchain.mu.Lock()
+ lightchain.chainmu.Lock()
core.WriteTd(lightchain.chainDb, header.Hash(), header.Number.Uint64(), new(big.Int).Add(header.Difficulty, lightchain.GetTdByHash(header.ParentHash)))
rawdb.WriteHeader(lightchain.chainDb, header)
- lightchain.mu.Unlock()
+ lightchain.chainmu.Unlock()
}
return nil
}
diff --git a/light/txpool.go b/light/txpool.go
index a1d9190a3874..7a7bf619d7f3 100644
--- a/light/txpool.go
+++ b/light/txpool.go
@@ -25,6 +25,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/core"
+ "github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/txpool"
"github.com/XinFinOrg/XDPoSChain/core/types"
@@ -205,15 +206,17 @@ func (p *TxPool) checkMinedTxs(ctx context.Context, hash common.Hash, number uin
// rollbackTxs marks the transactions contained in recently rolled back blocks
// as rolled back. It also removes any positional lookup entries.
func (p *TxPool) rollbackTxs(hash common.Hash, txc txStateChanges) {
+ batch := p.chainDb.NewBatch()
if list, ok := p.mined[hash]; ok {
for _, tx := range list {
txHash := tx.Hash()
- core.DeleteTxLookupEntry(p.chainDb, txHash)
+ rawdb.DeleteTxLookupEntry(batch, txHash)
p.pending[txHash] = tx
txc.setState(txHash, false)
}
delete(p.mined, hash)
}
+ batch.Write()
}
// reorgOnNewHead sets a new head header, processing (and rolling back if necessary)