From 1ea8346299147067fbb39cfdd46d299f3465c7f9 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen <83104894+ca-nguyen@users.noreply.github.com> Date: Wed, 3 Nov 2021 16:24:15 -0700 Subject: [PATCH] feat: Support placeholders for TuningStep (#173) --- src/stepfunctions/steps/sagemaker.py | 20 ++- tests/integ/test_sagemaker_steps.py | 101 +++++++++++- tests/unit/test_sagemaker_steps.py | 235 ++++++++++++++++++++++++++- 3 files changed, 340 insertions(+), 16 deletions(-) diff --git a/src/stepfunctions/steps/sagemaker.py b/src/stepfunctions/steps/sagemaker.py index 8b273ea..f7e2e12 100644 --- a/src/stepfunctions/steps/sagemaker.py +++ b/src/stepfunctions/steps/sagemaker.py @@ -465,7 +465,10 @@ def __init__(self, state_id, tuner, job_name, data, wait_for_completion=True, ta :class:`sagemaker.amazon.amazon_estimator.RecordSet` objects, where each instance is a different channel of training data. wait_for_completion(bool, optional): Boolean value set to `True` if the Task state should wait for the tuning job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the tuning job and proceed to the next step. (default: True) - tags (list[dict], optional): `List of tags `_ to associate with the resource. + tags (list[dict] or Placeholder, optional): `List of tags `_ to associate with the resource. + parameters(dict, optional): The value of this field is merged with other arguments to become the request payload for SageMaker `CreateHyperParameterTuningJob`_. + You can use `parameters` to override the value provided by other arguments and specify any field's value dynamically using `Placeholders`_. + """ if wait_for_completion: """ @@ -483,19 +486,22 @@ def __init__(self, state_id, tuner, job_name, data, wait_for_completion=True, ta kwargs[Field.Resource.value] = get_service_integration_arn(SAGEMAKER_SERVICE_NAME, SageMakerApi.CreateHyperParameterTuningJob) - parameters = tuning_config(tuner=tuner, inputs=data, job_name=job_name).copy() + tuning_parameters = tuning_config(tuner=tuner, inputs=data, job_name=job_name).copy() if job_name is not None: - parameters['HyperParameterTuningJobName'] = job_name + tuning_parameters['HyperParameterTuningJobName'] = job_name - if 'S3Operations' in parameters: - del parameters['S3Operations'] + if 'S3Operations' in tuning_parameters: + del tuning_parameters['S3Operations'] if tags: - parameters['Tags'] = tags_dict_to_kv_list(tags) + tuning_parameters['Tags'] = tags if isinstance(tags, Placeholder) else tags_dict_to_kv_list(tags) - kwargs[Field.Parameters.value] = parameters + if Field.Parameters.value in kwargs and isinstance(kwargs[Field.Parameters.value], dict): + # Update tuning parameters with input parameters + merge_dicts(tuning_parameters, kwargs[Field.Parameters.value]) + kwargs[Field.Parameters.value] = tuning_parameters super(TuningStep, self).__init__(state_id, **kwargs) diff --git a/tests/integ/test_sagemaker_steps.py b/tests/integ/test_sagemaker_steps.py index 279b90e..1d41527 100644 --- a/tests/integ/test_sagemaker_steps.py +++ b/tests/integ/test_sagemaker_steps.py @@ -104,7 +104,6 @@ def test_training_step(pca_estimator_fixture, record_set_fixture, sfn_client, sf # Cleanup state_machine_delete_wait(sfn_client, workflow.state_machine_arn) - # End of Cleanup def test_training_step_with_placeholders(pca_estimator_fixture, record_set_fixture, sfn_client, sfn_role_arn): @@ -193,7 +192,7 @@ def test_model_step(trained_estimator, sfn_client, sagemaker_session, sfn_role_a state_machine_delete_wait(sfn_client, workflow.state_machine_arn) model_name = get_resource_name_from_arn(execution_output.get("ModelArn")).split("/")[1] delete_sagemaker_model(model_name, sagemaker_session) - # End of Cleanup + def test_model_step_with_placeholders(trained_estimator, sfn_client, sagemaker_session, sfn_role_arn): @@ -288,7 +287,6 @@ def test_transform_step(trained_estimator, sfn_client, sfn_role_arn): # Cleanup state_machine_delete_wait(sfn_client, workflow.state_machine_arn) - # End of Cleanup def test_transform_step_with_placeholder(trained_estimator, sfn_client, sfn_role_arn): @@ -413,7 +411,7 @@ def test_endpoint_config_step(trained_estimator, sfn_client, sagemaker_session, state_machine_delete_wait(sfn_client, workflow.state_machine_arn) delete_sagemaker_endpoint_config(endpoint_config_name, sagemaker_session) delete_sagemaker_model(model.name, sagemaker_session) - # End of Cleanup + def test_create_endpoint_step(trained_estimator, record_set_fixture, sfn_client, sagemaker_session, sfn_role_arn): # Setup: Create model and endpoint config for trained estimator in SageMaker @@ -456,7 +454,7 @@ def test_create_endpoint_step(trained_estimator, record_set_fixture, sfn_client, delete_sagemaker_endpoint(endpoint_name, sagemaker_session) delete_sagemaker_endpoint_config(model.name, sagemaker_session) delete_sagemaker_model(model.name, sagemaker_session) - # End of Cleanup + def test_tuning_step(sfn_client, record_set_for_hyperparameter_tuning, sagemaker_role_arn, sfn_role_arn): job_name = generate_job_name() @@ -507,7 +505,97 @@ def test_tuning_step(sfn_client, record_set_for_hyperparameter_tuning, sagemaker # Cleanup state_machine_delete_wait(sfn_client, workflow.state_machine_arn) - # End of Cleanup + + +def test_tuning_step_with_placeholders(sfn_client, record_set_for_hyperparameter_tuning, sagemaker_role_arn, sfn_role_arn): + kmeans = KMeans( + role=sagemaker_role_arn, + instance_count=1, + instance_type=INSTANCE_TYPE, + k=10 + ) + + hyperparameter_ranges = { + "extra_center_factor": IntegerParameter(4, 10), + "mini_batch_size": IntegerParameter(10, 100), + "epochs": IntegerParameter(1, 2), + "init_method": CategoricalParameter(["kmeans++", "random"]), + } + + tuner = HyperparameterTuner( + estimator=kmeans, + objective_metric_name="test:msd", + hyperparameter_ranges=hyperparameter_ranges, + objective_type="Maximize", + max_jobs=2, + max_parallel_jobs=1, + ) + + execution_input = ExecutionInput(schema={ + 'job_name': str, + 'objective_metric_name': str, + 'objective_type': str, + 'max_jobs': int, + 'max_parallel_jobs': int, + 'early_stopping_type': str, + 'strategy': str, + }) + + parameters = { + 'HyperParameterTuningJobConfig': { + 'HyperParameterTuningJobObjective': { + 'MetricName': execution_input['objective_metric_name'], + 'Type': execution_input['objective_type'] + }, + 'ResourceLimits': {'MaxNumberOfTrainingJobs': execution_input['max_jobs'], + 'MaxParallelTrainingJobs': execution_input['max_parallel_jobs']}, + 'Strategy': execution_input['strategy'], + 'TrainingJobEarlyStoppingType': execution_input['early_stopping_type'] + }, + 'TrainingJobDefinition': { + 'AlgorithmSpecification': { + 'TrainingInputMode': 'File' + } + } + } + + # Build workflow definition + tuning_step = TuningStep('Tuning', tuner=tuner, job_name=execution_input['job_name'], + data=record_set_for_hyperparameter_tuning, parameters=parameters) + tuning_step.add_retry(SAGEMAKER_RETRY_STRATEGY) + workflow_graph = Chain([tuning_step]) + + with timeout(minutes=DEFAULT_TIMEOUT_MINUTES): + # Create workflow and check definition + workflow = create_workflow_and_check_definition( + workflow_graph=workflow_graph, + workflow_name=unique_name_from_base("integ-test-tuning-step-workflow"), + sfn_client=sfn_client, + sfn_role_arn=sfn_role_arn + ) + + job_name = generate_job_name() + + inputs = { + 'job_name': job_name, + 'objective_metric_name': 'test:msd', + 'objective_type': 'Minimize', + 'max_jobs': 2, + 'max_parallel_jobs': 2, + 'early_stopping_type': 'Off', + 'strategy': 'Bayesian', + } + + # Execute workflow + execution = workflow.execute(inputs=inputs) + execution_output = execution.get_output(wait=True) + + # Check workflow output + assert execution_output.get("HyperParameterTuningJobStatus") == "Completed" + + # Cleanup + state_machine_delete_wait(sfn_client, workflow.state_machine_arn) + def test_processing_step(sklearn_processor_fixture, sagemaker_session, sfn_client, sfn_role_arn): region = boto3.session.Session().region_name @@ -561,7 +649,6 @@ def test_processing_step(sklearn_processor_fixture, sagemaker_session, sfn_clien # Cleanup state_machine_delete_wait(sfn_client, workflow.state_machine_arn) - # End of Cleanup def test_processing_step_with_placeholders(sklearn_processor_fixture, sagemaker_session, sfn_client, sfn_role_arn, diff --git a/tests/unit/test_sagemaker_steps.py b/tests/unit/test_sagemaker_steps.py index 6200334..55d3a88 100644 --- a/tests/unit/test_sagemaker_steps.py +++ b/tests/unit/test_sagemaker_steps.py @@ -24,12 +24,13 @@ from sagemaker.debugger import Rule, rule_configs, DebuggerHookConfig, CollectionConfig from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput +from sagemaker.parameter import IntegerParameter, CategoricalParameter +from sagemaker.tuner import HyperparameterTuner from unittest.mock import MagicMock, patch from stepfunctions.inputs import ExecutionInput, StepInput -from stepfunctions.steps.fields import Field from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointStep, EndpointConfigStep,\ - ProcessingStep + ProcessingStep, TuningStep from stepfunctions.steps.sagemaker import tuning_config from tests.unit.utils import mock_boto_api_call @@ -1474,3 +1475,233 @@ def test_processing_step_creation_with_placeholders(sklearn_processor): 'Resource': 'arn:aws:states:::sagemaker:createProcessingJob.sync', 'End': True } + + +@patch('botocore.client.BaseClient._make_api_call', new=mock_boto_api_call) +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_tuning_step_creation_with_framework_estimator(tensorflow_estimator): + hyperparameter_ranges = { + "extra_center_factor": IntegerParameter(4, 10), + "epochs": IntegerParameter(1, 2), + "init_method": CategoricalParameter(["kmeans++", "random"]), + } + + tuner = HyperparameterTuner( + estimator=tensorflow_estimator, + objective_metric_name="test:msd", + hyperparameter_ranges=hyperparameter_ranges, + objective_type="Minimize", + max_jobs=2, + max_parallel_jobs=2, + ) + + step = TuningStep('Tuning', + tuner=tuner, + data={'train': 's3://sagemaker/train'}, + job_name='tensorflow-job', + tags=DEFAULT_TAGS + ) + + state_machine_definition = step.to_dict() + # The sagemaker_job_name is generated - expected name will be taken from the generated definition + generated_sagemaker_job_name = state_machine_definition['Parameters']['TrainingJobDefinition']\ + ['StaticHyperParameters']['sagemaker_job_name'] + expected_definition = { + 'Type': 'Task', + 'Parameters': { + 'HyperParameterTuningJobConfig': { + 'HyperParameterTuningJobObjective': { + 'MetricName': 'test:msd', + 'Type': 'Minimize' + }, + 'ParameterRanges': { + 'CategoricalParameterRanges': [ + { + 'Name': 'init_method', + 'Values': ['"kmeans++"', '"random"'] + }], + 'ContinuousParameterRanges': [], + 'IntegerParameterRanges': [ + { + 'MaxValue': '10', + 'MinValue': '4', + 'Name': 'extra_center_factor', + 'ScalingType': 'Auto' + }, + { + 'MaxValue': '2', + 'MinValue': '1', + 'Name': 'epochs', + 'ScalingType': 'Auto' + } + ] + }, + 'ResourceLimits': {'MaxNumberOfTrainingJobs': 2, + 'MaxParallelTrainingJobs': 2}, + 'Strategy': 'Bayesian', + 'TrainingJobEarlyStoppingType': 'Off' + }, + 'HyperParameterTuningJobName': 'tensorflow-job', + 'Tags': [{'Key': 'Purpose', 'Value': 'unittests'}], + 'TrainingJobDefinition': { + 'AlgorithmSpecification': { + 'TrainingImage': '520713654638.dkr.ecr.us-east-1.amazonaws.com/sagemaker-tensorflow:1.13-gpu-py2', + 'TrainingInputMode': 'File' + }, + 'InputDataConfig': [{'ChannelName': 'train', + 'DataSource': {'S3DataSource': { + 'S3DataDistributionType': 'FullyReplicated', + 'S3DataType': 'S3Prefix', + 'S3Uri': 's3://sagemaker/train'}}}], + 'OutputDataConfig': {'S3OutputPath': 's3://sagemaker/models'}, + 'ResourceConfig': {'InstanceCount': 1, + 'InstanceType': 'ml.p2.xlarge', + 'VolumeSizeInGB': 30}, + 'RoleArn': 'execution-role', + 'StaticHyperParameters': { + 'checkpoint_path': '"s3://sagemaker/models/sagemaker-tensorflow/checkpoints"', + 'evaluation_steps': '100', + 'sagemaker_container_log_level': '20', + 'sagemaker_estimator_class_name': '"TensorFlow"', + 'sagemaker_estimator_module': '"sagemaker.tensorflow.estimator"', + 'sagemaker_job_name': generated_sagemaker_job_name, + 'sagemaker_program': '"tf_train.py"', + 'sagemaker_region': '"us-east-1"', + 'sagemaker_submit_directory': '"s3://sagemaker/source"', + 'training_steps': '1000'}, + 'StoppingCondition': {'MaxRuntimeInSeconds': 86400}}}, + 'Resource': 'arn:aws:states:::sagemaker:createHyperParameterTuningJob.sync', + 'End': True + } + + assert state_machine_definition == expected_definition + + +@patch('botocore.client.BaseClient._make_api_call', new=mock_boto_api_call) +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_tuning_step_creation_with_placeholders(tensorflow_estimator): + execution_input = ExecutionInput(schema={ + 'data_input': str, + 'tags': list, + 'objective_metric_name': str, + 'hyperparameter_ranges': str, + 'objective_type': str, + 'max_jobs': int, + 'max_parallel_jobs': int, + 'early_stopping_type': str, + 'strategy': str, + }) + + step_input = StepInput(schema={ + 'job_name': str + }) + + hyperparameter_ranges = { + "extra_center_factor": IntegerParameter(4, 10), + "epochs": IntegerParameter(1, 2), + "init_method": CategoricalParameter(["kmeans++", "random"]), + } + + tuner = HyperparameterTuner( + estimator=tensorflow_estimator, + objective_metric_name="test:msd", + hyperparameter_ranges=hyperparameter_ranges, + objective_type="Minimize", + max_jobs=2, + max_parallel_jobs=2, + ) + + parameters = { + 'HyperParameterTuningJobConfig': { + 'HyperParameterTuningJobObjective': { + 'MetricName': execution_input['objective_metric_name'], + 'Type': execution_input['objective_type'] + }, + 'ResourceLimits': {'MaxNumberOfTrainingJobs': execution_input['max_jobs'], + 'MaxParallelTrainingJobs': execution_input['max_parallel_jobs']}, + 'Strategy': execution_input['strategy'], + 'TrainingJobEarlyStoppingType': execution_input['early_stopping_type'] + }, + 'TrainingJobDefinition': { + 'AlgorithmSpecification': { + 'TrainingInputMode': 'File' + }, + 'HyperParameterRanges': execution_input['hyperparameter_ranges'], + 'InputDataConfig': execution_input['data_input'] + } + } + + step = TuningStep('Tuning', + tuner=tuner, + data={'train': 's3://sagemaker/train'}, + job_name=step_input['job_name'], + tags=execution_input['tags'], + parameters=parameters + ) + + state_machine_definition = step.to_dict() + # The sagemaker_job_name is generated - expected name will be taken from the generated definition + generated_sagemaker_job_name = state_machine_definition['Parameters']['TrainingJobDefinition']['StaticHyperParameters']['sagemaker_job_name'] + expected_parameters = { + 'HyperParameterTuningJobConfig': { + 'HyperParameterTuningJobObjective': { + 'MetricName.$': "$$.Execution.Input['objective_metric_name']", + 'Type.$': "$$.Execution.Input['objective_type']" + }, + 'ParameterRanges': { + 'CategoricalParameterRanges': [ + { + 'Name': 'init_method', + 'Values': ['"kmeans++"', '"random"'] + }], + 'ContinuousParameterRanges': [], + 'IntegerParameterRanges': [ + { + 'MaxValue': '10', + 'MinValue': '4', + 'Name': 'extra_center_factor', + 'ScalingType': 'Auto' + }, + { + 'MaxValue': '2', + 'MinValue': '1', + 'Name': 'epochs', + 'ScalingType': 'Auto' + } + ] + }, + 'ResourceLimits': {'MaxNumberOfTrainingJobs.$': "$$.Execution.Input['max_jobs']", + 'MaxParallelTrainingJobs.$': "$$.Execution.Input['max_parallel_jobs']"}, + 'Strategy.$': "$$.Execution.Input['strategy']", + 'TrainingJobEarlyStoppingType.$': "$$.Execution.Input['early_stopping_type']" + }, + 'HyperParameterTuningJobName.$': "$['job_name']", + 'Tags.$': "$$.Execution.Input['tags']", + 'TrainingJobDefinition': { + 'AlgorithmSpecification': { + 'TrainingImage': '520713654638.dkr.ecr.us-east-1.amazonaws.com/sagemaker-tensorflow:1.13-gpu-py2', + 'TrainingInputMode': 'File' + }, + 'HyperParameterRanges.$': "$$.Execution.Input['hyperparameter_ranges']", + 'InputDataConfig.$': "$$.Execution.Input['data_input']", + 'OutputDataConfig': {'S3OutputPath': 's3://sagemaker/models'}, + 'ResourceConfig': {'InstanceCount': 1, + 'InstanceType': 'ml.p2.xlarge', + 'VolumeSizeInGB': 30}, + 'RoleArn': 'execution-role', + 'StaticHyperParameters': { + 'checkpoint_path': '"s3://sagemaker/models/sagemaker-tensorflow/checkpoints"', + 'evaluation_steps': '100', + 'sagemaker_container_log_level': '20', + 'sagemaker_estimator_class_name': '"TensorFlow"', + 'sagemaker_estimator_module': '"sagemaker.tensorflow.estimator"', + 'sagemaker_job_name': generated_sagemaker_job_name, + 'sagemaker_program': '"tf_train.py"', + 'sagemaker_region': '"us-east-1"', + 'sagemaker_submit_directory': '"s3://sagemaker/source"', + 'training_steps': '1000'}, + 'StoppingCondition': {'MaxRuntimeInSeconds': 86400} + } + } + + assert state_machine_definition['Parameters'] == expected_parameters