Skip to content

Commit

Permalink
Deprecate RedshiftDeleteClusterOperatorAsync, RedshiftPauseClusterOpe…
Browse files Browse the repository at this point in the history
…ratorAsync and RedshiftResumeClusterOperatorAsync operators (#1399)

* feat(amazon): deprecate redshift_cluster
* build(setup.cfg): pin apache-airflow-providers-amazon>=8.16.0
* feat(amazon): add deprecation warning to redshift_cluster triggerers and hooks
  • Loading branch information
Lee-W authored Jan 17, 2024
1 parent 4208201 commit 1fa825f
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 481 deletions.
25 changes: 18 additions & 7 deletions astronomer/providers/amazon/aws/hooks/redshift_cluster.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from __future__ import annotations

import asyncio
from typing import Any, Dict, Optional
import warnings
from typing import Any

import botocore.exceptions

Expand All @@ -10,11 +13,19 @@ class RedshiftHookAsync(AwsBaseHookAsync):
"""Interact with AWS Redshift using aiobotocore python library"""

def __init__(self, *args: Any, **kwargs: Any) -> None:
warnings.warn(
(
"This module is deprecated and will be removed in 2.0.0."
"Please use :class: `~airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook`."
),
DeprecationWarning,
stacklevel=2,
)
kwargs["client_type"] = "redshift"
kwargs["resource_type"] = "redshift"
super().__init__(*args, **kwargs)

async def cluster_status(self, cluster_identifier: str, delete_operation: bool = False) -> Dict[str, Any]:
async def cluster_status(self, cluster_identifier: str, delete_operation: bool = False) -> dict[str, Any]:
"""
Connects to the AWS redshift cluster via aiobotocore and get the status
and returns the status of the cluster based on the cluster_identifier passed
Expand All @@ -38,9 +49,9 @@ async def delete_cluster(
self,
cluster_identifier: str,
skip_final_cluster_snapshot: bool = True,
final_cluster_snapshot_identifier: Optional[str] = None,
final_cluster_snapshot_identifier: str | None = None,
polling_period_seconds: float = 5.0,
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""
Connects to the AWS redshift cluster via aiobotocore and
deletes the cluster based on the cluster_identifier passed
Expand Down Expand Up @@ -77,7 +88,7 @@ async def delete_cluster(

async def pause_cluster(
self, cluster_identifier: str, polling_period_seconds: float = 5.0
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""
Connects to the AWS redshift cluster via aiobotocore and
pause the cluster based on the cluster_identifier passed
Expand Down Expand Up @@ -106,7 +117,7 @@ async def resume_cluster(
self,
cluster_identifier: str,
polling_period_seconds: float = 5.0,
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""
Connects to the AWS redshift cluster via aiobotocore and
resume the cluster for the cluster_identifier passed
Expand Down Expand Up @@ -137,7 +148,7 @@ async def get_cluster_status(
expected_state: str,
flag: asyncio.Event,
delete_operation: bool = False,
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""
Make call self.cluster_status to know the status and run till the expected_state is met and set the flag
Expand Down
237 changes: 40 additions & 197 deletions astronomer/providers/amazon/aws/operators/redshift_cluster.py
Original file line number Diff line number Diff line change
@@ -1,221 +1,64 @@
from typing import Any, Optional
import warnings

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
from airflow.providers.amazon.aws.operators.redshift_cluster import (
RedshiftDeleteClusterOperator,
RedshiftPauseClusterOperator,
RedshiftResumeClusterOperator,
)

from astronomer.providers.amazon.aws.triggers.redshift_cluster import (
RedshiftClusterTrigger,
)
from astronomer.providers.utils.typing_compat import Context


class RedshiftDeleteClusterOperatorAsync(RedshiftDeleteClusterOperator):
"""
Delete an AWS Redshift Cluster if cluster status is in `available` state.
:param cluster_identifier: ID of the AWS Redshift Cluster
:param aws_conn_id: aws connection to use
:param skip_final_cluster_snapshot: determines cluster snapshot creation
:param final_cluster_snapshot_identifier: name of final cluster snapshot
:param polling_interval: polling period in seconds to check for the status
This class is deprecated.
Please use :class:`~airflow.providers.amazon.aws.operators.redshift_cluster.RedshiftDeleteClusterOperator`.
"""

def __init__(
self,
*,
skip_final_cluster_snapshot: bool = True,
final_cluster_snapshot_identifier: Optional[str] = None,
cluster_status_fetch_interval_seconds: int = 10,
aws_conn_id: str = "aws_default",
poll_interval: int = 5,
**kwargs: Any,
):
self.skip_final_cluster_snapshot = skip_final_cluster_snapshot
self.final_cluster_snapshot_identifier = final_cluster_snapshot_identifier
self.cluster_status_fetch_interval_seconds = cluster_status_fetch_interval_seconds
self.aws_conn_id = aws_conn_id
self.poll_interval = poll_interval
super().__init__(**kwargs)

def execute(self, context: Context) -> None:
"""
Logic that the operator uses to correctly identify which trigger to
execute, and defer execution as expected.
"""
redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
if cluster_state == "available":
self.defer(
timeout=self.execution_timeout,
trigger=RedshiftClusterTrigger(
task_id=self.task_id,
polling_period_seconds=self.poll_interval,
aws_conn_id=self.aws_conn_id,
cluster_identifier=self.cluster_identifier,
operation_type="delete_cluster",
skip_final_cluster_snapshot=self.skip_final_cluster_snapshot,
final_cluster_snapshot_identifier=self.final_cluster_snapshot_identifier,
),
method_name="execute_complete",
)
elif cluster_state == "cluster_not_found":
self.log.warning(
"Unable to delete cluster since cluster is not found. It may have already been deleted"
)
else:
raise AirflowException(
"Unable to delete cluster since cluster is currently in status: %s", cluster_state
)

def execute_complete(self, context: Context, event: Any = None) -> None:
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
"""
if event:
if "status" in event and event["status"] == "error":
msg = "{}: {}".format(event["status"], event["message"])
raise AirflowException(msg)
elif "status" in event and event["status"] == "success":
self.log.info("%s completed successfully.", self.task_id)
self.log.info("Deleted cluster successfully")
else:
raise AirflowException("Did not receive valid event from the trigerrer")
def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def]
warnings.warn(
(
"This module is deprecated. "
"Please use `airflow.providers.amazon.aws.operators.redshift_cluster.RedshiftDeleteClusterOperator` "
"and set deferrable to True instead."
),
DeprecationWarning,
stacklevel=2,
)
return super().__init__(*args, deferrable=True, **kwargs)


class RedshiftResumeClusterOperatorAsync(RedshiftResumeClusterOperator):
"""
Resume a paused AWS Redshift Cluster, and
Few points on the cluster creation to avoid this type of Exception
ex:- 'You can't Resume cluster redshift-cluster-1 because no recently available
backup was found. Create a manual snapshot or wait for an automated snapshot, then retry'
1.While creating the cluster make sure it is created in unique and snapshot is created (or)
2.If it is created with previously deleted cluster name make sure there is a snapshot in the cluster. (or)
3.Delete the cluster with snapshot created (it is not suggested because this snapshot storage is chargeable)
:param cluster_identifier: id of the AWS Redshift Cluster
:param aws_conn_id: aws connection to use
This class is deprecated.
Please use :class:`~airflow.providers.amazon.aws.operators.redshift_cluster.RedshiftResumeClusterOperator`.
"""

def __init__(
self,
*,
poll_interval: int = 5,
**kwargs: Any,
):
self.poll_interval = poll_interval
super().__init__(**kwargs)

def execute(self, context: Context) -> None:
"""
Logic that the operator uses to correctly identify which trigger to
execute, and defer execution as expected.
"""
redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
if cluster_state == "paused":
self.defer(
timeout=self.execution_timeout,
trigger=RedshiftClusterTrigger(
task_id=self.task_id,
polling_period_seconds=self.poll_interval,
aws_conn_id=self.aws_conn_id,
cluster_identifier=self.cluster_identifier,
operation_type="resume_cluster",
),
method_name="execute_complete",
)
else:
self.log.warning(
"Unable to resume cluster since cluster is currently in status: %s", cluster_state
)

def execute_complete(self, context: Context, event: Any = None) -> None:
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
"""
if event:
if "status" in event and event["status"] == "error":
msg = "{}: {}".format(event["status"], event["message"])
raise AirflowException(msg)
elif "status" in event and event["status"] == "success":
self.log.info("%s completed successfully.", self.task_id)
self.log.info("Resumed cluster successfully, now its in available state")
return None
else:
self.log.info("%s completed successfully.", self.task_id)
return None
def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def]
warnings.warn(
(
"This module is deprecated. "
"Please use `airflow.providers.apache.aws.operators.redshift_cluster.RedshiftResumeClusterOperator` "
"and set deferrable to True instead."
),
DeprecationWarning,
stacklevel=2,
)
return super().__init__(*args, deferrable=True, **kwargs)


class RedshiftPauseClusterOperatorAsync(RedshiftPauseClusterOperator):
"""
Pause an AWS Redshift Cluster if cluster status is in `available` state, and
Few points on the cluster creation to avoid this type of Exception
ex:- 'You can't pause cluster redshift-cluster-1 because no recently available
backup was found. Create a manual snapshot or wait for an automated snapshot, then retry'
1.While creating the cluster make sure it is created in unique and snapshot is created (or)
2.If it is created with previously deleted cluster name make sure there is a snapshot in the cluster. (or)
3.Delete the cluster with snapshot created (it is not suggested because this snapshot storage is chargeable)
:param cluster_identifier: id of the AWS Redshift Cluster
:param aws_conn_id: aws connection to use
This class is deprecated.
Please use :class:`~airflow.providers.amazon.aws.operators.redshift_cluster.RedshiftPauseClusterOperator`.
"""

def __init__(
self,
*,
poll_interval: int = 5,
**kwargs: Any,
):
self.poll_interval = poll_interval
super().__init__(**kwargs)

def execute(self, context: Context) -> None:
"""
Logic that the operator uses to correctly identify which trigger to
execute, and defer execution as expected.
"""
redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
if cluster_state == "available":
self.defer(
timeout=self.execution_timeout,
trigger=RedshiftClusterTrigger(
task_id=self.task_id,
polling_period_seconds=self.poll_interval,
aws_conn_id=self.aws_conn_id,
cluster_identifier=self.cluster_identifier,
operation_type="pause_cluster",
),
method_name="execute_complete",
)
else:
self.log.warning(
"Unable to pause cluster since cluster is currently in status: %s", cluster_state
)

def execute_complete(self, context: Context, event: Any = None) -> None:
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
"""
if event:
if "status" in event and event["status"] == "error":
msg = "{}: {}".format(event["status"], event["message"])
raise AirflowException(msg)
elif "status" in event and event["status"] == "success":
self.log.info("%s completed successfully.", self.task_id)
self.log.info("Paused cluster successfully")
return None
else:
self.log.info("%s completed successfully.", self.task_id)
return None
def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def]
warnings.warn(
(
"This module is deprecated. "
"Please use `airflow.providers.apache.aws.operators.redshift_cluster.RedshiftPauseClusterOperator` "
"and set deferrable to True instead."
),
DeprecationWarning,
stacklevel=2,
)
return super().__init__(*args, deferrable=True, **kwargs)
9 changes: 9 additions & 0 deletions astronomer/providers/amazon/aws/triggers/redshift_cluster.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import warnings
from typing import Any, AsyncIterator, Dict, Optional, Tuple

from airflow.triggers.base import BaseTrigger, TriggerEvent
Expand Down Expand Up @@ -30,6 +31,14 @@ def __init__(
skip_final_cluster_snapshot: bool = True,
final_cluster_snapshot_identifier: Optional[str] = None,
):
warnings.warn(
(
"This module is deprecated and will be removed in 2.0.0."
"Please use hooks in :module: `~airflow.providers.amazon.aws.triggers.redshift_cluster`."
),
DeprecationWarning,
stacklevel=2,
)
super().__init__()
self.task_id = task_id
self.polling_period_seconds = polling_period_seconds
Expand Down
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ zip_safe = false

[options.extras_require]
amazon =
apache-airflow-providers-amazon>=8.15.0
apache-airflow-providers-amazon>=8.16.0
aiobotocore>=2.1.1
apache.hive =
apache-airflow-providers-apache-hive>=6.1.5
Expand Down Expand Up @@ -119,7 +119,7 @@ mypy =
# All extras from above except 'mypy', 'docs' and 'tests'
all =
aiobotocore>=2.1.1
apache-airflow-providers-amazon>=8.15.0
apache-airflow-providers-amazon>=8.16.0
apache-airflow-providers-apache-hive>=6.1.5
apache-airflow-providers-apache-livy
apache-airflow-providers-cncf-kubernetes>=4
Expand Down
Loading

0 comments on commit 1fa825f

Please sign in to comment.