Skip to content

Commit

Permalink
fix log filter context and batching
Browse files Browse the repository at this point in the history
  • Loading branch information
anirudhwarrier committed Nov 14, 2023
1 parent 725e6f5 commit 342dabd
Showing 1 changed file with 37 additions and 43 deletions.
80 changes: 37 additions & 43 deletions integration-tests/load/automationv2_1/automationv2_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
ocr3 "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3confighelper"
ocr2keepers30config "github.com/smartcontractkit/ocr2keepers/pkg/v3/config"
"github.com/smartcontractkit/wasp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"math/big"
"os"
Expand All @@ -35,6 +34,11 @@ import (
"time"
)

const (
StartupWaitTime = 30 * time.Second
StopWaitTime = 60 * time.Second
)

var (
baseTOML = `[Feature]
LogPoller = true
Expand Down Expand Up @@ -125,7 +129,6 @@ var (
numberOfEvents, _ = strconv.Atoi(getEnv("NUMBEROFEVENTS", "1")) // Number of events to emit per trigger
specType = getEnv("SPECTYPE", "minimum") // minimum, recommended, local specs for the test
logLevel = getEnv("LOGLEVEL", "info") // log level for the chainlink nodes
debug, _ = strconv.ParseBool(getEnv("DEBUG", "false"))
)

func TestLogTrigger(t *testing.T) {
Expand All @@ -143,16 +146,16 @@ func TestLogTrigger(t *testing.T) {
networkDetailTOML := `MinIncomingConfirmations = 1`
loadDuration := time.Duration(duration) * time.Second
automationDefaultLinkFunds := big.NewInt(0).Mul(big.NewInt(1e18), big.NewInt(int64(100))) //100 LINK
automationDefaultUpkeepGasLimit := uint32(2500000)
automationDefaultUpkeepGasLimit := uint32(1_000_000)

registrySettings := &contracts.KeeperRegistrySettings{
PaymentPremiumPPB: uint32(0),
FlatFeeMicroLINK: uint32(40000),
FlatFeeMicroLINK: uint32(40_000),
BlockCountPerTurn: big.NewInt(100),
CheckGasLimit: uint32(45_000_000), //45M
StalenessSeconds: big.NewInt(90_000),
GasCeilingMultiplier: uint16(2),
MaxPerformGas: uint32(5000000),
MaxPerformGas: uint32(5_000_000),
MinUpkeepSpend: big.NewInt(0),
FallbackGasPrice: big.NewInt(2e11),
FallbackLinkPrice: big.NewInt(2e18),
Expand Down Expand Up @@ -275,7 +278,7 @@ func TestLogTrigger(t *testing.T) {
offC, err := json.Marshal(ocr2keepers30config.OffchainConfig{
TargetProbability: "0.999",
TargetInRounds: 1,
PerformLockoutWindow: 3600000, // Intentionally set to be higher than in prod for testing purpose
PerformLockoutWindow: 3_600_000, // Intentionally set to be higher than in prod for testing purpose
GasLimitPerReport: 5_300_000,
GasOverheadPerUpkeep: 300_000,
MinConfirmations: 0,
Expand Down Expand Up @@ -428,10 +431,10 @@ func TestLogTrigger(t *testing.T) {
}
l.Info().Msg("Successfully registered all Automation Consumer Contracts")
l.Info().Interface("Upkeep IDs", upkeepIds).Msg("Upkeep IDs")
l.Info().Msg("Waiting 30s for plugin to start")
time.Sleep(time.Second * 30)
l.Info().Str("STARTUP_WAIT_TIME", StartupWaitTime.String()).Msg("Waiting for plugin to start")
time.Sleep(StartupWaitTime)

startingBlock, err := chainClient.LatestBlockNumber(context.Background())
startBlock, err := chainClient.LatestBlockNumber(context.Background())
require.NoError(t, err, "Error getting latest block number")

p := wasp.NewProfile()
Expand All @@ -452,6 +455,7 @@ func TestLogTrigger(t *testing.T) {
l,
numberOfEvents,
),
CallResultBufLen: 1000000,
})
p.Add(g, err)
}
Expand All @@ -462,40 +466,27 @@ func TestLogTrigger(t *testing.T) {
require.NoError(t, err, "Error running load generators")

l.Info().Msg("Finished load generators")
l.Info().Msg("Waiting for upkeeps to be performed")
time.Sleep(time.Second * 60)
l.Info().Str("STOP_WAIT_TIME", StopWaitTime.String()).Msg("Waiting for upkeeps to be performed")
time.Sleep(StopWaitTime)
l.Info().Msg("Finished waiting 60s for upkeeps to be performed")
endTime := time.Now()
testDuration := endTime.Sub(startTime)
l.Info().Dur("Duration", testDuration).Msg("Test Duration")
l.Info().Str("Duration", testDuration.String()).Msg("Test Duration")
endBlock, err := chainClient.LatestBlockNumber(context.Background())
require.NoError(t, err, "Error getting latest block number")
l.Info().Uint64("Starting Block", startingBlock).Uint64("Ending Block", endBlock).Msg("Test Block Range")
l.Info().Uint64("Starting Block", startBlock).Uint64("Ending Block", endBlock).Msg("Test Block Range")

upkeepCounters := make([]int64, 0)
upkeepDelays := make([][]int64, 0)
var numberOfEventsEmitted int
var batchSize = 500

if debug {
for i, consumerContract := range consumerContracts {
count, err := consumerContract.Counter(nil)
require.NoError(t, err, "Error getting counter value")
upkeepCounters = append(upkeepCounters, count.Int64())
l.Debug().
Int("Count", int(count.Int64())).
Int("Number", i+1).
Int("Out Of", numberOfUpkeeps).
Msg("Counter Value")
assert.GreaterOrEqual(
t, count.Int64(), int64(numberOfEvents*duration+1),
fmt.Sprintf("Upkeep %d should have been performed at least %d times", i, numberOfEvents*duration+1))
}
l.Info().Interface("Upkeep Counters", upkeepCounters).Msg("Upkeep Counters")
for _, gen := range p.Generators {
numberOfEventsEmitted += len(gen.GetData().OKData.Data)
}
l.Info().Int("Number of Events Emitted", numberOfEventsEmitted).Msg("Number of Events Emitted")

var batchSize = 100

if endBlock-startingBlock < uint64(batchSize) {
batchSize = int(endBlock - startingBlock)
if endBlock-startBlock < uint64(batchSize) {
batchSize = int(endBlock - startBlock)
}

for cIter, consumerContract := range consumerContracts {
Expand All @@ -504,14 +495,16 @@ func TestLogTrigger(t *testing.T) {
address = common.HexToAddress(consumerContract.Address())
timeout = 5 * time.Second
)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
for fromBlock := startingBlock; fromBlock < endBlock; fromBlock += uint64(batchSize) + 1 {
filterQuery := geth.FilterQuery{
Addresses: []common.Address{address},
FromBlock: big.NewInt(0).SetUint64(fromBlock),
ToBlock: big.NewInt(0).SetUint64(fromBlock + uint64(batchSize)),
Topics: [][]common.Hash{{consumerABI.Events["PerformingUpkeep"].ID}},
}
for fromBlock := startBlock; fromBlock < endBlock; fromBlock += uint64(batchSize) + 1 {
var (
filterQuery = geth.FilterQuery{
Addresses: []common.Address{address},
FromBlock: big.NewInt(0).SetUint64(fromBlock),
ToBlock: big.NewInt(0).SetUint64(fromBlock + uint64(batchSize)),
Topics: [][]common.Hash{{consumerABI.Events["PerformingUpkeep"].ID}},
}
)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
logsInBatch, err := chainClient.FilterLogs(ctx, filterQuery)
cancel()
if err != nil {
Expand Down Expand Up @@ -543,7 +536,7 @@ func TestLogTrigger(t *testing.T) {
upkeepDelays = append(upkeepDelays, delay)
}
if (cIter+1)%batchSize == 0 {
time.Sleep(time.Millisecond * 500)
time.Sleep(time.Second * 1)
}
}

Expand All @@ -563,7 +556,8 @@ func TestLogTrigger(t *testing.T) {

l.Info().
Int("Total Perform Count", len(allUpkeepDelays)).
Int("Total Events Emitted", numberOfEvents*numberOfUpkeeps*duration+numberOfUpkeeps).
Int("Total Events Emitted", numberOfEventsEmitted).
Int("Total Events Missed", numberOfEventsEmitted-len(allUpkeepDelays)).
Msg("Test completed")

t.Cleanup(func() {
Expand Down

0 comments on commit 342dabd

Please sign in to comment.