Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch token price updates #623

Merged
merged 25 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/services/ocr2/plugins/ccip/ccipcommit/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin
lggr: lggr,
inflightReports: newInflightCommitReportsContainer(rf.config.commitStore.OffchainConfig().InflightCacheExpiry),
destPriceRegistryReader: rf.destPriceRegReader,
offRampReader: rf.config.offRamp,
offRampReaders: rf.config.offRamps,
gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(),
offchainConfig: pluginOffChainConfig,
metricsCollector: rf.config.metricsCollector,
Expand Down
45 changes: 38 additions & 7 deletions core/services/ocr2/plugins/ccip/ccipcommit/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/Masterminds/semver/v3"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
chainselectors "github.com/smartcontractkit/chain-selectors"
Expand Down Expand Up @@ -43,7 +44,7 @@ import (
)

func NewCommitServices(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, new bool, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
pluginConfig, backfillArgs, chainHealthcheck, err := jobSpecToCommitPluginConfig(lggr, jb, pr, chainSet, qopts...)
pluginConfig, backfillArgs, chainHealthcheck, err := jobSpecToCommitPluginConfig(ctx, lggr, jb, pr, chainSet, qopts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -114,7 +115,7 @@ func UnregisterCommitPluginLpFilters(lggr logger.Logger, jb job.Job, chainSet le
return multiErr
}

func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Runner, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) (*CommitPluginStaticConfig, *ccipcommon.BackfillArgs, *cache.ObservedChainHealthcheck, error) {
func jobSpecToCommitPluginConfig(ctx context.Context, lggr logger.Logger, jb job.Job, pr pipeline.Runner, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) (*CommitPluginStaticConfig, *ccipcommon.BackfillArgs, *cache.ObservedChainHealthcheck, error) {
params, err := extractJobSpecParams(jb, chainSet)
if err != nil {
return nil, nil, nil, err
Expand Down Expand Up @@ -186,28 +187,58 @@ func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Run
if err != nil {
return nil, nil, nil, errors.Wrap(err, "failed offramp reader")
}
// Look up all destination offRamps connected to the same router
destRouterAddr, err := offRampReader.GetRouter(ctx)
if err != nil {
return nil, nil, nil, err
}
destRouterEvmAddr, err := ccipcalc.GenericAddrToEvm(destRouterAddr)
dimkouv marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, nil, nil, err
}
destRouter, err := router.NewRouter(destRouterEvmAddr, params.destChain.Client())
if err != nil {
return nil, nil, nil, err
}
destRouterOffRamps, err := destRouter.GetOffRamps(&bind.CallOpts{Context: ctx})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On initialization we call the router to get the configured offramp addresses.
If the configuration on the router changes how is the plugin notified about the change in order to initialize the new offramp or delete the reference to the removed off ramp?

Copy link
Collaborator Author

@matYang matYang Mar 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a limitation of the current iteration, right now if we add/remove lanes or tokens, the leader lane will need to accept new jobspecs, in additional to the impacted lane as is today; Connor has an idea that may avoid this, I'm looking into if it's feasible.

if err != nil {
return nil, nil, nil, err
}
var destOffRampReaders []ccipdata.OffRampReader
for _, o := range destRouterOffRamps {
destOffRampAddr := cciptypes.Address(o.OffRamp.String())
destOffRampReader, err2 := factory.NewOffRampReader(commitLggr, versionFinder, destOffRampAddr, params.destChain.Client(), params.destChain.LogPoller(), params.destChain.GasEstimator(), params.destChain.Config().EVM().GasEstimator().PriceMax().ToInt(), true, qopts...)
dimkouv marked this conversation as resolved.
Show resolved Hide resolved
if err2 != nil {
return nil, nil, nil, err2
}

destOffRampReaders = append(destOffRampReaders, destOffRampReader)
}

onRampRouterAddr, err := onRampReader.RouterAddress()
if err != nil {
return nil, nil, nil, err
}
routerAddr, err := ccipcalc.GenericAddrToEvm(onRampRouterAddr)
sourceRouterAddr, err := ccipcalc.GenericAddrToEvm(onRampRouterAddr)
if err != nil {
return nil, nil, nil, err
}
sourceRouter, err := router.NewRouter(routerAddr, params.sourceChain.Client())
sourceRouter, err := router.NewRouter(sourceRouterAddr, params.sourceChain.Client())
if err != nil {
return nil, nil, nil, err
}
sourceNative, err := sourceRouter.GetWrappedNative(nil)
sourceNative, err := sourceRouter.GetWrappedNative(&bind.CallOpts{Context: ctx})
dimkouv marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, nil, nil, err
}

// Prom wrappers
onRampReader = observability.NewObservedOnRampReader(onRampReader, params.sourceChain.ID().Int64(), ccip.CommitPluginLabel)
offRampReader = observability.NewObservedOffRampReader(offRampReader, params.destChain.ID().Int64(), ccip.CommitPluginLabel)
commitStoreReader = observability.NewObservedCommitStoreReader(commitStoreReader, params.destChain.ID().Int64(), ccip.CommitPluginLabel)
metricsCollector := ccip.NewPluginMetricsCollector(ccip.CommitPluginLabel, params.sourceChain.ID().Int64(), params.destChain.ID().Int64())
for i, o := range destOffRampReaders {
destOffRampReaders[i] = observability.NewObservedOffRampReader(o, params.destChain.ID().Int64(), ccip.CommitPluginLabel)
}

chainHealthcheck := cache.NewObservedChainHealthCheck(
cache.NewChainHealthcheck(
Expand Down Expand Up @@ -237,7 +268,7 @@ func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Run
return &CommitPluginStaticConfig{
lggr: commitLggr,
onRampReader: onRampReader,
offRamp: offRampReader,
offRamps: destOffRampReaders,
sourceNative: ccipcalc.EvmAddrToGeneric(sourceNative),
priceGetter: priceGetter,
sourceChainSelector: params.commitStoreStaticCfg.SourceChainSelector,
Expand Down
23 changes: 10 additions & 13 deletions core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type CommitPluginStaticConfig struct {
sourceChainSelector uint64
sourceNative cciptypes.Address
// Dest
offRamp ccipdata.OffRampReader
offRamps []ccipdata.OffRampReader
commitStore ccipdata.CommitStoreReader
destChainSelector uint64
priceRegistryProvider ccipdataprovider.PriceRegistry
Expand All @@ -81,7 +81,7 @@ type CommitReportingPlugin struct {
commitStoreReader ccipdata.CommitStoreReader
destPriceRegistryReader ccipdata.PriceRegistryReader
offchainConfig cciptypes.CommitOffchainConfig
offRampReader ccipdata.OffRampReader
offRampReaders []ccipdata.OffRampReader
F int
// Offchain
priceGetter pricegetter.PriceGetter
Expand Down Expand Up @@ -213,26 +213,24 @@ func (r *CommitReportingPlugin) observePriceUpdates(
return nil, nil, nil
}

feeTokens, bridgeableTokens, err := ccipcommon.GetDestinationTokens(ctx, r.offRampReader, r.destPriceRegistryReader)
sortedChainTokens, err := ccipcommon.GetSortedChainTokens(ctx, r.offRampReaders, r.destPriceRegistryReader)
if err != nil {
return nil, nil, fmt.Errorf("get destination tokens: %w", err)
}
destTokens := ccipcommon.FlattenUniqueSlice(feeTokens, bridgeableTokens)

return r.generatePriceUpdates(ctx, lggr, destTokens)
return r.generatePriceUpdates(ctx, lggr, sortedChainTokens)
}

// All prices are USD ($1=1e18) denominated. All prices must be not nil.
// Return token prices should contain the exact same tokens as in tokenDecimals.
func (r *CommitReportingPlugin) generatePriceUpdates(
dimkouv marked this conversation as resolved.
Show resolved Hide resolved
ctx context.Context,
lggr logger.Logger,
destTokens []cciptypes.Address,
sortedChainTokens []cciptypes.Address,
) (sourceGasPriceUSD *big.Int, tokenPricesUSD map[cciptypes.Address]*big.Int, err error) {
// Include wrapped native in our token query as way to identify the source native USD price.
// notice USD is in 1e18 scale, i.e. $1 = 1e18
queryTokens := ccipcommon.FlattenUniqueSlice([]cciptypes.Address{r.sourceNative}, destTokens)
sort.Slice(queryTokens, func(i, j int) bool { return queryTokens[i] < queryTokens[j] }) // make the query deterministic
queryTokens := ccipcommon.FlattenUniqueSlice([]cciptypes.Address{r.sourceNative}, sortedChainTokens)

rawTokenPricesUSD, err := r.priceGetter.TokenPricesUSD(ctx, queryTokens)
if err != nil {
Expand All @@ -252,13 +250,13 @@ func (r *CommitReportingPlugin) generatePriceUpdates(
return nil, nil, fmt.Errorf("missing source native (%s) price", r.sourceNative)
}

destTokensDecimals, err := r.destPriceRegistryReader.GetTokensDecimals(ctx, destTokens)
destTokensDecimals, err := r.destPriceRegistryReader.GetTokensDecimals(ctx, sortedChainTokens)
if err != nil {
return nil, nil, fmt.Errorf("get tokens decimals: %w", err)
}

tokenPricesUSD = make(map[cciptypes.Address]*big.Int, len(rawTokenPricesUSD))
for i, token := range destTokens {
for i, token := range sortedChainTokens {
tokenPricesUSD[token] = calculateUsdPer1e18TokenAmount(rawTokenPricesUSD[token], destTokensDecimals[i])
dimkouv marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down Expand Up @@ -380,14 +378,13 @@ func (r *CommitReportingPlugin) Report(ctx context.Context, epochAndRound types.

parsableObservations := ccip.GetParsableObservations[ccip.CommitObservation](lggr, observations)

feeTokens, bridgeableTokens, err := ccipcommon.GetDestinationTokens(ctx, r.offRampReader, r.destPriceRegistryReader)
sortedChainTokens, err := ccipcommon.GetSortedChainTokens(ctx, r.offRampReaders, r.destPriceRegistryReader)
if err != nil {
return false, nil, fmt.Errorf("get destination tokens: %w", err)
}
destTokens := ccipcommon.FlattenUniqueSlice(feeTokens, bridgeableTokens)

// Filters out parsable but faulty observations
validObservations, err := validateObservations(ctx, lggr, destTokens, r.F, parsableObservations, r.offchainConfig.PriceReportingDisabled)
validObservations, err := validateObservations(ctx, lggr, sortedChainTokens, r.F, parsableObservations, r.offchainConfig.PriceReportingDisabled)
if err != nil {
return false, nil, err
}
Expand Down
Loading
Loading