diff --git a/core/services/pipeline/internal/eautils/eautils.go b/core/services/pipeline/internal/eautils/eautils.go new file mode 100644 index 00000000000..30faa826b22 --- /dev/null +++ b/core/services/pipeline/internal/eautils/eautils.go @@ -0,0 +1,39 @@ +package eautils + +import ( + "encoding/json" + "net/http" +) + +type AdapterStatus struct { + ErrorMessage *string `json:"errorMessage"` + Error any `json:"error"` + StatusCode *int `json:"statusCode"` + ProviderStatusCode *int `json:"providerStatusCode"` +} + +func BestEffortExtractEAStatus(responseBytes []byte) (code int, ok bool) { + var status AdapterStatus + err := json.Unmarshal(responseBytes, &status) + if err != nil { + return 0, false + } + + if status.StatusCode == nil { + return 0, false + } + + if *status.StatusCode != http.StatusOK { + return *status.StatusCode, true + } + + if status.ProviderStatusCode != nil && *status.ProviderStatusCode != http.StatusOK { + return *status.ProviderStatusCode, true + } + + if status.Error != nil { + return http.StatusInternalServerError, true + } + + return *status.StatusCode, true +} diff --git a/core/services/pipeline/internal/eautils/eautils_test.go b/core/services/pipeline/internal/eautils/eautils_test.go new file mode 100644 index 00000000000..80183b80d2b --- /dev/null +++ b/core/services/pipeline/internal/eautils/eautils_test.go @@ -0,0 +1,61 @@ +package eautils + +import ( + "net/http" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBestEffortExtractEAStatus(t *testing.T) { + tests := []struct { + name string + arg []byte + expectCode int + expectOk bool + }{ + { + name: "invalid object", + arg: []byte(`{"error": "invalid json object" `), + expectCode: 0, + expectOk: false, + }, + { + name: "no status code in object", + arg: []byte(`{}`), + expectCode: 0, + expectOk: false, + }, + { + name: "invalid status code", + arg: []byte(`{"statusCode":400}`), + expectCode: http.StatusBadRequest, + expectOk: true, + }, + { + name: "invalid provider status code", + arg: []byte(`{"statusCode":200, "providerStatusCode":500}`), + expectCode: http.StatusInternalServerError, + expectOk: true, + }, + { + name: "valid statuses with error message", + arg: []byte(`{"statusCode":200, "providerStatusCode":200, "error": "unexpected error"}`), + expectCode: http.StatusInternalServerError, + expectOk: true, + }, + { + name: "valid status code", + arg: []byte(`{"statusCode":200}`), + expectCode: http.StatusOK, + expectOk: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + code, ok := BestEffortExtractEAStatus(tt.arg) + assert.Equal(t, tt.expectCode, code) + assert.Equal(t, tt.expectOk, ok) + }) + } +} diff --git a/core/services/pipeline/task.bridge.go b/core/services/pipeline/task.bridge.go index f9490ea791d..1da34d19134 100644 --- a/core/services/pipeline/task.bridge.go +++ b/core/services/pipeline/task.bridge.go @@ -16,6 +16,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/bridges" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline/internal/eautils" ) // NOTE: These metrics generate a new label per bridge, this should be safe @@ -167,7 +168,13 @@ func (t *BridgeTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inp var cachedResponse bool responseBytes, statusCode, headers, elapsed, err := makeHTTPRequest(requestCtx, lggr, "POST", url, reqHeaders, requestData, t.httpClient, t.config.DefaultHTTPLimit()) - if err != nil { + + // check for external adapter response object status + if code, ok := eautils.BestEffortExtractEAStatus(responseBytes); ok { + statusCode = code + } + + if err != nil || statusCode != http.StatusOK { promBridgeErrors.WithLabelValues(t.Name).Inc() if cacheTTL == 0 { return Result{Error: err}, RunInfo{IsRetryable: isRetryableHTTPError(statusCode, err)} diff --git a/core/services/pipeline/task.bridge_test.go b/core/services/pipeline/task.bridge_test.go index bf1e63d6314..7673add1e35 100644 --- a/core/services/pipeline/task.bridge_test.go +++ b/core/services/pipeline/task.bridge_test.go @@ -19,7 +19,6 @@ import ( "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gopkg.in/guregu/null.v4" commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink/v2/core/bridges" @@ -32,6 +31,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline/internal/eautils" "github.com/smartcontractkit/chainlink/v2/core/store/models" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -59,11 +59,43 @@ type adapterResponseData struct { // adapterResponse is the HTTP response as defined by the external adapter: // https://github.com/smartcontractkit/bnc-adapter type adapterResponse struct { - Data adapterResponseData `json:"data"` - ErrorMessage null.String `json:"errorMessage"` + eautils.AdapterStatus + Data adapterResponseData `json:"data"` } -func (pr adapterResponse) Result() *decimal.Decimal { +func (pr *adapterResponse) SetStatusCode(code int) { + pr.StatusCode = &code +} + +func (pr *adapterResponse) UnsetStatusCode() { + pr.StatusCode = nil +} + +func (pr *adapterResponse) SetProviderStatusCode(code int) { + pr.ProviderStatusCode = &code +} + +func (pr *adapterResponse) UnsetProviderStatusCode() { + pr.ProviderStatusCode = nil +} + +func (pr *adapterResponse) SetError(msg string) { + pr.Error = msg +} + +func (pr *adapterResponse) UnsetError() { + pr.Error = nil +} + +func (pr *adapterResponse) SetErrorMessage(msg string) { + pr.ErrorMessage = &msg +} + +func (pr *adapterResponse) UnsetErrorMessage() { + pr.ErrorMessage = nil +} + +func (pr *adapterResponse) Result() *decimal.Decimal { return pr.Data.Result } @@ -295,7 +327,7 @@ func TestBridgeTask_DoesNotReturnStaleResults(t *testing.T) { task.HelperSetDependencies(cfg.JobPipeline(), cfg.WebServer(), orm, specID, uuid.UUID{}, c) // Insert entry 1m in the past, stale value, should not be used in case of EA failure. - err = queryer.ExecQ(`INSERT INTO bridge_last_value(dot_id, spec_id, value, finished_at) + err = queryer.ExecQ(`INSERT INTO bridge_last_value(dot_id, spec_id, value, finished_at) VALUES($1, $2, $3, $4) ON CONFLICT ON CONSTRAINT bridge_last_value_pkey DO UPDATE SET value = $3, finished_at = $4;`, task.DotID(), specID, big.NewInt(9700).Bytes(), time.Now().Add(-1*time.Minute)) require.NoError(t, err) @@ -786,9 +818,10 @@ func TestBridgeTask_ErrorMessage(t *testing.T) { handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusTooManyRequests) - err := json.NewEncoder(w).Encode(adapterResponse{ - ErrorMessage: null.StringFrom("could not hit data fetcher"), - }) + + resp := &adapterResponse{} + resp.SetErrorMessage("could not hit data fetcher") + err := json.NewEncoder(w).Encode(resp) require.NoError(t, err) }) @@ -1016,3 +1049,93 @@ func TestBridgeTask_Headers(t *testing.T) { assert.Equal(t, []string{"Content-Length", "38", "Content-Type", "footype", "User-Agent", "Go-http-client/1.1", "X-Header-1", "foo", "X-Header-2", "bar"}, allHeaders(headers)) }) } + +func TestBridgeTask_AdapterResponseStatusFailure(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { + c.WebServer.BridgeCacheTTL = commonconfig.MustNewDuration(1 * time.Minute) + }) + + testAdapterResponse := &adapterResponse{ + Data: adapterResponseData{Result: &decimal.Zero}, + } + + queryer := pg.NewQ(db, logger.TestLogger(t), cfg.Database()) + s1 := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + err := json.NewEncoder(w).Encode(testAdapterResponse) + require.NoError(t, err) + })) + defer s1.Close() + + feedURL, err := url.ParseRequestURI(s1.URL) + require.NoError(t, err) + + orm := bridges.NewORM(db, logger.TestLogger(t), cfg.Database()) + _, bridge := cltest.MustCreateBridge(t, db, cltest.BridgeOpts{URL: feedURL.String()}, cfg.Database()) + + task := pipeline.BridgeTask{ + BaseTask: pipeline.NewBaseTask(0, "bridge", nil, nil, 0), + Name: bridge.Name.String(), + RequestData: btcUSDPairing, + } + c := clhttptest.NewTestLocalOnlyHTTPClient() + trORM := pipeline.NewORM(db, logger.TestLogger(t), cfg.Database(), cfg.JobPipeline().MaxSuccessfulRuns()) + specID, err := trORM.CreateSpec(pipeline.Pipeline{}, *models.NewInterval(5 * time.Minute), pg.WithParentCtx(testutils.Context(t))) + require.NoError(t, err) + task.HelperSetDependencies(cfg.JobPipeline(), cfg.WebServer(), orm, specID, uuid.UUID{}, c) + + // Insert entry 1m in the past, stale value, should not be used in case of EA failure. + err = queryer.ExecQ(`INSERT INTO bridge_last_value(dot_id, spec_id, value, finished_at) + VALUES($1, $2, $3, $4) ON CONFLICT ON CONSTRAINT bridge_last_value_pkey + DO UPDATE SET value = $3, finished_at = $4;`, task.DotID(), specID, big.NewInt(9700).Bytes(), time.Now()) + require.NoError(t, err) + + vars := pipeline.NewVarsFrom( + map[string]interface{}{ + "jobRun": map[string]interface{}{ + "meta": map[string]interface{}{ + "shouldFail": true, + }, + }, + }, + ) + + // expect all external adapter response status failures to be served from the cache + testAdapterResponse.SetStatusCode(http.StatusBadRequest) + result, runInfo := task.Run(testutils.Context(t), logger.TestLogger(t), vars, nil) + + require.NoError(t, result.Error) + require.NotNil(t, result.Value) + require.False(t, runInfo.IsRetryable) + require.False(t, runInfo.IsPending) + + testAdapterResponse.SetStatusCode(http.StatusOK) + testAdapterResponse.SetProviderStatusCode(http.StatusBadRequest) + result, runInfo = task.Run(testutils.Context(t), logger.TestLogger(t), vars, nil) + + require.NoError(t, result.Error) + require.NotNil(t, result.Value) + require.False(t, runInfo.IsRetryable) + require.False(t, runInfo.IsPending) + + testAdapterResponse.SetStatusCode(http.StatusOK) + testAdapterResponse.SetProviderStatusCode(http.StatusOK) + testAdapterResponse.SetError("some error") + result, runInfo = task.Run(testutils.Context(t), logger.TestLogger(t), vars, nil) + + require.NoError(t, result.Error) + require.NotNil(t, result.Value) + require.False(t, runInfo.IsRetryable) + require.False(t, runInfo.IsPending) + + testAdapterResponse.SetStatusCode(http.StatusInternalServerError) + result, runInfo = task.Run(testutils.Context(t), logger.TestLogger(t), vars, nil) + + require.NoError(t, result.Error) + require.NotNil(t, result.Value) + require.False(t, runInfo.IsRetryable) + require.False(t, runInfo.IsPending) +} diff --git a/core/services/pipeline/task.http_test.go b/core/services/pipeline/task.http_test.go index c0dd93df430..36ccc147a78 100644 --- a/core/services/pipeline/task.http_test.go +++ b/core/services/pipeline/task.http_test.go @@ -14,7 +14,6 @@ import ( "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gopkg.in/guregu/null.v4" "github.com/smartcontractkit/chainlink/v2/core/bridges" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" @@ -264,9 +263,9 @@ func TestHTTPTask_ErrorMessage(t *testing.T) { handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusTooManyRequests) - err := json.NewEncoder(w).Encode(adapterResponse{ - ErrorMessage: null.StringFrom("could not hit data fetcher"), - }) + resp := &adapterResponse{} + resp.SetErrorMessage("could not hit data fetcher") + err := json.NewEncoder(w).Encode(resp) require.NoError(t, err) })