From 78661dbc269c34edc7100c6efe38672ed9f288c8 Mon Sep 17 00:00:00 2001
From: folhesgabriel
Date: Wed, 20 Sep 2023 17:29:40 -0300
Subject: [PATCH] add update_metadata to br_cvm_administradores_carteira

---
 .../br_cvm_administradores_carteira/flows.py | 98 +++++++++----------
 .../br_cvm_administradores_carteira/tasks.py | 44 +++++++++
 2 files changed, 89 insertions(+), 53 deletions(-)

diff --git a/pipelines/datasets/br_cvm_administradores_carteira/flows.py b/pipelines/datasets/br_cvm_administradores_carteira/flows.py
index 6741af85f..530db4741 100644
--- a/pipelines/datasets/br_cvm_administradores_carteira/flows.py
+++ b/pipelines/datasets/br_cvm_administradores_carteira/flows.py
@@ -21,13 +21,15 @@
     clean_table_responsavel,
     clean_table_pessoa_fisica,
     clean_table_pessoa_juridica,
+    extract_last_date,
 )
+from pipelines.utils.metadata.tasks import update_django_metadata
+
 from pipelines.utils.constants import constants as utils_constants
 from pipelines.utils.decorators import Flow
 from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
 from pipelines.utils.tasks import (
     create_table_and_upload_to_gcs,
-    update_metadata,
     get_temporal_coverage,
     rename_current_flow_run_dataset_table,
     get_current_flow_labels,
@@ -37,7 +39,7 @@
 URL = "http://dados.cvm.gov.br/dados/ADM_CART/CAD/DADOS/cad_adm_cart.zip"
 
 with Flow(
-    name="br_cvm_administradores_carteira.responsavel", code_owners=["lucas_cr"]
+    name="br_cvm_administradores_carteira.responsavel", code_owners=["Equipe Pipelines"]
 ) as br_cvm_adm_car_res:
     # Parameters
     dataset_id = Parameter(
@@ -68,17 +70,7 @@
         dump_mode="append",
         wait=filepath,
     )
-
-    # no generate temporal coverage since there is no date variable
-    # wait_update_metadata = update_metadata(
-    #     dataset_id=dataset_id,
-    #     table_id=table_id,
-    #     fields_to_update=[
-    #         {"last_updated": {"data": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}}
-    #     ],
-    #     upstream_tasks=[wait_upload_table],
-    # )
-
+    # don't generate temporal coverage since there's no date variable
     with case(materialize_after_dump, True):
         # Trigger DBT flow run
         current_flow_labels = get_current_flow_labels()
@@ -113,7 +105,7 @@
 br_cvm_adm_car_res.schedule = schedule_responsavel
 
 with Flow(
-    "br_cvm_administradores_carteira.pessoa_fisica", code_owners=["lucas_cr"]
+    "br_cvm_administradores_carteira.pessoa_fisica", code_owners=["Equipe Pipelines"]
 ) as br_cvm_adm_car_pes_fis:
     # Parameters
     dataset_id = Parameter(
@@ -129,6 +121,7 @@
         "materialize_after_dump", default=True, required=False
     )
     dbt_alias = Parameter("dbt_alias", default=False, required=False)
+    update_metadata = Parameter("update_metadata", default=False, required=False)
 
     rename_flow_run = rename_current_flow_run_dataset_table(
         prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
@@ -145,25 +138,6 @@
         wait=filepath,
     )
 
-    # update_metadata
-    temporal_coverage = get_temporal_coverage(
-        filepath=filepath,
-        date_cols=["data_registro"],
-        time_unit="day",
-        interval="1",
-        upstream_tasks=[wait_upload_table],
-    )
-
-    # wait_update_metadata = update_metadata(
-    #     dataset_id=dataset_id,
-    #     table_id=table_id,
-    #     fields_to_update=[
-    #         {"last_updated": {"data": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}},
-    #         {"temporal_coverage": [temporal_coverage]},
-    #     ],
-    #     upstream_tasks=[temporal_coverage],
-    # )
-
     with case(materialize_after_dump, True):
         # Trigger DBT flow run
         current_flow_labels = get_current_flow_labels()
@@ -192,13 +166,31 @@
         wait_for_materialization.retry_delay = timedelta(
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )
+    with case(update_metadata, True):
+        data = extract_last_date(
+            dataset_id, table_id, "basedosdados", var_name="data_registro"
+        )
+        update_django_metadata(
+            dataset_id,
+            table_id,
+            metadata_type="DateTimeRange",
+            _last_date=data,
+            bq_last_update=False,
+            api_mode="prod",
+            date_format="yy-mm-dd",
+            is_bd_pro=True,
+            is_free=True,
+            time_delta=6,
+            time_unit="months",
+            upstream_tasks=[materialization_flow, data],
+        )
 
 br_cvm_adm_car_pes_fis.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 br_cvm_adm_car_pes_fis.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
 br_cvm_adm_car_pes_fis.schedule = schedule_fisica
 
 with Flow(
-    "br_cvm_administradores_carteira.pessoa_juridica", code_owners=["lucas_cr"]
+    "br_cvm_administradores_carteira.pessoa_juridica", code_owners=["Equipe Pipelines"]
 ) as br_cvm_adm_car_pes_jur:
     # Parameters
     dataset_id = Parameter(
@@ -214,6 +206,7 @@
         "materialize_after_dump", default=True, required=False
     )
     dbt_alias = Parameter("dbt_alias", default=False, required=False)
+    update_metadata = Parameter("update_metadata", default=False, required=False)
 
     rename_flow_run = rename_current_flow_run_dataset_table(
         prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
@@ -230,25 +223,6 @@
         wait=filepath,
     )
 
-    # update_metadata
-    temporal_coverage = get_temporal_coverage(
-        filepath=filepath,
-        date_cols=["data_registro"],
-        time_unit="day",
-        interval="1",
-        upstream_tasks=[wait_upload_table],
-    )
-
-    # wait_update_metadata = update_metadata(
-    #     dataset_id=dataset_id,
-    #     table_id=table_id,
-    #     fields_to_update=[
-    #         {"last_updated": {"data": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}},
-    #         {"temporal_coverage": [temporal_coverage]},
-    #     ],
-    #     upstream_tasks=[temporal_coverage],
-    # )
-
     with case(materialize_after_dump, True):
         # Trigger DBT flow run
         current_flow_labels = get_current_flow_labels()
@@ -278,6 +252,24 @@
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )
 
+    with case(update_metadata, True):
+        data = extract_last_date(
+            dataset_id, table_id, "basedosdados", var_name="data_registro"
+        )
+        update_django_metadata(
+            dataset_id,
+            table_id,
+            metadata_type="DateTimeRange",
+            _last_date=data,
+            bq_last_update=False,
+            api_mode="prod",
+            date_format="yy-mm-dd",
+            is_bd_pro=True,
+            is_free=True,
+            time_delta=6,
+            time_unit="months",
+            upstream_tasks=[materialization_flow, data],
+        )
 br_cvm_adm_car_pes_jur.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 br_cvm_adm_car_pes_jur.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
 br_cvm_adm_car_pes_jur.schedule = schedule_juridica
diff --git a/pipelines/datasets/br_cvm_administradores_carteira/tasks.py b/pipelines/datasets/br_cvm_administradores_carteira/tasks.py
index 8a3dc2cbf..f6268bf0f 100644
--- a/pipelines/datasets/br_cvm_administradores_carteira/tasks.py
+++ b/pipelines/datasets/br_cvm_administradores_carteira/tasks.py
@@ -12,6 +12,9 @@
 from pandas.api.types import is_string_dtype
 from prefect import task
 from unidecode import unidecode
+import basedosdados as bd
+from datetime import datetime
+from pipelines.utils.utils import log
 
 
 @task
@@ -150,3 +153,44 @@ def clean_table_pessoa_juridica(root: str) -> str:
     dataframe.to_csv(ou_filepath, index=False)
 
     return ou_filepath
+
+
+@task
+def extract_last_date(
+    dataset_id: str,
+    table_id: str,
+    billing_project_id: str,
+    var_name: str,
+) -> str:
+    """
+    Extracts the last update date of a given dataset table.
+
+    Args:
+        dataset_id (str): The ID of the dataset.
+        table_id (str): The ID of the table.
+        billing_project_id (str): The billing project ID.
+
+    Returns:
+        str: The last update date in the format 'yyyy-mm-dd'.
+
+    Raises:
+        Exception: If an error occurs while extracting the last update date.
+    """
+
+    query_bd = f"""
+    SELECT MAX({var_name}) as max_date
+    FROM
+    `{billing_project_id}.{dataset_id}.{table_id}`
+    """
+
+    t = bd.read_sql(
+        query=query_bd,
+        billing_project_id=billing_project_id,
+        from_file=True,
+    )
+
+    data = t["max_date"][0]
+
+    log(f"The most recent date in the table is: {data}")
+
+    return str(data)
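
Note on the change: the new extract_last_date task reduces to a single MAX() aggregation over the table's data_registro column, and the with case(update_metadata, True) blocks in the flows pass its result to update_django_metadata as _last_date. A rough standalone sketch of that query logic outside Prefect is below; the function name latest_value and the example arguments are illustrative assumptions, not part of this patch.

    import basedosdados as bd


    def latest_value(
        dataset_id: str, table_id: str, billing_project_id: str, var_name: str
    ) -> str:
        """Return MAX(var_name) for the table as a string, e.g. '2023-09-20'."""
        # Same shape as the task: aggregate in BigQuery, read back the single-row result.
        query = f"""
        SELECT MAX({var_name}) AS max_date
        FROM `{billing_project_id}.{dataset_id}.{table_id}`
        """
        df = bd.read_sql(
            query=query,
            billing_project_id=billing_project_id,
            from_file=True,
        )
        return str(df["max_date"][0])


    # Hypothetical call, mirroring how the flows invoke the task:
    # latest_value("br_cvm_administradores_carteira", "pessoa_fisica", "basedosdados", "data_registro")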