Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TXM In-memory: step 3-04-SaveReplacementInProgressAttempt #12236

56 changes: 54 additions & 2 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package txmgr

import (
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -129,12 +130,12 @@
}

// countTransactionsByState returns the number of transactions that are in the given state
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) countTransactionsByState(txState txmgrtypes.TxState) int {

Check failure on line 133 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).countTransactionsByState is unused (unused)
return 0
}

// findTxWithIdempotencyKey returns the transaction with the given idempotency key. If no transaction is found, nil is returned.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {

Check failure on line 138 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).findTxWithIdempotencyKey is unused (unused)
return nil
}

Expand All @@ -144,7 +145,7 @@
// If no txStates are provided, all transactions are considered.
// This method does not handle transactions in the UnstartedTx state.
// Any transaction states that are unknown will cause a panic including UnstartedTx.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) applyToTxsByState(

Check failure on line 148 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).applyToTxsByState is unused (unused)
txStates []txmgrtypes.TxState,
fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]),
txIDs ...int64,
Expand Down Expand Up @@ -183,7 +184,7 @@
// If no txIDs are provided, all transactions are considered.
// If no txStates are provided, all transactions are considered.
// The txFilter is applied to the transactions and the txAttemptFilter is applied to the attempts.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxAttempts(

Check failure on line 187 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).findTxAttempts is unused (unused)
txStates []txmgrtypes.TxState,
txFilter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
txAttemptFilter func(*txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
Expand Down Expand Up @@ -233,25 +234,76 @@
}

// pruneUnstartedTxQueue removes the transactions with the given IDs from the unstarted transaction queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneUnstartedTxQueue(ids []int64) {

Check failure on line 237 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).pruneUnstartedTxQueue is unused (unused)
}

// deleteTxs removes the transactions with the given IDs from the address state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {

Check failure on line 241 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).deleteTxs is unused (unused)
as.Lock()
defer as.Unlock()

as._deleteTxs(txs...)
}

// deleteTxAttempts removes the attempts with the given IDs from the address state.
// It removes the attempts from the hash lookup map and from the transaction.
// If an attempt is not found in the hash lookup map, it is ignored.
// If a transaction is not found in the allTxs map, it is ignored.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxAttempts(txAttempts ...txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {
as.Lock()
defer as.Unlock()

for _, txAttempt := range txAttempts {
// remove the attempt from the hash lookup map
delete(as.attemptHashToTxAttempt, txAttempt.Hash)
// remove the attempt from the transaction
if tx := as.allTxs[txAttempt.TxID]; tx != nil {
var removeIndex int
for i := 0; i < len(tx.TxAttempts); i++ {
if tx.TxAttempts[i].ID == txAttempt.ID {
removeIndex = i
break
}
}
tx.TxAttempts = append(tx.TxAttempts[:removeIndex], tx.TxAttempts[removeIndex+1:]...)
}
}
}

// addTxAttempt adds the given attempt to the transaction which matches its TxID.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxAttempts(txAttempts ...txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
as.Lock()
defer as.Unlock()

var errs error
for i := 0; i < len(txAttempts); i++ {
txAttempt := txAttempts[i]
tx := as.allTxs[txAttempt.TxID]
if tx == nil {
errs = errors.Join(errs, fmt.Errorf("no transaction with ID %d", txAttempt.TxID))
continue
}
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved

// add the attempt to the transaction
if tx.TxAttempts == nil {
tx.TxAttempts = []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{}
}
tx.TxAttempts = append(tx.TxAttempts, txAttempt)
// add the attempt to the hash lookup map
as.attemptHashToTxAttempt[txAttempt.Hash] = &txAttempt
}

return errs
}

// peekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {

Check failure on line 300 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).peekNextUnstartedTx is unused (unused)
return nil, nil
}

// peekInProgressTx returns the in-progress transaction without removing it from the in-progress state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {
return nil, nil
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {

Check failure on line 305 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).peekInProgressTx is unused (unused)
return nil
}

// addTxToUnstarted adds the given transaction to the unstarted queue.
Expand Down Expand Up @@ -403,7 +455,7 @@
return nil
}

func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _applyToTxs(

Check failure on line 458 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE])._applyToTxs is unused (unused)
txIDsToTx map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]),
txIDs ...int64,
Expand Down Expand Up @@ -452,7 +504,7 @@
return txs
}

func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {

Check failure on line 507 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE])._deleteTxs is unused (unused)
for _, tx := range txs {
if tx.IdempotencyKey != nil {
delete(as.idempotencyKeyToTx, *tx.IdempotencyKey)
Expand Down
43 changes: 42 additions & 1 deletion common/txmgr/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math/big"
"sync"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -46,7 +47,8 @@ type inMemoryStore[
keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ]
persistentTxStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]

addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
addressStatesLock sync.RWMutex
addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
}

// NewInMemoryStore returns a new inMemoryStore
Expand Down Expand Up @@ -171,6 +173,45 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveR
oldAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
replacementAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) error {
if oldAttempt.State != txmgrtypes.TxAttemptInProgress || replacementAttempt.State != txmgrtypes.TxAttemptInProgress {
return fmt.Errorf("expected attempts to be in_progress")
}
if oldAttempt.ID == 0 {
return fmt.Errorf("expected oldAttempt to have an ID")
}

ms.addressStatesLock.RLock()
defer ms.addressStatesLock.RUnlock()
filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool {
return true
}
var as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
var tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
for _, vas := range ms.addressStates {
txs := vas.findTxs(nil, filter, oldAttempt.TxID)
if len(txs) == 1 {
tx = &txs[0]
as = vas
break
}
}
if tx == nil {
return fmt.Errorf("save_replacement_in_progress_attempt: %w", ErrTxnNotFound)
}

// Persist to persistent storage
if err := ms.persistentTxStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, replacementAttempt); err != nil {
return fmt.Errorf("save_replacement_in_progress_attempt: %w", err)
}

// Update in memory store
// delete the old attempt
as.deleteTxAttempts(oldAttempt)
// add the new attempt
if err := as.addTxAttempts(*replacementAttempt); err != nil {
return fmt.Errorf("save_replacement_in_progress_attempt: failed to add a replacement transaction attempt: %w", err)
amit-momin marked this conversation as resolved.
Show resolved Hide resolved
}

return nil
}

Expand Down
119 changes: 119 additions & 0 deletions core/chains/evm/txmgr/evm_inmemory_store_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,133 @@
package txmgr_test

import (
"context"
"math/big"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
commontxmgr "github.com/smartcontractkit/chainlink/v2/common/txmgr"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"

evmgas "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
evmtxmgr "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
)

func TestInMemoryStore_SaveReplacementInProgressAttempt(t *testing.T) {
t.Parallel()

t.Run("successfully replace tx attempt", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db, dbcfg)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := context.Background()
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// Insert a transaction into persistent store
inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 123, fromAddress)
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx))

oldAttempt := inTx.TxAttempts[0]
newAttempt := cltest.NewDynamicFeeEthTxAttempt(t, inTx.ID)
err = inMemoryStore.SaveReplacementInProgressAttempt(
testutils.Context(t),
oldAttempt,
&newAttempt,
)
require.NoError(t, err)

expTx, err := persistentStore.FindTxWithAttempts(inTx.ID)
require.NoError(t, err)
fn := func(tx *evmtxmgr.Tx) bool { return true }
actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID)
require.Equal(t, 1, len(actTxs))
actTx := actTxs[0]
assertTxEqual(t, expTx, actTx)
assert.Equal(t, txmgrtypes.TxAttemptInProgress, actTx.TxAttempts[0].State)
assert.Equal(t, newAttempt.Hash, actTx.TxAttempts[0].Hash)
assert.NotEqual(t, oldAttempt.ID, actTx.TxAttempts[0].ID)
})

t.Run("error parity for in-memory vs persistent store", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db, dbcfg)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := context.Background()

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// Insert a transaction into persistent store
inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 124, fromAddress)
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx))

oldAttempt := inTx.TxAttempts[0]
newAttempt := cltest.NewDynamicFeeEthTxAttempt(t, inTx.ID)

t.Run("error when old attempt is not in progress", func(t *testing.T) {
oldAttempt.State = txmgrtypes.TxAttemptBroadcast
expErr := persistentStore.SaveReplacementInProgressAttempt(testutils.Context(t), oldAttempt, &newAttempt)
actErr := inMemoryStore.SaveReplacementInProgressAttempt(testutils.Context(t), oldAttempt, &newAttempt)
assert.Equal(t, expErr, actErr)
oldAttempt.State = txmgrtypes.TxAttemptInProgress
})

t.Run("error when new attempt is not in progress", func(t *testing.T) {
newAttempt.State = txmgrtypes.TxAttemptBroadcast
expErr := persistentStore.SaveReplacementInProgressAttempt(testutils.Context(t), oldAttempt, &newAttempt)
actErr := inMemoryStore.SaveReplacementInProgressAttempt(testutils.Context(t), oldAttempt, &newAttempt)
assert.Equal(t, expErr, actErr)
newAttempt.State = txmgrtypes.TxAttemptInProgress
})

t.Run("error when old attempt id is 0", func(t *testing.T) {
originalID := oldAttempt.ID
oldAttempt.ID = 0
expErr := persistentStore.SaveReplacementInProgressAttempt(testutils.Context(t), oldAttempt, &newAttempt)
actErr := inMemoryStore.SaveReplacementInProgressAttempt(testutils.Context(t), oldAttempt, &newAttempt)
assert.Equal(t, expErr, actErr)
oldAttempt.ID = originalID
})
})
}

// assertTxEqual asserts that two transactions are equal
func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) {
assert.Equal(t, exp.ID, act.ID)
Expand Down
Loading