Skip to content

Commit

Permalink
Wait for DisksConsolidation
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Necas <[email protected]>
  • Loading branch information
mnecas committed Nov 19, 2024
1 parent 4ae505e commit 08cfe77
Show file tree
Hide file tree
Showing 8 changed files with 181 additions and 34 deletions.
2 changes: 2 additions & 0 deletions pkg/controller/plan/adapter/base/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ type Client interface {
CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (string, error)
// Remove a snapshot.
RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) error
// Check if the VM has a disk consolidation.
CheckDisksConsolidationReady(vmRef ref.Ref, hostsFunc util.HostsFunc) (bool, error)
// Check if a snapshot is ready to transfer.
CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, err error)
// Set DataVolume checkpoints.
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/plan/adapter/ocp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.H
return
}

// Remove a VM snapshot. No-op for this provider.
func (r *Client) CheckDisksConsolidationReady(vmRef ref.Ref, hostsFunc util.HostsFunc) (b bool, err error) {
return true, nil
}

// Get disk deltas for a VM snapshot. No-op for this provider.
func (r *Client) GetSnapshotDeltas(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (s map[string]string, err error) {
return
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/plan/adapter/openstack/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ func (r *Client) SetCheckpoints(vmRef ref.Ref, precopies []planapi.Precopy, data
return nil
}

// Remove a VM snapshot. No-op for this provider.
func (r *Client) CheckDisksConsolidationReady(vmRef ref.Ref, hostsFunc util.HostsFunc) (b bool, err error) {
return true, nil
}

// Remove a VM snapshot. No-op for this provider.
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (err error) {
return
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/plan/adapter/ova/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.H
return
}

// Remove a VM snapshot. No-op for this provider.
func (r *Client) CheckDisksConsolidationReady(vmRef ref.Ref, hostsFunc util.HostsFunc) (b bool, err error) {
return true, nil
}

// Get disk deltas for a VM snapshot. No-op for this provider.
func (r *Client) GetSnapshotDeltas(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (s map[string]string, err error) {
return
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/plan/adapter/ovirt/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.H
return
}

// Remove a VM snapshot. No-op for this provider.
func (r *Client) CheckDisksConsolidationReady(vmRef ref.Ref, hostsFunc util.HostsFunc) (b bool, err error) {
return true, nil
}

// Get disk deltas for a VM snapshot. No-op for this provider.
func (r *Client) GetSnapshotDeltas(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (s map[string]string, err error) {
return
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/plan/adapter/vsphere/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ go_library(
"//vendor/github.com/vmware/govmomi/object",
"//vendor/github.com/vmware/govmomi/session",
"//vendor/github.com/vmware/govmomi/vim25",
"//vendor/github.com/vmware/govmomi/vim25/methods",
"//vendor/github.com/vmware/govmomi/vim25/mo",
"//vendor/github.com/vmware/govmomi/vim25/soap",
"//vendor/github.com/vmware/govmomi/vim25/types",
Expand Down
108 changes: 106 additions & 2 deletions pkg/controller/plan/adapter/vsphere/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
liburl "net/url"
"sort"
"strconv"

"github.com/konveyor/forklift-controller/pkg/apis/forklift/v1beta1"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/vmware/govmomi/object"
"github.com/vmware/govmomi/session"
"github.com/vmware/govmomi/vim25"
"github.com/vmware/govmomi/vim25/methods"
"github.com/vmware/govmomi/vim25/mo"
"github.com/vmware/govmomi/vim25/soap"
"github.com/vmware/govmomi/vim25/types"
Expand All @@ -27,8 +29,11 @@ import (
)

const (
snapshotName = "forklift-migration-precopy"
snapshotDesc = "Forklift Operator warm migration precopy"
snapshotName = "forklift-migration-precopy"
snapshotDesc = "Forklift Operator warm migration precopy"
vmDiskConsolidatedEvent = "com.vmware.vc.VmDiskConsolidatedEvent"
taskEvent = "vim.event.TaskEvent"
removeSnapshotTaskName = "RemoveSnapshot_Task"
)

// vSphere VM Client
Expand Down Expand Up @@ -75,6 +80,105 @@ func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hosts util.Hosts
return
}

// Remove a VM snapshot. No-op for this provider.
func (r *Client) CheckDisksConsolidationReady(vmRef ref.Ref, hostsFunc util.HostsFunc) (b bool, err error) {
vm, err := r.getVM(vmRef, hostsFunc)
if err != nil {
return
}
// Create an EventFilterSpec for VmDiskConsolidatedEvent and the specific VM
events, err := r.getEvents(vm)
if err != nil {
return false, err
}
snapshotRemovalEvent := r.getLatestSnapshotRemovalEvent(events)
if snapshotRemovalEvent == nil {
r.Log.V(1).Info("No snapshot event found",
"vmRef", vmRef)
return false, nil
}
eventInfo := snapshotRemovalEvent.(*types.TaskEvent).Info
switch eventInfo.State {
// @mnecas note: I was getting TaskInfoStateQueued even after the snapshot was removed and consolidated
case types.TaskInfoStateSuccess, types.TaskInfoStateQueued, types.TaskInfoStateRunning:
r.Log.V(1).Info("Consolidation check", "removalEventState", eventInfo.State,
"vmRef", vmRef)
// If the snapshot removal task finished check the consolidation event
return r.hasSnapshotRemovalConsolidation(vm, snapshotRemovalEvent)
case types.TaskInfoStateError:
r.Log.V(1).Info("The snapshot removal task failed with TaskInfoStateError",
"vmRef", vmRef)
return false, fmt.Errorf("snapshot removal task failed with TaskInfoStateError", eventInfo.Error)
default:
r.Log.V(1).Info("Unknown task state",
"vmRef", vmRef)
return false, fmt.Errorf("unknown task state", "vmRef", vmRef)
}
}

func (r *Client) getEvents(vm *object.VirtualMachine) ([]types.BaseEvent, error) {
// Query for events
filter := types.EventFilterSpec{
EventTypeId: []string{taskEvent},
Entity: &types.EventFilterSpecByEntity{
Entity: vm.Reference(),
Recursion: types.EventFilterSpecRecursionOptionSelf,
},
}
req := types.QueryEvents{
This: r.client.ServiceContent.EventManager.Reference(),
Filter: filter,
}

response, err := methods.QueryEvents(context.TODO(), r.client, &req)
if err != nil {
return nil, err
}
return response.Returnval, err
}

func (r *Client) getLatestSnapshotRemovalEvent(events []types.BaseEvent) types.BaseEvent {
sortedEvents := r.sortEventsByCreationTime(events)
for _, event := range sortedEvents {
eventInfo := event.(*types.TaskEvent).Info
if eventInfo.Name == removeSnapshotTaskName {
return event
}
}
return nil
}

func (r *Client) sortEventsByCreationTime(events []types.BaseEvent) []types.BaseEvent {
sortedEvents := make([]types.BaseEvent, len(events))
copy(sortedEvents, events)
sort.Slice(sortedEvents, func(i, j int) bool {
return sortedEvents[i].GetEvent().CreatedTime.After(sortedEvents[j].GetEvent().CreatedTime)
})
return sortedEvents
}

func (r *Client) hasSnapshotRemovalConsolidation(vm *object.VirtualMachine, snapshotRemovalEvent types.BaseEvent) (bool, error) {
// Query for events
filter := types.EventFilterSpec{
EventTypeId: []string{vmDiskConsolidatedEvent},
Entity: &types.EventFilterSpecByEntity{
Entity: vm.Reference(),
Recursion: types.EventFilterSpecRecursionOptionSelf,
},
EventChainId: snapshotRemovalEvent.GetEvent().ChainId,
}
req := types.QueryEvents{
This: r.client.ServiceContent.EventManager.Reference(),
Filter: filter,
}

response, err := methods.QueryEvents(context.TODO(), r.client, &req)
if err != nil {
return false, err
}
return len(response.Returnval) > 0, nil
}

// Set DataVolume checkpoints.
func (r *Client) SetCheckpoints(vmRef ref.Ref, precopies []planapi.Precopy, datavolumes []cdi.DataVolume, final bool, hosts util.HostsFunc) (err error) {
n := len(precopies)
Expand Down
84 changes: 52 additions & 32 deletions pkg/controller/plan/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,36 +54,38 @@ var (

// Phases.
const (
Started = "Started"
PreHook = "PreHook"
StorePowerState = "StorePowerState"
PowerOffSource = "PowerOffSource"
WaitForPowerOff = "WaitForPowerOff"
CreateDataVolumes = "CreateDataVolumes"
CreateVM = "CreateVM"
CopyDisks = "CopyDisks"
AllocateDisks = "AllocateDisks"
CopyingPaused = "CopyingPaused"
AddCheckpoint = "AddCheckpoint"
AddFinalCheckpoint = "AddFinalCheckpoint"
CreateSnapshot = "CreateSnapshot"
CreateInitialSnapshot = "CreateInitialSnapshot"
CreateFinalSnapshot = "CreateFinalSnapshot"
Finalize = "Finalize"
CreateGuestConversionPod = "CreateGuestConversionPod"
ConvertGuest = "ConvertGuest"
CopyDisksVirtV2V = "CopyDisksVirtV2V"
PostHook = "PostHook"
Completed = "Completed"
WaitForSnapshot = "WaitForSnapshot"
WaitForInitialSnapshot = "WaitForInitialSnapshot"
WaitForFinalSnapshot = "WaitForFinalSnapshot"
ConvertOpenstackSnapshot = "ConvertOpenstackSnapshot"
StoreSnapshotDeltas = "StoreSnapshotDeltas"
StoreInitialSnapshotDeltas = "StoreInitialSnapshotDeltas"
RemovePreviousSnapshot = "RemovePreviousSnapshot"
RemovePenultimateSnapshot = "RemovePenultimateSnapshot"
RemoveFinalSnapshot = "RemoveFinalSnapshot"
Started = "Started"
PreHook = "PreHook"
StorePowerState = "StorePowerState"
PowerOffSource = "PowerOffSource"
WaitForPowerOff = "WaitForPowerOff"
CreateDataVolumes = "CreateDataVolumes"
CreateVM = "CreateVM"
CopyDisks = "CopyDisks"
AllocateDisks = "AllocateDisks"
CopyingPaused = "CopyingPaused"
AddCheckpoint = "AddCheckpoint"
AddFinalCheckpoint = "AddFinalCheckpoint"
CreateSnapshot = "CreateSnapshot"
CreateInitialSnapshot = "CreateInitialSnapshot"
CreateFinalSnapshot = "CreateFinalSnapshot"
Finalize = "Finalize"
CreateGuestConversionPod = "CreateGuestConversionPod"
ConvertGuest = "ConvertGuest"
CopyDisksVirtV2V = "CopyDisksVirtV2V"
PostHook = "PostHook"
Completed = "Completed"
WaitForSnapshot = "WaitForSnapshot"
WaitForInitialSnapshot = "WaitForInitialSnapshot"
WaitForFinalSnapshot = "WaitForFinalSnapshot"
ConvertOpenstackSnapshot = "ConvertOpenstackSnapshot"
StoreSnapshotDeltas = "StoreSnapshotDeltas"
StoreInitialSnapshotDeltas = "StoreInitialSnapshotDeltas"
RemovePreviousSnapshot = "RemovePreviousSnapshot"
WaitForDisksConsolidation = "WaitForDisksConsolidation"
RemovePenultimateSnapshot = "RemovePenultimateSnapshot"
WaitForPenultimateDisksConsolidation = "WaitForPenultimateDisksConsolidation"
RemoveFinalSnapshot = "RemoveFinalSnapshot"
)

// Steps.
Expand Down Expand Up @@ -136,6 +138,7 @@ var (
{Name: CopyDisks},
{Name: CopyingPaused},
{Name: RemovePreviousSnapshot, All: VSphere},
{Name: WaitForDisksConsolidation, All: VSphere},
{Name: CreateSnapshot},
{Name: WaitForSnapshot},
{Name: StoreSnapshotDeltas, All: VSphere},
Expand All @@ -144,6 +147,7 @@ var (
{Name: PowerOffSource},
{Name: WaitForPowerOff},
{Name: RemovePenultimateSnapshot, All: VSphere},
{Name: WaitForPenultimateDisksConsolidation, All: VSphere},
{Name: CreateFinalSnapshot},
{Name: WaitForFinalSnapshot},
{Name: AddFinalCheckpoint},
Expand Down Expand Up @@ -661,9 +665,9 @@ func (r *Migration) step(vm *plan.VMStatus) (step string) {
step = Initialize
case AllocateDisks:
step = DiskAllocation
case CopyDisks, CopyingPaused, RemovePreviousSnapshot, CreateSnapshot, WaitForSnapshot, StoreSnapshotDeltas, AddCheckpoint, ConvertOpenstackSnapshot:
case CopyDisks, CopyingPaused, RemovePreviousSnapshot, WaitForDisksConsolidation, CreateSnapshot, WaitForSnapshot, StoreSnapshotDeltas, AddCheckpoint, ConvertOpenstackSnapshot:
step = DiskTransfer
case RemovePenultimateSnapshot, CreateFinalSnapshot, WaitForFinalSnapshot, AddFinalCheckpoint, Finalize, RemoveFinalSnapshot:
case RemovePenultimateSnapshot, WaitForPenultimateDisksConsolidation, CreateFinalSnapshot, WaitForFinalSnapshot, AddFinalCheckpoint, Finalize, RemoveFinalSnapshot:
step = Cutover
case CreateGuestConversionPod, ConvertGuest:
step = ImageConversion
Expand Down Expand Up @@ -1002,6 +1006,22 @@ func (r *Migration) execute(vm *plan.VMStatus) (err error) {
break
}
vm.Phase = r.next(vm.Phase)
case WaitForDisksConsolidation, WaitForPenultimateDisksConsolidation:
step, found := vm.FindStep(r.step(vm))
if !found {
vm.AddError(fmt.Sprintf("Step '%s' not found", r.step(vm)))
break
}
ready, err := r.provider.CheckDisksConsolidationReady(vm.Ref, r.kubevirt.loadHosts)
if err != nil {
r.Log.Error(err, "Failed to query events")
step.AddError(err.Error())
err = nil
break
}
if ready {
vm.Phase = r.next(vm.Phase)
}
case CreateInitialSnapshot, CreateSnapshot, CreateFinalSnapshot:
step, found := vm.FindStep(r.step(vm))
if !found {
Expand Down

0 comments on commit 08cfe77

Please sign in to comment.