Skip to content

Commit

Permalink
Merge pull request #2048 from OffchainLabs/relay-backlogsize-metrics
Browse files Browse the repository at this point in the history
Add metrics for relay backlog size and backlog size in bytes
  • Loading branch information
PlasmaPower authored Jan 11, 2024
2 parents 80f2ac8 + 1e743ca commit ce28683
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 0 deletions.
55 changes: 55 additions & 0 deletions broadcaster/backlog/backlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ var (
errOutOfBounds = errors.New("message not found in backlog")

confirmedSequenceNumberGauge = metrics.NewRegisteredGauge("arb/sequencenumber/confirmed", nil)
backlogSizeInBytesGauge = metrics.NewRegisteredGauge("arb/feed/backlog/bytes", nil)
backlogSizeGauge = metrics.NewRegisteredGauge("arb/feed/backlog/messages", nil)
)

// Backlog defines the interface for backlog.
Expand Down Expand Up @@ -54,13 +56,49 @@ func (b *backlog) Head() BacklogSegment {
return b.head.Load()
}

func (b *backlog) backlogSizeInBytes() (uint64, error) {
headSeg := b.head.Load()
tailSeg := b.tail.Load()
if headSeg == nil || tailSeg == nil {
if headSeg == nil && tailSeg == nil {
return 0, nil
}
return 0, errors.New("the head or tail segment of feed backlog is nil")
}

headSeg.messagesLock.RLock()
if len(headSeg.messages) == 0 {
return 0, errors.New("head segment of the feed backlog is empty")
}
headMsg := headSeg.messages[0]
headSeg.messagesLock.RUnlock()

tailSeg.messagesLock.RLock()
if len(tailSeg.messages) == 0 {
return 0, errors.New("tail segment of the feed backlog is empty")
}
tailMsg := tailSeg.messages[len(tailSeg.messages)-1]
size := tailMsg.CumulativeSumMsgSize
tailSeg.messagesLock.RUnlock()

size -= headMsg.CumulativeSumMsgSize
size += headMsg.Size()
return size, nil
}

// Append will add the given messages to the backlogSegment at head until
// that segment reaches its limit. If messages remain to be added a new segment
// will be created.
func (b *backlog) Append(bm *m.BroadcastMessage) error {

if bm.ConfirmedSequenceNumberMessage != nil {
b.delete(uint64(bm.ConfirmedSequenceNumberMessage.SequenceNumber))
size, err := b.backlogSizeInBytes()
if err != nil {
log.Warn("error calculating backlogSizeInBytes", "err", err)
} else {
backlogSizeInBytesGauge.Update(int64(size))
}
}

lookupByIndex := b.lookupByIndex.Load()
Expand All @@ -75,6 +113,12 @@ func (b *backlog) Append(bm *m.BroadcastMessage) error {

prevMsgIdx := segment.End()
if segment.count() >= b.config().SegmentLimit {
segment.messagesLock.RLock()
if len(segment.messages) > 0 {
msg.CumulativeSumMsgSize = segment.messages[len(segment.messages)-1].CumulativeSumMsgSize
}
segment.messagesLock.RUnlock()

nextSegment := newBacklogSegment()
segment.nextSegment.Store(nextSegment)
prevMsgIdx = segment.End()
Expand All @@ -89,6 +133,7 @@ func (b *backlog) Append(bm *m.BroadcastMessage) error {
b.head.Store(segment)
b.tail.Store(segment)
b.messageCount.Store(0)
backlogSizeInBytesGauge.Update(0)
log.Warn(err.Error())
} else if errors.Is(err, errSequenceNumberSeen) {
log.Info("ignoring message sequence number, already in backlog", "message sequence number", msg.SequenceNumber)
Expand All @@ -98,8 +143,10 @@ func (b *backlog) Append(bm *m.BroadcastMessage) error {
}
lookupByIndex.Store(uint64(msg.SequenceNumber), segment)
b.messageCount.Add(1)
backlogSizeInBytesGauge.Inc(int64(msg.Size()))
}

backlogSizeGauge.Update(int64(b.Count()))
return nil
}

Expand Down Expand Up @@ -239,6 +286,8 @@ func (b *backlog) reset() {
b.tail.Store(nil)
b.lookupByIndex.Store(&containers.SyncMap[uint64, *backlogSegment]{})
b.messageCount.Store(0)
backlogSizeInBytesGauge.Update(0)
backlogSizeGauge.Update(0)
}

// BacklogSegment defines the interface for backlogSegment.
Expand Down Expand Up @@ -361,9 +410,15 @@ func (s *backlogSegment) append(prevMsgIdx uint64, msg *m.BroadcastFeedMessage)
s.messagesLock.Lock()
defer s.messagesLock.Unlock()

prevCumulativeSum := uint64(0)
if len(s.messages) > 0 {
prevCumulativeSum = s.messages[len(s.messages)-1].CumulativeSumMsgSize
}
if expSeqNum := prevMsgIdx + 1; prevMsgIdx == 0 || uint64(msg.SequenceNumber) == expSeqNum {
msg.UpdateCumulativeSumMsgSize(prevCumulativeSum)
s.messages = append(s.messages, msg)
} else if uint64(msg.SequenceNumber) > expSeqNum {
msg.UpdateCumulativeSumMsgSize(prevCumulativeSum)
s.messages = nil
s.messages = append(s.messages, msg)
return fmt.Errorf("new message sequence number (%d) is greater than the expected sequence number (%d): %w", msg.SequenceNumber, expSeqNum, errDropSegments)
Expand Down
10 changes: 10 additions & 0 deletions broadcaster/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ type BroadcastFeedMessage struct {
SequenceNumber arbutil.MessageIndex `json:"sequenceNumber"`
Message arbostypes.MessageWithMetadata `json:"message"`
Signature []byte `json:"signature"`

CumulativeSumMsgSize uint64 `json:"-"`
}

func (m *BroadcastFeedMessage) Size() uint64 {
return uint64(len(m.Signature) + len(m.Message.Message.L2msg) + 160)
}

func (m *BroadcastFeedMessage) UpdateCumulativeSumMsgSize(val uint64) {
m.CumulativeSumMsgSize += val + m.Size()
}

func (m *BroadcastFeedMessage) Hash(chainId uint64) (common.Hash, error) {
Expand Down

0 comments on commit ce28683

Please sign in to comment.