diff --git a/common/txmgr/resender.go b/common/txmgr/resender.go index ce770055609..06c466e1730 100644 --- a/common/txmgr/resender.go +++ b/common/txmgr/resender.go @@ -41,11 +41,13 @@ type Resender[ ADDR types.Hashable, TX_HASH types.Hashable, BLOCK_HASH types.Hashable, + R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], SEQ types.Sequence, FEE feetypes.Fee, ] struct { txStore txmgrtypes.TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE] client txmgrtypes.TransactionClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] ks txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] chainID CHAIN_ID interval time.Duration @@ -64,25 +66,28 @@ func NewResender[ ADDR types.Hashable, TX_HASH types.Hashable, BLOCK_HASH types.Hashable, + R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], SEQ types.Sequence, FEE feetypes.Fee, ]( lggr logger.Logger, txStore txmgrtypes.TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE], client txmgrtypes.TransactionClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], ks txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ], pollInterval time.Duration, config txmgrtypes.ResenderChainConfig, txConfig txmgrtypes.ResenderTransactionsConfig, -) *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { +) *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { if txConfig.ResendAfterThreshold() == 0 { panic("Resender requires a non-zero threshold") } // todo: add context to txStore https://smartcontract-it.atlassian.net/browse/BCI-1585 ctx, cancel := context.WithCancel(context.Background()) - return &Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{ + return &Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ txStore, client, + tracker, ks, client.ConfiguredChainID(), pollInterval, @@ -97,18 +102,18 @@ func NewResender[ } // Start is a comment which satisfies the linter -func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Start() { +func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start() { er.logger.Debugf("Enabled with poll interval of %s and age threshold of %s", er.interval, er.txConfig.ResendAfterThreshold()) go er.runLoop() } // Stop is a comment which satisfies the linter -func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Stop() { +func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Stop() { er.cancel() <-er.chDone } -func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) runLoop() { +func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() { defer close(er.chDone) if err := er.resendUnconfirmed(); err != nil { @@ -129,16 +134,20 @@ func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) runLoop() { } } -func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) resendUnconfirmed() error { - enabledAddresses, err := er.ks.EnabledAddressesForChain(er.chainID) +func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) resendUnconfirmed() error { + resendAddresses, err := er.ks.EnabledAddressesForChain(er.chainID) if err != nil { return fmt.Errorf("Resender failed getting enabled keys for chain %s: %w", er.chainID.String(), err) } + + resendAddresses = append(resendAddresses, er.tracker.GetAbandonedAddresses()...) + ageThreshold := er.txConfig.ResendAfterThreshold() maxInFlightTransactions := er.txConfig.MaxInFlight() olderThan := time.Now().Add(-ageThreshold) var allAttempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - for _, k := range enabledAddresses { + + for _, k := range resendAddresses { var attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] attempts, err = er.txStore.FindTxAttemptsRequiringResend(er.ctx, olderThan, maxInFlightTransactions, er.chainID, k) if err != nil { @@ -189,7 +198,7 @@ func logResendResult(lggr logger.Logger, codes []client.SendTxReturnCode) { lggr.Debugw("Completed", "n", len(codes), "nNew", nNew, "nFatal", nFatal) } -func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) logStuckAttempts(attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR) { +func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) logStuckAttempts(attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR) { if time.Since(er.lastAlertTimestamps[fromAddress.String()]) >= unconfirmedTxAlertLogFrequency { oldestAttempt, exists := findOldestUnconfirmedAttempt(attempts) if exists { diff --git a/common/txmgr/test_helpers.go b/common/txmgr/test_helpers.go index 95d08c2e953..0f128a23af4 100644 --- a/common/txmgr/test_helpers.go +++ b/common/txmgr/test_helpers.go @@ -2,6 +2,7 @@ package txmgr import ( "context" + "time" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" ) @@ -13,6 +14,14 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXX ec.client = client } +func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestSetTTL(ttl time.Duration) { + tr.ttl = ttl +} + +func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXDeliverBlock(blockHeight int64) { + tr.mb.Deliver(blockHeight) +} + func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) XXXTestStartInternal() error { return eb.startInternal() } @@ -33,7 +42,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXX return ec.closeInternal() } -func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) XXXTestResendUnconfirmed() error { +func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestResendUnconfirmed() error { return er.resendUnconfirmed() } diff --git a/common/txmgr/tracker.go b/common/txmgr/tracker.go new file mode 100644 index 00000000000..1a24dd5b5fe --- /dev/null +++ b/common/txmgr/tracker.go @@ -0,0 +1,336 @@ +package txmgr + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" + txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" + "github.com/smartcontractkit/chainlink/v2/common/types" + + "github.com/smartcontractkit/chainlink/v2/core/utils" +) + +const ( + // defaultTTL is the default time to live for abandoned transactions + // After this TTL, the TXM stops tracking abandoned Txs. + defaultTTL = 6 * time.Hour + // handleTxesTimeout represents a sanity limit on how long handleTxesByState + // should take to complete + handleTxesTimeout = 10 * time.Minute +) + +// AbandonedTx is a transaction who's 'FromAddress' was removed from the KeyStore(by the Node Operator). +// Thus, any new attempts for this Tx can't be signed/created. This means no fee bumping can be done. +// However, the Tx may still have live attempts in the chain's mempool, and could get confirmed on the +// chain as-is. Thus, the TXM should not directly discard this Tx. +type AbandonedTx[ADDR types.Hashable] struct { + id int64 + fromAddress ADDR +} + +// Tracker tracks all transactions which have abandoned fromAddresses. +// The fromAddresses can be deleted by Node Operators from the KeyStore. In such cases, +// existing in-flight transactions for these fromAddresses are considered abandoned too. +// Since such Txs can still have attempts on chain's mempool, these could still be confirmed. +// This tracker just tracks such Txs for some time, in case they get confirmed as-is. +type Tracker[ + CHAIN_ID types.ID, + ADDR types.Hashable, + TX_HASH types.Hashable, + BLOCK_HASH types.Hashable, + R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], + SEQ types.Sequence, + FEE feetypes.Fee, +] struct { + services.StateMachine + txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] + chainID CHAIN_ID + lggr logger.Logger + enabledAddrs map[ADDR]bool + txCache map[int64]AbandonedTx[ADDR] + ttl time.Duration + lock sync.Mutex + mb *utils.Mailbox[int64] + wg sync.WaitGroup + isStarted bool + ctx context.Context + ctxCancel context.CancelFunc +} + +func NewTracker[ + CHAIN_ID types.ID, + ADDR types.Hashable, + TX_HASH types.Hashable, + BLOCK_HASH types.Hashable, + R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], + SEQ types.Sequence, + FEE feetypes.Fee, +]( + txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE], + keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ], + chainID CHAIN_ID, + lggr logger.Logger, +) *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { + return &Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ + txStore: txStore, + keyStore: keyStore, + chainID: chainID, + lggr: logger.Named(lggr, "TxMgrTracker"), + enabledAddrs: map[ADDR]bool{}, + txCache: map[int64]AbandonedTx[ADDR]{}, + ttl: defaultTTL, + mb: utils.NewSingleMailbox[int64](), + lock: sync.Mutex{}, + wg: sync.WaitGroup{}, + } +} + +func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(_ context.Context) (err error) { + tr.lock.Lock() + defer tr.lock.Unlock() + return tr.StartOnce("Tracker", func() error { + return tr.startInternal() + }) +} + +func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) startInternal() (err error) { + tr.ctx, tr.ctxCancel = context.WithCancel(context.Background()) + + if err := tr.setEnabledAddresses(); err != nil { + return fmt.Errorf("failed to set enabled addresses: %w", err) + } + + if err := tr.trackAbandonedTxes(tr.ctx); err != nil { + return fmt.Errorf("failed to track abandoned txes: %w", err) + } + + tr.isStarted = true + if len(tr.txCache) == 0 { + tr.lggr.Infow("no abandoned txes found, skipping runLoop") + return nil + } + tr.wg.Add(1) + go tr.runLoop() + return nil +} + +func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() error { + tr.lock.Lock() + defer tr.lock.Unlock() + return tr.StopOnce("Tracker", func() error { + return tr.closeInternal() + }) +} + +func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) closeInternal() error { + tr.lggr.Infow("stopping tracker") + if !tr.isStarted { + return fmt.Errorf("tracker not started") + } + tr.ctxCancel() + tr.wg.Wait() + tr.isStarted = false + return nil +} + +func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() { + defer tr.wg.Done() + ttlExceeded := time.NewTicker(tr.ttl) + defer ttlExceeded.Stop() + for { + select { + case <-tr.mb.Notify(): + for { + if tr.ctx.Err() != nil { + return + } + blockHeight, exists := tr.mb.Retrieve() + if !exists { + break + } + if err := tr.HandleTxesByState(tr.ctx, blockHeight); err != nil { + tr.lggr.Errorw(fmt.Errorf("failed to handle txes by state: %w", err).Error()) + } + } + case <-ttlExceeded.C: + tr.lggr.Infow("ttl exceeded") + tr.MarkAllTxesFatal(tr.ctx) + return + case <-tr.ctx.Done(): + return + } + } +} + +func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetAbandonedAddresses() []ADDR { + tr.lock.Lock() + defer tr.lock.Unlock() + + if !tr.isStarted { + return []ADDR{} + } + + abandonedAddrs := make([]ADDR, len(tr.txCache)) + for _, atx := range tr.txCache { + abandonedAddrs = append(abandonedAddrs, atx.fromAddress) + } + return abandonedAddrs +} + +func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) IsStarted() bool { + tr.lock.Lock() + defer tr.lock.Unlock() + return tr.isStarted +} + +func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) setEnabledAddresses() error { + enabledAddrs, err := tr.keyStore.EnabledAddressesForChain(tr.chainID) + if err != nil { + return fmt.Errorf("failed to get enabled addresses for chain: %w", err) + } + + if len(enabledAddrs) == 0 { + tr.lggr.Warnf("enabled address list is empty") + } + + for _, addr := range enabledAddrs { + tr.enabledAddrs[addr] = true + } + return nil +} + +// trackAbandonedTxes called once to find and insert all abandoned txes into the tracker. +func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) trackAbandonedTxes(ctx context.Context) (err error) { + if tr.isStarted { + return fmt.Errorf("tracker already started") + } + + nonFatalTxes, err := tr.txStore.GetNonFatalTransactions(ctx, tr.chainID) + if err != nil { + return fmt.Errorf("failed to get non fatal txes from txStore: %w", err) + } + + // insert abandoned txes + for _, tx := range nonFatalTxes { + if !tr.enabledAddrs[tx.FromAddress] { + tr.insertTx(tx) + } + } + + if err := tr.handleTxesByState(ctx, 0); err != nil { + return fmt.Errorf("failed to handle txes by state: %w", err) + } + + return nil +} + +func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HandleTxesByState(ctx context.Context, blockHeight int64) error { + tr.lock.Lock() + defer tr.lock.Unlock() + tr.ctx, tr.ctxCancel = context.WithTimeout(ctx, handleTxesTimeout) + defer tr.ctxCancel() + return tr.handleTxesByState(ctx, blockHeight) +} + +func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) handleTxesByState(ctx context.Context, blockHeight int64) error { + for id, atx := range tr.txCache { + tx, err := tr.txStore.GetTxByID(ctx, atx.id) + if err != nil { + return fmt.Errorf("failed to get tx by ID: %w", err) + } + + switch tx.State { + case TxConfirmed: + if err := tr.handleConfirmedTx(tx, blockHeight); err != nil { + return fmt.Errorf("failed to handle confirmed txes: %w", err) + } + case TxConfirmedMissingReceipt, TxUnconfirmed: + // Keep tracking tx + case TxInProgress, TxUnstarted: + // Tx could never be sent on chain even once. That means that we need to sign + // an attempt to even broadcast this Tx to the chain. Since the fromAddress + // is deleted, we can't sign it. + errMsg := "The FromAddress for this Tx was deleted before this Tx could be broadcast to the chain." + if err := tr.markTxFatal(ctx, tx, errMsg); err != nil { + return fmt.Errorf("failed to mark tx as fatal: %w", err) + } + delete(tr.txCache, id) + case TxFatalError: + delete(tr.txCache, id) + default: + tr.lggr.Errorw(fmt.Sprintf("unhandled transaction state: %v", tx.State)) + } + } + + return nil +} + +// handleConfirmedTx removes a transaction from the tracker if it's been finalized on chain +func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) handleConfirmedTx( + tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + blockHeight int64, +) error { + finalized, err := tr.txStore.IsTxFinalized(tr.ctx, blockHeight, tx.ID, tr.chainID) + if err != nil { + return fmt.Errorf("failed to check if tx is finalized: %w", err) + } + + if finalized { + delete(tr.txCache, tx.ID) + } + + return nil +} + +// insertTx inserts a transaction into the tracker as an AbandonedTx +func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) insertTx( + tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { + if _, contains := tr.txCache[tx.ID]; contains { + return + } + + tr.txCache[tx.ID] = AbandonedTx[ADDR]{ + id: tx.ID, + fromAddress: tx.FromAddress, + } + tr.lggr.Debugw(fmt.Sprintf("inserted tx %v", tx.ID)) +} + +func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markTxFatal(ctx context.Context, + tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + errMsg string) error { + tx.Error.SetValid(errMsg) + + // Set state to TxInProgress so the tracker can attempt to mark it as fatal + tx.State = TxInProgress + if err := tr.txStore.UpdateTxFatalError(ctx, tx); err != nil { + return fmt.Errorf("failed to mark tx %v as abandoned: %w", tx.ID, err) + } + return nil +} + +func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkAllTxesFatal(ctx context.Context) { + tr.lock.Lock() + defer tr.lock.Unlock() + errMsg := fmt.Sprintf( + "fromAddress for this Tx was deleted, and existing attempts onchain didn't finalize within %d hours, thus this Tx was abandoned.", + int(tr.ttl.Hours())) + + for _, atx := range tr.txCache { + tx, err := tr.txStore.GetTxByID(ctx, atx.id) + if err != nil { + tr.lggr.Errorw(fmt.Errorf("failed to get tx by ID: %w", err).Error()) + continue + } + + if err := tr.markTxFatal(ctx, tx, errMsg); err != nil { + tr.lggr.Errorw(fmt.Errorf("failed to mark tx as abandoned: %w", err).Error()) + } + } +} diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index 63bf039d8f7..3aac88e2f4b 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -95,9 +95,10 @@ type Txm[ wg sync.WaitGroup reaper *Reaper[CHAIN_ID] - resender *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + resender *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] broadcaster *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] confirmer *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] fwdMgr txmgrtypes.ForwarderManager[ADDR] txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ] @@ -132,7 +133,8 @@ func NewTxm[ sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ], broadcaster *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], confirmer *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], - resender *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + resender *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], + tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], ) *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { b := Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ logger: lggr, @@ -153,6 +155,7 @@ func NewTxm[ broadcaster: broadcaster, confirmer: confirmer, resender: resender, + tracker: tracker, } if txCfg.ResendAfterThreshold() <= 0 { @@ -183,6 +186,10 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx return fmt.Errorf("Txm: Estimator failed to start: %w", err) } + if err := ms.Start(ctx, b.tracker); err != nil { + return fmt.Errorf("Txm: Tracker failed to start: %w", err) + } + b.wg.Add(1) go b.runLoop() <-b.chSubbed @@ -260,6 +267,10 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() (m merr = errors.Join(merr, fmt.Errorf("Txm: failed to close TxAttemptBuilder: %w", err)) } + if err := b.tracker.Close(); err != nil { + merr = errors.Join(merr, fmt.Errorf("Txm: failed to close Tracker: %w", err)) + } + return nil }) } @@ -371,6 +382,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() b.broadcaster.Trigger(address) case head := <-b.chHeads: b.confirmer.mb.Deliver(head) + b.tracker.mb.Deliver(head.BlockNumber()) case reset := <-b.reset: // This check prevents the weird edge-case where you can select // into this block after chStop has already been closed and the @@ -396,6 +408,9 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() if err := utils.EnsureClosed(b.confirmer); err != nil { b.logger.Panicw(fmt.Sprintf("Failed to Close Confirmer: %v", err), "err", err) } + if err := utils.EnsureClosed(b.tracker); err != nil { + b.logger.Panicw(fmt.Sprintf("Failed to Close Tracker: %v", err), "err", err) + } return case <-keysChanged: // This check prevents the weird edge-case where you can select diff --git a/common/txmgr/types/mocks/tx_store.go b/common/txmgr/types/mocks/tx_store.go index 0e344b9b6f9..0a7738fd68a 100644 --- a/common/txmgr/types/mocks/tx_store.go +++ b/common/txmgr/types/mocks/tx_store.go @@ -544,6 +544,58 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetInProgre return r0, r1 } +// GetNonFatalTransactions provides a mock function with given fields: ctx, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetNonFatalTransactions(ctx context.Context, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + ret := _m.Called(ctx, chainID) + + var r0 []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error)); ok { + return rf(ctx, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]); ok { + r0 = rf(ctx, chainID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, CHAIN_ID) error); ok { + r1 = rf(ctx, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetTxByID provides a mock function with given fields: ctx, id +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTxByID(ctx context.Context, id int64) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + ret := _m.Called(ctx, id) + + var r0 *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error)); ok { + return rf(ctx, id) + } + if rf, ok := ret.Get(0).(func(context.Context, int64) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetTxInProgress provides a mock function with given fields: ctx, fromAddress func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTxInProgress(ctx context.Context, fromAddress ADDR) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { ret := _m.Called(ctx, fromAddress) @@ -594,6 +646,30 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HasInProgre return r0, r1 } +// IsTxFinalized provides a mock function with given fields: ctx, blockHeight, txID, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID CHAIN_ID) (bool, error) { + ret := _m.Called(ctx, blockHeight, txID, chainID) + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, CHAIN_ID) (bool, error)); ok { + return rf(ctx, blockHeight, txID, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, CHAIN_ID) bool); ok { + r0 = rf(ctx, blockHeight, txID, chainID) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(context.Context, int64, int64, CHAIN_ID) error); ok { + r1 = rf(ctx, blockHeight, txID, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // LoadTxAttempts provides a mock function with given fields: ctx, etx func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) LoadTxAttempts(ctx context.Context, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { ret := _m.Called(ctx, etx) diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index f731031f926..251135795fd 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -81,6 +81,8 @@ type TransactionStore[ FindTransactionsConfirmedInBlockRange(ctx context.Context, highBlockNumber, lowBlockNumber int64, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) GetTxInProgress(ctx context.Context, fromAddress ADDR) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) GetInProgressTxAttempts(ctx context.Context, address ADDR, chainID CHAIN_ID) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) + GetNonFatalTransactions(ctx context.Context, chainID CHAIN_ID) (txs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) + GetTxByID(ctx context.Context, id int64) (tx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) HasInProgressTransaction(ctx context.Context, account ADDR, chainID CHAIN_ID) (exists bool, err error) LoadTxAttempts(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error MarkAllConfirmedMissingReceipt(ctx context.Context, chainID CHAIN_ID) (err error) @@ -100,6 +102,7 @@ type TransactionStore[ UpdateTxUnstartedToInProgress(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt *TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error UpdateTxFatalError(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error UpdateTxForRebroadcast(ctx context.Context, etx Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], etxAttempt TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error + IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID CHAIN_ID) (finalized bool, err error) } type TxHistoryReaper[CHAIN_ID types.ID] interface { diff --git a/core/chains/evm/txmgr/builder.go b/core/chains/evm/txmgr/builder.go index 84aa21e4de2..f0cbcbf8d92 100644 --- a/core/chains/evm/txmgr/builder.go +++ b/core/chains/evm/txmgr/builder.go @@ -50,13 +50,15 @@ func NewTxm( txmCfg := NewEvmTxmConfig(chainConfig) // wrap Evm specific config feeCfg := NewEvmTxmFeeConfig(fCfg) // wrap Evm specific config txmClient := NewEvmTxmClient(client) // wrap Evm specific client - ethBroadcaster := NewEvmBroadcaster(txStore, txmClient, txmCfg, feeCfg, txConfig, listenerConfig, keyStore, txAttemptBuilder, txNonceSyncer, lggr, checker, chainConfig.NonceAutoSync()) - ethConfirmer := NewEvmConfirmer(txStore, txmClient, txmCfg, feeCfg, txConfig, dbConfig, keyStore, txAttemptBuilder, lggr) - var ethResender *Resender + chainID := txmClient.ConfiguredChainID() + evmBroadcaster := NewEvmBroadcaster(txStore, txmClient, txmCfg, feeCfg, txConfig, listenerConfig, keyStore, txAttemptBuilder, txNonceSyncer, lggr, checker, chainConfig.NonceAutoSync()) + evmTracker := NewEvmTracker(txStore, keyStore, chainID, lggr) + evmConfirmer := NewEvmConfirmer(txStore, txmClient, txmCfg, feeCfg, txConfig, dbConfig, keyStore, txAttemptBuilder, lggr) + var evmResender *Resender if txConfig.ResendAfterThreshold() > 0 { - ethResender = NewEvmResender(lggr, txStore, txmClient, keyStore, txmgr.DefaultResenderPollInterval, chainConfig, txConfig) + evmResender = NewEvmResender(lggr, txStore, txmClient, evmTracker, keyStore, txmgr.DefaultResenderPollInterval, chainConfig, txConfig) } - txm = NewEvmTxm(txmClient.ConfiguredChainID(), txmCfg, txConfig, keyStore, lggr, checker, fwdMgr, txAttemptBuilder, txStore, txNonceSyncer, ethBroadcaster, ethConfirmer, ethResender) + txm = NewEvmTxm(chainID, txmCfg, txConfig, keyStore, lggr, checker, fwdMgr, txAttemptBuilder, txStore, txNonceSyncer, evmBroadcaster, evmConfirmer, evmResender, evmTracker) return txm, nil } @@ -75,21 +77,23 @@ func NewEvmTxm( broadcaster *Broadcaster, confirmer *Confirmer, resender *Resender, + tracker *Tracker, ) *Txm { - return txmgr.NewTxm(chainId, cfg, txCfg, keyStore, lggr, checkerFactory, fwdMgr, txAttemptBuilder, txStore, nonceSyncer, broadcaster, confirmer, resender) + return txmgr.NewTxm(chainId, cfg, txCfg, keyStore, lggr, checkerFactory, fwdMgr, txAttemptBuilder, txStore, nonceSyncer, broadcaster, confirmer, resender, tracker) } -// NewEvnResender creates a new concrete EvmResender +// NewEvmResender creates a new concrete EvmResender func NewEvmResender( lggr logger.Logger, txStore TransactionStore, client TransactionClient, + tracker *Tracker, ks KeyStore, pollInterval time.Duration, config EvmResenderConfig, txConfig txmgrtypes.ResenderTransactionsConfig, ) *Resender { - return txmgr.NewResender(lggr, txStore, client, ks, pollInterval, config, txConfig) + return txmgr.NewResender(lggr, txStore, client, tracker, ks, pollInterval, config, txConfig) } // NewEvmReaper instantiates a new EVM-specific reaper object @@ -112,6 +116,16 @@ func NewEvmConfirmer( return txmgr.NewConfirmer(txStore, client, chainConfig, feeConfig, txConfig, dbConfig, keystore, txAttemptBuilder, lggr, func(r *evmtypes.Receipt) bool { return r == nil }) } +// NewEvmTracker instantiates a new EVM tracker for abandoned transactions +func NewEvmTracker( + txStore TxStore, + keyStore KeyStore, + chainID *big.Int, + lggr logger.Logger, +) *Tracker { + return txmgr.NewTracker(txStore, keyStore, chainID, lggr) +} + // NewEvmBroadcaster returns a new concrete EvmBroadcaster func NewEvmBroadcaster( txStore TransactionStore, diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index 2788c2fd1c9..51c9f98e884 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -69,6 +69,7 @@ type TestEvmTxStore interface { FindTxAttemptsByTxIDs(ids []int64) ([]TxAttempt, error) InsertTxAttempt(attempt *TxAttempt) error LoadTxesAttempts(etxs []*Tx, qopts ...pg.QOpt) error + GetFatalTransactions(ctx context.Context) (txes []*Tx, err error) } type evmTxStore struct { @@ -552,6 +553,26 @@ func (o *evmTxStore) InsertReceipt(receipt *evmtypes.Receipt) (int64, error) { return r.ID, pkgerrors.Wrap(err, "InsertReceipt failed") } +func (o *evmTxStore) GetFatalTransactions(ctx context.Context) (txes []*Tx, err error) { + var cancel context.CancelFunc + ctx, cancel = o.mergeContexts(ctx) + defer cancel() + qq := o.q.WithOpts(pg.WithParentCtx(ctx)) + err = qq.Transaction(func(tx pg.Queryer) error { + stmt := `SELECT * FROM evm.txes WHERE state = 'fatal_error'` + var dbEtxs []DbEthTx + if err = tx.Select(&dbEtxs, stmt); err != nil { + return fmt.Errorf("failed to load evm.txes: %w", err) + } + txes = make([]*Tx, len(dbEtxs)) + dbEthTxsToEvmEthTxPtrs(dbEtxs, txes) + err = o.LoadTxesAttempts(txes, pg.WithParentCtx(ctx), pg.WithQueryer(tx)) + return fmt.Errorf("failed to load evm.tx_attempts: %w", err) + }, pg.OptReadOnlyTx()) + + return txes, nil +} + // FindTxWithAttempts finds the Tx with its attempts and receipts preloaded func (o *evmTxStore) FindTxWithAttempts(etxID int64) (etx Tx, err error) { err = o.q.Transaction(func(tx pg.Queryer) error { @@ -1107,6 +1128,25 @@ ORDER BY nonce ASC return etxs, pkgerrors.Wrap(err, "FindTransactionsConfirmedInBlockRange failed") } +func (o *evmTxStore) IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID *big.Int) (finalized bool, err error) { + var cancel context.CancelFunc + ctx, cancel = o.mergeContexts(ctx) + defer cancel() + + var count int32 + qq := o.q.WithOpts(pg.WithParentCtx(ctx)) + err = qq.GetContext(ctx, &count, ` + SELECT COUNT(evm.receipts.receipt) FROM evm.txes + INNER JOIN evm.tx_attempts ON evm.txes.id = evm.tx_attempts.eth_tx_id + INNER JOIN evm.receipts ON evm.tx_attempts.hash = evm.receipts.tx_hash + WHERE evm.receipts.block_number <= ($1 - evm.txes.min_confirmations) + AND evm.txes.id = $2 AND evm.txes.evm_chain_id = $3`, blockHeight, txID, chainID.String()) + if err != nil { + return false, fmt.Errorf("failed to retrieve transaction reciepts: %w", err) + } + return count > 0, nil +} + func saveAttemptWithNewState(ctx context.Context, q pg.Queryer, logger logger.Logger, attempt TxAttempt, broadcastAt time.Time) error { var dbAttempt DbEthTxAttempt dbAttempt.FromTxAttempt(&attempt) @@ -1223,6 +1263,51 @@ func (o *evmTxStore) SaveInProgressAttempt(ctx context.Context, attempt *TxAttem return nil } +func (o *evmTxStore) GetNonFatalTransactions(ctx context.Context, chainID *big.Int) (txes []*Tx, err error) { + var cancel context.CancelFunc + ctx, cancel = o.mergeContexts(ctx) + defer cancel() + qq := o.q.WithOpts(pg.WithParentCtx(ctx)) + err = qq.Transaction(func(tx pg.Queryer) error { + stmt := `SELECT * FROM evm.txes WHERE state <> 'fatal_error' AND evm_chain_id = $1` + var dbEtxs []DbEthTx + if err = tx.Select(&dbEtxs, stmt, chainID.String()); err != nil { + return fmt.Errorf("failed to load evm.txes: %w", err) + } + txes = make([]*Tx, len(dbEtxs)) + dbEthTxsToEvmEthTxPtrs(dbEtxs, txes) + err = o.LoadTxesAttempts(txes, pg.WithParentCtx(ctx), pg.WithQueryer(tx)) + return fmt.Errorf("failed to load evm.txes: %w", err) + }, pg.OptReadOnlyTx()) + + return txes, nil +} + +func (o *evmTxStore) GetTxByID(ctx context.Context, id int64) (txe *Tx, err error) { + var cancel context.CancelFunc + ctx, cancel = o.mergeContexts(ctx) + defer cancel() + qq := o.q.WithOpts(pg.WithParentCtx(ctx)) + + err = qq.Transaction(func(tx pg.Queryer) error { + stmt := `SELECT * FROM evm.txes WHERE id = $1` + var dbEtxs []DbEthTx + if err = tx.Select(&dbEtxs, stmt, id); err != nil { + return fmt.Errorf("failed to load evm.txes: %w", err) + } + txes := make([]*Tx, len(dbEtxs)) + dbEthTxsToEvmEthTxPtrs(dbEtxs, txes) + if len(txes) != 1 { + return fmt.Errorf("failed to get tx with id %v", id) + } + txe = txes[0] + err = o.LoadTxesAttempts(txes, pg.WithParentCtx(ctx), pg.WithQueryer(tx)) + return fmt.Errorf("failed to load evm.tx_attempts: %w", err) + }, pg.OptReadOnlyTx()) + + return txe, nil +} + // FindTxsRequiringGasBump returns transactions that have all // attempts which are unconfirmed for at least gasBumpThreshold blocks, // limited by limit pending transactions diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index d2cafcb8efa..e68641735ee 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -786,6 +786,31 @@ func TestORM_UpdateTxForRebroadcast(t *testing.T) { }) } +func TestORM_IsTxFinalized(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + cfg := newTestChainScopedConfig(t) + txStore := cltest.NewTestTxStore(t, db, cfg.Database()) + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + + t.Run("confirmed tx not past finality_depth", func(t *testing.T) { + confirmedAddr := cltest.MustGenerateRandomKey(t).Address + tx := mustInsertConfirmedEthTxWithReceipt(t, txStore, confirmedAddr, 123, 1) + finalized, err := txStore.IsTxFinalized(testutils.Context(t), 2, tx.ID, ethClient.ConfiguredChainID()) + require.NoError(t, err) + require.False(t, finalized) + }) + + t.Run("confirmed tx past finality_depth", func(t *testing.T) { + confirmedAddr := cltest.MustGenerateRandomKey(t).Address + tx := mustInsertConfirmedEthTxWithReceipt(t, txStore, confirmedAddr, 123, 1) + finalized, err := txStore.IsTxFinalized(testutils.Context(t), 10, tx.ID, ethClient.ConfiguredChainID()) + require.NoError(t, err) + require.True(t, finalized) + }) +} + func TestORM_FindTransactionsConfirmedInBlockRange(t *testing.T) { t.Parallel() @@ -1310,7 +1335,7 @@ func TestORM_UpdateTxUnstartedToInProgress(t *testing.T) { evmTxmCfg := txmgr.NewEvmTxmConfig(ccfg.EVM()) ec := evmtest.NewEthClientMockWithDefaultChain(t) txMgr := txmgr.NewEvmTxm(ec.ConfiguredChainID(), evmTxmCfg, ccfg.EVM().Transactions(), nil, logger.Test(t), nil, nil, - nil, txStore, nil, nil, nil, nil) + nil, txStore, nil, nil, nil, nil, nil) err := txMgr.XXXTestAbandon(fromAddress) // mark transaction as abandoned require.NoError(t, err) @@ -1365,6 +1390,81 @@ func TestORM_GetTxInProgress(t *testing.T) { }) } +func TestORM_GetNonFatalTransactions(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + cfg := newTestChainScopedConfig(t) + txStore := cltest.NewTestTxStore(t, db, cfg.Database()) + ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) + + t.Run("gets 0 non finalized eth transaction", func(t *testing.T) { + txes, err := txStore.GetNonFatalTransactions(testutils.Context(t), ethClient.ConfiguredChainID()) + require.NoError(t, err) + require.Empty(t, txes) + }) + + t.Run("get in progress, unstarted, and unconfirmed eth transactions", func(t *testing.T) { + inProgressTx := mustInsertInProgressEthTxWithAttempt(t, txStore, 123, fromAddress) + unstartedTx := mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, ethClient.ConfiguredChainID()) + + txes, err := txStore.GetNonFatalTransactions(testutils.Context(t), ethClient.ConfiguredChainID()) + require.NoError(t, err) + + for _, tx := range txes { + require.True(t, tx.ID == inProgressTx.ID || tx.ID == unstartedTx.ID) + } + }) +} + +func TestORM_GetTxByID(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + cfg := newTestChainScopedConfig(t) + txStore := cltest.NewTestTxStore(t, db, cfg.Database()) + ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) + + t.Run("no transaction", func(t *testing.T) { + tx, err := txStore.GetTxByID(testutils.Context(t), int64(0)) + require.NoError(t, err) + require.Nil(t, tx) + }) + + t.Run("get transaction by ID", func(t *testing.T) { + insertedTx := mustInsertInProgressEthTxWithAttempt(t, txStore, 123, fromAddress) + tx, err := txStore.GetTxByID(testutils.Context(t), insertedTx.ID) + require.NoError(t, err) + require.NotNil(t, tx) + }) +} + +func TestORM_GetFatalTransactions(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + cfg := newTestChainScopedConfig(t) + txStore := cltest.NewTestTxStore(t, db, cfg.Database()) + ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) + + t.Run("gets 0 fatal eth transactions", func(t *testing.T) { + txes, err := txStore.GetFatalTransactions(testutils.Context(t)) + require.NoError(t, err) + require.Empty(t, txes) + }) + + t.Run("get fatal transactions", func(t *testing.T) { + fatalTx := mustInsertFatalErrorEthTx(t, txStore, fromAddress) + txes, err := txStore.GetFatalTransactions(testutils.Context(t)) + require.NoError(t, err) + require.Equal(t, txes[0].ID, fatalTx.ID) + }) +} + func TestORM_HasInProgressTransaction(t *testing.T) { t.Parallel() diff --git a/core/chains/evm/txmgr/mocks/evm_tx_store.go b/core/chains/evm/txmgr/mocks/evm_tx_store.go index f491bda40bb..00efc1add98 100644 --- a/core/chains/evm/txmgr/mocks/evm_tx_store.go +++ b/core/chains/evm/txmgr/mocks/evm_tx_store.go @@ -649,6 +649,58 @@ func (_m *EvmTxStore) GetInProgressTxAttempts(ctx context.Context, address commo return r0, r1 } +// GetNonFatalTransactions provides a mock function with given fields: ctx, chainID +func (_m *EvmTxStore) GetNonFatalTransactions(ctx context.Context, chainID *big.Int) ([]*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error) { + ret := _m.Called(ctx, chainID) + + var r0 []*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee] + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) ([]*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error)); ok { + return rf(ctx, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) []*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]); ok { + r0 = rf(ctx, chainID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { + r1 = rf(ctx, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetTxByID provides a mock function with given fields: ctx, id +func (_m *EvmTxStore) GetTxByID(ctx context.Context, id int64) (*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error) { + ret := _m.Called(ctx, id) + + var r0 *types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee] + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64) (*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error)); ok { + return rf(ctx, id) + } + if rf, ok := ret.Get(0).(func(context.Context, int64) *types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetTxInProgress provides a mock function with given fields: ctx, fromAddress func (_m *EvmTxStore) GetTxInProgress(ctx context.Context, fromAddress common.Address) (*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error) { ret := _m.Called(ctx, fromAddress) @@ -699,6 +751,30 @@ func (_m *EvmTxStore) HasInProgressTransaction(ctx context.Context, account comm return r0, r1 } +// IsTxFinalized provides a mock function with given fields: ctx, blockHeight, txID, chainID +func (_m *EvmTxStore) IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID *big.Int) (bool, error) { + ret := _m.Called(ctx, blockHeight, txID, chainID) + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, *big.Int) (bool, error)); ok { + return rf(ctx, blockHeight, txID, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, *big.Int) bool); ok { + r0 = rf(ctx, blockHeight, txID, chainID) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(context.Context, int64, int64, *big.Int) error); ok { + r1 = rf(ctx, blockHeight, txID, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // LoadTxAttempts provides a mock function with given fields: ctx, etx func (_m *EvmTxStore) LoadTxAttempts(ctx context.Context, etx *types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]) error { ret := _m.Called(ctx, etx) diff --git a/core/chains/evm/txmgr/models.go b/core/chains/evm/txmgr/models.go index 9044c52c9ae..4c622ec945a 100644 --- a/core/chains/evm/txmgr/models.go +++ b/core/chains/evm/txmgr/models.go @@ -19,7 +19,8 @@ import ( type ( Confirmer = txmgr.Confirmer[*big.Int, *evmtypes.Head, common.Address, common.Hash, common.Hash, *evmtypes.Receipt, evmtypes.Nonce, gas.EvmFee] Broadcaster = txmgr.Broadcaster[*big.Int, *evmtypes.Head, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee] - Resender = txmgr.Resender[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee] + Resender = txmgr.Resender[*big.Int, common.Address, common.Hash, common.Hash, *evmtypes.Receipt, evmtypes.Nonce, gas.EvmFee] + Tracker = txmgr.Tracker[*big.Int, common.Address, common.Hash, common.Hash, *evmtypes.Receipt, evmtypes.Nonce, gas.EvmFee] Reaper = txmgr.Reaper[*big.Int] TxStore = txmgrtypes.TxStore[common.Address, *big.Int, common.Hash, common.Hash, *evmtypes.Receipt, evmtypes.Nonce, gas.EvmFee] TransactionStore = txmgrtypes.TransactionStore[common.Address, *big.Int, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee] diff --git a/core/chains/evm/txmgr/resender_test.go b/core/chains/evm/txmgr/resender_test.go index d2eefdece59..0e86c0d4f8c 100644 --- a/core/chains/evm/txmgr/resender_test.go +++ b/core/chains/evm/txmgr/resender_test.go @@ -65,7 +65,7 @@ func Test_EthResender_resendUnconfirmed(t *testing.T) { addr3TxesRawHex = append(addr3TxesRawHex, hexutil.Encode(etx.TxAttempts[0].SignedRawTx)) } - er := txmgr.NewEvmResender(lggr, txStore, txmgr.NewEvmTxmClient(ethClient), ethKeyStore, 100*time.Millisecond, ccfg.EVM(), ccfg.EVM().Transactions()) + er := txmgr.NewEvmResender(lggr, txStore, txmgr.NewEvmTxmClient(ethClient), txmgr.NewEvmTracker(txStore, ethKeyStore, big.NewInt(0), lggr), ethKeyStore, 100*time.Millisecond, ccfg.EVM(), ccfg.EVM().Transactions()) var resentHex = make(map[string]struct{}) ethClient.On("BatchCallContextAll", mock.Anything, mock.MatchedBy(func(elems []rpc.BatchElem) bool { @@ -121,7 +121,7 @@ func Test_EthResender_alertUnconfirmed(t *testing.T) { txStore := cltest.NewTestTxStore(t, db, logCfg) originalBroadcastAt := time.Unix(1616509100, 0) - er := txmgr.NewEvmResender(lggr, txStore, txmgr.NewEvmTxmClient(ethClient), ethKeyStore, 100*time.Millisecond, ccfg.EVM(), ccfg.EVM().Transactions()) + er := txmgr.NewEvmResender(lggr, txStore, txmgr.NewEvmTxmClient(ethClient), txmgr.NewEvmTracker(txStore, ethKeyStore, big.NewInt(0), lggr), ethKeyStore, 100*time.Millisecond, ccfg.EVM(), ccfg.EVM().Transactions()) t.Run("alerts only once for unconfirmed transaction attempt within the unconfirmedTxAlertDelay duration", func(t *testing.T) { _ = cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, int64(1), fromAddress, originalBroadcastAt) @@ -157,7 +157,7 @@ func Test_EthResender_Start(t *testing.T) { t.Run("resends transactions that have been languishing unconfirmed for too long", func(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - er := txmgr.NewEvmResender(lggr, txStore, txmgr.NewEvmTxmClient(ethClient), ethKeyStore, 100*time.Millisecond, ccfg.EVM(), ccfg.EVM().Transactions()) + er := txmgr.NewEvmResender(lggr, txStore, txmgr.NewEvmTxmClient(ethClient), txmgr.NewEvmTracker(txStore, ethKeyStore, big.NewInt(0), lggr), ethKeyStore, 100*time.Millisecond, ccfg.EVM(), ccfg.EVM().Transactions()) originalBroadcastAt := time.Unix(1616509100, 0) etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 0, fromAddress, originalBroadcastAt) diff --git a/core/chains/evm/txmgr/tracker_test.go b/core/chains/evm/txmgr/tracker_test.go new file mode 100644 index 00000000000..a31187f04e8 --- /dev/null +++ b/core/chains/evm/txmgr/tracker_test.go @@ -0,0 +1,161 @@ +package txmgr_test + +import ( + "context" + "math/big" + "testing" + "time" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" + "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore" + "github.com/smartcontractkit/chainlink/v2/core/utils" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" +) + +const waitTime = 5 * time.Millisecond + +func newTestEvmTrackerSetup(t *testing.T) (*txmgr.Tracker, txmgr.TestEvmTxStore, keystore.Eth, []common.Address) { + db := pgtest.NewSqlxDB(t) + cfg := newTestChainScopedConfig(t) + txStore := cltest.NewTestTxStore(t, db, cfg.Database()) + ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() + chainID := big.NewInt(0) + enabledAddresses := generateEnabledAddresses(t, ethKeyStore, chainID) + lggr := logger.TestLogger(t) + return txmgr.NewEvmTracker(txStore, ethKeyStore, chainID, lggr), txStore, ethKeyStore, enabledAddresses +} + +func generateEnabledAddresses(t *testing.T, keyStore keystore.Eth, chainID *big.Int) []common.Address { + var enabledAddresses []common.Address + _, addr1 := cltest.MustInsertRandomKey(t, keyStore, *utils.NewBigI(chainID.Int64())) + _, addr2 := cltest.MustInsertRandomKey(t, keyStore, *utils.NewBigI(chainID.Int64())) + enabledAddresses = append(enabledAddresses, addr1, addr2) + return enabledAddresses +} + +func containsID(txes []*txmgr.Tx, id int64) bool { + for _, tx := range txes { + if tx.ID == id { + return true + } + } + return false +} + +func TestEvmTracker_Initialization(t *testing.T) { + t.Parallel() + + tracker, _, _, _ := newTestEvmTrackerSetup(t) + + err := tracker.Start(context.Background()) + require.NoError(t, err) + require.True(t, tracker.IsStarted()) + + t.Run("stop tracker", func(t *testing.T) { + err := tracker.Close() + require.NoError(t, err) + require.False(t, tracker.IsStarted()) + }) +} + +func TestEvmTracker_AddressTracking(t *testing.T) { + t.Parallel() + + t.Run("track abandoned addresses", func(t *testing.T) { + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + tracker, txStore, _, _ := newTestEvmTrackerSetup(t) + inProgressAddr := cltest.MustGenerateRandomKey(t).Address + unstartedAddr := cltest.MustGenerateRandomKey(t).Address + unconfirmedAddr := cltest.MustGenerateRandomKey(t).Address + confirmedAddr := cltest.MustGenerateRandomKey(t).Address + _ = mustInsertInProgressEthTxWithAttempt(t, txStore, 123, inProgressAddr) + _ = cltest.MustInsertUnconfirmedEthTx(t, txStore, 123, unconfirmedAddr) + _ = mustInsertConfirmedEthTxWithReceipt(t, txStore, confirmedAddr, 123, 1) + _ = mustCreateUnstartedTx(t, txStore, unstartedAddr, cltest.MustGenerateRandomKey(t).Address, []byte{}, 0, big.Int{}, ethClient.ConfiguredChainID()) + + err := tracker.Start(context.Background()) + require.NoError(t, err) + defer func(tracker *txmgr.Tracker) { + err = tracker.Close() + require.NoError(t, err) + }(tracker) + + addrs := tracker.GetAbandonedAddresses() + require.NotContains(t, addrs, inProgressAddr) + require.NotContains(t, addrs, unstartedAddr) + require.Contains(t, addrs, confirmedAddr) + require.Contains(t, addrs, unconfirmedAddr) + }) + + t.Run("stop tracking finalized tx", func(t *testing.T) { + tracker, txStore, _, _ := newTestEvmTrackerSetup(t) + confirmedAddr := cltest.MustGenerateRandomKey(t).Address + _ = mustInsertConfirmedEthTxWithReceipt(t, txStore, confirmedAddr, 123, 1) + + err := tracker.Start(context.Background()) + require.NoError(t, err) + defer func(tracker *txmgr.Tracker) { + err = tracker.Close() + require.NoError(t, err) + }(tracker) + + addrs := tracker.GetAbandonedAddresses() + require.Contains(t, addrs, confirmedAddr) + + // deliver block past minConfirmations to finalize tx + tracker.XXXDeliverBlock(10) + time.Sleep(waitTime) + + addrs = tracker.GetAbandonedAddresses() + require.NotContains(t, addrs, confirmedAddr) + }) +} + +func TestEvmTracker_ExceedingTTL(t *testing.T) { + t.Parallel() + + t.Run("confirmed but unfinalized transaction still tracked", func(t *testing.T) { + tracker, txStore, _, _ := newTestEvmTrackerSetup(t) + addr1 := cltest.MustGenerateRandomKey(t).Address + _ = mustInsertConfirmedEthTxWithReceipt(t, txStore, addr1, 123, 1) + + err := tracker.Start(context.Background()) + require.NoError(t, err) + defer func(tracker *txmgr.Tracker) { + err = tracker.Close() + require.NoError(t, err) + }(tracker) + + require.Contains(t, tracker.GetAbandonedAddresses(), addr1) + }) + + t.Run("exceeding ttl", func(t *testing.T) { + tracker, txStore, _, _ := newTestEvmTrackerSetup(t) + addr1 := cltest.MustGenerateRandomKey(t).Address + addr2 := cltest.MustGenerateRandomKey(t).Address + tx1 := mustInsertInProgressEthTxWithAttempt(t, txStore, 123, addr1) + tx2 := cltest.MustInsertUnconfirmedEthTx(t, txStore, 123, addr2) + + tracker.XXXTestSetTTL(time.Nanosecond) + err := tracker.Start(context.Background()) + require.NoError(t, err) + defer func(tracker *txmgr.Tracker) { + err = tracker.Close() + require.NoError(t, err) + }(tracker) + + time.Sleep(waitTime) + require.NotContains(t, tracker.GetAbandonedAddresses(), addr1, addr2) + + fatalTxes, err := txStore.GetFatalTransactions(context.Background()) + require.NoError(t, err) + require.True(t, containsID(fatalTxes, tx1.ID)) + require.True(t, containsID(fatalTxes, tx2.ID)) + }) +} diff --git a/core/internal/cltest/factories.go b/core/internal/cltest/factories.go index 46014c4e04f..f0ce8c4ff66 100644 --- a/core/internal/cltest/factories.go +++ b/core/internal/cltest/factories.go @@ -193,6 +193,7 @@ func MustInsertConfirmedEthTxWithLegacyAttempt(t *testing.T, txStore txmgr.TestE n := evmtypes.Nonce(nonce) etx.Sequence = &n etx.State = txmgrcommon.TxConfirmed + etx.MinConfirmations.SetValid(6) require.NoError(t, txStore.InsertTx(&etx)) attempt := NewLegacyEthTxAttempt(t, etx.ID) attempt.BroadcastBeforeBlockNum = &broadcastBeforeBlockNum diff --git a/core/scripts/common/vrf/model/model.go b/core/scripts/common/vrf/model/model.go index 42deb424536..0972c47e618 100644 --- a/core/scripts/common/vrf/model/model.go +++ b/core/scripts/common/vrf/model/model.go @@ -1,8 +1,9 @@ package model import ( - "github.com/ethereum/go-ethereum/common" "math/big" + + "github.com/ethereum/go-ethereum/common" ) var ( diff --git a/core/services/vrf/v2/integration_v2_test.go b/core/services/vrf/v2/integration_v2_test.go index 74d923ce09f..fa95b694f98 100644 --- a/core/services/vrf/v2/integration_v2_test.go +++ b/core/services/vrf/v2/integration_v2_test.go @@ -137,7 +137,7 @@ func makeTestTxm(t *testing.T, txStore txmgr.TestEvmTxStore, keyStore keystore.M _, _, evmConfig := txmgr.MakeTestConfigs(t) txmConfig := txmgr.NewEvmTxmConfig(evmConfig) txm := txmgr.NewEvmTxm(ec.ConfiguredChainID(), txmConfig, evmConfig.Transactions(), keyStore.Eth(), logger.TestLogger(t), nil, nil, - nil, txStore, nil, nil, nil, nil) + nil, txStore, nil, nil, nil, nil, nil) return txm } diff --git a/core/services/vrf/v2/listener_v2_test.go b/core/services/vrf/v2/listener_v2_test.go index bcc85b3700d..6192db95dfe 100644 --- a/core/services/vrf/v2/listener_v2_test.go +++ b/core/services/vrf/v2/listener_v2_test.go @@ -39,7 +39,7 @@ func makeTestTxm(t *testing.T, txStore txmgr.TestEvmTxStore, keyStore keystore.M ec := evmtest.NewEthClientMockWithDefaultChain(t) txmConfig := txmgr.NewEvmTxmConfig(evmConfig) txm := txmgr.NewEvmTxm(ec.ConfiguredChainID(), txmConfig, evmConfig.Transactions(), keyStore.Eth(), logger.TestLogger(t), nil, nil, - nil, txStore, nil, nil, nil, nil) + nil, txStore, nil, nil, nil, nil, nil) return txm } diff --git a/core/store/migrate/migrations/0210_remove_evm_key_states_fk_constraint.sql b/core/store/migrate/migrations/0210_remove_evm_key_states_fk_constraint.sql new file mode 100644 index 00000000000..119de9d260e --- /dev/null +++ b/core/store/migrate/migrations/0210_remove_evm_key_states_fk_constraint.sql @@ -0,0 +1,4 @@ +-- +goose Up +ALTER TABLE evm.txes DROP CONSTRAINT eth_txes_evm_chain_id_from_address_fkey; +-- +goose Down +ALTER TABLE evm.txes ADD CONSTRAINT eth_txes_evm_chain_id_from_address_fkey FOREIGN KEY (evm_chain_id, from_address) REFERENCES evm.key_states(evm_chain_id, address) ON DELETE CASCADE DEFERRABLE INITIALLY IMMEDIATE NOT VALID; \ No newline at end of file diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index c5f243626d2..e8013d3617a 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added a tracker component to the txmgr for tracking and gracefully handling abandoned transactions. Abandoned transactions occur when a fromAddress is removed from the keystore by a node operator. The tracker gives abandoned transactions a chance to be finalized on chain, or marks them as fatal_error if they are not finalized within a specified time to live (default 6hrs). - Added distributed tracing in the OpenTelemetry trace format to the node, currently focused at the LOOPP Plugin development effort. This includes a new set of `Tracing` TOML configurations. The default for collecting traces is off - you must explicitly enable traces and setup a valid OpenTelemetry collector. Refer to `.github/tracing/README.md` for more details. - Added a new, optional WebServer authentication option that supports LDAP as a user identity provider. This enables user login access and user roles to be managed and provisioned via a centralized remote server that supports the LDAP protocol, which can be helpful when running multiple nodes. See the documentation for more information and config setup instructions. There is a new `[WebServer].AuthenticationMethod` config option, when set to `ldap` requires the new `[WebServer.LDAP]` config section to be defined, see the reference `docs/core.toml`. - New prom metrics for mercury transmit queue: