Skip to content

Commit

Permalink
fix-year-semester-anatel
Browse files Browse the repository at this point in the history
  • Loading branch information
tricktx committed Nov 12, 2024
1 parent 496c158 commit cc46c08
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 15 deletions.
7 changes: 4 additions & 3 deletions pipelines/utils/crawler_anatel/banda_larga_fixa/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from pipelines.constants import constants
from pipelines.utils.crawler_anatel.banda_larga_fixa.tasks import (
join_tables_in_function,
get_max_date_in_table_microdados
get_max_date_in_table_microdados,
get_year_and_unzip,
)
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
Expand Down Expand Up @@ -39,7 +40,7 @@
)
dbt_alias = Parameter("dbt_alias", default=True, required=False)

ano = Parameter("ano", default=2024, required=False)
ano = Parameter("ano", default=get_year_and_unzip(), required=False)

update_metadata = Parameter("update_metadata", default=True, required=False)

Expand Down Expand Up @@ -115,4 +116,4 @@
)

flow_anatel_banda_larga_fixa.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
flow_anatel_banda_larga_fixa.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
flow_anatel_banda_larga_fixa.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
13 changes: 10 additions & 3 deletions pipelines/utils/crawler_anatel/banda_larga_fixa/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
treatment_br,
treatment_uf,
treatment_municipio,
unzip_file
unzip_file,
get_year,
)
from pipelines.utils.utils import log, to_partitions

Expand Down Expand Up @@ -43,7 +44,6 @@ def join_tables_in_function(table_id: str, ano):
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def get_max_date_in_table_microdados(ano: int):
unzip_file()
log("Obtendo a data máxima do arquivo microdados da Anatel")
df = pd.read_csv(
f"{anatel_constants.INPUT_PATH.value}Acessos_Banda_Larga_Fixa_{ano}.csv",
Expand All @@ -57,4 +57,11 @@ def get_max_date_in_table_microdados(ano: int):

log(df['data'].max())

return df['data'].max()
return df['data'].max()


def get_year_and_unzip():
    """Prepare the Anatel input files and return the latest year found.

    Logs the start of the step, calls ``unzip_file()`` and then delegates
    to ``get_year()`` to pick the year from the input directory.

    Returns:
        Whatever ``get_year()`` returns (the highest year it finds).
    """
    log("Download dos dados...")
    # NOTE(review): unzip_file() presumably downloads/extracts the raw
    # files into the input directory that get_year() scans -- confirm.
    unzip_file()

    latest_year = get_year()
    return latest_year
18 changes: 16 additions & 2 deletions pipelines/utils/crawler_anatel/banda_larga_fixa/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ def treatment_uf(table_id:str):
)



def treatment_municipio(table_id:str):
log("Iniciando o tratamento do arquivo densidade municipio da Anatel")
df = pd.read_csv(
Expand All @@ -211,4 +210,19 @@ def treatment_municipio(table_id:str):
df_municipio,
partition_columns=["ano"],
savepath=anatel_constants.TABLES_OUTPUT_PATH.value[table_id],
)
)


def get_year():
    """Return the most recent year present in the input file names.

    Every entry of ``anatel_constants.INPUT_PATH`` is inspected: the file
    extension is dropped, the remaining name is split on ``"_"`` and any
    token made of exactly four digits is treated as a year.

    The year is located by content rather than by position. For a name
    such as ``Acessos_Banda_Larga_Fixa_2024.csv`` the fourth ``"_"``-separated
    token is ``"Fixa"`` (four characters long), so a purely positional,
    length-only check would pick that word instead of the year.

    Returns:
        str: The highest four-digit year found, e.g. ``"2024"``.

    Raises:
        ValueError: If no file name in the input directory contains a
            four-digit year token.
    """
    input_path = anatel_constants.INPUT_PATH.value

    years = []
    for file_name in os.listdir(input_path):
        # Drop the extension so a trailing token like "2024.csv" is
        # compared as "2024".
        stem, _ = os.path.splitext(file_name)
        years.extend(
            token
            for token in stem.split("_")
            if len(token) == 4 and token.isdigit()
        )

    if not years:
        # max() on an empty list would raise a ValueError with no context;
        # keep the exception type but say what is actually missing.
        raise ValueError(
            f"Nenhum arquivo com ano no nome foi encontrado em {input_path}"
        )

    # Four-digit strings compare lexicographically in numeric order.
    max_year = max(years)
    log(f"Ano máximo: {max_year}")
    return max_year
13 changes: 9 additions & 4 deletions pipelines/utils/crawler_anatel/telefonia_movel/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
from pipelines.constants import constants
from pipelines.utils.crawler_anatel.telefonia_movel.tasks import (
join_tables_in_function,
get_max_date_in_table_microdados
get_max_date_in_table_microdados,
get_semester,
get_year_and_unzip,
)
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
Expand Down Expand Up @@ -42,9 +44,12 @@
)
dbt_alias = Parameter("dbt_alias", default=True, required=False)

ano = Parameter("ano", default=2024, required=False)
year = get_year_and_unzip()
semestre = get_semester()

semestre = Parameter("semestre", default=1, required=False)
ano = Parameter("ano", default=year, required=False)

semestre = Parameter("semestre", default=semestre, required=False)

update_metadata = Parameter("update_metadata", default=True, required=False)

Expand Down Expand Up @@ -127,4 +132,4 @@
)

flow_anatel_telefonia_movel.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
flow_anatel_telefonia_movel.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
flow_anatel_telefonia_movel.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
22 changes: 20 additions & 2 deletions pipelines/utils/crawler_anatel/telefonia_movel/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
clean_csv_brasil,
clean_csv_municipio,
clean_csv_uf,
get_year,
)
from pipelines.utils.utils import log, to_partitions

Expand Down Expand Up @@ -47,7 +48,7 @@ def join_tables_in_function(table_id, semestre, ano):
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def get_max_date_in_table_microdados(ano: int, semestre: int):
unzip_file()
log(f"PATH: {anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{ano}_{semestre}S.csv")
log("Obtendo a data máxima da tabela microdados...")
df = pd.read_csv(
f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{ano}_{semestre}S.csv",
Expand All @@ -61,4 +62,21 @@ def get_max_date_in_table_microdados(ano: int, semestre: int):

log(df['data'].max())

return df['data'].max()
return df['data'].max()



def get_year_and_unzip():
    """Prepare the Anatel input files and return the latest year found.

    Logs the start of the step, calls ``unzip_file()`` and then delegates
    to ``get_year()`` to pick the year from the input directory.

    Returns:
        Whatever ``get_year()`` returns (the highest year it finds).
    """
    log("Download dos dados...")
    # NOTE(review): unzip_file() presumably downloads/extracts the raw
    # files into the input directory that get_year() scans -- confirm.
    unzip_file()

    latest_year = get_year()
    return latest_year


def get_semester():
    """Return the latest semester available for the most recent year.

    Builds the path of the second-semester CSV for the year returned by
    ``get_year()`` and checks whether that file exists.

    Returns:
        int: ``2`` when the ``..._2S.csv`` file exists in the input
        directory, otherwise ``1``.
    """
    second_semester_csv = (
        f"{anatel_constants.INPUT_PATH.value}"
        f"Acessos_Telefonia_Movel_{get_year()}_2S.csv"
    )
    return 2 if os.path.exists(second_semester_csv) else 1
17 changes: 16 additions & 1 deletion pipelines/utils/crawler_anatel/telefonia_movel/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,19 @@ def clean_csv_municipio(table_id):
sep=",",
encoding="utf-8",
na_rep="",
)
)


def get_year():
    """Return the most recent year present in the input file names.

    Every entry of ``anatel_constants.INPUT_PATH`` is inspected: the file
    extension is dropped, the remaining name is split on ``"_"`` and any
    token made of exactly four digits is treated as a year (for example
    ``"2024"`` in ``Acessos_Telefonia_Movel_2024_1S.csv``).

    Tokens are validated as digits rather than accepted on length alone,
    so a four-letter word in a file name is never mistaken for a year.

    Returns:
        str: The highest four-digit year found, e.g. ``"2024"``.

    Raises:
        ValueError: If no file name in the input directory contains a
            four-digit year token.
    """
    input_path = anatel_constants.INPUT_PATH.value

    years = []
    for file_name in os.listdir(input_path):
        # Drop the extension so a trailing token like "2024.csv" is
        # compared as "2024".
        stem, _ = os.path.splitext(file_name)
        years.extend(
            token
            for token in stem.split("_")
            if len(token) == 4 and token.isdigit()
        )

    if not years:
        # max() on an empty list would raise a ValueError with no context;
        # keep the exception type but say what is actually missing.
        raise ValueError(
            f"Nenhum arquivo com ano no nome foi encontrado em {input_path}"
        )

    # Four-digit strings compare lexicographically in numeric order.
    max_year = max(years)
    log(f"Ano máximo: {max_year}")
    return max_year

0 comments on commit cc46c08

Please sign in to comment.