From 328b3b427ede879b886fa8be552bea3533dc06a3 Mon Sep 17 00:00:00 2001 From: gop Date: Wed, 20 Sep 2023 11:38:38 -0500 Subject: [PATCH 1/3] Removed the PendingEtxs/Rollup and MissingPendingEtx/Rollup fetch --- core/core.go | 16 --- core/headerchain.go | 47 +-------- core/slice.go | 8 -- eth/handler.go | 181 ++-------------------------------- eth/handler_eth.go | 28 ------ eth/peerset.go | 16 --- eth/protocols/eth/handler.go | 18 ++-- eth/protocols/eth/handlers.go | 93 ++++------------- eth/protocols/eth/peer.go | 109 +++----------------- eth/protocols/eth/protocol.go | 54 +--------- 10 files changed, 51 insertions(+), 519 deletions(-) diff --git a/core/core.go b/core/core.go index ee433ee52e..8bd8e3ac4c 100644 --- a/core/core.go +++ b/core/core.go @@ -484,10 +484,6 @@ func (c *Core) SendPendingEtxsToDom(pEtxs types.PendingEtxs) error { return c.sl.SendPendingEtxsToDom(pEtxs) } -func (c *Core) SubscribePendingEtxs(ch chan<- types.PendingEtxs) event.Subscription { - return c.sl.SubscribePendingEtxs(ch) -} - func (c *Core) AddPendingEtxs(pEtxs types.PendingEtxs) error { return c.sl.AddPendingEtxs(pEtxs) } @@ -496,10 +492,6 @@ func (c *Core) AddPendingEtxsRollup(pEtxsRollup types.PendingEtxsRollup) error { return c.sl.AddPendingEtxsRollup(pEtxsRollup) } -func (c *Core) SubscribePendingEtxsRollup(ch chan<- types.PendingEtxsRollup) event.Subscription { - return c.sl.SubscribePendingEtxsRollup(ch) -} - func (c *Core) GenerateRecoveryPendingHeader(pendingHeader *types.Header, checkpointHashes types.Termini) error { return c.sl.GenerateRecoveryPendingHeader(pendingHeader, checkpointHashes) } @@ -671,14 +663,6 @@ func (c *Core) GetTerminiByHash(hash common.Hash) *types.Termini { return c.sl.hc.GetTerminiByHash(hash) } -func (c *Core) SubscribeMissingPendingEtxsEvent(ch chan<- types.HashAndLocation) event.Subscription { - return c.sl.hc.SubscribeMissingPendingEtxsEvent(ch) -} - -func (c *Core) SubscribeMissingPendingEtxsRollupEvent(ch chan<- common.Hash) event.Subscription { - return c.sl.hc.SubscribeMissingPendingEtxsRollupEvent(ch) -} - // SubscribeChainSideEvent registers a subscription of ChainSideEvent. func (c *Core) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription { return c.sl.hc.SubscribeChainSideEvent(ch) diff --git a/core/headerchain.go b/core/headerchain.go index 0af34eef49..5abe1c6da7 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -50,11 +50,9 @@ type HeaderChain struct { headerCache *lru.Cache // Cache for the most recent block headers numberCache *lru.Cache // Cache for the most recent block numbers - pendingEtxsRollup *lru.Cache - pendingEtxs *lru.Cache - blooms *lru.Cache - missingPendingEtxsFeed event.Feed - missingPendingEtxsRollupFeed event.Feed + pendingEtxsRollup *lru.Cache + pendingEtxs *lru.Cache + blooms *lru.Cache wg sync.WaitGroup // chain processing wait group for shutting down running int32 // 0 if chain is running, 1 when stopped @@ -133,14 +131,12 @@ func (hc *HeaderChain) CollectSubRollup(b *types.Block) (types.Transactions, err pendingEtxs, err := hc.GetPendingEtxs(pEtxHash) if err != nil { // Start backfilling the missing pending ETXs needed to process this block - go hc.backfillPETXs(b.Header(), b.SubManifest()) return nil, ErrPendingEtxNotFound } subRollup = append(subRollup, pendingEtxs.Etxs...) } } else { // Start backfilling the missing pending ETXs needed to process this block - go hc.backfillPETXs(b.Header(), b.SubManifest()) return nil, ErrPendingEtxNotFound } // Region works normally as before collecting pendingEtxs for each hash in the manifest @@ -148,7 +144,6 @@ func (hc *HeaderChain) CollectSubRollup(b *types.Block) (types.Transactions, err pendingEtxs, err := hc.GetPendingEtxs(hash) if err != nil { // Start backfilling the missing pending ETXs needed to process this block - go hc.backfillPETXs(b.Header(), b.SubManifest()) return nil, ErrPendingEtxNotFound } subRollup = append(subRollup, pendingEtxs.Etxs...) @@ -208,34 +203,6 @@ func (hc *HeaderChain) GetBloom(hash common.Hash) (*types.Bloom, error) { return &bloom, nil } -// backfillPETXs collects any missing PendingETX objects needed to process the -// given header. This is done by informing the fetcher of any pending ETXs we do -// not have, so that they can be fetched from our peers. -func (hc *HeaderChain) backfillPETXs(header *types.Header, subManifest types.BlockManifest) { - nodeCtx := common.NodeLocation.Context() - for _, hash := range subManifest { - if nodeCtx == common.PRIME_CTX { - // In the case of prime, get the pendingEtxsRollup for each region block - // and then fetch the pending etx for each of the rollup hashes - if pEtxRollup, err := hc.GetPendingEtxsRollup(hash); err == nil { - for _, pEtxHash := range pEtxRollup.Manifest { - if _, err := hc.GetPendingEtxs(pEtxHash); err != nil { - // Send the pendingEtxs to the feed for broadcast - hc.missingPendingEtxsFeed.Send(types.HashAndLocation{Hash: pEtxHash, Location: pEtxRollup.Header.Location()}) - } - } - } else { - hc.missingPendingEtxsRollupFeed.Send(hash) - } - } else if nodeCtx == common.REGION_CTX { - if _, err := hc.GetPendingEtxs(hash); err != nil { - // Send the pendingEtxs to the feed for broadcast - hc.missingPendingEtxsFeed.Send(types.HashAndLocation{Hash: hash, Location: header.Location()}) - } - } - } -} - // Collect all emmitted ETXs since the last coincident block, but excluding // those emitted in this block func (hc *HeaderChain) CollectEtxRollup(b *types.Block) (types.Transactions, error) { @@ -914,14 +881,6 @@ func (hc *HeaderChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.S return hc.scope.Track(hc.chainSideFeed.Subscribe(ch)) } -func (hc *HeaderChain) SubscribeMissingPendingEtxsEvent(ch chan<- types.HashAndLocation) event.Subscription { - return hc.scope.Track(hc.missingPendingEtxsFeed.Subscribe(ch)) -} - -func (hc *HeaderChain) SubscribeMissingPendingEtxsRollupEvent(ch chan<- common.Hash) event.Subscription { - return hc.scope.Track(hc.missingPendingEtxsRollupFeed.Subscribe(ch)) -} - func (hc *HeaderChain) StateAt(root common.Hash) (*state.StateDB, error) { return hc.bc.processor.StateAt(root) } diff --git a/core/slice.go b/core/slice.go index 5e6fdc164a..abc0a7c1ea 100644 --- a/core/slice.go +++ b/core/slice.go @@ -1195,14 +1195,6 @@ func (sl *Slice) TxPool() *TxPool { return sl.txPool } func (sl *Slice) Miner() *Miner { return sl.miner } -func (sl *Slice) SubscribePendingEtxs(ch chan<- types.PendingEtxs) event.Subscription { - return sl.scope.Track(sl.pendingEtxsFeed.Subscribe(ch)) -} - -func (sl *Slice) SubscribePendingEtxsRollup(ch chan<- types.PendingEtxsRollup) event.Subscription { - return sl.scope.Track(sl.pendingEtxsRollupFeed.Subscribe(ch)) -} - func (sl *Slice) CurrentInfo(header *types.Header) bool { return sl.miner.worker.CurrentInfo(header) } diff --git a/eth/handler.go b/eth/handler.go index e7be57994b..9b342f2517 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -43,18 +43,6 @@ const ( // The number is referenced from the size of tx pool. txChanSize = 4096 - // c_pendingEtxBroadcastChanSize is the size of channel listening to pEtx Event. - c_pendingEtxBroadcastChanSize = 10 - - // c_missingPendingEtxsRollupChanSize is the size of channel listening to missing pEtxsRollup Event. - c_missingPendingEtxsRollupChanSize = 10 - - // c_pendingEtxRollupBroadcastChanSize is the size of channel listening to pEtx rollup Event. - c_pendingEtxRollupBroadcastChanSize = 10 - - // missingPendingEtxsChanSize is the size of channel listening to the MissingPendingEtxsEvent - missingPendingEtxsChanSize = 10 - // missingParentChanSize is the size of channel listening to the MissingParentEvent missingParentChanSize = 10 @@ -129,21 +117,12 @@ type handler struct { txFetcher *fetcher.TxFetcher peers *peerSet - eventMux *event.TypeMux - txsCh chan core.NewTxsEvent - txsSub event.Subscription - minedBlockSub *event.TypeMuxSubscription - missingPendingEtxsCh chan types.HashAndLocation - missingPendingEtxsSub event.Subscription - missingParentCh chan common.Hash - missingParentSub event.Subscription - - pEtxCh chan types.PendingEtxs - pEtxSub event.Subscription - pEtxRollupCh chan types.PendingEtxsRollup - pEtxRollupSub event.Subscription - missingPEtxsRollupCh chan common.Hash - missingPEtxsRollupSub event.Subscription + eventMux *event.TypeMux + txsCh chan core.NewTxsEvent + txsSub event.Subscription + minedBlockSub *event.TypeMuxSubscription + missingParentCh chan common.Hash + missingParentSub event.Subscription whitelist map[uint64]common.Hash @@ -331,12 +310,6 @@ func (h *handler) Start(maxPeers int) { go h.txBroadcastLoop() } - // broadcast pending etxs - h.wg.Add(1) - h.missingPendingEtxsCh = make(chan types.HashAndLocation, missingPendingEtxsChanSize) - h.missingPendingEtxsSub = h.core.SubscribeMissingPendingEtxsEvent(h.missingPendingEtxsCh) - go h.missingPendingEtxsLoop() - h.wg.Add(1) h.missingParentCh = make(chan common.Hash, missingParentChanSize) h.missingParentSub = h.core.SubscribeMissingParentEvent(h.missingParentCh) @@ -354,22 +327,6 @@ func (h *handler) Start(maxPeers int) { h.wg.Add(1) go h.txsyncLoop64() //Legacy initial tx echange, drop with eth/64. } - - h.wg.Add(1) - h.missingPEtxsRollupCh = make(chan common.Hash, c_pendingEtxBroadcastChanSize) - h.missingPEtxsRollupSub = h.core.SubscribeMissingPendingEtxsRollupEvent(h.missingPEtxsRollupCh) - go h.missingPEtxsRollupLoop() - - h.wg.Add(1) - h.pEtxCh = make(chan types.PendingEtxs, c_pendingEtxBroadcastChanSize) - h.pEtxSub = h.core.SubscribePendingEtxs(h.pEtxCh) - go h.broadcastPEtxLoop() - - // broadcast pending etxs rollup - h.wg.Add(1) - h.pEtxRollupCh = make(chan types.PendingEtxsRollup, c_pendingEtxRollupBroadcastChanSize) - h.pEtxRollupSub = h.core.SubscribePendingEtxsRollup(h.pEtxRollupCh) - go h.broadcastPEtxRollupLoop() } func (h *handler) Stop() { @@ -377,12 +334,8 @@ func (h *handler) Stop() { if nodeCtx == common.ZONE_CTX && h.core.ProcessingState() { h.txsSub.Unsubscribe() // quits txBroadcastLoop } - h.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop - h.missingPendingEtxsSub.Unsubscribe() // quits pendingEtxsBroadcastLoop - h.missingPEtxsRollupSub.Unsubscribe() // quits missingPEtxsRollupSub - h.missingParentSub.Unsubscribe() // quits missingParentLoop - h.pEtxSub.Unsubscribe() // quits pEtxSub - h.pEtxRollupSub.Unsubscribe() // quits pEtxRollupSub + h.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop + h.missingParentSub.Unsubscribe() // quits missingParentLoop // Quit chainSync and txsync64. // After this is done, no new peers will be accepted. @@ -509,54 +462,6 @@ func (h *handler) txBroadcastLoop() { } } -// missingPEtxsRollupLoop listens to the MissingBody event in Slice and calls the blockAnnounces. -func (h *handler) missingPEtxsRollupLoop() { - defer h.wg.Done() - for { - select { - case hash := <-h.missingPEtxsRollupCh: - // Check if any of the peers have the body - for _, peer := range h.selectSomePeers() { - log.Trace("Fetching the missing pending etxs rollup from", "peer", peer.ID(), "hash", hash) - peer.RequestOnePendingEtxsRollup(hash) - } - - case <-h.missingPEtxsRollupSub.Err(): - return - } - } -} - -// pendingEtxsBroadcastLoop announces new pendingEtxs to connected peers. -func (h *handler) missingPendingEtxsLoop() { - defer h.wg.Done() - for { - select { - case hashAndLocation := <-h.missingPendingEtxsCh: - // Only ask from peers running the slice for the missing pending etxs - // In the future, peers not responding before the timeout has to be punished - peersRunningSlice := h.peers.peerRunningSlice(hashAndLocation.Location) - // If the node doesn't have any peer running that slice, add a warning - if len(peersRunningSlice) == 0 { - log.Warn("Node doesn't have peers for given Location", "location", hashAndLocation.Location) - } - // Check if any of the peers have the body - for _, peer := range peersRunningSlice { - log.Trace("Fetching the missing pending etxs from", "peer", peer.ID(), "hash", hashAndLocation.Hash) - peer.RequestOnePendingEtxs(hashAndLocation.Hash) - } - if len(peersRunningSlice) == 0 { - for _, peer := range h.selectSomePeers() { - log.Trace("Fetching the missing pending etxs from", "peer", peer.ID(), "hash", hashAndLocation.Hash) - peer.RequestOnePendingEtxs(hashAndLocation.Hash) - } - } - case <-h.missingPendingEtxsSub.Err(): - return - } - } -} - // missingParentLoop announces new pendingEtxs to connected peers. func (h *handler) missingParentLoop() { defer h.wg.Done() @@ -574,76 +479,6 @@ func (h *handler) missingParentLoop() { } } -// pEtxLoop listens to the pendingEtxs event in Slice and anounces the pEtx to the peer -func (h *handler) broadcastPEtxLoop() { - defer h.wg.Done() - for { - select { - case pEtx := <-h.pEtxCh: - h.BroadcastPendingEtxs(pEtx) - case <-h.pEtxSub.Err(): - return - } - } -} - -// pEtxRollupLoop listens to the pendingEtxs event in Slice and anounces the pEtx to the peer -func (h *handler) broadcastPEtxRollupLoop() { - defer h.wg.Done() - for { - select { - case pEtxRollup := <-h.pEtxRollupCh: - h.BroadcastPendingEtxsRollup(pEtxRollup) - case <-h.pEtxRollupSub.Err(): - return - } - } -} - -// BroadcastPendingEtxs will either propagate a pendingEtxs to a subset of its peers -func (h *handler) BroadcastPendingEtxs(pEtx types.PendingEtxs) { - hash := pEtx.Header.Hash() - peers := h.peers.peersWithoutPendingEtxs(hash) - - // Send the block to a subset of our peers - var peerThreshold int - sqrtNumPeers := int(math.Sqrt(float64(len(peers)))) - if sqrtNumPeers < minPeerSend { - peerThreshold = len(peers) - } else { - peerThreshold = sqrtNumPeers - } - transfer := peers[:peerThreshold] - // If in region send the pendingEtxs directly, otherwise send the pendingEtxsManifest - for _, peer := range transfer { - peer.SendPendingEtxs(pEtx) - } - log.Trace("Propagated pending etxs", "hash", hash, "recipients", len(transfer), "len", len(pEtx.Etxs)) - return -} - -// BroadcastPendingEtxsRollup will either propagate a pending etx rollup to a subset of its peers -func (h *handler) BroadcastPendingEtxsRollup(pEtxRollup types.PendingEtxsRollup) { - hash := pEtxRollup.Header.Hash() - peers := h.peers.peersWithoutPendingEtxs(hash) - - // Send the block to a subset of our peers - var peerThreshold int - sqrtNumPeers := int(math.Sqrt(float64(len(peers)))) - if sqrtNumPeers < minPeerSend { - peerThreshold = len(peers) - } else { - peerThreshold = sqrtNumPeers - } - transfer := peers[:peerThreshold] - // If in region send the pendingEtxs directly, otherwise send the pendingEtxsManifest - for _, peer := range transfer { - peer.SendPendingEtxsRollup(pEtxRollup) - } - log.Trace("Propagated pending etxs rollup", "hash", hash, "recipients", len(transfer), "len", len(pEtxRollup.Manifest)) - return -} - func (h *handler) selectSomePeers() []*eth.Peer { // Get the min(sqrt(len(peers)), minPeerRequest) count := int(math.Sqrt(float64(len(h.peers.allPeers())))) diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 92b34f3292..eec5beb8f3 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -93,12 +93,6 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { case *eth.PooledTransactionsPacket: return h.txFetcher.Enqueue(peer.ID(), *packet, true) - case *eth.PendingEtxsPacket: - return h.handlePendingEtxs(*&packet.PendingEtxs) - - case *eth.PendingEtxsRollupPacket: - return h.handlePendingEtxsRollup(peer, *&packet.PendingEtxsRollup) - default: return fmt.Errorf("unexpected eth packet type: %T", packet) } @@ -211,25 +205,3 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block) er } return nil } - -func (h *ethHandler) handlePendingEtxs(pendingEtxs types.PendingEtxs) error { - err := h.core.AddPendingEtxs(pendingEtxs) - if err != nil { - log.Error("Error in handling pendingEtxs broadcast", "err", err) - return err - } - return nil -} - -func (h *ethHandler) handlePendingEtxsRollup(peer *eth.Peer, pEtxsRollup types.PendingEtxsRollup) error { - err := h.core.AddPendingEtxsRollup(pEtxsRollup) - if err != nil { - log.Error("Error in handling pendingEtxs rollup broadcast", "err", err) - return err - } - // For each hash in manifest request for the pendingEtxs - for _, hash := range pEtxsRollup.Manifest { - peer.RequestOnePendingEtxs(hash) - } - return nil -} diff --git a/eth/peerset.go b/eth/peerset.go index a50f83bd5f..e0030e1fd1 100644 --- a/eth/peerset.go +++ b/eth/peerset.go @@ -131,22 +131,6 @@ func (ps *peerSet) peersWithoutTransaction(hash common.Hash) []*ethPeer { return list } -// peersWithoutPendingEtxs retrieves a list of peers that do not have a given pending etxs in -// their set of known hashes so it might be propagated to them. -// TODO: Make sure that this is only being set on the broadcast receipient -func (ps *peerSet) peersWithoutPendingEtxs(hash common.Hash) []*ethPeer { - ps.lock.RLock() - defer ps.lock.RUnlock() - - list := make([]*ethPeer, 0, len(ps.peers)) - for _, p := range ps.peers { - if !p.KnownPendingEtxs(hash) { - list = append(list, p) - } - } - return list -} - // len returns if the current number of `eth` peers in the set. Since the `snap` // peers are tied to the existence of an `eth` connection, that will always be a // subset of `eth`. diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index d8a5669a7b..ad8f9e59cd 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -183,17 +183,13 @@ var eth66 = map[uint64]msgHandler{ TransactionsMsg: handleTransactions, NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes, // eth66 messages with request-id - GetBlockHeadersMsg: handleGetBlockHeaders66, - BlockHeadersMsg: handleBlockHeaders66, - GetBlockBodiesMsg: handleGetBlockBodies66, - BlockBodiesMsg: handleBlockBodies66, - GetPooledTransactionsMsg: handleGetPooledTransactions66, - PendingEtxsMsg: handlePendingEtxs, - PendingEtxsRollupMsg: handlePendingEtxsRollup, - GetOnePendingEtxsRollupMsg: handleGetOnePendingEtxsRollup66, - GetOnePendingEtxsMsg: handleGetOnePendingEtxs66, - PooledTransactionsMsg: handlePooledTransactions66, - GetBlockMsg: handleGetBlock66, + GetBlockHeadersMsg: handleGetBlockHeaders66, + BlockHeadersMsg: handleBlockHeaders66, + GetBlockBodiesMsg: handleGetBlockBodies66, + BlockBodiesMsg: handleBlockBodies66, + GetPooledTransactionsMsg: handleGetPooledTransactions66, + PooledTransactionsMsg: handlePooledTransactions66, + GetBlockMsg: handleGetBlock66, } // handleMessage is invoked whenever an inbound message is received from a remote diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 5adc16dc06..b1fe4b013e 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -189,59 +189,6 @@ func handleGetBlock66(backend Backend, msg Decoder, peer *Peer) error { return nil } -func handlePendingEtxs(backend Backend, msg Decoder, peer *Peer) error { - // Decode the block pending etxs retrieval message - ann := new(PendingEtxsPacket) - if err := msg.Decode(&ann); err != nil { - return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) - } - // Mark the hashes as present at the remote node - peer.markPendingEtxs(ann.PendingEtxs.Header.Hash()) - - return backend.Handle(peer, ann) -} - -func handlePendingEtxsRollup(backend Backend, msg Decoder, peer *Peer) error { - // Decode the block pending etxs rollup retrieval message - ann := new(PendingEtxsRollupPacket) - if err := msg.Decode(&ann); err != nil { - return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) - } - return backend.Handle(peer, ann) -} - -func handleGetOnePendingEtxs66(backend Backend, msg Decoder, peer *Peer) error { - // Decode the block pending etxs retrieval message - var query GetOnePendingEtxsPacket66 - if err := msg.Decode(&query); err != nil { - return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) - } - requestTracker.Fulfil(peer.id, peer.version, GetOnePendingEtxsMsg, query.RequestId) - pendingEtxs := backend.Core().GetPendingEtxs(query.Hash) - if pendingEtxs == nil { - log.Debug("Couldn't complete a pendingEtxs request for", "Hash", query.Hash) - return nil - } - log.Trace("Completing a pendingEtxs request for", "Hash", pendingEtxs.Header.Hash()) - return peer.SendPendingEtxs(*pendingEtxs) -} - -func handleGetOnePendingEtxsRollup66(backend Backend, msg Decoder, peer *Peer) error { - // Decode the block pending etxs rollup retrieval message - var query GetOnePendingEtxsRollupPacket66 - if err := msg.Decode(&query); err != nil { - return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) - } - requestTracker.Fulfil(peer.id, peer.version, GetOnePendingEtxsRollupMsg, query.RequestId) - pendingEtxs := backend.Core().GetPendingEtxsRollup(query.Hash) - if pendingEtxs == nil { - log.Debug("Couldn't complete a pendingEtxs request for", "Hash", query.Hash) - return nil - } - log.Trace("Completing a pendingEtxs request for", "Hash", pendingEtxs.Header.Hash()) - return peer.SendPendingEtxsRollup(*pendingEtxs) -} - func handleNewBlockhashes(backend Backend, msg Decoder, peer *Peer) error { // A batch of new block announcements just arrived ann := new(NewBlockHashesPacket) @@ -348,10 +295,10 @@ func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error { func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error { nodeCtx := common.NodeLocation.Context() if nodeCtx != common.ZONE_CTX { - return errors.New("transactions are only handled in zone") - } - if !backend.Core().Slice().ProcessingState() { - return nil + return errors.New("transactions are only handled in zone") + } + if !backend.Core().Slice().ProcessingState() { + return nil } // New transaction announcement arrived, make sure we have // a valid and fresh chain to handle them @@ -372,10 +319,10 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) func handleGetPooledTransactions(backend Backend, msg Decoder, peer *Peer) error { nodeCtx := common.NodeLocation.Context() if nodeCtx != common.ZONE_CTX { - return errors.New("transactions are only handled in zone") - } - if !backend.Core().Slice().ProcessingState() { - return nil + return errors.New("transactions are only handled in zone") + } + if !backend.Core().Slice().ProcessingState() { + return nil } // Decode the pooled transactions retrieval message var query GetPooledTransactionsPacket @@ -428,10 +375,10 @@ func handleTransactions(backend Backend, msg Decoder, peer *Peer) error { nodeCtx := common.NodeLocation.Context() // Transactions arrived, make sure we have a valid and fresh chain to handle them if nodeCtx != common.ZONE_CTX { - return errors.New("transactions are only handled in zone") - } - if !backend.Core().Slice().ProcessingState() { - return nil + return errors.New("transactions are only handled in zone") + } + if !backend.Core().Slice().ProcessingState() { + return nil } if !backend.AcceptTxs() { return nil @@ -454,10 +401,10 @@ func handleTransactions(backend Backend, msg Decoder, peer *Peer) error { func handlePooledTransactions(backend Backend, msg Decoder, peer *Peer) error { nodeCtx := common.NodeLocation.Context() if nodeCtx != common.ZONE_CTX { - return errors.New("transactions are only handled in zone") - } - if !backend.Core().Slice().ProcessingState() { - return nil + return errors.New("transactions are only handled in zone") + } + if !backend.Core().Slice().ProcessingState() { + return nil } // Transactions arrived, make sure we have a valid and fresh chain to handle them if !backend.AcceptTxs() { @@ -481,10 +428,10 @@ func handlePooledTransactions(backend Backend, msg Decoder, peer *Peer) error { func handlePooledTransactions66(backend Backend, msg Decoder, peer *Peer) error { nodeCtx := common.NodeLocation.Context() if nodeCtx != common.ZONE_CTX { - return errors.New("transactions are only handled in zone") - } - if !backend.Core().Slice().ProcessingState() { - return nil + return errors.New("transactions are only handled in zone") + } + if !backend.Core().Slice().ProcessingState() { + return nil } // Transactions arrived, make sure we have a valid and fresh chain to handle them if !backend.AcceptTxs() { diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index f60a3e9d56..8f6e39fcc6 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -17,7 +17,6 @@ package eth import ( - "errors" "math/big" "math/rand" "sync" @@ -39,10 +38,6 @@ const ( // before starting to randomly evict them. maxKnownBlocks = 1024 - // maxKnownPendingEtxs is the maximum pendingEtxs Header hashes to keep in the known list - // before starting to randomly evict them. - maxKnownPendingEtxs = 1024 - // maxQueuedTxs is the maximum number of transactions to queue up before dropping // older broadcasts. maxQueuedTxs = 4096 @@ -88,8 +83,6 @@ type Peer struct { queuedBlocks chan *blockPropagation // Queue of blocks to broadcast to the peer queuedBlockAnns chan *types.Block // Queue of blocks to announce to the peer - knownPendingEtxs mapset.Set // Set of pending etxs hashes known to be known by this peer - txpool TxPool // Transaction pool used by the broadcasters for liveness checks knownTxs mapset.Set // Set of transaction hashes known to be known by this peer txBroadcast chan []common.Hash // Channel used to queue transaction propagation requests @@ -103,19 +96,18 @@ type Peer struct { // version. func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Peer { peer := &Peer{ - id: p.ID().String(), - Peer: p, - rw: rw, - version: version, - knownTxs: mapset.NewSet(), - knownBlocks: mapset.NewSet(), - knownPendingEtxs: mapset.NewSet(), - queuedBlocks: make(chan *blockPropagation, maxQueuedBlocks), - queuedBlockAnns: make(chan *types.Block, maxQueuedBlockAnns), - txBroadcast: make(chan []common.Hash), - txAnnounce: make(chan []common.Hash), - txpool: txpool, - term: make(chan struct{}), + id: p.ID().String(), + Peer: p, + rw: rw, + version: version, + knownTxs: mapset.NewSet(), + knownBlocks: mapset.NewSet(), + queuedBlocks: make(chan *blockPropagation, maxQueuedBlocks), + queuedBlockAnns: make(chan *types.Block, maxQueuedBlockAnns), + txBroadcast: make(chan []common.Hash), + txAnnounce: make(chan []common.Hash), + txpool: txpool, + term: make(chan struct{}), } // Start up all the broadcasters go peer.broadcastBlocks() @@ -178,11 +170,6 @@ func (p *Peer) KnownTransaction(hash common.Hash) bool { return p.knownTxs.Contains(hash) } -// KnownPendingEtxs returns whether peer is known to already have the pending etx. -func (p *Peer) KnownPendingEtxs(hash common.Hash) bool { - return p.knownPendingEtxs.Contains(hash) -} - // markBlock marks a block as known for the peer, ensuring that the block will // never be propagated to this particular peer. func (p *Peer) markBlock(hash common.Hash) { @@ -203,16 +190,6 @@ func (p *Peer) markTransaction(hash common.Hash) { p.knownTxs.Add(hash) } -// markPendingEtxs marks a pendingEtxs header as known for the peer, ensuring that the block will -// never be propagated to this particular peer. -func (p *Peer) markPendingEtxs(hash common.Hash) { - // If we reached the memory allowance, drop a previously known block hash - for p.knownPendingEtxs.Cardinality() >= maxKnownPendingEtxs { - p.knownPendingEtxs.Pop() - } - p.knownPendingEtxs.Add(hash) -} - // SendTransactions sends transactions to the peer and includes the hashes // in its transaction hash set for future reference. // @@ -537,65 +514,3 @@ func (p *Peer) RequestTxs(hashes []common.Hash) error { } return p2p.Send(p.rw, GetPooledTransactionsMsg, GetPooledTransactionsPacket(hashes)) } - -// RequestOnePendingEtx fetches a pendingEtx for a given block hash from a remote node. -func (p *Peer) RequestOnePendingEtxs(hash common.Hash) error { - p.Log().Debug("Fetching a pending etx", "hash", hash) - if p.Version() >= ETH66 { - id := rand.Uint64() - - requestTracker.Track(p.id, p.version, GetOnePendingEtxsMsg, PendingEtxsMsg, id) - return p2p.Send(p.rw, GetOnePendingEtxsMsg, &GetOnePendingEtxsPacket66{ - RequestId: id, - GetOnePendingEtxsPacket: GetOnePendingEtxsPacket{Hash: hash}, - }) - } - return errors.New("eth65 not supported for RequestOnePendingEtxs call") -} - -// RequestOnePendingEtxsRollup fetches a pendingEtxRollup for a given block hash from a remote node. -func (p *Peer) RequestOnePendingEtxsRollup(hash common.Hash) error { - p.Log().Debug("Fetching a pending etx rollup", "hash", hash) - if p.Version() >= ETH66 { - id := rand.Uint64() - - requestTracker.Track(p.id, p.version, GetOnePendingEtxsRollupMsg, PendingEtxsRollupMsg, id) - return p2p.Send(p.rw, GetOnePendingEtxsRollupMsg, &GetOnePendingEtxsPacket66{ - RequestId: id, - GetOnePendingEtxsPacket: GetOnePendingEtxsPacket{Hash: hash}, - }) - } - return errors.New("eth65 not supported for RequestOnePendingEtxsRollup call") -} - -// SendNewPendingEtxs propagates an entire pendingEtxs to a remote peer. -func (p *Peer) SendPendingEtxs(pendingEtxs types.PendingEtxs) error { - // Mark all the pendingEtxs hash as known, but ensure we don't overflow our limits - for p.knownPendingEtxs.Cardinality() >= maxKnownPendingEtxs { - p.knownPendingEtxs.Pop() - } - p.knownPendingEtxs.Add(pendingEtxs.Header.Hash()) - return p2p.Send(p.rw, PendingEtxsMsg, &PendingEtxsPacket{ - PendingEtxs: pendingEtxs, - }) -} - -// SendNewPendingEtxsRollup propagates an entire pending etx Rollup to a remote peer. -func (p *Peer) SendPendingEtxsRollup(pEtxsRollup types.PendingEtxsRollup) error { - p.Log().Debug("Fetching a pending etx", "hash", pEtxsRollup.Header.Hash()) - if p.Version() >= ETH66 { - id := rand.Uint64() - - // TODO: what to put in the GetOnePendingEtxsMsg - requestTracker.Track(p.id, p.version, GetOnePendingEtxsMsg, PendingEtxsRollupMsg, id) - // Mark all the pendingEtxs hash as known, but ensure we don't overflow our limits - for p.knownPendingEtxs.Cardinality() >= maxKnownPendingEtxs { - p.knownPendingEtxs.Pop() - } - p.knownPendingEtxs.Add(pEtxsRollup.Header.Hash()) - return p2p.Send(p.rw, PendingEtxsRollupMsg, &PendingEtxsRollupPacket{ - PendingEtxsRollup: pEtxsRollup, - }) - } - return errors.New("eth65 not supported for sendPendingEtxsManifest call") -} diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index 60cad4bf27..eea1bd44ab 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -44,7 +44,7 @@ var ProtocolVersions = []uint{ETH66, ETH65} // protocolLengths are the number of implemented message corresponding to // different protocol versions. -var protocolLengths = map[uint]uint64{ETH66: 21, ETH65: 19} +var protocolLengths = map[uint]uint64{ETH66: 12, ETH65: 12} // maxMessageSize is the maximum cap on the size of a protocol message. const maxMessageSize = 10 * 1024 * 1024 @@ -66,11 +66,6 @@ const ( PooledTransactionsMsg = 0x0a GetBlockMsg = 0x0b - - PendingEtxsMsg = 0x11 - GetOnePendingEtxsMsg = 0x12 - PendingEtxsRollupMsg = 0x13 - GetOnePendingEtxsRollupMsg = 0x14 ) var ( @@ -294,44 +289,6 @@ type GetBlockPacket66 struct { GetBlockPacket } -// GetOnePendingEtxsPacket represents a pending etx query -type GetOnePendingEtxsPacket struct { - Hash common.Hash -} - -type GetOnePendingEtxsPacket66 struct { - RequestId uint64 - GetOnePendingEtxsPacket -} - -// GetOnePendingEtxsRollupPacket represents a pending etx query -type GetOnePendingEtxsRollupPacket struct { - Hash common.Hash -} - -type GetOnePendingEtxsRollupPacket66 struct { - RequestId uint64 - GetOnePendingEtxsRollupPacket -} - -type PendingEtxsPacket struct { - PendingEtxs types.PendingEtxs -} - -type PendingEtxsPacket66 struct { - RequestId uint64 - PendingEtxsPacket -} - -type PendingEtxsRollupPacket struct { - PendingEtxsRollup types.PendingEtxsRollup -} - -type PendingEtxsRollupPacket66 struct { - RequestId uint64 - PendingEtxsRollupPacket -} - func (*StatusPacket) Name() string { return "Status" } func (*StatusPacket) Kind() byte { return StatusMsg } @@ -367,12 +324,3 @@ func (*PooledTransactionsPacket) Kind() byte { return PooledTransactionsMsg } func (*GetBlockPacket) Name() string { return "GetBlock" } func (*GetBlockPacket) Kind() byte { return GetBlockMsg } - -func (*GetOnePendingEtxsPacket) Name() string { return "GetOnePendingEtxs" } -func (*GetOnePendingEtxsPacket) Kind() byte { return GetOnePendingEtxsMsg } - -func (*PendingEtxsPacket) Name() string { return "PendingEtxs" } -func (*PendingEtxsPacket) Kind() byte { return PendingEtxsMsg } - -func (*PendingEtxsRollupPacket) Name() string { return "PendingEtxsManifest" } -func (*PendingEtxsRollupPacket) Kind() byte { return PendingEtxsRollupMsg } From 8816d5c076a20c05e5829caf278eb4ed2401ffa0 Mon Sep 17 00:00:00 2001 From: gop Date: Thu, 21 Sep 2023 18:26:16 -0500 Subject: [PATCH 2/3] Increase the frequency of Proc of hashNumberList to 1 when nearly synced --- core/core.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/core.go b/core/core.go index 8bd8e3ac4c..e804185eb8 100644 --- a/core/core.go +++ b/core/core.go @@ -41,6 +41,7 @@ const ( c_appendQueueRetryPriorityThreshold = 5 // If retry counter for a block is less than this number, then its put in the special list that is tried first to be appended c_appendQueueRemoveThreshold = 10 // Number of blocks behind the block should be from the current header to be eligble for removal from the append queue c_normalListProcCounter = 5 // Ratio of Number of times the PriorityList is serviced over the NormalList + c_normalListProcCounterAfterSync = 1 // Ratio of Number of times the PriorityList is serviced over the NormalList when near the sync c_statsPrintPeriod = 60 // Time between stats prints c_appendQueuePrintSize = 10 ) @@ -156,7 +157,11 @@ func (c *Core) InsertChain(blocks types.Blocks) (int, error) { // procAppendQueue sorts the append queue and attempts to append func (c *Core) procAppendQueue() { - if c.procCounter > c_normalListProcCounter { + normalListProcCounter := c_normalListProcCounter + if len(c.appendQueue.Keys()) < c_appendQueueThreshold { + normalListProcCounter = c_normalListProcCounterAfterSync + } + if c.procCounter > normalListProcCounter { c.procCounter = 0 } else { c.procCounter++ From e5a12b96dfde45235f354e13cb1fba0f34bef866 Mon Sep 17 00:00:00 2001 From: gop Date: Thu, 21 Sep 2023 18:31:31 -0500 Subject: [PATCH 3/3] bugfix: Fixing the currentHeader crash on restart after unclean shutdown --- core/headerchain.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/headerchain.go b/core/headerchain.go index 5abe1c6da7..1c75f0fbcc 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -435,6 +435,10 @@ func (hc *HeaderChain) loadLastState() error { if head := rawdb.ReadHeadBlockHash(hc.headerDb); head != (common.Hash{}) { if chead := hc.GetHeaderByHash(head); chead != nil { hc.currentHeader.Store(chead) + } else { + // This is only done if during the stop, currenthead hash was not stored + // properly and it doesn't crash the nodes + hc.currentHeader.Store(hc.genesisHeader) } }