Skip to content

Commit

Permalink
Merge pull request #7558 from qiuming-best/uploader-fast-fail
Browse files Browse the repository at this point in the history
Fix snapshot leak for backup
  • Loading branch information
qiuming-best authored Apr 1, 2024
2 parents c9b41ba + 3d5282e commit 3465e8c
Show file tree
Hide file tree
Showing 12 changed files with 131 additions and 41 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/7558-qiuming-best
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix snapshot leak for backup
4 changes: 2 additions & 2 deletions pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,8 @@ func (s *nodeAgentServer) markInProgressPVBsFailed(client ctrlclient.Client) {
}

if err := controller.UpdatePVBStatusToFailed(s.ctx, client, &pvbs.Items[i],
fmt.Sprintf("get a podvolumebackup with status %q during the server starting, mark it as %q", velerov1api.PodVolumeBackupPhaseInProgress, velerov1api.PodVolumeBackupPhaseFailed),
time.Now(), s.logger); err != nil {
fmt.Errorf("get a podvolumebackup with status %q during the server starting, mark it as %q", velerov1api.PodVolumeBackupPhaseInProgress, velerov1api.PodVolumeBackupPhaseFailed),
"", time.Now(), s.logger); err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to patch podvolumebackup %q", pvb.GetName())
continue
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/data_upload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namesp
}
}

func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace string, duName string, err error) {
func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace, duName string, err error) {
defer r.closeDataPath(ctx, duName)

log := r.logger.WithField("dataupload", duName)
Expand Down Expand Up @@ -698,6 +698,9 @@ func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *vel
du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
}

if dataPathError, ok := err.(datapath.DataPathError); ok {
du.Status.SnapshotID = dataPathError.GetSnapshotID()
}
du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if patchErr := r.client.Patch(ctx, du, client.MergeFrom(original)); patchErr != nil {
log.WithError(patchErr).Error("error updating DataUpload status")
Expand Down
13 changes: 8 additions & 5 deletions pkg/controller/pod_volume_backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}

fsBackup, err := r.dataPathMgr.CreateFileSystemBR(pvb.Name, pVBRRequestor, ctx, r.Client, pvb.Namespace, callbacks, log)

if err != nil {
if err == datapath.ConcurrentLimitExceed {
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
Expand Down Expand Up @@ -225,7 +226,7 @@ func (r *PodVolumeBackupReconciler) OnDataPathCompleted(ctx context.Context, nam
log.Info("PodVolumeBackup completed")
}

func (r *PodVolumeBackupReconciler) OnDataPathFailed(ctx context.Context, namespace string, pvbName string, err error) {
func (r *PodVolumeBackupReconciler) OnDataPathFailed(ctx context.Context, namespace, pvbName string, err error) {
defer r.closeDataPath(ctx, pvbName)

log := r.logger.WithField("pvb", pvbName)
Expand Down Expand Up @@ -348,17 +349,19 @@ func (r *PodVolumeBackupReconciler) closeDataPath(ctx context.Context, pvbName s

func (r *PodVolumeBackupReconciler) errorOut(ctx context.Context, pvb *velerov1api.PodVolumeBackup, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) {
r.closeDataPath(ctx, pvb.Name)
_ = UpdatePVBStatusToFailed(ctx, r.Client, pvb, errors.WithMessage(err, msg).Error(), r.clock.Now(), log)
_ = UpdatePVBStatusToFailed(ctx, r.Client, pvb, err, msg, r.clock.Now(), log)

return ctrl.Result{}, err
}

func UpdatePVBStatusToFailed(ctx context.Context, c client.Client, pvb *velerov1api.PodVolumeBackup, errString string, time time.Time, log logrus.FieldLogger) error {
func UpdatePVBStatusToFailed(ctx context.Context, c client.Client, pvb *velerov1api.PodVolumeBackup, errOut error, msg string, time time.Time, log logrus.FieldLogger) error {
original := pvb.DeepCopy()
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed
pvb.Status.Message = errString
pvb.Status.CompletionTimestamp = &metav1.Time{Time: time}

if dataPathError, ok := errOut.(datapath.DataPathError); ok {
pvb.Status.SnapshotID = dataPathError.GetSnapshotID()
}
pvb.Status.Message = errors.WithMessage(errOut, msg).Error()
err := c.Patch(ctx, pvb, client.MergeFrom(original))
if err != nil {
log.WithError(err).Error("error updating PodVolumeBackup status")
Expand Down
33 changes: 33 additions & 0 deletions pkg/datapath/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package datapath

// DataPathError represents an error that occurred during a backup or restore operation
type DataPathError struct {
snapshotID string
err error
}

// Error implements error.
func (e DataPathError) Error() string {
return e.err.Error()
}

// GetSnapshotID returns the snapshot ID for the error.
func (e DataPathError) GetSnapshotID() string {
return e.snapshotID
}
45 changes: 45 additions & 0 deletions pkg/datapath/error_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package datapath

import (
"errors"
"testing"
)

func TestGetSnapshotID(t *testing.T) {
// Create a DataPathError instance for testing
err := DataPathError{snapshotID: "123", err: errors.New("example error")}
// Call the GetSnapshotID method to retrieve the snapshot ID
snapshotID := err.GetSnapshotID()
// Check if the retrieved snapshot ID matches the expected value
if snapshotID != "123" {
t.Errorf("GetSnapshotID() returned unexpected snapshot ID: got %s, want %s", snapshotID, "123")
}
}

func TestError(t *testing.T) {
// Create a DataPathError instance for testing
err := DataPathError{snapshotID: "123", err: errors.New("example error")}
// Call the Error method to retrieve the error message
errMsg := err.Error()
// Check if the retrieved error message matches the expected value
expectedErrMsg := "example error"
if errMsg != expectedErrMsg {
t.Errorf("Error() returned unexpected error message: got %s, want %s", errMsg, expectedErrMsg)
}
}
12 changes: 10 additions & 2 deletions pkg/datapath/file_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,11 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, realSource string, paren
if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
} else if err != nil {
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, err)
dataPathErr := DataPathError{
snapshotID: snapshotID,
err: err,
}
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr)
} else {
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source}})
}
Expand All @@ -161,7 +165,11 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint, uplo
if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
} else if err != nil {
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, err)
dataPathErr := DataPathError{
snapshotID: snapshotID,
err: err,
}
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr)
} else {
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target}})
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/datapath/file_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestAsyncBackup(t *testing.T) {
var asyncErr error
var asyncResult Result
finish := make(chan struct{})

var failErr = errors.New("fake-fail-error")
tests := []struct {
name string
uploaderProv provider.Provider
Expand All @@ -49,12 +49,12 @@ func TestAsyncBackup(t *testing.T) {
OnCompleted: nil,
OnCancelled: nil,
OnFailed: func(ctx context.Context, namespace string, job string, err error) {
asyncErr = err
asyncErr = failErr
asyncResult = Result{}
finish <- struct{}{}
},
},
err: errors.New("fake-error"),
err: failErr,
},
{
name: "async backup cancel",
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestAsyncRestore(t *testing.T) {
var asyncErr error
var asyncResult Result
finish := make(chan struct{})

var failErr = errors.New("fake-fail-error")
tests := []struct {
name string
uploaderProv provider.Provider
Expand All @@ -133,12 +133,12 @@ func TestAsyncRestore(t *testing.T) {
OnCompleted: nil,
OnCancelled: nil,
OnFailed: func(ctx context.Context, namespace string, job string, err error) {
asyncErr = err
asyncErr = failErr
asyncResult = Result{}
finish <- struct{}{}
},
},
err: errors.New("fake-error"),
err: failErr,
},
{
name: "async restore cancel",
Expand Down
14 changes: 8 additions & 6 deletions pkg/uploader/kopia/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var filesystemEntryFunc = snapshotfs.FilesystemEntryFromIDWithPath
var restoreEntryFunc = restore.Entry

const UploaderConfigMultipartKey = "uploader-multipart"
const MaxErrorReported = 10

// SnapshotUploader which mainly used for UT test that could overwrite Upload interface
type SnapshotUploader interface {
Expand Down Expand Up @@ -182,17 +183,14 @@ func Backup(ctx context.Context, fsUploader SnapshotUploader, repoWriter repo.Re
}

kopiaCtx := kopia.SetupKopiaLog(ctx, log)
snapID, snapshotSize, err := SnapshotSource(kopiaCtx, repoWriter, fsUploader, sourceInfo, sourceEntry, forceFull, parentSnapshot, tags, uploaderCfg, log, "Kopia Uploader")
if err != nil {
return nil, false, err
}

snapID, snapshotSize, err := SnapshotSource(kopiaCtx, repoWriter, fsUploader, sourceInfo, sourceEntry, forceFull, parentSnapshot, tags, uploaderCfg, log, "Kopia Uploader")
snapshotInfo := &uploader.SnapshotInfo{
ID: snapID,
Size: snapshotSize,
}

return snapshotInfo, false, nil
return snapshotInfo, false, err
}

func getLocalFSEntry(path0 string) (fs.Entry, error) {
Expand Down Expand Up @@ -307,6 +305,10 @@ func reportSnapshotStatus(manifest *snapshot.Manifest, policyTree *policy.Tree)
var errs []string
if ds := manifest.RootEntry.DirSummary; ds != nil {
for _, ent := range ds.FailedEntries {
if len(errs) > MaxErrorReported {
errs = append(errs, "too many errors, ignored...")
break
}
policy := policyTree.EffectivePolicy()
if !(policy != nil && bool(*policy.ErrorHandlingPolicy.IgnoreUnknownTypes) && strings.Contains(ent.Error, fs.ErrUnknown.Error())) {
errs = append(errs, fmt.Sprintf("Error when processing %v: %v", ent.EntryPath, ent.Error))
Expand All @@ -315,7 +317,7 @@ func reportSnapshotStatus(manifest *snapshot.Manifest, policyTree *policy.Tree)
}

if len(errs) != 0 {
return "", 0, errors.New(strings.Join(errs, "\n"))
return string(manifestID), snapSize, errors.New(strings.Join(errs, "\n"))
}

return string(manifestID), snapSize, nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/uploader/kopia/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,8 @@ func TestReportSnapshotStatus(t *testing.T) {
},
{
shouldError: true,
expectedResult: "",
expectedSize: 0,
expectedResult: "sample-manifest-id",
expectedSize: 1024,
directorySummary: &fs.DirectorySummary{
FailedEntries: []*fs.EntryWithError{
{
Expand Down
22 changes: 12 additions & 10 deletions pkg/uploader/provider/kopia.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func (kp *kopiaProvider) RunBackup(
progress.Updater = updater
progress.Log = log
kpUploader.Progress = progress
kpUploader.FailFast = true
quit := make(chan struct{})
log.Info("Starting backup")
go kp.CheckContext(ctx, quit, nil, kpUploader)
Expand All @@ -167,19 +168,20 @@ func (kp *kopiaProvider) RunBackup(
uploaderCfg[kopia.UploaderConfigMultipartKey] = "true"
}

snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, volMode, uploaderCfg, tags, log)
snapshotInfo, _, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, volMode, uploaderCfg, tags, log)
if err != nil {
if kpUploader.IsCanceled() {
log.Error("Kopia backup is canceled")
return "", false, ErrorCanceled
snapshotID := ""
if snapshotInfo != nil {
snapshotID = snapshotInfo.ID
} else {
return "", false, errors.Wrapf(err, "Failed to run kopia backup")
log.Infof("Kopia backup failed with %v and get empty snapshot ID", err)
}

if kpUploader.IsCanceled() {
log.Warn("Kopia backup is canceled")
return snapshotID, false, ErrorCanceled
}
} else if isSnapshotEmpty {
log.Debugf("Kopia backup got empty dir with path %s", path)
return "", true, nil
} else if snapshotInfo == nil {
return "", false, fmt.Errorf("failed to get kopia backup snapshot info for path %v", path)
return snapshotID, false, errors.Wrapf(err, "Failed to run kopia backup")
}

// which ensure that the statistic data of TotalBytes equal to BytesDone when finished
Expand Down
7 changes: 0 additions & 7 deletions pkg/uploader/provider/kopia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,6 @@ func TestRunBackup(t *testing.T) {
},
notError: false,
},
{
name: "got empty snapshot",
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return nil, true, errors.New("snapshot is empty")
},
notError: false,
},
{
name: "success to backup block mode volume",
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
Expand Down

0 comments on commit 3465e8c

Please sign in to comment.