Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/basedosdados/pipelines in…
Browse files Browse the repository at this point in the history
…to staging/fix-cvm
  • Loading branch information
arthurfg committed Sep 19, 2023
2 parents f29ff75 + 98731ac commit 8fd1181
Show file tree
Hide file tree
Showing 12 changed files with 279 additions and 207 deletions.
4 changes: 4 additions & 0 deletions pipelines/datasets/br_anp_precos_combustiveis/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ class constants(Enum): # pylint: disable=c0103
"https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/arquivos/shpc/qus/ultimas-4-semanas-diesel-gnv.csv",
]

URLS_DATA = [
"https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/arquivos/shpc/qus/ultimas-4-semanas-glp.csv"
]

PATH_INPUT = "/tmp/input/"

PATH_OUTPUT = "/tmp/output/"
Expand Down
156 changes: 64 additions & 92 deletions pipelines/datasets/br_anp_precos_combustiveis/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
from pipelines.datasets.br_anp_precos_combustiveis.utils import download_files
from pipelines.datasets.br_anp_precos_combustiveis.tasks import (
tratamento,
download_and_transform,
data_max_bd_mais,
data_max_bd_pro,
make_partitions,
check_for_updates,
)
from pipelines.datasets.br_anp_precos_combustiveis.schedules import (
every_week_anp_microdados,
Expand All @@ -26,6 +28,11 @@
create_table_and_upload_to_gcs,
rename_current_flow_run_dataset_table,
get_current_flow_labels,
log_task,
)

from pipelines.datasets.br_anp_precos_combustiveis.constants import (
constants as anatel_constants,
)

with Flow(
Expand All @@ -37,7 +44,7 @@
table_id = Parameter("table_id", default="microdados", required=True)

materialization_mode = Parameter(
"materialization_mode", default="prod", required=False
"materialization_mode", default="dev", required=False
)

materialize_after_dump = Parameter(
Expand All @@ -51,103 +58,68 @@
)
update_metadata = Parameter("update_metadata", default=True, required=False)

df = tratamento(upstream_tasks=[rename_flow_run])
output_path = make_partitions(df=df, upstream_tasks=[df])
get_date_max_mais = data_max_bd_mais()
get_date_max_pro = data_max_bd_pro(df=df)

# pylint: disable=C0103
wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_path,
dataset_id=dataset_id,
table_id=table_id,
dump_mode="append",
wait=output_path,
)

# ! BD MAIS - Atrasado
with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": dataset_id,
"table_id": table_id,
"mode": materialization_mode,
"dbt_alias": dbt_alias,
},
labels=current_flow_labels,
run_name=f"Materialize {dataset_id}.{table_id}",
dados_desatualizados = check_for_updates(dataset_id=dataset_id, table_id=table_id)
log_task(f"Checando se os dados estão desatualizados: {dados_desatualizados}")
with case(dados_desatualizados, False):
log_task(
"Dados atualizados, não é necessário fazer o download",
upstream_tasks=[dados_desatualizados],
)

wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)
wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)
with case(dados_desatualizados, True):
df = download_and_transform(upstream_tasks=[rename_flow_run])
output_path = make_partitions(df=df, upstream_tasks=[df])
get_date_max_pro = data_max_bd_pro(df=df)

with case(update_metadata, True):
update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
bq_last_update=False,
api_mode="prod",
date_format="yy-mm-dd",
_last_date=get_date_max_mais,
upstream_tasks=[wait_upload_table],
)

# ! BD PRO - Atualizado
with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": dataset_id,
"table_id": table_id + "_atualizado",
"mode": materialization_mode,
"dbt_alias": dbt_alias,
},
labels=current_flow_labels,
run_name=f"Materialize {dataset_id}.{table_id}" "_atualizado",
# pylint: disable=C0103
wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_path,
dataset_id=dataset_id,
table_id=table_id,
dump_mode="append",
wait=output_path,
)

wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)
wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)
with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": dataset_id,
"table_id": table_id,
"mode": materialization_mode,
"dbt_alias": dbt_alias,
},
labels=current_flow_labels,
run_name=f"Materialize {dataset_id}.{table_id}",
)

with case(update_metadata, True):
update_django_metadata(
dataset_id,
table_id + "_atualizado",
metadata_type="DateTimeRange",
bq_last_update=False,
api_mode="prod",
date_format="yy-mm-dd",
_last_date=get_date_max_pro,
upstream_tasks=[wait_upload_table],
wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)
wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

with case(update_metadata, True):
update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
bq_last_update=False,
api_mode="prod",
date_format="yy-mm-dd",
_last_date=get_date_max_pro,
upstream_tasks=[wait_upload_table],
)

anp_microdados.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
anp_microdados.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
Expand Down
7 changes: 3 additions & 4 deletions pipelines/datasets/br_anp_precos_combustiveis/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
"""

from datetime import timedelta, datetime

from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock
from prefect.schedules.clocks import CronClock
from pipelines.constants import constants

every_week_anp_microdados = Schedule(
clocks=[
IntervalClock(
interval=timedelta(days=1),
CronClock(
cron="0 10 * * *",
start_date=datetime(2021, 1, 1),
labels=[constants.BASEDOSDADOS_PROD_AGENT_LABEL.value],
parameter_defaults={
Expand Down
139 changes: 62 additions & 77 deletions pipelines/datasets/br_anp_precos_combustiveis/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,99 +12,84 @@
get_id_municipio,
open_csvs,
partition_data,
merge_table_id_municipio,
orderning_data_coleta,
creating_column_ano,
rename_and_reordening,
rename_columns,
rename_and_to_create_endereco,
lower_colunm_produto,
)
from pipelines.datasets.br_anp_precos_combustiveis.constants import (
constants as anatel_constants,
)
from pipelines.utils.utils import log
from pipelines.utils.utils import log, extract_last_date
from pipelines.constants import constants


@task
def check_for_updates(dataset_id, table_id):
    """
    Check whether the ANP source has data newer than what is already stored.

    The GLP price file is downloaded and its most recent collection date is
    compared with the last date returned by ``extract_last_date`` for the
    given table.

    Args:
        dataset_id: Dataset identifier, forwarded to ``extract_last_date``.
        table_id: Table identifier, forwarded to ``extract_last_date``.

    Returns:
        bool: True if the source has a more recent collection date than the
        stored table, otherwise False.

    Raises:
        ValueError: If the downloaded file contains no parseable collection
        date.
    """
    # Fetch the latest file published by the source.
    download_files(anatel_constants.URLS_DATA.value, anatel_constants.PATH_INPUT.value)
    # NOTE(review): URL_GLP is presumably the local path of the file that was
    # just downloaded into PATH_INPUT - confirm against constants.py.
    df = pd.read_csv(anatel_constants.URL_GLP.value, sep=";", encoding="utf-8")

    # The column holds "dd/mm/YYYY" strings. Taking max() on the raw strings
    # compares them lexicographically ("31/08/2023" > "15/09/2023"), so the
    # values are parsed first to get the chronologically latest date.
    # Unparseable cells become NaT and are ignored by max().
    collection_dates = pd.to_datetime(
        df["Data da Coleta"], format="%d/%m/%Y", errors="coerce"
    )
    latest_collection = collection_dates.max()
    if pd.isna(latest_collection):
        raise ValueError("No valid date found in column 'Data da Coleta'")
    data_obj = latest_collection.date()

    # Last date already available in the stored table.
    data_bq_obj = extract_last_date(
        dataset_id, table_id, "yy-mm-dd", "basedosdados", data="data_coleta"
    )

    # Log both dates so the decision can be audited in the run logs.
    log(f"Última data no site do ANP: {data_obj}")
    log(f"Última data no site da BD: {data_bq_obj}")

    # Updates are available only when the source is strictly ahead.
    return data_obj > data_bq_obj


@task(
max_retries=constants.TASK_MAX_RETRIES.value,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def tratamento():
download_files(
anatel_constants.URLS.value,
anatel_constants.PATH_INPUT.value,
)
def download_and_transform():
download_files(anatel_constants.URLS.value, anatel_constants.PATH_INPUT.value)

precos_combustiveis = open_csvs(
url_diesel_gnv=anatel_constants.URL_DIESEL_GNV.value,
url_gasolina_etanol=anatel_constants.URL_GASOLINA_ETANOL.value,
url_glp=anatel_constants.URL_GLP.value,
anatel_constants.URL_DIESEL_GNV.value,
anatel_constants.URL_GASOLINA_ETANOL.value,
anatel_constants.URL_GLP.value,
)

id_municipio = get_id_municipio()
log("Iniciando tratamento dos dados precos_combustiveis")
precos_combustiveis = pd.merge(
id_municipio,
precos_combustiveis,
how="right",
left_on=["nome", "sigla_uf"],
right_on=["Municipio", "Estado - Sigla"],
)
log("----" * 150)
log("Dados mergeados")
precos_combustiveis.rename(columns={"Municipio": "nome"}, inplace=True)
precos_combustiveis.dropna(subset=["Valor de Venda"], inplace=True)
precos_combustiveis["endereco_revenda"] = (
precos_combustiveis["Nome da Rua"].fillna("")
+ ","
+ " "
+ precos_combustiveis["Numero Rua"].fillna("")
+ ","
+ " "
+ precos_combustiveis["Complemento"].fillna("")
)
precos_combustiveis.drop(columns=["sigla_uf"], inplace=True)
precos_combustiveis.rename(columns={"Data da Coleta": "data_coleta"}, inplace=True)
precos_combustiveis["data_coleta"] = (
precos_combustiveis["data_coleta"].str[6:10]
+ "-"
+ precos_combustiveis["data_coleta"].str[3:5]
+ "-"
+ precos_combustiveis["data_coleta"].str[0:2]
)
precos_combustiveis["Produto"] = precos_combustiveis["Produto"].str.lower()
precos_combustiveis["ano"] = precos_combustiveis["data_coleta"].str[0:4]
precos_combustiveis["ano"].replace("nan", "", inplace=True)
precos_combustiveis.rename(columns=anatel_constants.RENAME.value, inplace=True)
precos_combustiveis = precos_combustiveis[anatel_constants.ORDEM.value]
precos_combustiveis["ano"] = precos_combustiveis["ano"].apply(
lambda x: str(x).replace(".0", "")
)
precos_combustiveis["cep_revenda"] = precos_combustiveis["cep_revenda"].apply(
lambda x: str(x).replace("-", "")
)
precos_combustiveis["unidade_medida"] = precos_combustiveis["unidade_medida"].map(
{"R$ / litro": "R$/litro", "R$ / m³": "R$/m3", "R$ / 13 kg": "R$/13kg"}
)
precos_combustiveis["nome_estabelecimento"] = precos_combustiveis[
"nome_estabelecimento"
].apply(lambda x: str(x).replace(",", ""))
precos_combustiveis["preco_compra"] = precos_combustiveis["preco_compra"].apply(
lambda x: str(x).replace(",", ".")
)
precos_combustiveis["preco_venda"] = precos_combustiveis["preco_venda"].apply(
lambda x: str(x).replace(",", ".")
)
precos_combustiveis["preco_venda"] = precos_combustiveis["preco_venda"].replace(
"nan", ""
)
precos_combustiveis["preco_compra"] = precos_combustiveis["preco_compra"].replace(
"nan", ""
df = get_id_municipio(id_municipio=precos_combustiveis)

df = merge_table_id_municipio(
id_municipio=df, pd_precos_combustiveis=precos_combustiveis
)
precos_combustiveis.replace(np.nan, "", inplace=True)
log("----" * 150)
log("Dados tratados com sucesso")
log("----" * 150)
log("Iniciando particionamento dos dados")
log("----" * 150)
log(precos_combustiveis["data_coleta"].unique())

return precos_combustiveis

df = rename_and_to_create_endereco(precos_combustiveis=df)

df = orderning_data_coleta(precos_combustiveis=df)

df = lower_colunm_produto(precos_combustiveis=df)

df = creating_column_ano(precos_combustiveis=df)

df = rename_and_reordening(precos_combustiveis=df)

df = rename_columns(precos_combustiveis=df)

return df


@task(
Expand Down
Loading

0 comments on commit 8fd1181

Please sign in to comment.