Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug Fix - Replacing scan order from PV to PVC while counting on K8s native node annotations #52

Merged
merged 3 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 39 additions & 43 deletions internal/controller/pvc_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/utils/strings/slices"
"strconv"

"github.com/AppsFlyer/local-pvc-releaser/internal/exporters"
Expand All @@ -39,6 +38,7 @@ import (
const (
RemovingNode = "RemovingNode"
NodeControllerComponent = "node-controller"
PVCnodeAnnotationKey = "volume.kubernetes.io/selected-node"
)

// PVCReconciler reconciles a PersistentVolumeClaim object
Expand Down Expand Up @@ -68,34 +68,36 @@ func (r *PVCReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R

r.Logger.Info("node termination event found", "Message", nodeTerminationEvent.Message, "EventID", nodeTerminationEvent.UID, "EventTime", nodeTerminationEvent.LastTimestamp)

pvList := &v1.PersistentVolumeList{}
if err := r.List(ctx, pvList); err != nil {
return ctrl.Result{}, err
}
terminatedNodeName := nodeTerminationEvent.InvolvedObject.Name

allPvcList := &v1.PersistentVolumeClaimList{}
if err := r.List(ctx, allPvcList); err != nil {
pvcList := &v1.PersistentVolumeClaimList{}
if err := r.List(ctx, pvcList); err != nil {
return ctrl.Result{}, err
}

nodePvList := r.FilterPVListByNodeName(pvList, nodeTerminationEvent.InvolvedObject.Name)
// Filtering the related PVC objects bounded to the terminated node
nodePvcList := r.FilterPVCListByNodeName(pvcList, terminatedNodeName)

if len(nodePvList) == 0 {
r.Logger.Info(fmt.Sprintf("could not find any bounded pv objects for node - %s. will not take any action", nodeTerminationEvent.InvolvedObject.Name))
if len(nodePvcList) == 0 {
r.Logger.Info(fmt.Sprintf("could not find any bounded pvc objects for node - %s. will not take any action", terminatedNodeName))
return ctrl.Result{}, nil
}

relatedPvcList := make([]*v1.PersistentVolumeClaim, 0)
for _, pv := range nodePvList {
if pvc := r.FilterPVCListByPV(allPvcList, pv); pvc != nil {
r.Logger.Info(fmt.Sprintf("pvc - %s is bounded to pv - %s and marked for deletion", pvc.Name, pv.Name))
relatedPvcList = append(relatedPvcList, pvc)
continue
pvcListPendingDeletion := make([]*v1.PersistentVolumeClaim, 0)
for _, nodePvc := range nodePvcList {

err, isLocal := r.CheckLocalPvStoragePluginByPVC(ctx, nodePvc)
if err != nil {
return ctrl.Result{}, err
}

if isLocal {
r.Logger.Info(fmt.Sprintf("pvc - %s is bounded to a pv with local storage on the terminated node and will be marked for deletion", nodePvc.Name))
pvcListPendingDeletion = append(pvcListPendingDeletion, nodePvc)
}
r.Logger.Info(fmt.Sprintf("could not find the pvc object for pv - %s. pvc handling will be skipped", pv.Name))
}

if err := r.CleanPVCS(ctx, relatedPvcList); err != nil {
if err := r.CleanPVCS(ctx, pvcListPendingDeletion); err != nil {
r.Logger.Error(err, "failed to delete pvc objects from kubernetes")
}

Expand Down Expand Up @@ -124,41 +126,35 @@ func (r *PVCReconciler) CleanPVCS(ctx context.Context, pvcs []*v1.PersistentVolu
return nil
}

func (r *PVCReconciler) FilterPVListByNodeName(pvList *v1.PersistentVolumeList, nodeName string) []*v1.PersistentVolume {
var relatedPVs []*v1.PersistentVolume

for i := 0; i < len(pvList.Items); i++ {
pv := &pvList.Items[i]
// Ignoring PVs without affinity rules or PVs that already got released
if pv.Spec.NodeAffinity != nil && pv.Spec.NodeAffinity.Required == nil || pv.Status.Phase != v1.VolumeBound {
continue
}
func (r *PVCReconciler) FilterPVCListByNodeName(pvcList *v1.PersistentVolumeClaimList, nodeName string) []*v1.PersistentVolumeClaim {
var relatedPVCs []*v1.PersistentVolumeClaim

for _, nst := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms {
for _, matchEx := range nst.MatchExpressions {
if slices.Contains(matchEx.Values, nodeName) {
r.Logger.Info(fmt.Sprintf("pv - %s is bounded to node - %s. will be marked for pvc cleanup", pv.Name, nodeName))
relatedPVs = append(relatedPVs, pv)
for i := 0; i < len(pvcList.Items); i++ {
pvc := &pvcList.Items[i]

break
}
}
if pvcNode, exists := pvc.Annotations[PVCnodeAnnotationKey]; exists && pvcNode == nodeName {
r.Logger.Info(fmt.Sprintf("pvc - %s is bounded to node - %s. will be marked for pv 'local' plugin scan.", pvc.Name, nodeName))
relatedPVCs = append(relatedPVCs, pvc)
}
}

return relatedPVs
return relatedPVCs
}

func (r *PVCReconciler) FilterPVCListByPV(pvcList *v1.PersistentVolumeClaimList, pv *v1.PersistentVolume) *v1.PersistentVolumeClaim {
for i := 0; i < len(pvcList.Items); i++ {
claim := &pvcList.Items[i]
func (r *PVCReconciler) CheckLocalPvStoragePluginByPVC(ctx context.Context, pvc *v1.PersistentVolumeClaim) (error, bool) {
pv := &v1.PersistentVolume{}
pvKey := client.ObjectKey{Name: pvc.Spec.VolumeName}

if claim.Spec.VolumeName == pv.Name {
return claim
}
if err := r.Get(ctx, pvKey, pv); err != nil {
r.Logger.Error(err, fmt.Sprintf("could not find the attached pv object - %s", pvc.Spec.VolumeName))
return err, false
}

return nil
if pv.Spec.Local != nil {
return nil, true
}

return nil, false
}

// SetupWithManager sets up the controller with the Manager.
Expand Down
34 changes: 11 additions & 23 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ var _ = Describe("Successful PVC Release", func() {
}, timeout, interval).Should(Succeed())

pvcAnnotations := map[string]string{
"appsflyer.com/local-pvc-releaser": "enabled",
"appsflyer.com/local-pvc-releaser": "enabled",
"volume.kubernetes.io/selected-node": nodeName,
}
pvc := objects.Helper().PersistentVolumeClaim().Create(pvcName, storageClassName, pvcAnnotations)
pvc := objects.Helper().PersistentVolumeClaim().Create(pvcName, pvName, storageClassName, pvcAnnotations)
Expect(k8sClient.Create(ctx, pvc)).Should(Succeed())

// Create a PersistentVolumeClaim (PVC)
Expand Down Expand Up @@ -112,28 +113,14 @@ var _ = Describe("Ignoring PVC cleanup", func() {
objects.Helper().Event().DeleteAll(ctx, k8sClient)
})
Context("When Receiving event on node-termination", func() {
It("Should ignore as the PV is not bounded to PVC", func() {
By("By Creating a PV and PVC objects with different storage class and triggering node termination event")
pv := objects.Helper().PersistentVolume().Create(pvName, nodeName, storageClassName)
Expect(k8sClient.Create(ctx, pv)).Should(Succeed())

fetchedPv := &v1.PersistentVolume{}
Eventually(func() error {
if err := k8sClient.Get(ctx, types.NamespacedName{Name: pv.Name, Namespace: pv.Namespace}, fetchedPv); err != nil {
return err
}

if fetchedPv.Name != pv.Name {
return errors.New("Unable to find the PV after creation, creation failed")
}

return nil
}, timeout, interval).Should(Succeed())
It("Should ignore as there are no PVs attached", func() {
By("By Creating only a PVC object")

pvcAnnotations := map[string]string{
"appsflyer.com/local-pvc-releaser": "enabled",
"appsflyer.com/local-pvc-releaser": "enabled",
"volume.kubernetes.io/selected-node": nodeName,
}
pvc := objects.Helper().PersistentVolumeClaim().Create(pvcName, "different-storage-class", pvcAnnotations)
pvc := objects.Helper().PersistentVolumeClaim().Create(pvcName, "", "different-storage-class", pvcAnnotations)
Expect(k8sClient.Create(ctx, pvc)).Should(Succeed())

// Create a PersistentVolumeClaim (PVC)
Expand Down Expand Up @@ -210,9 +197,10 @@ var _ = Describe("Ignoring PVC cleanup", func() {
}, timeout, interval).Should(Succeed())

pvcAnnotations := map[string]string{
"test-should-fail": "true",
"test-should-fail": "true",
"volume.kubernetes.io/selected-node": nodeName,
}
pvc := objects.Helper().PersistentVolumeClaim().Create(pvcName, "different-storage-class", pvcAnnotations)
pvc := objects.Helper().PersistentVolumeClaim().Create(pvcName, pvName, "different-storage-class", pvcAnnotations)
Expect(k8sClient.Create(ctx, pvc)).Should(Succeed())

// Create a PersistentVolumeClaim (PVC)
Expand Down
5 changes: 3 additions & 2 deletions test/objects/PersistentVolumeClaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

type PVC interface {
Create(name, storageClassName string, annotations map[string]string) *corev1.PersistentVolumeClaim
Create(name, pvName string, storageClassName string, annotations map[string]string) *corev1.PersistentVolumeClaim
DeleteAll(ctx context.Context, client client.Client)
RemoveProtectionFinalizer(ctx context.Context, client client.Client, pvc *corev1.PersistentVolumeClaim, finalizerName string) error
}
Expand All @@ -24,7 +24,7 @@ func NewPVC() PVC {
return &persistentVolumeClaim{}
}

func (_ persistentVolumeClaim) Create(name, storageClassName string, annotations map[string]string) *corev1.PersistentVolumeClaim {
func (_ persistentVolumeClaim) Create(name, pvName string, storageClassName string, annotations map[string]string) *corev1.PersistentVolumeClaim {
pvc := &corev1.PersistentVolumeClaim{
TypeMeta: metav1.TypeMeta{
Kind: "PersistentVolumeClaim",
Expand All @@ -46,6 +46,7 @@ func (_ persistentVolumeClaim) Create(name, storageClassName string, annotations
},
},
StorageClassName: &storageClassName,
VolumeName: pvName,
},
}

Expand Down
Loading