From b296fadb6f648b1caf01d305af53e8c54bf7a502 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Tue, 10 Dec 2024 18:28:17 +0200 Subject: [PATCH] Fix status updates and add a new status field, contextName for the Datacenter --- .../v1alpha1/k8ssandracluster_types.go | 1 + .../crds/k8ssandra-operator-crds.yaml | 2 + .../bases/k8ssandra.io_k8ssandraclusters.yaml | 2 + controllers/k8ssandra/cleanup.go | 80 ++++++++++--------- controllers/k8ssandra/datacenters.go | 12 ++- .../k8ssandracluster_controller_test.go | 6 +- controllers/k8ssandra/reaper.go | 8 +- controllers/k8ssandra/schemas.go | 23 ++++-- controllers/k8ssandra/stargate.go | 9 +-- pkg/k8ssandra/util.go | 13 +-- pkg/k8ssandra/util_test.go | 34 -------- pkg/medusa/hostmap_test.go | 16 ++-- 12 files changed, 93 insertions(+), 113 deletions(-) delete mode 100644 pkg/k8ssandra/util_test.go diff --git a/apis/k8ssandra/v1alpha1/k8ssandracluster_types.go b/apis/k8ssandra/v1alpha1/k8ssandracluster_types.go index 78f8954d8..50528fe17 100644 --- a/apis/k8ssandra/v1alpha1/k8ssandracluster_types.go +++ b/apis/k8ssandra/v1alpha1/k8ssandracluster_types.go @@ -143,6 +143,7 @@ type K8ssandraClusterCondition struct { // K8ssandraStatus defines the observed of a k8ssandra instance type K8ssandraStatus struct { + ContextName string `json:"contextName,omitempty"` DecommissionProgress DecommissionProgress `json:"decommissionProgress,omitempty"` Cassandra *cassdcapi.CassandraDatacenterStatus `json:"cassandra,omitempty"` Stargate *stargateapi.StargateStatus `json:"stargate,omitempty"` diff --git a/charts/k8ssandra-operator/crds/k8ssandra-operator-crds.yaml b/charts/k8ssandra-operator/crds/k8ssandra-operator-crds.yaml index 25b5d50d9..6a28551eb 100644 --- a/charts/k8ssandra-operator/crds/k8ssandra-operator-crds.yaml +++ b/charts/k8ssandra-operator/crds/k8ssandra-operator-crds.yaml @@ -31176,6 +31176,8 @@ spec: format: date-time type: string type: object + contextName: + type: string decommissionProgress: type: string reaper: diff --git a/config/crd/bases/k8ssandra.io_k8ssandraclusters.yaml b/config/crd/bases/k8ssandra.io_k8ssandraclusters.yaml index 409c3f590..8d9de04d1 100644 --- a/config/crd/bases/k8ssandra.io_k8ssandraclusters.yaml +++ b/config/crd/bases/k8ssandra.io_k8ssandraclusters.yaml @@ -31114,6 +31114,8 @@ spec: format: date-time type: string type: object + contextName: + type: string decommissionProgress: type: string reaper: diff --git a/controllers/k8ssandra/cleanup.go b/controllers/k8ssandra/cleanup.go index b3acbe102..af1ac704b 100644 --- a/controllers/k8ssandra/cleanup.go +++ b/controllers/k8ssandra/cleanup.go @@ -144,7 +144,7 @@ func (r *K8ssandraClusterReconciler) checkFinalizer(ctx context.Context, kc *api } func (r *K8ssandraClusterReconciler) checkDcDeletion(ctx context.Context, kc *api.K8ssandraCluster, logger logr.Logger) result.ReconcileResult { - dcName, dcNameOverride := k8ssandra.GetDatacenterForDecommission(kc) + dcName := k8ssandra.GetDatacenterForDecommission(kc) if dcName == "" { return result.Continue() } @@ -163,76 +163,78 @@ func (r *K8ssandraClusterReconciler) checkDcDeletion(ctx context.Context, kc *ap default: logger.Info("Proceeding with DC deletion", "DC", dcName) - cassDcName := dcName - if dcNameOverride != "" { - cassDcName = dcNameOverride - } - return r.deleteDc(ctx, kc, dcName, cassDcName, logger) + return r.deleteDc(ctx, kc, dcName, logger) } } -func (r *K8ssandraClusterReconciler) deleteDc(ctx context.Context, kc *api.K8ssandraCluster, dcName string, cassDcName string, logger logr.Logger) result.ReconcileResult { +func (r *K8ssandraClusterReconciler) deleteDc(ctx context.Context, kc *api.K8ssandraCluster, dcName string, logger logr.Logger) result.ReconcileResult { kcKey := utils.GetKey(kc) - stargate, remoteClient, err := r.findStargateForDeletion(ctx, kcKey, cassDcName, nil) + dcRemoteClient, err := r.ClientCache.GetRemoteClient(kc.Status.Datacenters[dcName].ContextName) + if err != nil { + return result.Error(err) + } + + dc, _, err := r.findDcForDeletion(ctx, kcKey, dcName, dcRemoteClient) + if err != nil { + return result.Error(err) + } + + if dc == nil { + // Deletion was already done + delete(kc.Status.Datacenters, dcName) + logger.Info("DC deletion finished", "DC", dcName) + return result.Continue() + } + + stargate, remoteClient, err := r.findStargateForDeletion(ctx, kcKey, dc.DatacenterName(), nil) if err != nil { return result.Error(err) } if stargate != nil { if err = remoteClient.Delete(ctx, stargate); err != nil && !errors.IsNotFound(err) { - return result.Error(fmt.Errorf("failed to delete Stargate for dc (%s): %v", cassDcName, err)) + return result.Error(fmt.Errorf("failed to delete Stargate for dc (%s): %v", dc.DatacenterName(), err)) } logger.Info("Deleted Stargate", "Stargate", utils.GetKey(stargate)) } - reaper, remoteClient, err := r.findReaperForDeletion(ctx, kcKey, cassDcName, remoteClient) + reaper, remoteClient, err := r.findReaperForDeletion(ctx, kcKey, dc.DatacenterName(), remoteClient) if err != nil { return result.Error(err) } if reaper != nil { if err = remoteClient.Delete(ctx, reaper); err != nil && !errors.IsNotFound(err) { - return result.Error(fmt.Errorf("failed to delete Reaper for dc (%s): %v", cassDcName, err)) + return result.Error(fmt.Errorf("failed to delete Reaper for dc (%s): %v", dc.DatacenterName(), err)) } logger.Info("Deleted Reaper", "Reaper", utils.GetKey(reaper)) } - dc, remoteClient, err := r.findDcForDeletion(ctx, kcKey, dcName, remoteClient) - if err != nil { + if err := r.deleteContactPointsService(ctx, kc, dc, logger); err != nil { return result.Error(err) } - if dc != nil { - if err := r.deleteContactPointsService(ctx, kc, dc, logger); err != nil { - return result.Error(err) - } - - if dc.GetConditionStatus(cassdcapi.DatacenterDecommission) == corev1.ConditionTrue { - logger.Info("CassandraDatacenter decommissioning in progress", "CassandraDatacenter", utils.GetKey(dc)) - // There is no need to requeue here. Reconciliation will be trigger by updates made by cass-operator. - return result.Done() - } - - if !annotations.HasAnnotationWithValue(dc, cassdcapi.DecommissionOnDeleteAnnotation, "true") { - patch := client.MergeFrom(dc.DeepCopy()) - annotations.AddAnnotation(dc, cassdcapi.DecommissionOnDeleteAnnotation, "true") - if err = remoteClient.Patch(ctx, dc, patch); err != nil { - return result.Error(fmt.Errorf("failed to add %s annotation to dc: %v", cassdcapi.DecommissionOnDeleteAnnotation, err)) - } - } - - if err = remoteClient.Delete(ctx, dc); err != nil && !errors.IsNotFound(err) { - return result.Error(fmt.Errorf("failed to delete CassandraDatacenter (%s): %v", dcName, err)) - } - logger.Info("Deleted CassandraDatacenter", "CassandraDatacenter", utils.GetKey(dc)) + if dc.GetConditionStatus(cassdcapi.DatacenterDecommission) == corev1.ConditionTrue { + logger.Info("CassandraDatacenter decommissioning in progress", "CassandraDatacenter", utils.GetKey(dc)) // There is no need to requeue here. Reconciliation will be trigger by updates made by cass-operator. return result.Done() } - delete(kc.Status.Datacenters, dcName) - logger.Info("DC deletion finished", "DC", dcName) - return result.Continue() + if !annotations.HasAnnotationWithValue(dc, cassdcapi.DecommissionOnDeleteAnnotation, "true") { + patch := client.MergeFrom(dc.DeepCopy()) + annotations.AddAnnotation(dc, cassdcapi.DecommissionOnDeleteAnnotation, "true") + if err = dcRemoteClient.Patch(ctx, dc, patch); err != nil { + return result.Error(fmt.Errorf("failed to add %s annotation to dc: %v", cassdcapi.DecommissionOnDeleteAnnotation, err)) + } + } + + if err = dcRemoteClient.Delete(ctx, dc); err != nil && !errors.IsNotFound(err) { + return result.Error(fmt.Errorf("failed to delete CassandraDatacenter (%s): %v", dcName, err)) + } + logger.Info("Deleted CassandraDatacenter", "CassandraDatacenter", utils.GetKey(dc)) + // There is no need to requeue here. Reconciliation will be trigger by updates made by cass-operator. + return result.Done() } func (r *K8ssandraClusterReconciler) findStargateForDeletion( diff --git a/controllers/k8ssandra/datacenters.go b/controllers/k8ssandra/datacenters.go index 5961fccef..2e3d45fcb 100644 --- a/controllers/k8ssandra/datacenters.go +++ b/controllers/k8ssandra/datacenters.go @@ -148,7 +148,7 @@ func (r *K8ssandraClusterReconciler) reconcileDatacenters(ctx context.Context, k return result.Error(fmt.Errorf("CassandraDatacenter %s has cluster name %s, but expected %s. Cluster name cannot be changed in an existing cluster", dcKey, actualDc.Spec.ClusterName, cassClusterName)), actualDcs } - r.setStatusForDatacenter(kc, actualDc) + r.setStatusForDatacenter(kc, actualDc, dcConfig.K8sContext) r.reconcileContactPointsService(ctx, kc, actualDc, remoteClient, dcLogger) @@ -309,7 +309,7 @@ func (r *K8ssandraClusterReconciler) reconcileDatacenters(ctx context.Context, k return result.Continue(), actualDcs } -func (r *K8ssandraClusterReconciler) setStatusForDatacenter(kc *api.K8ssandraCluster, dc *cassdcapi.CassandraDatacenter) { +func (r *K8ssandraClusterReconciler) setStatusForDatacenter(kc *api.K8ssandraCluster, dc *cassdcapi.CassandraDatacenter, targetContext string) { if len(kc.Status.Datacenters) == 0 { kc.Status.Datacenters = make(map[string]api.K8ssandraStatus, 0) } @@ -318,9 +318,15 @@ func (r *K8ssandraClusterReconciler) setStatusForDatacenter(kc *api.K8ssandraClu if found { dc.Status.DeepCopyInto(kdcStatus.Cassandra) + if kdcStatus.ContextName != targetContext { + // This is pretty fatal situation if it happens to actually change the context, but for updates from previous versions we need it + kdcStatus.ContextName = targetContext + } + kc.Status.Datacenters[dc.Name] = kdcStatus } else { kc.Status.Datacenters[dc.Name] = api.K8ssandraStatus{ - Cassandra: dc.Status.DeepCopy(), + ContextName: targetContext, + Cassandra: dc.Status.DeepCopy(), } } } diff --git a/controllers/k8ssandra/k8ssandracluster_controller_test.go b/controllers/k8ssandra/k8ssandracluster_controller_test.go index cad05d642..849a3b483 100644 --- a/controllers/k8ssandra/k8ssandracluster_controller_test.go +++ b/controllers/k8ssandra/k8ssandracluster_controller_test.go @@ -47,7 +47,7 @@ import ( const ( timeout = time.Second * 5 - interval = time.Millisecond * 500 + interval = time.Millisecond * 100 ) var ( @@ -69,8 +69,8 @@ func TestK8ssandraCluster(t *testing.T) { reconcilerConfig := config.InitConfig() - reconcilerConfig.DefaultDelay = 100 * time.Millisecond - reconcilerConfig.LongDelay = 300 * time.Millisecond + reconcilerConfig.DefaultDelay = 50 * time.Millisecond + reconcilerConfig.LongDelay = 200 * time.Millisecond err := testEnv.Start(ctx, t, func(mgr manager.Manager, clientCache *clientcache.ClientCache, clusters []cluster.Cluster) error { err := (&K8ssandraClusterReconciler{ diff --git a/controllers/k8ssandra/reaper.go b/controllers/k8ssandra/reaper.go index c50852ea6..81ebb5a02 100644 --- a/controllers/k8ssandra/reaper.go +++ b/controllers/k8ssandra/reaper.go @@ -19,6 +19,7 @@ package k8ssandra import ( "context" "fmt" + "github.com/go-logr/logr" cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" api "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1" @@ -242,11 +243,8 @@ func (r *K8ssandraClusterReconciler) setStatusForReaper(kc *api.K8ssandraCluster func (r *K8ssandraClusterReconciler) removeReaperStatus(kc *api.K8ssandraCluster, dcName string) { if kdcStatus, found := kc.Status.Datacenters[dcName]; found { - kc.Status.Datacenters[dcName] = api.K8ssandraStatus{ - Reaper: nil, - Cassandra: kdcStatus.Cassandra.DeepCopy(), - Stargate: kdcStatus.Stargate.DeepCopy(), - } + kdcStatus.Reaper = nil + kc.Status.Datacenters[dcName] = kdcStatus } } diff --git a/controllers/k8ssandra/schemas.go b/controllers/k8ssandra/schemas.go index 7a7eb01e7..16095c4ed 100644 --- a/controllers/k8ssandra/schemas.go +++ b/controllers/k8ssandra/schemas.go @@ -56,25 +56,38 @@ func (r *K8ssandraClusterReconciler) checkSchemas( } } - decommCassDcName, _ := k8ssandra.GetDatacenterForDecommission(kc) + decommCassDcName := k8ssandra.GetDatacenterForDecommission(kc) logger.Info("Checking if user keyspace replication needs to be updated", "decommissioning_dc", decommCassDcName) + logger.Info("Status of datacenters", "status", kc.Status.Datacenters) decommission := false + status := kc.Status.Datacenters[decommCassDcName] if decommCassDcName != "" { - decommission = kc.Status.Datacenters[decommCassDcName].DecommissionProgress == api.DecommUpdatingReplication + decommission = status.DecommissionProgress == api.DecommUpdatingReplication } - status := kc.Status.Datacenters[decommCassDcName] if decommission { kcKey := utils.GetKey(kc) - dc, _, err = r.findDcForDeletion(ctx, kcKey, decommCassDcName, remoteClient) + logger.Info("Decommissioning DC", "dc", decommCassDcName, "context", status.ContextName) + + var dcRemoteClient client.Client + if status.ContextName == "" { + dcRemoteClient = remoteClient + } else { + dcRemoteClient, err = r.ClientCache.GetRemoteClient(status.ContextName) + if err != nil { + return result.Error(err) + } + } + + dc, _, err = r.findDcForDeletion(ctx, kcKey, decommCassDcName, dcRemoteClient) if err != nil { return result.Error(err) } decommDcName := decommCassDcName if dc.Spec.DatacenterName != "" { - decommCassDcName = dc.Spec.DatacenterName + decommDcName = dc.Spec.DatacenterName } if recResult := r.checkUserKeyspacesReplicationForDecommission(kc, decommDcName, mgmtApi, logger); recResult.Completed() { diff --git a/controllers/k8ssandra/stargate.go b/controllers/k8ssandra/stargate.go index 9c1032dea..2caa83aa6 100644 --- a/controllers/k8ssandra/stargate.go +++ b/controllers/k8ssandra/stargate.go @@ -121,10 +121,10 @@ func (r *K8ssandraClusterReconciler) setStatusForStargate(kc *api.K8ssandraClust if found { if kdcStatus.Stargate == nil { kdcStatus.Stargate = stargate.Status.DeepCopy() - kc.Status.Datacenters[dcName] = kdcStatus } else { stargate.Status.DeepCopyInto(kdcStatus.Stargate) } + kc.Status.Datacenters[dcName] = kdcStatus } else { kc.Status.Datacenters[dcName] = api.K8ssandraStatus{ Stargate: stargate.Status.DeepCopy(), @@ -166,10 +166,7 @@ func (r *K8ssandraClusterReconciler) reconcileStargateAuthSchema( func (r *K8ssandraClusterReconciler) removeStargateStatus(kc *api.K8ssandraCluster, dcName string) { if kdcStatus, found := kc.Status.Datacenters[dcName]; found { - kc.Status.Datacenters[dcName] = api.K8ssandraStatus{ - Stargate: nil, - Cassandra: kdcStatus.Cassandra.DeepCopy(), - Reaper: kdcStatus.Reaper.DeepCopy(), - } + kdcStatus.Stargate = nil + kc.Status.Datacenters[dcName] = kdcStatus } } diff --git a/pkg/k8ssandra/util.go b/pkg/k8ssandra/util.go index 4f04876ce..53d1773fb 100644 --- a/pkg/k8ssandra/util.go +++ b/pkg/k8ssandra/util.go @@ -5,7 +5,7 @@ import ( "k8s.io/utils/strings/slices" ) -func GetDatacenterForDecommission(kc *api.K8ssandraCluster) (string, string) { +func GetDatacenterForDecommission(kc *api.K8ssandraCluster) string { dcNames := make([]string, 0) for _, dc := range kc.Spec.Cassandra.Datacenters { dcNames = append(dcNames, dc.Meta.Name) @@ -15,7 +15,7 @@ func GetDatacenterForDecommission(kc *api.K8ssandraCluster) (string, string) { for dcName, status := range kc.Status.Datacenters { if !slices.Contains(dcNames, dcName) { if status.DecommissionProgress != api.DecommNone { - return dcName, dcNameOverride(kc.Status.Datacenters[dcName].Cassandra.DatacenterName) + return dcName } } } @@ -23,16 +23,9 @@ func GetDatacenterForDecommission(kc *api.K8ssandraCluster) (string, string) { // No decommissions are in progress. Pick the first one we find. for dcName := range kc.Status.Datacenters { if !slices.Contains(dcNames, dcName) { - return dcName, dcNameOverride(kc.Status.Datacenters[dcName].Cassandra.DatacenterName) + return dcName } } - return "", "" -} - -func dcNameOverride(datacenterName *string) string { - if datacenterName != nil { - return *datacenterName - } return "" } diff --git a/pkg/k8ssandra/util_test.go b/pkg/k8ssandra/util_test.go deleted file mode 100644 index de6f4b749..000000000 --- a/pkg/k8ssandra/util_test.go +++ /dev/null @@ -1,34 +0,0 @@ -package k8ssandra - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestDcNameOverride(t *testing.T) { - assert := assert.New(t) - - t.Run("with non-nil string pointer", func(t *testing.T) { - datacenterName := "Test_Datacenter" - got := dcNameOverride(&datacenterName) - assert.Equal(datacenterName, got, "The two strings should be the same") - }) - - t.Run("with nil string pointer", func(t *testing.T) { - got := dcNameOverride(nil) - assert.Equal("", got, "Without a string pointer, the output should be an empty string") - }) - - t.Run("with empty string pointer", func(t *testing.T) { - datacenterName := "" - got := dcNameOverride(&datacenterName) - assert.Equal("", got, "With an empty string pointer, the output should be an empty string") - }) - - t.Run("with string containing spaces pointer", func(t *testing.T) { - datacenterName := " " - got := dcNameOverride(&datacenterName) - assert.Equal(datacenterName, got, "The two strings (with spaces) should be the same") - }) -} diff --git a/pkg/medusa/hostmap_test.go b/pkg/medusa/hostmap_test.go index 728a0baa5..b997a3363 100644 --- a/pkg/medusa/hostmap_test.go +++ b/pkg/medusa/hostmap_test.go @@ -103,7 +103,7 @@ func TestGetTargetRackFQDNs(t *testing.T) { result, err := getTargetRackFQDNs(cassDc) assert.NoError(t, err, err) expectedSourceRacks := map[NodeLocation][]string{ - {Rack: "default", DC: "dc2"}: {"test-cluster-dc2-default-sts-0", "test-cluster-dc2-default-sts-1", "test-cluster-dc2-default-sts-2"}, + {Rack: "default", DC: "test-dc2"}: {"test-cluster-dc2-default-sts-0", "test-cluster-dc2-default-sts-1", "test-cluster-dc2-default-sts-2"}, } assert.Equal(t, expectedSourceRacks, result) cassDc.Spec.Racks = []cassdcapi.Rack{ @@ -112,9 +112,9 @@ func TestGetTargetRackFQDNs(t *testing.T) { {Name: "rack3"}, } expectedSourceRacks = map[NodeLocation][]string{ - {Rack: "rack1", DC: "dc2"}: {"test-cluster-dc2-rack1-sts-0"}, - {Rack: "rack2", DC: "dc2"}: {"test-cluster-dc2-rack2-sts-0"}, - {Rack: "rack3", DC: "dc2"}: {"test-cluster-dc2-rack3-sts-0"}, + {Rack: "rack1", DC: "test-dc2"}: {"test-cluster-dc2-rack1-sts-0"}, + {Rack: "rack2", DC: "test-dc2"}: {"test-cluster-dc2-rack2-sts-0"}, + {Rack: "rack3", DC: "test-dc2"}: {"test-cluster-dc2-rack3-sts-0"}, } result, err = getTargetRackFQDNs(cassDc) assert.NoError(t, err, err) @@ -174,7 +174,7 @@ func TestGetTargetRackFQDNsOverrides(t *testing.T) { result, err := getTargetRackFQDNs(cassDc) assert.NoError(t, err, err) expectedSourceRacks := map[NodeLocation][]string{ - {Rack: "default", DC: "test-dc2"}: {"testcluster-test-dc2-default-sts-0", "testcluster-test-dc2-default-sts-1", "testcluster-test-dc2-default-sts-2"}, + {Rack: "default", DC: "Test DC2"}: {"testcluster-test-dc2-default-sts-0", "testcluster-test-dc2-default-sts-1", "testcluster-test-dc2-default-sts-2"}, } assert.Equal(t, expectedSourceRacks, result) cassDc.Spec.Racks = []cassdcapi.Rack{ @@ -183,9 +183,9 @@ func TestGetTargetRackFQDNsOverrides(t *testing.T) { {Name: "rack3"}, } expectedSourceRacks = map[NodeLocation][]string{ - {Rack: "rack1", DC: "test-dc2"}: {"testcluster-test-dc2-rack1-sts-0"}, - {Rack: "rack2", DC: "test-dc2"}: {"testcluster-test-dc2-rack2-sts-0"}, - {Rack: "rack3", DC: "test-dc2"}: {"testcluster-test-dc2-rack3-sts-0"}, + {Rack: "rack1", DC: "Test DC2"}: {"testcluster-test-dc2-rack1-sts-0"}, + {Rack: "rack2", DC: "Test DC2"}: {"testcluster-test-dc2-rack2-sts-0"}, + {Rack: "rack3", DC: "Test DC2"}: {"testcluster-test-dc2-rack3-sts-0"}, } result, err = getTargetRackFQDNs(cassDc) assert.NoError(t, err, err)