diff --git a/broadcaster/backlog/backlog.go b/broadcaster/backlog/backlog.go index 549a38ff86..f6501105c2 100644 --- a/broadcaster/backlog/backlog.go +++ b/broadcaster/backlog/backlog.go @@ -19,6 +19,8 @@ var ( errOutOfBounds = errors.New("message not found in backlog") confirmedSequenceNumberGauge = metrics.NewRegisteredGauge("arb/sequencenumber/confirmed", nil) + backlogSizeInBytesGauge = metrics.NewRegisteredGauge("arb/feed/backlog/bytes", nil) + backlogSizeGauge = metrics.NewRegisteredGauge("arb/feed/backlog/messages", nil) ) // Backlog defines the interface for backlog. @@ -54,6 +56,36 @@ func (b *backlog) Head() BacklogSegment { return b.head.Load() } +func (b *backlog) backlogSizeInBytes() (uint64, error) { + headSeg := b.head.Load() + tailSeg := b.tail.Load() + if headSeg == nil || tailSeg == nil { + if headSeg == nil && tailSeg == nil { + return 0, nil + } + return 0, errors.New("the head or tail segment of feed backlog is nil") + } + + headSeg.messagesLock.RLock() + if len(headSeg.messages) == 0 { + return 0, errors.New("head segment of the feed backlog is empty") + } + headMsg := headSeg.messages[0] + headSeg.messagesLock.RUnlock() + + tailSeg.messagesLock.RLock() + if len(tailSeg.messages) == 0 { + return 0, errors.New("tail segment of the feed backlog is empty") + } + tailMsg := tailSeg.messages[len(tailSeg.messages)-1] + size := tailMsg.CumulativeSumMsgSize + tailSeg.messagesLock.RUnlock() + + size -= headMsg.CumulativeSumMsgSize + size += headMsg.Size() + return size, nil +} + // Append will add the given messages to the backlogSegment at head until // that segment reaches its limit. If messages remain to be added a new segment // will be created. @@ -61,6 +93,12 @@ func (b *backlog) Append(bm *m.BroadcastMessage) error { if bm.ConfirmedSequenceNumberMessage != nil { b.delete(uint64(bm.ConfirmedSequenceNumberMessage.SequenceNumber)) + size, err := b.backlogSizeInBytes() + if err != nil { + log.Warn("error calculating backlogSizeInBytes", "err", err) + } else { + backlogSizeInBytesGauge.Update(int64(size)) + } } lookupByIndex := b.lookupByIndex.Load() @@ -75,6 +113,12 @@ func (b *backlog) Append(bm *m.BroadcastMessage) error { prevMsgIdx := segment.End() if segment.count() >= b.config().SegmentLimit { + segment.messagesLock.RLock() + if len(segment.messages) > 0 { + msg.CumulativeSumMsgSize = segment.messages[len(segment.messages)-1].CumulativeSumMsgSize + } + segment.messagesLock.RUnlock() + nextSegment := newBacklogSegment() segment.nextSegment.Store(nextSegment) prevMsgIdx = segment.End() @@ -89,6 +133,7 @@ func (b *backlog) Append(bm *m.BroadcastMessage) error { b.head.Store(segment) b.tail.Store(segment) b.messageCount.Store(0) + backlogSizeInBytesGauge.Update(0) log.Warn(err.Error()) } else if errors.Is(err, errSequenceNumberSeen) { log.Info("ignoring message sequence number, already in backlog", "message sequence number", msg.SequenceNumber) @@ -98,8 +143,10 @@ func (b *backlog) Append(bm *m.BroadcastMessage) error { } lookupByIndex.Store(uint64(msg.SequenceNumber), segment) b.messageCount.Add(1) + backlogSizeInBytesGauge.Inc(int64(msg.Size())) } + backlogSizeGauge.Update(int64(b.Count())) return nil } @@ -239,6 +286,8 @@ func (b *backlog) reset() { b.tail.Store(nil) b.lookupByIndex.Store(&containers.SyncMap[uint64, *backlogSegment]{}) b.messageCount.Store(0) + backlogSizeInBytesGauge.Update(0) + backlogSizeGauge.Update(0) } // BacklogSegment defines the interface for backlogSegment. @@ -361,9 +410,15 @@ func (s *backlogSegment) append(prevMsgIdx uint64, msg *m.BroadcastFeedMessage) s.messagesLock.Lock() defer s.messagesLock.Unlock() + prevCumulativeSum := uint64(0) + if len(s.messages) > 0 { + prevCumulativeSum = s.messages[len(s.messages)-1].CumulativeSumMsgSize + } if expSeqNum := prevMsgIdx + 1; prevMsgIdx == 0 || uint64(msg.SequenceNumber) == expSeqNum { + msg.UpdateCumulativeSumMsgSize(prevCumulativeSum) s.messages = append(s.messages, msg) } else if uint64(msg.SequenceNumber) > expSeqNum { + msg.UpdateCumulativeSumMsgSize(prevCumulativeSum) s.messages = nil s.messages = append(s.messages, msg) return fmt.Errorf("new message sequence number (%d) is greater than the expected sequence number (%d): %w", msg.SequenceNumber, expSeqNum, errDropSegments) diff --git a/broadcaster/message/message.go b/broadcaster/message/message.go index f436e765cb..a575ae5cd0 100644 --- a/broadcaster/message/message.go +++ b/broadcaster/message/message.go @@ -35,6 +35,16 @@ type BroadcastFeedMessage struct { SequenceNumber arbutil.MessageIndex `json:"sequenceNumber"` Message arbostypes.MessageWithMetadata `json:"message"` Signature []byte `json:"signature"` + + CumulativeSumMsgSize uint64 `json:"-"` +} + +func (m *BroadcastFeedMessage) Size() uint64 { + return uint64(len(m.Signature) + len(m.Message.Message.L2msg) + 160) +} + +func (m *BroadcastFeedMessage) UpdateCumulativeSumMsgSize(val uint64) { + m.CumulativeSumMsgSize += val + m.Size() } func (m *BroadcastFeedMessage) Hash(chainId uint64) (common.Hash, error) {