diff --git a/internal/connectors/client/connector.go b/internal/connectors/client/connector.go index cf6c5e26..948eb005 100644 --- a/internal/connectors/client/connector.go +++ b/internal/connectors/client/connector.go @@ -209,7 +209,7 @@ func prepareItemsForExport( items := make([]*Ydb_Export.ExportToS3Settings_Item, len(sources)) for i, source := range sources { - // Destination prefix format: s3_destination_prefix/database_name/timestamp_backup_id/rel_source_path + // Destination prefix format: s3_destination_prefix/database_name/timestamp/rel_source_path destinationPrefix := path.Join( s3Settings.DestinationPrefix, clientDb.Scheme().Database(), @@ -295,7 +295,8 @@ func prepareItemsForImport(ctx context.Context, clientDb *ydb.Driver, s3Settings }, ) - items := make([]*Ydb_Import.ImportFromS3Settings_Item, len(s3Settings.SourcePaths)) + items := make([]*Ydb_Import.ImportFromS3Settings_Item, 0) + itemsPtr := &items for _, sourcePath := range s3Settings.SourcePaths { if sourcePath[len(sourcePath)-1] != '/' { @@ -312,8 +313,8 @@ func prepareItemsForImport(ctx context.Context, clientDb *ydb.Driver, s3Settings key, found := strings.CutSuffix(*object.Key, "scheme.pb") if found { - items = append( - items, + *itemsPtr = append( + *itemsPtr, &Ydb_Import.ImportFromS3Settings_Item{ SourcePrefix: key, DestinationPath: path.Join( @@ -335,7 +336,7 @@ func prepareItemsForImport(ctx context.Context, clientDb *ydb.Driver, s3Settings } } - return items, nil + return *itemsPtr, nil } func (d *ClientYdbConnector) ImportFromS3(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ImportSettings) (string, error) { @@ -348,6 +349,10 @@ func (d *ClientYdbConnector) ImportFromS3(ctx context.Context, clientDb *ydb.Dri return "", fmt.Errorf("error preparing list of items for import: %s", err.Error()) } + if len(items) == 0 { + return "", fmt.Errorf("empty list of items for import") + } + importClient := Ydb_Import_V1.NewImportServiceClient(ydb.GRPCConn(clientDb)) xlog.Info( ctx, "Importing data from s3", diff --git a/internal/connectors/db/yql/queries/write.go b/internal/connectors/db/yql/queries/write.go index 43ac901a..d6856d93 100644 --- a/internal/connectors/db/yql/queries/write.go +++ b/internal/connectors/db/yql/queries/write.go @@ -77,8 +77,13 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i ) } } - tb, ok := operation.(*types.TakeBackupOperation) - if ok { + + if operation.GetType() == types.OperationTypeTB { + tb, ok := operation.(*types.TakeBackupOperation) + if !ok { + xlog.Error(ctx, "error cast operation to TakeBackupOperation", zap.String("operation_id", operation.GetID())) + } + d.AddValueParam( "$container_id", table_types.StringValueFromString(tb.ContainerID), ) @@ -108,8 +113,35 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i table_types.StringValueFromString(strings.Join(tb.SourcePathToExclude, ",")), ) } + } else if operation.GetType() == types.OperationTypeRB { + rb, ok := operation.(*types.RestoreBackupOperation) + if !ok { + xlog.Error(ctx, "error cast operation to RestoreBackupOperation", zap.String("operation_id", operation.GetID())) + } + + d.AddValueParam( + "$container_id", table_types.StringValueFromString(rb.ContainerID), + ) + d.AddValueParam( + "$database", + table_types.StringValueFromString(rb.YdbConnectionParams.DatabaseName), + ) + d.AddValueParam( + "$endpoint", + table_types.StringValueFromString(rb.YdbConnectionParams.Endpoint), + ) + d.AddValueParam( + "$backup_id", + table_types.StringValueFromString(rb.BackupId), + ) + d.AddValueParam( + "$operation_id", + table_types.StringValueFromString(rb.YdbOperationId), + ) + d.AddValueParam("$message", table_types.StringValueFromString(rb.Message)) + + // TODO: add other fields } else { - //TODO: support RestoreBackup operation xlog.Error(ctx, "unknown operation type write to db", zap.String("operation_type", string(operation.GetType()))) }