diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 7a4cfc21c2..5d61bc5f14 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -73,6 +73,7 @@ type BatchPoster struct { dataPoster *dataposter.DataPoster redisLock *redislock.Simple firstEphemeralError time.Time // first time a continuous error suspected to be ephemeral occurred + // This is an atomic variable that should only be accessed atomically. // An estimate of the number of batches we want to post but haven't yet. // This doesn't include batches which we don't want to post yet due to the L1 bounds. backlog uint64 @@ -283,6 +284,7 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e RedisLock: redisLock, Config: dataPosterConfigFetcher, MetadataRetriever: b.getBatchPosterPosition, + ExtraBacklog: b.GetBacklogEstimate, RedisKey: "data-poster.queue", }) if err != nil { @@ -846,7 +848,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) if b.building == nil || b.building.startMsgCount != batchPosition.MessageCount { b.building = &buildingBatch{ - segments: newBatchSegments(batchPosition.DelayedMessageCount, b.config(), b.backlog), + segments: newBatchSegments(batchPosition.DelayedMessageCount, b.config(), b.GetBacklogEstimate()), msgCount: batchPosition.MessageCount, startMsgCount: batchPosition.MessageCount, } @@ -1061,12 +1063,12 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) recentlyHitL1Bounds := time.Since(b.lastHitL1Bounds) < config.PollInterval*3 postedMessages := b.building.msgCount - batchPosition.MessageCount unpostedMessages := msgCount - b.building.msgCount - b.backlog = uint64(unpostedMessages) / uint64(postedMessages) - if b.backlog > 10 { + backlog := uint64(unpostedMessages) / uint64(postedMessages) + if backlog > 10 { logLevel := log.Warn if recentlyHitL1Bounds { logLevel = log.Info - } else if b.backlog > 30 { + } else if backlog > 30 { logLevel = log.Error } logLevel( @@ -1076,14 +1078,15 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) "messageCount", msgCount, "lastPostedMessages", postedMessages, "unpostedMessages", unpostedMessages, - "batchBacklogEstimate", b.backlog, + "batchBacklogEstimate", backlog, ) } if recentlyHitL1Bounds { // This backlog isn't "real" in that we don't want to post any more messages. // Setting the backlog to 0 here ensures that we don't lower compression as a result. - b.backlog = 0 + backlog = 0 } + atomic.StoreUint64(&b.backlog, backlog) b.building = nil // If we aren't queueing up transactions, wait for the receipt before moving on to the next batch. @@ -1098,6 +1101,10 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) return true, nil } +func (b *BatchPoster) GetBacklogEstimate() uint64 { + return atomic.LoadUint64(&b.backlog) +} + func (b *BatchPoster) Start(ctxIn context.Context) { b.dataPoster.Start(ctxIn) b.redisLock.Start(ctxIn) diff --git a/arbnode/dataposter/data_poster.go b/arbnode/dataposter/data_poster.go index 042a5487d2..8326419593 100644 --- a/arbnode/dataposter/data_poster.go +++ b/arbnode/dataposter/data_poster.go @@ -62,6 +62,7 @@ type DataPoster struct { usingNoOpStorage bool replacementTimes []time.Duration metadataRetriever func(ctx context.Context, blockNum *big.Int) ([]byte, error) + extraBacklog func() uint64 // These fields are protected by the mutex. // TODO: factor out these fields into separate structure, since now one @@ -116,6 +117,7 @@ type DataPosterOpts struct { RedisLock AttemptLocker Config ConfigFetcher MetadataRetriever func(ctx context.Context, blockNum *big.Int) ([]byte, error) + ExtraBacklog func() uint64 RedisKey string // Redis storage key } @@ -176,6 +178,10 @@ func NewDataPoster(ctx context.Context, opts *DataPosterOpts) (*DataPoster, erro redisLock: opts.RedisLock, errorCount: make(map[uint64]int), maxFeeCapExpression: expression, + extraBacklog: opts.ExtraBacklog, + } + if dp.extraBacklog == nil { + dp.extraBacklog = func() uint64 { return 0 } } if cfg.ExternalSigner.URL != "" { signer, sender, err := externalSigner(ctx, &cfg.ExternalSigner) @@ -371,6 +377,7 @@ func (p *DataPoster) GetNextNonceAndMeta(ctx context.Context) (uint64, []byte, e const minRbfIncrease = arbmath.OneInBips * 11 / 10 // evalMaxFeeCapExpr uses MaxFeeCapFormula from config to calculate the expression's result by plugging in appropriate parameter values +// backlogOfBatches should already include extraBacklog func (p *DataPoster) evalMaxFeeCapExpr(backlogOfBatches uint64, elapsed time.Duration) (*big.Int, error) { config := p.config() parameters := map[string]any{ @@ -403,8 +410,10 @@ func (p *DataPoster) evalMaxFeeCapExpr(backlogOfBatches uint64, elapsed time.Dur return resultBig, nil } -func (p *DataPoster) feeAndTipCaps(ctx context.Context, nonce uint64, gasLimit uint64, lastFeeCap *big.Int, lastTipCap *big.Int, dataCreatedAt time.Time, backlogOfBatches uint64) (*big.Int, *big.Int, error) { +// The dataPosterBacklog argument should *not* include extraBacklog (it's added in in this function) +func (p *DataPoster) feeAndTipCaps(ctx context.Context, nonce uint64, gasLimit uint64, lastFeeCap *big.Int, lastTipCap *big.Int, dataCreatedAt time.Time, dataPosterBacklog uint64) (*big.Int, *big.Int, error) { config := p.config() + dataPosterBacklog += p.extraBacklog() latestHeader, err := p.headerReader.LastHeader(ctx) if err != nil { return nil, nil, err @@ -442,7 +451,7 @@ func (p *DataPoster) feeAndTipCaps(ctx context.Context, nonce uint64, gasLimit u } elapsed := time.Since(dataCreatedAt) - maxFeeCap, err := p.evalMaxFeeCapExpr(backlogOfBatches, elapsed) + maxFeeCap, err := p.evalMaxFeeCapExpr(dataPosterBacklog, elapsed) if err != nil { return nil, nil, err }