Skip to content

Commit

Permalink
remove rebroadcast count and fix package
Browse files Browse the repository at this point in the history
  • Loading branch information
Farber98 committed Nov 27, 2024
1 parent 56a64da commit 9148d7d
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 59 deletions.
43 changes: 8 additions & 35 deletions pkg/solana/txm/pendingtx.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ type PendingTxContext interface {
GetTxState(id string) (TxState, error)
// TrimFinalizedErroredTxs removes transactions that have reached their retention time
TrimFinalizedErroredTxs() int
// GetTxRebroadcastCount returns the number of times a transaction has been rebroadcasted if found.
GetTxRebroadcastCount(id string) (int, error)
}

// finishedTx is used to store info required to track transactions to finality or error
Expand All @@ -57,17 +55,15 @@ type pendingTx struct {
cfg TxConfig
signatures []solana.Signature
id string
rebroadcastCount int
createTs time.Time
state TxState
lastValidBlockHeight uint64 // to track expiration
}

// finishedTx is used to store minimal info specifically for finalized or errored transactions for external status checks
type finishedTx struct {
retentionTs time.Time
state TxState
rebroadcastCount int
retentionTs time.Time
state TxState
}

var _ PendingTxContext = &pendingTxContext{}
Expand Down Expand Up @@ -401,9 +397,8 @@ func (c *pendingTxContext) OnFinalized(sig solana.Signature, retentionTimeout ti
return id, nil
}
finalizedTx := finishedTx{
state: Finalized,
retentionTs: time.Now().Add(retentionTimeout),
rebroadcastCount: tx.rebroadcastCount,
state: Finalized,
retentionTs: time.Now().Add(retentionTimeout),
}
// move transaction from confirmed to finalized map
c.finalizedErroredTxs[id] = finalizedTx
Expand Down Expand Up @@ -443,9 +438,8 @@ func (c *pendingTxContext) OnPrebroadcastError(id string, retentionTimeout time.
return "", ErrIDAlreadyExists
}
erroredTx := finishedTx{
state: txState,
retentionTs: time.Now().Add(retentionTimeout),
rebroadcastCount: tx.rebroadcastCount,
state: txState,
retentionTs: time.Now().Add(retentionTimeout),
}
// add transaction to error map
c.finalizedErroredTxs[id] = erroredTx
Expand Down Expand Up @@ -510,9 +504,8 @@ func (c *pendingTxContext) OnError(sig solana.Signature, retentionTimeout time.D
return id, nil
}
erroredTx := finishedTx{
state: txState,
retentionTs: time.Now().Add(retentionTimeout),
rebroadcastCount: tx.rebroadcastCount,
state: txState,
retentionTs: time.Now().Add(retentionTimeout),
}
// move transaction from broadcasted to error map
c.finalizedErroredTxs[id] = erroredTx
Expand Down Expand Up @@ -575,22 +568,6 @@ func (c *pendingTxContext) withWriteLock(fn func() (string, error)) (string, err
return fn()
}

// GetTxRebroadcastCount returns the number of times a transaction has been rebroadcasted if found.
func (c *pendingTxContext) GetTxRebroadcastCount(id string) (int, error) {
c.lock.RLock()
defer c.lock.RUnlock()
if tx, exists := c.broadcastedProcessedTxs[id]; exists {
return tx.rebroadcastCount, nil
}
if tx, exists := c.confirmedTxs[id]; exists {
return tx.rebroadcastCount, nil
}
if tx, exists := c.finalizedErroredTxs[id]; exists {
return tx.rebroadcastCount, nil
}
return 0, fmt.Errorf("failed to find transaction for id: %s", id)
}

var _ PendingTxContext = &pendingTxContextWithProm{}

type pendingTxContextWithProm struct {
Expand Down Expand Up @@ -705,7 +682,3 @@ func (c *pendingTxContextWithProm) GetTxState(id string) (TxState, error) {
func (c *pendingTxContextWithProm) TrimFinalizedErroredTxs() int {
return c.pendingTx.TrimFinalizedErroredTxs()
}

func (c *pendingTxContextWithProm) GetTxRebroadcastCount(id string) (int, error) {
return c.pendingTx.GetTxRebroadcastCount(id)
}
7 changes: 3 additions & 4 deletions pkg/solana/txm/txm.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,10 +589,9 @@ func (txm *Txm) rebroadcastExpiredTxs(ctx context.Context, client client.ReaderW
continue
}
rebroadcastTx := pendingTx{
tx: tx.tx,
cfg: tx.cfg,
id: tx.id, // using same id in case it was set by caller and we need to maintain it.
rebroadcastCount: tx.rebroadcastCount + 1,
tx: tx.tx,
cfg: tx.cfg,
id: tx.id, // using same id in case it was set by caller and we need to maintain it.
}
// call sendWithRetry directly to avoid enqueuing
_, _, _, sendErr := txm.sendWithRetry(ctx, rebroadcastTx)
Expand Down
31 changes: 13 additions & 18 deletions pkg/solana/txm/txm_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1362,21 +1362,21 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) {
prom.finalized++
prom.assertEqual(t)

// Check that transaction for txID has been finalized and rebroadcasted
// Check that transaction for txID has been finalized and rebroadcasted 1 time.
status, err := txm.GetTransactionStatus(ctx, txID)
require.NoError(t, err)
require.Equal(t, types.Finalized, status)
rebroadcastCount, err := txm.txs.GetTxRebroadcastCount(txID)
require.NoError(t, err)
require.Equal(t, 1, rebroadcastCount)
require.Equal(t, 1, callCount-1) // -1 because the first call is not a rebroadcast
})

t.Run("WithoutRebroadcast", func(t *testing.T) {
txExpirationRebroadcast := false
statuses := map[solana.Signature]func() *rpc.SignatureStatusesResult{}

// mocking the call within sendWithRetry. Rebroadcast is off, so we won't compare it against the slotHeight.
callCount := 0
latestBlockhashFunc := func() (*rpc.GetLatestBlockhashResult, error) {
defer func() { callCount++ }()
return &rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: uint64(2000),
Expand Down Expand Up @@ -1419,9 +1419,7 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) {
status, err := txm.GetTransactionStatus(ctx, txID)
require.NoError(t, err)
require.Equal(t, types.Failed, status)
rebroadcastCount, err := txm.txs.GetTxRebroadcastCount(txID)
require.NoError(t, err)
require.Equal(t, 0, rebroadcastCount)
require.Equal(t, 0, callCount-1) // -1 because the first call is not a rebroadcast
})

t.Run("WithMultipleRebroadcast", func(t *testing.T) {
Expand Down Expand Up @@ -1495,13 +1493,11 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) {
prom.finalized++
prom.assertEqual(t)

// Check that transaction for txID has been finalized and rebroadcasted
// Check that transaction for txID has been finalized and rebroadcasted multiple times.
status, err := txm.GetTransactionStatus(ctx, txID)
require.NoError(t, err)
require.Equal(t, types.Finalized, status)
rebroadcastCount, err := txm.txs.GetTxRebroadcastCount(txID)
require.NoError(t, err)
require.Equal(t, expectedRebroadcastsCount, rebroadcastCount)
require.Equal(t, expectedRebroadcastsCount, callCount-1)
})

t.Run("ConfirmedBeforeRebroadcast", func(t *testing.T) {
Expand All @@ -1517,7 +1513,10 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) {
slotHeightFunc := func() (uint64, error) {
return uint64(1500), nil
}

callCount := 0
latestBlockhashFunc := func() (*rpc.GetLatestBlockhashResult, error) {
defer func() { callCount++ }()
return &rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: uint64(1000),
Expand Down Expand Up @@ -1561,9 +1560,7 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) {
status, err := txm.GetTransactionStatus(ctx, txID)
require.NoError(t, err)
require.Equal(t, types.Finalized, status)
rebroadcastCount, err := txm.txs.GetTxRebroadcastCount(txID)
require.NoError(t, err)
require.Equal(t, 0, rebroadcastCount)
require.Equal(t, 0, callCount-1) // -1 because the first call is not a rebroadcast
})

t.Run("RebroadcastWithError", func(t *testing.T) {
Expand Down Expand Up @@ -1621,12 +1618,10 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) {
prom.error++
prom.assertEqual(t)

// Transaction should be moved to failed after trying to rebroadcast and failing to get confirmations
// Transaction should be moved to failed after trying to rebroadcast 1 time.
status, err := txm.GetTransactionStatus(ctx, txID)
require.NoError(t, err)
require.Equal(t, types.Failed, status)
rebroadcastCount, err := txm.txs.GetTxRebroadcastCount(txID)
require.NoError(t, err)
require.Equal(t, 1, rebroadcastCount)
require.Equal(t, 1, callCount-1) // -1 because the first call is not a rebroadcast
})
}
5 changes: 3 additions & 2 deletions pkg/solana/txm/txm_load_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//go:build integration

package txm
package txm_test

import (
"context"
Expand All @@ -16,6 +16,7 @@ import (

solanaClient "github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/txm"
keyMocks "github.com/smartcontractkit/chainlink-solana/pkg/solana/txm/mocks"

relayconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
Expand Down Expand Up @@ -71,7 +72,7 @@ func TestTxm_Integration(t *testing.T) {
client, err := solanaClient.NewClient(url, cfg, 2*time.Second, lggr)
require.NoError(t, err)
loader := utils.NewLazyLoad(func() (solanaClient.ReaderWriter, error) { return client, nil })
txm := NewTxm("localnet", loader, nil, cfg, mkey, lggr)
txm := txm.NewTxm("localnet", loader, nil, cfg, mkey, lggr)

// track initial balance
initBal, err := client.Balance(ctx, pubKey)
Expand Down

0 comments on commit 9148d7d

Please sign in to comment.