Case2272044573 merge for 1.5-1 (#298)
* added HP for `sampling_method`

* added HP for `prob_buffer_row` (remove after fixed in console)

* Case2272044573 master merge (#285)

* removing duplicate files from data_path while creating symlink

* not creating duplicate symlinks to resolve FileExistsError

* Add warnings when validation files are suspected to be identical with training files (#273)

* Add warnings when validation files are suspected to be identical with training files

* Resolving comments

* remove the return statement in the check_data_redundancy function

* fix some format issues

* fixing Nvidia key error

* fixing Nvidia key error

* fixing Nvidia key error

* add debug logs

* add default value for validate path

* check if validation path is not set

* resolve comments on logging

Co-authored-by: Nikhil Raverkar <[email protected]>
Co-authored-by: Haixin Wang <[email protected]>
Co-authored-by: haixiw <[email protected]>

* Revert "Case2272044573 master merge (#285)" (#290)

* Revert "Case2272044573 master merge (#285)"

This reverts commit 81405a6.

* Fix NVIDIA keyring error

* Bump docker runtime from 17 to 19

* Delete numpy-1.21.6 site packages

* Freeze protobuf to 3.20.1

Co-authored-by: Mark Bunday <[email protected]>

* fixed errors Case2272044573

Co-authored-by: Nikhil Raverkar <[email protected]>
Co-authored-by: Haixin Wang <[email protected]>
Co-authored-by: haixiw <[email protected]>
Co-authored-by: Mark Bunday <[email protected]>
Co-authored-by: Mark Bunday <[email protected]>
6 people authored Jun 20, 2022
1 parent 4723e36 commit 9556801
Showing 6 changed files with 135 additions and 39 deletions.
2 changes: 1 addition & 1 deletion docker/1.5-1/base/Dockerfile.cpu
@@ -23,7 +23,7 @@ ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV PYTHONIOENCODING='utf-8'

-RUN rm /etc/apt/sources.list.d/cuda.list && \
+RUN rm /etc/apt/sources.list.d/cuda.list && \
rm /etc/apt/sources.list.d/nvidia-ml.list && \
apt-key del 7fa2af80 && \
apt-get update && apt-get install -y --no-install-recommends wget && \
@@ -231,6 +231,8 @@ def interaction_constraints_validator(value, dependencies):
required=False),
hpv.CategoricalHyperparameter(name="single_precision_histogram", range=["true", "false"], required=False),
hpv.CategoricalHyperparameter(name="deterministic_histogram", range=["true", "false"], required=False),
hpv.CategoricalHyperparameter(name="sampling_method", range=["uniform", "gradient_based"], required=False),
hpv.IntegerHyperparameter(name="prob_buffer_row", range=hpv.Interval(min_open=1.0), required=False),
)

hyperparameters.declare_alias("eta", "learning_rate")
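
For context, a minimal sketch (not part of this diff; the parameter values and DMatrix are illustrative assumptions) of how the new sampling_method hyperparameter maps onto XGBoost's native training parameters once validated. XGBoost only supports gradient_based sampling with the GPU histogram tree method, and sampling only takes effect when subsample is below 1.0:

import xgboost as xgb

params = {
    "tree_method": "gpu_hist",
    "sampling_method": "gradient_based",  # or "uniform" (the default)
    "subsample": 0.5,
}
# dtrain = xgb.DMatrix("train.csv?format=csv")  # hypothetical input file
# booster = xgb.train(params, dtrain, num_boost_round=100)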
12 changes: 9 additions & 3 deletions src/sagemaker_xgboost_container/algorithm_mode/train.py
@@ -20,7 +20,8 @@
from sklearn.model_selection import RepeatedKFold, RepeatedStratifiedKFold
from sagemaker_algorithm_toolkit import exceptions as exc
from sagemaker_algorithm_toolkit.channel_validation import Channel
-from sagemaker_xgboost_container.data_utils import get_content_type, get_dmatrix, get_size, validate_data_file_path
+from sagemaker_xgboost_container.data_utils import get_content_type, get_dmatrix, get_size, validate_data_file_path,\
+    check_data_redundancy
from sagemaker_xgboost_container import distributed
from sagemaker_xgboost_container import checkpointing
from sagemaker_xgboost_container.algorithm_mode import channel_validation as cv
@@ -138,9 +139,14 @@ def sagemaker_train(train_config, data_config, train_path, val_path, model_dir,
combine_train_val = '_kfold' in validated_train_config
train_dmatrix, val_dmatrix, train_val_dmatrix = get_validated_dmatrices(train_path, val_path, file_type,
csv_weights, is_pipe, combine_train_val)

checkpoint_dir = checkpoint_config.get("LocalPath", None)

if val_path is not None:
if train_path == val_path or os.path.basename(train_path) == os.path.basename(val_path):
logger.warning('Found same path for training and validation. This is not recommended and results may not '
'be correct.')
elif not is_pipe:
# Check if there is potential data redundancy between training and validation sets
check_data_redundancy(train_path, val_path)
train_args = dict(
train_cfg=validated_train_config,
train_dmatrix=train_dmatrix,
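
A small illustration (hypothetical paths, not part of the diff) of what the new guard catches: identical paths, or distinct channel roots whose final directory names coincide:

import os

train_path = "/opt/ml/input/data/train"  # hypothetical channel paths
val_path = "/other/prefix/train"

# The basenames match even though the full paths differ, so the
# warning above would fire before training starts.
print(os.path.basename(train_path) == os.path.basename(val_path))  # True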
126 changes: 92 additions & 34 deletions src/sagemaker_xgboost_container/data_utils.py
@@ -16,6 +16,8 @@
import os
import shutil

from typing import List, Union

import mlio
from mlio.integ.arrow import as_arrow_file
from mlio.integ.numpy import as_numpy
@@ -485,6 +487,51 @@ def get_recordio_protobuf_dmatrix(path, is_pipe=False):
raise exc.UserError("Failed to load recordio-protobuf data with exception:\n{}".format(e))


def _get_pipe_mode_files_path(data_path: Union[List[str], str]) -> List[str]:
"""
:param data_path: Either directory or file
"""
if isinstance(data_path, list):
files_path = data_path
else:
files_path = [data_path]
if not os.path.exists(f"{data_path}_0"):
logging.info(f"Pipe path {data_path} does not exist!")
return None
return files_path
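
For context, a brief sketch (the channel path is hypothetical) of the naming convention the existence check above relies on: SageMaker pipe mode materializes each channel as a sequence of named pipes suffixed with a pass counter.

import os

channel = "/opt/ml/input/data/train"  # hypothetical pipe-mode channel path
# SageMaker creates train_0, train_1, ... alongside the channel path,
# one per pass, so probing f"{channel}_0" tests that the pipe exists.
print(os.path.exists(f"{channel}_0"))  # False outside a pipe-mode container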


def _get_file_mode_files_path(data_path: Union[List[str], str]) -> List[str]:
"""
:param data_path: Either directory or file
"""
# In file mode, we create a temp directory with symlink to all input files or
# directories to meet XGB's assumption that all files are in the same directory.

if isinstance(data_path, list):
logging.info('File path {} of input files'.format(data_path))
# Create a directory with symlinks to input files.
files_path = "/tmp/sagemaker_xgboost_input_data"
shutil.rmtree(files_path, ignore_errors=True)
os.mkdir(files_path)
for index, path in enumerate(data_path):
if not os.path.exists(path):
return None
if os.path.isfile(path):
_make_symlink(path, files_path, os.path.basename(path), index)
else:
for file in os.scandir(path):
_make_symlink(file, files_path, file.name, index)

else:
if not os.path.exists(data_path):
logging.info('File path {} does not exist!'.format(data_path))
return None
files_path = get_files_path_from_string(data_path)

return files_path
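
A runnable sketch (the demo directory and file are stand-ins, not the real paths) of the index-suffixed symlink layout this helper builds through _make_symlink:

import os
import shutil

demo_dir = "/tmp/sagemaker_xgboost_input_data_demo"  # stand-in directory
shutil.rmtree(demo_dir, ignore_errors=True)
os.mkdir(demo_dir)

with open("/tmp/part1.csv", "w") as f:  # stand-in input file
    f.write("1,2,3\n")

# Mirrors _make_symlink(path, files_path, name, index): the name gets an
# "_<index>" suffix so same-named files from different inputs cannot collide.
os.symlink("/tmp/part1.csv", os.path.join(demo_dir, "part1.csv_0"))
print(os.listdir(demo_dir))  # ['part1.csv_0']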


def get_dmatrix(data_path, content_type, csv_weights=0, is_pipe=False):
"""Create Data Matrix from CSV or LIBSVM file.
@@ -507,38 +554,13 @@ def get_dmatrix(data_path, content_type, csv_weights=0, is_pipe=False):
# So the only way to combine the data is to read them in one shot.
# Fortunately, mlio can read multiple pipes together, so we extend
# the parameter data_path to support lists. If data_path is a string as usual,
-    # get_dmatrix will work as before. When it is a list, we work as follows.
-    # For pipe mode, it leverages mlio directly by creating a list of SageMakerPipe.
-    # In file mode, we create a temp directory with symlink to all input files or
-    # directories to meet XGB's assumption that all files are in the same directory.
+    # get_dmatrix will work as before. When it is a list, it works as explained in respective functions.

    if is_pipe:
-        if isinstance(data_path, list):
-            files_path = data_path
-        else:
-            files_path = [data_path]
-            if not os.path.exists(data_path + '_0'):
-                logging.info('Pipe path {} does not exist!'.format(data_path))
-                return None
+        files_path = _get_pipe_mode_files_path(data_path)
    else:
-        if not isinstance(data_path, list):
-            if not os.path.exists(data_path):
-                logging.info('File path {} does not exist!'.format(data_path))
-                return None
-            files_path = get_files_path(data_path)
-        else:
-            # Create a directory with symlinks to input files.
-            files_path = "/tmp/sagemaker_xgboost_input_data"
-            shutil.rmtree(files_path, ignore_errors=True)
-            os.mkdir(files_path)
-            for path in data_path:
-                if not os.path.exists(path):
-                    return None
-                if os.path.isfile(path):
-                    os.symlink(path, os.path.join(files_path, os.path.basename(path)))
-                else:
-                    for file in os.scandir(path):
-                        os.symlink(file, os.path.join(files_path, file.name))
+        files_path = _get_file_mode_files_path(data_path)
+    logging.info(f"files path: {files_path}")
if content_type.lower() == CSV:
dmatrix = get_csv_dmatrix(files_path, csv_weights, is_pipe)
elif content_type.lower() == LIBSVM:
@@ -565,11 +587,11 @@ def get_size(data_path, is_pipe=False):
:param is_pipe: Boolean to indicate if data is being read in pipe mode
:return: Size of data or 1 if sagemaker pipe found
"""
-    if is_pipe and os.path.exists(data_path + '_0'):
-        logging.info('Pipe path {} found.'.format(data_path))
+    if is_pipe and os.path.exists(f"{data_path}_0"):
+        logging.info(f"Pipe path {data_path} found.")
        return 1
    if not os.path.exists(data_path):
-        logging.info('Path {} does not exist!'.format(data_path))
+        logging.info(f"Path {data_path} does not exist!")
return 0
else:
total_size = 0
@@ -585,7 +607,7 @@ def get_size(data_path, is_pipe=False):
return total_size


-def get_files_path(data_path):
+def get_files_path_from_string(data_path: Union[List[str], str]) -> List[str]:
if os.path.isfile(data_path):
files_path = data_path
else:
@@ -595,3 +617,39 @@ def get_files_path(data_path):
break

return files_path


def _make_symlink(path, source_path, name, index):
base_name = os.path.join(source_path, f"{name}_{str(index)}")
logging.info(f'creating symlink between Path {source_path} and destination {base_name}')
os.symlink(path, base_name)


def check_data_redundancy(train_path, validate_path):
"""Log a warning if suspected duplicate files are found in the training and validation folders.
The validation score of models would be invalid if the same data is used for both training and validation.
Files are suspected of being duplicates when the file names are the same and their sizes are the same.
param train_path : path to training data
param validate_path : path to validation data
"""
if not os.path.exists(train_path):
raise exc.UserError("training data's path is not existed")
if not os.path.exists(validate_path):
raise exc.UserError("validation data's path is not existed")

training_files_set = set(f for f in os.listdir(train_path) if os.path.isfile(os.path.join(train_path, f)))
validation_files_set = set(f for f in os.listdir(validate_path) if os.path.isfile(os.path.join(validate_path, f)))
same_name_files = training_files_set.intersection(validation_files_set)
for f in same_name_files:
f_train_path = os.path.join(train_path, f)
f_validate_path = os.path.join(validate_path, f)
f_train_size = os.path.getsize(f_train_path)
f_validate_size = os.path.getsize(f_validate_path)
if f_train_size == f_validate_size:
logging.warning(f"Suspected identical files found. ({f_train_path} and {f_validate_path}"
f"with same size {f_validate_size} bytes)."
f" Note: Duplicate data in the training set and validation set is usually"
f" not intentional and can impair the validity of the model evaluation by"
f" the validation score.")
1 change: 0 additions & 1 deletion src/sagemaker_xgboost_container/training.py
@@ -55,7 +55,6 @@ def run_algorithm_mode():

train_path = os.environ[sm_env_constants.SM_CHANNEL_TRAIN]
val_path = os.environ.get(sm_env_constants.SM_CHANNEL_VALIDATION)
-
sm_hosts = json.loads(os.environ[sm_env_constants.SM_HOSTS])
sm_current_host = os.environ[sm_env_constants.SM_CURRENT_HOST]

Expand Down
31 changes: 31 additions & 0 deletions test/unit/test_data_utils.py
@@ -19,6 +19,7 @@
import subprocess
import sys
import time
from mock import patch

from sagemaker_algorithm_toolkit import exceptions as exc
from sagemaker_xgboost_container import data_utils
@@ -221,3 +222,33 @@ def test_parse_protobuf_dmatrix_single_feature_label(self):
pb_path = os.path.join(self.data_path, 'recordio_protobuf', file_path)
reader = data_utils.get_recordio_protobuf_dmatrix
self._check_dmatrix(reader, pb_path, 1, 1)

@patch("logging.warning")
def test_check_data_redundancy_positive(self, mock_log_warning):
current_path = Path(os.path.abspath(__file__))
data_path = os.path.join(str(current_path.parent.parent), 'resources', 'abalone', 'data')
file_path = os.path.join(data_path, "train")
data_utils.check_data_redundancy(file_path, file_path)
mock_log_warning.assert_called()

@patch("logging.warning")
def test_check_data_redundancy_negative(self, mock_log_warning):
current_path = Path(os.path.abspath(__file__))
data_path = os.path.join(str(current_path.parent.parent), 'resources', 'abalone', 'data')
file_path = [os.path.join(data_path, path) for path in ['train', 'validation']]
data_utils.check_data_redundancy(file_path[0], file_path[1])
mock_log_warning.assert_not_called()

def test_check_data_redundancy_does_not_throw_exception_file(self):
current_path = Path(os.path.abspath(__file__))
data_path = os.path.join(str(current_path.parent.parent), 'resources', 'abalone', 'data')
file_path = os.path.join(data_path, "train")
try:
data_utils.check_data_redundancy(file_path, file_path)
except Exception as e:
assert False, f"check_data_redundancy raised an exception {e} for file mode"

def test_check_data_redundancy_throws_exception_pipe(self):
pb_file_paths = ['pb_files']
with self.assertRaises(Exception):
data_utils.check_data_redundancy(pb_file_paths[0], pb_file_paths[1])
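
The new tests can be exercised on their own with pytest; the invocation below is a plausible example (run from the repository root), not something the diff prescribes:

import pytest

# Select only the redundancy-check tests by keyword match.
pytest.main(["test/unit/test_data_utils.py", "-k", "check_data_redundancy", "-q"])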
