Skip to content

Commit

Permalink
init: update scan order, replacing pv scan by scanning pvcs
Browse files Browse the repository at this point in the history
  • Loading branch information
talasulin committed Sep 3, 2024
1 parent 13d71fb commit 17b6713
Showing 1 changed file with 39 additions and 43 deletions.
82 changes: 39 additions & 43 deletions internal/controller/pvc_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/utils/strings/slices"
"strconv"

"github.com/AppsFlyer/local-pvc-releaser/internal/exporters"
Expand All @@ -39,6 +38,7 @@ import (
const (
RemovingNode = "RemovingNode"
NodeControllerComponent = "node-controller"
PVCnodeAnnotationKey = "volume.kubernetes.io/selected-node"
)

// PVCReconciler reconciles a PersistentVolumeClaim object
Expand Down Expand Up @@ -68,34 +68,36 @@ func (r *PVCReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R

r.Logger.Info("node termination event found", "Message", nodeTerminationEvent.Message, "EventID", nodeTerminationEvent.UID, "EventTime", nodeTerminationEvent.LastTimestamp)

pvList := &v1.PersistentVolumeList{}
if err := r.List(ctx, pvList); err != nil {
return ctrl.Result{}, err
}
terminatedNodeName := nodeTerminationEvent.InvolvedObject.Name

allPvcList := &v1.PersistentVolumeClaimList{}
if err := r.List(ctx, allPvcList); err != nil {
pvcList := &v1.PersistentVolumeClaimList{}
if err := r.List(ctx, pvcList); err != nil {
return ctrl.Result{}, err
}

nodePvList := r.FilterPVListByNodeName(pvList, nodeTerminationEvent.InvolvedObject.Name)
// Filtering the related PVC objects bounded to the terminated node
nodePvcList := r.FilterPVCListByNodeName(pvcList, terminatedNodeName)

if len(nodePvList) == 0 {
r.Logger.Info(fmt.Sprintf("could not find any bounded pv objects for node - %s. will not take any action", nodeTerminationEvent.InvolvedObject.Name))
if len(nodePvcList) == 0 {
r.Logger.Info(fmt.Sprintf("could not find any bounded pvc objects for node - %s. will not take any action", terminatedNodeName))
return ctrl.Result{}, nil
}

relatedPvcList := make([]*v1.PersistentVolumeClaim, 0)
for _, pv := range nodePvList {
if pvc := r.FilterPVCListByPV(allPvcList, pv); pvc != nil {
r.Logger.Info(fmt.Sprintf("pvc - %s is bounded to pv - %s and marked for deletion", pvc.Name, pv.Name))
relatedPvcList = append(relatedPvcList, pvc)
continue
pvcListPendingDeletion := make([]*v1.PersistentVolumeClaim, 0)
for _, nodePvc := range nodePvcList {

err, isLocal := r.CheckLocalPvStoragePluginByPVC(ctx, nodePvc)
if err != nil {
return ctrl.Result{}, err
}

if isLocal {
r.Logger.Info(fmt.Sprintf("pvc - %s is bounded to a pv with local storage on the terminated node and will be marked for deletion", nodePvc.Name))
pvcListPendingDeletion = append(pvcListPendingDeletion, nodePvc)
}
r.Logger.Info(fmt.Sprintf("could not find the pvc object for pv - %s. pvc handling will be skipped", pv.Name))
}

if err := r.CleanPVCS(ctx, relatedPvcList); err != nil {
if err := r.CleanPVCS(ctx, pvcListPendingDeletion); err != nil {
r.Logger.Error(err, "failed to delete pvc objects from kubernetes")
}

Expand Down Expand Up @@ -124,41 +126,35 @@ func (r *PVCReconciler) CleanPVCS(ctx context.Context, pvcs []*v1.PersistentVolu
return nil
}

func (r *PVCReconciler) FilterPVListByNodeName(pvList *v1.PersistentVolumeList, nodeName string) []*v1.PersistentVolume {
var relatedPVs []*v1.PersistentVolume

for i := 0; i < len(pvList.Items); i++ {
pv := &pvList.Items[i]
// Ignoring PVs without affinity rules or PVs that already got released
if pv.Spec.NodeAffinity != nil && pv.Spec.NodeAffinity.Required == nil || pv.Status.Phase != v1.VolumeBound {
continue
}
func (r *PVCReconciler) FilterPVCListByNodeName(pvcList *v1.PersistentVolumeClaimList, nodeName string) []*v1.PersistentVolumeClaim {
var relatedPVCs []*v1.PersistentVolumeClaim

for _, nst := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms {
for _, matchEx := range nst.MatchExpressions {
if slices.Contains(matchEx.Values, nodeName) {
r.Logger.Info(fmt.Sprintf("pv - %s is bounded to node - %s. will be marked for pvc cleanup", pv.Name, nodeName))
relatedPVs = append(relatedPVs, pv)
for i := 0; i < len(pvcList.Items); i++ {
pvc := &pvcList.Items[i]

break
}
}
if pvc.Annotations[PVCnodeAnnotationKey] == nodeName {
r.Logger.Info(fmt.Sprintf("pvc - %s is bounded to node - %s. will be marked for pv 'local' plugin scan.", pvc.Name, nodeName))
relatedPVCs = append(relatedPVCs, pvc)
}
}

return relatedPVs
return relatedPVCs
}

func (r *PVCReconciler) FilterPVCListByPV(pvcList *v1.PersistentVolumeClaimList, pv *v1.PersistentVolume) *v1.PersistentVolumeClaim {
for i := 0; i < len(pvcList.Items); i++ {
claim := &pvcList.Items[i]
func (r *PVCReconciler) CheckLocalPvStoragePluginByPVC(ctx context.Context, pvc *v1.PersistentVolumeClaim) (error, bool) {
pv := &v1.PersistentVolume{}
pvKey := client.ObjectKey{Name: pvc.Spec.VolumeName}

if claim.Spec.VolumeName == pv.Name {
return claim
}
if err := r.Get(ctx, pvKey, pv); err != nil {
r.Logger.Error(err, fmt.Sprintf("could not find the attached pv object - %s", pvc.Spec.VolumeName))
return err, false
}

return nil
if pv.Spec.Local != nil {
return nil, true
}

return nil, false
}

// SetupWithManager sets up the controller with the Manager.
Expand Down

0 comments on commit 17b6713

Please sign in to comment.