diff --git a/astronomer/providers/amazon/aws/hooks/redshift_cluster.py b/astronomer/providers/amazon/aws/hooks/redshift_cluster.py index 635612b49..b4e39db96 100644 --- a/astronomer/providers/amazon/aws/hooks/redshift_cluster.py +++ b/astronomer/providers/amazon/aws/hooks/redshift_cluster.py @@ -1,5 +1,8 @@ +from __future__ import annotations + import asyncio -from typing import Any, Dict, Optional +import warnings +from typing import Any import botocore.exceptions @@ -10,11 +13,19 @@ class RedshiftHookAsync(AwsBaseHookAsync): """Interact with AWS Redshift using aiobotocore python library""" def __init__(self, *args: Any, **kwargs: Any) -> None: + warnings.warn( + ( + "This module is deprecated and will be removed in 2.0.0." + "Please use :class: `~airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook`." + ), + DeprecationWarning, + stacklevel=2, + ) kwargs["client_type"] = "redshift" kwargs["resource_type"] = "redshift" super().__init__(*args, **kwargs) - async def cluster_status(self, cluster_identifier: str, delete_operation: bool = False) -> Dict[str, Any]: + async def cluster_status(self, cluster_identifier: str, delete_operation: bool = False) -> dict[str, Any]: """ Connects to the AWS redshift cluster via aiobotocore and get the status and returns the status of the cluster based on the cluster_identifier passed @@ -38,9 +49,9 @@ async def delete_cluster( self, cluster_identifier: str, skip_final_cluster_snapshot: bool = True, - final_cluster_snapshot_identifier: Optional[str] = None, + final_cluster_snapshot_identifier: str | None = None, polling_period_seconds: float = 5.0, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """ Connects to the AWS redshift cluster via aiobotocore and deletes the cluster based on the cluster_identifier passed @@ -77,7 +88,7 @@ async def delete_cluster( async def pause_cluster( self, cluster_identifier: str, polling_period_seconds: float = 5.0 - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """ Connects to the AWS redshift cluster via aiobotocore and pause the cluster based on the cluster_identifier passed @@ -106,7 +117,7 @@ async def resume_cluster( self, cluster_identifier: str, polling_period_seconds: float = 5.0, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """ Connects to the AWS redshift cluster via aiobotocore and resume the cluster for the cluster_identifier passed @@ -137,7 +148,7 @@ async def get_cluster_status( expected_state: str, flag: asyncio.Event, delete_operation: bool = False, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """ Make call self.cluster_status to know the status and run till the expected_state is met and set the flag diff --git a/astronomer/providers/amazon/aws/operators/redshift_cluster.py b/astronomer/providers/amazon/aws/operators/redshift_cluster.py index 80525746d..a7340db61 100644 --- a/astronomer/providers/amazon/aws/operators/redshift_cluster.py +++ b/astronomer/providers/amazon/aws/operators/redshift_cluster.py @@ -1,221 +1,64 @@ -from typing import Any, Optional +import warnings -from airflow.exceptions import AirflowException -from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook from airflow.providers.amazon.aws.operators.redshift_cluster import ( RedshiftDeleteClusterOperator, RedshiftPauseClusterOperator, RedshiftResumeClusterOperator, ) -from astronomer.providers.amazon.aws.triggers.redshift_cluster import ( - RedshiftClusterTrigger, -) -from astronomer.providers.utils.typing_compat import Context - class RedshiftDeleteClusterOperatorAsync(RedshiftDeleteClusterOperator): """ - Delete an AWS Redshift Cluster if cluster status is in `available` state. - - :param cluster_identifier: ID of the AWS Redshift Cluster - :param aws_conn_id: aws connection to use - :param skip_final_cluster_snapshot: determines cluster snapshot creation - :param final_cluster_snapshot_identifier: name of final cluster snapshot - :param polling_interval: polling period in seconds to check for the status + This class is deprecated. + Please use :class: `~airflow.providers.amazon.aws.operators.redshift_cluster.RedshiftDeleteClusterOperator`. """ - def __init__( - self, - *, - skip_final_cluster_snapshot: bool = True, - final_cluster_snapshot_identifier: Optional[str] = None, - cluster_status_fetch_interval_seconds: int = 10, - aws_conn_id: str = "aws_default", - poll_interval: int = 5, - **kwargs: Any, - ): - self.skip_final_cluster_snapshot = skip_final_cluster_snapshot - self.final_cluster_snapshot_identifier = final_cluster_snapshot_identifier - self.cluster_status_fetch_interval_seconds = cluster_status_fetch_interval_seconds - self.aws_conn_id = aws_conn_id - self.poll_interval = poll_interval - super().__init__(**kwargs) - - def execute(self, context: Context) -> None: - """ - Logic that the operator uses to correctly identify which trigger to - execute, and defer execution as expected. - """ - redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id) - cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier) - if cluster_state == "available": - self.defer( - timeout=self.execution_timeout, - trigger=RedshiftClusterTrigger( - task_id=self.task_id, - polling_period_seconds=self.poll_interval, - aws_conn_id=self.aws_conn_id, - cluster_identifier=self.cluster_identifier, - operation_type="delete_cluster", - skip_final_cluster_snapshot=self.skip_final_cluster_snapshot, - final_cluster_snapshot_identifier=self.final_cluster_snapshot_identifier, - ), - method_name="execute_complete", - ) - elif cluster_state == "cluster_not_found": - self.log.warning( - "Unable to delete cluster since cluster is not found. It may have already been deleted" - ) - else: - raise AirflowException( - "Unable to delete cluster since cluster is currently in status: %s", cluster_state - ) - - def execute_complete(self, context: Context, event: Any = None) -> None: - """ - Callback for when the trigger fires - returns immediately. - Relies on trigger to throw an exception, otherwise it assumes execution was - successful. - """ - if event: - if "status" in event and event["status"] == "error": - msg = "{}: {}".format(event["status"], event["message"]) - raise AirflowException(msg) - elif "status" in event and event["status"] == "success": - self.log.info("%s completed successfully.", self.task_id) - self.log.info("Deleted cluster successfully") - else: - raise AirflowException("Did not receive valid event from the trigerrer") + def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def] + warnings.warn( + ( + "This module is deprecated. " + "Please use `airflow.providers.amazon.aws.operators.redshift_cluster.RedshiftDeleteClusterOperator` " + "and set deferrable to True instead." + ), + DeprecationWarning, + stacklevel=2, + ) + return super().__init__(*args, deferrable=True, **kwargs) class RedshiftResumeClusterOperatorAsync(RedshiftResumeClusterOperator): """ - Resume a paused AWS Redshift Cluster, and - Few points on the cluster creation to avoid this type of Exception - ex:- 'You can't Resume cluster redshift-cluster-1 because no recently available - backup was found. Create a manual snapshot or wait for an automated snapshot, then retry' - 1.While creating the cluster make sure it is created in unique and snapshot is created (or) - 2.If it is created with previously deleted cluster name make sure there is a snapshot in the cluster. (or) - 3.Delete the cluster with snapshot created (it is not suggested because this snapshot storage is chargeable) - - :param cluster_identifier: id of the AWS Redshift Cluster - :param aws_conn_id: aws connection to use + This class is deprecated. + Please use :class: `~airflow.providers.amazon.aws.operators.redshift_cluster.RedshiftResumeClusterOperator`. """ - def __init__( - self, - *, - poll_interval: int = 5, - **kwargs: Any, - ): - self.poll_interval = poll_interval - super().__init__(**kwargs) - - def execute(self, context: Context) -> None: - """ - Logic that the operator uses to correctly identify which trigger to - execute, and defer execution as expected. - """ - redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id) - cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier) - if cluster_state == "paused": - self.defer( - timeout=self.execution_timeout, - trigger=RedshiftClusterTrigger( - task_id=self.task_id, - polling_period_seconds=self.poll_interval, - aws_conn_id=self.aws_conn_id, - cluster_identifier=self.cluster_identifier, - operation_type="resume_cluster", - ), - method_name="execute_complete", - ) - else: - self.log.warning( - "Unable to resume cluster since cluster is currently in status: %s", cluster_state - ) - - def execute_complete(self, context: Context, event: Any = None) -> None: - """ - Callback for when the trigger fires - returns immediately. - Relies on trigger to throw an exception, otherwise it assumes execution was - successful. - """ - if event: - if "status" in event and event["status"] == "error": - msg = "{}: {}".format(event["status"], event["message"]) - raise AirflowException(msg) - elif "status" in event and event["status"] == "success": - self.log.info("%s completed successfully.", self.task_id) - self.log.info("Resumed cluster successfully, now its in available state") - return None - else: - self.log.info("%s completed successfully.", self.task_id) - return None + def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def] + warnings.warn( + ( + "This module is deprecated. " + "Please use `airflow.providers.apache.aws.operators.redshift_cluster.RedshiftResumeClusterOperator` " + "and set deferrable to True instead." + ), + DeprecationWarning, + stacklevel=2, + ) + return super().__init__(*args, deferrable=True, **kwargs) class RedshiftPauseClusterOperatorAsync(RedshiftPauseClusterOperator): """ - Pause an AWS Redshift Cluster if cluster status is in `available` state, and - Few points on the cluster creation to avoid this type of Exception - ex:- 'You can't pause cluster redshift-cluster-1 because no recently available - backup was found. Create a manual snapshot or wait for an automated snapshot, then retry' - 1.While creating the cluster make sure it is created in unique and snapshot is created (or) - 2.If it is created with previously deleted cluster name make sure there is a snapshot in the cluster. (or) - 3.Delete the cluster with snapshot created (it is not suggested because this snapshot storage is chargeable) - - :param cluster_identifier: id of the AWS Redshift Cluster - :param aws_conn_id: aws connection to use + This class is deprecated. + Please use :class: `~airflow.providers.amazon.aws.operators.redshift_cluster.RedshiftPauseClusterOperator`. """ - def __init__( - self, - *, - poll_interval: int = 5, - **kwargs: Any, - ): - self.poll_interval = poll_interval - super().__init__(**kwargs) - - def execute(self, context: Context) -> None: - """ - Logic that the operator uses to correctly identify which trigger to - execute, and defer execution as expected. - """ - redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id) - cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier) - if cluster_state == "available": - self.defer( - timeout=self.execution_timeout, - trigger=RedshiftClusterTrigger( - task_id=self.task_id, - polling_period_seconds=self.poll_interval, - aws_conn_id=self.aws_conn_id, - cluster_identifier=self.cluster_identifier, - operation_type="pause_cluster", - ), - method_name="execute_complete", - ) - else: - self.log.warning( - "Unable to pause cluster since cluster is currently in status: %s", cluster_state - ) - - def execute_complete(self, context: Context, event: Any = None) -> None: - """ - Callback for when the trigger fires - returns immediately. - Relies on trigger to throw an exception, otherwise it assumes execution was - successful. - """ - if event: - if "status" in event and event["status"] == "error": - msg = "{}: {}".format(event["status"], event["message"]) - raise AirflowException(msg) - elif "status" in event and event["status"] == "success": - self.log.info("%s completed successfully.", self.task_id) - self.log.info("Paused cluster successfully") - return None - else: - self.log.info("%s completed successfully.", self.task_id) - return None + def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def] + warnings.warn( + ( + "This module is deprecated. " + "Please use `airflow.providers.apache.aws.operators.redshift_cluster.RedshiftPauseClusterOperator` " + "and set deferrable to True instead." + ), + DeprecationWarning, + stacklevel=2, + ) + return super().__init__(*args, deferrable=True, **kwargs) diff --git a/astronomer/providers/amazon/aws/triggers/redshift_cluster.py b/astronomer/providers/amazon/aws/triggers/redshift_cluster.py index e3a724a29..9b0f3e75c 100644 --- a/astronomer/providers/amazon/aws/triggers/redshift_cluster.py +++ b/astronomer/providers/amazon/aws/triggers/redshift_cluster.py @@ -1,4 +1,5 @@ import asyncio +import warnings from typing import Any, AsyncIterator, Dict, Optional, Tuple from airflow.triggers.base import BaseTrigger, TriggerEvent @@ -30,6 +31,14 @@ def __init__( skip_final_cluster_snapshot: bool = True, final_cluster_snapshot_identifier: Optional[str] = None, ): + warnings.warn( + ( + "This module is deprecated and will be removed in 2.0.0." + "Please use hooks in :module: `~airflow.providers.amazon.aws.triggers.redshift_cluster`." + ), + DeprecationWarning, + stacklevel=2, + ) super().__init__() self.task_id = task_id self.polling_period_seconds = polling_period_seconds diff --git a/setup.cfg b/setup.cfg index 6e7c84309..51da25922 100644 --- a/setup.cfg +++ b/setup.cfg @@ -45,7 +45,7 @@ zip_safe = false [options.extras_require] amazon = - apache-airflow-providers-amazon>=8.15.0 + apache-airflow-providers-amazon>=8.16.0 aiobotocore>=2.1.1 apache.hive = apache-airflow-providers-apache-hive>=6.1.5 @@ -119,7 +119,7 @@ mypy = # All extras from above except 'mypy', 'docs' and 'tests' all = aiobotocore>=2.1.1 - apache-airflow-providers-amazon>=8.15.0 + apache-airflow-providers-amazon>=8.16.0 apache-airflow-providers-apache-hive>=6.1.5 apache-airflow-providers-apache-livy apache-airflow-providers-cncf-kubernetes>=4 diff --git a/tests/amazon/aws/operators/test_redshift_cluster.py b/tests/amazon/aws/operators/test_redshift_cluster.py index ef7be9400..26f5ddaf5 100644 --- a/tests/amazon/aws/operators/test_redshift_cluster.py +++ b/tests/amazon/aws/operators/test_redshift_cluster.py @@ -1,299 +1,40 @@ -from unittest import mock - -import pytest -from airflow.exceptions import AirflowException, TaskDeferred +from airflow.providers.amazon.aws.operators.redshift_cluster import ( + RedshiftDeleteClusterOperator, + RedshiftPauseClusterOperator, + RedshiftResumeClusterOperator, +) from astronomer.providers.amazon.aws.operators.redshift_cluster import ( RedshiftDeleteClusterOperatorAsync, RedshiftPauseClusterOperatorAsync, RedshiftResumeClusterOperatorAsync, ) -from astronomer.providers.amazon.aws.triggers.redshift_cluster import ( - RedshiftClusterTrigger, -) class TestRedshiftDeleteClusterOperatorAsync: - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status") - @mock.patch("astronomer.providers.amazon.aws.hooks.redshift_cluster.RedshiftHookAsync.delete_cluster") - @mock.patch("astronomer.providers.amazon.aws.hooks.redshift_cluster.RedshiftHookAsync.get_client_async") - def test_delete_cluster( - self, mock_async_client, mock_async_delete_cluster, mock_sync_cluster_status, context - ): - """Test Delete cluster operator with available cluster state and check the trigger instance""" - mock_sync_cluster_status.return_value = "available" - mock_async_client.return_value.delete_cluster.return_value = { - "Cluster": {"ClusterIdentifier": "test_cluster", "ClusterStatus": "deleting"} - } - mock_async_delete_cluster.return_value = {"status": "success", "cluster_state": "cluster_not_found"} - - redshift_operator = RedshiftDeleteClusterOperatorAsync( - task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" - ) - - with pytest.raises(TaskDeferred) as exc: - redshift_operator.execute(context) - - assert isinstance( - exc.value.trigger, RedshiftClusterTrigger - ), "Trigger is not a RedshiftClusterTrigger" - - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status") - @mock.patch("astronomer.providers.amazon.aws.hooks.redshift_cluster.RedshiftHookAsync.delete_cluster") - @mock.patch("astronomer.providers.amazon.aws.hooks.redshift_cluster.RedshiftHookAsync.get_client_async") - def test_delete_cluster_failure( - self, mock_async_client, mock_async_delete_cluster, mock_sync_cluster_status - ): - """Test Delete cluster operator with available cluster state in failure test case""" - mock_sync_cluster_status.return_value = "available" - mock_async_client.return_value.delete_cluster.return_value = { - "Cluster": {"ClusterIdentifier": "test_cluster", "ClusterStatus": "deleting"} - } - mock_async_delete_cluster.return_value = {"status": "success", "cluster_state": "cluster_not_found"} - - redshift_operator = RedshiftDeleteClusterOperatorAsync( - task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" - ) - - with pytest.raises(AirflowException): - redshift_operator.execute_complete( - context=None, event={"status": "error", "message": "test failure message"} - ) - - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status") - @mock.patch("astronomer.providers.amazon.aws.hooks.redshift_cluster.RedshiftHookAsync.delete_cluster") - @mock.patch("astronomer.providers.amazon.aws.hooks.redshift_cluster.RedshiftHookAsync.get_client_async") - def test_delete_cluster_execute_complete( - self, mock_async_client, mock_async_delete_cluster, mock_sync_cluster_status - ): - """ - Test Delete cluster operator execute_complete with available cluster state and - return state as cluster_not_found. - """ - mock_sync_cluster_status.return_value = "available" - mock_async_client.return_value.delete_cluster.return_value = { - "Cluster": {"ClusterIdentifier": "test_cluster", "ClusterStatus": "deleting"} - } - mock_async_delete_cluster.return_value = {"status": "success", "cluster_state": "cluster_not_found"} - - redshift_operator = RedshiftDeleteClusterOperatorAsync( - task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" - ) - - with mock.patch.object(redshift_operator.log, "info") as mock_log_info: - redshift_operator.execute_complete( - context=None, event={"status": "success", "cluster_state": "cluster_not_found"} - ) - mock_log_info.assert_called_with("Deleted cluster successfully") - - def test_delete_cluster_execute_complete_invalid_trigger_event(self): - """Asserts that exception is raised when invalid event is received from triggerer""" + def test_init(self): task = RedshiftDeleteClusterOperatorAsync( task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" ) - with pytest.raises(AirflowException) as exception_info: - task.execute_complete(context=None, event=None) + assert isinstance(task, RedshiftDeleteClusterOperator) + assert task.deferrable is True - assert exception_info.value.args[0] == "Did not receive valid event from the trigerrer" - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status") - def test_delete_cluster_execute_warning(self, mock_sync_cluster_status): - """Test Pause cluster operator execute method with warnings message""" - mock_sync_cluster_status.return_value = "cluster_not_found" - redshift_operator = RedshiftDeleteClusterOperatorAsync( +class TestRedshiftPauseClusterOperatorAsync: + def test_init(self): + task = RedshiftPauseClusterOperatorAsync( task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" ) - with mock.patch.object(redshift_operator.log, "warning") as mock_log_warning: - redshift_operator.execute(context=None) - mock_log_warning.assert_called_with( - "Unable to delete cluster since cluster is not found. It may have already been deleted" - ) + assert isinstance(task, RedshiftPauseClusterOperator) + assert task.deferrable is True class TestRedshiftResumeClusterOperatorAsync: - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status") - @mock.patch("astronomer.providers.amazon.aws.hooks.redshift_cluster.RedshiftHookAsync.resume_cluster") - @mock.patch("astronomer.providers.amazon.aws.hooks.redshift_cluster.RedshiftHookAsync.get_client_async") - def test_resume_cluster( - self, mock_async_client, mock_async_resume_cluster, mock_sync_cluster_statue, context - ): - """Test Resume cluster operator run""" - mock_sync_cluster_statue.return_value = "paused" - mock_async_client.return_value.resume_cluster.return_value = { - "Cluster": {"ClusterIdentifier": "test_cluster", "ClusterStatus": "resuming"} - } - mock_async_resume_cluster.return_value = {"status": "success", "cluster_state": "available"} - - redshift_operator = RedshiftResumeClusterOperatorAsync( - task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" - ) - - with pytest.raises(TaskDeferred) as exc: - redshift_operator.execute(context) - - assert isinstance( - exc.value.trigger, RedshiftClusterTrigger - ), "Trigger is not a RedshiftClusterTrigger" - - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status") - @mock.patch("astronomer.providers.amazon.aws.hooks.redshift_cluster.RedshiftHookAsync.resume_cluster") - @mock.patch("astronomer.providers.amazon.aws.hooks.redshift_cluster.RedshiftHookAsync.get_client_async") - def test_resume_cluster_failure( - self, mock_async_client, mock_async_resume_cluster, mock_sync_cluster_statue - ): - """Test Resume cluster operator Failure""" - mock_sync_cluster_statue.return_value = "paused" - mock_async_client.return_value.resume_cluster.return_value = { - "Cluster": {"ClusterIdentifier": "test_cluster", "ClusterStatus": "resuming"} - } - mock_async_resume_cluster.return_value = {"status": "success", "cluster_state": "available"} - - redshift_operator = RedshiftResumeClusterOperatorAsync( - task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" - ) - - with pytest.raises(AirflowException): - redshift_operator.execute_complete( - context=None, event={"status": "error", "message": "test failure message"} - ) - - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status") - @mock.patch("astronomer.providers.amazon.aws.hooks.redshift_cluster.RedshiftHookAsync.resume_cluster") - @mock.patch("astronomer.providers.amazon.aws.hooks.redshift_cluster.RedshiftHookAsync.get_client_async") - def test_resume_cluster_execute_complete( - self, mock_async_client, mock_async_resume_cluster, mock_sync_cluster_statue - ): - """Test Resume cluster operator execute_complete with proper return value""" - mock_sync_cluster_statue.return_value = "paused" - mock_async_client.return_value.resume_cluster.return_value = { - "Cluster": {"ClusterIdentifier": "test_cluster", "ClusterStatus": "resuming"} - } - mock_async_resume_cluster.return_value = {"status": "success", "cluster_state": "available"} - - redshift_operator = RedshiftResumeClusterOperatorAsync( - task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" - ) - - with mock.patch.object(redshift_operator.log, "info") as mock_log_info: - redshift_operator.execute_complete( - context=None, event={"status": "success", "cluster_state": "available"} - ) - mock_log_info.assert_called_with("Resumed cluster successfully, now its in available state") - - def test_resume_cluster_execute_complete_with_event_none(self): - """Asserts that logging occurs as expected""" + def test_init(self): task = RedshiftResumeClusterOperatorAsync( task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" ) - with mock.patch.object(task.log, "info") as mock_log_info: - task.execute_complete(context=None, event=None) - mock_log_info.assert_called_with("%s completed successfully.", "task_test") - - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status") - def test_resume_cluster_execute_warning(self, mock_sync_cluster_statue): - """Test Pause resume operator execute method with warnings message""" - mock_sync_cluster_statue.return_value = "available" - redshift_operator = RedshiftResumeClusterOperatorAsync( - task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" - ) - - with mock.patch.object(redshift_operator.log, "warning") as mock_log_warning: - redshift_operator.execute(context=None) - mock_log_warning.assert_called_with( - "Unable to resume cluster since cluster is currently in status: %s", "available" - ) - - -class TestRedshiftPauseClusterOperatorAsync: - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status") - @mock.patch("astronomer.providers.amazon.aws.hooks.redshift_cluster.RedshiftHookAsync.pause_cluster") - @mock.patch("astronomer.providers.amazon.aws.hooks.redshift_cluster.RedshiftHookAsync.get_client_async") - def test_pause_cluster( - self, mock_async_client, mock_async_resume_cluster, mock_sync_cluster_statue, context - ): - """Test Pause cluster operator with available cluster state and check the trigger instance""" - mock_sync_cluster_statue.return_value = "available" - mock_async_client.return_value.pause_cluster.return_value = { - "Cluster": {"ClusterIdentifier": "test_cluster", "ClusterStatus": "pausing"} - } - mock_async_resume_cluster.return_value = {"status": "success", "cluster_state": "paused"} - - redshift_operator = RedshiftPauseClusterOperatorAsync( - task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" - ) - - with pytest.raises(TaskDeferred) as exc: - redshift_operator.execute(context) - - assert isinstance( - exc.value.trigger, RedshiftClusterTrigger - ), "Trigger is not a RedshiftClusterTrigger" - - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status") - @mock.patch("astronomer.providers.amazon.aws.hooks.redshift_cluster.RedshiftHookAsync.pause_cluster") - @mock.patch("astronomer.providers.amazon.aws.hooks.redshift_cluster.RedshiftHookAsync.get_client_async") - def test_pause_cluster_failure( - self, mock_async_client, mock_async_resume_cluster, mock_sync_cluster_statue - ): - """Test Pause cluster operator with available cluster state in failure test case""" - mock_sync_cluster_statue.return_value = "available" - mock_async_client.return_value.pause_cluster.return_value = { - "Cluster": {"ClusterIdentifier": "test_cluster", "ClusterStatus": "pausing"} - } - mock_async_resume_cluster.return_value = {"status": "success", "cluster_state": "paused"} - - redshift_operator = RedshiftPauseClusterOperatorAsync( - task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" - ) - - with pytest.raises(AirflowException): - redshift_operator.execute_complete( - context=None, event={"status": "error", "message": "test failure message"} - ) - - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status") - @mock.patch("astronomer.providers.amazon.aws.hooks.redshift_cluster.RedshiftHookAsync.pause_cluster") - @mock.patch("astronomer.providers.amazon.aws.hooks.redshift_cluster.RedshiftHookAsync.get_client_async") - def test_pause_cluster_execute_complete( - self, mock_async_client, mock_async_resume_cluster, mock_sync_cluster_statue - ): - """Test Pause cluster operator execute_complete with available cluster state and return state as paused""" - mock_sync_cluster_statue.return_value = "available" - mock_async_client.return_value.pause_cluster.return_value = { - "Cluster": {"ClusterIdentifier": "test_cluster", "ClusterStatus": "pausing"} - } - mock_async_resume_cluster.return_value = {"status": "success", "cluster_state": "paused"} - - redshift_operator = RedshiftPauseClusterOperatorAsync( - task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" - ) - - with mock.patch.object(redshift_operator.log, "info") as mock_log_info: - redshift_operator.execute_complete( - context=None, event={"status": "success", "cluster_state": "paused"} - ) - mock_log_info.assert_called_with("Paused cluster successfully") - def test_pause_cluster_execute_complete_none(self): - """Asserts that logging occurs as expected""" - task = RedshiftPauseClusterOperatorAsync( - task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" - ) - with mock.patch.object(task.log, "info") as mock_log_info: - task.execute_complete(context=None, event=None) - mock_log_info.assert_called_with("%s completed successfully.", "task_test") - - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status") - def test_pause_cluster_execute_warning(self, mock_sync_cluster_statue): - """Test Pause cluster operator execute method with warnings message""" - mock_sync_cluster_statue.return_value = "paused" - redshift_operator = RedshiftPauseClusterOperatorAsync( - task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" - ) - - with mock.patch.object(redshift_operator.log, "warning") as mock_log_warning: - redshift_operator.execute(context=None) - mock_log_warning.assert_called_with( - "Unable to pause cluster since cluster is currently in status: %s", "paused" - ) + assert isinstance(task, RedshiftResumeClusterOperator) + assert task.deferrable is True