Skip to content

Commit

Permalink
Merge pull request #2001 from OffchainLabs/fix-feed-timer-reset
Browse files Browse the repository at this point in the history
Clear out the channel when resetting feed timers
  • Loading branch information
joshuacolvin0 authored Dec 5, 2023
2 parents beb2dc4 + bbe6517 commit 4a3cbe2
Showing 1 changed file with 24 additions and 13 deletions.
37 changes: 24 additions & 13 deletions broadcastclients/broadcastclients.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,18 @@ func (bcs *BroadcastClients) adjustCount(delta int32) {
}
}

// Clears out a ticker's channel and resets it to the interval
func clearAndResetTicker(timer *time.Ticker, interval time.Duration) {
timer.Stop()
// Clear out any previous ticks
// A ticker's channel is only buffers one tick, so we don't need a loop here
select {
case <-timer.C:
default:
}
timer.Reset(interval)
}

func (bcs *BroadcastClients) Start(ctx context.Context) {
bcs.primaryRouter.StopWaiter.Start(ctx, bcs.primaryRouter)
bcs.secondaryRouter.StopWaiter.Start(ctx, bcs.secondaryRouter)
Expand Down Expand Up @@ -182,46 +194,45 @@ func (bcs *BroadcastClients) Start(ctx context.Context) {
return
// Primary feeds
case msg := <-bcs.primaryRouter.messageChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
if err := msgHandler(msg, bcs.primaryRouter); err != nil {
log.Error("Error routing message from Primary Sequencer Feeds", "err", err)
}
clearAndResetTicker(startSecondaryFeedTimer, MAX_FEED_INACTIVE_TIME)
clearAndResetTicker(primaryFeedIsDownTimer, MAX_FEED_INACTIVE_TIME)
case cs := <-bcs.primaryRouter.confirmedSequenceNumberChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
confSeqHandler(cs, bcs.primaryRouter)
clearAndResetTicker(startSecondaryFeedTimer, MAX_FEED_INACTIVE_TIME)
clearAndResetTicker(primaryFeedIsDownTimer, MAX_FEED_INACTIVE_TIME)
// Failed to get messages from primary feed for ~5 seconds, reset the timer responsible for stopping a secondary
case <-primaryFeedIsDownTimer.C:
stopSecondaryFeedTimer.Reset(PRIMARY_FEED_UPTIME)
clearAndResetTicker(stopSecondaryFeedTimer, PRIMARY_FEED_UPTIME)
default:
select {
case <-ctx.Done():
return
// Secondary Feeds
case msg := <-bcs.secondaryRouter.messageChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
if err := msgHandler(msg, bcs.secondaryRouter); err != nil {
log.Error("Error routing message from Secondary Sequencer Feeds", "err", err)
}
clearAndResetTicker(startSecondaryFeedTimer, MAX_FEED_INACTIVE_TIME)
case cs := <-bcs.secondaryRouter.confirmedSequenceNumberChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
confSeqHandler(cs, bcs.secondaryRouter)

clearAndResetTicker(startSecondaryFeedTimer, MAX_FEED_INACTIVE_TIME)
case msg := <-bcs.primaryRouter.messageChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
if err := msgHandler(msg, bcs.primaryRouter); err != nil {
log.Error("Error routing message from Primary Sequencer Feeds", "err", err)
}
clearAndResetTicker(startSecondaryFeedTimer, MAX_FEED_INACTIVE_TIME)
clearAndResetTicker(primaryFeedIsDownTimer, MAX_FEED_INACTIVE_TIME)
case cs := <-bcs.primaryRouter.confirmedSequenceNumberChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
confSeqHandler(cs, bcs.primaryRouter)
clearAndResetTicker(startSecondaryFeedTimer, MAX_FEED_INACTIVE_TIME)
clearAndResetTicker(primaryFeedIsDownTimer, MAX_FEED_INACTIVE_TIME)
case <-startSecondaryFeedTimer.C:
bcs.startSecondaryFeed(ctx)
case <-primaryFeedIsDownTimer.C:
stopSecondaryFeedTimer.Reset(PRIMARY_FEED_UPTIME)
clearAndResetTicker(stopSecondaryFeedTimer, PRIMARY_FEED_UPTIME)
}
}
}
Expand Down

0 comments on commit 4a3cbe2

Please sign in to comment.