From 62bcd56e2d34498f0387f7653f6d2289e8fbda30 Mon Sep 17 00:00:00 2001 From: davidcauchi Date: Mon, 4 Nov 2024 18:27:23 +0100 Subject: [PATCH] Update event logic --- integration-tests/testsetups/ocr.go | 231 ++++++++++++++++++---------- 1 file changed, 151 insertions(+), 80 deletions(-) diff --git a/integration-tests/testsetups/ocr.go b/integration-tests/testsetups/ocr.go index 5f7e4ddf8a8..259cf6bfec3 100644 --- a/integration-tests/testsetups/ocr.go +++ b/integration-tests/testsetups/ocr.go @@ -618,13 +618,16 @@ func (o *OCRSoakTest) testLoop(testDuration time.Duration, newValue int) { ctx, cancel := context.WithCancel(context.Background()) var wg sync.WaitGroup + // Channel to signal polling to reset round event counter + resetEventCounter := make(chan struct{}) + defer close(resetEventCounter) lastValue := 0 newRoundTrigger := time.NewTimer(0) // Want to trigger a new round ASAP defer newRoundTrigger.Stop() o.setFilterQuery() wg.Add(1) - go o.pollingOCREvents(ctx, endTest, &wg) + go o.pollingOCREvents(ctx, &wg, resetEventCounter) n := o.Config.GetNetworkConfig() @@ -714,7 +717,7 @@ func (o *OCRSoakTest) testLoop(testDuration time.Duration, newValue int) { os.Exit(interruptedExitCode) // Exit with interrupted code to indicate test was interrupted, not just a normal failure case <-endTest: cancel() - wg.Wait() + wg.Wait() // Wait for polling to complete return case <-newRoundTrigger.C: err := o.triggerNewRound(newValue) @@ -725,6 +728,8 @@ func (o *OCRSoakTest) testLoop(testDuration time.Duration, newValue int) { Str("Waiting", timerReset.String()). Msg("Error triggering new round, waiting and trying again. Possible connection issues with mockserver") } + // Signal polling to reset event counter + resetEventCounter <- struct{}{} newRoundTrigger.Reset(timerReset) // Change value for the next round @@ -831,7 +836,7 @@ func (o *OCRSoakTest) setFilterQuery() { } // pollingOCREvents Polls the blocks for OCR events and logs them to the test logger -func (o *OCRSoakTest) pollingOCREvents(ctx context.Context, endTest <-chan time.Time, wg *sync.WaitGroup) error { +func (o *OCRSoakTest) pollingOCREvents(ctx context.Context, wg *sync.WaitGroup, resetEventCounter <-chan struct{}) { defer wg.Done() // Keep track of the last processed block number processedBlockNum := o.startingBlockNum - 1 @@ -839,99 +844,162 @@ func (o *OCRSoakTest) pollingOCREvents(ctx context.Context, endTest <-chan time. pollInterval := time.Second * 30 ticker := time.NewTicker(pollInterval) defer ticker.Stop() + + // Retrieve expected number of events per round from configuration + expectedEventsPerRound := *o.Config.GetActiveOCRConfig().Common.NumberOfContracts + eventCounter := 0 + roundTimeout := o.Config.GetActiveOCRConfig().Soak.TimeBetweenRounds.Duration + timeoutTimer := time.NewTimer(roundTimeout) + round := 0 + defer timeoutTimer.Stop() + o.log.Info().Msg("Start Polling for Answer Updated Events") + for { select { - case <-ctx.Done(): - o.log.Info().Msg("Test is done, stopping polling") - return nil - case <-endTest: - o.log.Info().Msg("Test ended, stopping polling") - return nil - case <-ticker.C: - // Get the latest block number to search up to the current block - latestBlock, err := o.seth.Client.BlockNumber(context.Background()) - if err != nil { - o.log.Error().Err(err).Msg("Error getting latest block number") - continue + case <-resetEventCounter: + if round != 0 { + if eventCounter == expectedEventsPerRound { + o.log.Info(). + Int("Events found", eventCounter). + Int("Events Expected", expectedEventsPerRound). + Msg("All expected events found") + } else if eventCounter < expectedEventsPerRound { + o.log.Warn(). + Int("Events found", eventCounter). + Int("Events Expected", expectedEventsPerRound). + Msg("Expected to find more events") + } } - - // Skip if this block has already been checked - if processedBlockNum == latestBlock { - o.log.Debug(). - Uint64("Latest Block", latestBlock). - Uint64("Last Processed Block Number", processedBlockNum). - Msg("No new blocks since last poll") - continue + // Reset event counter and timer for new round + eventCounter = 0 + // Safely stop and drain the timer if a value is present + if !timeoutTimer.Stop() { + <-timeoutTimer.C } - // Check if the latest block is behind processedBlockNum due to possible reorgs - if processedBlockNum > latestBlock { - o.log.Error(). - Uint64("From Block", processedBlockNum). - Uint64("To Block", latestBlock). - Msg("The latest block is behind the processed block. This could happen due to RPC issues or possibly a reorg") - processedBlockNum = latestBlock - continue + timeoutTimer.Reset(roundTimeout) + o.log.Info().Msg("Polling for new round, event counter reset") + round++ + case <-ctx.Done(): + o.log.Info().Msg("Test duration ended, finalizing event polling") + timeoutTimer.Reset(roundTimeout) + // Wait until expected events are fetched or until timeout + for eventCounter < expectedEventsPerRound { + select { + case <-timeoutTimer.C: + o.log.Warn().Msg("Timeout reached while waiting for final events") + return + case <-ticker.C: + o.fetchAndProcessEvents(&eventCounter, expectedEventsPerRound, &processedBlockNum) + } } + o.log.Info(). + Int("Events found", eventCounter). + Int("Events Expected", expectedEventsPerRound). + Msg("Stop polling.") + return + case <-ticker.C: + o.fetchAndProcessEvents(&eventCounter, expectedEventsPerRound, &processedBlockNum) + } + } +} - fromBlock := processedBlockNum + 1 +// Helper function to poll events and update eventCounter +func (o *OCRSoakTest) fetchAndProcessEvents(eventCounter *int, expectedEvents int, processedBlockNum *uint64) { + latestBlock, err := o.seth.Client.BlockNumber(context.Background()) + if err != nil { + o.log.Error().Err(err).Msg("Error getting latest block number") + return + } - // Prepare the filter query with updated block range - o.filterQuery.FromBlock = big.NewInt(0).SetUint64(fromBlock) - o.filterQuery.ToBlock = big.NewInt(0).SetUint64(latestBlock) + if *processedBlockNum == latestBlock { + o.log.Debug(). + Uint64("Latest Block", latestBlock). + Uint64("Last Processed Block Number", *processedBlockNum). + Msg("No new blocks since last poll") + return + } - o.log.Debug(). - Uint64("From Block", fromBlock). - Uint64("To Block", latestBlock). - Msg("Fetching logs for the specified range") + // Check if the latest block is behind processedBlockNum due to possible reorgs + if *processedBlockNum > latestBlock { + o.log.Error(). + Uint64("From Block", *processedBlockNum). + Uint64("To Block", latestBlock). + Msg("The latest block is behind the processed block. This could happen due to RPC issues or possibly a reorg") + *processedBlockNum = latestBlock + return + } - // Fetch logs for the specified range - logs, err := o.seth.Client.FilterLogs(context.Background(), o.filterQuery) + fromBlock := *processedBlockNum + 1 + o.filterQuery.FromBlock = big.NewInt(0).SetUint64(fromBlock) + o.filterQuery.ToBlock = big.NewInt(0).SetUint64(latestBlock) + + o.log.Debug(). + Uint64("From Block", fromBlock). + Uint64("To Block", latestBlock). + Msg("Fetching logs for the specified range") + + logs, err := o.seth.Client.FilterLogs(context.Background(), o.filterQuery) + if err != nil { + o.log.Error().Err(err).Msg("Error fetching logs") + return + } + + for _, event := range logs { + *eventCounter++ + if o.OCRVersion == "1" { + answerUpdated, err := o.ocrV1Instances[0].ParseEventAnswerUpdated(event) if err != nil { - o.log.Error().Err(err).Msg("Error fetching logs") + o.log.Warn(). + Err(err). + Str("Address", event.Address.Hex()). + Uint64("Block Number", event.BlockNumber). + Msg("Error parsing event as AnswerUpdated") continue } - - // Process the fetched logs - for _, event := range logs { - if o.OCRVersion == "1" { - answerUpdated, err := o.ocrV1Instances[0].ParseEventAnswerUpdated(event) - if err != nil { - o.log.Warn(). - Err(err). - Str("Address", event.Address.Hex()). - Uint64("Block Number", event.BlockNumber). - Msg("Error parsing event as AnswerUpdated") - continue - } - o.log.Info(). - Str("Address", event.Address.Hex()). - Uint64("Block Number", event.BlockNumber). - Uint64("Round ID", answerUpdated.RoundId.Uint64()). - Int64("Answer", answerUpdated.Current.Int64()). - Msg("Answer Updated Event") - } else if o.OCRVersion == "2" { - answerUpdated, err := o.ocrV2Instances[0].ParseEventAnswerUpdated(event) - if err != nil { - o.log.Warn(). - Err(err). - Str("Address", event.Address.Hex()). - Uint64("Block Number", event.BlockNumber). - Msg("Error parsing event as AnswerUpdated") - continue - } - o.log.Info(). - Str("Address", event.Address.Hex()). - Uint64("Block Number", event.BlockNumber). - Uint64("Round ID", answerUpdated.RoundId.Uint64()). - Int64("Answer", answerUpdated.Current.Int64()). - Msg("Answer Updated Event") - } + if *eventCounter <= expectedEvents { + o.log.Info(). + Str("Address", event.Address.Hex()). + Uint64("Block Number", event.BlockNumber). + Uint64("Round ID", answerUpdated.RoundId.Uint64()). + Int64("Answer", answerUpdated.Current.Int64()). + Msg("Answer Updated Event") + } else { + o.log.Error(). + Str("Address", event.Address.Hex()). + Uint64("Block Number", event.BlockNumber). + Uint64("Round ID", answerUpdated.RoundId.Uint64()). + Int64("Answer", answerUpdated.Current.Int64()). + Msg("Excess event detected, beyond expected count") + } + } else if o.OCRVersion == "2" { + answerUpdated, err := o.ocrV2Instances[0].ParseEventAnswerUpdated(event) + if err != nil { + o.log.Warn(). + Err(err). + Str("Address", event.Address.Hex()). + Uint64("Block Number", event.BlockNumber). + Msg("Error parsing event as AnswerUpdated") + continue + } + if *eventCounter <= expectedEvents { + o.log.Info(). + Str("Address", event.Address.Hex()). + Uint64("Block Number", event.BlockNumber). + Uint64("Round ID", answerUpdated.RoundId.Uint64()). + Int64("Answer", answerUpdated.Current.Int64()). + Msg("Answer Updated Event") + } else { + o.log.Error(). + Str("Address", event.Address.Hex()). + Uint64("Block Number", event.BlockNumber). + Uint64("Round ID", answerUpdated.RoundId.Uint64()). + Int64("Answer", answerUpdated.Current.Int64()). + Msg("Excess event detected, beyond expected count") } - - processedBlockNum = latestBlock } } + *processedBlockNum = latestBlock } // triggers a new OCR round by setting a new mock adapter value @@ -980,6 +1048,9 @@ func (o *OCRSoakTest) collectEvents() error { o.ocrRoundStates[len(o.ocrRoundStates)-1].EndTime = start // Set end time for last expected event o.log.Info().Msg("Collecting on-chain events") + // Set from block to be starting block before filtering + o.filterQuery.FromBlock = big.NewInt(0).SetUint64(o.startingBlockNum) + // We must retrieve the events, use exponential backoff for timeout to retry timeout := time.Second * 15 o.log.Info().Interface("Filter Query", o.filterQuery).Str("Timeout", timeout.String()).Msg("Retrieving on-chain events")