Skip to content

Commit

Permalink
#1085 use templating for conn details
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielwol committed Dec 20, 2024
1 parent ae5c566 commit d00e15a
Showing 1 changed file with 37 additions and 34 deletions.
71 changes: 37 additions & 34 deletions dags/ecocounter_open_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,16 @@ def insert_15min(yr):
)
return t.execute(context=context)

@task.bash(append_env=True)
def download_daily_open_data(export_path)->str:
conn = BaseHook.get_connection("ecocounter_bot")
os.environ['HOST'] = conn.host
os.environ['LOGIN'] = conn.login
os.environ['PW'] = conn.password
os.environ['EXPORT_PATH'] = export_path
return '''PGPASSWORD=$PW /usr/bin/psql -h $HOST -U $LOGIN -d bigdata -c \
@task.bash(
env = {
'HOST': '{{ conn.ecocounter_bot.host }}',
'LOGIN': '{{ conn.ecocounter_bot.login }}',
'PGPASSWORD': '{{ conn.ecocounter_bot.password }}',
'EXPORT_PATH': EXPORT_PATH
}
)
def download_daily_open_data()->str:
return '''/usr/bin/psql -h $HOST -U $LOGIN -d bigdata -c \
"SELECT
location_dir_id, location_name, direction, linear_name_full,
side_street, dt, daily_volume
Expand All @@ -147,18 +149,18 @@ def download_daily_open_data(export_path)->str:
--csv -o "$EXPORT_PATH/cycling_permanent_counts_daily.csv"'''

@task.bash(
map_index_template="{{ yr }}",
append_env=True
env = {
'HOST': '{{ conn.ecocounter_bot.host }}',
'LOGIN': '{{ conn.ecocounter_bot.login }}',
'PGPASSWORD': '{{ conn.ecocounter_bot.password }}',
'EXPORT_PATH': EXPORT_PATH
},
map_index_template="{{ yr }}"
)
def download_15min_open_data(yr, export_path)->str:
def download_15min_open_data(yr)->str:
context = get_current_context()
context["yr"] = yr
conn = BaseHook.get_connection("ecocounter_bot")
os.environ['HOST'] = conn.host
os.environ['LOGIN'] = conn.login
os.environ['PW'] = conn.password
os.environ['EXPORT_PATH'] = export_path
return f'''PGPASSWORD=$PW /usr/bin/psql -h $HOST -U $LOGIN -d bigdata -c \
return f'''/usr/bin/psql -h $HOST -U $LOGIN -d bigdata -c \
"SELECT location_dir_id, datetime_bin, bin_volume
FROM open_data.cycling_permanent_counts_15min
WHERE
Expand All @@ -168,30 +170,31 @@ def download_15min_open_data(yr, export_path)->str:
--csv -o "$EXPORT_PATH/cycling_permanent_counts_15min_{yr}_{yr+1}.csv"'''

#insert only latest year data, but download everything (single file)
insert_daily(yr) >> download_daily_open_data(export_path=EXPORT_PATH)
insert_15min(yr) >> download_15min_open_data(yr, export_path=EXPORT_PATH)
insert_daily(yr) >> download_daily_open_data()
insert_15min(yr) >> download_15min_open_data(yr)

@task.bash(append_env=True)
def download_locations_open_data(export_path)->str:
conn = BaseHook.get_connection("ecocounter_bot")
os.environ['HOST'] = conn.host
os.environ['LOGIN'] = conn.login
os.environ['PW'] = conn.password
os.environ['EXPORT_PATH'] = export_path
return '''PGPASSWORD=$PW /usr/bin/psql -h $HOST -U $LOGIN -d bigdata -c \
@task.bash(
env = {
'HOST': '{{ conn.ecocounter_bot.host }}',
'LOGIN': '{{ conn.ecocounter_bot.login }}',
'PGPASSWORD': '{{ conn.ecocounter_bot.password }}',
'EXPORT_PATH': EXPORT_PATH
}
)
def download_locations_open_data()->str:
return '''/usr/bin/psql -h $HOST -U $LOGIN -d bigdata -c \
"SELECT location_dir_id, location_name, direction, linear_name_full, side_street,
longitude, latitude, centreline_id, bin_size, latest_calibration_study,
first_active, last_active, date_decommissioned, technology
FROM open_data.cycling_permanent_counts_locations
ORDER BY location_dir_id;" \
--csv -o "$EXPORT_PATH/cycling_permanent_counts_locations.csv"'''

#@task.bash(append_env=True)
#def output_readme(export_path)->str:
# os.environ['EXPORT_PATH'] = export_path
# return '''
#@task.bash()
#def output_readme()->str:
# return f'''
# pandoc -V geometry:margin=1in \
# -o "$EXPORT_PATH/cycling_permanent_counts_readme.pdf" \
# -o "{EXPORT_PATH}/cycling_permanent_counts_readme.pdf" \
# volumes/open_data/sql/cycling_permanent_counts_readme.md
# '''

Expand All @@ -216,8 +219,8 @@ def status_message(ds = None, **context):
wait_till_10th >>
update_locations >> [
insert_and_download_data.expand(yr = yrs),
download_locations_open_data(export_path=EXPORT_PATH),
#output_readme(export_path=EXPORT_PATH)
download_locations_open_data(),
#output_readme()
] >>
status_message()
)
Expand Down

0 comments on commit d00e15a

Please sign in to comment.