diff --git a/pipelines/datasets/br_stf_corte_aberta/flows.py b/pipelines/datasets/br_stf_corte_aberta/flows.py index b29be6bd2..386a6358e 100644 --- a/pipelines/datasets/br_stf_corte_aberta/flows.py +++ b/pipelines/datasets/br_stf_corte_aberta/flows.py @@ -18,9 +18,8 @@ download_and_transform, make_partitions, check_for_updates, - get_for_date_max, ) - +from pipelines.datasets.br_stf_corte_aberta.utils import get_for_date_max from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, rename_current_flow_run_dataset_table, diff --git a/pipelines/datasets/br_stf_corte_aberta/tasks.py b/pipelines/datasets/br_stf_corte_aberta/tasks.py index a59a93517..823d3f51a 100644 --- a/pipelines/datasets/br_stf_corte_aberta/tasks.py +++ b/pipelines/datasets/br_stf_corte_aberta/tasks.py @@ -108,29 +108,3 @@ def download_and_transform(): df = replace_columns(df) return df - - -@task( - max_retries=constants.TASK_MAX_RETRIES.value, - retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), -) -def get_for_date_max(): - if not os.path.exists(stf_constants.STF_INPUT.value): - os.mkdir(stf_constants.STF_INPUT.value) - arquivos = os.listdir(stf_constants.STF_INPUT.value) - for arquivo in arquivos: - if arquivo.endswith(".csv"): - df = pd.read_csv(stf_constants.STF_INPUT.value + arquivo, dtype=str) - - df["Data da decisão"] = df["Data da decisão"].astype(str).str[0:10] - data_obj = df["Data da decisão"] = ( - df["Data da decisão"].astype(str).str[6:10] - + "-" - + df["Data da decisão"].astype(str).str[3:5] - + "-" - + df["Data da decisão"].astype(str).str[0:2] - ) - data_obj = data_obj.max() - data_obj = datetime.strptime(data_obj, "%Y-%m-%d").date() - - return str(data_obj) diff --git a/pipelines/datasets/br_stf_corte_aberta/utils.py b/pipelines/datasets/br_stf_corte_aberta/utils.py index d26930586..8e94d671a 100644 --- a/pipelines/datasets/br_stf_corte_aberta/utils.py +++ b/pipelines/datasets/br_stf_corte_aberta/utils.py @@ -197,3 +197,25 @@ def extract_last_date( except Exception as e: log(f"An error occurred while extracting the last update date: {str(e)}") raise + + +def get_for_date_max(): + if not os.path.exists(stf_constants.STF_INPUT.value): + os.mkdir(stf_constants.STF_INPUT.value) + arquivos = os.listdir(stf_constants.STF_INPUT.value) + for arquivo in arquivos: + if arquivo.endswith(".csv"): + df = pd.read_csv(stf_constants.STF_INPUT.value + arquivo, dtype=str) + + df["Data da decisão"] = df["Data da decisão"].astype(str).str[0:10] + data_obj = df["Data da decisão"] = ( + df["Data da decisão"].astype(str).str[6:10] + + "-" + + df["Data da decisão"].astype(str).str[3:5] + + "-" + + df["Data da decisão"].astype(str).str[0:2] + ) + data_obj = data_obj.max() + data_obj = datetime.strptime(data_obj, "%Y-%m-%d").date() + + return str(data_obj)