Skip to content

Commit

Permalink
Merge pull request #231 from lpiwowar/bugfix/OSPRH-10386
Browse files Browse the repository at this point in the history
[OSPRH-10386] Update logic for workflows
  • Loading branch information
openshift-merge-bot[bot] authored Nov 19, 2024
2 parents 104d029 + 5709ee4 commit 66bdaa2
Show file tree
Hide file tree
Showing 5 changed files with 382 additions and 304 deletions.
127 changes: 60 additions & 67 deletions controllers/ansibletest_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package controllers

import (
"context"
"errors"
"fmt"
"strconv"
"time"

Expand All @@ -37,7 +39,6 @@ import (
corev1 "k8s.io/api/core/v1"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

Expand Down Expand Up @@ -68,9 +69,6 @@ func (r *AnsibleTestReconciler) GetLogger(ctx context.Context) logr.Logger {
func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, _err error) {
Log := r.GetLogger(ctx)

// How much time should we wait before calling Reconcile loop when there is a failure
requeueAfter := time.Second * 60

// Fetch the ansible instance
instance := &testv1beta1.AnsibleTest{}
err := r.Client.Get(ctx, req.NamespacedName, instance)
Expand All @@ -81,11 +79,6 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}

workflowActive := false
if len(instance.Spec.Workflow) > 0 {
workflowActive = true
}

// Create a helper
helper, err := helper.NewHelper(
instance,
Expand Down Expand Up @@ -142,40 +135,62 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)

}

// Ensure that there is an external counter and read its value
// We use the external counter to keep track of the workflow steps
r.WorkflowStepCounterCreate(ctx, instance, helper)
externalWorkflowCounter := r.WorkflowStepCounterRead(ctx, instance)
if externalWorkflowCounter == -1 {
return ctrl.Result{RequeueAfter: requeueAfter}, nil
}
workflowLength := len(instance.Spec.Workflow)
nextAction, nextWorkflowStep, err := r.NextAction(ctx, instance, workflowLength)

// Each job that is being executed by the test operator has
currentWorkflowStep := 0
runningAnsibleJob := &batchv1.Job{}
runningJobName := r.GetJobName(instance, externalWorkflowCounter-1)
err = r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: runningJobName}, runningAnsibleJob)
if err != nil && !k8s_errors.IsNotFound(err) {
switch nextAction {
case Failure:
return ctrl.Result{}, err
} else if err == nil {
currentWorkflowStep, _ = strconv.Atoi(runningAnsibleJob.Labels["workflowStep"])
}

if r.CompletedJobExists(ctx, instance, currentWorkflowStep) {
// The job created by the instance was completed. Release the lock
// so that other instances can spawn a job.
instance.Status.Conditions.MarkTrue(condition.DeploymentReadyCondition, condition.DeploymentReadyMessage)
Log.Info("Job completed")
case Wait:
Log.Info(InfoWaitingOnJob)
return ctrl.Result{RequeueAfter: RequeueAfterValue}, nil

case EndTesting:
// All jobs created by the instance were completed. Release the lock
// so that other instances can spawn their jobs.
if lockReleased, err := r.ReleaseLock(ctx, instance); !lockReleased {
return ctrl.Result{}, err
Log.Info(fmt.Sprintf(InfoCanNotReleaseLock, testOperatorLockName))
return ctrl.Result{RequeueAfter: RequeueAfterValue}, err
}

instance.Status.Conditions.MarkTrue(
condition.DeploymentReadyCondition,
condition.DeploymentReadyMessage)

Log.Info(InfoTestingCompleted)
return ctrl.Result{}, nil

case CreateFirstJob:
lockAcquired, err := r.AcquireLock(ctx, instance, helper, false)
if !lockAcquired {
Log.Info(fmt.Sprintf(InfoCanNotAcquireLock, testOperatorLockName))
return ctrl.Result{RequeueAfter: RequeueAfterValue}, err
}

Log.Info(fmt.Sprintf(InfoCreatingFirstPod, nextWorkflowStep))

case CreateNextJob:
// Confirm that we still hold the lock. This is useful to check if for
// example somebody / something deleted the lock and it got claimed by
// another instance. This is considered to be an error state.
lockAcquired, err := r.AcquireLock(ctx, instance, helper, false)
if !lockAcquired {
Log.Error(err, ErrConfirmLockOwnership, testOperatorLockName)
return ctrl.Result{RequeueAfter: RequeueAfterValue}, err
}

Log.Info(fmt.Sprintf(InfoCreatingNextPod, nextWorkflowStep))

default:
return ctrl.Result{}, errors.New(ErrReceivedUnexpectedAction)
}

serviceLabels := map[string]string{
common.AppSelector: ansibletest.ServiceName,
"workflowStep": strconv.Itoa(externalWorkflowCounter),
"instanceName": instance.Name,
"operator": "test-operator",
workflowStepLabel: strconv.Itoa(nextWorkflowStep),
instanceNameLabel: instance.Name,
operatorNameLabel: "test-operator",
}

// Create PersistentVolumeClaim
Expand All @@ -194,51 +209,30 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
// Create PersistentVolumeClaim - end

// If the current job is executing the last workflow step -> do not create another job
if workflowActive && externalWorkflowCounter >= len(instance.Spec.Workflow) {
return ctrl.Result{}, nil
} else if !workflowActive && r.JobExists(ctx, instance, currentWorkflowStep) {
return ctrl.Result{}, nil
}

// We are about to start job that spawns the pod with tests.
// This lock ensures that there is always only one pod running.
lockAcquired, err := r.AcquireLock(ctx, instance, helper, false)
if !lockAcquired {
Log.Info("Can not acquire lock")
requeueAfter := time.Second * 60
return ctrl.Result{RequeueAfter: requeueAfter}, err
}
Log.Info("Lock acquired")

if workflowActive {
r.WorkflowStepCounterIncrease(ctx, instance, helper)
}

instance.Status.Conditions.MarkTrue(condition.ServiceConfigReadyCondition, condition.ServiceConfigReadyMessage)

// Create a new job
mountCerts := r.CheckSecretExists(ctx, instance, "combined-ca-bundle")
jobName := r.GetJobName(instance, externalWorkflowCounter)
envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, externalWorkflowCounter)
jobName := r.GetJobName(instance, nextWorkflowStep)
envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, nextWorkflowStep)
logsPVCName := r.GetPVCLogsName(instance, 0)
containerImage, err := r.GetContainerImage(ctx, workflowOverrideParams["ContainerImage"], instance)
privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", externalWorkflowCounter).(bool)
privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", nextWorkflowStep).(bool)
if err != nil {
return ctrl.Result{}, err
}

if externalWorkflowCounter < len(instance.Spec.Workflow) {
if instance.Spec.Workflow[externalWorkflowCounter].NodeSelector != nil {
instance.Spec.NodeSelector = *instance.Spec.Workflow[externalWorkflowCounter].NodeSelector
if nextWorkflowStep < len(instance.Spec.Workflow) {
if instance.Spec.Workflow[nextWorkflowStep].NodeSelector != nil {
instance.Spec.NodeSelector = *instance.Spec.Workflow[nextWorkflowStep].NodeSelector
}

if instance.Spec.Workflow[externalWorkflowCounter].Tolerations != nil {
instance.Spec.Tolerations = *instance.Spec.Workflow[externalWorkflowCounter].Tolerations
if instance.Spec.Workflow[nextWorkflowStep].Tolerations != nil {
instance.Spec.Tolerations = *instance.Spec.Workflow[nextWorkflowStep].Tolerations
}

if instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel != nil {
instance.Spec.SELinuxLevel = *instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel
if instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel != nil {
instance.Spec.SELinuxLevel = *instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel
}
}

Expand All @@ -260,7 +254,7 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
mountCerts,
envVars,
workflowOverrideParams,
externalWorkflowCounter,
nextWorkflowStep,
containerImage,
privileged,
)
Expand Down Expand Up @@ -297,7 +291,6 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrlResult, nil
}
// Create a new job - end

Log.Info("Reconciled Service successfully")
return ctrl.Result{}, nil
}
Expand Down
Loading

0 comments on commit 66bdaa2

Please sign in to comment.