Skip to content

Commit

Permalink
fix: Use job spec namespace if it is available (#292)
Browse files Browse the repository at this point in the history
* use namespace from job spec

* pass run id through secret

* update upsert function

* add owner ref

* fix tests
  • Loading branch information
maciaszczykm authored Oct 2, 2024
1 parent bfa0304 commit ee7174c
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 61 deletions.
4 changes: 3 additions & 1 deletion cmd/agent/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
consolectrl "github.com/pluralsh/deployment-operator/pkg/controller"
"github.com/pluralsh/deployment-operator/pkg/controller/stacks"
v1 "github.com/pluralsh/deployment-operator/pkg/controller/v1"
"k8s.io/apimachinery/pkg/runtime"

"k8s.io/client-go/rest"
ctrclient "sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -49,6 +50,7 @@ func registerConsoleReconcilersOrDie(
mgr *controller.Manager,
config *rest.Config,
k8sClient ctrclient.Client,
scheme *runtime.Scheme,
consoleClient client.Client,
) {
mgr.AddReconcilerOrDie(service.Identifier, func() (v1.Reconciler, error) {
Expand Down Expand Up @@ -78,7 +80,7 @@ func registerConsoleReconcilersOrDie(
os.Exit(1)
}

r := stacks.NewStackReconciler(consoleClient, k8sClient, args.ControllerCacheTTL(), stacksPollInterval, namespace, args.ConsoleUrl(), args.DeployToken())
r := stacks.NewStackReconciler(consoleClient, k8sClient, scheme, args.ControllerCacheTTL(), stacksPollInterval, namespace, args.ConsoleUrl(), args.DeployToken())
return r, nil
})
}
2 changes: 1 addition & 1 deletion cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func main() {
// Initialize Pipeline Gate Cache
cache.InitGateCache(args.ControllerCacheTTL(), extConsoleClient)

registerConsoleReconcilersOrDie(consoleManager, config, kubeManager.GetClient(), extConsoleClient)
registerConsoleReconcilersOrDie(consoleManager, config, kubeManager.GetClient(), kubeManager.GetScheme(), extConsoleClient)
registerKubeReconcilersOrDie(ctx, kubeManager, consoleManager, config, extConsoleClient, discoveryClient)

//+kubebuilder:scaffold:builder
Expand Down
27 changes: 27 additions & 0 deletions internal/utils/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,33 @@ func TryAddControllerRef(ctx context.Context, client ctrlruntimeclient.Client, o
})
}

func TryAddOwnerRef(ctx context.Context, client ctrlruntimeclient.Client, owner ctrlruntimeclient.Object, object ctrlruntimeclient.Object, scheme *runtime.Scheme) error {
key := ctrlruntimeclient.ObjectKeyFromObject(object)

return retry.RetryOnConflict(retry.DefaultRetry, func() error {
if err := client.Get(ctx, key, object); err != nil {
return err
}

if owner.GetDeletionTimestamp() != nil || object.GetDeletionTimestamp() != nil {
return nil
}

original := object.DeepCopyObject().(ctrlruntimeclient.Object)

err := controllerutil.SetOwnerReference(owner, object, scheme)
if err != nil {
return err
}

if reflect.DeepEqual(original.GetOwnerReferences(), object.GetOwnerReferences()) {
return nil
}

return client.Patch(ctx, object, ctrlruntimeclient.MergeFromWithOptions(original, ctrlruntimeclient.MergeFromWithOptimisticLock{}))
})
}

func AsName(val string) string {
return strings.ReplaceAll(val, " ", "-")
}
Expand Down
66 changes: 36 additions & 30 deletions pkg/controller/stacks/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

console "github.com/pluralsh/console/go/client"
"github.com/pluralsh/deployment-operator/internal/metrics"
"github.com/pluralsh/deployment-operator/internal/utils"
consoleclient "github.com/pluralsh/deployment-operator/pkg/client"
"github.com/pluralsh/polly/algorithms"
"github.com/samber/lo"
Expand Down Expand Up @@ -77,26 +78,34 @@ func init() {

func (r *StackReconciler) reconcileRunJob(ctx context.Context, run *console.StackRunFragment) (*batchv1.Job, error) {
logger := log.FromContext(ctx)
jobName := GetRunJobName(run)

name := GetRunResourceName(run)
jobSpec := getRunJobSpec(name, run.JobSpec)
namespace := r.GetRunResourceNamespace(jobSpec)

foundJob := &batchv1.Job{}
if err := r.k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: r.namespace}, foundJob); err != nil {
if err := r.k8sClient.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, foundJob); err != nil {
if !apierrs.IsNotFound(err) {
return nil, err
}

if _, err = r.upsertRunSecret(ctx); err != nil {
secret, err := r.upsertRunSecret(ctx, name, namespace, run.ID)
if err != nil {
return nil, err
}

logger.V(2).Info("generating job", "namespace", r.namespace, "name", jobName)
job := r.GenerateRunJob(run, jobName)

logger.V(2).Info("creating job", "namespace", job.Namespace, "name", job.Name)
job := r.GenerateRunJob(run, jobSpec, name, namespace)
logger.V(2).Info("creating job for stack run", "id", run.ID, "namespace", job.Namespace, "name", job.Name)
if err := r.k8sClient.Create(ctx, job); err != nil {
logger.Error(err, "unable to create job")
return nil, err
}

if err := utils.TryAddOwnerRef(ctx, r.k8sClient, job, secret, r.scheme); err != nil {
logger.Error(err, "error setting owner reference for job secret")
return nil, err
}

metrics.Record().StackRunJobCreation()
if err := r.consoleClient.UpdateStackRun(run.ID, console.StackRunAttributes{
Status: run.Status,
Expand All @@ -110,39 +119,43 @@ func (r *StackReconciler) reconcileRunJob(ctx context.Context, run *console.Stac

return job, nil
}
return foundJob, nil

return foundJob, nil
}

func GetRunJobName(run *console.StackRunFragment) string {
// GetRunResourceName returns a resource name used for a job and a secret connected to a given run.
func GetRunResourceName(run *console.StackRunFragment) string {
return fmt.Sprintf("stack-%s", run.ID)
}

func (r *StackReconciler) GenerateRunJob(run *console.StackRunFragment, name string) *batchv1.Job {
var jobSpec *batchv1.JobSpec
// GetRunResourceNamespace returns a resource namespace used for a job and a secret connected to a given run.
func (r *StackReconciler) GetRunResourceNamespace(jobSpec *batchv1.JobSpec) (namespace string) {
if jobSpec != nil {
namespace = jobSpec.Template.Namespace
}

// Use job spec defined in run as base if it is available.
if run.JobSpec != nil {
jobSpec = getRunJobSpec(name, run.JobSpec)
if namespace == "" {
namespace = r.namespace
}

return
}

func (r *StackReconciler) GenerateRunJob(run *console.StackRunFragment, jobSpec *batchv1.JobSpec, name, namespace string) *batchv1.Job {
// If user-defined job spec was not available initialize it here.
if jobSpec == nil {
jobSpec = &batchv1.JobSpec{}
}

// Set requirements like name, namespace, container and volume.
jobSpec.Template.ObjectMeta.Name = name
jobSpec.Template.ObjectMeta.Namespace = namespace

if jobSpec.Template.Annotations == nil {
jobSpec.Template.Annotations = map[string]string{}
}
jobSpec.Template.Annotations[podDefaultContainerAnnotation] = DefaultJobContainer

if jobSpec.Template.ObjectMeta.Namespace == "" {
jobSpec.Template.ObjectMeta.Namespace = r.namespace
}

jobSpec.Template.Spec.RestartPolicy = corev1.RestartPolicyNever

jobSpec.BackoffLimit = lo.ToPtr(int32(0))
Expand All @@ -157,7 +170,7 @@ func (r *StackReconciler) GenerateRunJob(run *console.StackRunFragment, name str
return &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: r.namespace,
Namespace: namespace,
Annotations: map[string]string{jobSelector: name},
Labels: map[string]string{jobSelector: name},
},
Expand Down Expand Up @@ -209,9 +222,7 @@ func (r *StackReconciler) ensureDefaultContainer(containers []corev1.Container,
containers[index].Image = r.getDefaultContainerImage(run)
}

containers[index].Args = r.getDefaultContainerArgs(run.ID)

containers[index].EnvFrom = r.getDefaultContainerEnvFrom()
containers[index].EnvFrom = r.getDefaultContainerEnvFrom(run)

containers[index].VolumeMounts = ensureDefaultVolumeMounts(containers[index].VolumeMounts)
}
Expand All @@ -222,14 +233,13 @@ func (r *StackReconciler) getDefaultContainer(run *console.StackRunFragment) cor
return corev1.Container{
Name: DefaultJobContainer,
Image: r.getDefaultContainerImage(run),
Args: r.getDefaultContainerArgs(run.ID),
VolumeMounts: []corev1.VolumeMount{
defaultJobContainerVolumeMount,
defaultJobTmpContainerVolumeMount,
},
SecurityContext: ensureDefaultContainerSecurityContext(nil),
Env: make([]corev1.EnvVar, 0),
EnvFrom: r.getDefaultContainerEnvFrom(),
EnvFrom: r.getDefaultContainerEnvFrom(run),
}
}

Expand Down Expand Up @@ -299,22 +309,18 @@ func (r *StackReconciler) getTag(run *console.StackRunFragment) string {
return defaultImageTag
}

func (r *StackReconciler) getDefaultContainerEnvFrom() []corev1.EnvFromSource {
func (r *StackReconciler) getDefaultContainerEnvFrom(run *console.StackRunFragment) []corev1.EnvFromSource {
return []corev1.EnvFromSource{
{
SecretRef: &corev1.SecretEnvSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: jobRunSecretName,
Name: GetRunResourceName(run),
},
},
},
}
}

func (r *StackReconciler) getDefaultContainerArgs(runID string) []string {
return []string{fmt.Sprintf("--stack-run-id=%s", runID)}
}

func ensureDefaultVolumeMounts(mounts []corev1.VolumeMount) []corev1.VolumeMount {
return append(
algorithms.Filter(mounts, func(v corev1.VolumeMount) bool {
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/stacks/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
console "github.com/pluralsh/console/go/client"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/pluralsh/deployment-operator/pkg/test/mocks"
Expand All @@ -16,7 +17,7 @@ func TestGetDefaultContainerImage(t *testing.T) {
var kClient client.Client
fakeConsoleClient := mocks.NewClientMock(t)
namespace := "default"
reconciler := NewStackReconciler(fakeConsoleClient, kClient, time.Minute, 0, namespace, "", "")
reconciler := NewStackReconciler(fakeConsoleClient, kClient, scheme.Scheme, time.Minute, 0, namespace, "", "")
cases := []struct {
name string
run *console.StackRunFragment
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/stacks/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

console "github.com/pluralsh/console/go/client"
"github.com/pluralsh/polly/algorithms"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/util/workqueue"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -25,6 +26,7 @@ const (
type StackReconciler struct {
consoleClient client.Client
k8sClient ctrlclient.Client
scheme *runtime.Scheme
stackQueue workqueue.TypedRateLimitingInterface[string]
stackCache *client.Cache[console.StackRunFragment]
namespace string
Expand All @@ -33,10 +35,11 @@ type StackReconciler struct {
pollInterval time.Duration
}

func NewStackReconciler(consoleClient client.Client, k8sClient ctrlclient.Client, refresh, pollInterval time.Duration, namespace, consoleURL, deployToken string) *StackReconciler {
func NewStackReconciler(consoleClient client.Client, k8sClient ctrlclient.Client, scheme *runtime.Scheme, refresh, pollInterval time.Duration, namespace, consoleURL, deployToken string) *StackReconciler {
return &StackReconciler{
consoleClient: consoleClient,
k8sClient: k8sClient,
scheme: scheme,
stackQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()),
stackCache: client.NewCache[console.StackRunFragment](refresh, func(id string) (*console.StackRunFragment, error) {
return consoleClient.GetStackRun(id)
Expand Down
19 changes: 10 additions & 9 deletions pkg/controller/stacks/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/yaml"

errors2 "github.com/pluralsh/deployment-operator/internal/errors"
Expand Down Expand Up @@ -73,7 +74,7 @@ var _ = Describe("Reconciler", Ordered, func() {
GqlErrors: &gqlerror.List{gqlerror.Errorf("%s", errors2.ErrNotFound.String())},
})

reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, 0, namespace, "", "")
reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, scheme.Scheme, time.Minute, 0, namespace, "", "")

_, err := reconciler.Reconcile(ctx, stackRunId)
Expect(err).NotTo(HaveOccurred())
Expand All @@ -85,7 +86,7 @@ var _ = Describe("Reconciler", Ordered, func() {
GqlErrors: &gqlerror.List{gqlerror.Errorf("unknown error")},
})

reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, 0, namespace, "", "")
reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, scheme.Scheme, time.Minute, 0, namespace, "", "")

_, err := reconciler.Reconcile(ctx, stackRunId)
Expect(err).To(HaveOccurred())
Expand All @@ -100,7 +101,7 @@ var _ = Describe("Reconciler", Ordered, func() {
Status: console.StackStatusPending,
}, nil)

reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, 0, namespace, "", "")
reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, scheme.Scheme, time.Minute, 0, namespace, "", "")

_, err := reconciler.Reconcile(ctx, stackRunId)
Expect(err).NotTo(HaveOccurred())
Expand All @@ -118,13 +119,13 @@ var _ = Describe("Reconciler", Ordered, func() {
fakeConsoleClient.On("GetStackRun", mock.Anything).Return(stackRun, nil)
fakeConsoleClient.On("UpdateStackRun", mock.Anything, mock.Anything).Return(nil)

reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, 0, namespace, "", "")
reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, scheme.Scheme, time.Minute, 0, namespace, "", "")

_, err := reconciler.Reconcile(ctx, stackRunId)
Expect(err).NotTo(HaveOccurred())

job := &batchv1.Job{}
Expect(kClient.Get(ctx, types.NamespacedName{Name: stacks.GetRunJobName(stackRun), Namespace: namespace}, job)).NotTo(HaveOccurred())
Expect(kClient.Get(ctx, types.NamespacedName{Name: stacks.GetRunResourceName(stackRun), Namespace: namespace}, job)).NotTo(HaveOccurred())
Expect(*job.Spec.BackoffLimit).To(Equal(int32(0)))
Expect(job.Spec.Template.Spec.Containers).To(HaveLen(1))
Expect(job.Spec.Template.Spec.Volumes).To(HaveLen(2))
Expand Down Expand Up @@ -157,13 +158,13 @@ var _ = Describe("Reconciler", Ordered, func() {
fakeConsoleClient.On("GetStackRun", mock.Anything).Return(stackRun, nil)
fakeConsoleClient.On("UpdateStackRun", mock.Anything, mock.Anything).Return(nil)

reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, 0, namespace, "", "")
reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, scheme.Scheme, time.Minute, 0, namespace, "", "")

_, err := reconciler.Reconcile(ctx, stackRunId)
Expect(err).NotTo(HaveOccurred())

job := &batchv1.Job{}
Expect(kClient.Get(ctx, types.NamespacedName{Name: stacks.GetRunJobName(stackRun), Namespace: namespace}, job)).NotTo(HaveOccurred())
Expect(kClient.Get(ctx, types.NamespacedName{Name: stacks.GetRunResourceName(stackRun), Namespace: namespace}, job)).NotTo(HaveOccurred())
Expect(*job.Spec.BackoffLimit).To(Equal(int32(0)))
Expect(job.Spec.Template.Spec.Containers).To(HaveLen(3))
Expect(job.Spec.Template.ObjectMeta.Labels).To(ContainElement(labelsValue))
Expand Down Expand Up @@ -213,13 +214,13 @@ var _ = Describe("Reconciler", Ordered, func() {
fakeConsoleClient.On("GetStackRun", mock.Anything).Return(stackRun, nil)
fakeConsoleClient.On("UpdateStackRun", mock.Anything, mock.Anything).Return(nil)

reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, 0, namespace, "", "")
reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, scheme.Scheme, time.Minute, 0, namespace, "", "")

_, err = reconciler.Reconcile(ctx, stackRunId)
Expect(err).NotTo(HaveOccurred())

job := &batchv1.Job{}
Expect(kClient.Get(ctx, types.NamespacedName{Name: stacks.GetRunJobName(stackRun), Namespace: namespace}, job)).NotTo(HaveOccurred())
Expect(kClient.Get(ctx, types.NamespacedName{Name: stacks.GetRunResourceName(stackRun), Namespace: namespace}, job)).NotTo(HaveOccurred())
Expect(*job.Spec.ActiveDeadlineSeconds).To(Equal(*jobSpec.ActiveDeadlineSeconds))
Expect(*job.Spec.BackoffLimit).To(Equal(int32(0))) // Overridden by controller.
Expect(job.Spec.Template.Spec.ServiceAccountName).To(Equal(jobSpec.Template.Spec.ServiceAccountName))
Expand Down
Loading

0 comments on commit ee7174c

Please sign in to comment.