Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TXM In-memory: address_state methods: step 3-02 #12176

Closed
wants to merge 45 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
dced7b7
implement PruneUnstartedTxQueue
poopoothegorilla Feb 23, 2024
d537a89
implement DeleteTxs
poopoothegorilla Feb 23, 2024
74d6b1e
implement PeekNextUnstartedTx
poopoothegorilla Feb 23, 2024
d187dad
implement PeekInProgressTx
poopoothegorilla Feb 23, 2024
fd32104
implement AddTxToUnstartedQueue
poopoothegorilla Feb 23, 2024
24444c3
Merge branch 'jtw/step-3-in-memory-work' into jtw/step-2-02
poopoothegorilla Feb 26, 2024
eb4d436
add CreateTransaction initial logic
poopoothegorilla Feb 26, 2024
01c79c7
Merge branch 'jtw/step-3-in-memory-work' into jtw/step-3-02
poopoothegorilla Feb 26, 2024
71b509a
Merge branch 'jtw/step-3-in-memory-work' into jtw/step-3-02
poopoothegorilla Feb 26, 2024
7095808
Merge branch 'jtw/step-3-02' into jtw/step-3-02-create-transaction
poopoothegorilla Feb 26, 2024
154d514
cleanup
poopoothegorilla Feb 26, 2024
362dbe1
add tests for create transaction
poopoothegorilla Feb 26, 2024
320be20
Merge branch 'jtw/step-3-in-memory-work' into jtw/step-3-02
poopoothegorilla Feb 26, 2024
ca3eec7
Merge branch 'jtw/step-3-02' into jtw/step-3-02-create-transaction
poopoothegorilla Feb 26, 2024
e8a2b41
implement PruneUnstartedTxQueue
poopoothegorilla Feb 27, 2024
ddf02a3
add panic if incorrect ChainID
poopoothegorilla Feb 27, 2024
13aa9b9
implement methods which are read only
poopoothegorilla Feb 29, 2024
16e05b8
Merge branch 'jtw/step-3-in-memory-work' into jtw/step-3-02
poopoothegorilla Mar 7, 2024
4dfffee
Merge branch 'jtw/step-3-02' into jtw/step-3-02-create-transaction
poopoothegorilla Mar 11, 2024
2577538
clean up test for CreateTransaction
poopoothegorilla Mar 11, 2024
0f1725d
Merge branch 'jtw/step-3-02' into jtw/step-3-02-non-persistent
poopoothegorilla Mar 11, 2024
4e642a3
add tests for GetTxInProgress
poopoothegorilla Mar 11, 2024
4a39135
Merge branch 'jtw/step-3-in-memory-work' into jtw/step-3-02
poopoothegorilla Mar 11, 2024
88a136d
Merge branch 'jtw/step-3-02' into jtw/step-3-02-non-persistent
poopoothegorilla Mar 11, 2024
523354d
add tests for FindNextUnstartedTransactionFromAddress
poopoothegorilla Mar 11, 2024
ee4d834
Merge branch 'jtw/step-3-02' into jtw/step-3-02-prune-unstarted-tx-queue
poopoothegorilla Mar 11, 2024
36c2523
add test for pruning
poopoothegorilla Mar 11, 2024
4d9577a
add test for pruning
poopoothegorilla Mar 11, 2024
c550580
fix delete method
poopoothegorilla Mar 11, 2024
4d3d0bb
Merge branch 'jtw/step-3-02' into jtw/step-3-02-create-transaction
poopoothegorilla Mar 21, 2024
b470da7
address comments
poopoothegorilla Mar 21, 2024
0fcd77a
Merge branch 'jtw/step-3-in-memory-work' into jtw/step-3-02
poopoothegorilla Mar 21, 2024
395d35f
fix address state issue if from address not available
poopoothegorilla Mar 21, 2024
bbc0c35
simplify addTxToUnstartedQueue
poopoothegorilla Mar 21, 2024
ec76f10
Merge branch 'jtw/step-3-02' into jtw/step-3-02-create-transaction
poopoothegorilla Mar 21, 2024
f7eb968
address comments
poopoothegorilla Mar 21, 2024
ef3b6a3
Merge branch 'jtw/step-3-02' into jtw/step-3-02-prune-unstarted-tx-queue
poopoothegorilla Mar 22, 2024
93900d5
cleanup context usage in tests
poopoothegorilla Mar 22, 2024
854225a
Merge pull request #12227 from smartcontractkit/jtw/step-3-02-prune-u…
poopoothegorilla Mar 29, 2024
4737097
Merge branch 'jtw/step-3-02' into jtw/step-3-02-create-transaction
poopoothegorilla Apr 1, 2024
bffcc71
clean up
poopoothegorilla Apr 1, 2024
ef5b26a
Merge pull request #12181 from smartcontractkit/jtw/step-3-02-create-…
poopoothegorilla Apr 1, 2024
e6fcbe7
Merge branch 'jtw/step-3-02' into jtw/step-3-02-non-persistent
poopoothegorilla Apr 4, 2024
482f9a1
panic on chainID mismatch
poopoothegorilla Apr 4, 2024
5341bbd
Merge pull request #12216 from smartcontractkit/jtw/step-3-02-non-per…
poopoothegorilla Apr 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 115 additions & 8 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,12 @@
}

// countTransactionsByState returns the number of transactions that are in the given state
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) countTransactionsByState(txState txmgrtypes.TxState) int {

Check failure on line 131 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).countTransactionsByState is unused (unused)

Check failure on line 131 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).countTransactionsByState is unused (unused)
return 0
}

// findTxWithIdempotencyKey returns the transaction with the given idempotency key. If no transaction is found, nil is returned.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {

Check failure on line 136 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).findTxWithIdempotencyKey is unused (unused)

Check failure on line 136 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).findTxWithIdempotencyKey is unused (unused)
return nil
}

Expand All @@ -141,7 +141,7 @@
// If txIDs are provided, only the transactions with those IDs are considered.
// If no txIDs are provided, all transactions in the given states are considered.
// If no txStates are provided, all transactions are considered.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) applyToTxsByState(

Check failure on line 144 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).applyToTxsByState is unused (unused)

Check failure on line 144 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).applyToTxsByState is unused (unused)
txStates []txmgrtypes.TxState,
fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]),
txIDs ...int64,
Expand All @@ -153,7 +153,7 @@
// If no txIDs are provided, all transactions are considered.
// If no txStates are provided, all transactions are considered.
// The txFilter is applied to the transactions and the txAttemptFilter is applied to the attempts.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxAttempts(

Check failure on line 156 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).findTxAttempts is unused (unused)

Check failure on line 156 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).findTxAttempts is unused (unused)
txStates []txmgrtypes.TxState,
txFilter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
txAttemptFilter func(*txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
Expand All @@ -171,34 +171,94 @@
filter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
txIDs ...int64,
) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
return nil
as.RLock()
defer as.RUnlock()

// if txStates is empty then apply the filter to only the as.allTransactions map
if len(txStates) == 0 {
return as._findTxs(as.allTxs, filter, txIDs...)
}

var txs []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
for _, txState := range txStates {
switch txState {
case TxUnstarted:
filter2 := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool {
if tx.State != TxUnstarted {
return false
}
return filter(tx)
}
txs = append(txs, as._findTxs(as.allTxs, filter2, txIDs...)...)
case TxInProgress:
if as.inprogressTx != nil && filter(as.inprogressTx) {
txs = append(txs, *as.inprogressTx)
}
case TxUnconfirmed:
txs = append(txs, as._findTxs(as.unconfirmedTxs, filter, txIDs...)...)
case TxConfirmedMissingReceipt:
txs = append(txs, as._findTxs(as.confirmedMissingReceiptTxs, filter, txIDs...)...)
case TxConfirmed:
txs = append(txs, as._findTxs(as.confirmedTxs, filter, txIDs...)...)
case TxFatalError:
txs = append(txs, as._findTxs(as.fatalErroredTxs, filter, txIDs...)...)
default:
panic("findTxs: unknown transaction state")
}
}

return txs
}

// pruneUnstartedTxQueue removes the transactions with the given IDs from the unstarted transaction queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneUnstartedTxQueue(ids []int64) {
as.Lock()
defer as.Unlock()

txs := as.unstartedTxs.PruneByTxIDs(ids)
as._deleteTxs(txs...)
}

// deleteTxs removes the transactions with the given IDs from the address state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {

Check failure on line 223 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).deleteTxs is unused (unused)

Check failure on line 223 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).deleteTxs is unused (unused)
as.Lock()
defer as.Unlock()

as._deleteTxs(txs...)
}

// peekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {
return nil, nil
// If there are no unstarted transactions, nil is returned.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
as.RLock()
defer as.RUnlock()

return as.unstartedTxs.PeekNextTx()
}

// peekInProgressTx returns the in-progress transaction without removing it from the in-progress state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {
return nil, nil
// If there is no in-progress transaction, nil is returned.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
as.RLock()
defer as.RUnlock()

return as.inprogressTx
}

// addTxToUnstarted adds the given transaction to the unstarted queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxToUnstarted(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
return nil
// addTxToUnstartedQueue adds the given transaction to the unstarted queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxToUnstartedQueue(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {
as.Lock()
defer as.Unlock()

as.unstartedTxs.AddTx(tx)
as.allTxs[tx.ID] = tx
if tx.IdempotencyKey != nil {
as.idempotencyKeyToTx[*tx.IdempotencyKey] = tx
}
}

// moveUnstartedToInProgress moves the next unstarted transaction to the in-progress state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnstartedToInProgress(

Check failure on line 261 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).moveUnstartedToInProgress is unused (unused)

Check failure on line 261 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).moveUnstartedToInProgress is unused (unused)
etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
txAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) error {
Expand All @@ -206,7 +266,7 @@
}

// moveConfirmedMissingReceiptToUnconfirmed moves the confirmed missing receipt transaction to the unconfirmed state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveConfirmedMissingReceiptToUnconfirmed(

Check failure on line 269 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).moveConfirmedMissingReceiptToUnconfirmed is unused (unused)

Check failure on line 269 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).moveConfirmedMissingReceiptToUnconfirmed is unused (unused)
txID int64,
) error {
return nil
Expand Down Expand Up @@ -248,3 +308,50 @@
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveConfirmedToUnconfirmed(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
return nil
}

// This is not a concurrency-safe method and should only be called from within a lock
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {
for _, tx := range txs {
if tx.IdempotencyKey != nil {
delete(as.idempotencyKeyToTx, *tx.IdempotencyKey)
}
txID := tx.ID
if as.inprogressTx != nil && as.inprogressTx.ID == txID {
as.inprogressTx = nil
}
delete(as.allTxs, txID)
delete(as.unconfirmedTxs, txID)
delete(as.confirmedMissingReceiptTxs, txID)
delete(as.confirmedTxs, txID)
delete(as.fatalErroredTxs, txID)
as.unstartedTxs.RemoveTxByID(txID)
}
}

// This method is not concurrent safe and should only be called from within a lock
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _findTxs(
txIDsToTx map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
filter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
txIDs ...int64,
) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
var txs []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
// if txIDs is not empty then only apply the filter to those transactions
if len(txIDs) > 0 {
for _, txID := range txIDs {
tx := txIDsToTx[txID]
if tx != nil && filter(tx) {
txs = append(txs, *tx)
}
}
return txs
}

// if txIDs is empty then apply the filter to all transactions
for _, tx := range txIDsToTx {
if filter(tx) {
txs = append(txs, *tx)
}
}

return txs
}
85 changes: 79 additions & 6 deletions common/txmgr/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math/big"
"sync"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -47,7 +48,8 @@ type inMemoryStore[
keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ]
persistentTxStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]

addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
addressStatesLock sync.RWMutex
addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
}

// NewInMemoryStore returns a new inMemoryStore
Expand Down Expand Up @@ -108,7 +110,29 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Creat
txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
error,
) {
return txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{}, nil
tx := txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{}
if ms.chainID.String() != chainID.String() {
panic("invalid chain ID")
}

ms.addressStatesLock.Lock()
as, ok := ms.addressStates[txRequest.FromAddress]
if !ok {
as = newAddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](ms.lggr, chainID, txRequest.FromAddress, ms.maxUnstarted, nil)
ms.addressStates[txRequest.FromAddress] = as
}
ms.addressStatesLock.Unlock()

// Persist Transaction to persistent storage
tx, err := ms.persistentTxStore.CreateTransaction(ctx, txRequest, chainID)
if err != nil {
return tx, fmt.Errorf("create_transaction: %w", err)
}

// Update in memory store
// Add the request to the Unstarted channel to be processed by the Broadcaster
as.addTxToUnstartedQueue(&tx)
return *ms.deepCopyTx(tx), nil
}

// FindTxWithIdempotencyKey returns a transaction with the given idempotency key
Expand Down Expand Up @@ -153,7 +177,24 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat

// GetTxInProgress returns the in_progress transaction for a given address.
func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTxInProgress(ctx context.Context, fromAddress ADDR) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {
return nil, nil
ms.addressStatesLock.RLock()
defer ms.addressStatesLock.RUnlock()
as, ok := ms.addressStates[fromAddress]
if !ok {
return nil, nil
}

tx := as.peekInProgressTx()
if tx == nil {
return nil, nil
}

if len(tx.TxAttempts) != 1 || tx.TxAttempts[0].State != txmgrtypes.TxAttemptInProgress {
return nil, fmt.Errorf("get_tx_in_progress: invariant violation: expected in_progress transaction %v to have exactly one unsent attempt. "+
"Your database is in an inconsistent state and this node will not function correctly until the problem is resolved", tx.ID)
}

return ms.deepCopyTx(*tx), nil
}

// UpdateTxAttemptInProgressToBroadcast updates a transaction attempt from in_progress to broadcast.
Expand All @@ -168,8 +209,27 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat
}

// FindNextUnstartedTransactionFromAddress returns the next unstarted transaction for a given address.
func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindNextUnstartedTransactionFromAddress(_ context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR, chainID CHAIN_ID) error {
return nil
func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindNextUnstartedTransactionFromAddress(_ context.Context, fromAddress ADDR, chainID CHAIN_ID) (
*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
error,
) {
if ms.chainID.String() != chainID.String() {
panic("invalid chain ID")
}
ms.addressStatesLock.RLock()
defer ms.addressStatesLock.RUnlock()
as, ok := ms.addressStates[fromAddress]
if !ok {
return nil, fmt.Errorf("find_next_unstarted_transaction_from_address: %w: %q", ErrAddressNotFound, fromAddress)
}

etx := as.peekNextUnstartedTx()
if etx == nil {
return nil, fmt.Errorf("find_next_unstarted_transaction_from_address: %w", ErrTxnNotFound)
}
tx := ms.deepCopyTx(*etx)

return tx, nil
}

// SaveReplacementInProgressAttempt saves a replacement attempt for a transaction that is in_progress.
Expand Down Expand Up @@ -260,7 +320,20 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT
}

func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) ([]int64, error) {
return nil, nil
// Persist to persistent storage
ids, err := ms.persistentTxStore.PruneUnstartedTxQueue(ctx, queueSize, subject)
if err != nil {
return ids, err
}

// Update in memory store
ms.addressStatesLock.RLock()
defer ms.addressStatesLock.RUnlock()
for _, as := range ms.addressStates {
as.pruneUnstartedTxQueue(ids)
}

return ids, nil
}

func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ReapTxHistory(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID CHAIN_ID) error {
Expand Down
Loading
Loading