Skip to content

Commit

Permalink
netsync: refactor handleBlockMsg
Browse files Browse the repository at this point in the history
handleBlockMsg used to check that the block header is both valid and
then process the blocks as they come in.  It's now refactored so that
it also handles blocks that are not in order.  For out of order block
downloads handleBlockMsg would mark the block as an orphan but it's now
refactored to handle those cases.

Whenever a block that's not the next from the chain tip is received,
it's now temporarily stored in memory until the next block from the
chain tip is received.  And then all the blocks that are in sequence are
processed.
  • Loading branch information
kcalvinalvin committed Dec 13, 2024
1 parent e646d43 commit 976be30
Showing 1 changed file with 150 additions and 54 deletions.
204 changes: 150 additions & 54 deletions netsync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,12 @@ type SyncManager struct {
lastProgressTime time.Time

// The following fields are used for headers-first mode.
headersFirstMode bool
headerList *list.List
startHeader *list.Element
nextCheckpoint *chaincfg.Checkpoint
headersFirstMode bool
headerList *list.List
startHeader *list.Element
nextCheckpoint *chaincfg.Checkpoint
queuedBlocks map[chainhash.Hash]*blockMsg
queuedBlocksPrevHash map[chainhash.Hash]chainhash.Hash

// An optional fee estimator.
feeEstimator *mempool.FeeEstimator
Expand Down Expand Up @@ -685,30 +687,10 @@ func (sm *SyncManager) current() bool {
return true
}

// handleBlockMsg handles block messages from all peers.
func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
// processBlock checks if the block connects to the best chain.
func (sm *SyncManager) processBlock(bmsg *blockMsg) (bool, error) {
peer := bmsg.peer
state, exists := sm.peerStates[peer]
if !exists {
log.Warnf("Received block message from unknown peer %s", peer)
return
}

// If we didn't ask for this block then the peer is misbehaving.
blockHash := bmsg.block.Hash()
if _, exists = state.requestedBlocks[*blockHash]; !exists {
// The regression test intentionally sends some blocks twice
// to test duplicate block insertion fails. Don't disconnect
// the peer or ignore the block when we're in regression test
// mode in this case so the chain code is actually fed the
// duplicate blocks.
if sm.chainParams != &chaincfg.RegressionNetParams {
log.Warnf("Got unrequested block %v from %s -- "+
"disconnecting", blockHash, peer.Addr())
peer.Disconnect()
return
}
}

// When in headers-first mode, if the block matches the hash of the
// first header in the list of headers that are being fetched, it's
Expand All @@ -734,12 +716,6 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
}
}

// Remove block from request maps. Either chain will know about it and
// so we shouldn't have any more instances of trying to fetch it, or we
// will fail the insert and thus we'll retry next time we get an inv.
delete(state.requestedBlocks, *blockHash)
delete(sm.requestedBlocks, *blockHash)

// Process the block to include validation, best chain selection, orphan
// handling, etc.
_, isOrphan, err := sm.chain.ProcessBlock(bmsg.block, behaviorFlags)
Expand All @@ -764,7 +740,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
// send it.
code, reason := mempool.ErrToRejectErr(err)
peer.PushRejectMsg(wire.CmdBlock, code, reason, blockHash, false)
return
return false, err
}

// Meta-data about the new block this peer is reporting. We use this
Expand Down Expand Up @@ -840,22 +816,136 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
}
}

return isCheckpointBlock, nil
}

// handleBlockMsg handles block messages from all peers.
func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
peer := bmsg.peer
state, exists := sm.peerStates[peer]
if !exists {
log.Warnf("Received block message from unknown peer %s", peer)
return
}

// If we didn't ask for this block then the peer is misbehaving.
blockHash := bmsg.block.Hash()
if _, exists := state.requestedBlocks[*blockHash]; !exists {
// The regression test intentionally sends some blocks twice
// to test duplicate block insertion fails. Don't disconnect
// the peer or ignore the block when we're in regression test
// mode in this case so the chain code is actually fed the
// duplicate blocks.
if sm.chainParams != &chaincfg.RegressionNetParams {
log.Warnf("Got unrequested block %v from %s."+
"this peer may be a stalling peer -- disconnecting",
blockHash, peer.Addr())
peer.Disconnect()
return
}
}

// Remove block from request maps. Either chain will know about it and
// so we shouldn't have any more instances of trying to fetch it or we
// will fail the insert and thus we'll retry next time we get an inv.
delete(sm.requestedBlocks, *blockHash)
delete(state.requestedBlocks, *blockHash)

_, err := sm.processBlock(bmsg)
if err != nil {
return
}

// If we are not in headers first mode, it's a good time to periodically
// flush the blockchain cache because we don't expect new blocks immediately.
// After that, there is nothing more to do.
if !sm.headersFirstMode {
if err := sm.chain.FlushUtxoCache(blockchain.FlushPeriodic); err != nil {
log.Errorf("Error while flushing the blockchain cache: %v", err)
}
if err := sm.chain.FlushUtxoCache(blockchain.FlushPeriodic); err != nil {
log.Errorf("Error while flushing the blockchain cache: %v", err)
}
}

// handleBlockMsgInHeadersFirst handles block messages from all peers when the
// sync manager is in headers first mode. For blocks received out of order, it
// first keeps them in memory and sends them to be processed when the next block
// from the tip is available.
func (sm *SyncManager) handleBlockMsgInHeadersFirst(bmsg *blockMsg) {
blockHash := bmsg.block.Hash()
peer := bmsg.peer
state, exists := sm.peerStates[peer]
if !exists {
log.Warnf("Received block message from unknown peer %s", peer)
return
}

// If we didn't ask for this block then the peer is misbehaving.
if _, exists := state.requestedBlocks[*blockHash]; !exists {
// The regression test intentionally sends some blocks twice
// to test duplicate block insertion fails. Don't disconnect
// the peer or ignore the block when we're in regression test
// mode in this case so the chain code is actually fed the
// duplicate blocks.
if sm.chainParams != &chaincfg.RegressionNetParams {
log.Warnf("Got unrequested block %v from %s."+
"this peer may be a stalling peer -- disconnecting",
blockHash, peer.Addr())
peer.Disconnect()
return
}
}

// Add the block to the queue.
sm.queuedBlocks[*blockHash] = bmsg
sm.queuedBlocksPrevHash[bmsg.block.MsgBlock().Header.PrevBlock] = *blockHash

// Remove block from the request map. Either chain will know about it
// and so we shouldn't have any more instances of trying to fetch it, we
// keep it in the queued blocks map, or we will fail the insert and thus
// we'll retry next time we get an inv.
delete(sm.requestedBlocks, *blockHash)

// Since we may receive blocks out of order, attempt to find the next block
// and any other descendent blocks that connect to it.
processBlocks := make([]*blockMsg, 0, 1024)

bestHash := sm.chain.BestSnapshot().Hash
for len(sm.queuedBlocks) > 0 {
hash, found := sm.queuedBlocksPrevHash[bestHash]
if !found {
break
}

b, found := sm.queuedBlocks[hash]
if !found {
// Break when we're missing the next block in
// sequence.
break
}

// Append the block to be processed and delete from the
// queue.
delete(sm.queuedBlocks, hash)
delete(sm.queuedBlocksPrevHash, bestHash)
processBlocks = append(processBlocks, b)
bestHash = hash
}

var isCheckpointBlock bool
if len(processBlocks) > 0 {
for _, blockMsg := range processBlocks {
var err error
isCheckpointBlock, err = sm.processBlock(blockMsg)
if err != nil {
return
}
}
}

// This is headers-first mode, so if the block is not a checkpoint
// request more blocks using the header list when the request queue is
// getting short.
if !isCheckpointBlock {
if sm.startHeader != nil &&
len(state.requestedBlocks) < minInFlightBlocks {
len(sm.requestedBlocks) < minInFlightBlocks {
sm.fetchHeaderBlocks()
}
return
Expand All @@ -870,7 +960,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
sm.nextCheckpoint = sm.findNextHeaderCheckpoint(prevHeight)
if sm.nextCheckpoint != nil {
locator := blockchain.BlockLocator([]*chainhash.Hash{prevHash})
err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
err := sm.syncPeer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
if err != nil {
log.Warnf("Failed to send getheaders message to "+
"peer %s: %v", peer.Addr(), err)
Expand All @@ -889,7 +979,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
sm.headerList.Init()
log.Infof("Reached the final checkpoint -- switching to normal mode")
locator := blockchain.BlockLocator([]*chainhash.Hash{blockHash})
err = peer.PushGetBlocksMsg(locator, &zeroHash)
err := peer.PushGetBlocksMsg(locator, &zeroHash)
if err != nil {
log.Warnf("Failed to send getblocks message to peer %s: %v",
peer.Addr(), err)
Expand Down Expand Up @@ -1362,7 +1452,11 @@ out:
msg.reply <- struct{}{}

case *blockMsg:
sm.handleBlockMsg(msg)
if sm.headersFirstMode {
sm.handleBlockMsgInHeadersFirst(msg)
} else {
sm.handleBlockMsg(msg)
}
msg.reply <- struct{}{}

case *invMsg:
Expand Down Expand Up @@ -1670,19 +1764,21 @@ func (sm *SyncManager) Pause() chan<- struct{} {
// block, tx, and inv updates.
func New(config *Config) (*SyncManager, error) {
sm := SyncManager{
peerNotifier: config.PeerNotifier,
chain: config.Chain,
txMemPool: config.TxMemPool,
chainParams: config.ChainParams,
rejectedTxns: make(map[chainhash.Hash]struct{}),
requestedTxns: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]struct{}),
peerStates: make(map[*peerpkg.Peer]*peerSyncState),
progressLogger: newBlockProgressLogger("Processed", log),
msgChan: make(chan interface{}, config.MaxPeers*3),
headerList: list.New(),
quit: make(chan struct{}),
feeEstimator: config.FeeEstimator,
peerNotifier: config.PeerNotifier,
chain: config.Chain,
txMemPool: config.TxMemPool,
chainParams: config.ChainParams,
rejectedTxns: make(map[chainhash.Hash]struct{}),
requestedTxns: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]struct{}),
peerStates: make(map[*peerpkg.Peer]*peerSyncState),
progressLogger: newBlockProgressLogger("Processed", log),
msgChan: make(chan interface{}, config.MaxPeers*3),
headerList: list.New(),
quit: make(chan struct{}),
queuedBlocks: make(map[chainhash.Hash]*blockMsg),
queuedBlocksPrevHash: make(map[chainhash.Hash]chainhash.Hash),
feeEstimator: config.FeeEstimator,
}

best := sm.chain.BestSnapshot()
Expand Down

0 comments on commit 976be30

Please sign in to comment.