Skip to content

Commit

Permalink
Passing qopts to readers instances to allow registering filters withi…
Browse files Browse the repository at this point in the history
…n the same TX (#318)
  • Loading branch information
mateusz-sekara authored Nov 24, 2023
1 parent f97a413 commit 41ac11f
Show file tree
Hide file tree
Showing 23 changed files with 154 additions and 79 deletions.
14 changes: 13 additions & 1 deletion core/services/ocr2/plugins/ccip/commit_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Run
}

// Load all the readers relevant for this plugin.
onRampReader, err := ccipdata.NewOnRampReader(commitLggr, staticConfig.SourceChainSelector, staticConfig.ChainSelector, staticConfig.OnRamp, sourceChain.LogPoller(), sourceChain.Client(), qopts...)
onRampReader, err := ccipdata.NewOnRampReader(commitLggr, staticConfig.SourceChainSelector, staticConfig.ChainSelector, staticConfig.OnRamp, sourceChain.LogPoller(), sourceChain.Client())
if err != nil {
return nil, nil, errors.Wrap(err, "failed onramp reader")
}
Expand Down Expand Up @@ -128,6 +128,18 @@ func NewCommitServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainC
}
wrappedPluginFactory := NewCommitReportingPluginFactory(*pluginConfig)

if err1 := pluginConfig.onRampReader.RegisterFilters(qopts...); err1 != nil {
return nil, err1
}

if err1 := pluginConfig.commitStore.RegisterFilters(qopts...); err1 != nil {
return nil, err1
}

if err1 := pluginConfig.offRamp.RegisterFilters(qopts...); err1 != nil {
return nil, err1
}

argsNoPlugin.ReportingPluginFactory = promwrapper.NewPromFactory(wrappedPluginFactory, "CCIPCommit", jb.OCR2OracleSpec.Relay, pluginConfig.destChainEVMID)
argsNoPlugin.Logger = relaylogger.NewOCRWrapper(pluginConfig.lggr, true, logError)
oracle, err := libocr2.NewOracle(argsNoPlugin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ func TestCommitReportingPlugin_Report(t *testing.T) {
tokenDecimalsCache.On("Get", ctx).Return(tc.tokenDecimals, nil)

lp := mocks2.NewLogPoller(t)
lp.On("RegisterFilter", mock.Anything).Return(nil)
commitStoreReader, err := ccipdata.NewCommitStoreV1_2_0(logger.TestLogger(t), utils.RandomAddress(), nil, lp, nil)
assert.NoError(t, err)

Expand Down
19 changes: 14 additions & 5 deletions core/services/ocr2/plugins/ccip/execution_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
)

// TODO pass context?
func jobSpecToExecPluginConfig(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer) (*ExecutionPluginStaticConfig, *BackfillArgs, error) {
func jobSpecToExecPluginConfig(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, qopts ...pg.QOpt) (*ExecutionPluginStaticConfig, *BackfillArgs, error) {
if jb.OCR2OracleSpec == nil {
return nil, nil, errors.New("spec is nil")
}
Expand Down Expand Up @@ -73,8 +73,7 @@ func jobSpecToExecPluginConfig(lggr logger.Logger, jb job.Job, chainSet evm.Lega
execLggr := lggr.Named("CCIPExecution").With(
"sourceChain", ChainName(int64(chainID)),
"destChain", ChainName(destChainID))
onRampReader, err := ccipdata.NewOnRampReader(execLggr, offRampConfig.SourceChainSelector,
offRampConfig.ChainSelector, offRampConfig.OnRamp, sourceChain.LogPoller(), sourceChain.Client())
onRampReader, err := ccipdata.NewOnRampReader(execLggr, offRampConfig.SourceChainSelector, offRampConfig.ChainSelector, offRampConfig.OnRamp, sourceChain.LogPoller(), sourceChain.Client())
if err != nil {
return nil, nil, errors.Wrap(err, "create onramp reader")
}
Expand Down Expand Up @@ -145,12 +144,22 @@ func jobSpecToExecPluginConfig(lggr logger.Logger, jb job.Job, chainSet evm.Lega
}

func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
execPluginConfig, backfillArgs, err := jobSpecToExecPluginConfig(lggr, jb, chainSet)
execPluginConfig, backfillArgs, err := jobSpecToExecPluginConfig(lggr, jb, chainSet, qopts...)
if err != nil {
return nil, err
}
wrappedPluginFactory := NewExecutionReportingPluginFactory(*execPluginConfig)

if err1 := execPluginConfig.offRampReader.RegisterFilters(qopts...); err1 != nil {
return nil, err1
}
if err1 := execPluginConfig.onRampReader.RegisterFilters(qopts...); err1 != nil {
return nil, err1
}
if err1 := execPluginConfig.commitStoreReader.RegisterFilters(qopts...); err1 != nil {
return nil, err1
}

argsNoPlugin.ReportingPluginFactory = promwrapper.NewPromFactory(wrappedPluginFactory, "CCIPExecution", jb.OCR2OracleSpec.Relay, execPluginConfig.destChainEVMID)
argsNoPlugin.Logger = relaylogger.NewOCRWrapper(execPluginConfig.lggr, true, logError)
oracle, err := libocr2.NewOracle(argsNoPlugin)
Expand Down Expand Up @@ -207,7 +216,7 @@ func getTokenDataProviders(lggr logger.Logger, pluginConfig ccipconfig.Execution
// UnregisterExecPluginLpFilters unregisters all the registered filters for both source and dest chains.
// See comment in UnregisterCommitPluginLpFilters
func UnregisterExecPluginLpFilters(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, qopts ...pg.QOpt) error {
execPluginConfig, _, err := jobSpecToExecPluginConfig(lggr, jb, chainSet)
execPluginConfig, _, err := jobSpecToExecPluginConfig(lggr, jb, chainSet, qopts...)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,6 @@ func TestExecutionReportingPlugin_buildReport(t *testing.T) {
p.config.commitStoreReader = commitStore

lp := lpMocks.NewLogPoller(t)
lp.On("RegisterFilter", mock.Anything).Return(nil)
offRampReader, err := ccipdata.NewOffRampV1_0_0(logger.TestLogger(t), utils.RandomAddress(), nil, lp, nil)
assert.NoError(t, err)
p.config.offRampReader = offRampReader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/abihelpers"
ccipconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/config"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/prices"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

type CommitStoreInterval struct {
Expand Down Expand Up @@ -111,6 +112,7 @@ type CommitStoreReader interface {
DecodeCommitReport(report []byte) (CommitStoreReport, error)
VerifyExecutionReport(ctx context.Context, report ExecReport) (bool, error)
GetCommitStoreStaticConfig(ctx context.Context) (CommitStoreStaticConfig, error)
RegisterFilters(qopts ...pg.QOpt) error
}

func NewCommitStoreReader(lggr logger.Logger, address common.Address, ec client.Client, lp logpoller.LogPoller, estimator gas.EvmFeeEstimator) (CommitStoreReader, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ func TestCommitFilters(t *testing.T) {
assertFilterRegistration(t, new(lpmocks.LogPoller), func(lp *lpmocks.LogPoller, addr common.Address) ccipdata.Closer {
c, err := ccipdata.NewCommitStoreV1_0_0(logger.TestLogger(t), addr, new(mocks.Client), lp, nil)
require.NoError(t, err)
require.NoError(t, c.RegisterFilters())
return c
}, 1)
assertFilterRegistration(t, new(lpmocks.LogPoller), func(lp *lpmocks.LogPoller, addr common.Address) ccipdata.Closer {
c, err := ccipdata.NewCommitStoreV1_2_0(logger.TestLogger(t), addr, new(mocks.Client), lp, nil)
require.NoError(t, err)
require.NoError(t, c.RegisterFilters())
return c
}, 1)
}
Expand Down Expand Up @@ -213,9 +215,11 @@ func TestCommitStoreReaders(t *testing.T) {
ge := new(gasmocks.EvmFeeEstimator)
c10r, err := ccipdata.NewCommitStoreReader(lggr, addr, ec, lp, ge)
require.NoError(t, err)
require.NoError(t, c10r.RegisterFilters())
assert.Equal(t, reflect.TypeOf(c10r).String(), reflect.TypeOf(&ccipdata.CommitStoreV1_0_0{}).String())
c12r, err := ccipdata.NewCommitStoreReader(lggr, addr2, ec, lp, ge)
require.NoError(t, err)
require.NoError(t, c12r.RegisterFilters())
assert.Equal(t, reflect.TypeOf(c12r).String(), reflect.TypeOf(&ccipdata.CommitStoreV1_2_0{}).String())

// Apply config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,10 @@ func (c *CommitStoreV1_0_0) VerifyExecutionReport(ctx context.Context, report Ex
return true, nil
}

func (c *CommitStoreV1_0_0) RegisterFilters(qopts ...pg.QOpt) error {
return logpollerutil.RegisterLpFilters(c.lp, c.filters, qopts...)
}

func NewCommitStoreV1_0_0(lggr logger.Logger, addr common.Address, ec client.Client, lp logpoller.LogPoller, estimator gas.EvmFeeEstimator) (*CommitStoreV1_0_0, error) {
commitStore, err := commit_store_1_0_0.NewCommitStore(addr, ec)
if err != nil {
Expand All @@ -354,16 +358,13 @@ func NewCommitStoreV1_0_0(lggr logger.Logger, addr common.Address, ec client.Cli
commitStoreABI := abihelpers.MustParseABI(commit_store_1_0_0.CommitStoreABI)
eventSig := abihelpers.MustGetEventID(ReportAccepted, commitStoreABI)
commitReportArgs := abihelpers.MustGetEventInputs(ReportAccepted, commitStoreABI)
var filters = []logpoller.Filter{
filters := []logpoller.Filter{
{
Name: logpoller.FilterName(EXEC_REPORT_ACCEPTS, addr.String()),
EventSigs: []common.Hash{eventSig},
Addresses: []common.Address{addr},
},
}
if err := logpollerutil.RegisterLpFilters(lp, filters); err != nil {
return nil, err
}
return &CommitStoreV1_0_0{
commitStore: commitStore,
address: addr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller/mocks"
Expand All @@ -32,10 +31,7 @@ func TestCommitReportEncodingV1_0_0(t *testing.T) {
Interval: CommitStoreInterval{Min: 1, Max: 10},
}

lp := mocks.NewLogPoller(t)
lp.On("RegisterFilter", mock.Anything).Return(nil)

c, err := NewCommitStoreV1_0_0(logger.TestLogger(t), utils.RandomAddress(), nil, lp, nil)
c, err := NewCommitStoreV1_0_0(logger.TestLogger(t), utils.RandomAddress(), nil, mocks.NewLogPoller(t), nil)
assert.NoError(t, err)

encodedReport, err := c.EncodeCommitReport(report)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,10 @@ func (c *CommitStoreV1_2_0) VerifyExecutionReport(ctx context.Context, report Ex
return true, nil
}

func (c *CommitStoreV1_2_0) RegisterFilters(qopts ...pg.QOpt) error {
return logpollerutil.RegisterLpFilters(c.lp, c.filters, qopts...)
}

func NewCommitStoreV1_2_0(lggr logger.Logger, addr common.Address, ec client.Client, lp logpoller.LogPoller, estimator gas.EvmFeeEstimator) (*CommitStoreV1_2_0, error) {
commitStore, err := commit_store.NewCommitStore(addr, ec)
if err != nil {
Expand All @@ -374,17 +378,13 @@ func NewCommitStoreV1_2_0(lggr logger.Logger, addr common.Address, ec client.Cli
commitStoreABI := abihelpers.MustParseABI(commit_store.CommitStoreABI)
eventSig := abihelpers.MustGetEventID(ReportAccepted, commitStoreABI)
commitReportArgs := abihelpers.MustGetEventInputs(ReportAccepted, commitStoreABI)
var filters = []logpoller.Filter{
filters := []logpoller.Filter{
{
Name: logpoller.FilterName(EXEC_REPORT_ACCEPTS, addr.String()),
EventSigs: []common.Hash{eventSig},
Addresses: []common.Address{addr},
},
}
if err := logpollerutil.RegisterLpFilters(lp, filters); err != nil {
return nil, err
}

lggr.Infow("Initializing CommitStoreV1_2_0 with estimator", "estimator", estimator)

return &CommitStoreV1_2_0{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller/mocks"
Expand Down Expand Up @@ -40,10 +39,7 @@ func TestCommitReportEncodingV_1_2_0(t *testing.T) {
Interval: CommitStoreInterval{Min: 1, Max: 10},
}

lp := mocks.NewLogPoller(t)
lp.On("RegisterFilter", mock.Anything).Return(nil)

c, err := NewCommitStoreV1_2_0(logger.TestLogger(t), utils.RandomAddress(), nil, lp, nil)
c, err := NewCommitStoreV1_2_0(logger.TestLogger(t), utils.RandomAddress(), nil, mocks.NewLogPoller(t), nil)
assert.NoError(t, err)

encodedReport, err := c.EncodeCommitReport(report)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
ccipconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/config"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/prices"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
)

Expand Down Expand Up @@ -113,6 +114,7 @@ type TokenBucketRateLimit struct {
//go:generate mockery --quiet --name OffRampReader --filename offramp_reader_mock.go --case=underscore
type OffRampReader interface {
Closer
RegisterFilters(qopts ...pg.QOpt) error
// Will error if messages are not a compatible version.
EncodeExecutionReport(report ExecReport) ([]byte, error)
DecodeExecutionReport(report []byte) (ExecReport, error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ func TestOffRampFilters(t *testing.T) {
assertFilterRegistration(t, new(lpmocks.LogPoller), func(lp *lpmocks.LogPoller, addr common.Address) ccipdata.Closer {
c, err := ccipdata.NewOffRampV1_0_0(logger.TestLogger(t), addr, new(mocks.Client), lp, nil)
require.NoError(t, err)
require.NoError(t, c.RegisterFilters())
return c
}, 3)
assertFilterRegistration(t, new(lpmocks.LogPoller), func(lp *lpmocks.LogPoller, addr common.Address) ccipdata.Closer {
c, err := ccipdata.NewOffRampV1_2_0(logger.TestLogger(t), addr, new(mocks.Client), lp, nil)
require.NoError(t, err)
require.NoError(t, c.RegisterFilters())
return c
}, 3)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks"
Expand All @@ -28,9 +27,7 @@ func TestExecutionReportEncodingV100(t *testing.T) {
ProofFlagBits: big.NewInt(133),
}

lp := lpmocks.NewLogPoller(t)
lp.On("RegisterFilter", mock.Anything).Return(nil)
offRamp, err := ccipdata.NewOffRampV1_0_0(logger.TestLogger(t), utils.RandomAddress(), nil, lp, nil)
offRamp, err := ccipdata.NewOffRampV1_0_0(logger.TestLogger(t), utils.RandomAddress(), nil, lpmocks.NewLogPoller(t), nil)
require.NoError(t, err)

encodeExecutionReport, err := offRamp.EncodeExecutionReport(report)
Expand All @@ -45,6 +42,7 @@ func TestOffRampFiltersV100(t *testing.T) {
assertFilterRegistration(t, new(lpmocks.LogPoller), func(lp *lpmocks.LogPoller, addr common.Address) ccipdata.Closer {
c, err := ccipdata.NewOffRampV1_0_0(logger.TestLogger(t), addr, new(mocks.Client), lp, nil)
require.NoError(t, err)
require.NoError(t, c.RegisterFilters())
return c
}, 3)
}
Loading

0 comments on commit 41ac11f

Please sign in to comment.