Skip to content

Commit

Permalink
clean up idempotency key location
Browse files Browse the repository at this point in the history
  • Loading branch information
poopoothegorilla committed Nov 15, 2023
1 parent 91bea75 commit 5314284
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 46 deletions.
51 changes: 40 additions & 11 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ type AddressState[
chainID CHAIN_ID
fromAddress ADDR

lock sync.RWMutex
unstarted *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
inprogress *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
lock sync.RWMutex
idempotencyKeyToTx map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
unstarted *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
inprogress *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
// TODO(jtw): using the TX ID as the key for the map might not make sense since the ID is set by the
// postgres DB which creates a dependency on the postgres DB. We should consider creating a UUID or ULID
// TX ID -> TX
Expand All @@ -41,17 +42,21 @@ func NewAddressState[
FEE feetypes.Fee,
](chainID CHAIN_ID, fromAddress ADDR, maxUnstarted int) *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
as := AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
chainID: chainID,
fromAddress: fromAddress,
unstarted: NewTxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](maxUnstarted),
inprogress: nil,
unconfirmed: map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{},
chainID: chainID,
fromAddress: fromAddress,
idempotencyKeyToTx: map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{},
unstarted: NewTxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](maxUnstarted),
inprogress: nil,
unconfirmed: map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{},
}

return &as
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Initialize(txStore PersistentTxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) error {
as.lock.Lock()
defer as.lock.Unlock()

// Load all unstarted transactions from persistent storage
offset := 0
limit := 50
Expand All @@ -62,8 +67,9 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Initia
}
for i := 0; i < len(txs); i++ {
tx := txs[i]
if err := as.moveTxToUnstarted(&tx); err != nil {
return fmt.Errorf("address_state: initialization: %w", err)
as.unstarted.AddTx(&tx)
if tx.IdempotencyKey != nil {
as.idempotencyKeyToTx[*tx.IdempotencyKey] = &tx
}
}
if count <= offset+limit {
Expand All @@ -79,6 +85,9 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Initia
return fmt.Errorf("address_state: initialization: %w", err)
}
as.inprogress = tx
if tx.IdempotencyKey != nil {
as.idempotencyKeyToTx[*tx.IdempotencyKey] = tx
}

// Load all unconfirmed transactions from persistent storage
offset = 0
Expand All @@ -91,6 +100,9 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Initia
for i := 0; i < len(txs); i++ {
tx := txs[i]
as.unconfirmed[tx.ID] = &tx
if tx.IdempotencyKey != nil {
as.idempotencyKeyToTx[*tx.IdempotencyKey] = &tx
}
}
if count <= offset+limit {
break
Expand All @@ -109,6 +121,7 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) close(
as.unstarted = nil
as.inprogress = nil
clear(as.unconfirmed)
clear(as.idempotencyKeyToTx)
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) unstartedCount() int {
Expand All @@ -124,6 +137,13 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) unconf
return len(as.unconfirmed)
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
as.lock.RLock()
defer as.lock.RUnlock()

return as.idempotencyKeyToTx[key]
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {
as.lock.RLock()
defer as.lock.RUnlock()
Expand All @@ -148,7 +168,7 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekIn
return tx, nil
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveTxToUnstarted(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxToUnstarted(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
as.lock.Lock()
defer as.lock.Unlock()

Expand All @@ -157,6 +177,9 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveTx
}

as.unstarted.AddTx(tx)
if tx.IdempotencyKey != nil {
as.idempotencyKeyToTx[*tx.IdempotencyKey] = tx
}

return nil
}
Expand Down Expand Up @@ -240,6 +263,12 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) abando
tx.Sequence = nil
tx.Error = null.NewString("abandoned", true)
}
for _, tx := range as.idempotencyKeyToTx {
tx.State = TxFatalError
tx.Sequence = nil
tx.Error = null.NewString("abandoned", true)
}

clear(as.unconfirmed)
}

Expand Down
45 changes: 10 additions & 35 deletions common/txmgr/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/label"
"gopkg.in/guregu/null.v4"
)

var (
Expand Down Expand Up @@ -86,9 +85,6 @@ type InMemoryStore[
keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ]
txStore PersistentTxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]

pendingLock sync.RWMutex
pendingIdempotencyKeys map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]

addressStatesLock sync.RWMutex
addressStates map[ADDR]*AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
}
Expand All @@ -110,8 +106,6 @@ func NewInMemoryStore[
keyStore: keyStore,
txStore: txStore,

pendingIdempotencyKeys: map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{},

addressStates: map[ADDR]*AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{},
}

Expand Down Expand Up @@ -159,15 +153,17 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT
return nil, fmt.Errorf("find_tx_with_idempotency_key: %w", ErrInvalidChainID)
}

ms.pendingLock.Lock()
defer ms.pendingLock.Unlock()

tx, ok := ms.pendingIdempotencyKeys[idempotencyKey]
if !ok {
return nil, fmt.Errorf("find_tx_with_idempotency_key: %w", ErrTxnNotFound)
// Check if the transaction is in the pending queue of all address states
ms.addressStatesLock.Lock()
defer ms.addressStatesLock.Unlock()
for _, as := range ms.addressStates {
if tx := as.findTxWithIdempotencyKey(idempotencyKey); tx != nil {
return tx, nil
}
}

return tx, nil
return nil, fmt.Errorf("find_tx_with_idempotency_key: %w", ErrTxnNotFound)

}

// CheckTxQueueCapacity checks if the queue capacity has been reached for a given address
Expand Down Expand Up @@ -437,10 +433,6 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close
// Close the event recorder
ms.txStore.Close()

ms.pendingLock.Lock()
clear(ms.pendingIdempotencyKeys)
ms.pendingLock.Unlock()

// Clear all address states
ms.addressStatesLock.Lock()
for _, as := range ms.addressStates {
Expand Down Expand Up @@ -468,17 +460,6 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Aband
}
as.abandon()

ms.pendingLock.Lock()
// Mark all pending transactions as abandoned
for _, tx := range ms.pendingIdempotencyKeys {
if tx.FromAddress == addr {
tx.State = TxFatalError
tx.Sequence = nil
tx.Error = null.NewString("abandoned", true)
}
}
ms.pendingLock.Unlock()

return nil
}

Expand All @@ -491,15 +472,9 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) sendT
// TODO(jtw); HANDLE PRUNING STEP

// Add the request to the Unstarted channel to be processed by the Broadcaster
if err := as.moveTxToUnstarted(&tx); err != nil {
if err := as.addTxToUnstarted(&tx); err != nil {
return fmt.Errorf("send_tx_to_unstarted_queue: %w", err)
}

ms.pendingLock.Lock()
if tx.IdempotencyKey != nil {
ms.pendingIdempotencyKeys[*tx.IdempotencyKey] = &tx
}
ms.pendingLock.Unlock()

return nil
}

0 comments on commit 5314284

Please sign in to comment.