Skip to content

Commit

Permalink
#1112 use templating for conn details!
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielwol committed Dec 20, 2024
1 parent 8645bdb commit 015b8eb
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 18 deletions.
18 changes: 9 additions & 9 deletions dags/pull_here.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,14 @@ def get_download_link(request_id: str, access_token: str):
request_id = get_request_id(access_token)
download_url = get_download_link(request_id, access_token)

@task.bash(append_env=True)
def load_data_run(download_url)->str:
conn = BaseHook.get_connection("here_bot")
os.environ['HOST'] = conn.host
os.environ['LOGIN'] = conn.login
os.environ['PW'] = conn.password
os.environ['DOWNLOAD_URL'] = download_url
return '''curl $DOWNLOAD_URL | gunzip | PGPASSWORD=$PW psql -h $HOST -U $LOGIN -d bigdata -c "\\COPY here.ta_view FROM STDIN WITH (FORMAT csv, HEADER TRUE);" '''
@task.bash(env = {
'HOST': '{{ conn.here_bot.host }}',
'LOGIN': '{{ conn.here_bot.login }}',
'PGPASSWORD': '{{ conn.here_bot.password }}',
'DOWNLOAD_URL': download_url
})
def load_data_run()->str:
return '''curl $DOWNLOAD_URL | gunzip | psql -h $HOST -U $LOGIN -d bigdata -c "\\COPY here.ta_view FROM STDIN WITH (FORMAT csv, HEADER TRUE);" '''

# Create a task group for triggering the DAGs
@task_group(group_id='trigger_dags_tasks')
Expand All @@ -105,6 +105,6 @@ def trigger_dags(**kwargs):
)
trigger_operators.append(trigger_operator)

load_data_run(download_url) >> trigger_dags()
load_data_run() >> trigger_dags()

pull_here()
18 changes: 9 additions & 9 deletions dags/pull_here_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ def get_download_link(request_id: str, access_token: str):
request_id = get_request_id(access_token)
download_url = get_download_link(request_id, access_token)

@task.bash(append_env = True)
def load_data(download_url)->str:
conn = BaseHook.get_connection("here_bot")
os.environ['HOST'] = conn.host
os.environ['LOGIN'] = conn.login
os.environ['PW'] = conn.password
os.environ['DOWNLOAD_URL'] = download_url
return '''curl $DOWNLOAD_URL | gunzip | PGPASSWORD=$PW psql -h $HOST -U $LOGIN -d bigdata -c "\\COPY here.ta_path_view FROM STDIN WITH (FORMAT csv, HEADER TRUE);" '''
@task.bash(env = {
'HOST': '{{ conn.here_bot.host }}',
'LOGIN': '{{ conn.here_bot.login }}',
'PGPASSWORD': '{{ conn.here_bot.password }}',
'DOWNLOAD_URL': download_url
})
def load_data()->str:
return '''curl $DOWNLOAD_URL | gunzip | psql -h $HOST -U $LOGIN -d bigdata -c "\\COPY here.ta_path_view FROM STDIN WITH (FORMAT csv, HEADER TRUE);" '''

load_data(download_url)
load_data()

pull_here_path()

0 comments on commit 015b8eb

Please sign in to comment.