Skip to content

Commit

Permalink
Merge pull request #2026 from OffchainLabs/recognize-batch-poster-bac…
Browse files Browse the repository at this point in the history
…klog

Recognize batch poster backlog from the data poster
  • Loading branch information
PlasmaPower authored Dec 16, 2023
2 parents 9b19f6c + 25b7099 commit 3221e11
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
19 changes: 13 additions & 6 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type BatchPoster struct {
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
firstEphemeralError time.Time // first time a continuous error suspected to be ephemeral occurred
// This is an atomic variable that should only be accessed atomically.
// An estimate of the number of batches we want to post but haven't yet.
// This doesn't include batches which we don't want to post yet due to the L1 bounds.
backlog uint64
Expand Down Expand Up @@ -283,6 +284,7 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e
RedisLock: redisLock,
Config: dataPosterConfigFetcher,
MetadataRetriever: b.getBatchPosterPosition,
ExtraBacklog: b.GetBacklogEstimate,
RedisKey: "data-poster.queue",
})
if err != nil {
Expand Down Expand Up @@ -846,7 +848,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)

if b.building == nil || b.building.startMsgCount != batchPosition.MessageCount {
b.building = &buildingBatch{
segments: newBatchSegments(batchPosition.DelayedMessageCount, b.config(), b.backlog),
segments: newBatchSegments(batchPosition.DelayedMessageCount, b.config(), b.GetBacklogEstimate()),
msgCount: batchPosition.MessageCount,
startMsgCount: batchPosition.MessageCount,
}
Expand Down Expand Up @@ -1061,12 +1063,12 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
recentlyHitL1Bounds := time.Since(b.lastHitL1Bounds) < config.PollInterval*3
postedMessages := b.building.msgCount - batchPosition.MessageCount
unpostedMessages := msgCount - b.building.msgCount
b.backlog = uint64(unpostedMessages) / uint64(postedMessages)
if b.backlog > 10 {
backlog := uint64(unpostedMessages) / uint64(postedMessages)
if backlog > 10 {
logLevel := log.Warn
if recentlyHitL1Bounds {
logLevel = log.Info
} else if b.backlog > 30 {
} else if backlog > 30 {
logLevel = log.Error
}
logLevel(
Expand All @@ -1076,14 +1078,15 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
"messageCount", msgCount,
"lastPostedMessages", postedMessages,
"unpostedMessages", unpostedMessages,
"batchBacklogEstimate", b.backlog,
"batchBacklogEstimate", backlog,
)
}
if recentlyHitL1Bounds {
// This backlog isn't "real" in that we don't want to post any more messages.
// Setting the backlog to 0 here ensures that we don't lower compression as a result.
b.backlog = 0
backlog = 0
}
atomic.StoreUint64(&b.backlog, backlog)
b.building = nil

// If we aren't queueing up transactions, wait for the receipt before moving on to the next batch.
Expand All @@ -1098,6 +1101,10 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
return true, nil
}

func (b *BatchPoster) GetBacklogEstimate() uint64 {
return atomic.LoadUint64(&b.backlog)
}

func (b *BatchPoster) Start(ctxIn context.Context) {
b.dataPoster.Start(ctxIn)
b.redisLock.Start(ctxIn)
Expand Down
13 changes: 11 additions & 2 deletions arbnode/dataposter/data_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type DataPoster struct {
usingNoOpStorage bool
replacementTimes []time.Duration
metadataRetriever func(ctx context.Context, blockNum *big.Int) ([]byte, error)
extraBacklog func() uint64

// These fields are protected by the mutex.
// TODO: factor out these fields into separate structure, since now one
Expand Down Expand Up @@ -116,6 +117,7 @@ type DataPosterOpts struct {
RedisLock AttemptLocker
Config ConfigFetcher
MetadataRetriever func(ctx context.Context, blockNum *big.Int) ([]byte, error)
ExtraBacklog func() uint64
RedisKey string // Redis storage key
}

Expand Down Expand Up @@ -176,6 +178,10 @@ func NewDataPoster(ctx context.Context, opts *DataPosterOpts) (*DataPoster, erro
redisLock: opts.RedisLock,
errorCount: make(map[uint64]int),
maxFeeCapExpression: expression,
extraBacklog: opts.ExtraBacklog,
}
if dp.extraBacklog == nil {
dp.extraBacklog = func() uint64 { return 0 }
}
if cfg.ExternalSigner.URL != "" {
signer, sender, err := externalSigner(ctx, &cfg.ExternalSigner)
Expand Down Expand Up @@ -371,6 +377,7 @@ func (p *DataPoster) GetNextNonceAndMeta(ctx context.Context) (uint64, []byte, e
const minRbfIncrease = arbmath.OneInBips * 11 / 10

// evalMaxFeeCapExpr uses MaxFeeCapFormula from config to calculate the expression's result by plugging in appropriate parameter values
// backlogOfBatches should already include extraBacklog
func (p *DataPoster) evalMaxFeeCapExpr(backlogOfBatches uint64, elapsed time.Duration) (*big.Int, error) {
config := p.config()
parameters := map[string]any{
Expand Down Expand Up @@ -403,8 +410,10 @@ func (p *DataPoster) evalMaxFeeCapExpr(backlogOfBatches uint64, elapsed time.Dur
return resultBig, nil
}

func (p *DataPoster) feeAndTipCaps(ctx context.Context, nonce uint64, gasLimit uint64, lastFeeCap *big.Int, lastTipCap *big.Int, dataCreatedAt time.Time, backlogOfBatches uint64) (*big.Int, *big.Int, error) {
// The dataPosterBacklog argument should *not* include extraBacklog (it's added in in this function)
func (p *DataPoster) feeAndTipCaps(ctx context.Context, nonce uint64, gasLimit uint64, lastFeeCap *big.Int, lastTipCap *big.Int, dataCreatedAt time.Time, dataPosterBacklog uint64) (*big.Int, *big.Int, error) {
config := p.config()
dataPosterBacklog += p.extraBacklog()
latestHeader, err := p.headerReader.LastHeader(ctx)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -442,7 +451,7 @@ func (p *DataPoster) feeAndTipCaps(ctx context.Context, nonce uint64, gasLimit u
}

elapsed := time.Since(dataCreatedAt)
maxFeeCap, err := p.evalMaxFeeCapExpr(backlogOfBatches, elapsed)
maxFeeCap, err := p.evalMaxFeeCapExpr(dataPosterBacklog, elapsed)
if err != nil {
return nil, nil, err
}
Expand Down

0 comments on commit 3221e11

Please sign in to comment.