Skip to content

Commit

Permalink
Merge branch 'combine-app-gossip-implementations' into validator-rank…
Browse files Browse the repository at this point in the history
…ings
  • Loading branch information
StephenButtolph committed Mar 8, 2024
2 parents 158c202 + c955435 commit e5f591e
Show file tree
Hide file tree
Showing 25 changed files with 396 additions and 623 deletions.
105 changes: 48 additions & 57 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/ava-labs/avalanchego/network/dialer"
"github.com/ava-labs/avalanchego/network/peer"
"github.com/ava-labs/avalanchego/network/throttling"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/networking/router"
"github.com/ava-labs/avalanchego/snow/networking/sender"
"github.com/ava-labs/avalanchego/subnets"
Expand All @@ -47,8 +48,7 @@ const (
)

var (
_ sender.ExternalSender = (*network)(nil)
_ Network = (*network)(nil)
_ Network = (*network)(nil)

errNotValidator = errors.New("node is not a validator")
errNotTracked = errors.New("subnet is not tracked")
Expand Down Expand Up @@ -310,25 +310,40 @@ func NewNetwork(
return n, nil
}

func (n *network) Send(msg message.OutboundMessage, nodeIDs set.Set[ids.NodeID], subnetID ids.ID, allower subnets.Allower) set.Set[ids.NodeID] {
peers := n.getPeers(nodeIDs, subnetID, allower)
n.peerConfig.Metrics.MultipleSendsFailed(
msg.Op(),
nodeIDs.Len()-len(peers),
)
return n.send(msg, peers)
}

func (n *network) Gossip(
func (n *network) Send(
msg message.OutboundMessage,
config common.SendConfig,
subnetID ids.ID,
numValidatorsToSend int,
numNonValidatorsToSend int,
numPeersToSend int,
allower subnets.Allower,
) set.Set[ids.NodeID] {
peers := n.samplePeers(subnetID, numValidatorsToSend, numNonValidatorsToSend, numPeersToSend, allower)
return n.send(msg, peers)
namedPeers := n.getPeers(config.NodeIDs, subnetID, allower)
n.peerConfig.Metrics.MultipleSendsFailed(
msg.Op(),
config.NodeIDs.Len()-len(namedPeers),
)

var (
sampledPeers = n.samplePeers(config, subnetID, allower)
sentTo = set.NewSet[ids.NodeID](len(namedPeers) + len(sampledPeers))
now = n.peerConfig.Clock.Time()
)

// send to peer and update metrics
for _, peers := range [][]peer.Peer{namedPeers, sampledPeers} {
for _, peer := range peers {
if peer.Send(n.onCloseCtx, msg) {
sentTo.Add(peer.ID())

// TODO: move send fail rate calculations into the peer metrics
// record metrics for success
n.sendFailRateCalculator.Observe(0, now)
} else {
// record metrics for failure
n.sendFailRateCalculator.Observe(1, now)
}
}
}
return sentTo
}

// HealthCheck returns information about several network layer health checks.
Expand Down Expand Up @@ -696,24 +711,19 @@ func (n *network) getPeers(
}

func (n *network) samplePeers(
config common.SendConfig,
subnetID ids.ID,
numValidatorsToSample,
numNonValidatorsToSample int,
numPeersToSample int,
allower subnets.Allower,
) []peer.Peer {
// If there are fewer validators than [numValidatorsToSample], then only
// sample [numValidatorsToSample] validators.
subnetValidatorsLen := n.config.Validators.Count(subnetID)
if subnetValidatorsLen < numValidatorsToSample {
numValidatorsToSample = subnetValidatorsLen
}
numValidatorsToSample := min(config.Validators, n.config.Validators.Count(subnetID))

n.peersLock.RLock()
defer n.peersLock.RUnlock()

return n.connectedPeers.Sample(
numValidatorsToSample+numNonValidatorsToSample+numPeersToSample,
numValidatorsToSample+config.NonValidators+config.Peers,
func(p peer.Peer) bool {
// Only return peers that are tracking [subnetID]
trackedSubnets := p.TrackedSubnets()
Expand All @@ -722,14 +732,20 @@ func (n *network) samplePeers(
}

peerID := p.ID()
// if the peer was already explicitly included, don't include in the
// sample
if config.NodeIDs.Contains(peerID) {
return false
}

_, isValidator := n.config.Validators.GetValidator(subnetID, peerID)
// check if the peer is allowed to connect to the subnet
if !allower.IsAllowed(peerID, isValidator) {
return false
}

if numPeersToSample > 0 {
numPeersToSample--
if config.Peers > 0 {
config.Peers--
return true
}

Expand All @@ -738,37 +754,12 @@ func (n *network) samplePeers(
return numValidatorsToSample >= 0
}

numNonValidatorsToSample--
return numNonValidatorsToSample >= 0
config.NonValidators--
return config.NonValidators >= 0
},
)
}

// send the message to the provided peers.
//
// send takes ownership of the provided message reference. So, the provided
// message should only be inspected if the reference has been externally
// increased.
func (n *network) send(msg message.OutboundMessage, peers []peer.Peer) set.Set[ids.NodeID] {
sentTo := set.NewSet[ids.NodeID](len(peers))
now := n.peerConfig.Clock.Time()

// send to peer and update metrics
for _, peer := range peers {
if peer.Send(n.onCloseCtx, msg) {
sentTo.Add(peer.ID())

// TODO: move send fail rate calculations into the peer metrics
// record metrics for success
n.sendFailRateCalculator.Observe(0, now)
} else {
// record metrics for failure
n.sendFailRateCalculator.Observe(1, now)
}
}
return sentTo
}

func (n *network) disconnectedFromConnecting(nodeID ids.NodeID) {
n.peersLock.Lock()
defer n.peersLock.Unlock()
Expand Down Expand Up @@ -1208,10 +1199,10 @@ func (n *network) runTimers() {
// pullGossipPeerLists requests validators from peers in the network
func (n *network) pullGossipPeerLists() {
peers := n.samplePeers(
common.SendConfig{
Validators: 1,
},
constants.PrimaryNetworkID,
1, // numValidatorsToSample
0, // numNonValidatorsToSample
0, // numPeersToSample
subnets.NoOpAllower,
)

Expand Down
29 changes: 18 additions & 11 deletions network/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/ava-labs/avalanchego/network/peer"
"github.com/ava-labs/avalanchego/network/throttling"
"github.com/ava-labs/avalanchego/proto/pb/p2p"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/networking/router"
"github.com/ava-labs/avalanchego/snow/networking/tracker"
"github.com/ava-labs/avalanchego/snow/uptime"
Expand Down Expand Up @@ -327,7 +328,14 @@ func TestSend(t *testing.T) {
require.NoError(err)

toSend := set.Of(nodeIDs[1])
sentTo := net0.Send(outboundGetMsg, toSend, constants.PrimaryNetworkID, subnets.NoOpAllower)
sentTo := net0.Send(
outboundGetMsg,
common.SendConfig{
NodeIDs: toSend,
},
constants.PrimaryNetworkID,
subnets.NoOpAllower,
)
require.Equal(toSend, sentTo)

inboundGetMsg := <-received
Expand All @@ -339,7 +347,7 @@ func TestSend(t *testing.T) {
wg.Wait()
}

func TestSendAndGossipWithFilter(t *testing.T) {
func TestSendWithFilter(t *testing.T) {
require := require.New(t)

received := make(chan message.InboundMessage)
Expand All @@ -366,21 +374,20 @@ func TestSendAndGossipWithFilter(t *testing.T) {

toSend := set.Of(nodeIDs...)
validNodeID := nodeIDs[1]
sentTo := net0.Send(outboundGetMsg, toSend, constants.PrimaryNetworkID, newNodeIDConnector(validNodeID))
sentTo := net0.Send(
outboundGetMsg,
common.SendConfig{
NodeIDs: toSend,
},
constants.PrimaryNetworkID,
newNodeIDConnector(validNodeID),
)
require.Len(sentTo, 1)
require.Contains(sentTo, validNodeID)

inboundGetMsg := <-received
require.Equal(message.GetOp, inboundGetMsg.Op())

// Test Gossip now
sentTo = net0.Gossip(outboundGetMsg, constants.PrimaryNetworkID, 0, 0, len(nodeIDs), newNodeIDConnector(validNodeID))
require.Len(sentTo, 1)
require.Contains(sentTo, validNodeID)

inboundGetMsg = <-received
require.Equal(message.GetOp, inboundGetMsg.Op())

for _, net := range networks {
net.StartClose()
}
Expand Down
21 changes: 2 additions & 19 deletions network/p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,29 +108,12 @@ func (c *Client) AppRequest(
// AppGossip sends a gossip message to a random set of peers.
func (c *Client) AppGossip(
ctx context.Context,
config common.SendConfig,
appGossipBytes []byte,
numValidators int,
numNonValidators int,
numPeers int,
) error {
return c.sender.SendAppGossip(
ctx,
PrefixMessage(c.handlerPrefix, appGossipBytes),
numValidators,
numNonValidators,
numPeers,
)
}

// AppGossipSpecific sends a gossip message to a predetermined set of peers.
func (c *Client) AppGossipSpecific(
ctx context.Context,
nodeIDs set.Set[ids.NodeID],
appGossipBytes []byte,
) error {
return c.sender.SendAppGossipSpecific(
ctx,
nodeIDs,
config,
PrefixMessage(c.handlerPrefix, appGossipBytes),
)
}
Expand Down
9 changes: 6 additions & 3 deletions network/p2p/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ava-labs/avalanchego/cache"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/bloom"
"github.com/ava-labs/avalanchego/utils/buffer"
Expand Down Expand Up @@ -462,10 +463,12 @@ func (p *PushGossiper[T]) gossip(

return p.client.AppGossip(
ctx,
common.SendConfig{
Validators: gossipParams.Validators,
NonValidators: gossipParams.NonValidators,
Peers: gossipParams.Peers,
},
msgBytes,
gossipParams.Validators,
gossipParams.NonValidators,
gossipParams.Peers,
)
}

Expand Down
22 changes: 14 additions & 8 deletions network/p2p/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,13 @@ func TestMessageRouting(t *testing.T) {
require.NoError(network.AddHandler(1, testHandler))
client := network.NewClient(1)

require.NoError(client.AppGossip(ctx, wantMsg, 0, 0, 1))
require.NoError(client.AppGossip(
ctx,
common.SendConfig{
Peers: 1,
},
wantMsg,
))
require.NoError(network.AppGossip(ctx, wantNodeID, <-sender.SentAppGossip))
require.True(appGossipCalled)

Expand All @@ -89,7 +95,6 @@ func TestClientPrefixesMessages(t *testing.T) {
sender := common.FakeSender{
SentAppRequest: make(chan []byte, 1),
SentAppGossip: make(chan []byte, 1),
SentAppGossipSpecific: make(chan []byte, 1),
SentCrossChainAppRequest: make(chan []byte, 1),
}

Expand Down Expand Up @@ -129,15 +134,16 @@ func TestClientPrefixesMessages(t *testing.T) {
require.Equal(handlerPrefix, gotCrossChainAppRequest[0])
require.Equal(want, gotCrossChainAppRequest[1:])

require.NoError(client.AppGossip(ctx, want, 0, 0, 1))
require.NoError(client.AppGossip(
ctx,
common.SendConfig{
Peers: 1,
},
want,
))
gotAppGossip := <-sender.SentAppGossip
require.Equal(handlerPrefix, gotAppGossip[0])
require.Equal(want, gotAppGossip[1:])

require.NoError(client.AppGossipSpecific(ctx, set.Of(ids.EmptyNodeID), want))
gotAppGossip = <-sender.SentAppGossipSpecific
require.Equal(handlerPrefix, gotAppGossip[0])
require.Equal(want, gotAppGossip[1:])
}

// Tests that the Client callback is called on a successful response
Expand Down
16 changes: 5 additions & 11 deletions proto/appsender/appsender.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ service AppSender {
rpc SendAppResponse(SendAppResponseMsg) returns (google.protobuf.Empty);
rpc SendAppError(SendAppErrorMsg) returns (google.protobuf.Empty);
rpc SendAppGossip(SendAppGossipMsg) returns (google.protobuf.Empty);
rpc SendAppGossipSpecific(SendAppGossipSpecificMsg) returns (google.protobuf.Empty);

rpc SendCrossChainAppRequest(SendCrossChainAppRequestMsg) returns (google.protobuf.Empty);
rpc SendCrossChainAppResponse(SendCrossChainAppResponseMsg) returns (google.protobuf.Empty);
Expand Down Expand Up @@ -48,18 +47,13 @@ message SendAppErrorMsg {
}

message SendAppGossipMsg {
// The message body
bytes msg = 1;
uint64 num_validators = 2;
uint64 num_non_validators = 3;
uint64 num_peers = 4;
}

message SendAppGossipSpecificMsg {
// The nodes to send this request to
// Who to send this message to
repeated bytes node_ids = 1;
uint64 validators = 2;
uint64 non_validators = 3;
uint64 peers = 4;
// The message body
bytes msg = 2;
bytes msg = 5;
}

message SendCrossChainAppRequestMsg {
Expand Down
Loading

0 comments on commit e5f591e

Please sign in to comment.