[WIP] MTV-1699 Avoid potentially exceeding memory limit in multi-stage VDDK imports. #3559

Draft: wants to merge 5 commits into base: main

Changes from all commits
225 changes: 120 additions & 105 deletions pkg/importer/vddk-datasource_amd64.go
@@ -807,8 +807,9 @@ func logOnError(err error) {

// VDDKDataSource is the data provider for vddk.
type VDDKDataSource struct {
VMware *VMwareClient
BackingFile string
NbdKit *NbdKitWrapper
ChangedBlocks *types.DiskChangeInfo
CurrentSnapshot string
PreviousSnapshot string
Size uint64
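The struct now keeps the VMware client and the backing file path instead of a pre-computed ChangedBlocks list, so each warm-migration stage can query its own changed areas inside TransferFile rather than holding every changed block in memory for the lifetime of the data source. As a rough sketch of how the checkpoint fields drive the staging decisions through the IsWarm and IsDeltaCopy helpers later in this diff (the snapshot names are made up):

```go
// Illustrative only: the first warm stage is a full copy, later stages apply deltas.
first := &VDDKDataSource{CurrentSnapshot: "snapshot-1"}
later := &VDDKDataSource{CurrentSnapshot: "snapshot-2", PreviousSnapshot: "snapshot-1"}
fmt.Println(first.IsWarm(), first.IsDeltaCopy()) // true false
fmt.Println(later.IsWarm(), later.IsDeltaCopy()) // true true
```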
@@ -835,13 +836,12 @@ func createVddkDataSource(endpoint string, accessKey string, secKey string, thum
return nil, errors.New("previous checkpoint set without current")
}

// Log in to VMware, and get everything needed up front
// Log in to VMware to make sure disks and snapshots are present
vmware, err := newVMwareClient(endpoint, accessKey, secKey, thumbprint, uuid)
if err != nil {
klog.Errorf("Unable to log in to VMware: %v", err)
return nil, err
}
defer func() { _ = vmware.Close() }()

// Find disk object for backingFile disk image path
backingFileObject, err := vmware.FindDiskFromName(backingFile)
@@ -860,80 +860,6 @@ func createVddkDataSource(endpoint string, accessKey string, secKey string, thum
}
}

// If this is a warm migration (current and previous checkpoints set),
// then get the list of changed blocks from VMware for a delta copy.
var changed *types.DiskChangeInfo
if currentSnapshot != nil && previousCheckpoint != "" {
disk, err := vmware.FindSnapshotDisk(currentSnapshot, backingFileObject.DiskObjectId)
if err != nil {
klog.Errorf("Could not find matching disk in current snapshot: %v", err)
return nil, err
}
// QueryChangedDiskAreas needs to be called multiple times to get all possible disk changes.
// Experimentation shows it returns maximally 2000 changed blocks. If the disk has more than
// 2000 changed blocks we need to query the next chunk of the blocks starting from previous.
// Loop until QueryChangedDiskAreas starts returning zero-length block lists.
changed = &types.DiskChangeInfo{}
// Check if this is a snapshot or a change ID, and query disk areas as appropriate.
// Change IDs look like: 52 de c0 d9 b9 43 9d 10-61 d5 4c 1b e9 7b 65 63/81
changeIDPattern := `([0-9a-fA-F]{2}\s?)*-([0-9a-fA-F]{2}\s?)*\/([0-9a-fA-F]*)`
if matched, _ := regexp.MatchString(changeIDPattern, previousCheckpoint); matched {
for {
klog.Infof("Querying changed disk areas at offset %d", changed.Length)
request := types.QueryChangedDiskAreas{
ChangeId: previousCheckpoint,
DeviceKey: backingFileObject.Key,
Snapshot: currentSnapshot,
StartOffset: changed.Length,
This: vmware.vm.Reference(),
}
response, err := QueryChangedDiskAreas(vmware.context, vmware.vm.Client(), &request)
if err != nil {
klog.Errorf("Failed to query changed areas: %s", err)
return nil, err
}
klog.Infof("%d changed areas reported at offset %d with data length %d", len(response.Returnval.ChangedArea), changed.Length, response.Returnval.Length)
if len(response.Returnval.ChangedArea) == 0 { // No more changes
break
}
changed.ChangedArea = append(changed.ChangedArea, response.Returnval.ChangedArea...)
changed.Length += response.Returnval.Length
// The start offset should not be the size of the disk otherwise the QueryChangedDiskAreas will fail
if changed.Length >= disk.CapacityInBytes {
klog.Infof("the offset %d is greater or equal to disk capacity %d", changed.Length, disk.CapacityInBytes)
break
}
}
} else { // Previous checkpoint is a snapshot
previousSnapshot, err := vmware.vm.FindSnapshot(vmware.context, previousCheckpoint)
if err != nil {
klog.Errorf("Could not find previous snapshot %s: %v", previousCheckpoint, err)
return nil, err
}
if previousSnapshot != nil {
for {
klog.Infof("Querying changed disk areas at offset %d", changed.Length)
changedAreas, err := vmware.vm.QueryChangedDiskAreas(vmware.context, previousSnapshot, currentSnapshot, backingFileObject, changed.Length)
if err != nil {
klog.Errorf("Unable to query changed areas: %s", err)
return nil, err
}
klog.Infof("%d changed areas reported at offset %d with data length %d", len(changedAreas.ChangedArea), changed.Length, changedAreas.Length)
if len(changedAreas.ChangedArea) == 0 {
break
}
changed.ChangedArea = append(changed.ChangedArea, changedAreas.ChangedArea...)
changed.Length += changedAreas.Length
// The start offset should not be the size of the disk otherwise the QueryChangedDiskAreas will fail
if changed.Length >= disk.CapacityInBytes {
klog.Infof("the offset %d is greater or equal to disk capacity %d", changed.Length, disk.CapacityInBytes)
break
}
}
}
}
}

diskFileName := backingFile // By default, just set the nbdkit file name to the given backingFile path
if currentSnapshot != nil {
// When copying from a snapshot, set the nbdkit file name to the name of the disk in the snapshot
@@ -953,17 +879,10 @@ func createVddkDataSource(endpoint string, accessKey string, secKey string, thum

// Get the total transfer size of either the disk or the delta
var size uint64
if changed != nil { // Warm migration: get size of the delta
size = 0
for _, change := range changed.ChangedArea {
size += uint64(change.Length)
}
} else { // Cold migration: get size of the whole disk
size, err = nbdkit.Handle.GetSize()
if err != nil {
klog.Errorf("Unable to get source disk size: %v", err)
return nil, err
}
size, err = nbdkit.Handle.GetSize()
if err != nil {
klog.Errorf("Unable to get source disk size: %v", err)
return nil, err
}

MaxPreadLength = MaxPreadLengthESX
@@ -973,8 +892,9 @@ func createVddkDataSource(endpoint string, accessKey string, secKey string, thum
}

source := &VDDKDataSource{
VMware: vmware,
BackingFile: backingFile,
NbdKit: nbdkit,
ChangedBlocks: changed,
CurrentSnapshot: currentCheckpoint,
PreviousSnapshot: previousCheckpoint,
Size: size,
@@ -1023,29 +943,33 @@ func (vs *VDDKDataSource) Transfer(path string) (ProcessingPhase, error) {
return ProcessingPhaseTransferDataFile, nil
}

// IsWarm returns true if this is a multi-stage transfer.
func (vs *VDDKDataSource) IsWarm() bool {
return vs.CurrentSnapshot != ""
}

// IsDeltaCopy is called to determine if this is a full copy or one delta copy stage
// in a warm migration.
// in a warm migration. This is different from IsWarm because the first step is
// a full copy, and subsequent steps are delta copies.
func (vs *VDDKDataSource) IsDeltaCopy() bool {
result := vs.PreviousSnapshot != "" && vs.CurrentSnapshot != ""
return result
}

// MockableStat is a mockable os.Stat, so unit tests can run through TransferFile
var MockableStat = os.Stat

// TransferFile is called to transfer the data from the source to the file passed in.
func (vs *VDDKDataSource) TransferFile(fileName string) (ProcessingPhase, error) {
if !vs.IsDeltaCopy() {
defer func() { _ = vs.VMware.Close() }()

if !vs.IsWarm() {
if err := CleanAll(fileName); err != nil {
return ProcessingPhaseError, err
}
}

if vs.ChangedBlocks != nil { // Warm migration pre-checks
if len(vs.ChangedBlocks.ChangedArea) < 1 { // No changes? Immediately return success.
klog.Infof("No changes reported between snapshot %s and snapshot %s, marking transfer complete.", vs.PreviousSnapshot, vs.CurrentSnapshot)
return ProcessingPhaseComplete, nil
}

// Make sure file exists before applying deltas.
_, err := os.Stat(fileName)
_, err := MockableStat(fileName)
if os.IsNotExist(err) {
klog.Infof("Disk image does not exist, cannot apply deltas for warm migration: %v", err)
return ProcessingPhaseError, err
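The MockableStat indirection above lets unit tests drive TransferFile without a real disk image on disk; a minimal sketch of how a test might use it (the test name and assertions are illustrative, not part of this change):

```go
func TestTransferFileMissingImage(t *testing.T) {
	// Make the destination image appear absent, then restore the real stat afterwards.
	orig := MockableStat
	MockableStat = func(name string) (os.FileInfo, error) { return nil, os.ErrNotExist }
	defer func() { MockableStat = orig }()

	// A delta-copy stage with no image on disk should fail early instead of
	// applying deltas (hypothetical source construction and assertion elided):
	// phase, err := source.TransferFile("/no/such/image")
	// expect phase == ProcessingPhaseError and err != nil
}
```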
@@ -1097,15 +1021,106 @@ func (vs *VDDKDataSource) TransferFile(fileName string) (ProcessingPhase, error)
}
}

if vs.ChangedBlocks != nil { // Warm migration delta copy
for _, extent := range vs.ChangedBlocks.ChangedArea {
blocks := GetBlockStatus(vs.NbdKit.Handle, extent)
for _, block := range blocks {
err := CopyRange(vs.NbdKit.Handle, sink, block, updateProgress)
if vs.IsDeltaCopy() { // Warm migration delta copy
// Find disk object for backingFile disk image path
backingFileObject, err := vs.VMware.FindDiskFromName(vs.BackingFile)
if err != nil {
klog.Errorf("Could not find VM disk %s: %v", vs.BackingFile, err)
return ProcessingPhaseError, err
}

// Find current snapshot object if requested
var currentSnapshot *types.ManagedObjectReference
if vs.CurrentSnapshot != "" {
currentSnapshot, err = vs.VMware.vm.FindSnapshot(vs.VMware.context, vs.CurrentSnapshot)
if err != nil {
klog.Errorf("Could not find current snapshot %s: %v", vs.CurrentSnapshot, err)
return ProcessingPhaseError, err
}
}

disk, err := vs.VMware.FindSnapshotDisk(currentSnapshot, backingFileObject.DiskObjectId)
if err != nil {
klog.Errorf("Could not find matching disk in current snapshot: %v", err)
return ProcessingPhaseError, err
}

// Check if this is a snapshot or a change ID, and query disk areas as appropriate.
// Change IDs look like: 52 de c0 d9 b9 43 9d 10-61 d5 4c 1b e9 7b 65 63/81
changeIDPattern := `([0-9a-fA-F]{2}\s?)*-([0-9a-fA-F]{2}\s?)*\/([0-9a-fA-F]*)`
isChangeID, _ := regexp.MatchString(changeIDPattern, vs.PreviousSnapshot)
var changed types.DiskChangeInfo
var previousSnapshot *types.ManagedObjectReference
if !isChangeID {
previousSnapshot, err = vs.VMware.vm.FindSnapshot(vs.VMware.context, vs.PreviousSnapshot)
if err != nil {
klog.Errorf("Could not find previous snapshot %s: %v", vs.PreviousSnapshot, err)
return ProcessingPhaseError, err
}
if previousSnapshot == nil {
return ProcessingPhaseError, fmt.Errorf("failed to find previous snapshot %s", vs.PreviousSnapshot)
}
}

// QueryChangedDiskAreas needs to be called multiple times to get all possible disk changes.
// Experimentation shows it returns at most 2000 changed blocks per call. If the disk has more
// than 2000 changed blocks, query the next chunk starting from the previous offset.
// Loop until QueryChangedDiskAreas returns a zero-length block list.
for {
klog.Infof("Querying changed disk areas at offset %d", changed.Length)
if isChangeID { // Previous checkpoint is a change ID
request := types.QueryChangedDiskAreas{
ChangeId: vs.PreviousSnapshot,
DeviceKey: backingFileObject.Key,
Snapshot: currentSnapshot,
StartOffset: changed.Length,
This: vs.VMware.vm.Reference(),
}
response, err := QueryChangedDiskAreas(vs.VMware.context, vs.VMware.vm.Client(), &request)
if err != nil {
klog.Errorf("Unable to copy block at offset %d: %v", block.Offset, err)
klog.Errorf("Failed to query changed areas: %s", err)
return ProcessingPhaseError, err
}
klog.Infof("%d changed areas reported at offset %d with data length %d", len(response.Returnval.ChangedArea), changed.Length, response.Returnval.Length)
if len(response.Returnval.ChangedArea) == 0 { // No more changes
break
}
changed.ChangedArea = append(changed.ChangedArea, response.Returnval.ChangedArea...)
changed.Length += response.Returnval.Length
} else { // Previous checkpoint is a snapshot
changedAreas, err := vs.VMware.vm.QueryChangedDiskAreas(vs.VMware.context, previousSnapshot, currentSnapshot, backingFileObject, changed.Length)
if err != nil {
klog.Errorf("Unable to query changed areas: %s", err)
return ProcessingPhaseError, err
}
klog.Infof("%d changed areas reported at offset %d with data length %d", len(changedAreas.ChangedArea), changed.Length, changedAreas.Length)
if len(changedAreas.ChangedArea) == 0 {
break
}
changed.ChangedArea = append(changed.ChangedArea, changedAreas.ChangedArea...)
changed.Length += changedAreas.Length
}

// No changes? Immediately return success.
if len(changed.ChangedArea) < 1 {
klog.Infof("No changes reported between snapshot %s and snapshot %s, marking transfer complete.", vs.PreviousSnapshot, vs.CurrentSnapshot)
return ProcessingPhaseComplete, nil
}
// The start offset must not reach the size of the disk, otherwise QueryChangedDiskAreas will fail.
if changed.Length >= disk.CapacityInBytes {
klog.Infof("Offset %d is greater than or equal to disk capacity %d", changed.Length, disk.CapacityInBytes)
break
}
// Copy actual data from query ranges to destination
for _, extent := range changed.ChangedArea {
blocks := GetBlockStatus(vs.NbdKit.Handle, extent)
for _, block := range blocks {
err := CopyRange(vs.NbdKit.Handle, sink, block, updateProgress)
if err != nil {
klog.Errorf("Unable to copy block at offset %d: %v", block.Offset, err)
return ProcessingPhaseError, err
}
}
}
}
} else { // Cold migration full copy
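For reference, the changeIDPattern above is what decides whether the previous checkpoint is treated as a VMware change ID or as a snapshot reference; a small standalone check (the snapshot name is only an example):

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	changeIDPattern := `([0-9a-fA-F]{2}\s?)*-([0-9a-fA-F]{2}\s?)*\/([0-9a-fA-F]*)`
	re := regexp.MustCompile(changeIDPattern)
	// Matches a change ID like the one quoted in the code comment.
	fmt.Println(re.MatchString("52 de c0 d9 b9 43 9d 10-61 d5 4c 1b e9 7b 65 63/81")) // true
	// A plain snapshot reference has no "/sequence" suffix, so it does not match.
	fmt.Println(re.MatchString("snapshot-105")) // false
}
```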