Skip to content

Commit

Permalink
[BCI-2151] Refactor prom reporter db API (#11394)
Browse files Browse the repository at this point in the history
* Refactor prom reporter db API

* Add test case

* lint

* Use Txm from LegacyChainContainer

* Use legacyevm

* lint

* Use TXM chainID

* lint

---------

Co-authored-by: Prashant Yadav <[email protected]>
  • Loading branch information
DylanTinianov and prashantkumar1982 authored Nov 30, 2023
1 parent 82ed297 commit c21f4ff
Show file tree
Hide file tree
Showing 12 changed files with 476 additions and 31 deletions.
74 changes: 74 additions & 0 deletions common/txmgr/mocks/tx_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 28 additions & 1 deletion common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
"time"

"github.com/google/uuid"
nullv4 "gopkg.in/guregu/null.v4"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"

feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
Expand Down Expand Up @@ -55,6 +55,9 @@ type TxManager[
FindTxesWithMetaFieldByReceiptBlockNum(ctx context.Context, metaField string, blockNum int64, chainID *big.Int) (txes []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
// Find transactions loaded with transaction attempts and receipts by transaction IDs and states
FindTxesWithAttemptsAndReceiptsByIdsAndState(ctx context.Context, ids []big.Int, states []txmgrtypes.TxState, chainID *big.Int) (txes []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindEarliestUnconfirmedBroadcastTime(ctx context.Context) (nullv4.Time, error)
FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context) (nullv4.Int, error)
CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState) (count uint32, err error)
}

type reset struct {
Expand Down Expand Up @@ -576,6 +579,18 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesWi
return
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedBroadcastTime(ctx context.Context) (nullv4.Time, error) {
return b.txStore.FindEarliestUnconfirmedBroadcastTime(ctx, b.chainID)
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context) (nullv4.Int, error) {
return b.txStore.FindEarliestUnconfirmedTxAttemptBlock(ctx, b.chainID)
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState) (count uint32, err error) {
return b.txStore.CountTransactionsByState(ctx, state, b.chainID)
}

type NullTxManager[
CHAIN_ID types.ID,
HEAD types.Head[BLOCK_HASH],
Expand Down Expand Up @@ -642,3 +657,15 @@ func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Fin
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindTxesWithAttemptsAndReceiptsByIdsAndState(ctx context.Context, ids []big.Int, states []txmgrtypes.TxState, chainID *big.Int) (txes []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) {
return txes, errors.New(n.ErrMsg)
}

func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindEarliestUnconfirmedBroadcastTime(ctx context.Context) (nullv4.Time, error) {
return nullv4.Time{}, errors.New(n.ErrMsg)
}

func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context) (nullv4.Int, error) {
return nullv4.Int{}, errors.New(n.ErrMsg)
}

func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState) (count uint32, err error) {
return count, errors.New(n.ErrMsg)
}
74 changes: 74 additions & 0 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/google/uuid"
"gopkg.in/guregu/null.v4"

feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
Expand Down Expand Up @@ -64,6 +65,7 @@ type TransactionStore[
FEE feetypes.Fee,
] interface {
CountUnconfirmedTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (count uint32, err error)
CountTransactionsByState(ctx context.Context, state TxState, chainID CHAIN_ID) (count uint32, err error)
CountUnstartedTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (count uint32, err error)
CreateTransaction(ctx context.Context, txRequest TxRequest[ADDR, TX_HASH], chainID CHAIN_ID) (tx Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
DeleteInProgressAttempt(ctx context.Context, attempt TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
Expand All @@ -79,6 +81,8 @@ type TransactionStore[
FindTxWithSequence(ctx context.Context, fromAddress ADDR, seq SEQ) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindNextUnstartedTransactionFromAddress(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR, chainID CHAIN_ID) error
FindTransactionsConfirmedInBlockRange(ctx context.Context, highBlockNumber, lowBlockNumber int64, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error)
FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error)
GetTxInProgress(ctx context.Context, fromAddress ADDR) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
GetInProgressTxAttempts(ctx context.Context, address ADDR, chainID CHAIN_ID) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
GetNonFatalTransactions(ctx context.Context, chainID CHAIN_ID) (txs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
Expand Down
47 changes: 47 additions & 0 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,39 @@ ORDER BY nonce ASC
return etxs, pkgerrors.Wrap(err, "FindTransactionsConfirmedInBlockRange failed")
}

func (o *evmTxStore) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID *big.Int) (broadcastAt nullv4.Time, err error) {
var cancel context.CancelFunc
ctx, cancel = o.mergeContexts(ctx)
defer cancel()
qq := o.q.WithOpts(pg.WithParentCtx(ctx))
err = qq.Transaction(func(tx pg.Queryer) error {
if err = qq.QueryRowContext(ctx, `SELECT min(initial_broadcast_at) FROM evm.txes WHERE state = 'unconfirmed' AND evm_chain_id = $1`, chainID.String()).Scan(&broadcastAt); err != nil {
return fmt.Errorf("failed to query for unconfirmed eth_tx count: %w", err)
}
return nil
}, pg.OptReadOnlyTx())
return broadcastAt, err
}

func (o *evmTxStore) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID *big.Int) (earliestUnconfirmedTxBlock nullv4.Int, err error) {
var cancel context.CancelFunc
ctx, cancel = o.mergeContexts(ctx)
defer cancel()
qq := o.q.WithOpts(pg.WithParentCtx(ctx))
err = qq.Transaction(func(tx pg.Queryer) error {
err = qq.QueryRowContext(ctx, `
SELECT MIN(broadcast_before_block_num) FROM evm.tx_attempts
JOIN evm.txes ON evm.txes.id = evm.tx_attempts.eth_tx_id
WHERE evm.txes.state = 'unconfirmed'
AND evm_chain_id = $1`, chainID.String()).Scan(&earliestUnconfirmedTxBlock)
if err != nil {
return fmt.Errorf("failed to query for earliest unconfirmed tx block: %w", err)
}
return nil
}, pg.OptReadOnlyTx())
return earliestUnconfirmedTxBlock, err
}

func (o *evmTxStore) IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID *big.Int) (finalized bool, err error) {
var cancel context.CancelFunc
ctx, cancel = o.mergeContexts(ctx)
Expand Down Expand Up @@ -1733,6 +1766,20 @@ func (o *evmTxStore) CountUnconfirmedTransactions(ctx context.Context, fromAddre
return o.countTransactionsWithState(ctx, fromAddress, txmgr.TxUnconfirmed, chainID)
}

// CountTransactionsByState returns the number of transactions with any fromAddress in the given state
func (o *evmTxStore) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState, chainID *big.Int) (count uint32, err error) {
var cancel context.CancelFunc
ctx, cancel = o.mergeContexts(ctx)
defer cancel()
qq := o.q.WithOpts(pg.WithParentCtx(ctx))
err = qq.Get(&count, `SELECT count(*) FROM evm.txes WHERE state = $1 AND evm_chain_id = $2`,
state, chainID.String())
if err != nil {
return 0, fmt.Errorf("failed to CountTransactionsByState: %w", err)
}
return count, nil
}

// CountUnstartedTransactions returns the number of unconfirmed transactions
func (o *evmTxStore) CountUnstartedTransactions(ctx context.Context, fromAddress common.Address, chainID *big.Int) (count uint32, err error) {
return o.countTransactionsWithState(ctx, fromAddress, txmgr.TxUnstarted, chainID)
Expand Down
Loading

0 comments on commit c21f4ff

Please sign in to comment.