Skip to content

Commit

Permalink
Merge pull request #9 from adrianchiris/add-drain
Browse files Browse the repository at this point in the history
Drain state implementation
  • Loading branch information
ykulazhenkov authored Aug 22, 2024
2 parents 72b91bb + b69752a commit 61d1ad3
Show file tree
Hide file tree
Showing 18 changed files with 1,602 additions and 18 deletions.
4 changes: 4 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
inpackage: False
testonly: False
with-expecter: True
keeptree: True
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ catalog-push: ## Push a catalog image.
##@ Binary Dependencies download

MOCKERY ?= $(LOCALBIN)/mockery
MOCKERY_VERSION ?= v2.27.1
MOCKERY_VERSION ?= v2.44.2
.PHONY: mockery
mockery: $(MOCKERY) ## Download mockery locally if necessary.
$(MOCKERY): | $(LOCALBIN)
Expand Down
4 changes: 2 additions & 2 deletions api/v1alpha1/nodemaintenance_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ type NodeMaintenanceStatus struct {

// DrainStatus represents the status of draining for the node
type DrainStatus struct {
// TotalPods is the number of pods on the node at the time NodeMaintenance was Scheduled
// TotalPods is the number of pods on the node at the time NodeMaintenance started draining
TotalPods uint32 `json:"totalPods,omitempty"`

// EvictionPods is the total number of pods that need to be evicted at the time NodeMaintenance was scheduled
// EvictionPods is the total number of pods that need to be evicted at the time NodeMaintenance started draining
EvictionPods uint32 `json:"evictionPods,omitempty"`

// DrainProgress represents the draining progress as percentage
Expand Down
29 changes: 28 additions & 1 deletion api/v1alpha1/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,36 @@

package v1alpha1

import "fmt"
import (
"fmt"
"regexp"

corev1 "k8s.io/api/core/v1"
)

// CanonicalString returns the canonical "namespace/name:requestorID@nodeName"
// string representation of a NodeMaintenance.
func (nm *NodeMaintenance) CanonicalString() string {
	return nm.Namespace + "/" + nm.Name + ":" + nm.Spec.RequestorID + "@" + nm.Spec.NodeName
}

// Match matches PodEvictionFiterEntry on pod. returns true if Pod matches filter, false otherwise.
// A pod matches when any of its containers requests a resource whose name matches
// ByResourceNameRegex. Returns false when the filter or its regex is unset or the
// regex fails to compile.
func (e *PodEvictionFiterEntry) Match(pod *corev1.Pod) bool {
	// guard against unset optional field; dereferencing a nil *string would panic
	if e == nil || e.ByResourceNameRegex == nil {
		return false
	}

	re, err := regexp.Compile(*e.ByResourceNameRegex)
	if err != nil {
		// invalid regex provided by the user; treat as no match
		return false
	}

	for _, c := range pod.Spec.Containers {
		for resourceName := range c.Resources.Requests {
			if re.MatchString(resourceName.String()) {
				return true
			}
		}
	}

	return false
}
4 changes: 3 additions & 1 deletion cmd/maintenance-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
maintenancev1alpha1 "github.com/Mellanox/maintenance-operator/api/v1alpha1"
"github.com/Mellanox/maintenance-operator/internal/controller"
"github.com/Mellanox/maintenance-operator/internal/cordon"
"github.com/Mellanox/maintenance-operator/internal/drain"
operatorlog "github.com/Mellanox/maintenance-operator/internal/log"
"github.com/Mellanox/maintenance-operator/internal/podcompletion"
"github.com/Mellanox/maintenance-operator/internal/scheduler"
Expand Down Expand Up @@ -144,6 +145,7 @@ func main() {
os.Exit(1)
}

ctx := ctrl.SetupSignalHandler()
mgrClient := mgr.GetClient()

nmrOptions := controller.NewNodeMaintenanceReconcilerOptions()
Expand All @@ -153,6 +155,7 @@ func main() {
Options: nmrOptions,
CordonHandler: cordon.NewCordonHandler(mgrClient, k8sInterface),
WaitPodCompletionHandler: podcompletion.NewPodCompletionHandler(mgrClient),
DrainManager: drain.NewManager(ctrl.Log.WithName("DrainManager"), ctx, k8sInterface),
}).SetupWithManager(mgr, ctrl.Log.WithName("NodeMaintenanceReconciler")); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "NodeMaintenance")
os.Exit(1)
Expand Down Expand Up @@ -191,7 +194,6 @@ func main() {
os.Exit(1)
}

ctx := ctrl.SetupSignalHandler()
// index fields in mgr cache

// pod spec.nodeName used in nodemaintenance controller.
Expand Down
4 changes: 2 additions & 2 deletions config/crd/bases/maintenance.nvidia.com_nodemaintenances.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,12 @@ spec:
type: integer
evictionPods:
description: EvictionPods is the total number of pods that need
to be evicted at the time NodeMaintenance was scheduled
to be evicted at the time NodeMaintenance started draining
format: int32
type: integer
totalPods:
description: TotalPods is the number of pods on the node at the
time NodeMaintenance was Scheduled
time NodeMaintenance started draining
format: int32
type: integer
waitForEviction:
Expand Down
11 changes: 11 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,17 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- pods/eviction
verbs:
- create
- delete
- get
- list
- patch
- update
- apiGroups:
- maintenance.nvidia.com
resources:
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/onsi/ginkgo/v2 v2.19.0
github.com/onsi/gomega v1.33.1
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.4
go.uber.org/zap v1.26.0
k8s.io/api v0.30.3
k8s.io/apimachinery v0.30.3
Expand Down Expand Up @@ -63,13 +64,15 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spf13/cobra v1.7.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/xlab/treeprint v1.2.0 // indirect
go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
161 changes: 160 additions & 1 deletion internal/controller/nodemaintenance_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ package controller
import (
"context"
"errors"
"fmt"
"reflect"
"sync"
"time"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
Expand All @@ -39,14 +42,17 @@ import (

maintenancev1 "github.com/Mellanox/maintenance-operator/api/v1alpha1"
"github.com/Mellanox/maintenance-operator/internal/cordon"
"github.com/Mellanox/maintenance-operator/internal/drain"
"github.com/Mellanox/maintenance-operator/internal/k8sutils"
operatorlog "github.com/Mellanox/maintenance-operator/internal/log"
"github.com/Mellanox/maintenance-operator/internal/podcompletion"
"github.com/Mellanox/maintenance-operator/internal/utils"
)

var (
defaultMaxNodeMaintenanceTime = 1600 * time.Second
waitPodCompletionRequeueTime = 10 * time.Second
drainReqeueTime = 10 * time.Second
)

const (
Expand Down Expand Up @@ -101,6 +107,7 @@ type NodeMaintenanceReconciler struct {
Options *NodeMaintenanceReconcilerOptions
CordonHandler cordon.Handler
WaitPodCompletionHandler podcompletion.Handler
DrainManager drain.Manager
}

//+kubebuilder:rbac:groups=maintenance.nvidia.com,resources=nodemaintenances,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -109,6 +116,7 @@ type NodeMaintenanceReconciler struct {
//+kubebuilder:rbac:groups="",resources=events,verbs=create
//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;update;patch
//+kubebuilder:rbac:groups="",resources=pods,verbs=get;watch;list;update;patch;delete
//+kubebuilder:rbac:groups="",resources=pods/eviction,verbs=create;get;list;update;patch;delete

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
Expand All @@ -123,6 +131,7 @@ func (r *NodeMaintenanceReconciler) Reconcile(ctx context.Context, req ctrl.Requ
// load any stored options
r.Options.Load()
reqLog.Info("loaded options", "maxNodeMaintenanceTime", r.Options.MaxNodeMaintenanceTime())
reqLog.Info("outstanding drain requests", "num", len(r.DrainManager.ListRequests()))

// get NodeMaintenance object
nm := &maintenancev1.NodeMaintenance{}
Expand Down Expand Up @@ -184,7 +193,10 @@ func (r *NodeMaintenanceReconciler) Reconcile(ctx context.Context, req ctrl.Requ
reqLog.Error(err, "failed to handle waitForPodCompletion state for NodeMaintenance object")
}
case maintenancev1.ConditionReasonDraining:
// TODO(adrianc): implement
res, err = r.handleDrainState(ctx, reqLog, nm, node)
if err != nil {
reqLog.Error(err, "failed to handle drain state for NodeMaintenance object")
}
case maintenancev1.ConditionReasonReady:
err = r.handleReadyState(ctx, reqLog, nm, node)
if err != nil {
Expand Down Expand Up @@ -340,6 +352,115 @@ func (r *NodeMaintenanceReconciler) handleWaitPodCompletionState(ctx context.Con
// we can progress to next step for this NodeMaintenance
}

// update condition and send event
err = k8sutils.SetReadyConditionReason(ctx, r.Client, nm, maintenancev1.ConditionReasonDraining)
if err != nil {
reqLog.Error(err, "failed to update status for NodeMaintenance object")
return res, err
}

r.EventRecorder.Event(
nm, corev1.EventTypeNormal, maintenancev1.ConditionChangedEventType, maintenancev1.ConditionReasonDraining)

return res, nil
}

func (r *NodeMaintenanceReconciler) handleDrainState(ctx context.Context, reqLog logr.Logger, nm *maintenancev1.NodeMaintenance, node *corev1.Node) (ctrl.Result, error) {
reqLog.Info("Handle Draining NodeMaintenance")
var err error
var res ctrl.Result

if !nm.GetDeletionTimestamp().IsZero() {
// object is being deleted, handle cleanup.
reqLog.Info("NodeMaintenance object is deleting")

reqLog.Info("handle drain request removal")
drainReqUID := drain.DrainRequestUIDFromNodeMaintenance(nm)
req := r.DrainManager.GetRequest(drainReqUID)
if req != nil {
reqLog.Info("stopping and removing drain request", "reqUID", drainReqUID, "state", req.State())
}
r.DrainManager.RemoveRequest(drainReqUID)

if nm.Spec.Cordon {
reqLog.Info("handle uncordon of node, ", "node", node.Name)
err = r.CordonHandler.HandleUnCordon(ctx, reqLog, nm, node)
if err != nil {
return res, err
}
}

// TODO(adrianc): unpause MCP in OCP when support is added.

// remove finalizer if exists and return
reqLog.Info("NodeMaintenance object is deleting, removing maintenance finalizer", "namespace", nm.Namespace, "name", nm.Name)
err = k8sutils.RemoveFinalizer(ctx, r.Client, nm, maintenancev1.MaintenanceFinalizerName)
if err != nil {
reqLog.Error(err, "failed to remove finalizer for NodeMaintenance", "namespace", nm.Namespace, "name", nm.Name)
}
return res, err
}

if nm.Spec.DrainSpec != nil {
drainReqUID := drain.DrainRequestUIDFromNodeMaintenance(nm)
req := r.DrainManager.GetRequest(drainReqUID)
if req == nil {
reqLog.Info("sending new drain request")
req = r.DrainManager.NewDrainRequest(nm)
// reset and update initial drain status
nm.Status.Drain = nil
if err = r.updateDrainStatus(ctx, nm, req); err != nil {
return res, err
}
_ = r.DrainManager.AddRequest(req)
return ctrl.Result{Requeue: true, RequeueAfter: drainReqeueTime}, nil
}

reqLog.Info("drain request details", "uid", req.UID(), "state", req.State())

// handle update of drain spec
if !reflect.DeepEqual(req.Spec().Spec, *nm.Spec.DrainSpec) {
reqLog.Info("drain spec has changed, removing current request and requeue")
r.DrainManager.RemoveRequest(req.UID())
return ctrl.Result{Requeue: true, RequeueAfter: drainReqeueTime}, nil
}

if req.State() == drain.DrainStateInProgress {
// update progress and requeue
if err = r.updateDrainStatus(ctx, nm, req); err != nil {
return res, err
}
return ctrl.Result{Requeue: true, RequeueAfter: drainReqeueTime}, nil
}

// handle request in error state
if req.State() == drain.DrainStateError || req.State() == drain.DrainStateCanceled {
reqLog.Info("drain request error. removing current request and requeue", "state", req.State())
r.DrainManager.RemoveRequest(req.UID())
return ctrl.Result{Requeue: true, RequeueAfter: drainReqeueTime}, nil
}

// Drain completed successfully
reqLog.Info("drain completed successfully", "reqUID", req.UID(), "state", req.State())
if err = r.updateDrainStatus(ctx, nm, req); err != nil {
return res, err
}

r.DrainManager.RemoveRequest(req.UID())
} else {
// if nil, remove any pending requests for NodeMaintenance if present
r.DrainManager.RemoveRequest(drain.DrainRequestUIDFromNodeMaintenance(nm))
if nm.Status.Drain != nil {
// clear out drain status
nm.Status.Drain = nil
err = r.Client.Status().Update(ctx, nm)
if err != nil {
reqLog.Error(err, "failed to update drain status for NodeMaintenance object")
return res, err
}
}
}

// update condition and send event
err = k8sutils.SetReadyConditionReason(ctx, r.Client, nm, maintenancev1.ConditionReasonReady)
if err != nil {
Expand All @@ -353,6 +474,44 @@ func (r *NodeMaintenanceReconciler) handleWaitPodCompletionState(ctx context.Con
return res, nil
}

// updateDrainStatus refreshes NodeMaintenance drain status in place from the given
// drain request and persists it via the status subresource. returns error if occurred
func (r *NodeMaintenanceReconciler) updateDrainStatus(ctx context.Context, nm *maintenancev1.NodeMaintenance, drainReq drain.DrainRequest) error {
	reqStatus, err := drainReq.Status()
	if err != nil {
		return fmt.Errorf("failed to update drain status. %w", err)
	}

	if nm.Status.Drain == nil {
		// first update for this drain request: record pod counts at drain start
		pods := &corev1.PodList{}
		nodeSelector := fields.OneTermEqualSelector("spec.nodeName", nm.Spec.NodeName)
		if err = r.Client.List(ctx, pods, &client.ListOptions{FieldSelector: nodeSelector}); err != nil {
			return fmt.Errorf("failed to list pods. %w", err)
		}
		nm.Status.Drain = &maintenancev1.DrainStatus{
			TotalPods:    uint32(len(pods.Items)),
			EvictionPods: uint32(len(reqStatus.PodsToDelete)),
		}
	}

	// progress is evicted-so-far over pods-to-evict as a percentage;
	// 100 when there was nothing to evict in the first place.
	evicted := utils.MaxInt(int(nm.Status.Drain.EvictionPods)-len(reqStatus.PodsToDelete), 0)
	progress := uint32(100)
	if nm.Status.Drain.EvictionPods != 0 {
		progress = uint32(float32(evicted) / float32(nm.Status.Drain.EvictionPods) * 100)
	}
	nm.Status.Drain.DrainProgress = progress
	nm.Status.Drain.WaitForEviction = reqStatus.PodsToDelete

	if err = r.Client.Status().Update(ctx, nm); err != nil {
		return fmt.Errorf("failed to update drain status for NodeMaintenance object. %w", err)
	}

	return nil
}

func (r *NodeMaintenanceReconciler) handleReadyState(ctx context.Context, reqLog logr.Logger, nm *maintenancev1.NodeMaintenance, node *corev1.Node) error {
reqLog.Info("Handle Ready NodeMaintenance")
// handle finalizers
Expand Down
Loading

0 comments on commit 61d1ad3

Please sign in to comment.