diff --git a/pipelines/datasets/br_rf_cno/flows.py b/pipelines/datasets/br_rf_cno/flows.py
index eb30502ec..a961772a8 100644
--- a/pipelines/datasets/br_rf_cno/flows.py
+++ b/pipelines/datasets/br_rf_cno/flows.py
@@ -96,8 +96,7 @@
         table_id=table_ids,
         dump_mode=unmapped("append"),
         source_format=unmapped('parquet'),
-        wait=unmapped(files),
-        upstream_tasks=[files]
+        upstream_tasks=[unmapped(files)]  # https://github.com/PrefectHQ/prefect/issues/2752
     )

     dbt_parameters = create_parameters_list(
@@ -108,7 +107,7 @@
         download_csv_file = True,
         dbt_command = 'run',
         disable_elementary = True,
-        upstream_tasks=[wait_upload_table]
+        upstream_tasks=[unmapped(wait_upload_table)]
     )

     with case(materialize_after_dump, True):
@@ -120,7 +119,7 @@
             parameters=dbt_parameters,
             labels=unmapped(current_flow_labels),
             run_name=f"Materialize {dataset_id}.{table_ids}",
-            upstream_tasks=[dbt_parameters]
+            upstream_tasks=[unmapped(dbt_parameters)]
         )

         wait_for_materialization = wait_for_flow_run.map(
@@ -128,7 +127,7 @@
             stream_states=unmapped(True),
             stream_logs=unmapped(True),
             raise_final_state=unmapped(True),
-            upstream_tasks=[materialization_flow]
+            upstream_tasks=[unmapped(materialization_flow)]
         )
         wait_for_materialization.max_retries = (
             dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
@@ -144,10 +143,9 @@
             date_column_name=unmapped({"date": "data_extracao"}),
             date_format=unmapped("%Y-%m-%d"),
             coverage_type=unmapped("all_bdpro"),
-            #time_delta=unmapped({"months": 6}),
             prefect_mode=unmapped(materialization_mode),
             bq_project=unmapped("basedosdados"),
-            upstream_tasks=[wait_for_materialization],
+            upstream_tasks=[unmapped(wait_for_materialization)],
         )


diff --git a/pipelines/datasets/br_rf_cno/tasks.py b/pipelines/datasets/br_rf_cno/tasks.py
index 3aeefb3e7..7f362ac66 100644
--- a/pipelines/datasets/br_rf_cno/tasks.py
+++ b/pipelines/datasets/br_rf_cno/tasks.py
@@ -2,7 +2,8 @@
 """
 Tasks for br_rf_cno
 """
-
+from datetime import datetime, timedelta
+from pipelines.constants import constants
 import os
 from bs4 import BeautifulSoup
 from datetime import datetime
@@ -106,7 +107,10 @@ def wrangling(input_dir: str, output_dir: str, partition_date: str) -> None:
     log('----- Wrangling completed')


-@task
+@task(
+    max_retries=5,
+    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
+)
 def crawl_cno(root: str, url: str) -> None:
     """
     Downloads and unpacks a ZIP file from the given URL.