Skip to content

Commit

Permalink
feat: Support extra listeners on NLBs
Browse files Browse the repository at this point in the history
The `zalando.org/aws-nlb-extra-listeners` annotation accepts a JSON
string that describes a list of extra listen/target ports to add to an
NLB. These will be routed to pods matching a specific label in the same
namespace as the ingress. As such, this depends on the AWS CNI mode
feature.

Signed-off-by: Jeremy Huntwork <[email protected]>
  • Loading branch information
jhuntwork committed Oct 6, 2022
1 parent cc59821 commit 9ef95a9
Show file tree
Hide file tree
Showing 16 changed files with 618 additions and 121 deletions.
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ Overview of configuration which can be set via Ingress annotations.
|`zalando.org/aws-load-balancer-ssl-policy`|`string`|`ELBSecurityPolicy-2016-08`|
|`zalando.org/aws-load-balancer-type`| `nlb` \| `alb`|`alb`|
|`zalando.org/aws-load-balancer-http2`| `true` \| `false`|`true`|
|[`zalando.org/aws-nlb-extra-listeners`](#extra-listen-ports)|`string`|N/A|
|`zalando.org/aws-waf-web-acl-id` | `string` | N/A |
|`kubernetes.io/ingress.class`|`string`|N/A|

Expand Down Expand Up @@ -664,6 +665,29 @@ In *AWS CNI Mode* (`target-access-mode=AWSCNI`) the controller actively manages
| `AWSCNI` | `false` | `true` | PodIP != HostIP: limited scaling and host bound |
| `AWSCNI` | `false` | `false` | free scaling, pod VPC CNI IP used |

## Advanced Options for NLBs

### Extra Listen Ports

Some real world scenarios may require non-standard TCP or UDP ingresses. The `zalando.org/aws-nlb-extra-listeners`
annotation allows you to specify a list of additional listeners to add to your NLB. The value of the annotation should
be a valid JSON string of the following format.

```json
[
{
"protocol": "TCP",
"listenport": 22,
"targetport": 2222,
"podlabel": "application=ssh-service"
}
]
```

The `podlabel` value is used to register targets in the target group associated with the listener. This depends on the
AWS CNI Mode feature, where individual pods receive accessible IP addresses. The value is used to identify pods running
in the same namespace as the ingress that will receive traffic from the load balancer.

## Trying it out

The Ingress Controller's responsibility is limited to managing load balancers, as described above. To have a fully
Expand Down
113 changes: 85 additions & 28 deletions aws/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,19 @@ type Adapter struct {

type TargetCNIconfig struct {
Enabled bool
TargetGroupCh chan []string
TargetGroupCh chan []TargetGroupWithLabels
}

type TargetGroupWithLabels struct {
ARN string
PodNamespace string
PodLabel string
}
type CNIEndpoint struct {
IPAddress string
Namespace string
Podlabel string
}
type manifest struct {
securityGroup *securityGroupDetails
instance *instanceDetails
Expand Down Expand Up @@ -233,7 +243,7 @@ func NewAdapter(clusterID, newControllerID, vpcID string, debug, disableInstrume
customFilter: DefaultCustomFilter,
TargetCNI: &TargetCNIconfig{
Enabled: false,
TargetGroupCh: make(chan []string, 10),
TargetGroupCh: make(chan []TargetGroupWithLabels, 10),
},
}

Expand Down Expand Up @@ -590,8 +600,33 @@ func (a *Adapter) UpdateTargetGroupsAndAutoScalingGroups(stacks []*Stack, proble
a.TargetCNI.TargetGroupCh <- targetTypesARNs[elbv2.TargetTypeEnumIp]
}

// run through any target groups with ALB targets and register all ALBs
for _, tg := range targetTypesARNs[elbv2.TargetTypeEnumAlb] {
albARNs := make([]string, 0, len(stacks))
for _, stack := range stacks {
if stack.LoadBalancerType == LoadBalancerTypeApplication {
albARNs = append(albARNs, stack.loadbalancerARN)
}
}
registeredTargets, err := a.getRegisteredTargets(tg.ARN)
if err != nil {
problems.Add("failed to get existing targets: %w", err)
}
if err := a.registerAndDeregister(albARNs, registeredTargets, tg.ARN); err != nil {
problems.Add("failed to update target registration %w", err)
}
}

// remove the IP TGs from the list keeping all other TGs including problematic #127 and nonexistent #436
targetGroupARNs := difference(allTargetGroupARNs, targetTypesARNs[elbv2.TargetTypeEnumIp])
var targetGroupARNs []string
for targetType, tgList := range targetTypesARNs {
if targetType == elbv2.TargetTypeEnumIp || targetType == elbv2.TargetTypeEnumAlb {
continue
}
for _, tg := range tgList {
targetGroupARNs = append(targetGroupARNs, tg.ARN)
}
}

ownerTags := map[string]string{
clusterIDTagPrefix + a.ClusterID(): resourceLifecycleOwned,
Expand Down Expand Up @@ -637,7 +672,7 @@ func (a *Adapter) UpdateTargetGroupsAndAutoScalingGroups(stacks []*Stack, proble
// All the required resources (listeners and target group) are created in a
// transactional fashion.
// Failure to create the stack causes it to be deleted automatically.
func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, owner, sslPolicy, ipAddressType, wafWebACLID string, cwAlarms CloudWatchAlarmList, loadBalancerType string, http2 bool) (string, error) {
func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, owner, sslPolicy, ipAddressType, wafWebACLID string, cwAlarms CloudWatchAlarmList, loadBalancerType string, http2 bool, extraListeners []ExtraListener) (string, error) {
certARNs := make(map[string]time.Time, len(certificateARNs))
for _, arn := range certificateARNs {
certARNs[arn] = time.Time{}
Expand Down Expand Up @@ -688,6 +723,7 @@ func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, o
httpRedirectToHTTPS: a.httpRedirectToHTTPS,
nlbCrossZone: a.nlbCrossZone,
http2: http2,
extraListeners: extraListeners,
tags: a.stackTags,
internalDomains: a.internalDomains,
targetAccessModeCNI: a.TargetCNI.Enabled,
Expand All @@ -702,7 +738,7 @@ func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, o
return createStack(a.cloudformation, spec)
}

func (a *Adapter) UpdateStack(stackName string, certificateARNs map[string]time.Time, scheme, securityGroup, owner, sslPolicy, ipAddressType, wafWebACLID string, cwAlarms CloudWatchAlarmList, loadBalancerType string, http2 bool) (string, error) {
func (a *Adapter) UpdateStack(stackName string, certificateARNs map[string]time.Time, scheme, securityGroup, owner, sslPolicy, ipAddressType, wafWebACLID string, cwAlarms CloudWatchAlarmList, loadBalancerType string, http2 bool, extraListeners []ExtraListener) (string, error) {
if _, ok := SSLPolicies[sslPolicy]; !ok {
return "", fmt.Errorf("invalid SSLPolicy '%s' defined", sslPolicy)
}
Expand Down Expand Up @@ -744,6 +780,7 @@ func (a *Adapter) UpdateStack(stackName string, certificateARNs map[string]time.
httpRedirectToHTTPS: a.httpRedirectToHTTPS,
nlbCrossZone: a.nlbCrossZone,
http2: http2,
extraListeners: extraListeners,
tags: a.stackTags,
internalDomains: a.internalDomains,
targetAccessModeCNI: a.TargetCNI.Enabled,
Expand Down Expand Up @@ -1037,36 +1074,56 @@ func nonTargetedASGs(ownedASGs, targetedASGs map[string]*autoScalingGroupDetails
return nonTargetedASGs
}

func (a *Adapter) getRegisteredTargets(tgARN string) ([]string, error) {
tgh, err := a.elbv2.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{TargetGroupArn: &tgARN})
if err != nil {
log.Errorf("unable to describe target health %v", err)
return []string{}, err
}
registeredTargets := make([]string, len(tgh.TargetHealthDescriptions))
for i, target := range tgh.TargetHealthDescriptions {
registeredTargets[i] = *target.Target.Id
}
return registeredTargets, nil
}

func (a *Adapter) registerAndDeregister(new []string, old []string, tgARN string) error {
toRegister := difference(new, old)
if len(toRegister) > 0 {
log.Info("Registering CNI targets: ", toRegister)
err := registerTargetsOnTargetGroups(a.elbv2, []string{tgARN}, toRegister)
if err != nil {
return err
}
}
toDeregister := difference(old, new)
if len(toDeregister) > 0 {
log.Info("Deregistering CNI targets: ", toDeregister)
err := deregisterTargetsOnTargetGroups(a.elbv2, []string{tgARN}, toDeregister)
if err != nil {
return err
}
}
return nil
}

// SetTargetsOnCNITargetGroups implements desired state for CNI target groups
// by polling the current list of targets thus creating a diff of what needs to be added and removed.
func (a *Adapter) SetTargetsOnCNITargetGroups(endpoints, cniTargetGroupARNs []string) error {
log.Debugf("setting targets on CNI target groups: '%v'", cniTargetGroupARNs)
for _, targetGroupARN := range cniTargetGroupARNs {
tgh, err := a.elbv2.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{TargetGroupArn: &targetGroupARN})
func (a *Adapter) SetTargetsOnCNITargetGroups(endpoints []CNIEndpoint, cniTargetGroups []TargetGroupWithLabels) error {
log.Debugf("setting targets on CNI target groups: '%v'", cniTargetGroups)
for _, targetGroup := range cniTargetGroups {
registeredTargets, err := a.getRegisteredTargets(targetGroup.ARN)
if err != nil {
log.Errorf("unable to describe target health %v", err)
// continue for processing of the rest of the target groups
continue
}
registeredInstances := make([]string, len(tgh.TargetHealthDescriptions))
for i, target := range tgh.TargetHealthDescriptions {
registeredInstances[i] = *target.Target.Id
}
toRegister := difference(endpoints, registeredInstances)
if len(toRegister) > 0 {
log.Info("Registering CNI targets: ", toRegister)
err := registerTargetsOnTargetGroups(a.elbv2, []string{targetGroupARN}, toRegister)
if err != nil {
return err
var matchingEndpoints []string
for _, endpoint := range endpoints {
if endpoint.Podlabel == targetGroup.PodLabel && endpoint.Namespace == targetGroup.PodNamespace {
matchingEndpoints = append(matchingEndpoints, endpoint.IPAddress)
}
}
toDeregister := difference(registeredInstances, endpoints)
if len(toDeregister) > 0 {
log.Info("Deregistering CNI targets: ", toDeregister)
err := deregisterTargetsOnTargetGroups(a.elbv2, []string{targetGroupARN}, toDeregister)
if err != nil {
return err
}
if err := a.registerAndDeregister(matchingEndpoints, registeredTargets, targetGroup.ARN); err != nil {
return err
}
}
return nil
Expand Down
95 changes: 90 additions & 5 deletions aws/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -948,8 +948,91 @@ func TestWithxlbHealthyThresholdCount(t *testing.T) {
})
}

func Test_getRegisteredTargets(t *testing.T) {
t.Run("should return an error if unable to describe target health", func(t *testing.T) {
m := &mockElbv2Client{
outputs: elbv2MockOutputs{
describeTargetHealth: &apiResponse{
response: elbv2.DescribeTargetHealthOutput{TargetHealthDescriptions: []*elbv2.TargetHealthDescription{}},
err: fmt.Errorf("error")},
registerTargets: R(mockDTOutput(), nil),
deregisterTargets: R(mockDTOutput(), nil),
},
}
a := Adapter{elbv2: m}
_, err := a.getRegisteredTargets("none")
require.Error(t, err)
})
t.Run("should a slice of strings representing the ids of found targets", func(t *testing.T) {
thOut := elbv2.DescribeTargetHealthOutput{TargetHealthDescriptions: []*elbv2.TargetHealthDescription{
{Target: &elbv2.TargetDescription{Id: aws.String("asg1")}},
{Target: &elbv2.TargetDescription{Id: aws.String("asg2")}},
{Target: &elbv2.TargetDescription{Id: aws.String("blah")}},
}}
expected := []string{"asg1", "asg2", "blah"}
m := &mockElbv2Client{
outputs: elbv2MockOutputs{
describeTargetHealth: &apiResponse{response: &thOut, err: nil},
registerTargets: R(mockDTOutput(), nil),
deregisterTargets: R(mockDTOutput(), nil),
},
}
a := Adapter{elbv2: m}
response, err := a.getRegisteredTargets("none")
require.Nil(t, err)
require.Equal(t, expected, response)
})
}

func Test_registerAndDeregister(t *testing.T) {
t.Run("should return an error if unable to register new targets", func(t *testing.T) {
m := &mockElbv2Client{
outputs: elbv2MockOutputs{
registerTargets: R(mockDTOutput(), fmt.Errorf("this is an error")),
deregisterTargets: R(mockDTOutput(), nil),
},
}
a := Adapter{elbv2: m}
err := a.registerAndDeregister([]string{"new"}, []string{"old"}, "none")
require.Error(t, err)
})
t.Run("should return an error if unable to deregister new targets", func(t *testing.T) {
m := &mockElbv2Client{
outputs: elbv2MockOutputs{
registerTargets: R(mockDTOutput(), nil),
deregisterTargets: R(mockDTOutput(), fmt.Errorf("this is an error")),
},
}
a := Adapter{elbv2: m}
err := a.registerAndDeregister([]string{"new"}, []string{"old"}, "none")
require.Error(t, err)
})
t.Run("should return nil if there's nothing to register", func(t *testing.T) {
m := &mockElbv2Client{
outputs: elbv2MockOutputs{
registerTargets: R(mockDTOutput(), fmt.Errorf("this is an error")),
deregisterTargets: R(mockDTOutput(), fmt.Errorf("this is also an error")),
},
}
a := Adapter{elbv2: m}
err := a.registerAndDeregister([]string{"same"}, []string{"same"}, "none")
require.Nil(t, err)
})
t.Run("should return nil if there's no upstream errors", func(t *testing.T) {
m := &mockElbv2Client{
outputs: elbv2MockOutputs{
registerTargets: R(mockDTOutput(), nil),
deregisterTargets: R(mockDTOutput(), nil),
},
}
a := Adapter{elbv2: m}
err := a.registerAndDeregister([]string{"new"}, []string{"old"}, "none")
require.Nil(t, err)
})
}

func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) {
tgARNs := []string{"asg1"}
tgARNs := []TargetGroupWithLabels{{ARN: "asg1"}}
thOut := elbv2.DescribeTargetHealthOutput{TargetHealthDescriptions: []*elbv2.TargetHealthDescription{}}
m := &mockElbv2Client{
outputs: elbv2MockOutputs{
Expand All @@ -961,7 +1044,7 @@ func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) {
a := &Adapter{elbv2: m, TargetCNI: &TargetCNIconfig{}}

t.Run("adding a new endpoint", func(t *testing.T) {
require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1"}, tgARNs))
require.NoError(t, a.SetTargetsOnCNITargetGroups([]CNIEndpoint{{IPAddress: "1.1.1.1"}}, tgARNs))
require.Equal(t, []*elbv2.RegisterTargetsInput{{
TargetGroupArn: aws.String("asg1"),
Targets: []*elbv2.TargetDescription{{Id: aws.String("1.1.1.1")}},
Expand All @@ -975,7 +1058,8 @@ func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) {
}
m.rtinputs, m.dtinputs = nil, nil

require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1", "2.2.2.2", "3.3.3.3"}, tgARNs))
require.NoError(t, a.SetTargetsOnCNITargetGroups(
[]CNIEndpoint{{IPAddress: "1.1.1.1"}, {IPAddress: "2.2.2.2"}, {IPAddress: "3.3.3.3"}}, tgARNs))
require.Equal(t, []*elbv2.TargetDescription{
{Id: aws.String("2.2.2.2")},
{Id: aws.String("3.3.3.3")},
Expand All @@ -991,7 +1075,7 @@ func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) {
}}
m.rtinputs, m.dtinputs = nil, nil

require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1", "3.3.3.3"}, tgARNs))
require.NoError(t, a.SetTargetsOnCNITargetGroups([]CNIEndpoint{{IPAddress: "1.1.1.1"}, {IPAddress: "3.3.3.3"}}, tgARNs))
require.Equal(t, []*elbv2.RegisterTargetsInput(nil), m.rtinputs)
require.Equal(t, []*elbv2.TargetDescription{{Id: aws.String("2.2.2.2")}}, m.dtinputs[0].Targets)
})
Expand All @@ -1004,7 +1088,8 @@ func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) {
}}
m.rtinputs, m.dtinputs = nil, nil

require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1", "2.2.2.2", "3.3.3.3"}, tgARNs))
require.NoError(t, a.SetTargetsOnCNITargetGroups(
[]CNIEndpoint{{IPAddress: "1.1.1.1"}, {IPAddress: "2.2.2.2"}, {IPAddress: "3.3.3.3"}}, tgARNs))
require.Equal(t, []*elbv2.TargetDescription{{Id: aws.String("3.3.3.3")}}, m.rtinputs[0].Targets)
require.Equal(t, []*elbv2.TargetDescription{{Id: aws.String("4.4.4.4")}}, m.dtinputs[0].Targets)
})
Expand Down
29 changes: 26 additions & 3 deletions aws/asg.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,16 +258,39 @@ func describeTargetGroups(elbv2svc elbv2iface.ELBV2API) (map[string]struct{}, er
}

// map the target group slice into specific types such as instance, ip, etc
func categorizeTargetTypeInstance(elbv2svc elbv2iface.ELBV2API, allTGARNs []string) (map[string][]string, error) {
targetTypes := make(map[string][]string)
func categorizeTargetTypeInstance(elbv2svc elbv2iface.ELBV2API, allTGARNs []string) (map[string][]TargetGroupWithLabels, error) {
targetTypes := make(map[string][]TargetGroupWithLabels)
err := elbv2svc.DescribeTargetGroupsPagesWithContext(context.TODO(), &elbv2.DescribeTargetGroupsInput{},
func(resp *elbv2.DescribeTargetGroupsOutput, lastPage bool) bool {
for _, tg := range resp.TargetGroups {
for _, v := range allTGARNs {
if v != aws.StringValue(tg.TargetGroupArn) {
continue
}
targetTypes[aws.StringValue(tg.TargetType)] = append(targetTypes[aws.StringValue(tg.TargetType)], aws.StringValue(tg.TargetGroupArn))
var podlabel, podnamespace string
log.Debugf("Looking for tags on %s", aws.StringValue(tg.TargetGroupArn))
out, err := elbv2svc.DescribeTags(&elbv2.DescribeTagsInput{ResourceArns: []*string{tg.TargetGroupArn}})
if err != nil {
log.Errorf("cannot describe tags on target group: %v", err)
} else {
for _, desc := range out.TagDescriptions {
for _, tag := range desc.Tags {
switch aws.StringValue(tag.Key) {
case podLabelTag:
podlabel = aws.StringValue(tag.Value)
case podNamespaceTag:
podnamespace = aws.StringValue(tag.Value)
}
}
}
}
log.Debugf("Adding tg with label: '%s' in namespace: '%s'", podlabel, podnamespace)
targetTypes[aws.StringValue(tg.TargetType)] = append(
targetTypes[aws.StringValue(tg.TargetType)],
TargetGroupWithLabels{
ARN: aws.StringValue(tg.TargetGroupArn),
PodLabel: podlabel,
PodNamespace: podnamespace})
}
}
return true
Expand Down
Loading

0 comments on commit 9ef95a9

Please sign in to comment.