Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop retry flow for error handling (log trigger) #12026

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package encoding

import (
"net/http"

ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation"

"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/automation_utils_2_1"
Expand Down Expand Up @@ -43,6 +45,36 @@ const (
UpkeepNotAuthorized PipelineExecutionState = 9
)

// ErrCode is used for invoking an error handler with a specific error code.
// It is an unsigned 32-bit value; 0 (ErrCodeNil) denotes "no error code".
type ErrCode uint32

const (
ErrCodeNil ErrCode = 0
ErrCodePartielContent ErrCode = 800206
amirylm marked this conversation as resolved.
Show resolved Hide resolved
ErrCodeMercuryError ErrCode = 808500
amirylm marked this conversation as resolved.
Show resolved Hide resolved
ErrCodeBadRequest ErrCode = 800400
ErrCodeUnauthorized ErrCode = 800401
ErrCodeEncodingError ErrCode = 808600
ErrCodeStreamLookupTimeout ErrCode = 808603
)

// HttpToErrCode maps an HTTP status code onto the ErrCode that is passed along
// for error handling.
//
// 206, 400 and 401 map to their dedicated codes; 500, 502, 503 and 504 all
// collapse into ErrCodeMercuryError. 200 and every status that is not listed
// yield ErrCodeNil.
func HttpToErrCode(statusCode int) ErrCode {
	// Start from "no error code" and only override it for the known statuses.
	code := ErrCodeNil
	switch {
	case statusCode == http.StatusPartialContent:
		code = ErrCodePartielContent
	case statusCode == http.StatusBadRequest:
		code = ErrCodeBadRequest
	case statusCode == http.StatusUnauthorized:
		code = ErrCodeUnauthorized
	case statusCode == http.StatusInternalServerError,
		statusCode == http.StatusBadGateway,
		statusCode == http.StatusServiceUnavailable,
		statusCode == http.StatusGatewayTimeout:
		code = ErrCodeMercuryError
	}
	return code
}

type UpkeepInfo = iregistry21.KeeperRegistryBase21UpkeepInfo

type Packer interface {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ const (
BlockNumber = "blockNumber" // valid for v0.2
Timestamp = "timestamp" // valid for v0.3
totalFastPluginRetries = 5
totalMediumPluginRetries = 10
totalMediumPluginRetries = totalFastPluginRetries + 1
RetryIntervalTimeout = time.Duration(-1)
)

var GenerateHMACFn = func(method string, path string, body []byte, clientId string, secret string, ts int64) string {
Expand All @@ -45,8 +46,7 @@ var GenerateHMACFn = func(method string, path string, body []byte, clientId stri
}

// CalculateRetryConfigFn returns the plugin retry interval based on how many times the plugin has retried this work
var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvider) time.Duration {
var retryInterval time.Duration
var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvider) (retryInterval time.Duration) {
var retries int
totalAttempts, ok := mercuryConfig.GetPluginRetry(prk)
if ok {
Expand All @@ -55,9 +55,9 @@ var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvide
retryInterval = 1 * time.Second
} else if retries < totalMediumPluginRetries {
retryInterval = 5 * time.Second
} else {
retryInterval = RetryIntervalTimeout
}
// if the core node has retried totalMediumPluginRetries times, do not set retry interval and plugin will use
// the default interval
} else {
retryInterval = 1 * time.Second
}
Expand All @@ -68,6 +68,7 @@ var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvide
type MercuryData struct {
Index int
Error error
ErrCode encoding.ErrCode
Retryable bool
Bytes [][]byte
State encoding.PipelineExecutionState
Expand All @@ -86,7 +87,7 @@ type HttpClient interface {
}

type MercuryClient interface {
DoRequest(ctx context.Context, streamsLookup *StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, error)
DoRequest(ctx context.Context, streamsLookup *StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, encoding.ErrCode, error)
amirylm marked this conversation as resolved.
Show resolved Hide resolved
}

type StreamsLookupError struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import (
"errors"
"math/big"
"testing"
"time"

"github.com/patrickmn/go-cache"
"github.com/stretchr/testify/require"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/stretchr/testify/assert"

"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding"
)

Expand Down Expand Up @@ -240,3 +243,77 @@ func TestPacker_UnpackCheckCallbackResult(t *testing.T) {
})
}
}

// Test_CalculateRetryConfigFn calls CalculateRetryConfigFn repeatedly with the
// same plugin retry key against a fresh config mock and checks the interval
// returned by the last call of each scenario.
func Test_CalculateRetryConfigFn(t *testing.T) {
	type scenario struct {
		name     string
		times    int // how many consecutive calls to make
		expected time.Duration
	}

	scenarios := []scenario{
		{name: "first retry", times: 1, expected: time.Second},
		{name: "second retry", times: 2, expected: time.Second},
		{name: "fifth retry", times: 5, expected: time.Second},
		{name: "sixth retry", times: 6, expected: 5 * time.Second},
		{name: "timeout", times: totalMediumPluginRetries + 1, expected: RetryIntervalTimeout},
	}

	for idx := range scenarios {
		sc := scenarios[idx]
		t.Run(sc.name, func(t *testing.T) {
			// Each scenario gets its own mock so retry counts never leak between cases.
			mock := newMercuryConfigMock()

			var got time.Duration
			for attempt := 1; attempt <= sc.times; attempt++ {
				got = CalculateRetryConfigFn("prk", mock)
			}

			assert.Equal(t, sc.expected, got)
		})
	}
}

// mercuryConfigMock is a test double that is passed to CalculateRetryConfigFn
// in place of a real MercuryConfigProvider. Only the plugin retry accessors
// are backed by real storage.
type mercuryConfigMock struct {
// pluginRetryCache holds the values written by SetPluginRetry, keyed by the
// string passed in.
pluginRetryCache *cache.Cache
}

// newMercuryConfigMock returns a mercuryConfigMock whose plugin retry cache is
// created with cache.New(10*time.Second, time.Minute), i.e. go-cache's default
// expiration and cleanup interval arguments.
func newMercuryConfigMock() *mercuryConfigMock {
	const (
		defaultExpiration = 10 * time.Second
		cleanupInterval   = time.Minute
	)

	mock := new(mercuryConfigMock)
	mock.pluginRetryCache = cache.New(defaultExpiration, cleanupInterval)
	return mock
}

// Credentials always returns nil; the mock carries no Mercury credentials.
func (c *mercuryConfigMock) Credentials() *types.MercuryCredentials {
return nil
}

// IsUpkeepAllowed ignores k and always returns (nil, false).
func (c *mercuryConfigMock) IsUpkeepAllowed(k string) (interface{}, bool) {
return nil, false
}

// SetUpkeepAllowed is a no-op: the mock stores nothing, which matches
// IsUpkeepAllowed always returning (nil, false).
func (c *mercuryConfigMock) SetUpkeepAllowed(k string, v interface{}, d time.Duration) {
}

// GetPluginRetry looks k up in the plugin retry cache and returns the cached
// value together with a flag telling whether an entry was found.
func (c *mercuryConfigMock) GetPluginRetry(k string) (interface{}, bool) {
	value, found := c.pluginRetryCache.Get(k)
	return value, found
}

// SetPluginRetry stores v under key k in the plugin retry cache, passing d
// through as the entry's expiration duration.
func (c *mercuryConfigMock) SetPluginRetry(k string, v interface{}, d time.Duration) {
c.pluginRetryCache.Set(k, v, d)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/patrickmn/go-cache"

"github.com/smartcontractkit/chainlink-automation/pkg/v3/types"
"github.com/smartcontractkit/chainlink-common/pkg/services"
ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation"

Expand Down Expand Up @@ -237,20 +238,34 @@ func (s *streams) CheckCallback(ctx context.Context, values [][]byte, lookup *me
}

func (s *streams) DoMercuryRequest(ctx context.Context, lookup *mercury.StreamsLookup, checkResults []ocr2keepers.CheckResult, i int) ([][]byte, error) {
state, reason, values, retryable, retryInterval, err := encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0*time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", lookup.FeedParamKey, lookup.TimeParamKey, lookup.Feeds)
state, reason, values, retryable, retryInterval, errCode, err := encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0*time.Second, encoding.ErrCodeNil, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", lookup.FeedParamKey, lookup.TimeParamKey, lookup.Feeds)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these initial values used anywhere? I see we always overwrite them with either the v0.2 or the v0.3 DoRequest; maybe it would be simpler to just declare them as vars?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could have declared some of them as plain vars, but some have an initial value, and that's why they are defined like this.

pluginRetryKey := generatePluginRetryKey(checkResults[i].WorkID, lookup.Block)

if lookup.IsMercuryV02() {
state, reason, values, retryable, retryInterval, err = s.v02Client.DoRequest(ctx, lookup, pluginRetryKey)
state, reason, values, retryable, retryInterval, errCode, err = s.v02Client.DoRequest(ctx, lookup, pluginRetryKey)
} else if lookup.IsMercuryV03() {
state, reason, values, retryable, retryInterval, err = s.v03Client.DoRequest(ctx, lookup, pluginRetryKey)
state, reason, values, retryable, retryInterval, errCode, err = s.v03Client.DoRequest(ctx, lookup, pluginRetryKey)
}

if err != nil {
checkResults[i].Retryable = retryable
checkResults[i].RetryInterval = retryInterval
checkResults[i].PipelineExecutionState = uint8(state)
checkResults[i].IneligibilityReason = uint8(reason)
retryTimeout := retryInterval == mercury.RetryIntervalTimeout
if retryTimeout {
// in case of retry timeout, setting retryable to false
checkResults[i].Retryable = false
}
amirylm marked this conversation as resolved.
Show resolved Hide resolved
upkeepType := core.GetUpkeepType(checkResults[i].UpkeepID)
switch upkeepType {
case types.LogTrigger:
if retryTimeout && errCode > encoding.ErrCodeNil {
s.lggr.Debugf("at block %d upkeep %s requested time %s doMercuryRequest err: %s, errCode: %d", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error(), errCode)
// TODO: prepare for error handler, make sure to pass errCode
}
default:
}
return nil, err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ func NewClient(mercuryConfig mercury.MercuryConfigProvider, httpClient mercury.H
}
}

func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, error) {
func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, encoding.ErrCode, error) {
resultLen := len(streamsLookup.Feeds)
ch := make(chan mercury.MercuryData, resultLen)
if len(streamsLookup.Feeds) == 0 {
return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", streamsLookup.FeedParamKey, streamsLookup.TimeParamKey, streamsLookup.Feeds)
return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, 0, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", streamsLookup.FeedParamKey, streamsLookup.TimeParamKey, streamsLookup.Feeds)
}
for i := range streamsLookup.Feeds {
// TODO (AUTO-7209): limit the number of concurrent requests
Expand All @@ -69,6 +69,7 @@ func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLo

var reqErr error
var retryInterval time.Duration
var errCode encoding.ErrCode
results := make([][]byte, len(streamsLookup.Feeds))
retryable := true
allSuccess := true
Expand All @@ -79,6 +80,7 @@ func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLo
if m.Error != nil {
reqErr = errors.Join(reqErr, m.Error)
retryable = retryable && m.Retryable
errCode = m.ErrCode
allSuccess = false
if m.State != encoding.NoPipelineError {
state = m.State
Expand All @@ -91,7 +93,7 @@ func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLo
retryInterval = mercury.CalculateRetryConfigFn(pluginRetryKey, c.mercuryConfig)
}
// only retry when at least one request failed AND every failed request is retryable
return state, encoding.UpkeepFailureReasonNone, results, retryable && !allSuccess, retryInterval, reqErr
return state, encoding.UpkeepFailureReasonNone, results, retryable && !allSuccess, retryInterval, errCode, reqErr
}

func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.MercuryData, index int, sl *mercury.StreamsLookup) {
Expand Down Expand Up @@ -120,6 +122,7 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur
httpRequest.Header.Set(signatureHeader, signature)

// in the case of multiple retries here, use the last attempt's data
errCode := encoding.ErrCodeNil
state := encoding.NoPipelineError
retryable := false
sent := false
Expand Down Expand Up @@ -148,6 +151,7 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur
c.lggr.Warnf("at block %s upkeep %s received status code %d for feed %s", sl.Time.String(), sl.UpkeepId.String(), httpResponse.StatusCode, sl.Feeds[index])
retryable = true
state = encoding.MercuryFlakyFailure
errCode = encoding.HttpToErrCode(httpResponse.StatusCode)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this errCode only updated for this group of statuses? Why not move this assignment outside of the switch at L149?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want the error code just in specific scenarios, so just to be sure I've preferred to do it specifically when needed.

return errors.New(strconv.FormatInt(int64(httpResponse.StatusCode), 10))
case http.StatusOK:
// continue
Expand All @@ -162,11 +166,13 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur
if err = json.Unmarshal(responseBody, &m); err != nil {
c.lggr.Warnf("at block %s upkeep %s failed to unmarshal body to MercuryV02Response for feed %s: %v", sl.Time.String(), sl.UpkeepId.String(), sl.Feeds[index], err)
state = encoding.MercuryUnmarshalError
errCode = encoding.ErrCodeEncodingError
return err
}
if blobBytes, err = hexutil.Decode(m.ChainlinkBlob); err != nil {
c.lggr.Warnf("at block %s upkeep %s failed to decode chainlinkBlob %s for feed %s: %v", sl.Time.String(), sl.UpkeepId.String(), m.ChainlinkBlob, sl.Feeds[index], err)
state = encoding.InvalidMercuryResponse
errCode = encoding.ErrCodeEncodingError
return err
}
ch <- mercury.MercuryData{
Expand All @@ -193,6 +199,7 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur
Bytes: [][]byte{},
Retryable: retryable,
Error: fmt.Errorf("failed to request feed for %s: %w", sl.Feeds[index], retryErr),
ErrCode: errCode,
State: state,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ func TestV02_DoMercuryRequestV02(t *testing.T) {
expectedValues [][]byte
expectedRetryable bool
expectedRetryInterval time.Duration
expectedErrCode encoding.ErrCode
expectedError error
state encoding.PipelineExecutionState
reason encoding.UpkeepFailureReason
Expand Down Expand Up @@ -330,6 +331,7 @@ func TestV02_DoMercuryRequestV02(t *testing.T) {
expectedRetryable: true,
pluginRetries: 0,
expectedRetryInterval: 1 * time.Second,
expectedErrCode: encoding.ErrCodeMercuryError,
expectedError: errors.New("failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 500\n#2: 500\n#3: 500"),
state: encoding.MercuryFlakyFailure,
},
Expand All @@ -351,6 +353,7 @@ func TestV02_DoMercuryRequestV02(t *testing.T) {
expectedValues: [][]byte{nil},
expectedRetryable: true,
expectedRetryInterval: 5 * time.Second,
expectedErrCode: encoding.ErrCodeMercuryError,
expectedError: errors.New("failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 500\n#2: 500\n#3: 500"),
state: encoding.MercuryFlakyFailure,
},
Expand All @@ -366,13 +369,15 @@ func TestV02_DoMercuryRequestV02(t *testing.T) {
},
UpkeepId: upkeepId,
},
pluginRetries: 10,
mockHttpStatusCode: http.StatusInternalServerError,
mockChainlinkBlobs: []string{"0x00066dfcd1ed2d95b18c948dbc5bd64c687afe93e4ca7d663ddec14c20090ad80000000000000000000000000000000000000000000000000000000000081401000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002200000000000000000000000000000000000000000000000000000000000000280000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001204554482d5553442d415242495452554d2d544553544e455400000000000000000000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000289ad8d367000000000000000000000000000000000000000000000000000000289acf0b38000000000000000000000000000000000000000000000000000000289b3da40000000000000000000000000000000000000000000000000000000000018ae7ce74d9fa252a8983976eab600dc7590c778d04813430841bc6e765c34cd81a168d00000000000000000000000000000000000000000000000000000000018ae7cb0000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000000000000260412b94e525ca6cedc9f544fd86f77606d52fe731a5d069dbe836a8bfc0fb8c911963b0ae7a14971f3b4621bffb802ef0605392b9a6c89c7fab1df8633a5ade00000000000000000000000000000000000000000000000000000000000000024500c2f521f83fba5efc2bf3effaaedde43d0a4adff785c1213b712a3aed0d8157642a84324db0cf9695ebd27708d4608eb0337e0dd87b0e43f0fa70c700d911"},
expectedValues: [][]byte{nil},
expectedRetryable: true,
expectedError: errors.New("failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 500\n#2: 500\n#3: 500"),
state: encoding.MercuryFlakyFailure,
pluginRetries: 10,
mockHttpStatusCode: http.StatusInternalServerError,
mockChainlinkBlobs: []string{"0x00066dfcd1ed2d95b18c948dbc5bd64c687afe93e4ca7d663ddec14c20090ad80000000000000000000000000000000000000000000000000000000000081401000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002200000000000000000000000000000000000000000000000000000000000000280000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001204554482d5553442d415242495452554d2d544553544e455400000000000000000000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000289ad8d367000000000000000000000000000000000000000000000000000000289acf0b38000000000000000000000000000000000000000000000000000000289b3da40000000000000000000000000000000000000000000000000000000000018ae7ce74d9fa252a8983976eab600dc7590c778d04813430841bc6e765c34cd81a168d00000000000000000000000000000000000000000000000000000000018ae7cb0000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000000000000260412b94e525ca6cedc9f544fd86f77606d52fe731a5d069dbe836a8bfc0fb8c911963b0ae7a14971f3b4621bffb802ef0605392b9a6c89c7fab1df8633a5ade00000000000000000000000000000000000000000000000000000000000000024500c2f521f83fba5efc2bf3effaaedde43d0a4adff785c1213b712a3aed0d8157642a84324db0cf9695ebd27708d4608eb0337e0dd87b0e43f0fa70c700d911"},
expectedValues: [][]byte{nil},
expectedRetryInterval: mercury.RetryIntervalTimeout,
expectedErrCode: encoding.ErrCodeMercuryError,
expectedRetryable: true,
expectedError: errors.New("failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 500\n#2: 500\n#3: 500"),
state: encoding.MercuryFlakyFailure,
},
{
name: "failure - not retryable",
Expand Down Expand Up @@ -451,16 +456,18 @@ func TestV02_DoMercuryRequestV02(t *testing.T) {
}
c.httpClient = hc

state, reason, values, retryable, retryInterval, reqErr := c.DoRequest(testutils.Context(t), tt.lookup, tt.pluginRetryKey)
state, reason, values, retryable, retryInterval, errCode, reqErr := c.DoRequest(testutils.Context(t), tt.lookup, tt.pluginRetryKey)
assert.Equal(t, tt.expectedValues, values)
assert.Equal(t, tt.expectedRetryable, retryable)
if retryable {
newRetries, _ := c.mercuryConfig.GetPluginRetry(tt.pluginRetryKey)
assert.Equal(t, tt.pluginRetries+1, newRetries.(int))
}
assert.Equal(t, tt.expectedRetryInterval, retryInterval)
assert.Equal(t, tt.expectedErrCode, errCode)
assert.Equal(t, tt.state, state)
assert.Equal(t, tt.reason, reason)

if tt.expectedError != nil {
assert.True(t, strings.HasPrefix(reqErr.Error(), "failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000"))
}
Expand Down
Loading
Loading