Skip to content

Commit

Permalink
Ensure infeasible PVC modifications are retried at slower pace
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewSirenko committed Jan 8, 2025
1 parent 566362d commit 350de93
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 63 deletions.
2 changes: 1 addition & 1 deletion cmd/csi-resizer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func main() {
var mc modifycontroller.ModifyController
// Add modify controller only if the feature gate is enabled
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeAttributesClass) {
mc = modifycontroller.NewModifyController(modifierName, csiModifier, kubeClient, *resyncPeriod, *extraModifyMetadata, informerFactory,
mc = modifycontroller.NewModifyController(modifierName, csiModifier, kubeClient, *resyncPeriod, *retryIntervalMax, *extraModifyMetadata, informerFactory,
workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax))
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/csi/mock_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type MockClient struct {
expandCalled atomic.Int32
modifyCalled atomic.Int32
expansionError error
modifyFailed bool
modifyError error
checkMigratedLabel bool
usedSecrets atomic.Pointer[map[string]string]
usedCapability atomic.Pointer[csi.VolumeCapability]
Expand Down Expand Up @@ -74,8 +74,8 @@ func (c *MockClient) SetExpansionError(err error) {
c.expansionError = err
}

func (c *MockClient) SetModifyFailed() {
c.modifyFailed = true
func (c *MockClient) SetModifyError(err error) {
c.modifyError = err
}

func (c *MockClient) SetCheckMigratedLabel() {
Expand Down Expand Up @@ -135,8 +135,8 @@ func (c *MockClient) Modify(
secrets map[string]string,
mutableParameters map[string]string) error {
c.modifyCalled.Add(1)
if c.modifyFailed {
return fmt.Errorf("modify failed")
if c.modifyError != nil {
return c.modifyError
}
return nil
}
10 changes: 9 additions & 1 deletion pkg/modifycontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type modifyController struct {
extraModifyMetadata bool
// the key of the map is {PVC_NAMESPACE}/{PVC_NAME}
uncertainPVCs map[string]v1.PersistentVolumeClaim
// slowSet tracks PVCs for which modification failed with infeasible error and should be retried at slower rate.
slowSet *util.SlowSet
}

// NewModifyController returns a ModifyController.
Expand All @@ -69,6 +71,7 @@ func NewModifyController(
modifier modifier.Modifier,
kubeClient kubernetes.Interface,
resyncPeriod time.Duration,
maxRetryInterval time.Duration,
extraModifyMetadata bool,
informerFactory informers.SharedInformerFactory,
pvcRateLimiter workqueue.TypedRateLimiter[string]) ModifyController {
Expand Down Expand Up @@ -99,8 +102,9 @@ func NewModifyController(
claimQueue: claimQueue,
eventRecorder: eventRecorder,
extraModifyMetadata: extraModifyMetadata,
slowSet: util.NewSlowSet(maxRetryInterval),
}
// Add a resync period as the PVC's request modify can be modified again when we handling
// Add a resync period as the PVC's request modify can be modified again when we are handling
// a previous modify request of the same PVC.
pvcInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
AddFunc: ctrl.addPVC,
Expand Down Expand Up @@ -211,6 +215,10 @@ func (ctrl *modifyController) Run(
}

stopCh := ctx.Done()

// Starts go-routine that deletes expired slowSet entries.
go ctrl.slowSet.Run(stopCh)

for i := 0; i < workers; i++ {
go wait.Until(ctrl.sync, 0, stopCh)
}
Expand Down
93 changes: 91 additions & 2 deletions pkg/modifycontroller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ package modifycontroller

import (
"context"
"errors"
"fmt"
"github.com/kubernetes-csi/external-resizer/pkg/util"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/client-go/tools/cache"
"testing"
"time"

Expand Down Expand Up @@ -115,7 +121,7 @@ func TestModifyPVC(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
client := csi.NewMockClient(testDriverName, true, true, true, true, true, false)
if test.modifyFailure {
client.SetModifyFailed()
client.SetModifyError(fmt.Errorf("fake modification error"))
}

initialObjects := []runtime.Object{test.pvc, test.pv, testVacObject, targetVacObject}
Expand Down Expand Up @@ -213,6 +219,89 @@ func TestSyncPVC(t *testing.T) {
}
}

// TestInfeasibleRetry tests that sidecar doesn't spam plugin upon infeasible error code (e.g. invalid VAC parameter)
func TestInfeasibleRetry(t *testing.T) {
basePVC := createTestPVC(pvcName, targetVac /*vacName*/, testVac /*curVacName*/, testVac /*targetVacName*/)
basePV := createTestPV(1, pvcName, pvcNamespace, "foobaz" /*pvcUID*/, &fsVolumeMode, testVac)

tests := []struct {
name string
pvc *v1.PersistentVolumeClaim
expectedModifyCallCount int
csiModifyError error
eventuallyRemoveFromSlowSet bool
}{
{
name: "Should retry non-infeasible error normally",
pvc: basePVC,
expectedModifyCallCount: 2,
csiModifyError: status.Errorf(codes.Internal, "fake non-infeasible error"),
eventuallyRemoveFromSlowSet: false,
},
{
name: "Should NOT retry infeasible error normally",
pvc: basePVC,
expectedModifyCallCount: 1,
csiModifyError: status.Errorf(codes.InvalidArgument, "fake infeasible error"),
eventuallyRemoveFromSlowSet: false,
},
{
name: "Should EVENTUALLY retry infeasible error",
pvc: basePVC,
expectedModifyCallCount: 2,
csiModifyError: status.Errorf(codes.InvalidArgument, "fake infeasible error"),
eventuallyRemoveFromSlowSet: true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// Setup
client := csi.NewMockClient(testDriverName, true, true, true, true, true, false)
if test.csiModifyError != nil {
client.SetModifyError(test.csiModifyError)
}

initialObjects := []runtime.Object{test.pvc, basePV, testVacObject, targetVacObject}
ctrlInstance, ctx := setupFakeK8sEnvironment(t, client, initialObjects)
defer ctx.Done()

// Attempt modification first time
err := ctrlInstance.syncPVC(pvcNamespace + "/" + pvcName)
if !errors.Is(err, test.csiModifyError) {
t.Errorf("for %s, unexpected first syncPVC error: %v", test.name, err)
}

// Fake time passing by removing from SlowSet
if test.eventuallyRemoveFromSlowSet {
pvcKey, _ := cache.MetaNamespaceKeyFunc(test.pvc)
ctrlInstance.slowSet.Remove(pvcKey)
}

// Attempt modification second time
err2 := ctrlInstance.syncPVC(pvcNamespace + "/" + pvcName)
switch test.expectedModifyCallCount {
case 1:
if !util.IsDelayRetryError(err2) {
t.Errorf("for %s, unexpected second syncPVC error: %v", test.name, err)
}
case 2:
if !errors.Is(err2, test.csiModifyError) {
t.Errorf("for %s, unexpected second syncPVC error: %v", test.name, err)
}
default:
t.Errorf("for %s, unexpected second syncPVC error: %v", test.name, err)
}

// Confirm CSI ModifyVolume was called desired amount of times
modifyCallCount := client.GetModifyCount()
if test.expectedModifyCallCount != modifyCallCount {
t.Fatalf("for %s: expected %d csi modify calls, but got %d", test.name, test.expectedModifyCallCount, modifyCallCount)
}
})
}
}

// setupFakeK8sEnvironment creates fake K8s environment and starts Informers and ModifyController
func setupFakeK8sEnvironment(t *testing.T, client *csi.MockClient, initialObjects []runtime.Object) (*modifyController, context.Context) {
t.Helper()
Expand All @@ -234,7 +323,7 @@ func setupFakeK8sEnvironment(t *testing.T, client *csi.MockClient, initialObject

controller := NewModifyController(driverName,
csiModifier, kubeClient,
0, false, informerFactory,
0 /* resyncPeriod */, 2*time.Minute, false, informerFactory,
workqueue.DefaultTypedControllerRateLimiter[string]())

/* Start informers and ModifyController*/
Expand Down
28 changes: 12 additions & 16 deletions pkg/modifycontroller/modify_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package modifycontroller

import (
"fmt"

"github.com/kubernetes-csi/external-resizer/pkg/util"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -57,13 +56,19 @@ func (ctrl *modifyController) markControllerModifyVolumeStatus(
[]v1.PersistentVolumeClaimCondition{pvcCondition})
}

updatedPVC, err := util.PatchClaim(ctrl.kubeClient, pvc, newPVC, true)
updatedPVC, err := util.PatchClaim(ctrl.kubeClient, pvc, newPVC, true /* addResourceVersionCheck */)
if err != nil {
return pvc, fmt.Errorf("mark PVC %q as modify volume failed, errored with: %v", pvc.Name, err)
}
// Remove this PVC from the uncertain cache since the status is known now
if modifyVolumeStatus == v1.PersistentVolumeClaimModifyVolumeInfeasible {
ctrl.removePVCFromModifyVolumeUncertainCache(pvc)
pvcKey, err := cache.MetaNamespaceKeyFunc(pvc)
if err != nil {
return pvc, err
}

ctrl.removePVCFromModifyVolumeUncertainCache(pvcKey)
ctrl.markForSlowRetry(pvc, pvcKey)
}
return updatedPVC, nil
}
Expand All @@ -84,7 +89,7 @@ func (ctrl *modifyController) updateConditionBasedOnError(pvc *v1.PersistentVolu
newPVC.Status.Conditions = util.MergeModifyVolumeConditionsOfPVC(newPVC.Status.Conditions,
[]v1.PersistentVolumeClaimCondition{pvcCondition})

updatedPVC, err := util.PatchClaim(ctrl.kubeClient, pvc, newPVC, false)
updatedPVC, err := util.PatchClaim(ctrl.kubeClient, pvc, newPVC, false /* addResourceVersionCheck */)
if err != nil {
return pvc, fmt.Errorf("mark PVC %q as controller expansion failed, errored with: %v", pvc.Name, err)
}
Expand Down Expand Up @@ -117,7 +122,7 @@ func (ctrl *modifyController) markControllerModifyVolumeCompleted(pvc *v1.Persis
if err != nil {
return pvc, pv, fmt.Errorf("update pv.Spec.VolumeAttributesClassName for PVC %q failed, errored with: %v", pvc.Name, err)
}
updatedPVC, err := util.PatchClaim(ctrl.kubeClient, pvc, newPVC, false)
updatedPVC, err := util.PatchClaim(ctrl.kubeClient, pvc, newPVC, false /* addResourceVersionCheck */)

if err != nil {
return pvc, pv, fmt.Errorf("mark PVC %q as ModifyVolumeCompleted failed, errored with: %v", pvc.Name, err)
Expand All @@ -140,22 +145,13 @@ func clearModifyVolumeConditions(conditions []v1.PersistentVolumeClaimCondition)
}

// removePVCFromModifyVolumeUncertainCache removes the pvc from the uncertain cache
func (ctrl *modifyController) removePVCFromModifyVolumeUncertainCache(pvc *v1.PersistentVolumeClaim) error {
func (ctrl *modifyController) removePVCFromModifyVolumeUncertainCache(pvcKey string) {
if ctrl.uncertainPVCs == nil {
return nil
return
}
// Format of the key of the uncertainPVCs is NAMESPACE/NAME of the pvc
pvcKey, err := cache.MetaNamespaceKeyFunc(pvc)
if err != nil {
return err
}
_, ok := ctrl.uncertainPVCs[pvcKey]
if ok {
delete(ctrl.uncertainPVCs, pvcKey)
}

if err != nil {
return err
}
return nil
}
13 changes: 7 additions & 6 deletions pkg/modifycontroller/modify_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestMarkControllerModifyVolumeStatus(t *testing.T) {
}
controller := NewModifyController(driverName,
csiModifier, kubeClient,
time.Second, false, informerFactory,
time.Second, 2*time.Minute, false, informerFactory,
workqueue.DefaultTypedControllerRateLimiter[string]())

ctrlInstance, _ := controller.(*modifyController)
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestUpdateConditionBasedOnError(t *testing.T) {
}
controller := NewModifyController(driverName,
csiModifier, kubeClient,
time.Second, false, informerFactory,
time.Second, 2*time.Minute, false, informerFactory,
workqueue.DefaultTypedControllerRateLimiter[string]())

ctrlInstance, _ := controller.(*modifyController)
Expand Down Expand Up @@ -248,7 +248,7 @@ func TestMarkControllerModifyVolumeCompleted(t *testing.T) {
}
controller := NewModifyController(driverName,
csiModifier, kubeClient,
time.Second, false, informerFactory,
time.Second, 2*time.Minute, false, informerFactory,
workqueue.DefaultTypedControllerRateLimiter[string]())

ctrlInstance, _ := controller.(*modifyController)
Expand Down Expand Up @@ -314,7 +314,7 @@ func TestRemovePVCFromModifyVolumeUncertainCache(t *testing.T) {
}
controller := NewModifyController(driverName,
csiModifier, kubeClient,
time.Second, false, informerFactory,
time.Second, 2*time.Minute, false, informerFactory,
workqueue.DefaultTypedControllerRateLimiter[string]())

ctrlInstance, _ := controller.(*modifyController)
Expand Down Expand Up @@ -346,10 +346,11 @@ func TestRemovePVCFromModifyVolumeUncertainCache(t *testing.T) {

time.Sleep(time.Second * 2)

err = ctrlInstance.removePVCFromModifyVolumeUncertainCache(tc.pvc)
pvcKey, err := cache.MetaNamespaceKeyFunc(tc.pvc)
if err != nil {
t.Errorf("err deleting pvc: %v", tc.pvc)
t.Errorf("failed to extract pvc key from pvc %v", tc.pvc)
}
ctrlInstance.removePVCFromModifyVolumeUncertainCache(pvcKey)

deletedPVCKey, err := cache.MetaNamespaceKeyFunc(tc.pvc)
if err != nil {
Expand Down
Loading

0 comments on commit 350de93

Please sign in to comment.