From 452113175daeb998ce8b509d16b595103709e153 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 20 Dec 2023 19:43:29 +0800 Subject: [PATCH] feat(amazon): deprecate S3KeysUnchangedSensorAsync --- astronomer/providers/amazon/aws/sensors/s3.py | 80 ++++--------------- tests/amazon/aws/sensors/test_s3_sensors.py | 79 ++---------------- 2 files changed, 19 insertions(+), 140 deletions(-) diff --git a/astronomer/providers/amazon/aws/sensors/s3.py b/astronomer/providers/amazon/aws/sensors/s3.py index 94ec86cc8..5c47ad104 100644 --- a/astronomer/providers/amazon/aws/sensors/s3.py +++ b/astronomer/providers/amazon/aws/sensors/s3.py @@ -9,10 +9,9 @@ from airflow.sensors.base import BaseSensorOperator from astronomer.providers.amazon.aws.triggers.s3 import ( - S3KeysUnchangedTrigger, S3KeyTrigger, ) -from astronomer.providers.utils.sensor_util import poke, raise_error_or_skip_exception +from astronomer.providers.utils.sensor_util import raise_error_or_skip_exception from astronomer.providers.utils.typing_compat import Context @@ -155,72 +154,21 @@ def __init__( class S3KeysUnchangedSensorAsync(S3KeysUnchangedSensor): """ - Checks for changes in the number of objects at prefix in AWS S3 - bucket and returns True if the inactivity period has passed with no - increase in the number of objects. Note, this sensor will not behave correctly - in reschedule mode, as the state of the listed objects in the S3 bucket will - be lost between rescheduled invocations. - - :param bucket_name: Name of the S3 bucket - :param prefix: The prefix being waited on. Relative path from bucket root level. - :param aws_conn_id: a reference to the s3 connection - :param verify: Whether or not to verify SSL certificates for S3 connection. - By default SSL certificates are verified. - You can provide the following values: - - - ``False``: do not validate SSL certificates. SSL will still be used - (unless use_ssl is False), but SSL certificates will not be - verified. - - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to uses. - You can specify this argument if you want to use a different - CA cert bundle than the one used by botocore. - :param inactivity_period: The total seconds of inactivity to designate - keys unchanged. Note, this mechanism is not real time and - this operator may not return until a poke_interval after this period - has passed with no additional objects sensed. - :param min_objects: The minimum number of objects needed for keys unchanged - sensor to be considered valid. - :param previous_objects: The set of object ids found during the last poke. - :param allow_delete: Should this sensor consider objects being deleted - between pokes valid behavior. If true a warning message will be logged - when this happens. If false an error will be raised. + This class is deprecated. + Please use :class: `~airflow.providers.amazon.aws.sensors.s3.S3KeysUnchangedSensor`. """ - def __init__( - self, - **kwargs: Any, - ) -> None: - super().__init__(**kwargs) - - def execute(self, context: Context) -> None: - """Defers Trigger class to check for changes in the number of objects at prefix in AWS S3""" - if not poke(self, context): - self.defer( - timeout=timedelta(seconds=self.timeout), - trigger=S3KeysUnchangedTrigger( - bucket_name=self.bucket_name, - prefix=self.prefix, - inactivity_period=self.inactivity_period, - min_objects=self.min_objects, - previous_objects=self.previous_objects, - inactivity_seconds=self.inactivity_seconds, - allow_delete=self.allow_delete, - aws_conn_id=self.aws_conn_id, - verify=self.verify, - last_activity_time=self.last_activity_time, - ), - method_name="execute_complete", - ) - - def execute_complete(self, context: Context, event: Any = None) -> None: - """ - Callback for when the trigger fires - returns immediately. - Relies on trigger to throw an exception, otherwise it assumes execution was - successful. - """ - if event["status"] == "error": - raise_error_or_skip_exception(self.soft_fail, event["message"]) - return None + def __init__(self, *args, **kwargs): + warnings.warn( + ( + "This module is deprecated. " + "Please use `airflow.providers.amazon.aws.sensors.s3.S3KeysUnchangedSensor` " + "and set deferrable to True instead." + ), + DeprecationWarning, + stacklevel=2, + ) + return super().__init__(*args, deferrable=True, **kwargs) class S3PrefixSensorAsync(BaseSensorOperator): diff --git a/tests/amazon/aws/sensors/test_s3_sensors.py b/tests/amazon/aws/sensors/test_s3_sensors.py index 8ba3047ad..aabc7f67e 100644 --- a/tests/amazon/aws/sensors/test_s3_sensors.py +++ b/tests/amazon/aws/sensors/test_s3_sensors.py @@ -7,6 +7,7 @@ from airflow.exceptions import AirflowException, AirflowSkipException, TaskDeferred from airflow.models import DAG, DagRun, TaskInstance from airflow.models.variable import Variable +from airflow.providers.amazon.aws.sensors.s3 import S3KeysUnchangedSensor from airflow.utils import timezone from parameterized import parameterized @@ -17,7 +18,6 @@ S3PrefixSensorAsync, ) from astronomer.providers.amazon.aws.triggers.s3 import ( - S3KeysUnchangedTrigger, S3KeyTrigger, ) @@ -293,81 +293,12 @@ def test_soft_fail_enable(self, context): class TestS3KeysUnchangedSensorAsync: - @mock.patch(f"{MODULE}.S3KeysUnchangedSensorAsync.defer") - @mock.patch(f"{MODULE}.S3KeysUnchangedSensorAsync.poke", return_value=True) - def test_s3_keys_unchanged_sensor_async_finish_before_deferred(self, mock_poke, mock_defer, context): - """Assert task is not deferred when it receives a finish status before deferring""" - S3KeysUnchangedSensorAsync( - task_id="s3_keys_unchanged_sensor", bucket_name="test_bucket", prefix="test" - ) - assert not mock_defer.called - - @mock.patch(f"{MODULE}.S3KeysUnchangedSensorAsync.poke", return_value=False) - @mock.patch("airflow.providers.amazon.aws.sensors.s3.S3Hook") - def test_s3_keys_unchanged_sensor_check_trigger_instance(self, mock_hook, mock_poke, context): - """ - Asserts that a task is deferred and an S3KeysUnchangedTrigger will be fired - when the S3KeysUnchangedSensorAsync is executed. - """ - mock_hook.check_for_key.return_value = False - - sensor = S3KeysUnchangedSensorAsync( + def test_init(self): + task = S3KeysUnchangedSensorAsync( task_id="s3_keys_unchanged_sensor", bucket_name="test_bucket", prefix="test" ) - - with pytest.raises(TaskDeferred) as exc: - sensor.execute(context) - - assert isinstance( - exc.value.trigger, S3KeysUnchangedTrigger - ), "Trigger is not a S3KeysUnchangedTrigger" - - @parameterized.expand([["bucket", "test"]]) - @mock.patch(f"{MODULE}.S3KeysUnchangedSensorAsync.poke", return_value=False) - @mock.patch("airflow.providers.amazon.aws.sensors.s3.S3Hook") - def test_s3_keys_unchanged_sensor_execute_complete_success(self, bucket, prefix, mock_hook, mock_poke): - """ - Asserts that a task completed with success status - """ - mock_hook.check_for_key.return_value = False - - sensor = S3KeysUnchangedSensorAsync( - task_id="s3_keys_unchanged_sensor", - bucket_name=bucket, - prefix=prefix, - ) - assert sensor.execute_complete(context={}, event={"status": "success"}) is None - - @parameterized.expand([["bucket", "test"]]) - @mock.patch(f"{MODULE}.S3KeysUnchangedSensorAsync.poke", return_value=False) - @mock.patch("airflow.providers.amazon.aws.sensors.s3.S3Hook") - def test_s3_keys_unchanged_sensor_execute_complete_error(self, bucket, prefix, mock_hook, mock_poke): - """ - Asserts that a task is completed with error. - """ - mock_hook.check_for_key.return_value = False - - sensor = S3KeysUnchangedSensorAsync( - task_id="s3_keys_unchanged_sensor", - bucket_name=bucket, - prefix=prefix, - ) - with pytest.raises(AirflowException): - sensor.execute_complete(context={}, event={"status": "error", "message": "Mocked error"}) - - @mock.patch(f"{MODULE}.S3KeysUnchangedSensorAsync.poke", return_value=False) - def test_s3_keys_unchanged_sensor_raise_value_error(self, mock_poke): - """ - Test if the S3KeysUnchangedTrigger raises Value error for negative inactivity_period. - """ - with pytest.raises(ValueError): - S3KeysUnchangedSensorAsync( - task_id="s3_keys_unchanged_sensor", - bucket_name="test_bucket", - prefix="test", - inactivity_period=-100, - ) - + assert isinstance(task, S3KeysUnchangedSensor) + assert task.deferrable is True class TestS3KeySizeSensorAsync(unittest.TestCase): def test_deprecation_warnings_generated(self):