diff --git a/airflow_plugins/operators/sensors/file_sensor.py b/airflow_plugins/operators/sensors/file_sensor.py
index 98ea4f8..cf25b74 100644
--- a/airflow_plugins/operators/sensors/file_sensor.py
+++ b/airflow_plugins/operators/sensors/file_sensor.py
@@ -3,6 +3,7 @@
 import time
 from datetime import datetime, timedelta
 
+import pytz
 from airflow.exceptions import (
     AirflowException,
     AirflowSensorTimeout,
@@ -12,6 +13,7 @@
 from airflow.operators.sensors import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 from pytz import timezone
+from pytz.exceptions import UnknownTimeZoneError
 
 from airflow_plugins.hooks import FTPHook
 from airflow_plugins.operators import FileOperator
@@ -66,7 +68,14 @@ def _send_notification(self, context, success=False):
             send_notification(ti.get_dagrun(), text, title, color)
             return
 
-        runtime = datetime.now() - ti.start_date
+        try:
+            tz = timezone(ti.start_date.tm_zone)
+        except (AttributeError, UnknownTimeZoneError):  # tm_zone not set on t
+            runtime = datetime.now() - ti.start_date
+        else:
+            runtime = datetime.utcnow().replace(
+                tzinfo=ti.tm_zone) - ti.start_date
+
         if runtime >= self.notify_after:
             if (self.last_notification is None or
                     runtime >= self.last_notification + self.notify_delta):
@@ -149,6 +158,7 @@ def execute(self, context):
             # notify about success in case of previous warnings
             self._send_notification(context, success=True)
         logging.info('Success criteria met. Exiting.')
+        self._send_notification(context, success=False)
         return poke_result
 
     def poke(self, context):
@@ -196,6 +206,10 @@ def poke(self, context):
 
         def get_last_modified(fileobj):
             timestamp = fileobj.last_modified
+
+            if isinstance(timestamp, datetime):
+                return timestamp
+
             tformat = '%a, %d %b %Y %H:%M:%S %Z'
             dt = datetime.strptime(timestamp, tformat)
             t = time.strptime(timestamp, tformat)
diff --git a/tests/operators/sensors/test_file_sensor.py b/tests/operators/sensors/test_file_sensor.py
index d252ecf..3d6899c 100644
--- a/tests/operators/sensors/test_file_sensor.py
+++ b/tests/operators/sensors/test_file_sensor.py
@@ -1,10 +1,10 @@
 from datetime import datetime, timedelta
 from time import sleep
 
-import boto
+import boto3
 import pytest
 from airflow.exceptions import AirflowException
-from boto.s3.key import Key
+# from boto3.s3.key import Key
 from mock import Mock
 from moto import mock_s3
 
@@ -46,13 +46,14 @@ def test_files_sensor_fail_on_unsupported_connection():
 
 @mock_s3
 def test_files_on_s3():
-    conn = boto.connect_s3()
-    bucket = conn.create_bucket('storiesbi-datapipeline')
+    conn = boto3.resource('s3')
+    conn.create_bucket(Bucket='storiesbi-datapipeline')
+
     get_or_update_conn("s3.stories.bi", conn_type="s3")
 
     file_sensor = FileSensor(
         task_id="check_new_file",
-        path="foo",
+        path="ss://storiesbi-datapipeline/foo",
         conn_id="s3.stories.bi",
         modified="anytime"
     )
@@ -61,27 +62,22 @@ def test_files_on_s3():
 
     assert not file_sensor.poke(ctx)
 
-    k = Key(bucket)
-    k.key = "foo"
-    k.set_contents_from_string("bar")
+    conn.Object('storiesbi-datapipeline', 'foo').put(Body="bar")
 
     assert file_sensor.poke(ctx)
 
 
 @mock_s3
 def test_files_on_s3_modified_after():
-    conn = boto.connect_s3()
-    bucket = conn.create_bucket('storiesbi-datapipeline')
-
-    k = Key(bucket)
-    k.key = "foo"
-    k.set_contents_from_string("bar")
+    conn = boto3.resource('s3')
+    conn.create_bucket(Bucket='storiesbi-datapipeline')
+    conn.Object('storiesbi-datapipeline', 'foo').put(Body="bar")
 
     get_or_update_conn("s3.stories.bi", conn_type="s3")
 
     file_sensor = FileSensor(
         task_id="check_new_file",
-        path="foo",
+        path="s3://storiesbi-datapipeline/foo",
         conn_id="s3.stories.bi",
         modified=datetime.now()
     )
@@ -92,19 +88,16 @@ def test_files_on_s3_modified_after():
 
     # Hacky hacky!
     sleep(1)
-    key = bucket.get_key("foo")
-    key.set_contents_from_string("baz")
+    conn.Object('storiesbi-datapipeline', 'foo').put(Body="baz")
 
     assert file_sensor.poke(ctx)
 
 
 @mock_s3
 def test_files_on_s3_from_custom_bucket_defined_in_path():
-    conn = boto.connect_s3()
-    bucket = conn.create_bucket('testing')
-    k = Key(bucket)
-    k.key = "foo"
-    k.set_contents_from_string("baz")
+    conn = boto3.resource('s3')
+    conn.create_bucket(Bucket='testing')
+    conn.Object('testing', 'foo').put(Body="baz")
 
     get_or_update_conn("s3.stories.bi", conn_type="s3")
     yesterday = datetime.now() - timedelta(1)
@@ -119,3 +112,22 @@ def test_files_on_s3_from_custom_bucket_defined_in_path():
     file_sensor.pre_execute(ctx)
 
     assert file_sensor.poke(ctx)
+
+
+@mock_s3
+def test_operator_notification():
+    conn = boto3.resource('s3')
+    conn.create_bucket(Bucket='testing')
+    conn.Object('testing', 'foo').put(Body="baz")
+
+    get_or_update_conn("s3.stories.bi", conn_type="s3")
+    yesterday = datetime.now() - timedelta(1)
+
+    file_sensor = FileSensor(
+        task_id="check_new_file",
+        path="s3://testing/foo",
+        conn_id="s3.stories.bi",
+        modified=yesterday
+    )
+
+    file_sensor._send_notification(ctx, success=False)