diff --git a/tests/antithesis/avalanchego/main.go b/tests/antithesis/avalanchego/main.go index e6f6b9441b5a..7bb1b81be7c0 100644 --- a/tests/antithesis/avalanchego/main.go +++ b/tests/antithesis/avalanchego/main.go @@ -66,14 +66,15 @@ func main() { kc := secp256k1fx.NewKeychain(genesis.EWOQKey) walletSyncStartTime := time.Now() - wallet := e2e.NewWallet(tc, kc, tmpnet.NodeURI{URI: c.URIs[0]}) + genesisLog := tests.NewDefaultLogger(fmt.Sprintf("worker %d", 0)) + wallet := newWallet(tc, genesisLog, kc, c.URIs[0], c.URIs) tc.Log().Info("synced wallet", zap.Duration("duration", time.Since(walletSyncStartTime)), ) genesisWorkload := &workload{ id: 0, - log: tests.NewDefaultLogger(fmt.Sprintf("worker %d", 0)), + log: genesisLog, wallet: wallet, addrs: set.Of(genesis.EWOQKey.Address()), uris: c.URIs, @@ -121,14 +122,15 @@ func main() { uri := c.URIs[i%len(c.URIs)] kc := secp256k1fx.NewKeychain(key) walletSyncStartTime := time.Now() - wallet := e2e.NewWallet(tc, kc, tmpnet.NodeURI{URI: uri}) + workloadLog := tests.NewDefaultLogger(fmt.Sprintf("worker %d", i)) + wallet := newWallet(tc, workloadLog, kc, uri, c.URIs) tc.Log().Info("synced wallet", zap.Duration("duration", time.Since(walletSyncStartTime)), ) workloads[i] = &workload{ id: i, - log: tests.NewDefaultLogger(fmt.Sprintf("worker %d", i)), + log: workloadLog, wallet: wallet, addrs: set.Of(addr), uris: c.URIs, @@ -146,6 +148,16 @@ func main() { genesisWorkload.run(ctx) } +func newWallet(tc tests.TestContext, log logging.Logger, keychain *secp256k1fx.Keychain, nodeURI string, uris []string) *primary.Wallet { + wallet := e2e.NewWallet(tc, log, keychain, tmpnet.NodeURI{URI: nodeURI}) + // Add options to ensure verification against all nodes with logging + return primary.NewWalletWithOptions( + wallet, + common.WithVerificationURIs(uris), + common.WithLog(log), + ) +} + type workload struct { id int log logging.Logger @@ -245,6 +257,7 @@ func (w *workload) executeTest(ctx context.Context) { case 5: w.log.Info("executing banff.TestCustomAssetTransfer") addr, _ := w.addrs.Peek() + // TODO(marun) need to supply node URIs banff.TestCustomAssetTransfer(tc, *w.wallet, addr) case 6: w.log.Info("sleeping") diff --git a/tests/fixture/e2e/apitest.go b/tests/fixture/e2e/apitest.go index f2f246e940f0..63df5f171083 100644 --- a/tests/fixture/e2e/apitest.go +++ b/tests/fixture/e2e/apitest.go @@ -7,6 +7,7 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/tests" "github.com/ava-labs/avalanchego/wallet/subnet/primary" + "github.com/ava-labs/avalanchego/wallet/subnet/primary/common" ) // TODO(marun) What else does a test need? e.g. node URIs? @@ -18,7 +19,17 @@ func ExecuteAPITest(apiTest APITestFunction) { tc := NewTestContext() env := GetEnv(tc) keychain := env.NewKeychain() - wallet := NewWallet(tc, keychain, env.GetRandomNodeURI()) + log := tc.Log() + wallet := NewWallet(tc, log, keychain, env.GetRandomNodeURI()) + uris := make([]string, len(env.URIs)) + for i, uri := range env.URIs { + uris[i] = uri.URI + } + wallet = primary.NewWalletWithOptions( + wallet, + common.WithVerificationURIs(uris), + common.WithLog(log), + ) apiTest(tc, *wallet, keychain.Keys[0].Address()) _ = CheckBootstrapIsPossible(tc, env.GetNetwork()) } diff --git a/tests/fixture/e2e/helpers.go b/tests/fixture/e2e/helpers.go index 8ec84a5d692e..ee24cad57d52 100644 --- a/tests/fixture/e2e/helpers.go +++ b/tests/fixture/e2e/helpers.go @@ -23,6 +23,7 @@ import ( "github.com/ava-labs/avalanchego/tests" "github.com/ava-labs/avalanchego/tests/fixture/tmpnet" "github.com/ava-labs/avalanchego/utils/crypto/secp256k1" + "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/vms/platformvm/txs/fee" "github.com/ava-labs/avalanchego/vms/secp256k1fx" "github.com/ava-labs/avalanchego/wallet/chain/p/builder" @@ -61,8 +62,8 @@ func NewPrivateKey(tc tests.TestContext) *secp256k1.PrivateKey { } // Create a new wallet for the provided keychain against the specified node URI. -func NewWallet(tc tests.TestContext, keychain *secp256k1fx.Keychain, nodeURI tmpnet.NodeURI) *primary.Wallet { - tc.Log().Info("initializing a new wallet", +func NewWallet(tc tests.TestContext, log logging.Logger, keychain *secp256k1fx.Keychain, nodeURI tmpnet.NodeURI) *primary.Wallet { + log.Info("initializing a new wallet", zap.Stringer("nodeID", nodeURI.NodeID), zap.String("URI", nodeURI.URI), ) @@ -78,7 +79,7 @@ func NewWallet(tc tests.TestContext, keychain *secp256k1fx.Keychain, nodeURI tmp baseWallet, common.WithPostIssuanceFunc( func(id ids.ID) { - tc.Log().Info("issued transaction", + log.Info("issued transaction", zap.Stringer("txID", id), ) }, diff --git a/wallet/chain/x/wallet.go b/wallet/chain/x/wallet.go index a5bac3e8b6ce..184cab8ab88b 100644 --- a/wallet/chain/x/wallet.go +++ b/wallet/chain/x/wallet.go @@ -4,7 +4,13 @@ package x import ( + "context" + "fmt" + + "go.uber.org/zap" + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/vms/avm" "github.com/ava-labs/avalanchego/vms/avm/txs" "github.com/ava-labs/avalanchego/vms/components/avax" @@ -148,6 +154,7 @@ func NewWallet( builder: builder, signer: signer, client: client, + log: logging.NoLog{}, } } @@ -156,6 +163,7 @@ type wallet struct { builder builder.Builder signer signer.Signer client avm.Client + log logging.Logger } func (w *wallet) Builder() builder.Builder { @@ -293,7 +301,12 @@ func (w *wallet) IssueTx( ) error { ops := common.NewOptions(options) ctx := ops.Context() - txID, err := w.client.IssueTx(ctx, tx.Bytes()) + // TODO(marun) Connectivity problems possible + // - how to determine if an error is related to connectivity? + // - how to retry if a connectivity problem is detected? + txID, err := ops.IssuanceWrapper()(func() (ids.ID, error) { + return w.client.IssueTx(ctx, tx.Bytes()) + }) if err != nil { return err } @@ -306,9 +319,43 @@ func (w *wallet) IssueTx( return w.backend.AcceptTx(ctx, tx) } - if err := avm.AwaitTxAccepted(w.client, ctx, txID, ops.PollFrequency()); err != nil { - return err + if uris := ops.VerificationURIs(); len(uris) == 0 { + // Default case - wait for acceptance on only the API node. + if err := ops.AcceptanceWrapper()(func() error { + return avm.AwaitTxAccepted(w.client, ctx, txID, ops.PollFrequency()) + }); err != nil { + return err + } + } else { + // Verify the transaction more extensively against the provided URIs + if err := w.awaitTxAccepted(ctx, txID, uris, ops); err != nil { + return err + } } return w.backend.AcceptTx(ctx, tx) } + +// Verify the acceptance of the transaction on the provided URIs. +func (w *wallet) awaitTxAccepted(ctx context.Context, txID ids.ID, uris []string, ops *common.Options) error { + log := ops.Log() + + acceptanceWrapper := ops.AcceptanceWrapper() + for _, uri := range uris { + client := avm.NewClient(uri, "X") + if err := acceptanceWrapper(func() error { + return avm.AwaitTxAccepted(client, ctx, txID, ops.PollFrequency()) + }); err != nil { + return fmt.Errorf("failed to confirm X-chain transaction %s on %s: %w", txID, uri, err) + } + log.Info("confirmed X-chain transaction", + zap.Stringer("txID", txID), + zap.String("uri", uri), + ) + } + log.Info("confirmed X-chain transaction", + zap.Stringer("txID", txID), + ) + + return nil +} diff --git a/wallet/subnet/primary/common/options.go b/wallet/subnet/primary/common/options.go index d6803825ca1d..3648927f704a 100644 --- a/wallet/subnet/primary/common/options.go +++ b/wallet/subnet/primary/common/options.go @@ -9,6 +9,7 @@ import ( "time" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/vms/secp256k1fx" @@ -21,6 +22,14 @@ const defaultPollFrequency = 100 * time.Millisecond // has been issued with the ID of the issued transaction. type PostIssuanceFunc func(ids.ID) +// Signature of function that can wrap a transaction issuance function +// to enable e.g. retries on recoverable errors. +type IssuanceWrapperFunc func(func() (ids.ID, error)) (ids.ID, error) + +// Signature of function that can wrap a transaction acceptance +// function to enable e.g. retries on recoverable errors. +type AcceptanceWrapperFunc func(func() error) error + type Option func(*Options) type Options struct { @@ -49,6 +58,13 @@ type Options struct { pollFrequency time.Duration postIssuanceFunc PostIssuanceFunc + + verificationURIs []string + + log logging.Logger + + issuanceWrapper IssuanceWrapperFunc + acceptanceWrapper AcceptanceWrapperFunc } func NewOptions(ops []Option) *Options { @@ -136,6 +152,35 @@ func (o *Options) PostIssuanceFunc() PostIssuanceFunc { return o.postIssuanceFunc } +func (o *Options) VerificationURIs() []string { + return o.verificationURIs +} + +func (o *Options) Log() logging.Logger { + if o.log == nil { + return logging.NoLog{} + } + return o.log +} + +func (o *Options) IssuanceWrapper() IssuanceWrapperFunc { + if o.issuanceWrapper != nil { + return func(issueFunc func() (ids.ID, error)) (ids.ID, error) { + return issueFunc() + } + } + return o.issuanceWrapper +} + +func (o *Options) AcceptanceWrapper() AcceptanceWrapperFunc { + if o.acceptanceWrapper != nil { + return func(awaitFunc func() error) error { + return awaitFunc() + } + } + return o.acceptanceWrapper +} + func WithContext(ctx context.Context) Option { return func(o *Options) { o.ctx = ctx @@ -205,3 +250,27 @@ func WithPostIssuanceFunc(f PostIssuanceFunc) Option { o.postIssuanceFunc = f } } + +func WithVerificationURIs(uris []string) Option { + return func(o *Options) { + o.verificationURIs = uris + } +} + +func WithLog(log logging.Logger) Option { + return func(o *Options) { + o.log = log + } +} + +func WithIssuanceWrapper(wrapper IssuanceWrapperFunc) Option { + return func(o *Options) { + o.issuanceWrapper = wrapper + } +} + +func WithAwaitWrapper(wrapper AcceptanceWrapperFunc) Option { + return func(o *Options) { + o.acceptanceWrapper = wrapper + } +}