Skip to content

Commit

Permalink
fix(ydbcp): support db writer for restore operation
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina committed Aug 28, 2024
1 parent 608ba43 commit 4d8b185
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 12 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
ydb:
image: cr.ai.nebius.cloud/crnca8q7ti1i7vpqs28l/ydb-local:24.1.16-stream-nb-1.1-1
image: cr.yandex/yc/yandex-docker-local-ydb:24.1
platform: linux/amd64
hostname: local-ydb
#volumes:
Expand Down
15 changes: 10 additions & 5 deletions internal/connectors/client/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func prepareItemsForExport(
items := make([]*Ydb_Export.ExportToS3Settings_Item, len(sources))

for i, source := range sources {
// Destination prefix format: s3_destination_prefix/database_name/timestamp_backup_id/rel_source_path
// Destination prefix format: s3_destination_prefix/database_name/timestamp/rel_source_path
destinationPrefix := path.Join(
s3Settings.DestinationPrefix,
clientDb.Scheme().Database(),
Expand Down Expand Up @@ -295,7 +295,8 @@ func prepareItemsForImport(ctx context.Context, clientDb *ydb.Driver, s3Settings
},
)

items := make([]*Ydb_Import.ImportFromS3Settings_Item, len(s3Settings.SourcePaths))
items := make([]*Ydb_Import.ImportFromS3Settings_Item, 0)
itemsPtr := &items

for _, sourcePath := range s3Settings.SourcePaths {
if sourcePath[len(sourcePath)-1] != '/' {
Expand All @@ -312,8 +313,8 @@ func prepareItemsForImport(ctx context.Context, clientDb *ydb.Driver, s3Settings

key, found := strings.CutSuffix(*object.Key, "scheme.pb")
if found {
items = append(
items,
*itemsPtr = append(
*itemsPtr,
&Ydb_Import.ImportFromS3Settings_Item{
SourcePrefix: key,
DestinationPath: path.Join(
Expand All @@ -335,7 +336,7 @@ func prepareItemsForImport(ctx context.Context, clientDb *ydb.Driver, s3Settings
}
}

return items, nil
return *itemsPtr, nil
}

func (d *ClientYdbConnector) ImportFromS3(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ImportSettings) (string, error) {
Expand All @@ -348,6 +349,10 @@ func (d *ClientYdbConnector) ImportFromS3(ctx context.Context, clientDb *ydb.Dri
return "", fmt.Errorf("error preparing list of items for import: %s", err.Error())
}

if len(items) == 0 {
return "", fmt.Errorf("empty list of items for import")
}

importClient := Ydb_Import_V1.NewImportServiceClient(ydb.GRPCConn(clientDb))
xlog.Info(
ctx, "Importing data from s3",
Expand Down
1 change: 1 addition & 0 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
DatabaseName: databaseName,
},
YdbOperationId: StringOrEmpty(ydbOperationId),
SourcePaths: sourcePathsSlice,
Audit: auditFromDb(creator, createdAt, completedAt),
}, nil
}
Expand Down
40 changes: 37 additions & 3 deletions internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,13 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i
)
}
}
tb, ok := operation.(*types.TakeBackupOperation)
if ok {

if operation.GetType() == types.OperationTypeTB {
tb, ok := operation.(*types.TakeBackupOperation)
if !ok {
xlog.Error(ctx, "error cast operation to TakeBackupOperation", zap.String("operation_id", operation.GetID()))
}

d.AddValueParam(
"$container_id", table_types.StringValueFromString(tb.ContainerID),
)
Expand Down Expand Up @@ -108,8 +113,37 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i
table_types.StringValueFromString(strings.Join(tb.SourcePathToExclude, ",")),
)
}
} else if operation.GetType() == types.OperationTypeRB {
rb, ok := operation.(*types.RestoreBackupOperation)
if !ok {
xlog.Error(ctx, "error cast operation to RestoreBackupOperation", zap.String("operation_id", operation.GetID()))
}

d.AddValueParam(
"$container_id", table_types.StringValueFromString(rb.ContainerID),
)
d.AddValueParam(
"$database",
table_types.StringValueFromString(rb.YdbConnectionParams.DatabaseName),
)
d.AddValueParam(
"$endpoint",
table_types.StringValueFromString(rb.YdbConnectionParams.Endpoint),
)
d.AddValueParam(
"$backup_id",
table_types.StringValueFromString(rb.BackupId),
)
d.AddValueParam(
"$operation_id",
table_types.StringValueFromString(rb.YdbOperationId),
)
d.AddValueParam("$message", table_types.StringValueFromString(rb.Message))

if len(rb.SourcePaths) > 0 {
d.AddValueParam("$paths", table_types.StringValueFromString(strings.Join(rb.SourcePaths, ",")))
}
} else {
//TODO: support RestoreBackup operation
xlog.Error(ctx, "unknown operation type write to db", zap.String("operation_type", string(operation.GetType())))
}

Expand Down
2 changes: 2 additions & 0 deletions internal/server/services/backup/backupservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ func (s *BackupService) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequ
CreatedAt: timestamppb.Now(),
Creator: subject,
},
SourcePaths: req.GetSourcePaths(),
DestinationPrefix: req.GetDestinationPrefix(),
}

operationID, err := s.driver.CreateOperation(ctx, op)
Expand Down
10 changes: 7 additions & 3 deletions internal/types/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,12 @@ type RestoreBackupOperation struct {
Message string
YdbConnectionParams YdbConnectionParams
YdbOperationId string
DestinationPaths []string
Audit *pb.AuditInfo
// TODO: this field is not used, because DestinationPaths
// will be constructed from SourcePaths and DestinationPrefix
DestinationPaths []string
SourcePaths []string
DestinationPrefix string
Audit *pb.AuditInfo
}

func (o *RestoreBackupOperation) GetID() string {
Expand Down Expand Up @@ -203,7 +207,7 @@ func (o *RestoreBackupOperation) Proto() *pb.Operation {
DatabaseEndpoint: o.YdbConnectionParams.Endpoint,
YdbServerOperationId: o.YdbOperationId,
BackupId: o.BackupId,
SourcePaths: nil,
SourcePaths: o.SourcePaths,
SourcePathsToExclude: nil,
RestorePaths: o.DestinationPaths,
Audit: o.Audit,
Expand Down

0 comments on commit 4d8b185

Please sign in to comment.