From 32c2a0ee1dcc22c363ffcea728f6ddb5f4d47f9a Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Wed, 4 Oct 2023 18:32:15 -0300 Subject: [PATCH] insere upstream_task na task wait_for_flow_run --- pipelines/utils/crawler_ibge_inflacao/flows.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pipelines/utils/crawler_ibge_inflacao/flows.py b/pipelines/utils/crawler_ibge_inflacao/flows.py index 1f76a3412..fff143988 100644 --- a/pipelines/utils/crawler_ibge_inflacao/flows.py +++ b/pipelines/utils/crawler_ibge_inflacao/flows.py @@ -26,7 +26,6 @@ from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, - get_temporal_coverage, rename_current_flow_run_dataset_table, ) @@ -49,7 +48,6 @@ rename_flow_run = rename_current_flow_run_dataset_table( prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id ) - # USA A FUNÇAO EXTRACT_LAST_DATE ENTAO QUEBRA SE A BSE NAO EXISTIR needs_to_update = check_for_updates( indice=INDICE, dataset_id=dataset_id, table_id=table_id @@ -92,6 +90,7 @@ stream_states=True, stream_logs=True, raise_final_state=True, + upstream_tasks=[wait_upload_table], ) wait_for_materialization.max_retries = ( dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value @@ -182,6 +181,7 @@ stream_states=True, stream_logs=True, raise_final_state=True, + upstream_tasks=[wait_upload_table], ) wait_for_materialization.max_retries = ( dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value @@ -272,6 +272,7 @@ stream_states=True, stream_logs=True, raise_final_state=True, + upstream_tasks=[wait_upload_table], ) wait_for_materialization.max_retries = ( dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value @@ -364,6 +365,7 @@ stream_states=True, stream_logs=True, raise_final_state=True, + upstream_tasks=[wait_upload_table], ) wait_for_materialization.max_retries = ( dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value