Skip to content

Commit

Permalink
Update event logic
Browse files Browse the repository at this point in the history
  • Loading branch information
davidcauchi committed Nov 4, 2024
1 parent 0642e39 commit 62bcd56
Showing 1 changed file with 151 additions and 80 deletions.
231 changes: 151 additions & 80 deletions integration-tests/testsetups/ocr.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,13 +618,16 @@ func (o *OCRSoakTest) testLoop(testDuration time.Duration, newValue int) {

ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
// Channel to signal polling to reset round event counter
resetEventCounter := make(chan struct{})
defer close(resetEventCounter)

lastValue := 0
newRoundTrigger := time.NewTimer(0) // Want to trigger a new round ASAP
defer newRoundTrigger.Stop()
o.setFilterQuery()
wg.Add(1)
go o.pollingOCREvents(ctx, endTest, &wg)
go o.pollingOCREvents(ctx, &wg, resetEventCounter)

n := o.Config.GetNetworkConfig()

Expand Down Expand Up @@ -714,7 +717,7 @@ func (o *OCRSoakTest) testLoop(testDuration time.Duration, newValue int) {
os.Exit(interruptedExitCode) // Exit with interrupted code to indicate test was interrupted, not just a normal failure
case <-endTest:
cancel()
wg.Wait()
wg.Wait() // Wait for polling to complete
return
case <-newRoundTrigger.C:
err := o.triggerNewRound(newValue)
Expand All @@ -725,6 +728,8 @@ func (o *OCRSoakTest) testLoop(testDuration time.Duration, newValue int) {
Str("Waiting", timerReset.String()).
Msg("Error triggering new round, waiting and trying again. Possible connection issues with mockserver")
}
// Signal polling to reset event counter
resetEventCounter <- struct{}{}
newRoundTrigger.Reset(timerReset)

// Change value for the next round
Expand Down Expand Up @@ -831,107 +836,170 @@ func (o *OCRSoakTest) setFilterQuery() {
}

// pollingOCREvents Polls the blocks for OCR events and logs them to the test logger
func (o *OCRSoakTest) pollingOCREvents(ctx context.Context, endTest <-chan time.Time, wg *sync.WaitGroup) error {
func (o *OCRSoakTest) pollingOCREvents(ctx context.Context, wg *sync.WaitGroup, resetEventCounter <-chan struct{}) {
defer wg.Done()
// Keep track of the last processed block number
processedBlockNum := o.startingBlockNum - 1
// TODO: Make this configurable
pollInterval := time.Second * 30
ticker := time.NewTicker(pollInterval)
defer ticker.Stop()

// Retrieve expected number of events per round from configuration
expectedEventsPerRound := *o.Config.GetActiveOCRConfig().Common.NumberOfContracts
eventCounter := 0
roundTimeout := o.Config.GetActiveOCRConfig().Soak.TimeBetweenRounds.Duration
timeoutTimer := time.NewTimer(roundTimeout)
round := 0
defer timeoutTimer.Stop()

o.log.Info().Msg("Start Polling for Answer Updated Events")

for {
select {
case <-ctx.Done():
o.log.Info().Msg("Test is done, stopping polling")
return nil
case <-endTest:
o.log.Info().Msg("Test ended, stopping polling")
return nil
case <-ticker.C:
// Get the latest block number to search up to the current block
latestBlock, err := o.seth.Client.BlockNumber(context.Background())
if err != nil {
o.log.Error().Err(err).Msg("Error getting latest block number")
continue
case <-resetEventCounter:
if round != 0 {
if eventCounter == expectedEventsPerRound {
o.log.Info().
Int("Events found", eventCounter).
Int("Events Expected", expectedEventsPerRound).
Msg("All expected events found")
} else if eventCounter < expectedEventsPerRound {
o.log.Warn().
Int("Events found", eventCounter).
Int("Events Expected", expectedEventsPerRound).
Msg("Expected to find more events")
}
}

// Skip if this block has already been checked
if processedBlockNum == latestBlock {
o.log.Debug().
Uint64("Latest Block", latestBlock).
Uint64("Last Processed Block Number", processedBlockNum).
Msg("No new blocks since last poll")
continue
// Reset event counter and timer for new round
eventCounter = 0
// Safely stop and drain the timer if a value is present
if !timeoutTimer.Stop() {
<-timeoutTimer.C
}
// Check if the latest block is behind processedBlockNum due to possible reorgs
if processedBlockNum > latestBlock {
o.log.Error().
Uint64("From Block", processedBlockNum).
Uint64("To Block", latestBlock).
Msg("The latest block is behind the processed block. This could happen due to RPC issues or possibly a reorg")
processedBlockNum = latestBlock
continue
timeoutTimer.Reset(roundTimeout)
o.log.Info().Msg("Polling for new round, event counter reset")
round++
case <-ctx.Done():
o.log.Info().Msg("Test duration ended, finalizing event polling")
timeoutTimer.Reset(roundTimeout)
// Wait until expected events are fetched or until timeout
for eventCounter < expectedEventsPerRound {
select {
case <-timeoutTimer.C:
o.log.Warn().Msg("Timeout reached while waiting for final events")
return
case <-ticker.C:
o.fetchAndProcessEvents(&eventCounter, expectedEventsPerRound, &processedBlockNum)
}
}
o.log.Info().
Int("Events found", eventCounter).
Int("Events Expected", expectedEventsPerRound).
Msg("Stop polling.")
return
case <-ticker.C:
o.fetchAndProcessEvents(&eventCounter, expectedEventsPerRound, &processedBlockNum)
}
}
}

fromBlock := processedBlockNum + 1
// Helper function to poll events and update eventCounter
func (o *OCRSoakTest) fetchAndProcessEvents(eventCounter *int, expectedEvents int, processedBlockNum *uint64) {
latestBlock, err := o.seth.Client.BlockNumber(context.Background())
if err != nil {
o.log.Error().Err(err).Msg("Error getting latest block number")
return
}

// Prepare the filter query with updated block range
o.filterQuery.FromBlock = big.NewInt(0).SetUint64(fromBlock)
o.filterQuery.ToBlock = big.NewInt(0).SetUint64(latestBlock)
if *processedBlockNum == latestBlock {
o.log.Debug().
Uint64("Latest Block", latestBlock).
Uint64("Last Processed Block Number", *processedBlockNum).
Msg("No new blocks since last poll")
return
}

o.log.Debug().
Uint64("From Block", fromBlock).
Uint64("To Block", latestBlock).
Msg("Fetching logs for the specified range")
// Check if the latest block is behind processedBlockNum due to possible reorgs
if *processedBlockNum > latestBlock {
o.log.Error().
Uint64("From Block", *processedBlockNum).
Uint64("To Block", latestBlock).
Msg("The latest block is behind the processed block. This could happen due to RPC issues or possibly a reorg")
*processedBlockNum = latestBlock
return
}

// Fetch logs for the specified range
logs, err := o.seth.Client.FilterLogs(context.Background(), o.filterQuery)
fromBlock := *processedBlockNum + 1
o.filterQuery.FromBlock = big.NewInt(0).SetUint64(fromBlock)
o.filterQuery.ToBlock = big.NewInt(0).SetUint64(latestBlock)

o.log.Debug().
Uint64("From Block", fromBlock).
Uint64("To Block", latestBlock).
Msg("Fetching logs for the specified range")

logs, err := o.seth.Client.FilterLogs(context.Background(), o.filterQuery)
if err != nil {
o.log.Error().Err(err).Msg("Error fetching logs")
return
}

for _, event := range logs {
*eventCounter++
if o.OCRVersion == "1" {
answerUpdated, err := o.ocrV1Instances[0].ParseEventAnswerUpdated(event)
if err != nil {
o.log.Error().Err(err).Msg("Error fetching logs")
o.log.Warn().
Err(err).
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Msg("Error parsing event as AnswerUpdated")
continue
}

// Process the fetched logs
for _, event := range logs {
if o.OCRVersion == "1" {
answerUpdated, err := o.ocrV1Instances[0].ParseEventAnswerUpdated(event)
if err != nil {
o.log.Warn().
Err(err).
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Msg("Error parsing event as AnswerUpdated")
continue
}
o.log.Info().
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Uint64("Round ID", answerUpdated.RoundId.Uint64()).
Int64("Answer", answerUpdated.Current.Int64()).
Msg("Answer Updated Event")
} else if o.OCRVersion == "2" {
answerUpdated, err := o.ocrV2Instances[0].ParseEventAnswerUpdated(event)
if err != nil {
o.log.Warn().
Err(err).
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Msg("Error parsing event as AnswerUpdated")
continue
}
o.log.Info().
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Uint64("Round ID", answerUpdated.RoundId.Uint64()).
Int64("Answer", answerUpdated.Current.Int64()).
Msg("Answer Updated Event")
}
if *eventCounter <= expectedEvents {
o.log.Info().
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Uint64("Round ID", answerUpdated.RoundId.Uint64()).
Int64("Answer", answerUpdated.Current.Int64()).
Msg("Answer Updated Event")
} else {
o.log.Error().
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Uint64("Round ID", answerUpdated.RoundId.Uint64()).
Int64("Answer", answerUpdated.Current.Int64()).
Msg("Excess event detected, beyond expected count")
}
} else if o.OCRVersion == "2" {
answerUpdated, err := o.ocrV2Instances[0].ParseEventAnswerUpdated(event)
if err != nil {
o.log.Warn().
Err(err).
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Msg("Error parsing event as AnswerUpdated")
continue
}
if *eventCounter <= expectedEvents {
o.log.Info().
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Uint64("Round ID", answerUpdated.RoundId.Uint64()).
Int64("Answer", answerUpdated.Current.Int64()).
Msg("Answer Updated Event")
} else {
o.log.Error().
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Uint64("Round ID", answerUpdated.RoundId.Uint64()).
Int64("Answer", answerUpdated.Current.Int64()).
Msg("Excess event detected, beyond expected count")
}

processedBlockNum = latestBlock
}
}
*processedBlockNum = latestBlock
}

// triggers a new OCR round by setting a new mock adapter value
Expand Down Expand Up @@ -980,6 +1048,9 @@ func (o *OCRSoakTest) collectEvents() error {
o.ocrRoundStates[len(o.ocrRoundStates)-1].EndTime = start // Set end time for last expected event
o.log.Info().Msg("Collecting on-chain events")

// Set from block to be starting block before filtering
o.filterQuery.FromBlock = big.NewInt(0).SetUint64(o.startingBlockNum)

// We must retrieve the events, use exponential backoff for timeout to retry
timeout := time.Second * 15
o.log.Info().Interface("Filter Query", o.filterQuery).Str("Timeout", timeout.String()).Msg("Retrieving on-chain events")
Expand Down

0 comments on commit 62bcd56

Please sign in to comment.