diff --git a/dags/pull_here.py b/dags/pull_here.py index d789bc461..681ad2060 100644 --- a/dags/pull_here.py +++ b/dags/pull_here.py @@ -81,14 +81,14 @@ def get_download_link(request_id: str, access_token: str): request_id = get_request_id(access_token) download_url = get_download_link(request_id, access_token) - @task.bash(append_env=True) - def load_data_run(download_url)->str: - conn = BaseHook.get_connection("here_bot") - os.environ['HOST'] = conn.host - os.environ['LOGIN'] = conn.login - os.environ['PW'] = conn.password - os.environ['DOWNLOAD_URL'] = download_url - return '''curl $DOWNLOAD_URL | gunzip | PGPASSWORD=$PW psql -h $HOST -U $LOGIN -d bigdata -c "\\COPY here.ta_view FROM STDIN WITH (FORMAT csv, HEADER TRUE);" ''' + @task.bash(env = { + 'HOST': '{{ conn.here_bot.host }}', + 'LOGIN': '{{ conn.here_bot.login }}', + 'PGPASSWORD': '{{ conn.here_bot.password }}', + 'DOWNLOAD_URL': download_url + }) + def load_data_run()->str: + return '''curl $DOWNLOAD_URL | gunzip | psql -h $HOST -U $LOGIN -d bigdata -c "\\COPY here.ta_view FROM STDIN WITH (FORMAT csv, HEADER TRUE);" ''' # Create a task group for triggering the DAGs @task_group(group_id='trigger_dags_tasks') @@ -105,6 +105,6 @@ def trigger_dags(**kwargs): ) trigger_operators.append(trigger_operator) - load_data_run(download_url) >> trigger_dags() + load_data_run() >> trigger_dags() pull_here() diff --git a/dags/pull_here_path.py b/dags/pull_here_path.py index 1e8d7fd56..aeaa4d8de 100644 --- a/dags/pull_here_path.py +++ b/dags/pull_here_path.py @@ -76,15 +76,15 @@ def get_download_link(request_id: str, access_token: str): request_id = get_request_id(access_token) download_url = get_download_link(request_id, access_token) - @task.bash(append_env = True) - def load_data(download_url)->str: - conn = BaseHook.get_connection("here_bot") - os.environ['HOST'] = conn.host - os.environ['LOGIN'] = conn.login - os.environ['PW'] = conn.password - os.environ['DOWNLOAD_URL'] = download_url - return '''curl $DOWNLOAD_URL | gunzip | PGPASSWORD=$PW psql -h $HOST -U $LOGIN -d bigdata -c "\\COPY here.ta_path_view FROM STDIN WITH (FORMAT csv, HEADER TRUE);" ''' + @task.bash(env = { + 'HOST': '{{ conn.here_bot.host }}', + 'LOGIN': '{{ conn.here_bot.login }}', + 'PGPASSWORD': '{{ conn.here_bot.password }}', + 'DOWNLOAD_URL': download_url + }) + def load_data()->str: + return '''curl $DOWNLOAD_URL | gunzip | psql -h $HOST -U $LOGIN -d bigdata -c "\\COPY here.ta_path_view FROM STDIN WITH (FORMAT csv, HEADER TRUE);" ''' - load_data(download_url) + load_data() pull_here_path() \ No newline at end of file