Skip to content

Commit

Permalink
Upgrade to new airflow.
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelkuty committed May 9, 2018
1 parent 8481c58 commit cecded4
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 23 deletions.
16 changes: 15 additions & 1 deletion airflow_plugins/operators/sensors/file_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import time
from datetime import datetime, timedelta

import pytz
from airflow.exceptions import (
AirflowException,
AirflowSensorTimeout,
Expand All @@ -12,6 +13,7 @@
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from pytz import timezone
from pytz.exceptions import UnknownTimeZoneError

from airflow_plugins.hooks import FTPHook
from airflow_plugins.operators import FileOperator
Expand Down Expand Up @@ -66,7 +68,14 @@ def _send_notification(self, context, success=False):
send_notification(ti.get_dagrun(), text, title, color)
return

runtime = datetime.now() - ti.start_date
try:
tz = timezone(ti.start_date.tm_zone)
except (AttributeError, UnknownTimeZoneError):  # tm_zone not set on start_date
runtime = datetime.now() - ti.start_date.replace(tzinfo=None)
else:
    # use the timezone resolved above; TaskInstance has no `tm_zone` attribute
    runtime = datetime.now(tz) - ti.start_date

if runtime >= self.notify_after:
if (self.last_notification is None or
runtime >= self.last_notification + self.notify_delta):
Expand Down Expand Up @@ -149,6 +158,7 @@ def execute(self, context):
# notify about success in case of previous warnings
self._send_notification(context, success=True)
logging.info('Success criteria met. Exiting.')
return poke_result

def poke(self, context):
Expand Down Expand Up @@ -196,6 +206,10 @@ def poke(self, context):

def get_last_modified(fileobj):
timestamp = fileobj.last_modified

if isinstance(timestamp, datetime):
return timestamp

tformat = '%a, %d %b %Y %H:%M:%S %Z'
dt = datetime.strptime(timestamp, tformat)
t = time.strptime(timestamp, tformat)
Expand Down
56 changes: 34 additions & 22 deletions tests/operators/sensors/test_file_sensor.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from datetime import datetime, timedelta
from time import sleep

import boto
import boto3
import pytest
from airflow.exceptions import AirflowException
from boto.s3.key import Key
# from boto3.s3.key import Key
from mock import Mock
from moto import mock_s3

Expand Down Expand Up @@ -46,13 +46,14 @@ def test_files_sensor_fail_on_unsupported_connection():

@mock_s3
def test_files_on_s3():
conn = boto.connect_s3()
bucket = conn.create_bucket('storiesbi-datapipeline')
conn = boto3.resource('s3')
conn.create_bucket(Bucket='storiesbi-datapipeline')

get_or_update_conn("s3.stories.bi", conn_type="s3")

file_sensor = FileSensor(
task_id="check_new_file",
path="foo",
path="s3://storiesbi-datapipeline/foo",
conn_id="s3.stories.bi",
modified="anytime"
)
Expand All @@ -61,27 +62,22 @@ def test_files_on_s3():

assert not file_sensor.poke(ctx)

k = Key(bucket)
k.key = "foo"
k.set_contents_from_string("bar")
conn.Object('storiesbi-datapipeline', 'foo').put(Body="bar")

assert file_sensor.poke(ctx)


@mock_s3
def test_files_on_s3_modified_after():
conn = boto.connect_s3()
bucket = conn.create_bucket('storiesbi-datapipeline')

k = Key(bucket)
k.key = "foo"
k.set_contents_from_string("bar")
conn = boto3.resource('s3')
conn.create_bucket(Bucket='storiesbi-datapipeline')
conn.Object('storiesbi-datapipeline', 'foo').put(Body="bar")

get_or_update_conn("s3.stories.bi", conn_type="s3")

file_sensor = FileSensor(
task_id="check_new_file",
path="foo",
path="s3://storiesbi-datapipeline/foo",
conn_id="s3.stories.bi",
modified=datetime.now()
)
Expand All @@ -92,19 +88,16 @@ def test_files_on_s3_modified_after():

# Hacky hacky!
sleep(1)
key = bucket.get_key("foo")
key.set_contents_from_string("baz")
conn.Object('storiesbi-datapipeline', 'foo').put(Body="baz")

assert file_sensor.poke(ctx)


@mock_s3
def test_files_on_s3_from_custom_bucket_defined_in_path():
conn = boto.connect_s3()
bucket = conn.create_bucket('testing')
k = Key(bucket)
k.key = "foo"
k.set_contents_from_string("baz")
conn = boto3.resource('s3')
conn.create_bucket(Bucket='testing')
conn.Object('testing', 'foo').put(Body="baz")

get_or_update_conn("s3.stories.bi", conn_type="s3")
yesterday = datetime.now() - timedelta(1)
Expand All @@ -119,3 +112,22 @@ def test_files_on_s3_from_custom_bucket_defined_in_path():
file_sensor.pre_execute(ctx)

assert file_sensor.poke(ctx)


@mock_s3
def test_operator_notification():
    """Smoke-test FileSensor._send_notification against a mocked S3 object."""
    s3 = boto3.resource('s3')
    s3.create_bucket(Bucket='testing')
    s3.Object('testing', 'foo').put(Body="baz")

    get_or_update_conn("s3.stories.bi", conn_type="s3")

    one_day_ago = datetime.now() - timedelta(1)
    sensor = FileSensor(
        task_id="check_new_file",
        path="s3://testing/foo",
        conn_id="s3.stories.bi",
        modified=one_day_ago
    )

    # exercise the notification path directly (failure branch)
    sensor._send_notification(ctx, success=False)

0 comments on commit cecded4

Please sign in to comment.