From 265c43de4a293c505681aa29aefed46c7e3ca995 Mon Sep 17 00:00:00 2001 From: tricktx Date: Wed, 13 Nov 2024 11:37:29 -0300 Subject: [PATCH] fix function task --- .../crawler_anatel/telefonia_movel/flows.py | 12 ++++----- .../crawler_anatel/telefonia_movel/tasks.py | 27 ++++++++++--------- .../crawler_anatel/telefonia_movel/utils.py | 1 - 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/pipelines/utils/crawler_anatel/telefonia_movel/flows.py b/pipelines/utils/crawler_anatel/telefonia_movel/flows.py index b6c8b7453..c433c43f0 100644 --- a/pipelines/utils/crawler_anatel/telefonia_movel/flows.py +++ b/pipelines/utils/crawler_anatel/telefonia_movel/flows.py @@ -11,7 +11,7 @@ get_max_date_in_table_microdados, get_year_full, get_semester, - unzip, + #unzip, ) from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow @@ -26,9 +26,7 @@ rename_current_flow_run_dataset_table, ) -with Flow( - name="BD template - Anatel Telefonia Móvel", code_owners=["trick"] -) as flow_anatel_telefonia_movel: +with Flow(name="BD template - Anatel Telefonia Móvel", code_owners=["trick"]) as flow_anatel_telefonia_movel: # Parameters dataset_id = Parameter( "dataset_id", default="br_anatel_telefonia_movel", required=True @@ -61,9 +59,9 @@ # Function dynamic parameters # https://discourse.prefect.io/t/my-parameter-value-shows-the-same-date-every-day-how-can-i-set-parameter-value-dynamically/99 ##### - unzip = unzip() - new_year = get_year_full(year=ano, upstream_tasks=[unzip]) - new_semester = get_semester(semester=semestre, year=ano, upstream_tasks=[unzip]) + # unzip_task = unzip() + new_year = get_year_full(ano) + new_semester = get_semester(semestre, upstream_tasks=[new_year]) update_tables = get_max_date_in_table_microdados( ano=new_year, semestre=new_semester, upstream_tasks=[new_year, new_semester] diff --git a/pipelines/utils/crawler_anatel/telefonia_movel/tasks.py b/pipelines/utils/crawler_anatel/telefonia_movel/tasks.py index 93f70c83d..43193a5fe 100644 --- a/pipelines/utils/crawler_anatel/telefonia_movel/tasks.py +++ b/pipelines/utils/crawler_anatel/telefonia_movel/tasks.py @@ -48,8 +48,11 @@ def join_tables_in_function(table_id, semestre, ano): max_retries=constants.TASK_MAX_RETRIES.value, retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), ) -def get_max_date_in_table_microdados(ano: int, semestre: int): +def get_max_date_in_table_microdados(ano, semestre): log("Obtendo a data máxima da tabela microdados...") + log( + f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{ano}_{semestre}S.csv" + ) df = pd.read_csv( f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{ano}_{semestre}S.csv", sep=";", @@ -73,24 +76,24 @@ def unzip(): return unzip_file() -@task( - max_retries=constants.TASK_MAX_RETRIES.value, - retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), -) +@task def get_year_full(year): + log("Obtendo o ano...") if year is None: - return get_year + return get_year() -@task( - max_retries=constants.TASK_MAX_RETRIES.value, - retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), -) -def get_semester(semester, year): + +@task +def get_semester(semester): + log("Obtendo o semestre...") + ano = get_year() if semester is None: if os.path.exists( - f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{get_year_full(year=year)}_2S.csv" + f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{ano}_2S.csv" ): + log("Segundo semestre") return 2 else: + log("Primeiro semestre") return 1 diff --git a/pipelines/utils/crawler_anatel/telefonia_movel/utils.py b/pipelines/utils/crawler_anatel/telefonia_movel/utils.py index 8d2259a0b..1d44609fc 100644 --- a/pipelines/utils/crawler_anatel/telefonia_movel/utils.py +++ b/pipelines/utils/crawler_anatel/telefonia_movel/utils.py @@ -212,7 +212,6 @@ def clean_csv_municipio(table_id): def get_year(): lista = [] for x in os.listdir(anatel_constants.INPUT_PATH.value): - print(x) parts = x.split("_") if len(parts) > 3: x = parts[3]