Skip to content

Commit

Permalink
adiciona flows atualizados de br_ibge_ipca; br_ibge_inpc; br_ibge_ipca15
Browse files Browse the repository at this point in the history
  • Loading branch information
folhesgabriel committed Oct 4, 2023
1 parent f927b66 commit 61e5911
Show file tree
Hide file tree
Showing 6 changed files with 385 additions and 56 deletions.
8 changes: 4 additions & 4 deletions pipelines/datasets/br_ibge_inpc/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

br_ibge_inpc_mes_categoria_brasil = deepcopy(flow_ibge_inflacao_mes_brasil)
br_ibge_inpc_mes_categoria_brasil.name = "br_ibge_inpc.mes_categoria_brasil"
br_ibge_inpc_mes_categoria_brasil.code_owners = ["lucas_cr"]
br_ibge_inpc_mes_categoria_brasil.code_owners = ["equipe_pipelines"]
br_ibge_inpc_mes_categoria_brasil.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_ibge_inpc_mes_categoria_brasil.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value
Expand All @@ -37,7 +37,7 @@

br_ibge_inpc_mes_categoria_rm = deepcopy(flow_ibge_inflacao_mes_rm)
br_ibge_inpc_mes_categoria_rm.name = "br_ibge_inpc.mes_categoria_rm"
br_ibge_inpc_mes_categoria_rm.code_owners = ["lucas_cr"]
br_ibge_inpc_mes_categoria_rm.code_owners = ["equipe_pipelines"]
br_ibge_inpc_mes_categoria_rm.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_ibge_inpc_mes_categoria_rm.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value
Expand All @@ -46,7 +46,7 @@

br_ibge_inpc_mes_categoria_municipio = deepcopy(flow_ibge_inflacao_mes_municipio)
br_ibge_inpc_mes_categoria_municipio.name = "br_ibge_inpc.mes_categoria_municipio"
br_ibge_inpc_mes_categoria_municipio.code_owners = ["lucas_cr"]
br_ibge_inpc_mes_categoria_municipio.code_owners = ["equipe_pipelines"]
br_ibge_inpc_mes_categoria_municipio.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_ibge_inpc_mes_categoria_municipio.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value
Expand All @@ -57,7 +57,7 @@

br_ibge_inpc_mes_brasil = deepcopy(flow_ibge_inflacao_mes_geral)
br_ibge_inpc_mes_brasil.name = "br_ibge_inpc.mes_brasil"
br_ibge_inpc_mes_brasil.code_owners = ["lucas_cr"]
br_ibge_inpc_mes_brasil.code_owners = ["equipe_pipelines"]
br_ibge_inpc_mes_brasil.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_ibge_inpc_mes_brasil.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
br_ibge_inpc_mes_brasil.schedule = br_ibge_inpc_mes_brasil_every_month
8 changes: 4 additions & 4 deletions pipelines/datasets/br_ibge_ipca/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

br_ibge_ipca_mes_categoria_brasil = deepcopy(flow_ibge_inflacao_mes_brasil)
br_ibge_ipca_mes_categoria_brasil.name = "br_ibge_ipca.mes_categoria_brasil"
br_ibge_ipca_mes_categoria_brasil.code_owners = ["lucas_cr"]
br_ibge_ipca_mes_categoria_brasil.code_owners = ["equipe_pipelines"]
br_ibge_ipca_mes_categoria_brasil.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_ibge_ipca_mes_categoria_brasil.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value
Expand All @@ -37,7 +37,7 @@

br_ibge_ipca_mes_categoria_rm = deepcopy(flow_ibge_inflacao_mes_rm)
br_ibge_ipca_mes_categoria_rm.name = "br_ibge_ipca.mes_categoria_rm"
br_ibge_ipca_mes_categoria_rm.code_owners = ["lucas_cr"]
br_ibge_ipca_mes_categoria_rm.code_owners = ["equipe_pipelines"]
br_ibge_ipca_mes_categoria_rm.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_ibge_ipca_mes_categoria_rm.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value
Expand All @@ -46,7 +46,7 @@

br_ibge_ipca_mes_categoria_municipio = deepcopy(flow_ibge_inflacao_mes_municipio)
br_ibge_ipca_mes_categoria_municipio.name = "br_ibge_ipca.mes_categoria_municipio"
br_ibge_ipca_mes_categoria_municipio.code_owners = ["lucas_cr"]
br_ibge_ipca_mes_categoria_municipio.code_owners = ["equipe_pipelines"]
br_ibge_ipca_mes_categoria_municipio.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_ibge_ipca_mes_categoria_municipio.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value
Expand All @@ -57,7 +57,7 @@

br_ibge_ipca_mes_brasil = deepcopy(flow_ibge_inflacao_mes_geral)
br_ibge_ipca_mes_brasil.name = "br_ibge_ipca.mes_brasil"
br_ibge_ipca_mes_brasil.code_owners = ["lucas_cr"]
br_ibge_ipca_mes_brasil.code_owners = ["equipe_pipelines"]
br_ibge_ipca_mes_brasil.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_ibge_ipca_mes_brasil.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
br_ibge_ipca_mes_brasil.schedule = br_ibge_ipca_mes_brasil_every_month
8 changes: 4 additions & 4 deletions pipelines/datasets/br_ibge_ipca15/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

br_ibge_ipca15_mes_categoria_brasil = deepcopy(flow_ibge_inflacao_mes_brasil)
br_ibge_ipca15_mes_categoria_brasil.name = "br_ibge_ipca15.mes_categoria_brasil"
br_ibge_ipca15_mes_categoria_brasil.code_owners = ["lucas_cr"]
br_ibge_ipca15_mes_categoria_brasil.code_owners = ["equipe_pipelines"]
br_ibge_ipca15_mes_categoria_brasil.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_ibge_ipca15_mes_categoria_brasil.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value
Expand All @@ -37,7 +37,7 @@

br_ibge_ipca15_mes_categoria_rm = deepcopy(flow_ibge_inflacao_mes_rm)
br_ibge_ipca15_mes_categoria_rm.name = "br_ibge_ipca15.mes_categoria_rm"
br_ibge_ipca15_mes_categoria_rm.code_owners = ["lucas_cr"]
br_ibge_ipca15_mes_categoria_rm.code_owners = ["equipe_pipelines"]
br_ibge_ipca15_mes_categoria_rm.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_ibge_ipca15_mes_categoria_rm.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value
Expand All @@ -46,7 +46,7 @@

br_ibge_ipca15_mes_categoria_municipio = deepcopy(flow_ibge_inflacao_mes_municipio)
br_ibge_ipca15_mes_categoria_municipio.name = "br_ibge_ipca15.mes_categoria_municipio"
br_ibge_ipca15_mes_categoria_municipio.code_owners = ["lucas_cr"]
br_ibge_ipca15_mes_categoria_municipio.code_owners = ["equipe_pipelines"]
br_ibge_ipca15_mes_categoria_municipio.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_ibge_ipca15_mes_categoria_municipio.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value
Expand All @@ -57,7 +57,7 @@

br_ibge_ipca15_mes_brasil = deepcopy(flow_ibge_inflacao_mes_geral)
br_ibge_ipca15_mes_brasil.name = "br_ibge_ipca15.mes_brasil"
br_ibge_ipca15_mes_brasil.code_owners = ["lucas_cr"]
br_ibge_ipca15_mes_brasil.code_owners = ["equipe_pipelines"]
br_ibge_ipca15_mes_brasil.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_ibge_ipca15_mes_brasil.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
br_ibge_ipca15_mes_brasil.schedule = br_ibge_ipca15_mes_brasil_every_month
124 changes: 111 additions & 13 deletions pipelines/utils/crawler_ibge_inflacao/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from pipelines.constants import constants
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.crawler_ibge_inflacao.tasks import (
check_for_updates,
clean_mes_brasil,
clean_mes_geral,
clean_mes_municipio,
Expand All @@ -21,18 +22,17 @@
)
from pipelines.utils.decorators import Flow
from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
from pipelines.utils.metadata.tasks import update_django_metadata
from pipelines.utils.tasks import (
create_table_and_upload_to_gcs,
get_current_flow_labels,
get_temporal_coverage,
rename_current_flow_run_dataset_table,
update_metadata,
)

with Flow(
name="BD Template - IBGE Inflação: mes_brasil"
) as flow_ibge_inflacao_mes_brasil:
# Parameters
INDICE = Parameter("indice")
FOLDER = Parameter("folder")
dataset_id = Parameter("dataset_id")
Expand All @@ -44,12 +44,21 @@
"materialize after dump", default=True, required=False
)
dbt_alias = Parameter("dbt_alias", default=False, required=False)
update_metadata = Parameter("update_metadata", default=False, required=False)

rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
)
# USA A FUNÇÃO EXTRACT_LAST_DATE, ENTÃO QUEBRA SE A BASE NÃO EXISTIR

needs_to_update = check_for_updates(
indice=INDICE, dataset_id=dataset_id, table_id=table_id
)

was_downloaded = crawler(indice=INDICE, folder=FOLDER)
with case(needs_to_update[0], True):
was_downloaded = crawler(
indice=INDICE, folder=FOLDER, upstream_tasks=[needs_to_update]
)
# pylint: disable=E1123

with case(was_downloaded, True):
Expand All @@ -63,13 +72,13 @@
wait=filepath,
)

temporal_coverage = get_temporal_coverage(
filepath=filepath,
date_cols=["ano", "mes"],
time_unit="month",
interval="1",
upstream_tasks=[wait_upload_table],
)
# temporal_coverage = get_temporal_coverage(
# filepath=filepath,
# date_cols=["ano", "mes"],
# time_unit="month",
# interval="1",
# upstream_tasks=[wait_upload_table],
# )

# wait_update_metadata = update_metadata(
# dataset_id=dataset_id,
Expand Down Expand Up @@ -113,6 +122,22 @@
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)
with case(update_metadata, True):
update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
# needs_to_update[1] é a data (%Y-%m) mais recente
_last_date=needs_to_update[1],
bq_last_update=False,
api_mode="prod",
date_format="yy-mm",
is_bd_pro=True,
is_free=True,
time_delta=6,
time_unit="months",
upstream_tasks=[wait_for_materialization],
)

flow_ibge_inflacao_mes_brasil.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
flow_ibge_inflacao_mes_brasil.run_config = KubernetesRun(
Expand All @@ -133,12 +158,20 @@
"materialize after dump", default=True, required=False
)
dbt_alias = Parameter("dbt_alias", default=False, required=False)
update_metadata = Parameter("update_metadata", default=False, required=False)

rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
)

was_downloaded = crawler(indice=INDICE, folder=FOLDER)
needs_to_update = check_for_updates(
indice=INDICE, dataset_id=dataset_id, table_id=table_id
)

with case(needs_to_update[0], True):
was_downloaded = crawler(
indice=INDICE, folder=FOLDER, upstream_tasks=[needs_to_update]
)
# pylint: disable=E1123

with case(was_downloaded, True):
Expand Down Expand Up @@ -203,6 +236,22 @@
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)
with case(update_metadata, True):
update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
# needs_to_update[1] é a data (%Y-%m) mais recente
_last_date=needs_to_update[1],
bq_last_update=False,
api_mode="prod",
date_format="yy-mm",
is_bd_pro=True,
is_free=True,
time_delta=6,
time_unit="months",
upstream_tasks=[wait_for_materialization],
)

flow_ibge_inflacao_mes_rm.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
flow_ibge_inflacao_mes_rm.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
Expand All @@ -223,12 +272,20 @@
"materialize after dump", default=True, required=False
)
dbt_alias = Parameter("dbt_alias", default=False, required=False)
update_metadata = Parameter("update_metadata", default=False, required=False)

rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
)

was_downloaded = crawler(indice=INDICE, folder=FOLDER)
needs_to_update = check_for_updates(
indice=INDICE, dataset_id=dataset_id, table_id=table_id
)

with case(needs_to_update[0], True):
was_downloaded = crawler(
indice=INDICE, folder=FOLDER, upstream_tasks=[needs_to_update]
)
# pylint: disable=E1123

with case(was_downloaded, True):
Expand Down Expand Up @@ -293,6 +350,22 @@
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)
with case(update_metadata, True):
update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
# needs_to_update[1] é a data (%Y-%m) mais recente
_last_date=needs_to_update[1],
bq_last_update=False,
api_mode="prod",
date_format="yy-mm",
is_bd_pro=True,
is_free=True,
time_delta=6,
time_unit="months",
upstream_tasks=[wait_for_materialization],
)


flow_ibge_inflacao_mes_municipio.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
Expand All @@ -315,12 +388,20 @@
)

dbt_alias = Parameter("dbt_alias", default=False, required=False)
update_metadata = Parameter("update_metadata", default=False, required=False)

rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
)

was_downloaded = crawler(indice=INDICE, folder=FOLDER)
needs_to_update = check_for_updates(
indice=INDICE, dataset_id=dataset_id, table_id=table_id
)

with case(needs_to_update[0], True):
was_downloaded = crawler(
indice=INDICE, folder=FOLDER, upstream_tasks=[needs_to_update]
)
# pylint: disable=E1123

with case(was_downloaded, True):
Expand Down Expand Up @@ -386,6 +467,23 @@
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

with case(update_metadata, True):
update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
# needs_to_update[1] é a data (%Y-%m) mais recente
_last_date=needs_to_update[1],
bq_last_update=False,
api_mode="prod",
date_format="yy-mm",
is_bd_pro=True,
is_free=True,
time_delta=6,
time_unit="months",
upstream_tasks=[wait_for_materialization],
)

flow_ibge_inflacao_mes_geral.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
flow_ibge_inflacao_mes_geral.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value
Expand Down
Loading

0 comments on commit 61e5911

Please sign in to comment.