From a8b921c553a365e5ba1102e15df1c6d94256d62b Mon Sep 17 00:00:00 2001 From: Patrick Date: Tue, 9 Jul 2024 17:22:47 -0400 Subject: [PATCH] using provider based commitStoreReader for exec (#1150) ## Motivation Follow on to #1080, using a provider based commitStoreReader in the exec plugin --------- Co-authored-by: ilija42 <57732589+ilija42@users.noreply.github.com> Co-authored-by: Aaron Lu <50029043+aalu1418@users.noreply.github.com> --- core/services/ocr2/delegate.go | 12 +----------- .../plugins/ccip/ccipcommit/initializers.go | 5 ++++- .../plugins/ccip/ccipexec/initializers.go | 17 +++++++---------- core/services/relay/evm/evm.go | 2 ++ core/services/relay/evm/exec_provider.go | 19 +++++++++++++------ 5 files changed, 27 insertions(+), 28 deletions(-) diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index eca2cdc31c..f080356730 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -1976,10 +1976,6 @@ func (d *Delegate) newServicesCCIPExecution(ctx context.Context, lggr logger.Sug if err != nil { return nil, ErrJobSpecNoRelayer{Err: err, PluginName: string(spec.PluginType)} } - dstChain, err := d.legacyChains.Get(dstRid.ChainID) - if err != nil { - return nil, fmt.Errorf("ccip services; failed to get chain %s: %w", dstRid.ChainID, err) - } logError := func(msg string) { lggr.ErrorIf(d.jobORM.RecordError(context.Background(), jb.ID, msg), "unable to record error") @@ -2008,12 +2004,6 @@ func (d *Delegate) newServicesCCIPExecution(ctx context.Context, lggr logger.Sug return nil, err } - srcChainIDstr := strconv.FormatUint(srcChainID, 10) - srcChain, err := d.legacyChains.Get(srcChainIDstr) - if err != nil { - return nil, fmt.Errorf("open source chain: %w", err) - } - oracleArgsNoPlugin2 := libocr2.OCR2OracleArgs{ BinaryNetworkEndpointFactory: d.peerWrapper.Peer2, V2Bootstrappers: bootstrapPeers, @@ -2033,7 +2023,7 @@ func (d *Delegate) newServicesCCIPExecution(ctx context.Context, lggr logger.Sug MetricsRegisterer: prometheus.WrapRegistererWith(map[string]string{"job_name": jb.Name.ValueOrZero()}, prometheus.DefaultRegisterer), } - return ccipexec.NewExecServices(ctx, lggr, jb, srcProvider, dstProvider, srcChain, dstChain, int64(srcChainID), dstChainID, d.legacyChains, d.isNewlyCreatedJob, oracleArgsNoPlugin2, logError) + return ccipexec.NewExecServices(ctx, lggr, jb, srcProvider, dstProvider, int64(srcChainID), dstChainID, d.isNewlyCreatedJob, oracleArgsNoPlugin2, logError) } func (d *Delegate) ccipExecGetDstProvider(ctx context.Context, jb job.Job, pluginJobSpecConfig ccipconfig.ExecPluginJobSpecConfig, transmitterID string) (types.CCIPExecProvider, error) { diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go b/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go index 729d8ca5c1..fde5d61d73 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go @@ -56,6 +56,9 @@ func NewCommitServices(ctx context.Context, ds sqlutil.DataSource, srcProvider c } commitStoreAddress := common.HexToAddress(spec.ContractID) + + // commit store contract doesn't exist on the source chain, but we have an implementation of it + // to get access to a gas estimator on the source chain srcCommitStore, err := srcProvider.NewCommitStoreReader(ctx, ccipcalc.EvmAddrToGeneric(commitStoreAddress)) if err != nil { return nil, err @@ -222,7 +225,7 @@ func CommitReportToEthTxMeta(typ ccipconfig.ContractType, ver semver.Version) (f // https://github.com/smartcontractkit/ccip/blob/68e2197472fb017dd4e5630d21e7878d58bc2a44/core/services/feeds/service.go#L716 // TODO once that transaction is broken up, we should be able to simply rely on oracle.Close() to cleanup the filters. // Until then we have to deterministically reload the readers from the spec (and thus their filters) and close them. -func UnregisterCommitPluginLpFilters(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer) error { +func UnregisterCommitPluginLpFilters(_ context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer) error { params, err := extractJobSpecParams(jb, chainSet) if err != nil { return err diff --git a/core/services/ocr2/plugins/ccip/ccipexec/initializers.go b/core/services/ocr2/plugins/ccip/ccipexec/initializers.go index a9f474c17d..e30ebd32c7 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/initializers.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/initializers.go @@ -50,7 +50,7 @@ var ( var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{InitialDelay: time.Second, MaxDelay: 5 * time.Minute} -func NewExecServices(ctx context.Context, lggr logger.Logger, jb job.Job, srcProvider types.CCIPExecProvider, dstProvider types.CCIPExecProvider, srcChain legacyevm.Chain, dstChain legacyevm.Chain, srcChainID int64, dstChainID int64, chainSet legacyevm.LegacyChainContainer, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string)) ([]job.ServiceCtx, error) { +func NewExecServices(ctx context.Context, lggr logger.Logger, jb job.Job, srcProvider types.CCIPExecProvider, dstProvider types.CCIPExecProvider, srcChainID int64, dstChainID int64, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string)) ([]job.ServiceCtx, error) { if jb.OCR2OracleSpec == nil { return nil, fmt.Errorf("spec is nil") } @@ -89,21 +89,18 @@ func NewExecServices(ctx context.Context, lggr logger.Logger, jb job.Job, srcPro return nil, fmt.Errorf("get source wrapped native token: %w", err) } - versionFinder := ccip.NewEvmVersionFinder() - commitStoreReader, err := factory.NewCommitStoreReader(lggr, versionFinder, offRampConfig.CommitStore, dstChain.Client(), dstChain.LogPoller()) + srcCommitStore, err := srcProvider.NewCommitStoreReader(ctx, offRampConfig.CommitStore) if err != nil { - return nil, fmt.Errorf("could not load commitStoreReader reader: %w", err) + return nil, fmt.Errorf("could not create src commitStoreReader reader: %w", err) } - err = commitStoreReader.SetGasEstimator(ctx, srcChain.GasEstimator()) + dstCommitStore, err := dstProvider.NewCommitStoreReader(ctx, offRampConfig.CommitStore) if err != nil { - return nil, fmt.Errorf("could not set gas estimator: %w", err) + return nil, fmt.Errorf("could not create dst commitStoreReader reader: %w", err) } - err = commitStoreReader.SetSourceMaxGasPrice(ctx, srcChain.Config().EVM().GasEstimator().PriceMax().ToInt()) - if err != nil { - return nil, fmt.Errorf("could not set source max gas price: %w", err) - } + var commitStoreReader ccipdata.CommitStoreReader + commitStoreReader = ccip.NewProviderProxyCommitStoreReader(srcCommitStore, dstCommitStore) tokenDataProviders := make(map[cciptypes.Address]tokendata.Reader) // init usdc token data provider diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 780bf4749b..3cc7633aa3 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -478,6 +478,8 @@ func (r *Relayer) NewCCIPExecProvider(rargs commontypes.RelayArgs, pargs commont r.lggr, versionFinder, r.chain.Client(), + r.chain.GasEstimator(), + r.chain.Config().EVM().GasEstimator().PriceMax().ToInt(), r.chain.LogPoller(), execPluginConfig.SourceStartBlock, execPluginConfig.JobID, diff --git a/core/services/relay/evm/exec_provider.go b/core/services/relay/evm/exec_provider.go index 757ca3a498..740a61f23d 100644 --- a/core/services/relay/evm/exec_provider.go +++ b/core/services/relay/evm/exec_provider.go @@ -28,6 +28,8 @@ type SrcExecProvider struct { client client.Client lp logpoller.LogPoller startBlock uint64 + estimator gas.EvmFeeEstimator + maxGasPrice *big.Int usdcReader *ccip.USDCReaderImpl usdcAttestationAPI string usdcAttestationAPITimeoutSeconds int @@ -39,6 +41,8 @@ func NewSrcExecProvider( lggr logger.Logger, versionFinder ccip.VersionFinder, client client.Client, + estimator gas.EvmFeeEstimator, + maxGasPrice *big.Int, lp logpoller.LogPoller, startBlock uint64, jobID string, @@ -60,6 +64,8 @@ func NewSrcExecProvider( lggr: lggr, versionFinder: versionFinder, client: client, + estimator: estimator, + maxGasPrice: maxGasPrice, lp: lp, startBlock: startBlock, usdcReader: usdcReader, @@ -120,9 +126,9 @@ func (s SrcExecProvider) Codec() commontypes.Codec { return nil } -func (s SrcExecProvider) NewCommitStoreReader(ctx context.Context, addr cciptypes.Address) (cciptypes.CommitStoreReader, error) { - // TODO CCIP-2493 - return nil, fmt.Errorf("invalid: NewCommitStoreReader not implemented") +func (s SrcExecProvider) NewCommitStoreReader(ctx context.Context, addr cciptypes.Address) (commitStoreReader cciptypes.CommitStoreReader, err error) { + commitStoreReader = NewIncompleteSourceCommitStoreReader(s.estimator, s.maxGasPrice) + return } func (s SrcExecProvider) NewOffRampReader(ctx context.Context, addr cciptypes.Address) (cciptypes.OffRampReader, error) { @@ -262,9 +268,10 @@ func (d DstExecProvider) Codec() commontypes.Codec { return nil } -func (d DstExecProvider) NewCommitStoreReader(ctx context.Context, addr cciptypes.Address) (cciptypes.CommitStoreReader, error) { - // TODO CCIP-2493 - return nil, fmt.Errorf("invalid: NewCommitStoreReader not yet implemented") +func (d DstExecProvider) NewCommitStoreReader(ctx context.Context, addr cciptypes.Address) (commitStoreReader cciptypes.CommitStoreReader, err error) { + versionFinder := ccip.NewEvmVersionFinder() + commitStoreReader, err = NewIncompleteDestCommitStoreReader(d.lggr, versionFinder, addr, d.client, d.lp) + return } func (d DstExecProvider) NewOffRampReader(ctx context.Context, offRampAddress cciptypes.Address) (offRampReader cciptypes.OffRampReader, err error) {