diff --git a/pkg/solana/txm/pendingtx.go b/pkg/solana/txm/pendingtx.go index 540a1221c..44895fd40 100644 --- a/pkg/solana/txm/pendingtx.go +++ b/pkg/solana/txm/pendingtx.go @@ -47,8 +47,6 @@ type PendingTxContext interface { GetTxState(id string) (TxState, error) // TrimFinalizedErroredTxs removes transactions that have reached their retention time TrimFinalizedErroredTxs() int - // GetTxRebroadcastCount returns the number of times a transaction has been rebroadcasted if found. - GetTxRebroadcastCount(id string) (int, error) } // finishedTx is used to store info required to track transactions to finality or error @@ -57,7 +55,6 @@ type pendingTx struct { cfg TxConfig signatures []solana.Signature id string - rebroadcastCount int createTs time.Time state TxState lastValidBlockHeight uint64 // to track expiration @@ -65,9 +62,8 @@ type pendingTx struct { // finishedTx is used to store minimal info specifically for finalized or errored transactions for external status checks type finishedTx struct { - retentionTs time.Time - state TxState - rebroadcastCount int + retentionTs time.Time + state TxState } var _ PendingTxContext = &pendingTxContext{} @@ -401,9 +397,8 @@ func (c *pendingTxContext) OnFinalized(sig solana.Signature, retentionTimeout ti return id, nil } finalizedTx := finishedTx{ - state: Finalized, - retentionTs: time.Now().Add(retentionTimeout), - rebroadcastCount: tx.rebroadcastCount, + state: Finalized, + retentionTs: time.Now().Add(retentionTimeout), } // move transaction from confirmed to finalized map c.finalizedErroredTxs[id] = finalizedTx @@ -443,9 +438,8 @@ func (c *pendingTxContext) OnPrebroadcastError(id string, retentionTimeout time. return "", ErrIDAlreadyExists } erroredTx := finishedTx{ - state: txState, - retentionTs: time.Now().Add(retentionTimeout), - rebroadcastCount: tx.rebroadcastCount, + state: txState, + retentionTs: time.Now().Add(retentionTimeout), } // add transaction to error map c.finalizedErroredTxs[id] = erroredTx @@ -510,9 +504,8 @@ func (c *pendingTxContext) OnError(sig solana.Signature, retentionTimeout time.D return id, nil } erroredTx := finishedTx{ - state: txState, - retentionTs: time.Now().Add(retentionTimeout), - rebroadcastCount: tx.rebroadcastCount, + state: txState, + retentionTs: time.Now().Add(retentionTimeout), } // move transaction from broadcasted to error map c.finalizedErroredTxs[id] = erroredTx @@ -575,22 +568,6 @@ func (c *pendingTxContext) withWriteLock(fn func() (string, error)) (string, err return fn() } -// GetTxRebroadcastCount returns the number of times a transaction has been rebroadcasted if found. -func (c *pendingTxContext) GetTxRebroadcastCount(id string) (int, error) { - c.lock.RLock() - defer c.lock.RUnlock() - if tx, exists := c.broadcastedProcessedTxs[id]; exists { - return tx.rebroadcastCount, nil - } - if tx, exists := c.confirmedTxs[id]; exists { - return tx.rebroadcastCount, nil - } - if tx, exists := c.finalizedErroredTxs[id]; exists { - return tx.rebroadcastCount, nil - } - return 0, fmt.Errorf("failed to find transaction for id: %s", id) -} - var _ PendingTxContext = &pendingTxContextWithProm{} type pendingTxContextWithProm struct { @@ -705,7 +682,3 @@ func (c *pendingTxContextWithProm) GetTxState(id string) (TxState, error) { func (c *pendingTxContextWithProm) TrimFinalizedErroredTxs() int { return c.pendingTx.TrimFinalizedErroredTxs() } - -func (c *pendingTxContextWithProm) GetTxRebroadcastCount(id string) (int, error) { - return c.pendingTx.GetTxRebroadcastCount(id) -} diff --git a/pkg/solana/txm/txm.go b/pkg/solana/txm/txm.go index 27f2e641e..3ba39f2f5 100644 --- a/pkg/solana/txm/txm.go +++ b/pkg/solana/txm/txm.go @@ -589,10 +589,9 @@ func (txm *Txm) rebroadcastExpiredTxs(ctx context.Context, client client.ReaderW continue } rebroadcastTx := pendingTx{ - tx: tx.tx, - cfg: tx.cfg, - id: tx.id, // using same id in case it was set by caller and we need to maintain it. - rebroadcastCount: tx.rebroadcastCount + 1, + tx: tx.tx, + cfg: tx.cfg, + id: tx.id, // using same id in case it was set by caller and we need to maintain it. } // call sendWithRetry directly to avoid enqueuing _, _, _, sendErr := txm.sendWithRetry(ctx, rebroadcastTx) diff --git a/pkg/solana/txm/txm_internal_test.go b/pkg/solana/txm/txm_internal_test.go index 5eb1d31a0..95017a29e 100644 --- a/pkg/solana/txm/txm_internal_test.go +++ b/pkg/solana/txm/txm_internal_test.go @@ -1362,13 +1362,11 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) { prom.finalized++ prom.assertEqual(t) - // Check that transaction for txID has been finalized and rebroadcasted + // Check that transaction for txID has been finalized and rebroadcasted 1 time. status, err := txm.GetTransactionStatus(ctx, txID) require.NoError(t, err) require.Equal(t, types.Finalized, status) - rebroadcastCount, err := txm.txs.GetTxRebroadcastCount(txID) - require.NoError(t, err) - require.Equal(t, 1, rebroadcastCount) + require.Equal(t, 1, callCount-1) // -1 because the first call is not a rebroadcast }) t.Run("WithoutRebroadcast", func(t *testing.T) { @@ -1376,7 +1374,9 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) { statuses := map[solana.Signature]func() *rpc.SignatureStatusesResult{} // mocking the call within sendWithRetry. Rebroadcast is off, so we won't compare it against the slotHeight. + callCount := 0 latestBlockhashFunc := func() (*rpc.GetLatestBlockhashResult, error) { + defer func() { callCount++ }() return &rpc.GetLatestBlockhashResult{ Value: &rpc.LatestBlockhashResult{ LastValidBlockHeight: uint64(2000), @@ -1419,9 +1419,7 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) { status, err := txm.GetTransactionStatus(ctx, txID) require.NoError(t, err) require.Equal(t, types.Failed, status) - rebroadcastCount, err := txm.txs.GetTxRebroadcastCount(txID) - require.NoError(t, err) - require.Equal(t, 0, rebroadcastCount) + require.Equal(t, 0, callCount-1) // -1 because the first call is not a rebroadcast }) t.Run("WithMultipleRebroadcast", func(t *testing.T) { @@ -1495,13 +1493,11 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) { prom.finalized++ prom.assertEqual(t) - // Check that transaction for txID has been finalized and rebroadcasted + // Check that transaction for txID has been finalized and rebroadcasted multiple times. status, err := txm.GetTransactionStatus(ctx, txID) require.NoError(t, err) require.Equal(t, types.Finalized, status) - rebroadcastCount, err := txm.txs.GetTxRebroadcastCount(txID) - require.NoError(t, err) - require.Equal(t, expectedRebroadcastsCount, rebroadcastCount) + require.Equal(t, expectedRebroadcastsCount, callCount-1) }) t.Run("ConfirmedBeforeRebroadcast", func(t *testing.T) { @@ -1517,7 +1513,10 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) { slotHeightFunc := func() (uint64, error) { return uint64(1500), nil } + + callCount := 0 latestBlockhashFunc := func() (*rpc.GetLatestBlockhashResult, error) { + defer func() { callCount++ }() return &rpc.GetLatestBlockhashResult{ Value: &rpc.LatestBlockhashResult{ LastValidBlockHeight: uint64(1000), @@ -1561,9 +1560,7 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) { status, err := txm.GetTransactionStatus(ctx, txID) require.NoError(t, err) require.Equal(t, types.Finalized, status) - rebroadcastCount, err := txm.txs.GetTxRebroadcastCount(txID) - require.NoError(t, err) - require.Equal(t, 0, rebroadcastCount) + require.Equal(t, 0, callCount-1) // -1 because the first call is not a rebroadcast }) t.Run("RebroadcastWithError", func(t *testing.T) { @@ -1621,12 +1618,10 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) { prom.error++ prom.assertEqual(t) - // Transaction should be moved to failed after trying to rebroadcast and failing to get confirmations + // Transaction should be moved to failed after trying to rebroadcast 1 time. status, err := txm.GetTransactionStatus(ctx, txID) require.NoError(t, err) require.Equal(t, types.Failed, status) - rebroadcastCount, err := txm.txs.GetTxRebroadcastCount(txID) - require.NoError(t, err) - require.Equal(t, 1, rebroadcastCount) + require.Equal(t, 1, callCount-1) // -1 because the first call is not a rebroadcast }) } diff --git a/pkg/solana/txm/txm_load_test.go b/pkg/solana/txm/txm_load_test.go index 3d4941374..aa3d6aac7 100644 --- a/pkg/solana/txm/txm_load_test.go +++ b/pkg/solana/txm/txm_load_test.go @@ -1,6 +1,6 @@ //go:build integration -package txm +package txm_test import ( "context" @@ -16,6 +16,7 @@ import ( solanaClient "github.com/smartcontractkit/chainlink-solana/pkg/solana/client" "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/txm" keyMocks "github.com/smartcontractkit/chainlink-solana/pkg/solana/txm/mocks" relayconfig "github.com/smartcontractkit/chainlink-common/pkg/config" @@ -71,7 +72,7 @@ func TestTxm_Integration(t *testing.T) { client, err := solanaClient.NewClient(url, cfg, 2*time.Second, lggr) require.NoError(t, err) loader := utils.NewLazyLoad(func() (solanaClient.ReaderWriter, error) { return client, nil }) - txm := NewTxm("localnet", loader, nil, cfg, mkey, lggr) + txm := txm.NewTxm("localnet", loader, nil, cfg, mkey, lggr) // track initial balance initBal, err := client.Balance(ctx, pubKey)