Skip to content

Commit

Permalink
Move waiting for tasks to separate phase to unblock process
Browse files Browse the repository at this point in the history
Issue:
When the Forklift creates the snapshot we need to wait for the task to
finish. Right now we are using task.WaitForResult, this causes the whole
process to wait for the snapshot creation and blocks other VM migrations.
Same problem we have with snapshot removal, if the ESXi host is busy and
we start the snapshot removal the snapshots can take longer than the
reconcile cycle (3s) and we can fail due to it.

Fix:
Instead of using the task.WaitForResult the forklift will start querying
for the latest tasks per VM, by default it's 10 tasks. This querying
will be done in a separate phase then the creation/deletion. So we will
have WaitFor phases for each of the object manipulations.
We find the specific task for the creation/deletion and check its
status. This has the advantage that we are not only getting the status of the
task but in addition also the results of the task, so we can propagate them to
the user, in case the creation/deletion fails.

Ref:
- https://issues.redhat.com/browse/MTV-1753
- https://issues.redhat.com/browse/MTV-1775

Signed-off-by: Martin Necas <[email protected]>
  • Loading branch information
mnecas committed Dec 12, 2024
1 parent 7c33864 commit ac6e41a
Show file tree
Hide file tree
Showing 11 changed files with 142 additions and 73 deletions.
1 change: 0 additions & 1 deletion operator/roles/forkliftcontroller/defaults/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ controller_snapshot_removal_timeout_minuts: 120
controller_snapshot_status_check_rate_seconds: 10
controller_cleanup_retries: 10
controller_dv_status_check_retries: 10
controller_snapshot_removal_check_retries: 20
controller_vsphere_incremental_backup: true
controller_ovirt_warm_migration: true
controller_retain_precopy_importer_pods: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,6 @@ spec:
- name: DV_STATUS_CHECK_RETRIES
value: "{{ controller_dv_status_check_retries }}"
{% endif %}
{% if controller_snapshot_removal_check_retries is number %}
- name: SNAPSHOT_REMOVAL_CHECK_RETRIES
value: "{{ controller_snapshot_removal_check_retries }}"
{% endif %}
{% if controller_max_vm_inflight is number %}
- name: MAX_VM_INFLIGHT
value: "{{ controller_max_vm_inflight }}"
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/plan/adapter/base/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ type Client interface {
// Remove a snapshot.
RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) error
// Check if a snapshot is ready to transfer.
CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, err error)
CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, snapshotId string, err error)
// Check if a snapshot is removed.
CheckSnapshotRemoved(vmRef ref.Ref, snapshot string) (ready bool, err error)
// Set DataVolume checkpoints.
SetCheckpoints(vmRef ref.Ref, precopies []planapi.Precopy, datavolumes []cdi.DataVolume, final bool, hostsFunc util.HostsFunc) (err error)
// Close connections to the provider API.
Expand Down
7 changes: 6 additions & 1 deletion pkg/controller/plan/adapter/ocp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ type Client struct {
}

// CheckSnapshotReady implements base.Client
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (bool, error) {
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, snapshotId string, err error) {
return
}

// CheckSnapshotRemoved implements base.Client
func (r *Client) CheckSnapshotRemoved(vmRef ref.Ref, snapshot string) (bool, error) {
return false, nil
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/controller/plan/adapter/openstack/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,15 @@ func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (imageI
}

// Check if a snapshot is ready to transfer.
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, imageID string) (ready bool, err error) {
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, snapshotId string, err error) {
return
}

// CheckSnapshotRemoved implements base.Client
func (r *Client) CheckSnapshotRemoved(vmRef ref.Ref, snapshot string) (bool, error) {
return false, nil
}

// Set DataVolume checkpoints.
func (r *Client) SetCheckpoints(vmRef ref.Ref, precopies []planapi.Precopy, datavolumes []cdi.DataVolume, final bool, hostsFunc util.HostsFunc) error {
return nil
Expand Down
7 changes: 6 additions & 1 deletion pkg/controller/plan/adapter/ova/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,15 @@ func (r *Client) GetSnapshotDeltas(vmRef ref.Ref, snapshot string, hostsFunc uti
}

// Check if a snapshot is ready to transfer, to avoid importer restarts.
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, err error) {
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, snapshotId string, err error) {
return
}

// CheckSnapshotRemoved implements base.Client
func (r *Client) CheckSnapshotRemoved(vmRef ref.Ref, snapshot string) (bool, error) {
return false, nil
}

// Set DataVolume checkpoints.
func (r *Client) SetCheckpoints(vmRef ref.Ref, precopies []planapi.Precopy, datavolumes []cdi.DataVolume, final bool, hostsFunc util.HostsFunc) (err error) {
return
Expand Down
7 changes: 6 additions & 1 deletion pkg/controller/plan/adapter/ovirt/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (snapsh
}

// Check if a snapshot is ready to transfer, to avoid importer restarts.
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, err error) {
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, snapshotId string, err error) {
correlationID, err := r.getSnapshotCorrelationID(vmRef, &snapshot)
if err != nil {
err = liberr.Wrap(err)
Expand Down Expand Up @@ -104,6 +104,11 @@ func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool,
return
}

// CheckSnapshotRemoved implements base.Client
func (r *Client) CheckSnapshotRemoved(vmRef ref.Ref, snapshot string) (bool, error) {
return false, nil
}

// Remove a VM snapshot. No-op for this provider.
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (err error) {
return
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/plan/adapter/vsphere/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_library(
"//vendor/github.com/vmware/govmomi/find",
"//vendor/github.com/vmware/govmomi/object",
"//vendor/github.com/vmware/govmomi/session",
"//vendor/github.com/vmware/govmomi/task",
"//vendor/github.com/vmware/govmomi/vim25",
"//vendor/github.com/vmware/govmomi/vim25/mo",
"//vendor/github.com/vmware/govmomi/vim25/soap",
Expand Down
94 changes: 79 additions & 15 deletions pkg/controller/plan/adapter/vsphere/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/vmware/govmomi"
"github.com/vmware/govmomi/object"
"github.com/vmware/govmomi/session"
"github.com/vmware/govmomi/task"
"github.com/vmware/govmomi/vim25"
"github.com/vmware/govmomi/vim25/mo"
"github.com/vmware/govmomi/vim25/soap"
Expand All @@ -27,8 +28,11 @@ import (
)

const (
snapshotName = "forklift-migration-precopy"
snapshotDesc = "Forklift Operator warm migration precopy"
snapshotName = "forklift-migration-precopy"
snapshotDesc = "Forklift Operator warm migration precopy"
VirtualMachine = "VirtualMachine"
CreateSnapshotTask = "CreateSnapshot_Task"
RemoveSnapshotTask = "RemoveSnapshot_Task"
)

// vSphere VM Client
Expand All @@ -39,9 +43,9 @@ type Client struct {
}

// Create a VM snapshot and return its ID.
func (r *Client) CreateSnapshot(vmRef ref.Ref, hosts util.HostsFunc) (id string, err error) {
func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (id string, err error) {
r.Log.V(1).Info("Creating snapshot", "vmRef", vmRef)
vm, err := r.getVM(vmRef, hosts)
vm, err := r.getVM(vmRef, hostsFunc)
if err != nil {
return
}
Expand All @@ -50,28 +54,88 @@ func (r *Client) CreateSnapshot(vmRef ref.Ref, hosts util.HostsFunc) (id string,
err = liberr.Wrap(err)
return
}
res, err := task.WaitForResult(context.TODO(), nil)
return task.Common.Reference().Value, nil
}

// Check if a snapshot is ready to transfer.
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, snapshotId string, err error) {
taskInfo, err := r.getLatestTaskByName(vmRef, CreateSnapshotTask)
if err != nil {
err = liberr.Wrap(err)
return
return false, "", liberr.Wrap(err)
}
if taskInfo == nil {
return false, "", nil
}
id = res.Result.(types.ManagedObjectReference).Value
r.Log.Info("Created snapshot", "vmRef", vmRef, "id", id)
ready, err = r.checkTaskStatus(taskInfo)
if err != nil {
return false, "", liberr.Wrap(err)
}
if ready {
return true, taskInfo.Result.(types.ManagedObjectReference).Value, nil
} else {
// The snapshot is not ready, retry the check
return false, "", nil
}
}

return
// Check if a snapshot is removed.
func (r *Client) CheckSnapshotRemoved(vmRef ref.Ref, snapshot string) (ready bool, err error) {
taskInfo, err := r.getLatestTaskByName(vmRef, RemoveSnapshotTask)
if err != nil {
return false, liberr.Wrap(err)
}
if taskInfo == nil {
return false, nil
}
return r.checkTaskStatus(taskInfo)
}

// Check if a snapshot is ready to transfer.
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, err error) {
return true, nil
func (r *Client) checkTaskStatus(taskInfo *types.TaskInfo) (ready bool, err error) {
r.Log.Info("Snapshot task", "task", taskInfo.Task.Value, "name", taskInfo.Name, "status", taskInfo.State)
switch taskInfo.State {
case types.TaskInfoStateSuccess:
return true, nil
case types.TaskInfoStateError:
return false, fmt.Errorf(taskInfo.Error.LocalizedMessage)
default:
return false, nil
}
}

func (r *Client) getLatestTaskByName(vmRef ref.Ref, taskName string) (*types.TaskInfo, error) {
taskManager := task.NewManager(r.client.Client)
taskCollector, err := taskManager.CreateCollectorForTasks(context.TODO(), types.TaskFilterSpec{
Entity: &types.TaskFilterSpecByEntity{
Entity: types.ManagedObjectReference{
Type: VirtualMachine,
Value: vmRef.ID,
},
Recursion: types.TaskFilterSpecRecursionOptionSelf,
},
})
if err != nil {
return nil, err
}
//nolint:errcheck
defer taskCollector.Destroy(context.Background())
tasks, err := taskCollector.LatestPage(context.TODO())
if err != nil {
return nil, err
}
for _, taskInfo := range tasks {
if taskInfo.Name == taskName {
return &taskInfo, nil
}
}
return nil, nil
}

// Remove a VM snapshot.
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hosts util.HostsFunc) (err error) {
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (err error) {
r.Log.V(1).Info("RemoveSnapshot",
"vmRef", vmRef,
"snapshot", snapshot)
err = r.removeSnapshot(vmRef, snapshot, false, hosts)
err = r.removeSnapshot(vmRef, snapshot, false, hostsFunc)
return
}

Expand Down
39 changes: 16 additions & 23 deletions pkg/controller/plan/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ const (
TransferCompleted = "Transfer completed."
PopulatorPodPrefix = "populate-"
DvStatusCheckRetriesAnnotation = "dvStatusCheckRetries"
SnapshotRemovalCheckRetries = "snapshotRemovalCheckRetries"
)

var (
Expand Down Expand Up @@ -1027,27 +1026,15 @@ func (r *Migration) execute(vm *plan.VMStatus) (err error) {
vm.AddError(fmt.Sprintf("Step '%s' not found", r.step(vm)))
break
}
// FIXME: This is just temporary timeout to unblock the migrations which get stuck on issue https://issues.redhat.com/browse/MTV-1753
// This should be fixed properly by adding the task manager inside the inventory and monitor the task status
// from the main controller.
var retries int
retriesAnnotation := step.Annotations[SnapshotRemovalCheckRetries]
if retriesAnnotation == "" {
step.Annotations[SnapshotRemovalCheckRetries] = "1"
} else {
retries, err = strconv.Atoi(retriesAnnotation)
if err != nil {
step.AddError(err.Error())
err = nil
break
}
if retries >= settings.Settings.SnapshotRemovalCheckRetries {
vm.Phase = r.next(vm.Phase)
// Reset for next precopy
step.Annotations[SnapshotRemovalCheckRetries] = "1"
} else {
step.Annotations[SnapshotRemovalCheckRetries] = strconv.Itoa(retries + 1)
}
snapshot := vm.Warm.Precopies[len(vm.Warm.Precopies)-1].Snapshot
ready, err := r.provider.CheckSnapshotRemoved(vm.Ref, snapshot)
if err != nil {
step.AddError(err.Error())
err = nil
break
}
if ready {
vm.Phase = r.next(vm.Phase)
}
case CreateInitialSnapshot, CreateSnapshot, CreateFinalSnapshot:
step, found := vm.FindStep(r.step(vm))
Expand Down Expand Up @@ -1076,12 +1063,18 @@ func (r *Migration) execute(vm *plan.VMStatus) (err error) {
break
}
snapshot := vm.Warm.Precopies[len(vm.Warm.Precopies)-1].Snapshot
ready, err := r.provider.CheckSnapshotReady(vm.Ref, snapshot)
ready, snapshotId, err := r.provider.CheckSnapshotReady(vm.Ref, snapshot)
if err != nil {
step.AddError(err.Error())
err = nil
break
}
// If the provider does not directly create the snapshot, but we need to wait for the snapshot to be created
// We start the creation task in CreateSnapshot, set the task ID as a snapshot id which needs to be replaced
// by the snapshot id after the task finishes.
if snapshotId != "" {
vm.Warm.Precopies[len(vm.Warm.Precopies)-1].Snapshot = snapshotId
}
if ready {
vm.Phase = r.next(vm.Phase)
}
Expand Down
44 changes: 19 additions & 25 deletions pkg/settings/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,25 @@ import (

// Environment variables.
const (
MaxVmInFlight = "MAX_VM_INFLIGHT"
HookRetry = "HOOK_RETRY"
ImporterRetry = "IMPORTER_RETRY"
VirtV2vImage = "VIRT_V2V_IMAGE"
PrecopyInterval = "PRECOPY_INTERVAL"
VirtV2vDontRequestKVM = "VIRT_V2V_DONT_REQUEST_KVM"
SnapshotRemovalTimeout = "SNAPSHOT_REMOVAL_TIMEOUT"
SnapshotStatusCheckRate = "SNAPSHOT_STATUS_CHECK_RATE"
CDIExportTokenTTL = "CDI_EXPORT_TOKEN_TTL"
FileSystemOverhead = "FILESYSTEM_OVERHEAD"
BlockOverhead = "BLOCK_OVERHEAD"
CleanupRetries = "CLEANUP_RETRIES"
DvStatusCheckRetries = "DV_STATUS_CHECK_RETRIES"
SnapshotRemovalCheckRetries = "SNAPSHOT_REMOVAL_CHECK_RETRIES"
OvirtOsConfigMap = "OVIRT_OS_MAP"
VsphereOsConfigMap = "VSPHERE_OS_MAP"
VirtCustomizeConfigMap = "VIRT_CUSTOMIZE_MAP"
VddkJobActiveDeadline = "VDDK_JOB_ACTIVE_DEADLINE"
VirtV2vExtraArgs = "VIRT_V2V_EXTRA_ARGS"
VirtV2vExtraConfConfigMap = "VIRT_V2V_EXTRA_CONF_CONFIG_MAP"
MaxVmInFlight = "MAX_VM_INFLIGHT"
HookRetry = "HOOK_RETRY"
ImporterRetry = "IMPORTER_RETRY"
VirtV2vImage = "VIRT_V2V_IMAGE"
PrecopyInterval = "PRECOPY_INTERVAL"
VirtV2vDontRequestKVM = "VIRT_V2V_DONT_REQUEST_KVM"
SnapshotRemovalTimeout = "SNAPSHOT_REMOVAL_TIMEOUT"
SnapshotStatusCheckRate = "SNAPSHOT_STATUS_CHECK_RATE"
CDIExportTokenTTL = "CDI_EXPORT_TOKEN_TTL"
FileSystemOverhead = "FILESYSTEM_OVERHEAD"
BlockOverhead = "BLOCK_OVERHEAD"
CleanupRetries = "CLEANUP_RETRIES"
DvStatusCheckRetries = "DV_STATUS_CHECK_RETRIES"
OvirtOsConfigMap = "OVIRT_OS_MAP"
VsphereOsConfigMap = "VSPHERE_OS_MAP"
VirtCustomizeConfigMap = "VIRT_CUSTOMIZE_MAP"
VddkJobActiveDeadline = "VDDK_JOB_ACTIVE_DEADLINE"
VirtV2vExtraArgs = "VIRT_V2V_EXTRA_ARGS"
VirtV2vExtraConfConfigMap = "VIRT_V2V_EXTRA_CONF_CONFIG_MAP"
)

// Migration settings
Expand Down Expand Up @@ -62,8 +61,6 @@ type Migration struct {
CleanupRetries int
// DvStatusCheckRetries retries
DvStatusCheckRetries int
// SnapshotRemovalCheckRetries retries
SnapshotRemovalCheckRetries int
// oVirt OS config map name
OvirtOsConfigMap string
// vSphere OS config map name
Expand Down Expand Up @@ -109,9 +106,6 @@ func (r *Migration) Load() (err error) {
if r.DvStatusCheckRetries, err = getPositiveEnvLimit(DvStatusCheckRetries, 10); err != nil {
return liberr.Wrap(err)
}
if r.SnapshotRemovalCheckRetries, err = getPositiveEnvLimit(SnapshotRemovalCheckRetries, 20); err != nil {
return liberr.Wrap(err)
}
if virtV2vImage, ok := os.LookupEnv(VirtV2vImage); ok {
r.VirtV2vImage = virtV2vImage
} else if Settings.Role.Has(MainRole) {
Expand Down

0 comments on commit ac6e41a

Please sign in to comment.