From 9556801355e22b34aca4e4e583b399db0b7850ec Mon Sep 17 00:00:00 2001
From: Nikhil Raverkar
Date: Mon, 20 Jun 2022 17:43:23 -0400
Subject: [PATCH] Case2272044573 merge for 1.5 1 (#298)

* added HP for `sampling_method`

* added HP for `prob_buffer_row`; remove after it is fixed in the console

* Case2272044573 master merge (#285)

* removing duplicate files from data_path while creating symlink

* not creating duplicate symlinks to resolve FileExistsError

* Add warnings when validation files are suspected to be identical with training files (#273)

* Resolving comments

* remove the return statement in the check_data_redundancy function

* fix some format issues

* fixing NVIDIA key error

* add debug logs

* add default value for validate path

* check if validation path is not set

* resolve comments on logging

Co-authored-by: Nikhil Raverkar
Co-authored-by: Haixin Wang <98612668+haixiw@users.noreply.github.com>
Co-authored-by: haixiw

* Revert "Case2272044573 master merge (#285)" (#290)

* Revert "Case2272044573 master merge (#285)"

This reverts commit 81405a65db282a7bcae14c412ba000fde8aa5da4.
* Fix NVIDIA keyring error

* Bump docker runtime from 17 to 19

* Delete numpy-1.21.6 site packages

* Freeze protobuf to 3.20.1

Co-authored-by: Mark Bunday

* fixed errors Case2272044573

Co-authored-by: Nikhil Raverkar
Co-authored-by: Haixin Wang <98612668+haixiw@users.noreply.github.com>
Co-authored-by: haixiw
Co-authored-by: Mark Bunday <15115482+mabunday@users.noreply.github.com>
Co-authored-by: Mark Bunday
---
 docker/1.5-1/base/Dockerfile.cpu              |   2 +-
 .../hyperparameter_validation.py              |   2 +
 .../algorithm_mode/train.py                   |  12 +-
 src/sagemaker_xgboost_container/data_utils.py | 126 +++++++++++++-----
 src/sagemaker_xgboost_container/training.py   |   1 -
 test/unit/test_data_utils.py                  |  31 +++++
 6 files changed, 135 insertions(+), 39 deletions(-)

diff --git a/docker/1.5-1/base/Dockerfile.cpu b/docker/1.5-1/base/Dockerfile.cpu
index d497a7e0..cc0003c4 100644
--- a/docker/1.5-1/base/Dockerfile.cpu
+++ b/docker/1.5-1/base/Dockerfile.cpu
@@ -23,7 +23,7 @@ ENV PYTHONDONTWRITEBYTECODE=1
 ENV PYTHONUNBUFFERED=1
 ENV PYTHONIOENCODING='utf-8'
 
-RUN rm /etc/apt/sources.list.d/cuda.list && \
+RUN rm /etc/apt/sources.list.d/cuda.list && \
     rm /etc/apt/sources.list.d/nvidia-ml.list && \
     apt-key del 7fa2af80 && \
     apt-get update && apt-get install -y --no-install-recommends wget && \
diff --git a/src/sagemaker_xgboost_container/algorithm_mode/hyperparameter_validation.py b/src/sagemaker_xgboost_container/algorithm_mode/hyperparameter_validation.py
index b86b2e93..54006c85 100644
--- a/src/sagemaker_xgboost_container/algorithm_mode/hyperparameter_validation.py
+++ b/src/sagemaker_xgboost_container/algorithm_mode/hyperparameter_validation.py
@@ -231,6 +231,8 @@ def interaction_constraints_validator(value, dependencies):
                                       required=False),
     hpv.CategoricalHyperparameter(name="single_precision_histogram", range=["true", "false"], required=False),
     hpv.CategoricalHyperparameter(name="deterministic_histogram", range=["true", "false"], required=False),
+    hpv.CategoricalHyperparameter(name="sampling_method", range=["uniform", "gradient_based"], required=False),
+    hpv.IntegerHyperparameter(name="prob_buffer_row", range=hpv.Interval(min_open=1.0), required=False),
 )
 
 hyperparameters.declare_alias("eta", "learning_rate")
diff --git a/src/sagemaker_xgboost_container/algorithm_mode/train.py b/src/sagemaker_xgboost_container/algorithm_mode/train.py
index 1867807a..bffed5b7 100644
--- a/src/sagemaker_xgboost_container/algorithm_mode/train.py
+++ b/src/sagemaker_xgboost_container/algorithm_mode/train.py
@@ -20,7 +20,8 @@ from sklearn.model_selection import RepeatedKFold, RepeatedStratifiedKFold
 
 from sagemaker_algorithm_toolkit import exceptions as exc
 from sagemaker_algorithm_toolkit.channel_validation import Channel
-from sagemaker_xgboost_container.data_utils import get_content_type, get_dmatrix, get_size, validate_data_file_path
+from sagemaker_xgboost_container.data_utils import get_content_type, get_dmatrix, get_size, validate_data_file_path,\
+    check_data_redundancy
 from sagemaker_xgboost_container import distributed
 from sagemaker_xgboost_container import checkpointing
 from sagemaker_xgboost_container.algorithm_mode import channel_validation as cv
@@ -138,9 +139,14 @@ def sagemaker_train(train_config, data_config, train_path, val_path, model_dir,
     combine_train_val = '_kfold' in validated_train_config
     train_dmatrix, val_dmatrix, train_val_dmatrix = get_validated_dmatrices(train_path, val_path, file_type, csv_weights,
                                                                            is_pipe, combine_train_val)
 
-    checkpoint_dir = checkpoint_config.get("LocalPath", None)
-
+    if val_path is not None:
+        if train_path == val_path or os.path.basename(train_path) == os.path.basename(val_path):
+            logger.warning('Found the same path for training and validation data. This is not recommended and the '
+                           'results may not be correct.')
+        elif not is_pipe:
+            # Check if there is potential data redundancy between the training and validation sets
+            check_data_redundancy(train_path, val_path)
 
     train_args = dict(
         train_cfg=validated_train_config,
         train_dmatrix=train_dmatrix,
diff --git a/src/sagemaker_xgboost_container/data_utils.py b/src/sagemaker_xgboost_container/data_utils.py
index 34ebc676..68934feb 100644
--- a/src/sagemaker_xgboost_container/data_utils.py
+++ b/src/sagemaker_xgboost_container/data_utils.py
@@ -16,6 +16,8 @@
 import os
 import shutil
 
+from typing import List, Union
+
 import mlio
 from mlio.integ.arrow import as_arrow_file
 from mlio.integ.numpy import as_numpy
@@ -485,6 +487,51 @@ def get_recordio_protobuf_dmatrix(path, is_pipe=False):
         raise exc.UserError("Failed to load recordio-protobuf data with exception:\n{}".format(e))
 
 
+def _get_pipe_mode_files_path(data_path: Union[List[str], str]) -> List[str]:
+    """
+    :param data_path: Either a directory or a file
+    """
+    if isinstance(data_path, list):
+        files_path = data_path
+    else:
+        files_path = [data_path]
+        if not os.path.exists(f"{data_path}_0"):
+            logging.info(f"Pipe path {data_path} does not exist!")
+            return None
+    return files_path
+
+
+def _get_file_mode_files_path(data_path: Union[List[str], str]) -> List[str]:
+    """
+    :param data_path: Either a directory or a file
+    """
+    # In file mode, we create a temp directory with symlinks to all input files or
+    # directories to meet XGB's assumption that all files are in the same directory.
+
+    if isinstance(data_path, list):
+        logging.info('File path {} of input files'.format(data_path))
+        # Create a directory with symlinks to the input files.
+        files_path = "/tmp/sagemaker_xgboost_input_data"
+        shutil.rmtree(files_path, ignore_errors=True)
+        os.mkdir(files_path)
+        for index, path in enumerate(data_path):
+            if not os.path.exists(path):
+                return None
+            if os.path.isfile(path):
+                _make_symlink(path, files_path, os.path.basename(path), index)
+            else:
+                for file in os.scandir(path):
+                    _make_symlink(file, files_path, file.name, index)
+
+    else:
+        if not os.path.exists(data_path):
+            logging.info('File path {} does not exist!'.format(data_path))
+            return None
+        files_path = get_files_path_from_string(data_path)
+
+    return files_path
+
+
 def get_dmatrix(data_path, content_type, csv_weights=0, is_pipe=False):
     """Create Data Matrix from CSV or LIBSVM file.
 
@@ -507,38 +554,13 @@
     # So the only way to combine the data is to read them in one shot.
     # Fortunately, mlio can read multiple pipes together, so we extend
     # the parameter data_path to support lists. If data_path is a string,
-    # get_dmatrix will work as before. When it is a list, we work as follows.
-    # For pipe mode, it leverages mlio directly by creating a list of SageMakerPipe.
-    # In file mode, we create a temp directory with symlink to all input files or
-    # directories to meet XGB's assumption that all files are in the same directory.
+    # get_dmatrix will work as before. When it is a list, it works as described in the respective helper functions.
+
     if is_pipe:
-        if isinstance(data_path, list):
-            files_path = data_path
-        else:
-            files_path = [data_path]
-        if not os.path.exists(data_path + '_0'):
-            logging.info('Pipe path {} does not exist!'.format(data_path))
-            return None
+        files_path = _get_pipe_mode_files_path(data_path)
     else:
-        if not isinstance(data_path, list):
-            if not os.path.exists(data_path):
-                logging.info('File path {} does not exist!'.format(data_path))
-                return None
-            files_path = get_files_path(data_path)
-        else:
-            # Create a directory with symlinks to input files.
-            files_path = "/tmp/sagemaker_xgboost_input_data"
-            shutil.rmtree(files_path, ignore_errors=True)
-            os.mkdir(files_path)
-            for path in data_path:
-                if not os.path.exists(path):
-                    return None
-                if os.path.isfile(path):
-                    os.symlink(path, os.path.join(files_path, os.path.basename(path)))
-                else:
-                    for file in os.scandir(path):
-                        os.symlink(file, os.path.join(files_path, file.name))
-
+        files_path = _get_file_mode_files_path(data_path)
+    logging.info(f"files path: {files_path}")
     if content_type.lower() == CSV:
         dmatrix = get_csv_dmatrix(files_path, csv_weights, is_pipe)
     elif content_type.lower() == LIBSVM:
@@ -565,11 +587,11 @@ def get_size(data_path, is_pipe=False):
     :param is_pipe: Boolean to indicate if data is being read in pipe mode
     :return: Size of data or 1 if sagemaker pipe found
     """
-    if is_pipe and os.path.exists(data_path + '_0'):
-        logging.info('Pipe path {} found.'.format(data_path))
+    if is_pipe and os.path.exists(f"{data_path}_0"):
+        logging.info(f"Pipe path {data_path} found.")
         return 1
     if not os.path.exists(data_path):
-        logging.info('Path {} does not exist!'.format(data_path))
+        logging.info(f"Path {data_path} does not exist!")
         return 0
     else:
         total_size = 0
@@ -585,7 +607,7 @@
     return total_size
 
 
-def get_files_path(data_path):
+def get_files_path_from_string(data_path: str) -> str:
     if os.path.isfile(data_path):
         files_path = data_path
     else:
@@ -595,3 +617,39 @@
             break
 
     return files_path
+
+
+def _make_symlink(path, source_path, name, index):
+    base_name = os.path.join(source_path, f"{name}_{index}")
+    logging.info(f"Creating symlink from {path} to destination {base_name}")
+    os.symlink(path, base_name)
+
+
+def check_data_redundancy(train_path, validate_path):
+    """Log a warning if suspected duplicate files are found in the training and validation folders.
+
+    The validation score of a model would be invalid if the same data were used for both training and validation.
+    Files are suspected of being duplicates when their file names and sizes are identical.
+
+    :param train_path: path to training data
+    :param validate_path: path to validation data
+    """
+    if not os.path.exists(train_path):
+        raise exc.UserError("Training data path does not exist")
+    if not os.path.exists(validate_path):
+        raise exc.UserError("Validation data path does not exist")
+
+    training_files_set = set(f for f in os.listdir(train_path) if os.path.isfile(os.path.join(train_path, f)))
+    validation_files_set = set(f for f in os.listdir(validate_path) if os.path.isfile(os.path.join(validate_path, f)))
+    same_name_files = training_files_set.intersection(validation_files_set)
+    for f in same_name_files:
+        f_train_path = os.path.join(train_path, f)
+        f_validate_path = os.path.join(validate_path, f)
+        f_train_size = os.path.getsize(f_train_path)
+        f_validate_size = os.path.getsize(f_validate_path)
+        if f_train_size == f_validate_size:
+            logging.warning(f"Suspected identical files found: {f_train_path} and {f_validate_path}"
+                            f" (both {f_validate_size} bytes)."
+                            f" Note: duplicate data in the training set and validation set is usually"
+                            f" not intentional and can impair the validity of the model evaluation by"
+                            f" the validation score.")
diff --git a/src/sagemaker_xgboost_container/training.py b/src/sagemaker_xgboost_container/training.py
index d84043fe..1adaf5f3 100644
--- a/src/sagemaker_xgboost_container/training.py
+++ b/src/sagemaker_xgboost_container/training.py
@@ -55,7 +55,6 @@ def run_algorithm_mode():
     train_path = os.environ[sm_env_constants.SM_CHANNEL_TRAIN]
     val_path = os.environ.get(sm_env_constants.SM_CHANNEL_VALIDATION)
 
-
     sm_hosts = json.loads(os.environ[sm_env_constants.SM_HOSTS])
     sm_current_host = os.environ[sm_env_constants.SM_CURRENT_HOST]
diff --git a/test/unit/test_data_utils.py b/test/unit/test_data_utils.py
index 366eef1a..2ca5d3be 100644
--- a/test/unit/test_data_utils.py
+++ b/test/unit/test_data_utils.py
@@ -19,6 +19,7 @@ import subprocess
 import sys
 import time
 
+from mock import patch
 from sagemaker_algorithm_toolkit import exceptions as exc
 from sagemaker_xgboost_container import data_utils
@@ -221,3 +222,33 @@ def test_parse_protobuf_dmatrix_single_feature_label(self):
         pb_path = os.path.join(self.data_path, 'recordio_protobuf', file_path)
         reader = data_utils.get_recordio_protobuf_dmatrix
         self._check_dmatrix(reader, pb_path, 1, 1)
+
+    @patch("logging.warning")
+    def test_check_data_redundancy_positive(self, mock_log_warning):
+        current_path = Path(os.path.abspath(__file__))
+        data_path = os.path.join(str(current_path.parent.parent), 'resources', 'abalone', 'data')
+        file_path = os.path.join(data_path, "train")
+        data_utils.check_data_redundancy(file_path, file_path)
+        mock_log_warning.assert_called()
+
+    @patch("logging.warning")
+    def test_check_data_redundancy_negative(self, mock_log_warning):
+        current_path = Path(os.path.abspath(__file__))
+        data_path = os.path.join(str(current_path.parent.parent), 'resources', 'abalone', 'data')
+        file_path = [os.path.join(data_path, path) for path in ['train', 'validation']]
+        data_utils.check_data_redundancy(file_path[0], file_path[1])
+        mock_log_warning.assert_not_called()
+
+    def test_check_data_redundancy_does_not_throw_exception_file(self):
+        current_path = Path(os.path.abspath(__file__))
+        data_path = os.path.join(str(current_path.parent.parent), 'resources', 'abalone', 'data')
+        file_path = os.path.join(data_path, "train")
+        try:
+            data_utils.check_data_redundancy(file_path, file_path)
+        except Exception as e:
+            assert False, f"check_data_redundancy raised an exception {e} for file mode"
+
+    def test_check_data_redundancy_throws_exception_pipe(self):
+        pb_file_paths = ['pb_files', 'pb_files']
+        with self.assertRaises(Exception):
+            data_utils.check_data_redundancy(pb_file_paths[0], pb_file_paths[1])
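
Aside, for reviewers: the "_{index}" suffix that _make_symlink appends is what resolves the FileExistsError mentioned repeatedly in the commit history, since two input channels may contain files with the same basename. Below is a minimal standalone sketch of the scheme, not the container's code; the directory and file names are hypothetical.

    import os
    import tempfile

    def make_symlink(path, source_path, name, index):
        # Same idea as data_utils._make_symlink: suffix the link name with the
        # channel index so same-named files from different channels cannot collide.
        os.symlink(path, os.path.join(source_path, f"{name}_{index}"))

    with tempfile.TemporaryDirectory() as workdir:
        # Two hypothetical input channels that both ship a file named part0.csv.
        channels = []
        for i in range(2):
            channel = os.path.join(workdir, f"channel{i}")
            os.mkdir(channel)
            open(os.path.join(channel, "part0.csv"), "w").close()
            channels.append(channel)

        link_dir = os.path.join(workdir, "input_data")
        os.mkdir(link_dir)
        for index, channel in enumerate(channels):
            for entry in os.scandir(channel):
                make_symlink(entry.path, link_dir, entry.name, index)

        # Prints ['part0.csv_0', 'part0.csv_1']; no FileExistsError is raised.
        print(sorted(os.listdir(link_dir)))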
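For context on the newly whitelisted sampling_method hyperparameter: in upstream XGBoost 1.x, gradient_based sampling is only supported by the gpu_hist tree method and is typically paired with a subsample ratio below 1.0. A minimal sketch of exercising it against XGBoost directly (the random data is purely illustrative, and running this requires a GPU):

    import numpy as np
    import xgboost as xgb

    # Tiny synthetic regression set, just to make the sketch self-contained.
    X, y = np.random.rand(256, 4), np.random.rand(256)
    dtrain = xgb.DMatrix(X, label=y)

    params = {
        "tree_method": "gpu_hist",           # required for gradient_based sampling
        "sampling_method": "gradient_based",
        "subsample": 0.5,
        "objective": "reg:squarederror",
    }
    booster = xgb.train(params, dtrain, num_boost_round=10)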