From 36df2c29fc781e794b5272c7244f113750d8421b Mon Sep 17 00:00:00 2001 From: Jean Arnaud Date: Tue, 14 Nov 2023 12:50:54 +0100 Subject: [PATCH] Monitor USDC Attestation API (#277) --- ...d_reader_test.go => cached_reader_test.go} | 0 .../ccip/tokendata/http/http_client.go | 47 ++++++ .../tokendata/http/observed_http_client.go | 68 +++++++++ .../observability/usdc_client_test.go | 139 ++++++++++++++++++ .../ocr2/plugins/ccip/tokendata/usdc/usdc.go | 45 ++---- 5 files changed, 268 insertions(+), 31 deletions(-) rename core/services/ocr2/plugins/ccip/tokendata/{chached_reader_test.go => cached_reader_test.go} (100%) create mode 100644 core/services/ocr2/plugins/ccip/tokendata/http/http_client.go create mode 100644 core/services/ocr2/plugins/ccip/tokendata/http/observed_http_client.go create mode 100644 core/services/ocr2/plugins/ccip/tokendata/observability/usdc_client_test.go diff --git a/core/services/ocr2/plugins/ccip/tokendata/chached_reader_test.go b/core/services/ocr2/plugins/ccip/tokendata/cached_reader_test.go similarity index 100% rename from core/services/ocr2/plugins/ccip/tokendata/chached_reader_test.go rename to core/services/ocr2/plugins/ccip/tokendata/cached_reader_test.go diff --git a/core/services/ocr2/plugins/ccip/tokendata/http/http_client.go b/core/services/ocr2/plugins/ccip/tokendata/http/http_client.go new file mode 100644 index 0000000000..3a00f81c0c --- /dev/null +++ b/core/services/ocr2/plugins/ccip/tokendata/http/http_client.go @@ -0,0 +1,47 @@ +package http + +import ( + "context" + "io" + "net/http" + "time" + + "github.com/pkg/errors" + + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata" +) + +type IHttpClient interface { + // Get issue a GET request to the given url and return the response body and status code. + Get(ctx context.Context, url string, timeout time.Duration) ([]byte, int, error) +} + +type HttpClient struct { +} + +func (s *HttpClient) Get(ctx context.Context, url string, timeout time.Duration) ([]byte, int, error) { + // Use a timeout to guard against attestation API hanging, causing observation timeout and failing to make any progress. + timeoutCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + req, err := http.NewRequestWithContext(timeoutCtx, "GET", url, nil) + if err != nil { + return nil, http.StatusBadRequest, err + } + req.Header.Add("accept", "application/json") + res, err := http.DefaultClient.Do(req) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + return nil, http.StatusRequestTimeout, tokendata.ErrTimeout + } + return nil, res.StatusCode, err + } + defer res.Body.Close() + + // Explicitly signal if the API is being rate limited + if res.StatusCode == http.StatusTooManyRequests { + return nil, res.StatusCode, tokendata.ErrRateLimit + } + + body, err := io.ReadAll(res.Body) + return body, res.StatusCode, err +} diff --git a/core/services/ocr2/plugins/ccip/tokendata/http/observed_http_client.go b/core/services/ocr2/plugins/ccip/tokendata/http/observed_http_client.go new file mode 100644 index 0000000000..fa49406b10 --- /dev/null +++ b/core/services/ocr2/plugins/ccip/tokendata/http/observed_http_client.go @@ -0,0 +1,68 @@ +package http + +import ( + "context" + "strconv" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + usdcLatencyBuckets = []float64{ + float64(10 * time.Millisecond), + float64(25 * time.Millisecond), + float64(50 * time.Millisecond), + float64(75 * time.Millisecond), + float64(100 * time.Millisecond), + float64(250 * time.Millisecond), + float64(500 * time.Millisecond), + float64(750 * time.Millisecond), + float64(1 * time.Second), + float64(2 * time.Second), + float64(3 * time.Second), + float64(4 * time.Second), + float64(5 * time.Second), + } + usdcClientHistogram = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "ccip_usdc_client_request_total", + Help: "Latency of calls to the USDC client", + Buckets: usdcLatencyBuckets, + }, []string{"status", "success"}) +) + +type ObservedIHttpClient struct { + IHttpClient + histogram *prometheus.HistogramVec +} + +// NewObservedIHttpClient Create a new ObservedIHttpClient with the USDC client metric. +func NewObservedIHttpClient(origin IHttpClient) *ObservedIHttpClient { + return NewObservedIHttpClientWithMetric(origin, usdcClientHistogram) +} + +func NewObservedIHttpClientWithMetric(origin IHttpClient, histogram *prometheus.HistogramVec) *ObservedIHttpClient { + return &ObservedIHttpClient{ + IHttpClient: origin, + histogram: histogram, + } +} + +func (o *ObservedIHttpClient) Get(ctx context.Context, url string, timeout time.Duration) ([]byte, int, error) { + return withObservedHttpClient(o.histogram, func() ([]byte, int, error) { + return o.IHttpClient.Get(ctx, url, timeout) + }) +} + +func withObservedHttpClient[T any](histogram *prometheus.HistogramVec, contract func() (T, int, error)) (T, int, error) { + contractExecutionStarted := time.Now() + value, status, err := contract() + histogram. + WithLabelValues( + strconv.FormatInt(int64(status), 10), + strconv.FormatBool(err == nil), + ). + Observe(float64(time.Since(contractExecutionStarted))) + return value, status, err +} diff --git a/core/services/ocr2/plugins/ccip/tokendata/observability/usdc_client_test.go b/core/services/ocr2/plugins/ccip/tokendata/observability/usdc_client_test.go new file mode 100644 index 0000000000..9a0bb8c98a --- /dev/null +++ b/core/services/ocr2/plugins/ccip/tokendata/observability/usdc_client_test.go @@ -0,0 +1,139 @@ +package observability + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "net/url" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks" + http2 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/http" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/usdc" +) + +type expected struct { + status string + result string + count int +} + +func TestUSDCClientMonitoring(t *testing.T) { + + tests := []struct { + name string + server *httptest.Server + requests int + expected []expected + }{ + { + name: "success", + server: newSuccessServer(t), + requests: 5, + expected: []expected{ + {"200", "true", 5}, + {"429", "false", 0}, + }, + }, + { + name: "rate_limited", + server: newRateLimitedServer(), + requests: 26, + expected: []expected{ + {"200", "true", 0}, + {"429", "false", 26}, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + testMonitoring(t, test.name, test.server, test.requests, test.expected, logger.TestLogger(t)) + }) + } + +} + +func testMonitoring(t *testing.T, name string, server *httptest.Server, requests int, expected []expected, log logger.Logger) { + server.Start() + defer server.Close() + attestationURI, err := url.ParseRequestURI(server.URL) + require.NoError(t, err) + + // Define test histogram (avoid side effects from other tests if using the real usdcHistogram). + histogram := promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "test_client_histogram_" + name, + Help: "Latency of calls to the USDC mock client", + Buckets: []float64{float64(250 * time.Millisecond), float64(1 * time.Second), float64(5 * time.Second)}, + }, []string{"status", "success"}) + + // Mock USDC reader. + usdcReader := mocks.NewUSDCReader(t) + msgBody := []byte{0xb0, 0xd1} + usdcReader.On("GetLastUSDCMessagePriorToLogIndexInTx", mock.Anything, mock.Anything, mock.Anything).Return(msgBody, nil) + + // Service with monitored http client. + observedHttpClient := http2.NewObservedIHttpClientWithMetric(&http2.HttpClient{}, histogram) + tokenDataReaderDefault := usdc.NewUSDCTokenDataReader(log, usdcReader, attestationURI, 0) + tokenDataReader := usdc.NewUSDCTokenDataReaderWithHttpClient(*tokenDataReaderDefault, observedHttpClient) + require.NotNil(t, tokenDataReader) + + for i := 0; i < requests; i++ { + _, _ = tokenDataReader.ReadTokenData(context.Background(), internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{}) + } + + // Check that the metrics are updated as expected. + for _, e := range expected { + assert.Equal(t, e.count, counterFromHistogramByLabels(t, histogram, e.status, e.result)) + } +} + +func counterFromHistogramByLabels(t *testing.T, histogramVec *prometheus.HistogramVec, labels ...string) int { + observer, err := histogramVec.GetMetricWithLabelValues(labels...) + require.NoError(t, err) + + metricCh := make(chan prometheus.Metric, 1) + observer.(prometheus.Histogram).Collect(metricCh) + close(metricCh) + + metric := <-metricCh + pb := &io_prometheus_client.Metric{} + err = metric.Write(pb) + require.NoError(t, err) + + return int(pb.GetHistogram().GetSampleCount()) +} + +func newSuccessServer(t *testing.T) *httptest.Server { + return httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + response := struct { + Status string `json:"status"` + Attestation string `json:"attestation"` + }{ + Status: "complete", + Attestation: "720502893578a89a8a87982982ef781c18b193", + } + responseBytes, err := json.Marshal(response) + require.NoError(t, err) + _, err = w.Write(responseBytes) + require.NoError(t, err) + })) +} + +func newRateLimitedServer() *httptest.Server { + return httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusTooManyRequests) + })) +} diff --git a/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go index c48ae03562..7ef4833be6 100644 --- a/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go +++ b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go @@ -5,8 +5,6 @@ import ( "encoding/hex" "encoding/json" "fmt" - "io" - "net/http" "net/url" "strings" "time" @@ -19,6 +17,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/http" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -67,6 +66,7 @@ func (m messageAndAttestation) Validate() error { type TokenDataReader struct { lggr logger.Logger usdcReader ccipdata.USDCReader + httpClient http.IHttpClient attestationApi *url.URL attestationApiTimeout time.Duration } @@ -83,15 +83,25 @@ func NewUSDCTokenDataReader(lggr logger.Logger, usdcReader ccipdata.USDCReader, if usdcAttestationApiTimeoutSeconds == 0 { timeout = defaultAttestationTimeout } - return &TokenDataReader{ lggr: lggr, usdcReader: usdcReader, + httpClient: http.NewObservedIHttpClient(&http.HttpClient{}), attestationApi: usdcAttestationApi, attestationApiTimeout: timeout, } } +func NewUSDCTokenDataReaderWithHttpClient(origin TokenDataReader, httpClient http.IHttpClient) *TokenDataReader { + return &TokenDataReader{ + lggr: origin.lggr, + usdcReader: origin.usdcReader, + httpClient: httpClient, + attestationApi: origin.attestationApi, + attestationApiTimeout: origin.attestationApiTimeout, + } +} + func (s *TokenDataReader) ReadTokenData(ctx context.Context, msg internal.EVM2EVMOnRampCCIPSendRequestedWithMeta) (messageAndAttestation []byte, err error) { messageBody, err := s.getUSDCMessageBody(ctx, msg) if err != nil { @@ -144,45 +154,18 @@ func (s *TokenDataReader) getUSDCMessageBody(ctx context.Context, msg internal.E func (s *TokenDataReader) callAttestationApi(ctx context.Context, usdcMessageHash [32]byte) (attestationResponse, error) { fullAttestationUrl := fmt.Sprintf("%s/%s/%s/0x%x", s.attestationApi, apiVersion, attestationPath, usdcMessageHash) - - // Use a timeout to guard against attestation API hanging, causing observation timeout and failing to make any progress. - timeoutCtx, cancel := context.WithTimeout(ctx, s.attestationApiTimeout) - defer cancel() - req, err := http.NewRequestWithContext(timeoutCtx, "GET", fullAttestationUrl, nil) - - if err != nil { - return attestationResponse{}, err - } - req.Header.Add("accept", "application/json") - res, err := http.DefaultClient.Do(req) + body, _, err := s.httpClient.Get(ctx, fullAttestationUrl, s.attestationApiTimeout) if err != nil { - if errors.Is(err, context.DeadlineExceeded) { - return attestationResponse{}, tokendata.ErrTimeout - } return attestationResponse{}, err } - defer res.Body.Close() - - // Explicitly signal if the API is being rate limited - if res.StatusCode == http.StatusTooManyRequests { - return attestationResponse{}, tokendata.ErrRateLimit - } - - body, err := io.ReadAll(res.Body) - if err != nil { - return attestationResponse{}, err - } - var response attestationResponse err = json.Unmarshal(body, &response) if err != nil { return attestationResponse{}, err } - if response.Status == "" { return attestationResponse{}, fmt.Errorf("invalid attestation response: %s", string(body)) } - return response, nil }