From 39635177baacecc7b99ee2e67adda9e32bc41f4f Mon Sep 17 00:00:00 2001
From: uiro-bi
Date: Mon, 2 Sep 2024 07:50:41 -0300
Subject: [PATCH 1/3] feat: reactivate br_ms_sia schedules

---
 pipelines/datasets/br_ms_sia/flows.py    | 4 ++--
 pipelines/utils/crawler_datasus/tasks.py | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/pipelines/datasets/br_ms_sia/flows.py b/pipelines/datasets/br_ms_sia/flows.py
index fce82dd91..662064a88 100644
--- a/pipelines/datasets/br_ms_sia/flows.py
+++ b/pipelines/datasets/br_ms_sia/flows.py
@@ -21,7 +21,7 @@
 br_ms_sia_producao_ambulatorial.code_owners = ["Gabriel Pisa"]
 br_ms_sia_producao_ambulatorial.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 br_ms_sia_producao_ambulatorial.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
-#br_ms_sia_producao_ambulatorial.schedule = schedule_br_ms_sia_producao_ambulatorial
+br_ms_sia_producao_ambulatorial.schedule = schedule_br_ms_sia_producao_ambulatorial


 br_ms_sia_psicossocial = deepcopy(flow_siasus)
@@ -29,4 +29,4 @@
 br_ms_sia_psicossocial.code_owners = ["Gabriel Pisa"]
 br_ms_sia_psicossocial.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 br_ms_sia_psicossocial.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
-#br_ms_sia_psicossocial.schedule = schedule_br_ms_sia_psicossocial
+br_ms_sia_psicossocial.schedule = schedule_br_ms_sia_psicossocial
diff --git a/pipelines/utils/crawler_datasus/tasks.py b/pipelines/utils/crawler_datasus/tasks.py
index 25fb931d7..27d451922 100644
--- a/pipelines/utils/crawler_datasus/tasks.py
+++ b/pipelines/utils/crawler_datasus/tasks.py
@@ -353,7 +353,7 @@ def is_empty(lista):


 @task
-def read_dbf_save_parquet_chunks(file_list: list, table_id: str, dataset_id:str= "br_ms_sia", chunk_size : int = 400000) -> str:
+def read_dbf_save_parquet_chunks(file_list: list, table_id: str, dataset_id:str= "br_ms_sia", chunk_size : int = 200000) -> str:
     """
     Convert dbc to parquet
     """

From cc3ecde82f1683b84ada76eb71abcaf531495977 Mon Sep 17 00:00:00 2001
From: uiro-bi
Date: Tue, 10 Sep 2024 11:59:04 -0300
Subject: [PATCH 2/3] feat: set KubernetesRun memory_limit to 12Gi

---
 pipelines/utils/crawler_datasus/flows.py | 7 ++++++-
 pipelines/utils/crawler_datasus/tasks.py | 4 ++--
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/pipelines/utils/crawler_datasus/flows.py b/pipelines/utils/crawler_datasus/flows.py
index b9c081638..fb162c92c 100644
--- a/pipelines/utils/crawler_datasus/flows.py
+++ b/pipelines/utils/crawler_datasus/flows.py
@@ -253,7 +253,12 @@
     upstream_tasks=[wait_for_materialization],
 )
 flow_siasus.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
-flow_siasus.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
+flow_siasus.run_config = KubernetesRun(
+    image=constants.DOCKER_IMAGE.value,
+    memory_limit="12Gi",
+    memory_request="4Gi",
+    cpu_limit=1,
+)

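A note on the run_config change above: in Prefect 1.x, KubernetesRun's memory_request is what the Kubernetes scheduler reserves for the flow's pod, while memory_limit is the hard ceiling at which the container is OOM-killed. A minimal sketch of the same configuration, with a placeholder image name standing in for constants.DOCKER_IMAGE.value:

    # Sketch of a Prefect 1.x KubernetesRun with explicit resource bounds.
    # The image name is a placeholder; the real flow passes
    # constants.DOCKER_IMAGE.value from the repo's constants module.
    from prefect.run_configs import KubernetesRun

    run_config = KubernetesRun(
        image="registry.example.com/pipelines:latest",  # placeholder image
        memory_request="4Gi",  # reserved for the pod at scheduling time
        memory_limit="12Gi",   # hard cap; exceeding it OOM-kills the container
        cpu_limit=1,           # at most one CPU core
    )

Keeping the request well below the limit lets the pod schedule on smaller nodes while still allowing the memory-hungry DBF-decoding step to burst up to 12Gi.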
[file.replace(".dbc", ".dbf") for file in file_list] _counter = 0 - log(f'----coutner {_counter}') + log(f'----counter {_counter}') for file in tqdm(dbf_file_list): From cbc58d657adb814c9cbe7dd5d2d500de8bb73050 Mon Sep 17 00:00:00 2001 From: uiro-bi Date: Wed, 11 Sep 2024 09:34:51 -0300 Subject: [PATCH 3/3] feat: reduce chunksize to decode --- pipelines/utils/crawler_datasus/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/utils/crawler_datasus/tasks.py b/pipelines/utils/crawler_datasus/tasks.py index d770f8fd2..bd4b8a1e1 100644 --- a/pipelines/utils/crawler_datasus/tasks.py +++ b/pipelines/utils/crawler_datasus/tasks.py @@ -353,7 +353,7 @@ def is_empty(lista): @task -def read_dbf_save_parquet_chunks(file_list: list, table_id: str, dataset_id:str= "br_ms_sia", chunk_size : int = 300000) -> str: +def read_dbf_save_parquet_chunks(file_list: list, table_id: str, dataset_id:str= "br_ms_sia", chunk_size : int = 100000) -> str: """ Convert dbc to parquet """