From 4ed659bee9f75918cf612950ddc940eb183ded7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Thu, 14 Sep 2023 16:54:53 -0300 Subject: [PATCH 01/23] feat: updating metadata --- pipelines/datasets/br_bcb_agencia/flows.py | 2 +- pipelines/datasets/br_bcb_estban/flows.py | 2 +- pipelines/datasets/br_me_comex_stat/flows.py | 106 ++---------------- .../br_ons_avaliacao_operacao/flows.py | 13 ++- .../br_ons_estimativa_custos/flows.py | 10 +- 5 files changed, 31 insertions(+), 102 deletions(-) diff --git a/pipelines/datasets/br_bcb_agencia/flows.py b/pipelines/datasets/br_bcb_agencia/flows.py index d0a19d686..51c59ac92 100644 --- a/pipelines/datasets/br_bcb_agencia/flows.py +++ b/pipelines/datasets/br_bcb_agencia/flows.py @@ -23,7 +23,7 @@ from pipelines.constants import constants from pipelines.utils.decorators import Flow from pipelines.utils.constants import constants as utils_constants -from pipelines.utils.metadata.flows import update_django_metadata +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, diff --git a/pipelines/datasets/br_bcb_estban/flows.py b/pipelines/datasets/br_bcb_estban/flows.py index 324307a75..0ba71346d 100644 --- a/pipelines/datasets/br_bcb_estban/flows.py +++ b/pipelines/datasets/br_bcb_estban/flows.py @@ -30,7 +30,7 @@ constants as br_bcb_estban_constants, ) from pipelines.utils.decorators import Flow -from pipelines.utils.metadata.flows import update_django_metadata +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, diff --git a/pipelines/datasets/br_me_comex_stat/flows.py b/pipelines/datasets/br_me_comex_stat/flows.py index 6c7475bce..5182da5d1 100644 --- a/pipelines/datasets/br_me_comex_stat/flows.py +++ b/pipelines/datasets/br_me_comex_stat/flows.py @@ -9,7 +9,7 @@ from prefect.run_configs import KubernetesRun from prefect.storage import GCS from prefect.tasks.prefect import create_flow_run, wait_for_flow_run - +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.constants import constants from pipelines.datasets.br_me_comex_stat.tasks import ( download_br_me_comex_stat, @@ -20,7 +20,6 @@ from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants from pipelines.utils.tasks import ( - update_django_metadata, rename_current_flow_run_dataset_table, get_current_flow_labels, create_table_and_upload_to_gcs, @@ -104,36 +103,6 @@ wait_for_materialization.retry_delay = timedelta( seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - - # materialize municipio_exportacao_atualizado - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id}", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) # coverage updater with case(update_metadata, True): update = update_django_metadata( @@ -142,22 +111,13 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, + is_bd_pro=True, + is_free=True, api_mode="prod", billing_project_id="basedosdados", date_format="yy-mm", - ) - - with case(update_metadata, True): - update_django_metadata( - dataset_id, - table_id + "_atualizado", - metadata_type="DateTimeRange", - bq_last_update=False, - bq_table_last_year_month=True, - api_mode="prod", - billing_project_id="basedosdados", - date_format="yy-mm", - upstream_tasks=[update], + time_delta=1, + time_unit="months", ) br_comex_municipio_exportacao.storage = GCS(constants.GCS_FLOWS_BUCKET.value) @@ -242,63 +202,15 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, + is_bd_pro=True, + is_free=True, api_mode="prod", billing_project_id="basedosdados", date_format="yy-mm", + time_delta=1, + time_unit="months", ) - # materialize municipio_importacao_atualizado - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id}", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): - update = update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_last_update=False, - bq_table_last_year_month=True, - api_mode="prod", - billing_project_id="basedosdados", - date_format="yy-mm", - ) - with case(update_metadata, True): - update_django_metadata( - dataset_id, - table_id + "_atualizado", - metadata_type="DateTimeRange", - bq_last_update=False, - bq_table_last_year_month=True, - api_mode="prod", - billing_project_id="basedosdados", - date_format="yy-mm", - upstream_tasks=[update], - ) br_comex_municipio_importacao.storage = GCS(constants.GCS_FLOWS_BUCKET.value) br_comex_municipio_importacao.run_config = KubernetesRun( diff --git a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py index 4144f267a..02f3332a0 100644 --- a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py +++ b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py @@ -9,7 +9,7 @@ from prefect.run_configs import KubernetesRun from prefect.storage import GCS from prefect.tasks.prefect import create_flow_run, wait_for_flow_run - +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.constants import constants from pipelines.datasets.br_ons_avaliacao_operacao.tasks import ( download_data, @@ -25,7 +25,6 @@ rename_current_flow_run_dataset_table, get_current_flow_labels, create_table_and_upload_to_gcs, - update_django_metadata, ) from pipelines.datasets.br_ons_avaliacao_operacao.schedules import ( @@ -113,6 +112,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], @@ -201,6 +202,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], @@ -292,6 +295,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], @@ -384,6 +389,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], @@ -477,6 +484,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], diff --git a/pipelines/datasets/br_ons_estimativa_custos/flows.py b/pipelines/datasets/br_ons_estimativa_custos/flows.py index 1fe8d4c32..f4db80454 100644 --- a/pipelines/datasets/br_ons_estimativa_custos/flows.py +++ b/pipelines/datasets/br_ons_estimativa_custos/flows.py @@ -18,6 +18,7 @@ from pipelines.datasets.br_ons_estimativa_custos.constants import ( constants as ons_constants, ) +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants @@ -25,7 +26,6 @@ rename_current_flow_run_dataset_table, get_current_flow_labels, create_table_and_upload_to_gcs, - update_django_metadata, ) from pipelines.datasets.br_ons_estimativa_custos.schedules import ( @@ -113,6 +113,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], @@ -206,6 +208,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], @@ -299,6 +303,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], @@ -393,6 +399,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], From b6b9efe1536af7372e9e7da59f60e024d416f093 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Thu, 14 Sep 2023 19:57:22 -0300 Subject: [PATCH 02/23] feat: removing _atualizado tables --- pipelines/datasets/br_me_comex_stat/flows.py | 91 ++------------------ 1 file changed, 8 insertions(+), 83 deletions(-) diff --git a/pipelines/datasets/br_me_comex_stat/flows.py b/pipelines/datasets/br_me_comex_stat/flows.py index 5182da5d1..8dfd18e80 100644 --- a/pipelines/datasets/br_me_comex_stat/flows.py +++ b/pipelines/datasets/br_me_comex_stat/flows.py @@ -276,35 +276,6 @@ run_name=f"Materialize {dataset_id}.{table_id}", ) - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - # materialize ncm_exportacao - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id}", - ) - wait_for_materialization = wait_for_flow_run( materialization_flow, stream_states=True, @@ -324,23 +295,15 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, + is_bd_pro=True, + is_free=True, api_mode="prod", billing_project_id="basedosdados", date_format="yy-mm", + time_delta=1, + time_unit="months", ) - with case(update_metadata, True): - update_django_metadata( - dataset_id, - table_id + "_atualizado", - metadata_type="DateTimeRange", - bq_last_update=False, - bq_table_last_year_month=True, - api_mode="prod", - billing_project_id="basedosdados", - date_format="yy-mm", - upstream_tasks=[update], - ) br_comex_ncm_exportacao.storage = GCS(constants.GCS_FLOWS_BUCKET.value) br_comex_ncm_exportacao.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) @@ -416,35 +379,6 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # materialize ncm_importacao_atualizado - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id}", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) with case(update_metadata, True): update = update_django_metadata( dataset_id, @@ -452,22 +386,13 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, + is_bd_pro=True, + is_free=True, api_mode="prod", billing_project_id="basedosdados", date_format="yy-mm", - ) - - with case(update_metadata, True): - update_django_metadata( - dataset_id, - table_id + "_atualizado", - metadata_type="DateTimeRange", - bq_last_update=False, - bq_table_last_year_month=True, - api_mode="prod", - billing_project_id="basedosdados", - date_format="yy-mm", - upstream_tasks=[update], + time_delta=1, + time_unit="months", ) br_comex_ncm_importacao.storage = GCS(constants.GCS_FLOWS_BUCKET.value) From a1e5256899349cea2a41cbe57dc28861c9a52b37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Thu, 14 Sep 2023 20:18:34 -0300 Subject: [PATCH 03/23] test: testing model registration --- pipelines/utils/metadata/tasks.py | 2 +- pipelines/utils/metadata/utils.py | 15 --------------- 2 files changed, 1 insertion(+), 16 deletions(-) diff --git a/pipelines/utils/metadata/tasks.py b/pipelines/utils/metadata/tasks.py index 72e22cccd..fc9b73058 100644 --- a/pipelines/utils/metadata/tasks.py +++ b/pipelines/utils/metadata/tasks.py @@ -44,7 +44,7 @@ def update_django_metadata( time_unit: str = "days", ): """ - Updates Django metadata. + Updates Django metadata. Version 1.2. Args: - `dataset_id (str):` The ID of the dataset. diff --git a/pipelines/utils/metadata/utils.py b/pipelines/utils/metadata/utils.py index 61f30024e..34fc84b55 100644 --- a/pipelines/utils/metadata/utils.py +++ b/pipelines/utils/metadata/utils.py @@ -474,21 +474,6 @@ def create_update( def parse_temporal_coverage(temporal_coverage): - padrao_ano = r"\d{4}\(\d{1,2}\)\d{4}" - padrao_mes = r"\d{4}-\d{2}\(\d{1,2}\)\d{4}-\d{2}" - padrao_semana = r"\d{4}-\d{2}-\d{2}\(\d{1,2}\)\d{4}-\d{2}-\d{2}" - padrao_dia = r"\d{4}-\d{2}-\d{2}\(\d{1,2}\)\d{4}-\d{2}-\d{2}" - - if ( - re.match(padrao_ano, temporal_coverage) - or re.match(padrao_mes, temporal_coverage) - or re.match(padrao_semana, temporal_coverage) - or re.match(padrao_dia, temporal_coverage) - ): - print("A data está no formato correto.") - else: - print("Aviso: A data não está no formato correto.") - # Extrai as informações de data e intervalo da string if "(" in temporal_coverage: start_str, interval_str, end_str = re.split(r"[(|)]", temporal_coverage) From feeb7dbe0e6a98faf849c14e1a4c841190742479 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Thu, 14 Sep 2023 21:24:23 -0300 Subject: [PATCH 04/23] fix: fix error message --- pipelines/utils/metadata/tasks.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/pipelines/utils/metadata/tasks.py b/pipelines/utils/metadata/tasks.py index fc9b73058..86f198ee4 100644 --- a/pipelines/utils/metadata/tasks.py +++ b/pipelines/utils/metadata/tasks.py @@ -107,17 +107,11 @@ def update_django_metadata( "weeks": "weeks", "days": "days", } - if not isinstance(_last_date, str): - raise ValueError("O parâmetro `last_date` deve ser do tipo string") - if time_unit not in unidades_permitidas: raise ValueError( f"Unidade temporal inválida. Escolha entre {', '.join(unidades_permitidas.keys())}" ) - if not isinstance(time_delta, int) or time_delta <= 0: - raise ValueError("Defasagem deve ser um número inteiro positivo") - if billing_project_id not in accepted_billing_project_id: raise Exception( f"The given billing_project_id: {billing_project_id} is invalid. The accepted valuesare {accepted_billing_project_id}" @@ -164,6 +158,8 @@ def update_django_metadata( api_mode=api_mode, ) elif is_bd_pro and is_free: + if not isinstance(time_delta, int) or time_delta <= 0: + raise ValueError("Defasagem deve ser um número inteiro positivo") last_date = extract_last_update( dataset_id, table_id, @@ -400,6 +396,9 @@ def update_django_metadata( api_mode=api_mode, ) else: + if not isinstance(_last_date, str): + raise ValueError("O parâmetro `last_date` deve ser do tipo string") + if is_free and not is_bd_pro: last_date = _last_date resource_to_temporal_coverage = parse_temporal_coverage(f"{_last_date}") From e000f3a895a7b0d3398b000d605e8aad890fa238 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Thu, 14 Sep 2023 22:21:41 -0300 Subject: [PATCH 05/23] feat: changing --- pipelines/datasets/br_bcb_agencia/flows.py | 2 +- pipelines/datasets/br_bcb_estban/flows.py | 4 ++-- pipelines/datasets/br_me_comex_stat/flows.py | 8 ++++---- pipelines/datasets/br_ons_avaliacao_operacao/flows.py | 10 +++++----- pipelines/datasets/br_ons_estimativa_custos/flows.py | 8 ++++---- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/pipelines/datasets/br_bcb_agencia/flows.py b/pipelines/datasets/br_bcb_agencia/flows.py index 51c59ac92..7a9ed006f 100644 --- a/pipelines/datasets/br_bcb_agencia/flows.py +++ b/pipelines/datasets/br_bcb_agencia/flows.py @@ -112,7 +112,7 @@ bq_last_update=False, bq_table_last_year_month=True, api_mode="prod", - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", date_format="yy-mm", is_bd_pro=True, is_free=True, diff --git a/pipelines/datasets/br_bcb_estban/flows.py b/pipelines/datasets/br_bcb_estban/flows.py index 0ba71346d..e5734105e 100644 --- a/pipelines/datasets/br_bcb_estban/flows.py +++ b/pipelines/datasets/br_bcb_estban/flows.py @@ -122,7 +122,7 @@ bq_last_update=False, bq_table_last_year_month=True, api_mode="prod", - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", date_format="yy-mm", is_bd_pro=True, is_free=True, @@ -221,7 +221,7 @@ bq_last_update=False, bq_table_last_year_month=True, api_mode="prod", - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", date_format="yy-mm", is_bd_pro=True, is_free=True, diff --git a/pipelines/datasets/br_me_comex_stat/flows.py b/pipelines/datasets/br_me_comex_stat/flows.py index 8dfd18e80..da1faa25a 100644 --- a/pipelines/datasets/br_me_comex_stat/flows.py +++ b/pipelines/datasets/br_me_comex_stat/flows.py @@ -114,7 +114,7 @@ is_bd_pro=True, is_free=True, api_mode="prod", - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", date_format="yy-mm", time_delta=1, time_unit="months", @@ -205,7 +205,7 @@ is_bd_pro=True, is_free=True, api_mode="prod", - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", date_format="yy-mm", time_delta=1, time_unit="months", @@ -298,7 +298,7 @@ is_bd_pro=True, is_free=True, api_mode="prod", - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", date_format="yy-mm", time_delta=1, time_unit="months", @@ -389,7 +389,7 @@ is_bd_pro=True, is_free=True, api_mode="prod", - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", date_format="yy-mm", time_delta=1, time_unit="months", diff --git a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py index 02f3332a0..4cd3fd280 100644 --- a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py +++ b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py @@ -111,7 +111,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", is_bd_pro=True, is_free=False, api_mode="prod", @@ -201,7 +201,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", is_bd_pro=True, is_free=False, api_mode="prod", @@ -294,7 +294,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", is_bd_pro=True, is_free=False, api_mode="prod", @@ -388,7 +388,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", is_bd_pro=True, is_free=False, api_mode="prod", @@ -483,7 +483,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", is_bd_pro=True, is_free=False, api_mode="prod", diff --git a/pipelines/datasets/br_ons_estimativa_custos/flows.py b/pipelines/datasets/br_ons_estimativa_custos/flows.py index f4db80454..46bd0ad6f 100644 --- a/pipelines/datasets/br_ons_estimativa_custos/flows.py +++ b/pipelines/datasets/br_ons_estimativa_custos/flows.py @@ -112,7 +112,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", is_bd_pro=True, is_free=False, api_mode="prod", @@ -207,7 +207,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", is_bd_pro=True, is_free=False, api_mode="prod", @@ -302,7 +302,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", is_bd_pro=True, is_free=False, api_mode="prod", @@ -398,7 +398,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", is_bd_pro=True, is_free=False, api_mode="prod", From 41abe31aaaebd3427faea36564e3d39c27a9f834 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Fri, 15 Sep 2023 01:35:46 -0300 Subject: [PATCH 06/23] feat: registering more flows --- .../br_anatel_banda_larga_fixa/flows.py | 152 +++--------------- .../br_anatel_telefonia_movel/flows.py | 151 +++-------------- .../br_anp_precos_combustiveis/flows.py | 49 +----- pipelines/datasets/br_b3_cotacoes/flows.py | 5 +- .../br_rj_isp_estatisticas_seguranca/flows.py | 26 ++- 5 files changed, 85 insertions(+), 298 deletions(-) diff --git a/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py b/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py index 83cff365b..aa9c945ce 100644 --- a/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py +++ b/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py @@ -9,7 +9,7 @@ from prefect.storage import GCS from prefect.tasks.prefect import create_flow_run, wait_for_flow_run from pipelines.constants import constants -from pipelines.utils.tasks import update_django_metadata +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants @@ -110,44 +110,18 @@ wait_for_materialization.retry_delay = timedelta( seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - - # ! tabela bd pro - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[0] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[0]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[0] + "_atualizado", + table_id[0], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, @@ -192,43 +166,18 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # ! tabela bd pro - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[1] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[1]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[1] + "_atualizado", + table_id[1], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, @@ -275,43 +224,18 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # ! tabela bd pro - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[2] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[2]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[2] + "_atualizado", + table_id[2], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, @@ -357,44 +281,18 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # ! tabela bd pro - - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[3] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[3]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[3] + "_atualizado", + table_id[3], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, diff --git a/pipelines/datasets/br_anatel_telefonia_movel/flows.py b/pipelines/datasets/br_anatel_telefonia_movel/flows.py index c2a8b75f1..94da6409e 100644 --- a/pipelines/datasets/br_anatel_telefonia_movel/flows.py +++ b/pipelines/datasets/br_anatel_telefonia_movel/flows.py @@ -10,7 +10,7 @@ from prefect.tasks.prefect import create_flow_run, wait_for_flow_run from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants -from pipelines.utils.tasks import update_django_metadata +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.constants import constants as utils_constants from pipelines.constants import constants from pipelines.datasets.br_anatel_telefonia_movel.constants import ( @@ -118,43 +118,18 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # ! tabela bd pro - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[0] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[0]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[0] + "_atualizado", + table_id[0], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, @@ -200,43 +175,18 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # ! tabela bd pro - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[1] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[1]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[1] + "_atualizado", + table_id[1], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, @@ -284,43 +234,18 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # ! tabela bd pro - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[2] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[2]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[2] + "_atualizado", + table_id[2], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, @@ -367,44 +292,18 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # ! tabela bd pro - - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[3] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[3]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[3] + "_atualizado", + table_id[3], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, diff --git a/pipelines/datasets/br_anp_precos_combustiveis/flows.py b/pipelines/datasets/br_anp_precos_combustiveis/flows.py index ce81cd827..64e3c564d 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/flows.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/flows.py @@ -9,7 +9,7 @@ from prefect.storage import GCS from prefect.tasks.prefect import create_flow_run, wait_for_flow_run from pipelines.constants import constants -from pipelines.utils.tasks import update_django_metadata +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants @@ -101,50 +101,13 @@ table_id, metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=True, api_mode="prod", date_format="yy-mm-dd", - _last_date=get_date_max_mais, - upstream_tasks=[wait_upload_table], - ) - - # ! BD PRO - Atualizado - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id}" "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - - with case(update_metadata, True): - update_django_metadata( - dataset_id, - table_id + "_atualizado", - metadata_type="DateTimeRange", - bq_last_update=False, - api_mode="prod", - date_format="yy-mm-dd", + is_bd_pro=True, + is_free=True, + time_delta=6, + time_unit="weeks", _last_date=get_date_max_pro, upstream_tasks=[wait_upload_table], ) diff --git a/pipelines/datasets/br_b3_cotacoes/flows.py b/pipelines/datasets/br_b3_cotacoes/flows.py index d3ededa47..8cbdd144d 100644 --- a/pipelines/datasets/br_b3_cotacoes/flows.py +++ b/pipelines/datasets/br_b3_cotacoes/flows.py @@ -8,7 +8,7 @@ from prefect.run_configs import KubernetesRun from prefect.storage import GCS from prefect.tasks.prefect import create_flow_run, wait_for_flow_run -from pipelines.utils.tasks import update_django_metadata +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.constants import constants from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow @@ -101,6 +101,9 @@ table_id, metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", _last_date=data_max, diff --git a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py index 34a31534a..f9062fcd4 100644 --- a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py +++ b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py @@ -11,7 +11,7 @@ wait_for_flow_run, ) from pipelines.constants import constants -from pipelines.utils.tasks import update_django_metadata +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.constants import constants as utils_constants from pipelines.datasets.br_rj_isp_estatisticas_seguranca.tasks import ( download_files, @@ -123,7 +123,10 @@ dataset_id, table_id, metadata_type="DateTimeRange", + bq_table_last_year_month=False, bq_last_update=False, + is_bd_pro=False, + is_free=True, api_mode="prod", date_format="yy-mm", _last_date=date, @@ -214,7 +217,10 @@ dataset_id, table_id, metadata_type="DateTimeRange", + bq_table_last_year_month=False, bq_last_update=False, + is_bd_pro=False, + is_free=True, api_mode="prod", date_format="yy-mm", _last_date=date, @@ -307,7 +313,10 @@ dataset_id, table_id, metadata_type="DateTimeRange", + bq_table_last_year_month=False, bq_last_update=False, + is_bd_pro=False, + is_free=True, api_mode="prod", date_format="yy-mm", _last_date=date, @@ -401,7 +410,10 @@ dataset_id, table_id, metadata_type="DateTimeRange", + bq_table_last_year_month=False, bq_last_update=False, + is_bd_pro=False, + is_free=True, api_mode="prod", date_format="yy-mm", _last_date=date, @@ -495,7 +507,10 @@ dataset_id, table_id, metadata_type="DateTimeRange", + bq_table_last_year_month=False, bq_last_update=False, + is_bd_pro=False, + is_free=True, api_mode="prod", date_format="yy-mm", _last_date=date, @@ -590,7 +605,10 @@ dataset_id, table_id, metadata_type="DateTimeRange", + bq_table_last_year_month=False, bq_last_update=False, + is_bd_pro=False, + is_free=True, api_mode="prod", date_format="yy-mm", _last_date=date, @@ -681,7 +699,10 @@ dataset_id, table_id, metadata_type="DateTimeRange", + bq_table_last_year_month=False, bq_last_update=False, + is_bd_pro=False, + is_free=True, api_mode="prod", date_format="yy-mm", _last_date=date, @@ -772,7 +793,10 @@ dataset_id, table_id, metadata_type="DateTimeRange", + bq_table_last_year_month=False, bq_last_update=False, + is_bd_pro=False, + is_free=True, api_mode="prod", date_format="yy-mm", _last_date=date, From 795eefd05c85e8db38540d23a2d350860ac51f1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Fri, 15 Sep 2023 01:54:04 -0300 Subject: [PATCH 07/23] fix: fix log --- pipelines/utils/metadata/tasks.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pipelines/utils/metadata/tasks.py b/pipelines/utils/metadata/tasks.py index 86f198ee4..d440e1066 100644 --- a/pipelines/utils/metadata/tasks.py +++ b/pipelines/utils/metadata/tasks.py @@ -209,7 +209,7 @@ def update_django_metadata( ] = resource_to_temporal_coverage_free["endYear"] log( - f"Cobertura PRO ->> {_last_date} || Cobertura Grátis ->> {free_data}" + f"Cobertura PRO ->> {last_date} || Cobertura Grátis ->> {free_data}" ) # resource_to_temporal_coverage = parse_temporal_coverage(f"{last_date}") @@ -248,7 +248,7 @@ def update_django_metadata( date_format, billing_project_id=billing_project_id, ) - log(f"Cobertura PRO ->> {_last_date}") + log(f"Cobertura PRO ->> {last_date}") resource_to_temporal_coverage = parse_temporal_coverage(f"{last_date}") resource_to_temporal_coverage["coverage"] = ids.get("coverage_id_pro") @@ -340,7 +340,7 @@ def update_django_metadata( ] = resource_to_temporal_coverage_free["endYear"] log( - f"Cobertura PRO ->> {_last_date} || Cobertura Grátis ->> {free_data}" + f"Cobertura PRO ->> {last_date} || Cobertura Grátis ->> {free_data}" ) # resource_to_temporal_coverage = parse_temporal_coverage(f"{last_date}") @@ -379,7 +379,7 @@ def update_django_metadata( date_format, billing_project_id=billing_project_id, ) - log(f"Cobertura PRO ->> {_last_date}") + log(f"Cobertura PRO ->> {last_date}") resource_to_temporal_coverage = parse_temporal_coverage(f"{last_date}") resource_to_temporal_coverage["coverage"] = ids.get("coverage_id_pro") From a5084262433e36abde8411f5aedc2f54972b4879 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Fri, 15 Sep 2023 15:52:32 -0300 Subject: [PATCH 08/23] fx: small fix --- .../br_rj_isp_estatisticas_seguranca/flows.py | 224 +++++++++--------- 1 file changed, 112 insertions(+), 112 deletions(-) diff --git a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py index f9062fcd4..8e0f7d7bd 100644 --- a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py +++ b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py @@ -117,20 +117,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_table_last_year_month=False, - bq_last_update=False, - is_bd_pro=False, - is_free=True, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) evolucao_mensal_cisp.storage = GCS(constants.GCS_FLOWS_BUCKET.value) evolucao_mensal_cisp.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) @@ -211,20 +211,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_table_last_year_month=False, - bq_last_update=False, - is_bd_pro=False, - is_free=True, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) taxa_evolucao_mensal_uf.storage = GCS(constants.GCS_FLOWS_BUCKET.value) taxa_evolucao_mensal_uf.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) @@ -307,20 +307,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_table_last_year_month=False, - bq_last_update=False, - is_bd_pro=False, - is_free=True, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) taxa_evolucao_mensal_municipio.storage = GCS(constants.GCS_FLOWS_BUCKET.value) @@ -404,20 +404,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_table_last_year_month=False, - bq_last_update=False, - is_bd_pro=False, - is_free=True, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) feminicidio_mensal_cisp.storage = GCS(constants.GCS_FLOWS_BUCKET.value) @@ -501,20 +501,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_table_last_year_month=False, - bq_last_update=False, - is_bd_pro=False, - is_free=True, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) evolucao_policial_morto_servico_mensal.storage = GCS(constants.GCS_FLOWS_BUCKET.value) evolucao_policial_morto_servico_mensal.run_config = KubernetesRun( @@ -599,20 +599,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_table_last_year_month=False, - bq_last_update=False, - is_bd_pro=False, - is_free=True, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) armas_apreendidas_mensal.storage = GCS(constants.GCS_FLOWS_BUCKET.value) armas_apreendidas_mensal.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) @@ -693,20 +693,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_table_last_year_month=False, - bq_last_update=False, - is_bd_pro=False, - is_free=True, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) evolucao_mensal_municipio.storage = GCS(constants.GCS_FLOWS_BUCKET.value) evolucao_mensal_municipio.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) @@ -787,20 +787,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_table_last_year_month=False, - bq_last_update=False, - is_bd_pro=False, - is_free=True, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) evolucao_mensal_uf.storage = GCS(constants.GCS_FLOWS_BUCKET.value) evolucao_mensal_uf.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) From d919852452e22ec1464890b90f263afa158fe0ae Mon Sep 17 00:00:00 2001 From: Laura Amaral Date: Tue, 19 Sep 2023 09:31:35 -0300 Subject: [PATCH 09/23] include dbt_test --- pipelines/utils/execute_dbt_model/flows.py | 2 ++ pipelines/utils/execute_dbt_model/tasks.py | 29 ++++++++++++++++++---- pipelines/utils/execute_dbt_model/utils.py | 18 ++++++++++++++ 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/pipelines/utils/execute_dbt_model/flows.py b/pipelines/utils/execute_dbt_model/flows.py index 1f331d591..3676f395e 100644 --- a/pipelines/utils/execute_dbt_model/flows.py +++ b/pipelines/utils/execute_dbt_model/flows.py @@ -23,6 +23,7 @@ table_id = Parameter("table_id") mode = Parameter("mode", default="dev", required=False) dbt_alias = Parameter("dbt_alias", default=False, required=False) + dbt_test = Parameter("dbt_test", default=False, required=False) ################# #################### # @@ -43,6 +44,7 @@ table_id=table_id, dbt_alias=dbt_alias, sync=True, + dbt_test=dbt_test ) run_dbt_model_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) diff --git a/pipelines/utils/execute_dbt_model/tasks.py b/pipelines/utils/execute_dbt_model/tasks.py index 93c738be6..0f08a1281 100644 --- a/pipelines/utils/execute_dbt_model/tasks.py +++ b/pipelines/utils/execute_dbt_model/tasks.py @@ -8,9 +8,11 @@ from dbt_client import DbtClient from prefect import task +from pipelines.utils.utils import log from pipelines.utils.execute_dbt_model.utils import ( get_dbt_client, + parse_dbt_logs, ) from pipelines.constants import constants @@ -44,6 +46,7 @@ def run_dbt_model( dataset_id: str, table_id: str, dbt_alias: bool, + dbt_test: bool, sync: bool = True, ): """ @@ -51,8 +54,24 @@ def run_dbt_model( """ if dbt_alias: table_id = f"{dataset_id}__{table_id}" - dbt_client.cli( - f"run --models {dataset_id}.{table_id}", - sync=sync, - logs=True, - ) + + # dbt_client.cli( + # f"run --models {dataset_id}.{table_id}", + # sync=sync, + # logs=True, + # ) + + if dbt_test: + log(f"test --models {dataset_id}.{table_id} --store-failures") + logs_dict = dbt_client.cli( + f"test --models {dataset_id}.{table_id} --store-failures", + sync=sync, + logs=True, + ) + for event in logs_dict["result"]["logs"]: + if event["levelname"] == "INFO" or event["levelname"] == "ERROR": + log(f"#####{event['levelname']}#####") + log(event["message"]) + if event["levelname"] == "DEBUG": + if "On model" in event["message"]: + log(event["message"]) \ No newline at end of file diff --git a/pipelines/utils/execute_dbt_model/utils.py b/pipelines/utils/execute_dbt_model/utils.py index c3c850acf..ed295eb8e 100644 --- a/pipelines/utils/execute_dbt_model/utils.py +++ b/pipelines/utils/execute_dbt_model/utils.py @@ -8,6 +8,8 @@ from dbt_client import DbtClient from prefect.schedules.clocks import IntervalClock +from pipelines.utils.utils import log + def get_dbt_client( host: str = "dbt-rpc", @@ -59,3 +61,19 @@ def generate_execute_dbt_model_schedules( # pylint: disable=too-many-arguments, ) ) return clocks + +def parse_dbt_logs(logs_dict: dict, log_queries: bool = False): + """Parse dbt returned logs, to print only needed + pieces. + + Args: + logs_dict (dict): logs dict returned when running a DBT + command via DbtClient.cli() with argument logs = True + """ + for event in logs_dict["result"]["logs"]: + if event["levelname"] == "INFO" or event["levelname"] == "ERROR": + log(f"#####{event['levelname']}#####") + log(event["message"]) + if event["levelname"] == "DEBUG" and log_queries: + if "On model" in event["message"]: + log(event["message"]) \ No newline at end of file From ceab71e5b72e1ff6636134d86b26a1763a5f09a3 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 22 Sep 2023 20:54:31 +0000 Subject: [PATCH 10/23] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pipelines/utils/metadata/tasks.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pipelines/utils/metadata/tasks.py b/pipelines/utils/metadata/tasks.py index dbdda6c62..95670f891 100644 --- a/pipelines/utils/metadata/tasks.py +++ b/pipelines/utils/metadata/tasks.py @@ -108,11 +108,9 @@ def update_django_metadata( "days": "days", } - if not isinstance(_last_date, str) and _last_date is not None: raise ValueError("O parâmetro `last_date` deve ser uma string não nula") - if time_unit not in unidades_permitidas: raise ValueError( f"Unidade temporal inválida. Escolha entre {', '.join(unidades_permitidas.keys())}" From 201cbaf7632e803a6600cc2cce238d31aa804e1b Mon Sep 17 00:00:00 2001 From: tricktx Date: Sun, 24 Sep 2023 21:26:11 -0300 Subject: [PATCH 11/23] fix check for updates --- pipelines/datasets/br_anp_precos_combustiveis/tasks.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py index 48c911278..0ce9012f5 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py @@ -38,9 +38,8 @@ def check_for_updates(dataset_id, table_id): # Obtém a data mais recente do site download_files(anatel_constants.URLS_DATA.value, anatel_constants.PATH_INPUT.value) df = pd.read_csv(anatel_constants.URL_GLP.value, sep=";", encoding="utf-8") - data_obj = df["Data da Coleta"].max() - data_obj = datetime.strptime(data_obj, "%d/%m/%Y").strftime("%Y-%m-%d") - data_obj = datetime.strptime(data_obj, "%Y-%m-%d").date() + data_obj = pd.to_datetime(df["Data da Coleta"]).max() + data_obj = data_obj.date() # Obtém a última data no site BD data_bq_obj = extract_last_date( From f06efd475681a0e2ebbbc30ebe897080105b4361 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Mon, 25 Sep 2023 12:09:09 -0300 Subject: [PATCH 12/23] feat: making requested changes --- pipelines/datasets/br_ons_avaliacao_operacao/flows.py | 10 +++++----- pipelines/datasets/br_ons_estimativa_custos/flows.py | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py index 4cd3fd280..02f3332a0 100644 --- a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py +++ b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py @@ -111,7 +111,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", is_bd_pro=True, is_free=False, api_mode="prod", @@ -201,7 +201,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", is_bd_pro=True, is_free=False, api_mode="prod", @@ -294,7 +294,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", is_bd_pro=True, is_free=False, api_mode="prod", @@ -388,7 +388,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", is_bd_pro=True, is_free=False, api_mode="prod", @@ -483,7 +483,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", is_bd_pro=True, is_free=False, api_mode="prod", diff --git a/pipelines/datasets/br_ons_estimativa_custos/flows.py b/pipelines/datasets/br_ons_estimativa_custos/flows.py index 46bd0ad6f..f4db80454 100644 --- a/pipelines/datasets/br_ons_estimativa_custos/flows.py +++ b/pipelines/datasets/br_ons_estimativa_custos/flows.py @@ -112,7 +112,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", is_bd_pro=True, is_free=False, api_mode="prod", @@ -207,7 +207,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", is_bd_pro=True, is_free=False, api_mode="prod", @@ -302,7 +302,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", is_bd_pro=True, is_free=False, api_mode="prod", @@ -398,7 +398,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", is_bd_pro=True, is_free=False, api_mode="prod", From 2881d38fddd9933920807d3b9fe6ee15376ee0ca Mon Sep 17 00:00:00 2001 From: Laura Amaral Date: Mon, 25 Sep 2023 13:34:33 -0300 Subject: [PATCH 13/23] include dbt logs --- pipelines/utils/execute_dbt_model/tasks.py | 34 +++++++++++++++------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/pipelines/utils/execute_dbt_model/tasks.py b/pipelines/utils/execute_dbt_model/tasks.py index 0f08a1281..5909c052e 100644 --- a/pipelines/utils/execute_dbt_model/tasks.py +++ b/pipelines/utils/execute_dbt_model/tasks.py @@ -12,7 +12,6 @@ from pipelines.utils.execute_dbt_model.utils import ( get_dbt_client, - parse_dbt_logs, ) from pipelines.constants import constants @@ -55,21 +54,36 @@ def run_dbt_model( if dbt_alias: table_id = f"{dataset_id}__{table_id}" - # dbt_client.cli( - # f"run --models {dataset_id}.{table_id}", - # sync=sync, - # logs=True, - # ) - if dbt_test: - log(f"test --models {dataset_id}.{table_id} --store-failures") + log(f"test --models {dataset_id}.{table_id}") logs_dict = dbt_client.cli( - f"test --models {dataset_id}.{table_id} --store-failures", + f"test --models {dataset_id}.{table_id}", + sync=sync, + logs=True, + ) + for event in logs_dict["result"]["logs"]: + if event["levelname"] == "INFO" and "WARN" in event["message"]: + log(f"#####{event['levelname']}#####") + log(event["message"]) + if event["levelname"] == "DEBUG": + if "On model" in event["message"]: + log(event["message"]) + + for event in logs_dict["result"]["logs"]: + if event["levelname"] == "INFO" and "WARN" in event["message"]: + log(f"#####{event['levelname']}#####") + log(event["message"]) + if event["levelname"] == "DEBUG": + if "On model" in event["message"]: + log(event["message"]) + else: + logs_dict = dbt_client.cli( + f"run --models {dataset_id}.{table_id}", sync=sync, logs=True, ) for event in logs_dict["result"]["logs"]: - if event["levelname"] == "INFO" or event["levelname"] == "ERROR": + if event["levelname"] == "INFO" and "WARN" in event["message"]: log(f"#####{event['levelname']}#####") log(event["message"]) if event["levelname"] == "DEBUG": From 75c5bb1eb940c4047d347019866ea62f2769519d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Mon, 25 Sep 2023 14:30:13 -0300 Subject: [PATCH 14/23] fix: fixing identation --- .../br_anp_precos_combustiveis/flows.py | 24 ++++--------------- 1 file changed, 5 insertions(+), 19 deletions(-) diff --git a/pipelines/datasets/br_anp_precos_combustiveis/flows.py b/pipelines/datasets/br_anp_precos_combustiveis/flows.py index 4a3c2a15e..c971d4dfa 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/flows.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/flows.py @@ -71,23 +71,6 @@ output_path = make_partitions(df=df, upstream_tasks=[df]) get_date_max_pro = data_max_bd_pro(df=df) - - with case(update_metadata, True): - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_last_update=False, - bq_table_last_year_month=True, - api_mode="prod", - date_format="yy-mm-dd", - is_bd_pro=True, - is_free=True, - time_delta=6, - time_unit="weeks", - _last_date=get_date_max_pro, - upstream_tasks=[wait_upload_table], - # pylint: disable=C0103 wait_upload_table = create_table_and_upload_to_gcs( data_path=output_path, @@ -121,7 +104,6 @@ ) wait_for_materialization.max_retries = ( dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) wait_for_materialization.retry_delay = timedelta( seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value @@ -133,12 +115,16 @@ table_id, metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=True, api_mode="prod", date_format="yy-mm-dd", + is_bd_pro=True, + is_free=True, + time_delta=6, + time_unit="weeks", _last_date=get_date_max_pro, upstream_tasks=[wait_upload_table], ) - anp_microdados.storage = GCS(constants.GCS_FLOWS_BUCKET.value) anp_microdados.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) anp_microdados.schedule = every_week_anp_microdados From 616efeb6a7793975f4981556d697c56a70c317df Mon Sep 17 00:00:00 2001 From: Laura Amaral Date: Wed, 27 Sep 2023 16:03:54 -0300 Subject: [PATCH 15/23] include 'test', 'run' and 'run and test' scenarios --- pipelines/utils/execute_dbt_model/flows.py | 6 ++-- pipelines/utils/execute_dbt_model/tasks.py | 33 +++++++++------------- 2 files changed, 17 insertions(+), 22 deletions(-) diff --git a/pipelines/utils/execute_dbt_model/flows.py b/pipelines/utils/execute_dbt_model/flows.py index 3676f395e..3874e613f 100644 --- a/pipelines/utils/execute_dbt_model/flows.py +++ b/pipelines/utils/execute_dbt_model/flows.py @@ -23,7 +23,7 @@ table_id = Parameter("table_id") mode = Parameter("mode", default="dev", required=False) dbt_alias = Parameter("dbt_alias", default=False, required=False) - dbt_test = Parameter("dbt_test", default=False, required=False) + dbt_command = Parameter("dbt_command", default='run', required=False) ################# #################### # @@ -44,8 +44,10 @@ table_id=table_id, dbt_alias=dbt_alias, sync=True, - dbt_test=dbt_test + dbt_command=dbt_command ) + + run_dbt_model_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) run_dbt_model_flow.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) diff --git a/pipelines/utils/execute_dbt_model/tasks.py b/pipelines/utils/execute_dbt_model/tasks.py index 5909c052e..4ea467297 100644 --- a/pipelines/utils/execute_dbt_model/tasks.py +++ b/pipelines/utils/execute_dbt_model/tasks.py @@ -45,47 +45,40 @@ def run_dbt_model( dataset_id: str, table_id: str, dbt_alias: bool, - dbt_test: bool, + dbt_command: str, sync: bool = True, ): """ Run a DBT model. """ + if dbt_command not in ["run", "test", "run and test", "run/test"]: + raise ValueError(f"Invalid dbt_command: {dbt_command}") + if dbt_alias: table_id = f"{dataset_id}__{table_id}" - if dbt_test: - log(f"test --models {dataset_id}.{table_id}") - logs_dict = dbt_client.cli( - f"test --models {dataset_id}.{table_id}", + if 'run' in dbt_command: + logs_dict = dbt_client.cli( + f"run --models {dataset_id}.{table_id}", sync=sync, logs=True, ) for event in logs_dict["result"]["logs"]: - if event["levelname"] == "INFO" and "WARN" in event["message"]: - log(f"#####{event['levelname']}#####") + if event["levelname"] == "INFO": log(event["message"]) if event["levelname"] == "DEBUG": if "On model" in event["message"]: log(event["message"]) - for event in logs_dict["result"]["logs"]: - if event["levelname"] == "INFO" and "WARN" in event["message"]: - log(f"#####{event['levelname']}#####") - log(event["message"]) - if event["levelname"] == "DEBUG": - if "On model" in event["message"]: - log(event["message"]) - else: - logs_dict = dbt_client.cli( - f"run --models {dataset_id}.{table_id}", + if 'test' in dbt_command: + logs_dict = dbt_client.cli( + f"test --models {dataset_id}.{table_id}", sync=sync, logs=True, ) for event in logs_dict["result"]["logs"]: - if event["levelname"] == "INFO" and "WARN" in event["message"]: - log(f"#####{event['levelname']}#####") + if event["levelname"] == "INFO": log(event["message"]) if event["levelname"] == "DEBUG": if "On model" in event["message"]: - log(event["message"]) \ No newline at end of file + log(event["message"]) From 0aa3429c05f407ed2b8ab02f823a6dbb6ad1602a Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 27 Sep 2023 19:17:13 +0000 Subject: [PATCH 16/23] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pipelines/datasets/br_anatel_banda_larga_fixa/flows.py | 8 +------- pipelines/datasets/br_anatel_telefonia_movel/flows.py | 7 +------ pipelines/datasets/br_anp_precos_combustiveis/flows.py | 9 +-------- pipelines/datasets/br_b3_cotacoes/flows.py | 5 +---- pipelines/datasets/br_bcb_agencia/flows.py | 5 +---- pipelines/datasets/br_bcb_estban/flows.py | 5 +---- pipelines/datasets/br_me_comex_stat/flows.py | 8 ++------ pipelines/datasets/br_ons_avaliacao_operacao/flows.py | 5 ++--- pipelines/datasets/br_ons_estimativa_custos/flows.py | 8 +------- .../datasets/br_rj_isp_estatisticas_seguranca/flows.py | 6 +----- 10 files changed, 12 insertions(+), 54 deletions(-) diff --git a/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py b/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py index 932b0de72..45d405c70 100644 --- a/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py +++ b/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py @@ -11,16 +11,9 @@ from prefect.tasks.prefect import create_flow_run, wait_for_flow_run from pipelines.constants import constants - -from pipelines.utils.metadata.tasks import update_django_metadata -from pipelines.utils.constants import constants as utils_constants -from pipelines.utils.decorators import Flow -from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants - from pipelines.datasets.br_anatel_banda_larga_fixa.schedules import ( every_month_anatel_microdados, ) - from pipelines.datasets.br_anatel_banda_larga_fixa.tasks import ( get_today_date_atualizado, treatment, @@ -31,6 +24,7 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, diff --git a/pipelines/datasets/br_anatel_telefonia_movel/flows.py b/pipelines/datasets/br_anatel_telefonia_movel/flows.py index 04e520a5c..613efdc48 100644 --- a/pipelines/datasets/br_anatel_telefonia_movel/flows.py +++ b/pipelines/datasets/br_anatel_telefonia_movel/flows.py @@ -10,12 +10,6 @@ from prefect.storage import GCS from prefect.tasks.prefect import create_flow_run, wait_for_flow_run - -from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants -from pipelines.utils.metadata.tasks import update_django_metadata -from pipelines.utils.constants import constants as utils_constants - - from pipelines.constants import constants from pipelines.datasets.br_anatel_telefonia_movel.constants import ( constants as anatel_constants, @@ -31,6 +25,7 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, diff --git a/pipelines/datasets/br_anp_precos_combustiveis/flows.py b/pipelines/datasets/br_anp_precos_combustiveis/flows.py index 22b5a05cc..dfb561d4f 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/flows.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/flows.py @@ -11,20 +11,12 @@ from prefect.tasks.prefect import create_flow_run, wait_for_flow_run from pipelines.constants import constants - -from pipelines.utils.metadata.tasks import update_django_metadata -from pipelines.utils.constants import constants as utils_constants -from pipelines.utils.decorators import Flow -from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants -from pipelines.datasets.br_anp_precos_combustiveis.utils import download_files - from pipelines.datasets.br_anp_precos_combustiveis.constants import ( constants as anatel_constants, ) from pipelines.datasets.br_anp_precos_combustiveis.schedules import ( every_week_anp_microdados, ) - from pipelines.datasets.br_anp_precos_combustiveis.tasks import ( check_for_updates, data_max_bd_mais, @@ -36,6 +28,7 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, diff --git a/pipelines/datasets/br_b3_cotacoes/flows.py b/pipelines/datasets/br_b3_cotacoes/flows.py index 63982de33..0732e52b0 100644 --- a/pipelines/datasets/br_b3_cotacoes/flows.py +++ b/pipelines/datasets/br_b3_cotacoes/flows.py @@ -10,16 +10,13 @@ from prefect.storage import GCS from prefect.tasks.prefect import create_flow_run, wait_for_flow_run -from pipelines.utils.metadata.tasks import update_django_metadata - - - from pipelines.constants import constants from pipelines.datasets.br_b3_cotacoes.schedules import all_day_cotacoes from pipelines.datasets.br_b3_cotacoes.tasks import data_max_b3, tratamento from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, diff --git a/pipelines/datasets/br_bcb_agencia/flows.py b/pipelines/datasets/br_bcb_agencia/flows.py index d2a0c42d3..902d05ee3 100644 --- a/pipelines/datasets/br_bcb_agencia/flows.py +++ b/pipelines/datasets/br_bcb_agencia/flows.py @@ -11,13 +11,10 @@ from pipelines.datasets.br_bcb_agencia.schedules import every_month_agencia from pipelines.datasets.br_bcb_agencia.tasks import clean_data, download_data from pipelines.utils.constants import constants as utils_constants - -from pipelines.utils.metadata.tasks import update_django_metadata - from pipelines.utils.decorators import Flow - from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants from pipelines.utils.metadata.flows import update_django_metadata +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, diff --git a/pipelines/datasets/br_bcb_estban/flows.py b/pipelines/datasets/br_bcb_estban/flows.py index e9f2cbfef..d1e7d4511 100644 --- a/pipelines/datasets/br_bcb_estban/flows.py +++ b/pipelines/datasets/br_bcb_estban/flows.py @@ -26,12 +26,9 @@ ) from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow - -from pipelines.utils.metadata.tasks import update_django_metadata - - from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants from pipelines.utils.metadata.flows import update_django_metadata +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, diff --git a/pipelines/datasets/br_me_comex_stat/flows.py b/pipelines/datasets/br_me_comex_stat/flows.py index 97e6f1d21..ecd15f637 100644 --- a/pipelines/datasets/br_me_comex_stat/flows.py +++ b/pipelines/datasets/br_me_comex_stat/flows.py @@ -9,7 +9,7 @@ from prefect.run_configs import KubernetesRun from prefect.storage import GCS from prefect.tasks.prefect import create_flow_run, wait_for_flow_run -from pipelines.utils.metadata.tasks import update_django_metadata + from pipelines.constants import constants from pipelines.datasets.br_me_comex_stat.constants import constants as comex_constants from pipelines.datasets.br_me_comex_stat.schedules import ( @@ -25,12 +25,8 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( - - rename_current_flow_run_dataset_table, - get_current_flow_labels, - - create_table_and_upload_to_gcs, get_current_flow_labels, rename_current_flow_run_dataset_table, diff --git a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py index 991ac64aa..79c2d3738 100644 --- a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py +++ b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py @@ -9,7 +9,7 @@ from prefect.run_configs import KubernetesRun from prefect.storage import GCS from prefect.tasks.prefect import create_flow_run, wait_for_flow_run -from pipelines.utils.metadata.tasks import update_django_metadata + from pipelines.constants import constants from pipelines.datasets.br_ons_avaliacao_operacao.constants import ( constants as ons_constants, @@ -25,13 +25,12 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, - get_current_flow_labels, rename_current_flow_run_dataset_table, update_django_metadata, - ) with Flow( diff --git a/pipelines/datasets/br_ons_estimativa_custos/flows.py b/pipelines/datasets/br_ons_estimativa_custos/flows.py index c06b95a7a..f01ed1f30 100644 --- a/pipelines/datasets/br_ons_estimativa_custos/flows.py +++ b/pipelines/datasets/br_ons_estimativa_custos/flows.py @@ -14,9 +14,6 @@ from pipelines.datasets.br_ons_estimativa_custos.constants import ( constants as ons_constants, ) - -from pipelines.utils.metadata.tasks import update_django_metadata - from pipelines.datasets.br_ons_estimativa_custos.schedules import ( schedule_br_ons_estimativa_custos_balanco_energia_subsistemas, schedule_br_ons_estimativa_custos_balanco_energia_subsistemas_dessem, @@ -24,18 +21,15 @@ schedule_br_ons_estimativa_custos_custo_marginal_operacao_semi_horario, ) from pipelines.datasets.br_ons_estimativa_custos.tasks import download_data, wrang_data - from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, - - get_current_flow_labels, rename_current_flow_run_dataset_table, update_django_metadata, - ) with Flow( diff --git a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py index f47b7918d..1406ea903 100644 --- a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py +++ b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py @@ -10,10 +10,6 @@ from prefect.tasks.prefect import create_flow_run, wait_for_flow_run from pipelines.constants import constants - -from pipelines.utils.metadata.tasks import update_django_metadata -from pipelines.utils.constants import constants as utils_constants - from pipelines.datasets.br_rj_isp_estatisticas_seguranca.constants import ( constants as isp_constants, ) @@ -27,7 +23,6 @@ every_month_taxa_evolucao_mensal_municipio, every_month_taxa_evolucao_mensal_uf, ) - from pipelines.datasets.br_rj_isp_estatisticas_seguranca.tasks import ( clean_data, download_files, @@ -36,6 +31,7 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, From 8e5729847eb71deb919f976be20108d5beb1fa9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Wed, 27 Sep 2023 16:28:30 -0300 Subject: [PATCH 17/23] fix: changing billing project id --- pipelines/datasets/br_bcb_agencia/flows.py | 3 +-- pipelines/datasets/br_bcb_estban/flows.py | 5 ++--- pipelines/datasets/br_me_comex_stat/flows.py | 9 ++++----- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/pipelines/datasets/br_bcb_agencia/flows.py b/pipelines/datasets/br_bcb_agencia/flows.py index 902d05ee3..abab85f35 100644 --- a/pipelines/datasets/br_bcb_agencia/flows.py +++ b/pipelines/datasets/br_bcb_agencia/flows.py @@ -13,7 +13,6 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants -from pipelines.utils.metadata.flows import update_django_metadata from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, @@ -101,7 +100,7 @@ bq_last_update=False, bq_table_last_year_month=True, api_mode="prod", - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", date_format="yy-mm", is_bd_pro=True, is_free=True, diff --git a/pipelines/datasets/br_bcb_estban/flows.py b/pipelines/datasets/br_bcb_estban/flows.py index d1e7d4511..bc1490e22 100644 --- a/pipelines/datasets/br_bcb_estban/flows.py +++ b/pipelines/datasets/br_bcb_estban/flows.py @@ -27,7 +27,6 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants -from pipelines.utils.metadata.flows import update_django_metadata from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, @@ -119,7 +118,7 @@ bq_last_update=False, bq_table_last_year_month=True, api_mode="prod", - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", date_format="yy-mm", is_bd_pro=True, is_free=True, @@ -218,7 +217,7 @@ bq_last_update=False, bq_table_last_year_month=True, api_mode="prod", - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", date_format="yy-mm", is_bd_pro=True, is_free=True, diff --git a/pipelines/datasets/br_me_comex_stat/flows.py b/pipelines/datasets/br_me_comex_stat/flows.py index ecd15f637..1b5895cbe 100644 --- a/pipelines/datasets/br_me_comex_stat/flows.py +++ b/pipelines/datasets/br_me_comex_stat/flows.py @@ -30,7 +30,6 @@ create_table_and_upload_to_gcs, get_current_flow_labels, rename_current_flow_run_dataset_table, - update_django_metadata, ) with Flow( @@ -114,7 +113,7 @@ is_bd_pro=True, is_free=True, api_mode="prod", - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", date_format="yy-mm", time_delta=1, time_unit="months", @@ -205,7 +204,7 @@ is_bd_pro=True, is_free=True, api_mode="prod", - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", date_format="yy-mm", time_delta=1, time_unit="months", @@ -298,7 +297,7 @@ is_bd_pro=True, is_free=True, api_mode="prod", - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", date_format="yy-mm", time_delta=1, time_unit="months", @@ -389,7 +388,7 @@ is_bd_pro=True, is_free=True, api_mode="prod", - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", date_format="yy-mm", time_delta=1, time_unit="months", From a3016f763acf22df17d82ae7eb25dd49ff6ffb3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Wed, 27 Sep 2023 16:40:53 -0300 Subject: [PATCH 18/23] fix: changing billing project id again --- pipelines/datasets/br_anatel_banda_larga_fixa/flows.py | 1 - pipelines/datasets/br_anatel_telefonia_movel/flows.py | 1 - pipelines/datasets/br_anp_precos_combustiveis/flows.py | 1 - pipelines/datasets/br_b3_cotacoes/flows.py | 1 - pipelines/datasets/br_ons_avaliacao_operacao/flows.py | 1 - pipelines/datasets/br_ons_estimativa_custos/flows.py | 1 - 6 files changed, 6 deletions(-) diff --git a/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py b/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py index 45d405c70..adb9678dc 100644 --- a/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py +++ b/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py @@ -29,7 +29,6 @@ create_table_and_upload_to_gcs, get_current_flow_labels, rename_current_flow_run_dataset_table, - update_django_metadata, ) with Flow( diff --git a/pipelines/datasets/br_anatel_telefonia_movel/flows.py b/pipelines/datasets/br_anatel_telefonia_movel/flows.py index 613efdc48..d021bba7c 100644 --- a/pipelines/datasets/br_anatel_telefonia_movel/flows.py +++ b/pipelines/datasets/br_anatel_telefonia_movel/flows.py @@ -30,7 +30,6 @@ create_table_and_upload_to_gcs, get_current_flow_labels, rename_current_flow_run_dataset_table, - update_django_metadata, ) with Flow(name="br_anatel_telefonia_movel", code_owners=["tricktx"]) as br_anatel: diff --git a/pipelines/datasets/br_anp_precos_combustiveis/flows.py b/pipelines/datasets/br_anp_precos_combustiveis/flows.py index dfb561d4f..f8cc27c9d 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/flows.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/flows.py @@ -34,7 +34,6 @@ get_current_flow_labels, log_task, rename_current_flow_run_dataset_table, - update_django_metadata, ) with Flow( diff --git a/pipelines/datasets/br_b3_cotacoes/flows.py b/pipelines/datasets/br_b3_cotacoes/flows.py index 0732e52b0..22b2cbdd4 100644 --- a/pipelines/datasets/br_b3_cotacoes/flows.py +++ b/pipelines/datasets/br_b3_cotacoes/flows.py @@ -21,7 +21,6 @@ create_table_and_upload_to_gcs, get_current_flow_labels, rename_current_flow_run_dataset_table, - update_django_metadata, ) from pipelines.utils.utils import log diff --git a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py index 79c2d3738..9a8aea290 100644 --- a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py +++ b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py @@ -30,7 +30,6 @@ create_table_and_upload_to_gcs, get_current_flow_labels, rename_current_flow_run_dataset_table, - update_django_metadata, ) with Flow( diff --git a/pipelines/datasets/br_ons_estimativa_custos/flows.py b/pipelines/datasets/br_ons_estimativa_custos/flows.py index f01ed1f30..cf1b30eec 100644 --- a/pipelines/datasets/br_ons_estimativa_custos/flows.py +++ b/pipelines/datasets/br_ons_estimativa_custos/flows.py @@ -29,7 +29,6 @@ create_table_and_upload_to_gcs, get_current_flow_labels, rename_current_flow_run_dataset_table, - update_django_metadata, ) with Flow( From e56b915eb635bc5c723651fad5a52d0828a73843 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Wed, 27 Sep 2023 16:54:13 -0300 Subject: [PATCH 19/23] fix: changing billing project id again again --- pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py index 1406ea903..d70322502 100644 --- a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py +++ b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py @@ -36,7 +36,6 @@ create_table_and_upload_to_gcs, get_current_flow_labels, rename_current_flow_run_dataset_table, - update_django_metadata, ) # ! Evolucao_mensal_cisp From efe8daa1e16e1d521a9da7a1ce4159547acc245f Mon Sep 17 00:00:00 2001 From: Laura Amaral Date: Thu, 28 Sep 2023 08:26:36 -0300 Subject: [PATCH 20/23] remove parse_logs --- pipelines/utils/execute_dbt_model/utils.py | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/pipelines/utils/execute_dbt_model/utils.py b/pipelines/utils/execute_dbt_model/utils.py index ed295eb8e..e2d7ae404 100644 --- a/pipelines/utils/execute_dbt_model/utils.py +++ b/pipelines/utils/execute_dbt_model/utils.py @@ -60,20 +60,4 @@ def generate_execute_dbt_model_schedules( # pylint: disable=too-many-arguments, parameter_defaults=parameter_defaults, ) ) - return clocks - -def parse_dbt_logs(logs_dict: dict, log_queries: bool = False): - """Parse dbt returned logs, to print only needed - pieces. - - Args: - logs_dict (dict): logs dict returned when running a DBT - command via DbtClient.cli() with argument logs = True - """ - for event in logs_dict["result"]["logs"]: - if event["levelname"] == "INFO" or event["levelname"] == "ERROR": - log(f"#####{event['levelname']}#####") - log(event["message"]) - if event["levelname"] == "DEBUG" and log_queries: - if "On model" in event["message"]: - log(event["message"]) \ No newline at end of file + return clocks \ No newline at end of file From 1af50a075886762782c6e7e9b4d5ed2c9ae91faf Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 28 Sep 2023 11:26:58 +0000 Subject: [PATCH 21/23] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pipelines/utils/execute_dbt_model/flows.py | 5 ++--- pipelines/utils/execute_dbt_model/tasks.py | 8 ++++---- pipelines/utils/execute_dbt_model/utils.py | 2 +- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/pipelines/utils/execute_dbt_model/flows.py b/pipelines/utils/execute_dbt_model/flows.py index 2ba8b5280..a36108ccb 100644 --- a/pipelines/utils/execute_dbt_model/flows.py +++ b/pipelines/utils/execute_dbt_model/flows.py @@ -19,7 +19,7 @@ table_id = Parameter("table_id") mode = Parameter("mode", default="dev", required=False) dbt_alias = Parameter("dbt_alias", default=False, required=False) - dbt_command = Parameter("dbt_command", default='run', required=False) + dbt_command = Parameter("dbt_command", default="run", required=False) ################# #################### # @@ -40,10 +40,9 @@ table_id=table_id, dbt_alias=dbt_alias, sync=True, - dbt_command=dbt_command + dbt_command=dbt_command, ) - run_dbt_model_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) run_dbt_model_flow.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) diff --git a/pipelines/utils/execute_dbt_model/tasks.py b/pipelines/utils/execute_dbt_model/tasks.py index df2a49f6a..1092232ba 100644 --- a/pipelines/utils/execute_dbt_model/tasks.py +++ b/pipelines/utils/execute_dbt_model/tasks.py @@ -8,10 +8,10 @@ from dbt_client import DbtClient from prefect import task -from pipelines.utils.utils import log from pipelines.constants import constants from pipelines.utils.execute_dbt_model.utils import get_dbt_client +from pipelines.utils.utils import log @task( @@ -55,7 +55,7 @@ def run_dbt_model( if dbt_alias: table_id = f"{dataset_id}__{table_id}" - if 'run' in dbt_command: + if "run" in dbt_command: logs_dict = dbt_client.cli( f"run --models {dataset_id}.{table_id}", sync=sync, @@ -68,8 +68,8 @@ def run_dbt_model( if "On model" in event["message"]: log(event["message"]) - if 'test' in dbt_command: - logs_dict = dbt_client.cli( + if "test" in dbt_command: + logs_dict = dbt_client.cli( f"test --models {dataset_id}.{table_id}", sync=sync, logs=True, diff --git a/pipelines/utils/execute_dbt_model/utils.py b/pipelines/utils/execute_dbt_model/utils.py index 20fe8d8f9..6962ef569 100644 --- a/pipelines/utils/execute_dbt_model/utils.py +++ b/pipelines/utils/execute_dbt_model/utils.py @@ -60,4 +60,4 @@ def generate_execute_dbt_model_schedules( # pylint: disable=too-many-arguments, parameter_defaults=parameter_defaults, ) ) - return clocks \ No newline at end of file + return clocks From 347971426c14c0c324808e5b53038e69475b61b1 Mon Sep 17 00:00:00 2001 From: Laura Amaral Date: Thu, 28 Sep 2023 08:27:34 -0300 Subject: [PATCH 22/23] remove useless import --- pipelines/utils/execute_dbt_model/utils.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pipelines/utils/execute_dbt_model/utils.py b/pipelines/utils/execute_dbt_model/utils.py index 20fe8d8f9..7ee5d72b9 100644 --- a/pipelines/utils/execute_dbt_model/utils.py +++ b/pipelines/utils/execute_dbt_model/utils.py @@ -8,8 +8,6 @@ from dbt_client import DbtClient from prefect.schedules.clocks import IntervalClock -from pipelines.utils.utils import log - def get_dbt_client( host: str = "dbt-rpc", From 28c505566dbcb58226089226580d4e3c58f7b60f Mon Sep 17 00:00:00 2001 From: tricktx Date: Thu, 28 Sep 2023 11:01:26 -0300 Subject: [PATCH 23/23] register now --- pipelines/datasets/br_anp_precos_combustiveis/flows.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pipelines/datasets/br_anp_precos_combustiveis/flows.py b/pipelines/datasets/br_anp_precos_combustiveis/flows.py index 1fb522cd5..1fda2e806 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/flows.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/flows.py @@ -11,15 +11,11 @@ from prefect.tasks.prefect import create_flow_run, wait_for_flow_run from pipelines.constants import constants -from pipelines.datasets.br_anp_precos_combustiveis.constants import ( - constants as anatel_constants, -) from pipelines.datasets.br_anp_precos_combustiveis.schedules import ( every_week_anp_microdados, ) from pipelines.datasets.br_anp_precos_combustiveis.tasks import ( check_for_updates, - data_max_bd_mais, data_max_bd_pro, download_and_transform, make_partitions,