From 07403c50cf5b436a51c3255753229c1461ee4b9a Mon Sep 17 00:00:00 2001 From: Farber98 Date: Thu, 21 Nov 2024 14:13:35 -0300 Subject: [PATCH] reduce rpc calls + refactors --- pkg/solana/txm/txm.go | 98 ++++++++++++----------------- pkg/solana/txm/txm_internal_test.go | 48 ++------------ pkg/solana/txm/txm_race_test.go | 21 ------- 3 files changed, 45 insertions(+), 122 deletions(-) diff --git a/pkg/solana/txm/txm.go b/pkg/solana/txm/txm.go index ba6cb7dcf..ea82593f3 100644 --- a/pkg/solana/txm/txm.go +++ b/pkg/solana/txm/txm.go @@ -183,24 +183,30 @@ func (txm *Txm) run() { // It prepares the transaction, builds and signs it, sends the initial transaction, and starts a retry routine with fee bumping if needed. // The function returns the signed transaction, its ID, and the initial signature for use in simulation. func (txm *Txm) sendWithRetry(ctx context.Context, msg pendingTx) (solanaGo.Transaction, string, solanaGo.Signature, error) { - // Prepare transaction assigning blockhash and lastValidBlockHeight (for expiration tracking). - // If required, it also performs balanceCheck and sets compute unit limit. - if err := txm.prepareTransaction(ctx, &msg); err != nil { - return solanaGo.Transaction{}, "", solanaGo.Signature{}, err - } - - // Build and sign initial transaction setting compute unit price + // Build and sign initial transaction setting compute unit price and limit initTx, err := txm.buildTx(ctx, msg, 0) if err != nil { return solanaGo.Transaction{}, "", solanaGo.Signature{}, err } + // Send initial transaction ctx, cancel := context.WithTimeout(ctx, msg.cfg.Timeout) - sig, err := txm.sendInitialTx(ctx, initTx, msg, cancel) + sig, err := txm.sendTx(ctx, &initTx) if err != nil { - return solanaGo.Transaction{}, "", solanaGo.Signature{}, err + // Do not retry and exit early if fails + cancel() + txm.txs.OnError(sig, txm.cfg.TxRetentionTimeout(), TxFailReject) //nolint // no need to check error since only incrementing metric here + return solanaGo.Transaction{}, "", solanaGo.Signature{}, fmt.Errorf("tx failed initial transmit: %w", err) + } + + // Store tx signature and cancel function + if err := txm.txs.New(msg, sig, cancel); err != nil { + cancel() // Cancel context when exiting early + return solanaGo.Transaction{}, "", solanaGo.Signature{}, fmt.Errorf("failed to save tx signature (%s) to inflight txs: %w", sig, err) } + txm.lggr.Debugw("tx initial broadcast", "id", msg.id, "fee", msg.cfg.BaseComputeUnitPrice, "signature", sig) + // Initialize signature list with initialTx signature. This list will be used to add new signatures and track retry attempts. sigs := &signatureList{} sigs.Allocate() @@ -219,31 +225,14 @@ func (txm *Txm) sendWithRetry(ctx context.Context, msg pendingTx) (solanaGo.Tran return initTx, msg.id, sig, nil } -// prepareTransaction sets blockhash and lastValidBlockHeight which will be used to track expiration. -// If required, it also performs balanceCheck and sets compute unit limit. -func (txm *Txm) prepareTransaction(ctx context.Context, msg *pendingTx) error { - client, err := txm.client.Get() - if err != nil { - return fmt.Errorf("failed to get client in sendWithRetry: %w", err) - } - blockhash, err := client.LatestBlockhash(ctx) - if err != nil { - return fmt.Errorf("failed to get blockhash: %w", err) - } - msg.tx.Message.RecentBlockhash = blockhash.Value.Blockhash - msg.lastValidBlockHeight = blockhash.Value.LastValidBlockHeight - +// buildTx builds and signs the transaction with the appropriate compute unit price. +func (txm *Txm) buildTx(ctx context.Context, msg pendingTx, retryCount int) (solanaGo.Transaction, error) { + // Set compute unit limit if specified if msg.cfg.ComputeUnitLimit != 0 { if err := fees.SetComputeUnitLimit(&msg.tx, fees.ComputeUnitLimit(msg.cfg.ComputeUnitLimit)); err != nil { - return fmt.Errorf("failed to add compute unit limit instruction: %w", err) + return solanaGo.Transaction{}, fmt.Errorf("failed to add compute unit limit instruction: %w", err) } } - - return nil -} - -// buildTx builds and signs the transaction with the appropriate compute unit price. -func (txm *Txm) buildTx(ctx context.Context, msg pendingTx, retryCount int) (solanaGo.Transaction, error) { // work with a copy newTx := msg.tx @@ -276,27 +265,6 @@ func (txm *Txm) buildTx(ctx context.Context, msg pendingTx, retryCount int) (sol return newTx, nil } -// sendInitialTx sends the initial tx and handles any errors that may occur. It also stores the transaction signature and cancellation function. -func (txm *Txm) sendInitialTx(ctx context.Context, initTx solanaGo.Transaction, msg pendingTx, cancel context.CancelFunc) (solanaGo.Signature, error) { - // Send initial transaction - sig, err := txm.sendTx(ctx, &initTx) - if err != nil { - // do not retry and exit early if fails - cancel() - txm.txs.OnError(sig, txm.cfg.TxRetentionTimeout(), TxFailReject) //nolint // no need to check error since only incrementing metric here - return solanaGo.Signature{}, fmt.Errorf("tx failed initial transmit: %w", err) - } - - // Store tx signature and cancel function - if err := txm.txs.New(msg, sig, cancel); err != nil { - cancel() // cancel context when exiting early - return solanaGo.Signature{}, fmt.Errorf("failed to save tx signature (%s) to inflight txs: %w", sig, err) - } - - txm.lggr.Debugw("tx initial broadcast", "id", msg.id, "fee", msg.cfg.BaseComputeUnitPrice, "signature", sig) - return sig, nil -} - // retryTx contains the logic for retrying the transaction, including exponential backoff and fee bumping. // Retries until context cancelled by timeout or called externally. // It uses handleRetry helper function to handle each retry attempt. @@ -413,9 +381,8 @@ func (txm *Txm) confirm() { case <-ctx.Done(): return case <-tick: - // If no signatures to confirm, we can break loop as there's nothing to process. - sigs := txm.txs.ListAll() - if len(sigs) == 0 { + // If no signatures to confirm and rebroadcast, we can break loop as there's nothing to process. + if txm.InflightTxs() == 0 { break } @@ -424,7 +391,7 @@ func (txm *Txm) confirm() { txm.lggr.Errorw("failed to get client in txm.confirm", "error", err) break } - txm.processConfirmations(ctx, client, sigs) + txm.processConfirmations(ctx, client) if txm.cfg.TxExpirationRebroadcast() { txm.rebroadcastExpiredTxs(ctx, client) } @@ -437,8 +404,8 @@ func (txm *Txm) confirm() { // It splits the signatures into batches, retrieves their statuses with an RPC call, and processes each status accordingly. // The function handles transitions, managing expiration, errors, and transitions between different states like broadcasted, processed, confirmed, and finalized. // It also determines when to end polling based on the status of each signature cancelling the exponential retry. -func (txm *Txm) processConfirmations(ctx context.Context, client client.ReaderWriter, sigs []solanaGo.Signature) { - sigsBatch, err := utils.BatchSplit(sigs, MaxSigsToConfirm) +func (txm *Txm) processConfirmations(ctx context.Context, client client.ReaderWriter) { + sigsBatch, err := utils.BatchSplit(txm.txs.ListAll(), MaxSigsToConfirm) if err != nil { // this should never happen txm.lggr.Fatalw("failed to batch signatures", "error", err) return @@ -586,7 +553,11 @@ func (txm *Txm) rebroadcastExpiredTxs(ctx context.Context, client client.ReaderW txm.lggr.Errorw("failed to get current slot height", "error", err) return } - + blockhash, err := client.LatestBlockhash(ctx) + if err != nil { + txm.lggr.Errorw("failed to get blockhash", "error", err) + return + } // Rebroadcast all expired txes for _, tx := range txm.txs.ListAllExpiredBroadcastedTxs(currHeight) { txm.lggr.Infow("transaction expired, rebroadcasting", "id", tx.id, "signature", tx.signatures) @@ -599,9 +570,17 @@ func (txm *Txm) rebroadcastExpiredTxs(ctx context.Context, client client.ReaderW txm.lggr.Errorw("failed to remove expired transaction", "id", tx.id, "error", err) continue } + // Overwrite blockhash and lastValidBlockHeight with latest values so that the transaction can be rebroadcasted an accepted. + tx.tx.Message.RecentBlockhash = blockhash.Value.Blockhash + rebroadcastTx := pendingTx{ + tx: tx.tx, + id: tx.id, + rebroadcastCount: tx.rebroadcastCount + 1, + lastValidBlockHeight: blockhash.Value.LastValidBlockHeight, + } // call sendWithRetry directly to avoid enqueuing // using same id in case it was set by caller and we need to maintain it. - _, _, _, err = txm.sendWithRetry(ctx, pendingTx{tx: tx.tx, id: tx.id, rebroadcastCount: tx.rebroadcastCount + 1}) + _, _, _, err = txm.sendWithRetry(ctx, rebroadcastTx) if err != nil { // TODO: add prebroadcast error handling when merged https://github.com/smartcontractkit/chainlink-solana/pull/936 txm.lggr.Errorw("failed to rebroadcast transaction", "id", tx.id, "error", err) @@ -859,6 +838,7 @@ func (txm *Txm) processSimulationError(id string, sig solanaGo.Signature, res *r } } +// InflightTxs returns the number of signatures being tracked for all transactions not yet finalized or errored func (txm *Txm) InflightTxs() int { return len(txm.txs.ListAll()) } diff --git a/pkg/solana/txm/txm_internal_test.go b/pkg/solana/txm/txm_internal_test.go index b6454af81..6fb044471 100644 --- a/pkg/solana/txm/txm_internal_test.go +++ b/pkg/solana/txm/txm_internal_test.go @@ -126,11 +126,6 @@ func TestTxm(t *testing.T) { cfg := config.NewDefault() cfg.Chain.FeeEstimatorMode = &estimator mc := mocks.NewReaderWriter(t) - mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{ - Value: &rpc.LatestBlockhashResult{ - LastValidBlockHeight: uint64(2000), - }, - }, nil) mc.On("GetLatestBlock", mock.Anything).Return(&rpc.GetBlockResult{}, nil).Maybe() mc.On("SlotHeight", mock.Anything).Return(uint64(0), nil).Maybe() @@ -768,11 +763,6 @@ func TestTxm_disabled_confirm_timeout_with_retention(t *testing.T) { // Enable retention timeout to keep transactions after finality cfg.Chain.TxRetentionTimeout = relayconfig.MustNewDuration(5 * time.Second) mc := mocks.NewReaderWriter(t) - mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{ - Value: &rpc.LatestBlockhashResult{ - LastValidBlockHeight: uint64(2000), - }, - }, nil) mc.On("GetLatestBlock", mock.Anything).Return(&rpc.GetBlockResult{}, nil).Maybe() mc.On("SlotHeight", mock.Anything).Return(uint64(0), nil).Maybe() @@ -886,11 +876,6 @@ func TestTxm_compute_unit_limit_estimation(t *testing.T) { // Enable retention timeout to keep transactions after finality or error cfg.Chain.TxRetentionTimeout = relayconfig.MustNewDuration(5 * time.Second) mc := mocks.NewReaderWriter(t) - mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{ - Value: &rpc.LatestBlockhashResult{ - LastValidBlockHeight: uint64(2000), - }, - }, nil) mc.On("GetLatestBlock", mock.Anything).Return(&rpc.GetBlockResult{}, nil).Maybe() mc.On("SlotHeight", mock.Anything).Return(uint64(0), nil).Maybe() @@ -1017,11 +1002,6 @@ func TestTxm_Enqueue(t *testing.T) { lggr := logger.Test(t) cfg := config.NewDefault() mc := mocks.NewReaderWriter(t) - mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{ - Value: &rpc.LatestBlockhashResult{ - LastValidBlockHeight: uint64(2000), - }, - }, nil) mc.On("SendTx", mock.Anything, mock.Anything).Return(solana.Signature{}, nil).Maybe() mc.On("SimulateTx", mock.Anything, mock.Anything, mock.Anything).Return(&rpc.SimulateTransactionResult{}, nil).Maybe() mc.On("SignatureStatuses", mock.Anything, mock.AnythingOfType("[]solana.Signature")).Return( @@ -1113,37 +1093,24 @@ func addSigAndLimitToTx(t *testing.T, keystore SimpleKeystore, pubkey solana.Pub func TestTxm_ExpirationRebroadcast(t *testing.T) { t.Parallel() - // Set up configurations estimator := "fixed" id := "mocknet-" + estimator + "-" + uuid.NewString() t.Logf("Starting new iteration: %s", id) - ctx := tests.Context(t) lggr := logger.Test(t) cfg := config.NewDefault() cfg.Chain.FeeEstimatorMode = &estimator - - // Enable TxExpirationRebroadcast txExpirationRebroadcast := true - cfg.Chain.TxExpirationRebroadcast = &txExpirationRebroadcast + cfg.Chain.TxExpirationRebroadcast = &txExpirationRebroadcast // enable expiration rebroadcast cfg.Chain.TxConfirmTimeout = relayconfig.MustNewDuration(5 * time.Second) - // Enable retention timeout to keep transactions after finality so we can check. - cfg.Chain.TxRetentionTimeout = relayconfig.MustNewDuration(15 * time.Second) + cfg.Chain.TxRetentionTimeout = relayconfig.MustNewDuration(10 * time.Second) // Enable retention to keep transactions after finality and be able to check. mc := mocks.NewReaderWriter(t) - // Set up LatestBlockhash to return different LastValidBlockHeight values - latestBlockhashCallCount := 0 + // First blockhash is set on sender. Second blockhash (the one returned here) is set on txExpirationRebroadcast before rebroadcasting. + // The first one will be invalid as it's initialized in 0 by default. This call will get a valid one greater than slotHeight and go through. mc.On("LatestBlockhash", mock.Anything).Return(func(_ context.Context) (*rpc.GetLatestBlockhashResult, error) { - latestBlockhashCallCount++ - if latestBlockhashCallCount == 1 { - return &rpc.GetLatestBlockhashResult{ - Value: &rpc.LatestBlockhashResult{ - LastValidBlockHeight: uint64(1000), - }, - }, nil - } return &rpc.GetLatestBlockhashResult{ Value: &rpc.LatestBlockhashResult{ LastValidBlockHeight: uint64(2000), @@ -1151,7 +1118,7 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) { }, nil }).Maybe() - // Set up SlotHeight to return a value greater than the initial LastValidBlockHeight + // Set up SlotHeight to return a value greater than 0 so the initial LastValidBlockHeight is invalid. mc.On("SlotHeight", mock.Anything).Return(uint64(1500), nil).Maybe() mkey := keyMocks.NewSimpleKeystore(t) mkey.On("Sign", mock.Anything, mock.Anything, mock.Anything).Return([]byte{}, nil) @@ -1159,7 +1126,6 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) { txm := NewTxm(id, loader, nil, cfg, mkey, lggr) require.NoError(t, txm.Start(ctx)) t.Cleanup(func() { require.NoError(t, txm.Close()) }) - sig1 := randomSignature(t) mc.On("SendTx", mock.Anything, mock.Anything).Return(sig1, nil).Maybe() mc.On("SimulateTx", mock.Anything, mock.Anything, mock.Anything).Return(&rpc.SimulateTransactionResult{}, nil).Maybe() @@ -1205,13 +1171,11 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) { } } } - tx, _ := getTx(t, 0, mkey) txID := "test" - assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &txID), SetTimeout(10*time.Second)) + assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &txID)) // Will create a expired transaction as lastValidBlockHeight is 0 by default. wg.Wait() time.Sleep(2 * time.Second) // Sleep to allow for rebroadcasting - // Check that transaction for txID has been finalized and rebroadcasted status, err := txm.GetTransactionStatus(ctx, txID) require.NoError(t, err) diff --git a/pkg/solana/txm/txm_race_test.go b/pkg/solana/txm/txm_race_test.go index f008023d0..ea175e63b 100644 --- a/pkg/solana/txm/txm_race_test.go +++ b/pkg/solana/txm/txm_race_test.go @@ -9,7 +9,6 @@ import ( "time" solanaGo "github.com/gagliardetto/solana-go" - "github.com/gagliardetto/solana-go/rpc" "go.uber.org/zap/zapcore" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -117,11 +116,6 @@ func TestTxm_SendWithRetry_Race(t *testing.T) { }, nil, ) - client.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{ - Value: &rpc.LatestBlockhashResult{ - LastValidBlockHeight: uint64(2000), - }, - }, nil) testRunner(t, client) }) @@ -158,11 +152,6 @@ func TestTxm_SendWithRetry_Race(t *testing.T) { }, nil, ) - client.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{ - Value: &rpc.LatestBlockhashResult{ - LastValidBlockHeight: uint64(2000), - }, - }, nil) testRunner(t, client) }) @@ -210,11 +199,6 @@ func TestTxm_SendWithRetry_Race(t *testing.T) { }, nil, ) - client.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{ - Value: &rpc.LatestBlockhashResult{ - LastValidBlockHeight: uint64(2000), - }, - }, nil) testRunner(t, client) }) @@ -249,11 +233,6 @@ func TestTxm_SendWithRetry_Race(t *testing.T) { require.NoError(t, fees.SetComputeUnitLimit(&msg3.tx, 200_000)) msg3.tx.Signatures = make([]solanaGo.Signature, 1) client.On("SendTx", mock.Anything, &msg3.tx).Return(solanaGo.Signature{4}, nil) - client.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{ - Value: &rpc.LatestBlockhashResult{ - LastValidBlockHeight: uint64(2000), - }, - }, nil) testRunner(t, client) }) }