diff --git a/cmd/backup-manager/app/util/backup_size.go b/cmd/backup-manager/app/util/backup_size.go index 5489e2d590..65c5b287de 100644 --- a/cmd/backup-manager/app/util/backup_size.go +++ b/cmd/backup-manager/app/util/backup_size.go @@ -23,6 +23,7 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/service/ebs" "github.com/aws/aws-sdk-go/service/ec2" "github.com/dustin/go-humanize" @@ -195,14 +196,14 @@ func calcBackupSize(ctx context.Context, volumes map[string]string, level string return } -// calculateSnapshotSize calculate size of an snapshot in bytes by listing its blocks. +// calculateSnapshotSize calculate size of a snapshot in bytes by listing its blocks. func calculateSnapshotSize(volumeId, snapshotId string) (uint64, uint64, error) { var snapshotSize uint64 var numApiReq uint64 start := time.Now() - klog.Infof("start to calculate snapshot size for %s, volume id %s", + klog.Infof("start to calculate size for snapshot %s, volume %s", snapshotId, volumeId) ebsSession, err := util.NewEBSSession(util.CloudAPIConcurrency) @@ -213,24 +214,51 @@ func calculateSnapshotSize(volumeId, snapshotId string) (uint64, uint64, error) var nextToken *string for { - resp, err := ebsSession.EBS.ListSnapshotBlocks(&ebs.ListSnapshotBlocksInput{ - SnapshotId: aws.String(snapshotId), - MaxResults: aws.Int64(ListSnapMaxReturnResult), - NextToken: nextToken, - }) - numApiReq += 1 - if err != nil { - return 0, numApiReq, err + // Each retry interval is around 1 second, and no more than 60 times retry (~1 minute) + backoff := wait.Backoff{ + Duration: time.Second, + Steps: 60, + Factor: 1.0, + Jitter: 0.1, } - if resp.BlockSize != nil { - snapshotSize += uint64(len(resp.Blocks)) * uint64(*resp.BlockSize) + + isAllListed := false + + listBlocks := func() error { + resp, err := ebsSession.EBS.ListSnapshotBlocks(&ebs.ListSnapshotBlocksInput{ + SnapshotId: aws.String(snapshotId), + MaxResults: aws.Int64(ListSnapMaxReturnResult), + NextToken: nextToken, + }) + numApiReq += 1 + if err != nil { + return err + } + if resp.BlockSize != nil { + snapshotSize += uint64(len(resp.Blocks)) * uint64(*resp.BlockSize) + } + + // check if there is more to retrieve + if resp.NextToken == nil { + isAllListed = true + } + nextToken = resp.NextToken + + return nil } - // check if there is more to retrieve - if resp.NextToken == nil { + isRetry := func(err error) bool { + return request.IsErrorThrottle(err) + } + err = retry.OnError(backoff, isRetry, listBlocks) + + if err != nil { + return 0, numApiReq, errors.Annotatef(err, "ListSnapshotBlocks() failed against snapshot id %s, volume id %s", snapshotId, volumeId) + } + + if isAllListed { break } - nextToken = resp.NextToken } elapsed := time.Since(start) @@ -261,30 +289,57 @@ func calculateChangedBlocksSize(volumeId, preSnapshotId, snapshotId string) (uin var nextToken *string for { - resp, err := ebsSession.EBS.ListChangedBlocks(&ebs.ListChangedBlocksInput{ - FirstSnapshotId: aws.String(preSnapshotId), - MaxResults: aws.Int64(ListSnapMaxReturnResult), - SecondSnapshotId: aws.String(snapshotId), - NextToken: nextToken, - }) - numApiReq += 1 - if err != nil { - return 0, numApiReq, err + // Each retry interval is around 1 second, and no more than 60 times retry (~1 minute) + backoff := wait.Backoff{ + Duration: time.Second, + Steps: 60, + Factor: 1.0, + Jitter: 0.1, } - numBlocks += len(resp.ChangedBlocks) - // retrieve only changed block and blocks only existed in current snapshot (new add blocks) - for _, block := range resp.ChangedBlocks { - if block.SecondBlockToken != nil && resp.BlockSize != nil { - snapshotSize += uint64(*resp.BlockSize) + isAllChangeListed := false + + listChangeBlocks := func() error { + resp, err := ebsSession.EBS.ListChangedBlocks(&ebs.ListChangedBlocksInput{ + FirstSnapshotId: aws.String(preSnapshotId), + MaxResults: aws.Int64(ListSnapMaxReturnResult), + SecondSnapshotId: aws.String(snapshotId), + NextToken: nextToken, + }) + numApiReq += 1 + if err != nil { + return err + } + numBlocks += len(resp.ChangedBlocks) + + // retrieve only changed block and blocks only existed in current snapshot (new add blocks) + for _, block := range resp.ChangedBlocks { + if block.SecondBlockToken != nil && resp.BlockSize != nil { + snapshotSize += uint64(*resp.BlockSize) + } + } + + // check if there is more to retrieve + if resp.NextToken == nil { + isAllChangeListed = true } + nextToken = resp.NextToken + + return nil } - // check if there is more to retrieve - if resp.NextToken == nil { + isRetry := func(err error) bool { + return request.IsErrorThrottle(err) + } + err = retry.OnError(backoff, isRetry, listChangeBlocks) + + if err != nil { + return 0, numApiReq, errors.Annotatef(err, "ListChangedBlocks() failed against volume id %s, preSnapshot id %s, snapshot id %s", volumeId, preSnapshotId, snapshotId) + } + + if isAllChangeListed { break } - nextToken = resp.NextToken } elapsed := time.Since(start)