Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor] br_me_cnpj #878

Merged
merged 2 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pipelines/datasets/br_me_cnpj/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class constants(Enum): # pylint: disable=c0103
"TO",
]

URL = "https://dadosabertos.rfb.gov.br/CNPJ/"
URL = "https://arquivos.receitafederal.gov.br/cnpj/dados_abertos_cnpj/?C=N;O=D"

HEADERS = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36"
Expand Down
8 changes: 4 additions & 4 deletions pipelines/datasets/br_me_cnpj/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
log_task(f"Não há atualizações para a tabela de {tabelas}!")

with case(dados_desatualizados, True):
output_filepath = main(tabelas)
output_filepath = main(tabelas,data_atualizacao=data_source_max_date)
wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_filepath,
dataset_id=dataset_id,
Expand Down Expand Up @@ -161,7 +161,7 @@
log_task(f"Não há atualizações para a tabela de {tabelas}!")

with case(dados_desatualizados, True):
output_filepath = main(tabelas)
output_filepath = main(tabelas,data_atualizacao=data_source_max_date)
wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_filepath,
dataset_id=dataset_id,
Expand Down Expand Up @@ -255,7 +255,7 @@
log_task(f"Não há atualizações para a tabela de {tabelas}!")

with case(dados_desatualizados, True):
output_filepath = main(tabelas)
output_filepath = main(tabelas,data_atualizacao=data_source_max_date)

wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_filepath,
Expand Down Expand Up @@ -393,7 +393,7 @@
log_task(f"Não há atualizações para a tabela de {tabelas}!")

with case(dados_desatualizados, True):
output_filepath = main(tabelas)
output_filepath = main(tabelas,data_atualizacao=data_source_max_date)
wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_filepath,
dataset_id=dataset_id,
Expand Down
26 changes: 11 additions & 15 deletions pipelines/datasets/br_me_cnpj/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,59 +60,55 @@ def get_data_source_max_date():
bool: Returns True if updates are available, otherwise returns False.
"""
# Obtém a data mais recente do site
data_obj = data_url(url, headers).strftime("%Y-%m-%d")
data_obj = data_url(url=url, headers=headers)
return data_obj


@task
def main(tabelas):
def main(tabelas:[str], data_atualizacao:datetime)-> str:
"""
Performs the download, processing, and organization of CNPJ data.

Args:
tabelas (list): A list of tables to be processed.
data_atualizaão: Most recent database release extracted from API

Returns:
str: The path to the output folder where the data has been organized.
"""
arquivos_baixados = [] # Lista para rastrear os arquivos baixados
data_coleta = data_url(url, headers).date() # Obtém a data da atualização dos dados

for tabela in tabelas:
sufixo = tabela.lower()

# Define o caminho para a pasta de entrada (input)
input_path = f"/tmp/data/br_me_cnpj/input/data={data_coleta}/"
input_path = f"/tmp/data/br_me_cnpj/input/data={data_atualizacao}/"
os.makedirs(input_path, exist_ok=True)
log("Pasta destino input construído")

# Define o caminho para a pasta de saída (output)
output_path = destino_output(sufixo, data_coleta)
output_path = destino_output(sufixo, data_atualizacao)

# Loop para baixar e processar os arquivos
for i in range(0, 10):
if tabela != "Simples":
nome_arquivo = f"{tabela}{i}"
url_download = f"https://dadosabertos.rfb.gov.br/CNPJ/dados_abertos_cnpj/2024-08/{tabela}{i}.zip"
#url_download = f"http://200.152.38.155/CNPJ/{tabela}{i}.zip"
url_download = f"https://arquivos.receitafederal.gov.br/cnpj/dados_abertos_cnpj/{data_atualizacao.strftime('%Y-%m')}/{tabela}{i}.zip"
if nome_arquivo not in arquivos_baixados:
arquivos_baixados.append(nome_arquivo)
asyncio.run((download_unzip_csv(url_download, input_path)))
if tabela == "Estabelecimentos":
process_csv_estabelecimentos(
input_path, output_path, data_coleta, i
input_path, output_path, data_atualizacao, i
)
elif tabela == "Socios":
process_csv_socios(input_path, output_path, data_coleta, i)
process_csv_socios(input_path, output_path, data_atualizacao, i)
elif tabela == "Empresas":
process_csv_empresas(input_path, output_path, data_coleta, i)
process_csv_empresas(input_path, output_path, data_atualizacao, i)
else:
nome_arquivo = f"{tabela}"
url_download = f"https://dadosabertos.rfb.gov.br/CNPJ/dados_abertos_cnpj/2024-08/{tabela}.zip"
#url_download = f"http://200.152.38.155/CNPJ/{tabela}.zip"
url_download = f"https://arquivos.receitafederal.gov.br/cnpj/dados_abertos_cnpj/{data_atualizacao.strftime('%Y-%m')}/{tabela}.zip"
if nome_arquivo not in arquivos_baixados:
arquivos_baixados.append(nome_arquivo)
asyncio.run((download_unzip_csv(url_download, input_path)))
process_csv_simples(input_path, output_path, data_coleta, sufixo)
process_csv_simples(input_path, output_path, data_atualizacao, sufixo)

return output_path
55 changes: 26 additions & 29 deletions pipelines/datasets/br_me_cnpj/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import re
import requests
from bs4 import BeautifulSoup
from loguru import logger
Expand All @@ -25,48 +26,44 @@
headers = constants_cnpj.HEADERS.value


# ! Checa a data do site ME
# def data_url(url, headers):
# link_data = requests.get(url, headers=headers)
# soup = BeautifulSoup(link_data.text, "html.parser")
# span_element = soup.find_all("td", align="right")

# # Extrai a segunda ocorrência
# data_completa = span_element[1].text.strip()
# # Extrai a parte da data no formato "YYYY-MM-DD"
# data_str = data_completa[0:10]
# # Converte a string da data em um objeto de data
# data = datetime.strptime(data_str, "%Y-%m-%d")
# return data
def data_url(url:str, headers:dict)-> datetime:


def data_url(url, headers):
max_attempts = constants_cnpj.MAX_ATTEMPTS.value
timeout = constants_cnpj.TIMEOUT.value
attempts = constants_cnpj.ATTEMPTS.value
date_pattern = re.compile(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}")

while attempts < max_attempts:
try:
link_data = requests.get(url, headers=headers, timeout=timeout)
time.sleep(2)
soup = BeautifulSoup(link_data.text, "html.parser")

span_element = soup.find_all("td", align="right")
# Extrai a segunda ocorrência
data_completa = span_element[3].text.strip()
# Extrai a parte da data no formato "YYYY-MM-DD"
data_str = data_completa[0:10]
# Converte a string da data em um objeto de data
data = datetime.strptime(data_str, "%Y-%m-%d")

return data
link_data.raise_for_status()
break
except (ConnectionError, Timeout) as e:
log(f"Tentativa {attempts + 1} falhou. Erro: {e}")
time.sleep(2)
time.sleep(1)
attempts += 1
else:
log(f"Máximo de {max_attempts} tentativas alcançado. Verifique a conexão ou o endpoint.")
return None

soup = BeautifulSoup(link_data.text, "html.parser")

dates = [
datetime.strptime(element.get_text(strip=True), "%Y-%m-%d %H:%M").date()
for element in soup.find_all('td', align="right")
if date_pattern.match(element.get_text(strip=True))
]

try:
max_date = max(dates)
log(f"A data máxima extraida da API é: {max_date}")
except ValueError as e:
log(f"A lista que deveria conter a data mais recente de atualização retornou nula. Verifique o endpoint \n. Erro: {e}")

return max_date


log("Todas as tentativas falharam. Tratamento adicional é necessário.")
return None


# ! Cria o caminho do output
Expand Down
Loading