From accbfb119144ed8e936d82096b6adeef309dcd70 Mon Sep 17 00:00:00 2001 From: davidcauchi Date: Tue, 29 Oct 2024 11:46:32 +0100 Subject: [PATCH] Update --- integration-tests/testsetups/ocr.go | 98 +++++------------------------ 1 file changed, 17 insertions(+), 81 deletions(-) diff --git a/integration-tests/testsetups/ocr.go b/integration-tests/testsetups/ocr.go index 6ad0485bfb9..1b2b8ba6784 100644 --- a/integration-tests/testsetups/ocr.go +++ b/integration-tests/testsetups/ocr.go @@ -20,7 +20,6 @@ import ( geth "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" "github.com/pelletier/go-toml/v2" "github.com/rs/zerolog" "github.com/stretchr/testify/require" @@ -619,7 +618,7 @@ func (o *OCRSoakTest) testLoop(testDuration time.Duration, newValue int) { newRoundTrigger := time.NewTimer(0) // Want to trigger a new round ASAP defer newRoundTrigger.Stop() o.setFilterQuery() - err := o.observeOCREventsPolling(endTest) + err := o.pollingOCREvents(endTest) require.NoError(o.t, err, "Error setting up polling for OCR events") n := o.Config.GetNetworkConfig() @@ -824,90 +823,18 @@ func (o *OCRSoakTest) setFilterQuery() { Msg("Filter Query Set") } -// observeOCREvents subscribes to OCR events and logs them to the test logger -// WARNING: Should only be used for observation and logging. This is not a reliable way to collect events. -func (o *OCRSoakTest) observeOCREvents() error { - eventLogs := make(chan types.Log) - ctx, cancel := context.WithTimeout(testcontext.Get(o.t), 5*time.Second) - eventSub, err := o.seth.Client.SubscribeFilterLogs(ctx, o.filterQuery, eventLogs) - cancel() - if err != nil { - return err - } - - go func() { - for { - select { - case event := <-eventLogs: - if o.OCRVersion == "1" { - answerUpdated, err := o.ocrV1Instances[0].ParseEventAnswerUpdated(event) - if err != nil { - o.log.Warn(). - Err(err). - Str("Address", event.Address.Hex()). - Uint64("Block Number", event.BlockNumber). - Msg("Error parsing event as AnswerUpdated") - continue - } - o.log.Info(). - Str("Address", event.Address.Hex()). - Uint64("Block Number", event.BlockNumber). - Uint64("Round ID", answerUpdated.RoundId.Uint64()). - Int64("Answer", answerUpdated.Current.Int64()). - Msg("Answer Updated Event") - } else if o.OCRVersion == "2" { - answerUpdated, err := o.ocrV2Instances[0].ParseEventAnswerUpdated(event) - if err != nil { - o.log.Warn(). - Err(err). - Str("Address", event.Address.Hex()). - Uint64("Block Number", event.BlockNumber). - Msg("Error parsing event as AnswerUpdated") - continue - } - o.log.Info(). - Str("Address", event.Address.Hex()). - Uint64("Block Number", event.BlockNumber). - Uint64("Round ID", answerUpdated.RoundId.Uint64()). - Int64("Answer", answerUpdated.Current.Int64()). - Msg("Answer Updated Event") - } - case err = <-eventSub.Err(): - backoff := time.Second - for err != nil { - o.log.Info(). - Err(err). - Str("Backoff", backoff.String()). - Interface("Query", o.filterQuery). - Msg("Error while subscribed to OCR Logs. Resubscribing") - ctx, cancel = context.WithTimeout(testcontext.Get(o.t), backoff) - eventSub, err = o.seth.Client.SubscribeFilterLogs(ctx, o.filterQuery, eventLogs) - cancel() - if err != nil { - time.Sleep(backoff) - backoff = time.Duration(math.Min(float64(backoff)*2, float64(30*time.Second))) - } - } - } - } - }() - - return nil -} - -func (o *OCRSoakTest) observeOCREventsPolling(endTest <-chan time.Time) error { +// pollingOCREvents Polls the blocks for OCR events and logs them to the test logger +func (o *OCRSoakTest) pollingOCREvents(endTest <-chan time.Time) error { // Keep track of the last processed block number startingBlockNum := o.startingBlockNum // Initialized to an invalid block number (max value of uint64) lastCheckedBlockNum := ^uint64(0) - // Polling interval can be customized in the test config, defaulting to 30 seconds - pollInterval := time.Second * 30 - ticker := time.NewTicker(pollInterval) - defer ticker.Stop() - // Start polling in a separate goroutine go func() { - o.log.Info().Msg("Start Polling for OCR Events") + pollInterval := time.Second * 30 + ticker := time.NewTicker(pollInterval) + defer ticker.Stop() + o.log.Info().Msg("Start Polling for Answer Updated Events") for { select { case <-endTest: @@ -929,12 +856,21 @@ func (o *OCRSoakTest) observeOCREventsPolling(endTest <-chan time.Time) error { Msg("No new blocks since last poll") continue } + // Check if the latest block is behind the starting block just in case + if startingBlockNum > latestBlock { + o.log.Error(). + Uint64("From Block", startingBlockNum). + Uint64("To Block", latestBlock). + Msg("The latest block is behind the starting block. This could happen due to RPC issues or possibly a reorg") + // May need to `startingBlockNum = latestBlock` if this happens frequently due to reorgs + continue + } // Prepare the filter query with updated block range o.filterQuery.FromBlock = big.NewInt(0).SetUint64(startingBlockNum) o.filterQuery.ToBlock = big.NewInt(0).SetUint64(latestBlock) - o.log.Info(). + o.log.Debug(). Uint64("From Block", startingBlockNum). Uint64("To Block", latestBlock). Msg("Fetching logs for the specified range")