diff --git a/cmd/csi-resizer/main.go b/cmd/csi-resizer/main.go index 6e0b7bc57..9c19a69e0 100644 --- a/cmd/csi-resizer/main.go +++ b/cmd/csi-resizer/main.go @@ -222,7 +222,7 @@ func main() { var mc modifycontroller.ModifyController // Add modify controller only if the feature gate is enabled if utilfeature.DefaultFeatureGate.Enabled(features.VolumeAttributesClass) { - mc = modifycontroller.NewModifyController(modifierName, csiModifier, kubeClient, *resyncPeriod, *extraModifyMetadata, informerFactory, + mc = modifycontroller.NewModifyController(modifierName, csiModifier, kubeClient, *resyncPeriod, *retryIntervalMax, *extraModifyMetadata, informerFactory, workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax)) } diff --git a/pkg/csi/mock_client.go b/pkg/csi/mock_client.go index c8644b83d..49c93c3e2 100644 --- a/pkg/csi/mock_client.go +++ b/pkg/csi/mock_client.go @@ -39,7 +39,7 @@ type MockClient struct { expandCalled atomic.Int32 modifyCalled atomic.Int32 expansionError error - modifyFailed bool + modifyError error checkMigratedLabel bool usedSecrets atomic.Pointer[map[string]string] usedCapability atomic.Pointer[csi.VolumeCapability] @@ -74,8 +74,8 @@ func (c *MockClient) SetExpansionError(err error) { c.expansionError = err } -func (c *MockClient) SetModifyFailed() { - c.modifyFailed = true +func (c *MockClient) SetModifyError(err error) { + c.modifyError = err } func (c *MockClient) SetCheckMigratedLabel() { @@ -135,8 +135,8 @@ func (c *MockClient) Modify( secrets map[string]string, mutableParameters map[string]string) error { c.modifyCalled.Add(1) - if c.modifyFailed { - return fmt.Errorf("modify failed") + if c.modifyError != nil { + return c.modifyError } return nil } diff --git a/pkg/modifycontroller/controller.go b/pkg/modifycontroller/controller.go index aa45cb066..976a6e0dd 100644 --- a/pkg/modifycontroller/controller.go +++ b/pkg/modifycontroller/controller.go @@ -61,6 +61,8 @@ type modifyController struct { extraModifyMetadata bool // the key of the map is {PVC_NAMESPACE}/{PVC_NAME} uncertainPVCs map[string]v1.PersistentVolumeClaim + // slowSet tracks PVCs for which modification failed with infeasible error and should be retried at slower rate. + slowSet *util.SlowSet } // NewModifyController returns a ModifyController. @@ -69,6 +71,7 @@ func NewModifyController( modifier modifier.Modifier, kubeClient kubernetes.Interface, resyncPeriod time.Duration, + maxRetryInterval time.Duration, extraModifyMetadata bool, informerFactory informers.SharedInformerFactory, pvcRateLimiter workqueue.TypedRateLimiter[string]) ModifyController { @@ -99,8 +102,9 @@ func NewModifyController( claimQueue: claimQueue, eventRecorder: eventRecorder, extraModifyMetadata: extraModifyMetadata, + slowSet: util.NewSlowSet(maxRetryInterval), } - // Add a resync period as the PVC's request modify can be modified again when we handling + // Add a resync period as the PVC's request modify can be modified again when we are handling // a previous modify request of the same PVC. pvcInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ AddFunc: ctrl.addPVC, @@ -211,6 +215,10 @@ func (ctrl *modifyController) Run( } stopCh := ctx.Done() + + // Starts go-routine that deletes expired slowSet entries. + go ctrl.slowSet.Run(stopCh) + for i := 0; i < workers; i++ { go wait.Until(ctrl.sync, 0, stopCh) } @@ -235,7 +243,7 @@ func (ctrl *modifyController) sync() { } } -// syncPVC checks if a pvc requests resizing, and execute the resize operation if requested. +// syncPVC checks if a pvc requests modification, and execute the ModifyVolume operation if requested. func (ctrl *modifyController) syncPVC(key string) error { klog.V(4).InfoS("Started PVC processing for modify controller", "key", key) @@ -260,7 +268,7 @@ func (ctrl *modifyController) syncPVC(key string) error { } // Only trigger modify volume if the following conditions are met - // 1. Non empty vac name + // 1. Non-empty vac name // 2. PVC is in Bound state // 3. PV CSI driver name matches local driver vacName := pvc.Spec.VolumeAttributesClassName diff --git a/pkg/modifycontroller/controller_test.go b/pkg/modifycontroller/controller_test.go index 104481390..895aa9ac2 100644 --- a/pkg/modifycontroller/controller_test.go +++ b/pkg/modifycontroller/controller_test.go @@ -2,6 +2,12 @@ package modifycontroller import ( "context" + "errors" + "fmt" + "github.com/kubernetes-csi/external-resizer/pkg/util" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/client-go/tools/cache" "testing" "time" @@ -37,7 +43,7 @@ func TestController(t *testing.T) { }{ { name: "Modify called", - pvc: createTestPVC(pvcName, "target-vac" /*vacName*/, testVac /*curVacName*/, testVac /*targetVacName*/), + pvc: createTestPVC(pvcName, targetVac /*vacName*/, testVac /*curVacName*/, testVac /*targetVacName*/), pv: basePV, vacExists: true, callCSIModify: true, @@ -61,57 +67,13 @@ func TestController(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { // Setup - client := csi.NewMockClient("foo", true, true, true, true, true, false) - driverName, _ := client.GetDriverName(context.TODO()) - - var initialObjects []runtime.Object - initialObjects = append(initialObjects, test.pvc) - initialObjects = append(initialObjects, test.pv) - // existing vac set in the pvc and pv - initialObjects = append(initialObjects, testVacObject) - if test.vacExists { - initialObjects = append(initialObjects, targetVacObject) - } - - kubeClient, informerFactory := fakeK8s(initialObjects) - pvInformer := informerFactory.Core().V1().PersistentVolumes() - pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() - vacInformer := informerFactory.Storage().V1beta1().VolumeAttributesClasses() - - csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, false, driverName) - if err != nil { - t.Fatalf("Test %s: Unable to create modifier: %v", test.name, err) - } - - featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeAttributesClass, true) - controller := NewModifyController(driverName, - csiModifier, kubeClient, - time.Second, false, informerFactory, - workqueue.DefaultTypedControllerRateLimiter[string]()) - - ctrlInstance, _ := controller.(*modifyController) + client := csi.NewMockClient(testDriverName, true, true, true, true, true, false) - stopCh := make(chan struct{}) - informerFactory.Start(stopCh) - - ctx := context.TODO() + initialObjects := []runtime.Object{test.pvc, test.pv, testVacObject, targetVacObject} + ctrlInstance, ctx := setupFakeK8sEnvironment(t, client, initialObjects) defer ctx.Done() - go controller.Run(1, ctx) - - for _, obj := range initialObjects { - switch obj.(type) { - case *v1.PersistentVolume: - pvInformer.Informer().GetStore().Add(obj) - case *v1.PersistentVolumeClaim: - pvcInformer.Informer().GetStore().Add(obj) - case *storagev1beta1.VolumeAttributesClass: - vacInformer.Informer().GetStore().Add(obj) - default: - t.Fatalf("Test %s: Unknown initalObject type: %+v", test.name, obj) - } - } - time.Sleep(time.Second * 2) - _, _, err, _ = ctrlInstance.modify(test.pvc, test.pv) + + _, _, err, _ := ctrlInstance.modify(test.pvc, test.pv) if err != nil { t.Fatalf("for %s: unexpected error: %v", test.name, err) } @@ -141,14 +103,14 @@ func TestModifyPVC(t *testing.T) { }{ { name: "Modify succeeded", - pvc: createTestPVC(pvcName, "target-vac" /*vacName*/, testVac /*curVacName*/, testVac /*targetVacName*/), + pvc: createTestPVC(pvcName, targetVac /*vacName*/, testVac /*curVacName*/, testVac /*targetVacName*/), pv: basePV, modifyFailure: false, expectFailure: false, }, { name: "Modify failed", - pvc: createTestPVC(pvcName, "target-vac" /*vacName*/, testVac /*curVacName*/, testVac /*targetVacName*/), + pvc: createTestPVC(pvcName, targetVac /*vacName*/, testVac /*curVacName*/, testVac /*targetVacName*/), pv: basePV, modifyFailure: true, expectFailure: true, @@ -157,77 +119,235 @@ func TestModifyPVC(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - client := csi.NewMockClient("mock", true, true, true, true, true, false) + client := csi.NewMockClient(testDriverName, true, true, true, true, true, false) if test.modifyFailure { - client.SetModifyFailed() + client.SetModifyError(fmt.Errorf("fake modification error")) } - driverName, _ := client.GetDriverName(context.TODO()) - initialObjects := []runtime.Object{} - if test.pvc != nil { - initialObjects = append(initialObjects, test.pvc) + initialObjects := []runtime.Object{test.pvc, test.pv, testVacObject, targetVacObject} + ctrlInstance, ctx := setupFakeK8sEnvironment(t, client, initialObjects) + defer ctx.Done() + + _, _, err, _ := ctrlInstance.modify(test.pvc, test.pv) + + if test.expectFailure && err == nil { + t.Errorf("for %s expected error got nothing", test.name) } - if test.pv != nil { - test.pv.Spec.PersistentVolumeSource.CSI.Driver = driverName - initialObjects = append(initialObjects, test.pv) + + if !test.expectFailure { + if err != nil { + t.Errorf("for %s, unexpected error: %v", test.name, err) + } } + }) + } +} - // existing vac set in the pvc and pv - initialObjects = append(initialObjects, testVacObject) - // new vac used in modify volume - initialObjects = append(initialObjects, targetVacObject) +func TestSyncPVC(t *testing.T) { + basePVC := createTestPVC(pvcName, targetVac /*vacName*/, testVac /*curVacName*/, testVac /*targetVacName*/) + basePV := createTestPV(1, pvcName, pvcNamespace, "foobaz" /*pvcUID*/, &fsVolumeMode, testVac) - kubeClient, informerFactory := fakeK8s(initialObjects) - pvInformer := informerFactory.Core().V1().PersistentVolumes() - pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() - vacInformer := informerFactory.Storage().V1beta1().VolumeAttributesClasses() + otherDriverPV := createTestPV(1, pvcName, pvcNamespace, "foobaz" /*pvcUID*/, &fsVolumeMode, testVac) + otherDriverPV.Spec.PersistentVolumeSource.CSI.Driver = "some-other-driver" - csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, false, driverName) + unboundPVC := createTestPVC(pvcName, targetVac /*vacName*/, testVac /*curVacName*/, testVac /*targetVacName*/) + unboundPVC.Status.Phase = v1.ClaimPending + + pvcWithUncreatedPV := createTestPVC(pvcName, targetVac /*vacName*/, testVac /*curVacName*/, testVac /*targetVacName*/) + pvcWithUncreatedPV.Spec.VolumeName = "" + + tests := []struct { + name string + pvc *v1.PersistentVolumeClaim + pv *v1.PersistentVolume + callCSIModify bool + }{ + { + name: "Should execute ModifyVolume operation when PVC's VAC changes", + pvc: basePVC, + pv: basePV, + callCSIModify: true, + }, + { + name: "Should NOT modify if PVC managed by another CSI Driver", + pvc: basePVC, + pv: otherDriverPV, + callCSIModify: false, + }, + { + name: "Should NOT modify if PVC has empty Spec.VACName", + pvc: createTestPVC(pvcName, "" /*vacName*/, testVac /*curVacName*/, testVac /*targetVacName*/), + pv: basePV, + callCSIModify: false, + }, + { + name: "Should NOT modify if PVC not in bound state", + pvc: unboundPVC, + pv: basePV, + callCSIModify: false, + }, + { + name: "Should NOT modify if PVC's PV not created yet", + pvc: pvcWithUncreatedPV, + pv: basePV, + callCSIModify: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + client := csi.NewMockClient(testDriverName, true, true, true, true, true, false) + + initialObjects := []runtime.Object{test.pvc, test.pv, testVacObject, targetVacObject} + ctrlInstance, ctx := setupFakeK8sEnvironment(t, client, initialObjects) + defer ctx.Done() + + err := ctrlInstance.syncPVC(pvcNamespace + "/" + pvcName) if err != nil { - t.Fatalf("Test %s: Unable to create modifier: %v", test.name, err) + t.Errorf("for %s, unexpected error: %v", test.name, err) } - featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeAttributesClass, true) - controller := NewModifyController(driverName, - csiModifier, kubeClient, - time.Second, false, informerFactory, - workqueue.DefaultTypedControllerRateLimiter[string]()) + modifyCallCount := client.GetModifyCount() + if test.callCSIModify && modifyCallCount == 0 { + t.Fatalf("for %s: expected csi modify call, no csi modify call was made", test.name) + } - ctrlInstance, _ := controller.(*modifyController) + if !test.callCSIModify && modifyCallCount > 0 { + t.Fatalf("for %s: expected no csi modify call, received csi modify request", test.name) + } + }) + } +} - stopCh := make(chan struct{}) - informerFactory.Start(stopCh) +// TestInfeasibleRetry tests that sidecar doesn't spam plugin upon infeasible error code (e.g. invalid VAC parameter) +func TestInfeasibleRetry(t *testing.T) { + basePVC := createTestPVC(pvcName, targetVac /*vacName*/, testVac /*curVacName*/, testVac /*targetVacName*/) + basePV := createTestPV(1, pvcName, pvcNamespace, "foobaz" /*pvcUID*/, &fsVolumeMode, testVac) - ctx := context.TODO() - defer ctx.Done() - go controller.Run(1, ctx) - - for _, obj := range initialObjects { - switch obj.(type) { - case *v1.PersistentVolume: - pvInformer.Informer().GetStore().Add(obj) - case *v1.PersistentVolumeClaim: - pvcInformer.Informer().GetStore().Add(obj) - case *storagev1beta1.VolumeAttributesClass: - vacInformer.Informer().GetStore().Add(obj) - default: - t.Fatalf("Test %s: Unknown initalObject type: %+v", test.name, obj) - } + tests := []struct { + name string + pvc *v1.PersistentVolumeClaim + expectedModifyCallCount int + csiModifyError error + eventuallyRemoveFromSlowSet bool + }{ + { + name: "Should retry non-infeasible error normally", + pvc: basePVC, + expectedModifyCallCount: 2, + csiModifyError: status.Errorf(codes.Internal, "fake non-infeasible error"), + eventuallyRemoveFromSlowSet: false, + }, + { + name: "Should NOT retry infeasible error normally", + pvc: basePVC, + expectedModifyCallCount: 1, + csiModifyError: status.Errorf(codes.InvalidArgument, "fake infeasible error"), + eventuallyRemoveFromSlowSet: false, + }, + { + name: "Should EVENTUALLY retry infeasible error", + pvc: basePVC, + expectedModifyCallCount: 2, + csiModifyError: status.Errorf(codes.InvalidArgument, "fake infeasible error"), + eventuallyRemoveFromSlowSet: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // Setup + client := csi.NewMockClient(testDriverName, true, true, true, true, true, false) + if test.csiModifyError != nil { + client.SetModifyError(test.csiModifyError) } - time.Sleep(time.Second * 2) + initialObjects := []runtime.Object{test.pvc, basePV, testVacObject, targetVacObject} + ctrlInstance, ctx := setupFakeK8sEnvironment(t, client, initialObjects) + defer ctx.Done() - _, _, err, _ = ctrlInstance.modify(test.pvc, test.pv) + // Attempt modification first time + err := ctrlInstance.syncPVC(pvcNamespace + "/" + pvcName) + if !errors.Is(err, test.csiModifyError) { + t.Errorf("for %s, unexpected first syncPVC error: %v", test.name, err) + } - if test.expectFailure && err == nil { - t.Errorf("for %s expected error got nothing", test.name) + // Fake time passing by removing from SlowSet + if test.eventuallyRemoveFromSlowSet { + pvcKey, _ := cache.MetaNamespaceKeyFunc(test.pvc) + ctrlInstance.slowSet.Remove(pvcKey) } - if !test.expectFailure { - if err != nil { - t.Errorf("for %s, unexpected error: %v", test.name, err) + // Attempt modification second time + err2 := ctrlInstance.syncPVC(pvcNamespace + "/" + pvcName) + switch test.expectedModifyCallCount { + case 1: + if !util.IsDelayRetryError(err2) { + t.Errorf("for %s, unexpected second syncPVC error: %v", test.name, err) + } + case 2: + if !errors.Is(err2, test.csiModifyError) { + t.Errorf("for %s, unexpected second syncPVC error: %v", test.name, err) } + default: + t.Errorf("for %s, unexpected second syncPVC error: %v", test.name, err) + } + + // Confirm CSI ModifyVolume was called desired amount of times + modifyCallCount := client.GetModifyCount() + if test.expectedModifyCallCount != modifyCallCount { + t.Fatalf("for %s: expected %d csi modify calls, but got %d", test.name, test.expectedModifyCallCount, modifyCallCount) } }) } } + +// setupFakeK8sEnvironment creates fake K8s environment and starts Informers and ModifyController +func setupFakeK8sEnvironment(t *testing.T, client *csi.MockClient, initialObjects []runtime.Object) (*modifyController, context.Context) { + t.Helper() + + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeAttributesClass, true) + + /* Create fake kubeClient, Informers, and ModifyController */ + kubeClient, informerFactory := fakeK8s(initialObjects) + pvInformer := informerFactory.Core().V1().PersistentVolumes() + pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() + vacInformer := informerFactory.Storage().V1beta1().VolumeAttributesClasses() + + driverName, _ := client.GetDriverName(context.TODO()) + + csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, false, driverName) + if err != nil { + t.Fatalf("Test %s: Unable to create modifier: %v", t.Name(), err) + } + + controller := NewModifyController(driverName, + csiModifier, kubeClient, + 0 /* resyncPeriod */, 2*time.Minute, false, informerFactory, + workqueue.DefaultTypedControllerRateLimiter[string]()) + + /* Start informers and ModifyController*/ + stopCh := make(chan struct{}) + informerFactory.Start(stopCh) + + ctx := context.TODO() + go controller.Run(1, ctx) + + /* Add initial objects to informer caches */ + for _, obj := range initialObjects { + switch obj.(type) { + case *v1.PersistentVolume: + pvInformer.Informer().GetStore().Add(obj) + case *v1.PersistentVolumeClaim: + pvcInformer.Informer().GetStore().Add(obj) + case *storagev1beta1.VolumeAttributesClass: + vacInformer.Informer().GetStore().Add(obj) + default: + t.Fatalf("Test %s: Unknown initalObject type: %+v", t.Name(), obj) + } + } + + ctrlInstance, _ := controller.(*modifyController) + + return ctrlInstance, ctx +} diff --git a/pkg/modifycontroller/modify_status.go b/pkg/modifycontroller/modify_status.go index d9365373d..4a12d034a 100644 --- a/pkg/modifycontroller/modify_status.go +++ b/pkg/modifycontroller/modify_status.go @@ -18,7 +18,6 @@ package modifycontroller import ( "fmt" - "github.com/kubernetes-csi/external-resizer/pkg/util" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -57,13 +56,19 @@ func (ctrl *modifyController) markControllerModifyVolumeStatus( []v1.PersistentVolumeClaimCondition{pvcCondition}) } - updatedPVC, err := util.PatchClaim(ctrl.kubeClient, pvc, newPVC, true) + updatedPVC, err := util.PatchClaim(ctrl.kubeClient, pvc, newPVC, true /* addResourceVersionCheck */) if err != nil { return pvc, fmt.Errorf("mark PVC %q as modify volume failed, errored with: %v", pvc.Name, err) } // Remove this PVC from the uncertain cache since the status is known now if modifyVolumeStatus == v1.PersistentVolumeClaimModifyVolumeInfeasible { - ctrl.removePVCFromModifyVolumeUncertainCache(pvc) + pvcKey, err := cache.MetaNamespaceKeyFunc(pvc) + if err != nil { + return pvc, err + } + + ctrl.removePVCFromModifyVolumeUncertainCache(pvcKey) + ctrl.markForSlowRetry(pvc, pvcKey) } return updatedPVC, nil } @@ -84,7 +89,7 @@ func (ctrl *modifyController) updateConditionBasedOnError(pvc *v1.PersistentVolu newPVC.Status.Conditions = util.MergeModifyVolumeConditionsOfPVC(newPVC.Status.Conditions, []v1.PersistentVolumeClaimCondition{pvcCondition}) - updatedPVC, err := util.PatchClaim(ctrl.kubeClient, pvc, newPVC, false) + updatedPVC, err := util.PatchClaim(ctrl.kubeClient, pvc, newPVC, false /* addResourceVersionCheck */) if err != nil { return pvc, fmt.Errorf("mark PVC %q as controller expansion failed, errored with: %v", pvc.Name, err) } @@ -117,7 +122,7 @@ func (ctrl *modifyController) markControllerModifyVolumeCompleted(pvc *v1.Persis if err != nil { return pvc, pv, fmt.Errorf("update pv.Spec.VolumeAttributesClassName for PVC %q failed, errored with: %v", pvc.Name, err) } - updatedPVC, err := util.PatchClaim(ctrl.kubeClient, pvc, newPVC, false) + updatedPVC, err := util.PatchClaim(ctrl.kubeClient, pvc, newPVC, false /* addResourceVersionCheck */) if err != nil { return pvc, pv, fmt.Errorf("mark PVC %q as ModifyVolumeCompleted failed, errored with: %v", pvc.Name, err) @@ -140,22 +145,13 @@ func clearModifyVolumeConditions(conditions []v1.PersistentVolumeClaimCondition) } // removePVCFromModifyVolumeUncertainCache removes the pvc from the uncertain cache -func (ctrl *modifyController) removePVCFromModifyVolumeUncertainCache(pvc *v1.PersistentVolumeClaim) error { +func (ctrl *modifyController) removePVCFromModifyVolumeUncertainCache(pvcKey string) { if ctrl.uncertainPVCs == nil { - return nil + return } // Format of the key of the uncertainPVCs is NAMESPACE/NAME of the pvc - pvcKey, err := cache.MetaNamespaceKeyFunc(pvc) - if err != nil { - return err - } _, ok := ctrl.uncertainPVCs[pvcKey] if ok { delete(ctrl.uncertainPVCs, pvcKey) } - - if err != nil { - return err - } - return nil } diff --git a/pkg/modifycontroller/modify_status_test.go b/pkg/modifycontroller/modify_status_test.go index fb49778a6..932245a65 100644 --- a/pkg/modifycontroller/modify_status_test.go +++ b/pkg/modifycontroller/modify_status_test.go @@ -28,12 +28,14 @@ import ( const ( pvcName = "foo" pvcNamespace = "modify" + pvName = "testPV" ) var ( fsVolumeMode = v1.PersistentVolumeFilesystem testVac = "test-vac" targetVac = "target-vac" + testDriverName = "mock" infeasibleErr = status.Errorf(codes.InvalidArgument, "Parameters in VolumeAttributesClass is invalid") finalErr = status.Errorf(codes.Internal, "Final error") pvcConditionInProgress = v1.PersistentVolumeClaimCondition{ @@ -118,7 +120,7 @@ func TestMarkControllerModifyVolumeStatus(t *testing.T) { } controller := NewModifyController(driverName, csiModifier, kubeClient, - time.Second, false, informerFactory, + time.Second, 2*time.Minute, false, informerFactory, workqueue.DefaultTypedControllerRateLimiter[string]()) ctrlInstance, _ := controller.(*modifyController) @@ -178,7 +180,7 @@ func TestUpdateConditionBasedOnError(t *testing.T) { } controller := NewModifyController(driverName, csiModifier, kubeClient, - time.Second, false, informerFactory, + time.Second, 2*time.Minute, false, informerFactory, workqueue.DefaultTypedControllerRateLimiter[string]()) ctrlInstance, _ := controller.(*modifyController) @@ -246,7 +248,7 @@ func TestMarkControllerModifyVolumeCompleted(t *testing.T) { } controller := NewModifyController(driverName, csiModifier, kubeClient, - time.Second, false, informerFactory, + time.Second, 2*time.Minute, false, informerFactory, workqueue.DefaultTypedControllerRateLimiter[string]()) ctrlInstance, _ := controller.(*modifyController) @@ -312,7 +314,7 @@ func TestRemovePVCFromModifyVolumeUncertainCache(t *testing.T) { } controller := NewModifyController(driverName, csiModifier, kubeClient, - time.Second, false, informerFactory, + time.Second, 2*time.Minute, false, informerFactory, workqueue.DefaultTypedControllerRateLimiter[string]()) ctrlInstance, _ := controller.(*modifyController) @@ -344,10 +346,11 @@ func TestRemovePVCFromModifyVolumeUncertainCache(t *testing.T) { time.Sleep(time.Second * 2) - err = ctrlInstance.removePVCFromModifyVolumeUncertainCache(tc.pvc) + pvcKey, err := cache.MetaNamespaceKeyFunc(tc.pvc) if err != nil { - t.Errorf("err deleting pvc: %v", tc.pvc) + t.Errorf("failed to extract pvc key from pvc %v", tc.pvc) } + ctrlInstance.removePVCFromModifyVolumeUncertainCache(pvcKey) deletedPVCKey, err := cache.MetaNamespaceKeyFunc(tc.pvc) if err != nil { @@ -390,7 +393,7 @@ func createTestPV(capacityGB int, pvcName, pvcNamespace string, pvcUID types.UID }, PersistentVolumeSource: v1.PersistentVolumeSource{ CSI: &v1.CSIPersistentVolumeSource{ - Driver: "foo", + Driver: testDriverName, VolumeHandle: "foo", }, }, diff --git a/pkg/modifycontroller/modify_volume.go b/pkg/modifycontroller/modify_volume.go index 2d9d6fa33..d1c69a11b 100644 --- a/pkg/modifycontroller/modify_volume.go +++ b/pkg/modifycontroller/modify_volume.go @@ -38,40 +38,35 @@ const ( func (ctrl *modifyController) modify(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) (*v1.PersistentVolumeClaim, *v1.PersistentVolume, error, bool) { pvcSpecVacName := pvc.Spec.VolumeAttributesClassName curVacName := pvc.Status.CurrentVolumeAttributesClassName + pvcKey, err := cache.MetaNamespaceKeyFunc(pvc) + if err != nil { + return pvc, pv, err, false + } + + // Requeue PVC if modification recently failed with infeasible error. + delayModificationErr := ctrl.delayModificationIfRecentlyInfeasible(pvc, pvcKey) + if delayModificationErr != nil { + return pvc, pv, delayModificationErr, false + } if pvcSpecVacName != nil && curVacName == nil { // First time adding VAC to a PVC return ctrl.validateVACAndModifyVolumeWithTarget(pvc, pv) } else if pvcSpecVacName != nil && curVacName != nil && *pvcSpecVacName != *curVacName { - targetVacName := *pvcSpecVacName - if pvc.Status.ModifyVolumeStatus != nil { - targetVacName = pvc.Status.ModifyVolumeStatus.TargetVolumeAttributesClassName - } - if *curVacName == targetVacName { - // if somehow both curVacName and targetVacName is same, what does this mean?? - // I am not sure about this. + // Check if PVC in uncertain state + _, inUncertainState := ctrl.uncertainPVCs[pvcKey] + if !inUncertainState { + klog.V(3).InfoS("previous operation on the PVC failed with a final error, retrying") return ctrl.validateVACAndModifyVolumeWithTarget(pvc, pv) } else { - // Check if the PVC is in uncertain State - pvcKey, err := cache.MetaNamespaceKeyFunc(pvc) + vac, err := ctrl.vacLister.Get(*pvcSpecVacName) if err != nil { return pvc, pv, err, false } - _, ok := ctrl.uncertainPVCs[pvcKey] - if !ok { - // PVC is not in uncertain state - klog.V(3).InfoS("previous operation on the PVC failed with a final error, retrying") - return ctrl.validateVACAndModifyVolumeWithTarget(pvc, pv) - } else { - vac, err := ctrl.vacLister.Get(*pvcSpecVacName) - if err != nil { - return pvc, pv, err, false - } - return ctrl.controllerModifyVolumeWithTarget(pvc, pv, vac, pvcSpecVacName) - } + return ctrl.controllerModifyVolumeWithTarget(pvc, pv, vac, pvcSpecVacName) } - } + // No modification required return pvc, pv, nil, false } @@ -118,28 +113,31 @@ func (ctrl *modifyController) controllerModifyVolumeWithTarget( ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal, util.VolumeModifySuccess, fmt.Sprintf("external resizer modified volume %s with vac %s successfully ", pvc.Name, vacObj.Name)) return pvc, pv, nil, true } else { - status, ok := status.FromError(err) + errStatus, ok := status.FromError(err) if ok { - ctrl.updateConditionBasedOnError(pvc, err) - if !util.IsFinalError(err) { + pvc, updateConditionErr := ctrl.updateConditionBasedOnError(pvc, err) + if updateConditionErr != nil { + return nil, nil, err, false + } + pvcKey, keyErr := cache.MetaNamespaceKeyFunc(pvc) + if keyErr != nil { + return pvc, pv, keyErr, false + } + if !util.IsFinalError(keyErr) { // update conditions and cache pvc as uncertain - pvcKey, err := cache.MetaNamespaceKeyFunc(pvc) - if err != nil { - return pvc, pv, err, false - } ctrl.uncertainPVCs[pvcKey] = *pvc - } else { // Only InvalidArgument can be set to Infeasible state // Final errors other than InvalidArgument will still be in InProgress state - if status.Code() == codes.InvalidArgument { + if errStatus.Code() == codes.InvalidArgument { // Mark pvc.Status.ModifyVolumeStatus as infeasible pvc, markModifyVolumeInfeasibleError := ctrl.markControllerModifyVolumeStatus(pvc, v1.PersistentVolumeClaimModifyVolumeInfeasible, err) if markModifyVolumeInfeasibleError != nil { return pvc, pv, markModifyVolumeInfeasibleError, false } + ctrl.markForSlowRetry(pvc, pvcKey) } - ctrl.removePVCFromModifyVolumeUncertainCache(pvc) + ctrl.removePVCFromModifyVolumeUncertainCache(pvcKey) } } else { return pvc, pv, fmt.Errorf("cannot get error status from modify volume err: %v ", err), false @@ -171,3 +169,34 @@ func (ctrl *modifyController) callModifyVolumeOnPlugin( } return pvc, pv, nil } + +// func delayModificationIfRecentlyInfeasible returns a delayRetryError if PVC modification recently failed with +// infeasible error +func (ctrl *modifyController) delayModificationIfRecentlyInfeasible(pvc *v1.PersistentVolumeClaim, pvcKey string) error { + // Do not delay modification if PVC updated with new VAC + s := pvc.Status.ModifyVolumeStatus + if s == nil || pvc.Spec.VolumeAttributesClassName == nil || s.TargetVolumeAttributesClassName != *pvc.Spec.VolumeAttributesClassName { + // remove key from slowSet because new VAC may be feasible + ctrl.slowSet.Remove(pvcKey) + return nil + } + + inSlowSet := ctrl.slowSet.Contains(pvcKey) + ctrl.markForSlowRetry(pvc, pvcKey) + + if inSlowSet { + msg := fmt.Sprintf("skipping volume modification for pvc %s, because modification previously failed with infeasible error", pvcKey) + klog.V(4).Infof(msg) + delayRetryError := util.NewDelayRetryError(msg, ctrl.slowSet.TimeRemaining(pvcKey)) + return delayRetryError + } + return nil +} + +// func markForSlowRetry adds PVC to controller's slowSet IF PVC's ModifyVolumeStatus is Infeasible +func (ctrl *modifyController) markForSlowRetry(pvc *v1.PersistentVolumeClaim, pvcKey string) { + s := pvc.Status.ModifyVolumeStatus + if s != nil && s.Status == v1.PersistentVolumeClaimModifyVolumeInfeasible { + ctrl.slowSet.Add(pvcKey) + } +} diff --git a/pkg/modifycontroller/modify_volume_test.go b/pkg/modifycontroller/modify_volume_test.go index 76770c6d1..25a8abbe8 100644 --- a/pkg/modifycontroller/modify_volume_test.go +++ b/pkg/modifycontroller/modify_volume_test.go @@ -1,42 +1,35 @@ package modifycontroller import ( - "context" "testing" - "time" "github.com/google/go-cmp/cmp" "github.com/kubernetes-csi/external-resizer/pkg/csi" - "github.com/kubernetes-csi/external-resizer/pkg/features" - "github.com/kubernetes-csi/external-resizer/pkg/modifier" v1 "k8s.io/api/core/v1" storagev1beta1 "k8s.io/api/storage/v1beta1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" - "k8s.io/client-go/util/workqueue" - featuregatetesting "k8s.io/component-base/featuregate/testing" ) var ( testVacObject = &storagev1beta1.VolumeAttributesClass{ ObjectMeta: metav1.ObjectMeta{Name: testVac}, - DriverName: "test-driver", + DriverName: testDriverName, Parameters: map[string]string{"iops": "3000"}, } targetVacObject = &storagev1beta1.VolumeAttributesClass{ ObjectMeta: metav1.ObjectMeta{Name: targetVac}, - DriverName: "test-driver", + DriverName: testDriverName, Parameters: map[string]string{ "iops": "4567", - "csi.storage.k8s.io/pvc/name": "foo", - "csi.storage.k8s.io/pvc/namespace": "modify", - "csi.storage.k8s.io/pv/name": "testPV", + "csi.storage.k8s.io/pvc/name": pvcName, + "csi.storage.k8s.io/pvc/namespace": pvcNamespace, + "csi.storage.k8s.io/pv/name": pvName, }, } ) @@ -111,50 +104,14 @@ func TestModify(t *testing.T) { test := tests[i] t.Run(test.name, func(t *testing.T) { // Setup - featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeAttributesClass, true) - client := csi.NewMockClient("foo", true, true, true, true, true, test.withExtraMetadata) - driverName, _ := client.GetDriverName(context.TODO()) - - var initialObjects []runtime.Object - initialObjects = append(initialObjects, test.pvc) - initialObjects = append(initialObjects, test.pv) - // existing vac set in the pvc and pv - initialObjects = append(initialObjects, testVacObject) + client := csi.NewMockClient(testDriverName, true, true, true, true, true, test.withExtraMetadata) + initialObjects := []runtime.Object{test.pvc, test.pv, testVacObject} if test.vacExists { initialObjects = append(initialObjects, targetVacObject) } + ctrlInstance, ctx := setupFakeK8sEnvironment(t, client, initialObjects) + defer ctx.Done() - kubeClient, informerFactory := fakeK8s(initialObjects) - pvInformer := informerFactory.Core().V1().PersistentVolumes() - pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() - vacInformer := informerFactory.Storage().V1beta1().VolumeAttributesClasses() - - csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, test.withExtraMetadata, driverName) - if err != nil { - t.Fatalf("Test %s: Unable to create modifier: %v", test.name, err) - } - controller := NewModifyController(driverName, - csiModifier, kubeClient, - time.Second, test.withExtraMetadata, informerFactory, - workqueue.DefaultTypedControllerRateLimiter[string]()) - - ctrlInstance, _ := controller.(*modifyController) - - stopCh := make(chan struct{}) - informerFactory.Start(stopCh) - - for _, obj := range initialObjects { - switch obj.(type) { - case *v1.PersistentVolume: - pvInformer.Informer().GetStore().Add(obj) - case *v1.PersistentVolumeClaim: - pvcInformer.Informer().GetStore().Add(obj) - case *storagev1beta1.VolumeAttributesClass: - vacInformer.Informer().GetStore().Add(obj) - default: - t.Fatalf("Test %s: Unknown initalObject type: %+v", test.name, obj) - } - } // Action pvc, pv, err, modifyCalled := ctrlInstance.modify(test.pvc, test.pv) // Verify @@ -199,7 +156,7 @@ func TestModify(t *testing.T) { func createTestPVC(pvcName string, vacName string, curVacName string, targetVacName string) *v1.PersistentVolumeClaim { pvc := &v1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{Name: pvcName, Namespace: "modify"}, + ObjectMeta: metav1.ObjectMeta{Name: pvcName, Namespace: pvcNamespace}, Spec: v1.PersistentVolumeClaimSpec{ AccessModes: []v1.PersistentVolumeAccessMode{ v1.ReadWriteOnce, @@ -211,6 +168,7 @@ func createTestPVC(pvcName string, vacName string, curVacName string, targetVacN }, }, VolumeAttributesClassName: &vacName, + VolumeName: pvName, }, Status: v1.PersistentVolumeClaimStatus{ Phase: v1.ClaimBound,