Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removing the PendingEtx/Rollup Fetching #1150

Merged
merged 3 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 0 additions & 16 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,10 +484,6 @@ func (c *Core) SendPendingEtxsToDom(pEtxs types.PendingEtxs) error {
return c.sl.SendPendingEtxsToDom(pEtxs)
}

func (c *Core) SubscribePendingEtxs(ch chan<- types.PendingEtxs) event.Subscription {
return c.sl.SubscribePendingEtxs(ch)
}

func (c *Core) AddPendingEtxs(pEtxs types.PendingEtxs) error {
return c.sl.AddPendingEtxs(pEtxs)
}
Expand All @@ -496,10 +492,6 @@ func (c *Core) AddPendingEtxsRollup(pEtxsRollup types.PendingEtxsRollup) error {
return c.sl.AddPendingEtxsRollup(pEtxsRollup)
}

func (c *Core) SubscribePendingEtxsRollup(ch chan<- types.PendingEtxsRollup) event.Subscription {
return c.sl.SubscribePendingEtxsRollup(ch)
}

func (c *Core) GenerateRecoveryPendingHeader(pendingHeader *types.Header, checkpointHashes types.Termini) error {
return c.sl.GenerateRecoveryPendingHeader(pendingHeader, checkpointHashes)
}
Expand Down Expand Up @@ -671,14 +663,6 @@ func (c *Core) GetTerminiByHash(hash common.Hash) *types.Termini {
return c.sl.hc.GetTerminiByHash(hash)
}

func (c *Core) SubscribeMissingPendingEtxsEvent(ch chan<- types.HashAndLocation) event.Subscription {
return c.sl.hc.SubscribeMissingPendingEtxsEvent(ch)
}

func (c *Core) SubscribeMissingPendingEtxsRollupEvent(ch chan<- common.Hash) event.Subscription {
return c.sl.hc.SubscribeMissingPendingEtxsRollupEvent(ch)
}

// SubscribeChainSideEvent registers a subscription of ChainSideEvent.
func (c *Core) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription {
return c.sl.hc.SubscribeChainSideEvent(ch)
Expand Down
47 changes: 3 additions & 44 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,9 @@ type HeaderChain struct {
headerCache *lru.Cache // Cache for the most recent block headers
numberCache *lru.Cache // Cache for the most recent block numbers

pendingEtxsRollup *lru.Cache
pendingEtxs *lru.Cache
blooms *lru.Cache
missingPendingEtxsFeed event.Feed
missingPendingEtxsRollupFeed event.Feed
pendingEtxsRollup *lru.Cache
pendingEtxs *lru.Cache
blooms *lru.Cache

wg sync.WaitGroup // chain processing wait group for shutting down
running int32 // 0 if chain is running, 1 when stopped
Expand Down Expand Up @@ -133,22 +131,19 @@ func (hc *HeaderChain) CollectSubRollup(b *types.Block) (types.Transactions, err
pendingEtxs, err := hc.GetPendingEtxs(pEtxHash)
if err != nil {
// Start backfilling the missing pending ETXs needed to process this block
go hc.backfillPETXs(b.Header(), b.SubManifest())
return nil, ErrPendingEtxNotFound
}
subRollup = append(subRollup, pendingEtxs.Etxs...)
}
} else {
// Start backfilling the missing pending ETXs needed to process this block
go hc.backfillPETXs(b.Header(), b.SubManifest())
return nil, ErrPendingEtxNotFound
}
// Region works normally as before collecting pendingEtxs for each hash in the manifest
} else if nodeCtx == common.REGION_CTX {
pendingEtxs, err := hc.GetPendingEtxs(hash)
if err != nil {
// Start backfilling the missing pending ETXs needed to process this block
go hc.backfillPETXs(b.Header(), b.SubManifest())
return nil, ErrPendingEtxNotFound
}
subRollup = append(subRollup, pendingEtxs.Etxs...)
Expand Down Expand Up @@ -208,34 +203,6 @@ func (hc *HeaderChain) GetBloom(hash common.Hash) (*types.Bloom, error) {
return &bloom, nil
}

// backfillPETXs collects any missing PendingETX objects needed to process the
// given header. This is done by informing the fetcher of any pending ETXs we do
// not have, so that they can be fetched from our peers.
func (hc *HeaderChain) backfillPETXs(header *types.Header, subManifest types.BlockManifest) {
nodeCtx := common.NodeLocation.Context()
for _, hash := range subManifest {
if nodeCtx == common.PRIME_CTX {
// In the case of prime, get the pendingEtxsRollup for each region block
// and then fetch the pending etx for each of the rollup hashes
if pEtxRollup, err := hc.GetPendingEtxsRollup(hash); err == nil {
for _, pEtxHash := range pEtxRollup.Manifest {
if _, err := hc.GetPendingEtxs(pEtxHash); err != nil {
// Send the pendingEtxs to the feed for broadcast
hc.missingPendingEtxsFeed.Send(types.HashAndLocation{Hash: pEtxHash, Location: pEtxRollup.Header.Location()})
}
}
} else {
hc.missingPendingEtxsRollupFeed.Send(hash)
}
} else if nodeCtx == common.REGION_CTX {
if _, err := hc.GetPendingEtxs(hash); err != nil {
// Send the pendingEtxs to the feed for broadcast
hc.missingPendingEtxsFeed.Send(types.HashAndLocation{Hash: hash, Location: header.Location()})
}
}
}
}

// Collect all emmitted ETXs since the last coincident block, but excluding
// those emitted in this block
func (hc *HeaderChain) CollectEtxRollup(b *types.Block) (types.Transactions, error) {
Expand Down Expand Up @@ -914,14 +881,6 @@ func (hc *HeaderChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.S
return hc.scope.Track(hc.chainSideFeed.Subscribe(ch))
}

func (hc *HeaderChain) SubscribeMissingPendingEtxsEvent(ch chan<- types.HashAndLocation) event.Subscription {
return hc.scope.Track(hc.missingPendingEtxsFeed.Subscribe(ch))
}

func (hc *HeaderChain) SubscribeMissingPendingEtxsRollupEvent(ch chan<- common.Hash) event.Subscription {
return hc.scope.Track(hc.missingPendingEtxsRollupFeed.Subscribe(ch))
}

func (hc *HeaderChain) StateAt(root common.Hash) (*state.StateDB, error) {
return hc.bc.processor.StateAt(root)
}
8 changes: 0 additions & 8 deletions core/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -1195,14 +1195,6 @@ func (sl *Slice) TxPool() *TxPool { return sl.txPool }

func (sl *Slice) Miner() *Miner { return sl.miner }

func (sl *Slice) SubscribePendingEtxs(ch chan<- types.PendingEtxs) event.Subscription {
return sl.scope.Track(sl.pendingEtxsFeed.Subscribe(ch))
}

func (sl *Slice) SubscribePendingEtxsRollup(ch chan<- types.PendingEtxsRollup) event.Subscription {
return sl.scope.Track(sl.pendingEtxsRollupFeed.Subscribe(ch))
}

func (sl *Slice) CurrentInfo(header *types.Header) bool {
return sl.miner.worker.CurrentInfo(header)
}
Expand Down
181 changes: 8 additions & 173 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,6 @@ const (
// The number is referenced from the size of tx pool.
txChanSize = 4096

// c_pendingEtxBroadcastChanSize is the size of channel listening to pEtx Event.
c_pendingEtxBroadcastChanSize = 10

// c_missingPendingEtxsRollupChanSize is the size of channel listening to missing pEtxsRollup Event.
c_missingPendingEtxsRollupChanSize = 10

// c_pendingEtxRollupBroadcastChanSize is the size of channel listening to pEtx rollup Event.
c_pendingEtxRollupBroadcastChanSize = 10

// missingPendingEtxsChanSize is the size of channel listening to the MissingPendingEtxsEvent
missingPendingEtxsChanSize = 10

// missingParentChanSize is the size of channel listening to the MissingParentEvent
missingParentChanSize = 10

Expand Down Expand Up @@ -129,21 +117,12 @@ type handler struct {
txFetcher *fetcher.TxFetcher
peers *peerSet

eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
missingPendingEtxsCh chan types.HashAndLocation
missingPendingEtxsSub event.Subscription
missingParentCh chan common.Hash
missingParentSub event.Subscription

pEtxCh chan types.PendingEtxs
pEtxSub event.Subscription
pEtxRollupCh chan types.PendingEtxsRollup
pEtxRollupSub event.Subscription
missingPEtxsRollupCh chan common.Hash
missingPEtxsRollupSub event.Subscription
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
missingParentCh chan common.Hash
missingParentSub event.Subscription

whitelist map[uint64]common.Hash

Expand Down Expand Up @@ -331,12 +310,6 @@ func (h *handler) Start(maxPeers int) {
go h.txBroadcastLoop()
}

// broadcast pending etxs
h.wg.Add(1)
h.missingPendingEtxsCh = make(chan types.HashAndLocation, missingPendingEtxsChanSize)
h.missingPendingEtxsSub = h.core.SubscribeMissingPendingEtxsEvent(h.missingPendingEtxsCh)
go h.missingPendingEtxsLoop()

h.wg.Add(1)
h.missingParentCh = make(chan common.Hash, missingParentChanSize)
h.missingParentSub = h.core.SubscribeMissingParentEvent(h.missingParentCh)
Expand All @@ -354,35 +327,15 @@ func (h *handler) Start(maxPeers int) {
h.wg.Add(1)
go h.txsyncLoop64() //Legacy initial tx echange, drop with eth/64.
}

h.wg.Add(1)
h.missingPEtxsRollupCh = make(chan common.Hash, c_pendingEtxBroadcastChanSize)
h.missingPEtxsRollupSub = h.core.SubscribeMissingPendingEtxsRollupEvent(h.missingPEtxsRollupCh)
go h.missingPEtxsRollupLoop()

h.wg.Add(1)
h.pEtxCh = make(chan types.PendingEtxs, c_pendingEtxBroadcastChanSize)
h.pEtxSub = h.core.SubscribePendingEtxs(h.pEtxCh)
go h.broadcastPEtxLoop()

// broadcast pending etxs rollup
h.wg.Add(1)
h.pEtxRollupCh = make(chan types.PendingEtxsRollup, c_pendingEtxRollupBroadcastChanSize)
h.pEtxRollupSub = h.core.SubscribePendingEtxsRollup(h.pEtxRollupCh)
go h.broadcastPEtxRollupLoop()
}

func (h *handler) Stop() {
nodeCtx := common.NodeLocation.Context()
if nodeCtx == common.ZONE_CTX && h.core.ProcessingState() {
h.txsSub.Unsubscribe() // quits txBroadcastLoop
}
h.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
h.missingPendingEtxsSub.Unsubscribe() // quits pendingEtxsBroadcastLoop
h.missingPEtxsRollupSub.Unsubscribe() // quits missingPEtxsRollupSub
h.missingParentSub.Unsubscribe() // quits missingParentLoop
h.pEtxSub.Unsubscribe() // quits pEtxSub
h.pEtxRollupSub.Unsubscribe() // quits pEtxRollupSub
h.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
h.missingParentSub.Unsubscribe() // quits missingParentLoop

// Quit chainSync and txsync64.
// After this is done, no new peers will be accepted.
Expand Down Expand Up @@ -509,54 +462,6 @@ func (h *handler) txBroadcastLoop() {
}
}

// missingPEtxsRollupLoop listens to the MissingBody event in Slice and calls the blockAnnounces.
func (h *handler) missingPEtxsRollupLoop() {
defer h.wg.Done()
for {
select {
case hash := <-h.missingPEtxsRollupCh:
// Check if any of the peers have the body
for _, peer := range h.selectSomePeers() {
log.Trace("Fetching the missing pending etxs rollup from", "peer", peer.ID(), "hash", hash)
peer.RequestOnePendingEtxsRollup(hash)
}

case <-h.missingPEtxsRollupSub.Err():
return
}
}
}

// pendingEtxsBroadcastLoop announces new pendingEtxs to connected peers.
func (h *handler) missingPendingEtxsLoop() {
defer h.wg.Done()
for {
select {
case hashAndLocation := <-h.missingPendingEtxsCh:
// Only ask from peers running the slice for the missing pending etxs
// In the future, peers not responding before the timeout has to be punished
peersRunningSlice := h.peers.peerRunningSlice(hashAndLocation.Location)
// If the node doesn't have any peer running that slice, add a warning
if len(peersRunningSlice) == 0 {
log.Warn("Node doesn't have peers for given Location", "location", hashAndLocation.Location)
}
// Check if any of the peers have the body
for _, peer := range peersRunningSlice {
log.Trace("Fetching the missing pending etxs from", "peer", peer.ID(), "hash", hashAndLocation.Hash)
peer.RequestOnePendingEtxs(hashAndLocation.Hash)
}
if len(peersRunningSlice) == 0 {
for _, peer := range h.selectSomePeers() {
log.Trace("Fetching the missing pending etxs from", "peer", peer.ID(), "hash", hashAndLocation.Hash)
peer.RequestOnePendingEtxs(hashAndLocation.Hash)
}
}
case <-h.missingPendingEtxsSub.Err():
return
}
}
}

// missingParentLoop announces new pendingEtxs to connected peers.
func (h *handler) missingParentLoop() {
defer h.wg.Done()
Expand All @@ -574,76 +479,6 @@ func (h *handler) missingParentLoop() {
}
}

// pEtxLoop listens to the pendingEtxs event in Slice and anounces the pEtx to the peer
func (h *handler) broadcastPEtxLoop() {
defer h.wg.Done()
for {
select {
case pEtx := <-h.pEtxCh:
h.BroadcastPendingEtxs(pEtx)
case <-h.pEtxSub.Err():
return
}
}
}

// pEtxRollupLoop listens to the pendingEtxs event in Slice and anounces the pEtx to the peer
func (h *handler) broadcastPEtxRollupLoop() {
defer h.wg.Done()
for {
select {
case pEtxRollup := <-h.pEtxRollupCh:
h.BroadcastPendingEtxsRollup(pEtxRollup)
case <-h.pEtxRollupSub.Err():
return
}
}
}

// BroadcastPendingEtxs will either propagate a pendingEtxs to a subset of its peers
func (h *handler) BroadcastPendingEtxs(pEtx types.PendingEtxs) {
hash := pEtx.Header.Hash()
peers := h.peers.peersWithoutPendingEtxs(hash)

// Send the block to a subset of our peers
var peerThreshold int
sqrtNumPeers := int(math.Sqrt(float64(len(peers))))
if sqrtNumPeers < minPeerSend {
peerThreshold = len(peers)
} else {
peerThreshold = sqrtNumPeers
}
transfer := peers[:peerThreshold]
// If in region send the pendingEtxs directly, otherwise send the pendingEtxsManifest
for _, peer := range transfer {
peer.SendPendingEtxs(pEtx)
}
log.Trace("Propagated pending etxs", "hash", hash, "recipients", len(transfer), "len", len(pEtx.Etxs))
return
}

// BroadcastPendingEtxsRollup will either propagate a pending etx rollup to a subset of its peers
func (h *handler) BroadcastPendingEtxsRollup(pEtxRollup types.PendingEtxsRollup) {
hash := pEtxRollup.Header.Hash()
peers := h.peers.peersWithoutPendingEtxs(hash)

// Send the block to a subset of our peers
var peerThreshold int
sqrtNumPeers := int(math.Sqrt(float64(len(peers))))
if sqrtNumPeers < minPeerSend {
peerThreshold = len(peers)
} else {
peerThreshold = sqrtNumPeers
}
transfer := peers[:peerThreshold]
// If in region send the pendingEtxs directly, otherwise send the pendingEtxsManifest
for _, peer := range transfer {
peer.SendPendingEtxsRollup(pEtxRollup)
}
log.Trace("Propagated pending etxs rollup", "hash", hash, "recipients", len(transfer), "len", len(pEtxRollup.Manifest))
return
}

func (h *handler) selectSomePeers() []*eth.Peer {
// Get the min(sqrt(len(peers)), minPeerRequest)
count := int(math.Sqrt(float64(len(h.peers.allPeers()))))
Expand Down
Loading