diff --git a/pipelines/datasets/br_anp_precos_combustiveis/flows.py b/pipelines/datasets/br_anp_precos_combustiveis/flows.py index eaa5e4c96..6a003b91d 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/flows.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/flows.py @@ -66,7 +66,6 @@ with case(dados_desatualizados, True): df = download_and_transform(upstream_tasks=[rename_flow_run]) output_path = make_partitions(df=df, upstream_tasks=[df]) - get_date_max_pro = data_max_bd_pro(df=df) # pylint: disable=C0103 wait_upload_table = create_table_and_upload_to_gcs( @@ -91,6 +90,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks=[wait_upload_table], ) wait_for_materialization = wait_for_flow_run( @@ -105,14 +105,16 @@ wait_for_materialization.retry_delay = timedelta( seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - + get_date_max_pro = data_max_bd_pro( + df=df, upstream_tasks=[wait_upload_table] + ) with case(update_metadata, True): update_django_metadata( dataset_id, table_id, metadata_type="DateTimeRange", bq_last_update=False, - bq_table_last_year_month=True, + bq_table_last_year_month=False, api_mode="prod", date_format="yy-mm-dd", is_bd_pro=True, @@ -120,7 +122,7 @@ time_delta=6, time_unit="weeks", _last_date=get_date_max_pro, - upstream_tasks=[wait_upload_table], + upstream_tasks=[get_date_max_pro], ) anp_microdados.storage = GCS(constants.GCS_FLOWS_BUCKET.value) anp_microdados.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) diff --git a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py index 46769d354..60f37a94a 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py @@ -40,9 +40,16 @@ def check_for_updates(dataset_id, table_id): # Obtém a data mais recente do site download_files(anatel_constants.URLS_DATA.value, anatel_constants.PATH_INPUT.value) df = pd.read_csv(anatel_constants.URL_GLP.value, sep=";", encoding="utf-8") - data_obj = pd.to_datetime(df["Data da Coleta"]).max() - data_obj = data_obj.date() - + data_obj = ( + df["Data da Coleta"].str[6:10] + + "-" + + df["Data da Coleta"].str[3:5] + + "-" + + df["Data da Coleta"].str[0:2] + ) + data_obj = data_obj.apply(lambda x: pd.to_datetime(x).strftime("%Y-%m-%d")) + data_obj = pd.to_datetime(data_obj, format="%Y-%m-%d").dt.date + data_obj = data_obj.max() # Obtém a última data no site BD data_bq_obj = extract_last_date( dataset_id, table_id, "yy-mm-dd", "basedosdados", data="data_coleta"