From bb12ee9eb8be3b1c394016e69b0b8da6ad48d813 Mon Sep 17 00:00:00 2001 From: amirylm Date: Wed, 14 Feb 2024 11:34:34 +0200 Subject: [PATCH 01/11] return timeout for retry interval --- .../evmregistry/v21/mercury/mercury.go | 10 +-- .../evmregistry/v21/mercury/mercury_test.go | 77 +++++++++++++++++++ 2 files changed, 82 insertions(+), 5 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury.go index d24442b6ee9..45f1b54b2cd 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury.go @@ -26,7 +26,8 @@ const ( BlockNumber = "blockNumber" // valid for v0.2 Timestamp = "timestamp" // valid for v0.3 totalFastPluginRetries = 5 - totalMediumPluginRetries = 10 + totalMediumPluginRetries = totalFastPluginRetries + 1 + RetryIntervalTimeout = time.Duration(-1) ) var GenerateHMACFn = func(method string, path string, body []byte, clientId string, secret string, ts int64) string { @@ -45,8 +46,7 @@ var GenerateHMACFn = func(method string, path string, body []byte, clientId stri } // CalculateRetryConfig returns plugin retry interval based on how many times plugin has retried this work -var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvider) time.Duration { - var retryInterval time.Duration +var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvider) (retryInterval time.Duration) { var retries int totalAttempts, ok := mercuryConfig.GetPluginRetry(prk) if ok { @@ -55,9 +55,9 @@ var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvide retryInterval = 1 * time.Second } else if retries < totalMediumPluginRetries { retryInterval = 5 * time.Second + } else { + retryInterval = RetryIntervalTimeout } - // if the core node has retried totalMediumPluginRetries times, do not set retry interval and plugin will use - // the default interval } else { retryInterval = 1 * time.Second } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury_test.go index ce82ec7ae8f..3854253d48a 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury_test.go @@ -5,12 +5,15 @@ import ( "errors" "math/big" "testing" + "time" + "github.com/patrickmn/go-cache" "github.com/stretchr/testify/require" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/stretchr/testify/assert" + "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding" ) @@ -240,3 +243,77 @@ func TestPacker_UnpackCheckCallbackResult(t *testing.T) { }) } } + +func Test_CalculateRetryConfigFn(t *testing.T) { + tests := []struct { + name string + times int + expected time.Duration + }{ + { + name: "first retry", + times: 1, + expected: 1 * time.Second, + }, + { + name: "second retry", + times: 2, + expected: 1 * time.Second, + }, + { + name: "fifth retry", + times: 5, + expected: 1 * time.Second, + }, + { + name: "sixth retry", + times: 6, + expected: 5 * time.Second, + }, + { + name: "timeout", + times: totalMediumPluginRetries + 1, + expected: RetryIntervalTimeout, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + cfg := newMercuryConfigMock() + var result time.Duration + for i := 0; i < tc.times; i++ { + result = CalculateRetryConfigFn("prk", cfg) + } + assert.Equal(t, tc.expected, result) + }) + } +} + +type mercuryConfigMock struct { + pluginRetryCache *cache.Cache +} + +func newMercuryConfigMock() *mercuryConfigMock { + return &mercuryConfigMock{ + pluginRetryCache: cache.New(10*time.Second, time.Minute), + } +} + +func (c *mercuryConfigMock) Credentials() *types.MercuryCredentials { + return nil +} + +func (c *mercuryConfigMock) IsUpkeepAllowed(k string) (interface{}, bool) { + return nil, false +} + +func (c *mercuryConfigMock) SetUpkeepAllowed(k string, v interface{}, d time.Duration) { +} + +func (c *mercuryConfigMock) GetPluginRetry(k string) (interface{}, bool) { + return c.pluginRetryCache.Get(k) +} + +func (c *mercuryConfigMock) SetPluginRetry(k string, v interface{}, d time.Duration) { + c.pluginRetryCache.Set(k, v, d) +} From cb19d4b54686d489a33f781c0e4b6696035da5b7 Mon Sep 17 00:00:00 2001 From: amirylm Date: Wed, 14 Feb 2024 11:40:53 +0200 Subject: [PATCH 02/11] identify retryTimeout + placeholder for err handler --- .../evmregistry/v21/mercury/streams/streams.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go index 48ee0492f9e..5101c8b1661 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go @@ -15,6 +15,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/patrickmn/go-cache" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" "github.com/smartcontractkit/chainlink-common/pkg/services" ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation" @@ -251,6 +252,19 @@ func (s *streams) DoMercuryRequest(ctx context.Context, lookup *mercury.StreamsL checkResults[i].RetryInterval = retryInterval checkResults[i].PipelineExecutionState = uint8(state) checkResults[i].IneligibilityReason = uint8(reason) + retryTimeout := retryInterval == mercury.RetryIntervalTimeout + if retryTimeout { + // in case of retry timeout, setting retryable to false + checkResults[i].Retryable = false + } + upkeepType := core.GetUpkeepType(checkResults[i].UpkeepID) + switch state { + case encoding.MercuryFlakyFailure: + if retryTimeout && upkeepType == types.LogTrigger && retryable { + // TODO: prepare for error handler + } + default: + } return nil, err } From 76167b07ad1b0057c8c3a68143fee6d61d52a8a9 Mon Sep 17 00:00:00 2001 From: amirylm Date: Wed, 14 Feb 2024 16:47:49 +0200 Subject: [PATCH 03/11] err codes --- .../evmregistry/v21/encoding/interface.go | 11 ++++++ .../v21/mercury/streams/streams.go | 34 +++++++++++++++++-- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go index 1f93fd3ee22..9e4ed22cf1d 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go @@ -43,6 +43,17 @@ const ( UpkeepNotAuthorized PipelineExecutionState = 9 ) +type ErrCode uint32 + +const ( + StatusOK ErrCode = 800200 + StatusPartielContent ErrCode = 800206 + StatusMercuryError ErrCode = 808500 + StatusBadRequest ErrCode = 800400 + StatusUnauthorized ErrCode = 800401 + StatusStreamLookupTimeout ErrCode = 808603 +) + type UpkeepInfo = iregistry21.KeeperRegistryBase21UpkeepInfo type Packer interface { diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go index 5101c8b1661..49a2f27fd40 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go @@ -6,6 +6,8 @@ import ( "fmt" "math/big" "net/http" + "strconv" + "strings" "sync" "time" @@ -258,9 +260,11 @@ func (s *streams) DoMercuryRequest(ctx context.Context, lookup *mercury.StreamsL checkResults[i].Retryable = false } upkeepType := core.GetUpkeepType(checkResults[i].UpkeepID) - switch state { - case encoding.MercuryFlakyFailure: - if retryTimeout && upkeepType == types.LogTrigger && retryable { + switch upkeepType { + case types.LogTrigger: + if retryTimeout && state == encoding.MercuryFlakyFailure { + errCode := buildErrCode(err) + s.lggr.Debugf("at block %d upkeep %s requested time %s doMercuryRequest err: %s, errCode: %d", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error(), errCode) // TODO: prepare for error handler } default: @@ -349,3 +353,27 @@ func (s *streams) Close() error { return nil }) } + +func buildErrCode(err error) encoding.ErrCode { + if err == nil { + return 0 + } + + errStr := err.Error() + baseCode, serr := strconv.Atoi(errStr) + if serr != nil { + if strings.Contains(errStr, "unauthorized upkeep") { + return encoding.StatusUnauthorized + } + if strings.Contains(errStr, "invalid format") { + return encoding.StatusBadRequest + } + return 0 + } + + errcode, err := strconv.Atoi(fmt.Sprintf("800%d", baseCode)) + if err != nil { + return 0 + } + return encoding.ErrCode(errcode) +} From 4ff0ac3189178b3a698abd7bd71a5c41150c9457 Mon Sep 17 00:00:00 2001 From: amirylm Date: Wed, 14 Feb 2024 18:00:41 +0200 Subject: [PATCH 04/11] fix test --- .../v21/mercury/v02/v02_request_test.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/v02_request_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/v02_request_test.go index 6c07c383504..5f8c079a9fd 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/v02_request_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/v02_request_test.go @@ -366,13 +366,14 @@ func TestV02_DoMercuryRequestV02(t *testing.T) { }, UpkeepId: upkeepId, }, - pluginRetries: 10, - mockHttpStatusCode: http.StatusInternalServerError, - mockChainlinkBlobs: []string{"0x00066dfcd1ed2d95b18c948dbc5bd64c687afe93e4ca7d663ddec14c20090ad80000000000000000000000000000000000000000000000000000000000081401000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002200000000000000000000000000000000000000000000000000000000000000280000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001204554482d5553442d415242495452554d2d544553544e455400000000000000000000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000289ad8d367000000000000000000000000000000000000000000000000000000289acf0b38000000000000000000000000000000000000000000000000000000289b3da40000000000000000000000000000000000000000000000000000000000018ae7ce74d9fa252a8983976eab600dc7590c778d04813430841bc6e765c34cd81a168d00000000000000000000000000000000000000000000000000000000018ae7cb0000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000000000000260412b94e525ca6cedc9f544fd86f77606d52fe731a5d069dbe836a8bfc0fb8c911963b0ae7a14971f3b4621bffb802ef0605392b9a6c89c7fab1df8633a5ade00000000000000000000000000000000000000000000000000000000000000024500c2f521f83fba5efc2bf3effaaedde43d0a4adff785c1213b712a3aed0d8157642a84324db0cf9695ebd27708d4608eb0337e0dd87b0e43f0fa70c700d911"}, - expectedValues: [][]byte{nil}, - expectedRetryable: true, - expectedError: errors.New("failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 500\n#2: 500\n#3: 500"), - state: encoding.MercuryFlakyFailure, + pluginRetries: 10, + mockHttpStatusCode: http.StatusInternalServerError, + mockChainlinkBlobs: []string{"0x00066dfcd1ed2d95b18c948dbc5bd64c687afe93e4ca7d663ddec14c20090ad80000000000000000000000000000000000000000000000000000000000081401000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002200000000000000000000000000000000000000000000000000000000000000280000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001204554482d5553442d415242495452554d2d544553544e455400000000000000000000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000289ad8d367000000000000000000000000000000000000000000000000000000289acf0b38000000000000000000000000000000000000000000000000000000289b3da40000000000000000000000000000000000000000000000000000000000018ae7ce74d9fa252a8983976eab600dc7590c778d04813430841bc6e765c34cd81a168d00000000000000000000000000000000000000000000000000000000018ae7cb0000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000000000000260412b94e525ca6cedc9f544fd86f77606d52fe731a5d069dbe836a8bfc0fb8c911963b0ae7a14971f3b4621bffb802ef0605392b9a6c89c7fab1df8633a5ade00000000000000000000000000000000000000000000000000000000000000024500c2f521f83fba5efc2bf3effaaedde43d0a4adff785c1213b712a3aed0d8157642a84324db0cf9695ebd27708d4608eb0337e0dd87b0e43f0fa70c700d911"}, + expectedValues: [][]byte{nil}, + expectedRetryInterval: mercury.RetryIntervalTimeout, + expectedRetryable: true, + expectedError: errors.New("failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 500\n#2: 500\n#3: 500"), + state: encoding.MercuryFlakyFailure, }, { name: "failure - not retryable", From 885674d348df18cc31995bcb7c7c9328177c2ee1 Mon Sep 17 00:00:00 2001 From: amirylm Date: Wed, 14 Feb 2024 19:28:14 +0200 Subject: [PATCH 05/11] err codes --- .../evmregistry/v21/encoding/interface.go | 32 +++++++++++++--- .../evmregistry/v21/mercury/mercury.go | 3 +- .../v21/mercury/streams/streams.go | 37 +++---------------- .../evmregistry/v21/mercury/v02/request.go | 11 ++++-- .../v21/mercury/v02/v02_request_test.go | 8 +++- .../evmregistry/v21/mercury/v03/request.go | 16 ++++++-- .../v21/mercury/v03/v03_request_test.go | 4 +- 7 files changed, 64 insertions(+), 47 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go index 9e4ed22cf1d..a794791c5e8 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go @@ -1,6 +1,8 @@ package encoding import ( + "net/http" + ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/automation_utils_2_1" @@ -43,17 +45,35 @@ const ( UpkeepNotAuthorized PipelineExecutionState = 9 ) +// ErrCode is used for invoking an error handler with a specific error code type ErrCode uint32 const ( - StatusOK ErrCode = 800200 - StatusPartielContent ErrCode = 800206 - StatusMercuryError ErrCode = 808500 - StatusBadRequest ErrCode = 800400 - StatusUnauthorized ErrCode = 800401 - StatusStreamLookupTimeout ErrCode = 808603 + ErrCodeNil ErrCode = 0 + ErrCodePartielContent ErrCode = 800206 + ErrCodeMercuryError ErrCode = 808500 + ErrCodeBadRequest ErrCode = 800400 + ErrCodeUnauthorized ErrCode = 800401 + ErrCodeStreamLookupTimeout ErrCode = 808603 ) +func HttpToErrCode(statusCode int) ErrCode { + switch statusCode { + case http.StatusOK: + return ErrCodeNil + case http.StatusPartialContent: + return ErrCodePartielContent + case http.StatusBadRequest: + return ErrCodeBadRequest + case http.StatusUnauthorized: + return ErrCodeUnauthorized + case http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout: + return ErrCodeMercuryError + default: + return 0 + } +} + type UpkeepInfo = iregistry21.KeeperRegistryBase21UpkeepInfo type Packer interface { diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury.go index 45f1b54b2cd..472d3b22204 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury.go @@ -68,6 +68,7 @@ var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvide type MercuryData struct { Index int Error error + ErrCode encoding.ErrCode Retryable bool Bytes [][]byte State encoding.PipelineExecutionState @@ -86,7 +87,7 @@ type HttpClient interface { } type MercuryClient interface { - DoRequest(ctx context.Context, streamsLookup *StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, error) + DoRequest(ctx context.Context, streamsLookup *StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, encoding.ErrCode, error) } type StreamsLookupError struct { diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go index 49a2f27fd40..47b4b905e1c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go @@ -6,8 +6,6 @@ import ( "fmt" "math/big" "net/http" - "strconv" - "strings" "sync" "time" @@ -240,13 +238,13 @@ func (s *streams) CheckCallback(ctx context.Context, values [][]byte, lookup *me } func (s *streams) DoMercuryRequest(ctx context.Context, lookup *mercury.StreamsLookup, checkResults []ocr2keepers.CheckResult, i int) ([][]byte, error) { - state, reason, values, retryable, retryInterval, err := encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0*time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", lookup.FeedParamKey, lookup.TimeParamKey, lookup.Feeds) + state, reason, values, retryable, retryInterval, errCode, err := encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0*time.Second, encoding.ErrCodeNil, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", lookup.FeedParamKey, lookup.TimeParamKey, lookup.Feeds) pluginRetryKey := generatePluginRetryKey(checkResults[i].WorkID, lookup.Block) if lookup.IsMercuryV02() { - state, reason, values, retryable, retryInterval, err = s.v02Client.DoRequest(ctx, lookup, pluginRetryKey) + state, reason, values, retryable, retryInterval, errCode, err = s.v02Client.DoRequest(ctx, lookup, pluginRetryKey) } else if lookup.IsMercuryV03() { - state, reason, values, retryable, retryInterval, err = s.v03Client.DoRequest(ctx, lookup, pluginRetryKey) + state, reason, values, retryable, retryInterval, errCode, err = s.v03Client.DoRequest(ctx, lookup, pluginRetryKey) } if err != nil { @@ -262,10 +260,9 @@ func (s *streams) DoMercuryRequest(ctx context.Context, lookup *mercury.StreamsL upkeepType := core.GetUpkeepType(checkResults[i].UpkeepID) switch upkeepType { case types.LogTrigger: - if retryTimeout && state == encoding.MercuryFlakyFailure { - errCode := buildErrCode(err) + if retryTimeout && errCode > encoding.ErrCodeNil { s.lggr.Debugf("at block %d upkeep %s requested time %s doMercuryRequest err: %s, errCode: %d", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error(), errCode) - // TODO: prepare for error handler + // TODO: prepare for error handler, make sure to pass errCode } default: } @@ -353,27 +350,3 @@ func (s *streams) Close() error { return nil }) } - -func buildErrCode(err error) encoding.ErrCode { - if err == nil { - return 0 - } - - errStr := err.Error() - baseCode, serr := strconv.Atoi(errStr) - if serr != nil { - if strings.Contains(errStr, "unauthorized upkeep") { - return encoding.StatusUnauthorized - } - if strings.Contains(errStr, "invalid format") { - return encoding.StatusBadRequest - } - return 0 - } - - errcode, err := strconv.Atoi(fmt.Sprintf("800%d", baseCode)) - if err != nil { - return 0 - } - return encoding.ErrCode(errcode) -} diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go index 202661145bf..0c79f0aefd3 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go @@ -53,11 +53,11 @@ func NewClient(mercuryConfig mercury.MercuryConfigProvider, httpClient mercury.H } } -func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, error) { +func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, encoding.ErrCode, error) { resultLen := len(streamsLookup.Feeds) ch := make(chan mercury.MercuryData, resultLen) if len(streamsLookup.Feeds) == 0 { - return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", streamsLookup.FeedParamKey, streamsLookup.TimeParamKey, streamsLookup.Feeds) + return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, 0, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", streamsLookup.FeedParamKey, streamsLookup.TimeParamKey, streamsLookup.Feeds) } for i := range streamsLookup.Feeds { // TODO (AUTO-7209): limit the number of concurrent requests @@ -69,6 +69,7 @@ func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLo var reqErr error var retryInterval time.Duration + var errCode encoding.ErrCode results := make([][]byte, len(streamsLookup.Feeds)) retryable := true allSuccess := true @@ -79,6 +80,7 @@ func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLo if m.Error != nil { reqErr = errors.Join(reqErr, m.Error) retryable = retryable && m.Retryable + errCode = m.ErrCode allSuccess = false if m.State != encoding.NoPipelineError { state = m.State @@ -91,7 +93,7 @@ func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLo retryInterval = mercury.CalculateRetryConfigFn(pluginRetryKey, c.mercuryConfig) } // only retry when not all successful AND none are not retryable - return state, encoding.UpkeepFailureReasonNone, results, retryable && !allSuccess, retryInterval, reqErr + return state, encoding.UpkeepFailureReasonNone, results, retryable && !allSuccess, retryInterval, errCode, reqErr } func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.MercuryData, index int, sl *mercury.StreamsLookup) { @@ -120,6 +122,7 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur httpRequest.Header.Set(signatureHeader, signature) // in the case of multiple retries here, use the last attempt's data + errCode := encoding.ErrCodeNil state := encoding.NoPipelineError retryable := false sent := false @@ -148,6 +151,7 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur c.lggr.Warnf("at block %s upkeep %s received status code %d for feed %s", sl.Time.String(), sl.UpkeepId.String(), httpResponse.StatusCode, sl.Feeds[index]) retryable = true state = encoding.MercuryFlakyFailure + errCode = encoding.HttpToErrCode(httpResponse.StatusCode) return errors.New(strconv.FormatInt(int64(httpResponse.StatusCode), 10)) case http.StatusOK: // continue @@ -193,6 +197,7 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur Bytes: [][]byte{}, Retryable: retryable, Error: fmt.Errorf("failed to request feed for %s: %w", sl.Feeds[index], retryErr), + ErrCode: errCode, State: state, } } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/v02_request_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/v02_request_test.go index 5f8c079a9fd..afa0d02781b 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/v02_request_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/v02_request_test.go @@ -290,6 +290,7 @@ func TestV02_DoMercuryRequestV02(t *testing.T) { expectedValues [][]byte expectedRetryable bool expectedRetryInterval time.Duration + expectedErrCode encoding.ErrCode expectedError error state encoding.PipelineExecutionState reason encoding.UpkeepFailureReason @@ -330,6 +331,7 @@ func TestV02_DoMercuryRequestV02(t *testing.T) { expectedRetryable: true, pluginRetries: 0, expectedRetryInterval: 1 * time.Second, + expectedErrCode: encoding.ErrCodeMercuryError, expectedError: errors.New("failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 500\n#2: 500\n#3: 500"), state: encoding.MercuryFlakyFailure, }, @@ -351,6 +353,7 @@ func TestV02_DoMercuryRequestV02(t *testing.T) { expectedValues: [][]byte{nil}, expectedRetryable: true, expectedRetryInterval: 5 * time.Second, + expectedErrCode: encoding.ErrCodeMercuryError, expectedError: errors.New("failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 500\n#2: 500\n#3: 500"), state: encoding.MercuryFlakyFailure, }, @@ -371,6 +374,7 @@ func TestV02_DoMercuryRequestV02(t *testing.T) { mockChainlinkBlobs: []string{"0x00066dfcd1ed2d95b18c948dbc5bd64c687afe93e4ca7d663ddec14c20090ad80000000000000000000000000000000000000000000000000000000000081401000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002200000000000000000000000000000000000000000000000000000000000000280000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001204554482d5553442d415242495452554d2d544553544e455400000000000000000000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000289ad8d367000000000000000000000000000000000000000000000000000000289acf0b38000000000000000000000000000000000000000000000000000000289b3da40000000000000000000000000000000000000000000000000000000000018ae7ce74d9fa252a8983976eab600dc7590c778d04813430841bc6e765c34cd81a168d00000000000000000000000000000000000000000000000000000000018ae7cb0000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000000000000260412b94e525ca6cedc9f544fd86f77606d52fe731a5d069dbe836a8bfc0fb8c911963b0ae7a14971f3b4621bffb802ef0605392b9a6c89c7fab1df8633a5ade00000000000000000000000000000000000000000000000000000000000000024500c2f521f83fba5efc2bf3effaaedde43d0a4adff785c1213b712a3aed0d8157642a84324db0cf9695ebd27708d4608eb0337e0dd87b0e43f0fa70c700d911"}, expectedValues: [][]byte{nil}, expectedRetryInterval: mercury.RetryIntervalTimeout, + expectedErrCode: encoding.ErrCodeMercuryError, expectedRetryable: true, expectedError: errors.New("failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 500\n#2: 500\n#3: 500"), state: encoding.MercuryFlakyFailure, @@ -452,7 +456,7 @@ func TestV02_DoMercuryRequestV02(t *testing.T) { } c.httpClient = hc - state, reason, values, retryable, retryInterval, reqErr := c.DoRequest(testutils.Context(t), tt.lookup, tt.pluginRetryKey) + state, reason, values, retryable, retryInterval, errCode, reqErr := c.DoRequest(testutils.Context(t), tt.lookup, tt.pluginRetryKey) assert.Equal(t, tt.expectedValues, values) assert.Equal(t, tt.expectedRetryable, retryable) if retryable { @@ -460,8 +464,10 @@ func TestV02_DoMercuryRequestV02(t *testing.T) { assert.Equal(t, tt.pluginRetries+1, newRetries.(int)) } assert.Equal(t, tt.expectedRetryInterval, retryInterval) + assert.Equal(t, tt.expectedErrCode, errCode) assert.Equal(t, tt.state, state) assert.Equal(t, tt.reason, reason) + if tt.expectedError != nil { assert.True(t, strings.HasPrefix(reqErr.Error(), "failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000")) } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go index 584069adde3..9e5d32bcb82 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go @@ -61,9 +61,9 @@ func NewClient(mercuryConfig mercury.MercuryConfigProvider, httpClient mercury.H } } -func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, error) { +func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, encoding.ErrCode, error) { if len(streamsLookup.Feeds) == 0 { - return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", streamsLookup.FeedParamKey, streamsLookup.TimeParamKey, streamsLookup.Feeds) + return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, 0, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", streamsLookup.FeedParamKey, streamsLookup.TimeParamKey, streamsLookup.Feeds) } resultLen := 1 // Only 1 multi-feed request is made for all feeds ch := make(chan mercury.MercuryData, resultLen) @@ -73,6 +73,7 @@ func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLo var reqErr error var retryInterval time.Duration + var errCode encoding.ErrCode results := make([][]byte, len(streamsLookup.Feeds)) retryable := false state := encoding.NoPipelineError @@ -82,6 +83,7 @@ func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLo reqErr = m.Error retryable = m.Retryable state = m.State + errCode = m.ErrCode if retryable { retryInterval = mercury.CalculateRetryConfigFn(pluginRetryKey, c.mercuryConfig) } @@ -89,7 +91,7 @@ func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLo results = m.Bytes } - return state, encoding.UpkeepFailureReasonNone, results, retryable, retryInterval, reqErr + return state, encoding.UpkeepFailureReasonNone, results, retryable, retryInterval, errCode, reqErr } func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.MercuryData, sl *mercury.StreamsLookup) { @@ -130,6 +132,7 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur state := encoding.NoPipelineError retryable := false sent := false + errCode := encoding.ErrCodeNil retryErr := retry.Do( func() error { retryable = false @@ -154,26 +157,31 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur case http.StatusUnauthorized: retryable = false state = encoding.UpkeepNotAuthorized + errCode = encoding.HttpToErrCode(resp.StatusCode) return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by unauthorized upkeep", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode) case http.StatusBadRequest: retryable = false state = encoding.InvalidMercuryRequest + errCode = encoding.HttpToErrCode(resp.StatusCode) return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by invalid format of timestamp", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode) case http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout: retryable = true state = encoding.MercuryFlakyFailure + errCode = encoding.HttpToErrCode(resp.StatusCode) return fmt.Errorf("%d", resp.StatusCode) case http.StatusPartialContent: // TODO (AUTO-5044): handle response code 206 entirely with errors field parsing c.lggr.Warnf("at timestamp %s upkeep %s requested [%s] feeds but mercury v0.3 server returned 206 status, treating it as 404 and retrying", sl.Time.String(), sl.UpkeepId.String(), sl.Feeds) retryable = true state = encoding.MercuryFlakyFailure + errCode = encoding.HttpToErrCode(resp.StatusCode) return fmt.Errorf("%d", http.StatusPartialContent) case http.StatusOK: // continue default: retryable = false state = encoding.InvalidMercuryRequest + errCode = encoding.ErrCodeBadRequest return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode) } c.lggr.Debugf("at block %s upkeep %s received status code %d from mercury v0.3 with BODY=%s", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode, hexutil.Encode(body)) @@ -196,6 +204,7 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur c.lggr.Warnf("at timestamp %s upkeep %s mercury v0.3 server returned 206 status with [%s] reports while we requested [%s] feeds, retrying", sl.Time.String(), sl.UpkeepId.String(), receivedFeeds, sl.Feeds) retryable = true state = encoding.MercuryFlakyFailure + errCode = encoding.HttpToErrCode(http.StatusPartialContent) return fmt.Errorf("%d", http.StatusNotFound) } var reportBytes [][]byte @@ -233,6 +242,7 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur Bytes: [][]byte{}, Retryable: retryable, Error: retryErr, + ErrCode: errCode, State: state, } } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/v03_request_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/v03_request_test.go index a7742c04872..616226f1d4c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/v03_request_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/v03_request_test.go @@ -109,6 +109,7 @@ func TestV03_DoMercuryRequestV03(t *testing.T) { expectedValues [][]byte expectedRetryable bool expectedRetryInterval time.Duration + expectedErrCode encoding.ErrCode expectedError error state encoding.PipelineExecutionState reason encoding.UpkeepFailureReason @@ -163,13 +164,14 @@ func TestV03_DoMercuryRequestV03(t *testing.T) { } c.httpClient = hc - state, reason, values, retryable, retryInterval, reqErr := c.DoRequest(testutils.Context(t), tt.lookup, tt.pluginRetryKey) + state, reason, values, retryable, retryInterval, errCode, reqErr := c.DoRequest(testutils.Context(t), tt.lookup, tt.pluginRetryKey) assert.Equal(t, tt.expectedValues, values) assert.Equal(t, tt.expectedRetryable, retryable) assert.Equal(t, tt.expectedRetryInterval, retryInterval) assert.Equal(t, tt.state, state) assert.Equal(t, tt.reason, reason) + assert.Equal(t, tt.expectedErrCode, errCode) if tt.expectedError != nil { assert.Equal(t, tt.expectedError.Error(), reqErr.Error()) } From dcacbdc9b7a4856c22e863e7abe7a56949569f67 Mon Sep 17 00:00:00 2001 From: amirylm Date: Wed, 14 Feb 2024 20:21:28 +0200 Subject: [PATCH 06/11] added err code for encoding --- .../plugins/ocr2keeper/evmregistry/v21/encoding/interface.go | 1 + .../plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go | 2 ++ .../plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go | 2 ++ 3 files changed, 5 insertions(+) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go index a794791c5e8..f018472e34a 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go @@ -54,6 +54,7 @@ const ( ErrCodeMercuryError ErrCode = 808500 ErrCodeBadRequest ErrCode = 800400 ErrCodeUnauthorized ErrCode = 800401 + ErrCodeEncodingError ErrCode = 808600 ErrCodeStreamLookupTimeout ErrCode = 808603 ) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go index 0c79f0aefd3..129210378b2 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go @@ -166,11 +166,13 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur if err = json.Unmarshal(responseBody, &m); err != nil { c.lggr.Warnf("at block %s upkeep %s failed to unmarshal body to MercuryV02Response for feed %s: %v", sl.Time.String(), sl.UpkeepId.String(), sl.Feeds[index], err) state = encoding.MercuryUnmarshalError + errCode = encoding.ErrCodeEncodingError return err } if blobBytes, err = hexutil.Decode(m.ChainlinkBlob); err != nil { c.lggr.Warnf("at block %s upkeep %s failed to decode chainlinkBlob %s for feed %s: %v", sl.Time.String(), sl.UpkeepId.String(), m.ChainlinkBlob, sl.Feeds[index], err) state = encoding.InvalidMercuryResponse + errCode = encoding.ErrCodeEncodingError return err } ch <- mercury.MercuryData{ diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go index 9e5d32bcb82..797643a3505 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go @@ -191,6 +191,7 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur c.lggr.Warnf("at timestamp %s upkeep %s failed to unmarshal body to MercuryV03Response from mercury v0.3: %v", sl.Time.String(), sl.UpkeepId.String(), err) retryable = false state = encoding.MercuryUnmarshalError + errCode = encoding.ErrCodeEncodingError return err } @@ -214,6 +215,7 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur c.lggr.Warnf("at timestamp %s upkeep %s failed to decode reportBlob %s: %v", sl.Time.String(), sl.UpkeepId.String(), rsp.FullReport, err) retryable = false state = encoding.InvalidMercuryResponse + errCode = encoding.ErrCodeEncodingError return err } reportBytes = append(reportBytes, b) From 16f0b1436e8f3145646e4c40cd5e01c4b0465f72 Mon Sep 17 00:00:00 2001 From: amirylm Date: Thu, 15 Feb 2024 09:59:44 +0200 Subject: [PATCH 07/11] review fixes: - typo - using encoding.ErrCodeNil instead of 0 - rename ErrCodeMercuryError -> ErrCodeDataStreamsError --- .../ocr2keeper/evmregistry/v21/encoding/interface.go | 8 ++++---- .../ocr2keeper/evmregistry/v21/mercury/v02/request.go | 2 +- .../evmregistry/v21/mercury/v02/v02_request_test.go | 6 +++--- .../ocr2keeper/evmregistry/v21/mercury/v03/request.go | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go index f018472e34a..9e7db5d4c5d 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go @@ -50,8 +50,8 @@ type ErrCode uint32 const ( ErrCodeNil ErrCode = 0 - ErrCodePartielContent ErrCode = 800206 - ErrCodeMercuryError ErrCode = 808500 + ErrCodePartialContent ErrCode = 800206 + ErrCodeDataStreamsError ErrCode = 808500 ErrCodeBadRequest ErrCode = 800400 ErrCodeUnauthorized ErrCode = 800401 ErrCodeEncodingError ErrCode = 808600 @@ -63,13 +63,13 @@ func HttpToErrCode(statusCode int) ErrCode { case http.StatusOK: return ErrCodeNil case http.StatusPartialContent: - return ErrCodePartielContent + return ErrCodePartialContent case http.StatusBadRequest: return ErrCodeBadRequest case http.StatusUnauthorized: return ErrCodeUnauthorized case http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout: - return ErrCodeMercuryError + return ErrCodeDataStreamsError default: return 0 } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go index 129210378b2..5f8e91377a2 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go @@ -57,7 +57,7 @@ func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLo resultLen := len(streamsLookup.Feeds) ch := make(chan mercury.MercuryData, resultLen) if len(streamsLookup.Feeds) == 0 { - return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, 0, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", streamsLookup.FeedParamKey, streamsLookup.TimeParamKey, streamsLookup.Feeds) + return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, encoding.ErrCodeNil, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", streamsLookup.FeedParamKey, streamsLookup.TimeParamKey, streamsLookup.Feeds) } for i := range streamsLookup.Feeds { // TODO (AUTO-7209): limit the number of concurrent requests diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/v02_request_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/v02_request_test.go index afa0d02781b..ac8cbfb866b 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/v02_request_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/v02_request_test.go @@ -331,7 +331,7 @@ func TestV02_DoMercuryRequestV02(t *testing.T) { expectedRetryable: true, pluginRetries: 0, expectedRetryInterval: 1 * time.Second, - expectedErrCode: encoding.ErrCodeMercuryError, + expectedErrCode: encoding.ErrCodeDataStreamsError, expectedError: errors.New("failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 500\n#2: 500\n#3: 500"), state: encoding.MercuryFlakyFailure, }, @@ -353,7 +353,7 @@ func TestV02_DoMercuryRequestV02(t *testing.T) { expectedValues: [][]byte{nil}, expectedRetryable: true, expectedRetryInterval: 5 * time.Second, - expectedErrCode: encoding.ErrCodeMercuryError, + expectedErrCode: encoding.ErrCodeDataStreamsError, expectedError: errors.New("failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 500\n#2: 500\n#3: 500"), state: encoding.MercuryFlakyFailure, }, @@ -374,7 +374,7 @@ func TestV02_DoMercuryRequestV02(t *testing.T) { mockChainlinkBlobs: []string{"0x00066dfcd1ed2d95b18c948dbc5bd64c687afe93e4ca7d663ddec14c20090ad80000000000000000000000000000000000000000000000000000000000081401000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002200000000000000000000000000000000000000000000000000000000000000280000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001204554482d5553442d415242495452554d2d544553544e455400000000000000000000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000289ad8d367000000000000000000000000000000000000000000000000000000289acf0b38000000000000000000000000000000000000000000000000000000289b3da40000000000000000000000000000000000000000000000000000000000018ae7ce74d9fa252a8983976eab600dc7590c778d04813430841bc6e765c34cd81a168d00000000000000000000000000000000000000000000000000000000018ae7cb0000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000000000000260412b94e525ca6cedc9f544fd86f77606d52fe731a5d069dbe836a8bfc0fb8c911963b0ae7a14971f3b4621bffb802ef0605392b9a6c89c7fab1df8633a5ade00000000000000000000000000000000000000000000000000000000000000024500c2f521f83fba5efc2bf3effaaedde43d0a4adff785c1213b712a3aed0d8157642a84324db0cf9695ebd27708d4608eb0337e0dd87b0e43f0fa70c700d911"}, expectedValues: [][]byte{nil}, expectedRetryInterval: mercury.RetryIntervalTimeout, - expectedErrCode: encoding.ErrCodeMercuryError, + expectedErrCode: encoding.ErrCodeDataStreamsError, expectedRetryable: true, expectedError: errors.New("failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 500\n#2: 500\n#3: 500"), state: encoding.MercuryFlakyFailure, diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go index 797643a3505..7e90a2f547c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go @@ -63,7 +63,7 @@ func NewClient(mercuryConfig mercury.MercuryConfigProvider, httpClient mercury.H func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, encoding.ErrCode, error) { if len(streamsLookup.Feeds) == 0 { - return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, 0, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", streamsLookup.FeedParamKey, streamsLookup.TimeParamKey, streamsLookup.Feeds) + return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, encoding.ErrCodeNil, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", streamsLookup.FeedParamKey, streamsLookup.TimeParamKey, streamsLookup.Feeds) } resultLen := 1 // Only 1 multi-feed request is made for all feeds ch := make(chan mercury.MercuryData, resultLen) From e2a5a00239549ddd8d79b3c06a3200f0be8a4d95 Mon Sep 17 00:00:00 2001 From: amirylm Date: Thu, 15 Feb 2024 10:45:26 +0200 Subject: [PATCH 08/11] fix err codes numbers --- .../ocr2keeper/evmregistry/v21/encoding/interface.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go index 9e7db5d4c5d..391b6b7790d 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go @@ -50,10 +50,10 @@ type ErrCode uint32 const ( ErrCodeNil ErrCode = 0 - ErrCodePartialContent ErrCode = 800206 + ErrCodePartialContent ErrCode = 808206 ErrCodeDataStreamsError ErrCode = 808500 - ErrCodeBadRequest ErrCode = 800400 - ErrCodeUnauthorized ErrCode = 800401 + ErrCodeBadRequest ErrCode = 808400 + ErrCodeUnauthorized ErrCode = 808401 ErrCodeEncodingError ErrCode = 808600 ErrCodeStreamLookupTimeout ErrCode = 808603 ) From 1dd48445b47c123611ea68e707cafaa520044863 Mon Sep 17 00:00:00 2001 From: amirylm Date: Thu, 15 Feb 2024 18:26:33 +0200 Subject: [PATCH 09/11] comments --- .../ocr2keeper/evmregistry/v21/encoding/interface.go | 2 +- .../ocr2keeper/evmregistry/v21/mercury/mercury.go | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go index 391b6b7790d..dc379217d83 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go @@ -45,7 +45,7 @@ const ( UpkeepNotAuthorized PipelineExecutionState = 9 ) -// ErrCode is used for invoking an error handler with a specific error code +// ErrCode is used for invoking an error handler with a specific error code. type ErrCode uint32 const ( diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury.go index 472d3b22204..f951072003b 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury.go @@ -87,6 +87,16 @@ type HttpClient interface { } type MercuryClient interface { + // DoRequest makes a data stream request, it manages retries and returns the following: + // state: the state of the pipeline execution, used by our components. + // upkeepFailureReason: the reason for the upkeep failure, used by our components. + // data: the data returned from the data stream. + // retryable: whether the request is retryable. + // retryInterval: the interval to wait before retrying the request, or RetryIntervalTimeout if no more retries are allowed. + // errCode: the error code of the request, to be passed to the user's error handler if applicable. + // error: the raw error that occurred during the request. + // + // Exploratory: consider to merge state/failureReason/errCode into a single object DoRequest(ctx context.Context, streamsLookup *StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, encoding.ErrCode, error) } From 8a677452cfc02fac379592f864ef8f82880d2085 Mon Sep 17 00:00:00 2001 From: amirylm Date: Thu, 15 Feb 2024 18:27:39 +0200 Subject: [PATCH 10/11] added function and tests for handling err code --- .../v21/mercury/streams/streams.go | 42 ++++++++--- .../v21/mercury/streams/streams_test.go | 72 +++++++++++++++++++ 2 files changed, 105 insertions(+), 9 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go index 47b4b905e1c..79526daaa9e 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go @@ -257,16 +257,13 @@ func (s *streams) DoMercuryRequest(ctx context.Context, lookup *mercury.StreamsL // in case of retry timeout, setting retryable to false checkResults[i].Retryable = false } - upkeepType := core.GetUpkeepType(checkResults[i].UpkeepID) - switch upkeepType { - case types.LogTrigger: - if retryTimeout && errCode > encoding.ErrCodeNil { - s.lggr.Debugf("at block %d upkeep %s requested time %s doMercuryRequest err: %s, errCode: %d", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error(), errCode) - // TODO: prepare for error handler, make sure to pass errCode - } - default: + errValues, eCodeErr := s.handleErrCode(&checkResults[i], errCode, err) + if eCodeErr != nil { + return nil, eCodeErr } - return nil, err + values = errValues // TODO: revisit this line once we have error handler + s.lggr.Debugf("at block %d upkeep %s requested time %s doMercuryRequest err: %s, errCode: %d", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error(), errCode) + return nil, err // TODO: remove this line once we have error handler } for j, v := range values { @@ -275,6 +272,33 @@ func (s *streams) DoMercuryRequest(ctx context.Context, lookup *mercury.StreamsL return values, nil } +// TODO: complete this function for preparing values for error handler +func (s *streams) handleErrCode(result *ocr2keepers.CheckResult, errCode encoding.ErrCode, err error) ([][]byte, error) { + upkeepType := core.GetUpkeepType(result.UpkeepID) + switch upkeepType { + case types.LogTrigger: + switch errCode { + case encoding.ErrCodePartialContent, encoding.ErrCodeDataStreamsError: + if result.RetryInterval != mercury.RetryIntervalTimeout { + return nil, err + } + case encoding.ErrCodeBadRequest, encoding.ErrCodeUnauthorized, encoding.ErrCodeEncodingError: + default: + return nil, err + } + case types.ConditionTrigger: + // switch errCode { + // case encoding.ErrCodePartialContent, encoding.ErrCodeDataStreamsError, encoding.ErrCodeBadRequest, encoding.ErrCodeUnauthorized, encoding.ErrCodeEncodingError: + // default: + // return nil, err // TODO: encomment this line once we have error handler + // } + default: + return nil, err + } + // TODO: prepare values for error handler + return [][]byte{}, nil +} + // AllowedToUseMercury retrieves upkeep's administrative offchain config and decode a mercuryEnabled bool to indicate if // this upkeep is allowed to use Mercury service. func (s *streams) AllowedToUseMercury(opts *bind.CallOpts, upkeepId *big.Int) (state encoding.PipelineExecutionState, reason encoding.UpkeepFailureReason, retryable bool, allow bool, err error) { diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go index c7bff2eac7a..87807a421bd 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go @@ -11,6 +11,8 @@ import ( "testing" "time" + clatypes "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" + "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/pkg/errors" @@ -28,6 +30,7 @@ import ( iregistry21 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/i_keeper_registry_master_wrapper_2_1" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury" v02 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02" @@ -820,3 +823,72 @@ func TestStreams_StreamsLookup(t *testing.T) { }) } } + +func Test_HandleErrCode(t *testing.T) { + partialContentErr := errors.New("206") + + tests := []struct { + name string + checkResult *ocr2keepers.CheckResult + errCode encoding.ErrCode + err error + expectedValues [][]byte + expectedErr error + }{ + { + name: "no error", + checkResult: &ocr2keepers.CheckResult{ + UpkeepID: core.GenUpkeepID(clatypes.LogTrigger, "111"), + }, + errCode: encoding.ErrCodeNil, + err: nil, + expectedValues: [][]byte{}, + expectedErr: nil, + }, + { + name: "error code bad request", + checkResult: &ocr2keepers.CheckResult{ + UpkeepID: core.GenUpkeepID(clatypes.LogTrigger, "111"), + }, + errCode: encoding.ErrCodeBadRequest, + err: errors.New("400"), + expectedValues: [][]byte{}, + expectedErr: nil, + }, + { + name: "error code partial content with retry timeout", + checkResult: &ocr2keepers.CheckResult{ + UpkeepID: core.GenUpkeepID(clatypes.LogTrigger, "111"), + RetryInterval: mercury.RetryIntervalTimeout, + Retryable: true, + }, + errCode: encoding.ErrCodePartialContent, + err: errors.New("206"), + expectedValues: [][]byte{}, + expectedErr: nil, + }, + { + name: "error code partial content without retry timeout", + checkResult: &ocr2keepers.CheckResult{ + UpkeepID: core.GenUpkeepID(clatypes.LogTrigger, "111"), + RetryInterval: time.Second, + Retryable: true, + }, + errCode: encoding.ErrCodePartialContent, + err: partialContentErr, + expectedValues: nil, + expectedErr: partialContentErr, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := setupStreams(t) + defer s.Close() + + values, err := s.handleErrCode(tt.checkResult, tt.errCode, tt.err) + assert.Equal(t, len(tt.expectedValues), len(values)) + assert.Equal(t, tt.expectedErr, err) + }) + } +} From 0e52650b5264321a6a2253386401547d9b1393e0 Mon Sep 17 00:00:00 2001 From: amirylm Date: Thu, 15 Feb 2024 19:17:52 +0200 Subject: [PATCH 11/11] lint --- .../ocr2keeper/evmregistry/v21/mercury/streams/streams.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go index 79526daaa9e..98984702cea 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go @@ -257,11 +257,10 @@ func (s *streams) DoMercuryRequest(ctx context.Context, lookup *mercury.StreamsL // in case of retry timeout, setting retryable to false checkResults[i].Retryable = false } - errValues, eCodeErr := s.handleErrCode(&checkResults[i], errCode, err) + _, eCodeErr := s.handleErrCode(&checkResults[i], errCode, err) if eCodeErr != nil { return nil, eCodeErr } - values = errValues // TODO: revisit this line once we have error handler s.lggr.Debugf("at block %d upkeep %s requested time %s doMercuryRequest err: %s, errCode: %d", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error(), errCode) return nil, err // TODO: remove this line once we have error handler }