Skip to content

Commit

Permalink
move network util functions from main.go to their own file in relayer
Browse files Browse the repository at this point in the history
  • Loading branch information
iansuvak committed Aug 15, 2024
1 parent 776fe94 commit a4302df
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 125 deletions.
131 changes: 6 additions & 125 deletions relayer/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"fmt"
"log"
"math/big"
"net/http"
"os"
"runtime"
Expand Down Expand Up @@ -153,13 +152,17 @@ func main() {
trackedSubnets,
&cfg,
)
initializeConnectionsAndCheckStake(logger, network, &cfg)

if err != nil {
logger.Fatal("Failed to create app request network", zap.Error(err))
panic(err)
}

err = relayer.InitializeConnectionsAndCheckStake(logger, network, &cfg)
if err != nil {
logger.Fatal("Failed to initialize connections and check stake", zap.Error(err))
panic(err)
}

startMetricsServer(logger, gatherer, cfg.MetricsPort)

relayerMetrics, err := relayer.NewApplicationRelayerMetrics(registerer)
Expand Down Expand Up @@ -549,125 +552,3 @@ func initializeMetrics() (prometheus.Gatherer, prometheus.Registerer, error) {
}
return gatherer, registry, nil
}

func initializeConnectionsAndCheckStake(
logger logging.Logger,
network *peers.AppRequestNetwork,
cfg *config.Config,
) error {
// Manually connect to the validators of each of the source subnets.
// We return an error if we are unable to connect to sufficient stake on any of the subnets.
// Sufficient stake is determined by the Warp quora of the configured supported destinations,
// or if the subnet supports all destinations, by the quora of all configured destinations.
for _, sourceBlockchain := range cfg.SourceBlockchains {
if sourceBlockchain.GetSubnetID() == constants.PrimaryNetworkID {
if err := connectToPrimaryNetworkPeers(logger, network, cfg, sourceBlockchain); err != nil {
return fmt.Errorf(
"failed to connect to primary network peers: %w",
err,
)
}
} else {
if err := connectToNonPrimaryNetworkPeers(logger, network, cfg, sourceBlockchain); err != nil {
return fmt.Errorf(
"failed to connect to non-primary network peers: %w",
err,
)
}
}
}
return nil
}

// Connect to the validators of the source blockchain. For each destination blockchain,
// verify that we have connected to a threshold of stake.
func connectToNonPrimaryNetworkPeers(
logger logging.Logger,
network *peers.AppRequestNetwork,
cfg *config.Config,
sourceBlockchain *config.SourceBlockchain,
) error {
subnetID := sourceBlockchain.GetSubnetID()
connectedValidators, err := network.ConnectToCanonicalValidators(subnetID)
if err != nil {
logger.Error(
"Failed to connect to canonical validators",
zap.String("subnetID", subnetID.String()),
zap.Error(err),
)
return err
}
for _, destination := range sourceBlockchain.SupportedDestinations {
blockchainID := destination.GetBlockchainID()
if ok, quorum, err := checkForSufficientConnectedStake(logger, cfg, connectedValidators, blockchainID); !ok {
logger.Error(
"Failed to connect to a threshold of stake",
zap.String("destinationBlockchainID", blockchainID.String()),
zap.Uint64("connectedWeight", connectedValidators.ConnectedWeight),
zap.Uint64("totalValidatorWeight", connectedValidators.TotalValidatorWeight),
zap.Any("warpQuorum", quorum),
)
return err
}
}
return nil
}

// Connect to the validators of the destination blockchains. Verify that we have connected
// to a threshold of stake for each blockchain.
func connectToPrimaryNetworkPeers(
logger logging.Logger,
network *peers.AppRequestNetwork,
cfg *config.Config,
sourceBlockchain *config.SourceBlockchain,
) error {
for _, destination := range sourceBlockchain.SupportedDestinations {
blockchainID := destination.GetBlockchainID()
subnetID := cfg.GetSubnetID(blockchainID)
connectedValidators, err := network.ConnectToCanonicalValidators(subnetID)
if err != nil {
logger.Error(
"Failed to connect to canonical validators",
zap.String("subnetID", subnetID.String()),
zap.Error(err),
)
return err
}

if ok, quorum, err := checkForSufficientConnectedStake(logger, cfg, connectedValidators, blockchainID); !ok {
logger.Error(
"Failed to connect to a threshold of stake",
zap.String("destinationBlockchainID", blockchainID.String()),
zap.Uint64("connectedWeight", connectedValidators.ConnectedWeight),
zap.Uint64("totalValidatorWeight", connectedValidators.TotalValidatorWeight),
zap.Any("warpQuorum", quorum),
)
return err
}
}
return nil
}

// Fetch the warp quorum from the config and check if the connected stake exceeds the threshold
func checkForSufficientConnectedStake(
logger logging.Logger,
cfg *config.Config,
connectedValidators *peers.ConnectedCanonicalValidators,
destinationBlockchainID ids.ID,
) (bool, *config.WarpQuorum, error) {
quorum, err := cfg.GetWarpQuorum(destinationBlockchainID)
if err != nil {
logger.Error(
"Failed to get warp quorum from config",
zap.String("destinationBlockchainID", destinationBlockchainID.String()),
zap.Error(err),
)
return false, nil, err
}
return utils.CheckStakeWeightExceedsThreshold(
big.NewInt(0).SetUint64(connectedValidators.ConnectedWeight),
connectedValidators.TotalValidatorWeight,
quorum.QuorumNumerator,
quorum.QuorumDenominator,
), &quorum, nil
}
139 changes: 139 additions & 0 deletions relayer/network_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package relayer

import (
"fmt"
"math/big"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/constants"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/awm-relayer/peers"
"github.com/ava-labs/awm-relayer/relayer/config"
"github.com/ava-labs/awm-relayer/utils"
"go.uber.org/zap"
)

func InitializeConnectionsAndCheckStake(
logger logging.Logger,
network *peers.AppRequestNetwork,
cfg *config.Config,
) error {
// Manually connect to the validators of each of the source subnets.
// We return an error if we are unable to connect to sufficient stake on any of the subnets.
// Sufficient stake is determined by the Warp quora of the configured supported destinations,
// or if the subnet supports all destinations, by the quora of all configured destinations.
for _, sourceBlockchain := range cfg.SourceBlockchains {
if sourceBlockchain.GetSubnetID() == constants.PrimaryNetworkID {
if err := connectToPrimaryNetworkPeers(logger, network, cfg, sourceBlockchain); err != nil {
return fmt.Errorf(
"failed to connect to primary network peers: %w",
err,
)
}
} else {
if err := connectToNonPrimaryNetworkPeers(logger, network, cfg, sourceBlockchain); err != nil {
return fmt.Errorf(
"failed to connect to non-primary network peers: %w",
err,
)
}
}
}
return nil
}

// Connect to the validators of the source blockchain. For each destination blockchain,
// verify that we have connected to a threshold of stake.
func connectToNonPrimaryNetworkPeers(
logger logging.Logger,
network *peers.AppRequestNetwork,
cfg *config.Config,
sourceBlockchain *config.SourceBlockchain,
) error {
subnetID := sourceBlockchain.GetSubnetID()
connectedValidators, err := network.ConnectToCanonicalValidators(subnetID)
if err != nil {
logger.Error(
"Failed to connect to canonical validators",
zap.String("subnetID", subnetID.String()),
zap.Error(err),
)
return err
}
for _, destination := range sourceBlockchain.SupportedDestinations {
blockchainID := destination.GetBlockchainID()
if ok, quorum, err := checkForSufficientConnectedStake(logger, cfg, connectedValidators, blockchainID); !ok {
logger.Error(
"Failed to connect to a threshold of stake",
zap.String("destinationBlockchainID", blockchainID.String()),
zap.Uint64("connectedWeight", connectedValidators.ConnectedWeight),
zap.Uint64("totalValidatorWeight", connectedValidators.TotalValidatorWeight),
zap.Any("warpQuorum", quorum),
)
return err
}
}
return nil
}

// Connect to the validators of the destination blockchains. Verify that we have connected
// to a threshold of stake for each blockchain.
func connectToPrimaryNetworkPeers(
logger logging.Logger,
network *peers.AppRequestNetwork,
cfg *config.Config,
sourceBlockchain *config.SourceBlockchain,
) error {
for _, destination := range sourceBlockchain.SupportedDestinations {
blockchainID := destination.GetBlockchainID()
subnetID := cfg.GetSubnetID(blockchainID)
connectedValidators, err := network.ConnectToCanonicalValidators(subnetID)
if err != nil {
logger.Error(
"Failed to connect to canonical validators",
zap.String("subnetID", subnetID.String()),
zap.Error(err),
)
return err
}

if ok, quorum, err := checkForSufficientConnectedStake(logger, cfg, connectedValidators, blockchainID); !ok {
logger.Error(
"Failed to connect to a threshold of stake",
zap.String("destinationBlockchainID", blockchainID.String()),
zap.Uint64("connectedWeight", connectedValidators.ConnectedWeight),
zap.Uint64("totalValidatorWeight", connectedValidators.TotalValidatorWeight),
zap.Any("warpQuorum", quorum),
)
return err
}
}
return nil
}

// Fetch the warp quorum from the config and check if the connected stake exceeds the threshold
func checkForSufficientConnectedStake(
logger logging.Logger,
cfg *config.Config,
connectedValidators *peers.ConnectedCanonicalValidators,
destinationBlockchainID ids.ID,
) (bool, *config.WarpQuorum, error) {
quorum, err := cfg.GetWarpQuorum(destinationBlockchainID)
if err != nil {
logger.Error(
"Failed to get warp quorum from config",
zap.String("destinationBlockchainID", destinationBlockchainID.String()),
zap.Error(err),
)
return false, nil, err
}
return utils.CheckStakeWeightExceedsThreshold(
big.NewInt(0).SetUint64(connectedValidators.ConnectedWeight),
connectedValidators.TotalValidatorWeight,
quorum.QuorumNumerator,
quorum.QuorumDenominator,
), &quorum, nil
}

0 comments on commit a4302df

Please sign in to comment.