Skip to content

Commit

Permalink
feat(mvp): support MakeRestore
Browse files Browse the repository at this point in the history
  • Loading branch information
Iuliia Sidorina committed Aug 16, 2024
1 parent 6e0ce65 commit 9226cba
Show file tree
Hide file tree
Showing 8 changed files with 230 additions and 38 deletions.
71 changes: 71 additions & 0 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,77 @@ func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb
return op.Proto(), nil
}

func (s *server) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequest) (*pb.Operation, error) {
xlog.Info(ctx, "MakeRestore", zap.String("request", req.String()))

clientConnectionParams := types.YdbConnectionParams{
Endpoint: req.GetDatabaseEndpoint(),
DatabaseName: req.GetDatabaseName(),
}
dsn := types.MakeYdbConnectionString(clientConnectionParams)
client, err := s.clientConn.Open(ctx, dsn)
if err != nil {
return nil, fmt.Errorf("can't open client connection, dsn %s: %w", dsn, err)
}
defer func() {
if err := s.clientConn.Close(ctx, client); err != nil {
xlog.Error(ctx, "can't close client connection", zap.Error(err))
}
}()

accessKey, err := s.s3.AccessKey()
if err != nil {
return nil, fmt.Errorf("can't get S3AccessKey: %w", err)
}
secretKey, err := s.s3.SecretKey()
if err != nil {
return nil, fmt.Errorf("can't get S3SecretKey: %w", err)
}

s3Settings := types.ImportSettings{
Endpoint: s.s3.Endpoint,
Bucket: s.s3.Bucket,
AccessKey: accessKey,
SecretKey: secretKey,
Description: "ydbcp restore", // TODO: write description
NumberOfRetries: 10, // TODO: get value from configuration
BackupID: req.GetBackupId(),
SourcePaths: req.GetSourcePaths(),
S3ForcePathStyle: true,
DestinationPrefix: req.GetDestinationPrefix(),
}

clientOperationID, err := s.clientConn.ImportFromS3(ctx, client, s3Settings)
if err != nil {
return nil, fmt.Errorf("can't start import operation, dsn %s: %w", dsn, err)
}

xlog.Debug(
ctx, "import operation started", zap.String("clientOperationID", clientOperationID), zap.String("dsn", dsn),
)

op := &types.RestoreBackupOperation{
ContainerID: req.GetContainerId(),
BackupId: types.MustObjectIDFromString(req.GetBackupId()),
State: types.OperationStatePending,
YdbConnectionParams: types.YdbConnectionParams{
Endpoint: req.GetDatabaseEndpoint(),
DatabaseName: req.GetDatabaseName(),
},
YdbOperationId: clientOperationID,
CreatedAt: time.Now(),
}

operationID, err := s.driver.CreateOperation(ctx, op)
if err != nil {
xlog.Error(ctx, "can't create operation", zap.String("operation", types.OperationToString(op)), zap.Error(err))
return nil, err
}

op.Id = operationID
return op.Proto(), nil
}

func (s *server) ListBackups(ctx context.Context, request *pb.ListBackupsRequest) (*pb.ListBackupsResponse, error) {
xlog.Debug(ctx, "ListBackups", zap.String("request", request.String()))
queryFilters := make([]queries.QueryFilter, 0)
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22
toolchain go1.22.1

require (
github.com/aws/aws-sdk-go v1.55.5
github.com/google/uuid v1.6.0
github.com/jonboulle/clockwork v0.3.0
github.com/stretchr/testify v1.8.1
Expand All @@ -20,6 +21,7 @@ require (
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang-jwt/jwt/v4 v4.4.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/net v0.26.0 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU=
github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
Expand Down Expand Up @@ -51,6 +53,10 @@ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jonboulle/clockwork v0.3.0 h1:9BSCMi8C+0qdApAp4auwX0RkLGUjs956h0EkuQymUhg=
github.com/jonboulle/clockwork v0.3.0/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
Expand Down Expand Up @@ -169,6 +175,8 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
86 changes: 83 additions & 3 deletions internal/connectors/client/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package client
import (
"context"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"path"
"regexp"
"strings"
Expand Down Expand Up @@ -32,7 +36,7 @@ type ClientConnector interface {
Close(ctx context.Context, clientDb *ydb.Driver) error

ExportToS3(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ExportSettings) (string, error)
ImportFromS3(ctx context.Context, clientDb *ydb.Driver, s3Settings *Ydb_Import.ImportFromS3Settings) (string, error)
ImportFromS3(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ImportSettings) (string, error)
GetOperationStatus(ctx context.Context, clientDb *ydb.Driver, operationId string) (*Ydb_Operations.GetOperationResponse, error)
ForgetOperation(ctx context.Context, clientDb *ydb.Driver, operationId string) (*Ydb_Operations.ForgetOperationResponse, error)
CancelOperation(ctx context.Context, clientDb *ydb.Driver, operationId string) (*Ydb_Operations.CancelOperationResponse, error)
Expand Down Expand Up @@ -218,6 +222,7 @@ func (d *ClientYdbConnector) ExportToS3(ctx context.Context, clientDb *ydb.Drive
exportClient := Ydb_Export_V1.NewExportServiceClient(ydb.GRPCConn(clientDb))
xlog.Info(ctx, "Exporting data to s3",
zap.String("endpoint", s3Settings.Endpoint),
zap.String("region", s3Settings.Region),
zap.String("bucket", s3Settings.Bucket),
zap.String("description", s3Settings.Description),
)
Expand All @@ -232,6 +237,7 @@ func (d *ClientYdbConnector) ExportToS3(ctx context.Context, clientDb *ydb.Drive
Settings: &Ydb_Export.ExportToS3Settings{
Endpoint: s3Settings.Endpoint,
Bucket: s3Settings.Bucket,
Region: s3Settings.Region,
AccessKey: s3Settings.AccessKey,
SecretKey: s3Settings.SecretKey,
Description: s3Settings.Description,
Expand All @@ -252,14 +258,79 @@ func (d *ClientYdbConnector) ExportToS3(ctx context.Context, clientDb *ydb.Drive
return response.GetOperation().GetId(), nil
}

func (d *ClientYdbConnector) ImportFromS3(ctx context.Context, clientDb *ydb.Driver, s3Settings *Ydb_Import.ImportFromS3Settings) (string, error) {
func prepareItemsForImport(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ImportSettings) ([]*Ydb_Import.ImportFromS3Settings_Item, error) {
if len(s3Settings.SourcePaths) == 0 {
return nil, fmt.Errorf("empty list of source paths for import")
}

s := session.Must(session.NewSession())

s3Client := s3.New(s,
&aws.Config{
Region: &s3Settings.Region,
Credentials: credentials.NewStaticCredentials(s3Settings.AccessKey, s3Settings.SecretKey, ""),
Endpoint: &s3Settings.Endpoint,
S3ForcePathStyle: &s3Settings.S3ForcePathStyle,
},
)

items := make([]*Ydb_Import.ImportFromS3Settings_Item, len(s3Settings.SourcePaths))

for _, sourcePath := range s3Settings.SourcePaths {
if sourcePath[len(sourcePath)-1] != '/' {
sourcePath = sourcePath + "/"
}

err := s3Client.ListObjectsPages(
&s3.ListObjectsInput{
Bucket: &s3Settings.Bucket,
Prefix: &sourcePath,
},
func(p *s3.ListObjectsOutput, last bool) (shouldContinue bool) {
for _, object := range p.Contents {

key, found := strings.CutSuffix(*object.Key, "scheme.pb")
if found {
items = append(
items,
&Ydb_Import.ImportFromS3Settings_Item{
SourcePrefix: key,
DestinationPath: path.Join(
clientDb.Scheme().Database(),
s3Settings.DestinationPrefix,
key[len(sourcePath):],
),
},
)
}
}

return true
},
)

if err != nil {
return nil, err
}
}

return items, nil
}

func (d *ClientYdbConnector) ImportFromS3(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ImportSettings) (string, error) {
if clientDb == nil {
return "", fmt.Errorf("unititialized client db driver")
}

items, err := prepareItemsForImport(ctx, clientDb, s3Settings)
if err != nil {
return "", fmt.Errorf("error preparing list of items for import: %s", err.Error())
}

importClient := Ydb_Import_V1.NewImportServiceClient(ydb.GRPCConn(clientDb))
xlog.Info(ctx, "Importing data from s3",
zap.String("endpoint", s3Settings.Endpoint),
zap.String("region", s3Settings.Region),
zap.String("bucket", s3Settings.Bucket),
zap.String("description", s3Settings.Description),
)
Expand All @@ -271,7 +342,16 @@ func (d *ClientYdbConnector) ImportFromS3(ctx context.Context, clientDb *ydb.Dri
OperationTimeout: durationpb.New(time.Second),
CancelAfter: durationpb.New(time.Second),
},
Settings: s3Settings,
Settings: &Ydb_Import.ImportFromS3Settings{
Endpoint: s3Settings.Endpoint,
Bucket: s3Settings.Bucket,
Region: s3Settings.Region,
AccessKey: s3Settings.AccessKey,
SecretKey: s3Settings.SecretKey,
Description: s3Settings.Description,
NumberOfRetries: s3Settings.NumberOfRetries,
Items: items,
},
},
)

Expand Down
7 changes: 3 additions & 4 deletions internal/connectors/client/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/google/uuid"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Import"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
"github.com/ydb-platform/ydb-go-sdk/v3"
)
Expand Down Expand Up @@ -78,9 +77,9 @@ func (m *MockClientConnector) ExportToS3(_ context.Context, _ *ydb.Driver, s3Set
return newOp.Id, nil
}

func (m *MockClientConnector) ImportFromS3(_ context.Context, _ *ydb.Driver, s3Settings *Ydb_Import.ImportFromS3Settings) (string, error) {
for _, item := range s3Settings.Items {
objectPath := ObjectPath{Bucket: s3Settings.Bucket, KeyPrefix: item.SourcePrefix}
func (m *MockClientConnector) ImportFromS3(_ context.Context, _ *ydb.Driver, s3Settings types.ImportSettings) (string, error) {
for _, source := range s3Settings.SourcePaths {
objectPath := ObjectPath{Bucket: s3Settings.Bucket, KeyPrefix: source}
if !m.storage[objectPath] {
return "", fmt.Errorf("object %v doesn't exist", objectPath)
}
Expand Down
19 changes: 19 additions & 0 deletions internal/types/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ func MustObjectIDFromBytes(b [16]byte) ObjectID {
return ObjectID(uuid.Must(uuid.FromBytes(b[:])))
}

func MustObjectIDFromString(s string) ObjectID {
return ObjectID(uuid.Must(uuid.Parse(s)))
}

func (bid ObjectID) Bytes() [16]byte {
return bid
}
Expand Down Expand Up @@ -279,6 +283,7 @@ const (
OperationTypeTB = OperationType("TB")
OperationTypeRB = OperationType("RB")
BackupTimestampFormat = "20060102_150405"
S3ForcePathStyle = true
)

func OperationToString(o Operation) string {
Expand Down Expand Up @@ -344,3 +349,17 @@ type ExportSettings struct {
DestinationPrefix string
BackupID ObjectID
}

type ImportSettings struct {
Endpoint string
Region string
Bucket string
AccessKey string
SecretKey string
Description string
NumberOfRetries uint32
BackupID string
SourcePaths []string
S3ForcePathStyle bool
DestinationPrefix string
}
Loading

0 comments on commit 9226cba

Please sign in to comment.