Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
davidcauchi committed Oct 29, 2024
1 parent 8767282 commit accbfb1
Showing 1 changed file with 17 additions and 81 deletions.
98 changes: 17 additions & 81 deletions integration-tests/testsetups/ocr.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

geth "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/pelletier/go-toml/v2"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -619,7 +618,7 @@ func (o *OCRSoakTest) testLoop(testDuration time.Duration, newValue int) {
newRoundTrigger := time.NewTimer(0) // Want to trigger a new round ASAP
defer newRoundTrigger.Stop()
o.setFilterQuery()
err := o.observeOCREventsPolling(endTest)
err := o.pollingOCREvents(endTest)
require.NoError(o.t, err, "Error setting up polling for OCR events")

n := o.Config.GetNetworkConfig()
Expand Down Expand Up @@ -824,90 +823,18 @@ func (o *OCRSoakTest) setFilterQuery() {
Msg("Filter Query Set")
}

// observeOCREvents subscribes to OCR events and logs them to the test logger
// WARNING: Should only be used for observation and logging. This is not a reliable way to collect events.
func (o *OCRSoakTest) observeOCREvents() error {
eventLogs := make(chan types.Log)
ctx, cancel := context.WithTimeout(testcontext.Get(o.t), 5*time.Second)
eventSub, err := o.seth.Client.SubscribeFilterLogs(ctx, o.filterQuery, eventLogs)
cancel()
if err != nil {
return err
}

go func() {
for {
select {
case event := <-eventLogs:
if o.OCRVersion == "1" {
answerUpdated, err := o.ocrV1Instances[0].ParseEventAnswerUpdated(event)
if err != nil {
o.log.Warn().
Err(err).
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Msg("Error parsing event as AnswerUpdated")
continue
}
o.log.Info().
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Uint64("Round ID", answerUpdated.RoundId.Uint64()).
Int64("Answer", answerUpdated.Current.Int64()).
Msg("Answer Updated Event")
} else if o.OCRVersion == "2" {
answerUpdated, err := o.ocrV2Instances[0].ParseEventAnswerUpdated(event)
if err != nil {
o.log.Warn().
Err(err).
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Msg("Error parsing event as AnswerUpdated")
continue
}
o.log.Info().
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Uint64("Round ID", answerUpdated.RoundId.Uint64()).
Int64("Answer", answerUpdated.Current.Int64()).
Msg("Answer Updated Event")
}
case err = <-eventSub.Err():
backoff := time.Second
for err != nil {
o.log.Info().
Err(err).
Str("Backoff", backoff.String()).
Interface("Query", o.filterQuery).
Msg("Error while subscribed to OCR Logs. Resubscribing")
ctx, cancel = context.WithTimeout(testcontext.Get(o.t), backoff)
eventSub, err = o.seth.Client.SubscribeFilterLogs(ctx, o.filterQuery, eventLogs)
cancel()
if err != nil {
time.Sleep(backoff)
backoff = time.Duration(math.Min(float64(backoff)*2, float64(30*time.Second)))
}
}
}
}
}()

return nil
}

func (o *OCRSoakTest) observeOCREventsPolling(endTest <-chan time.Time) error {
// pollingOCREvents Polls the blocks for OCR events and logs them to the test logger
func (o *OCRSoakTest) pollingOCREvents(endTest <-chan time.Time) error {
// Keep track of the last processed block number
startingBlockNum := o.startingBlockNum
// Initialized to an invalid block number (max value of uint64)
lastCheckedBlockNum := ^uint64(0)
// Polling interval can be customized in the test config, defaulting to 30 seconds
pollInterval := time.Second * 30
ticker := time.NewTicker(pollInterval)
defer ticker.Stop()

// Start polling in a separate goroutine
go func() {
o.log.Info().Msg("Start Polling for OCR Events")
pollInterval := time.Second * 30
ticker := time.NewTicker(pollInterval)
defer ticker.Stop()
o.log.Info().Msg("Start Polling for Answer Updated Events")
for {
select {
case <-endTest:
Expand All @@ -929,12 +856,21 @@ func (o *OCRSoakTest) observeOCREventsPolling(endTest <-chan time.Time) error {
Msg("No new blocks since last poll")
continue
}
// Check if the latest block is behind the starting block just in case
if startingBlockNum > latestBlock {
o.log.Error().
Uint64("From Block", startingBlockNum).
Uint64("To Block", latestBlock).
Msg("The latest block is behind the starting block. This could happen due to RPC issues or possibly a reorg")
// May need to `startingBlockNum = latestBlock` if this happens frequently due to reorgs
continue
}

// Prepare the filter query with updated block range
o.filterQuery.FromBlock = big.NewInt(0).SetUint64(startingBlockNum)
o.filterQuery.ToBlock = big.NewInt(0).SetUint64(latestBlock)

o.log.Info().
o.log.Debug().
Uint64("From Block", startingBlockNum).
Uint64("To Block", latestBlock).
Msg("Fetching logs for the specified range")
Expand Down

0 comments on commit accbfb1

Please sign in to comment.