From f9f252f365b57b10f0a94a9bd5766209850b7d60 Mon Sep 17 00:00:00 2001 From: Lukasz Zajaczkowski Date: Tue, 28 May 2024 16:27:07 +0200 Subject: [PATCH] feat: add job timeout (#199) * add job timeout * kill job * fix unit tests --- internal/controller/stackrunjob_controller.go | 77 ++++++++++--------- pkg/controller/stacks/reconciler.go | 3 + pkg/controller/stacks/reconciler_test.go | 4 + 3 files changed, 49 insertions(+), 35 deletions(-) diff --git a/internal/controller/stackrunjob_controller.go b/internal/controller/stackrunjob_controller.go index 1cf15b57..637e3f42 100644 --- a/internal/controller/stackrunjob_controller.go +++ b/internal/controller/stackrunjob_controller.go @@ -20,28 +20,27 @@ import ( "context" "fmt" "strings" + "time" - "github.com/pluralsh/polly/algorithms" - "github.com/samber/lo" - "k8s.io/apimachinery/pkg/labels" - + console "github.com/pluralsh/console-client-go" clienterrors "github.com/pluralsh/deployment-operator/internal/errors" + "github.com/pluralsh/deployment-operator/pkg/client" "github.com/pluralsh/deployment-operator/pkg/controller/stacks" - - console "github.com/pluralsh/console-client-go" + "github.com/pluralsh/polly/algorithms" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" - "sigs.k8s.io/controller-runtime/pkg/predicate" - + apierrs "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" k8sClient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" - - "github.com/pluralsh/deployment-operator/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/predicate" ) const jobSelector = "stackrun.deployments.plural.sh" +const jobTimout = time.Minute * 40 // StackRunJobReconciler reconciles a Job resource. type StackRunJobReconciler struct { @@ -69,18 +68,20 @@ func (r *StackRunJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, err } - // Update step statuses, i.e., when stack run was successful or failed. - for _, step := range stackRun.Steps { - if update := r.getStepStatusUpdate(stackRun.Status, step.Status); update != nil { - err := r.ConsoleClient.UpdateStackRunStep(step.ID, console.RunStepAttributes{Status: *update}) - return ctrl.Result{}, err - } - } - // Exit if stack run is not in running state (run status already updated), // or if the job is still running (harness controls run status). if stackRun.Status != console.StackStatusRunning || job.Status.CompletionTime.IsZero() { - return ctrl.Result{}, nil + if isActiveJobTimout(stackRun.Status, job) { + if err := r.killJob(ctx, job); err != nil { + return ctrl.Result{}, err + } + logger.V(2).Info("stack run job failed", "name", job.Name, "namespace", job.Namespace) + err := r.ConsoleClient.UpdateStackRun(stackRunID, console.StackRunAttributes{ + Status: console.StackStatusFailed, + }) + return ctrl.Result{}, err + } + return requeue, nil } if hasSucceeded(job) { @@ -108,22 +109,6 @@ func (r *StackRunJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } -func (r *StackRunJobReconciler) getStepStatusUpdate(stackStatus console.StackStatus, stepStatus console.StepStatus) *console.StepStatus { - if stepStatus != console.StepStatusPending && stepStatus != console.StepStatusRunning { - return nil - } - - if stackStatus == console.StackStatusSuccessful { - return lo.ToPtr(console.StepStatusSuccessful) - } - - if stackStatus == console.StackStatusFailed || stackStatus == console.StackStatusCancelled { - return lo.ToPtr(console.StepStatusFailed) - } - - return nil -} - func (r *StackRunJobReconciler) getJobPodStatus(ctx context.Context, selector map[string]string) (console.StackStatus, error) { pod, err := r.getJobPod(ctx, selector) if err != nil { @@ -162,6 +147,21 @@ func (r *StackRunJobReconciler) getPodStatus(pod *corev1.Pod) (console.StackStat return getExitCodeStatus(containerStatus.State.Terminated.ExitCode), nil } +func (r *StackRunJobReconciler) killJob(ctx context.Context, job *batchv1.Job) error { + log := log.FromContext(ctx) + deletePolicy := metav1.DeletePropagationBackground // kill the job and its pods asap + if err := r.Delete(ctx, job, &k8sClient.DeleteOptions{ + PropagationPolicy: &deletePolicy, + }); err != nil { + if !apierrs.IsNotFound(err) { + return err + } + return nil + } + log.V(2).Info("Job killed successfully.", "JobName", job.Name, "Namespace", job.Namespace) + return nil +} + func getExitCodeStatus(exitCode int32) console.StackStatus { switch exitCode { case 64: @@ -178,6 +178,13 @@ func getStackRunID(job *batchv1.Job) string { return strings.TrimPrefix(job.Name, "stack-") } +func isActiveJobTimout(stackStatus console.StackStatus, job *batchv1.Job) bool { + if stackStatus == console.StackStatusPending && job.Status.CompletionTime.IsZero() && !job.Status.StartTime.IsZero() { + return time.Now().After(job.Status.StartTime.Add(jobTimout)) + } + return false +} + // SetupWithManager sets up the controller with the Manager. func (r *StackRunJobReconciler) SetupWithManager(mgr ctrl.Manager) error { byAnnotation := predicate.NewPredicateFuncs(func(object k8sClient.Object) bool { diff --git a/pkg/controller/stacks/reconciler.go b/pkg/controller/stacks/reconciler.go index ccc97b94..355eff78 100644 --- a/pkg/controller/stacks/reconciler.go +++ b/pkg/controller/stacks/reconciler.go @@ -108,6 +108,9 @@ func (r *StackReconciler) Reconcile(ctx context.Context, id string) (reconcile.R return reconcile.Result{}, err } + if stackRun.Status != console.StackStatusPending { + return reconcile.Result{}, nil + } _, err = r.reconcileRunJob(ctx, stackRun) return reconcile.Result{}, err } diff --git a/pkg/controller/stacks/reconciler_test.go b/pkg/controller/stacks/reconciler_test.go index 84877cb1..9ca7ace8 100644 --- a/pkg/controller/stacks/reconciler_test.go +++ b/pkg/controller/stacks/reconciler_test.go @@ -96,6 +96,7 @@ var _ = Describe("Reconciler", Ordered, func() { fakeConsoleClient.On("GetStackRun", mock.Anything).Return(&console.StackRunFragment{ ID: stackRunId, Approval: lo.ToPtr(false), + Status: console.StackStatusPending, }, nil) reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, namespace, "", "") @@ -109,6 +110,7 @@ var _ = Describe("Reconciler", Ordered, func() { stackRun := &console.StackRunFragment{ ID: stackRunId, Approval: lo.ToPtr(false), + Status: console.StackStatusPending, } fakeConsoleClient := mocks.NewClientMock(mocks.TestingT) @@ -146,6 +148,7 @@ var _ = Describe("Reconciler", Ordered, func() { Annotations: map[string]any{"test": annotationsValue}, ServiceAccount: lo.ToPtr("test-sa"), }, + Status: console.StackStatusPending, } fakeConsoleClient := mocks.NewClientMock(mocks.TestingT) @@ -200,6 +203,7 @@ var _ = Describe("Reconciler", Ordered, func() { Namespace: "", Raw: lo.ToPtr(string(marshalledJobSpec)), }, + Status: console.StackStatusPending, } fakeConsoleClient := mocks.NewClientMock(mocks.TestingT)