From 1ad12ca481a4f223000690476a98f228e2db645a Mon Sep 17 00:00:00 2001
From: Tristan Wilson
Date: Wed, 30 Oct 2024 18:40:24 +0100
Subject: [PATCH] Fix timeboost auction resolution queue handling

If there were auction resolution transactions in the
timeboostAuctionResolutionTxQueue but no normal transactions came
through the txQueue, the auction resolution tx would not be processed,
because the main createBlock loop was waiting only on the txQueue
channel and never on the auction resolution queue.

We had tried to use the same pattern as for the txRetryQueue, but this
is not correct: the auction resolution queue, like the txQueue, can
have items added to it asynchronously, whereas the txRetryQueue is
only added to from createBlock itself.

This commit makes the timeboostAuctionResolutionTxQueue a channel as
well, so that it can be waited on in the same select statement and
asynchronously added items are handled correctly.

This also fixes two issues related to shutdown. The first was a race
condition between shutting down and txRetryQueue handling, and the
second was that outstanding auction resolution txs were not forwarded
on shutdown.

Fixes NIT-2878
---
 execution/gethexec/sequencer.go | 78 +++++++++++++++++++++++++--------
 1 file changed, 59 insertions(+), 19 deletions(-)

diff --git a/execution/gethexec/sequencer.go b/execution/gethexec/sequencer.go
index a10a39854b..bf80ae204c 100644
--- a/execution/gethexec/sequencer.go
+++ b/execution/gethexec/sequencer.go
@@ -332,12 +332,36 @@ func (c nonceFailureCache) Add(err NonceError, queueItem txQueueItem) {
     }
 }
 
+type synchronizedTxQueue struct {
+    queue containers.Queue[txQueueItem]
+    mutex sync.RWMutex
+}
+
+func (q *synchronizedTxQueue) Push(item txQueueItem) {
+    q.mutex.Lock()
+    q.queue.Push(item)
+    q.mutex.Unlock()
+}
+
+func (q *synchronizedTxQueue) Pop() txQueueItem {
+    q.mutex.Lock()
+    defer q.mutex.Unlock()
+    return q.queue.Pop()
+
+}
+
+func (q *synchronizedTxQueue) Len() int {
+    q.mutex.RLock()
+    defer q.mutex.RUnlock()
+    return q.queue.Len()
+}
+
 type Sequencer struct {
     stopwaiter.StopWaiter
 
     execEngine      *ExecutionEngine
     txQueue         chan txQueueItem
-    txRetryQueue    containers.Queue[txQueueItem]
+    txRetryQueue    synchronizedTxQueue
     l1Reader        *headerreader.HeaderReader
     config          SequencerConfigFetcher
     senderWhitelist map[common.Address]struct{}
@@ -361,7 +385,7 @@ type Sequencer struct {
     expectedSurplus                   int64
     expectedSurplusUpdated            bool
     auctioneerAddr                    common.Address
-    timeboostAuctionResolutionTxQueue containers.Queue[txQueueItem]
+    timeboostAuctionResolutionTxQueue chan txQueueItem
 }
 
 func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderReader, configFetcher SequencerConfigFetcher) (*Sequencer, error) {
@@ -377,15 +401,16 @@ func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderRead
         senderWhitelist[common.HexToAddress(address)] = struct{}{}
     }
     s := &Sequencer{
-        execEngine:      execEngine,
-        txQueue:         make(chan txQueueItem, config.QueueSize),
-        l1Reader:        l1Reader,
-        config:          configFetcher,
-        senderWhitelist: senderWhitelist,
-        nonceCache:      newNonceCache(config.NonceCacheSize),
-        l1Timestamp:     0,
-        pauseChan:       nil,
-        onForwarderSet:  make(chan struct{}, 1),
+        execEngine:                        execEngine,
+        txQueue:                           make(chan txQueueItem, config.QueueSize),
+        l1Reader:                          l1Reader,
+        config:                            configFetcher,
+        senderWhitelist:                   senderWhitelist,
+        nonceCache:                        newNonceCache(config.NonceCacheSize),
+        l1Timestamp:                       0,
+        pauseChan:                         nil,
+        onForwarderSet:                    make(chan struct{}, 1),
+        timeboostAuctionResolutionTxQueue: make(chan txQueueItem, 10), // There should never be more than 1 outstanding auction resolution
     }
     s.nonceFailures = &nonceFailureCache{
         containers.NewLruCacheWithOnEvict(config.NonceCacheSize, s.onNonceFailureEvict),
@@ -570,7 +595,7 @@ func (s *Sequencer) PublishAuctionResolutionTransaction(ctx context.Context, tx
         return err
     }
     log.Info("Prioritizing auction resolution transaction from auctioneer", "txHash", tx.Hash().Hex())
-    s.timeboostAuctionResolutionTxQueue.Push(txQueueItem{
+    s.timeboostAuctionResolutionTxQueue <- txQueueItem{
         tx:              tx,
         txSize:          len(txBytes),
         options:         nil,
@@ -578,7 +603,7 @@ func (s *Sequencer) PublishAuctionResolutionTransaction(ctx context.Context, tx
         returnedResult:  &atomic.Bool{},
         ctx:             context.TODO(),
         firstAppearance: time.Now(),
-    })
+    }
     return nil
 }
 
@@ -906,10 +931,12 @@ func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {
 
     for {
         var queueItem txQueueItem
-        if s.timeboostAuctionResolutionTxQueue.Len() > 0 {
-            queueItem = s.timeboostAuctionResolutionTxQueue.Pop()
-            log.Info("Popped the auction resolution tx", queueItem.tx.Hash())
-        } else if s.txRetryQueue.Len() > 0 {
+
+        if s.txRetryQueue.Len() > 0 {
+            // The txRetryQueue is not modeled as a channel because it is only added to from
+            // this function (Sequencer.createBlock). So it is sufficient to check its
+            // len at the start of this loop, since items can't be added to it asynchronously,
+            // which is not true for the main txQueue or timeboostAuctionResolutionTxQueue.
             queueItem = s.txRetryQueue.Pop()
         } else if len(queueItems) == 0 {
             var nextNonceExpiryChan <-chan time.Time
@@ -918,6 +945,8 @@ func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {
             }
             select {
             case queueItem = <-s.txQueue:
+            case queueItem = <-s.timeboostAuctionResolutionTxQueue:
+                log.Info("Popped the auction resolution tx", "txHash", queueItem.tx.Hash())
             case <-nextNonceExpiryChan:
                 // No need to stop the previous timer since it already elapsed
                 nextNonceExpiryTimer = s.expireNonceFailures()
@@ -936,6 +965,8 @@ func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {
             done := false
             select {
             case queueItem = <-s.txQueue:
+            case queueItem = <-s.timeboostAuctionResolutionTxQueue:
+                log.Info("Popped the auction resolution tx", "txHash", queueItem.tx.Hash())
             default:
                 done = true
             }
@@ -1265,11 +1296,18 @@ func (s *Sequencer) StopAndWait() {
     if s.config().Timeboost.Enable && s.expressLaneService != nil {
         s.expressLaneService.StopAndWait()
     }
-    if s.txRetryQueue.Len() == 0 && len(s.txQueue) == 0 && s.nonceFailures.Len() == 0 {
+    if s.txRetryQueue.Len() == 0 &&
+        len(s.txQueue) == 0 &&
+        s.nonceFailures.Len() == 0 &&
+        len(s.timeboostAuctionResolutionTxQueue) == 0 {
         return
     }
     // this usually means that coordinator's safe-shutdown-delay is too low
-    log.Warn("Sequencer has queued items while shutting down", "txQueue", len(s.txQueue), "retryQueue", s.txRetryQueue.Len(), "nonceFailures", s.nonceFailures.Len())
+    log.Warn("Sequencer has queued items while shutting down",
+        "txQueue", len(s.txQueue),
+        "retryQueue", s.txRetryQueue.Len(),
+        "nonceFailures", s.nonceFailures.Len(),
+        "timeboostAuctionResolutionTxQueue", len(s.timeboostAuctionResolutionTxQueue))
     _, forwarder := s.GetPauseAndForwarder()
     if forwarder != nil {
         var wg sync.WaitGroup
@@ -1290,6 +1328,8 @@ func (s *Sequencer) StopAndWait() {
             select {
             case item = <-s.txQueue:
                 source = "txQueue"
+            case item = <-s.timeboostAuctionResolutionTxQueue:
+                source = "timeboostAuctionResolutionTxQueue"
             default:
                 break emptyqueues
             }
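
As an illustration of the concurrency pattern the commit message describes, below is a minimal, self-contained Go sketch: queues that are fed asynchronously (the tx queue and the auction resolution queue) are modeled as channels so a single select can wait on all of them, while the retry queue, which is only fed by the consuming loop itself, stays a plain structure polled by length. The names and string payloads here are illustrative assumptions, not the sequencer's actual API.

// Minimal sketch of the queueing pattern described in the commit message.
// All names are illustrative placeholders, not the sequencer's real fields;
// items are plain strings instead of transactions.
package main

import (
	"fmt"
	"time"
)

func main() {
	// Queues that other goroutines push to must be channels so one select
	// can wait on all of them at once.
	txQueue := make(chan string, 16)
	auctionResolutionQueue := make(chan string, 10)
	// The retry queue is only appended to by this loop itself, so a plain
	// slice polled via len() is sufficient.
	var retryQueue []string

	// Asynchronous producers.
	go func() { auctionResolutionQueue <- "auction-resolution-tx" }()
	go func() {
		time.Sleep(50 * time.Millisecond)
		txQueue <- "user-tx"
	}()

	for i := 0; i < 3; i++ {
		var item string
		if len(retryQueue) > 0 {
			// Safe to poll: nothing appends to retryQueue concurrently.
			item, retryQueue = retryQueue[0], retryQueue[1:]
		} else {
			// Waiting on both channels is the essence of the fix; blocking
			// on txQueue alone would leave auctionResolutionQueue unserved.
			select {
			case item = <-txQueue:
			case item = <-auctionResolutionQueue:
			case <-time.After(200 * time.Millisecond):
				fmt.Println("no work")
				continue
			}
		}
		fmt.Println("processing", item)
		if item == "user-tx" {
			// Re-queued by the consuming loop itself, never asynchronously.
			retryQueue = append(retryQueue, "retry: "+item)
		}
	}
}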