#1085 use variable for EXPORT_PATH
gabrielwol committed Nov 28, 2024
1 parent 90782fb commit 54669ac
Showing 2 changed files with 33 additions and 23 deletions.
dags/ecocounter_open_data.py — 50 changes: 30 additions & 20 deletions
@@ -3,7 +3,7 @@
 """
 import sys
 import os
-from datetime import timedelta, datetime
+from datetime import timedelta
 import logging
 import pendulum
 from functools import partial
@@ -32,6 +32,7 @@
 
 README_PATH = os.path.join(repo_path, 'volumes/ecocounter/readme.md')
 DOC_MD = get_readme_docmd(README_PATH, DAG_NAME)
+EXPORT_PATH = '/home/airflow/open_data/permanent-bike-counters' #'/data/open_data/permanent-bike-counters'
 
 default_args = {
     'owner': ','.join(DAG_OWNERS),
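
For context, this is the pattern the commit applies throughout the DAG: the new module-level `EXPORT_PATH` constant is injected into each `@task.bash` task's `env`, so the rendered shell command reads `$EXPORT_PATH` instead of a hard-coded path. A minimal sketch of that pattern, with hypothetical task and directory names:

```python
# Minimal sketch of the constant-to-env pattern (illustrative names, not repo code).
from airflow.decorators import task

EXPORT_PATH = '/home/airflow/open_data/permanent-bike-counters'

@task.bash(env={"EXPORT_PATH": EXPORT_PATH})
def make_export_dir() -> str:
    # The returned string is executed by bash, which resolves $EXPORT_PATH
    # from the env mapping above at run time.
    return 'mkdir -p "$EXPORT_PATH" && ls -ld "$EXPORT_PATH"'
```

Keeping the path in one constant makes switching between the Airflow staging directory and the final /data/open_data/permanent-bike-counters target a one-line change.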
@@ -137,7 +138,8 @@ def insert_15min(yr):
         env={
             "HOST": BaseHook.get_connection("ecocounter_bot").host,
             "USER" : BaseHook.get_connection("ecocounter_bot").login,
-            "PGPASSWORD": BaseHook.get_connection("ecocounter_bot").password
+            "PGPASSWORD": BaseHook.get_connection("ecocounter_bot").password,
+            "EXPORT_PATH": EXPORT_PATH,
         }
     )
     def download_daily_open_data(yr)->str:
@@ -146,17 +148,16 @@ def download_daily_open_data(yr)->str:
         return f'''/usr/bin/psql -h $HOST -U $USER -d bigdata -c \
 "SELECT location_name, direction, dt, daily_volume
 FROM open_data.cycling_permanent_counts_daily
-WHERE
-    dt >= to_date({yr}::text, 'yyyy')
-    AND dt < LEAST(date_trunc('month', now()), to_date(({yr}::int+1)::text, 'yyyy'));" \
---csv -o "/data/open_data/permanent-bike-counters/cycling_permanent_counts_daily_{yr}.csv"'''
+WHERE dt < LEAST(date_trunc('month', now()));" \
+--csv -o "$EXPORT_PATH/cycling_permanent_counts_daily.csv"'''
 
     @task.bash(
         map_index_template="{{ yr }}",
         env={
             "HOST": BaseHook.get_connection("ecocounter_bot").host,
             "USER" : BaseHook.get_connection("ecocounter_bot").login,
-            "PGPASSWORD": BaseHook.get_connection("ecocounter_bot").password
+            "PGPASSWORD": BaseHook.get_connection("ecocounter_bot").password,
+            "EXPORT_PATH": EXPORT_PATH,
         }
     )
     def download_15min_open_data(yr)->str:
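
A subtlety shared by all of these export tasks: the command string goes through two rounds of substitution. Python's f-string fills `{yr}` when the task renders, while `$HOST`, `$USER`, and `$EXPORT_PATH` are left literal and expanded later by bash from the decorator's `env` mapping. A rough illustration with an abbreviated query (not the repo's exact SQL):

```python
# Illustrative two-stage substitution: Python fills {yr}; bash later expands
# $HOST, $USER and $EXPORT_PATH from the env= mapping on @task.bash.
yr = 2024
cmd = f'''/usr/bin/psql -h $HOST -U $USER -d bigdata -c \
"SELECT count(*) FROM open_data.cycling_permanent_counts_daily \
WHERE dt >= to_date({yr}::text, 'yyyy');" \
--csv -o "$EXPORT_PATH/example_{yr}.csv"'''
print(cmd)  # exactly the kind of string @task.bash would hand to bash
```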
@@ -168,24 +169,35 @@ def download_15min_open_data(yr)->str:
 WHERE
     datetime_bin >= to_date({yr}::text, 'yyyy')
     AND datetime_bin < LEAST(date_trunc('month', now()), to_date(({yr}+1)::text, 'yyyy'));" \
---csv -o "/data/open_data/permanent-bike-counters/cycling_permanent_counts_15min_{yr}.csv"'''
+--csv -o "$EXPORT_PATH/cycling_permanent_counts_15min_{yr}_{yr+1}.csv"'''
 
     insert_daily(yr) >> download_daily_open_data(yr)
     insert_15min(yr) >> download_15min_open_data(yr)
 
     @task.bash(env={
         "HOST": BaseHook.get_connection("ecocounter_bot").host,
         "USER" : BaseHook.get_connection("ecocounter_bot").login,
-        "PGPASSWORD": BaseHook.get_connection("ecocounter_bot").password
+        "PGPASSWORD": BaseHook.get_connection("ecocounter_bot").password,
+        "EXPORT_PATH": EXPORT_PATH,
     })
     def download_locations_open_data()->str:
         return '''/usr/bin/psql -h $HOST -U $USER -d bigdata -c \
 "SELECT location_name, direction, linear_name_full, side_street, longitude,
 latitude, centreline_id, bin_size, latest_calibration_study,
 first_active, last_active, date_decommissioned, technology
 FROM open_data.cycling_permanent_counts_locations" \
---csv -o /data/open_data/permanent-bike-counters/cycling_permanent_counts_locations.csv'''
+--csv -o "$EXPORT_PATH/cycling_permanent_counts_locations.csv"'''
 
+#@task.bash(env={
+#    "EXPORT_PATH": EXPORT_PATH,
+#})
+#def output_readme()->str:
+#    return '''
+#    pandoc -V geometry:margin=1in \
+#    -o "$EXPORT_PATH/cycling_permanent_counts_readme.pdf" \
+#    volumes/open_data/sql/cycling_permanent_counts_readme.md
+#    '''
+
 @task(
     retries=0,
     trigger_rule='all_success',
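
Unlike the daily export, the 15-minute export stays partitioned: each mapped year writes one file covering the half-open window from January 1 of `yr` up to whichever comes first, the start of the current month or January 1 of `yr+1`, so the in-progress year only ever contains complete months. The new `_{yr}_{yr+1}` suffix in the file name mirrors this half-open bound. A sketch of the window logic (hypothetical helper, not repo code):

```python
# Hedged sketch of the per-year export window used by the 15-min query:
# [Jan 1 of yr, min(start of current month, Jan 1 of yr + 1)).
from datetime import datetime

def export_window(yr: int, now: datetime) -> tuple[datetime, datetime]:
    month_start = datetime(now.year, now.month, 1)
    start = datetime(yr, 1, 1)
    end = min(month_start, datetime(yr + 1, 1, 1))
    return start, end  # exported rows satisfy start <= datetime_bin < end

# export_window(2023, datetime(2024, 11, 28)) -> all of 2023
# export_window(2024, datetime(2024, 11, 28)) -> 2024-01-01 up to 2024-11-01
```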
@@ -195,23 +207,21 @@ def status_message(ds = None, **context):
     mnth = ds_format(ds, '%Y-%m-%d', '%Y-%m-01')
     send_slack_msg(
         context=context,
-        msg=f"Ecocounter :open_data_to: DAG ran successfully for {mnth} :white_check_mark:",
+        msg=f"Ecocounter :open_data_to: DAG ran successfully for {mnth} :white_check_mark:. "
+            f"Remember to `cp {EXPORT_PATH}/* /data/open_data/permanent-bike-counters` as bigdata.",
         use_proxy=True
     )
 
-@task.bash()
-def output_readme()->str:
-    return '''
-    pandoc -V geometry:margin=1in \
-    -o /data/open_data/permanent-bike-counters/cycling_permanent_counts_readme.pdf \
-    volumes/open_data/sql/cycling_permanent_counts_readme.md
-    '''
 
 yrs = get_years()
 (
     check_data_availability >>
     reminder_message() >>
     wait_till_10th >>
-    [insert_and_download_data.expand(yr = yrs), download_locations_open_data(), output_readme()] >>
+    [
+        insert_and_download_data.expand(yr = yrs),
+        download_locations_open_data()
+        #output_readme()
+    ] >>
    status_message()
 )
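
For readers unfamiliar with dynamic task mapping: `.expand(yr=yrs)` instantiates the `insert_and_download_data` group once per year returned by `get_years()`, and `status_message()` waits for every mapped instance as well as `download_locations_open_data()`. A reduced sketch of the same fan-out/fan-in shape, with hypothetical names:

```python
# Schematic fan-out/fan-in: one mapped instance per list element, one final task.
import pendulum
from airflow.decorators import dag, task

@dag(schedule=None, start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def fan_out_fan_in():
    @task
    def get_years() -> list[int]:
        return [2023, 2024, 2025]

    @task
    def export_year(yr: int) -> str:
        return f"exported {yr}"  # stands in for the insert/download pair

    @task(trigger_rule="all_success")
    def status_message():
        print("all yearly exports finished")

    export_year.expand(yr=get_years()) >> status_message()

fan_out_fan_in()
```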

volumes/ecocounter/open_data/ecocounter_open_data_export.sh — 6 changes: 3 additions & 3 deletions
@@ -37,12 +37,12 @@ done
 
 #as gwolofs
 #grant permission for bigdata to read from my home folder.
-setfacl -R -m u:bigdata:rx /data/home/gwolofs/open_data/permanent-bike-counters
+setfacl -R -m u:bigdata:rx $dest_path
 
 pbrun su - bigdata
 rm /data/open_data/permanent-bike-counters/*
-cp -r /data/home/gwolofs/open_data/permanent-bike-counters/*.csv /data/open_data/permanent-bike-counters
-cp -r /data/home/gwolofs/open_data/permanent-bike-counters/*.pdf /data/open_data/permanent-bike-counters
+cp -r "$dest_path/*.csv" /data/open_data/permanent-bike-counters
+cp -r "$dest_path/*.pdf" /data/open_data/permanent-bike-counters
 
 cd /data/open_data/permanent-bike-counters
 wc -l ./*
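
One caution on the new `cp` lines: in bash, a glob inside double quotes is not expanded, so `"$dest_path/*.csv"` reaches `cp` as a literal string. A variant that quotes the variable but leaves the glob bare would expand as intended (a sketch, not part of this commit):

```bash
# Quote the variable expansion, leave the glob unquoted so bash expands it.
cp -r "$dest_path"/*.csv /data/open_data/permanent-bike-counters
cp -r "$dest_path"/*.pdf /data/open_data/permanent-bike-counters
```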
