Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mvp): allow dirs as source paths for export operation #27

Merged
merged 1 commit into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 143 additions & 3 deletions internal/connectors/client/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@ package client
import (
"context"
"fmt"
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
"github.com/ydb-platform/ydb-go-sdk/v3/scheme"
"path"
"regexp"
"strings"
"time"
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"

"github.com/ydb-platform/ydb-go-genproto/Ydb_Export_V1"
Expand All @@ -22,7 +28,7 @@ type ClientConnector interface {
Open(ctx context.Context, dsn string) (*ydb.Driver, error)
Close(ctx context.Context, clientDb *ydb.Driver) error

ExportToS3(ctx context.Context, clientDb *ydb.Driver, s3Settings *Ydb_Export.ExportToS3Settings) (string, error)
ExportToS3(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ExportSettings) (string, error)
ImportFromS3(ctx context.Context, clientDb *ydb.Driver, s3Settings *Ydb_Import.ImportFromS3Settings) (string, error)
GetOperationStatus(ctx context.Context, clientDb *ydb.Driver, operationId string) (*Ydb_Operations.GetOperationResponse, error)
ForgetOperation(ctx context.Context, clientDb *ydb.Driver, operationId string) (*Ydb_Operations.ForgetOperationResponse, error)
Expand Down Expand Up @@ -62,11 +68,137 @@ func (d *ClientYdbConnector) Close(ctx context.Context, clientDb *ydb.Driver) er
return nil
}

func (d *ClientYdbConnector) ExportToS3(ctx context.Context, clientDb *ydb.Driver, s3Settings *Ydb_Export.ExportToS3Settings) (string, error) {
func isSystemDirectory(name string) bool {
return strings.HasPrefix(name, ".sys") ||
strings.HasPrefix(name, ".metadata") ||
strings.HasPrefix(name, "~")
}

func isExportDirectory(fullPath string, database string) bool {
return strings.HasPrefix(fullPath, path.Join(database, "export"))
}

func listDirectory(ctx context.Context, clientDb *ydb.Driver, initialPath string, exclusions []regexp.Regexp) ([]string, error) {
var dir scheme.Directory
var err error

err = retry.Retry(ctx, func(ctx context.Context) (err error) {
dir, err = clientDb.Scheme().ListDirectory(ctx, initialPath)
return err
}, retry.WithIdempotent(true))

if err != nil {
return nil, fmt.Errorf("list directory %s was failed: %v", initialPath, err)
}

excluded := func(path string) bool {
for _, exclusion := range exclusions {
if exclusion.MatchString(path) {
xlog.Info(ctx, "Excluded path", zap.String("path", path))
return true
}
}

return false
}

result := make([]string, 0)
if dir.Entry.IsTable() {
if !excluded(initialPath) {
xlog.Info(ctx, "Included path", zap.String("path", initialPath))
result = append(result, initialPath)
}
}

for _, child := range dir.Children {
childPath := path.Join(initialPath, child.Name)

if child.IsDirectory() {
if isSystemDirectory(child.Name) || isExportDirectory(childPath, clientDb.Scheme().Database()) {
continue
}

var list []string
list, err = listDirectory(ctx, clientDb, childPath, exclusions)

if err != nil {
return nil, err
}

result = append(result, list...)
} else if child.IsTable() {
if !excluded(childPath) {
xlog.Info(ctx, "Included path", zap.String("path", childPath))
result = append(result, childPath)
}
}
}

return result, nil
}

func prepareItemsForExport(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ExportSettings) ([]*Ydb_Export.ExportToS3Settings_Item, error) {
sources := make([]string, 0)
exclusions := make([]regexp.Regexp, len(s3Settings.SourcePathToExclude))

for i, excludePath := range s3Settings.SourcePathToExclude {
reg, err := regexp.Compile(excludePath)
if err != nil {
return nil, fmt.Errorf("error compiling exclude path regexp: %s", err.Error())
}

exclusions[i] = *reg
}

if len(s3Settings.SourcePaths) > 0 {
for _, sourcePath := range s3Settings.SourcePaths {
list, err := listDirectory(ctx, clientDb, sourcePath, exclusions)
if err != nil {
return nil, err
}

sources = append(sources, list...)
}
} else {
// List root directory
list, err := listDirectory(ctx, clientDb, "/", exclusions)
if err != nil {
return nil, err
}

sources = append(sources, list...)
}

items := make([]*Ydb_Export.ExportToS3Settings_Item, len(sources))

for i, source := range sources {
// Destination prefix format: s3_destination_prefix/database_name/timestamp_backup_id/rel_source_path
destinationPrefix := path.Join(
s3Settings.DestinationPrefix,
clientDb.Scheme().Database(),
time.Now().Format(types.BackupTimestampFormat)+"_"+s3Settings.BackupID.String(),
strings.TrimPrefix(source, clientDb.Scheme().Database()+"/"),
)

items[i] = &Ydb_Export.ExportToS3Settings_Item{
SourcePath: source,
DestinationPrefix: destinationPrefix,
}
}

return items, nil
}

func (d *ClientYdbConnector) ExportToS3(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ExportSettings) (string, error) {
if clientDb == nil {
return "", fmt.Errorf("unititialized client db driver")
}

items, err := prepareItemsForExport(ctx, clientDb, s3Settings)
if err != nil {
return "", fmt.Errorf("error preparing list of items for export: %s", err.Error())
}

exportClient := Ydb_Export_V1.NewExportServiceClient(ydb.GRPCConn(clientDb))
xlog.Info(ctx, "Exporting data to s3",
zap.String("endpoint", s3Settings.Endpoint),
Expand All @@ -81,7 +213,15 @@ func (d *ClientYdbConnector) ExportToS3(ctx context.Context, clientDb *ydb.Drive
OperationTimeout: durationpb.New(time.Second),
CancelAfter: durationpb.New(time.Second),
},
Settings: s3Settings,
Settings: &Ydb_Export.ExportToS3Settings{
Endpoint: s3Settings.Endpoint,
Bucket: s3Settings.Bucket,
AccessKey: s3Settings.AccessKey,
SecretKey: s3Settings.SecretKey,
Description: s3Settings.Description,
NumberOfRetries: s3Settings.NumberOfRetries,
Items: items,
},
},
)

Expand Down
9 changes: 5 additions & 4 deletions internal/connectors/client/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"context"
"fmt"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue"
"path"
"ydbcp/internal/types"

"github.com/google/uuid"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Export"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Import"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
"github.com/ydb-platform/ydb-go-sdk/v3"
Expand Down Expand Up @@ -51,10 +52,10 @@ func (m *MockClientConnector) Close(_ context.Context, _ *ydb.Driver) error {
return nil
}

func (m *MockClientConnector) ExportToS3(_ context.Context, _ *ydb.Driver, s3Settings *Ydb_Export.ExportToS3Settings) (string, error) {
func (m *MockClientConnector) ExportToS3(_ context.Context, _ *ydb.Driver, s3Settings types.ExportSettings) (string, error) {
objects := make([]ObjectPath, 0)
for _, item := range s3Settings.Items {
objectPath := ObjectPath{Bucket: s3Settings.Bucket, KeyPrefix: item.DestinationPrefix}
for _, source := range s3Settings.SourcePaths {
objectPath := ObjectPath{Bucket: s3Settings.Bucket, KeyPrefix: path.Join(s3Settings.DestinationPrefix, source)}
if m.storage[objectPath] {
return "", fmt.Errorf("object %v already exist", objectPath)
}
Expand Down
16 changes: 16 additions & 0 deletions internal/types/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ const (
BackupStateAvailable = "Available"
BackupStateError = "Error"
BackupStateCancelled = "Cancelled"

BackupTimestampFormat = "20060102_150405"
)

func OperationToString(o Operation) string {
Expand Down Expand Up @@ -265,3 +267,17 @@ func MakeYdbConnectionString(params YdbConnectionParams) string {
}

type OperationHandler func(context.Context, Operation) error

type ExportSettings struct {
Endpoint string
Region string
Bucket string
AccessKey string
SecretKey string
Description string
NumberOfRetries uint32
SourcePaths []string
SourcePathToExclude []string
DestinationPrefix string
BackupID ObjectID
}
2 changes: 2 additions & 0 deletions pkg/proto/backup_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ message MakeBackupRequest {
string container_id = 1;
string database_name = 2;
string endpoint = 3;
// Full path to a table or directory. Empty source_paths means backup of root directory.
repeated string source_paths = 4; // [(size) = "<=256"];
// Regexp for paths excluded from backup.
repeated string source_paths_to_exclude = 5; // [(size) = "<=256"];
}

Expand Down
Loading