diff --git a/cmd/main.go b/cmd/main.go
index 58bf99d2..634fd3f7 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -26,11 +26,13 @@ import (
 	"go.uber.org/zap/zapcore"
 	_ "k8s.io/client-go/plugin/pkg/client/auth"

+	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/cache"
+	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/healthz"
 	"sigs.k8s.io/controller-runtime/pkg/log/zap"

@@ -119,6 +121,8 @@ func main() {
 		os.Exit(1)
 	}

+	ctx := ctrl.SetupSignalHandler()
+
 	if err = (&controllers.CassandraDatacenterReconciler{
 		Client: mgr.GetClient(),
 		Log:    ctrl.Log.WithName("controllers").WithName("CassandraDatacenter"),
@@ -152,8 +156,16 @@ func main() {
 		os.Exit(1)
 	}

+	mgr.GetCache().IndexField(ctx, &corev1.Event{}, "involvedObject.name", func(obj client.Object) []string {
+		event := obj.(*corev1.Event)
+		if event.InvolvedObject.Kind == "Pod" {
+			return []string{event.InvolvedObject.Name}
+		}
+		return []string{}
+	})
+
 	setupLog.Info("starting manager")
-	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
+	if err := mgr.Start(ctx); err != nil {
 		setupLog.Error(err, "problem running manager")
 		os.Exit(1)
 	}
diff --git a/pkg/reconciliation/reconcile_racks.go b/pkg/reconciliation/reconcile_racks.go
index 3c56b618..bdf48fe9 100644
--- a/pkg/reconciliation/reconcile_racks.go
+++ b/pkg/reconciliation/reconcile_racks.go
@@ -168,16 +168,14 @@ func (rc *ReconciliationContext) CheckRackCreation() result.ReconcileResult {
 }

 func (rc *ReconciliationContext) failureModeDetection() bool {
-	// TODO Even if these are true, we shouldn't allow update if we have a pod starting (that hasn't crashed yet)
-
-	// First check - do we even need a force?
-	// We can check if StatefulSet was updated, but that wouldn't tell us if there's crashing pods
 	for _, pod := range rc.dcPods {
+		if pod == nil {
+			continue
+		}
 		if pod.Status.Phase == corev1.PodPending {
 			if hasBeenXMinutes(5, pod.Status.StartTime.Time) {
 				// Pod has been over 5 minutes in Pending state. This can be normal, but lets see
 				// if we have some detected failures events like FailedScheduling
-
 				events := &corev1.EventList{}
 				if err := rc.Client.List(rc.Ctx, events, &client.ListOptions{Namespace: pod.Namespace, FieldSelector: fields.SelectorFromSet(fields.Set{"involvedObject.name": pod.Name})}); err != nil {
 					rc.ReqLogger.Error(err, "error getting events for pod", "pod", pod.Name)
@@ -186,6 +184,7 @@ func (rc *ReconciliationContext) failureModeDetection() bool {

 				for _, event := range events.Items {
 					if event.Reason == "FailedScheduling" {
+						rc.ReqLogger.Info("Found FailedScheduling event for pod", "pod", pod.Name)
 						// We have a failed scheduling event
 						return true
 					}
@@ -201,6 +200,7 @@ func (rc *ReconciliationContext) failureModeDetection() bool {
 				if waitingReason == "CrashLoopBackOff" ||
 					waitingReason == "ImagePullBackOff" ||
 					waitingReason == "ErrImagePull" {
+					rc.ReqLogger.Info("Failing container state for pod", "pod", pod.Name, "reason", waitingReason)
 					// We have a container in a failing state
 					return true
 				}
@@ -208,6 +208,7 @@ func (rc *ReconciliationContext) failureModeDetection() bool {
 			if containerStatus.RestartCount > 2 {
 				if containerStatus.State.Terminated != nil {
 					if containerStatus.State.Terminated.ExitCode != 0 {
+						rc.ReqLogger.Info("Failing container state for pod", "pod", pod.Name, "exitCode", containerStatus.State.Terminated.ExitCode)
 						return true
 					}
 				}
@@ -221,12 +222,14 @@ func (rc *ReconciliationContext) failureModeDetection() bool {
 					waitingReason == "ImagePullBackOff" ||
 					waitingReason == "ErrImagePull" {
 					// We have a container in a failing state
+					rc.ReqLogger.Info("Failing initcontainer state for pod", "pod", pod.Name, "reason", waitingReason)
 					return true
 				}
 			}
 			if containerStatus.RestartCount > 2 {
 				if containerStatus.State.Terminated != nil {
 					if containerStatus.State.Terminated.ExitCode != 0 {
+						rc.ReqLogger.Info("Failing initcontainer state for pod", "pod", pod.Name, "exitCode", containerStatus.State.Terminated.ExitCode)
 						return true
 					}
 				}
@@ -377,12 +380,13 @@ func (rc *ReconciliationContext) CheckRackPodTemplate(force bool) result.Reconci
 	logger.Info("reconcile_racks::CheckRackPodTemplate")

 	for idx := range rc.desiredRackInformation {
-
 		rackName := rc.desiredRackInformation[idx].RackName
 		if force {
 			forceRacks := dc.Spec.ForceUpgradeRacks
-			if utils.IndexOfString(forceRacks, rackName) <= 0 {
-				continue
+			if len(forceRacks) > 0 {
+				if utils.IndexOfString(forceRacks, rackName) <= 0 {
+					continue
+				}
 			}
 		}

@@ -402,9 +406,9 @@ func (rc *ReconciliationContext) CheckRackPodTemplate(force bool) result.Reconci
 			updatedReplicas = status.CurrentReplicas + status.UpdatedReplicas
 		}

-		if !force && statefulSet.Generation != status.ObservedGeneration ||
+		if !force && (statefulSet.Generation != status.ObservedGeneration ||
 			status.Replicas != status.ReadyReplicas ||
-			status.Replicas != updatedReplicas {
+			status.Replicas != updatedReplicas) {

 			logger.Info(
 				"waiting for upgrade to finish on statefulset",
@@ -523,115 +527,24 @@ func (rc *ReconciliationContext) CheckRackPodTemplate(force bool) result.Reconci
 	return result.Continue()
 }

-/*
-	TODO An idea.. if the startNode phase is failing due to a Pod being unable to start (or get ready?), we could
-	make that as a state for CheckRackForceUpgrade to be allowed.
-
-	TODO Also, verify this code is close to the CheckRackPodTemplate() code or even merge those two if at all possible at this stage,
-	given that so much time has passed since the original comment.
-*/
-
 func (rc *ReconciliationContext) CheckRackForceUpgrade() result.ReconcileResult {
 	dc := rc.Datacenter
 	logger := rc.ReqLogger
-	logger.Info("starting CheckRackForceUpgrade()")
-
-	forceRacks := dc.Spec.ForceUpgradeRacks
-	if len(forceRacks) == 0 {
-		return result.Continue()
-	}
+	logger.Info("reconcile_racks::CheckRackForceUpgrade")

 	// Datacenter configuration isn't healthy, we allow upgrades here before pods start
 	if rc.failureModeDetection() {
+		logger.Info("Failure detected, forcing CheckRackPodTemplate()")
 		return rc.CheckRackPodTemplate(true)
 	}

-	return rc.CheckRackPodTemplate(true)
-}
-
-/*
-func (rc *ReconciliationContext) CheckRackForceUpgrade() result.ReconcileResult {
-	// This code is *very* similar to CheckRackPodTemplate(), but it's not an exact
-	// copy. Some 3 to 5 line parts could maybe be extracted into functions.
-	logger := rc.ReqLogger
-	dc := rc.Datacenter
-	logger.Info("starting CheckRackForceUpgrade()")
-
 	forceRacks := dc.Spec.ForceUpgradeRacks
 	if len(forceRacks) == 0 {
 		return result.Continue()
 	}

-	for idx, nextRack := range rc.desiredRackInformation {
-		rackName := rc.desiredRackInformation[idx].RackName
-		if utils.IndexOfString(forceRacks, rackName) >= 0 {
-			statefulSet := rc.statefulSets[idx]
-
-			// have to use zero here, because each statefulset is created with no replicas
-			// in GetStatefulSetForRack()
-			desiredSts, err := newStatefulSetForCassandraDatacenter(statefulSet, rackName, dc, nextRack.NodeCount)
-			if err != nil {
-				logger.Error(err, "error calling newStatefulSetForCassandraDatacenter")
-				return result.Error(err)
-			}
-
-			// Set the CassandraDatacenter as the owner and controller
-			err = setControllerReference(
-				rc.Datacenter,
-				desiredSts,
-				rc.Scheme)
-			if err != nil {
-				logger.Error(err, "error calling setControllerReference for statefulset", "desiredSts.Namespace",
-					desiredSts.Namespace, "desireSts.Name", desiredSts.Name)
-				return result.Error(err)
-			}
-
-			// "fix" the replica count, and maintain labels and annotations the k8s admin may have set
-			desiredSts.Spec.Replicas = statefulSet.Spec.Replicas
-			desiredSts.Labels = utils.MergeMap(map[string]string{}, statefulSet.Labels, desiredSts.Labels)
-			desiredSts.Annotations = utils.MergeMap(map[string]string{}, statefulSet.Annotations, desiredSts.Annotations)
-
-			desiredSts.DeepCopyInto(statefulSet)
-
-			rc.Recorder.Eventf(rc.Datacenter, corev1.EventTypeNormal, events.UpdatingRack,
-				"Force updating rack %s", rackName)
-
-			if err := rc.setConditionStatus(api.DatacenterUpdating, corev1.ConditionTrue); err != nil {
-				return result.Error(err)
-			}
-
-			if err := setOperatorProgressStatus(rc, api.ProgressUpdating); err != nil {
-				return result.Error(err)
-			}
-
-			logger.Info("Force updating statefulset pod specs",
-				"statefulSet", statefulSet,
-			)
-
-			if err := rc.Client.Update(rc.Ctx, statefulSet); err != nil {
-				if errors.IsInvalid(err) {
-					if err = rc.deleteStatefulSet(statefulSet); err != nil {
-						return result.Error(err)
-					}
-				} else {
-					return result.Error(err)
-				}
-			}
-		}
-	}
-
-	dcPatch := client.MergeFrom(dc.DeepCopy())
-	dc.Spec.ForceUpgradeRacks = nil
-
-	if err := rc.Client.Patch(rc.Ctx, dc, dcPatch); err != nil {
-		logger.Error(err, "error patching datacenter to clear force upgrade")
-		return result.Error(err)
-	}
-
-	logger.Info("done CheckRackForceUpgrade()")
-	return result.Done()
+	return rc.CheckRackPodTemplate(true)
 }
-*/

 func (rc *ReconciliationContext) deleteStatefulSet(statefulSet *appsv1.StatefulSet) error {
 	policy := metav1.DeletePropagationOrphan
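Note on how the two halves of this change fit together: a cached controller-runtime client can only serve a List with a field selector if a matching index has been registered, so the new "involvedObject.name" index in cmd/main.go is what allows the Event lookup added to failureModeDetection() to be answered from the cache. The sketch below is illustrative only and not part of the patch; the helper names registerEventIndex and listPodEvents are invented for the example, and the sketch returns the IndexField error rather than discarding it as the patch does.

// Sketch only: pairs the cache index from cmd/main.go with the Event lookup
// pattern used in failureModeDetection(). Helper names are hypothetical.
package sketch

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// registerEventIndex mirrors the main.go change: index Events by the name of the
// Pod they refer to, so the cached client can answer field-selector queries.
func registerEventIndex(ctx context.Context, mgr ctrl.Manager) error {
	return mgr.GetCache().IndexField(ctx, &corev1.Event{}, "involvedObject.name", func(obj client.Object) []string {
		event := obj.(*corev1.Event)
		if event.InvolvedObject.Kind == "Pod" {
			return []string{event.InvolvedObject.Name}
		}
		return []string{}
	})
}

// listPodEvents mirrors the lookup in failureModeDetection(): this List call is
// only satisfiable from the cache because of the index registered above.
func listPodEvents(ctx context.Context, c client.Client, namespace, podName string) (*corev1.EventList, error) {
	events := &corev1.EventList{}
	err := c.List(ctx, events, &client.ListOptions{
		Namespace:     namespace,
		FieldSelector: fields.SelectorFromSet(fields.Set{"involvedObject.name": podName}),
	})
	return events, err
}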