From ed1453bf84f7e0db877f0dcc577e49c84c4c0b58 Mon Sep 17 00:00:00 2001 From: Cameron McAvoy Date: Fri, 1 Nov 2024 14:55:17 -0500 Subject: [PATCH] Expose nodeclaim disruption candidates through nodeclaim condition, add eviction message from condition Signed-off-by: Cameron McAvoy --- pkg/apis/v1/nodeclaim_status.go | 1 + pkg/controllers/disruption/controller.go | 58 +++++++++++++------ .../disruption/orchestration/queue.go | 1 + pkg/controllers/disruption/suite_test.go | 19 ++++++ .../node/termination/suite_test.go | 14 ++--- .../termination/terminator/events/events.go | 4 +- .../node/termination/terminator/eviction.go | 34 ++++++++--- .../node/termination/terminator/suite_test.go | 14 +++-- .../node/termination/terminator/terminator.go | 2 +- pkg/controllers/state/statenode.go | 33 +++++++++-- pkg/events/suite_test.go | 18 +++--- 11 files changed, 145 insertions(+), 53 deletions(-) diff --git a/pkg/apis/v1/nodeclaim_status.go b/pkg/apis/v1/nodeclaim_status.go index aca25a4e77..8970e50486 100644 --- a/pkg/apis/v1/nodeclaim_status.go +++ b/pkg/apis/v1/nodeclaim_status.go @@ -30,6 +30,7 @@ const ( ConditionTypeDrifted = "Drifted" ConditionTypeInstanceTerminating = "InstanceTerminating" ConditionTypeConsistentStateFound = "ConsistentStateFound" + ConditionTypeDisruptionReason = "DisruptionReason" ) // NodeClaimStatus defines the observed state of NodeClaim diff --git a/pkg/controllers/disruption/controller.go b/pkg/controllers/disruption/controller.go index ff34302f0f..76844d770d 100644 --- a/pkg/controllers/disruption/controller.go +++ b/pkg/controllers/disruption/controller.go @@ -27,6 +27,7 @@ import ( "github.com/awslabs/operatorpkg/singleton" "github.com/samber/lo" + "go.uber.org/multierr" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/utils/clock" @@ -36,9 +37,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" - nodepoolutils "sigs.k8s.io/karpenter/pkg/utils/nodepool" - "sigs.k8s.io/karpenter/pkg/utils/pretty" - v1 "sigs.k8s.io/karpenter/pkg/apis/v1" "sigs.k8s.io/karpenter/pkg/cloudprovider" "sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration" @@ -49,6 +47,8 @@ import ( "sigs.k8s.io/karpenter/pkg/metrics" "sigs.k8s.io/karpenter/pkg/operator/injection" operatorlogging "sigs.k8s.io/karpenter/pkg/operator/logging" + nodepoolutils "sigs.k8s.io/karpenter/pkg/utils/nodepool" + "sigs.k8s.io/karpenter/pkg/utils/pretty" ) type Controller struct { @@ -124,14 +124,21 @@ func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) { // Karpenter taints nodes with a karpenter.sh/disruption taint as part of the disruption process while it progresses in memory. // If Karpenter restarts or fails with an error during a disruption action, some nodes can be left tainted. // Idempotently remove this taint from candidates that are not in the orchestration queue before continuing. 
- if err := state.RequireNoScheduleTaint(ctx, c.kubeClient, false, lo.Filter(c.cluster.Nodes(), func(s *state.StateNode, _ int) bool { - return !c.queue.HasAny(s.ProviderID()) - })...); err != nil { + outdatedNodes := lo.Filter(c.cluster.Nodes(), func(s *state.StateNode, _ int) bool { + return !c.queue.HasAny(s.ProviderID()) && !s.Deleted() + }) + if err := state.RequireNoScheduleTaint(ctx, c.kubeClient, false, outdatedNodes...); err != nil { if errors.IsConflict(err) { return reconcile.Result{Requeue: true}, nil } return reconcile.Result{}, fmt.Errorf("removing taint %s from nodes, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err) } + if err := state.ClearNodeClaimsCondition(ctx, c.kubeClient, v1.ConditionTypeDisruptionReason, outdatedNodes...); err != nil { + if errors.IsConflict(err) { + return reconcile.Result{Requeue: true}, nil + } + return reconcile.Result{}, fmt.Errorf("removing %s condition from nodeclaims, %w", v1.ConditionTypeDisruptionReason, err) + } // Attempt different disruption methods. We'll only let one method perform an action for _, m := range c.methods { @@ -197,12 +204,9 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command, commandID := uuid.NewUUID() log.FromContext(ctx).WithValues("command-id", commandID, "reason", strings.ToLower(string(m.Reason()))).Info(fmt.Sprintf("disrupting nodeclaim(s) via %s", cmd)) - stateNodes := lo.Map(cmd.candidates, func(c *Candidate, _ int) *state.StateNode { - return c.StateNode - }) // Cordon the old nodes before we launch the replacements to prevent new pods from scheduling to the old nodes - if err := state.RequireNoScheduleTaint(ctx, c.kubeClient, true, stateNodes...); err != nil { - return fmt.Errorf("tainting nodes with %s (command-id: %s), %w", pretty.Taint(v1.DisruptedNoScheduleTaint), commandID, err) + if err := c.MarkDisrupted(ctx, m, cmd.candidates...); err != nil { + return fmt.Errorf("marking disrupted (command-id: %s), %w", commandID, err) } var nodeClaimNames []string @@ -226,12 +230,9 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command, // the node is cleaned up. schedulingResults.Record(log.IntoContext(ctx, operatorlogging.NopLogger), c.recorder, c.cluster) - providerIDs := lo.Map(cmd.candidates, func(c *Candidate, _ int) string { return c.ProviderID() }) - // We have the new NodeClaims created at the API server so mark the old NodeClaims for deletion - c.cluster.MarkForDeletion(providerIDs...) - - if err = c.queue.Add(orchestration.NewCommand(nodeClaimNames, - lo.Map(cmd.candidates, func(c *Candidate, _ int) *state.StateNode { return c.StateNode }), commandID, m.Reason(), m.ConsolidationType())); err != nil { + statenodes := lo.Map(cmd.candidates, func(c *Candidate, _ int) *state.StateNode { return c.StateNode }) + if err := c.queue.Add(orchestration.NewCommand(nodeClaimNames, statenodes, commandID, m.Reason(), m.ConsolidationType())); err != nil { + providerIDs := lo.Map(cmd.candidates, func(c *Candidate, _ int) string { return c.ProviderID() }) c.cluster.UnmarkForDeletion(providerIDs...) 
return fmt.Errorf("adding command to queue (command-id: %s), %w", commandID, err) } @@ -258,6 +259,29 @@ func (c *Controller) createReplacementNodeClaims(ctx context.Context, m Method, return nodeClaimNames, nil } +func (c *Controller) MarkDisrupted(ctx context.Context, m Method, candidates ...*Candidate) error { + stateNodes := lo.Map(candidates, func(c *Candidate, _ int) *state.StateNode { + return c.StateNode + }) + if err := state.RequireNoScheduleTaint(ctx, c.kubeClient, true, stateNodes...); err != nil { + return fmt.Errorf("tainting nodes with %s: %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err) + } + + providerIDs := lo.Map(candidates, func(c *Candidate, _ int) string { return c.ProviderID() }) + c.cluster.MarkForDeletion(providerIDs...) + + return multierr.Combine(lo.Map(candidates, func(candidate *Candidate, _ int) error { + // refresh nodeclaim before updating status + nodeClaim := &v1.NodeClaim{} + + if err := c.kubeClient.Get(ctx, client.ObjectKeyFromObject(candidate.NodeClaim), nodeClaim); err != nil { + return client.IgnoreNotFound(err) + } + nodeClaim.StatusConditions().SetTrueWithReason(v1.ConditionTypeDisruptionReason, v1.ConditionTypeDisruptionReason, string(m.Reason())) + return client.IgnoreNotFound(c.kubeClient.Status().Update(ctx, nodeClaim)) + })...) +} + func (c *Controller) recordRun(s string) { c.mu.Lock() defer c.mu.Unlock() diff --git a/pkg/controllers/disruption/orchestration/queue.go b/pkg/controllers/disruption/orchestration/queue.go index 1cf800c143..31a4ed6639 100644 --- a/pkg/controllers/disruption/orchestration/queue.go +++ b/pkg/controllers/disruption/orchestration/queue.go @@ -200,6 +200,7 @@ func (q *Queue) Reconcile(ctx context.Context) (reconcile.Result, error) { consolidationTypeLabel: cmd.consolidationType, }) multiErr := multierr.Combine(err, cmd.lastError, state.RequireNoScheduleTaint(ctx, q.kubeClient, false, cmd.candidates...)) + multiErr = multierr.Combine(multiErr, state.ClearNodeClaimsCondition(ctx, q.kubeClient, v1.ConditionTypeDisruptionReason, cmd.candidates...)) // Log the error log.FromContext(ctx).WithValues("nodes", strings.Join(lo.Map(cmd.candidates, func(s *state.StateNode, _ int) string { return s.Name() diff --git a/pkg/controllers/disruption/suite_test.go b/pkg/controllers/disruption/suite_test.go index 8deea114de..5afbf1202a 100644 --- a/pkg/controllers/disruption/suite_test.go +++ b/pkg/controllers/disruption/suite_test.go @@ -534,6 +534,7 @@ var _ = Describe("Disruption Taints", func() { }) nodePool.Spec.Disruption.ConsolidateAfter = v1.MustParseNillableDuration("Never") node.Spec.Taints = append(node.Spec.Taints, v1.DisruptedNoScheduleTaint) + nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDisruptionReason) ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node, pod) ExpectManualBinding(ctx, env.Client, pod, node) @@ -542,6 +543,12 @@ var _ = Describe("Disruption Taints", func() { ExpectSingletonReconciled(ctx, disruptionController) node = ExpectNodeExists(ctx, env.Client, node.Name) Expect(node.Spec.Taints).ToNot(ContainElement(v1.DisruptedNoScheduleTaint)) + + nodeClaims := lo.Filter(ExpectNodeClaims(ctx, env.Client), func(nc *v1.NodeClaim, _ int) bool { + return nc.Status.ProviderID == node.Spec.ProviderID + }) + Expect(nodeClaims).To(HaveLen(1)) + Expect(nodeClaims[0].StatusConditions().Get(v1.ConditionTypeDisruptionReason)).To(BeNil()) }) It("should add and remove taints from NodeClaims that fail to disrupt", func() { nodePool.Spec.Disruption.ConsolidationPolicy = 
v1.ConsolidationPolicyWhenEmptyOrUnderutilized @@ -579,6 +586,12 @@ var _ = Describe("Disruption Taints", func() { node = ExpectNodeExists(ctx, env.Client, node.Name) Expect(node.Spec.Taints).To(ContainElement(v1.DisruptedNoScheduleTaint)) + nodeClaims := lo.Filter(ExpectNodeClaims(ctx, env.Client), func(nc *v1.NodeClaim, _ int) bool { + return nc.Status.ProviderID == node.Spec.ProviderID + }) + Expect(nodeClaims).To(HaveLen(1)) + Expect(nodeClaims[0].StatusConditions().Get(v1.ConditionTypeDisruptionReason)).ToNot(BeNil()) + Expect(nodeClaims[0].StatusConditions().Get(v1.ConditionTypeDisruptionReason).IsTrue()).To(BeTrue()) createdNodeClaim := lo.Reject(ExpectNodeClaims(ctx, env.Client), func(nc *v1.NodeClaim, _ int) bool { return nc.Name == nodeClaim.Name @@ -595,6 +608,12 @@ var _ = Describe("Disruption Taints", func() { node = ExpectNodeExists(ctx, env.Client, node.Name) Expect(node.Spec.Taints).ToNot(ContainElement(v1.DisruptedNoScheduleTaint)) + + nodeClaims = lo.Filter(ExpectNodeClaims(ctx, env.Client), func(nc *v1.NodeClaim, _ int) bool { + return nc.Status.ProviderID == node.Spec.ProviderID + }) + Expect(nodeClaims).To(HaveLen(1)) + Expect(nodeClaims[0].StatusConditions().Get(v1.ConditionTypeDisruptionReason)).To(BeNil()) }) }) diff --git a/pkg/controllers/node/termination/suite_test.go b/pkg/controllers/node/termination/suite_test.go index 5758579227..2382627ebf 100644 --- a/pkg/controllers/node/termination/suite_test.go +++ b/pkg/controllers/node/termination/suite_test.go @@ -203,7 +203,7 @@ var _ = Describe("Termination", func() { Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(ctx, env.Client, node.Name) ExpectObjectReconciled(ctx, env.Client, terminationController, node) - Expect(queue.Has(podSkip)).To(BeFalse()) + Expect(queue.Has(node, podSkip)).To(BeFalse()) ExpectSingletonReconciled(ctx, queue) // Expect node to exist and be draining @@ -233,7 +233,7 @@ var _ = Describe("Termination", func() { Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(ctx, env.Client, node.Name) ExpectObjectReconciled(ctx, env.Client, terminationController, node) - Expect(queue.Has(podSkip)).To(BeFalse()) + Expect(queue.Has(node, podSkip)).To(BeFalse()) ExpectSingletonReconciled(ctx, queue) // Expect node to exist and be draining @@ -243,7 +243,7 @@ var _ = Describe("Termination", func() { EventuallyExpectTerminating(ctx, env.Client, podEvict) ExpectDeleted(ctx, env.Client, podEvict) - Expect(queue.Has(podSkip)).To(BeFalse()) + Expect(queue.Has(node, podSkip)).To(BeFalse()) // Reconcile to delete node node = ExpectNodeExists(ctx, env.Client, node.Name) @@ -357,13 +357,13 @@ var _ = Describe("Termination", func() { ExpectNodeWithNodeClaimDraining(env.Client, node.Name) // Expect podNoEvict to be added to the queue - Expect(queue.Has(podNoEvict)).To(BeTrue()) + Expect(queue.Has(node, podNoEvict)).To(BeTrue()) // Attempt to evict the pod, but fail to do so ExpectSingletonReconciled(ctx, queue) // Expect podNoEvict to fail eviction due to PDB, and be retried - Expect(queue.Has(podNoEvict)).To(BeTrue()) + Expect(queue.Has(node, podNoEvict)).To(BeTrue()) // Delete pod to simulate successful eviction ExpectDeleted(ctx, env.Client, podNoEvict) @@ -507,7 +507,7 @@ var _ = Describe("Termination", func() { ExpectSingletonReconciled(ctx, queue) // Expect mirror pod to not be queued for eviction - Expect(queue.Has(podNoEvict)).To(BeFalse()) + Expect(queue.Has(node, podNoEvict)).To(BeFalse()) // Expect podEvict to be enqueued for eviction then be successful 
EventuallyExpectTerminating(ctx, env.Client, podEvict) @@ -681,7 +681,7 @@ var _ = Describe("Termination", func() { ExpectObjectReconciled(ctx, env.Client, terminationController, node) // Expect that the old pod's key still exists in the queue - Expect(queue.Has(pod)).To(BeTrue()) + Expect(queue.Has(node, pod)).To(BeTrue()) // Re-create the pod and node, it should now have the same name, but a different UUID node = test.Node(test.NodeOptions{ diff --git a/pkg/controllers/node/termination/terminator/events/events.go b/pkg/controllers/node/termination/terminator/events/events.go index d626173d56..0e790978f3 100644 --- a/pkg/controllers/node/termination/terminator/events/events.go +++ b/pkg/controllers/node/termination/terminator/events/events.go @@ -26,12 +26,12 @@ import ( "sigs.k8s.io/karpenter/pkg/events" ) -func EvictPod(pod *corev1.Pod) events.Event { +func EvictPod(pod *corev1.Pod, message string) events.Event { return events.Event{ InvolvedObject: pod, Type: corev1.EventTypeNormal, Reason: "Evicted", - Message: "Evicted pod", + Message: "Evicted pod: " + message, DedupeValues: []string{pod.Name}, } } diff --git a/pkg/controllers/node/termination/terminator/eviction.go b/pkg/controllers/node/termination/terminator/eviction.go index 4401837067..e6cf60971a 100644 --- a/pkg/controllers/node/termination/terminator/eviction.go +++ b/pkg/controllers/node/termination/terminator/eviction.go @@ -40,9 +40,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" + v1 "sigs.k8s.io/karpenter/pkg/apis/v1" terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events" "sigs.k8s.io/karpenter/pkg/events" "sigs.k8s.io/karpenter/pkg/operator/injection" + "sigs.k8s.io/karpenter/pkg/utils/node" ) const ( @@ -68,13 +70,15 @@ func IsNodeDrainError(err error) bool { type QueueKey struct { types.NamespacedName - UID types.UID + UID types.UID + providerID string } -func NewQueueKey(pod *corev1.Pod) QueueKey { +func NewQueueKey(pod *corev1.Pod, providerID string) QueueKey { return QueueKey{ NamespacedName: client.ObjectKeyFromObject(pod), UID: pod.UID, + providerID: providerID, } } @@ -118,12 +122,12 @@ func (q *Queue) Register(_ context.Context, m manager.Manager) error { } // Add adds pods to the Queue -func (q *Queue) Add(pods ...*corev1.Pod) { +func (q *Queue) Add(node *corev1.Node, pods ...*corev1.Pod) { q.mu.Lock() defer q.mu.Unlock() for _, pod := range pods { - qk := NewQueueKey(pod) + qk := NewQueueKey(pod, node.Spec.ProviderID) if !q.set.Has(qk) { q.set.Insert(qk) q.TypedRateLimitingInterface.Add(qk) @@ -131,11 +135,11 @@ func (q *Queue) Add(pods ...*corev1.Pod) { } } -func (q *Queue) Has(pod *corev1.Pod) bool { +func (q *Queue) Has(node *corev1.Node, pod *corev1.Pod) bool { q.mu.Lock() defer q.mu.Unlock() - return q.set.Has(NewQueueKey(pod)) + return q.set.Has(NewQueueKey(pod, node.Spec.ProviderID)) } func (q *Queue) Reconcile(ctx context.Context) (reconcile.Result, error) { @@ -171,6 +175,10 @@ func (q *Queue) Reconcile(ctx context.Context) (reconcile.Result, error) { // Evict returns true if successful eviction call, and false if there was an eviction-related error func (q *Queue) Evict(ctx context.Context, key QueueKey) bool { ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Pod", klog.KRef(key.Namespace, key.Name))) + evictionMessage, err := evictionReason(ctx, key, q.kubeClient) + if err != nil { + log.FromContext(ctx).V(1).Error(err, "failed looking up pod eviction reason") + } if err := 
q.kubeClient.SubResource("eviction").Create(ctx, &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: key.Namespace, Name: key.Name}}, &policyv1.Eviction{ @@ -205,6 +213,18 @@ func (q *Queue) Evict(ctx context.Context, key QueueKey) bool { return false } NodesEvictionRequestsTotal.Inc(map[string]string{CodeLabel: "200"}) - q.recorder.Publish(terminatorevents.EvictPod(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}})) + q.recorder.Publish(terminatorevents.EvictPod(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}}, evictionMessage)) return true } + +func evictionReason(ctx context.Context, key QueueKey, kubeClient client.Client) (string, error) { + nodeClaim, err := node.NodeClaimForNode(ctx, kubeClient, &corev1.Node{Spec: corev1.NodeSpec{ProviderID: key.providerID}}) + if err != nil { + return "", err + } + terminationCondition := nodeClaim.StatusConditions().Get(v1.ConditionTypeDisruptionReason) + if terminationCondition.IsTrue() { + return terminationCondition.Message, nil + } + return "Forceful Termination", nil +} diff --git a/pkg/controllers/node/termination/terminator/suite_test.go b/pkg/controllers/node/termination/terminator/suite_test.go index 2b0e3a72d4..39bd3ce16b 100644 --- a/pkg/controllers/node/termination/terminator/suite_test.go +++ b/pkg/controllers/node/termination/terminator/suite_test.go @@ -48,6 +48,7 @@ var recorder *test.EventRecorder var queue *terminator.Queue var pdb *policyv1.PodDisruptionBudget var pod *corev1.Pod +var node *corev1.Node var fakeClock *clock.FakeClock var terminatorInstance *terminator.Terminator @@ -92,12 +93,13 @@ var _ = Describe("Eviction/Queue", func() { Labels: testLabels, }, }) + node = test.Node(test.NodeOptions{ProviderID: "123456789"}) terminator.NodesEvictionRequestsTotal.Reset() }) Context("Eviction API", func() { It("should succeed with no event when the pod is not found", func() { - Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeTrue()) + Expect(queue.Evict(ctx, terminator.NewQueueKey(pod, node.Spec.ProviderID))).To(BeTrue()) Expect(recorder.Events()).To(HaveLen(0)) }) It("should succeed with no event when the pod UID conflicts", func() { @@ -108,7 +110,7 @@ var _ = Describe("Eviction/Queue", func() { }) It("should succeed with an evicted event when there are no PDBs", func() { ExpectApplied(ctx, env.Client, pod) - Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeTrue()) + Expect(queue.Evict(ctx, terminator.NewQueueKey(pod, node.Spec.ProviderID))).To(BeTrue()) ExpectMetricCounterValue(terminator.NodesEvictionRequestsTotal, 1, map[string]string{terminator.CodeLabel: "200"}) Expect(recorder.Calls("Evicted")).To(Equal(1)) }) @@ -118,12 +120,12 @@ var _ = Describe("Eviction/Queue", func() { MaxUnavailable: &intstr.IntOrString{IntVal: 1}, }) ExpectApplied(ctx, env.Client, pod) - Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeTrue()) + Expect(queue.Evict(ctx, terminator.NewQueueKey(pod, node.Spec.ProviderID))).To(BeTrue()) Expect(recorder.Calls("Evicted")).To(Equal(1)) }) It("should return a NodeDrainError event when a PDB is blocking", func() { ExpectApplied(ctx, env.Client, pdb, pod) - Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeFalse()) + Expect(queue.Evict(ctx, terminator.NewQueueKey(pod, node.Spec.ProviderID))).To(BeFalse()) Expect(recorder.Calls("FailedDraining")).To(Equal(1)) }) It("should fail when two PDBs refer to the same pod", func() { @@ -132,7 +134,7 @@ var _ = Describe("Eviction/Queue", func() { 
MaxUnavailable: &intstr.IntOrString{IntVal: 0}, }) ExpectApplied(ctx, env.Client, pdb, pdb2, pod) - Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeFalse()) + Expect(queue.Evict(ctx, terminator.NewQueueKey(pod, node.Spec.ProviderID))).To(BeFalse()) ExpectMetricCounterValue(terminator.NodesEvictionRequestsTotal, 1, map[string]string{terminator.CodeLabel: "500"}) }) It("should ensure that calling Evict() is valid while making Add() calls", func() { @@ -158,7 +160,7 @@ var _ = Describe("Eviction/Queue", func() { // Ensure that we add enough pods to the queue while we are pulling items off of the queue (enough to trigger a DATA RACE) for i := 0; i < 10000; i++ { - queue.Add(test.Pod()) + queue.Add(node, test.Pod()) } }) }) diff --git a/pkg/controllers/node/termination/terminator/terminator.go b/pkg/controllers/node/termination/terminator/terminator.go index 82940aa8ef..bedab8a6d2 100644 --- a/pkg/controllers/node/termination/terminator/terminator.go +++ b/pkg/controllers/node/termination/terminator/terminator.go @@ -109,7 +109,7 @@ func (t *Terminator) Drain(ctx context.Context, node *corev1.Node, nodeGracePeri for _, group := range podGroups { if len(group) > 0 { // Only add pods to the eviction queue that haven't been evicted yet - t.evictionQueue.Add(lo.Filter(group, func(p *corev1.Pod, _ int) bool { return podutil.IsEvictable(p) })...) + t.evictionQueue.Add(node, lo.Filter(group, func(p *corev1.Pod, _ int) bool { return podutil.IsEvictable(p) })...) return NewNodeDrainError(fmt.Errorf("%d pods are waiting to be evicted", lo.SumBy(podGroups, func(pods []*corev1.Pod) int { return len(pods) }))) } } diff --git a/pkg/controllers/state/statenode.go b/pkg/controllers/state/statenode.go index 08c8b51694..018081a6f6 100644 --- a/pkg/controllers/state/statenode.go +++ b/pkg/controllers/state/statenode.go @@ -18,7 +18,7 @@ package state import ( "context" - "errors" + stderrors "errors" "fmt" "time" @@ -52,7 +52,7 @@ func IsPodBlockEvictionError(err error) bool { return false } var podBlockEvictionError *PodBlockEvictionError - return errors.As(err, &podBlockEvictionError) + return stderrors.As(err, &podBlockEvictionError) } //go:generate controller-gen object:headerFile="../../../hack/boilerplate.go.txt" paths="." @@ -391,8 +391,11 @@ func (in *StateNode) MarkedForDeletion() bool { // 1. The Node has MarkedForDeletion set // 2. The Node has a NodeClaim counterpart and is actively deleting (or the nodeclaim is marked as terminating) // 3. 
The Node has no NodeClaim counterpart and is actively deleting - return in.markedForDeletion || - (in.NodeClaim != nil && (!in.NodeClaim.DeletionTimestamp.IsZero() || in.NodeClaim.StatusConditions().Get(v1.ConditionTypeInstanceTerminating).IsTrue())) || + return in.markedForDeletion || in.Deleted() +} + +func (in *StateNode) Deleted() bool { + return (in.NodeClaim != nil && (!in.NodeClaim.DeletionTimestamp.IsZero() || in.NodeClaim.StatusConditions().Get(v1.ConditionTypeInstanceTerminating).IsTrue())) || (in.Node != nil && in.NodeClaim == nil && !in.Node.DeletionTimestamp.IsZero()) } @@ -495,3 +498,25 @@ func RequireNoScheduleTaint(ctx context.Context, kubeClient client.Client, addTa } return multiErr } + +// ClearNodeClaimsCondition will remove the conditionType from the NodeClaim status of the provided statenodes +func ClearNodeClaimsCondition(ctx context.Context, kubeClient client.Client, conditionType string, nodes ...*StateNode) error { + return multierr.Combine(lo.Map(nodes, func(s *StateNode, _ int) error { + if !s.Initialized() || s.NodeClaim == nil { + return nil + } + nodeClaim := &v1.NodeClaim{} + if err := kubeClient.Get(ctx, client.ObjectKeyFromObject(s.NodeClaim), nodeClaim); err != nil { + return client.IgnoreNotFound(err) + } + stored := nodeClaim.DeepCopy() + _ = nodeClaim.StatusConditions().Clear(conditionType) + + if !equality.Semantic.DeepEqual(stored, nodeClaim) { + if err := kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil { + return client.IgnoreNotFound(err) + } + } + return nil + })...) +} diff --git a/pkg/events/suite_test.go b/pkg/events/suite_test.go index fec8d1462a..076862a690 100644 --- a/pkg/events/suite_test.go +++ b/pkg/events/suite_test.go @@ -88,8 +88,8 @@ var _ = Describe("Event Creation", func() { Expect(internalRecorder.Calls(schedulingevents.NominatePodEvent(PodWithUID(), NodeWithUID(), NodeClaimWithUID()).Reason)).To(Equal(1)) }) It("should create a EvictPod event", func() { - eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID())) - Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(1)) + eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID(), "")) + Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(1)) }) It("should create a PodFailedToSchedule event", func() { eventRecorder.Publish(schedulingevents.PodFailedToScheduleEvent(PodWithUID(), fmt.Errorf(""))) @@ -105,31 +105,31 @@ var _ = Describe("Dedupe", func() { It("should only create a single event when many events are created quickly", func() { pod := PodWithUID() for i := 0; i < 100; i++ { - eventRecorder.Publish(terminatorevents.EvictPod(pod)) + eventRecorder.Publish(terminatorevents.EvictPod(pod, "")) } - Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(1)) + Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(1)) }) It("should allow the dedupe timeout to be overridden", func() { pod := PodWithUID() - evt := terminatorevents.EvictPod(pod) + evt := terminatorevents.EvictPod(pod, "") evt.DedupeTimeout = time.Second * 2 // Generate a set of events within the dedupe timeout for i := 0; i < 10; i++ { eventRecorder.Publish(evt) } - Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(1)) + Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(1)) // Wait until after the overridden 
dedupe timeout time.Sleep(time.Second * 3) eventRecorder.Publish(evt) - Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(2)) + Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(2)) }) It("should allow events with different entities to be created", func() { for i := 0; i < 100; i++ { - eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID())) + eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID(), "")) } - Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(100)) + Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(100)) }) })
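
Note (illustrative, not part of the patch): the sketch below shows how a consumer might read the DisruptionReason condition that MarkDisrupted now sets on candidate NodeClaims. It assumes a controller-runtime client and the condition accessors used in this patch; inspectDisruption is a hypothetical helper, not something the patch adds.

package example

import (
	"context"
	"fmt"

	"sigs.k8s.io/controller-runtime/pkg/client"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// inspectDisruption fetches a NodeClaim and reports whether it is currently a
// disruption candidate, using the DisruptionReason condition introduced above.
func inspectDisruption(ctx context.Context, kubeClient client.Client, name string) error {
	nodeClaim := &v1.NodeClaim{}
	if err := kubeClient.Get(ctx, client.ObjectKey{Name: name}, nodeClaim); err != nil {
		return err
	}
	cond := nodeClaim.StatusConditions().Get(v1.ConditionTypeDisruptionReason)
	if cond.IsTrue() {
		// MarkDisrupted stores the disruption reason as the condition message,
		// which eviction.go later reuses as the pod eviction event message.
		fmt.Printf("nodeclaim %s is a disruption candidate: %s\n", nodeClaim.Name, cond.Message)
		return nil
	}
	fmt.Printf("nodeclaim %s is not currently marked for disruption\n", nodeClaim.Name)
	return nil
}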
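
Note (illustrative, not part of the patch): a minimal sketch of the message selection performed by evictionReason in eviction.go, written here against a NodeClaim that has already been fetched. evictionMessageFor is a hypothetical helper shown only to summarize the graceful-vs-forceful fallback behavior this patch introduces.

package example

import (
	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// evictionMessageFor mirrors the fallback in evictionReason: graceful disruption
// surfaces the reason recorded on the DisruptionReason condition, anything else
// is reported as a forceful termination.
func evictionMessageFor(nodeClaim *v1.NodeClaim) string {
	if cond := nodeClaim.StatusConditions().Get(v1.ConditionTypeDisruptionReason); cond.IsTrue() {
		// Set by MarkDisrupted during graceful disruption (e.g. consolidation, drift).
		return cond.Message
	}
	// The node is being terminated outside the disruption flow.
	return "Forceful Termination"
}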