Skip to content

Commit

Permalink
#1115 postgres_conn_id fix
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielwol committed Dec 17, 2024
1 parent 86668b5 commit 12e3241
Show file tree
Hide file tree
Showing 10 changed files with 29 additions and 29 deletions.
4 changes: 2 additions & 2 deletions dags/bluetooth_check_readers_temp.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def blip_pipeline():
update_routes_table = SQLExecuteQueryOperator(
sql='''SELECT * from bluetooth.insert_report_date_temp()''',
task_id='update_routes_table',
postgres_conn_id='bt_bot',
conn_id='bt_bot',
autocommit=True,
retries = 0
)
Expand All @@ -80,7 +80,7 @@ def blip_pipeline():
update_reader_status = SQLExecuteQueryOperator(
sql='''SELECT * from bluetooth.reader_status_history_temp('{{ ds }}')''',
task_id='update_reader_status',
postgres_conn_id='bt_bot',
conn_id='bt_bot',
autocommit=True,
retries = 0
)
Expand Down
2 changes: 1 addition & 1 deletion dags/citywide_tti_aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def citywide_tti_aggregate():

aggregate_daily = SQLExecuteQueryOperator(sql="SELECT covid.generate_citywide_tti( '{{macros.ds_add(ds, -1)}}' )",
task_id='aggregate_daily',
postgres_conn_id='congestion_bot',
conn_id='congestion_bot',
autocommit=True,
retries = 0
)
Expand Down
6 changes: 3 additions & 3 deletions dags/ecocounter_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@

from airflow.decorators import dag, task, task_group
from airflow.models import Variable
from airflow.hooks.base_hook import BaseHook
from airflow.hooks.base import BaseHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.macros import ds_add
from airflow.exceptions import AirflowSkipException
from airflow.sensors.external_task import ExternalTaskMarker
Expand Down Expand Up @@ -80,7 +80,7 @@ def check_partitions():
base_table := 'counts_unfiltered',
year_ := '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int
)""",
postgres_conn_id='ecocounter_bot',
conn_id='ecocounter_bot',
autocommit=True
)

Expand Down
8 changes: 4 additions & 4 deletions dags/eoy_create_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,24 +76,24 @@ def insert_holidays(yr=None):
here_create_tables = SQLExecuteQueryOperator(
task_id='here_create_tables',
sql="SELECT here.create_yearly_tables('{{ task_instance.xcom_pull('yr') }}')",
postgres_conn_id='here_bot',
conn_id='here_bot',
autocommit=True)
here_path_create_tables = SQLExecuteQueryOperator(
task_id='here_path_create_tables',
sql="SELECT here.create_yearly_tables_path('{{ task_instance.xcom_pull('yr') }}')",
postgres_conn_id='here_bot',
conn_id='here_bot',
autocommit=True)

bt_create_tables = SQLExecuteQueryOperator(
task_id='bluetooth_create_tables',
sql="SELECT bluetooth.create_obs_tables('{{ task_instance.xcom_pull('yr') }}')",
postgres_conn_id='bt_bot',
conn_id='bt_bot',
autocommit=True)

congestion_create_table = SQLExecuteQueryOperator(
task_id='congestion_create_table',
sql="SELECT congestion.create_yearly_tables('{{ task_instance.xcom_pull('yr') }}')",
postgres_conn_id='congestion_bot',
conn_id='congestion_bot',
autocommit=True)

bt_replace_trigger(yr=YR)
Expand Down
4 changes: 2 additions & 2 deletions dags/miovision_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,15 @@ def check_partitions():
sql=["SELECT miovision_api.create_yyyy_volumes_partition('volumes', '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int, 'datetime_bin')",
"SELECT miovision_api.create_yyyy_volumes_15min_partition('volumes_15min', '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int)",
"SELECT miovision_api.create_yyyy_volumes_15min_partition('volumes_15min_mvt', '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int)"],
postgres_conn_id='miovision_api_bot',
conn_id='miovision_api_bot',
autocommit=True
)

create_month_partition = SQLExecuteQueryOperator(
task_id='create_month_partition',
pre_execute=check_1st_of_month,
sql="""SELECT miovision_api.create_mm_nested_volumes_partitions('volumes'::text, '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int, '{{ macros.ds_format(ds, '%Y-%m-%d', '%m') }}'::int)""",
postgres_conn_id='miovision_api_bot',
conn_id='miovision_api_bot',
autocommit=True,
trigger_rule='none_failed_min_one_success'
)
Expand Down
10 changes: 5 additions & 5 deletions dags/refresh_wys_monthly.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,36 +52,36 @@ def last_month(ds):
#sql in bdit_data-sources/wys/api/sql/mat-view-stationary-signs.sql
sql='SELECT wys.refresh_mat_view_stationary_signs()',
task_id='wys_view_stat_signs',
postgres_conn_id='wys_bot',
conn_id='wys_bot',
autocommit=True,
retries = 0)
wys_view_mobile_api_id = SQLExecuteQueryOperator(
#sql in bdit_data-sources/wys/api/sql/function-refresh_mat_view_mobile_api_id.sql
#sql in bdit_data-sources/wys/api/sql/create-view-mobile_api_id.sql
sql='SELECT wys.refresh_mat_view_mobile_api_id()',
task_id='wys_view_mobile_api_id',
postgres_conn_id='wys_bot',
conn_id='wys_bot',
autocommit=True,
retries = 0)
od_wys_view = SQLExecuteQueryOperator(
#sql in bdit_data-sources/wys/api/sql/open_data/mat-view-stationary-locations.sql
sql='SELECT wys.refresh_od_mat_view()',
task_id='od_wys_view',
postgres_conn_id='wys_bot',
conn_id='wys_bot',
autocommit=True,
retries = 0)
wys_mobile_summary = SQLExecuteQueryOperator(
#sql in bdit_data-sources/wys/api/sql/function-mobile-summary.sql
sql="SELECT wys.mobile_summary_for_month('{{ last_month(ds) }}')",
task_id='wys_mobile_summary',
postgres_conn_id='wys_bot',
conn_id='wys_bot',
autocommit=True,
retries = 0)
wys_stat_summary = SQLExecuteQueryOperator(
#sql in bdit_data-sources/wys/api/sql/function-stationary-sign-summary.sql
sql="SELECT wys.stationary_summary_for_month('{{ last_month(ds) }}')",
task_id='wys_stat_summary',
postgres_conn_id='wys_bot',
conn_id='wys_bot',
autocommit=True,
retries = 0)

Expand Down
4 changes: 2 additions & 2 deletions dags/tti_aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@
def tti_aggregate():
aggregate_citywide_tti = SQLExecuteQueryOperator(sql="SELECT covid.generate_citywide_tti( '{{macros.ds_add(ds, -1)}}' )",
task_id='aggregate_citywide_tti',
postgres_conn_id='congestion_bot',
conn_id='congestion_bot',
autocommit=True,
retries = 0
)

aggregate_downtown_tti = SQLExecuteQueryOperator(sql="SELECT covid.generate_downtown_tti( '{{macros.ds_add(ds, -1)}}' )",
task_id='aggregate_downtown_tti',
postgres_conn_id='congestion_bot',
conn_id='congestion_bot',
autocommit=True,
retries = 0
)
Expand Down
8 changes: 4 additions & 4 deletions dags/vds_pull_vdsdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def check_partitions():
#partition by year only:
"SELECT vds.partition_vds_yyyy('counts_15min_div2'::text, '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int)",
"SELECT vds.partition_vds_yyyy('counts_15min_bylane_div2'::text, '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int)"],
postgres_conn_id='vds_bot',
conn_id='vds_bot',
autocommit=True
)

Expand All @@ -122,7 +122,7 @@ def pull_vdsdata():
dt >= '{{ds}} 00:00:00'::timestamp
AND dt < '{{ds}} 00:00:00'::timestamp + INTERVAL '1 DAY'""",
task_id='delete_vdsdata',
postgres_conn_id='vds_bot',
conn_id='vds_bot',
autocommit=True,
retries=1,
trigger_rule='none_failed'
Expand All @@ -148,7 +148,7 @@ def summarize_v15():
summarize_v15_task = SQLExecuteQueryOperator(
sql=["delete/delete-counts_15min.sql", "insert/insert_counts_15min.sql"],
task_id='summarize_v15',
postgres_conn_id='vds_bot',
conn_id='vds_bot',
autocommit=True,
retries=1
)
Expand All @@ -157,7 +157,7 @@ def summarize_v15():
summarize_v15_bylane_task = SQLExecuteQueryOperator(
sql=["delete/delete-counts_15min_bylane.sql", "insert/insert_counts_15min_bylane.sql"],
task_id='summarize_v15_bylane',
postgres_conn_id='vds_bot',
conn_id='vds_bot',
autocommit=True,
retries=1
)
Expand Down
8 changes: 4 additions & 4 deletions dags/vds_pull_vdsvehicledata.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def check_partitions_TG():
task_id='create_partitions',
pre_execute=check_jan_1st,
sql="SELECT vds.partition_vds_yyyymm('raw_vdsvehicledata', '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int, 'dt')",
postgres_conn_id='vds_bot',
conn_id='vds_bot',
autocommit=True
)

Expand All @@ -85,7 +85,7 @@ def pull_vdsvehicledata():
dt >= '{{ds}} 00:00:00'::timestamp
AND dt < '{{ds}} 00:00:00'::timestamp + INTERVAL '1 DAY'""",
task_id='delete_vdsvehicledata',
postgres_conn_id='vds_bot',
conn_id='vds_bot',
autocommit=True,
retries=1,
trigger_rule='none_failed'
Expand All @@ -110,7 +110,7 @@ def summarize_vdsvehicledata():
summarize_speeds_task = SQLExecuteQueryOperator(
sql=["delete/delete-veh_speeds_15min.sql", "insert/insert_veh_speeds_15min.sql"],
task_id='summarize_speeds',
postgres_conn_id='vds_bot',
conn_id='vds_bot',
autocommit=True,
retries=1
)
Expand All @@ -119,7 +119,7 @@ def summarize_vdsvehicledata():
summarize_lengths_task = SQLExecuteQueryOperator(
sql=["delete/delete-veh_length_15min.sql", "insert/insert_veh_lengths_15min.sql"],
task_id='summarize_lengths',
postgres_conn_id='vds_bot',
conn_id='vds_bot',
autocommit=True,
retries=1
)
Expand Down
4 changes: 2 additions & 2 deletions dags/wys_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def check_partitions():
task_id='create_annual_partitions',
pre_execute=check_jan_1st,
sql="SELECT wys.create_yyyy_raw_data_partition('{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int)",
postgres_conn_id='wys_bot',
conn_id='wys_bot',
autocommit=True
)

Expand All @@ -80,7 +80,7 @@ def check_partitions():
pre_execute=check_1st_of_month,
trigger_rule='none_failed_min_one_success',
sql="SELECT wys.create_mm_nested_raw_data_partitions('{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int, '{{ macros.ds_format(ds, '%Y-%m-%d', '%m') }}'::int)",
postgres_conn_id='wys_bot',
conn_id='wys_bot',
autocommit=True
)

Expand Down

0 comments on commit 12e3241

Please sign in to comment.