Skip to content

Commit

Permalink
Merge branch 'main' into staging/refactor_cnpj
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Nov 13, 2024
2 parents f58d23a + 66f7613 commit 68a1e6b
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 42 deletions.
20 changes: 15 additions & 5 deletions pipelines/utils/crawler_anatel/banda_larga_fixa/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from pipelines.constants import constants
from pipelines.utils.crawler_anatel.banda_larga_fixa.tasks import (
join_tables_in_function,
get_max_date_in_table_microdados
get_max_date_in_table_microdados,
get_year_and_unzip,
)
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
Expand Down Expand Up @@ -39,7 +40,7 @@
)
dbt_alias = Parameter("dbt_alias", default=True, required=False)

ano = Parameter("ano", default=2024, required=False)
ano = Parameter("ano", default=None, required=False)

update_metadata = Parameter("update_metadata", default=True, required=False)

Expand All @@ -50,7 +51,14 @@
wait=table_id,
)

update_tables = get_max_date_in_table_microdados(ano=ano)
#####
# Function dynamic parameters
# https://discourse.prefect.io/t/my-parameter-value-shows-the-same-date-every-day-how-can-i-set-parameter-value-dynamically/99
#####

new_ano = get_year_and_unzip(day=ano)

update_tables = get_max_date_in_table_microdados(ano=new_ano, table_id=table_id, upstream_tasks=[new_ano])

get_max_date = check_if_data_is_outdated(
dataset_id = dataset_id,
Expand All @@ -59,7 +67,9 @@
date_format = "%Y-%m")

with case(get_max_date, True):
filepath = join_tables_in_function(table_id = table_id, ano=ano, upstream_tasks=[get_max_date])
filepath = join_tables_in_function(
table_id=table_id, ano=new_ano, upstream_tasks=[get_max_date]
)

wait_upload_table = create_table_and_upload_to_gcs(
data_path=filepath,
Expand Down Expand Up @@ -115,4 +125,4 @@
)

flow_anatel_banda_larga_fixa.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
flow_anatel_banda_larga_fixa.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
flow_anatel_banda_larga_fixa.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
49 changes: 38 additions & 11 deletions pipelines/utils/crawler_anatel/banda_larga_fixa/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
treatment_br,
treatment_uf,
treatment_municipio,
unzip_file
unzip_file,
get_year,
)
from pipelines.utils.utils import log, to_partitions

Expand Down Expand Up @@ -42,19 +43,45 @@ def join_tables_in_function(table_id: str, ano):
max_retries=constants.TASK_MAX_RETRIES.value,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def get_max_date_in_table_microdados(ano: int):
unzip_file()
log("Obtendo a data máxima do arquivo microdados da Anatel")
df = pd.read_csv(
f"{anatel_constants.INPUT_PATH.value}Acessos_Banda_Larga_Fixa_{ano}.csv",
def get_max_date_in_table_microdados(table_id: str, ano: int):
if table_id == "microdados":
log("Obtendo a data máxima do arquivo microdados da Anatel")
df = pd.read_csv(
f"{anatel_constants.INPUT_PATH.value}Acessos_Banda_Larga_Fixa_{ano}.csv",
sep=";",
encoding="utf-8",
dtype=str
)
df['data'] = df['Ano'] + '-' + df['Mês']

df['data'] = pd.to_datetime(df['data'], format="%Y-%m")

log(df['data'].max())

return df['data'].max()

else:
log(f"{anatel_constants.INPUT_PATH.value}Densidade_Telefonia_Movel.csv")

df = pd.read_csv(
f"{anatel_constants.INPUT_PATH.value}Densidade_Telefonia_Movel.csv",
sep=";",
encoding="utf-8",
dtype=str
)
df['data'] = df['Ano'] + '-' + df['Mês']
)
df['data'] = df['Ano'] + '-' + df['Mês']

df['data'] = pd.to_datetime(df['data'], format="%Y-%m")

log(df['data'].max())

return df['data'].max()

df['data'] = pd.to_datetime(df['data'], format="%Y-%m")

log(df['data'].max())
@task
def get_year_and_unzip(day):
if day is None:
log("Download dos dados...")
unzip_file()

return df['data'].max()
return get_year()
17 changes: 15 additions & 2 deletions pipelines/utils/crawler_anatel/banda_larga_fixa/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ def treatment_uf(table_id:str):
)



def treatment_municipio(table_id:str):
log("Iniciando o tratamento do arquivo densidade municipio da Anatel")
df = pd.read_csv(
Expand All @@ -211,4 +210,18 @@ def treatment_municipio(table_id:str):
df_municipio,
partition_columns=["ano"],
savepath=anatel_constants.TABLES_OUTPUT_PATH.value[table_id],
)
)


def get_year():
lista = []
for x in os.listdir(anatel_constants.INPUT_PATH.value):
parts = x.split("_")
if len(parts) > 4:
x = parts[4]
if len(x) == 4:
lista.append(x)

max_year = max(lista)
log(f"Ano máximo: {max_year}")
return max_year
36 changes: 23 additions & 13 deletions pipelines/utils/crawler_anatel/telefonia_movel/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
from pipelines.constants import constants
from pipelines.utils.crawler_anatel.telefonia_movel.tasks import (
join_tables_in_function,
get_max_date_in_table_microdados
get_max_date_in_table_microdados,
get_year_full,
get_semester,
unzip,
)
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
Expand All @@ -23,9 +26,7 @@
rename_current_flow_run_dataset_table,
)

with Flow(
name="BD template - Anatel Telefonia Móvel", code_owners=["trick"]
) as flow_anatel_telefonia_movel:
with Flow(name="BD template - Anatel Telefonia Móvel", code_owners=["trick"]) as flow_anatel_telefonia_movel:
# Parameters
dataset_id = Parameter(
"dataset_id", default="br_anatel_telefonia_movel", required=True
Expand All @@ -42,9 +43,9 @@
)
dbt_alias = Parameter("dbt_alias", default=True, required=False)

ano = Parameter("ano", default=2024, required=False)
ano = Parameter("ano", default=None, required=False)

semestre = Parameter("semestre", default=1, required=False)
semestre = Parameter("semestre", default=None, required=False)

update_metadata = Parameter("update_metadata", default=True, required=False)

Expand All @@ -54,8 +55,17 @@
table_id=table_id,
wait=table_id,
)
#####
# Function dynamic parameters
# https://discourse.prefect.io/t/my-parameter-value-shows-the-same-date-every-day-how-can-i-set-parameter-value-dynamically/99
#####
unzip_task = unzip()
new_year = get_year_full(ano, upstream_tasks=[unzip_task])
new_semester = get_semester(semestre, upstream_tasks=[new_year])

update_tables = get_max_date_in_table_microdados(ano=ano, semestre=semestre)
update_tables = get_max_date_in_table_microdados(
table_id = table_id, ano=new_year, semestre=new_semester, upstream_tasks=[new_year, new_semester]
)

get_max_date = check_if_data_is_outdated(
dataset_id = dataset_id,
Expand All @@ -67,11 +77,11 @@
with case(get_max_date, True):

filepath = join_tables_in_function(
table_id = table_id,
ano=ano,
semestre=semestre,
upstream_tasks=[get_max_date]
)
table_id=table_id,
ano=new_year,
semestre=new_semester,
upstream_tasks=[get_max_date],
)

wait_upload_table = create_table_and_upload_to_gcs(
data_path=filepath,
Expand Down Expand Up @@ -127,4 +137,4 @@
)

flow_anatel_telefonia_movel.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
flow_anatel_telefonia_movel.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
flow_anatel_telefonia_movel.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
74 changes: 64 additions & 10 deletions pipelines/utils/crawler_anatel/telefonia_movel/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
clean_csv_brasil,
clean_csv_municipio,
clean_csv_uf,
get_year

)
from pipelines.utils.utils import log, to_partitions

Expand Down Expand Up @@ -46,19 +48,71 @@ def join_tables_in_function(table_id, semestre, ano):
max_retries=constants.TASK_MAX_RETRIES.value,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def get_max_date_in_table_microdados(ano: int, semestre: int):
unzip_file()
log("Obtendo a data máxima da tabela microdados...")
df = pd.read_csv(
f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{ano}_{semestre}S.csv",
def get_max_date_in_table_microdados(table_id, ano, semestre):

if table_id == 'microdados':
log("Obtendo a data máxima da tabela microdados...")
log(
f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{ano}_{semestre}S.csv"
)
df = pd.read_csv(
f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{ano}_{semestre}S.csv",
sep=";",
encoding="utf-8",
dtype=str
)
df['data'] = df['Ano'] + '-' + df['Mês']

df['data'] = pd.to_datetime(df['data'], format="%Y-%m")

log(df['data'].max())

return df['data'].max()

else:
log(f"{anatel_constants.INPUT_PATH.value}Densidade_Telefonia_Movel.csv")

df = pd.read_csv(
f"{anatel_constants.INPUT_PATH.value}Densidade_Telefonia_Movel.csv",
sep=";",
encoding="utf-8",
dtype=str
)
df['data'] = df['Ano'] + '-' + df['Mês']
)
df['data'] = df['Ano'] + '-' + df['Mês']

df['data'] = pd.to_datetime(df['data'], format="%Y-%m")

log(df['data'].max())

return df['data'].max()


@task(
max_retries=constants.TASK_MAX_RETRIES.value,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def unzip():
return unzip_file()


@task
def get_year_full(year):
log("Obtendo o ano...")
if year is None:

df['data'] = pd.to_datetime(df['data'], format="%Y-%m")
return get_year()

log(df['data'].max())

return df['data'].max()
@task
def get_semester(semester):
log("Obtendo o semestre...")
ano = get_year()
if semester is None:
if os.path.exists(
f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{ano}_2S.csv"
):
log("Segundo semestre")
return 2
else:
log("Primeiro semestre")
return 1
16 changes: 15 additions & 1 deletion pipelines/utils/crawler_anatel/telefonia_movel/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,18 @@ def clean_csv_municipio(table_id):
sep=",",
encoding="utf-8",
na_rep="",
)
)


def get_year():
lista = []
for x in os.listdir(anatel_constants.INPUT_PATH.value):
parts = x.split("_")
if len(parts) > 3:
x = parts[3]
if len(x) == 4:
lista.append(x)

max_year = max(lista)
log(f"Ano máximo: {max_year}")
return max_year

0 comments on commit 68a1e6b

Please sign in to comment.