diff --git a/pkg/controllers/tagging/tagging_controller.go b/pkg/controllers/tagging/tagging_controller.go index 1d1b3bef50..909c8237eb 100644 --- a/pkg/controllers/tagging/tagging_controller.go +++ b/pkg/controllers/tagging/tagging_controller.go @@ -34,6 +34,8 @@ import ( nodehelpers "k8s.io/cloud-provider/node/helpers" _ "k8s.io/component-base/metrics/prometheus/workqueue" // enable prometheus provider for workqueue metrics "k8s.io/klog/v2" + + "k8s.io/cloud-provider-aws/pkg/providers/v1/variant" ) func init() { @@ -231,8 +233,9 @@ func (tc *Controller) process() bool { } klog.Infof("Instance ID of work item %s is %s", workItem, instanceID) - if awsv1.IsFargateNode(string(instanceID)) { - klog.Infof("Skip processing the node %s since it is a Fargate node", instanceID) + if variant.IsVariantNode(string(instanceID)) { + klog.Infof("Skip processing the node %s since it is a %s node", + instanceID, variant.NodeType(string(instanceID))) tc.workqueue.Forget(obj) return nil } diff --git a/pkg/controllers/tagging/tagging_controller_test.go b/pkg/controllers/tagging/tagging_controller_test.go index a96b4135f5..89142bec12 100644 --- a/pkg/controllers/tagging/tagging_controller_test.go +++ b/pkg/controllers/tagging/tagging_controller_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/util/workqueue" awsv1 "k8s.io/cloud-provider-aws/pkg/providers/v1" + "k8s.io/cloud-provider-aws/pkg/providers/v1/config" "k8s.io/klog/v2" ) @@ -133,7 +134,7 @@ func Test_NodesJoiningAndLeaving(t *testing.T) { }, }, toBeTagged: true, - expectedMessages: []string{"Skip processing the node fargate-ip-10-0-55-27.us-west-2.compute.internal since it is a Fargate node"}, + expectedMessages: []string{"Skip processing the node fargate-ip-10-0-55-27.us-west-2.compute.internal since it is a fargate node"}, }, { name: "node0 leaves the cluster, failed to untag.", @@ -194,7 +195,7 @@ func Test_NodesJoiningAndLeaving(t *testing.T) { } awsServices := awsv1.NewFakeAWSServices(TestClusterID) - fakeAws, _ := awsv1.NewAWSCloud(awsv1.CloudConfig{}, awsServices) + fakeAws, _ := awsv1.NewAWSCloud(config.CloudConfig{}, awsServices) for _, testcase := range testcases { t.Run(testcase.name, func(t *testing.T) { diff --git a/pkg/providers/v1/aws.go b/pkg/providers/v1/aws.go index ca56321bd3..a2b8fcac0b 100644 --- a/pkg/providers/v1/aws.go +++ b/pkg/providers/v1/aws.go @@ -33,7 +33,6 @@ import ( "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/credentials/stscreds" - "github.com/aws/aws-sdk-go/aws/ec2metadata" "github.com/aws/aws-sdk-go/aws/endpoints" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" @@ -68,6 +67,11 @@ import ( volumehelpers "k8s.io/cloud-provider/volume/helpers" "k8s.io/klog/v2" netutils "k8s.io/utils/net" + + "k8s.io/cloud-provider-aws/pkg/providers/v1/config" + "k8s.io/cloud-provider-aws/pkg/providers/v1/iface" + "k8s.io/cloud-provider-aws/pkg/providers/v1/variant" + _ "k8s.io/cloud-provider-aws/pkg/providers/v1/variant/fargate" // ensure the fargate variant gets registered ) // NLBHealthCheckRuleDescription is the comment used on a security group rule to @@ -285,9 +289,6 @@ const ( // but we are using a lower limit on purpose filterNodeLimit = 150 - // fargateNodeNamePrefix string is added to awsInstance nodeName and providerID of Fargate nodes. - fargateNodeNamePrefix = "fargate-" - // privateDNSNamePrefix is the prefix added to ENI Private DNS Name. privateDNSNamePrefix = "ip-" @@ -334,62 +335,14 @@ const DefaultVolumeType = "gp2" // Services is an abstraction over AWS, to allow mocking/other implementations type Services interface { - Compute(region string) (EC2, error) + Compute(region string) (iface.EC2, error) LoadBalancing(region string) (ELB, error) LoadBalancingV2(region string) (ELBV2, error) Autoscaling(region string) (ASG, error) - Metadata() (EC2Metadata, error) + Metadata() (config.EC2Metadata, error) KeyManagement(region string) (KMS, error) } -// EC2 is an abstraction over AWS', to allow mocking/other implementations -// Note that the DescribeX functions return a list, so callers don't need to deal with paging -// TODO: Should we rename this to AWS (EBS & ELB are not technically part of EC2) -type EC2 interface { - // Query EC2 for instances matching the filter - DescribeInstances(request *ec2.DescribeInstancesInput) ([]*ec2.Instance, error) - - // Attach a volume to an instance - AttachVolume(*ec2.AttachVolumeInput) (*ec2.VolumeAttachment, error) - // Detach a volume from an instance it is attached to - DetachVolume(request *ec2.DetachVolumeInput) (resp *ec2.VolumeAttachment, err error) - // Lists volumes - DescribeVolumes(request *ec2.DescribeVolumesInput) ([]*ec2.Volume, error) - // Create an EBS volume - CreateVolume(request *ec2.CreateVolumeInput) (resp *ec2.Volume, err error) - // Delete an EBS volume - DeleteVolume(*ec2.DeleteVolumeInput) (*ec2.DeleteVolumeOutput, error) - - ModifyVolume(*ec2.ModifyVolumeInput) (*ec2.ModifyVolumeOutput, error) - - DescribeVolumeModifications(*ec2.DescribeVolumesModificationsInput) ([]*ec2.VolumeModification, error) - - DescribeSecurityGroups(request *ec2.DescribeSecurityGroupsInput) ([]*ec2.SecurityGroup, error) - - CreateSecurityGroup(*ec2.CreateSecurityGroupInput) (*ec2.CreateSecurityGroupOutput, error) - DeleteSecurityGroup(request *ec2.DeleteSecurityGroupInput) (*ec2.DeleteSecurityGroupOutput, error) - - AuthorizeSecurityGroupIngress(*ec2.AuthorizeSecurityGroupIngressInput) (*ec2.AuthorizeSecurityGroupIngressOutput, error) - RevokeSecurityGroupIngress(*ec2.RevokeSecurityGroupIngressInput) (*ec2.RevokeSecurityGroupIngressOutput, error) - - DescribeSubnets(*ec2.DescribeSubnetsInput) ([]*ec2.Subnet, error) - - DescribeAvailabilityZones(request *ec2.DescribeAvailabilityZonesInput) ([]*ec2.AvailabilityZone, error) - - CreateTags(*ec2.CreateTagsInput) (*ec2.CreateTagsOutput, error) - DeleteTags(input *ec2.DeleteTagsInput) (*ec2.DeleteTagsOutput, error) - - DescribeRouteTables(request *ec2.DescribeRouteTablesInput) ([]*ec2.RouteTable, error) - CreateRoute(request *ec2.CreateRouteInput) (*ec2.CreateRouteOutput, error) - DeleteRoute(request *ec2.DeleteRouteInput) (*ec2.DeleteRouteOutput, error) - - ModifyInstanceAttribute(request *ec2.ModifyInstanceAttributeInput) (*ec2.ModifyInstanceAttributeOutput, error) - - DescribeVpcs(input *ec2.DescribeVpcsInput) (*ec2.DescribeVpcsOutput, error) - - DescribeNetworkInterfaces(input *ec2.DescribeNetworkInterfacesInput) (*ec2.DescribeNetworkInterfacesOutput, error) -} - // ELB is a simple pass-through of AWS' ELB client interface, which allows for testing type ELB interface { CreateLoadBalancer(*elb.CreateLoadBalancerInput) (*elb.CreateLoadBalancerOutput, error) @@ -462,13 +415,6 @@ type KMS interface { DescribeKey(*kms.DescribeKeyInput) (*kms.DescribeKeyOutput, error) } -// EC2Metadata is an abstraction over the AWS metadata service. -type EC2Metadata interface { - // Query the EC2 metadata service (used to discover instance-id etc) - GetMetadata(path string) (string, error) - Region() (string, error) -} - // AWS volume types const ( // Provisioned IOPS SSD @@ -563,13 +509,13 @@ var _ cloudprovider.PVLabeler = (*Cloud)(nil) // Cloud is an implementation of Interface, LoadBalancer and Instances for Amazon Web Services. type Cloud struct { - ec2 EC2 + ec2 iface.EC2 elb ELB elbv2 ELBV2 asg ASG kms KMS - metadata EC2Metadata - cfg *CloudConfig + metadata config.EC2Metadata + cfg *config.CloudConfig region string vpcID string @@ -602,165 +548,6 @@ type Cloud struct { deviceAllocators map[types.NodeName]DeviceAllocator } -var _ Volumes = &Cloud{} - -// CloudConfig wraps the settings for the AWS cloud provider. -// NOTE: Cloud config files should follow the same Kubernetes deprecation policy as -// flags or CLIs. Config fields should not change behavior in incompatible ways and -// should be deprecated for at least 2 release prior to removing. -// See https://kubernetes.io/docs/reference/using-api/deprecation-policy/#deprecating-a-flag-or-cli -// for more details. -type CloudConfig struct { - Global struct { - // TODO: Is there any use for this? We can get it from the instance metadata service - // Maybe if we're not running on AWS, e.g. bootstrap; for now it is not very useful - Zone string - - Region string - - // The AWS VPC flag enables the possibility to run the master components - // on a different aws account, on a different cloud provider or on-premises. - // If the flag is set also the KubernetesClusterTag must be provided - VPC string - // SubnetID enables using a specific subnet to use for ELB's - SubnetID string - // RouteTableID enables using a specific RouteTable - RouteTableID string - - // RoleARN is the IAM role to assume when interaction with AWS APIs. - RoleARN string - // SourceARN is value which is passed while assuming role specified by RoleARN. When a service - // assumes a role in your account, you can include the aws:SourceAccount and aws:SourceArn global - // condition context keys in your role trust policy to limit access to the role to only requests that are generated - // by expected resources. https://docs.aws.amazon.com/IAM/latest/UserGuide/confused-deputy.html - SourceARN string - - // KubernetesClusterTag is the legacy cluster id we'll use to identify our cluster resources - KubernetesClusterTag string - // KubernetesClusterID is the cluster id we'll use to identify our cluster resources - KubernetesClusterID string - - //The aws provider creates an inbound rule per load balancer on the node security - //group. However, this can run into the AWS security group rule limit of 50 if - //many LoadBalancers are created. - // - //This flag disables the automatic ingress creation. It requires that the user - //has setup a rule that allows inbound traffic on kubelet ports from the - //local VPC subnet (so load balancers can access it). E.g. 10.82.0.0/16 30000-32000. - DisableSecurityGroupIngress bool - - //AWS has a hard limit of 500 security groups. For large clusters creating a security group for each ELB - //can cause the max number of security groups to be reached. If this is set instead of creating a new - //Security group for each ELB this security group will be used instead. - ElbSecurityGroup string - - // NodeIPFamilies determines which IP addresses are added to node objects and their ordering. - NodeIPFamilies []string - } - // [ServiceOverride "1"] - // Service = s3 - // Region = region1 - // URL = https://s3.foo.bar - // SigningRegion = signing_region - // SigningMethod = signing_method - // - // [ServiceOverride "2"] - // Service = ec2 - // Region = region2 - // URL = https://ec2.foo.bar - // SigningRegion = signing_region - // SigningMethod = signing_method - ServiceOverride map[string]*struct { - Service string - Region string - URL string - SigningRegion string - SigningMethod string - SigningName string - } -} - -// GetRegion returns the AWS region from the config, if set, or gets it from the metadata -// service if unset and sets in config -func (cfg *CloudConfig) GetRegion(metadata EC2Metadata) (string, error) { - if cfg.Global.Region != "" { - return cfg.Global.Region, nil - } - - klog.Info("Loading region from metadata service") - region, err := metadata.Region() - if err != nil { - return "", err - } - - cfg.Global.Region = region - return region, nil -} - -func (cfg *CloudConfig) validateOverrides() error { - if len(cfg.ServiceOverride) == 0 { - return nil - } - set := make(map[string]bool) - for onum, ovrd := range cfg.ServiceOverride { - // Note: gcfg does not space trim, so we have to when comparing to empty string "" - name := strings.TrimSpace(ovrd.Service) - if name == "" { - return fmt.Errorf("service name is missing [Service is \"\"] in override %s", onum) - } - // insure the map service name is space trimmed - ovrd.Service = name - - region := strings.TrimSpace(ovrd.Region) - if region == "" { - return fmt.Errorf("service region is missing [Region is \"\"] in override %s", onum) - } - // insure the map region is space trimmed - ovrd.Region = region - - url := strings.TrimSpace(ovrd.URL) - if url == "" { - return fmt.Errorf("url is missing [URL is \"\"] in override %s", onum) - } - signingRegion := strings.TrimSpace(ovrd.SigningRegion) - if signingRegion == "" { - return fmt.Errorf("signingRegion is missing [SigningRegion is \"\"] in override %s", onum) - } - signature := name + "_" + region - if set[signature] { - return fmt.Errorf("duplicate entry found for service override [%s] (%s in %s)", onum, name, region) - } - set[signature] = true - } - return nil -} - -func (cfg *CloudConfig) getResolver() endpoints.ResolverFunc { - defaultResolver := endpoints.DefaultResolver() - defaultResolverFn := func(service, region string, - optFns ...func(*endpoints.Options)) (endpoints.ResolvedEndpoint, error) { - return defaultResolver.EndpointFor(service, region, optFns...) - } - if len(cfg.ServiceOverride) == 0 { - return defaultResolverFn - } - - return func(service, region string, - optFns ...func(*endpoints.Options)) (endpoints.ResolvedEndpoint, error) { - for _, override := range cfg.ServiceOverride { - if override.Service == service && override.Region == region { - return endpoints.ResolvedEndpoint{ - URL: override.URL, - SigningRegion: override.SigningRegion, - SigningMethod: override.SigningMethod, - SigningName: override.SigningName, - }, nil - } - } - return defaultResolver.EndpointFor(service, region, optFns...) - } -} - // awsSdkEC2 is an implementation of the EC2 interface, backed by aws-sdk-go type awsSdkEC2 struct { ec2 ec2iface.EC2API @@ -768,23 +555,7 @@ type awsSdkEC2 struct { // Interface to make the CloudConfig immutable for awsSDKProvider type awsCloudConfigProvider interface { - getResolver() endpoints.ResolverFunc -} - -type awsSDKProvider struct { - creds *credentials.Credentials - cfg awsCloudConfigProvider - - mutex sync.Mutex - regionDelayers map[string]*CrossRequestRetryDelay -} - -func newAWSSDKProvider(creds *credentials.Credentials, cfg *CloudConfig) *awsSDKProvider { - return &awsSDKProvider{ - creds: creds, - cfg: cfg, - regionDelayers: make(map[string]*CrossRequestRetryDelay), - } + GetResolver() endpoints.ResolverFunc } func (p *awsSDKProvider) addHandlers(regionName string, h *request.Handlers) { @@ -814,39 +585,6 @@ func (p *awsSDKProvider) addHandlers(regionName string, h *request.Handlers) { p.addAPILoggingHandlers(h) } -func (p *awsSDKProvider) addAPILoggingHandlers(h *request.Handlers) { - h.Send.PushBackNamed(request.NamedHandler{ - Name: "k8s/api-request", - Fn: awsSendHandlerLogger, - }) - - h.ValidateResponse.PushFrontNamed(request.NamedHandler{ - Name: "k8s/api-validate-response", - Fn: awsValidateResponseHandlerLogger, - }) -} - -// Get a CrossRequestRetryDelay, scoped to the region, not to the request. -// This means that when we hit a limit on a call, we will delay _all_ calls to the API. -// We do this to protect the AWS account from becoming overloaded and effectively locked. -// We also log when we hit request limits. -// Note that this delays the current goroutine; this is bad behaviour and will -// likely cause k8s to become slow or unresponsive for cloud operations. -// However, this throttle is intended only as a last resort. When we observe -// this throttling, we need to address the root cause (e.g. add a delay to a -// controller retry loop) -func (p *awsSDKProvider) getCrossRequestRetryDelay(regionName string) *CrossRequestRetryDelay { - p.mutex.Lock() - defer p.mutex.Unlock() - - delayer, found := p.regionDelayers[regionName] - if !found { - delayer = NewCrossRequestRetryDelay() - p.regionDelayers[regionName] = delayer - } - return delayer -} - // InstanceIDIndexFunc indexes based on a Node's instance ID found in its spec.providerID func InstanceIDIndexFunc(obj interface{}) ([]string, error) { node, ok := obj.(*v1.Node) @@ -877,126 +615,6 @@ func (c *Cloud) SetInformers(informerFactory informers.SharedInformerFactory) { }) } -func (p *awsSDKProvider) Compute(regionName string) (EC2, error) { - awsConfig := &aws.Config{ - Region: ®ionName, - Credentials: p.creds, - } - awsConfig = awsConfig.WithCredentialsChainVerboseErrors(true). - WithEndpointResolver(p.cfg.getResolver()) - sess, err := session.NewSessionWithOptions(session.Options{ - Config: *awsConfig, - SharedConfigState: session.SharedConfigEnable, - }) - - if err != nil { - return nil, fmt.Errorf("unable to initialize AWS session: %v", err) - } - service := ec2.New(sess) - - p.addHandlers(regionName, &service.Handlers) - - ec2 := &awsSdkEC2{ - ec2: service, - } - return ec2, nil -} - -func (p *awsSDKProvider) LoadBalancing(regionName string) (ELB, error) { - awsConfig := &aws.Config{ - Region: ®ionName, - Credentials: p.creds, - } - awsConfig = awsConfig.WithCredentialsChainVerboseErrors(true). - WithEndpointResolver(p.cfg.getResolver()) - sess, err := session.NewSessionWithOptions(session.Options{ - Config: *awsConfig, - SharedConfigState: session.SharedConfigEnable, - }) - if err != nil { - return nil, fmt.Errorf("unable to initialize AWS session: %v", err) - } - elbClient := elb.New(sess) - p.addHandlers(regionName, &elbClient.Handlers) - - return elbClient, nil -} - -func (p *awsSDKProvider) LoadBalancingV2(regionName string) (ELBV2, error) { - awsConfig := &aws.Config{ - Region: ®ionName, - Credentials: p.creds, - } - awsConfig = awsConfig.WithCredentialsChainVerboseErrors(true). - WithEndpointResolver(p.cfg.getResolver()) - sess, err := session.NewSessionWithOptions(session.Options{ - Config: *awsConfig, - SharedConfigState: session.SharedConfigEnable, - }) - if err != nil { - return nil, fmt.Errorf("unable to initialize AWS session: %v", err) - } - elbClient := elbv2.New(sess) - - p.addHandlers(regionName, &elbClient.Handlers) - - return elbClient, nil -} - -func (p *awsSDKProvider) Autoscaling(regionName string) (ASG, error) { - awsConfig := &aws.Config{ - Region: ®ionName, - Credentials: p.creds, - } - awsConfig = awsConfig.WithCredentialsChainVerboseErrors(true). - WithEndpointResolver(p.cfg.getResolver()) - sess, err := session.NewSessionWithOptions(session.Options{ - Config: *awsConfig, - SharedConfigState: session.SharedConfigEnable, - }) - if err != nil { - return nil, fmt.Errorf("unable to initialize AWS session: %v", err) - } - client := autoscaling.New(sess) - - p.addHandlers(regionName, &client.Handlers) - - return client, nil -} - -func (p *awsSDKProvider) Metadata() (EC2Metadata, error) { - sess, err := session.NewSession(&aws.Config{ - EndpointResolver: p.cfg.getResolver(), - }) - if err != nil { - return nil, fmt.Errorf("unable to initialize AWS session: %v", err) - } - client := ec2metadata.New(sess) - p.addAPILoggingHandlers(&client.Handlers) - return client, nil -} - -func (p *awsSDKProvider) KeyManagement(regionName string) (KMS, error) { - awsConfig := &aws.Config{ - Region: ®ionName, - Credentials: p.creds, - } - awsConfig = awsConfig.WithCredentialsChainVerboseErrors(true). - WithEndpointResolver(p.cfg.getResolver()) - sess, err := session.NewSessionWithOptions(session.Options{ - Config: *awsConfig, - SharedConfigState: session.SharedConfigEnable, - }) - if err != nil { - return nil, fmt.Errorf("unable to initialize AWS session: %v", err) - } - kmsClient := kms.New(sess) - - p.addHandlers(regionName, &kmsClient.Handlers) - - return kmsClient, nil -} - func newEc2Filter(name string, values ...string) *ec2.Filter { filter := &ec2.Filter{ Name: aws.String(name), @@ -1273,7 +891,7 @@ func init() { return nil, fmt.Errorf("unable to read AWS cloud provider config file: %v", err) } - if err = cfg.validateOverrides(); err != nil { + if err = cfg.ValidateOverrides(); err != nil { return nil, fmt.Errorf("unable to validate custom endpoint overrides: %v", err) } @@ -1312,7 +930,7 @@ func init() { } aws := newAWSSDKProvider(creds, cfg) - return newAWSCloud(*cfg, aws) + return newAWSCloud2(*cfg, aws, aws, creds) }) } @@ -1337,12 +955,12 @@ func getSTSClient(sess *session.Session, roleARN, sourceARN string) (*sts.STS, e } // readAWSCloudConfig reads an instance of AWSCloudConfig from config reader. -func readAWSCloudConfig(config io.Reader) (*CloudConfig, error) { - var cfg CloudConfig +func readAWSCloudConfig(cloudConfig io.Reader) (*config.CloudConfig, error) { + var cfg config.CloudConfig var err error - if config != nil { - err = gcfg.FatalOnly(gcfg.ReadInto(&cfg, config)) + if cloudConfig != nil { + err = gcfg.FatalOnly(gcfg.ReadInto(&cfg, cloudConfig)) if err != nil { return nil, err } @@ -1367,10 +985,14 @@ func azToRegion(az string) (string, error) { return region, nil } +func newAWSCloud(cfg config.CloudConfig, awsServices Services) (*Cloud, error) { + return newAWSCloud2(cfg, awsServices, nil, nil) +} + // newAWSCloud creates a new instance of AWSCloud. // AWSProvider and instanceId are primarily for tests -func newAWSCloud(cfg CloudConfig, awsServices Services) (*Cloud, error) { - // We have some state in the Cloud object - in particular the attaching map +func newAWSCloud2(cfg config.CloudConfig, awsServices Services, provider config.SDKProvider, credentials *credentials.Credentials) (*Cloud, error) { + // We have some state in the Cloud object // Log so that if we are building multiple Cloud objects, it is obvious! klog.Infof("Building AWS cloudprovider") @@ -1465,11 +1087,17 @@ func newAWSCloud(cfg CloudConfig, awsServices Services) (*Cloud, error) { } klog.Infof("The following IP families will be added to nodes: %v", cfg.Global.NodeIPFamilies) + variants := variant.GetVariants() + for _, v := range variants { + if err := v.Initialize(&cfg, credentials, provider, awsCloud.ec2, awsCloud.region); err != nil { + return nil, err + } + } return awsCloud, nil } // NewAWSCloud calls and return new aws cloud from newAWSCloud with the supplied configuration -func NewAWSCloud(cfg CloudConfig, awsServices Services) (*Cloud, error) { +func NewAWSCloud(cfg config.CloudConfig, awsServices Services) (*Cloud, error) { return newAWSCloud(cfg, awsServices) } @@ -1628,16 +1256,6 @@ func extractIPv6NodeAddresses(instance *ec2.Instance) ([]v1.NodeAddress, error) return addresses, nil } -// getNodeAddressesForFargateNode generates list of Node addresses for Fargate node. -func getNodeAddressesForFargateNode(privateDNSName, privateIP string) []v1.NodeAddress { - addresses := []v1.NodeAddress{} - addresses = append(addresses, v1.NodeAddress{Type: v1.NodeInternalIP, Address: privateIP}) - if privateDNSName != "" { - addresses = append(addresses, v1.NodeAddress{Type: v1.NodeInternalDNS, Address: privateDNSName}) - } - return addresses -} - // NodeAddressesByProviderID returns the node addresses of an instances with the specified unique providerID // This method will not be called from the node that is requesting this ID. i.e. metadata service // and other local methods cannot be used here @@ -1647,31 +1265,8 @@ func (c *Cloud) NodeAddressesByProviderID(ctx context.Context, providerID string return nil, err } - if IsFargateNode(string(instanceID)) { - eni, err := c.describeNetworkInterfaces(string(instanceID)) - if eni == nil || err != nil { - return nil, err - } - - var addresses []v1.NodeAddress - - // Assign NodeInternalIP based on IP family - for _, family := range c.cfg.Global.NodeIPFamilies { - switch family { - case "ipv4": - nodeAddresses := getNodeAddressesForFargateNode(aws.StringValue(eni.PrivateDnsName), aws.StringValue(eni.PrivateIpAddress)) - addresses = append(addresses, nodeAddresses...) - case "ipv6": - if eni.Ipv6Addresses == nil || len(eni.Ipv6Addresses) == 0 { - klog.Errorf("no Ipv6Addresses associated with the eni") - continue - } - internalIPv6Address := eni.Ipv6Addresses[0].Ipv6Address - nodeAddresses := getNodeAddressesForFargateNode(aws.StringValue(eni.PrivateDnsName), aws.StringValue(internalIPv6Address)) - addresses = append(addresses, nodeAddresses...) - } - } - return addresses, nil + if v := variant.GetVariant(string(instanceID)); v != nil { + return v.NodeAddresses(string(instanceID), c.vpcID) } instance, err := describeInstance(c.ec2, instanceID) @@ -1709,9 +1304,8 @@ func (c *Cloud) InstanceExistsByProviderID(ctx context.Context, providerID strin return false, err } - if IsFargateNode(string(instanceID)) { - eni, err := c.describeNetworkInterfaces(string(instanceID)) - return eni != nil, err + if v := variant.GetVariant(string(instanceID)); v != nil { + return v.InstanceExists(string(instanceID), c.vpcID) } request := &ec2.DescribeInstancesInput{ @@ -1749,9 +1343,8 @@ func (c *Cloud) InstanceShutdownByProviderID(ctx context.Context, providerID str return false, err } - if IsFargateNode(string(instanceID)) { - eni, err := c.describeNetworkInterfaces(string(instanceID)) - return eni != nil, err + if v := variant.GetVariant(string(instanceID)); v != nil { + return v.InstanceShutdown(string(instanceID), c.vpcID) } request := &ec2.DescribeInstancesInput{ @@ -1810,8 +1403,8 @@ func (c *Cloud) InstanceTypeByProviderID(ctx context.Context, providerID string) return "", err } - if IsFargateNode(string(instanceID)) { - return "", nil + if v := variant.GetVariant(string(instanceID)); v != nil { + return v.InstanceTypeByProviderID(string(instanceID)) } instance, err := describeInstance(c.ec2, instanceID) @@ -1919,15 +1512,8 @@ func (c *Cloud) GetZoneByProviderID(ctx context.Context, providerID string) (clo return cloudprovider.Zone{}, err } - if IsFargateNode(string(instanceID)) { - eni, err := c.describeNetworkInterfaces(string(instanceID)) - if eni == nil || err != nil { - return cloudprovider.Zone{}, err - } - return cloudprovider.Zone{ - FailureDomain: *eni.AvailabilityZone, - Region: c.region, - }, nil + if v := variant.GetVariant(string(instanceID)); v != nil { + return v.GetZone(string(instanceID), c.vpcID, c.region) } instance, err := c.getInstanceByID(string(instanceID)) @@ -1983,52 +1569,6 @@ func IsAWSErrorInstanceNotFound(err error) bool { // This should be stored as a single letter (i.e. c, not sdc or /dev/sdc) type mountDevice string -type awsInstance struct { - ec2 EC2 - - // id in AWS - awsID string - - // node name in k8s - nodeName types.NodeName - - // availability zone the instance resides in - availabilityZone string - - // ID of VPC the instance resides in - vpcID string - - // ID of subnet the instance resides in - subnetID string - - // instance type - instanceType string -} - -// newAWSInstance creates a new awsInstance object -func newAWSInstance(ec2Service EC2, instance *ec2.Instance) *awsInstance { - az := "" - if instance.Placement != nil { - az = aws.StringValue(instance.Placement.AvailabilityZone) - } - self := &awsInstance{ - ec2: ec2Service, - awsID: aws.StringValue(instance.InstanceId), - nodeName: mapInstanceToNodeName(instance), - availabilityZone: az, - instanceType: aws.StringValue(instance.InstanceType), - vpcID: aws.StringValue(instance.VpcId), - subnetID: aws.StringValue(instance.SubnetId), - } - - return self -} - -// Gets the full information about this instance from the EC2 API -func (i *awsInstance) describeInstance() (*ec2.Instance, error) { - return describeInstance(i.ec2, InstanceID(i.awsID)) -} - // Gets the mountDevice already assigned to the volume, or assigns an unused mountDevice. // If the volume is already assigned, this will return the existing mountDevice with alreadyAttached=true. // Otherwise the mountDevice is assigned by finding the first available mountDevice, and it is returned with alreadyAttached=false. @@ -2139,7 +1679,7 @@ func (c *Cloud) endAttaching(i *awsInstance, volumeID EBSVolumeID, mountDevice m } type awsDisk struct { - ec2 EC2 + ec2 iface.EC2 // Name in k8s name KubernetesVolumeID @@ -2731,12 +2271,14 @@ func (c *Cloud) waitUntilVolumeAvailable(volumeName KubernetesVolumeID) error { // Unreachable code return err } + time.Sleep(5 * time.Second) backoff := wait.Backoff{ Duration: volumeCreateInitialDelay, Factor: volumeCreateBackoffFactor, Steps: volumeCreateBackoffSteps, } + err = wait.ExponentialBackoff(backoff, func() (done bool, err error) { vol, err := disk.describeVolume() if err != nil { @@ -5138,11 +4680,6 @@ func (c *Cloud) getFullInstance(nodeName types.NodeName) (*awsInstance, *ec2.Ins return awsInstance, instance, err } -// IsFargateNode returns true if given node runs on Fargate compute -func IsFargateNode(nodeName string) bool { - return strings.HasPrefix(nodeName, fargateNodeNamePrefix) -} - // extract private ip address from node name func nodeNameToIPAddress(nodeName string) string { nodeName = strings.TrimPrefix(nodeName, privateDNSNamePrefix) @@ -5247,43 +4784,7 @@ func getInitialAttachDetachDelay(status string) time.Duration { return volumeAttachmentStatusInitialDelay } -// describeNetworkInterfaces returns network interface information for the given DNS name. -func (c *Cloud) describeNetworkInterfaces(nodeName string) (*ec2.NetworkInterface, error) { - eniEndpoint := strings.TrimPrefix(nodeName, fargateNodeNamePrefix) - - filters := []*ec2.Filter{ - newEc2Filter("attachment.status", "attached"), - newEc2Filter("vpc-id", c.vpcID), - } - - // when enableDnsSupport is set to false in a VPC, interface will not have private DNS names. - // convert node name to ip address because ip-name based and resource-named EC2 resources - // may have different privateDNSName formats but same privateIpAddress format - if strings.HasPrefix(eniEndpoint, privateDNSNamePrefix) { - eniEndpoint = nodeNameToIPAddress(eniEndpoint) - } - - filters = append(filters, newEc2Filter("private-ip-address", eniEndpoint)) - - request := &ec2.DescribeNetworkInterfacesInput{ - Filters: filters, - } - - eni, err := c.ec2.DescribeNetworkInterfaces(request) - if err != nil { - return nil, err - } - if len(eni.NetworkInterfaces) == 0 { - return nil, nil - } - if len(eni.NetworkInterfaces) != 1 { - // This should not be possible - ids should be unique - return nil, fmt.Errorf("multiple interfaces found with same id %q", eni.NetworkInterfaces) - } - return eni.NetworkInterfaces[0], nil -} - -func getRegionFromMetadata(cfg CloudConfig, metadata EC2Metadata) (string, error) { +func getRegionFromMetadata(cfg config.CloudConfig, metadata config.EC2Metadata) (string, error) { // For backwards compatibility reasons, keeping this check to avoid breaking possible // cases where Zone was set to override the region configuration. Otherwise, fall back // to getting region the standard way. diff --git a/pkg/providers/v1/aws_fakes.go b/pkg/providers/v1/aws_fakes.go index 08fb1ec716..ec090af069 100644 --- a/pkg/providers/v1/aws_fakes.go +++ b/pkg/providers/v1/aws_fakes.go @@ -31,6 +31,9 @@ import ( "github.com/aws/aws-sdk-go/service/elbv2" "github.com/aws/aws-sdk-go/service/kms" "k8s.io/klog/v2" + + "k8s.io/cloud-provider-aws/pkg/providers/v1/config" + "k8s.io/cloud-provider-aws/pkg/providers/v1/iface" ) // FakeAWSServices is an fake AWS session used for testing @@ -151,7 +154,7 @@ func (s *FakeAWSServices) countCall(service string, api string, resourceID strin } // Compute returns a fake EC2 client -func (s *FakeAWSServices) Compute(region string) (EC2, error) { +func (s *FakeAWSServices) Compute(region string) (iface.EC2, error) { return s.ec2, nil } @@ -171,7 +174,7 @@ func (s *FakeAWSServices) Autoscaling(region string) (ASG, error) { } // Metadata returns a fake EC2Metadata client -func (s *FakeAWSServices) Metadata() (EC2Metadata, error) { +func (s *FakeAWSServices) Metadata() (config.EC2Metadata, error) { return s.metadata, nil } @@ -182,7 +185,7 @@ func (s *FakeAWSServices) KeyManagement(region string) (KMS, error) { // FakeEC2 is a fake EC2 client used for testing type FakeEC2 interface { - EC2 + iface.EC2 CreateSubnet(*ec2.Subnet) (*ec2.CreateSubnetOutput, error) RemoveSubnets() CreateRouteTable(*ec2.RouteTable) (*ec2.CreateRouteTableOutput, error) @@ -818,6 +821,7 @@ func contains(haystack []*string, needle string) bool { // DescribeNetworkInterfaces returns list of ENIs for testing func (ec2i *FakeEC2Impl) DescribeNetworkInterfaces(input *ec2.DescribeNetworkInterfacesInput) (*ec2.DescribeNetworkInterfacesOutput, error) { + fargateNodeNamePrefix := "fargate-" networkInterface := []*ec2.NetworkInterface{ { PrivateIpAddress: aws.String("1.2.3.4"), diff --git a/pkg/providers/v1/aws_instance.go b/pkg/providers/v1/aws_instance.go new file mode 100644 index 0000000000..e7e8b152a1 --- /dev/null +++ b/pkg/providers/v1/aws_instance.go @@ -0,0 +1,71 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aws + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + "k8s.io/apimachinery/pkg/types" + + "k8s.io/cloud-provider-aws/pkg/providers/v1/iface" +) + +type awsInstance struct { + ec2 iface.EC2 + + // id in AWS + awsID string + + // node name in k8s + nodeName types.NodeName + + // availability zone the instance resides in + availabilityZone string + + // ID of VPC the instance resides in + vpcID string + + // ID of subnet the instance resides in + subnetID string + + // instance type + instanceType string +} + +// newAWSInstance creates a new awsInstance object +func newAWSInstance(ec2Service iface.EC2, instance *ec2.Instance) *awsInstance { + az := "" + if instance.Placement != nil { + az = aws.StringValue(instance.Placement.AvailabilityZone) + } + self := &awsInstance{ + ec2: ec2Service, + awsID: aws.StringValue(instance.InstanceId), + nodeName: mapInstanceToNodeName(instance), + availabilityZone: az, + instanceType: aws.StringValue(instance.InstanceType), + vpcID: aws.StringValue(instance.VpcId), + subnetID: aws.StringValue(instance.SubnetId), + } + + return self +} + +// Gets the full information about this instance from the EC2 API +func (i *awsInstance) describeInstance() (*ec2.Instance, error) { + return describeInstance(i.ec2, InstanceID(i.awsID)) +} diff --git a/pkg/providers/v1/aws_loadbalancer_test.go b/pkg/providers/v1/aws_loadbalancer_test.go index 866742c3d9..309d9eb209 100644 --- a/pkg/providers/v1/aws_loadbalancer_test.go +++ b/pkg/providers/v1/aws_loadbalancer_test.go @@ -31,6 +31,8 @@ import ( "github.com/aws/aws-sdk-go/service/elb" "github.com/aws/aws-sdk-go/service/elbv2" "github.com/stretchr/testify/assert" + + "k8s.io/cloud-provider-aws/pkg/providers/v1/config" ) func TestElbProtocolsAreEqual(t *testing.T) { @@ -584,7 +586,7 @@ func TestCloud_findInstancesForELB(t *testing.T) { } newNode, newInstance := makeNodeInstancePair(1) awsServices := NewFakeAWSServices(TestClusterID) - c, err := newAWSCloud(CloudConfig{}, awsServices) + c, err := newAWSCloud(config.CloudConfig{}, awsServices) if err != nil { t.Errorf("Error building aws cloud: %v", err) return diff --git a/pkg/providers/v1/aws_sdk.go b/pkg/providers/v1/aws_sdk.go new file mode 100644 index 0000000000..7776b33320 --- /dev/null +++ b/pkg/providers/v1/aws_sdk.go @@ -0,0 +1,233 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aws + +import ( + "fmt" + "sync" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/ec2metadata" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/autoscaling" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/aws/aws-sdk-go/service/elb" + "github.com/aws/aws-sdk-go/service/elbv2" + "github.com/aws/aws-sdk-go/service/kms" + "k8s.io/client-go/pkg/version" + + "k8s.io/cloud-provider-aws/pkg/providers/v1/config" + "k8s.io/cloud-provider-aws/pkg/providers/v1/iface" +) + +type awsSDKProvider struct { + creds *credentials.Credentials + cfg awsCloudConfigProvider + + mutex sync.Mutex + regionDelayers map[string]*CrossRequestRetryDelay +} + +func newAWSSDKProvider(creds *credentials.Credentials, cfg *config.CloudConfig) *awsSDKProvider { + return &awsSDKProvider{ + creds: creds, + cfg: cfg, + regionDelayers: make(map[string]*CrossRequestRetryDelay), + } +} + +func (p *awsSDKProvider) AddHandlers(regionName string, h *request.Handlers) { + h.Build.PushFrontNamed(request.NamedHandler{ + Name: "k8s/user-agent", + Fn: request.MakeAddToUserAgentHandler("kubernetes", version.Get().String()), + }) + + h.Sign.PushFrontNamed(request.NamedHandler{ + Name: "k8s/logger", + Fn: awsHandlerLogger, + }) + + delayer := p.getCrossRequestRetryDelay(regionName) + if delayer != nil { + h.Sign.PushFrontNamed(request.NamedHandler{ + Name: "k8s/delay-presign", + Fn: delayer.BeforeSign, + }) + + h.AfterRetry.PushFrontNamed(request.NamedHandler{ + Name: "k8s/delay-afterretry", + Fn: delayer.AfterRetry, + }) + } + + p.addAPILoggingHandlers(h) +} + +func (p *awsSDKProvider) addAPILoggingHandlers(h *request.Handlers) { + h.Send.PushBackNamed(request.NamedHandler{ + Name: "k8s/api-request", + Fn: awsSendHandlerLogger, + }) + + h.ValidateResponse.PushFrontNamed(request.NamedHandler{ + Name: "k8s/api-validate-response", + Fn: awsValidateResponseHandlerLogger, + }) +} + +// Get a CrossRequestRetryDelay, scoped to the region, not to the request. +// This means that when we hit a limit on a call, we will delay _all_ calls to the API. +// We do this to protect the AWS account from becoming overloaded and effectively locked. +// We also log when we hit request limits. +// Note that this delays the current goroutine; this is bad behaviour and will +// likely cause k8s to become slow or unresponsive for cloud operations. +// However, this throttle is intended only as a last resort. When we observe +// this throttling, we need to address the root cause (e.g. add a delay to a +// controller retry loop) +func (p *awsSDKProvider) getCrossRequestRetryDelay(regionName string) *CrossRequestRetryDelay { + p.mutex.Lock() + defer p.mutex.Unlock() + + delayer, found := p.regionDelayers[regionName] + if !found { + delayer = NewCrossRequestRetryDelay() + p.regionDelayers[regionName] = delayer + } + return delayer +} + +func (p *awsSDKProvider) Compute(regionName string) (iface.EC2, error) { + awsConfig := &aws.Config{ + Region: ®ionName, + Credentials: p.creds, + } + awsConfig = awsConfig.WithCredentialsChainVerboseErrors(true). + WithEndpointResolver(p.cfg.GetResolver()) + sess, err := session.NewSessionWithOptions(session.Options{ + Config: *awsConfig, + SharedConfigState: session.SharedConfigEnable, + }) + + if err != nil { + return nil, fmt.Errorf("unable to initialize AWS session: %v", err) + } + service := ec2.New(sess) + + p.AddHandlers(regionName, &service.Handlers) + + ec2 := &awsSdkEC2{ + ec2: service, + } + return ec2, nil +} + +func (p *awsSDKProvider) LoadBalancing(regionName string) (ELB, error) { + awsConfig := &aws.Config{ + Region: ®ionName, + Credentials: p.creds, + } + awsConfig = awsConfig.WithCredentialsChainVerboseErrors(true). + WithEndpointResolver(p.cfg.GetResolver()) + sess, err := session.NewSessionWithOptions(session.Options{ + Config: *awsConfig, + SharedConfigState: session.SharedConfigEnable, + }) + if err != nil { + return nil, fmt.Errorf("unable to initialize AWS session: %v", err) + } + elbClient := elb.New(sess) + p.AddHandlers(regionName, &elbClient.Handlers) + + return elbClient, nil +} + +func (p *awsSDKProvider) LoadBalancingV2(regionName string) (ELBV2, error) { + awsConfig := &aws.Config{ + Region: ®ionName, + Credentials: p.creds, + } + awsConfig = awsConfig.WithCredentialsChainVerboseErrors(true). + WithEndpointResolver(p.cfg.GetResolver()) + sess, err := session.NewSessionWithOptions(session.Options{ + Config: *awsConfig, + SharedConfigState: session.SharedConfigEnable, + }) + if err != nil { + return nil, fmt.Errorf("unable to initialize AWS session: %v", err) + } + elbClient := elbv2.New(sess) + + p.AddHandlers(regionName, &elbClient.Handlers) + + return elbClient, nil +} + +func (p *awsSDKProvider) Autoscaling(regionName string) (ASG, error) { + awsConfig := &aws.Config{ + Region: ®ionName, + Credentials: p.creds, + } + awsConfig = awsConfig.WithCredentialsChainVerboseErrors(true). + WithEndpointResolver(p.cfg.GetResolver()) + sess, err := session.NewSessionWithOptions(session.Options{ + Config: *awsConfig, + SharedConfigState: session.SharedConfigEnable, + }) + if err != nil { + return nil, fmt.Errorf("unable to initialize AWS session: %v", err) + } + client := autoscaling.New(sess) + + p.AddHandlers(regionName, &client.Handlers) + + return client, nil +} + +func (p *awsSDKProvider) Metadata() (config.EC2Metadata, error) { + sess, err := session.NewSession(&aws.Config{ + EndpointResolver: p.cfg.GetResolver(), + }) + if err != nil { + return nil, fmt.Errorf("unable to initialize AWS session: %v", err) + } + client := ec2metadata.New(sess) + p.addAPILoggingHandlers(&client.Handlers) + return client, nil +} + +func (p *awsSDKProvider) KeyManagement(regionName string) (KMS, error) { + awsConfig := &aws.Config{ + Region: ®ionName, + Credentials: p.creds, + } + awsConfig = awsConfig.WithCredentialsChainVerboseErrors(true). + WithEndpointResolver(p.cfg.GetResolver()) + sess, err := session.NewSessionWithOptions(session.Options{ + Config: *awsConfig, + SharedConfigState: session.SharedConfigEnable, + }) + if err != nil { + return nil, fmt.Errorf("unable to initialize AWS session: %v", err) + } + kmsClient := kms.New(sess) + + p.AddHandlers(regionName, &kmsClient.Handlers) + + return kmsClient, nil +} diff --git a/pkg/providers/v1/aws_test.go b/pkg/providers/v1/aws_test.go index b6c334659d..2b3877529e 100644 --- a/pkg/providers/v1/aws_test.go +++ b/pkg/providers/v1/aws_test.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "io" + cloudvolume "k8s.io/cloud-provider/volume" "math/rand" "reflect" "sort" @@ -43,7 +44,8 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" - cloudvolume "k8s.io/cloud-provider/volume" + + "k8s.io/cloud-provider-aws/pkg/providers/v1/config" ) const TestClusterID = "clusterid.test" @@ -390,7 +392,7 @@ func TestOverridesActiveConfig(t *testing.T) { t.Logf("Running test case %s", test.name) cfg, err := readAWSCloudConfig(test.reader) if err == nil { - err = cfg.validateOverrides() + err = cfg.ValidateOverrides() } if test.expectError { if err == nil { @@ -442,7 +444,7 @@ func TestOverridesActiveConfig(t *testing.T) { sd.signingName, found.SigningName, test.name) } - fn := cfg.getResolver() + fn := cfg.GetResolver() ep1, e := fn(sd.name, sd.region, nil) if e != nil { t.Errorf("Expected a valid endpoint for %s in case %s", @@ -518,7 +520,7 @@ func mockInstancesResp(selfInstance *ec2.Instance, instances []*ec2.Instance) (* awsServices := newMockedFakeAWSServices(TestClusterID) awsServices.instances = instances awsServices.selfInstance = selfInstance - awsCloud, err := newAWSCloud(CloudConfig{}, awsServices) + awsCloud, err := newAWSCloud(config.CloudConfig{}, awsServices) if err != nil { panic(err) } @@ -542,7 +544,7 @@ func mockInstancesResp(selfInstance *ec2.Instance, instances []*ec2.Instance) (* func mockZone(region, availabilityZone string) *Cloud { awsServices := newMockedFakeAWSServices(TestClusterID).WithAz(availabilityZone).WithRegion(region) - awsCloud, err := newAWSCloud(CloudConfig{}, awsServices) + awsCloud, err := newAWSCloud(config.CloudConfig{}, awsServices) if err != nil { panic(err) } @@ -823,7 +825,7 @@ func TestGetRegion(t *testing.T) { func TestFindVPCID(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, err := newAWSCloud(CloudConfig{}, awsServices) + c, err := newAWSCloud(config.CloudConfig{}, awsServices) if err != nil { t.Errorf("Error building aws cloud: %v", err) return @@ -897,7 +899,7 @@ func constructRouteTable(subnetID string, public bool) *ec2.RouteTable { func Test_findELBSubnets(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, err := newAWSCloud(CloudConfig{}, awsServices) + c, err := newAWSCloud(config.CloudConfig{}, awsServices) if err != nil { t.Errorf("Error building aws cloud: %v", err) return @@ -1172,7 +1174,7 @@ func Test_findELBSubnets(t *testing.T) { func Test_getLoadBalancerSubnets(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, err := newAWSCloud(CloudConfig{}, awsServices) + c, err := newAWSCloud(config.CloudConfig{}, awsServices) if err != nil { t.Errorf("Error building aws cloud: %v", err) return @@ -1278,7 +1280,7 @@ func Test_getLoadBalancerSubnets(t *testing.T) { func TestSubnetIDsinVPC(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, err := newAWSCloud(CloudConfig{}, awsServices) + c, err := newAWSCloud(config.CloudConfig{}, awsServices) if err != nil { t.Errorf("Error building aws cloud: %v", err) return @@ -1622,7 +1624,7 @@ func TestFindInstanceByNodeNameExcludesTerminatedInstances(t *testing.T) { awsServices.instances = append(awsDefaultInstances, &testInstance) - c, err := newAWSCloud(CloudConfig{}, awsServices) + c, err := newAWSCloud(config.CloudConfig{}, awsServices) if err != nil { t.Errorf("Error building aws cloud: %v", err) return @@ -1650,7 +1652,7 @@ func TestFindInstanceByNodeNameExcludesTerminatedInstances(t *testing.T) { func TestGetInstanceByNodeNameBatching(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, err := newAWSCloud(CloudConfig{}, awsServices) + c, err := newAWSCloud(config.CloudConfig{}, awsServices) assert.Nil(t, err, "Error building aws cloud: %v", err) var tag ec2.Tag tag.Key = aws.String(TagNameKubernetesClusterPrefix + TestClusterID) @@ -1678,7 +1680,7 @@ func TestGetInstanceByNodeNameBatching(t *testing.T) { func TestGetVolumeLabels(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, err := newAWSCloud(CloudConfig{}, awsServices) + c, err := newAWSCloud(config.CloudConfig{}, awsServices) assert.Nil(t, err, "Error building aws cloud: %v", err) volumeID := EBSVolumeID("vol-VolumeId") expectedVolumeRequest := &ec2.DescribeVolumesInput{VolumeIds: []*string{volumeID.awsString()}} @@ -1779,7 +1781,7 @@ func TestGetLabelsForVolume(t *testing.T) { expectedVolumeRequest := &ec2.DescribeVolumesInput{VolumeIds: []*string{test.expectedVolumeID}} awsServices.ec2.(*MockedFakeEC2).On("DescribeVolumes", expectedVolumeRequest).Return(test.expectedEC2Volumes) - c, err := newAWSCloud(CloudConfig{}, awsServices) + c, err := newAWSCloud(config.CloudConfig{}, awsServices) assert.Nil(t, err, "Error building aws cloud: %v", err) l, err := c.GetLabelsForVolume(context.TODO(), test.pv) @@ -1792,7 +1794,7 @@ func TestGetLabelsForVolume(t *testing.T) { func TestDescribeLoadBalancerOnDelete(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, _ := newAWSCloud(CloudConfig{}, awsServices) + c, _ := newAWSCloud(config.CloudConfig{}, awsServices) awsServices.elb.(*MockedFakeELB).expectDescribeLoadBalancers("aid") c.EnsureLoadBalancerDeleted(context.TODO(), TestClusterName, &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "myservice", UID: "id"}}) @@ -1800,7 +1802,7 @@ func TestDescribeLoadBalancerOnDelete(t *testing.T) { func TestDescribeLoadBalancerOnUpdate(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, _ := newAWSCloud(CloudConfig{}, awsServices) + c, _ := newAWSCloud(config.CloudConfig{}, awsServices) awsServices.elb.(*MockedFakeELB).expectDescribeLoadBalancers("aid") c.UpdateLoadBalancer(context.TODO(), TestClusterName, &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "myservice", UID: "id"}}, []*v1.Node{}) @@ -1808,7 +1810,7 @@ func TestDescribeLoadBalancerOnUpdate(t *testing.T) { func TestDescribeLoadBalancerOnGet(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, _ := newAWSCloud(CloudConfig{}, awsServices) + c, _ := newAWSCloud(config.CloudConfig{}, awsServices) awsServices.elb.(*MockedFakeELB).expectDescribeLoadBalancers("aid") c.GetLoadBalancer(context.TODO(), TestClusterName, &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "myservice", UID: "id"}}) @@ -1816,7 +1818,7 @@ func TestDescribeLoadBalancerOnGet(t *testing.T) { func TestDescribeLoadBalancerOnEnsure(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, _ := newAWSCloud(CloudConfig{}, awsServices) + c, _ := newAWSCloud(config.CloudConfig{}, awsServices) awsServices.elb.(*MockedFakeELB).expectDescribeLoadBalancers("aid") c.EnsureLoadBalancer(context.TODO(), TestClusterName, &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "myservice", UID: "id"}}, []*v1.Node{}) @@ -2154,7 +2156,7 @@ func TestGetKeyValuePropertiesFromAnnotation(t *testing.T) { func TestLBExtraSecurityGroupsAnnotation(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, _ := newAWSCloud(CloudConfig{}, awsServices) + c, _ := newAWSCloud(config.CloudConfig{}, awsServices) sg1 := map[string]string{ServiceAnnotationLoadBalancerExtraSecurityGroups: "sg-000001"} sg2 := map[string]string{ServiceAnnotationLoadBalancerExtraSecurityGroups: "sg-000002"} @@ -2190,7 +2192,7 @@ func TestLBExtraSecurityGroupsAnnotation(t *testing.T) { func TestLBSecurityGroupsAnnotation(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, _ := newAWSCloud(CloudConfig{}, awsServices) + c, _ := newAWSCloud(config.CloudConfig{}, awsServices) sg1 := map[string]string{ServiceAnnotationLoadBalancerSecurityGroups: "sg-000001"} sg2 := map[string]string{ServiceAnnotationLoadBalancerSecurityGroups: "sg-000002"} @@ -2225,7 +2227,7 @@ func TestLBSecurityGroupsAnnotation(t *testing.T) { func TestAddLoadBalancerTags(t *testing.T) { loadBalancerName := "test-elb" awsServices := newMockedFakeAWSServices(TestClusterID) - c, _ := newAWSCloud(CloudConfig{}, awsServices) + c, _ := newAWSCloud(config.CloudConfig{}, awsServices) want := make(map[string]string) want["tag1"] = "val1" @@ -2399,7 +2401,7 @@ func TestEnsureLoadBalancerHealthCheck(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, err := newAWSCloud(CloudConfig{}, awsServices) + c, err := newAWSCloud(config.CloudConfig{}, awsServices) assert.Nil(t, err, "Error building aws cloud: %v", err) expectedHC := test.want awsServices.elb.(*MockedFakeELB).expectConfigureHealthCheck(&lbName, &expectedHC, nil) @@ -2413,7 +2415,7 @@ func TestEnsureLoadBalancerHealthCheck(t *testing.T) { t.Run("does not make an API call if the current health check is the same", func(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, err := newAWSCloud(CloudConfig{}, awsServices) + c, err := newAWSCloud(config.CloudConfig{}, awsServices) assert.Nil(t, err, "Error building aws cloud: %v", err) expectedHC := *defaultHC timeout := int64(3) @@ -2435,7 +2437,7 @@ func TestEnsureLoadBalancerHealthCheck(t *testing.T) { t.Run("validates resulting expected health check before making an API call", func(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, err := newAWSCloud(CloudConfig{}, awsServices) + c, err := newAWSCloud(config.CloudConfig{}, awsServices) assert.Nil(t, err, "Error building aws cloud: %v", err) expectedHC := *defaultHC invalidThreshold := int64(1) @@ -2451,7 +2453,7 @@ func TestEnsureLoadBalancerHealthCheck(t *testing.T) { t.Run("handles invalid override values", func(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, err := newAWSCloud(CloudConfig{}, awsServices) + c, err := newAWSCloud(config.CloudConfig{}, awsServices) assert.Nil(t, err, "Error building aws cloud: %v", err) annotations := map[string]string{ServiceAnnotationLoadBalancerHCTimeout: "3.3"} @@ -2463,7 +2465,7 @@ func TestEnsureLoadBalancerHealthCheck(t *testing.T) { t.Run("returns error when updating the health check fails", func(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, err := newAWSCloud(CloudConfig{}, awsServices) + c, err := newAWSCloud(config.CloudConfig{}, awsServices) assert.Nil(t, err, "Error building aws cloud: %v", err) returnErr := fmt.Errorf("throttling error") awsServices.elb.(*MockedFakeELB).expectConfigureHealthCheck(&lbName, defaultHC, returnErr) @@ -2500,7 +2502,7 @@ func TestFindSecurityGroupForInstanceMultipleTagged(t *testing.T) { func TestCreateDisk(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, _ := newAWSCloud(CloudConfig{}, awsServices) + c, _ := newAWSCloud(config.CloudConfig{}, awsServices) volumeOptions := &VolumeOptions{ AvailabilityZone: "us-west-2a", @@ -2541,7 +2543,7 @@ func TestCreateDisk(t *testing.T) { func TestCreateDiskFailDescribeVolume(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, _ := newAWSCloud(CloudConfig{}, awsServices) + c, _ := newAWSCloud(config.CloudConfig{}, awsServices) volumeOptions := &VolumeOptions{ AvailabilityZone: "us-west-2a", @@ -2594,7 +2596,7 @@ const ( func TestNodeNameToInstanceID(t *testing.T) { fakeAWS := newMockedFakeAWSServices(TestClusterID) - c, err := newAWSCloud(CloudConfig{}, fakeAWS) + c, err := newAWSCloud(config.CloudConfig{}, fakeAWS) assert.NoError(t, err) fakeClient := &fake.Clientset{} @@ -2704,7 +2706,7 @@ func TestInstanceIDToNodeName(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - awsCloud, err := newAWSCloud(CloudConfig{}, awsServices) + awsCloud, err := newAWSCloud(config.CloudConfig{}, awsServices) if err != nil { t.Fatalf("error creating mock cloud: %v", err) } @@ -3196,7 +3198,7 @@ func (m *MockedFakeEC2) maybeExpectDescribeSecurityGroups(clusterID, groupName s func TestNLBNodeRegistration(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) awsServices.elbv2 = &MockedFakeELBV2{Tags: make(map[string][]elbv2.Tag), RegisteredInstances: make(map[string][]string), LoadBalancerAttributes: make(map[string]map[string]string)} - c, _ := newAWSCloud(CloudConfig{}, awsServices) + c, _ := newAWSCloud(config.CloudConfig{}, awsServices) awsServices.ec2.(*MockedFakeEC2).Subnets = []*ec2.Subnet{ { @@ -3754,7 +3756,7 @@ func Test_parseStringSliceAnnotation(t *testing.T) { func TestNodeAddressesForFargate(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, _ := newAWSCloud(CloudConfig{}, awsServices) + c, _ := newAWSCloud(config.CloudConfig{}, awsServices) nodeAddresses, _ := c.NodeAddressesByProviderID(context.TODO(), "aws:///us-west-2c/1abc-2def/fargate-ip-return-private-dns-name.us-west-2.compute.internal") verifyNodeAddressesForFargate(t, "IPV4", true, nodeAddresses) @@ -3762,7 +3764,7 @@ func TestNodeAddressesForFargate(t *testing.T) { func TestNodeAddressesForFargateIPV6Family(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, _ := newAWSCloud(CloudConfig{}, awsServices) + c, _ := newAWSCloud(config.CloudConfig{}, awsServices) c.cfg.Global.NodeIPFamilies = []string{"ipv6"} nodeAddresses, _ := c.NodeAddressesByProviderID(context.TODO(), "aws:///us-west-2c/1abc-2def/fargate-ip-return-private-dns-name-ipv6.us-west-2.compute.internal") @@ -3771,7 +3773,7 @@ func TestNodeAddressesForFargateIPV6Family(t *testing.T) { func TestNodeAddressesForFargatePrivateIP(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, _ := newAWSCloud(CloudConfig{}, awsServices) + c, _ := newAWSCloud(config.CloudConfig{}, awsServices) nodeAddresses, _ := c.NodeAddressesByProviderID(context.TODO(), "aws:///us-west-2c/1abc-2def/fargate-192.168.164.88") verifyNodeAddressesForFargate(t, "IPV4", false, nodeAddresses) @@ -3796,7 +3798,7 @@ func verifyNodeAddressesForFargate(t *testing.T, ipFamily string, verifyPublicIP func TestNodeAddressesOrderedByDeviceIndex(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, _ := newAWSCloud(CloudConfig{}, awsServices) + c, _ := newAWSCloud(config.CloudConfig{}, awsServices) nodeAddresses, _ := c.NodeAddressesByProviderID(context.TODO(), "aws:///us-west-2a/i-self") expectedAddresses := []v1.NodeAddress{ @@ -3813,7 +3815,7 @@ func TestNodeAddressesOrderedByDeviceIndex(t *testing.T) { func TestInstanceExistsByProviderIDForFargate(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, _ := newAWSCloud(CloudConfig{}, awsServices) + c, _ := newAWSCloud(config.CloudConfig{}, awsServices) instanceExist, err := c.InstanceExistsByProviderID(context.TODO(), "aws:///us-west-2c/1abc-2def/fargate-192.168.164.88") assert.Nil(t, err) @@ -3822,7 +3824,7 @@ func TestInstanceExistsByProviderIDForFargate(t *testing.T) { func TestInstanceExistsByProviderIDWithNodeNameForFargate(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, _ := newAWSCloud(CloudConfig{}, awsServices) + c, _ := newAWSCloud(config.CloudConfig{}, awsServices) instanceExist, err := c.InstanceExistsByProviderID(context.TODO(), "aws:///us-west-2c/1abc-2def/fargate-ip-192-168-164-88.us-west-2.compute.internal") assert.Nil(t, err) @@ -3842,7 +3844,7 @@ func TestInstanceExistsByProviderIDForInstanceNotFound(t *testing.T) { func TestInstanceNotExistsByProviderIDForFargate(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, _ := newAWSCloud(CloudConfig{}, awsServices) + c, _ := newAWSCloud(config.CloudConfig{}, awsServices) instanceExist, err := c.InstanceExistsByProviderID(context.TODO(), "aws:///us-west-2c/1abc-2def/fargate-not-found") assert.Nil(t, err) @@ -3851,7 +3853,7 @@ func TestInstanceNotExistsByProviderIDForFargate(t *testing.T) { func TestInstanceShutdownByProviderIDForFargate(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, _ := newAWSCloud(CloudConfig{}, awsServices) + c, _ := newAWSCloud(config.CloudConfig{}, awsServices) instanceExist, err := c.InstanceShutdownByProviderID(context.TODO(), "aws:///us-west-2c/1abc-2def/fargate-192.168.164.88") assert.Nil(t, err) @@ -3860,7 +3862,7 @@ func TestInstanceShutdownByProviderIDForFargate(t *testing.T) { func TestInstanceShutdownNotExistsByProviderIDForFargate(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, _ := newAWSCloud(CloudConfig{}, awsServices) + c, _ := newAWSCloud(config.CloudConfig{}, awsServices) instanceExist, err := c.InstanceShutdownByProviderID(context.TODO(), "aws:///us-west-2c/1abc-2def/fargate-not-found") assert.Nil(t, err) @@ -3869,7 +3871,7 @@ func TestInstanceShutdownNotExistsByProviderIDForFargate(t *testing.T) { func TestInstanceTypeByProviderIDForFargate(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, _ := newAWSCloud(CloudConfig{}, awsServices) + c, _ := newAWSCloud(config.CloudConfig{}, awsServices) instanceType, err := c.InstanceTypeByProviderID(context.TODO(), "aws:///us-west-2c/1abc-2def/fargate-not-found") assert.Nil(t, err) @@ -3878,7 +3880,7 @@ func TestInstanceTypeByProviderIDForFargate(t *testing.T) { func TestGetZoneByProviderIDForFargate(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) - c, _ := newAWSCloud(CloudConfig{}, awsServices) + c, _ := newAWSCloud(config.CloudConfig{}, awsServices) zoneDetails, err := c.GetZoneByProviderID(context.TODO(), "aws:///us-west-2c/1abc-2def/fargate-192.168.164.88") assert.Nil(t, err) @@ -3888,18 +3890,18 @@ func TestGetZoneByProviderIDForFargate(t *testing.T) { func TestGetRegionFromMetadata(t *testing.T) { awsServices := newMockedFakeAWSServices(TestClusterID) // Returns region from zone if set - cfg := CloudConfig{} + cfg := config.CloudConfig{} cfg.Global.Zone = "us-west-2a" region, err := getRegionFromMetadata(cfg, awsServices.metadata) assert.NoError(t, err) assert.Equal(t, "us-west-2", region) // Returns error if can map to region - cfg = CloudConfig{} + cfg = config.CloudConfig{} cfg.Global.Zone = "some-fake-zone" _, err = getRegionFromMetadata(cfg, awsServices.metadata) assert.Error(t, err) // Returns region from metadata if zone unset - cfg = CloudConfig{} + cfg = config.CloudConfig{} region, err = getRegionFromMetadata(cfg, awsServices.metadata) assert.NoError(t, err) assert.Equal(t, "us-west-2", region) diff --git a/pkg/providers/v1/config/config.go b/pkg/providers/v1/config/config.go new file mode 100644 index 0000000000..ef6e371115 --- /dev/null +++ b/pkg/providers/v1/config/config.go @@ -0,0 +1,182 @@ +package config + +import ( + "fmt" + "github.com/aws/aws-sdk-go/aws/request" + "strings" + + "github.com/aws/aws-sdk-go/aws/endpoints" + + "k8s.io/klog/v2" +) + +// CloudConfig wraps the settings for the AWS cloud provider. +// NOTE: Cloud config files should follow the same Kubernetes deprecation policy as +// flags or CLIs. Config fields should not change behavior in incompatible ways and +// should be deprecated for at least 2 release prior to removing. +// See https://kubernetes.io/docs/reference/using-api/deprecation-policy/#deprecating-a-flag-or-cli +// for more details. +type CloudConfig struct { + Global struct { + // TODO: Is there any use for this? We can get it from the instance metadata service + // Maybe if we're not running on AWS, e.g. bootstrap; for now it is not very useful + Zone string + + Region string + + // The AWS VPC flag enables the possibility to run the master components + // on a different aws account, on a different cloud provider or on-premises. + // If the flag is set also the KubernetesClusterTag must be provided + VPC string + // SubnetID enables using a specific subnet to use for ELB's + SubnetID string + // RouteTableID enables using a specific RouteTable + RouteTableID string + + // RoleARN is the IAM role to assume when interaction with AWS APIs. + RoleARN string + // SourceARN is value which is passed while assuming role specified by RoleARN. When a service + // assumes a role in your account, you can include the aws:SourceAccount and aws:SourceArn global + // condition context keys in your role trust policy to limit access to the role to only requests that are generated + // by expected resources. https://docs.aws.amazon.com/IAM/latest/UserGuide/confused-deputy.html + SourceARN string + + // KubernetesClusterTag is the legacy cluster id we'll use to identify our cluster resources + KubernetesClusterTag string + // KubernetesClusterID is the cluster id we'll use to identify our cluster resources + KubernetesClusterID string + + //The aws provider creates an inbound rule per load balancer on the node security + //group. However, this can run into the AWS security group rule limit of 50 if + //many LoadBalancers are created. + // + //This flag disables the automatic ingress creation. It requires that the user + //has setup a rule that allows inbound traffic on kubelet ports from the + //local VPC subnet (so load balancers can access it). E.g. 10.82.0.0/16 30000-32000. + DisableSecurityGroupIngress bool + + //AWS has a hard limit of 500 security groups. For large clusters creating a security group for each ELB + //can cause the max number of security groups to be reached. If this is set instead of creating a new + //Security group for each ELB this security group will be used instead. + ElbSecurityGroup string + + // NodeIPFamilies determines which IP addresses are added to node objects and their ordering. + NodeIPFamilies []string + } + // [ServiceOverride "1"] + // Service = s3 + // Region = region1 + // URL = https://s3.foo.bar + // SigningRegion = signing_region + // SigningMethod = signing_method + // + // [ServiceOverride "2"] + // Service = ec2 + // Region = region2 + // URL = https://ec2.foo.bar + // SigningRegion = signing_region + // SigningMethod = signing_method + ServiceOverride map[string]*struct { + Service string + Region string + URL string + SigningRegion string + SigningMethod string + SigningName string + } +} + +// EC2Metadata is an abstraction over the AWS metadata service. +type EC2Metadata interface { + // Query the EC2 metadata service (used to discover instance-id etc) + GetMetadata(path string) (string, error) + Region() (string, error) +} + +// GetRegion returns the AWS region from the config, if set, or gets it from the metadata +// service if unset and sets in config +func (cfg *CloudConfig) GetRegion(metadata EC2Metadata) (string, error) { + if cfg.Global.Region != "" { + return cfg.Global.Region, nil + } + + klog.Info("Loading region from metadata service") + region, err := metadata.Region() + if err != nil { + return "", err + } + + cfg.Global.Region = region + return region, nil +} + +// ValidateOverrides ensures overrides are correct +func (cfg *CloudConfig) ValidateOverrides() error { + if len(cfg.ServiceOverride) == 0 { + return nil + } + set := make(map[string]bool) + for onum, ovrd := range cfg.ServiceOverride { + // Note: gcfg does not space trim, so we have to when comparing to empty string "" + name := strings.TrimSpace(ovrd.Service) + if name == "" { + return fmt.Errorf("service name is missing [Service is \"\"] in override %s", onum) + } + // insure the map service name is space trimmed + ovrd.Service = name + + region := strings.TrimSpace(ovrd.Region) + if region == "" { + return fmt.Errorf("service region is missing [Region is \"\"] in override %s", onum) + } + // insure the map region is space trimmed + ovrd.Region = region + + url := strings.TrimSpace(ovrd.URL) + if url == "" { + return fmt.Errorf("url is missing [URL is \"\"] in override %s", onum) + } + signingRegion := strings.TrimSpace(ovrd.SigningRegion) + if signingRegion == "" { + return fmt.Errorf("signingRegion is missing [SigningRegion is \"\"] in override %s", onum) + } + signature := name + "_" + region + if set[signature] { + return fmt.Errorf("duplicate entry found for service override [%s] (%s in %s)", onum, name, region) + } + set[signature] = true + } + return nil +} + +// GetResolver computes the correct resolver to use +func (cfg *CloudConfig) GetResolver() endpoints.ResolverFunc { + defaultResolver := endpoints.DefaultResolver() + defaultResolverFn := func(service, region string, + optFns ...func(*endpoints.Options)) (endpoints.ResolvedEndpoint, error) { + return defaultResolver.EndpointFor(service, region, optFns...) + } + if len(cfg.ServiceOverride) == 0 { + return defaultResolverFn + } + + return func(service, region string, + optFns ...func(*endpoints.Options)) (endpoints.ResolvedEndpoint, error) { + for _, override := range cfg.ServiceOverride { + if override.Service == service && override.Region == region { + return endpoints.ResolvedEndpoint{ + URL: override.URL, + SigningRegion: override.SigningRegion, + SigningMethod: override.SigningMethod, + SigningName: override.SigningName, + }, nil + } + } + return defaultResolver.EndpointFor(service, region, optFns...) + } +} + +// SDKProvider can be used by variants to add their own handlers +type SDKProvider interface { + AddHandlers(regionName string, h *request.Handlers) +} diff --git a/pkg/providers/v1/iface/types.go b/pkg/providers/v1/iface/types.go new file mode 100644 index 0000000000..0897627822 --- /dev/null +++ b/pkg/providers/v1/iface/types.go @@ -0,0 +1,51 @@ +package iface + +import "github.com/aws/aws-sdk-go/service/ec2" + +// EC2 is an abstraction over AWS', to allow mocking/other implementations +// Note that the DescribeX functions return a list, so callers don't need to deal with paging +// TODO: Should we rename this to AWS (EBS & ELB are not technically part of EC2) +type EC2 interface { + // Query EC2 for instances matching the filter + DescribeInstances(request *ec2.DescribeInstancesInput) ([]*ec2.Instance, error) + + // Attach a volume to an instance + AttachVolume(*ec2.AttachVolumeInput) (*ec2.VolumeAttachment, error) + // Detach a volume from an instance it is attached to + DetachVolume(request *ec2.DetachVolumeInput) (resp *ec2.VolumeAttachment, err error) + // Lists volumes + DescribeVolumes(request *ec2.DescribeVolumesInput) ([]*ec2.Volume, error) + // Create an EBS volume + CreateVolume(request *ec2.CreateVolumeInput) (resp *ec2.Volume, err error) + // Delete an EBS volume + DeleteVolume(*ec2.DeleteVolumeInput) (*ec2.DeleteVolumeOutput, error) + + ModifyVolume(*ec2.ModifyVolumeInput) (*ec2.ModifyVolumeOutput, error) + + DescribeVolumeModifications(*ec2.DescribeVolumesModificationsInput) ([]*ec2.VolumeModification, error) + + DescribeSecurityGroups(request *ec2.DescribeSecurityGroupsInput) ([]*ec2.SecurityGroup, error) + + CreateSecurityGroup(*ec2.CreateSecurityGroupInput) (*ec2.CreateSecurityGroupOutput, error) + DeleteSecurityGroup(request *ec2.DeleteSecurityGroupInput) (*ec2.DeleteSecurityGroupOutput, error) + + AuthorizeSecurityGroupIngress(*ec2.AuthorizeSecurityGroupIngressInput) (*ec2.AuthorizeSecurityGroupIngressOutput, error) + RevokeSecurityGroupIngress(*ec2.RevokeSecurityGroupIngressInput) (*ec2.RevokeSecurityGroupIngressOutput, error) + + DescribeSubnets(*ec2.DescribeSubnetsInput) ([]*ec2.Subnet, error) + + DescribeAvailabilityZones(request *ec2.DescribeAvailabilityZonesInput) ([]*ec2.AvailabilityZone, error) + + CreateTags(*ec2.CreateTagsInput) (*ec2.CreateTagsOutput, error) + DeleteTags(input *ec2.DeleteTagsInput) (*ec2.DeleteTagsOutput, error) + + DescribeRouteTables(request *ec2.DescribeRouteTablesInput) ([]*ec2.RouteTable, error) + CreateRoute(request *ec2.CreateRouteInput) (*ec2.CreateRouteOutput, error) + DeleteRoute(request *ec2.DeleteRouteInput) (*ec2.DeleteRouteOutput, error) + + ModifyInstanceAttribute(request *ec2.ModifyInstanceAttributeInput) (*ec2.ModifyInstanceAttributeOutput, error) + + DescribeVpcs(input *ec2.DescribeVpcsInput) (*ec2.DescribeVpcsOutput, error) + + DescribeNetworkInterfaces(input *ec2.DescribeNetworkInterfacesInput) (*ec2.DescribeNetworkInterfacesOutput, error) +} diff --git a/pkg/providers/v1/instances.go b/pkg/providers/v1/instances.go index ed3aad13f9..2d1e024a4a 100644 --- a/pkg/providers/v1/instances.go +++ b/pkg/providers/v1/instances.go @@ -28,6 +28,9 @@ import ( "github.com/aws/aws-sdk-go/service/ec2" v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" + + "k8s.io/cloud-provider-aws/pkg/providers/v1/iface" + "k8s.io/cloud-provider-aws/pkg/providers/v1/variant" ) // awsInstanceRegMatch represents Regex Match for AWS instance. @@ -78,7 +81,7 @@ func (name KubernetesInstanceID) MapToAWSInstanceID() (InstanceID, error) { // We sanity check the resulting volume; the two known formats are // i-12345678 and i-12345678abcdef01 - if awsID == "" || !(awsInstanceRegMatch.MatchString(awsID) || IsFargateNode(awsID)) { + if awsID == "" || !(awsInstanceRegMatch.MatchString(awsID) || variant.IsVariantNode(awsID)) { return "", fmt.Errorf("Invalid format for AWS instance (%s)", name) } @@ -122,7 +125,7 @@ func mapToAWSInstanceIDsTolerant(nodes []*v1.Node) []InstanceID { } // Gets the full information about this instance from the EC2 API -func describeInstance(ec2Client EC2, instanceID InstanceID) (*ec2.Instance, error) { +func describeInstance(ec2Client iface.EC2, instanceID InstanceID) (*ec2.Instance, error) { request := &ec2.DescribeInstancesInput{ InstanceIds: []*string{instanceID.awsString()}, } diff --git a/pkg/providers/v1/tags.go b/pkg/providers/v1/tags.go index 9b82436097..8c5ee23153 100644 --- a/pkg/providers/v1/tags.go +++ b/pkg/providers/v1/tags.go @@ -25,6 +25,8 @@ import ( "k8s.io/klog/v2" "k8s.io/apimachinery/pkg/util/wait" + + "k8s.io/cloud-provider-aws/pkg/providers/v1/iface" ) // TagNameKubernetesClusterPrefix is the tag name we use to differentiate multiple @@ -162,7 +164,7 @@ func (t *awsTagging) hasNoClusterPrefixTag(tags []*ec2.Tag) bool { // Ensure that a resource has the correct tags // If it has no tags, we assume that this was a problem caused by an error in between creation and tagging, // and we add the tags. If it has a different cluster's tags, that is an error. -func (t *awsTagging) readRepairClusterTags(client EC2, resourceID string, lifecycle ResourceLifecycle, additionalTags map[string]string, observedTags []*ec2.Tag) error { +func (t *awsTagging) readRepairClusterTags(client iface.EC2, resourceID string, lifecycle ResourceLifecycle, additionalTags map[string]string, observedTags []*ec2.Tag) error { actualTagMap := make(map[string]string) for _, tag := range observedTags { actualTagMap[aws.StringValue(tag.Key)] = aws.StringValue(tag.Value) @@ -198,7 +200,7 @@ func (t *awsTagging) readRepairClusterTags(client EC2, resourceID string, lifecy // createTags calls EC2 CreateTags, but adds retry-on-failure logic // We retry mainly because if we create an object, we cannot tag it until it is "fully created" (eventual consistency) // The error code varies though (depending on what we are tagging), so we simply retry on all errors -func (t *awsTagging) createTags(client EC2, resourceID string, lifecycle ResourceLifecycle, additionalTags map[string]string) error { +func (t *awsTagging) createTags(client iface.EC2, resourceID string, lifecycle ResourceLifecycle, additionalTags map[string]string) error { tags := t.buildTags(lifecycle, additionalTags) if tags == nil || len(tags) == 0 { diff --git a/pkg/providers/v1/tags_test.go b/pkg/providers/v1/tags_test.go index af2e70927e..d7b8dad58a 100644 --- a/pkg/providers/v1/tags_test.go +++ b/pkg/providers/v1/tags_test.go @@ -28,11 +28,13 @@ import ( "github.com/aws/aws-sdk-go/service/ec2" "github.com/stretchr/testify/assert" "k8s.io/klog/v2" + + "k8s.io/cloud-provider-aws/pkg/providers/v1/config" ) func TestFilterTags(t *testing.T) { awsServices := NewFakeAWSServices(TestClusterID) - c, err := newAWSCloud(CloudConfig{}, awsServices) + c, err := newAWSCloud(config.CloudConfig{}, awsServices) if err != nil { t.Errorf("Error building aws cloud: %v", err) return @@ -125,7 +127,7 @@ func TestFindClusterID(t *testing.T) { func TestHasClusterTag(t *testing.T) { awsServices := NewFakeAWSServices(TestClusterID) - c, err := newAWSCloud(CloudConfig{}, awsServices) + c, err := newAWSCloud(config.CloudConfig{}, awsServices) if err != nil { t.Errorf("Error building aws cloud: %v", err) return @@ -190,7 +192,7 @@ func TestHasClusterTag(t *testing.T) { func TestHasNoClusterPrefixTag(t *testing.T) { awsServices := NewFakeAWSServices(TestClusterID) - c, err := newAWSCloud(CloudConfig{}, awsServices) + c, err := newAWSCloud(config.CloudConfig{}, awsServices) if err != nil { t.Errorf("Error building aws cloud: %v", err) return @@ -242,7 +244,7 @@ func TestTagResource(t *testing.T) { klog.InitFlags(testFlags) testFlags.Parse([]string{"--logtostderr=false"}) awsServices := NewFakeAWSServices(TestClusterID) - c, err := newAWSCloud(CloudConfig{}, awsServices) + c, err := newAWSCloud(config.CloudConfig{}, awsServices) if err != nil { t.Errorf("Error building aws cloud: %v", err) return @@ -294,7 +296,7 @@ func TestUntagResource(t *testing.T) { klog.InitFlags(testFlags) testFlags.Parse([]string{"--logtostderr=false"}) awsServices := NewFakeAWSServices(TestClusterID) - c, err := newAWSCloud(CloudConfig{}, awsServices) + c, err := newAWSCloud(config.CloudConfig{}, awsServices) if err != nil { t.Errorf("Error building aws cloud: %v", err) return diff --git a/pkg/providers/v1/variant/fargate/fargate.go b/pkg/providers/v1/variant/fargate/fargate.go new file mode 100644 index 0000000000..f4d7174603 --- /dev/null +++ b/pkg/providers/v1/variant/fargate/fargate.go @@ -0,0 +1,170 @@ +package fargate + +import ( + "fmt" + "strings" + + awssdk "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/service/ec2" + + v1 "k8s.io/api/core/v1" + cloudprovider "k8s.io/cloud-provider" + "k8s.io/klog/v2" + + "k8s.io/cloud-provider-aws/pkg/providers/v1/config" + "k8s.io/cloud-provider-aws/pkg/providers/v1/iface" + "k8s.io/cloud-provider-aws/pkg/providers/v1/variant" +) + +const ( + // fargateNodeNamePrefix string is added to awsInstance nodeName and providerID of Fargate nodes. + fargateNodeNamePrefix = "fargate-" +) + +type fargateVariant struct { + cloudConfig *config.CloudConfig + ec2API iface.EC2 + credentials *credentials.Credentials + provider config.SDKProvider +} + +func (f *fargateVariant) Initialize(cloudConfig *config.CloudConfig, credentials *credentials.Credentials, provider config.SDKProvider, ec2API iface.EC2, region string) error { + f.cloudConfig = cloudConfig + f.ec2API = ec2API + f.credentials = credentials + f.provider = provider + return nil +} + +func (f *fargateVariant) InstanceTypeByProviderID(instanceID string) (string, error) { + return "", nil +} + +func (f *fargateVariant) GetZone(instanceID, vpcID, region string) (cloudprovider.Zone, error) { + eni, err := f.DescribeNetworkInterfaces(f.ec2API, instanceID, vpcID) + if eni == nil || err != nil { + return cloudprovider.Zone{}, err + } + return cloudprovider.Zone{ + FailureDomain: *eni.AvailabilityZone, + Region: region, + }, nil +} + +func (f *fargateVariant) IsSupportedNode(nodeName string) bool { + return strings.HasPrefix(nodeName, fargateNodeNamePrefix) +} + +func (f *fargateVariant) NodeAddresses(instanceID, vpcID string) ([]v1.NodeAddress, error) { + eni, err := f.DescribeNetworkInterfaces(f.ec2API, instanceID, vpcID) + if eni == nil || err != nil { + return nil, err + } + + var addresses []v1.NodeAddress + + // Assign NodeInternalIP based on IP family + for _, family := range f.cloudConfig.Global.NodeIPFamilies { + switch family { + case "ipv4": + nodeAddresses := getNodeAddressesForFargateNode(awssdk.StringValue(eni.PrivateDnsName), awssdk.StringValue(eni.PrivateIpAddress)) + addresses = append(addresses, nodeAddresses...) + case "ipv6": + if eni.Ipv6Addresses == nil || len(eni.Ipv6Addresses) == 0 { + klog.Errorf("no Ipv6Addresses associated with the eni") + continue + } + internalIPv6Address := eni.Ipv6Addresses[0].Ipv6Address + nodeAddresses := getNodeAddressesForFargateNode(awssdk.StringValue(eni.PrivateDnsName), awssdk.StringValue(internalIPv6Address)) + addresses = append(addresses, nodeAddresses...) + } + } + return addresses, nil +} + +func (f *fargateVariant) InstanceExists(instanceID, vpcID string) (bool, error) { + eni, err := f.DescribeNetworkInterfaces(f.ec2API, instanceID, vpcID) + return eni != nil, err +} + +func (f *fargateVariant) InstanceShutdown(instanceID, vpcID string) (bool, error) { + eni, err := f.DescribeNetworkInterfaces(f.ec2API, instanceID, vpcID) + return eni != nil, err +} + +func newEc2Filter(name string, values ...string) *ec2.Filter { + filter := &ec2.Filter{ + Name: awssdk.String(name), + } + for _, value := range values { + filter.Values = append(filter.Values, awssdk.String(value)) + } + return filter +} + +const ( + // privateDNSNamePrefix is the prefix added to ENI Private DNS Name. + privateDNSNamePrefix = "ip-" +) + +// extract private ip address from node name +func nodeNameToIPAddress(nodeName string) string { + nodeName = strings.TrimPrefix(nodeName, privateDNSNamePrefix) + nodeName = strings.Split(nodeName, ".")[0] + return strings.ReplaceAll(nodeName, "-", ".") +} + +// DescribeNetworkInterfaces returns network interface information for the given DNS name. +func (f *fargateVariant) DescribeNetworkInterfaces(ec2API iface.EC2, instanceID, vpcID string) (*ec2.NetworkInterface, error) { + eniEndpoint := strings.TrimPrefix(instanceID, fargateNodeNamePrefix) + + filters := []*ec2.Filter{ + newEc2Filter("attachment.status", "attached"), + newEc2Filter("vpc-id", vpcID), + } + + // when enableDnsSupport is set to false in a VPC, interface will not have private DNS names. + // convert node name to ip address because ip-name based and resource-named EC2 resources + // may have different privateDNSName formats but same privateIpAddress format + if strings.HasPrefix(eniEndpoint, privateDNSNamePrefix) { + eniEndpoint = nodeNameToIPAddress(eniEndpoint) + } + + filters = append(filters, newEc2Filter("private-ip-address", eniEndpoint)) + + request := &ec2.DescribeNetworkInterfacesInput{ + Filters: filters, + } + + eni, err := ec2API.DescribeNetworkInterfaces(request) + if err != nil { + return nil, err + } + if len(eni.NetworkInterfaces) == 0 { + return nil, nil + } + if len(eni.NetworkInterfaces) != 1 { + // This should not be possible - ids should be unique + return nil, fmt.Errorf("multiple interfaces found with same id %q", eni.NetworkInterfaces) + } + return eni.NetworkInterfaces[0], nil +} + +func init() { + v := &fargateVariant{} + variant.RegisterVariant( + "fargate", + v, + ) +} + +// getNodeAddressesForFargateNode generates list of Node addresses for Fargate node. +func getNodeAddressesForFargateNode(privateDNSName, privateIP string) []v1.NodeAddress { + addresses := []v1.NodeAddress{} + addresses = append(addresses, v1.NodeAddress{Type: v1.NodeInternalIP, Address: privateIP}) + if privateDNSName != "" { + addresses = append(addresses, v1.NodeAddress{Type: v1.NodeInternalDNS, Address: privateDNSName}) + } + return addresses +} diff --git a/pkg/providers/v1/variant/variant.go b/pkg/providers/v1/variant/variant.go new file mode 100644 index 0000000000..39df86b795 --- /dev/null +++ b/pkg/providers/v1/variant/variant.go @@ -0,0 +1,88 @@ +package variant + +import ( + "fmt" + "sync" + + v1 "k8s.io/api/core/v1" + cloudprovider "k8s.io/cloud-provider" + + "github.com/aws/aws-sdk-go/aws/credentials" + + "k8s.io/cloud-provider-aws/pkg/providers/v1/config" + "k8s.io/cloud-provider-aws/pkg/providers/v1/iface" +) + +var variantsLock sync.Mutex +var variants = make(map[string]Variant) + +// Variant is a slightly different type of node +type Variant interface { + Initialize(cloudConfig *config.CloudConfig, credentials *credentials.Credentials, + provider config.SDKProvider, ec2API iface.EC2, region string) error + IsSupportedNode(nodeName string) bool + NodeAddresses(instanceID, vpcID string) ([]v1.NodeAddress, error) + GetZone(instanceID, vpcID, region string) (cloudprovider.Zone, error) + InstanceExists(instanceID, vpcID string) (bool, error) + InstanceShutdown(instanceID, vpcID string) (bool, error) + InstanceTypeByProviderID(id string) (string, error) +} + +// RegisterVariant is used to register code that needs to be called for a specific variant +func RegisterVariant(name string, variant Variant) { + variantsLock.Lock() + defer variantsLock.Unlock() + if _, found := variants[name]; found { + panic(fmt.Sprintf("%q was registered twice", name)) + } + variants[name] = variant +} + +// IsVariantNode helps evaluate if a specific variant handles a given instance +func IsVariantNode(instanceID string) bool { + variantsLock.Lock() + defer variantsLock.Unlock() + for _, v := range variants { + if v.IsSupportedNode(instanceID) { + return true + } + } + return false +} + +// NodeType returns the type name example: "fargate" +func NodeType(instanceID string) string { + variantsLock.Lock() + defer variantsLock.Unlock() + for key, v := range variants { + if v.IsSupportedNode(instanceID) { + return key + } + } + return "" +} + +// GetVariant returns the interface that can then be used to handle a specific instance +func GetVariant(instanceID string) Variant { + variantsLock.Lock() + defer variantsLock.Unlock() + for _, v := range variants { + if v.IsSupportedNode(instanceID) { + return v + } + } + return nil +} + +// GetVariants returns the names of all the variants registered +func GetVariants() []Variant { + variantsLock.Lock() + defer variantsLock.Unlock() + var values []Variant + + // Iterate over the map and collect all values + for _, v := range variants { + values = append(values, v) + } + return values +}