Skip to content

Commit

Permalink
Monitor USDC Attestation API (#277)
Browse files Browse the repository at this point in the history
  • Loading branch information
jarnaud authored Nov 14, 2023
1 parent cdfca9f commit 36df2c2
Show file tree
Hide file tree
Showing 5 changed files with 268 additions and 31 deletions.
47 changes: 47 additions & 0 deletions core/services/ocr2/plugins/ccip/tokendata/http/http_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package http

import (
"context"
"io"
"net/http"
"time"

"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata"
)

type IHttpClient interface {
// Get issue a GET request to the given url and return the response body and status code.
Get(ctx context.Context, url string, timeout time.Duration) ([]byte, int, error)
}

type HttpClient struct {
}

func (s *HttpClient) Get(ctx context.Context, url string, timeout time.Duration) ([]byte, int, error) {
// Use a timeout to guard against attestation API hanging, causing observation timeout and failing to make any progress.
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
req, err := http.NewRequestWithContext(timeoutCtx, "GET", url, nil)
if err != nil {
return nil, http.StatusBadRequest, err
}
req.Header.Add("accept", "application/json")
res, err := http.DefaultClient.Do(req)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return nil, http.StatusRequestTimeout, tokendata.ErrTimeout
}
return nil, res.StatusCode, err
}
defer res.Body.Close()

// Explicitly signal if the API is being rate limited
if res.StatusCode == http.StatusTooManyRequests {
return nil, res.StatusCode, tokendata.ErrRateLimit
}

body, err := io.ReadAll(res.Body)
return body, res.StatusCode, err
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package http

import (
"context"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
usdcLatencyBuckets = []float64{
float64(10 * time.Millisecond),
float64(25 * time.Millisecond),
float64(50 * time.Millisecond),
float64(75 * time.Millisecond),
float64(100 * time.Millisecond),
float64(250 * time.Millisecond),
float64(500 * time.Millisecond),
float64(750 * time.Millisecond),
float64(1 * time.Second),
float64(2 * time.Second),
float64(3 * time.Second),
float64(4 * time.Second),
float64(5 * time.Second),
}
usdcClientHistogram = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "ccip_usdc_client_request_total",
Help: "Latency of calls to the USDC client",
Buckets: usdcLatencyBuckets,
}, []string{"status", "success"})
)

type ObservedIHttpClient struct {
IHttpClient
histogram *prometheus.HistogramVec
}

// NewObservedIHttpClient Create a new ObservedIHttpClient with the USDC client metric.
func NewObservedIHttpClient(origin IHttpClient) *ObservedIHttpClient {
return NewObservedIHttpClientWithMetric(origin, usdcClientHistogram)
}

func NewObservedIHttpClientWithMetric(origin IHttpClient, histogram *prometheus.HistogramVec) *ObservedIHttpClient {
return &ObservedIHttpClient{
IHttpClient: origin,
histogram: histogram,
}
}

func (o *ObservedIHttpClient) Get(ctx context.Context, url string, timeout time.Duration) ([]byte, int, error) {
return withObservedHttpClient(o.histogram, func() ([]byte, int, error) {
return o.IHttpClient.Get(ctx, url, timeout)
})
}

func withObservedHttpClient[T any](histogram *prometheus.HistogramVec, contract func() (T, int, error)) (T, int, error) {
contractExecutionStarted := time.Now()
value, status, err := contract()
histogram.
WithLabelValues(
strconv.FormatInt(int64(status), 10),
strconv.FormatBool(err == nil),
).
Observe(float64(time.Since(contractExecutionStarted)))
return value, status, err
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package observability

import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks"
http2 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/http"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/usdc"
)

type expected struct {
status string
result string
count int
}

func TestUSDCClientMonitoring(t *testing.T) {

tests := []struct {
name string
server *httptest.Server
requests int
expected []expected
}{
{
name: "success",
server: newSuccessServer(t),
requests: 5,
expected: []expected{
{"200", "true", 5},
{"429", "false", 0},
},
},
{
name: "rate_limited",
server: newRateLimitedServer(),
requests: 26,
expected: []expected{
{"200", "true", 0},
{"429", "false", 26},
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testMonitoring(t, test.name, test.server, test.requests, test.expected, logger.TestLogger(t))
})
}

}

func testMonitoring(t *testing.T, name string, server *httptest.Server, requests int, expected []expected, log logger.Logger) {
server.Start()
defer server.Close()
attestationURI, err := url.ParseRequestURI(server.URL)
require.NoError(t, err)

// Define test histogram (avoid side effects from other tests if using the real usdcHistogram).
histogram := promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "test_client_histogram_" + name,
Help: "Latency of calls to the USDC mock client",
Buckets: []float64{float64(250 * time.Millisecond), float64(1 * time.Second), float64(5 * time.Second)},
}, []string{"status", "success"})

// Mock USDC reader.
usdcReader := mocks.NewUSDCReader(t)
msgBody := []byte{0xb0, 0xd1}
usdcReader.On("GetLastUSDCMessagePriorToLogIndexInTx", mock.Anything, mock.Anything, mock.Anything).Return(msgBody, nil)

// Service with monitored http client.
observedHttpClient := http2.NewObservedIHttpClientWithMetric(&http2.HttpClient{}, histogram)
tokenDataReaderDefault := usdc.NewUSDCTokenDataReader(log, usdcReader, attestationURI, 0)
tokenDataReader := usdc.NewUSDCTokenDataReaderWithHttpClient(*tokenDataReaderDefault, observedHttpClient)
require.NotNil(t, tokenDataReader)

for i := 0; i < requests; i++ {
_, _ = tokenDataReader.ReadTokenData(context.Background(), internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{})
}

// Check that the metrics are updated as expected.
for _, e := range expected {
assert.Equal(t, e.count, counterFromHistogramByLabels(t, histogram, e.status, e.result))
}
}

func counterFromHistogramByLabels(t *testing.T, histogramVec *prometheus.HistogramVec, labels ...string) int {
observer, err := histogramVec.GetMetricWithLabelValues(labels...)
require.NoError(t, err)

metricCh := make(chan prometheus.Metric, 1)
observer.(prometheus.Histogram).Collect(metricCh)
close(metricCh)

metric := <-metricCh
pb := &io_prometheus_client.Metric{}
err = metric.Write(pb)
require.NoError(t, err)

return int(pb.GetHistogram().GetSampleCount())
}

func newSuccessServer(t *testing.T) *httptest.Server {
return httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
response := struct {
Status string `json:"status"`
Attestation string `json:"attestation"`
}{
Status: "complete",
Attestation: "720502893578a89a8a87982982ef781c18b193",
}
responseBytes, err := json.Marshal(response)
require.NoError(t, err)
_, err = w.Write(responseBytes)
require.NoError(t, err)
}))
}

func newRateLimitedServer() *httptest.Server {
return httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusTooManyRequests)
}))
}
45 changes: 14 additions & 31 deletions core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
Expand All @@ -19,6 +17,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/http"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)
Expand Down Expand Up @@ -67,6 +66,7 @@ func (m messageAndAttestation) Validate() error {
type TokenDataReader struct {
lggr logger.Logger
usdcReader ccipdata.USDCReader
httpClient http.IHttpClient
attestationApi *url.URL
attestationApiTimeout time.Duration
}
Expand All @@ -83,15 +83,25 @@ func NewUSDCTokenDataReader(lggr logger.Logger, usdcReader ccipdata.USDCReader,
if usdcAttestationApiTimeoutSeconds == 0 {
timeout = defaultAttestationTimeout
}

return &TokenDataReader{
lggr: lggr,
usdcReader: usdcReader,
httpClient: http.NewObservedIHttpClient(&http.HttpClient{}),
attestationApi: usdcAttestationApi,
attestationApiTimeout: timeout,
}
}

func NewUSDCTokenDataReaderWithHttpClient(origin TokenDataReader, httpClient http.IHttpClient) *TokenDataReader {
return &TokenDataReader{
lggr: origin.lggr,
usdcReader: origin.usdcReader,
httpClient: httpClient,
attestationApi: origin.attestationApi,
attestationApiTimeout: origin.attestationApiTimeout,
}
}

func (s *TokenDataReader) ReadTokenData(ctx context.Context, msg internal.EVM2EVMOnRampCCIPSendRequestedWithMeta) (messageAndAttestation []byte, err error) {
messageBody, err := s.getUSDCMessageBody(ctx, msg)
if err != nil {
Expand Down Expand Up @@ -144,45 +154,18 @@ func (s *TokenDataReader) getUSDCMessageBody(ctx context.Context, msg internal.E

func (s *TokenDataReader) callAttestationApi(ctx context.Context, usdcMessageHash [32]byte) (attestationResponse, error) {
fullAttestationUrl := fmt.Sprintf("%s/%s/%s/0x%x", s.attestationApi, apiVersion, attestationPath, usdcMessageHash)

// Use a timeout to guard against attestation API hanging, causing observation timeout and failing to make any progress.
timeoutCtx, cancel := context.WithTimeout(ctx, s.attestationApiTimeout)
defer cancel()
req, err := http.NewRequestWithContext(timeoutCtx, "GET", fullAttestationUrl, nil)

if err != nil {
return attestationResponse{}, err
}
req.Header.Add("accept", "application/json")
res, err := http.DefaultClient.Do(req)
body, _, err := s.httpClient.Get(ctx, fullAttestationUrl, s.attestationApiTimeout)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return attestationResponse{}, tokendata.ErrTimeout
}
return attestationResponse{}, err
}
defer res.Body.Close()

// Explicitly signal if the API is being rate limited
if res.StatusCode == http.StatusTooManyRequests {
return attestationResponse{}, tokendata.ErrRateLimit
}

body, err := io.ReadAll(res.Body)
if err != nil {
return attestationResponse{}, err
}

var response attestationResponse
err = json.Unmarshal(body, &response)
if err != nil {
return attestationResponse{}, err
}

if response.Status == "" {
return attestationResponse{}, fmt.Errorf("invalid attestation response: %s", string(body))
}

return response, nil
}

Expand Down

0 comments on commit 36df2c2

Please sign in to comment.