diff --git a/netsync/manager.go b/netsync/manager.go index 3215a86ace..7e8e859f89 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -199,10 +199,12 @@ type SyncManager struct { lastProgressTime time.Time // The following fields are used for headers-first mode. - headersFirstMode bool - headerList *list.List - startHeader *list.Element - nextCheckpoint *chaincfg.Checkpoint + headersFirstMode bool + headerList *list.List + startHeader *list.Element + nextCheckpoint *chaincfg.Checkpoint + queuedBlocks map[chainhash.Hash]*blockMsg + queuedBlocksPrevHash map[chainhash.Hash]chainhash.Hash // An optional fee estimator. feeEstimator *mempool.FeeEstimator @@ -685,30 +687,10 @@ func (sm *SyncManager) current() bool { return true } -// handleBlockMsg handles block messages from all peers. -func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { +// processBlock checks if the block connects to the best chain. +func (sm *SyncManager) processBlock(bmsg *blockMsg) (bool, error) { peer := bmsg.peer - state, exists := sm.peerStates[peer] - if !exists { - log.Warnf("Received block message from unknown peer %s", peer) - return - } - - // If we didn't ask for this block then the peer is misbehaving. blockHash := bmsg.block.Hash() - if _, exists = state.requestedBlocks[*blockHash]; !exists { - // The regression test intentionally sends some blocks twice - // to test duplicate block insertion fails. Don't disconnect - // the peer or ignore the block when we're in regression test - // mode in this case so the chain code is actually fed the - // duplicate blocks. - if sm.chainParams != &chaincfg.RegressionNetParams { - log.Warnf("Got unrequested block %v from %s -- "+ - "disconnecting", blockHash, peer.Addr()) - peer.Disconnect() - return - } - } // When in headers-first mode, if the block matches the hash of the // first header in the list of headers that are being fetched, it's @@ -734,12 +716,6 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { } } - // Remove block from request maps. Either chain will know about it and - // so we shouldn't have any more instances of trying to fetch it, or we - // will fail the insert and thus we'll retry next time we get an inv. - delete(state.requestedBlocks, *blockHash) - delete(sm.requestedBlocks, *blockHash) - // Process the block to include validation, best chain selection, orphan // handling, etc. _, isOrphan, err := sm.chain.ProcessBlock(bmsg.block, behaviorFlags) @@ -764,7 +740,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { // send it. code, reason := mempool.ErrToRejectErr(err) peer.PushRejectMsg(wire.CmdBlock, code, reason, blockHash, false) - return + return false, err } // Meta-data about the new block this peer is reporting. We use this @@ -840,22 +816,136 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { } } + return isCheckpointBlock, nil +} + +// handleBlockMsg handles block messages from all peers. +func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { + peer := bmsg.peer + state, exists := sm.peerStates[peer] + if !exists { + log.Warnf("Received block message from unknown peer %s", peer) + return + } + + // If we didn't ask for this block then the peer is misbehaving. + blockHash := bmsg.block.Hash() + if _, exists := state.requestedBlocks[*blockHash]; !exists { + // The regression test intentionally sends some blocks twice + // to test duplicate block insertion fails. Don't disconnect + // the peer or ignore the block when we're in regression test + // mode in this case so the chain code is actually fed the + // duplicate blocks. + if sm.chainParams != &chaincfg.RegressionNetParams { + log.Warnf("Got unrequested block %v from %s."+ + "this peer may be a stalling peer -- disconnecting", + blockHash, peer.Addr()) + peer.Disconnect() + return + } + } + + // Remove block from request maps. Either chain will know about it and + // so we shouldn't have any more instances of trying to fetch it or we + // will fail the insert and thus we'll retry next time we get an inv. + delete(sm.requestedBlocks, *blockHash) + delete(state.requestedBlocks, *blockHash) + + _, err := sm.processBlock(bmsg) + if err != nil { + return + } + // If we are not in headers first mode, it's a good time to periodically // flush the blockchain cache because we don't expect new blocks immediately. // After that, there is nothing more to do. - if !sm.headersFirstMode { - if err := sm.chain.FlushUtxoCache(blockchain.FlushPeriodic); err != nil { - log.Errorf("Error while flushing the blockchain cache: %v", err) - } + if err := sm.chain.FlushUtxoCache(blockchain.FlushPeriodic); err != nil { + log.Errorf("Error while flushing the blockchain cache: %v", err) + } +} + +// handleBlockMsgInHeadersFirst handles block messages from all peers when the +// sync manager is in headers first mode. For blocks received out of order, it +// first keeps them in memory and sends them to be processed when the next block +// from the tip is available. +func (sm *SyncManager) handleBlockMsgInHeadersFirst(bmsg *blockMsg) { + blockHash := bmsg.block.Hash() + peer := bmsg.peer + state, exists := sm.peerStates[peer] + if !exists { + log.Warnf("Received block message from unknown peer %s", peer) return } + // If we didn't ask for this block then the peer is misbehaving. + if _, exists := state.requestedBlocks[*blockHash]; !exists { + // The regression test intentionally sends some blocks twice + // to test duplicate block insertion fails. Don't disconnect + // the peer or ignore the block when we're in regression test + // mode in this case so the chain code is actually fed the + // duplicate blocks. + if sm.chainParams != &chaincfg.RegressionNetParams { + log.Warnf("Got unrequested block %v from %s."+ + "this peer may be a stalling peer -- disconnecting", + blockHash, peer.Addr()) + peer.Disconnect() + return + } + } + + // Add the block to the queue. + sm.queuedBlocks[*blockHash] = bmsg + sm.queuedBlocksPrevHash[bmsg.block.MsgBlock().Header.PrevBlock] = *blockHash + + // Remove block from the request map. Either chain will know about it + // and so we shouldn't have any more instances of trying to fetch it, we + // keep it in the queued blocks map, or we will fail the insert and thus + // we'll retry next time we get an inv. + delete(sm.requestedBlocks, *blockHash) + + // Since we may receive blocks out of order, attempt to find the next block + // and any other descendent blocks that connect to it. + processBlocks := make([]*blockMsg, 0, 1024) + + bestHash := sm.chain.BestSnapshot().Hash + for len(sm.queuedBlocks) > 0 { + hash, found := sm.queuedBlocksPrevHash[bestHash] + if !found { + break + } + + b, found := sm.queuedBlocks[hash] + if !found { + // Break when we're missing the next block in + // sequence. + break + } + + // Append the block to be processed and delete from the + // queue. + delete(sm.queuedBlocks, hash) + delete(sm.queuedBlocksPrevHash, bestHash) + processBlocks = append(processBlocks, b) + bestHash = hash + } + + var isCheckpointBlock bool + if len(processBlocks) > 0 { + for _, blockMsg := range processBlocks { + var err error + isCheckpointBlock, err = sm.processBlock(blockMsg) + if err != nil { + return + } + } + } + // This is headers-first mode, so if the block is not a checkpoint // request more blocks using the header list when the request queue is // getting short. if !isCheckpointBlock { if sm.startHeader != nil && - len(state.requestedBlocks) < minInFlightBlocks { + len(sm.requestedBlocks) < minInFlightBlocks { sm.fetchHeaderBlocks() } return @@ -870,7 +960,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { sm.nextCheckpoint = sm.findNextHeaderCheckpoint(prevHeight) if sm.nextCheckpoint != nil { locator := blockchain.BlockLocator([]*chainhash.Hash{prevHash}) - err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash) + err := sm.syncPeer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash) if err != nil { log.Warnf("Failed to send getheaders message to "+ "peer %s: %v", peer.Addr(), err) @@ -889,7 +979,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { sm.headerList.Init() log.Infof("Reached the final checkpoint -- switching to normal mode") locator := blockchain.BlockLocator([]*chainhash.Hash{blockHash}) - err = peer.PushGetBlocksMsg(locator, &zeroHash) + err := peer.PushGetBlocksMsg(locator, &zeroHash) if err != nil { log.Warnf("Failed to send getblocks message to peer %s: %v", peer.Addr(), err) @@ -1362,7 +1452,11 @@ out: msg.reply <- struct{}{} case *blockMsg: - sm.handleBlockMsg(msg) + if sm.headersFirstMode { + sm.handleBlockMsgInHeadersFirst(msg) + } else { + sm.handleBlockMsg(msg) + } msg.reply <- struct{}{} case *invMsg: @@ -1670,19 +1764,21 @@ func (sm *SyncManager) Pause() chan<- struct{} { // block, tx, and inv updates. func New(config *Config) (*SyncManager, error) { sm := SyncManager{ - peerNotifier: config.PeerNotifier, - chain: config.Chain, - txMemPool: config.TxMemPool, - chainParams: config.ChainParams, - rejectedTxns: make(map[chainhash.Hash]struct{}), - requestedTxns: make(map[chainhash.Hash]struct{}), - requestedBlocks: make(map[chainhash.Hash]struct{}), - peerStates: make(map[*peerpkg.Peer]*peerSyncState), - progressLogger: newBlockProgressLogger("Processed", log), - msgChan: make(chan interface{}, config.MaxPeers*3), - headerList: list.New(), - quit: make(chan struct{}), - feeEstimator: config.FeeEstimator, + peerNotifier: config.PeerNotifier, + chain: config.Chain, + txMemPool: config.TxMemPool, + chainParams: config.ChainParams, + rejectedTxns: make(map[chainhash.Hash]struct{}), + requestedTxns: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]struct{}), + peerStates: make(map[*peerpkg.Peer]*peerSyncState), + progressLogger: newBlockProgressLogger("Processed", log), + msgChan: make(chan interface{}, config.MaxPeers*3), + headerList: list.New(), + quit: make(chan struct{}), + queuedBlocks: make(map[chainhash.Hash]*blockMsg), + queuedBlocksPrevHash: make(map[chainhash.Hash]chainhash.Hash), + feeEstimator: config.FeeEstimator, } best := sm.chain.BestSnapshot()