diff --git a/docs/source/sctool/partials/sctool_restore.yaml b/docs/source/sctool/partials/sctool_restore.yaml index 6c709b1bb..18d0620b1 100644 --- a/docs/source/sctool/partials/sctool_restore.yaml +++ b/docs/source/sctool/partials/sctool_restore.yaml @@ -107,6 +107,13 @@ options: usage: | The maximum number of Scylla restore jobs that can be run at the same time (on different SSTables). Each node can take part in at most one restore at any given moment. + - name: rate-limit + default_value: '[]' + usage: | + Limits the download rate (as expressed in megabytes (MiB) per second) at which sstables can be downloaded from backup location to Scylla nodes. + You can set limits for more than one DC using a comma-separated list expressed in the format `[:]`. + The : part is optional and is only needed when different datacenters require different download limits. + Set to 0 for no limit (default 0). - name: restore-schema default_value: "false" usage: | diff --git a/docs/source/sctool/partials/sctool_restore_update.yaml b/docs/source/sctool/partials/sctool_restore_update.yaml index 3e9e3b8fd..44b79e79c 100644 --- a/docs/source/sctool/partials/sctool_restore_update.yaml +++ b/docs/source/sctool/partials/sctool_restore_update.yaml @@ -105,6 +105,13 @@ options: usage: | The maximum number of Scylla restore jobs that can be run at the same time (on different SSTables). Each node can take part in at most one restore at any given moment. + - name: rate-limit + default_value: '[]' + usage: | + Limits the download rate (as expressed in megabytes (MiB) per second) at which sstables can be downloaded from backup location to Scylla nodes. + You can set limits for more than one DC using a comma-separated list expressed in the format `[:]`. + The : part is optional and is only needed when different datacenters require different download limits. + Set to 0 for no limit (default 0). 
- name: restore-schema default_value: "false" usage: | diff --git a/pkg/command/restore/cmd.go b/pkg/command/restore/cmd.go index c830dce5f..dcd82dda3 100644 --- a/pkg/command/restore/cmd.go +++ b/pkg/command/restore/cmd.go @@ -30,6 +30,7 @@ type command struct { batchSize int parallel int transfers int + rateLimit []string allowCompaction bool restoreSchema bool restoreTables bool @@ -81,6 +82,7 @@ func (cmd *command) init() { w.Unwrap().IntVar(&cmd.batchSize, "batch-size", 2, "") w.Unwrap().IntVar(&cmd.parallel, "parallel", 0, "") w.Unwrap().IntVar(&cmd.transfers, "transfers", 0, "") + w.Unwrap().StringSliceVar(&cmd.rateLimit, "rate-limit", nil, "") w.Unwrap().BoolVar(&cmd.allowCompaction, "allow-compaction", false, "") w.Unwrap().BoolVar(&cmd.restoreSchema, "restore-schema", false, "") w.Unwrap().BoolVar(&cmd.restoreTables, "restore-tables", false, "") @@ -152,6 +154,10 @@ func (cmd *command) run(args []string) error { props["transfers"] = cmd.transfers ok = true } + if cmd.Flag("rate-limit").Changed { + props["rate_limit"] = cmd.rateLimit + ok = true + } if cmd.Flag("allow-compaction").Changed { props["allow_compaction"] = cmd.allowCompaction ok = true diff --git a/pkg/command/restore/res.yaml b/pkg/command/restore/res.yaml index 3b4a67cd0..a780671f8 100644 --- a/pkg/command/restore/res.yaml +++ b/pkg/command/restore/res.yaml @@ -38,6 +38,12 @@ transfers: | Set to 0 for the fastest download (results in setting transfers to 2*node_shard_count). Set to -1 for using the transfers value defined in node's 'scylla-manager-agent.yaml' config file. +rate-limit: | + Limits the download rate (as expressed in megabytes (MiB) per second) at which sstables can be downloaded from backup location to Scylla nodes. + You can set limits for more than one DC using a comma-separated list expressed in the format `[:]`. + The : part is optional and is only needed when different datacenters require different download limits. + Set to 0 for no limit (default 0). 
+ allow-compaction: | Defines if auto compactions should be running on Scylla nodes during restore. Disabling auto compactions decreases restore time duration, but increases compaction workload after the restore is done. diff --git a/pkg/scyllaclient/client_rclone.go b/pkg/scyllaclient/client_rclone.go index ae22e03fd..8b8ae6854 100644 --- a/pkg/scyllaclient/client_rclone.go +++ b/pkg/scyllaclient/client_rclone.go @@ -27,12 +27,25 @@ import ( func (c *Client) RcloneSetBandwidthLimit(ctx context.Context, host string, limit int) error { p := operations.CoreBwlimitParams{ Context: forceHost(ctx, host), - BandwidthRate: &models.Bandwidth{Rate: fmt.Sprintf("%dM", limit)}, + BandwidthRate: &models.Bandwidth{Rate: marshallRateLimit(limit)}, } _, err := c.agentOps.CoreBwlimit(&p) // nolint: errcheck return err } +// RcloneGetBandwidthLimit gets bandwidth limit of all the current and future +// transfers performed under current client session. +func (c *Client) RcloneGetBandwidthLimit(ctx context.Context, host string) (string, error) { + p := operations.CoreBwlimitParams{ + Context: forceHost(ctx, host), + } + resp, err := c.agentOps.CoreBwlimit(&p) + if err != nil { + return "", err + } + return resp.Payload.Rate, nil +} + // RcloneSetTransfers sets the default amount of transfers on rclone server. // This change is not persisted after server restart. // Transfers correspond to the number of file transfers to run in parallel. @@ -227,14 +240,22 @@ func (c *Client) rcloneMoveOrCopyFile(ctx context.Context, host, dstRemotePath, return err } +const ( + // TransfersFromConfig describes transfers value which results in setting transfers + // to the value from host's scylla-manager-agent.yaml config. + TransfersFromConfig = -1 + // NoRateLimit describes unlimited rate limit. + NoRateLimit = 0 +) + // RcloneMoveDir moves contents of the directory pointed by srcRemotePath to // the directory pointed by dstRemotePath. // Remotes need to be registered with the server first. 
// Returns ID of the asynchronous job. // Remote path format is "name:bucket/path". // If specified, a suffix will be added to otherwise overwritten or deleted files. -func (c *Client) RcloneMoveDir(ctx context.Context, host string, transfers int, dstRemotePath, srcRemotePath, suffix string) (int64, error) { - return c.rcloneMoveOrCopyDir(ctx, host, transfers, dstRemotePath, srcRemotePath, true, suffix) +func (c *Client) RcloneMoveDir(ctx context.Context, host string, transfers, rateLimit int, dstRemotePath, srcRemotePath, suffix string) (int64, error) { + return c.rcloneMoveOrCopyDir(ctx, host, transfers, rateLimit, dstRemotePath, srcRemotePath, true, suffix) } // RcloneCopyDir copies contents of the directory pointed by srcRemotePath to @@ -243,11 +264,11 @@ func (c *Client) RcloneMoveDir(ctx context.Context, host string, transfers int, // Returns ID of the asynchronous job. // Remote path format is "name:bucket/path". // If specified, a suffix will be added to otherwise overwritten or deleted files. 
-func (c *Client) RcloneCopyDir(ctx context.Context, host string, transfers int, dstRemotePath, srcRemotePath, suffix string) (int64, error) { - return c.rcloneMoveOrCopyDir(ctx, host, transfers, dstRemotePath, srcRemotePath, false, suffix) +func (c *Client) RcloneCopyDir(ctx context.Context, host string, transfers, rateLimit int, dstRemotePath, srcRemotePath, suffix string) (int64, error) { + return c.rcloneMoveOrCopyDir(ctx, host, transfers, rateLimit, dstRemotePath, srcRemotePath, false, suffix) } -func (c *Client) rcloneMoveOrCopyDir(ctx context.Context, host string, transfers int, dstRemotePath, srcRemotePath string, doMove bool, suffix string) (int64, error) { +func (c *Client) rcloneMoveOrCopyDir(ctx context.Context, host string, transfers, rateLimit int, dstRemotePath, srcRemotePath string, doMove bool, suffix string) (int64, error) { dstFs, dstRemote, err := rcloneSplitRemotePath(dstRemotePath) if err != nil { return 0, err @@ -257,12 +278,13 @@ func (c *Client) rcloneMoveOrCopyDir(ctx context.Context, host string, transfers return 0, err } m := models.MoveOrCopyFileOptions{ - DstFs: dstFs, - DstRemote: dstRemote, - SrcFs: srcFs, - SrcRemote: srcRemote, - Suffix: suffix, - Transfers: int64(transfers), + DstFs: dstFs, + DstRemote: dstRemote, + SrcFs: srcFs, + SrcRemote: srcRemote, + Suffix: suffix, + Transfers: int64(transfers), + BandwidthRate: marshallRateLimit(rateLimit), } var jobID int64 @@ -293,15 +315,11 @@ func (c *Client) rcloneMoveOrCopyDir(ctx context.Context, host string, transfers return jobID, nil } -// TransfersFromConfig describes transfers value which results in setting transfers -// to the value from host's scylla-manager-agent.yaml config. -const TransfersFromConfig = -1 - // RcloneCopyPaths copies paths from srcRemoteDir/path to dstRemoteDir/path. // Remotes need to be registered with the server first. // Remote path format is "name:bucket/path". // Both dstRemoteDir and srRemoteDir must point to a directory. 
-func (c *Client) RcloneCopyPaths(ctx context.Context, host string, transfers int, dstRemoteDir, srcRemoteDir string, paths []string) (int64, error) { +func (c *Client) RcloneCopyPaths(ctx context.Context, host string, transfers, rateLimit int, dstRemoteDir, srcRemoteDir string, paths []string) (int64, error) { dstFs, dstRemote, err := rcloneSplitRemotePath(dstRemoteDir) if err != nil { return 0, err @@ -317,12 +335,13 @@ func (c *Client) RcloneCopyPaths(ctx context.Context, host string, transfers int p := operations.SyncCopyPathsParams{ Context: forceHost(ctx, host), Options: &models.CopyPathsOptions{ - DstFs: dstFs, - DstRemote: dstRemote, - SrcFs: srcFs, - SrcRemote: srcRemote, - Paths: paths, - Transfers: int64(transfers), + DstFs: dstFs, + DstRemote: dstRemote, + SrcFs: srcFs, + SrcRemote: srcRemote, + Paths: paths, + Transfers: int64(transfers), + BandwidthRate: marshallRateLimit(rateLimit), }, Async: true, } @@ -738,3 +757,7 @@ func rcloneSplitRemotePath(remotePath string) (fs, path string, err error) { } return } + +func marshallRateLimit(limit int) string { + return fmt.Sprintf("%dM", limit) +} diff --git a/pkg/scyllaclient/client_rclone_agent_integration_test.go b/pkg/scyllaclient/client_rclone_agent_integration_test.go index 87dc8d174..b2db38af6 100644 --- a/pkg/scyllaclient/client_rclone_agent_integration_test.go +++ b/pkg/scyllaclient/client_rclone_agent_integration_test.go @@ -123,7 +123,7 @@ func TestRcloneDeletePathsInBatchesAgentIntegration(t *testing.T) { t.Fatalf("Create files on Scylla node, err = {%s}, stdOut={%s}, stdErr={%s}", err, stdOut, stdErr) } } - id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(dirName), "data:"+dirName, "") + id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, 0, remotePath(dirName), "data:"+dirName, "") if err != nil { t.Fatal(errors.Wrap(err, "copy created files to backup location")) } @@ -200,7 +200,7 @@ func 
TestRcloneSkippingFilesAgentIntegration(t *testing.T) { if err != nil { t.Fatal(err) } - id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp/copy", "") + id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, scyllaclient.NoRateLimit, remotePath(""), "data:tmp/copy", "") if err != nil { t.Fatal(err) } @@ -223,7 +223,7 @@ func TestRcloneSkippingFilesAgentIntegration(t *testing.T) { } } - id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp/copy", "") + id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, scyllaclient.NoRateLimit, remotePath(""), "data:tmp/copy", "") if err != nil { t.Fatal(err) } @@ -259,9 +259,6 @@ func TestRcloneStoppingTransferIntegration(t *testing.T) { ctx := context.Background() - if err := client.RcloneSetBandwidthLimit(ctx, testHost, 1); err != nil { - t.Fatal(err) - } defer func() { if err := client.RcloneSetBandwidthLimit(ctx, testHost, 0); err != nil { t.Fatal(err) @@ -283,7 +280,7 @@ func TestRcloneStoppingTransferIntegration(t *testing.T) { } }() - id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp", "") + id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, 1, remotePath(""), "data:tmp", "") if err != nil { t.Fatal(err) } @@ -327,9 +324,6 @@ func TestRcloneJobProgressIntegration(t *testing.T) { S3InitBucket(t, testBucket) ctx := context.Background() - if err := client.RcloneSetBandwidthLimit(ctx, testHost, 1); err != nil { - t.Fatal(err) - } defer func() { if err := client.RcloneSetBandwidthLimit(ctx, testHost, 0); err != nil { t.Fatal(err) @@ -351,7 +345,7 @@ func TestRcloneJobProgressIntegration(t *testing.T) { }() Print("When: first batch upload") - id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp", "") + id, err := 
client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, 1, remotePath(""), "data:tmp", "") if err != nil { t.Fatal(err) } @@ -383,7 +377,7 @@ func TestRcloneJobProgressIntegration(t *testing.T) { } Print("When: second batch upload") - id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp", "") + id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, scyllaclient.NoRateLimit, remotePath(""), "data:tmp", "") if err != nil { t.Fatal(err) } @@ -408,7 +402,7 @@ func TestRcloneJobProgressIntegration(t *testing.T) { } Print("When: third batch upload") - id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp", "") + id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, scyllaclient.NoRateLimit, remotePath(""), "data:tmp", "") if err != nil { t.Fatal(err) } @@ -487,7 +481,7 @@ func TestRcloneSuffixOptionIntegration(t *testing.T) { Print("Copy src into dst") - id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, dstPath, srcPath, "") + id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, scyllaclient.NoRateLimit, dstPath, srcPath, "") if err != nil { t.Fatal(err) } @@ -534,7 +528,7 @@ func TestRcloneSuffixOptionIntegration(t *testing.T) { Print("Copy src into dst after file modification") - id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, dstPath, srcPath, firstSuffix) + id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, scyllaclient.NoRateLimit, dstPath, srcPath, firstSuffix) if err != nil { t.Fatal(err) } @@ -583,7 +577,7 @@ func TestRcloneSuffixOptionIntegration(t *testing.T) { Print("Copy src into dst after another file modification") - id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, dstPath, srcPath, secondSuffix) + id, err = client.RcloneCopyDir(ctx, 
testHost, scyllaclient.TransfersFromConfig, scyllaclient.NoRateLimit, dstPath, srcPath, secondSuffix) if err != nil { t.Fatal(err) } diff --git a/pkg/scyllaclient/client_rclone_integration_test.go b/pkg/scyllaclient/client_rclone_integration_test.go index 5485a7275..39e3bb905 100644 --- a/pkg/scyllaclient/client_rclone_integration_test.go +++ b/pkg/scyllaclient/client_rclone_integration_test.go @@ -36,7 +36,7 @@ func TestRcloneLocalToS3CopyDirIntegration(t *testing.T) { ctx := context.Background() copyDir := func(dir string) (*scyllaclient.RcloneJobInfo, error) { - id, err := client.RcloneCopyDir(ctx, scyllaclienttest.TestHost, scyllaclient.TransfersFromConfig, remotePath("/copy"), "rclonetest:"+dir, "") + id, err := client.RcloneCopyDir(ctx, scyllaclienttest.TestHost, scyllaclient.TransfersFromConfig, scyllaclient.NoRateLimit, remotePath("/copy"), "rclonetest:"+dir, "") if err != nil { t.Fatal(err) } @@ -106,7 +106,7 @@ func TestRcloneS3ToLocalCopyDirIntegration(t *testing.T) { defer closeServer() ctx := context.Background() - id, err := client.RcloneCopyDir(ctx, scyllaclienttest.TestHost, scyllaclient.TransfersFromConfig, "rclonetest:foo", remotePath("/copy"), "") + id, err := client.RcloneCopyDir(ctx, scyllaclienttest.TestHost, scyllaclient.TransfersFromConfig, scyllaclient.NoRateLimit, "rclonetest:foo", remotePath("/copy"), "") if err != nil { t.Fatal(err) } diff --git a/pkg/service/backup/backup.go b/pkg/service/backup/backup.go index 074b04575..e8ebc1d00 100644 --- a/pkg/service/backup/backup.go +++ b/pkg/service/backup/backup.go @@ -3,59 +3,12 @@ package backup import ( - "strings" - "github.com/pkg/errors" - "github.com/scylladb/go-set/strset" "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" . 
"github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" - "github.com/scylladb/scylla-manager/v3/pkg/util/slice" "go.uber.org/multierr" ) -func checkDCs(dcAtPos func(int) (string, string), n int, dcMap map[string][]string) (err error) { - allDCs := strset.New() - for dc := range dcMap { - allDCs.Add(dc) - } - - for i := 0; i < n; i++ { - dc, str := dcAtPos(i) - if dc == "" { - continue - } - if !allDCs.Has(dc) { - err = multierr.Append(err, errors.Errorf("%q no such datacenter %s", str, dc)) - } - } - return -} - -func checkAllDCsCovered(locations []Location, dcs []string) error { - hasDCs := strset.New() - hasDefault := false - - for _, l := range locations { - if l.DC == "" { - hasDefault = true - continue - } - hasDCs.Add(l.DC) - } - - if !hasDefault { - if d := strset.Difference(strset.New(dcs...), hasDCs); !d.IsEmpty() { - msg := "missing location(s) for datacenters %s" - if d.Size() == 1 { - msg = "missing location for datacenter %s" - } - return errors.Errorf(msg, strings.Join(d.List(), ", ")) - } - } - - return nil -} - func makeHostInfo(nodes []scyllaclient.NodeStatusInfo, locations []Location, rateLimits []DCLimit, transfers int) ([]hostInfo, error) { // DC location index dcl := map[string]Location{} @@ -95,29 +48,3 @@ func makeHostInfo(nodes []scyllaclient.NodeStatusInfo, locations []Location, rat return hi, errs } - -// filterDCLocations takes list of locations and returns only locations that -// belong to the provided list of datacenters. -func filterDCLocations(locations []Location, dcs []string) []Location { - var filtered []Location - for _, l := range locations { - if l.DC == "" || slice.ContainsString(dcs, l.DC) { - filtered = append(filtered, l) - continue - } - } - return filtered -} - -// filterDCLimits takes list of DCLimits and returns only locations that belong -// to the provided list of datacenters. 
-func filterDCLimits(limits []DCLimit, dcs []string) []DCLimit { - var filtered []DCLimit - for _, l := range limits { - if l.DC == "" || slice.ContainsString(dcs, l.DC) { - filtered = append(filtered, l) - continue - } - } - return filtered -} diff --git a/pkg/service/backup/backup_test.go b/pkg/service/backup/backup_test.go index db5d34224..49f38e80a 100644 --- a/pkg/service/backup/backup_test.go +++ b/pkg/service/backup/backup_test.go @@ -68,7 +68,7 @@ func TestFilterDCLocations(t *testing.T) { t.Run(test.Name, func(t *testing.T) { t.Parallel() - if diff := cmp.Diff(test.Expect, filterDCLocations(test.Locations, test.DCs)); diff != "" { + if diff := cmp.Diff(test.Expect, FilterDCs(test.Locations, test.DCs)); diff != "" { t.Error(diff) } }) @@ -134,7 +134,7 @@ func TestFilterDCLimit(t *testing.T) { t.Run(test.Name, func(t *testing.T) { t.Parallel() - if diff := cmp.Diff(test.Expect, filterDCLimits(test.DCLimits, test.DCs)); diff != "" { + if diff := cmp.Diff(test.Expect, FilterDCs(test.DCLimits, test.DCs)); diff != "" { t.Error(diff) } }) diff --git a/pkg/service/backup/backupspec/dclimit.go b/pkg/service/backup/backupspec/dclimit.go new file mode 100644 index 000000000..85bff1a3f --- /dev/null +++ b/pkg/service/backup/backupspec/dclimit.go @@ -0,0 +1,118 @@ +// Copyright (C) 2024 ScyllaDB + +package backupspec + +import ( + "fmt" + "regexp" + "strconv" + "strings" + + "github.com/pkg/errors" + "github.com/scylladb/go-set/strset" + "github.com/scylladb/scylla-manager/v3/pkg/util/slice" + "go.uber.org/multierr" +) + +// DCLimit specifies a rate limit for a DC. +type DCLimit struct { + DC string `json:"dc"` + Limit int `json:"limit"` +} + +func (l DCLimit) String() string { + p := fmt.Sprint(l.Limit) + if l.DC != "" { + p = l.DC + ":" + p + } + return p +} + +// Datacenter returns datacenter that is limited. 
+func (l DCLimit) Datacenter() string { + return l.DC +} + +func (l DCLimit) MarshalText() (text []byte, err error) { + return []byte(l.String()), nil +} + +func (l *DCLimit) UnmarshalText(text []byte) error { + pattern := regexp.MustCompile(`^(([a-zA-Z0-9\-\_\.]+):)?([0-9]+)$`) + + m := pattern.FindSubmatch(text) + if m == nil { + return errors.Errorf("invalid limit %q, the format is [dc:]", string(text)) + } + + limit, err := strconv.ParseInt(string(m[3]), 10, 64) + if err != nil { + return errors.Wrap(err, "invalid limit value") + } + + l.DC = string(m[2]) + l.Limit = int(limit) + + return nil +} + +// Datacenterer describes object that is connected to some datacenter. +type Datacenterer interface { + Datacenter() string +} + +// CheckDCs validates that all dcs are exist in provided dcMap . +func CheckDCs[T Datacenterer](dcs []T, dcMap map[string][]string) (err error) { + allDCs := strset.New() + for dc := range dcMap { + allDCs.Add(dc) + } + + for _, dc := range dcs { + if dc.Datacenter() == "" { + continue + } + if !allDCs.Has(dc.Datacenter()) { + err = multierr.Append(err, errors.Errorf("no such datacenter %s", dc.Datacenter())) + } + } + return +} + +// CheckAllDCsCovered validates that all dcToCover exist in provided dcs. +func CheckAllDCsCovered[T Datacenterer](dcs []T, dcToCover []string) error { + hasDCs := strset.New() + hasDefault := false + + for _, dc := range dcs { + if dc.Datacenter() == "" { + hasDefault = true + continue + } + hasDCs.Add(dc.Datacenter()) + } + + if !hasDefault { + if d := strset.Difference(strset.New(dcToCover...), hasDCs); !d.IsEmpty() { + msg := "missing object(s) for datacenters %s" + if d.Size() == 1 { + msg = "missing object for datacenter %s" + } + return errors.Errorf(msg, strings.Join(d.List(), ", ")) + } + } + + return nil +} + +// FilterDCs returns dcs present in filteredDCs. 
+func FilterDCs[T Datacenterer](dcs []T, filteredDCs []string) []T { + var filtered []T + for _, dc := range dcs { + if dc.Datacenter() == "" || slice.ContainsString(filteredDCs, dc.Datacenter()) { + filtered = append(filtered, dc) + continue + } + } + return filtered +} diff --git a/pkg/service/backup/backupspec/location.go b/pkg/service/backup/backupspec/location.go index eab03ed3c..fe10d9a38 100644 --- a/pkg/service/backup/backupspec/location.go +++ b/pkg/service/backup/backupspec/location.go @@ -109,6 +109,11 @@ func (l Location) String() string { return p } +// Datacenter returns location's datacenter. +func (l Location) Datacenter() string { + return l.DC +} + func (l Location) MarshalText() (text []byte, err error) { return []byte(l.String()), nil } diff --git a/pkg/service/backup/model.go b/pkg/service/backup/model.go index eed2f6d6d..08a761941 100644 --- a/pkg/service/backup/model.go +++ b/pkg/service/backup/model.go @@ -7,9 +7,7 @@ import ( "encoding/json" "fmt" "reflect" - "regexp" "slices" - "strconv" "strings" "time" @@ -199,49 +197,6 @@ type TableProgress struct { Error string `json:"error,omitempty"` } -// DCLimit specifies a rate limit for a DC. 
-type DCLimit struct { - DC string `json:"dc"` - Limit int `json:"limit"` -} - -func (l DCLimit) String() string { - p := fmt.Sprint(l.Limit) - if l.DC != "" { - p = l.DC + ":" + p - } - return p -} - -func (l DCLimit) MarshalText() (text []byte, err error) { - return []byte(l.String()), nil -} - -func (l *DCLimit) UnmarshalText(text []byte) error { - pattern := regexp.MustCompile(`^(([a-zA-Z0-9\-\_\.]+):)?([0-9]+)$`) - - m := pattern.FindSubmatch(text) - if m == nil { - return errors.Errorf("invalid limit %q, the format is [dc:]", string(text)) - } - - limit, err := strconv.ParseInt(string(m[3]), 10, 64) - if err != nil { - return errors.Wrap(err, "invalid limit value") - } - - l.DC = string(m[2]) - l.Limit = int(limit) - - return nil -} - -func dcLimitDCAtPos(s []DCLimit) func(int) (string, string) { - return func(i int) (string, string) { - return s[i].DC, s[i].String() - } -} - // taskProperties is the main data structure of the runner.Properties blob. type taskProperties struct { Keyspace []string `json:"keyspace"` @@ -272,23 +227,23 @@ func (p taskProperties) validate(dcs []string, dcMap map[string][]string) error } // Validate location DCs - if err := checkDCs(func(i int) (string, string) { return p.Location[i].DC, p.Location[i].String() }, len(p.Location), dcMap); err != nil { + if err := CheckDCs(p.Location, dcMap); err != nil { return errors.Wrap(err, "invalid location") } // Validate rate limit DCs - if err := checkDCs(dcLimitDCAtPos(p.RateLimit), len(p.RateLimit), dcMap); err != nil { + if err := CheckDCs(p.RateLimit, dcMap); err != nil { return errors.Wrap(err, "invalid rate-limit") } // Validate upload parallel DCs - if err := checkDCs(dcLimitDCAtPos(p.SnapshotParallel), len(p.SnapshotParallel), dcMap); err != nil { + if err := CheckDCs(p.SnapshotParallel, dcMap); err != nil { return errors.Wrap(err, "invalid snapshot-parallel") } // Validate snapshot parallel DCs - if err := checkDCs(dcLimitDCAtPos(p.UploadParallel), len(p.UploadParallel), dcMap); 
err != nil { + if err := CheckDCs(p.UploadParallel, dcMap); err != nil { return errors.Wrap(err, "invalid upload-parallel") } // Validate all DCs have backup location - if err := checkAllDCsCovered(filterDCLocations(p.Location, dcs), dcs); err != nil { + if err := CheckAllDCsCovered(FilterDCs(p.Location, dcs), dcs); err != nil { return errors.Wrap(err, "invalid location") } return nil @@ -298,7 +253,7 @@ func (p taskProperties) toTarget(ctx context.Context, client *scyllaclient.Clien liveNodes scyllaclient.NodeStatusInfoSlice, filters []tabFilter, validators []tabValidator, ) (Target, error) { policy := p.extractRetention() - rateLimit := filterDCLimits(p.RateLimit, dcs) + rateLimit := FilterDCs(p.RateLimit, dcs) if len(rateLimit) == 0 { rateLimit = []DCLimit{{Limit: defaultRateLimit}} } @@ -311,14 +266,14 @@ func (p taskProperties) toTarget(ctx context.Context, client *scyllaclient.Clien return Target{ Units: units, DC: dcs, - Location: filterDCLocations(p.Location, dcs), + Location: FilterDCs(p.Location, dcs), Retention: policy.Retention, RetentionDays: policy.RetentionDays, RetentionMap: p.RetentionMap, RateLimit: rateLimit, Transfers: p.Transfers, - SnapshotParallel: filterDCLimits(p.SnapshotParallel, dcs), - UploadParallel: filterDCLimits(p.UploadParallel, dcs), + SnapshotParallel: FilterDCs(p.SnapshotParallel, dcs), + UploadParallel: FilterDCs(p.UploadParallel, dcs), Continue: p.Continue, PurgeOnly: p.PurgeOnly, SkipSchema: p.SkipSchema, diff --git a/pkg/service/backup/parallel.go b/pkg/service/backup/parallel.go index c0296c3dc..aba2ba730 100644 --- a/pkg/service/backup/parallel.go +++ b/pkg/service/backup/parallel.go @@ -4,6 +4,7 @@ package backup import ( "github.com/pkg/errors" + . 
"github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" "github.com/scylladb/scylla-manager/v3/pkg/util/parallel" ) diff --git a/pkg/service/backup/parallel_test.go b/pkg/service/backup/parallel_test.go index 198af994a..637c41a5c 100644 --- a/pkg/service/backup/parallel_test.go +++ b/pkg/service/backup/parallel_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" ) func TestMakeHostsLimit(t *testing.T) { diff --git a/pkg/service/backup/service_backup_integration_test.go b/pkg/service/backup/service_backup_integration_test.go index ede22060a..610ab0f63 100644 --- a/pkg/service/backup/service_backup_integration_test.go +++ b/pkg/service/backup/service_backup_integration_test.go @@ -436,17 +436,17 @@ func TestGetTargetErrorIntegration(t *testing.T) { { Name: "invalid location dc", JSON: `{"location": ["foobar:s3:backuptest-get-target"]}`, - Error: `invalid location: "foobar:s3:backuptest-get-target" no such datacenter foobar`, + Error: `invalid location: no such datacenter foobar`, }, { Name: "no location for dc", JSON: `{"location": ["dc1:s3:backuptest-get-target"]}`, - Error: "invalid location: missing location for datacenter", + Error: "invalid location: missing object for datacenter", }, { Name: "no location dc filtered out", JSON: `{"dc": ["dc2"], "location": ["dc1:s3:backuptest-get-target"]}`, - Error: "invalid location: missing location for datacenter", + Error: "invalid location: missing object for datacenter", }, { Name: "inaccessible location", @@ -456,17 +456,17 @@ func TestGetTargetErrorIntegration(t *testing.T) { { Name: "invalid rate limit dc", JSON: `{"rate_limit": ["foobar:100"], "location": ["s3:backuptest-get-target"]}`, - Error: `invalid rate-limit: "foobar:100" no such datacenter foobar`, + Error: `invalid rate-limit: no such datacenter foobar`, }, { Name: "invalid snapshot parallel dc", JSON: `{"snapshot_parallel": ["foobar:100"], 
"location": ["s3:backuptest-get-target"]}`, - Error: `invalid snapshot-parallel: "foobar:100" no such datacenter foobar`, + Error: `invalid snapshot-parallel: no such datacenter foobar`, }, { Name: "invalid upload parallel dc", JSON: `{"upload_parallel": ["foobar:100"], "location": ["s3:backuptest-get-target"]}`, - Error: `invalid upload-parallel: "foobar:100" no such datacenter foobar`, + Error: `invalid upload-parallel: no such datacenter foobar`, }, } @@ -949,7 +949,7 @@ func TestBackupResumeIntegration(t *testing.T) { DC: []string{"dc1"}, Location: []Location{location}, Retention: 2, - RateLimit: []backup.DCLimit{ + RateLimit: []DCLimit{ {"dc1", 1}, }, Continue: true, diff --git a/pkg/service/backup/service_deduplicate_integration_test.go b/pkg/service/backup/service_deduplicate_integration_test.go index 4dc8e01c6..80abc9e73 100644 --- a/pkg/service/backup/service_deduplicate_integration_test.go +++ b/pkg/service/backup/service_deduplicate_integration_test.go @@ -46,7 +46,7 @@ func TestBackupPauseResumeOnDeduplicationStage(t *testing.T) { DC: []string{"dc1"}, Location: []backupspec.Location{location}, Retention: 2, - RateLimit: []backup.DCLimit{ + RateLimit: []backupspec.DCLimit{ {"dc1", 1}, }, Continue: true, diff --git a/pkg/service/backup/worker_deduplicate.go b/pkg/service/backup/worker_deduplicate.go index 148677a11..82cdf59ec 100644 --- a/pkg/service/backup/worker_deduplicate.go +++ b/pkg/service/backup/worker_deduplicate.go @@ -12,6 +12,7 @@ import ( "github.com/pkg/errors" "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" + . 
"github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" "github.com/scylladb/scylla-manager/v3/pkg/sstable" "github.com/scylladb/scylla-manager/v3/pkg/util/parallel" ) diff --git a/pkg/service/backup/worker_upload.go b/pkg/service/backup/worker_upload.go index 36c745fba..d9771653f 100644 --- a/pkg/service/backup/worker_upload.go +++ b/pkg/service/backup/worker_upload.go @@ -186,7 +186,7 @@ func (w *worker) uploadSnapshotDir(ctx context.Context, h hostInfo, d snapshotDi func (w *worker) uploadDataDir(ctx context.Context, hi hostInfo, dst, src string, d snapshotDir) error { // Ensure file versioning during upload - id, err := w.Client.RcloneMoveDir(ctx, d.Host, hi.Transfers, dst, src, VersionedFileExt(w.SnapshotTag)) + id, err := w.Client.RcloneMoveDir(ctx, d.Host, hi.Transfers, hi.RateLimit.Limit, dst, src, VersionedFileExt(w.SnapshotTag)) if err != nil { return err } diff --git a/pkg/service/restore/model.go b/pkg/service/restore/model.go index 431975fd8..da616b515 100644 --- a/pkg/service/restore/model.go +++ b/pkg/service/restore/model.go @@ -29,6 +29,7 @@ type Target struct { BatchSize int `json:"batch_size,omitempty"` Parallel int `json:"parallel,omitempty"` Transfers int `json:"transfers"` + RateLimit []DCLimit `json:"rate_limit,omitempty"` AllowCompaction bool `json:"allow_compaction,omitempty"` RestoreSchema bool `json:"restore_schema,omitempty"` RestoreTables bool `json:"restore_tables,omitempty"` @@ -42,6 +43,7 @@ const ( maxBatchSize = 0 // maxTransfers are experimentally defined to 2*node_shard_cnt. maxTransfers = 0 + maxRateLimit = 0 ) func defaultTarget() Target { @@ -55,7 +57,7 @@ func defaultTarget() Target { // validateProperties makes a simple validation of params set by user. // It does not perform validations that require access to the service. 
-func (t Target) validateProperties() error { +func (t Target) validateProperties(dcMap map[string][]string) error { if len(t.Location) == 0 { return errors.New("missing location") } @@ -72,6 +74,9 @@ return errors.New("transfers param has to be equal to -1 (set transfers to the value from scylla-manager-agent.yaml config) " + "or 0 (set transfers for fastest download) or greater than zero") } + if err := CheckDCs(t.RateLimit, dcMap); err != nil { + return errors.Wrap(err, "invalid rate limit") + } if t.RestoreSchema == t.RestoreTables { return errors.New("choose EXACTLY ONE restore type ('--restore-schema' or '--restore-tables' flag)") } @@ -295,4 +300,5 @@ type TableName struct { type HostInfo struct { Host string Transfers int + RateLimit int } diff --git a/pkg/service/restore/restore_integration_test.go b/pkg/service/restore/restore_integration_test.go index cf81acae7..318377cd5 100644 --- a/pkg/service/restore/restore_integration_test.go +++ b/pkg/service/restore/restore_integration_test.go @@ -498,7 +498,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { Print("Fill setup") fillTable(t, h.srcCluster.rootSession, 100, ks, tab) - validateState := func(ch clusterHelper, tombstone string, compaction bool, transfers int) { + validateState := func(ch clusterHelper, tombstone string, compaction bool, transfers int, rateLimit int) { // Validate tombstone_gc mode if got := tombstoneGCMode(t, ch.rootSession, ks, tab); tombstone != got { t.Errorf("expected tombstone_gc=%s, got %s", tombstone, got) @@ -523,6 +523,20 @@ t.Errorf("expected transfers=%d, got=%d on host %s", transfers, got, host) } } + // Validate rate limit + for _, host := range ch.Client.Config().Hosts { + got, err := ch.Client.RcloneGetBandwidthLimit(context.Background(), host) + if err != nil { + t.Fatal(errors.Wrapf(err, "check rate limit on host %s", host)) + } + rawLimit := 
fmt.Sprintf("%dM", rateLimit) + if rateLimit == 0 { + rawLimit = "off" + } + if rawLimit != got { + t.Errorf("expected rate_limit=%s, got=%s on host %s", rawLimit, got, host) + } + } } shardCnt, err := h.dstCluster.Client.ShardCount(context.Background(), ManagedClusterHost()) @@ -531,35 +545,39 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { } transfers0 := 2 * int(shardCnt) - // Set initial transfers - for _, host := range ManagedClusterHosts() { - err := h.dstCluster.Client.RcloneSetTransfers(context.Background(), host, 10) - if err != nil { - t.Fatal(errors.Wrapf(err, "set initial transfers on host %s", host)) - } - } - for _, host := range ManagedSecondClusterHosts() { - err := h.srcCluster.Client.RcloneSetTransfers(context.Background(), host, 10) - if err != nil { - t.Fatal(errors.Wrapf(err, "set initial transfers on host %s", host)) + setTransfersAndRateLimit := func(ch clusterHelper, transfers int, rateLimit int) { + for _, host := range ch.Client.Config().Hosts { + err := ch.Client.RcloneSetTransfers(context.Background(), host, transfers) + if err != nil { + t.Fatal(errors.Wrapf(err, "set transfers on host %s", host)) + } + err = ch.Client.RcloneSetBandwidthLimit(context.Background(), host, rateLimit) + if err != nil { + t.Fatal(errors.Wrapf(err, "set rate limit on host %s", host)) + } } } + Print("Set initial transfers and rate limit") + setTransfersAndRateLimit(h.srcCluster, 10, 99) + setTransfersAndRateLimit(h.dstCluster, 10, 99) + Print("Validate state before backup") - validateState(h.srcCluster, "repair", true, 10) + validateState(h.srcCluster, "repair", true, 10, 99) Print("Run backup") loc := []Location{testLocation("preparation", "")} S3InitBucket(t, loc[0].Path) ksFilter := []string{ks} tag := h.runBackup(t, map[string]any{ - "location": loc, - "keyspace": ksFilter, - "transfers": 3, + "location": loc, + "keyspace": ksFilter, + "transfers": 3, + "rate_limit": []string{"88"}, }) Print("Validate state after backup") - 
validateState(h.srcCluster, "repair", true, 3) + validateState(h.srcCluster, "repair", true, 3, 88) runRestore := func(ctx context.Context, finishedRestore chan error) { grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser) @@ -569,10 +587,12 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { "keyspace": ksFilter, "snapshot_tag": tag, "transfers": 0, + "rate_limit": []string{"0"}, "restore_tables": true, }) if err != nil { - t.Error(err) + finishedRestore <- err + return } finishedRestore <- h.dstRestoreSvc.Restore(ctx, h.dstCluster.ClusterID, h.dstCluster.TaskID, h.dstCluster.RunID, rawProps) } @@ -601,7 +621,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { makeLASHang(reachedDataStageChan, hangLAS) Print("Validate state before restore") - validateState(h.dstCluster, "repair", true, 10) + validateState(h.dstCluster, "repair", true, 10, 99) Print("Run restore") finishedRestore := make(chan error) @@ -609,10 +629,14 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { go runRestore(restoreCtx, finishedRestore) Print("Wait for data stage") - <-reachedDataStageChan + select { + case <-reachedDataStageChan: + case err := <-finishedRestore: + t.Fatalf("Restore finished before reaching data stage with: %s", err) + } Print("Validate state during restore data") - validateState(h.dstCluster, "disabled", false, transfers0) + validateState(h.dstCluster, "disabled", false, transfers0, 0) Print("Pause restore") restoreCancel() @@ -627,7 +651,11 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { } Print("Validate state during pause") - validateState(h.dstCluster, "disabled", true, transfers0) + validateState(h.dstCluster, "disabled", true, transfers0, 0) + + Print("Change transfers and rate limit during pause") + setTransfersAndRateLimit(h.srcCluster, 9, 55) + setTransfersAndRateLimit(h.dstCluster, 9, 55) reachedDataStageChan = make(chan struct{}) hangLAS = make(chan struct{}) @@ -639,10 +667,14 @@ 
func TestRestoreTablesPreparationIntegration(t *testing.T) { go runRestore(context.Background(), finishedRestore) Print("Wait for data stage") - <-reachedDataStageChan + select { + case <-reachedDataStageChan: + case err := <-finishedRestore: + t.Fatalf("Restore finished before reaching data stage with: %s", err) + } Print("Validate state during restore data after pause") - validateState(h.dstCluster, "disabled", false, transfers0) + validateState(h.dstCluster, "disabled", false, transfers0, 0) Print("Release LAS") close(hangLAS) @@ -654,7 +686,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { } Print("Validate state after restore success") - validateState(h.dstCluster, "repair", true, transfers0) + validateState(h.dstCluster, "repair", true, transfers0, 0) Print("Validate table contents") h.validateIdenticalTables(t, []table{{ks: ks, tab: tab}}) diff --git a/pkg/service/restore/tables_worker.go b/pkg/service/restore/tables_worker.go index ecb5c4b64..e35409f70 100644 --- a/pkg/service/restore/tables_worker.go +++ b/pkg/service/restore/tables_worker.go @@ -202,15 +202,13 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error { bd := newBatchDispatcher(workload, w.target.BatchSize, hostToShard, w.target.locationHosts) f := func(n int) (err error) { - transfers := w.target.Transfers - if transfers == maxTransfers { - transfers = 2 * int(hostToShard[hosts[n]]) - } - hi := HostInfo{ - Host: hosts[n], - Transfers: transfers, + host := hosts[n] + dc, err := w.client.HostDatacenter(ctx, host) + if err != nil { + return errors.Wrapf(err, "get host %s data center", host) } - w.logger.Info(ctx, "Host info", "host", hi.Host, "transfers", hi.Transfers) + hi := w.hostInfo(host, dc, hostToShard[host]) + w.logger.Info(ctx, "Host info", "host", hi.Host, "transfers", hi.Transfers, "rate limit", hi.RateLimit) for { // Download and stream in parallel b, ok := bd.DispatchBatch(hi.Host) @@ -350,3 +348,31 @@ func (w *tablesWorker) setAutoCompaction(ctx 
context.Context, hosts []string, en } return nil } + +func (w *tablesWorker) hostInfo(host, dc string, shards uint) HostInfo { + return HostInfo{ + Host: host, + Transfers: hostTransfers(w.target.Transfers, shards), + RateLimit: dcRateLimit(w.target.RateLimit, dc), + } +} + +func hostTransfers(transfers int, shards uint) int { + if transfers == maxTransfers { + transfers = 2 * int(shards) + } + return transfers +} + +func dcRateLimit(limits []DCLimit, dc string) int { + defaultLimit := maxRateLimit + for _, limit := range limits { + if limit.DC == dc { + return limit.Limit + } + if limit.DC == "" { + defaultLimit = limit.Limit + } + } + return defaultLimit +} diff --git a/pkg/service/restore/tablesdir_worker.go b/pkg/service/restore/tablesdir_worker.go index 7c9efb198..199204c94 100644 --- a/pkg/service/restore/tablesdir_worker.go +++ b/pkg/service/restore/tablesdir_worker.go @@ -166,7 +166,7 @@ func (w *tablesWorker) startDownload(ctx context.Context, hi HostInfo, b batch) for _, sst := range sstables { files = append(files, sst.Files...) } - jobID, err = w.client.RcloneCopyPaths(ctx, hi.Host, hi.Transfers, uploadDir, b.RemoteSSTableDir, files) + jobID, err = w.client.RcloneCopyPaths(ctx, hi.Host, hi.Transfers, hi.RateLimit, uploadDir, b.RemoteSSTableDir, files) if err != nil { return 0, 0, errors.Wrap(err, "download batch to upload dir") } diff --git a/pkg/service/restore/worker.go b/pkg/service/restore/worker.go index 8984a934a..c6a42d1b3 100644 --- a/pkg/service/restore/worker.go +++ b/pkg/service/restore/worker.go @@ -67,7 +67,12 @@ func (w *worker) initTarget(ctx context.Context, properties json.RawMessage) err if err := json.Unmarshal(properties, &t); err != nil { return err } - if err := t.validateProperties(); err != nil { + + dcMap, err := w.client.Datacenters(ctx) + if err != nil { + return errors.Wrap(err, "get data centers") + } + if err := t.validateProperties(dcMap); err != nil { return err }