Skip to content

Commit

Permalink
[BCI-2235] Abandoned Transaction Tracking (#11143)
Browse files Browse the repository at this point in the history
* Create Abandoned Tracker

* Handle abandoned transactions

* Update abandoned_tracker.go

* fix tabbing

* Query for non-finalized txes

* fix tracker

* Resend abandoned txes

* Add evm_tx_store test

* Resend abandoned transactions

* add tracker testing

* presubmit

* tidy

* generate

* fix test

* lint

* tidy

* fix txmgr tests

* generate

* Get tx by ID

* generate

* ensure tx attemps

* Confirm abandoned txes

* tracker reset

* lint

* Update tracker.go

* Get Tx by ID

* Update text

* Make tracker its own component

* check finality_depth

* remove TxAbandoned state

* generate

* Fix race conditions

* Update db functions

* Add state machine to tracker

* Delete coverage.txt

* lint

* count receipts in query

* add context

* update cltest

* Add tracker description

* filter chainID

* update logger

* generate

* Update CHANGELOG.md

* Test unstarted txes

* lint

* undo imports

---------

Co-authored-by: amit-momin <[email protected]>
  • Loading branch information
DylanTinianov and amit-momin authored Nov 29, 2023
1 parent 4fbb56e commit c8ca97e
Show file tree
Hide file tree
Showing 19 changed files with 920 additions and 28 deletions.
27 changes: 18 additions & 9 deletions common/txmgr/resender.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ type Resender[
ADDR types.Hashable,
TX_HASH types.Hashable,
BLOCK_HASH types.Hashable,
R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH],
SEQ types.Sequence,
FEE feetypes.Fee,
] struct {
txStore txmgrtypes.TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE]
client txmgrtypes.TransactionClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
ks txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ]
chainID CHAIN_ID
interval time.Duration
Expand All @@ -64,25 +66,28 @@ func NewResender[
ADDR types.Hashable,
TX_HASH types.Hashable,
BLOCK_HASH types.Hashable,
R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH],
SEQ types.Sequence,
FEE feetypes.Fee,
](
lggr logger.Logger,
txStore txmgrtypes.TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE],
client txmgrtypes.TransactionClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
ks txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ],
pollInterval time.Duration,
config txmgrtypes.ResenderChainConfig,
txConfig txmgrtypes.ResenderTransactionsConfig,
) *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
) *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
if txConfig.ResendAfterThreshold() == 0 {
panic("Resender requires a non-zero threshold")
}
// todo: add context to txStore https://smartcontract-it.atlassian.net/browse/BCI-1585
ctx, cancel := context.WithCancel(context.Background())
return &Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{
return &Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
txStore,
client,
tracker,
ks,
client.ConfiguredChainID(),
pollInterval,
Expand All @@ -97,18 +102,18 @@ func NewResender[
}

// Start is a comment which satisfies the linter
func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Start() {
func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start() {
er.logger.Debugf("Enabled with poll interval of %s and age threshold of %s", er.interval, er.txConfig.ResendAfterThreshold())
go er.runLoop()
}

// Stop is a comment which satisfies the linter
func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Stop() {
func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Stop() {
er.cancel()
<-er.chDone
}

func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) runLoop() {
func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() {
defer close(er.chDone)

if err := er.resendUnconfirmed(); err != nil {
Expand All @@ -129,16 +134,20 @@ func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) runLoop() {
}
}

func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) resendUnconfirmed() error {
enabledAddresses, err := er.ks.EnabledAddressesForChain(er.chainID)
func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) resendUnconfirmed() error {
resendAddresses, err := er.ks.EnabledAddressesForChain(er.chainID)
if err != nil {
return fmt.Errorf("Resender failed getting enabled keys for chain %s: %w", er.chainID.String(), err)
}

resendAddresses = append(resendAddresses, er.tracker.GetAbandonedAddresses()...)

ageThreshold := er.txConfig.ResendAfterThreshold()
maxInFlightTransactions := er.txConfig.MaxInFlight()
olderThan := time.Now().Add(-ageThreshold)
var allAttempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
for _, k := range enabledAddresses {

for _, k := range resendAddresses {
var attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
attempts, err = er.txStore.FindTxAttemptsRequiringResend(er.ctx, olderThan, maxInFlightTransactions, er.chainID, k)
if err != nil {
Expand Down Expand Up @@ -189,7 +198,7 @@ func logResendResult(lggr logger.Logger, codes []client.SendTxReturnCode) {
lggr.Debugw("Completed", "n", len(codes), "nNew", nNew, "nFatal", nFatal)
}

func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) logStuckAttempts(attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR) {
func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) logStuckAttempts(attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR) {
if time.Since(er.lastAlertTimestamps[fromAddress.String()]) >= unconfirmedTxAlertLogFrequency {
oldestAttempt, exists := findOldestUnconfirmedAttempt(attempts)
if exists {
Expand Down
11 changes: 10 additions & 1 deletion common/txmgr/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package txmgr

import (
"context"
"time"

txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
)
Expand All @@ -13,6 +14,14 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXX
ec.client = client
}

func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestSetTTL(ttl time.Duration) {
tr.ttl = ttl
}

func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXDeliverBlock(blockHeight int64) {
tr.mb.Deliver(blockHeight)
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) XXXTestStartInternal() error {
return eb.startInternal()
}
Expand All @@ -33,7 +42,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXX
return ec.closeInternal()
}

func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) XXXTestResendUnconfirmed() error {
func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestResendUnconfirmed() error {
return er.resendUnconfirmed()
}

Expand Down
Loading

0 comments on commit c8ca97e

Please sign in to comment.