diff --git a/pipelines/utils/crawler_anatel/banda_larga_fixa/flows.py b/pipelines/utils/crawler_anatel/banda_larga_fixa/flows.py index 38b145d76..a97fb40e6 100644 --- a/pipelines/utils/crawler_anatel/banda_larga_fixa/flows.py +++ b/pipelines/utils/crawler_anatel/banda_larga_fixa/flows.py @@ -7,7 +7,8 @@ from pipelines.constants import constants from pipelines.utils.crawler_anatel.banda_larga_fixa.tasks import ( join_tables_in_function, - get_max_date_in_table_microdados + get_max_date_in_table_microdados, + get_year_and_unzip, ) from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow @@ -39,7 +40,7 @@ ) dbt_alias = Parameter("dbt_alias", default=True, required=False) - ano = Parameter("ano", default=2024, required=False) + ano = Parameter("ano", default=None, required=False) update_metadata = Parameter("update_metadata", default=True, required=False) @@ -50,7 +51,14 @@ wait=table_id, ) - update_tables = get_max_date_in_table_microdados(ano=ano) + ##### + # Function dynamic parameters + # https://discourse.prefect.io/t/my-parameter-value-shows-the-same-date-every-day-how-can-i-set-parameter-value-dynamically/99 + ##### + + new_ano = get_year_and_unzip(day=ano) + + update_tables = get_max_date_in_table_microdados(ano=new_ano, table_id=table_id, upstream_tasks=[new_ano]) get_max_date = check_if_data_is_outdated( dataset_id = dataset_id, @@ -59,7 +67,9 @@ date_format = "%Y-%m") with case(get_max_date, True): - filepath = join_tables_in_function(table_id = table_id, ano=ano, upstream_tasks=[get_max_date]) + filepath = join_tables_in_function( + table_id=table_id, ano=new_ano, upstream_tasks=[get_max_date] + ) wait_upload_table = create_table_and_upload_to_gcs( data_path=filepath, @@ -115,4 +125,4 @@ ) flow_anatel_banda_larga_fixa.storage = GCS(constants.GCS_FLOWS_BUCKET.value) -flow_anatel_banda_larga_fixa.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) \ No newline at end of file +flow_anatel_banda_larga_fixa.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) diff --git a/pipelines/utils/crawler_anatel/banda_larga_fixa/tasks.py b/pipelines/utils/crawler_anatel/banda_larga_fixa/tasks.py index f9a33dec2..564f411d8 100644 --- a/pipelines/utils/crawler_anatel/banda_larga_fixa/tasks.py +++ b/pipelines/utils/crawler_anatel/banda_larga_fixa/tasks.py @@ -13,7 +13,8 @@ treatment_br, treatment_uf, treatment_municipio, - unzip_file + unzip_file, + get_year, ) from pipelines.utils.utils import log, to_partitions @@ -42,19 +43,45 @@ def join_tables_in_function(table_id: str, ano): max_retries=constants.TASK_MAX_RETRIES.value, retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), ) -def get_max_date_in_table_microdados(ano: int): - unzip_file() - log("Obtendo a data máxima do arquivo microdados da Anatel") - df = pd.read_csv( - f"{anatel_constants.INPUT_PATH.value}Acessos_Banda_Larga_Fixa_{ano}.csv", +def get_max_date_in_table_microdados(table_id: str, ano: int): + if table_id == "microdados": + log("Obtendo a data máxima do arquivo microdados da Anatel") + df = pd.read_csv( + f"{anatel_constants.INPUT_PATH.value}Acessos_Banda_Larga_Fixa_{ano}.csv", + sep=";", + encoding="utf-8", + dtype=str + ) + df['data'] = df['Ano'] + '-' + df['Mês'] + + df['data'] = pd.to_datetime(df['data'], format="%Y-%m") + + log(df['data'].max()) + + return df['data'].max() + + else: + log(f"{anatel_constants.INPUT_PATH.value}Densidade_Telefonia_Movel.csv") + + df = pd.read_csv( + f"{anatel_constants.INPUT_PATH.value}Densidade_Telefonia_Movel.csv", sep=";", encoding="utf-8", dtype=str - ) - df['data'] = df['Ano'] + '-' + df['Mês'] + ) + df['data'] = df['Ano'] + '-' + df['Mês'] + + df['data'] = pd.to_datetime(df['data'], format="%Y-%m") + + log(df['data'].max()) + + return df['data'].max() - df['data'] = pd.to_datetime(df['data'], format="%Y-%m") - log(df['data'].max()) +@task +def get_year_and_unzip(day): + if day is None: + log("Download dos dados...") + unzip_file() - return df['data'].max() \ No newline at end of file + return get_year() diff --git a/pipelines/utils/crawler_anatel/banda_larga_fixa/utils.py b/pipelines/utils/crawler_anatel/banda_larga_fixa/utils.py index e301d902a..fa6c28112 100644 --- a/pipelines/utils/crawler_anatel/banda_larga_fixa/utils.py +++ b/pipelines/utils/crawler_anatel/banda_larga_fixa/utils.py @@ -188,7 +188,6 @@ def treatment_uf(table_id:str): ) - def treatment_municipio(table_id:str): log("Iniciando o tratamento do arquivo densidade municipio da Anatel") df = pd.read_csv( @@ -211,4 +210,18 @@ def treatment_municipio(table_id:str): df_municipio, partition_columns=["ano"], savepath=anatel_constants.TABLES_OUTPUT_PATH.value[table_id], - ) \ No newline at end of file + ) + + +def get_year(): + lista = [] + for x in os.listdir(anatel_constants.INPUT_PATH.value): + parts = x.split("_") + if len(parts) > 4: + x = parts[4] + if len(x) == 4: + lista.append(x) + + max_year = max(lista) + log(f"Ano máximo: {max_year}") + return max_year diff --git a/pipelines/utils/crawler_anatel/telefonia_movel/flows.py b/pipelines/utils/crawler_anatel/telefonia_movel/flows.py index 2974614ba..2c0b6255e 100644 --- a/pipelines/utils/crawler_anatel/telefonia_movel/flows.py +++ b/pipelines/utils/crawler_anatel/telefonia_movel/flows.py @@ -8,7 +8,10 @@ from pipelines.constants import constants from pipelines.utils.crawler_anatel.telefonia_movel.tasks import ( join_tables_in_function, - get_max_date_in_table_microdados + get_max_date_in_table_microdados, + get_year_full, + get_semester, + unzip, ) from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow @@ -23,9 +26,7 @@ rename_current_flow_run_dataset_table, ) -with Flow( - name="BD template - Anatel Telefonia Móvel", code_owners=["trick"] -) as flow_anatel_telefonia_movel: +with Flow(name="BD template - Anatel Telefonia Móvel", code_owners=["trick"]) as flow_anatel_telefonia_movel: # Parameters dataset_id = Parameter( "dataset_id", default="br_anatel_telefonia_movel", required=True @@ -42,9 +43,9 @@ ) dbt_alias = Parameter("dbt_alias", default=True, required=False) - ano = Parameter("ano", default=2024, required=False) + ano = Parameter("ano", default=None, required=False) - semestre = Parameter("semestre", default=1, required=False) + semestre = Parameter("semestre", default=None, required=False) update_metadata = Parameter("update_metadata", default=True, required=False) @@ -54,8 +55,17 @@ table_id=table_id, wait=table_id, ) + ##### + # Function dynamic parameters + # https://discourse.prefect.io/t/my-parameter-value-shows-the-same-date-every-day-how-can-i-set-parameter-value-dynamically/99 + ##### + unzip_task = unzip() + new_year = get_year_full(ano, upstream_tasks=[unzip_task]) + new_semester = get_semester(semestre, upstream_tasks=[new_year]) - update_tables = get_max_date_in_table_microdados(ano=ano, semestre=semestre) + update_tables = get_max_date_in_table_microdados( + table_id = table_id, ano=new_year, semestre=new_semester, upstream_tasks=[new_year, new_semester] + ) get_max_date = check_if_data_is_outdated( dataset_id = dataset_id, @@ -67,11 +77,11 @@ with case(get_max_date, True): filepath = join_tables_in_function( - table_id = table_id, - ano=ano, - semestre=semestre, - upstream_tasks=[get_max_date] - ) + table_id=table_id, + ano=new_year, + semestre=new_semester, + upstream_tasks=[get_max_date], + ) wait_upload_table = create_table_and_upload_to_gcs( data_path=filepath, @@ -127,4 +137,4 @@ ) flow_anatel_telefonia_movel.storage = GCS(constants.GCS_FLOWS_BUCKET.value) -flow_anatel_telefonia_movel.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) \ No newline at end of file +flow_anatel_telefonia_movel.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) diff --git a/pipelines/utils/crawler_anatel/telefonia_movel/tasks.py b/pipelines/utils/crawler_anatel/telefonia_movel/tasks.py index 4cc8fb5d6..48333dddc 100644 --- a/pipelines/utils/crawler_anatel/telefonia_movel/tasks.py +++ b/pipelines/utils/crawler_anatel/telefonia_movel/tasks.py @@ -18,6 +18,8 @@ clean_csv_brasil, clean_csv_municipio, clean_csv_uf, + get_year + ) from pipelines.utils.utils import log, to_partitions @@ -46,19 +48,71 @@ def join_tables_in_function(table_id, semestre, ano): max_retries=constants.TASK_MAX_RETRIES.value, retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), ) -def get_max_date_in_table_microdados(ano: int, semestre: int): - unzip_file() - log("Obtendo a data máxima da tabela microdados...") - df = pd.read_csv( - f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{ano}_{semestre}S.csv", +def get_max_date_in_table_microdados(table_id, ano, semestre): + + if table_id == 'microdados': + log("Obtendo a data máxima da tabela microdados...") + log( + f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{ano}_{semestre}S.csv" + ) + df = pd.read_csv( + f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{ano}_{semestre}S.csv", + sep=";", + encoding="utf-8", + dtype=str + ) + df['data'] = df['Ano'] + '-' + df['Mês'] + + df['data'] = pd.to_datetime(df['data'], format="%Y-%m") + + log(df['data'].max()) + + return df['data'].max() + + else: + log(f"{anatel_constants.INPUT_PATH.value}Densidade_Telefonia_Movel.csv") + + df = pd.read_csv( + f"{anatel_constants.INPUT_PATH.value}Densidade_Telefonia_Movel.csv", sep=";", encoding="utf-8", dtype=str - ) - df['data'] = df['Ano'] + '-' + df['Mês'] + ) + df['data'] = df['Ano'] + '-' + df['Mês'] + + df['data'] = pd.to_datetime(df['data'], format="%Y-%m") + + log(df['data'].max()) + + return df['data'].max() + + +@task( + max_retries=constants.TASK_MAX_RETRIES.value, + retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), +) +def unzip(): + return unzip_file() + + +@task +def get_year_full(year): + log("Obtendo o ano...") + if year is None: - df['data'] = pd.to_datetime(df['data'], format="%Y-%m") + return get_year() - log(df['data'].max()) - return df['data'].max() \ No newline at end of file +@task +def get_semester(semester): + log("Obtendo o semestre...") + ano = get_year() + if semester is None: + if os.path.exists( + f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{ano}_2S.csv" + ): + log("Segundo semestre") + return 2 + else: + log("Primeiro semestre") + return 1 diff --git a/pipelines/utils/crawler_anatel/telefonia_movel/utils.py b/pipelines/utils/crawler_anatel/telefonia_movel/utils.py index b908eeb25..1d44609fc 100644 --- a/pipelines/utils/crawler_anatel/telefonia_movel/utils.py +++ b/pipelines/utils/crawler_anatel/telefonia_movel/utils.py @@ -206,4 +206,18 @@ def clean_csv_municipio(table_id): sep=",", encoding="utf-8", na_rep="", - ) \ No newline at end of file + ) + + +def get_year(): + lista = [] + for x in os.listdir(anatel_constants.INPUT_PATH.value): + parts = x.split("_") + if len(parts) > 3: + x = parts[3] + if len(x) == 4: + lista.append(x) + + max_year = max(lista) + log(f"Ano máximo: {max_year}") + return max_year