From 2473808f76e907bd6e7a92025bb32615b76518ca Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Sun, 17 Dec 2023 22:36:45 -0700 Subject: [PATCH 1/2] Use a moving average to determine messages per batch --- arbnode/batch_poster.go | 40 +++++++++++++++---------- util/arbmath/math.go | 8 ++--- util/arbmath/moving_average.go | 45 +++++++++++++++++++++++++++++ util/arbmath/moving_average_test.go | 35 ++++++++++++++++++++++ 4 files changed, 109 insertions(+), 19 deletions(-) create mode 100644 util/arbmath/moving_average.go create mode 100644 util/arbmath/moving_average_test.go diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 5d61bc5f14..594eec40d9 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -73,6 +73,7 @@ type BatchPoster struct { dataPoster *dataposter.DataPoster redisLock *redislock.Simple firstEphemeralError time.Time // first time a continuous error suspected to be ephemeral occurred + messagesPerBatch *arbmath.MovingAverage[uint64] // This is an atomic variable that should only be accessed atomically. // An estimate of the number of batches we want to post but haven't yet. // This doesn't include batches which we don't want to post yet due to the L1 bounds. @@ -258,19 +259,20 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e return nil, err } b := &BatchPoster{ - l1Reader: opts.L1Reader, - inbox: opts.Inbox, - streamer: opts.Streamer, - syncMonitor: opts.SyncMonitor, - config: opts.Config, - bridge: bridge, - seqInbox: seqInbox, - seqInboxABI: seqInboxABI, - seqInboxAddr: opts.DeployInfo.SequencerInbox, - gasRefunderAddr: opts.Config().gasRefunder, - bridgeAddr: opts.DeployInfo.Bridge, - daWriter: opts.DAWriter, - redisLock: redisLock, + l1Reader: opts.L1Reader, + inbox: opts.Inbox, + streamer: opts.Streamer, + syncMonitor: opts.SyncMonitor, + config: opts.Config, + bridge: bridge, + seqInbox: seqInbox, + seqInboxABI: seqInboxABI, + seqInboxAddr: opts.DeployInfo.SequencerInbox, + gasRefunderAddr: opts.Config().gasRefunder, + bridgeAddr: opts.DeployInfo.Bridge, + daWriter: opts.DAWriter, + redisLock: redisLock, + messagesPerBatch: arbmath.NewMovingAverage[uint64](20), } dataPosterConfigFetcher := func() *dataposter.DataPosterConfig { return &(opts.Config().DataPoster) @@ -1062,8 +1064,15 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) ) recentlyHitL1Bounds := time.Since(b.lastHitL1Bounds) < config.PollInterval*3 postedMessages := b.building.msgCount - batchPosition.MessageCount + b.messagesPerBatch.Update(uint64(postedMessages)) unpostedMessages := msgCount - b.building.msgCount - backlog := uint64(unpostedMessages) / uint64(postedMessages) + messagesPerBatch := b.messagesPerBatch.Average() + if messagesPerBatch == 0 { + // This should be impossible because we always post at least one message in a batch. + // That said, better safe than sorry, as we would panic if this remained at 0. + messagesPerBatch = 1 + } + backlog := uint64(unpostedMessages) / messagesPerBatch if backlog > 10 { logLevel := log.Warn if recentlyHitL1Bounds { @@ -1076,7 +1085,8 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) "recentlyHitL1Bounds", recentlyHitL1Bounds, "currentPosition", b.building.msgCount, "messageCount", msgCount, - "lastPostedMessages", postedMessages, + "messagesPerBatch", messagesPerBatch, + "postedMessages", postedMessages, "unpostedMessages", unpostedMessages, "batchBacklogEstimate", backlog, ) diff --git a/util/arbmath/math.go b/util/arbmath/math.go index 3d1df0ce64..eaac79bfad 100644 --- a/util/arbmath/math.go +++ b/util/arbmath/math.go @@ -47,14 +47,14 @@ type Float interface { ~float32 | ~float64 } -// Ordered is anything that implements comparison operators such as `<` and `>`. +// Number is anything that implements operators such as `<`, `+` and `/`. // Unfortunately, that doesn't include big ints. -type Ordered interface { +type Number interface { Integer | Float } // MinInt the minimum of two ints -func MinInt[T Ordered](value, ceiling T) T { +func MinInt[T Number](value, ceiling T) T { if value > ceiling { return ceiling } @@ -62,7 +62,7 @@ func MinInt[T Ordered](value, ceiling T) T { } // MaxInt the maximum of two ints -func MaxInt[T Ordered](value, floor T) T { +func MaxInt[T Number](value, floor T) T { if value < floor { return floor } diff --git a/util/arbmath/moving_average.go b/util/arbmath/moving_average.go new file mode 100644 index 0000000000..fe5a349f4c --- /dev/null +++ b/util/arbmath/moving_average.go @@ -0,0 +1,45 @@ +// Copyright 2023, Offchain Labs, Inc. +// For license information, see https://github.com/nitro/blob/master/LICENSE + +package arbmath + +// A simple moving average of a generic number type. +type MovingAverage[T Number] struct { + period int + buffer []T + bufferPosition int + sum T +} + +func NewMovingAverage[T Number](period int) *MovingAverage[T] { + if period <= 0 { + panic("MovingAverage period must be positive") + } + return &MovingAverage[T]{ + period: period, + buffer: make([]T, 0, period), + } +} + +func (a *MovingAverage[T]) Update(value T) { + if a.period <= 0 { + return + } + if len(a.buffer) < a.period { + a.buffer = append(a.buffer, value) + a.sum += value + } else { + a.sum += value + a.sum -= a.buffer[a.bufferPosition] + a.buffer[a.bufferPosition] = value + a.bufferPosition = (a.bufferPosition + 1) % a.period + } +} + +// Average returns the current moving average, or zero if no values have been added. +func (a *MovingAverage[T]) Average() T { + if len(a.buffer) == 0 { + return 0 + } + return a.sum / T(len(a.buffer)) +} diff --git a/util/arbmath/moving_average_test.go b/util/arbmath/moving_average_test.go new file mode 100644 index 0000000000..249080717c --- /dev/null +++ b/util/arbmath/moving_average_test.go @@ -0,0 +1,35 @@ +// Copyright 2023, Offchain Labs, Inc. +// For license information, see https://github.com/nitro/blob/master/LICENSE + +package arbmath + +import "testing" + +func TestMovingAverage(t *testing.T) { + ma := NewMovingAverage[int](5) + if ma.Average() != 0 { + t.Errorf("moving average should be 0 at start, got %v", ma.Average()) + } + ma.Update(2) + if ma.Average() != 2 { + t.Errorf("moving average should be 2, got %v", ma.Average()) + } + ma.Update(4) + if ma.Average() != 3 { + t.Errorf("moving average should be 3, got %v", ma.Average()) + } + + for i := 0; i < 5; i++ { + ma.Update(10) + } + if ma.Average() != 10 { + t.Errorf("moving average should be 10, got %v", ma.Average()) + } + + for i := 0; i < 5; i++ { + ma.Update(0) + } + if ma.Average() != 0 { + t.Errorf("moving average should be 0, got %v", ma.Average()) + } +} From 06269e0ec3d2efad07dc894a4e929baa0172ecc4 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Mon, 18 Dec 2023 09:31:37 -0700 Subject: [PATCH 2/2] Address PR review comments --- arbnode/batch_poster.go | 37 ++++++++++++++++++----------- util/arbmath/moving_average.go | 17 ++++++------- util/arbmath/moving_average_test.go | 24 ++++++++++++++----- 3 files changed, 50 insertions(+), 28 deletions(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 594eec40d9..a54336fd5a 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -259,20 +259,23 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e return nil, err } b := &BatchPoster{ - l1Reader: opts.L1Reader, - inbox: opts.Inbox, - streamer: opts.Streamer, - syncMonitor: opts.SyncMonitor, - config: opts.Config, - bridge: bridge, - seqInbox: seqInbox, - seqInboxABI: seqInboxABI, - seqInboxAddr: opts.DeployInfo.SequencerInbox, - gasRefunderAddr: opts.Config().gasRefunder, - bridgeAddr: opts.DeployInfo.Bridge, - daWriter: opts.DAWriter, - redisLock: redisLock, - messagesPerBatch: arbmath.NewMovingAverage[uint64](20), + l1Reader: opts.L1Reader, + inbox: opts.Inbox, + streamer: opts.Streamer, + syncMonitor: opts.SyncMonitor, + config: opts.Config, + bridge: bridge, + seqInbox: seqInbox, + seqInboxABI: seqInboxABI, + seqInboxAddr: opts.DeployInfo.SequencerInbox, + gasRefunderAddr: opts.Config().gasRefunder, + bridgeAddr: opts.DeployInfo.Bridge, + daWriter: opts.DAWriter, + redisLock: redisLock, + } + b.messagesPerBatch, err = arbmath.NewMovingAverage[uint64](20) + if err != nil { + return nil, err } dataPosterConfigFetcher := func() *dataposter.DataPosterConfig { return &(opts.Config().DataPoster) @@ -1070,6 +1073,12 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) if messagesPerBatch == 0 { // This should be impossible because we always post at least one message in a batch. // That said, better safe than sorry, as we would panic if this remained at 0. + log.Warn( + "messagesPerBatch is somehow zero", + "postedMessages", postedMessages, + "buildingFrom", batchPosition.MessageCount, + "buildingTo", b.building.msgCount, + ) messagesPerBatch = 1 } backlog := uint64(unpostedMessages) / messagesPerBatch diff --git a/util/arbmath/moving_average.go b/util/arbmath/moving_average.go index fe5a349f4c..fcd5f6e307 100644 --- a/util/arbmath/moving_average.go +++ b/util/arbmath/moving_average.go @@ -3,36 +3,37 @@ package arbmath +import "fmt" + // A simple moving average of a generic number type. type MovingAverage[T Number] struct { - period int buffer []T bufferPosition int sum T } -func NewMovingAverage[T Number](period int) *MovingAverage[T] { +func NewMovingAverage[T Number](period int) (*MovingAverage[T], error) { if period <= 0 { - panic("MovingAverage period must be positive") + return nil, fmt.Errorf("MovingAverage period specified as %v but it must be positive", period) } return &MovingAverage[T]{ - period: period, buffer: make([]T, 0, period), - } + }, nil } func (a *MovingAverage[T]) Update(value T) { - if a.period <= 0 { + period := cap(a.buffer) + if period == 0 { return } - if len(a.buffer) < a.period { + if len(a.buffer) < period { a.buffer = append(a.buffer, value) a.sum += value } else { a.sum += value a.sum -= a.buffer[a.bufferPosition] a.buffer[a.bufferPosition] = value - a.bufferPosition = (a.bufferPosition + 1) % a.period + a.bufferPosition = (a.bufferPosition + 1) % period } } diff --git a/util/arbmath/moving_average_test.go b/util/arbmath/moving_average_test.go index 249080717c..9013d3f487 100644 --- a/util/arbmath/moving_average_test.go +++ b/util/arbmath/moving_average_test.go @@ -6,30 +6,42 @@ package arbmath import "testing" func TestMovingAverage(t *testing.T) { - ma := NewMovingAverage[int](5) + _, err := NewMovingAverage[int](0) + if err == nil { + t.Error("Expected error when creating moving average of period 0") + } + _, err = NewMovingAverage[int](-1) + if err == nil { + t.Error("Expected error when creating moving average of period -1") + } + + ma, err := NewMovingAverage[int](5) + if err != nil { + t.Fatalf("Error creating moving average of period 5: %v", err) + } if ma.Average() != 0 { - t.Errorf("moving average should be 0 at start, got %v", ma.Average()) + t.Errorf("Average() = %v, want 0", ma.Average()) } ma.Update(2) if ma.Average() != 2 { - t.Errorf("moving average should be 2, got %v", ma.Average()) + t.Errorf("Average() = %v, want 2", ma.Average()) } ma.Update(4) if ma.Average() != 3 { - t.Errorf("moving average should be 3, got %v", ma.Average()) + t.Errorf("Average() = %v, want 3", ma.Average()) } for i := 0; i < 5; i++ { ma.Update(10) } if ma.Average() != 10 { - t.Errorf("moving average should be 10, got %v", ma.Average()) + t.Errorf("Average() = %v, want 10", ma.Average()) } for i := 0; i < 5; i++ { ma.Update(0) } if ma.Average() != 0 { - t.Errorf("moving average should be 0, got %v", ma.Average()) + t.Errorf("Average() = %v, want 0", ma.Average()) } }