From 3ad2bc831a788f6b44c47b7fa8b87f6da4a4c595 Mon Sep 17 00:00:00 2001 From: Farber98 Date: Fri, 29 Nov 2024 12:55:12 -0300 Subject: [PATCH] move things arround + add reorg detection --- pkg/solana/txm/pendingtx.go | 245 ++++++++++++++++++++++++++---------- pkg/solana/txm/txm.go | 60 +++++++-- pkg/solana/txm/utils.go | 28 +++++ 3 files changed, 261 insertions(+), 72 deletions(-) diff --git a/pkg/solana/txm/pendingtx.go b/pkg/solana/txm/pendingtx.go index f5cd214b4..0f8d5bef1 100644 --- a/pkg/solana/txm/pendingtx.go +++ b/pkg/solana/txm/pendingtx.go @@ -47,6 +47,12 @@ type PendingTxContext interface { GetTxState(id string) (TxState, error) // TrimFinalizedErroredTxs removes transactions that have reached their retention time TrimFinalizedErroredTxs() int + // GetSignatureInfo returns the transaction ID and TxState for the provided signature + GetSignatureInfo(sig solana.Signature) (txInfo, error) + // UpdateSignatureStatus updates the status of the provided signature within sigToTxInfo map + UpdateSignatureStatus(sig solana.Signature, newStatus TxState) (string, error) + // OnReorg resets the transaction state to Broadcasted for the given signature and returns the pendingTx. + OnReorg(sig solana.Signature) (pendingTx, error) } // finishedTx is used to store info required to track transactions to finality or error @@ -174,12 +180,12 @@ func (c *pendingTxContext) AddSignature(id string, sig solana.Signature) error { func (c *pendingTxContext) Remove(sig solana.Signature) (id string, err error) { err = c.withReadLock(func() error { // check if already removed - txInfo, sigExists := c.sigToTxInfo[sig] + info, sigExists := c.sigToTxInfo[sig] if !sigExists { return ErrSigDoesNotExist } - _, broadcastedIDExists := c.broadcastedProcessedTxs[txInfo.id] - _, confirmedIDExists := c.confirmedTxs[txInfo.id] + _, broadcastedIDExists := c.broadcastedProcessedTxs[info.id] + _, confirmedIDExists := c.confirmedTxs[info.id] // transcation does not exist in tx maps if !broadcastedIDExists && !confirmedIDExists { return ErrTransactionNotFound @@ -192,31 +198,31 @@ func (c *pendingTxContext) Remove(sig solana.Signature) (id string, err error) { // upgrade to write lock if sig does not exist return c.withWriteLock(func() (string, error) { - txInfo, sigExists := c.sigToTxInfo[sig] + info, sigExists := c.sigToTxInfo[sig] if !sigExists { - return txInfo.id, ErrSigDoesNotExist + return info.id, ErrSigDoesNotExist } var tx pendingTx - if tempTx, exists := c.broadcastedProcessedTxs[txInfo.id]; exists { + if tempTx, exists := c.broadcastedProcessedTxs[info.id]; exists { tx = tempTx - delete(c.broadcastedProcessedTxs, txInfo.id) + delete(c.broadcastedProcessedTxs, info.id) } - if tempTx, exists := c.confirmedTxs[txInfo.id]; exists { + if tempTx, exists := c.confirmedTxs[info.id]; exists { tx = tempTx - delete(c.confirmedTxs, txInfo.id) + delete(c.confirmedTxs, info.id) } // call cancel func + remove from map - if cancel, exists := c.cancelBy[txInfo.id]; exists { + if cancel, exists := c.cancelBy[info.id]; exists { cancel() // cancel context - delete(c.cancelBy, txInfo.id) + delete(c.cancelBy, info.id) } // remove all signatures associated with transaction from sig map for _, s := range tx.signatures { delete(c.sigToTxInfo, s) } - return txInfo.id, nil + return info.id, nil }) } @@ -248,14 +254,14 @@ func (c *pendingTxContext) Expired(sig solana.Signature, confirmationTimeout tim if confirmationTimeout == 0 { return false } - txInfo, exists := c.sigToTxInfo[sig] + info, exists := c.sigToTxInfo[sig] if !exists { return false // return expired = false if timestamp does not exist (likely cleaned up by something else previously) } - if tx, exists := c.broadcastedProcessedTxs[txInfo.id]; exists { + if tx, exists := c.broadcastedProcessedTxs[info.id]; exists { return time.Since(tx.createTs) > confirmationTimeout } - if tx, exists := c.confirmedTxs[txInfo.id]; exists { + if tx, exists := c.confirmedTxs[info.id]; exists { return time.Since(tx.createTs) > confirmationTimeout } return false // return expired = false if tx does not exist (likely cleaned up by something else previously) @@ -264,12 +270,12 @@ func (c *pendingTxContext) Expired(sig solana.Signature, confirmationTimeout tim func (c *pendingTxContext) OnProcessed(sig solana.Signature) (string, error) { err := c.withReadLock(func() error { // validate if sig exists - txInfo, sigExists := c.sigToTxInfo[sig] + info, sigExists := c.sigToTxInfo[sig] if !sigExists { return ErrSigDoesNotExist } // Transactions should only move to processed from broadcasted - tx, exists := c.broadcastedProcessedTxs[txInfo.id] + tx, exists := c.broadcastedProcessedTxs[info.id] if !exists { return ErrTransactionNotFound } @@ -285,35 +291,35 @@ func (c *pendingTxContext) OnProcessed(sig solana.Signature) (string, error) { // upgrade to write lock if sig and id exist return c.withWriteLock(func() (string, error) { - txInfo, sigExists := c.sigToTxInfo[sig] + info, sigExists := c.sigToTxInfo[sig] if !sigExists { - return txInfo.id, ErrSigDoesNotExist + return info.id, ErrSigDoesNotExist } - tx, exists := c.broadcastedProcessedTxs[txInfo.id] + tx, exists := c.broadcastedProcessedTxs[info.id] if !exists { - return txInfo.id, ErrTransactionNotFound + return info.id, ErrTransactionNotFound } // update tx state to Processed tx.state = Processed // save updated tx back to the broadcasted map - c.broadcastedProcessedTxs[txInfo.id] = tx - return txInfo.id, nil + c.broadcastedProcessedTxs[info.id] = tx + return info.id, nil }) } func (c *pendingTxContext) OnConfirmed(sig solana.Signature) (string, error) { err := c.withReadLock(func() error { // validate if sig exists - txInfo, sigExists := c.sigToTxInfo[sig] + info, sigExists := c.sigToTxInfo[sig] if !sigExists { return ErrSigDoesNotExist } // Check if transaction already in confirmed state - if tx, exists := c.confirmedTxs[txInfo.id]; exists && tx.state == Confirmed { + if tx, exists := c.confirmedTxs[info.id]; exists && tx.state == Confirmed { return ErrAlreadyInExpectedState } // Transactions should only move to confirmed from broadcasted/processed - if _, exists := c.broadcastedProcessedTxs[txInfo.id]; !exists { + if _, exists := c.broadcastedProcessedTxs[info.id]; !exists { return ErrTransactionNotFound } return nil @@ -324,38 +330,38 @@ func (c *pendingTxContext) OnConfirmed(sig solana.Signature) (string, error) { // upgrade to write lock if id exists return c.withWriteLock(func() (string, error) { - txInfo, sigExists := c.sigToTxInfo[sig] + info, sigExists := c.sigToTxInfo[sig] if !sigExists { - return txInfo.id, ErrSigDoesNotExist + return info.id, ErrSigDoesNotExist } - tx, exists := c.broadcastedProcessedTxs[txInfo.id] + tx, exists := c.broadcastedProcessedTxs[info.id] if !exists { - return txInfo.id, ErrTransactionNotFound + return info.id, ErrTransactionNotFound } // call cancel func + remove from map to stop the retry/bumping cycle for this transaction - if cancel, exists := c.cancelBy[txInfo.id]; exists { + if cancel, exists := c.cancelBy[info.id]; exists { cancel() // cancel context - delete(c.cancelBy, txInfo.id) + delete(c.cancelBy, info.id) } // update tx state to Confirmed tx.state = Confirmed // move tx to confirmed map - c.confirmedTxs[txInfo.id] = tx + c.confirmedTxs[info.id] = tx // remove tx from broadcasted map - delete(c.broadcastedProcessedTxs, txInfo.id) - return txInfo.id, nil + delete(c.broadcastedProcessedTxs, info.id) + return info.id, nil }) } func (c *pendingTxContext) OnFinalized(sig solana.Signature, retentionTimeout time.Duration) (string, error) { err := c.withReadLock(func() error { - txInfo, sigExists := c.sigToTxInfo[sig] + info, sigExists := c.sigToTxInfo[sig] if !sigExists { return ErrSigDoesNotExist } // Allow transactions to transition from broadcasted, processed, or confirmed state in case there are delays between status checks - _, broadcastedExists := c.broadcastedProcessedTxs[txInfo.id] - _, confirmedExists := c.confirmedTxs[txInfo.id] + _, broadcastedExists := c.broadcastedProcessedTxs[info.id] + _, confirmedExists := c.confirmedTxs[info.id] if !broadcastedExists && !confirmedExists { return ErrTransactionNotFound } @@ -367,31 +373,31 @@ func (c *pendingTxContext) OnFinalized(sig solana.Signature, retentionTimeout ti // upgrade to write lock if id exists return c.withWriteLock(func() (string, error) { - txInfo, exists := c.sigToTxInfo[sig] + info, exists := c.sigToTxInfo[sig] if !exists { - return txInfo.id, ErrSigDoesNotExist + return info.id, ErrSigDoesNotExist } var tx, tempTx pendingTx var broadcastedExists, confirmedExists bool - if tempTx, broadcastedExists = c.broadcastedProcessedTxs[txInfo.id]; broadcastedExists { + if tempTx, broadcastedExists = c.broadcastedProcessedTxs[info.id]; broadcastedExists { tx = tempTx } - if tempTx, confirmedExists = c.confirmedTxs[txInfo.id]; confirmedExists { + if tempTx, confirmedExists = c.confirmedTxs[info.id]; confirmedExists { tx = tempTx } if !broadcastedExists && !confirmedExists { - return txInfo.id, ErrTransactionNotFound + return info.id, ErrTransactionNotFound } // call cancel func + remove from map to stop the retry/bumping cycle for this transaction // cancel is expected to be called and removed when tx is confirmed but checked here too in case state is skipped - if cancel, exists := c.cancelBy[txInfo.id]; exists { + if cancel, exists := c.cancelBy[info.id]; exists { cancel() // cancel context - delete(c.cancelBy, txInfo.id) + delete(c.cancelBy, info.id) } // delete from broadcasted map, if exists - delete(c.broadcastedProcessedTxs, txInfo.id) + delete(c.broadcastedProcessedTxs, info.id) // delete from confirmed map, if exists - delete(c.confirmedTxs, txInfo.id) + delete(c.confirmedTxs, info.id) // remove all related signatures from the sigToTxInfo map to skip picking up this tx in the confirmation logic for _, s := range tx.signatures { delete(c.sigToTxInfo, s) @@ -399,15 +405,15 @@ func (c *pendingTxContext) OnFinalized(sig solana.Signature, retentionTimeout ti // if retention duration is set to 0, delete transaction from storage // otherwise, move to finalized map if retentionTimeout == 0 { - return txInfo.id, nil + return info.id, nil } finalizedTx := finishedTx{ state: Finalized, retentionTs: time.Now().Add(retentionTimeout), } // move transaction from confirmed to finalized map - c.finalizedErroredTxs[txInfo.id] = finalizedTx - return txInfo.id, nil + c.finalizedErroredTxs[info.id] = finalizedTx + return info.id, nil }) } @@ -455,14 +461,14 @@ func (c *pendingTxContext) OnPrebroadcastError(id string, retentionTimeout time. func (c *pendingTxContext) OnError(sig solana.Signature, retentionTimeout time.Duration, txState TxState, _ TxErrType) (string, error) { err := c.withReadLock(func() error { - txInfo, sigExists := c.sigToTxInfo[sig] + info, sigExists := c.sigToTxInfo[sig] if !sigExists { return ErrSigDoesNotExist } // transaction can transition from any non-finalized state var broadcastedExists, confirmedExists bool - _, broadcastedExists = c.broadcastedProcessedTxs[txInfo.id] - _, confirmedExists = c.confirmedTxs[txInfo.id] + _, broadcastedExists = c.broadcastedProcessedTxs[info.id] + _, confirmedExists = c.confirmedTxs[info.id] // transcation does not exist in any tx maps if !broadcastedExists && !confirmedExists { return ErrTransactionNotFound @@ -475,16 +481,16 @@ func (c *pendingTxContext) OnError(sig solana.Signature, retentionTimeout time.D // upgrade to write lock if sig exists return c.withWriteLock(func() (string, error) { - txInfo, exists := c.sigToTxInfo[sig] + info, exists := c.sigToTxInfo[sig] if !exists { return "", ErrSigDoesNotExist } var tx, tempTx pendingTx var broadcastedExists, confirmedExists bool - if tempTx, broadcastedExists = c.broadcastedProcessedTxs[txInfo.id]; broadcastedExists { + if tempTx, broadcastedExists = c.broadcastedProcessedTxs[info.id]; broadcastedExists { tx = tempTx } - if tempTx, confirmedExists = c.confirmedTxs[txInfo.id]; confirmedExists { + if tempTx, confirmedExists = c.confirmedTxs[info.id]; confirmedExists { tx = tempTx } // transcation does not exist in any non-finalized maps @@ -492,29 +498,29 @@ func (c *pendingTxContext) OnError(sig solana.Signature, retentionTimeout time.D return "", ErrTransactionNotFound } // call cancel func + remove from map - if cancel, exists := c.cancelBy[txInfo.id]; exists { + if cancel, exists := c.cancelBy[info.id]; exists { cancel() // cancel context - delete(c.cancelBy, txInfo.id) + delete(c.cancelBy, info.id) } // delete from broadcasted map, if exists - delete(c.broadcastedProcessedTxs, txInfo.id) + delete(c.broadcastedProcessedTxs, info.id) // delete from confirmed map, if exists - delete(c.confirmedTxs, txInfo.id) + delete(c.confirmedTxs, info.id) // remove all related signatures from the sigToTxInfo map to skip picking up this tx in the confirmation logic for _, s := range tx.signatures { delete(c.sigToTxInfo, s) } // if retention duration is set to 0, skip adding transaction to the errored map if retentionTimeout == 0 { - return txInfo.id, nil + return info.id, nil } erroredTx := finishedTx{ state: txState, retentionTs: time.Now().Add(retentionTimeout), } // move transaction from broadcasted to error map - c.finalizedErroredTxs[txInfo.id] = erroredTx - return txInfo.id, nil + c.finalizedErroredTxs[info.id] = erroredTx + return info.id, nil }) } @@ -561,6 +567,107 @@ func (c *pendingTxContext) TrimFinalizedErroredTxs() int { return len(expiredIDs) } +func (c *pendingTxContext) GetSignatureInfo(sig solana.Signature) (txInfo, error) { + c.lock.RLock() + defer c.lock.RUnlock() + + info, exists := c.sigToTxInfo[sig] + if !exists { + return txInfo{}, ErrSigDoesNotExist + } + return info, nil +} + +func (c *pendingTxContext) UpdateSignatureStatus(sig solana.Signature, newStatus TxState) (string, error) { + // First, acquire a read lock to check if the signature exists and needs to be updated + err := c.withReadLock(func() error { + info, exists := c.sigToTxInfo[sig] + if !exists { + return ErrSigDoesNotExist + } + if info.status == newStatus { + return ErrAlreadyInExpectedState + } + return nil + }) + if err != nil { + return "", err + } + + // Upgrade to a write lock to perform the update + return c.withWriteLock(func() (string, error) { + info, exists := c.sigToTxInfo[sig] + if !exists { + return "", ErrSigDoesNotExist + } + if info.status == newStatus { + // Another goroutine might have updated the status; no action needed + return "", ErrAlreadyInExpectedState + } + info.status = newStatus + c.sigToTxInfo[sig] = info + return "", nil + }) +} + +func (c *pendingTxContext) OnReorg(sig solana.Signature) (pendingTx, error) { + // Acquire a read lock to check if the signature exists and needs to be reset + err := c.withReadLock(func() error { + // Check if the signature is still being tracked + info, exists := c.sigToTxInfo[sig] + if !exists { + return ErrSigDoesNotExist + } + + // Check if the transaction is still in a non finalized/errored state + var broadcastedExists, confirmedExists bool + _, broadcastedExists = c.broadcastedProcessedTxs[info.id] + _, confirmedExists = c.confirmedTxs[info.id] + if !broadcastedExists && !confirmedExists { + return ErrTransactionNotFound + } + return nil + }) + if err != nil { + // If transaction or sig are not found, return + return pendingTx{}, err + } + + var pTx pendingTx + // Acquire a write lock to perform the state reset + _, err = c.withWriteLock(func() (string, error) { + // Retrieve sig and tx again inside the write lock + info, exists := c.sigToTxInfo[sig] + if !exists { + return "", ErrSigDoesNotExist + } + var tx pendingTx + var broadcastedExists, confirmedExists bool + if tx, broadcastedExists = c.broadcastedProcessedTxs[info.id]; broadcastedExists { + pTx = tx + } + if tx, confirmedExists = c.confirmedTxs[info.id]; confirmedExists { + pTx = tx + } + if !broadcastedExists && !confirmedExists { + // transcation does not exist in any non finalized/errored maps + return "", ErrTransactionNotFound + } + + // Reset the signature and tx status for retrying + info.status, pTx.state = Broadcasted, Broadcasted + c.sigToTxInfo[sig] = info + return "", nil + }) + if err != nil { + // If transaction or sig were not found, return + return pendingTx{}, err + } + + // Return the transaction for retrying + return pTx, nil +} + func (c *pendingTxContext) withReadLock(fn func() error) error { c.lock.RLock() defer c.lock.RUnlock() @@ -687,3 +794,15 @@ func (c *pendingTxContextWithProm) GetTxState(id string) (TxState, error) { func (c *pendingTxContextWithProm) TrimFinalizedErroredTxs() int { return c.pendingTx.TrimFinalizedErroredTxs() } + +func (c *pendingTxContextWithProm) GetSignatureInfo(sig solana.Signature) (txInfo, error) { + return c.pendingTx.GetSignatureInfo(sig) +} + +func (c *pendingTxContextWithProm) UpdateSignatureStatus(sig solana.Signature, newStatus TxState) (string, error) { + return c.pendingTx.UpdateSignatureStatus(sig, newStatus) +} + +func (c *pendingTxContextWithProm) OnReorg(sig solana.Signature) (pendingTx, error) { + return c.pendingTx.OnReorg(sig) +} diff --git a/pkg/solana/txm/txm.go b/pkg/solana/txm/txm.go index 3ba39f2f5..062f2ad40 100644 --- a/pkg/solana/txm/txm.go +++ b/pkg/solana/txm/txm.go @@ -224,18 +224,11 @@ func (txm *Txm) sendWithRetry(ctx context.Context, msg pendingTx) (solanaGo.Tran txm.lggr.Debugw("tx initial broadcast", "id", msg.id, "fee", msg.cfg.BaseComputeUnitPrice, "signature", sig) - // Initialize signature list with initialTx signature. This list will be used to add new signatures and track retry attempts. - sigs := &signatureList{} - sigs.Allocate() - if initSetErr := sigs.Set(0, sig); initSetErr != nil { - return solanaGo.Transaction{}, "", solanaGo.Signature{}, fmt.Errorf("failed to save initial signature in signature list: %w", initSetErr) - } - // pass in copy of msg (to build new tx with bumped fee) and broadcasted tx == initTx (to retry tx without bumping) txm.done.Add(1) go func() { defer txm.done.Done() - txm.retryTx(ctx, msg, initTx, sigs) + txm.retryTx(ctx, msg, initTx, sig) }() // Return signed tx, id, signature for use in simulation @@ -286,7 +279,15 @@ func (txm *Txm) buildTx(ctx context.Context, msg pendingTx, retryCount int) (sol // retryTx contains the logic for retrying the transaction, including exponential backoff and fee bumping. // Retries until context cancelled by timeout or called externally. // It uses handleRetry helper function to handle each retry attempt. -func (txm *Txm) retryTx(ctx context.Context, msg pendingTx, currentTx solanaGo.Transaction, sigs *signatureList) { +func (txm *Txm) retryTx(ctx context.Context, msg pendingTx, currentTx solanaGo.Transaction, sig solanaGo.Signature) { + // Initialize signature list with initialTx signature. This list will be used to add new signatures and track retry attempts. + sigs := &signatureList{} + sigs.Allocate() + if initSetErr := sigs.Set(0, sig); initSetErr != nil { + txm.lggr.Errorw("failed to save initial signature in signature list", "error", initSetErr) + return + } + deltaT := 1 // initial delay in ms tick := time.After(0) bumpCount := 0 @@ -463,6 +464,12 @@ func (txm *Txm) processConfirmations(ctx context.Context, client client.ReaderWr continue } + // check if a potential re-org has occurred for this sig and handle it + err := txm.handleReorg(ctx, sig, status) + if err != nil { + continue + } + switch status.ConfirmationStatus { case rpc.ConfirmationStatusProcessed: // if signature is processed, keep polling for confirmed or finalized status @@ -522,6 +529,41 @@ func (txm *Txm) handleErrorSignatureStatus(sig solanaGo.Signature, status *rpc.S } } +// handleReorg handles the case where a transaction signature is in a potential reorg state on-chain. +// It updates the transaction state in the local memory and restarts the retry/bumping cycle for the transaction associated to that sig. +func (txm *Txm) handleReorg(ctx context.Context, sig solanaGo.Signature, status *rpc.SignatureStatusesResult) error { + // Retrieve last seen status for the tx associated to this sig in our in-memory layer. + txInfo, err := txm.txs.GetSignatureInfo(sig) + if err != nil { + txm.lggr.Errorw("failed to get signature info when checking for potential re-orgs", "signature", sig, "error", err) + return err + } + + // Check if tx has been reorged by detecting if we had a status regression + // If so, we'll handle the reorg by updating the status in our in-memory layer and retrying the transaction for that sig. + currentTxState := convertStatus(status) + if isStatusRegression(txInfo.status, currentTxState) { + txm.lggr.Warnw("potential re-org detected for transaction", "txID", txInfo.id, "signature", sig, "previousStatus", txInfo.status, "currentStatus", currentTxState) + // Update status for the tx associated to this sig in our in-memory layer with last seen on-chain status. + _, err = txm.txs.UpdateSignatureStatus(sig, currentTxState) + if err != nil { + txm.lggr.Errorw("failed to update signature status", "signature", sig, "error", err) + return err + } + + // Handle reorg in our in memory layer and retry transaction + pTx, err := txm.txs.OnReorg(sig) + if err != nil { + txm.lggr.Errorw("failed to handle potential re-org", "signature", sig, "id", pTx.id, "error", err) + return err + } + retryCtx, _ := context.WithTimeout(ctx, pTx.cfg.Timeout) // TODO: Ask here. How should we handle the ctx? + txm.retryTx(retryCtx, pTx, pTx.tx, sig) + } + + return nil +} + // handleProcessedSignatureStatus handles the case where a transaction signature is in the "processed" state on-chain. // It updates the transaction state in the local memory and checks if the confirmation timeout has been exceeded. // If the timeout is exceeded, it marks the transaction as errored. diff --git a/pkg/solana/txm/utils.go b/pkg/solana/txm/utils.go index fef260e3d..33e59a64e 100644 --- a/pkg/solana/txm/utils.go +++ b/pkg/solana/txm/utils.go @@ -111,6 +111,34 @@ func convertStatus(res *rpc.SignatureStatusesResult) TxState { return NotFound } +// isStatusRegression checks if the current status is a regression compared to the previous status: +// - Finalized -> Confirmed, Processed, Broadcasted: should not regress +// - Confirmed -> Processed, Broadcasted: should not regress +// - Processed -> Broadcasted: should not regress +// Returns true if a regression is detected, indicating a possible re-org. +func isStatusRegression(previous, current TxState) bool { + switch previous { + case Finalized: + // Finalized transactions should not regress. + if current != Finalized { + return true + } + case Confirmed: + // Confirmed transactions should not regress to Processed or Broadcasted. + if current != Confirmed && current != Finalized { + return true + } + case Processed: + // Processed transactions should not regress to Broadcasted. + if current != Processed && current != Confirmed && current != Finalized { + return true + } + default: + return false + } + return false +} + type signatureList struct { sigs []solana.Signature lock sync.RWMutex