Skip to content

Commit

Permalink
finish initial implementation of core methods of txStore
Browse files Browse the repository at this point in the history
  • Loading branch information
poopoothegorilla committed Jan 4, 2024
1 parent 121c98b commit 3f46194
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 32 deletions.
41 changes: 40 additions & 1 deletion common/txmgr/address_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTx
return as.idempotencyKeyToTx[key]
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindLatestSequence() SEQ {
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) LatestSequence() SEQ {
as.RLock()
defer as.RUnlock()

Expand All @@ -230,6 +230,23 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindLa
return maxSeq
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MaxConfirmedSequence() SEQ {
as.RLock()
defer as.RUnlock()

var maxSeq SEQ
for _, tx := range as.confirmed {
if tx.Sequence == nil {
continue
}
if (*tx.Sequence).Int64() > maxSeq.Int64() {
maxSeq = *tx.Sequence
}
}

return maxSeq
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ApplyToTxs(
txStates []txmgrtypes.TxState,
fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]),
Expand Down Expand Up @@ -625,6 +642,28 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveIn

return nil
}
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveConfirmedMissingReceiptToFatalError(
etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
txError null.String,
) error {
as.Lock()
defer as.Unlock()

tx, ok := as.confirmedMissingReceipt[etx.ID]
if !ok || tx == nil {
return fmt.Errorf("move_confirmed_missing_receipt_to_fatal_error: no confirmed_missing_receipt transaction with ID %d: %w", etx.ID, ErrTxnNotFound)
}

tx.State = TxFatalError
tx.Sequence = nil
tx.TxAttempts = nil
tx.InitialBroadcastAt = nil
tx.Error = txError
as.fatalErrored[tx.ID] = tx
delete(as.confirmedMissingReceipt, tx.ID)

return nil
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnconfirmedToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error {
as.Lock()
Expand Down
166 changes: 135 additions & 31 deletions common/txmgr/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package txmgr
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
"sort"
Expand All @@ -27,15 +28,18 @@ import (

var (
// ErrInvalidChainID is returned when the chain ID is invalid
ErrInvalidChainID = fmt.Errorf("invalid chain ID")
ErrInvalidChainID = errors.New("invalid chain ID")
// ErrTxnNotFound is returned when a transaction is not found
ErrTxnNotFound = fmt.Errorf("transaction not found")
ErrTxnNotFound = errors.New("transaction not found")
// ErrExistingIdempotencyKey is returned when a transaction with the same idempotency key already exists
ErrExistingIdempotencyKey = fmt.Errorf("transaction with idempotency key already exists")
ErrExistingIdempotencyKey = errors.New("transaction with idempotency key already exists")
// ErrAddressNotFound is returned when an address is not found
ErrAddressNotFound = fmt.Errorf("address not found")
ErrAddressNotFound = errors.New("address not found")
// ErrSequenceNotFound is returned when a sequence is not found
ErrSequenceNotFound = fmt.Errorf("sequence not found")
ErrSequenceNotFound = errors.New("sequence not found")
// ErrCouldNotGetReceipt is the error string we save if we reach our finality depth for a confirmed transaction without ever getting a receipt
// This most likely happened because an external wallet used the account for this nonce
ErrCouldNotGetReceipt = errors.New("could not get receipt")
)

type PersistentTxStore[
Expand All @@ -62,7 +66,7 @@ type InMemoryStore[
SEQ types.Sequence,
FEE feetypes.Fee,
] struct {
lggr logger.Logger
lggr logger.SugaredLogger
chainID CHAIN_ID

keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ]
Expand All @@ -80,7 +84,7 @@ func NewInMemoryStore[
SEQ types.Sequence,
FEE feetypes.Fee,
](
lggr logger.Logger,
lggr logger.SugaredLogger,
chainID CHAIN_ID,
keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ],
txStore PersistentTxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
Expand Down Expand Up @@ -202,7 +206,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindL
return seq, fmt.Errorf("find_latest_sequence: %w", ErrAddressNotFound)
}

seq = as.FindLatestSequence()
seq = as.LatestSequence()
if seq.Int64() == 0 {
return seq, fmt.Errorf("find_latest_sequence: %w", ErrSequenceNotFound)
}
Expand Down Expand Up @@ -1473,39 +1477,139 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkA
return fmt.Errorf("mark_all_confirmed_missing_receipt: %w", ErrInvalidChainID)
}

// TODO(jtw): need to complete
// Persist to persistent storage
if err := ms.txStore.MarkAllConfirmedMissingReceipt(ctx, chainID); err != nil {
return err
}

/*
// Persist to persistent storage
if err := ms.txStore.MarkAllConfirmedMissingReceipt(ctx, chainID); err != nil {
return err
}
// Update in memory store
wg := sync.WaitGroup{}
errsLock := sync.Mutex{}
var errs error
for _, as := range ms.addressStates {
wg.Add(1)
go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) {
maxConfirmedSequence := as.MaxConfirmedSequence()
filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool {
if tx.Sequence == nil {
return false
}
if tx.State != TxUnconfirmed {
return false
}

// Update in memory store
fn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {
if tx.State != TxUnconfirmed {
return
return (*tx.Sequence).Int64() < maxConfirmedSequence.Int64()
}
if tx.Sequence >= maxSequence {
return
states := []txmgrtypes.TxState{TxUnconfirmed}
txs := as.FetchTxs(states, filter)
for _, tx := range txs {
attempt := tx.TxAttempts[0]

if err := as.MoveUnconfirmedToConfirmedMissingReceipt(attempt, *tx.BroadcastAt); err != nil {
err = fmt.Errorf("mark_all_confirmed_missing_receipt: address: %s: %w", as.fromAddress, err)
errsLock.Lock()
errs = errors.Join(errs, err)
errsLock.Unlock()
}
}
wg.Done()
}(as)
}
wg.Wait()

tx.State = TxConfirmedMissingReceipt
}
states := []txmgrtypes.TxState{TxUnconfirmed}
for _, as := range ms.addressStates {
as.ApplyToTxs(states, fn)
}
*/

return nil
return errs
}
func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, finalityDepth uint32, chainID CHAIN_ID) error {
// TODO
return ms.txStore.MarkOldTxesMissingReceiptAsErrored(ctx, blockNum, finalityDepth, chainID)
if ms.chainID.String() != chainID.String() {
return fmt.Errorf("mark_old_txes_missing_receipt_as_errored: %w", ErrInvalidChainID)
}

// Persist to persistent storage
if err := ms.txStore.MarkOldTxesMissingReceiptAsErrored(ctx, blockNum, finalityDepth, chainID); err != nil {
return err
}

// Update in memory store
type result struct {
ID int64
Sequence SEQ
FromAddress ADDR
MaxBroadcastBeforeBlockNum int64
TxHashes []TX_HASH
}
var resultsLock sync.Mutex
var results []result
wg := sync.WaitGroup{}
errsLock := sync.Mutex{}
var errs error
for _, as := range ms.addressStates {
wg.Add(1)
go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) {
filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool {
if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 {
return false
}
if tx.State != TxConfirmedMissingReceipt {
return false
}
attempt := tx.TxAttempts[0]
if attempt.BroadcastBeforeBlockNum == nil {
return false
}

return *attempt.BroadcastBeforeBlockNum < blockNum-int64(finalityDepth)
}
states := []txmgrtypes.TxState{TxConfirmedMissingReceipt}
txs := as.FetchTxs(states, filter)
for _, tx := range txs {
if err := as.MoveConfirmedMissingReceiptToFatalError(tx, null.StringFrom(ErrCouldNotGetReceipt.Error())); err != nil {
err = fmt.Errorf("mark_old_txes_missing_receipt_as_errored: address: %s: %w", as.fromAddress, err)
errsLock.Lock()
errs = errors.Join(errs, err)
errsLock.Unlock()
continue
}
hashes := make([]TX_HASH, len(tx.TxAttempts))
maxBroadcastBeforeBlockNum := int64(0)
for i, attempt := range tx.TxAttempts {
hashes[i] = attempt.Hash
if attempt.BroadcastBeforeBlockNum != nil {
if *attempt.BroadcastBeforeBlockNum > maxBroadcastBeforeBlockNum {
maxBroadcastBeforeBlockNum = *attempt.BroadcastBeforeBlockNum
}
}
}
result := result{

Check failure on line 1582 in common/txmgr/inmemory_store.go

View workflow job for this annotation

GitHub Actions / lint

shadow: declaration of "result" shadows declaration at line 1533 (govet)
ID: tx.ID,
Sequence: *tx.Sequence,
FromAddress: tx.FromAddress,
MaxBroadcastBeforeBlockNum: maxBroadcastBeforeBlockNum,
TxHashes: hashes,
}
resultsLock.Lock()
results = append(results, result)
resultsLock.Unlock()
}
wg.Done()
}(as)
}
wg.Wait()

for _, r := range results {
ms.lggr.Criticalw(fmt.Sprintf("eth_tx with ID %v expired without ever getting a receipt for any of our attempts. "+
"Current block height is %v, transaction was broadcast before block height %v. This transaction may not have not been sent and will be marked as fatally errored. "+
"This can happen if there is another instance of chainlink running that is using the same private key, or if "+
"an external wallet has been used to send a transaction from account %s with nonce %v."+
" Please note that Chainlink requires exclusive ownership of it's private keys and sharing keys across multiple"+
" chainlink instances, or using the chainlink keys with an external wallet is NOT SUPPORTED and WILL lead to missed transactions",
r.ID, blockNum, r.MaxBroadcastBeforeBlockNum, r.FromAddress, r.Sequence), "ethTxID", r.ID, "sequence", r.Sequence, "fromAddress", r.FromAddress, "txHashes", r.TxHashes)
}

return errs
}

func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deepCopyTx(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
// TODO: COMPLETE DEEP COPY WORK
etx := *tx
etx.TxAttempts = make([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], len(tx.TxAttempts))
copy(etx.TxAttempts, tx.TxAttempts)
Expand Down

0 comments on commit 3f46194

Please sign in to comment.