Skip to content

Commit

Permalink
offchain - batched rpc calls to get token pool rate limits and token …
Browse files Browse the repository at this point in the history
…decimals (#252)

Co-authored-by: Mateusz Sekara <[email protected]>
  • Loading branch information
dimkouv and mateusz-sekara authored Nov 7, 2023
1 parent af2deee commit 0756cbd
Show file tree
Hide file tree
Showing 16 changed files with 374 additions and 200 deletions.
1 change: 0 additions & 1 deletion core/services/ocr2/plugins/ccip/commit_reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin
rf.config.destLP,
rf.config.offRamp,
rf.destPriceRegReader,
rf.config.destClient,
int64(rf.config.commitStore.OffchainConfig().DestFinalityDepth),
),
gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ func TestCommitReportingPlugin_Report(t *testing.T) {

aos := make([]types.AttributedObservation, 0, len(tc.observations))
for _, o := range tc.observations {
obs, err := o.Marshal()
assert.NoError(t, err)
obs, err2 := o.Marshal()
assert.NoError(t, err2)
aos = append(aos, types.AttributedObservation{Observation: obs})
}

Expand Down
100 changes: 56 additions & 44 deletions core/services/ocr2/plugins/ccip/execution_reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/custom_token_pool"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/evm_2_evm_offramp"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal"
Expand Down Expand Up @@ -66,19 +65,18 @@ type ExecutionPluginStaticConfig struct {
type ExecutionReportingPlugin struct {
config ExecutionPluginStaticConfig

F int
lggr logger.Logger
inflightReports *inflightExecReportsContainer
snoozedRoots cache.SnoozedRoots
destPriceRegistry ccipdata.PriceRegistryReader
destWrappedNative common.Address
onchainConfig ccipdata.ExecOnchainConfig
offchainConfig ccipdata.ExecOffchainConfig
cachedSourceFeeTokens cache.AutoSync[[]common.Address]
cachedDestTokens cache.AutoSync[cache.CachedTokens]
cachedTokenPools cache.AutoSync[map[common.Address]common.Address]
customTokenPoolFactory func(ctx context.Context, poolAddress common.Address, bind bind.ContractBackend) (custom_token_pool.CustomTokenPoolInterface, error)
gasPriceEstimator prices.GasPriceEstimatorExec
F int
lggr logger.Logger
inflightReports *inflightExecReportsContainer
snoozedRoots cache.SnoozedRoots
destPriceRegistry ccipdata.PriceRegistryReader
destWrappedNative common.Address
onchainConfig ccipdata.ExecOnchainConfig
offchainConfig ccipdata.ExecOffchainConfig
cachedSourceFeeTokens cache.AutoSync[[]common.Address]
cachedDestTokens cache.AutoSync[cache.CachedTokens]
cachedTokenPools cache.AutoSync[map[common.Address]common.Address]
gasPriceEstimator prices.GasPriceEstimatorExec
}

type ExecutionReportingPluginFactory struct {
Expand Down Expand Up @@ -157,10 +155,7 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPlugin(config types.Repor
cachedDestTokens: cachedDestTokens,
cachedSourceFeeTokens: cachedSourceFeeTokens,
cachedTokenPools: cachedTokenPools,
customTokenPoolFactory: func(ctx context.Context, poolAddress common.Address, contractBackend bind.ContractBackend) (custom_token_pool.CustomTokenPoolInterface, error) {
return custom_token_pool.NewCustomTokenPool(poolAddress, contractBackend)
},
gasPriceEstimator: rf.config.offRampReader.GasPriceEstimator(),
gasPriceEstimator: rf.config.offRampReader.GasPriceEstimator(),
}, types.ReportingPluginInfo{
Name: "CCIPExecution",
// Setting this to false saves on calldata since OffRamp doesn't require agreement between NOPs
Expand Down Expand Up @@ -358,49 +353,66 @@ func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context
return []ObservedMessage{}, nil
}

// destPoolRateLimits returns a map that consists of the rate limits of each destination tokens of the provided reports.
// destPoolRateLimits returns a map that consists of the rate limits of each destination token of the provided reports.
// If a token is missing from the returned map it either means that token was not found or token pool is disabled for this token.
func (r *ExecutionReportingPlugin) destPoolRateLimits(ctx context.Context, commitReports []commitReportWithSendRequests, sourceToDestToken map[common.Address]common.Address) (map[common.Address]*big.Int, error) {
dstTokens := make(map[common.Address]struct{}) // todo: replace with a set or uniqueSlice data structure
tokenPools, err := r.cachedTokenPools.Get(ctx)
if err != nil {
return nil, fmt.Errorf("get cached token pools: %w", err)
}

dstTokenToPool := make(map[common.Address]common.Address)
dstPoolToToken := make(map[common.Address]common.Address)
dstPools := make([]common.Address, 0)

for _, msg := range commitReports {
for _, req := range msg.sendRequestsWithMeta {
for _, tk := range req.TokenAmounts {
if dstToken, exists := sourceToDestToken[tk.Token]; exists {
dstTokens[dstToken] = struct{}{}
dstToken, exists := sourceToDestToken[tk.Token]
if !exists {
r.lggr.Warnw("token not found on destination chain", "sourceToken", tk)
continue
}
r.lggr.Warnw("token not found on destination chain", "sourceToken", tk)

// another message with the same token exists in the report
// we skip it since we don't want to query for the rate limit twice
if _, seen := dstTokenToPool[dstToken]; seen {
continue
}

poolAddress, exists := tokenPools[dstToken]
if !exists {
return nil, fmt.Errorf("pool for token '%s' does not exist", dstToken)
}

if tokenAddr, seen := dstPoolToToken[poolAddress]; seen {
return nil, fmt.Errorf("pool is already seen for token %s", tokenAddr)
}

dstTokenToPool[dstToken] = poolAddress
dstPoolToToken[poolAddress] = dstToken
dstPools = append(dstPools, poolAddress)
}
}
}

tokenPools, err := r.cachedTokenPools.Get(ctx)
rateLimits, err := r.config.offRampReader.GetTokenPoolsRateLimits(ctx, dstPools)
if err != nil {
return nil, fmt.Errorf("get cached token pools: %w", err)
return nil, fmt.Errorf("fetch pool rate limits: %w", err)
}

res := make(map[common.Address]*big.Int, len(dstTokens))

for dstToken := range dstTokens {
poolAddress, exists := tokenPools[dstToken]
if !exists {
return nil, fmt.Errorf("pool for token '%s' does not exist", dstToken)
}

tokenPool, err := r.customTokenPoolFactory(ctx, poolAddress, r.config.destClient)
if err != nil {
return nil, fmt.Errorf("new custom dest token pool %s: %w", poolAddress, err)
}

offRampAddr := r.config.offRampReader.Address()
rateLimiterState, err := tokenPool.CurrentOffRampRateLimiterState(&bind.CallOpts{Context: ctx}, offRampAddr)
if err != nil {
return nil, fmt.Errorf("get rate off ramp rate limiter state: %w", err)
res := make(map[common.Address]*big.Int, len(dstTokenToPool))
for i, rateLimit := range rateLimits {
// if the rate limit is disabled for this token pool then we omit it from the result
if !rateLimit.IsEnabled {
continue
}

if rateLimiterState.IsEnabled {
res[dstToken] = rateLimiterState.Tokens
tokenAddr, exists := dstPoolToToken[dstPools[i]]
if !exists {
return nil, fmt.Errorf("pool to token mapping does not contain %s", dstPools[i])
}
res[tokenAddr] = rateLimit.Tokens
}

return res, nil
Expand Down
143 changes: 74 additions & 69 deletions core/services/ocr2/plugins/ccip/execution_reporting_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/cometbft/cometbft/libs/rand"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"github.com/smartcontractkit/libocr/commontypes"
Expand All @@ -23,7 +22,6 @@ import (
"github.com/stretchr/testify/require"

lpMocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller/mocks"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/custom_token_pool"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache"
Expand Down Expand Up @@ -128,6 +126,8 @@ func TestExecutionReportingPlugin_Observation(t *testing.T) {
mockOffRampReader.On("CurrentRateLimiterState", mock.Anything).Return(tc.rateLimiterState, nil).Maybe()
mockOffRampReader.On("Address").Return(offRamp.Address()).Maybe()
mockOffRampReader.On("GetSenderNonce", mock.Anything, mock.Anything).Return(offRamp.GetSenderNonce(nil, utils.RandomAddress())).Maybe()
mockOffRampReader.On("GetTokenPoolsRateLimits", ctx, []common.Address{}).
Return([]ccipdata.TokenBucketRateLimit{}, nil).Maybe()
p.config.offRampReader = mockOffRampReader

mockOnRampReader := ccipdata.NewMockOnRampReader(t)
Expand Down Expand Up @@ -765,11 +765,14 @@ func TestExecutionReportingPlugin_destPoolRateLimits(t *testing.T) {
tk2pool := utils.RandomAddress()

testCases := []struct {
name string
tokenAmounts []internal.TokenAmount
sourceToDestToken map[common.Address]common.Address
destPools map[common.Address]common.Address
poolRateLimits map[common.Address]custom_token_pool.RateLimiterTokenBucket
name string
tokenAmounts []internal.TokenAmount
// the order of the following fields: sourceTokens, destTokens and poolRateLimits
// should follow the order of the tokenAmounts
sourceTokens []common.Address
destTokens []common.Address
destPools []common.Address
poolRateLimits []ccipdata.TokenBucketRateLimit
destPoolsCacheErr error

expRateLimits map[common.Address]*big.Int
Expand All @@ -783,17 +786,12 @@ func TestExecutionReportingPlugin_destPoolRateLimits(t *testing.T) {
{Token: tk1},
{Token: tk1},
},
sourceToDestToken: map[common.Address]common.Address{
tk1: tk1dest,
tk2: tk2dest,
},
destPools: map[common.Address]common.Address{
tk1dest: tk1pool,
tk2dest: tk2pool,
},
poolRateLimits: map[common.Address]custom_token_pool.RateLimiterTokenBucket{
tk1pool: {Tokens: big.NewInt(1000), IsEnabled: true},
tk2pool: {Tokens: big.NewInt(2000), IsEnabled: true},
sourceTokens: []common.Address{tk1, tk2},
destTokens: []common.Address{tk1dest, tk2dest},
destPools: []common.Address{tk1pool, tk2pool},
poolRateLimits: []ccipdata.TokenBucketRateLimit{
{Tokens: big.NewInt(1000), IsEnabled: true},
{Tokens: big.NewInt(2000), IsEnabled: true},
},
expRateLimits: map[common.Address]*big.Int{
tk1dest: big.NewInt(1000),
Expand All @@ -802,19 +800,16 @@ func TestExecutionReportingPlugin_destPoolRateLimits(t *testing.T) {
expErr: false,
},
{
name: "token missing from source to dest mapping",
name: "missing from source to dest mapping should not return error",
tokenAmounts: []internal.TokenAmount{
{Token: tk1},
{Token: tk2}, // <-- missing from sourceToDestToken
},
sourceToDestToken: map[common.Address]common.Address{
tk1: tk1dest,
},
destPools: map[common.Address]common.Address{
tk1dest: tk1pool,
{Token: tk2}, // <- missing
},
poolRateLimits: map[common.Address]custom_token_pool.RateLimiterTokenBucket{
tk1pool: {Tokens: big.NewInt(1000), IsEnabled: true},
sourceTokens: []common.Address{tk1},
destTokens: []common.Address{tk1dest},
destPools: []common.Address{tk1pool},
poolRateLimits: []ccipdata.TokenBucketRateLimit{
{Tokens: big.NewInt(1000), IsEnabled: true},
},
expRateLimits: map[common.Address]*big.Int{
tk1dest: big.NewInt(1000),
Expand All @@ -827,63 +822,82 @@ func TestExecutionReportingPlugin_destPoolRateLimits(t *testing.T) {
{Token: tk1},
{Token: tk2},
},
sourceToDestToken: map[common.Address]common.Address{
tk1: tk1dest,
tk2: tk2dest,
},
destPools: map[common.Address]common.Address{
tk1dest: tk1pool,
tk2dest: tk2pool,
},
poolRateLimits: map[common.Address]custom_token_pool.RateLimiterTokenBucket{
tk1pool: {Tokens: big.NewInt(1000), IsEnabled: true},
tk2pool: {Tokens: big.NewInt(2000), IsEnabled: false}, // <--- pool disabled
sourceTokens: []common.Address{tk1, tk2},
destTokens: []common.Address{tk1dest, tk2dest},
destPools: []common.Address{tk1pool, tk2pool},
poolRateLimits: []ccipdata.TokenBucketRateLimit{
{Tokens: big.NewInt(1000), IsEnabled: true},
{Tokens: big.NewInt(2000), IsEnabled: false},
},
expRateLimits: map[common.Address]*big.Int{
tk1dest: big.NewInt(1000),
},
expErr: false,
},
{
name: "dest pool cache error",
tokenAmounts: []internal.TokenAmount{{Token: tk1}},
sourceToDestToken: map[common.Address]common.Address{tk1: tk1dest},
destPoolsCacheErr: errors.New("some random error"),
name: "dest pool cache error",
tokenAmounts: []internal.TokenAmount{
{Token: tk1},
},
sourceTokens: []common.Address{tk1},
destTokens: []common.Address{tk1dest},
destPools: []common.Address{tk1pool},
poolRateLimits: []ccipdata.TokenBucketRateLimit{
{Tokens: big.NewInt(1000), IsEnabled: true},
},
expRateLimits: map[common.Address]*big.Int{
tk1dest: big.NewInt(1000),
},
destPoolsCacheErr: errors.New("some err"),
expErr: true,
},
{
name: "pool for token not found",
tokenAmounts: []internal.TokenAmount{{Token: tk1}},
sourceToDestToken: map[common.Address]common.Address{tk1: tk1dest},
destPools: map[common.Address]common.Address{},
expErr: true,
name: "pool for token not found",
tokenAmounts: []internal.TokenAmount{
{Token: tk1}, {Token: tk2}, {Token: tk1}, {Token: tk2},
},
sourceTokens: []common.Address{tk1, tk2},
destTokens: []common.Address{tk1dest, tk2dest},
destPools: []common.Address{tk1pool}, // <-- pool2 not found
poolRateLimits: []ccipdata.TokenBucketRateLimit{
{Tokens: big.NewInt(1000), IsEnabled: true},
},
expRateLimits: map[common.Address]*big.Int{
tk1dest: big.NewInt(1000),
},
expErr: true,
},
}

ctx := testutils.Context(t)
lggr := logger.TestLogger(t)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
sourceToDestMapping := make(map[common.Address]common.Address)
for i, srcTk := range tc.sourceTokens {
sourceToDestMapping[srcTk] = tc.destTokens[i]
}

poolsMapping := make(map[common.Address]common.Address)
for i, poolAddr := range tc.destPools {
poolsMapping[tc.destTokens[i]] = poolAddr
}

p := &ExecutionReportingPlugin{}
p.lggr = lggr

tokenPoolsCache := cache.NewMockAutoSync[map[common.Address]common.Address](t)
tokenPoolsCache.On("Get", ctx).Return(tc.destPools, tc.destPoolsCacheErr).Maybe()
tokenPoolsCache.On("Get", ctx).Return(poolsMapping, tc.destPoolsCacheErr).Maybe()
p.cachedTokenPools = tokenPoolsCache

offRamp, offRampAddr := testhelpers.NewFakeOffRamp(t)
offRamp.SetTokenPools(tc.destPools)

offRampAddr := utils.RandomAddress()
mockOffRampReader := ccipdata.NewMockOffRampReader(t)
mockOffRampReader.On("Address").Return(offRampAddr, nil).Maybe()
mockOffRampReader.On("GetTokenPoolsRateLimits", ctx, tc.destPools).
Return(tc.poolRateLimits, nil).
Maybe()
p.config.offRampReader = mockOffRampReader

p.customTokenPoolFactory = func(ctx context.Context, poolAddress common.Address, _ bind.ContractBackend) (custom_token_pool.CustomTokenPoolInterface, error) {
mp := &mockPool{}
mp.On("CurrentOffRampRateLimiterState", mock.Anything, offRampAddr).Return(tc.poolRateLimits[poolAddress], nil)
return mp, nil
}

rateLimits, err := p.destPoolRateLimits(ctx, []commitReportWithSendRequests{
{
sendRequestsWithMeta: []internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{
Expand All @@ -894,7 +908,8 @@ func TestExecutionReportingPlugin_destPoolRateLimits(t *testing.T) {
},
},
},
}, tc.sourceToDestToken)
}, sourceToDestMapping)

if tc.expErr {
assert.Error(t, err)
return
Expand Down Expand Up @@ -1711,13 +1726,3 @@ func generateExecutionReport(t *testing.T, numMsgs, tokensPerMsg, bytesPerMsg in
ProofFlagBits: big.NewInt(rand.Int64()),
}
}

// mockPool is a testify-backed test double for the generated
// CustomTokenPoolInterface. Embedding the interface satisfies every method
// of the contract binding, while the embedded mock.Mock records expectations
// set up with On(...) and replays them on call.
type mockPool struct {
	custom_token_pool.CustomTokenPoolInterface
	mock.Mock
}

// CurrentOffRampRateLimiterState replays the state and error stubbed for
// this call via mp.On("CurrentOffRampRateLimiterState", ...). The type
// assertion deliberately panics when the stubbed value is not a
// RateLimiterTokenBucket, so a test-setup mistake fails loudly.
func (mp *mockPool) CurrentOffRampRateLimiterState(opts *bind.CallOpts, offRamp common.Address) (custom_token_pool.RateLimiterTokenBucket, error) {
	call := mp.Called(opts, offRamp)
	state := call.Get(0).(custom_token_pool.RateLimiterTokenBucket)
	return state, call.Error(1)
}
Loading

0 comments on commit 0756cbd

Please sign in to comment.