From 08b05bb1f2bbade931172f647084b3016de9dda0 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Fri, 12 Jul 2024 17:19:45 +0300 Subject: [PATCH 1/6] Refactor statusConditions updates, Fixes #669 --- pkg/reconciliation/constructor.go | 15 +- pkg/reconciliation/decommission_node.go | 47 +---- pkg/reconciliation/reconcile_datacenter.go | 9 +- pkg/reconciliation/reconcile_racks.go | 190 +++++++-------------- 4 files changed, 80 insertions(+), 181 deletions(-) diff --git a/pkg/reconciliation/constructor.go b/pkg/reconciliation/constructor.go index ce9f5204..473b11c5 100644 --- a/pkg/reconciliation/constructor.go +++ b/pkg/reconciliation/constructor.go @@ -85,11 +85,16 @@ func setOperatorProgressStatus(rc *ReconciliationContext, newState api.ProgressS } func setDatacenterStatus(rc *ReconciliationContext) error { - patch := client.MergeFrom(rc.Datacenter.DeepCopy()) - rc.Datacenter.Status.ObservedGeneration = rc.Datacenter.Generation - rc.setCondition(api.NewDatacenterCondition(api.DatacenterRequiresUpdate, corev1.ConditionFalse)) - if err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, patch); err != nil { - rc.ReqLogger.Error(err, "error updating the Cassandra Operator Progress state") + if rc.Datacenter.Status.ObservedGeneration != rc.Datacenter.Generation { + patch := client.MergeFrom(rc.Datacenter.DeepCopy()) + rc.Datacenter.Status.ObservedGeneration = rc.Datacenter.Generation + if err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, patch); err != nil { + rc.ReqLogger.Error(err, "error updating the Cassandra Operator Progress state") + return err + } + } + + if err := rc.setConditionStatus(api.DatacenterRequiresUpdate, corev1.ConditionFalse); err != nil { return err } diff --git a/pkg/reconciliation/decommission_node.go b/pkg/reconciliation/decommission_node.go index de2c5160..e83db46d 100644 --- a/pkg/reconciliation/decommission_node.go +++ b/pkg/reconciliation/decommission_node.go @@ -16,6 +16,7 @@ import ( "github.com/k8ssandra/cass-operator/pkg/events" "github.com/k8ssandra/cass-operator/pkg/httphelper" "github.com/k8ssandra/cass-operator/pkg/monitoring" + "github.com/pkg/errors" "k8s.io/apimachinery/pkg/types" ) @@ -82,19 +83,8 @@ func (rc *ReconciliationContext) DecommissionNodes(epData httphelper.CassMetadat if maxReplicas > desiredNodeCount { logger.V(1).Info("reconcile_racks::DecommissionNodes::scaleDownRack", "Rack", rackInfo.RackName, "maxReplicas", maxReplicas, "desiredNodeCount", desiredNodeCount) - dcPatch := client.MergeFrom(dc.DeepCopy()) - updated := false - - updated = rc.setCondition( - api.NewDatacenterCondition( - api.DatacenterScalingDown, corev1.ConditionTrue)) || updated - - if updated { - err := rc.Client.Status().Patch(rc.Ctx, dc, dcPatch) - if err != nil { - logger.Error(err, "error patching datacenter status for scaling down rack started") - return result.Error(err) - } + if err := rc.setConditionStatus(api.DatacenterScalingDown, corev1.ConditionTrue); err != nil { + return result.Error(err) } rc.ReqLogger.Info( @@ -219,21 +209,8 @@ func (rc *ReconciliationContext) CheckDecommissioningNodes(epData httphelper.Cas } } - dcPatch := client.MergeFrom(rc.Datacenter.DeepCopy()) - updated := false - - updated = rc.setCondition( - api.NewDatacenterCondition( - api.DatacenterScalingDown, corev1.ConditionFalse)) || updated - - if updated { - err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, dcPatch) - if err != nil { - rc.ReqLogger.Error(err, "error patching datacenter status for scaling down finished") - return result.Error(err) - } - // Requeue after updating 
to ensure we verify previous steps with the new size - return result.RequeueSoon(0) + if err := rc.setConditionStatus(api.DatacenterScalingDown, corev1.ConditionFalse); err != nil { + return result.Error(err) } return result.Continue() @@ -424,20 +401,12 @@ func (rc *ReconciliationContext) EnsurePodsCanAbsorbDecommData(decommPod *corev1 rc.ReqLogger.Error(fmt.Errorf(msg), msg) rc.Recorder.Eventf(rc.Datacenter, corev1.EventTypeWarning, events.InvalidDatacenterSpec, msg) - dcPatch := client.MergeFrom(rc.Datacenter.DeepCopy()) - updated := rc.setCondition( + if err := rc.setCondition( api.NewDatacenterConditionWithReason(api.DatacenterValid, corev1.ConditionFalse, "notEnoughSpaceToScaleDown", msg, ), - ) - - if updated { - patchErr := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, dcPatch) - if patchErr != nil { - msg := "error patching condition Valid for failed scale down." - rc.ReqLogger.Error(patchErr, msg) - return patchErr - } + ); err != nil { + return errors.Wrap(err, msg) } return fmt.Errorf(msg) diff --git a/pkg/reconciliation/reconcile_datacenter.go b/pkg/reconciliation/reconcile_datacenter.go index cee40f81..82d52be0 100644 --- a/pkg/reconciliation/reconcile_datacenter.go +++ b/pkg/reconciliation/reconcile_datacenter.go @@ -66,13 +66,8 @@ func (rc *ReconciliationContext) ProcessDeletion() result.ReconcileResult { } if len(dcs) > 1 { - dcPatch := client.MergeFrom(rc.Datacenter.DeepCopy()) - if updated := rc.setCondition(api.NewDatacenterCondition(api.DatacenterDecommission, corev1.ConditionTrue)); updated { - err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, dcPatch) - if err != nil { - rc.ReqLogger.Error(err, "error patching datacenter status for decommissiong started") - return result.Error(err) - } + if err := rc.setConditionStatus(api.DatacenterDecommission, corev1.ConditionTrue); err != nil { + return result.Error(err) } rc.ReqLogger.V(1).Info("Decommissioning the datacenter to 0 nodes first before deletion") diff --git a/pkg/reconciliation/reconcile_racks.go b/pkg/reconciliation/reconcile_racks.go index c838b102..90c7f4e7 100644 --- a/pkg/reconciliation/reconcile_racks.go +++ b/pkg/reconciliation/reconcile_racks.go @@ -371,13 +371,11 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult { logger. WithValues("rackName", rackName). Info("update is blocked, but statefulset needs an update. 
Marking datacenter as requiring update.") - dcPatch := client.MergeFrom(dc.DeepCopy()) - if updated := rc.setCondition(api.NewDatacenterCondition(api.DatacenterRequiresUpdate, corev1.ConditionTrue)); updated { - if err := rc.Client.Status().Patch(rc.Ctx, dc, dcPatch); err != nil { - logger.Error(err, "error patching datacenter status for updating") - return result.Error(err) - } + + if err := rc.setConditionStatus(api.DatacenterRequiresUpdate, corev1.ConditionTrue); err != nil { + return result.Error(err) } + return result.Continue() } @@ -411,16 +409,8 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult { rc.Recorder.Eventf(rc.Datacenter, corev1.EventTypeNormal, events.UpdatingRack, "Updating rack %s", rackName) - dcPatch := client.MergeFrom(dc.DeepCopy()) - updated := rc.setCondition( - api.NewDatacenterCondition(api.DatacenterUpdating, corev1.ConditionTrue)) - - if updated { - err := rc.Client.Status().Patch(rc.Ctx, dc, dcPatch) - if err != nil { - logger.Error(err, "error patching datacenter status for updating") - return result.Error(err) - } + if err := rc.setConditionStatus(api.DatacenterUpdating, corev1.ConditionTrue); err != nil { + return result.Error(err) } if err := setOperatorProgressStatus(rc, api.ProgressUpdating); err != nil { @@ -506,11 +496,7 @@ func (rc *ReconciliationContext) CheckRackForceUpgrade() result.ReconcileResult rc.Recorder.Eventf(rc.Datacenter, corev1.EventTypeNormal, events.UpdatingRack, "Force updating rack %s", rackName) - dcPatch := client.MergeFrom(dc.DeepCopy()) - rc.setCondition(api.NewDatacenterCondition(api.DatacenterUpdating, corev1.ConditionTrue)) - - if err := rc.Client.Status().Patch(rc.Ctx, dc, dcPatch); err != nil { - logger.Error(err, "error patching datacenter status for updating condition") + if err := rc.setConditionStatus(api.DatacenterUpdating, corev1.ConditionTrue); err != nil { return result.Error(err) } @@ -610,7 +596,6 @@ func (rc *ReconciliationContext) CheckRackLabels() result.ReconcileResult { func (rc *ReconciliationContext) CheckRackStoppedState() result.ReconcileResult { logger := rc.ReqLogger - dc := rc.Datacenter emittedStoppingEvent := false racksUpdated := false @@ -629,19 +614,12 @@ func (rc *ReconciliationContext) CheckRackStoppedState() result.ReconcileResult ) if !emittedStoppingEvent { - dcPatch := client.MergeFrom(dc.DeepCopy()) - updated := rc.setCondition( - api.NewDatacenterCondition(api.DatacenterStopped, corev1.ConditionTrue)) - updated = rc.setCondition( - api.NewDatacenterCondition( - api.DatacenterReady, corev1.ConditionFalse)) || updated - - if updated { - err := rc.Client.Status().Patch(rc.Ctx, dc, dcPatch) - if err != nil { - logger.Error(err, "error patching datacenter status for stopping") - return result.Error(err) - } + if err := rc.setConditionStatus(api.DatacenterStopped, corev1.ConditionTrue); err != nil { + return result.Error(err) + } + + if err := rc.setConditionStatus(api.DatacenterReady, corev1.ConditionFalse); err != nil { + return result.Error(err) } rc.Recorder.Eventf(rc.Datacenter, corev1.EventTypeNormal, events.StoppingDatacenter, @@ -912,30 +890,19 @@ func (rc *ReconciliationContext) CheckRackScale() result.ReconcileResult { maxReplicas := *statefulSet.Spec.Replicas if maxReplicas < desiredNodeCount { - dcPatch := client.MergeFrom(dc.DeepCopy()) - updated := false - // Check to see if we are resuming from stopped and update conditions appropriately if dc.GetConditionStatus(api.DatacenterStopped) == corev1.ConditionTrue { - updated = rc.setCondition( - 
api.NewDatacenterCondition( - api.DatacenterStopped, corev1.ConditionFalse)) || updated + if err := rc.setConditionStatus(api.DatacenterStopped, corev1.ConditionFalse); err != nil { + return result.Error(err) + } - updated = rc.setCondition( - api.NewDatacenterCondition( - api.DatacenterResuming, corev1.ConditionTrue)) || updated + if err := rc.setConditionStatus(api.DatacenterResuming, corev1.ConditionTrue); err != nil { + return result.Error(err) + } } else if dc.GetConditionStatus(api.DatacenterReady) == corev1.ConditionTrue { // We weren't resuming from a stopped state, so we must be growing the // size of the rack and this isn't the initialization stage - updated = rc.setCondition( - api.NewDatacenterCondition( - api.DatacenterScalingUp, corev1.ConditionTrue)) || updated - } - - if updated { - err := rc.Client.Status().Patch(rc.Ctx, dc, dcPatch) - if err != nil { - logger.Error(err, "error patching datacenter status for scaling rack started") + if err := rc.setConditionStatus(api.DatacenterScalingUp, corev1.ConditionTrue); err != nil { return result.Error(err) } } @@ -1243,8 +1210,9 @@ func (rc *ReconciliationContext) startReplacePodsIfReplacePodsSpecified() error if len(dc.Status.NodeReplacements) > 0 { podNamesString := strings.Join(dc.Status.NodeReplacements, ", ") - _ = rc.setCondition( - api.NewDatacenterCondition(api.DatacenterReplacingNodes, corev1.ConditionTrue)) + if err := rc.setConditionStatus(api.DatacenterReplacingNodes, corev1.ConditionTrue); err != nil { + return err + } rc.Recorder.Eventf(rc.Datacenter, corev1.EventTypeNormal, events.ReplacingNode, "Replacing Cassandra nodes for pods %s", podNamesString) @@ -1325,27 +1293,10 @@ func (rc *ReconciliationContext) UpdateStatus() result.ReconcileResult { } func (rc *ReconciliationContext) updateHealth(healthy bool) error { - updated := false - dcPatch := client.MergeFrom(rc.Datacenter.DeepCopy()) - if !healthy { - updated = rc.setCondition( - api.NewDatacenterCondition( - api.DatacenterHealthy, corev1.ConditionFalse)) - } else { - updated = rc.setCondition( - api.NewDatacenterCondition( - api.DatacenterHealthy, corev1.ConditionTrue)) + return rc.setConditionStatus(api.DatacenterHealthy, corev1.ConditionFalse) } - - if updated { - err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, dcPatch) - if err != nil { - return err - } - } - - return nil + return rc.setConditionStatus(api.DatacenterHealthy, corev1.ConditionTrue) } func hasBeenXMinutes(x int, sinceTime time.Time) bool { @@ -2231,20 +2182,13 @@ func (rc *ReconciliationContext) CheckRollingRestart() result.ReconcileResult { logger := rc.ReqLogger if dc.Spec.DeprecatedRollingRestartRequested { - dcPatch := client.MergeFrom(dc.DeepCopy()) - dc.Status.LastRollingRestart = metav1.Now() - _ = rc.setCondition( - api.NewDatacenterCondition(api.DatacenterRollingRestart, corev1.ConditionTrue)) - err := rc.Client.Status().Patch(rc.Ctx, dc, dcPatch) - if err != nil { - logger.Error(err, "error patching datacenter status for rolling restart") + if err := rc.setConditionStatus(api.DatacenterRollingRestart, corev1.ConditionTrue); err != nil { return result.Error(err) } - dcPatch = client.MergeFromWithOptions(dc.DeepCopy(), client.MergeFromWithOptimisticLock{}) + dcPatch := client.MergeFromWithOptions(dc.DeepCopy(), client.MergeFromWithOptimisticLock{}) dc.Spec.DeprecatedRollingRestartRequested = false - err = rc.Client.Patch(rc.Ctx, dc, dcPatch) - if err != nil { + if err := rc.Client.Patch(rc.Ctx, dc, dcPatch); err != nil { logger.Error(err, "error patching datacenter for rolling 
restart") return result.Error(err) } @@ -2275,42 +2219,41 @@ func (rc *ReconciliationContext) CheckRollingRestart() result.ReconcileResult { return result.Continue() } -func (rc *ReconciliationContext) setCondition(condition *api.DatacenterCondition) bool { +func (rc *ReconciliationContext) setConditionStatus(conditionType api.DatacenterConditionType, status corev1.ConditionStatus) error { + return rc.setCondition(api.NewDatacenterCondition(conditionType, status)) +} + +func (rc *ReconciliationContext) setCondition(condition *api.DatacenterCondition) error { dc := rc.Datacenter + updated := false if dc.GetConditionStatus(condition.Type) != condition.Status { // We are changing the status, so record the transition time condition.LastTransitionTime = metav1.Now() dc.SetCondition(*condition) - return true + updated = true } - return false + + if updated { + // We use Update here to avoid removing some other changes to the Status that might have happened, + // as well as updating them at the same time + return rc.Client.Status().Update(rc.Ctx, dc) + } + + return nil } func (rc *ReconciliationContext) CheckConditionInitializedAndReady() result.ReconcileResult { rc.ReqLogger.Info("reconcile_racks::CheckConditionInitializedAndReady") dc := rc.Datacenter - dcPatch := client.MergeFrom(dc.DeepCopy()) - logger := rc.ReqLogger - updated := false - updated = rc.setCondition( - api.NewDatacenterCondition(api.DatacenterInitialized, corev1.ConditionTrue)) || updated - - if dc.GetConditionStatus(api.DatacenterStopped) == corev1.ConditionFalse { - updated = rc.setCondition( - api.NewDatacenterCondition(api.DatacenterReady, corev1.ConditionTrue)) || updated + if err := rc.setConditionStatus(api.DatacenterInitialized, corev1.ConditionTrue); err != nil { + return result.Error(err) } - if updated { - err := rc.Client.Status().Patch(rc.Ctx, dc, dcPatch) - if err != nil { - logger.Error(err, "error patching datacenter status") + if dc.GetConditionStatus(api.DatacenterStopped) == corev1.ConditionFalse { + if err := rc.setConditionStatus(api.DatacenterReady, corev1.ConditionTrue); err != nil { return result.Error(err) } - - // We may have ignored some changes before becoming ready. Ensure the reconcile loop - // gets a chance to run again to pick up anything missed. 
- return result.RequeueSoon(0) } return result.Continue() @@ -2385,8 +2328,6 @@ func (rc *ReconciliationContext) createTask(command taskapi.CassandraCommand) er func (rc *ReconciliationContext) CheckClearActionConditions() result.ReconcileResult { rc.ReqLogger.Info("reconcile_racks::CheckClearActionConditions") dc := rc.Datacenter - logger := rc.ReqLogger - dcPatch := client.MergeFrom(dc.DeepCopy()) // If we are here, any action that was in progress should now be completed, so start // clearing conditions @@ -2400,7 +2341,6 @@ func (rc *ReconciliationContext) CheckClearActionConditions() result.ReconcileRe conditionsThatShouldBeTrue := []api.DatacenterConditionType{ api.DatacenterValid, } - updated := false // Explicitly handle scaling up here because we want to run a cleanup afterwards if dc.GetConditionStatus(api.DatacenterScalingUp) == corev1.ConditionTrue { @@ -2409,44 +2349,34 @@ func (rc *ReconciliationContext) CheckClearActionConditions() result.ReconcileRe return res } - updated = rc.setCondition( - api.NewDatacenterCondition(api.DatacenterScalingUp, corev1.ConditionFalse)) || updated + if err := rc.setConditionStatus(api.DatacenterScalingUp, corev1.ConditionFalse); err != nil { + return result.Error(err) + } } // Make sure that the stopped condition matches the spec, because logically // we can make it through a reconcile loop while the dc is in a stopped state // and we don't want to reset the stopped condition prematurely if dc.Spec.Stopped { - updated = rc.setCondition( - api.NewDatacenterCondition(api.DatacenterStopped, corev1.ConditionTrue)) || updated + if err := rc.setConditionStatus(api.DatacenterStopped, corev1.ConditionTrue); err != nil { + return result.Error(err) + } } else { - updated = rc.setCondition( - api.NewDatacenterCondition(api.DatacenterStopped, corev1.ConditionFalse)) || updated + if err := rc.setConditionStatus(api.DatacenterStopped, corev1.ConditionFalse); err != nil { + return result.Error(err) + } } for _, conditionType := range conditionsThatShouldBeFalse { - updated = rc.setCondition( - api.NewDatacenterCondition(conditionType, corev1.ConditionFalse)) || updated + if err := rc.setConditionStatus(conditionType, corev1.ConditionFalse); err != nil { + return result.Error(err) + } } for _, conditionType := range conditionsThatShouldBeTrue { - updated = rc.setCondition( - api.NewDatacenterCondition(conditionType, corev1.ConditionTrue)) || updated - } - - if updated { - err := rc.Client.Status().Patch(rc.Ctx, dc, dcPatch) - if err != nil { - logger.Error(err, "error patching datacenter status") + if err := rc.setConditionStatus(conditionType, corev1.ConditionTrue); err != nil { return result.Error(err) } - - // There may have been changes to the CassandraDatacenter resource that we ignored - // while executing some action on the cluster. For example, a user may have - // requested to scale up the node count while we were in the middle of a rolling - // restart. To account for this, we requeue to ensure reconcile gets called again - // to pick up any such changes that we ignored previously. 
- return result.RequeueSoon(0) } // Nothing has changed, carry on From 2583704970eacccd11c2f3dd84aafe3ada24f462 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Fri, 12 Jul 2024 19:01:42 +0300 Subject: [PATCH 2/6] Add new metrics for all the condition statuses as well as OperatorProgress --- pkg/monitoring/metrics.go | 75 +++++++++- pkg/monitoring/metrics_test.go | 166 ++++++++++++++++++++- pkg/reconciliation/constructor.go | 3 + pkg/reconciliation/reconcile_racks.go | 3 + pkg/reconciliation/reconcile_racks_test.go | 46 ++++++ pkg/reconciliation/testing.go | 14 ++ 6 files changed, 305 insertions(+), 2 deletions(-) diff --git a/pkg/monitoring/metrics.go b/pkg/monitoring/metrics.go index 33152e03..bff9c46b 100644 --- a/pkg/monitoring/metrics.go +++ b/pkg/monitoring/metrics.go @@ -1,6 +1,7 @@ package monitoring import ( + "fmt" "strings" api "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" @@ -53,7 +54,9 @@ func getPodStatus(pod *corev1.Pod) PodStatus { } var ( - PodStatusVec *prometheus.GaugeVec + PodStatusVec *prometheus.GaugeVec + DatacenterStatusVec *prometheus.GaugeVec + DatacenterOperatorStatusVec *prometheus.GaugeVec ) func init() { @@ -64,8 +67,34 @@ func init() { Help: "Cassandra pod statuses", }, []string{"namespace", "cluster", "datacenter", "rack", "pod", "status"}) + datacenterConditionVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "cass_operator", + Subsystem: "datacenter", + Name: "status", + Help: "CassandraDatacenter conditions", + }, []string{"namespace", "cluster", "datacenter", "condition"}) + + datacenterOperatorStatusVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "cass_operator", + Subsystem: "datacenter", + Name: "progress", + Help: "CassandraDatacenter progress state", + }, []string{"namespace", "cluster", "datacenter", "progress"}) + + taskVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "cass_operator", + Subsystem: "task", + Name: "status", + Help: "Cassandra task statuses", + }, []string{"namespace", "cluster", "task", "status"}) + metrics.Registry.MustRegister(podVec) + metrics.Registry.MustRegister(datacenterConditionVec) + metrics.Registry.MustRegister(datacenterOperatorStatusVec) + metrics.Registry.MustRegister(taskVec) PodStatusVec = podVec + DatacenterStatusVec = datacenterConditionVec + DatacenterOperatorStatusVec = datacenterOperatorStatusVec } func UpdatePodStatusMetric(pod *corev1.Pod) { @@ -85,3 +114,47 @@ func RemovePodStatusMetric(pod *corev1.Pod) { func RemoveDatacenterPods(namespace, cluster, datacenter string) { PodStatusVec.DeletePartialMatch(prometheus.Labels{"namespace": namespace, "cluster": cluster, "datacenter": datacenter}) } + +func SetDatacenterConditionMetric(dc *api.CassandraDatacenter, conditionType api.DatacenterConditionType, status corev1.ConditionStatus) { + cond := float64(0) + if status == corev1.ConditionTrue { + cond = 1 + } + + DatacenterStatusVec.WithLabelValues(dc.Namespace, dc.Spec.ClusterName, dc.DatacenterName(), string(conditionType)).Set(cond) +} + +func UpdateOperatorDatacenterProgressStatusMetric(dc *api.CassandraDatacenter, state api.ProgressState) { + // Delete other statuses + DatacenterOperatorStatusVec.DeletePartialMatch(prometheus.Labels{"namespace": dc.Namespace, "cluster": dc.Spec.ClusterName, "datacenter": dc.DatacenterName()}) + + // Set this one only + DatacenterOperatorStatusVec.WithLabelValues(dc.Namespace, dc.Spec.ClusterName, dc.DatacenterName(), string(state)).Set(1) +} + +// Add CassandraTask status also (how many pods done etc) per task 
+// Add podnames to the CassandraTask status that are done? Or waiting? + +func GetMetricValue(name string, labels map[string]string) (float64, error) { + families, err := metrics.Registry.Gather() + if err != nil { + return 0, err + } + + for _, fam := range families { + if *fam.Name == name { + Metric: + for _, m := range fam.Metric { + for _, label := range m.Label { + if val, ok := labels[*label.Name]; ok { + if val != *label.Value { + continue Metric + } + } + } + return *m.Gauge.Value, nil + } + } + } + return 0, fmt.Errorf("no metric found") +} diff --git a/pkg/monitoring/metrics_test.go b/pkg/monitoring/metrics_test.go index 826fb916..d5a1fed1 100644 --- a/pkg/monitoring/metrics_test.go +++ b/pkg/monitoring/metrics_test.go @@ -78,7 +78,7 @@ func TestMetricAdder(t *testing.T) { require.Error(err) } -func TestNamespaceSeparatation(t *testing.T) { +func TestNamespaceSeparation(t *testing.T) { require := require.New(t) pods := make([]*corev1.Pod, 2) for i := 0; i < len(pods); i++ { @@ -150,3 +150,167 @@ func getCurrentPodStatus(podName string) (string, error) { } return "", fmt.Errorf("No pod status found") } + +func TestOperatorStateMetrics(t *testing.T) { + require := require.New(t) + + dc := &api.CassandraDatacenter{ + ObjectMeta: metav1.ObjectMeta{ + Name: "dc1", + Namespace: "ns", + }, + Spec: api.CassandraDatacenterSpec{ + ClusterName: "cluster1", + }, + Status: api.CassandraDatacenterStatus{}, + } + + UpdateOperatorDatacenterProgressStatusMetric(dc, api.ProgressUpdating) + + status, err := getCurrentDatacenterStatus("dc1") + require.NoError(err) + require.Equal("Updating", status) + + UpdateOperatorDatacenterProgressStatusMetric(dc, api.ProgressReady) + + status, err = getCurrentDatacenterStatus("dc1") + require.NoError(err) + require.Equal("Ready", status) +} + +func getCurrentDatacenterStatus(dcName string) (string, error) { + families, err := metrics.Registry.Gather() + if err != nil { + return "", err + } + + for _, fam := range families { + if *fam.Name == "cass_operator_datacenter_progress" { + Metric: + for _, m := range fam.Metric { + status := "" + for _, label := range m.Label { + if *label.Name == "datacenter" { + if *label.Value != dcName { + continue Metric + } + } + if *label.Name == "progress" { + status = *label.Value + } + } + if *m.Gauge.Value > 0 { + return status, nil + } + } + } + } + return "", fmt.Errorf("No datacenter status found") +} + +func TestDatacenterConditionMetrics(t *testing.T) { + require := require.New(t) + + dc := &api.CassandraDatacenter{ + ObjectMeta: metav1.ObjectMeta{ + Name: "dc1", + Namespace: "ns", + }, + Spec: api.CassandraDatacenterSpec{ + ClusterName: "cluster1", + }, + Status: api.CassandraDatacenterStatus{ + Conditions: []api.DatacenterCondition{ + { + Type: api.DatacenterReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } + + SetDatacenterConditionMetric(dc, api.DatacenterReady, corev1.ConditionTrue) + + status, err := getCurrentDatacenterCondition("dc1", api.DatacenterReady) + require.NoError(err) + require.Equal(float64(1), status) + + SetDatacenterConditionMetric(dc, api.DatacenterInitialized, corev1.ConditionTrue) + SetDatacenterConditionMetric(dc, api.DatacenterReady, corev1.ConditionFalse) + + status, err = getCurrentDatacenterCondition("dc1", api.DatacenterReady) + require.NoError(err) + require.Equal(float64(0), status) + + status, err = getCurrentDatacenterCondition("dc1", api.DatacenterInitialized) + require.NoError(err) + require.Equal(float64(1), status) +} + +func getCurrentDatacenterCondition(dcName string, 
conditionType api.DatacenterConditionType) (float64, error) { + return GetMetricValue("cass_operator_datacenter_status", map[string]string{"datacenter": dcName, "condition": string(conditionType)}) +} + +/* +func TestTaskMetrics(t *testing.T) { + require := require.New(t) + + task := &api.CassandraClusterTask{ + ObjectMeta: metav1.ObjectMeta{ + Name: "task1", + Namespace: "ns", + }, + Status: api.CassandraClusterTaskStatus{ + Phase: api.TaskPhaseRunning, + }, + } + + SetTaskStatusMetric(task) + + status, err := getCurrentTaskStatus("task1") + require.NoError(err) + require.Equal("running", status) + + task.Status.Phase = api.TaskPhaseCompleted + SetTaskStatusMetric(task) + + status, err = getCurrentTaskStatus("task1") + require.NoError(err) + require.Equal("completed", status) + + RemoveTaskStatusMetric(task) + + _, err = getCurrentTaskStatus("task1") + require.Error(err) +} + +func getCurrentTaskStatus(taskName string) (string, error) { + families, err := metrics.Registry.Gather() + if err != nil { + return "", err + } + + for _, fam := range families { + if *fam.Name == "cass_operator_tasks_status" { + Metric: + for _, m := range fam.Metric { + status := "" + for _, label := range m.Label { + if *label.Name == "task" { + if *label.Value != taskName { + continue Metric + } + } + if *label.Name == "status" { + status = *label.Value + } + } + if *m.Gauge.Value > 0 { + return status, nil + } + } + } + } + return "", fmt.Errorf("No task status found") +} +*/ diff --git a/pkg/reconciliation/constructor.go b/pkg/reconciliation/constructor.go index 473b11c5..edb29ab2 100644 --- a/pkg/reconciliation/constructor.go +++ b/pkg/reconciliation/constructor.go @@ -9,6 +9,7 @@ import ( "fmt" api "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" + "github.com/k8ssandra/cass-operator/pkg/monitoring" "github.com/k8ssandra/cass-operator/pkg/oplabels" "github.com/k8ssandra/cass-operator/pkg/utils" @@ -70,6 +71,8 @@ func setOperatorProgressStatus(rc *ReconciliationContext, newState api.ProgressS return err } + monitoring.UpdateOperatorDatacenterProgressStatusMetric(rc.Datacenter, newState) + // The allow-upgrade=once annotation is temporary and should be removed after first successful reconcile if metav1.HasAnnotation(rc.Datacenter.ObjectMeta, api.UpdateAllowedAnnotation) && rc.Datacenter.Annotations[api.UpdateAllowedAnnotation] == string(api.AllowUpdateOnce) { // remove the annotation diff --git a/pkg/reconciliation/reconcile_racks.go b/pkg/reconciliation/reconcile_racks.go index 90c7f4e7..c7d56210 100644 --- a/pkg/reconciliation/reconcile_racks.go +++ b/pkg/reconciliation/reconcile_racks.go @@ -2182,6 +2182,7 @@ func (rc *ReconciliationContext) CheckRollingRestart() result.ReconcileResult { logger := rc.ReqLogger if dc.Spec.DeprecatedRollingRestartRequested { + dc.Status.LastRollingRestart = metav1.Now() if err := rc.setConditionStatus(api.DatacenterRollingRestart, corev1.ConditionTrue); err != nil { return result.Error(err) } @@ -2234,6 +2235,8 @@ func (rc *ReconciliationContext) setCondition(condition *api.DatacenterCondition } if updated { + // Modify the metric also + monitoring.SetDatacenterConditionMetric(dc, condition.Type, condition.Status) // We use Update here to avoid removing some other changes to the Status that might have happened, // as well as updating them at the same time return rc.Client.Status().Update(rc.Ctx, dc) diff --git a/pkg/reconciliation/reconcile_racks_test.go b/pkg/reconciliation/reconcile_racks_test.go index 219fc1a6..e19c6b82 100644 --- 
a/pkg/reconciliation/reconcile_racks_test.go +++ b/pkg/reconciliation/reconcile_racks_test.go @@ -22,6 +22,7 @@ import ( "github.com/k8ssandra/cass-operator/internal/result" "github.com/k8ssandra/cass-operator/pkg/httphelper" "github.com/k8ssandra/cass-operator/pkg/mocks" + "github.com/k8ssandra/cass-operator/pkg/monitoring" "github.com/k8ssandra/cass-operator/pkg/oplabels" "github.com/k8ssandra/cass-operator/pkg/utils" "github.com/stretchr/testify/assert" @@ -330,6 +331,9 @@ func TestCheckRackPodTemplate_CanaryUpgrade(t *testing.T) { assert.Nil(t, err) assert.Equal(t, rc.Datacenter.Status.CassandraOperatorProgress, api.ProgressUpdating) + val, err := monitoring.GetMetricValue("cass_operator_datacenter_progress", map[string]string{"datacenter": rc.Datacenter.DatacenterName(), "progress": string(api.ProgressUpdating)}) + assert.NoError(t, err) + assert.Equal(t, float64(1), val) expectedStrategy := appsv1.StatefulSetUpdateStrategy{ Type: appsv1.RollingUpdateStatefulSetStrategyType, @@ -2727,5 +2731,47 @@ func TestCheckRackPodTemplateWithVolumeExpansion(t *testing.T) { // The fakeClient behavior does not prevent us from modifying the StS fields, so this test behaves unlike real world in that sense res = rc.CheckRackPodTemplate() require.Equal(result.Continue(), res, "Recreating StS should throw us to silence period") +} + +func TestSetConditionStatus(t *testing.T) { + rc, _, cleanupMockScr := setupTest() + defer cleanupMockScr() + assert := assert.New(t) + + mockClient := mocks.NewClient(t) + rc.Client = mockClient + + k8sMockClientStatusUpdate(mockClient.Status().(*mocks.SubResourceClient), nil).Times(2) + assert.NoError(rc.setConditionStatus(api.DatacenterHealthy, corev1.ConditionTrue)) + assert.Equal(corev1.ConditionTrue, rc.Datacenter.GetConditionStatus(api.DatacenterHealthy)) + val, err := monitoring.GetMetricValue("cass_operator_datacenter_status", map[string]string{"datacenter": rc.Datacenter.DatacenterName(), "condition": string(api.DatacenterHealthy)}) + assert.NoError(err) + assert.Equal(float64(1), val) + + assert.NoError(rc.setConditionStatus(api.DatacenterHealthy, corev1.ConditionFalse)) + assert.Equal(corev1.ConditionFalse, rc.Datacenter.GetConditionStatus(api.DatacenterHealthy)) + val, err = monitoring.GetMetricValue("cass_operator_datacenter_status", map[string]string{"datacenter": rc.Datacenter.DatacenterName(), "condition": string(api.DatacenterHealthy)}) + assert.NoError(err) + assert.Equal(float64(0), val) +} + +func TestDatacenterStatus(t *testing.T) { + rc, _, cleanupMockScr := setupTest() + defer cleanupMockScr() + assert := assert.New(t) + mockClient := mocks.NewClient(t) + rc.Client = mockClient + + k8sMockClientStatusPatch(mockClient.Status().(*mocks.SubResourceClient), nil).Once() + k8sMockClientStatusUpdate(mockClient.Status().(*mocks.SubResourceClient), nil).Times(2) + assert.NoError(rc.setConditionStatus(api.DatacenterRequiresUpdate, corev1.ConditionTrue)) // This uses one StatusUpdate call + rc.Datacenter.Status.ObservedGeneration = 0 + rc.Datacenter.ObjectMeta.Generation = 1 + assert.NoError(setDatacenterStatus(rc)) + assert.Equal(int64(1), rc.Datacenter.Status.ObservedGeneration) + assert.Equal(corev1.ConditionFalse, rc.Datacenter.GetConditionStatus(api.DatacenterRequiresUpdate)) + val, err := monitoring.GetMetricValue("cass_operator_datacenter_status", map[string]string{"datacenter": rc.Datacenter.DatacenterName(), "condition": string(api.DatacenterRequiresUpdate)}) + assert.NoError(err) + assert.Equal(float64(0), val) } diff --git 
a/pkg/reconciliation/testing.go b/pkg/reconciliation/testing.go index c53a3bfa..1865c308 100644 --- a/pkg/reconciliation/testing.go +++ b/pkg/reconciliation/testing.go @@ -240,6 +240,20 @@ func k8sMockClientStatusPatch(mockClient *mocks.SubResourceClient, returnArg int Once() } +func k8sMockClientStatusUpdate(mockClient *mocks.SubResourceClient, returnArg interface{}) *mock.Call { + return mockClient.On("Update", + mock.MatchedBy( + func(ctx context.Context) bool { + return ctx != nil + }), + mock.MatchedBy( + func(obj runtime.Object) bool { + return obj != nil + })). + Return(returnArg). + Once() +} + func k8sMockClientCreate(mockClient *mocks.Client, returnArg interface{}) *mock.Call { return mockClient.On("Create", mock.MatchedBy( From 02ddae5cc3c6144bcdafbef3877f63bcba8fbb5f Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Fri, 12 Jul 2024 19:45:05 +0300 Subject: [PATCH 3/6] Remove task metrics for now --- pkg/monitoring/metrics.go | 8 ----- pkg/monitoring/metrics_test.go | 64 ---------------------------------- 2 files changed, 72 deletions(-) diff --git a/pkg/monitoring/metrics.go b/pkg/monitoring/metrics.go index bff9c46b..ced1260a 100644 --- a/pkg/monitoring/metrics.go +++ b/pkg/monitoring/metrics.go @@ -81,17 +81,9 @@ func init() { Help: "CassandraDatacenter progress state", }, []string{"namespace", "cluster", "datacenter", "progress"}) - taskVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "cass_operator", - Subsystem: "task", - Name: "status", - Help: "Cassandra task statuses", - }, []string{"namespace", "cluster", "task", "status"}) - metrics.Registry.MustRegister(podVec) metrics.Registry.MustRegister(datacenterConditionVec) metrics.Registry.MustRegister(datacenterOperatorStatusVec) - metrics.Registry.MustRegister(taskVec) PodStatusVec = podVec DatacenterStatusVec = datacenterConditionVec DatacenterOperatorStatusVec = datacenterOperatorStatusVec diff --git a/pkg/monitoring/metrics_test.go b/pkg/monitoring/metrics_test.go index d5a1fed1..5d85dff3 100644 --- a/pkg/monitoring/metrics_test.go +++ b/pkg/monitoring/metrics_test.go @@ -250,67 +250,3 @@ func TestDatacenterConditionMetrics(t *testing.T) { func getCurrentDatacenterCondition(dcName string, conditionType api.DatacenterConditionType) (float64, error) { return GetMetricValue("cass_operator_datacenter_status", map[string]string{"datacenter": dcName, "condition": string(conditionType)}) } - -/* -func TestTaskMetrics(t *testing.T) { - require := require.New(t) - - task := &api.CassandraClusterTask{ - ObjectMeta: metav1.ObjectMeta{ - Name: "task1", - Namespace: "ns", - }, - Status: api.CassandraClusterTaskStatus{ - Phase: api.TaskPhaseRunning, - }, - } - - SetTaskStatusMetric(task) - - status, err := getCurrentTaskStatus("task1") - require.NoError(err) - require.Equal("running", status) - - task.Status.Phase = api.TaskPhaseCompleted - SetTaskStatusMetric(task) - - status, err = getCurrentTaskStatus("task1") - require.NoError(err) - require.Equal("completed", status) - - RemoveTaskStatusMetric(task) - - _, err = getCurrentTaskStatus("task1") - require.Error(err) -} - -func getCurrentTaskStatus(taskName string) (string, error) { - families, err := metrics.Registry.Gather() - if err != nil { - return "", err - } - - for _, fam := range families { - if *fam.Name == "cass_operator_tasks_status" { - Metric: - for _, m := range fam.Metric { - status := "" - for _, label := range m.Label { - if *label.Name == "task" { - if *label.Value != taskName { - continue Metric - } - } - if *label.Name == "status" { - status = 
*label.Value - } - } - if *m.Gauge.Value > 0 { - return status, nil - } - } - } - } - return "", fmt.Errorf("No task status found") -} -*/ From 4cb7aa34bee1ec81dc57cf32bafbb88f057842d2 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Fri, 12 Jul 2024 19:48:30 +0300 Subject: [PATCH 4/6] Add CHANGELOG --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 103b6668..c82b7286 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,8 +12,9 @@ Changelog for Cass Operator, new PRs should update the `main / unreleased` secti ## unreleased * [FEATURE] [#263]((https://github.com/k8ssandra/cass-operator/issues/263) Allow increasing the size of CassandraDataVolumeClaimSpec if the selected StorageClass supports it. This feature is currently behind a opt-in feature flag and requires an annotation ``cassandra.datastax.com/allow-storage-changes: true`` to be set in the CassandraDatacenter. -* [ENHANCEMENT] [#648](https://github.com/k8ssandra/cass-operator/issues/648) Make MinReadySeconds configurable value in the Spec. * [FEATURE] [#646](https://github.com/k8ssandra/cass-operator/issues/646) Allow starting multiple parallel pods if they have already previously bootstrapped and not planned for replacement. Set annotation ``cassandra.datastax.com/allow-parallel-starts: true`` to enable this feature. +* [ENHANCEMENT] [#648](https://github.com/k8ssandra/cass-operator/issues/648) Make MinReadySeconds configurable value in the Spec. +* [ENHANCEMENT] [#349](https://github.com/k8ssandra/cass-operator/issues/349) Add CassandraDatacenter.Status fields as metrics also ## v1.21.1 From 0483d4331e47bfa5d1198d811207c4c624384e4f Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Fri, 12 Jul 2024 19:52:27 +0300 Subject: [PATCH 5/6] Fix changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c82b7286..500e2432 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ Changelog for Cass Operator, new PRs should update the `main / unreleased` secti * [FEATURE] [#263]((https://github.com/k8ssandra/cass-operator/issues/263) Allow increasing the size of CassandraDataVolumeClaimSpec if the selected StorageClass supports it. This feature is currently behind a opt-in feature flag and requires an annotation ``cassandra.datastax.com/allow-storage-changes: true`` to be set in the CassandraDatacenter. * [FEATURE] [#646](https://github.com/k8ssandra/cass-operator/issues/646) Allow starting multiple parallel pods if they have already previously bootstrapped and not planned for replacement. Set annotation ``cassandra.datastax.com/allow-parallel-starts: true`` to enable this feature. * [ENHANCEMENT] [#648](https://github.com/k8ssandra/cass-operator/issues/648) Make MinReadySeconds configurable value in the Spec. 
-* [ENHANCEMENT] [#349](https://github.com/k8ssandra/cass-operator/issues/349) Add CassandraDatacenter.Status fields as metrics also +* [ENHANCEMENT] [#184](https://github.com/k8ssandra/cass-operator/issues/349) Add CassandraDatacenter.Status fields as metrics also ## v1.21.1 From e7b78cc316e65035b5afbea9977c4b905f5051a9 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Thu, 18 Jul 2024 13:04:14 +0300 Subject: [PATCH 6/6] Fix rebase --- pkg/reconciliation/reconcile_racks.go | 42 +++++++++++---------------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/pkg/reconciliation/reconcile_racks.go b/pkg/reconciliation/reconcile_racks.go index c7d56210..86d55c1b 100644 --- a/pkg/reconciliation/reconcile_racks.go +++ b/pkg/reconciliation/reconcile_racks.go @@ -186,12 +186,8 @@ func (rc *ReconciliationContext) CheckPVCResizing() result.ReconcileResult { } } - dcPatch := client.MergeFrom(rc.Datacenter.DeepCopy()) - if updated := rc.setCondition(api.NewDatacenterCondition(api.DatacenterResizingVolumes, corev1.ConditionFalse)); updated { - if err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, dcPatch); err != nil { - rc.ReqLogger.Error(err, "error patching datacenter status for updating") - return result.Error(err) - } + if err := rc.setConditionStatus(api.DatacenterResizingVolumes, corev1.ConditionFalse); err != nil { + return result.Error(err) } return result.Continue() @@ -232,15 +228,16 @@ func (rc *ReconciliationContext) CheckVolumeClaimSizes(statefulSet, desiredSts * // TODO This code is a bit repetitive with all the Status patches. Needs a refactoring in cass-operator since this is a known // pattern. https://github.com/k8ssandra/cass-operator/issues/669 if currentSize.Cmp(createdSize) > 0 { - dcPatch := client.MergeFrom(rc.Datacenter.DeepCopy()) - if updated := rc.setCondition(api.NewDatacenterCondition(api.DatacenterValid, corev1.ConditionFalse)); updated { - if err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, dcPatch); err != nil { - rc.ReqLogger.Error(err, "error patching datacenter status for updating") - return result.Error(err) - } + msg := fmt.Sprintf("shrinking PVC %s is not supported", claim.Name) + if err := rc.setCondition( + api.NewDatacenterConditionWithReason(api.DatacenterValid, + corev1.ConditionFalse, "shrinkingDataVolumeNotSupported", msg, + )); err != nil { + return result.Error(err) } + rc.Recorder.Eventf(rc.Datacenter, corev1.EventTypeWarning, events.InvalidDatacenterSpec, "Shrinking CassandraDatacenter PVCs is not supported") - return result.Error(fmt.Errorf("shrinking PVC %s is not supported", claim.Name)) + return result.Error(fmt.Errorf(msg)) } if currentSize.Cmp(createdSize) < 0 { @@ -254,22 +251,17 @@ func (rc *ReconciliationContext) CheckVolumeClaimSizes(statefulSet, desiredSts * if !supportsExpansion { msg := fmt.Sprintf("PVC resize requested, but StorageClass %s does not support expansion", *claim.Spec.StorageClassName) rc.Recorder.Eventf(rc.Datacenter, corev1.EventTypeWarning, events.InvalidDatacenterSpec, msg) - dcPatch := client.MergeFrom(rc.Datacenter.DeepCopy()) - if updated := rc.setCondition(api.NewDatacenterCondition(api.DatacenterValid, corev1.ConditionFalse)); updated { - if err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, dcPatch); err != nil { - rc.ReqLogger.Error(err, "error patching datacenter status for updating") - return result.Error(err) - } + if err := rc.setCondition( + api.NewDatacenterConditionWithReason(api.DatacenterValid, + corev1.ConditionFalse, "storageClassDoesNotSupportExpansion", msg, + )); err != nil { 
+ return result.Error(err) } return result.Error(fmt.Errorf(msg)) } - dcPatch := client.MergeFrom(rc.Datacenter.DeepCopy()) - if updated := rc.setCondition(api.NewDatacenterCondition(api.DatacenterResizingVolumes, corev1.ConditionTrue)); updated { - if err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, dcPatch); err != nil { - rc.ReqLogger.Error(err, "error patching datacenter status for updating") - return result.Error(err) - } + if err := rc.setConditionStatus(api.DatacenterResizingVolumes, corev1.ConditionTrue); err != nil { + return result.Error(err) } rc.Recorder.Eventf(rc.Datacenter, corev1.EventTypeNormal, events.ResizingPVC, "Resizing PVCs for %s", statefulSet.Name)
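
For reference, the condition and progress gauges added in patch 2 register against the controller-runtime metrics registry, so after this series they are exposed on the operator's /metrics endpoint and can also be read back in tests through the GetMetricValue helper introduced in the same patch. Below is a minimal sketch of such a lookup; the metric name, label keys, and helper signature come from the diff above, while the package name and the concrete label values ("ns", "dc1") are assumptions made only for illustration.

package monitoring_example

import (
	"fmt"

	api "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
	"github.com/k8ssandra/cass-operator/pkg/monitoring"
)

// readReadyCondition reads the cass_operator_datacenter_status gauge for one
// datacenter/condition pair: 1 means the condition is True, 0 means it is not.
func readReadyCondition() {
	val, err := monitoring.GetMetricValue("cass_operator_datacenter_status", map[string]string{
		"namespace":  "ns",  // assumed example values, not taken from the patches
		"datacenter": "dc1",
		"condition":  string(api.DatacenterReady),
	})
	if err != nil {
		fmt.Println("condition gauge not set yet:", err)
		return
	}
	fmt.Printf("DatacenterReady gauge: %v\n", val)
}

The progress gauge registered alongside it (cass_operator_datacenter_progress) can be queried the same way, keyed by the progress label instead of condition.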