diff --git a/pipelines/datasets/br_me_cnpj/constants.py b/pipelines/datasets/br_me_cnpj/constants.py index b0b899486..2f3e4bef9 100644 --- a/pipelines/datasets/br_me_cnpj/constants.py +++ b/pipelines/datasets/br_me_cnpj/constants.py @@ -51,7 +51,7 @@ class constants(Enum): # pylint: disable=c0103 "TO", ] - URL = "https://dadosabertos.rfb.gov.br/CNPJ/" + URL = "https://arquivos.receitafederal.gov.br/cnpj/dados_abertos_cnpj/?C=N;O=D" HEADERS = { "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36" diff --git a/pipelines/datasets/br_me_cnpj/flows.py b/pipelines/datasets/br_me_cnpj/flows.py index e8f1a4a72..f18203c0f 100644 --- a/pipelines/datasets/br_me_cnpj/flows.py +++ b/pipelines/datasets/br_me_cnpj/flows.py @@ -68,7 +68,7 @@ log_task(f"Não há atualizações para a tabela de {tabelas}!") with case(dados_desatualizados, True): - output_filepath = main(tabelas) + output_filepath = main(tabelas,data_atualizacao=data_source_max_date) wait_upload_table = create_table_and_upload_to_gcs( data_path=output_filepath, dataset_id=dataset_id, @@ -161,7 +161,7 @@ log_task(f"Não há atualizações para a tabela de {tabelas}!") with case(dados_desatualizados, True): - output_filepath = main(tabelas) + output_filepath = main(tabelas,data_atualizacao=data_source_max_date) wait_upload_table = create_table_and_upload_to_gcs( data_path=output_filepath, dataset_id=dataset_id, @@ -255,7 +255,7 @@ log_task(f"Não há atualizações para a tabela de {tabelas}!") with case(dados_desatualizados, True): - output_filepath = main(tabelas) + output_filepath = main(tabelas,data_atualizacao=data_source_max_date) wait_upload_table = create_table_and_upload_to_gcs( data_path=output_filepath, @@ -393,7 +393,7 @@ log_task(f"Não há atualizações para a tabela de {tabelas}!") with case(dados_desatualizados, True): - output_filepath = main(tabelas) + output_filepath = main(tabelas,data_atualizacao=data_source_max_date) wait_upload_table = create_table_and_upload_to_gcs( data_path=output_filepath, dataset_id=dataset_id, diff --git a/pipelines/datasets/br_me_cnpj/tasks.py b/pipelines/datasets/br_me_cnpj/tasks.py index 82369f196..b9a915b5f 100644 --- a/pipelines/datasets/br_me_cnpj/tasks.py +++ b/pipelines/datasets/br_me_cnpj/tasks.py @@ -60,59 +60,55 @@ def get_data_source_max_date(): bool: Returns True if updates are available, otherwise returns False. """ # Obtém a data mais recente do site - data_obj = data_url(url, headers).strftime("%Y-%m-%d") + data_obj = data_url(url=url, headers=headers) return data_obj - @task -def main(tabelas): +def main(tabelas:[str], data_atualizacao:datetime)-> str: """ Performs the download, processing, and organization of CNPJ data. Args: tabelas (list): A list of tables to be processed. + data_atualizaão: Most recent database release extracted from API Returns: str: The path to the output folder where the data has been organized. """ arquivos_baixados = [] # Lista para rastrear os arquivos baixados - data_coleta = data_url(url, headers).date() # Obtém a data da atualização dos dados - for tabela in tabelas: sufixo = tabela.lower() # Define o caminho para a pasta de entrada (input) - input_path = f"/tmp/data/br_me_cnpj/input/data={data_coleta}/" + input_path = f"/tmp/data/br_me_cnpj/input/data={data_atualizacao}/" os.makedirs(input_path, exist_ok=True) log("Pasta destino input construído") # Define o caminho para a pasta de saída (output) - output_path = destino_output(sufixo, data_coleta) + output_path = destino_output(sufixo, data_atualizacao) # Loop para baixar e processar os arquivos for i in range(0, 10): if tabela != "Simples": nome_arquivo = f"{tabela}{i}" - url_download = f"https://dadosabertos.rfb.gov.br/CNPJ/dados_abertos_cnpj/2024-08/{tabela}{i}.zip" - #url_download = f"http://200.152.38.155/CNPJ/{tabela}{i}.zip" + url_download = f"https://arquivos.receitafederal.gov.br/cnpj/dados_abertos_cnpj/{data_atualizacao.strftime('%Y-%m')}/{tabela}{i}.zip" if nome_arquivo not in arquivos_baixados: arquivos_baixados.append(nome_arquivo) asyncio.run((download_unzip_csv(url_download, input_path))) if tabela == "Estabelecimentos": process_csv_estabelecimentos( - input_path, output_path, data_coleta, i + input_path, output_path, data_atualizacao, i ) elif tabela == "Socios": - process_csv_socios(input_path, output_path, data_coleta, i) + process_csv_socios(input_path, output_path, data_atualizacao, i) elif tabela == "Empresas": - process_csv_empresas(input_path, output_path, data_coleta, i) + process_csv_empresas(input_path, output_path, data_atualizacao, i) else: nome_arquivo = f"{tabela}" - url_download = f"https://dadosabertos.rfb.gov.br/CNPJ/dados_abertos_cnpj/2024-08/{tabela}.zip" - #url_download = f"http://200.152.38.155/CNPJ/{tabela}.zip" + url_download = f"https://arquivos.receitafederal.gov.br/cnpj/dados_abertos_cnpj/{data_atualizacao.strftime('%Y-%m')}/{tabela}.zip" if nome_arquivo not in arquivos_baixados: arquivos_baixados.append(nome_arquivo) asyncio.run((download_unzip_csv(url_download, input_path))) - process_csv_simples(input_path, output_path, data_coleta, sufixo) + process_csv_simples(input_path, output_path, data_atualizacao, sufixo) return output_path diff --git a/pipelines/datasets/br_me_cnpj/utils.py b/pipelines/datasets/br_me_cnpj/utils.py index b4280f86f..a608b5c02 100644 --- a/pipelines/datasets/br_me_cnpj/utils.py +++ b/pipelines/datasets/br_me_cnpj/utils.py @@ -12,6 +12,7 @@ import pandas as pd import pyarrow as pa import pyarrow.parquet as pq +import re import requests from bs4 import BeautifulSoup from loguru import logger @@ -25,48 +26,44 @@ headers = constants_cnpj.HEADERS.value -# ! Checa a data do site ME -# def data_url(url, headers): -# link_data = requests.get(url, headers=headers) -# soup = BeautifulSoup(link_data.text, "html.parser") -# span_element = soup.find_all("td", align="right") -# # Extrai a segunda ocorrência -# data_completa = span_element[1].text.strip() -# # Extrai a parte da data no formato "YYYY-MM-DD" -# data_str = data_completa[0:10] -# # Converte a string da data em um objeto de data -# data = datetime.strptime(data_str, "%Y-%m-%d") -# return data +def data_url(url:str, headers:dict)-> datetime: - -def data_url(url, headers): max_attempts = constants_cnpj.MAX_ATTEMPTS.value timeout = constants_cnpj.TIMEOUT.value attempts = constants_cnpj.ATTEMPTS.value + date_pattern = re.compile(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}") while attempts < max_attempts: try: link_data = requests.get(url, headers=headers, timeout=timeout) - time.sleep(2) - soup = BeautifulSoup(link_data.text, "html.parser") - - span_element = soup.find_all("td", align="right") - # Extrai a segunda ocorrência - data_completa = span_element[3].text.strip() - # Extrai a parte da data no formato "YYYY-MM-DD" - data_str = data_completa[0:10] - # Converte a string da data em um objeto de data - data = datetime.strptime(data_str, "%Y-%m-%d") - - return data + link_data.raise_for_status() + break except (ConnectionError, Timeout) as e: log(f"Tentativa {attempts + 1} falhou. Erro: {e}") - time.sleep(2) + time.sleep(1) attempts += 1 + else: + log(f"Máximo de {max_attempts} tentativas alcançado. Verifique a conexão ou o endpoint.") + return None + + soup = BeautifulSoup(link_data.text, "html.parser") + + dates = [ + datetime.strptime(element.get_text(strip=True), "%Y-%m-%d %H:%M").date() + for element in soup.find_all('td', align="right") + if date_pattern.match(element.get_text(strip=True)) + ] + + try: + max_date = max(dates) + log(f"A data máxima extraida da API é: {max_date}") + except ValueError as e: + log(f"A lista que deveria conter a data mais recente de atualização retornou nula. Verifique o endpoint \n. Erro: {e}") + + return max_date + - log("Todas as tentativas falharam. Tratamento adicional é necessário.") - return None # ! Cria o caminho do output