[Refactor] br_me_cnpj #905

Merged · 5 commits · Dec 5, 2024
5 changes: 4 additions & 1 deletion pipelines/datasets/br_me_cnpj/tasks.py
@@ -43,7 +43,10 @@ def get_data_source_max_date() -> tuple[datetime,datetime]:
folder_date, today_date = data_url(url=url, headers=headers)
return folder_date, today_date

@task
@task(
max_retries=3,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def main(tabelas: list[str], folder_date: datetime, today_date: datetime) -> str:
"""
Performs the download, processing, and organization of CNPJ data.
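Reviewer note: the decorator above uses Prefect 1.x-style task retries. A minimal standalone sketch of the same pattern, assuming Prefect 1.x and substituting a hypothetical 120-second delay for constants.TASK_RETRY_DELAY.value:

    from datetime import timedelta

    from prefect import task

    @task(
        max_retries=3,                       # re-run the task up to 3 times on failure
        retry_delay=timedelta(seconds=120),  # hypothetical stand-in for constants.TASK_RETRY_DELAY.value
    )
    def flaky_task():
        ...  # any work that may fail transiently, e.g. a network download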
116 changes: 87 additions & 29 deletions pipelines/datasets/br_me_cnpj/utils.py
@@ -4,7 +4,7 @@
"""
import os
import zipfile
from asyncio import Semaphore, gather
from asyncio import Semaphore, gather, sleep
from datetime import datetime

from httpx import AsyncClient, HTTPError, head
@@ -109,92 +109,149 @@ def fill_left_zeros(df: datetime, column, num_digits:int)-> pd.DataFrame:
# ! Asynchronous download of the zip in chunks
def chunk_range(content_length: int, chunk_size: int) -> list[tuple[int, int]]:
"""
Splits the content length into a list of chunk ranges for downloading.
Splits the content length into a list of chunk ranges for downloading, calculating
each chunk's start and end offsets in bytes.

Args:
content_length (int): The total content length.
chunk_size (int): Size of each chunk.

Returns:
List[Tuple[int, int]]: List of start and end byte ranges for each chunk.
List[Tuple[int, int]]: Start and end byte ranges for each chunk, used to build the Range header in download_chunk.
"""
return [(i, min(i + chunk_size - 1, content_length - 1)) for i in range(0, content_length, chunk_size)]
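For reference, a quick worked example of the ranges this yields, assuming a 45-byte payload and 20-byte chunks:

    chunk_range(45, 20)
    # -> [(0, 19), (20, 39), (40, 44)]
    # three inclusive byte ranges that together cover bytes 0..44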

# from https://stackoverflow.com/a/64283770
async def download(url, chunk_size=20 * 1024 * 1024, max_retries=5, max_parallel=5, timeout=5 * 60):
async def download(url, chunk_size=15 * 1024 * 1024, max_retries=5, max_parallel=15, timeout=5 * 60):
"""
Downloads a file from a URL in chunks asynchronously.
Downloads a file from a URL asynchronously, splitting it into chunks for parallel downloading.

Args:
url (str): URL of the file to download.
chunk_size (int): Size of each chunk in bytes.
max_retries (int): Maximum retry attempts for each chunk.
max_parallel (int): Maximum number of parallel downloads.
timeout (int): Timeout for each request.
url (str): The URL of the file to download.
chunk_size (int): The size of each chunk in bytes (default: 15 MB).
max_retries (int): Maximum number of retries allowed for each chunk (default: 5).
max_parallel (int): Maximum number of parallel downloads (default: 15).
timeout (int): Timeout for each HTTP request, in seconds (default: 5 minutes).

Returns:
bytes: The downloaded content in bytes.
"""
bytes: The content of the fully downloaded file.

Raises:
HTTPError: If the server responds with an error or the download fails.
"""
try:
request_head = head(url)
log(request_head)

assert request_head.status_code == 200
assert request_head.headers["accept-ranges"] == "bytes"

content_length = int(request_head.headers["content-length"])
log(f"Baixando {url} com {content_length} bytes / {chunk_size} chunks e {max_parallel} downloads paralelos")
chunk_ranges = chunk_range(content_length, chunk_size)
total_chunks = len(chunk_ranges)

log(
f"Baixando {url} com {content_length} bytes ({content_length/1e6:.2f} MB). "
f"Cada chunk terá tamanho de {chunk_size} bytes ({chunk_size/1e6:.2f} MB). "
f"Serão feitos {max_parallel} downloads paralelos por vez, com um total de {total_chunks} chunks."
)

semaphore = Semaphore(max_parallel)
progress = {"completed": 0}
last_logged_progress = {"percentage": 0}  # Initializes the accumulated progress

async with AsyncClient() as client:
tasks = [
download_chunk(client, url, (start, end), max_retries, timeout, semaphore)
for start, end in chunk_range(content_length, chunk_size)
download_chunk(
client, url, chunk, max_retries, timeout, semaphore, progress, total_chunks, last_logged_progress
)
for chunk in chunk_ranges
]
return b"".join(await gather(*tasks))

except HTTPError as e:
log(f"Requisição mal sucessedida: {e}")
log(f"Requisição mal sucedida: {e}")
return b""


def print_progress(completed: int, total: int, last_logged_progress: dict) -> None:
"""
Logs the download progress only when a significant change occurs.

Args:
completed (int): Number of chunks completed.
total (int): Total number of chunks.
last_logged_progress (dict): A dictionary to store the last logged progress.
"""
progress_percentage = (completed / total) * 100
if progress_percentage - last_logged_progress.get("percentage", 0) >= 10: # Log a progress update every 10%
last_logged_progress["percentage"] = progress_percentage
log(f"Progresso no download: {completed}/{total} chunks baixados ({progress_percentage:.1f}%)")

async def download_chunk(
client: AsyncClient,
url: str,
chunk_range: tuple[int, int],
max_retries: int,
timeout: int,
semaphore: Semaphore
semaphore: Semaphore,
progress: dict,
total_chunks: int,
last_logged_progress: dict,
) -> bytes:
"""
Downloads a chunk of a file asynchronously with retry logic.
Downloads a specific chunk of a file asynchronously, with retry logic and progress tracking.

Args:
client (AsyncClient): HTTP client for making requests.
client (AsyncClient): HTTP client instance for making requests.
url (str): The URL of the file to download.
chunk_range (Tuple[int, int]): The byte range for the chunk to be downloaded.
max_retries (int): The maximum number of retries for downloading a chunk.
timeout (int): The timeout duration for each request.
semaphore (Semaphore): A semaphore to limit the number of parallel downloads.
chunk_range (tuple[int, int]): The byte range (start, end) for the chunk being downloaded.
max_retries (int): Maximum number of retries allowed for downloading this chunk.
timeout (int): Timeout for each HTTP request, in seconds.
semaphore (Semaphore): Semaphore to limit the number of parallel downloads.
progress (dict): Dictionary to track the number of completed chunks.
total_chunks (int): Total number of chunks to be downloaded.
last_logged_progress (dict): Dictionary to track the last logged progress percentage.

Returns:
bytes: The downloaded chunk content.
bytes: The content of the downloaded chunk.

Raises:
HTTPError: If the download fails after all retry attempts.
"""
async with semaphore:
for attempt in range(max_retries):
try:
headers = {"Range": f"bytes={chunk_range[0]}-{chunk_range[1]}"}
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "pt-BR,pt;q=0.8,en-US;q=0.5,en;q=0.3",
"Sec-GPC": "1",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "same-origin",
"Sec-Fetch-User": "?1",
"Priority": "u=0, i",
"Range": f"bytes={chunk_range[0]}-{chunk_range[1]}"
}

response = await client.get(url, headers=headers, timeout=timeout)
response.raise_for_status()

progress["completed"] += 1
print_progress(progress["completed"], total_chunks, last_logged_progress)

return response.content
except HTTPError as e:
delay = 2 ** attempt
log(f"Dowload do chunk {chunk_range} ffalhou na tentativa {attempt + 1}. Tentando novamente em {delay} segundos...")
await asyncio.sleep(delay)
log(
f"Falha no download do chunk {chunk_range[0]}-{chunk_range[1]} "
f"na tentativa {attempt + 1}. Retentando em {delay} segundos..."
)
await sleep(delay)

raise HTTPError(f"Download do chunk {chunk_range[0]}-{chunk_range[1]} falhou após {max_retries} tentativas")

raise HTTPError(f"Download do chunk {chunk_range} falhou depois de {max_retries} tentativas")


# ! Runs the download of the zip file
@@ -218,6 +275,7 @@ async def download_unzip_csv(url: str, pasta_destino: str) -> None:
log("Dados extraídos com sucesso!")
except zipfile.BadZipFile:
log(f"O arquivo {os.path.basename(url)} não é um arquivo ZIP válido.")
raise

os.remove(save_path)
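The bare raise added here means a corrupted archive now fails the flow run (where the task-level retries from tasks.py can kick in) instead of being silently skipped. A minimal sketch of the pattern, with hypothetical paths:

    try:
        with zipfile.ZipFile("arquivo.zip") as z:
            z.extractall("pasta_destino/")
    except zipfile.BadZipFile:
        log("O arquivo arquivo.zip não é um arquivo ZIP válido.")
        raise  # propagate so the orchestrator registers the failure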
