From 2312827156f24fa4a6e420aec12e5a3aeac81e2b Mon Sep 17 00:00:00 2001
From: amit-momin <108959691+amit-momin@users.noreply.github.com>
Date: Tue, 6 Aug 2024 14:33:16 -0500
Subject: [PATCH] Add finalizer component to TXM (#13638)
* Added a finalizer component that assesses confirmed transactions for finality
* Moved Finalizer component into EVM code and addressed feedback
* Fixed linting and renumbered sql migration
* Added limit to Finalizer RPC batch calls
* Cleaned up unneeded code
* Renumbered sql migration
* Updated Finalizer to use LatestAndFinalizedBlock method from HeadTracker
* Fixed health check tests and fixed linting
* Fixed lint error
* Fixed lint error
* Added finalized state to replace finalized column
* Updated finalizer batch RPC validation to use blockByNumber and added filter to DB query
* Updated reaper to reap old confirmed transactions
* Fixed migration test
* Fixed lint error
* Changed log level
* Renumbered sql migration
* Updated Finalizer to only process on new finalized heads and improved query performance
* Fixed mocks
* Updated TxStore method name and fixed mocks
* Fixed mock
* Updated TxStore method to exit early
* Removed unused error
---------
Co-authored-by: Silas Lenihan <32529249+silaslenihan@users.noreply.github.com>
---
.changeset/itchy-bugs-clean.md | 5 +
common/txmgr/models.go | 1 +
common/txmgr/reaper.go | 9 +-
common/txmgr/txmgr.go | 20 +-
common/txmgr/types/config.go | 6 -
common/txmgr/types/finalizer.go | 12 +
.../txmgr/types/mocks/reaper_chain_config.go | 77 -----
common/txmgr/types/mocks/tx_store.go | 80 +----
common/txmgr/types/tx_store.go | 3 +-
.../evm/headtracker/simulated_head_tracker.go | 29 ++
core/chains/evm/txmgr/builder.go | 12 +-
core/chains/evm/txmgr/client.go | 4 +
core/chains/evm/txmgr/config.go | 1 -
core/chains/evm/txmgr/evm_tx_store.go | 124 ++++++--
core/chains/evm/txmgr/evm_tx_store_test.go | 83 +++--
core/chains/evm/txmgr/finalizer.go | 294 ++++++++++++++++++
core/chains/evm/txmgr/finalizer_test.go | 240 ++++++++++++++
core/chains/evm/txmgr/mocks/evm_tx_store.go | 190 ++++++-----
core/chains/evm/txmgr/models.go | 3 +-
core/chains/evm/txmgr/reaper_test.go | 64 ++--
core/chains/evm/txmgr/test_helpers.go | 13 +-
core/chains/evm/txmgr/txmgr_test.go | 72 ++++-
core/chains/legacyevm/chain.go | 2 +-
core/chains/legacyevm/evm_txm.go | 5 +-
.../promreporter/prom_reporter_test.go | 3 +-
core/services/vrf/delegate_test.go | 2 +-
core/services/vrf/v2/integration_v2_test.go | 2 +-
core/services/vrf/v2/listener_v2_test.go | 2 +-
core/store/migrate/migrate_test.go | 11 +
.../0248_add_tx_finalized_state.sql | 135 ++++++++
core/web/testdata/body/health.html | 3 +
core/web/testdata/body/health.json | 9 +
core/web/testdata/body/health.txt | 1 +
testdata/scripts/health/multi-chain.txtar | 10 +
34 files changed, 1170 insertions(+), 357 deletions(-)
create mode 100644 .changeset/itchy-bugs-clean.md
create mode 100644 common/txmgr/types/finalizer.go
delete mode 100644 common/txmgr/types/mocks/reaper_chain_config.go
create mode 100644 core/chains/evm/txmgr/finalizer.go
create mode 100644 core/chains/evm/txmgr/finalizer_test.go
create mode 100644 core/store/migrate/migrations/0248_add_tx_finalized_state.sql
diff --git a/.changeset/itchy-bugs-clean.md b/.changeset/itchy-bugs-clean.md
new file mode 100644
index 00000000000..beeed8ace1e
--- /dev/null
+++ b/.changeset/itchy-bugs-clean.md
@@ -0,0 +1,5 @@
+---
+"chainlink": minor
+---
+
+Introduced finalized transaction state. Added a finalizer component to the TXM to mark transactions as finalized. #internal
diff --git a/common/txmgr/models.go b/common/txmgr/models.go
index dd121a2c7c4..ca5e7d4f251 100644
--- a/common/txmgr/models.go
+++ b/common/txmgr/models.go
@@ -11,4 +11,5 @@ const (
TxUnconfirmed = txmgrtypes.TxState("unconfirmed")
TxConfirmed = txmgrtypes.TxState("confirmed")
TxConfirmedMissingReceipt = txmgrtypes.TxState("confirmed_missing_receipt")
+ TxFinalized = txmgrtypes.TxState("finalized")
)
diff --git a/common/txmgr/reaper.go b/common/txmgr/reaper.go
index 932b58f6430..0c797548b16 100644
--- a/common/txmgr/reaper.go
+++ b/common/txmgr/reaper.go
@@ -14,7 +14,6 @@ import (
// Reaper handles periodic database cleanup for Txm
type Reaper[CHAIN_ID types.ID] struct {
store txmgrtypes.TxHistoryReaper[CHAIN_ID]
- config txmgrtypes.ReaperChainConfig
txConfig txmgrtypes.ReaperTransactionsConfig
chainID CHAIN_ID
log logger.Logger
@@ -25,10 +24,9 @@ type Reaper[CHAIN_ID types.ID] struct {
}
// NewReaper instantiates a new reaper object
-func NewReaper[CHAIN_ID types.ID](lggr logger.Logger, store txmgrtypes.TxHistoryReaper[CHAIN_ID], config txmgrtypes.ReaperChainConfig, txConfig txmgrtypes.ReaperTransactionsConfig, chainID CHAIN_ID) *Reaper[CHAIN_ID] {
+func NewReaper[CHAIN_ID types.ID](lggr logger.Logger, store txmgrtypes.TxHistoryReaper[CHAIN_ID], txConfig txmgrtypes.ReaperTransactionsConfig, chainID CHAIN_ID) *Reaper[CHAIN_ID] {
r := &Reaper[CHAIN_ID]{
store,
- config,
txConfig,
chainID,
logger.Named(lggr, "Reaper"),
@@ -103,13 +101,12 @@ func (r *Reaper[CHAIN_ID]) ReapTxes(headNum int64) error {
r.log.Debug("Transactions.ReaperThreshold set to 0; skipping ReapTxes")
return nil
}
- minBlockNumberToKeep := headNum - int64(r.config.FinalityDepth())
mark := time.Now()
timeThreshold := mark.Add(-threshold)
- r.log.Debugw(fmt.Sprintf("reaping old txes created before %s", timeThreshold.Format(time.RFC3339)), "ageThreshold", threshold, "timeThreshold", timeThreshold, "minBlockNumberToKeep", minBlockNumberToKeep)
+ r.log.Debugw(fmt.Sprintf("reaping old txes created before %s", timeThreshold.Format(time.RFC3339)), "ageThreshold", threshold, "timeThreshold", timeThreshold)
- if err := r.store.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, r.chainID); err != nil {
+ if err := r.store.ReapTxHistory(ctx, timeThreshold, r.chainID); err != nil {
return err
}
diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go
index fc27e930c37..49ac8a89b73 100644
--- a/common/txmgr/txmgr.go
+++ b/common/txmgr/txmgr.go
@@ -108,6 +108,7 @@ type Txm[
broadcaster *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
confirmer *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
+ finalizer txmgrtypes.Finalizer[BLOCK_HASH, HEAD]
fwdMgr txmgrtypes.ForwarderManager[ADDR]
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
newErrorClassifier NewErrorClassifier
@@ -143,6 +144,7 @@ func NewTxm[
confirmer *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
resender *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
+ finalizer txmgrtypes.Finalizer[BLOCK_HASH, HEAD],
newErrorClassifierFunc NewErrorClassifier,
) *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
b := Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
@@ -165,13 +167,14 @@ func NewTxm[
resender: resender,
tracker: tracker,
newErrorClassifier: newErrorClassifierFunc,
+ finalizer: finalizer,
}
if txCfg.ResendAfterThreshold() <= 0 {
b.logger.Info("Resender: Disabled")
}
if txCfg.ReaperThreshold() > 0 && txCfg.ReaperInterval() > 0 {
- b.reaper = NewReaper[CHAIN_ID](lggr, b.txStore, cfg, txCfg, chainId)
+ b.reaper = NewReaper[CHAIN_ID](lggr, b.txStore, txCfg, chainId)
} else {
b.logger.Info("TxReaper: Disabled")
}
@@ -199,6 +202,10 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx
return fmt.Errorf("Txm: Tracker failed to start: %w", err)
}
+ if err := ms.Start(ctx, b.finalizer); err != nil {
+ return fmt.Errorf("Txm: Finalizer failed to start: %w", err)
+ }
+
b.logger.Info("Txm starting runLoop")
b.wg.Add(1)
go b.runLoop()
@@ -293,6 +300,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HealthRepo
services.CopyHealth(report, b.broadcaster.HealthReport())
services.CopyHealth(report, b.confirmer.HealthReport())
services.CopyHealth(report, b.txAttemptBuilder.HealthReport())
+ services.CopyHealth(report, b.finalizer.HealthReport())
})
if b.txConfig.ForwardersEnabled() {
@@ -415,6 +423,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
case head := <-b.chHeads:
b.confirmer.mb.Deliver(head)
b.tracker.mb.Deliver(head.BlockNumber())
+ b.finalizer.DeliverLatestHead(head)
case reset := <-b.reset:
// This check prevents the weird edge-case where you can select
// into this block after chStop has already been closed and the
@@ -446,6 +455,10 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) {
b.logger.Errorw(fmt.Sprintf("Failed to Close Tracker: %v", err), "err", err)
}
+ err = b.finalizer.Close()
+ if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) {
+ b.logger.Errorw(fmt.Sprintf("Failed to Close Finalizer: %v", err), "err", err)
+ }
return
case <-keysChanged:
// This check prevents the weird edge-case where you can select
@@ -644,9 +657,10 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTransac
// Return unconfirmed for ConfirmedMissingReceipt since a receipt is required to determine if it is finalized
return commontypes.Unconfirmed, nil
case TxConfirmed:
- // TODO: Check for finality and return finalized status
- // Return unconfirmed if tx receipt's block is newer than the latest finalized block
+ // Return unconfirmed for confirmed transactions because they are not yet finalized
return commontypes.Unconfirmed, nil
+ case TxFinalized:
+ return commontypes.Finalized, nil
case TxFatalError:
// Use an ErrorClassifier to determine if the transaction is considered Fatal
txErr := b.newErrorClassifier(tx.GetError())
diff --git a/common/txmgr/types/config.go b/common/txmgr/types/config.go
index 4d9af5f0673..8b11a45d11d 100644
--- a/common/txmgr/types/config.go
+++ b/common/txmgr/types/config.go
@@ -5,7 +5,6 @@ import "time"
type TransactionManagerChainConfig interface {
BroadcasterChainConfig
ConfirmerChainConfig
- ReaperChainConfig
}
type TransactionManagerFeeConfig interface {
@@ -74,11 +73,6 @@ type ResenderTransactionsConfig interface {
MaxInFlight() uint32
}
-// ReaperConfig is the config subset used by the reaper
-type ReaperChainConfig interface {
- FinalityDepth() uint32
-}
-
type ReaperTransactionsConfig interface {
ReaperInterval() time.Duration
ReaperThreshold() time.Duration
diff --git a/common/txmgr/types/finalizer.go b/common/txmgr/types/finalizer.go
new file mode 100644
index 00000000000..be3c897d0e2
--- /dev/null
+++ b/common/txmgr/types/finalizer.go
@@ -0,0 +1,12 @@
+package types
+
+import (
+ "github.com/smartcontractkit/chainlink-common/pkg/services"
+ "github.com/smartcontractkit/chainlink/v2/common/types"
+)
+
+type Finalizer[BLOCK_HASH types.Hashable, HEAD types.Head[BLOCK_HASH]] interface {
+ // interfaces for running the underlying estimator
+ services.Service
+ DeliverLatestHead(head HEAD) bool
+}
diff --git a/common/txmgr/types/mocks/reaper_chain_config.go b/common/txmgr/types/mocks/reaper_chain_config.go
deleted file mode 100644
index 0531b071708..00000000000
--- a/common/txmgr/types/mocks/reaper_chain_config.go
+++ /dev/null
@@ -1,77 +0,0 @@
-// Code generated by mockery v2.43.2. DO NOT EDIT.
-
-package mocks
-
-import mock "github.com/stretchr/testify/mock"
-
-// ReaperConfig is an autogenerated mock type for the ReaperChainConfig type
-type ReaperConfig struct {
- mock.Mock
-}
-
-type ReaperConfig_Expecter struct {
- mock *mock.Mock
-}
-
-func (_m *ReaperConfig) EXPECT() *ReaperConfig_Expecter {
- return &ReaperConfig_Expecter{mock: &_m.Mock}
-}
-
-// FinalityDepth provides a mock function with given fields:
-func (_m *ReaperConfig) FinalityDepth() uint32 {
- ret := _m.Called()
-
- if len(ret) == 0 {
- panic("no return value specified for FinalityDepth")
- }
-
- var r0 uint32
- if rf, ok := ret.Get(0).(func() uint32); ok {
- r0 = rf()
- } else {
- r0 = ret.Get(0).(uint32)
- }
-
- return r0
-}
-
-// ReaperConfig_FinalityDepth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FinalityDepth'
-type ReaperConfig_FinalityDepth_Call struct {
- *mock.Call
-}
-
-// FinalityDepth is a helper method to define mock.On call
-func (_e *ReaperConfig_Expecter) FinalityDepth() *ReaperConfig_FinalityDepth_Call {
- return &ReaperConfig_FinalityDepth_Call{Call: _e.mock.On("FinalityDepth")}
-}
-
-func (_c *ReaperConfig_FinalityDepth_Call) Run(run func()) *ReaperConfig_FinalityDepth_Call {
- _c.Call.Run(func(args mock.Arguments) {
- run()
- })
- return _c
-}
-
-func (_c *ReaperConfig_FinalityDepth_Call) Return(_a0 uint32) *ReaperConfig_FinalityDepth_Call {
- _c.Call.Return(_a0)
- return _c
-}
-
-func (_c *ReaperConfig_FinalityDepth_Call) RunAndReturn(run func() uint32) *ReaperConfig_FinalityDepth_Call {
- _c.Call.Return(run)
- return _c
-}
-
-// NewReaperConfig creates a new instance of ReaperConfig. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
-// The first argument is typically a *testing.T value.
-func NewReaperConfig(t interface {
- mock.TestingT
- Cleanup(func())
-}) *ReaperConfig {
- mock := &ReaperConfig{}
- mock.Mock.Test(t)
-
- t.Cleanup(func() { mock.AssertExpectations(t) })
-
- return mock
-}
diff --git a/common/txmgr/types/mocks/tx_store.go b/common/txmgr/types/mocks/tx_store.go
index ee166638e34..0b9c7110660 100644
--- a/common/txmgr/types/mocks/tx_store.go
+++ b/common/txmgr/types/mocks/tx_store.go
@@ -1760,65 +1760,6 @@ func (_c *TxStore_HasInProgressTransaction_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_H
return _c
}
-// IsTxFinalized provides a mock function with given fields: ctx, blockHeight, txID, chainID
-func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID CHAIN_ID) (bool, error) {
- ret := _m.Called(ctx, blockHeight, txID, chainID)
-
- if len(ret) == 0 {
- panic("no return value specified for IsTxFinalized")
- }
-
- var r0 bool
- var r1 error
- if rf, ok := ret.Get(0).(func(context.Context, int64, int64, CHAIN_ID) (bool, error)); ok {
- return rf(ctx, blockHeight, txID, chainID)
- }
- if rf, ok := ret.Get(0).(func(context.Context, int64, int64, CHAIN_ID) bool); ok {
- r0 = rf(ctx, blockHeight, txID, chainID)
- } else {
- r0 = ret.Get(0).(bool)
- }
-
- if rf, ok := ret.Get(1).(func(context.Context, int64, int64, CHAIN_ID) error); ok {
- r1 = rf(ctx, blockHeight, txID, chainID)
- } else {
- r1 = ret.Error(1)
- }
-
- return r0, r1
-}
-
-// TxStore_IsTxFinalized_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsTxFinalized'
-type TxStore_IsTxFinalized_Call[ADDR types.Hashable, CHAIN_ID types.ID, TX_HASH types.Hashable, BLOCK_HASH types.Hashable, R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], SEQ types.Sequence, FEE feetypes.Fee] struct {
- *mock.Call
-}
-
-// IsTxFinalized is a helper method to define mock.On call
-// - ctx context.Context
-// - blockHeight int64
-// - txID int64
-// - chainID CHAIN_ID
-func (_e *TxStore_Expecter[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) IsTxFinalized(ctx interface{}, blockHeight interface{}, txID interface{}, chainID interface{}) *TxStore_IsTxFinalized_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
- return &TxStore_IsTxFinalized_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{Call: _e.mock.On("IsTxFinalized", ctx, blockHeight, txID, chainID)}
-}
-
-func (_c *TxStore_IsTxFinalized_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Run(run func(ctx context.Context, blockHeight int64, txID int64, chainID CHAIN_ID)) *TxStore_IsTxFinalized_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
- _c.Call.Run(func(args mock.Arguments) {
- run(args[0].(context.Context), args[1].(int64), args[2].(int64), args[3].(CHAIN_ID))
- })
- return _c
-}
-
-func (_c *TxStore_IsTxFinalized_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Return(finalized bool, err error) *TxStore_IsTxFinalized_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
- _c.Call.Return(finalized, err)
- return _c
-}
-
-func (_c *TxStore_IsTxFinalized_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RunAndReturn(run func(context.Context, int64, int64, CHAIN_ID) (bool, error)) *TxStore_IsTxFinalized_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
- _c.Call.Return(run)
- return _c
-}
-
// LoadTxAttempts provides a mock function with given fields: ctx, etx
func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) LoadTxAttempts(ctx context.Context, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
ret := _m.Called(ctx, etx)
@@ -2069,17 +2010,17 @@ func (_c *TxStore_PruneUnstartedTxQueue_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH
return _c
}
-// ReapTxHistory provides a mock function with given fields: ctx, minBlockNumberToKeep, timeThreshold, chainID
-func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ReapTxHistory(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID CHAIN_ID) error {
- ret := _m.Called(ctx, minBlockNumberToKeep, timeThreshold, chainID)
+// ReapTxHistory provides a mock function with given fields: ctx, timeThreshold, chainID
+func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ReapTxHistory(ctx context.Context, timeThreshold time.Time, chainID CHAIN_ID) error {
+ ret := _m.Called(ctx, timeThreshold, chainID)
if len(ret) == 0 {
panic("no return value specified for ReapTxHistory")
}
var r0 error
- if rf, ok := ret.Get(0).(func(context.Context, int64, time.Time, CHAIN_ID) error); ok {
- r0 = rf(ctx, minBlockNumberToKeep, timeThreshold, chainID)
+ if rf, ok := ret.Get(0).(func(context.Context, time.Time, CHAIN_ID) error); ok {
+ r0 = rf(ctx, timeThreshold, chainID)
} else {
r0 = ret.Error(0)
}
@@ -2094,16 +2035,15 @@ type TxStore_ReapTxHistory_Call[ADDR types.Hashable, CHAIN_ID types.ID, TX_HASH
// ReapTxHistory is a helper method to define mock.On call
// - ctx context.Context
-// - minBlockNumberToKeep int64
// - timeThreshold time.Time
// - chainID CHAIN_ID
-func (_e *TxStore_Expecter[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ReapTxHistory(ctx interface{}, minBlockNumberToKeep interface{}, timeThreshold interface{}, chainID interface{}) *TxStore_ReapTxHistory_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
- return &TxStore_ReapTxHistory_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{Call: _e.mock.On("ReapTxHistory", ctx, minBlockNumberToKeep, timeThreshold, chainID)}
+func (_e *TxStore_Expecter[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ReapTxHistory(ctx interface{}, timeThreshold interface{}, chainID interface{}) *TxStore_ReapTxHistory_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
+ return &TxStore_ReapTxHistory_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{Call: _e.mock.On("ReapTxHistory", ctx, timeThreshold, chainID)}
}
-func (_c *TxStore_ReapTxHistory_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Run(run func(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID CHAIN_ID)) *TxStore_ReapTxHistory_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
+func (_c *TxStore_ReapTxHistory_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Run(run func(ctx context.Context, timeThreshold time.Time, chainID CHAIN_ID)) *TxStore_ReapTxHistory_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
_c.Call.Run(func(args mock.Arguments) {
- run(args[0].(context.Context), args[1].(int64), args[2].(time.Time), args[3].(CHAIN_ID))
+ run(args[0].(context.Context), args[1].(time.Time), args[2].(CHAIN_ID))
})
return _c
}
@@ -2113,7 +2053,7 @@ func (_c *TxStore_ReapTxHistory_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ
return _c
}
-func (_c *TxStore_ReapTxHistory_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RunAndReturn(run func(context.Context, int64, time.Time, CHAIN_ID) error) *TxStore_ReapTxHistory_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
+func (_c *TxStore_ReapTxHistory_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RunAndReturn(run func(context.Context, time.Time, CHAIN_ID) error) *TxStore_ReapTxHistory_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
_c.Call.Return(run)
return _c
}
diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go
index 875339cfbac..63b56dd169a 100644
--- a/common/txmgr/types/tx_store.go
+++ b/common/txmgr/types/tx_store.go
@@ -105,11 +105,10 @@ type TransactionStore[
UpdateTxUnstartedToInProgress(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt *TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
UpdateTxFatalError(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
UpdateTxForRebroadcast(ctx context.Context, etx Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], etxAttempt TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
- IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID CHAIN_ID) (finalized bool, err error)
}
type TxHistoryReaper[CHAIN_ID types.ID] interface {
- ReapTxHistory(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID CHAIN_ID) error
+ ReapTxHistory(ctx context.Context, timeThreshold time.Time, chainID CHAIN_ID) error
}
type UnstartedTxQueuePruner interface {
diff --git a/core/chains/evm/headtracker/simulated_head_tracker.go b/core/chains/evm/headtracker/simulated_head_tracker.go
index e1e550de992..62bb4968c2f 100644
--- a/core/chains/evm/headtracker/simulated_head_tracker.go
+++ b/core/chains/evm/headtracker/simulated_head_tracker.go
@@ -2,6 +2,7 @@ package headtracker
import (
"context"
+ "errors"
"fmt"
"math/big"
@@ -51,3 +52,31 @@ func (ht *simulatedHeadTracker) LatestAndFinalizedBlock(ctx context.Context) (*e
return latest, finalizedBlock, nil
}
+
+func (ht *simulatedHeadTracker) LatestChain() *evmtypes.Head {
+ return nil
+}
+
+func (ht *simulatedHeadTracker) HealthReport() map[string]error {
+ return nil
+}
+
+func (ht *simulatedHeadTracker) Start(_ context.Context) error {
+ return nil
+}
+
+func (ht *simulatedHeadTracker) Close() error {
+ return nil
+}
+
+func (ht *simulatedHeadTracker) Backfill(_ context.Context, _ *evmtypes.Head) error {
+ return errors.New("unimplemented")
+}
+
+func (ht *simulatedHeadTracker) Name() string {
+ return "SimulatedHeadTracker"
+}
+
+func (ht *simulatedHeadTracker) Ready() error {
+ return nil
+}
diff --git a/core/chains/evm/txmgr/builder.go b/core/chains/evm/txmgr/builder.go
index 8234d55b960..d85d6acdc8c 100644
--- a/core/chains/evm/txmgr/builder.go
+++ b/core/chains/evm/txmgr/builder.go
@@ -13,6 +13,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/config/chaintype"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/forwarders"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
+ httypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/keystore"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
@@ -32,6 +33,7 @@ func NewTxm(
logPoller logpoller.LogPoller,
keyStore keystore.Eth,
estimator gas.EvmFeeEstimator,
+ headTracker httypes.HeadTracker,
) (txm TxManager,
err error,
) {
@@ -54,11 +56,12 @@ func NewTxm(
evmTracker := NewEvmTracker(txStore, keyStore, chainID, lggr)
stuckTxDetector := NewStuckTxDetector(lggr, client.ConfiguredChainID(), chainConfig.ChainType(), fCfg.PriceMax(), txConfig.AutoPurge(), estimator, txStore, client)
evmConfirmer := NewEvmConfirmer(txStore, txmClient, txmCfg, feeCfg, txConfig, dbConfig, keyStore, txAttemptBuilder, lggr, stuckTxDetector)
+ evmFinalizer := NewEvmFinalizer(lggr, client.ConfiguredChainID(), chainConfig.RPCDefaultBatchSize(), txStore, client, headTracker)
var evmResender *Resender
if txConfig.ResendAfterThreshold() > 0 {
evmResender = NewEvmResender(lggr, txStore, txmClient, evmTracker, keyStore, txmgr.DefaultResenderPollInterval, chainConfig, txConfig)
}
- txm = NewEvmTxm(chainID, txmCfg, txConfig, keyStore, lggr, checker, fwdMgr, txAttemptBuilder, txStore, evmBroadcaster, evmConfirmer, evmResender, evmTracker)
+ txm = NewEvmTxm(chainID, txmCfg, txConfig, keyStore, lggr, checker, fwdMgr, txAttemptBuilder, txStore, evmBroadcaster, evmConfirmer, evmResender, evmTracker, evmFinalizer)
return txm, nil
}
@@ -77,8 +80,9 @@ func NewEvmTxm(
confirmer *Confirmer,
resender *Resender,
tracker *Tracker,
+ finalizer Finalizer,
) *Txm {
- return txmgr.NewTxm(chainId, cfg, txCfg, keyStore, lggr, checkerFactory, fwdMgr, txAttemptBuilder, txStore, broadcaster, confirmer, resender, tracker, client.NewTxError)
+ return txmgr.NewTxm(chainId, cfg, txCfg, keyStore, lggr, checkerFactory, fwdMgr, txAttemptBuilder, txStore, broadcaster, confirmer, resender, tracker, finalizer, client.NewTxError)
}
// NewEvmResender creates a new concrete EvmResender
@@ -96,8 +100,8 @@ func NewEvmResender(
}
// NewEvmReaper instantiates a new EVM-specific reaper object
-func NewEvmReaper(lggr logger.Logger, store txmgrtypes.TxHistoryReaper[*big.Int], config EvmReaperConfig, txConfig txmgrtypes.ReaperTransactionsConfig, chainID *big.Int) *Reaper {
- return txmgr.NewReaper(lggr, store, config, txConfig, chainID)
+func NewEvmReaper(lggr logger.Logger, store txmgrtypes.TxHistoryReaper[*big.Int], txConfig txmgrtypes.ReaperTransactionsConfig, chainID *big.Int) *Reaper {
+ return txmgr.NewReaper(lggr, store, txConfig, chainID)
}
// NewEvmConfirmer instantiates a new EVM confirmer
diff --git a/core/chains/evm/txmgr/client.go b/core/chains/evm/txmgr/client.go
index 661a180af50..e995080a260 100644
--- a/core/chains/evm/txmgr/client.go
+++ b/core/chains/evm/txmgr/client.go
@@ -183,3 +183,7 @@ func (c *evmTxmClient) CallContract(ctx context.Context, a TxAttempt, blockNumbe
}, blockNumber)
return client.ExtractRPCError(errCall)
}
+
+func (c *evmTxmClient) HeadByHash(ctx context.Context, hash common.Hash) (*evmtypes.Head, error) {
+ return c.client.HeadByHash(ctx, hash)
+}
diff --git a/core/chains/evm/txmgr/config.go b/core/chains/evm/txmgr/config.go
index b53f99840b9..af20c9a5901 100644
--- a/core/chains/evm/txmgr/config.go
+++ b/core/chains/evm/txmgr/config.go
@@ -48,7 +48,6 @@ type (
EvmBroadcasterConfig txmgrtypes.BroadcasterChainConfig
EvmConfirmerConfig txmgrtypes.ConfirmerChainConfig
EvmResenderConfig txmgrtypes.ResenderChainConfig
- EvmReaperConfig txmgrtypes.ReaperChainConfig
)
var _ EvmTxmConfig = (*evmTxmConfig)(nil)
diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go
index e83a83907e4..45de437e443 100644
--- a/core/chains/evm/txmgr/evm_tx_store.go
+++ b/core/chains/evm/txmgr/evm_tx_store.go
@@ -44,6 +44,10 @@ type EvmTxStore interface {
// redeclare TxStore for mockery
txmgrtypes.TxStore[common.Address, *big.Int, common.Hash, common.Hash, *evmtypes.Receipt, evmtypes.Nonce, gas.EvmFee]
TxStoreWebApi
+
+ // methods used solely in EVM components
+ FindConfirmedTxesReceipts(ctx context.Context, finalizedBlockNum int64, chainID *big.Int) (receipts []Receipt, err error)
+ UpdateTxStatesToFinalizedUsingReceiptIds(ctx context.Context, etxIDs []int64, chainId *big.Int) error
}
// TxStoreWebApi encapsulates the methods that are not used by the txmgr and only used by the various web controllers, readers, or evm specific components
@@ -87,7 +91,7 @@ var _ TestEvmTxStore = (*evmTxStore)(nil)
// Directly maps to columns of database table "evm.receipts".
// Do not modify type unless you
// intend to modify the database schema
-type dbReceipt struct {
+type DbReceipt struct {
ID int64
TxHash common.Hash
BlockHash common.Hash
@@ -97,8 +101,8 @@ type dbReceipt struct {
CreatedAt time.Time
}
-func DbReceiptFromEvmReceipt(evmReceipt *evmtypes.Receipt) dbReceipt {
- return dbReceipt{
+func DbReceiptFromEvmReceipt(evmReceipt *evmtypes.Receipt) DbReceipt {
+ return DbReceipt{
TxHash: evmReceipt.TxHash,
BlockHash: evmReceipt.BlockHash,
BlockNumber: evmReceipt.BlockNumber.Int64(),
@@ -107,7 +111,7 @@ func DbReceiptFromEvmReceipt(evmReceipt *evmtypes.Receipt) dbReceipt {
}
}
-func DbReceiptToEvmReceipt(receipt *dbReceipt) *evmtypes.Receipt {
+func DbReceiptToEvmReceipt(receipt *DbReceipt) *evmtypes.Receipt {
return &receipt.Receipt
}
@@ -131,7 +135,7 @@ type dbReceiptPlus struct {
FailOnRevert bool `db:"FailOnRevert"`
}
-func fromDBReceipts(rs []dbReceipt) []*evmtypes.Receipt {
+func fromDBReceipts(rs []DbReceipt) []*evmtypes.Receipt {
receipts := make([]*evmtypes.Receipt, len(rs))
for i := 0; i < len(rs); i++ {
receipts[i] = DbReceiptToEvmReceipt(&rs[i])
@@ -677,7 +681,7 @@ func (o *evmTxStore) loadEthTxesAttemptsReceipts(ctx context.Context, etxs []*Tx
attemptHashes = append(attemptHashes, attempt.Hash.Bytes())
}
}
- var rs []dbReceipt
+ var rs []DbReceipt
if err = o.q.SelectContext(ctx, &rs, `SELECT * FROM evm.receipts WHERE tx_hash = ANY($1)`, pq.Array(attemptHashes)); err != nil {
return pkgerrors.Wrap(err, "loadEthTxesAttemptsReceipts failed to load evm.receipts")
}
@@ -700,7 +704,7 @@ func loadConfirmedAttemptsReceipts(ctx context.Context, q sqlutil.DataSource, at
byHash[attempt.Hash.String()] = &attempts[i]
hashes = append(hashes, attempt.Hash.Bytes())
}
- var rs []dbReceipt
+ var rs []DbReceipt
if err := q.SelectContext(ctx, &rs, `SELECT * FROM evm.receipts WHERE tx_hash = ANY($1)`, pq.Array(hashes)); err != nil {
return pkgerrors.Wrap(err, "loadConfirmedAttemptsReceipts failed to load evm.receipts")
}
@@ -1116,7 +1120,7 @@ func updateEthTxAttemptUnbroadcast(ctx context.Context, orm *evmTxStore, attempt
func updateEthTxUnconfirm(ctx context.Context, orm *evmTxStore, etx Tx) error {
if etx.State != txmgr.TxConfirmed {
- return errors.New("expected eth_tx state to be confirmed")
+ return errors.New("expected tx state to be confirmed")
}
_, err := orm.q.ExecContext(ctx, `UPDATE evm.txes SET state = 'unconfirmed' WHERE id = $1`, etx.ID)
return pkgerrors.Wrap(err, "updateEthTxUnconfirm failed")
@@ -1205,24 +1209,6 @@ AND evm_chain_id = $1`, chainID.String()).Scan(&earliestUnconfirmedTxBlock)
return earliestUnconfirmedTxBlock, err
}
-func (o *evmTxStore) IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID *big.Int) (finalized bool, err error) {
- var cancel context.CancelFunc
- ctx, cancel = o.stopCh.Ctx(ctx)
- defer cancel()
-
- var count int32
- err = o.q.GetContext(ctx, &count, `
- SELECT COUNT(evm.receipts.receipt) FROM evm.txes
- INNER JOIN evm.tx_attempts ON evm.txes.id = evm.tx_attempts.eth_tx_id
- INNER JOIN evm.receipts ON evm.tx_attempts.hash = evm.receipts.tx_hash
- WHERE evm.receipts.block_number <= ($1 - evm.txes.min_confirmations)
- AND evm.txes.id = $2 AND evm.txes.evm_chain_id = $3`, blockHeight, txID, chainID.String())
- if err != nil {
- return false, fmt.Errorf("failed to retrieve transaction reciepts: %w", err)
- }
- return count > 0, nil
-}
-
func (o *evmTxStore) saveAttemptWithNewState(ctx context.Context, attempt TxAttempt, broadcastAt time.Time) error {
var dbAttempt DbEthTxAttempt
dbAttempt.FromTxAttempt(&attempt)
@@ -1872,7 +1858,7 @@ id < (
return
}
-func (o *evmTxStore) ReapTxHistory(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID *big.Int) error {
+func (o *evmTxStore) ReapTxHistory(ctx context.Context, timeThreshold time.Time, chainID *big.Int) error {
var cancel context.CancelFunc
ctx, cancel = o.stopCh.Ctx(ctx)
defer cancel()
@@ -1885,19 +1871,18 @@ func (o *evmTxStore) ReapTxHistory(ctx context.Context, minBlockNumberToKeep int
res, err := o.q.ExecContext(ctx, `
WITH old_enough_receipts AS (
SELECT tx_hash FROM evm.receipts
- WHERE block_number < $1
ORDER BY block_number ASC, id ASC
- LIMIT $2
+ LIMIT $1
)
DELETE FROM evm.txes
USING old_enough_receipts, evm.tx_attempts
WHERE evm.tx_attempts.eth_tx_id = evm.txes.id
AND evm.tx_attempts.hash = old_enough_receipts.tx_hash
-AND evm.txes.created_at < $3
-AND evm.txes.state = 'confirmed'
-AND evm_chain_id = $4`, minBlockNumberToKeep, limit, timeThreshold, chainID.String())
+AND evm.txes.created_at < $2
+AND evm.txes.state = 'finalized'
+AND evm_chain_id = $3`, limit, timeThreshold, chainID.String())
if err != nil {
- return count, pkgerrors.Wrap(err, "ReapTxes failed to delete old confirmed evm.txes")
+ return count, pkgerrors.Wrap(err, "ReapTxes failed to delete old finalized evm.txes")
}
rowsAffected, err := res.RowsAffected()
if err != nil {
@@ -1906,7 +1891,7 @@ AND evm_chain_id = $4`, minBlockNumberToKeep, limit, timeThreshold, chainID.Stri
return uint(rowsAffected), err
}, batchSize)
if err != nil {
- return pkgerrors.Wrap(err, "TxmReaper#reapEthTxes batch delete of confirmed evm.txes failed")
+ return pkgerrors.Wrap(err, "TxmReaper#reapEthTxes batch delete of finalized evm.txes failed")
}
// Delete old 'fatal_error' evm.txes
err = sqlutil.Batch(func(_, limit uint) (count uint, err error) {
@@ -1927,6 +1912,38 @@ AND evm_chain_id = $2`, timeThreshold, chainID.String())
if err != nil {
return pkgerrors.Wrap(err, "TxmReaper#reapEthTxes batch delete of fatally errored evm.txes failed")
}
+ // Delete old 'confirmed' evm.txes that were never finalized
+ // This query should never result in changes but added just in case transactions slip through the cracks
+ // to avoid them building up in the DB
+ err = sqlutil.Batch(func(_, limit uint) (count uint, err error) {
+ res, err := o.q.ExecContext(ctx, `
+WITH old_enough_receipts AS (
+ SELECT tx_hash FROM evm.receipts
+ ORDER BY block_number ASC, id ASC
+ LIMIT $1
+)
+DELETE FROM evm.txes
+USING old_enough_receipts, evm.tx_attempts
+WHERE evm.tx_attempts.eth_tx_id = evm.txes.id
+AND evm.tx_attempts.hash = old_enough_receipts.tx_hash
+AND evm.txes.created_at < $2
+AND evm.txes.state = 'confirmed'
+AND evm_chain_id = $3`, limit, timeThreshold, chainID.String())
+ if err != nil {
+ return count, pkgerrors.Wrap(err, "ReapTxes failed to delete old confirmed evm.txes")
+ }
+ rowsAffected, err := res.RowsAffected()
+ if err != nil {
+ return count, pkgerrors.Wrap(err, "ReapTxes failed to get rows affected")
+ }
+ if rowsAffected > 0 {
+ o.logger.Errorf("%d confirmed transactions were reaped before being marked as finalized. This should never happen unless the threshold is set too low or the transactions were lost track of", rowsAffected)
+ }
+ return uint(rowsAffected), err
+ }, batchSize)
+ if err != nil {
+ return pkgerrors.Wrap(err, "TxmReaper#reapEthTxes batch delete of confirmed evm.txes failed")
+ }
return nil
}
@@ -2055,3 +2072,42 @@ func (o *evmTxStore) UpdateTxAttemptBroadcastBeforeBlockNum(ctx context.Context,
_, err := o.q.ExecContext(ctx, sql, blockNum, id)
return err
}
+
+// Returns all confirmed transactions with receipt block nums older than or equal to the finalized block number
+func (o *evmTxStore) FindConfirmedTxesReceipts(ctx context.Context, finalizedBlockNum int64, chainID *big.Int) (receipts []Receipt, err error) {
+ var cancel context.CancelFunc
+ ctx, cancel = o.stopCh.Ctx(ctx)
+ defer cancel()
+ err = o.Transact(ctx, true, func(orm *evmTxStore) error {
+ sql := `SELECT evm.receipts.* FROM evm.receipts
+ INNER JOIN evm.tx_attempts ON evm.tx_attempts.hash = evm.receipts.tx_hash
+ INNER JOIN evm.txes ON evm.txes.id = evm.tx_attempts.eth_tx_id
+ WHERE evm.txes.state = 'confirmed' AND evm.receipts.block_number <= $1 AND evm.txes.evm_chain_id = $2`
+ var dbReceipts []DbReceipt
+ err = o.q.SelectContext(ctx, &dbReceipts, sql, finalizedBlockNum, chainID.String())
+ if len(dbReceipts) == 0 {
+ return nil
+ }
+ receipts = dbReceipts
+ return nil
+ })
+ return receipts, err
+}
+
+// Mark transactions corresponding to receipt IDs as finalized
+func (o *evmTxStore) UpdateTxStatesToFinalizedUsingReceiptIds(ctx context.Context, receiptIDs []int64, chainId *big.Int) error {
+ if len(receiptIDs) == 0 {
+ return nil
+ }
+ var cancel context.CancelFunc
+ ctx, cancel = o.stopCh.Ctx(ctx)
+ defer cancel()
+ sql := `
+UPDATE evm.txes SET state = 'finalized' WHERE evm.txes.evm_chain_id = $1 AND evm.txes.id IN (SELECT evm.txes.id FROM evm.txes
+ INNER JOIN evm.tx_attempts ON evm.tx_attempts.eth_tx_id = evm.txes.id
+ INNER JOIN evm.receipts ON evm.receipts.tx_hash = evm.tx_attempts.hash
+ WHERE evm.receipts.id = ANY($2))
+`
+ _, err := o.q.ExecContext(ctx, sql, chainId.String(), pq.Array(receiptIDs))
+ return err
+}
diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go
index afb8de4ca52..191a0a5fed2 100644
--- a/core/chains/evm/txmgr/evm_tx_store_test.go
+++ b/core/chains/evm/txmgr/evm_tx_store_test.go
@@ -783,30 +783,6 @@ func TestORM_UpdateTxForRebroadcast(t *testing.T) {
})
}
-func TestORM_IsTxFinalized(t *testing.T) {
- t.Parallel()
-
- db := pgtest.NewSqlxDB(t)
- txStore := cltest.NewTestTxStore(t, db)
- ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
-
- t.Run("confirmed tx not past finality_depth", func(t *testing.T) {
- confirmedAddr := cltest.MustGenerateRandomKey(t).Address
- tx := mustInsertConfirmedEthTxWithReceipt(t, txStore, confirmedAddr, 123, 1)
- finalized, err := txStore.IsTxFinalized(tests.Context(t), 2, tx.ID, ethClient.ConfiguredChainID())
- require.NoError(t, err)
- require.False(t, finalized)
- })
-
- t.Run("confirmed tx past finality_depth", func(t *testing.T) {
- confirmedAddr := cltest.MustGenerateRandomKey(t).Address
- tx := mustInsertConfirmedEthTxWithReceipt(t, txStore, confirmedAddr, 123, 1)
- finalized, err := txStore.IsTxFinalized(tests.Context(t), 10, tx.ID, ethClient.ConfiguredChainID())
- require.NoError(t, err)
- require.True(t, finalized)
- })
-}
-
func TestORM_FindTransactionsConfirmedInBlockRange(t *testing.T) {
t.Parallel()
@@ -1382,7 +1358,7 @@ func TestORM_UpdateTxUnstartedToInProgress(t *testing.T) {
evmTxmCfg := txmgr.NewEvmTxmConfig(ccfg.EVM())
ec := evmtest.NewEthClientMockWithDefaultChain(t)
txMgr := txmgr.NewEvmTxm(ec.ConfiguredChainID(), evmTxmCfg, ccfg.EVM().Transactions(), nil, logger.Test(t), nil, nil,
- nil, txStore, nil, nil, nil, nil)
+ nil, txStore, nil, nil, nil, nil, nil)
err := txMgr.XXXTestAbandon(fromAddress) // mark transaction as abandoned
require.NoError(t, err)
@@ -1871,3 +1847,60 @@ func AssertCountPerSubject(t *testing.T, txStore txmgr.TestEvmTxStore, expected
require.NoError(t, err)
require.Equal(t, int(expected), count)
}
+
+func TestORM_FindTransactionsByState(t *testing.T) {
+ t.Parallel()
+
+ ctx := tests.Context(t)
+ db := pgtest.NewSqlxDB(t)
+ txStore := cltest.NewTestTxStore(t, db)
+ kst := cltest.NewKeyStore(t, db)
+ _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())
+ finalizedBlockNum := int64(100)
+
+ mustInsertUnstartedTx(t, txStore, fromAddress)
+ mustInsertInProgressEthTxWithAttempt(t, txStore, 0, fromAddress)
+ mustInsertUnconfirmedEthTxWithAttemptState(t, txStore, 1, fromAddress, txmgrtypes.TxAttemptBroadcast)
+ mustInsertConfirmedMissingReceiptEthTxWithLegacyAttempt(t, txStore, 2, finalizedBlockNum, time.Now(), fromAddress)
+ mustInsertConfirmedEthTxWithReceipt(t, txStore, fromAddress, 3, finalizedBlockNum+1)
+ mustInsertConfirmedEthTxWithReceipt(t, txStore, fromAddress, 4, finalizedBlockNum)
+ mustInsertFatalErrorEthTx(t, txStore, fromAddress)
+
+ receipts, err := txStore.FindConfirmedTxesReceipts(ctx, finalizedBlockNum, testutils.FixtureChainID)
+ require.NoError(t, err)
+ require.Len(t, receipts, 1)
+}
+
+func TestORM_UpdateTxesFinalized(t *testing.T) {
+ t.Parallel()
+
+ ctx := tests.Context(t)
+ db := pgtest.NewSqlxDB(t)
+ txStore := cltest.NewTestTxStore(t, db)
+ kst := cltest.NewKeyStore(t, db)
+ broadcast := time.Now()
+ _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())
+
+ t.Run("successfully finalizes a confirmed transaction", func(t *testing.T) {
+ nonce := evmtypes.Nonce(0)
+ tx := &txmgr.Tx{
+ Sequence: &nonce,
+ FromAddress: fromAddress,
+ EncodedPayload: []byte{1, 2, 3},
+ State: txmgrcommon.TxConfirmed,
+ BroadcastAt: &broadcast,
+ InitialBroadcastAt: &broadcast,
+ }
+ err := txStore.InsertTx(ctx, tx)
+ require.NoError(t, err)
+ attempt := newBroadcastLegacyEthTxAttempt(t, tx.ID)
+ err = txStore.InsertTxAttempt(ctx, &attempt)
+ require.NoError(t, err)
+ receipt := mustInsertEthReceipt(t, txStore, 100, testutils.NewHash(), attempt.Hash)
+ err = txStore.UpdateTxStatesToFinalizedUsingReceiptIds(ctx, []int64{receipt.ID}, testutils.FixtureChainID)
+ require.NoError(t, err)
+ etx, err := txStore.FindTxWithAttempts(ctx, tx.ID)
+ require.NoError(t, err)
+ require.Equal(t, txmgrcommon.TxFinalized, etx.State)
+ })
+}
diff --git a/core/chains/evm/txmgr/finalizer.go b/core/chains/evm/txmgr/finalizer.go
new file mode 100644
index 00000000000..6d5fb81782c
--- /dev/null
+++ b/core/chains/evm/txmgr/finalizer.go
@@ -0,0 +1,294 @@
+package txmgr
+
+import (
+ "context"
+ "fmt"
+ "math/big"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/rpc"
+
+ "github.com/smartcontractkit/chainlink-common/pkg/logger"
+ "github.com/smartcontractkit/chainlink-common/pkg/services"
+ "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"
+
+ evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
+)
+
+var _ Finalizer = (*evmFinalizer)(nil)
+
+// processHeadTimeout represents a sanity limit on how long ProcessHead should take to complete
+const processHeadTimeout = 10 * time.Minute
+
+type finalizerTxStore interface {
+ FindConfirmedTxesReceipts(ctx context.Context, finalizedBlockNum int64, chainID *big.Int) ([]Receipt, error)
+ UpdateTxStatesToFinalizedUsingReceiptIds(ctx context.Context, txs []int64, chainId *big.Int) error
+}
+
+type finalizerChainClient interface {
+ BatchCallContext(ctx context.Context, elems []rpc.BatchElem) error
+}
+
+type finalizerHeadTracker interface {
+ LatestAndFinalizedBlock(ctx context.Context) (latest, finalized *evmtypes.Head, err error)
+}
+
+// Finalizer handles processing new finalized blocks and marking transactions as finalized accordingly in the TXM DB
+type evmFinalizer struct {
+ services.StateMachine
+ lggr logger.SugaredLogger
+ chainId *big.Int
+ rpcBatchSize int
+
+ txStore finalizerTxStore
+ client finalizerChainClient
+ headTracker finalizerHeadTracker
+
+ mb *mailbox.Mailbox[*evmtypes.Head]
+ stopCh services.StopChan
+ wg sync.WaitGroup
+
+ lastProcessedFinalizedBlockNum int64
+}
+
+func NewEvmFinalizer(
+ lggr logger.Logger,
+ chainId *big.Int,
+ rpcBatchSize uint32,
+ txStore finalizerTxStore,
+ client finalizerChainClient,
+ headTracker finalizerHeadTracker,
+) *evmFinalizer {
+ lggr = logger.Named(lggr, "Finalizer")
+ return &evmFinalizer{
+ lggr: logger.Sugared(lggr),
+ chainId: chainId,
+ rpcBatchSize: int(rpcBatchSize),
+ txStore: txStore,
+ client: client,
+ headTracker: headTracker,
+ mb: mailbox.NewSingle[*evmtypes.Head](),
+ }
+}
+
+// Start the finalizer
+func (f *evmFinalizer) Start(ctx context.Context) error {
+ return f.StartOnce("Finalizer", func() error {
+ f.lggr.Debugf("started Finalizer with RPC batch size limit: %d", f.rpcBatchSize)
+ f.stopCh = make(chan struct{})
+ f.wg.Add(1)
+ go f.runLoop()
+ return nil
+ })
+}
+
+// Close the finalizer
+func (f *evmFinalizer) Close() error {
+ return f.StopOnce("Finalizer", func() error {
+ f.lggr.Debug("closing Finalizer")
+ close(f.stopCh)
+ f.wg.Wait()
+ return nil
+ })
+}
+
+func (f *evmFinalizer) Name() string {
+ return f.lggr.Name()
+}
+
+func (f *evmFinalizer) HealthReport() map[string]error {
+ return map[string]error{f.Name(): f.Healthy()}
+}
+
+func (f *evmFinalizer) runLoop() {
+ defer f.wg.Done()
+ ctx, cancel := f.stopCh.NewCtx()
+ defer cancel()
+ for {
+ select {
+ case <-f.mb.Notify():
+ for {
+ if ctx.Err() != nil {
+ return
+ }
+ head, exists := f.mb.Retrieve()
+ if !exists {
+ break
+ }
+ if err := f.ProcessHead(ctx, head); err != nil {
+ f.lggr.Errorw("Error processing head", "err", err)
+ f.SvcErrBuffer.Append(err)
+ continue
+ }
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+func (f *evmFinalizer) DeliverLatestHead(head *evmtypes.Head) bool {
+ return f.mb.Deliver(head)
+}
+
+func (f *evmFinalizer) ProcessHead(ctx context.Context, head *evmtypes.Head) error {
+ ctx, cancel := context.WithTimeout(ctx, processHeadTimeout)
+ defer cancel()
+ _, latestFinalizedHead, err := f.headTracker.LatestAndFinalizedBlock(ctx)
+ if err != nil {
+ return fmt.Errorf("failed to retrieve latest finalized head: %w", err)
+ }
+ return f.processFinalizedHead(ctx, latestFinalizedHead)
+}
+
+// Determines if any confirmed transactions can be marked as finalized by comparing their receipts against the latest finalized block
+func (f *evmFinalizer) processFinalizedHead(ctx context.Context, latestFinalizedHead *evmtypes.Head) error {
+ // Cannot determine finality without a finalized head for comparison
+ if latestFinalizedHead == nil || !latestFinalizedHead.IsValid() {
+ return fmt.Errorf("invalid latestFinalizedHead")
+ }
+ // Only continue processing if the latestFinalizedHead has not already been processed
+ // Helps avoid unnecessary processing on every head if blocks are finalized in batches
+ if latestFinalizedHead.BlockNumber() == f.lastProcessedFinalizedBlockNum {
+ return nil
+ }
+ if latestFinalizedHead.BlockNumber() < f.lastProcessedFinalizedBlockNum {
+ f.lggr.Errorw("Received finalized block older than one already processed. This should never happen and could be an issue with RPCs.", "lastProcessedFinalizedBlockNum", f.lastProcessedFinalizedBlockNum, "retrievedFinalizedBlockNum", latestFinalizedHead.BlockNumber())
+ return nil
+ }
+
+ earliestBlockNumInChain := latestFinalizedHead.EarliestHeadInChain().BlockNumber()
+ f.lggr.Debugw("processing latest finalized head", "blockNum", latestFinalizedHead.BlockNumber(), "blockHash", latestFinalizedHead.BlockHash(), "earliestBlockNumInChain", earliestBlockNumInChain)
+
+ // Retrieve all confirmed transactions with receipts older than or equal to the finalized block, loaded with attempts and receipts
+ unfinalizedReceipts, err := f.txStore.FindConfirmedTxesReceipts(ctx, latestFinalizedHead.BlockNumber(), f.chainId)
+ if err != nil {
+ return fmt.Errorf("failed to retrieve receipts for confirmed, unfinalized transactions: %w", err)
+ }
+
+ var finalizedReceipts []Receipt
+ // Group by block hash transactions whose receipts cannot be validated using the cached heads
+ blockNumToReceiptsMap := make(map[int64][]Receipt)
+ // Find transactions with receipt block nums older than the latest finalized block num and block hashes still in chain
+ for _, receipt := range unfinalizedReceipts {
+ // The tx store query ensures transactions have receipts but leaving this check here for a belts and braces approach
+ if receipt.Receipt.IsZero() || receipt.Receipt.IsUnmined() {
+ f.lggr.AssumptionViolationw("invalid receipt found for confirmed transaction", "receipt", receipt)
+ continue
+ }
+ // The tx store query only returns transactions with receipts older than or equal to the finalized block but leaving this check here for a belts and braces approach
+ if receipt.BlockNumber > latestFinalizedHead.BlockNumber() {
+ continue
+ }
+ // Receipt block num older than earliest head in chain. Validate hash using RPC call later
+ if receipt.BlockNumber < earliestBlockNumInChain {
+ blockNumToReceiptsMap[receipt.BlockNumber] = append(blockNumToReceiptsMap[receipt.BlockNumber], receipt)
+ continue
+ }
+ blockHashInChain := latestFinalizedHead.HashAtHeight(receipt.BlockNumber)
+ // Receipt block hash does not match the block hash in chain. Transaction has been re-org'd out but DB state has not been updated yet
+ if blockHashInChain.String() != receipt.BlockHash.String() {
+ // Log error if a transaction is marked as confirmed with a receipt older than the finalized block
+ // This scenario could potentially point to a re-org'd transaction the Confirmer has lost track of
+ f.lggr.Errorw("found confirmed transaction with re-org'd receipt older than finalized block", "receipt", receipt, "onchainBlockHash", blockHashInChain.String())
+ continue
+ }
+ finalizedReceipts = append(finalizedReceipts, receipt)
+ }
+
+ // Check if block hashes exist for receipts on-chain older than the earliest cached head
+ // Transactions are grouped by their receipt block hash to avoid repeat requests on the same hash in case transactions were confirmed in the same block
+ validatedReceipts := f.batchCheckReceiptHashesOnchain(ctx, blockNumToReceiptsMap)
+ finalizedReceipts = append(finalizedReceipts, validatedReceipts...)
+
+ receiptIDs := f.buildReceiptIdList(finalizedReceipts)
+
+ err = f.txStore.UpdateTxStatesToFinalizedUsingReceiptIds(ctx, receiptIDs, f.chainId)
+ if err != nil {
+ return fmt.Errorf("failed to update transactions as finalized: %w", err)
+ }
+ // Update lastProcessedFinalizedBlockNum after processing has completed to allow failed processing to retry on subsequent heads
+ // Does not need to be protected with mutex lock because the Finalizer only runs in a single loop
+ f.lastProcessedFinalizedBlockNum = latestFinalizedHead.BlockNumber()
+ return nil
+}
+
+func (f *evmFinalizer) batchCheckReceiptHashesOnchain(ctx context.Context, blockNumToReceiptsMap map[int64][]Receipt) []Receipt {
+ if len(blockNumToReceiptsMap) == 0 {
+ return nil
+ }
+ // Group the RPC batch calls in groups of rpcBatchSize
+ var rpcBatchGroups [][]rpc.BatchElem
+ var rpcBatch []rpc.BatchElem
+ for blockNum := range blockNumToReceiptsMap {
+ elem := rpc.BatchElem{
+ Method: "eth_getBlockByNumber",
+ Args: []any{
+ hexutil.EncodeBig(big.NewInt(blockNum)),
+ false,
+ },
+ Result: new(evmtypes.Head),
+ }
+ rpcBatch = append(rpcBatch, elem)
+ if len(rpcBatch) >= f.rpcBatchSize {
+ rpcBatchGroups = append(rpcBatchGroups, rpcBatch)
+ rpcBatch = []rpc.BatchElem{}
+ }
+ }
+ if len(rpcBatch) > 0 {
+ rpcBatchGroups = append(rpcBatchGroups, rpcBatch)
+ }
+
+ var finalizedReceipts []Receipt
+ for _, rpcBatch := range rpcBatchGroups {
+ err := f.client.BatchCallContext(ctx, rpcBatch)
+ if err != nil {
+ // Continue if batch RPC call failed so other batches can still be considered for finalization
+ f.lggr.Errorw("failed to find blocks due to batch call failure", "error", err)
+ continue
+ }
+ for _, req := range rpcBatch {
+ if req.Error != nil {
+ // Continue if particular RPC call failed so other txs can still be considered for finalization
+ f.lggr.Errorw("failed to find block by number", "blockNum", req.Args[0], "error", req.Error)
+ continue
+ }
+ head, ok := req.Result.(*evmtypes.Head)
+ if !ok || !head.IsValid() {
+ // Continue if particular RPC call yielded a nil block so other txs can still be considered for finalization
+ f.lggr.Errorw("retrieved nil head for block number", "blockNum", req.Args[0])
+ continue
+ }
+ receipts := blockNumToReceiptsMap[head.BlockNumber()]
+ // Check if transaction receipts match the block hash at the given block num
+ // If they do not, the transactions may have been re-org'd out
+ // The expectation is for the Confirmer to pick up on these re-orgs and get the transaction included
+ for _, receipt := range receipts {
+ if receipt.BlockHash.String() == head.BlockHash().String() {
+ finalizedReceipts = append(finalizedReceipts, receipt)
+ } else {
+ // Log error if a transaction is marked as confirmed with a receipt older than the finalized block
+ // This scenario could potentially point to a re-org'd transaction the Confirmer has lost track of
+ f.lggr.Errorw("found confirmed transaction with re-org'd receipt older than finalized block", "receipt", receipt, "onchainBlockHash", head.BlockHash().String())
+ }
+ }
+ }
+ }
+ return finalizedReceipts
+}
+
+// Build list of transaction IDs
+func (f *evmFinalizer) buildReceiptIdList(finalizedReceipts []Receipt) []int64 {
+ receiptIds := make([]int64, len(finalizedReceipts))
+ for i, receipt := range finalizedReceipts {
+ f.lggr.Debugw("transaction considered finalized",
+ "txHash", receipt.TxHash.String(),
+ "receiptBlockNum", receipt.BlockNumber,
+ "receiptBlockHash", receipt.BlockHash.String(),
+ )
+ receiptIds[i] = receipt.ID
+ }
+ return receiptIds
+}
diff --git a/core/chains/evm/txmgr/finalizer_test.go b/core/chains/evm/txmgr/finalizer_test.go
new file mode 100644
index 00000000000..f83a53bf499
--- /dev/null
+++ b/core/chains/evm/txmgr/finalizer_test.go
@@ -0,0 +1,240 @@
+package txmgr_test
+
+import (
+ "errors"
+ "math/big"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/rpc"
+ "github.com/google/uuid"
+ "github.com/stretchr/testify/mock"
+ "github.com/stretchr/testify/require"
+
+ "github.com/smartcontractkit/chainlink-common/pkg/logger"
+ "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
+ "github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
+
+ txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr"
+
+ "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker"
+ "github.com/smartcontractkit/chainlink/v2/core/chains/evm/testutils"
+ "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
+ evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
+ "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
+ "github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
+ "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
+)
+
+func TestFinalizer_MarkTxFinalized(t *testing.T) {
+ t.Parallel()
+ ctx := tests.Context(t)
+ db := pgtest.NewSqlxDB(t)
+ txStore := cltest.NewTestTxStore(t, db)
+ ethKeyStore := cltest.NewKeyStore(t, db).Eth()
+ feeLimit := uint64(10_000)
+ ethClient := testutils.NewEthClientMockWithDefaultChain(t)
+ rpcBatchSize := uint32(1)
+ ht := headtracker.NewSimulatedHeadTracker(ethClient, true, 0)
+
+ head := &evmtypes.Head{
+ Hash: utils.NewHash(),
+ Number: 100,
+ Parent: &evmtypes.Head{
+ Hash: utils.NewHash(),
+ Number: 99,
+ IsFinalized: true,
+ },
+ }
+
+ t.Run("returns not finalized for tx with receipt newer than finalized block", func(t *testing.T) {
+ finalizer := txmgr.NewEvmFinalizer(logger.Test(t), testutils.FixtureChainID, rpcBatchSize, txStore, ethClient, ht)
+ servicetest.Run(t, finalizer)
+
+ idempotencyKey := uuid.New().String()
+ _, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)
+ nonce := evmtypes.Nonce(0)
+ broadcast := time.Now()
+ tx := &txmgr.Tx{
+ Sequence: &nonce,
+ IdempotencyKey: &idempotencyKey,
+ FromAddress: fromAddress,
+ EncodedPayload: []byte{1, 2, 3},
+ FeeLimit: feeLimit,
+ State: txmgrcommon.TxConfirmed,
+ BroadcastAt: &broadcast,
+ InitialBroadcastAt: &broadcast,
+ }
+ attemptHash := insertTxAndAttemptWithIdempotencyKey(t, txStore, tx, idempotencyKey)
+ // Insert receipt for unfinalized block num
+ mustInsertEthReceipt(t, txStore, head.Number, head.Hash, attemptHash)
+ ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(head, nil).Once()
+ ethClient.On("LatestFinalizedBlock", mock.Anything).Return(head.Parent, nil).Once()
+ err := finalizer.ProcessHead(ctx, head)
+ require.NoError(t, err)
+ tx, err = txStore.FindTxWithIdempotencyKey(ctx, idempotencyKey, testutils.FixtureChainID)
+ require.NoError(t, err)
+ require.Equal(t, txmgrcommon.TxConfirmed, tx.State)
+ })
+
+ t.Run("returns not finalized for tx with receipt re-org'd out", func(t *testing.T) {
+ finalizer := txmgr.NewEvmFinalizer(logger.Test(t), testutils.FixtureChainID, rpcBatchSize, txStore, ethClient, ht)
+ servicetest.Run(t, finalizer)
+
+ idempotencyKey := uuid.New().String()
+ _, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)
+ nonce := evmtypes.Nonce(0)
+ broadcast := time.Now()
+ tx := &txmgr.Tx{
+ Sequence: &nonce,
+ IdempotencyKey: &idempotencyKey,
+ FromAddress: fromAddress,
+ EncodedPayload: []byte{1, 2, 3},
+ FeeLimit: feeLimit,
+ State: txmgrcommon.TxConfirmed,
+ BroadcastAt: &broadcast,
+ InitialBroadcastAt: &broadcast,
+ }
+ attemptHash := insertTxAndAttemptWithIdempotencyKey(t, txStore, tx, idempotencyKey)
+ // Insert receipt for finalized block num
+ mustInsertEthReceipt(t, txStore, head.Parent.Number, utils.NewHash(), attemptHash)
+ ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(head, nil).Once()
+ ethClient.On("LatestFinalizedBlock", mock.Anything).Return(head.Parent, nil).Once()
+ err := finalizer.ProcessHead(ctx, head)
+ require.NoError(t, err)
+ tx, err = txStore.FindTxWithIdempotencyKey(ctx, idempotencyKey, testutils.FixtureChainID)
+ require.NoError(t, err)
+ require.Equal(t, txmgrcommon.TxConfirmed, tx.State)
+ })
+
+ t.Run("returns finalized for tx with receipt in a finalized block", func(t *testing.T) {
+ finalizer := txmgr.NewEvmFinalizer(logger.Test(t), testutils.FixtureChainID, rpcBatchSize, txStore, ethClient, ht)
+ servicetest.Run(t, finalizer)
+
+ idempotencyKey := uuid.New().String()
+ _, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)
+ nonce := evmtypes.Nonce(0)
+ broadcast := time.Now()
+ tx := &txmgr.Tx{
+ Sequence: &nonce,
+ IdempotencyKey: &idempotencyKey,
+ FromAddress: fromAddress,
+ EncodedPayload: []byte{1, 2, 3},
+ FeeLimit: feeLimit,
+ State: txmgrcommon.TxConfirmed,
+ BroadcastAt: &broadcast,
+ InitialBroadcastAt: &broadcast,
+ }
+ attemptHash := insertTxAndAttemptWithIdempotencyKey(t, txStore, tx, idempotencyKey)
+ // Insert receipt for finalized block num
+ mustInsertEthReceipt(t, txStore, head.Parent.Number, head.Parent.Hash, attemptHash)
+ ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(head, nil).Once()
+ ethClient.On("LatestFinalizedBlock", mock.Anything).Return(head.Parent, nil).Once()
+ err := finalizer.ProcessHead(ctx, head)
+ require.NoError(t, err)
+ tx, err = txStore.FindTxWithIdempotencyKey(ctx, idempotencyKey, testutils.FixtureChainID)
+ require.NoError(t, err)
+ require.Equal(t, txmgrcommon.TxFinalized, tx.State)
+ })
+
+ t.Run("returns finalized for tx with receipt older than block history depth", func(t *testing.T) {
+ finalizer := txmgr.NewEvmFinalizer(logger.Test(t), testutils.FixtureChainID, rpcBatchSize, txStore, ethClient, ht)
+ servicetest.Run(t, finalizer)
+
+ idempotencyKey := uuid.New().String()
+ _, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)
+ nonce := evmtypes.Nonce(0)
+ broadcast := time.Now()
+ tx := &txmgr.Tx{
+ Sequence: &nonce,
+ IdempotencyKey: &idempotencyKey,
+ FromAddress: fromAddress,
+ EncodedPayload: []byte{1, 2, 3},
+ FeeLimit: feeLimit,
+ State: txmgrcommon.TxConfirmed,
+ BroadcastAt: &broadcast,
+ InitialBroadcastAt: &broadcast,
+ }
+ attemptHash := insertTxAndAttemptWithIdempotencyKey(t, txStore, tx, idempotencyKey)
+ // Insert receipt for finalized block num
+ receiptBlockHash1 := utils.NewHash()
+ mustInsertEthReceipt(t, txStore, head.Parent.Number-2, receiptBlockHash1, attemptHash)
+ idempotencyKey = uuid.New().String()
+ nonce = evmtypes.Nonce(1)
+ tx = &txmgr.Tx{
+ Sequence: &nonce,
+ IdempotencyKey: &idempotencyKey,
+ FromAddress: fromAddress,
+ EncodedPayload: []byte{1, 2, 3},
+ FeeLimit: feeLimit,
+ State: txmgrcommon.TxConfirmed,
+ BroadcastAt: &broadcast,
+ InitialBroadcastAt: &broadcast,
+ }
+ attemptHash = insertTxAndAttemptWithIdempotencyKey(t, txStore, tx, idempotencyKey)
+ // Insert receipt for finalized block num
+ receiptBlockHash2 := utils.NewHash()
+ mustInsertEthReceipt(t, txStore, head.Parent.Number-1, receiptBlockHash2, attemptHash)
+ // Separate batch calls will be made for each tx due to RPC batch size set to 1 when finalizer initialized above
+ ethClient.On("BatchCallContext", mock.Anything, mock.IsType([]rpc.BatchElem{})).Run(func(args mock.Arguments) {
+ rpcElements := args.Get(1).([]rpc.BatchElem)
+ require.Equal(t, 1, len(rpcElements))
+
+ require.Equal(t, "eth_getBlockByNumber", rpcElements[0].Method)
+ require.Equal(t, false, rpcElements[0].Args[1])
+
+ reqBlockNum := rpcElements[0].Args[0].(string)
+ req1BlockNum := hexutil.EncodeBig(big.NewInt(head.Parent.Number - 2))
+ req2BlockNum := hexutil.EncodeBig(big.NewInt(head.Parent.Number - 1))
+ var headResult evmtypes.Head
+ if req1BlockNum == reqBlockNum {
+ headResult = evmtypes.Head{Number: head.Parent.Number - 2, Hash: receiptBlockHash1}
+ } else if req2BlockNum == reqBlockNum {
+ headResult = evmtypes.Head{Number: head.Parent.Number - 1, Hash: receiptBlockHash2}
+ } else {
+ require.Fail(t, "unrecognized block hash")
+ }
+ rpcElements[0].Result = &headResult
+ }).Return(nil).Twice()
+ ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(head, nil).Once()
+ ethClient.On("LatestFinalizedBlock", mock.Anything).Return(head.Parent, nil).Once()
+ err := finalizer.ProcessHead(ctx, head)
+ require.NoError(t, err)
+ tx, err = txStore.FindTxWithIdempotencyKey(ctx, idempotencyKey, testutils.FixtureChainID)
+ require.NoError(t, err)
+ require.Equal(t, txmgrcommon.TxFinalized, tx.State)
+ })
+
+ t.Run("returns error if failed to retrieve latest head in headtracker", func(t *testing.T) {
+ finalizer := txmgr.NewEvmFinalizer(logger.Test(t), testutils.FixtureChainID, rpcBatchSize, txStore, ethClient, ht)
+ servicetest.Run(t, finalizer)
+
+ ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(nil, errors.New("failed to get latest head")).Once()
+ err := finalizer.ProcessHead(ctx, head)
+ require.Error(t, err)
+ })
+
+ t.Run("returns error if failed to calculate latest finalized head in headtracker", func(t *testing.T) {
+ finalizer := txmgr.NewEvmFinalizer(logger.Test(t), testutils.FixtureChainID, rpcBatchSize, txStore, ethClient, ht)
+ servicetest.Run(t, finalizer)
+
+ ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(head, nil).Once()
+ ethClient.On("LatestFinalizedBlock", mock.Anything).Return(nil, errors.New("failed to calculate latest finalized head")).Once()
+ err := finalizer.ProcessHead(ctx, head)
+ require.Error(t, err)
+ })
+}
+
+func insertTxAndAttemptWithIdempotencyKey(t *testing.T, txStore txmgr.TestEvmTxStore, tx *txmgr.Tx, idempotencyKey string) common.Hash {
+ ctx := tests.Context(t)
+ err := txStore.InsertTx(ctx, tx)
+ require.NoError(t, err)
+ tx, err = txStore.FindTxWithIdempotencyKey(ctx, idempotencyKey, testutils.FixtureChainID)
+ require.NoError(t, err)
+ attempt := cltest.NewLegacyEthTxAttempt(t, tx.ID)
+ err = txStore.InsertTxAttempt(ctx, &attempt)
+ require.NoError(t, err)
+ return attempt.Hash
+}
diff --git a/core/chains/evm/txmgr/mocks/evm_tx_store.go b/core/chains/evm/txmgr/mocks/evm_tx_store.go
index b28e55ec324..b40c0ca8376 100644
--- a/core/chains/evm/txmgr/mocks/evm_tx_store.go
+++ b/core/chains/evm/txmgr/mocks/evm_tx_store.go
@@ -18,6 +18,8 @@ import (
time "time"
+ txmgr "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
+
types "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
uuid "github.com/google/uuid"
@@ -444,6 +446,66 @@ func (_c *EvmTxStore_DeleteInProgressAttempt_Call) RunAndReturn(run func(context
return _c
}
+// FindConfirmedTxesReceipts provides a mock function with given fields: ctx, finalizedBlockNum, chainID
+func (_m *EvmTxStore) FindConfirmedTxesReceipts(ctx context.Context, finalizedBlockNum int64, chainID *big.Int) ([]txmgr.DbReceipt, error) {
+ ret := _m.Called(ctx, finalizedBlockNum, chainID)
+
+ if len(ret) == 0 {
+ panic("no return value specified for FindConfirmedTxesReceipts")
+ }
+
+ var r0 []txmgr.DbReceipt
+ var r1 error
+ if rf, ok := ret.Get(0).(func(context.Context, int64, *big.Int) ([]txmgr.DbReceipt, error)); ok {
+ return rf(ctx, finalizedBlockNum, chainID)
+ }
+ if rf, ok := ret.Get(0).(func(context.Context, int64, *big.Int) []txmgr.DbReceipt); ok {
+ r0 = rf(ctx, finalizedBlockNum, chainID)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).([]txmgr.DbReceipt)
+ }
+ }
+
+ if rf, ok := ret.Get(1).(func(context.Context, int64, *big.Int) error); ok {
+ r1 = rf(ctx, finalizedBlockNum, chainID)
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
+// EvmTxStore_FindConfirmedTxesReceipts_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FindConfirmedTxesReceipts'
+type EvmTxStore_FindConfirmedTxesReceipts_Call struct {
+ *mock.Call
+}
+
+// FindConfirmedTxesReceipts is a helper method to define mock.On call
+// - ctx context.Context
+// - finalizedBlockNum int64
+// - chainID *big.Int
+func (_e *EvmTxStore_Expecter) FindConfirmedTxesReceipts(ctx interface{}, finalizedBlockNum interface{}, chainID interface{}) *EvmTxStore_FindConfirmedTxesReceipts_Call {
+ return &EvmTxStore_FindConfirmedTxesReceipts_Call{Call: _e.mock.On("FindConfirmedTxesReceipts", ctx, finalizedBlockNum, chainID)}
+}
+
+func (_c *EvmTxStore_FindConfirmedTxesReceipts_Call) Run(run func(ctx context.Context, finalizedBlockNum int64, chainID *big.Int)) *EvmTxStore_FindConfirmedTxesReceipts_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run(args[0].(context.Context), args[1].(int64), args[2].(*big.Int))
+ })
+ return _c
+}
+
+func (_c *EvmTxStore_FindConfirmedTxesReceipts_Call) Return(receipts []txmgr.DbReceipt, err error) *EvmTxStore_FindConfirmedTxesReceipts_Call {
+ _c.Call.Return(receipts, err)
+ return _c
+}
+
+func (_c *EvmTxStore_FindConfirmedTxesReceipts_Call) RunAndReturn(run func(context.Context, int64, *big.Int) ([]txmgr.DbReceipt, error)) *EvmTxStore_FindConfirmedTxesReceipts_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
// FindEarliestUnconfirmedBroadcastTime provides a mock function with given fields: ctx, chainID
func (_m *EvmTxStore) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID *big.Int) (null.Time, error) {
ret := _m.Called(ctx, chainID)
@@ -2058,65 +2120,6 @@ func (_c *EvmTxStore_HasInProgressTransaction_Call) RunAndReturn(run func(contex
return _c
}
-// IsTxFinalized provides a mock function with given fields: ctx, blockHeight, txID, chainID
-func (_m *EvmTxStore) IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID *big.Int) (bool, error) {
- ret := _m.Called(ctx, blockHeight, txID, chainID)
-
- if len(ret) == 0 {
- panic("no return value specified for IsTxFinalized")
- }
-
- var r0 bool
- var r1 error
- if rf, ok := ret.Get(0).(func(context.Context, int64, int64, *big.Int) (bool, error)); ok {
- return rf(ctx, blockHeight, txID, chainID)
- }
- if rf, ok := ret.Get(0).(func(context.Context, int64, int64, *big.Int) bool); ok {
- r0 = rf(ctx, blockHeight, txID, chainID)
- } else {
- r0 = ret.Get(0).(bool)
- }
-
- if rf, ok := ret.Get(1).(func(context.Context, int64, int64, *big.Int) error); ok {
- r1 = rf(ctx, blockHeight, txID, chainID)
- } else {
- r1 = ret.Error(1)
- }
-
- return r0, r1
-}
-
-// EvmTxStore_IsTxFinalized_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsTxFinalized'
-type EvmTxStore_IsTxFinalized_Call struct {
- *mock.Call
-}
-
-// IsTxFinalized is a helper method to define mock.On call
-// - ctx context.Context
-// - blockHeight int64
-// - txID int64
-// - chainID *big.Int
-func (_e *EvmTxStore_Expecter) IsTxFinalized(ctx interface{}, blockHeight interface{}, txID interface{}, chainID interface{}) *EvmTxStore_IsTxFinalized_Call {
- return &EvmTxStore_IsTxFinalized_Call{Call: _e.mock.On("IsTxFinalized", ctx, blockHeight, txID, chainID)}
-}
-
-func (_c *EvmTxStore_IsTxFinalized_Call) Run(run func(ctx context.Context, blockHeight int64, txID int64, chainID *big.Int)) *EvmTxStore_IsTxFinalized_Call {
- _c.Call.Run(func(args mock.Arguments) {
- run(args[0].(context.Context), args[1].(int64), args[2].(int64), args[3].(*big.Int))
- })
- return _c
-}
-
-func (_c *EvmTxStore_IsTxFinalized_Call) Return(finalized bool, err error) *EvmTxStore_IsTxFinalized_Call {
- _c.Call.Return(finalized, err)
- return _c
-}
-
-func (_c *EvmTxStore_IsTxFinalized_Call) RunAndReturn(run func(context.Context, int64, int64, *big.Int) (bool, error)) *EvmTxStore_IsTxFinalized_Call {
- _c.Call.Return(run)
- return _c
-}
-
// LoadTxAttempts provides a mock function with given fields: ctx, etx
func (_m *EvmTxStore) LoadTxAttempts(ctx context.Context, etx *types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]) error {
ret := _m.Called(ctx, etx)
@@ -2367,17 +2370,17 @@ func (_c *EvmTxStore_PruneUnstartedTxQueue_Call) RunAndReturn(run func(context.C
return _c
}
-// ReapTxHistory provides a mock function with given fields: ctx, minBlockNumberToKeep, timeThreshold, chainID
-func (_m *EvmTxStore) ReapTxHistory(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID *big.Int) error {
- ret := _m.Called(ctx, minBlockNumberToKeep, timeThreshold, chainID)
+// ReapTxHistory provides a mock function with given fields: ctx, timeThreshold, chainID
+func (_m *EvmTxStore) ReapTxHistory(ctx context.Context, timeThreshold time.Time, chainID *big.Int) error {
+ ret := _m.Called(ctx, timeThreshold, chainID)
if len(ret) == 0 {
panic("no return value specified for ReapTxHistory")
}
var r0 error
- if rf, ok := ret.Get(0).(func(context.Context, int64, time.Time, *big.Int) error); ok {
- r0 = rf(ctx, minBlockNumberToKeep, timeThreshold, chainID)
+ if rf, ok := ret.Get(0).(func(context.Context, time.Time, *big.Int) error); ok {
+ r0 = rf(ctx, timeThreshold, chainID)
} else {
r0 = ret.Error(0)
}
@@ -2392,16 +2395,15 @@ type EvmTxStore_ReapTxHistory_Call struct {
// ReapTxHistory is a helper method to define mock.On call
// - ctx context.Context
-// - minBlockNumberToKeep int64
// - timeThreshold time.Time
// - chainID *big.Int
-func (_e *EvmTxStore_Expecter) ReapTxHistory(ctx interface{}, minBlockNumberToKeep interface{}, timeThreshold interface{}, chainID interface{}) *EvmTxStore_ReapTxHistory_Call {
- return &EvmTxStore_ReapTxHistory_Call{Call: _e.mock.On("ReapTxHistory", ctx, minBlockNumberToKeep, timeThreshold, chainID)}
+func (_e *EvmTxStore_Expecter) ReapTxHistory(ctx interface{}, timeThreshold interface{}, chainID interface{}) *EvmTxStore_ReapTxHistory_Call {
+ return &EvmTxStore_ReapTxHistory_Call{Call: _e.mock.On("ReapTxHistory", ctx, timeThreshold, chainID)}
}
-func (_c *EvmTxStore_ReapTxHistory_Call) Run(run func(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID *big.Int)) *EvmTxStore_ReapTxHistory_Call {
+func (_c *EvmTxStore_ReapTxHistory_Call) Run(run func(ctx context.Context, timeThreshold time.Time, chainID *big.Int)) *EvmTxStore_ReapTxHistory_Call {
_c.Call.Run(func(args mock.Arguments) {
- run(args[0].(context.Context), args[1].(int64), args[2].(time.Time), args[3].(*big.Int))
+ run(args[0].(context.Context), args[1].(time.Time), args[2].(*big.Int))
})
return _c
}
@@ -2411,7 +2413,7 @@ func (_c *EvmTxStore_ReapTxHistory_Call) Return(_a0 error) *EvmTxStore_ReapTxHis
return _c
}
-func (_c *EvmTxStore_ReapTxHistory_Call) RunAndReturn(run func(context.Context, int64, time.Time, *big.Int) error) *EvmTxStore_ReapTxHistory_Call {
+func (_c *EvmTxStore_ReapTxHistory_Call) RunAndReturn(run func(context.Context, time.Time, *big.Int) error) *EvmTxStore_ReapTxHistory_Call {
_c.Call.Return(run)
return _c
}
@@ -3197,6 +3199,54 @@ func (_c *EvmTxStore_UpdateTxForRebroadcast_Call) RunAndReturn(run func(context.
return _c
}
+// UpdateTxStatesToFinalizedUsingReceiptIds provides a mock function with given fields: ctx, etxIDs, chainId
+func (_m *EvmTxStore) UpdateTxStatesToFinalizedUsingReceiptIds(ctx context.Context, etxIDs []int64, chainId *big.Int) error {
+ ret := _m.Called(ctx, etxIDs, chainId)
+
+ if len(ret) == 0 {
+ panic("no return value specified for UpdateTxStatesToFinalizedUsingReceiptIds")
+ }
+
+ var r0 error
+ if rf, ok := ret.Get(0).(func(context.Context, []int64, *big.Int) error); ok {
+ r0 = rf(ctx, etxIDs, chainId)
+ } else {
+ r0 = ret.Error(0)
+ }
+
+ return r0
+}
+
+// EvmTxStore_UpdateTxStatesToFinalizedUsingReceiptIds_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateTxStatesToFinalizedUsingReceiptIds'
+type EvmTxStore_UpdateTxStatesToFinalizedUsingReceiptIds_Call struct {
+ *mock.Call
+}
+
+// UpdateTxStatesToFinalizedUsingReceiptIds is a helper method to define mock.On call
+// - ctx context.Context
+// - etxIDs []int64
+// - chainId *big.Int
+func (_e *EvmTxStore_Expecter) UpdateTxStatesToFinalizedUsingReceiptIds(ctx interface{}, etxIDs interface{}, chainId interface{}) *EvmTxStore_UpdateTxStatesToFinalizedUsingReceiptIds_Call {
+ return &EvmTxStore_UpdateTxStatesToFinalizedUsingReceiptIds_Call{Call: _e.mock.On("UpdateTxStatesToFinalizedUsingReceiptIds", ctx, etxIDs, chainId)}
+}
+
+func (_c *EvmTxStore_UpdateTxStatesToFinalizedUsingReceiptIds_Call) Run(run func(ctx context.Context, etxIDs []int64, chainId *big.Int)) *EvmTxStore_UpdateTxStatesToFinalizedUsingReceiptIds_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run(args[0].(context.Context), args[1].([]int64), args[2].(*big.Int))
+ })
+ return _c
+}
+
+func (_c *EvmTxStore_UpdateTxStatesToFinalizedUsingReceiptIds_Call) Return(_a0 error) *EvmTxStore_UpdateTxStatesToFinalizedUsingReceiptIds_Call {
+ _c.Call.Return(_a0)
+ return _c
+}
+
+func (_c *EvmTxStore_UpdateTxStatesToFinalizedUsingReceiptIds_Call) RunAndReturn(run func(context.Context, []int64, *big.Int) error) *EvmTxStore_UpdateTxStatesToFinalizedUsingReceiptIds_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
// UpdateTxUnstartedToInProgress provides a mock function with given fields: ctx, etx, attempt
func (_m *EvmTxStore) UpdateTxUnstartedToInProgress(ctx context.Context, etx *types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], attempt *types.TxAttempt[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]) error {
ret := _m.Called(ctx, etx, attempt)
diff --git a/core/chains/evm/txmgr/models.go b/core/chains/evm/txmgr/models.go
index f8682ffd500..1ba3d193cba 100644
--- a/core/chains/evm/txmgr/models.go
+++ b/core/chains/evm/txmgr/models.go
@@ -36,12 +36,13 @@ type (
Tx = txmgrtypes.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]
TxMeta = txmgrtypes.TxMeta[common.Address, common.Hash]
TxAttempt = txmgrtypes.TxAttempt[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]
- Receipt = dbReceipt // EvmReceipt is the exported DB table model for receipts
+ Receipt = DbReceipt // DbReceipt is the exported DB table model for receipts
ReceiptPlus = txmgrtypes.ReceiptPlus[*evmtypes.Receipt]
StuckTxDetector = txmgrtypes.StuckTxDetector[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]
TxmClient = txmgrtypes.TxmClient[*big.Int, common.Address, common.Hash, common.Hash, *evmtypes.Receipt, evmtypes.Nonce, gas.EvmFee]
TransactionClient = txmgrtypes.TransactionClient[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]
ChainReceipt = txmgrtypes.ChainReceipt[common.Hash, common.Hash]
+ Finalizer = txmgrtypes.Finalizer[common.Hash, *evmtypes.Head]
)
var _ KeyStore = (keystore.Eth)(nil) // check interface in txmgr to avoid circular import
diff --git a/core/chains/evm/txmgr/reaper_test.go b/core/chains/evm/txmgr/reaper_test.go
index b3ce48b702c..cfaccdf04eb 100644
--- a/core/chains/evm/txmgr/reaper_test.go
+++ b/core/chains/evm/txmgr/reaper_test.go
@@ -12,18 +12,17 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/utils"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
- txmgrmocks "github.com/smartcontractkit/chainlink/v2/common/txmgr/types/mocks"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
)
-func newReaperWithChainID(t *testing.T, db txmgrtypes.TxHistoryReaper[*big.Int], cfg txmgrtypes.ReaperChainConfig, txConfig txmgrtypes.ReaperTransactionsConfig, cid *big.Int) *txmgr.Reaper {
- return txmgr.NewEvmReaper(logger.Test(t), db, cfg, txConfig, cid)
+func newReaperWithChainID(t *testing.T, db txmgrtypes.TxHistoryReaper[*big.Int], txConfig txmgrtypes.ReaperTransactionsConfig, cid *big.Int) *txmgr.Reaper {
+ return txmgr.NewEvmReaper(logger.Test(t), db, txConfig, cid)
}
-func newReaper(t *testing.T, db txmgrtypes.TxHistoryReaper[*big.Int], cfg txmgrtypes.ReaperChainConfig, txConfig txmgrtypes.ReaperTransactionsConfig) *txmgr.Reaper {
- return newReaperWithChainID(t, db, cfg, txConfig, &cltest.FixtureChainID)
+func newReaper(t *testing.T, db txmgrtypes.TxHistoryReaper[*big.Int], txConfig txmgrtypes.ReaperTransactionsConfig) *txmgr.Reaper {
+ return newReaperWithChainID(t, db, txConfig, &cltest.FixtureChainID)
}
type reaperConfig struct {
@@ -51,12 +50,9 @@ func TestReaper_ReapTxes(t *testing.T) {
oneDayAgo := time.Now().Add(-24 * time.Hour)
t.Run("with nothing in the database, doesn't error", func(t *testing.T) {
- config := txmgrmocks.NewReaperConfig(t)
- config.On("FinalityDepth").Return(uint32(10))
-
tc := &reaperConfig{reaperThreshold: 1 * time.Hour}
- r := newReaper(t, txStore, config, tc)
+ r := newReaper(t, txStore, tc)
err := r.ReapTxes(42)
assert.NoError(t, err)
@@ -66,11 +62,9 @@ func TestReaper_ReapTxes(t *testing.T) {
mustInsertConfirmedEthTxWithReceipt(t, txStore, from, nonce, 5)
t.Run("skips if threshold=0", func(t *testing.T) {
- config := txmgrmocks.NewReaperConfig(t)
-
tc := &reaperConfig{reaperThreshold: 0 * time.Second}
- r := newReaper(t, txStore, config, tc)
+ r := newReaper(t, txStore, tc)
err := r.ReapTxes(42)
assert.NoError(t, err)
@@ -79,12 +73,9 @@ func TestReaper_ReapTxes(t *testing.T) {
})
t.Run("doesn't touch ethtxes with different chain ID", func(t *testing.T) {
- config := txmgrmocks.NewReaperConfig(t)
- config.On("FinalityDepth").Return(uint32(10))
-
tc := &reaperConfig{reaperThreshold: 1 * time.Hour}
- r := newReaperWithChainID(t, txStore, config, tc, big.NewInt(42))
+ r := newReaperWithChainID(t, txStore, tc, big.NewInt(42))
err := r.ReapTxes(42)
assert.NoError(t, err)
@@ -92,41 +83,30 @@ func TestReaper_ReapTxes(t *testing.T) {
cltest.AssertCount(t, db, "evm.txes", 1)
})
- t.Run("deletes confirmed evm.txes that exceed the age threshold with at least EVM.FinalityDepth blocks above their receipt", func(t *testing.T) {
- config := txmgrmocks.NewReaperConfig(t)
- config.On("FinalityDepth").Return(uint32(10))
-
+ t.Run("deletes finalized evm.txes that exceed the age threshold", func(t *testing.T) {
tc := &reaperConfig{reaperThreshold: 1 * time.Hour}
- r := newReaper(t, txStore, config, tc)
+ r := newReaper(t, txStore, tc)
err := r.ReapTxes(42)
assert.NoError(t, err)
// Didn't delete because eth_tx was not old enough
cltest.AssertCount(t, db, "evm.txes", 1)
- pgtest.MustExec(t, db, `UPDATE evm.txes SET created_at=$1`, oneDayAgo)
-
- err = r.ReapTxes(12)
- assert.NoError(t, err)
- // Didn't delete because eth_tx although old enough, was still within EVM.FinalityDepth of the current head
- cltest.AssertCount(t, db, "evm.txes", 1)
+ pgtest.MustExec(t, db, `UPDATE evm.txes SET created_at=$1, state='finalized'`, oneDayAgo)
err = r.ReapTxes(42)
assert.NoError(t, err)
- // Now it deleted because the eth_tx was past EVM.FinalityDepth
+ // Now it deleted because the eth_tx was past the age threshold
cltest.AssertCount(t, db, "evm.txes", 0)
})
mustInsertFatalErrorEthTx(t, txStore, from)
t.Run("deletes errored evm.txes that exceed the age threshold", func(t *testing.T) {
- config := txmgrmocks.NewReaperConfig(t)
- config.On("FinalityDepth").Return(uint32(10))
-
tc := &reaperConfig{reaperThreshold: 1 * time.Hour}
- r := newReaper(t, txStore, config, tc)
+ r := newReaper(t, txStore, tc)
err := r.ReapTxes(42)
assert.NoError(t, err)
@@ -140,4 +120,24 @@ func TestReaper_ReapTxes(t *testing.T) {
// Deleted because it is old enough now
cltest.AssertCount(t, db, "evm.txes", 0)
})
+
+ mustInsertConfirmedEthTxWithReceipt(t, txStore, from, 0, 42)
+
+ t.Run("deletes confirmed evm.txes that exceed the age threshold", func(t *testing.T) {
+ tc := &reaperConfig{reaperThreshold: 1 * time.Hour}
+
+ r := newReaper(t, txStore, tc)
+
+ err := r.ReapTxes(42)
+ assert.NoError(t, err)
+ // Didn't delete because eth_tx was not old enough
+ cltest.AssertCount(t, db, "evm.txes", 1)
+
+ pgtest.MustExec(t, db, `UPDATE evm.txes SET created_at=$1`, oneDayAgo)
+
+ err = r.ReapTxes(42)
+ assert.NoError(t, err)
+ // Now it deleted because the eth_tx was past the age threshold
+ cltest.AssertCount(t, db, "evm.txes", 0)
+ })
}
diff --git a/core/chains/evm/txmgr/test_helpers.go b/core/chains/evm/txmgr/test_helpers.go
index 3b3584a988b..8d208744329 100644
--- a/core/chains/evm/txmgr/test_helpers.go
+++ b/core/chains/evm/txmgr/test_helpers.go
@@ -53,6 +53,7 @@ type TestEvmConfig struct {
Threshold uint32
MinAttempts uint32
DetectionApiUrl *url.URL
+ RpcDefaultBatchSize uint32
}
func (e *TestEvmConfig) Transactions() evmconfig.Transactions {
@@ -65,6 +66,8 @@ func (e *TestEvmConfig) FinalityDepth() uint32 { return 42 }
func (e *TestEvmConfig) ChainType() chaintype.ChainType { return "" }
+func (e *TestEvmConfig) RPCDefaultBatchSize() uint32 { return e.RpcDefaultBatchSize }
+
type TestGasEstimatorConfig struct {
bumpThreshold uint64
}
@@ -141,10 +144,9 @@ type autoPurgeConfig struct {
func (a *autoPurgeConfig) Enabled() bool { return false }
type MockConfig struct {
- EvmConfig *TestEvmConfig
- RpcDefaultBatchSize uint32
- finalityDepth uint32
- finalityTagEnabled bool
+ EvmConfig *TestEvmConfig
+ finalityDepth uint32
+ finalityTagEnabled bool
}
func (c *MockConfig) EVM() evmconfig.EVM {
@@ -156,11 +158,10 @@ func (c *MockConfig) ChainType() chaintype.ChainType { return "" }
func (c *MockConfig) FinalityDepth() uint32 { return c.finalityDepth }
func (c *MockConfig) SetFinalityDepth(fd uint32) { c.finalityDepth = fd }
func (c *MockConfig) FinalityTagEnabled() bool { return c.finalityTagEnabled }
-func (c *MockConfig) RPCDefaultBatchSize() uint32 { return c.RpcDefaultBatchSize }
func MakeTestConfigs(t *testing.T) (*MockConfig, *TestDatabaseConfig, *TestEvmConfig) {
db := &TestDatabaseConfig{defaultQueryTimeout: utils.DefaultQueryTimeout}
- ec := &TestEvmConfig{BumpThreshold: 42, MaxInFlight: uint32(42), MaxQueued: uint64(0), ReaperInterval: time.Duration(0), ReaperThreshold: time.Duration(0)}
+ ec := &TestEvmConfig{BumpThreshold: 42, MaxInFlight: uint32(42), MaxQueued: uint64(0), ReaperInterval: time.Duration(0), ReaperThreshold: time.Duration(0), RpcDefaultBatchSize: uint32(250)}
config := &MockConfig{EvmConfig: ec}
return config, db, ec
}
diff --git a/core/chains/evm/txmgr/txmgr_test.go b/core/chains/evm/txmgr/txmgr_test.go
index 40df5616c99..5f932db8720 100644
--- a/core/chains/evm/txmgr/txmgr_test.go
+++ b/core/chains/evm/txmgr/txmgr_test.go
@@ -85,7 +85,8 @@ func makeTestEvmTxm(
lggr,
lp,
keyStore,
- estimator)
+ estimator,
+ ht)
}
func TestTxm_SendNativeToken_DoesNotSendToZero(t *testing.T) {
@@ -489,14 +490,20 @@ func TestTxm_Lifecycle(t *testing.T) {
config, dbConfig, evmConfig := txmgr.MakeTestConfigs(t)
config.SetFinalityDepth(uint32(42))
- config.RpcDefaultBatchSize = uint32(4)
+ evmConfig.RpcDefaultBatchSize = uint32(4)
evmConfig.ResendAfterThreshold = 1 * time.Hour
evmConfig.ReaperThreshold = 1 * time.Hour
evmConfig.ReaperInterval = 1 * time.Hour
kst.On("EnabledAddressesForChain", mock.Anything, &cltest.FixtureChainID).Return([]common.Address{}, nil)
+ head := cltest.Head(42)
+ finalizedHead := cltest.Head(0)
+
+ ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(head, nil).Once()
+ ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(finalizedHead, nil).Once()
+
keyChangeCh := make(chan struct{})
unsub := cltest.NewAwaiter()
kst.On("SubscribeToKeyChanges", mock.Anything).Return(keyChangeCh, unsub.ItHappened)
@@ -505,7 +512,6 @@ func TestTxm_Lifecycle(t *testing.T) {
txm, err := makeTestEvmTxm(t, db, ethClient, estimator, evmConfig, evmConfig.GasEstimator(), evmConfig.Transactions(), dbConfig, dbConfig.Listener(), kst)
require.NoError(t, err)
- head := cltest.Head(42)
// It should not hang or panic
txm.OnNewLongestChain(tests.Context(t), head)
@@ -607,8 +613,20 @@ func TestTxm_GetTransactionStatus(t *testing.T) {
gcfg := configtest.NewTestGeneralConfig(t)
cfg := evmtest.NewChainScopedConfig(t, gcfg)
+ head := &evmtypes.Head{
+ Hash: utils.NewHash(),
+ Number: 100,
+ Parent: &evmtypes.Head{
+ Hash: utils.NewHash(),
+ Number: 99,
+ IsFinalized: true,
+ },
+ }
+
ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
ethClient.On("PendingNonceAt", mock.Anything, mock.Anything).Return(uint64(0), nil).Maybe()
+ ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(head, nil).Once()
+ ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(head.Parent, nil).Once()
feeEstimator := gasmocks.NewEvmFeeEstimator(t)
feeEstimator.On("Start", mock.Anything).Return(nil).Once()
feeEstimator.On("Close", mock.Anything).Return(nil).Once()
@@ -617,15 +635,6 @@ func TestTxm_GetTransactionStatus(t *testing.T) {
require.NoError(t, err)
servicetest.Run(t, txm)
- head := &evmtypes.Head{
- Hash: utils.NewHash(),
- Number: 100,
- Parent: &evmtypes.Head{
- Hash: utils.NewHash(),
- Number: 99,
- IsFinalized: true,
- },
- }
txm.OnNewLongestChain(ctx, head)
t.Run("returns error if transaction not found", func(t *testing.T) {
@@ -715,13 +724,42 @@ func TestTxm_GetTransactionStatus(t *testing.T) {
attempt := cltest.NewLegacyEthTxAttempt(t, tx.ID)
err = txStore.InsertTxAttempt(ctx, &attempt)
require.NoError(t, err)
- // Insert receipt for finalized block num
- mustInsertEthReceipt(t, txStore, head.Parent.Number, head.ParentHash, attempt.Hash)
+ // Insert receipt for unfinalized block num
+ mustInsertEthReceipt(t, txStore, head.Number, head.Hash, attempt.Hash)
state, err := txm.GetTransactionStatus(ctx, idempotencyKey)
require.NoError(t, err)
require.Equal(t, commontypes.Unconfirmed, state)
})
+ t.Run("returns finalized for finalized state", func(t *testing.T) {
+ idempotencyKey := uuid.New().String()
+ _, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)
+ nonce := evmtypes.Nonce(0)
+ broadcast := time.Now()
+ tx := &txmgr.Tx{
+ Sequence: &nonce,
+ IdempotencyKey: &idempotencyKey,
+ FromAddress: fromAddress,
+ EncodedPayload: []byte{1, 2, 3},
+ FeeLimit: feeLimit,
+ State: txmgrcommon.TxFinalized,
+ BroadcastAt: &broadcast,
+ InitialBroadcastAt: &broadcast,
+ }
+ err := txStore.InsertTx(ctx, tx)
+ require.NoError(t, err)
+ tx, err = txStore.FindTxWithIdempotencyKey(ctx, idempotencyKey, testutils.FixtureChainID)
+ require.NoError(t, err)
+ attempt := cltest.NewLegacyEthTxAttempt(t, tx.ID)
+ err = txStore.InsertTxAttempt(ctx, &attempt)
+ require.NoError(t, err)
+ // Insert receipt for finalized block num
+ mustInsertEthReceipt(t, txStore, head.Parent.Number, head.Parent.Hash, attempt.Hash)
+ state, err := txm.GetTransactionStatus(ctx, idempotencyKey)
+ require.NoError(t, err)
+ require.Equal(t, commontypes.Finalized, state)
+ })
+
t.Run("returns unconfirmed for confirmed missing receipt state", func(t *testing.T) {
idempotencyKey := uuid.New().String()
_, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)
@@ -1018,6 +1056,12 @@ func mustCreateUnstartedTxFromEvmTxRequest(t testing.TB, txStore txmgr.EvmTxStor
return tx
}
+func mustInsertUnstartedTx(t testing.TB, txStore txmgr.TestEvmTxStore, fromAddress common.Address) {
+ etx := cltest.NewEthTx(fromAddress)
+ ctx := tests.Context(t)
+ require.NoError(t, txStore.InsertTx(ctx, &etx))
+}
+
func txRequestWithStrategy(strategy txmgrtypes.TxStrategy) func(*txmgr.TxRequest) {
return func(tx *txmgr.TxRequest) {
tx.Strategy = strategy
diff --git a/core/chains/legacyevm/chain.go b/core/chains/legacyevm/chain.go
index 129c0318820..68ff8d4e111 100644
--- a/core/chains/legacyevm/chain.go
+++ b/core/chains/legacyevm/chain.go
@@ -247,7 +247,7 @@ func newChain(ctx context.Context, cfg *evmconfig.ChainScoped, nodes []*toml.Nod
}
// note: gas estimator is started as a part of the txm
- txm, gasEstimator, err := newEvmTxm(opts.DS, cfg.EVM(), opts.AppConfig.EVMRPCEnabled(), opts.AppConfig.Database(), opts.AppConfig.Database().Listener(), client, l, logPoller, opts)
+ txm, gasEstimator, err := newEvmTxm(opts.DS, cfg.EVM(), opts.AppConfig.EVMRPCEnabled(), opts.AppConfig.Database(), opts.AppConfig.Database().Listener(), client, l, logPoller, opts, headTracker)
if err != nil {
return nil, fmt.Errorf("failed to instantiate EvmTxm for chain with ID %s: %w", chainID.String(), err)
}
diff --git a/core/chains/legacyevm/evm_txm.go b/core/chains/legacyevm/evm_txm.go
index cecfd4ffafe..ab116749665 100644
--- a/core/chains/legacyevm/evm_txm.go
+++ b/core/chains/legacyevm/evm_txm.go
@@ -7,6 +7,7 @@ import (
evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
evmconfig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/config"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
+ httypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
"github.com/smartcontractkit/chainlink/v2/core/logger"
@@ -22,6 +23,7 @@ func newEvmTxm(
lggr logger.Logger,
logPoller logpoller.LogPoller,
opts ChainRelayExtenderConfig,
+ headTracker httypes.HeadTracker,
) (txm txmgr.TxManager,
estimator gas.EvmFeeEstimator,
err error,
@@ -63,7 +65,8 @@ func newEvmTxm(
lggr,
logPoller,
opts.KeyStore,
- estimator)
+ estimator,
+ headTracker)
} else {
txm = opts.GenTxManager(chainID)
}
diff --git a/core/services/promreporter/prom_reporter_test.go b/core/services/promreporter/prom_reporter_test.go
index b61fa25bdc4..a0a4a247c21 100644
--- a/core/services/promreporter/prom_reporter_test.go
+++ b/core/services/promreporter/prom_reporter_test.go
@@ -62,7 +62,8 @@ func newLegacyChainContainer(t *testing.T, db *sqlx.DB) legacyevm.LegacyChainCon
lggr,
lp,
keyStore,
- estimator)
+ estimator,
+ ht)
require.NoError(t, err)
cfg := configtest.NewGeneralConfig(t, nil)
diff --git a/core/services/vrf/delegate_test.go b/core/services/vrf/delegate_test.go
index 889b19d0e04..9718dc376a7 100644
--- a/core/services/vrf/delegate_test.go
+++ b/core/services/vrf/delegate_test.go
@@ -83,7 +83,7 @@ func buildVrfUni(t *testing.T, db *sqlx.DB, cfg chainlink.GeneralConfig) vrfUniv
btORM := bridges.NewORM(db)
ks := keystore.NewInMemory(db, utils.FastScryptParams, lggr)
_, dbConfig, evmConfig := txmgr.MakeTestConfigs(t)
- txm, err := txmgr.NewTxm(db, evmConfig, evmConfig.GasEstimator(), evmConfig.Transactions(), nil, dbConfig, dbConfig.Listener(), ec, logger.TestLogger(t), nil, ks.Eth(), nil)
+ txm, err := txmgr.NewTxm(db, evmConfig, evmConfig.GasEstimator(), evmConfig.Transactions(), nil, dbConfig, dbConfig.Listener(), ec, logger.TestLogger(t), nil, ks.Eth(), nil, nil)
orm := headtracker.NewORM(*testutils.FixtureChainID, db)
require.NoError(t, orm.IdempotentInsertHead(testutils.Context(t), cltest.Head(51)))
jrm := job.NewORM(db, prm, btORM, ks, lggr)
diff --git a/core/services/vrf/v2/integration_v2_test.go b/core/services/vrf/v2/integration_v2_test.go
index e9ae908565a..178b555667b 100644
--- a/core/services/vrf/v2/integration_v2_test.go
+++ b/core/services/vrf/v2/integration_v2_test.go
@@ -142,7 +142,7 @@ func makeTestTxm(t *testing.T, txStore txmgr.TestEvmTxStore, keyStore keystore.M
_, _, evmConfig := txmgr.MakeTestConfigs(t)
txmConfig := txmgr.NewEvmTxmConfig(evmConfig)
txm := txmgr.NewEvmTxm(ec.ConfiguredChainID(), txmConfig, evmConfig.Transactions(), keyStore.Eth(), logger.TestLogger(t), nil, nil,
- nil, txStore, nil, nil, nil, nil)
+ nil, txStore, nil, nil, nil, nil, nil)
return txm
}
diff --git a/core/services/vrf/v2/listener_v2_test.go b/core/services/vrf/v2/listener_v2_test.go
index ac59f1fdb69..b7a8710c4f8 100644
--- a/core/services/vrf/v2/listener_v2_test.go
+++ b/core/services/vrf/v2/listener_v2_test.go
@@ -40,7 +40,7 @@ func makeTestTxm(t *testing.T, txStore txmgr.TestEvmTxStore, keyStore keystore.M
ec := evmtest.NewEthClientMockWithDefaultChain(t)
txmConfig := txmgr.NewEvmTxmConfig(evmConfig)
txm := txmgr.NewEvmTxm(ec.ConfiguredChainID(), txmConfig, evmConfig.Transactions(), keyStore.Eth(), logger.TestLogger(t), nil, nil,
- nil, txStore, nil, nil, nil, nil)
+ nil, txStore, nil, nil, nil, nil, nil)
return txm
}
diff --git a/core/store/migrate/migrate_test.go b/core/store/migrate/migrate_test.go
index 9a8bf96573e..f4e91f0a2d2 100644
--- a/core/store/migrate/migrate_test.go
+++ b/core/store/migrate/migrate_test.go
@@ -618,3 +618,14 @@ func BenchmarkBackfillingRecordsWithMigration202(b *testing.B) {
require.NoError(b, err)
}
}
+
+func TestRollback_247_TxStateEnumUpdate(t *testing.T) {
+ ctx := testutils.Context(t)
+ _, db := heavyweight.FullTestDBV2(t, nil)
+ p, err := migrate.NewProvider(ctx, db.DB)
+ require.NoError(t, err)
+ _, err = p.DownTo(ctx, 54)
+ require.NoError(t, err)
+ _, err = p.UpTo(ctx, 247)
+ require.NoError(t, err)
+}
diff --git a/core/store/migrate/migrations/0248_add_tx_finalized_state.sql b/core/store/migrate/migrations/0248_add_tx_finalized_state.sql
new file mode 100644
index 00000000000..dcfe8eec734
--- /dev/null
+++ b/core/store/migrate/migrations/0248_add_tx_finalized_state.sql
@@ -0,0 +1,135 @@
+-- +goose Up
+-- Creating new column and enum instead of just adding new value to the existing enum so the migration changes match the rollback logic
+-- Otherwise, migration will complain about mismatching column order
+
+-- +goose StatementBegin
+-- Rename the existing enum with finalized state to mark it as old
+ALTER TYPE evm.txes_state RENAME TO txes_state_old;
+
+-- Create new enum without finalized state
+CREATE TYPE evm.txes_state AS ENUM (
+ 'unstarted',
+ 'in_progress',
+ 'fatal_error',
+ 'unconfirmed',
+ 'confirmed_missing_receipt',
+ 'confirmed',
+ 'finalized'
+);
+
+-- Add a new state column with the new enum type to the txes table
+ALTER TABLE evm.txes ADD COLUMN state_new evm.txes_state;
+
+-- Copy data from the old column to the new
+UPDATE evm.txes SET state_new = state::text::evm.txes_state;
+
+-- Drop constraints referring to old enum type on the old state column
+ALTER TABLE evm.txes ALTER COLUMN state DROP DEFAULT;
+ALTER TABLE evm.txes DROP CONSTRAINT chk_eth_txes_fsm;
+DROP INDEX IF EXISTS idx_eth_txes_state_from_address_evm_chain_id;
+DROP INDEX IF EXISTS idx_eth_txes_min_unconfirmed_nonce_for_key_evm_chain_id;
+DROP INDEX IF EXISTS idx_only_one_in_progress_tx_per_account_id_per_evm_chain_id;
+DROP INDEX IF EXISTS idx_eth_txes_unstarted_subject_id_evm_chain_id;
+
+-- Drop the old state column
+ALTER TABLE evm.txes DROP state;
+
+-- Drop the old enum type
+DROP TYPE evm.txes_state_old;
+
+-- Rename the new column name state to replace the old column
+ALTER TABLE evm.txes RENAME state_new TO state;
+
+-- Reset the state column's default
+ALTER TABLE evm.txes ALTER COLUMN state SET DEFAULT 'unstarted'::evm.txes_state, ALTER COLUMN state SET NOT NULL;
+
+-- Recreate constraint with finalized state
+ALTER TABLE evm.txes ADD CONSTRAINT chk_eth_txes_fsm CHECK (
+ state = 'unstarted'::evm.txes_state AND nonce IS NULL AND error IS NULL AND broadcast_at IS NULL AND initial_broadcast_at IS NULL
+ OR
+ state = 'in_progress'::evm.txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NULL AND initial_broadcast_at IS NULL
+ OR
+ state = 'fatal_error'::evm.txes_state AND error IS NOT NULL
+ OR
+ state = 'unconfirmed'::evm.txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NOT NULL AND initial_broadcast_at IS NOT NULL
+ OR
+ state = 'confirmed'::evm.txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NOT NULL AND initial_broadcast_at IS NOT NULL
+ OR
+ state = 'confirmed_missing_receipt'::evm.txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NOT NULL AND initial_broadcast_at IS NOT NULL
+ OR
+ state = 'finalized'::evm.txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NOT NULL AND initial_broadcast_at IS NOT NULL
+) NOT VALID;
+
+-- Recreate index to include finalized state
+CREATE INDEX idx_eth_txes_state_from_address_evm_chain_id ON evm.txes(evm_chain_id, from_address, state) WHERE state <> 'confirmed'::evm.txes_state AND state <> 'finalized'::evm.txes_state;
+CREATE INDEX idx_eth_txes_min_unconfirmed_nonce_for_key_evm_chain_id ON evm.txes(evm_chain_id, from_address, nonce) WHERE state = 'unconfirmed'::evm.txes_state;
+CREATE UNIQUE INDEX idx_only_one_in_progress_tx_per_account_id_per_evm_chain_id ON evm.txes(evm_chain_id, from_address) WHERE state = 'in_progress'::evm.txes_state;
+CREATE INDEX idx_eth_txes_unstarted_subject_id_evm_chain_id ON evm.txes(evm_chain_id, subject, id) WHERE subject IS NOT NULL AND state = 'unstarted'::evm.txes_state;
+-- +goose StatementEnd
+
+-- +goose Down
+-- +goose StatementBegin
+
+-- Rename the existing enum with finalized state to mark it as old
+ALTER TYPE evm.txes_state RENAME TO txes_state_old;
+
+-- Create new enum without finalized state
+CREATE TYPE evm.txes_state AS ENUM (
+ 'unstarted',
+ 'in_progress',
+ 'fatal_error',
+ 'unconfirmed',
+ 'confirmed_missing_receipt',
+ 'confirmed'
+);
+
+-- Add a new state column with the new enum type to the txes table
+ALTER TABLE evm.txes ADD COLUMN state_new evm.txes_state;
+
+-- Update all transactions with finalized state to confirmed in the old state column
+UPDATE evm.txes SET state = 'confirmed'::evm.txes_state_old WHERE state = 'finalized'::evm.txes_state_old;
+
+-- Copy data from the old column to the new
+UPDATE evm.txes SET state_new = state::text::evm.txes_state;
+
+-- Drop constraints referring to old enum type on the old state column
+ALTER TABLE evm.txes ALTER COLUMN state DROP DEFAULT;
+ALTER TABLE evm.txes DROP CONSTRAINT chk_eth_txes_fsm;
+DROP INDEX IF EXISTS idx_eth_txes_state_from_address_evm_chain_id;
+DROP INDEX IF EXISTS idx_eth_txes_min_unconfirmed_nonce_for_key_evm_chain_id;
+DROP INDEX IF EXISTS idx_only_one_in_progress_tx_per_account_id_per_evm_chain_id;
+DROP INDEX IF EXISTS idx_eth_txes_unstarted_subject_id_evm_chain_id;
+
+-- Drop the old state column
+ALTER TABLE evm.txes DROP state;
+
+-- Drop the old enum type
+DROP TYPE evm.txes_state_old;
+
+-- Rename the new column name state to replace the old column
+ALTER TABLE evm.txes RENAME state_new TO state;
+
+-- Reset the state column's default
+ALTER TABLE evm.txes ALTER COLUMN state SET DEFAULT 'unstarted'::evm.txes_state, ALTER COLUMN state SET NOT NULL;
+
+-- Recereate constraint without finalized state
+ALTER TABLE evm.txes ADD CONSTRAINT chk_eth_txes_fsm CHECK (
+ state = 'unstarted'::evm.txes_state AND nonce IS NULL AND error IS NULL AND broadcast_at IS NULL AND initial_broadcast_at IS NULL
+ OR
+ state = 'in_progress'::evm.txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NULL AND initial_broadcast_at IS NULL
+ OR
+ state = 'fatal_error'::evm.txes_state AND error IS NOT NULL
+ OR
+ state = 'unconfirmed'::evm.txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NOT NULL AND initial_broadcast_at IS NOT NULL
+ OR
+ state = 'confirmed'::evm.txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NOT NULL AND initial_broadcast_at IS NOT NULL
+ OR
+ state = 'confirmed_missing_receipt'::evm.txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NOT NULL AND initial_broadcast_at IS NOT NULL
+) NOT VALID;
+
+-- Recreate index with new enum type
+CREATE INDEX idx_eth_txes_state_from_address_evm_chain_id ON evm.txes(evm_chain_id, from_address, state) WHERE state <> 'confirmed'::evm.txes_state;
+CREATE INDEX idx_eth_txes_min_unconfirmed_nonce_for_key_evm_chain_id ON evm.txes(evm_chain_id, from_address, nonce) WHERE state = 'unconfirmed'::evm.txes_state;
+CREATE UNIQUE INDEX idx_only_one_in_progress_tx_per_account_id_per_evm_chain_id ON evm.txes(evm_chain_id, from_address) WHERE state = 'in_progress'::evm.txes_state;
+CREATE INDEX idx_eth_txes_unstarted_subject_id_evm_chain_id ON evm.txes(evm_chain_id, subject, id) WHERE subject IS NOT NULL AND state = 'unstarted'::evm.txes_state;
+-- +goose StatementEnd
diff --git a/core/web/testdata/body/health.html b/core/web/testdata/body/health.html
index 2a1b2227530..90d301bc8b8 100644
--- a/core/web/testdata/body/health.html
+++ b/core/web/testdata/body/health.html
@@ -63,6 +63,9 @@
Confirmer
+
+ Finalizer
+
WrappedEvmEstimator
diff --git a/core/web/testdata/body/health.json b/core/web/testdata/body/health.json
index 10415c0abdc..839428a5103 100644
--- a/core/web/testdata/body/health.json
+++ b/core/web/testdata/body/health.json
@@ -90,6 +90,15 @@
"output": ""
}
},
+ {
+ "type": "checks",
+ "id": "EVM.0.Txm.Finalizer",
+ "attributes": {
+ "name": "EVM.0.Txm.Finalizer",
+ "status": "passing",
+ "output": ""
+ }
+ },
{
"type": "checks",
"id": "EVM.0.Txm.WrappedEvmEstimator",
diff --git a/core/web/testdata/body/health.txt b/core/web/testdata/body/health.txt
index 09c8cff6c2d..3709b4e15f0 100644
--- a/core/web/testdata/body/health.txt
+++ b/core/web/testdata/body/health.txt
@@ -9,6 +9,7 @@ ok EVM.0.Txm
ok EVM.0.Txm.BlockHistoryEstimator
ok EVM.0.Txm.Broadcaster
ok EVM.0.Txm.Confirmer
+ok EVM.0.Txm.Finalizer
ok EVM.0.Txm.WrappedEvmEstimator
ok JobSpawner
ok Mailbox.Monitor
diff --git a/testdata/scripts/health/multi-chain.txtar b/testdata/scripts/health/multi-chain.txtar
index 7e01493b30b..76937329cb8 100644
--- a/testdata/scripts/health/multi-chain.txtar
+++ b/testdata/scripts/health/multi-chain.txtar
@@ -82,6 +82,7 @@ ok EVM.1.Txm
ok EVM.1.Txm.BlockHistoryEstimator
ok EVM.1.Txm.Broadcaster
ok EVM.1.Txm.Confirmer
+ok EVM.1.Txm.Finalizer
ok EVM.1.Txm.WrappedEvmEstimator
ok JobSpawner
ok Mailbox.Monitor
@@ -219,6 +220,15 @@ ok TelemetryManager
"output": ""
}
},
+ {
+ "type": "checks",
+ "id": "EVM.1.Txm.Finalizer",
+ "attributes": {
+ "name": "EVM.1.Txm.Finalizer",
+ "status": "passing",
+ "output": ""
+ }
+ },
{
"type": "checks",
"id": "EVM.1.Txm.WrappedEvmEstimator",