swap catchupbuffer for backlog in wsbroadcastserver
lambchr committed Oct 18, 2023
1 parent 3382abb commit b21297d
Showing 5 changed files with 157 additions and 81 deletions.
30 changes: 25 additions & 5 deletions broadcaster/backlog/backlog.go
@@ -18,9 +18,10 @@ var (
)

type Backlog interface {
Head() BacklogSegment
Append(bm *m.BroadcastMessage) error
Get(start, end uint64) (*m.BroadcastMessage, error)
MessageCount() int
Count() int
}

type backlog struct {
@@ -40,6 +41,10 @@ func NewBacklog(c ConfigFetcher) Backlog {
}
}

func (b *backlog) Head() BacklogSegment {
return b.head.Load()
}

// Append will add the given messages to the backlogSegment at head until
// that segment reaches its limit. If messages remain to be added, a new
// segment will be created.
@@ -50,6 +55,8 @@ func (b *backlog) Append(bm *m.BroadcastMessage) error {
// add to metric?
}

// TODO(clamb): Do I need a max catchup config for the backlog? Similar to catchup buffer

for _, msg := range bm.Messages {
s := b.tail.Load()
if s == nil {
@@ -59,7 +66,7 @@ func (b *backlog) Append(bm *m.BroadcastMessage) error {
}

prevMsgIdx := s.end.Load()
if s.MessageCount() >= b.config().SegmentLimit {
if s.count() >= b.config().SegmentLimit {
nextS := &backlogSegment{}
s.nextSegment.Store(nextS)
prevMsgIdx = s.end.Load()
@@ -207,7 +214,7 @@ func (b *backlog) lookup(i uint64) (*backlogSegment, error) {
return s, nil
}

func (s *backlog) MessageCount() int {
func (s *backlog) Count() int {
return int(s.messageCount.Load())
}

@@ -221,6 +228,11 @@ func (b *backlog) reset() {
b.messageCount.Store(0)
}

type BacklogSegment interface {
Next() BacklogSegment
Messages() []*m.BroadcastFeedMessage
}

type backlogSegment struct {
start atomic.Uint64
end atomic.Uint64
@@ -230,6 +242,14 @@ type backlogSegment struct {
previousSegment atomic.Pointer[backlogSegment]
}

func (s *backlogSegment) Next() BacklogSegment {
return s.nextSegment.Load()
}

func (s *backlogSegment) Messages() []*m.BroadcastFeedMessage {
return s.messages
}

// get reads messages from the given start to end MessageIndex
func (s *backlogSegment) get(start, end uint64) ([]*m.BroadcastFeedMessage, error) {
noMsgs := []*m.BroadcastFeedMessage{}
@@ -291,7 +311,7 @@ func (s *backlogSegment) updateSegment(seen *bool) {
}
}

// MessageCount returns the number of messages stored in the backlog
func (s *backlogSegment) MessageCount() int {
// count returns the number of messages stored in the backlog segment
func (s *backlogSegment) count() int {
return int(s.messageCount.Load())
}
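Note: the backlog.go changes above give other packages a read-only view of the backlog: Backlog now exposes Head(), and the new BacklogSegment interface exposes Next() and Messages(). A minimal sketch of how a caller might walk that view follows; the package and helper names are hypothetical and not part of this commit.

package backlogexample // hypothetical package, for illustration only

import "github.com/offchainlabs/nitro/broadcaster/backlog"

// countBufferedMessages walks the backlog from Head() to the final segment by
// following Next() links, summing the messages buffered in each segment. This
// mirrors the traversal that ClientConnection.writeBacklog performs below.
func countBufferedMessages(b backlog.Backlog) int {
	total := 0
	for segment := b.Head(); segment != nil; segment = segment.Next() {
		total += len(segment.Messages())
	}
	return total
}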
4 changes: 2 additions & 2 deletions broadcaster/broadcaster.go
@@ -80,7 +80,7 @@ func (b *Broadcaster) BroadcastSingleFeedMessage(bfm *m.BroadcastFeedMessage) {

func (b *Broadcaster) BroadcastFeedMessages(messages []*m.BroadcastFeedMessage) {

bm := m.BroadcastMessage{
bm := &m.BroadcastMessage{
Version: 1,
Messages: messages,
}
@@ -90,7 +90,7 @@ func (b *Broadcaster) BroadcastFeedMessages(messages []*m.BroadcastFeedMessage)

func (b *Broadcaster) Confirm(seq arbutil.MessageIndex) {
log.Debug("confirming sequence number", "sequenceNumber", seq)
b.server.Broadcast(m.BroadcastMessage{
b.server.Broadcast(&m.BroadcastMessage{
Version: 1,
ConfirmedSequenceNumberMessage: &m.ConfirmedSequenceNumberMessage{seq}})
}
123 changes: 102 additions & 21 deletions wsbroadcastserver/clientconnection.go
@@ -5,21 +5,26 @@ package wsbroadcastserver

import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcaster/backlog"

"github.com/gobwas/ws"
"github.com/gobwas/ws/wsflate"
"github.com/mailru/easygo/netpoll"
"github.com/offchainlabs/nitro/util/stopwaiter"
)

var errContextDone = errors.New("context done")

// ClientConnection represents client connection.
type ClientConnection struct {
stopwaiter.StopWaiter
@@ -36,6 +41,8 @@ type ClientConnection struct {

lastHeardUnix int64
out chan []byte
backlog backlog.Backlog
registered chan bool

compression bool
flateReader *wsflate.Reader
@@ -51,6 +58,7 @@ func NewClientConnection(
connectingIP net.IP,
compression bool,
delay time.Duration,
bklg backlog.Backlog,
) *ClientConnection {
return &ClientConnection{
conn: conn,
@@ -65,6 +73,8 @@ func NewClientConnection(
compression: compression,
flateReader: NewFlateReader(),
delay: delay,
backlog: bklg,
registered: make(chan bool, 1),
}
}

@@ -76,33 +86,98 @@ func (cc *ClientConnection) Compression() bool {
return cc.compression
}

func (cc *ClientConnection) writeBacklog(ctx context.Context, segment backlog.BacklogSegment) (backlog.BacklogSegment, error) {
var prevSegment backlog.BacklogSegment // last segment written, returned so the caller can resend it after registration
for segment != nil {
select {
case <-ctx.Done():
return nil, errContextDone
default:
}

bm := &m.BroadcastMessage{
Messages: segment.Messages(),
}
notCompressed, compressed, err := serializeMessage(cc.clientManager, bm, !cc.compression, cc.compression)
if err != nil {
return nil, err
}

data := []byte{}
if cc.compression {
data = compressed.Bytes()
} else {
data = notCompressed.Bytes()
}
err = cc.writeRaw(data)
if err != nil {
return nil, err
}
log.Debug("segment sent to client", "client", cc.Name, "sentCount", len(bm.Messages))

prevSegment = segment
segment = segment.Next()
}
return prevSegment, nil
}

func (cc *ClientConnection) Start(parentCtx context.Context) {
cc.StopWaiter.Start(parentCtx, cc)
cc.LaunchThread(func(ctx context.Context) {
// A delay may be configured; it ensures the Broadcaster waits before any
// messages are sent to the client. The ClientConnection has not been
// registered yet, so the out channel filling up is not a concern.
if cc.delay != 0 {
var delayQueue [][]byte
t := time.NewTimer(cc.delay)
done := false
for !done {
select {
case <-ctx.Done():
return
case data := <-cc.out:
delayQueue = append(delayQueue, data)
case <-t.C:
for _, data := range delayQueue {
err := cc.writeRaw(data)
if err != nil {
logWarn(err, "error writing data to client")
cc.clientManager.Remove(cc)
return
}
}
done = true
}
select {
case <-ctx.Done():
return
case <-t.C:
}
}

// Send the current backlog before registering the ClientConnection in
// case the backlog is very large
head := cc.backlog.Head()
segment, err := cc.writeBacklog(ctx, head)
if errors.Is(err, errContextDone) {
return
} else if err != nil {
logWarn(err, "error writing messages from backlog")
cc.clientManager.Remove(cc)
return
}

cc.clientManager.Register(cc)
timer := time.NewTimer(5 * time.Second)
select {
case <-ctx.Done():
return
case <-cc.registered:
log.Debug("ClientConnection registered with ClientManager", "client", cc.Name)
case <-timer.C:
log.Warn("timed out waiting for ClientConnection to register with ClientManager", "client", cc.Name)
}

// The backlog may have had more messages added to it while the
// ClientConnection was registering with the ClientManager, so the
// last segment must be sent again. This may result in duplicate
// messages being sent to the client, but the client is expected to
// handle duplicates. The ClientConnection cannot be registered before
// the backlog is sent because the backlog may be very large, which
// could cause the out channel to run out of space.
_, err = cc.writeBacklog(ctx, segment)
if errors.Is(err, errContextDone) {
return
} else if err != nil {
logWarn(err, "error writing messages from backlog")
cc.clientManager.Remove(cc)
return
}

// TODO(clamb): does this still need to consider the requested seq number from the client? currently it just sends everything in the backlog

// broadcast any new messages sent to the out channel
for {
select {
case <-ctx.Done():
Expand All @@ -119,6 +194,12 @@ func (cc *ClientConnection) Start(parentCtx context.Context) {
})
}

// Registered is used by the ClientManager to indicate that the
// ClientConnection has been registered with the ClientManager.
func (cc *ClientConnection) Registered() {
cc.registered <- true
}
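Note: Start performs a two-step handshake with the ClientManager: it calls cc.clientManager.Register(cc), then waits on the registered channel (or a five second timeout) for the manager to call Registered(). The ClientManager side is not part of this diff; the sketch below only illustrates how a manager in the wsbroadcastserver package might acknowledge registration, with the exampleManager type and its fields invented for illustration.

// exampleManager is a hypothetical stand-in for the real ClientManager, which
// is not shown in this commit; it only illustrates the expected handshake.
type exampleManager struct {
	mu      sync.Mutex
	clients map[*ClientConnection]struct{}
}

// register adds the connection to the broadcast set and then acknowledges it,
// unblocking the select in ClientConnection.Start that waits on the
// registered channel.
func (em *exampleManager) register(cc *ClientConnection) {
	em.mu.Lock()
	em.clients[cc] = struct{}{}
	em.mu.Unlock()

	cc.Registered()
}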

func (cc *ClientConnection) StopOnly() {
// Ignore errors from conn.Close since we are just shutting down
_ = cc.conn.Close()
@@ -161,11 +242,11 @@ func (cc *ClientConnection) readRequest(ctx context.Context, timeout time.Durati
return data, opCode, err
}

func (cc *ClientConnection) Write(x interface{}) error {
func (cc *ClientConnection) Write(bm *m.BroadcastMessage) error {
cc.ioMutex.Lock()
defer cc.ioMutex.Unlock()

notCompressed, compressed, err := serializeMessage(cc.clientManager, x, !cc.compression, cc.compression)
notCompressed, compressed, err := serializeMessage(cc.clientManager, bm, !cc.compression, cc.compression)
if err != nil {
return err
}
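Note: Write now takes a typed *m.BroadcastMessage instead of interface{}, matching the pointer messages the Broadcaster builds in broadcaster.go above. A minimal sketch of a call site follows; the sendBroadcast helper and its feedMessages parameter are hypothetical and assumed to live in the wsbroadcastserver package alongside ClientConnection.

// sendBroadcast builds a broadcast message from caller-prepared feed messages
// and writes it to a single client connection.
func sendBroadcast(cc *ClientConnection, feedMessages []*m.BroadcastFeedMessage) error {
	msg := &m.BroadcastMessage{
		Version:  1,
		Messages: feedMessages,
	}
	return cc.Write(msg)
}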