Skip to content

Commit

Permalink
Merging ccip/ccipcommit directory
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara committed Jan 10, 2025
1 parent ba343f3 commit a44d94d
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 96 deletions.
29 changes: 21 additions & 8 deletions core/services/ocr2/plugins/ccip/ccipcommit/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,11 @@ type reportingPluginAndInfo struct {
func (rf *CommitReportingPluginFactory) NewReportingPlugin(ctx context.Context, config types.ReportingPluginConfig) (types.ReportingPlugin, types.ReportingPluginInfo, error) {
initialRetryDelay := rf.config.newReportingPluginRetryConfig.InitialDelay
maxDelay := rf.config.newReportingPluginRetryConfig.MaxDelay
maxRetries := rf.config.newReportingPluginRetryConfig.MaxRetries

pluginAndInfo, err := ccipcommon.RetryUntilSuccess(rf.NewReportingPluginFn(ctx, config), initialRetryDelay, maxDelay)
pluginAndInfo, err := ccipcommon.RetryUntilSuccess(
rf.NewReportingPluginFn(ctx, config), initialRetryDelay, maxDelay, maxRetries,
)
if err != nil {
return nil, types.ReportingPluginInfo{}, err
}
Expand All @@ -86,33 +89,33 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(ctx context.Context,
// retried via RetryUntilSuccess. NewReportingPlugin must return successfully in order for the Commit plugin to
// function, hence why we can only keep retrying it until it succeeds.
func (rf *CommitReportingPluginFactory) NewReportingPluginFn(ctx context.Context, config types.ReportingPluginConfig) func() (reportingPluginAndInfo, error) {
return func() (reportingPluginAndInfo, error) {
newReportingPluginFn := func() (reportingPluginAndInfo, error) {
destPriceReg, err := rf.config.commitStore.ChangeConfig(ctx, config.OnchainConfig, config.OffchainConfig)
if err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("commitStore.ChangeConfig error: %w", err)
}

priceRegEvmAddr, err := ccipcalc.GenericAddrToEvm(destPriceReg)
if err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("GenericAddrToEvm error: %w", err)
}
if err = rf.UpdateDynamicReaders(ctx, priceRegEvmAddr); err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("UpdateDynamicReaders error: %w", err)
}

pluginOffChainConfig, err := rf.config.commitStore.OffchainConfig(ctx)
if err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("commitStore.OffchainConfig error: %w", err)
}

gasPriceEstimator, err := rf.config.commitStore.GasPriceEstimator(ctx)
if err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("commitStore.GasPriceEstimator error: %w", err)
}

err = rf.config.priceService.UpdateDynamicConfig(ctx, gasPriceEstimator, rf.destPriceRegReader)
if err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("priceService.UpdateDynamicConfig error: %w", err)
}

lggr := rf.config.lggr.Named("CommitReportingPlugin")
Expand Down Expand Up @@ -145,4 +148,14 @@ func (rf *CommitReportingPluginFactory) NewReportingPluginFn(ctx context.Context

return reportingPluginAndInfo{plugin, pluginInfo}, nil
}

return func() (reportingPluginAndInfo, error) {
result, err := newReportingPluginFn()
if err != nil {
rf.config.lggr.Errorw("NewReportingPlugin failed", "err", err)
rf.config.metricsCollector.NewReportingPluginError()
}

return result, err
}
}
6 changes: 4 additions & 2 deletions core/services/ocr2/plugins/ccip/ccipcommit/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import (
"testing"
"time"

"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/types/ccip"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/smartcontractkit/chainlink/v2/core/logger"
ccip2 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
ccipdataprovidermocks "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/ccipdataprovider/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks"
Expand All @@ -28,6 +28,8 @@ import (
func TestNewReportingPluginRetriesUntilSuccess(t *testing.T) {
ctx := tests.Context(t)
commitConfig := CommitPluginStaticConfig{}
commitConfig.lggr = logger.TestLogger(t)
commitConfig.metricsCollector = ccip2.NoopMetricsCollector

// For this unit test, ensure that there is no delay between retries
commitConfig.newReportingPluginRetryConfig = ccipdata.RetryConfig{
Expand Down
69 changes: 22 additions & 47 deletions core/services/ocr2/plugins/ccip/ccipcommit/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,9 @@ package ccipcommit
import (
"context"
"encoding/json"
"fmt"
"math/big"
"strings"
"time"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/pricegetter"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/rpclib"

"github.com/Masterminds/semver/v3"
"github.com/ethereum/go-ethereum/common"
libocr2 "github.com/smartcontractkit/libocr/offchainreporting2plus"
Expand All @@ -27,7 +22,6 @@ import (
db "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdb"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip"
Expand All @@ -40,9 +34,29 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
)

var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{InitialDelay: time.Second, MaxDelay: 5 * time.Minute}
var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{
InitialDelay: time.Second,
MaxDelay: 10 * time.Minute,
// Retry for approximately 4hrs (MaxDelay of 10m = 6 times per hour, times 4 hours, plus 10 because the first
// 10 retries only take 20 minutes due to an initial retry of 1s and exponential backoff)
MaxRetries: (6 * 4) + 10,
}

func NewCommitServices(ctx context.Context, ds sqlutil.DataSource, srcProvider commontypes.CCIPCommitProvider, dstProvider commontypes.CCIPCommitProvider, chainSet legacyevm.LegacyChainContainer, jb job.Job, lggr logger.Logger, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, new bool, sourceChainID int64, destChainID int64, logError func(string)) ([]job.ServiceCtx, error) {
func NewCommitServices(
ctx context.Context,
ds sqlutil.DataSource,
srcProvider commontypes.CCIPCommitProvider,
dstProvider commontypes.CCIPCommitProvider,
priceGetter ccip.AllTokensPriceGetter,
jb job.Job,
lggr logger.Logger,
pr pipeline.Runner,
argsNoPlugin libocr2.OCR2OracleArgs,
new bool,
sourceChainID int64,
destChainID int64,
logError func(string),
) ([]job.ServiceCtx, error) {
spec := jb.OCR2OracleSpec

var pluginConfig ccipconfig.CommitPluginJobSpecConfig
Expand All @@ -69,45 +83,6 @@ func NewCommitServices(ctx context.Context, ds sqlutil.DataSource, srcProvider c
commitStoreReader = ccip.NewProviderProxyCommitStoreReader(srcCommitStore, dstCommitStore)
commitLggr := lggr.Named("CCIPCommit").With("sourceChain", sourceChainID, "destChain", destChainID)

var priceGetter pricegetter.AllTokensPriceGetter
withPipeline := strings.Trim(pluginConfig.TokenPricesUSDPipeline, "\n\t ") != ""
if withPipeline {
priceGetter, err = pricegetter.NewPipelineGetter(pluginConfig.TokenPricesUSDPipeline, pr, jb.ID, jb.ExternalJobID, jb.Name.ValueOrZero(), lggr)
if err != nil {
return nil, fmt.Errorf("creating pipeline price getter: %w", err)
}
} else {
// Use dynamic price getter.
if pluginConfig.PriceGetterConfig == nil {
return nil, fmt.Errorf("priceGetterConfig is nil")
}

// Build price getter clients for all chains specified in the aggregator configurations.
// Some lanes (e.g. Wemix/Kroma) requires other clients than source and destination, since they use feeds from other chains.
priceGetterClients := map[uint64]pricegetter.DynamicPriceGetterClient{}
for _, aggCfg := range pluginConfig.PriceGetterConfig.AggregatorPrices {
chainID := aggCfg.ChainID
// Retrieve the chain.
chain, _, err2 := ccipconfig.GetChainByChainID(chainSet, chainID)
if err2 != nil {
return nil, fmt.Errorf("retrieving chain for chainID %d: %w", chainID, err2)
}
caller := rpclib.NewDynamicLimitedBatchCaller(
lggr,
chain.Client(),
rpclib.DefaultRpcBatchSizeLimit,
rpclib.DefaultRpcBatchBackOffMultiplier,
rpclib.DefaultMaxParallelRpcCalls,
)
priceGetterClients[chainID] = pricegetter.NewDynamicPriceGetterClient(caller)
}

priceGetter, err = pricegetter.NewDynamicPriceGetter(*pluginConfig.PriceGetterConfig, priceGetterClients)
if err != nil {
return nil, fmt.Errorf("creating dynamic price getter: %w", err)
}
}

offRampReader, err := dstProvider.NewOffRampReader(ctx, pluginConfig.OffRamp)
if err != nil {
return nil, err
Expand Down
76 changes: 56 additions & 20 deletions core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,24 +461,42 @@ func (r *CommitReportingPlugin) selectPriceUpdates(ctx context.Context, now time
// The returned latestGasPrice and latestTokenPrices should not contain nil values.
func (r *CommitReportingPlugin) calculatePriceUpdates(ctx context.Context, gasPriceObs map[uint64][]*big.Int, tokenPriceObs map[cciptypes.Address][]*big.Int, latestGasPrice map[uint64]update, latestTokenPrices map[cciptypes.Address]update) ([]cciptypes.GasPrice, []cciptypes.TokenPrice, error) {
var tokenPriceUpdates []cciptypes.TokenPrice
// Token prices are mostly heartbeat driven. To maximize heartbeat batching, the price inclusion rule is as follows:
// If any token requires heartbeat update, include all token prices in the report.
// Otherwise, only include token prices that exceed deviation threshold.
needTokenHeartbeat := false
for token := range tokenPriceObs {
latestTokenPrice, exists := latestTokenPrices[token]
if !exists || time.Since(latestTokenPrice.timestamp) >= r.offchainConfig.TokenPriceHeartBeat {
r.lggr.Infow("Token requires heartbeat update", "token", token)
needTokenHeartbeat = true
break
}
}

for token, tokenPriceObservations := range tokenPriceObs {
medianPrice := ccipcalc.BigIntSortedMiddle(tokenPriceObservations)

if needTokenHeartbeat {
r.lggr.Debugw("Token price update included due to heartbeat", "token", token, "newPrice", medianPrice)
tokenPriceUpdates = append(tokenPriceUpdates, cciptypes.TokenPrice{
Token: token,
Value: medianPrice,
})
continue
}

latestTokenPrice, exists := latestTokenPrices[token]
if exists {
tokenPriceUpdatedRecently := time.Since(latestTokenPrice.timestamp) < r.offchainConfig.TokenPriceHeartBeat
tokenPriceNotChanged := !ccipcalc.Deviates(medianPrice, latestTokenPrice.value, int64(r.offchainConfig.TokenPriceDeviationPPB))
if tokenPriceUpdatedRecently && tokenPriceNotChanged {
r.lggr.Debugw("token price was updated recently, skipping the update",
if ccipcalc.Deviates(medianPrice, latestTokenPrice.value, int64(r.offchainConfig.TokenPriceDeviationPPB)) {
r.lggr.Debugw("Token price update included due to deviation",
"token", token, "newPrice", medianPrice, "existingPrice", latestTokenPrice.value)
continue // skip the update if we recently had a price update close to the new value
tokenPriceUpdates = append(tokenPriceUpdates, cciptypes.TokenPrice{
Token: token,
Value: medianPrice,
})
}
}

tokenPriceUpdates = append(tokenPriceUpdates, cciptypes.TokenPrice{
Token: token,
Value: medianPrice,
})
}

// Determinism required.
Expand All @@ -487,31 +505,49 @@ func (r *CommitReportingPlugin) calculatePriceUpdates(ctx context.Context, gasPr
})

var gasPriceUpdate []cciptypes.GasPrice
// Gas prices are mostly heartbeat driven. To maximize heartbeat batching, the price inclusion rule is as follows:
// If any source chain gas price requires heartbeat update, include all gas prices in the report.
// Otherwise, only include gas prices that exceed deviation threshold.
needGasHeartbeat := false
for chainSelector := range gasPriceObs {
latestGasPrice, exists := latestGasPrice[chainSelector]
if !exists || latestGasPrice.value == nil || time.Since(latestGasPrice.timestamp) >= r.offchainConfig.GasPriceHeartBeat {
r.lggr.Infow("Chain gas price requires heartbeat update", "chainSelector", chainSelector)
needGasHeartbeat = true
break
}
}

for chainSelector, gasPriceObservations := range gasPriceObs {
newGasPrice, err := r.gasPriceEstimator.Median(ctx, gasPriceObservations) // Compute the median price
if err != nil {
return nil, nil, fmt.Errorf("failed to calculate median gas price for chain selector %d: %w", chainSelector, err)
}

// Default to updating so that we update if there are no prior updates.
if needGasHeartbeat {
r.lggr.Debugw("Gas price update included due to heartbeat", "chainSelector", chainSelector)
gasPriceUpdate = append(gasPriceUpdate, cciptypes.GasPrice{
DestChainSelector: chainSelector,
Value: newGasPrice,
})
continue
}

latestGasPrice, exists := latestGasPrice[chainSelector]
if exists && latestGasPrice.value != nil {
gasPriceUpdatedRecently := time.Since(latestGasPrice.timestamp) < r.offchainConfig.GasPriceHeartBeat
gasPriceDeviated, err := r.gasPriceEstimator.Deviates(ctx, newGasPrice, latestGasPrice.value)
if err != nil {
return nil, nil, err
}
if gasPriceUpdatedRecently && !gasPriceDeviated {
r.lggr.Debugw("gas price was updated recently and not deviated sufficiently, skipping the update",
if gasPriceDeviated {
r.lggr.Debugw("Gas price update included due to deviation",
"chainSelector", chainSelector, "newPrice", newGasPrice, "existingPrice", latestGasPrice.value)
continue
gasPriceUpdate = append(gasPriceUpdate, cciptypes.GasPrice{
DestChainSelector: chainSelector,
Value: newGasPrice,
})
}
}

gasPriceUpdate = append(gasPriceUpdate, cciptypes.GasPrice{
DestChainSelector: chainSelector,
Value: newGasPrice,
})
}

sort.Slice(gasPriceUpdate, func(i, j int) bool {
Expand Down
Loading

0 comments on commit a44d94d

Please sign in to comment.