Skip to content

Commit

Permalink
fix function task
Browse files — browse the repository at this point in the history
  • Loading branch information
tricktx committed Nov 13, 2024
1 parent 27b0672 commit 265c43d
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 20 deletions.
12 changes: 5 additions & 7 deletions pipelines/utils/crawler_anatel/telefonia_movel/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
get_max_date_in_table_microdados,
get_year_full,
get_semester,
unzip,
#unzip,
)
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
Expand All @@ -26,9 +26,7 @@
rename_current_flow_run_dataset_table,
)

with Flow(
name="BD template - Anatel Telefonia Móvel", code_owners=["trick"]
) as flow_anatel_telefonia_movel:
with Flow(name="BD template - Anatel Telefonia Móvel", code_owners=["trick"]) as flow_anatel_telefonia_movel:
# Parameters
dataset_id = Parameter(
"dataset_id", default="br_anatel_telefonia_movel", required=True
Expand Down Expand Up @@ -61,9 +59,9 @@
# Function dynamic parameters
# https://discourse.prefect.io/t/my-parameter-value-shows-the-same-date-every-day-how-can-i-set-parameter-value-dynamically/99
#####
unzip = unzip()
new_year = get_year_full(year=ano, upstream_tasks=[unzip])
new_semester = get_semester(semester=semestre, year=ano, upstream_tasks=[unzip])
# unzip_task = unzip()
new_year = get_year_full(ano)
new_semester = get_semester(semestre, upstream_tasks=[new_year])

update_tables = get_max_date_in_table_microdados(
ano=new_year, semestre=new_semester, upstream_tasks=[new_year, new_semester]
Expand Down
27 changes: 15 additions & 12 deletions pipelines/utils/crawler_anatel/telefonia_movel/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ def join_tables_in_function(table_id, semestre, ano):
max_retries=constants.TASK_MAX_RETRIES.value,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def get_max_date_in_table_microdados(ano: int, semestre: int):
def get_max_date_in_table_microdados(ano, semestre):
log("Obtendo a data máxima da tabela microdados...")
log(
f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{ano}_{semestre}S.csv"
)
df = pd.read_csv(
f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{ano}_{semestre}S.csv",
sep=";",
Expand All @@ -73,24 +76,24 @@ def unzip():
return unzip_file()


@task(
max_retries=constants.TASK_MAX_RETRIES.value,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
@task
def get_year_full(year):
log("Obtendo o ano...")
if year is None:
return get_year

return get_year()

@task(
max_retries=constants.TASK_MAX_RETRIES.value,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def get_semester(semester, year):

@task
def get_semester(semester):
log("Obtendo o semestre...")
ano = get_year()
if semester is None:
if os.path.exists(
f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{get_year_full(year=year)}_2S.csv"
f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{ano}_2S.csv"
):
log("Segundo semestre")
return 2
else:
log("Primeiro semestre")
return 1
1 change: 0 additions & 1 deletion pipelines/utils/crawler_anatel/telefonia_movel/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ def clean_csv_municipio(table_id):
def get_year():
lista = []
for x in os.listdir(anatel_constants.INPUT_PATH.value):
print(x)
parts = x.split("_")
if len(parts) > 3:
x = parts[3]
Expand Down

0 comments on commit 265c43d

Please sign in to comment.