diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index d2e206dd264..f83098b7bbf 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -217,12 +217,7 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindLa defer as.RUnlock() var maxSeq SEQ - if as.inprogress != nil && as.inprogress.Sequence != nil { - if (*as.inprogress.Sequence).Int64() > maxSeq.Int64() { - maxSeq = *as.inprogress.Sequence - } - } - for _, tx := range as.unconfirmed { + for _, tx := range as.allTransactions { if tx.Sequence == nil { continue } @@ -271,9 +266,6 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) applyT fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]), txIDs ...int64, ) { - as.Lock() - defer as.Unlock() - // if txIDs is not empty then only apply the filter to those transactions if len(txIDs) > 0 { for _, txID := range txIDs { @@ -495,7 +487,9 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) AddTxT return nil } -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnstartedToInProgress(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnstartedToInProgress( + tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], +) error { as.Lock() defer as.Unlock() @@ -505,7 +499,6 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUn if tx != nil { // if tx is not nil then remove the tx from the unstarted queue - // TODO(jtw): what should be the unique idenitifier for each transaction? ID is being set by the postgres DB tx = as.unstarted.RemoveTxByID(tx.ID) } else { // if tx is nil then pop the next unstarted transaction @@ -520,6 +513,24 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUn return nil } +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveConfirmedMissingReceiptToUnconfirmed( + txID int64, +) error { + as.Lock() + defer as.Unlock() + + tx, ok := as.confirmedMissingReceipt[txID] + if !ok || tx == nil { + return fmt.Errorf("move_confirmed_missing_receipt_to_unconfirmed: no confirmed_missing_receipt transaction with ID %d: %w", txID, ErrTxnNotFound) + } + + tx.State = TxUnconfirmed + as.unconfirmed[tx.ID] = tx + delete(as.confirmedMissingReceipt, tx.ID) + + return nil +} + func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveInProgressToUnconfirmed( txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], ) error { @@ -530,25 +541,66 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveIn if tx == nil { return fmt.Errorf("move_in_progress_to_unconfirmed: no transaction in progress") } + + txAttempt.TxID = tx.ID + txAttempt.State = txmgrtypes.TxAttemptBroadcast tx.State = TxUnconfirmed + tx.TxAttempts = []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{txAttempt} - var found bool - for i := 0; i < len(tx.TxAttempts); i++ { - if tx.TxAttempts[i].ID == txAttempt.ID { - tx.TxAttempts[i] = txAttempt - found = true - break + as.unconfirmed[tx.ID] = tx + as.inprogress = nil + + return nil +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnconfirmedToConfirmed( + receipt txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], +) error { + as.Lock() + defer as.Unlock() + + for _, tx := range as.unconfirmed { + if tx.TxAttempts == nil { + continue + } + for i := 0; i < len(tx.TxAttempts); i++ { + txAttempt := tx.TxAttempts[i] + if receipt.GetTxHash() == txAttempt.Hash { + // TODO(jtw): not sure how to set blocknumber, transactionindex, and receipt on conflict + txAttempt.Receipts = []txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH]{receipt} + txAttempt.State = txmgrtypes.TxAttemptBroadcast + if txAttempt.BroadcastBeforeBlockNum == nil { + blockNum := receipt.GetBlockNumber().Int64() + txAttempt.BroadcastBeforeBlockNum = &blockNum + } + + tx.State = TxConfirmed + return nil + } } } - if !found { - // NOTE(jtw): this would mean that the TxAttempt did not exist for the Tx - // NOTE(jtw): should this log a warning? - // NOTE(jtw): can this happen? - tx.TxAttempts = append(tx.TxAttempts, txAttempt) + + return fmt.Errorf("move_unconfirmed_to_confirmed: no unconfirmed transaction with receipt %v: %w", receipt, ErrTxnNotFound) +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnstartedToFatalError( + etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + txError null.String, +) error { + as.Lock() + defer as.Unlock() + + tx := as.unstarted.RemoveTxByID(etx.ID) + if tx == nil { + return fmt.Errorf("move_unstarted_to_fatal_error: no unstarted transaction with ID %d", etx.ID) } - as.unconfirmed[tx.ID] = tx - as.inprogress = nil + tx.State = TxFatalError + tx.Sequence = nil + tx.TxAttempts = nil + tx.InitialBroadcastAt = nil + tx.Error = txError + as.fatalErrored[tx.ID] = tx return nil } @@ -573,6 +625,15 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveIn return nil } +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnconfirmedToConfirmedMissingReceipt() error { + // TODO + return nil +} +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveInProgressToConfirmedMissingReceipt() error { + // TODO + return nil +} + func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) abandon() { as.Lock() defer as.Unlock() diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index e97dc8ddc4e..81ec3fca544 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -10,6 +10,7 @@ import ( "time" "github.com/google/uuid" + "github.com/smartcontractkit/chainlink-common/pkg/logger" feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" "github.com/smartcontractkit/chainlink/v2/common/types" @@ -20,6 +21,9 @@ import ( // BIG TODO LIST // TODO: make sure that all state transitions are handled by the address state to ensure that the in-memory store is always in a consistent state // TODO: figure out if multiple tx attempts are actually stored in the db for each tx +// TODO: check that txns are deep copied when returned from the in-memory store +// TODO: need a way to get id for a tx attempt. since there are some methods where the persistent store creates a tx attempt and doesnt returns it +// TODO: make sure all address states are locked when updating the in-memory store var ( // ErrInvalidChainID is returned when the chain ID is invalid @@ -58,6 +62,7 @@ type InMemoryStore[ SEQ types.Sequence, FEE feetypes.Fee, ] struct { + lggr logger.Logger chainID CHAIN_ID keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] @@ -75,11 +80,13 @@ func NewInMemoryStore[ SEQ types.Sequence, FEE feetypes.Fee, ]( + lggr logger.Logger, chainID CHAIN_ID, keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ], txStore PersistentTxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE], ) (*InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], error) { ms := InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ + lggr: lggr, chainID: chainID, keyStore: keyStore, txStore: txStore, @@ -120,7 +127,14 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Creat return tx, fmt.Errorf("create_transaction: %w", err) } - // TODO(jtw); HANDLE PRUNING STEP + // Prune the in-memory txs + pruned, err := txRequest.Strategy.PruneQueue(ctx, ms) + if err != nil { + return tx, fmt.Errorf("CreateTransaction failed to prune in-memory txs: %w", err) + } + if pruned > 0 { + ms.lggr.Warnf("Dropped %d old transactions from transaction queue", pruned) + } // Add the request to the Unstarted channel to be processed by the Broadcaster if err := as.AddTxToUnstarted(&tx); err != nil { @@ -227,6 +241,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Count } // UpdateTxUnstartedToInProgress updates a transaction from unstarted to in_progress. +// TODO THIS HAS SOME INCONSISTENCIES WITH THE PERSISTENT STORE func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxUnstartedToInProgress( ctx context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], @@ -251,6 +266,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat return fmt.Errorf("update_tx_unstarted_to_in_progress: %w", err) } tx.TxAttempts = append(tx.TxAttempts, *attempt) + // TODO: DOES THIS ATTEMPT HAVE AN ID? IF NOT, HOW DO WE GET IT? // Update in address state in memory if err := as.MoveUnstartedToInProgress(tx); err != nil { @@ -272,10 +288,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTx return nil, fmt.Errorf("get_tx_in_progress: %w", err) } - // NOTE(jtw): should this exist in the in-memory store? or just the persistent store? - // NOTE(jtw): where should this live? if len(tx.TxAttempts) != 1 || tx.TxAttempts[0].State != txmgrtypes.TxAttemptInProgress { - return nil, fmt.Errorf("get_tx_in_progress: expected in_progress transaction %v to have exactly one unsent attempt. "+ + return nil, fmt.Errorf("get_tx_in_progress: invariant violation: expected in_progress transaction %v to have exactly one unsent attempt. "+ "Your database is in an inconsistent state and this node will not function correctly until the problem is resolved", tx.ID) } @@ -283,6 +297,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTx } // UpdateTxAttemptInProgressToBroadcast updates a transaction attempt from in_progress to broadcast. +// It also updates the transaction state to unconfirmed. func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxAttemptInProgressToBroadcast( ctx context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], @@ -305,17 +320,17 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat return fmt.Errorf("update_tx_attempt_in_progress_to_broadcast: new attempt state must be broadcast, got: %s", newAttemptState) } + as, ok := ms.addressStates[tx.FromAddress] + if !ok { + return fmt.Errorf("update_tx_attempt_in_progress_to_broadcast: %w", ErrAddressNotFound) + } + // Persist to persistent storage if err := ms.txStore.UpdateTxAttemptInProgressToBroadcast(ctx, tx, attempt, newAttemptState); err != nil { return fmt.Errorf("update_tx_attempt_in_progress_to_broadcast: %w", err) } - // Ensure that the tx state is updated to unconfirmed since this is a chain agnostic operation attempt.State = newAttemptState - as, ok := ms.addressStates[tx.FromAddress] - if !ok { - return fmt.Errorf("update_tx_attempt_in_progress_to_broadcast: %w", ErrAddressNotFound) - } if err := as.MoveInProgressToUnconfirmed(attempt); err != nil { return fmt.Errorf("update_tx_attempt_in_progress_to_broadcast: %w", err) } @@ -324,7 +339,6 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat } // FindNextUnstartedTransactionFromAddress returns the next unstarted transaction for a given address. -// NOTE(jtw): method signature is different from most other signatures where the tx is passed in and updated func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindNextUnstartedTransactionFromAddress(_ context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR, chainID CHAIN_ID) error { if ms.chainID.String() != chainID.String() { return fmt.Errorf("find_next_unstarted_transaction_from_address: %w", ErrInvalidChainID) @@ -339,11 +353,11 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindN return fmt.Errorf("find_next_unstarted_transaction_from_address: address %s is already busy with a transaction in progress", fromAddress) } - var err error - tx, err = as.PeekNextUnstartedTx() - if tx == nil { + etx, err := as.PeekNextUnstartedTx() + if err != nil || etx == nil { return fmt.Errorf("find_next_unstarted_transaction_from_address: %w", err) } + tx = ms.deepCopyTx(etx) return nil } @@ -361,7 +375,6 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveR return fmt.Errorf("save_replacement_in_progress_attempt: expected oldattempt to have an ID") } - // Check if fromaddress enabled as, ok := ms.addressStates[oldAttempt.Tx.FromAddress] if !ok { return fmt.Errorf("save_replacement_in_progress_attempt: %w", ErrAddressNotFound) @@ -376,30 +389,21 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveR if tx == nil { return fmt.Errorf("save_replacement_in_progress_attempt: %w", err) } - var found bool - for i := 0; i < len(tx.TxAttempts); i++ { - if tx.TxAttempts[i].ID == oldAttempt.ID { - tx.TxAttempts[i] = *replacementAttempt - found = true - } - } - if !found { - tx.TxAttempts = append(tx.TxAttempts, *replacementAttempt) - // NOTE(jtw): should this log a warning? - } + // TODO: DOES THIS ATTEMPT HAVE AN ID? IF NOT, HOW DO WE GET IT? + tx.TxAttempts = []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{*replacementAttempt} return nil } // UpdateTxFatalError updates a transaction to fatal_error. func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxFatalError(ctx context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { - if tx.State != TxInProgress { + if tx.State != TxInProgress && tx.State != TxUnstarted { return fmt.Errorf("update_tx_fatal_error: can only transition to fatal_error from in_progress, transaction is currently %s", tx.State) } if !tx.Error.Valid { return fmt.Errorf("update_tx_fatal_error: expected error field to be set") } - // Check if fromaddress enabled + as, ok := ms.addressStates[tx.FromAddress] if !ok { return fmt.Errorf("update_tx_fatal_error: %w", ErrAddressNotFound) @@ -411,11 +415,18 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat } // Update in memory store - if err := as.MoveInProgressToFatalError(tx.Error); err != nil { - return fmt.Errorf("update_tx_fatal_error: %w", err) + switch tx.State { + case TxInProgress: + if err := as.MoveInProgressToFatalError(tx.Error); err != nil { + return fmt.Errorf("update_tx_fatal_error: %w", err) + } + case TxUnstarted: + if err := as.MoveUnstartedToFatalError(*tx, tx.Error); err != nil { + return fmt.Errorf("update_tx_fatal_error: %w", err) + } } - return fmt.Errorf("update_tx_fatal_error: not implemented") + return nil } // Close closes the InMemoryStore @@ -468,11 +479,12 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SetBr if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { return } - // TODO(jtw): how many tx_attempts are actually stored in the db for each tx? It looks like its only 1 - attempt := tx.TxAttempts[0] - if attempt.State == txmgrtypes.TxAttemptBroadcast && attempt.BroadcastBeforeBlockNum == nil && - tx.ChainID.String() == chainID.String() { - tx.TxAttempts[0].BroadcastBeforeBlockNum = &blockNum + + for i := 0; i < len(tx.TxAttempts); i++ { + attempt := tx.TxAttempts[i] + if attempt.State == txmgrtypes.TxAttemptBroadcast && attempt.BroadcastBeforeBlockNum == nil { + tx.TxAttempts[i].BroadcastBeforeBlockNum = &blockNum + } } } for _, as := range ms.addressStates { @@ -489,11 +501,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT } filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { - if tx.TxAttempts != nil && len(tx.TxAttempts) > 0 { - return tx.ChainID.String() == chainID.String() - } - - return false + return tx.TxAttempts != nil && len(tx.TxAttempts) > 0 } states := []txmgrtypes.TxState{TxConfirmedMissingReceipt} attempts := []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} @@ -515,7 +523,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat // Update in memory store fn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { - if tx.BroadcastAt != nil { + if tx.BroadcastAt != nil && tx.BroadcastAt.Before(now) { tx.BroadcastAt = &now } } @@ -535,13 +543,19 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat } // Update in memory store - fn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { - tx.State = TxUnconfirmed - } - + wg := sync.WaitGroup{} for _, as := range ms.addressStates { - as.ApplyToTxs(nil, fn, txIDs...) + wg.Add(1) + go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { + for _, txID := range txIDs { + if err := as.MoveConfirmedMissingReceiptToUnconfirmed(txID); err != nil { + continue + } + } + wg.Done() + }(as) } + wg.Wait() return nil } @@ -655,42 +669,18 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveF return err } - // convert receipts to map - receiptsMap := map[TX_HASH]R{} - for _, receipt := range receipts { - receiptsMap[receipt.GetTxHash()] = receipt - } - // Update in memory store - fn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { - if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { - return - } - attempt := tx.TxAttempts[0] - receipt, ok := receiptsMap[attempt.Hash] - if !ok { - return - } - - if attempt.Receipts != nil && len(attempt.Receipts) > 0 && - attempt.Receipts[0].GetBlockNumber() != nil && receipt.GetBlockNumber() != nil && - attempt.Receipts[0].GetBlockNumber().Cmp(receipt.GetBlockNumber()) == 0 { - return - } - // TODO(jtw): this needs to be finished - - attempt.State = txmgrtypes.TxAttemptBroadcast - if attempt.BroadcastBeforeBlockNum == nil { - blocknum := receipt.GetBlockNumber().Int64() - attempt.BroadcastBeforeBlockNum = &blocknum - } - attempt.Receipts = []txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH]{receipt} - - tx.State = TxConfirmed - } + wg := sync.WaitGroup{} for _, as := range ms.addressStates { - as.ApplyToTxs(nil, fn) + wg.Add(1) + go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { + for _, receipt := range receipts { + as.MoveUnconfirmedToConfirmed(receipt) + } + wg.Done() + }(as) } + wg.Wait() return nil } @@ -1270,6 +1260,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveC } // Update in memory store + // TODO: WHERE LEFT OFF fn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { if tx.ID != attempt.TxID { return