From 303b2c95a27151f5c6bf150de77cf3065bc14665 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Tue, 12 Dec 2023 17:25:53 +0100 Subject: [PATCH 1/5] Broadcaster: run checker only for unstarted txs --- common/txmgr/broadcaster.go | 54 ++++++++++++++------------- core/chains/evm/txmgr/evm_tx_store.go | 5 ++- 2 files changed, 32 insertions(+), 27 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index f10ecafc670..cd615589590 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -485,22 +485,9 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) proc return false, nil } n++ - var a txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - var retryable bool - a, _, _, retryable, err = eb.NewTxAttempt(ctx, *etx, eb.lggr) - if err != nil { - return retryable, fmt.Errorf("processUnstartedTxs failed on NewAttempt: %w", err) - } - if err := eb.txStore.UpdateTxUnstartedToInProgress(ctx, etx, &a); errors.Is(err, ErrTxRemoved) { - eb.lggr.Debugw("tx removed", "txID", etx.ID, "subject", etx.Subject) - continue - } else if err != nil { - return true, fmt.Errorf("processUnstartedTxs failed on UpdateTxUnstartedToInProgress: %w", err) - } - - if err, retryable := eb.handleInProgressTx(ctx, *etx, a, time.Now()); err != nil { - return retryable, fmt.Errorf("processUnstartedTxs failed on handleInProgressTx: %w", err) + if err, retryable := eb.handleUnstartedTx(ctx, etx); err != nil { + return retryable, fmt.Errorf("processUnstartedTxs failed on handleUnstartedTx: %w", err) } } } @@ -520,11 +507,10 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand return nil, false } -// There can be at most one in_progress transaction per address. -// Here we complete the job that we didn't finish last time. -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) handleInProgressTx(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (error, bool) { - if etx.State != TxInProgress { - return fmt.Errorf("invariant violation: expected transaction %v to be in_progress, it was %s", etx.ID, etx.State), false +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) handleUnstartedTx(ctx context.Context, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (error, bool) { + attempt, _, _, retryable, err := eb.NewTxAttempt(ctx, *etx, eb.lggr) + if err != nil { + return fmt.Errorf("processUnstartedTxs failed on NewAttempt: %w", err), retryable } checkerSpec, err := etx.GetChecker() @@ -543,17 +529,35 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand // anyway. checkCtx, cancel := context.WithTimeout(ctx, TransmitCheckTimeout) defer cancel() - err = checker.Check(checkCtx, lgr, etx, attempt) + err = checker.Check(checkCtx, lgr, *etx, attempt) if errors.Is(err, context.Canceled) { lgr.Warn("Transmission checker timed out, sending anyway") } else if err != nil { etx.Error = null.StringFrom(err.Error()) lgr.Warnw("Transmission checker failed, fatally erroring transaction.", "err", err) - return eb.saveFatallyErroredTransaction(lgr, &etx), true + return eb.saveFatallyErroredTransaction(lgr, etx), true } cancel() - lgr.Infow("Sending transaction", "txAttemptID", attempt.ID, "txHash", attempt.Hash, "err", err, "meta", etx.Meta, "feeLimit", etx.FeeLimit, "attempt", attempt, "etx", etx) + if err = eb.txStore.UpdateTxUnstartedToInProgress(ctx, etx, &attempt); errors.Is(err, ErrTxRemoved) { + eb.lggr.Debugw("tx removed", "txID", etx.ID, "subject", etx.Subject) + return nil, false + } else if err != nil { + return fmt.Errorf("processUnstartedTxs failed on UpdateTxUnstartedToInProgress: %w", err), true + } + + return eb.handleInProgressTx(ctx, *etx, attempt, time.Now()) +} + +// There can be at most one in_progress transaction per address. +// Here we complete the job that we didn't finish last time. +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) handleInProgressTx(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (error, bool) { + if etx.State != TxInProgress { + return fmt.Errorf("invariant violation: expected transaction %v to be in_progress, it was %s", etx.ID, etx.State), false + } + + lgr := etx.GetLogger(logger.With(eb.lggr, "fee", attempt.TxFee)) + lgr.Infow("Sending transaction", "txAttemptID", attempt.ID, "txHash", attempt.Hash, "meta", etx.Meta, "feeLimit", etx.FeeLimit, "attempt", attempt, "etx", etx) errType, err := eb.client.SendTransactionReturnCode(ctx, etx, attempt, lgr) if errType != client.Fatal { @@ -760,8 +764,8 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) save func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveFatallyErroredTransaction(lgr logger.Logger, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { ctx, cancel := eb.chStop.NewCtx() defer cancel() - if etx.State != TxInProgress { - return fmt.Errorf("can only transition to fatal_error from in_progress, transaction is currently %s", etx.State) + if etx.State != TxInProgress && etx.State != TxUnstarted { + return fmt.Errorf("can only transition to fatal_error from in_progress or unstarted, transaction is currently %s", etx.State) } if !etx.Error.Valid { return errors.New("expected error field to be set") diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index 730809e8dda..083b8290433 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -20,6 +20,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + "github.com/smartcontractkit/chainlink/v2/common/txmgr" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" @@ -1555,8 +1556,8 @@ func (o *evmTxStore) UpdateTxFatalError(ctx context.Context, etx *Tx) error { ctx, cancel = o.mergeContexts(ctx) defer cancel() qq := o.q.WithOpts(pg.WithParentCtx(ctx)) - if etx.State != txmgr.TxInProgress { - return pkgerrors.Errorf("can only transition to fatal_error from in_progress, transaction is currently %s", etx.State) + if etx.State != txmgr.TxInProgress && etx.State != txmgr.TxUnstarted { + return pkgerrors.Errorf("can only transition to fatal_error from in_progress or unstarted, transaction is currently %s", etx.State) } if !etx.Error.Valid { return errors.New("expected error field to be set") From 27045343ea37481c49c635b289521f986e1dbdf9 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Tue, 12 Dec 2023 18:46:23 +0100 Subject: [PATCH 2/5] attempt to fix sigscanner From da96c05f614408b40b407b81dd864f2486721b09 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Wed, 13 Dec 2023 14:59:55 +0100 Subject: [PATCH 3/5] fix txstore invariant check --- core/chains/evm/txmgr/evm_tx_store.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index 1c9868741fa..afc46acb4dc 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -1556,8 +1556,8 @@ func (o *evmTxStore) UpdateTxFatalError(ctx context.Context, etx *Tx) error { ctx, cancel = o.mergeContexts(ctx) defer cancel() qq := o.q.WithOpts(pg.WithParentCtx(ctx)) - if etx.State != txmgr.TxInProgress { - return pkgerrors.Errorf("can only transition to fatal_error from in_progress, transaction is currently %s", etx.State) + if etx.State != txmgr.TxInProgress && etx.State != txmgr.TxUnstarted { + return pkgerrors.Errorf("can only transition to fatal_error from in_progress or unstarted, transaction is currently %s", etx.State) } if !etx.Error.Valid { return errors.New("expected error field to be set") From 251d99cda985dda42492b429a444490fde6ebad7 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Wed, 13 Dec 2023 16:08:39 +0100 Subject: [PATCH 4/5] add invariant to ensure we are processing unstarted tx --- common/txmgr/broadcaster.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index d45628a7fbe..f446670c4a8 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -508,6 +508,10 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand } func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) handleUnstartedTx(ctx context.Context, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (error, bool) { + if etx.State != TxUnstarted { + return fmt.Errorf("invariant violation: expected transaction %v to be unstarted, it was %s", etx.ID, etx.State), false + } + attempt, _, _, retryable, err := eb.NewTxAttempt(ctx, *etx, eb.lggr) if err != nil { return fmt.Errorf("processUnstartedTxs failed on NewAttempt: %w", err), retryable From 12c948ab33cadfbd3145d8f7536180f3c84ad526 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Wed, 13 Dec 2023 16:22:20 +0100 Subject: [PATCH 5/5] add explanation why we run check only on unstarted --- common/txmgr/broadcaster.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index f446670c4a8..59882716c9d 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -531,6 +531,9 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand // If the transmit check does not complete within the timeout, the transaction will be sent // anyway. + // It's intentional that we only run `Check` for unstarted transactions. + // Running it on other states might lead to nonce duplication, as we might mark applied transactions as fatally errored. + checkCtx, cancel := context.WithTimeout(ctx, TransmitCheckTimeout) defer cancel() err = checker.Check(checkCtx, lgr, *etx, attempt)