Merge pull request #890 from basedosdados/staging/refactor_cnpj
[Refactor] br_me_cnpj
folhesgabriel authored Nov 14, 2024
2 parents 66f7613 + 710bed5 commit f0bbb9f
Showing 6 changed files with 1,309 additions and 1,158 deletions.
4 changes: 2 additions & 2 deletions pipelines/datasets/br_me_cnpj/constants.py
@@ -159,10 +159,10 @@ class constants(Enum): # pylint: disable=c0103
"data_situacao_especial",
]

default_chunk_size = 2**20 # 1MB
default_chunk_size = 20 * 1024 * 1024 # 20MB

default_max_retries = 32

default_max_parallel = 16

default_timeout = 3 * 60 * 1000 # 3 minutes
default_timeout = 1 * 60 * 1000 # 1 minute
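The new defaults point to a streaming downloader tuned for larger 20MB chunks, up to 32 retries, 16 parallel downloads, and a shorter 1-minute timeout. A minimal sketch of how constants like these are typically consumed follows; the helper name and signature are assumptions, not the pipeline's actual downloader:

# Illustrative only: a streaming downloader shaped by constants like the ones above.
# The function name and signature are assumptions, not the pipeline's API.
import requests

DEFAULT_CHUNK_SIZE = 20 * 1024 * 1024  # 20MB, mirrors default_chunk_size
DEFAULT_MAX_RETRIES = 32               # mirrors default_max_retries
DEFAULT_TIMEOUT_MS = 1 * 60 * 1000     # 1 minute, mirrors default_timeout


def download_with_retries(url: str, dest: str) -> None:
    """Stream a file to disk in 20MB chunks, retrying on transient errors."""
    last_error = None
    for _attempt in range(DEFAULT_MAX_RETRIES):
        try:
            # requests takes the timeout in seconds, so convert from milliseconds
            with requests.get(url, stream=True, timeout=DEFAULT_TIMEOUT_MS / 1000) as response:
                response.raise_for_status()
                with open(dest, "wb") as output:
                    for chunk in response.iter_content(chunk_size=DEFAULT_CHUNK_SIZE):
                        output.write(chunk)
            return
        except requests.RequestException as error:
            last_error = error  # transient failure: try again up to the configured limit
    raise RuntimeError(f"download failed after {DEFAULT_MAX_RETRIES} attempts") from last_error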
48 changes: 24 additions & 24 deletions pipelines/datasets/br_me_cnpj/flows.py
@@ -54,21 +54,21 @@
)
tabelas = constants_cnpj.TABELAS.value[0:1]

data_source_max_date = get_data_source_max_date()
folder_date, today_date = get_data_source_max_date()

dados_desatualizados = check_if_data_is_outdated(
dataset_id=dataset_id,
table_id=table_id,
data_source_max_date=data_source_max_date,
date_format="%Y-%m-%d",
upstream_tasks=[data_source_max_date],
data_source_max_date=folder_date,
date_format="%Y-%m",
upstream_tasks=[today_date],
)

with case(dados_desatualizados, False):
log_task(f"Não há atualizações para a tabela de {tabelas}!")

with case(dados_desatualizados, True):
output_filepath = main(tabelas,data_atualizacao=data_source_max_date)
output_filepath = main(tabelas,folder_date=folder_date,today_date=today_date)
wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_filepath,
dataset_id=dataset_id,
@@ -124,7 +124,7 @@

br_me_cnpj_empresas.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_me_cnpj_empresas.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
#br_me_cnpj_empresas.schedule = every_day_empresas
br_me_cnpj_empresas.schedule = every_day_empresas
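The refactor replaces the single data_source_max_date with a (folder_date, today_date) pair and checks freshness at month granularity ("%Y-%m"). A minimal sketch of that comparison follows, assuming the upstream check reduces to comparing formatted dates; this is not the pipeline's actual check_if_data_is_outdated task:

# Sketch only: illustrates the "%Y-%m" comparison, not the real check_if_data_is_outdated.
from datetime import datetime


def is_outdated(last_materialized: datetime, folder_date: datetime, date_format: str = "%Y-%m") -> bool:
    """Return True when the source folder is newer than the last materialized partition."""
    # Truncate both dates to the requested granularity (here, year-month) before comparing,
    # so a release on any day of a new month counts as new data.
    return folder_date.strftime(date_format) > last_materialized.strftime(date_format)


# Example: a table last materialized in October 2024 against a November 2024 source folder
print(is_outdated(datetime(2024, 10, 15), datetime(2024, 11, 1)))  # True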

with Flow(
name="br_me_cnpj.socios",
@@ -147,21 +147,21 @@
)
tabelas = constants_cnpj.TABELAS.value[1:2]

data_source_max_date = get_data_source_max_date()
folder_date, today_date = get_data_source_max_date()

dados_desatualizados = check_if_data_is_outdated(
dataset_id=dataset_id,
table_id=table_id,
data_source_max_date=data_source_max_date,
date_format="%Y-%m-%d",
upstream_tasks=[data_source_max_date],
data_source_max_date=folder_date,
date_format="%Y-%m",
upstream_tasks=[today_date],
)

with case(dados_desatualizados, False):
log_task(f"Não há atualizações para a tabela de {tabelas}!")

with case(dados_desatualizados, True):
output_filepath = main(tabelas,data_atualizacao=data_source_max_date)
output_filepath = main(tabelas,folder_date=folder_date,today_date=today_date)
wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_filepath,
dataset_id=dataset_id,
@@ -216,7 +216,7 @@

br_me_cnpj_socios.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_me_cnpj_socios.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
#br_me_cnpj_socios.schedule = every_day_socios
br_me_cnpj_socios.schedule = every_day_socios


with Flow(
@@ -241,21 +241,21 @@
)
tabelas = constants_cnpj.TABELAS.value[2:3]

data_source_max_date = get_data_source_max_date()
folder_date, today_date = get_data_source_max_date()

dados_desatualizados = check_if_data_is_outdated(
dataset_id=dataset_id,
table_id=table_id,
data_source_max_date=data_source_max_date,
date_format="%Y-%m-%d",
upstream_tasks=[data_source_max_date],
data_source_max_date=folder_date,
date_format="%Y-%m",
upstream_tasks=[today_date],
)

with case(dados_desatualizados, False):
log_task(f"Não há atualizações para a tabela de {tabelas}!")

with case(dados_desatualizados, True):
output_filepath = main(tabelas,data_atualizacao=data_source_max_date)
output_filepath = main(tabelas,folder_date=folder_date,today_date=today_date)

wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_filepath,
@@ -355,7 +355,7 @@
br_me_cnpj_estabelecimentos.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value
)
#br_me_cnpj_estabelecimentos.schedule = every_day_estabelecimentos
br_me_cnpj_estabelecimentos.schedule = every_day_estabelecimentos


with Flow(
@@ -379,21 +379,21 @@
)
tabelas = constants_cnpj.TABELAS.value[3:]

data_source_max_date = get_data_source_max_date()
folder_date, today_date = get_data_source_max_date()

dados_desatualizados = check_if_data_is_outdated(
dataset_id="br_me_cnpj",
table_id="estabelecimentos",
data_source_max_date=data_source_max_date,
date_format="%Y-%m-%d",
upstream_tasks=[data_source_max_date],
data_source_max_date=folder_date,
date_format="%Y-%m",
upstream_tasks=[today_date],
)

with case(dados_desatualizados, False):
log_task(f"Não há atualizações para a tabela de {tabelas}!")

with case(dados_desatualizados, True):
output_filepath = main(tabelas,data_atualizacao=data_source_max_date)
output_filepath = main(tabelas,folder_date=folder_date,today_date=today_date)
wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_filepath,
dataset_id=dataset_id,
@@ -444,5 +444,5 @@

br_me_cnpj_simples.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_me_cnpj_simples.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
#br_me_cnpj_simples.schedule = every_day_simples
br_me_cnpj_simples.schedule = every_day_simples
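All four flows re-enable their daily schedules (every_day_empresas, every_day_socios, every_day_estabelecimentos, every_day_simples). The schedule objects themselves are not part of this diff; in Prefect 1.x they are typically built from a Schedule with a CronClock, roughly as in the sketch below, where the cron expression and start date are assumptions:

# Illustrative Prefect 1.x schedule; the actual every_day_* objects are defined elsewhere
# in the repository, and the cron expression and start date here are assumptions.
from datetime import datetime

from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

daily_schedule_example = Schedule(
    clocks=[
        CronClock(
            cron="0 6 * * *",                   # hypothetical: once a day at 06:00
            start_date=datetime(2024, 11, 14),  # hypothetical start date
        )
    ]
)

# Attaching it mirrors the assignments above, e.g.:
# br_me_cnpj_empresas.schedule = daily_schedule_example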

65 changes: 23 additions & 42 deletions pipelines/datasets/br_me_cnpj/tasks.py
@@ -4,10 +4,8 @@
"""
import asyncio
import os
from google.cloud import storage
import pathlib
from typing import Union, List
from datetime import datetime
from datetime import datetime,timedelta
import basedosdados as bd
from prefect import task

@@ -16,61 +14,44 @@
data_url,
destino_output,
download_unzip_csv,
download_unzip_csv_sync,
process_csv_empresas,
process_csv_estabelecimentos,
process_csv_simples,
process_csv_socios,
)
from pipelines.utils.utils import log
from pipelines.constants import constants

ufs = constants_cnpj.UFS.value
url = constants_cnpj.URL.value
headers = constants_cnpj.HEADERS.value


@task
def calculate_defasagem():
"""
Calculates the month lag based on the current month.
Returns:
int: Number of lagged months.
"""
current_month = datetime.now().month
current_year = datetime.now().year

if current_year == 2023:
if current_month >= 10:
defasagem = 6
else:
defasagem = current_month - 4
else:
defasagem = 6

return defasagem


@task
def get_data_source_max_date():
@task(
max_retries=3,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def get_data_source_max_date() -> tuple[datetime,datetime]:
"""
Checks if there are available updates for a specific dataset and table.
Returns:
bool: Returns True if updates are available, otherwise returns False.
tuple: Returns a tuple with the date extracted from the CNPJ API folder and today's date,
to be used as the partition
"""
# Obtém a data mais recente do site
data_obj = data_url(url=url, headers=headers)
return data_obj

folder_date, today_date = data_url(url=url, headers=headers)
return folder_date, today_date

@task
def main(tabelas:[str], data_atualizacao:datetime)-> str:
def main(tabelas:[str], folder_date:datetime, today_date:datetime)-> str:
"""
Performs the download, processing, and organization of CNPJ data.
Args:
tabelas (list): A list of tables to be processed.
data_atualizacao: Most recent database release extracted from the API
folder_date (datetime): Most recent database release extracted from API
today_date (datetime): Today's date
Returns:
str: The path to the output folder where the data has been organized.
Expand All @@ -80,35 +61,35 @@ def main(tabelas:[str], data_atualizacao:datetime)-> str:
sufixo = tabela.lower()

# Define o caminho para a pasta de entrada (input)
input_path = f"/tmp/data/br_me_cnpj/input/data={data_atualizacao}/"
input_path = f"/tmp/data/br_me_cnpj/input/data={today_date}/"
os.makedirs(input_path, exist_ok=True)
log("Pasta destino input construído")

# Define o caminho para a pasta de saída (output)
output_path = destino_output(sufixo, data_atualizacao)
output_path = destino_output(sufixo, today_date)

# Loop para baixar e processar os arquivos
for i in range(0, 10):
if tabela != "Simples":
nome_arquivo = f"{tabela}{i}"
url_download = f"https://arquivos.receitafederal.gov.br/cnpj/dados_abertos_cnpj/{data_atualizacao.strftime('%Y-%m')}/{tabela}{i}.zip"
url_download = f"https://arquivos.receitafederal.gov.br/cnpj/dados_abertos_cnpj/{folder_date}/{tabela}{i}.zip"
if nome_arquivo not in arquivos_baixados:
arquivos_baixados.append(nome_arquivo)
asyncio.run((download_unzip_csv(url_download, input_path)))
if tabela == "Estabelecimentos":
process_csv_estabelecimentos(
input_path, output_path, data_atualizacao, i
input_path, output_path, today_date, i
)
elif tabela == "Socios":
process_csv_socios(input_path, output_path, data_atualizacao, i)
process_csv_socios(input_path, output_path, today_date, i)
elif tabela == "Empresas":
process_csv_empresas(input_path, output_path, data_atualizacao, i)
process_csv_empresas(input_path, output_path, today_date, i)
else:
nome_arquivo = f"{tabela}"
url_download = f"https://arquivos.receitafederal.gov.br/cnpj/dados_abertos_cnpj/{data_atualizacao.strftime('%Y-%m')}/{tabela}.zip"
url_download = f"https://arquivos.receitafederal.gov.br/cnpj/dados_abertos_cnpj/{folder_date}/{tabela}.zip"
if nome_arquivo not in arquivos_baixados:
arquivos_baixados.append(nome_arquivo)
asyncio.run((download_unzip_csv(url_download, input_path)))
process_csv_simples(input_path, output_path, data_atualizacao, sufixo)
process_csv_simples(input_path, output_path, today_date, sufixo)

return output_path
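Inside main, folder_date now names the monthly folder on the Receita Federal server directly (the old code formatted data_atualizacao with strftime('%Y-%m')), while today_date drives the local data= partition. A small sketch of that string construction follows; build_paths is not a function in the pipeline, and the exact types of the two values (datetime vs. pre-formatted string) are assumptions based on this diff:

# Sketch of the URL and partition-path construction visible in main() above.
# build_paths is hypothetical; the string types of folder_date and today_date are assumed.
def build_paths(tabela: str, part: int, folder_date: str, today_date: str) -> tuple[str, str]:
    """Build the Receita Federal download URL and the local partitioned input path."""
    # folder_date names the monthly folder on the source server, e.g. "2024-11"
    url_download = (
        "https://arquivos.receitafederal.gov.br/cnpj/dados_abertos_cnpj/"
        f"{folder_date}/{tabela}{part}.zip"
    )
    # today_date becomes the Hive-style partition of the local input folder, e.g. "2024-11-14"
    input_path = f"/tmp/data/br_me_cnpj/input/data={today_date}/"
    return url_download, input_path


url, path = build_paths("Empresas", 0, "2024-11", "2024-11-14")
print(url)   # https://arquivos.receitafederal.gov.br/cnpj/dados_abertos_cnpj/2024-11/Empresas0.zip
print(path)  # /tmp/data/br_me_cnpj/input/data=2024-11-14/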
