From 4e73be72871aa342b918c568c3439a11256278c6 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Thu, 21 Dec 2023 00:37:17 -0700 Subject: [PATCH 1/4] Always broadcast feed, even when the node isn't the sequencer --- arbnode/transaction_streamer.go | 12 ++++++------ broadcaster/broadcaster.go | 25 ++++++++++++++++++++++++- 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index db0658f923..24ef2a7cc4 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -862,12 +862,6 @@ func (s *TransactionStreamer) WriteMessageFromSequencer(pos arbutil.MessageIndex return err } - if s.broadcastServer != nil { - if err := s.broadcastServer.BroadcastSingle(msgWithMeta, pos); err != nil { - log.Error("failed broadcasting message", "pos", pos, "err", err) - } - } - return nil } @@ -927,6 +921,12 @@ func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages [ default: } + if s.broadcastServer != nil { + if err := s.broadcastServer.BroadcastMessages(messages, pos); err != nil { + log.Error("failed broadcasting message", "pos", pos, "err", err) + } + } + return nil } diff --git a/broadcaster/broadcaster.go b/broadcaster/broadcaster.go index 8a70e39810..38ffb0696c 100644 --- a/broadcaster/broadcaster.go +++ b/broadcaster/broadcaster.go @@ -5,6 +5,7 @@ package broadcaster import ( "context" + "errors" "net" "github.com/gobwas/ws" @@ -56,10 +57,11 @@ func (b *Broadcaster) NewBroadcastFeedMessage(message arbostypes.MessageWithMeta }, nil } -func (b *Broadcaster) BroadcastSingle(msg arbostypes.MessageWithMetadata, seq arbutil.MessageIndex) error { +func (b *Broadcaster) BroadcastSingle(msg arbostypes.MessageWithMetadata, seq arbutil.MessageIndex) (err error) { defer func() { if r := recover(); r != nil { log.Error("recovered error in BroadcastSingle", "recover", r) + err = errors.New("panic in BroadcastSingle") } }() bfm, err := b.NewBroadcastFeedMessage(msg, seq) @@ -79,6 +81,27 @@ func (b *Broadcaster) BroadcastSingleFeedMessage(bfm *m.BroadcastFeedMessage) { b.BroadcastFeedMessages(broadcastFeedMessages) } +func (b *Broadcaster) BroadcastMessages(messages []arbostypes.MessageWithMetadata, seq arbutil.MessageIndex) (err error) { + defer func() { + if r := recover(); r != nil { + log.Error("recovered error in BroadcastMessages", "recover", r) + err = errors.New("panic in BroadcastMessages") + } + }() + var feedMessages []*m.BroadcastFeedMessage + for _, msg := range messages { + bfm, err := b.NewBroadcastFeedMessage(msg, seq) + if err != nil { + return err + } + feedMessages = append(feedMessages, bfm) + } + + b.BroadcastFeedMessages(feedMessages) + + return nil +} + func (b *Broadcaster) BroadcastFeedMessages(messages []*m.BroadcastFeedMessage) { bm := &m.BroadcastMessage{ From ae54e0852e53e78f6fb7fafd531bdb066dec7369 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Thu, 21 Dec 2023 00:42:56 -0700 Subject: [PATCH 2/4] Add comment about PopulateFeedBacklog in seq coordinator being redundant --- arbnode/seq_coordinator.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index cb6f4fe502..ecf38ddf42 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -650,6 +650,8 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { log.Warn("failed sequencing delayed messages after catching lock", "err", err) } } + // This should be redundant now that even non-primary sequencers broadcast over the feed, + // but the backlog efficiently deduplicates messages, so better safe than sorry. err = c.streamer.PopulateFeedBacklog() if err != nil { log.Warn("failed to populate the feed backlog on lockout acquisition", "err", err) From 11d76f62213f50540c280253fc0c5dc8efe31b59 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Thu, 21 Dec 2023 10:21:31 -0700 Subject: [PATCH 3/4] Fix broadcastSegment Contains and its usage --- broadcaster/backlog/backlog.go | 2 +- wsbroadcastserver/clientconnection.go | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/broadcaster/backlog/backlog.go b/broadcaster/backlog/backlog.go index 851561f482..0ef32167fe 100644 --- a/broadcaster/backlog/backlog.go +++ b/broadcaster/backlog/backlog.go @@ -379,7 +379,7 @@ func (s *backlogSegment) Contains(i uint64) bool { s.messagesLock.RLock() defer s.messagesLock.RUnlock() start := s.start() - if i < start || i > s.end() { + if i < start || i > s.end() || len(s.messages) == 0 { return false } diff --git a/wsbroadcastserver/clientconnection.go b/wsbroadcastserver/clientconnection.go index 49cd2af7e6..5e8763bd95 100644 --- a/wsbroadcastserver/clientconnection.go +++ b/wsbroadcastserver/clientconnection.go @@ -134,7 +134,10 @@ func (cc *ClientConnection) writeBacklog(ctx context.Context, segment backlog.Ba msgs := prevSegment.Messages() if prevSegment.Contains(uint64(cc.requestedSeqNum)) { requestedIdx := int(cc.requestedSeqNum) - int(prevSegment.Start()) - msgs = msgs[requestedIdx:] + // This might be false if messages were added after we fetched the segment's messages + if len(msgs) > requestedIdx { + msgs = msgs[requestedIdx:] + } } bm := &m.BroadcastMessage{ Version: m.V1, From 4b816bcb16ce04f20bef2a94c1e98dbd1a3cba58 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Thu, 21 Dec 2023 10:23:25 -0700 Subject: [PATCH 4/4] Further improve requestedSeqNum check --- wsbroadcastserver/clientconnection.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/wsbroadcastserver/clientconnection.go b/wsbroadcastserver/clientconnection.go index 5e8763bd95..82b9603a0e 100644 --- a/wsbroadcastserver/clientconnection.go +++ b/wsbroadcastserver/clientconnection.go @@ -119,6 +119,7 @@ func (cc *ClientConnection) Remove() { func (cc *ClientConnection) writeBacklog(ctx context.Context, segment backlog.BacklogSegment) error { var prevSegment backlog.BacklogSegment + isFirstSegment := true for !backlog.IsBacklogSegmentNil(segment) { // must get the next segment before the messages to be sent are // retrieved ensures another segment is not added in between calls. @@ -132,13 +133,14 @@ func (cc *ClientConnection) writeBacklog(ctx context.Context, segment backlog.Ba } msgs := prevSegment.Messages() - if prevSegment.Contains(uint64(cc.requestedSeqNum)) { + if isFirstSegment && prevSegment.Contains(uint64(cc.requestedSeqNum)) { requestedIdx := int(cc.requestedSeqNum) - int(prevSegment.Start()) // This might be false if messages were added after we fetched the segment's messages - if len(msgs) > requestedIdx { + if len(msgs) >= requestedIdx { msgs = msgs[requestedIdx:] } } + isFirstSegment = false bm := &m.BroadcastMessage{ Version: m.V1, Messages: msgs,