diff --git a/pipelines/utils/crawler_anatel/banda_larga_fixa/flows.py b/pipelines/utils/crawler_anatel/banda_larga_fixa/flows.py
index 38b145d76..e5aef446c 100644
--- a/pipelines/utils/crawler_anatel/banda_larga_fixa/flows.py
+++ b/pipelines/utils/crawler_anatel/banda_larga_fixa/flows.py
@@ -7,7 +7,8 @@
 from pipelines.constants import constants
 from pipelines.utils.crawler_anatel.banda_larga_fixa.tasks import (
     join_tables_in_function,
-    get_max_date_in_table_microdados
+    get_max_date_in_table_microdados,
+    get_year_and_unzip,
 )
 from pipelines.utils.constants import constants as utils_constants
 from pipelines.utils.decorators import Flow
@@ -39,7 +40,7 @@
     )
     dbt_alias = Parameter("dbt_alias", default=True, required=False)
 
-    ano = Parameter("ano", default=2024, required=False)
+    ano = Parameter("ano", default=None, required=False)
 
     update_metadata = Parameter("update_metadata", default=True, required=False)
 
@@ -50,7 +51,14 @@
         wait=table_id,
     )
 
-    update_tables = get_max_date_in_table_microdados(ano=ano)
+    #####
+    # Function dynamic parameters
+    # https://discourse.prefect.io/t/my-parameter-value-shows-the-same-date-every-day-how-can-i-set-parameter-value-dynamically/99
+    #####
+
+    new_ano = get_year_and_unzip(day=ano)
+
+    update_tables = get_max_date_in_table_microdados(ano=new_ano)
 
     get_max_date = check_if_data_is_outdated(
         dataset_id = dataset_id,
@@ -59,7 +67,9 @@
         date_format = "%Y-%m")
 
     with case(get_max_date, True):
-        filepath = join_tables_in_function(table_id = table_id, ano=ano, upstream_tasks=[get_max_date])
+        filepath = join_tables_in_function(
+            table_id=table_id, ano=new_ano, upstream_tasks=[get_max_date]
+        )
 
         wait_upload_table = create_table_and_upload_to_gcs(
             data_path=filepath,
@@ -115,4 +125,4 @@
             )
 
 flow_anatel_banda_larga_fixa.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
-flow_anatel_banda_larga_fixa.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
\ No newline at end of file
+flow_anatel_banda_larga_fixa.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
diff --git a/pipelines/utils/crawler_anatel/banda_larga_fixa/tasks.py b/pipelines/utils/crawler_anatel/banda_larga_fixa/tasks.py
index f9a33dec2..6fda76bdc 100644
--- a/pipelines/utils/crawler_anatel/banda_larga_fixa/tasks.py
+++ b/pipelines/utils/crawler_anatel/banda_larga_fixa/tasks.py
@@ -13,7 +13,8 @@
     treatment_br,
     treatment_uf,
     treatment_municipio,
-    unzip_file
+    unzip_file,
+    get_year,
 )
 from pipelines.utils.utils import log, to_partitions
 
@@ -43,7 +44,6 @@ def join_tables_in_function(table_id: str, ano):
     retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
 )
 def get_max_date_in_table_microdados(ano: int):
-    unzip_file()
     log("Obtendo a data máxima do arquivo microdados da Anatel")
     df = pd.read_csv(
         f"{anatel_constants.INPUT_PATH.value}Acessos_Banda_Larga_Fixa_{ano}.csv",
@@ -57,4 +57,25 @@ def get_max_date_in_table_microdados(ano: int):
 
     log(df['data'].max())
 
-    return df['data'].max()
\ No newline at end of file
+    return df['data'].max()
+
+
+@task
+def get_year_and_unzip(day):
+    """Run ``unzip_file`` and resolve the year to process.
+
+    Args:
+        day: Value of the ``ano`` flow parameter; ``None`` requests
+            detection from the input file names via ``get_year``.
+
+    Returns:
+        ``day`` when it was provided, otherwise the result of ``get_year``.
+    """
+    # Always unzip: the downstream task reads the CSV from disk even when
+    # the year was passed explicitly.
+    log("Download dos dados...")
+    unzip_file()
+
+    if day is None:
+        return get_year()
+    return day
diff --git a/pipelines/utils/crawler_anatel/banda_larga_fixa/utils.py b/pipelines/utils/crawler_anatel/banda_larga_fixa/utils.py
index e301d902a..fa6c28112 100644
--- a/pipelines/utils/crawler_anatel/banda_larga_fixa/utils.py
+++ b/pipelines/utils/crawler_anatel/banda_larga_fixa/utils.py
@@ -188,7 +188,6 @@ def treatment_uf(table_id:str):
     )
 
 
-
 def treatment_municipio(table_id:str):
     log("Iniciando o tratamento do arquivo densidade municipio da Anatel")
     df = pd.read_csv(
@@ -211,4 +210,35 @@ def treatment_municipio(table_id:str):
         df_municipio,
         partition_columns=["ano"],
         savepath=anatel_constants.TABLES_OUTPUT_PATH.value[table_id],
-    )
\ No newline at end of file
+    )
+
+
+def get_year():
+    """Return the most recent year found in the input file names.
+
+    Each name is split on "_"; the fifth token, with any file extension
+    removed, is kept when it is a four-digit year, so both
+    ``..._Fixa_2024.csv`` and ``..._Fixa_2024_<suffix>.csv`` are recognised.
+
+    Returns:
+        str: The largest year found.
+
+    Raises:
+        ValueError: If no file name carries a four-digit year.
+    """
+    input_path = anatel_constants.INPUT_PATH.value
+    years = []
+    for file_name in os.listdir(input_path):
+        parts = file_name.split("_")
+        if len(parts) > 4:
+            # "2024.csv" must count as a year, so drop the extension first.
+            token = os.path.splitext(parts[4])[0]
+            if len(token) == 4 and token.isdigit():
+                years.append(token)
+
+    if not years:
+        raise ValueError(f"Nenhum arquivo com ano encontrado em {input_path}")
+
+    max_year = max(years)
+    log(f"Ano máximo: {max_year}")
+    return max_year
diff --git a/pipelines/utils/crawler_anatel/telefonia_movel/flows.py b/pipelines/utils/crawler_anatel/telefonia_movel/flows.py
index 2974614ba..b6c8b7453 100644
--- a/pipelines/utils/crawler_anatel/telefonia_movel/flows.py
+++ b/pipelines/utils/crawler_anatel/telefonia_movel/flows.py
@@ -8,7 +8,10 @@
 from pipelines.constants import constants
 from pipelines.utils.crawler_anatel.telefonia_movel.tasks import (
     join_tables_in_function,
-    get_max_date_in_table_microdados
+    get_max_date_in_table_microdados,
+    get_year_full,
+    get_semester,
+    unzip,
 )
 from pipelines.utils.constants import constants as utils_constants
 from pipelines.utils.decorators import Flow
@@ -42,9 +45,9 @@
     )
     dbt_alias = Parameter("dbt_alias", default=True, required=False)
 
-    ano = Parameter("ano", default=2024, required=False)
+    ano = Parameter("ano", default=None, required=False)
 
-    semestre = Parameter("semestre", default=1, required=False)
+    semestre = Parameter("semestre", default=None, required=False)
 
     update_metadata = Parameter("update_metadata", default=True, required=False)
 
@@ -54,8 +57,17 @@
         table_id=table_id,
         wait=table_id,
     )
+    #####
+    # Function dynamic parameters
+    # https://discourse.prefect.io/t/my-parameter-value-shows-the-same-date-every-day-how-can-i-set-parameter-value-dynamically/99
+    #####
+    unzip_files = unzip()
+    new_year = get_year_full(year=ano, upstream_tasks=[unzip_files])
+    new_semester = get_semester(semester=semestre, year=ano, upstream_tasks=[unzip_files])
 
-    update_tables = get_max_date_in_table_microdados(ano=ano, semestre=semestre)
+    update_tables = get_max_date_in_table_microdados(
+        ano=new_year, semestre=new_semester, upstream_tasks=[new_year, new_semester]
+    )
 
     get_max_date = check_if_data_is_outdated(
         dataset_id = dataset_id,
@@ -67,11 +79,11 @@
 
     with case(get_max_date, True):
         filepath = join_tables_in_function(
-            table_id = table_id,
-            ano=ano,
-            semestre=semestre,
-            upstream_tasks=[get_max_date]
-            )
+            table_id=table_id,
+            ano=new_year,
+            semestre=new_semester,
+            upstream_tasks=[get_max_date],
+        )
 
         wait_upload_table = create_table_and_upload_to_gcs(
             data_path=filepath,
@@ -127,4 +139,4 @@
             )
 
 flow_anatel_telefonia_movel.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
-flow_anatel_telefonia_movel.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
\ No newline at end of file
+flow_anatel_telefonia_movel.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
diff --git a/pipelines/utils/crawler_anatel/telefonia_movel/tasks.py b/pipelines/utils/crawler_anatel/telefonia_movel/tasks.py
index 4cc8fb5d6..93f70c83d 100644
--- a/pipelines/utils/crawler_anatel/telefonia_movel/tasks.py
+++ b/pipelines/utils/crawler_anatel/telefonia_movel/tasks.py
@@ -18,6 +18,7 @@
     clean_csv_brasil,
     clean_csv_municipio,
     clean_csv_uf,
+    get_year,
 )
 from pipelines.utils.utils import log, to_partitions
 
@@ -47,7 +48,6 @@ def join_tables_in_function(table_id, semestre, ano):
     retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
 )
 def get_max_date_in_table_microdados(ano: int, semestre: int):
-    unzip_file()
     log("Obtendo a data máxima da tabela microdados...")
     df = pd.read_csv(
         f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{ano}_{semestre}S.csv",
@@ -61,4 +61,59 @@ def get_max_date_in_table_microdados(ano: int, semestre: int):
 
     log(df['data'].max())
 
-    return df['data'].max()
\ No newline at end of file
+    return df['data'].max()
+
+
+@task(
+    max_retries=constants.TASK_MAX_RETRIES.value,
+    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
+)
+def unzip():
+    """Run ``unzip_file`` as a flow task and return its result."""
+    return unzip_file()
+
+
+@task(
+    max_retries=constants.TASK_MAX_RETRIES.value,
+    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
+)
+def get_year_full(year):
+    """Return ``year`` when given, otherwise the latest year on disk.
+
+    Args:
+        year: Value of the ``ano`` flow parameter; ``None`` requests
+            detection from the input file names via ``get_year``.
+    """
+    if year is None:
+        # Call the helper: returning the bare function object would leak
+        # its repr into the CSV file name built downstream.
+        return get_year()
+    return year
+
+
+@task(
+    max_retries=constants.TASK_MAX_RETRIES.value,
+    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
+)
+def get_semester(semester, year):
+    """Return ``semester`` when given, otherwise the latest one on disk.
+
+    Args:
+        semester: Value of the ``semestre`` flow parameter, or ``None``.
+        year: Value of the ``ano`` flow parameter, or ``None``.
+
+    Returns:
+        The explicit semester, or 2 when the second-semester CSV of the
+        resolved year exists and 1 otherwise.
+    """
+    if semester is not None:
+        return semester
+
+    # Resolve the year with the plain helper: calling the ``get_year_full``
+    # task here would try to bind it to a flow instead of running it.
+    resolved_year = get_year() if year is None else year
+    second_semester_file = (
+        f"{anatel_constants.INPUT_PATH.value}"
+        f"Acessos_Telefonia_Movel_{resolved_year}_2S.csv"
+    )
+    return 2 if os.path.exists(second_semester_file) else 1
diff --git a/pipelines/utils/crawler_anatel/telefonia_movel/utils.py b/pipelines/utils/crawler_anatel/telefonia_movel/utils.py
index b908eeb25..8d2259a0b 100644
--- a/pipelines/utils/crawler_anatel/telefonia_movel/utils.py
+++ b/pipelines/utils/crawler_anatel/telefonia_movel/utils.py
@@ -206,4 +206,33 @@ def clean_csv_municipio(table_id):
         sep=",",
         encoding="utf-8",
         na_rep="",
-    )
\ No newline at end of file
+    )
+
+
+def get_year():
+    """Return the most recent year found in the input file names.
+
+    Each name is split on "_" and the fourth token is kept when it is a
+    four-digit year, as in ``Acessos_Telefonia_Movel_2024_1S.csv``.
+
+    Returns:
+        str: The largest year found.
+
+    Raises:
+        ValueError: If no file name carries a four-digit year.
+    """
+    input_path = anatel_constants.INPUT_PATH.value
+    years = []
+    for file_name in os.listdir(input_path):
+        parts = file_name.split("_")
+        if len(parts) > 3:
+            token = parts[3]
+            if len(token) == 4 and token.isdigit():
+                years.append(token)
+
+    if not years:
+        raise ValueError(f"Nenhum arquivo com ano encontrado em {input_path}")
+
+    max_year = max(years)
+    log(f"Ano máximo: {max_year}")
+    return max_year