diff --git a/pkg/reconciliation/constructor.go b/pkg/reconciliation/constructor.go
index ce9f5204..473b11c5 100644
--- a/pkg/reconciliation/constructor.go
+++ b/pkg/reconciliation/constructor.go
@@ -85,11 +85,16 @@ func setOperatorProgressStatus(rc *ReconciliationContext, newState api.ProgressS
 }
 
 func setDatacenterStatus(rc *ReconciliationContext) error {
-	patch := client.MergeFrom(rc.Datacenter.DeepCopy())
-	rc.Datacenter.Status.ObservedGeneration = rc.Datacenter.Generation
-	rc.setCondition(api.NewDatacenterCondition(api.DatacenterRequiresUpdate, corev1.ConditionFalse))
-	if err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, patch); err != nil {
-		rc.ReqLogger.Error(err, "error updating the Cassandra Operator Progress state")
+	if rc.Datacenter.Status.ObservedGeneration != rc.Datacenter.Generation {
+		patch := client.MergeFrom(rc.Datacenter.DeepCopy())
+		rc.Datacenter.Status.ObservedGeneration = rc.Datacenter.Generation
+		if err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, patch); err != nil {
+			rc.ReqLogger.Error(err, "error updating the Cassandra Operator Progress state")
+			return err
+		}
+	}
+
+	if err := rc.setConditionStatus(api.DatacenterRequiresUpdate, corev1.ConditionFalse); err != nil {
 		return err
 	}
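The setDatacenterStatus hunk above gates the status patch on metadata.generation, so a reconcile pass that observed no spec change no longer issues a write to the API server. A minimal standalone sketch of the same guard, assuming cass-operator's v1beta1 API types and a controller-runtime client (the helper name syncObservedGeneration is illustrative, not part of this patch):

package reconciliation

import (
	"context"

	api "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// syncObservedGeneration records the spec generation in status, but only
// issues a PATCH when the stored value is stale.
func syncObservedGeneration(ctx context.Context, c client.Client, dc *api.CassandraDatacenter) error {
	if dc.Status.ObservedGeneration == dc.Generation {
		// Status already reflects the latest spec; skip the API round-trip.
		return nil
	}
	patch := client.MergeFrom(dc.DeepCopy())
	dc.Status.ObservedGeneration = dc.Generation
	return c.Status().Patch(ctx, dc, patch)
}

The DeepCopy taken before the mutation is what makes the merge patch minimal: only the ObservedGeneration field differs between the copy and the live object, so only that field is sent.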
diff --git a/pkg/reconciliation/decommission_node.go b/pkg/reconciliation/decommission_node.go
index de2c5160..e83db46d 100644
--- a/pkg/reconciliation/decommission_node.go
+++ b/pkg/reconciliation/decommission_node.go
@@ -16,6 +16,7 @@ import (
 	"github.com/k8ssandra/cass-operator/pkg/events"
 	"github.com/k8ssandra/cass-operator/pkg/httphelper"
 	"github.com/k8ssandra/cass-operator/pkg/monitoring"
+	"github.com/pkg/errors"
 	"k8s.io/apimachinery/pkg/types"
 )
 
@@ -82,19 +83,8 @@ func (rc *ReconciliationContext) DecommissionNodes(epData httphelper.CassMetadat
 		if maxReplicas > desiredNodeCount {
 			logger.V(1).Info("reconcile_racks::DecommissionNodes::scaleDownRack", "Rack", rackInfo.RackName, "maxReplicas", maxReplicas, "desiredNodeCount", desiredNodeCount)
 
-			dcPatch := client.MergeFrom(dc.DeepCopy())
-			updated := false
-
-			updated = rc.setCondition(
-				api.NewDatacenterCondition(
-					api.DatacenterScalingDown, corev1.ConditionTrue)) || updated
-
-			if updated {
-				err := rc.Client.Status().Patch(rc.Ctx, dc, dcPatch)
-				if err != nil {
-					logger.Error(err, "error patching datacenter status for scaling down rack started")
-					return result.Error(err)
-				}
+			if err := rc.setConditionStatus(api.DatacenterScalingDown, corev1.ConditionTrue); err != nil {
+				return result.Error(err)
 			}
 
 			rc.ReqLogger.Info(
@@ -219,21 +209,8 @@ func (rc *ReconciliationContext) CheckDecommissioningNodes(epData httphelper.Cas
 		}
 	}
 
-	dcPatch := client.MergeFrom(rc.Datacenter.DeepCopy())
-	updated := false
-
-	updated = rc.setCondition(
-		api.NewDatacenterCondition(
-			api.DatacenterScalingDown, corev1.ConditionFalse)) || updated
-
-	if updated {
-		err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, dcPatch)
-		if err != nil {
-			rc.ReqLogger.Error(err, "error patching datacenter status for scaling down finished")
-			return result.Error(err)
-		}
-		// Requeue after updating to ensure we verify previous steps with the new size
-		return result.RequeueSoon(0)
+	if err := rc.setConditionStatus(api.DatacenterScalingDown, corev1.ConditionFalse); err != nil {
+		return result.Error(err)
 	}
 
 	return result.Continue()
@@ -424,20 +401,12 @@ func (rc *ReconciliationContext) EnsurePodsCanAbsorbDecommData(decommPod *corev1
 		rc.ReqLogger.Error(fmt.Errorf(msg), msg)
 		rc.Recorder.Eventf(rc.Datacenter, corev1.EventTypeWarning,
 			events.InvalidDatacenterSpec, msg)
-		dcPatch := client.MergeFrom(rc.Datacenter.DeepCopy())
-		updated := rc.setCondition(
+		if err := rc.setCondition(
 			api.NewDatacenterConditionWithReason(api.DatacenterValid, corev1.ConditionFalse,
 				"notEnoughSpaceToScaleDown", msg,
 			),
-		)
-
-		if updated {
-			patchErr := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, dcPatch)
-			if patchErr != nil {
-				msg := "error patching condition Valid for failed scale down."
-				rc.ReqLogger.Error(patchErr, msg)
-				return patchErr
-			}
+		); err != nil {
+			return errors.Wrap(err, msg)
 		}
 
 		return fmt.Errorf(msg)
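The new github.com/pkg/errors import serves the errors.Wrap call in EnsurePodsCanAbsorbDecommData above: a failed condition write is now annotated with the scale-down message instead of being logged and returned bare. A small self-contained sketch of what errors.Wrap produces (values here are invented for illustration):

package main

import (
	"fmt"

	"github.com/pkg/errors"
)

func main() {
	cause := fmt.Errorf("status update conflict")
	err := errors.Wrap(cause, "not enough space to scale down")

	fmt.Println(err)                        // not enough space to scale down: status update conflict
	fmt.Println(errors.Cause(err) == cause) // true: the original error stays reachable
}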
diff --git a/pkg/reconciliation/reconcile_datacenter.go b/pkg/reconciliation/reconcile_datacenter.go
index cee40f81..82d52be0 100644
--- a/pkg/reconciliation/reconcile_datacenter.go
+++ b/pkg/reconciliation/reconcile_datacenter.go
@@ -66,13 +66,8 @@ func (rc *ReconciliationContext) ProcessDeletion() result.ReconcileResult {
 	}
 
 	if len(dcs) > 1 {
-		dcPatch := client.MergeFrom(rc.Datacenter.DeepCopy())
-		if updated := rc.setCondition(api.NewDatacenterCondition(api.DatacenterDecommission, corev1.ConditionTrue)); updated {
-			err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, dcPatch)
-			if err != nil {
-				rc.ReqLogger.Error(err, "error patching datacenter status for decommissiong started")
-				return result.Error(err)
-			}
+		if err := rc.setConditionStatus(api.DatacenterDecommission, corev1.ConditionTrue); err != nil {
+			return result.Error(err)
 		}
 
 		rc.ReqLogger.V(1).Info("Decommissioning the datacenter to 0 nodes first before deletion")
diff --git a/pkg/reconciliation/reconcile_racks.go b/pkg/reconciliation/reconcile_racks.go
index c838b102..90c7f4e7 100644
--- a/pkg/reconciliation/reconcile_racks.go
+++ b/pkg/reconciliation/reconcile_racks.go
@@ -371,13 +371,11 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
 			logger.
 				WithValues("rackName", rackName).
 				Info("update is blocked, but statefulset needs an update. Marking datacenter as requiring update.")
-			dcPatch := client.MergeFrom(dc.DeepCopy())
-			if updated := rc.setCondition(api.NewDatacenterCondition(api.DatacenterRequiresUpdate, corev1.ConditionTrue)); updated {
-				if err := rc.Client.Status().Patch(rc.Ctx, dc, dcPatch); err != nil {
-					logger.Error(err, "error patching datacenter status for updating")
-					return result.Error(err)
-				}
+
+			if err := rc.setConditionStatus(api.DatacenterRequiresUpdate, corev1.ConditionTrue); err != nil {
+				return result.Error(err)
 			}
+
 			return result.Continue()
 		}
 
@@ -411,16 +409,8 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
 			rc.Recorder.Eventf(rc.Datacenter, corev1.EventTypeNormal, events.UpdatingRack,
 				"Updating rack %s", rackName)
 
-			dcPatch := client.MergeFrom(dc.DeepCopy())
-			updated := rc.setCondition(
-				api.NewDatacenterCondition(api.DatacenterUpdating, corev1.ConditionTrue))
-
-			if updated {
-				err := rc.Client.Status().Patch(rc.Ctx, dc, dcPatch)
-				if err != nil {
-					logger.Error(err, "error patching datacenter status for updating")
-					return result.Error(err)
-				}
+			if err := rc.setConditionStatus(api.DatacenterUpdating, corev1.ConditionTrue); err != nil {
+				return result.Error(err)
 			}
 
 			if err := setOperatorProgressStatus(rc, api.ProgressUpdating); err != nil {
@@ -506,11 +496,7 @@ func (rc *ReconciliationContext) CheckRackForceUpgrade() result.ReconcileResult
 		rc.Recorder.Eventf(rc.Datacenter, corev1.EventTypeNormal, events.UpdatingRack,
 			"Force updating rack %s", rackName)
 
-		dcPatch := client.MergeFrom(dc.DeepCopy())
-		rc.setCondition(api.NewDatacenterCondition(api.DatacenterUpdating, corev1.ConditionTrue))
-
-		if err := rc.Client.Status().Patch(rc.Ctx, dc, dcPatch); err != nil {
-			logger.Error(err, "error patching datacenter status for updating condition")
+		if err := rc.setConditionStatus(api.DatacenterUpdating, corev1.ConditionTrue); err != nil {
 			return result.Error(err)
 		}
 
@@ -610,7 +596,6 @@ func (rc *ReconciliationContext) CheckRackLabels() result.ReconcileResult {
 
 func (rc *ReconciliationContext) CheckRackStoppedState() result.ReconcileResult {
 	logger := rc.ReqLogger
-	dc := rc.Datacenter
 
 	emittedStoppingEvent := false
 	racksUpdated := false
@@ -629,19 +614,12 @@ func (rc *ReconciliationContext) CheckRackStoppedState() result.ReconcileResult
 			)
 
 			if !emittedStoppingEvent {
-				dcPatch := client.MergeFrom(dc.DeepCopy())
-				updated := rc.setCondition(
-					api.NewDatacenterCondition(api.DatacenterStopped, corev1.ConditionTrue))
-				updated = rc.setCondition(
-					api.NewDatacenterCondition(
-						api.DatacenterReady, corev1.ConditionFalse)) || updated
-
-				if updated {
-					err := rc.Client.Status().Patch(rc.Ctx, dc, dcPatch)
-					if err != nil {
-						logger.Error(err, "error patching datacenter status for stopping")
-						return result.Error(err)
-					}
+				if err := rc.setConditionStatus(api.DatacenterStopped, corev1.ConditionTrue); err != nil {
+					return result.Error(err)
+				}
+
+				if err := rc.setConditionStatus(api.DatacenterReady, corev1.ConditionFalse); err != nil {
+					return result.Error(err)
 				}
 
 				rc.Recorder.Eventf(rc.Datacenter, corev1.EventTypeNormal, events.StoppingDatacenter,
restart") return result.Error(err) } @@ -2275,42 +2219,41 @@ func (rc *ReconciliationContext) CheckRollingRestart() result.ReconcileResult { return result.Continue() } -func (rc *ReconciliationContext) setCondition(condition *api.DatacenterCondition) bool { +func (rc *ReconciliationContext) setConditionStatus(conditionType api.DatacenterConditionType, status corev1.ConditionStatus) error { + return rc.setCondition(api.NewDatacenterCondition(conditionType, status)) +} + +func (rc *ReconciliationContext) setCondition(condition *api.DatacenterCondition) error { dc := rc.Datacenter + updated := false if dc.GetConditionStatus(condition.Type) != condition.Status { // We are changing the status, so record the transition time condition.LastTransitionTime = metav1.Now() dc.SetCondition(*condition) - return true + updated = true } - return false + + if updated { + // We use Update here to avoid removing some other changes to the Status that might have happened, + // as well as updating them at the same time + return rc.Client.Status().Update(rc.Ctx, dc) + } + + return nil } func (rc *ReconciliationContext) CheckConditionInitializedAndReady() result.ReconcileResult { rc.ReqLogger.Info("reconcile_racks::CheckConditionInitializedAndReady") dc := rc.Datacenter - dcPatch := client.MergeFrom(dc.DeepCopy()) - logger := rc.ReqLogger - updated := false - updated = rc.setCondition( - api.NewDatacenterCondition(api.DatacenterInitialized, corev1.ConditionTrue)) || updated - - if dc.GetConditionStatus(api.DatacenterStopped) == corev1.ConditionFalse { - updated = rc.setCondition( - api.NewDatacenterCondition(api.DatacenterReady, corev1.ConditionTrue)) || updated + if err := rc.setConditionStatus(api.DatacenterInitialized, corev1.ConditionTrue); err != nil { + return result.Error(err) } - if updated { - err := rc.Client.Status().Patch(rc.Ctx, dc, dcPatch) - if err != nil { - logger.Error(err, "error patching datacenter status") + if dc.GetConditionStatus(api.DatacenterStopped) == corev1.ConditionFalse { + if err := rc.setConditionStatus(api.DatacenterReady, corev1.ConditionTrue); err != nil { return result.Error(err) } - - // We may have ignored some changes before becoming ready. Ensure the reconcile loop - // gets a chance to run again to pick up anything missed. 
@@ -2275,42 +2219,41 @@ func (rc *ReconciliationContext) CheckRollingRestart() result.ReconcileResult {
 	return result.Continue()
 }
 
-func (rc *ReconciliationContext) setCondition(condition *api.DatacenterCondition) bool {
+func (rc *ReconciliationContext) setConditionStatus(conditionType api.DatacenterConditionType, status corev1.ConditionStatus) error {
+	return rc.setCondition(api.NewDatacenterCondition(conditionType, status))
+}
+
+func (rc *ReconciliationContext) setCondition(condition *api.DatacenterCondition) error {
 	dc := rc.Datacenter
+	updated := false
 	if dc.GetConditionStatus(condition.Type) != condition.Status {
 		// We are changing the status, so record the transition time
 		condition.LastTransitionTime = metav1.Now()
 		dc.SetCondition(*condition)
-		return true
+		updated = true
 	}
-	return false
+
+	if updated {
+		// We use Update here to avoid removing some other changes to the Status that might have happened,
+		// as well as updating them at the same time
+		return rc.Client.Status().Update(rc.Ctx, dc)
+	}
+
+	return nil
 }
 
 func (rc *ReconciliationContext) CheckConditionInitializedAndReady() result.ReconcileResult {
 	rc.ReqLogger.Info("reconcile_racks::CheckConditionInitializedAndReady")
 	dc := rc.Datacenter
-	dcPatch := client.MergeFrom(dc.DeepCopy())
-	logger := rc.ReqLogger
 
-	updated := false
-	updated = rc.setCondition(
-		api.NewDatacenterCondition(api.DatacenterInitialized, corev1.ConditionTrue)) || updated
-
-	if dc.GetConditionStatus(api.DatacenterStopped) == corev1.ConditionFalse {
-		updated = rc.setCondition(
-			api.NewDatacenterCondition(api.DatacenterReady, corev1.ConditionTrue)) || updated
+	if err := rc.setConditionStatus(api.DatacenterInitialized, corev1.ConditionTrue); err != nil {
+		return result.Error(err)
 	}
 
-	if updated {
-		err := rc.Client.Status().Patch(rc.Ctx, dc, dcPatch)
-		if err != nil {
-			logger.Error(err, "error patching datacenter status")
+	if dc.GetConditionStatus(api.DatacenterStopped) == corev1.ConditionFalse {
+		if err := rc.setConditionStatus(api.DatacenterReady, corev1.ConditionTrue); err != nil {
 			return result.Error(err)
 		}
-
-		// We may have ignored some changes before becoming ready. Ensure the reconcile loop
-		// gets a chance to run again to pick up anything missed.
-		return result.RequeueSoon(0)
 	}
 
 	return result.Continue()
@@ -2385,8 +2328,6 @@ func (rc *ReconciliationContext) createTask(command taskapi.CassandraCommand) er
 func (rc *ReconciliationContext) CheckClearActionConditions() result.ReconcileResult {
 	rc.ReqLogger.Info("reconcile_racks::CheckClearActionConditions")
 	dc := rc.Datacenter
-	logger := rc.ReqLogger
-	dcPatch := client.MergeFrom(dc.DeepCopy())
 
 	// If we are here, any action that was in progress should now be completed, so start
 	// clearing conditions
@@ -2400,7 +2341,6 @@ func (rc *ReconciliationContext) CheckClearActionConditions() result.ReconcileRe
 	conditionsThatShouldBeTrue := []api.DatacenterConditionType{
 		api.DatacenterValid,
 	}
-	updated := false
 
 	// Explicitly handle scaling up here because we want to run a cleanup afterwards
 	if dc.GetConditionStatus(api.DatacenterScalingUp) == corev1.ConditionTrue {
@@ -2409,44 +2349,34 @@ func (rc *ReconciliationContext) CheckClearActionConditions() result.ReconcileRe
 			return res
 		}
 
-		updated = rc.setCondition(
-			api.NewDatacenterCondition(api.DatacenterScalingUp, corev1.ConditionFalse)) || updated
+		if err := rc.setConditionStatus(api.DatacenterScalingUp, corev1.ConditionFalse); err != nil {
+			return result.Error(err)
+		}
 	}
 
 	// Make sure that the stopped condition matches the spec, because logically
 	// we can make it through a reconcile loop while the dc is in a stopped state
 	// and we don't want to reset the stopped condition prematurely
 	if dc.Spec.Stopped {
-		updated = rc.setCondition(
-			api.NewDatacenterCondition(api.DatacenterStopped, corev1.ConditionTrue)) || updated
+		if err := rc.setConditionStatus(api.DatacenterStopped, corev1.ConditionTrue); err != nil {
+			return result.Error(err)
+		}
 	} else {
-		updated = rc.setCondition(
-			api.NewDatacenterCondition(api.DatacenterStopped, corev1.ConditionFalse)) || updated
+		if err := rc.setConditionStatus(api.DatacenterStopped, corev1.ConditionFalse); err != nil {
+			return result.Error(err)
+		}
	}
 
 	for _, conditionType := range conditionsThatShouldBeFalse {
-		updated = rc.setCondition(
-			api.NewDatacenterCondition(conditionType, corev1.ConditionFalse)) || updated
+		if err := rc.setConditionStatus(conditionType, corev1.ConditionFalse); err != nil {
+			return result.Error(err)
+		}
 	}
 
 	for _, conditionType := range conditionsThatShouldBeTrue {
-		updated = rc.setCondition(
-			api.NewDatacenterCondition(conditionType, corev1.ConditionTrue)) || updated
-	}
-
-	if updated {
-		err := rc.Client.Status().Patch(rc.Ctx, dc, dcPatch)
-		if err != nil {
-			logger.Error(err, "error patching datacenter status")
+		if err := rc.setConditionStatus(conditionType, corev1.ConditionTrue); err != nil {
 			return result.Error(err)
 		}
-
-		// There may have been changes to the CassandraDatacenter resource that we ignored
-		// while executing some action on the cluster. For example, a user may have
-		// requested to scale up the node count while we were in the middle of a rolling
-		// restart. To account for this, we requeue to ensure reconcile gets called again
-		// to pick up any such changes that we ignored previously.
-		return result.RequeueSoon(0)
 	}
 
 	// Nothing has changed, carry on
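Taken together, setConditionStatus and setCondition become the single path that both mutates a datacenter condition and persists it, using Status().Update rather than a merge patch so that other in-memory status changes are written in the same call, and skipping the write entirely when the condition value is unchanged. A rough sketch of how that behavior could be exercised with controller-runtime's fake client; the test scaffolding below is an assumption, not part of this patch, and WithStatusSubresource requires controller-runtime v0.15 or later:

package reconciliation

import (
	"context"
	"testing"

	api "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func TestSetConditionStatusPersists(t *testing.T) {
	scheme := runtime.NewScheme()
	_ = api.AddToScheme(scheme)

	dc := &api.CassandraDatacenter{
		ObjectMeta: metav1.ObjectMeta{Name: "dc1", Namespace: "test"},
	}
	fakeClient := fake.NewClientBuilder().
		WithScheme(scheme).
		WithObjects(dc).
		WithStatusSubresource(dc).
		Build()

	rc := &ReconciliationContext{Ctx: context.TODO(), Client: fakeClient, Datacenter: dc}

	// First call transitions the condition and issues a Status().Update.
	if err := rc.setConditionStatus(api.DatacenterScalingUp, corev1.ConditionTrue); err != nil {
		t.Fatal(err)
	}
	if dc.GetConditionStatus(api.DatacenterScalingUp) != corev1.ConditionTrue {
		t.Fatal("expected ScalingUp condition to be True")
	}

	// Second call sees no transition, so it skips the API write entirely.
	if err := rc.setConditionStatus(api.DatacenterScalingUp, corev1.ConditionTrue); err != nil {
		t.Fatal(err)
	}
}

One behavioral consequence of the refactor worth noting from the hunks above: the RequeueSoon(0) returns that used to follow a successful condition patch in CheckDecommissioningNodes, CheckConditionInitializedAndReady, and CheckClearActionConditions are removed, so the reconcile loop now continues rather than immediately requeueing after a condition write.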