Skip to content

Commit

Permalink
feat(mvp): allow pass dirs as source paths for export operation (#27)
Browse files Browse the repository at this point in the history
Co-authored-by: Iuliia Sidorina <[email protected]>
  • Loading branch information
ulya-sidorina and Iuliia Sidorina authored Jul 26, 2024
1 parent b2c9e1e commit dad80d3
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 7 deletions.
146 changes: 143 additions & 3 deletions internal/connectors/client/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@ package client
import (
"context"
"fmt"
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
"github.com/ydb-platform/ydb-go-sdk/v3/scheme"
"path"
"regexp"
"strings"
"time"
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"

"github.com/ydb-platform/ydb-go-genproto/Ydb_Export_V1"
Expand All @@ -22,7 +28,7 @@ type ClientConnector interface {
Open(ctx context.Context, dsn string) (*ydb.Driver, error)
Close(ctx context.Context, clientDb *ydb.Driver) error

ExportToS3(ctx context.Context, clientDb *ydb.Driver, s3Settings *Ydb_Export.ExportToS3Settings) (string, error)
ExportToS3(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ExportSettings) (string, error)
ImportFromS3(ctx context.Context, clientDb *ydb.Driver, s3Settings *Ydb_Import.ImportFromS3Settings) (string, error)
GetOperationStatus(ctx context.Context, clientDb *ydb.Driver, operationId string) (*Ydb_Operations.GetOperationResponse, error)
ForgetOperation(ctx context.Context, clientDb *ydb.Driver, operationId string) (*Ydb_Operations.ForgetOperationResponse, error)
Expand Down Expand Up @@ -62,11 +68,137 @@ func (d *ClientYdbConnector) Close(ctx context.Context, clientDb *ydb.Driver) er
return nil
}

func (d *ClientYdbConnector) ExportToS3(ctx context.Context, clientDb *ydb.Driver, s3Settings *Ydb_Export.ExportToS3Settings) (string, error) {
// isSystemDirectory reports whether name begins with one of the prefixes
// ".sys", ".metadata" or "~". listDirectory skips directories for which this
// returns true.
func isSystemDirectory(name string) bool {
	for _, prefix := range []string{".sys", ".metadata", "~"} {
		if strings.HasPrefix(name, prefix) {
			return true
		}
	}

	return false
}

// isExportDirectory reports whether fullPath starts with "<database>/export".
//
// The comparison is a plain string-prefix match, so it is true not only for
// the "export" directory itself and everything below it, but also for any
// sibling whose name merely begins with "export" (for example "export-42" or
// "exports").
// NOTE(review): presumably this is meant to catch temporary directories
// created by export operations — confirm that the broad match is intended.
func isExportDirectory(fullPath string, database string) bool {
	exportRoot := path.Join(database, "export")

	return strings.HasPrefix(fullPath, exportRoot)
}

// listDirectory recursively collects the full paths of all tables located at
// or below initialPath.
//
// The scheme entry at initialPath is listed through clientDb (the call is run
// via retry.Retry and marked idempotent). If the entry itself is a table, its
// path is included. Child directories are descended into, except those
// rejected by isSystemDirectory or isExportDirectory; child tables are
// included. Any path matched by at least one regexp in exclusions is left out
// of the result (and logged as excluded). Children that are neither
// directories nor tables are ignored.
//
// It returns a non-nil (possibly empty) slice on success. On failure it
// returns nil and an error that wraps the underlying listing error, so callers
// can inspect it with errors.Is / errors.As.
func listDirectory(ctx context.Context, clientDb *ydb.Driver, initialPath string, exclusions []regexp.Regexp) ([]string, error) {
	var dir scheme.Directory

	err := retry.Retry(ctx, func(ctx context.Context) (err error) {
		dir, err = clientDb.Scheme().ListDirectory(ctx, initialPath)
		return err
	}, retry.WithIdempotent(true))
	if err != nil {
		// %w keeps the original error in the chain; the rendered text is the
		// same as with %v.
		return nil, fmt.Errorf("list directory %s was failed: %w", initialPath, err)
	}

	// excluded reports whether candidate matches any exclusion pattern.
	// Indexing (rather than ranging by value) avoids copying each Regexp.
	excluded := func(candidate string) bool {
		for i := range exclusions {
			if exclusions[i].MatchString(candidate) {
				xlog.Info(ctx, "Excluded path", zap.String("path", candidate))
				return true
			}
		}

		return false
	}

	result := make([]string, 0)

	// initialPath may point directly at a table rather than a directory.
	if dir.Entry.IsTable() && !excluded(initialPath) {
		xlog.Info(ctx, "Included path", zap.String("path", initialPath))
		result = append(result, initialPath)
	}

	database := clientDb.Scheme().Database()

	for _, child := range dir.Children {
		childPath := path.Join(initialPath, child.Name)

		switch {
		case child.IsDirectory():
			if isSystemDirectory(child.Name) || isExportDirectory(childPath, database) {
				continue
			}

			nested, err := listDirectory(ctx, clientDb, childPath, exclusions)
			if err != nil {
				return nil, err
			}

			result = append(result, nested...)
		case child.IsTable():
			if !excluded(childPath) {
				xlog.Info(ctx, "Included path", zap.String("path", childPath))
				result = append(result, childPath)
			}
		}
	}

	return result, nil
}

// prepareItemsForExport expands the source paths from s3Settings into the
// list of per-table export items.
//
// Every entry of s3Settings.SourcePathToExclude is compiled as a regular
// expression; a compilation failure aborts with an error wrapping the
// compiler's error. Each path in s3Settings.SourcePaths is then expanded with
// listDirectory; when SourcePaths is empty, "/" is listed instead.
//
// Each resulting table gets the destination prefix
//
//	s3_destination_prefix/database_name/timestamp_backup_id/rel_source_path
//
// where the timestamp is taken once per call, so all items produced by a
// single call share the same timestamp_backup_id component.
func prepareItemsForExport(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ExportSettings) ([]*Ydb_Export.ExportToS3Settings_Item, error) {
	exclusions := make([]regexp.Regexp, len(s3Settings.SourcePathToExclude))
	for i, excludePath := range s3Settings.SourcePathToExclude {
		reg, err := regexp.Compile(excludePath)
		if err != nil {
			return nil, fmt.Errorf("error compiling exclude path regexp: %w", err)
		}

		exclusions[i] = *reg
	}

	roots := s3Settings.SourcePaths
	if len(roots) == 0 {
		// List root directory
		roots = []string{"/"}
	}

	sources := make([]string, 0)
	for _, root := range roots {
		list, err := listDirectory(ctx, clientDb, root, exclusions)
		if err != nil {
			return nil, err
		}

		sources = append(sources, list...)
	}

	// Computed once, outside the loop: calling time.Now() per item could put
	// tables of the same backup under different timestamped prefixes when the
	// loop happens to cross a second boundary.
	database := clientDb.Scheme().Database()
	backupDir := time.Now().Format(types.BackupTimestampFormat) + "_" + s3Settings.BackupID.String()

	items := make([]*Ydb_Export.ExportToS3Settings_Item, len(sources))
	for i, source := range sources {
		items[i] = &Ydb_Export.ExportToS3Settings_Item{
			SourcePath: source,
			// Destination prefix format: s3_destination_prefix/database_name/timestamp_backup_id/rel_source_path
			DestinationPrefix: path.Join(
				s3Settings.DestinationPrefix,
				database,
				backupDir,
				strings.TrimPrefix(source, database+"/"),
			),
		}
	}

	return items, nil
}

func (d *ClientYdbConnector) ExportToS3(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ExportSettings) (string, error) {
if clientDb == nil {
return "", fmt.Errorf("unititialized client db driver")
}

items, err := prepareItemsForExport(ctx, clientDb, s3Settings)
if err != nil {
return "", fmt.Errorf("error preparing list of items for export: %s", err.Error())
}

exportClient := Ydb_Export_V1.NewExportServiceClient(ydb.GRPCConn(clientDb))
xlog.Info(ctx, "Exporting data to s3",
zap.String("endpoint", s3Settings.Endpoint),
Expand All @@ -81,7 +213,15 @@ func (d *ClientYdbConnector) ExportToS3(ctx context.Context, clientDb *ydb.Drive
OperationTimeout: durationpb.New(time.Second),
CancelAfter: durationpb.New(time.Second),
},
Settings: s3Settings,
Settings: &Ydb_Export.ExportToS3Settings{
Endpoint: s3Settings.Endpoint,
Bucket: s3Settings.Bucket,
AccessKey: s3Settings.AccessKey,
SecretKey: s3Settings.SecretKey,
Description: s3Settings.Description,
NumberOfRetries: s3Settings.NumberOfRetries,
Items: items,
},
},
)

Expand Down
9 changes: 5 additions & 4 deletions internal/connectors/client/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"context"
"fmt"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue"
"path"
"ydbcp/internal/types"

"github.com/google/uuid"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Export"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Import"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
"github.com/ydb-platform/ydb-go-sdk/v3"
Expand Down Expand Up @@ -51,10 +52,10 @@ func (m *MockClientConnector) Close(_ context.Context, _ *ydb.Driver) error {
return nil
}

func (m *MockClientConnector) ExportToS3(_ context.Context, _ *ydb.Driver, s3Settings *Ydb_Export.ExportToS3Settings) (string, error) {
func (m *MockClientConnector) ExportToS3(_ context.Context, _ *ydb.Driver, s3Settings types.ExportSettings) (string, error) {
objects := make([]ObjectPath, 0)
for _, item := range s3Settings.Items {
objectPath := ObjectPath{Bucket: s3Settings.Bucket, KeyPrefix: item.DestinationPrefix}
for _, source := range s3Settings.SourcePaths {
objectPath := ObjectPath{Bucket: s3Settings.Bucket, KeyPrefix: path.Join(s3Settings.DestinationPrefix, source)}
if m.storage[objectPath] {
return "", fmt.Errorf("object %v already exist", objectPath)
}
Expand Down
16 changes: 16 additions & 0 deletions internal/types/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ const (
BackupStateAvailable = "Available"
BackupStateError = "Error"
BackupStateCancelled = "Cancelled"

BackupTimestampFormat = "20060102_150405"
)

func OperationToString(o Operation) string {
Expand Down Expand Up @@ -323,3 +325,17 @@ func MakeYdbConnectionString(params YdbConnectionParams) string {
}

type OperationHandler func(context.Context, Operation) error

// ExportSettings carries the parameters of a single export-to-S3 request:
// the S3 connection details, the source paths to export and the information
// used to build each item's destination prefix.
type ExportSettings struct {
	// Endpoint is the S3 endpoint passed to the export request.
	Endpoint string
	// Region is the S3 region.
	// NOTE(review): not referenced by the export code visible alongside this
	// type — confirm where it is consumed.
	Region string
	// Bucket is the S3 bucket passed to the export request.
	Bucket string
	// AccessKey and SecretKey are the S3 credentials passed to the export
	// request.
	AccessKey string
	SecretKey string
	// Description is passed through to the export request unchanged.
	Description string
	// NumberOfRetries is passed through to the export request unchanged.
	NumberOfRetries uint32
	// SourcePaths lists tables or directories to export; directories are
	// expanded recursively into their tables. When empty, "/" is listed.
	SourcePaths []string
	// SourcePathToExclude holds regular expressions; a table path matching
	// any of them is left out of the export.
	SourcePathToExclude []string
	// DestinationPrefix is the leading component of every item's S3
	// destination prefix.
	DestinationPrefix string
	// BackupID is combined with a timestamp to form the per-backup component
	// of every item's destination prefix.
	BackupID ObjectID
}
2 changes: 2 additions & 0 deletions pkg/proto/backup_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ message MakeBackupRequest {
string container_id = 1;
string database_name = 2;
string endpoint = 3;
// Full path to a table or directory. Empty source_paths means backup of root directory.
repeated string source_paths = 4; // [(size) = "<=256"];
// Regular expressions; any path matching at least one of them is excluded from the backup.
repeated string source_paths_to_exclude = 5; // [(size) = "<=256"];
}

Expand Down

0 comments on commit dad80d3

Please sign in to comment.