From 95e800cc96c4d60f7a3fdf4944a12e8bc138835c Mon Sep 17 00:00:00 2001 From: amit-momin Date: Thu, 14 Nov 2024 13:55:44 -0600 Subject: [PATCH] Updated finalization logic to delete stale receipts if detected --- core/chains/evm/txmgr/evm_tx_store.go | 9 ++++ core/chains/evm/txmgr/evm_tx_store_test.go | 27 ++++++++++++ core/chains/evm/txmgr/finalizer.go | 19 +++++++-- core/chains/evm/txmgr/finalizer_test.go | 4 +- core/chains/evm/txmgr/mocks/evm_tx_store.go | 47 +++++++++++++++++++++ 5 files changed, 101 insertions(+), 5 deletions(-) diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index c13b2f8d333..2c952c85ee4 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -43,6 +43,7 @@ type EvmTxStore interface { TxStoreWebApi // methods used solely in EVM components + DeleteReceiptByTxHash(ctx context.Context, txHash common.Hash) error FindAttemptsRequiringReceiptFetch(ctx context.Context, chainID *big.Int) (hashes []TxAttempt, err error) FindConfirmedTxesReceipts(ctx context.Context, finalizedBlockNum int64, chainID *big.Int) (receipts []*evmtypes.Receipt, err error) FindTxesPendingCallback(ctx context.Context, latest, finalized int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) @@ -1091,6 +1092,14 @@ AND evm.tx_attempts.eth_tx_id = ANY($1) return pkgerrors.Wrap(err, "deleteEthReceipts failed") } +func (o *evmTxStore) DeleteReceiptByTxHash(ctx context.Context, txHash common.Hash) error { + var cancel context.CancelFunc + ctx, cancel = o.stopCh.Ctx(ctx) + defer cancel() + _, err := o.q.ExecContext(ctx, `DELETE FROM evm.receipts WHERE tx_hash = $1`, txHash) + return err +} + func (o *evmTxStore) UpdateTxsForRebroadcast(ctx context.Context, etxIDs []int64, attemptIDs []int64) error { var cancel context.CancelFunc ctx, cancel = o.stopCh.Ctx(ctx) diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index ed34e2b6d8b..a05cf3f9010 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -1981,6 +1981,33 @@ func TestORM_FindTxesByIDs(t *testing.T) { }) } +func TestORM_DeleteReceiptsByTxHash(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + txStore := cltest.NewTestTxStore(t, db) + ctx := tests.Context(t) + ethKeyStore := cltest.NewKeyStore(t, db).Eth() + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) + + etx1 := mustInsertConfirmedEthTxWithReceipt(t, txStore, fromAddress, 0, 100) + etx2 := mustInsertConfirmedEthTxWithReceipt(t, txStore, fromAddress, 2, 100) + + // Delete one transaction's receipt + err := txStore.DeleteReceiptByTxHash(ctx, etx1.TxAttempts[0].Hash) + require.NoError(t, err) + + // receipt has been deleted + etx1, err = txStore.FindTxWithAttempts(ctx, etx1.ID) + require.NoError(t, err) + require.Empty(t, etx1.TxAttempts[0].Receipts) + + // receipt still exists for other tx + etx2, err = txStore.FindTxWithAttempts(ctx, etx2.ID) + require.NoError(t, err) + require.Len(t, etx2.TxAttempts[0].Receipts, 1) +} + func mustInsertTerminallyStuckTxWithAttempt(t *testing.T, txStore txmgr.TestEvmTxStore, fromAddress common.Address, nonceInt int64, broadcastBeforeBlockNum int64) txmgr.Tx { ctx := tests.Context(t) broadcast := time.Now() diff --git a/core/chains/evm/txmgr/finalizer.go b/core/chains/evm/txmgr/finalizer.go index 1fd80f7e27a..e10c01a0ae2 100644 --- a/core/chains/evm/txmgr/finalizer.go +++ b/core/chains/evm/txmgr/finalizer.go @@ -61,6 +61,7 @@ var ( const processHeadTimeout = 10 * time.Minute type finalizerTxStore interface { + DeleteReceiptByTxHash(ctx context.Context, txHash common.Hash) error FindAttemptsRequiringReceiptFetch(ctx context.Context, chainID *big.Int) (hashes []TxAttempt, err error) FindConfirmedTxesReceipts(ctx context.Context, finalizedBlockNum int64, chainID *big.Int) (receipts []*evmtypes.Receipt, err error) FindTxesPendingCallback(ctx context.Context, latest, finalized int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) @@ -267,8 +268,13 @@ func (f *evmFinalizer) processFinalizedHead(ctx context.Context, latestFinalized // Receipt block hash does not match the block hash in chain. Transaction has been re-org'd out but DB state has not been updated yet if blockHashInChain.String() != receipt.BlockHash.String() { // Log error if a transaction is marked as confirmed with a receipt older than the finalized block - // This scenario could potentially point to a re-org'd transaction the Confirmer has lost track of - f.lggr.Errorw("found confirmed transaction with re-org'd receipt older than finalized block", "receipt", receipt, "onchainBlockHash", blockHashInChain.String()) + // This scenario could potentially be caused by a stale receipt stored for a re-org'd transaction + f.lggr.Debugw("found confirmed transaction with re-org'd receipt", "receipt", receipt, "onchainBlockHash", blockHashInChain.String()) + err = f.txStore.DeleteReceiptByTxHash(ctx, receipt.GetTxHash()) + // Log error but allow process to continue so other transactions can still be marked as finalized + if err != nil { + f.lggr.Errorw("failed to delete receipt", "receipt", receipt) + } continue } finalizedReceipts = append(finalizedReceipts, receipt) @@ -353,8 +359,13 @@ func (f *evmFinalizer) batchCheckReceiptHashesOnchain(ctx context.Context, block finalizedReceipts = append(finalizedReceipts, receipt) } else { // Log error if a transaction is marked as confirmed with a receipt older than the finalized block - // This scenario could potentially point to a re-org'd transaction the Confirmer has lost track of - f.lggr.Errorw("found confirmed transaction with re-org'd receipt older than finalized block", "receipt", receipt, "onchainBlockHash", head.BlockHash().String()) + // This scenario could potentially be caused by a stale receipt stored for a re-org'd transaction + f.lggr.Debugw("found confirmed transaction with re-org'd receipt", "receipt", receipt, "onchainBlockHash", head.BlockHash().String()) + err = f.txStore.DeleteReceiptByTxHash(ctx, receipt.GetTxHash()) + // Log error but allow process to continue so other transactions can still be marked as finalized + if err != nil { + f.lggr.Errorw("failed to delete receipt", "receipt", receipt) + } } } } diff --git a/core/chains/evm/txmgr/finalizer_test.go b/core/chains/evm/txmgr/finalizer_test.go index 4d520ab0bac..76338d31836 100644 --- a/core/chains/evm/txmgr/finalizer_test.go +++ b/core/chains/evm/txmgr/finalizer_test.go @@ -89,7 +89,7 @@ func TestFinalizer_MarkTxFinalized(t *testing.T) { require.Equal(t, txmgrcommon.TxConfirmed, tx.State) }) - t.Run("returns not finalized for tx with receipt re-org'd out", func(t *testing.T) { + t.Run("returns not finalized for tx with receipt re-org'd out and deletes stale receipt", func(t *testing.T) { finalizer := txmgr.NewEvmFinalizer(logger.Test(t), testutils.FixtureChainID, rpcBatchSize, false, txStore, txmClient, ht) servicetest.Run(t, finalizer) @@ -117,6 +117,8 @@ func TestFinalizer_MarkTxFinalized(t *testing.T) { tx, err = txStore.FindTxWithIdempotencyKey(ctx, idempotencyKey, testutils.FixtureChainID) require.NoError(t, err) require.Equal(t, txmgrcommon.TxConfirmed, tx.State) + require.Len(t, tx.TxAttempts, 1) + require.Empty(t, tx.TxAttempts[0].Receipts) }) t.Run("returns finalized for tx with receipt in a finalized block", func(t *testing.T) { diff --git a/core/chains/evm/txmgr/mocks/evm_tx_store.go b/core/chains/evm/txmgr/mocks/evm_tx_store.go index 27d9b1bf583..ca98ad6ceb8 100644 --- a/core/chains/evm/txmgr/mocks/evm_tx_store.go +++ b/core/chains/evm/txmgr/mocks/evm_tx_store.go @@ -446,6 +446,53 @@ func (_c *EvmTxStore_DeleteInProgressAttempt_Call) RunAndReturn(run func(context return _c } +// DeleteReceiptByTxHash provides a mock function with given fields: ctx, txHash +func (_m *EvmTxStore) DeleteReceiptByTxHash(ctx context.Context, txHash common.Hash) error { + ret := _m.Called(ctx, txHash) + + if len(ret) == 0 { + panic("no return value specified for DeleteReceiptByTxHash") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) error); ok { + r0 = rf(ctx, txHash) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// EvmTxStore_DeleteReceiptByTxHash_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteReceiptByTxHash' +type EvmTxStore_DeleteReceiptByTxHash_Call struct { + *mock.Call +} + +// DeleteReceiptByTxHash is a helper method to define mock.On call +// - ctx context.Context +// - txHash common.Hash +func (_e *EvmTxStore_Expecter) DeleteReceiptByTxHash(ctx interface{}, txHash interface{}) *EvmTxStore_DeleteReceiptByTxHash_Call { + return &EvmTxStore_DeleteReceiptByTxHash_Call{Call: _e.mock.On("DeleteReceiptByTxHash", ctx, txHash)} +} + +func (_c *EvmTxStore_DeleteReceiptByTxHash_Call) Run(run func(ctx context.Context, txHash common.Hash)) *EvmTxStore_DeleteReceiptByTxHash_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.Hash)) + }) + return _c +} + +func (_c *EvmTxStore_DeleteReceiptByTxHash_Call) Return(_a0 error) *EvmTxStore_DeleteReceiptByTxHash_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *EvmTxStore_DeleteReceiptByTxHash_Call) RunAndReturn(run func(context.Context, common.Hash) error) *EvmTxStore_DeleteReceiptByTxHash_Call { + _c.Call.Return(run) + return _c +} + // FindAttemptsRequiringReceiptFetch provides a mock function with given fields: ctx, chainID func (_m *EvmTxStore) FindAttemptsRequiringReceiptFetch(ctx context.Context, chainID *big.Int) ([]txmgr.TxAttempt, error) { ret := _m.Called(ctx, chainID)