Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dual contract transmitter #15202

Merged
merged 13 commits into from
Nov 15, 2024
4 changes: 4 additions & 0 deletions common/txmgr/types/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ type TxMeta[ADDR types.Hashable, TX_HASH types.Hashable] struct {
MessageIDs []string `json:"MessageIDs,omitempty"`
// SeqNumbers is used by CCIP for tx to committed sequence numbers correlation in logs
SeqNumbers []uint64 `json:"SeqNumbers,omitempty"`

// Dual Broadcast
DualBroadcast *bool `json:"DualBroadcast,omitempty"`
DualBroadcastParams *string `json:"DualBroadcastParams,omitempty"`
}

type TxAttempt[
Expand Down
63 changes: 51 additions & 12 deletions core/services/job/job_orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2014,7 +2014,7 @@ func mustInsertPipelineRun(t *testing.T, orm pipeline.ORM, j job.Job) pipeline.R
return run
}

func TestORM_CreateJob_OCR2_With_AdaptiveSend(t *testing.T) {
func TestORM_CreateJob_OCR2_With_DualTransmission(t *testing.T) {
ctx := testutils.Context(t)
customChainID := big.New(testutils.NewRandomEVMChainID())

Expand All @@ -2030,27 +2030,66 @@ func TestORM_CreateJob_OCR2_With_AdaptiveSend(t *testing.T) {
db := pgtest.NewSqlxDB(t)
keyStore := cltest.NewKeyStore(t, db)
require.NoError(t, keyStore.OCR2().Add(ctx, cltest.DefaultOCR2Key))

_, transmitterID := cltest.MustInsertRandomKey(t, keyStore.Eth())

baseJobSpec := fmt.Sprintf(testspecs.OCR2EVMDualTransmissionSpecMinimalTemplate, transmitterID.String())

lggr := logger.TestLogger(t)
pipelineORM := pipeline.NewORM(db, lggr, config.JobPipeline().MaxSuccessfulRuns())
bridgesORM := bridges.NewORM(db)

jobORM := NewTestORM(t, db, pipelineORM, bridgesORM, keyStore)

adaptiveSendKey := cltest.MustGenerateRandomKey(t)
// Enabled but no config set
enabledDualTransmissionSpec := `
enableDualTransmission=true`

jb, err := ocr2validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), testspecs.GetOCR2EVMWithAdaptiveSendSpecMinimal(cltest.DefaultOCR2Key.ID(), transmitterID.String(), adaptiveSendKey.EIP55Address.String()), nil)
jb, err := ocr2validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), baseJobSpec+enabledDualTransmissionSpec, nil)
require.NoError(t, err)
require.ErrorContains(t, jobORM.CreateJob(ctx, &jb), "dual transmission is enabled but no dual transmission config present")

// ContractAddress not set
emptyContractAddress := `
enableDualTransmission=true
[relayConfig.dualTransmission]
contractAddress=""
`
jb, err = ocr2validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), baseJobSpec+emptyContractAddress, nil)
require.NoError(t, err)
require.ErrorContains(t, jobORM.CreateJob(ctx, &jb), "invalid contract address in dual transmission config")

// Transmitter address not set
emptyTransmitterAddress := `
enableDualTransmission=true
[relayConfig.dualTransmission]
contractAddress = '0x613a38AC1659769640aaE063C651F48E0250454C'
transmitterAddress = ''
`
jb, err = ocr2validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), baseJobSpec+emptyTransmitterAddress, nil)
require.NoError(t, err)
require.ErrorContains(t, jobORM.CreateJob(ctx, &jb), "invalid transmitter address in dual transmission config")

dtTransmitterAddress := cltest.MustGenerateRandomKey(t)
completeDualTransmissionSpec := fmt.Sprintf(`
enableDualTransmission=true
[relayConfig.dualTransmission]
contractAddress = '0x613a38AC1659769640aaE063C651F48E0250454C'
transmitterAddress = '%s'
[relayConfig.dualTransmission.meta]
key1 = 'val1'
key2 = ['val2','val3']
`,
dtTransmitterAddress.Address.String())

jb, err = ocr2validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), baseJobSpec+completeDualTransmissionSpec, nil)
require.NoError(t, err)
require.Equal(t, "arbitrary-value", jb.AdaptiveSendSpec.Metadata["arbitraryParam"])

t.Run("unknown transmitter address", func(t *testing.T) {
require.ErrorContains(t, jobORM.CreateJob(ctx, &jb), "failed to validate AdaptiveSendSpec.TransmitterAddress: no EVM key matching")
})
jb.OCR2OracleSpec.TransmitterID = null.StringFrom(transmitterID.String())

t.Run("multiple jobs", func(t *testing.T) {
keyStore.Eth().XXXTestingOnlyAdd(ctx, adaptiveSendKey)
require.NoError(t, jobORM.CreateJob(ctx, &jb), "failed to validate AdaptiveSendSpec.TransmitterAddress: no EVM key matching")
})
// Unknown transmitter address
require.ErrorContains(t, jobORM.CreateJob(ctx, &jb), "unknown dual transmission transmitterAddress: no EVM key matching:")

// Should not error
keyStore.Eth().XXXTestingOnlyAdd(ctx, dtTransmitterAddress)
require.NoError(t, jobORM.CreateJob(ctx, &jb))
}
24 changes: 0 additions & 24 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ type Job struct {
CCIPSpecID *int32
CCIPSpec *CCIPSpec
CCIPBootstrapSpecID *int32
AdaptiveSendSpec *AdaptiveSendSpec `toml:"adaptiveSend"`
JobSpecErrors []SpecError
Type Type `toml:"type"`
SchemaVersion uint32 `toml:"schemaVersion"`
Expand Down Expand Up @@ -1061,26 +1060,3 @@ type CCIPSpec struct {
// and RMN network info for offchain blessing.
PluginConfig JSONConfig `toml:"pluginConfig"`
}

type AdaptiveSendSpec struct {
TransmitterAddress *evmtypes.EIP55Address `toml:"transmitterAddress"`
ContractAddress *evmtypes.EIP55Address `toml:"contractAddress"`
Delay time.Duration `toml:"delay"`
Metadata JSONConfig `toml:"metadata"`
}

func (o *AdaptiveSendSpec) Validate() error {
if o.TransmitterAddress == nil {
return errors.New("no AdaptiveSendSpec.TransmitterAddress found")
}

if o.ContractAddress == nil {
return errors.New("no AdaptiveSendSpec.ContractAddress found")
}

if o.Delay.Seconds() <= 1 {
return errors.New("AdaptiveSendSpec.Delay not set or smaller than 1s")
}

return nil
}
61 changes: 0 additions & 61 deletions core/services/job/models_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/codec"
"github.com/smartcontractkit/chainlink-common/pkg/types"
pkgworkflows "github.com/smartcontractkit/chainlink-common/pkg/workflows"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/relay"
Expand Down Expand Up @@ -352,62 +350,3 @@ func TestWorkflowSpec_Validate(t *testing.T) {
require.NotEmpty(t, w.WorkflowID)
})
}

func TestAdaptiveSendConfig(t *testing.T) {
tests := []struct {
name string
shouldError bool
expectedErrorMessage string
config job.AdaptiveSendSpec
}{
{
name: "AdaptiveSendSpec.TransmitterAddress not set",
shouldError: true,
expectedErrorMessage: "no AdaptiveSendSpec.TransmitterAddress found",
config: job.AdaptiveSendSpec{
TransmitterAddress: nil,
ContractAddress: ptr(cltest.NewEIP55Address()),
Delay: time.Second * 30,
},
},
{
name: "AdaptiveSendSpec.ContractAddress not set",
shouldError: true,
expectedErrorMessage: "no AdaptiveSendSpec.ContractAddress found",
config: job.AdaptiveSendSpec{
TransmitterAddress: ptr(cltest.NewEIP55Address()),
ContractAddress: nil,
Delay: time.Second * 30,
},
},
{
name: "AdaptiveSendSpec.Delay not set",
shouldError: true,
expectedErrorMessage: "AdaptiveSendSpec.Delay not set or smaller than 1s",
config: job.AdaptiveSendSpec{
TransmitterAddress: ptr(cltest.NewEIP55Address()),
ContractAddress: ptr(cltest.NewEIP55Address()),
},
},
{
name: "AdaptiveSendSpec.Delay set to 50ms",
shouldError: true,
expectedErrorMessage: "AdaptiveSendSpec.Delay not set or smaller than 1s",
config: job.AdaptiveSendSpec{
TransmitterAddress: ptr(cltest.NewEIP55Address()),
ContractAddress: ptr(cltest.NewEIP55Address()),
Delay: time.Millisecond * 50,
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if test.shouldError {
require.ErrorContains(t, test.config.Validate(), test.expectedErrorMessage)
} else {
require.NoError(t, test.config.Validate())
}
})
}
}
28 changes: 23 additions & 5 deletions core/services/job/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/types"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
evmconfig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/config"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
Expand Down Expand Up @@ -304,10 +303,29 @@ func (o *orm) CreateJob(ctx context.Context, jb *Job) error {
}
}

if jb.AdaptiveSendSpec != nil {
err = validateKeyStoreMatchForRelay(ctx, jb.OCR2OracleSpec.Relay, tx.keyStore, jb.AdaptiveSendSpec.TransmitterAddress.String())
if err != nil {
return fmt.Errorf("failed to validate AdaptiveSendSpec.TransmitterAddress: %w", err)
if enableDualTransmission, ok := jb.OCR2OracleSpec.RelayConfig["enableDualTransmission"]; ok && enableDualTransmission != nil {
rawDualTransmissionConfig, ok := jb.OCR2OracleSpec.RelayConfig["dualTransmission"]
if !ok {
return errors.New("dual transmission is enabled but no dual transmission config present")
}

dualTransmissionConfig, ok := rawDualTransmissionConfig.(map[string]interface{})
if !ok {
return errors.New("invalid dual transmission config")
}

dtContractAddress, ok := dualTransmissionConfig["contractAddress"].(string)
if !ok || !common.IsHexAddress(dtContractAddress) {
return errors.New("invalid contract address in dual transmission config")
}

dtTransmitterAddress, ok := dualTransmissionConfig["transmitterAddress"].(string)
if !ok || !common.IsHexAddress(dtTransmitterAddress) {
return errors.New("invalid transmitter address in dual transmission config")
}

if err = validateKeyStoreMatchForRelay(ctx, jb.OCR2OracleSpec.Relay, tx.keyStore, dtTransmitterAddress); err != nil {
return errors.Wrap(err, "unknown dual transmission transmitterAddress")
}
}

Expand Down
10 changes: 0 additions & 10 deletions core/services/ocr2/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ func ValidatedOracleSpecToml(ctx context.Context, config OCR2Config, insConf Ins
if err = validateTimingParameters(config, insConf, spec); err != nil {
return jb, err
}
if err = validateAdaptiveSendSpec(ctx, jb); err != nil {
return jb, err
}
return jb, nil
}

Expand Down Expand Up @@ -380,10 +377,3 @@ func validateOCR2LLOSpec(jsonConfig job.JSONConfig) error {
}
return pkgerrors.Wrap(pluginConfig.Validate(), "LLO PluginConfig is invalid")
}

func validateAdaptiveSendSpec(ctx context.Context, spec job.Job) error {
if spec.AdaptiveSendSpec != nil {
return spec.AdaptiveSendSpec.Validate()
}
return nil
}
70 changes: 68 additions & 2 deletions core/services/ocrcommon/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package ocrcommon

import (
"context"
errors2 "errors"
"math/big"
"net/url"
"slices"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -11,6 +13,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/forwarders"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
types2 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
)

type roundRobinKeystore interface {
Expand Down Expand Up @@ -88,13 +91,14 @@ func NewOCR2FeedsTransmitter(
checker txmgr.TransmitCheckerSpec,
chainID *big.Int,
keystore roundRobinKeystore,
dualTransmissionConfig *types2.DualTransmissionConfig,
) (Transmitter, error) {
// Ensure that a keystore is provided.
if keystore == nil {
return nil, errors.New("nil keystore provided to transmitter")
}

return &ocr2FeedsTransmitter{
baseTransmitter := &ocr2FeedsTransmitter{
ocr2Aggregator: ocr2Aggregator,
txManagerOCR2: txm,
transmitter: transmitter{
Expand All @@ -107,7 +111,17 @@ func NewOCR2FeedsTransmitter(
chainID: chainID,
keystore: keystore,
},
}, nil
}

if dualTransmissionConfig != nil {
return &ocr2FeedsDualTransmission{
transmitter: *baseTransmitter,
secondaryContractAddress: dualTransmissionConfig.ContractAddress,
secondaryFromAddress: dualTransmissionConfig.TransmitterAddress,
secondaryMeta: dualTransmissionConfig.Meta,
}, nil
}
return baseTransmitter, nil
}

func (t *transmitter) CreateEthTransaction(ctx context.Context, toAddress common.Address, payload []byte, txMeta *txmgr.TxMeta) error {
Expand Down Expand Up @@ -203,3 +217,55 @@ func (t *ocr2FeedsTransmitter) forwarderAddress(ctx context.Context, eoa, ocr2Ag

return forwarderAddress, nil
}

type ocr2FeedsDualTransmission struct {
transmitter ocr2FeedsTransmitter

secondaryContractAddress common.Address
secondaryFromAddress common.Address
secondaryMeta map[string][]string
}

func (t *ocr2FeedsDualTransmission) CreateEthTransaction(ctx context.Context, toAddress common.Address, payload []byte, txMeta *txmgr.TxMeta) error {
// Primary transmission
errPrimary := t.transmitter.CreateEthTransaction(ctx, toAddress, payload, txMeta)
pavel-raykov marked this conversation as resolved.
Show resolved Hide resolved
errPrimary = errors.Wrap(errPrimary, "skipped primary transmission")
george-dorin marked this conversation as resolved.
Show resolved Hide resolved

if txMeta == nil {
txMeta = &txmgr.TxMeta{}
}

dualBroadcast := true
dualBroadcastParams := t.urlParams()

txMeta.DualBroadcast = &dualBroadcast
txMeta.DualBroadcastParams = &dualBroadcastParams

// Secondary transmission
_, errSecondary := t.transmitter.txm.CreateTransaction(ctx, txmgr.TxRequest{
FromAddress: t.secondaryFromAddress,
ToAddress: t.secondaryContractAddress,
EncodedPayload: payload,
FeeLimit: t.transmitter.gasLimit,
Strategy: t.transmitter.strategy,
Checker: t.transmitter.checker,
Meta: txMeta,
})

errSecondary = errors.Wrap(errSecondary, "skipped secondary transmission")
return errors2.Join(errPrimary, errSecondary)
}

func (t *ocr2FeedsDualTransmission) FromAddress(ctx context.Context) common.Address {
return t.transmitter.FromAddress(ctx)
}

func (t *ocr2FeedsDualTransmission) urlParams() string {
values := url.Values{}
for k, v := range t.secondaryMeta {
for _, p := range v {
values.Add(k, p)
}
}
return values.Encode()
}
Loading
Loading