
Drain state implementation #9

Merged · 3 commits · Aug 22, 2024
4 changes: 4 additions & 0 deletions .mockery.yaml
@@ -0,0 +1,4 @@
inpackage: False
testonly: False
with-expecter: True
keeptree: True
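Since `with-expecter: True` makes mockery generate type-safe `EXPECT()` builders, unit tests can stub the drain manager along the lines of the sketch below. The `mocks` package path and the `NewManager` constructor follow mockery v2 defaults and are assumptions, not part of this diff:

```go
package controller_test

import (
	"testing"

	// Assumed location of the generated mocks; the actual path depends on mockery output settings.
	"github.com/Mellanox/maintenance-operator/internal/drain/mocks"
)

func TestDrainManagerStub(t *testing.T) {
	// NewManager(t) wires the mock into the test's cleanup and assertions.
	m := mocks.NewManager(t)
	// Expecter style: compile-time-checked stubbing instead of m.On("ListRequests").
	m.EXPECT().ListRequests().Return(nil)

	if got := m.ListRequests(); len(got) != 0 {
		t.Fatalf("expected no outstanding drain requests, got %d", len(got))
	}
}
```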
2 changes: 1 addition & 1 deletion Makefile
@@ -276,7 +276,7 @@ catalog-push: ## Push a catalog image.
##@ Binary Dependencies download

MOCKERY ?= $(LOCALBIN)/mockery
MOCKERY_VERSION ?= v2.27.1
MOCKERY_VERSION ?= v2.44.2
.PHONY: mockery
mockery: $(MOCKERY) ## Download mockery locally if necessary.
$(MOCKERY): | $(LOCALBIN)
4 changes: 2 additions & 2 deletions api/v1alpha1/nodemaintenance_types.go
@@ -157,10 +157,10 @@ type NodeMaintenanceStatus struct {

// DrainStatus represents the status of draining for the node
type DrainStatus struct {
// TotalPods is the number of pods on the node at the time NodeMaintenance was Scheduled
// TotalPods is the number of pods on the node at the time NodeMaintenance started draining
TotalPods uint32 `json:"totalPods,omitempty"`

// EvictionPods is the total number of pods that need to be evicted at the time NodeMaintenance was scheduled
// EvictionPods is the total number of pods that need to be evicted at the time NodeMaintenance started draining
EvictionPods uint32 `json:"evictionPods,omitempty"`

// DrainProgress represents the draining progress as percentage
29 changes: 28 additions & 1 deletion api/v1alpha1/utils.go
@@ -16,9 +16,36 @@

package v1alpha1

import "fmt"
import (
"fmt"
"regexp"

corev1 "k8s.io/api/core/v1"
)

// CanonicalString returns a canonical string representation of NodeMaintenance
func (nm *NodeMaintenance) CanonicalString() string {
return fmt.Sprintf("%s/%s:%s@%s", nm.Namespace, nm.Name, nm.Spec.RequestorID, nm.Spec.NodeName)
}

// Match applies the PodEvictionFiterEntry to pod. It returns true if the Pod matches the filter, false otherwise.
func (e *PodEvictionFiterEntry) Match(pod *corev1.Pod) bool {
var match bool
// match on ByResourceName regex
re, err := regexp.Compile(*e.ByResourceNameRegex)
if err != nil {
return match
}

OUTER:
for _, c := range pod.Spec.Containers {
for resourceName := range c.Resources.Requests {
if re.MatchString(resourceName.String()) {
match = true
break OUTER
}
}
}

return match
}
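For a concrete sense of the filter above: a pod requesting an extended resource whose name matches `ByResourceNameRegex` is selected. A minimal sketch, with an illustrative regex and resource name (`nvidia.com/gpu` is an assumption, not taken from this diff):

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"

	"github.com/Mellanox/maintenance-operator/api/v1alpha1"
)

func main() {
	regex := "nvidia.com/.*" // illustrative RE2 pattern over resource names
	entry := &v1alpha1.PodEvictionFiterEntry{ByResourceNameRegex: &regex}

	pod := &corev1.Pod{
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{{
				Name: "cuda-workload",
				Resources: corev1.ResourceRequirements{
					Requests: corev1.ResourceList{
						corev1.ResourceName("nvidia.com/gpu"): resource.MustParse("1"),
					},
				},
			}},
		},
	}

	fmt.Println(entry.Match(pod)) // true: the request name matches the regex
}
```

Note that `Match` silently returns false when the regex fails to compile; validation of `ByResourceNameRegex` presumably happens elsewhere (e.g. CRD validation).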
4 changes: 3 additions & 1 deletion cmd/maintenance-manager/main.go
@@ -40,6 +40,7 @@ import (
maintenancev1alpha1 "github.com/Mellanox/maintenance-operator/api/v1alpha1"
"github.com/Mellanox/maintenance-operator/internal/controller"
"github.com/Mellanox/maintenance-operator/internal/cordon"
"github.com/Mellanox/maintenance-operator/internal/drain"
operatorlog "github.com/Mellanox/maintenance-operator/internal/log"
"github.com/Mellanox/maintenance-operator/internal/podcompletion"
"github.com/Mellanox/maintenance-operator/internal/scheduler"
@@ -144,6 +145,7 @@ func main() {
os.Exit(1)
}

ctx := ctrl.SetupSignalHandler()
mgrClient := mgr.GetClient()

nmrOptions := controller.NewNodeMaintenanceReconcilerOptions()
@@ -153,6 +155,7 @@ func main() {
Options: nmrOptions,
CordonHandler: cordon.NewCordonHandler(mgrClient, k8sInterface),
WaitPodCompletionHandler: podcompletion.NewPodCompletionHandler(mgrClient),
DrainManager: drain.NewManager(ctrl.Log.WithName("DrainManager"), ctx, k8sInterface),
}).SetupWithManager(mgr, ctrl.Log.WithName("NodeMaintenanceReconciler")); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "NodeMaintenance")
os.Exit(1)
@@ -191,7 +194,6 @@ func main() {
os.Exit(1)
}

ctx := ctrl.SetupSignalHandler()
// index fields in mgr cache

// pod spec.nodeName used in nodemaintenance controller.
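The truncated tail of this hunk registers cache indexes; the `spec.nodeName` pod index is what lets `updateDrainStatus` below list pods by node with `fields.OneTermEqualSelector`. A hedged reconstruction of the usual controller-runtime registration, not verbatim from this PR:

```go
// Assumed index registration for pod spec.nodeName; the actual code is
// outside the visible diff.
if err := mgr.GetFieldIndexer().IndexField(ctx, &corev1.Pod{}, "spec.nodeName",
	func(o client.Object) []string {
		pod := o.(*corev1.Pod)
		return []string{pod.Spec.NodeName}
	}); err != nil {
	setupLog.Error(err, "failed to index pod spec.nodeName")
	os.Exit(1)
}
```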
4 changes: 2 additions & 2 deletions config/crd/bases/maintenance.nvidia.com_nodemaintenances.yaml
@@ -230,12 +230,12 @@ spec:
type: integer
evictionPods:
description: EvictionPods is the total number of pods that need
to be evicted at the time NodeMaintenance was scheduled
to be evicted at the time NodeMaintenance started draining
format: int32
type: integer
totalPods:
description: TotalPods is the number of pods on the node at the
time NodeMaintenance was Scheduled
time NodeMaintenance started draining
format: int32
type: integer
waitForEviction:
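Taken together with the controller changes below, a NodeMaintenance mid-drain reports a status shaped roughly like this (values and pod names are illustrative only):

```yaml
status:
  drain:
    totalPods: 12      # pods on the node when draining started
    evictionPods: 8    # pods that needed eviction at drain start
    drainProgress: 75  # percentage of evictionPods already evicted
    waitForEviction:   # pods still pending eviction (format assumed)
      - default/my-workload-6f7d9c5b8-x2x7z
      - default/my-workload-6f7d9c5b8-q9k4m
```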
11 changes: 11 additions & 0 deletions config/rbac/role.yaml
@@ -31,6 +31,17 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- pods/eviction
verbs:
- create
- delete
- get
- list
- patch
- update
- apiGroups:
- maintenance.nvidia.com
resources:
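The new `pods/eviction` rule lets the drain logic go through the Eviction API, which honors PodDisruptionBudgets, instead of deleting pods outright. With client-go this is roughly the sketch below; `evictPod` and its arguments are illustrative, not code from this PR:

```go
import (
	"context"
	"log"

	policyv1 "k8s.io/api/policy/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// evictPod sketches what the pods/eviction verbs above permit.
func evictPod(ctx context.Context, cs kubernetes.Interface, ns, name string) {
	eviction := &policyv1.Eviction{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: ns},
	}
	if err := cs.PolicyV1().Evictions(ns).Evict(ctx, eviction); err != nil {
		// An eviction blocked by a PodDisruptionBudget surfaces as
		// 429 TooManyRequests; callers typically retry after a delay.
		log.Printf("eviction of %s/%s refused or failed: %v", ns, name, err)
	}
}
```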
3 changes: 3 additions & 0 deletions go.mod
@@ -9,6 +9,7 @@ require (
github.com/onsi/ginkgo/v2 v2.19.0
github.com/onsi/gomega v1.33.1
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.4
go.uber.org/zap v1.26.0
k8s.io/api v0.30.3
k8s.io/apimachinery v0.30.3
@@ -63,13 +64,15 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spf13/cobra v1.7.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/xlab/treeprint v1.2.0 // indirect
go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect
go.uber.org/multierr v1.11.0 // indirect
161 changes: 160 additions & 1 deletion internal/controller/nodemaintenance_controller.go
@@ -19,13 +19,16 @@ package controller
import (
"context"
"errors"
"fmt"
"reflect"
"sync"
"time"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
@@ -39,14 +42,17 @@

maintenancev1 "github.com/Mellanox/maintenance-operator/api/v1alpha1"
"github.com/Mellanox/maintenance-operator/internal/cordon"
"github.com/Mellanox/maintenance-operator/internal/drain"
"github.com/Mellanox/maintenance-operator/internal/k8sutils"
operatorlog "github.com/Mellanox/maintenance-operator/internal/log"
"github.com/Mellanox/maintenance-operator/internal/podcompletion"
"github.com/Mellanox/maintenance-operator/internal/utils"
)

var (
defaultMaxNodeMaintenanceTime = 1600 * time.Second
waitPodCompletionRequeueTime = 10 * time.Second
drainReqeueTime = 10 * time.Second
)

const (
@@ -101,6 +107,7 @@ type NodeMaintenanceReconciler struct {
Options *NodeMaintenanceReconcilerOptions
CordonHandler cordon.Handler
WaitPodCompletionHandler podcompletion.Handler
DrainManager drain.Manager
}

//+kubebuilder:rbac:groups=maintenance.nvidia.com,resources=nodemaintenances,verbs=get;list;watch;create;update;patch;delete
@@ -109,6 +116,7 @@ type NodeMaintenanceReconciler struct {
//+kubebuilder:rbac:groups="",resources=events,verbs=create
//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;update;patch
//+kubebuilder:rbac:groups="",resources=pods,verbs=get;watch;list;update;patch;delete
//+kubebuilder:rbac:groups="",resources=pods/eviction,verbs=create;get;list;update;patch;delete

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
@@ -123,6 +131,7 @@ func (r *NodeMaintenanceReconciler) Reconcile(ctx context.Context, req ctrl.Requ
// load any stored options
r.Options.Load()
reqLog.Info("loaded options", "maxNodeMaintenanceTime", r.Options.MaxNodeMaintenanceTime())
reqLog.Info("outstanding drain requests", "num", len(r.DrainManager.ListRequests()))

// get NodeMaintenance object
nm := &maintenancev1.NodeMaintenance{}
@@ -184,7 +193,10 @@ func (r *NodeMaintenanceReconciler) Reconcile(ctx context.Context, req ctrl.Requ
reqLog.Error(err, "failed to handle waitForPodCompletion state for NodeMaintenance object")
}
case maintenancev1.ConditionReasonDraining:
// TODO(adrianc): implement
res, err = r.handleDrainState(ctx, reqLog, nm, node)
if err != nil {
reqLog.Error(err, "failed to handle drain state for NodeMaintenance object")
}
case maintenancev1.ConditionReasonReady:
err = r.handleReadyState(ctx, reqLog, nm, node)
if err != nil {
@@ -340,6 +352,115 @@ func (r *NodeMaintenanceReconciler) handleWaitPodCompletionState(ctx context.Con
// we can progress to the next step for this NodeMaintenance
}

// update condition and send event
err = k8sutils.SetReadyConditionReason(ctx, r.Client, nm, maintenancev1.ConditionReasonDraining)
if err != nil {
reqLog.Error(err, "failed to update status for NodeMaintenance object")
return res, err
}

r.EventRecorder.Event(
nm, corev1.EventTypeNormal, maintenancev1.ConditionChangedEventType, maintenancev1.ConditionReasonDraining)

return res, nil
}
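// Orientation note (reconstructed from this diff, not part of the original
// source): the happy-path Ready-condition flow is
// ... -> WaitForPodCompletion -> Draining -> Ready.
// handleWaitPodCompletionState above hands off by setting
// ConditionReasonDraining; handleDrainState below sets ConditionReasonReady
// once its drain request completes, or immediately when DrainSpec is nil.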

func (r *NodeMaintenanceReconciler) handleDrainState(ctx context.Context, reqLog logr.Logger, nm *maintenancev1.NodeMaintenance, node *corev1.Node) (ctrl.Result, error) {
reqLog.Info("Handle Draining NodeMaintenance")
var err error
var res ctrl.Result

if !nm.GetDeletionTimestamp().IsZero() {
// object is being deleted, handle cleanup.
reqLog.Info("NodeMaintenance object is deleting")

reqLog.Info("handle drain request removal")
drainReqUID := drain.DrainRequestUIDFromNodeMaintenance(nm)
req := r.DrainManager.GetRequest(drainReqUID)
if req != nil {
reqLog.Info("stopping and removing drain request", "reqUID", drainReqUID, "state", req.State())
}
r.DrainManager.RemoveRequest(drainReqUID)

if nm.Spec.Cordon {
reqLog.Info("handle uncordon of node, ", "node", node.Name)
err = r.CordonHandler.HandleUnCordon(ctx, reqLog, nm, node)
if err != nil {
return res, err
}
}

// TODO(adrianc): unpause MCP in OCP when support is added.

// remove finalizer if exists and return
reqLog.Info("NodeMaintenance object is deleting, removing maintenance finalizer", "namespace", nm.Namespace, "name", nm.Name)
err = k8sutils.RemoveFinalizer(ctx, r.Client, nm, maintenancev1.MaintenanceFinalizerName)
if err != nil {
reqLog.Error(err, "failed to remove finalizer for NodeMaintenance", "namespace", nm.Namespace, "name", nm.Name)
}
return res, err
}

if nm.Spec.DrainSpec != nil {
drainReqUID := drain.DrainRequestUIDFromNodeMaintenance(nm)
req := r.DrainManager.GetRequest(drainReqUID)
if req == nil {
reqLog.Info("sending new drain request")
req = r.DrainManager.NewDrainRequest(nm)
// reset and update initial drain status
nm.Status.Drain = nil
if err = r.updateDrainStatus(ctx, nm, req); err != nil {
return res, err
}
_ = r.DrainManager.AddRequest(req)
return ctrl.Result{Requeue: true, RequeueAfter: drainReqeueTime}, nil
}

reqLog.Info("drain request details", "uid", req.UID(), "state", req.State())

// handle update of drain spec
if !reflect.DeepEqual(req.Spec().Spec, *nm.Spec.DrainSpec) {
reqLog.Info("drain spec has changed, removing current request and requeue")
r.DrainManager.RemoveRequest(req.UID())
return ctrl.Result{Requeue: true, RequeueAfter: drainReqeueTime}, nil
}

if req.State() == drain.DrainStateInProgress {
// update progress and requeue
if err = r.updateDrainStatus(ctx, nm, req); err != nil {
return res, err
}
return ctrl.Result{Requeue: true, RequeueAfter: drainReqeueTime}, nil
}

// handle request in error state
if req.State() == drain.DrainStateError || req.State() == drain.DrainStateCanceled {
reqLog.Info("drain request error. removing current request and requeue", "state", req.State())
r.DrainManager.RemoveRequest(req.UID())
return ctrl.Result{Requeue: true, RequeueAfter: drainReqeueTime}, nil
}

// Drain completed successfully
reqLog.Info("drain completed successfully", "reqUID", req.UID(), "state", req.State())
if err = r.updateDrainStatus(ctx, nm, req); err != nil {
return res, err
}

r.DrainManager.RemoveRequest(req.UID())
} else {
// if nil, remove any pending requests for NodeMaintenance if present
r.DrainManager.RemoveRequest(drain.DrainRequestUIDFromNodeMaintenance(nm))
if nm.Status.Drain != nil {
// clear out drain status
nm.Status.Drain = nil
err = r.Client.Status().Update(ctx, nm)
if err != nil {
reqLog.Error(err, "failed to update drain status for NodeMaintenance object")
return res, err
}
}
}

// update condition and send event
err = k8sutils.SetReadyConditionReason(ctx, r.Client, nm, maintenancev1.ConditionReasonReady)
if err != nil {
@@ -353,6 +474,44 @@ func (r *NodeMaintenanceReconciler) handleWaitPodCompletionState(ctx context.Con
return res, nil
}

// updateDrainStatus updates NodeMaintenance drain status in place. It returns an error if one occurred.
func (r *NodeMaintenanceReconciler) updateDrainStatus(ctx context.Context, nm *maintenancev1.NodeMaintenance, drainReq drain.DrainRequest) error {
ds, err := drainReq.Status()
if err != nil {
return fmt.Errorf("failed to update drain status. %w", err)
}

if nm.Status.Drain == nil {
// set initial status
podsOnNode := &corev1.PodList{}
selectorFields := fields.OneTermEqualSelector("spec.nodeName", nm.Spec.NodeName)
err = r.Client.List(ctx, podsOnNode, &client.ListOptions{FieldSelector: selectorFields})
if err != nil {
return fmt.Errorf("failed to list pods. %w", err)
}

nm.Status.Drain = &maintenancev1.DrainStatus{
TotalPods: uint32(len(podsOnNode.Items)),
EvictionPods: uint32(len(ds.PodsToDelete)),
}
}

removedPods := utils.MaxInt(int(nm.Status.Drain.EvictionPods)-len(ds.PodsToDelete), 0)

nm.Status.Drain.DrainProgress = 100
if nm.Status.Drain.EvictionPods != 0 {
nm.Status.Drain.DrainProgress = uint32(float32(removedPods) / float32(nm.Status.Drain.EvictionPods) * 100)
}
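// Worked example (hypothetical numbers): if EvictionPods was 8 at drain
// start and ds.PodsToDelete still lists 2 pods, then removedPods =
// max(8-2, 0) = 6 and DrainProgress = uint32(float32(6)/float32(8)*100) = 75.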
nm.Status.Drain.WaitForEviction = ds.PodsToDelete

err = r.Client.Status().Update(ctx, nm)
if err != nil {
return fmt.Errorf("failed to update drain status for NodeMaintenance object. %w", err)
}

return nil
}

func (r *NodeMaintenanceReconciler) handleReadyState(ctx context.Context, reqLog logr.Logger, nm *maintenancev1.NodeMaintenance, node *corev1.Node) error {
reqLog.Info("Handle Ready NodeMaintenance")
// handle finalizers
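The `internal/drain` package is outside this page's diff, but its surface can be read off the call sites above. A hedged reconstruction for orientation; every signature and state name here is inferred from usage, and the UID scheme is an outright assumption:

```go
// Inferred sketch of internal/drain as consumed by the controller; not
// copied from the package itself.
package drain

import (
	maintenancev1 "github.com/Mellanox/maintenance-operator/api/v1alpha1"
)

type DrainState string

const (
	DrainStateInProgress DrainState = "InProgress"
	DrainStateSuccess    DrainState = "Success" // terminal success state (assumed name)
	DrainStateError      DrainState = "Error"
	DrainStateCanceled   DrainState = "Canceled"
)

// DrainStatus is what updateDrainStatus consumes: the pods left to evict.
type DrainStatus struct {
	State        DrainState
	PodsToDelete []string
}

// DrainSpec wraps the NodeMaintenance drain spec; handleDrainState compares
// req.Spec().Spec against nm.Spec.DrainSpec to detect spec changes.
type DrainSpec struct {
	Spec maintenancev1.DrainSpec
}

// DrainRequest is a single node-drain operation tracked by the Manager.
type DrainRequest interface {
	UID() string
	State() DrainState
	Spec() DrainSpec
	Status() (DrainStatus, error)
}

// Manager owns the set of in-flight drain requests.
type Manager interface {
	NewDrainRequest(nm *maintenancev1.NodeMaintenance) DrainRequest
	AddRequest(req DrainRequest) error
	GetRequest(uid string) DrainRequest // nil if absent
	RemoveRequest(uid string)
	ListRequests() []DrainRequest
}

// DrainRequestUIDFromNodeMaintenance derives the stable request UID for a
// NodeMaintenance; the namespace/name scheme below is illustrative only.
func DrainRequestUIDFromNodeMaintenance(nm *maintenancev1.NodeMaintenance) string {
	return nm.Namespace + "/" + nm.Name
}
```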