Skip to content

Commit

Permalink
[vms/avm] Remove snow.Context from Network (#2834)
Browse files Browse the repository at this point in the history
  • Loading branch information
dhrubabasu authored Mar 8, 2024
1 parent 73b6c60 commit 8fedfd9
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 39 deletions.
59 changes: 31 additions & 28 deletions vms/avm/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/network/p2p/gossip"
"github.com/ava-labs/avalanchego/snow"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/vms/avm/txs"
"github.com/ava-labs/avalanchego/vms/avm/txs/mempool"
"github.com/ava-labs/avalanchego/vms/components/message"
Expand All @@ -31,27 +31,30 @@ var (
type Network struct {
*p2p.Network

log logging.Logger
parser txs.Parser
mempool *gossipMempool
appSender common.AppSender

txPushGossiper *gossip.PushGossiper[*txs.Tx]
txPushGossipFrequency time.Duration
txPullGossiper gossip.Gossiper
txPullGossipFrequency time.Duration

ctx *snow.Context
parser txs.Parser
mempool *gossipMempool
appSender common.AppSender
}

func New(
ctx *snow.Context,
log logging.Logger,
nodeID ids.NodeID,
subnetID ids.ID,
vdrs validators.State,
parser txs.Parser,
txVerifier TxVerifier,
mempool mempool.Mempool,
appSender common.AppSender,
registerer prometheus.Registerer,
config Config,
) (*Network, error) {
p2pNetwork, err := p2p.NewNetwork(ctx.Log, appSender, registerer, "p2p")
p2pNetwork, err := p2p.NewNetwork(log, appSender, registerer, "p2p")
if err != nil {
return nil, err
}
Expand All @@ -61,9 +64,9 @@ func New(
}
validators := p2p.NewValidators(
p2pNetwork.Peers,
ctx.Log,
ctx.SubnetID,
ctx.ValidatorState,
log,
subnetID,
vdrs,
config.MaxValidatorSetStaleness,
)
txGossipClient := p2pNetwork.NewClient(
Expand All @@ -78,7 +81,7 @@ func New(
gossipMempool, err := newGossipMempool(
mempool,
registerer,
ctx.Log,
log,
txVerifier,
parser,
config.ExpectedBloomFilterElements,
Expand Down Expand Up @@ -111,7 +114,7 @@ func New(
}

var txPullGossiper gossip.Gossiper = gossip.NewPullGossiper[*txs.Tx](
ctx.Log,
log,
marshaller,
gossipMempool,
txGossipClient,
Expand All @@ -122,12 +125,12 @@ func New(
// Gossip requests are only served if a node is a validator
txPullGossiper = gossip.ValidatorGossiper{
Gossiper: txPullGossiper,
NodeID: ctx.NodeID,
NodeID: nodeID,
Validators: validators,
}

handler := gossip.NewHandler[*txs.Tx](
ctx.Log,
log,
marshaller,
gossipMempool,
txGossipMetrics,
Expand All @@ -141,10 +144,10 @@ func New(
config.PullGossipThrottlingPeriod,
config.PullGossipThrottlingLimit,
),
ctx.Log,
log,
),
validators,
ctx.Log,
log,
)

// We allow pushing txs between all peers, but only serve gossip requests
Expand All @@ -160,34 +163,34 @@ func New(

return &Network{
Network: p2pNetwork,
log: log,
parser: parser,
mempool: gossipMempool,
appSender: appSender,
txPushGossiper: txPushGossiper,
txPushGossipFrequency: config.PushGossipFrequency,
txPullGossiper: txPullGossiper,
txPullGossipFrequency: config.PullGossipFrequency,
ctx: ctx,
parser: parser,
mempool: gossipMempool,
appSender: appSender,
}, nil
}

func (n *Network) PushGossip(ctx context.Context) {
gossip.Every(ctx, n.ctx.Log, n.txPushGossiper, n.txPushGossipFrequency)
gossip.Every(ctx, n.log, n.txPushGossiper, n.txPushGossipFrequency)
}

func (n *Network) PullGossip(ctx context.Context) {
gossip.Every(ctx, n.ctx.Log, n.txPullGossiper, n.txPullGossipFrequency)
gossip.Every(ctx, n.log, n.txPullGossiper, n.txPullGossipFrequency)
}

func (n *Network) AppGossip(ctx context.Context, nodeID ids.NodeID, msgBytes []byte) error {
n.ctx.Log.Debug("called AppGossip message handler",
n.log.Debug("called AppGossip message handler",
zap.Stringer("nodeID", nodeID),
zap.Int("messageLen", len(msgBytes)),
)

msgIntf, err := message.Parse(msgBytes)
if err != nil {
n.ctx.Log.Debug("forwarding AppGossip message to SDK network",
n.log.Debug("forwarding AppGossip message to SDK network",
zap.String("reason", "failed to parse message"),
)

Expand All @@ -196,15 +199,15 @@ func (n *Network) AppGossip(ctx context.Context, nodeID ids.NodeID, msgBytes []b

msg, ok := msgIntf.(*message.Tx)
if !ok {
n.ctx.Log.Debug("dropping unexpected message",
n.log.Debug("dropping unexpected message",
zap.Stringer("nodeID", nodeID),
)
return nil
}

tx, err := n.parser.ParseTx(msg.Tx)
if err != nil {
n.ctx.Log.Verbo("received invalid tx",
n.log.Verbo("received invalid tx",
zap.Stringer("nodeID", nodeID),
zap.Binary("tx", msg.Tx),
zap.Error(err),
Expand All @@ -213,7 +216,7 @@ func (n *Network) AppGossip(ctx context.Context, nodeID ids.NodeID, msgBytes []b
}

if err := n.mempool.Add(tx); err != nil {
n.ctx.Log.Debug("tx failed to be added to the mempool",
n.log.Debug("tx failed to be added to the mempool",
zap.Stringer("txID", tx.ID()),
zap.Error(err),
)
Expand Down
26 changes: 16 additions & 10 deletions vms/avm/network/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"go.uber.org/mock/gomock"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/snowtest"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/vms/avm/block/executor"
"github.com/ava-labs/avalanchego/vms/avm/fxs"
Expand Down Expand Up @@ -209,10 +209,12 @@ func TestNetworkAppGossip(t *testing.T) {
txVerifierFunc = tt.txVerifierFunc
}

snowCtx := snowtest.Context(t, ids.Empty)
n, err := New(
&snow.Context{
Log: logging.NoLog{},
},
logging.NoLog{},
ids.EmptyNodeID,
ids.Empty,
snowCtx.ValidatorState,
parser,
txVerifierFunc(ctrl),
mempoolFunc(ctrl),
Expand Down Expand Up @@ -350,10 +352,12 @@ func TestNetworkIssueTxFromRPC(t *testing.T) {
appSenderFunc = tt.appSenderFunc
}

snowCtx := snowtest.Context(t, ids.Empty)
n, err := New(
&snow.Context{
Log: logging.NoLog{},
},
logging.NoLog{},
ids.EmptyNodeID,
ids.Empty,
snowCtx.ValidatorState,
parser,
txVerifierFunc(ctrl),
mempoolFunc(ctrl),
Expand Down Expand Up @@ -437,10 +441,12 @@ func TestNetworkIssueTxFromRPCWithoutVerification(t *testing.T) {
appSenderFunc = tt.appSenderFunc
}

snowCtx := snowtest.Context(t, ids.Empty)
n, err := New(
&snow.Context{
Log: logging.NoLog{},
},
logging.NoLog{},
ids.EmptyNodeID,
ids.Empty,
snowCtx.ValidatorState,
parser,
executor.NewMockManager(ctrl), // Should never verify a tx
mempoolFunc(ctrl),
Expand Down
5 changes: 4 additions & 1 deletion vms/avm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,10 @@ func (vm *VM) Linearize(ctx context.Context, stopVertexID ids.ID, toEngine chan<

// Invariant: The context lock is not held when calling network.IssueTx.
vm.network, err = network.New(
vm.ctx,
vm.ctx.Log,
vm.ctx.NodeID,
vm.ctx.SubnetID,
vm.ctx.ValidatorState,
vm.parser,
network.NewLockedTxVerifier(
&vm.ctx.Lock,
Expand Down

0 comments on commit 8fedfd9

Please sign in to comment.