Skip to content

Commit

Permalink
tests + check to save rpc calls
Browse files Browse the repository at this point in the history
  • Loading branch information
Farber98 committed Nov 21, 2024
1 parent 07403c5 commit 6e982c0
Show file tree
Hide file tree
Showing 3 changed files with 357 additions and 75 deletions.
21 changes: 20 additions & 1 deletion pkg/solana/txm/pendingtx.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type PendingTxContext interface {
ListAll() []solana.Signature
// ListAllExpiredBroadcastedTxs returns all the txes that are in broadcasted state and have expired for given slot height compared against their lastValidBlockHeight.
ListAllExpiredBroadcastedTxs(currHeight uint64) []pendingTx
// ListAllBroadcastedTxs returns all the txes that are in broadcasted state.
ListAllBroadcastedTxs() []pendingTx
// Expired returns whether or not confirmation timeout amount of time has passed since creation
Expired(sig solana.Signature, confirmationTimeout time.Duration) bool
// OnProcessed marks transactions as Processed
Expand Down Expand Up @@ -215,7 +217,7 @@ func (c *pendingTxContext) ListAll() []solana.Signature {
return maps.Keys(c.sigToID)
}

// ListAllExpiredBroadcastedTxs returns all the expired broadcasted that are in broadcasted state and have expired for given slot height.
// ListAllExpiredBroadcastedTxs returns all the txes that are in broadcasted state and have expired for given slot height compared against their lastValidBlockHeight.
func (c *pendingTxContext) ListAllExpiredBroadcastedTxs(currHeight uint64) []pendingTx {
c.lock.RLock()
defer c.lock.RUnlock()
Expand All @@ -228,6 +230,19 @@ func (c *pendingTxContext) ListAllExpiredBroadcastedTxs(currHeight uint64) []pen
return broadcastedTxes
}

// ListAllBroadcastedTxs returns all the txes that are in broadcasted state.
func (c *pendingTxContext) ListAllBroadcastedTxs() []pendingTx {
c.lock.RLock()
defer c.lock.RUnlock()
broadcastedTxes := make([]pendingTx, 0, len(c.broadcastedProcessedTxs)) // worst case, all of them
for _, tx := range c.broadcastedProcessedTxs {
if tx.state == Broadcasted {
broadcastedTxes = append(broadcastedTxes, tx)
}
}
return broadcastedTxes
}

// Expired returns if the timeout for trying to confirm a signature has been reached
func (c *pendingTxContext) Expired(sig solana.Signature, confirmationTimeout time.Duration) bool {
c.lock.RLock()
Expand Down Expand Up @@ -638,3 +653,7 @@ func (c *pendingTxContextWithProm) TrimFinalizedErroredTxs() {
func (c *pendingTxContextWithProm) GetTxRebroadcastCount(id string) (int, error) {
return c.pendingTx.GetTxRebroadcastCount(id)
}

func (c *pendingTxContextWithProm) ListAllBroadcastedTxs() []pendingTx {
return c.pendingTx.ListAllBroadcastedTxs()
}
6 changes: 6 additions & 0 deletions pkg/solana/txm/txm.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,12 @@ func (txm *Txm) confirm() {
break
}
txm.processConfirmations(ctx, client)

// In case all txes where confirmed and there's nothing to rebroadcast.
// This check saves making 2 RPC calls (slot height + blockhash) when there's nothing to process.
if len(txm.txs.ListAllBroadcastedTxs()) == 0 {
break
}
if txm.cfg.TxExpirationRebroadcast() {
txm.rebroadcastExpiredTxs(ctx, client)
}
Expand Down
Loading

0 comments on commit 6e982c0

Please sign in to comment.