Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ebs br: add retry support for ListSnapshotBlocks() and ListChangedBlocks() #5232

Merged
merged 3 commits into from
Sep 27, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 87 additions & 32 deletions cmd/backup-manager/app/util/backup_size.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/ebs"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/dustin/go-humanize"
Expand Down Expand Up @@ -195,14 +196,14 @@ func calcBackupSize(ctx context.Context, volumes map[string]string, level string
return
}

// calculateSnapshotSize calculate size of an snapshot in bytes by listing its blocks.
// calculateSnapshotSize calculate size of a snapshot in bytes by listing its blocks.
func calculateSnapshotSize(volumeId, snapshotId string) (uint64, uint64, error) {
var snapshotSize uint64
var numApiReq uint64

start := time.Now()

klog.Infof("start to calculate snapshot size for %s, volume id %s",
klog.Infof("start to calculate size for snapshot %s, volume %s",
snapshotId, volumeId)

ebsSession, err := util.NewEBSSession(util.CloudAPIConcurrency)
Expand All @@ -213,24 +214,51 @@ func calculateSnapshotSize(volumeId, snapshotId string) (uint64, uint64, error)

var nextToken *string
for {
resp, err := ebsSession.EBS.ListSnapshotBlocks(&ebs.ListSnapshotBlocksInput{
SnapshotId: aws.String(snapshotId),
MaxResults: aws.Int64(ListSnapMaxReturnResult),
NextToken: nextToken,
})
numApiReq += 1
if err != nil {
return 0, numApiReq, err
// Each retry interval is around 1 second, and no more than 60 times retry (~1 minute)
backoff := wait.Backoff{
Duration: time.Second,
Steps: 60,
Factor: 1.0,
Jitter: 0.1,
}
if resp.BlockSize != nil {
snapshotSize += uint64(len(resp.Blocks)) * uint64(*resp.BlockSize)

isAllListed := false

listBlocks := func() error {
resp, err := ebsSession.EBS.ListSnapshotBlocks(&ebs.ListSnapshotBlocksInput{
SnapshotId: aws.String(snapshotId),
MaxResults: aws.Int64(ListSnapMaxReturnResult),
NextToken: nextToken,
})
numApiReq += 1
if err != nil {
return err
}
if resp.BlockSize != nil {
snapshotSize += uint64(len(resp.Blocks)) * uint64(*resp.BlockSize)
}

// check if there is more to retrieve
if resp.NextToken == nil {
isAllListed = true
}
nextToken = resp.NextToken

return nil
}

// check if there is more to retrieve
if resp.NextToken == nil {
isRetry := func(err error) bool {
BornChanger marked this conversation as resolved.
Show resolved Hide resolved
return request.IsErrorThrottle(err)
}
err = retry.OnError(backoff, isRetry, listBlocks)

if err != nil {
return 0, numApiReq, errors.Annotatef(err, "ListSnapshotBlocks() failed against snapshot id %s, volume id %s", snapshotId, volumeId)
}

if isAllListed {
break
}
nextToken = resp.NextToken
}

elapsed := time.Since(start)
Expand Down Expand Up @@ -261,30 +289,57 @@ func calculateChangedBlocksSize(volumeId, preSnapshotId, snapshotId string) (uin
var nextToken *string

for {
resp, err := ebsSession.EBS.ListChangedBlocks(&ebs.ListChangedBlocksInput{
FirstSnapshotId: aws.String(preSnapshotId),
MaxResults: aws.Int64(ListSnapMaxReturnResult),
SecondSnapshotId: aws.String(snapshotId),
NextToken: nextToken,
})
numApiReq += 1
if err != nil {
return 0, numApiReq, err
// Each retry interval is around 1 second, and no more than 60 times retry (~1 minute)
backoff := wait.Backoff{
Duration: time.Second,
Steps: 60,
Factor: 1.0,
Jitter: 0.1,
}
numBlocks += len(resp.ChangedBlocks)

// retrieve only changed block and blocks only existed in current snapshot (new add blocks)
for _, block := range resp.ChangedBlocks {
if block.SecondBlockToken != nil && resp.BlockSize != nil {
snapshotSize += uint64(*resp.BlockSize)
isAllChangeListed := false

listChangeBlocks := func() error {
resp, err := ebsSession.EBS.ListChangedBlocks(&ebs.ListChangedBlocksInput{
FirstSnapshotId: aws.String(preSnapshotId),
MaxResults: aws.Int64(ListSnapMaxReturnResult),
SecondSnapshotId: aws.String(snapshotId),
NextToken: nextToken,
})
numApiReq += 1
if err != nil {
return err
}
numBlocks += len(resp.ChangedBlocks)

// retrieve only changed block and blocks only existed in current snapshot (new add blocks)
for _, block := range resp.ChangedBlocks {
if block.SecondBlockToken != nil && resp.BlockSize != nil {
snapshotSize += uint64(*resp.BlockSize)
}
}

// check if there is more to retrieve
if resp.NextToken == nil {
isAllChangeListed = true
}
nextToken = resp.NextToken

return nil
}

// check if there is more to retrieve
if resp.NextToken == nil {
isRetry := func(err error) bool {
return request.IsErrorThrottle(err)
}
err = retry.OnError(backoff, isRetry, listChangeBlocks)

if err != nil {
return 0, numApiReq, errors.Annotatef(err, "ListChangedBlocks() failed against volume id %s, preSnapshot id %s, snapshot id %s", volumeId, preSnapshotId, snapshotId)
}

if isAllChangeListed {
break
}
nextToken = resp.NextToken
}

elapsed := time.Since(start)
Expand Down
Loading