Skip to content

Commit

Permalink
move state from txes to sigs
Browse files Browse the repository at this point in the history
  • Loading branch information
Farber98 committed Nov 30, 2024
1 parent a6ce47b commit 3a6e643
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 32 deletions.
80 changes: 58 additions & 22 deletions pkg/solana/txm/pendingtx.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ type pendingTx struct {
signatures []solana.Signature
id string
createTs time.Time
state TxState
lastValidBlockHeight uint64 // to track expiration
}

Expand Down Expand Up @@ -129,7 +128,6 @@ func (c *pendingTxContext) New(tx pendingTx, sig solana.Signature, cancel contex
// add signature to tx
tx.signatures = append(tx.signatures, sig)
tx.createTs = time.Now()
tx.state = Broadcasted
// save to the broadcasted map since transaction was just broadcasted
c.broadcastedProcessedTxs[tx.id] = tx
return "", nil
Expand Down Expand Up @@ -237,7 +235,7 @@ func (c *pendingTxContext) ListAllExpiredBroadcastedTxs(currHeight uint64) []pen
defer c.lock.RUnlock()
broadcastedTxes := make([]pendingTx, 0, len(c.broadcastedProcessedTxs)) // worst case, all of them
for _, tx := range c.broadcastedProcessedTxs {
if tx.state == Broadcasted && tx.lastValidBlockHeight < currHeight {
if tx.lastValidBlockHeight < currHeight {
broadcastedTxes = append(broadcastedTxes, tx)
}
}
Expand Down Expand Up @@ -273,12 +271,12 @@ func (c *pendingTxContext) OnProcessed(sig solana.Signature) (string, error) {
return ErrSigDoesNotExist
}
// Transactions should only move to processed from broadcasted
tx, exists := c.broadcastedProcessedTxs[info.id]
_, exists := c.broadcastedProcessedTxs[info.id]
if !exists {
return ErrTransactionNotFound
}
// Check if tranasction already in processed state
if tx.state == Processed {
// Check if sig already in processed state
if info.state == Processed {
return ErrAlreadyInExpectedState
}
return nil
Expand All @@ -297,8 +295,8 @@ func (c *pendingTxContext) OnProcessed(sig solana.Signature) (string, error) {
if !exists {
return info.id, ErrTransactionNotFound
}
// update sig and tx to Processed
info.state, tx.state = Processed, Processed
// update sig to Processed
info.state = Processed
// save updated sig and tx back to the maps
c.sigToTxInfo[sig] = info
c.broadcastedProcessedTxs[info.id] = tx
Expand All @@ -313,8 +311,8 @@ func (c *pendingTxContext) OnConfirmed(sig solana.Signature) (string, error) {
if !sigExists {
return ErrSigDoesNotExist
}
// Check if transaction already in confirmed state
if tx, exists := c.confirmedTxs[info.id]; exists && tx.state == Confirmed {
// Check if sig already in confirmed state
if _, exists := c.confirmedTxs[info.id]; exists && info.state == Confirmed {
return ErrAlreadyInExpectedState
}
// Transactions should only move to confirmed from broadcasted/processed
Expand Down Expand Up @@ -343,7 +341,7 @@ func (c *pendingTxContext) OnConfirmed(sig solana.Signature) (string, error) {
delete(c.cancelBy, info.id)
}
// update sig and tx state to Confirmed
info.state, tx.state = Confirmed, Confirmed
info.state = Confirmed
c.sigToTxInfo[sig] = info
// move tx to confirmed map
c.confirmedTxs[info.id] = tx
Expand Down Expand Up @@ -524,21 +522,58 @@ func (c *pendingTxContext) OnError(sig solana.Signature, retentionTimeout time.D
})
}

// GetTxState retrieves the aggregated state of a transaction based on all its signatures.
// It performs state aggregation only for transactions in broadcastedProcessedTxs or confirmedTxs.
// For transactions in finalizedErroredTxs, it directly returns the stored state.
func (c *pendingTxContext) GetTxState(id string) (TxState, error) {
c.lock.RLock()
defer c.lock.RUnlock()

// Check if the transaction exists in broadcastedProcessedTxs
if tx, exists := c.broadcastedProcessedTxs[id]; exists {
return tx.state, nil
return c.aggregateTxState(tx), nil
}

// Check if the transaction exists in confirmedTxs
if tx, exists := c.confirmedTxs[id]; exists {
return tx.state, nil
return c.aggregateTxState(tx), nil
}

// Check if the transaction exists in finalizedErroredTxs
if tx, exists := c.finalizedErroredTxs[id]; exists {
return tx.state, nil
}

// Transaction not found in any map
return NotFound, fmt.Errorf("failed to find transaction for id: %s", id)
}

// aggregateTxState determines the highest TxState among all signatures of a pending transaction.
func (c *pendingTxContext) aggregateTxState(tx pendingTx) TxState {
// Define the priority of states
statePriority := map[TxState]int{
Broadcasted: 1,
Processed: 2,
Confirmed: 3,
}

// Update highestState based on individual signature states
highestState := Broadcasted
for _, sig := range tx.signatures {
info, exists := c.sigToTxInfo[sig]
if !exists {
continue
}
if priority, ok := statePriority[info.state]; ok {
if priority > statePriority[highestState] {
highestState = info.state
}
}
}

return highestState
}

// TrimFinalizedErroredTxs deletes transactions from the finalized/errored map and the allTxs map after the retention period has passed
func (c *pendingTxContext) TrimFinalizedErroredTxs() int {
var expiredIDs []string
Expand Down Expand Up @@ -587,12 +622,11 @@ func (c *pendingTxContext) OnReorg(sig solana.Signature) (pendingTx, error) {
return ErrSigDoesNotExist
}

// Check if the transaction is still in a non finalized/errored state
var broadcastedExists, confirmedExists bool
_, broadcastedExists = c.broadcastedProcessedTxs[info.id]
_, confirmedExists = c.confirmedTxs[info.id]
if !broadcastedExists && !confirmedExists {
return ErrTransactionNotFound
// Check if the transaction is still in a non-finalized/non-errored state
if _, exists := c.broadcastedProcessedTxs[info.id]; !exists {
if _, exists := c.confirmedTxs[info.id]; !exists {
return ErrTransactionNotFound
}
}
return nil
})
Expand All @@ -609,6 +643,8 @@ func (c *pendingTxContext) OnReorg(sig solana.Signature) (pendingTx, error) {
if !exists {
return "", ErrSigDoesNotExist
}

// Attempt to find the transaction in the broadcasted or confirmed maps
var tx pendingTx
var broadcastedExists, confirmedExists bool
if tx, broadcastedExists = c.broadcastedProcessedTxs[info.id]; broadcastedExists {
Expand All @@ -618,12 +654,12 @@ func (c *pendingTxContext) OnReorg(sig solana.Signature) (pendingTx, error) {
pTx = tx
}
if !broadcastedExists && !confirmedExists {
// transcation does not exist in any non finalized/errored maps
// transaction does not exist in any non finalized/errored maps
return "", ErrTransactionNotFound
}

// Reset the signature status and tx for retrying
info.state, pTx.state = Broadcasted, Broadcasted
// Reset the signature status for retrying
info.state = Broadcasted
c.sigToTxInfo[sig] = info
return "", nil
})
Expand Down
6 changes: 3 additions & 3 deletions pkg/solana/txm/pendingtx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestPendingTxContext_new(t *testing.T) {
require.Equal(t, sig, tx.signatures[0])

// Check status is Broadcasted
require.Equal(t, Broadcasted, tx.state)
require.Equal(t, Broadcasted, txInfo.state)

// Check it does not exist in confirmed map
_, exists = txs.confirmedTxs[msg.id]
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestPendingTxContext_on_broadcasted_processed(t *testing.T) {
require.Equal(t, sig, tx.signatures[0])

// Check status is Processed
require.Equal(t, Processed, tx.state)
require.Equal(t, Processed, txInfo.state)

// Check it does not exist in confirmed map
_, exists = txs.confirmedTxs[msg.id]
Expand Down Expand Up @@ -361,7 +361,7 @@ func TestPendingTxContext_on_confirmed(t *testing.T) {
require.Equal(t, sig, tx.signatures[0])

// Check status is Confirmed
require.Equal(t, Confirmed, tx.state)
require.Equal(t, Confirmed, txInfo.state)

// Check it does not exist in finalized map
_, exists = txs.finalizedErroredTxs[msg.id]
Expand Down
9 changes: 5 additions & 4 deletions pkg/solana/txm/txm.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (txm *Txm) sendWithRetry(ctx context.Context, msg pendingTx) (solanaGo.Tran
txm.done.Add(1)
go func() {
defer txm.done.Done()
txm.retryTx(ctx, msg, initTx, sig)
txm.retryTx(ctx, msg, initTx, sig, func() {})
}()

// Return signed tx, id, signature for use in simulation
Expand Down Expand Up @@ -279,7 +279,8 @@ func (txm *Txm) buildTx(ctx context.Context, msg pendingTx, retryCount int) (sol
// retryTx contains the logic for retrying the transaction, including exponential backoff and fee bumping.
// Retries until context cancelled by timeout or called externally.
// It uses handleRetry helper function to handle each retry attempt.
func (txm *Txm) retryTx(ctx context.Context, msg pendingTx, currentTx solanaGo.Transaction, sig solanaGo.Signature) {
func (txm *Txm) retryTx(ctx context.Context, msg pendingTx, currentTx solanaGo.Transaction, sig solanaGo.Signature, cancel context.CancelFunc) {
defer cancel()
// Initialize signature list with initialTx signature. This list will be used to add new signatures and track retry attempts.
sigs := &signatureList{}
sigs.Allocate()
Expand Down Expand Up @@ -550,11 +551,11 @@ func (txm *Txm) handleReorg(ctx context.Context, sig solanaGo.Signature, status
txm.lggr.Errorw("failed to handle potential re-org", "signature", sig, "id", pTx.id, "error", err)
return err
}
retryCtx, _ := context.WithTimeout(ctx, pTx.cfg.Timeout) // TODO: Ask here. How should we handle the ctx?
retryCtx, cancel := context.WithTimeout(ctx, pTx.cfg.Timeout) // TODO: How should we handle the ctx?
txm.done.Add(1)
go func() {
defer txm.done.Done()
txm.retryTx(retryCtx, pTx, pTx.tx, sig)
txm.retryTx(retryCtx, pTx, pTx.tx, sig, cancel)
}()
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/solana/txm/txm_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1503,8 +1503,7 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) {
return sig1, nil
}

// Mock LatestBlockhash to return an invalid blockhash less than slotHeight
// We won't use it as there will be no rebroadcasts txes to process. All txes will be confirmed before.
// There will be no rebroadcasts txes to process
slotHeightFunc := func() (uint64, error) {
return uint64(1500), nil
}
Expand All @@ -1514,7 +1513,7 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) {
defer func() { callCount++ }()
return &rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: uint64(1000),
LastValidBlockHeight: uint64(2000),
},
}, nil
}
Expand Down

0 comments on commit 3a6e643

Please sign in to comment.