From 342dabdac38d8d10d87093d2c599a6c964d58e78 Mon Sep 17 00:00:00 2001 From: anirudhwarrier <12178754+anirudhwarrier@users.noreply.github.com> Date: Wed, 15 Nov 2023 01:09:40 +0400 Subject: [PATCH] fix log filter context and batching --- .../automationv2_1/automationv2_1_test.go | 80 +++++++++---------- 1 file changed, 37 insertions(+), 43 deletions(-) diff --git a/integration-tests/load/automationv2_1/automationv2_1_test.go b/integration-tests/load/automationv2_1/automationv2_1_test.go index 04099c427d4..865e4d699dd 100644 --- a/integration-tests/load/automationv2_1/automationv2_1_test.go +++ b/integration-tests/load/automationv2_1/automationv2_1_test.go @@ -25,7 +25,6 @@ import ( ocr3 "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3confighelper" ocr2keepers30config "github.com/smartcontractkit/ocr2keepers/pkg/v3/config" "github.com/smartcontractkit/wasp" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "math/big" "os" @@ -35,6 +34,11 @@ import ( "time" ) +const ( + StartupWaitTime = 30 * time.Second + StopWaitTime = 60 * time.Second +) + var ( baseTOML = `[Feature] LogPoller = true @@ -125,7 +129,6 @@ var ( numberOfEvents, _ = strconv.Atoi(getEnv("NUMBEROFEVENTS", "1")) // Number of events to emit per trigger specType = getEnv("SPECTYPE", "minimum") // minimum, recommended, local specs for the test logLevel = getEnv("LOGLEVEL", "info") // log level for the chainlink nodes - debug, _ = strconv.ParseBool(getEnv("DEBUG", "false")) ) func TestLogTrigger(t *testing.T) { @@ -143,16 +146,16 @@ func TestLogTrigger(t *testing.T) { networkDetailTOML := `MinIncomingConfirmations = 1` loadDuration := time.Duration(duration) * time.Second automationDefaultLinkFunds := big.NewInt(0).Mul(big.NewInt(1e18), big.NewInt(int64(100))) //100 LINK - automationDefaultUpkeepGasLimit := uint32(2500000) + automationDefaultUpkeepGasLimit := uint32(1_000_000) registrySettings := &contracts.KeeperRegistrySettings{ PaymentPremiumPPB: uint32(0), - FlatFeeMicroLINK: uint32(40000), + FlatFeeMicroLINK: uint32(40_000), BlockCountPerTurn: big.NewInt(100), CheckGasLimit: uint32(45_000_000), //45M StalenessSeconds: big.NewInt(90_000), GasCeilingMultiplier: uint16(2), - MaxPerformGas: uint32(5000000), + MaxPerformGas: uint32(5_000_000), MinUpkeepSpend: big.NewInt(0), FallbackGasPrice: big.NewInt(2e11), FallbackLinkPrice: big.NewInt(2e18), @@ -275,7 +278,7 @@ func TestLogTrigger(t *testing.T) { offC, err := json.Marshal(ocr2keepers30config.OffchainConfig{ TargetProbability: "0.999", TargetInRounds: 1, - PerformLockoutWindow: 3600000, // Intentionally set to be higher than in prod for testing purpose + PerformLockoutWindow: 3_600_000, // Intentionally set to be higher than in prod for testing purpose GasLimitPerReport: 5_300_000, GasOverheadPerUpkeep: 300_000, MinConfirmations: 0, @@ -428,10 +431,10 @@ func TestLogTrigger(t *testing.T) { } l.Info().Msg("Successfully registered all Automation Consumer Contracts") l.Info().Interface("Upkeep IDs", upkeepIds).Msg("Upkeep IDs") - l.Info().Msg("Waiting 30s for plugin to start") - time.Sleep(time.Second * 30) + l.Info().Str("STARTUP_WAIT_TIME", StartupWaitTime.String()).Msg("Waiting for plugin to start") + time.Sleep(StartupWaitTime) - startingBlock, err := chainClient.LatestBlockNumber(context.Background()) + startBlock, err := chainClient.LatestBlockNumber(context.Background()) require.NoError(t, err, "Error getting latest block number") p := wasp.NewProfile() @@ -452,6 +455,7 @@ func TestLogTrigger(t *testing.T) { l, numberOfEvents, ), + CallResultBufLen: 1000000, }) p.Add(g, err) } @@ -462,40 +466,27 @@ func TestLogTrigger(t *testing.T) { require.NoError(t, err, "Error running load generators") l.Info().Msg("Finished load generators") - l.Info().Msg("Waiting for upkeeps to be performed") - time.Sleep(time.Second * 60) + l.Info().Str("STOP_WAIT_TIME", StopWaitTime.String()).Msg("Waiting for upkeeps to be performed") + time.Sleep(StopWaitTime) l.Info().Msg("Finished waiting 60s for upkeeps to be performed") endTime := time.Now() testDuration := endTime.Sub(startTime) - l.Info().Dur("Duration", testDuration).Msg("Test Duration") + l.Info().Str("Duration", testDuration.String()).Msg("Test Duration") endBlock, err := chainClient.LatestBlockNumber(context.Background()) require.NoError(t, err, "Error getting latest block number") - l.Info().Uint64("Starting Block", startingBlock).Uint64("Ending Block", endBlock).Msg("Test Block Range") + l.Info().Uint64("Starting Block", startBlock).Uint64("Ending Block", endBlock).Msg("Test Block Range") - upkeepCounters := make([]int64, 0) upkeepDelays := make([][]int64, 0) + var numberOfEventsEmitted int + var batchSize = 500 - if debug { - for i, consumerContract := range consumerContracts { - count, err := consumerContract.Counter(nil) - require.NoError(t, err, "Error getting counter value") - upkeepCounters = append(upkeepCounters, count.Int64()) - l.Debug(). - Int("Count", int(count.Int64())). - Int("Number", i+1). - Int("Out Of", numberOfUpkeeps). - Msg("Counter Value") - assert.GreaterOrEqual( - t, count.Int64(), int64(numberOfEvents*duration+1), - fmt.Sprintf("Upkeep %d should have been performed at least %d times", i, numberOfEvents*duration+1)) - } - l.Info().Interface("Upkeep Counters", upkeepCounters).Msg("Upkeep Counters") + for _, gen := range p.Generators { + numberOfEventsEmitted += len(gen.GetData().OKData.Data) } + l.Info().Int("Number of Events Emitted", numberOfEventsEmitted).Msg("Number of Events Emitted") - var batchSize = 100 - - if endBlock-startingBlock < uint64(batchSize) { - batchSize = int(endBlock - startingBlock) + if endBlock-startBlock < uint64(batchSize) { + batchSize = int(endBlock - startBlock) } for cIter, consumerContract := range consumerContracts { @@ -504,14 +495,16 @@ func TestLogTrigger(t *testing.T) { address = common.HexToAddress(consumerContract.Address()) timeout = 5 * time.Second ) - ctx, cancel := context.WithTimeout(context.Background(), timeout) - for fromBlock := startingBlock; fromBlock < endBlock; fromBlock += uint64(batchSize) + 1 { - filterQuery := geth.FilterQuery{ - Addresses: []common.Address{address}, - FromBlock: big.NewInt(0).SetUint64(fromBlock), - ToBlock: big.NewInt(0).SetUint64(fromBlock + uint64(batchSize)), - Topics: [][]common.Hash{{consumerABI.Events["PerformingUpkeep"].ID}}, - } + for fromBlock := startBlock; fromBlock < endBlock; fromBlock += uint64(batchSize) + 1 { + var ( + filterQuery = geth.FilterQuery{ + Addresses: []common.Address{address}, + FromBlock: big.NewInt(0).SetUint64(fromBlock), + ToBlock: big.NewInt(0).SetUint64(fromBlock + uint64(batchSize)), + Topics: [][]common.Hash{{consumerABI.Events["PerformingUpkeep"].ID}}, + } + ) + ctx, cancel := context.WithTimeout(context.Background(), timeout) logsInBatch, err := chainClient.FilterLogs(ctx, filterQuery) cancel() if err != nil { @@ -543,7 +536,7 @@ func TestLogTrigger(t *testing.T) { upkeepDelays = append(upkeepDelays, delay) } if (cIter+1)%batchSize == 0 { - time.Sleep(time.Millisecond * 500) + time.Sleep(time.Second * 1) } } @@ -563,7 +556,8 @@ func TestLogTrigger(t *testing.T) { l.Info(). Int("Total Perform Count", len(allUpkeepDelays)). - Int("Total Events Emitted", numberOfEvents*numberOfUpkeeps*duration+numberOfUpkeeps). + Int("Total Events Emitted", numberOfEventsEmitted). + Int("Total Events Missed", numberOfEventsEmitted-len(allUpkeepDelays)). Msg("Test completed") t.Cleanup(func() {