Skip to content

Commit

Permalink
Stop retry flow for error handling (log trigger) (#12026)
Browse files Browse the repository at this point in the history
* return timeout for retry interval

* identify retryTimeout + placeholder for err handler

* err codes

* comments

* added function and tests for handling err code
  • Loading branch information
amirylm authored Feb 15, 2024
1 parent 18b185f commit 75c8094
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package encoding

import (
"net/http"

ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation"

"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/automation_utils_2_1"
Expand Down Expand Up @@ -43,6 +45,36 @@ const (
UpkeepNotAuthorized PipelineExecutionState = 9
)

// ErrCode is the numeric code handed to an error handler to describe which
// kind of failure occurred.
type ErrCode uint32

// Known error codes, listed in ascending numeric order. ErrCodeNil (zero)
// means there is no error to report.
const (
	ErrCodeNil                 ErrCode = 0
	ErrCodePartialContent      ErrCode = 808206
	ErrCodeBadRequest          ErrCode = 808400
	ErrCodeUnauthorized        ErrCode = 808401
	ErrCodeDataStreamsError    ErrCode = 808500
	ErrCodeEncodingError       ErrCode = 808600
	ErrCodeStreamLookupTimeout ErrCode = 808603
)

// HttpToErrCode translates an HTTP response status code into an ErrCode.
//
// 206 maps to ErrCodePartialContent, 400 to ErrCodeBadRequest, 401 to
// ErrCodeUnauthorized, and 500/502/503/504 all map to ErrCodeDataStreamsError.
// 200 and every status without a dedicated code yield ErrCodeNil.
func HttpToErrCode(statusCode int) ErrCode {
	switch statusCode {
	case http.StatusPartialContent:
		return ErrCodePartialContent
	case http.StatusBadRequest:
		return ErrCodeBadRequest
	case http.StatusUnauthorized:
		return ErrCodeUnauthorized
	case http.StatusInternalServerError,
		http.StatusBadGateway,
		http.StatusServiceUnavailable,
		http.StatusGatewayTimeout:
		return ErrCodeDataStreamsError
	}
	// http.StatusOK and any unrecognised status carry no error code.
	return ErrCodeNil
}

// UpkeepInfo is an alias for iregistry21.KeeperRegistryBase21UpkeepInfo, so
// the two names refer to the same type.
type UpkeepInfo = iregistry21.KeeperRegistryBase21UpkeepInfo

type Packer interface {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ const (
BlockNumber = "blockNumber" // valid for v0.2
Timestamp = "timestamp" // valid for v0.3
totalFastPluginRetries = 5
totalMediumPluginRetries = 10
totalMediumPluginRetries = totalFastPluginRetries + 1
RetryIntervalTimeout = time.Duration(-1)
)

var GenerateHMACFn = func(method string, path string, body []byte, clientId string, secret string, ts int64) string {
Expand All @@ -45,8 +46,7 @@ var GenerateHMACFn = func(method string, path string, body []byte, clientId stri
}

// CalculateRetryConfigFn returns the plugin retry interval based on how many times the plugin has retried this work
var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvider) time.Duration {
var retryInterval time.Duration
var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvider) (retryInterval time.Duration) {
var retries int
totalAttempts, ok := mercuryConfig.GetPluginRetry(prk)
if ok {
Expand All @@ -55,9 +55,9 @@ var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvide
retryInterval = 1 * time.Second
} else if retries < totalMediumPluginRetries {
retryInterval = 5 * time.Second
} else {
retryInterval = RetryIntervalTimeout
}
// if the core node has retried totalMediumPluginRetries times, do not set retry interval and plugin will use
// the default interval
} else {
retryInterval = 1 * time.Second
}
Expand All @@ -68,6 +68,7 @@ var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvide
type MercuryData struct {
Index int
Error error
ErrCode encoding.ErrCode
Retryable bool
Bytes [][]byte
State encoding.PipelineExecutionState
Expand All @@ -86,7 +87,17 @@ type HttpClient interface {
}

type MercuryClient interface {
DoRequest(ctx context.Context, streamsLookup *StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, error)
// DoRequest makes a data stream request, it manages retries and returns the following:
// state: the state of the pipeline execution, used by our components.
// upkeepFailureReason: the reason for the upkeep failure, used by our components.
// data: the data returned from the data stream.
// retryable: whether the request is retryable.
// retryInterval: the interval to wait before retrying the request, or RetryIntervalTimeout if no more retries are allowed.
// errCode: the error code of the request, to be passed to the user's error handler if applicable.
// error: the raw error that occurred during the request.
//
// Exploratory: consider merging state/failureReason/errCode into a single object
DoRequest(ctx context.Context, streamsLookup *StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, encoding.ErrCode, error)
}

type StreamsLookupError struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import (
"errors"
"math/big"
"testing"
"time"

"github.com/patrickmn/go-cache"
"github.com/stretchr/testify/require"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/stretchr/testify/assert"

"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding"
)

Expand Down Expand Up @@ -240,3 +243,77 @@ func TestPacker_UnpackCheckCallbackResult(t *testing.T) {
})
}
}

// Test_CalculateRetryConfigFn invokes CalculateRetryConfigFn a given number of
// times with the same plugin retry key against a fresh config mock, and checks
// the interval returned by the final invocation.
func Test_CalculateRetryConfigFn(t *testing.T) {
	type testCase struct {
		name     string
		times    int // how many consecutive calls are made for the same key
		expected time.Duration
	}

	cases := []testCase{
		{name: "first retry", times: 1, expected: 1 * time.Second},
		{name: "second retry", times: 2, expected: 1 * time.Second},
		{name: "fifth retry", times: 5, expected: 1 * time.Second},
		{name: "sixth retry", times: 6, expected: 5 * time.Second},
		{name: "timeout", times: totalMediumPluginRetries + 1, expected: RetryIntervalTimeout},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			// Each case gets its own mock so earlier cases cannot affect the count.
			cfg := newMercuryConfigMock()

			var got time.Duration
			for call := 1; call <= c.times; call++ {
				got = CalculateRetryConfigFn("prk", cfg)
			}

			assert.Equal(t, c.expected, got)
		})
	}
}

// mercuryConfigMock is a test double for the mercury config provider. Only the
// plugin retry accessors are backed by real storage (an in-memory cache); the
// remaining methods are stubs.
type mercuryConfigMock struct {
	pluginRetryCache *cache.Cache
}

// newMercuryConfigMock returns a mock whose plugin retry cache is created with
// a 10 second default expiration and a one minute cleanup interval.
func newMercuryConfigMock() *mercuryConfigMock {
	retryCache := cache.New(10*time.Second, time.Minute)
	return &mercuryConfigMock{pluginRetryCache: retryCache}
}

// Credentials is a stub that always returns nil.
func (m *mercuryConfigMock) Credentials() *types.MercuryCredentials {
	return nil
}

// IsUpkeepAllowed is a stub that always reports a miss (nil, false).
func (m *mercuryConfigMock) IsUpkeepAllowed(key string) (interface{}, bool) {
	return nil, false
}

// SetUpkeepAllowed is a stub that discards its arguments.
func (m *mercuryConfigMock) SetUpkeepAllowed(key string, value interface{}, ttl time.Duration) {
}

// GetPluginRetry looks up key in the plugin retry cache.
func (m *mercuryConfigMock) GetPluginRetry(key string) (interface{}, bool) {
	return m.pluginRetryCache.Get(key)
}

// SetPluginRetry stores value under key in the plugin retry cache with the
// given expiration.
func (m *mercuryConfigMock) SetPluginRetry(key string, value interface{}, ttl time.Duration) {
	m.pluginRetryCache.Set(key, value, ttl)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/patrickmn/go-cache"

"github.com/smartcontractkit/chainlink-automation/pkg/v3/types"
"github.com/smartcontractkit/chainlink-common/pkg/services"
ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation"

Expand Down Expand Up @@ -237,21 +238,31 @@ func (s *streams) CheckCallback(ctx context.Context, values [][]byte, lookup *me
}

func (s *streams) DoMercuryRequest(ctx context.Context, lookup *mercury.StreamsLookup, checkResults []ocr2keepers.CheckResult, i int) ([][]byte, error) {
state, reason, values, retryable, retryInterval, err := encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0*time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", lookup.FeedParamKey, lookup.TimeParamKey, lookup.Feeds)
state, reason, values, retryable, retryInterval, errCode, err := encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0*time.Second, encoding.ErrCodeNil, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", lookup.FeedParamKey, lookup.TimeParamKey, lookup.Feeds)
pluginRetryKey := generatePluginRetryKey(checkResults[i].WorkID, lookup.Block)

if lookup.IsMercuryV02() {
state, reason, values, retryable, retryInterval, err = s.v02Client.DoRequest(ctx, lookup, pluginRetryKey)
state, reason, values, retryable, retryInterval, errCode, err = s.v02Client.DoRequest(ctx, lookup, pluginRetryKey)
} else if lookup.IsMercuryV03() {
state, reason, values, retryable, retryInterval, err = s.v03Client.DoRequest(ctx, lookup, pluginRetryKey)
state, reason, values, retryable, retryInterval, errCode, err = s.v03Client.DoRequest(ctx, lookup, pluginRetryKey)
}

if err != nil {
checkResults[i].Retryable = retryable
checkResults[i].RetryInterval = retryInterval
checkResults[i].PipelineExecutionState = uint8(state)
checkResults[i].IneligibilityReason = uint8(reason)
return nil, err
retryTimeout := retryInterval == mercury.RetryIntervalTimeout
if retryTimeout {
// in case of retry timeout, setting retryable to false
checkResults[i].Retryable = false
}
_, eCodeErr := s.handleErrCode(&checkResults[i], errCode, err)
if eCodeErr != nil {
return nil, eCodeErr
}
s.lggr.Debugf("at block %d upkeep %s requested time %s doMercuryRequest err: %s, errCode: %d", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error(), errCode)
return nil, err // TODO: remove this line once we have error handler
}

for j, v := range values {
Expand All @@ -260,6 +271,33 @@ func (s *streams) DoMercuryRequest(ctx context.Context, lookup *mercury.StreamsL
return values, nil
}

// handleErrCode decides, from the upkeep type of result and the given errCode,
// whether the original error is returned as is or whether values for the error
// handler are produced instead.
//
// It returns (nil, err) when:
//   - the upkeep type is neither log trigger nor condition trigger;
//   - the upkeep is a log trigger and errCode is ErrCodePartialContent or
//     ErrCodeDataStreamsError while result.RetryInterval is not
//     mercury.RetryIntervalTimeout;
//   - the upkeep is a log trigger and errCode is anything other than the
//     partial content, data streams, bad request, unauthorized or encoding
//     error codes.
//
// In every other case it returns an empty, non-nil slice and a nil error.
//
// TODO: complete this function so it prepares the values for the error handler.
func (s *streams) handleErrCode(result *ocr2keepers.CheckResult, errCode encoding.ErrCode, err error) ([][]byte, error) {
	upkeepType := core.GetUpkeepType(result.UpkeepID)
	if upkeepType != types.LogTrigger && upkeepType != types.ConditionTrigger {
		return nil, err
	}

	// TODO: once the error handler exists, condition triggers should also
	// return (nil, err) for error codes outside partial content, data streams
	// error, bad request, unauthorized and encoding error. Until then every
	// error code falls through for them.
	if upkeepType == types.LogTrigger {
		switch errCode {
		case encoding.ErrCodeBadRequest, encoding.ErrCodeUnauthorized, encoding.ErrCodeEncodingError:
			// falls through to the error handler values below
		case encoding.ErrCodePartialContent, encoding.ErrCodeDataStreamsError:
			// only handled here once the retry interval signals a timeout
			if result.RetryInterval != mercury.RetryIntervalTimeout {
				return nil, err
			}
		default:
			return nil, err
		}
	}

	// TODO: prepare values for error handler
	return [][]byte{}, nil
}

// AllowedToUseMercury retrieves upkeep's administrative offchain config and decode a mercuryEnabled bool to indicate if
// this upkeep is allowed to use Mercury service.
func (s *streams) AllowedToUseMercury(opts *bind.CallOpts, upkeepId *big.Int) (state encoding.PipelineExecutionState, reason encoding.UpkeepFailureReason, retryable bool, allow bool, err error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"testing"
"time"

clatypes "github.com/smartcontractkit/chainlink-automation/pkg/v3/types"

"github.com/smartcontractkit/chainlink-common/pkg/types"

"github.com/pkg/errors"
Expand All @@ -28,6 +30,7 @@ import (
iregistry21 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/i_keeper_registry_master_wrapper_2_1"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury"
v02 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02"
Expand Down Expand Up @@ -820,3 +823,72 @@ func TestStreams_StreamsLookup(t *testing.T) {
})
}
}

// Test_HandleErrCode exercises streams.handleErrCode for a log-trigger upkeep
// with several error codes and retry intervals, comparing the length of the
// returned values and the returned error against the expectation.
func Test_HandleErrCode(t *testing.T) {
	partialContentErr := errors.New("206")

	// logTriggerResult builds a check result for a log-trigger upkeep with the
	// given retry settings.
	logTriggerResult := func(interval time.Duration, retryable bool) *ocr2keepers.CheckResult {
		return &ocr2keepers.CheckResult{
			UpkeepID:      core.GenUpkeepID(clatypes.LogTrigger, "111"),
			RetryInterval: interval,
			Retryable:     retryable,
		}
	}

	type testCase struct {
		name           string
		checkResult    *ocr2keepers.CheckResult
		errCode        encoding.ErrCode
		err            error
		expectedValues [][]byte
		expectedErr    error
	}

	cases := []testCase{
		{
			name:           "no error",
			checkResult:    logTriggerResult(0, false),
			errCode:        encoding.ErrCodeNil,
			err:            nil,
			expectedValues: [][]byte{},
			expectedErr:    nil,
		},
		{
			name:           "error code bad request",
			checkResult:    logTriggerResult(0, false),
			errCode:        encoding.ErrCodeBadRequest,
			err:            errors.New("400"),
			expectedValues: [][]byte{},
			expectedErr:    nil,
		},
		{
			name:           "error code partial content with retry timeout",
			checkResult:    logTriggerResult(mercury.RetryIntervalTimeout, true),
			errCode:        encoding.ErrCodePartialContent,
			err:            errors.New("206"),
			expectedValues: [][]byte{},
			expectedErr:    nil,
		},
		{
			name:           "error code partial content without retry timeout",
			checkResult:    logTriggerResult(time.Second, true),
			errCode:        encoding.ErrCodePartialContent,
			err:            partialContentErr,
			expectedValues: nil,
			expectedErr:    partialContentErr,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			s := setupStreams(t)
			defer s.Close()

			gotValues, gotErr := s.handleErrCode(tc.checkResult, tc.errCode, tc.err)

			// Only the number of values is compared, so nil and empty slices are treated alike.
			assert.Equal(t, len(tc.expectedValues), len(gotValues))
			assert.Equal(t, tc.expectedErr, gotErr)
		})
	}
}
Loading

0 comments on commit 75c8094

Please sign in to comment.