From 7bb3148e3d14eb565055c42ebcf0c6eef5213186 Mon Sep 17 00:00:00 2001 From: hbelmiro Date: Tue, 9 Apr 2024 17:05:49 -0300 Subject: [PATCH] : : Added OwnerReferences to ScheduledWorkflow Signed-off-by: hbelmiro --- .../apiserver/resource/resource_manager.go | 23 ++++++++++++++++++- .../src/apiserver/template/argo_template.go | 7 ++++-- backend/src/apiserver/template/template.go | 2 +- .../src/apiserver/template/template_test.go | 7 ++++-- backend/src/apiserver/template/v2_template.go | 7 ++++-- 5 files changed, 38 insertions(+), 8 deletions(-) diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index 94442f2bd98..422e053b0ff 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -967,7 +967,7 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model // TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB). // Convert modelJob into scheduledWorkflow. - scheduledWorkflow, err := tmpl.ScheduledWorkflow(job) + scheduledWorkflow, err := tmpl.ScheduledWorkflow(job, r.getOwnerReferences()) if err != nil { return nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation") } @@ -1012,6 +1012,27 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model return r.jobStore.CreateJob(job) } +func (r *ResourceManager) getOwnerReferences() []v1.OwnerReference { + ownerName := common.GetStringConfigWithDefault("OWNER_NAME", "") + ownerAPIVersion := common.GetStringConfigWithDefault("OWNER_API_VERSION", "") + ownerKind := common.GetStringConfigWithDefault("OWNER_KIND", "") + ownerUID := types.UID(common.GetStringConfigWithDefault("OWNER_UID", "")) + + if ownerName == "" || ownerAPIVersion == "" || ownerKind == "" || ownerUID == "" { + glog.Info("Missing ScheduledWorkflow owner fields. Proceeding without OwnerReferences") + return []v1.OwnerReference{} + } else { + return []v1.OwnerReference{ + { + APIVersion: ownerAPIVersion, + Kind: ownerKind, + Name: ownerName, + UID: ownerUID, + }, + } + } +} + // Enables or disables a recurring run with given id. func (r *ResourceManager) ChangeJobMode(ctx context.Context, jobId string, enable bool) error { job, err := r.GetJob(jobId) diff --git a/backend/src/apiserver/template/argo_template.go b/backend/src/apiserver/template/argo_template.go index 638642b4c2f..29c281dcaa1 100644 --- a/backend/src/apiserver/template/argo_template.go +++ b/backend/src/apiserver/template/argo_template.go @@ -94,7 +94,7 @@ type Argo struct { wf *util.Workflow } -func (t *Argo) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.ScheduledWorkflow, error) { +func (t *Argo) ScheduledWorkflow(modelJob *model.Job, ownerReferences []metav1.OwnerReference) (*scheduledworkflow.ScheduledWorkflow, error) { workflow := util.NewWorkflow(t.wf.Workflow.DeepCopy()) // Overwrite namespace from the job object if modelJob.Namespace != "" { @@ -137,7 +137,10 @@ func (t *Argo) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Schedu APIVersion: "kubeflow.org/v1beta1", Kind: "ScheduledWorkflow", }, - ObjectMeta: metav1.ObjectMeta{GenerateName: swfGeneratedName}, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: swfGeneratedName, + OwnerReferences: ownerReferences, + }, Spec: scheduledworkflow.ScheduledWorkflowSpec{ Enabled: modelJob.Enabled, MaxConcurrency: &modelJob.MaxConcurrency, diff --git a/backend/src/apiserver/template/template.go b/backend/src/apiserver/template/template.go index 611b05b3f3f..4753dadc9fd 100644 --- a/backend/src/apiserver/template/template.go +++ b/backend/src/apiserver/template/template.go @@ -128,7 +128,7 @@ type Template interface { // Get workflow RunWorkflow(modelRun *model.Run, options RunWorkflowOptions) (util.ExecutionSpec, error) - ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.ScheduledWorkflow, error) + ScheduledWorkflow(modelJob *model.Job, ownerReferences []metav1.OwnerReference) (*scheduledworkflow.ScheduledWorkflow, error) } type RunWorkflowOptions struct { diff --git a/backend/src/apiserver/template/template_test.go b/backend/src/apiserver/template/template_test.go index 082bf7bb25c..3d0a7a1a78a 100644 --- a/backend/src/apiserver/template/template_test.go +++ b/backend/src/apiserver/template/template_test.go @@ -202,7 +202,10 @@ func TestScheduledWorkflow(t *testing.T) { APIVersion: "kubeflow.org/v2beta1", Kind: "ScheduledWorkflow", }, - ObjectMeta: metav1.ObjectMeta{GenerateName: "name1"}, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "name1", + OwnerReferences: []metav1.OwnerReference{}, + }, Spec: scheduledworkflow.ScheduledWorkflowSpec{ Enabled: true, MaxConcurrency: util.Int64Pointer(1), @@ -221,7 +224,7 @@ func TestScheduledWorkflow(t *testing.T) { }, } - actualScheduledWorkflow, err := v2Template.ScheduledWorkflow(modelJob) + actualScheduledWorkflow, err := v2Template.ScheduledWorkflow(modelJob, []metav1.OwnerReference{}) assert.Nil(t, err) // We don't compare this field because it changes with every driver/launcher image release. diff --git a/backend/src/apiserver/template/v2_template.go b/backend/src/apiserver/template/v2_template.go index ac627dd935d..35c1498ae00 100644 --- a/backend/src/apiserver/template/v2_template.go +++ b/backend/src/apiserver/template/v2_template.go @@ -41,7 +41,7 @@ type V2Spec struct { } // Converts modelJob to ScheduledWorkflow. -func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.ScheduledWorkflow, error) { +func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job, ownerReferences []metav1.OwnerReference) (*scheduledworkflow.ScheduledWorkflow, error) { job := &pipelinespec.PipelineJob{} bytes, err := protojson.Marshal(t.spec) @@ -108,7 +108,10 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche APIVersion: "kubeflow.org/v2beta1", Kind: "ScheduledWorkflow", }, - ObjectMeta: metav1.ObjectMeta{GenerateName: swfGeneratedName}, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: swfGeneratedName, + OwnerReferences: ownerReferences, + }, Spec: scheduledworkflow.ScheduledWorkflowSpec{ Enabled: modelJob.Enabled, MaxConcurrency: &modelJob.MaxConcurrency,