diff --git a/README.md b/README.md index bc8b8cab..4d7ed765 100644 --- a/README.md +++ b/README.md @@ -137,6 +137,7 @@ Overview of configuration which can be set via Ingress annotations. |`zalando.org/aws-load-balancer-ssl-policy`|`string`|`ELBSecurityPolicy-2016-08`| |`zalando.org/aws-load-balancer-type`| `nlb` \| `alb`|`alb`| |`zalando.org/aws-load-balancer-http2`| `true` \| `false`|`true`| +|[`zalando.org/aws-nlb-extra-listeners`](#extra-listen-ports)|`string`|N/A| |`zalando.org/aws-waf-web-acl-id` | `string` | N/A | |`kubernetes.io/ingress.class`|`string`|N/A| @@ -664,6 +665,29 @@ In *AWS CNI Mode* (`target-access-mode=AWSCNI`) the controller actively manages | `AWSCNI` | `false` | `true` | PodIP != HostIP: limited scaling and host bound | | `AWSCNI` | `false` | `false` | free scaling, pod VPC CNI IP used | +## Advanced Options for NLBs + +### Extra Listen Ports + +Some real world scenarios may require non-standard TCP or UDP ingresses. The `zalando.org/aws-nlb-extra-listeners` +annotation allows you to specify a list of additional listeners to add to your NLB. The value of the annotation should +be a valid JSON string of the following format. + +```json +[ + { + "protocol": "TCP", + "listenport": 22, + "targetport": 2222, + "podlabel": "application=ssh-service" + } +] +``` + +The `podlabel` value is used to register targets in the target group associated with the listener. This depends on the +AWS CNI Mode feature, where individual pods receive accessible IP addresses. The value is used to identify pods running +in the same namespace as the ingress that will receive traffic from the load balancer. + ## Trying it out The Ingress Controller's responsibility is limited to managing load balancers, as described above. To have a fully diff --git a/aws/adapter.go b/aws/adapter.go index 85e89dd4..5985184e 100644 --- a/aws/adapter.go +++ b/aws/adapter.go @@ -79,9 +79,19 @@ type Adapter struct { type TargetCNIconfig struct { Enabled bool - TargetGroupCh chan []string + TargetGroupCh chan []TargetGroupWithLabels } +type TargetGroupWithLabels struct { + ARN string + PodNamespace string + PodLabel string +} +type CNIEndpoint struct { + IPAddress string + Namespace string + Podlabel string +} type manifest struct { securityGroup *securityGroupDetails instance *instanceDetails @@ -233,7 +243,7 @@ func NewAdapter(clusterID, newControllerID, vpcID string, debug, disableInstrume customFilter: DefaultCustomFilter, TargetCNI: &TargetCNIconfig{ Enabled: false, - TargetGroupCh: make(chan []string, 10), + TargetGroupCh: make(chan []TargetGroupWithLabels, 10), }, } @@ -590,8 +600,33 @@ func (a *Adapter) UpdateTargetGroupsAndAutoScalingGroups(stacks []*Stack, proble a.TargetCNI.TargetGroupCh <- targetTypesARNs[elbv2.TargetTypeEnumIp] } + // run through any target groups with ALB targets and register all ALBs + for _, tg := range targetTypesARNs[elbv2.TargetTypeEnumAlb] { + albARNs := make([]string, 0, len(stacks)) + for _, stack := range stacks { + if stack.LoadBalancerType == LoadBalancerTypeApplication { + albARNs = append(albARNs, stack.loadbalancerARN) + } + } + registeredTargets, err := a.getRegisteredTargets(tg.ARN) + if err != nil { + problems.Add("failed to get existing targets: %w", err) + } + if err := a.registerAndDeregister(albARNs, registeredTargets, tg.ARN); err != nil { + problems.Add("failed to update target registration %w", err) + } + } + // remove the IP TGs from the list keeping all other TGs including problematic #127 and nonexistent #436 - targetGroupARNs := difference(allTargetGroupARNs, targetTypesARNs[elbv2.TargetTypeEnumIp]) + var targetGroupARNs []string + for targetType, tgList := range targetTypesARNs { + if targetType == elbv2.TargetTypeEnumIp || targetType == elbv2.TargetTypeEnumAlb { + continue + } + for _, tg := range tgList { + targetGroupARNs = append(targetGroupARNs, tg.ARN) + } + } ownerTags := map[string]string{ clusterIDTagPrefix + a.ClusterID(): resourceLifecycleOwned, @@ -637,7 +672,7 @@ func (a *Adapter) UpdateTargetGroupsAndAutoScalingGroups(stacks []*Stack, proble // All the required resources (listeners and target group) are created in a // transactional fashion. // Failure to create the stack causes it to be deleted automatically. -func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, owner, sslPolicy, ipAddressType, wafWebACLID string, cwAlarms CloudWatchAlarmList, loadBalancerType string, http2 bool) (string, error) { +func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, owner, sslPolicy, ipAddressType, wafWebACLID string, cwAlarms CloudWatchAlarmList, loadBalancerType string, http2 bool, extraListeners []ExtraListener) (string, error) { certARNs := make(map[string]time.Time, len(certificateARNs)) for _, arn := range certificateARNs { certARNs[arn] = time.Time{} @@ -688,6 +723,7 @@ func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, o httpRedirectToHTTPS: a.httpRedirectToHTTPS, nlbCrossZone: a.nlbCrossZone, http2: http2, + extraListeners: extraListeners, tags: a.stackTags, internalDomains: a.internalDomains, targetAccessModeCNI: a.TargetCNI.Enabled, @@ -702,7 +738,7 @@ func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, o return createStack(a.cloudformation, spec) } -func (a *Adapter) UpdateStack(stackName string, certificateARNs map[string]time.Time, scheme, securityGroup, owner, sslPolicy, ipAddressType, wafWebACLID string, cwAlarms CloudWatchAlarmList, loadBalancerType string, http2 bool) (string, error) { +func (a *Adapter) UpdateStack(stackName string, certificateARNs map[string]time.Time, scheme, securityGroup, owner, sslPolicy, ipAddressType, wafWebACLID string, cwAlarms CloudWatchAlarmList, loadBalancerType string, http2 bool, extraListeners []ExtraListener) (string, error) { if _, ok := SSLPolicies[sslPolicy]; !ok { return "", fmt.Errorf("invalid SSLPolicy '%s' defined", sslPolicy) } @@ -744,6 +780,7 @@ func (a *Adapter) UpdateStack(stackName string, certificateARNs map[string]time. httpRedirectToHTTPS: a.httpRedirectToHTTPS, nlbCrossZone: a.nlbCrossZone, http2: http2, + extraListeners: extraListeners, tags: a.stackTags, internalDomains: a.internalDomains, targetAccessModeCNI: a.TargetCNI.Enabled, @@ -1037,36 +1074,56 @@ func nonTargetedASGs(ownedASGs, targetedASGs map[string]*autoScalingGroupDetails return nonTargetedASGs } +func (a *Adapter) getRegisteredTargets(tgARN string) ([]string, error) { + tgh, err := a.elbv2.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{TargetGroupArn: &tgARN}) + if err != nil { + log.Errorf("unable to describe target health %v", err) + return []string{}, err + } + registeredTargets := make([]string, len(tgh.TargetHealthDescriptions)) + for i, target := range tgh.TargetHealthDescriptions { + registeredTargets[i] = *target.Target.Id + } + return registeredTargets, nil +} + +func (a *Adapter) registerAndDeregister(new []string, old []string, tgARN string) error { + toRegister := difference(new, old) + if len(toRegister) > 0 { + log.Info("Registering CNI targets: ", toRegister) + err := registerTargetsOnTargetGroups(a.elbv2, []string{tgARN}, toRegister) + if err != nil { + return err + } + } + toDeregister := difference(old, new) + if len(toDeregister) > 0 { + log.Info("Deregistering CNI targets: ", toDeregister) + err := deregisterTargetsOnTargetGroups(a.elbv2, []string{tgARN}, toDeregister) + if err != nil { + return err + } + } + return nil +} + // SetTargetsOnCNITargetGroups implements desired state for CNI target groups // by polling the current list of targets thus creating a diff of what needs to be added and removed. -func (a *Adapter) SetTargetsOnCNITargetGroups(endpoints, cniTargetGroupARNs []string) error { - log.Debugf("setting targets on CNI target groups: '%v'", cniTargetGroupARNs) - for _, targetGroupARN := range cniTargetGroupARNs { - tgh, err := a.elbv2.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{TargetGroupArn: &targetGroupARN}) +func (a *Adapter) SetTargetsOnCNITargetGroups(endpoints []CNIEndpoint, cniTargetGroups []TargetGroupWithLabels) error { + log.Debugf("setting targets on CNI target groups: '%v'", cniTargetGroups) + for _, targetGroup := range cniTargetGroups { + registeredTargets, err := a.getRegisteredTargets(targetGroup.ARN) if err != nil { - log.Errorf("unable to describe target health %v", err) - // continue for processing of the rest of the target groups continue } - registeredInstances := make([]string, len(tgh.TargetHealthDescriptions)) - for i, target := range tgh.TargetHealthDescriptions { - registeredInstances[i] = *target.Target.Id - } - toRegister := difference(endpoints, registeredInstances) - if len(toRegister) > 0 { - log.Info("Registering CNI targets: ", toRegister) - err := registerTargetsOnTargetGroups(a.elbv2, []string{targetGroupARN}, toRegister) - if err != nil { - return err + var matchingEndpoints []string + for _, endpoint := range endpoints { + if endpoint.Podlabel == targetGroup.PodLabel && endpoint.Namespace == targetGroup.PodNamespace { + matchingEndpoints = append(matchingEndpoints, endpoint.IPAddress) } } - toDeregister := difference(registeredInstances, endpoints) - if len(toDeregister) > 0 { - log.Info("Deregistering CNI targets: ", toDeregister) - err := deregisterTargetsOnTargetGroups(a.elbv2, []string{targetGroupARN}, toDeregister) - if err != nil { - return err - } + if err := a.registerAndDeregister(matchingEndpoints, registeredTargets, targetGroup.ARN); err != nil { + return err } } return nil diff --git a/aws/adapter_test.go b/aws/adapter_test.go index e095b4cd..ab7ff5f5 100644 --- a/aws/adapter_test.go +++ b/aws/adapter_test.go @@ -948,8 +948,91 @@ func TestWithxlbHealthyThresholdCount(t *testing.T) { }) } +func Test_getRegisteredTargets(t *testing.T) { + t.Run("should return an error if unable to describe target health", func(t *testing.T) { + m := &mockElbv2Client{ + outputs: elbv2MockOutputs{ + describeTargetHealth: &apiResponse{ + response: elbv2.DescribeTargetHealthOutput{TargetHealthDescriptions: []*elbv2.TargetHealthDescription{}}, + err: fmt.Errorf("error")}, + registerTargets: R(mockDTOutput(), nil), + deregisterTargets: R(mockDTOutput(), nil), + }, + } + a := Adapter{elbv2: m} + _, err := a.getRegisteredTargets("none") + require.Error(t, err) + }) + t.Run("should a slice of strings representing the ids of found targets", func(t *testing.T) { + thOut := elbv2.DescribeTargetHealthOutput{TargetHealthDescriptions: []*elbv2.TargetHealthDescription{ + {Target: &elbv2.TargetDescription{Id: aws.String("asg1")}}, + {Target: &elbv2.TargetDescription{Id: aws.String("asg2")}}, + {Target: &elbv2.TargetDescription{Id: aws.String("blah")}}, + }} + expected := []string{"asg1", "asg2", "blah"} + m := &mockElbv2Client{ + outputs: elbv2MockOutputs{ + describeTargetHealth: &apiResponse{response: &thOut, err: nil}, + registerTargets: R(mockDTOutput(), nil), + deregisterTargets: R(mockDTOutput(), nil), + }, + } + a := Adapter{elbv2: m} + response, err := a.getRegisteredTargets("none") + require.Nil(t, err) + require.Equal(t, expected, response) + }) +} + +func Test_registerAndDeregister(t *testing.T) { + t.Run("should return an error if unable to register new targets", func(t *testing.T) { + m := &mockElbv2Client{ + outputs: elbv2MockOutputs{ + registerTargets: R(mockDTOutput(), fmt.Errorf("this is an error")), + deregisterTargets: R(mockDTOutput(), nil), + }, + } + a := Adapter{elbv2: m} + err := a.registerAndDeregister([]string{"new"}, []string{"old"}, "none") + require.Error(t, err) + }) + t.Run("should return an error if unable to deregister new targets", func(t *testing.T) { + m := &mockElbv2Client{ + outputs: elbv2MockOutputs{ + registerTargets: R(mockDTOutput(), nil), + deregisterTargets: R(mockDTOutput(), fmt.Errorf("this is an error")), + }, + } + a := Adapter{elbv2: m} + err := a.registerAndDeregister([]string{"new"}, []string{"old"}, "none") + require.Error(t, err) + }) + t.Run("should return nil if there's nothing to register", func(t *testing.T) { + m := &mockElbv2Client{ + outputs: elbv2MockOutputs{ + registerTargets: R(mockDTOutput(), fmt.Errorf("this is an error")), + deregisterTargets: R(mockDTOutput(), fmt.Errorf("this is also an error")), + }, + } + a := Adapter{elbv2: m} + err := a.registerAndDeregister([]string{"same"}, []string{"same"}, "none") + require.Nil(t, err) + }) + t.Run("should return nil if there's no upstream errors", func(t *testing.T) { + m := &mockElbv2Client{ + outputs: elbv2MockOutputs{ + registerTargets: R(mockDTOutput(), nil), + deregisterTargets: R(mockDTOutput(), nil), + }, + } + a := Adapter{elbv2: m} + err := a.registerAndDeregister([]string{"new"}, []string{"old"}, "none") + require.Nil(t, err) + }) +} + func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) { - tgARNs := []string{"asg1"} + tgARNs := []TargetGroupWithLabels{{ARN: "asg1"}} thOut := elbv2.DescribeTargetHealthOutput{TargetHealthDescriptions: []*elbv2.TargetHealthDescription{}} m := &mockElbv2Client{ outputs: elbv2MockOutputs{ @@ -961,7 +1044,7 @@ func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) { a := &Adapter{elbv2: m, TargetCNI: &TargetCNIconfig{}} t.Run("adding a new endpoint", func(t *testing.T) { - require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1"}, tgARNs)) + require.NoError(t, a.SetTargetsOnCNITargetGroups([]CNIEndpoint{{IPAddress: "1.1.1.1"}}, tgARNs)) require.Equal(t, []*elbv2.RegisterTargetsInput{{ TargetGroupArn: aws.String("asg1"), Targets: []*elbv2.TargetDescription{{Id: aws.String("1.1.1.1")}}, @@ -975,7 +1058,8 @@ func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) { } m.rtinputs, m.dtinputs = nil, nil - require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1", "2.2.2.2", "3.3.3.3"}, tgARNs)) + require.NoError(t, a.SetTargetsOnCNITargetGroups( + []CNIEndpoint{{IPAddress: "1.1.1.1"}, {IPAddress: "2.2.2.2"}, {IPAddress: "3.3.3.3"}}, tgARNs)) require.Equal(t, []*elbv2.TargetDescription{ {Id: aws.String("2.2.2.2")}, {Id: aws.String("3.3.3.3")}, @@ -991,7 +1075,7 @@ func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) { }} m.rtinputs, m.dtinputs = nil, nil - require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1", "3.3.3.3"}, tgARNs)) + require.NoError(t, a.SetTargetsOnCNITargetGroups([]CNIEndpoint{{IPAddress: "1.1.1.1"}, {IPAddress: "3.3.3.3"}}, tgARNs)) require.Equal(t, []*elbv2.RegisterTargetsInput(nil), m.rtinputs) require.Equal(t, []*elbv2.TargetDescription{{Id: aws.String("2.2.2.2")}}, m.dtinputs[0].Targets) }) @@ -1004,7 +1088,8 @@ func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) { }} m.rtinputs, m.dtinputs = nil, nil - require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1", "2.2.2.2", "3.3.3.3"}, tgARNs)) + require.NoError(t, a.SetTargetsOnCNITargetGroups( + []CNIEndpoint{{IPAddress: "1.1.1.1"}, {IPAddress: "2.2.2.2"}, {IPAddress: "3.3.3.3"}}, tgARNs)) require.Equal(t, []*elbv2.TargetDescription{{Id: aws.String("3.3.3.3")}}, m.rtinputs[0].Targets) require.Equal(t, []*elbv2.TargetDescription{{Id: aws.String("4.4.4.4")}}, m.dtinputs[0].Targets) }) diff --git a/aws/asg.go b/aws/asg.go index 687954af..d62c9b49 100644 --- a/aws/asg.go +++ b/aws/asg.go @@ -258,8 +258,8 @@ func describeTargetGroups(elbv2svc elbv2iface.ELBV2API) (map[string]struct{}, er } // map the target group slice into specific types such as instance, ip, etc -func categorizeTargetTypeInstance(elbv2svc elbv2iface.ELBV2API, allTGARNs []string) (map[string][]string, error) { - targetTypes := make(map[string][]string) +func categorizeTargetTypeInstance(elbv2svc elbv2iface.ELBV2API, allTGARNs []string) (map[string][]TargetGroupWithLabels, error) { + targetTypes := make(map[string][]TargetGroupWithLabels) err := elbv2svc.DescribeTargetGroupsPagesWithContext(context.TODO(), &elbv2.DescribeTargetGroupsInput{}, func(resp *elbv2.DescribeTargetGroupsOutput, lastPage bool) bool { for _, tg := range resp.TargetGroups { @@ -267,7 +267,30 @@ func categorizeTargetTypeInstance(elbv2svc elbv2iface.ELBV2API, allTGARNs []stri if v != aws.StringValue(tg.TargetGroupArn) { continue } - targetTypes[aws.StringValue(tg.TargetType)] = append(targetTypes[aws.StringValue(tg.TargetType)], aws.StringValue(tg.TargetGroupArn)) + var podlabel, podnamespace string + log.Debugf("Looking for tags on %s", aws.StringValue(tg.TargetGroupArn)) + out, err := elbv2svc.DescribeTags(&elbv2.DescribeTagsInput{ResourceArns: []*string{tg.TargetGroupArn}}) + if err != nil { + log.Errorf("cannot describe tags on target group: %v", err) + } else { + for _, desc := range out.TagDescriptions { + for _, tag := range desc.Tags { + switch aws.StringValue(tag.Key) { + case podLabelTag: + podlabel = aws.StringValue(tag.Value) + case podNamespaceTag: + podnamespace = aws.StringValue(tag.Value) + } + } + } + } + log.Debugf("Adding tg with label: '%s' in namespace: '%s'", podlabel, podnamespace) + targetTypes[aws.StringValue(tg.TargetType)] = append( + targetTypes[aws.StringValue(tg.TargetType)], + TargetGroupWithLabels{ + ARN: aws.StringValue(tg.TargetGroupArn), + PodLabel: podlabel, + PodNamespace: podnamespace}) } } return true diff --git a/aws/asg_test.go b/aws/asg_test.go index 87e825b3..00354b61 100644 --- a/aws/asg_test.go +++ b/aws/asg_test.go @@ -682,29 +682,29 @@ func TestProcessChunked(t *testing.T) { func Test_categorizeTargetTypeInstance(t *testing.T) { for _, test := range []struct { name string - targetGroups map[string][]string + targetGroups map[string][]TargetGroupWithLabels }{ { name: "one from any type", - targetGroups: map[string][]string{ - elbv2.TargetTypeEnumInstance: {"instancy"}, - elbv2.TargetTypeEnumAlb: {"albly"}, - elbv2.TargetTypeEnumIp: {"ipvy"}, - elbv2.TargetTypeEnumLambda: {"lambada"}, + targetGroups: map[string][]TargetGroupWithLabels{ + elbv2.TargetTypeEnumInstance: {{ARN: "instancy"}}, + elbv2.TargetTypeEnumAlb: {{ARN: "albly"}}, + elbv2.TargetTypeEnumIp: {{ARN: "ipvy"}}, + elbv2.TargetTypeEnumLambda: {{ARN: "lambada"}}, }, }, { name: "one type many target groups", - targetGroups: map[string][]string{ - elbv2.TargetTypeEnumInstance: {"instancy", "foo", "void", "bar", "blank"}, + targetGroups: map[string][]TargetGroupWithLabels{ + elbv2.TargetTypeEnumInstance: {{ARN: "instancy"}, {ARN: "foo"}, {ARN: "void"}, {ARN: "bar"}, {ARN: "blank"}}, }, }, { name: "several types many target groups", - targetGroups: map[string][]string{ - elbv2.TargetTypeEnumInstance: {"instancy", "foo", "void", "bar", "blank"}, - elbv2.TargetTypeEnumAlb: {"albly", "alblily"}, - elbv2.TargetTypeEnumIp: {"ipvy"}, + targetGroups: map[string][]TargetGroupWithLabels{ + elbv2.TargetTypeEnumInstance: {{ARN: "instancy"}, {ARN: "foo"}, {ARN: "void"}, {ARN: "bar"}, {ARN: "blank"}}, + elbv2.TargetTypeEnumAlb: {{ARN: "albly"}, {ARN: "alblily"}}, + elbv2.TargetTypeEnumIp: {{ARN: "ipvy"}}, }, }, } { @@ -713,17 +713,20 @@ func Test_categorizeTargetTypeInstance(t *testing.T) { tgResponse := []*elbv2.TargetGroup{} for k, v := range test.targetGroups { for _, i := range v { - tg = append(tg, i) - tgResponse = append(tgResponse, &elbv2.TargetGroup{TargetGroupArn: aws.String(i), TargetType: aws.String(k)}) + tg = append(tg, i.ARN) + tgResponse = append(tgResponse, &elbv2.TargetGroup{TargetGroupArn: aws.String(i.ARN), TargetType: aws.String(k)}) } } - mockElbv2Svc := &mockElbv2Client{outputs: elbv2MockOutputs{describeTargetGroups: R(&elbv2.DescribeTargetGroupsOutput{TargetGroups: tgResponse}, nil)}} + mockElbv2Svc := &mockElbv2Client{outputs: elbv2MockOutputs{ + describeTargetGroups: R(&elbv2.DescribeTargetGroupsOutput{TargetGroups: tgResponse}, nil), + describeTags: R(&elbv2.DescribeTagsOutput{TagDescriptions: []*elbv2.TagDescription{}}, nil), + }} got, err := categorizeTargetTypeInstance(mockElbv2Svc, tg) assert.NoError(t, err) for k, v := range test.targetGroups { assert.Len(t, got[k], len(v)) - assert.Equal(t, got[k], v) + assert.Equal(t, v, got[k]) } }) } diff --git a/aws/cf.go b/aws/cf.go index 00a33ffd..6b60c955 100644 --- a/aws/cf.go +++ b/aws/cf.go @@ -1,6 +1,8 @@ package aws import ( + "encoding/base64" + "encoding/json" "fmt" "strings" "time" @@ -15,6 +17,9 @@ const ( certificateARNTagPrefix = "ingress:certificate-arn/" ingressOwnerTag = "ingress:owner" cwAlarmConfigHashTag = "cloudwatch:alarm-config-hash" + extraListenersTag = "ingress:extra-listeners" + podLabelTag = "ingress:podlabel" + podNamespaceTag = "ingress:podnamespace" ) // Stack is a simple wrapper around a CloudFormation Stack. @@ -29,12 +34,22 @@ type Stack struct { IpAddressType string LoadBalancerType string HTTP2 bool + ExtraListeners []ExtraListener OwnerIngress string CWAlarmConfigHash string TargetGroupARNs []string WAFWebACLID string CertificateARNs map[string]time.Time tags map[string]string + loadbalancerARN string +} + +type ExtraListener struct { + ListenProtocol string `json:"protocol"` + ListenPort int64 `json:"listenport"` + TargetPort int64 `json:"targetport"` + PodLabel string `json:"podlabel,omitempty"` + Namespace string } // IsComplete returns true if the stack status is a complete state. @@ -107,12 +122,15 @@ func (o stackOutput) dnsName() string { return o[outputLoadBalancerDNSName] } +func (o stackOutput) lbARN() string { + return o[outputLoadBalancerARN] +} + func (o stackOutput) targetGroupARNs() (arns []string) { - if arn, ok := o[outputTargetGroupARN]; ok { - arns = append(arns, arn) - } - if arn, ok := o[outputHTTPTargetGroupARN]; ok { - arns = append(arns, arn) + for k, v := range o { + if strings.Contains(k, "TargetGroupARN") { + arns = append(arns, v) + } } return } @@ -130,6 +148,7 @@ func convertStackParameters(parameters []*cloudformation.Parameter) map[string]s const ( // The following constants should be part of the Output section of the CloudFormation template outputLoadBalancerDNSName = "LoadBalancerDNSName" + outputLoadBalancerARN = "LoadBalancerARN" outputTargetGroupARN = "TargetGroupARN" outputHTTPTargetGroupARN = "HTTPTargetGroupARN" @@ -181,6 +200,7 @@ type stackSpec struct { cwAlarms CloudWatchAlarmList httpRedirectToHTTPS bool nlbCrossZone bool + extraListeners []ExtraListener http2 bool denyInternalDomains bool denyInternalDomainsResponse denyResp @@ -270,6 +290,11 @@ func createStack(svc cloudformationiface.CloudFormationAPI, spec *stackSpec) (st params.Tags = append(params.Tags, cfTag(cwAlarmConfigHashTag, spec.cwAlarms.Hash())) } + if len(spec.extraListeners) > 0 { + listeners, _ := json.Marshal(spec.extraListeners) + params.Tags = append(params.Tags, cfTag(extraListenersTag, base64.StdEncoding.EncodeToString(listeners))) + } + resp, err := svc.CreateStack(params) if err != nil { return spec.name, err @@ -343,6 +368,11 @@ func updateStack(svc cloudformationiface.CloudFormationAPI, spec *stackSpec) (st params.Tags = append(params.Tags, cfTag(cwAlarmConfigHashTag, spec.cwAlarms.Hash())) } + if len(spec.extraListeners) > 0 { + listeners, _ := json.Marshal(spec.extraListeners) + params.Tags = append(params.Tags, cfTag(extraListenersTag, base64.StdEncoding.EncodeToString(listeners))) + } + if spec.stackTerminationProtection { params := &cloudformation.UpdateTerminationProtectionInput{ StackName: aws.String(spec.name), @@ -420,7 +450,7 @@ func getStack(svc cloudformationiface.CloudFormationAPI, stackName string) (*Sta if err != nil { return nil, ErrLoadBalancerStackNotReady } - return mapToManagedStack(stack), nil + return mapToManagedStack(stack) } func getCFStackByName(svc cloudformationiface.CloudFormationAPI, stackName string) (*cloudformation.Stack, error) { @@ -447,13 +477,14 @@ func getCFStackByName(svc cloudformationiface.CloudFormationAPI, stackName strin return stack, nil } -func mapToManagedStack(stack *cloudformation.Stack) *Stack { +func mapToManagedStack(stack *cloudformation.Stack) (*Stack, error) { outputs := newStackOutput(stack.Outputs) tags := convertCloudFormationTags(stack.Tags) parameters := convertStackParameters(stack.Parameters) certificateARNs := make(map[string]time.Time, len(tags)) ownerIngress := "" + var extraListeners []ExtraListener for key, value := range tags { if strings.HasPrefix(key, certificateARNTagPrefix) { arn := strings.TrimPrefix(key, certificateARNTagPrefix) @@ -473,6 +504,13 @@ func mapToManagedStack(stack *cloudformation.Stack) *Stack { if key == ingressOwnerTag { ownerIngress = value } + + if key == extraListenersTag { + decodedListeners, _ := base64.StdEncoding.DecodeString(value) + if err := json.Unmarshal(decodedListeners, &extraListeners); err != nil { + return &Stack{}, err + } + } } http2 := true @@ -497,16 +535,23 @@ func mapToManagedStack(stack *cloudformation.Stack) *Stack { statusReason: aws.StringValue(stack.StackStatusReason), CWAlarmConfigHash: tags[cwAlarmConfigHashTag], WAFWebACLID: parameters[parameterLoadBalancerWAFWebACLIDParameter], - } + ExtraListeners: extraListeners, + loadbalancerARN: outputs.lbARN(), + }, nil } func findManagedStacks(svc cloudformationiface.CloudFormationAPI, clusterID, controllerID string) ([]*Stack, error) { stacks := make([]*Stack, 0) + errors := make([]error, 0) err := svc.DescribeStacksPages(&cloudformation.DescribeStacksInput{}, func(page *cloudformation.DescribeStacksOutput, lastPage bool) bool { for _, s := range page.Stacks { if isManagedStack(s.Tags, clusterID, controllerID) { - stacks = append(stacks, mapToManagedStack(s)) + stack, err := mapToManagedStack(s) + if err != nil { + errors = append(errors, err) + } + stacks = append(stacks, stack) } } return true @@ -514,6 +559,9 @@ func findManagedStacks(svc cloudformationiface.CloudFormationAPI, clusterID, con if err != nil { return nil, fmt.Errorf("findManagedStacks failed to list stacks: %v", err) } + if len(errors) > 0 { + return nil, fmt.Errorf("mapToManagedStacks returned errors: %v", errors) + } return stacks, nil } diff --git a/aws/cf_template.go b/aws/cf_template.go index 75a2b7b2..33f5ffd4 100644 --- a/aws/cf_template.go +++ b/aws/cf_template.go @@ -120,19 +120,25 @@ func generateTemplate(spec *stackSpec) (string, error) { Description: "DNS name for the LoadBalancer", Value: cloudformation.GetAtt("LB", "DNSName").String(), }, + outputLoadBalancerARN: &cloudformation.Output{ + Description: "ARN of the LoadBalancer", + Value: cloudformation.Ref("LB").String(), + }, outputTargetGroupARN: &cloudformation.Output{ - Description: "The ARN of the TargetGroup", + Description: "The ARN of the main TargetGroup", Value: cloudformation.Ref(httpsTargetGroupName).String(), }, } - template.AddResource(httpsTargetGroupName, newTargetGroup(spec, parameterTargetGroupTargetPortParameter)) + listener := ExtraListener{} + template.AddResource(httpsTargetGroupName, newTargetGroup(spec, parameterTargetGroupTargetPortParameter, listener)) if !spec.httpDisabled { // Use the same target group for HTTP Listener or create another one if needed httpTargetGroupName := httpsTargetGroupName if spec.httpTargetPort != spec.targetPort { httpTargetGroupName = "TGHTTP" + listener := ExtraListener{} template.Parameters[parameterTargetGroupHTTPTargetPortParameter] = &cloudformation.Parameter{ Type: "Number", Description: "The HTTP target port", @@ -141,7 +147,7 @@ func generateTemplate(spec *stackSpec) (string, error) { Description: "The ARN of the HTTP TargetGroup", Value: cloudformation.Ref(httpTargetGroupName).String(), } - template.AddResource(httpTargetGroupName, newTargetGroup(spec, parameterTargetGroupHTTPTargetPortParameter)) + template.AddResource(httpTargetGroupName, newTargetGroup(spec, parameterTargetGroupHTTPTargetPortParameter, listener)) } // Add an HTTP Listener resource @@ -282,6 +288,28 @@ func generateTemplate(spec *stackSpec) (string, error) { } + if spec.loadbalancerType == LoadBalancerTypeNetwork { + for idx, listener := range spec.extraListeners { + tgName := fmt.Sprintf("ExtraTG%d", idx) + template.Outputs[fmt.Sprintf("%sTargetGroupARN", tgName)] = &cloudformation.Output{ + Description: fmt.Sprintf("The ARN of the %s TargetGroup", tgName), + Value: cloudformation.Ref(tgName).String(), + } + template.AddResource(tgName, newTargetGroup(spec, parameterTargetGroupHTTPTargetPortParameter, listener)) + template.AddResource(fmt.Sprintf("ExtraListener%d", idx), &cloudformation.ElasticLoadBalancingV2Listener{ + DefaultActions: &cloudformation.ElasticLoadBalancingV2ListenerActionList{ + { + Type: cloudformation.String("forward"), + TargetGroupArn: cloudformation.Ref(tgName).String(), + }, + }, + LoadBalancerArn: cloudformation.Ref("LB").String(), + Port: cloudformation.Integer(listener.ListenPort), + Protocol: cloudformation.String(listener.ListenProtocol), + }) + } + } + // Build up the LoadBalancerAttributes list, as there is no way to make attributes conditional in the template lbAttrList := make(cloudformation.ElasticLoadBalancingV2LoadBalancerLoadBalancerAttributeList, 0, 4) @@ -444,17 +472,31 @@ func generateDenyInternalTrafficRule(listenerName string, rulePriority int64, in } } -func newTargetGroup(spec *stackSpec, targetPortParameter string) *cloudformation.ElasticLoadBalancingV2TargetGroup { +func newTargetGroup(spec *stackSpec, targetPortParameter string, listener ExtraListener) *cloudformation.ElasticLoadBalancingV2TargetGroup { targetType := elbv2.TargetTypeEnumInstance if spec.targetAccessModeCNI { targetType = elbv2.TargetTypeEnumIp } protocol := "HTTP" healthCheckProtocol := "HTTP" + port := cloudformation.Ref(targetPortParameter).Integer() + healthCheckPort := cloudformation.Ref(parameterTargetGroupHealthCheckPortParameter).String() healthyThresholdCount, unhealthyThresholdCount := spec.albHealthyThresholdCount, spec.albUnhealthyThresholdCount + + tgAttributes := &cloudformation.ElasticLoadBalancingV2TargetGroupTargetGroupAttributeList{ + { + Key: cloudformation.String("deregistration_delay.timeout_seconds"), + Value: cloudformation.String(fmt.Sprintf("%d", spec.deregistrationDelayTimeoutSeconds)), + }, + } + if spec.loadbalancerType == LoadBalancerTypeNetwork { protocol = "TCP" - healthCheckProtocol = "HTTP" + if listener.ListenProtocol != "" { + protocol, healthCheckProtocol = listener.ListenProtocol, listener.ListenProtocol + port = cloudformation.Integer(listener.TargetPort) + healthCheckPort = cloudformation.String(fmt.Sprintf("%d", listener.TargetPort)) + } // For NLBs the healthy and unhealthy threshold count value must be equal healthyThresholdCount, unhealthyThresholdCount = spec.nlbHealthyThresholdCount, spec.nlbHealthyThresholdCount } else if spec.targetHTTPS { @@ -462,25 +504,42 @@ func newTargetGroup(spec *stackSpec, targetPortParameter string) *cloudformation healthCheckProtocol = "HTTPS" } + interval := cloudformation.Ref(parameterTargetGroupHealthCheckIntervalParameter).Integer() + healthCheckPath := cloudformation.Ref(parameterTargetGroupHealthCheckPathParameter).String() + if protocol == "TCP" || healthCheckProtocol == "TCP" { + interval = cloudformation.Integer(10) + if healthCheckProtocol == "TCP" { + healthCheckPath = cloudformation.Ref("AWS::NoValue").String() + } + } + targetGroup := &cloudformation.ElasticLoadBalancingV2TargetGroup{ - TargetGroupAttributes: &cloudformation.ElasticLoadBalancingV2TargetGroupTargetGroupAttributeList{ - { - Key: cloudformation.String("deregistration_delay.timeout_seconds"), - Value: cloudformation.String(fmt.Sprintf("%d", spec.deregistrationDelayTimeoutSeconds)), - }, - }, - HealthCheckIntervalSeconds: cloudformation.Ref(parameterTargetGroupHealthCheckIntervalParameter).Integer(), - HealthCheckPath: cloudformation.Ref(parameterTargetGroupHealthCheckPathParameter).String(), - HealthCheckPort: cloudformation.Ref(parameterTargetGroupHealthCheckPortParameter).String(), + TargetGroupAttributes: tgAttributes, + HealthCheckIntervalSeconds: interval, + HealthCheckPath: healthCheckPath, + HealthCheckPort: healthCheckPort, HealthCheckProtocol: cloudformation.String(healthCheckProtocol), HealthyThresholdCount: cloudformation.Integer(int64(healthyThresholdCount)), UnhealthyThresholdCount: cloudformation.Integer(int64(unhealthyThresholdCount)), - Port: cloudformation.Ref(targetPortParameter).Integer(), + Port: port, Protocol: cloudformation.String(protocol), TargetType: cloudformation.String(targetType), VPCID: cloudformation.Ref(parameterTargetGroupVPCIDParameter).String(), } + if listener.PodLabel != "" { + targetGroup.Tags = &cloudformation.TagList{ + { + Key: cloudformation.String(podLabelTag), + Value: cloudformation.String(listener.PodLabel), + }, + { + Key: cloudformation.String(podNamespaceTag), + Value: cloudformation.String(listener.Namespace), + }, + } + } + // custom target group healthcheck only supported when the target group protocol is != TCP if protocol != "TCP" { targetGroup.HealthCheckTimeoutSeconds = cloudformation.Ref(parameterTargetGroupHealthCheckTimeoutParameter).Integer() diff --git a/aws/cf_template_test.go b/aws/cf_template_test.go index 6d67ceeb..7218438b 100644 --- a/aws/cf_template_test.go +++ b/aws/cf_template_test.go @@ -614,6 +614,16 @@ func TestGenerateTemplate(t *testing.T) { require.NotEqual(t, cloudformation.Integer(3), tg.UnhealthyThresholdCount) }, }, + { + name: "Nlbs with ExtraListeners should have an ExtraTG0", + spec: &stackSpec{ + loadbalancerType: LoadBalancerTypeNetwork, + extraListeners: []ExtraListener{{ListenProtocol: "TCP", ListenPort: 22, TargetPort: 2222, PodLabel: "app=test"}}, + }, + validate: func(t *testing.T, template *cloudformation.Template) { + validateTargetGroupListener(t, template, "ExtraTG0", "ExtraListener0", 22, "TCP") + }, + }, { name: "Default TG type is Instance", spec: &stackSpec{ diff --git a/aws/cf_test.go b/aws/cf_test.go index 603e1e37..eb37fce2 100644 --- a/aws/cf_test.go +++ b/aws/cf_test.go @@ -72,6 +72,19 @@ func TestCreatingStack(t *testing.T) { "fake-stack-id", false, }, + { + "stack extra listeners", + stackSpec{ + name: "foo", + securityGroupID: "bar", + vpcID: "baz", + loadbalancerType: LoadBalancerTypeNetwork, + extraListeners: []ExtraListener{{ListenProtocol: "TCP", TargetPort: 2222, ListenPort: 22, PodLabel: "app=test"}}, + }, + cfMockOutputs{createStack: R(mockCSOutput("fake-stack-id"), nil)}, + "fake-stack-id", + false, + }, { "stack with NLB http port", stackSpec{ @@ -177,6 +190,19 @@ func TestUpdatingStack(t *testing.T) { "fake-stack-id", false, }, + { + "stack extra listeners", + stackSpec{ + name: "foo", + securityGroupID: "bar", + vpcID: "baz", + loadbalancerType: LoadBalancerTypeNetwork, + extraListeners: []ExtraListener{{ListenProtocol: "TCP", TargetPort: 2222, ListenPort: 22, PodLabel: "app=test"}}, + }, + cfMockOutputs{updateStack: R(mockUSOutput("fake-stack-id"), nil)}, + "fake-stack-id", + false, + }, } { t.Run(ti.name, func(t *testing.T) { c := &mockCloudFormationClient{outputs: ti.givenOutputs} @@ -194,6 +220,17 @@ func TestUpdatingStack(t *testing.T) { } } +func Test_lbARN(t *testing.T) { + t.Run("should return the ARN", func(t *testing.T) { + want := "blah" + s := stackOutput{outputLoadBalancerARN: want} + got := s.lbARN() + if want != got { + t.Errorf("unexpected result. wanted %+v, got %+v", want, got) + } + }) +} + func TestDeleteStack(t *testing.T) { for _, ti := range []struct { msg string diff --git a/kubernetes/adapter.go b/kubernetes/adapter.go index 615cff15..85d5a095 100644 --- a/kubernetes/adapter.go +++ b/kubernetes/adapter.go @@ -1,6 +1,7 @@ package kubernetes import ( + "encoding/json" "errors" "fmt" "strings" @@ -23,6 +24,7 @@ type Adapter struct { ingressDefaultLoadBalancerType string clusterLocalDomain string routeGroupSupport bool + extraCNIEndpoints []aws.CNIEndpoint } type IngressType string @@ -78,6 +80,7 @@ type Ingress struct { ClusterLocal bool CertificateARN string Hostname string + ExtraListeners []aws.ExtraListener Scheme string SecurityGroup string SSLPolicy string @@ -210,6 +213,24 @@ func (a *Adapter) newIngress(typ IngressType, metadata kubeItemMetadata, host st wafWebAclId, hasWAF := annotations[ingressWAFWebACLIDAnnotation] + var extraListeners []aws.ExtraListener + rawlisteners, hasExtraListeners := annotations[ingressNLBExtraListenersAnnotation] + if hasExtraListeners { + if loadBalancerType != loadBalancerTypeNLB { + return nil, errors.New("extra listeners are only supported on NLBs") + } + if err := json.Unmarshal([]byte(rawlisteners), &extraListeners); err != nil { + return nil, fmt.Errorf("unable to parse aws-nlb-extra-listeners annotation: %v", err) + } + for idx, listener := range extraListeners { + if listener.ListenProtocol != "TCP" && listener.ListenProtocol != "UDP" && listener.ListenProtocol != "TCP_UDP" { + return nil, errors.New("only TCP, UDP, or TCP_UDP are allowed as protocols for extra listeners") + } + extraListeners[idx].Namespace = metadata.Namespace + a.extraCNIEndpoints = append(a.extraCNIEndpoints, aws.CNIEndpoint{Namespace: metadata.Namespace, Podlabel: listener.PodLabel}) + } + } + if (loadBalancerType == loadBalancerTypeNLB) && (hasSG || hasWAF) { if hasLB { return nil, errors.New("security group or WAF are not supported by NLB (configured by annotation)") @@ -251,6 +272,7 @@ func (a *Adapter) newIngress(typ IngressType, metadata kubeItemMetadata, host st LoadBalancerType: loadBalancerType, WAFWebACLID: wafWebAclId, HTTP2: http2, + ExtraListeners: extraListeners, }, nil } diff --git a/kubernetes/adapter_test.go b/kubernetes/adapter_test.go index 730338b8..f80df690 100644 --- a/kubernetes/adapter_test.go +++ b/kubernetes/adapter_test.go @@ -441,6 +441,114 @@ func TestNewIngressFromKube(tt *testing.T) { }, }, }, + { + msg: "test setting extra listeners annotation on an ALB raises an error", + defaultLoadBalancerType: aws.LoadBalancerTypeApplication, + ingressError: true, + kubeIngress: &ingress{ + Metadata: kubeItemMetadata{ + Namespace: "default", + Name: "foo", + Annotations: map[string]string{ + ingressNLBExtraListenersAnnotation: `[{"protocol": "TCP", "listenport": 22, "targetport": 2222, "podlabel": "app=test"}]`, + }, + }, + Status: ingressStatus{ + LoadBalancer: ingressLoadBalancerStatus{ + Ingress: []ingressLoadBalancer{ + {Hostname: "bar"}, + }, + }, + }, + }, + }, + { + msg: "test invalid extra listeners annotation raises an error", + defaultLoadBalancerType: aws.LoadBalancerTypeApplication, + ingressError: true, + kubeIngress: &ingress{ + Metadata: kubeItemMetadata{ + Namespace: "default", + Name: "foo", + Annotations: map[string]string{ + ingressLoadBalancerTypeAnnotation: loadBalancerTypeNLB, + ingressNLBExtraListenersAnnotation: "bad data", + }, + }, + Status: ingressStatus{ + LoadBalancer: ingressLoadBalancerStatus{ + Ingress: []ingressLoadBalancer{ + {Hostname: "bar"}, + }, + }, + }, + }, + }, + { + msg: "test using an unsupported protocol raises an error", + defaultLoadBalancerType: aws.LoadBalancerTypeApplication, + ingressError: true, + kubeIngress: &ingress{ + Metadata: kubeItemMetadata{ + Namespace: "default", + Name: "foo", + Annotations: map[string]string{ + ingressLoadBalancerTypeAnnotation: loadBalancerTypeNLB, + ingressNLBExtraListenersAnnotation: `[{"protocol": "HTTP", "listenport": 22, "targetport": 2222, "podlabel": "app=test"}]`, + }, + }, + Status: ingressStatus{ + LoadBalancer: ingressLoadBalancerStatus{ + Ingress: []ingressLoadBalancer{ + {Hostname: "bar"}, + }, + }, + }, + }, + }, + { + msg: "test extra listener with proper annotation does not raise an error", + defaultLoadBalancerType: aws.LoadBalancerTypeApplication, + ingressError: false, + ingress: &Ingress{ + Namespace: "default", + Name: "foo", + Hostname: "bar", + Scheme: "internet-facing", + Shared: true, + HTTP2: true, + SecurityGroup: testIngressDefaultSecurityGroup, + ClusterLocal: true, + SSLPolicy: testSSLPolicy, + IPAddressType: testIPAddressTypeDefault, + LoadBalancerType: aws.LoadBalancerTypeNetwork, + ResourceType: TypeIngress, + ExtraListeners: []aws.ExtraListener{{ + ListenProtocol: "TCP", + ListenPort: 22, + TargetPort: 2222, + PodLabel: "app=test", + Namespace: "default", + }}, + }, + kubeIngress: &ingress{ + Metadata: kubeItemMetadata{ + Namespace: "default", + Name: "foo", + Annotations: map[string]string{ + ingressLoadBalancerTypeAnnotation: loadBalancerTypeNLB, + ingressNLBExtraListenersAnnotation: `[{"protocol": "TCP", "listenport": 22, "targetport": 2222, "podlabel": "app=test"}]`, + }, + }, + Status: ingressStatus{ + LoadBalancer: ingressLoadBalancerStatus{ + Ingress: []ingressLoadBalancer{ + {Hostname: "bar"}, + }, + }, + }, + }, + }, } { tt.Run(tc.msg, func(t *testing.T) { a, err := NewAdapter(testConfig, IngressAPIVersionNetworking, testIngressFilter, testIngressDefaultSecurityGroup, testSSLPolicy, tc.defaultLoadBalancerType, DefaultClusterLocalDomain, false) diff --git a/kubernetes/ingress.go b/kubernetes/ingress.go index e1da9fcb..f88e3653 100644 --- a/kubernetes/ingress.go +++ b/kubernetes/ingress.go @@ -61,21 +61,22 @@ type ingressLoadBalancer struct { const ( // ingressALBIPAddressType is used in external-dns, https://github.com/kubernetes-incubator/external-dns/pull/1079 - ingressALBIPAddressType = "alb.ingress.kubernetes.io/ip-address-type" - IngressAPIVersionExtensions = "extensions/v1beta1" - IngressAPIVersionNetworking = "networking.k8s.io/v1beta1" - IngressAPIVersionNetworkingV1 = "networking.k8s.io/v1" - ingressListResource = "/apis/%s/ingresses" - ingressPatchStatusResource = "/apis/%s/namespaces/%s/ingresses/%s/status" - ingressCertificateARNAnnotation = "zalando.org/aws-load-balancer-ssl-cert" - ingressSchemeAnnotation = "zalando.org/aws-load-balancer-scheme" - ingressSharedAnnotation = "zalando.org/aws-load-balancer-shared" - ingressSecurityGroupAnnotation = "zalando.org/aws-load-balancer-security-group" - ingressSSLPolicyAnnotation = "zalando.org/aws-load-balancer-ssl-policy" - ingressLoadBalancerTypeAnnotation = "zalando.org/aws-load-balancer-type" - ingressHTTP2Annotation = "zalando.org/aws-load-balancer-http2" - ingressWAFWebACLIDAnnotation = "zalando.org/aws-waf-web-acl-id" - ingressClassAnnotation = "kubernetes.io/ingress.class" + ingressALBIPAddressType = "alb.ingress.kubernetes.io/ip-address-type" + IngressAPIVersionExtensions = "extensions/v1beta1" + IngressAPIVersionNetworking = "networking.k8s.io/v1beta1" + IngressAPIVersionNetworkingV1 = "networking.k8s.io/v1" + ingressListResource = "/apis/%s/ingresses" + ingressPatchStatusResource = "/apis/%s/namespaces/%s/ingresses/%s/status" + ingressCertificateARNAnnotation = "zalando.org/aws-load-balancer-ssl-cert" + ingressSchemeAnnotation = "zalando.org/aws-load-balancer-scheme" + ingressSharedAnnotation = "zalando.org/aws-load-balancer-shared" + ingressSecurityGroupAnnotation = "zalando.org/aws-load-balancer-security-group" + ingressSSLPolicyAnnotation = "zalando.org/aws-load-balancer-ssl-policy" + ingressLoadBalancerTypeAnnotation = "zalando.org/aws-load-balancer-type" + ingressHTTP2Annotation = "zalando.org/aws-load-balancer-http2" + ingressWAFWebACLIDAnnotation = "zalando.org/aws-waf-web-acl-id" + ingressNLBExtraListenersAnnotation = "zalando.org/aws-nlb-extra-listeners" + ingressClassAnnotation = "kubernetes.io/ingress.class" ) func getAnnotationsString(annotations map[string]string, key string, defaultValue string) string { diff --git a/kubernetes/pods.go b/kubernetes/pods.go index b358b4b1..04c50090 100644 --- a/kubernetes/pods.go +++ b/kubernetes/pods.go @@ -3,13 +3,12 @@ package kubernetes import ( "context" "fmt" - "sort" "sync" "time" log "github.com/sirupsen/logrus" + "github.com/zalando-incubator/kube-ingress-aws-controller/aws" corev1 "k8s.io/api/core/v1" - apisv1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" @@ -17,14 +16,29 @@ import ( const resyncInterval = 1 * time.Minute +func (a *Adapter) storeWatchedPods(pod *corev1.Pod, podEndpoints *sync.Map) { + for k, v := range pod.Labels { + selector := fmt.Sprintf("%s=%s", k, v) + if pod.Namespace == a.cniPodNamespace && selector == a.cniPodLabelSelector { + log.Infof("New discovered pod: %s IP: %s", pod.Name, pod.Status.PodIP) + podEndpoints.LoadOrStore(pod.Name, aws.CNIEndpoint{IPAddress: pod.Status.PodIP}) + } + for _, endpoint := range a.extraCNIEndpoints { + if endpoint.Namespace == pod.Namespace && endpoint.Podlabel == selector { + log.Infof("New discovered pod: %s IP: %s", pod.Name, pod.Status.PodIP) + podEndpoints.LoadOrStore(pod.Name, aws.CNIEndpoint{IPAddress: pod.Status.PodIP, Namespace: pod.Namespace, Podlabel: selector}) + } + } + } +} + // PodInformer is a event handler for Pod events registered to, that builds a local list of valid and relevant pods // and sends an event to the endpoint channel, triggering a resync of the targets. -func (a *Adapter) PodInformer(ctx context.Context, endpointChan chan<- []string) (err error) { +func (a *Adapter) PodInformer(ctx context.Context, endpointChan chan<- []aws.CNIEndpoint) (err error) { podEndpoints := sync.Map{} - log.Infof("Watching for Pods with labelselector %s in namespace %s", a.cniPodLabelSelector, a.cniPodNamespace) - factory := informers.NewSharedInformerFactoryWithOptions(a.clientset, resyncInterval, informers.WithNamespace(a.cniPodNamespace), - informers.WithTweakListOptions(func(options *apisv1.ListOptions) { options.LabelSelector = a.cniPodLabelSelector })) + // log.Infof("Watching for Pods with labelselector %s in namespace %s", a.cniPodLabelSelector, a.cniPodNamespace) + factory := informers.NewSharedInformerFactoryWithOptions(a.clientset, resyncInterval) informer := factory.Core().V1().Pods().Informer() factory.Start(ctx.Done()) @@ -39,12 +53,12 @@ func (a *Adapter) PodInformer(ctx context.Context, endpointChan chan<- []string) if err == nil && len(podList) > 0 { break } - log.Errorf("Error listing Pods with labelselector %s in namespace %s: %v", a.cniPodNamespace, a.cniPodLabelSelector, err) + log.Errorf("error listing Pods: %v", err) time.Sleep(resyncInterval) } for _, pod := range podList { if !isPodTerminating(pod) && isPodRunning(pod) { - podEndpoints.Store(pod.Name, pod.Status.PodIP) + a.storeWatchedPods(pod, &podEndpoints) } } queueEndpoints(&podEndpoints, endpointChan) @@ -68,11 +82,12 @@ func (a *Adapter) PodInformer(ctx context.Context, endpointChan chan<- []string) queueEndpoints(&podEndpoints, endpointChan) case isPodRunning(pod): - if _, isStored := podEndpoints.LoadOrStore(pod.Name, pod.Status.PodIP); isStored { + if _, isStored := podEndpoints.Load(pod.Name); isStored { return } - log.Infof("New discovered pod: %s IP: %s", pod.Name, pod.Status.PodIP) + a.storeWatchedPods(pod, &podEndpoints) queueEndpoints(&podEndpoints, endpointChan) + } }, }) @@ -80,13 +95,12 @@ func (a *Adapter) PodInformer(ctx context.Context, endpointChan chan<- []string) return nil } -func queueEndpoints(podEndpoints *sync.Map, endpointChan chan<- []string) { - podList := []string{} +func queueEndpoints(podEndpoints *sync.Map, endpointChan chan<- []aws.CNIEndpoint) { + podList := []aws.CNIEndpoint{} podEndpoints.Range(func(key, value interface{}) bool { - podList = append(podList, value.(string)) + podList = append(podList, value.(aws.CNIEndpoint)) return true }) - sort.StringSlice(podList).Sort() endpointChan <- podList } diff --git a/kubernetes/pods_test.go b/kubernetes/pods_test.go index ff321918..5a34fd67 100644 --- a/kubernetes/pods_test.go +++ b/kubernetes/pods_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/stretchr/testify/require" + "github.com/zalando-incubator/kube-ingress-aws-controller/aws" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -36,7 +37,7 @@ func TestAdapter_PodInformer(t *testing.T) { client := fake.NewSimpleClientset() a.clientset = client - pods := make(chan []string, 10) + pods := make(chan []aws.CNIEndpoint, 10) t.Run("initial state of five ready pods, a terminating and pending one", func(t *testing.T) { for i := 1; i <= 5; i++ { diff --git a/worker.go b/worker.go index ddba79ee..3aaa3a69 100644 --- a/worker.go +++ b/worker.go @@ -36,6 +36,7 @@ type loadBalancer struct { certTTL time.Duration cwAlarms aws.CloudWatchAlarmList loadBalancerType string + extraListeners []aws.ExtraListener } const ( @@ -385,6 +386,7 @@ func getAllLoadBalancers(certs CertificatesFinder, certTTL time.Duration, stacks ipAddressType: stack.IpAddressType, loadBalancerType: stack.LoadBalancerType, http2: stack.HTTP2, + extraListeners: stack.ExtraListeners, wafWebACLID: stack.WAFWebACLID, certTTL: certTTL, } @@ -478,6 +480,7 @@ func matchIngressesToLoadBalancers( loadBalancerType: ingress.LoadBalancerType, http2: ingress.HTTP2, wafWebACLID: ingress.WAFWebACLID, + extraListeners: ingress.ExtraListeners, }, ) } @@ -535,7 +538,7 @@ func createStack(awsAdapter *aws.Adapter, lb *loadBalancer, problems *problem.Li log.Infof("Creating stack for certificates %q / ingress %q", certificates, lb.ingresses) - stackId, err := awsAdapter.CreateStack(certificates, lb.scheme, lb.securityGroup, lb.Owner(), lb.sslPolicy, lb.ipAddressType, lb.wafWebACLID, lb.cwAlarms, lb.loadBalancerType, lb.http2) + stackId, err := awsAdapter.CreateStack(certificates, lb.scheme, lb.securityGroup, lb.Owner(), lb.sslPolicy, lb.ipAddressType, lb.wafWebACLID, lb.cwAlarms, lb.loadBalancerType, lb.http2, lb.extraListeners) if err != nil { if isAlreadyExistsError(err) { lb.stack, err = awsAdapter.GetStack(stackId) @@ -555,7 +558,7 @@ func updateStack(awsAdapter *aws.Adapter, lb *loadBalancer, problems *problem.Li log.Infof("Updating %q stack for %d certificates / %d ingresses", lb.scheme, len(certificates), len(lb.ingresses)) - stackId, err := awsAdapter.UpdateStack(lb.stack.Name, certificates, lb.scheme, lb.securityGroup, lb.Owner(), lb.sslPolicy, lb.ipAddressType, lb.wafWebACLID, lb.cwAlarms, lb.loadBalancerType, lb.http2) + stackId, err := awsAdapter.UpdateStack(lb.stack.Name, certificates, lb.scheme, lb.securityGroup, lb.Owner(), lb.sslPolicy, lb.ipAddressType, lb.wafWebACLID, lb.cwAlarms, lb.loadBalancerType, lb.http2, lb.extraListeners) if isNoUpdatesToBePerformedError(err) { log.Debugf("Stack(%q) is already up to date", certificates) } else if err != nil { @@ -670,13 +673,13 @@ func getCloudWatchAlarmsFromConfigMap(configMap *kubernetes.ConfigMap) aws.Cloud // cniEventHandler syncronizes the events from kubernetes and the status updates from the load balancer controller. // Events updates a rate limited. func cniEventHandler(ctx context.Context, targetCNIcfg *aws.TargetCNIconfig, - targetSetter func([]string, []string) error, informer func(context.Context, chan<- []string) error) { + targetSetter func([]aws.CNIEndpoint, []aws.TargetGroupWithLabels) error, informer func(context.Context, chan<- []aws.CNIEndpoint) error) { log.Infoln("Starting CNI event handler") rateLimiter := time.NewTicker(cniEventRateLimit) defer rateLimiter.Stop() - endpointCh := make(chan []string, 10) + endpointCh := make(chan []aws.CNIEndpoint, 10) go func() { err := informer(ctx, endpointCh) if err != nil { @@ -685,7 +688,8 @@ func cniEventHandler(ctx context.Context, targetCNIcfg *aws.TargetCNIconfig, } }() - var cniTargetGroupARNs, endpoints []string + var cniTargetGroupARNs []aws.TargetGroupWithLabels + var endpoints []aws.CNIEndpoint for { select { case <-ctx.Done(): diff --git a/worker_test.go b/worker_test.go index 17022a44..7d17366a 100644 --- a/worker_test.go +++ b/worker_test.go @@ -1025,21 +1025,22 @@ func TestDoWorkPanicReturnsProblem(t *testing.T) { func Test_cniEventHandler(t *testing.T) { t.Run("handles messages from channels and calls update functions", func(t *testing.T) { - targetCNIcfg := &aws.TargetCNIconfig{TargetGroupCh: make(chan []string, 10)} - targetCNIcfg.TargetGroupCh <- []string{"bar", "baz"} - targetCNIcfg.TargetGroupCh <- []string{"foo"} // flush + targetCNIcfg := &aws.TargetCNIconfig{TargetGroupCh: make(chan []aws.TargetGroupWithLabels, 10)} + targetCNIcfg.TargetGroupCh <- []aws.TargetGroupWithLabels{{ARN: "bar"}, {ARN: "baz"}} + targetCNIcfg.TargetGroupCh <- []aws.TargetGroupWithLabels{{ARN: "foo"}} // flush mutex := &sync.Mutex{} - var targetSet, cniTGARNs []string - mockTargetSetter := func(endpoints, cniTargetGroupARNs []string) error { + var targetSet []aws.CNIEndpoint + var cniTGARNs []aws.TargetGroupWithLabels + mockTargetSetter := func(endpoints []aws.CNIEndpoint, cniTargetGroupARNs []aws.TargetGroupWithLabels) error { mutex.Lock() targetSet = endpoints cniTGARNs = cniTargetGroupARNs mutex.Unlock() return nil } - mockInformer := func(_ context.Context, c chan<- []string) error { - c <- []string{"4.3.2.1", "4.3.2.1"} - c <- []string{"1.2.3.4"} // flush + mockInformer := func(_ context.Context, c chan<- []aws.CNIEndpoint) error { + c <- []aws.CNIEndpoint{{IPAddress: "4.3.2.1"}, {IPAddress: "4.3.2.1"}} + c <- []aws.CNIEndpoint{{IPAddress: "1.2.3.4"}} // flush return nil } ctx, cl := context.WithCancel(context.Background()) @@ -1049,11 +1050,11 @@ func Test_cniEventHandler(t *testing.T) { require.Eventually(t, func() bool { mutex.Lock() defer mutex.Unlock() - return reflect.DeepEqual(targetSet, []string{"1.2.3.4"}) + return reflect.DeepEqual(targetSet, []aws.CNIEndpoint{{IPAddress: "1.2.3.4"}}) }, wait.ForeverTestTimeout, time.Millisecond*100) require.Eventually(t, func() bool { - return reflect.DeepEqual(cniTGARNs, []string{"foo"}) + return reflect.DeepEqual(cniTGARNs, []aws.TargetGroupWithLabels{{ARN: "foo"}}) }, wait.ForeverTestTimeout, time.Millisecond*100) }) }