diff --git a/pipelines/datasets/br_cvm_administradores_carteira/flows.py b/pipelines/datasets/br_cvm_administradores_carteira/flows.py index 530db4741..457d2d9f0 100644 --- a/pipelines/datasets/br_cvm_administradores_carteira/flows.py +++ b/pipelines/datasets/br_cvm_administradores_carteira/flows.py @@ -268,7 +268,7 @@ is_free=True, time_delta=6, time_unit="months", - upstream_tasks=[materialization_flow, data], + upstream_tasks=[wait_for_materialization, data], ) br_cvm_adm_car_pes_jur.storage = GCS(constants.GCS_FLOWS_BUCKET.value) br_cvm_adm_car_pes_jur.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) diff --git a/pipelines/datasets/br_cvm_administradores_carteira/schedules.py b/pipelines/datasets/br_cvm_administradores_carteira/schedules.py index 1396ab820..28c5d4b4c 100644 --- a/pipelines/datasets/br_cvm_administradores_carteira/schedules.py +++ b/pipelines/datasets/br_cvm_administradores_carteira/schedules.py @@ -22,6 +22,7 @@ "materialization_mode": "prod", "materialize_after_dump": True, "table_id": "responsavel", + "dbt_alias": True, }, ) ], @@ -41,7 +42,8 @@ "materialization_mode": "prod", "materialize_after_dump": True, "table_id": "pessoa_fisica", - "dbt_alias": False, + "udpate_metadata": True, + "dbt_alias": True, }, ) ], @@ -61,7 +63,8 @@ "materialization_mode": "prod", "materialize_after_dump": True, "table_id": "pessoa_juridica", - "dbt_alias": False, + "udpate_metadata": True, + "dbt_alias": True, }, ) ], diff --git a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py index f31bd2d7e..011d164f8 100644 --- a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py +++ b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py @@ -92,7 +92,9 @@ ) with case(update_metadata, True): - data = extract_last_date(dataset_id, table_id, "basedosdados") + data = extract_last_date( + dataset_id, table_id, "basedosdados", var_name="data_abertura_processo" + ) update_django_metadata( dataset_id, table_id, @@ -105,7 +107,7 @@ is_free=True, time_delta=6, time_unit="months", - upstream_tasks=[materialization_flow, data], + upstream_tasks=[wait_for_materialization, data], ) br_cvm_ofe_pub_dis_dia.storage = GCS(constants.GCS_FLOWS_BUCKET.value) diff --git a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/schedules.py b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/schedules.py index 810ca7a4e..978f90d68 100644 --- a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/schedules.py +++ b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/schedules.py @@ -21,7 +21,8 @@ "materialization_mode": "prod", "materialize after dump": True, "table_id": "dia", - "dbt_alias": False, + "udpate_metadata": True, + "dbt_alias": True, }, ), ], diff --git a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py index 59c063108..e77a601ac 100644 --- a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py +++ b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py @@ -72,7 +72,12 @@ def clean_table_oferta_distribuicao(root: str) -> str: @task -def extract_last_date(dataset_id: str, table_id: str, billing_project_id: str) -> str: +def extract_last_date( + dataset_id: str, + table_id: str, + billing_project_id: str, + var_name: str, +) -> datetime: """ Extracts the last update date of a given dataset table. @@ -87,22 +92,19 @@ def extract_last_date(dataset_id: str, table_id: str, billing_project_id: str) - Raises: Exception: If an error occurs while extracting the last update date. """ - log("dasdasdsadas") query_bd = f""" - SELECT MAX(data_abertura_processo) as max_date + SELECT MAX({var_name}) as max_date FROM `{billing_project_id}.{dataset_id}.{table_id}` """ - log(f"{query_bd}") - log("dasdasdsadas") t = bd.read_sql( query=query_bd, billing_project_id=billing_project_id, from_file=True, ) - log(f"{t}") + data = t["max_date"][0] log(f"A data mais recente da tabela é: {data}")