diff --git a/internal/controller/pvc_controller.go b/internal/controller/pvc_controller.go index 5f3f8c2..e42c58d 100644 --- a/internal/controller/pvc_controller.go +++ b/internal/controller/pvc_controller.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "github.com/prometheus/client_golang/prometheus" - "k8s.io/utils/strings/slices" "strconv" "github.com/AppsFlyer/local-pvc-releaser/internal/exporters" @@ -39,6 +38,7 @@ import ( const ( RemovingNode = "RemovingNode" NodeControllerComponent = "node-controller" + PVCnodeAnnotationKey = "volume.kubernetes.io/selected-node" ) // PVCReconciler reconciles a PersistentVolumeClaim object @@ -68,34 +68,36 @@ func (r *PVCReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R r.Logger.Info("node termination event found", "Message", nodeTerminationEvent.Message, "EventID", nodeTerminationEvent.UID, "EventTime", nodeTerminationEvent.LastTimestamp) - pvList := &v1.PersistentVolumeList{} - if err := r.List(ctx, pvList); err != nil { - return ctrl.Result{}, err - } + terminatedNodeName := nodeTerminationEvent.InvolvedObject.Name - allPvcList := &v1.PersistentVolumeClaimList{} - if err := r.List(ctx, allPvcList); err != nil { + pvcList := &v1.PersistentVolumeClaimList{} + if err := r.List(ctx, pvcList); err != nil { return ctrl.Result{}, err } - nodePvList := r.FilterPVListByNodeName(pvList, nodeTerminationEvent.InvolvedObject.Name) + // Filtering the related PVC objects bounded to the terminated node + nodePvcList := r.FilterPVCListByNodeName(pvcList, terminatedNodeName) - if len(nodePvList) == 0 { - r.Logger.Info(fmt.Sprintf("could not find any bounded pv objects for node - %s. will not take any action", nodeTerminationEvent.InvolvedObject.Name)) + if len(nodePvcList) == 0 { + r.Logger.Info(fmt.Sprintf("could not find any bounded pvc objects for node - %s. will not take any action", terminatedNodeName)) return ctrl.Result{}, nil } - relatedPvcList := make([]*v1.PersistentVolumeClaim, 0) - for _, pv := range nodePvList { - if pvc := r.FilterPVCListByPV(allPvcList, pv); pvc != nil { - r.Logger.Info(fmt.Sprintf("pvc - %s is bounded to pv - %s and marked for deletion", pvc.Name, pv.Name)) - relatedPvcList = append(relatedPvcList, pvc) - continue + pvcListPendingDeletion := make([]*v1.PersistentVolumeClaim, 0) + for _, nodePvc := range nodePvcList { + + err, isLocal := r.CheckLocalPvStoragePluginByPVC(ctx, nodePvc) + if err != nil { + return ctrl.Result{}, err + } + + if isLocal { + r.Logger.Info(fmt.Sprintf("pvc - %s is bounded to a pv with local storage on the terminated node and will be marked for deletion", nodePvc.Name)) + pvcListPendingDeletion = append(pvcListPendingDeletion, nodePvc) } - r.Logger.Info(fmt.Sprintf("could not find the pvc object for pv - %s. pvc handling will be skipped", pv.Name)) } - if err := r.CleanPVCS(ctx, relatedPvcList); err != nil { + if err := r.CleanPVCS(ctx, pvcListPendingDeletion); err != nil { r.Logger.Error(err, "failed to delete pvc objects from kubernetes") } @@ -124,41 +126,35 @@ func (r *PVCReconciler) CleanPVCS(ctx context.Context, pvcs []*v1.PersistentVolu return nil } -func (r *PVCReconciler) FilterPVListByNodeName(pvList *v1.PersistentVolumeList, nodeName string) []*v1.PersistentVolume { - var relatedPVs []*v1.PersistentVolume - - for i := 0; i < len(pvList.Items); i++ { - pv := &pvList.Items[i] - // Ignoring PVs without affinity rules or PVs that already got released - if pv.Spec.NodeAffinity != nil && pv.Spec.NodeAffinity.Required == nil || pv.Status.Phase != v1.VolumeBound { - continue - } +func (r *PVCReconciler) FilterPVCListByNodeName(pvcList *v1.PersistentVolumeClaimList, nodeName string) []*v1.PersistentVolumeClaim { + var relatedPVCs []*v1.PersistentVolumeClaim - for _, nst := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms { - for _, matchEx := range nst.MatchExpressions { - if slices.Contains(matchEx.Values, nodeName) { - r.Logger.Info(fmt.Sprintf("pv - %s is bounded to node - %s. will be marked for pvc cleanup", pv.Name, nodeName)) - relatedPVs = append(relatedPVs, pv) + for i := 0; i < len(pvcList.Items); i++ { + pvc := &pvcList.Items[i] - break - } - } + if pvcNode, exists := pvc.Annotations[PVCnodeAnnotationKey]; exists && pvcNode == nodeName { + r.Logger.Info(fmt.Sprintf("pvc - %s is bounded to node - %s. will be marked for pv 'local' plugin scan.", pvc.Name, nodeName)) + relatedPVCs = append(relatedPVCs, pvc) } } - return relatedPVs + return relatedPVCs } -func (r *PVCReconciler) FilterPVCListByPV(pvcList *v1.PersistentVolumeClaimList, pv *v1.PersistentVolume) *v1.PersistentVolumeClaim { - for i := 0; i < len(pvcList.Items); i++ { - claim := &pvcList.Items[i] +func (r *PVCReconciler) CheckLocalPvStoragePluginByPVC(ctx context.Context, pvc *v1.PersistentVolumeClaim) (error, bool) { + pv := &v1.PersistentVolume{} + pvKey := client.ObjectKey{Name: pvc.Spec.VolumeName} - if claim.Spec.VolumeName == pv.Name { - return claim - } + if err := r.Get(ctx, pvKey, pv); err != nil { + r.Logger.Error(err, fmt.Sprintf("could not find the attached pv object - %s", pvc.Spec.VolumeName)) + return err, false } - return nil + if pv.Spec.Local != nil { + return nil, true + } + + return nil, false } // SetupWithManager sets up the controller with the Manager. diff --git a/test/integration_test.go b/test/integration_test.go index bd2a2c5..555b6e2 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -50,9 +50,10 @@ var _ = Describe("Successful PVC Release", func() { }, timeout, interval).Should(Succeed()) pvcAnnotations := map[string]string{ - "appsflyer.com/local-pvc-releaser": "enabled", + "appsflyer.com/local-pvc-releaser": "enabled", + "volume.kubernetes.io/selected-node": nodeName, } - pvc := objects.Helper().PersistentVolumeClaim().Create(pvcName, storageClassName, pvcAnnotations) + pvc := objects.Helper().PersistentVolumeClaim().Create(pvcName, pvName, storageClassName, pvcAnnotations) Expect(k8sClient.Create(ctx, pvc)).Should(Succeed()) // Create a PersistentVolumeClaim (PVC) @@ -112,28 +113,14 @@ var _ = Describe("Ignoring PVC cleanup", func() { objects.Helper().Event().DeleteAll(ctx, k8sClient) }) Context("When Receiving event on node-termination", func() { - It("Should ignore as the PV is not bounded to PVC", func() { - By("By Creating a PV and PVC objects with different storage class and triggering node termination event") - pv := objects.Helper().PersistentVolume().Create(pvName, nodeName, storageClassName) - Expect(k8sClient.Create(ctx, pv)).Should(Succeed()) - - fetchedPv := &v1.PersistentVolume{} - Eventually(func() error { - if err := k8sClient.Get(ctx, types.NamespacedName{Name: pv.Name, Namespace: pv.Namespace}, fetchedPv); err != nil { - return err - } - - if fetchedPv.Name != pv.Name { - return errors.New("Unable to find the PV after creation, creation failed") - } - - return nil - }, timeout, interval).Should(Succeed()) + It("Should ignore as there are no PVs attached", func() { + By("By Creating only a PVC object") pvcAnnotations := map[string]string{ - "appsflyer.com/local-pvc-releaser": "enabled", + "appsflyer.com/local-pvc-releaser": "enabled", + "volume.kubernetes.io/selected-node": nodeName, } - pvc := objects.Helper().PersistentVolumeClaim().Create(pvcName, "different-storage-class", pvcAnnotations) + pvc := objects.Helper().PersistentVolumeClaim().Create(pvcName, "", "different-storage-class", pvcAnnotations) Expect(k8sClient.Create(ctx, pvc)).Should(Succeed()) // Create a PersistentVolumeClaim (PVC) @@ -210,9 +197,10 @@ var _ = Describe("Ignoring PVC cleanup", func() { }, timeout, interval).Should(Succeed()) pvcAnnotations := map[string]string{ - "test-should-fail": "true", + "test-should-fail": "true", + "volume.kubernetes.io/selected-node": nodeName, } - pvc := objects.Helper().PersistentVolumeClaim().Create(pvcName, "different-storage-class", pvcAnnotations) + pvc := objects.Helper().PersistentVolumeClaim().Create(pvcName, pvName, "different-storage-class", pvcAnnotations) Expect(k8sClient.Create(ctx, pvc)).Should(Succeed()) // Create a PersistentVolumeClaim (PVC) diff --git a/test/objects/PersistentVolumeClaim.go b/test/objects/PersistentVolumeClaim.go index 2917196..4e3b89b 100644 --- a/test/objects/PersistentVolumeClaim.go +++ b/test/objects/PersistentVolumeClaim.go @@ -12,7 +12,7 @@ import ( ) type PVC interface { - Create(name, storageClassName string, annotations map[string]string) *corev1.PersistentVolumeClaim + Create(name, pvName string, storageClassName string, annotations map[string]string) *corev1.PersistentVolumeClaim DeleteAll(ctx context.Context, client client.Client) RemoveProtectionFinalizer(ctx context.Context, client client.Client, pvc *corev1.PersistentVolumeClaim, finalizerName string) error } @@ -24,7 +24,7 @@ func NewPVC() PVC { return &persistentVolumeClaim{} } -func (_ persistentVolumeClaim) Create(name, storageClassName string, annotations map[string]string) *corev1.PersistentVolumeClaim { +func (_ persistentVolumeClaim) Create(name, pvName string, storageClassName string, annotations map[string]string) *corev1.PersistentVolumeClaim { pvc := &corev1.PersistentVolumeClaim{ TypeMeta: metav1.TypeMeta{ Kind: "PersistentVolumeClaim", @@ -46,6 +46,7 @@ func (_ persistentVolumeClaim) Create(name, storageClassName string, annotations }, }, StorageClassName: &storageClassName, + VolumeName: pvName, }, }