
Commit

Merge branch 'main' into staging/fix-ms-sinan-dengue
mergify[bot] authored Dec 5, 2024
2 parents d390991 + 61c0812 commit fbdb013
Showing 2 changed files with 91 additions and 30 deletions.
5 changes: 4 additions & 1 deletion pipelines/datasets/br_me_cnpj/tasks.py
@@ -43,7 +43,10 @@ def get_data_source_max_date() -> tuple[datetime,datetime]:
folder_date, today_date = data_url(url=url, headers=headers)
return folder_date, today_date

@task
@task(
max_retries=3,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def main(tabelas:[str], folder_date:datetime, today_date:datetime)-> str:
"""
Performs the download, processing, and organization of CNPJ data.
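
For context, a minimal sketch of how the retry settings added to the task decorator behave, assuming Prefect 1.x semantics (which the max_retries/retry_delay keywords suggest); the 30-second delay is a placeholder for the repo's constants.TASK_RETRY_DELAY value:

from datetime import timedelta
from prefect import task

@task(max_retries=3, retry_delay=timedelta(seconds=30))  # placeholder delay value
def flaky_step() -> str:
    # If this raises, Prefect re-runs it up to 3 more times, waiting ~30 s between attempts.
    return "ok"
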
116 changes: 87 additions & 29 deletions pipelines/datasets/br_me_cnpj/utils.py
@@ -4,7 +4,7 @@
"""
import os
import zipfile
from asyncio import Semaphore, gather
from asyncio import Semaphore, gather, sleep
from datetime import datetime

from httpx import AsyncClient, HTTPError, head
@@ -109,92 +109,149 @@ def fill_left_zeros(df: datetime, column, num_digits:int)-> pd.DataFrame:
# ! Download assincrono e em chunck`s do zip
def chunk_range(content_length: int, chunk_size: int) -> list[tuple[int, int]]:
"""
Splits the content length into a list of chunk ranges for downloading.
Splits the content length into a list of chunk ranges for downloading. It calculates
each chunk range value in bytes.

Args:
content_length (int): The total content length.
chunk_size (int): Size of each chunk.

Returns:
List[Tuple[int, int]]: List of start and end byte ranges for each chunk.
List[Tuple[int, int]]: List of start and end byte ranges for each chunk, used to build the Range header in the download_chunk function
"""
return [(i, min(i + chunk_size - 1, content_length - 1)) for i in range(0, content_length, chunk_size)]
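
To make the byte ranges concrete, here is a small, self-contained sketch of what the helper above produces; the 100-byte length and 30-byte chunk size are made-up illustration values:

def chunk_range(content_length: int, chunk_size: int) -> list[tuple[int, int]]:
    # Same comprehension as above: inclusive (start, end) byte offsets per chunk.
    return [(i, min(i + chunk_size - 1, content_length - 1)) for i in range(0, content_length, chunk_size)]

print(chunk_range(100, 30))
# -> [(0, 29), (30, 59), (60, 89), (90, 99)]  (the last range is clipped to the content length)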

# from https://stackoverflow.com/a/64283770
async def download(url, chunk_size=20 * 1024 * 1024, max_retries=5, max_parallel=5, timeout=5 * 60):
async def download(url, chunk_size=15 * 1024 * 1024, max_retries=5, max_parallel=15, timeout=5 * 60):
"""
Downloads a file from a URL in chunks asynchronously.
Downloads a file from a URL asynchronously, splitting it into chunks for parallel downloading.

Args:
url (str): URL of the file to download.
chunk_size (int): Size of each chunk in bytes.
max_retries (int): Maximum retry attempts for each chunk.
max_parallel (int): Maximum number of parallel downloads.
timeout (int): Timeout for each request.
url (str): The URL of the file to download.
chunk_size (int): The size of each chunk in bytes (default: 15 MB).
max_retries (int): Maximum number of retries allowed for each chunk (default: 5).
max_parallel (int): Maximum number of parallel downloads (default: 15).
timeout (int): Timeout for each HTTP request, in seconds (default: 5 minutes).

Returns:
bytes: The downloaded content in bytes.
"""
bytes: The content of the fully downloaded file.

Raises:
HTTPError: If the server responds with an error or the download fails.
"""
try:
request_head = head(url)
log(request_head)

assert request_head.status_code == 200
assert request_head.headers["accept-ranges"] == "bytes"

content_length = int(request_head.headers["content-length"])
log(f"Baixando {url} com {content_length} bytes / {chunk_size} chunks e {max_parallel} downloads paralelos")
chunk_ranges = chunk_range(content_length, chunk_size)
total_chunks = len(chunk_ranges)

log(
f"Baixando {url} com {content_length} bytes ({content_length/1e6:.2f} MB). "
f"Cada chunk terá tamanho de {chunk_size} bytes ({chunk_size/1e6:.2f} MB). "
f"Serão feitos {max_parallel} downloads paralelos por vez, com um total de {total_chunks} chunks."
)

semaphore = Semaphore(max_parallel)
progress = {"completed": 0}
last_logged_progress = {"percentage": 0} # Inicializa o progresso acumulado

async with AsyncClient() as client:
tasks = [
download_chunk(client, url, (start, end), max_retries, timeout, semaphore)
for start, end in chunk_range(content_length, chunk_size)
download_chunk(
client, url, chunk, max_retries, timeout, semaphore, progress, total_chunks, last_logged_progress
)
for chunk in chunk_ranges
]
return b"".join(await gather(*tasks))

except HTTPError as e:
log(f"Requisição mal sucessedida: {e}")
log(f"Requisição mal sucedida: {e}")
return b""
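
For reference, a minimal usage sketch of the coroutine above; the URL is a placeholder, not an address used by the pipeline:

from asyncio import run

# Hypothetical call site: fetch one archive into memory and write it to disk.
content = run(download("https://example.com/arquivo.zip"))
if content:  # download() returns b"" when the initial request fails
    with open("arquivo.zip", "wb") as f:
        f.write(content)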


def print_progress(completed: int, total: int, last_logged_progress: dict) -> None:
"""
Logs the download progress only when a significant change occurs.

Args:
completed (int): Number of chunks completed.
total (int): Total number of chunks.
last_logged_progress (dict): A dictionary to store the last logged progress.
"""
progress_percentage = (completed / total) * 100
if progress_percentage - last_logged_progress.get("percentage", 0) >= 10: # Log a progress update every 10%
last_logged_progress["percentage"] = progress_percentage
log(f"Progresso no download: {completed}/{total} chunks baixados ({progress_percentage:.1f}%)")

async def download_chunk(
client: AsyncClient,
url: str,
chunk_range: tuple[int, int],
max_retries: int,
timeout: int,
semaphore: Semaphore
semaphore: Semaphore,
progress: dict,
total_chunks: int,
last_logged_progress: dict,
) -> bytes:
"""
Downloads a chunk of a file asynchronously with retry logic.
Downloads a specific chunk of a file asynchronously, with retry logic and progress tracking.

Args:
client (AsyncClient): HTTP client for making requests.
client (AsyncClient): HTTP client instance for making requests.
url (str): The URL of the file to download.
chunk_range (Tuple[int, int]): The byte range for the chunk to be downloaded.
max_retries (int): The maximum number of retries for downloading a chunk.
timeout (int): The timeout duration for each request.
semaphore (Semaphore): A semaphore to limit the number of parallel downloads.
chunk_range (tuple[int, int]): The byte range (start, end) for the chunk being downloaded.
max_retries (int): Maximum number of retries allowed for downloading this chunk.
timeout (int): Timeout for each HTTP request, in seconds.
semaphore (Semaphore): Semaphore to limit the number of parallel downloads.
progress (dict): Dictionary to track the number of completed chunks.
total_chunks (int): Total number of chunks to be downloaded.
last_logged_progress (dict): Dictionary to track the last logged progress percentage.

Returns:
bytes: The downloaded chunk content.
bytes: The content of the downloaded chunk.

Raises:
HTTPError: If the download fails after all retry attempts.
"""
async with semaphore:
for attempt in range(max_retries):
try:
headers = {"Range": f"bytes={chunk_range[0]}-{chunk_range[1]}"}
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "pt-BR,pt;q=0.8,en-US;q=0.5,en;q=0.3",
"Sec-GPC": "1",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "same-origin",
"Sec-Fetch-User": "?1",
"Priority": "u=0, i",
"Range": f"bytes={chunk_range[0]}-{chunk_range[1]}"
}

response = await client.get(url, headers=headers, timeout=timeout)
response.raise_for_status()

progress["completed"] += 1
print_progress(progress["completed"], total_chunks, last_logged_progress)

return response.content
except HTTPError as e:
delay = 2 ** attempt
log(f"Dowload do chunk {chunk_range} ffalhou na tentativa {attempt + 1}. Tentando novamente em {delay} segundos...")
await asyncio.sleep(delay)
log(
f"Falha no download do chunk {chunk_range[0]}-{chunk_range[1]} "
f"na tentativa {attempt + 1}. Retentando em {delay} segundos..."
)
await sleep(delay)

raise HTTPError(f"Download do chunk {chunk_range[0]}-{chunk_range[1]} falhou após {max_retries} tentativas")

raise HTTPError(f"Download do chunk {chunk_range} falhou depois de {max_retries} tentativas")


# ! Executa o download do zip file
@@ -218,6 +275,7 @@ async def download_unzip_csv(url: str, pasta_destino: str) -> None:
log("Dados extraídos com sucesso!")
except zipfile.BadZipFile:
log(f"O arquivo {os.path.basename(url)} não é um arquivo ZIP válido.")
raise

os.remove(save_path)

