Skip to content

Commit

Permalink
fix: fixing br_inmet_bdmep pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
arthurfg committed Sep 29, 2023
1 parent 54c25ec commit cb138cc
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 3 deletions.
21 changes: 19 additions & 2 deletions pipelines/datasets/br_inmet_bdmep/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@

from pipelines.constants import constants
from pipelines.datasets.br_inmet_bdmep.schedules import every_month_inmet
from pipelines.datasets.br_inmet_bdmep.tasks import get_base_inmet
from pipelines.datasets.br_inmet_bdmep.tasks import get_base_inmet, get_today_date
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
from pipelines.utils.metadata.tasks import update_django_metadata
from pipelines.utils.tasks import (
create_table_and_upload_to_gcs,
get_current_flow_labels,
Expand All @@ -29,7 +30,8 @@
# Parameters
dataset_id = Parameter("dataset_id", default="br_inmet_bdmep", required=True)
table_id = Parameter("table_id", default="microdados", required=True)
year = Parameter("year", default=2023, required=True)
year = Parameter("year", default=2023, required=False)
update_metadata = Parameter("update_metadata", default=True, required=False)

materialization_mode = Parameter(
"materialization_mode", default="prod", required=False
Expand Down Expand Up @@ -80,6 +82,21 @@
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)
with case(update_metadata, True):
date = get_today_date()
update = update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
bq_last_update=False,
bq_table_last_year_month=False,
_last_date=date,
api_mode="prod",
date_format="yy-mm",
is_bd_pro=False,
is_free=True,
upstream_tasks=[wait_for_materialization],
)

br_inmet.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_inmet.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
Expand Down
5 changes: 4 additions & 1 deletion pipelines/datasets/br_inmet_bdmep/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@

from pipelines.constants import constants

d = datetime.today()
every_month_inmet = Schedule(
clocks=[
CronClock(
Expand All @@ -88,9 +89,11 @@
parameter_defaults={
"dataset_id": "br_inmet_bdmep",
"table_id": "microdados",
"materialization_mode": "dev",
"materialization_mode": "prod",
"materialize_after_dump": True,
"dbt_alias": True,
"year": d.strftime("%Y"),
"update_metadata": True,
},
),
],
Expand Down
10 changes: 10 additions & 0 deletions pipelines/datasets/br_inmet_bdmep/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""
import glob
import os
from datetime import datetime

import numpy as np
import pandas as pd
Expand All @@ -29,8 +30,10 @@ def get_base_inmet(year: int) -> str:
Retorna:
- str: o caminho para o diretório que contém os arquivos CSV de saída.
"""
log(f"Baixando os dados para o ano {year}.")

download_inmet(year)
log("Dados baixados.")

files = glob.glob(os.path.join(f"/tmp/data/input/{year}/", "*.CSV"))

Expand All @@ -48,3 +51,10 @@ def get_base_inmet(year: int) -> str:
base.to_csv(name, index=False)

return "/tmp/data/output/microdados/"


@task
def get_today_date():
d = datetime.today()

return d.strftime("%Y-%m")

0 comments on commit cb138cc

Please sign in to comment.