Skip to content

Commit

Permalink
CCIP-3593 Adding more logs to non-trivial places in USDC token proces…
Browse files Browse the repository at this point in the history
…sing (#191)
  • Loading branch information
mateusz-sekara authored Oct 1, 2024
1 parent c600a05 commit 0c1e265
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 23 deletions.
2 changes: 1 addition & 1 deletion execute/tokendata/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func createUSDCTokenObserver(
return nil, err
}

client, err := usdc.NewSequentialAttestationClient(cctpConfig)
client, err := usdc.NewSequentialAttestationClient(lggr, cctpConfig)
if err != nil {
return nil, fmt.Errorf("create attestation client: %w", err)
}
Expand Down
15 changes: 14 additions & 1 deletion execute/tokendata/usdc/attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

"github.com/smartcontractkit/chainlink-common/pkg/hashutil"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"

"github.com/smartcontractkit/chainlink-ccip/execute/exectypes"
Expand Down Expand Up @@ -60,12 +61,17 @@ type AttestationClient interface {
}

type sequentialAttestationClient struct {
lggr logger.Logger
client HTTPClient
hasher hashutil.Hasher[[32]byte]
}

func NewSequentialAttestationClient(config pluginconfig.USDCCCTPObserverConfig) (AttestationClient, error) {
func NewSequentialAttestationClient(
lggr logger.Logger,
config pluginconfig.USDCCCTPObserverConfig,
) (AttestationClient, error) {
client, err := NewHTTPClient(
lggr,
config.AttestationAPI,
config.AttestationAPIInterval.Duration(),
config.AttestationAPITimeout.Duration(),
Expand All @@ -74,6 +80,7 @@ func NewSequentialAttestationClient(config pluginconfig.USDCCCTPObserverConfig)
return nil, fmt.Errorf("create HTTP client: %w", err)
}
return &sequentialAttestationClient{
lggr: lggr,
client: client,
hasher: hashutil.NewKeccak(),
}, nil
Expand All @@ -89,6 +96,12 @@ func (s *sequentialAttestationClient) Attestations(
outcome[chainSelector] = make(map[exectypes.MessageTokenID]AttestationStatus)

for tokenID, messageHash := range hashes {
s.lggr.Debugw(
"Fetching attestation from the API",
"chainSelector", chainSelector,
"messageHash", messageHash,
"messageTokenID", tokenID,
)
// TODO sequential processing
outcome[chainSelector][tokenID] = s.fetchSingleMessage(ctx, messageHash)
}
Expand Down
3 changes: 2 additions & 1 deletion execute/tokendata/usdc/attestation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -155,7 +156,7 @@ func Test_AttestationClient(t *testing.T) {
server := httptest.NewServer(createHandler(t, tc.success, tc.pending))
defer server.Close()

client, err := NewSequentialAttestationClient(pluginconfig.USDCCCTPObserverConfig{
client, err := NewSequentialAttestationClient(logger.Test(t), pluginconfig.USDCCCTPObserverConfig{
AttestationAPI: server.URL,
AttestationAPIInterval: commonconfig.MustNewDuration(1 * time.Millisecond),
AttestationAPITimeout: commonconfig.MustNewDuration(1 * time.Minute),
Expand Down
39 changes: 33 additions & 6 deletions execute/tokendata/usdc/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"
"golang.org/x/time/rate"
)
Expand Down Expand Up @@ -50,6 +51,7 @@ type HTTPClient interface {
// Therefore AttestationClient is a higher level abstraction that uses httpClient to fetch attestations and can be more
// oriented around caching/processing the attestation data instead of handling the API specifics.
type httpClient struct {
lggr logger.Logger
apiURL *url.URL
apiTimeout time.Duration
rate *rate.Limiter
Expand All @@ -58,13 +60,19 @@ type httpClient struct {
coolDownMu *sync.RWMutex
}

func NewHTTPClient(api string, apiInterval time.Duration, apiTimeout time.Duration) (HTTPClient, error) {
func NewHTTPClient(
lggr logger.Logger,
api string,
apiInterval time.Duration,
apiTimeout time.Duration,
) (HTTPClient, error) {
u, err := url.ParseRequestURI(api)
if err != nil {
return nil, err
}

return &httpClient{
lggr: lggr,
apiURL: u,
apiTimeout: apiTimeout,
rate: rate.NewLimiter(rate.Every(apiInterval), 1),
Expand Down Expand Up @@ -108,14 +116,19 @@ func (r httpResponse) attestationToBytes() (cciptypes.Bytes, error) {

func (h *httpClient) Get(ctx context.Context, messageHash cciptypes.Bytes32) (cciptypes.Bytes, HTTPStatus, error) {
// Terminate immediately when rate limited
if h.inCoolDownPeriod() {
if coolDown, duration := h.inCoolDownPeriod(); coolDown {
h.lggr.Errorw(
"Rate limited by the Attestation API, dropping all requests",
"coolDownDuration", duration,
)
return nil, http.StatusTooManyRequests, ErrRateLimit
}

if h.rate != nil {
// Wait blocks until it the attestation API can be called or the
// context is Done.
if waitErr := h.rate.Wait(ctx); waitErr != nil {
h.lggr.Warnw("Self rate-limited, sending too many requests to the Attestation API")
return nil, http.StatusTooManyRequests, ErrRateLimit
}
}
Expand All @@ -127,7 +140,15 @@ func (h *httpClient) Get(ctx context.Context, messageHash cciptypes.Bytes32) (cc

requestURL := *h.apiURL
requestURL.Path = path.Join(requestURL.Path, apiVersion, attestationPath, messageHash.String())
return h.callAPI(timeoutCtx, requestURL)

response, httpStatus, err := h.callAPI(timeoutCtx, requestURL)
h.lggr.Debugw(
"Response from attestation API",
"messageHash", messageHash,
"status", httpStatus,
"err", err,
)
return response, httpStatus, err
}

func (h *httpClient) callAPI(ctx context.Context, url url.URL) (cciptypes.Bytes, HTTPStatus, error) {
Expand Down Expand Up @@ -191,14 +212,20 @@ func (h *httpClient) setCoolDownPeriod(headers http.Header) {
}
}

coolDownDuration = min(coolDownDuration, maxCoolDownDuration)
//Logging on the error level, because we should always self-rate limit before hitting the API rate limit
h.lggr.Errorw(
"Rate limited by the Attestation API, setting cool down",
"coolDownDuration", coolDownDuration,
)

h.coolDownMu.Lock()
defer h.coolDownMu.Unlock()
coolDownDuration = min(coolDownDuration, maxCoolDownDuration)
h.coolDownUntil = time.Now().Add(coolDownDuration)
}

func (h *httpClient) inCoolDownPeriod() bool {
func (h *httpClient) inCoolDownPeriod() (bool, time.Duration) {
h.coolDownMu.RLock()
defer h.coolDownMu.RUnlock()
return time.Now().Before(h.coolDownUntil)
return time.Now().Before(h.coolDownUntil), time.Until(h.coolDownUntil)
}
11 changes: 6 additions & 5 deletions execute/tokendata/usdc/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -48,7 +49,7 @@ func Test_NewHTTPClient_New(t *testing.T) {

for _, tc := range tt {
t.Run(tc.api, func(t *testing.T) {
client, err := NewHTTPClient(tc.api, 1*time.Millisecond, longTimeout)
client, err := NewHTTPClient(logger.Test(t), tc.api, 1*time.Millisecond, longTimeout)
if tc.wantErr {
require.Error(t, err)
} else {
Expand Down Expand Up @@ -211,7 +212,7 @@ func Test_HTTPClient_Get(t *testing.T) {
attestationURI, err := url.ParseRequestURI(ts.URL)
require.NoError(t, err)

client, err := NewHTTPClient(attestationURI.String(), tc.timeout, tc.timeout)
client, err := NewHTTPClient(logger.Test(t), attestationURI.String(), tc.timeout, tc.timeout)
require.NoError(t, err)
response, statusCode, err := client.Get(tests.Context(t), tc.messageHash)

Expand Down Expand Up @@ -242,7 +243,7 @@ func Test_HTTPClient_Cooldown(t *testing.T) {
attestationURI, err := url.ParseRequestURI(ts.URL)
require.NoError(t, err)

client, err := NewHTTPClient(attestationURI.String(), 1*time.Millisecond, longTimeout)
client, err := NewHTTPClient(logger.Test(t), attestationURI.String(), 1*time.Millisecond, longTimeout)
require.NoError(t, err)
_, _, err = client.Get(tests.Context(t), [32]byte{1, 2, 3})
require.EqualError(t, err, ErrUnknownResponse.Error())
Expand Down Expand Up @@ -271,7 +272,7 @@ func Test_HTTPClient_CoolDownWithRetryHeader(t *testing.T) {
attestationURI, err := url.ParseRequestURI(ts.URL)
require.NoError(t, err)

client, err := NewHTTPClient(attestationURI.String(), 1*time.Millisecond, longTimeout)
client, err := NewHTTPClient(logger.Test(t), attestationURI.String(), 1*time.Millisecond, longTimeout)
require.NoError(t, err)
_, _, err = client.Get(tests.Context(t), [32]byte{1, 2, 3})
require.EqualError(t, err, ErrUnknownResponse.Error())
Expand Down Expand Up @@ -338,7 +339,7 @@ func Test_HTTPClient_RateLimiting_Parallel(t *testing.T) {
attestationURI, err := url.ParseRequestURI(ts.URL)
require.NoError(t, err)

client, err := NewHTTPClient(attestationURI.String(), tc.rateConfig, longTimeout)
client, err := NewHTTPClient(logger.Test(t), attestationURI.String(), tc.rateConfig, longTimeout)
require.NoError(t, err)

ctx := context.Background()
Expand Down
35 changes: 32 additions & 3 deletions execute/tokendata/usdc/usdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"
"golang.org/x/exp/maps"

"github.com/smartcontractkit/chainlink-ccip/execute/exectypes"
"github.com/smartcontractkit/chainlink-ccip/pkg/reader"
Expand All @@ -29,11 +30,16 @@ func NewTokenDataObserver(
usdcMessageReader reader.USDCMessageReader,
attestationClient AttestationClient,
) *TokenDataObserver {
supportedPoolsBySelector := make(map[string]string)
supportedTokens := make(map[string]struct{})
for chainSelector, tokenConfig := range tokens {
key := sourceTokenIdentifier(chainSelector, tokenConfig.SourcePoolAddress)
supportedTokens[key] = struct{}{}
supportedPoolsBySelector[chainSelector.String()] = tokenConfig.SourcePoolAddress
}
lggr.Infow("Created USDC Token Data Observer",
"supportedTokenPools", supportedPoolsBySelector,
)

return &TokenDataObserver{
lggr: lggr,
Expand Down Expand Up @@ -85,10 +91,19 @@ func (u *TokenDataObserver) pickOnlyUSDCMessages(
for seqNum, message := range messages {
for i, tokenAmount := range message.TokenAmounts {
tokenIdentifier := sourceTokenIdentifier(chainSelector, tokenAmount.SourcePoolAddress.String())
if _, ok := u.supportedTokens[tokenIdentifier]; !ok {
continue
_, ok := u.supportedTokens[tokenIdentifier]
if ok {
usdcMessages[chainSelector][exectypes.NewMessageTokenID(seqNum, i)] = tokenAmount
}
usdcMessages[chainSelector][exectypes.NewMessageTokenID(seqNum, i)] = tokenAmount

u.lggr.Debugw(
"Scanning message's tokens for USDC data",
"isUSDC", ok,
"seqNum", seqNum,
"sourceChainSelector", chainSelector,
"sourcePoolAddress", tokenAmount.SourcePoolAddress.String(),
"destTokenAddress", tokenAmount.DestTokenAddress.String(),
)
}
}
}
Expand All @@ -109,6 +124,13 @@ func (u *TokenDataObserver) fetchUSDCMessageHashes(
// TODO Sequential reading USDC messages from the source chain
usdcHashes, err := u.usdcMessageReader.MessageHashes(ctx, chainSelector, u.destChainSelector, messages)
if err != nil {
u.lggr.Errorw(
"Failed fetching USDC events from the source chain",
"sourceChainSelector", chainSelector,
"destChainSelector", u.destChainSelector,
"messageTokenIDs", maps.Keys(messages),
"error", err,
)
return nil, err
}
output[chainSelector] = usdcHashes
Expand Down Expand Up @@ -141,6 +163,13 @@ func (u *TokenDataObserver) extractTokenData(
tokenData := make([]exectypes.TokenData, len(message.TokenAmounts))
for i, tokenAmount := range message.TokenAmounts {
if !u.IsTokenSupported(chainSelector, tokenAmount) {
u.lggr.Debugw(
"Ignoring unsupported token",
"seqNum", seqNum,
"sourceChainSelector", chainSelector,
"sourcePoolAddress", tokenAmount.SourcePoolAddress.String(),
"destTokenAddress", tokenAmount.DestTokenAddress.String(),
)
tokenData[i] = exectypes.NotSupportedTokenData()
} else {
tokenData[i] = u.attestationToTokenData(ctx, seqNum, i, attestations[chainSelector])
Expand Down
11 changes: 5 additions & 6 deletions execute/tokendata/usdc/usdc_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,11 @@ func Test_USDC_CCTP_Flow(t *testing.T) {
})
require.NoError(t, err)

attestation, err := usdc.NewSequentialAttestationClient(
pluginconfig.USDCCCTPObserverConfig{
AttestationAPI: server.URL,
AttestationAPIInterval: commonconfig.MustNewDuration(1 * time.Microsecond),
AttestationAPITimeout: commonconfig.MustNewDuration(1 * time.Second),
})
attestation, err := usdc.NewSequentialAttestationClient(logger.Test(t), pluginconfig.USDCCCTPObserverConfig{
AttestationAPI: server.URL,
AttestationAPIInterval: commonconfig.MustNewDuration(1 * time.Microsecond),
AttestationAPITimeout: commonconfig.MustNewDuration(1 * time.Second),
})
require.NoError(t, err)

tkReader := usdc.NewTokenDataObserver(
Expand Down

0 comments on commit 0c1e265

Please sign in to comment.