Skip to content

Commit

Permalink
Merge pull request #2032 from OffchainLabs/batch-size-moving-average
Browse files Browse the repository at this point in the history
Use a moving average to determine messages per batch
  • Loading branch information
PlasmaPower authored Dec 18, 2023
2 parents 9306bea + 06269e0 commit bf56ebb
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 6 deletions.
23 changes: 21 additions & 2 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type BatchPoster struct {
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
firstEphemeralError time.Time // first time a continuous error suspected to be ephemeral occurred
messagesPerBatch *arbmath.MovingAverage[uint64]
// This is an atomic variable that should only be accessed atomically.
// An estimate of the number of batches we want to post but haven't yet.
// This doesn't include batches which we don't want to post yet due to the L1 bounds.
Expand Down Expand Up @@ -272,6 +273,10 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e
daWriter: opts.DAWriter,
redisLock: redisLock,
}
b.messagesPerBatch, err = arbmath.NewMovingAverage[uint64](20)
if err != nil {
return nil, err
}
dataPosterConfigFetcher := func() *dataposter.DataPosterConfig {
return &(opts.Config().DataPoster)
}
Expand Down Expand Up @@ -1062,8 +1067,21 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
)
recentlyHitL1Bounds := time.Since(b.lastHitL1Bounds) < config.PollInterval*3
postedMessages := b.building.msgCount - batchPosition.MessageCount
b.messagesPerBatch.Update(uint64(postedMessages))
unpostedMessages := msgCount - b.building.msgCount
backlog := uint64(unpostedMessages) / uint64(postedMessages)
messagesPerBatch := b.messagesPerBatch.Average()
if messagesPerBatch == 0 {
// This should be impossible because we always post at least one message in a batch.
// That said, better safe than sorry, as we would panic if this remained at 0.
log.Warn(
"messagesPerBatch is somehow zero",
"postedMessages", postedMessages,
"buildingFrom", batchPosition.MessageCount,
"buildingTo", b.building.msgCount,
)
messagesPerBatch = 1
}
backlog := uint64(unpostedMessages) / messagesPerBatch
if backlog > 10 {
logLevel := log.Warn
if recentlyHitL1Bounds {
Expand All @@ -1076,7 +1094,8 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
"recentlyHitL1Bounds", recentlyHitL1Bounds,
"currentPosition", b.building.msgCount,
"messageCount", msgCount,
"lastPostedMessages", postedMessages,
"messagesPerBatch", messagesPerBatch,
"postedMessages", postedMessages,
"unpostedMessages", unpostedMessages,
"batchBacklogEstimate", backlog,
)
Expand Down
8 changes: 4 additions & 4 deletions util/arbmath/math.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,22 @@ type Float interface {
~float32 | ~float64
}

// Ordered is anything that implements comparison operators such as `<` and `>`.
// Number is anything that implements operators such as `<`, `+` and `/`.
// Unfortunately, that doesn't include big ints.
type Ordered interface {
type Number interface {
Integer | Float
}

// MinInt the minimum of two ints
func MinInt[T Ordered](value, ceiling T) T {
func MinInt[T Number](value, ceiling T) T {
if value > ceiling {
return ceiling
}
return value
}

// MaxInt the maximum of two ints
func MaxInt[T Ordered](value, floor T) T {
func MaxInt[T Number](value, floor T) T {
if value < floor {
return floor
}
Expand Down
46 changes: 46 additions & 0 deletions util/arbmath/moving_average.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2023, Offchain Labs, Inc.
// For license information, see https://github.com/nitro/blob/master/LICENSE

package arbmath

import "fmt"

// A simple moving average of a generic number type.
type MovingAverage[T Number] struct {
buffer []T
bufferPosition int
sum T
}

func NewMovingAverage[T Number](period int) (*MovingAverage[T], error) {
if period <= 0 {
return nil, fmt.Errorf("MovingAverage period specified as %v but it must be positive", period)
}
return &MovingAverage[T]{
buffer: make([]T, 0, period),
}, nil
}

func (a *MovingAverage[T]) Update(value T) {
period := cap(a.buffer)
if period == 0 {
return
}
if len(a.buffer) < period {
a.buffer = append(a.buffer, value)
a.sum += value
} else {
a.sum += value
a.sum -= a.buffer[a.bufferPosition]
a.buffer[a.bufferPosition] = value
a.bufferPosition = (a.bufferPosition + 1) % period
}
}

// Average returns the current moving average, or zero if no values have been added.
func (a *MovingAverage[T]) Average() T {
if len(a.buffer) == 0 {
return 0
}
return a.sum / T(len(a.buffer))
}
47 changes: 47 additions & 0 deletions util/arbmath/moving_average_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2023, Offchain Labs, Inc.
// For license information, see https://github.com/nitro/blob/master/LICENSE

package arbmath

import "testing"

func TestMovingAverage(t *testing.T) {
_, err := NewMovingAverage[int](0)
if err == nil {
t.Error("Expected error when creating moving average of period 0")
}
_, err = NewMovingAverage[int](-1)
if err == nil {
t.Error("Expected error when creating moving average of period -1")
}

ma, err := NewMovingAverage[int](5)
if err != nil {
t.Fatalf("Error creating moving average of period 5: %v", err)
}
if ma.Average() != 0 {
t.Errorf("Average() = %v, want 0", ma.Average())
}
ma.Update(2)
if ma.Average() != 2 {
t.Errorf("Average() = %v, want 2", ma.Average())
}
ma.Update(4)
if ma.Average() != 3 {
t.Errorf("Average() = %v, want 3", ma.Average())
}

for i := 0; i < 5; i++ {
ma.Update(10)
}
if ma.Average() != 10 {
t.Errorf("Average() = %v, want 10", ma.Average())
}

for i := 0; i < 5; i++ {
ma.Update(0)
}
if ma.Average() != 0 {
t.Errorf("Average() = %v, want 0", ma.Average())
}
}

0 comments on commit bf56ebb

Please sign in to comment.