From 5709ee425886d36c8500c95f98a3b6da876ea8b1 Mon Sep 17 00:00:00 2001 From: Lukas Piwowarski Date: Fri, 1 Nov 2024 10:34:02 -0400 Subject: [PATCH] Update logic for workflows There was a race condition in the previous implementation of the workflows that was caused by the dependency of the workflow feature on a ConfigMap that served as counter. This commit simplifies the logic of the Reconcile loop in two ways: 1. Removes the dependency for an external workflow counter 2. Introduces NextAction function which decides what next action should be performed by the Reconcile loop based on the current state of the OpenShift cluster. The input of this function is the instance which is currently being processed and a workflowLength. Using these two arguments it then tells the Reconcile loop which actions it should perform: Wait, CreateFirstJob, CreateNextJob, EndTesting, Failure. This approach should simplify the reconcile logic and move the test-operator towards a unified Reconcile loop. --- controllers/ansibletest_controller.go | 127 +++++++-------- controllers/common.go | 224 +++++++++++++++++--------- controllers/horizontest_controller.go | 80 +++++---- controllers/tempest_controller.go | 139 ++++++++-------- controllers/tobiko_controller.go | 116 +++++++------ 5 files changed, 382 insertions(+), 304 deletions(-) diff --git a/controllers/ansibletest_controller.go b/controllers/ansibletest_controller.go index b32ea14..c31ccc5 100644 --- a/controllers/ansibletest_controller.go +++ b/controllers/ansibletest_controller.go @@ -18,6 +18,8 @@ package controllers import ( "context" + "errors" + "fmt" "strconv" "time" @@ -37,7 +39,6 @@ import ( corev1 "k8s.io/api/core/v1" k8s_errors "k8s.io/apimachinery/pkg/api/errors" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -68,9 +69,6 @@ func (r *AnsibleTestReconciler) GetLogger(ctx context.Context) logr.Logger { func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, _err error) { Log := r.GetLogger(ctx) - // How much time should we wait before calling Reconcile loop when there is a failure - requeueAfter := time.Second * 60 - // Fetch the ansible instance instance := &testv1beta1.AnsibleTest{} err := r.Client.Get(ctx, req.NamespacedName, instance) @@ -81,11 +79,6 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, err } - workflowActive := false - if len(instance.Spec.Workflow) > 0 { - workflowActive = true - } - // Create a helper helper, err := helper.NewHelper( instance, @@ -142,40 +135,62 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) } - // Ensure that there is an external counter and read its value - // We use the external counter to keep track of the workflow steps - r.WorkflowStepCounterCreate(ctx, instance, helper) - externalWorkflowCounter := r.WorkflowStepCounterRead(ctx, instance) - if externalWorkflowCounter == -1 { - return ctrl.Result{RequeueAfter: requeueAfter}, nil - } + workflowLength := len(instance.Spec.Workflow) + nextAction, nextWorkflowStep, err := r.NextAction(ctx, instance, workflowLength) - // Each job that is being executed by the test operator has - currentWorkflowStep := 0 - runningAnsibleJob := &batchv1.Job{} - runningJobName := r.GetJobName(instance, externalWorkflowCounter-1) - err = r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: runningJobName}, runningAnsibleJob) - if err != nil && !k8s_errors.IsNotFound(err) { + switch nextAction { + case Failure: return ctrl.Result{}, err - } else if err == nil { - currentWorkflowStep, _ = strconv.Atoi(runningAnsibleJob.Labels["workflowStep"]) - } - if r.CompletedJobExists(ctx, instance, currentWorkflowStep) { - // The job created by the instance was completed. Release the lock - // so that other instances can spawn a job. - instance.Status.Conditions.MarkTrue(condition.DeploymentReadyCondition, condition.DeploymentReadyMessage) - Log.Info("Job completed") + case Wait: + Log.Info(InfoWaitingOnJob) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, nil + + case EndTesting: + // All jobs created by the instance were completed. Release the lock + // so that other instances can spawn their jobs. if lockReleased, err := r.ReleaseLock(ctx, instance); !lockReleased { - return ctrl.Result{}, err + Log.Info(fmt.Sprintf(InfoCanNotReleaseLock, testOperatorLockName)) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err } + + instance.Status.Conditions.MarkTrue( + condition.DeploymentReadyCondition, + condition.DeploymentReadyMessage) + + Log.Info(InfoTestingCompleted) + return ctrl.Result{}, nil + + case CreateFirstJob: + lockAcquired, err := r.AcquireLock(ctx, instance, helper, false) + if !lockAcquired { + Log.Info(fmt.Sprintf(InfoCanNotAcquireLock, testOperatorLockName)) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err + } + + Log.Info(fmt.Sprintf(InfoCreatingFirstPod, nextWorkflowStep)) + + case CreateNextJob: + // Confirm that we still hold the lock. This is useful to check if for + // example somebody / something deleted the lock and it got claimed by + // another instance. This is considered to be an error state. + lockAcquired, err := r.AcquireLock(ctx, instance, helper, false) + if !lockAcquired { + Log.Error(err, ErrConfirmLockOwnership, testOperatorLockName) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err + } + + Log.Info(fmt.Sprintf(InfoCreatingNextPod, nextWorkflowStep)) + + default: + return ctrl.Result{}, errors.New(ErrReceivedUnexpectedAction) } serviceLabels := map[string]string{ common.AppSelector: ansibletest.ServiceName, - "workflowStep": strconv.Itoa(externalWorkflowCounter), - "instanceName": instance.Name, - "operator": "test-operator", + workflowStepLabel: strconv.Itoa(nextWorkflowStep), + instanceNameLabel: instance.Name, + operatorNameLabel: "test-operator", } // Create PersistentVolumeClaim @@ -194,51 +209,30 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) } // Create PersistentVolumeClaim - end - // If the current job is executing the last workflow step -> do not create another job - if workflowActive && externalWorkflowCounter >= len(instance.Spec.Workflow) { - return ctrl.Result{}, nil - } else if !workflowActive && r.JobExists(ctx, instance, currentWorkflowStep) { - return ctrl.Result{}, nil - } - - // We are about to start job that spawns the pod with tests. - // This lock ensures that there is always only one pod running. - lockAcquired, err := r.AcquireLock(ctx, instance, helper, false) - if !lockAcquired { - Log.Info("Can not acquire lock") - requeueAfter := time.Second * 60 - return ctrl.Result{RequeueAfter: requeueAfter}, err - } - Log.Info("Lock acquired") - - if workflowActive { - r.WorkflowStepCounterIncrease(ctx, instance, helper) - } - instance.Status.Conditions.MarkTrue(condition.ServiceConfigReadyCondition, condition.ServiceConfigReadyMessage) // Create a new job mountCerts := r.CheckSecretExists(ctx, instance, "combined-ca-bundle") - jobName := r.GetJobName(instance, externalWorkflowCounter) - envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, externalWorkflowCounter) + jobName := r.GetJobName(instance, nextWorkflowStep) + envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, nextWorkflowStep) logsPVCName := r.GetPVCLogsName(instance, 0) containerImage, err := r.GetContainerImage(ctx, workflowOverrideParams["ContainerImage"], instance) - privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", externalWorkflowCounter).(bool) + privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", nextWorkflowStep).(bool) if err != nil { return ctrl.Result{}, err } - if externalWorkflowCounter < len(instance.Spec.Workflow) { - if instance.Spec.Workflow[externalWorkflowCounter].NodeSelector != nil { - instance.Spec.NodeSelector = *instance.Spec.Workflow[externalWorkflowCounter].NodeSelector + if nextWorkflowStep < len(instance.Spec.Workflow) { + if instance.Spec.Workflow[nextWorkflowStep].NodeSelector != nil { + instance.Spec.NodeSelector = *instance.Spec.Workflow[nextWorkflowStep].NodeSelector } - if instance.Spec.Workflow[externalWorkflowCounter].Tolerations != nil { - instance.Spec.Tolerations = *instance.Spec.Workflow[externalWorkflowCounter].Tolerations + if instance.Spec.Workflow[nextWorkflowStep].Tolerations != nil { + instance.Spec.Tolerations = *instance.Spec.Workflow[nextWorkflowStep].Tolerations } - if instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel != nil { - instance.Spec.SELinuxLevel = *instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel + if instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel != nil { + instance.Spec.SELinuxLevel = *instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel } } @@ -260,7 +254,7 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) mountCerts, envVars, workflowOverrideParams, - externalWorkflowCounter, + nextWorkflowStep, containerImage, privileged, ) @@ -297,7 +291,6 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrlResult, nil } // Create a new job - end - Log.Info("Reconciled Service successfully") return ctrl.Result{}, nil } diff --git a/controllers/common.go b/controllers/common.go index a194793..9d41070 100644 --- a/controllers/common.go +++ b/controllers/common.go @@ -37,13 +37,33 @@ const ( customDataConfigMapinfix = "-custom-data-step-" workflowStepNumInvalid = -1 workflowStepNameInvalid = "no-step-name" + workflowStepLabel = "workflowStep" + instanceNameLabel = "instanceName" + operatorNameLabel = "operator" testOperatorLockName = "test-operator-lock" testOperatorLockOnwerField = "owner" ) const ( - ErrNetworkAttachments = "not all pods have interfaces with ips as configured in NetworkAttachments: %s" + ErrNetworkAttachments = "not all pods have interfaces with ips as configured in NetworkAttachments: %s" + ErrReceivedUnexpectedAction = "unexpected action received" + ErrConfirmLockOwnership = "can not confirm ownership of %s lock" +) + +const ( + InfoWaitingOnJob = "Waiting on either job to finish or release of the lock." + InfoTestingCompleted = "Testing completed. All pods spawned by the test-operator finished." + InfoCreatingFirstPod = "Creating first test pod (workflow step %d)." + InfoCreatingNextPod = "Creating next test pod (workflow step %d)." + InfoCanNotAcquireLock = "Can not acquire %s lock." + InfoCanNotReleaseLock = "Can not release %s lock." +) + +const ( + // RequeueAfterValue tells how much time should we wait before calling Reconcile + // loop again. + RequeueAfterValue = time.Second * 60 ) type Reconciler struct { @@ -53,6 +73,126 @@ type Reconciler struct { Scheme *runtime.Scheme } +// NextAction holds an action that should be performed by the Reconcile loop. +type NextAction int + +const ( + // Wait indicates that we should wait for the state of the OpenShift cluster + // to change + Wait = iota + + // CreateFirstJob indicates that the Reconcile loop should create the first job + // either specified in the .Spec section or in the .Spec.Workflow section. + CreateFirstJob + + // CreateNextJob indicates that the Reconcile loop should create a next job + // specified in the .Spec.Workflow section (if .Spec.Workflow is defined) + CreateNextJob + + // EndTesting indicates that all jobs have already finished. The Reconcile + // loop should end the testing and release resources that are required to + // be release (e.g., global lock) + EndTesting + + // Failure indicates that an unexpected error was encountered + Failure +) + +// NextAction indicates what action needs to be performed by the Reconcile loop +// based on the current state of the OpenShift cluster. +func (r *Reconciler) NextAction( + ctx context.Context, + instance client.Object, + workflowLength int, +) (NextAction, int, error) { + // Get the latest job. The latest job is job with the highest value stored + // in workflowStep label + workflowStepIdx := 0 + lastJob, err := r.GetLastJob(ctx, instance) + if err != nil { + return Failure, workflowStepIdx, err + } + + // If there is a job associated with the current instance. + if lastJob != nil { + workflowStepIdx, err := strconv.Atoi(lastJob.Labels[workflowStepLabel]) + if err != nil { + return Failure, workflowStepIdx, err + } + + // If the last job is not in Failed or Succeded state -> Wait + lastJobFinished := (lastJob.Status.Failed + lastJob.Status.Succeeded) > 0 + if !lastJobFinished { + return Wait, workflowStepIdx, nil + } + + // If the last job is in Failed or Succeeded state and it is NOT the last + // job which was supposed to be created -> CreateNextJob + if lastJobFinished && !isLastJobIndex(workflowStepIdx, workflowLength) { + workflowStepIdx++ + return CreateNextJob, workflowStepIdx, nil + } + + // Otherwise if the job is in Failed or Succeded stated and it IS the + // last job -> EndTesting + if lastJobFinished && isLastJobIndex(workflowStepIdx, workflowLength) { + return EndTesting, workflowStepIdx, nil + } + } + + // If there is not any job associated with the instance -> createFirstJob + if lastJob == nil { + return CreateFirstJob, workflowStepIdx, nil + } + + return Failure, workflowStepIdx, nil +} + +// isLastJobIndex returns true when jobIndex is the index of the last job that +// should be executed. Otherwise the return value is false. +func isLastJobIndex(jobIndex int, workflowLength int) bool { + switch workflowLength { + case 0: + return jobIndex == workflowLength + default: + return jobIndex == (workflowLength - 1) + } +} + +// GetLastJob returns job associated with an instance which has the highest value +// stored in the workflowStep label +func (r *Reconciler) GetLastJob( + ctx context.Context, + instance client.Object, +) (*batchv1.Job, error) { + labels := map[string]string{instanceNameLabel: instance.GetName()} + namespaceListOpt := client.InNamespace(instance.GetNamespace()) + labelsListOpt := client.MatchingLabels(labels) + jobList := &batchv1.JobList{} + err := r.Client.List(ctx, jobList, namespaceListOpt, labelsListOpt) + if err != nil { + return nil, err + } + + var maxJob *batchv1.Job + maxJobWorkflowStep := 0 + + for _, job := range jobList.Items { + workflowStep, err := strconv.Atoi(job.Labels[workflowStepLabel]) + if err != nil { + return &batchv1.Job{}, err + } + + if workflowStep >= maxJobWorkflowStep { + maxJobWorkflowStep = workflowStep + newMaxJob := job + maxJob = &newMaxJob + } + } + + return maxJob, nil +} + func GetEnvVarsConfigMapName(instance interface{}, workflowStepNum int) string { if _, ok := instance.(*v1beta1.Tobiko); ok { return "not-implemented" @@ -351,10 +491,11 @@ func (r *Reconciler) AcquireLock( return true, nil } - _, err := r.GetLockInfo(ctx, instance) + instanceGUID := string(instance.GetUID()) + cm, err := r.GetLockInfo(ctx, instance) if err != nil && k8s_errors.IsNotFound(err) { cm := map[string]string{ - testOperatorLockOnwerField: string(instance.GetUID()), + testOperatorLockOnwerField: instanceGUID, } cms := []util.Template{ @@ -369,6 +510,10 @@ func (r *Reconciler) AcquireLock( return err == nil, err } + if cm.Data[testOperatorLockOnwerField] == instanceGUID { + return true, nil + } + return false, err } @@ -377,7 +522,7 @@ func (r *Reconciler) ReleaseLock(ctx context.Context, instance client.Object) (b cm, err := r.GetLockInfo(ctx, instance) if err != nil && k8s_errors.IsNotFound(err) { - return false, nil + return true, nil } else if err != nil { return false, err } @@ -408,77 +553,6 @@ func (r *Reconciler) ReleaseLock(ctx context.Context, instance client.Object) (b return false, errors.New("failed to delete test-operator-lock") } -func (r *Reconciler) WorkflowStepCounterCreate(ctx context.Context, instance client.Object, h *helper.Helper) bool { - cm := &corev1.ConfigMap{} - err := r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: r.GetWorkflowConfigMapName(instance)}, cm) - if err == nil { - return true - } - - counterData := make(map[string]string) - counterData["counter"] = "0" - - cms := []util.Template{ - { - Name: r.GetWorkflowConfigMapName(instance), - Namespace: instance.GetNamespace(), - CustomData: counterData, - }, - } - - err = configmap.EnsureConfigMaps(ctx, h, instance, cms, nil) - return err == nil -} - -func (r *Reconciler) WorkflowStepCounterIncrease(ctx context.Context, instance client.Object, h *helper.Helper) bool { - cm := &corev1.ConfigMap{} - err := r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: r.GetWorkflowConfigMapName(instance)}, cm) - if err != nil { - return false - } - - counterValue, _ := strconv.Atoi(cm.Data["counter"]) - newCounterValue := strconv.Itoa(counterValue + 1) - cm.Data["counter"] = newCounterValue - - cms := []util.Template{ - { - Name: r.GetWorkflowConfigMapName(instance), - Namespace: instance.GetNamespace(), - CustomData: cm.Data, - }, - } - - err = configmap.EnsureConfigMaps(ctx, h, instance, cms, nil) - return err == nil -} - -func (r *Reconciler) WorkflowStepCounterRead(ctx context.Context, instance client.Object) int { - cm := &corev1.ConfigMap{} - err := r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: r.GetWorkflowConfigMapName(instance)}, cm) - if err != nil { - return workflowStepNumInvalid - } - - counter, _ := strconv.Atoi(cm.Data["counter"]) - return counter -} - -func (r *Reconciler) CompletedJobExists(ctx context.Context, instance client.Object, workflowStepNum int) bool { - job := &batchv1.Job{} - err := r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: r.GetJobName(instance, workflowStepNum)}, job) - - if err != nil { - return false - } - - if job.Status.Succeeded > 0 || job.Status.Failed > 0 { - return true - } - - return false -} - func (r *Reconciler) JobExists(ctx context.Context, instance client.Object, workflowStepNum int) bool { job := &batchv1.Job{} jobName := r.GetJobName(instance, workflowStepNum) diff --git a/controllers/horizontest_controller.go b/controllers/horizontest_controller.go index a120477..62b8e02 100644 --- a/controllers/horizontest_controller.go +++ b/controllers/horizontest_controller.go @@ -18,6 +18,8 @@ package controllers import ( "context" + "errors" + "fmt" "time" "github.com/go-logr/logr" @@ -63,10 +65,6 @@ func (r *HorizonTestReconciler) GetLogger(ctx context.Context) logr.Logger { // Reconcile - HorizonTest func (r *HorizonTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, _err error) { Log := r.GetLogger(ctx) - - // How much time should we wait before calling Reconcile loop when there is a failure - requeueAfter := time.Second * 60 - instance := &testv1beta1.HorizonTest{} err := r.Client.Get(ctx, req.NamespacedName, instance) if err != nil { @@ -130,25 +128,66 @@ func (r *HorizonTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) } - if r.CompletedJobExists(ctx, instance, 0) { - // The job created by the instance was completed. Release the lock - // so that other instances can spawn a job. - instance.Status.Conditions.MarkTrue(condition.DeploymentReadyCondition, condition.DeploymentReadyMessage) - Log.Info("Job completed") + workflowLength := 0 + nextAction, nextWorkflowStep, err := r.NextAction(ctx, instance, workflowLength) + + switch nextAction { + case Failure: + return ctrl.Result{}, err + + case Wait: + Log.Info(InfoWaitingOnJob) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, nil + + case EndTesting: + // All jobs created by the instance were completed. Release the lock + // so that other instances can spawn their jobs. if lockReleased, err := r.ReleaseLock(ctx, instance); !lockReleased { - return ctrl.Result{}, err + Log.Info(fmt.Sprintf(InfoCanNotReleaseLock, testOperatorLockName)) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err + } + + instance.Status.Conditions.MarkTrue( + condition.DeploymentReadyCondition, + condition.DeploymentReadyMessage) + + Log.Info(InfoTestingCompleted) + return ctrl.Result{}, nil + + case CreateFirstJob: + lockAcquired, err := r.AcquireLock(ctx, instance, helper, instance.Spec.Parallel) + if !lockAcquired { + Log.Info(fmt.Sprintf(InfoCanNotAcquireLock, testOperatorLockName)) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err } + + Log.Info(fmt.Sprintf(InfoCreatingFirstPod, nextWorkflowStep)) + + case CreateNextJob: + // Confirm that we still hold the lock. This is useful to check if for + // example somebody / something deleted the lock and it got claimed by + // another instance. This is considered to be an error state. + lockAcquired, err := r.AcquireLock(ctx, instance, helper, instance.Spec.Parallel) + if !lockAcquired { + Log.Error(err, ErrConfirmLockOwnership, testOperatorLockName) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err + } + + Log.Info(fmt.Sprintf(InfoCreatingNextPod, nextWorkflowStep)) + + default: + return ctrl.Result{}, errors.New(ErrReceivedUnexpectedAction) } serviceLabels := map[string]string{ common.AppSelector: horizontest.ServiceName, - "instanceName": instance.Name, - "operator": "test-operator", + instanceNameLabel: instance.Name, + operatorNameLabel: "test-operator", // NOTE(lpiwowar): This is a workaround since the Horizontest CR does not support // workflows. However, the label might be required by automation that // consumes the test-operator (e.g., ci-framework). - "workflowStep": "0", + workflowStepLabel: "0", } yamlResult, err := EnsureCloudsConfigMapExists(ctx, instance, helper, serviceLabels) @@ -183,20 +222,6 @@ func (r *HorizonTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) mountKubeconfig = true } - // If the current job is executing the last workflow step -> do not create another job - if r.JobExists(ctx, instance, 0) { - return ctrl.Result{}, nil - } - - // We are about to start job that spawns the pod with tests. - // This lock ensures that there is always only one pod running. - lockAcquired, err := r.AcquireLock(ctx, instance, helper, instance.Spec.Parallel) - if !lockAcquired { - Log.Info("Cannot acquire lock") - return ctrl.Result{RequeueAfter: requeueAfter}, err - } - Log.Info("Lock acquired") - // Prepare HorizonTest env vars envVars := r.PrepareHorizonTestEnvVars(instance) jobName := r.GetJobName(instance, 0) @@ -253,7 +278,6 @@ func (r *HorizonTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrlResult, nil } // create Job - end - Log.Info("Reconciled Service successfully") return ctrl.Result{}, nil } diff --git a/controllers/tempest_controller.go b/controllers/tempest_controller.go index 9551db6..d82ff24 100644 --- a/controllers/tempest_controller.go +++ b/controllers/tempest_controller.go @@ -18,6 +18,7 @@ package controllers import ( "context" + "errors" "fmt" "strconv" "time" @@ -39,7 +40,6 @@ import ( corev1 "k8s.io/api/core/v1" k8s_errors "k8s.io/apimachinery/pkg/api/errors" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -71,9 +71,6 @@ func (r *TempestReconciler) GetLogger(ctx context.Context) logr.Logger { func (r *TempestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, _err error) { Log := r.GetLogger(ctx) - // How much time should we wait before calling Reconcile loop when there is a failure - requeueAfter := time.Second * 60 - // Fetch the Tempest instance instance := &testv1beta1.Tempest{} err := r.Client.Get(ctx, req.NamespacedName, instance) @@ -84,11 +81,6 @@ func (r *TempestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (re return ctrl.Result{}, err } - workflowActive := false - if len(instance.Spec.Workflow) > 0 { - workflowActive = true - } - // Create a helper helper, err := helper.NewHelper( instance, @@ -160,45 +152,68 @@ func (r *TempestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (re return r.reconcileDelete(ctx, instance, helper) } - // Ensure that there is an external counter and read its value - // We use the external counter to keep track of the workflow steps - r.WorkflowStepCounterCreate(ctx, instance, helper) - externalWorkflowCounter := r.WorkflowStepCounterRead(ctx, instance) - if externalWorkflowCounter == -1 { - return ctrl.Result{RequeueAfter: requeueAfter}, nil - } + workflowLength := len(instance.Spec.Workflow) + nextAction, nextWorkflowStep, err := r.NextAction(ctx, instance, workflowLength) - // Each job that is being executed by the test operator has - currentWorkflowStep := 0 - runningTobikoJob := &batchv1.Job{} - runningJobName := r.GetJobName(instance, externalWorkflowCounter-1) - err = r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: runningJobName}, runningTobikoJob) - if err == nil { - currentWorkflowStep, _ = strconv.Atoi(runningTobikoJob.Labels["workflowStep"]) - } + switch nextAction { + case Failure: + return ctrl.Result{}, err + + case Wait: + Log.Info(InfoWaitingOnJob) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, nil - if r.CompletedJobExists(ctx, instance, currentWorkflowStep) { - // The job created by the instance was completed. Release the lock - // so that other instances can spawn a job. - instance.Status.Conditions.MarkTrue(condition.DeploymentReadyCondition, condition.DeploymentReadyMessage) - Log.Info("Job completed") + case EndTesting: + // All jobs created by the instance were completed. Release the lock + // so that other instances can spawn their jobs. if lockReleased, err := r.ReleaseLock(ctx, instance); !lockReleased { - return ctrl.Result{}, err + Log.Info(fmt.Sprintf(InfoCanNotReleaseLock, testOperatorLockName)) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err + } + + instance.Status.Conditions.MarkTrue( + condition.DeploymentReadyCondition, + condition.DeploymentReadyMessage) + + Log.Info(InfoTestingCompleted) + return ctrl.Result{}, nil + + case CreateFirstJob: + lockAcquired, err := r.AcquireLock(ctx, instance, helper, instance.Spec.Parallel) + if !lockAcquired { + Log.Info(fmt.Sprintf(InfoCanNotAcquireLock, testOperatorLockName)) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err } + + Log.Info(fmt.Sprintf(InfoCreatingFirstPod, nextWorkflowStep)) + + case CreateNextJob: + // Confirm that we still hold the lock. This is useful to check if for + // example somebody / something deleted the lock and it got claimed by + // another instance. This is considered to be an error state. + lockAcquired, err := r.AcquireLock(ctx, instance, helper, instance.Spec.Parallel) + if !lockAcquired { + Log.Error(err, ErrConfirmLockOwnership, testOperatorLockName) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err + } + + Log.Info(fmt.Sprintf(InfoCreatingNextPod, nextWorkflowStep)) + + default: + return ctrl.Result{}, errors.New(ErrReceivedUnexpectedAction) } serviceLabels := map[string]string{ common.AppSelector: tempest.ServiceName, - "workflowStep": strconv.Itoa(externalWorkflowCounter), - "instanceName": instance.Name, - "operator": "test-operator", + workflowStepLabel: strconv.Itoa(nextWorkflowStep), + instanceNameLabel: instance.Name, + operatorNameLabel: "test-operator", } workflowStepNum := 0 - // Create multiple PVCs for parallel execution - if instance.Spec.Parallel && externalWorkflowCounter < len(instance.Spec.Workflow) { - workflowStepNum = externalWorkflowCounter + if instance.Spec.Parallel && nextWorkflowStep < len(instance.Spec.Workflow) { + workflowStepNum = nextWorkflowStep } // Create PersistentVolumeClaim @@ -223,29 +238,8 @@ func (r *TempestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (re mountSSHKey = r.CheckSecretExists(ctx, instance, instance.Spec.SSHKeySecretName) } - // If the current job is executing the last workflow step -> do not create another job - if workflowActive && externalWorkflowCounter >= len(instance.Spec.Workflow) { - return ctrl.Result{}, nil - } else if !workflowActive && r.JobExists(ctx, instance, currentWorkflowStep) { - return ctrl.Result{}, nil - } - - // We are about to start job that spawns the pod with tests. - // This lock ensures that there is always only one pod running. - lockAcquired, err := r.AcquireLock(ctx, instance, helper, instance.Spec.Parallel) - if !lockAcquired { - Log.Info("Can not acquire lock") - requeueAfter := time.Second * 60 - return ctrl.Result{RequeueAfter: requeueAfter}, err - } - Log.Info("Lock acquired") - - if workflowActive { - r.WorkflowStepCounterIncrease(ctx, instance, helper) - } - // Generate ConfigMaps - err = r.generateServiceConfigMaps(ctx, helper, instance, externalWorkflowCounter) + err = r.generateServiceConfigMaps(ctx, helper, instance, nextWorkflowStep) if err != nil { instance.Status.Conditions.Set(condition.FalseCondition( condition.ServiceConfigReadyCondition, @@ -293,7 +287,7 @@ func (r *TempestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (re } // NetworkAttachments - if r.JobExists(ctx, instance, externalWorkflowCounter) { + if r.JobExists(ctx, instance, nextWorkflowStep) { networkReady, networkAttachmentStatus, err := nad.VerifyNetworkStatusFromAnnotation( ctx, helper, @@ -327,9 +321,9 @@ func (r *TempestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (re // Create a new job mountCerts := r.CheckSecretExists(ctx, instance, "combined-ca-bundle") - customDataConfigMapName := GetCustomDataConfigMapName(instance, externalWorkflowCounter) - EnvVarsConfigMapName := GetEnvVarsConfigMapName(instance, externalWorkflowCounter) - jobName := r.GetJobName(instance, externalWorkflowCounter) + customDataConfigMapName := GetCustomDataConfigMapName(instance, nextWorkflowStep) + EnvVarsConfigMapName := GetEnvVarsConfigMapName(instance, nextWorkflowStep) + jobName := r.GetJobName(instance, nextWorkflowStep) logsPVCName := r.GetPVCLogsName(instance, workflowStepNum) containerImage, err := r.GetContainerImage(ctx, instance.Spec.ContainerImage, instance) if err != nil { @@ -348,17 +342,17 @@ func (r *TempestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (re // Note(lpiwowar): Remove all the workflow merge code to webhook once it is done. // It will simplify the logic and duplicite code (Tempest vs Tobiko) - if externalWorkflowCounter < len(instance.Spec.Workflow) { - if instance.Spec.Workflow[externalWorkflowCounter].NodeSelector != nil { - instance.Spec.NodeSelector = *instance.Spec.Workflow[externalWorkflowCounter].NodeSelector + if nextWorkflowStep < len(instance.Spec.Workflow) { + if instance.Spec.Workflow[nextWorkflowStep].NodeSelector != nil { + instance.Spec.NodeSelector = *instance.Spec.Workflow[nextWorkflowStep].NodeSelector } - if instance.Spec.Workflow[externalWorkflowCounter].Tolerations != nil { - instance.Spec.Tolerations = *instance.Spec.Workflow[externalWorkflowCounter].Tolerations + if instance.Spec.Workflow[nextWorkflowStep].Tolerations != nil { + instance.Spec.Tolerations = *instance.Spec.Workflow[nextWorkflowStep].Tolerations } - if instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel != nil { - instance.Spec.SELinuxLevel = *instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel + if instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel != nil { + instance.Spec.SELinuxLevel = *instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel } } @@ -388,7 +382,7 @@ func (r *TempestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (re // Release the lock and allow other controllers to spawn // a job. if lockReleased, lockErr := r.ReleaseLock(ctx, instance); lockReleased { - return ctrl.Result{}, lockErr + return ctrl.Result{RequeueAfter: RequeueAfterValue}, lockErr } instance.Status.Conditions.Set(condition.FalseCondition( @@ -408,7 +402,6 @@ func (r *TempestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (re } // Create a new job - end - Log.Info("Reconciled Service successfully") return ctrl.Result{}, nil } @@ -650,8 +643,8 @@ func (r *TempestReconciler) generateServiceConfigMaps( cmLabels := labels.GetLabels(instance, labels.GetGroupLabel(tempest.ServiceName), map[string]string{}) operatorLabels := map[string]string{ - "operator": "test-operator", - "instanceName": instance.Name, + operatorNameLabel: "test-operator", + instanceNameLabel: instance.Name, } // Combine labels diff --git a/controllers/tobiko_controller.go b/controllers/tobiko_controller.go index 36e0058..ee60298 100644 --- a/controllers/tobiko_controller.go +++ b/controllers/tobiko_controller.go @@ -18,6 +18,7 @@ package controllers import ( "context" + "errors" "fmt" "strconv" "time" @@ -39,7 +40,6 @@ import ( corev1 "k8s.io/api/core/v1" k8s_errors "k8s.io/apimachinery/pkg/api/errors" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -71,9 +71,6 @@ func (r *TobikoReconciler) GetLogger(ctx context.Context) logr.Logger { func (r *TobikoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, _err error) { Log := r.GetLogger(ctx) - // How much time should we wait before calling Reconcile loop when there is a failure - requeueAfter := time.Second * 60 - instance := &testv1beta1.Tobiko{} err := r.Client.Get(ctx, req.NamespacedName, instance) if err != nil { @@ -83,12 +80,6 @@ func (r *TobikoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res return ctrl.Result{}, err } - // Check whether the user wants to execute workflow - workflowActive := false - if len(instance.Spec.Workflow) > 0 { - workflowActive = true - } - helper, err := helper.NewHelper( instance, r.Client, @@ -148,38 +139,62 @@ func (r *TobikoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res instance.Status.NetworkAttachments = map[string][]string{} } - // Ensure that there is an external counter and read its value - // We use the external counter to keep track of the workflow steps - r.WorkflowStepCounterCreate(ctx, instance, helper) - externalWorkflowCounter := r.WorkflowStepCounterRead(ctx, instance) - if externalWorkflowCounter == -1 { - return ctrl.Result{RequeueAfter: requeueAfter}, nil - } + workflowLength := len(instance.Spec.Workflow) + nextAction, nextWorkflowStep, err := r.NextAction(ctx, instance, workflowLength) - // Each job that is being executed by the test operator has - currentWorkflowStep := 0 - runningTobikoJob := &batchv1.Job{} - runningJobName := r.GetJobName(instance, externalWorkflowCounter-1) - err = r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: runningJobName}, runningTobikoJob) - if err == nil { - currentWorkflowStep, _ = strconv.Atoi(runningTobikoJob.Labels["workflowStep"]) - } + switch nextAction { + case Failure: + return ctrl.Result{}, err + + case Wait: + Log.Info(InfoWaitingOnJob) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, nil - if r.CompletedJobExists(ctx, instance, currentWorkflowStep) { - instance.Status.Conditions.MarkTrue(condition.DeploymentReadyCondition, condition.DeploymentReadyMessage) - // The job created by the instance was completed. Release the lock - // so that other instances can spawn a job. - Log.Info("Job completed") + case EndTesting: + // All jobs created by the instance were completed. Release the lock + // so that other instances can spawn their jobs. if lockReleased, err := r.ReleaseLock(ctx, instance); !lockReleased { - return ctrl.Result{}, err + Log.Info(fmt.Sprintf(InfoCanNotReleaseLock, testOperatorLockName)) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err + } + + instance.Status.Conditions.MarkTrue( + condition.DeploymentReadyCondition, + condition.DeploymentReadyMessage) + + Log.Info(InfoTestingCompleted) + return ctrl.Result{}, nil + + case CreateFirstJob: + lockAcquired, err := r.AcquireLock(ctx, instance, helper, instance.Spec.Parallel) + if !lockAcquired { + Log.Info(fmt.Sprintf(InfoCanNotAcquireLock, testOperatorLockName)) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err } + + Log.Info(fmt.Sprintf(InfoCreatingFirstPod, nextWorkflowStep)) + + case CreateNextJob: + // Confirm that we still hold the lock. This needs to be checked in order + // to prevent situation when somebody / something deleted the lock and it + // got claimedy by another instance. + lockAcquired, err := r.AcquireLock(ctx, instance, helper, instance.Spec.Parallel) + if !lockAcquired { + Log.Error(err, ErrConfirmLockOwnership, testOperatorLockName) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err + } + + Log.Info(fmt.Sprintf(InfoCreatingNextPod, nextWorkflowStep)) + + default: + return ctrl.Result{}, errors.New(ErrReceivedUnexpectedAction) } serviceLabels := map[string]string{ common.AppSelector: tobiko.ServiceName, - "workflowStep": strconv.Itoa(externalWorkflowCounter), - "instanceName": instance.Name, - "operator": "test-operator", + workflowStepLabel: strconv.Itoa(nextWorkflowStep), + instanceNameLabel: instance.Name, + operatorNameLabel: "test-operator", } yamlResult, err := EnsureCloudsConfigMapExists(ctx, instance, helper, serviceLabels) @@ -191,8 +206,8 @@ func (r *TobikoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res workflowStepNum := 0 // Create multiple PVCs for parallel execution - if instance.Spec.Parallel && externalWorkflowCounter < len(instance.Spec.Workflow) { - workflowStepNum = externalWorkflowCounter + if instance.Spec.Parallel && nextWorkflowStep < len(instance.Spec.Workflow) { + workflowStepNum = nextWorkflowStep } // Create PersistentVolumeClaim @@ -246,7 +261,7 @@ func (r *TobikoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res } // NetworkAttachments - if r.JobExists(ctx, instance, externalWorkflowCounter) { + if r.JobExists(ctx, instance, nextWorkflowStep) { networkReady, networkAttachmentStatus, err := nad.VerifyNetworkStatusFromAnnotation( ctx, helper, @@ -293,32 +308,12 @@ func (r *TobikoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res mountKubeconfig = true } - // If the current job is executing the last workflow step -> do not create another job - if workflowActive && externalWorkflowCounter >= len(instance.Spec.Workflow) { - return ctrl.Result{}, nil - } else if !workflowActive && r.JobExists(ctx, instance, currentWorkflowStep) { - return ctrl.Result{}, nil - } - - // We are about to start job that spawns the pod with tests. - // This lock ensures that there is always only one pod running. - lockAcquired, err := r.AcquireLock(ctx, instance, helper, instance.Spec.Parallel) - if !lockAcquired { - Log.Info("Can not acquire lock") - return ctrl.Result{RequeueAfter: requeueAfter}, err - } - Log.Info("Lock acquired") - - if workflowActive { - r.WorkflowStepCounterIncrease(ctx, instance, helper) - } - // Prepare Tobiko env vars - envVars := r.PrepareTobikoEnvVars(ctx, serviceLabels, instance, helper, externalWorkflowCounter) - jobName := r.GetJobName(instance, externalWorkflowCounter) + envVars := r.PrepareTobikoEnvVars(ctx, serviceLabels, instance, helper, nextWorkflowStep) + jobName := r.GetJobName(instance, nextWorkflowStep) logsPVCName := r.GetPVCLogsName(instance, workflowStepNum) containerImage, err := r.GetContainerImage(ctx, instance.Spec.ContainerImage, instance) - privileged := r.OverwriteValueWithWorkflow(instance.Spec, "Privileged", "pbool", externalWorkflowCounter).(bool) + privileged := r.OverwriteValueWithWorkflow(instance.Spec, "Privileged", "pbool", nextWorkflowStep).(bool) if err != nil { return ctrl.Result{}, err } @@ -372,7 +367,6 @@ func (r *TobikoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res return ctrlResult, nil } // create Job - end - Log.Info("Reconciled Service successfully") return ctrl.Result{}, nil }