From f2ac5a1b3e8a2027fa7063d010d939c57e29f31b Mon Sep 17 00:00:00 2001 From: Farber98 Date: Mon, 18 Nov 2024 19:34:14 -0300 Subject: [PATCH] confirm loop refactor --- pkg/solana/txm/txm.go | 272 ++++++++++++++++++++++-------------------- 1 file changed, 144 insertions(+), 128 deletions(-) diff --git a/pkg/solana/txm/txm.go b/pkg/solana/txm/txm.go index a283afd6a..93d633e52 100644 --- a/pkg/solana/txm/txm.go +++ b/pkg/solana/txm/txm.go @@ -253,6 +253,23 @@ func (txm *Txm) prepareTransaction(ctx context.Context, msg *PendingTx) error { return nil } +func solanaValidateBalance(ctx context.Context, reader client.Reader, from solana.PublicKey, amount uint64, msg string) error { + balance, err := reader.Balance(ctx, from) + if err != nil { + return err + } + + fee, err := reader.GetFeeForMessage(ctx, msg) + if err != nil { + return err + } + + if balance < (amount + fee) { + return fmt.Errorf("balance %d is too low for this transaction to be executed: amount %d + fee %d", balance, amount, fee) + } + return nil +} + // buildTx builds and signs the transaction with the appropriate compute unit price. func (txm *Txm) buildTx(ctx context.Context, msg PendingTx, retryCount int) (solanaGo.Transaction, error) { // work with a copy @@ -435,153 +452,152 @@ func (txm *Txm) confirm() { case <-ctx.Done(): return case <-tick: - // get list of tx signatures to confirm - sigs := txm.txs.ListAll() - - // exit switch if not txs to confirm - if len(sigs) == 0 { - break - } - - // get client client, err := txm.client.Get() if err != nil { - txm.lggr.Errorw("failed to get client in soltxm.confirm", "error", err) - break // exit switch + txm.lggr.Errorw("failed to get client in txm.confirm", "error", err) + return } + txm.processConfirmations(ctx, client) + } + tick = time.After(utils.WithJitter(txm.cfg.ConfirmPollPeriod())) + } +} - // batch sigs no more than MaxSigsToConfirm each - sigsBatch, err := utils.BatchSplit(sigs, MaxSigsToConfirm) - if err != nil { // this should never happen - txm.lggr.Fatalw("failed to batch signatures", "error", err) - break // exit switch - } +func (txm *Txm) processConfirmations(ctx context.Context, client client.ReaderWriter) { + // Get list of transaction signatures to confirm + sigs := txm.txs.ListAll() - // process signatures - processSigs := func(s []solanaGo.Signature, res []*rpc.SignatureStatusesResult) { - // sort signatures and results process successful first - s, res, err := SortSignaturesAndResults(s, res) - if err != nil { - txm.lggr.Errorw("sorting error", "error", err) - return - } + if len(sigs) == 0 { + return + } - for i := 0; i < len(res); i++ { - // if status is nil (sig not found), continue polling - // sig not found could mean invalid tx or not picked up yet - if res[i] == nil { - txm.lggr.Debugw("tx state: not found", - "signature", s[i], - ) - - // check confirm timeout exceeded - if txm.txs.Expired(s[i], txm.cfg.TxConfirmTimeout()) { - id, err := txm.txs.OnError(s[i], txm.cfg.TxRetentionTimeout(), TxFailDrop) - if err != nil { - txm.lggr.Infow("failed to mark transaction as errored", "id", id, "signature", s[i], "timeoutSeconds", txm.cfg.TxConfirmTimeout(), "error", err) - } else { - txm.lggr.Infow("failed to find transaction within confirm timeout", "id", id, "signature", s[i], "timeoutSeconds", txm.cfg.TxConfirmTimeout()) - } - } - continue - } - - // if signature has an error, end polling - if res[i].Err != nil { - id, err := txm.txs.OnError(s[i], txm.cfg.TxRetentionTimeout(), TxFailRevert) - if err != nil { - txm.lggr.Infow("failed to mark transaction as errored", "id", id, "signature", s[i], "error", err) - } else { - txm.lggr.Debugw("tx state: failed", "id", id, "signature", s[i], "error", res[i].Err, "status", res[i].ConfirmationStatus) - } - continue - } - - // if signature is processed, keep polling for confirmed or finalized status - if res[i].ConfirmationStatus == rpc.ConfirmationStatusProcessed { - // update transaction state in local memory - id, err := txm.txs.OnProcessed(s[i]) - if err != nil && !errors.Is(err, ErrAlreadyInExpectedState) { - txm.lggr.Errorw("failed to mark transaction as processed", "signature", s[i], "error", err) - } else if err == nil { - txm.lggr.Debugw("marking transaction as processed", "id", id, "signature", s[i]) - } - // check confirm timeout exceeded if TxConfirmTimeout set - if txm.cfg.TxConfirmTimeout() != 0*time.Second && txm.txs.Expired(s[i], txm.cfg.TxConfirmTimeout()) { - id, err := txm.txs.OnError(s[i], txm.cfg.TxRetentionTimeout(), TxFailDrop) - if err != nil { - txm.lggr.Infow("failed to mark transaction as errored", "id", id, "signature", s[i], "timeoutSeconds", txm.cfg.TxConfirmTimeout(), "error", err) - } else { - txm.lggr.Debugw("tx failed to move beyond 'processed' within confirm timeout", "id", id, "signature", s[i], "timeoutSeconds", txm.cfg.TxConfirmTimeout()) - } - } - continue - } - - // if signature is confirmed, keep polling for finalized status - if res[i].ConfirmationStatus == rpc.ConfirmationStatusConfirmed { - id, err := txm.txs.OnConfirmed(s[i]) - if err != nil && !errors.Is(err, ErrAlreadyInExpectedState) { - txm.lggr.Errorw("failed to mark transaction as confirmed", "id", id, "signature", s[i], "error", err) - } else if err == nil { - txm.lggr.Debugw("marking transaction as confirmed", "id", id, "signature", s[i]) - } - continue - } - - // if signature is finalized, end polling - if res[i].ConfirmationStatus == rpc.ConfirmationStatusFinalized { - id, err := txm.txs.OnFinalized(s[i], txm.cfg.TxRetentionTimeout()) - if err != nil { - txm.lggr.Errorw("failed to mark transaction as finalized", "id", id, "signature", s[i], "error", err) - } else { - txm.lggr.Debugw("marking transaction as finalized", "id", id, "signature", s[i]) - } - continue - } - } - } + // batch sigs no more than MaxSigsToConfirm each + sigsBatch, err := utils.BatchSplit(sigs, MaxSigsToConfirm) + if err != nil { // this should never happen + txm.lggr.Fatalw("failed to batch signatures", "error", err) + return + } - // waitgroup for processing - var wg sync.WaitGroup + var wg sync.WaitGroup + for i := 0; i < len(sigsBatch); i++ { + // fetch signature statuses + statuses, err := client.SignatureStatuses(ctx, sigsBatch[i]) + if err != nil { + txm.lggr.Errorw("failed to get signature statuses in txm.confirm", "error", err) + break // exit for loop + } - // loop through batch - for i := 0; i < len(sigsBatch); i++ { - // fetch signature statuses - statuses, err := client.SignatureStatuses(ctx, sigsBatch[i]) - if err != nil { - txm.lggr.Errorw("failed to get signature statuses in soltxm.confirm", "error", err) - break // exit for loop - } + wg.Add(1) + // nonblocking: process batches as soon as they come in + go func(index int) { + defer wg.Done() + txm.processSignatureStatuses(sigsBatch[i], statuses) + }(i) + } + wg.Wait() // wait for processing to finish +} - wg.Add(1) - // nonblocking: process batches as soon as they come in - go func(index int) { - defer wg.Done() - processSigs(sigsBatch[index], statuses) - }(i) - } - wg.Wait() // wait for processing to finish +func (txm *Txm) processSignatureStatuses(sigs []solanaGo.Signature, res []*rpc.SignatureStatusesResult) { + // Sort signatures and results process successful first + sortedSigs, sortedRes, err := SortSignaturesAndResults(sigs, res) + if err != nil { + txm.lggr.Errorw("sorting error", "error", err) + return + } + + for i := 0; i < len(sortedRes); i++ { + sig, status := sortedSigs[i], sortedRes[i] + // if status is nil (sig not found), continue polling + // sig not found could mean invalid tx or not picked up yet + if status == nil { + txm.handleNotFoundSignatureStatus(sig) + continue + } + + // if signature has an error, end polling + if status.Err != nil { + txm.handleErrorSignatureStatus(sig, status) + continue + } + + switch status.ConfirmationStatus { + case rpc.ConfirmationStatusProcessed: + // if signature is processed, keep polling for confirmed or finalized status + txm.handleProcessedSignatureStatus(sig) + continue + case rpc.ConfirmationStatusConfirmed: + // if signature is confirmed, keep polling for finalized status + txm.handleConfirmedSignatureStatus(sig) + continue + case rpc.ConfirmationStatusFinalized: + // if signature is finalized, end polling + txm.handleFinalizedSignatureStatus(sig) + continue + default: + txm.lggr.Warnw("unknown confirmation status", "signature", sig, "status", status.ConfirmationStatus) + continue } - tick = time.After(utils.WithJitter(txm.cfg.ConfirmPollPeriod())) } } -func solanaValidateBalance(ctx context.Context, reader client.Reader, from solana.PublicKey, amount uint64, msg string) error { - balance, err := reader.Balance(ctx, from) - if err != nil { - return err +func (txm *Txm) handleNotFoundSignatureStatus(sig solanaGo.Signature) { + txm.lggr.Debugw("tx state: not found", "signature", sig) + + // check confirm timeout exceeded + if txm.txs.Expired(sig, txm.cfg.TxConfirmTimeout()) { + id, err := txm.txs.OnError(sig, txm.cfg.TxRetentionTimeout(), TxFailDrop) + if err != nil { + txm.lggr.Infow("failed to mark transaction as errored", "id", id, "signature", sig, "timeoutSeconds", txm.cfg.TxConfirmTimeout(), "error", err) + } else { + txm.lggr.Infow("failed to find transaction within confirm timeout", "id", id, "signature", sig, "timeoutSeconds", txm.cfg.TxConfirmTimeout()) + } } +} - fee, err := reader.GetFeeForMessage(ctx, msg) +func (txm *Txm) handleErrorSignatureStatus(sig solanaGo.Signature, status *rpc.SignatureStatusesResult) { + id, err := txm.txs.OnError(sig, txm.cfg.TxRetentionTimeout(), TxFailRevert) if err != nil { - return err + txm.lggr.Infow("failed to mark transaction as errored", "id", id, "signature", sig, "error", err) + } else { + txm.lggr.Debugw("tx state: failed", "id", id, "signature", sig, "error", status.Err, "status", status.ConfirmationStatus) } +} - if balance < (amount + fee) { - return fmt.Errorf("balance %d is too low for this transaction to be executed: amount %d + fee %d", balance, amount, fee) +func (txm *Txm) handleProcessedSignatureStatus(sig solanaGo.Signature) { + // update transaction state in local memory + id, err := txm.txs.OnProcessed(sig) + if err != nil && !errors.Is(err, ErrAlreadyInExpectedState) { + txm.lggr.Errorw("failed to mark transaction as processed", "signature", sig, "error", err) + } else if err == nil { + txm.lggr.Debugw("marking transaction as processed", "id", id, "signature", sig) + } + // check confirm timeout exceeded if TxConfirmTimeout set + if txm.cfg.TxConfirmTimeout() != 0*time.Second && txm.txs.Expired(sig, txm.cfg.TxConfirmTimeout()) { + id, err := txm.txs.OnError(sig, txm.cfg.TxRetentionTimeout(), TxFailDrop) + if err != nil { + txm.lggr.Infow("failed to mark transaction as errored", "id", id, "signature", sig, "timeoutSeconds", txm.cfg.TxConfirmTimeout(), "error", err) + } else { + txm.lggr.Debugw("tx failed to move beyond 'processed' within confirm timeout", "id", id, "signature", sig, "timeoutSeconds", txm.cfg.TxConfirmTimeout()) + } + } +} + +func (txm *Txm) handleConfirmedSignatureStatus(sig solanaGo.Signature) { + id, err := txm.txs.OnConfirmed(sig) + if err != nil && !errors.Is(err, ErrAlreadyInExpectedState) { + txm.lggr.Errorw("failed to mark transaction as confirmed", "id", id, "signature", sig, "error", err) + } else if err == nil { + txm.lggr.Debugw("marking transaction as confirmed", "id", id, "signature", sig) + } +} + +func (txm *Txm) handleFinalizedSignatureStatus(sig solanaGo.Signature) { + id, err := txm.txs.OnFinalized(sig, txm.cfg.TxRetentionTimeout()) + if err != nil { + txm.lggr.Errorw("failed to mark transaction as finalized", "id", id, "signature", sig, "error", err) + } else { + txm.lggr.Debugw("marking transaction as finalized", "id", id, "signature", sig) } - return nil } // goroutine that simulates tx (use a bounded number of goroutines to pick from queue?)