Skip to content

Commit

Permalink
Modify Datacenter deletion process to scale down StatefulSets first and only then delete the CassandraDatacenter
Browse files Browse the repository at this point in the history
  • Loading branch information
burmanm committed Dec 13, 2024
1 parent 63ff7ef commit 7983384
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 11 deletions.
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func main() {
os.Exit(1)
}

if err := mgr.GetFieldIndexer().IndexField(ctx, &corev1.Pod{}, "spec.volumes.persistentVolumeClaim.claimName", func(obj client.Object) []string {
if err := mgr.GetCache().IndexField(ctx, &corev1.Pod{}, "spec.volumes.persistentVolumeClaim.claimName", func(obj client.Object) []string {
pod, ok := obj.(*corev1.Pod)
if !ok {
return nil
Expand Down
18 changes: 17 additions & 1 deletion internal/controllers/cassandra/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"go.uber.org/zap/zapcore"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -97,7 +98,7 @@ var _ = BeforeSuite(func() {
Expect(err).ToNot(HaveOccurred())

err = (&CassandraDatacenterReconciler{
Client: k8sClient,
Client: k8sManager.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("CassandraDatacenter"),
Scheme: k8sManager.GetScheme(),
Recorder: k8sManager.GetEventRecorderFor("cass-operator"),
Expand All @@ -116,6 +117,21 @@ var _ = BeforeSuite(func() {
}).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())

Expect(k8sManager.GetCache().IndexField(ctx, &corev1.Pod{}, "spec.volumes.persistentVolumeClaim.claimName", func(obj client.Object) []string {
pod, ok := obj.(*corev1.Pod)
if !ok {
return nil
}

var pvcNames []string
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim != nil {
pvcNames = append(pvcNames, volume.PersistentVolumeClaim.ClaimName)
}
}
return pvcNames
})).ToNot(HaveOccurred())

// Reduce the polling times and sleeps to speed up the tests
cooldownPeriod = 1 * time.Millisecond
minimumRequeueTime = 10 * time.Millisecond
Expand Down
4 changes: 2 additions & 2 deletions internal/envtest/statefulset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (r *StatefulSetReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// TODO: Get existing pods and modify them.

podList := &corev1.PodList{}
if err := r.Client.List(ctx, podList, client.MatchingLabels(sts.Spec.Template.Labels), client.InNamespace(req.Namespace)); err != nil {
if err := r.Client.List(ctx, podList, client.MatchingLabels(sts.Labels), client.InNamespace(req.Namespace)); err != nil {
logger.Error(err, "Failed to list the pods belonging to this StatefulSet")
return ctrl.Result{}, err
}
Expand All @@ -94,7 +94,7 @@ func (r *StatefulSetReconciler) Reconcile(ctx context.Context, req ctrl.Request)

if len(stsPods) > intendedReplicas {
// We need to delete the extra pods.
for i := len(stsPods) - 1; i > intendedReplicas; i-- {
for i := len(stsPods) - 1; i >= intendedReplicas; i-- {
pod := stsPods[i]
if err := r.Client.Delete(ctx, pod); err != nil {
logger.Error(err, "Failed to delete extra pod from this StS")
Expand Down
49 changes: 42 additions & 7 deletions pkg/reconciliation/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@ package reconciliation

import (
"fmt"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

v1 "k8s.io/api/core/v1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/utils/ptr"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

Expand Down Expand Up @@ -91,16 +94,33 @@ func TestProcessDeletion_FailedDelete(t *testing.T) {

mockClient := mocks.NewClient(t)
rc.Client = mockClient
rc.Datacenter.Spec.Size = 0

k8sMockClientList(mockClient, nil).
Run(func(args mock.Arguments) {
arg := args.Get(1).(*v1.PersistentVolumeClaimList)
arg.Items = []v1.PersistentVolumeClaim{{
_, ok := args.Get(1).(*corev1.PodList)
if ok {
if strings.HasPrefix(args.Get(2).(*client.ListOptions).FieldSelector.String(), "spec.volumes.persistentVolumeClaim.claimName") {
arg := args.Get(1).(*corev1.PodList)
arg.Items = []corev1.Pod{}
} else {
t.Fail()
}
return
}
arg := args.Get(1).(*corev1.PersistentVolumeClaimList)
arg.Items = []corev1.PersistentVolumeClaim{{
ObjectMeta: metav1.ObjectMeta{
Name: "pvc-1",
},
}}
})
}).Twice()

k8sMockClientGet(mockClient, nil).
Run(func(args mock.Arguments) {
arg := args.Get(2).(*appsv1.StatefulSet)
arg.Spec.Replicas = ptr.To[int32](0)
}).Once()

k8sMockClientDelete(mockClient, fmt.Errorf(""))

Expand Down Expand Up @@ -131,16 +151,31 @@ func TestProcessDeletion(t *testing.T) {

k8sMockClientList(mockClient, nil).
Run(func(args mock.Arguments) {
arg := args.Get(1).(*v1.PersistentVolumeClaimList)
arg.Items = []v1.PersistentVolumeClaim{{
_, ok := args.Get(1).(*corev1.PodList)
if ok {
if strings.HasPrefix(args.Get(2).(*client.ListOptions).FieldSelector.String(), "spec.volumes.persistentVolumeClaim.claimName") {
arg := args.Get(1).(*corev1.PodList)
arg.Items = []corev1.Pod{}
} else {
t.Fail()
}
return
}
arg := args.Get(1).(*corev1.PersistentVolumeClaimList)
arg.Items = []corev1.PersistentVolumeClaim{{
ObjectMeta: metav1.ObjectMeta{
Name: "pvc-1",
},
}}
}) // ListPods
}).Twice() // ListPods

k8sMockClientDelete(mockClient, nil) // Delete PVC
k8sMockClientUpdate(mockClient, nil) // Remove dc finalizer
k8sMockClientGet(mockClient, nil).
Run(func(args mock.Arguments) {
arg := args.Get(2).(*appsv1.StatefulSet)
arg.Spec.Replicas = ptr.To[int32](0)
}).Once()

emptySecretWatcher(t, rc)

Expand Down
24 changes: 24 additions & 0 deletions pkg/reconciliation/reconcile_datacenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,30 @@ func (rc *ReconciliationContext) ProcessDeletion() result.ReconcileResult {
// How could we have pods if we've decommissioned everything?
return result.RequeueSoon(5)
}
} else {
// This is a small mini-reconcile that scales everything down to 0 before we finish deletion, without running decommission in Cassandra
rc.ReqLogger.Info("Proceeding with deletion, setting all StatefulSets to 0 replicas")
if err := rc.CalculateRackInformation(); err != nil {
return result.Error(err)
}

if res := rc.CheckRackCreation(); res.Completed() {
return res
}

waitingForRackScale := false
for _, sts := range rc.statefulSets {
currentReplicas := int(*sts.Spec.Replicas)
if currentReplicas > 0 {
waitingForRackScale = true
if err := rc.UpdateRackNodeCount(sts, 0); err != nil {
return result.Error(err)
}
}
}
if waitingForRackScale {
return result.RequeueSoon(5)
}
}

// Clean up annotation litter on the user Secrets
Expand Down
1 change: 1 addition & 0 deletions pkg/reconciliation/reconcile_racks.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ func (rc *ReconciliationContext) CheckRackLabels() result.ReconcileResult {

func (rc *ReconciliationContext) CheckRackStoppedState() result.ReconcileResult {
logger := rc.ReqLogger
logger.Info("reconcile_racks::CheckRackStoppedState")

emittedStoppingEvent := false
racksUpdated := false
Expand Down

0 comments on commit 7983384

Please sign in to comment.