Skip to content

Commit

Permalink
fix: refactor utils functions
Browse files Browse the repository at this point in the history
  • Loading branch information
folhesgabriel committed Oct 31, 2024
1 parent 7c52602 commit 008800f
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 49 deletions.
2 changes: 1 addition & 1 deletion pipelines/datasets/br_me_cnpj/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class constants(Enum): # pylint: disable=c0103
"TO",
]

URL = "https://dadosabertos.rfb.gov.br/CNPJ/"
URL = "https://arquivos.receitafederal.gov.br/cnpj/dados_abertos_cnpj/?C=N;O=D"

HEADERS = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36"
Expand Down
8 changes: 4 additions & 4 deletions pipelines/datasets/br_me_cnpj/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
log_task(f"Não há atualizações para a tabela de {tabelas}!")

with case(dados_desatualizados, True):
output_filepath = main(tabelas)
output_filepath = main(tabelas,data_atualizacao=data_source_max_date)
wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_filepath,
dataset_id=dataset_id,
Expand Down Expand Up @@ -161,7 +161,7 @@
log_task(f"Não há atualizações para a tabela de {tabelas}!")

with case(dados_desatualizados, True):
output_filepath = main(tabelas)
output_filepath = main(tabelas,data_atualizacao=data_source_max_date)
wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_filepath,
dataset_id=dataset_id,
Expand Down Expand Up @@ -255,7 +255,7 @@
log_task(f"Não há atualizações para a tabela de {tabelas}!")

with case(dados_desatualizados, True):
output_filepath = main(tabelas)
output_filepath = main(tabelas,data_atualizacao=data_source_max_date)

wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_filepath,
Expand Down Expand Up @@ -393,7 +393,7 @@
log_task(f"Não há atualizações para a tabela de {tabelas}!")

with case(dados_desatualizados, True):
output_filepath = main(tabelas)
output_filepath = main(tabelas,data_atualizacao=data_source_max_date)
wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_filepath,
dataset_id=dataset_id,
Expand Down
26 changes: 11 additions & 15 deletions pipelines/datasets/br_me_cnpj/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,59 +60,55 @@ def get_data_source_max_date():
bool: Returns True if updates are available, otherwise returns False.
"""
# Obtém a data mais recente do site
data_obj = data_url(url, headers).strftime("%Y-%m-%d")
data_obj = data_url(url=url, headers=headers)
return data_obj


@task
def main(tabelas):
def main(tabelas:[str], data_atualizacao:datetime)-> str:
"""
Performs the download, processing, and organization of CNPJ data.
Args:
tabelas (list): A list of tables to be processed.
data_atualizaão: Most recent database release extracted from API
Returns:
str: The path to the output folder where the data has been organized.
"""
arquivos_baixados = [] # Lista para rastrear os arquivos baixados
data_coleta = data_url(url, headers).date() # Obtém a data da atualização dos dados

for tabela in tabelas:
sufixo = tabela.lower()

# Define o caminho para a pasta de entrada (input)
input_path = f"/tmp/data/br_me_cnpj/input/data={data_coleta}/"
input_path = f"/tmp/data/br_me_cnpj/input/data={data_atualizacao}/"
os.makedirs(input_path, exist_ok=True)
log("Pasta destino input construído")

# Define o caminho para a pasta de saída (output)
output_path = destino_output(sufixo, data_coleta)
output_path = destino_output(sufixo, data_atualizacao)

# Loop para baixar e processar os arquivos
for i in range(0, 10):
if tabela != "Simples":
nome_arquivo = f"{tabela}{i}"
url_download = f"https://dadosabertos.rfb.gov.br/CNPJ/dados_abertos_cnpj/2024-08/{tabela}{i}.zip"
#url_download = f"http://200.152.38.155/CNPJ/{tabela}{i}.zip"
url_download = f"https://arquivos.receitafederal.gov.br/cnpj/dados_abertos_cnpj/{data_atualizacao.strftime('%Y-%m')}/{tabela}{i}.zip"
if nome_arquivo not in arquivos_baixados:
arquivos_baixados.append(nome_arquivo)
asyncio.run((download_unzip_csv(url_download, input_path)))
if tabela == "Estabelecimentos":
process_csv_estabelecimentos(
input_path, output_path, data_coleta, i
input_path, output_path, data_atualizacao, i
)
elif tabela == "Socios":
process_csv_socios(input_path, output_path, data_coleta, i)
process_csv_socios(input_path, output_path, data_atualizacao, i)
elif tabela == "Empresas":
process_csv_empresas(input_path, output_path, data_coleta, i)
process_csv_empresas(input_path, output_path, data_atualizacao, i)
else:
nome_arquivo = f"{tabela}"
url_download = f"https://dadosabertos.rfb.gov.br/CNPJ/dados_abertos_cnpj/2024-08/{tabela}.zip"
#url_download = f"http://200.152.38.155/CNPJ/{tabela}.zip"
url_download = f"https://arquivos.receitafederal.gov.br/cnpj/dados_abertos_cnpj/{data_atualizacao.strftime('%Y-%m')}/{tabela}.zip"
if nome_arquivo not in arquivos_baixados:
arquivos_baixados.append(nome_arquivo)
asyncio.run((download_unzip_csv(url_download, input_path)))
process_csv_simples(input_path, output_path, data_coleta, sufixo)
process_csv_simples(input_path, output_path, data_atualizacao, sufixo)

return output_path
55 changes: 26 additions & 29 deletions pipelines/datasets/br_me_cnpj/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import re
import requests
from bs4 import BeautifulSoup
from loguru import logger
Expand All @@ -25,48 +26,44 @@
headers = constants_cnpj.HEADERS.value


# ! Checa a data do site ME
# def data_url(url, headers):
# link_data = requests.get(url, headers=headers)
# soup = BeautifulSoup(link_data.text, "html.parser")
# span_element = soup.find_all("td", align="right")

# # Extrai a segunda ocorrência
# data_completa = span_element[1].text.strip()
# # Extrai a parte da data no formato "YYYY-MM-DD"
# data_str = data_completa[0:10]
# # Converte a string da data em um objeto de data
# data = datetime.strptime(data_str, "%Y-%m-%d")
# return data
def data_url(url:str, headers:dict)-> datetime:


def data_url(url, headers):
max_attempts = constants_cnpj.MAX_ATTEMPTS.value
timeout = constants_cnpj.TIMEOUT.value
attempts = constants_cnpj.ATTEMPTS.value
date_pattern = re.compile(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}")

while attempts < max_attempts:
try:
link_data = requests.get(url, headers=headers, timeout=timeout)
time.sleep(2)
soup = BeautifulSoup(link_data.text, "html.parser")

span_element = soup.find_all("td", align="right")
# Extrai a segunda ocorrência
data_completa = span_element[3].text.strip()
# Extrai a parte da data no formato "YYYY-MM-DD"
data_str = data_completa[0:10]
# Converte a string da data em um objeto de data
data = datetime.strptime(data_str, "%Y-%m-%d")

return data
link_data.raise_for_status()
break
except (ConnectionError, Timeout) as e:
log(f"Tentativa {attempts + 1} falhou. Erro: {e}")
time.sleep(2)
time.sleep(1)
attempts += 1
else:
log(f"Máximo de {max_attempts} tentativas alcançado. Verifique a conexão ou o endpoint.")
return None

soup = BeautifulSoup(link_data.text, "html.parser")

dates = [
datetime.strptime(element.get_text(strip=True), "%Y-%m-%d %H:%M").date()
for element in soup.find_all('td', align="right")
if date_pattern.match(element.get_text(strip=True))
]

try:
max_date = max(dates)
log(f"A data máxima extraida da API é: {max_date}")
except ValueError as e:
log(f"A lista que deveria conter a data mais recente de atualização retornou nula. Verifique o endpoint \n. Erro: {e}")

return max_date


log("Todas as tentativas falharam. Tratamento adicional é necessário.")
return None


# ! Cria o caminho do output
Expand Down

0 comments on commit 008800f

Please sign in to comment.