diff --git a/network/p2p/acp118/aggregator.go b/network/p2p/acp118/aggregator.go new file mode 100644 index 000000000000..78bbfe4c5b1a --- /dev/null +++ b/network/p2p/acp118/aggregator.go @@ -0,0 +1,255 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package acp118 + +import ( + "context" + "errors" + "fmt" + "math/big" + + "go.uber.org/zap" + "google.golang.org/protobuf/proto" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ava-labs/avalanchego/proto/pb/sdk" + "github.com/ava-labs/avalanchego/utils/crypto/bls" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/set" + "github.com/ava-labs/avalanchego/vms/platformvm/warp" +) + +var errFailedVerification = errors.New("failed verification") + +type indexedValidator struct { + *warp.Validator + Index int +} + +type result struct { + NodeID ids.NodeID + Validator indexedValidator + Signature *bls.Signature + Err error +} + +// NewSignatureAggregator returns an instance of SignatureAggregator +func NewSignatureAggregator(log logging.Logger, client *p2p.Client) *SignatureAggregator { + return &SignatureAggregator{ + log: log, + client: client, + } +} + +// SignatureAggregator aggregates validator signatures for warp messages +type SignatureAggregator struct { + log logging.Logger + client *p2p.Client +} + +// AggregateSignatures blocks until quorumNum/quorumDen signatures from +// validators are requested to be aggregated into a warp message or the context +// is canceled. Returns the signed message and the amount of stake that signed +// the message. Caller is responsible for providing a well-formed canonical +// validator set corresponding to the signer bitset in the message. +func (s *SignatureAggregator) AggregateSignatures( + ctx context.Context, + message *warp.Message, + justification []byte, + validators []*warp.Validator, + quorumNum uint64, + quorumDen uint64, +) ( + _ *warp.Message, + aggregatedStake *big.Int, + totalStake *big.Int, + finished bool, + _ error, +) { + request := &sdk.SignatureRequest{ + Message: message.UnsignedMessage.Bytes(), + Justification: justification, + } + + requestBytes, err := proto.Marshal(request) + if err != nil { + return nil, nil, nil, false, fmt.Errorf("failed to marshal signature request: %w", err) + } + + nodeIDsToValidator := make(map[ids.NodeID]indexedValidator) + // TODO expose concrete type to avoid type casting + bitSetSignature, ok := message.Signature.(*warp.BitSetSignature) + if !ok { + return nil, nil, nil, false, errors.New("invalid warp signature type") + } + + signerBitSet := set.BitsFromBytes(bitSetSignature.Signers) + + nonSigners := make([]ids.NodeID, 0, len(validators)) + aggregatedStakeWeight := new(big.Int) + totalStakeWeight := new(big.Int) + for i, validator := range validators { + totalStakeWeight.Add(totalStakeWeight, new(big.Int).SetUint64(validator.Weight)) + + // Only try to aggregate signatures from validators that are not already in + // the signer bit set + if signerBitSet.Contains(i) { + aggregatedStakeWeight.Add(aggregatedStakeWeight, new(big.Int).SetUint64(validator.Weight)) + continue + } + + v := indexedValidator{ + Index: i, + Validator: validator, + } + + for _, nodeID := range v.NodeIDs { + nodeIDsToValidator[nodeID] = v + } + + nonSigners = append(nonSigners, v.NodeIDs...) 
+ } + + // Account for requested signatures + the signature that was provided + signatures := make([]*bls.Signature, 0, len(nonSigners)+1) + if bitSetSignature.Signature != [bls.SignatureLen]byte{} { + blsSignature, err := bls.SignatureFromBytes(bitSetSignature.Signature[:]) + if err != nil { + return nil, nil, nil, false, fmt.Errorf("failed to parse bls signature: %w", err) + } + signatures = append(signatures, blsSignature) + } + + results := make(chan result) + handler := responseHandler{ + message: message, + nodeIDsToValidators: nodeIDsToValidator, + results: results, + } + + if err := s.client.AppRequest(ctx, set.Of(nonSigners...), requestBytes, handler.HandleResponse); err != nil { + return nil, nil, nil, false, fmt.Errorf("failed to send aggregation request: %w", err) + } + + minThreshold := new(big.Int).Mul(totalStakeWeight, new(big.Int).SetUint64(quorumNum)) + minThreshold.Div(minThreshold, new(big.Int).SetUint64(quorumDen)) + + // Block until: + // 1. The context is cancelled + // 2. We get responses from all validators + // 3. The specified security threshold is reached + for i := 0; i < len(nonSigners); i++ { + select { + case <-ctx.Done(): + // Try to return whatever progress we have if the context is cancelled + msg, err := newWarpMessage(message, signerBitSet, signatures) + if err != nil { + return nil, nil, nil, false, err + } + + return msg, aggregatedStakeWeight, totalStakeWeight, false, nil + case result := <-results: + if result.Err != nil { + s.log.Debug( + "dropping response", + zap.Stringer("nodeID", result.NodeID), + zap.Error(result.Err), + ) + continue + } + + // Validators may share public keys so drop any duplicate signatures + if signerBitSet.Contains(result.Validator.Index) { + s.log.Debug( + "dropping duplicate signature", + zap.Stringer("nodeID", result.NodeID), + ) + continue + } + + signatures = append(signatures, result.Signature) + signerBitSet.Add(result.Validator.Index) + aggregatedStakeWeight.Add(aggregatedStakeWeight, new(big.Int).SetUint64(result.Validator.Weight)) + + if aggregatedStakeWeight.Cmp(minThreshold) != -1 { + msg, err := newWarpMessage(message, signerBitSet, signatures) + if err != nil { + return nil, nil, nil, false, err + } + + return msg, aggregatedStakeWeight, totalStakeWeight, true, nil + } + } + } + + msg, err := newWarpMessage(message, signerBitSet, signatures) + if err != nil { + return nil, nil, nil, false, err + } + + return msg, aggregatedStakeWeight, totalStakeWeight, true, nil +} + +func newWarpMessage( + message *warp.Message, + signerBitSet set.Bits, + signatures []*bls.Signature, +) (*warp.Message, error) { + if len(signatures) == 0 { + return message, nil + } + + aggregateSignature, err := bls.AggregateSignatures(signatures) + if err != nil { + return nil, err + } + + bitSetSignature := &warp.BitSetSignature{ + Signers: signerBitSet.Bytes(), + Signature: [bls.SignatureLen]byte{}, + } + copy(bitSetSignature.Signature[:], bls.SignatureToBytes(aggregateSignature)) + + return warp.NewMessage(&message.UnsignedMessage, bitSetSignature) +} + +type responseHandler struct { + message *warp.Message + nodeIDsToValidators map[ids.NodeID]indexedValidator + results chan result +} + +func (r *responseHandler) HandleResponse( + _ context.Context, + nodeID ids.NodeID, + responseBytes []byte, + err error, +) { + validator := r.nodeIDsToValidators[nodeID] + if err != nil { + r.results <- result{NodeID: nodeID, Validator: validator, Err: err} + return + } + + response := &sdk.SignatureResponse{} + if err := 
proto.Unmarshal(responseBytes, response); err != nil { + r.results <- result{NodeID: nodeID, Validator: validator, Err: err} + return + } + + signature, err := bls.SignatureFromBytes(response.Signature) + if err != nil { + r.results <- result{NodeID: nodeID, Validator: validator, Err: err} + return + } + + if !bls.Verify(validator.PublicKey, signature, r.message.UnsignedMessage.Bytes()) { + r.results <- result{NodeID: nodeID, Validator: validator, Err: errFailedVerification} + return + } + + r.results <- result{NodeID: nodeID, Validator: validator, Signature: signature} +} diff --git a/network/p2p/acp118/aggregator_test.go b/network/p2p/acp118/aggregator_test.go new file mode 100644 index 000000000000..573f8f79f5b3 --- /dev/null +++ b/network/p2p/acp118/aggregator_test.go @@ -0,0 +1,513 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package acp118 + +import ( + "context" + "math/big" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ava-labs/avalanchego/network/p2p/p2ptest" + "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/utils/crypto/bls" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/set" + "github.com/ava-labs/avalanchego/vms/platformvm/warp" +) + +func TestSignatureAggregator_AggregateSignatures(t *testing.T) { + networkID := uint32(123) + chainID := ids.GenerateTestID() + + nodeID0 := ids.GenerateTestNodeID() + sk0, err := bls.NewSecretKey() + require.NoError(t, err) + pk0 := bls.PublicFromSecretKey(sk0) + signer0 := warp.NewSigner(sk0, networkID, chainID) + + nodeID1 := ids.GenerateTestNodeID() + sk1, err := bls.NewSecretKey() + require.NoError(t, err) + pk1 := bls.PublicFromSecretKey(sk1) + signer1 := warp.NewSigner(sk1, networkID, chainID) + + nodeID2 := ids.GenerateTestNodeID() + sk2, err := bls.NewSecretKey() + require.NoError(t, err) + pk2 := bls.PublicFromSecretKey(sk2) + signer2 := warp.NewSigner(sk2, networkID, chainID) + + unsignedMsg, err := warp.NewUnsignedMessage( + networkID, + chainID, + []byte("payload"), + ) + require.NoError(t, err) + + tests := []struct { + name string + peers map[ids.NodeID]p2p.Handler + ctx context.Context + signature warp.BitSetSignature + validators []*warp.Validator + quorumNum uint64 + quorumDen uint64 + wantTotalStake int + wantSigners int + wantFinished bool + wantErr error + }{ + { + name: "single validator - less than threshold", + peers: map[ids.NodeID]p2p.Handler{ + nodeID0: NewHandler(&testVerifier{Errs: []*common.AppError{common.ErrUndefined}}, signer0), + }, + ctx: context.Background(), + validators: []*warp.Validator{ + { + PublicKey: pk0, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID0}, + }, + }, + wantTotalStake: 1, + wantFinished: true, + quorumNum: 1, + quorumDen: 1, + }, + { + name: "single validator - equal to threshold", + peers: map[ids.NodeID]p2p.Handler{ + nodeID0: NewHandler(&testVerifier{}, signer0), + }, + ctx: context.Background(), + validators: []*warp.Validator{ + { + PublicKey: pk0, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID0}, + }, + }, + wantTotalStake: 1, + wantSigners: 1, + wantFinished: true, + quorumNum: 1, + quorumDen: 1, + }, + { + name: "single validator - greater than threshold", + peers: map[ids.NodeID]p2p.Handler{ + nodeID0: NewHandler(&testVerifier{}, signer0), + }, + ctx: context.Background(), + validators: []*warp.Validator{ + { + PublicKey: 
pk0, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID0}, + }, + }, + wantTotalStake: 1, + wantSigners: 1, + wantFinished: true, + quorumNum: 1, + quorumDen: 2, + }, + { + name: "multiple validators - less than threshold - equal weights", + peers: map[ids.NodeID]p2p.Handler{ + nodeID0: NewHandler(&testVerifier{}, signer0), + nodeID1: NewHandler(&testVerifier{Errs: []*common.AppError{common.ErrUndefined}}, signer1), + nodeID2: NewHandler(&testVerifier{Errs: []*common.AppError{common.ErrUndefined}}, signer2), + }, + ctx: context.Background(), + validators: []*warp.Validator{ + { + PublicKey: pk0, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID0}, + }, + { + PublicKey: pk1, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID1}, + }, + { + PublicKey: pk2, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID2}, + }, + }, + wantTotalStake: 3, + wantSigners: 1, + wantFinished: true, + quorumNum: 2, + quorumDen: 3, + }, + { + name: "multiple validators - equal to threshold - equal weights", + peers: map[ids.NodeID]p2p.Handler{ + nodeID0: NewHandler(&testVerifier{}, signer0), + nodeID1: NewHandler(&testVerifier{}, signer1), + nodeID2: NewHandler(&testVerifier{Errs: []*common.AppError{common.ErrUndefined}}, signer2), + }, + ctx: context.Background(), + validators: []*warp.Validator{ + { + PublicKey: pk0, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID0}, + }, + { + PublicKey: pk1, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID1}, + }, + { + PublicKey: pk2, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID2}, + }, + }, + wantTotalStake: 3, + wantSigners: 2, + wantFinished: true, + quorumNum: 2, + quorumDen: 3, + }, + { + name: "multiple validators - greater than threshold - equal weights", + peers: map[ids.NodeID]p2p.Handler{ + nodeID0: NewHandler(&testVerifier{}, signer0), + nodeID1: NewHandler(&testVerifier{}, signer1), + nodeID2: NewHandler(&testVerifier{}, signer2), + }, + ctx: context.Background(), + validators: []*warp.Validator{ + { + PublicKey: pk0, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID0}, + }, + { + PublicKey: pk1, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID1}, + }, + { + PublicKey: pk2, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID2}, + }, + }, + wantTotalStake: 3, + wantSigners: 2, + wantFinished: true, + quorumNum: 2, + quorumDen: 3, + }, + { + name: "multiple validators - less than threshold - different weights", + peers: map[ids.NodeID]p2p.Handler{ + nodeID0: NewHandler(&testVerifier{}, signer0), + nodeID1: NewHandler(&testVerifier{Errs: []*common.AppError{common.ErrUndefined}}, signer1), + nodeID2: NewHandler(&testVerifier{Errs: []*common.AppError{common.ErrUndefined}}, signer2), + }, + ctx: context.Background(), + validators: []*warp.Validator{ + { + PublicKey: pk0, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID0}, + }, + { + PublicKey: pk1, + Weight: 2, + NodeIDs: []ids.NodeID{nodeID1}, + }, + { + PublicKey: pk2, + Weight: 3, + NodeIDs: []ids.NodeID{nodeID2}, + }, + }, + wantTotalStake: 6, + wantSigners: 1, + wantFinished: true, + quorumNum: 2, + quorumDen: 3, + }, + { + name: "multiple validators - equal to threshold - different weights", + peers: map[ids.NodeID]p2p.Handler{ + nodeID0: NewHandler(&testVerifier{}, signer0), + nodeID1: NewHandler(&testVerifier{}, signer1), + nodeID2: NewHandler(&testVerifier{Errs: []*common.AppError{common.ErrUndefined}}, signer2), + }, + ctx: context.Background(), + validators: []*warp.Validator{ + { + PublicKey: pk0, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID0}, + }, + { + PublicKey: pk1, + Weight: 2, + NodeIDs: []ids.NodeID{nodeID1}, + }, + { + PublicKey: pk2, + Weight: 3, + NodeIDs: 
[]ids.NodeID{nodeID2}, + }, + }, + wantTotalStake: 6, + wantSigners: 2, + wantFinished: true, + quorumNum: 1, + quorumDen: 2, + }, + { + name: "multiple validators - greater than threshold - different weights", + peers: map[ids.NodeID]p2p.Handler{ + nodeID0: NewHandler(&testVerifier{}, signer0), + nodeID1: NewHandler(&testVerifier{}, signer1), + nodeID2: NewHandler(&testVerifier{Errs: []*common.AppError{common.ErrUndefined}}, signer2), + }, + ctx: context.Background(), + validators: []*warp.Validator{ + { + PublicKey: pk0, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID0}, + }, + { + PublicKey: pk1, + Weight: 2, + NodeIDs: []ids.NodeID{nodeID1}, + }, + { + PublicKey: pk2, + Weight: 7, + NodeIDs: []ids.NodeID{nodeID2}, + }, + }, + wantTotalStake: 10, + wantSigners: 2, + wantFinished: true, + quorumNum: 4, + quorumDen: 10, + }, + { + name: "multiple validators - shared public keys", + peers: map[ids.NodeID]p2p.Handler{ + nodeID0: NewHandler(&testVerifier{}, signer1), + nodeID1: NewHandler(&testVerifier{}, signer1), + nodeID2: NewHandler(&testVerifier{}, signer1), + }, + ctx: context.Background(), + validators: []*warp.Validator{ + { + PublicKey: pk1, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID0, nodeID1, nodeID2}, + }, + }, + wantTotalStake: 1, + wantSigners: 1, + wantFinished: true, + quorumNum: 2, + quorumDen: 3, + }, + { + name: "multiple validators - unique and shared public keys", + peers: map[ids.NodeID]p2p.Handler{ + nodeID0: NewHandler(&testVerifier{}, signer0), + nodeID1: NewHandler(&testVerifier{}, signer1), + nodeID2: NewHandler(&testVerifier{}, signer1), + }, + ctx: context.Background(), + validators: []*warp.Validator{ + { + PublicKey: pk0, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID0}, + }, + { + PublicKey: pk1, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID1, nodeID2}, + }, + }, + wantTotalStake: 2, + wantSigners: 1, + wantFinished: true, + quorumNum: 2, + quorumDen: 3, + }, + { + name: "single validator - context canceled", + peers: map[ids.NodeID]p2p.Handler{ + nodeID0: NewHandler(&testVerifier{}, signer0), + }, + ctx: func() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + return ctx + }(), + validators: []*warp.Validator{ + { + PublicKey: pk0, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID0}, + }, + }, + wantTotalStake: 1, + quorumNum: 1, + quorumDen: 1, + }, + { + name: "multiple validators - context canceled", + peers: map[ids.NodeID]p2p.Handler{ + nodeID0: NewHandler(&testVerifier{}, signer0), + nodeID1: NewHandler(&testVerifier{}, signer1), + nodeID2: NewHandler(&testVerifier{}, signer2), + }, + ctx: func() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + return ctx + }(), + validators: []*warp.Validator{ + { + PublicKey: pk0, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID0}, + }, + { + PublicKey: pk1, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID1}, + }, + { + PublicKey: pk2, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID2}, + }, + }, + wantTotalStake: 3, + quorumNum: 1, + quorumDen: 1, + }, + { + name: "multiple validators - resume aggregation on signature", + peers: map[ids.NodeID]p2p.Handler{ + nodeID0: NewHandler(&testVerifier{}, signer0), + nodeID1: NewHandler(&testVerifier{}, signer1), + nodeID2: NewHandler(&testVerifier{}, signer2), + }, + ctx: context.Background(), + signature: func() warp.BitSetSignature { + sig := warp.BitSetSignature{ + Signers: set.NewBits(0).Bytes(), + Signature: [96]byte{}, + } + + sigBytes, err := signer0.Sign(unsignedMsg) + require.NoError(t, err) + 
copy(sig.Signature[:], sigBytes) + + return sig + }(), + validators: []*warp.Validator{ + { + PublicKey: pk0, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID0}, + }, + { + PublicKey: pk1, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID1}, + }, + { + PublicKey: pk2, + Weight: 1, + NodeIDs: []ids.NodeID{nodeID2}, + }, + }, + wantTotalStake: 3, + wantSigners: 3, + wantFinished: true, + quorumNum: 1, + quorumDen: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require := require.New(t) + + client := p2ptest.NewClientWithPeers( + t, + context.Background(), + ids.EmptyNodeID, + p2p.NoOpHandler{}, + tt.peers, + ) + aggregator := NewSignatureAggregator(logging.NoLog{}, client) + + msg, err := warp.NewMessage(unsignedMsg, &tt.signature) + require.NoError(err) + + gotMsg, gotAggregatedStake, gotTotalStake, finished, err := aggregator.AggregateSignatures( + tt.ctx, + msg, + []byte("justification"), + tt.validators, + tt.quorumNum, + tt.quorumDen, + ) + require.ErrorIs(err, tt.wantErr) + require.Equal(tt.wantFinished, finished) + + if tt.wantErr != nil { + return + } + + gotSignature := gotMsg.Signature.(*warp.BitSetSignature) + bitSet := set.BitsFromBytes(gotSignature.Signers) + require.Equal(tt.wantSigners, bitSet.Len()) + + pks := make([]*bls.PublicKey, 0) + wantAggregatedStake := uint64(0) + for i := 0; i < bitSet.BitLen(); i++ { + if !bitSet.Contains(i) { + continue + } + + pks = append(pks, tt.validators[i].PublicKey) + wantAggregatedStake += tt.validators[i].Weight + } + + if tt.wantSigners > 0 { + aggPk, err := bls.AggregatePublicKeys(pks) + require.NoError(err) + blsSig, err := bls.SignatureFromBytes(gotSignature.Signature[:]) + require.NoError(err) + require.True(bls.Verify(aggPk, blsSig, unsignedMsg.Bytes())) + } + + require.Equal(new(big.Int).SetUint64(wantAggregatedStake), gotAggregatedStake) + require.Equal(new(big.Int).SetUint64(uint64(tt.wantTotalStake)), gotTotalStake) + }) + } +} diff --git a/network/p2p/acp118/handler_test.go b/network/p2p/acp118/handler_test.go index e58d61a8f6a0..e67518a5a572 100644 --- a/network/p2p/acp118/handler_test.go +++ b/network/p2p/acp118/handler_test.go @@ -12,6 +12,7 @@ import ( "github.com/ava-labs/avalanchego/cache" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/network/p2p/p2ptest" "github.com/ava-labs/avalanchego/proto/pb/sdk" "github.com/ava-labs/avalanchego/snow/engine/common" @@ -84,9 +85,10 @@ func TestHandler(t *testing.T) { c := p2ptest.NewClient( t, ctx, - h, clientNodeID, + p2p.NoOpHandler{}, serverNodeID, + h, ) unsignedMessage, err := warp.NewUnsignedMessage( @@ -134,7 +136,7 @@ func TestHandler(t *testing.T) { } for _, expectedErr = range tt.expectedErrs { - require.NoError(c.AppRequest(ctx, set.Of(clientNodeID), requestBytes, onResponse)) + require.NoError(c.AppRequest(ctx, set.Of(serverNodeID), requestBytes, onResponse)) <-handled } }) diff --git a/network/p2p/p2ptest/client.go b/network/p2p/p2ptest/client.go index b75654028666..4d722b968e2f 100644 --- a/network/p2p/p2ptest/client.go +++ b/network/p2p/p2ptest/client.go @@ -5,6 +5,7 @@ package p2ptest import ( "context" + "fmt" "testing" "time" @@ -19,72 +20,109 @@ import ( "github.com/ava-labs/avalanchego/utils/set" ) +func NewSelfClient(t *testing.T, ctx context.Context, nodeID ids.NodeID, handler p2p.Handler) *p2p.Client { + return NewClient(t, ctx, nodeID, handler, nodeID, handler) +} + // NewClient generates a client-server pair and returns the client used to // communicate with a 
server with the specified handler func NewClient( t *testing.T, ctx context.Context, - handler p2p.Handler, clientNodeID ids.NodeID, + clientHandler p2p.Handler, serverNodeID ids.NodeID, + serverHandler p2p.Handler, ) *p2p.Client { - clientSender := &enginetest.Sender{} - serverSender := &enginetest.Sender{} - - clientNetwork, err := p2p.NewNetwork(logging.NoLog{}, clientSender, prometheus.NewRegistry(), "") - require.NoError(t, err) + return NewClientWithPeers( + t, + ctx, + clientNodeID, + clientHandler, + map[ids.NodeID]p2p.Handler{ + serverNodeID: serverHandler, + }, + ) +} - serverNetwork, err := p2p.NewNetwork(logging.NoLog{}, serverSender, prometheus.NewRegistry(), "") - require.NoError(t, err) +// NewClientWithPeers generates a client to communicate to a set of peers +func NewClientWithPeers( + t *testing.T, + ctx context.Context, + clientNodeID ids.NodeID, + clientHandler p2p.Handler, + peers map[ids.NodeID]p2p.Handler, +) *p2p.Client { + peers[clientNodeID] = clientHandler + + peerSenders := make(map[ids.NodeID]*enginetest.Sender) + peerNetworks := make(map[ids.NodeID]*p2p.Network) + for nodeID := range peers { + peerSenders[nodeID] = &enginetest.Sender{} + peerNetwork, err := p2p.NewNetwork(logging.NoLog{}, peerSenders[nodeID], prometheus.NewRegistry(), "") + require.NoError(t, err) + peerNetworks[nodeID] = peerNetwork + } - clientSender.SendAppGossipF = func(ctx context.Context, _ common.SendConfig, gossipBytes []byte) error { + peerSenders[clientNodeID].SendAppGossipF = func(ctx context.Context, sendConfig common.SendConfig, gossipBytes []byte) error { // Send the request asynchronously to avoid deadlock when the server // sends the response back to the client - go func() { - require.NoError(t, serverNetwork.AppGossip(ctx, clientNodeID, gossipBytes)) - }() + for nodeID := range sendConfig.NodeIDs { + go func() { + require.NoError(t, peerNetworks[nodeID].AppGossip(ctx, nodeID, gossipBytes)) + }() + } return nil } - clientSender.SendAppRequestF = func(ctx context.Context, _ set.Set[ids.NodeID], requestID uint32, requestBytes []byte) error { - // Send the request asynchronously to avoid deadlock when the server - // sends the response back to the client - go func() { - require.NoError(t, serverNetwork.AppRequest(ctx, clientNodeID, requestID, time.Time{}, requestBytes)) - }() + peerSenders[clientNodeID].SendAppRequestF = func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, requestBytes []byte) error { + for nodeID := range nodeIDs { + network, ok := peerNetworks[nodeID] + if !ok { + return fmt.Errorf("%s is not connected", nodeID) + } + + // Send the request asynchronously to avoid deadlock when the server + // sends the response back to the client + go func() { + require.NoError(t, network.AppRequest(ctx, clientNodeID, requestID, time.Time{}, requestBytes)) + }() + } return nil } - serverSender.SendAppResponseF = func(ctx context.Context, _ ids.NodeID, requestID uint32, responseBytes []byte) error { - // Send the request asynchronously to avoid deadlock when the server - // sends the response back to the client - go func() { - require.NoError(t, clientNetwork.AppResponse(ctx, serverNodeID, requestID, responseBytes)) - }() + for nodeID := range peers { + peerSenders[nodeID].SendAppResponseF = func(ctx context.Context, _ ids.NodeID, requestID uint32, responseBytes []byte) error { + // Send the request asynchronously to avoid deadlock when the server + // sends the response back to the client + go func() { + require.NoError(t, 
peerNetworks[clientNodeID].AppResponse(ctx, nodeID, requestID, responseBytes)) + }() - return nil + return nil + } } - serverSender.SendAppErrorF = func(ctx context.Context, _ ids.NodeID, requestID uint32, errorCode int32, errorMessage string) error { - // Send the request asynchronously to avoid deadlock when the server - // sends the response back to the client - go func() { - require.NoError(t, clientNetwork.AppRequestFailed(ctx, serverNodeID, requestID, &common.AppError{ - Code: errorCode, - Message: errorMessage, - })) - }() - - return nil + for nodeID := range peers { + peerSenders[nodeID].SendAppErrorF = func(ctx context.Context, _ ids.NodeID, requestID uint32, errorCode int32, errorMessage string) error { + go func() { + require.NoError(t, peerNetworks[clientNodeID].AppRequestFailed(ctx, nodeID, requestID, &common.AppError{ + Code: errorCode, + Message: errorMessage, + })) + }() + + return nil + } } - require.NoError(t, clientNetwork.Connected(ctx, clientNodeID, nil)) - require.NoError(t, clientNetwork.Connected(ctx, serverNodeID, nil)) - require.NoError(t, serverNetwork.Connected(ctx, clientNodeID, nil)) - require.NoError(t, serverNetwork.Connected(ctx, serverNodeID, nil)) + for nodeID := range peers { + require.NoError(t, peerNetworks[nodeID].Connected(ctx, clientNodeID, nil)) + require.NoError(t, peerNetworks[nodeID].Connected(ctx, nodeID, nil)) + require.NoError(t, peerNetworks[nodeID].AddHandler(0, peers[nodeID])) + } - require.NoError(t, serverNetwork.AddHandler(0, handler)) - return clientNetwork.NewClient(0) + return peerNetworks[clientNodeID].NewClient(0) } diff --git a/network/p2p/p2ptest/client_test.go b/network/p2p/p2ptest/client_test.go index 45ae970ecf0f..c0032195aff3 100644 --- a/network/p2p/p2ptest/client_test.go +++ b/network/p2p/p2ptest/client_test.go @@ -16,7 +16,7 @@ import ( "github.com/ava-labs/avalanchego/utils/set" ) -func TestNewClient_AppGossip(t *testing.T) { +func TestClient_AppGossip(t *testing.T) { require := require.New(t) ctx := context.Background() @@ -27,12 +27,18 @@ func TestNewClient_AppGossip(t *testing.T) { }, } - client := NewClient(t, ctx, testHandler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) - require.NoError(client.AppGossip(ctx, common.SendConfig{}, []byte("foobar"))) + nodeID := ids.GenerateTestNodeID() + client := NewSelfClient( + t, + ctx, + nodeID, + testHandler, + ) + require.NoError(client.AppGossip(ctx, common.SendConfig{NodeIDs: set.Of(nodeID)}, []byte("foobar"))) <-appGossipChan } -func TestNewClient_AppRequest(t *testing.T) { +func TestClient_AppRequest(t *testing.T) { tests := []struct { name string appResponse []byte @@ -43,7 +49,7 @@ func TestNewClient_AppRequest(t *testing.T) { name: "AppRequest - response", appResponse: []byte("foobar"), appRequestF: func(ctx context.Context, client *p2p.Client, onResponse p2p.AppResponseCallback) error { - return client.AppRequest(ctx, set.Of(ids.GenerateTestNodeID()), []byte("foo"), onResponse) + return client.AppRequest(ctx, set.Of(ids.EmptyNodeID), []byte("foo"), onResponse) }, }, { @@ -53,7 +59,7 @@ func TestNewClient_AppRequest(t *testing.T) { Message: "foobar", }, appRequestF: func(ctx context.Context, client *p2p.Client, onResponse p2p.AppResponseCallback) error { - return client.AppRequest(ctx, set.Of(ids.GenerateTestNodeID()), []byte("foo"), onResponse) + return client.AppRequest(ctx, set.Of(ids.EmptyNodeID), []byte("foo"), onResponse) }, }, { @@ -94,14 +100,19 @@ func TestNewClient_AppRequest(t *testing.T) { }, } - client := NewClient(t, ctx, testHandler, 
ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + client := NewSelfClient( + t, + ctx, + ids.EmptyNodeID, + testHandler, + ) require.NoError(tt.appRequestF( ctx, client, func(_ context.Context, _ ids.NodeID, responseBytes []byte, err error) { + defer close(appRequestChan) require.ErrorIs(err, tt.appErr) require.Equal(tt.appResponse, responseBytes) - close(appRequestChan) }, )) <-appRequestChan diff --git a/x/sync/sync_test.go b/x/sync/sync_test.go index 8f065d838e5f..a2800e4f812e 100644 --- a/x/sync/sync_test.go +++ b/x/sync/sync_test.go @@ -39,8 +39,8 @@ func Test_Creation(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetRangeProofHandler(logging.NoLog{}, db)), + ChangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetChangeProofHandler(logging.NoLog{}, db)), SimultaneousWorkLimit: 5, Log: logging.NoLog{}, BranchFactor: merkledb.BranchFactor16, @@ -72,8 +72,8 @@ func Test_Completion(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, emptyDB), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, emptyDB), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetRangeProofHandler(logging.NoLog{}, emptyDB)), + ChangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetChangeProofHandler(logging.NoLog{}, emptyDB)), TargetRoot: emptyRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -177,8 +177,8 @@ func Test_Sync_FindNextKey_InSync(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetRangeProofHandler(logging.NoLog{}, dbToSync)), + ChangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetChangeProofHandler(logging.NoLog{}, dbToSync)), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -253,8 +253,8 @@ func Test_Sync_FindNextKey_Deleted(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetRangeProofHandler(logging.NoLog{}, db)), + ChangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetChangeProofHandler(logging.NoLog{}, db)), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -303,8 +303,8 @@ 
func Test_Sync_FindNextKey_BranchInLocal(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetRangeProofHandler(logging.NoLog{}, db)), + ChangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetChangeProofHandler(logging.NoLog{}, db)), TargetRoot: targetRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -340,8 +340,8 @@ func Test_Sync_FindNextKey_BranchInReceived(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetRangeProofHandler(logging.NoLog{}, db)), + ChangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetChangeProofHandler(logging.NoLog{}, db)), TargetRoot: targetRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -376,8 +376,8 @@ func Test_Sync_FindNextKey_ExtraValues(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetRangeProofHandler(logging.NoLog{}, dbToSync)), + ChangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetChangeProofHandler(logging.NoLog{}, dbToSync)), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -438,8 +438,8 @@ func TestFindNextKeyEmptyEndProof(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetRangeProofHandler(logging.NoLog{}, db)), + ChangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetChangeProofHandler(logging.NoLog{}, db)), TargetRoot: ids.Empty, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -507,8 +507,8 @@ func Test_Sync_FindNextKey_DifferentChild(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetRangeProofHandler(logging.NoLog{}, 
dbToSync)), + ChangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetChangeProofHandler(logging.NoLog{}, dbToSync)), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -730,8 +730,8 @@ func TestFindNextKeyRandom(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: localDB, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, remoteDB), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, remoteDB), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetRangeProofHandler(logging.NoLog{}, remoteDB)), + ChangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetChangeProofHandler(logging.NoLog{}, remoteDB)), TargetRoot: ids.GenerateTestID(), SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -777,7 +777,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { response.KeyValues = append(response.KeyValues, merkledb.KeyValue{}) }) - return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + return p2ptest.NewSelfClient(t, context.Background(), ids.EmptyNodeID, handler) }, }, { @@ -787,7 +787,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { response.KeyValues = response.KeyValues[min(1, len(response.KeyValues)):] }) - return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + return p2ptest.NewSelfClient(t, context.Background(), ids.EmptyNodeID, handler) }, }, { @@ -813,7 +813,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { } }) - return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + return p2ptest.NewSelfClient(t, context.Background(), ids.EmptyNodeID, handler) }, }, { @@ -824,7 +824,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { _ = slices.Delete(response.KeyValues, i, min(len(response.KeyValues), i+1)) }) - return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + return p2ptest.NewSelfClient(t, context.Background(), ids.EmptyNodeID, handler) }, }, { @@ -835,7 +835,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { response.EndProof = nil }) - return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + return p2ptest.NewSelfClient(t, context.Background(), ids.EmptyNodeID, handler) }, }, { @@ -845,7 +845,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { response.EndProof = nil }) - return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + return p2ptest.NewSelfClient(t, context.Background(), ids.EmptyNodeID, handler) }, }, { @@ -857,16 +857,16 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { response.KeyValues = nil }) - return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + return p2ptest.NewSelfClient(t, context.Background(), ids.EmptyNodeID, handler) }, }, { name: "range proof server flake", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - return p2ptest.NewClient(t, context.Background(), &flakyHandler{ + return p2ptest.NewSelfClient(t, context.Background(), ids.EmptyNodeID, &flakyHandler{ Handler: NewGetRangeProofHandler(logging.NoLog{}, db), c: &counter{m: 2}, - }, 
ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + }) }, }, { @@ -876,7 +876,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { response.KeyChanges = append(response.KeyChanges, make([]merkledb.KeyChange, defaultRequestKeyLimit)...) }) - return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + return p2ptest.NewSelfClient(t, context.Background(), ids.EmptyNodeID, handler) }, }, { @@ -886,7 +886,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { response.KeyChanges = response.KeyChanges[min(1, len(response.KeyChanges)):] }) - return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + return p2ptest.NewSelfClient(t, context.Background(), ids.EmptyNodeID, handler) }, }, { @@ -897,7 +897,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { _ = slices.Delete(response.KeyChanges, i, min(len(response.KeyChanges), i+1)) }) - return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + return p2ptest.NewSelfClient(t, context.Background(), ids.EmptyNodeID, handler) }, }, { @@ -908,16 +908,16 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { response.EndProof = nil }) - return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + return p2ptest.NewSelfClient(t, context.Background(), ids.EmptyNodeID, handler) }, }, { name: "change proof flaky server", changeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - return p2ptest.NewClient(t, context.Background(), &flakyHandler{ + return p2ptest.NewSelfClient(t, context.Background(), ids.EmptyNodeID, &flakyHandler{ Handler: NewGetChangeProofHandler(logging.NoLog{}, db), c: &counter{m: 2}, - }, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + }) }, }, } @@ -946,13 +946,13 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { ) rangeProofHandler := NewGetRangeProofHandler(logging.NoLog{}, dbToSync) - rangeProofClient = p2ptest.NewClient(t, ctx, rangeProofHandler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + rangeProofClient = p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, rangeProofHandler) if tt.rangeProofClient != nil { rangeProofClient = tt.rangeProofClient(dbToSync) } changeProofHandler := NewGetChangeProofHandler(logging.NoLog{}, dbToSync) - changeProofClient = p2ptest.NewClient(t, ctx, changeProofHandler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + changeProofClient = p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, changeProofHandler) if tt.changeProofClient != nil { changeProofClient = tt.changeProofClient(dbToSync) } @@ -1031,8 +1031,8 @@ func Test_Sync_Result_Correct_Root_With_Sync_Restart(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetRangeProofHandler(logging.NoLog{}, dbToSync)), + ChangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetChangeProofHandler(logging.NoLog{}, dbToSync)), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -1058,8 +1058,8 @@ func Test_Sync_Result_Correct_Root_With_Sync_Restart(t *testing.T) { newSyncer, err := 
NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetRangeProofHandler(logging.NoLog{}, dbToSync)), + ChangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetChangeProofHandler(logging.NoLog{}, dbToSync)), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -1130,15 +1130,15 @@ func Test_Sync_Result_Correct_Root_Update_Root_During(t *testing.T) { updatedRootChan <- struct{}{} ctx := context.Background() - rangeProofClient := p2ptest.NewClient(t, ctx, &waitingHandler{ + rangeProofClient := p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, &waitingHandler{ handler: NewGetRangeProofHandler(logging.NoLog{}, dbToSync), updatedRootChan: updatedRootChan, - }, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + }) - changeProofClient := p2ptest.NewClient(t, ctx, &waitingHandler{ + changeProofClient := p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, &waitingHandler{ handler: NewGetChangeProofHandler(logging.NoLog{}, dbToSync), updatedRootChan: updatedRootChan, - }, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + }) syncer, err := NewManager(ManagerConfig{ DB: db, @@ -1189,8 +1189,8 @@ func Test_Sync_UpdateSyncTarget(t *testing.T) { ctx := context.Background() m, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetRangeProofHandler(logging.NoLog{}, db)), + ChangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetChangeProofHandler(logging.NoLog{}, db)), TargetRoot: ids.Empty, SimultaneousWorkLimit: 5, Log: logging.NoLog{},
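
For readers wiring the new aggregator up outside of tests, the following is a minimal sketch of how a caller might drive it. It assumes the caller already holds a p2p.Client registered against the ACP-118 handler ID, an unsigned warp message wrapped in an empty BitSetSignature, and the canonical validator set for that message; the aggregateOnce helper, the 67/100 quorum, and the 10-second timeout are illustrative and not part of this change.

package example

import (
	"context"
	"time"

	"github.com/ava-labs/avalanchego/network/p2p"
	"github.com/ava-labs/avalanchego/network/p2p/acp118"
	"github.com/ava-labs/avalanchego/utils/logging"
	"github.com/ava-labs/avalanchego/vms/platformvm/warp"
)

func aggregateOnce(
	ctx context.Context,
	log logging.Logger,
	client *p2p.Client, // client registered against the ACP-118 handler ID
	msg *warp.Message, // unsigned payload wrapped with an empty BitSetSignature
	justification []byte,
	validators []*warp.Validator, // canonical validator set for the signer bitset
) (*warp.Message, error) {
	aggregator := acp118.NewSignatureAggregator(log, client)

	// Bound the request so a slow validator set cannot block forever; on
	// cancellation the partially signed message is returned with finished == false.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	signedMsg, aggregatedStake, totalStake, finished, err := aggregator.AggregateSignatures(
		ctx,
		msg,
		justification,
		validators,
		67,  // quorumNum
		100, // quorumDen
	)
	if err != nil {
		return nil, err
	}
	if !finished {
		log.Debug("quorum not reached before deadline")
	}
	_ = aggregatedStake
	_ = totalStake
	return signedMsg, nil
}

Because the aggregator returns whatever signatures it has gathered when the context is canceled, a caller can feed the partially signed message back in on a later attempt and only the validators missing from the signer bitset are re-queried, as exercised by the "resume aggregation on signature" test case above.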