diff --git a/pkg/controller/plan/adapter/base/doc.go b/pkg/controller/plan/adapter/base/doc.go index 0900f3397..539b45661 100644 --- a/pkg/controller/plan/adapter/base/doc.go +++ b/pkg/controller/plan/adapter/base/doc.go @@ -107,6 +107,8 @@ type Client interface { CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (string, error) // Remove a snapshot. RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) error + // Check if the VM has a disk consolidation. + CheckDisksConsolidationReady(vmRef ref.Ref, hostsFunc util.HostsFunc) (bool, error) // Check if a snapshot is ready to transfer. CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, err error) // Set DataVolume checkpoints. diff --git a/pkg/controller/plan/adapter/ocp/client.go b/pkg/controller/plan/adapter/ocp/client.go index c17cf49a8..7c3e72ac9 100644 --- a/pkg/controller/plan/adapter/ocp/client.go +++ b/pkg/controller/plan/adapter/ocp/client.go @@ -44,6 +44,11 @@ func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.H return } +// Remove a VM snapshot. No-op for this provider. +func (r *Client) CheckDisksConsolidationReady(vmRef ref.Ref, hostsFunc util.HostsFunc) (b bool, err error) { + return true, nil +} + // Get disk deltas for a VM snapshot. No-op for this provider. func (r *Client) GetSnapshotDeltas(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (s map[string]string, err error) { return diff --git a/pkg/controller/plan/adapter/openstack/client.go b/pkg/controller/plan/adapter/openstack/client.go index e8e7a89da..2023cfc5c 100644 --- a/pkg/controller/plan/adapter/openstack/client.go +++ b/pkg/controller/plan/adapter/openstack/client.go @@ -124,6 +124,11 @@ func (r *Client) SetCheckpoints(vmRef ref.Ref, precopies []planapi.Precopy, data return nil } +// Remove a VM snapshot. No-op for this provider. +func (r *Client) CheckDisksConsolidationReady(vmRef ref.Ref, hostsFunc util.HostsFunc) (b bool, err error) { + return true, nil +} + // Remove a VM snapshot. No-op for this provider. func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (err error) { return diff --git a/pkg/controller/plan/adapter/ova/client.go b/pkg/controller/plan/adapter/ova/client.go index c1ef2838d..5be155e3f 100644 --- a/pkg/controller/plan/adapter/ova/client.go +++ b/pkg/controller/plan/adapter/ova/client.go @@ -55,6 +55,11 @@ func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.H return } +// Remove a VM snapshot. No-op for this provider. +func (r *Client) CheckDisksConsolidationReady(vmRef ref.Ref, hostsFunc util.HostsFunc) (b bool, err error) { + return true, nil +} + // Get disk deltas for a VM snapshot. No-op for this provider. func (r *Client) GetSnapshotDeltas(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (s map[string]string, err error) { return diff --git a/pkg/controller/plan/adapter/ovirt/client.go b/pkg/controller/plan/adapter/ovirt/client.go index 4a7a7fec5..c57fc93f8 100644 --- a/pkg/controller/plan/adapter/ovirt/client.go +++ b/pkg/controller/plan/adapter/ovirt/client.go @@ -109,6 +109,11 @@ func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.H return } +// Remove a VM snapshot. No-op for this provider. +func (r *Client) CheckDisksConsolidationReady(vmRef ref.Ref, hostsFunc util.HostsFunc) (b bool, err error) { + return true, nil +} + // Get disk deltas for a VM snapshot. No-op for this provider. func (r *Client) GetSnapshotDeltas(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (s map[string]string, err error) { return diff --git a/pkg/controller/plan/adapter/vsphere/BUILD.bazel b/pkg/controller/plan/adapter/vsphere/BUILD.bazel index c744b9e7c..b943e0d42 100644 --- a/pkg/controller/plan/adapter/vsphere/BUILD.bazel +++ b/pkg/controller/plan/adapter/vsphere/BUILD.bazel @@ -37,6 +37,7 @@ go_library( "//vendor/github.com/vmware/govmomi/object", "//vendor/github.com/vmware/govmomi/session", "//vendor/github.com/vmware/govmomi/vim25", + "//vendor/github.com/vmware/govmomi/vim25/methods", "//vendor/github.com/vmware/govmomi/vim25/mo", "//vendor/github.com/vmware/govmomi/vim25/soap", "//vendor/github.com/vmware/govmomi/vim25/types", diff --git a/pkg/controller/plan/adapter/vsphere/client.go b/pkg/controller/plan/adapter/vsphere/client.go index a48af1af5..10316179f 100644 --- a/pkg/controller/plan/adapter/vsphere/client.go +++ b/pkg/controller/plan/adapter/vsphere/client.go @@ -4,6 +4,7 @@ import ( "context" "fmt" liburl "net/url" + "sort" "strconv" "github.com/konveyor/forklift-controller/pkg/apis/forklift/v1beta1" @@ -17,6 +18,7 @@ import ( "github.com/vmware/govmomi/object" "github.com/vmware/govmomi/session" "github.com/vmware/govmomi/vim25" + "github.com/vmware/govmomi/vim25/methods" "github.com/vmware/govmomi/vim25/mo" "github.com/vmware/govmomi/vim25/soap" "github.com/vmware/govmomi/vim25/types" @@ -27,8 +29,11 @@ import ( ) const ( - snapshotName = "forklift-migration-precopy" - snapshotDesc = "Forklift Operator warm migration precopy" + snapshotName = "forklift-migration-precopy" + snapshotDesc = "Forklift Operator warm migration precopy" + vmDiskConsolidatedEvent = "com.vmware.vc.VmDiskConsolidatedEvent" + taskEvent = "vim.event.TaskEvent" + removeSnapshotTaskName = "RemoveSnapshot_Task" ) // vSphere VM Client @@ -75,6 +80,105 @@ func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hosts util.Hosts return } +// Remove a VM snapshot. No-op for this provider. +func (r *Client) CheckDisksConsolidationReady(vmRef ref.Ref, hostsFunc util.HostsFunc) (b bool, err error) { + vm, err := r.getVM(vmRef, hostsFunc) + if err != nil { + return + } + // Create an EventFilterSpec for VmDiskConsolidatedEvent and the specific VM + events, err := r.getEvents(vm) + if err != nil { + return false, err + } + snapshotRemovalEvent := r.getLatestSnapshotRemovalEvent(events) + if snapshotRemovalEvent == nil { + r.Log.V(1).Info("No snapshot event found", + "vmRef", vmRef) + return false, nil + } + eventInfo := snapshotRemovalEvent.(*types.TaskEvent).Info + switch eventInfo.State { + // @mnecas note: I was getting TaskInfoStateQueued even after the snapshot was removed and consolidated + case types.TaskInfoStateSuccess, types.TaskInfoStateQueued, types.TaskInfoStateRunning: + r.Log.V(1).Info("Consolidation check", "removalEventState", eventInfo.State, + "vmRef", vmRef) + // If the snapshot removal task finished check the consolidation event + return r.hasSnapshotRemovalConsolidation(vm, snapshotRemovalEvent) + case types.TaskInfoStateError: + r.Log.V(1).Info("The snapshot removal task failed with TaskInfoStateError", + "vmRef", vmRef) + return false, fmt.Errorf("snapshot removal task failed with TaskInfoStateError", eventInfo.Error) + default: + r.Log.V(1).Info("Unknown task state", + "vmRef", vmRef) + return false, fmt.Errorf("unknown task state", "vmRef", vmRef) + } +} + +func (r *Client) getEvents(vm *object.VirtualMachine) ([]types.BaseEvent, error) { + // Query for events + filter := types.EventFilterSpec{ + EventTypeId: []string{taskEvent}, + Entity: &types.EventFilterSpecByEntity{ + Entity: vm.Reference(), + Recursion: types.EventFilterSpecRecursionOptionSelf, + }, + } + req := types.QueryEvents{ + This: r.client.ServiceContent.EventManager.Reference(), + Filter: filter, + } + + response, err := methods.QueryEvents(context.TODO(), r.client, &req) + if err != nil { + return nil, err + } + return response.Returnval, err +} + +func (r *Client) getLatestSnapshotRemovalEvent(events []types.BaseEvent) types.BaseEvent { + sortedEvents := r.sortEventsByCreationTime(events) + for _, event := range sortedEvents { + eventInfo := event.(*types.TaskEvent).Info + if eventInfo.Name == removeSnapshotTaskName { + return event + } + } + return nil +} + +func (r *Client) sortEventsByCreationTime(events []types.BaseEvent) []types.BaseEvent { + sortedEvents := make([]types.BaseEvent, len(events)) + copy(sortedEvents, events) + sort.Slice(sortedEvents, func(i, j int) bool { + return sortedEvents[i].GetEvent().CreatedTime.After(sortedEvents[j].GetEvent().CreatedTime) + }) + return sortedEvents +} + +func (r *Client) hasSnapshotRemovalConsolidation(vm *object.VirtualMachine, snapshotRemovalEvent types.BaseEvent) (bool, error) { + // Query for events + filter := types.EventFilterSpec{ + EventTypeId: []string{vmDiskConsolidatedEvent}, + Entity: &types.EventFilterSpecByEntity{ + Entity: vm.Reference(), + Recursion: types.EventFilterSpecRecursionOptionSelf, + }, + EventChainId: snapshotRemovalEvent.GetEvent().ChainId, + } + req := types.QueryEvents{ + This: r.client.ServiceContent.EventManager.Reference(), + Filter: filter, + } + + response, err := methods.QueryEvents(context.TODO(), r.client, &req) + if err != nil { + return false, err + } + return len(response.Returnval) > 0, nil +} + // Set DataVolume checkpoints. func (r *Client) SetCheckpoints(vmRef ref.Ref, precopies []planapi.Precopy, datavolumes []cdi.DataVolume, final bool, hosts util.HostsFunc) (err error) { n := len(precopies) diff --git a/pkg/controller/plan/migration.go b/pkg/controller/plan/migration.go index 5d06b92ec..0342023c1 100644 --- a/pkg/controller/plan/migration.go +++ b/pkg/controller/plan/migration.go @@ -54,36 +54,38 @@ var ( // Phases. const ( - Started = "Started" - PreHook = "PreHook" - StorePowerState = "StorePowerState" - PowerOffSource = "PowerOffSource" - WaitForPowerOff = "WaitForPowerOff" - CreateDataVolumes = "CreateDataVolumes" - CreateVM = "CreateVM" - CopyDisks = "CopyDisks" - AllocateDisks = "AllocateDisks" - CopyingPaused = "CopyingPaused" - AddCheckpoint = "AddCheckpoint" - AddFinalCheckpoint = "AddFinalCheckpoint" - CreateSnapshot = "CreateSnapshot" - CreateInitialSnapshot = "CreateInitialSnapshot" - CreateFinalSnapshot = "CreateFinalSnapshot" - Finalize = "Finalize" - CreateGuestConversionPod = "CreateGuestConversionPod" - ConvertGuest = "ConvertGuest" - CopyDisksVirtV2V = "CopyDisksVirtV2V" - PostHook = "PostHook" - Completed = "Completed" - WaitForSnapshot = "WaitForSnapshot" - WaitForInitialSnapshot = "WaitForInitialSnapshot" - WaitForFinalSnapshot = "WaitForFinalSnapshot" - ConvertOpenstackSnapshot = "ConvertOpenstackSnapshot" - StoreSnapshotDeltas = "StoreSnapshotDeltas" - StoreInitialSnapshotDeltas = "StoreInitialSnapshotDeltas" - RemovePreviousSnapshot = "RemovePreviousSnapshot" - RemovePenultimateSnapshot = "RemovePenultimateSnapshot" - RemoveFinalSnapshot = "RemoveFinalSnapshot" + Started = "Started" + PreHook = "PreHook" + StorePowerState = "StorePowerState" + PowerOffSource = "PowerOffSource" + WaitForPowerOff = "WaitForPowerOff" + CreateDataVolumes = "CreateDataVolumes" + CreateVM = "CreateVM" + CopyDisks = "CopyDisks" + AllocateDisks = "AllocateDisks" + CopyingPaused = "CopyingPaused" + AddCheckpoint = "AddCheckpoint" + AddFinalCheckpoint = "AddFinalCheckpoint" + CreateSnapshot = "CreateSnapshot" + CreateInitialSnapshot = "CreateInitialSnapshot" + CreateFinalSnapshot = "CreateFinalSnapshot" + Finalize = "Finalize" + CreateGuestConversionPod = "CreateGuestConversionPod" + ConvertGuest = "ConvertGuest" + CopyDisksVirtV2V = "CopyDisksVirtV2V" + PostHook = "PostHook" + Completed = "Completed" + WaitForSnapshot = "WaitForSnapshot" + WaitForInitialSnapshot = "WaitForInitialSnapshot" + WaitForFinalSnapshot = "WaitForFinalSnapshot" + ConvertOpenstackSnapshot = "ConvertOpenstackSnapshot" + StoreSnapshotDeltas = "StoreSnapshotDeltas" + StoreInitialSnapshotDeltas = "StoreInitialSnapshotDeltas" + RemovePreviousSnapshot = "RemovePreviousSnapshot" + WaitForDisksConsolidation = "WaitForDisksConsolidation" + RemovePenultimateSnapshot = "RemovePenultimateSnapshot" + WaitForPenultimateDisksConsolidation = "WaitForPenultimateDisksConsolidation" + RemoveFinalSnapshot = "RemoveFinalSnapshot" ) // Steps. @@ -136,6 +138,7 @@ var ( {Name: CopyDisks}, {Name: CopyingPaused}, {Name: RemovePreviousSnapshot, All: VSphere}, + {Name: WaitForDisksConsolidation, All: VSphere}, {Name: CreateSnapshot}, {Name: WaitForSnapshot}, {Name: StoreSnapshotDeltas, All: VSphere}, @@ -144,6 +147,7 @@ var ( {Name: PowerOffSource}, {Name: WaitForPowerOff}, {Name: RemovePenultimateSnapshot, All: VSphere}, + {Name: WaitForPenultimateDisksConsolidation, All: VSphere}, {Name: CreateFinalSnapshot}, {Name: WaitForFinalSnapshot}, {Name: AddFinalCheckpoint}, @@ -661,9 +665,9 @@ func (r *Migration) step(vm *plan.VMStatus) (step string) { step = Initialize case AllocateDisks: step = DiskAllocation - case CopyDisks, CopyingPaused, RemovePreviousSnapshot, CreateSnapshot, WaitForSnapshot, StoreSnapshotDeltas, AddCheckpoint, ConvertOpenstackSnapshot: + case CopyDisks, CopyingPaused, RemovePreviousSnapshot, WaitForDisksConsolidation, CreateSnapshot, WaitForSnapshot, StoreSnapshotDeltas, AddCheckpoint, ConvertOpenstackSnapshot: step = DiskTransfer - case RemovePenultimateSnapshot, CreateFinalSnapshot, WaitForFinalSnapshot, AddFinalCheckpoint, Finalize, RemoveFinalSnapshot: + case RemovePenultimateSnapshot, WaitForPenultimateDisksConsolidation, CreateFinalSnapshot, WaitForFinalSnapshot, AddFinalCheckpoint, Finalize, RemoveFinalSnapshot: step = Cutover case CreateGuestConversionPod, ConvertGuest: step = ImageConversion @@ -1002,6 +1006,22 @@ func (r *Migration) execute(vm *plan.VMStatus) (err error) { break } vm.Phase = r.next(vm.Phase) + case WaitForDisksConsolidation, WaitForPenultimateDisksConsolidation: + step, found := vm.FindStep(r.step(vm)) + if !found { + vm.AddError(fmt.Sprintf("Step '%s' not found", r.step(vm))) + break + } + ready, err := r.provider.CheckDisksConsolidationReady(vm.Ref, r.kubevirt.loadHosts) + if err != nil { + r.Log.Error(err, "Failed to query events") + step.AddError(err.Error()) + err = nil + break + } + if ready { + vm.Phase = r.next(vm.Phase) + } case CreateInitialSnapshot, CreateSnapshot, CreateFinalSnapshot: step, found := vm.FindStep(r.step(vm)) if !found {