Skip to content

Commit

Permalink
[KS-590] Auto-approval for workflow spec deletion (#15414)
Browse files Browse the repository at this point in the history
  • Loading branch information
bolekk committed Dec 2, 2024
1 parent 407eda0 commit 2f3ff0e
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 1 deletion.
21 changes: 21 additions & 0 deletions core/services/feeds/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,27 @@ func (s *service) DeleteJob(ctx context.Context, args *DeleteJobArgs) (int64, er
logger.Errorw("Failed to push metrics for job proposal deletion", "err", err)
}

// auto-cancellation for Workflow specs
if !proposal.ExternalJobID.Valid {
logger.Infow("ExternalJobID is null", "id", proposal.ID, "name", proposal.Name)
return proposal.ID, nil
}
job, err := s.jobORM.FindJobByExternalJobID(ctx, proposal.ExternalJobID.UUID)
if err != nil {
// NOTE: at this stage, we don't know if this job is of Workflow type
// so we don't want to return an error
logger.Infow("FindJobByExternalJobID failed", "id", proposal.ID, "externalJobID", proposal.ExternalJobID.UUID, "name", proposal.Name)
return proposal.ID, nil
}
if job.WorkflowSpecID != nil { // this is a Workflow job
specID := int64(*job.WorkflowSpecID)
if err := s.CancelSpec(ctx, proposal.ID); err != nil {
logger.Errorw("Failed to auto-cancel workflow spec", "id", specID, "err", err, "name", job.Name)
return 0, fmt.Errorf("failed to auto-cancel workflow spec %d: %w", specID, err)
}
logger.Infow("Successfully auto-cancelled a workflow spec", "id", specID)
}

return proposal.ID, nil
}

Expand Down
47 changes: 46 additions & 1 deletion core/services/feeds/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1269,12 +1269,25 @@ func Test_Service_DeleteJob(t *testing.T) {
}

approved = feeds.JobProposal{
ID: 1,
ID: 321,
FeedsManagerID: 1,
RemoteUUID: remoteUUID,
ExternalJobID: uuid.NullUUID{UUID: uuid.New(), Valid: true},
Status: feeds.JobProposalStatusApproved,
}

wfSpecID = int32(4321)
workflowJob = job.Job{
ID: 1,
WorkflowSpecID: &wfSpecID,
}
spec = &feeds.JobProposalSpec{
ID: 20,
Status: feeds.SpecStatusApproved,
JobProposalID: approved.ID,
Version: 1,
}

httpTimeout = *commonconfig.MustNewDuration(1 * time.Second)
)

Expand All @@ -1291,6 +1304,7 @@ func Test_Service_DeleteJob(t *testing.T) {
svc.orm.On("GetJobProposalByRemoteUUID", mock.Anything, approved.RemoteUUID).Return(&approved, nil)
svc.orm.On("DeleteProposal", mock.Anything, approved.ID).Return(nil)
svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil)
svc.jobORM.On("FindJobByExternalJobID", mock.Anything, approved.ExternalJobID.UUID).Return(job.Job{}, sql.ErrNoRows)
},
args: args,
wantID: approved.ID,
Expand Down Expand Up @@ -1334,6 +1348,37 @@ func Test_Service_DeleteJob(t *testing.T) {
args: args,
wantErr: "DeleteProposal failed",
},
{
name: "Delete workflow-spec with auto-cancellation",
before: func(svc *TestService) {
svc.orm.On("GetJobProposalByRemoteUUID", mock.Anything, approved.RemoteUUID).Return(&approved, nil)
svc.orm.On("DeleteProposal", mock.Anything, approved.ID).Return(nil)
svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil)
svc.jobORM.On("FindJobByExternalJobID", mock.Anything, approved.ExternalJobID.UUID).Return(workflowJob, nil)

// mocks for CancelSpec()
svc.orm.On("GetSpec", mock.Anything, approved.ID).Return(spec, nil)
svc.orm.On("GetJobProposal", mock.Anything, approved.ID).Return(&approved, nil)
svc.connMgr.On("GetClient", mock.Anything).Return(svc.fmsClient, nil)

svc.orm.On("CancelSpec", mock.Anything, approved.ID).Return(nil)
svc.jobORM.On("FindJobByExternalJobID", mock.Anything, approved.ExternalJobID.UUID).Return(workflowJob, nil)
svc.spawner.On("DeleteJob", mock.Anything, mock.Anything, workflowJob.ID).Return(nil)

svc.fmsClient.On("CancelledJob",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
&proto.CancelledJobRequest{
Uuid: approved.RemoteUUID.String(),
Version: int64(spec.Version),
},
).Return(&proto.CancelledJobResponse{}, nil)
svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil)
svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm))
svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM))
},
args: args,
wantID: approved.ID,
},
}

for _, tc := range testCases {
Expand Down

0 comments on commit 2f3ff0e

Please sign in to comment.