diff --git a/controllers/ansibletest_controller.go b/controllers/ansibletest_controller.go index b32ea14..c31ccc5 100644 --- a/controllers/ansibletest_controller.go +++ b/controllers/ansibletest_controller.go @@ -18,6 +18,8 @@ package controllers import ( "context" + "errors" + "fmt" "strconv" "time" @@ -37,7 +39,6 @@ import ( corev1 "k8s.io/api/core/v1" k8s_errors "k8s.io/apimachinery/pkg/api/errors" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -68,9 +69,6 @@ func (r *AnsibleTestReconciler) GetLogger(ctx context.Context) logr.Logger { func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, _err error) { Log := r.GetLogger(ctx) - // How much time should we wait before calling Reconcile loop when there is a failure - requeueAfter := time.Second * 60 - // Fetch the ansible instance instance := &testv1beta1.AnsibleTest{} err := r.Client.Get(ctx, req.NamespacedName, instance) @@ -81,11 +79,6 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, err } - workflowActive := false - if len(instance.Spec.Workflow) > 0 { - workflowActive = true - } - // Create a helper helper, err := helper.NewHelper( instance, @@ -142,40 +135,62 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) } - // Ensure that there is an external counter and read its value - // We use the external counter to keep track of the workflow steps - r.WorkflowStepCounterCreate(ctx, instance, helper) - externalWorkflowCounter := r.WorkflowStepCounterRead(ctx, instance) - if externalWorkflowCounter == -1 { - return ctrl.Result{RequeueAfter: requeueAfter}, nil - } + workflowLength := len(instance.Spec.Workflow) + nextAction, nextWorkflowStep, err := r.NextAction(ctx, instance, workflowLength) - // Each job that is being executed by the test operator has - currentWorkflowStep := 0 - runningAnsibleJob 
:= &batchv1.Job{} - runningJobName := r.GetJobName(instance, externalWorkflowCounter-1) - err = r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: runningJobName}, runningAnsibleJob) - if err != nil && !k8s_errors.IsNotFound(err) { + switch nextAction { + case Failure: return ctrl.Result{}, err - } else if err == nil { - currentWorkflowStep, _ = strconv.Atoi(runningAnsibleJob.Labels["workflowStep"]) - } - if r.CompletedJobExists(ctx, instance, currentWorkflowStep) { - // The job created by the instance was completed. Release the lock - // so that other instances can spawn a job. - instance.Status.Conditions.MarkTrue(condition.DeploymentReadyCondition, condition.DeploymentReadyMessage) - Log.Info("Job completed") + case Wait: + Log.Info(InfoWaitingOnJob) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, nil + + case EndTesting: + // All jobs created by the instance were completed. Release the lock + // so that other instances can spawn their jobs. if lockReleased, err := r.ReleaseLock(ctx, instance); !lockReleased { - return ctrl.Result{}, err + Log.Info(fmt.Sprintf(InfoCanNotReleaseLock, testOperatorLockName)) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err } + + instance.Status.Conditions.MarkTrue( + condition.DeploymentReadyCondition, + condition.DeploymentReadyMessage) + + Log.Info(InfoTestingCompleted) + return ctrl.Result{}, nil + + case CreateFirstJob: + lockAcquired, err := r.AcquireLock(ctx, instance, helper, false) + if !lockAcquired { + Log.Info(fmt.Sprintf(InfoCanNotAcquireLock, testOperatorLockName)) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err + } + + Log.Info(fmt.Sprintf(InfoCreatingFirstPod, nextWorkflowStep)) + + case CreateNextJob: + // Confirm that we still hold the lock. This is useful to check if for + // example somebody / something deleted the lock and it got claimed by + // another instance. This is considered to be an error state. 
+ lockAcquired, err := r.AcquireLock(ctx, instance, helper, false) + if !lockAcquired { + Log.Error(err, fmt.Sprintf(ErrConfirmLockOwnership, testOperatorLockName)) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err + } + + Log.Info(fmt.Sprintf(InfoCreatingNextPod, nextWorkflowStep)) + + default: + return ctrl.Result{}, errors.New(ErrReceivedUnexpectedAction) } serviceLabels := map[string]string{ common.AppSelector: ansibletest.ServiceName, - "workflowStep": strconv.Itoa(externalWorkflowCounter), - "instanceName": instance.Name, - "operator": "test-operator", + workflowStepLabel: strconv.Itoa(nextWorkflowStep), + instanceNameLabel: instance.Name, + operatorNameLabel: "test-operator", } // Create PersistentVolumeClaim @@ -194,51 +209,30 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) } // Create PersistentVolumeClaim - end - // If the current job is executing the last workflow step -> do not create another job - if workflowActive && externalWorkflowCounter >= len(instance.Spec.Workflow) { - return ctrl.Result{}, nil - } else if !workflowActive && r.JobExists(ctx, instance, currentWorkflowStep) { - return ctrl.Result{}, nil - } - - // We are about to start job that spawns the pod with tests. - // This lock ensures that there is always only one pod running. 
- lockAcquired, err := r.AcquireLock(ctx, instance, helper, false) - if !lockAcquired { - Log.Info("Can not acquire lock") - requeueAfter := time.Second * 60 - return ctrl.Result{RequeueAfter: requeueAfter}, err - } - Log.Info("Lock acquired") - - if workflowActive { - r.WorkflowStepCounterIncrease(ctx, instance, helper) - } - instance.Status.Conditions.MarkTrue(condition.ServiceConfigReadyCondition, condition.ServiceConfigReadyMessage) // Create a new job mountCerts := r.CheckSecretExists(ctx, instance, "combined-ca-bundle") - jobName := r.GetJobName(instance, externalWorkflowCounter) - envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, externalWorkflowCounter) + jobName := r.GetJobName(instance, nextWorkflowStep) + envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, nextWorkflowStep) logsPVCName := r.GetPVCLogsName(instance, 0) containerImage, err := r.GetContainerImage(ctx, workflowOverrideParams["ContainerImage"], instance) - privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", externalWorkflowCounter).(bool) + privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", nextWorkflowStep).(bool) if err != nil { return ctrl.Result{}, err } - if externalWorkflowCounter < len(instance.Spec.Workflow) { - if instance.Spec.Workflow[externalWorkflowCounter].NodeSelector != nil { - instance.Spec.NodeSelector = *instance.Spec.Workflow[externalWorkflowCounter].NodeSelector + if nextWorkflowStep < len(instance.Spec.Workflow) { + if instance.Spec.Workflow[nextWorkflowStep].NodeSelector != nil { + instance.Spec.NodeSelector = *instance.Spec.Workflow[nextWorkflowStep].NodeSelector } - if instance.Spec.Workflow[externalWorkflowCounter].Tolerations != nil { - instance.Spec.Tolerations = *instance.Spec.Workflow[externalWorkflowCounter].Tolerations + if instance.Spec.Workflow[nextWorkflowStep].Tolerations != nil { + instance.Spec.Tolerations = *instance.Spec.Workflow[nextWorkflowStep].Tolerations } - 
if instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel != nil { - instance.Spec.SELinuxLevel = *instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel + if instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel != nil { + instance.Spec.SELinuxLevel = *instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel } } @@ -260,7 +254,7 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) mountCerts, envVars, workflowOverrideParams, - externalWorkflowCounter, + nextWorkflowStep, containerImage, privileged, ) @@ -297,7 +291,6 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrlResult, nil } // Create a new job - end - Log.Info("Reconciled Service successfully") return ctrl.Result{}, nil } diff --git a/controllers/common.go b/controllers/common.go index 0d831b5..4abed53 100644 --- a/controllers/common.go +++ b/controllers/common.go @@ -36,13 +36,33 @@ const ( customDataConfigMapinfix = "-custom-data-step-" workflowStepNumInvalid = -1 workflowStepNameInvalid = "no-step-name" + workflowStepLabel = "workflowStep" + instanceNameLabel = "instanceName" + operatorNameLabel = "operator" testOperatorLockName = "test-operator-lock" testOperatorLockOnwerField = "owner" ) const ( - ErrNetworkAttachments = "not all pods have interfaces with ips as configured in NetworkAttachments: %s" + ErrNetworkAttachments = "not all pods have interfaces with ips as configured in NetworkAttachments: %s" + ErrReceivedUnexpectedAction = "unexpected action received" + ErrConfirmLockOwnership = "can not confirm ownership of %s lock" +) + +const ( + InfoWaitingOnJob = "Waiting on either job to finish or release of the lock." + InfoTestingCompleted = "Testing completed. All pods spawned by the test-operator finished." + InfoCreatingFirstPod = "Creating first test pod (workflow step %d)." + InfoCreatingNextPod = "Creating next test pod (workflow step %d)." + InfoCanNotAcquireLock = "Can not acquire %s lock." 
+ InfoCanNotReleaseLock = "Can not release %s lock." +) + +const ( + // RequeueAfterValue tells how long we should wait before calling the Reconcile + // loop again. + RequeueAfterValue = time.Second * 60 ) type Reconciler struct { @@ -52,6 +72,126 @@ type Reconciler struct { Scheme *runtime.Scheme } +// NextAction holds an action that should be performed by the Reconcile loop. +type NextAction int + +const ( + // Wait indicates that we should wait for the state of the OpenShift cluster + // to change + Wait NextAction = iota + + // CreateFirstJob indicates that the Reconcile loop should create the first job + // either specified in the .Spec section or in the .Spec.Workflow section. + CreateFirstJob + + // CreateNextJob indicates that the Reconcile loop should create a next job + // specified in the .Spec.Workflow section (if .Spec.Workflow is defined) + CreateNextJob + + // EndTesting indicates that all jobs have already finished. The Reconcile + // loop should end the testing and release resources that are required to + // be released (e.g., global lock) + EndTesting + + // Failure indicates that an unexpected error was encountered + Failure +) + +// NextAction indicates what action needs to be performed by the Reconcile loop +// based on the current state of the OpenShift cluster. +func (r *Reconciler) NextAction( + ctx context.Context, + instance client.Object, + workflowLength int, +) (NextAction, int, error) { + // Get the latest job. The latest job is the job with the highest value stored + // in the workflowStep label + workflowStepIdx := 0 + lastJob, err := r.GetLastJob(ctx, instance) + if err != nil { + return Failure, workflowStepIdx, err + } + + // If there is a job associated with the current instance. 
+ if lastJob != nil { + workflowStepIdx, err = strconv.Atoi(lastJob.Labels[workflowStepLabel]) + if err != nil { + return Failure, workflowStepIdx, err + } + + // If the last job is not in Failed or Succeeded state -> Wait + lastJobFinished := (lastJob.Status.Failed + lastJob.Status.Succeeded) > 0 + if !lastJobFinished { + return Wait, workflowStepIdx, nil + } + + // If the last job is in Failed or Succeeded state and it is NOT the last + // job which was supposed to be created -> CreateNextJob + if lastJobFinished && !isLastJobIndex(workflowStepIdx, workflowLength) { + workflowStepIdx++ + return CreateNextJob, workflowStepIdx, nil + } + + // Otherwise if the job is in Failed or Succeeded state and it IS the + // last job -> EndTesting + if lastJobFinished && isLastJobIndex(workflowStepIdx, workflowLength) { + return EndTesting, workflowStepIdx, nil + } + } + + // If there is not any job associated with the instance -> CreateFirstJob + if lastJob == nil { + return CreateFirstJob, workflowStepIdx, nil + } + + return Failure, workflowStepIdx, nil +} + +// isLastJobIndex returns true when jobIndex is the index of the last job that +// should be executed. Otherwise the return value is false. 
+func isLastJobIndex(jobIndex int, workflowLength int) bool { + switch workflowLength { + case 0: + return jobIndex == workflowLength + default: + return jobIndex == (workflowLength - 1) + } +} + +// GetLastJob returns the job associated with an instance which has the highest value +// stored in the workflowStep label +func (r *Reconciler) GetLastJob( + ctx context.Context, + instance client.Object, +) (*batchv1.Job, error) { + labels := map[string]string{instanceNameLabel: instance.GetName()} + namespaceListOpt := client.InNamespace(instance.GetNamespace()) + labelsListOpt := client.MatchingLabels(labels) + jobList := &batchv1.JobList{} + err := r.Client.List(ctx, jobList, namespaceListOpt, labelsListOpt) + if err != nil { + return nil, err + } + + var maxJob *batchv1.Job + maxJobWorkflowStep := 0 + + for _, job := range jobList.Items { + workflowStep, err := strconv.Atoi(job.Labels[workflowStepLabel]) + if err != nil { + return nil, err + } + + if workflowStep >= maxJobWorkflowStep { + maxJobWorkflowStep = workflowStep + newMaxJob := job + maxJob = &newMaxJob + } + } + + return maxJob, nil +} + func GetEnvVarsConfigMapName(instance interface{}, workflowStepNum int) string { if _, ok := instance.(*v1beta1.Tobiko); ok { return "not-implemented" @@ -350,10 +490,11 @@ func (r *Reconciler) AcquireLock( return true, nil } - _, err := r.GetLockInfo(ctx, instance) + instanceGUID := string(instance.GetUID()) + cm, err := r.GetLockInfo(ctx, instance) if err != nil && k8s_errors.IsNotFound(err) { cm := map[string]string{ - testOperatorLockOnwerField: string(instance.GetUID()), + testOperatorLockOnwerField: instanceGUID, } cms := []util.Template{ @@ -368,6 +509,10 @@ func (r *Reconciler) AcquireLock( return err == nil, err } + if err == nil && cm.Data[testOperatorLockOnwerField] == instanceGUID { + return true, nil + } + return false, err } @@ -376,7 +521,7 @@ func (r *Reconciler) ReleaseLock(ctx context.Context, instance client.Object) (b cm, err := r.GetLockInfo(ctx, instance) if 
err != nil && k8s_errors.IsNotFound(err) { - return false, nil + return true, nil } else if err != nil { return false, err } @@ -407,77 +552,6 @@ func (r *Reconciler) ReleaseLock(ctx context.Context, instance client.Object) (b return false, errors.New("failed to delete test-operator-lock") } -func (r *Reconciler) WorkflowStepCounterCreate(ctx context.Context, instance client.Object, h *helper.Helper) bool { - cm := &corev1.ConfigMap{} - err := r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: r.GetWorkflowConfigMapName(instance)}, cm) - if err == nil { - return true - } - - counterData := make(map[string]string) - counterData["counter"] = "0" - - cms := []util.Template{ - { - Name: r.GetWorkflowConfigMapName(instance), - Namespace: instance.GetNamespace(), - CustomData: counterData, - }, - } - - err = configmap.EnsureConfigMaps(ctx, h, instance, cms, nil) - return err == nil -} - -func (r *Reconciler) WorkflowStepCounterIncrease(ctx context.Context, instance client.Object, h *helper.Helper) bool { - cm := &corev1.ConfigMap{} - err := r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: r.GetWorkflowConfigMapName(instance)}, cm) - if err != nil { - return false - } - - counterValue, _ := strconv.Atoi(cm.Data["counter"]) - newCounterValue := strconv.Itoa(counterValue + 1) - cm.Data["counter"] = newCounterValue - - cms := []util.Template{ - { - Name: r.GetWorkflowConfigMapName(instance), - Namespace: instance.GetNamespace(), - CustomData: cm.Data, - }, - } - - err = configmap.EnsureConfigMaps(ctx, h, instance, cms, nil) - return err == nil -} - -func (r *Reconciler) WorkflowStepCounterRead(ctx context.Context, instance client.Object) int { - cm := &corev1.ConfigMap{} - err := r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: r.GetWorkflowConfigMapName(instance)}, cm) - if err != nil { - return workflowStepNumInvalid - } - - counter, _ := strconv.Atoi(cm.Data["counter"]) - return counter -} - 
-func (r *Reconciler) CompletedJobExists(ctx context.Context, instance client.Object, workflowStepNum int) bool { - job := &batchv1.Job{} - err := r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: r.GetJobName(instance, workflowStepNum)}, job) - - if err != nil { - return false - } - - if job.Status.Succeeded > 0 || job.Status.Failed > 0 { - return true - } - - return false -} - func (r *Reconciler) JobExists(ctx context.Context, instance client.Object, workflowStepNum int) bool { job := &batchv1.Job{} jobName := r.GetJobName(instance, workflowStepNum) diff --git a/controllers/horizontest_controller.go b/controllers/horizontest_controller.go index a120477..62b8e02 100644 --- a/controllers/horizontest_controller.go +++ b/controllers/horizontest_controller.go @@ -18,6 +18,8 @@ package controllers import ( "context" + "errors" + "fmt" "time" "github.com/go-logr/logr" @@ -63,10 +65,6 @@ func (r *HorizonTestReconciler) GetLogger(ctx context.Context) logr.Logger { // Reconcile - HorizonTest func (r *HorizonTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, _err error) { Log := r.GetLogger(ctx) - - // How much time should we wait before calling Reconcile loop when there is a failure - requeueAfter := time.Second * 60 - instance := &testv1beta1.HorizonTest{} err := r.Client.Get(ctx, req.NamespacedName, instance) if err != nil { @@ -130,25 +128,66 @@ func (r *HorizonTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) } - if r.CompletedJobExists(ctx, instance, 0) { - // The job created by the instance was completed. Release the lock - // so that other instances can spawn a job. 
- instance.Status.Conditions.MarkTrue(condition.DeploymentReadyCondition, condition.DeploymentReadyMessage) - Log.Info("Job completed") + workflowLength := 0 + nextAction, nextWorkflowStep, err := r.NextAction(ctx, instance, workflowLength) + + switch nextAction { + case Failure: + return ctrl.Result{}, err + + case Wait: + Log.Info(InfoWaitingOnJob) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, nil + + case EndTesting: + // All jobs created by the instance were completed. Release the lock + // so that other instances can spawn their jobs. if lockReleased, err := r.ReleaseLock(ctx, instance); !lockReleased { - return ctrl.Result{}, err + Log.Info(fmt.Sprintf(InfoCanNotReleaseLock, testOperatorLockName)) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err + } + + instance.Status.Conditions.MarkTrue( + condition.DeploymentReadyCondition, + condition.DeploymentReadyMessage) + + Log.Info(InfoTestingCompleted) + return ctrl.Result{}, nil + + case CreateFirstJob: + lockAcquired, err := r.AcquireLock(ctx, instance, helper, instance.Spec.Parallel) + if !lockAcquired { + Log.Info(fmt.Sprintf(InfoCanNotAcquireLock, testOperatorLockName)) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err } + + Log.Info(fmt.Sprintf(InfoCreatingFirstPod, nextWorkflowStep)) + + case CreateNextJob: + // Confirm that we still hold the lock. This is useful to check if for + // example somebody / something deleted the lock and it got claimed by + // another instance. This is considered to be an error state. 
+ lockAcquired, err := r.AcquireLock(ctx, instance, helper, instance.Spec.Parallel) + if !lockAcquired { + Log.Error(err, fmt.Sprintf(ErrConfirmLockOwnership, testOperatorLockName)) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err + } + + Log.Info(fmt.Sprintf(InfoCreatingNextPod, nextWorkflowStep)) + + default: + return ctrl.Result{}, errors.New(ErrReceivedUnexpectedAction) } serviceLabels := map[string]string{ common.AppSelector: horizontest.ServiceName, - "instanceName": instance.Name, - "operator": "test-operator", + instanceNameLabel: instance.Name, + operatorNameLabel: "test-operator", // NOTE(lpiwowar): This is a workaround since the Horizontest CR does not support // workflows. However, the label might be required by automation that // consumes the test-operator (e.g., ci-framework). - "workflowStep": "0", + workflowStepLabel: "0", } yamlResult, err := EnsureCloudsConfigMapExists(ctx, instance, helper, serviceLabels) @@ -183,20 +222,6 @@ func (r *HorizonTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) mountKubeconfig = true } - // If the current job is executing the last workflow step -> do not create another job - if r.JobExists(ctx, instance, 0) { - return ctrl.Result{}, nil - } - - // We are about to start job that spawns the pod with tests. - // This lock ensures that there is always only one pod running. 
- lockAcquired, err := r.AcquireLock(ctx, instance, helper, instance.Spec.Parallel) - if !lockAcquired { - Log.Info("Cannot acquire lock") - return ctrl.Result{RequeueAfter: requeueAfter}, err - } - Log.Info("Lock acquired") - // Prepare HorizonTest env vars envVars := r.PrepareHorizonTestEnvVars(instance) jobName := r.GetJobName(instance, 0) @@ -253,7 +278,6 @@ func (r *HorizonTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrlResult, nil } // create Job - end - Log.Info("Reconciled Service successfully") return ctrl.Result{}, nil } diff --git a/controllers/tempest_controller.go b/controllers/tempest_controller.go index 9551db6..d82ff24 100644 --- a/controllers/tempest_controller.go +++ b/controllers/tempest_controller.go @@ -18,6 +18,7 @@ package controllers import ( "context" + "errors" "fmt" "strconv" "time" @@ -39,7 +40,6 @@ import ( corev1 "k8s.io/api/core/v1" k8s_errors "k8s.io/apimachinery/pkg/api/errors" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -71,9 +71,6 @@ func (r *TempestReconciler) GetLogger(ctx context.Context) logr.Logger { func (r *TempestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, _err error) { Log := r.GetLogger(ctx) - // How much time should we wait before calling Reconcile loop when there is a failure - requeueAfter := time.Second * 60 - // Fetch the Tempest instance instance := &testv1beta1.Tempest{} err := r.Client.Get(ctx, req.NamespacedName, instance) @@ -84,11 +81,6 @@ func (r *TempestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (re return ctrl.Result{}, err } - workflowActive := false - if len(instance.Spec.Workflow) > 0 { - workflowActive = true - } - // Create a helper helper, err := helper.NewHelper( instance, @@ -160,45 +152,68 @@ func (r *TempestReconciler) Reconcile(ctx context.Context, req ctrl.Request) 
(re return r.reconcileDelete(ctx, instance, helper) } - // Ensure that there is an external counter and read its value - // We use the external counter to keep track of the workflow steps - r.WorkflowStepCounterCreate(ctx, instance, helper) - externalWorkflowCounter := r.WorkflowStepCounterRead(ctx, instance) - if externalWorkflowCounter == -1 { - return ctrl.Result{RequeueAfter: requeueAfter}, nil - } + workflowLength := len(instance.Spec.Workflow) + nextAction, nextWorkflowStep, err := r.NextAction(ctx, instance, workflowLength) - // Each job that is being executed by the test operator has - currentWorkflowStep := 0 - runningTobikoJob := &batchv1.Job{} - runningJobName := r.GetJobName(instance, externalWorkflowCounter-1) - err = r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: runningJobName}, runningTobikoJob) - if err == nil { - currentWorkflowStep, _ = strconv.Atoi(runningTobikoJob.Labels["workflowStep"]) - } + switch nextAction { + case Failure: + return ctrl.Result{}, err + + case Wait: + Log.Info(InfoWaitingOnJob) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, nil - if r.CompletedJobExists(ctx, instance, currentWorkflowStep) { - // The job created by the instance was completed. Release the lock - // so that other instances can spawn a job. - instance.Status.Conditions.MarkTrue(condition.DeploymentReadyCondition, condition.DeploymentReadyMessage) - Log.Info("Job completed") + case EndTesting: + // All jobs created by the instance were completed. Release the lock + // so that other instances can spawn their jobs. 
if lockReleased, err := r.ReleaseLock(ctx, instance); !lockReleased { - return ctrl.Result{}, err + Log.Info(fmt.Sprintf(InfoCanNotReleaseLock, testOperatorLockName)) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err + } + + instance.Status.Conditions.MarkTrue( + condition.DeploymentReadyCondition, + condition.DeploymentReadyMessage) + + Log.Info(InfoTestingCompleted) + return ctrl.Result{}, nil + + case CreateFirstJob: + lockAcquired, err := r.AcquireLock(ctx, instance, helper, instance.Spec.Parallel) + if !lockAcquired { + Log.Info(fmt.Sprintf(InfoCanNotAcquireLock, testOperatorLockName)) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err } + + Log.Info(fmt.Sprintf(InfoCreatingFirstPod, nextWorkflowStep)) + + case CreateNextJob: + // Confirm that we still hold the lock. This is useful to check if for + // example somebody / something deleted the lock and it got claimed by + // another instance. This is considered to be an error state. + lockAcquired, err := r.AcquireLock(ctx, instance, helper, instance.Spec.Parallel) + if !lockAcquired { + Log.Error(err, fmt.Sprintf(ErrConfirmLockOwnership, testOperatorLockName)) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err + } + + Log.Info(fmt.Sprintf(InfoCreatingNextPod, nextWorkflowStep)) + + default: + return ctrl.Result{}, errors.New(ErrReceivedUnexpectedAction) } serviceLabels := map[string]string{ common.AppSelector: tempest.ServiceName, - "workflowStep": strconv.Itoa(externalWorkflowCounter), - "instanceName": instance.Name, - "operator": "test-operator", + workflowStepLabel: strconv.Itoa(nextWorkflowStep), + instanceNameLabel: instance.Name, + operatorNameLabel: "test-operator", } workflowStepNum := 0 - // Create multiple PVCs for parallel execution - if instance.Spec.Parallel && externalWorkflowCounter < len(instance.Spec.Workflow) { - workflowStepNum = externalWorkflowCounter + if instance.Spec.Parallel && nextWorkflowStep < len(instance.Spec.Workflow) { + workflowStepNum = nextWorkflowStep } // 
Create PersistentVolumeClaim @@ -223,29 +238,8 @@ func (r *TempestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (re mountSSHKey = r.CheckSecretExists(ctx, instance, instance.Spec.SSHKeySecretName) } - // If the current job is executing the last workflow step -> do not create another job - if workflowActive && externalWorkflowCounter >= len(instance.Spec.Workflow) { - return ctrl.Result{}, nil - } else if !workflowActive && r.JobExists(ctx, instance, currentWorkflowStep) { - return ctrl.Result{}, nil - } - - // We are about to start job that spawns the pod with tests. - // This lock ensures that there is always only one pod running. - lockAcquired, err := r.AcquireLock(ctx, instance, helper, instance.Spec.Parallel) - if !lockAcquired { - Log.Info("Can not acquire lock") - requeueAfter := time.Second * 60 - return ctrl.Result{RequeueAfter: requeueAfter}, err - } - Log.Info("Lock acquired") - - if workflowActive { - r.WorkflowStepCounterIncrease(ctx, instance, helper) - } - // Generate ConfigMaps - err = r.generateServiceConfigMaps(ctx, helper, instance, externalWorkflowCounter) + err = r.generateServiceConfigMaps(ctx, helper, instance, nextWorkflowStep) if err != nil { instance.Status.Conditions.Set(condition.FalseCondition( condition.ServiceConfigReadyCondition, @@ -293,7 +287,7 @@ func (r *TempestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (re } // NetworkAttachments - if r.JobExists(ctx, instance, externalWorkflowCounter) { + if r.JobExists(ctx, instance, nextWorkflowStep) { networkReady, networkAttachmentStatus, err := nad.VerifyNetworkStatusFromAnnotation( ctx, helper, @@ -327,9 +321,9 @@ func (r *TempestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (re // Create a new job mountCerts := r.CheckSecretExists(ctx, instance, "combined-ca-bundle") - customDataConfigMapName := GetCustomDataConfigMapName(instance, externalWorkflowCounter) - EnvVarsConfigMapName := GetEnvVarsConfigMapName(instance, externalWorkflowCounter) 
- jobName := r.GetJobName(instance, externalWorkflowCounter) + customDataConfigMapName := GetCustomDataConfigMapName(instance, nextWorkflowStep) + EnvVarsConfigMapName := GetEnvVarsConfigMapName(instance, nextWorkflowStep) + jobName := r.GetJobName(instance, nextWorkflowStep) logsPVCName := r.GetPVCLogsName(instance, workflowStepNum) containerImage, err := r.GetContainerImage(ctx, instance.Spec.ContainerImage, instance) if err != nil { @@ -348,17 +342,17 @@ func (r *TempestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (re // Note(lpiwowar): Remove all the workflow merge code to webhook once it is done. // It will simplify the logic and duplicite code (Tempest vs Tobiko) - if externalWorkflowCounter < len(instance.Spec.Workflow) { - if instance.Spec.Workflow[externalWorkflowCounter].NodeSelector != nil { - instance.Spec.NodeSelector = *instance.Spec.Workflow[externalWorkflowCounter].NodeSelector + if nextWorkflowStep < len(instance.Spec.Workflow) { + if instance.Spec.Workflow[nextWorkflowStep].NodeSelector != nil { + instance.Spec.NodeSelector = *instance.Spec.Workflow[nextWorkflowStep].NodeSelector } - if instance.Spec.Workflow[externalWorkflowCounter].Tolerations != nil { - instance.Spec.Tolerations = *instance.Spec.Workflow[externalWorkflowCounter].Tolerations + if instance.Spec.Workflow[nextWorkflowStep].Tolerations != nil { + instance.Spec.Tolerations = *instance.Spec.Workflow[nextWorkflowStep].Tolerations } - if instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel != nil { - instance.Spec.SELinuxLevel = *instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel + if instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel != nil { + instance.Spec.SELinuxLevel = *instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel } } @@ -388,7 +382,7 @@ func (r *TempestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (re // Release the lock and allow other controllers to spawn // a job. 
if lockReleased, lockErr := r.ReleaseLock(ctx, instance); lockReleased { - return ctrl.Result{}, lockErr + return ctrl.Result{RequeueAfter: RequeueAfterValue}, lockErr } instance.Status.Conditions.Set(condition.FalseCondition( @@ -408,7 +402,6 @@ func (r *TempestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (re } // Create a new job - end - Log.Info("Reconciled Service successfully") return ctrl.Result{}, nil } @@ -650,8 +643,8 @@ func (r *TempestReconciler) generateServiceConfigMaps( cmLabels := labels.GetLabels(instance, labels.GetGroupLabel(tempest.ServiceName), map[string]string{}) operatorLabels := map[string]string{ - "operator": "test-operator", - "instanceName": instance.Name, + operatorNameLabel: "test-operator", + instanceNameLabel: instance.Name, } // Combine labels diff --git a/controllers/tobiko_controller.go b/controllers/tobiko_controller.go index 36e0058..ee60298 100644 --- a/controllers/tobiko_controller.go +++ b/controllers/tobiko_controller.go @@ -18,6 +18,7 @@ package controllers import ( "context" + "errors" "fmt" "strconv" "time" @@ -39,7 +40,6 @@ import ( corev1 "k8s.io/api/core/v1" k8s_errors "k8s.io/apimachinery/pkg/api/errors" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -71,9 +71,6 @@ func (r *TobikoReconciler) GetLogger(ctx context.Context) logr.Logger { func (r *TobikoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, _err error) { Log := r.GetLogger(ctx) - // How much time should we wait before calling Reconcile loop when there is a failure - requeueAfter := time.Second * 60 - instance := &testv1beta1.Tobiko{} err := r.Client.Get(ctx, req.NamespacedName, instance) if err != nil { @@ -83,12 +80,6 @@ func (r *TobikoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res return ctrl.Result{}, err } - // Check whether the user wants to execute workflow - workflowActive := false - if 
len(instance.Spec.Workflow) > 0 { - workflowActive = true - } - helper, err := helper.NewHelper( instance, r.Client, @@ -148,38 +139,62 @@ func (r *TobikoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res instance.Status.NetworkAttachments = map[string][]string{} } - // Ensure that there is an external counter and read its value - // We use the external counter to keep track of the workflow steps - r.WorkflowStepCounterCreate(ctx, instance, helper) - externalWorkflowCounter := r.WorkflowStepCounterRead(ctx, instance) - if externalWorkflowCounter == -1 { - return ctrl.Result{RequeueAfter: requeueAfter}, nil - } + workflowLength := len(instance.Spec.Workflow) + nextAction, nextWorkflowStep, err := r.NextAction(ctx, instance, workflowLength) - // Each job that is being executed by the test operator has - currentWorkflowStep := 0 - runningTobikoJob := &batchv1.Job{} - runningJobName := r.GetJobName(instance, externalWorkflowCounter-1) - err = r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: runningJobName}, runningTobikoJob) - if err == nil { - currentWorkflowStep, _ = strconv.Atoi(runningTobikoJob.Labels["workflowStep"]) - } + switch nextAction { + case Failure: + return ctrl.Result{}, err + + case Wait: + Log.Info(InfoWaitingOnJob) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, nil - if r.CompletedJobExists(ctx, instance, currentWorkflowStep) { - instance.Status.Conditions.MarkTrue(condition.DeploymentReadyCondition, condition.DeploymentReadyMessage) - // The job created by the instance was completed. Release the lock - // so that other instances can spawn a job. - Log.Info("Job completed") + case EndTesting: + // All jobs created by the instance were completed. Release the lock + // so that other instances can spawn their jobs. 
if lockReleased, err := r.ReleaseLock(ctx, instance); !lockReleased { - return ctrl.Result{}, err + Log.Info(fmt.Sprintf(InfoCanNotReleaseLock, testOperatorLockName)) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err + } + + instance.Status.Conditions.MarkTrue( + condition.DeploymentReadyCondition, + condition.DeploymentReadyMessage) + + Log.Info(InfoTestingCompleted) + return ctrl.Result{}, nil + + case CreateFirstJob: + lockAcquired, err := r.AcquireLock(ctx, instance, helper, instance.Spec.Parallel) + if !lockAcquired { + Log.Info(fmt.Sprintf(InfoCanNotAcquireLock, testOperatorLockName)) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err } + + Log.Info(fmt.Sprintf(InfoCreatingFirstPod, nextWorkflowStep)) + + case CreateNextJob: + // Confirm that we still hold the lock. This needs to be checked in order + // to prevent a situation when somebody / something deleted the lock and it + // got claimed by another instance. + lockAcquired, err := r.AcquireLock(ctx, instance, helper, instance.Spec.Parallel) + if !lockAcquired { + Log.Error(err, ErrConfirmLockOwnership, testOperatorLockName) + return ctrl.Result{RequeueAfter: RequeueAfterValue}, err + } + + Log.Info(fmt.Sprintf(InfoCreatingNextPod, nextWorkflowStep)) + + default: + return ctrl.Result{}, errors.New(ErrReceivedUnexpectedAction) } serviceLabels := map[string]string{ common.AppSelector: tobiko.ServiceName, - "workflowStep": strconv.Itoa(externalWorkflowCounter), - "instanceName": instance.Name, - "operator": "test-operator", + workflowStepLabel: strconv.Itoa(nextWorkflowStep), + instanceNameLabel: instance.Name, + operatorNameLabel: "test-operator", } yamlResult, err := EnsureCloudsConfigMapExists(ctx, instance, helper, serviceLabels) @@ -191,8 +206,8 @@ func (r *TobikoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res workflowStepNum := 0 // Create multiple PVCs for parallel execution - if instance.Spec.Parallel && externalWorkflowCounter < len(instance.Spec.Workflow) { -
workflowStepNum = externalWorkflowCounter + if instance.Spec.Parallel && nextWorkflowStep < len(instance.Spec.Workflow) { + workflowStepNum = nextWorkflowStep } // Create PersistentVolumeClaim @@ -246,7 +261,7 @@ func (r *TobikoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res } // NetworkAttachments - if r.JobExists(ctx, instance, externalWorkflowCounter) { + if r.JobExists(ctx, instance, nextWorkflowStep) { networkReady, networkAttachmentStatus, err := nad.VerifyNetworkStatusFromAnnotation( ctx, helper, @@ -293,32 +308,12 @@ func (r *TobikoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res mountKubeconfig = true } - // If the current job is executing the last workflow step -> do not create another job - if workflowActive && externalWorkflowCounter >= len(instance.Spec.Workflow) { - return ctrl.Result{}, nil - } else if !workflowActive && r.JobExists(ctx, instance, currentWorkflowStep) { - return ctrl.Result{}, nil - } - - // We are about to start job that spawns the pod with tests. - // This lock ensures that there is always only one pod running. 
- lockAcquired, err := r.AcquireLock(ctx, instance, helper, instance.Spec.Parallel) - if !lockAcquired { - Log.Info("Can not acquire lock") - return ctrl.Result{RequeueAfter: requeueAfter}, err - } - Log.Info("Lock acquired") - - if workflowActive { - r.WorkflowStepCounterIncrease(ctx, instance, helper) - } - // Prepare Tobiko env vars - envVars := r.PrepareTobikoEnvVars(ctx, serviceLabels, instance, helper, externalWorkflowCounter) - jobName := r.GetJobName(instance, externalWorkflowCounter) + envVars := r.PrepareTobikoEnvVars(ctx, serviceLabels, instance, helper, nextWorkflowStep) + jobName := r.GetJobName(instance, nextWorkflowStep) logsPVCName := r.GetPVCLogsName(instance, workflowStepNum) containerImage, err := r.GetContainerImage(ctx, instance.Spec.ContainerImage, instance) - privileged := r.OverwriteValueWithWorkflow(instance.Spec, "Privileged", "pbool", externalWorkflowCounter).(bool) + privileged := r.OverwriteValueWithWorkflow(instance.Spec, "Privileged", "pbool", nextWorkflowStep).(bool) if err != nil { return ctrl.Result{}, err } @@ -372,7 +367,6 @@ func (r *TobikoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res return ctrlResult, nil } // create Job - end - Log.Info("Reconciled Service successfully") return ctrl.Result{}, nil }