From 350de932bf8410a65f990f53aaae693dcdb25538 Mon Sep 17 00:00:00 2001 From: Drew Sirenko <68304519+AndrewSirenko@users.noreply.github.com> Date: Tue, 7 Jan 2025 21:01:19 +0000 Subject: [PATCH] Ensure infeasible PVC modifications are retried at slower pace --- cmd/csi-resizer/main.go | 2 +- pkg/csi/mock_client.go | 10 +-- pkg/modifycontroller/controller.go | 10 ++- pkg/modifycontroller/controller_test.go | 93 +++++++++++++++++++++- pkg/modifycontroller/modify_status.go | 28 +++---- pkg/modifycontroller/modify_status_test.go | 13 +-- pkg/modifycontroller/modify_volume.go | 93 ++++++++++++++-------- 7 files changed, 186 insertions(+), 63 deletions(-) diff --git a/cmd/csi-resizer/main.go b/cmd/csi-resizer/main.go index 6e0b7bc57..9c19a69e0 100644 --- a/cmd/csi-resizer/main.go +++ b/cmd/csi-resizer/main.go @@ -222,7 +222,7 @@ func main() { var mc modifycontroller.ModifyController // Add modify controller only if the feature gate is enabled if utilfeature.DefaultFeatureGate.Enabled(features.VolumeAttributesClass) { - mc = modifycontroller.NewModifyController(modifierName, csiModifier, kubeClient, *resyncPeriod, *extraModifyMetadata, informerFactory, + mc = modifycontroller.NewModifyController(modifierName, csiModifier, kubeClient, *resyncPeriod, *retryIntervalMax, *extraModifyMetadata, informerFactory, workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax)) } diff --git a/pkg/csi/mock_client.go b/pkg/csi/mock_client.go index c8644b83d..49c93c3e2 100644 --- a/pkg/csi/mock_client.go +++ b/pkg/csi/mock_client.go @@ -39,7 +39,7 @@ type MockClient struct { expandCalled atomic.Int32 modifyCalled atomic.Int32 expansionError error - modifyFailed bool + modifyError error checkMigratedLabel bool usedSecrets atomic.Pointer[map[string]string] usedCapability atomic.Pointer[csi.VolumeCapability] @@ -74,8 +74,8 @@ func (c *MockClient) SetExpansionError(err error) { c.expansionError = err } -func (c *MockClient) SetModifyFailed() { - c.modifyFailed = true +func (c *MockClient) SetModifyError(err error) { + c.modifyError = err } func (c *MockClient) SetCheckMigratedLabel() { @@ -135,8 +135,8 @@ func (c *MockClient) Modify( secrets map[string]string, mutableParameters map[string]string) error { c.modifyCalled.Add(1) - if c.modifyFailed { - return fmt.Errorf("modify failed") + if c.modifyError != nil { + return c.modifyError } return nil } diff --git a/pkg/modifycontroller/controller.go b/pkg/modifycontroller/controller.go index eb8094906..976a6e0dd 100644 --- a/pkg/modifycontroller/controller.go +++ b/pkg/modifycontroller/controller.go @@ -61,6 +61,8 @@ type modifyController struct { extraModifyMetadata bool // the key of the map is {PVC_NAMESPACE}/{PVC_NAME} uncertainPVCs map[string]v1.PersistentVolumeClaim + // slowSet tracks PVCs for which modification failed with infeasible error and should be retried at slower rate. + slowSet *util.SlowSet } // NewModifyController returns a ModifyController. @@ -69,6 +71,7 @@ func NewModifyController( modifier modifier.Modifier, kubeClient kubernetes.Interface, resyncPeriod time.Duration, + maxRetryInterval time.Duration, extraModifyMetadata bool, informerFactory informers.SharedInformerFactory, pvcRateLimiter workqueue.TypedRateLimiter[string]) ModifyController { @@ -99,8 +102,9 @@ func NewModifyController( claimQueue: claimQueue, eventRecorder: eventRecorder, extraModifyMetadata: extraModifyMetadata, + slowSet: util.NewSlowSet(maxRetryInterval), } - // Add a resync period as the PVC's request modify can be modified again when we handling + // Add a resync period as the PVC's request modify can be modified again when we are handling // a previous modify request of the same PVC. pvcInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ AddFunc: ctrl.addPVC, @@ -211,6 +215,10 @@ func (ctrl *modifyController) Run( } stopCh := ctx.Done() + + // Starts go-routine that deletes expired slowSet entries. + go ctrl.slowSet.Run(stopCh) + for i := 0; i < workers; i++ { go wait.Until(ctrl.sync, 0, stopCh) } diff --git a/pkg/modifycontroller/controller_test.go b/pkg/modifycontroller/controller_test.go index 7bd7571ab..895aa9ac2 100644 --- a/pkg/modifycontroller/controller_test.go +++ b/pkg/modifycontroller/controller_test.go @@ -2,6 +2,12 @@ package modifycontroller import ( "context" + "errors" + "fmt" + "github.com/kubernetes-csi/external-resizer/pkg/util" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/client-go/tools/cache" "testing" "time" @@ -115,7 +121,7 @@ func TestModifyPVC(t *testing.T) { t.Run(test.name, func(t *testing.T) { client := csi.NewMockClient(testDriverName, true, true, true, true, true, false) if test.modifyFailure { - client.SetModifyFailed() + client.SetModifyError(fmt.Errorf("fake modification error")) } initialObjects := []runtime.Object{test.pvc, test.pv, testVacObject, targetVacObject} @@ -213,6 +219,89 @@ func TestSyncPVC(t *testing.T) { } } +// TestInfeasibleRetry tests that sidecar doesn't spam plugin upon infeasible error code (e.g. invalid VAC parameter) +func TestInfeasibleRetry(t *testing.T) { + basePVC := createTestPVC(pvcName, targetVac /*vacName*/, testVac /*curVacName*/, testVac /*targetVacName*/) + basePV := createTestPV(1, pvcName, pvcNamespace, "foobaz" /*pvcUID*/, &fsVolumeMode, testVac) + + tests := []struct { + name string + pvc *v1.PersistentVolumeClaim + expectedModifyCallCount int + csiModifyError error + eventuallyRemoveFromSlowSet bool + }{ + { + name: "Should retry non-infeasible error normally", + pvc: basePVC, + expectedModifyCallCount: 2, + csiModifyError: status.Errorf(codes.Internal, "fake non-infeasible error"), + eventuallyRemoveFromSlowSet: false, + }, + { + name: "Should NOT retry infeasible error normally", + pvc: basePVC, + expectedModifyCallCount: 1, + csiModifyError: status.Errorf(codes.InvalidArgument, "fake infeasible error"), + eventuallyRemoveFromSlowSet: false, + }, + { + name: "Should EVENTUALLY retry infeasible error", + pvc: basePVC, + expectedModifyCallCount: 2, + csiModifyError: status.Errorf(codes.InvalidArgument, "fake infeasible error"), + eventuallyRemoveFromSlowSet: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // Setup + client := csi.NewMockClient(testDriverName, true, true, true, true, true, false) + if test.csiModifyError != nil { + client.SetModifyError(test.csiModifyError) + } + + initialObjects := []runtime.Object{test.pvc, basePV, testVacObject, targetVacObject} + ctrlInstance, ctx := setupFakeK8sEnvironment(t, client, initialObjects) + defer ctx.Done() + + // Attempt modification first time + err := ctrlInstance.syncPVC(pvcNamespace + "/" + pvcName) + if !errors.Is(err, test.csiModifyError) { + t.Errorf("for %s, unexpected first syncPVC error: %v", test.name, err) + } + + // Fake time passing by removing from SlowSet + if test.eventuallyRemoveFromSlowSet { + pvcKey, _ := cache.MetaNamespaceKeyFunc(test.pvc) + ctrlInstance.slowSet.Remove(pvcKey) + } + + // Attempt modification second time + err2 := ctrlInstance.syncPVC(pvcNamespace + "/" + pvcName) + switch test.expectedModifyCallCount { + case 1: + if !util.IsDelayRetryError(err2) { + t.Errorf("for %s, unexpected second syncPVC error: %v", test.name, err) + } + case 2: + if !errors.Is(err2, test.csiModifyError) { + t.Errorf("for %s, unexpected second syncPVC error: %v", test.name, err) + } + default: + t.Errorf("for %s, unexpected second syncPVC error: %v", test.name, err) + } + + // Confirm CSI ModifyVolume was called desired amount of times + modifyCallCount := client.GetModifyCount() + if test.expectedModifyCallCount != modifyCallCount { + t.Fatalf("for %s: expected %d csi modify calls, but got %d", test.name, test.expectedModifyCallCount, modifyCallCount) + } + }) + } +} + // setupFakeK8sEnvironment creates fake K8s environment and starts Informers and ModifyController func setupFakeK8sEnvironment(t *testing.T, client *csi.MockClient, initialObjects []runtime.Object) (*modifyController, context.Context) { t.Helper() @@ -234,7 +323,7 @@ func setupFakeK8sEnvironment(t *testing.T, client *csi.MockClient, initialObject controller := NewModifyController(driverName, csiModifier, kubeClient, - 0, false, informerFactory, + 0 /* resyncPeriod */, 2*time.Minute, false, informerFactory, workqueue.DefaultTypedControllerRateLimiter[string]()) /* Start informers and ModifyController*/ diff --git a/pkg/modifycontroller/modify_status.go b/pkg/modifycontroller/modify_status.go index d9365373d..4a12d034a 100644 --- a/pkg/modifycontroller/modify_status.go +++ b/pkg/modifycontroller/modify_status.go @@ -18,7 +18,6 @@ package modifycontroller import ( "fmt" - "github.com/kubernetes-csi/external-resizer/pkg/util" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -57,13 +56,19 @@ func (ctrl *modifyController) markControllerModifyVolumeStatus( []v1.PersistentVolumeClaimCondition{pvcCondition}) } - updatedPVC, err := util.PatchClaim(ctrl.kubeClient, pvc, newPVC, true) + updatedPVC, err := util.PatchClaim(ctrl.kubeClient, pvc, newPVC, true /* addResourceVersionCheck */) if err != nil { return pvc, fmt.Errorf("mark PVC %q as modify volume failed, errored with: %v", pvc.Name, err) } // Remove this PVC from the uncertain cache since the status is known now if modifyVolumeStatus == v1.PersistentVolumeClaimModifyVolumeInfeasible { - ctrl.removePVCFromModifyVolumeUncertainCache(pvc) + pvcKey, err := cache.MetaNamespaceKeyFunc(pvc) + if err != nil { + return pvc, err + } + + ctrl.removePVCFromModifyVolumeUncertainCache(pvcKey) + ctrl.markForSlowRetry(pvc, pvcKey) } return updatedPVC, nil } @@ -84,7 +89,7 @@ func (ctrl *modifyController) updateConditionBasedOnError(pvc *v1.PersistentVolu newPVC.Status.Conditions = util.MergeModifyVolumeConditionsOfPVC(newPVC.Status.Conditions, []v1.PersistentVolumeClaimCondition{pvcCondition}) - updatedPVC, err := util.PatchClaim(ctrl.kubeClient, pvc, newPVC, false) + updatedPVC, err := util.PatchClaim(ctrl.kubeClient, pvc, newPVC, false /* addResourceVersionCheck */) if err != nil { return pvc, fmt.Errorf("mark PVC %q as controller expansion failed, errored with: %v", pvc.Name, err) } @@ -117,7 +122,7 @@ func (ctrl *modifyController) markControllerModifyVolumeCompleted(pvc *v1.Persis if err != nil { return pvc, pv, fmt.Errorf("update pv.Spec.VolumeAttributesClassName for PVC %q failed, errored with: %v", pvc.Name, err) } - updatedPVC, err := util.PatchClaim(ctrl.kubeClient, pvc, newPVC, false) + updatedPVC, err := util.PatchClaim(ctrl.kubeClient, pvc, newPVC, false /* addResourceVersionCheck */) if err != nil { return pvc, pv, fmt.Errorf("mark PVC %q as ModifyVolumeCompleted failed, errored with: %v", pvc.Name, err) @@ -140,22 +145,13 @@ func clearModifyVolumeConditions(conditions []v1.PersistentVolumeClaimCondition) } // removePVCFromModifyVolumeUncertainCache removes the pvc from the uncertain cache -func (ctrl *modifyController) removePVCFromModifyVolumeUncertainCache(pvc *v1.PersistentVolumeClaim) error { +func (ctrl *modifyController) removePVCFromModifyVolumeUncertainCache(pvcKey string) { if ctrl.uncertainPVCs == nil { - return nil + return } // Format of the key of the uncertainPVCs is NAMESPACE/NAME of the pvc - pvcKey, err := cache.MetaNamespaceKeyFunc(pvc) - if err != nil { - return err - } _, ok := ctrl.uncertainPVCs[pvcKey] if ok { delete(ctrl.uncertainPVCs, pvcKey) } - - if err != nil { - return err - } - return nil } diff --git a/pkg/modifycontroller/modify_status_test.go b/pkg/modifycontroller/modify_status_test.go index d6706a763..932245a65 100644 --- a/pkg/modifycontroller/modify_status_test.go +++ b/pkg/modifycontroller/modify_status_test.go @@ -120,7 +120,7 @@ func TestMarkControllerModifyVolumeStatus(t *testing.T) { } controller := NewModifyController(driverName, csiModifier, kubeClient, - time.Second, false, informerFactory, + time.Second, 2*time.Minute, false, informerFactory, workqueue.DefaultTypedControllerRateLimiter[string]()) ctrlInstance, _ := controller.(*modifyController) @@ -180,7 +180,7 @@ func TestUpdateConditionBasedOnError(t *testing.T) { } controller := NewModifyController(driverName, csiModifier, kubeClient, - time.Second, false, informerFactory, + time.Second, 2*time.Minute, false, informerFactory, workqueue.DefaultTypedControllerRateLimiter[string]()) ctrlInstance, _ := controller.(*modifyController) @@ -248,7 +248,7 @@ func TestMarkControllerModifyVolumeCompleted(t *testing.T) { } controller := NewModifyController(driverName, csiModifier, kubeClient, - time.Second, false, informerFactory, + time.Second, 2*time.Minute, false, informerFactory, workqueue.DefaultTypedControllerRateLimiter[string]()) ctrlInstance, _ := controller.(*modifyController) @@ -314,7 +314,7 @@ func TestRemovePVCFromModifyVolumeUncertainCache(t *testing.T) { } controller := NewModifyController(driverName, csiModifier, kubeClient, - time.Second, false, informerFactory, + time.Second, 2*time.Minute, false, informerFactory, workqueue.DefaultTypedControllerRateLimiter[string]()) ctrlInstance, _ := controller.(*modifyController) @@ -346,10 +346,11 @@ func TestRemovePVCFromModifyVolumeUncertainCache(t *testing.T) { time.Sleep(time.Second * 2) - err = ctrlInstance.removePVCFromModifyVolumeUncertainCache(tc.pvc) + pvcKey, err := cache.MetaNamespaceKeyFunc(tc.pvc) if err != nil { - t.Errorf("err deleting pvc: %v", tc.pvc) + t.Errorf("failed to extract pvc key from pvc %v", tc.pvc) } + ctrlInstance.removePVCFromModifyVolumeUncertainCache(pvcKey) deletedPVCKey, err := cache.MetaNamespaceKeyFunc(tc.pvc) if err != nil { diff --git a/pkg/modifycontroller/modify_volume.go b/pkg/modifycontroller/modify_volume.go index 2d9d6fa33..d1c69a11b 100644 --- a/pkg/modifycontroller/modify_volume.go +++ b/pkg/modifycontroller/modify_volume.go @@ -38,40 +38,35 @@ const ( func (ctrl *modifyController) modify(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) (*v1.PersistentVolumeClaim, *v1.PersistentVolume, error, bool) { pvcSpecVacName := pvc.Spec.VolumeAttributesClassName curVacName := pvc.Status.CurrentVolumeAttributesClassName + pvcKey, err := cache.MetaNamespaceKeyFunc(pvc) + if err != nil { + return pvc, pv, err, false + } + + // Requeue PVC if modification recently failed with infeasible error. + delayModificationErr := ctrl.delayModificationIfRecentlyInfeasible(pvc, pvcKey) + if delayModificationErr != nil { + return pvc, pv, delayModificationErr, false + } if pvcSpecVacName != nil && curVacName == nil { // First time adding VAC to a PVC return ctrl.validateVACAndModifyVolumeWithTarget(pvc, pv) } else if pvcSpecVacName != nil && curVacName != nil && *pvcSpecVacName != *curVacName { - targetVacName := *pvcSpecVacName - if pvc.Status.ModifyVolumeStatus != nil { - targetVacName = pvc.Status.ModifyVolumeStatus.TargetVolumeAttributesClassName - } - if *curVacName == targetVacName { - // if somehow both curVacName and targetVacName is same, what does this mean?? - // I am not sure about this. + // Check if PVC in uncertain state + _, inUncertainState := ctrl.uncertainPVCs[pvcKey] + if !inUncertainState { + klog.V(3).InfoS("previous operation on the PVC failed with a final error, retrying") return ctrl.validateVACAndModifyVolumeWithTarget(pvc, pv) } else { - // Check if the PVC is in uncertain State - pvcKey, err := cache.MetaNamespaceKeyFunc(pvc) + vac, err := ctrl.vacLister.Get(*pvcSpecVacName) if err != nil { return pvc, pv, err, false } - _, ok := ctrl.uncertainPVCs[pvcKey] - if !ok { - // PVC is not in uncertain state - klog.V(3).InfoS("previous operation on the PVC failed with a final error, retrying") - return ctrl.validateVACAndModifyVolumeWithTarget(pvc, pv) - } else { - vac, err := ctrl.vacLister.Get(*pvcSpecVacName) - if err != nil { - return pvc, pv, err, false - } - return ctrl.controllerModifyVolumeWithTarget(pvc, pv, vac, pvcSpecVacName) - } + return ctrl.controllerModifyVolumeWithTarget(pvc, pv, vac, pvcSpecVacName) } - } + // No modification required return pvc, pv, nil, false } @@ -118,28 +113,31 @@ func (ctrl *modifyController) controllerModifyVolumeWithTarget( ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal, util.VolumeModifySuccess, fmt.Sprintf("external resizer modified volume %s with vac %s successfully ", pvc.Name, vacObj.Name)) return pvc, pv, nil, true } else { - status, ok := status.FromError(err) + errStatus, ok := status.FromError(err) if ok { - ctrl.updateConditionBasedOnError(pvc, err) - if !util.IsFinalError(err) { + pvc, updateConditionErr := ctrl.updateConditionBasedOnError(pvc, err) + if updateConditionErr != nil { + return nil, nil, err, false + } + pvcKey, keyErr := cache.MetaNamespaceKeyFunc(pvc) + if keyErr != nil { + return pvc, pv, keyErr, false + } + if !util.IsFinalError(keyErr) { // update conditions and cache pvc as uncertain - pvcKey, err := cache.MetaNamespaceKeyFunc(pvc) - if err != nil { - return pvc, pv, err, false - } ctrl.uncertainPVCs[pvcKey] = *pvc - } else { // Only InvalidArgument can be set to Infeasible state // Final errors other than InvalidArgument will still be in InProgress state - if status.Code() == codes.InvalidArgument { + if errStatus.Code() == codes.InvalidArgument { // Mark pvc.Status.ModifyVolumeStatus as infeasible pvc, markModifyVolumeInfeasibleError := ctrl.markControllerModifyVolumeStatus(pvc, v1.PersistentVolumeClaimModifyVolumeInfeasible, err) if markModifyVolumeInfeasibleError != nil { return pvc, pv, markModifyVolumeInfeasibleError, false } + ctrl.markForSlowRetry(pvc, pvcKey) } - ctrl.removePVCFromModifyVolumeUncertainCache(pvc) + ctrl.removePVCFromModifyVolumeUncertainCache(pvcKey) } } else { return pvc, pv, fmt.Errorf("cannot get error status from modify volume err: %v ", err), false @@ -171,3 +169,34 @@ func (ctrl *modifyController) callModifyVolumeOnPlugin( } return pvc, pv, nil } + +// func delayModificationIfRecentlyInfeasible returns a delayRetryError if PVC modification recently failed with +// infeasible error +func (ctrl *modifyController) delayModificationIfRecentlyInfeasible(pvc *v1.PersistentVolumeClaim, pvcKey string) error { + // Do not delay modification if PVC updated with new VAC + s := pvc.Status.ModifyVolumeStatus + if s == nil || pvc.Spec.VolumeAttributesClassName == nil || s.TargetVolumeAttributesClassName != *pvc.Spec.VolumeAttributesClassName { + // remove key from slowSet because new VAC may be feasible + ctrl.slowSet.Remove(pvcKey) + return nil + } + + inSlowSet := ctrl.slowSet.Contains(pvcKey) + ctrl.markForSlowRetry(pvc, pvcKey) + + if inSlowSet { + msg := fmt.Sprintf("skipping volume modification for pvc %s, because modification previously failed with infeasible error", pvcKey) + klog.V(4).Infof(msg) + delayRetryError := util.NewDelayRetryError(msg, ctrl.slowSet.TimeRemaining(pvcKey)) + return delayRetryError + } + return nil +} + +// func markForSlowRetry adds PVC to controller's slowSet IF PVC's ModifyVolumeStatus is Infeasible +func (ctrl *modifyController) markForSlowRetry(pvc *v1.PersistentVolumeClaim, pvcKey string) { + s := pvc.Status.ModifyVolumeStatus + if s != nil && s.Status == v1.PersistentVolumeClaimModifyVolumeInfeasible { + ctrl.slowSet.Add(pvcKey) + } +}