diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/factory.go b/core/services/ocr2/plugins/ccip/ccipcommit/factory.go index 648f62a23a..0cc4e3fa67 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/factory.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/factory.go @@ -74,8 +74,11 @@ type reportingPluginAndInfo struct { func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.ReportingPluginConfig) (types.ReportingPlugin, types.ReportingPluginInfo, error) { initialRetryDelay := rf.config.newReportingPluginRetryConfig.InitialDelay maxDelay := rf.config.newReportingPluginRetryConfig.MaxDelay + maxRetries := rf.config.newReportingPluginRetryConfig.MaxRetries - pluginAndInfo, err := ccipcommon.RetryUntilSuccess(rf.NewReportingPluginFn(config), initialRetryDelay, maxDelay) + pluginAndInfo, err := ccipcommon.RetryUntilSuccess( + rf.NewReportingPluginFn(config), initialRetryDelay, maxDelay, maxRetries, + ) if err != nil { return nil, types.ReportingPluginInfo{}, err } @@ -86,35 +89,35 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin // retried via RetryUntilSuccess. NewReportingPlugin must return successfully in order for the Commit plugin to // function, hence why we can only keep retrying it until it succeeds. func (rf *CommitReportingPluginFactory) NewReportingPluginFn(config types.ReportingPluginConfig) func() (reportingPluginAndInfo, error) { - return func() (reportingPluginAndInfo, error) { + newReportingPluginFn := func() (reportingPluginAndInfo, error) { ctx := context.Background() // todo: consider adding some timeout destPriceReg, err := rf.config.commitStore.ChangeConfig(ctx, config.OnchainConfig, config.OffchainConfig) if err != nil { - return reportingPluginAndInfo{}, err + return reportingPluginAndInfo{}, fmt.Errorf("commitStore.ChangeConfig error: %w", err) } priceRegEvmAddr, err := ccipcalc.GenericAddrToEvm(destPriceReg) if err != nil { - return reportingPluginAndInfo{}, err + return reportingPluginAndInfo{}, fmt.Errorf("GenericAddrToEvm error: %w", err) } if err = rf.UpdateDynamicReaders(ctx, priceRegEvmAddr); err != nil { - return reportingPluginAndInfo{}, err + return reportingPluginAndInfo{}, fmt.Errorf("UpdateDynamicReaders error: %w", err) } pluginOffChainConfig, err := rf.config.commitStore.OffchainConfig(ctx) if err != nil { - return reportingPluginAndInfo{}, err + return reportingPluginAndInfo{}, fmt.Errorf("commitStore.OffchainConfig error: %w", err) } gasPriceEstimator, err := rf.config.commitStore.GasPriceEstimator(ctx) if err != nil { - return reportingPluginAndInfo{}, err + return reportingPluginAndInfo{}, fmt.Errorf("commitStore.GasPriceEstimator error: %w", err) } err = rf.config.priceService.UpdateDynamicConfig(ctx, gasPriceEstimator, rf.destPriceRegReader) if err != nil { - return reportingPluginAndInfo{}, err + return reportingPluginAndInfo{}, fmt.Errorf("priceService.UpdateDynamicConfig error: %w", err) } lggr := rf.config.lggr.Named("CommitReportingPlugin") @@ -147,4 +150,14 @@ func (rf *CommitReportingPluginFactory) NewReportingPluginFn(config types.Report return reportingPluginAndInfo{plugin, pluginInfo}, nil } + + return func() (reportingPluginAndInfo, error) { + result, err := newReportingPluginFn() + if err != nil { + rf.config.lggr.Errorw("NewReportingPlugin failed", "err", err) + rf.config.metricsCollector.NewReportingPluginError() + } + + return result, err + } } diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/factory_test.go b/core/services/ocr2/plugins/ccip/ccipcommit/factory_test.go index 825026bd17..7b3a3cc105 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/factory_test.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/factory_test.go @@ -11,6 +11,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" "github.com/smartcontractkit/chainlink/v2/core/logger" + ccip2 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" ccipdataprovidermocks "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/ccipdataprovider/mocks" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks" @@ -24,6 +25,8 @@ import ( // retries a sufficient number of times to get through the transient errors and eventually succeed. func TestNewReportingPluginRetriesUntilSuccess(t *testing.T) { commitConfig := CommitPluginStaticConfig{} + commitConfig.lggr = logger.TestLogger(t) + commitConfig.metricsCollector = ccip2.NoopMetricsCollector // For this unit test, ensure that there is no delay between retries commitConfig.newReportingPluginRetryConfig = ccipdata.RetryConfig{ diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go b/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go index e964896ab9..30926cfd4e 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go @@ -40,7 +40,13 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" ) -var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{InitialDelay: time.Second, MaxDelay: 5 * time.Minute} +var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{ + InitialDelay: time.Second, + MaxDelay: 10 * time.Minute, + // Retry for approximately 4hrs (MaxDelay of 10m = 6 times per hour, times 4 hours, plus 10 because the first + // 10 retries only take 20 minutes due to an initial retry of 1s and exponential backoff) + MaxRetries: (6 * 4) + 10, +} func NewCommitServices(ctx context.Context, ds sqlutil.DataSource, srcProvider commontypes.CCIPCommitProvider, dstProvider commontypes.CCIPCommitProvider, chainSet legacyevm.LegacyChainContainer, jb job.Job, lggr logger.Logger, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, new bool, sourceChainID int64, destChainID int64, logError func(string)) ([]job.ServiceCtx, error) { spec := jb.OCR2OracleSpec diff --git a/core/services/ocr2/plugins/ccip/ccipexec/factory.go b/core/services/ocr2/plugins/ccip/ccipexec/factory.go index 97caf2e719..7e5cadc7ea 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/factory.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/factory.go @@ -71,8 +71,11 @@ type reportingPluginAndInfo struct { func (rf *ExecutionReportingPluginFactory) NewReportingPlugin(config types.ReportingPluginConfig) (types.ReportingPlugin, types.ReportingPluginInfo, error) { initialRetryDelay := rf.config.newReportingPluginRetryConfig.InitialDelay maxDelay := rf.config.newReportingPluginRetryConfig.MaxDelay + maxRetries := rf.config.newReportingPluginRetryConfig.MaxRetries - pluginAndInfo, err := ccipcommon.RetryUntilSuccess(rf.NewReportingPluginFn(config), initialRetryDelay, maxDelay) + pluginAndInfo, err := ccipcommon.RetryUntilSuccess( + rf.NewReportingPluginFn(config), initialRetryDelay, maxDelay, maxRetries, + ) if err != nil { return nil, types.ReportingPluginInfo{}, err } @@ -83,7 +86,7 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPlugin(config types.Repor // retried via RetryUntilSuccess. NewReportingPlugin must return successfully in order for the Exec plugin to function, // hence why we can only keep retrying it until it succeeds. func (rf *ExecutionReportingPluginFactory) NewReportingPluginFn(config types.ReportingPluginConfig) func() (reportingPluginAndInfo, error) { - return func() (reportingPluginAndInfo, error) { + newReportingPluginFn := func() (reportingPluginAndInfo, error) { ctx := context.Background() // todo: consider setting a timeout destPriceRegistry, destWrappedNative, err := rf.config.offRampReader.ChangeConfig(ctx, config.OnchainConfig, config.OffchainConfig) @@ -161,4 +164,14 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPluginFn(config types.Rep return reportingPluginAndInfo{plugin, pluginInfo}, nil } + + return func() (reportingPluginAndInfo, error) { + result, err := newReportingPluginFn() + if err != nil { + rf.config.lggr.Errorw("NewReportingPlugin failed", "err", err) + rf.config.metricsCollector.NewReportingPluginError() + } + + return result, err + } } diff --git a/core/services/ocr2/plugins/ccip/ccipexec/factory_test.go b/core/services/ocr2/plugins/ccip/ccipexec/factory_test.go index 7bbb9be0c6..869aa7e332 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/factory_test.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/factory_test.go @@ -11,6 +11,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" "github.com/smartcontractkit/chainlink/v2/core/logger" + ccip2 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" ccipdataprovidermocks "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/ccipdataprovider/mocks" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks" @@ -23,6 +24,8 @@ import ( // retries a sufficient number of times to get through the transient errors and eventually succeed. func TestNewReportingPluginRetriesUntilSuccess(t *testing.T) { execConfig := ExecutionPluginStaticConfig{} + execConfig.lggr = logger.TestLogger(t) + execConfig.metricsCollector = ccip2.NoopMetricsCollector // For this unit test, ensure that there is no delay between retries execConfig.newReportingPluginRetryConfig = ccipdata.RetryConfig{ diff --git a/core/services/ocr2/plugins/ccip/ccipexec/initializers.go b/core/services/ocr2/plugins/ccip/ccipexec/initializers.go index 7826f6058f..c8d079b263 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/initializers.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/initializers.go @@ -46,7 +46,13 @@ var ( tokenDataWorkerNumWorkers = 5 ) -var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{InitialDelay: time.Second, MaxDelay: 5 * time.Minute} +var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{ + InitialDelay: time.Second, + MaxDelay: 10 * time.Minute, + // Retry for approximately 4hrs (MaxDelay of 10m = 6 times per hour, times 4 hours, plus 10 because the first + // 10 retries only take 20 minutes due to an initial retry of 1s and exponential backoff) + MaxRetries: (6 * 4) + 10, +} func NewExecServices(ctx context.Context, lggr logger.Logger, jb job.Job, srcProvider types.CCIPExecProvider, dstProvider types.CCIPExecProvider, srcChainID int64, dstChainID int64, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string)) ([]job.ServiceCtx, error) { if jb.OCR2OracleSpec == nil { diff --git a/core/services/ocr2/plugins/ccip/internal/ccipcommon/shortcuts.go b/core/services/ocr2/plugins/ccip/internal/ccipcommon/shortcuts.go index 4f5ba6cfae..7d80ee5e6c 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipcommon/shortcuts.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipcommon/shortcuts.go @@ -127,14 +127,19 @@ func SelectorToBytes(chainSelector uint64) [16]byte { return b } -// RetryUntilSuccess repeatedly calls fn until it returns a nil error. After each failed call there is an exponential -// backoff applied, between initialDelay and maxDelay. -func RetryUntilSuccess[T any](fn func() (T, error), initialDelay time.Duration, maxDelay time.Duration) (T, error) { +// RetryUntilSuccess repeatedly calls fn until it returns a nil error or retries have been exhausted. After each failed +// call there is an exponential backoff applied, between initialDelay and maxDelay. +func RetryUntilSuccess[T any]( + fn func() (T, error), + initialDelay time.Duration, + maxDelay time.Duration, + maxRetries uint, +) (T, error) { return retry.DoWithData( fn, retry.Delay(initialDelay), retry.MaxDelay(maxDelay), retry.DelayType(retry.BackOffDelay), - retry.UntilSucceeded(), + retry.Attempts(maxRetries), ) } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipcommon/shortcuts_test.go b/core/services/ocr2/plugins/ccip/internal/ccipcommon/shortcuts_test.go index 73a3b83495..ecf2c4dc7a 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipcommon/shortcuts_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipcommon/shortcuts_test.go @@ -183,14 +183,20 @@ func TestRetryUntilSuccess(t *testing.T) { } // Assert that RetryUntilSuccess returns the expected value when fn returns success on the 5th attempt - numCalls, err := RetryUntilSuccess(fn, initialDelay, maxDelay) + numCalls, err := RetryUntilSuccess(fn, initialDelay, maxDelay, 10) assert.Nil(t, err) assert.Equal(t, 5, numCalls) // Assert that RetryUntilSuccess returns the expected value when fn returns success on the 8th attempt numAttempts = 8 numCalls = 0 - numCalls, err = RetryUntilSuccess(fn, initialDelay, maxDelay) + numCalls, err = RetryUntilSuccess(fn, initialDelay, maxDelay, 10) assert.Nil(t, err) assert.Equal(t, 8, numCalls) + + // Assert that RetryUntilSuccess exhausts retries + numAttempts = 8 + numCalls = 0 + numCalls, err = RetryUntilSuccess(fn, initialDelay, maxDelay, 2) + assert.NotNil(t, err) } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/retry_config.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/retry_config.go index 41161ee938..80c5364e18 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/retry_config.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/retry_config.go @@ -2,8 +2,10 @@ package ccipdata import "time" -// RetryConfig configures an initial delay between retries and a max delay between retries +// RetryConfig configures an initial delay between retries, a max delay between retries, and a maximum number of +// times to retry type RetryConfig struct { InitialDelay time.Duration MaxDelay time.Duration + MaxRetries uint } diff --git a/core/services/ocr2/plugins/ccip/metrics.go b/core/services/ocr2/plugins/ccip/metrics.go index f481b5d447..9ec9fde316 100644 --- a/core/services/ocr2/plugins/ccip/metrics.go +++ b/core/services/ocr2/plugins/ccip/metrics.go @@ -20,6 +20,10 @@ var ( Name: "ccip_sequence_number_counter", Help: "Sequence number of the last message processed by the plugin", }, []string{"plugin", "source", "dest", "ocrPhase"}) + newReportingPluginErrorCounter = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "ccip_new_reporting_plugin_error_counter", + Help: "The count of the number of errors when calling NewReportingPlugin", + }, []string{"plugin"}) ) type ocrPhase string @@ -35,6 +39,7 @@ type PluginMetricsCollector interface { NumberOfMessagesBasedOnInterval(phase ocrPhase, seqNrMin, seqNrMax uint64) UnexpiredCommitRoots(count int) SequenceNumber(phase ocrPhase, seqNr uint64) + NewReportingPluginError() } type pluginMetricsCollector struct { @@ -79,6 +84,12 @@ func (p *pluginMetricsCollector) SequenceNumber(phase ocrPhase, seqNr uint64) { Set(float64(seqNr)) } +func (p *pluginMetricsCollector) NewReportingPluginError() { + newReportingPluginErrorCounter. + WithLabelValues(p.pluginName). + Inc() +} + var ( // NoopMetricsCollector is a no-op implementation of PluginMetricsCollector NoopMetricsCollector PluginMetricsCollector = noop{} @@ -97,3 +108,6 @@ func (d noop) UnexpiredCommitRoots(int) { func (d noop) SequenceNumber(ocrPhase, uint64) { } + +func (d noop) NewReportingPluginError() { +}