Skip to content

Commit

Permalink
Merge pull request #1978 from OffchainLabs/prioritize-feed-reads
Browse files Browse the repository at this point in the history
Prioritize reading messages from primary feeds over secondaries
  • Loading branch information
ganeshvanahalli authored Nov 30, 2023
2 parents 09d45c1 + 42a18c0 commit 1bf26f7
Showing 1 changed file with 77 additions and 53 deletions.
130 changes: 77 additions & 53 deletions broadcastclients/broadcastclients.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,75 +140,89 @@ func (bcs *BroadcastClients) Start(ctx context.Context) {
defer startSecondaryFeedTimer.Stop()
defer stopSecondaryFeedTimer.Stop()
defer primaryFeedIsDownTimer.Stop()

msgHandler := func(msg broadcaster.BroadcastFeedMessage, router *Router) error {
if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok {
return nil
}
if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok {
return nil
}
recentFeedItemsNew[msg.SequenceNumber] = time.Now()
if err := router.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil {
return err
}
return nil
}
confSeqHandler := func(cs arbutil.MessageIndex, router *Router) {
if cs == lastConfirmed {
return
}
lastConfirmed = cs
if router.forwardConfirmationChan != nil {
router.forwardConfirmationChan <- cs
}
}

// Multiple select statements to prioritize reading messages from primary feeds' channels and avoid starving of timers
for {
select {
// Cycle buckets to get rid of old entries
case <-recentFeedItemsCleanup.C:
recentFeedItemsOld = recentFeedItemsNew
recentFeedItemsNew = make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)
// Primary feeds have been up and running for PRIMARY_FEED_UPTIME=10 mins without a failure, stop the recently started secondary feed
case <-stopSecondaryFeedTimer.C:
bcs.stopSecondaryFeed()
default:
}

select {
case <-ctx.Done():
return

// Primary feeds
case msg := <-bcs.primaryRouter.messageChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok {
continue
}
if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok {
continue
}
recentFeedItemsNew[msg.SequenceNumber] = time.Now()
if err := bcs.primaryRouter.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil {
if err := msgHandler(msg, bcs.primaryRouter); err != nil {
log.Error("Error routing message from Primary Sequencer Feeds", "err", err)
}
case cs := <-bcs.primaryRouter.confirmedSequenceNumberChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
if cs == lastConfirmed {
continue
}
lastConfirmed = cs
if bcs.primaryRouter.forwardConfirmationChan != nil {
bcs.primaryRouter.forwardConfirmationChan <- cs
}

// Secondary Feeds
case msg := <-bcs.secondaryRouter.messageChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok {
continue
}
if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok {
continue
}
recentFeedItemsNew[msg.SequenceNumber] = time.Now()
if err := bcs.secondaryRouter.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil {
log.Error("Error routing message from Secondary Sequencer Feeds", "err", err)
}
case cs := <-bcs.secondaryRouter.confirmedSequenceNumberChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
if cs == lastConfirmed {
continue
}
lastConfirmed = cs
if bcs.secondaryRouter.forwardConfirmationChan != nil {
bcs.secondaryRouter.forwardConfirmationChan <- cs
}

// Cycle buckets to get rid of old entries
case <-recentFeedItemsCleanup.C:
recentFeedItemsOld = recentFeedItemsNew
recentFeedItemsNew = make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)

// failed to get messages from both primary and secondary feeds for ~5 seconds, start a new secondary feed
case <-startSecondaryFeedTimer.C:
bcs.startSecondaryFeed(ctx)

// failed to get messages from primary feed for ~5 seconds, reset the timer responsible for stopping a secondary
confSeqHandler(cs, bcs.primaryRouter)
// Failed to get messages from primary feed for ~5 seconds, reset the timer responsible for stopping a secondary
case <-primaryFeedIsDownTimer.C:
stopSecondaryFeedTimer.Reset(PRIMARY_FEED_UPTIME)
default:
select {
case <-ctx.Done():
return
// Secondary Feeds
case msg := <-bcs.secondaryRouter.messageChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
if err := msgHandler(msg, bcs.secondaryRouter); err != nil {
log.Error("Error routing message from Secondary Sequencer Feeds", "err", err)
}
case cs := <-bcs.secondaryRouter.confirmedSequenceNumberChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
confSeqHandler(cs, bcs.secondaryRouter)

// primary feeds have been up and running for PRIMARY_FEED_UPTIME=10 mins without a failure, stop the recently started secondary feed
case <-stopSecondaryFeedTimer.C:
bcs.stopSecondaryFeed()
case msg := <-bcs.primaryRouter.messageChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
if err := msgHandler(msg, bcs.primaryRouter); err != nil {
log.Error("Error routing message from Primary Sequencer Feeds", "err", err)
}
case cs := <-bcs.primaryRouter.confirmedSequenceNumberChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
confSeqHandler(cs, bcs.primaryRouter)
case <-startSecondaryFeedTimer.C:
bcs.startSecondaryFeed(ctx)
case <-primaryFeedIsDownTimer.C:
stopSecondaryFeedTimer.Reset(PRIMARY_FEED_UPTIME)
}
}
}
})
Expand Down Expand Up @@ -239,6 +253,16 @@ func (bcs *BroadcastClients) stopSecondaryFeed() {
bcs.secondaryClients[pos].StopAndWait()
bcs.secondaryClients = bcs.secondaryClients[:pos]
log.Info("disconnected secondary feed", "url", bcs.secondaryURL[pos])

// flush the secondary feeds' message and confirmedSequenceNumber channels
for {
select {
case <-bcs.secondaryRouter.messageChan:
case <-bcs.secondaryRouter.confirmedSequenceNumberChan:
default:
return
}
}
}
}

Expand Down

0 comments on commit 1bf26f7

Please sign in to comment.