diff --git a/internal/connectors/client/connector.go b/internal/connectors/client/connector.go index 8d9f65e9..5e27b83f 100644 --- a/internal/connectors/client/connector.go +++ b/internal/connectors/client/connector.go @@ -3,7 +3,13 @@ package client import ( "context" "fmt" + "github.com/ydb-platform/ydb-go-sdk/v3/retry" + "github.com/ydb-platform/ydb-go-sdk/v3/scheme" + "path" + "regexp" + "strings" "time" + "ydbcp/internal/types" "ydbcp/internal/util/xlog" "github.com/ydb-platform/ydb-go-genproto/Ydb_Export_V1" @@ -22,7 +28,7 @@ type ClientConnector interface { Open(ctx context.Context, dsn string) (*ydb.Driver, error) Close(ctx context.Context, clientDb *ydb.Driver) error - ExportToS3(ctx context.Context, clientDb *ydb.Driver, s3Settings *Ydb_Export.ExportToS3Settings) (string, error) + ExportToS3(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ExportSettings) (string, error) ImportFromS3(ctx context.Context, clientDb *ydb.Driver, s3Settings *Ydb_Import.ImportFromS3Settings) (string, error) GetOperationStatus(ctx context.Context, clientDb *ydb.Driver, operationId string) (*Ydb_Operations.GetOperationResponse, error) ForgetOperation(ctx context.Context, clientDb *ydb.Driver, operationId string) (*Ydb_Operations.ForgetOperationResponse, error) @@ -62,11 +68,120 @@ func (d *ClientYdbConnector) Close(ctx context.Context, clientDb *ydb.Driver) er return nil } -func (d *ClientYdbConnector) ExportToS3(ctx context.Context, clientDb *ydb.Driver, s3Settings *Ydb_Export.ExportToS3Settings) (string, error) { +func isSystemDirectory(name string) bool { + return strings.HasPrefix(name, ".sys") || + strings.HasPrefix(name, ".metadata") || + strings.HasPrefix(name, "~") +} + +func isExportDirectory(fullPath string, database string) bool { + return strings.HasPrefix(fullPath, path.Join(database, "export")) +} + +func listDirectory(ctx context.Context, clientDb *ydb.Driver, sources *[]string, initialPath string, exclusions []regexp.Regexp) error { + var dir scheme.Directory + var err error + + err = retry.Retry(ctx, func(ctx context.Context) (err error) { + dir, err = clientDb.Scheme().ListDirectory(ctx, initialPath) + return err + }, retry.WithIdempotent(true)) + + if err != nil { + return fmt.Errorf("list directory %s was failed: %v", initialPath, err) + } + + filterPath := func(path string) bool { + for _, exclusion := range exclusions { + if exclusion.MatchString(path) { + xlog.Info(ctx, "Excluded path", zap.String("path", path)) + return false + } + } + + return true + } + + if dir.Entry.IsTable() { + if filterPath(initialPath) { + *sources = append(*sources, initialPath) + } + } + + for _, child := range dir.Children { + childPath := path.Join(initialPath, child.Name) + + if child.IsDirectory() { + if isSystemDirectory(child.Name) || isExportDirectory(childPath, clientDb.Scheme().Database()) { + continue + } + + err = listDirectory(ctx, clientDb, sources, childPath, exclusions) + } else if child.IsTable() { + if filterPath(childPath) { + *sources = append(*sources, childPath) + } + } + } + + return err +} + +func buildExportItems(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ExportSettings) ([]*Ydb_Export.ExportToS3Settings_Item, error) { + sources := make([]string, 0) + exclusions := make([]regexp.Regexp, len(s3Settings.SourcePathToExclude)) + + for i, excludePath := range s3Settings.SourcePathToExclude { + reg, err := regexp.Compile(excludePath) + if err != nil { + return nil, fmt.Errorf("error compiling exclude path regexp: %s", err.Error()) + } + + exclusions[i] = *reg + } + + if len(s3Settings.SourcePaths) > 0 { + for _, sourcePath := range s3Settings.SourcePaths { + err := listDirectory(ctx, clientDb, &sources, sourcePath, exclusions) + if err != nil { + return nil, err + } + } + } else { + // List root directory + err := listDirectory(ctx, clientDb, &sources, "/", exclusions) + if err != nil { + return nil, err + } + } + + items := make([]*Ydb_Export.ExportToS3Settings_Item, len(sources)) + + for i, source := range sources { + items[i] = &Ydb_Export.ExportToS3Settings_Item{ + SourcePath: source, + DestinationPrefix: path.Join( + s3Settings.DestinationPrefix, + clientDb.Scheme().Database(), + time.Now().Format(types.BackupTimestampFormat), + strings.TrimPrefix(source, clientDb.Scheme().Database()+"/"), + ), + } + } + + return items, nil +} + +func (d *ClientYdbConnector) ExportToS3(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ExportSettings) (string, error) { if clientDb == nil { return "", fmt.Errorf("unititialized client db driver") } + items, err := buildExportItems(ctx, clientDb, s3Settings) + if err != nil { + return "", fmt.Errorf("error preparing list of exported items : %s", err.Error()) + } + exportClient := Ydb_Export_V1.NewExportServiceClient(ydb.GRPCConn(clientDb)) xlog.Info(ctx, "Exporting data to s3", zap.String("endpoint", s3Settings.Endpoint), @@ -81,7 +196,15 @@ func (d *ClientYdbConnector) ExportToS3(ctx context.Context, clientDb *ydb.Drive OperationTimeout: durationpb.New(time.Second), CancelAfter: durationpb.New(time.Second), }, - Settings: s3Settings, + Settings: &Ydb_Export.ExportToS3Settings{ + Endpoint: s3Settings.Endpoint, + Bucket: s3Settings.Bucket, + AccessKey: s3Settings.AccessKey, + SecretKey: s3Settings.SecretKey, + Description: s3Settings.Description, + NumberOfRetries: s3Settings.NumberOfRetries, + Items: items, + }, }, ) diff --git a/internal/connectors/client/mock.go b/internal/connectors/client/mock.go index 84a2e83b..550c4cb0 100644 --- a/internal/connectors/client/mock.go +++ b/internal/connectors/client/mock.go @@ -4,10 +4,11 @@ import ( "context" "fmt" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue" + "path" + "ydbcp/internal/types" "github.com/google/uuid" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Export" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Import" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations" "github.com/ydb-platform/ydb-go-sdk/v3" @@ -51,10 +52,10 @@ func (m *MockClientConnector) Close(_ context.Context, _ *ydb.Driver) error { return nil } -func (m *MockClientConnector) ExportToS3(_ context.Context, _ *ydb.Driver, s3Settings *Ydb_Export.ExportToS3Settings) (string, error) { +func (m *MockClientConnector) ExportToS3(_ context.Context, _ *ydb.Driver, s3Settings types.ExportSettings) (string, error) { objects := make([]ObjectPath, 0) - for _, item := range s3Settings.Items { - objectPath := ObjectPath{Bucket: s3Settings.Bucket, KeyPrefix: item.DestinationPrefix} + for _, source := range s3Settings.SourcePaths { + objectPath := ObjectPath{Bucket: s3Settings.Bucket, KeyPrefix: path.Join(s3Settings.DestinationPrefix, source)} if m.storage[objectPath] { return "", fmt.Errorf("object %v already exist", objectPath) } diff --git a/internal/types/backup.go b/internal/types/backup.go index 2285cd6a..86f1288a 100644 --- a/internal/types/backup.go +++ b/internal/types/backup.go @@ -207,6 +207,8 @@ const ( BackupStateAvailable = "Available" BackupStateError = "Error" BackupStateCancelled = "Cancelled" + + BackupTimestampFormat = "20060102_150405" ) func OperationToString(o Operation) string { @@ -265,3 +267,16 @@ func MakeYdbConnectionString(params YdbConnectionParams) string { } type OperationHandler func(context.Context, Operation) error + +type ExportSettings struct { + Endpoint string + Region string + Bucket string + AccessKey string + SecretKey string + Description string + NumberOfRetries uint32 + SourcePaths []string + SourcePathToExclude []string + DestinationPrefix string +} diff --git a/pkg/proto/backup_service.proto b/pkg/proto/backup_service.proto index 057b2cd0..dae694ea 100644 --- a/pkg/proto/backup_service.proto +++ b/pkg/proto/backup_service.proto @@ -48,7 +48,9 @@ message MakeBackupRequest { string container_id = 1; string database_name = 2; string endpoint = 3; + // Full path to a table or directory. Empty source_paths means backup of root directory. repeated string source_paths = 4; // [(size) = "<=256"]; + // Regexp for paths excluded from backup. repeated string source_paths_to_exclude = 5; // [(size) = "<=256"]; }