feat: add type hints and doc strings in utils
folhesgabriel committed Nov 13, 2024
1 parent 537c12a commit f58d23a
Showing 2 changed files with 154 additions and 24 deletions.
6 changes: 3 additions & 3 deletions pipelines/datasets/br_me_cnpj/flows.py
@@ -153,7 +153,7 @@
dataset_id=dataset_id,
table_id=table_id,
data_source_max_date=folder_date,
date_format="%Y-%m-%d",
date_format="%Y-%m",
upstream_tasks=[today_date],
)

@@ -247,7 +247,7 @@
dataset_id=dataset_id,
table_id=table_id,
data_source_max_date=folder_date,
date_format="%Y-%m-%d",
date_format="%Y-%m",
upstream_tasks=[today_date],
)

@@ -385,7 +385,7 @@
dataset_id="br_me_cnpj",
table_id="estabelecimentos",
data_source_max_date=folder_date,
date_format="%Y-%m-%d",
date_format="%Y-%m",
upstream_tasks=[today_date],
)

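The three hunks above switch date_format from "%Y-%m-%d" to "%Y-%m", so folder_date (the maximum date returned by data_url) is now compared at month granularity. A minimal sketch of the difference, using a hypothetical date:

from datetime import datetime

folder_date = datetime(2024, 11, 13)      # hypothetical value returned by data_url
print(folder_date.strftime("%Y-%m-%d"))   # 2024-11-13  (old format)
print(folder_date.strftime("%Y-%m"))      # 2024-11     (new format)
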
172 changes: 151 additions & 21 deletions pipelines/datasets/br_me_cnpj/utils.py
@@ -3,7 +3,6 @@
General purpose functions for the br_me_cnpj project
"""
import os
import time
import zipfile
from asyncio import Semaphore, gather
from datetime import datetime
@@ -15,8 +14,6 @@
import re
import requests
from bs4 import BeautifulSoup
from loguru import logger
from requests.exceptions import ConnectionError, Timeout
from tqdm import tqdm

from pipelines.datasets.br_me_cnpj.constants import constants as constants_cnpj
@@ -27,7 +24,16 @@
timeout = constants_cnpj.TIMEOUT.value

def data_url(url: str, headers: dict) -> tuple[datetime, datetime]:
"""
Fetches data from a URL, parses the HTML to find the latest folder date, and compares it to today's date.
Args:
url (str): The URL to fetch the data from.
headers (dict): Headers to include in the request.
Returns:
Tuple[datetime, datetime]: The maximum date found in the folders and today's date.
"""

link_data = requests.get(url, headers=headers, timeout=timeout, verify=False)
link_data.raise_for_status()
@@ -54,6 +60,16 @@ def data_url(url: str, headers: dict) -> tuple[datetime, datetime]:

# ! Cria o caminho do output
def destino_output(sufixo: str, data_coleta: datetime) -> str:
"""
Constructs the output directory path based on the suffix and collection date.
Args:
sufixo (str): The suffix for directory structure.
data_coleta (datetime): The data collection date.
Returns:
str: The constructed output path.
"""

output_path = f"/tmp/data/br_me_cnpj/output/{sufixo}/"
# Pasta de destino para salvar o arquivo CSV
@@ -75,17 +91,51 @@

# ! Adiciona zero a esquerda nas colunas
def fill_left_zeros(df: pd.DataFrame, column: str, num_digits: int) -> pd.DataFrame:
"""
Adds left zeros to the specified column of a DataFrame to meet the required digit count.
Args:
df (pd.DataFrame): DataFrame with the target column.
column (str): Column name to fill with zeros.
num_digits (int): Total number of digits for the column.
Returns:
pd.DataFrame: Updated DataFrame with filled zeros.
"""
df[column] = df[column].astype(str).str.zfill(num_digits)
return df
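
A minimal usage sketch for fill_left_zeros, with a made-up column name and values:

import pandas as pd

df = pd.DataFrame({"cnpj_basico": [123, 45678]})   # hypothetical input
df = fill_left_zeros(df, "cnpj_basico", 8)
print(df["cnpj_basico"].tolist())                  # ['00000123', '00045678']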


# ! Download assíncrono e em chunks do zip
def chunk_range(content_length: int, chunk_size: int) -> list[tuple[int, int]]:
"""Split the content length into a list of chunk ranges"""
"""
Splits the content length into a list of chunk ranges for downloading.
Args:
content_length (int): The total content length.
chunk_size (int): Size of each chunk.
Returns:
List[Tuple[int, int]]: List of start and end byte ranges for each chunk.
"""
return [(i, min(i + chunk_size - 1, content_length - 1)) for i in range(0, content_length, chunk_size)]
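
For illustration, what chunk_range returns for a small payload (the sizes here are made up; download defaults to 20 MiB chunks):

print(chunk_range(content_length=50, chunk_size=20))
# [(0, 19), (20, 39), (40, 49)]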

# from https://stackoverflow.com/a/64283770
async def download(url, chunk_size=20 * 1024 * 1024, max_retries=5, max_parallel=5, timeout=5 * 60):
"""
Downloads a file from a URL in chunks asynchronously.
Args:
url (str): URL of the file to download.
chunk_size (int): Size of each chunk in bytes.
max_retries (int): Maximum retry attempts for each chunk.
max_parallel (int): Maximum number of parallel downloads.
timeout (int): Timeout for each request.
Returns:
bytes: The downloaded content in bytes.
"""

try:
request_head = head(url)
log(request_head)
@@ -94,7 +144,7 @@ async def download(url, chunk_size=20 * 1024 * 1024, max_retries=5, max_parallel
assert request_head.headers["accept-ranges"] == "bytes"

content_length = int(request_head.headers["content-length"])
log(f"Downloading {url} with {content_length} bytes / {chunk_size} chunks and {max_parallel} parallel downloads")
log(f"Baixando {url} com {content_length} bytes / {chunk_size} chunks e {max_parallel} downloads paralelos")

semaphore = Semaphore(max_parallel)

@@ -106,11 +156,32 @@ async def download(url, chunk_size=20 * 1024 * 1024, max_retries=5, max_parallel
return b"".join(await gather(*tasks))

except HTTPError as e:
log(f"Initial request failed: {e}")
log(f"Requisição mal sucessedida: {e}")
return b""
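
Since download is a coroutine, a caller outside an event loop would drive it with something like the sketch below (placeholder URL; download_unzip_csv already awaits it directly):

import asyncio

content = asyncio.run(download("https://example.com/arquivo.zip"))  # hypothetical URL
print(len(content))  # total bytes received; 0 if the initial HEAD request fails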


async def download_chunk(client, url, chunk_range, max_retries, timeout, semaphore):
async def download_chunk(
client: AsyncClient,
url: str,
chunk_range: tuple[int, int],
max_retries: int,
timeout: int,
semaphore: Semaphore
) -> bytes:
"""
Downloads a chunk of a file asynchronously with retry logic.
Args:
client (AsyncClient): HTTP client for making requests.
url (str): The URL of the file to download.
chunk_range (Tuple[int, int]): The byte range for the chunk to be downloaded.
max_retries (int): The maximum number of retries for downloading a chunk.
timeout (int): The timeout duration for each request.
semaphore (Semaphore): A semaphore to limit the number of parallel downloads.
Returns:
bytes: The downloaded chunk content.
"""
async with semaphore:
for attempt in range(max_retries):
try:
@@ -120,14 +191,21 @@ async def download_chunk(client, url, chunk_range, max_retries, timeout, semapho
return response.content
except HTTPError as e:
delay = 2 ** attempt
log(f"Chunk {chunk_range} failed on attempt {attempt + 1}. Retrying in {delay} seconds...")
log(f"Dowload do chunk {chunk_range} ffalhou na tentativa {attempt + 1}. Tentando novamente em {delay} segundos...")
await asyncio.sleep(delay)

raise HTTPError(f"Chunk {chunk_range} failed after {max_retries} retries")
raise HTTPError(f"Download do chunk {chunk_range} falhou depois de {max_retries} tentativas")
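
For reference, a sketch of the retry schedule implied by delay = 2 ** attempt with the default max_retries=5:

delays = [2 ** attempt for attempt in range(5)]
print(delays)  # [1, 2, 4, 8, 16] seconds of wait after each failed attempt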


# ! Executa o download do zip file
async def download_unzip_csv(url, pasta_destino):
async def download_unzip_csv(url: str, pasta_destino: str) -> None:
"""
Downloads a ZIP file from a URL and extracts its content.
Args:
url (str): The URL of the ZIP file.
pasta_destino (str): The directory to save and extract the ZIP file.
"""
log(f"Baixando o arquivo {url}")
save_path = os.path.join(pasta_destino, f"{os.path.basename(url)}.zip")
content = await download(url)
@@ -147,8 +225,22 @@ async def download_unzip_csv(url, pasta_destino):

# ! Salva os dados CSV Estabelecimentos
def process_csv_estabelecimentos(
input_path: str, output_path: str, data_coleta: str, i: int, chunk_size: int = 1000
):
input_path: str,
output_path: str,
data_coleta: str,
i: int,
chunk_size: int = 100000
) -> None:
"""
Processes and saves CSV data for establishments, organizing data into partitions by state.
Args:
input_path (str): Path to the input data.
output_path (str): Directory to save processed data.
data_coleta (str): Data collection date as string.
i (int): File number or batch index.
chunk_size (int): Number of rows to process per chunk.
"""
ordem = constants_cnpj.COLUNAS_ESTABELECIMENTO_ORDEM.value
colunas = constants_cnpj.COLUNAS_ESTABELECIMENTO.value
save_path = f"{output_path}data={data_coleta}/"
@@ -215,8 +307,22 @@ def process_csv_estabelecimentos(

# ! Salva os dados CSV Empresas
def process_csv_empresas(
input_path: str, output_path: str, data_coleta: str, i: int, chunk_size: int = 1000
):
input_path: str,
output_path: str,
data_coleta: str,
i: int,
chunk_size: int = 100000
) -> None:
"""
Processes and saves CSV data for companies.
Args:
input_path (str): Path to the input data.
output_path (str): Directory to save processed data.
data_coleta (str): Data collection date as string.
i (int): File number or batch index.
chunk_size (int): Number of rows to process per chunk.
"""
colunas = constants_cnpj.COLUNAS_EMPRESAS.value
save_path = f"{output_path}data={data_coleta}/empresas_{i}.csv"
for nome_arquivo in os.listdir(input_path):
@@ -254,7 +360,20 @@ def process_csv_empresas(
# ! Salva os dados CSV Socios
def process_csv_socios(
input_path: str, output_path: str, data_coleta: str, i: int, chunk_size: int = 1000
):
) -> None:
"""
Processes and saves CSV data for socios (partners).
Args:
input_path (str): Path to the input data.
output_path (str): Directory to save processed data.
data_coleta (str): Data collection date as string.
i (int): File number or batch index.
chunk_size (int): Number of rows to process per chunk.
Returns:
None
"""
colunas = constants_cnpj.COLUNAS_SOCIOS.value
save_path = f"{output_path}data={data_coleta}/socios_{i}.csv"
for nome_arquivo in os.listdir(input_path):
@@ -296,12 +415,21 @@ def process_csv_socios(

# ! Salva os dados CSV Simples
def process_csv_simples(
input_path: str,
output_path: str,
data_coleta: str,
sufixo: str,
chunk_size: int = 1000,
):
input_path: str, output_path: str, data_coleta: str, sufixo: str, chunk_size: int = 1000
) -> None:
"""
Processes and saves CSV data for simples.
Args:
input_path (str): Path to the input data.
output_path (str): Directory to save processed data.
data_coleta (str): Data collection date as string.
sufixo (str): Suffix used to construct the output filename.
chunk_size (int): Number of rows to process per chunk.
Returns:
None
"""
colunas = constants_cnpj.COLUNAS_SIMPLES.value
save_path = f"{output_path}{sufixo}.csv"
for nome_arquivo in os.listdir(input_path):
@@ -340,3 +468,5 @@ def process_csv_simples(

log(f"Arquivo {sufixo} salvo")
os.remove(caminho_arquivo_csv)

