From 451984ad0469c7d685e305dba0a0d94eb0c9a053 Mon Sep 17 00:00:00 2001 From: nogo <110664798+0xnogo@users.noreply.github.com> Date: Tue, 30 Jul 2024 18:18:58 +0200 Subject: [PATCH] Zk Overflow in Execution Plugin (#947) Changes to make the execution plugin overflow-compatible --- .mockery.yaml | 3 + core/services/ocr2/delegate.go | 10 +- .../ocr2/plugins/ccip/ccipexec/batching.go | 345 +++++++ .../plugins/ccip/ccipexec/batching_test.go | 844 ++++++++++++++++++ .../ocr2/plugins/ccip/ccipexec/factory.go | 6 + .../plugins/ccip/ccipexec/initializers.go | 2 + .../ocr2/plugins/ccip/ccipexec/ocr2.go | 223 +---- .../ocr2/plugins/ccip/ccipexec/ocr2_test.go | 450 +--------- .../ccip/internal/ccipdata/v1_0_0/offramp.go | 3 + .../internal/ccipdata/v1_0_0/offramp_test.go | 7 +- .../ccip/internal/ccipdata/v1_2_0/offramp.go | 3 + .../internal/ccipdata/v1_2_0/offramp_test.go | 45 +- .../ccip/internal/ccipdata/v1_5_0/offramp.go | 1 + .../plugins/ccip/transmitter/transmitter.go | 143 +++ .../ccip/transmitter/transmitter_test.go | 282 ++++++ core/services/relay/evm/evm.go | 12 + .../mocks/ccip_transaction_status_checker.go | 104 +++ .../evm/statuschecker/txm_status_checker.go | 54 ++ .../statuschecker/txm_status_checker_test.go | 103 +++ 19 files changed, 1990 insertions(+), 650 deletions(-) create mode 100644 core/services/ocr2/plugins/ccip/transmitter/transmitter.go create mode 100644 core/services/ocr2/plugins/ccip/transmitter/transmitter_test.go create mode 100644 core/services/relay/evm/statuschecker/mocks/ccip_transaction_status_checker.go create mode 100644 core/services/relay/evm/statuschecker/txm_status_checker.go create mode 100644 core/services/relay/evm/statuschecker/txm_status_checker_test.go diff --git a/.mockery.yaml b/.mockery.yaml index 529a3f22ad..d458eef5d5 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -511,6 +511,9 @@ packages: PriceGetter: config: mockname: "Mock{{ .InterfaceName }}" + github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/statuschecker: + interfaces: + CCIPTransactionStatusChecker: github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/rpclib: config: outpkg: rpclibmocks diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 5246780e04..92d6cc65bc 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -330,15 +330,15 @@ func (d *Delegate) cleanupEVM(ctx context.Context, jb job.Job, relayID types.Rel dstProvider, err2 := d.ccipCommitGetDstProvider(ctx, jb, pluginJobSpecConfig, transmitterID) if err2 != nil { - return err2 + return err } srcProvider, _, err2 := d.ccipCommitGetSrcProvider(ctx, jb, pluginJobSpecConfig, transmitterID, dstProvider) if err2 != nil { - return err2 + return err } err2 = ccipcommit.UnregisterCommitPluginLpFilters(srcProvider, dstProvider) - if err != nil { + if err2 != nil { d.lggr.Errorw("failed to unregister ccip commit plugin filters", "err", err2, "spec", spec) } return nil @@ -353,12 +353,12 @@ func (d *Delegate) cleanupEVM(ctx context.Context, jb job.Job, relayID types.Rel dstProvider, err2 := d.ccipExecGetDstProvider(ctx, jb, pluginJobSpecConfig, transmitterID) if err2 != nil { - return err2 + return err } srcProvider, _, err2 := d.ccipExecGetSrcProvider(ctx, jb, pluginJobSpecConfig, transmitterID, dstProvider) if err2 != nil { - return err2 + return err } err2 = ccipexec.UnregisterExecPluginLpFilters(srcProvider, dstProvider) if err2 != nil { diff --git a/core/services/ocr2/plugins/ccip/ccipexec/batching.go b/core/services/ocr2/plugins/ccip/ccipexec/batching.go index e3e7ae3253..b457dd986d 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/batching.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/batching.go @@ -3,19 +3,303 @@ package ccipexec import ( "context" "fmt" + "math/big" + "time" + mapset "github.com/deckarep/golang-set/v2" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/pkg/errors" "github.com/smartcontractkit/chainlink-common/pkg/hashutil" "github.com/smartcontractkit/chainlink-common/pkg/merklemulti" + "github.com/smartcontractkit/chainlink-common/pkg/types" cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" + "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/abihelpers" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/prices" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/statuschecker" ) +type BatchContext struct { + report commitReportWithSendRequests + inflight []InflightInternalExecutionReport + inflightAggregateValue *big.Int + lggr logger.Logger + availableDataLen int + availableGas uint64 + expectedNonces map[cciptypes.Address]uint64 + sendersNonce map[cciptypes.Address]uint64 + sourceTokenPricesUSD map[cciptypes.Address]*big.Int + destTokenPricesUSD map[cciptypes.Address]*big.Int + gasPrice *big.Int + sourceToDestToken map[cciptypes.Address]cciptypes.Address + aggregateTokenLimit *big.Int + tokenDataRemainingDuration time.Duration + tokenDataWorker tokendata.Worker + gasPriceEstimator prices.GasPriceEstimatorExec + destWrappedNative cciptypes.Address + offchainConfig cciptypes.ExecOffchainConfig +} + +type BatchingStrategy interface { + BuildBatch(ctx context.Context, batchCtx *BatchContext) ([]ccip.ObservedMessage, []messageExecStatus) +} + +type BestEffortBatchingStrategy struct{} + +type ZKOverflowBatchingStrategy struct { + statuschecker statuschecker.CCIPTransactionStatusChecker +} + +func NewBatchingStrategy(batchingStrategyID uint32, statusChecker statuschecker.CCIPTransactionStatusChecker) (BatchingStrategy, error) { + var batchingStrategy BatchingStrategy + switch batchingStrategyID { + case 0: + batchingStrategy = &BestEffortBatchingStrategy{} + case 1: + batchingStrategy = &ZKOverflowBatchingStrategy{ + statuschecker: statusChecker, + } + default: + return nil, errors.Errorf("unknown batching strategy ID %d", batchingStrategyID) + } + return batchingStrategy, nil +} + +// BestEffortBatchingStrategy is a batching strategy that tries to batch as many messages as possible (up to certain limits). +func (s *BestEffortBatchingStrategy) BuildBatch( + ctx context.Context, + batchCtx *BatchContext, +) ([]ccip.ObservedMessage, []messageExecStatus) { + batchBuilder := newBatchBuildContainer(len(batchCtx.report.sendRequestsWithMeta)) + for _, msg := range batchCtx.report.sendRequestsWithMeta { + msgLggr := batchCtx.lggr.With("messageID", hexutil.Encode(msg.MessageID[:]), "seqNr", msg.SequenceNumber) + status, messageMaxGas, tokenData, msgValue, err := performCommonChecks(ctx, batchCtx, msg, msgLggr) + + if err != nil { + return []ccip.ObservedMessage{}, []messageExecStatus{} + } + + if status.shouldBeSkipped() { + batchBuilder.skip(msg, status) + continue + } + + updateBatchContext(batchCtx, msg, messageMaxGas, msgValue, msgLggr) + batchBuilder.addToBatch(msg, tokenData) + } + return batchBuilder.batch, batchBuilder.statuses +} + +// ZKOverflowBatchingStrategy is a batching strategy for ZK chains overflowing under certain conditions. +// It is a simple batching strategy that only allows one message to be added to the batch. +// TXM is used to perform the ZK check: if the message failed the check, it will be skipped. +func (bs ZKOverflowBatchingStrategy) BuildBatch( + ctx context.Context, + batchCtx *BatchContext, +) ([]ccip.ObservedMessage, []messageExecStatus) { + batchBuilder := newBatchBuildContainer(len(batchCtx.report.sendRequestsWithMeta)) + inflightSeqNums := getInflightSeqNums(batchCtx.inflight) + + for _, msg := range batchCtx.report.sendRequestsWithMeta { + msgId := hexutil.Encode(msg.MessageID[:]) + msgLggr := batchCtx.lggr.With("messageID", msgId, "seqNr", msg.SequenceNumber) + + // Check if msg is inflight + if exists := inflightSeqNums.Contains(msg.SequenceNumber); exists { + // Message is inflight, skip it + msgLggr.Infow("Skipping message - already inflight", "message", msgId) + batchBuilder.skip(msg, SkippedInflight) + continue + } + // Message is not inflight, continue with checks + // Check if the messsage is overflown using TXM + statuses, count, err := bs.statuschecker.CheckMessageStatus(ctx, msgId) + if err != nil { + batchBuilder.skip(msg, TXMCheckError) + continue + } + + msgLggr.Infow("TXM check result", "statuses", statuses, "count", count) + + if len(statuses) == 0 { + // No status found for message = first time we see it + msgLggr.Infow("No status found for message - proceeding with checks", "message", msgId) + } else { + // Status(es) found for message = check if any of them is final to decide if we should add it to the batch + hasFatalStatus := false + for _, s := range statuses { + if s == types.Fatal { + msgLggr.Infow("Skipping message - found a fatal TXM status", "message", msgId) + batchBuilder.skip(msg, TXMFatalStatus) + hasFatalStatus = true + break + } + } + if hasFatalStatus { + continue + } + msgLggr.Infow("No fatal status found for message - proceeding with checks", "message", msgId) + } + + status, messageMaxGas, tokenData, msgValue, err := performCommonChecks(ctx, batchCtx, msg, msgLggr) + + if err != nil { + return []ccip.ObservedMessage{}, []messageExecStatus{} + } + + if status.shouldBeSkipped() { + batchBuilder.skip(msg, status) + continue + } + + updateBatchContext(batchCtx, msg, messageMaxGas, msgValue, msgLggr) + msgLggr.Infow("Adding message to batch", "message", msgId) + batchBuilder.addToBatch(msg, tokenData) + + // Batch size is limited to 1 for ZK Overflow chains + break + } + return batchBuilder.batch, batchBuilder.statuses +} + +func performCommonChecks( + ctx context.Context, + batchCtx *BatchContext, + msg cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta, + msgLggr logger.Logger, +) (messageStatus, uint64, [][]byte, *big.Int, error) { + if msg.Executed { + msgLggr.Infow("Skipping message - already executed") + return AlreadyExecuted, 0, nil, nil, nil + } + + if len(msg.Data) > batchCtx.availableDataLen { + msgLggr.Infow("Skipping message - insufficient remaining batch data length", "msgDataLen", len(msg.Data), "availableBatchDataLen", batchCtx.availableDataLen) + return InsufficientRemainingBatchDataLength, 0, nil, nil, nil + } + + messageMaxGas, err1 := calculateMessageMaxGas( + msg.GasLimit, + len(batchCtx.report.sendRequestsWithMeta), + len(msg.Data), + len(msg.TokenAmounts), + ) + if err1 != nil { + msgLggr.Errorw("Skipping message - message max gas calculation error", "err", err1) + return MessageMaxGasCalcError, 0, nil, nil, nil + } + + // Check sufficient gas in batch + if batchCtx.availableGas < messageMaxGas { + msgLggr.Infow("Skipping message - insufficient remaining batch gas limit", "availableGas", batchCtx.availableGas, "messageMaxGas", messageMaxGas) + return InsufficientRemainingBatchGas, 0, nil, nil, nil + } + + if _, ok := batchCtx.expectedNonces[msg.Sender]; !ok { + nonce, ok1 := batchCtx.sendersNonce[msg.Sender] + if !ok1 { + msgLggr.Errorw("Skipping message - missing nonce", "sender", msg.Sender) + return MissingNonce, 0, nil, nil, nil + } + batchCtx.expectedNonces[msg.Sender] = nonce + 1 + } + + // Check expected nonce is valid for sequenced messages. + // Sequenced messages have non-zero nonces. + if msg.Nonce > 0 && msg.Nonce != batchCtx.expectedNonces[msg.Sender] { + msgLggr.Warnw("Skipping message - invalid nonce", "have", msg.Nonce, "want", batchCtx.expectedNonces[msg.Sender]) + return InvalidNonce, 0, nil, nil, nil + } + + msgValue, err1 := aggregateTokenValue(batchCtx.lggr, batchCtx.destTokenPricesUSD, batchCtx.sourceToDestToken, msg.TokenAmounts) + if err1 != nil { + msgLggr.Errorw("Skipping message - aggregate token value compute error", "err", err1) + return AggregateTokenValueComputeError, 0, nil, nil, nil + } + + // if token limit is smaller than message value skip message + if tokensLeft, hasCapacity := hasEnoughTokens(batchCtx.aggregateTokenLimit, msgValue, batchCtx.inflightAggregateValue); !hasCapacity { + msgLggr.Warnw("Skipping message - aggregate token limit exceeded", "aggregateTokenLimit", tokensLeft.String(), "msgValue", msgValue.String()) + return AggregateTokenLimitExceeded, 0, nil, nil, nil + } + + tokenData, elapsed, err1 := getTokenDataWithTimeout(ctx, msg, batchCtx.tokenDataRemainingDuration, batchCtx.tokenDataWorker) + batchCtx.tokenDataRemainingDuration -= elapsed + if err1 != nil { + if errors.Is(err1, tokendata.ErrNotReady) { + msgLggr.Warnw("Skipping message - token data not ready", "err", err1) + return TokenDataNotReady, 0, nil, nil, nil + } + msgLggr.Errorw("Skipping message - token data fetch error", "err", err1) + return TokenDataFetchError, 0, nil, nil, nil + } + + dstWrappedNativePrice, exists := batchCtx.destTokenPricesUSD[batchCtx.destWrappedNative] + if !exists { + msgLggr.Errorw("Skipping message - token not in destination token prices", "token", batchCtx.destWrappedNative) + return TokenNotInDestTokenPrices, 0, nil, nil, nil + } + + // calculating the source chain fee, dividing by 1e18 for denomination. + // For example: + // FeeToken=link; FeeTokenAmount=1e17 i.e. 0.1 link, price is 6e18 USD/link (1 USD = 1e18), + // availableFee is 1e17*6e18/1e18 = 6e17 = 0.6 USD + sourceFeeTokenPrice, exists := batchCtx.sourceTokenPricesUSD[msg.FeeToken] + if !exists { + msgLggr.Errorw("Skipping message - token not in source token prices", "token", msg.FeeToken) + return TokenNotInSrcTokenPrices, 0, nil, nil, nil + } + + // Fee boosting + execCostUsd, err1 := batchCtx.gasPriceEstimator.EstimateMsgCostUSD(batchCtx.gasPrice, dstWrappedNativePrice, msg) + if err1 != nil { + msgLggr.Errorw("Failed to estimate message cost USD", "err", err1) + return "", 0, nil, nil, errors.New("failed to estimate message cost USD") + } + + availableFee := big.NewInt(0).Mul(msg.FeeTokenAmount, sourceFeeTokenPrice) + availableFee = availableFee.Div(availableFee, big.NewInt(1e18)) + availableFeeUsd := waitBoostedFee(time.Since(msg.BlockTimestamp), availableFee, batchCtx.offchainConfig.RelativeBoostPerWaitHour) + if availableFeeUsd.Cmp(execCostUsd) < 0 { + msgLggr.Infow( + "Skipping message - insufficient remaining fee", + "availableFeeUsd", availableFeeUsd, + "execCostUsd", execCostUsd, + "sourceBlockTimestamp", msg.BlockTimestamp, + "waitTime", time.Since(msg.BlockTimestamp), + "boost", batchCtx.offchainConfig.RelativeBoostPerWaitHour, + ) + return InsufficientRemainingFee, 0, nil, nil, nil + } + + return SuccesfullyValidated, messageMaxGas, tokenData, msgValue, nil +} + +// getTokenDataWithCappedLatency gets the token data for the provided message. +// Stops and returns an error if more than allowedWaitingTime is passed. +func getTokenDataWithTimeout( + ctx context.Context, + msg cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta, + timeout time.Duration, + tokenDataWorker tokendata.Worker, +) ([][]byte, time.Duration, error) { + if len(msg.TokenAmounts) == 0 { + return nil, 0, nil + } + + ctxTimeout, cf := context.WithTimeout(ctx, timeout) + defer cf() + tStart := time.Now() + tokenData, err := tokenDataWorker.GetMsgTokenData(ctxTimeout, msg) + tDur := time.Since(tStart) + return tokenData, tDur, err +} + func getProofData( ctx context.Context, sourceReader ccipdata.OnRampReader, @@ -67,6 +351,59 @@ func validateSendRequests(sendReqs []cciptypes.EVM2EVMMessageWithTxMeta, interva return nil } +func getInflightSeqNums(inflight []InflightInternalExecutionReport) mapset.Set[uint64] { + seqNums := mapset.NewSet[uint64]() + for _, report := range inflight { + for _, msg := range report.messages { + seqNums.Add(msg.SequenceNumber) + } + } + return seqNums +} + +func aggregateTokenValue(lggr logger.Logger, destTokenPricesUSD map[cciptypes.Address]*big.Int, sourceToDest map[cciptypes.Address]cciptypes.Address, tokensAndAmount []cciptypes.TokenAmount) (*big.Int, error) { + sum := big.NewInt(0) + for i := 0; i < len(tokensAndAmount); i++ { + price, ok := destTokenPricesUSD[sourceToDest[tokensAndAmount[i].Token]] + if !ok { + // If we don't have a price for the token, we will assume it's worth 0. + lggr.Infof("No price for token %s, assuming 0", tokensAndAmount[i].Token) + continue + } + sum.Add(sum, new(big.Int).Quo(new(big.Int).Mul(price, tokensAndAmount[i].Amount), big.NewInt(1e18))) + } + return sum, nil +} + +func updateBatchContext( + batchCtx *BatchContext, + msg cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta, + messageMaxGas uint64, + msgValue *big.Int, + msgLggr logger.Logger) { + batchCtx.availableGas -= messageMaxGas + batchCtx.availableDataLen -= len(msg.Data) + batchCtx.aggregateTokenLimit.Sub(batchCtx.aggregateTokenLimit, msgValue) + if msg.Nonce > 0 { + batchCtx.expectedNonces[msg.Sender] = msg.Nonce + 1 + } + + msgLggr.Infow( + "Message successfully added to execution batch", + "nonce", msg.Nonce, + "sender", msg.Sender, + "value", msgValue, + "availableAggrTokenLimit", batchCtx.aggregateTokenLimit, + "availableGas", batchCtx.availableGas, + "availableDataLen", batchCtx.availableDataLen, + ) +} + +func hasEnoughTokens(tokenLimit *big.Int, msgValue *big.Int, inflightValue *big.Int) (*big.Int, bool) { + tokensLeft := big.NewInt(0).Sub(tokenLimit, inflightValue) + return tokensLeft, tokensLeft.Cmp(msgValue) >= 0 +} + func buildExecutionReportForMessages( msgsInRoot []cciptypes.EVM2EVMMessageWithTxMeta, tree *merklemulti.Tree[[32]byte], @@ -138,6 +475,7 @@ func getCommitReportForSeqNum(ctx context.Context, commitStoreReader ccipdata.Co type messageStatus string const ( + SuccesfullyValidated messageStatus = "successfully_validated" AlreadyExecuted messageStatus = "already_executed" SenderAlreadySkipped messageStatus = "sender_already_skipped" MessageMaxGasCalcError messageStatus = "message_max_gas_calc_error" @@ -153,8 +491,15 @@ const ( TokenNotInSrcTokenPrices messageStatus = "token_not_in_src_token_prices" InsufficientRemainingFee messageStatus = "insufficient_remaining_fee" AddedToBatch messageStatus = "added_to_batch" + TXMCheckError messageStatus = "txm_check_error" + TXMFatalStatus messageStatus = "txm_fatal_status" + SkippedInflight messageStatus = "skipped_inflight" ) +func (m messageStatus) shouldBeSkipped() bool { + return m != SuccesfullyValidated +} + type messageExecStatus struct { SeqNr uint64 MessageId string diff --git a/core/services/ocr2/plugins/ccip/ccipexec/batching_test.go b/core/services/ocr2/plugins/ccip/ccipexec/batching_test.go index dc3594c820..3647556a6d 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/batching_test.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/batching_test.go @@ -1,13 +1,62 @@ package ccipexec import ( + "bytes" + "context" + "encoding/binary" + "math" + "math/big" + "reflect" "testing" + "time" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/smartcontractkit/chainlink-common/pkg/types" cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcalc" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/prices" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata" + mockstatuschecker "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/statuschecker/mocks" ) +type testCase struct { + name string + reqs []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta + inflight []InflightInternalExecutionReport + tokenLimit, destGasPrice, inflightAggregateValue *big.Int + srcPrices, dstPrices map[cciptypes.Address]*big.Int + offRampNoncesBySender map[cciptypes.Address]uint64 + srcToDestTokens map[cciptypes.Address]cciptypes.Address + expectedSeqNrs []ccip.ObservedMessage + expectedStates []messageExecStatus + statuschecker func(m *mockstatuschecker.CCIPTransactionStatusChecker) + skipGasPriceEstimator bool +} + +func Test_NewBatchingStrategy(t *testing.T) { + t.Parallel() + + mockStatusChecker := mockstatuschecker.NewCCIPTransactionStatusChecker(t) + + testCases := []int{0, 1, 2} + + for _, batchingStrategyId := range testCases { + factory, err := NewBatchingStrategy(uint32(batchingStrategyId), mockStatusChecker) + if batchingStrategyId == 2 { + assert.Error(t, err) + } else { + assert.NotNil(t, factory) + assert.NoError(t, err) + } + } +} + func Test_validateSendRequests(t *testing.T) { testCases := []struct { name string @@ -64,3 +113,798 @@ func Test_validateSendRequests(t *testing.T) { }) } } + +type delayedTokenDataWorker struct { + delay time.Duration + tokendata.Worker +} + +func (m delayedTokenDataWorker) GetMsgTokenData(ctx context.Context, msg cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta) ([][]byte, error) { + time.Sleep(m.delay) + return nil, ctx.Err() +} + +func TestExecutionReportingPlugin_getTokenDataWithCappedLatency(t *testing.T) { + testCases := []struct { + name string + allowedWaitingTime time.Duration + workerLatency time.Duration + expErr bool + }{ + { + name: "happy flow", + allowedWaitingTime: 10 * time.Millisecond, + workerLatency: time.Nanosecond, + expErr: false, + }, + { + name: "worker takes long to reply", + allowedWaitingTime: 10 * time.Millisecond, + workerLatency: 20 * time.Millisecond, + expErr: true, + }, + } + + ctx := testutils.Context(t) + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + tokenDataWorker := delayedTokenDataWorker{delay: tc.workerLatency} + + msg := cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ + EVM2EVMMessage: cciptypes.EVM2EVMMessage{TokenAmounts: make([]cciptypes.TokenAmount, 1)}, + } + + _, _, err := getTokenDataWithTimeout(ctx, msg, tc.allowedWaitingTime, tokenDataWorker) + if tc.expErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + }) + } +} + +func TestBatchingStrategies(t *testing.T) { + sender1 := ccipcalc.HexToAddress("0xa") + destNative := ccipcalc.HexToAddress("0xb") + srcNative := ccipcalc.HexToAddress("0xc") + + msg1 := createTestMessage(1, sender1, 1, srcNative, big.NewInt(1e9), false, nil) + + msg2 := msg1 + msg2.Executed = true + + msg3 := msg1 + msg3.Executed = true + msg3.Finalized = true + + msg4 := msg1 + msg4.TokenAmounts = []cciptypes.TokenAmount{ + {Token: srcNative, Amount: big.NewInt(100)}, + } + + msg5 := msg4 + msg5.SequenceNumber = msg5.SequenceNumber + 1 + msg5.Nonce = msg5.Nonce + 1 + + zkMsg1 := createTestMessage(1, sender1, 0, srcNative, big.NewInt(1e9), false, nil) + zkMsg2 := createTestMessage(2, sender1, 0, srcNative, big.NewInt(1e9), false, nil) + zkMsg3 := createTestMessage(3, sender1, 0, srcNative, big.NewInt(1e9), false, nil) + zkMsg4 := createTestMessage(4, sender1, 0, srcNative, big.NewInt(1e9), false, nil) + + testCases := []testCase{ + { + name: "single message no tokens", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg1}, + inflight: []InflightInternalExecutionReport{}, + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, + expectedSeqNrs: []ccip.ObservedMessage{{SeqNr: uint64(1)}}, + expectedStates: []messageExecStatus{newMessageExecState(msg1.SequenceNumber, msg1.MessageID, AddedToBatch)}, + }, + { + name: "gasPriceEstimator returns error", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg1}, + inflight: []InflightInternalExecutionReport{}, + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, + }, + { + name: "executed non finalized messages should be skipped", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg2}, + inflight: []InflightInternalExecutionReport{}, + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, + expectedStates: []messageExecStatus{newMessageExecState(msg2.SequenceNumber, msg2.MessageID, AlreadyExecuted)}, + skipGasPriceEstimator: true, + }, + { + name: "finalized executed log", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg3}, + inflight: []InflightInternalExecutionReport{}, + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, + expectedStates: []messageExecStatus{newMessageExecState(msg3.SequenceNumber, msg3.MessageID, AlreadyExecuted)}, + skipGasPriceEstimator: true, + }, + { + name: "dst token price does not exist", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg1}, + inflight: []InflightInternalExecutionReport{}, + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, + dstPrices: map[cciptypes.Address]*big.Int{}, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, + expectedStates: []messageExecStatus{newMessageExecState(msg1.SequenceNumber, msg1.MessageID, TokenNotInDestTokenPrices)}, + skipGasPriceEstimator: true, + }, + { + name: "src token price does not exist", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg1}, + inflight: []InflightInternalExecutionReport{}, + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, + expectedStates: []messageExecStatus{newMessageExecState(msg1.SequenceNumber, msg1.MessageID, TokenNotInSrcTokenPrices)}, + skipGasPriceEstimator: true, + }, + { + name: "message with tokens is not executed if limit is reached", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg4}, + inflight: []InflightInternalExecutionReport{}, + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(99), + destGasPrice: big.NewInt(1), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1e18)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1e18)}, + srcToDestTokens: map[cciptypes.Address]cciptypes.Address{ + srcNative: destNative, + }, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, + expectedStates: []messageExecStatus{newMessageExecState(msg4.SequenceNumber, msg4.MessageID, AggregateTokenLimitExceeded)}, + skipGasPriceEstimator: true, + }, + { + name: "message with tokens is not executed if limit is reached when inflight is full", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg5}, + inflight: []InflightInternalExecutionReport{{createdAt: time.Now(), messages: []cciptypes.EVM2EVMMessage{msg4.EVM2EVMMessage}}}, + inflightAggregateValue: big.NewInt(100), + tokenLimit: big.NewInt(50), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1e18)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1e18)}, + srcToDestTokens: map[cciptypes.Address]cciptypes.Address{ + srcNative: destNative, + }, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 1}, + expectedStates: []messageExecStatus{newMessageExecState(msg5.SequenceNumber, msg5.MessageID, AggregateTokenLimitExceeded)}, + skipGasPriceEstimator: true, + }, + { + name: "skip when nonce doesn't match chain value", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg1}, + inflight: []InflightInternalExecutionReport{}, + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 123}, + expectedStates: []messageExecStatus{newMessageExecState(msg1.SequenceNumber, msg1.MessageID, InvalidNonce)}, + skipGasPriceEstimator: true, + }, + { + name: "skip when nonce not found", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg1}, + inflight: []InflightInternalExecutionReport{}, + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, + offRampNoncesBySender: map[cciptypes.Address]uint64{}, + expectedStates: []messageExecStatus{newMessageExecState(msg1.SequenceNumber, msg1.MessageID, MissingNonce)}, + skipGasPriceEstimator: true, + }, + { + name: "unordered messages", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ + { + EVM2EVMMessage: cciptypes.EVM2EVMMessage{ + SequenceNumber: 10, + FeeTokenAmount: big.NewInt(1e9), + Sender: sender1, + Nonce: 0, + GasLimit: big.NewInt(1), + Data: bytes.Repeat([]byte{'a'}, 1000), + FeeToken: srcNative, + MessageID: [32]byte{}, + }, + BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), + }, + }, + inflight: []InflightInternalExecutionReport{}, + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, + expectedSeqNrs: []ccip.ObservedMessage{{SeqNr: uint64(10)}}, + expectedStates: []messageExecStatus{ + newMessageExecState(10, [32]byte{}, AddedToBatch), + }, + }, + { + name: "unordered messages not blocked by nonce", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ + { + EVM2EVMMessage: cciptypes.EVM2EVMMessage{ + SequenceNumber: 9, + FeeTokenAmount: big.NewInt(1e9), + Sender: sender1, + Nonce: 5, + GasLimit: big.NewInt(1), + Data: bytes.Repeat([]byte{'a'}, 1000), + FeeToken: srcNative, + MessageID: [32]byte{}, + }, + BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), + }, + { + EVM2EVMMessage: cciptypes.EVM2EVMMessage{ + SequenceNumber: 10, + FeeTokenAmount: big.NewInt(1e9), + Sender: sender1, + Nonce: 0, + GasLimit: big.NewInt(1), + Data: bytes.Repeat([]byte{'a'}, 1000), + FeeToken: srcNative, + MessageID: [32]byte{}, + }, + BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), + }, + }, + inflight: []InflightInternalExecutionReport{}, + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 3}, + expectedSeqNrs: []ccip.ObservedMessage{{SeqNr: uint64(10)}}, + expectedStates: []messageExecStatus{ + newMessageExecState(9, [32]byte{}, InvalidNonce), + newMessageExecState(10, [32]byte{}, AddedToBatch), + }, + }, + } + + bestEffortTestCases := []testCase{ + { + name: "skip when batch gas limit is reached", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ + { + EVM2EVMMessage: cciptypes.EVM2EVMMessage{ + SequenceNumber: 10, + FeeTokenAmount: big.NewInt(1e9), + Sender: sender1, + Nonce: 1, + GasLimit: big.NewInt(1), + Data: bytes.Repeat([]byte{'a'}, 1000), + FeeToken: srcNative, + MessageID: [32]byte{}, + }, + BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), + }, + { + EVM2EVMMessage: cciptypes.EVM2EVMMessage{ + SequenceNumber: 11, + FeeTokenAmount: big.NewInt(1e9), + Sender: sender1, + Nonce: 2, + GasLimit: big.NewInt(math.MaxInt64), + Data: bytes.Repeat([]byte{'a'}, 1000), + FeeToken: srcNative, + MessageID: [32]byte{}, + }, + BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), + }, + { + EVM2EVMMessage: cciptypes.EVM2EVMMessage{ + SequenceNumber: 12, + FeeTokenAmount: big.NewInt(1e9), + Sender: sender1, + Nonce: 3, + GasLimit: big.NewInt(1), + Data: bytes.Repeat([]byte{'a'}, 1000), + FeeToken: srcNative, + MessageID: [32]byte{}, + }, + BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), + }, + }, + inflight: []InflightInternalExecutionReport{}, + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, + expectedSeqNrs: []ccip.ObservedMessage{{SeqNr: uint64(10)}}, + expectedStates: []messageExecStatus{ + newMessageExecState(10, [32]byte{}, AddedToBatch), + newMessageExecState(11, [32]byte{}, InsufficientRemainingBatchGas), + newMessageExecState(12, [32]byte{}, InvalidNonce), + }, + }, + { + name: "some messages skipped after hitting max batch data len", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ + { + EVM2EVMMessage: cciptypes.EVM2EVMMessage{ + SequenceNumber: 10, + FeeTokenAmount: big.NewInt(1e9), + Sender: sender1, + Nonce: 1, + GasLimit: big.NewInt(1), + Data: bytes.Repeat([]byte{'a'}, 1000), + FeeToken: srcNative, + MessageID: [32]byte{}, + }, + BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), + }, + { + EVM2EVMMessage: cciptypes.EVM2EVMMessage{ + SequenceNumber: 11, + FeeTokenAmount: big.NewInt(1e9), + Sender: sender1, + Nonce: 2, + GasLimit: big.NewInt(1), + Data: bytes.Repeat([]byte{'a'}, MaxDataLenPerBatch-500), // skipped from batch + FeeToken: srcNative, + MessageID: [32]byte{}, + }, + BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), + }, + { + EVM2EVMMessage: cciptypes.EVM2EVMMessage{ + SequenceNumber: 12, + FeeTokenAmount: big.NewInt(1e9), + Sender: sender1, + Nonce: 3, + GasLimit: big.NewInt(1), + Data: bytes.Repeat([]byte{'a'}, 1000), + FeeToken: srcNative, + MessageID: [32]byte{}, + }, + BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), + }, + }, + inflight: []InflightInternalExecutionReport{}, + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, + expectedSeqNrs: []ccip.ObservedMessage{{SeqNr: uint64(10)}}, + expectedStates: []messageExecStatus{ + newMessageExecState(10, [32]byte{}, AddedToBatch), + newMessageExecState(11, [32]byte{}, InsufficientRemainingBatchDataLength), + newMessageExecState(12, [32]byte{}, InvalidNonce), + }, + }, + { + name: "unordered messages then ordered messages", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ + { + EVM2EVMMessage: cciptypes.EVM2EVMMessage{ + SequenceNumber: 9, + FeeTokenAmount: big.NewInt(1e9), + Sender: sender1, + Nonce: 0, + GasLimit: big.NewInt(1), + Data: bytes.Repeat([]byte{'a'}, 1000), + FeeToken: srcNative, + MessageID: [32]byte{}, + }, + BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), + }, + { + EVM2EVMMessage: cciptypes.EVM2EVMMessage{ + SequenceNumber: 10, + FeeTokenAmount: big.NewInt(1e9), + Sender: sender1, + Nonce: 5, + GasLimit: big.NewInt(1), + Data: bytes.Repeat([]byte{'a'}, 1000), + FeeToken: srcNative, + MessageID: [32]byte{}, + }, + BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), + }, + }, + inflight: []InflightInternalExecutionReport{}, + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 4}, + expectedSeqNrs: []ccip.ObservedMessage{{SeqNr: uint64(9)}, {SeqNr: uint64(10)}}, + expectedStates: []messageExecStatus{ + newMessageExecState(9, [32]byte{}, AddedToBatch), + newMessageExecState(10, [32]byte{}, AddedToBatch), + }, + }, + } + + specificZkOverflowTestCases := []testCase{ + { + name: "batch size is 1", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{zkMsg1, zkMsg2, zkMsg3}, + inflight: []InflightInternalExecutionReport{}, + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, + expectedSeqNrs: []ccip.ObservedMessage{{SeqNr: zkMsg1.SequenceNumber}}, + expectedStates: []messageExecStatus{ + newMessageExecState(zkMsg1.SequenceNumber, zkMsg1.MessageID, AddedToBatch), + }, + statuschecker: func(m *mockstatuschecker.CCIPTransactionStatusChecker) { + m.Mock = mock.Mock{} // reset mock + m.On("CheckMessageStatus", mock.Anything, mock.Anything).Return([]types.TransactionStatus{}, -1, nil) + }, + }, + { + name: "snooze fatal message and return empty batch", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{zkMsg1}, + inflight: []InflightInternalExecutionReport{}, + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, + expectedStates: []messageExecStatus{ + newMessageExecState(zkMsg1.SequenceNumber, zkMsg1.MessageID, TXMFatalStatus), + }, + statuschecker: func(m *mockstatuschecker.CCIPTransactionStatusChecker) { + m.Mock = mock.Mock{} // reset mock + m.On("CheckMessageStatus", mock.Anything, zkMsg1.MessageID.String()).Return([]types.TransactionStatus{types.Fatal}, 0, nil) + }, + skipGasPriceEstimator: true, + }, + { + name: "snooze fatal message and add next message to batch", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{zkMsg1, zkMsg2}, + inflight: []InflightInternalExecutionReport{}, + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, + expectedSeqNrs: []ccip.ObservedMessage{{SeqNr: zkMsg2.SequenceNumber}}, + expectedStates: []messageExecStatus{ + newMessageExecState(zkMsg1.SequenceNumber, zkMsg1.MessageID, TXMFatalStatus), + newMessageExecState(zkMsg2.SequenceNumber, zkMsg2.MessageID, AddedToBatch), + }, + statuschecker: func(m *mockstatuschecker.CCIPTransactionStatusChecker) { + m.Mock = mock.Mock{} // reset mock + m.On("CheckMessageStatus", mock.Anything, zkMsg1.MessageID.String()).Return([]types.TransactionStatus{types.Fatal}, 0, nil) + m.On("CheckMessageStatus", mock.Anything, zkMsg2.MessageID.String()).Return([]types.TransactionStatus{}, -1, nil) + }, + }, + { + name: "all messages are fatal and batch is empty", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{zkMsg1, zkMsg2}, + inflight: []InflightInternalExecutionReport{}, + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, + expectedStates: []messageExecStatus{ + newMessageExecState(zkMsg1.SequenceNumber, zkMsg1.MessageID, TXMFatalStatus), + newMessageExecState(zkMsg2.SequenceNumber, zkMsg2.MessageID, TXMFatalStatus), + }, + statuschecker: func(m *mockstatuschecker.CCIPTransactionStatusChecker) { + m.Mock = mock.Mock{} // reset mock + m.On("CheckMessageStatus", mock.Anything, zkMsg1.MessageID.String()).Return([]types.TransactionStatus{types.Fatal}, 0, nil) + m.On("CheckMessageStatus", mock.Anything, zkMsg2.MessageID.String()).Return([]types.TransactionStatus{types.Fatal}, 0, nil) + }, + skipGasPriceEstimator: true, + }, + { + name: "message batched when unconfirmed or failed", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{zkMsg1, zkMsg2}, + inflight: []InflightInternalExecutionReport{}, + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, + expectedSeqNrs: []ccip.ObservedMessage{{SeqNr: zkMsg1.SequenceNumber}}, + expectedStates: []messageExecStatus{ + newMessageExecState(zkMsg1.SequenceNumber, zkMsg1.MessageID, AddedToBatch), + }, + statuschecker: func(m *mockstatuschecker.CCIPTransactionStatusChecker) { + m.Mock = mock.Mock{} // reset mock + m.On("CheckMessageStatus", mock.Anything, zkMsg1.MessageID.String()).Return([]types.TransactionStatus{types.Unconfirmed, types.Failed}, 1, nil) + }, + }, + { + name: "message snoozed when multiple statuses with fatal", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{zkMsg1, zkMsg2}, + inflight: []InflightInternalExecutionReport{}, + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, + expectedSeqNrs: []ccip.ObservedMessage{{SeqNr: zkMsg2.SequenceNumber}}, + expectedStates: []messageExecStatus{ + newMessageExecState(zkMsg1.SequenceNumber, zkMsg1.MessageID, TXMFatalStatus), + newMessageExecState(zkMsg2.SequenceNumber, zkMsg2.MessageID, AddedToBatch), + }, + statuschecker: func(m *mockstatuschecker.CCIPTransactionStatusChecker) { + m.Mock = mock.Mock{} // reset mock + m.On("CheckMessageStatus", mock.Anything, zkMsg1.MessageID.String()).Return([]types.TransactionStatus{types.Unconfirmed, types.Failed, types.Fatal}, 2, nil) + m.On("CheckMessageStatus", mock.Anything, zkMsg2.MessageID.String()).Return([]types.TransactionStatus{}, -1, nil) + }, + }, + { + name: "txm return error for message", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{zkMsg1, zkMsg2}, + inflight: []InflightInternalExecutionReport{}, + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, + expectedSeqNrs: []ccip.ObservedMessage{{SeqNr: zkMsg2.SequenceNumber}}, + expectedStates: []messageExecStatus{ + newMessageExecState(zkMsg1.SequenceNumber, zkMsg1.MessageID, TXMCheckError), + newMessageExecState(zkMsg2.SequenceNumber, zkMsg2.MessageID, AddedToBatch), + }, + statuschecker: func(m *mockstatuschecker.CCIPTransactionStatusChecker) { + m.Mock = mock.Mock{} // reset mock + m.On("CheckMessageStatus", mock.Anything, zkMsg1.MessageID.String()).Return([]types.TransactionStatus{}, -1, errors.New("dummy txm error")) + m.On("CheckMessageStatus", mock.Anything, zkMsg2.MessageID.String()).Return([]types.TransactionStatus{}, -1, nil) + }, + }, + { + name: "snooze message when inflight", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{zkMsg1}, + inflight: createInflight(zkMsg1), + inflightAggregateValue: zkMsg1.FeeTokenAmount, + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, + expectedStates: []messageExecStatus{ + newMessageExecState(zkMsg1.SequenceNumber, zkMsg1.MessageID, SkippedInflight), + }, + skipGasPriceEstimator: true, + }, + { + name: "snooze when not inflight but txm returns error", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{zkMsg1}, + inflight: []InflightInternalExecutionReport{}, + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, + expectedStates: []messageExecStatus{ + newMessageExecState(zkMsg1.SequenceNumber, zkMsg1.MessageID, TXMCheckError), + }, + statuschecker: func(m *mockstatuschecker.CCIPTransactionStatusChecker) { + m.Mock = mock.Mock{} // reset mock + m.On("CheckMessageStatus", mock.Anything, zkMsg1.MessageID.String()).Return([]types.TransactionStatus{}, -1, errors.New("dummy txm error")) + }, + skipGasPriceEstimator: true, + }, + { + name: "snooze when not inflight but txm returns fatal status", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{zkMsg1}, + inflight: []InflightInternalExecutionReport{}, + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, + expectedStates: []messageExecStatus{ + newMessageExecState(zkMsg1.SequenceNumber, zkMsg1.MessageID, TXMFatalStatus), + }, + statuschecker: func(m *mockstatuschecker.CCIPTransactionStatusChecker) { + m.Mock = mock.Mock{} // reset mock + m.On("CheckMessageStatus", mock.Anything, zkMsg1.MessageID.String()).Return([]types.TransactionStatus{types.Unconfirmed, types.Failed, types.Fatal}, 2, nil) + }, + skipGasPriceEstimator: true, + }, + { + name: "snooze messages when inflight but batch valid messages", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{zkMsg1, zkMsg2, zkMsg3, zkMsg4}, + inflight: createInflight(zkMsg1, zkMsg2), + inflightAggregateValue: big.NewInt(0), + tokenLimit: big.NewInt(0), + destGasPrice: big.NewInt(10), + srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, + dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, + offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, + expectedSeqNrs: []ccip.ObservedMessage{{SeqNr: zkMsg3.SequenceNumber}}, + expectedStates: []messageExecStatus{ + newMessageExecState(zkMsg1.SequenceNumber, zkMsg1.MessageID, SkippedInflight), + newMessageExecState(zkMsg2.SequenceNumber, zkMsg2.MessageID, SkippedInflight), + newMessageExecState(zkMsg3.SequenceNumber, zkMsg3.MessageID, AddedToBatch), + }, + statuschecker: func(m *mockstatuschecker.CCIPTransactionStatusChecker) { + m.Mock = mock.Mock{} // reset mock + m.On("CheckMessageStatus", mock.Anything, zkMsg3.MessageID.String()).Return([]types.TransactionStatus{}, -1, nil) + }, + skipGasPriceEstimator: false, + }, + } + + t.Run("BestEffortBatchingStrategy", func(t *testing.T) { + strategy := &BestEffortBatchingStrategy{} + runBatchingStrategyTests(t, strategy, 1_000_000, append(testCases, bestEffortTestCases...)) + }) + + t.Run("ZKOverflowBatchingStrategy", func(t *testing.T) { + mockedStatusChecker := mockstatuschecker.NewCCIPTransactionStatusChecker(t) + strategy := &ZKOverflowBatchingStrategy{ + statuschecker: mockedStatusChecker, + } + runBatchingStrategyTests(t, strategy, 1_000_000, append(testCases, specificZkOverflowTestCases...)) + }) +} + +// Function to set up and run tests for a given batching strategy +func runBatchingStrategyTests(t *testing.T, strategy BatchingStrategy, availableGas uint64, testCases []testCase) { + destNative := ccipcalc.HexToAddress("0xb") + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + lggr := logger.TestLogger(t) + + gasPriceEstimator := prices.NewMockGasPriceEstimatorExec(t) + if !tc.skipGasPriceEstimator { + if tc.expectedSeqNrs != nil { + gasPriceEstimator.On("EstimateMsgCostUSD", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(big.NewInt(0), nil) + } else { + gasPriceEstimator.On("EstimateMsgCostUSD", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(big.NewInt(0), errors.New("error")) + } + } + + // default case for ZKOverflowBatchingStrategy + if strategyType := reflect.TypeOf(strategy); tc.statuschecker == nil && strategyType == reflect.TypeOf(&ZKOverflowBatchingStrategy{}) { + strategy.(*ZKOverflowBatchingStrategy).statuschecker.(*mockstatuschecker.CCIPTransactionStatusChecker).On("CheckMessageStatus", mock.Anything, mock.Anything).Return([]types.TransactionStatus{}, -1, nil) + } + + // Mock calls to TXM + if tc.statuschecker != nil { + tc.statuschecker(strategy.(*ZKOverflowBatchingStrategy).statuschecker.(*mockstatuschecker.CCIPTransactionStatusChecker)) + } + + batchContext := &BatchContext{ + report: commitReportWithSendRequests{sendRequestsWithMeta: tc.reqs}, + inflight: tc.inflight, + inflightAggregateValue: tc.inflightAggregateValue, + lggr: lggr, + availableDataLen: MaxDataLenPerBatch, + availableGas: availableGas, + expectedNonces: make(map[cciptypes.Address]uint64), + sendersNonce: tc.offRampNoncesBySender, + sourceTokenPricesUSD: tc.srcPrices, + destTokenPricesUSD: tc.dstPrices, + gasPrice: tc.destGasPrice, + sourceToDestToken: tc.srcToDestTokens, + aggregateTokenLimit: tc.tokenLimit, + tokenDataRemainingDuration: 5 * time.Second, + tokenDataWorker: tokendata.NewBackgroundWorker(map[cciptypes.Address]tokendata.Reader{}, 10, 5*time.Second, time.Hour), + gasPriceEstimator: gasPriceEstimator, + destWrappedNative: destNative, + offchainConfig: cciptypes.ExecOffchainConfig{ + DestOptimisticConfirmations: 1, + BatchGasLimit: 300_000, + RelativeBoostPerWaitHour: 1, + }, + } + + seqNrs, execStates := strategy.BuildBatch(context.Background(), batchContext) + + runAssertions(t, tc, seqNrs, execStates) + }) + } +} + +// Utility function to run common assertions +func runAssertions(t *testing.T, tc testCase, seqNrs []ccip.ObservedMessage, execStates []messageExecStatus) { + if tc.expectedSeqNrs == nil { + assert.Len(t, seqNrs, 0) + } else { + assert.Equal(t, tc.expectedSeqNrs, seqNrs) + } + + if tc.expectedStates == nil { + assert.Len(t, execStates, 0) + } else { + assert.Equal(t, tc.expectedStates, execStates) + } +} + +func createTestMessage(seqNr uint64, sender cciptypes.Address, nonce uint64, feeToken cciptypes.Address, feeAmount *big.Int, executed bool, data []byte) cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta { + return cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ + EVM2EVMMessage: cciptypes.EVM2EVMMessage{ + SequenceNumber: seqNr, + FeeTokenAmount: feeAmount, + Sender: sender, + Nonce: nonce, + GasLimit: big.NewInt(1), + Strict: false, + Receiver: "", + Data: data, + TokenAmounts: nil, + FeeToken: feeToken, + MessageID: generateMessageIDFromInt(seqNr), + }, + BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), + Executed: executed, + } +} + +func generateMessageIDFromInt(input uint64) [32]byte { + var messageID [32]byte + binary.LittleEndian.PutUint32(messageID[:], uint32(input)) + return messageID +} + +func createInflight(msgs ...cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta) []InflightInternalExecutionReport { + reports := make([]InflightInternalExecutionReport, len(msgs)) + + for i, msg := range msgs { + reports[i] = InflightInternalExecutionReport{ + messages: []cciptypes.EVM2EVMMessage{msg.EVM2EVMMessage}, + createdAt: msg.BlockTimestamp, + } + } + + return reports +} diff --git a/core/services/ocr2/plugins/ccip/ccipexec/factory.go b/core/services/ocr2/plugins/ccip/ccipexec/factory.go index 1a18793a83..7cc9111f37 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/factory.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/factory.go @@ -111,6 +111,11 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPluginFn(config types.Rep return reportingPluginAndInfo{}, fmt.Errorf("get onchain config from offramp: %w", err) } + batchingStrategy, err := NewBatchingStrategy(offchainConfig.BatchingStrategyID, rf.config.txmStatusChecker) + if err != nil { + return reportingPluginAndInfo{}, fmt.Errorf("get batching strategy: %w", err) + } + msgVisibilityInterval := offchainConfig.MessageVisibilityInterval.Duration() if msgVisibilityInterval.Seconds() == 0 { rf.config.lggr.Info("MessageVisibilityInterval not set, falling back to PermissionLessExecutionThreshold") @@ -139,6 +144,7 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPluginFn(config types.Rep commitRootsCache: cache.NewCommitRootsCache(lggr, msgVisibilityInterval, offchainConfig.RootSnoozeTime.Duration()), metricsCollector: rf.config.metricsCollector, chainHealthcheck: rf.config.chainHealthcheck, + batchingStrategy: batchingStrategy, } pluginInfo := types.ReportingPluginInfo{ diff --git a/core/services/ocr2/plugins/ccip/ccipexec/initializers.go b/core/services/ocr2/plugins/ccip/ccipexec/initializers.go index dfd3d287af..7826f6058f 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/initializers.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/initializers.go @@ -20,6 +20,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcalc" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/statuschecker" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -165,6 +166,7 @@ func NewExecServices(ctx context.Context, lggr logger.Logger, jb job.Job, srcPro metricsCollector: metricsCollector, chainHealthcheck: chainHealthcheck, newReportingPluginRetryConfig: defaultNewReportingPluginRetryConfig, + txmStatusChecker: statuschecker.NewTxmStatusChecker(dstProvider.GetTransactionStatus), }) argsNoPlugin.ReportingPluginFactory = promwrapper.NewPromFactory(wrappedPluginFactory, "CCIPExecution", jb.OCR2OracleSpec.Relay, big.NewInt(0).SetInt64(dstChainID)) diff --git a/core/services/ocr2/plugins/ccip/ccipexec/ocr2.go b/core/services/ocr2/plugins/ccip/ccipexec/ocr2.go index 3f594a423b..51c8c4e305 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/ocr2.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/ocr2.go @@ -17,7 +17,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/hashutil" cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" - "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache" @@ -27,6 +26,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/ccipdataprovider" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/prices" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/statuschecker" ) const ( @@ -63,6 +63,7 @@ type ExecutionPluginStaticConfig struct { metricsCollector ccip.PluginMetricsCollector chainHealthcheck cache.ChainHealthcheck newReportingPluginRetryConfig ccipdata.RetryConfig + txmStatusChecker statuschecker.CCIPTransactionStatusChecker } type ExecutionReportingPlugin struct { @@ -72,6 +73,8 @@ type ExecutionReportingPlugin struct { offchainConfig cciptypes.ExecOffchainConfig tokenDataWorker tokendata.Worker metricsCollector ccip.PluginMetricsCollector + batchingStrategy BatchingStrategy + // Source gasPriceEstimator prices.GasPriceEstimatorExec sourcePriceRegistry ccipdata.PriceRegistryReader @@ -79,8 +82,8 @@ type ExecutionReportingPlugin struct { sourcePriceRegistryLock sync.RWMutex sourceWrappedNativeToken cciptypes.Address onRampReader ccipdata.OnRampReader - // Dest + // Dest commitStoreReader ccipdata.CommitStoreReader destPriceRegistry ccipdata.PriceRegistryReader destWrappedNative cciptypes.Address @@ -206,17 +209,11 @@ func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context return nil, err } - inflightAggregateValue, err := getInflightAggregateRateLimit(lggr, inflight, tokenExecData.destTokenPrices, tokenExecData.sourceToDestTokens) - if err != nil { - lggr.Errorw("Unexpected error computing inflight values", "err", err) - return []ccip.ObservedMessage{}, nil - } - batch, msgExecStates := r.buildBatch( ctx, + inflight, rootLggr, rep, - inflightAggregateValue, tokenExecData.rateLimiterTokenBucket.Tokens, tokenExecData.sourceTokenPrices, tokenExecData.destTokenPrices, @@ -257,9 +254,9 @@ func (r *ExecutionReportingPlugin) getExecutedSeqNrsInRange(ctx context.Context, // profitability of execution. func (r *ExecutionReportingPlugin) buildBatch( ctx context.Context, + inflight []InflightInternalExecutionReport, lggr logger.Logger, report commitReportWithSendRequests, - inflightAggregateValue *big.Int, aggregateTokenLimit *big.Int, sourceTokenPricesUSD map[cciptypes.Address]*big.Int, destTokenPricesUSD map[cciptypes.Address]*big.Int, @@ -277,174 +274,34 @@ func (r *ExecutionReportingPlugin) buildBatch( return []ccip.ObservedMessage{}, []messageExecStatus{} } - availableGas := uint64(r.offchainConfig.BatchGasLimit) - expectedNonces := make(map[cciptypes.Address]uint64) - availableDataLen := MaxDataLenPerBatch - tokenDataRemainingDuration := MaximumAllowedTokenDataWaitTimePerBatch - batchBuilder := newBatchBuildContainer(len(report.sendRequestsWithMeta)) - - for _, msg := range report.sendRequestsWithMeta { - msgLggr := lggr.With("messageID", hexutil.Encode(msg.MessageID[:]), "seqNr", msg.SequenceNumber) - - if msg.Executed { - msgLggr.Infow("Skipping message - already executed") - batchBuilder.skip(msg, AlreadyExecuted) - continue - } - - if len(msg.Data) > availableDataLen { - msgLggr.Infow("Skipping message - insufficient remaining batch data length", "msgDataLen", len(msg.Data), "availableBatchDataLen", availableDataLen) - batchBuilder.skip(msg, InsufficientRemainingBatchDataLength) - continue - } - - messageMaxGas, err1 := calculateMessageMaxGas( - msg.GasLimit, - len(report.sendRequestsWithMeta), - len(msg.Data), - len(msg.TokenAmounts), - ) - if err1 != nil { - msgLggr.Errorw("Skipping message - message max gas calculation error", "err", err1) - batchBuilder.skip(msg, MessageMaxGasCalcError) - continue - } - - // Check sufficient gas in batch - if availableGas < messageMaxGas { - msgLggr.Infow("Skipping message - insufficient remaining batch gas limit", "availableGas", availableGas, "messageMaxGas", messageMaxGas) - batchBuilder.skip(msg, InsufficientRemainingBatchGas) - continue - } - - if _, ok := expectedNonces[msg.Sender]; !ok { - nonce, ok1 := sendersNonce[msg.Sender] - if !ok1 { - msgLggr.Errorw("Skipping message - missing nonce", "sender", msg.Sender) - batchBuilder.skip(msg, MissingNonce) - continue - } - expectedNonces[msg.Sender] = nonce + 1 - } - - // Check expected nonce is valid for sequenced messages. - // Sequenced messages have non-zero nonces. - if msg.Nonce > 0 && msg.Nonce != expectedNonces[msg.Sender] { - msgLggr.Warnw("Skipping message - invalid nonce", "have", msg.Nonce, "want", expectedNonces[msg.Sender]) - batchBuilder.skip(msg, InvalidNonce) - continue - } - - msgValue, err1 := aggregateTokenValue(lggr, destTokenPricesUSD, sourceToDestToken, msg.TokenAmounts) - if err1 != nil { - msgLggr.Errorw("Skipping message - aggregate token value compute error", "err", err1) - batchBuilder.skip(msg, AggregateTokenValueComputeError) - continue - } - - // if token limit is smaller than message value skip message - if tokensLeft, hasCapacity := hasEnoughTokens(aggregateTokenLimit, msgValue, inflightAggregateValue); !hasCapacity { - msgLggr.Warnw("Skipping message - aggregate token limit exceeded", "aggregateTokenLimit", tokensLeft.String(), "msgValue", msgValue.String()) - batchBuilder.skip(msg, AggregateTokenLimitExceeded) - continue - } - - tokenData, elapsed, err1 := r.getTokenDataWithTimeout(ctx, msg, tokenDataRemainingDuration) - tokenDataRemainingDuration -= elapsed - if err1 != nil { - if errors.Is(err1, tokendata.ErrNotReady) { - msgLggr.Warnw("Skipping message - token data not ready", "err", err1) - batchBuilder.skip(msg, TokenDataNotReady) - continue - } - msgLggr.Errorw("Skipping message - token data fetch error", "err", err1) - batchBuilder.skip(msg, TokenDataFetchError) - continue - } - - dstWrappedNativePrice, exists := destTokenPricesUSD[r.destWrappedNative] - if !exists { - msgLggr.Errorw("Skipping message - token not in destination token prices", "token", r.destWrappedNative) - batchBuilder.skip(msg, TokenNotInDestTokenPrices) - continue - } - - // calculating the source chain fee, dividing by 1e18 for denomination. - // For example: - // FeeToken=link; FeeTokenAmount=1e17 i.e. 0.1 link, price is 6e18 USD/link (1 USD = 1e18), - // availableFee is 1e17*6e18/1e18 = 6e17 = 0.6 USD - sourceFeeTokenPrice, exists := sourceTokenPricesUSD[msg.FeeToken] - if !exists { - msgLggr.Errorw("Skipping message - token not in source token prices", "token", msg.FeeToken) - batchBuilder.skip(msg, TokenNotInSrcTokenPrices) - continue - } - - // Fee boosting - execCostUsd, err1 := r.gasPriceEstimator.EstimateMsgCostUSD(gasPrice, dstWrappedNativePrice, msg) - if err1 != nil { - msgLggr.Errorw("Failed to estimate message cost USD", "err", err1) - return []ccip.ObservedMessage{}, []messageExecStatus{} - } - - availableFee := big.NewInt(0).Mul(msg.FeeTokenAmount, sourceFeeTokenPrice) - availableFee = availableFee.Div(availableFee, big.NewInt(1e18)) - availableFeeUsd := waitBoostedFee(time.Since(msg.BlockTimestamp), availableFee, r.offchainConfig.RelativeBoostPerWaitHour) - if availableFeeUsd.Cmp(execCostUsd) < 0 { - msgLggr.Infow( - "Skipping message - insufficient remaining fee", - "availableFeeUsd", availableFeeUsd, - "execCostUsd", execCostUsd, - "sourceBlockTimestamp", msg.BlockTimestamp, - "waitTime", time.Since(msg.BlockTimestamp), - "boost", r.offchainConfig.RelativeBoostPerWaitHour, - ) - batchBuilder.skip(msg, InsufficientRemainingFee) - continue - } - - availableGas -= messageMaxGas - availableDataLen -= len(msg.Data) - aggregateTokenLimit.Sub(aggregateTokenLimit, msgValue) - expectedNonces[msg.Sender] = msg.Nonce + 1 - batchBuilder.addToBatch(msg, tokenData) - - msgLggr.Infow( - "Message added to execution batch", - "nonce", msg.Nonce, - "sender", msg.Sender, - "value", msgValue, - "availableAggrTokenLimit", aggregateTokenLimit, - "availableGas", availableGas, - "availableDataLen", availableDataLen, - ) + inflightAggregateValue, err := getInflightAggregateRateLimit(lggr, inflight, destTokenPricesUSD, sourceToDestToken) + if err != nil { + lggr.Errorw("Unexpected error computing inflight values", "err", err) + return []ccip.ObservedMessage{}, nil } - return batchBuilder.batch, batchBuilder.statuses -} - -// getTokenDataWithCappedLatency gets the token data for the provided message. -// Stops and returns an error if more than allowedWaitingTime is passed. -func (r *ExecutionReportingPlugin) getTokenDataWithTimeout( - ctx context.Context, - msg cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta, - timeout time.Duration, -) ([][]byte, time.Duration, error) { - if len(msg.TokenAmounts) == 0 { - return nil, 0, nil - } - - ctxTimeout, cf := context.WithTimeout(ctx, timeout) - defer cf() - tStart := time.Now() - tokenData, err := r.tokenDataWorker.GetMsgTokenData(ctxTimeout, msg) - tDur := time.Since(tStart) - return tokenData, tDur, err -} - -func hasEnoughTokens(tokenLimit *big.Int, msgValue *big.Int, inflightValue *big.Int) (*big.Int, bool) { - tokensLeft := big.NewInt(0).Sub(tokenLimit, inflightValue) - return tokensLeft, tokensLeft.Cmp(msgValue) >= 0 + batchCtx := &BatchContext{ + report, + inflight, + inflightAggregateValue, + lggr, + MaxDataLenPerBatch, + uint64(r.offchainConfig.BatchGasLimit), + make(map[cciptypes.Address]uint64), + sendersNonce, + sourceTokenPricesUSD, + destTokenPricesUSD, + gasPrice, + sourceToDestToken, + aggregateTokenLimit, + MaximumAllowedTokenDataWaitTimePerBatch, + r.tokenDataWorker, + r.gasPriceEstimator, + r.destWrappedNative, + r.offchainConfig, + } + + return r.batchingStrategy.BuildBatch(ctx, batchCtx) } func calculateMessageMaxGas(gasLimit *big.Int, numRequests, dataLen, numTokens int) (uint64, error) { @@ -546,20 +403,6 @@ func (r *ExecutionReportingPlugin) getReportsWithSendRequests( return reportsWithSendReqs, nil } -func aggregateTokenValue(lggr logger.Logger, destTokenPricesUSD map[cciptypes.Address]*big.Int, sourceToDest map[cciptypes.Address]cciptypes.Address, tokensAndAmount []cciptypes.TokenAmount) (*big.Int, error) { - sum := big.NewInt(0) - for i := 0; i < len(tokensAndAmount); i++ { - price, ok := destTokenPricesUSD[sourceToDest[tokensAndAmount[i].Token]] - if !ok { - // If we don't have a price for the token, we will assume it's worth 0. - lggr.Infof("No price for token %s, assuming 0", tokensAndAmount[i].Token) - continue - } - sum.Add(sum, new(big.Int).Quo(new(big.Int).Mul(price, tokensAndAmount[i].Amount), big.NewInt(1e18))) - } - return sum, nil -} - // Assumes non-empty report. Messages to execute can span more than one report, but are assumed to be in order of increasing // sequence number. func (r *ExecutionReportingPlugin) buildReport(ctx context.Context, lggr logger.Logger, observedMessages []ccip.ObservedMessage) ([]byte, error) { diff --git a/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go b/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go index bbb59179aa..626448b0f2 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go @@ -218,6 +218,9 @@ func TestExecutionReportingPlugin_Observation(t *testing.T) { p.commitRootsCache = cache.NewCommitRootsCache(logger.TestLogger(t), time.Minute, time.Minute) p.chainHealthcheck = cache.NewChainHealthcheck(p.lggr, mockOnRampReader, commitStoreReader) + bs := &BestEffortBatchingStrategy{} + p.batchingStrategy = bs + _, err = p.Observation(ctx, types.ReportTimestamp{}, types.Query{}) if tc.expErr { assert.Error(t, err) @@ -458,401 +461,6 @@ func TestExecutionReportingPlugin_buildReport(t *testing.T) { assert.LessOrEqual(t, len(execReport), MaxExecutionReportLength, "built execution report length") } -func TestExecutionReportingPlugin_buildBatch(t *testing.T) { - offRamp, _ := testhelpers.NewFakeOffRamp(t) - lggr := logger.TestLogger(t) - - sender1 := ccipcalc.HexToAddress("0xa") - destNative := ccipcalc.HexToAddress("0xb") - srcNative := ccipcalc.HexToAddress("0xc") - - msg1 := cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ - EVM2EVMMessage: cciptypes.EVM2EVMMessage{ - SequenceNumber: 1, - FeeTokenAmount: big.NewInt(1e9), - Sender: sender1, - Nonce: 1, - GasLimit: big.NewInt(1), - Strict: false, - Receiver: "", - Data: nil, - TokenAmounts: nil, - FeeToken: srcNative, - MessageID: [32]byte{}, - }, - BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), - } - - msg2 := msg1 - msg2.Executed = true - - msg3 := msg1 - msg3.Executed = true - msg3.Finalized = true - - msg4 := msg1 - msg4.TokenAmounts = []cciptypes.TokenAmount{ - {Token: srcNative, Amount: big.NewInt(100)}, - } - - msg5 := msg4 - msg5.SequenceNumber = msg5.SequenceNumber + 1 - msg5.Nonce = msg5.Nonce + 1 - - var tt = []struct { - name string - reqs []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta - inflight *big.Int - tokenRateLimitCapacity, destGasPrice *big.Int - srcPrices, dstPrices map[cciptypes.Address]*big.Int - offRampNoncesBySender map[cciptypes.Address]uint64 - srcToDestTokens map[cciptypes.Address]cciptypes.Address - expectedSeqNrs []ccip.ObservedMessage - expectedStates []messageExecStatus - }{ - { - name: "single message no tokens", - reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg1}, - inflight: big.NewInt(0), - tokenRateLimitCapacity: big.NewInt(0), - destGasPrice: big.NewInt(10), - srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, - dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, - offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, - expectedSeqNrs: []ccip.ObservedMessage{{SeqNr: uint64(1)}}, - expectedStates: []messageExecStatus{newMessageExecState(msg1.SequenceNumber, msg1.MessageID, AddedToBatch)}, - }, - { - name: "executed non finalized messages should be skipped", - reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg2}, - inflight: big.NewInt(0), - tokenRateLimitCapacity: big.NewInt(0), - destGasPrice: big.NewInt(10), - srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, - dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, - offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, - expectedStates: []messageExecStatus{newMessageExecState(msg2.SequenceNumber, msg2.MessageID, AlreadyExecuted)}, - }, - { - name: "finalized executed log", - reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg3}, - inflight: big.NewInt(0), - tokenRateLimitCapacity: big.NewInt(0), - destGasPrice: big.NewInt(10), - srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, - dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, - offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, - expectedStates: []messageExecStatus{newMessageExecState(msg3.SequenceNumber, msg3.MessageID, AlreadyExecuted)}, - }, - { - name: "dst token price does not exist", - reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg1}, - inflight: big.NewInt(0), - tokenRateLimitCapacity: big.NewInt(0), - destGasPrice: big.NewInt(10), - srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, - dstPrices: map[cciptypes.Address]*big.Int{}, - offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, - expectedStates: []messageExecStatus{newMessageExecState(msg1.SequenceNumber, msg1.MessageID, TokenNotInDestTokenPrices)}, - }, - { - name: "src token price does not exist", - reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg1}, - inflight: big.NewInt(0), - tokenRateLimitCapacity: big.NewInt(0), - destGasPrice: big.NewInt(10), - srcPrices: map[cciptypes.Address]*big.Int{}, - dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, - offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, - expectedStates: []messageExecStatus{newMessageExecState(msg1.SequenceNumber, msg1.MessageID, TokenNotInSrcTokenPrices)}, - }, - { - name: "message with tokens is not executed if limit is reached", - reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg4}, - inflight: big.NewInt(0), - tokenRateLimitCapacity: big.NewInt(2), - destGasPrice: big.NewInt(10), - srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1e18)}, - dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1e18)}, - srcToDestTokens: map[cciptypes.Address]cciptypes.Address{ - srcNative: destNative, - }, - offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, - expectedStates: []messageExecStatus{newMessageExecState(msg4.SequenceNumber, msg4.MessageID, AggregateTokenLimitExceeded)}, - }, - { - name: "message with tokens is not executed if limit is reached when inflight is full", - reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg5}, - inflight: new(big.Int).Mul(big.NewInt(1e18), big.NewInt(100)), - tokenRateLimitCapacity: big.NewInt(19), - destGasPrice: big.NewInt(10), - srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1e18)}, - dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1e18)}, - srcToDestTokens: map[cciptypes.Address]cciptypes.Address{ - srcNative: destNative, - }, - offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 1}, - expectedStates: []messageExecStatus{newMessageExecState(msg5.SequenceNumber, msg5.MessageID, AggregateTokenLimitExceeded)}, - }, - { - name: "skip when nonce doesn't match chain value", - reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg1}, - inflight: big.NewInt(0), - tokenRateLimitCapacity: big.NewInt(0), - destGasPrice: big.NewInt(10), - srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, - dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, - offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 123}, - expectedStates: []messageExecStatus{newMessageExecState(msg1.SequenceNumber, msg1.MessageID, InvalidNonce)}, - }, - { - name: "skip when nonce not found", - reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg1}, - inflight: big.NewInt(0), - tokenRateLimitCapacity: big.NewInt(0), - destGasPrice: big.NewInt(10), - srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, - dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, - offRampNoncesBySender: map[cciptypes.Address]uint64{}, - expectedStates: []messageExecStatus{newMessageExecState(msg1.SequenceNumber, msg1.MessageID, MissingNonce)}, - }, - { - name: "skip when batch gas limit is reached", - reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ - { - EVM2EVMMessage: cciptypes.EVM2EVMMessage{ - SequenceNumber: 10, - FeeTokenAmount: big.NewInt(1e9), - Sender: sender1, - Nonce: 1, - GasLimit: big.NewInt(1), - Data: bytes.Repeat([]byte{'a'}, 1000), - FeeToken: srcNative, - MessageID: [32]byte{}, - }, - BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), - }, - { - EVM2EVMMessage: cciptypes.EVM2EVMMessage{ - SequenceNumber: 11, - FeeTokenAmount: big.NewInt(1e9), - Sender: sender1, - Nonce: 2, - GasLimit: big.NewInt(math.MaxInt64), - Data: bytes.Repeat([]byte{'a'}, 1000), - FeeToken: srcNative, - MessageID: [32]byte{}, - }, - BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), - }, - { - EVM2EVMMessage: cciptypes.EVM2EVMMessage{ - SequenceNumber: 12, - FeeTokenAmount: big.NewInt(1e9), - Sender: sender1, - Nonce: 3, - GasLimit: big.NewInt(1), - Data: bytes.Repeat([]byte{'a'}, 1000), - FeeToken: srcNative, - MessageID: [32]byte{}, - }, - BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), - }, - }, - inflight: big.NewInt(0), - tokenRateLimitCapacity: big.NewInt(0), - destGasPrice: big.NewInt(10), - srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, - dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, - offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, - expectedSeqNrs: []ccip.ObservedMessage{{SeqNr: uint64(10)}}, - expectedStates: []messageExecStatus{ - newMessageExecState(10, [32]byte{}, AddedToBatch), - newMessageExecState(11, [32]byte{}, InsufficientRemainingBatchGas), - newMessageExecState(12, [32]byte{}, InvalidNonce), - }, - }, - { - name: "some messages skipped after hitting max batch data len", - reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ - { - EVM2EVMMessage: cciptypes.EVM2EVMMessage{ - SequenceNumber: 10, - FeeTokenAmount: big.NewInt(1e9), - Sender: sender1, - Nonce: 1, - GasLimit: big.NewInt(1), - Data: bytes.Repeat([]byte{'a'}, 1000), - FeeToken: srcNative, - MessageID: [32]byte{}, - }, - BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), - }, - { - EVM2EVMMessage: cciptypes.EVM2EVMMessage{ - SequenceNumber: 11, - FeeTokenAmount: big.NewInt(1e9), - Sender: sender1, - Nonce: 2, - GasLimit: big.NewInt(1), - Data: bytes.Repeat([]byte{'a'}, MaxDataLenPerBatch-500), // skipped from batch - FeeToken: srcNative, - MessageID: [32]byte{}, - }, - BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), - }, - { - EVM2EVMMessage: cciptypes.EVM2EVMMessage{ - SequenceNumber: 12, - FeeTokenAmount: big.NewInt(1e9), - Sender: sender1, - Nonce: 3, - GasLimit: big.NewInt(1), - Data: bytes.Repeat([]byte{'a'}, 1000), - FeeToken: srcNative, - MessageID: [32]byte{}, - }, - BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), - }, - }, - inflight: big.NewInt(0), - tokenRateLimitCapacity: big.NewInt(0), - destGasPrice: big.NewInt(10), - srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, - dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, - offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, - expectedSeqNrs: []ccip.ObservedMessage{{SeqNr: uint64(10)}}, - expectedStates: []messageExecStatus{ - newMessageExecState(10, [32]byte{}, AddedToBatch), - newMessageExecState(11, [32]byte{}, InsufficientRemainingBatchDataLength), - newMessageExecState(12, [32]byte{}, InvalidNonce), - }, - }, - { - name: "unordered messages", - reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ - { - EVM2EVMMessage: cciptypes.EVM2EVMMessage{ - SequenceNumber: 10, - FeeTokenAmount: big.NewInt(1e9), - Sender: sender1, - Nonce: 0, - GasLimit: big.NewInt(1), - Data: bytes.Repeat([]byte{'a'}, 1000), - FeeToken: srcNative, - MessageID: [32]byte{}, - }, - BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), - }, - }, - inflight: big.NewInt(0), - tokenRateLimitCapacity: big.NewInt(0), - destGasPrice: big.NewInt(10), - srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, - dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, - offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0}, - expectedSeqNrs: []ccip.ObservedMessage{{SeqNr: uint64(10)}}, - expectedStates: []messageExecStatus{ - newMessageExecState(10, [32]byte{}, AddedToBatch), - }, - }, - { - name: "unordered messages not blocked by nonce", - reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ - { - EVM2EVMMessage: cciptypes.EVM2EVMMessage{ - SequenceNumber: 9, - FeeTokenAmount: big.NewInt(1e9), - Sender: sender1, - Nonce: 5, - GasLimit: big.NewInt(1), - Data: bytes.Repeat([]byte{'a'}, 1000), - FeeToken: srcNative, - MessageID: [32]byte{}, - }, - BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), - }, - { - EVM2EVMMessage: cciptypes.EVM2EVMMessage{ - SequenceNumber: 10, - FeeTokenAmount: big.NewInt(1e9), - Sender: sender1, - Nonce: 0, - GasLimit: big.NewInt(1), - Data: bytes.Repeat([]byte{'a'}, 1000), - FeeToken: srcNative, - MessageID: [32]byte{}, - }, - BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), - }, - }, - inflight: big.NewInt(0), - tokenRateLimitCapacity: big.NewInt(0), - destGasPrice: big.NewInt(10), - srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, - dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)}, - offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 3}, - expectedSeqNrs: []ccip.ObservedMessage{{SeqNr: uint64(10)}}, - expectedStates: []messageExecStatus{ - newMessageExecState(9, [32]byte{}, InvalidNonce), - newMessageExecState(10, [32]byte{}, AddedToBatch), - }, - }, - } - - for _, tc := range tt { - tc := tc - t.Run(tc.name, func(t *testing.T) { - offRamp.SetSenderNonces(tc.offRampNoncesBySender) - - gasPriceEstimator := prices.NewMockGasPriceEstimatorExec(t) - if tc.expectedSeqNrs != nil { - gasPriceEstimator.On("EstimateMsgCostUSD", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(big.NewInt(0), nil) - } - - // Mock calls to reader. - mockOffRampReader := ccipdatamocks.NewOffRampReader(t) - mockOffRampReader.On("ListSenderNonces", mock.Anything, mock.Anything).Return(tc.offRampNoncesBySender, nil).Maybe() - - plugin := ExecutionReportingPlugin{ - tokenDataWorker: tokendata.NewBackgroundWorker(map[cciptypes.Address]tokendata.Reader{}, 10, 5*time.Second, time.Hour), - offRampReader: mockOffRampReader, - destWrappedNative: destNative, - offchainConfig: cciptypes.ExecOffchainConfig{ - DestOptimisticConfirmations: 1, - BatchGasLimit: 500_000, - RelativeBoostPerWaitHour: 1, - }, - lggr: logger.TestLogger(t), - gasPriceEstimator: gasPriceEstimator, - } - - seqNrs, execStates := plugin.buildBatch( - context.Background(), - lggr, - commitReportWithSendRequests{sendRequestsWithMeta: tc.reqs}, - tc.inflight, - tc.tokenRateLimitCapacity, - tc.srcPrices, - tc.dstPrices, - tc.destGasPrice, - tc.srcToDestTokens, - ) - if tc.expectedSeqNrs == nil { - assert.Len(t, seqNrs, 0) - } else { - assert.Equal(t, tc.expectedSeqNrs, seqNrs) - } - - if tc.expectedStates == nil { - assert.Len(t, execStates, 0) - } else { - assert.Equal(t, tc.expectedStates, execStates) - } - }) - } -} - func TestExecutionReportingPlugin_getReportsWithSendRequests(t *testing.T) { testCases := []struct { name string @@ -985,58 +593,6 @@ func TestExecutionReportingPlugin_getReportsWithSendRequests(t *testing.T) { } } -type delayedTokenDataWorker struct { - delay time.Duration - tokendata.Worker -} - -func (m delayedTokenDataWorker) GetMsgTokenData(ctx context.Context, msg cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta) ([][]byte, error) { - time.Sleep(m.delay) - return nil, ctx.Err() -} - -func TestExecutionReportingPlugin_getTokenDataWithCappedLatency(t *testing.T) { - testCases := []struct { - name string - allowedWaitingTime time.Duration - workerLatency time.Duration - expErr bool - }{ - { - name: "happy flow", - allowedWaitingTime: 10 * time.Millisecond, - workerLatency: time.Nanosecond, - expErr: false, - }, - { - name: "worker takes long to reply", - allowedWaitingTime: 10 * time.Millisecond, - workerLatency: 20 * time.Millisecond, - expErr: true, - }, - } - - ctx := testutils.Context(t) - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - p := &ExecutionReportingPlugin{} - p.tokenDataWorker = delayedTokenDataWorker{delay: tc.workerLatency} - - msg := cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ - EVM2EVMMessage: cciptypes.EVM2EVMMessage{TokenAmounts: make([]cciptypes.TokenAmount, 1)}, - } - - _, _, err := p.getTokenDataWithTimeout(ctx, msg, tc.allowedWaitingTime) - if tc.expErr { - assert.Error(t, err) - return - } - assert.NoError(t, err) - }) - } -} - func Test_calculateObservedMessagesConsensus(t *testing.T) { type args struct { observations []ccip.ExecutionObservation diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp.go index 410b91328a..137cbaf451 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp.go @@ -109,6 +109,8 @@ type ExecOffchainConfig struct { InflightCacheExpiry config.Duration // See [ccipdata.ExecOffchainConfig.RootSnoozeTime] RootSnoozeTime config.Duration + // See [ccipdata.ExecOffchainConfig.BatchingStrategyID] + BatchingStrategyID uint32 // See [ccipdata.ExecOffchainConfig.MessageVisibilityInterval] MessageVisibilityInterval config.Duration } @@ -416,6 +418,7 @@ func (o *OffRamp) ChangeConfig(ctx context.Context, onchainConfigBytes []byte, o InflightCacheExpiry: offchainConfigParsed.InflightCacheExpiry, RootSnoozeTime: offchainConfigParsed.RootSnoozeTime, MessageVisibilityInterval: offchainConfigParsed.MessageVisibilityInterval, + BatchingStrategyID: offchainConfigParsed.BatchingStrategyID, } onchainConfig := cciptypes.ExecOnchainConfig{ PermissionLessExecutionThresholdSeconds: time.Second * time.Duration(onchainConfigParsed.PermissionLessExecutionThresholdSeconds), diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp_test.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp_test.go index 657617c34a..44fb6ca063 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp_test.go @@ -94,6 +94,7 @@ func TestExecOffchainConfig100_AllFieldsRequired(t *testing.T) { RelativeBoostPerWaitHour: 0.07, InflightCacheExpiry: *config.MustNewDuration(64 * time.Second), RootSnoozeTime: *config.MustNewDuration(128 * time.Minute), + BatchingStrategyID: 0, } encoded, err := ccipconfig.EncodeOffchainConfig(&cfg) require.NoError(t, err) @@ -115,7 +116,11 @@ func TestExecOffchainConfig100_AllFieldsRequired(t *testing.T) { encodedPartialConfig, err := json.Marshal(partialConfig) require.NoError(t, err) _, err = ccipconfig.DecodeOffchainConfig[ExecOffchainConfig](encodedPartialConfig) - require.ErrorContains(t, err, keyToDelete) + if keyToDelete == "BatchingStrategyID" { + require.NoError(t, err) + } else { + require.ErrorContains(t, err, keyToDelete) + } } } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/offramp.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/offramp.go index 94c9a45e26..fa00894b38 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/offramp.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/offramp.go @@ -94,6 +94,8 @@ type JSONExecOffchainConfig struct { InflightCacheExpiry config.Duration // See [ccipdata.ExecOffchainConfig.RootSnoozeTime] RootSnoozeTime config.Duration + // See [ccipdata.ExecOffchainConfig.BatchingStrategyID] + BatchingStrategyID uint32 // See [ccipdata.ExecOffchainConfig.MessageVisibilityInterval] MessageVisibilityInterval config.Duration } @@ -172,6 +174,7 @@ func (o *OffRamp) ChangeConfig(ctx context.Context, onchainConfigBytes []byte, o InflightCacheExpiry: offchainConfigParsed.InflightCacheExpiry, RootSnoozeTime: offchainConfigParsed.RootSnoozeTime, MessageVisibilityInterval: offchainConfigParsed.MessageVisibilityInterval, + BatchingStrategyID: offchainConfigParsed.BatchingStrategyID, } onchainConfig := cciptypes.ExecOnchainConfig{ PermissionLessExecutionThresholdSeconds: time.Second * time.Duration(onchainConfigParsed.PermissionLessExecutionThresholdSeconds), diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/offramp_test.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/offramp_test.go index df7f7ead59..7d174d5db7 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/offramp_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/offramp_test.go @@ -26,6 +26,7 @@ func TestExecOffchainConfig120_Encoding(t *testing.T) { RelativeBoostPerWaitHour: 0.07, InflightCacheExpiry: *config.MustNewDuration(64 * time.Second), RootSnoozeTime: *config.MustNewDuration(128 * time.Minute), + BatchingStrategyID: 0, } tests := []struct { @@ -79,6 +80,12 @@ func TestExecOffchainConfig120_Encoding(t *testing.T) { }), errPattern: "RootSnoozeTime", }, + { + name: "must set BatchingStrategyId", + want: modifyCopy(validConfig, func(c *JSONExecOffchainConfig) { + c.BatchingStrategyID = 1 + }), + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { @@ -125,18 +132,42 @@ func TestExecOffchainConfig120_ParseRawJson(t *testing.T) { "RootSnoozeTime": "128m" }`), }, + { + name: "with BatchingStrategyId", + config: []byte(`{ + "DestOptimisticConfirmations": 6, + "BatchGasLimit": 5000000, + "RelativeBoostPerWaitHour": 0.07, + "InflightCacheExpiry": "64s", + "RootSnoozeTime": "128m", + "BatchingStrategyId": 1 + }`), + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { decoded, err := ccipconfig.DecodeOffchainConfig[JSONExecOffchainConfig](tc.config) require.NoError(t, err) - require.Equal(t, JSONExecOffchainConfig{ - DestOptimisticConfirmations: 6, - BatchGasLimit: 5_000_000, - RelativeBoostPerWaitHour: 0.07, - InflightCacheExpiry: *config.MustNewDuration(64 * time.Second), - RootSnoozeTime: *config.MustNewDuration(128 * time.Minute), - }, decoded) + + if tc.name == "with BatchingStrategyId" { + require.Equal(t, JSONExecOffchainConfig{ + DestOptimisticConfirmations: 6, + BatchGasLimit: 5_000_000, + RelativeBoostPerWaitHour: 0.07, + InflightCacheExpiry: *config.MustNewDuration(64 * time.Second), + RootSnoozeTime: *config.MustNewDuration(128 * time.Minute), + BatchingStrategyID: 1, // Actual value + }, decoded) + } else { + require.Equal(t, JSONExecOffchainConfig{ + DestOptimisticConfirmations: 6, + BatchGasLimit: 5_000_000, + RelativeBoostPerWaitHour: 0.07, + InflightCacheExpiry: *config.MustNewDuration(64 * time.Second), + RootSnoozeTime: *config.MustNewDuration(128 * time.Minute), + BatchingStrategyID: 0, // Default + }, decoded) + } }) } } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_5_0/offramp.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_5_0/offramp.go index 89d3047381..cac61c6787 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_5_0/offramp.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_5_0/offramp.go @@ -157,6 +157,7 @@ func (o *OffRamp) ChangeConfig(ctx context.Context, onchainConfigBytes []byte, o InflightCacheExpiry: offchainConfigParsed.InflightCacheExpiry, RootSnoozeTime: offchainConfigParsed.RootSnoozeTime, MessageVisibilityInterval: offchainConfigParsed.MessageVisibilityInterval, + BatchingStrategyID: offchainConfigParsed.BatchingStrategyID, } onchainConfig := cciptypes.ExecOnchainConfig{ PermissionLessExecutionThresholdSeconds: time.Second * time.Duration(onchainConfigParsed.PermissionLessExecutionThresholdSeconds), diff --git a/core/services/ocr2/plugins/ccip/transmitter/transmitter.go b/core/services/ocr2/plugins/ccip/transmitter/transmitter.go new file mode 100644 index 0000000000..3e2962b33a --- /dev/null +++ b/core/services/ocr2/plugins/ccip/transmitter/transmitter.go @@ -0,0 +1,143 @@ +package transmitter + +import ( + "context" + "fmt" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" + + commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" + statuschecker "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/statuschecker" +) + +type roundRobinKeystore interface { + GetRoundRobinAddress(ctx context.Context, chainID *big.Int, addresses ...common.Address) (address common.Address, err error) +} + +type txManager interface { + CreateTransaction(ctx context.Context, txRequest txmgr.TxRequest) (tx txmgr.Tx, err error) + GetTransactionStatus(ctx context.Context, transactionID string) (state commontypes.TransactionStatus, err error) +} + +type Transmitter interface { + CreateEthTransaction(ctx context.Context, toAddress common.Address, payload []byte, txMeta *txmgr.TxMeta) error + FromAddress() common.Address +} + +type transmitter struct { + txm txManager + fromAddresses []common.Address + gasLimit uint64 + effectiveTransmitterAddress common.Address + strategy types.TxStrategy + checker txmgr.TransmitCheckerSpec + chainID *big.Int + keystore roundRobinKeystore + statuschecker statuschecker.CCIPTransactionStatusChecker // Used for CCIP's idempotency key generation +} + +// NewTransmitter creates a new eth transmitter +func NewTransmitter( + txm txManager, + fromAddresses []common.Address, + gasLimit uint64, + effectiveTransmitterAddress common.Address, + strategy types.TxStrategy, + checker txmgr.TransmitCheckerSpec, + chainID *big.Int, + keystore roundRobinKeystore, +) (Transmitter, error) { + // Ensure that a keystore is provided. + if keystore == nil { + return nil, errors.New("nil keystore provided to transmitter") + } + + return &transmitter{ + txm: txm, + fromAddresses: fromAddresses, + gasLimit: gasLimit, + effectiveTransmitterAddress: effectiveTransmitterAddress, + strategy: strategy, + checker: checker, + chainID: chainID, + keystore: keystore, + }, nil +} + +func NewTransmitterWithStatusChecker( + txm txManager, + fromAddresses []common.Address, + gasLimit uint64, + effectiveTransmitterAddress common.Address, + strategy types.TxStrategy, + checker txmgr.TransmitCheckerSpec, + chainID *big.Int, + keystore roundRobinKeystore, +) (Transmitter, error) { + t, err := NewTransmitter(txm, fromAddresses, gasLimit, effectiveTransmitterAddress, strategy, checker, chainID, keystore) + + if err != nil { + return nil, err + } + + transmitter, ok := t.(*transmitter) + if !ok { + return nil, errors.New("failed to type assert Transmitter to *transmitter") + } + transmitter.statuschecker = statuschecker.NewTxmStatusChecker(txm.GetTransactionStatus) + + return transmitter, nil +} + +func (t *transmitter) CreateEthTransaction(ctx context.Context, toAddress common.Address, payload []byte, txMeta *txmgr.TxMeta) error { + roundRobinFromAddress, err := t.keystore.GetRoundRobinAddress(ctx, t.chainID, t.fromAddresses...) + if err != nil { + return fmt.Errorf("skipped OCR transmission, error getting round-robin address: %w", err) + } + + var idempotencyKey *string + + // Define idempotency key for CCIP Execution Plugin + if len(txMeta.MessageIDs) == 1 && t.statuschecker != nil { + messageId := txMeta.MessageIDs[0] + _, count, err1 := t.statuschecker.CheckMessageStatus(ctx, messageId) + + if err1 != nil { + return errors.Wrap(err, "skipped OCR transmission, error getting message status") + } + idempotencyKey = func() *string { + s := fmt.Sprintf("%s-%d", messageId, count+1) + return &s + }() + } + + _, err = t.txm.CreateTransaction(ctx, txmgr.TxRequest{ + IdempotencyKey: idempotencyKey, + FromAddress: roundRobinFromAddress, + ToAddress: toAddress, + EncodedPayload: payload, + FeeLimit: t.gasLimit, + ForwarderAddress: t.forwarderAddress(), + Strategy: t.strategy, + Checker: t.checker, + Meta: txMeta, + }) + return errors.Wrap(err, "skipped OCR transmission") +} + +func (t *transmitter) FromAddress() common.Address { + return t.effectiveTransmitterAddress +} + +func (t *transmitter) forwarderAddress() common.Address { + for _, a := range t.fromAddresses { + if a == t.effectiveTransmitterAddress { + return common.Address{} + } + } + return t.effectiveTransmitterAddress +} diff --git a/core/services/ocr2/plugins/ccip/transmitter/transmitter_test.go b/core/services/ocr2/plugins/ccip/transmitter/transmitter_test.go new file mode 100644 index 0000000000..d177f1baa5 --- /dev/null +++ b/core/services/ocr2/plugins/ccip/transmitter/transmitter_test.go @@ -0,0 +1,282 @@ +package transmitter + +import ( + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" + + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + "github.com/smartcontractkit/chainlink-common/pkg/types" + commontxmmocks "github.com/smartcontractkit/chainlink/v2/common/txmgr/types/mocks" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" + txmmocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr/mocks" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" + "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" + + "github.com/smartcontractkit/chainlink/v2/core/utils" +) + +var ( + FixtureChainID = *testutils.FixtureChainID + Password = testutils.Password +) + +func newMockTxStrategy(t *testing.T) *commontxmmocks.TxStrategy { + return commontxmmocks.NewTxStrategy(t) +} + +func Test_DefaultTransmitter_CreateEthTransaction(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + ethKeyStore := NewKeyStore(t, db).Eth() + + _, fromAddress := MustInsertRandomKey(t, ethKeyStore) + + gasLimit := uint64(1000) + chainID := big.NewInt(0) + effectiveTransmitterAddress := fromAddress + toAddress := testutils.NewAddress() + payload := []byte{1, 2, 3} + txm := txmmocks.NewMockEvmTxManager(t) + strategy := newMockTxStrategy(t) + + transmitter, err := ocrcommon.NewTransmitter( + txm, + []common.Address{fromAddress}, + gasLimit, + effectiveTransmitterAddress, + strategy, + txmgr.TransmitCheckerSpec{}, + chainID, + ethKeyStore, + ) + require.NoError(t, err) + + txm.On("CreateTransaction", mock.Anything, txmgr.TxRequest{ + FromAddress: fromAddress, + ToAddress: toAddress, + EncodedPayload: payload, + FeeLimit: gasLimit, + ForwarderAddress: common.Address{}, + Meta: nil, + Strategy: strategy, + }).Return(txmgr.Tx{}, nil).Once() + require.NoError(t, transmitter.CreateEthTransaction(testutils.Context(t), toAddress, payload, nil)) +} + +func Test_DefaultTransmitter_Forwarding_Enabled_CreateEthTransaction(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + ethKeyStore := NewKeyStore(t, db).Eth() + + _, fromAddress := MustInsertRandomKey(t, ethKeyStore) + _, fromAddress2 := MustInsertRandomKey(t, ethKeyStore) + + gasLimit := uint64(1000) + chainID := big.NewInt(0) + effectiveTransmitterAddress := common.Address{} + toAddress := testutils.NewAddress() + payload := []byte{1, 2, 3} + txm := txmmocks.NewMockEvmTxManager(t) + strategy := newMockTxStrategy(t) + + transmitter, err := ocrcommon.NewTransmitter( + txm, + []common.Address{fromAddress, fromAddress2}, + gasLimit, + effectiveTransmitterAddress, + strategy, + txmgr.TransmitCheckerSpec{}, + chainID, + ethKeyStore, + ) + require.NoError(t, err) + + txm.On("CreateTransaction", mock.Anything, txmgr.TxRequest{ + FromAddress: fromAddress, + ToAddress: toAddress, + EncodedPayload: payload, + FeeLimit: gasLimit, + ForwarderAddress: common.Address{}, + Meta: nil, + Strategy: strategy, + }).Return(txmgr.Tx{}, nil).Once() + txm.On("CreateTransaction", mock.Anything, txmgr.TxRequest{ + FromAddress: fromAddress2, + ToAddress: toAddress, + EncodedPayload: payload, + FeeLimit: gasLimit, + ForwarderAddress: common.Address{}, + Meta: nil, + Strategy: strategy, + }).Return(txmgr.Tx{}, nil).Once() + require.NoError(t, transmitter.CreateEthTransaction(testutils.Context(t), toAddress, payload, nil)) + require.NoError(t, transmitter.CreateEthTransaction(testutils.Context(t), toAddress, payload, nil)) +} + +func Test_DefaultTransmitter_Forwarding_Enabled_CreateEthTransaction_Round_Robin_Error(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + ethKeyStore := NewKeyStore(t, db).Eth() + + fromAddress := common.Address{} + + gasLimit := uint64(1000) + chainID := big.NewInt(0) + effectiveTransmitterAddress := common.Address{} + toAddress := testutils.NewAddress() + payload := []byte{1, 2, 3} + txm := txmmocks.NewMockEvmTxManager(t) + strategy := newMockTxStrategy(t) + + transmitter, err := ocrcommon.NewTransmitter( + txm, + []common.Address{fromAddress}, + gasLimit, + effectiveTransmitterAddress, + strategy, + txmgr.TransmitCheckerSpec{}, + chainID, + ethKeyStore, + ) + require.NoError(t, err) + require.Error(t, transmitter.CreateEthTransaction(testutils.Context(t), toAddress, payload, nil)) +} + +func Test_DefaultTransmitter_Forwarding_Enabled_CreateEthTransaction_No_Keystore_Error(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + ethKeyStore := NewKeyStore(t, db).Eth() + + _, fromAddress := MustInsertRandomKey(t, ethKeyStore) + _, fromAddress2 := MustInsertRandomKey(t, ethKeyStore) + + gasLimit := uint64(1000) + chainID := big.NewInt(0) + effectiveTransmitterAddress := common.Address{} + txm := txmmocks.NewMockEvmTxManager(t) + strategy := newMockTxStrategy(t) + + _, err := ocrcommon.NewTransmitter( + txm, + []common.Address{fromAddress, fromAddress2}, + gasLimit, + effectiveTransmitterAddress, + strategy, + txmgr.TransmitCheckerSpec{}, + chainID, + nil, + ) + require.Error(t, err) +} + +func Test_Transmitter_With_StatusChecker_CreateEthTransaction(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + ethKeyStore := NewKeyStore(t, db).Eth() + + _, fromAddress := MustInsertRandomKey(t, ethKeyStore) + + gasLimit := uint64(1000) + chainID := big.NewInt(0) + effectiveTransmitterAddress := fromAddress + txm := txmmocks.NewMockEvmTxManager(t) + strategy := newMockTxStrategy(t) + toAddress := testutils.NewAddress() + payload := []byte{1, 2, 3} + idempotencyKey := "1-0" + txMeta := &txmgr.TxMeta{MessageIDs: []string{"1"}} + + transmitter, err := NewTransmitterWithStatusChecker( + txm, + []common.Address{fromAddress}, + gasLimit, + effectiveTransmitterAddress, + strategy, + txmgr.TransmitCheckerSpec{}, + chainID, + ethKeyStore, + ) + require.NoError(t, err) + + // This case is for when the message ID was not found in the status checker + txm.On("GetTransactionStatus", mock.Anything, idempotencyKey).Return(types.Unknown, errors.New("dummy")).Once() + + txm.On("CreateTransaction", mock.Anything, txmgr.TxRequest{ + IdempotencyKey: &idempotencyKey, + FromAddress: fromAddress, + ToAddress: toAddress, + EncodedPayload: payload, + FeeLimit: gasLimit, + ForwarderAddress: common.Address{}, + Meta: txMeta, + Strategy: strategy, + }).Return(txmgr.Tx{}, nil).Once() + + require.NoError(t, transmitter.CreateEthTransaction(testutils.Context(t), toAddress, payload, txMeta)) + txm.AssertExpectations(t) +} + +func NewKeyStore(t testing.TB, ds sqlutil.DataSource) keystore.Master { + ctx := testutils.Context(t) + keystore := keystore.NewInMemory(ds, utils.FastScryptParams, logger.TestLogger(t)) + require.NoError(t, keystore.Unlock(ctx, Password)) + return keystore +} + +type RandomKey struct { + Nonce int64 + Disabled bool + + chainIDs []ubig.Big // nil: Fixture, set empty for none +} + +func (r RandomKey) MustInsert(t testing.TB, keystore keystore.Eth) (ethkey.KeyV2, common.Address) { + ctx := testutils.Context(t) + chainIDs := r.chainIDs + if chainIDs == nil { + chainIDs = []ubig.Big{*ubig.New(&FixtureChainID)} + } + + key := MustGenerateRandomKey(t) + keystore.XXXTestingOnlyAdd(ctx, key) + + for _, cid := range chainIDs { + require.NoError(t, keystore.Add(ctx, key.Address, cid.ToInt())) + require.NoError(t, keystore.Enable(ctx, key.Address, cid.ToInt())) + if r.Disabled { + require.NoError(t, keystore.Disable(ctx, key.Address, cid.ToInt())) + } + } + + return key, key.Address +} + +func MustInsertRandomKey(t testing.TB, keystore keystore.Eth, chainIDs ...ubig.Big) (ethkey.KeyV2, common.Address) { + r := RandomKey{} + if len(chainIDs) > 0 { + r.chainIDs = chainIDs + } + return r.MustInsert(t, keystore) +} + +func MustGenerateRandomKey(t testing.TB) ethkey.KeyV2 { + key, err := ethkey.NewV2() + require.NoError(t, err) + return key +} diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 676dbd54b5..73a3aed4a1 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -17,6 +17,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/ccipcommit" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/ccipexec" ccipconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/config" + cciptransmitter "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/transmitter" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" @@ -829,6 +830,17 @@ func generateTransmitterFrom(ctx context.Context, rargs commontypes.RelayArgs, e configWatcher.chain.ID(), ethKeystore, ) + case commontypes.CCIPExecution: + transmitter, err = cciptransmitter.NewTransmitterWithStatusChecker( + configWatcher.chain.TxManager(), + fromAddresses, + gasLimit, + effectiveTransmitterAddress, + strategy, + checker, + configWatcher.chain.ID(), + ethKeystore, + ) default: transmitter, err = ocrcommon.NewTransmitter( configWatcher.chain.TxManager(), diff --git a/core/services/relay/evm/statuschecker/mocks/ccip_transaction_status_checker.go b/core/services/relay/evm/statuschecker/mocks/ccip_transaction_status_checker.go new file mode 100644 index 0000000000..9bd59ccf4e --- /dev/null +++ b/core/services/relay/evm/statuschecker/mocks/ccip_transaction_status_checker.go @@ -0,0 +1,104 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + + types "github.com/smartcontractkit/chainlink-common/pkg/types" +) + +// CCIPTransactionStatusChecker is an autogenerated mock type for the CCIPTransactionStatusChecker type +type CCIPTransactionStatusChecker struct { + mock.Mock +} + +type CCIPTransactionStatusChecker_Expecter struct { + mock *mock.Mock +} + +func (_m *CCIPTransactionStatusChecker) EXPECT() *CCIPTransactionStatusChecker_Expecter { + return &CCIPTransactionStatusChecker_Expecter{mock: &_m.Mock} +} + +// CheckMessageStatus provides a mock function with given fields: ctx, msgID +func (_m *CCIPTransactionStatusChecker) CheckMessageStatus(ctx context.Context, msgID string) ([]types.TransactionStatus, int, error) { + ret := _m.Called(ctx, msgID) + + if len(ret) == 0 { + panic("no return value specified for CheckMessageStatus") + } + + var r0 []types.TransactionStatus + var r1 int + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, string) ([]types.TransactionStatus, int, error)); ok { + return rf(ctx, msgID) + } + if rf, ok := ret.Get(0).(func(context.Context, string) []types.TransactionStatus); ok { + r0 = rf(ctx, msgID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]types.TransactionStatus) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) int); ok { + r1 = rf(ctx, msgID) + } else { + r1 = ret.Get(1).(int) + } + + if rf, ok := ret.Get(2).(func(context.Context, string) error); ok { + r2 = rf(ctx, msgID) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// CCIPTransactionStatusChecker_CheckMessageStatus_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckMessageStatus' +type CCIPTransactionStatusChecker_CheckMessageStatus_Call struct { + *mock.Call +} + +// CheckMessageStatus is a helper method to define mock.On call +// - ctx context.Context +// - msgID string +func (_e *CCIPTransactionStatusChecker_Expecter) CheckMessageStatus(ctx interface{}, msgID interface{}) *CCIPTransactionStatusChecker_CheckMessageStatus_Call { + return &CCIPTransactionStatusChecker_CheckMessageStatus_Call{Call: _e.mock.On("CheckMessageStatus", ctx, msgID)} +} + +func (_c *CCIPTransactionStatusChecker_CheckMessageStatus_Call) Run(run func(ctx context.Context, msgID string)) *CCIPTransactionStatusChecker_CheckMessageStatus_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *CCIPTransactionStatusChecker_CheckMessageStatus_Call) Return(transactionStatuses []types.TransactionStatus, retryCounter int, err error) *CCIPTransactionStatusChecker_CheckMessageStatus_Call { + _c.Call.Return(transactionStatuses, retryCounter, err) + return _c +} + +func (_c *CCIPTransactionStatusChecker_CheckMessageStatus_Call) RunAndReturn(run func(context.Context, string) ([]types.TransactionStatus, int, error)) *CCIPTransactionStatusChecker_CheckMessageStatus_Call { + _c.Call.Return(run) + return _c +} + +// NewCCIPTransactionStatusChecker creates a new instance of CCIPTransactionStatusChecker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewCCIPTransactionStatusChecker(t interface { + mock.TestingT + Cleanup(func()) +}) *CCIPTransactionStatusChecker { + mock := &CCIPTransactionStatusChecker{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/services/relay/evm/statuschecker/txm_status_checker.go b/core/services/relay/evm/statuschecker/txm_status_checker.go new file mode 100644 index 0000000000..f22e6d78b9 --- /dev/null +++ b/core/services/relay/evm/statuschecker/txm_status_checker.go @@ -0,0 +1,54 @@ +package statuschecker + +import ( + "context" + "fmt" + + "github.com/smartcontractkit/chainlink-common/pkg/types" +) + +// CCIPTransactionStatusChecker is an interface that defines the method for checking the status of a transaction. +// CheckMessageStatus checks the status of a transaction for a given message ID. +// It returns a list of transaction statuses, the retry counter, and an error if any occurred during the process. +// + +type CCIPTransactionStatusChecker interface { + CheckMessageStatus(ctx context.Context, msgID string) (transactionStatuses []types.TransactionStatus, retryCounter int, err error) +} + +type TxmStatusChecker struct { + getTransactionStatus func(ctx context.Context, transactionID string) (types.TransactionStatus, error) +} + +func NewTxmStatusChecker(getTransactionStatus func(ctx context.Context, transactionID string) (types.TransactionStatus, error)) *TxmStatusChecker { + return &TxmStatusChecker{getTransactionStatus: getTransactionStatus} +} + +// CheckMessageStatus checks the status of a message by checking the status of all transactions associated with the message ID. +// It returns a slice of all statuses and the number of transactions found (-1 if none). +// The key will follow the format: -. TXM will be queried for each key until a NotFound error is returned. +// The goal is to find all transactions associated with a message ID and snooze messages if they are fatal in the Execution Plugin. +func (tsc *TxmStatusChecker) CheckMessageStatus(ctx context.Context, msgID string) ([]types.TransactionStatus, int, error) { + var counter int + const maxStatuses = 1000 // Cap the number of statuses to avoid infinite loop + + allStatuses := make([]types.TransactionStatus, 0) + + for { + transactionID := fmt.Sprintf("%s-%d", msgID, counter) + status, err := tsc.getTransactionStatus(ctx, transactionID) + if err != nil && status == types.Unknown { + // If the status is unknown and err not nil, it means the transaction was not found + break + } + allStatuses = append(allStatuses, status) + counter++ + + // Break the loop if the cap is reached + if counter >= maxStatuses { + return allStatuses, counter - 1, fmt.Errorf("maximum number of statuses reached, possible infinite loop") + } + } + + return allStatuses, counter - 1, nil +} diff --git a/core/services/relay/evm/statuschecker/txm_status_checker_test.go b/core/services/relay/evm/statuschecker/txm_status_checker_test.go new file mode 100644 index 0000000000..456d07e7a7 --- /dev/null +++ b/core/services/relay/evm/statuschecker/txm_status_checker_test.go @@ -0,0 +1,103 @@ +package statuschecker + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr/mocks" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" +) + +func Test_CheckMessageStatus(t *testing.T) { + testutils.SkipShort(t, "") + ctx := context.Background() + mockTxManager := mocks.NewMockEvmTxManager(t) + checker := NewTxmStatusChecker(mockTxManager.GetTransactionStatus) + + msgID := "test-message-id" + + // Define test cases + testCases := []struct { + name string + setupMock func() + expectedStatus []types.TransactionStatus + expectedCounter int + expectedError error + }{ + { + name: "No transactions found", + setupMock: func() { + mockTxManager.Mock = mock.Mock{} + mockTxManager.On("GetTransactionStatus", ctx, "test-message-id-0").Return(types.Unknown, errors.New("failed to find transaction with IdempotencyKey test-message-id-0")) + }, + expectedStatus: []types.TransactionStatus{}, + expectedCounter: -1, + expectedError: nil, + }, + { + name: "Single transaction found", + setupMock: func() { + mockTxManager.Mock = mock.Mock{} + mockTxManager.On("GetTransactionStatus", ctx, "test-message-id-0").Return(types.Finalized, nil) + mockTxManager.On("GetTransactionStatus", ctx, "test-message-id-1").Return(types.Unknown, errors.New("failed to find transaction with IdempotencyKey test-message-id-1")) + }, + expectedStatus: []types.TransactionStatus{types.Finalized}, + expectedCounter: 0, + expectedError: nil, + }, + { + name: "Multiple transactions found", + setupMock: func() { + mockTxManager.Mock = mock.Mock{} + mockTxManager.On("GetTransactionStatus", ctx, "test-message-id-0").Return(types.Finalized, nil) + mockTxManager.On("GetTransactionStatus", ctx, "test-message-id-1").Return(types.Failed, nil) + mockTxManager.On("GetTransactionStatus", ctx, "test-message-id-2").Return(types.Unknown, errors.New("failed to find transaction with IdempotencyKey test-message-id-2")) + }, + expectedStatus: []types.TransactionStatus{types.Finalized, types.Failed}, + expectedCounter: 1, + expectedError: nil, + }, + { + name: "Unknown status without nil (in progress)", + setupMock: func() { + mockTxManager.Mock = mock.Mock{} + mockTxManager.On("GetTransactionStatus", ctx, "test-message-id-0").Return(types.Unknown, nil) + mockTxManager.On("GetTransactionStatus", ctx, "test-message-id-1").Return(types.Unknown, errors.New("failed to find transaction with IdempotencyKey test-message-id-1")) + }, + expectedStatus: []types.TransactionStatus{types.Unknown}, + expectedCounter: 0, + expectedError: nil, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + tc.setupMock() + statuses, counter, err := checker.CheckMessageStatus(ctx, msgID) + assert.Equal(t, tc.expectedStatus, statuses) + assert.Equal(t, tc.expectedCounter, counter) + assert.Equal(t, tc.expectedError, err) + mockTxManager.AssertExpectations(t) + }) + } +} + +func Test_FailForMoreThan1000Retries(t *testing.T) { + ctx := context.Background() + mockTxManager := mocks.NewMockEvmTxManager(t) + checker := NewTxmStatusChecker(mockTxManager.GetTransactionStatus) + + for i := 0; i < 1000; i++ { + mockTxManager.On("GetTransactionStatus", ctx, fmt.Sprintf("test-message-id-%d", i)).Return(types.Finalized, nil) + } + + msgID := "test-message-id" + _, _, err := checker.CheckMessageStatus(ctx, msgID) + assert.EqualError(t, err, "maximum number of statuses reached, possible infinite loop") +}