diff --git a/relayer/main/main.go b/relayer/main/main.go index 0c45b737..3615536a 100644 --- a/relayer/main/main.go +++ b/relayer/main/main.go @@ -7,7 +7,6 @@ import ( "context" "fmt" "log" - "math/big" "net/http" "os" "runtime" @@ -153,13 +152,17 @@ func main() { trackedSubnets, &cfg, ) - initializeConnectionsAndCheckStake(logger, network, &cfg) - if err != nil { logger.Fatal("Failed to create app request network", zap.Error(err)) panic(err) } + err = relayer.InitializeConnectionsAndCheckStake(logger, network, &cfg) + if err != nil { + logger.Fatal("Failed to initialize connections and check stake", zap.Error(err)) + panic(err) + } + startMetricsServer(logger, gatherer, cfg.MetricsPort) relayerMetrics, err := relayer.NewApplicationRelayerMetrics(registerer) @@ -549,125 +552,3 @@ func initializeMetrics() (prometheus.Gatherer, prometheus.Registerer, error) { } return gatherer, registry, nil } - -func initializeConnectionsAndCheckStake( - logger logging.Logger, - network *peers.AppRequestNetwork, - cfg *config.Config, -) error { - // Manually connect to the validators of each of the source subnets. - // We return an error if we are unable to connect to sufficient stake on any of the subnets. - // Sufficient stake is determined by the Warp quora of the configured supported destinations, - // or if the subnet supports all destinations, by the quora of all configured destinations. - for _, sourceBlockchain := range cfg.SourceBlockchains { - if sourceBlockchain.GetSubnetID() == constants.PrimaryNetworkID { - if err := connectToPrimaryNetworkPeers(logger, network, cfg, sourceBlockchain); err != nil { - return fmt.Errorf( - "failed to connect to primary network peers: %w", - err, - ) - } - } else { - if err := connectToNonPrimaryNetworkPeers(logger, network, cfg, sourceBlockchain); err != nil { - return fmt.Errorf( - "failed to connect to non-primary network peers: %w", - err, - ) - } - } - } - return nil -} - -// Connect to the validators of the source blockchain. For each destination blockchain, -// verify that we have connected to a threshold of stake. -func connectToNonPrimaryNetworkPeers( - logger logging.Logger, - network *peers.AppRequestNetwork, - cfg *config.Config, - sourceBlockchain *config.SourceBlockchain, -) error { - subnetID := sourceBlockchain.GetSubnetID() - connectedValidators, err := network.ConnectToCanonicalValidators(subnetID) - if err != nil { - logger.Error( - "Failed to connect to canonical validators", - zap.String("subnetID", subnetID.String()), - zap.Error(err), - ) - return err - } - for _, destination := range sourceBlockchain.SupportedDestinations { - blockchainID := destination.GetBlockchainID() - if ok, quorum, err := checkForSufficientConnectedStake(logger, cfg, connectedValidators, blockchainID); !ok { - logger.Error( - "Failed to connect to a threshold of stake", - zap.String("destinationBlockchainID", blockchainID.String()), - zap.Uint64("connectedWeight", connectedValidators.ConnectedWeight), - zap.Uint64("totalValidatorWeight", connectedValidators.TotalValidatorWeight), - zap.Any("warpQuorum", quorum), - ) - return err - } - } - return nil -} - -// Connect to the validators of the destination blockchains. Verify that we have connected -// to a threshold of stake for each blockchain. -func connectToPrimaryNetworkPeers( - logger logging.Logger, - network *peers.AppRequestNetwork, - cfg *config.Config, - sourceBlockchain *config.SourceBlockchain, -) error { - for _, destination := range sourceBlockchain.SupportedDestinations { - blockchainID := destination.GetBlockchainID() - subnetID := cfg.GetSubnetID(blockchainID) - connectedValidators, err := network.ConnectToCanonicalValidators(subnetID) - if err != nil { - logger.Error( - "Failed to connect to canonical validators", - zap.String("subnetID", subnetID.String()), - zap.Error(err), - ) - return err - } - - if ok, quorum, err := checkForSufficientConnectedStake(logger, cfg, connectedValidators, blockchainID); !ok { - logger.Error( - "Failed to connect to a threshold of stake", - zap.String("destinationBlockchainID", blockchainID.String()), - zap.Uint64("connectedWeight", connectedValidators.ConnectedWeight), - zap.Uint64("totalValidatorWeight", connectedValidators.TotalValidatorWeight), - zap.Any("warpQuorum", quorum), - ) - return err - } - } - return nil -} - -// Fetch the warp quorum from the config and check if the connected stake exceeds the threshold -func checkForSufficientConnectedStake( - logger logging.Logger, - cfg *config.Config, - connectedValidators *peers.ConnectedCanonicalValidators, - destinationBlockchainID ids.ID, -) (bool, *config.WarpQuorum, error) { - quorum, err := cfg.GetWarpQuorum(destinationBlockchainID) - if err != nil { - logger.Error( - "Failed to get warp quorum from config", - zap.String("destinationBlockchainID", destinationBlockchainID.String()), - zap.Error(err), - ) - return false, nil, err - } - return utils.CheckStakeWeightExceedsThreshold( - big.NewInt(0).SetUint64(connectedValidators.ConnectedWeight), - connectedValidators.TotalValidatorWeight, - quorum.QuorumNumerator, - quorum.QuorumDenominator, - ), &quorum, nil -} diff --git a/relayer/network_utils.go b/relayer/network_utils.go new file mode 100644 index 00000000..a76334bb --- /dev/null +++ b/relayer/network_utils.go @@ -0,0 +1,139 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package relayer + +import ( + "fmt" + "math/big" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/constants" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/awm-relayer/peers" + "github.com/ava-labs/awm-relayer/relayer/config" + "github.com/ava-labs/awm-relayer/utils" + "go.uber.org/zap" +) + +func InitializeConnectionsAndCheckStake( + logger logging.Logger, + network *peers.AppRequestNetwork, + cfg *config.Config, +) error { + // Manually connect to the validators of each of the source subnets. + // We return an error if we are unable to connect to sufficient stake on any of the subnets. + // Sufficient stake is determined by the Warp quora of the configured supported destinations, + // or if the subnet supports all destinations, by the quora of all configured destinations. + for _, sourceBlockchain := range cfg.SourceBlockchains { + if sourceBlockchain.GetSubnetID() == constants.PrimaryNetworkID { + if err := connectToPrimaryNetworkPeers(logger, network, cfg, sourceBlockchain); err != nil { + return fmt.Errorf( + "failed to connect to primary network peers: %w", + err, + ) + } + } else { + if err := connectToNonPrimaryNetworkPeers(logger, network, cfg, sourceBlockchain); err != nil { + return fmt.Errorf( + "failed to connect to non-primary network peers: %w", + err, + ) + } + } + } + return nil +} + +// Connect to the validators of the source blockchain. For each destination blockchain, +// verify that we have connected to a threshold of stake. +func connectToNonPrimaryNetworkPeers( + logger logging.Logger, + network *peers.AppRequestNetwork, + cfg *config.Config, + sourceBlockchain *config.SourceBlockchain, +) error { + subnetID := sourceBlockchain.GetSubnetID() + connectedValidators, err := network.ConnectToCanonicalValidators(subnetID) + if err != nil { + logger.Error( + "Failed to connect to canonical validators", + zap.String("subnetID", subnetID.String()), + zap.Error(err), + ) + return err + } + for _, destination := range sourceBlockchain.SupportedDestinations { + blockchainID := destination.GetBlockchainID() + if ok, quorum, err := checkForSufficientConnectedStake(logger, cfg, connectedValidators, blockchainID); !ok { + logger.Error( + "Failed to connect to a threshold of stake", + zap.String("destinationBlockchainID", blockchainID.String()), + zap.Uint64("connectedWeight", connectedValidators.ConnectedWeight), + zap.Uint64("totalValidatorWeight", connectedValidators.TotalValidatorWeight), + zap.Any("warpQuorum", quorum), + ) + return err + } + } + return nil +} + +// Connect to the validators of the destination blockchains. Verify that we have connected +// to a threshold of stake for each blockchain. +func connectToPrimaryNetworkPeers( + logger logging.Logger, + network *peers.AppRequestNetwork, + cfg *config.Config, + sourceBlockchain *config.SourceBlockchain, +) error { + for _, destination := range sourceBlockchain.SupportedDestinations { + blockchainID := destination.GetBlockchainID() + subnetID := cfg.GetSubnetID(blockchainID) + connectedValidators, err := network.ConnectToCanonicalValidators(subnetID) + if err != nil { + logger.Error( + "Failed to connect to canonical validators", + zap.String("subnetID", subnetID.String()), + zap.Error(err), + ) + return err + } + + if ok, quorum, err := checkForSufficientConnectedStake(logger, cfg, connectedValidators, blockchainID); !ok { + logger.Error( + "Failed to connect to a threshold of stake", + zap.String("destinationBlockchainID", blockchainID.String()), + zap.Uint64("connectedWeight", connectedValidators.ConnectedWeight), + zap.Uint64("totalValidatorWeight", connectedValidators.TotalValidatorWeight), + zap.Any("warpQuorum", quorum), + ) + return err + } + } + return nil +} + +// Fetch the warp quorum from the config and check if the connected stake exceeds the threshold +func checkForSufficientConnectedStake( + logger logging.Logger, + cfg *config.Config, + connectedValidators *peers.ConnectedCanonicalValidators, + destinationBlockchainID ids.ID, +) (bool, *config.WarpQuorum, error) { + quorum, err := cfg.GetWarpQuorum(destinationBlockchainID) + if err != nil { + logger.Error( + "Failed to get warp quorum from config", + zap.String("destinationBlockchainID", destinationBlockchainID.String()), + zap.Error(err), + ) + return false, nil, err + } + return utils.CheckStakeWeightExceedsThreshold( + big.NewInt(0).SetUint64(connectedValidators.ConnectedWeight), + connectedValidators.TotalValidatorWeight, + quorum.QuorumNumerator, + quorum.QuorumDenominator, + ), &quorum, nil +}