Skip to content

Commit

Permalink
Move next nonce logic from Keystore to Broadcaster (#10108)
Browse files Browse the repository at this point in the history
* Moved next_nonce logic from keystore to broadcaster

* Changed sequence Increment method name to Next

* Addressed PR feedback

* Fixed changelog entry

* Addressed PR feedback

* Updated to pass the next seq func as a parameter to the broadcaster

* Removed unnecessary mutex initialization

* Addressed PR feedback

* Changed FindHighestSequence method name to FindLatestSequence
  • Loading branch information
amit-momin authored Oct 12, 2023
1 parent 838d162 commit 4983951
Show file tree
Hide file tree
Showing 41 changed files with 648 additions and 993 deletions.
168 changes: 124 additions & 44 deletions common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type Broadcaster[
txStore txmgrtypes.TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE]
client txmgrtypes.TransactionClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH]
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ]
resumeCallback ResumeCallback
chainID CHAIN_ID
config txmgrtypes.BroadcasterChainConfig
Expand Down Expand Up @@ -143,6 +143,10 @@ type Broadcaster[
utils.StartStopOnce

parseAddr func(string) (ADDR, error)

sequenceLock sync.RWMutex
nextSequenceMap map[ADDR]SEQ
generateNextSequence types.GenerateNextSequenceFunc[SEQ]
}

func NewBroadcaster[
Expand All @@ -163,11 +167,12 @@ func NewBroadcaster[
keystore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ],
eventBroadcaster pg.EventBroadcaster,
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH],
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ],
logger logger.Logger,
checkerFactory TransmitCheckerFactory[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
autoSyncSequence bool,
parseAddress func(string) (ADDR, error),
generateNextSequence types.GenerateNextSequenceFunc[SEQ],
) *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
logger = logger.Named("Broadcaster")
b := &Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{
Expand All @@ -189,6 +194,7 @@ func NewBroadcaster[
}

b.processUnstartedTxsImpl = b.processUnstartedTxs
b.generateNextSequence = generateNextSequence
return b
}

Expand Down Expand Up @@ -235,6 +241,13 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) star
eb.wg.Add(1)
go eb.txInsertTriggerer()

eb.sequenceLock.Lock()
defer eb.sequenceLock.Unlock()
eb.nextSequenceMap, err = eb.loadNextSequenceMap(eb.enabledAddresses)
if err != nil {
return errors.Wrap(err, "Broadcaster: failed to load next sequence map")
}

eb.isStarted = true
return nil
}
Expand Down Expand Up @@ -312,6 +325,33 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) txIn
}
}

// Load the next sequence map using the tx table or on-chain (if not found in tx table)
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) loadNextSequenceMap(addresses []ADDR) (map[ADDR]SEQ, error) {
ctx, cancel := eb.chStop.NewCtx()
defer cancel()

nextSequenceMap := make(map[ADDR]SEQ)
for _, address := range addresses {
// Get the highest sequence from the tx table
// Will need to be incremented since this sequence is already used
seq, err := eb.txStore.FindLatestSequence(ctx, address, eb.chainID)
if err != nil {
// Look for nonce on-chain if no tx found for address in TxStore or if error occurred
// Returns the nonce that should be used for the next transaction so no need to increment
seq, err = eb.client.PendingSequenceAt(ctx, address)
if err != nil {
return nil, errors.New("failed to retrieve next sequence from on-chain causing failure to load next sequence map on broadcaster startup")
}

nextSequenceMap[address] = seq
} else {
nextSequenceMap[address] = eb.generateNextSequence(seq)
}
}

return nextSequenceMap, nil
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) newSequenceSyncBackoff() backoff.Backoff {
return backoff.Backoff{
Min: 100 * time.Millisecond,
Expand Down Expand Up @@ -389,31 +429,42 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) moni
}
}

// syncSequence tries to sync the key sequence, retrying indefinitely until success
// syncSequence tries to sync the key sequence, retrying indefinitely until success or stop signal is sent
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SyncSequence(ctx context.Context, addr ADDR) {
sequenceSyncRetryBackoff := eb.newSequenceSyncBackoff()
if err := eb.sequenceSyncer.Sync(ctx, addr); err != nil {
// Enter retry loop with backoff
var attempt int
eb.logger.Errorw("Failed to sync with on-chain sequence", "address", addr.String(), "attempt", attempt, "err", err)
for {
select {
case <-eb.chStop:
return
case <-time.After(sequenceSyncRetryBackoff.Duration()):
attempt++

if err := eb.sequenceSyncer.Sync(ctx, addr); err != nil {
if attempt > 5 {
eb.logger.Criticalw("Failed to sync with on-chain sequence", "address", addr.String(), "attempt", attempt, "err", err)
eb.SvcErrBuffer.Append(err)
} else {
eb.logger.Warnw("Failed to sync with on-chain sequence", "address", addr.String(), "attempt", attempt, "err", err)
}
continue
localSequence, err := eb.GetNextSequence(addr)
// Address not found in map so skip sync
if err != nil {
eb.logger.Criticalw("Failed to retrieve local next sequence for address", "address", addr.String(), "err", err)
return
}

// Enter loop with retries
var attempt int
for {
select {
case <-eb.chStop:
return
case <-time.After(sequenceSyncRetryBackoff.Duration()):
attempt++
newNextSequence, err := eb.sequenceSyncer.Sync(ctx, addr, localSequence)
if err != nil {
if attempt > 5 {
eb.logger.Criticalw("Failed to sync with on-chain sequence", "address", addr.String(), "attempt", attempt, "err", err)
eb.SvcErrBuffer.Append(err)
} else {
eb.logger.Warnw("Failed to sync with on-chain sequence", "address", addr.String(), "attempt", attempt, "err", err)
}
return
continue
}
// Found new sequence to use from on-chain
if localSequence.String() != newNextSequence.String() {
eb.logger.Infow("Fast-forward sequence", "address", addr, "newNextSequence", newNextSequence, "oldNextSequence", localSequence)
// Set new sequence in the map
eb.SetNextSequence(addr, newNextSequence)
}
return

}
}
}
Expand Down Expand Up @@ -505,16 +556,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
return nil, false
}

// This function is used to pass the queryer from the txmgr to the keystore.
// It is inevitable we have to pass the queryer because we need the keystate's next sequence to be incremented
// atomically alongside the transition from `in_progress` to `broadcast` so it is ready for the next transaction
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) incrementNextSequenceAtomic(tx pg.Queryer, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
if err := eb.incrementNextSequence(etx.FromAddress, *etx.Sequence, pg.WithQueryer(tx)); err != nil {
return errors.Wrap(err, "saveUnconfirmed failed")
}
return nil
}

// There can be at most one in_progress transaction per address.
// Here we complete the job that we didn't finish last time.
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) handleInProgressTx(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (error, bool) {
Expand Down Expand Up @@ -603,9 +644,19 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
// and hand off to the confirmer to get the receipt (or mark as
// failed).
observeTimeUntilBroadcast(eb.chainID, etx.CreatedAt, time.Now())
return eb.txStore.UpdateTxAttemptInProgressToBroadcast(&etx, attempt, txmgrtypes.TxAttemptBroadcast, func(tx pg.Queryer) error {
return eb.incrementNextSequenceAtomic(tx, etx)
}), true
// Check if from_address exists in map to ensure it is valid before broadcasting
var sequence SEQ
sequence, err = eb.GetNextSequence(etx.FromAddress)
if err != nil {
return err, true
}
err = eb.txStore.UpdateTxAttemptInProgressToBroadcast(ctx, &etx, attempt, txmgrtypes.TxAttemptBroadcast)
if err != nil {
return err, true
}
// Increment sequence if successfully broadcasted
eb.IncrementNextSequence(etx.FromAddress, sequence)
return err, true
case clienttypes.Underpriced:
return eb.tryAgainBumpingGas(ctx, lgr, err, etx, attempt, initialBroadcastAt)
case clienttypes.InsufficientFunds:
Expand Down Expand Up @@ -650,9 +701,20 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
// Despite the error, the RPC node considers the previously sent
// transaction to have been accepted. In this case, the right thing to
// do is assume success and hand off to Confirmer
return eb.txStore.UpdateTxAttemptInProgressToBroadcast(&etx, attempt, txmgrtypes.TxAttemptBroadcast, func(tx pg.Queryer) error {
return eb.incrementNextSequenceAtomic(tx, etx)
}), true

// Check if from_address exists in map to ensure it is valid before broadcasting
var sequence SEQ
sequence, err = eb.GetNextSequence(etx.FromAddress)
if err != nil {
return err, true
}
err = eb.txStore.UpdateTxAttemptInProgressToBroadcast(ctx, &etx, attempt, txmgrtypes.TxAttemptBroadcast)
if err != nil {
return err, true
}
// Increment sequence if successfully broadcasted
eb.IncrementNextSequence(etx.FromAddress, sequence)
return err, true
}
// Either the unknown error prevented the transaction from being mined, or
// it has not yet propagated to the mempool, or there is some race on the
Expand All @@ -679,7 +741,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next
return nil, errors.Wrap(err, "findNextUnstartedTransactionFromAddress failed")
}

sequence, err := eb.getNextSequence(etx.FromAddress)
sequence, err := eb.GetNextSequence(etx.FromAddress)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -763,12 +825,30 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) save
return eb.txStore.UpdateTxFatalError(ctx, etx)
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) getNextSequence(address ADDR) (sequence SEQ, err error) {
return eb.ks.NextSequence(address, eb.chainID)
// Used to get the next usable sequence for a transaction
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetNextSequence(address ADDR) (seq SEQ, err error) {
eb.sequenceLock.Lock()
defer eb.sequenceLock.Unlock()
// Get next sequence from map
seq, exists := eb.nextSequenceMap[address]
if !exists {
return seq, errors.New(fmt.Sprint("address not found in next sequence map: ", address))
}
return seq, nil
}

// Used to increment the sequence in the mapping to have the next usable one available for the next transaction
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) IncrementNextSequence(address ADDR, seq SEQ) {
eb.sequenceLock.Lock()
defer eb.sequenceLock.Unlock()
eb.nextSequenceMap[address] = eb.generateNextSequence(seq)
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) incrementNextSequence(address ADDR, currentSequence SEQ, qopts ...pg.QOpt) error {
return eb.ks.IncrementNextSequence(address, eb.chainID, currentSequence, qopts...)
// Used to set the next sequence explicitly to a certain value
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SetNextSequence(address ADDR, seq SEQ) {
eb.sequenceLock.Lock()
defer eb.sequenceLock.Unlock()
eb.nextSequenceMap[address] = seq
}

func observeTimeUntilBroadcast[CHAIN_ID types.ID](chainID CHAIN_ID, createdAt, broadcastAt time.Time) {
Expand Down
10 changes: 5 additions & 5 deletions common/txmgr/mocks/tx_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/common/types"
)

type SequenceSyncer[ADDR types.Hashable, TX_HASH types.Hashable, BLOCK_HASH types.Hashable] interface {
Sync(ctx context.Context, addr ADDR) (err error)
type SequenceSyncer[ADDR types.Hashable, TX_HASH types.Hashable, BLOCK_HASH types.Hashable, SEQ types.Sequence] interface {
Sync(ctx context.Context, addr ADDR, localSequence SEQ) (SEQ, error)
}
14 changes: 6 additions & 8 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type TxManager[
GetForwarderForEOA(eoa ADDR) (forwarder ADDR, err error)
RegisterResumeCallback(fn ResumeCallback)
SendNativeToken(ctx context.Context, chainID CHAIN_ID, from, to ADDR, value big.Int, gasLimit uint32) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
Reset(f func(), addr ADDR, abandon bool) error
Reset(addr ADDR, abandon bool) error
}

type reset struct {
Expand Down Expand Up @@ -91,7 +91,7 @@ type Txm[
confirmer *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
fwdMgr txmgrtypes.ForwarderManager[ADDR]
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH]
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ]
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RegisterResumeCallback(fn ResumeCallback) {
Expand Down Expand Up @@ -120,7 +120,7 @@ func NewTxm[
fwdMgr txmgrtypes.ForwarderManager[ADDR],
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH],
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ],
broadcaster *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
confirmer *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
resender *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
Expand Down Expand Up @@ -196,13 +196,11 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx
})
}

// Reset stops Broadcaster/Confirmer, executes callback, then starts them
// again
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Reset(callback func(), addr ADDR, abandon bool) (err error) {
// Reset stops Broadcaster/Confirmer, executes callback, then starts them again
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Reset(addr ADDR, abandon bool) (err error) {
ok := b.IfStarted(func() {
done := make(chan error)
f := func() {
callback()
if abandon {
err = b.abandon(addr)
}
Expand Down Expand Up @@ -546,7 +544,7 @@ func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Cre
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetForwarderForEOA(addr ADDR) (fwdr ADDR, err error) {
return fwdr, err
}
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Reset(f func(), addr ADDR, abandon bool) error {
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Reset(addr ADDR, abandon bool) error {
return nil
}

Expand Down
3 changes: 0 additions & 3 deletions common/txmgr/types/keystore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package types

import (
"github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

// KeyStore encompasses the subset of keystore used by txmgr
Expand All @@ -17,8 +16,6 @@ type KeyStore[
SEQ types.Sequence,
] interface {
CheckEnabled(address ADDR, chainID CHAIN_ID) error
NextSequence(address ADDR, chainID CHAIN_ID, qopts ...pg.QOpt) (SEQ, error)
EnabledAddressesForChain(chainId CHAIN_ID) ([]ADDR, error)
IncrementNextSequence(address ADDR, chainID CHAIN_ID, currentSequence SEQ, qopts ...pg.QOpt) error
SubscribeToKeyChanges() (ch chan struct{}, unsub func())
}
Loading

0 comments on commit 4983951

Please sign in to comment.