From 250e7c8c3c11a1795eba8980e490a2c85ab0573b Mon Sep 17 00:00:00 2001 From: tricktx Date: Mon, 2 Dec 2024 14:52:17 -0300 Subject: [PATCH 1/2] testing sinan decreasing chunk value --- pipelines/utils/crawler_datasus/flows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/utils/crawler_datasus/flows.py b/pipelines/utils/crawler_datasus/flows.py index fb162c92c..c018a0f52 100644 --- a/pipelines/utils/crawler_datasus/flows.py +++ b/pipelines/utils/crawler_datasus/flows.py @@ -423,7 +423,7 @@ file_list=dbc_files, dataset_id=dataset_id, table_id=table_id, - chunk_size = 150000, + chunk_size = 100000, upstream_tasks=[dbf_files, dbc_files], ) From 14e1a67d04ce1d3960642f3bfe80b211f733b734 Mon Sep 17 00:00:00 2001 From: tricktx Date: Mon, 2 Dec 2024 16:27:10 -0300 Subject: [PATCH 2/2] delete used chunk and collect trash --- pipelines/datasets/br_ms_sinan/flows.py | 2 -- pipelines/utils/crawler_datasus/flows.py | 1 - pipelines/utils/crawler_datasus/utils.py | 8 +++++++- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pipelines/datasets/br_ms_sinan/flows.py b/pipelines/datasets/br_ms_sinan/flows.py index d8f84392c..f9e41aa86 100644 --- a/pipelines/datasets/br_ms_sinan/flows.py +++ b/pipelines/datasets/br_ms_sinan/flows.py @@ -1,9 +1,7 @@ # -*- coding: utf-8 -*- from copy import deepcopy - from prefect.run_configs import KubernetesRun from prefect.storage import GCS - from pipelines.utils.crawler_datasus.flows import flow_sinan from pipelines.constants import constants from pipelines.datasets.br_ms_sinan.schedules import ( diff --git a/pipelines/utils/crawler_datasus/flows.py b/pipelines/utils/crawler_datasus/flows.py index c018a0f52..9c6515302 100644 --- a/pipelines/utils/crawler_datasus/flows.py +++ b/pipelines/utils/crawler_datasus/flows.py @@ -423,7 +423,6 @@ file_list=dbc_files, dataset_id=dataset_id, table_id=table_id, - chunk_size = 100000, upstream_tasks=[dbf_files, dbc_files], ) diff --git a/pipelines/utils/crawler_datasus/utils.py b/pipelines/utils/crawler_datasus/utils.py index 6e97e6dc4..22738d69a 100644 --- a/pipelines/utils/crawler_datasus/utils.py +++ b/pipelines/utils/crawler_datasus/utils.py @@ -2,7 +2,7 @@ """ General purpose functions for the br_ms_cnes project """ - +import gc import asyncio from datetime import datetime from ftplib import FTP @@ -88,6 +88,12 @@ def dbf_to_parquet(dbf: str, table_id: str, counter: int, chunk_size:int) -> st df.to_parquet(parquet_filepath, index=None, compression='gzip') + del df + + gc.collect() + + + except struct.error as err: #unlink .partquer extension and remove dbf file