diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index d43020e..8f4992b 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -550,10 +550,14 @@ class GlueDataBrewStartJobRunStep(Task): Creates a Task state that starts a DataBrew job. See `Manage AWS Glue DataBrew Jobs with Step Functions `_ for more details. """ - def __init__(self, state_id, wait_for_completion=True, **kwargs): + def __init__(self, state_id, integration_pattern=IntegrationPattern.WaitForCompletion, **kwargs): """ Args: state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + integration_pattern (stepfunctions.steps.integration_resources.IntegrationPattern, optional): Service integration pattern used to call the integrated service. Supported integration patterns (default: WaitForCompletion): + + * WaitForCompletion: Wait for the Databrew job to complete before going to the next state. (See `Run A Job `_ for more details.) + * CallAndContinue: Call StartJobRun and progress to the next state (See `Request Response `_ for more details.) comment (str, optional): Human-readable comment or description. (default: None) timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60) timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. @@ -563,23 +567,18 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') - wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ - if wait_for_completion: - """ - Example resource arn: arn:aws:states:::databrew:startJobRun.sync - """ - - kwargs[Field.Resource.value] = get_service_integration_arn(GLUE_DATABREW_SERVICE_NAME, - GlueDataBrewApi.StartJobRun, - IntegrationPattern.WaitForCompletion) - else: - """ - Example resource arn: arn:aws:states:::databrew:startJobRun - """ + supported_integ_patterns = [IntegrationPattern.WaitForCompletion, IntegrationPattern.CallAndContinue] - kwargs[Field.Resource.value] = get_service_integration_arn(GLUE_DATABREW_SERVICE_NAME, - GlueDataBrewApi.StartJobRun) + is_integration_pattern_valid(integration_pattern, supported_integ_patterns) + kwargs[Field.Resource.value] = get_service_integration_arn(GLUE_DATABREW_SERVICE_NAME, + GlueDataBrewApi.StartJobRun, + integration_pattern) + """ + Example resource arns: + - CallAndContinue: arn: arn:aws:states:::databrew:startJobRun + - WaitForCompletion: arn: arn:aws:states:::databrew:startJobRun.sync + """ super(GlueDataBrewStartJobRunStep, self).__init__(state_id, **kwargs) diff --git a/tests/unit/test_service_steps.py b/tests/unit/test_service_steps.py index 961221e..a2e5a38 100644 --- a/tests/unit/test_service_steps.py +++ b/tests/unit/test_service_steps.py @@ -677,8 +677,8 @@ def test_emr_modify_instance_group_by_name_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') -def test_databrew_start_job_run_step_creation_sync(): - step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run - Sync', parameters={ +def test_databrew_start_job_run_step_creation_default(): + step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run - default', parameters={ "Name": "MyWorkflowJobRun" }) @@ -693,10 +693,30 @@ def test_databrew_start_job_run_step_creation_sync(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') -def test_databrew_start_job_run_step_creation(): - step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run', wait_for_completion=False, parameters={ - "Name": "MyWorkflowJobRun" - }) +def test_databrew_start_job_run_step_creation_wait_for_completion(): + step = GlueDataBrewStartJobRunStep( + 'Start Glue DataBrew Job Run - WaitForCompletion', integration_pattern=IntegrationPattern.WaitForCompletion, + parameters={ + "Name": "MyWorkflowJobRun" + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::databrew:startJobRun.sync', + 'Parameters': { + 'Name': 'MyWorkflowJobRun' + }, + 'End': True + } + + +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_databrew_start_job_run_step_creation_call_and_continue(): + step = GlueDataBrewStartJobRunStep( + 'Start Glue DataBrew Job Run - CallAndContinue', + integration_pattern=IntegrationPattern.CallAndContinue, parameters={ + "Name": "MyWorkflowJobRun" + }) assert step.to_dict() == { 'Type': 'Task', @@ -708,6 +728,16 @@ def test_databrew_start_job_run_step_creation(): } +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_databrew_start_job_run_step_creation_wait_for_task_token_raises_error(): + error_message = re.escape(f"Integration Pattern ({IntegrationPattern.WaitForTaskToken.name}) is not supported for this step - " + f"Please use one of the following: " + f"{[IntegrationPattern.WaitForCompletion.name, IntegrationPattern.CallAndContinue.name]}") + with pytest.raises(ValueError, match=error_message): + GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run - WaitForTaskToken', + integration_pattern=IntegrationPattern.WaitForTaskToken) + + @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_create_cluster_step_creation_call_and_continue(): step = EksCreateClusterStep("Create Eks cluster - CallAndContinue", integration_pattern=IntegrationPattern.CallAndContinue,