Skip to content

Commit

Permalink
Drop Node events when EC2 instance does not exist and node is not new
Browse files Browse the repository at this point in the history
  • Loading branch information
cartermckinnon committed Nov 30, 2023
1 parent 08ac6f0 commit 02a76b9
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 7 deletions.
19 changes: 19 additions & 0 deletions pkg/controllers/tagging/tagging_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ const (

// The label for the total number of errors a work item encounters before it fails
errorsAfterRetriesExhaustedWorkItemErrorMetric = "errors_after_retries_exhausted"

// The period of time after Node creation to retry tagging due to eventual consistency of the CreateTags API.
newNodeEventualConsistencyGracePeriod = time.Minute * 5
)

// Controller is the controller implementation for tagging cluster resources.
Expand Down Expand Up @@ -292,6 +295,18 @@ func (tc *Controller) tagEc2Instance(node *v1.Node) error {
err := tc.cloud.TagResource(string(instanceID), tc.tags)

if err != nil {
if awsv1.IsAWSErrorInstanceNotFound(err) {
// This can happen for two reasons.
// 1. The CreateTags API is eventually consistent. In rare cases, a newly-created instance may not be taggable for a short period.
// We will re-queue the event and retry.
if isNodeWithinEventualConsistencyGracePeriod(node) {
return fmt.Errorf("EC2 instance %s for node %s does not exist, but node is within eventual consistency grace period", instanceID, node.GetName())
}
// 2. The event in our workQueue is stale, and the instance no longer exists.
// Tagging will never succeed, and the event should not be re-queued.
klog.Infof("Skip tagging since EC2 instance %s for node %s does not exist", instanceID, node.GetName())
return nil
}
klog.Errorf("Error in tagging EC2 instance %s for node %s, error: %v", instanceID, node.GetName(), err)
return err
}
Expand Down Expand Up @@ -380,3 +395,7 @@ func (tc *Controller) getChecksumOfTags() string {
sort.Strings(tags)
return fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(tags, ","))))
}

// isNodeWithinEventualConsistencyGracePeriod reports whether the time elapsed
// since the node's creation timestamp is still shorter than
// newNodeEventualConsistencyGracePeriod.
func isNodeWithinEventualConsistencyGracePeriod(node *v1.Node) bool {
	nodeAge := time.Since(node.CreationTimestamp.Time)
	return nodeAge < newNodeEventualConsistencyGracePeriod
}
37 changes: 33 additions & 4 deletions pkg/controllers/tagging/tagging_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ import (
"bytes"
"context"
"flag"
"os"
"strings"
"testing"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
Expand All @@ -25,10 +30,6 @@ import (
"k8s.io/client-go/util/workqueue"
awsv1 "k8s.io/cloud-provider-aws/pkg/providers/v1"
"k8s.io/klog/v2"
"os"
"strings"
"testing"
"time"
)

const TestClusterID = "clusterid.test"
Expand Down Expand Up @@ -162,6 +163,34 @@ func Test_NodesJoiningAndLeaving(t *testing.T) {
toBeTagged: false,
expectedMessages: []string{"Successfully untagged i-0001"},
},
{
name: "node0 is recently created and the instance is not found the first 3 CreateTags attempts",
currNode: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Now(),
},
Spec: v1.NodeSpec{
ProviderID: "i-not-found-count-3-0001",
},
},
toBeTagged: true,
expectedMessages: []string{"Successfully tagged i-not-found-count-3", "node is within eventual consistency grace period"},
},
{
name: "node0 is not recently created and the instance is not found",
currNode: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Spec: v1.NodeSpec{
ProviderID: "i-not-found",
},
},
toBeTagged: true,
expectedMessages: []string{"Skip tagging since EC2 instance i-not-found for node node0 does not exist"},
},
}

awsServices := awsv1.NewFakeAWSServices(TestClusterID)
Expand Down
5 changes: 3 additions & 2 deletions pkg/providers/v1/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -1707,7 +1707,7 @@ func (c *Cloud) InstanceExistsByProviderID(ctx context.Context, providerID strin
instances, err := c.ec2.DescribeInstances(request)
if err != nil {
// if err is InstanceNotFound, return false with no error
if isAWSErrorInstanceNotFound(err) {
if IsAWSErrorInstanceNotFound(err) {
return false, nil
}
return false, err
Expand Down Expand Up @@ -1946,7 +1946,8 @@ func (c *Cloud) GetZoneByNodeName(ctx context.Context, nodeName types.NodeName)

}

func isAWSErrorInstanceNotFound(err error) bool {
// IsAWSErrorInstanceNotFound returns true if the specified error is an awserr.Error with the code `InvalidInstanceID.NotFound`.
func IsAWSErrorInstanceNotFound(err error) bool {
if err == nil {
return false
}
Expand Down
26 changes: 26 additions & 0 deletions pkg/providers/v1/aws_fakes.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"sort"
"strconv"
"strings"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -47,6 +48,8 @@ type FakeAWSServices struct {
asg *FakeASG
metadata *FakeMetadata
kms *FakeKMS

callCounts map[string]int
}

// NewFakeAWSServices creates a new FakeAWSServices
Expand Down Expand Up @@ -79,6 +82,8 @@ func NewFakeAWSServices(clusterID string) *FakeAWSServices {
tag.Value = aws.String(clusterID)
selfInstance.Tags = []*ec2.Tag{&tag}

s.callCounts = make(map[string]int)

return s
}

Expand All @@ -97,6 +102,15 @@ func (s *FakeAWSServices) WithRegion(region string) *FakeAWSServices {
return s
}

// countCall records one more invocation for the given combination of service,
// api, and resourceID, logs the updated total, and returns it. The first call
// for a combination returns 1.
func (s *FakeAWSServices) countCall(service string, api string, resourceID string) int {
	// The three parts are joined with ':' to form the per-combination map key.
	callKey := service + ":" + api + ":" + resourceID
	total := s.callCounts[callKey] + 1
	s.callCounts[callKey] = total
	klog.Warningf("call count: %s:%d", callKey, total)
	return total
}

// Compute returns a fake EC2 client
func (s *FakeAWSServices) Compute(region string) (EC2, error) {
return s.ec2, nil
Expand Down Expand Up @@ -295,13 +309,25 @@ func (ec2i *FakeEC2Impl) DescribeAvailabilityZones(request *ec2.DescribeAvailabi
// CreateTags is a mock for CreateTags from EC2
func (ec2i *FakeEC2Impl) CreateTags(input *ec2.CreateTagsInput) (*ec2.CreateTagsOutput, error) {
for _, id := range input.Resources {
callCount := ec2i.aws.countCall("CreateTags", *id)

Check failure on line 312 in pkg/providers/v1/aws_fakes.go

View workflow job for this annotation

GitHub Actions / govulncheck

not enough arguments in call to ec2i.aws.countCall
if *id == "i-error" {
return nil, errors.New("Unable to tag")
}

if *id == "i-not-found" {
return nil, awserr.New("InvalidInstanceID.NotFound", "Instance not found", nil)
}
// return an Instance not found error for the first `n` calls
// instance ID should be of the format `i-not-found-count-$N-$SUFFIX`
if strings.HasPrefix(*id, "i-not-found-count-") {
notFoundCount, err := strconv.Atoi(strings.Split(*id, "-")[4])
if err != nil {
panic(err)
}
if callCount < notFoundCount {
return nil, awserr.New("InvalidInstanceID.NotFound", "Instance not found", nil)
}
}
}
return &ec2.CreateTagsOutput{}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/providers/v1/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (c *Cloud) UntagResource(resourceID string, tags map[string]string) error {
if err != nil {
// An instance not found should not fail the untagging workflow as it
// would for tagging, since the target state is already reached.
if isAWSErrorInstanceNotFound(err) {
if IsAWSErrorInstanceNotFound(err) {
klog.Infof("Couldn't find resource when trying to untag it hence skipping it, %v", err)
return nil
}
Expand Down

0 comments on commit 02a76b9

Please sign in to comment.