From 3a4979a1b666c6eb018dbd82c827b6fab21c9731 Mon Sep 17 00:00:00 2001 From: Bartek Tofel Date: Thu, 25 Apr 2024 18:11:30 +0200 Subject: [PATCH] change architecture so that we have only 1 subscription and then we fan out the header to all goroutines; use SafeEVMHeader --- .../ethereum_contracts_automation_seth.go | 3 +- .../testsetups/keeper_benchmark.go | 88 +++++++++++++++---- 2 files changed, 72 insertions(+), 19 deletions(-) diff --git a/integration-tests/contracts/ethereum_contracts_automation_seth.go b/integration-tests/contracts/ethereum_contracts_automation_seth.go index 9ba33cb3b42..d91e092d720 100644 --- a/integration-tests/contracts/ethereum_contracts_automation_seth.go +++ b/integration-tests/contracts/ethereum_contracts_automation_seth.go @@ -15,6 +15,7 @@ import ( "github.com/rs/zerolog" "github.com/smartcontractkit/seth" + "github.com/smartcontractkit/chainlink-testing-framework/blockchain" "github.com/smartcontractkit/chainlink-testing-framework/networks" "github.com/smartcontractkit/chainlink/integration-tests/contracts/ethereum" eth_contracts "github.com/smartcontractkit/chainlink/integration-tests/contracts/ethereum" @@ -2359,7 +2360,7 @@ func NewKeeperConsumerBenchmarkkUpkeepObserver( // ReceiveHeader will query the latest Keeper round and check to see whether upkeep was performed, it returns // true when observation has finished. -func (o *KeeperConsumerBenchmarkUpkeepObserver) ReceiveHeader(receivedHeader *types.Header) (bool, error) { +func (o *KeeperConsumerBenchmarkUpkeepObserver) ReceiveHeader(receivedHeader *blockchain.SafeEVMHeader) (bool, error) { if receivedHeader.Number.Uint64() <= o.lastBlockNum { // Uncle / reorg we won't count return false, nil } diff --git a/integration-tests/testsetups/keeper_benchmark.go b/integration-tests/testsetups/keeper_benchmark.go index 63651c45822..b21b5da78c9 100644 --- a/integration-tests/testsetups/keeper_benchmark.go +++ b/integration-tests/testsetups/keeper_benchmark.go @@ -2,6 +2,7 @@ package testsetups import ( "context" + "errors" "fmt" "math" "math/big" @@ -17,7 +18,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" - "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/slack-go/slack" @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" + "github.com/smartcontractkit/chainlink-testing-framework/blockchain" "github.com/smartcontractkit/chainlink-testing-framework/k8s/environment" "github.com/smartcontractkit/chainlink-testing-framework/logging" reportModel "github.com/smartcontractkit/chainlink-testing-framework/testreporters" @@ -259,14 +260,69 @@ func (k *KeeperBenchmarkTest) Run() { var startedObservations = atomic.Int32{} var finishedObservations = atomic.Int32{} + // We create as many channels as listening goroutines (1 per upkeep). In the background we will be fanning out + // headers that we get from a single channel connected to EVM node to all upkeep-specific channels. + headerCh := make(chan *blockchain.SafeEVMHeader, 10) + sub, err := k.chainClient.Client.Client().EthSubscribe(context.Background(), headerCh, "newHeads") + require.NoError(k.t, err, "Subscribing to new headers for upkeep observation shouldn't fail") + + totalNumberOfChannels := 0 + for rIndex := range k.keeperRegistries { + totalNumberOfChannels += len(k.upkeepIDs[rIndex]) + } + + contractChannels := make([]chan *blockchain.SafeEVMHeader, totalNumberOfChannels) + for idx := 0; idx < totalNumberOfChannels; idx++ { + contractChannels[idx] = make(chan *blockchain.SafeEVMHeader, 10) // Buffered just in case processing is slow + } + + // signals all goroutines to stop when subscription error occurs + stopAllGoroutinesCh := make(chan struct{}) + + // this goroutine fans out headers to goroutines in the background + // and exists when all goroutines are done or when an error occurs + go func() { + defer func() { + // close all fanning out channels at the very end + for _, ch := range contractChannels { + close(ch) + } + k.log.Debug().Msg("Closed header distribution channels") + }() + for { + select { + case header := <-headerCh: + k.log.Debug().Int64("Number", header.Number.Int64()).Msg("Fanning out new header") + for _, ch := range contractChannels { + ch <- header + } + // we don't really care if it was a success or an error, we just want to exit + // if it was an error, we will have an error in the main goroutine + case <-errCtx.Done(): + k.log.Debug().Msg("All goroutines finished.") + sub.Unsubscribe() + return + case err := <-sub.Err(): + // no need to unsubscribe, subscripion errored + k.log.Err(err).Msg("header subscription failed. Exiting") + // close channel to signal all goroutines they should exit + close(stopAllGoroutinesCh) + return + } + } + }() + + currentChannelIndex := 0 for rIndex := range k.keeperRegistries { for index, upkeepID := range k.upkeepIDs[rIndex] { + chIndex := currentChannelIndex + currentChannelIndex++ upkeepIDCopy := upkeepID registryIndex := rIndex upkeepIndex := int64(index) errgroup.Go(func() error { startedObservations.Add(1) - k.log.Info().Str("UpkeepID", upkeepIDCopy.String()).Msg("Starting upkeep observation") + k.log.Info().Int("Channel index", chIndex).Str("UpkeepID", upkeepIDCopy.String()).Msg("Starting upkeep observation") confirmer := contracts.NewKeeperConsumerBenchmarkkUpkeepObserver( k.keeperConsumerContracts[registryIndex], @@ -281,39 +337,32 @@ func (k *KeeperBenchmarkTest) Run() { ) k.log.Debug().Str("UpkeepID", upkeepIDCopy.String()).Msg("Subscribing to new headers for upkeep observation") - // hard to say what should be the buffer size, we want it big, but too big as we might run out of memory - // but on L2s with super fast block times, we might run into `subscription queue overflow` errors, when this - // buffer is too small - headerCh := make(chan *types.Header, 5000) - sub, err := k.chainClient.Client.SubscribeNewHead(context.Background(), headerCh) - if err != nil { - return err - } for { select { - case subscriptionErr := <-sub.Err(): // header listening failed for the upkeep, exit - return errors.Wrapf(subscriptionErr, "listening for new headers for upkeep %s failed. Exiting", upkeepIDCopy.String()) - case <-errCtx.Done(): //one of goroutines errored, shut down gracefully + case <-stopAllGoroutinesCh: // header listening failed, exit + return errors.New("header distribution channel closed") + case <-errCtx.Done(): //one of goroutines errored, shut down gracefully, no need to return error k.log.Error().Err(errCtx.Err()).Str("UpkeepID", upkeepIDCopy.String()).Msg("Stopping obervations due to error in one of the goroutines") - sub.Unsubscribe() return nil - case header := <-headerCh: // new block, check if upkeep was performed + case header := <-contractChannels[chIndex]: // new block, check if upkeep was performed + k.log.Debug().Interface("Header number", header.Number).Str("UpkeepID", upkeepIDCopy.String()).Msg("Started processing new header") finished, headerErr := confirmer.ReceiveHeader(header) if headerErr != nil { return headerErr } - if finished { // observations should be completed as we are beyond block range + + if finished { // observations should be completed as we are beyond block range, if there are not there's a bug in test code finishedObservations.Add(1) k.log.Info().Str("Done/Total", fmt.Sprintf("%d/%d", finishedObservations.Load(), startedObservations.Load())).Str("UpkeepID", upkeepIDCopy.String()).Msg("Upkeep observation completed") - sub.Unsubscribe() if confirmer.Complete() { confirmer.LogDetails() return nil } - return fmt.Errorf("confimer has finished, but without completing observation, this should never happen. UpkdeepID: %s", upkeepIDCopy.String()) + return fmt.Errorf("confimer has finished, but without completing observation, this should never happen. Review your code. UpkdeepID: %s", upkeepIDCopy.String()) } + k.log.Debug().Interface("Header number", header.Number).Str("UpkeepID", upkeepIDCopy.String()).Msg("Finished processing new header") } } }) @@ -324,6 +373,9 @@ func (k *KeeperBenchmarkTest) Run() { k.t.Fatalf("errored when waiting for upkeeps: %v", err) } + // Close header distribution channel once all observations are done + close(stopAllGoroutinesCh) + // Main test loop k.observeUpkeepEvents()