Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an optional limit to the maximum size of the relay catchup buffer #1873

Merged
merged 2 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion broadcaster/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type ConfirmedSequenceNumberMessage struct {
}

func NewBroadcaster(config wsbroadcastserver.BroadcasterConfigFetcher, chainId uint64, feedErrChan chan error, dataSigner signature.DataSignerFunc) *Broadcaster {
catchupBuffer := NewSequenceNumberCatchupBuffer(func() bool { return config().LimitCatchup })
catchupBuffer := NewSequenceNumberCatchupBuffer(func() bool { return config().LimitCatchup }, func() int { return config().MaxCatchup })
return &Broadcaster{
server: wsbroadcastserver.NewWSBroadcastServer(config, catchupBuffer, chainId, feedErrChan),
catchupBuffer: catchupBuffer,
Expand Down
29 changes: 23 additions & 6 deletions broadcaster/sequencenumbercatchupbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ type SequenceNumberCatchupBuffer struct {
messages []*BroadcastFeedMessage
messageCount int32
limitCatchup func() bool
maxCatchup func() int
lambchr marked this conversation as resolved.
Show resolved Hide resolved
}

func NewSequenceNumberCatchupBuffer(limitCatchup func() bool) *SequenceNumberCatchupBuffer {
func NewSequenceNumberCatchupBuffer(limitCatchup func() bool, maxCatchup func() int) *SequenceNumberCatchupBuffer {
return &SequenceNumberCatchupBuffer{
limitCatchup: limitCatchup,
maxCatchup: maxCatchup,
}
}

Expand Down Expand Up @@ -98,6 +100,15 @@ func (b *SequenceNumberCatchupBuffer) OnRegisterClient(clientConnection *wsbroad
return nil, bmCount, time.Since(start)
}

// Takes as input an index into the messages array, not a message index
func (b *SequenceNumberCatchupBuffer) pruneBufferToIndex(idx int) {
b.messages = b.messages[idx:]
if len(b.messages) > 10 && cap(b.messages) > len(b.messages)*10 {
// Too much spare capacity, copy to fresh slice to reset memory usage
b.messages = append([]*BroadcastFeedMessage(nil), b.messages[:len(b.messages)]...)
}
}

func (b *SequenceNumberCatchupBuffer) deleteConfirmed(confirmedSequenceNumber arbutil.MessageIndex) {
if len(b.messages) == 0 {
return
Expand Down Expand Up @@ -126,11 +137,7 @@ func (b *SequenceNumberCatchupBuffer) deleteConfirmed(confirmedSequenceNumber ar
return
}

b.messages = b.messages[confirmedIndex+1:]
if len(b.messages) > 10 && cap(b.messages) > len(b.messages)*10 {
// Too much spare capacity, copy to fresh slice to reset memory usage
b.messages = append([]*BroadcastFeedMessage(nil), b.messages[:len(b.messages)]...)
}
b.pruneBufferToIndex(int(confirmedIndex) + 1)
}

func (b *SequenceNumberCatchupBuffer) OnDoBroadcast(bmi interface{}) error {
Expand All @@ -147,6 +154,12 @@ func (b *SequenceNumberCatchupBuffer) OnDoBroadcast(bmi interface{}) error {
confirmedSequenceNumberGauge.Update(int64(confirmMsg.SequenceNumber))
}

maxCatchup := b.maxCatchup()
if maxCatchup == 0 {
b.messages = nil
return nil
}

for _, newMsg := range broadcastMessage.Messages {
if len(b.messages) == 0 {
// Add to empty list
Expand All @@ -167,6 +180,10 @@ func (b *SequenceNumberCatchupBuffer) OnDoBroadcast(bmi interface{}) error {
}
}

if maxCatchup >= 0 && len(b.messages) > maxCatchup {
b.pruneBufferToIndex(len(b.messages) - maxCatchup)
}

return nil

}
Expand Down
46 changes: 46 additions & 0 deletions broadcaster/sequencenumbercatchupbuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import (

"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/util/arbmath"
)

func TestGetEmptyCacheMessages(t *testing.T) {
buffer := SequenceNumberCatchupBuffer{
messages: nil,
messageCount: 0,
limitCatchup: func() bool { return false },
maxCatchup: func() int { return -1 },
}

// Get everything
Expand Down Expand Up @@ -60,6 +62,7 @@ func TestGetCacheMessages(t *testing.T) {
messages: createDummyBroadcastMessages(indexes),
messageCount: int32(len(indexes)),
limitCatchup: func() bool { return false },
maxCatchup: func() int { return -1 },
}

// Get everything
Expand Down Expand Up @@ -110,6 +113,7 @@ func TestDeleteConfirmedNil(t *testing.T) {
messages: nil,
messageCount: 0,
limitCatchup: func() bool { return false },
maxCatchup: func() int { return -1 },
}

buffer.deleteConfirmed(0)
Expand All @@ -124,6 +128,7 @@ func TestDeleteConfirmInvalidOrder(t *testing.T) {
messages: createDummyBroadcastMessages(indexes),
messageCount: int32(len(indexes)),
limitCatchup: func() bool { return false },
maxCatchup: func() int { return -1 },
}

// Confirm before cache
Expand All @@ -139,6 +144,7 @@ func TestDeleteConfirmed(t *testing.T) {
messages: createDummyBroadcastMessages(indexes),
messageCount: int32(len(indexes)),
limitCatchup: func() bool { return false },
maxCatchup: func() int { return -1 },
}

// Confirm older than cache
Expand All @@ -154,6 +160,7 @@ func TestDeleteFreeMem(t *testing.T) {
messages: createDummyBroadcastMessagesImpl(indexes, len(indexes)*10+1),
messageCount: int32(len(indexes)),
limitCatchup: func() bool { return false },
maxCatchup: func() int { return -1 },
}

// Confirm older than cache
Expand All @@ -169,6 +176,7 @@ func TestBroadcastBadMessage(t *testing.T) {
messages: nil,
messageCount: 0,
limitCatchup: func() bool { return false },
maxCatchup: func() int { return -1 },
}

var foo int
Expand All @@ -187,6 +195,7 @@ func TestBroadcastPastSeqNum(t *testing.T) {
messages: createDummyBroadcastMessagesImpl(indexes, len(indexes)*10+1),
messageCount: int32(len(indexes)),
limitCatchup: func() bool { return false },
maxCatchup: func() int { return -1 },
}

bm := BroadcastMessage{
Expand All @@ -208,6 +217,8 @@ func TestBroadcastFutureSeqNum(t *testing.T) {
buffer := SequenceNumberCatchupBuffer{
messages: createDummyBroadcastMessagesImpl(indexes, len(indexes)*10+1),
messageCount: int32(len(indexes)),
limitCatchup: func() bool { return false },
maxCatchup: func() int { return -1 },
}

bm := BroadcastMessage{
Expand All @@ -223,3 +234,38 @@ func TestBroadcastFutureSeqNum(t *testing.T) {
}

}

func TestMaxCatchupBufferSize(t *testing.T) {
limit := 5
buffer := SequenceNumberCatchupBuffer{
messages: nil,
messageCount: 0,
limitCatchup: func() bool { return false },
maxCatchup: func() int { return limit },
}

firstMessage := 10
for i := firstMessage; i <= 20; i += 2 {
bm := BroadcastMessage{
Messages: []*BroadcastFeedMessage{
{
SequenceNumber: arbutil.MessageIndex(i),
},
{
SequenceNumber: arbutil.MessageIndex(i + 1),
},
},
}
err := buffer.OnDoBroadcast(bm)
Require(t, err)
haveMessages := buffer.getCacheMessages(0)
expectedCount := arbmath.MinInt(i+len(bm.Messages)-firstMessage, limit)
if len(haveMessages.Messages) != expectedCount {
t.Errorf("after broadcasting messages %v and %v, expected to have %v messages but got %v", i, i+1, expectedCount, len(haveMessages.Messages))
}
expectedFirstMessage := arbutil.MessageIndex(arbmath.MaxInt(firstMessage, i+len(bm.Messages)-limit))
if haveMessages.Messages[0].SequenceNumber != expectedFirstMessage {
t.Errorf("after broadcasting messages %v and %v, expected the first message to be %v but got %v", i, i+1, expectedFirstMessage, haveMessages.Messages[0].SequenceNumber)
}
}
}
4 changes: 4 additions & 0 deletions wsbroadcastserver/wsbroadcastserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type BroadcasterConfig struct {
EnableCompression bool `koanf:"enable-compression" reload:"hot"` // if reloaded to false will cause disconnection of clients with enabled compression on next broadcast
RequireCompression bool `koanf:"require-compression" reload:"hot"` // if reloaded to true will cause disconnection of clients with disabled compression on next broadcast
LimitCatchup bool `koanf:"limit-catchup" reload:"hot"`
MaxCatchup int `koanf:"max-catchup" reload:"hot"`
ConnectionLimits ConnectionLimiterConfig `koanf:"connection-limits" reload:"hot"`
ClientDelay time.Duration `koanf:"client-delay" reload:"hot"`
}
Expand Down Expand Up @@ -93,6 +94,7 @@ func BroadcasterConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Bool(prefix+".enable-compression", DefaultBroadcasterConfig.EnableCompression, "enable per message deflate compression support")
f.Bool(prefix+".require-compression", DefaultBroadcasterConfig.RequireCompression, "require clients to use compression")
f.Bool(prefix+".limit-catchup", DefaultBroadcasterConfig.LimitCatchup, "only supply catchup buffer if requested sequence number is reasonable")
f.Int(prefix+".max-catchup", DefaultBroadcasterConfig.MaxCatchup, "the maximum size of the catchup buffer (-1 means unlimited)")
ConnectionLimiterConfigAddOptions(prefix+".connection-limits", f)
f.Duration(prefix+".client-delay", DefaultBroadcasterConfig.ClientDelay, "delay the first messages sent to each client by this amount")
}
Expand All @@ -117,6 +119,7 @@ var DefaultBroadcasterConfig = BroadcasterConfig{
EnableCompression: true,
RequireCompression: false,
LimitCatchup: false,
MaxCatchup: -1,
ConnectionLimits: DefaultConnectionLimiterConfig,
ClientDelay: 0,
}
Expand All @@ -141,6 +144,7 @@ var DefaultTestBroadcasterConfig = BroadcasterConfig{
EnableCompression: true,
RequireCompression: false,
LimitCatchup: false,
MaxCatchup: -1,
ConnectionLimits: DefaultConnectionLimiterConfig,
ClientDelay: 0,
}
Expand Down