Skip to content

Commit

Permalink
subscribe to headers in benchmark test to observe upkeeps
Browse files Browse the repository at this point in the history
  • Loading branch information
Tofel committed Apr 19, 2024
1 parent e429fb9 commit 7e77724
Show file tree
Hide file tree
Showing 3 changed files with 269 additions and 203 deletions.
184 changes: 184 additions & 0 deletions integration-tests/contracts/ethereum_contracts_automation_seth.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package contracts

import (
"context"
"errors"
"fmt"
"math/big"
"strconv"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/smartcontractkit/chainlink-testing-framework/networks"
"github.com/smartcontractkit/chainlink/integration-tests/contracts/ethereum"
eth_contracts "github.com/smartcontractkit/chainlink/integration-tests/contracts/ethereum"
"github.com/smartcontractkit/chainlink/integration-tests/testreporters"
"github.com/smartcontractkit/chainlink/integration-tests/wrappers"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/arbitrum_module"
Expand Down Expand Up @@ -2296,3 +2298,185 @@ func DeployKeeperConsumerBenchmark(client *seth.Client) (AutomationConsumerBench
address: &data.Address,
}, nil
}

// KeeperConsumerBenchmarkUpkeepObserver is a header subscription that awaits for a round of upkeeps
type KeeperConsumerBenchmarkUpkeepObserver struct {
instance AutomationConsumerBenchmark
registry KeeperRegistry
upkeepID *big.Int

firstBlockNum uint64 // Records the number of the first block that came in
lastBlockNum uint64 // Records the number of the last block that came in
blockRange int64 // How many blocks to watch upkeeps for
upkeepSLA int64 // SLA after which an upkeep is counted as 'missed'
metricsReporter *testreporters.KeeperBenchmarkTestReporter // Testreporter to track results
upkeepIndex int64
firstEligibleBuffer int64

// State variables, changes as we get blocks
blocksSinceSubscription int64 // How many blocks have passed since subscribing
blocksSinceEligible int64 // How many blocks have come in since upkeep has been eligible for check
countEligible int64 // Number of times the upkeep became eligible
countMissed int64 // Number of times we missed SLA for performing upkeep
upkeepCount int64 // The count of upkeeps done so far
allCheckDelays []int64 // Tracks the amount of blocks missed before an upkeep since it became eligible
complete bool
l zerolog.Logger
}

// NewKeeperConsumerBenchmarkkUpkeepObserver provides a new instance of a KeeperConsumerBenchmarkkUpkeepObserver
// Used to track and log benchmark test results for keepers
func NewKeeperConsumerBenchmarkkUpkeepObserver(
contract AutomationConsumerBenchmark,
registry KeeperRegistry,
upkeepID *big.Int,
blockRange int64,
upkeepSLA int64,
metricsReporter *testreporters.KeeperBenchmarkTestReporter,
upkeepIndex int64,
firstEligibleBuffer int64,
logger zerolog.Logger,
) *KeeperConsumerBenchmarkUpkeepObserver {
return &KeeperConsumerBenchmarkUpkeepObserver{
instance: contract,
registry: registry,
upkeepID: upkeepID,
blockRange: blockRange,
upkeepSLA: upkeepSLA,
blocksSinceSubscription: 0,
blocksSinceEligible: 0,
upkeepCount: 0,
allCheckDelays: []int64{},
metricsReporter: metricsReporter,
complete: false,
lastBlockNum: 0,
upkeepIndex: upkeepIndex,
firstBlockNum: 0,
firstEligibleBuffer: firstEligibleBuffer,
l: logger,
}
}

// ReceiveHeader will query the latest Keeper round and check to see whether upkeep was performed, it returns
// true when observation has finished.
func (o *KeeperConsumerBenchmarkUpkeepObserver) ReceiveHeader(receivedHeader *types.Header) (bool, error) {
if receivedHeader.Number.Uint64() <= o.lastBlockNum { // Uncle / reorg we won't count
return false, nil
}
if o.firstBlockNum == 0 {
o.firstBlockNum = receivedHeader.Number.Uint64()
}
o.lastBlockNum = receivedHeader.Number.Uint64()
// Increment block counters
o.blocksSinceSubscription++

upkeepCount, err := o.instance.GetUpkeepCount(context.Background(), big.NewInt(o.upkeepIndex))
if err != nil {
return false, err
}

if upkeepCount.Int64() > o.upkeepCount { // A new upkeep was done
if upkeepCount.Int64() != o.upkeepCount+1 {
return false, errors.New("upkeep count increased by more than 1 in a single block")
}
o.l.Info().
Uint64("Block_Number", receivedHeader.Number.Uint64()).
Str("Upkeep_ID", o.upkeepID.String()).
Str("Contract_Address", o.instance.Address()).
Int64("Upkeep_Count", upkeepCount.Int64()).
Int64("Blocks_since_eligible", o.blocksSinceEligible).
Str("Registry_Address", o.registry.Address()).
Msg("Upkeep Performed")

if o.blocksSinceEligible > o.upkeepSLA {
o.l.Warn().
Uint64("Block_Number", receivedHeader.Number.Uint64()).
Str("Upkeep_ID", o.upkeepID.String()).
Str("Contract_Address", o.instance.Address()).
Int64("Blocks_since_eligible", o.blocksSinceEligible).
Str("Registry_Address", o.registry.Address()).
Msg("Upkeep Missed SLA")
o.countMissed++
}

o.allCheckDelays = append(o.allCheckDelays, o.blocksSinceEligible)
o.upkeepCount++
o.blocksSinceEligible = 0
}

isEligible, err := o.instance.CheckEligible(context.Background(), big.NewInt(o.upkeepIndex), big.NewInt(o.blockRange), big.NewInt(o.firstEligibleBuffer))
if err != nil {
return false, err
}
if isEligible {
if o.blocksSinceEligible == 0 {
// First time this upkeep became eligible
o.countEligible++
o.l.Info().
Uint64("Block_Number", receivedHeader.Number.Uint64()).
Str("Upkeep_ID", o.upkeepID.String()).
Str("Contract_Address", o.instance.Address()).
Str("Registry_Address", o.registry.Address()).
Msg("Upkeep Now Eligible")
}
o.blocksSinceEligible++
}

if o.blocksSinceSubscription >= o.blockRange || int64(o.lastBlockNum-o.firstBlockNum) >= o.blockRange {
if o.blocksSinceEligible > 0 {
if o.blocksSinceEligible > o.upkeepSLA {
o.l.Warn().
Uint64("Block_Number", receivedHeader.Number.Uint64()).
Str("Upkeep_ID", o.upkeepID.String()).
Str("Contract_Address", o.instance.Address()).
Int64("Blocks_since_eligible", o.blocksSinceEligible).
Str("Registry_Address", o.registry.Address()).
Msg("Upkeep remained eligible at end of test and missed SLA")
o.countMissed++
} else {
o.l.Info().
Uint64("Block_Number", receivedHeader.Number.Uint64()).
Str("Upkeep_ID", o.upkeepID.String()).
Str("Contract_Address", o.instance.Address()).
Int64("Upkeep_Count", upkeepCount.Int64()).
Int64("Blocks_since_eligible", o.blocksSinceEligible).
Str("Registry_Address", o.registry.Address()).
Msg("Upkeep remained eligible at end of test and was within SLA")
}
o.allCheckDelays = append(o.allCheckDelays, o.blocksSinceEligible)
}

o.l.Info().
Uint64("Block_Number", receivedHeader.Number.Uint64()).
Str("Upkeep_ID", o.upkeepID.String()).
Str("Contract_Address", o.instance.Address()).
Int64("Upkeeps_Performed", upkeepCount.Int64()).
Int64("Total_Blocks_Watched", o.blocksSinceSubscription).
Str("Registry_Address", o.registry.Address()).
Msg("Finished Watching for Upkeeps")

o.complete = true
return true, nil
}
return false, nil
}

// Complete returns whether watching for upkeeps has completed
func (o *KeeperConsumerBenchmarkUpkeepObserver) Complete() bool {
return o.complete
}

// LogDetails logs the results of the benchmark test to testreporter
func (o *KeeperConsumerBenchmarkUpkeepObserver) LogDetails() {
report := testreporters.KeeperBenchmarkTestReport{
ContractAddress: o.instance.Address(),
TotalEligibleCount: o.countEligible,
TotalSLAMissedUpkeeps: o.countMissed,
TotalPerformedUpkeeps: o.upkeepCount,
AllCheckDelays: o.allCheckDelays,
RegistryAddress: o.registry.Address(),
}
o.metricsReporter.ReportMutex.Lock()
o.metricsReporter.Reports = append(o.metricsReporter.Reports, report)
defer o.metricsReporter.ReportMutex.Unlock()
}
20 changes: 10 additions & 10 deletions integration-tests/contracts/ethereum_keeper_contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -1595,8 +1595,8 @@ func (o *KeeperConsumerPerformanceRoundConfirmer) logDetails() {
defer o.metricsReporter.ReportMutex.Unlock()
}

// KeeperConsumerBenchmarkRoundConfirmer is a header subscription that awaits for a round of upkeeps
type KeeperConsumerBenchmarkRoundConfirmer struct {
// LegacyKeeperConsumerBenchmarkRoundConfirmer is a header subscription that awaits for a round of upkeeps
type LegacyKeeperConsumerBenchmarkRoundConfirmer struct {
instance AutomationConsumerBenchmark
registry KeeperRegistry
upkeepID *big.Int
Expand All @@ -1623,9 +1623,9 @@ type KeeperConsumerBenchmarkRoundConfirmer struct {
l zerolog.Logger
}

// NewKeeperConsumerBenchmarkRoundConfirmer provides a new instance of a KeeperConsumerBenchmarkRoundConfirmer
// NewLegacyKeeperConsumerBenchmarkRoundConfirmer provides a new instance of a LegacyKeeperConsumerBenchmarkRoundConfirmer
// Used to track and log benchmark test results for keepers
func NewKeeperConsumerBenchmarkRoundConfirmer(
func NewLegacyKeeperConsumerBenchmarkRoundConfirmer(
contract AutomationConsumerBenchmark,
registry KeeperRegistry,
upkeepID *big.Int,
Expand All @@ -1635,9 +1635,9 @@ func NewKeeperConsumerBenchmarkRoundConfirmer(
upkeepIndex int64,
firstEligibleBuffer int64,
logger zerolog.Logger,
) *KeeperConsumerBenchmarkRoundConfirmer {
) *LegacyKeeperConsumerBenchmarkRoundConfirmer {
ctx, cancelFunc := context.WithCancel(context.Background())
return &KeeperConsumerBenchmarkRoundConfirmer{
return &LegacyKeeperConsumerBenchmarkRoundConfirmer{
instance: contract,
registry: registry,
upkeepID: upkeepID,
Expand All @@ -1661,7 +1661,7 @@ func NewKeeperConsumerBenchmarkRoundConfirmer(
}

// ReceiveHeader will query the latest Keeper round and check to see whether the round has confirmed
func (o *KeeperConsumerBenchmarkRoundConfirmer) ReceiveHeader(receivedHeader blockchain.NodeHeader) error {
func (o *LegacyKeeperConsumerBenchmarkRoundConfirmer) ReceiveHeader(receivedHeader blockchain.NodeHeader) error {
if receivedHeader.Number.Uint64() <= o.lastBlockNum { // Uncle / reorg we won't count
return nil
}
Expand Down Expand Up @@ -1765,7 +1765,7 @@ func (o *KeeperConsumerBenchmarkRoundConfirmer) ReceiveHeader(receivedHeader blo
}

// Wait is a blocking function that will wait until the round has confirmed, and timeout if the deadline has passed
func (o *KeeperConsumerBenchmarkRoundConfirmer) Wait() error {
func (o *LegacyKeeperConsumerBenchmarkRoundConfirmer) Wait() error {
defer func() { o.complete = true }()
for {
select {
Expand All @@ -1779,11 +1779,11 @@ func (o *KeeperConsumerBenchmarkRoundConfirmer) Wait() error {
}
}

func (o *KeeperConsumerBenchmarkRoundConfirmer) Complete() bool {
func (o *LegacyKeeperConsumerBenchmarkRoundConfirmer) Complete() bool {
return o.complete
}

func (o *KeeperConsumerBenchmarkRoundConfirmer) logDetails() {
func (o *LegacyKeeperConsumerBenchmarkRoundConfirmer) logDetails() {
report := testreporters.KeeperBenchmarkTestReport{
ContractAddress: o.instance.Address(),
TotalEligibleCount: o.countEligible,
Expand Down
Loading

0 comments on commit 7e77724

Please sign in to comment.