diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index cb6f4fe502..ecf38ddf42 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -650,6 +650,8 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { log.Warn("failed sequencing delayed messages after catching lock", "err", err) } } + // This should be redundant now that even non-primary sequencers broadcast over the feed, + // but the backlog efficiently deduplicates messages, so better safe than sorry. err = c.streamer.PopulateFeedBacklog() if err != nil { log.Warn("failed to populate the feed backlog on lockout acquisition", "err", err) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index db0658f923..24ef2a7cc4 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -862,12 +862,6 @@ func (s *TransactionStreamer) WriteMessageFromSequencer(pos arbutil.MessageIndex return err } - if s.broadcastServer != nil { - if err := s.broadcastServer.BroadcastSingle(msgWithMeta, pos); err != nil { - log.Error("failed broadcasting message", "pos", pos, "err", err) - } - } - return nil } @@ -927,6 +921,12 @@ func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages [ default: } + if s.broadcastServer != nil { + if err := s.broadcastServer.BroadcastMessages(messages, pos); err != nil { + log.Error("failed broadcasting message", "pos", pos, "err", err) + } + } + return nil } diff --git a/broadcaster/backlog/backlog.go b/broadcaster/backlog/backlog.go index 851561f482..0ef32167fe 100644 --- a/broadcaster/backlog/backlog.go +++ b/broadcaster/backlog/backlog.go @@ -379,7 +379,7 @@ func (s *backlogSegment) Contains(i uint64) bool { s.messagesLock.RLock() defer s.messagesLock.RUnlock() start := s.start() - if i < start || i > s.end() { + if i < start || i > s.end() || len(s.messages) == 0 { return false } diff --git a/broadcaster/broadcaster.go b/broadcaster/broadcaster.go index 8a70e39810..38ffb0696c 100644 --- a/broadcaster/broadcaster.go +++ b/broadcaster/broadcaster.go @@ -5,6 +5,7 @@ package broadcaster import ( "context" + "errors" "net" "github.com/gobwas/ws" @@ -56,10 +57,11 @@ func (b *Broadcaster) NewBroadcastFeedMessage(message arbostypes.MessageWithMeta }, nil } -func (b *Broadcaster) BroadcastSingle(msg arbostypes.MessageWithMetadata, seq arbutil.MessageIndex) error { +func (b *Broadcaster) BroadcastSingle(msg arbostypes.MessageWithMetadata, seq arbutil.MessageIndex) (err error) { defer func() { if r := recover(); r != nil { log.Error("recovered error in BroadcastSingle", "recover", r) + err = errors.New("panic in BroadcastSingle") } }() bfm, err := b.NewBroadcastFeedMessage(msg, seq) @@ -79,6 +81,27 @@ func (b *Broadcaster) BroadcastSingleFeedMessage(bfm *m.BroadcastFeedMessage) { b.BroadcastFeedMessages(broadcastFeedMessages) } +func (b *Broadcaster) BroadcastMessages(messages []arbostypes.MessageWithMetadata, seq arbutil.MessageIndex) (err error) { + defer func() { + if r := recover(); r != nil { + log.Error("recovered error in BroadcastMessages", "recover", r) + err = errors.New("panic in BroadcastMessages") + } + }() + var feedMessages []*m.BroadcastFeedMessage + for _, msg := range messages { + bfm, err := b.NewBroadcastFeedMessage(msg, seq) + if err != nil { + return err + } + feedMessages = append(feedMessages, bfm) + } + + b.BroadcastFeedMessages(feedMessages) + + return nil +} + func (b *Broadcaster) BroadcastFeedMessages(messages []*m.BroadcastFeedMessage) { bm := &m.BroadcastMessage{ diff --git a/wsbroadcastserver/clientconnection.go b/wsbroadcastserver/clientconnection.go index 49cd2af7e6..82b9603a0e 100644 --- a/wsbroadcastserver/clientconnection.go +++ b/wsbroadcastserver/clientconnection.go @@ -119,6 +119,7 @@ func (cc *ClientConnection) Remove() { func (cc *ClientConnection) writeBacklog(ctx context.Context, segment backlog.BacklogSegment) error { var prevSegment backlog.BacklogSegment + isFirstSegment := true for !backlog.IsBacklogSegmentNil(segment) { // must get the next segment before the messages to be sent are // retrieved ensures another segment is not added in between calls. @@ -132,10 +133,14 @@ func (cc *ClientConnection) writeBacklog(ctx context.Context, segment backlog.Ba } msgs := prevSegment.Messages() - if prevSegment.Contains(uint64(cc.requestedSeqNum)) { + if isFirstSegment && prevSegment.Contains(uint64(cc.requestedSeqNum)) { requestedIdx := int(cc.requestedSeqNum) - int(prevSegment.Start()) - msgs = msgs[requestedIdx:] + // This might be false if messages were added after we fetched the segment's messages + if len(msgs) >= requestedIdx { + msgs = msgs[requestedIdx:] + } } + isFirstSegment = false bm := &m.BroadcastMessage{ Version: m.V1, Messages: msgs,