From 61e59115ed93310a2af8ddebe9d1e18a3761b94a Mon Sep 17 00:00:00 2001
From: folhesgabriel
Date: Wed, 4 Oct 2023 16:20:46 -0300
Subject: [PATCH] add updated flows for br_ibge_ipca; br_ibge_inpc;
 br_ibge_ipca15

---
 pipelines/datasets/br_ibge_inpc/flows.py   |   8 +-
 pipelines/datasets/br_ibge_ipca/flows.py   |   8 +-
 pipelines/datasets/br_ibge_ipca15/flows.py |   8 +-
 .../utils/crawler_ibge_inflacao/flows.py   | 124 ++++++++-
 .../utils/crawler_ibge_inflacao/tasks.py   | 251 +++++++++++++++---
 .../utils/crawler_ibge_inflacao/utils.py   |  42 +++
 6 files changed, 385 insertions(+), 56 deletions(-)

diff --git a/pipelines/datasets/br_ibge_inpc/flows.py b/pipelines/datasets/br_ibge_inpc/flows.py
index 216acd39b..6759d2102 100644
--- a/pipelines/datasets/br_ibge_inpc/flows.py
+++ b/pipelines/datasets/br_ibge_inpc/flows.py
@@ -25,7 +25,7 @@
 br_ibge_inpc_mes_categoria_brasil = deepcopy(flow_ibge_inflacao_mes_brasil)
 br_ibge_inpc_mes_categoria_brasil.name = "br_ibge_inpc.mes_categoria_brasil"
-br_ibge_inpc_mes_categoria_brasil.code_owners = ["lucas_cr"]
+br_ibge_inpc_mes_categoria_brasil.code_owners = ["equipe_pipelines"]
 br_ibge_inpc_mes_categoria_brasil.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 br_ibge_inpc_mes_categoria_brasil.run_config = KubernetesRun(
     image=constants.DOCKER_IMAGE.value
@@ -37,7 +37,7 @@
 br_ibge_inpc_mes_categoria_rm = deepcopy(flow_ibge_inflacao_mes_rm)
 br_ibge_inpc_mes_categoria_rm.name = "br_ibge_inpc.mes_categoria_rm"
-br_ibge_inpc_mes_categoria_rm.code_owners = ["lucas_cr"]
+br_ibge_inpc_mes_categoria_rm.code_owners = ["equipe_pipelines"]
 br_ibge_inpc_mes_categoria_rm.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 br_ibge_inpc_mes_categoria_rm.run_config = KubernetesRun(
     image=constants.DOCKER_IMAGE.value
 )
@@ -46,7 +46,7 @@
 br_ibge_inpc_mes_categoria_municipio = deepcopy(flow_ibge_inflacao_mes_municipio)
 br_ibge_inpc_mes_categoria_municipio.name = "br_ibge_inpc.mes_categoria_municipio"
-br_ibge_inpc_mes_categoria_municipio.code_owners = ["lucas_cr"]
+br_ibge_inpc_mes_categoria_municipio.code_owners = ["equipe_pipelines"]
 br_ibge_inpc_mes_categoria_municipio.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 br_ibge_inpc_mes_categoria_municipio.run_config = KubernetesRun(
     image=constants.DOCKER_IMAGE.value
@@ -57,7 +57,7 @@
 br_ibge_inpc_mes_brasil = deepcopy(flow_ibge_inflacao_mes_geral)
 br_ibge_inpc_mes_brasil.name = "br_ibge_inpc.mes_brasil"
-br_ibge_inpc_mes_brasil.code_owners = ["lucas_cr"]
+br_ibge_inpc_mes_brasil.code_owners = ["equipe_pipelines"]
 br_ibge_inpc_mes_brasil.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 br_ibge_inpc_mes_brasil.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
 br_ibge_inpc_mes_brasil.schedule = br_ibge_inpc_mes_brasil_every_month
diff --git a/pipelines/datasets/br_ibge_ipca/flows.py b/pipelines/datasets/br_ibge_ipca/flows.py
index 053710525..9d7f6ac3a 100644
--- a/pipelines/datasets/br_ibge_ipca/flows.py
+++ b/pipelines/datasets/br_ibge_ipca/flows.py
@@ -25,7 +25,7 @@
 br_ibge_ipca_mes_categoria_brasil = deepcopy(flow_ibge_inflacao_mes_brasil)
 br_ibge_ipca_mes_categoria_brasil.name = "br_ibge_ipca.mes_categoria_brasil"
-br_ibge_ipca_mes_categoria_brasil.code_owners = ["lucas_cr"]
+br_ibge_ipca_mes_categoria_brasil.code_owners = ["equipe_pipelines"]
 br_ibge_ipca_mes_categoria_brasil.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 br_ibge_ipca_mes_categoria_brasil.run_config = KubernetesRun(
     image=constants.DOCKER_IMAGE.value
@@ -37,7 +37,7 @@
 br_ibge_ipca_mes_categoria_rm = deepcopy(flow_ibge_inflacao_mes_rm)
 br_ibge_ipca_mes_categoria_rm.name = "br_ibge_ipca.mes_categoria_rm"
-br_ibge_ipca_mes_categoria_rm.code_owners = ["lucas_cr"]
+br_ibge_ipca_mes_categoria_rm.code_owners = ["equipe_pipelines"]
 br_ibge_ipca_mes_categoria_rm.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 br_ibge_ipca_mes_categoria_rm.run_config = KubernetesRun(
     image=constants.DOCKER_IMAGE.value
@@ -46,7 +46,7 @@
 br_ibge_ipca_mes_categoria_municipio = deepcopy(flow_ibge_inflacao_mes_municipio)
 br_ibge_ipca_mes_categoria_municipio.name = "br_ibge_ipca.mes_categoria_municipio"
-br_ibge_ipca_mes_categoria_municipio.code_owners = ["lucas_cr"]
+br_ibge_ipca_mes_categoria_municipio.code_owners = ["equipe_pipelines"]
 br_ibge_ipca_mes_categoria_municipio.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 br_ibge_ipca_mes_categoria_municipio.run_config = KubernetesRun(
     image=constants.DOCKER_IMAGE.value
@@ -57,7 +57,7 @@
 br_ibge_ipca_mes_brasil = deepcopy(flow_ibge_inflacao_mes_geral)
 br_ibge_ipca_mes_brasil.name = "br_ibge_ipca.mes_brasil"
-br_ibge_ipca_mes_brasil.code_owners = ["lucas_cr"]
+br_ibge_ipca_mes_brasil.code_owners = ["equipe_pipelines"]
 br_ibge_ipca_mes_brasil.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 br_ibge_ipca_mes_brasil.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
 br_ibge_ipca_mes_brasil.schedule = br_ibge_ipca_mes_brasil_every_month

diff --git a/pipelines/datasets/br_ibge_ipca15/flows.py b/pipelines/datasets/br_ibge_ipca15/flows.py
index 77fe8c539..a7a077306 100644
--- a/pipelines/datasets/br_ibge_ipca15/flows.py
+++ b/pipelines/datasets/br_ibge_ipca15/flows.py
@@ -25,7 +25,7 @@
 br_ibge_ipca15_mes_categoria_brasil = deepcopy(flow_ibge_inflacao_mes_brasil)
 br_ibge_ipca15_mes_categoria_brasil.name = "br_ibge_ipca15.mes_categoria_brasil"
-br_ibge_ipca15_mes_categoria_brasil.code_owners = ["lucas_cr"]
+br_ibge_ipca15_mes_categoria_brasil.code_owners = ["equipe_pipelines"]
 br_ibge_ipca15_mes_categoria_brasil.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 br_ibge_ipca15_mes_categoria_brasil.run_config = KubernetesRun(
     image=constants.DOCKER_IMAGE.value
@@ -37,7 +37,7 @@
 br_ibge_ipca15_mes_categoria_rm = deepcopy(flow_ibge_inflacao_mes_rm)
 br_ibge_ipca15_mes_categoria_rm.name = "br_ibge_ipca15.mes_categoria_rm"
-br_ibge_ipca15_mes_categoria_rm.code_owners = ["lucas_cr"]
+br_ibge_ipca15_mes_categoria_rm.code_owners = ["equipe_pipelines"]
 br_ibge_ipca15_mes_categoria_rm.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 br_ibge_ipca15_mes_categoria_rm.run_config = KubernetesRun(
     image=constants.DOCKER_IMAGE.value
@@ -46,7 +46,7 @@
 br_ibge_ipca15_mes_categoria_municipio = deepcopy(flow_ibge_inflacao_mes_municipio)
 br_ibge_ipca15_mes_categoria_municipio.name = "br_ibge_ipca15.mes_categoria_municipio"
-br_ibge_ipca15_mes_categoria_municipio.code_owners = ["lucas_cr"]
+br_ibge_ipca15_mes_categoria_municipio.code_owners = ["equipe_pipelines"]
 br_ibge_ipca15_mes_categoria_municipio.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 br_ibge_ipca15_mes_categoria_municipio.run_config = KubernetesRun(
     image=constants.DOCKER_IMAGE.value
@@ -57,7 +57,7 @@
 br_ibge_ipca15_mes_brasil = deepcopy(flow_ibge_inflacao_mes_geral)
 br_ibge_ipca15_mes_brasil.name = "br_ibge_ipca15.mes_brasil"
-br_ibge_ipca15_mes_brasil.code_owners = ["lucas_cr"]
+br_ibge_ipca15_mes_brasil.code_owners = ["equipe_pipelines"]
 br_ibge_ipca15_mes_brasil.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 br_ibge_ipca15_mes_brasil.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
 br_ibge_ipca15_mes_brasil.schedule = br_ibge_ipca15_mes_brasil_every_month
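For context on the pattern these three dataset files repeat: each registered flow is a deepcopy of a shared crawler template, with only identity and infrastructure attributes overridden. A minimal sketch of the idea, with hypothetical flow and schedule names standing in for the project's real objects:

from copy import deepcopy

from prefect import Flow, Parameter

# hypothetical stand-in for a template such as flow_ibge_inflacao_mes_brasil
with Flow(name="template - ibge inflacao") as my_template_flow:
    indice = Parameter("indice")

# each dataset derives its registered flow from the template and only
# overrides name, code_owners, storage, run_config and schedule
my_dataset_flow = deepcopy(my_template_flow)
my_dataset_flow.name = "my_dataset.my_table"
my_dataset_flow.code_owners = ["equipe_pipelines"]
# my_dataset_flow.storage = GCS(...); my_dataset_flow.run_config = KubernetesRun(...)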
diff --git a/pipelines/utils/crawler_ibge_inflacao/flows.py b/pipelines/utils/crawler_ibge_inflacao/flows.py
index 402a21cd1..248c6b038 100644
--- a/pipelines/utils/crawler_ibge_inflacao/flows.py
+++ b/pipelines/utils/crawler_ibge_inflacao/flows.py
@@ -13,6 +13,7 @@
 from pipelines.constants import constants
 from pipelines.utils.constants import constants as utils_constants
 from pipelines.utils.crawler_ibge_inflacao.tasks import (
+    check_for_updates,
     clean_mes_brasil,
     clean_mes_geral,
     clean_mes_municipio,
@@ -21,18 +22,17 @@
 )
 from pipelines.utils.decorators import Flow
 from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
+from pipelines.utils.metadata.tasks import update_django_metadata
 from pipelines.utils.tasks import (
     create_table_and_upload_to_gcs,
     get_current_flow_labels,
     get_temporal_coverage,
     rename_current_flow_run_dataset_table,
-    update_metadata,
 )

 with Flow(
     name="BD Template - IBGE Inflação: mes_brasil"
 ) as flow_ibge_inflacao_mes_brasil:
-    # Parameters
     INDICE = Parameter("indice")
     FOLDER = Parameter("folder")
     dataset_id = Parameter("dataset_id")
@@ -44,12 +44,21 @@
         "materialize after dump", default=True, required=False
     )
     dbt_alias = Parameter("dbt_alias", default=False, required=False)
+    update_metadata = Parameter("update_metadata", default=False, required=False)

     rename_flow_run = rename_current_flow_run_dataset_table(
         prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
     )
+    # uses the extract_last_date function, so it breaks if the base table does not exist yet
+    needs_to_update = check_for_updates(
+        indice=INDICE, dataset_id=dataset_id, table_id=table_id
+    )

-    was_downloaded = crawler(indice=INDICE, folder=FOLDER)
+    with case(needs_to_update[0], True):
+        was_downloaded = crawler(
+            indice=INDICE, folder=FOLDER, upstream_tasks=[needs_to_update]
+        )  # pylint: disable=E1123

     with case(was_downloaded, True):
@@ -63,13 +72,13 @@
             wait=filepath,
         )

-        temporal_coverage = get_temporal_coverage(
-            filepath=filepath,
-            date_cols=["ano", "mes"],
-            time_unit="month",
-            interval="1",
-            upstream_tasks=[wait_upload_table],
-        )
+        # temporal_coverage = get_temporal_coverage(
+        #     filepath=filepath,
+        #     date_cols=["ano", "mes"],
+        #     time_unit="month",
+        #     interval="1",
+        #     upstream_tasks=[wait_upload_table],
+        # )

         # wait_update_metadata = update_metadata(
         #     dataset_id=dataset_id,
@@ -113,6 +122,22 @@
         wait_for_materialization.retry_delay = timedelta(
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )
+        with case(update_metadata, True):
+            update_django_metadata(
+                dataset_id,
+                table_id,
+                metadata_type="DateTimeRange",
+                # needs_to_update[1] is the most recent date (%Y-%m)
+                _last_date=needs_to_update[1],
+                bq_last_update=False,
+                api_mode="prod",
+                date_format="yy-mm",
+                is_bd_pro=True,
+                is_free=True,
+                time_delta=6,
+                time_unit="months",
+                upstream_tasks=[wait_for_materialization],
+            )

 flow_ibge_inflacao_mes_brasil.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 flow_ibge_inflacao_mes_brasil.run_config = KubernetesRun(
@@ -133,12 +158,20 @@
     "materialize after dump", default=True, required=False
     )
     dbt_alias = Parameter("dbt_alias", default=False, required=False)
+    update_metadata = Parameter("update_metadata", default=False, required=False)

     rename_flow_run = rename_current_flow_run_dataset_table(
         prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
     )

-    was_downloaded = crawler(indice=INDICE, folder=FOLDER)
+    needs_to_update = check_for_updates(
+        indice=INDICE, dataset_id=dataset_id, table_id=table_id
+    )
+
+    with case(needs_to_update[0], True):
+        was_downloaded = crawler(
+            indice=INDICE, folder=FOLDER, upstream_tasks=[needs_to_update]
+        )  # pylint: disable=E1123

     with case(was_downloaded, True):
@@ -203,6 +236,22 @@
         wait_for_materialization.retry_delay = timedelta(
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )
+        with case(update_metadata, True):
+            update_django_metadata(
+                dataset_id,
+                table_id,
+                metadata_type="DateTimeRange",
+                # needs_to_update[1] is the most recent date (%Y-%m)
+                _last_date=needs_to_update[1],
+                bq_last_update=False,
+                api_mode="prod",
+                date_format="yy-mm",
+                is_bd_pro=True,
+                is_free=True,
+                time_delta=6,
+                time_unit="months",
+                upstream_tasks=[wait_for_materialization],
+            )

 flow_ibge_inflacao_mes_rm.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 flow_ibge_inflacao_mes_rm.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
@@ -223,12 +272,20 @@
     "materialize after dump", default=True, required=False
     )
     dbt_alias = Parameter("dbt_alias", default=False, required=False)
+    update_metadata = Parameter("update_metadata", default=False, required=False)

     rename_flow_run = rename_current_flow_run_dataset_table(
         prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
     )

-    was_downloaded = crawler(indice=INDICE, folder=FOLDER)
+    needs_to_update = check_for_updates(
+        indice=INDICE, dataset_id=dataset_id, table_id=table_id
+    )
+
+    with case(needs_to_update[0], True):
+        was_downloaded = crawler(
+            indice=INDICE, folder=FOLDER, upstream_tasks=[needs_to_update]
+        )  # pylint: disable=E1123

     with case(was_downloaded, True):
@@ -293,6 +350,22 @@
         wait_for_materialization.retry_delay = timedelta(
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )
+        with case(update_metadata, True):
+            update_django_metadata(
+                dataset_id,
+                table_id,
+                metadata_type="DateTimeRange",
+                # needs_to_update[1] is the most recent date (%Y-%m)
+                _last_date=needs_to_update[1],
+                bq_last_update=False,
+                api_mode="prod",
+                date_format="yy-mm",
+                is_bd_pro=True,
+                is_free=True,
+                time_delta=6,
+                time_unit="months",
+                upstream_tasks=[wait_for_materialization],
+            )

 flow_ibge_inflacao_mes_municipio.storage = GCS(constants.GCS_FLOWS_BUCKET.value)

@@ -315,12 +388,20 @@
     )
     dbt_alias = Parameter("dbt_alias", default=False, required=False)
+    update_metadata = Parameter("update_metadata", default=False, required=False)

     rename_flow_run = rename_current_flow_run_dataset_table(
         prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
     )

-    was_downloaded = crawler(indice=INDICE, folder=FOLDER)
+    needs_to_update = check_for_updates(
+        indice=INDICE, dataset_id=dataset_id, table_id=table_id
+    )
+
+    with case(needs_to_update[0], True):
+        was_downloaded = crawler(
+            indice=INDICE, folder=FOLDER, upstream_tasks=[needs_to_update]
+        )  # pylint: disable=E1123

     with case(was_downloaded, True):
@@ -386,6 +467,23 @@
         wait_for_materialization.retry_delay = timedelta(
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )

+        with case(update_metadata, True):
+            update_django_metadata(
+                dataset_id,
+                table_id,
+                metadata_type="DateTimeRange",
+                # needs_to_update[1] is the most recent date (%Y-%m)
+                _last_date=needs_to_update[1],
+                bq_last_update=False,
+                api_mode="prod",
+                date_format="yy-mm",
+                is_bd_pro=True,
+                is_free=True,
+                time_delta=6,
+                time_unit="months",
+                upstream_tasks=[wait_for_materialization],
+            )
+
 flow_ibge_inflacao_mes_geral.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 flow_ibge_inflacao_mes_geral.run_config = KubernetesRun(
     image=constants.DOCKER_IMAGE.value
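All four template flows now share the wiring shown above: check_for_updates returns a (needs_update, last_date) tuple, the tuple is indexed inside the flow context, and Prefect's case gates the crawler and the Django metadata update. A self-contained sketch of that control flow under Prefect 1.x semantics, with toy task names (the real tasks live in tasks.py):

from prefect import Flow, case, task


@task
def check_for_updates_toy() -> tuple:
    # pretend the source has fresh data through 2023-09
    return True, "2023-09"


@task
def crawl_toy() -> bool:
    return True


@task
def update_metadata_toy(last_date: str) -> None:
    print(f"would set coverage through {last_date}")


with Flow("gating-sketch") as toy_flow:
    needs_to_update = check_for_updates_toy()
    # indexing the task result builds GetItem tasks that run whenever
    # check_for_updates_toy succeeds -- even on the "no update" branch,
    # which is why the task must return an indexable value in both cases
    with case(needs_to_update[0], True):
        was_downloaded = crawl_toy()
        with case(was_downloaded, True):
            update_metadata_toy(needs_to_update[1])

toy_flow.run()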
diff --git a/pipelines/utils/crawler_ibge_inflacao/tasks.py b/pipelines/utils/crawler_ibge_inflacao/tasks.py
index 21274631a..0eadd4ba5 100644
--- a/pipelines/utils/crawler_ibge_inflacao/tasks.py
+++ b/pipelines/utils/crawler_ibge_inflacao/tasks.py
@@ -8,14 +8,19 @@
 import glob
 import os
 import ssl
+from datetime import datetime as dt
 from time import sleep

+import basedosdados as bd
 import pandas as pd
 import wget
 from prefect import task
 from tqdm import tqdm

-from pipelines.utils.crawler_ibge_inflacao.utils import get_legacy_session
+from pipelines.utils.crawler_ibge_inflacao.utils import (
+    extract_last_date,
+    get_legacy_session,
+)
 from pipelines.utils.utils import log

 # necessary for use wget, see: https://stackoverflow.com/questions/35569042/ssl-certificate-verify-failed-with-python3
@@ -23,6 +28,116 @@
 # pylint: disable=C0206
 # pylint: disable=C0201
 # pylint: disable=R0914
+# https://sidra.ibge.gov.br/tabela/7062
+# https://sidra.ibge.gov.br/tabela/7063
+# https://sidra.ibge.gov.br/tabela/7060
+
+
+@task
+def check_for_updates(
+    indice: str,
+    table_id: str,
+    dataset_id: str,
+) -> tuple:
+    """
+    Checks for updates to the br_ibge_inpc; br_ibge_ipca; br_ibge_ipca15 datasets.
+
+    indice: inpc | ipca | ip15
+    """
+
+    n_mes = {
+        "janeiro": "1",
+        "fevereiro": "2",
+        "março": "3",
+        "abril": "4",
+        "maio": "5",
+        "junho": "6",
+        "julho": "7",
+        "agosto": "8",
+        "setembro": "9",
+        "outubro": "10",
+        "novembro": "11",
+        "dezembro": "12",
+    }
+
+    if indice not in ["inpc", "ipca", "ip15"]:
+        raise ValueError(
+            "indice argument must be one of the following: 'inpc', 'ipca', 'ip15'"
+        )
+
+    log(f"Checking for updates in {indice} index for {dataset_id}.{table_id}")
+
+    links = {
+        "ipca": "https://sidra.ibge.gov.br/geratabela?format=br.csv&name=tabela7060.csv&terr=NC&rank=-&query=t/7060/n1/all/v/all/p/last%201/c315/7169/d/v63%202,v66%204,v69%202,v2265%202/l/,v,t%2Bp%2Bc315",
+        "inpc": "https://sidra.ibge.gov.br/geratabela?format=br.csv&name=tabela7063.csv&terr=NC&rank=-&query=t/7063/n1/all/v/all/p/last%201/c315/7169/d/v44%202,v45%204,v68%202,v2292%202/l/,v,t%2Bp%2Bc315",
+        "ip15": "https://sidra.ibge.gov.br/geratabela?format=br.csv&name=tabela7062.csv&terr=NC&rank=-&query=t/7062/n1/all/v/all/p/last%201/c315/7169/d/v355%202,v356%202,v357%204,v1120%202/l/,v,t%2Bp%2Bc315",
+    }
+
+    links = {k: v for k, v in links.items() if indice in k}
+
+    links_keys = list(links.keys())
+    log(links_keys)
+    success_dwnl = []
+
+    os.system('mkdir -p "/tmp/check_for_updates/"')
+
+    for key in tqdm(links_keys):
+        try:
+            response = get_legacy_session().get(links[key])
+            # download the csv
+            with open(f"/tmp/check_for_updates/{key}.csv", "wb") as f:
+                f.write(response.content)
+            success_dwnl.append(key)
+            sleep(5)
+        except Exception as e:
+            log(e)
+            try:
+                sleep(5)
+                response = get_legacy_session().get(links[key])
+                # download the csv
+                with open(f"/tmp/check_for_updates/{key}.csv", "wb") as f:
+                    f.write(response.content)
+                success_dwnl.append(key)
+            except Exception as e:  # pylint: disable=redefined-outer-name
+                log(e)
+
+    log(f"success_dwnl: {success_dwnl}")
+    if len(links_keys) == len(success_dwnl):
+        log("All files were successfully downloaded")
+    # unlike crawler, a download failure here is only logged, not raised
+    else:
+        rems = set(links_keys) - set(success_dwnl)
+        log(f"The following files failed to download: {rems}")
+
+    file_name = os.listdir("/tmp/check_for_updates")
+    file_path = "/tmp/check_for_updates/" + file_name[0]
+
+    dataframe = pd.read_csv(file_path, skiprows=2, skipfooter=14, sep=";")
+
+    dataframe = dataframe[["Mês"]]
+
+    dataframe[["mes", "ano"]] = dataframe["Mês"].str.split(pat=" ", n=1, expand=True)
+
+    dataframe["mes"] = dataframe["mes"].map(n_mes)
+
+    last_month = dataframe["ano"][0] + "-" + dataframe["mes"][0]
+
+    max_date_ibge = dt.strptime(last_month, "%Y-%m").strftime("%Y-%m")
+
+    log(f"The most recent date on the IBGE website for table {indice} is: {max_date_ibge}")
+    # TODO: switch to the "basedosdados" billing project before going to prod
+    max_date_bd = extract_last_date(
+        dataset_id=dataset_id, table_id=table_id, billing_project_id="basedosdados-dev"
+    )
+
+    if max_date_ibge > max_date_bd:
+        log(f"Table {indice} has been updated on the IBGE website")
+        return True, str(max_date_ibge)
+    else:
+        # also return a tuple here so that the GetItem tasks built from
+        # needs_to_update[0] and needs_to_update[1] never fail
+        return False, str(max_date_ibge)


 @task
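The core of check_for_updates is a small, pure transformation: take the "Mês" label from the SIDRA csv (e.g. "setembro 2023"), map the Portuguese month name to its number, normalize to "%Y-%m", and compare lexicographically against the warehouse's max date. A dependency-free sketch of that logic (month map abbreviated):

from datetime import datetime

N_MES = {"janeiro": "1", "fevereiro": "2", "setembro": "9", "dezembro": "12"}  # abbreviated


def sidra_label_to_ym(label: str) -> str:
    """'setembro 2023' -> '2023-09' (zero-padded by the strptime round-trip)."""
    mes, ano = label.split(" ", 1)
    return datetime.strptime(f"{ano}-{N_MES[mes]}", "%Y-%m").strftime("%Y-%m")


def needs_update(ibge_label: str, max_date_bd: str) -> bool:
    # '%Y-%m' strings order correctly under plain string comparison
    return sidra_label_to_ym(ibge_label) > max_date_bd


assert sidra_label_to_ym("setembro 2023") == "2023-09"
assert needs_update("setembro 2023", "2023-08")
assert not needs_update("setembro 2023", "2023-09")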
"rm/ipca_subitem_2021": "https://sidra.ibge.gov.br/geratabela/DownloadSelecaoComplexa/-1651704499", + "rm/ipca_subitem_2022": "https://sidra.ibge.gov.br/geratabela/DownloadSelecaoComplexa/-1811208819", + "rm/ipca_subitem_2023": "https://sidra.ibge.gov.br/geratabela/DownloadSelecaoComplexa/1963915159", + # https://sidra.ibge.gov.br/tabela/7063 + # "rm/inpc_subitem_1": "https://sidra.ibge.gov.br/geratabela/DownloadSelecaoComplexa/-884745035", + # "rm/inpc_subitem_2": "https://sidra.ibge.gov.br/geratabela/DownloadSelecaoComplexa/-1265010694", + # ok + "rm/inpc_subitem_2020": "https://sidra.ibge.gov.br/geratabela/DownloadSelecaoComplexa/-1428442922", + "rm/inpc_subitem_2021": "https://sidra.ibge.gov.br/geratabela/DownloadSelecaoComplexa/844331411", + "rm/inpc_subitem_2022": "https://sidra.ibge.gov.br/geratabela/DownloadSelecaoComplexa/-1481717997", + "rm/inpc_subitem_2023": "https://sidra.ibge.gov.br/geratabela/DownloadSelecaoComplexa/-1485551467", + # "rm/ip15_subitem_1": "https://sidra.ibge.gov.br/geratabela/DownloadSelecaoComplexa/-1750716307", + # "rm/ip15_subitem_2": "https://sidra.ibge.gov.br/geratabela/DownloadSelecaoComplexa/-1258570016", + # corrigir as seleções + # https://sidra.ibge.gov.br/tabela/7062 + "rm/ip15_subitem_2020": "https://sidra.ibge.gov.br/geratabela/DownloadSelecaoComplexa/51560386", + "rm/ip15_subitem_2021": "https://sidra.ibge.gov.br/geratabela/DownloadSelecaoComplexa/382082792", + "rm/ip15_subitem_2022": "https://sidra.ibge.gov.br/geratabela/DownloadSelecaoComplexa/1727207272", + "rm/ip15_subitem_2023": "https://sidra.ibge.gov.br/geratabela/DownloadSelecaoComplexa/999465717", "rm/ipca_geral": "https://sidra.ibge.gov.br/geratabela?format=br.csv&name=tabela7060.csv&terr=NC&rank=-&query=t/7060/n7/all/v/all/p/all/c315/7169/d/v63%202,v66%204,v69%202,v2265%202/l/,v,t%2Bp%2Bc315", "rm/inpc_geral": "https://sidra.ibge.gov.br/geratabela?format=br.csv&name=tabela7063.csv&terr=NC&rank=-&query=t/7063/n7/all/v/all/p/all/c315/7169/d/v44%202,v45%204,v68%202,v2292%202/l/,v,t%2Bp%2Bc315", "rm/ip15_geral": "https://sidra.ibge.gov.br/geratabela?format=br.csv&name=tabela7062.csv&terr=NC&rank=-&query=t/7062/n7/all/v/all/p/all/c315/7169/d/v355%202,v356%202,v357%204,v1120%202/l/,v,t%2Bp%2Bc315", @@ -120,18 +255,26 @@ def crawler(indice: str, folder: str) -> bool: if k.__contains__(indice) & k.__contains__(folder) } links_keys = list(links.keys()) + log(links_keys) success_dwnl = [] if folder != "rm": # precisei adicionar try catchs no loop para conseguir baixar todas # as tabelas sem ter pproblema com o limite de requisição do sidra for key in tqdm(links_keys): try: - wget.download(links[key], out=f"/tmp/data/input/{key}.csv") + response = get_legacy_session().get(links[key]) + # download the csv + with open(f"/tmp/data/input/{key}.csv", "wb") as f: + f.write(response.content) success_dwnl.append(key) + sleep(10) except Exception: try: sleep(10) - wget.download(links[key], out=f"/tmp/data/input/{key}.csv") + response = get_legacy_session().get(links[key]) + # download the csv + with open(f"/tmp/data/input/{key}.csv", "wb") as f: + f.write(response.content) success_dwnl.append(key) except Exception: pass @@ -143,6 +286,7 @@ def crawler(indice: str, folder: str) -> bool: with open(f"/tmp/data/input/{key}.csv", "wb") as f: f.write(response.content) success_dwnl.append(key) + sleep(10) except Exception as e: log(e) try: @@ -156,15 +300,14 @@ def crawler(indice: str, folder: str) -> bool: log(e) log(os.system("tree /tmp/data")) + log(f"success_dwnl: {success_dwnl}") if len(links_keys) == 
@@ -244,13 +387,15 @@ def clean_mes_brasil(indice: str) -> None:
         dataframe = dataframe.replace(",", ".", regex=True)

         # Split coluna data e substituir mes
-        dataframe[["mes", "ano"]] = dataframe["ano"].str.split(" ", 1, expand=True)
+        dataframe[["mes", "ano"]] = dataframe["ano"].str.split(
+            pat=" ", n=1, expand=True
+        )
         dataframe["mes"] = dataframe["mes"].map(n_mes)

         # Split coluna categoria e add id_categoria_bd
         if arq.split("_")[-1].split(".")[0] != "geral":
             dataframe[["id_categoria", "categoria"]] = dataframe["categoria"].str.split(
-                ".", 1, expand=True
+                pat=".", n=1, expand=True
             )

         if arq.split("_")[-1].split(".")[0] == "grupo":
@@ -382,8 +527,10 @@ def clean_mes_rm(indice: str):
                 arq, skipfooter=14, skiprows=2, sep=";", dtype="str"
             )
         except Exception as e:
-            log(f"Error reading {arq}: {e}")
-            continue
+            log(
+                f"Error reading {arq}: {e}. Check the file. It may have surpassed the 200,000-value download limit of the IBGE SIDRA API"
+            )
+            break
         # renomear colunas.
         dataframe.rename(columns=rename, inplace=True)
         # substituir "..." por vazio
@@ -394,13 +541,15 @@ def clean_mes_rm(indice: str):
         dataframe = dataframe.replace(",", ".", regex=True)

         # Split coluna data e substituir mes
-        dataframe[["mes", "ano"]] = dataframe["ano"].str.split(" ", 1, expand=True)
+        dataframe[["mes", "ano"]] = dataframe["ano"].str.split(
+            pat=" ", n=1, expand=True
+        )
         dataframe["mes"] = dataframe["mes"].map(n_mes)

         # Split coluna categoria e add id_categoria_bd
         if arq.split("_")[-1].split(".")[0] != "geral":
             dataframe[["id_categoria", "categoria"]] = dataframe["categoria"].str.split(
-                ".", 1, expand=True
+                pat=".", n=1, expand=True
             )

         if arq.split("_")[-1].split(".")[0] == "grupo":
@@ -421,18 +570,51 @@ def clean_mes_rm(indice: str):
             )
             dataframe = dataframe[ordem]
             item = pd.DataFrame(dataframe)
-        elif "_".join(arq.split("_")[1:]).split(".", maxsplit=1)[0] == "subitem_1":
+
+        elif "_".join(arq.split("_")[1:]).split(".", maxsplit=1)[0] == "subitem_2020":
             dataframe["id_categoria_bd"] = dataframe["id_categoria"].apply(
                 lambda x: x[0] + "." + x[1] + "." + x[2:4] + "." + x[4:7]
             )
             dataframe = dataframe[ordem]
-            subitem_1 = pd.DataFrame(dataframe)
-        elif "_".join(arq.split("_")[1:]).split(".", maxsplit=1)[0] == "subitem_2":
+            subitem_2020 = pd.DataFrame(dataframe)
+
+        elif "_".join(arq.split("_")[1:]).split(".", maxsplit=1)[0] == "subitem_2021":
             dataframe["id_categoria_bd"] = dataframe["id_categoria"].apply(
                 lambda x: x[0] + "." + x[1] + "." + x[2:4] + "." + x[4:7]
             )
             dataframe = dataframe[ordem]
-            subitem_2 = pd.DataFrame(dataframe)
+            subitem_2021 = pd.DataFrame(dataframe)
+
+        elif "_".join(arq.split("_")[1:]).split(".", maxsplit=1)[0] == "subitem_2022":
+            dataframe["id_categoria_bd"] = dataframe["id_categoria"].apply(
+                lambda x: x[0] + "." + x[1] + "." + x[2:4] + "." + x[4:7]
+            )
+            dataframe = dataframe[ordem]
+            subitem_2022 = pd.DataFrame(dataframe)
+
+        elif "_".join(arq.split("_")[1:]).split(".", maxsplit=1)[0] == "subitem_2023":
+            dataframe["id_categoria_bd"] = dataframe["id_categoria"].apply(
+                lambda x: x[0] + "." + x[1] + "." + x[2:4] + "." + x[4:7]
+            )
+            dataframe = dataframe[ordem]
+            subitem_2023 = pd.DataFrame(dataframe)
+
+        # elif "_".join(arq.split("_")[1:]).split(".", maxsplit=1)[0] == "subitem_1":
+        #     dataframe["id_categoria_bd"] = dataframe["id_categoria"].apply(
+        #         lambda x: x[0] + "." + x[1] + "." + x[2:4] + "." + x[4:7]
+        #     )
+        #     dataframe = dataframe[ordem]
+        #     subitem_1 = pd.DataFrame(dataframe)
+
+        # elif "_".join(arq.split("_")[1:]).split(".", maxsplit=1)[0] == "subitem_2":
+        #     dataframe["id_categoria_bd"] = dataframe["id_categoria"].apply(
+        #         lambda x: x[0] + "." + x[1] + "." + x[2:4] + "." + x[4:7]
+        #     )
+        #     dataframe = dataframe[ordem]
+        #     subitem_2 = pd.DataFrame(dataframe)
+
         elif arq.split("_")[-1].split(".")[0] == "geral":
             dataframe["id_categoria"] = ""
             dataframe["id_categoria_bd"] = "0.0.00.000"
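The per-year elif chain above, plus the locals() checks in the files_dict below, could eventually collapse into a dict keyed by the file suffix. A hedged refactor sketch, not what the patch does, using a toy frame to show the same digit-slicing rule:

import pandas as pd


def add_id_categoria_bd(frame: pd.DataFrame) -> pd.DataFrame:
    # same slicing rule the lambdas above apply, e.g. "1101001" -> "1.1.01.001"
    frame = frame.copy()
    frame["id_categoria_bd"] = frame["id_categoria"].apply(
        lambda x: x[0] + "." + x[1] + "." + x[2:4] + "." + x[4:7]
    )
    return frame


# collecting frames in a dict avoids one elif per year and the locals() checks
frames = {}
for suffix in ["subitem_2020", "subitem_2021", "subitem_2022", "subitem_2023"]:
    toy = pd.DataFrame({"id_categoria": ["1101001"]})
    frames[suffix] = add_id_categoria_bd(toy)

assert frames["subitem_2020"]["id_categoria_bd"][0] == "1.1.01.001"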
@@ -441,11 +623,16 @@ def clean_mes_rm(indice: str):

     # Add only dataframes defined in previous loop. Download failure leads to some dataframe not being defined
     files_dict = {
         "grupo": grupo if "grupo" in locals() else "",
         "subgrupo": subgrupo if "subgrupo" in locals() else "",
         "item": item if "item" in locals() else "",
-        "subitem_1": subitem_1 if "subitem_1" in locals() else "",
-        "subitem_2": subitem_2 if "subitem_2" in locals() else "",
+        "subitem_2020": subitem_2020 if "subitem_2020" in locals() else "",
+        "subitem_2021": subitem_2021 if "subitem_2021" in locals() else "",
+        "subitem_2022": subitem_2022 if "subitem_2022" in locals() else "",
+        "subitem_2023": subitem_2023 if "subitem_2023" in locals() else "",
+        # "subitem_1": subitem_1 if "subitem_1" in locals() else "",
+        # "subitem_2": subitem_2 if "subitem_2" in locals() else "",
         "geral": geral if "geral" in locals() else "",
     }

@@ -539,13 +726,15 @@ def clean_mes_municipio(indice: str):
         dataframe = dataframe.replace(",", ".", regex=True)

         # Split coluna data e substituir mes
-        dataframe[["mes", "ano"]] = dataframe["ano"].str.split(" ", 1, expand=True)
+        dataframe[["mes", "ano"]] = dataframe["ano"].str.split(
+            pat=" ", n=1, expand=True
+        )
         dataframe["mes"] = dataframe["mes"].map(n_mes)

         # Split coluna categoria e add id_categoria_bd
         if arq.split("_")[-1].split(".")[0] != "geral":
             dataframe[["id_categoria", "categoria"]] = dataframe["categoria"].str.split(
-                ".", 1, expand=True
+                pat=".", n=1, expand=True
             )

         if arq.split("_")[-1].split(".")[0] == "grupo":
@@ -673,12 +862,12 @@ def clean_mes_geral(indice: str):
     )

     for arq in arquivos:
+        log(arq)
         if indice == "ip15":
             dataframe = pd.read_csv(arq, skiprows=2, skipfooter=11, sep=";")
         else:
             dataframe = pd.read_csv(arq, skiprows=2, skipfooter=13, sep=";")
-
-        dataframe["mes"], dataframe["ano"] = dataframe["Mês"].str.split(" ", 1).str
+        dataframe[["mes", "ano"]] = dataframe["Mês"].str.split(" ", n=1, expand=True)
         dataframe["mes"] = dataframe["mes"].map(n_mes)

         # renomear colunas
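The repeated .str.split(" ", 1, expand=True) -> .str.split(pat=" ", n=1, expand=True) edits throughout this file track a pandas API change: every argument of Series.str.split except pat became keyword-only (deprecated in pandas 1.x, enforced in 2.0), so the old positional calls now raise TypeError. A quick check of the equivalent keyword form:

import pandas as pd

s = pd.Series(["setembro 2023", "outubro 2023"])

# keyword form required on pandas >= 2.0; output identical to the old positional call
parts = s.str.split(pat=" ", n=1, expand=True)
parts.columns = ["mes", "ano"]

assert parts.loc[0, "mes"] == "setembro"
assert parts.loc[0, "ano"] == "2023"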
diff --git a/pipelines/utils/crawler_ibge_inflacao/utils.py b/pipelines/utils/crawler_ibge_inflacao/utils.py
index 3e95f3393..68d53e7bc 100644
--- a/pipelines/utils/crawler_ibge_inflacao/utils.py
+++ b/pipelines/utils/crawler_ibge_inflacao/utils.py
@@ -6,12 +6,14 @@
 import ssl
 from datetime import datetime

+import basedosdados as bd
 import requests
 import urllib3
 from prefect.schedules import Schedule, adjustments, filters
 from prefect.schedules.clocks import CronClock

 from pipelines.constants import constants
+from pipelines.utils.utils import log


 def generate_inflacao_clocks(parameters: dict):
@@ -60,3 +62,43 @@ def get_legacy_session():
     session = requests.session()
     session.mount("https://", CustomHttpAdapter(ctx))
     return session
+
+
+def extract_last_date(
+    dataset_id: str,
+    table_id: str,
+    billing_project_id: str,
+) -> str:
+    """
+    Extracts the last update date of a given dataset table.
+
+    Args:
+        dataset_id (str): The ID of the dataset.
+        table_id (str): The ID of the table.
+        billing_project_id (str): The billing project ID.
+
+    Returns:
+        str: The last update date in the format 'yyyy-mm'.
+
+    Raises:
+        Exception: If an error occurs while extracting the last update date.
+    """
+
+    query_bd = f"""
+    SELECT MAX(DATE(CAST(ano AS INT64), CAST(mes AS INT64), 1)) as max_date
+    FROM
+        `{billing_project_id}.{dataset_id}.{table_id}`
+    """
+
+    t = bd.read_sql(
+        query=query_bd,
+        billing_project_id=billing_project_id,
+        from_file=True,
+    )
+
+    data = t["max_date"][0]
+    data = data.strftime("%Y-%m")
+
+    log(f"The most recent date in the table is: {data}")
+
+    return data
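A usage sketch for the new helper, mirroring how check_for_updates consumes it; the dataset and table IDs here are illustrative, and the call assumes configured basedosdados credentials:

# illustrative parameters; any table with integer-castable `ano`/`mes` columns works
max_date_bd = extract_last_date(
    dataset_id="br_ibge_ipca",
    table_id="mes_brasil",
    billing_project_id="basedosdados-dev",
)

# '%Y-%m' strings order correctly as plain strings, so the update check is a `>`
if "2023-09" > max_date_bd:
    print("IBGE has newer data than the warehouse; run the crawler")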