diff --git a/.changeset/tidy-planets-kneel.md b/.changeset/tidy-planets-kneel.md new file mode 100644 index 00000000000..514b7c8bcbf --- /dev/null +++ b/.changeset/tidy-planets-kneel.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Remove dependency of MinConfirmations in EVM TXM code #update #deprecation_notice diff --git a/common/txmgr/confirmer.go b/common/txmgr/confirmer.go index d67bd451228..bc1edaf00da 100644 --- a/common/txmgr/confirmer.go +++ b/common/txmgr/confirmer.go @@ -346,7 +346,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pro if ec.resumeCallback != nil { mark = time.Now() - if err := ec.ResumePendingTaskRuns(ctx, head); err != nil { + if err := ec.ResumePendingTaskRuns(ctx); err != nil { return fmt.Errorf("ResumePendingTaskRuns failed: %w", err) } @@ -1231,9 +1231,8 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) sen } // ResumePendingTaskRuns issues callbacks to task runs that are pending waiting for receipts -func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context, head types.Head[BLOCK_HASH]) error { - receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, head.BlockNumber(), ec.chainID) - +func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context) error { + receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, ec.chainID) if err != nil { return err } diff --git a/common/txmgr/types/mocks/tx_store.go b/common/txmgr/types/mocks/tx_store.go index 4467729e167..acf9093cba8 100644 --- a/common/txmgr/types/mocks/tx_store.go +++ b/common/txmgr/types/mocks/tx_store.go @@ -1096,9 +1096,9 @@ func (_c *TxStore_FindTxesByMetaFieldAndStates_Call[ADDR, CHAIN_ID, TX_HASH, BLO return _c } -// FindTxesPendingCallback provides a mock function with given fields: ctx, blockNum, chainID -func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error) { - ret := _m.Called(ctx, blockNum, chainID) +// FindTxesPendingCallback provides a mock function with given fields: ctx, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPendingCallback(ctx context.Context, chainID CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error) { + ret := _m.Called(ctx, chainID) if len(ret) == 0 { panic("no return value specified for FindTxesPendingCallback") @@ -1106,19 +1106,19 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPen var r0 []txmgrtypes.ReceiptPlus[R] var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64, CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error)); ok { - return rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error)); ok { + return rf(ctx, chainID) } - if rf, ok := ret.Get(0).(func(context.Context, int64, CHAIN_ID) []txmgrtypes.ReceiptPlus[R]); ok { - r0 = rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) []txmgrtypes.ReceiptPlus[R]); ok { + r0 = rf(ctx, chainID) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]txmgrtypes.ReceiptPlus[R]) } } - if rf, ok := ret.Get(1).(func(context.Context, int64, CHAIN_ID) error); ok { - r1 = rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(1).(func(context.Context, CHAIN_ID) error); ok { + r1 = rf(ctx, chainID) } else { r1 = ret.Error(1) } @@ -1133,15 +1133,14 @@ type TxStore_FindTxesPendingCallback_Call[ADDR types.Hashable, CHAIN_ID types.ID // FindTxesPendingCallback is a helper method to define mock.On call // - ctx context.Context -// - blockNum int64 // - chainID CHAIN_ID -func (_e *TxStore_Expecter[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPendingCallback(ctx interface{}, blockNum interface{}, chainID interface{}) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { - return &TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{Call: _e.mock.On("FindTxesPendingCallback", ctx, blockNum, chainID)} +func (_e *TxStore_Expecter[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPendingCallback(ctx interface{}, chainID interface{}) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { + return &TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{Call: _e.mock.On("FindTxesPendingCallback", ctx, chainID)} } -func (_c *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Run(run func(ctx context.Context, blockNum int64, chainID CHAIN_ID)) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { +func (_c *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Run(run func(ctx context.Context, chainID CHAIN_ID)) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64), args[2].(CHAIN_ID)) + run(args[0].(context.Context), args[1].(CHAIN_ID)) }) return _c } @@ -1151,7 +1150,7 @@ func (_c *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HA return _c } -func (_c *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RunAndReturn(run func(context.Context, int64, CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error)) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { +func (_c *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RunAndReturn(run func(context.Context, CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error)) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { _c.Call.Return(run) return _c } diff --git a/common/txmgr/types/tx.go b/common/txmgr/types/tx.go index b65f7edf6e5..bcbd909ca81 100644 --- a/common/txmgr/types/tx.go +++ b/common/txmgr/types/tx.go @@ -83,8 +83,9 @@ type TxRequest[ADDR types.Hashable, TX_HASH types.Hashable] struct { // Pipeline variables - if you aren't calling this from chain tx task within // the pipeline, you don't need these variables - MinConfirmations clnull.Uint32 PipelineTaskRunID *uuid.UUID + // deprecated + MinConfirmations clnull.Uint32 Strategy TxStrategy diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index 3d874cc4366..08ae3f03741 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -33,8 +33,8 @@ type TxStore[ TxHistoryReaper[CHAIN_ID] TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE] - // Find confirmed txes beyond the minConfirmations param that require callback but have not yet been signaled - FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error) + // Find finalized txes that require callback but have not yet been signaled + FindTxesPendingCallback(ctx context.Context, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error) // Update tx to mark that its callback has been signaled UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error SaveFetchedReceipts(ctx context.Context, r []R, state TxState, errorMsg *string, chainID CHAIN_ID) error diff --git a/core/chains/evm/client/simulated_backend_client.go b/core/chains/evm/client/simulated_backend_client.go index 9f8da08e806..6eb6d9c71cb 100644 --- a/core/chains/evm/client/simulated_backend_client.go +++ b/core/chains/evm/client/simulated_backend_client.go @@ -703,14 +703,17 @@ func (c *SimulatedBackendClient) ethGetHeaderByNumber(ctx context.Context, resul } func (c *SimulatedBackendClient) LatestFinalizedBlock(ctx context.Context) (*evmtypes.Head, error) { - block := c.b.Blockchain().CurrentFinalBlock() - return &evmtypes.Head{ - EVMChainID: ubig.NewI(c.chainId.Int64()), - Hash: block.Hash(), - Number: block.Number.Int64(), - ParentHash: block.ParentHash, - Timestamp: time.Unix(int64(block.Time), 0), - }, nil + if block := c.b.Blockchain().CurrentFinalBlock(); block != nil { + return &evmtypes.Head{ + EVMChainID: ubig.NewI(c.chainId.Int64()), + Hash: block.Hash(), + Number: block.Number.Int64(), + ParentHash: block.ParentHash, + Timestamp: time.Unix(int64(block.Time), 0), + }, nil + } + + return nil, fmt.Errorf("SimulatedBackendClient failed to find latest finalized block") } func (c *SimulatedBackendClient) ethGetLogs(ctx context.Context, result interface{}, args ...interface{}) error { diff --git a/core/chains/evm/txmgr/confirmer_test.go b/core/chains/evm/txmgr/confirmer_test.go index a9dec223bf8..c7bf6817e18 100644 --- a/core/chains/evm/txmgr/confirmer_test.go +++ b/core/chains/evm/txmgr/confirmer_test.go @@ -3033,9 +3033,6 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { }, }, } - - minConfirmations := int64(2) - pgtest.MustExec(t, db, `SET CONSTRAINTS fk_pipeline_runs_pruning_key DEFERRED`) pgtest.MustExec(t, db, `SET CONSTRAINTS pipeline_runs_pipeline_spec_id_fkey DEFERRED`) @@ -3049,16 +3046,16 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { tr := cltest.MustInsertUnfinishedPipelineTaskRun(t, db, run.ID) etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 1, 1, fromAddress) - mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, etx.TxAttempts[0].Hash) + mustInsertEthReceipt(t, txStore, head.Number, head.Hash, etx.TxAttempts[0].Hash) // Setting both signal_callback and callback_completed to TRUE to simulate a completed pipeline task // It would only be in a state past suspended if the resume callback was called and callback_completed was set to TRUE - pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE, callback_completed = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID) + pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, signal_callback = TRUE, callback_completed = TRUE WHERE id = $2`, &tr.ID, etx.ID) - err := ec.ResumePendingTaskRuns(tests.Context(t), &head) + err := ec.ResumePendingTaskRuns(tests.Context(t)) require.NoError(t, err) }) - t.Run("doesn't process task runs where the receipt is younger than minConfirmations", func(t *testing.T) { + t.Run("doesn't process task runs where the receipt is not finalized", func(t *testing.T) { ec := newEthConfirmer(t, txStore, ethClient, config, evmcfg, ethKeyStore, func(context.Context, uuid.UUID, interface{}, error) error { t.Fatal("No value expected") return nil @@ -3070,13 +3067,13 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 2, 1, fromAddress) mustInsertEthReceipt(t, txStore, head.Number, head.Hash, etx.TxAttempts[0].Hash) - pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID) + pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, signal_callback = TRUE, callback_completed = TRUE WHERE id = $2`, &tr.ID, etx.ID) - err := ec.ResumePendingTaskRuns(tests.Context(t), &head) + err := ec.ResumePendingTaskRuns(tests.Context(t)) require.NoError(t, err) }) - t.Run("processes eth_txes with receipts older than minConfirmations", func(t *testing.T) { + t.Run("processes eth_txes with receipts for finalized tx", func(t *testing.T) { ch := make(chan interface{}) nonce := evmtypes.Nonce(3) var err error @@ -3090,25 +3087,25 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { tr := cltest.MustInsertUnfinishedPipelineTaskRun(t, db, run.ID) pgtest.MustExec(t, db, `UPDATE pipeline_runs SET state = 'suspended' WHERE id = $1`, run.ID) - etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, int64(nonce), 1, fromAddress) + etx := cltest.MustInsertFinalizedEthTxWithLegacyAttempt(t, txStore, int64(nonce), 1, fromAddress) pgtest.MustExec(t, db, `UPDATE evm.txes SET meta='{"FailOnRevert": true}'`) - receipt := mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, etx.TxAttempts[0].Hash) + receipt := mustInsertEthReceipt(t, txStore, head.Number, head.Hash, etx.TxAttempts[0].Hash) - pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID) + pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, signal_callback = TRUE WHERE id = $2`, &tr.ID, etx.ID) done := make(chan struct{}) t.Cleanup(func() { <-done }) go func() { defer close(done) - err2 := ec.ResumePendingTaskRuns(tests.Context(t), &head) + err2 := ec.ResumePendingTaskRuns(tests.Context(t)) if !assert.NoError(t, err2) { return } // Retrieve Tx to check if callback completed flag was set to true - updateTx, err3 := txStore.FindTxWithSequence(tests.Context(t), fromAddress, nonce) - if assert.NoError(t, err3) { - assert.Equal(t, true, updateTx.CallbackCompleted) - } + txs, err3 := txStore.FindTxesByFromAddressAndNonce(tests.Context(t), fromAddress, int64(nonce)) + assert.Nil(t, err3) + assert.Equal(t, 1, len(txs)) + assert.Equal(t, true, txs[0].CallbackCompleted) }() select { @@ -3126,7 +3123,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { pgtest.MustExec(t, db, `DELETE FROM pipeline_runs`) - t.Run("processes eth_txes with receipt older than minConfirmations that reverted", func(t *testing.T) { + t.Run("processes eth_txes with receipt for finalized tx that reverted", func(t *testing.T) { type data struct { value any error @@ -3142,27 +3139,27 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { tr := cltest.MustInsertUnfinishedPipelineTaskRun(t, db, run.ID) pgtest.MustExec(t, db, `UPDATE pipeline_runs SET state = 'suspended' WHERE id = $1`, run.ID) - etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, int64(nonce), 1, fromAddress) + etx := cltest.MustInsertFinalizedEthTxWithLegacyAttempt(t, txStore, int64(nonce), 1, fromAddress) pgtest.MustExec(t, db, `UPDATE evm.txes SET meta='{"FailOnRevert": true}'`) // receipt is not passed through as a value since it reverted and caused an error - mustInsertRevertedEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, etx.TxAttempts[0].Hash) + mustInsertRevertedEthReceipt(t, txStore, head.Number-2, head.Hash, etx.TxAttempts[0].Hash) - pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID) + pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, signal_callback = TRUE WHERE id = $2`, &tr.ID, etx.ID) done := make(chan struct{}) t.Cleanup(func() { <-done }) go func() { defer close(done) - err2 := ec.ResumePendingTaskRuns(tests.Context(t), &head) + err2 := ec.ResumePendingTaskRuns(tests.Context(t)) if !assert.NoError(t, err2) { return } // Retrieve Tx to check if callback completed flag was set to true - updateTx, err3 := txStore.FindTxWithSequence(tests.Context(t), fromAddress, nonce) - if assert.NoError(t, err3) { - assert.Equal(t, true, updateTx.CallbackCompleted) - } + txs, err := txStore.FindTxesByFromAddressAndNonce(tests.Context(t), fromAddress, int64(nonce)) + assert.Nil(t, err) + assert.Equal(t, 1, len(txs)) + assert.Equal(t, true, txs[0].CallbackCompleted) }() select { @@ -3187,17 +3184,18 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { run := cltest.MustInsertPipelineRun(t, db) tr := cltest.MustInsertUnfinishedPipelineTaskRun(t, db, run.ID) - etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, int64(nonce), 1, fromAddress) - mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, etx.TxAttempts[0].Hash) - pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID) + etx := cltest.MustInsertFinalizedEthTxWithLegacyAttempt(t, txStore, int64(nonce), 1, fromAddress) + mustInsertEthReceipt(t, txStore, head.Number, head.Hash, etx.TxAttempts[0].Hash) + pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, signal_callback = TRUE WHERE id = $2`, &tr.ID, etx.ID) - err := ec.ResumePendingTaskRuns(tests.Context(t), &head) + err := ec.ResumePendingTaskRuns(tests.Context(t)) require.Error(t, err) // Retrieve Tx to check if callback completed flag was left unchanged - updateTx, err := txStore.FindTxWithSequence(tests.Context(t), fromAddress, nonce) - require.NoError(t, err) - require.Equal(t, false, updateTx.CallbackCompleted) + txs, err := txStore.FindTxesByFromAddressAndNonce(tests.Context(t), fromAddress, int64(nonce)) + assert.Nil(t, err) + assert.Equal(t, 1, len(txs)) + assert.Equal(t, false, txs[0].CallbackCompleted) }) } diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index fa2251168d9..4e0bf3b9cd4 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -76,6 +76,7 @@ type TestEvmTxStore interface { GetAllTxAttempts(ctx context.Context) (attempts []TxAttempt, err error) CountTxesByStateAndSubject(ctx context.Context, state txmgrtypes.TxState, subject uuid.UUID) (count int, err error) FindTxesByFromAddressAndState(ctx context.Context, fromAddress common.Address, state string) (txes []*Tx, err error) + FindTxesByFromAddressAndNonce(ctx context.Context, fromAddress common.Address, nonce int64) (txes []*Tx, err error) UpdateTxAttemptBroadcastBeforeBlockNum(ctx context.Context, id int64, blockNum uint) error } @@ -250,7 +251,6 @@ func (db DbEthTx) ToTx(tx *Tx) { tx.Meta = db.Meta tx.Subject = db.Subject tx.PipelineTaskRunID = db.PipelineTaskRunID - tx.MinConfirmations = db.MinConfirmations tx.ChainID = db.EVMChainID.ToInt() tx.TransmitChecker = db.TransmitChecker tx.InitialBroadcastAt = db.InitialBroadcastAt @@ -1056,19 +1056,20 @@ WHERE evm.tx_attempts.state = 'in_progress' AND evm.txes.from_address = $1 AND e } // Find confirmed txes requiring callback but have not yet been signaled -func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) { +func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) { var rs []dbReceiptPlus var cancel context.CancelFunc ctx, cancel = o.stopCh.Ctx(ctx) defer cancel() + err = o.q.SelectContext(ctx, &rs, ` SELECT evm.txes.pipeline_task_run_id, evm.receipts.receipt, COALESCE((evm.txes.meta->>'FailOnRevert')::boolean, false) "FailOnRevert" FROM evm.txes INNER JOIN evm.tx_attempts ON evm.txes.id = evm.tx_attempts.eth_tx_id INNER JOIN evm.receipts ON evm.tx_attempts.hash = evm.receipts.tx_hash WHERE evm.txes.pipeline_task_run_id IS NOT NULL AND evm.txes.signal_callback = TRUE AND evm.txes.callback_completed = FALSE - AND evm.receipts.block_number <= ($1 - evm.txes.min_confirmations) AND evm.txes.evm_chain_id = $2 - `, blockNum, chainID.String()) + AND evm.txes.state = 'finalized' AND evm.txes.evm_chain_id = $1 + `, chainID.String()) if err != nil { return nil, fmt.Errorf("failed to retrieve transactions pending pipeline resume callback: %w", err) } @@ -1852,10 +1853,10 @@ func (o *evmTxStore) CreateTransaction(ctx context.Context, txRequest TxRequest, err = orm.q.GetContext(ctx, &dbEtx, ` INSERT INTO evm.txes (from_address, to_address, encoded_payload, value, gas_limit, state, created_at, meta, subject, evm_chain_id, min_confirmations, pipeline_task_run_id, transmit_checker, idempotency_key, signal_callback) VALUES ( -$1,$2,$3,$4,$5,'unstarted',NOW(),$6,$7,$8,$9,$10,$11,$12,$13 +$1,$2,$3,$4,$5,'unstarted',NOW(),$6,$7,$8,NULL,$9,$10,$11,$12 ) RETURNING "txes".* -`, txRequest.FromAddress, txRequest.ToAddress, txRequest.EncodedPayload, assets.Eth(txRequest.Value), txRequest.FeeLimit, txRequest.Meta, txRequest.Strategy.Subject(), chainID.String(), txRequest.MinConfirmations, txRequest.PipelineTaskRunID, txRequest.Checker, txRequest.IdempotencyKey, txRequest.SignalCallback) +`, txRequest.FromAddress, txRequest.ToAddress, txRequest.EncodedPayload, assets.Eth(txRequest.Value), txRequest.FeeLimit, txRequest.Meta, txRequest.Strategy.Subject(), chainID.String(), txRequest.PipelineTaskRunID, txRequest.Checker, txRequest.IdempotencyKey, txRequest.SignalCallback) if err != nil { return pkgerrors.Wrap(err, "CreateEthTransaction failed to insert evm tx") } @@ -2100,6 +2101,18 @@ func (o *evmTxStore) FindTxesByFromAddressAndState(ctx context.Context, fromAddr return txes, err } +func (o *evmTxStore) FindTxesByFromAddressAndNonce(ctx context.Context, fromAddress common.Address, nonce int64) (txes []*Tx, err error) { + var cancel context.CancelFunc + ctx, cancel = o.stopCh.Ctx(ctx) + defer cancel() + sql := "SELECT * FROM evm.txes WHERE from_address = $1 AND nonce = $2" + var dbEtxs []DbEthTx + err = o.q.SelectContext(ctx, &dbEtxs, sql, fromAddress, nonce) + txes = make([]*Tx, len(dbEtxs)) + dbEthTxsToEvmEthTxPtrs(dbEtxs, txes) + return txes, err +} + func (o *evmTxStore) UpdateTxAttemptBroadcastBeforeBlockNum(ctx context.Context, id int64, blockNum uint) error { var cancel context.CancelFunc ctx, cancel = o.stopCh.Ctx(ctx) diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index c711c2788e8..1e69e4852f3 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -642,8 +642,6 @@ func TestORM_FindTxesPendingCallback(t *testing.T) { }, } - minConfirmations := int64(2) - // Suspended run waiting for callback run1 := cltest.MustInsertPipelineRun(t, db) tr1 := cltest.MustInsertUnfinishedPipelineTaskRun(t, db, run1.ID) @@ -651,8 +649,8 @@ func TestORM_FindTxesPendingCallback(t *testing.T) { etx1 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 3, 1, fromAddress) pgtest.MustExec(t, db, `UPDATE evm.txes SET meta='{"FailOnRevert": true}'`) attempt1 := etx1.TxAttempts[0] - mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, attempt1.Hash) - pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr1.ID, minConfirmations, etx1.ID) + receipt1 := mustInsertEthReceipt(t, txStore, head.BlockNumber(), head.Hash, attempt1.Hash) + pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, signal_callback = TRUE WHERE id = $2`, &tr1.ID, etx1.ID) // Callback to pipeline service completed. Should be ignored run2 := cltest.MustInsertPipelineRunWithStatus(t, db, 0, pipeline.RunStatusCompleted, 0) @@ -660,33 +658,37 @@ func TestORM_FindTxesPendingCallback(t *testing.T) { etx2 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 4, 1, fromAddress) pgtest.MustExec(t, db, `UPDATE evm.txes SET meta='{"FailOnRevert": false}'`) attempt2 := etx2.TxAttempts[0] - mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, attempt2.Hash) - pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE, callback_completed = TRUE WHERE id = $3`, &tr2.ID, minConfirmations, etx2.ID) + mustInsertEthReceipt(t, txStore, head.BlockNumber()-2, head.Hash, attempt2.Hash) + pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, signal_callback = TRUE, callback_completed = TRUE WHERE id = $2`, &tr2.ID, etx2.ID) - // Suspended run younger than minConfirmations. Should be ignored + // Suspend not yet finalized run. Should be ignored run3 := cltest.MustInsertPipelineRun(t, db) tr3 := cltest.MustInsertUnfinishedPipelineTaskRun(t, db, run3.ID) pgtest.MustExec(t, db, `UPDATE pipeline_runs SET state = 'suspended' WHERE id = $1`, run3.ID) etx3 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 5, 1, fromAddress) pgtest.MustExec(t, db, `UPDATE evm.txes SET meta='{"FailOnRevert": false}'`) attempt3 := etx3.TxAttempts[0] - mustInsertEthReceipt(t, txStore, head.Number, head.Hash, attempt3.Hash) - pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr3.ID, minConfirmations, etx3.ID) + receipt3 := mustInsertEthReceipt(t, txStore, head.Number, head.Hash, attempt3.Hash) + pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, signal_callback = TRUE WHERE id = $2`, &tr3.ID, etx3.ID) // Tx not marked for callback. Should be ignore etx4 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 6, 1, fromAddress) attempt4 := etx4.TxAttempts[0] mustInsertEthReceipt(t, txStore, head.Number, head.Hash, attempt4.Hash) - pgtest.MustExec(t, db, `UPDATE evm.txes SET min_confirmations = $1 WHERE id = $2`, minConfirmations, etx4.ID) // Unconfirmed Tx without receipts. Should be ignored - etx5 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 7, 1, fromAddress) - pgtest.MustExec(t, db, `UPDATE evm.txes SET min_confirmations = $1 WHERE id = $2`, minConfirmations, etx5.ID) + cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 7, 1, fromAddress) + + // update txes state to be 'finalized' using finalizer + err := txStore.UpdateTxStatesToFinalizedUsingReceiptIds(tests.Context(t), []int64{receipt1.ID}, testutils.FixtureChainID) + require.NoError(t, err) + err = txStore.UpdateTxStatesToFinalizedUsingReceiptIds(tests.Context(t), []int64{receipt3.ID}, testutils.FixtureChainID) + require.NoError(t, err) // Search evm.txes table for tx requiring callback - receiptsPlus, err := txStore.FindTxesPendingCallback(tests.Context(t), head.Number, ethClient.ConfiguredChainID()) + receiptsPlus, err := txStore.FindTxesPendingCallback(tests.Context(t), ethClient.ConfiguredChainID()) require.NoError(t, err) - assert.Len(t, receiptsPlus, 1) + assert.Len(t, receiptsPlus, 2) assert.Equal(t, tr1.ID, receiptsPlus[0].ID) } diff --git a/core/chains/evm/txmgr/mocks/evm_tx_store.go b/core/chains/evm/txmgr/mocks/evm_tx_store.go index a9a175e3d94..6047c41c8b4 100644 --- a/core/chains/evm/txmgr/mocks/evm_tx_store.go +++ b/core/chains/evm/txmgr/mocks/evm_tx_store.go @@ -1395,9 +1395,9 @@ func (_c *EvmTxStore_FindTxesByMetaFieldAndStates_Call) RunAndReturn(run func(co return _c } -// FindTxesPendingCallback provides a mock function with given fields: ctx, blockNum, chainID -func (_m *EvmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error) { - ret := _m.Called(ctx, blockNum, chainID) +// FindTxesPendingCallback provides a mock function with given fields: ctx, chainID +func (_m *EvmTxStore) FindTxesPendingCallback(ctx context.Context, chainID *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error) { + ret := _m.Called(ctx, chainID) if len(ret) == 0 { panic("no return value specified for FindTxesPendingCallback") @@ -1405,19 +1405,19 @@ func (_m *EvmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int6 var r0 []types.ReceiptPlus[*evmtypes.Receipt] var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64, *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error)); ok { - return rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error)); ok { + return rf(ctx, chainID) } - if rf, ok := ret.Get(0).(func(context.Context, int64, *big.Int) []types.ReceiptPlus[*evmtypes.Receipt]); ok { - r0 = rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) []types.ReceiptPlus[*evmtypes.Receipt]); ok { + r0 = rf(ctx, chainID) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]types.ReceiptPlus[*evmtypes.Receipt]) } } - if rf, ok := ret.Get(1).(func(context.Context, int64, *big.Int) error); ok { - r1 = rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { + r1 = rf(ctx, chainID) } else { r1 = ret.Error(1) } @@ -1432,15 +1432,14 @@ type EvmTxStore_FindTxesPendingCallback_Call struct { // FindTxesPendingCallback is a helper method to define mock.On call // - ctx context.Context -// - blockNum int64 // - chainID *big.Int -func (_e *EvmTxStore_Expecter) FindTxesPendingCallback(ctx interface{}, blockNum interface{}, chainID interface{}) *EvmTxStore_FindTxesPendingCallback_Call { - return &EvmTxStore_FindTxesPendingCallback_Call{Call: _e.mock.On("FindTxesPendingCallback", ctx, blockNum, chainID)} +func (_e *EvmTxStore_Expecter) FindTxesPendingCallback(ctx interface{}, chainID interface{}) *EvmTxStore_FindTxesPendingCallback_Call { + return &EvmTxStore_FindTxesPendingCallback_Call{Call: _e.mock.On("FindTxesPendingCallback", ctx, chainID)} } -func (_c *EvmTxStore_FindTxesPendingCallback_Call) Run(run func(ctx context.Context, blockNum int64, chainID *big.Int)) *EvmTxStore_FindTxesPendingCallback_Call { +func (_c *EvmTxStore_FindTxesPendingCallback_Call) Run(run func(ctx context.Context, chainID *big.Int)) *EvmTxStore_FindTxesPendingCallback_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64), args[2].(*big.Int)) + run(args[0].(context.Context), args[1].(*big.Int)) }) return _c } @@ -1450,7 +1449,7 @@ func (_c *EvmTxStore_FindTxesPendingCallback_Call) Return(receiptsPlus []types.R return _c } -func (_c *EvmTxStore_FindTxesPendingCallback_Call) RunAndReturn(run func(context.Context, int64, *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error)) *EvmTxStore_FindTxesPendingCallback_Call { +func (_c *EvmTxStore_FindTxesPendingCallback_Call) RunAndReturn(run func(context.Context, *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error)) *EvmTxStore_FindTxesPendingCallback_Call { _c.Call.Return(run) return _c } diff --git a/core/internal/cltest/factories.go b/core/internal/cltest/factories.go index c488dca94a9..1dc318c3d12 100644 --- a/core/internal/cltest/factories.go +++ b/core/internal/cltest/factories.go @@ -220,6 +220,26 @@ func MustInsertConfirmedEthTxWithLegacyAttempt(t *testing.T, txStore txmgr.TestE return etx } +func MustInsertFinalizedEthTxWithLegacyAttempt(t *testing.T, txStore txmgr.TestEvmTxStore, nonce int64, broadcastBeforeBlockNum int64, fromAddress common.Address) txmgr.Tx { + timeNow := time.Now() + etx := NewEthTx(fromAddress) + ctx := testutils.Context(t) + + etx.BroadcastAt = &timeNow + etx.InitialBroadcastAt = &timeNow + n := evmtypes.Nonce(nonce) + etx.Sequence = &n + etx.State = txmgrcommon.TxFinalized + etx.MinConfirmations.SetValid(6) + require.NoError(t, txStore.InsertTx(ctx, &etx)) + attempt := NewLegacyEthTxAttempt(t, etx.ID) + attempt.BroadcastBeforeBlockNum = &broadcastBeforeBlockNum + attempt.State = txmgrtypes.TxAttemptBroadcast + require.NoError(t, txStore.InsertTxAttempt(ctx, &attempt)) + etx.TxAttempts = append(etx.TxAttempts, attempt) + return etx +} + func NewLegacyEthTxAttempt(t *testing.T, etxID int64) txmgr.TxAttempt { gasPrice := assets.NewWeiI(1) return txmgr.TxAttempt{ diff --git a/core/internal/features/features_test.go b/core/internal/features/features_test.go index 159ea27e939..fd768d0d27e 100644 --- a/core/internal/features/features_test.go +++ b/core/internal/features/features_test.go @@ -474,6 +474,10 @@ func setupAppForEthTx(t *testing.T, operatorContracts OperatorContracts) (app *c cfg := configtest.NewGeneralConfigSimulated(t, func(c *chainlink.Config, s *chainlink.Secrets) { c.Database.Listener.FallbackPollInterval = commonconfig.MustNewDuration(100 * time.Millisecond) + finalityTagEnabled := true + FinalityTagBypass := false + c.EVM[0].HeadTracker.FinalityTagBypass = &FinalityTagBypass + c.EVM[0].FinalityTagEnabled = &finalityTagEnabled }) app = cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, cfg, b, lggr) b.Commit() @@ -505,6 +509,7 @@ func TestIntegration_AsyncEthTx(t *testing.T) { t.Parallel() operatorContracts := setupOperatorContracts(t) b := operatorContracts.sim + b.Blockchain().SetFinalized(b.Blockchain().CurrentBlock()) t.Run("with FailOnRevert enabled, run succeeds when transaction is successful", func(t *testing.T) { app, sendingAddr, o := setupAppForEthTx(t, operatorContracts) @@ -532,7 +537,9 @@ observationSource = """ assert.Equal(t, []*string(nil), run.Errors) testutils.WaitForLogMessage(t, o, "Sending transaction") - b.Commit() // Needs at least two confirmations + b.Commit() // Needs at least two confirmations + time.Sleep(10 * time.Millisecond) // wait for finalizer to process confirmed tx + b.Blockchain().SetFinalized(b.Blockchain().CurrentBlock()) b.Commit() // Needs at least two confirmations b.Commit() // Needs at least two confirmations testutils.WaitForLogMessage(t, o, "Resume run success") @@ -578,7 +585,9 @@ observationSource = """ assert.Equal(t, []*string(nil), run.Errors) testutils.WaitForLogMessage(t, o, "Sending transaction") - b.Commit() // Needs at least two confirmations + b.Commit() // Needs at least two confirmations + time.Sleep(10 * time.Millisecond) // wait for finalizer to process confirmed tx + b.Blockchain().SetFinalized(b.Blockchain().CurrentBlock()) b.Commit() // Needs at least two confirmations b.Commit() // Needs at least two confirmations testutils.WaitForLogMessage(t, o, "Resume run success") @@ -617,6 +626,8 @@ observationSource = """ testutils.WaitForLogMessage(t, o, "Sending transaction") b.Commit() // Needs at least two confirmations + time.Sleep(10 * time.Millisecond) + b.Blockchain().SetFinalized(b.Blockchain().CurrentBlock()) b.Commit() // Needs at least two confirmations b.Commit() // Needs at least two confirmations testutils.WaitForLogMessage(t, o, "Resume run success")