From 2e314cddf0f4dbd29cad4a43926dc1a5390cc70f Mon Sep 17 00:00:00 2001 From: amit-momin <108959691+amit-momin@users.noreply.github.com> Date: Fri, 16 Aug 2024 16:02:35 -0500 Subject: [PATCH] Update ZK overflow detection logic (#14132) * Updated ZK overflow detection to skip tx with non-broadcast attempts and delayed detection for zkEVM * Added changeset * Added assumption violation log * Updated XLayer to use the same detection logic as zkEVM * Fixed test --- .changeset/big-dots-report.md | 5 ++ core/chains/evm/config/toml/config.go | 12 +++- core/chains/evm/txmgr/stuck_tx_detector.go | 53 +++++++++++++++--- .../evm/txmgr/stuck_tx_detector_test.go | 56 ++++++++++++++++++- core/chains/evm/txmgr/txmgr_test.go | 6 +- 5 files changed, 118 insertions(+), 14 deletions(-) create mode 100644 .changeset/big-dots-report.md diff --git a/.changeset/big-dots-report.md b/.changeset/big-dots-report.md new file mode 100644 index 00000000000..01475010f0d --- /dev/null +++ b/.changeset/big-dots-report.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +Updated ZK overflow detection to skip transactions with non-broadcasted attempts. Delayed detection for zkEVM using the MinAttempts config. Updated XLayer to use the same detection logic as zkEVM. #internal diff --git a/core/chains/evm/config/toml/config.go b/core/chains/evm/config/toml/config.go index 2cb29d97696..ac7841ac497 100644 --- a/core/chains/evm/config/toml/config.go +++ b/core/chains/evm/config/toml/config.go @@ -412,8 +412,16 @@ func (c *Chain) ValidateConfig() (err error) { err = multierr.Append(err, commonconfig.ErrInvalid{Name: "Transactions.AutoPurge.DetectionApiUrl", Value: c.Transactions.AutoPurge.DetectionApiUrl.Scheme, Msg: "must be http or https"}) } } - case chaintype.ChainZkEvm: - // No other configs are needed + case chaintype.ChainZkEvm, chaintype.ChainXLayer: + // MinAttempts is an optional config that can be used to delay the stuck tx detection for zkEVM or XLayer + // If MinAttempts is set, BumpThreshold cannot be 0 + if c.Transactions.AutoPurge.MinAttempts != nil && *c.Transactions.AutoPurge.MinAttempts != 0 { + if c.GasEstimator.BumpThreshold == nil { + err = multierr.Append(err, commonconfig.ErrMissing{Name: "GasEstimator.BumpThreshold", Msg: fmt.Sprintf("must be set if Transactions.AutoPurge.MinAttempts is set for %s", chainType)}) + } else if *c.GasEstimator.BumpThreshold == 0 { + err = multierr.Append(err, commonconfig.ErrInvalid{Name: "GasEstimator.BumpThreshold", Value: 0, Msg: fmt.Sprintf("cannot be 0 if Transactions.AutoPurge.MinAttempts is set for %s", chainType)}) + } + } default: // Bump Threshold is required because the stuck tx heuristic relies on a minimum number of bump attempts to exist if c.GasEstimator.BumpThreshold == nil { diff --git a/core/chains/evm/txmgr/stuck_tx_detector.go b/core/chains/evm/txmgr/stuck_tx_detector.go index 5901be0b02d..5d621dc0b20 100644 --- a/core/chains/evm/txmgr/stuck_tx_detector.go +++ b/core/chains/evm/txmgr/stuck_tx_detector.go @@ -44,7 +44,7 @@ type stuckTxDetectorConfig interface { } type stuckTxDetector struct { - lggr logger.Logger + lggr logger.SugaredLogger chainID *big.Int chainType chaintype.ChainType maxPrice *assets.Wei @@ -64,7 +64,7 @@ func NewStuckTxDetector(lggr logger.Logger, chainID *big.Int, chainType chaintyp t.DisableCompression = true httpClient := &http.Client{Transport: t} return &stuckTxDetector{ - lggr: lggr, + lggr: logger.Sugared(lggr), chainID: chainID, chainType: chainType, maxPrice: maxPrice, @@ -128,7 +128,7 @@ func (d *stuckTxDetector) DetectStuckTransactions(ctx context.Context, enabledAd switch d.chainType { case chaintype.ChainScroll: return d.detectStuckTransactionsScroll(ctx, txs) - case chaintype.ChainZkEvm: + case chaintype.ChainZkEvm, chaintype.ChainXLayer: return d.detectStuckTransactionsZkEVM(ctx, txs) default: return d.detectStuckTransactionsHeuristic(ctx, txs, blockNum) @@ -153,11 +153,28 @@ func (d *stuckTxDetector) FindUnconfirmedTxWithLowestNonce(ctx context.Context, } } - // Build list of potentially stuck tx but exclude any that are already marked for purge + // Build list of potentially stuck tx but exclude any that are already marked for purge or have non-broadcasted attempts var stuckTxs []Tx for _, tx := range lowestNonceTxMap { - // Attempts are loaded newest to oldest so one marked for purge will always be first - if len(tx.TxAttempts) > 0 && !tx.TxAttempts[0].IsPurgeAttempt { + if len(tx.TxAttempts) == 0 { + d.lggr.AssumptionViolationw("encountered an unconfirmed transaction without an attempt", "tx", tx) + continue + } + // Check the transaction's attempts in case any are already marked for purge or if any are not broadcasted + // We can only have one non-broadcasted attempt for a transaction at a time + // Skip purge detection until all attempts are broadcasted to avoid conflicts with the purge attempt + var foundPurgeAttempt, foundNonBroadcastAttempt bool + for _, attempt := range tx.TxAttempts { + if attempt.IsPurgeAttempt { + foundPurgeAttempt = true + break + } + if attempt.State != types.TxAttemptBroadcast { + foundNonBroadcastAttempt = true + break + } + } + if !foundPurgeAttempt && !foundNonBroadcastAttempt { stuckTxs = append(stuckTxs, tx) } } @@ -322,14 +339,32 @@ func (d *stuckTxDetector) detectStuckTransactionsScroll(ctx context.Context, txs // Uses eth_getTransactionByHash to detect that a transaction has been discarded due to overflow // Currently only used by zkEVM but if other chains follow the same behavior in the future func (d *stuckTxDetector) detectStuckTransactionsZkEVM(ctx context.Context, txs []Tx) ([]Tx, error) { - txReqs := make([]rpc.BatchElem, len(txs)) + minAttempts := 0 + if d.cfg.MinAttempts() != nil { + minAttempts = int(*d.cfg.MinAttempts()) + } + // Check transactions have MinAttempts to ensure it has enough time to return results for getTransactionByHash + // zkEVM has a significant delay between broadcasting a transaction and getting a proper result from the RPC + var filteredTx []Tx + for _, tx := range txs { + if len(tx.TxAttempts) >= minAttempts { + filteredTx = append(filteredTx, tx) + } + } + + // No transactions to process + if len(filteredTx) == 0 { + return filteredTx, nil + } + + txReqs := make([]rpc.BatchElem, len(filteredTx)) txHashMap := make(map[common.Hash]Tx) - txRes := make([]*map[string]interface{}, len(txs)) + txRes := make([]*map[string]interface{}, len(filteredTx)) // Build batch request elems to perform // Does not need to be separated out into smaller batches // Max number of transactions to check is equal to the number of enabled addresses which is a relatively small amount - for i, tx := range txs { + for i, tx := range filteredTx { latestAttemptHash := tx.TxAttempts[0].Hash var result map[string]interface{} txReqs[i] = rpc.BatchElem{ diff --git a/core/chains/evm/txmgr/stuck_tx_detector_test.go b/core/chains/evm/txmgr/stuck_tx_detector_test.go index 5f0d73be184..def49f8e113 100644 --- a/core/chains/evm/txmgr/stuck_tx_detector_test.go +++ b/core/chains/evm/txmgr/stuck_tx_detector_test.go @@ -155,6 +155,30 @@ func TestStuckTxDetector_FindPotentialStuckTxs(t *testing.T) { require.NoError(t, err) require.Len(t, stuckTxs, 0) }) + + t.Run("excludes transactions with a in-progress attempt", func(t *testing.T) { + _, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore) + etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 0, fromAddress) + attempt := cltest.NewLegacyEthTxAttempt(t, etx.ID) + attempt.TxFee.Legacy = assets.NewWeiI(2) + attempt.State = txmgrtypes.TxAttemptInProgress + require.NoError(t, txStore.InsertTxAttempt(ctx, &attempt)) + stuckTxs, err := stuckTxDetector.FindUnconfirmedTxWithLowestNonce(ctx, []common.Address{fromAddress}) + require.NoError(t, err) + require.Len(t, stuckTxs, 0) + }) + + t.Run("excludes transactions with an insufficient funds attempt", func(t *testing.T) { + _, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore) + etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 0, fromAddress) + attempt := cltest.NewLegacyEthTxAttempt(t, etx.ID) + attempt.TxFee.Legacy = assets.NewWeiI(2) + attempt.State = txmgrtypes.TxAttemptInsufficientFunds + require.NoError(t, txStore.InsertTxAttempt(ctx, &attempt)) + stuckTxs, err := stuckTxDetector.FindUnconfirmedTxWithLowestNonce(ctx, []common.Address{fromAddress}) + require.NoError(t, err) + require.Len(t, stuckTxs, 0) + }) } func TestStuckTxDetector_DetectStuckTransactionsHeuristic(t *testing.T) { @@ -271,8 +295,9 @@ func TestStuckTxDetector_DetectStuckTransactionsZkEVM(t *testing.T) { enabled: true, } blockNum := int64(100) - stuckTxDetector := txmgr.NewStuckTxDetector(lggr, testutils.FixtureChainID, chaintype.ChainZkEvm, assets.NewWei(assets.NewEth(100).ToInt()), autoPurgeCfg, feeEstimator, txStore, ethClient) + t.Run("returns empty list if no stuck transactions identified", func(t *testing.T) { + stuckTxDetector := txmgr.NewStuckTxDetector(lggr, testutils.FixtureChainID, chaintype.ChainZkEvm, assets.NewWei(assets.NewEth(100).ToInt()), autoPurgeCfg, feeEstimator, txStore, ethClient) _, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore) tx := mustInsertUnconfirmedTxWithBroadcastAttempts(t, txStore, 0, fromAddress, 1, blockNum, tenGwei) attempts := tx.TxAttempts[0] @@ -292,6 +317,7 @@ func TestStuckTxDetector_DetectStuckTransactionsZkEVM(t *testing.T) { }) t.Run("returns stuck transactions discarded by chain", func(t *testing.T) { + stuckTxDetector := txmgr.NewStuckTxDetector(lggr, testutils.FixtureChainID, chaintype.ChainZkEvm, assets.NewWei(assets.NewEth(100).ToInt()), autoPurgeCfg, feeEstimator, txStore, ethClient) // Insert tx that will be mocked as stuck _, fromAddress1 := cltest.MustInsertRandomKey(t, ethKeyStore) mustInsertUnconfirmedTxWithBroadcastAttempts(t, txStore, 0, fromAddress1, 1, blockNum, tenGwei) @@ -316,6 +342,34 @@ func TestStuckTxDetector_DetectStuckTransactionsZkEVM(t *testing.T) { // Expect only 1 tx to return as stuck due to nil eth_getTransactionByHash response require.Len(t, txs, 1) }) + + t.Run("skips stuck tx detection for transactions that do not have enough attempts", func(t *testing.T) { + autoPurgeCfg.minAttempts = ptr(uint32(2)) + stuckTxDetector := txmgr.NewStuckTxDetector(lggr, testutils.FixtureChainID, chaintype.ChainZkEvm, assets.NewWei(assets.NewEth(100).ToInt()), autoPurgeCfg, feeEstimator, txStore, ethClient) + // Insert tx with enough attempts for detection + _, fromAddress1 := cltest.MustInsertRandomKey(t, ethKeyStore) + etx1 := mustInsertUnconfirmedTxWithBroadcastAttempts(t, txStore, 0, fromAddress1, 1, blockNum, tenGwei) + attempt := cltest.NewLegacyEthTxAttempt(t, etx1.ID) + attempt.TxFee.Legacy = assets.NewWeiI(2) + attempt.State = txmgrtypes.TxAttemptBroadcast + require.NoError(t, txStore.InsertTxAttempt(ctx, &attempt)) + + // Insert tx that will be skipped for too few attempts + _, fromAddress2 := cltest.MustInsertRandomKey(t, ethKeyStore) + mustInsertUnconfirmedTxWithBroadcastAttempts(t, txStore, 0, fromAddress2, 1, blockNum, tenGwei) + + // Return nil response for a tx and a normal response for the other + ethClient.On("BatchCallContext", mock.Anything, mock.MatchedBy(func(b []rpc.BatchElem) bool { + return len(b) == 1 + })).Return(nil).Run(func(args mock.Arguments) { + elems := args.Get(1).([]rpc.BatchElem) + elems[0].Result = nil // Return nil to signal discarded tx + }).Once() + + txs, err := stuckTxDetector.DetectStuckTransactions(ctx, []common.Address{fromAddress1, fromAddress2}, blockNum) + require.NoError(t, err) + require.Len(t, txs, 1) + }) } func TestStuckTxDetector_DetectStuckTransactionsScroll(t *testing.T) { diff --git a/core/chains/evm/txmgr/txmgr_test.go b/core/chains/evm/txmgr/txmgr_test.go index 86bf5fcc4bd..4d85c260876 100644 --- a/core/chains/evm/txmgr/txmgr_test.go +++ b/core/chains/evm/txmgr/txmgr_test.go @@ -804,7 +804,8 @@ func TestTxm_GetTransactionStatus(t *testing.T) { require.NoError(t, err) state, err := txm.GetTransactionStatus(ctx, idempotencyKey) require.Equal(t, commontypes.Fatal, state) - require.Error(t, err, evmclient.TerminallyStuckMsg) + require.Error(t, err) + require.Equal(t, evmclient.TerminallyStuckMsg, err.Error()) // Test a terminally stuck client error returns Fatal nonce = evmtypes.Nonce(1) @@ -825,7 +826,8 @@ func TestTxm_GetTransactionStatus(t *testing.T) { require.NoError(t, err) state, err = txm.GetTransactionStatus(ctx, idempotencyKey) require.Equal(t, commontypes.Fatal, state) - require.Error(t, err, evmclient.TerminallyStuckMsg) + require.Error(t, err) + require.Equal(t, terminallyStuckClientError, err.Error()) }) t.Run("returns failed for fatal error state with other error", func(t *testing.T) {