diff --git a/execution/gethexec/sequencer.go b/execution/gethexec/sequencer.go
index 993ea70866..79b70ca655 100644
--- a/execution/gethexec/sequencer.go
+++ b/execution/gethexec/sequencer.go
@@ -328,12 +328,36 @@ func (c nonceFailureCache) Add(err NonceError, queueItem txQueueItem) {
 	}
 }
 
+type synchronizedTxQueue struct {
+	queue containers.Queue[txQueueItem]
+	mutex sync.RWMutex
+}
+
+func (q *synchronizedTxQueue) Push(item txQueueItem) {
+	q.mutex.Lock()
+	q.queue.Push(item)
+	q.mutex.Unlock()
+}
+
+func (q *synchronizedTxQueue) Pop() txQueueItem {
+	q.mutex.Lock()
+	defer q.mutex.Unlock()
+	return q.queue.Pop()
+
+}
+
+func (q *synchronizedTxQueue) Len() int {
+	q.mutex.RLock()
+	defer q.mutex.RUnlock()
+	return q.queue.Len()
+}
+
 type Sequencer struct {
 	stopwaiter.StopWaiter
 
 	execEngine      *ExecutionEngine
 	txQueue         chan txQueueItem
-	txRetryQueue    containers.Queue[txQueueItem]
+	txRetryQueue    synchronizedTxQueue
 	l1Reader        *headerreader.HeaderReader
 	config          SequencerConfigFetcher
 	senderWhitelist map[common.Address]struct{}
@@ -357,7 +381,7 @@ type Sequencer struct {
 	expectedSurplus                   int64
 	expectedSurplusUpdated            bool
 	auctioneerAddr                    common.Address
-	timeboostAuctionResolutionTxQueue containers.Queue[txQueueItem]
+	timeboostAuctionResolutionTxQueue chan txQueueItem
 }
 
 func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderReader, configFetcher SequencerConfigFetcher) (*Sequencer, error) {
@@ -373,15 +397,16 @@ func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderRead
 		senderWhitelist[common.HexToAddress(address)] = struct{}{}
 	}
 	s := &Sequencer{
-		execEngine:      execEngine,
-		txQueue:         make(chan txQueueItem, config.QueueSize),
-		l1Reader:        l1Reader,
-		config:          configFetcher,
-		senderWhitelist: senderWhitelist,
-		nonceCache:      newNonceCache(config.NonceCacheSize),
-		l1Timestamp:     0,
-		pauseChan:       nil,
-		onForwarderSet:  make(chan struct{}, 1),
+		execEngine:                        execEngine,
+		txQueue:                           make(chan txQueueItem, config.QueueSize),
+		l1Reader:                          l1Reader,
+		config:                            configFetcher,
+		senderWhitelist:                   senderWhitelist,
+		nonceCache:                        newNonceCache(config.NonceCacheSize),
+		l1Timestamp:                       0,
+		pauseChan:                         nil,
+		onForwarderSet:                    make(chan struct{}, 1),
+		timeboostAuctionResolutionTxQueue: make(chan txQueueItem, 10), // There should never be more than 1 outstanding auction resolution
 	}
 	s.nonceFailures = &nonceFailureCache{
 		containers.NewLruCacheWithOnEvict(config.NonceCacheSize, s.onNonceFailureEvict),
@@ -566,7 +591,7 @@ func (s *Sequencer) PublishAuctionResolutionTransaction(ctx context.Context, tx
 		return err
 	}
 	log.Info("Prioritizing auction resolution transaction from auctioneer", "txHash", tx.Hash().Hex())
-	s.timeboostAuctionResolutionTxQueue.Push(txQueueItem{
+	s.timeboostAuctionResolutionTxQueue <- txQueueItem{
 		tx:              tx,
 		txSize:          len(txBytes),
 		options:         nil,
@@ -574,7 +599,7 @@ func (s *Sequencer) PublishAuctionResolutionTransaction(ctx context.Context, tx
 		returnedResult:  &atomic.Bool{},
 		ctx:             context.TODO(),
 		firstAppearance: time.Now(),
-	})
+	}
 	return nil
 }
 
@@ -902,10 +927,12 @@ func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {
 
 	for {
 		var queueItem txQueueItem
-		if s.timeboostAuctionResolutionTxQueue.Len() > 0 {
-			queueItem = s.timeboostAuctionResolutionTxQueue.Pop()
-			log.Info("Popped the auction resolution tx", queueItem.tx.Hash())
-		} else if s.txRetryQueue.Len() > 0 {
+
+		if s.txRetryQueue.Len() > 0 {
+			// The txRetryQueue is not modeled as a channel because it is only added to
+			// from this function (Sequencer.createBlock), so it is sufficient to check
+			// its length at the start of this loop: items cannot be added to it
+			// asynchronously, unlike the main txQueue or timeboostAuctionResolutionTxQueue.
 			queueItem = s.txRetryQueue.Pop()
 		} else if len(queueItems) == 0 {
 			var nextNonceExpiryChan <-chan time.Time
@@ -914,6 +941,8 @@ func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {
 			}
 			select {
 			case queueItem = <-s.txQueue:
+			case queueItem = <-s.timeboostAuctionResolutionTxQueue:
+				log.Info("Popped the auction resolution tx", "txHash", queueItem.tx.Hash())
 			case <-nextNonceExpiryChan:
 				// No need to stop the previous timer since it already elapsed
 				nextNonceExpiryTimer = s.expireNonceFailures()
@@ -932,6 +961,8 @@ func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {
 			done := false
 			select {
 			case queueItem = <-s.txQueue:
+			case queueItem = <-s.timeboostAuctionResolutionTxQueue:
+				log.Info("Popped the auction resolution tx", "txHash", queueItem.tx.Hash())
 			default:
 				done = true
 			}
@@ -1262,11 +1293,18 @@ func (s *Sequencer) StopAndWait() {
 	if s.config().Timeboost.Enable && s.expressLaneService != nil {
 		s.expressLaneService.StopAndWait()
 	}
-	if s.txRetryQueue.Len() == 0 && len(s.txQueue) == 0 && s.nonceFailures.Len() == 0 {
+	if s.txRetryQueue.Len() == 0 &&
+		len(s.txQueue) == 0 &&
+		s.nonceFailures.Len() == 0 &&
+		len(s.timeboostAuctionResolutionTxQueue) == 0 {
 		return
 	}
 	// this usually means that coordinator's safe-shutdown-delay is too low
-	log.Warn("Sequencer has queued items while shutting down", "txQueue", len(s.txQueue), "retryQueue", s.txRetryQueue.Len(), "nonceFailures", s.nonceFailures.Len())
+	log.Warn("Sequencer has queued items while shutting down",
+		"txQueue", len(s.txQueue),
+		"retryQueue", s.txRetryQueue.Len(),
+		"nonceFailures", s.nonceFailures.Len(),
+		"timeboostAuctionResolutionTxQueue", len(s.timeboostAuctionResolutionTxQueue))
 	_, forwarder := s.GetPauseAndForwarder()
 	if forwarder != nil {
 		var wg sync.WaitGroup
@@ -1287,6 +1325,8 @@ func (s *Sequencer) StopAndWait() {
 			select {
 			case item = <-s.txQueue:
 				source = "txQueue"
+			case item = <-s.timeboostAuctionResolutionTxQueue:
+				source = "timeboostAuctionResolutionTxQueue"
 			default:
 				break emptyqueues
 			}
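
Not part of the diff above, just a reviewer's sketch of the consumption pattern the new comment in createBlock describes. The names `item`, `syncQueue`, `retryQueue`, `txCh`, and `auctionCh` are illustrative stand-ins for txQueueItem, synchronizedTxQueue, s.txRetryQueue, s.txQueue, and s.timeboostAuctionResolutionTxQueue: the retry queue is drained with a plain Len/Pop check because only the consuming loop pushes to it, while the channel-backed queues can be fed asynchronously and are therefore read through a single select.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// item stands in for txQueueItem in this sketch.
type item struct{ id int }

// syncQueue mirrors the synchronizedTxQueue idea: a FIFO guarded by a mutex so
// Len and Pop stay safe when other goroutines (e.g. a shutdown path) inspect it.
type syncQueue struct {
	mu    sync.Mutex
	items []item
}

func (q *syncQueue) Push(it item) {
	q.mu.Lock()
	q.items = append(q.items, it)
	q.mu.Unlock()
}

func (q *syncQueue) Pop() item {
	q.mu.Lock()
	defer q.mu.Unlock()
	it := q.items[0]
	q.items = q.items[1:]
	return it
}

func (q *syncQueue) Len() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.items)
}

func main() {
	retryQueue := &syncQueue{}       // pushed to only by the consuming loop, so a Len() check suffices
	txCh := make(chan item, 8)       // fed asynchronously (analogous to the RPC-fed txQueue)
	auctionCh := make(chan item, 10) // fed asynchronously (analogous to the auctioneer-fed channel)

	retryQueue.Push(item{id: 1})
	txCh <- item{id: 2}
	auctionCh <- item{id: 3}

	deadline := time.After(100 * time.Millisecond)
	for {
		var it item
		if retryQueue.Len() > 0 {
			// Retried items take priority and cannot appear mid-iteration,
			// so no select case is needed for them.
			it = retryQueue.Pop()
		} else {
			select {
			case it = <-txCh:
			case it = <-auctionCh:
			case <-deadline:
				return // nothing left to drain in this sketch
			}
		}
		fmt.Println("processing item", it.id)
	}
}
```

The buffered channel presumably lets PublishAuctionResolutionTransaction hand items to the block-building loop without sharing a lock with it, while txRetryQueue only needs the mutex-wrapped synchronizedTxQueue so that StopAndWait can safely call Len() from another goroutine.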