Skip to content

Commit

Permalink
Fix status updates and add a new status field, contextName for the Da…
Browse files Browse the repository at this point in the history
…tacenter
  • Loading branch information
burmanm committed Dec 10, 2024
1 parent 15ae6ac commit b296fad
Show file tree
Hide file tree
Showing 12 changed files with 93 additions and 113 deletions.
1 change: 1 addition & 0 deletions apis/k8ssandra/v1alpha1/k8ssandracluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ type K8ssandraClusterCondition struct {

// K8ssandraStatus defines the observed of a k8ssandra instance
type K8ssandraStatus struct {
ContextName string `json:"contextName,omitempty"`
DecommissionProgress DecommissionProgress `json:"decommissionProgress,omitempty"`
Cassandra *cassdcapi.CassandraDatacenterStatus `json:"cassandra,omitempty"`
Stargate *stargateapi.StargateStatus `json:"stargate,omitempty"`
Expand Down
2 changes: 2 additions & 0 deletions charts/k8ssandra-operator/crds/k8ssandra-operator-crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31176,6 +31176,8 @@ spec:
format: date-time
type: string
type: object
contextName:
type: string
decommissionProgress:
type: string
reaper:
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/k8ssandra.io_k8ssandraclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31114,6 +31114,8 @@ spec:
format: date-time
type: string
type: object
contextName:
type: string
decommissionProgress:
type: string
reaper:
Expand Down
80 changes: 41 additions & 39 deletions controllers/k8ssandra/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (r *K8ssandraClusterReconciler) checkFinalizer(ctx context.Context, kc *api
}

func (r *K8ssandraClusterReconciler) checkDcDeletion(ctx context.Context, kc *api.K8ssandraCluster, logger logr.Logger) result.ReconcileResult {
dcName, dcNameOverride := k8ssandra.GetDatacenterForDecommission(kc)
dcName := k8ssandra.GetDatacenterForDecommission(kc)
if dcName == "" {
return result.Continue()
}
Expand All @@ -163,76 +163,78 @@ func (r *K8ssandraClusterReconciler) checkDcDeletion(ctx context.Context, kc *ap
default:
logger.Info("Proceeding with DC deletion", "DC", dcName)

cassDcName := dcName
if dcNameOverride != "" {
cassDcName = dcNameOverride
}
return r.deleteDc(ctx, kc, dcName, cassDcName, logger)
return r.deleteDc(ctx, kc, dcName, logger)
}
}

func (r *K8ssandraClusterReconciler) deleteDc(ctx context.Context, kc *api.K8ssandraCluster, dcName string, cassDcName string, logger logr.Logger) result.ReconcileResult {
func (r *K8ssandraClusterReconciler) deleteDc(ctx context.Context, kc *api.K8ssandraCluster, dcName string, logger logr.Logger) result.ReconcileResult {
kcKey := utils.GetKey(kc)

stargate, remoteClient, err := r.findStargateForDeletion(ctx, kcKey, cassDcName, nil)
dcRemoteClient, err := r.ClientCache.GetRemoteClient(kc.Status.Datacenters[dcName].ContextName)
if err != nil {
return result.Error(err)
}

dc, _, err := r.findDcForDeletion(ctx, kcKey, dcName, dcRemoteClient)
if err != nil {
return result.Error(err)
}

if dc == nil {
// Deletion was already done
delete(kc.Status.Datacenters, dcName)
logger.Info("DC deletion finished", "DC", dcName)
return result.Continue()
}

stargate, remoteClient, err := r.findStargateForDeletion(ctx, kcKey, dc.DatacenterName(), nil)
if err != nil {
return result.Error(err)
}

if stargate != nil {
if err = remoteClient.Delete(ctx, stargate); err != nil && !errors.IsNotFound(err) {
return result.Error(fmt.Errorf("failed to delete Stargate for dc (%s): %v", cassDcName, err))
return result.Error(fmt.Errorf("failed to delete Stargate for dc (%s): %v", dc.DatacenterName(), err))
}
logger.Info("Deleted Stargate", "Stargate", utils.GetKey(stargate))
}

reaper, remoteClient, err := r.findReaperForDeletion(ctx, kcKey, cassDcName, remoteClient)
reaper, remoteClient, err := r.findReaperForDeletion(ctx, kcKey, dc.DatacenterName(), remoteClient)
if err != nil {
return result.Error(err)
}

if reaper != nil {
if err = remoteClient.Delete(ctx, reaper); err != nil && !errors.IsNotFound(err) {
return result.Error(fmt.Errorf("failed to delete Reaper for dc (%s): %v", cassDcName, err))
return result.Error(fmt.Errorf("failed to delete Reaper for dc (%s): %v", dc.DatacenterName(), err))
}
logger.Info("Deleted Reaper", "Reaper", utils.GetKey(reaper))
}

dc, remoteClient, err := r.findDcForDeletion(ctx, kcKey, dcName, remoteClient)
if err != nil {
if err := r.deleteContactPointsService(ctx, kc, dc, logger); err != nil {
return result.Error(err)
}

if dc != nil {
if err := r.deleteContactPointsService(ctx, kc, dc, logger); err != nil {
return result.Error(err)
}

if dc.GetConditionStatus(cassdcapi.DatacenterDecommission) == corev1.ConditionTrue {
logger.Info("CassandraDatacenter decommissioning in progress", "CassandraDatacenter", utils.GetKey(dc))
// There is no need to requeue here. Reconciliation will be trigger by updates made by cass-operator.
return result.Done()
}

if !annotations.HasAnnotationWithValue(dc, cassdcapi.DecommissionOnDeleteAnnotation, "true") {
patch := client.MergeFrom(dc.DeepCopy())
annotations.AddAnnotation(dc, cassdcapi.DecommissionOnDeleteAnnotation, "true")
if err = remoteClient.Patch(ctx, dc, patch); err != nil {
return result.Error(fmt.Errorf("failed to add %s annotation to dc: %v", cassdcapi.DecommissionOnDeleteAnnotation, err))
}
}

if err = remoteClient.Delete(ctx, dc); err != nil && !errors.IsNotFound(err) {
return result.Error(fmt.Errorf("failed to delete CassandraDatacenter (%s): %v", dcName, err))
}
logger.Info("Deleted CassandraDatacenter", "CassandraDatacenter", utils.GetKey(dc))
if dc.GetConditionStatus(cassdcapi.DatacenterDecommission) == corev1.ConditionTrue {
logger.Info("CassandraDatacenter decommissioning in progress", "CassandraDatacenter", utils.GetKey(dc))
// There is no need to requeue here. Reconciliation will be trigger by updates made by cass-operator.
return result.Done()
}

delete(kc.Status.Datacenters, dcName)
logger.Info("DC deletion finished", "DC", dcName)
return result.Continue()
if !annotations.HasAnnotationWithValue(dc, cassdcapi.DecommissionOnDeleteAnnotation, "true") {
patch := client.MergeFrom(dc.DeepCopy())
annotations.AddAnnotation(dc, cassdcapi.DecommissionOnDeleteAnnotation, "true")
if err = dcRemoteClient.Patch(ctx, dc, patch); err != nil {
return result.Error(fmt.Errorf("failed to add %s annotation to dc: %v", cassdcapi.DecommissionOnDeleteAnnotation, err))
}
}

if err = dcRemoteClient.Delete(ctx, dc); err != nil && !errors.IsNotFound(err) {
return result.Error(fmt.Errorf("failed to delete CassandraDatacenter (%s): %v", dcName, err))
}
logger.Info("Deleted CassandraDatacenter", "CassandraDatacenter", utils.GetKey(dc))
// There is no need to requeue here. Reconciliation will be trigger by updates made by cass-operator.
return result.Done()
}

func (r *K8ssandraClusterReconciler) findStargateForDeletion(
Expand Down
12 changes: 9 additions & 3 deletions controllers/k8ssandra/datacenters.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (r *K8ssandraClusterReconciler) reconcileDatacenters(ctx context.Context, k
return result.Error(fmt.Errorf("CassandraDatacenter %s has cluster name %s, but expected %s. Cluster name cannot be changed in an existing cluster", dcKey, actualDc.Spec.ClusterName, cassClusterName)), actualDcs
}

r.setStatusForDatacenter(kc, actualDc)
r.setStatusForDatacenter(kc, actualDc, dcConfig.K8sContext)

r.reconcileContactPointsService(ctx, kc, actualDc, remoteClient, dcLogger)

Expand Down Expand Up @@ -309,7 +309,7 @@ func (r *K8ssandraClusterReconciler) reconcileDatacenters(ctx context.Context, k
return result.Continue(), actualDcs
}

func (r *K8ssandraClusterReconciler) setStatusForDatacenter(kc *api.K8ssandraCluster, dc *cassdcapi.CassandraDatacenter) {
func (r *K8ssandraClusterReconciler) setStatusForDatacenter(kc *api.K8ssandraCluster, dc *cassdcapi.CassandraDatacenter, targetContext string) {
if len(kc.Status.Datacenters) == 0 {
kc.Status.Datacenters = make(map[string]api.K8ssandraStatus, 0)
}
Expand All @@ -318,9 +318,15 @@ func (r *K8ssandraClusterReconciler) setStatusForDatacenter(kc *api.K8ssandraClu

if found {
dc.Status.DeepCopyInto(kdcStatus.Cassandra)
if kdcStatus.ContextName != targetContext {
// This is pretty fatal situation if it happens to actually change the context, but for updates from previous versions we need it
kdcStatus.ContextName = targetContext
}
kc.Status.Datacenters[dc.Name] = kdcStatus
} else {
kc.Status.Datacenters[dc.Name] = api.K8ssandraStatus{
Cassandra: dc.Status.DeepCopy(),
ContextName: targetContext,
Cassandra: dc.Status.DeepCopy(),
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions controllers/k8ssandra/k8ssandracluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import (

const (
timeout = time.Second * 5
interval = time.Millisecond * 500
interval = time.Millisecond * 100
)

var (
Expand All @@ -69,8 +69,8 @@ func TestK8ssandraCluster(t *testing.T) {

reconcilerConfig := config.InitConfig()

reconcilerConfig.DefaultDelay = 100 * time.Millisecond
reconcilerConfig.LongDelay = 300 * time.Millisecond
reconcilerConfig.DefaultDelay = 50 * time.Millisecond
reconcilerConfig.LongDelay = 200 * time.Millisecond

err := testEnv.Start(ctx, t, func(mgr manager.Manager, clientCache *clientcache.ClientCache, clusters []cluster.Cluster) error {
err := (&K8ssandraClusterReconciler{
Expand Down
8 changes: 3 additions & 5 deletions controllers/k8ssandra/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package k8ssandra
import (
"context"
"fmt"

"github.com/go-logr/logr"
cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
api "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1"
Expand Down Expand Up @@ -242,11 +243,8 @@ func (r *K8ssandraClusterReconciler) setStatusForReaper(kc *api.K8ssandraCluster

func (r *K8ssandraClusterReconciler) removeReaperStatus(kc *api.K8ssandraCluster, dcName string) {
if kdcStatus, found := kc.Status.Datacenters[dcName]; found {
kc.Status.Datacenters[dcName] = api.K8ssandraStatus{
Reaper: nil,
Cassandra: kdcStatus.Cassandra.DeepCopy(),
Stargate: kdcStatus.Stargate.DeepCopy(),
}
kdcStatus.Reaper = nil
kc.Status.Datacenters[dcName] = kdcStatus
}
}

Expand Down
23 changes: 18 additions & 5 deletions controllers/k8ssandra/schemas.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,25 +56,38 @@ func (r *K8ssandraClusterReconciler) checkSchemas(
}
}

decommCassDcName, _ := k8ssandra.GetDatacenterForDecommission(kc)
decommCassDcName := k8ssandra.GetDatacenterForDecommission(kc)

logger.Info("Checking if user keyspace replication needs to be updated", "decommissioning_dc", decommCassDcName)
logger.Info("Status of datacenters", "status", kc.Status.Datacenters)
decommission := false
status := kc.Status.Datacenters[decommCassDcName]
if decommCassDcName != "" {
decommission = kc.Status.Datacenters[decommCassDcName].DecommissionProgress == api.DecommUpdatingReplication
decommission = status.DecommissionProgress == api.DecommUpdatingReplication
}
status := kc.Status.Datacenters[decommCassDcName]

if decommission {
kcKey := utils.GetKey(kc)
dc, _, err = r.findDcForDeletion(ctx, kcKey, decommCassDcName, remoteClient)
logger.Info("Decommissioning DC", "dc", decommCassDcName, "context", status.ContextName)

var dcRemoteClient client.Client
if status.ContextName == "" {
dcRemoteClient = remoteClient
} else {
dcRemoteClient, err = r.ClientCache.GetRemoteClient(status.ContextName)
if err != nil {
return result.Error(err)
}
}

dc, _, err = r.findDcForDeletion(ctx, kcKey, decommCassDcName, dcRemoteClient)
if err != nil {
return result.Error(err)
}

decommDcName := decommCassDcName
if dc.Spec.DatacenterName != "" {
decommCassDcName = dc.Spec.DatacenterName
decommDcName = dc.Spec.DatacenterName
}

if recResult := r.checkUserKeyspacesReplicationForDecommission(kc, decommDcName, mgmtApi, logger); recResult.Completed() {
Expand Down
9 changes: 3 additions & 6 deletions controllers/k8ssandra/stargate.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,10 @@ func (r *K8ssandraClusterReconciler) setStatusForStargate(kc *api.K8ssandraClust
if found {
if kdcStatus.Stargate == nil {
kdcStatus.Stargate = stargate.Status.DeepCopy()
kc.Status.Datacenters[dcName] = kdcStatus
} else {
stargate.Status.DeepCopyInto(kdcStatus.Stargate)
}
kc.Status.Datacenters[dcName] = kdcStatus
} else {
kc.Status.Datacenters[dcName] = api.K8ssandraStatus{
Stargate: stargate.Status.DeepCopy(),
Expand Down Expand Up @@ -166,10 +166,7 @@ func (r *K8ssandraClusterReconciler) reconcileStargateAuthSchema(

func (r *K8ssandraClusterReconciler) removeStargateStatus(kc *api.K8ssandraCluster, dcName string) {
if kdcStatus, found := kc.Status.Datacenters[dcName]; found {
kc.Status.Datacenters[dcName] = api.K8ssandraStatus{
Stargate: nil,
Cassandra: kdcStatus.Cassandra.DeepCopy(),
Reaper: kdcStatus.Reaper.DeepCopy(),
}
kdcStatus.Stargate = nil
kc.Status.Datacenters[dcName] = kdcStatus
}
}
13 changes: 3 additions & 10 deletions pkg/k8ssandra/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"k8s.io/utils/strings/slices"
)

func GetDatacenterForDecommission(kc *api.K8ssandraCluster) (string, string) {
func GetDatacenterForDecommission(kc *api.K8ssandraCluster) string {
dcNames := make([]string, 0)
for _, dc := range kc.Spec.Cassandra.Datacenters {
dcNames = append(dcNames, dc.Meta.Name)
Expand All @@ -15,24 +15,17 @@ func GetDatacenterForDecommission(kc *api.K8ssandraCluster) (string, string) {
for dcName, status := range kc.Status.Datacenters {
if !slices.Contains(dcNames, dcName) {
if status.DecommissionProgress != api.DecommNone {
return dcName, dcNameOverride(kc.Status.Datacenters[dcName].Cassandra.DatacenterName)
return dcName
}
}
}

// No decommissions are in progress. Pick the first one we find.
for dcName := range kc.Status.Datacenters {
if !slices.Contains(dcNames, dcName) {
return dcName, dcNameOverride(kc.Status.Datacenters[dcName].Cassandra.DatacenterName)
return dcName
}
}

return "", ""
}

func dcNameOverride(datacenterName *string) string {
if datacenterName != nil {
return *datacenterName
}
return ""
}
34 changes: 0 additions & 34 deletions pkg/k8ssandra/util_test.go

This file was deleted.

Loading

0 comments on commit b296fad

Please sign in to comment.