Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for DisksConsolidation #1197

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/controller/plan/adapter/base/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ type Client interface {
// Create a snapshot of the source VM.
CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (string, error)
// Remove a snapshot.
RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc, consolidate bool) error
RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) error
// Check if the VM has a disk consolidation.
CheckDisksConsolidationReady(vmRef ref.Ref, hostsFunc util.HostsFunc) (bool, error)
// Check if a snapshot is ready to transfer.
CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, err error)
// Set DataVolume checkpoints.
Expand Down
7 changes: 6 additions & 1 deletion pkg/controller/plan/adapter/ocp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,15 @@ func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (string
}

// Remove a VM snapshot. No-op for this provider.
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc, consolidate bool) (err error) {
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (err error) {
return
}

// Remove a VM snapshot. No-op for this provider.
func (r *Client) CheckDisksConsolidationReady(vmRef ref.Ref, hostsFunc util.HostsFunc) (b bool, err error) {
return true, nil
}

// Get disk deltas for a VM snapshot. No-op for this provider.
func (r *Client) GetSnapshotDeltas(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (s map[string]string, err error) {
return
Expand Down
7 changes: 6 additions & 1 deletion pkg/controller/plan/adapter/openstack/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,13 @@ func (r *Client) SetCheckpoints(vmRef ref.Ref, precopies []planapi.Precopy, data
return nil
}

// Check if the VM has a disk consolidation.
func (r *Client) CheckDisksConsolidationReady(vmRef ref.Ref, hostsFunc util.HostsFunc) (b bool, err error) {
return true, nil
}

// Remove a VM snapshot. No-op for this provider.
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc, consolidate bool) (err error) {
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (err error) {
return
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/controller/plan/adapter/ova/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,15 @@ func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (snapsh
}

// Remove a VM snapshot. No-op for this provider.
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc, consolidate bool) (err error) {
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (err error) {
return
}

// Remove a VM snapshot. No-op for this provider.
func (r *Client) CheckDisksConsolidationReady(vmRef ref.Ref, hostsFunc util.HostsFunc) (b bool, err error) {
return true, nil
}

// Get disk deltas for a VM snapshot. No-op for this provider.
func (r *Client) GetSnapshotDeltas(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (s map[string]string, err error) {
return
Expand Down
7 changes: 6 additions & 1 deletion pkg/controller/plan/adapter/ovirt/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,15 @@ func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool,
}

// Remove a VM snapshot. No-op for this provider.
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc, consolidate bool) (err error) {
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (err error) {
return
}

// Remove a VM snapshot. No-op for this provider.
func (r *Client) CheckDisksConsolidationReady(vmRef ref.Ref, hostsFunc util.HostsFunc) (b bool, err error) {
return true, nil
}

// Get disk deltas for a VM snapshot. No-op for this provider.
func (r *Client) GetSnapshotDeltas(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (s map[string]string, err error) {
return
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/plan/adapter/vsphere/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ go_library(
"//vendor/github.com/vmware/govmomi/object",
"//vendor/github.com/vmware/govmomi/session",
"//vendor/github.com/vmware/govmomi/vim25",
"//vendor/github.com/vmware/govmomi/vim25/methods",
"//vendor/github.com/vmware/govmomi/vim25/mo",
"//vendor/github.com/vmware/govmomi/vim25/soap",
"//vendor/github.com/vmware/govmomi/vim25/types",
Expand Down
115 changes: 110 additions & 5 deletions pkg/controller/plan/adapter/vsphere/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"context"
"fmt"
liburl "net/url"
"sort"
"strconv"

"github.com/konveyor/forklift-controller/pkg/apis/forklift/v1beta1"
Expand All @@ -17,6 +18,7 @@
"github.com/vmware/govmomi/object"
"github.com/vmware/govmomi/session"
"github.com/vmware/govmomi/vim25"
"github.com/vmware/govmomi/vim25/methods"
"github.com/vmware/govmomi/vim25/mo"
"github.com/vmware/govmomi/vim25/soap"
"github.com/vmware/govmomi/vim25/types"
Expand All @@ -27,8 +29,11 @@
)

const (
snapshotName = "forklift-migration-precopy"
snapshotDesc = "Forklift Operator warm migration precopy"
snapshotName = "forklift-migration-precopy"
snapshotDesc = "Forklift Operator warm migration precopy"
vmDiskConsolidatedEvent = "com.vmware.vc.VmDiskConsolidatedEvent"
taskEvent = "vim.event.TaskEvent"
removeSnapshotTaskName = "RemoveSnapshot_Task"
)

// vSphere VM Client
Expand Down Expand Up @@ -67,14 +72,113 @@
}

// Remove a VM snapshot.
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hosts util.HostsFunc, consolidate bool) (err error) {
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hosts util.HostsFunc) (err error) {
r.Log.V(1).Info("RemoveSnapshot",
"vmRef", vmRef,
"snapshot", snapshot)
err = r.removeSnapshot(vmRef, snapshot, false, hosts, consolidate)
err = r.removeSnapshot(vmRef, snapshot, false, hosts)
return
}

// Remove a VM snapshot. No-op for this provider.
func (r *Client) CheckDisksConsolidationReady(vmRef ref.Ref, hostsFunc util.HostsFunc) (b bool, err error) {
vm, err := r.getVM(vmRef, hostsFunc)
if err != nil {
return
}
// Create an EventFilterSpec for VmDiskConsolidatedEvent and the specific VM
events, err := r.getEvents(vm)
if err != nil {
return false, err
}
snapshotRemovalEvent := r.getLatestSnapshotRemovalEvent(events)
if snapshotRemovalEvent == nil {
r.Log.V(1).Info("No snapshot event found",
"vmRef", vmRef)
return false, nil
}
eventInfo := snapshotRemovalEvent.(*types.TaskEvent).Info
switch eventInfo.State {
// @mnecas note: I was getting TaskInfoStateQueued even after the snapshot was removed and consolidated
case types.TaskInfoStateSuccess, types.TaskInfoStateQueued, types.TaskInfoStateRunning:
r.Log.V(1).Info("Consolidation check", "removalEventState", eventInfo.State,
"vmRef", vmRef)
// If the snapshot removal task finished check the consolidation event
return r.hasSnapshotRemovalConsolidation(vm, snapshotRemovalEvent)
case types.TaskInfoStateError:
r.Log.V(1).Info("The snapshot removal task failed with TaskInfoStateError",
"vmRef", vmRef)
return false, fmt.Errorf("snapshot removal task failed with TaskInfoStateError", eventInfo.Error)

Check failure on line 111 in pkg/controller/plan/adapter/vsphere/client.go

View workflow job for this annotation

GitHub Actions / lint

printf: fmt.Errorf call has arguments but no formatting directives (govet)
default:
r.Log.V(1).Info("Unknown task state",
"vmRef", vmRef)
return false, fmt.Errorf("unknown task state", "vmRef", vmRef)

Check failure on line 115 in pkg/controller/plan/adapter/vsphere/client.go

View workflow job for this annotation

GitHub Actions / lint

printf: fmt.Errorf call has arguments but no formatting directives (govet)
}
}

func (r *Client) getEvents(vm *object.VirtualMachine) ([]types.BaseEvent, error) {
// Query for events
filter := types.EventFilterSpec{
EventTypeId: []string{taskEvent},
Entity: &types.EventFilterSpecByEntity{
Entity: vm.Reference(),
Recursion: types.EventFilterSpecRecursionOptionSelf,
},
}
req := types.QueryEvents{
This: r.client.ServiceContent.EventManager.Reference(),
Filter: filter,
}

response, err := methods.QueryEvents(context.TODO(), r.client, &req)
if err != nil {
return nil, err
}
return response.Returnval, err
}

func (r *Client) getLatestSnapshotRemovalEvent(events []types.BaseEvent) types.BaseEvent {
sortedEvents := r.sortEventsByCreationTime(events)
for _, event := range sortedEvents {
eventInfo := event.(*types.TaskEvent).Info
if eventInfo.Name == removeSnapshotTaskName {
return event
}
}
return nil
}

func (r *Client) sortEventsByCreationTime(events []types.BaseEvent) []types.BaseEvent {
sortedEvents := make([]types.BaseEvent, len(events))
copy(sortedEvents, events)
sort.Slice(sortedEvents, func(i, j int) bool {
return sortedEvents[i].GetEvent().CreatedTime.After(sortedEvents[j].GetEvent().CreatedTime)
})
return sortedEvents
}

func (r *Client) hasSnapshotRemovalConsolidation(vm *object.VirtualMachine, snapshotRemovalEvent types.BaseEvent) (bool, error) {
// Query for events
filter := types.EventFilterSpec{
EventTypeId: []string{vmDiskConsolidatedEvent},
Entity: &types.EventFilterSpecByEntity{
Entity: vm.Reference(),
Recursion: types.EventFilterSpecRecursionOptionSelf,
},
EventChainId: snapshotRemovalEvent.GetEvent().ChainId,
}
req := types.QueryEvents{
This: r.client.ServiceContent.EventManager.Reference(),
Filter: filter,
}

response, err := methods.QueryEvents(context.TODO(), r.client, &req)
if err != nil {
return false, err
}
return len(response.Returnval) > 0, nil
}

// Set DataVolume checkpoints.
func (r *Client) SetCheckpoints(vmRef ref.Ref, precopies []planapi.Precopy, datavolumes []cdi.DataVolume, final bool, hosts util.HostsFunc) (err error) {
n := len(precopies)
Expand Down Expand Up @@ -371,7 +475,7 @@
}

// Remove a VM snapshot and optionally its children.
func (r *Client) removeSnapshot(vmRef ref.Ref, snapshot string, children bool, hosts util.HostsFunc, consolidate bool) (err error) {
func (r *Client) removeSnapshot(vmRef ref.Ref, snapshot string, children bool, hosts util.HostsFunc) (err error) {
r.Log.Info("Removing snapshot",
"vmRef", vmRef,
"snapshot", snapshot,
Expand All @@ -381,6 +485,7 @@
if err != nil {
return
}
consolidate := true
_, err = vm.RemoveSnapshot(context.TODO(), snapshot, children, &consolidate)
if err != nil {
err = liberr.Wrap(err)
Expand Down
92 changes: 54 additions & 38 deletions pkg/controller/plan/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,36 +54,38 @@ var (

// Phases.
const (
Started = "Started"
PreHook = "PreHook"
StorePowerState = "StorePowerState"
PowerOffSource = "PowerOffSource"
WaitForPowerOff = "WaitForPowerOff"
CreateDataVolumes = "CreateDataVolumes"
CreateVM = "CreateVM"
CopyDisks = "CopyDisks"
AllocateDisks = "AllocateDisks"
CopyingPaused = "CopyingPaused"
AddCheckpoint = "AddCheckpoint"
AddFinalCheckpoint = "AddFinalCheckpoint"
CreateSnapshot = "CreateSnapshot"
CreateInitialSnapshot = "CreateInitialSnapshot"
CreateFinalSnapshot = "CreateFinalSnapshot"
Finalize = "Finalize"
CreateGuestConversionPod = "CreateGuestConversionPod"
ConvertGuest = "ConvertGuest"
CopyDisksVirtV2V = "CopyDisksVirtV2V"
PostHook = "PostHook"
Completed = "Completed"
WaitForSnapshot = "WaitForSnapshot"
WaitForInitialSnapshot = "WaitForInitialSnapshot"
WaitForFinalSnapshot = "WaitForFinalSnapshot"
ConvertOpenstackSnapshot = "ConvertOpenstackSnapshot"
StoreSnapshotDeltas = "StoreSnapshotDeltas"
StoreInitialSnapshotDeltas = "StoreInitialSnapshotDeltas"
RemovePreviousSnapshot = "RemovePreviousSnapshot"
RemovePenultimateSnapshot = "RemovePenultimateSnapshot"
RemoveFinalSnapshot = "RemoveFinalSnapshot"
Started = "Started"
PreHook = "PreHook"
StorePowerState = "StorePowerState"
PowerOffSource = "PowerOffSource"
WaitForPowerOff = "WaitForPowerOff"
CreateDataVolumes = "CreateDataVolumes"
CreateVM = "CreateVM"
CopyDisks = "CopyDisks"
AllocateDisks = "AllocateDisks"
CopyingPaused = "CopyingPaused"
AddCheckpoint = "AddCheckpoint"
AddFinalCheckpoint = "AddFinalCheckpoint"
CreateSnapshot = "CreateSnapshot"
CreateInitialSnapshot = "CreateInitialSnapshot"
CreateFinalSnapshot = "CreateFinalSnapshot"
Finalize = "Finalize"
CreateGuestConversionPod = "CreateGuestConversionPod"
ConvertGuest = "ConvertGuest"
CopyDisksVirtV2V = "CopyDisksVirtV2V"
PostHook = "PostHook"
Completed = "Completed"
WaitForSnapshot = "WaitForSnapshot"
WaitForInitialSnapshot = "WaitForInitialSnapshot"
WaitForFinalSnapshot = "WaitForFinalSnapshot"
ConvertOpenstackSnapshot = "ConvertOpenstackSnapshot"
StoreSnapshotDeltas = "StoreSnapshotDeltas"
StoreInitialSnapshotDeltas = "StoreInitialSnapshotDeltas"
RemovePreviousSnapshot = "RemovePreviousSnapshot"
WaitForDisksConsolidation = "WaitForDisksConsolidation"
RemovePenultimateSnapshot = "RemovePenultimateSnapshot"
WaitForPenultimateDisksConsolidation = "WaitForPenultimateDisksConsolidation"
RemoveFinalSnapshot = "RemoveFinalSnapshot"
)

// Steps.
Expand Down Expand Up @@ -136,6 +138,7 @@ var (
{Name: CopyDisks},
{Name: CopyingPaused},
{Name: RemovePreviousSnapshot, All: VSphere},
{Name: WaitForDisksConsolidation, All: VSphere},
{Name: CreateSnapshot},
{Name: WaitForSnapshot},
{Name: StoreSnapshotDeltas, All: VSphere},
Expand All @@ -144,6 +147,7 @@ var (
{Name: PowerOffSource},
{Name: WaitForPowerOff},
{Name: RemovePenultimateSnapshot, All: VSphere},
{Name: WaitForPenultimateDisksConsolidation, All: VSphere},
{Name: CreateFinalSnapshot},
{Name: WaitForFinalSnapshot},
{Name: AddFinalCheckpoint},
Expand Down Expand Up @@ -517,7 +521,7 @@ func (r *Migration) removeLastWarmSnapshot(vm *plan.VMStatus) {
return
}
snapshot := vm.Warm.Precopies[n-1].Snapshot
if err := r.provider.RemoveSnapshot(vm.Ref, snapshot, r.kubevirt.loadHosts, true); err != nil {
if err := r.provider.RemoveSnapshot(vm.Ref, snapshot, r.kubevirt.loadHosts); err != nil {
r.Log.Error(
err,
"Failed to clean up warm migration snapshots.",
Expand Down Expand Up @@ -661,9 +665,9 @@ func (r *Migration) step(vm *plan.VMStatus) (step string) {
step = Initialize
case AllocateDisks:
step = DiskAllocation
case CopyDisks, CopyingPaused, RemovePreviousSnapshot, CreateSnapshot, WaitForSnapshot, StoreSnapshotDeltas, AddCheckpoint, ConvertOpenstackSnapshot:
case CopyDisks, CopyingPaused, RemovePreviousSnapshot, WaitForDisksConsolidation, CreateSnapshot, WaitForSnapshot, StoreSnapshotDeltas, AddCheckpoint, ConvertOpenstackSnapshot:
step = DiskTransfer
case RemovePenultimateSnapshot, CreateFinalSnapshot, WaitForFinalSnapshot, AddFinalCheckpoint, Finalize, RemoveFinalSnapshot:
case RemovePenultimateSnapshot, WaitForPenultimateDisksConsolidation, CreateFinalSnapshot, WaitForFinalSnapshot, AddFinalCheckpoint, Finalize, RemoveFinalSnapshot:
step = Cutover
case CreateGuestConversionPod, ConvertGuest:
step = ImageConversion
Expand Down Expand Up @@ -995,17 +999,29 @@ func (r *Migration) execute(vm *plan.VMStatus) (err error) {
break
}
n := len(vm.Warm.Precopies)
consolidate := false
if vm.Phase == RemoveFinalSnapshot {
consolidate = true
}
err = r.provider.RemoveSnapshot(vm.Ref, vm.Warm.Precopies[n-1].Snapshot, r.kubevirt.loadHosts, consolidate)
err = r.provider.RemoveSnapshot(vm.Ref, vm.Warm.Precopies[n-1].Snapshot, r.kubevirt.loadHosts)
if err != nil {
step.AddError(err.Error())
err = nil
break
}
vm.Phase = r.next(vm.Phase)
case WaitForDisksConsolidation, WaitForPenultimateDisksConsolidation:
step, found := vm.FindStep(r.step(vm))
if !found {
vm.AddError(fmt.Sprintf("Step '%s' not found", r.step(vm)))
break
}
ready, err := r.provider.CheckDisksConsolidationReady(vm.Ref, r.kubevirt.loadHosts)
if err != nil {
r.Log.Error(err, "Failed to query events")
step.AddError(err.Error())
err = nil
break
}
if ready {
vm.Phase = r.next(vm.Phase)
}
case CreateInitialSnapshot, CreateSnapshot, CreateFinalSnapshot:
step, found := vm.FindStep(r.step(vm))
if !found {
Expand Down
Loading