From 5441b43832ddbb6b905d6cddcbaec12b800cf82b Mon Sep 17 00:00:00 2001 From: ganeshvanahalli Date: Fri, 27 Oct 2023 11:47:15 -0500 Subject: [PATCH] Retain new feed messages when cache is cleared --- arbnode/transaction_streamer.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index bcc389dc01..2ee1526ee9 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -696,6 +696,7 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil var oldMsg *arbostypes.MessageWithMetadata var lastDelayedRead uint64 var hasNewConfirmedMessages bool + var cacheClearLen int messagesAfterPos := messageStartPos + arbutil.MessageIndex(len(messages)) broadcastStartPos := arbutil.MessageIndex(atomic.LoadUint64(&s.broadcasterQueuedMessagesPos)) @@ -724,10 +725,13 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil // Or no active broadcast reorg and broadcast messages start before or immediately after last L1 message if messagesAfterPos >= broadcastStartPos { broadcastSliceIndex := int(messagesAfterPos - broadcastStartPos) + messagesOldLen := len(messages) if broadcastSliceIndex < len(s.broadcasterQueuedMessages) { // Some cached feed messages can be used messages = append(messages, s.broadcasterQueuedMessages[broadcastSliceIndex:]...) } + // This calculation gives the exact length of cache which was appended to messages + cacheClearLen = broadcastSliceIndex + len(messages) - messagesOldLen } // L1 used or replaced broadcast cache items @@ -800,8 +804,14 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil } if clearQueueOnSuccess { - s.broadcasterQueuedMessages = s.broadcasterQueuedMessages[:0] - atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, 0) + // Check if new messages were added at the end of cache, if they were, then dont remove those particular messages + if len(s.broadcasterQueuedMessages) > cacheClearLen { + s.broadcasterQueuedMessages = s.broadcasterQueuedMessages[cacheClearLen:] + atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, uint64(broadcastStartPos)+uint64(cacheClearLen)) + } else { + s.broadcasterQueuedMessages = s.broadcasterQueuedMessages[:0] + atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, 0) + } s.broadcasterQueuedMessagesActiveReorg = false }