From 4ed659bee9f75918cf612950ddc940eb183ded7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Thu, 14 Sep 2023 16:54:53 -0300 Subject: [PATCH 01/15] feat: updating metadata --- pipelines/datasets/br_bcb_agencia/flows.py | 2 +- pipelines/datasets/br_bcb_estban/flows.py | 2 +- pipelines/datasets/br_me_comex_stat/flows.py | 106 ++---------------- .../br_ons_avaliacao_operacao/flows.py | 13 ++- .../br_ons_estimativa_custos/flows.py | 10 +- 5 files changed, 31 insertions(+), 102 deletions(-) diff --git a/pipelines/datasets/br_bcb_agencia/flows.py b/pipelines/datasets/br_bcb_agencia/flows.py index d0a19d686..51c59ac92 100644 --- a/pipelines/datasets/br_bcb_agencia/flows.py +++ b/pipelines/datasets/br_bcb_agencia/flows.py @@ -23,7 +23,7 @@ from pipelines.constants import constants from pipelines.utils.decorators import Flow from pipelines.utils.constants import constants as utils_constants -from pipelines.utils.metadata.flows import update_django_metadata +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, diff --git a/pipelines/datasets/br_bcb_estban/flows.py b/pipelines/datasets/br_bcb_estban/flows.py index 324307a75..0ba71346d 100644 --- a/pipelines/datasets/br_bcb_estban/flows.py +++ b/pipelines/datasets/br_bcb_estban/flows.py @@ -30,7 +30,7 @@ constants as br_bcb_estban_constants, ) from pipelines.utils.decorators import Flow -from pipelines.utils.metadata.flows import update_django_metadata +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, diff --git a/pipelines/datasets/br_me_comex_stat/flows.py b/pipelines/datasets/br_me_comex_stat/flows.py index 6c7475bce..5182da5d1 100644 --- a/pipelines/datasets/br_me_comex_stat/flows.py +++ b/pipelines/datasets/br_me_comex_stat/flows.py @@ -9,7 +9,7 @@ from prefect.run_configs import KubernetesRun from prefect.storage import GCS from prefect.tasks.prefect import create_flow_run, wait_for_flow_run - +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.constants import constants from pipelines.datasets.br_me_comex_stat.tasks import ( download_br_me_comex_stat, @@ -20,7 +20,6 @@ from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants from pipelines.utils.tasks import ( - update_django_metadata, rename_current_flow_run_dataset_table, get_current_flow_labels, create_table_and_upload_to_gcs, @@ -104,36 +103,6 @@ wait_for_materialization.retry_delay = timedelta( seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - - # materialize municipio_exportacao_atualizado - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id}", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) # coverage updater with case(update_metadata, True): update = update_django_metadata( @@ -142,22 +111,13 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, + is_bd_pro=True, + is_free=True, api_mode="prod", billing_project_id="basedosdados", date_format="yy-mm", - ) - - with case(update_metadata, True): - update_django_metadata( - dataset_id, - table_id + "_atualizado", - metadata_type="DateTimeRange", - bq_last_update=False, - bq_table_last_year_month=True, - api_mode="prod", - billing_project_id="basedosdados", - date_format="yy-mm", - upstream_tasks=[update], + time_delta=1, + time_unit="months", ) br_comex_municipio_exportacao.storage = GCS(constants.GCS_FLOWS_BUCKET.value) @@ -242,63 +202,15 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, + is_bd_pro=True, + is_free=True, api_mode="prod", billing_project_id="basedosdados", date_format="yy-mm", + time_delta=1, + time_unit="months", ) - # materialize municipio_importacao_atualizado - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id}", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): - update = update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_last_update=False, - bq_table_last_year_month=True, - api_mode="prod", - billing_project_id="basedosdados", - date_format="yy-mm", - ) - with case(update_metadata, True): - update_django_metadata( - dataset_id, - table_id + "_atualizado", - metadata_type="DateTimeRange", - bq_last_update=False, - bq_table_last_year_month=True, - api_mode="prod", - billing_project_id="basedosdados", - date_format="yy-mm", - upstream_tasks=[update], - ) br_comex_municipio_importacao.storage = GCS(constants.GCS_FLOWS_BUCKET.value) br_comex_municipio_importacao.run_config = KubernetesRun( diff --git a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py index 4144f267a..02f3332a0 100644 --- a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py +++ b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py @@ -9,7 +9,7 @@ from prefect.run_configs import KubernetesRun from prefect.storage import GCS from prefect.tasks.prefect import create_flow_run, wait_for_flow_run - +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.constants import constants from pipelines.datasets.br_ons_avaliacao_operacao.tasks import ( download_data, @@ -25,7 +25,6 @@ rename_current_flow_run_dataset_table, get_current_flow_labels, create_table_and_upload_to_gcs, - update_django_metadata, ) from pipelines.datasets.br_ons_avaliacao_operacao.schedules import ( @@ -113,6 +112,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], @@ -201,6 +202,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], @@ -292,6 +295,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], @@ -384,6 +389,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], @@ -477,6 +484,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], diff --git a/pipelines/datasets/br_ons_estimativa_custos/flows.py b/pipelines/datasets/br_ons_estimativa_custos/flows.py index 1fe8d4c32..f4db80454 100644 --- a/pipelines/datasets/br_ons_estimativa_custos/flows.py +++ b/pipelines/datasets/br_ons_estimativa_custos/flows.py @@ -18,6 +18,7 @@ from pipelines.datasets.br_ons_estimativa_custos.constants import ( constants as ons_constants, ) +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants @@ -25,7 +26,6 @@ rename_current_flow_run_dataset_table, get_current_flow_labels, create_table_and_upload_to_gcs, - update_django_metadata, ) from pipelines.datasets.br_ons_estimativa_custos.schedules import ( @@ -113,6 +113,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], @@ -206,6 +208,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], @@ -299,6 +303,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], @@ -393,6 +399,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], From b6b9efe1536af7372e9e7da59f60e024d416f093 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Thu, 14 Sep 2023 19:57:22 -0300 Subject: [PATCH 02/15] feat: removing _atualizado tables --- pipelines/datasets/br_me_comex_stat/flows.py | 91 ++------------------ 1 file changed, 8 insertions(+), 83 deletions(-) diff --git a/pipelines/datasets/br_me_comex_stat/flows.py b/pipelines/datasets/br_me_comex_stat/flows.py index 5182da5d1..8dfd18e80 100644 --- a/pipelines/datasets/br_me_comex_stat/flows.py +++ b/pipelines/datasets/br_me_comex_stat/flows.py @@ -276,35 +276,6 @@ run_name=f"Materialize {dataset_id}.{table_id}", ) - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - # materialize ncm_exportacao - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id}", - ) - wait_for_materialization = wait_for_flow_run( materialization_flow, stream_states=True, @@ -324,23 +295,15 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, + is_bd_pro=True, + is_free=True, api_mode="prod", billing_project_id="basedosdados", date_format="yy-mm", + time_delta=1, + time_unit="months", ) - with case(update_metadata, True): - update_django_metadata( - dataset_id, - table_id + "_atualizado", - metadata_type="DateTimeRange", - bq_last_update=False, - bq_table_last_year_month=True, - api_mode="prod", - billing_project_id="basedosdados", - date_format="yy-mm", - upstream_tasks=[update], - ) br_comex_ncm_exportacao.storage = GCS(constants.GCS_FLOWS_BUCKET.value) br_comex_ncm_exportacao.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) @@ -416,35 +379,6 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # materialize ncm_importacao_atualizado - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id}", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) with case(update_metadata, True): update = update_django_metadata( dataset_id, @@ -452,22 +386,13 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, + is_bd_pro=True, + is_free=True, api_mode="prod", billing_project_id="basedosdados", date_format="yy-mm", - ) - - with case(update_metadata, True): - update_django_metadata( - dataset_id, - table_id + "_atualizado", - metadata_type="DateTimeRange", - bq_last_update=False, - bq_table_last_year_month=True, - api_mode="prod", - billing_project_id="basedosdados", - date_format="yy-mm", - upstream_tasks=[update], + time_delta=1, + time_unit="months", ) br_comex_ncm_importacao.storage = GCS(constants.GCS_FLOWS_BUCKET.value) From a1e5256899349cea2a41cbe57dc28861c9a52b37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Thu, 14 Sep 2023 20:18:34 -0300 Subject: [PATCH 03/15] test: testing model registration --- pipelines/utils/metadata/tasks.py | 2 +- pipelines/utils/metadata/utils.py | 15 --------------- 2 files changed, 1 insertion(+), 16 deletions(-) diff --git a/pipelines/utils/metadata/tasks.py b/pipelines/utils/metadata/tasks.py index 72e22cccd..fc9b73058 100644 --- a/pipelines/utils/metadata/tasks.py +++ b/pipelines/utils/metadata/tasks.py @@ -44,7 +44,7 @@ def update_django_metadata( time_unit: str = "days", ): """ - Updates Django metadata. + Updates Django metadata. Version 1.2. Args: - `dataset_id (str):` The ID of the dataset. diff --git a/pipelines/utils/metadata/utils.py b/pipelines/utils/metadata/utils.py index 61f30024e..34fc84b55 100644 --- a/pipelines/utils/metadata/utils.py +++ b/pipelines/utils/metadata/utils.py @@ -474,21 +474,6 @@ def create_update( def parse_temporal_coverage(temporal_coverage): - padrao_ano = r"\d{4}\(\d{1,2}\)\d{4}" - padrao_mes = r"\d{4}-\d{2}\(\d{1,2}\)\d{4}-\d{2}" - padrao_semana = r"\d{4}-\d{2}-\d{2}\(\d{1,2}\)\d{4}-\d{2}-\d{2}" - padrao_dia = r"\d{4}-\d{2}-\d{2}\(\d{1,2}\)\d{4}-\d{2}-\d{2}" - - if ( - re.match(padrao_ano, temporal_coverage) - or re.match(padrao_mes, temporal_coverage) - or re.match(padrao_semana, temporal_coverage) - or re.match(padrao_dia, temporal_coverage) - ): - print("A data está no formato correto.") - else: - print("Aviso: A data não está no formato correto.") - # Extrai as informações de data e intervalo da string if "(" in temporal_coverage: start_str, interval_str, end_str = re.split(r"[(|)]", temporal_coverage) From feeb7dbe0e6a98faf849c14e1a4c841190742479 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Thu, 14 Sep 2023 21:24:23 -0300 Subject: [PATCH 04/15] fix: fix error message --- pipelines/utils/metadata/tasks.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/pipelines/utils/metadata/tasks.py b/pipelines/utils/metadata/tasks.py index fc9b73058..86f198ee4 100644 --- a/pipelines/utils/metadata/tasks.py +++ b/pipelines/utils/metadata/tasks.py @@ -107,17 +107,11 @@ def update_django_metadata( "weeks": "weeks", "days": "days", } - if not isinstance(_last_date, str): - raise ValueError("O parâmetro `last_date` deve ser do tipo string") - if time_unit not in unidades_permitidas: raise ValueError( f"Unidade temporal inválida. Escolha entre {', '.join(unidades_permitidas.keys())}" ) - if not isinstance(time_delta, int) or time_delta <= 0: - raise ValueError("Defasagem deve ser um número inteiro positivo") - if billing_project_id not in accepted_billing_project_id: raise Exception( f"The given billing_project_id: {billing_project_id} is invalid. The accepted valuesare {accepted_billing_project_id}" @@ -164,6 +158,8 @@ def update_django_metadata( api_mode=api_mode, ) elif is_bd_pro and is_free: + if not isinstance(time_delta, int) or time_delta <= 0: + raise ValueError("Defasagem deve ser um número inteiro positivo") last_date = extract_last_update( dataset_id, table_id, @@ -400,6 +396,9 @@ def update_django_metadata( api_mode=api_mode, ) else: + if not isinstance(_last_date, str): + raise ValueError("O parâmetro `last_date` deve ser do tipo string") + if is_free and not is_bd_pro: last_date = _last_date resource_to_temporal_coverage = parse_temporal_coverage(f"{_last_date}") From e000f3a895a7b0d3398b000d605e8aad890fa238 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Thu, 14 Sep 2023 22:21:41 -0300 Subject: [PATCH 05/15] feat: changing --- pipelines/datasets/br_bcb_agencia/flows.py | 2 +- pipelines/datasets/br_bcb_estban/flows.py | 4 ++-- pipelines/datasets/br_me_comex_stat/flows.py | 8 ++++---- pipelines/datasets/br_ons_avaliacao_operacao/flows.py | 10 +++++----- pipelines/datasets/br_ons_estimativa_custos/flows.py | 8 ++++---- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/pipelines/datasets/br_bcb_agencia/flows.py b/pipelines/datasets/br_bcb_agencia/flows.py index 51c59ac92..7a9ed006f 100644 --- a/pipelines/datasets/br_bcb_agencia/flows.py +++ b/pipelines/datasets/br_bcb_agencia/flows.py @@ -112,7 +112,7 @@ bq_last_update=False, bq_table_last_year_month=True, api_mode="prod", - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", date_format="yy-mm", is_bd_pro=True, is_free=True, diff --git a/pipelines/datasets/br_bcb_estban/flows.py b/pipelines/datasets/br_bcb_estban/flows.py index 0ba71346d..e5734105e 100644 --- a/pipelines/datasets/br_bcb_estban/flows.py +++ b/pipelines/datasets/br_bcb_estban/flows.py @@ -122,7 +122,7 @@ bq_last_update=False, bq_table_last_year_month=True, api_mode="prod", - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", date_format="yy-mm", is_bd_pro=True, is_free=True, @@ -221,7 +221,7 @@ bq_last_update=False, bq_table_last_year_month=True, api_mode="prod", - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", date_format="yy-mm", is_bd_pro=True, is_free=True, diff --git a/pipelines/datasets/br_me_comex_stat/flows.py b/pipelines/datasets/br_me_comex_stat/flows.py index 8dfd18e80..da1faa25a 100644 --- a/pipelines/datasets/br_me_comex_stat/flows.py +++ b/pipelines/datasets/br_me_comex_stat/flows.py @@ -114,7 +114,7 @@ is_bd_pro=True, is_free=True, api_mode="prod", - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", date_format="yy-mm", time_delta=1, time_unit="months", @@ -205,7 +205,7 @@ is_bd_pro=True, is_free=True, api_mode="prod", - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", date_format="yy-mm", time_delta=1, time_unit="months", @@ -298,7 +298,7 @@ is_bd_pro=True, is_free=True, api_mode="prod", - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", date_format="yy-mm", time_delta=1, time_unit="months", @@ -389,7 +389,7 @@ is_bd_pro=True, is_free=True, api_mode="prod", - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", date_format="yy-mm", time_delta=1, time_unit="months", diff --git a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py index 02f3332a0..4cd3fd280 100644 --- a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py +++ b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py @@ -111,7 +111,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", is_bd_pro=True, is_free=False, api_mode="prod", @@ -201,7 +201,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", is_bd_pro=True, is_free=False, api_mode="prod", @@ -294,7 +294,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", is_bd_pro=True, is_free=False, api_mode="prod", @@ -388,7 +388,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", is_bd_pro=True, is_free=False, api_mode="prod", @@ -483,7 +483,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", is_bd_pro=True, is_free=False, api_mode="prod", diff --git a/pipelines/datasets/br_ons_estimativa_custos/flows.py b/pipelines/datasets/br_ons_estimativa_custos/flows.py index f4db80454..46bd0ad6f 100644 --- a/pipelines/datasets/br_ons_estimativa_custos/flows.py +++ b/pipelines/datasets/br_ons_estimativa_custos/flows.py @@ -112,7 +112,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", is_bd_pro=True, is_free=False, api_mode="prod", @@ -207,7 +207,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", is_bd_pro=True, is_free=False, api_mode="prod", @@ -302,7 +302,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", is_bd_pro=True, is_free=False, api_mode="prod", @@ -398,7 +398,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", is_bd_pro=True, is_free=False, api_mode="prod", From 41abe31aaaebd3427faea36564e3d39c27a9f834 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Fri, 15 Sep 2023 01:35:46 -0300 Subject: [PATCH 06/15] feat: registering more flows --- .../br_anatel_banda_larga_fixa/flows.py | 152 +++--------------- .../br_anatel_telefonia_movel/flows.py | 151 +++-------------- .../br_anp_precos_combustiveis/flows.py | 49 +----- pipelines/datasets/br_b3_cotacoes/flows.py | 5 +- .../br_rj_isp_estatisticas_seguranca/flows.py | 26 ++- 5 files changed, 85 insertions(+), 298 deletions(-) diff --git a/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py b/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py index 83cff365b..aa9c945ce 100644 --- a/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py +++ b/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py @@ -9,7 +9,7 @@ from prefect.storage import GCS from prefect.tasks.prefect import create_flow_run, wait_for_flow_run from pipelines.constants import constants -from pipelines.utils.tasks import update_django_metadata +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants @@ -110,44 +110,18 @@ wait_for_materialization.retry_delay = timedelta( seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - - # ! tabela bd pro - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[0] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[0]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[0] + "_atualizado", + table_id[0], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, @@ -192,43 +166,18 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # ! tabela bd pro - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[1] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[1]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[1] + "_atualizado", + table_id[1], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, @@ -275,43 +224,18 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # ! tabela bd pro - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[2] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[2]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[2] + "_atualizado", + table_id[2], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, @@ -357,44 +281,18 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # ! tabela bd pro - - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[3] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[3]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[3] + "_atualizado", + table_id[3], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, diff --git a/pipelines/datasets/br_anatel_telefonia_movel/flows.py b/pipelines/datasets/br_anatel_telefonia_movel/flows.py index c2a8b75f1..94da6409e 100644 --- a/pipelines/datasets/br_anatel_telefonia_movel/flows.py +++ b/pipelines/datasets/br_anatel_telefonia_movel/flows.py @@ -10,7 +10,7 @@ from prefect.tasks.prefect import create_flow_run, wait_for_flow_run from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants -from pipelines.utils.tasks import update_django_metadata +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.constants import constants as utils_constants from pipelines.constants import constants from pipelines.datasets.br_anatel_telefonia_movel.constants import ( @@ -118,43 +118,18 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # ! tabela bd pro - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[0] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[0]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[0] + "_atualizado", + table_id[0], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, @@ -200,43 +175,18 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # ! tabela bd pro - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[1] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[1]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[1] + "_atualizado", + table_id[1], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, @@ -284,43 +234,18 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # ! tabela bd pro - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[2] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[2]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[2] + "_atualizado", + table_id[2], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, @@ -367,44 +292,18 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # ! tabela bd pro - - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[3] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[3]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[3] + "_atualizado", + table_id[3], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, diff --git a/pipelines/datasets/br_anp_precos_combustiveis/flows.py b/pipelines/datasets/br_anp_precos_combustiveis/flows.py index ce81cd827..64e3c564d 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/flows.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/flows.py @@ -9,7 +9,7 @@ from prefect.storage import GCS from prefect.tasks.prefect import create_flow_run, wait_for_flow_run from pipelines.constants import constants -from pipelines.utils.tasks import update_django_metadata +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants @@ -101,50 +101,13 @@ table_id, metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=True, api_mode="prod", date_format="yy-mm-dd", - _last_date=get_date_max_mais, - upstream_tasks=[wait_upload_table], - ) - - # ! BD PRO - Atualizado - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id}" "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - - with case(update_metadata, True): - update_django_metadata( - dataset_id, - table_id + "_atualizado", - metadata_type="DateTimeRange", - bq_last_update=False, - api_mode="prod", - date_format="yy-mm-dd", + is_bd_pro=True, + is_free=True, + time_delta=6, + time_unit="weeks", _last_date=get_date_max_pro, upstream_tasks=[wait_upload_table], ) diff --git a/pipelines/datasets/br_b3_cotacoes/flows.py b/pipelines/datasets/br_b3_cotacoes/flows.py index d3ededa47..8cbdd144d 100644 --- a/pipelines/datasets/br_b3_cotacoes/flows.py +++ b/pipelines/datasets/br_b3_cotacoes/flows.py @@ -8,7 +8,7 @@ from prefect.run_configs import KubernetesRun from prefect.storage import GCS from prefect.tasks.prefect import create_flow_run, wait_for_flow_run -from pipelines.utils.tasks import update_django_metadata +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.constants import constants from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow @@ -101,6 +101,9 @@ table_id, metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", _last_date=data_max, diff --git a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py index 34a31534a..f9062fcd4 100644 --- a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py +++ b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py @@ -11,7 +11,7 @@ wait_for_flow_run, ) from pipelines.constants import constants -from pipelines.utils.tasks import update_django_metadata +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.constants import constants as utils_constants from pipelines.datasets.br_rj_isp_estatisticas_seguranca.tasks import ( download_files, @@ -123,7 +123,10 @@ dataset_id, table_id, metadata_type="DateTimeRange", + bq_table_last_year_month=False, bq_last_update=False, + is_bd_pro=False, + is_free=True, api_mode="prod", date_format="yy-mm", _last_date=date, @@ -214,7 +217,10 @@ dataset_id, table_id, metadata_type="DateTimeRange", + bq_table_last_year_month=False, bq_last_update=False, + is_bd_pro=False, + is_free=True, api_mode="prod", date_format="yy-mm", _last_date=date, @@ -307,7 +313,10 @@ dataset_id, table_id, metadata_type="DateTimeRange", + bq_table_last_year_month=False, bq_last_update=False, + is_bd_pro=False, + is_free=True, api_mode="prod", date_format="yy-mm", _last_date=date, @@ -401,7 +410,10 @@ dataset_id, table_id, metadata_type="DateTimeRange", + bq_table_last_year_month=False, bq_last_update=False, + is_bd_pro=False, + is_free=True, api_mode="prod", date_format="yy-mm", _last_date=date, @@ -495,7 +507,10 @@ dataset_id, table_id, metadata_type="DateTimeRange", + bq_table_last_year_month=False, bq_last_update=False, + is_bd_pro=False, + is_free=True, api_mode="prod", date_format="yy-mm", _last_date=date, @@ -590,7 +605,10 @@ dataset_id, table_id, metadata_type="DateTimeRange", + bq_table_last_year_month=False, bq_last_update=False, + is_bd_pro=False, + is_free=True, api_mode="prod", date_format="yy-mm", _last_date=date, @@ -681,7 +699,10 @@ dataset_id, table_id, metadata_type="DateTimeRange", + bq_table_last_year_month=False, bq_last_update=False, + is_bd_pro=False, + is_free=True, api_mode="prod", date_format="yy-mm", _last_date=date, @@ -772,7 +793,10 @@ dataset_id, table_id, metadata_type="DateTimeRange", + bq_table_last_year_month=False, bq_last_update=False, + is_bd_pro=False, + is_free=True, api_mode="prod", date_format="yy-mm", _last_date=date, From 795eefd05c85e8db38540d23a2d350860ac51f1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Fri, 15 Sep 2023 01:54:04 -0300 Subject: [PATCH 07/15] fix: fix log --- pipelines/utils/metadata/tasks.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pipelines/utils/metadata/tasks.py b/pipelines/utils/metadata/tasks.py index 86f198ee4..d440e1066 100644 --- a/pipelines/utils/metadata/tasks.py +++ b/pipelines/utils/metadata/tasks.py @@ -209,7 +209,7 @@ def update_django_metadata( ] = resource_to_temporal_coverage_free["endYear"] log( - f"Cobertura PRO ->> {_last_date} || Cobertura Grátis ->> {free_data}" + f"Cobertura PRO ->> {last_date} || Cobertura Grátis ->> {free_data}" ) # resource_to_temporal_coverage = parse_temporal_coverage(f"{last_date}") @@ -248,7 +248,7 @@ def update_django_metadata( date_format, billing_project_id=billing_project_id, ) - log(f"Cobertura PRO ->> {_last_date}") + log(f"Cobertura PRO ->> {last_date}") resource_to_temporal_coverage = parse_temporal_coverage(f"{last_date}") resource_to_temporal_coverage["coverage"] = ids.get("coverage_id_pro") @@ -340,7 +340,7 @@ def update_django_metadata( ] = resource_to_temporal_coverage_free["endYear"] log( - f"Cobertura PRO ->> {_last_date} || Cobertura Grátis ->> {free_data}" + f"Cobertura PRO ->> {last_date} || Cobertura Grátis ->> {free_data}" ) # resource_to_temporal_coverage = parse_temporal_coverage(f"{last_date}") @@ -379,7 +379,7 @@ def update_django_metadata( date_format, billing_project_id=billing_project_id, ) - log(f"Cobertura PRO ->> {_last_date}") + log(f"Cobertura PRO ->> {last_date}") resource_to_temporal_coverage = parse_temporal_coverage(f"{last_date}") resource_to_temporal_coverage["coverage"] = ids.get("coverage_id_pro") From a5084262433e36abde8411f5aedc2f54972b4879 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Fri, 15 Sep 2023 15:52:32 -0300 Subject: [PATCH 08/15] fx: small fix --- .../br_rj_isp_estatisticas_seguranca/flows.py | 224 +++++++++--------- 1 file changed, 112 insertions(+), 112 deletions(-) diff --git a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py index f9062fcd4..8e0f7d7bd 100644 --- a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py +++ b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py @@ -117,20 +117,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_table_last_year_month=False, - bq_last_update=False, - is_bd_pro=False, - is_free=True, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) evolucao_mensal_cisp.storage = GCS(constants.GCS_FLOWS_BUCKET.value) evolucao_mensal_cisp.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) @@ -211,20 +211,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_table_last_year_month=False, - bq_last_update=False, - is_bd_pro=False, - is_free=True, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) taxa_evolucao_mensal_uf.storage = GCS(constants.GCS_FLOWS_BUCKET.value) taxa_evolucao_mensal_uf.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) @@ -307,20 +307,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_table_last_year_month=False, - bq_last_update=False, - is_bd_pro=False, - is_free=True, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) taxa_evolucao_mensal_municipio.storage = GCS(constants.GCS_FLOWS_BUCKET.value) @@ -404,20 +404,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_table_last_year_month=False, - bq_last_update=False, - is_bd_pro=False, - is_free=True, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) feminicidio_mensal_cisp.storage = GCS(constants.GCS_FLOWS_BUCKET.value) @@ -501,20 +501,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_table_last_year_month=False, - bq_last_update=False, - is_bd_pro=False, - is_free=True, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) evolucao_policial_morto_servico_mensal.storage = GCS(constants.GCS_FLOWS_BUCKET.value) evolucao_policial_morto_servico_mensal.run_config = KubernetesRun( @@ -599,20 +599,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_table_last_year_month=False, - bq_last_update=False, - is_bd_pro=False, - is_free=True, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) armas_apreendidas_mensal.storage = GCS(constants.GCS_FLOWS_BUCKET.value) armas_apreendidas_mensal.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) @@ -693,20 +693,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_table_last_year_month=False, - bq_last_update=False, - is_bd_pro=False, - is_free=True, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) evolucao_mensal_municipio.storage = GCS(constants.GCS_FLOWS_BUCKET.value) evolucao_mensal_municipio.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) @@ -787,20 +787,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_table_last_year_month=False, - bq_last_update=False, - is_bd_pro=False, - is_free=True, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) evolucao_mensal_uf.storage = GCS(constants.GCS_FLOWS_BUCKET.value) evolucao_mensal_uf.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) From ceab71e5b72e1ff6636134d86b26a1763a5f09a3 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 22 Sep 2023 20:54:31 +0000 Subject: [PATCH 09/15] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pipelines/utils/metadata/tasks.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pipelines/utils/metadata/tasks.py b/pipelines/utils/metadata/tasks.py index dbdda6c62..95670f891 100644 --- a/pipelines/utils/metadata/tasks.py +++ b/pipelines/utils/metadata/tasks.py @@ -108,11 +108,9 @@ def update_django_metadata( "days": "days", } - if not isinstance(_last_date, str) and _last_date is not None: raise ValueError("O parâmetro `last_date` deve ser uma string não nula") - if time_unit not in unidades_permitidas: raise ValueError( f"Unidade temporal inválida. Escolha entre {', '.join(unidades_permitidas.keys())}" From f06efd475681a0e2ebbbc30ebe897080105b4361 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Mon, 25 Sep 2023 12:09:09 -0300 Subject: [PATCH 10/15] feat: making requested changes --- pipelines/datasets/br_ons_avaliacao_operacao/flows.py | 10 +++++----- pipelines/datasets/br_ons_estimativa_custos/flows.py | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py index 4cd3fd280..02f3332a0 100644 --- a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py +++ b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py @@ -111,7 +111,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", is_bd_pro=True, is_free=False, api_mode="prod", @@ -201,7 +201,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", is_bd_pro=True, is_free=False, api_mode="prod", @@ -294,7 +294,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", is_bd_pro=True, is_free=False, api_mode="prod", @@ -388,7 +388,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", is_bd_pro=True, is_free=False, api_mode="prod", @@ -483,7 +483,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", is_bd_pro=True, is_free=False, api_mode="prod", diff --git a/pipelines/datasets/br_ons_estimativa_custos/flows.py b/pipelines/datasets/br_ons_estimativa_custos/flows.py index 46bd0ad6f..f4db80454 100644 --- a/pipelines/datasets/br_ons_estimativa_custos/flows.py +++ b/pipelines/datasets/br_ons_estimativa_custos/flows.py @@ -112,7 +112,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", is_bd_pro=True, is_free=False, api_mode="prod", @@ -207,7 +207,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", is_bd_pro=True, is_free=False, api_mode="prod", @@ -302,7 +302,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", is_bd_pro=True, is_free=False, api_mode="prod", @@ -398,7 +398,7 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", is_bd_pro=True, is_free=False, api_mode="prod", From 75c5bb1eb940c4047d347019866ea62f2769519d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Mon, 25 Sep 2023 14:30:13 -0300 Subject: [PATCH 11/15] fix: fixing identation --- .../br_anp_precos_combustiveis/flows.py | 24 ++++--------------- 1 file changed, 5 insertions(+), 19 deletions(-) diff --git a/pipelines/datasets/br_anp_precos_combustiveis/flows.py b/pipelines/datasets/br_anp_precos_combustiveis/flows.py index 4a3c2a15e..c971d4dfa 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/flows.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/flows.py @@ -71,23 +71,6 @@ output_path = make_partitions(df=df, upstream_tasks=[df]) get_date_max_pro = data_max_bd_pro(df=df) - - with case(update_metadata, True): - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_last_update=False, - bq_table_last_year_month=True, - api_mode="prod", - date_format="yy-mm-dd", - is_bd_pro=True, - is_free=True, - time_delta=6, - time_unit="weeks", - _last_date=get_date_max_pro, - upstream_tasks=[wait_upload_table], - # pylint: disable=C0103 wait_upload_table = create_table_and_upload_to_gcs( data_path=output_path, @@ -121,7 +104,6 @@ ) wait_for_materialization.max_retries = ( dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) wait_for_materialization.retry_delay = timedelta( seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value @@ -133,12 +115,16 @@ table_id, metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=True, api_mode="prod", date_format="yy-mm-dd", + is_bd_pro=True, + is_free=True, + time_delta=6, + time_unit="weeks", _last_date=get_date_max_pro, upstream_tasks=[wait_upload_table], ) - anp_microdados.storage = GCS(constants.GCS_FLOWS_BUCKET.value) anp_microdados.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) anp_microdados.schedule = every_week_anp_microdados From 0aa3429c05f407ed2b8ab02f823a6dbb6ad1602a Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 27 Sep 2023 19:17:13 +0000 Subject: [PATCH 12/15] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pipelines/datasets/br_anatel_banda_larga_fixa/flows.py | 8 +------- pipelines/datasets/br_anatel_telefonia_movel/flows.py | 7 +------ pipelines/datasets/br_anp_precos_combustiveis/flows.py | 9 +-------- pipelines/datasets/br_b3_cotacoes/flows.py | 5 +---- pipelines/datasets/br_bcb_agencia/flows.py | 5 +---- pipelines/datasets/br_bcb_estban/flows.py | 5 +---- pipelines/datasets/br_me_comex_stat/flows.py | 8 ++------ pipelines/datasets/br_ons_avaliacao_operacao/flows.py | 5 ++--- pipelines/datasets/br_ons_estimativa_custos/flows.py | 8 +------- .../datasets/br_rj_isp_estatisticas_seguranca/flows.py | 6 +----- 10 files changed, 12 insertions(+), 54 deletions(-) diff --git a/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py b/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py index 932b0de72..45d405c70 100644 --- a/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py +++ b/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py @@ -11,16 +11,9 @@ from prefect.tasks.prefect import create_flow_run, wait_for_flow_run from pipelines.constants import constants - -from pipelines.utils.metadata.tasks import update_django_metadata -from pipelines.utils.constants import constants as utils_constants -from pipelines.utils.decorators import Flow -from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants - from pipelines.datasets.br_anatel_banda_larga_fixa.schedules import ( every_month_anatel_microdados, ) - from pipelines.datasets.br_anatel_banda_larga_fixa.tasks import ( get_today_date_atualizado, treatment, @@ -31,6 +24,7 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, diff --git a/pipelines/datasets/br_anatel_telefonia_movel/flows.py b/pipelines/datasets/br_anatel_telefonia_movel/flows.py index 04e520a5c..613efdc48 100644 --- a/pipelines/datasets/br_anatel_telefonia_movel/flows.py +++ b/pipelines/datasets/br_anatel_telefonia_movel/flows.py @@ -10,12 +10,6 @@ from prefect.storage import GCS from prefect.tasks.prefect import create_flow_run, wait_for_flow_run - -from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants -from pipelines.utils.metadata.tasks import update_django_metadata -from pipelines.utils.constants import constants as utils_constants - - from pipelines.constants import constants from pipelines.datasets.br_anatel_telefonia_movel.constants import ( constants as anatel_constants, @@ -31,6 +25,7 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, diff --git a/pipelines/datasets/br_anp_precos_combustiveis/flows.py b/pipelines/datasets/br_anp_precos_combustiveis/flows.py index 22b5a05cc..dfb561d4f 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/flows.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/flows.py @@ -11,20 +11,12 @@ from prefect.tasks.prefect import create_flow_run, wait_for_flow_run from pipelines.constants import constants - -from pipelines.utils.metadata.tasks import update_django_metadata -from pipelines.utils.constants import constants as utils_constants -from pipelines.utils.decorators import Flow -from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants -from pipelines.datasets.br_anp_precos_combustiveis.utils import download_files - from pipelines.datasets.br_anp_precos_combustiveis.constants import ( constants as anatel_constants, ) from pipelines.datasets.br_anp_precos_combustiveis.schedules import ( every_week_anp_microdados, ) - from pipelines.datasets.br_anp_precos_combustiveis.tasks import ( check_for_updates, data_max_bd_mais, @@ -36,6 +28,7 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, diff --git a/pipelines/datasets/br_b3_cotacoes/flows.py b/pipelines/datasets/br_b3_cotacoes/flows.py index 63982de33..0732e52b0 100644 --- a/pipelines/datasets/br_b3_cotacoes/flows.py +++ b/pipelines/datasets/br_b3_cotacoes/flows.py @@ -10,16 +10,13 @@ from prefect.storage import GCS from prefect.tasks.prefect import create_flow_run, wait_for_flow_run -from pipelines.utils.metadata.tasks import update_django_metadata - - - from pipelines.constants import constants from pipelines.datasets.br_b3_cotacoes.schedules import all_day_cotacoes from pipelines.datasets.br_b3_cotacoes.tasks import data_max_b3, tratamento from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, diff --git a/pipelines/datasets/br_bcb_agencia/flows.py b/pipelines/datasets/br_bcb_agencia/flows.py index d2a0c42d3..902d05ee3 100644 --- a/pipelines/datasets/br_bcb_agencia/flows.py +++ b/pipelines/datasets/br_bcb_agencia/flows.py @@ -11,13 +11,10 @@ from pipelines.datasets.br_bcb_agencia.schedules import every_month_agencia from pipelines.datasets.br_bcb_agencia.tasks import clean_data, download_data from pipelines.utils.constants import constants as utils_constants - -from pipelines.utils.metadata.tasks import update_django_metadata - from pipelines.utils.decorators import Flow - from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants from pipelines.utils.metadata.flows import update_django_metadata +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, diff --git a/pipelines/datasets/br_bcb_estban/flows.py b/pipelines/datasets/br_bcb_estban/flows.py index e9f2cbfef..d1e7d4511 100644 --- a/pipelines/datasets/br_bcb_estban/flows.py +++ b/pipelines/datasets/br_bcb_estban/flows.py @@ -26,12 +26,9 @@ ) from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow - -from pipelines.utils.metadata.tasks import update_django_metadata - - from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants from pipelines.utils.metadata.flows import update_django_metadata +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, diff --git a/pipelines/datasets/br_me_comex_stat/flows.py b/pipelines/datasets/br_me_comex_stat/flows.py index 97e6f1d21..ecd15f637 100644 --- a/pipelines/datasets/br_me_comex_stat/flows.py +++ b/pipelines/datasets/br_me_comex_stat/flows.py @@ -9,7 +9,7 @@ from prefect.run_configs import KubernetesRun from prefect.storage import GCS from prefect.tasks.prefect import create_flow_run, wait_for_flow_run -from pipelines.utils.metadata.tasks import update_django_metadata + from pipelines.constants import constants from pipelines.datasets.br_me_comex_stat.constants import constants as comex_constants from pipelines.datasets.br_me_comex_stat.schedules import ( @@ -25,12 +25,8 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( - - rename_current_flow_run_dataset_table, - get_current_flow_labels, - - create_table_and_upload_to_gcs, get_current_flow_labels, rename_current_flow_run_dataset_table, diff --git a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py index 991ac64aa..79c2d3738 100644 --- a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py +++ b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py @@ -9,7 +9,7 @@ from prefect.run_configs import KubernetesRun from prefect.storage import GCS from prefect.tasks.prefect import create_flow_run, wait_for_flow_run -from pipelines.utils.metadata.tasks import update_django_metadata + from pipelines.constants import constants from pipelines.datasets.br_ons_avaliacao_operacao.constants import ( constants as ons_constants, @@ -25,13 +25,12 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, - get_current_flow_labels, rename_current_flow_run_dataset_table, update_django_metadata, - ) with Flow( diff --git a/pipelines/datasets/br_ons_estimativa_custos/flows.py b/pipelines/datasets/br_ons_estimativa_custos/flows.py index c06b95a7a..f01ed1f30 100644 --- a/pipelines/datasets/br_ons_estimativa_custos/flows.py +++ b/pipelines/datasets/br_ons_estimativa_custos/flows.py @@ -14,9 +14,6 @@ from pipelines.datasets.br_ons_estimativa_custos.constants import ( constants as ons_constants, ) - -from pipelines.utils.metadata.tasks import update_django_metadata - from pipelines.datasets.br_ons_estimativa_custos.schedules import ( schedule_br_ons_estimativa_custos_balanco_energia_subsistemas, schedule_br_ons_estimativa_custos_balanco_energia_subsistemas_dessem, @@ -24,18 +21,15 @@ schedule_br_ons_estimativa_custos_custo_marginal_operacao_semi_horario, ) from pipelines.datasets.br_ons_estimativa_custos.tasks import download_data, wrang_data - from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, - - get_current_flow_labels, rename_current_flow_run_dataset_table, update_django_metadata, - ) with Flow( diff --git a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py index f47b7918d..1406ea903 100644 --- a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py +++ b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py @@ -10,10 +10,6 @@ from prefect.tasks.prefect import create_flow_run, wait_for_flow_run from pipelines.constants import constants - -from pipelines.utils.metadata.tasks import update_django_metadata -from pipelines.utils.constants import constants as utils_constants - from pipelines.datasets.br_rj_isp_estatisticas_seguranca.constants import ( constants as isp_constants, ) @@ -27,7 +23,6 @@ every_month_taxa_evolucao_mensal_municipio, every_month_taxa_evolucao_mensal_uf, ) - from pipelines.datasets.br_rj_isp_estatisticas_seguranca.tasks import ( clean_data, download_files, @@ -36,6 +31,7 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, From 8e5729847eb71deb919f976be20108d5beb1fa9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Wed, 27 Sep 2023 16:28:30 -0300 Subject: [PATCH 13/15] fix: changing billing project id --- pipelines/datasets/br_bcb_agencia/flows.py | 3 +-- pipelines/datasets/br_bcb_estban/flows.py | 5 ++--- pipelines/datasets/br_me_comex_stat/flows.py | 9 ++++----- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/pipelines/datasets/br_bcb_agencia/flows.py b/pipelines/datasets/br_bcb_agencia/flows.py index 902d05ee3..abab85f35 100644 --- a/pipelines/datasets/br_bcb_agencia/flows.py +++ b/pipelines/datasets/br_bcb_agencia/flows.py @@ -13,7 +13,6 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants -from pipelines.utils.metadata.flows import update_django_metadata from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, @@ -101,7 +100,7 @@ bq_last_update=False, bq_table_last_year_month=True, api_mode="prod", - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", date_format="yy-mm", is_bd_pro=True, is_free=True, diff --git a/pipelines/datasets/br_bcb_estban/flows.py b/pipelines/datasets/br_bcb_estban/flows.py index d1e7d4511..bc1490e22 100644 --- a/pipelines/datasets/br_bcb_estban/flows.py +++ b/pipelines/datasets/br_bcb_estban/flows.py @@ -27,7 +27,6 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants -from pipelines.utils.metadata.flows import update_django_metadata from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, @@ -119,7 +118,7 @@ bq_last_update=False, bq_table_last_year_month=True, api_mode="prod", - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", date_format="yy-mm", is_bd_pro=True, is_free=True, @@ -218,7 +217,7 @@ bq_last_update=False, bq_table_last_year_month=True, api_mode="prod", - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", date_format="yy-mm", is_bd_pro=True, is_free=True, diff --git a/pipelines/datasets/br_me_comex_stat/flows.py b/pipelines/datasets/br_me_comex_stat/flows.py index ecd15f637..1b5895cbe 100644 --- a/pipelines/datasets/br_me_comex_stat/flows.py +++ b/pipelines/datasets/br_me_comex_stat/flows.py @@ -30,7 +30,6 @@ create_table_and_upload_to_gcs, get_current_flow_labels, rename_current_flow_run_dataset_table, - update_django_metadata, ) with Flow( @@ -114,7 +113,7 @@ is_bd_pro=True, is_free=True, api_mode="prod", - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", date_format="yy-mm", time_delta=1, time_unit="months", @@ -205,7 +204,7 @@ is_bd_pro=True, is_free=True, api_mode="prod", - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", date_format="yy-mm", time_delta=1, time_unit="months", @@ -298,7 +297,7 @@ is_bd_pro=True, is_free=True, api_mode="prod", - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", date_format="yy-mm", time_delta=1, time_unit="months", @@ -389,7 +388,7 @@ is_bd_pro=True, is_free=True, api_mode="prod", - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", date_format="yy-mm", time_delta=1, time_unit="months", From a3016f763acf22df17d82ae7eb25dd49ff6ffb3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Wed, 27 Sep 2023 16:40:53 -0300 Subject: [PATCH 14/15] fix: changing billing project id again --- pipelines/datasets/br_anatel_banda_larga_fixa/flows.py | 1 - pipelines/datasets/br_anatel_telefonia_movel/flows.py | 1 - pipelines/datasets/br_anp_precos_combustiveis/flows.py | 1 - pipelines/datasets/br_b3_cotacoes/flows.py | 1 - pipelines/datasets/br_ons_avaliacao_operacao/flows.py | 1 - pipelines/datasets/br_ons_estimativa_custos/flows.py | 1 - 6 files changed, 6 deletions(-) diff --git a/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py b/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py index 45d405c70..adb9678dc 100644 --- a/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py +++ b/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py @@ -29,7 +29,6 @@ create_table_and_upload_to_gcs, get_current_flow_labels, rename_current_flow_run_dataset_table, - update_django_metadata, ) with Flow( diff --git a/pipelines/datasets/br_anatel_telefonia_movel/flows.py b/pipelines/datasets/br_anatel_telefonia_movel/flows.py index 613efdc48..d021bba7c 100644 --- a/pipelines/datasets/br_anatel_telefonia_movel/flows.py +++ b/pipelines/datasets/br_anatel_telefonia_movel/flows.py @@ -30,7 +30,6 @@ create_table_and_upload_to_gcs, get_current_flow_labels, rename_current_flow_run_dataset_table, - update_django_metadata, ) with Flow(name="br_anatel_telefonia_movel", code_owners=["tricktx"]) as br_anatel: diff --git a/pipelines/datasets/br_anp_precos_combustiveis/flows.py b/pipelines/datasets/br_anp_precos_combustiveis/flows.py index dfb561d4f..f8cc27c9d 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/flows.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/flows.py @@ -34,7 +34,6 @@ get_current_flow_labels, log_task, rename_current_flow_run_dataset_table, - update_django_metadata, ) with Flow( diff --git a/pipelines/datasets/br_b3_cotacoes/flows.py b/pipelines/datasets/br_b3_cotacoes/flows.py index 0732e52b0..22b2cbdd4 100644 --- a/pipelines/datasets/br_b3_cotacoes/flows.py +++ b/pipelines/datasets/br_b3_cotacoes/flows.py @@ -21,7 +21,6 @@ create_table_and_upload_to_gcs, get_current_flow_labels, rename_current_flow_run_dataset_table, - update_django_metadata, ) from pipelines.utils.utils import log diff --git a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py index 79c2d3738..9a8aea290 100644 --- a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py +++ b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py @@ -30,7 +30,6 @@ create_table_and_upload_to_gcs, get_current_flow_labels, rename_current_flow_run_dataset_table, - update_django_metadata, ) with Flow( diff --git a/pipelines/datasets/br_ons_estimativa_custos/flows.py b/pipelines/datasets/br_ons_estimativa_custos/flows.py index f01ed1f30..cf1b30eec 100644 --- a/pipelines/datasets/br_ons_estimativa_custos/flows.py +++ b/pipelines/datasets/br_ons_estimativa_custos/flows.py @@ -29,7 +29,6 @@ create_table_and_upload_to_gcs, get_current_flow_labels, rename_current_flow_run_dataset_table, - update_django_metadata, ) with Flow( From e56b915eb635bc5c723651fad5a52d0828a73843 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Wed, 27 Sep 2023 16:54:13 -0300 Subject: [PATCH 15/15] fix: changing billing project id again again --- pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py index 1406ea903..d70322502 100644 --- a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py +++ b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py @@ -36,7 +36,6 @@ create_table_and_upload_to_gcs, get_current_flow_labels, rename_current_flow_run_dataset_table, - update_django_metadata, ) # ! Evolucao_mensal_cisp