From d00e15a3f1d11ab8bfdcd98fcfda9a67ce0c264f Mon Sep 17 00:00:00 2001 From: Gabe Wolofsky <80077912+gabrielwol@users.noreply.github.com> Date: Fri, 20 Dec 2024 11:00:08 -0500 Subject: [PATCH] #1085 use templating for conn details --- dags/ecocounter_open_data.py | 71 +++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 34 deletions(-) diff --git a/dags/ecocounter_open_data.py b/dags/ecocounter_open_data.py index 9a8df0fd5..97da7a61b 100644 --- a/dags/ecocounter_open_data.py +++ b/dags/ecocounter_open_data.py @@ -130,14 +130,16 @@ def insert_15min(yr): ) return t.execute(context=context) - @task.bash(append_env=True) - def download_daily_open_data(export_path)->str: - conn = BaseHook.get_connection("ecocounter_bot") - os.environ['HOST'] = conn.host - os.environ['LOGIN'] = conn.login - os.environ['PW'] = conn.password - os.environ['EXPORT_PATH'] = export_path - return '''PGPASSWORD=$PW /usr/bin/psql -h $HOST -U $LOGIN -d bigdata -c \ + @task.bash( + env = { + 'HOST': '{{ conn.ecocounter_bot.host }}', + 'LOGIN': '{{ conn.ecocounter_bot.login }}', + 'PGPASSWORD': '{{ conn.ecocounter_bot.password }}', + 'EXPORT_PATH': EXPORT_PATH + } + ) + def download_daily_open_data()->str: + return '''/usr/bin/psql -h $HOST -U $LOGIN -d bigdata -c \ "SELECT location_dir_id, location_name, direction, linear_name_full, side_street, dt, daily_volume @@ -147,18 +149,18 @@ def download_daily_open_data(export_path)->str: --csv -o "$EXPORT_PATH/cycling_permanent_counts_daily.csv"''' @task.bash( - map_index_template="{{ yr }}", - append_env=True + env = { + 'HOST': '{{ conn.ecocounter_bot.host }}', + 'LOGIN': '{{ conn.ecocounter_bot.login }}', + 'PGPASSWORD': '{{ conn.ecocounter_bot.password }}', + 'EXPORT_PATH': EXPORT_PATH + }, + map_index_template="{{ yr }}" ) - def download_15min_open_data(yr, export_path)->str: + def download_15min_open_data(yr)->str: context = get_current_context() context["yr"] = yr - conn = BaseHook.get_connection("ecocounter_bot") - os.environ['HOST'] = conn.host - os.environ['LOGIN'] = conn.login - os.environ['PW'] = conn.password - os.environ['EXPORT_PATH'] = export_path - return f'''PGPASSWORD=$PW /usr/bin/psql -h $HOST -U $LOGIN -d bigdata -c \ + return f'''/usr/bin/psql -h $HOST -U $LOGIN -d bigdata -c \ "SELECT location_dir_id, datetime_bin, bin_volume FROM open_data.cycling_permanent_counts_15min WHERE @@ -168,17 +170,19 @@ def download_15min_open_data(yr, export_path)->str: --csv -o "$EXPORT_PATH/cycling_permanent_counts_15min_{yr}_{yr+1}.csv"''' #insert only latest year data, but download everything (single file) - insert_daily(yr) >> download_daily_open_data(export_path=EXPORT_PATH) - insert_15min(yr) >> download_15min_open_data(yr, export_path=EXPORT_PATH) + insert_daily(yr) >> download_daily_open_data() + insert_15min(yr) >> download_15min_open_data(yr) - @task.bash(append_env=True) - def download_locations_open_data(export_path)->str: - conn = BaseHook.get_connection("ecocounter_bot") - os.environ['HOST'] = conn.host - os.environ['LOGIN'] = conn.login - os.environ['PW'] = conn.password - os.environ['EXPORT_PATH'] = export_path - return '''PGPASSWORD=$PW /usr/bin/psql -h $HOST -U $LOGIN -d bigdata -c \ + @task.bash( + env = { + 'HOST': '{{ conn.ecocounter_bot.host }}', + 'LOGIN': '{{ conn.ecocounter_bot.login }}', + 'PGPASSWORD': '{{ conn.ecocounter_bot.password }}', + 'EXPORT_PATH': EXPORT_PATH + } + ) + def download_locations_open_data()->str: + return '''/usr/bin/psql -h $HOST -U $LOGIN -d bigdata -c \ "SELECT location_dir_id, location_name, direction, linear_name_full, side_street, longitude, latitude, centreline_id, bin_size, latest_calibration_study, first_active, last_active, date_decommissioned, technology @@ -186,12 +190,11 @@ def download_locations_open_data(export_path)->str: ORDER BY location_dir_id;" \ --csv -o "$EXPORT_PATH/cycling_permanent_counts_locations.csv"''' - #@task.bash(append_env=True) - #def output_readme(export_path)->str: - # os.environ['EXPORT_PATH'] = export_path - # return ''' + #@task.bash() + #def output_readme()->str: + # return f''' # pandoc -V geometry:margin=1in \ - # -o "$EXPORT_PATH/cycling_permanent_counts_readme.pdf" \ + # -o "{EXPORT_PATH}/cycling_permanent_counts_readme.pdf" \ # volumes/open_data/sql/cycling_permanent_counts_readme.md # ''' @@ -216,8 +219,8 @@ def status_message(ds = None, **context): wait_till_10th >> update_locations >> [ insert_and_download_data.expand(yr = yrs), - download_locations_open_data(export_path=EXPORT_PATH), - #output_readme(export_path=EXPORT_PATH) + download_locations_open_data(), + #output_readme() ] >> status_message() )