Skip to content

Commit

Permalink
<UPSTREAM>: <carry>: Added OwnerReferences to ScheduledWorkflow
Browse files Browse the repository at this point in the history
Signed-off-by: hbelmiro <[email protected]>
  • Loading branch information
hbelmiro committed Apr 26, 2024
1 parent 95cfc15 commit 7bb3148
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 8 deletions.
23 changes: 22 additions & 1 deletion backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,7 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model

// TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB).
// Convert modelJob into scheduledWorkflow.
scheduledWorkflow, err := tmpl.ScheduledWorkflow(job)
scheduledWorkflow, err := tmpl.ScheduledWorkflow(job, r.getOwnerReferences())
if err != nil {
return nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation")
}
Expand Down Expand Up @@ -1012,6 +1012,27 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model
return r.jobStore.CreateJob(job)
}

func (r *ResourceManager) getOwnerReferences() []v1.OwnerReference {
ownerName := common.GetStringConfigWithDefault("OWNER_NAME", "")
ownerAPIVersion := common.GetStringConfigWithDefault("OWNER_API_VERSION", "")
ownerKind := common.GetStringConfigWithDefault("OWNER_KIND", "")
ownerUID := types.UID(common.GetStringConfigWithDefault("OWNER_UID", ""))

if ownerName == "" || ownerAPIVersion == "" || ownerKind == "" || ownerUID == "" {
glog.Info("Missing ScheduledWorkflow owner fields. Proceeding without OwnerReferences")
return []v1.OwnerReference{}
} else {
return []v1.OwnerReference{
{
APIVersion: ownerAPIVersion,
Kind: ownerKind,
Name: ownerName,
UID: ownerUID,
},
}
}
}

// Enables or disables a recurring run with given id.
func (r *ResourceManager) ChangeJobMode(ctx context.Context, jobId string, enable bool) error {
job, err := r.GetJob(jobId)
Expand Down
7 changes: 5 additions & 2 deletions backend/src/apiserver/template/argo_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ type Argo struct {
wf *util.Workflow
}

func (t *Argo) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.ScheduledWorkflow, error) {
func (t *Argo) ScheduledWorkflow(modelJob *model.Job, ownerReferences []metav1.OwnerReference) (*scheduledworkflow.ScheduledWorkflow, error) {
workflow := util.NewWorkflow(t.wf.Workflow.DeepCopy())
// Overwrite namespace from the job object
if modelJob.Namespace != "" {
Expand Down Expand Up @@ -137,7 +137,10 @@ func (t *Argo) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Schedu
APIVersion: "kubeflow.org/v1beta1",
Kind: "ScheduledWorkflow",
},
ObjectMeta: metav1.ObjectMeta{GenerateName: swfGeneratedName},
ObjectMeta: metav1.ObjectMeta{
GenerateName: swfGeneratedName,
OwnerReferences: ownerReferences,
},
Spec: scheduledworkflow.ScheduledWorkflowSpec{
Enabled: modelJob.Enabled,
MaxConcurrency: &modelJob.MaxConcurrency,
Expand Down
2 changes: 1 addition & 1 deletion backend/src/apiserver/template/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ type Template interface {
// Get workflow
RunWorkflow(modelRun *model.Run, options RunWorkflowOptions) (util.ExecutionSpec, error)

ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.ScheduledWorkflow, error)
ScheduledWorkflow(modelJob *model.Job, ownerReferences []metav1.OwnerReference) (*scheduledworkflow.ScheduledWorkflow, error)
}

type RunWorkflowOptions struct {
Expand Down
7 changes: 5 additions & 2 deletions backend/src/apiserver/template/template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,10 @@ func TestScheduledWorkflow(t *testing.T) {
APIVersion: "kubeflow.org/v2beta1",
Kind: "ScheduledWorkflow",
},
ObjectMeta: metav1.ObjectMeta{GenerateName: "name1"},
ObjectMeta: metav1.ObjectMeta{
GenerateName: "name1",
OwnerReferences: []metav1.OwnerReference{},
},
Spec: scheduledworkflow.ScheduledWorkflowSpec{
Enabled: true,
MaxConcurrency: util.Int64Pointer(1),
Expand All @@ -221,7 +224,7 @@ func TestScheduledWorkflow(t *testing.T) {
},
}

actualScheduledWorkflow, err := v2Template.ScheduledWorkflow(modelJob)
actualScheduledWorkflow, err := v2Template.ScheduledWorkflow(modelJob, []metav1.OwnerReference{})
assert.Nil(t, err)

// We don't compare this field because it changes with every driver/launcher image release.
Expand Down
7 changes: 5 additions & 2 deletions backend/src/apiserver/template/v2_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type V2Spec struct {
}

// Converts modelJob to ScheduledWorkflow.
func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.ScheduledWorkflow, error) {
func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job, ownerReferences []metav1.OwnerReference) (*scheduledworkflow.ScheduledWorkflow, error) {
job := &pipelinespec.PipelineJob{}

bytes, err := protojson.Marshal(t.spec)
Expand Down Expand Up @@ -108,7 +108,10 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche
APIVersion: "kubeflow.org/v2beta1",
Kind: "ScheduledWorkflow",
},
ObjectMeta: metav1.ObjectMeta{GenerateName: swfGeneratedName},
ObjectMeta: metav1.ObjectMeta{
GenerateName: swfGeneratedName,
OwnerReferences: ownerReferences,
},
Spec: scheduledworkflow.ScheduledWorkflowSpec{
Enabled: modelJob.Enabled,
MaxConcurrency: &modelJob.MaxConcurrency,
Expand Down

0 comments on commit 7bb3148

Please sign in to comment.