From c2b3402e9669ee221ecd3960305d9e95bc779f7e Mon Sep 17 00:00:00 2001
From: Martin Necas
Date: Thu, 17 Oct 2024 14:36:33 +0200
Subject: [PATCH] Add offload plugin

Issue:
When migrating VMs, the VM disks need to be transferred over the network.
This is inefficient, slow, and consumes additional storage. We need an
alternative way to transfer the disks.

Design:
This PR is a PoC of an offload plugin for the disk transfer. The offload
plugin is a specific tool that needs to be implemented by the CSI provider.
To use a storage offload plugin, the user specifies it in the StorageMap
destination. This allows users to migrate a VM even when its disks span
multiple datastore types: some disks can be handled by the offloadPlugin
while others still go over the network if needed.

Example of a storage map with an offload plugin:

```
spec:
  map:
    - destination:
        offloadPlugin:
          image: 'quay.io/mnecas0/offload:latest'
          vars:
            test1: test1
            test2: test2
        storageClass: CSI
      source:
        id: datastore-30
...
```

The offload plugin is started right after CreateDataVolumes; by that point
the Kubernetes CSI driver has already created the empty PVs into which the
disks can be transferred. The OffloadPlugin step creates a Job on the
destination cluster that runs the provided offload plugin image. The job is
started with the following parameters:
- `HOST` = URL of the ESXi host
- `PLAN_NAME` = plan name
- `NAMESPACE` = namespace where the migration is running

In addition to these variables, the job mounts the vCenter secret. The
secret is mounted at `/etc/secret` and contains the files:
- `accessKeyId` with the username
- `secretKey` with the password

Note:
This change additionally requires https://github.com/kubev2v/forklift/pull/1109,
because right now the cold migration transfer is managed by virt-v2v.
PR #1109 removes this dependency and moves the transfer out to the CNV CDI.
This allows us to split the transfer and conversion steps, which were a
single step from the Forklift perspective. Once the disks are transferred,
Forklift runs `virt-v2v-in-place` on the disks and starts the VM. The
offload plugin path works the same way: the disks are transferred by the
offload plugin and then `virt-v2v-in-place` is run on them.
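For illustration only (not part of this patch), here is a minimal sketch of an
offload plugin entrypoint that consumes the interface described above: the
`HOST`, `PLAN_NAME`, and `NAMESPACE` environment variables and the secret
files under `/etc/secret`. The `transferDisks` helper is a hypothetical
placeholder for the CSI provider's copy logic.

```
package main

import (
	"fmt"
	"os"
)

// transferDisks is a hypothetical stand-in for the provider-specific logic
// that copies the source disks directly into the pre-created empty PVs.
func transferDisks(host, plan, namespace, user, password string) error {
	fmt.Printf("offloading disks for plan %s/%s from host %s as user %s\n", namespace, plan, host, user)
	return nil
}

func main() {
	host := os.Getenv("HOST")           // URL of the ESXi host
	planName := os.Getenv("PLAN_NAME")  // migration plan name
	namespace := os.Getenv("NAMESPACE") // namespace where the migration runs

	// Credentials mounted from the vCenter secret.
	user, err := os.ReadFile("/etc/secret/accessKeyId")
	if err != nil {
		fmt.Fprintln(os.Stderr, "reading accessKeyId:", err)
		os.Exit(1)
	}
	password, err := os.ReadFile("/etc/secret/secretKey")
	if err != nil {
		fmt.Fprintln(os.Stderr, "reading secretKey:", err)
		os.Exit(1)
	}

	if err := transferDisks(host, planName, namespace, string(user), string(password)); err != nil {
		fmt.Fprintln(os.Stderr, "disk transfer failed:", err)
		os.Exit(1)
	}
}
```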
TODO:
[ ] Add design doc showing larger details, this is just a PoC
[ ] Add check if the OffloadPlugin image exists
[ ] Add check of the offload plugin disk transfer status
[ ] Allow combining storage map entries with and without an OffloadPlugin
[ ] Improve the name of the offload plugin job; right now it's the VM ID

Signed-off-by: Martin Necas
---
 .../forklift.konveyor.io_storagemaps.yaml        |  15 ++
 pkg/apis/forklift/v1beta1/mapping.go             |   8 +
 pkg/apis/forklift/v1beta1/plan.go                |   9 +
 pkg/controller/plan/BUILD.bazel                  |   1 +
 pkg/controller/plan/adapter/base/doc.go          |   2 +
 pkg/controller/plan/adapter/ocp/builder.go       |   3 +
 .../plan/adapter/openstack/builder.go            |   3 +
 pkg/controller/plan/adapter/ova/builder.go       |   3 +
 pkg/controller/plan/adapter/ovirt/builder.go     |   3 +
 .../plan/adapter/vsphere/builder.go              |  29 ++-
 pkg/controller/plan/migration.go                 |  53 ++++-
 pkg/controller/plan/offloadplugin.go             | 195 ++++++++++++++++++
 pkg/controller/plan/validation.go                |   2 +
 13 files changed, 313 insertions(+), 13 deletions(-)
 create mode 100644 pkg/controller/plan/offloadplugin.go

diff --git a/operator/config/crd/bases/forklift.konveyor.io_storagemaps.yaml b/operator/config/crd/bases/forklift.konveyor.io_storagemaps.yaml
index c7c2d8362..6572f32ab 100644
--- a/operator/config/crd/bases/forklift.konveyor.io_storagemaps.yaml
+++ b/operator/config/crd/bases/forklift.konveyor.io_storagemaps.yaml
@@ -60,6 +60,21 @@ spec:
                           - ReadWriteMany
                           - ReadOnlyMany
                           type: string
+                        offloadPlugin:
+                          description: Offload plugin.
+                          properties:
+                            image:
+                              description: Offload plugin.
+                              type: string
+                            vars:
+                              additionalProperties:
+                                type: string
+                              description: Offload plugin variables passed to the
+                                job
+                              type: object
+                          required:
+                          - image
+                          type: object
                         storageClass:
                           description: A storage class.
                           type: string
diff --git a/pkg/apis/forklift/v1beta1/mapping.go b/pkg/apis/forklift/v1beta1/mapping.go
index bbd6b60dc..04a5f1ffe 100644
--- a/pkg/apis/forklift/v1beta1/mapping.go
+++ b/pkg/apis/forklift/v1beta1/mapping.go
@@ -57,6 +57,8 @@ type StoragePair struct {
 type DestinationStorage struct {
 	// A storage class.
 	StorageClass string `json:"storageClass"`
+	// Offload plugin.
+	OffloadPlugin *OffloadPlugin `json:"offloadPlugin,omitempty"`
 	// Volume mode.
 	// +kubebuilder:validation:Enum=Filesystem;Block
 	VolumeMode core.PersistentVolumeMode `json:"volumeMode,omitempty"`
@@ -64,6 +66,12 @@ type DestinationStorage struct {
 	// +kubebuilder:validation:Enum=ReadWriteOnce;ReadWriteMany;ReadOnlyMany
 	AccessMode core.PersistentVolumeAccessMode `json:"accessMode,omitempty"`
 }
+type OffloadPlugin struct {
+	// Offload plugin.
+	Image string `json:"image"`
+	// Offload plugin variables passed to the job
+	Vars map[string]string `json:"vars,omitempty"`
+}
 
 // Network map spec.
 type NetworkMapSpec struct {
diff --git a/pkg/apis/forklift/v1beta1/plan.go b/pkg/apis/forklift/v1beta1/plan.go
index ba6714bbf..cb3c731d8 100644
--- a/pkg/apis/forklift/v1beta1/plan.go
+++ b/pkg/apis/forklift/v1beta1/plan.go
@@ -118,3 +118,12 @@ func (r *Plan) IsSourceProviderOCP() bool {
 func (r *Plan) IsSourceProviderOVA() bool {
 	return r.Provider.Source.Type() == Ova
 }
+
+func (r *Plan) HasOffloadPlugin() bool {
+	for _, storageMap := range r.Map.Storage.Spec.Map {
+		if storageMap.Destination.OffloadPlugin != nil {
+			return true
+		}
+	}
+	return false
+}
diff --git a/pkg/controller/plan/BUILD.bazel b/pkg/controller/plan/BUILD.bazel
index ce4fc4458..5f3f67e56 100644
--- a/pkg/controller/plan/BUILD.bazel
+++ b/pkg/controller/plan/BUILD.bazel
@@ -8,6 +8,7 @@ go_library(
         "hook.go",
         "kubevirt.go",
         "migration.go",
+        "offloadplugin.go",
         "predicate.go",
         "util.go",
         "validation.go",
diff --git a/pkg/controller/plan/adapter/base/doc.go b/pkg/controller/plan/adapter/base/doc.go
index f7e93fe4b..52cb61805 100644
--- a/pkg/controller/plan/adapter/base/doc.go
+++ b/pkg/controller/plan/adapter/base/doc.go
@@ -80,6 +80,8 @@ type Builder interface {
 	LunPersistentVolumeClaims(vmRef ref.Ref) (pvcs []core.PersistentVolumeClaim, err error)
 	// check whether the builder supports Volume Populators
 	SupportsVolumePopulators() bool
+	// check whether the builder supports offload plugin
+	SupportsOffloadPlugin() bool
 	// Build populator volumes
 	PopulatorVolumes(vmRef ref.Ref, annotations map[string]string, secretName string) ([]*core.PersistentVolumeClaim, error)
 	// Transferred bytes
diff --git a/pkg/controller/plan/adapter/ocp/builder.go b/pkg/controller/plan/adapter/ocp/builder.go
index dc142fdaf..8555b1896 100644
--- a/pkg/controller/plan/adapter/ocp/builder.go
+++ b/pkg/controller/plan/adapter/ocp/builder.go
@@ -609,6 +609,9 @@ func pvcSourceName(namespace, name string) string {
 func (r *Builder) SupportsVolumePopulators() bool {
 	return false
 }
+func (r *Builder) SupportsOffloadPlugin() bool {
+	return false
+}
 
 func (r *Builder) PopulatorVolumes(vmRef ref.Ref, annotations map[string]string, secretName string) (pvcs []*core.PersistentVolumeClaim, err error) {
 	err = planbase.VolumePopulatorNotSupportedError
diff --git a/pkg/controller/plan/adapter/openstack/builder.go b/pkg/controller/plan/adapter/openstack/builder.go
index fc703c332..e90327710 100644
--- a/pkg/controller/plan/adapter/openstack/builder.go
+++ b/pkg/controller/plan/adapter/openstack/builder.go
@@ -936,6 +936,9 @@ func (r *Builder) LunPersistentVolumeClaims(vmRef ref.Ref) (pvcs []core.Persiste
 func (r *Builder) SupportsVolumePopulators() bool {
 	return true
 }
+func (r *Builder) SupportsOffloadPlugin() bool {
+	return false
+}
 
 func (r *Builder) PopulatorVolumes(vmRef ref.Ref, annotations map[string]string, secretName string) (pvcs []*core.PersistentVolumeClaim, err error) {
 	workload := &model.Workload{}
diff --git a/pkg/controller/plan/adapter/ova/builder.go b/pkg/controller/plan/adapter/ova/builder.go
index db442ffd2..b1eda3c52 100644
--- a/pkg/controller/plan/adapter/ova/builder.go
+++ b/pkg/controller/plan/adapter/ova/builder.go
@@ -543,6 +543,9 @@ func (r *Builder) LunPersistentVolumeClaims(vmRef ref.Ref) (pvcs []core.Persiste
 func (r *Builder) SupportsVolumePopulators() bool {
 	return false
 }
+func (r *Builder) SupportsOffloadPlugin() bool {
+	return false
+}
 
 func (r *Builder) PopulatorVolumes(vmRef ref.Ref, annotations map[string]string, secretName string) (pvcs []*core.PersistentVolumeClaim, err error) {
 	err = planbase.VolumePopulatorNotSupportedError
diff --git a/pkg/controller/plan/adapter/ovirt/builder.go b/pkg/controller/plan/adapter/ovirt/builder.go
index 60390998d..8e2214ce1 100644
--- a/pkg/controller/plan/adapter/ovirt/builder.go
+++ b/pkg/controller/plan/adapter/ovirt/builder.go
@@ -715,6 +715,9 @@ func (r *Builder) LunPersistentVolumeClaims(vmRef ref.Ref) (pvcs []core.Persiste
 func (r *Builder) SupportsVolumePopulators() bool {
 	return !r.Context.Plan.Spec.Warm && r.Context.Plan.Provider.Destination.IsHost()
 }
+func (r *Builder) SupportsOffloadPlugin() bool {
+	return false
+}
 
 func (r *Builder) PopulatorVolumes(vmRef ref.Ref, annotations map[string]string, secretName string) (pvcs []*core.PersistentVolumeClaim, err error) {
 	workload := &model.Workload{}
diff --git a/pkg/controller/plan/adapter/vsphere/builder.go b/pkg/controller/plan/adapter/vsphere/builder.go
index 968fa7ff7..ba273decb 100644
--- a/pkg/controller/plan/adapter/vsphere/builder.go
+++ b/pkg/controller/plan/adapter/vsphere/builder.go
@@ -435,15 +435,21 @@ func (r *Builder) DataVolumes(vmRef ref.Ref, secret *core.Secret, _ *core.Config
 		storageClass := mapped.Destination.StorageClass
 		var dvSource cdi.DataVolumeSource
 		// Let CDI do the copying
-		dvSource = cdi.DataVolumeSource{
-			VDDK: &cdi.DataVolumeSourceVDDK{
-				BackingFile:  r.baseVolume(disk.File),
-				UUID:         vm.UUID,
-				URL:          url,
-				SecretRef:    secret.Name,
-				Thumbprint:   thumbprint,
-				InitImageURL: r.Source.Provider.Spec.Settings[api.VDDK],
-			},
+		if mapped.Destination.OffloadPlugin != nil {
+			dvSource = cdi.DataVolumeSource{
+				Blank: &cdi.DataVolumeBlankImage{},
+			}
+		} else {
+			dvSource = cdi.DataVolumeSource{
+				VDDK: &cdi.DataVolumeSourceVDDK{
+					BackingFile:  r.baseVolume(disk.File),
+					UUID:         vm.UUID,
+					URL:          url,
+					SecretRef:    secret.Name,
+					Thumbprint:   thumbprint,
+					InitImageURL: r.Source.Provider.Spec.Settings[api.VDDK],
+				},
+			}
 		}
 		dvSpec := cdi.DataVolumeSpec{
 			Source: &dvSource,
@@ -747,6 +753,7 @@ func (r *Builder) Tasks(vmRef ref.Ref) (list []*plan.Task, err error) {
 		err = liberr.Wrap(err, "vm", vmRef.String())
 		return
 	}
+	// TODO: Filter offload plugin by the Datastore?
 	for _, disk := range vm.Disks {
 		mB := disk.Capacity / 0x100000
 		list = append(
@@ -954,6 +961,10 @@ func (r *Builder) SupportsVolumePopulators() bool {
 	return false
 }
 
+func (r *Builder) SupportsOffloadPlugin() bool {
+	return true
+}
+
 func (r *Builder) PopulatorVolumes(vmRef ref.Ref, annotations map[string]string, secretName string) (pvcs []*core.PersistentVolumeClaim, err error) {
 	err = planbase.VolumePopulatorNotSupportedError
 	return
diff --git a/pkg/controller/plan/migration.go b/pkg/controller/plan/migration.go
index b8262987e..673766a9c 100644
--- a/pkg/controller/plan/migration.go
+++ b/pkg/controller/plan/migration.go
@@ -49,6 +49,7 @@ var (
 	CDIDiskCopy             libitr.Flag = 0x08
 	OvaImageMigration       libitr.Flag = 0x10
 	OpenstackImageMigration libitr.Flag = 0x20
+	HasOffloadPlugin        libitr.Flag = 0x40
 )
 
 // Phases.
@@ -59,6 +60,7 @@ const (
 	PowerOffSource = "PowerOffSource"
 	WaitForPowerOff = "WaitForPowerOff"
 	CreateDataVolumes = "CreateDataVolumes"
+	OffloadPlugin = "OffloadPlugin"
 	CreateVM = "CreateVM"
 	CopyDisks = "CopyDisks"
 	CopyingPaused = "CopyingPaused"
@@ -105,6 +107,7 @@ var (
 		{Name: PowerOffSource},
 		{Name: WaitForPowerOff},
 		{Name: CreateDataVolumes},
+		{Name: OffloadPlugin, All: HasOffloadPlugin},
 		{Name: CopyDisks, All: CDIDiskCopy},
 		{Name: CreateGuestConversionPod, All: RequiresConversion},
 		{Name: ConvertGuest, All: RequiresConversion},
@@ -640,7 +643,7 @@ func (r *Migration) step(vm *plan.VMStatus) (step string) {
 	switch vm.Phase {
 	case Started, CreateInitialSnapshot, WaitForInitialSnapshot, CreateDataVolumes:
 		step = Initialize
-	case CopyDisks, CopyingPaused, CreateSnapshot, WaitForSnapshot, AddCheckpoint, ConvertOpenstackSnapshot:
+	case CopyDisks, CopyingPaused, CreateSnapshot, WaitForSnapshot, AddCheckpoint, ConvertOpenstackSnapshot, OffloadPlugin:
 		step = DiskTransfer
 	case CreateFinalSnapshot, WaitForFinalSnapshot, AddFinalCheckpoint, Finalize:
 		step = Cutover
@@ -740,6 +743,44 @@ func (r *Migration) execute(vm *plan.VMStatus) (err error) {
 		} else {
 			vm.Phase = Completed
 		}
+	case OffloadPlugin:
+		step, found := vm.FindStep(r.step(vm))
+		if !found {
+			vm.AddError(fmt.Sprintf("Step '%s' not found", r.step(vm)))
+			break
+		}
+		step.MarkStarted()
+		step.Phase = Running
+		runner := OffloadPluginRunner{
+			Context:  r.Context,
+			kubevirt: r.kubevirt,
+		}
+		var job *batchv1.Job
+		job, err = runner.Run(vm)
+		if err != nil {
+			r.Log.Error(err, "Starting the offload plugin", "vm", vm.Name)
+			step.AddError(err.Error())
+			err = nil
+			return
+		}
+		conditions := libcnd.Conditions{}
+		for _, cnd := range job.Status.Conditions {
+			conditions.SetCondition(libcnd.Condition{
+				Type:    string(cnd.Type),
+				Status:  string(cnd.Status),
+				Reason:  cnd.Reason,
+				Message: cnd.Message,
+			})
+		}
+		if conditions.HasCondition("Failed") {
+			step.AddError(conditions.FindCondition("Failed").Message)
+			step.MarkCompleted()
+		} else if int(job.Status.Failed) > retry {
+			step.AddError("Retry limit exceeded.")
+			step.MarkCompleted()
+		} else if job.Status.Succeeded > 0 {
+			vm.Phase = r.next(vm.Phase)
+		}
 	case CreateDataVolumes:
 		step, found := vm.FindStep(r.step(vm))
 		if !found {
@@ -885,7 +926,9 @@ func (r *Migration) execute(vm *plan.VMStatus) (err error) {
 		}
 		step.MarkStarted()
 		step.Phase = Running
-
+		if r.builder.SupportsOffloadPlugin() {
+			// TODO: Add progress of the offload plugin transfer
+		}
 		if r.builder.SupportsVolumePopulators() {
 			err = r.updatePopulatorCopyProgress(vm, step)
 		} else {
@@ -1848,12 +1891,14 @@ func (r *Predicate) Evaluate(flag libitr.Flag) (allowed bool, err error) {
 		_, allowed = r.vm.FindHook(PostHook)
 	case RequiresConversion:
 		allowed = r.context.Source.Provider.RequiresConversion()
-	case OvaImageMigration:
-		allowed = r.context.Plan.IsSourceProviderOVA()
 	case CDIDiskCopy:
 		allowed = !r.context.Plan.IsSourceProviderOVA()
+	case OvaImageMigration:
+		allowed = r.context.Plan.IsSourceProviderOVA()
 	case OpenstackImageMigration:
 		allowed = r.context.Plan.IsSourceProviderOpenstack()
+	case HasOffloadPlugin:
+		allowed = r.context.Plan.HasOffloadPlugin()
 	}
 
 	return
diff --git a/pkg/controller/plan/offloadplugin.go b/pkg/controller/plan/offloadplugin.go
new file mode 100644
index 000000000..8e3255db2
--- /dev/null
+++ b/pkg/controller/plan/offloadplugin.go
@@ -0,0 +1,195 @@
+package plan
+
+import (
+	"context"
+	api "github.com/konveyor/forklift-controller/pkg/apis/forklift/v1beta1"
+	planapi "github.com/konveyor/forklift-controller/pkg/apis/forklift/v1beta1/plan"
"github.com/konveyor/forklift-controller/pkg/apis/forklift/v1beta1/plan" + plancontext "github.com/konveyor/forklift-controller/pkg/controller/plan/context" + liberr "github.com/konveyor/forklift-controller/pkg/lib/error" + batch "k8s.io/api/batch/v1" + core "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/kubernetes/scheme" + "path" + "sigs.k8s.io/controller-runtime/pkg/client" + k8sutil "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "strings" +) + +const retry int = 5 + +// OffloadPluginRunner +type OffloadPluginRunner struct { + *plancontext.Context + // VM. + vm *planapi.VMStatus + kubevirt KubeVirt +} + +// Run. +func (r *OffloadPluginRunner) Run(vm *planapi.VMStatus) (*batch.Job, error) { + r.vm = vm + return r.ensureJob() +} + +// Ensure the job. +func (r *OffloadPluginRunner) ensureJob() (job *batch.Job, err error) { + list := batch.JobList{} + err = r.Destination.Client.List( + context.TODO(), + &list, + &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(r.labels()), + Namespace: r.Plan.Namespace, + }) + if err != nil { + err = liberr.Wrap(err) + return + } + if len(list.Items) == 0 { + job, err = r.job() + if err != nil { + return + } + err = r.Destination.Client.Create(context.TODO(), job) + if err != nil { + err = liberr.Wrap(err) + return + } + r.Log.Info( + "Created (offload plugin) job.", + "job", + path.Join( + job.Namespace, + job.Name)) + } else { + job = &list.Items[0] + r.Log.V(1).Info( + "Found (offload plugin) job.", + "job", + path.Join( + job.Namespace, + job.Name)) + } + + return +} + +// Build the Job. +func (r *OffloadPluginRunner) job() (job *batch.Job, err error) { + secret, err := r.kubevirt.ensureSecret(r.vm.Ref, r.kubevirt.secretDataSetterForCDI(r.vm.Ref), r.labels()) + template := r.template(secret) + backOff := int32(1) + job = &batch.Job{ + Spec: batch.JobSpec{ + Template: *template, + BackoffLimit: &backOff, + }, + ObjectMeta: meta.ObjectMeta{ + Namespace: r.Plan.Namespace, + GenerateName: strings.ToLower( + strings.Join([]string{ + r.vm.ID, + "offloadplugin"}, + "-") + "-"), + Labels: r.labels(), + }, + } + err = k8sutil.SetOwnerReference(r.Plan, job, scheme.Scheme) + if err != nil { + err = liberr.Wrap(err) + return + } + + return +} + +// FIXME: This is just a tmp before we settle on the design in the end we could have multiple maps with multiple images +// we might even have multiple jobs with multiple offload plugins... depends on the mapping and design +func (r *OffloadPluginRunner) getOffloadPluginFromStorageMap() *api.OffloadPlugin { + for _, storageMap := range r.Context.Plan.Map.Storage.Spec.Map { + if storageMap.Destination.OffloadPlugin != nil { + return storageMap.Destination.OffloadPlugin + } + } + return nil +} + +// Build pod template. +func (r *OffloadPluginRunner) template(secret *core.Secret) (template *core.PodTemplateSpec) { + offloadPlugin := r.getOffloadPluginFromStorageMap() + volumes, mounts := r.getVolumesAndMounts(secret) + template = &core.PodTemplateSpec{ + Spec: core.PodSpec{ + RestartPolicy: core.RestartPolicyNever, + Containers: []core.Container{ + { + Name: "offloadplugin", + Image: offloadPlugin.Image, + Env: r.getEnvironments(offloadPlugin), + VolumeMounts: mounts, + }, + }, + Volumes: volumes, + }, + } + + return +} + +// Labels for created resources. 
+func (r *OffloadPluginRunner) labels() map[string]string {
+	return map[string]string{
+		kPlan:      string(r.Plan.UID),
+		kMigration: string(r.Migration.UID),
+		kVM:        r.vm.ID,
+		kStep:      r.vm.Phase,
+	}
+}
+
+func (r *OffloadPluginRunner) getEnvironments(offloadPlugin *api.OffloadPlugin) (environments []core.EnvVar) {
+	environments = append(environments,
+		core.EnvVar{
+			Name:  "HOST",
+			Value: r.Context.Source.Provider.Spec.URL,
+		},
+		core.EnvVar{
+			Name:  "PLAN_NAME",
+			Value: r.Context.Plan.Name,
+		},
+		core.EnvVar{
+			Name:  "NAMESPACE",
+			Value: r.Context.Plan.Namespace,
+		},
+	)
+	for key, val := range offloadPlugin.Vars {
+		environments = append(environments,
+			core.EnvVar{
+				Name:  key,
+				Value: val,
+			})
+	}
+	return environments
+}
+
+func (r *OffloadPluginRunner) getVolumesAndMounts(secret *core.Secret) (volumes []core.Volume, mounts []core.VolumeMount) {
+	var secretVolumeName = "secret-volume"
+	volumes = append(volumes,
+		core.Volume{
+			Name: secretVolumeName,
+			VolumeSource: core.VolumeSource{
+				Secret: &core.SecretVolumeSource{
+					SecretName: secret.Name,
+				},
+			},
+		},
+	)
+	mounts = append(mounts,
+		core.VolumeMount{
+			Name:      secretVolumeName,
+			MountPath: "/etc/secret",
+			ReadOnly:  true,
+		})
+	return volumes, mounts
+}
diff --git a/pkg/controller/plan/validation.go b/pkg/controller/plan/validation.go
index 9800304c2..98f21370f 100644
--- a/pkg/controller/plan/validation.go
+++ b/pkg/controller/plan/validation.go
@@ -893,6 +893,7 @@ func (r *Reconciler) cancelOtherActiveVddkCheckJobs(plan *api.Plan) (err error)
 
 	queryLabels := make(map[string]string, 1)
 	queryLabels["plan"] = jobLabels["plan"]
+	queryLabels["vddk-validation"] = "vddk-validation"
 	delete(queryLabels, "vddk")
 
 	jobs := &batchv1.JobList{}
@@ -976,6 +977,7 @@ func getVddkImageValidationJobLabels(plan *api.Plan) map[string]string {
 	sum := md5.Sum([]byte(image))
 	return map[string]string{
 		"plan": string(plan.ObjectMeta.UID),
+		"step": "vddk-validation",
 		"vddk": hex.EncodeToString(sum[:]),
 	}
 }