Skip to content

Commit

Permalink
Update logic for workflows
Browse files Browse the repository at this point in the history
There was a race condition in the previous implementation of the
workflows that was caused by the dependency of the workflow feature
on a ConfigMap that served as counter.

This commit simplifies the logic of the Reconcile loop in two ways:

  1. Removes the dependency for an external workflow counter

  2. Introduces NextAction function which decides what next action
     should be performed by the Reconcile loop based on the current
     state of the OpenShift cluster. The input of this function is the
     instance which is currently being processed and a workflowLength.
     Using these two arguments it then tells the Reconcile loop which
     actions it should perform: Wait, CreateFirstJob, CreateNextJob,
     EndTesting, Failure.

This approach should simplify the reconcile logic and move the
test-operator towards a unified Reconcile loop.
  • Loading branch information
lpiwowar committed Nov 18, 2024
1 parent cd41d06 commit 5709ee4
Show file tree
Hide file tree
Showing 5 changed files with 382 additions and 304 deletions.
127 changes: 60 additions & 67 deletions controllers/ansibletest_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package controllers

import (
"context"
"errors"
"fmt"
"strconv"
"time"

Expand All @@ -37,7 +39,6 @@ import (
corev1 "k8s.io/api/core/v1"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

Expand Down Expand Up @@ -68,9 +69,6 @@ func (r *AnsibleTestReconciler) GetLogger(ctx context.Context) logr.Logger {
func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, _err error) {
Log := r.GetLogger(ctx)

// How much time should we wait before calling Reconcile loop when there is a failure
requeueAfter := time.Second * 60

// Fetch the ansible instance
instance := &testv1beta1.AnsibleTest{}
err := r.Client.Get(ctx, req.NamespacedName, instance)
Expand All @@ -81,11 +79,6 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}

workflowActive := false
if len(instance.Spec.Workflow) > 0 {
workflowActive = true
}

// Create a helper
helper, err := helper.NewHelper(
instance,
Expand Down Expand Up @@ -142,40 +135,62 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)

}

// Ensure that there is an external counter and read its value
// We use the external counter to keep track of the workflow steps
r.WorkflowStepCounterCreate(ctx, instance, helper)
externalWorkflowCounter := r.WorkflowStepCounterRead(ctx, instance)
if externalWorkflowCounter == -1 {
return ctrl.Result{RequeueAfter: requeueAfter}, nil
}
workflowLength := len(instance.Spec.Workflow)
nextAction, nextWorkflowStep, err := r.NextAction(ctx, instance, workflowLength)

// Each job that is being executed by the test operator has
currentWorkflowStep := 0
runningAnsibleJob := &batchv1.Job{}
runningJobName := r.GetJobName(instance, externalWorkflowCounter-1)
err = r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: runningJobName}, runningAnsibleJob)
if err != nil && !k8s_errors.IsNotFound(err) {
switch nextAction {
case Failure:
return ctrl.Result{}, err
} else if err == nil {
currentWorkflowStep, _ = strconv.Atoi(runningAnsibleJob.Labels["workflowStep"])
}

if r.CompletedJobExists(ctx, instance, currentWorkflowStep) {
// The job created by the instance was completed. Release the lock
// so that other instances can spawn a job.
instance.Status.Conditions.MarkTrue(condition.DeploymentReadyCondition, condition.DeploymentReadyMessage)
Log.Info("Job completed")
case Wait:
Log.Info(InfoWaitingOnJob)
return ctrl.Result{RequeueAfter: RequeueAfterValue}, nil

case EndTesting:
// All jobs created by the instance were completed. Release the lock
// so that other instances can spawn their jobs.
if lockReleased, err := r.ReleaseLock(ctx, instance); !lockReleased {
return ctrl.Result{}, err
Log.Info(fmt.Sprintf(InfoCanNotReleaseLock, testOperatorLockName))
return ctrl.Result{RequeueAfter: RequeueAfterValue}, err
}

instance.Status.Conditions.MarkTrue(
condition.DeploymentReadyCondition,
condition.DeploymentReadyMessage)

Log.Info(InfoTestingCompleted)
return ctrl.Result{}, nil

case CreateFirstJob:
lockAcquired, err := r.AcquireLock(ctx, instance, helper, false)
if !lockAcquired {
Log.Info(fmt.Sprintf(InfoCanNotAcquireLock, testOperatorLockName))
return ctrl.Result{RequeueAfter: RequeueAfterValue}, err
}

Log.Info(fmt.Sprintf(InfoCreatingFirstPod, nextWorkflowStep))

case CreateNextJob:
// Confirm that we still hold the lock. This is useful to check if for
// example somebody / something deleted the lock and it got claimed by
// another instance. This is considered to be an error state.
lockAcquired, err := r.AcquireLock(ctx, instance, helper, false)
if !lockAcquired {
Log.Error(err, ErrConfirmLockOwnership, testOperatorLockName)
return ctrl.Result{RequeueAfter: RequeueAfterValue}, err
}

Log.Info(fmt.Sprintf(InfoCreatingNextPod, nextWorkflowStep))

default:
return ctrl.Result{}, errors.New(ErrReceivedUnexpectedAction)
}

serviceLabels := map[string]string{
common.AppSelector: ansibletest.ServiceName,
"workflowStep": strconv.Itoa(externalWorkflowCounter),
"instanceName": instance.Name,
"operator": "test-operator",
workflowStepLabel: strconv.Itoa(nextWorkflowStep),
instanceNameLabel: instance.Name,
operatorNameLabel: "test-operator",
}

// Create PersistentVolumeClaim
Expand All @@ -194,51 +209,30 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
// Create PersistentVolumeClaim - end

// If the current job is executing the last workflow step -> do not create another job
if workflowActive && externalWorkflowCounter >= len(instance.Spec.Workflow) {
return ctrl.Result{}, nil
} else if !workflowActive && r.JobExists(ctx, instance, currentWorkflowStep) {
return ctrl.Result{}, nil
}

// We are about to start job that spawns the pod with tests.
// This lock ensures that there is always only one pod running.
lockAcquired, err := r.AcquireLock(ctx, instance, helper, false)
if !lockAcquired {
Log.Info("Can not acquire lock")
requeueAfter := time.Second * 60
return ctrl.Result{RequeueAfter: requeueAfter}, err
}
Log.Info("Lock acquired")

if workflowActive {
r.WorkflowStepCounterIncrease(ctx, instance, helper)
}

instance.Status.Conditions.MarkTrue(condition.ServiceConfigReadyCondition, condition.ServiceConfigReadyMessage)

// Create a new job
mountCerts := r.CheckSecretExists(ctx, instance, "combined-ca-bundle")
jobName := r.GetJobName(instance, externalWorkflowCounter)
envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, externalWorkflowCounter)
jobName := r.GetJobName(instance, nextWorkflowStep)
envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, nextWorkflowStep)
logsPVCName := r.GetPVCLogsName(instance, 0)
containerImage, err := r.GetContainerImage(ctx, workflowOverrideParams["ContainerImage"], instance)
privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", externalWorkflowCounter).(bool)
privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", nextWorkflowStep).(bool)
if err != nil {
return ctrl.Result{}, err
}

if externalWorkflowCounter < len(instance.Spec.Workflow) {
if instance.Spec.Workflow[externalWorkflowCounter].NodeSelector != nil {
instance.Spec.NodeSelector = *instance.Spec.Workflow[externalWorkflowCounter].NodeSelector
if nextWorkflowStep < len(instance.Spec.Workflow) {
if instance.Spec.Workflow[nextWorkflowStep].NodeSelector != nil {
instance.Spec.NodeSelector = *instance.Spec.Workflow[nextWorkflowStep].NodeSelector
}

if instance.Spec.Workflow[externalWorkflowCounter].Tolerations != nil {
instance.Spec.Tolerations = *instance.Spec.Workflow[externalWorkflowCounter].Tolerations
if instance.Spec.Workflow[nextWorkflowStep].Tolerations != nil {
instance.Spec.Tolerations = *instance.Spec.Workflow[nextWorkflowStep].Tolerations
}

if instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel != nil {
instance.Spec.SELinuxLevel = *instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel
if instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel != nil {
instance.Spec.SELinuxLevel = *instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel
}
}

Expand All @@ -260,7 +254,7 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
mountCerts,
envVars,
workflowOverrideParams,
externalWorkflowCounter,
nextWorkflowStep,
containerImage,
privileged,
)
Expand Down Expand Up @@ -297,7 +291,6 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrlResult, nil
}
// Create a new job - end

Log.Info("Reconciled Service successfully")
return ctrl.Result{}, nil
}
Expand Down
Loading

0 comments on commit 5709ee4

Please sign in to comment.