Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix timeboost auction resolution queue handling #2764

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 59 additions & 19 deletions execution/gethexec/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,12 +328,36 @@ func (c nonceFailureCache) Add(err NonceError, queueItem txQueueItem) {
}
}

type synchronizedTxQueue struct {
Copy link
Contributor

@ganeshvanahalli ganeshvanahalli Nov 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth it to change this to something like SynchronizedQueue[T any] and add it to containers package? Seems like it can be used in other places as well

queue containers.Queue[txQueueItem]
mutex sync.RWMutex
}

func (q *synchronizedTxQueue) Push(item txQueueItem) {
q.mutex.Lock()
q.queue.Push(item)
q.mutex.Unlock()
}

func (q *synchronizedTxQueue) Pop() txQueueItem {
q.mutex.Lock()
defer q.mutex.Unlock()
return q.queue.Pop()

}

func (q *synchronizedTxQueue) Len() int {
q.mutex.RLock()
defer q.mutex.RUnlock()
return q.queue.Len()
}

type Sequencer struct {
stopwaiter.StopWaiter

execEngine *ExecutionEngine
txQueue chan txQueueItem
txRetryQueue containers.Queue[txQueueItem]
txRetryQueue synchronizedTxQueue
l1Reader *headerreader.HeaderReader
config SequencerConfigFetcher
senderWhitelist map[common.Address]struct{}
Expand All @@ -357,7 +381,7 @@ type Sequencer struct {
expectedSurplus int64
expectedSurplusUpdated bool
auctioneerAddr common.Address
timeboostAuctionResolutionTxQueue containers.Queue[txQueueItem]
timeboostAuctionResolutionTxQueue chan txQueueItem
}

func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderReader, configFetcher SequencerConfigFetcher) (*Sequencer, error) {
Expand All @@ -373,15 +397,16 @@ func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderRead
senderWhitelist[common.HexToAddress(address)] = struct{}{}
}
s := &Sequencer{
execEngine: execEngine,
txQueue: make(chan txQueueItem, config.QueueSize),
l1Reader: l1Reader,
config: configFetcher,
senderWhitelist: senderWhitelist,
nonceCache: newNonceCache(config.NonceCacheSize),
l1Timestamp: 0,
pauseChan: nil,
onForwarderSet: make(chan struct{}, 1),
execEngine: execEngine,
txQueue: make(chan txQueueItem, config.QueueSize),
l1Reader: l1Reader,
config: configFetcher,
senderWhitelist: senderWhitelist,
nonceCache: newNonceCache(config.NonceCacheSize),
l1Timestamp: 0,
pauseChan: nil,
onForwarderSet: make(chan struct{}, 1),
timeboostAuctionResolutionTxQueue: make(chan txQueueItem, 10), // There should never be more than 1 outstanding auction resolutions
}
s.nonceFailures = &nonceFailureCache{
containers.NewLruCacheWithOnEvict(config.NonceCacheSize, s.onNonceFailureEvict),
Expand Down Expand Up @@ -566,15 +591,15 @@ func (s *Sequencer) PublishAuctionResolutionTransaction(ctx context.Context, tx
return err
}
log.Info("Prioritizing auction resolution transaction from auctioneer", "txHash", tx.Hash().Hex())
s.timeboostAuctionResolutionTxQueue.Push(txQueueItem{
s.timeboostAuctionResolutionTxQueue <- txQueueItem{
tx: tx,
txSize: len(txBytes),
options: nil,
resultChan: make(chan error, 1),
returnedResult: &atomic.Bool{},
ctx: context.TODO(),
firstAppearance: time.Now(),
})
}
return nil
}

Expand Down Expand Up @@ -902,10 +927,12 @@ func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {

for {
var queueItem txQueueItem
if s.timeboostAuctionResolutionTxQueue.Len() > 0 {
queueItem = s.timeboostAuctionResolutionTxQueue.Pop()
log.Info("Popped the auction resolution tx", queueItem.tx.Hash())
} else if s.txRetryQueue.Len() > 0 {

if s.txRetryQueue.Len() > 0 {
// The txRetryQueue is not modeled as a channel because it is only added to from
// this function (Sequencer.createBlock). So it is sufficient to check its
// len at the start of this loop, since items can't be added to it asynchronously,
// which is not true for the main txQueue or timeboostAuctionResolutionQueue.
queueItem = s.txRetryQueue.Pop()
} else if len(queueItems) == 0 {
var nextNonceExpiryChan <-chan time.Time
Expand All @@ -914,6 +941,8 @@ func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {
}
select {
case queueItem = <-s.txQueue:
case queueItem = <-s.timeboostAuctionResolutionTxQueue:
log.Info("Popped the auction resolution tx", "txHash", queueItem.tx.Hash())
case <-nextNonceExpiryChan:
// No need to stop the previous timer since it already elapsed
nextNonceExpiryTimer = s.expireNonceFailures()
Expand All @@ -932,6 +961,8 @@ func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {
done := false
select {
case queueItem = <-s.txQueue:
case queueItem = <-s.timeboostAuctionResolutionTxQueue:
log.Info("Popped the auction resolution tx", "txHash", queueItem.tx.Hash())
default:
done = true
}
Expand Down Expand Up @@ -1262,11 +1293,18 @@ func (s *Sequencer) StopAndWait() {
if s.config().Timeboost.Enable && s.expressLaneService != nil {
s.expressLaneService.StopAndWait()
}
if s.txRetryQueue.Len() == 0 && len(s.txQueue) == 0 && s.nonceFailures.Len() == 0 {
if s.txRetryQueue.Len() == 0 &&
len(s.txQueue) == 0 &&
s.nonceFailures.Len() == 0 &&
len(s.timeboostAuctionResolutionTxQueue) == 0 {
return
}
// this usually means that coordinator's safe-shutdown-delay is too low
log.Warn("Sequencer has queued items while shutting down", "txQueue", len(s.txQueue), "retryQueue", s.txRetryQueue.Len(), "nonceFailures", s.nonceFailures.Len())
log.Warn("Sequencer has queued items while shutting down",
"txQueue", len(s.txQueue),
"retryQueue", s.txRetryQueue.Len(),
"nonceFailures", s.nonceFailures.Len(),
"timeboostAuctionResolutionTxQueue", len(s.timeboostAuctionResolutionTxQueue))
_, forwarder := s.GetPauseAndForwarder()
if forwarder != nil {
var wg sync.WaitGroup
Expand All @@ -1287,6 +1325,8 @@ func (s *Sequencer) StopAndWait() {
select {
case item = <-s.txQueue:
source = "txQueue"
case item = <-s.timeboostAuctionResolutionTxQueue:
source = "timeboostAuctionResolutionTxQueue"
default:
break emptyqueues
}
Expand Down
Loading