Skip to content

Commit

Permalink
uuid pk -> string pk
Browse files Browse the repository at this point in the history
  • Loading branch information
qrort committed Aug 21, 2024
1 parent 7a3d050 commit 217e5cd
Show file tree
Hide file tree
Showing 19 changed files with 267 additions and 265 deletions.
66 changes: 46 additions & 20 deletions internal/connectors/client/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,15 @@ type ClientConnector interface {

ExportToS3(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ExportSettings) (string, error)
ImportFromS3(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ImportSettings) (string, error)
GetOperationStatus(ctx context.Context, clientDb *ydb.Driver, operationId string) (*Ydb_Operations.GetOperationResponse, error)
ForgetOperation(ctx context.Context, clientDb *ydb.Driver, operationId string) (*Ydb_Operations.ForgetOperationResponse, error)
CancelOperation(ctx context.Context, clientDb *ydb.Driver, operationId string) (*Ydb_Operations.CancelOperationResponse, error)
GetOperationStatus(
ctx context.Context, clientDb *ydb.Driver, operationId string,
) (*Ydb_Operations.GetOperationResponse, error)
ForgetOperation(
ctx context.Context, clientDb *ydb.Driver, operationId string,
) (*Ydb_Operations.ForgetOperationResponse, error)
CancelOperation(
ctx context.Context, clientDb *ydb.Driver, operationId string,
) (*Ydb_Operations.CancelOperationResponse, error)
}

type ClientYdbConnector struct {
Expand Down Expand Up @@ -99,14 +105,18 @@ func isExportDirectory(fullPath string, database string) bool {
return strings.HasPrefix(fullPath, path.Join(database, "export"))
}

func listDirectory(ctx context.Context, clientDb *ydb.Driver, initialPath string, exclusions []regexp.Regexp) ([]string, error) {
func listDirectory(ctx context.Context, clientDb *ydb.Driver, initialPath string, exclusions []regexp.Regexp) (
[]string, error,
) {
var dir scheme.Directory
var err error

err = retry.Retry(ctx, func(ctx context.Context) (err error) {
dir, err = clientDb.Scheme().ListDirectory(ctx, initialPath)
return err
}, retry.WithIdempotent(true))
err = retry.Retry(
ctx, func(ctx context.Context) (err error) {
dir, err = clientDb.Scheme().ListDirectory(ctx, initialPath)
return err
}, retry.WithIdempotent(true),
)

if err != nil {
return nil, fmt.Errorf("list directory %s was failed: %v", initialPath, err)
Expand Down Expand Up @@ -158,7 +168,9 @@ func listDirectory(ctx context.Context, clientDb *ydb.Driver, initialPath string
return result, nil
}

func prepareItemsForExport(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ExportSettings) ([]*Ydb_Export.ExportToS3Settings_Item, error) {
func prepareItemsForExport(
ctx context.Context, clientDb *ydb.Driver, s3Settings types.ExportSettings,
) ([]*Ydb_Export.ExportToS3Settings_Item, error) {
sources := make([]string, 0)
exclusions := make([]regexp.Regexp, len(s3Settings.SourcePathToExclude))

Expand Down Expand Up @@ -197,7 +209,7 @@ func prepareItemsForExport(ctx context.Context, clientDb *ydb.Driver, s3Settings
destinationPrefix := path.Join(
s3Settings.DestinationPrefix,
clientDb.Scheme().Database(),
time.Now().Format(types.BackupTimestampFormat)+"_"+s3Settings.BackupID.String(),
time.Now().Format(types.BackupTimestampFormat)+"_"+s3Settings.BackupID,
strings.TrimPrefix(source, clientDb.Scheme().Database()+"/"),
)

Expand All @@ -210,7 +222,9 @@ func prepareItemsForExport(ctx context.Context, clientDb *ydb.Driver, s3Settings
return items, nil
}

func (d *ClientYdbConnector) ExportToS3(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ExportSettings) (string, error) {
func (d *ClientYdbConnector) ExportToS3(
ctx context.Context, clientDb *ydb.Driver, s3Settings types.ExportSettings,
) (string, error) {
if clientDb == nil {
return "", fmt.Errorf("unititialized client db driver")
}
Expand All @@ -221,7 +235,8 @@ func (d *ClientYdbConnector) ExportToS3(ctx context.Context, clientDb *ydb.Drive
}

exportClient := Ydb_Export_V1.NewExportServiceClient(ydb.GRPCConn(clientDb))
xlog.Info(ctx, "Exporting data to s3",
xlog.Info(
ctx, "Exporting data to s3",
zap.String("endpoint", s3Settings.Endpoint),
zap.String("region", s3Settings.Region),
zap.String("bucket", s3Settings.Bucket),
Expand Down Expand Up @@ -329,7 +344,8 @@ func (d *ClientYdbConnector) ImportFromS3(ctx context.Context, clientDb *ydb.Dri
}

importClient := Ydb_Import_V1.NewImportServiceClient(ydb.GRPCConn(clientDb))
xlog.Info(ctx, "Importing data from s3",
xlog.Info(
ctx, "Importing data from s3",
zap.String("endpoint", s3Settings.Endpoint),
zap.String("region", s3Settings.Region),
zap.String("bucket", s3Settings.Bucket),
Expand Down Expand Up @@ -361,21 +377,25 @@ func (d *ClientYdbConnector) ImportFromS3(ctx context.Context, clientDb *ydb.Dri
}

if response.GetOperation().GetStatus() != Ydb.StatusIds_SUCCESS {
return "", fmt.Errorf("importing from s3 was failed: %v",
return "", fmt.Errorf(
"importing from s3 was failed: %v",
response.GetOperation().GetIssues(),
)
}

return response.GetOperation().GetId(), nil
}

func (d *ClientYdbConnector) GetOperationStatus(ctx context.Context, clientDb *ydb.Driver, operationId string) (*Ydb_Operations.GetOperationResponse, error) {
func (d *ClientYdbConnector) GetOperationStatus(
ctx context.Context, clientDb *ydb.Driver, operationId string,
) (*Ydb_Operations.GetOperationResponse, error) {
if clientDb == nil {
return nil, fmt.Errorf("unititialized client db driver")
}

client := Ydb_Operation_V1.NewOperationServiceClient(ydb.GRPCConn(clientDb))
xlog.Info(ctx, "Requesting operation status",
xlog.Info(
ctx, "Requesting operation status",
zap.String("id", operationId),
)

Expand All @@ -393,13 +413,16 @@ func (d *ClientYdbConnector) GetOperationStatus(ctx context.Context, clientDb *y
return response, nil
}

func (d *ClientYdbConnector) ForgetOperation(ctx context.Context, clientDb *ydb.Driver, operationId string) (*Ydb_Operations.ForgetOperationResponse, error) {
func (d *ClientYdbConnector) ForgetOperation(
ctx context.Context, clientDb *ydb.Driver, operationId string,
) (*Ydb_Operations.ForgetOperationResponse, error) {
if clientDb == nil {
return nil, fmt.Errorf("unititialized client db driver")
}

client := Ydb_Operation_V1.NewOperationServiceClient(ydb.GRPCConn(clientDb))
xlog.Info(ctx, "Forgetting operation",
xlog.Info(
ctx, "Forgetting operation",
zap.String("id", operationId),
)

Expand All @@ -417,13 +440,16 @@ func (d *ClientYdbConnector) ForgetOperation(ctx context.Context, clientDb *ydb.
return response, nil
}

func (d *ClientYdbConnector) CancelOperation(ctx context.Context, clientDb *ydb.Driver, operationId string) (*Ydb_Operations.CancelOperationResponse, error) {
func (d *ClientYdbConnector) CancelOperation(
ctx context.Context, clientDb *ydb.Driver, operationId string,
) (*Ydb_Operations.CancelOperationResponse, error) {
if clientDb == nil {
return nil, fmt.Errorf("unititialized client db driver")
}

client := Ydb_Operation_V1.NewOperationServiceClient(ydb.GRPCConn(clientDb))
xlog.Info(ctx, "Cancelling operation",
xlog.Info(
ctx, "Cancelling operation",
zap.String("id", operationId),
)

Expand Down
20 changes: 10 additions & 10 deletions internal/connectors/db/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ type DBConnector interface {
SelectBackupsByStatus(ctx context.Context, backupStatus string) ([]*types.Backup, error)
ActiveOperations(context.Context) ([]types.Operation, error)
UpdateOperation(context.Context, types.Operation) error
CreateOperation(context.Context, types.Operation) (types.ObjectID, error)
CreateBackup(context.Context, types.Backup) (types.ObjectID, error)
UpdateBackup(context context.Context, id types.ObjectID, backupState string) error
CreateOperation(context.Context, types.Operation) (string, error)
CreateBackup(context.Context, types.Backup) (string, error)
UpdateBackup(context context.Context, id string, backupState string) error
ExecuteUpsert(ctx context.Context, queryBuilder queries.WriteTableQuery) error
Close(context.Context)
}
Expand Down Expand Up @@ -307,29 +307,29 @@ func (d *YdbConnector) UpdateOperation(

func (d *YdbConnector) CreateOperation(
ctx context.Context, operation types.Operation,
) (types.ObjectID, error) {
operation.SetId(types.GenerateObjectID())
) (string, error) {
operation.SetID(types.GenerateObjectID())
err := d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithCreateOperation(operation))
if err != nil {
return types.ObjectID{}, err
return "", err
}
return operation.GetId(), nil
return operation.GetID(), nil
}

func (d *YdbConnector) CreateBackup(
ctx context.Context, backup types.Backup,
) (types.ObjectID, error) {
) (string, error) {
id := types.GenerateObjectID()
backup.ID = id
err := d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithCreateBackup(backup))
if err != nil {
return types.ObjectID{}, err
return "", err
}
return id, nil
}

func (d *YdbConnector) UpdateBackup(
ctx context.Context, id types.ObjectID, backupStatus string,
ctx context.Context, id string, backupStatus string,
) error {
backup := types.Backup{
ID: id,
Expand Down
38 changes: 19 additions & 19 deletions internal/connectors/db/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,30 @@ import (
)

type MockDBConnector struct {
operations map[types.ObjectID]types.Operation
backups map[types.ObjectID]types.Backup
operations map[string]types.Operation
backups map[string]types.Backup
}

type Option func(*MockDBConnector)

func NewMockDBConnector(options ...Option) *MockDBConnector {
connector := &MockDBConnector{
operations: make(map[types.ObjectID]types.Operation),
backups: make(map[types.ObjectID]types.Backup),
operations: make(map[string]types.Operation),
backups: make(map[string]types.Backup),
}
for _, opt := range options {
opt(connector)
}
return connector
}

func WithOperations(operations map[types.ObjectID]types.Operation) Option {
func WithOperations(operations map[string]types.Operation) Option {
return func(c *MockDBConnector) {
c.operations = operations
}
}

func WithBackups(backups map[types.ObjectID]types.Backup) Option {
func WithBackups(backups map[string]types.Backup) Option {
return func(c *MockDBConnector) {
c.backups = backups
}
Expand All @@ -61,7 +61,7 @@ func (c *MockDBConnector) SelectBackupsByStatus(
}

func (c *MockDBConnector) UpdateBackup(
_ context.Context, id types.ObjectID, backupStatus string,
_ context.Context, id string, backupStatus string,
) error {
if _, ok := c.backups[id]; !ok {
return fmt.Errorf("no backup found for id %v", id)
Expand All @@ -77,8 +77,8 @@ func (c *MockDBConnector) GetTableClient() table.Client {
return nil
}

func (c *MockDBConnector) CreateBackup(_ context.Context, backup types.Backup) (types.ObjectID, error) {
var id types.ObjectID
func (c *MockDBConnector) CreateBackup(_ context.Context, backup types.Backup) (string, error) {
var id string
for {
id = types.GenerateObjectID()
if _, exist := c.backups[id]; !exist {
Expand All @@ -105,49 +105,49 @@ func (c *MockDBConnector) ActiveOperations(_ context.Context) (
func (c *MockDBConnector) UpdateOperation(
_ context.Context, op types.Operation,
) error {
if _, exist := c.operations[op.GetId()]; !exist {
if _, exist := c.operations[op.GetID()]; !exist {
return fmt.Errorf(
"update nonexistent operation %s", types.OperationToString(op),
)
}
c.operations[op.GetId()] = op
c.operations[op.GetID()] = op
return nil
}

func (c *MockDBConnector) CreateOperation(
_ context.Context, op types.Operation,
) (types.ObjectID, error) {
var id types.ObjectID
) (string, error) {
var id string
for {
id = types.GenerateObjectID()
if _, exist := c.operations[id]; !exist {
break
}
}
op.SetId(id)
op.SetID(id)
c.operations[id] = op
return id, nil
}

func (c *MockDBConnector) GetOperation(
_ context.Context, operationID types.ObjectID,
_ context.Context, operationID string,
) (types.Operation, error) {
if op, exist := c.operations[operationID]; exist {
return op, nil
}
return &types.GenericOperation{}, fmt.Errorf(
"operation not found, id %s", operationID.String(),
"operation not found, id %s", operationID,
)
}

func (c *MockDBConnector) GetBackup(
_ context.Context, backupID types.ObjectID,
_ context.Context, backupID string,
) (types.Backup, error) {
if backup, exist := c.backups[backupID]; exist {
return backup, nil
}
return types.Backup{}, fmt.Errorf(
"backup not found, id %s", backupID.String(),
"backup not found, id %s", backupID,
)
}

Expand All @@ -159,7 +159,7 @@ func (c *MockDBConnector) SelectOperations(

func (c *MockDBConnector) ExecuteUpsert(_ context.Context, queryBuilder queries.WriteTableQuery) error {
queryBuilderMock := queryBuilder.(*queries.WriteTableQueryMock)
c.operations[queryBuilderMock.Operation.GetId()] = queryBuilderMock.Operation
c.operations[queryBuilderMock.Operation.GetID()] = queryBuilderMock.Operation
c.backups[queryBuilderMock.Backup.ID] = queryBuilderMock.Backup
return nil
}
Loading

0 comments on commit 217e5cd

Please sign in to comment.