Skip to content

Commit

Permalink
core: refactor function reorg (ethereum#23761, ethereum#24616, ethere…
Browse files Browse the repository at this point in the history
  • Loading branch information
gzliudan committed Jan 4, 2025
1 parent 97879c0 commit 7deeaee
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 88 deletions.
247 changes: 159 additions & 88 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ var (

errInsertionInterrupted = errors.New("insertion is interrupted")
errChainStopped = errors.New("blockchain is stopped")
errInvalidOldChain = errors.New("invalid old chain")
errInvalidNewChain = errors.New("invalid new chain")

CheckpointCh = make(chan int)
)
Expand Down Expand Up @@ -1476,7 +1478,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
if reorg {
// Reorganise the chain if the parent is not the head block
if block.ParentHash() != currentBlock.Hash() {
if err := bc.reorg(currentBlock, block); err != nil {
if err := bc.reorg(currentBlock.Header(), block.Header()); err != nil {
return NonStatTy, err
}
}
Expand All @@ -1491,9 +1493,8 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
bc.writeHeadBlock(block, false)
// prepare set of masternodes for the next epoch
if bc.chainConfig.XDPoS != nil && ((block.NumberU64() % bc.chainConfig.XDPoS.Epoch) == (bc.chainConfig.XDPoS.Epoch - bc.chainConfig.XDPoS.Gap)) {
err := bc.UpdateM1()
if err != nil {
log.Crit("Error when update masternodes set. Stopping node", "err", err, "blockNum", block.NumberU64())
if err := bc.UpdateM1(); err != nil {
log.Crit("Fail to update masternodes during writeBlockWithState", "number", block.Number, "hash", block.Hash().Hex(), "err", err)
}
}
}
Expand Down Expand Up @@ -2275,153 +2276,223 @@ func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log {
// reorg takes two blocks, an old chain and a new chain and will reconstruct the
// blocks and inserts them to be part of the new canonical chain and accumulates
// potential missing transactions and post an event about them.
func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
func (bc *BlockChain) reorg(oldHead, newHead *types.Header) error {
log.Warn("Reorg", "OldHash", oldHead.Hash().Hex(), "OldNum", oldHead.Number, "NewHash", newHead.Hash().Hex(), "NewNum", newHead.Number)

var (
newChain types.Blocks
oldChain types.Blocks
commonBlock *types.Block
deletedTxs types.Transactions
addedTxs types.Transactions
deletedLogs []*types.Log
newChain []*types.Header
oldChain []*types.Header
commonBlock *types.Header
)
log.Warn("Reorg", "oldBlock hash", oldBlock.Hash().Hex(), "number", oldBlock.NumberU64(), "newBlock hash", newBlock.Hash().Hex(), "number", newBlock.NumberU64())

// first reduce whoever is higher bound
if oldBlock.NumberU64() > newBlock.NumberU64() {
// reduce old chain
for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) {
oldChain = append(oldChain, oldBlock)
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
if logs := bc.collectLogs(oldBlock, true); len(logs) > 0 {
deletedLogs = append(deletedLogs, logs...)
}

// Reduce the longer chain to the same number as the shorter one
if oldHead.Number.Uint64() > newHead.Number.Uint64() {
// Old chain is longer, gather all transactions and logs as deleted ones
for ; oldHead != nil && oldHead.Number.Uint64() != newHead.Number.Uint64(); oldHead = bc.GetHeader(oldHead.ParentHash, oldHead.Number.Uint64()-1) {
oldChain = append(oldChain, oldHead)
}
} else {
// reduce new chain and append new chain blocks for inserting later on
for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) {
newChain = append(newChain, newBlock)
// New chain is longer, stash all blocks away for subsequent insertion
for ; newHead != nil && newHead.Number.Uint64() != oldHead.Number.Uint64(); newHead = bc.GetHeader(newHead.ParentHash, newHead.Number.Uint64()-1) {
newChain = append(newChain, newHead)
}
}
if oldBlock == nil {
return errors.New("invalid old chain")
if oldHead == nil {
return errInvalidOldChain
}
if newBlock == nil {
return errors.New("invalid new chain")
if newHead == nil {
return errInvalidNewChain
}

// Both sides of the reorg are at the same number, reduce both until the common
// ancestor is found
for {
if oldBlock.Hash() == newBlock.Hash() {
commonBlock = oldBlock
// If the common ancestor was found, bail out
if oldHead.Hash() == newHead.Hash() {
commonBlock = oldHead
break
}
// Remove an old block as well as stash away a new block
oldChain = append(oldChain, oldHead)
newChain = append(newChain, newHead)

oldChain = append(oldChain, oldBlock)
newChain = append(newChain, newBlock)
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
if logs := bc.collectLogs(oldBlock, true); len(logs) > 0 {
deletedLogs = append(deletedLogs, logs...)
// Step back with both chains
oldHead = bc.GetHeader(oldHead.ParentHash, oldHead.Number.Uint64()-1)
if oldHead == nil {
return errInvalidOldChain
}

oldBlock, newBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1), bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
if oldBlock == nil {
return errors.New("invalid old chain")
}
if newBlock == nil {
return errors.New("invalid new chain")
newHead = bc.GetHeader(newHead.ParentHash, newHead.Number.Uint64()-1)
if newHead == nil {
return errInvalidNewChain
}
}

// Ensure XDPoS engine committed block will be not reverted
if xdpos, ok := bc.Engine().(*XDPoS.XDPoS); ok {
latestCommittedBlock := xdpos.EngineV2.GetLatestCommittedBlockInfo()
if latestCommittedBlock != nil {
currentBlock := bc.CurrentBlock()
currentBlock.Number().Cmp(latestCommittedBlock.Number)
cmp := commonBlock.Number().Cmp(latestCommittedBlock.Number)
cmp := commonBlock.Number.Cmp(latestCommittedBlock.Number)
if cmp < 0 {
for _, oldBlock := range oldChain {
if oldBlock.Number().Cmp(latestCommittedBlock.Number) == 0 {
if oldBlock.Number.Cmp(latestCommittedBlock.Number) == 0 {
if oldBlock.Hash() != latestCommittedBlock.Hash {
log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "committed hash", latestCommittedBlock.Hash)
log.Error("Impossible reorg, please file an issue", "OldNum", oldBlock.Number, "OldHash", oldBlock.Hash().Hex(), "LatestCommittedHash", latestCommittedBlock.Hash.Hex())
} else {
log.Warn("Stop reorg, blockchain is under forking attack", "old committed num", oldBlock.Number(), "old committed hash", oldBlock.Hash())
return fmt.Errorf("stop reorg, blockchain is under forking attack. old committed num %d, hash %x", oldBlock.Number(), oldBlock.Hash())
log.Warn("Stop reorg, blockchain is under forking attack", "OldCommittedNum", oldBlock.Number, "OldCommittedHash", oldBlock.Hash().Hex())
return fmt.Errorf("stop reorg, blockchain is under forking attack. OldCommitted num %d, hash %s", oldBlock.Number, oldBlock.Hash().Hex())
}
}
}
} else if cmp == 0 {
if commonBlock.Hash() != latestCommittedBlock.Hash {
log.Error("Impossible reorg, please file an issue", "oldnum", commonBlock.Number(), "oldhash", commonBlock.Hash(), "committed hash", latestCommittedBlock.Hash)
log.Error("Impossible reorg, please file an issue", "OldNum", commonBlock.Number.Uint64(), "OldHash", commonBlock.Hash().Hex(), "LatestCommittedHash", latestCommittedBlock.Hash.Hex())
}
}
}
}

// Ensure the user sees large reorgs
if len(oldChain) > 0 && len(newChain) > 0 {
logFn := log.Warn
logFn := log.Info
msg := "Chain reorg detected"
if len(oldChain) > 63 {
msg = "Large chain reorg detected"
logFn = log.Warn
}
logFn("Chain split detected", "number", commonBlock.Number(), "hash", commonBlock.Hash(),
"drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].Hash())
logFn(msg, "number", commonBlock.Number, "hash", commonBlock.Hash().Hex(),
"drop", len(oldChain), "dropfrom", oldChain[0].Hash().Hex(), "add", len(newChain), "addfrom", newChain[0].Hash().Hex())
blockReorgAddMeter.Mark(int64(len(newChain)))
blockReorgDropMeter.Mark(int64(len(oldChain)))
blockReorgMeter.Mark(1)
} else if len(newChain) > 0 {
// Special case happens in the post merge stage that current head is
// the ancestor of new head while these two blocks are not consecutive
log.Info("Extend chain", "add", len(newChain), "number", newChain[0].Number, "hash", newChain[0].Hash())
blockReorgAddMeter.Mark(int64(len(newChain)))
} else {
log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "newnum", newBlock.Number(), "newhash", newBlock.Hash())
// len(newChain) == 0 && len(oldChain) > 0
// rewind the canonical chain to a lower point.
log.Error("Impossible reorg, please file an issue", "oldnum", oldHead.Number, "oldhash", oldHead.Hash(), "oldblocks", len(oldChain), "newnum", newHead.Number, "newhash", newHead.Hash(), "newblocks", len(newChain))
}

// Insert the new chain(except the head block(reverse order)),
// taking care of the proper incremental order.
for i := len(newChain) - 1; i >= 0; i-- {
// insert the block in the canonical way, re-writing history
bc.writeHeadBlock(newChain[i], true)
// Acquire the tx-lookup lock before mutation. This step is essential
// as the txlookups should be changed atomically, and all subsequent
// reads should be blocked until the mutation is complete.
// bc.txLookupLock.Lock()

// Reorg can be executed, start reducing the chain's old blocks and appending
// the new blocks
var (
deletedTxs []common.Hash
rebirthTxs []common.Hash

deletedLogs []*types.Log
rebirthLogs []*types.Log
)

// Deleted log emission on the API uses forward order, which is borked, but
// we'll leave it in for legacy reasons.
//
// TODO(karalabe): This should be nuked out, no idea how, deprecate some APIs?
{
for i := len(oldChain) - 1; i >= 0; i-- {
block := bc.GetBlock(oldChain[i].Hash(), oldChain[i].Number.Uint64())
if block == nil {
return errInvalidOldChain // Corrupt database, mostly here to avoid weird panics
}
if logs := bc.collectLogs(block, true); len(logs) > 0 {
deletedLogs = append(deletedLogs, logs...)
}
if len(deletedLogs) > 512 {
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
deletedLogs = nil
}
// TODO(daniel): remove chainSideFeed, reference PR #30601
// Also send event for blocks removed from the canon chain.
// bc.chainSideFeed.Send(ChainSideEvent{Block: block})
}
if len(deletedLogs) > 0 {
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
}
}

// Collect the new added transactions.
addedTxs = append(addedTxs, newChain[i].Transactions()...)
// Undo old blocks in reverse order
for i := 0; i < len(oldChain); i++ {
// Collect all the deleted transactions
block := bc.GetBlock(oldChain[i].Hash(), oldChain[i].Number.Uint64())
if block == nil {
return errInvalidOldChain // Corrupt database, mostly here to avoid weird panics
}
for _, tx := range block.Transactions() {
deletedTxs = append(deletedTxs, tx.Hash())
}
// Collect deleted logs and emit them for new integrations
// if logs := bc.collectLogs(block, true); len(logs) > 0 {
// slices.Reverse(logs) // Emit revertals latest first, older then
// }
}

// Apply new blocks in forward order
for i := len(newChain) - 1; i >= 0; i-- {
// Collect all the included transactions
block := bc.GetBlock(newChain[i].Hash(), newChain[i].Number.Uint64())
if block == nil {
return errInvalidNewChain // Corrupt database, mostly here to avoid weird panics
}
for _, tx := range block.Transactions() {
rebirthTxs = append(rebirthTxs, tx.Hash())
}
// Collect inserted logs and emit them
if logs := bc.collectLogs(block, false); len(logs) > 0 {
rebirthLogs = append(rebirthLogs, logs...)
}
if len(rebirthLogs) > 512 {
bc.logsFeed.Send(rebirthLogs)
rebirthLogs = nil
}
// Update the head block
bc.writeHeadBlock(block, true)
// prepare set of masternodes for the next epoch
if bc.chainConfig.XDPoS != nil && ((newChain[i].NumberU64() % bc.chainConfig.XDPoS.Epoch) == (bc.chainConfig.XDPoS.Epoch - bc.chainConfig.XDPoS.Gap)) {
err := bc.UpdateM1()
if err != nil {
log.Crit("Error when update masternodes set. Stopping node", "err", err, "blockNumber", newChain[i].NumberU64())
if bc.chainConfig.XDPoS != nil && ((block.NumberU64() % bc.chainConfig.XDPoS.Epoch) == (bc.chainConfig.XDPoS.Epoch - bc.chainConfig.XDPoS.Gap)) {
if err := bc.UpdateM1(); err != nil {
log.Crit("Fail to update masternodes during reorg", "number", block.Number, "hash", block.Hash().Hex(), "err", err)
}
}
}
if len(rebirthLogs) > 0 {
bc.logsFeed.Send(rebirthLogs)
}

// Delete useless indexes right now which includes the non-canonical
// transaction indexes, canonical chain indexes which above the head.
indexesBatch := bc.db.NewBatch()
for _, tx := range types.TxDifference(deletedTxs, addedTxs) {
rawdb.DeleteTxLookupEntry(indexesBatch, tx.Hash())
batch := bc.db.NewBatch()
for _, tx := range types.HashDifference(deletedTxs, rebirthTxs) {
rawdb.DeleteTxLookupEntry(batch, tx)
}
// Delete all hash markers that are not part of the new canonical chain.
// Because the reorg function handles new chain head, all hash
// markers greater than new chain head should be deleted.
number := commonBlock.Number
if len(newChain) > 0 {
number = newChain[0].Number
}
// Delete any canonical number assignments above the new head
number := bc.CurrentBlock().NumberU64()
for i := number + 1; ; i++ {
for i := number.Uint64() + 1; ; i++ {
hash := rawdb.ReadCanonicalHash(bc.db, i)
if hash == (common.Hash{}) {
break
}
rawdb.DeleteCanonicalHash(indexesBatch, i)
rawdb.DeleteCanonicalHash(batch, i)
}
if err := indexesBatch.Write(); err != nil {
if err := batch.Write(); err != nil {
log.Crit("Failed to delete useless indexes", "err", err)
}
// If any logs need to be fired, do it now. In theory we could avoid creating
// this goroutine if there are no events to fire, but realistcally that only
// ever happens if we're reorging empty blocks, which will only happen on idle
// networks where performance is not an issue either way.
if len(deletedLogs) > 0 {
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
}
if len(oldChain) > 0 {
go func() {
for i := len(oldChain) - 1; i >= 0; i-- {
bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]})
}
}()
}

// Reset the tx lookup cache to clear stale txlookup cache.
// bc.txLookupCache.Purge()

// Release the tx-lookup lock after mutation.
// bc.txLookupLock.Unlock()

return nil
}

Expand Down
18 changes: 18 additions & 0 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,24 @@ func TxDifference(a, b Transactions) (keep Transactions) {
return keep
}

// HashDifference returns a new set of hashes that are present in a but not in b.
func HashDifference(a, b []common.Hash) []common.Hash {
keep := make([]common.Hash, 0, len(a))

remove := make(map[common.Hash]struct{})
for _, hash := range b {
remove[hash] = struct{}{}
}

for _, hash := range a {
if _, ok := remove[hash]; !ok {
keep = append(keep, hash)
}
}

return keep
}

// TxByNonce implements the sort interface to allow sorting a list of transactions
// by their nonces. This is usually only useful for sorting transactions from a
// single account, otherwise a nonce comparison doesn't make much sense.
Expand Down

0 comments on commit 7deeaee

Please sign in to comment.