diff --git a/cmd/agent/console.go b/cmd/agent/console.go index 483fc6b5..8de0db4a 100644 --- a/cmd/agent/console.go +++ b/cmd/agent/console.go @@ -10,6 +10,7 @@ import ( consolectrl "github.com/pluralsh/deployment-operator/pkg/controller" "github.com/pluralsh/deployment-operator/pkg/controller/stacks" v1 "github.com/pluralsh/deployment-operator/pkg/controller/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" ctrclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -49,6 +50,7 @@ func registerConsoleReconcilersOrDie( mgr *controller.Manager, config *rest.Config, k8sClient ctrclient.Client, + scheme *runtime.Scheme, consoleClient client.Client, ) { mgr.AddReconcilerOrDie(service.Identifier, func() (v1.Reconciler, error) { @@ -78,7 +80,7 @@ func registerConsoleReconcilersOrDie( os.Exit(1) } - r := stacks.NewStackReconciler(consoleClient, k8sClient, args.ControllerCacheTTL(), stacksPollInterval, namespace, args.ConsoleUrl(), args.DeployToken()) + r := stacks.NewStackReconciler(consoleClient, k8sClient, scheme, args.ControllerCacheTTL(), stacksPollInterval, namespace, args.ConsoleUrl(), args.DeployToken()) return r, nil }) } diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 3813a338..a2e02eb9 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -57,7 +57,7 @@ func main() { // Initialize Pipeline Gate Cache cache.InitGateCache(args.ControllerCacheTTL(), extConsoleClient) - registerConsoleReconcilersOrDie(consoleManager, config, kubeManager.GetClient(), extConsoleClient) + registerConsoleReconcilersOrDie(consoleManager, config, kubeManager.GetClient(), kubeManager.GetScheme(), extConsoleClient) registerKubeReconcilersOrDie(ctx, kubeManager, consoleManager, config, extConsoleClient, discoveryClient) //+kubebuilder:scaffold:builder diff --git a/internal/utils/kubernetes.go b/internal/utils/kubernetes.go index 88eab6ce..a1e6f174 100644 --- a/internal/utils/kubernetes.go +++ b/internal/utils/kubernetes.go @@ -52,6 +52,33 @@ func
TryAddControllerRef(ctx context.Context, client ctrlruntimeclient.Client, o }) } +func TryAddOwnerRef(ctx context.Context, client ctrlruntimeclient.Client, owner ctrlruntimeclient.Object, object ctrlruntimeclient.Object, scheme *runtime.Scheme) error { + key := ctrlruntimeclient.ObjectKeyFromObject(object) + + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + if err := client.Get(ctx, key, object); err != nil { + return err + } + + if owner.GetDeletionTimestamp() != nil || object.GetDeletionTimestamp() != nil { + return nil + } + + original := object.DeepCopyObject().(ctrlruntimeclient.Object) + + err := controllerutil.SetOwnerReference(owner, object, scheme) + if err != nil { + return err + } + + if reflect.DeepEqual(original.GetOwnerReferences(), object.GetOwnerReferences()) { + return nil + } + + return client.Patch(ctx, object, ctrlruntimeclient.MergeFromWithOptions(original, ctrlruntimeclient.MergeFromWithOptimisticLock{})) + }) +} + func AsName(val string) string { return strings.ReplaceAll(val, " ", "-") } diff --git a/pkg/controller/stacks/job.go b/pkg/controller/stacks/job.go index 83cc614b..cd4437e1 100644 --- a/pkg/controller/stacks/job.go +++ b/pkg/controller/stacks/job.go @@ -8,6 +8,7 @@ import ( console "github.com/pluralsh/console/go/client" "github.com/pluralsh/deployment-operator/internal/metrics" + "github.com/pluralsh/deployment-operator/internal/utils" consoleclient "github.com/pluralsh/deployment-operator/pkg/client" "github.com/pluralsh/polly/algorithms" "github.com/samber/lo" @@ -77,26 +78,34 @@ func init() { func (r *StackReconciler) reconcileRunJob(ctx context.Context, run *console.StackRunFragment) (*batchv1.Job, error) { logger := log.FromContext(ctx) - jobName := GetRunJobName(run) + + name := GetRunResourceName(run) + jobSpec := getRunJobSpec(name, run.JobSpec) + namespace := r.GetRunResourceNamespace(jobSpec) + foundJob := &batchv1.Job{} - if err := r.k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace:
r.namespace}, foundJob); err != nil { + if err := r.k8sClient.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, foundJob); err != nil { if !apierrs.IsNotFound(err) { return nil, err } - if _, err = r.upsertRunSecret(ctx); err != nil { + secret, err := r.upsertRunSecret(ctx, name, namespace, run.ID) + if err != nil { return nil, err } - logger.V(2).Info("generating job", "namespace", r.namespace, "name", jobName) - job := r.GenerateRunJob(run, jobName) - - logger.V(2).Info("creating job", "namespace", job.Namespace, "name", job.Name) + job := r.GenerateRunJob(run, jobSpec, name, namespace) + logger.V(2).Info("creating job for stack run", "id", run.ID, "namespace", job.Namespace, "name", job.Name) if err := r.k8sClient.Create(ctx, job); err != nil { logger.Error(err, "unable to create job") return nil, err } + if err := utils.TryAddOwnerRef(ctx, r.k8sClient, job, secret, r.scheme); err != nil { + logger.Error(err, "error setting owner reference for job secret") + return nil, err + } + metrics.Record().StackRunJobCreation() if err := r.consoleClient.UpdateStackRun(run.ID, console.StackRunAttributes{ Status: run.Status, @@ -110,22 +119,29 @@ func (r *StackReconciler) reconcileRunJob(ctx context.Context, run *console.Stac return job, nil } - return foundJob, nil + return foundJob, nil } -func GetRunJobName(run *console.StackRunFragment) string { +// GetRunResourceName returns a resource name used for a job and a secret connected to a given run. +func GetRunResourceName(run *console.StackRunFragment) string { return fmt.Sprintf("stack-%s", run.ID) } -func (r *StackReconciler) GenerateRunJob(run *console.StackRunFragment, name string) *batchv1.Job { - var jobSpec *batchv1.JobSpec +// GetRunResourceNamespace returns a resource namespace used for a job and a secret connected to a given run.
+func (r *StackReconciler) GetRunResourceNamespace(jobSpec *batchv1.JobSpec) (namespace string) { + if jobSpec != nil { + namespace = jobSpec.Template.Namespace + } - // Use job spec defined in run as base if it is available. - if run.JobSpec != nil { - jobSpec = getRunJobSpec(name, run.JobSpec) + if namespace == "" { + namespace = r.namespace } + return +} + +func (r *StackReconciler) GenerateRunJob(run *console.StackRunFragment, jobSpec *batchv1.JobSpec, name, namespace string) *batchv1.Job { // If user-defined job spec was not available initialize it here. if jobSpec == nil { jobSpec = &batchv1.JobSpec{} @@ -133,16 +149,13 @@ func (r *StackReconciler) GenerateRunJob(run *console.StackRunFragment, name str // Set requirements like name, namespace, container and volume. jobSpec.Template.ObjectMeta.Name = name + jobSpec.Template.ObjectMeta.Namespace = namespace if jobSpec.Template.Annotations == nil { jobSpec.Template.Annotations = map[string]string{} } jobSpec.Template.Annotations[podDefaultContainerAnnotation] = DefaultJobContainer - if jobSpec.Template.ObjectMeta.Namespace == "" { - jobSpec.Template.ObjectMeta.Namespace = r.namespace - } - jobSpec.Template.Spec.RestartPolicy = corev1.RestartPolicyNever jobSpec.BackoffLimit = lo.ToPtr(int32(0)) @@ -157,7 +170,7 @@ func (r *StackReconciler) GenerateRunJob(run *console.StackRunFragment, name str return &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: name, - Namespace: r.namespace, + Namespace: namespace, Annotations: map[string]string{jobSelector: name}, Labels: map[string]string{jobSelector: name}, }, @@ -209,9 +222,7 @@ func (r *StackReconciler) ensureDefaultContainer(containers []corev1.Container, containers[index].Image = r.getDefaultContainerImage(run) } - containers[index].Args = r.getDefaultContainerArgs(run.ID) - - containers[index].EnvFrom = r.getDefaultContainerEnvFrom() + containers[index].EnvFrom = r.getDefaultContainerEnvFrom(run) containers[index].VolumeMounts =
ensureDefaultVolumeMounts(containers[index].VolumeMounts) } @@ -222,14 +233,13 @@ func (r *StackReconciler) getDefaultContainer(run *console.StackRunFragment) cor return corev1.Container{ Name: DefaultJobContainer, Image: r.getDefaultContainerImage(run), - Args: r.getDefaultContainerArgs(run.ID), VolumeMounts: []corev1.VolumeMount{ defaultJobContainerVolumeMount, defaultJobTmpContainerVolumeMount, }, SecurityContext: ensureDefaultContainerSecurityContext(nil), Env: make([]corev1.EnvVar, 0), - EnvFrom: r.getDefaultContainerEnvFrom(), + EnvFrom: r.getDefaultContainerEnvFrom(run), } } @@ -299,22 +309,18 @@ func (r *StackReconciler) getTag(run *console.StackRunFragment) string { return defaultImageTag } -func (r *StackReconciler) getDefaultContainerEnvFrom() []corev1.EnvFromSource { +func (r *StackReconciler) getDefaultContainerEnvFrom(run *console.StackRunFragment) []corev1.EnvFromSource { return []corev1.EnvFromSource{ { SecretRef: &corev1.SecretEnvSource{ LocalObjectReference: corev1.LocalObjectReference{ - Name: jobRunSecretName, + Name: GetRunResourceName(run), }, }, }, } } -func (r *StackReconciler) getDefaultContainerArgs(runID string) []string { - return []string{fmt.Sprintf("--stack-run-id=%s", runID)} -} - func ensureDefaultVolumeMounts(mounts []corev1.VolumeMount) []corev1.VolumeMount { return append( algorithms.Filter(mounts, func(v corev1.VolumeMount) bool { diff --git a/pkg/controller/stacks/job_test.go b/pkg/controller/stacks/job_test.go index 0dca0fab..d181e234 100644 --- a/pkg/controller/stacks/job_test.go +++ b/pkg/controller/stacks/job_test.go @@ -7,6 +7,7 @@ import ( console "github.com/pluralsh/console/go/client" "github.com/samber/lo" "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/pluralsh/deployment-operator/pkg/test/mocks" @@ -16,7 +17,7 @@ func TestGetDefaultContainerImage(t *testing.T) { var kClient client.Client fakeConsoleClient := mocks.NewClientMock(t)
namespace := "default" - reconciler := NewStackReconciler(fakeConsoleClient, kClient, time.Minute, 0, namespace, "", "") + reconciler := NewStackReconciler(fakeConsoleClient, kClient, scheme.Scheme, time.Minute, 0, namespace, "", "") cases := []struct { name string run *console.StackRunFragment diff --git a/pkg/controller/stacks/reconciler.go b/pkg/controller/stacks/reconciler.go index 8c59df0d..b318e870 100644 --- a/pkg/controller/stacks/reconciler.go +++ b/pkg/controller/stacks/reconciler.go @@ -7,6 +7,7 @@ import ( console "github.com/pluralsh/console/go/client" "github.com/pluralsh/polly/algorithms" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/util/workqueue" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" @@ -25,6 +26,7 @@ const ( type StackReconciler struct { consoleClient client.Client k8sClient ctrlclient.Client + scheme *runtime.Scheme stackQueue workqueue.TypedRateLimitingInterface[string] stackCache *client.Cache[console.StackRunFragment] namespace string @@ -33,10 +35,11 @@ type StackReconciler struct { pollInterval time.Duration } -func NewStackReconciler(consoleClient client.Client, k8sClient ctrlclient.Client, refresh, pollInterval time.Duration, namespace, consoleURL, deployToken string) *StackReconciler { +func NewStackReconciler(consoleClient client.Client, k8sClient ctrlclient.Client, scheme *runtime.Scheme, refresh, pollInterval time.Duration, namespace, consoleURL, deployToken string) *StackReconciler { return &StackReconciler{ consoleClient: consoleClient, k8sClient: k8sClient, + scheme: scheme, stackQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()), stackCache: client.NewCache[console.StackRunFragment](refresh, func(id string) (*console.StackRunFragment, error) { return consoleClient.GetStackRun(id) diff --git a/pkg/controller/stacks/reconciler_test.go b/pkg/controller/stacks/reconciler_test.go index 4affe2ef..e411bb9f 100644 ---
a/pkg/controller/stacks/reconciler_test.go +++ b/pkg/controller/stacks/reconciler_test.go @@ -16,6 +16,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/yaml" errors2 "github.com/pluralsh/deployment-operator/internal/errors" @@ -73,7 +74,7 @@ var _ = Describe("Reconciler", Ordered, func() { GqlErrors: &gqlerror.List{gqlerror.Errorf("%s", errors2.ErrNotFound.String())}, }) - reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, 0, namespace, "", "") + reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, scheme.Scheme, time.Minute, 0, namespace, "", "") _, err := reconciler.Reconcile(ctx, stackRunId) Expect(err).NotTo(HaveOccurred()) @@ -85,7 +86,7 @@ var _ = Describe("Reconciler", Ordered, func() { GqlErrors: &gqlerror.List{gqlerror.Errorf("unknown error")}, }) - reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, 0, namespace, "", "") + reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, scheme.Scheme, time.Minute, 0, namespace, "", "") _, err := reconciler.Reconcile(ctx, stackRunId) Expect(err).To(HaveOccurred()) @@ -100,7 +101,7 @@ var _ = Describe("Reconciler", Ordered, func() { Status: console.StackStatusPending, }, nil) - reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, 0, namespace, "", "") + reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, scheme.Scheme, time.Minute, 0, namespace, "", "") _, err := reconciler.Reconcile(ctx, stackRunId) Expect(err).NotTo(HaveOccurred()) @@ -118,13 +119,13 @@ var _ = Describe("Reconciler", Ordered, func() { fakeConsoleClient.On("GetStackRun", mock.Anything).Return(stackRun, nil) fakeConsoleClient.On("UpdateStackRun", mock.Anything, mock.Anything).Return(nil) - reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, 0, namespace, "", "") +
reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, scheme.Scheme, time.Minute, 0, namespace, "", "") _, err := reconciler.Reconcile(ctx, stackRunId) Expect(err).NotTo(HaveOccurred()) job := &batchv1.Job{} - Expect(kClient.Get(ctx, types.NamespacedName{Name: stacks.GetRunJobName(stackRun), Namespace: namespace}, job)).NotTo(HaveOccurred()) + Expect(kClient.Get(ctx, types.NamespacedName{Name: stacks.GetRunResourceName(stackRun), Namespace: namespace}, job)).NotTo(HaveOccurred()) Expect(*job.Spec.BackoffLimit).To(Equal(int32(0))) Expect(job.Spec.Template.Spec.Containers).To(HaveLen(1)) Expect(job.Spec.Template.Spec.Volumes).To(HaveLen(2)) @@ -157,13 +158,13 @@ var _ = Describe("Reconciler", Ordered, func() { fakeConsoleClient.On("GetStackRun", mock.Anything).Return(stackRun, nil) fakeConsoleClient.On("UpdateStackRun", mock.Anything, mock.Anything).Return(nil) - reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, 0, namespace, "", "") + reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, scheme.Scheme, time.Minute, 0, namespace, "", "") _, err := reconciler.Reconcile(ctx, stackRunId) Expect(err).NotTo(HaveOccurred()) job := &batchv1.Job{} - Expect(kClient.Get(ctx, types.NamespacedName{Name: stacks.GetRunJobName(stackRun), Namespace: namespace}, job)).NotTo(HaveOccurred()) + Expect(kClient.Get(ctx, types.NamespacedName{Name: stacks.GetRunResourceName(stackRun), Namespace: namespace}, job)).NotTo(HaveOccurred()) Expect(*job.Spec.BackoffLimit).To(Equal(int32(0))) Expect(job.Spec.Template.Spec.Containers).To(HaveLen(3)) Expect(job.Spec.Template.ObjectMeta.Labels).To(ContainElement(labelsValue)) @@ -213,13 +214,13 @@ var _ = Describe("Reconciler", Ordered, func() { fakeConsoleClient.On("GetStackRun", mock.Anything).Return(stackRun, nil) fakeConsoleClient.On("UpdateStackRun", mock.Anything, mock.Anything).Return(nil) - reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, 0, namespace,
"", "") + reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, scheme.Scheme, time.Minute, 0, namespace, "", "") _, err = reconciler.Reconcile(ctx, stackRunId) Expect(err).NotTo(HaveOccurred()) job := &batchv1.Job{} - Expect(kClient.Get(ctx, types.NamespacedName{Name: stacks.GetRunJobName(stackRun), Namespace: namespace}, job)).NotTo(HaveOccurred()) + Expect(kClient.Get(ctx, types.NamespacedName{Name: stacks.GetRunResourceName(stackRun), Namespace: namespace}, job)).NotTo(HaveOccurred()) Expect(*job.Spec.ActiveDeadlineSeconds).To(Equal(*jobSpec.ActiveDeadlineSeconds)) Expect(*job.Spec.BackoffLimit).To(Equal(int32(0))) // Overridden by controller. Expect(job.Spec.Template.Spec.ServiceAccountName).To(Equal(jobSpec.Template.Spec.ServiceAccountName)) diff --git a/pkg/controller/stacks/secret.go b/pkg/controller/stacks/secret.go index 70199f13..1c817462 100644 --- a/pkg/controller/stacks/secret.go +++ b/pkg/controller/stacks/secret.go @@ -11,39 +11,40 @@ import ( ) const ( - jobRunSecretName = "job-run-env" - envConsoleUrl = "PLRL_CONSOLE_URL" - envConsoleToken = "PLRL_CONSOLE_TOKEN" + envConsoleURL = "PLRL_CONSOLE_URL" + envConsoleToken = "PLRL_CONSOLE_TOKEN" + envStackRunID = "PLRL_STACK_RUN_ID" ) -func (r *StackReconciler) getRunSecretData() map[string]string { +func (r *StackReconciler) getRunSecretData(runID string) map[string]string { return map[string]string{ - envConsoleUrl: r.consoleURL, + envConsoleURL: r.consoleURL, envConsoleToken: r.deployToken, + envStackRunID: runID, } } -func (r *StackReconciler) hasRunSecretData(data map[string][]byte) bool { +func (r *StackReconciler) hasRunSecretData(data map[string][]byte, runID string) bool { token, hasToken := data[envConsoleToken] - url, hasUrl := data[envConsoleUrl] - return hasToken && hasUrl && string(token) == r.deployToken && string(url) == r.consoleURL + url, hasURL := data[envConsoleURL] + id, hasID := data[envStackRunID] + return hasToken && hasURL && hasID && + string(token) ==
r.deployToken && string(url) == r.consoleURL && string(id) == runID } -func (r *StackReconciler) upsertRunSecret(ctx context.Context) (*corev1.Secret, error) { +func (r *StackReconciler) upsertRunSecret(ctx context.Context, name, namespace, runID string) (*corev1.Secret, error) { logger := log.FromContext(ctx) - secret := &corev1.Secret{} - if err := r.k8sClient.Get(ctx, types.NamespacedName{Name: jobRunSecretName, Namespace: r.namespace}, secret); err != nil { + secret := &corev1.Secret{} + if err := r.k8sClient.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, secret); err != nil { if !apierrs.IsNotFound(err) { return nil, err } - logger.V(2).Info("generating secret", "namespace", r.namespace, "name", jobRunSecretName) secret = &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{Name: jobRunSecretName, Namespace: r.namespace}, - StringData: r.getRunSecretData(), + ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}, + StringData: r.getRunSecretData(runID), } - logger.V(2).Info("creating secret", "namespace", secret.Namespace, "name", secret.Name) if err := r.k8sClient.Create(ctx, secret); err != nil { logger.Error(err, "unable to create secret") @@ -53,9 +54,9 @@ func (r *StackReconciler) upsertRunSecret(ctx context.Context) (*corev1.Secret, return secret, nil } - if !r.hasRunSecretData(secret.Data) { + if !r.hasRunSecretData(secret.Data, runID) { logger.V(2).Info("updating secret", "namespace", secret.Namespace, "name", secret.Name) - secret.StringData = r.getRunSecretData() + secret.StringData = r.getRunSecretData(runID) if err := r.k8sClient.Update(ctx, secret); err != nil { logger.Error(err, "unable to update secret") return nil, err @@ -63,5 +64,4 @@ func (r *StackReconciler) upsertRunSecret(ctx context.Context) (*corev1.Secret, } return secret, nil - }