Skip to content

Commit

Permalink
Updated finalization logic to delete stale receipts if detected
Browse files Browse the repository at this point in the history
  • Loading branch information
amit-momin committed Nov 14, 2024
1 parent 35b4631 commit 95e800c
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 5 deletions.
9 changes: 9 additions & 0 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type EvmTxStore interface {
TxStoreWebApi

// methods used solely in EVM components
DeleteReceiptByTxHash(ctx context.Context, txHash common.Hash) error
FindAttemptsRequiringReceiptFetch(ctx context.Context, chainID *big.Int) (hashes []TxAttempt, err error)
FindConfirmedTxesReceipts(ctx context.Context, finalizedBlockNum int64, chainID *big.Int) (receipts []*evmtypes.Receipt, err error)
FindTxesPendingCallback(ctx context.Context, latest, finalized int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error)
Expand Down Expand Up @@ -1091,6 +1092,14 @@ AND evm.tx_attempts.eth_tx_id = ANY($1)
return pkgerrors.Wrap(err, "deleteEthReceipts failed")
}

func (o *evmTxStore) DeleteReceiptByTxHash(ctx context.Context, txHash common.Hash) error {
var cancel context.CancelFunc
ctx, cancel = o.stopCh.Ctx(ctx)
defer cancel()
_, err := o.q.ExecContext(ctx, `DELETE FROM evm.receipts WHERE tx_hash = $1`, txHash)
return err
}

func (o *evmTxStore) UpdateTxsForRebroadcast(ctx context.Context, etxIDs []int64, attemptIDs []int64) error {
var cancel context.CancelFunc
ctx, cancel = o.stopCh.Ctx(ctx)
Expand Down
27 changes: 27 additions & 0 deletions core/chains/evm/txmgr/evm_tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1981,6 +1981,33 @@ func TestORM_FindTxesByIDs(t *testing.T) {
})
}

func TestORM_DeleteReceiptsByTxHash(t *testing.T) {
t.Parallel()

db := pgtest.NewSqlxDB(t)
txStore := cltest.NewTestTxStore(t, db)
ctx := tests.Context(t)
ethKeyStore := cltest.NewKeyStore(t, db).Eth()
_, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore)

etx1 := mustInsertConfirmedEthTxWithReceipt(t, txStore, fromAddress, 0, 100)
etx2 := mustInsertConfirmedEthTxWithReceipt(t, txStore, fromAddress, 2, 100)

// Delete one transaction's receipt
err := txStore.DeleteReceiptByTxHash(ctx, etx1.TxAttempts[0].Hash)
require.NoError(t, err)

// receipt has been deleted
etx1, err = txStore.FindTxWithAttempts(ctx, etx1.ID)
require.NoError(t, err)
require.Empty(t, etx1.TxAttempts[0].Receipts)

// receipt still exists for other tx
etx2, err = txStore.FindTxWithAttempts(ctx, etx2.ID)
require.NoError(t, err)
require.Len(t, etx2.TxAttempts[0].Receipts, 1)
}

func mustInsertTerminallyStuckTxWithAttempt(t *testing.T, txStore txmgr.TestEvmTxStore, fromAddress common.Address, nonceInt int64, broadcastBeforeBlockNum int64) txmgr.Tx {
ctx := tests.Context(t)
broadcast := time.Now()
Expand Down
19 changes: 15 additions & 4 deletions core/chains/evm/txmgr/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ var (
const processHeadTimeout = 10 * time.Minute

type finalizerTxStore interface {
DeleteReceiptByTxHash(ctx context.Context, txHash common.Hash) error
FindAttemptsRequiringReceiptFetch(ctx context.Context, chainID *big.Int) (hashes []TxAttempt, err error)
FindConfirmedTxesReceipts(ctx context.Context, finalizedBlockNum int64, chainID *big.Int) (receipts []*evmtypes.Receipt, err error)
FindTxesPendingCallback(ctx context.Context, latest, finalized int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error)
Expand Down Expand Up @@ -267,8 +268,13 @@ func (f *evmFinalizer) processFinalizedHead(ctx context.Context, latestFinalized
// Receipt block hash does not match the block hash in chain. Transaction has been re-org'd out but DB state has not been updated yet
if blockHashInChain.String() != receipt.BlockHash.String() {
// Log error if a transaction is marked as confirmed with a receipt older than the finalized block
// This scenario could potentially point to a re-org'd transaction the Confirmer has lost track of
f.lggr.Errorw("found confirmed transaction with re-org'd receipt older than finalized block", "receipt", receipt, "onchainBlockHash", blockHashInChain.String())
// This scenario could potentially be caused by a stale receipt stored for a re-org'd transaction
f.lggr.Debugw("found confirmed transaction with re-org'd receipt", "receipt", receipt, "onchainBlockHash", blockHashInChain.String())
err = f.txStore.DeleteReceiptByTxHash(ctx, receipt.GetTxHash())
// Log error but allow process to continue so other transactions can still be marked as finalized
if err != nil {
f.lggr.Errorw("failed to delete receipt", "receipt", receipt)
}
continue
}
finalizedReceipts = append(finalizedReceipts, receipt)
Expand Down Expand Up @@ -353,8 +359,13 @@ func (f *evmFinalizer) batchCheckReceiptHashesOnchain(ctx context.Context, block
finalizedReceipts = append(finalizedReceipts, receipt)
} else {
// Log error if a transaction is marked as confirmed with a receipt older than the finalized block
// This scenario could potentially point to a re-org'd transaction the Confirmer has lost track of
f.lggr.Errorw("found confirmed transaction with re-org'd receipt older than finalized block", "receipt", receipt, "onchainBlockHash", head.BlockHash().String())
// This scenario could potentially be caused by a stale receipt stored for a re-org'd transaction
f.lggr.Debugw("found confirmed transaction with re-org'd receipt", "receipt", receipt, "onchainBlockHash", head.BlockHash().String())
err = f.txStore.DeleteReceiptByTxHash(ctx, receipt.GetTxHash())
// Log error but allow process to continue so other transactions can still be marked as finalized
if err != nil {
f.lggr.Errorw("failed to delete receipt", "receipt", receipt)
}
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion core/chains/evm/txmgr/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestFinalizer_MarkTxFinalized(t *testing.T) {
require.Equal(t, txmgrcommon.TxConfirmed, tx.State)
})

t.Run("returns not finalized for tx with receipt re-org'd out", func(t *testing.T) {
t.Run("returns not finalized for tx with receipt re-org'd out and deletes stale receipt", func(t *testing.T) {
finalizer := txmgr.NewEvmFinalizer(logger.Test(t), testutils.FixtureChainID, rpcBatchSize, false, txStore, txmClient, ht)
servicetest.Run(t, finalizer)

Expand Down Expand Up @@ -117,6 +117,8 @@ func TestFinalizer_MarkTxFinalized(t *testing.T) {
tx, err = txStore.FindTxWithIdempotencyKey(ctx, idempotencyKey, testutils.FixtureChainID)
require.NoError(t, err)
require.Equal(t, txmgrcommon.TxConfirmed, tx.State)
require.Len(t, tx.TxAttempts, 1)
require.Empty(t, tx.TxAttempts[0].Receipts)
})

t.Run("returns finalized for tx with receipt in a finalized block", func(t *testing.T) {
Expand Down
47 changes: 47 additions & 0 deletions core/chains/evm/txmgr/mocks/evm_tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 95e800c

Please sign in to comment.