Skip to content

Commit

Permalink
feat(restore): control rate limit during download (#4070)
Browse files Browse the repository at this point in the history
* refactor(backup): extract DCLimit to backupspec

As we want to add rate limit to restore, we should
reuse the code already present in the backup pkg.
This commit also makes functions generic.

* feat(scyllaclient): add RcloneGetBandwidthLimit

It's useful for testing purposes.

* feat(restore): add RateLimit to Target

This is the first step to control rate limit in the context of restore.

* feat(scyllaclient): set rate limit in RcloneCopyPaths

This commit requires changes in the usage of RcloneCopyPaths in restore pkg.

* feat(command/restore): add --rate-limit flag

This commit allows user to control rclone server rate limit used during restore.

* feat(scyllaclient): set rate limit in rcloneMoveOrCopyDir

This commit requires changes in the usage of RcloneMoveDir in backup pkg.

* feat(restore_test): extend TestRestoreTablesPreparationIntegration with rate limit

This way this test also checks rate limit before and after backup.
It also checks transfers before, in the middle, when paused,
when resumed, and after restore.

This commit also extends the test to change transfers
and rate limit values when restore is paused, so that
it validates that they are correctly re-set during
restore data stage.
  • Loading branch information
Michal-Leszczynski authored Oct 15, 2024
1 parent 2099e2e commit 8af93cf
Show file tree
Hide file tree
Showing 23 changed files with 335 additions and 215 deletions.
7 changes: 7 additions & 0 deletions docs/source/sctool/partials/sctool_restore.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ options:
usage: |
The maximum number of Scylla restore jobs that can be run at the same time (on different SSTables).
Each node can take part in at most one restore at any given moment.
- name: rate-limit
default_value: '[]'
usage: |
Limits the download rate (as expressed in megabytes (MiB) per second) at which sstables can be downloaded from backup location to Scylla nodes.
You can set limits for more than one DC using a comma-separated list expressed in the format `[<dc>:]<limit>`.
The <dc>: part is optional and is only needed when different datacenters require different download limits.
Set to 0 for no limit (default 0).
- name: restore-schema
default_value: "false"
usage: |
Expand Down
7 changes: 7 additions & 0 deletions docs/source/sctool/partials/sctool_restore_update.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ options:
usage: |
The maximum number of Scylla restore jobs that can be run at the same time (on different SSTables).
Each node can take part in at most one restore at any given moment.
- name: rate-limit
default_value: '[]'
usage: |
Limits the download rate (as expressed in megabytes (MiB) per second) at which sstables can be downloaded from backup location to Scylla nodes.
You can set limits for more than one DC using a comma-separated list expressed in the format `[<dc>:]<limit>`.
The <dc>: part is optional and is only needed when different datacenters require different download limits.
Set to 0 for no limit (default 0).
- name: restore-schema
default_value: "false"
usage: |
Expand Down
6 changes: 6 additions & 0 deletions pkg/command/restore/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type command struct {
batchSize int
parallel int
transfers int
rateLimit []string
allowCompaction bool
restoreSchema bool
restoreTables bool
Expand Down Expand Up @@ -81,6 +82,7 @@ func (cmd *command) init() {
w.Unwrap().IntVar(&cmd.batchSize, "batch-size", 2, "")
w.Unwrap().IntVar(&cmd.parallel, "parallel", 0, "")
w.Unwrap().IntVar(&cmd.transfers, "transfers", 0, "")
w.Unwrap().StringSliceVar(&cmd.rateLimit, "rate-limit", nil, "")
w.Unwrap().BoolVar(&cmd.allowCompaction, "allow-compaction", false, "")
w.Unwrap().BoolVar(&cmd.restoreSchema, "restore-schema", false, "")
w.Unwrap().BoolVar(&cmd.restoreTables, "restore-tables", false, "")
Expand Down Expand Up @@ -152,6 +154,10 @@ func (cmd *command) run(args []string) error {
props["transfers"] = cmd.transfers
ok = true
}
if cmd.Flag("rate-limit").Changed {
props["rate_limit"] = cmd.rateLimit
ok = true
}
if cmd.Flag("allow-compaction").Changed {
props["allow_compaction"] = cmd.allowCompaction
ok = true
Expand Down
6 changes: 6 additions & 0 deletions pkg/command/restore/res.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ transfers: |
Set to 0 for the fastest download (results in setting transfers to 2*node_shard_count).
Set to -1 for using the transfers value defined in node's 'scylla-manager-agent.yaml' config file.
rate-limit: |
Limits the download rate (as expressed in megabytes (MiB) per second) at which sstables can be downloaded from backup location to Scylla nodes.
You can set limits for more than one DC using a comma-separated list expressed in the format `[<dc>:]<limit>`.
The <dc>: part is optional and is only needed when different datacenters require different download limits.
Set to 0 for no limit (default 0).
allow-compaction: |
Defines if auto compactions should be running on Scylla nodes during restore.
Disabling auto compactions decreases restore time duration, but increases compaction workload after the restore is done.
Expand Down
69 changes: 46 additions & 23 deletions pkg/scyllaclient/client_rclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,25 @@ import (
func (c *Client) RcloneSetBandwidthLimit(ctx context.Context, host string, limit int) error {
p := operations.CoreBwlimitParams{
Context: forceHost(ctx, host),
BandwidthRate: &models.Bandwidth{Rate: fmt.Sprintf("%dM", limit)},
BandwidthRate: &models.Bandwidth{Rate: marshallRateLimit(limit)},
}
_, err := c.agentOps.CoreBwlimit(&p) // nolint: errcheck
return err
}

// RcloneGetBandwidthLimit gets bandwidth limit of all the current and future
// transfers performed under current client session.
func (c *Client) RcloneGetBandwidthLimit(ctx context.Context, host string) (string, error) {
p := operations.CoreBwlimitParams{
Context: forceHost(ctx, host),
}
resp, err := c.agentOps.CoreBwlimit(&p)
if err != nil {
return "", err
}
return resp.Payload.Rate, nil
}

// RcloneSetTransfers sets the default amount of transfers on rclone server.
// This change is not persisted after server restart.
// Transfers correspond to the number of file transfers to run in parallel.
Expand Down Expand Up @@ -227,14 +240,22 @@ func (c *Client) rcloneMoveOrCopyFile(ctx context.Context, host, dstRemotePath,
return err
}

const (
// TransfersFromConfig describes transfers value which results in setting transfers
// to the value from host's scylla-manager-agent.yaml config.
TransfersFromConfig = -1
// NoRateLimit describes unlimited rate limit.
NoRateLimit = 0
)

// RcloneMoveDir moves contents of the directory pointed by srcRemotePath to
// the directory pointed by dstRemotePath.
// Remotes need to be registered with the server first.
// Returns ID of the asynchronous job.
// Remote path format is "name:bucket/path".
// If specified, a suffix will be added to otherwise overwritten or deleted files.
func (c *Client) RcloneMoveDir(ctx context.Context, host string, transfers int, dstRemotePath, srcRemotePath, suffix string) (int64, error) {
return c.rcloneMoveOrCopyDir(ctx, host, transfers, dstRemotePath, srcRemotePath, true, suffix)
func (c *Client) RcloneMoveDir(ctx context.Context, host string, transfers, rateLimit int, dstRemotePath, srcRemotePath, suffix string) (int64, error) {
return c.rcloneMoveOrCopyDir(ctx, host, transfers, rateLimit, dstRemotePath, srcRemotePath, true, suffix)
}

// RcloneCopyDir copies contents of the directory pointed by srcRemotePath to
Expand All @@ -243,11 +264,11 @@ func (c *Client) RcloneMoveDir(ctx context.Context, host string, transfers int,
// Returns ID of the asynchronous job.
// Remote path format is "name:bucket/path".
// If specified, a suffix will be added to otherwise overwritten or deleted files.
func (c *Client) RcloneCopyDir(ctx context.Context, host string, transfers int, dstRemotePath, srcRemotePath, suffix string) (int64, error) {
return c.rcloneMoveOrCopyDir(ctx, host, transfers, dstRemotePath, srcRemotePath, false, suffix)
func (c *Client) RcloneCopyDir(ctx context.Context, host string, transfers, rateLimit int, dstRemotePath, srcRemotePath, suffix string) (int64, error) {
return c.rcloneMoveOrCopyDir(ctx, host, transfers, rateLimit, dstRemotePath, srcRemotePath, false, suffix)
}

func (c *Client) rcloneMoveOrCopyDir(ctx context.Context, host string, transfers int, dstRemotePath, srcRemotePath string, doMove bool, suffix string) (int64, error) {
func (c *Client) rcloneMoveOrCopyDir(ctx context.Context, host string, transfers, rateLimit int, dstRemotePath, srcRemotePath string, doMove bool, suffix string) (int64, error) {
dstFs, dstRemote, err := rcloneSplitRemotePath(dstRemotePath)
if err != nil {
return 0, err
Expand All @@ -257,12 +278,13 @@ func (c *Client) rcloneMoveOrCopyDir(ctx context.Context, host string, transfers
return 0, err
}
m := models.MoveOrCopyFileOptions{
DstFs: dstFs,
DstRemote: dstRemote,
SrcFs: srcFs,
SrcRemote: srcRemote,
Suffix: suffix,
Transfers: int64(transfers),
DstFs: dstFs,
DstRemote: dstRemote,
SrcFs: srcFs,
SrcRemote: srcRemote,
Suffix: suffix,
Transfers: int64(transfers),
BandwidthRate: marshallRateLimit(rateLimit),
}

var jobID int64
Expand Down Expand Up @@ -293,15 +315,11 @@ func (c *Client) rcloneMoveOrCopyDir(ctx context.Context, host string, transfers
return jobID, nil
}

// TransfersFromConfig describes transfers value which results in setting transfers
// to the value from host's scylla-manager-agent.yaml config.
const TransfersFromConfig = -1

// RcloneCopyPaths copies paths from srcRemoteDir/path to dstRemoteDir/path.
// Remotes need to be registered with the server first.
// Remote path format is "name:bucket/path".
// Both dstRemoteDir and srRemoteDir must point to a directory.
func (c *Client) RcloneCopyPaths(ctx context.Context, host string, transfers int, dstRemoteDir, srcRemoteDir string, paths []string) (int64, error) {
func (c *Client) RcloneCopyPaths(ctx context.Context, host string, transfers, rateLimit int, dstRemoteDir, srcRemoteDir string, paths []string) (int64, error) {
dstFs, dstRemote, err := rcloneSplitRemotePath(dstRemoteDir)
if err != nil {
return 0, err
Expand All @@ -317,12 +335,13 @@ func (c *Client) RcloneCopyPaths(ctx context.Context, host string, transfers int
p := operations.SyncCopyPathsParams{
Context: forceHost(ctx, host),
Options: &models.CopyPathsOptions{
DstFs: dstFs,
DstRemote: dstRemote,
SrcFs: srcFs,
SrcRemote: srcRemote,
Paths: paths,
Transfers: int64(transfers),
DstFs: dstFs,
DstRemote: dstRemote,
SrcFs: srcFs,
SrcRemote: srcRemote,
Paths: paths,
Transfers: int64(transfers),
BandwidthRate: marshallRateLimit(rateLimit),
},
Async: true,
}
Expand Down Expand Up @@ -738,3 +757,7 @@ func rcloneSplitRemotePath(remotePath string) (fs, path string, err error) {
}
return
}

func marshallRateLimit(limit int) string {
return fmt.Sprintf("%dM", limit)
}
26 changes: 10 additions & 16 deletions pkg/scyllaclient/client_rclone_agent_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestRcloneDeletePathsInBatchesAgentIntegration(t *testing.T) {
t.Fatalf("Create files on Scylla node, err = {%s}, stdOut={%s}, stdErr={%s}", err, stdOut, stdErr)
}
}
id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(dirName), "data:"+dirName, "")
id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, 0, remotePath(dirName), "data:"+dirName, "")
if err != nil {
t.Fatal(errors.Wrap(err, "copy created files to backup location"))
}
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestRcloneSkippingFilesAgentIntegration(t *testing.T) {
if err != nil {
t.Fatal(err)
}
id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp/copy", "")
id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, scyllaclient.NoRateLimit, remotePath(""), "data:tmp/copy", "")
if err != nil {
t.Fatal(err)
}
Expand All @@ -223,7 +223,7 @@ func TestRcloneSkippingFilesAgentIntegration(t *testing.T) {
}
}

id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp/copy", "")
id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, scyllaclient.NoRateLimit, remotePath(""), "data:tmp/copy", "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -259,9 +259,6 @@ func TestRcloneStoppingTransferIntegration(t *testing.T) {

ctx := context.Background()

if err := client.RcloneSetBandwidthLimit(ctx, testHost, 1); err != nil {
t.Fatal(err)
}
defer func() {
if err := client.RcloneSetBandwidthLimit(ctx, testHost, 0); err != nil {
t.Fatal(err)
Expand All @@ -283,7 +280,7 @@ func TestRcloneStoppingTransferIntegration(t *testing.T) {
}
}()

id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp", "")
id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, 1, remotePath(""), "data:tmp", "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -327,9 +324,6 @@ func TestRcloneJobProgressIntegration(t *testing.T) {
S3InitBucket(t, testBucket)
ctx := context.Background()

if err := client.RcloneSetBandwidthLimit(ctx, testHost, 1); err != nil {
t.Fatal(err)
}
defer func() {
if err := client.RcloneSetBandwidthLimit(ctx, testHost, 0); err != nil {
t.Fatal(err)
Expand All @@ -351,7 +345,7 @@ func TestRcloneJobProgressIntegration(t *testing.T) {
}()

Print("When: first batch upload")
id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp", "")
id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, 1, remotePath(""), "data:tmp", "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -383,7 +377,7 @@ func TestRcloneJobProgressIntegration(t *testing.T) {
}

Print("When: second batch upload")
id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp", "")
id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, scyllaclient.NoRateLimit, remotePath(""), "data:tmp", "")
if err != nil {
t.Fatal(err)
}
Expand All @@ -408,7 +402,7 @@ func TestRcloneJobProgressIntegration(t *testing.T) {
}

Print("When: third batch upload")
id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp", "")
id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, scyllaclient.NoRateLimit, remotePath(""), "data:tmp", "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -487,7 +481,7 @@ func TestRcloneSuffixOptionIntegration(t *testing.T) {

Print("Copy src into dst")

id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, dstPath, srcPath, "")
id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, scyllaclient.NoRateLimit, dstPath, srcPath, "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -534,7 +528,7 @@ func TestRcloneSuffixOptionIntegration(t *testing.T) {

Print("Copy src into dst after file modification")

id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, dstPath, srcPath, firstSuffix)
id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, scyllaclient.NoRateLimit, dstPath, srcPath, firstSuffix)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -583,7 +577,7 @@ func TestRcloneSuffixOptionIntegration(t *testing.T) {

Print("Copy src into dst after another file modification")

id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, dstPath, srcPath, secondSuffix)
id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, scyllaclient.NoRateLimit, dstPath, srcPath, secondSuffix)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/scyllaclient/client_rclone_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestRcloneLocalToS3CopyDirIntegration(t *testing.T) {
ctx := context.Background()

copyDir := func(dir string) (*scyllaclient.RcloneJobInfo, error) {
id, err := client.RcloneCopyDir(ctx, scyllaclienttest.TestHost, scyllaclient.TransfersFromConfig, remotePath("/copy"), "rclonetest:"+dir, "")
id, err := client.RcloneCopyDir(ctx, scyllaclienttest.TestHost, scyllaclient.TransfersFromConfig, scyllaclient.NoRateLimit, remotePath("/copy"), "rclonetest:"+dir, "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestRcloneS3ToLocalCopyDirIntegration(t *testing.T) {
defer closeServer()
ctx := context.Background()

id, err := client.RcloneCopyDir(ctx, scyllaclienttest.TestHost, scyllaclient.TransfersFromConfig, "rclonetest:foo", remotePath("/copy"), "")
id, err := client.RcloneCopyDir(ctx, scyllaclienttest.TestHost, scyllaclient.TransfersFromConfig, scyllaclient.NoRateLimit, "rclonetest:foo", remotePath("/copy"), "")
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit 8af93cf

Please sign in to comment.