Skip to content

Commit

Permalink
Merge branch 'master' into flag-for-clearing-leveldb
Browse files Browse the repository at this point in the history
  • Loading branch information
anodar authored Sep 20, 2023
2 parents 9fd0ff5 + b359dbc commit 11a2a3c
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 7 deletions.
2 changes: 1 addition & 1 deletion broadcaster/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type ConfirmedSequenceNumberMessage struct {
}

func NewBroadcaster(config wsbroadcastserver.BroadcasterConfigFetcher, chainId uint64, feedErrChan chan error, dataSigner signature.DataSignerFunc) *Broadcaster {
catchupBuffer := NewSequenceNumberCatchupBuffer(func() bool { return config().LimitCatchup })
catchupBuffer := NewSequenceNumberCatchupBuffer(func() bool { return config().LimitCatchup }, func() int { return config().MaxCatchup })
return &Broadcaster{
server: wsbroadcastserver.NewWSBroadcastServer(config, catchupBuffer, chainId, feedErrChan),
catchupBuffer: catchupBuffer,
Expand Down
29 changes: 23 additions & 6 deletions broadcaster/sequencenumbercatchupbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ type SequenceNumberCatchupBuffer struct {
messages []*BroadcastFeedMessage
messageCount int32
limitCatchup func() bool
maxCatchup func() int
}

func NewSequenceNumberCatchupBuffer(limitCatchup func() bool) *SequenceNumberCatchupBuffer {
func NewSequenceNumberCatchupBuffer(limitCatchup func() bool, maxCatchup func() int) *SequenceNumberCatchupBuffer {
return &SequenceNumberCatchupBuffer{
limitCatchup: limitCatchup,
maxCatchup: maxCatchup,
}
}

Expand Down Expand Up @@ -98,6 +100,15 @@ func (b *SequenceNumberCatchupBuffer) OnRegisterClient(clientConnection *wsbroad
return nil, bmCount, time.Since(start)
}

// Takes as input an index into the messages array, not a message index
func (b *SequenceNumberCatchupBuffer) pruneBufferToIndex(idx int) {
b.messages = b.messages[idx:]
if len(b.messages) > 10 && cap(b.messages) > len(b.messages)*10 {
// Too much spare capacity, copy to fresh slice to reset memory usage
b.messages = append([]*BroadcastFeedMessage(nil), b.messages[:len(b.messages)]...)
}
}

func (b *SequenceNumberCatchupBuffer) deleteConfirmed(confirmedSequenceNumber arbutil.MessageIndex) {
if len(b.messages) == 0 {
return
Expand Down Expand Up @@ -126,11 +137,7 @@ func (b *SequenceNumberCatchupBuffer) deleteConfirmed(confirmedSequenceNumber ar
return
}

b.messages = b.messages[confirmedIndex+1:]
if len(b.messages) > 10 && cap(b.messages) > len(b.messages)*10 {
// Too much spare capacity, copy to fresh slice to reset memory usage
b.messages = append([]*BroadcastFeedMessage(nil), b.messages[:len(b.messages)]...)
}
b.pruneBufferToIndex(int(confirmedIndex) + 1)
}

func (b *SequenceNumberCatchupBuffer) OnDoBroadcast(bmi interface{}) error {
Expand All @@ -147,6 +154,12 @@ func (b *SequenceNumberCatchupBuffer) OnDoBroadcast(bmi interface{}) error {
confirmedSequenceNumberGauge.Update(int64(confirmMsg.SequenceNumber))
}

maxCatchup := b.maxCatchup()
if maxCatchup == 0 {
b.messages = nil
return nil
}

for _, newMsg := range broadcastMessage.Messages {
if len(b.messages) == 0 {
// Add to empty list
Expand All @@ -167,6 +180,10 @@ func (b *SequenceNumberCatchupBuffer) OnDoBroadcast(bmi interface{}) error {
}
}

if maxCatchup >= 0 && len(b.messages) > maxCatchup {
b.pruneBufferToIndex(len(b.messages) - maxCatchup)
}

return nil

}
Expand Down
46 changes: 46 additions & 0 deletions broadcaster/sequencenumbercatchupbuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import (

"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/util/arbmath"
)

func TestGetEmptyCacheMessages(t *testing.T) {
buffer := SequenceNumberCatchupBuffer{
messages: nil,
messageCount: 0,
limitCatchup: func() bool { return false },
maxCatchup: func() int { return -1 },
}

// Get everything
Expand Down Expand Up @@ -60,6 +62,7 @@ func TestGetCacheMessages(t *testing.T) {
messages: createDummyBroadcastMessages(indexes),
messageCount: int32(len(indexes)),
limitCatchup: func() bool { return false },
maxCatchup: func() int { return -1 },
}

// Get everything
Expand Down Expand Up @@ -110,6 +113,7 @@ func TestDeleteConfirmedNil(t *testing.T) {
messages: nil,
messageCount: 0,
limitCatchup: func() bool { return false },
maxCatchup: func() int { return -1 },
}

buffer.deleteConfirmed(0)
Expand All @@ -124,6 +128,7 @@ func TestDeleteConfirmInvalidOrder(t *testing.T) {
messages: createDummyBroadcastMessages(indexes),
messageCount: int32(len(indexes)),
limitCatchup: func() bool { return false },
maxCatchup: func() int { return -1 },
}

// Confirm before cache
Expand All @@ -139,6 +144,7 @@ func TestDeleteConfirmed(t *testing.T) {
messages: createDummyBroadcastMessages(indexes),
messageCount: int32(len(indexes)),
limitCatchup: func() bool { return false },
maxCatchup: func() int { return -1 },
}

// Confirm older than cache
Expand All @@ -154,6 +160,7 @@ func TestDeleteFreeMem(t *testing.T) {
messages: createDummyBroadcastMessagesImpl(indexes, len(indexes)*10+1),
messageCount: int32(len(indexes)),
limitCatchup: func() bool { return false },
maxCatchup: func() int { return -1 },
}

// Confirm older than cache
Expand All @@ -169,6 +176,7 @@ func TestBroadcastBadMessage(t *testing.T) {
messages: nil,
messageCount: 0,
limitCatchup: func() bool { return false },
maxCatchup: func() int { return -1 },
}

var foo int
Expand All @@ -187,6 +195,7 @@ func TestBroadcastPastSeqNum(t *testing.T) {
messages: createDummyBroadcastMessagesImpl(indexes, len(indexes)*10+1),
messageCount: int32(len(indexes)),
limitCatchup: func() bool { return false },
maxCatchup: func() int { return -1 },
}

bm := BroadcastMessage{
Expand All @@ -208,6 +217,8 @@ func TestBroadcastFutureSeqNum(t *testing.T) {
buffer := SequenceNumberCatchupBuffer{
messages: createDummyBroadcastMessagesImpl(indexes, len(indexes)*10+1),
messageCount: int32(len(indexes)),
limitCatchup: func() bool { return false },
maxCatchup: func() int { return -1 },
}

bm := BroadcastMessage{
Expand All @@ -223,3 +234,38 @@ func TestBroadcastFutureSeqNum(t *testing.T) {
}

}

func TestMaxCatchupBufferSize(t *testing.T) {
limit := 5
buffer := SequenceNumberCatchupBuffer{
messages: nil,
messageCount: 0,
limitCatchup: func() bool { return false },
maxCatchup: func() int { return limit },
}

firstMessage := 10
for i := firstMessage; i <= 20; i += 2 {
bm := BroadcastMessage{
Messages: []*BroadcastFeedMessage{
{
SequenceNumber: arbutil.MessageIndex(i),
},
{
SequenceNumber: arbutil.MessageIndex(i + 1),
},
},
}
err := buffer.OnDoBroadcast(bm)
Require(t, err)
haveMessages := buffer.getCacheMessages(0)
expectedCount := arbmath.MinInt(i+len(bm.Messages)-firstMessage, limit)
if len(haveMessages.Messages) != expectedCount {
t.Errorf("after broadcasting messages %v and %v, expected to have %v messages but got %v", i, i+1, expectedCount, len(haveMessages.Messages))
}
expectedFirstMessage := arbutil.MessageIndex(arbmath.MaxInt(firstMessage, i+len(bm.Messages)-limit))
if haveMessages.Messages[0].SequenceNumber != expectedFirstMessage {
t.Errorf("after broadcasting messages %v and %v, expected the first message to be %v but got %v", i, i+1, expectedFirstMessage, haveMessages.Messages[0].SequenceNumber)
}
}
}
4 changes: 4 additions & 0 deletions wsbroadcastserver/wsbroadcastserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type BroadcasterConfig struct {
EnableCompression bool `koanf:"enable-compression" reload:"hot"` // if reloaded to false will cause disconnection of clients with enabled compression on next broadcast
RequireCompression bool `koanf:"require-compression" reload:"hot"` // if reloaded to true will cause disconnection of clients with disabled compression on next broadcast
LimitCatchup bool `koanf:"limit-catchup" reload:"hot"`
MaxCatchup int `koanf:"max-catchup" reload:"hot"`
ConnectionLimits ConnectionLimiterConfig `koanf:"connection-limits" reload:"hot"`
ClientDelay time.Duration `koanf:"client-delay" reload:"hot"`
}
Expand Down Expand Up @@ -93,6 +94,7 @@ func BroadcasterConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Bool(prefix+".enable-compression", DefaultBroadcasterConfig.EnableCompression, "enable per message deflate compression support")
f.Bool(prefix+".require-compression", DefaultBroadcasterConfig.RequireCompression, "require clients to use compression")
f.Bool(prefix+".limit-catchup", DefaultBroadcasterConfig.LimitCatchup, "only supply catchup buffer if requested sequence number is reasonable")
f.Int(prefix+".max-catchup", DefaultBroadcasterConfig.MaxCatchup, "the maximum size of the catchup buffer (-1 means unlimited)")
ConnectionLimiterConfigAddOptions(prefix+".connection-limits", f)
f.Duration(prefix+".client-delay", DefaultBroadcasterConfig.ClientDelay, "delay the first messages sent to each client by this amount")
}
Expand All @@ -117,6 +119,7 @@ var DefaultBroadcasterConfig = BroadcasterConfig{
EnableCompression: true,
RequireCompression: false,
LimitCatchup: false,
MaxCatchup: -1,
ConnectionLimits: DefaultConnectionLimiterConfig,
ClientDelay: 0,
}
Expand All @@ -141,6 +144,7 @@ var DefaultTestBroadcasterConfig = BroadcasterConfig{
EnableCompression: true,
RequireCompression: false,
LimitCatchup: false,
MaxCatchup: -1,
ConnectionLimits: DefaultConnectionLimiterConfig,
ClientDelay: 0,
}
Expand Down

0 comments on commit 11a2a3c

Please sign in to comment.