diff --git a/pipelines/datasets/br_inmet_bdmep/flows.py b/pipelines/datasets/br_inmet_bdmep/flows.py index 8f21d36e8..e2589a6c8 100644 --- a/pipelines/datasets/br_inmet_bdmep/flows.py +++ b/pipelines/datasets/br_inmet_bdmep/flows.py @@ -12,10 +12,11 @@ from pipelines.constants import constants from pipelines.datasets.br_inmet_bdmep.schedules import every_month_inmet -from pipelines.datasets.br_inmet_bdmep.tasks import get_base_inmet +from pipelines.datasets.br_inmet_bdmep.tasks import get_base_inmet, get_today_date from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, @@ -29,7 +30,8 @@ # Parameters dataset_id = Parameter("dataset_id", default="br_inmet_bdmep", required=True) table_id = Parameter("table_id", default="microdados", required=True) - year = Parameter("year", default=2023, required=True) + year = Parameter("year", default=2023, required=False) + update_metadata = Parameter("update_metadata", default=True, required=False) materialization_mode = Parameter( "materialization_mode", default="prod", required=False @@ -80,6 +82,21 @@ wait_for_materialization.retry_delay = timedelta( seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) + with case(update_metadata, True): + date = get_today_date() + update = update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_last_update=False, + bq_table_last_year_month=False, + _last_date=date, + api_mode="prod", + date_format="yy-mm", + is_bd_pro=False, + is_free=True, + upstream_tasks=[wait_for_materialization], + ) br_inmet.storage = GCS(constants.GCS_FLOWS_BUCKET.value) br_inmet.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) diff --git a/pipelines/datasets/br_inmet_bdmep/schedules.py b/pipelines/datasets/br_inmet_bdmep/schedules.py index ac5a6c842..4d14b73a2 100644 --- a/pipelines/datasets/br_inmet_bdmep/schedules.py +++ b/pipelines/datasets/br_inmet_bdmep/schedules.py @@ -77,6 +77,7 @@ from pipelines.constants import constants +d = datetime.today() every_month_inmet = Schedule( clocks=[ CronClock( @@ -88,9 +89,11 @@ parameter_defaults={ "dataset_id": "br_inmet_bdmep", "table_id": "microdados", - "materialization_mode": "dev", + "materialization_mode": "prod", "materialize_after_dump": True, "dbt_alias": True, + "year": d.strftime("%Y"), + "update_metadata": True, }, ), ], diff --git a/pipelines/datasets/br_inmet_bdmep/tasks.py b/pipelines/datasets/br_inmet_bdmep/tasks.py index d13785593..11af202a9 100644 --- a/pipelines/datasets/br_inmet_bdmep/tasks.py +++ b/pipelines/datasets/br_inmet_bdmep/tasks.py @@ -4,6 +4,7 @@ """ import glob import os +from datetime import datetime import numpy as np import pandas as pd @@ -29,8 +30,10 @@ def get_base_inmet(year: int) -> str: Retorna: - str: o caminho para o diretório que contém os arquivos CSV de saída. """ + log(f"Baixando os dados para o ano {year}.") download_inmet(year) + log("Dados baixados.") files = glob.glob(os.path.join(f"/tmp/data/input/{year}/", "*.CSV")) @@ -48,3 +51,10 @@ def get_base_inmet(year: int) -> str: base.to_csv(name, index=False) return "/tmp/data/output/microdados/" + + +@task +def get_today_date(): + d = datetime.today() + + return d.strftime("%Y-%m")