From f8673815a9acd38ac34a237acaca90f9262c2506 Mon Sep 17 00:00:00 2001 From: tricktx Date: Thu, 15 Aug 2024 16:54:03 -0300 Subject: [PATCH 1/5] [fix] add upstream with unmapped in flow --- pipelines/datasets/br_rf_cno/flows.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/pipelines/datasets/br_rf_cno/flows.py b/pipelines/datasets/br_rf_cno/flows.py index eb30502ec..8af700750 100644 --- a/pipelines/datasets/br_rf_cno/flows.py +++ b/pipelines/datasets/br_rf_cno/flows.py @@ -96,8 +96,7 @@ table_id=table_ids, dump_mode=unmapped("append"), source_format=unmapped('parquet'), - wait=unmapped(files), - upstream_tasks=[files] + upstream_tasks=[unmapped(files)] # https://github.com/PrefectHQ/prefect/issues/2752 ) dbt_parameters = create_parameters_list( @@ -108,7 +107,7 @@ download_csv_file = True, dbt_command = 'run', disable_elementary = True, - upstream_tasks=[wait_upload_table] + upstream_tasks=[unmapped(wait_upload_table)] ) with case(materialize_after_dump, True): @@ -120,7 +119,7 @@ parameters=dbt_parameters, labels=unmapped(current_flow_labels), run_name=f"Materialize {dataset_id}.{table_ids}", - upstream_tasks=[dbt_parameters] + upstream_tasks=[unmapped(dbt_parameters)] ) wait_for_materialization = wait_for_flow_run.map( @@ -128,7 +127,7 @@ stream_states=unmapped(True), stream_logs=unmapped(True), raise_final_state=unmapped(True), - upstream_tasks=[materialization_flow] + upstream_tasks=[unmapped(materialization_flow)] ) wait_for_materialization.max_retries = ( dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value @@ -147,7 +146,7 @@ #time_delta=unmapped({"months": 6}), prefect_mode=unmapped(materialization_mode), bq_project=unmapped("basedosdados"), - upstream_tasks=[wait_for_materialization], + upstream_tasks=[unmapped(wait_for_materialization)], ) From 3b9b39007f576298673f04371d34371612d78b01 Mon Sep 17 00:00:00 2001 From: tricktx Date: Thu, 15 Aug 2024 17:42:50 -0300 Subject: [PATCH 2/5] [register] date_column_name --- pipelines/datasets/br_rf_cno/flows.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pipelines/datasets/br_rf_cno/flows.py b/pipelines/datasets/br_rf_cno/flows.py index 8af700750..beee5564b 100644 --- a/pipelines/datasets/br_rf_cno/flows.py +++ b/pipelines/datasets/br_rf_cno/flows.py @@ -137,10 +137,11 @@ ) with case(update_metadata, True): + update_django_metadata.map( dataset_id=unmapped(dataset_id), table_id=table_ids, - date_column_name=unmapped({"date": "data_extracao"}), + date_column_name=unmapped({"date": "data_extracao"}), #register_date date_format=unmapped("%Y-%m-%d"), coverage_type=unmapped("all_bdpro"), #time_delta=unmapped({"months": 6}), From ff352445605cbe965043451b0f73423ea9033f6f Mon Sep 17 00:00:00 2001 From: tricktx Date: Thu, 15 Aug 2024 17:56:13 -0300 Subject: [PATCH 3/5] [register] date_column_name part 2 --- pipelines/datasets/br_rf_cno/flows.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pipelines/datasets/br_rf_cno/flows.py b/pipelines/datasets/br_rf_cno/flows.py index beee5564b..36fdfd7a8 100644 --- a/pipelines/datasets/br_rf_cno/flows.py +++ b/pipelines/datasets/br_rf_cno/flows.py @@ -137,14 +137,12 @@ ) with case(update_metadata, True): - update_django_metadata.map( dataset_id=unmapped(dataset_id), table_id=table_ids, date_column_name=unmapped({"date": "data_extracao"}), #register_date date_format=unmapped("%Y-%m-%d"), coverage_type=unmapped("all_bdpro"), - #time_delta=unmapped({"months": 6}), prefect_mode=unmapped(materialization_mode), bq_project=unmapped("basedosdados"), upstream_tasks=[unmapped(wait_for_materialization)], From 905958c45e150355be693363c4427c1bfb96be68 Mon Sep 17 00:00:00 2001 From: tricktx Date: Fri, 16 Aug 2024 10:10:56 -0300 Subject: [PATCH 4/5] [register] date_column_name part 3 --- pipelines/datasets/br_rf_cno/flows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/datasets/br_rf_cno/flows.py b/pipelines/datasets/br_rf_cno/flows.py index 36fdfd7a8..a961772a8 100644 --- a/pipelines/datasets/br_rf_cno/flows.py +++ b/pipelines/datasets/br_rf_cno/flows.py @@ -140,7 +140,7 @@ update_django_metadata.map( dataset_id=unmapped(dataset_id), table_id=table_ids, - date_column_name=unmapped({"date": "data_extracao"}), #register_date + date_column_name=unmapped({"date": "data_extracao"}), date_format=unmapped("%Y-%m-%d"), coverage_type=unmapped("all_bdpro"), prefect_mode=unmapped(materialization_mode), From ee59295d1924e798661e4ceb88a401c89f4df013 Mon Sep 17 00:00:00 2001 From: tricktx Date: Fri, 16 Aug 2024 10:13:59 -0300 Subject: [PATCH 5/5] [fix] add max_retries in crawl_cno --- pipelines/datasets/br_rf_cno/tasks.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pipelines/datasets/br_rf_cno/tasks.py b/pipelines/datasets/br_rf_cno/tasks.py index 3aeefb3e7..7f362ac66 100644 --- a/pipelines/datasets/br_rf_cno/tasks.py +++ b/pipelines/datasets/br_rf_cno/tasks.py @@ -2,7 +2,8 @@ """ Tasks for br_rf_cno """ - +from datetime import datetime, timedelta +from pipelines.constants import constants import os from bs4 import BeautifulSoup from datetime import datetime @@ -106,7 +107,10 @@ def wrangling(input_dir: str, output_dir: str, partition_date: str) -> None: log('----- Wrangling completed') -@task +@task( + max_retries=5, + retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), +) def crawl_cno(root: str, url: str) -> None: """ Downloads and unpacks a ZIP file from the given URL.