diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline.go b/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline.go index d3530994702..c9752ea14db 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline.go @@ -41,7 +41,10 @@ func (r *EvmRegistry) CheckUpkeeps(ctx context.Context, keys ...ocr2keepers.Upke } chResult := make(chan checkResult, 1) - go r.doCheck(ctx, keys, chResult) + + r.threadCtrl.Go(func(ctx context.Context) { + r.doCheck(ctx, keys, chResult) + }) select { case rs := <-chResult: diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go index 660550afe97..fb2821a74b7 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go @@ -149,10 +149,15 @@ func (r *EvmRegistry) streamsLookup(ctx context.Context, checkResults []ocr2keep } var wg sync.WaitGroup + for i, lookup := range lookups { + i := i wg.Add(1) - go r.doLookup(ctx, &wg, lookup, i, checkResults, lggr) + r.threadCtrl.Go(func(ctx context.Context) { + r.doLookup(ctx, &wg, lookup, i, checkResults, lggr) + }) } + wg.Wait() // don't surface error to plugin bc StreamsLookup process should be self-contained. @@ -289,14 +294,19 @@ func (r *EvmRegistry) doMercuryRequest(ctx context.Context, sl *StreamsLookup, p if sl.FeedParamKey == feedIdHex && sl.TimeParamKey == blockNumber { // only mercury v0.2 for i := range sl.Feeds { - go r.singleFeedRequest(ctx, ch, i, sl, lggr) + i := i + r.threadCtrl.Go(func(ctx context.Context) { + r.singleFeedRequest(ctx, ch, i, sl, lggr) + }) } } else if sl.FeedParamKey == feedIDs { // only mercury v0.3 resultLen = 1 isMercuryV03 = true ch = make(chan MercuryData, resultLen) - go r.multiFeedsRequest(ctx, ch, sl, lggr) + r.threadCtrl.Go(func(ctx context.Context) { + r.multiFeedsRequest(ctx, ch, sl, lggr) + }) } else { return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", sl.FeedParamKey, sl.TimeParamKey, sl.Feeds) } diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup_test.go index 8d7c67d80ce..145d701454d 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup_test.go @@ -25,6 +25,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/encoding" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/mocks" + "github.com/smartcontractkit/chainlink/v2/core/utils" evmClientMocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" @@ -70,7 +71,8 @@ func setupEVMRegistry(t *testing.T) *EvmRegistry { allowListCache: cache.New(defaultAllowListExpiration, cleanupInterval), pluginRetryCache: cache.New(defaultPluginRetryExpiration, cleanupInterval), }, - hc: mockHttpClient, + hc: mockHttpClient, + threadCtrl: utils.NewThreadControl(), } return r } @@ -220,6 +222,7 @@ func TestEvmRegistry_StreamsLookup(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := setupEVMRegistry(t) + defer r.Close() client := new(evmClientMocks.Client) r.client = client @@ -362,6 +365,7 @@ func TestEvmRegistry_AllowedToUseMercury(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := setupEVMRegistry(t) + defer r.Close() client := new(evmClientMocks.Client) r.client = client @@ -576,9 +580,12 @@ func TestEvmRegistry_DoMercuryRequestV02(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := setupEVMRegistry(t) + defer r.Close() + if tt.pluginRetries != 0 { r.mercury.pluginRetryCache.Set(tt.pluginRetryKey, tt.pluginRetries, cache.DefaultExpiration) } + hc := mocks.NewHttpClient(t) for _, blob := range tt.mockChainlinkBlobs { @@ -812,6 +819,8 @@ func TestEvmRegistry_SingleFeedRequest(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := setupEVMRegistry(t) + defer r.Close() + hc := mocks.NewHttpClient(t) mr := MercuryV02Response{ChainlinkBlob: tt.blob} @@ -1157,6 +1166,8 @@ func TestEvmRegistry_MultiFeedRequest(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := setupEVMRegistry(t) + defer r.Close() + if tt.pluginRetries != 0 { r.mercury.pluginRetryCache.Set(tt.pluginRetryKey, tt.pluginRetries, cache.DefaultExpiration) } @@ -1319,6 +1330,8 @@ func TestEvmRegistry_CheckCallback(t *testing.T) { t.Run(tt.name, func(t *testing.T) { client := new(evmClientMocks.Client) r := setupEVMRegistry(t) + defer r.Close() + payload, err := r.abi.Pack("checkCallback", tt.lookup.upkeepId, values, tt.lookup.ExtraData) require.Nil(t, err) args := map[string]interface{}{