Skip to content

Commit

Permalink
pipeline/task.bridge: do not cache invalid external adapter response …
Browse files Browse the repository at this point in the history
…objects. (#11725)

* pipeline/task.bridge: check for external adapter status in response object

* pipeline/task.bridge: external adapter response object tests

* pipeline: move ea specific utilities into an internal package and add unit tests

* pipeline/task.bridge: rebase fix for commonconfig

* Update core/services/pipeline/internal/eautils/eautils.go

Co-authored-by: Jordan Krage <[email protected]>

* pipeline/task.bridge: ea status error can also be an object

---------

Co-authored-by: Jordan Krage <[email protected]>
  • Loading branch information
brunotm and jmank88 authored Jan 22, 2024
1 parent f8a47b9 commit b164040
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 13 deletions.
39 changes: 39 additions & 0 deletions core/services/pipeline/internal/eautils/eautils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package eautils

import (
"encoding/json"
"net/http"
)

type AdapterStatus struct {
ErrorMessage *string `json:"errorMessage"`
Error any `json:"error"`
StatusCode *int `json:"statusCode"`
ProviderStatusCode *int `json:"providerStatusCode"`
}

func BestEffortExtractEAStatus(responseBytes []byte) (code int, ok bool) {
var status AdapterStatus
err := json.Unmarshal(responseBytes, &status)
if err != nil {
return 0, false
}

if status.StatusCode == nil {
return 0, false
}

if *status.StatusCode != http.StatusOK {
return *status.StatusCode, true
}

if status.ProviderStatusCode != nil && *status.ProviderStatusCode != http.StatusOK {
return *status.ProviderStatusCode, true
}

if status.Error != nil {
return http.StatusInternalServerError, true
}

return *status.StatusCode, true
}
61 changes: 61 additions & 0 deletions core/services/pipeline/internal/eautils/eautils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package eautils

import (
"net/http"
"testing"

"github.com/stretchr/testify/assert"
)

func TestBestEffortExtractEAStatus(t *testing.T) {
tests := []struct {
name string
arg []byte
expectCode int
expectOk bool
}{
{
name: "invalid object",
arg: []byte(`{"error": "invalid json object" `),
expectCode: 0,
expectOk: false,
},
{
name: "no status code in object",
arg: []byte(`{}`),
expectCode: 0,
expectOk: false,
},
{
name: "invalid status code",
arg: []byte(`{"statusCode":400}`),
expectCode: http.StatusBadRequest,
expectOk: true,
},
{
name: "invalid provider status code",
arg: []byte(`{"statusCode":200, "providerStatusCode":500}`),
expectCode: http.StatusInternalServerError,
expectOk: true,
},
{
name: "valid statuses with error message",
arg: []byte(`{"statusCode":200, "providerStatusCode":200, "error": "unexpected error"}`),
expectCode: http.StatusInternalServerError,
expectOk: true,
},
{
name: "valid status code",
arg: []byte(`{"statusCode":200}`),
expectCode: http.StatusOK,
expectOk: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
code, ok := BestEffortExtractEAStatus(tt.arg)
assert.Equal(t, tt.expectCode, code)
assert.Equal(t, tt.expectOk, ok)
})
}
}
9 changes: 8 additions & 1 deletion core/services/pipeline/task.bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/bridges"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline/internal/eautils"
)

// NOTE: These metrics generate a new label per bridge, this should be safe
Expand Down Expand Up @@ -167,7 +168,13 @@ func (t *BridgeTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inp

var cachedResponse bool
responseBytes, statusCode, headers, elapsed, err := makeHTTPRequest(requestCtx, lggr, "POST", url, reqHeaders, requestData, t.httpClient, t.config.DefaultHTTPLimit())
if err != nil {

// check for external adapter response object status
if code, ok := eautils.BestEffortExtractEAStatus(responseBytes); ok {
statusCode = code
}

if err != nil || statusCode != http.StatusOK {
promBridgeErrors.WithLabelValues(t.Name).Inc()
if cacheTTL == 0 {
return Result{Error: err}, RunInfo{IsRetryable: isRetryableHTTPError(statusCode, err)}
Expand Down
139 changes: 131 additions & 8 deletions core/services/pipeline/task.bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v4"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink/v2/core/bridges"
Expand All @@ -32,6 +31,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/chainlink"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline/internal/eautils"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)
Expand Down Expand Up @@ -59,11 +59,43 @@ type adapterResponseData struct {
// adapterResponse is the HTTP response as defined by the external adapter:
// https://github.com/smartcontractkit/bnc-adapter
type adapterResponse struct {
Data adapterResponseData `json:"data"`
ErrorMessage null.String `json:"errorMessage"`
eautils.AdapterStatus
Data adapterResponseData `json:"data"`
}

func (pr adapterResponse) Result() *decimal.Decimal {
func (pr *adapterResponse) SetStatusCode(code int) {
pr.StatusCode = &code
}

func (pr *adapterResponse) UnsetStatusCode() {
pr.StatusCode = nil
}

func (pr *adapterResponse) SetProviderStatusCode(code int) {
pr.ProviderStatusCode = &code
}

func (pr *adapterResponse) UnsetProviderStatusCode() {
pr.ProviderStatusCode = nil
}

func (pr *adapterResponse) SetError(msg string) {
pr.Error = msg
}

func (pr *adapterResponse) UnsetError() {
pr.Error = nil
}

func (pr *adapterResponse) SetErrorMessage(msg string) {
pr.ErrorMessage = &msg
}

func (pr *adapterResponse) UnsetErrorMessage() {
pr.ErrorMessage = nil
}

func (pr *adapterResponse) Result() *decimal.Decimal {
return pr.Data.Result
}

Expand Down Expand Up @@ -295,7 +327,7 @@ func TestBridgeTask_DoesNotReturnStaleResults(t *testing.T) {
task.HelperSetDependencies(cfg.JobPipeline(), cfg.WebServer(), orm, specID, uuid.UUID{}, c)

// Insert entry 1m in the past, stale value, should not be used in case of EA failure.
err = queryer.ExecQ(`INSERT INTO bridge_last_value(dot_id, spec_id, value, finished_at)
err = queryer.ExecQ(`INSERT INTO bridge_last_value(dot_id, spec_id, value, finished_at)
VALUES($1, $2, $3, $4) ON CONFLICT ON CONSTRAINT bridge_last_value_pkey
DO UPDATE SET value = $3, finished_at = $4;`, task.DotID(), specID, big.NewInt(9700).Bytes(), time.Now().Add(-1*time.Minute))
require.NoError(t, err)
Expand Down Expand Up @@ -786,9 +818,10 @@ func TestBridgeTask_ErrorMessage(t *testing.T) {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusTooManyRequests)
err := json.NewEncoder(w).Encode(adapterResponse{
ErrorMessage: null.StringFrom("could not hit data fetcher"),
})

resp := &adapterResponse{}
resp.SetErrorMessage("could not hit data fetcher")
err := json.NewEncoder(w).Encode(resp)
require.NoError(t, err)
})

Expand Down Expand Up @@ -1016,3 +1049,93 @@ func TestBridgeTask_Headers(t *testing.T) {
assert.Equal(t, []string{"Content-Length", "38", "Content-Type", "footype", "User-Agent", "Go-http-client/1.1", "X-Header-1", "foo", "X-Header-2", "bar"}, allHeaders(headers))
})
}

func TestBridgeTask_AdapterResponseStatusFailure(t *testing.T) {
t.Parallel()

db := pgtest.NewSqlxDB(t)
cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) {
c.WebServer.BridgeCacheTTL = commonconfig.MustNewDuration(1 * time.Minute)
})

testAdapterResponse := &adapterResponse{
Data: adapterResponseData{Result: &decimal.Zero},
}

queryer := pg.NewQ(db, logger.TestLogger(t), cfg.Database())
s1 := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
err := json.NewEncoder(w).Encode(testAdapterResponse)
require.NoError(t, err)
}))
defer s1.Close()

feedURL, err := url.ParseRequestURI(s1.URL)
require.NoError(t, err)

orm := bridges.NewORM(db, logger.TestLogger(t), cfg.Database())
_, bridge := cltest.MustCreateBridge(t, db, cltest.BridgeOpts{URL: feedURL.String()}, cfg.Database())

task := pipeline.BridgeTask{
BaseTask: pipeline.NewBaseTask(0, "bridge", nil, nil, 0),
Name: bridge.Name.String(),
RequestData: btcUSDPairing,
}
c := clhttptest.NewTestLocalOnlyHTTPClient()
trORM := pipeline.NewORM(db, logger.TestLogger(t), cfg.Database(), cfg.JobPipeline().MaxSuccessfulRuns())
specID, err := trORM.CreateSpec(pipeline.Pipeline{}, *models.NewInterval(5 * time.Minute), pg.WithParentCtx(testutils.Context(t)))
require.NoError(t, err)
task.HelperSetDependencies(cfg.JobPipeline(), cfg.WebServer(), orm, specID, uuid.UUID{}, c)

// Insert entry 1m in the past, stale value, should not be used in case of EA failure.
err = queryer.ExecQ(`INSERT INTO bridge_last_value(dot_id, spec_id, value, finished_at)
VALUES($1, $2, $3, $4) ON CONFLICT ON CONSTRAINT bridge_last_value_pkey
DO UPDATE SET value = $3, finished_at = $4;`, task.DotID(), specID, big.NewInt(9700).Bytes(), time.Now())
require.NoError(t, err)

vars := pipeline.NewVarsFrom(
map[string]interface{}{
"jobRun": map[string]interface{}{
"meta": map[string]interface{}{
"shouldFail": true,
},
},
},
)

// expect all external adapter response status failures to be served from the cache
testAdapterResponse.SetStatusCode(http.StatusBadRequest)
result, runInfo := task.Run(testutils.Context(t), logger.TestLogger(t), vars, nil)

require.NoError(t, result.Error)
require.NotNil(t, result.Value)
require.False(t, runInfo.IsRetryable)
require.False(t, runInfo.IsPending)

testAdapterResponse.SetStatusCode(http.StatusOK)
testAdapterResponse.SetProviderStatusCode(http.StatusBadRequest)
result, runInfo = task.Run(testutils.Context(t), logger.TestLogger(t), vars, nil)

require.NoError(t, result.Error)
require.NotNil(t, result.Value)
require.False(t, runInfo.IsRetryable)
require.False(t, runInfo.IsPending)

testAdapterResponse.SetStatusCode(http.StatusOK)
testAdapterResponse.SetProviderStatusCode(http.StatusOK)
testAdapterResponse.SetError("some error")
result, runInfo = task.Run(testutils.Context(t), logger.TestLogger(t), vars, nil)

require.NoError(t, result.Error)
require.NotNil(t, result.Value)
require.False(t, runInfo.IsRetryable)
require.False(t, runInfo.IsPending)

testAdapterResponse.SetStatusCode(http.StatusInternalServerError)
result, runInfo = task.Run(testutils.Context(t), logger.TestLogger(t), vars, nil)

require.NoError(t, result.Error)
require.NotNil(t, result.Value)
require.False(t, runInfo.IsRetryable)
require.False(t, runInfo.IsPending)
}
7 changes: 3 additions & 4 deletions core/services/pipeline/task.http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v4"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
Expand Down Expand Up @@ -264,9 +263,9 @@ func TestHTTPTask_ErrorMessage(t *testing.T) {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusTooManyRequests)
err := json.NewEncoder(w).Encode(adapterResponse{
ErrorMessage: null.StringFrom("could not hit data fetcher"),
})
resp := &adapterResponse{}
resp.SetErrorMessage("could not hit data fetcher")
err := json.NewEncoder(w).Encode(resp)
require.NoError(t, err)
})

Expand Down

0 comments on commit b164040

Please sign in to comment.