Skip to content

Commit

Permalink
Batch token price updates (#623)
Browse files Browse the repository at this point in the history
## Motivation
Batch all token price updates on leader lane so save price update costs

## Solution
Leader lane:
1. finds out all dest tokens from the Router
2. queries and reports prices for all dest tokens
  • Loading branch information
matYang authored Mar 28, 2024
1 parent cf201bd commit 23a3cb8
Show file tree
Hide file tree
Showing 15 changed files with 621 additions and 84 deletions.
2 changes: 1 addition & 1 deletion core/services/ocr2/plugins/ccip/ccipcommit/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin
lggr: lggr,
inflightReports: newInflightCommitReportsContainer(rf.config.commitStore.OffchainConfig().InflightCacheExpiry),
destPriceRegistryReader: rf.destPriceRegReader,
offRampReader: rf.config.offRamp,
offRampReaders: rf.config.offRamps,
gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(),
offchainConfig: pluginOffChainConfig,
metricsCollector: rf.config.metricsCollector,
Expand Down
56 changes: 49 additions & 7 deletions core/services/ocr2/plugins/ccip/ccipcommit/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/Masterminds/semver/v3"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
chainselectors "github.com/smartcontractkit/chain-selectors"
Expand All @@ -17,6 +18,7 @@ import (
commonlogger "github.com/smartcontractkit/chainlink-common/pkg/logger"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcalc"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/ccipdataprovider"
Expand All @@ -43,7 +45,7 @@ import (
)

func NewCommitServices(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, new bool, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
pluginConfig, backfillArgs, chainHealthcheck, err := jobSpecToCommitPluginConfig(lggr, jb, pr, chainSet, qopts...)
pluginConfig, backfillArgs, chainHealthcheck, err := jobSpecToCommitPluginConfig(ctx, lggr, jb, pr, chainSet, qopts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -114,7 +116,7 @@ func UnregisterCommitPluginLpFilters(ctx context.Context, lggr logger.Logger, jb
return multiErr
}

func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Runner, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) (*CommitPluginStaticConfig, *ccipcommon.BackfillArgs, *cache.ObservedChainHealthcheck, error) {
func jobSpecToCommitPluginConfig(ctx context.Context, lggr logger.Logger, jb job.Job, pr pipeline.Runner, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) (*CommitPluginStaticConfig, *ccipcommon.BackfillArgs, *cache.ObservedChainHealthcheck, error) {
params, err := extractJobSpecParams(jb, chainSet)
if err != nil {
return nil, nil, nil, err
Expand Down Expand Up @@ -186,28 +188,68 @@ func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Run
if err != nil {
return nil, nil, nil, errors.Wrap(err, "failed offramp reader")
}
// Look up all destination offRamps connected to the same router
destRouterAddr, err := offRampReader.GetRouter(ctx)
if err != nil {
return nil, nil, nil, err
}
destRouterEvmAddr, err := ccipcalc.GenericAddrToEvm(destRouterAddr)
if err != nil {
return nil, nil, nil, err
}
destRouter, err := router.NewRouter(destRouterEvmAddr, params.destChain.Client())
if err != nil {
return nil, nil, nil, err
}
destRouterOffRamps, err := destRouter.GetOffRamps(&bind.CallOpts{Context: ctx})
if err != nil {
return nil, nil, nil, err
}
var destOffRampReaders []ccipdata.OffRampReader
for _, o := range destRouterOffRamps {
destOffRampAddr := cciptypes.Address(o.OffRamp.String())
destOffRampReader, err2 := factory.NewOffRampReader(
commitLggr,
versionFinder,
destOffRampAddr,
params.destChain.Client(),
params.destChain.LogPoller(),
params.destChain.GasEstimator(),
params.destChain.Config().EVM().GasEstimator().PriceMax().ToInt(),
true,
qopts...,
)
if err2 != nil {
return nil, nil, nil, err2
}

destOffRampReaders = append(destOffRampReaders, destOffRampReader)
}

onRampRouterAddr, err := onRampReader.RouterAddress()
if err != nil {
return nil, nil, nil, err
}
routerAddr, err := ccipcalc.GenericAddrToEvm(onRampRouterAddr)
sourceRouterAddr, err := ccipcalc.GenericAddrToEvm(onRampRouterAddr)
if err != nil {
return nil, nil, nil, err
}
sourceRouter, err := router.NewRouter(routerAddr, params.sourceChain.Client())
sourceRouter, err := router.NewRouter(sourceRouterAddr, params.sourceChain.Client())
if err != nil {
return nil, nil, nil, err
}
sourceNative, err := sourceRouter.GetWrappedNative(nil)
sourceNative, err := sourceRouter.GetWrappedNative(&bind.CallOpts{Context: ctx})
if err != nil {
return nil, nil, nil, err
}

// Prom wrappers
onRampReader = observability.NewObservedOnRampReader(onRampReader, params.sourceChain.ID().Int64(), ccip.CommitPluginLabel)
offRampReader = observability.NewObservedOffRampReader(offRampReader, params.destChain.ID().Int64(), ccip.CommitPluginLabel)
commitStoreReader = observability.NewObservedCommitStoreReader(commitStoreReader, params.destChain.ID().Int64(), ccip.CommitPluginLabel)
metricsCollector := ccip.NewPluginMetricsCollector(ccip.CommitPluginLabel, params.sourceChain.ID().Int64(), params.destChain.ID().Int64())
for i, o := range destOffRampReaders {
destOffRampReaders[i] = observability.NewObservedOffRampReader(o, params.destChain.ID().Int64(), ccip.CommitPluginLabel)
}

chainHealthcheck := cache.NewObservedChainHealthCheck(
cache.NewChainHealthcheck(
Expand Down Expand Up @@ -237,7 +279,7 @@ func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Run
return &CommitPluginStaticConfig{
lggr: commitLggr,
onRampReader: onRampReader,
offRamp: offRampReader,
offRamps: destOffRampReaders,
sourceNative: ccipcalc.EvmAddrToGeneric(sourceNative),
priceGetter: priceGetter,
sourceChainSelector: params.commitStoreStaticCfg.SourceChainSelector,
Expand Down
24 changes: 11 additions & 13 deletions core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/utils/mathutil"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache"

"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand Down Expand Up @@ -60,7 +61,7 @@ type CommitPluginStaticConfig struct {
sourceChainSelector uint64
sourceNative cciptypes.Address
// Dest
offRamp ccipdata.OffRampReader
offRamps []ccipdata.OffRampReader
commitStore ccipdata.CommitStoreReader
destChainSelector uint64
priceRegistryProvider ccipdataprovider.PriceRegistry
Expand All @@ -81,7 +82,7 @@ type CommitReportingPlugin struct {
commitStoreReader ccipdata.CommitStoreReader
destPriceRegistryReader ccipdata.PriceRegistryReader
offchainConfig cciptypes.CommitOffchainConfig
offRampReader ccipdata.OffRampReader
offRampReaders []ccipdata.OffRampReader
F int
// Offchain
priceGetter pricegetter.PriceGetter
Expand Down Expand Up @@ -213,26 +214,24 @@ func (r *CommitReportingPlugin) observePriceUpdates(
return nil, nil, nil
}

feeTokens, bridgeableTokens, err := ccipcommon.GetDestinationTokens(ctx, r.offRampReader, r.destPriceRegistryReader)
sortedChainTokens, err := ccipcommon.GetSortedChainTokens(ctx, r.offRampReaders, r.destPriceRegistryReader)
if err != nil {
return nil, nil, fmt.Errorf("get destination tokens: %w", err)
}
destTokens := ccipcommon.FlattenUniqueSlice(feeTokens, bridgeableTokens)

return r.generatePriceUpdates(ctx, lggr, destTokens)
return r.generatePriceUpdates(ctx, lggr, sortedChainTokens)
}

// All prices are USD ($1=1e18) denominated. All prices must be not nil.
// Return token prices should contain the exact same tokens as in tokenDecimals.
func (r *CommitReportingPlugin) generatePriceUpdates(
ctx context.Context,
lggr logger.Logger,
destTokens []cciptypes.Address,
sortedChainTokens []cciptypes.Address,
) (sourceGasPriceUSD *big.Int, tokenPricesUSD map[cciptypes.Address]*big.Int, err error) {
// Include wrapped native in our token query as way to identify the source native USD price.
// notice USD is in 1e18 scale, i.e. $1 = 1e18
queryTokens := ccipcommon.FlattenUniqueSlice([]cciptypes.Address{r.sourceNative}, destTokens)
sort.Slice(queryTokens, func(i, j int) bool { return queryTokens[i] < queryTokens[j] }) // make the query deterministic
queryTokens := ccipcommon.FlattenUniqueSlice([]cciptypes.Address{r.sourceNative}, sortedChainTokens)

rawTokenPricesUSD, err := r.priceGetter.TokenPricesUSD(ctx, queryTokens)
if err != nil {
Expand All @@ -252,13 +251,13 @@ func (r *CommitReportingPlugin) generatePriceUpdates(
return nil, nil, fmt.Errorf("missing source native (%s) price", r.sourceNative)
}

destTokensDecimals, err := r.destPriceRegistryReader.GetTokensDecimals(ctx, destTokens)
destTokensDecimals, err := r.destPriceRegistryReader.GetTokensDecimals(ctx, sortedChainTokens)
if err != nil {
return nil, nil, fmt.Errorf("get tokens decimals: %w", err)
}

tokenPricesUSD = make(map[cciptypes.Address]*big.Int, len(rawTokenPricesUSD))
for i, token := range destTokens {
for i, token := range sortedChainTokens {
tokenPricesUSD[token] = calculateUsdPer1e18TokenAmount(rawTokenPricesUSD[token], destTokensDecimals[i])
}

Expand Down Expand Up @@ -380,14 +379,13 @@ func (r *CommitReportingPlugin) Report(ctx context.Context, epochAndRound types.

parsableObservations := ccip.GetParsableObservations[ccip.CommitObservation](lggr, observations)

feeTokens, bridgeableTokens, err := ccipcommon.GetDestinationTokens(ctx, r.offRampReader, r.destPriceRegistryReader)
sortedChainTokens, err := ccipcommon.GetSortedChainTokens(ctx, r.offRampReaders, r.destPriceRegistryReader)
if err != nil {
return false, nil, fmt.Errorf("get destination tokens: %w", err)
}
destTokens := ccipcommon.FlattenUniqueSlice(feeTokens, bridgeableTokens)

// Filters out parsable but faulty observations
validObservations, err := validateObservations(ctx, lggr, destTokens, r.F, parsableObservations, r.offchainConfig.PriceReportingDisabled)
validObservations, err := validateObservations(ctx, lggr, sortedChainTokens, r.F, parsableObservations, r.offchainConfig.PriceReportingDisabled)
if err != nil {
return false, nil, err
}
Expand Down
Loading

0 comments on commit 23a3cb8

Please sign in to comment.