diff --git a/pipelines/datasets/br_me_cnpj/tasks.py b/pipelines/datasets/br_me_cnpj/tasks.py index 9702fa6a9..4b016cd02 100644 --- a/pipelines/datasets/br_me_cnpj/tasks.py +++ b/pipelines/datasets/br_me_cnpj/tasks.py @@ -43,7 +43,10 @@ def get_data_source_max_date() -> tuple[datetime,datetime]: folder_date, today_date = data_url(url=url, headers=headers) return folder_date, today_date -@task +@task( + max_retries=3, + retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), +) def main(tabelas:[str], folder_date:datetime, today_date:datetime)-> str: """ Performs the download, processing, and organization of CNPJ data. diff --git a/pipelines/datasets/br_me_cnpj/utils.py b/pipelines/datasets/br_me_cnpj/utils.py index 28734a0a1..d892e924c 100644 --- a/pipelines/datasets/br_me_cnpj/utils.py +++ b/pipelines/datasets/br_me_cnpj/utils.py @@ -4,7 +4,7 @@ """ import os import zipfile -from asyncio import Semaphore, gather +from asyncio import Semaphore, gather, sleep from datetime import datetime from httpx import AsyncClient, HTTPError, head @@ -109,92 +109,149 @@ def fill_left_zeros(df: datetime, column, num_digits:int)-> pd.DataFrame: # ! Download assincrono e em chunck`s do zip def chunk_range(content_length: int, chunk_size: int) -> list[tuple[int, int]]: """ - Splits the content length into a list of chunk ranges for downloading. + Splits the content length into a list of chunk ranges for downloading. It Calculates + each chunk range value in bytes. Args: content_length (int): The total content length. chunk_size (int): Size of each chunk. Returns: - List[Tuple[int, int]]: List of start and end byte ranges for each chunk. + List[Tuple[int, int]]: List of start and end byte ranges for each chunk to be used as a header within download_chunk function """ return [(i, min(i + chunk_size - 1, content_length - 1)) for i in range(0, content_length, chunk_size)] # from https://stackoverflow.com/a/64283770 -async def download(url, chunk_size=20 * 1024 * 1024, max_retries=5, max_parallel=5, timeout=5 * 60): +async def download(url, chunk_size=15 * 1024 * 1024, max_retries=5, max_parallel=15, timeout=5 * 60): """ - Downloads a file from a URL in chunks asynchronously. + Downloads a file from a URL asynchronously, splitting it into chunks for parallel downloading. Args: - url (str): URL of the file to download. - chunk_size (int): Size of each chunk in bytes. - max_retries (int): Maximum retry attempts for each chunk. - max_parallel (int): Maximum number of parallel downloads. - timeout (int): Timeout for each request. + url (str): The URL of the file to download. + chunk_size (int): The size of each chunk in bytes (default: 15 MB). + max_retries (int): Maximum number of retries allowed for each chunk (default: 5). + max_parallel (int): Maximum number of parallel downloads (default: 15). + timeout (int): Timeout for each HTTP request, in seconds (default: 5 minutes). Returns: - bytes: The downloaded content in bytes. - """ + bytes: The content of the fully downloaded file. + Raises: + HTTPError: If the server responds with an error or the download fails. + """ try: request_head = head(url) - log(request_head) assert request_head.status_code == 200 assert request_head.headers["accept-ranges"] == "bytes" content_length = int(request_head.headers["content-length"]) - log(f"Baixando {url} com {content_length} bytes / {chunk_size} chunks e {max_parallel} downloads paralelos") + chunk_ranges = chunk_range(content_length, chunk_size) + total_chunks = len(chunk_ranges) + + log( + f"Baixando {url} com {content_length} bytes ({content_length/1e6:.2f} MB). " + f"Cada chunk terá tamanho de {chunk_size} bytes ({chunk_size/1e6:.2f} MB). " + f"Serão feitos {max_parallel} downloads paralelos por vez, com um total de {total_chunks} chunks." + ) semaphore = Semaphore(max_parallel) + progress = {"completed": 0} + last_logged_progress = {"percentage": 0} # Inicializa o progresso acumulado async with AsyncClient() as client: tasks = [ - download_chunk(client, url, (start, end), max_retries, timeout, semaphore) - for start, end in chunk_range(content_length, chunk_size) + download_chunk( + client, url, chunk, max_retries, timeout, semaphore, progress, total_chunks, last_logged_progress + ) + for chunk in chunk_ranges ] return b"".join(await gather(*tasks)) except HTTPError as e: - log(f"Requisição mal sucessedida: {e}") + log(f"Requisição mal sucedida: {e}") return b"" +def print_progress(completed: int, total: int, last_logged_progress: dict) -> None: + """ + Logs the download progress only when a significant change occurs. + + Args: + completed (int): Number of chunks completed. + total (int): Total number of chunks. + last_logged_progress (dict): A dictionary to store the last logged progress. + """ + progress_percentage = (completed / total) * 100 + if progress_percentage - last_logged_progress.get("percentage", 0) >= 10: # Log a progress update every 10% + last_logged_progress["percentage"] = progress_percentage + log(f"Progresso no download: {completed}/{total} chunks baixados ({progress_percentage:.1f}%)") + async def download_chunk( client: AsyncClient, url: str, chunk_range: tuple[int, int], max_retries: int, timeout: int, - semaphore: Semaphore + semaphore: Semaphore, + progress: dict, + total_chunks: int, + last_logged_progress: dict, ) -> bytes: """ - Downloads a chunk of a file asynchronously with retry logic. + Downloads a specific chunk of a file asynchronously, with retry logic and progress tracking. Args: - client (AsyncClient): HTTP client for making requests. + client (AsyncClient): HTTP client instance for making requests. url (str): The URL of the file to download. - chunk_range (Tuple[int, int]): The byte range for the chunk to be downloaded. - max_retries (int): The maximum number of retries for downloading a chunk. - timeout (int): The timeout duration for each request. - semaphore (Semaphore): A semaphore to limit the number of parallel downloads. + chunk_range (tuple[int, int]): The byte range (start, end) for the chunk being downloaded. + max_retries (int): Maximum number of retries allowed for downloading this chunk. + timeout (int): Timeout for each HTTP request, in seconds. + semaphore (Semaphore): Semaphore to limit the number of parallel downloads. + progress (dict): Dictionary to track the number of completed chunks. + total_chunks (int): Total number of chunks to be downloaded. + last_logged_progress (dict): Dictionary to track the last logged progress percentage. Returns: - bytes: The downloaded chunk content. + bytes: The content of the downloaded chunk. + + Raises: + HTTPError: If the download fails after all retry attempts. """ async with semaphore: for attempt in range(max_retries): try: - headers = {"Range": f"bytes={chunk_range[0]}-{chunk_range[1]}"} + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36", + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Accept-Language": "pt-BR,pt;q=0.8,en-US;q=0.5,en;q=0.3", + "Sec-GPC": "1", + "Upgrade-Insecure-Requests": "1", + "Sec-Fetch-Dest": "document", + "Sec-Fetch-Mode": "navigate", + "Sec-Fetch-Site": "same-origin", + "Sec-Fetch-User": "?1", + "Priority": "u=0, i", + "Range": f"bytes={chunk_range[0]}-{chunk_range[1]}" + } + response = await client.get(url, headers=headers, timeout=timeout) response.raise_for_status() + + progress["completed"] += 1 + print_progress(progress["completed"], total_chunks, last_logged_progress) + return response.content except HTTPError as e: delay = 2 ** attempt - log(f"Dowload do chunk {chunk_range} ffalhou na tentativa {attempt + 1}. Tentando novamente em {delay} segundos...") - await asyncio.sleep(delay) + log( + f"Falha no download do chunk {chunk_range[0]}-{chunk_range[1]} " + f"na tentativa {attempt + 1}. Retentando em {delay} segundos..." + ) + await sleep(delay) + + raise HTTPError(f"Download do chunk {chunk_range[0]}-{chunk_range[1]} falhou após {max_retries} tentativas") - raise HTTPError(f"Download do chunk {chunk_range} falhou depois de {max_retries} tentativas") # ! Executa o download do zip file @@ -218,6 +275,7 @@ async def download_unzip_csv(url: str, pasta_destino: str) -> None: log("Dados extraídos com sucesso!") except zipfile.BadZipFile: log(f"O arquivo {os.path.basename(url)} não é um arquivo ZIP válido.") + raise os.remove(save_path)