drain - controller implementation
implement handling of drain state.

- add handler function for drain state
- use drain manager to handle drain requests
- handle NodeMaintenance updates/deletion during drain
- extend controller test to cover draining state

Signed-off-by: adrianc <[email protected]>
adrianchiris committed Aug 21, 2024
1 parent c12570b commit b69752a
Showing 4 changed files with 207 additions and 12 deletions.
cmd/maintenance-manager/main.go (3 additions, 1 deletion)
@@ -40,6 +40,7 @@ import (
maintenancev1alpha1 "github.com/Mellanox/maintenance-operator/api/v1alpha1"
"github.com/Mellanox/maintenance-operator/internal/controller"
"github.com/Mellanox/maintenance-operator/internal/cordon"
"github.com/Mellanox/maintenance-operator/internal/drain"
operatorlog "github.com/Mellanox/maintenance-operator/internal/log"
"github.com/Mellanox/maintenance-operator/internal/podcompletion"
"github.com/Mellanox/maintenance-operator/internal/scheduler"
@@ -144,6 +145,7 @@ func main() {
os.Exit(1)
}

ctx := ctrl.SetupSignalHandler()
mgrClient := mgr.GetClient()

nmrOptions := controller.NewNodeMaintenanceReconcilerOptions()
@@ -153,6 +155,7 @@
Options: nmrOptions,
CordonHandler: cordon.NewCordonHandler(mgrClient, k8sInterface),
WaitPodCompletionHandler: podcompletion.NewPodCompletionHandler(mgrClient),
DrainManager: drain.NewManager(ctrl.Log.WithName("DrainManager"), ctx, k8sInterface),
}).SetupWithManager(mgr, ctrl.Log.WithName("NodeMaintenanceReconciler")); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "NodeMaintenance")
os.Exit(1)
@@ -191,7 +194,6 @@ func main() {
os.Exit(1)
}

ctx := ctrl.SetupSignalHandler()
// index fields in mgr cache

// pod spec.nodeName used in nodemaintenance controller.
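The internals of `internal/drain` are not part of this diff. As a reading aid, here is a hypothetical sketch of the surface the controller consumes, with every name inferred from the call sites in this commit; the concrete `DrainState` string values, the name of the success constant, and the UID derivation are assumptions.

```go
// Hypothetical sketch of internal/drain as consumed by this commit.
// The real definitions live in internal/drain and are not shown here.
package drain

import (
	"context"

	"github.com/go-logr/logr"
	"k8s.io/client-go/kubernetes"

	maintenancev1 "github.com/Mellanox/maintenance-operator/api/v1alpha1"
)

// DrainState describes the lifecycle of a drain request.
type DrainState string

const (
	DrainStateInProgress DrainState = "InProgress"
	DrainStateSuccess    DrainState = "Success" // assumed name; not shown in the diff
	DrainStateError      DrainState = "Error"
	DrainStateCanceled   DrainState = "Canceled"
)

// DrainRequestSpec pairs a DrainSpec with its target node; the controller
// compares req.Spec().Spec against nm.Spec.DrainSpec to detect updates.
type DrainRequestSpec struct {
	NodeName string
	Spec     maintenancev1.DrainSpec
}

// DrainRequestStatus lists pods still pending eviction; the controller
// reads PodsToDelete to compute drain progress.
type DrainRequestStatus struct {
	State        DrainState
	PodsToDelete []string
}

// DrainRequest is a single asynchronous node-drain operation.
type DrainRequest interface {
	UID() string
	State() DrainState
	Spec() DrainRequestSpec
	Status() (DrainRequestStatus, error)
}

// Manager tracks drain requests by UID and runs them in the background.
type Manager interface {
	NewDrainRequest(nm *maintenancev1.NodeMaintenance) DrainRequest
	AddRequest(req DrainRequest) error
	GetRequest(uid string) DrainRequest // nil if no such request
	RemoveRequest(uid string)
	ListRequests() []DrainRequest
}

// DrainRequestUIDFromNodeMaintenance derives a stable request UID for a
// NodeMaintenance object (derivation assumed).
func DrainRequestUIDFromNodeMaintenance(nm *maintenancev1.NodeMaintenance) string {
	return nm.Namespace + "/" + nm.Name
}

// NewManager mirrors the constructor call in main.go above.
func NewManager(log logr.Logger, ctx context.Context, k8sInterface kubernetes.Interface) Manager {
	panic("sketch only")
}
```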
config/rbac/role.yaml (11 additions, 0 deletions)
@@ -31,6 +31,17 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- pods/eviction
verbs:
- create
- delete
- get
- list
- patch
- update
- apiGroups:
- maintenance.nvidia.com
resources:
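The new `pods/eviction` rule exists because draining removes pods through the Eviction API rather than plain deletion. Only `create` is actually served on that subresource; the extra verbs simply mirror the kubebuilder RBAC marker added in the controller below. A minimal sketch (a hypothetical helper, not code from this commit) of the client-go call this permission enables:

```go
package drain

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	policyv1 "k8s.io/api/policy/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// evictPod posts an Eviction for the given pod, which is what the
// pods/eviction "create" verb authorizes.
func evictPod(ctx context.Context, cs kubernetes.Interface, pod *corev1.Pod) error {
	return cs.CoreV1().Pods(pod.Namespace).EvictV1(ctx, &policyv1.Eviction{
		ObjectMeta: metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
	})
}
```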
internal/controller/nodemaintenance_controller.go (160 additions, 1 deletion)
@@ -19,13 +19,16 @@ package controller
import (
"context"
"errors"
"fmt"
"reflect"
"sync"
"time"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
@@ -39,14 +42,17 @@ import (

maintenancev1 "github.com/Mellanox/maintenance-operator/api/v1alpha1"
"github.com/Mellanox/maintenance-operator/internal/cordon"
"github.com/Mellanox/maintenance-operator/internal/drain"
"github.com/Mellanox/maintenance-operator/internal/k8sutils"
operatorlog "github.com/Mellanox/maintenance-operator/internal/log"
"github.com/Mellanox/maintenance-operator/internal/podcompletion"
"github.com/Mellanox/maintenance-operator/internal/utils"
)

var (
defaultMaxNodeMaintenanceTime = 1600 * time.Second
waitPodCompletionRequeueTime = 10 * time.Second
drainRequeueTime = 10 * time.Second
)

const (
@@ -101,6 +107,7 @@ type NodeMaintenanceReconciler struct {
Options *NodeMaintenanceReconcilerOptions
CordonHandler cordon.Handler
WaitPodCompletionHandler podcompletion.Handler
DrainManager drain.Manager
}

//+kubebuilder:rbac:groups=maintenance.nvidia.com,resources=nodemaintenances,verbs=get;list;watch;create;update;patch;delete
@@ -109,6 +116,7 @@ type NodeMaintenanceReconciler struct {
//+kubebuilder:rbac:groups="",resources=events,verbs=create
//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;update;patch
//+kubebuilder:rbac:groups="",resources=pods,verbs=get;watch;list;update;patch;delete
//+kubebuilder:rbac:groups="",resources=pods/eviction,verbs=create;get;list;update;patch;delete

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
@@ -123,6 +131,7 @@ func (r *NodeMaintenanceReconciler) Reconcile(ctx context.Context, req ctrl.Requ
// load any stored options
r.Options.Load()
reqLog.Info("loaded options", "maxNodeMaintenanceTime", r.Options.MaxNodeMaintenanceTime())
reqLog.Info("outstanding drain requests", "num", len(r.DrainManager.ListRequests()))

// get NodeMaintenance object
nm := &maintenancev1.NodeMaintenance{}
@@ -184,7 +193,10 @@ func (r *NodeMaintenanceReconciler) Reconcile(ctx context.Context, req ctrl.Requ
reqLog.Error(err, "failed to handle waitForPodCompletion state for NodeMaintenance object")
}
case maintenancev1.ConditionReasonDraining:
// TODO(adrianc): implement
res, err = r.handleDrainState(ctx, reqLog, nm, node)
if err != nil {
reqLog.Error(err, "failed to handle drain state for NodeMaintenance object")
}
case maintenancev1.ConditionReasonReady:
err = r.handleReadyState(ctx, reqLog, nm, node)
if err != nil {
@@ -340,6 +352,115 @@ func (r *NodeMaintenanceReconciler) handleWaitPodCompletionState(ctx context.Con
// we can progress to the next step for this NodeMaintenance
}

// update condition and send event
err = k8sutils.SetReadyConditionReason(ctx, r.Client, nm, maintenancev1.ConditionReasonDraining)
if err != nil {
reqLog.Error(err, "failed to update status for NodeMaintenance object")
return res, err
}

r.EventRecorder.Event(
nm, corev1.EventTypeNormal, maintenancev1.ConditionChangedEventType, maintenancev1.ConditionReasonDraining)

return res, nil
}

func (r *NodeMaintenanceReconciler) handleDrainState(ctx context.Context, reqLog logr.Logger, nm *maintenancev1.NodeMaintenance, node *corev1.Node) (ctrl.Result, error) {
reqLog.Info("Handle Draining NodeMaintenance")
var err error
var res ctrl.Result

if !nm.GetDeletionTimestamp().IsZero() {
// object is being deleted, handle cleanup.
reqLog.Info("NodeMaintenance object is deleting")

reqLog.Info("handle drain request removal")
drainReqUID := drain.DrainRequestUIDFromNodeMaintenance(nm)
req := r.DrainManager.GetRequest(drainReqUID)
if req != nil {
reqLog.Info("stopping and removing drain request", "reqUID", drainReqUID, "state", req.State())
}
r.DrainManager.RemoveRequest(drainReqUID)

if nm.Spec.Cordon {
reqLog.Info("handle uncordon of node, ", "node", node.Name)
err = r.CordonHandler.HandleUnCordon(ctx, reqLog, nm, node)
if err != nil {
return res, err
}
}

// TODO(adrianc): unpause MCP in OCP when support is added.

// remove finalizer if exists and return
reqLog.Info("NodeMaintenance object is deleting, removing maintenance finalizer", "namespace", nm.Namespace, "name", nm.Name)
err = k8sutils.RemoveFinalizer(ctx, r.Client, nm, maintenancev1.MaintenanceFinalizerName)
if err != nil {
reqLog.Error(err, "failed to remove finalizer for NodeMaintenance", "namespace", nm.Namespace, "name", nm.Name)
}
return res, err
}

if nm.Spec.DrainSpec != nil {
drainReqUID := drain.DrainRequestUIDFromNodeMaintenance(nm)
req := r.DrainManager.GetRequest(drainReqUID)
if req == nil {
reqLog.Info("sending new drain request")
req = r.DrainManager.NewDrainRequest(nm)
// reset and update initial drain status
nm.Status.Drain = nil
if err = r.updateDrainStatus(ctx, nm, req); err != nil {
return res, err
}
_ = r.DrainManager.AddRequest(req)
return ctrl.Result{Requeue: true, RequeueAfter: drainRequeueTime}, nil
}

reqLog.Info("drain request details", "uid", req.UID(), "state", req.State())

// handle update of drain spec
if !reflect.DeepEqual(req.Spec().Spec, *nm.Spec.DrainSpec) {
reqLog.Info("drain spec has changed, removing current request and requeue")
r.DrainManager.RemoveRequest(req.UID())
return ctrl.Result{Requeue: true, RequeueAfter: drainRequeueTime}, nil
}

if req.State() == drain.DrainStateInProgress {
// update progress and requeue
if err = r.updateDrainStatus(ctx, nm, req); err != nil {
return res, err
}
return ctrl.Result{Requeue: true, RequeueAfter: drainRequeueTime}, nil
}

// handle request in error state
if req.State() == drain.DrainStateError || req.State() == drain.DrainStateCanceled {
reqLog.Info("drain request error. removing current request and requeue", "state", req.State())
r.DrainManager.RemoveRequest(req.UID())
return ctrl.Result{Requeue: true, RequeueAfter: drainRequeueTime}, nil
}

// Drain completed successfully
reqLog.Info("drain completed successfully", "reqUID", req.UID(), "state", req.State())
if err = r.updateDrainStatus(ctx, nm, req); err != nil {
return res, err
}

r.DrainManager.RemoveRequest(req.UID())
} else {
// if nil, remove any pending requests for NodeMaintenance if present
r.DrainManager.RemoveRequest(drain.DrainRequestUIDFromNodeMaintenance(nm))
if nm.Status.Drain != nil {
// clear out drain status
nm.Status.Drain = nil
err = r.Client.Status().Update(ctx, nm)
if err != nil {
reqLog.Error(err, "failed to update drain status for NodeMaintenance object")
return res, err
}
}
}

// update condition and send event
err = k8sutils.SetReadyConditionReason(ctx, r.Client, nm, maintenancev1.ConditionReasonReady)
if err != nil {
@@ -353,6 +474,44 @@ func (r *NodeMaintenanceReconciler) handleWaitPodCompletionState(ctx context.Con
return res, nil
}

// updateDrainStatus updates NodeMaintenance drain status in place, returning an error on failure.
func (r *NodeMaintenanceReconciler) updateDrainStatus(ctx context.Context, nm *maintenancev1.NodeMaintenance, drainReq drain.DrainRequest) error {
ds, err := drainReq.Status()
if err != nil {
return fmt.Errorf("failed to update drain status. %w", err)
}

if nm.Status.Drain == nil {
// set initial status
podsOnNode := &corev1.PodList{}
selectorFields := fields.OneTermEqualSelector("spec.nodeName", nm.Spec.NodeName)
err = r.Client.List(ctx, podsOnNode, &client.ListOptions{FieldSelector: selectorFields})
if err != nil {
return fmt.Errorf("failed to list pods. %w", err)
}

nm.Status.Drain = &maintenancev1.DrainStatus{
TotalPods: uint32(len(podsOnNode.Items)),
EvictionPods: uint32(len(ds.PodsToDelete)),
}
}

removedPods := utils.MaxInt(int(nm.Status.Drain.EvictionPods)-len(ds.PodsToDelete), 0)

nm.Status.Drain.DrainProgress = 100
if nm.Status.Drain.EvictionPods != 0 {
nm.Status.Drain.DrainProgress = uint32(float32(removedPods) / float32(nm.Status.Drain.EvictionPods) * 100)
}
nm.Status.Drain.WaitForEviction = ds.PodsToDelete

err = r.Client.Status().Update(ctx, nm)
if err != nil {
return fmt.Errorf("failed to update drain status for NodeMaintenance object. %w", err)
}

return nil
}
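The progress arithmetic above starts each status from the pods found on the node, defaults DrainProgress to 100, and otherwise scales evicted pods over the initial eviction count with float-to-uint32 truncation. A worked example with hypothetical counts:

```go
package main

import "fmt"

func main() {
	// Hypothetical snapshot: 4 pods initially required eviction,
	// 1 is still listed in PodsToDelete.
	evictionPods := 4
	pending := 1

	removed := evictionPods - pending // mirrors utils.MaxInt(..., 0)
	if removed < 0 {
		removed = 0
	}
	progress := uint32(float32(removed) / float32(evictionPods) * 100)
	fmt.Println(progress) // prints 75
}
```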

func (r *NodeMaintenanceReconciler) handleReadyState(ctx context.Context, reqLog logr.Logger, nm *maintenancev1.NodeMaintenance, node *corev1.Node) error {
reqLog.Info("Handle Ready NodeMaintenance")
// handle finalizers
internal/controller/nodemaintenance_controller_test.go (33 additions, 10 deletions)
@@ -34,6 +34,7 @@ import (

maintenancev1 "github.com/Mellanox/maintenance-operator/api/v1alpha1"
"github.com/Mellanox/maintenance-operator/internal/cordon"
"github.com/Mellanox/maintenance-operator/internal/drain"
"github.com/Mellanox/maintenance-operator/internal/k8sutils"
"github.com/Mellanox/maintenance-operator/internal/podcompletion"
"github.com/Mellanox/maintenance-operator/internal/testutils"
@@ -78,6 +79,8 @@ var _ = Describe("NodeMaintenance Controller", func() {
Options: options,
CordonHandler: cordon.NewCordonHandler(k8sClient, k8sInterface),
WaitPodCompletionHandler: podcompletion.NewPodCompletionHandler(k8sClient),
DrainManager: drain.NewManager(ctrllog.Log.WithName("DrainManager"),
testCtx, k8sInterface),
}

// setup reconciler with manager
@@ -167,14 +170,23 @@ var _ = Describe("NodeMaintenance Controller", func() {
It("Full lifecycle of NodeMaintenance", func() {
By("Create NodeMaintenance")
nm := testutils.GetTestNodeMaintenance("test-nm", "test-node-0", "some-operator.nvidia.com", "")
nm.Spec.WaitForPodCompletion = &maintenancev1.WaitForPodCompletionSpec{}
nm.Spec.WaitForPodCompletion = &maintenancev1.WaitForPodCompletionSpec{PodSelector: "for=wait-completion"}
nm.Spec.DrainSpec = &maintenancev1.DrainSpec{
Force: true,
PodSelector: "for=drain",
}
Expect(k8sClient.Create(testCtx, nm)).ToNot(HaveOccurred())
nmObjectsToCleanup = append(nmObjectsToCleanup, nm)

By("Create test pod")
pod := testutils.GetTestPod("test-pod", "test-node-0", nil)
Expect(k8sClient.Create(testCtx, pod)).ToNot(HaveOccurred())
podObjectsToCleanup = append(podObjectsToCleanup, pod)
By("Create test pods")
// pod to wait on for completion
podForWaitCompletion := testutils.GetTestPod("test-pod", "test-node-0", map[string]string{"for": "wait-completion"})
Expect(k8sClient.Create(testCtx, podForWaitCompletion)).ToNot(HaveOccurred())
podObjectsToCleanup = append(podObjectsToCleanup, podForWaitCompletion)
// pod for drain
podForDrain := testutils.GetTestPod("test-pod-2", "test-node-0", map[string]string{"for": "drain"})
Expect(k8sClient.Create(testCtx, podForDrain)).ToNot(HaveOccurred())
podObjectsToCleanup = append(podObjectsToCleanup, podForDrain)

By("Eventually NodeMaintenance condition is set to Pending")
Eventually(testutils.GetReadyConditionReasonForFn(testCtx, k8sClient, client.ObjectKeyFromObject(nm))).
@@ -193,13 +205,23 @@ var _ = Describe("NodeMaintenance Controller", func() {
Within(time.Second).WithPolling(100 * time.Millisecond).
Should(Equal(maintenancev1.ConditionReasonWaitForPodCompletion))

By("After deleting pod, NodeMaintenance is eventually Ready")
// NOTE(adrianc) for pods we must provide DeleteOptions as below else apiserver will not delete pod object
By("After deleting wait for completion pod, NodeMaintenance is Draining")
// NOTE(adrianc): for pods we must provide DeleteOptions as below else apiserver will not delete pod object
var grace int64
Expect(k8sClient.Delete(testCtx, pod, &client.DeleteOptions{GracePeriodSeconds: &grace, Preconditions: &metav1.Preconditions{UID: &pod.UID}})).
Expect(k8sClient.Delete(testCtx, podForWaitCompletion, &client.DeleteOptions{
GracePeriodSeconds: &grace, Preconditions: &metav1.Preconditions{UID: &podForWaitCompletion.UID}})).
ToNot(HaveOccurred())
Eventually(testutils.GetReadyConditionReasonForFn(testCtx, k8sClient, client.ObjectKeyFromObject(nm))).
WithTimeout(20 * time.Second).WithPolling(1 * time.Second).Should(Equal(maintenancev1.ConditionReasonDraining))

By("Eventually NodeMaintenance is Ready")
// NOTE(adrianc): as above comment, we need to "help" drain to delete the targeted pod since api server will not
// delete pod object without setting specific delete options
Expect(k8sClient.Delete(testCtx, podForDrain, &client.DeleteOptions{
GracePeriodSeconds: &grace, Preconditions: &metav1.Preconditions{UID: &podForDrain.UID}})).
ToNot(HaveOccurred())
Eventually(testutils.GetReadyConditionReasonForFn(testCtx, k8sClient, client.ObjectKeyFromObject(nm))).
WithTimeout(20 * time.Second).WithPolling(1 * time.Second).Should(Equal(maintenancev1.ConditionReasonReady))
WithTimeout(20 * time.Second).WithPolling(5 * time.Second).Should(Equal(maintenancev1.ConditionReasonReady))

By("Validating expected")
node := &corev1.Node{
@@ -219,7 +241,8 @@ var _ = Describe("NodeMaintenance Controller", func() {
Eventually(testutils.EventsForObjFn(testCtx, k8sClient, nm.UID)).WithTimeout(10 * time.Second).
WithPolling(1 * time.Second).Should(ContainElements(
maintenancev1.ConditionReasonPending, maintenancev1.ConditionReasonCordon,
maintenancev1.ConditionReasonWaitForPodCompletion, maintenancev1.ConditionReasonReady))
maintenancev1.ConditionReasonWaitForPodCompletion, maintenancev1.ConditionReasonDraining,
maintenancev1.ConditionReasonReady))

By("Should Uncordon node after NodeMaintenance is deleted")
Expect(k8sClient.Delete(testCtx, nm)).ToNot(HaveOccurred())
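A natural follow-on assertion (not part of this commit, and reusing the suite's `k8sClient`, `testCtx`, and `nm`) would check the new drain status once the node reaches Ready:

```go
By("Drain status eventually reports full progress")
Eventually(func() uint32 {
	updated := &maintenancev1.NodeMaintenance{}
	if err := k8sClient.Get(testCtx, client.ObjectKeyFromObject(nm), updated); err != nil || updated.Status.Drain == nil {
		return 0
	}
	return updated.Status.Drain.DrainProgress
}).WithTimeout(10 * time.Second).WithPolling(1 * time.Second).Should(Equal(uint32(100)))
```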
