Skip to content

Commit

Permalink
reduce rpc calls + refactors
Browse files Browse the repository at this point in the history
  • Loading branch information
Farber98 committed Nov 21, 2024
1 parent da4dc2f commit 07403c5
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 122 deletions.
98 changes: 39 additions & 59 deletions pkg/solana/txm/txm.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,24 +183,30 @@ func (txm *Txm) run() {
// It prepares the transaction, builds and signs it, sends the initial transaction, and starts a retry routine with fee bumping if needed.
// The function returns the signed transaction, its ID, and the initial signature for use in simulation.
func (txm *Txm) sendWithRetry(ctx context.Context, msg pendingTx) (solanaGo.Transaction, string, solanaGo.Signature, error) {
// Prepare transaction assigning blockhash and lastValidBlockHeight (for expiration tracking).
// If required, it also performs balanceCheck and sets compute unit limit.
if err := txm.prepareTransaction(ctx, &msg); err != nil {
return solanaGo.Transaction{}, "", solanaGo.Signature{}, err
}

// Build and sign initial transaction setting compute unit price
// Build and sign initial transaction setting compute unit price and limit
initTx, err := txm.buildTx(ctx, msg, 0)
if err != nil {
return solanaGo.Transaction{}, "", solanaGo.Signature{}, err
}

// Send initial transaction
ctx, cancel := context.WithTimeout(ctx, msg.cfg.Timeout)
sig, err := txm.sendInitialTx(ctx, initTx, msg, cancel)
sig, err := txm.sendTx(ctx, &initTx)
if err != nil {
return solanaGo.Transaction{}, "", solanaGo.Signature{}, err
// Do not retry and exit early if fails
cancel()
txm.txs.OnError(sig, txm.cfg.TxRetentionTimeout(), TxFailReject) //nolint // no need to check error since only incrementing metric here
return solanaGo.Transaction{}, "", solanaGo.Signature{}, fmt.Errorf("tx failed initial transmit: %w", err)
}

// Store tx signature and cancel function
if err := txm.txs.New(msg, sig, cancel); err != nil {
cancel() // Cancel context when exiting early
return solanaGo.Transaction{}, "", solanaGo.Signature{}, fmt.Errorf("failed to save tx signature (%s) to inflight txs: %w", sig, err)
}

txm.lggr.Debugw("tx initial broadcast", "id", msg.id, "fee", msg.cfg.BaseComputeUnitPrice, "signature", sig)

// Initialize signature list with initialTx signature. This list will be used to add new signatures and track retry attempts.
sigs := &signatureList{}
sigs.Allocate()
Expand All @@ -219,31 +225,14 @@ func (txm *Txm) sendWithRetry(ctx context.Context, msg pendingTx) (solanaGo.Tran
return initTx, msg.id, sig, nil
}

// prepareTransaction sets blockhash and lastValidBlockHeight which will be used to track expiration.
// If required, it also performs balanceCheck and sets compute unit limit.
func (txm *Txm) prepareTransaction(ctx context.Context, msg *pendingTx) error {
client, err := txm.client.Get()
if err != nil {
return fmt.Errorf("failed to get client in sendWithRetry: %w", err)
}
blockhash, err := client.LatestBlockhash(ctx)
if err != nil {
return fmt.Errorf("failed to get blockhash: %w", err)
}
msg.tx.Message.RecentBlockhash = blockhash.Value.Blockhash
msg.lastValidBlockHeight = blockhash.Value.LastValidBlockHeight

// buildTx builds and signs the transaction with the appropriate compute unit price.
func (txm *Txm) buildTx(ctx context.Context, msg pendingTx, retryCount int) (solanaGo.Transaction, error) {
// Set compute unit limit if specified
if msg.cfg.ComputeUnitLimit != 0 {
if err := fees.SetComputeUnitLimit(&msg.tx, fees.ComputeUnitLimit(msg.cfg.ComputeUnitLimit)); err != nil {
return fmt.Errorf("failed to add compute unit limit instruction: %w", err)
return solanaGo.Transaction{}, fmt.Errorf("failed to add compute unit limit instruction: %w", err)
}
}

return nil
}

// buildTx builds and signs the transaction with the appropriate compute unit price.
func (txm *Txm) buildTx(ctx context.Context, msg pendingTx, retryCount int) (solanaGo.Transaction, error) {
// work with a copy
newTx := msg.tx

Expand Down Expand Up @@ -276,27 +265,6 @@ func (txm *Txm) buildTx(ctx context.Context, msg pendingTx, retryCount int) (sol
return newTx, nil
}

// sendInitialTx sends the initial tx and handles any errors that may occur. It also stores the transaction signature and cancellation function.
func (txm *Txm) sendInitialTx(ctx context.Context, initTx solanaGo.Transaction, msg pendingTx, cancel context.CancelFunc) (solanaGo.Signature, error) {
// Send initial transaction
sig, err := txm.sendTx(ctx, &initTx)
if err != nil {
// do not retry and exit early if fails
cancel()
txm.txs.OnError(sig, txm.cfg.TxRetentionTimeout(), TxFailReject) //nolint // no need to check error since only incrementing metric here
return solanaGo.Signature{}, fmt.Errorf("tx failed initial transmit: %w", err)
}

// Store tx signature and cancel function
if err := txm.txs.New(msg, sig, cancel); err != nil {
cancel() // cancel context when exiting early
return solanaGo.Signature{}, fmt.Errorf("failed to save tx signature (%s) to inflight txs: %w", sig, err)
}

txm.lggr.Debugw("tx initial broadcast", "id", msg.id, "fee", msg.cfg.BaseComputeUnitPrice, "signature", sig)
return sig, nil
}

// retryTx contains the logic for retrying the transaction, including exponential backoff and fee bumping.
// Retries until context cancelled by timeout or called externally.
// It uses handleRetry helper function to handle each retry attempt.
Expand Down Expand Up @@ -413,9 +381,8 @@ func (txm *Txm) confirm() {
case <-ctx.Done():
return
case <-tick:
// If no signatures to confirm, we can break loop as there's nothing to process.
sigs := txm.txs.ListAll()
if len(sigs) == 0 {
// If no signatures to confirm and rebroadcast, we can break loop as there's nothing to process.
if txm.InflightTxs() == 0 {
break
}

Expand All @@ -424,7 +391,7 @@ func (txm *Txm) confirm() {
txm.lggr.Errorw("failed to get client in txm.confirm", "error", err)
break
}
txm.processConfirmations(ctx, client, sigs)
txm.processConfirmations(ctx, client)
if txm.cfg.TxExpirationRebroadcast() {
txm.rebroadcastExpiredTxs(ctx, client)
}
Expand All @@ -437,8 +404,8 @@ func (txm *Txm) confirm() {
// It splits the signatures into batches, retrieves their statuses with an RPC call, and processes each status accordingly.
// The function handles transitions, managing expiration, errors, and transitions between different states like broadcasted, processed, confirmed, and finalized.
// It also determines when to end polling based on the status of each signature cancelling the exponential retry.
func (txm *Txm) processConfirmations(ctx context.Context, client client.ReaderWriter, sigs []solanaGo.Signature) {
sigsBatch, err := utils.BatchSplit(sigs, MaxSigsToConfirm)
func (txm *Txm) processConfirmations(ctx context.Context, client client.ReaderWriter) {
sigsBatch, err := utils.BatchSplit(txm.txs.ListAll(), MaxSigsToConfirm)
if err != nil { // this should never happen
txm.lggr.Fatalw("failed to batch signatures", "error", err)
return
Expand Down Expand Up @@ -586,7 +553,11 @@ func (txm *Txm) rebroadcastExpiredTxs(ctx context.Context, client client.ReaderW
txm.lggr.Errorw("failed to get current slot height", "error", err)
return
}

blockhash, err := client.LatestBlockhash(ctx)
if err != nil {
txm.lggr.Errorw("failed to get blockhash", "error", err)
return
}
// Rebroadcast all expired txes
for _, tx := range txm.txs.ListAllExpiredBroadcastedTxs(currHeight) {
txm.lggr.Infow("transaction expired, rebroadcasting", "id", tx.id, "signature", tx.signatures)
Expand All @@ -599,9 +570,17 @@ func (txm *Txm) rebroadcastExpiredTxs(ctx context.Context, client client.ReaderW
txm.lggr.Errorw("failed to remove expired transaction", "id", tx.id, "error", err)
continue
}
// Overwrite blockhash and lastValidBlockHeight with latest values so that the transaction can be rebroadcasted an accepted.
tx.tx.Message.RecentBlockhash = blockhash.Value.Blockhash
rebroadcastTx := pendingTx{
tx: tx.tx,
id: tx.id,
rebroadcastCount: tx.rebroadcastCount + 1,
lastValidBlockHeight: blockhash.Value.LastValidBlockHeight,
}
// call sendWithRetry directly to avoid enqueuing
// using same id in case it was set by caller and we need to maintain it.
_, _, _, err = txm.sendWithRetry(ctx, pendingTx{tx: tx.tx, id: tx.id, rebroadcastCount: tx.rebroadcastCount + 1})
_, _, _, err = txm.sendWithRetry(ctx, rebroadcastTx)
if err != nil {
// TODO: add prebroadcast error handling when merged https://github.com/smartcontractkit/chainlink-solana/pull/936
txm.lggr.Errorw("failed to rebroadcast transaction", "id", tx.id, "error", err)
Expand Down Expand Up @@ -859,6 +838,7 @@ func (txm *Txm) processSimulationError(id string, sig solanaGo.Signature, res *r
}
}

// InflightTxs returns the number of signatures being tracked for all transactions not yet finalized or errored
func (txm *Txm) InflightTxs() int {
return len(txm.txs.ListAll())
}
Expand Down
48 changes: 6 additions & 42 deletions pkg/solana/txm/txm_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,6 @@ func TestTxm(t *testing.T) {
cfg := config.NewDefault()
cfg.Chain.FeeEstimatorMode = &estimator
mc := mocks.NewReaderWriter(t)
mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: uint64(2000),
},
}, nil)
mc.On("GetLatestBlock", mock.Anything).Return(&rpc.GetBlockResult{}, nil).Maybe()
mc.On("SlotHeight", mock.Anything).Return(uint64(0), nil).Maybe()

Expand Down Expand Up @@ -768,11 +763,6 @@ func TestTxm_disabled_confirm_timeout_with_retention(t *testing.T) {
// Enable retention timeout to keep transactions after finality
cfg.Chain.TxRetentionTimeout = relayconfig.MustNewDuration(5 * time.Second)
mc := mocks.NewReaderWriter(t)
mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: uint64(2000),
},
}, nil)
mc.On("GetLatestBlock", mock.Anything).Return(&rpc.GetBlockResult{}, nil).Maybe()
mc.On("SlotHeight", mock.Anything).Return(uint64(0), nil).Maybe()

Expand Down Expand Up @@ -886,11 +876,6 @@ func TestTxm_compute_unit_limit_estimation(t *testing.T) {
// Enable retention timeout to keep transactions after finality or error
cfg.Chain.TxRetentionTimeout = relayconfig.MustNewDuration(5 * time.Second)
mc := mocks.NewReaderWriter(t)
mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: uint64(2000),
},
}, nil)
mc.On("GetLatestBlock", mock.Anything).Return(&rpc.GetBlockResult{}, nil).Maybe()
mc.On("SlotHeight", mock.Anything).Return(uint64(0), nil).Maybe()

Expand Down Expand Up @@ -1017,11 +1002,6 @@ func TestTxm_Enqueue(t *testing.T) {
lggr := logger.Test(t)
cfg := config.NewDefault()
mc := mocks.NewReaderWriter(t)
mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: uint64(2000),
},
}, nil)
mc.On("SendTx", mock.Anything, mock.Anything).Return(solana.Signature{}, nil).Maybe()
mc.On("SimulateTx", mock.Anything, mock.Anything, mock.Anything).Return(&rpc.SimulateTransactionResult{}, nil).Maybe()
mc.On("SignatureStatuses", mock.Anything, mock.AnythingOfType("[]solana.Signature")).Return(
Expand Down Expand Up @@ -1113,53 +1093,39 @@ func addSigAndLimitToTx(t *testing.T, keystore SimpleKeystore, pubkey solana.Pub

func TestTxm_ExpirationRebroadcast(t *testing.T) {
t.Parallel()

// Set up configurations
estimator := "fixed"
id := "mocknet-" + estimator + "-" + uuid.NewString()
t.Logf("Starting new iteration: %s", id)

ctx := tests.Context(t)
lggr := logger.Test(t)
cfg := config.NewDefault()
cfg.Chain.FeeEstimatorMode = &estimator

// Enable TxExpirationRebroadcast
txExpirationRebroadcast := true
cfg.Chain.TxExpirationRebroadcast = &txExpirationRebroadcast
cfg.Chain.TxExpirationRebroadcast = &txExpirationRebroadcast // enable expiration rebroadcast
cfg.Chain.TxConfirmTimeout = relayconfig.MustNewDuration(5 * time.Second)
// Enable retention timeout to keep transactions after finality so we can check.
cfg.Chain.TxRetentionTimeout = relayconfig.MustNewDuration(15 * time.Second)
cfg.Chain.TxRetentionTimeout = relayconfig.MustNewDuration(10 * time.Second) // Enable retention to keep transactions after finality and be able to check.

mc := mocks.NewReaderWriter(t)

// Set up LatestBlockhash to return different LastValidBlockHeight values
latestBlockhashCallCount := 0
// First blockhash is set on sender. Second blockhash (the one returned here) is set on txExpirationRebroadcast before rebroadcasting.
// The first one will be invalid as it's initialized in 0 by default. This call will get a valid one greater than slotHeight and go through.
mc.On("LatestBlockhash", mock.Anything).Return(func(_ context.Context) (*rpc.GetLatestBlockhashResult, error) {
latestBlockhashCallCount++
if latestBlockhashCallCount == 1 {
return &rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: uint64(1000),
},
}, nil
}
return &rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: uint64(2000),
},
}, nil
}).Maybe()

// Set up SlotHeight to return a value greater than the initial LastValidBlockHeight
// Set up SlotHeight to return a value greater than 0 so the initial LastValidBlockHeight is invalid.
mc.On("SlotHeight", mock.Anything).Return(uint64(1500), nil).Maybe()
mkey := keyMocks.NewSimpleKeystore(t)
mkey.On("Sign", mock.Anything, mock.Anything, mock.Anything).Return([]byte{}, nil)
loader := utils.NewLazyLoad(func() (client.ReaderWriter, error) { return mc, nil })
txm := NewTxm(id, loader, nil, cfg, mkey, lggr)
require.NoError(t, txm.Start(ctx))
t.Cleanup(func() { require.NoError(t, txm.Close()) })

sig1 := randomSignature(t)
mc.On("SendTx", mock.Anything, mock.Anything).Return(sig1, nil).Maybe()
mc.On("SimulateTx", mock.Anything, mock.Anything, mock.Anything).Return(&rpc.SimulateTransactionResult{}, nil).Maybe()
Expand Down Expand Up @@ -1205,13 +1171,11 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) {
}
}
}

tx, _ := getTx(t, 0, mkey)
txID := "test"
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &txID), SetTimeout(10*time.Second))
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &txID)) // Will create a expired transaction as lastValidBlockHeight is 0 by default.
wg.Wait()
time.Sleep(2 * time.Second) // Sleep to allow for rebroadcasting

// Check that transaction for txID has been finalized and rebroadcasted
status, err := txm.GetTransactionStatus(ctx, txID)
require.NoError(t, err)
Expand Down
21 changes: 0 additions & 21 deletions pkg/solana/txm/txm_race_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

solanaGo "github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
"go.uber.org/zap/zapcore"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
Expand Down Expand Up @@ -117,11 +116,6 @@ func TestTxm_SendWithRetry_Race(t *testing.T) {
},
nil,
)
client.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: uint64(2000),
},
}, nil)
testRunner(t, client)
})

Expand Down Expand Up @@ -158,11 +152,6 @@ func TestTxm_SendWithRetry_Race(t *testing.T) {
},
nil,
)
client.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: uint64(2000),
},
}, nil)
testRunner(t, client)
})

Expand Down Expand Up @@ -210,11 +199,6 @@ func TestTxm_SendWithRetry_Race(t *testing.T) {
},
nil,
)
client.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: uint64(2000),
},
}, nil)
testRunner(t, client)
})

Expand Down Expand Up @@ -249,11 +233,6 @@ func TestTxm_SendWithRetry_Race(t *testing.T) {
require.NoError(t, fees.SetComputeUnitLimit(&msg3.tx, 200_000))
msg3.tx.Signatures = make([]solanaGo.Signature, 1)
client.On("SendTx", mock.Anything, &msg3.tx).Return(solanaGo.Signature{4}, nil)
client.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: uint64(2000),
},
}, nil)
testRunner(t, client)
})
}

0 comments on commit 07403c5

Please sign in to comment.