diff --git a/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py b/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py
index c4b85f662..adb9678dc 100644
--- a/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py
+++ b/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py
@@ -24,11 +24,11 @@
 from pipelines.utils.constants import constants as utils_constants
 from pipelines.utils.decorators import Flow
 from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
+from pipelines.utils.metadata.tasks import update_django_metadata
 from pipelines.utils.tasks import (
     create_table_and_upload_to_gcs,
     get_current_flow_labels,
     rename_current_flow_run_dataset_table,
-    update_django_metadata,
 )

 with Flow(
@@ -110,44 +110,18 @@
         wait_for_materialization.retry_delay = timedelta(
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )
-
-        # ! tabela bd pro
-        with case(materialize_after_dump, True):
-            # Trigger DBT flow run
-            current_flow_labels = get_current_flow_labels()
-            materialization_flow = create_flow_run(
-                flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
-                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
-                parameters={
-                    "dataset_id": dataset_id,
-                    "table_id": table_id[0] + "_atualizado",
-                    "mode": materialization_mode,
-                    "dbt_alias": dbt_alias,
-                },
-                labels=current_flow_labels,
-                run_name=f"Materialize {dataset_id}.{table_id[0]}" + "_atualizado",
-            )
-
-            wait_for_materialization = wait_for_flow_run(
-                materialization_flow,
-                stream_states=True,
-                stream_logs=True,
-                raise_final_state=True,
-            )
-            wait_for_materialization.max_retries = (
-                dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
-            )
-            wait_for_materialization.retry_delay = timedelta(
-                seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
-            )
-
         with case(update_metadata, True):
             date = get_today_date_atualizado()  # task que retorna a data atual
             update_django_metadata(
                 dataset_id,
-                table_id[0] + "_atualizado",
+                table_id[0],
                 metadata_type="DateTimeRange",
                 bq_last_update=False,
+                bq_table_last_year_month=False,
+                is_bd_pro=True,
+                is_free=True,
+                time_delta=2,
+                time_unit="months",
                 api_mode="prod",
                 date_format="yy-mm",
                 _last_date=date,
@@ -192,43 +166,18 @@
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )
-        # ! tabela bd pro
-        with case(materialize_after_dump, True):
-            # Trigger DBT flow run
-            current_flow_labels = get_current_flow_labels()
-            materialization_flow = create_flow_run(
-                flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
-                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
-                parameters={
-                    "dataset_id": dataset_id,
-                    "table_id": table_id[1] + "_atualizado",
-                    "mode": materialization_mode,
-                    "dbt_alias": dbt_alias,
-                },
-                labels=current_flow_labels,
-                run_name=f"Materialize {dataset_id}.{table_id[1]}" + "_atualizado",
-            )
-
-            wait_for_materialization = wait_for_flow_run(
-                materialization_flow,
-                stream_states=True,
-                stream_logs=True,
-                raise_final_state=True,
-            )
-            wait_for_materialization.max_retries = (
-                dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
-            )
-            wait_for_materialization.retry_delay = timedelta(
-                seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
-            )
-
         with case(update_metadata, True):
             date = get_today_date_atualizado()  # task que retorna a data atual
             update_django_metadata(
                 dataset_id,
-                table_id[1] + "_atualizado",
+                table_id[1],
                 metadata_type="DateTimeRange",
                 bq_last_update=False,
+                bq_table_last_year_month=False,
+                is_bd_pro=True,
+                is_free=True,
+                time_delta=2,
+                time_unit="months",
                 api_mode="prod",
                 date_format="yy-mm",
                 _last_date=date,
@@ -275,43 +224,18 @@
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )
-        # ! tabela bd pro
-        with case(materialize_after_dump, True):
-            # Trigger DBT flow run
-            current_flow_labels = get_current_flow_labels()
-            materialization_flow = create_flow_run(
-                flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
-                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
-                parameters={
-                    "dataset_id": dataset_id,
-                    "table_id": table_id[2] + "_atualizado",
-                    "mode": materialization_mode,
-                    "dbt_alias": dbt_alias,
-                },
-                labels=current_flow_labels,
-                run_name=f"Materialize {dataset_id}.{table_id[2]}" + "_atualizado",
-            )
-
-            wait_for_materialization = wait_for_flow_run(
-                materialization_flow,
-                stream_states=True,
-                stream_logs=True,
-                raise_final_state=True,
-            )
-            wait_for_materialization.max_retries = (
-                dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
-            )
-            wait_for_materialization.retry_delay = timedelta(
-                seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
-            )
-
         with case(update_metadata, True):
             date = get_today_date_atualizado()  # task que retorna a data atual
             update_django_metadata(
                 dataset_id,
-                table_id[2] + "_atualizado",
+                table_id[2],
                 metadata_type="DateTimeRange",
                 bq_last_update=False,
+                bq_table_last_year_month=False,
+                is_bd_pro=True,
+                is_free=True,
+                time_delta=2,
+                time_unit="months",
                 api_mode="prod",
                 date_format="yy-mm",
                 _last_date=date,
@@ -357,44 +281,18 @@
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )
-        # ! tabela bd pro
-
-        with case(materialize_after_dump, True):
-            # Trigger DBT flow run
-            current_flow_labels = get_current_flow_labels()
-            materialization_flow = create_flow_run(
-                flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
-                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
-                parameters={
-                    "dataset_id": dataset_id,
-                    "table_id": table_id[3] + "_atualizado",
-                    "mode": materialization_mode,
-                    "dbt_alias": dbt_alias,
-                },
-                labels=current_flow_labels,
-                run_name=f"Materialize {dataset_id}.{table_id[3]}" + "_atualizado",
-            )
-
-            wait_for_materialization = wait_for_flow_run(
-                materialization_flow,
-                stream_states=True,
-                stream_logs=True,
-                raise_final_state=True,
-            )
-            wait_for_materialization.max_retries = (
-                dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
-            )
-            wait_for_materialization.retry_delay = timedelta(
-                seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
-            )
-
         with case(update_metadata, True):
             date = get_today_date_atualizado()  # task que retorna a data atual
             update_django_metadata(
                 dataset_id,
-                table_id[3] + "_atualizado",
+                table_id[3],
                 metadata_type="DateTimeRange",
                 bq_last_update=False,
+                bq_table_last_year_month=False,
+                is_bd_pro=True,
+                is_free=True,
+                time_delta=2,
+                time_unit="months",
                 api_mode="prod",
                 date_format="yy-mm",
                 _last_date=date,
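Review note: across all four banda-larga tables the PR deletes the duplicated `_atualizado` materialization and folds both access tiers into a single coverage call. A minimal sketch of the resulting pattern, assuming the signature shown in pipelines/utils/metadata/tasks.py further down (the surrounding flow variables are as in the file):

    # inside `with case(update_metadata, True):`
    update_django_metadata(
        dataset_id,
        table_id[0],                 # base table only; no "_atualizado" twin
        metadata_type="DateTimeRange",
        bq_last_update=False,
        bq_table_last_year_month=False,
        is_bd_pro=True,              # a paid (BD Pro) tier exists...
        is_free=True,                # ...alongside a free tier
        time_delta=2,                # free coverage trails pro
        time_unit="months",          #   by two months
        api_mode="prod",
        date_format="yy-mm",
        _last_date=date,             # output of get_today_date_atualizado()
    )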
diff --git a/pipelines/datasets/br_anatel_telefonia_movel/flows.py b/pipelines/datasets/br_anatel_telefonia_movel/flows.py
index 072503bcc..d021bba7c 100644
--- a/pipelines/datasets/br_anatel_telefonia_movel/flows.py
+++ b/pipelines/datasets/br_anatel_telefonia_movel/flows.py
@@ -25,11 +25,11 @@
 from pipelines.utils.constants import constants as utils_constants
 from pipelines.utils.decorators import Flow
 from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
+from pipelines.utils.metadata.tasks import update_django_metadata
 from pipelines.utils.tasks import (
     create_table_and_upload_to_gcs,
     get_current_flow_labels,
     rename_current_flow_run_dataset_table,
-    update_django_metadata,
 )

 with Flow(name="br_anatel_telefonia_movel", code_owners=["tricktx"]) as br_anatel:
@@ -116,43 +116,18 @@
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )
-        # ! tabela bd pro
-        with case(materialize_after_dump, True):
-            # Trigger DBT flow run
-            current_flow_labels = get_current_flow_labels()
-            materialization_flow = create_flow_run(
-                flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
-                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
-                parameters={
-                    "dataset_id": dataset_id,
-                    "table_id": table_id[0] + "_atualizado",
-                    "mode": materialization_mode,
-                    "dbt_alias": dbt_alias,
-                },
-                labels=current_flow_labels,
-                run_name=f"Materialize {dataset_id}.{table_id[0]}" + "_atualizado",
-            )
-
-            wait_for_materialization = wait_for_flow_run(
-                materialization_flow,
-                stream_states=True,
-                stream_logs=True,
-                raise_final_state=True,
-            )
-            wait_for_materialization.max_retries = (
-                dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
-            )
-            wait_for_materialization.retry_delay = timedelta(
-                seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
-            )
-
         with case(update_metadata, True):
             date = get_today_date_atualizado()  # task que retorna a data atual
             update_django_metadata(
                 dataset_id,
-                table_id[0] + "_atualizado",
+                table_id[0],
                 metadata_type="DateTimeRange",
                 bq_last_update=False,
+                bq_table_last_year_month=False,
+                is_bd_pro=True,
+                is_free=True,
+                time_delta=2,
+                time_unit="months",
                 api_mode="prod",
                 date_format="yy-mm",
                 _last_date=date,
@@ -198,43 +173,18 @@
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )
-        # ! tabela bd pro
-        with case(materialize_after_dump, True):
-            # Trigger DBT flow run
-            current_flow_labels = get_current_flow_labels()
-            materialization_flow = create_flow_run(
-                flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
-                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
-                parameters={
-                    "dataset_id": dataset_id,
-                    "table_id": table_id[1] + "_atualizado",
-                    "mode": materialization_mode,
-                    "dbt_alias": dbt_alias,
-                },
-                labels=current_flow_labels,
-                run_name=f"Materialize {dataset_id}.{table_id[1]}" + "_atualizado",
-            )
-
-            wait_for_materialization = wait_for_flow_run(
-                materialization_flow,
-                stream_states=True,
-                stream_logs=True,
-                raise_final_state=True,
-            )
-            wait_for_materialization.max_retries = (
-                dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
-            )
-            wait_for_materialization.retry_delay = timedelta(
-                seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
-            )
-
         with case(update_metadata, True):
             date = get_today_date_atualizado()  # task que retorna a data atual
             update_django_metadata(
                 dataset_id,
-                table_id[1] + "_atualizado",
+                table_id[1],
                 metadata_type="DateTimeRange",
                 bq_last_update=False,
+                bq_table_last_year_month=False,
+                is_bd_pro=True,
+                is_free=True,
+                time_delta=2,
+                time_unit="months",
                 api_mode="prod",
                 date_format="yy-mm",
                 _last_date=date,
@@ -282,43 +232,18 @@
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )
-        # ! tabela bd pro
-        with case(materialize_after_dump, True):
-            # Trigger DBT flow run
-            current_flow_labels = get_current_flow_labels()
-            materialization_flow = create_flow_run(
-                flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
-                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
-                parameters={
-                    "dataset_id": dataset_id,
-                    "table_id": table_id[2] + "_atualizado",
-                    "mode": materialization_mode,
-                    "dbt_alias": dbt_alias,
-                },
-                labels=current_flow_labels,
-                run_name=f"Materialize {dataset_id}.{table_id[2]}" + "_atualizado",
-            )
-
-            wait_for_materialization = wait_for_flow_run(
-                materialization_flow,
-                stream_states=True,
-                stream_logs=True,
-                raise_final_state=True,
-            )
-            wait_for_materialization.max_retries = (
-                dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
-            )
-            wait_for_materialization.retry_delay = timedelta(
-                seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
-            )
-
         with case(update_metadata, True):
             date = get_today_date_atualizado()  # task que retorna a data atual
             update_django_metadata(
                 dataset_id,
-                table_id[2] + "_atualizado",
+                table_id[2],
                 metadata_type="DateTimeRange",
                 bq_last_update=False,
+                bq_table_last_year_month=False,
+                is_bd_pro=True,
+                is_free=True,
+                time_delta=2,
+                time_unit="months",
                 api_mode="prod",
                 date_format="yy-mm",
                 _last_date=date,
@@ -365,44 +290,18 @@
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )
-        # ! tabela bd pro
-
-        with case(materialize_after_dump, True):
-            # Trigger DBT flow run
-            current_flow_labels = get_current_flow_labels()
-            materialization_flow = create_flow_run(
-                flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
-                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
-                parameters={
-                    "dataset_id": dataset_id,
-                    "table_id": table_id[3] + "_atualizado",
-                    "mode": materialization_mode,
-                    "dbt_alias": dbt_alias,
-                },
-                labels=current_flow_labels,
-                run_name=f"Materialize {dataset_id}.{table_id[3]}" + "_atualizado",
-            )
-
-            wait_for_materialization = wait_for_flow_run(
-                materialization_flow,
-                stream_states=True,
-                stream_logs=True,
-                raise_final_state=True,
-            )
-            wait_for_materialization.max_retries = (
-                dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
-            )
-            wait_for_materialization.retry_delay = timedelta(
-                seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
-            )
-
         with case(update_metadata, True):
             date = get_today_date_atualizado()  # task que retorna a data atual
             update_django_metadata(
                 dataset_id,
-                table_id[3] + "_atualizado",
+                table_id[3],
                 metadata_type="DateTimeRange",
                 bq_last_update=False,
+                bq_table_last_year_month=False,
+                is_bd_pro=True,
+                is_free=True,
+                time_delta=2,
+                time_unit="months",
                 api_mode="prod",
                 date_format="yy-mm",
                 _last_date=date,
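Review note: br_anatel_telefonia_movel repeats the identical edit for each of the four entries in `table_id`. Since Prefect 1.x flows are assembled by ordinary Python at definition time, the four copies are equivalent to a build-time loop — a hypothetical consolidation, not something this PR does:

    for tid in table_id:  # hypothetical refactor; the PR keeps four explicit blocks
        with case(update_metadata, True):
            date = get_today_date_atualizado()  # task que retorna a data atual
            update_django_metadata(
                dataset_id,
                tid,
                metadata_type="DateTimeRange",
                bq_last_update=False,
                bq_table_last_year_month=False,
                is_bd_pro=True,
                is_free=True,
                time_delta=2,
                time_unit="months",
                api_mode="prod",
                date_format="yy-mm",
                _last_date=date,
            )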
diff --git a/pipelines/datasets/br_anp_precos_combustiveis/flows.py b/pipelines/datasets/br_anp_precos_combustiveis/flows.py
index 1fb522cd5..f8cc27c9d 100644
--- a/pipelines/datasets/br_anp_precos_combustiveis/flows.py
+++ b/pipelines/datasets/br_anp_precos_combustiveis/flows.py
@@ -28,12 +28,12 @@
 from pipelines.utils.constants import constants as utils_constants
 from pipelines.utils.decorators import Flow
 from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
+from pipelines.utils.metadata.tasks import update_django_metadata
 from pipelines.utils.tasks import (
     create_table_and_upload_to_gcs,
     get_current_flow_labels,
     log_task,
     rename_current_flow_run_dataset_table,
-    update_django_metadata,
 )

 with Flow(
@@ -116,12 +116,16 @@
         table_id,
         metadata_type="DateTimeRange",
         bq_last_update=False,
+        bq_table_last_year_month=True,
         api_mode="prod",
         date_format="yy-mm-dd",
+        is_bd_pro=True,
+        is_free=True,
+        time_delta=6,
+        time_unit="weeks",
         _last_date=get_date_max_pro,
         upstream_tasks=[wait_upload_table],
     )
-
 anp_microdados.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 anp_microdados.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
 anp_microdados.schedule = every_week_anp_microdados
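Review note: for ANP the free tier now trails BD Pro by six weeks (`time_delta=6, time_unit="weeks"`). A small sketch of what that lag means for the published coverage, assuming the task maps `time_unit` onto `dateutil.relativedelta` keywords (the concrete dates are illustrative):

    from datetime import date
    from dateutil.relativedelta import relativedelta

    pro_end = date(2023, 9, 30)                  # hypothetical max date found in BigQuery
    free_end = pro_end - relativedelta(weeks=6)  # time_delta=6, time_unit="weeks"
    print(free_end)                              # 2023-08-19: end of the free coverage window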
diff --git a/pipelines/datasets/br_b3_cotacoes/flows.py b/pipelines/datasets/br_b3_cotacoes/flows.py
index 21068a654..22b2cbdd4 100644
--- a/pipelines/datasets/br_b3_cotacoes/flows.py
+++ b/pipelines/datasets/br_b3_cotacoes/flows.py
@@ -16,11 +16,11 @@
 from pipelines.utils.constants import constants as utils_constants
 from pipelines.utils.decorators import Flow
 from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
+from pipelines.utils.metadata.tasks import update_django_metadata
 from pipelines.utils.tasks import (
     create_table_and_upload_to_gcs,
     get_current_flow_labels,
     rename_current_flow_run_dataset_table,
-    update_django_metadata,
 )
 from pipelines.utils.utils import log

@@ -94,6 +94,9 @@
         table_id,
         metadata_type="DateTimeRange",
         bq_last_update=False,
+        bq_table_last_year_month=False,
+        is_bd_pro=True,
+        is_free=False,
         api_mode="prod",
         date_format="yy-mm-dd",
         _last_date=data_max,
diff --git a/pipelines/datasets/br_bcb_agencia/flows.py b/pipelines/datasets/br_bcb_agencia/flows.py
index 67dfa704a..abab85f35 100644
--- a/pipelines/datasets/br_bcb_agencia/flows.py
+++ b/pipelines/datasets/br_bcb_agencia/flows.py
@@ -13,7 +13,7 @@
 from pipelines.utils.constants import constants as utils_constants
 from pipelines.utils.decorators import Flow
 from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
-from pipelines.utils.metadata.flows import update_django_metadata
+from pipelines.utils.metadata.tasks import update_django_metadata
 from pipelines.utils.tasks import (
     create_table_and_upload_to_gcs,
     get_current_flow_labels,
diff --git a/pipelines/datasets/br_bcb_estban/flows.py b/pipelines/datasets/br_bcb_estban/flows.py
index c519474f3..bc1490e22 100644
--- a/pipelines/datasets/br_bcb_estban/flows.py
+++ b/pipelines/datasets/br_bcb_estban/flows.py
@@ -27,7 +27,7 @@
 from pipelines.utils.constants import constants as utils_constants
 from pipelines.utils.decorators import Flow
 from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
-from pipelines.utils.metadata.flows import update_django_metadata
+from pipelines.utils.metadata.tasks import update_django_metadata
 from pipelines.utils.tasks import (
     create_table_and_upload_to_gcs,
     get_current_flow_labels,
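Review note: br_b3_cotacoes is the first pro-only table in this PR (`is_bd_pro=True, is_free=False`), while the two BCB files only repoint the import from pipelines.utils.metadata.flows to pipelines.utils.metadata.tasks. Together the two flags select one of three coverage branches inside update_django_metadata — a condensed sketch of the dispatch, following the branch heads visible in the tasks.py hunks below:

    if is_free and not is_bd_pro:
        pass  # free coverage only: a single DateTimeRange, no lag
    elif is_bd_pro and is_free:
        pass  # pro coverage up to last_date; free coverage lagged by time_delta time_unit
    elif is_bd_pro and not is_free:
        pass  # pro coverage only (br_b3_cotacoes and the ONS tables below)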
diff --git a/pipelines/datasets/br_me_comex_stat/flows.py b/pipelines/datasets/br_me_comex_stat/flows.py
index 376310173..1b5895cbe 100644
--- a/pipelines/datasets/br_me_comex_stat/flows.py
+++ b/pipelines/datasets/br_me_comex_stat/flows.py
@@ -25,11 +25,11 @@
 from pipelines.utils.constants import constants as utils_constants
 from pipelines.utils.decorators import Flow
 from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
+from pipelines.utils.metadata.tasks import update_django_metadata
 from pipelines.utils.tasks import (
     create_table_and_upload_to_gcs,
     get_current_flow_labels,
     rename_current_flow_run_dataset_table,
-    update_django_metadata,
 )

 with Flow(
@@ -102,36 +102,6 @@
         wait_for_materialization.retry_delay = timedelta(
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )
-
-        # materialize municipio_exportacao_atualizado
-        with case(materialize_after_dump, True):
-            # Trigger DBT flow run
-            current_flow_labels = get_current_flow_labels()
-            materialization_flow = create_flow_run(
-                flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
-                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
-                parameters={
-                    "dataset_id": dataset_id,
-                    "table_id": table_id + "_atualizado",
-                    "mode": materialization_mode,
-                    "dbt_alias": dbt_alias,
-                },
-                labels=current_flow_labels,
-                run_name=f"Materialize {dataset_id}.{table_id}",
-            )
-
-            wait_for_materialization = wait_for_flow_run(
-                materialization_flow,
-                stream_states=True,
-                stream_logs=True,
-                raise_final_state=True,
-            )
-            wait_for_materialization.max_retries = (
-                dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
-            )
-            wait_for_materialization.retry_delay = timedelta(
-                seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
-            )
         # coverage updater
         with case(update_metadata, True):
             update = update_django_metadata(
@@ -140,22 +110,13 @@
                 metadata_type="DateTimeRange",
                 bq_last_update=False,
                 bq_table_last_year_month=True,
+                is_bd_pro=True,
+                is_free=True,
                 api_mode="prod",
                 billing_project_id="basedosdados",
                 date_format="yy-mm",
-            )
-
-        with case(update_metadata, True):
-            update_django_metadata(
-                dataset_id,
-                table_id + "_atualizado",
-                metadata_type="DateTimeRange",
-                bq_last_update=False,
-                bq_table_last_year_month=True,
-                api_mode="prod",
-                billing_project_id="basedosdados",
-                date_format="yy-mm",
-                upstream_tasks=[update],
+                time_delta=1,
+                time_unit="months",
             )

 br_comex_municipio_exportacao.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
@@ -240,63 +201,15 @@
                 metadata_type="DateTimeRange",
                 bq_last_update=False,
                 bq_table_last_year_month=True,
+                is_bd_pro=True,
+                is_free=True,
                 api_mode="prod",
                 billing_project_id="basedosdados",
                 date_format="yy-mm",
+                time_delta=1,
+                time_unit="months",
             )

-        # materialize municipio_importacao_atualizado
-        with case(materialize_after_dump, True):
-            # Trigger DBT flow run
-            current_flow_labels = get_current_flow_labels()
-            materialization_flow = create_flow_run(
-                flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
-                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
-                parameters={
-                    "dataset_id": dataset_id,
-                    "table_id": table_id + "_atualizado",
-                    "mode": materialization_mode,
-                    "dbt_alias": dbt_alias,
-                },
-                labels=current_flow_labels,
-                run_name=f"Materialize {dataset_id}.{table_id}",
-            )
-
-            wait_for_materialization = wait_for_flow_run(
-                materialization_flow,
-                stream_states=True,
-                stream_logs=True,
-                raise_final_state=True,
-            )
-            wait_for_materialization.max_retries = (
-                dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
-            )
-            wait_for_materialization.retry_delay = timedelta(
-                seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
-            )
-        with case(update_metadata, True):
-            update = update_django_metadata(
-                dataset_id,
-                table_id,
-                metadata_type="DateTimeRange",
-                bq_last_update=False,
-                bq_table_last_year_month=True,
-                api_mode="prod",
-                billing_project_id="basedosdados",
-                date_format="yy-mm",
-            )
-        with case(update_metadata, True):
-            update_django_metadata(
-                dataset_id,
-                table_id + "_atualizado",
-                metadata_type="DateTimeRange",
-                bq_last_update=False,
-                bq_table_last_year_month=True,
-                api_mode="prod",
-                billing_project_id="basedosdados",
-                date_format="yy-mm",
-                upstream_tasks=[update],
-            )

 br_comex_municipio_importacao.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 br_comex_municipio_importacao.run_config = KubernetesRun(
@@ -362,35 +275,6 @@
                 run_name=f"Materialize {dataset_id}.{table_id}",
             )
-
-            wait_for_materialization = wait_for_flow_run(
-                materialization_flow,
-                stream_states=True,
-                stream_logs=True,
-                raise_final_state=True,
-            )
-            wait_for_materialization.max_retries = (
-                dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
-            )
-            wait_for_materialization.retry_delay = timedelta(
-                seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
-            )
-
-        # materialize ncm_exportacao
-        with case(materialize_after_dump, True):
-            # Trigger DBT flow run
-            current_flow_labels = get_current_flow_labels()
-            materialization_flow = create_flow_run(
-                flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
-                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
-                parameters={
-                    "dataset_id": dataset_id,
-                    "table_id": table_id + "_atualizado",
-                    "mode": materialization_mode,
-                    "dbt_alias": dbt_alias,
-                },
-                labels=current_flow_labels,
-                run_name=f"Materialize {dataset_id}.{table_id}",
-            )
-
             wait_for_materialization = wait_for_flow_run(
                 materialization_flow,
                 stream_states=True,
@@ -410,23 +294,15 @@
                 metadata_type="DateTimeRange",
                 bq_last_update=False,
                 bq_table_last_year_month=True,
+                is_bd_pro=True,
+                is_free=True,
                 api_mode="prod",
                 billing_project_id="basedosdados",
                 date_format="yy-mm",
+                time_delta=1,
+                time_unit="months",
             )

-        with case(update_metadata, True):
-            update_django_metadata(
-                dataset_id,
-                table_id + "_atualizado",
-                metadata_type="DateTimeRange",
-                bq_last_update=False,
-                bq_table_last_year_month=True,
-                api_mode="prod",
-                billing_project_id="basedosdados",
-                date_format="yy-mm",
-                upstream_tasks=[update],
-            )

 br_comex_ncm_exportacao.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 br_comex_ncm_exportacao.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
@@ -502,35 +378,6 @@
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )
-
-        # materialize ncm_importacao_atualizado
-        with case(materialize_after_dump, True):
-            # Trigger DBT flow run
-            current_flow_labels = get_current_flow_labels()
-            materialization_flow = create_flow_run(
-                flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
-                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
-                parameters={
-                    "dataset_id": dataset_id,
-                    "table_id": table_id + "_atualizado",
-                    "mode": materialization_mode,
-                    "dbt_alias": dbt_alias,
-                },
-                labels=current_flow_labels,
-                run_name=f"Materialize {dataset_id}.{table_id}",
-            )
-
-            wait_for_materialization = wait_for_flow_run(
-                materialization_flow,
-                stream_states=True,
-                stream_logs=True,
-                raise_final_state=True,
-            )
-            wait_for_materialization.max_retries = (
-                dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
-            )
-            wait_for_materialization.retry_delay = timedelta(
-                seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
-            )
         with case(update_metadata, True):
             update = update_django_metadata(
                 dataset_id,
@@ -538,22 +385,13 @@
                 metadata_type="DateTimeRange",
                 bq_last_update=False,
                 bq_table_last_year_month=True,
+                is_bd_pro=True,
+                is_free=True,
                 api_mode="prod",
                 billing_project_id="basedosdados",
                 date_format="yy-mm",
-            )
-
-        with case(update_metadata, True):
-            update_django_metadata(
-                dataset_id,
-                table_id + "_atualizado",
-                metadata_type="DateTimeRange",
-                bq_last_update=False,
-                bq_table_last_year_month=True,
-                api_mode="prod",
-                billing_project_id="basedosdados",
-                date_format="yy-mm",
-                upstream_tasks=[update],
+                time_delta=1,
+                time_unit="months",
             )

 br_comex_ncm_importacao.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
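Review note: the comex tables keep `bq_table_last_year_month=True`, so the end of the coverage window is read from the table's own year/month columns in BigQuery (billed to `billing_project_id="basedosdados"`) rather than passed in. A speculative sketch of the kind of probe that implies — the `ano`/`mes` column names are an assumption based on the `yy-mm` date format, not taken from the PR:

    # hypothetical last-date probe; the actual query lives in extract_last_update
    query = f"""
        SELECT MAX(DATE(CAST(ano AS INT64), CAST(mes AS INT64), 1)) AS last_date
        FROM `basedosdados.{dataset_id}.{table_id}`
    """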
diff --git a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py
index 2c21e96ff..9a8aea290 100644
--- a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py
+++ b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py
@@ -25,11 +25,11 @@
 from pipelines.utils.constants import constants as utils_constants
 from pipelines.utils.decorators import Flow
 from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
+from pipelines.utils.metadata.tasks import update_django_metadata
 from pipelines.utils.tasks import (
     create_table_and_upload_to_gcs,
     get_current_flow_labels,
     rename_current_flow_run_dataset_table,
-    update_django_metadata,
 )

 with Flow(
@@ -108,6 +108,8 @@
                 bq_last_update=False,
                 bq_table_last_year_month=True,
                 billing_project_id="basedosdados",
+                is_bd_pro=True,
+                is_free=False,
                 api_mode="prod",
                 date_format="yy-mm-dd",
                 upstream_tasks=[wait_for_materialization],
@@ -196,6 +198,8 @@
                 bq_last_update=False,
                 bq_table_last_year_month=True,
                 billing_project_id="basedosdados",
+                is_bd_pro=True,
+                is_free=False,
                 api_mode="prod",
                 date_format="yy-mm-dd",
                 upstream_tasks=[wait_for_materialization],
@@ -287,6 +291,8 @@
                 bq_last_update=False,
                 bq_table_last_year_month=True,
                 billing_project_id="basedosdados",
+                is_bd_pro=True,
+                is_free=False,
                 api_mode="prod",
                 date_format="yy-mm-dd",
                 upstream_tasks=[wait_for_materialization],
@@ -379,6 +385,8 @@
                 bq_last_update=False,
                 bq_table_last_year_month=True,
                 billing_project_id="basedosdados",
+                is_bd_pro=True,
+                is_free=False,
                 api_mode="prod",
                 date_format="yy-mm-dd",
                 upstream_tasks=[wait_for_materialization],
@@ -472,6 +480,8 @@
                 bq_last_update=False,
                 bq_table_last_year_month=True,
                 billing_project_id="basedosdados",
+                is_bd_pro=True,
+                is_free=False,
                 api_mode="prod",
                 date_format="yy-mm-dd",
                 upstream_tasks=[wait_for_materialization],
diff --git a/pipelines/datasets/br_ons_estimativa_custos/flows.py b/pipelines/datasets/br_ons_estimativa_custos/flows.py
index dc3b3c942..cf1b30eec 100644
--- a/pipelines/datasets/br_ons_estimativa_custos/flows.py
+++ b/pipelines/datasets/br_ons_estimativa_custos/flows.py
@@ -24,11 +24,11 @@
 from pipelines.utils.constants import constants as utils_constants
 from pipelines.utils.decorators import Flow
 from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
+from pipelines.utils.metadata.tasks import update_django_metadata
 from pipelines.utils.tasks import (
     create_table_and_upload_to_gcs,
     get_current_flow_labels,
     rename_current_flow_run_dataset_table,
-    update_django_metadata,
 )

 with Flow(
@@ -108,6 +108,8 @@
                 bq_last_update=False,
                 bq_table_last_year_month=True,
                 billing_project_id="basedosdados",
+                is_bd_pro=True,
+                is_free=False,
                 api_mode="prod",
                 date_format="yy-mm-dd",
                 upstream_tasks=[wait_for_materialization],
@@ -201,6 +203,8 @@
                 bq_last_update=False,
                 bq_table_last_year_month=True,
                 billing_project_id="basedosdados",
+                is_bd_pro=True,
+                is_free=False,
                 api_mode="prod",
                 date_format="yy-mm-dd",
                 upstream_tasks=[wait_for_materialization],
@@ -294,6 +298,8 @@
                 bq_last_update=False,
                 bq_table_last_year_month=True,
                 billing_project_id="basedosdados",
+                is_bd_pro=True,
+                is_free=False,
                 api_mode="prod",
                 date_format="yy-mm-dd",
                 upstream_tasks=[wait_for_materialization],
@@ -388,6 +394,8 @@
                 bq_last_update=False,
                 bq_table_last_year_month=True,
                 billing_project_id="basedosdados",
+                is_bd_pro=True,
+                is_free=False,
                 api_mode="prod",
                 date_format="yy-mm-dd",
                 upstream_tasks=[wait_for_materialization],
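Review note: both ONS datasets mark every table as pro-only (`is_free=False`) with day-level coverage (`date_format="yy-mm-dd"`). The coverage strings ultimately handed to parse_temporal_coverage follow the `start(interval)end` grammar documented by the regexes removed from utils.py at the end of this diff, e.g.:

    parse_temporal_coverage("2010(1)2023")              # year granularity
    parse_temporal_coverage("2010-01(1)2023-09")        # month granularity
    parse_temporal_coverage("2010-01-01(1)2023-09-28")  # day granularity (ONS)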
diff --git a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py
index ecdb83d03..d70322502 100644
--- a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py
+++ b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py
@@ -31,11 +31,11 @@
 from pipelines.utils.constants import constants as utils_constants
 from pipelines.utils.decorators import Flow
 from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
+from pipelines.utils.metadata.tasks import update_django_metadata
 from pipelines.utils.tasks import (
     create_table_and_upload_to_gcs,
     get_current_flow_labels,
     rename_current_flow_run_dataset_table,
-    update_django_metadata,
 )

 # ! Evolucao_mensal_cisp
@@ -112,17 +112,20 @@
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )
-    with case(update_metadata, True):
-        date = get_today_date()  # task que retorna a data atual
-        update_django_metadata(
-            dataset_id,
-            table_id,
-            metadata_type="DateTimeRange",
-            bq_last_update=False,
-            api_mode="prod",
-            date_format="yy-mm",
-            _last_date=date,
-        )
+        with case(update_metadata, True):
+            date = get_today_date()  # task que retorna a data atual
+            update_django_metadata(
+                dataset_id,
+                table_id,
+                metadata_type="DateTimeRange",
+                bq_table_last_year_month=False,
+                bq_last_update=False,
+                is_bd_pro=False,
+                is_free=True,
+                api_mode="prod",
+                date_format="yy-mm",
+                _last_date=date,
+            )

 evolucao_mensal_cisp.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 evolucao_mensal_cisp.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
@@ -203,17 +206,20 @@
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )
-    with case(update_metadata, True):
-        date = get_today_date()  # task que retorna a data atual
-        update_django_metadata(
-            dataset_id,
-            table_id,
-            metadata_type="DateTimeRange",
-            bq_last_update=False,
-            api_mode="prod",
-            date_format="yy-mm",
-            _last_date=date,
-        )
+        with case(update_metadata, True):
+            date = get_today_date()  # task que retorna a data atual
+            update_django_metadata(
+                dataset_id,
+                table_id,
+                metadata_type="DateTimeRange",
+                bq_table_last_year_month=False,
+                bq_last_update=False,
+                is_bd_pro=False,
+                is_free=True,
+                api_mode="prod",
+                date_format="yy-mm",
+                _last_date=date,
+            )

 taxa_evolucao_mensal_uf.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 taxa_evolucao_mensal_uf.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
@@ -296,17 +302,20 @@
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )
-    with case(update_metadata, True):
-        date = get_today_date()  # task que retorna a data atual
-        update_django_metadata(
-            dataset_id,
-            table_id,
-            metadata_type="DateTimeRange",
-            bq_last_update=False,
-            api_mode="prod",
-            date_format="yy-mm",
-            _last_date=date,
-        )
+        with case(update_metadata, True):
+            date = get_today_date()  # task que retorna a data atual
+            update_django_metadata(
+                dataset_id,
+                table_id,
+                metadata_type="DateTimeRange",
+                bq_table_last_year_month=False,
+                bq_last_update=False,
+                is_bd_pro=False,
+                is_free=True,
+                api_mode="prod",
+                date_format="yy-mm",
+                _last_date=date,
+            )

 taxa_evolucao_mensal_municipio.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
@@ -390,17 +399,20 @@
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )
-    with case(update_metadata, True):
-        date = get_today_date()  # task que retorna a data atual
-        update_django_metadata(
-            dataset_id,
-            table_id,
-            metadata_type="DateTimeRange",
-            bq_last_update=False,
-            api_mode="prod",
-            date_format="yy-mm",
-            _last_date=date,
-        )
+        with case(update_metadata, True):
+            date = get_today_date()  # task que retorna a data atual
+            update_django_metadata(
+                dataset_id,
+                table_id,
+                metadata_type="DateTimeRange",
+                bq_table_last_year_month=False,
+                bq_last_update=False,
+                is_bd_pro=False,
+                is_free=True,
+                api_mode="prod",
+                date_format="yy-mm",
+                _last_date=date,
+            )

 feminicidio_mensal_cisp.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
@@ -484,17 +496,20 @@
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )
-    with case(update_metadata, True):
-        date = get_today_date()  # task que retorna a data atual
-        update_django_metadata(
-            dataset_id,
-            table_id,
-            metadata_type="DateTimeRange",
-            bq_last_update=False,
-            api_mode="prod",
-            date_format="yy-mm",
-            _last_date=date,
-        )
+        with case(update_metadata, True):
+            date = get_today_date()  # task que retorna a data atual
+            update_django_metadata(
+                dataset_id,
+                table_id,
+                metadata_type="DateTimeRange",
+                bq_table_last_year_month=False,
+                bq_last_update=False,
+                is_bd_pro=False,
+                is_free=True,
+                api_mode="prod",
+                date_format="yy-mm",
+                _last_date=date,
+            )

 evolucao_policial_morto_servico_mensal.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 evolucao_policial_morto_servico_mensal.run_config = KubernetesRun(
@@ -579,17 +594,20 @@
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )
-    with case(update_metadata, True):
-        date = get_today_date()  # task que retorna a data atual
-        update_django_metadata(
-            dataset_id,
-            table_id,
-            metadata_type="DateTimeRange",
-            bq_last_update=False,
-            api_mode="prod",
-            date_format="yy-mm",
-            _last_date=date,
-        )
+        with case(update_metadata, True):
+            date = get_today_date()  # task que retorna a data atual
+            update_django_metadata(
+                dataset_id,
+                table_id,
+                metadata_type="DateTimeRange",
+                bq_table_last_year_month=False,
+                bq_last_update=False,
+                is_bd_pro=False,
+                is_free=True,
+                api_mode="prod",
+                date_format="yy-mm",
+                _last_date=date,
+            )

 armas_apreendidas_mensal.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 armas_apreendidas_mensal.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
@@ -670,17 +688,20 @@
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )
-    with case(update_metadata, True):
-        date = get_today_date()  # task que retorna a data atual
-        update_django_metadata(
-            dataset_id,
-            table_id,
-            metadata_type="DateTimeRange",
-            bq_last_update=False,
-            api_mode="prod",
-            date_format="yy-mm",
-            _last_date=date,
-        )
+        with case(update_metadata, True):
+            date = get_today_date()  # task que retorna a data atual
+            update_django_metadata(
+                dataset_id,
+                table_id,
+                metadata_type="DateTimeRange",
+                bq_table_last_year_month=False,
+                bq_last_update=False,
+                is_bd_pro=False,
+                is_free=True,
+                api_mode="prod",
+                date_format="yy-mm",
+                _last_date=date,
+            )

 evolucao_mensal_municipio.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 evolucao_mensal_municipio.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
@@ -761,17 +782,20 @@
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )
-    with case(update_metadata, True):
-        date = get_today_date()  # task que retorna a data atual
-        update_django_metadata(
-            dataset_id,
-            table_id,
-            metadata_type="DateTimeRange",
-            bq_last_update=False,
-            api_mode="prod",
-            date_format="yy-mm",
-            _last_date=date,
-        )
+        with case(update_metadata, True):
+            date = get_today_date()  # task que retorna a data atual
+            update_django_metadata(
+                dataset_id,
+                table_id,
+                metadata_type="DateTimeRange",
+                bq_table_last_year_month=False,
+                bq_last_update=False,
+                is_bd_pro=False,
+                is_free=True,
+                api_mode="prod",
+                date_format="yy-mm",
+                _last_date=date,
+            )

 evolucao_mensal_uf.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 evolucao_mensal_uf.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
diff --git a/pipelines/utils/metadata/tasks.py b/pipelines/utils/metadata/tasks.py
index 6c1969e43..1c529f374 100644
--- a/pipelines/utils/metadata/tasks.py
+++ b/pipelines/utils/metadata/tasks.py
@@ -45,7 +45,7 @@ def update_django_metadata(
     time_unit: str = "days",
 ):
     """
-    Updates Django metadata.
+    Updates Django metadata. Version 1.2.

     Args:
     - `dataset_id (str):` The ID of the dataset.
@@ -117,9 +117,6 @@
             f"Unidade temporal inválida. Escolha entre {', '.join(unidades_permitidas.keys())}"
         )

-    if not isinstance(time_delta, int) or time_delta <= 0:
-        raise ValueError("Defasagem deve ser um número inteiro positivo")
-
    if billing_project_id not in accepted_billing_project_id:
        raise Exception(
            f"The given billing_project_id: {billing_project_id} is invalid. The accepted valuesare {accepted_billing_project_id}"
        )
@@ -166,6 +163,8 @@
                 api_mode=api_mode,
             )
         elif is_bd_pro and is_free:
+            if not isinstance(time_delta, int) or time_delta <= 0:
+                raise ValueError("Defasagem deve ser um número inteiro positivo")
             last_date = extract_last_update(
                 dataset_id,
                 table_id,
@@ -215,7 +214,7 @@
                 ] = resource_to_temporal_coverage_free["endYear"]

             log(
-                f"Cobertura PRO ->> {_last_date} || Cobertura Grátis ->> {free_data}"
+                f"Cobertura PRO ->> {last_date} || Cobertura Grátis ->> {free_data}"
             )
             # resource_to_temporal_coverage = parse_temporal_coverage(f"{last_date}")
@@ -254,7 +253,7 @@
                 date_format,
                 billing_project_id=billing_project_id,
             )
-            log(f"Cobertura PRO ->> {_last_date}")
+            log(f"Cobertura PRO ->> {last_date}")

             resource_to_temporal_coverage = parse_temporal_coverage(f"{last_date}")
             resource_to_temporal_coverage["coverage"] = ids.get("coverage_id_pro")
@@ -346,7 +345,7 @@
                 ] = resource_to_temporal_coverage_free["endYear"]

             log(
-                f"Cobertura PRO ->> {_last_date} || Cobertura Grátis ->> {free_data}"
+                f"Cobertura PRO ->> {last_date} || Cobertura Grátis ->> {free_data}"
             )
             # resource_to_temporal_coverage = parse_temporal_coverage(f"{last_date}")
@@ -385,7 +384,7 @@
                 date_format,
                 billing_project_id=billing_project_id,
             )
-            log(f"Cobertura PRO ->> {_last_date}")
+            log(f"Cobertura PRO ->> {last_date}")

             resource_to_temporal_coverage = parse_temporal_coverage(f"{last_date}")
             resource_to_temporal_coverage["coverage"] = ids.get("coverage_id_pro")
@@ -402,6 +401,9 @@
                 api_mode=api_mode,
             )
     else:
+        if not isinstance(_last_date, str):
+            raise ValueError("O parâmetro `_last_date` deve ser do tipo string")
+
         if is_free and not is_bd_pro:
             last_date = _last_date
             resource_to_temporal_coverage = parse_temporal_coverage(f"{_last_date}")
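Review note: two behavioural changes in tasks.py are easy to miss. The `time_delta` guard now fires only in the pro-plus-free branch, the single place where the lag is used, and callers that pass `_last_date` directly must pass a string; the `Cobertura PRO` logs also switch from the raw `_last_date` argument to the computed `last_date`. A condensed, runnable sketch of the resulting guard order (not the real function):

    def _validate(bq_derived: bool, is_bd_pro: bool, is_free: bool,
                  time_delta: int, _last_date: object) -> None:
        if bq_derived:  # bq_last_update or bq_table_last_year_month
            if is_bd_pro and is_free:
                # the lag only matters when a free tier trails the pro tier
                if not isinstance(time_delta, int) or time_delta <= 0:
                    raise ValueError("Defasagem deve ser um número inteiro positivo")
        else:
            # caller-supplied coverage, e.g. "23-09"
            if not isinstance(_last_date, str):
                raise ValueError("O parâmetro `_last_date` deve ser do tipo string")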
r"\d{4}-\d{2}\(\d{1,2}\)\d{4}-\d{2}" - padrao_semana = r"\d{4}-\d{2}-\d{2}\(\d{1,2}\)\d{4}-\d{2}-\d{2}" - padrao_dia = r"\d{4}-\d{2}-\d{2}\(\d{1,2}\)\d{4}-\d{2}-\d{2}" - - if ( - re.match(padrao_ano, temporal_coverage) - or re.match(padrao_mes, temporal_coverage) - or re.match(padrao_semana, temporal_coverage) - or re.match(padrao_dia, temporal_coverage) - ): - print("A data está no formato correto.") - else: - print("Aviso: A data não está no formato correto.") - # Extrai as informações de data e intervalo da string if "(" in temporal_coverage: start_str, interval_str, end_str = re.split(r"[(|)]", temporal_coverage)