Skip to content

Commit

Permalink
#905 add check_data_availability, update dag params
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielwol committed Mar 25, 2024
1 parent 688b1bc commit c44deb8
Showing 1 changed file with 27 additions and 3 deletions.
30 changes: 27 additions & 3 deletions dags/miovision_open_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
sys.path.insert(0, repo_path)
from dags.dag_functions import task_fail_slack_alert, send_slack_msg
from dags.custom_operators import SQLCheckOperatorWithReturnValue
except:
raise ImportError("Cannot import DAG helper functions.")

Expand All @@ -30,7 +31,7 @@
'owner': ','.join(DAG_OWNERS),
'depends_on_past':False,
#set earlier start_date + catchup when ready?
'start_date': pendulum.datetime(2023, 12, 18, tz="America/Toronto"),
'start_date': pendulum.datetime(2024, 1, 1, tz="America/Toronto"),
'email_on_failure': False,
'email_on_success': False,
'retries': 0,
Expand All @@ -42,7 +43,8 @@
dag_id=DAG_NAME,
default_args=default_args,
schedule='0 14 3 * *', # 2pm, 3rd day of each month
catchup=False,
catchup=True,
max_active_runs=1,
tags=["miovision", "open_data"],
doc_md=__doc__
)
Expand All @@ -52,6 +54,24 @@ def miovision_open_data_dag():
#for the first of the month. Decided it should run later
#to give time for anomalous_range updates if any.

# Data-availability gate for the month containing `ds`.
# The templated query:
#   1. generates one row per calendar day, from the first day of the month
#      ('%Y-%m-01' of `ds`) through that date + 1 month - 1 day;
#   2. left-joins miovision_api.volumes_daily_unfiltered on `dt` and sums
#      daily_volume per day, with COALESCE turning days without rows into 0;
#   3. keeps only the days whose total is 0 and returns two columns:
#      a boolean that is true only when no such day exists, and a
#      'Missing dates: ...' string listing the zero-volume days.
# NOTE(review): presumably SQLCheckOperatorWithReturnValue fails the task when
# the first column is false and surfaces the second column as the message --
# confirm against dags/custom_operators.
check_data_availability = SQLCheckOperatorWithReturnValue(
task_id="check_data_availability",
sql="""WITH daily_volumes AS (
SELECT dt::date, COALESCE(SUM(daily_volume), 0) AS daily_volume
FROM generate_series('{{ macros.ds_format(ds, '%Y-%m-%d', '%Y-%m-01') }}'::date,
'{{ macros.ds_format(ds, '%Y-%m-%d', '%Y-%m-01') }}'::date + '1 month'::interval - '1 day'::interval,
'1 day'::interval) AS dates(dt)
LEFT JOIN miovision_api.volumes_daily_unfiltered USING (dt)
GROUP BY dt
ORDER BY dt
)
SELECT NOT(COUNT(*) > 0), 'Missing dates: ' || string_agg(dt::text, ', ')
FROM daily_volumes
WHERE daily_volume = 0""",
conn_id="miovision_api_bot"
)

refresh_monthly_open_data = PostgresOperator(
task_id='refresh_monthly_open_data',
sql="SELECT gwolofs.insert_miovision_open_data_monthly_summary('{{ macros.ds_format(ds, '%Y-%m-%d', '%Y-%m-01') }}'::date)",
Expand All @@ -78,6 +98,10 @@ def status_message(ds = None, **context):
msg=f":meow_miovision: :open_data_to: DAG ran successfully for {mnth} :white_check_mark:"
)

refresh_monthly_open_data >> refresh_15min_open_data >> status_message()
# Task ordering: check_data_availability is upstream of both refresh tasks,
# and both refresh tasks are upstream of status_message(). The two refresh
# tasks are given as a list, so no ordering is imposed between them.
(
check_data_availability >>
[refresh_monthly_open_data, refresh_15min_open_data] >>
status_message()
)

miovision_open_data_dag()

0 comments on commit c44deb8

Please sign in to comment.