Skip to content

Commit

Permalink
feat: nonce handling with signer (backport #3196) (#3213)
Browse files Browse the repository at this point in the history
Closes: #1910

This covers most cases by serializing the actual broadcasts to the
consensus node and enabling resubmissions in the case that there is a
sequence mismatch.

This covers most fail cases with the possible exception of proposal
nodes receiving the transactions in the reverse order to the initial
nodes that the user broadcasted to

There are also some interesting side affects that need to be handled
when an existing accepted transaction is later kicked out of the mempool
via CheckTx but overall I think this is a huge improvement for the UX of
users<hr>This is an automatic backport of pull request #3196 done by
[Mergify](https://mergify.com).

Co-authored-by: Callum Waters <[email protected]>
  • Loading branch information
mergify[bot] and cmwaters authored Apr 4, 2024
1 parent 1ada3c2 commit 4118c49
Show file tree
Hide file tree
Showing 7 changed files with 417 additions and 127 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ test-short:
## test-race: Run unit tests in race mode.
test-race:
@echo "--> Running tests in race mode"
@go test ./... -v -race -skip "TestPrepareProposalConsistency|TestIntegrationTestSuite|TestQGBRPCQueries|TestSquareSizeIntegrationTest|TestStandardSDKIntegrationTestSuite|TestTxsimCommandFlags|TestTxsimCommandEnvVar|TestMintIntegrationTestSuite|TestQGBCLI|TestUpgrade|TestMaliciousTestNode|TestMaxTotalBlobSizeSuite|TestQGBIntegrationSuite|TestSignerTestSuite|TestPriorityTestSuite|TestTimeInPrepareProposalContext"
@go test ./... -v -race -skip "TestPrepareProposalConsistency|TestIntegrationTestSuite|TestQGBRPCQueries|TestSquareSizeIntegrationTest|TestStandardSDKIntegrationTestSuite|TestTxsimCommandFlags|TestTxsimCommandEnvVar|TestMintIntegrationTestSuite|TestQGBCLI|TestUpgrade|TestMaliciousTestNode|TestMaxTotalBlobSizeSuite|TestQGBIntegrationSuite|TestSignerTestSuite|TestPriorityTestSuite|TestTimeInPrepareProposalContext|TestConcurrentTxSubmission"
.PHONY: test-race

## test-bench: Run unit tests in bench mode.
Expand Down
20 changes: 18 additions & 2 deletions app/errors/nonce_mismatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"strconv"
"strings"

sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)
Expand All @@ -13,16 +14,31 @@ func IsNonceMismatch(err error) bool {
return errors.Is(err, sdkerrors.ErrWrongSequence)
}

// IsNonceMismatch checks if the error code matches the sequence mismatch.
func IsNonceMismatchCode(code uint32) bool {
return code == sdkerrors.ErrWrongSequence.ABCICode()
}

// ParseNonceMismatch extracts the expected sequence number from the
// ErrWrongSequence error.
func ParseNonceMismatch(err error) (uint64, error) {
if !IsNonceMismatch(err) {
return 0, errors.New("error is not a sequence mismatch")
}

numbers := regexpInt.FindAllString(err.Error(), -1)
return ParseExpectedSequence(err.Error())
}

// ParseExpectedSequence extracts the expected sequence number from the
// ErrWrongSequence error.
func ParseExpectedSequence(str string) (uint64, error) {
if !strings.HasPrefix(str, "account sequence mismatch") {
return 0, fmt.Errorf("unexpected wrong sequence error: %s", str)
}

numbers := regexpInt.FindAllString(str, -1)
if len(numbers) != 2 {
return 0, fmt.Errorf("unexpected wrong sequence error: %w", err)
return 0, fmt.Errorf("expected two numbers in string, got %d", len(numbers))
}

// the first number is the expected sequence number
Expand Down
57 changes: 31 additions & 26 deletions app/test/priority_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package app_test
import (
"encoding/hex"
"sort"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -70,43 +71,47 @@ func (s *PriorityTestSuite) TestPriorityByGasPrice() {
t := s.T()

// quickly submit blobs with a random fee
hashes := make([]string, 0, len(s.signers))

hashes := make(chan string, len(s.signers))
blobSize := uint32(100)
gasLimit := blobtypes.DefaultEstimateGas([]uint32{blobSize})
wg := &sync.WaitGroup{}
for _, signer := range s.signers {
blobSize := uint32(100)
gasLimit := blobtypes.DefaultEstimateGas([]uint32{blobSize})
gasPrice := s.rand.Float64()
btx, err := signer.CreatePayForBlob(
blobfactory.ManyBlobs(
t,
s.rand,
[]namespace.Namespace{namespace.RandomBlobNamespace()},
[]int{100}),
user.SetGasLimitAndFee(gasLimit, gasPrice),
)
require.NoError(t, err)
resp, err := signer.BroadcastTx(s.cctx.GoContext(), btx)
require.NoError(t, err)
require.Equal(t, abci.CodeTypeOK, resp.Code)
hashes = append(hashes, resp.TxHash)
wg.Add(1)
go func() {
defer wg.Done()
gasPrice := float64(s.rand.Intn(1000)+1) / 1000
resp, err := signer.SubmitPayForBlob(
s.cctx.GoContext(),
blobfactory.ManyBlobs(
t,
s.rand,
[]namespace.Namespace{namespace.RandomBlobNamespace()},
[]int{100}),
user.SetGasLimitAndFee(gasLimit, gasPrice),
)
require.NoError(t, err)
require.Equal(t, abci.CodeTypeOK, resp.Code, resp.RawLog)
hashes <- resp.TxHash
}()
}

wg.Wait()
close(hashes)

err := s.cctx.WaitForNextBlock()
require.NoError(t, err)

// get the responses for each tx for analysis and sort by height
// note: use rpc types because they contain the tx index
heightMap := make(map[int64][]*rpctypes.ResultTx)
for _, hash := range hashes {
resp, err := s.signers[0].ConfirmTx(s.cctx.GoContext(), hash)
require.NoError(t, err)
require.NotNil(t, resp)
require.Equal(t, abci.CodeTypeOK, resp.Code)
for hash := range hashes {
// use the core rpc type because it contains the tx index
hash, err := hex.DecodeString(hash)
require.NoError(t, err)
coreRes, err := s.cctx.Client.Tx(s.cctx.GoContext(), hash, false)
require.NoError(t, err)
heightMap[resp.Height] = append(heightMap[resp.Height], coreRes)
heightMap[coreRes.Height] = append(heightMap[coreRes.Height], coreRes)
}
require.GreaterOrEqual(t, len(heightMap), 1)

Expand All @@ -123,7 +128,7 @@ func (s *PriorityTestSuite) TestPriorityByGasPrice() {

// check that there was at least one block with more than three transactions
// in it. This is more of a sanity check than a test.
require.True(t, highestNumOfTxsPerBlock > 3)
require.Greater(t, highestNumOfTxsPerBlock, 3)
}

func sortByIndex(txs []*rpctypes.ResultTx) []*rpctypes.ResultTx {
Expand All @@ -135,14 +140,14 @@ func sortByIndex(txs []*rpctypes.ResultTx) []*rpctypes.ResultTx {

func isSortedByFee(t *testing.T, ecfg encoding.Config, responses []*rpctypes.ResultTx) bool {
for i := 0; i < len(responses)-1; i++ {
if gasPrice(t, ecfg, responses[i]) <= gasPrice(t, ecfg, responses[i+1]) {
if getGasPrice(t, ecfg, responses[i]) <= getGasPrice(t, ecfg, responses[i+1]) {
return false
}
}
return true
}

func gasPrice(t *testing.T, ecfg encoding.Config, resp *rpctypes.ResultTx) float64 {
func getGasPrice(t *testing.T, ecfg encoding.Config, resp *rpctypes.ResultTx) float64 {
sdkTx, err := ecfg.TxConfig.TxDecoder()(resp.Tx)
require.NoError(t, err)
feeTx := sdkTx.(sdk.FeeTx)
Expand Down
85 changes: 85 additions & 0 deletions pkg/user/e2e_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package user_test

import (
"context"
"errors"
"sync"
"testing"
"time"

"github.com/celestiaorg/celestia-app/app"
"github.com/celestiaorg/celestia-app/app/encoding"
"github.com/celestiaorg/celestia-app/pkg/appconsts"
"github.com/celestiaorg/celestia-app/pkg/user"
"github.com/celestiaorg/celestia-app/test/util/blobfactory"
"github.com/celestiaorg/celestia-app/test/util/testnode"
"github.com/stretchr/testify/require"
tmrand "github.com/tendermint/tendermint/libs/rand"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
)

func TestConcurrentTxSubmission(t *testing.T) {
if testing.Short() {
t.Skip("skipping in short mode")
}

// Setup network
tmConfig := testnode.DefaultTendermintConfig()
tmConfig.Consensus.TimeoutCommit = 10 * time.Second
ctx, _, _ := testnode.NewNetwork(t, testnode.DefaultConfig().WithTendermintConfig(tmConfig))
_, err := ctx.WaitForHeight(1)
require.NoError(t, err)

// Setup signer
signer, err := newSingleSignerFromContext(ctx)
require.NoError(t, err)

// Pregenerate all the blobs
numTxs := 10
blobs := blobfactory.ManyRandBlobs(t, tmrand.NewRand(), blobfactory.Repeat(2048, numTxs)...)

// Prepare transactions
var (
wg sync.WaitGroup
errCh = make(chan error)
)

subCtx, cancel := context.WithCancel(ctx.GoContext())
defer cancel()
time.AfterFunc(time.Minute, cancel)
for i := 0; i < numTxs; i++ {
wg.Add(1)
go func(b *tmproto.Blob) {
defer wg.Done()
_, err := signer.SubmitPayForBlob(subCtx, []*tmproto.Blob{b}, user.SetGasLimitAndFee(500_000, appconsts.DefaultMinGasPrice))
if err != nil && !errors.Is(err, context.Canceled) {
// only catch the first error
select {
case errCh <- err:
cancel()
default:
}
}
}(blobs[i])
}
wg.Wait()

select {
case err := <-errCh:
require.NoError(t, err)
default:
}
}

func newSingleSignerFromContext(ctx testnode.Context) (*user.Signer, error) {
encCfg := encoding.MakeConfig(app.ModuleEncodingRegisters...)
record, err := ctx.Keyring.Key("validator")
if err != nil {
return nil, err
}
address, err := record.GetAddress()
if err != nil {
return nil, err
}
return user.SetupSigner(ctx.GoContext(), ctx.Keyring, ctx.GRPCClient, address, encCfg)
}
Loading

0 comments on commit 4118c49

Please sign in to comment.