diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 5d61bc5f14..a54336fd5a 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -73,6 +73,7 @@ type BatchPoster struct { dataPoster *dataposter.DataPoster redisLock *redislock.Simple firstEphemeralError time.Time // first time a continuous error suspected to be ephemeral occurred + messagesPerBatch *arbmath.MovingAverage[uint64] // This is an atomic variable that should only be accessed atomically. // An estimate of the number of batches we want to post but haven't yet. // This doesn't include batches which we don't want to post yet due to the L1 bounds. @@ -272,6 +273,10 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e daWriter: opts.DAWriter, redisLock: redisLock, } + b.messagesPerBatch, err = arbmath.NewMovingAverage[uint64](20) + if err != nil { + return nil, err + } dataPosterConfigFetcher := func() *dataposter.DataPosterConfig { return &(opts.Config().DataPoster) } @@ -1062,8 +1067,21 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) ) recentlyHitL1Bounds := time.Since(b.lastHitL1Bounds) < config.PollInterval*3 postedMessages := b.building.msgCount - batchPosition.MessageCount + b.messagesPerBatch.Update(uint64(postedMessages)) unpostedMessages := msgCount - b.building.msgCount - backlog := uint64(unpostedMessages) / uint64(postedMessages) + messagesPerBatch := b.messagesPerBatch.Average() + if messagesPerBatch == 0 { + // This should be impossible because we always post at least one message in a batch. + // That said, better safe than sorry, as we would panic if this remained at 0. + log.Warn( + "messagesPerBatch is somehow zero", + "postedMessages", postedMessages, + "buildingFrom", batchPosition.MessageCount, + "buildingTo", b.building.msgCount, + ) + messagesPerBatch = 1 + } + backlog := uint64(unpostedMessages) / messagesPerBatch if backlog > 10 { logLevel := log.Warn if recentlyHitL1Bounds { @@ -1076,7 +1094,8 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) "recentlyHitL1Bounds", recentlyHitL1Bounds, "currentPosition", b.building.msgCount, "messageCount", msgCount, - "lastPostedMessages", postedMessages, + "messagesPerBatch", messagesPerBatch, + "postedMessages", postedMessages, "unpostedMessages", unpostedMessages, "batchBacklogEstimate", backlog, ) diff --git a/util/arbmath/math.go b/util/arbmath/math.go index 3d1df0ce64..eaac79bfad 100644 --- a/util/arbmath/math.go +++ b/util/arbmath/math.go @@ -47,14 +47,14 @@ type Float interface { ~float32 | ~float64 } -// Ordered is anything that implements comparison operators such as `<` and `>`. +// Number is anything that implements operators such as `<`, `+` and `/`. // Unfortunately, that doesn't include big ints. -type Ordered interface { +type Number interface { Integer | Float } // MinInt the minimum of two ints -func MinInt[T Ordered](value, ceiling T) T { +func MinInt[T Number](value, ceiling T) T { if value > ceiling { return ceiling } @@ -62,7 +62,7 @@ func MinInt[T Ordered](value, ceiling T) T { } // MaxInt the maximum of two ints -func MaxInt[T Ordered](value, floor T) T { +func MaxInt[T Number](value, floor T) T { if value < floor { return floor } diff --git a/util/arbmath/moving_average.go b/util/arbmath/moving_average.go new file mode 100644 index 0000000000..fcd5f6e307 --- /dev/null +++ b/util/arbmath/moving_average.go @@ -0,0 +1,46 @@ +// Copyright 2023, Offchain Labs, Inc. +// For license information, see https://github.com/nitro/blob/master/LICENSE + +package arbmath + +import "fmt" + +// A simple moving average of a generic number type. +type MovingAverage[T Number] struct { + buffer []T + bufferPosition int + sum T +} + +func NewMovingAverage[T Number](period int) (*MovingAverage[T], error) { + if period <= 0 { + return nil, fmt.Errorf("MovingAverage period specified as %v but it must be positive", period) + } + return &MovingAverage[T]{ + buffer: make([]T, 0, period), + }, nil +} + +func (a *MovingAverage[T]) Update(value T) { + period := cap(a.buffer) + if period == 0 { + return + } + if len(a.buffer) < period { + a.buffer = append(a.buffer, value) + a.sum += value + } else { + a.sum += value + a.sum -= a.buffer[a.bufferPosition] + a.buffer[a.bufferPosition] = value + a.bufferPosition = (a.bufferPosition + 1) % period + } +} + +// Average returns the current moving average, or zero if no values have been added. +func (a *MovingAverage[T]) Average() T { + if len(a.buffer) == 0 { + return 0 + } + return a.sum / T(len(a.buffer)) +} diff --git a/util/arbmath/moving_average_test.go b/util/arbmath/moving_average_test.go new file mode 100644 index 0000000000..9013d3f487 --- /dev/null +++ b/util/arbmath/moving_average_test.go @@ -0,0 +1,47 @@ +// Copyright 2023, Offchain Labs, Inc. +// For license information, see https://github.com/nitro/blob/master/LICENSE + +package arbmath + +import "testing" + +func TestMovingAverage(t *testing.T) { + _, err := NewMovingAverage[int](0) + if err == nil { + t.Error("Expected error when creating moving average of period 0") + } + _, err = NewMovingAverage[int](-1) + if err == nil { + t.Error("Expected error when creating moving average of period -1") + } + + ma, err := NewMovingAverage[int](5) + if err != nil { + t.Fatalf("Error creating moving average of period 5: %v", err) + } + if ma.Average() != 0 { + t.Errorf("Average() = %v, want 0", ma.Average()) + } + ma.Update(2) + if ma.Average() != 2 { + t.Errorf("Average() = %v, want 2", ma.Average()) + } + ma.Update(4) + if ma.Average() != 3 { + t.Errorf("Average() = %v, want 3", ma.Average()) + } + + for i := 0; i < 5; i++ { + ma.Update(10) + } + if ma.Average() != 10 { + t.Errorf("Average() = %v, want 10", ma.Average()) + } + + for i := 0; i < 5; i++ { + ma.Update(0) + } + if ma.Average() != 0 { + t.Errorf("Average() = %v, want 0", ma.Average()) + } +}