Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop retry flow for error handling (log trigger) #12026

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package encoding

import (
"net/http"

ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation"

"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/automation_utils_2_1"
Expand Down Expand Up @@ -43,6 +45,36 @@ const (
UpkeepNotAuthorized PipelineExecutionState = 9
)

// ErrCode identifies the kind of failure that is handed to an error handler.
type ErrCode uint32

// Known error codes, listed in ascending numeric order. ErrCodeNil is the zero
// value. The codes that HttpToErrCode produces are the digits 808 followed by
// an HTTP status code.
const (
	ErrCodeNil                 ErrCode = 0
	ErrCodePartialContent      ErrCode = 808206
	ErrCodeBadRequest          ErrCode = 808400
	ErrCodeUnauthorized        ErrCode = 808401
	ErrCodeDataStreamsError    ErrCode = 808500
	ErrCodeEncodingError       ErrCode = 808600
	ErrCodeStreamLookupTimeout ErrCode = 808603
)

// HttpToErrCode translates an HTTP status code into the corresponding ErrCode.
//
// 206, 400 and 401 map to their dedicated codes; 500, 502, 503 and 504 all map
// to ErrCodeDataStreamsError. 200 OK and every status without a dedicated
// mapping yield ErrCodeNil.
func HttpToErrCode(statusCode int) ErrCode {
	switch statusCode {
	case http.StatusPartialContent:
		return ErrCodePartialContent
	case http.StatusBadRequest:
		return ErrCodeBadRequest
	case http.StatusUnauthorized:
		return ErrCodeUnauthorized
	case http.StatusInternalServerError,
		http.StatusBadGateway,
		http.StatusServiceUnavailable,
		http.StatusGatewayTimeout:
		return ErrCodeDataStreamsError
	}
	// Success and unmapped statuses share the zero code.
	return ErrCodeNil
}

type UpkeepInfo = iregistry21.KeeperRegistryBase21UpkeepInfo

type Packer interface {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ const (
BlockNumber = "blockNumber" // valid for v0.2
Timestamp = "timestamp" // valid for v0.3
totalFastPluginRetries = 5
totalMediumPluginRetries = 10
totalMediumPluginRetries = totalFastPluginRetries + 1
RetryIntervalTimeout = time.Duration(-1)
)

var GenerateHMACFn = func(method string, path string, body []byte, clientId string, secret string, ts int64) string {
Expand All @@ -45,8 +46,7 @@ var GenerateHMACFn = func(method string, path string, body []byte, clientId stri
}

// CalculateRetryConfig returns plugin retry interval based on how many times plugin has retried this work
var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvider) time.Duration {
var retryInterval time.Duration
var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvider) (retryInterval time.Duration) {
var retries int
totalAttempts, ok := mercuryConfig.GetPluginRetry(prk)
if ok {
Expand All @@ -55,9 +55,9 @@ var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvide
retryInterval = 1 * time.Second
} else if retries < totalMediumPluginRetries {
retryInterval = 5 * time.Second
} else {
retryInterval = RetryIntervalTimeout
}
// if the core node has retried totalMediumPluginRetries times, do not set retry interval and plugin will use
// the default interval
} else {
retryInterval = 1 * time.Second
}
Expand All @@ -68,6 +68,7 @@ var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvide
type MercuryData struct {
Index int
Error error
ErrCode encoding.ErrCode
Retryable bool
Bytes [][]byte
State encoding.PipelineExecutionState
Expand All @@ -86,7 +87,17 @@ type HttpClient interface {
}

type MercuryClient interface {
DoRequest(ctx context.Context, streamsLookup *StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, error)
// DoRequest makes a data stream request and manages retries. It returns the following:
// state: the state of the pipeline execution, used by our components.
// upkeepFailureReason: the reason for the upkeep failure, used by our components.
// data: the data returned from the data stream.
// retryable: whether the request is retryable.
// retryInterval: the interval to wait before retrying the request, or RetryIntervalTimeout if no more retries are allowed.
// errCode: the error code of the request, to be passed to the user's error handler if applicable.
// error: the raw error that occurred during the request.
//
// Exploratory: consider merging state/failureReason/errCode into a single object
DoRequest(ctx context.Context, streamsLookup *StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, encoding.ErrCode, error)
amirylm marked this conversation as resolved.
Show resolved Hide resolved
}

type StreamsLookupError struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import (
"errors"
"math/big"
"testing"
"time"

"github.com/patrickmn/go-cache"
"github.com/stretchr/testify/require"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/stretchr/testify/assert"

"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding"
)

Expand Down Expand Up @@ -240,3 +243,77 @@ func TestPacker_UnpackCheckCallbackResult(t *testing.T) {
})
}
}

// Test_CalculateRetryConfigFn calls CalculateRetryConfigFn repeatedly with the
// same plugin retry key against a fresh config mock and checks the interval
// returned by the final call.
func Test_CalculateRetryConfigFn(t *testing.T) {
	const retryKey = "prk"

	cases := []struct {
		name  string
		calls int
		want  time.Duration
	}{
		{name: "first retry", calls: 1, want: time.Second},
		{name: "second retry", calls: 2, want: time.Second},
		{name: "fifth retry", calls: 5, want: time.Second},
		{name: "sixth retry", calls: 6, want: 5 * time.Second},
		{name: "timeout", calls: totalMediumPluginRetries + 1, want: RetryIntervalTimeout},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Each subtest gets its own mock so retry counts do not leak between cases.
			cfg := newMercuryConfigMock()

			var got time.Duration
			for remaining := tc.calls; remaining > 0; remaining-- {
				got = CalculateRetryConfigFn(retryKey, cfg)
			}

			assert.Equal(t, tc.want, got)
		})
	}
}

// mercuryConfigMock is the config test double passed to CalculateRetryConfigFn.
// Only the plugin-retry accessors are backed by real state (a go-cache
// instance); the remaining methods are inert stubs.
type mercuryConfigMock struct {
	pluginRetryCache *cache.Cache
}

// newMercuryConfigMock returns a mock with an empty plugin-retry cache created
// via cache.New(10*time.Second, time.Minute).
func newMercuryConfigMock() *mercuryConfigMock {
	retries := cache.New(10*time.Second, time.Minute)
	return &mercuryConfigMock{pluginRetryCache: retries}
}

// Credentials always reports that no credentials are configured.
func (m *mercuryConfigMock) Credentials() *types.MercuryCredentials {
	return nil
}

// IsUpkeepAllowed always reports a miss.
func (m *mercuryConfigMock) IsUpkeepAllowed(_ string) (interface{}, bool) {
	return nil, false
}

// SetUpkeepAllowed is a no-op.
func (m *mercuryConfigMock) SetUpkeepAllowed(_ string, _ interface{}, _ time.Duration) {
}

// GetPluginRetry looks up the value stored under key in the retry cache.
func (m *mercuryConfigMock) GetPluginRetry(key string) (interface{}, bool) {
	return m.pluginRetryCache.Get(key)
}

// SetPluginRetry stores value under key in the retry cache with the given expiration.
func (m *mercuryConfigMock) SetPluginRetry(key string, value interface{}, expiration time.Duration) {
	m.pluginRetryCache.Set(key, value, expiration)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/patrickmn/go-cache"

"github.com/smartcontractkit/chainlink-automation/pkg/v3/types"
"github.com/smartcontractkit/chainlink-common/pkg/services"
ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation"

Expand Down Expand Up @@ -237,21 +238,31 @@ func (s *streams) CheckCallback(ctx context.Context, values [][]byte, lookup *me
}

func (s *streams) DoMercuryRequest(ctx context.Context, lookup *mercury.StreamsLookup, checkResults []ocr2keepers.CheckResult, i int) ([][]byte, error) {
state, reason, values, retryable, retryInterval, err := encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0*time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", lookup.FeedParamKey, lookup.TimeParamKey, lookup.Feeds)
state, reason, values, retryable, retryInterval, errCode, err := encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0*time.Second, encoding.ErrCodeNil, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", lookup.FeedParamKey, lookup.TimeParamKey, lookup.Feeds)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these initial values used anywhere? I see we always overwrite them with either 0.2.DoRequest or 0.3.DoRequest, so maybe it is simpler to just declare them as vars?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could have declared some of them as plain vars, but some have initial values, and that's why they are defined like this.

pluginRetryKey := generatePluginRetryKey(checkResults[i].WorkID, lookup.Block)

if lookup.IsMercuryV02() {
state, reason, values, retryable, retryInterval, err = s.v02Client.DoRequest(ctx, lookup, pluginRetryKey)
state, reason, values, retryable, retryInterval, errCode, err = s.v02Client.DoRequest(ctx, lookup, pluginRetryKey)
} else if lookup.IsMercuryV03() {
state, reason, values, retryable, retryInterval, err = s.v03Client.DoRequest(ctx, lookup, pluginRetryKey)
state, reason, values, retryable, retryInterval, errCode, err = s.v03Client.DoRequest(ctx, lookup, pluginRetryKey)
}

if err != nil {
checkResults[i].Retryable = retryable
checkResults[i].RetryInterval = retryInterval
checkResults[i].PipelineExecutionState = uint8(state)
checkResults[i].IneligibilityReason = uint8(reason)
return nil, err
retryTimeout := retryInterval == mercury.RetryIntervalTimeout
if retryTimeout {
// in case of retry timeout, setting retryable to false
checkResults[i].Retryable = false
}
amirylm marked this conversation as resolved.
Show resolved Hide resolved
_, eCodeErr := s.handleErrCode(&checkResults[i], errCode, err)
if eCodeErr != nil {
return nil, eCodeErr
}
s.lggr.Debugf("at block %d upkeep %s requested time %s doMercuryRequest err: %s, errCode: %d", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error(), errCode)
return nil, err // TODO: remove this line once we have error handler
}

for j, v := range values {
Expand All @@ -260,6 +271,33 @@ func (s *streams) DoMercuryRequest(ctx context.Context, lookup *mercury.StreamsL
return values, nil
}

// handleErrCode decides, from the upkeep type of result and the given errCode,
// whether err is returned to the caller or swallowed.
//
// It returns (nil, err) when err is returned to the caller, and an empty
// non-nil slice with a nil error otherwise.
//
// TODO: complete this function for preparing values for error handler
func (s *streams) handleErrCode(result *ocr2keepers.CheckResult, errCode encoding.ErrCode, err error) ([][]byte, error) {
	switch core.GetUpkeepType(result.UpkeepID) {
	case types.ConditionTrigger:
		// TODO: once we have an error handler, return err here for every code
		// other than ErrCodePartialContent, ErrCodeDataStreamsError,
		// ErrCodeBadRequest, ErrCodeUnauthorized and ErrCodeEncodingError.
	case types.LogTrigger:
		switch errCode {
		case encoding.ErrCodeBadRequest, encoding.ErrCodeUnauthorized, encoding.ErrCodeEncodingError:
			// Swallowed regardless of the retry interval.
		case encoding.ErrCodePartialContent, encoding.ErrCodeDataStreamsError:
			// Swallowed only once the retry interval signals a timeout.
			retryTimedOut := result.RetryInterval == mercury.RetryIntervalTimeout
			if !retryTimedOut {
				return nil, err
			}
		default:
			return nil, err
		}
	default:
		return nil, err
	}

	// TODO: prepare values for error handler
	return [][]byte{}, nil
}

// AllowedToUseMercury retrieves upkeep's administrative offchain config and decode a mercuryEnabled bool to indicate if
// this upkeep is allowed to use Mercury service.
func (s *streams) AllowedToUseMercury(opts *bind.CallOpts, upkeepId *big.Int) (state encoding.PipelineExecutionState, reason encoding.UpkeepFailureReason, retryable bool, allow bool, err error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"testing"
"time"

clatypes "github.com/smartcontractkit/chainlink-automation/pkg/v3/types"

"github.com/smartcontractkit/chainlink-common/pkg/types"

"github.com/pkg/errors"
Expand All @@ -28,6 +30,7 @@ import (
iregistry21 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/i_keeper_registry_master_wrapper_2_1"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury"
v02 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02"
Expand Down Expand Up @@ -820,3 +823,72 @@ func TestStreams_StreamsLookup(t *testing.T) {
})
}
}

func Test_HandleErrCode(t *testing.T) {
partialContentErr := errors.New("206")

tests := []struct {
name string
checkResult *ocr2keepers.CheckResult
errCode encoding.ErrCode
err error
expectedValues [][]byte
expectedErr error
}{
{
name: "no error",
checkResult: &ocr2keepers.CheckResult{
UpkeepID: core.GenUpkeepID(clatypes.LogTrigger, "111"),
},
errCode: encoding.ErrCodeNil,
err: nil,
expectedValues: [][]byte{},
expectedErr: nil,
},
{
name: "error code bad request",
checkResult: &ocr2keepers.CheckResult{
UpkeepID: core.GenUpkeepID(clatypes.LogTrigger, "111"),
},
errCode: encoding.ErrCodeBadRequest,
err: errors.New("400"),
expectedValues: [][]byte{},
expectedErr: nil,
},
{
name: "error code partial content with retry timeout",
checkResult: &ocr2keepers.CheckResult{
UpkeepID: core.GenUpkeepID(clatypes.LogTrigger, "111"),
RetryInterval: mercury.RetryIntervalTimeout,
Retryable: true,
},
errCode: encoding.ErrCodePartialContent,
err: errors.New("206"),
expectedValues: [][]byte{},
expectedErr: nil,
},
{
name: "error code partial content without retry timeout",
checkResult: &ocr2keepers.CheckResult{
UpkeepID: core.GenUpkeepID(clatypes.LogTrigger, "111"),
RetryInterval: time.Second,
Retryable: true,
},
errCode: encoding.ErrCodePartialContent,
err: partialContentErr,
expectedValues: nil,
expectedErr: partialContentErr,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := setupStreams(t)
defer s.Close()

values, err := s.handleErrCode(tt.checkResult, tt.errCode, tt.err)
assert.Equal(t, len(tt.expectedValues), len(values))
assert.Equal(t, tt.expectedErr, err)
})
}
}
Loading
Loading