[check_for_updates] br_anp_precos_combustiveis #465

Merged: 12 commits, Sep 18, 2023
4 changes: 4 additions & 0 deletions pipelines/datasets/br_anp_precos_combustiveis/constants.py
@@ -48,6 +48,10 @@ class constants(Enum):  # pylint: disable=c0103
"https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/arquivos/shpc/qus/ultimas-4-semanas-diesel-gnv.csv",
]

URLS_DATA = [
"https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/arquivos/shpc/qus/ultimas-4-semanas-glp.csv"
]

PATH_INPUT = "/tmp/input/"

PATH_OUTPUT = "/tmp/output/"
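The new URLS_DATA entry follows the repository's existing Enum-based constants pattern: each value is read at the call site through .value. A minimal, self-contained sketch of that pattern, reusing the class name and fields shown in the diff above (the final print is only illustrative):

from enum import Enum


class constants(Enum):  # pylint: disable=c0103
    URLS_DATA = [
        "https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/arquivos/shpc/qus/ultimas-4-semanas-glp.csv"
    ]
    PATH_INPUT = "/tmp/input/"
    PATH_OUTPUT = "/tmp/output/"


# Call sites read the underlying values through .value, e.g.
# download_files(constants.URLS_DATA.value, constants.PATH_INPUT.value).
print(constants.URLS_DATA.value[0])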
156 changes: 64 additions & 92 deletions pipelines/datasets/br_anp_precos_combustiveis/flows.py
@@ -13,11 +13,13 @@
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
from pipelines.datasets.br_anp_precos_combustiveis.utils import download_files
from pipelines.datasets.br_anp_precos_combustiveis.tasks import (
tratamento,
download_and_transform,
data_max_bd_mais,
data_max_bd_pro,
make_partitions,
check_for_updates,
)
from pipelines.datasets.br_anp_precos_combustiveis.schedules import (
every_week_anp_microdados,
@@ -26,6 +28,11 @@
create_table_and_upload_to_gcs,
rename_current_flow_run_dataset_table,
get_current_flow_labels,
log_task,
)

from pipelines.datasets.br_anp_precos_combustiveis.constants import (
constants as anatel_constants,
)

with Flow(
Expand All @@ -37,7 +44,7 @@
table_id = Parameter("table_id", default="microdados", required=True)

materialization_mode = Parameter(
"materialization_mode", default="prod", required=False
"materialization_mode", default="dev", required=False
)

materialize_after_dump = Parameter(
@@ -51,103 +58,68 @@
)
update_metadata = Parameter("update_metadata", default=True, required=False)

df = tratamento(upstream_tasks=[rename_flow_run])
output_path = make_partitions(df=df, upstream_tasks=[df])
get_date_max_mais = data_max_bd_mais()
get_date_max_pro = data_max_bd_pro(df=df)

# pylint: disable=C0103
wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_path,
dataset_id=dataset_id,
table_id=table_id,
dump_mode="append",
wait=output_path,
)

# ! BD MAIS - Atrasado
with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": dataset_id,
"table_id": table_id,
"mode": materialization_mode,
"dbt_alias": dbt_alias,
},
labels=current_flow_labels,
run_name=f"Materialize {dataset_id}.{table_id}",
dados_desatualizados = check_for_updates(dataset_id=dataset_id, table_id=table_id)
log_task(f"Checando se os dados estão desatualizados: {dados_desatualizados}")
with case(dados_desatualizados, False):
log_task(
"Dados atualizados, não é necessário fazer o download",
upstream_tasks=[dados_desatualizados],
)

wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)
wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)
with case(dados_desatualizados, True):
df = download_and_transform(upstream_tasks=[rename_flow_run])
output_path = make_partitions(df=df, upstream_tasks=[df])
get_date_max_pro = data_max_bd_pro(df=df)

with case(update_metadata, True):
update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
bq_last_update=False,
api_mode="prod",
date_format="yy-mm-dd",
_last_date=get_date_max_mais,
upstream_tasks=[wait_upload_table],
)

# ! BD PRO - Atualizado
with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": dataset_id,
"table_id": table_id + "_atualizado",
"mode": materialization_mode,
"dbt_alias": dbt_alias,
},
labels=current_flow_labels,
run_name=f"Materialize {dataset_id}.{table_id}" "_atualizado",
# pylint: disable=C0103
wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_path,
dataset_id=dataset_id,
table_id=table_id,
dump_mode="append",
wait=output_path,
)

wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)
wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)
with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": dataset_id,
"table_id": table_id,
"mode": materialization_mode,
"dbt_alias": dbt_alias,
},
labels=current_flow_labels,
run_name=f"Materialize {dataset_id}.{table_id}",
)

with case(update_metadata, True):
update_django_metadata(
dataset_id,
table_id + "_atualizado",
metadata_type="DateTimeRange",
bq_last_update=False,
api_mode="prod",
date_format="yy-mm-dd",
_last_date=get_date_max_pro,
upstream_tasks=[wait_upload_table],
wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)
wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

with case(update_metadata, True):
update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
bq_last_update=False,
api_mode="prod",
date_format="yy-mm-dd",
_last_date=get_date_max_pro,
upstream_tasks=[wait_upload_table],
)

anp_microdados.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
anp_microdados.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
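The structural change in flows.py is that download, upload, materialization, and metadata updates now run only inside a case(dados_desatualizados, True) branch, so nothing is reprocessed when the source has no new data. A simplified, standalone sketch of this gating pattern, assuming the Prefect 1.x API; the task bodies below are placeholders, not the repository's real tasks:

from prefect import Flow, case, task


@task
def check_for_updates() -> bool:
    # Placeholder: compare the latest date at the source with the latest date in the warehouse.
    return True


@task
def log_task(message: str):
    print(message)


@task
def download_and_transform() -> str:
    return "transformed dataframe"


with Flow("anp_precos_combustiveis_sketch") as flow:
    outdated = check_for_updates()
    with case(outdated, False):
        log_task("Data already up to date, skipping download")
    with case(outdated, True):
        df = download_and_transform()
        log_task("New data downloaded and transformed")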
7 changes: 3 additions & 4 deletions pipelines/datasets/br_anp_precos_combustiveis/schedules.py
@@ -4,15 +4,14 @@
"""

from datetime import timedelta, datetime

from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock
from prefect.schedules.clocks import CronClock
from pipelines.constants import constants

every_week_anp_microdados = Schedule(
clocks=[
IntervalClock(
interval=timedelta(days=1),
CronClock(
cron="0 10 * * *",
start_date=datetime(2021, 1, 1),
labels=[constants.BASEDOSDADOS_PROD_AGENT_LABEL.value],
parameter_defaults={
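The schedule moves from a one-day IntervalClock to a CronClock; the expression "0 10 * * *" fires at minute 0 of hour 10 on every day of every month, i.e. once a day at 10:00. A minimal sketch of the equivalent schedule in the Prefect 1.x API, with the agent label and parameter defaults from the real file omitted:

from datetime import datetime

from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

# "0 10 * * *" = minute 0, hour 10, every day of every month, any weekday.
every_day_at_10 = Schedule(
    clocks=[
        CronClock(
            cron="0 10 * * *",
            start_date=datetime(2021, 1, 1),
        )
    ]
)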
139 changes: 62 additions & 77 deletions pipelines/datasets/br_anp_precos_combustiveis/tasks.py
@@ -12,99 +12,84 @@
get_id_municipio,
open_csvs,
partition_data,
merge_table_id_municipio,
orderning_data_coleta,
creating_column_ano,
rename_and_reordening,
rename_columns,
rename_and_to_create_endereco,
lower_colunm_produto,
)
from pipelines.datasets.br_anp_precos_combustiveis.constants import (
constants as anatel_constants,
)
from pipelines.utils.utils import log
from pipelines.utils.utils import log, extract_last_date
from pipelines.constants import constants


@task
def check_for_updates(dataset_id, table_id):
"""
Checks if there are available updates for a specific dataset and table.

Returns:
bool: Returns True if updates are available, otherwise returns False.
"""
# Get the most recent date available on the ANP site
download_files(anatel_constants.URLS_DATA.value, anatel_constants.PATH_INPUT.value)
df = pd.read_csv(anatel_constants.URL_GLP.value, sep=";", encoding="utf-8")
data_obj = df["Data da Coleta"].max()
data_obj = datetime.strptime(data_obj, "%d/%m/%Y").strftime("%Y-%m-%d")
data_obj = datetime.strptime(data_obj, "%Y-%m-%d").date()

# Get the most recent date already available in BD
data_bq_obj = extract_last_date(
dataset_id, table_id, "yy-mm-dd", "basedosdados", data="data_coleta"
)

# Log the most recent date from each source
log(f"Última data no site do ANP: {data_obj}")
log(f"Última data no site da BD: {data_bq_obj}")

# Compare the dates to check whether updates are available
if data_obj > data_bq_obj:
    return True  # updates are available
else:
    return False  # no new updates available
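A side note on the date handling above: the two consecutive strptime/strftime conversions produce the same result as a single parse of the dd/mm/yyyy string. A small, self-contained illustration; the sample string is a made-up value in the CSV's "Data da Coleta" format:

from datetime import datetime

raw = "18/09/2023"  # hypothetical value in the dd/mm/yyyy format used by the ANP CSV

# What the task does in two steps...
as_iso_text = datetime.strptime(raw, "%d/%m/%Y").strftime("%Y-%m-%d")
two_steps = datetime.strptime(as_iso_text, "%Y-%m-%d").date()

# ...is equivalent to one direct parse:
one_step = datetime.strptime(raw, "%d/%m/%Y").date()

assert two_steps == one_step  # both are datetime.date(2023, 9, 18)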


@task(
max_retries=constants.TASK_MAX_RETRIES.value,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def tratamento():
download_files(
anatel_constants.URLS.value,
anatel_constants.PATH_INPUT.value,
)
def download_and_transform():
download_files(anatel_constants.URLS.value, anatel_constants.PATH_INPUT.value)

precos_combustiveis = open_csvs(
url_diesel_gnv=anatel_constants.URL_DIESEL_GNV.value,
url_gasolina_etanol=anatel_constants.URL_GASOLINA_ETANOL.value,
url_glp=anatel_constants.URL_GLP.value,
anatel_constants.URL_DIESEL_GNV.value,
anatel_constants.URL_GASOLINA_ETANOL.value,
anatel_constants.URL_GLP.value,
)

id_municipio = get_id_municipio()
log("Iniciando tratamento dos dados precos_combustiveis")
precos_combustiveis = pd.merge(
id_municipio,
precos_combustiveis,
how="right",
left_on=["nome", "sigla_uf"],
right_on=["Municipio", "Estado - Sigla"],
)
log("----" * 150)
log("Dados mergeados")
precos_combustiveis.rename(columns={"Municipio": "nome"}, inplace=True)
precos_combustiveis.dropna(subset=["Valor de Venda"], inplace=True)
precos_combustiveis["endereco_revenda"] = (
precos_combustiveis["Nome da Rua"].fillna("")
+ ","
+ " "
+ precos_combustiveis["Numero Rua"].fillna("")
+ ","
+ " "
+ precos_combustiveis["Complemento"].fillna("")
)
precos_combustiveis.drop(columns=["sigla_uf"], inplace=True)
precos_combustiveis.rename(columns={"Data da Coleta": "data_coleta"}, inplace=True)
precos_combustiveis["data_coleta"] = (
precos_combustiveis["data_coleta"].str[6:10]
+ "-"
+ precos_combustiveis["data_coleta"].str[3:5]
+ "-"
+ precos_combustiveis["data_coleta"].str[0:2]
)
precos_combustiveis["Produto"] = precos_combustiveis["Produto"].str.lower()
precos_combustiveis["ano"] = precos_combustiveis["data_coleta"].str[0:4]
precos_combustiveis["ano"].replace("nan", "", inplace=True)
precos_combustiveis.rename(columns=anatel_constants.RENAME.value, inplace=True)
precos_combustiveis = precos_combustiveis[anatel_constants.ORDEM.value]
precos_combustiveis["ano"] = precos_combustiveis["ano"].apply(
lambda x: str(x).replace(".0", "")
)
precos_combustiveis["cep_revenda"] = precos_combustiveis["cep_revenda"].apply(
lambda x: str(x).replace("-", "")
)
precos_combustiveis["unidade_medida"] = precos_combustiveis["unidade_medida"].map(
{"R$ / litro": "R$/litro", "R$ / m³": "R$/m3", "R$ / 13 kg": "R$/13kg"}
)
precos_combustiveis["nome_estabelecimento"] = precos_combustiveis[
"nome_estabelecimento"
].apply(lambda x: str(x).replace(",", ""))
precos_combustiveis["preco_compra"] = precos_combustiveis["preco_compra"].apply(
lambda x: str(x).replace(",", ".")
)
precos_combustiveis["preco_venda"] = precos_combustiveis["preco_venda"].apply(
lambda x: str(x).replace(",", ".")
)
precos_combustiveis["preco_venda"] = precos_combustiveis["preco_venda"].replace(
"nan", ""
)
precos_combustiveis["preco_compra"] = precos_combustiveis["preco_compra"].replace(
"nan", ""
df = get_id_municipio(id_municipio=precos_combustiveis)

df = merge_table_id_municipio(
id_municipio=df, pd_precos_combustiveis=precos_combustiveis
)
precos_combustiveis.replace(np.nan, "", inplace=True)
log("----" * 150)
log("Dados tratados com sucesso")
log("----" * 150)
log("Iniciando particionamento dos dados")
log("----" * 150)
log(precos_combustiveis["data_coleta"].unique())

return precos_combustiveis

df = rename_and_to_create_endereco(precos_combustiveis=df)

df = orderning_data_coleta(precos_combustiveis=df)

df = lower_colunm_produto(precos_combustiveis=df)

df = creating_column_ano(precos_combustiveis=df)

df = rename_and_reordening(precos_combustiveis=df)

df = rename_columns(precos_combustiveis=df)

return df
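The cleaning steps that used to live inline in this task were moved into helpers in the dataset's utils.py, so only their call sites appear here. As one example of what such a helper plausibly contains, here is a sketch of orderning_data_coleta reconstructed from the inline code removed above; the real body lives in utils.py and may differ:

import pandas as pd


def orderning_data_coleta(precos_combustiveis: pd.DataFrame) -> pd.DataFrame:
    # Sketch based on the removed inline code: rename the collection-date column
    # and rebuild it as yyyy-mm-dd from the original dd/mm/yyyy string.
    df = precos_combustiveis.rename(columns={"Data da Coleta": "data_coleta"})
    df["data_coleta"] = (
        df["data_coleta"].str[6:10]
        + "-"
        + df["data_coleta"].str[3:5]
        + "-"
        + df["data_coleta"].str[0:2]
    )
    return df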


@task(