Skip to content

Commit

Permalink
register flow fix anatel
Browse files Browse the repository at this point in the history
  • Loading branch information
tricktx committed Nov 13, 2024
1 parent 496c158 commit 27b0672
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 23 deletions.
20 changes: 15 additions & 5 deletions pipelines/utils/crawler_anatel/banda_larga_fixa/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from pipelines.constants import constants
from pipelines.utils.crawler_anatel.banda_larga_fixa.tasks import (
join_tables_in_function,
get_max_date_in_table_microdados
get_max_date_in_table_microdados,
get_year_and_unzip,
)
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
Expand Down Expand Up @@ -39,7 +40,7 @@
)
dbt_alias = Parameter("dbt_alias", default=True, required=False)

ano = Parameter("ano", default=2024, required=False)
ano = Parameter("ano", default=None, required=False)

update_metadata = Parameter("update_metadata", default=True, required=False)

Expand All @@ -50,7 +51,14 @@
wait=table_id,
)

update_tables = get_max_date_in_table_microdados(ano=ano)
#####
# Function dynamic parameters
# https://discourse.prefect.io/t/my-parameter-value-shows-the-same-date-every-day-how-can-i-set-parameter-value-dynamically/99
#####

new_ano = get_year_and_unzip(day=ano)

update_tables = get_max_date_in_table_microdados(ano=new_ano)

get_max_date = check_if_data_is_outdated(
dataset_id = dataset_id,
Expand All @@ -59,7 +67,9 @@
date_format = "%Y-%m")

with case(get_max_date, True):
filepath = join_tables_in_function(table_id = table_id, ano=ano, upstream_tasks=[get_max_date])
filepath = join_tables_in_function(
table_id=table_id, ano=new_ano, upstream_tasks=[get_max_date]
)

wait_upload_table = create_table_and_upload_to_gcs(
data_path=filepath,
Expand Down Expand Up @@ -115,4 +125,4 @@
)

flow_anatel_banda_larga_fixa.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
flow_anatel_banda_larga_fixa.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
flow_anatel_banda_larga_fixa.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
15 changes: 12 additions & 3 deletions pipelines/utils/crawler_anatel/banda_larga_fixa/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
treatment_br,
treatment_uf,
treatment_municipio,
unzip_file
unzip_file,
get_year,
)
from pipelines.utils.utils import log, to_partitions

Expand Down Expand Up @@ -43,7 +44,6 @@ def join_tables_in_function(table_id: str, ano):
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def get_max_date_in_table_microdados(ano: int):
unzip_file()
log("Obtendo a data máxima do arquivo microdados da Anatel")
df = pd.read_csv(
f"{anatel_constants.INPUT_PATH.value}Acessos_Banda_Larga_Fixa_{ano}.csv",
Expand All @@ -57,4 +57,13 @@ def get_max_date_in_table_microdados(ano: int):

log(df['data'].max())

return df['data'].max()
return df['data'].max()


@task
def get_year_and_unzip(day):
if day is None:
log("Download dos dados...")
unzip_file()

return get_year()
17 changes: 15 additions & 2 deletions pipelines/utils/crawler_anatel/banda_larga_fixa/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ def treatment_uf(table_id:str):
)



def treatment_municipio(table_id:str):
log("Iniciando o tratamento do arquivo densidade municipio da Anatel")
df = pd.read_csv(
Expand All @@ -211,4 +210,18 @@ def treatment_municipio(table_id:str):
df_municipio,
partition_columns=["ano"],
savepath=anatel_constants.TABLES_OUTPUT_PATH.value[table_id],
)
)


def get_year():
lista = []
for x in os.listdir(anatel_constants.INPUT_PATH.value):
parts = x.split("_")
if len(parts) > 4:
x = parts[4]
if len(x) == 4:
lista.append(x)

max_year = max(lista)
log(f"Ano máximo: {max_year}")
return max_year
32 changes: 22 additions & 10 deletions pipelines/utils/crawler_anatel/telefonia_movel/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
from pipelines.constants import constants
from pipelines.utils.crawler_anatel.telefonia_movel.tasks import (
join_tables_in_function,
get_max_date_in_table_microdados
get_max_date_in_table_microdados,
get_year_full,
get_semester,
unzip,
)
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
Expand Down Expand Up @@ -42,9 +45,9 @@
)
dbt_alias = Parameter("dbt_alias", default=True, required=False)

ano = Parameter("ano", default=2024, required=False)
ano = Parameter("ano", default=None, required=False)

semestre = Parameter("semestre", default=1, required=False)
semestre = Parameter("semestre", default=None, required=False)

update_metadata = Parameter("update_metadata", default=True, required=False)

Expand All @@ -54,8 +57,17 @@
table_id=table_id,
wait=table_id,
)
#####
# Function dynamic parameters
# https://discourse.prefect.io/t/my-parameter-value-shows-the-same-date-every-day-how-can-i-set-parameter-value-dynamically/99
#####
unzip = unzip()
new_year = get_year_full(year=ano, upstream_tasks=[unzip])
new_semester = get_semester(semester=semestre, year=ano, upstream_tasks=[unzip])

update_tables = get_max_date_in_table_microdados(ano=ano, semestre=semestre)
update_tables = get_max_date_in_table_microdados(
ano=new_year, semestre=new_semester, upstream_tasks=[new_year, new_semester]
)

get_max_date = check_if_data_is_outdated(
dataset_id = dataset_id,
Expand All @@ -67,11 +79,11 @@
with case(get_max_date, True):

filepath = join_tables_in_function(
table_id = table_id,
ano=ano,
semestre=semestre,
upstream_tasks=[get_max_date]
)
table_id=table_id,
ano=new_year,
semestre=new_semester,
upstream_tasks=[get_max_date],
)

wait_upload_table = create_table_and_upload_to_gcs(
data_path=filepath,
Expand Down Expand Up @@ -127,4 +139,4 @@
)

flow_anatel_telefonia_movel.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
flow_anatel_telefonia_movel.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
flow_anatel_telefonia_movel.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
36 changes: 34 additions & 2 deletions pipelines/utils/crawler_anatel/telefonia_movel/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
clean_csv_brasil,
clean_csv_municipio,
clean_csv_uf,
get_year

)
from pipelines.utils.utils import log, to_partitions

Expand Down Expand Up @@ -47,7 +49,6 @@ def join_tables_in_function(table_id, semestre, ano):
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def get_max_date_in_table_microdados(ano: int, semestre: int):
unzip_file()
log("Obtendo a data máxima da tabela microdados...")
df = pd.read_csv(
f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{ano}_{semestre}S.csv",
Expand All @@ -61,4 +62,35 @@ def get_max_date_in_table_microdados(ano: int, semestre: int):

log(df['data'].max())

return df['data'].max()
return df['data'].max()


@task(
max_retries=constants.TASK_MAX_RETRIES.value,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def unzip():
return unzip_file()


@task(
max_retries=constants.TASK_MAX_RETRIES.value,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def get_year_full(year):
if year is None:
return get_year


@task(
max_retries=constants.TASK_MAX_RETRIES.value,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def get_semester(semester, year):
if semester is None:
if os.path.exists(
f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{get_year_full(year=year)}_2S.csv"
):
return 2
else:
return 1
17 changes: 16 additions & 1 deletion pipelines/utils/crawler_anatel/telefonia_movel/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,19 @@ def clean_csv_municipio(table_id):
sep=",",
encoding="utf-8",
na_rep="",
)
)


def get_year():
lista = []
for x in os.listdir(anatel_constants.INPUT_PATH.value):
print(x)
parts = x.split("_")
if len(parts) > 3:
x = parts[3]
if len(x) == 4:
lista.append(x)

max_year = max(lista)
log(f"Ano máximo: {max_year}")
return max_year

0 comments on commit 27b0672

Please sign in to comment.