Skip to content

Commit

Permalink
confirm loop refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Farber98 committed Nov 18, 2024
1 parent 47aad6f commit f2ac5a1
Showing 1 changed file with 144 additions and 128 deletions.
272 changes: 144 additions & 128 deletions pkg/solana/txm/txm.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,23 @@ func (txm *Txm) prepareTransaction(ctx context.Context, msg *PendingTx) error {
return nil
}

func solanaValidateBalance(ctx context.Context, reader client.Reader, from solana.PublicKey, amount uint64, msg string) error {
balance, err := reader.Balance(ctx, from)
if err != nil {
return err
}

fee, err := reader.GetFeeForMessage(ctx, msg)
if err != nil {
return err
}

if balance < (amount + fee) {
return fmt.Errorf("balance %d is too low for this transaction to be executed: amount %d + fee %d", balance, amount, fee)
}
return nil
}

// buildTx builds and signs the transaction with the appropriate compute unit price.
func (txm *Txm) buildTx(ctx context.Context, msg PendingTx, retryCount int) (solanaGo.Transaction, error) {
// work with a copy
Expand Down Expand Up @@ -435,153 +452,152 @@ func (txm *Txm) confirm() {
case <-ctx.Done():
return
case <-tick:
// get list of tx signatures to confirm
sigs := txm.txs.ListAll()

// exit switch if not txs to confirm
if len(sigs) == 0 {
break
}

// get client
client, err := txm.client.Get()
if err != nil {
txm.lggr.Errorw("failed to get client in soltxm.confirm", "error", err)
break // exit switch
txm.lggr.Errorw("failed to get client in txm.confirm", "error", err)
return
}
txm.processConfirmations(ctx, client)
}
tick = time.After(utils.WithJitter(txm.cfg.ConfirmPollPeriod()))
}
}

// batch sigs no more than MaxSigsToConfirm each
sigsBatch, err := utils.BatchSplit(sigs, MaxSigsToConfirm)
if err != nil { // this should never happen
txm.lggr.Fatalw("failed to batch signatures", "error", err)
break // exit switch
}
func (txm *Txm) processConfirmations(ctx context.Context, client client.ReaderWriter) {
// Get list of transaction signatures to confirm
sigs := txm.txs.ListAll()

// process signatures
processSigs := func(s []solanaGo.Signature, res []*rpc.SignatureStatusesResult) {
// sort signatures and results process successful first
s, res, err := SortSignaturesAndResults(s, res)
if err != nil {
txm.lggr.Errorw("sorting error", "error", err)
return
}
if len(sigs) == 0 {
return
}

for i := 0; i < len(res); i++ {
// if status is nil (sig not found), continue polling
// sig not found could mean invalid tx or not picked up yet
if res[i] == nil {
txm.lggr.Debugw("tx state: not found",
"signature", s[i],
)

// check confirm timeout exceeded
if txm.txs.Expired(s[i], txm.cfg.TxConfirmTimeout()) {
id, err := txm.txs.OnError(s[i], txm.cfg.TxRetentionTimeout(), TxFailDrop)
if err != nil {
txm.lggr.Infow("failed to mark transaction as errored", "id", id, "signature", s[i], "timeoutSeconds", txm.cfg.TxConfirmTimeout(), "error", err)
} else {
txm.lggr.Infow("failed to find transaction within confirm timeout", "id", id, "signature", s[i], "timeoutSeconds", txm.cfg.TxConfirmTimeout())
}
}
continue
}

// if signature has an error, end polling
if res[i].Err != nil {
id, err := txm.txs.OnError(s[i], txm.cfg.TxRetentionTimeout(), TxFailRevert)
if err != nil {
txm.lggr.Infow("failed to mark transaction as errored", "id", id, "signature", s[i], "error", err)
} else {
txm.lggr.Debugw("tx state: failed", "id", id, "signature", s[i], "error", res[i].Err, "status", res[i].ConfirmationStatus)
}
continue
}

// if signature is processed, keep polling for confirmed or finalized status
if res[i].ConfirmationStatus == rpc.ConfirmationStatusProcessed {
// update transaction state in local memory
id, err := txm.txs.OnProcessed(s[i])
if err != nil && !errors.Is(err, ErrAlreadyInExpectedState) {
txm.lggr.Errorw("failed to mark transaction as processed", "signature", s[i], "error", err)
} else if err == nil {
txm.lggr.Debugw("marking transaction as processed", "id", id, "signature", s[i])
}
// check confirm timeout exceeded if TxConfirmTimeout set
if txm.cfg.TxConfirmTimeout() != 0*time.Second && txm.txs.Expired(s[i], txm.cfg.TxConfirmTimeout()) {
id, err := txm.txs.OnError(s[i], txm.cfg.TxRetentionTimeout(), TxFailDrop)
if err != nil {
txm.lggr.Infow("failed to mark transaction as errored", "id", id, "signature", s[i], "timeoutSeconds", txm.cfg.TxConfirmTimeout(), "error", err)
} else {
txm.lggr.Debugw("tx failed to move beyond 'processed' within confirm timeout", "id", id, "signature", s[i], "timeoutSeconds", txm.cfg.TxConfirmTimeout())
}
}
continue
}

// if signature is confirmed, keep polling for finalized status
if res[i].ConfirmationStatus == rpc.ConfirmationStatusConfirmed {
id, err := txm.txs.OnConfirmed(s[i])
if err != nil && !errors.Is(err, ErrAlreadyInExpectedState) {
txm.lggr.Errorw("failed to mark transaction as confirmed", "id", id, "signature", s[i], "error", err)
} else if err == nil {
txm.lggr.Debugw("marking transaction as confirmed", "id", id, "signature", s[i])
}
continue
}

// if signature is finalized, end polling
if res[i].ConfirmationStatus == rpc.ConfirmationStatusFinalized {
id, err := txm.txs.OnFinalized(s[i], txm.cfg.TxRetentionTimeout())
if err != nil {
txm.lggr.Errorw("failed to mark transaction as finalized", "id", id, "signature", s[i], "error", err)
} else {
txm.lggr.Debugw("marking transaction as finalized", "id", id, "signature", s[i])
}
continue
}
}
}
// batch sigs no more than MaxSigsToConfirm each
sigsBatch, err := utils.BatchSplit(sigs, MaxSigsToConfirm)
if err != nil { // this should never happen
txm.lggr.Fatalw("failed to batch signatures", "error", err)
return
}

// waitgroup for processing
var wg sync.WaitGroup
var wg sync.WaitGroup
for i := 0; i < len(sigsBatch); i++ {
// fetch signature statuses
statuses, err := client.SignatureStatuses(ctx, sigsBatch[i])
if err != nil {
txm.lggr.Errorw("failed to get signature statuses in txm.confirm", "error", err)
break // exit for loop
}

// loop through batch
for i := 0; i < len(sigsBatch); i++ {
// fetch signature statuses
statuses, err := client.SignatureStatuses(ctx, sigsBatch[i])
if err != nil {
txm.lggr.Errorw("failed to get signature statuses in soltxm.confirm", "error", err)
break // exit for loop
}
wg.Add(1)
// nonblocking: process batches as soon as they come in
go func(index int) {
defer wg.Done()
txm.processSignatureStatuses(sigsBatch[i], statuses)
}(i)
}
wg.Wait() // wait for processing to finish
}

wg.Add(1)
// nonblocking: process batches as soon as they come in
go func(index int) {
defer wg.Done()
processSigs(sigsBatch[index], statuses)
}(i)
}
wg.Wait() // wait for processing to finish
func (txm *Txm) processSignatureStatuses(sigs []solanaGo.Signature, res []*rpc.SignatureStatusesResult) {
// Sort signatures and results process successful first
sortedSigs, sortedRes, err := SortSignaturesAndResults(sigs, res)
if err != nil {
txm.lggr.Errorw("sorting error", "error", err)
return
}

for i := 0; i < len(sortedRes); i++ {
sig, status := sortedSigs[i], sortedRes[i]
// if status is nil (sig not found), continue polling
// sig not found could mean invalid tx or not picked up yet
if status == nil {
txm.handleNotFoundSignatureStatus(sig)
continue
}

// if signature has an error, end polling
if status.Err != nil {
txm.handleErrorSignatureStatus(sig, status)
continue
}

switch status.ConfirmationStatus {
case rpc.ConfirmationStatusProcessed:
// if signature is processed, keep polling for confirmed or finalized status
txm.handleProcessedSignatureStatus(sig)
continue
case rpc.ConfirmationStatusConfirmed:
// if signature is confirmed, keep polling for finalized status
txm.handleConfirmedSignatureStatus(sig)
continue
case rpc.ConfirmationStatusFinalized:
// if signature is finalized, end polling
txm.handleFinalizedSignatureStatus(sig)
continue
default:
txm.lggr.Warnw("unknown confirmation status", "signature", sig, "status", status.ConfirmationStatus)
continue
}
tick = time.After(utils.WithJitter(txm.cfg.ConfirmPollPeriod()))
}
}

func solanaValidateBalance(ctx context.Context, reader client.Reader, from solana.PublicKey, amount uint64, msg string) error {
balance, err := reader.Balance(ctx, from)
if err != nil {
return err
func (txm *Txm) handleNotFoundSignatureStatus(sig solanaGo.Signature) {
txm.lggr.Debugw("tx state: not found", "signature", sig)

// check confirm timeout exceeded
if txm.txs.Expired(sig, txm.cfg.TxConfirmTimeout()) {
id, err := txm.txs.OnError(sig, txm.cfg.TxRetentionTimeout(), TxFailDrop)
if err != nil {
txm.lggr.Infow("failed to mark transaction as errored", "id", id, "signature", sig, "timeoutSeconds", txm.cfg.TxConfirmTimeout(), "error", err)
} else {
txm.lggr.Infow("failed to find transaction within confirm timeout", "id", id, "signature", sig, "timeoutSeconds", txm.cfg.TxConfirmTimeout())
}
}
}

fee, err := reader.GetFeeForMessage(ctx, msg)
func (txm *Txm) handleErrorSignatureStatus(sig solanaGo.Signature, status *rpc.SignatureStatusesResult) {
id, err := txm.txs.OnError(sig, txm.cfg.TxRetentionTimeout(), TxFailRevert)
if err != nil {
return err
txm.lggr.Infow("failed to mark transaction as errored", "id", id, "signature", sig, "error", err)
} else {
txm.lggr.Debugw("tx state: failed", "id", id, "signature", sig, "error", status.Err, "status", status.ConfirmationStatus)
}
}

if balance < (amount + fee) {
return fmt.Errorf("balance %d is too low for this transaction to be executed: amount %d + fee %d", balance, amount, fee)
func (txm *Txm) handleProcessedSignatureStatus(sig solanaGo.Signature) {
// update transaction state in local memory
id, err := txm.txs.OnProcessed(sig)
if err != nil && !errors.Is(err, ErrAlreadyInExpectedState) {
txm.lggr.Errorw("failed to mark transaction as processed", "signature", sig, "error", err)
} else if err == nil {
txm.lggr.Debugw("marking transaction as processed", "id", id, "signature", sig)
}
// check confirm timeout exceeded if TxConfirmTimeout set
if txm.cfg.TxConfirmTimeout() != 0*time.Second && txm.txs.Expired(sig, txm.cfg.TxConfirmTimeout()) {
id, err := txm.txs.OnError(sig, txm.cfg.TxRetentionTimeout(), TxFailDrop)
if err != nil {
txm.lggr.Infow("failed to mark transaction as errored", "id", id, "signature", sig, "timeoutSeconds", txm.cfg.TxConfirmTimeout(), "error", err)
} else {
txm.lggr.Debugw("tx failed to move beyond 'processed' within confirm timeout", "id", id, "signature", sig, "timeoutSeconds", txm.cfg.TxConfirmTimeout())
}
}
}

func (txm *Txm) handleConfirmedSignatureStatus(sig solanaGo.Signature) {
id, err := txm.txs.OnConfirmed(sig)
if err != nil && !errors.Is(err, ErrAlreadyInExpectedState) {
txm.lggr.Errorw("failed to mark transaction as confirmed", "id", id, "signature", sig, "error", err)
} else if err == nil {
txm.lggr.Debugw("marking transaction as confirmed", "id", id, "signature", sig)
}
}

func (txm *Txm) handleFinalizedSignatureStatus(sig solanaGo.Signature) {
id, err := txm.txs.OnFinalized(sig, txm.cfg.TxRetentionTimeout())
if err != nil {
txm.lggr.Errorw("failed to mark transaction as finalized", "id", id, "signature", sig, "error", err)
} else {
txm.lggr.Debugw("marking transaction as finalized", "id", id, "signature", sig)
}
return nil
}

// goroutine that simulates tx (use a bounded number of goroutines to pick from queue?)
Expand Down

0 comments on commit f2ac5a1

Please sign in to comment.