diff --git a/.changeset/red-balloons-repeat.md b/.changeset/red-balloons-repeat.md new file mode 100644 index 0000000000..674cae9602 --- /dev/null +++ b/.changeset/red-balloons-repeat.md @@ -0,0 +1,5 @@ +--- +"ccip": patch +--- + +Commit NewReportingPlugin retries on error diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/factory.go b/core/services/ocr2/plugins/ccip/ccipcommit/factory.go index 03a301ce67..79fd9f1ec8 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/factory.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/factory.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/ethereum/go-ethereum/common" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcommon" "github.com/smartcontractkit/libocr/offchainreporting2plus/types" cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" @@ -64,35 +65,60 @@ func (rf *CommitReportingPluginFactory) UpdateDynamicReaders(ctx context.Context return nil } +type reportingPluginAndInfo struct { + plugin types.ReportingPlugin + pluginInfo types.ReportingPluginInfo +} + // NewReportingPlugin returns the ccip CommitReportingPlugin and satisfies the ReportingPluginFactory interface. func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.ReportingPluginConfig) (types.ReportingPlugin, types.ReportingPluginInfo, error) { - ctx := context.Background() // todo: consider adding some timeout - - destPriceReg, err := rf.config.commitStore.ChangeConfig(ctx, config.OnchainConfig, config.OffchainConfig) + initialRetryDelay := rf.config.newReportingPluginRetryConfig.InitialDelay + maxDelay := rf.config.newReportingPluginRetryConfig.MaxDelay + + pluginAndInfo, err := ccipcommon.RetryUntilSuccess( + rf.NewReportingPluginFn(config), + initialRetryDelay, + maxDelay, + ) if err != nil { return nil, types.ReportingPluginInfo{}, err } + return pluginAndInfo.plugin, pluginAndInfo.pluginInfo, nil +} - priceRegEvmAddr, err := ccipcalc.GenericAddrToEvm(destPriceReg) - if err != nil { - return nil, types.ReportingPluginInfo{}, err - } - if err = rf.UpdateDynamicReaders(ctx, priceRegEvmAddr); err != nil { - return nil, types.ReportingPluginInfo{}, err - } +// NewReportingPluginFn implements the NewReportingPlugin logic. It is defined as a function so that it can easily be +// retried via RetryUntilSuccess. NewReportingPlugin must return successfully in order for the Commit plugin to +// function, hence why we can only keep retrying it until it succeeds. +func (rf *CommitReportingPluginFactory) NewReportingPluginFn(config types.ReportingPluginConfig) func() (reportingPluginAndInfo, error) { + return func() (reportingPluginAndInfo, error) { + ctx := context.Background() // todo: consider adding some timeout - pluginOffChainConfig, err := rf.config.commitStore.OffchainConfig(ctx) - if err != nil { - return nil, types.ReportingPluginInfo{}, err - } + destPriceReg, err := rf.config.commitStore.ChangeConfig(ctx, config.OnchainConfig, config.OffchainConfig) + if err != nil { + return reportingPluginAndInfo{}, err + } - gasPriceEstimator, err := rf.config.commitStore.GasPriceEstimator(ctx) - if err != nil { - return nil, types.ReportingPluginInfo{}, err - } + priceRegEvmAddr, err := ccipcalc.GenericAddrToEvm(destPriceReg) + if err != nil { + return reportingPluginAndInfo{}, err + } + if err = rf.UpdateDynamicReaders(ctx, priceRegEvmAddr); err != nil { + return reportingPluginAndInfo{}, err + } + + pluginOffChainConfig, err := rf.config.commitStore.OffchainConfig(ctx) + if err != nil { + return reportingPluginAndInfo{}, err + } + + gasPriceEstimator, err := rf.config.commitStore.GasPriceEstimator(ctx) + if err != nil { + return reportingPluginAndInfo{}, err + } + + lggr := rf.config.lggr.Named("CommitReportingPlugin") - lggr := rf.config.lggr.Named("CommitReportingPlugin") - return &CommitReportingPlugin{ + plugin := &CommitReportingPlugin{ sourceChainSelector: rf.config.sourceChainSelector, sourceNative: rf.config.sourceNative, onRampReader: rf.config.onRampReader, @@ -106,8 +132,9 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin offchainConfig: pluginOffChainConfig, metricsCollector: rf.config.metricsCollector, chainHealthcheck: rf.config.chainHealthcheck, - }, - types.ReportingPluginInfo{ + } + + pluginInfo := types.ReportingPluginInfo{ Name: "CCIPCommit", UniqueReports: false, // See comment in CommitStore constructor. Limits: types.ReportingPluginLimits{ @@ -115,5 +142,8 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin MaxObservationLength: ccip.MaxObservationLength, MaxReportLength: MaxCommitReportLength, }, - }, nil + } + + return reportingPluginAndInfo{plugin, pluginInfo}, nil + } } diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/factory_test.go b/core/services/ocr2/plugins/ccip/ccipcommit/factory_test.go new file mode 100644 index 0000000000..0fe268035a --- /dev/null +++ b/core/services/ocr2/plugins/ccip/ccipcommit/factory_test.go @@ -0,0 +1,87 @@ +package ccipcommit + +import ( + "errors" + "testing" + "time" + + "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" + ccipdataprovidermocks "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/ccipdataprovider/mocks" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks" +) + +// Assert that NewReportingPlugin keeps retrying until it succeeds. +// +// NewReportingPlugin makes several calls (e.g. CommitStoreReader.ChangeConfig) that can fail. We use mocks to cause the +// first call to each of these functions to fail, then all subsequent calls succeed. We assert that NewReportingPlugin +// retries a sufficient number of times to get through the transient errors and eventually succeed. +func TestNewReportingPluginRetriesUntilSuccess(t *testing.T) { + commitConfig := CommitPluginStaticConfig{} + + // For this unit test, ensure that there is no delay between retries + commitConfig.newReportingPluginRetryConfig = ccipdata.RetryConfig{ + InitialDelay: 0 * time.Nanosecond, + MaxDelay: 0 * time.Nanosecond, + } + + // Set up the OffRampReader mock + mockCommitStore := new(mocks.CommitStoreReader) + + // The first call is set to return an error, the following calls return a nil error + mockCommitStore. + On("ChangeConfig", mock.Anything, mock.Anything, mock.Anything). + Return(ccip.Address(""), errors.New("")). + Once() + mockCommitStore. + On("ChangeConfig", mock.Anything, mock.Anything, mock.Anything). + Return(ccip.Address("0x7c6e4F0BDe29f83BC394B75a7f313B7E5DbD2d77"), nil). + Times(5) + + mockCommitStore. + On("OffchainConfig", mock.Anything). + Return(ccip.CommitOffchainConfig{}, errors.New("")). + Once() + mockCommitStore. + On("OffchainConfig", mock.Anything). + Return(ccip.CommitOffchainConfig{}, nil). + Times(3) + + mockCommitStore. + On("GasPriceEstimator", mock.Anything). + Return(nil, errors.New("")). + Once() + mockCommitStore. + On("GasPriceEstimator", mock.Anything). + Return(nil, nil). + Times(2) + + commitConfig.commitStore = mockCommitStore + + priceRegistryProvider := new(ccipdataprovidermocks.PriceRegistry) + priceRegistryProvider. + On("NewPriceRegistryReader", mock.Anything, mock.Anything). + Return(nil, errors.New("")). + Once() + priceRegistryProvider. + On("NewPriceRegistryReader", mock.Anything, mock.Anything). + Return(nil, nil). + Once() + commitConfig.priceRegistryProvider = priceRegistryProvider + + commitConfig.lggr, _ = logger.NewLogger() + + factory := NewCommitReportingPluginFactory(commitConfig) + reportingConfig := types.ReportingPluginConfig{} + reportingConfig.OnchainConfig = []byte{1, 2, 3} + reportingConfig.OffchainConfig = []byte{1, 2, 3} + + // Assert that NewReportingPlugin succeeds despite many transient internal failures (mocked out above) + _, _, err := factory.NewReportingPlugin(reportingConfig) + assert.Equal(t, nil, err) +} diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go b/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go index 84af2749dc..8e4c4cb5ce 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go @@ -6,6 +6,7 @@ import ( "fmt" "math/big" "strings" + "time" "github.com/Masterminds/semver/v3" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -43,6 +44,11 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" ) +var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{ + InitialDelay: time.Second, + MaxDelay: 5 * time.Minute, +} + func NewCommitServices(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, new bool, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string)) ([]job.ServiceCtx, error) { pluginConfig, backfillArgs, chainHealthcheck, err := jobSpecToCommitPluginConfig(ctx, lggr, jb, pr, chainSet) if err != nil { @@ -235,21 +241,22 @@ func jobSpecToCommitPluginConfig(ctx context.Context, lggr logger.Logger, jb job "pluginConfig", params.pluginConfig, "staticConfig", params.commitStoreStaticCfg, // TODO bring back - //"dynamicOnRampConfig", dynamicOnRampConfig, + // "dynamicOnRampConfig", dynamicOnRampConfig, "sourceNative", sourceNative, "sourceRouter", sourceRouter.Address()) return &CommitPluginStaticConfig{ - lggr: commitLggr, - onRampReader: onRampReader, - offRamp: offRampReader, - sourceNative: ccipcalc.EvmAddrToGeneric(sourceNative), - priceGetter: priceGetter, - sourceChainSelector: params.commitStoreStaticCfg.SourceChainSelector, - destChainSelector: params.commitStoreStaticCfg.ChainSelector, - commitStore: commitStoreReader, - priceRegistryProvider: ccipdataprovider.NewEvmPriceRegistry(params.destChain.LogPoller(), params.destChain.Client(), commitLggr, ccip.CommitPluginLabel), - metricsCollector: metricsCollector, - chainHealthcheck: chainHealthcheck, + lggr: commitLggr, + newReportingPluginRetryConfig: defaultNewReportingPluginRetryConfig, + onRampReader: onRampReader, + offRamp: offRampReader, + sourceNative: ccipcalc.EvmAddrToGeneric(sourceNative), + priceGetter: priceGetter, + sourceChainSelector: params.commitStoreStaticCfg.SourceChainSelector, + destChainSelector: params.commitStoreStaticCfg.ChainSelector, + commitStore: commitStoreReader, + priceRegistryProvider: ccipdataprovider.NewEvmPriceRegistry(params.destChain.LogPoller(), params.destChain.Client(), commitLggr, ccip.CommitPluginLabel), + metricsCollector: metricsCollector, + chainHealthcheck: chainHealthcheck, }, &ccipcommon.BackfillArgs{ SourceLP: params.sourceChain.LogPoller(), DestLP: params.destChain.LogPoller(), diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go b/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go index 67f713a544..65357adb8b 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go @@ -51,7 +51,8 @@ type update struct { } type CommitPluginStaticConfig struct { - lggr logger.Logger + lggr logger.Logger + newReportingPluginRetryConfig ccipdata.RetryConfig // Source onRampReader ccipdata.OnRampReader sourceChainSelector uint64