Skip to content

Commit

Permalink
fix track config delay in ocr soak
Browse files Browse the repository at this point in the history
  • Loading branch information
davidcauchi committed Nov 7, 2024
1 parent 340a6bf commit 6961ab1
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 90 deletions.
40 changes: 39 additions & 1 deletion integration-tests/actions/ocr2_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"crypto/ed25519"
"encoding/hex"
"fmt"
"net/http"
"strings"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/google/uuid"
"github.com/lib/pq"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"golang.org/x/sync/errgroup"
"gopkg.in/guregu/null.v4"
Expand Down Expand Up @@ -192,6 +194,7 @@ func CreateOCRv2Jobs(
mockServerValue int, // Value to get from the mock server when querying the path
chainId int64, // EVM chain ID
forwardingAllowed bool,
l zerolog.Logger,
) error {
// Collect P2P ID
bootstrapP2PIds, err := bootstrapNode.MustReadP2PKeys()
Expand All @@ -218,6 +221,9 @@ func CreateOCRv2Jobs(
}
}

// Initialize map to store job IDs for each chainlink node
jobIDs := make(map[*nodeclient.ChainlinkK8sClient][]string)

for _, ocrInstance := range ocrInstances {
bootstrapSpec := &nodeclient.OCR2TaskJobSpec{
Name: fmt.Sprintf("ocr2-bootstrap-%s", ocrInstance.Address()),
Expand Down Expand Up @@ -284,10 +290,42 @@ func CreateOCRv2Jobs(
P2PV2Bootstrappers: pq.StringArray{p2pV2Bootstrapper}, // bootstrap node key and address <p2p-key>@bootstrap:6690
},
}
_, err = chainlinkNode.MustCreateJob(ocrSpec)
var ocrJob *nodeclient.Job
ocrJob, err = chainlinkNode.MustCreateJob(ocrSpec)
if err != nil {
return fmt.Errorf("creating OCR task job on OCR node have failed: %w", err)
}
jobIDs[chainlinkNode] = append(jobIDs[chainlinkNode], ocrJob.Data.ID) // Store each job ID per node
}
}
l.Info().Msg("Verify OCRv2 jobs have been created")
for chainlinkNode, ids := range jobIDs {
for _, jobID := range ids {
var retries = 4
var baseDelay = time.Second * 2
for i := 0; i < retries; i++ {
_, resp, err := chainlinkNode.ReadJob(jobID)
if err == nil && resp.StatusCode == http.StatusOK {
l.Info().
Str("Node", chainlinkNode.PodName).
Str("Job ID", jobID).
Msg("OCRv2 job successfully created")
break
}
if i == retries-1 {
return fmt.Errorf("failed to verify job creation for node %s, jobID %s after %d retries", chainlinkNode.PodName, jobID, retries)
}

delay := baseDelay << i // Exponential delay: baseDelay * 2^i
l.Debug().
Str("Node", chainlinkNode.PodName).
Str("Job ID", jobID).
Int("Attempt", i+1).
Dur("Delay", delay).
Msg("Exponential backoff: Waiting for next retry")

time.Sleep(delay)
}
}
}
return nil
Expand Down
238 changes: 149 additions & 89 deletions integration-tests/testsetups/ocr.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type OCRSoakTest struct {
startTime time.Time
timeLeft time.Duration
startingBlockNum uint64
startingValue int
testEnvironment *environment.Environment
namespace string
log zerolog.Logger
Expand All @@ -88,6 +89,8 @@ type OCRSoakTest struct {
ocrV2Instances []contracts.OffchainAggregatorV2
ocrV2InstanceMap map[string]contracts.OffchainAggregatorV2 // address : instance

linkContract *contracts.EthereumLinkToken

rpcNetwork blockchain.EVMNetwork // network configuration for the blockchain node
reorgHappened bool // flag to indicate if a reorg happened during the test
gasSpikeSimulationHappened bool // flag to indicate if a gas spike simulation happened during the test
Expand Down Expand Up @@ -270,107 +273,148 @@ func (o *OCRSoakTest) Environment() *environment.Environment {
return o.testEnvironment
}

// Setup initializes the OCR Soak Test by setting up clients, funding nodes, and deploying OCR contracts.
func (o *OCRSoakTest) Setup(ocrTestConfig tt.OcrTestConfig) {
o.initializeClients(ocrTestConfig)
o.fundChainlinkNodes()

o.startingValue = 5

var forwarders []common.Address
if o.OperatorForwarderFlow {
_, forwarders = o.deployForwarderContracts()
}

o.setupOCRContracts(ocrTestConfig, forwarders)
o.log.Info().Msg("OCR Soak Test Setup Complete")
}

// initializeClients sets up the Seth client, Chainlink nodes, and mock server.
func (o *OCRSoakTest) initializeClients(ocrTestConfig tt.OcrTestConfig) {
sethClient, err := seth_utils.GetChainClient(o.Config, o.rpcNetwork)
require.NoError(o.t, err, "Error creating seth client")
o.seth = sethClient

nodes, err := nodeclient.ConnectChainlinkNodes(o.testEnvironment)
require.NoError(o.t, err, "Connecting to chainlink nodes shouldn't fail")
o.bootstrapNode, o.workerNodes = nodes[0], nodes[1:]

o.mockServer = ctf_client.ConnectMockServer(o.testEnvironment)
require.NoError(o.t, err, "Creating mockserver clients shouldn't fail")

linkContract, err := actions.LinkTokenContract(o.log, sethClient, ocrTestConfig.GetActiveOCRConfig())
require.NoError(o.t, err, "Error loading/deploying link token contract")
o.linkContract = linkContract
}

// Fund Chainlink nodes, excluding the bootstrap node
// fundChainlinkNodes funds the Chainlink worker nodes.
func (o *OCRSoakTest) fundChainlinkNodes() {
o.log.Info().Float64("ETH amount per node", *o.Config.Common.ChainlinkNodeFunding).Msg("Funding Chainlink nodes")
err = actions.FundChainlinkNodesFromRootAddress(o.log, sethClient, contracts.ChainlinkK8sClientToChainlinkNodeWithKeysAndAddress(o.workerNodes), big.NewFloat(*o.Config.Common.ChainlinkNodeFunding))
err := actions.FundChainlinkNodesFromRootAddress(o.log, o.seth, contracts.ChainlinkK8sClientToChainlinkNodeWithKeysAndAddress(o.workerNodes), big.NewFloat(*o.Config.Common.ChainlinkNodeFunding))
require.NoError(o.t, err, "Error funding Chainlink nodes")
}

var forwarders []common.Address
if o.OperatorForwarderFlow {
var operators []common.Address
operators, forwarders, _ = actions.DeployForwarderContracts(
o.t, o.seth, common.HexToAddress(linkContract.Address()), len(o.workerNodes),
)
require.Equal(o.t, len(o.workerNodes), len(operators), "Number of operators should match number of nodes")
require.Equal(o.t, len(o.workerNodes), len(forwarders), "Number of authorized forwarders should match number of nodes")
forwarderNodesAddresses, err := actions.ChainlinkNodeAddresses(o.workerNodes)
require.NoError(o.t, err, "Retrieving on-chain wallet addresses for chainlink nodes shouldn't fail")
for i := range o.workerNodes {
actions.AcceptAuthorizedReceiversOperator(
o.t, o.log, o.seth, operators[i], forwarders[i], []common.Address{forwarderNodesAddresses[i]})
require.NoError(o.t, err, "Accepting Authorize Receivers on Operator shouldn't fail")
actions.TrackForwarder(o.t, o.seth, forwarders[i], o.workerNodes[i])
}
} else if o.OCRVersion == "1" {
if o.OperatorForwarderFlow {
o.ocrV1Instances, err = actions.DeployOCRContractsForwarderFlow(
o.log,
o.seth,
o.Config.GetActiveOCRConfig(),
common.HexToAddress(linkContract.Address()),
contracts.ChainlinkK8sClientToChainlinkNodeWithKeysAndAddress(o.workerNodes),
forwarders,
)
require.NoError(o.t, err, "Error deploying OCR Forwarder contracts")
} else {
o.ocrV1Instances, err = actions.SetupOCRv1Contracts(
o.log,
sethClient,
o.Config.GetActiveOCRConfig(),
common.HexToAddress(linkContract.Address()),
contracts.ChainlinkK8sClientToChainlinkNodeWithKeysAndAddress(o.workerNodes),
)
require.NoError(o.t, err)
}
} else if o.OCRVersion == "2" {
var transmitters []string
// deployForwarderContracts deploys forwarder contracts if OperatorForwarderFlow is enabled.
func (o *OCRSoakTest) deployForwarderContracts() (operators []common.Address, forwarders []common.Address) {
operators, forwarders, _ = actions.DeployForwarderContracts(
o.t, o.seth, common.HexToAddress(o.linkContract.Address()), len(o.workerNodes),
)
require.Equal(o.t, len(o.workerNodes), len(operators), "Number of operators should match number of nodes")
require.Equal(o.t, len(o.workerNodes), len(forwarders), "Number of authorized forwarders should match number of nodes")

forwarderNodesAddresses, err := actions.ChainlinkNodeAddresses(o.workerNodes)
require.NoError(o.t, err, "Retrieving on-chain wallet addresses for chainlink nodes shouldn't fail")
for i := range o.workerNodes {
actions.AcceptAuthorizedReceiversOperator(o.t, o.log, o.seth, operators[i], forwarders[i], []common.Address{forwarderNodesAddresses[i]})
require.NoError(o.t, err, "Accepting Authorize Receivers on Operator shouldn't fail")
actions.TrackForwarder(o.t, o.seth, forwarders[i], o.workerNodes[i])
}
return operators, forwarders
}

if o.OperatorForwarderFlow {
for _, forwarder := range forwarders {
transmitters = append(transmitters, forwarder.Hex())
}
} else {
for _, node := range o.workerNodes {
nodeAddress, err := node.PrimaryEthAddress()
require.NoError(o.t, err, "Error getting node's primary ETH address")
transmitters = append(transmitters, nodeAddress)
}
}
// setupOCRContracts deploys and configures OCR contracts based on the version and forwarder flow.
func (o *OCRSoakTest) setupOCRContracts(ocrTestConfig tt.OcrTestConfig, forwarders []common.Address) {
if o.OCRVersion == "1" {
o.setupOCRv1Contracts(forwarders)
} else if o.OCRVersion == "2" {
o.setupOCRv2Contracts(ocrTestConfig, forwarders)
}
}

ocrOffchainOptions := contracts.DefaultOffChainAggregatorOptions()
o.ocrV2Instances, err = actions.SetupOCRv2Contracts(
// setupOCRv1Contracts deploys and configures OCRv1 contracts based on the forwarder flow.
func (o *OCRSoakTest) setupOCRv1Contracts(forwarders []common.Address) {
var err error
if o.OperatorForwarderFlow {
o.ocrV1Instances, err = actions.DeployOCRContractsForwarderFlow(
o.log,
o.seth,
ocrTestConfig.GetActiveOCRConfig(),
common.HexToAddress(linkContract.Address()),
transmitters,
ocrOffchainOptions,
o.Config.GetActiveOCRConfig(),
common.HexToAddress(o.linkContract.Address()),
contracts.ChainlinkK8sClientToChainlinkNodeWithKeysAndAddress(o.workerNodes),
forwarders,
)
require.NoError(o.t, err, "Error deploying OCRv2 contracts")

if !ocrTestConfig.GetActiveOCRConfig().UseExistingOffChainAggregatorsContracts() || (ocrTestConfig.GetActiveOCRConfig().UseExistingOffChainAggregatorsContracts() && ocrTestConfig.GetActiveOCRConfig().ConfigureExistingOffChainAggregatorsContracts()) {
contractConfig, err := actions.BuildMedianOCR2Config(o.workerNodes, ocrOffchainOptions)
require.NoError(o.t, err, "Error building median config")
err = actions.ConfigureOCRv2AggregatorContracts(contractConfig, o.ocrV2Instances)
require.NoError(o.t, err, "Error configuring OCRv2 aggregator contracts")
}
require.NoError(o.t, err, "Error deploying OCR Forwarder contracts")
o.createJobsWithForwarder()
} else {
o.ocrV1Instances, err = actions.SetupOCRv1Contracts(
o.log,
o.seth,
o.Config.GetActiveOCRConfig(),
common.HexToAddress(o.linkContract.Address()),
contracts.ChainlinkK8sClientToChainlinkNodeWithKeysAndAddress(o.workerNodes),
)
require.NoError(o.t, err, "Error setting up OCRv1 contracts")
err = o.createOCRv1Jobs()
require.NoError(o.t, err, "Error creating OCR jobs")
}

if o.OCRVersion == "1" {
for _, ocrInstance := range o.ocrV1Instances {
o.ocrV1InstanceMap[ocrInstance.Address()] = ocrInstance
o.storeOCRInstancesV1()
}

// setupOCRv2Contracts sets up and configures OCRv2 contracts.
func (o *OCRSoakTest) setupOCRv2Contracts(ocrTestConfig tt.OcrTestConfig, forwarders []common.Address) {
var err error
var transmitters []string
if o.OperatorForwarderFlow {
for _, forwarder := range forwarders {
transmitters = append(transmitters, forwarder.Hex())
}
} else if o.OCRVersion == "2" {
for _, ocrInstance := range o.ocrV2Instances {
o.ocrV2InstanceMap[ocrInstance.Address()] = ocrInstance
} else {
for _, node := range o.workerNodes {
nodeAddress, err := node.PrimaryEthAddress()
require.NoError(o.t, err, "Error getting node's primary ETH address")
transmitters = append(transmitters, nodeAddress)
}
}

o.log.Info().Msg("OCR Soak Test Setup Complete")
ocrOffchainOptions := contracts.DefaultOffChainAggregatorOptions()
o.ocrV2Instances, err = actions.SetupOCRv2Contracts(
o.log, o.seth, ocrTestConfig.GetActiveOCRConfig(), common.HexToAddress(o.linkContract.Address()), transmitters, ocrOffchainOptions,
)
require.NoError(o.t, err, "Error deploying OCRv2 contracts")
err = o.createOCRv2Jobs()
require.NoError(o.t, err, "Error creating OCR jobs")
if !ocrTestConfig.GetActiveOCRConfig().UseExistingOffChainAggregatorsContracts() || (ocrTestConfig.GetActiveOCRConfig().UseExistingOffChainAggregatorsContracts() && ocrTestConfig.GetActiveOCRConfig().ConfigureExistingOffChainAggregatorsContracts()) {
contractConfig, err := actions.BuildMedianOCR2Config(o.workerNodes, ocrOffchainOptions)
require.NoError(o.t, err, "Error building median config")
err = actions.ConfigureOCRv2AggregatorContracts(contractConfig, o.ocrV2Instances)
require.NoError(o.t, err, "Error configuring OCRv2 aggregator contracts")
}
o.storeOCRInstancesV2()
}

// storeOCRInstancesV1 stores OCRv1 contract instances by their addresses.
func (o *OCRSoakTest) storeOCRInstancesV1() {
for _, ocrInstance := range o.ocrV1Instances {
o.ocrV1InstanceMap[ocrInstance.Address()] = ocrInstance
}
}

// storeOCRInstancesV2 stores OCRv2 contract instances by their addresses.
func (o *OCRSoakTest) storeOCRInstancesV2() {
for _, ocrInstance := range o.ocrV2Instances {
o.ocrV2InstanceMap[ocrInstance.Address()] = ocrInstance
}
}

// Run starts the OCR soak test
Expand All @@ -384,31 +428,47 @@ func (o *OCRSoakTest) Run() {
require.NoError(o.t, err, "Error getting current block number")
o.startingBlockNum = latestBlockNum

startingValue := 5
if o.OperatorForwarderFlow {
actions.CreateOCRJobsWithForwarder(o.t, o.ocrV1Instances, o.bootstrapNode, o.workerNodes, startingValue, o.mockServer, o.seth.ChainID)
} else if o.OCRVersion == "1" {
ctx, cancel := context.WithTimeout(testcontext.Get(o.t), time.Second*5)
chainId, err := o.seth.Client.ChainID(ctx)
cancel()
require.NoError(o.t, err, "Error getting chain ID")
err = actions.CreateOCRJobs(o.ocrV1Instances, o.bootstrapNode, o.workerNodes, startingValue, o.mockServer, chainId.String())
require.NoError(o.t, err, "Error creating OCR jobs")
} else if o.OCRVersion == "2" {
err := actions.CreateOCRv2Jobs(o.ocrV2Instances, o.bootstrapNode, o.workerNodes, o.mockServer, startingValue, o.seth.ChainID, o.OperatorForwarderFlow)
require.NoError(o.t, err, "Error creating OCR jobs")
}

o.log.Info().
Str("Test Duration", o.Config.GetActiveOCRConfig().Common.TestDuration.Duration.Truncate(time.Second).String()).
Int("Number of OCR Contracts", *config.GetActiveOCRConfig().Common.NumberOfContracts).
Str("OCR Version", o.OCRVersion).
Msg("Starting OCR Soak Test")

o.testLoop(o.Config.GetActiveOCRConfig().Common.TestDuration.Duration, startingValue)
o.testLoop(o.Config.GetActiveOCRConfig().Common.TestDuration.Duration, o.startingValue)
o.complete()
}

// createJobsWithForwarder creates OCR jobs with the forwarder setup.
func (o *OCRSoakTest) createJobsWithForwarder() {
actions.CreateOCRJobsWithForwarder(o.t, o.ocrV1Instances, o.bootstrapNode, o.workerNodes, o.startingValue, o.mockServer, o.seth.ChainID)
}

// createOCRv1Jobs creates OCRv1 jobs.
func (o *OCRSoakTest) createOCRv1Jobs() error {
ctx, cancel := context.WithTimeout(testcontext.Get(o.t), time.Second*5)
defer cancel()

chainId, err := o.seth.Client.ChainID(ctx)
if err != nil {
return fmt.Errorf("error getting chain ID: %w", err)
}

err = actions.CreateOCRJobs(o.ocrV1Instances, o.bootstrapNode, o.workerNodes, o.startingValue, o.mockServer, chainId.String())
if err != nil {
return fmt.Errorf("error creating OCRv1 jobs: %w", err)
}
return nil
}

// createOCRv2Jobs creates OCRv2 jobs.
func (o *OCRSoakTest) createOCRv2Jobs() error {
err := actions.CreateOCRv2Jobs(o.ocrV2Instances, o.bootstrapNode, o.workerNodes, o.mockServer, o.startingValue, o.seth.ChainID, o.OperatorForwarderFlow, o.log)
if err != nil {
return fmt.Errorf("error creating OCRv2 jobs: %w", err)
}
return nil
}

// Networks returns the networks that the test is running on
func (o *OCRSoakTest) TearDownVals(t *testing.T) (
*testing.T,
Expand Down

0 comments on commit 6961ab1

Please sign in to comment.