Skip to content

Commit

Permalink
feat: nonce handling with signer (#3196)
Browse files Browse the repository at this point in the history
Closes: #1910

This covers most cases by serializing the actual broadcasts to the
consensus node and enabling resubmissions in the case that there is a
sequence mismatch.

This covers most fail cases with the possible exception of proposal
nodes receiving the transactions in the reverse order to the initial
nodes that the user broadcasted to

There are also some interesting side affects that need to be handled
when an existing accepted transaction is later kicked out of the mempool
via CheckTx but overall I think this is a huge improvement for the UX of
users
  • Loading branch information
cmwaters authored Mar 26, 2024
1 parent f23cf46 commit deefb54
Show file tree
Hide file tree
Showing 14 changed files with 450 additions and 136 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ test-race:
# TODO: Remove the -skip flag once the following tests no longer contain data races.
# https://github.com/celestiaorg/celestia-app/issues/1369
@echo "--> Running tests in race mode"
@go test ./... -v -race -skip "TestPrepareProposalConsistency|TestIntegrationTestSuite|TestBlobstreamRPCQueries|TestSquareSizeIntegrationTest|TestStandardSDKIntegrationTestSuite|TestTxsimCommandFlags|TestTxsimCommandEnvVar|TestMintIntegrationTestSuite|TestBlobstreamCLI|TestUpgrade|TestMaliciousTestNode|TestBigBlobSuite|TestQGBIntegrationSuite|TestSignerTestSuite|TestPriorityTestSuite|TestTimeInPrepareProposalContext|TestBlobstream|TestCLITestSuite|TestLegacyUpgrade"
@go test ./... -v -race -skip "TestPrepareProposalConsistency|TestIntegrationTestSuite|TestBlobstreamRPCQueries|TestSquareSizeIntegrationTest|TestStandardSDKIntegrationTestSuite|TestTxsimCommandFlags|TestTxsimCommandEnvVar|TestMintIntegrationTestSuite|TestBlobstreamCLI|TestUpgrade|TestMaliciousTestNode|TestBigBlobSuite|TestQGBIntegrationSuite|TestSignerTestSuite|TestPriorityTestSuite|TestTimeInPrepareProposalContext|TestBlobstream|TestCLITestSuite|TestLegacyUpgrade|TestSignerTwins|TestConcurrentTxSubmission"
.PHONY: test-race

## test-bench: Run unit tests in bench mode.
Expand Down
4 changes: 1 addition & 3 deletions app/errors/insufficient_gas_price_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ func TestInsufficientMinGasPriceIntegration(t *testing.T) {
msg, err := blob.NewMsgPayForBlobs(signer.Address().String(), appconsts.LatestVersion, b)
require.NoError(t, err)

tx, err := signer.CreateTx([]sdk.Msg{msg}, user.SetGasLimit(gasLimit), user.SetFeeAmount(fee))
require.NoError(t, err)
sdkTx, err := enc.TxConfig.TxDecoder()(tx)
sdkTx, err := signer.CreateTx([]sdk.Msg{msg}, user.SetGasLimit(gasLimit), user.SetFeeAmount(fee))
require.NoError(t, err)

decorator := ante.NewDeductFeeDecorator(testApp.AccountKeeper, testApp.BankKeeper, testApp.FeeGrantKeeper, nil)
Expand Down
20 changes: 18 additions & 2 deletions app/errors/nonce_mismatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"strconv"
"strings"

sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)
Expand All @@ -13,16 +14,31 @@ func IsNonceMismatch(err error) bool {
return errors.Is(err, sdkerrors.ErrWrongSequence)
}

// IsNonceMismatch checks if the error code matches the sequence mismatch.
func IsNonceMismatchCode(code uint32) bool {
return code == sdkerrors.ErrWrongSequence.ABCICode()
}

// ParseNonceMismatch extracts the expected sequence number from the
// ErrWrongSequence error.
func ParseNonceMismatch(err error) (uint64, error) {
if !IsNonceMismatch(err) {
return 0, errors.New("error is not a sequence mismatch")
}

numbers := regexpInt.FindAllString(err.Error(), -1)
return ParseExpectedSequence(err.Error())
}

// ParseExpectedSequence extracts the expected sequence number from the
// ErrWrongSequence error.
func ParseExpectedSequence(str string) (uint64, error) {
if !strings.HasPrefix(str, "account sequence mismatch") {
return 0, fmt.Errorf("unexpected wrong sequence error: %s", str)
}

numbers := regexpInt.FindAllString(str, -1)
if len(numbers) != 2 {
return 0, fmt.Errorf("unexpected wrong sequence error: %w", err)
return 0, fmt.Errorf("expected two numbers in string, got %d", len(numbers))
}

// the first number is the expected sequence number
Expand Down
4 changes: 1 addition & 3 deletions app/errors/nonce_mismatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ func TestNonceMismatchIntegration(t *testing.T) {
msg, err := blob.NewMsgPayForBlobs(signer.Address().String(), appconsts.LatestVersion, b)
require.NoError(t, err)

tx, err := signer.CreateTx([]sdk.Msg{msg})
require.NoError(t, err)
sdkTx, err := enc.TxConfig.TxDecoder()(tx)
sdkTx, err := signer.CreateTx([]sdk.Msg{msg})
require.NoError(t, err)

decorator := ante.NewSigVerificationDecorator(testApp.AccountKeeper, encCfg.TxConfig.SignModeHandler())
Expand Down
7 changes: 3 additions & 4 deletions app/test/big_blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/celestiaorg/celestia-app/app"
"github.com/celestiaorg/celestia-app/app/encoding"
"github.com/celestiaorg/celestia-app/pkg/appconsts"
"github.com/celestiaorg/celestia-app/pkg/user"
"github.com/celestiaorg/celestia-app/test/util/testfactory"
"github.com/celestiaorg/celestia-app/test/util/testnode"
Expand Down Expand Up @@ -77,12 +78,10 @@ func (s *BigBlobSuite) TestErrBlobsTooLarge() {

for _, tc := range testCases {
s.Run(tc.name, func() {
blobTx, err := signer.CreatePayForBlob([]*blob.Blob{tc.blob}, user.SetGasLimit(1e9), user.SetFee(2000000))
require.NoError(t, err)
subCtx, cancel := context.WithTimeout(s.cctx.GoContext(), 30*time.Second)
defer cancel()
res, err := signer.BroadcastTx(subCtx, blobTx)
require.NoError(t, err)
res, err := signer.SubmitPayForBlob(subCtx, []*blob.Blob{tc.blob}, user.SetGasLimitAndFee(1e9, appconsts.DefaultGlobalMinGasPrice))
require.Error(t, err)
require.NotNil(t, res)
require.Equal(t, tc.want, res.Code, res.Logs)
})
Expand Down
55 changes: 30 additions & 25 deletions app/test/priority_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package app_test
import (
"encoding/hex"
"sort"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -71,42 +72,46 @@ func (s *PriorityTestSuite) TestPriorityByGasPrice() {
t := s.T()

// quickly submit blobs with a random fee
hashes := make([]string, 0, len(s.signers))

hashes := make(chan string, len(s.signers))
blobSize := uint32(100)
gasLimit := blobtypes.DefaultEstimateGas([]uint32{blobSize})
wg := &sync.WaitGroup{}
for _, signer := range s.signers {
blobSize := uint32(100)
gasLimit := blobtypes.DefaultEstimateGas([]uint32{blobSize})
gasPrice := s.rand.Float64()
btx, err := signer.CreatePayForBlob(
blobfactory.ManyBlobs(
s.rand,
[]namespace.Namespace{namespace.RandomBlobNamespace()},
[]int{100}),
user.SetGasLimitAndFee(gasLimit, gasPrice),
)
require.NoError(t, err)
resp, err := signer.BroadcastTx(s.cctx.GoContext(), btx)
require.NoError(t, err)
require.Equal(t, abci.CodeTypeOK, resp.Code, resp.RawLog)
hashes = append(hashes, resp.TxHash)
wg.Add(1)
go func() {
defer wg.Done()
gasPrice := float64(s.rand.Intn(1000)+1) / 1000
resp, err := signer.SubmitPayForBlob(
s.cctx.GoContext(),
blobfactory.ManyBlobs(
s.rand,
[]namespace.Namespace{namespace.RandomBlobNamespace()},
[]int{100}),
user.SetGasLimitAndFee(gasLimit, gasPrice),
)
require.NoError(t, err)
require.Equal(t, abci.CodeTypeOK, resp.Code, resp.RawLog)
hashes <- resp.TxHash
}()
}

wg.Wait()
close(hashes)

err := s.cctx.WaitForNextBlock()
require.NoError(t, err)

// get the responses for each tx for analysis and sort by height
// note: use rpc types because they contain the tx index
heightMap := make(map[int64][]*rpctypes.ResultTx)
for _, hash := range hashes {
resp, err := s.signers[0].ConfirmTx(s.cctx.GoContext(), hash)
require.NoError(t, err)
require.NotNil(t, resp)
require.Equal(t, abci.CodeTypeOK, resp.Code)
for hash := range hashes {
// use the core rpc type because it contains the tx index
hash, err := hex.DecodeString(hash)
require.NoError(t, err)
coreRes, err := s.cctx.Client.Tx(s.cctx.GoContext(), hash, false)
require.NoError(t, err)
heightMap[resp.Height] = append(heightMap[resp.Height], coreRes)
heightMap[coreRes.Height] = append(heightMap[coreRes.Height], coreRes)
}
require.GreaterOrEqual(t, len(heightMap), 1)

Expand All @@ -123,7 +128,7 @@ func (s *PriorityTestSuite) TestPriorityByGasPrice() {

// check that there was at least one block with more than three transactions
// in it. This is more of a sanity check than a test.
require.True(t, highestNumOfTxsPerBlock > 3)
require.Greater(t, highestNumOfTxsPerBlock, 3)
}

func sortByIndex(txs []*rpctypes.ResultTx) []*rpctypes.ResultTx {
Expand All @@ -135,14 +140,14 @@ func sortByIndex(txs []*rpctypes.ResultTx) []*rpctypes.ResultTx {

func isSortedByFee(t *testing.T, ecfg encoding.Config, responses []*rpctypes.ResultTx) bool {
for i := 0; i < len(responses)-1; i++ {
if gasPrice(t, ecfg, responses[i]) <= gasPrice(t, ecfg, responses[i+1]) {
if getGasPrice(t, ecfg, responses[i]) <= getGasPrice(t, ecfg, responses[i+1]) {
return false
}
}
return true
}

func gasPrice(t *testing.T, ecfg encoding.Config, resp *rpctypes.ResultTx) float64 {
func getGasPrice(t *testing.T, ecfg encoding.Config, resp *rpctypes.ResultTx) float64 {
sdkTx, err := ecfg.TxConfig.TxDecoder()(resp.Tx)
require.NoError(t, err)
feeTx := sdkTx.(sdk.FeeTx)
Expand Down
1 change: 1 addition & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,7 @@ github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy
github.com/moricho/tparallel v0.3.0/go.mod h1:leENX2cUv7Sv2qDgdi0D0fCftN8fRC67Bcn8pqzeYNI=
github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOAZ3H0fQ=
github.com/mrunalp/fileutils v0.5.0/go.mod h1:M1WthSahJixYnrXQl/DFQuteStB1weuxD2QJNHXfbSQ=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus=
github.com/nakabonne/nestif v0.3.1/go.mod h1:9EtoZochLn5iUprVDmDjqGKPofoUEBL8U4Ngq6aY7OE=
github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
github.com/nats-io/nats-server/v2 v2.5.0/go.mod h1:Kj86UtrXAL6LwYRA6H4RqzkHhK0Vcv2ZnKD5WbQ1t3g=
Expand Down
100 changes: 100 additions & 0 deletions pkg/user/e2e_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package user_test

import (
"context"
"errors"
"sync"
"testing"
"time"

"github.com/celestiaorg/celestia-app/pkg/appconsts"
"github.com/celestiaorg/celestia-app/pkg/user"
"github.com/celestiaorg/celestia-app/test/util/blobfactory"
"github.com/celestiaorg/celestia-app/test/util/testnode"
"github.com/celestiaorg/go-square/blob"
"github.com/stretchr/testify/require"
tmrand "github.com/tendermint/tendermint/libs/rand"
)

func TestConcurrentTxSubmission(t *testing.T) {
if testing.Short() {
t.Skip("skipping in short mode")
}

// Setup network
tmConfig := testnode.DefaultTendermintConfig()
tmConfig.Consensus.TimeoutCommit = 10 * time.Second
ctx, _, _ := testnode.NewNetwork(t, testnode.DefaultConfig().WithTendermintConfig(tmConfig))
_, err := ctx.WaitForHeight(1)
require.NoError(t, err)

// Setup signer
signer, err := testnode.NewSingleSignerFromContext(ctx)
require.NoError(t, err)

// Pregenerate all the blobs
numTxs := 10
blobs := blobfactory.ManyRandBlobs(tmrand.NewRand(), blobfactory.Repeat(2048, numTxs)...)

// Prepare transactions
var (
wg sync.WaitGroup
errCh = make(chan error)
)

subCtx, cancel := context.WithCancel(ctx.GoContext())
defer cancel()
time.AfterFunc(time.Minute, cancel)
for i := 0; i < numTxs; i++ {
wg.Add(1)
go func(b *blob.Blob) {
defer wg.Done()
_, err := signer.SubmitPayForBlob(subCtx, []*blob.Blob{b}, user.SetGasLimitAndFee(500_000, appconsts.DefaultGlobalMinGasPrice))
if err != nil && !errors.Is(err, context.Canceled) {
// only catch the first error
select {
case errCh <- err:
cancel()
default:
}
}
}(blobs[i])
}
wg.Wait()

select {
case err := <-errCh:
require.NoError(t, err)
default:
}
}

func TestSignerTwins(t *testing.T) {
if testing.Short() {
t.Skip("skipping in short mode")
}

// Setup network
tmConfig := testnode.DefaultTendermintConfig()
tmConfig.Consensus.TimeoutCommit = 10 * time.Second
ctx, _, _ := testnode.NewNetwork(t, testnode.DefaultConfig().WithTendermintConfig(tmConfig))
_, err := ctx.WaitForHeight(1)
require.NoError(t, err)

signer1, err := testnode.NewSingleSignerFromContext(ctx)
require.NoError(t, err)
signer2, err := testnode.NewSingleSignerFromContext(ctx)
require.NoError(t, err)

blobs := blobfactory.ManyRandBlobs(tmrand.NewRand(), blobfactory.Repeat(2048, 8)...)

_, err = signer1.SubmitPayForBlob(ctx.GoContext(), blobs[:1], user.SetGasLimitAndFee(500_000, appconsts.DefaultGlobalMinGasPrice))
require.NoError(t, err)

_, err = signer2.SubmitPayForBlob(ctx.GoContext(), blobs[1:3], user.SetGasLimitAndFee(500_000, appconsts.DefaultGlobalMinGasPrice))
require.NoError(t, err)

signer1.ForceSetSequence(4)
_, err = signer1.SubmitPayForBlob(ctx.GoContext(), blobs[3:5], user.SetGasLimitAndFee(500_000, appconsts.DefaultGlobalMinGasPrice))
require.NoError(t, err)
}
Loading

0 comments on commit deefb54

Please sign in to comment.