Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move next nonce logic from Keystore to Broadcaster #10108

Merged
merged 9 commits into from
Oct 12, 2023
168 changes: 124 additions & 44 deletions common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type Broadcaster[
txStore txmgrtypes.TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE]
client txmgrtypes.TransactionClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH]
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ]
resumeCallback ResumeCallback
chainID CHAIN_ID
config txmgrtypes.BroadcasterChainConfig
Expand Down Expand Up @@ -143,6 +143,10 @@ type Broadcaster[
utils.StartStopOnce

parseAddr func(string) (ADDR, error)

sequenceLock sync.RWMutex
nextSequenceMap map[ADDR]SEQ
generateNextSequence types.GenerateNextSequenceFunc[SEQ]
}

func NewBroadcaster[
Expand All @@ -163,11 +167,12 @@ func NewBroadcaster[
keystore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ],
eventBroadcaster pg.EventBroadcaster,
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH],
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ],
logger logger.Logger,
checkerFactory TransmitCheckerFactory[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
autoSyncSequence bool,
parseAddress func(string) (ADDR, error),
generateNextSequence types.GenerateNextSequenceFunc[SEQ],
) *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
logger = logger.Named("Broadcaster")
b := &Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{
Expand All @@ -189,6 +194,7 @@ func NewBroadcaster[
}

b.processUnstartedTxsImpl = b.processUnstartedTxs
b.generateNextSequence = generateNextSequence
return b
}

Expand Down Expand Up @@ -235,6 +241,13 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) star
eb.wg.Add(1)
go eb.txInsertTriggerer()

eb.sequenceLock.Lock()
prashantkumar1982 marked this conversation as resolved.
Show resolved Hide resolved
defer eb.sequenceLock.Unlock()
eb.nextSequenceMap, err = eb.loadNextSequenceMap(eb.enabledAddresses)
if err != nil {
return errors.Wrap(err, "Broadcaster: failed to load next sequence map")
}

eb.isStarted = true
return nil
}
Expand Down Expand Up @@ -312,6 +325,33 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) txIn
}
}

// Load the next sequence map using the tx table or on-chain (if not found in tx table)
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) loadNextSequenceMap(addresses []ADDR) (map[ADDR]SEQ, error) {
ctx, cancel := eb.chStop.NewCtx()
defer cancel()

nextSequenceMap := make(map[ADDR]SEQ)
for _, address := range addresses {
// Get the highest sequence from the tx table
// Will need to be incremented since this sequence is already used
seq, err := eb.txStore.FindLatestSequence(ctx, address, eb.chainID)
if err != nil {
// Look for nonce on-chain if no tx found for address in TxStore or if error occurred
// Returns the nonce that should be used for the next transaction so no need to increment
seq, err = eb.client.PendingSequenceAt(ctx, address)
if err != nil {
return nil, errors.New("failed to retrieve next sequence from on-chain causing failure to load next sequence map on broadcaster startup")
}

nextSequenceMap[address] = seq
} else {
nextSequenceMap[address] = eb.generateNextSequence(seq)
prashantkumar1982 marked this conversation as resolved.
Show resolved Hide resolved
}
}

return nextSequenceMap, nil
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) newSequenceSyncBackoff() backoff.Backoff {
return backoff.Backoff{
Min: 100 * time.Millisecond,
Expand Down Expand Up @@ -389,31 +429,42 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) moni
}
}

// syncSequence tries to sync the key sequence, retrying indefinitely until success
// syncSequence tries to sync the key sequence, retrying indefinitely until success or stop signal is sent
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SyncSequence(ctx context.Context, addr ADDR) {
sequenceSyncRetryBackoff := eb.newSequenceSyncBackoff()
if err := eb.sequenceSyncer.Sync(ctx, addr); err != nil {
// Enter retry loop with backoff
var attempt int
eb.logger.Errorw("Failed to sync with on-chain sequence", "address", addr.String(), "attempt", attempt, "err", err)
for {
select {
case <-eb.chStop:
return
case <-time.After(sequenceSyncRetryBackoff.Duration()):
attempt++

if err := eb.sequenceSyncer.Sync(ctx, addr); err != nil {
if attempt > 5 {
eb.logger.Criticalw("Failed to sync with on-chain sequence", "address", addr.String(), "attempt", attempt, "err", err)
eb.SvcErrBuffer.Append(err)
} else {
eb.logger.Warnw("Failed to sync with on-chain sequence", "address", addr.String(), "attempt", attempt, "err", err)
}
continue
localSequence, err := eb.GetNextSequence(addr)
// Address not found in map so skip sync
if err != nil {
eb.logger.Criticalw("Failed to retrieve local next sequence for address", "address", addr.String(), "err", err)
return
}

// Enter loop with retries
var attempt int
for {
select {
case <-eb.chStop:
return
case <-time.After(sequenceSyncRetryBackoff.Duration()):
attempt++
newNextSequence, err := eb.sequenceSyncer.Sync(ctx, addr, localSequence)
if err != nil {
if attempt > 5 {
eb.logger.Criticalw("Failed to sync with on-chain sequence", "address", addr.String(), "attempt", attempt, "err", err)
eb.SvcErrBuffer.Append(err)
} else {
eb.logger.Warnw("Failed to sync with on-chain sequence", "address", addr.String(), "attempt", attempt, "err", err)
}
return
continue
}
// Found new sequence to use from on-chain
if localSequence.String() != newNextSequence.String() {
eb.logger.Infow("Fast-forward sequence", "address", addr, "newNextSequence", newNextSequence, "oldNextSequence", localSequence)
// Set new sequence in the map
eb.SetNextSequence(addr, newNextSequence)
}
return

}
}
}
Expand Down Expand Up @@ -505,16 +556,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
return nil, false
}

// This function is used to pass the queryer from the txmgr to the keystore.
// It is inevitable we have to pass the queryer because we need the keystate's next sequence to be incremented
// atomically alongside the transition from `in_progress` to `broadcast` so it is ready for the next transaction
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) incrementNextSequenceAtomic(tx pg.Queryer, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
if err := eb.incrementNextSequence(etx.FromAddress, *etx.Sequence, pg.WithQueryer(tx)); err != nil {
return errors.Wrap(err, "saveUnconfirmed failed")
}
return nil
}

// There can be at most one in_progress transaction per address.
// Here we complete the job that we didn't finish last time.
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) handleInProgressTx(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (error, bool) {
Expand Down Expand Up @@ -603,9 +644,19 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
// and hand off to the confirmer to get the receipt (or mark as
// failed).
observeTimeUntilBroadcast(eb.chainID, etx.CreatedAt, time.Now())
return eb.txStore.UpdateTxAttemptInProgressToBroadcast(&etx, attempt, txmgrtypes.TxAttemptBroadcast, func(tx pg.Queryer) error {
return eb.incrementNextSequenceAtomic(tx, etx)
}), true
// Check if from_address exists in map to ensure it is valid before broadcasting
var sequence SEQ
sequence, err = eb.GetNextSequence(etx.FromAddress)
if err != nil {
return err, true
}
err = eb.txStore.UpdateTxAttemptInProgressToBroadcast(ctx, &etx, attempt, txmgrtypes.TxAttemptBroadcast)
if err != nil {
return err, true
}
// Increment sequence if successfully broadcasted
eb.IncrementNextSequence(etx.FromAddress, sequence)
return err, true
case clienttypes.Underpriced:
return eb.tryAgainBumpingGas(ctx, lgr, err, etx, attempt, initialBroadcastAt)
case clienttypes.InsufficientFunds:
Expand Down Expand Up @@ -650,9 +701,20 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
// Despite the error, the RPC node considers the previously sent
// transaction to have been accepted. In this case, the right thing to
// do is assume success and hand off to Confirmer
return eb.txStore.UpdateTxAttemptInProgressToBroadcast(&etx, attempt, txmgrtypes.TxAttemptBroadcast, func(tx pg.Queryer) error {
return eb.incrementNextSequenceAtomic(tx, etx)
}), true

// Check if from_address exists in map to ensure it is valid before broadcasting
var sequence SEQ
sequence, err = eb.GetNextSequence(etx.FromAddress)
if err != nil {
return err, true
}
err = eb.txStore.UpdateTxAttemptInProgressToBroadcast(ctx, &etx, attempt, txmgrtypes.TxAttemptBroadcast)
if err != nil {
return err, true
}
// Increment sequence if successfully broadcasted
eb.IncrementNextSequence(etx.FromAddress, sequence)
return err, true
}
// Either the unknown error prevented the transaction from being mined, or
// it has not yet propagated to the mempool, or there is some race on the
Expand All @@ -679,7 +741,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next
return nil, errors.Wrap(err, "findNextUnstartedTransactionFromAddress failed")
}

sequence, err := eb.getNextSequence(etx.FromAddress)
sequence, err := eb.GetNextSequence(etx.FromAddress)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -763,12 +825,30 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) save
return eb.txStore.UpdateTxFatalError(ctx, etx)
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) getNextSequence(address ADDR) (sequence SEQ, err error) {
return eb.ks.NextSequence(address, eb.chainID)
// Used to get the next usable sequence for a transaction
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetNextSequence(address ADDR) (seq SEQ, err error) {
eb.sequenceLock.Lock()
defer eb.sequenceLock.Unlock()
// Get next sequence from map
seq, exists := eb.nextSequenceMap[address]
if !exists {
return seq, errors.New(fmt.Sprint("address not found in next sequence map: ", address))
}
return seq, nil
}

// Used to increment the sequence in the mapping to have the next usable one available for the next transaction
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) IncrementNextSequence(address ADDR, seq SEQ) {
eb.sequenceLock.Lock()
defer eb.sequenceLock.Unlock()
eb.nextSequenceMap[address] = eb.generateNextSequence(seq)
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) incrementNextSequence(address ADDR, currentSequence SEQ, qopts ...pg.QOpt) error {
return eb.ks.IncrementNextSequence(address, eb.chainID, currentSequence, qopts...)
// Used to set the next sequence explicitly to a certain value
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SetNextSequence(address ADDR, seq SEQ) {
eb.sequenceLock.Lock()
defer eb.sequenceLock.Unlock()
eb.nextSequenceMap[address] = seq
}

func observeTimeUntilBroadcast[CHAIN_ID types.ID](chainID CHAIN_ID, createdAt, broadcastAt time.Time) {
Expand Down
10 changes: 5 additions & 5 deletions common/txmgr/mocks/tx_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/common/types"
)

type SequenceSyncer[ADDR types.Hashable, TX_HASH types.Hashable, BLOCK_HASH types.Hashable] interface {
Sync(ctx context.Context, addr ADDR) (err error)
type SequenceSyncer[ADDR types.Hashable, TX_HASH types.Hashable, BLOCK_HASH types.Hashable, SEQ types.Sequence] interface {
Sync(ctx context.Context, addr ADDR, localSequence SEQ) (SEQ, error)
}
14 changes: 6 additions & 8 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type TxManager[
GetForwarderForEOA(eoa ADDR) (forwarder ADDR, err error)
RegisterResumeCallback(fn ResumeCallback)
SendNativeToken(ctx context.Context, chainID CHAIN_ID, from, to ADDR, value big.Int, gasLimit uint32) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
Reset(f func(), addr ADDR, abandon bool) error
Reset(addr ADDR, abandon bool) error
}

type reset struct {
Expand Down Expand Up @@ -91,7 +91,7 @@ type Txm[
confirmer *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
fwdMgr txmgrtypes.ForwarderManager[ADDR]
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH]
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ]
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RegisterResumeCallback(fn ResumeCallback) {
Expand Down Expand Up @@ -120,7 +120,7 @@ func NewTxm[
fwdMgr txmgrtypes.ForwarderManager[ADDR],
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH],
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ],
broadcaster *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
confirmer *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
resender *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
Expand Down Expand Up @@ -196,13 +196,11 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx
})
}

// Reset stops Broadcaster/Confirmer, executes callback, then starts them
// again
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Reset(callback func(), addr ADDR, abandon bool) (err error) {
// Reset stops Broadcaster/Confirmer, executes callback, then starts them again
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Reset(addr ADDR, abandon bool) (err error) {
ok := b.IfStarted(func() {
done := make(chan error)
f := func() {
callback()
if abandon {
err = b.abandon(addr)
}
Expand Down Expand Up @@ -546,7 +544,7 @@ func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Cre
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetForwarderForEOA(addr ADDR) (fwdr ADDR, err error) {
return fwdr, err
}
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Reset(f func(), addr ADDR, abandon bool) error {
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Reset(addr ADDR, abandon bool) error {
return nil
}

Expand Down
3 changes: 0 additions & 3 deletions common/txmgr/types/keystore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package types

import (
"github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

// KeyStore encompasses the subset of keystore used by txmgr
Expand All @@ -17,8 +16,6 @@ type KeyStore[
SEQ types.Sequence,
] interface {
CheckEnabled(address ADDR, chainID CHAIN_ID) error
NextSequence(address ADDR, chainID CHAIN_ID, qopts ...pg.QOpt) (SEQ, error)
EnabledAddressesForChain(chainId CHAIN_ID) ([]ADDR, error)
IncrementNextSequence(address ADDR, chainID CHAIN_ID, currentSequence SEQ, qopts ...pg.QOpt) error
SubscribeToKeyChanges() (ch chan struct{}, unsub func())
}
Loading
Loading