Skip to content

Commit

Permalink
Merge pull request #12216 from smartcontractkit/jtw/step-3-02-non-per…
Browse files Browse the repository at this point in the history
…sistent

TXM In-memory: step 3-02-Non-Persistent Methods
  • Loading branch information
poopoothegorilla authored Apr 4, 2024
2 parents ef5b26a + 482f9a1 commit 5341bbd
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 3 deletions.
42 changes: 39 additions & 3 deletions common/txmgr/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,24 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat

// GetTxInProgress returns the in_progress transaction for a given address.
func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTxInProgress(ctx context.Context, fromAddress ADDR) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {
return nil, nil
ms.addressStatesLock.RLock()
defer ms.addressStatesLock.RUnlock()
as, ok := ms.addressStates[fromAddress]
if !ok {
return nil, nil
}

tx := as.peekInProgressTx()
if tx == nil {
return nil, nil
}

if len(tx.TxAttempts) != 1 || tx.TxAttempts[0].State != txmgrtypes.TxAttemptInProgress {
return nil, fmt.Errorf("get_tx_in_progress: invariant violation: expected in_progress transaction %v to have exactly one unsent attempt. "+
"Your database is in an inconsistent state and this node will not function correctly until the problem is resolved", tx.ID)
}

return ms.deepCopyTx(*tx), nil
}

// UpdateTxAttemptInProgressToBroadcast updates a transaction attempt from in_progress to broadcast.
Expand All @@ -192,8 +209,27 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat
}

// FindNextUnstartedTransactionFromAddress returns the next unstarted transaction for a given address.
func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindNextUnstartedTransactionFromAddress(_ context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR, chainID CHAIN_ID) error {
return nil
func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindNextUnstartedTransactionFromAddress(_ context.Context, fromAddress ADDR, chainID CHAIN_ID) (
*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
error,
) {
if ms.chainID.String() != chainID.String() {
panic("invalid chain ID")
}
ms.addressStatesLock.RLock()
defer ms.addressStatesLock.RUnlock()
as, ok := ms.addressStates[fromAddress]
if !ok {
return nil, fmt.Errorf("find_next_unstarted_transaction_from_address: %w: %q", ErrAddressNotFound, fromAddress)
}

etx := as.peekNextUnstartedTx()
if etx == nil {
return nil, fmt.Errorf("find_next_unstarted_transaction_from_address: %w", ErrTxnNotFound)
}
tx := ms.deepCopyTx(*etx)

return tx, nil
}

// SaveReplacementInProgressAttempt saves a replacement attempt for a transaction that is in_progress.
Expand Down
142 changes: 142 additions & 0 deletions core/chains/evm/txmgr/evm_inmemory_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
commontxmgr "github.com/smartcontractkit/chainlink/v2/common/txmgr"

txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"

evmassets "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
Expand All @@ -24,6 +25,147 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
)

func TestInMemoryStore_GetTxInProgress(t *testing.T) {
t.Parallel()

db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())
_, otherAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := testutils.Context(t)

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// insert the transaction into the persistent store
inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 123, fromAddress)
require.NotNil(t, inTx)
// insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx))

// insert non in-progress transaction for another address
otherTx := cltest.NewEthTx(otherAddress)
require.NoError(t, persistentStore.InsertTx(ctx, &otherTx))
require.NoError(t, inMemoryStore.XXXTestInsertTx(otherAddress, &otherTx))

tcs := []struct {
name string
fromAddress common.Address

hasErr bool
hasTx bool
}{
{"finds the correct inprogress transaction", fromAddress, false, true},
{"wrong fromAddress", common.Address{}, false, false},
{"no inprogress transaction", otherAddress, false, false},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
actTx, actErr := inMemoryStore.GetTxInProgress(ctx, tc.fromAddress)
expTx, expErr := persistentStore.GetTxInProgress(ctx, tc.fromAddress)
if tc.hasErr {
require.NotNil(t, actErr)
require.NotNil(t, expErr)
require.Equal(t, expErr, actErr)
} else {
require.Nil(t, actErr)
require.Nil(t, expErr)
}
if tc.hasTx {
require.NotNil(t, actTx)
require.NotNil(t, expTx)
assertTxEqual(t, *expTx, *actTx)
} else {
require.Nil(t, actTx)
require.Nil(t, expTx)
}
})
}
}

func TestInMemoryStore_FindNextUnstartedTransactionFromAddress(t *testing.T) {
t.Parallel()

db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())
_, otherAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := testutils.Context(t)

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// insert the transaction into the persistent store
inTx := mustCreateUnstartedGeneratedTx(t, persistentStore, fromAddress, chainID)
// insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx))

// insert non in-progress transaction for another address
otherTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 13, otherAddress)
require.NoError(t, inMemoryStore.XXXTestInsertTx(otherAddress, &otherTx))

tcs := []struct {
name string
fromAddress common.Address
chainID *big.Int

hasErr bool
hasTx bool
}{
{"finds the correct inprogress transaction", fromAddress, chainID, false, true},
{"no unstarted transaction", otherAddress, chainID, true, false},
{"wrong chainID", fromAddress, big.NewInt(123), true, false},
{"unknown address", common.Address{}, chainID, true, false},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
actTx, actErr := inMemoryStore.FindNextUnstartedTransactionFromAddress(ctx, tc.fromAddress, tc.chainID)
expTx, expErr := persistentStore.FindNextUnstartedTransactionFromAddress(ctx, tc.fromAddress, tc.chainID)
if tc.hasErr {
require.NotNil(t, actErr)
require.NotNil(t, expErr)
} else {
require.Nil(t, actErr)
require.Nil(t, expErr)
}
if tc.hasTx {
require.NotNil(t, actTx)
require.NotNil(t, expTx)
assertTxEqual(t, *expTx, *actTx)
} else {
require.Nil(t, actTx)
require.Nil(t, expTx)
}
})
}
}

func TestInMemoryStore_CreateTransaction(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 5341bbd

Please sign in to comment.