From 8e329edc87202b7e2b1fb150b693d44ffea00870 Mon Sep 17 00:00:00 2001 From: James Walker Date: Wed, 21 Feb 2024 13:11:27 -0500 Subject: [PATCH 01/16] add address state --- common/txmgr/address_state.go | 257 ++++++++++++++++++++++++++++++++++ 1 file changed, 257 insertions(+) create mode 100644 common/txmgr/address_state.go diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go new file mode 100644 index 00000000000..f6fd8d05433 --- /dev/null +++ b/common/txmgr/address_state.go @@ -0,0 +1,257 @@ +package txmgr + +import ( + "sync" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" + txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" + "github.com/smartcontractkit/chainlink/v2/common/types" + "gopkg.in/guregu/null.v4" +) + +// AddressState is the state of all transactions for a given address +type AddressState[ + CHAIN_ID types.ID, + ADDR, TX_HASH, BLOCK_HASH types.Hashable, + R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], + SEQ types.Sequence, + FEE feetypes.Fee, +] struct { + lggr logger.SugaredLogger + chainID CHAIN_ID + fromAddress ADDR + + sync.RWMutex + idempotencyKeyToTx map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + unstarted *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + inprogress *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + // NOTE: currently the unconfirmed map's key is the transaction ID that is assigned via the postgres DB + unconfirmed map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + confirmedMissingReceipt map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + confirmed map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + allTransactions map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + fatalErrored map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + // TODO: FINISH populate attemptHashToTxAttempt + // TODO: ANY NEW ATTEMPTS NEED TO BE ADDED TO THIS MAP + attemptHashToTxAttempt map[TX_HASH]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] +} + +// NewAddressState returns a new AddressState instance with initialized transaction state +func NewAddressState[ + CHAIN_ID types.ID, + ADDR, TX_HASH, BLOCK_HASH types.Hashable, + R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], + SEQ types.Sequence, + FEE feetypes.Fee, +]( + lggr logger.SugaredLogger, + chainID CHAIN_ID, + fromAddress ADDR, + maxUnstarted int, + txs []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], +) (*AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], error) { + // Count the number of transactions in each state to reduce the number of map resizes + counts := map[txmgrtypes.TxState]int{ + TxUnstarted: 0, + TxInProgress: 0, + TxUnconfirmed: 0, + TxConfirmedMissingReceipt: 0, + TxConfirmed: 0, + TxFatalError: 0, + } + var idempotencyKeysCount int + var txAttemptCount int + for _, tx := range txs { + counts[tx.State]++ + if tx.IdempotencyKey != nil { + idempotencyKeysCount++ + } + if tx.State == TxUnconfirmed { + txAttemptCount += len(tx.TxAttempts) + } + } + + // TODO: MAKE BETTER + // nit: probably not a big deal but not all txs have an idempotency key so we're probably initializing this map bigger than it needs to be here. + as := AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ + lggr: lggr, + chainID: chainID, + fromAddress: fromAddress, + + idempotencyKeyToTx: make(map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], idempotencyKeysCount), + unstarted: NewTxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](maxUnstarted), + inprogress: nil, + unconfirmed: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxUnconfirmed]), + confirmedMissingReceipt: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxConfirmedMissingReceipt]), + confirmed: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxConfirmed]), + allTransactions: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], len(txs)), + fatalErrored: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxFatalError]), + attemptHashToTxAttempt: make(map[TX_HASH]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], txAttemptCount), + } + + // Load all transactions supplied + for i := 0; i < len(txs); i++ { + tx := txs[i] + switch tx.State { + case TxUnstarted: + as.unstarted.AddTx(&tx) + case TxInProgress: + as.inprogress = &tx + case TxUnconfirmed: + as.unconfirmed[tx.ID] = &tx + case TxConfirmedMissingReceipt: + as.confirmedMissingReceipt[tx.ID] = &tx + case TxConfirmed: + as.confirmed[tx.ID] = &tx + case TxFatalError: + as.fatalErrored[tx.ID] = &tx + } + as.allTransactions[tx.ID] = &tx + if tx.IdempotencyKey != nil { + as.idempotencyKeyToTx[*tx.IdempotencyKey] = &tx + } + for _, txAttempt := range tx.TxAttempts { + as.attemptHashToTxAttempt[txAttempt.Hash] = txAttempt + } + } + + return &as, nil +} + +// CountTransactionsByState returns the number of transactions that are in the given state +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(txState txmgrtypes.TxState) int { + return 0 +} + +// FindTxWithIdempotencyKey returns the transaction with the given idempotency key. If no transaction is found, nil is returned. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + return nil +} + +// ApplyToTxsByState calls the given function for each transaction in the given states. +// If txIDs are provided, only the transactions with those IDs are considered. +// If no txIDs are provided, all transactions in the given states are considered. +// If no txStates are provided, all transactions are considered. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ApplyToTxsByState( + txStates []txmgrtypes.TxState, + fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]), + txIDs ...int64, +) { +} + +// FetchTxAttempts returns all attempts for the given transactions that match the given filters. +// If txIDs are provided, only the transactions with those IDs are considered. +// If no txIDs are provided, all transactions are considered. +// If no txStates are provided, all transactions are considered. +// The txFilter is applied to the transactions and the txAttemptFilter is applied to the attempts. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FetchTxAttempts( + txStates []txmgrtypes.TxState, + txFilter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, + txAttemptFilter func(*txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, + txIDs ...int64, +) []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + return nil +} + +// FetchTxs returns all transactions that match the given filters. +// If txIDs are provided, only the transactions with those IDs are considered. +// If no txIDs are provided, all transactions are considered. +// If no txStates are provided, all transactions are considered. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FetchTxs( + txStates []txmgrtypes.TxState, + filter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, + txIDs ...int64, +) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + return nil +} + +// PruneUnstartedTxQueue removes the transactions with the given IDs from the unstarted transaction queue. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneUnstartedTxQueue(ids []int64) { +} + +// DeleteTxs removes the transactions with the given IDs from the address state. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { +} + +// PeekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + return nil, nil +} + +// PeekInProgressTx returns the in-progress transaction without removing it from the in-progress state. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + return nil, nil +} + +// AddTxToUnstarted adds the given transaction to the unstarted queue. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) AddTxToUnstarted(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + return nil +} + +// MoveUnstartedToInProgress moves the next unstarted transaction to the in-progress state. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnstartedToInProgress( + etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + txAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], +) error { + return nil +} + +// MoveConfirmedMissingReceiptToUnconfirmed moves the confirmed missing receipt transaction to the unconfirmed state. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveConfirmedMissingReceiptToUnconfirmed( + txID int64, +) error { + return nil +} + +// MoveInProgressToUnconfirmed moves the in-progress transaction to the unconfirmed state. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveInProgressToUnconfirmed( + etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], +) error { + return nil +} + +// MoveUnconfirmedToConfirmed moves the unconfirmed transaction to the confirmed state. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnconfirmedToConfirmed( + receipt txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], +) error { + return nil +} + +// MoveUnstartedToFatalError moves the unstarted transaction to the fatal error state. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnstartedToFatalError( + etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + txError null.String, +) error { + return nil +} + +// MoveInProgressToFatalError moves the in-progress transaction to the fatal error state. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveInProgressToFatalError(txError null.String) error { + return nil +} + +// MoveConfirmedMissingReceiptToFatalError moves the confirmed missing receipt transaction to the fatal error state. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveConfirmedMissingReceiptToFatalError( + etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + txError null.String, +) error { + return nil +} + +// MoveUnconfirmedToConfirmedMissingReceipt moves the unconfirmed transaction to the confirmed missing receipt state. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnconfirmedToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { + return nil +} + +// MoveInProgressToConfirmedMissingReceipt moves the in-progress transaction to the confirmed missing receipt state. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveInProgressToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { + return nil +} + +// MoveConfirmedToUnconfirmed moves the confirmed transaction to the unconfirmed state. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveConfirmedToUnconfirmed(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + return nil +} From b69865e874bd45c74af9bdcef770c39201b2c66d Mon Sep 17 00:00:00 2001 From: James Walker Date: Fri, 23 Feb 2024 13:23:36 -0500 Subject: [PATCH 02/16] change to better naming scheme and remove old TODOs --- common/txmgr/address_state.go | 56 ++++++++++++++++------------------- 1 file changed, 26 insertions(+), 30 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index f6fd8d05433..c2d78816024 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -24,18 +24,16 @@ type AddressState[ fromAddress ADDR sync.RWMutex - idempotencyKeyToTx map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - unstarted *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] - inprogress *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + idempotencyKeyToTxn map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + unstartedTxns *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + inprogressTxn *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] // NOTE: currently the unconfirmed map's key is the transaction ID that is assigned via the postgres DB - unconfirmed map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - confirmedMissingReceipt map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - confirmed map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - allTransactions map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - fatalErrored map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - // TODO: FINISH populate attemptHashToTxAttempt - // TODO: ANY NEW ATTEMPTS NEED TO BE ADDED TO THIS MAP - attemptHashToTxAttempt map[TX_HASH]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + unconfirmedTxns map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + confirmedMissingReceiptTxns map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + confirmedTxns map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + allTxns map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + fatalErroredTxns map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + attemptHashToTxAttempt map[TX_HASH]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] } // NewAddressState returns a new AddressState instance with initialized transaction state @@ -73,22 +71,20 @@ func NewAddressState[ } } - // TODO: MAKE BETTER - // nit: probably not a big deal but not all txs have an idempotency key so we're probably initializing this map bigger than it needs to be here. as := AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ lggr: lggr, chainID: chainID, fromAddress: fromAddress, - idempotencyKeyToTx: make(map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], idempotencyKeysCount), - unstarted: NewTxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](maxUnstarted), - inprogress: nil, - unconfirmed: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxUnconfirmed]), - confirmedMissingReceipt: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxConfirmedMissingReceipt]), - confirmed: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxConfirmed]), - allTransactions: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], len(txs)), - fatalErrored: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxFatalError]), - attemptHashToTxAttempt: make(map[TX_HASH]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], txAttemptCount), + idempotencyKeyToTxn: make(map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], idempotencyKeysCount), + unstartedTxns: NewTxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](maxUnstarted), + inprogressTxn: nil, + unconfirmedTxns: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxUnconfirmed]), + confirmedMissingReceiptTxns: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxConfirmedMissingReceipt]), + confirmedTxns: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxConfirmed]), + allTxns: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], len(txs)), + fatalErroredTxns: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxFatalError]), + attemptHashToTxAttempt: make(map[TX_HASH]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], txAttemptCount), } // Load all transactions supplied @@ -96,21 +92,21 @@ func NewAddressState[ tx := txs[i] switch tx.State { case TxUnstarted: - as.unstarted.AddTx(&tx) + as.unstartedTxns.AddTx(&tx) case TxInProgress: - as.inprogress = &tx + as.inprogressTxn = &tx case TxUnconfirmed: - as.unconfirmed[tx.ID] = &tx + as.unconfirmedTxns[tx.ID] = &tx case TxConfirmedMissingReceipt: - as.confirmedMissingReceipt[tx.ID] = &tx + as.confirmedMissingReceiptTxns[tx.ID] = &tx case TxConfirmed: - as.confirmed[tx.ID] = &tx + as.confirmedTxns[tx.ID] = &tx case TxFatalError: - as.fatalErrored[tx.ID] = &tx + as.fatalErroredTxns[tx.ID] = &tx } - as.allTransactions[tx.ID] = &tx + as.allTxns[tx.ID] = &tx if tx.IdempotencyKey != nil { - as.idempotencyKeyToTx[*tx.IdempotencyKey] = &tx + as.idempotencyKeyToTxn[*tx.IdempotencyKey] = &tx } for _, txAttempt := range tx.TxAttempts { as.attemptHashToTxAttempt[txAttempt.Hash] = txAttempt From 36574faa914955095abe2d2485324942bbb1d529 Mon Sep 17 00:00:00 2001 From: James Walker Date: Fri, 23 Feb 2024 13:29:09 -0500 Subject: [PATCH 03/16] change naming of transactions from Txn to Tx to match the struct --- common/txmgr/address_state.go | 52 +++++++++++++++++------------------ 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index c2d78816024..67942e55f73 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -24,16 +24,16 @@ type AddressState[ fromAddress ADDR sync.RWMutex - idempotencyKeyToTxn map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - unstartedTxns *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] - inprogressTxn *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + idempotencyKeyToTx map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + unstartedTxs *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + inprogressTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] // NOTE: currently the unconfirmed map's key is the transaction ID that is assigned via the postgres DB - unconfirmedTxns map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - confirmedMissingReceiptTxns map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - confirmedTxns map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - allTxns map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - fatalErroredTxns map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - attemptHashToTxAttempt map[TX_HASH]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + unconfirmedTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + confirmedMissingReceiptTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + confirmedTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + allTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + fatalErroredTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + attemptHashToTxAttempt map[TX_HASH]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] } // NewAddressState returns a new AddressState instance with initialized transaction state @@ -76,15 +76,15 @@ func NewAddressState[ chainID: chainID, fromAddress: fromAddress, - idempotencyKeyToTxn: make(map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], idempotencyKeysCount), - unstartedTxns: NewTxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](maxUnstarted), - inprogressTxn: nil, - unconfirmedTxns: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxUnconfirmed]), - confirmedMissingReceiptTxns: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxConfirmedMissingReceipt]), - confirmedTxns: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxConfirmed]), - allTxns: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], len(txs)), - fatalErroredTxns: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxFatalError]), - attemptHashToTxAttempt: make(map[TX_HASH]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], txAttemptCount), + idempotencyKeyToTx: make(map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], idempotencyKeysCount), + unstartedTxs: NewTxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](maxUnstarted), + inprogressTx: nil, + unconfirmedTxs: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxUnconfirmed]), + confirmedMissingReceiptTxs: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxConfirmedMissingReceipt]), + confirmedTxs: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxConfirmed]), + allTxs: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], len(txs)), + fatalErroredTxs: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxFatalError]), + attemptHashToTxAttempt: make(map[TX_HASH]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], txAttemptCount), } // Load all transactions supplied @@ -92,21 +92,21 @@ func NewAddressState[ tx := txs[i] switch tx.State { case TxUnstarted: - as.unstartedTxns.AddTx(&tx) + as.unstartedTxs.AddTx(&tx) case TxInProgress: - as.inprogressTxn = &tx + as.inprogressTx = &tx case TxUnconfirmed: - as.unconfirmedTxns[tx.ID] = &tx + as.unconfirmedTxs[tx.ID] = &tx case TxConfirmedMissingReceipt: - as.confirmedMissingReceiptTxns[tx.ID] = &tx + as.confirmedMissingReceiptTxs[tx.ID] = &tx case TxConfirmed: - as.confirmedTxns[tx.ID] = &tx + as.confirmedTxs[tx.ID] = &tx case TxFatalError: - as.fatalErroredTxns[tx.ID] = &tx + as.fatalErroredTxs[tx.ID] = &tx } - as.allTxns[tx.ID] = &tx + as.allTxs[tx.ID] = &tx if tx.IdempotencyKey != nil { - as.idempotencyKeyToTxn[*tx.IdempotencyKey] = &tx + as.idempotencyKeyToTx[*tx.IdempotencyKey] = &tx } for _, txAttempt := range tx.TxAttempts { as.attemptHashToTxAttempt[txAttempt.Hash] = txAttempt From 788d9b77ae56f866dbec5021641b177c3d83a840 Mon Sep 17 00:00:00 2001 From: James Walker Date: Fri, 23 Feb 2024 13:43:14 -0500 Subject: [PATCH 04/16] goimports --- common/txmgr/address_state.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 67942e55f73..2bbb777c634 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -4,11 +4,12 @@ import ( "sync" "time" + "gopkg.in/guregu/null.v4" + "github.com/smartcontractkit/chainlink-common/pkg/logger" feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" "github.com/smartcontractkit/chainlink/v2/common/types" - "gopkg.in/guregu/null.v4" ) // AddressState is the state of all transactions for a given address From 25186b5857b020542ebc4536287724cdd3fb77de Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 29 Feb 2024 15:15:13 -0500 Subject: [PATCH 05/16] fix incorrect note --- common/txmgr/address_state.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 2bbb777c634..b090199e535 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -25,16 +25,17 @@ type AddressState[ fromAddress ADDR sync.RWMutex - idempotencyKeyToTx map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - unstartedTxs *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] - inprogressTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - // NOTE: currently the unconfirmed map's key is the transaction ID that is assigned via the postgres DB + idempotencyKeyToTx map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + attemptHashToTxAttempt map[TX_HASH]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + + unstartedTxs *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + inprogressTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + // NOTE: below each map's key is the transaction ID that is assigned via the persistent datastore unconfirmedTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] confirmedMissingReceiptTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] confirmedTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] allTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] fatalErroredTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - attemptHashToTxAttempt map[TX_HASH]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] } // NewAddressState returns a new AddressState instance with initialized transaction state From e0d99393f101f03db88008620991b4016e3829e2 Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 29 Feb 2024 15:16:34 -0500 Subject: [PATCH 06/16] change naming from fetch to find --- common/txmgr/address_state.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index b090199e535..6a71e81de32 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -139,12 +139,12 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ApplyT ) { } -// FetchTxAttempts returns all attempts for the given transactions that match the given filters. +// FindTxAttempts returns all attempts for the given transactions that match the given filters. // If txIDs are provided, only the transactions with those IDs are considered. // If no txIDs are provided, all transactions are considered. // If no txStates are provided, all transactions are considered. // The txFilter is applied to the transactions and the txAttemptFilter is applied to the attempts. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FetchTxAttempts( +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxAttempts( txStates []txmgrtypes.TxState, txFilter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, txAttemptFilter func(*txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, @@ -153,11 +153,11 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FetchT return nil } -// FetchTxs returns all transactions that match the given filters. +// FindTxs returns all transactions that match the given filters. // If txIDs are provided, only the transactions with those IDs are considered. // If no txIDs are provided, all transactions are considered. // If no txStates are provided, all transactions are considered. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FetchTxs( +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxs( txStates []txmgrtypes.TxState, filter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, txIDs ...int64, From cd5f156559bc923bb008d3321a7cc4613e4c0ae0 Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 29 Feb 2024 15:28:55 -0500 Subject: [PATCH 07/16] remove empty line --- common/txmgr/address_state.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 6a71e81de32..c393fef6ef6 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -27,9 +27,8 @@ type AddressState[ sync.RWMutex idempotencyKeyToTx map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] attemptHashToTxAttempt map[TX_HASH]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - - unstartedTxs *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] - inprogressTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + unstartedTxs *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + inprogressTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] // NOTE: below each map's key is the transaction ID that is assigned via the persistent datastore unconfirmedTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] confirmedMissingReceiptTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] From dcb01d93c5d518d1761074f0fd37b253a4a579bf Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 29 Feb 2024 15:58:23 -0500 Subject: [PATCH 08/16] update MoveTxToFatalError --- common/txmgr/address_state.go | 20 +++----------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index c393fef6ef6..8c8e47241f6 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -217,23 +217,9 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUn return nil } -// MoveUnstartedToFatalError moves the unstarted transaction to the fatal error state. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnstartedToFatalError( - etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], - txError null.String, -) error { - return nil -} - -// MoveInProgressToFatalError moves the in-progress transaction to the fatal error state. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveInProgressToFatalError(txError null.String) error { - return nil -} - -// MoveConfirmedMissingReceiptToFatalError moves the confirmed missing receipt transaction to the fatal error state. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveConfirmedMissingReceiptToFatalError( - etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], - txError null.String, +// MoveTxToFatalError moves a transaction to the fatal error state. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveTxToFatalError( + txID int64, txError null.String, ) error { return nil } From c364858faa3de700d1fc4bdc917d0577768519a1 Mon Sep 17 00:00:00 2001 From: James Walker Date: Wed, 6 Mar 2024 16:49:12 -0500 Subject: [PATCH 09/16] update comment for addressState --- common/internal/queues/tx_priority_queue.go | 24 ++++----- common/txmgr/address_state.go | 55 +++++++++++---------- 2 files changed, 41 insertions(+), 38 deletions(-) diff --git a/common/internal/queues/tx_priority_queue.go b/common/internal/queues/tx_priority_queue.go index 49a0acf35a9..cc6026ecf05 100644 --- a/common/internal/queues/tx_priority_queue.go +++ b/common/internal/queues/tx_priority_queue.go @@ -8,8 +8,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/common/types" ) -// txPriorityQueue is a priority queue of transactions prioritized by creation time. The oldest transaction is at the front of the queue. -type txPriorityQueue[ +// TxPriorityQueue is a priority queue of transactions prioritized by creation time. The oldest transaction is at the front of the queue. +type TxPriorityQueue[ CHAIN_ID types.ID, ADDR, TX_HASH, BLOCK_HASH types.Hashable, R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], @@ -26,8 +26,8 @@ func NewTxPriorityQueue[ R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], SEQ types.Sequence, FEE feetypes.Fee, -](capacity int) *txPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { - pq := txPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ +](capacity int) *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { + pq := TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ ph: NewPriorityHeap[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](capacity), } @@ -35,7 +35,7 @@ func NewTxPriorityQueue[ } // AddTx adds a transaction to the queue -func (pq *txPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) AddTx(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { +func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) AddTx(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { if pq.ph.Len() == pq.ph.Cap() { heap.Pop(pq.ph) } @@ -44,7 +44,7 @@ func (pq *txPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Add } // RemoveNextTx removes the next transaction to be processed from the queue -func (pq *txPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RemoveNextTx() *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { +func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RemoveNextTx() *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { if pq.ph.Len() == 0 { return nil } @@ -53,7 +53,7 @@ func (pq *txPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Rem } // RemoveTxByID removes the transaction with the given ID from the queue -func (pq *txPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RemoveTxByID(id int64) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { +func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RemoveTxByID(id int64) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { if pq.ph.Len() == 0 { return nil } @@ -66,7 +66,7 @@ func (pq *txPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Rem } // PruneByTxIDs removes the transactions with the given IDs from the queue -func (pq *txPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneByTxIDs(ids []int64) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { +func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneByTxIDs(ids []int64) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { removed := []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} for _, id := range ids { if tx := pq.RemoveTxByID(id); tx != nil { @@ -78,21 +78,21 @@ func (pq *txPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Pru } // PeekNextTx returns the next transaction to be processed without removing it from the queue -func (pq *txPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekNextTx() *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { +func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekNextTx() *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { return pq.ph.Peek() } // Close clears the queue -func (pq *txPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() { +func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() { pq.ph.Close() } // Cap returns the capacity of the queue -func (pq *txPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Cap() int { +func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Cap() int { return pq.ph.Cap() } // Len returns the length of the queue -func (pq *txPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Len() int { +func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Len() int { return pq.ph.Len() } diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 8c8e47241f6..a2b5491c08f 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -8,12 +8,15 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" + "github.com/smartcontractkit/chainlink/v2/common/internal/queues" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" "github.com/smartcontractkit/chainlink/v2/common/types" ) -// AddressState is the state of all transactions for a given address -type AddressState[ +// addressState is the state of all transactions for a given address. +// It holds information about all transactions for a given address, including unstarted, in-progress, unconfirmed, confirmed, and fatal errored transactions. +// It is designed to help transition transactions between states and to provide information about the current state of transactions for a given address. +type addressState[ CHAIN_ID types.ID, ADDR, TX_HASH, BLOCK_HASH types.Hashable, R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], @@ -27,7 +30,7 @@ type AddressState[ sync.RWMutex idempotencyKeyToTx map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] attemptHashToTxAttempt map[TX_HASH]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - unstartedTxs *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + unstartedTxs *queues.TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] inprogressTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] // NOTE: below each map's key is the transaction ID that is assigned via the persistent datastore unconfirmedTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] @@ -37,8 +40,8 @@ type AddressState[ fatalErroredTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] } -// NewAddressState returns a new AddressState instance with initialized transaction state -func NewAddressState[ +// newAddressState returns a new addressState instance with initialized transaction state +func newAddressState[ CHAIN_ID types.ID, ADDR, TX_HASH, BLOCK_HASH types.Hashable, R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], @@ -50,7 +53,7 @@ func NewAddressState[ fromAddress ADDR, maxUnstarted int, txs []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], -) (*AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], error) { +) (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], error) { // Count the number of transactions in each state to reduce the number of map resizes counts := map[txmgrtypes.TxState]int{ TxUnstarted: 0, @@ -72,13 +75,13 @@ func NewAddressState[ } } - as := AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ + as := addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ lggr: lggr, chainID: chainID, fromAddress: fromAddress, idempotencyKeyToTx: make(map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], idempotencyKeysCount), - unstartedTxs: NewTxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](maxUnstarted), + unstartedTxs: queues.NewTxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](maxUnstarted), inprogressTx: nil, unconfirmedTxs: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxUnconfirmed]), confirmedMissingReceiptTxs: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxConfirmedMissingReceipt]), @@ -118,12 +121,12 @@ func NewAddressState[ } // CountTransactionsByState returns the number of transactions that are in the given state -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(txState txmgrtypes.TxState) int { +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(txState txmgrtypes.TxState) int { return 0 } // FindTxWithIdempotencyKey returns the transaction with the given idempotency key. If no transaction is found, nil is returned. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { return nil } @@ -131,7 +134,7 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTx // If txIDs are provided, only the transactions with those IDs are considered. // If no txIDs are provided, all transactions in the given states are considered. // If no txStates are provided, all transactions are considered. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ApplyToTxsByState( +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ApplyToTxsByState( txStates []txmgrtypes.TxState, fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]), txIDs ...int64, @@ -143,7 +146,7 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ApplyT // If no txIDs are provided, all transactions are considered. // If no txStates are provided, all transactions are considered. // The txFilter is applied to the transactions and the txAttemptFilter is applied to the attempts. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxAttempts( +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxAttempts( txStates []txmgrtypes.TxState, txFilter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, txAttemptFilter func(*txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, @@ -156,7 +159,7 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTx // If txIDs are provided, only the transactions with those IDs are considered. // If no txIDs are provided, all transactions are considered. // If no txStates are provided, all transactions are considered. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxs( +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxs( txStates []txmgrtypes.TxState, filter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, txIDs ...int64, @@ -165,30 +168,30 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTx } // PruneUnstartedTxQueue removes the transactions with the given IDs from the unstarted transaction queue. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneUnstartedTxQueue(ids []int64) { +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneUnstartedTxQueue(ids []int64) { } // DeleteTxs removes the transactions with the given IDs from the address state. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { } // PeekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil } // PeekInProgressTx returns the in-progress transaction without removing it from the in-progress state. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil } // AddTxToUnstarted adds the given transaction to the unstarted queue. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) AddTxToUnstarted(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) AddTxToUnstarted(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { return nil } // MoveUnstartedToInProgress moves the next unstarted transaction to the in-progress state. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnstartedToInProgress( +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnstartedToInProgress( etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], txAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], ) error { @@ -196,14 +199,14 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUn } // MoveConfirmedMissingReceiptToUnconfirmed moves the confirmed missing receipt transaction to the unconfirmed state. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveConfirmedMissingReceiptToUnconfirmed( +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveConfirmedMissingReceiptToUnconfirmed( txID int64, ) error { return nil } // MoveInProgressToUnconfirmed moves the in-progress transaction to the unconfirmed state. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveInProgressToUnconfirmed( +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveInProgressToUnconfirmed( etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], ) error { @@ -211,30 +214,30 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveIn } // MoveUnconfirmedToConfirmed moves the unconfirmed transaction to the confirmed state. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnconfirmedToConfirmed( +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnconfirmedToConfirmed( receipt txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], ) error { return nil } // MoveTxToFatalError moves a transaction to the fatal error state. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveTxToFatalError( +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveTxToFatalError( txID int64, txError null.String, ) error { return nil } // MoveUnconfirmedToConfirmedMissingReceipt moves the unconfirmed transaction to the confirmed missing receipt state. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnconfirmedToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnconfirmedToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { return nil } // MoveInProgressToConfirmedMissingReceipt moves the in-progress transaction to the confirmed missing receipt state. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveInProgressToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveInProgressToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { return nil } // MoveConfirmedToUnconfirmed moves the confirmed transaction to the unconfirmed state. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveConfirmedToUnconfirmed(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveConfirmedToUnconfirmed(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { return nil } From 44854b0727c20726c28a3b65177521e995c75bb6 Mon Sep 17 00:00:00 2001 From: James Walker Date: Wed, 6 Mar 2024 16:57:20 -0500 Subject: [PATCH 10/16] address comments --- common/txmgr/address_state.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index a2b5491c08f..50119bee25f 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -51,9 +51,9 @@ func newAddressState[ lggr logger.SugaredLogger, chainID CHAIN_ID, fromAddress ADDR, - maxUnstarted int, + maxUnstarted uint64, txs []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], -) (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], error) { +) *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { // Count the number of transactions in each state to reduce the number of map resizes counts := map[txmgrtypes.TxState]int{ TxUnstarted: 0, @@ -70,7 +70,7 @@ func newAddressState[ if tx.IdempotencyKey != nil { idempotencyKeysCount++ } - if tx.State == TxUnconfirmed { + if len(tx.TxAttempts) > 0 { txAttemptCount += len(tx.TxAttempts) } } @@ -81,7 +81,7 @@ func newAddressState[ fromAddress: fromAddress, idempotencyKeyToTx: make(map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], idempotencyKeysCount), - unstartedTxs: queues.NewTxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](maxUnstarted), + unstartedTxs: queues.NewTxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](int(maxUnstarted)), inprogressTx: nil, unconfirmedTxs: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxUnconfirmed]), confirmedMissingReceiptTxs: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxConfirmedMissingReceipt]), @@ -107,6 +107,8 @@ func newAddressState[ as.confirmedTxs[tx.ID] = &tx case TxFatalError: as.fatalErroredTxs[tx.ID] = &tx + default: + panic("unknown transaction state") } as.allTxs[tx.ID] = &tx if tx.IdempotencyKey != nil { @@ -117,7 +119,7 @@ func newAddressState[ } } - return &as, nil + return &as } // CountTransactionsByState returns the number of transactions that are in the given state From 751788e11378bdf83ba9c9315b9d3b7281c4deaa Mon Sep 17 00:00:00 2001 From: James Walker Date: Wed, 6 Mar 2024 17:27:25 -0500 Subject: [PATCH 11/16] change txAttempt to pointers in addressState --- common/txmgr/address_state.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 50119bee25f..c4d24936807 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -29,7 +29,7 @@ type addressState[ sync.RWMutex idempotencyKeyToTx map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - attemptHashToTxAttempt map[TX_HASH]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + attemptHashToTxAttempt map[TX_HASH]*txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] unstartedTxs *queues.TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] inprogressTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] // NOTE: below each map's key is the transaction ID that is assigned via the persistent datastore @@ -88,7 +88,7 @@ func newAddressState[ confirmedTxs: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxConfirmed]), allTxs: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], len(txs)), fatalErroredTxs: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxFatalError]), - attemptHashToTxAttempt: make(map[TX_HASH]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], txAttemptCount), + attemptHashToTxAttempt: make(map[TX_HASH]*txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], txAttemptCount), } // Load all transactions supplied @@ -115,7 +115,7 @@ func newAddressState[ as.idempotencyKeyToTx[*tx.IdempotencyKey] = &tx } for _, txAttempt := range tx.TxAttempts { - as.attemptHashToTxAttempt[txAttempt.Hash] = txAttempt + as.attemptHashToTxAttempt[txAttempt.Hash] = &txAttempt } } From 0f91f8c244965add47d49906c50f7b6ceb42f0f4 Mon Sep 17 00:00:00 2001 From: James Walker Date: Wed, 6 Mar 2024 17:30:58 -0500 Subject: [PATCH 12/16] unexport all addressState methods --- common/txmgr/address_state.go | 72 +++++++++++++++++------------------ 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index c4d24936807..6338837ef09 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -122,33 +122,33 @@ func newAddressState[ return &as } -// CountTransactionsByState returns the number of transactions that are in the given state -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(txState txmgrtypes.TxState) int { +// countTransactionsByState returns the number of transactions that are in the given state +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) countTransactionsByState(txState txmgrtypes.TxState) int { return 0 } -// FindTxWithIdempotencyKey returns the transaction with the given idempotency key. If no transaction is found, nil is returned. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { +// findTxWithIdempotencyKey returns the transaction with the given idempotency key. If no transaction is found, nil is returned. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { return nil } -// ApplyToTxsByState calls the given function for each transaction in the given states. +// applyToTxsByState calls the given function for each transaction in the given states. // If txIDs are provided, only the transactions with those IDs are considered. // If no txIDs are provided, all transactions in the given states are considered. // If no txStates are provided, all transactions are considered. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ApplyToTxsByState( +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) applyToTxsByState( txStates []txmgrtypes.TxState, fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]), txIDs ...int64, ) { } -// FindTxAttempts returns all attempts for the given transactions that match the given filters. +// findTxAttempts returns all attempts for the given transactions that match the given filters. // If txIDs are provided, only the transactions with those IDs are considered. // If no txIDs are provided, all transactions are considered. // If no txStates are provided, all transactions are considered. // The txFilter is applied to the transactions and the txAttemptFilter is applied to the attempts. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxAttempts( +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxAttempts( txStates []txmgrtypes.TxState, txFilter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, txAttemptFilter func(*txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, @@ -157,11 +157,11 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTx return nil } -// FindTxs returns all transactions that match the given filters. +// findTxs returns all transactions that match the given filters. // If txIDs are provided, only the transactions with those IDs are considered. // If no txIDs are provided, all transactions are considered. // If no txStates are provided, all transactions are considered. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxs( +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxs( txStates []txmgrtypes.TxState, filter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, txIDs ...int64, @@ -169,77 +169,77 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTx return nil } -// PruneUnstartedTxQueue removes the transactions with the given IDs from the unstarted transaction queue. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneUnstartedTxQueue(ids []int64) { +// pruneUnstartedTxQueue removes the transactions with the given IDs from the unstarted transaction queue. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneUnstartedTxQueue(ids []int64) { } -// DeleteTxs removes the transactions with the given IDs from the address state. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { +// deleteTxs removes the transactions with the given IDs from the address state. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { } -// PeekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { +// peekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil } -// PeekInProgressTx returns the in-progress transaction without removing it from the in-progress state. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { +// peekInProgressTx returns the in-progress transaction without removing it from the in-progress state. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil } -// AddTxToUnstarted adds the given transaction to the unstarted queue. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) AddTxToUnstarted(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { +// addTxToUnstarted adds the given transaction to the unstarted queue. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxToUnstarted(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { return nil } -// MoveUnstartedToInProgress moves the next unstarted transaction to the in-progress state. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnstartedToInProgress( +// moveUnstartedToInProgress moves the next unstarted transaction to the in-progress state. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnstartedToInProgress( etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], txAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], ) error { return nil } -// MoveConfirmedMissingReceiptToUnconfirmed moves the confirmed missing receipt transaction to the unconfirmed state. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveConfirmedMissingReceiptToUnconfirmed( +// moveConfirmedMissingReceiptToUnconfirmed moves the confirmed missing receipt transaction to the unconfirmed state. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveConfirmedMissingReceiptToUnconfirmed( txID int64, ) error { return nil } -// MoveInProgressToUnconfirmed moves the in-progress transaction to the unconfirmed state. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveInProgressToUnconfirmed( +// moveInProgressToUnconfirmed moves the in-progress transaction to the unconfirmed state. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveInProgressToUnconfirmed( etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], ) error { return nil } -// MoveUnconfirmedToConfirmed moves the unconfirmed transaction to the confirmed state. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnconfirmedToConfirmed( +// moveUnconfirmedToConfirmed moves the unconfirmed transaction to the confirmed state. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnconfirmedToConfirmed( receipt txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], ) error { return nil } -// MoveTxToFatalError moves a transaction to the fatal error state. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveTxToFatalError( +// moveTxToFatalError moves a transaction to the fatal error state. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveTxToFatalError( txID int64, txError null.String, ) error { return nil } -// MoveUnconfirmedToConfirmedMissingReceipt moves the unconfirmed transaction to the confirmed missing receipt state. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnconfirmedToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { +// moveUnconfirmedToConfirmedMissingReceipt moves the unconfirmed transaction to the confirmed missing receipt state. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnconfirmedToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { return nil } -// MoveInProgressToConfirmedMissingReceipt moves the in-progress transaction to the confirmed missing receipt state. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveInProgressToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { +// moveInProgressToConfirmedMissingReceipt moves the in-progress transaction to the confirmed missing receipt state. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveInProgressToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { return nil } -// MoveConfirmedToUnconfirmed moves the confirmed transaction to the unconfirmed state. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveConfirmedToUnconfirmed(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { +// moveConfirmedToUnconfirmed moves the confirmed transaction to the unconfirmed state. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveConfirmedToUnconfirmed(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { return nil } From a3cb0de8d97eab9bc4e2cf4479490839424606ce Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 7 Mar 2024 15:35:48 -0500 Subject: [PATCH 13/16] remove implicit for loop stuff --- common/txmgr/address_state.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 6338837ef09..85cb895adc6 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -114,7 +114,8 @@ func newAddressState[ if tx.IdempotencyKey != nil { as.idempotencyKeyToTx[*tx.IdempotencyKey] = &tx } - for _, txAttempt := range tx.TxAttempts { + for i := 0; i < len(tx.TxAttempts); i++ { + txAttempt := tx.TxAttempts[i] as.attemptHashToTxAttempt[txAttempt.Hash] = &txAttempt } } From 6e1e9bc1a4b8ecf8d74f11cefc183e628be19129 Mon Sep 17 00:00:00 2001 From: James Walker Date: Fri, 8 Mar 2024 14:44:35 -0500 Subject: [PATCH 14/16] add check for max queue size --- common/txmgr/address_state.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 85cb895adc6..a9e5ebf0aac 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -54,6 +54,10 @@ func newAddressState[ maxUnstarted uint64, txs []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], ) *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { + if maxUnstarted == 0 { + panic("new_address_state: MaxUnstarted queue size must be greater than 0") + } + // Count the number of transactions in each state to reduce the number of map resizes counts := map[txmgrtypes.TxState]int{ TxUnstarted: 0, From b10e97eb42cad387d22706df3aedbd1da7fa5714 Mon Sep 17 00:00:00 2001 From: amit-momin Date: Thu, 27 Jun 2024 17:33:37 -0500 Subject: [PATCH 15/16] Suppressed unused function warnings while adding framework --- common/txmgr/address_state.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index a9e5ebf0aac..d458952c178 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -40,6 +40,7 @@ type addressState[ fatalErroredTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] } +//lint:ignore U1000 Ignore unused function temporarily while adding the framework // newAddressState returns a new addressState instance with initialized transaction state func newAddressState[ CHAIN_ID types.ID, @@ -127,16 +128,19 @@ func newAddressState[ return &as } +//lint:ignore U1000 Ignore unused function temporarily while adding the framework // countTransactionsByState returns the number of transactions that are in the given state func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) countTransactionsByState(txState txmgrtypes.TxState) int { return 0 } +//lint:ignore U1000 Ignore unused function temporarily while adding the framework // findTxWithIdempotencyKey returns the transaction with the given idempotency key. If no transaction is found, nil is returned. func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { return nil } +//lint:ignore U1000 Ignore unused function temporarily while adding the framework // applyToTxsByState calls the given function for each transaction in the given states. // If txIDs are provided, only the transactions with those IDs are considered. // If no txIDs are provided, all transactions in the given states are considered. @@ -148,6 +152,7 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) applyT ) { } +//lint:ignore U1000 Ignore unused function temporarily while adding the framework // findTxAttempts returns all attempts for the given transactions that match the given filters. // If txIDs are provided, only the transactions with those IDs are considered. // If no txIDs are provided, all transactions are considered. @@ -162,6 +167,7 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTx return nil } +//lint:ignore U1000 Ignore unused function temporarily while adding the framework // findTxs returns all transactions that match the given filters. // If txIDs are provided, only the transactions with those IDs are considered. // If no txIDs are provided, all transactions are considered. @@ -174,29 +180,35 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTx return nil } +//lint:ignore U1000 Ignore unused function temporarily while adding the framework // pruneUnstartedTxQueue removes the transactions with the given IDs from the unstarted transaction queue. func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneUnstartedTxQueue(ids []int64) { } +//lint:ignore U1000 Ignore unused function temporarily while adding the framework // deleteTxs removes the transactions with the given IDs from the address state. func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { } +//lint:ignore U1000 Ignore unused function temporarily while adding the framework // peekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue. func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil } +//lint:ignore U1000 Ignore unused function temporarily while adding the framework // peekInProgressTx returns the in-progress transaction without removing it from the in-progress state. func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil } +//lint:ignore U1000 Ignore unused function temporarily while adding the framework // addTxToUnstarted adds the given transaction to the unstarted queue. func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxToUnstarted(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { return nil } +//lint:ignore U1000 Ignore unused function temporarily while adding the framework // moveUnstartedToInProgress moves the next unstarted transaction to the in-progress state. func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnstartedToInProgress( etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], @@ -205,6 +217,7 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUn return nil } +//lint:ignore U1000 Ignore unused function temporarily while adding the framework // moveConfirmedMissingReceiptToUnconfirmed moves the confirmed missing receipt transaction to the unconfirmed state. func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveConfirmedMissingReceiptToUnconfirmed( txID int64, @@ -212,6 +225,7 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveCo return nil } +//lint:ignore U1000 Ignore unused function temporarily while adding the framework // moveInProgressToUnconfirmed moves the in-progress transaction to the unconfirmed state. func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveInProgressToUnconfirmed( etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], @@ -220,6 +234,7 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveIn return nil } +//lint:ignore U1000 Ignore unused function temporarily while adding the framework // moveUnconfirmedToConfirmed moves the unconfirmed transaction to the confirmed state. func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnconfirmedToConfirmed( receipt txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], @@ -227,6 +242,7 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUn return nil } +//lint:ignore U1000 Ignore unused function temporarily while adding the framework // moveTxToFatalError moves a transaction to the fatal error state. func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveTxToFatalError( txID int64, txError null.String, @@ -234,16 +250,19 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveTx return nil } +//lint:ignore U1000 Ignore unused function temporarily while adding the framework // moveUnconfirmedToConfirmedMissingReceipt moves the unconfirmed transaction to the confirmed missing receipt state. func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnconfirmedToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { return nil } +//lint:ignore U1000 Ignore unused function temporarily while adding the framework // moveInProgressToConfirmedMissingReceipt moves the in-progress transaction to the confirmed missing receipt state. func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveInProgressToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { return nil } +//lint:ignore U1000 Ignore unused function temporarily while adding the framework // moveConfirmedToUnconfirmed moves the confirmed transaction to the unconfirmed state. func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveConfirmedToUnconfirmed(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { return nil From c1d40af28b2eb93ed4b9b28bf16ece88dfed66cb Mon Sep 17 00:00:00 2001 From: amit-momin Date: Thu, 27 Jun 2024 17:48:16 -0500 Subject: [PATCH 16/16] Fixed linting error --- common/txmgr/address_state.go | 57 +++++++++++++++++++++++------------ 1 file changed, 38 insertions(+), 19 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index d458952c178..9ffc5baa081 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -40,8 +40,9 @@ type addressState[ fatalErroredTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] } -//lint:ignore U1000 Ignore unused function temporarily while adding the framework // newAddressState returns a new addressState instance with initialized transaction state +// +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func newAddressState[ CHAIN_ID types.ID, ADDR, TX_HASH, BLOCK_HASH types.Hashable, @@ -128,23 +129,26 @@ func newAddressState[ return &as } -//lint:ignore U1000 Ignore unused function temporarily while adding the framework // countTransactionsByState returns the number of transactions that are in the given state +// +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) countTransactionsByState(txState txmgrtypes.TxState) int { return 0 } -//lint:ignore U1000 Ignore unused function temporarily while adding the framework // findTxWithIdempotencyKey returns the transaction with the given idempotency key. If no transaction is found, nil is returned. +// +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { return nil } -//lint:ignore U1000 Ignore unused function temporarily while adding the framework // applyToTxsByState calls the given function for each transaction in the given states. // If txIDs are provided, only the transactions with those IDs are considered. // If no txIDs are provided, all transactions in the given states are considered. // If no txStates are provided, all transactions are considered. +// +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) applyToTxsByState( txStates []txmgrtypes.TxState, fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]), @@ -152,12 +156,13 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) applyT ) { } -//lint:ignore U1000 Ignore unused function temporarily while adding the framework // findTxAttempts returns all attempts for the given transactions that match the given filters. // If txIDs are provided, only the transactions with those IDs are considered. // If no txIDs are provided, all transactions are considered. // If no txStates are provided, all transactions are considered. // The txFilter is applied to the transactions and the txAttemptFilter is applied to the attempts. +// +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxAttempts( txStates []txmgrtypes.TxState, txFilter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, @@ -167,11 +172,12 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTx return nil } -//lint:ignore U1000 Ignore unused function temporarily while adding the framework // findTxs returns all transactions that match the given filters. // If txIDs are provided, only the transactions with those IDs are considered. // If no txIDs are provided, all transactions are considered. // If no txStates are provided, all transactions are considered. +// +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxs( txStates []txmgrtypes.TxState, filter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, @@ -180,36 +186,42 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTx return nil } -//lint:ignore U1000 Ignore unused function temporarily while adding the framework // pruneUnstartedTxQueue removes the transactions with the given IDs from the unstarted transaction queue. +// +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneUnstartedTxQueue(ids []int64) { } -//lint:ignore U1000 Ignore unused function temporarily while adding the framework // deleteTxs removes the transactions with the given IDs from the address state. +// +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { } -//lint:ignore U1000 Ignore unused function temporarily while adding the framework // peekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue. +// +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil } -//lint:ignore U1000 Ignore unused function temporarily while adding the framework // peekInProgressTx returns the in-progress transaction without removing it from the in-progress state. +// +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil } -//lint:ignore U1000 Ignore unused function temporarily while adding the framework // addTxToUnstarted adds the given transaction to the unstarted queue. +// +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxToUnstarted(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { return nil } -//lint:ignore U1000 Ignore unused function temporarily while adding the framework // moveUnstartedToInProgress moves the next unstarted transaction to the in-progress state. +// +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnstartedToInProgress( etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], txAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], @@ -217,16 +229,18 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUn return nil } -//lint:ignore U1000 Ignore unused function temporarily while adding the framework // moveConfirmedMissingReceiptToUnconfirmed moves the confirmed missing receipt transaction to the unconfirmed state. +// +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveConfirmedMissingReceiptToUnconfirmed( txID int64, ) error { return nil } -//lint:ignore U1000 Ignore unused function temporarily while adding the framework // moveInProgressToUnconfirmed moves the in-progress transaction to the unconfirmed state. +// +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveInProgressToUnconfirmed( etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], @@ -234,36 +248,41 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveIn return nil } -//lint:ignore U1000 Ignore unused function temporarily while adding the framework // moveUnconfirmedToConfirmed moves the unconfirmed transaction to the confirmed state. +// +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnconfirmedToConfirmed( receipt txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], ) error { return nil } -//lint:ignore U1000 Ignore unused function temporarily while adding the framework // moveTxToFatalError moves a transaction to the fatal error state. +// +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveTxToFatalError( txID int64, txError null.String, ) error { return nil } -//lint:ignore U1000 Ignore unused function temporarily while adding the framework // moveUnconfirmedToConfirmedMissingReceipt moves the unconfirmed transaction to the confirmed missing receipt state. +// +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnconfirmedToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { return nil } -//lint:ignore U1000 Ignore unused function temporarily while adding the framework // moveInProgressToConfirmedMissingReceipt moves the in-progress transaction to the confirmed missing receipt state. +// +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveInProgressToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { return nil } -//lint:ignore U1000 Ignore unused function temporarily while adding the framework // moveConfirmedToUnconfirmed moves the confirmed transaction to the unconfirmed state. +// +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveConfirmedToUnconfirmed(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { return nil }