diff --git a/pipelines/datasets/br_anp_precos_combustiveis/constants.py b/pipelines/datasets/br_anp_precos_combustiveis/constants.py
index b9299317b..d35088bfd 100644
--- a/pipelines/datasets/br_anp_precos_combustiveis/constants.py
+++ b/pipelines/datasets/br_anp_precos_combustiveis/constants.py
@@ -48,6 +48,10 @@ class constants(Enum):  # pylint: disable=c0103
         "https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/arquivos/shpc/qus/ultimas-4-semanas-diesel-gnv.csv",
     ]
 
+    URLS_DATA = [
+        "https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/arquivos/shpc/qus/ultimas-4-semanas-glp.csv"
+    ]
+
     PATH_INPUT = "/tmp/input/"
 
     PATH_OUTPUT = "/tmp/output/"
diff --git a/pipelines/datasets/br_anp_precos_combustiveis/flows.py b/pipelines/datasets/br_anp_precos_combustiveis/flows.py
index ce81cd827..ec824870b 100644
--- a/pipelines/datasets/br_anp_precos_combustiveis/flows.py
+++ b/pipelines/datasets/br_anp_precos_combustiveis/flows.py
@@ -13,11 +13,13 @@
 from pipelines.utils.constants import constants as utils_constants
 from pipelines.utils.decorators import Flow
 from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
 from pipelines.datasets.br_anp_precos_combustiveis.tasks import (
-    tratamento,
-    data_max_bd_mais,
+    download_and_transform,
     data_max_bd_pro,
     make_partitions,
+    check_for_updates,
 )
 from pipelines.datasets.br_anp_precos_combustiveis.schedules import (
     every_week_anp_microdados,
@@ -26,6 +28,7 @@
     create_table_and_upload_to_gcs,
     rename_current_flow_run_dataset_table,
     get_current_flow_labels,
+    log_task,
 )
 
 with Flow(
@@ -37,7 +44,7 @@
     table_id = Parameter("table_id", default="microdados", required=True)
 
     materialization_mode = Parameter(
-        "materialization_mode", default="prod", required=False
+        "materialization_mode", default="dev", required=False
     )
 
     materialize_after_dump = Parameter(
@@ -51,103 +58,68 @@
     )
     update_metadata = Parameter("update_metadata", default=True, required=False)
 
-    df = tratamento(upstream_tasks=[rename_flow_run])
-    output_path = make_partitions(df=df, upstream_tasks=[df])
-    get_date_max_mais = data_max_bd_mais()
-    get_date_max_pro = data_max_bd_pro(df=df)
-
-    # pylint: disable=C0103
-    wait_upload_table = create_table_and_upload_to_gcs(
-        data_path=output_path,
-        dataset_id=dataset_id,
-        table_id=table_id,
-        dump_mode="append",
-        wait=output_path,
-    )
-
-    # ! BD MAIS - Atrasado
-    with case(materialize_after_dump, True):
-        # Trigger DBT flow run
-        current_flow_labels = get_current_flow_labels()
-        materialization_flow = create_flow_run(
-            flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
-            project_name=constants.PREFECT_DEFAULT_PROJECT.value,
-            parameters={
-                "dataset_id": dataset_id,
-                "table_id": table_id,
-                "mode": materialization_mode,
-                "dbt_alias": dbt_alias,
-            },
-            labels=current_flow_labels,
-            run_name=f"Materialize {dataset_id}.{table_id}",
-        )
-        wait_for_materialization = wait_for_flow_run(
-            materialization_flow,
-            stream_states=True,
-            stream_logs=True,
-            raise_final_state=True,
-        )
-        wait_for_materialization.max_retries = (
-            dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
-        )
-        wait_for_materialization.retry_delay = timedelta(
-            seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
-        )
-
-    with case(update_metadata, True):
-        update_django_metadata(
-            dataset_id,
-            table_id,
-            metadata_type="DateTimeRange",
-            bq_last_update=False,
-            api_mode="prod",
-            date_format="yy-mm-dd",
-            _last_date=get_date_max_mais,
-            upstream_tasks=[wait_upload_table],
-        )
-
-    # ! BD PRO - Atualizado
-    with case(materialize_after_dump, True):
-        # Trigger DBT flow run
-        current_flow_labels = get_current_flow_labels()
-        materialization_flow = create_flow_run(
-            flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
-            project_name=constants.PREFECT_DEFAULT_PROJECT.value,
-            parameters={
-                "dataset_id": dataset_id,
-                "table_id": table_id + "_atualizado",
-                "mode": materialization_mode,
-                "dbt_alias": dbt_alias,
-            },
-            labels=current_flow_labels,
-            run_name=f"Materialize {dataset_id}.{table_id}" "_atualizado",
-        )
-        wait_for_materialization = wait_for_flow_run(
-            materialization_flow,
-            stream_states=True,
-            stream_logs=True,
-            raise_final_state=True,
-        )
-        wait_for_materialization.max_retries = (
-            dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
-        )
-        wait_for_materialization.retry_delay = timedelta(
-            seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
-        )
-
-    with case(update_metadata, True):
-        update_django_metadata(
-            dataset_id,
-            table_id + "_atualizado",
-            metadata_type="DateTimeRange",
-            bq_last_update=False,
-            api_mode="prod",
-            date_format="yy-mm-dd",
-            _last_date=get_date_max_pro,
-            upstream_tasks=[wait_upload_table],
-        )
+    dados_desatualizados = check_for_updates(dataset_id=dataset_id, table_id=table_id)
+    log_task(f"Checando se os dados estão desatualizados: {dados_desatualizados}")
+
+    with case(dados_desatualizados, False):
+        log_task(
+            "Dados atualizados, não é necessário fazer o download",
+            upstream_tasks=[dados_desatualizados],
+        )
+
+    with case(dados_desatualizados, True):
+        df = download_and_transform(upstream_tasks=[rename_flow_run])
+        output_path = make_partitions(df=df, upstream_tasks=[df])
+        get_date_max_pro = data_max_bd_pro(df=df)
+
+        # pylint: disable=C0103
+        wait_upload_table = create_table_and_upload_to_gcs(
+            data_path=output_path,
+            dataset_id=dataset_id,
+            table_id=table_id,
+            dump_mode="append",
+            wait=output_path,
+        )
+
+        with case(materialize_after_dump, True):
+            # Trigger DBT flow run
+            current_flow_labels = get_current_flow_labels()
+            materialization_flow = create_flow_run(
+                flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
+                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
+                parameters={
+                    "dataset_id": dataset_id,
+                    "table_id": table_id,
+                    "mode": materialization_mode,
+                    "dbt_alias": dbt_alias,
+                },
+                labels=current_flow_labels,
+                run_name=f"Materialize {dataset_id}.{table_id}",
+            )
+            wait_for_materialization = wait_for_flow_run(
+                materialization_flow,
+                stream_states=True,
+                stream_logs=True,
+                raise_final_state=True,
+            )
+            wait_for_materialization.max_retries = (
+                dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
+            )
+            wait_for_materialization.retry_delay = timedelta(
+                seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
+            )
+
+            with case(update_metadata, True):
+                update_django_metadata(
+                    dataset_id,
+                    table_id,
+                    metadata_type="DateTimeRange",
+                    bq_last_update=False,
+                    api_mode="prod",
+                    date_format="yy-mm-dd",
+                    _last_date=get_date_max_pro,
+                    upstream_tasks=[wait_upload_table],
+                )
 
 anp_microdados.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 anp_microdados.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
diff --git a/pipelines/datasets/br_anp_precos_combustiveis/schedules.py b/pipelines/datasets/br_anp_precos_combustiveis/schedules.py
index a1c3751ee..9c660ab04 100644
--- a/pipelines/datasets/br_anp_precos_combustiveis/schedules.py
+++ b/pipelines/datasets/br_anp_precos_combustiveis/schedules.py
@@ -4,15 +4,14 @@
 """
 
 from datetime import timedelta, datetime
-
 from prefect.schedules import Schedule
-from prefect.schedules.clocks import IntervalClock
+from prefect.schedules.clocks import CronClock
 from pipelines.constants import constants
 
 every_week_anp_microdados = Schedule(
     clocks=[
-        IntervalClock(
-            interval=timedelta(days=1),
+        CronClock(
+            cron="0 10 * * *",
             start_date=datetime(2021, 1, 1),
             labels=[constants.BASEDOSDADOS_PROD_AGENT_LABEL.value],
             parameter_defaults={
diff --git a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py
index b8a226343..48c911278 100644
--- a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py
+++ b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py
@@ -12,99 +12,84 @@
     get_id_municipio,
     open_csvs,
     partition_data,
+    merge_table_id_municipio,
+    ordering_data_coleta,
+    creating_column_ano,
+    rename_and_reordering,
+    rename_columns,
+    rename_and_to_create_endereco,
+    lower_column_produto,
 )
 from pipelines.datasets.br_anp_precos_combustiveis.constants import (
     constants as anatel_constants,
 )
-from pipelines.utils.utils import log
+from pipelines.utils.utils import log, extract_last_date
 from pipelines.constants import constants
 
 
+@task
+def check_for_updates(dataset_id, table_id):
+    """
+    Checks if there are available updates for a specific dataset and table.
+
+    Returns:
+        bool: True if updates are available, False otherwise.
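+
+    Note:
+        Compares the newest "Data da Coleta" in the downloaded ANP GLP CSV
+        with the latest `data_coleta` already loaded into BD.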
+    """
+    # Most recent collection date published on the ANP site
+    download_files(anatel_constants.URLS_DATA.value, anatel_constants.PATH_INPUT.value)
+    df = pd.read_csv(anatel_constants.URL_GLP.value, sep=";", encoding="utf-8")
+    # Parse before taking max(): max() over "dd/mm/YYYY" strings is lexicographic
+    data_obj = pd.to_datetime(df["Data da Coleta"], format="%d/%m/%Y").max().date()
+
+    # Most recent date already loaded into BD
+    data_bq_obj = extract_last_date(
+        dataset_id, table_id, "yy-mm-dd", "basedosdados", data="data_coleta"
+    )
+
+    log(f"Última data no site da ANP: {data_obj}")
+    log(f"Última data na BD: {data_bq_obj}")
+
+    # True when the ANP site has data newer than BD
+    return data_obj > data_bq_obj
+
+
 @task(
     max_retries=constants.TASK_MAX_RETRIES.value,
     retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
 )
-def tratamento():
-    download_files(
-        anatel_constants.URLS.value,
-        anatel_constants.PATH_INPUT.value,
-    )
+def download_and_transform():
+    download_files(anatel_constants.URLS.value, anatel_constants.PATH_INPUT.value)
     precos_combustiveis = open_csvs(
-        url_diesel_gnv=anatel_constants.URL_DIESEL_GNV.value,
-        url_gasolina_etanol=anatel_constants.URL_GASOLINA_ETANOL.value,
-        url_glp=anatel_constants.URL_GLP.value,
+        anatel_constants.URL_DIESEL_GNV.value,
+        anatel_constants.URL_GASOLINA_ETANOL.value,
+        anatel_constants.URL_GLP.value,
     )
-    id_municipio = get_id_municipio()
-    log("Iniciando tratamento dos dados precos_combustiveis")
-    precos_combustiveis = pd.merge(
-        id_municipio,
-        precos_combustiveis,
-        how="right",
-        left_on=["nome", "sigla_uf"],
-        right_on=["Municipio", "Estado - Sigla"],
-    )
-    log("----" * 150)
-    log("Dados mergeados")
-    precos_combustiveis.rename(columns={"Municipio": "nome"}, inplace=True)
-    precos_combustiveis.dropna(subset=["Valor de Venda"], inplace=True)
-    precos_combustiveis["endereco_revenda"] = (
-        precos_combustiveis["Nome da Rua"].fillna("")
-        + ","
-        + " "
-        + precos_combustiveis["Numero Rua"].fillna("")
-        + ","
-        + " "
-        + precos_combustiveis["Complemento"].fillna("")
-    )
-    precos_combustiveis.drop(columns=["sigla_uf"], inplace=True)
-    precos_combustiveis.rename(columns={"Data da Coleta": "data_coleta"}, inplace=True)
-    precos_combustiveis["data_coleta"] = (
-        precos_combustiveis["data_coleta"].str[6:10]
-        + "-"
-        + precos_combustiveis["data_coleta"].str[3:5]
-        + "-"
-        + precos_combustiveis["data_coleta"].str[0:2]
-    )
-    precos_combustiveis["Produto"] = precos_combustiveis["Produto"].str.lower()
-    precos_combustiveis["ano"] = precos_combustiveis["data_coleta"].str[0:4]
-    precos_combustiveis["ano"].replace("nan", "", inplace=True)
-    precos_combustiveis.rename(columns=anatel_constants.RENAME.value, inplace=True)
-    precos_combustiveis = precos_combustiveis[anatel_constants.ORDEM.value]
-    precos_combustiveis["ano"] = precos_combustiveis["ano"].apply(
-        lambda x: str(x).replace(".0", "")
-    )
-    precos_combustiveis["cep_revenda"] = precos_combustiveis["cep_revenda"].apply(
-        lambda x: str(x).replace("-", "")
-    )
-    precos_combustiveis["unidade_medida"] = precos_combustiveis["unidade_medida"].map(
-        {"R$ / litro": "R$/litro", "R$ / m³": "R$/m3", "R$ / 13 kg": "R$/13kg"}
-    )
-    precos_combustiveis["nome_estabelecimento"] = precos_combustiveis[
-        "nome_estabelecimento"
-    ].apply(lambda x: str(x).replace(",", ""))
-    precos_combustiveis["preco_compra"] = precos_combustiveis["preco_compra"].apply(
-        lambda x: str(x).replace(",", ".")
-    )
-    precos_combustiveis["preco_venda"] = precos_combustiveis["preco_venda"].apply(
-        lambda x: str(x).replace(",", ".")
-    )
-    precos_combustiveis["preco_venda"] = precos_combustiveis["preco_venda"].replace(
-        "nan", ""
-    )
-    precos_combustiveis["preco_compra"] = precos_combustiveis["preco_compra"].replace(
-        "nan", ""
-    )
-    precos_combustiveis.replace(np.nan, "", inplace=True)
-    log("----" * 150)
-    log("Dados tratados com sucesso")
-    log("----" * 150)
-    log("Iniciando particionamento dos dados")
-    log("----" * 150)
-    log(precos_combustiveis["data_coleta"].unique())
-
-    return precos_combustiveis
+
+    df = get_id_municipio(id_municipio=precos_combustiveis)
+
+    df = merge_table_id_municipio(
+        id_municipio=df, pd_precos_combustiveis=precos_combustiveis
+    )
+
+    df = rename_and_to_create_endereco(precos_combustiveis=df)
+
+    df = ordering_data_coleta(precos_combustiveis=df)
+
+    df = lower_column_produto(precos_combustiveis=df)
+
+    df = creating_column_ano(precos_combustiveis=df)
+
+    df = rename_and_reordering(precos_combustiveis=df)
+
+    df = rename_columns(precos_combustiveis=df)
+
+    return df
 
 
 @task(
diff --git a/pipelines/datasets/br_anp_precos_combustiveis/utils.py b/pipelines/datasets/br_anp_precos_combustiveis/utils.py
index 8f8bc3d3b..38cf5eef1 100644
--- a/pipelines/datasets/br_anp_precos_combustiveis/utils.py
+++ b/pipelines/datasets/br_anp_precos_combustiveis/utils.py
@@ -9,6 +9,10 @@
 import requests
 from pipelines.utils.utils import log
 from datetime import datetime
+import numpy as np
+from pipelines.datasets.br_anp_precos_combustiveis.constants import (
+    constants as anatel_constants,
+)
 
 
 def download_files(urls, path):
@@ -28,10 +32,8 @@
     for url in urls:
         response = requests.get(url)
         if response.status_code == 200:
-            file_name = url.split("/")[-1]  # Get the filename from the URL
+            file_name = url.split("/")[-1]
             file_path = os.path.join(path, file_name)
-            # log("----" * 150)
-            # log("if response.status_code == 200: SUCESSO")
 
             with open(file_path, "wb") as file:
                 file.write(response.content)
@@ -46,7 +48,7 @@
     return downloaded_files
 
 
-def get_id_municipio():
+def get_id_municipio(id_municipio: pd.DataFrame):
     # ! Carregando os dados direto do Diretório de municipio da BD
     # Para carregar o dado direto no pandas
     log("Carregando dados do diretório de municípios da BD")
@@ -80,21 +82,18 @@ def open_csvs(url_diesel_gnv, url_gasolina_etanol, url_glp):
     log("Abrindo os arquivos csvs")
     diesel = pd.read_csv(f"{url_diesel_gnv}", sep=";", encoding="utf-8")
     log("----" * 150)
-    # log("Abrindo os arquivos csvs diesel")
     log("----" * 150)
     log(diesel["Data da Coleta"].unique())
     gasolina = pd.read_csv(f"{url_gasolina_etanol}", sep=";", encoding="utf-8")
     log("----" * 150)
     log("Abrindo os arquivos csvs gasolina")
     log("----" * 150)
-    # log(gasolina["Data da Coleta"].unique())
     glp = pd.read_csv(f"{url_glp}", sep=";", encoding="utf-8")
     log("Abrindo os arquivos csvs glp")
     log("----" * 150)
-    # log(glp["Data da Coleta"].unique())
     data_frames.extend([diesel, gasolina, glp])
     precos_combustiveis = pd.concat(data_frames, ignore_index=True)
-    # log(precos_combustiveis["Data da Coleta"].unique())
     log("----" * 150)
     log("Dados concatenados com sucesso")
     log("----" * 150)
@@ -115,21 +114,111 @@ def partition_data(df: pd.DataFrame, column_name: list[str], output_directory: str):
     for value in unique_values:
         value_str = str(value)[:10]
         date_value = datetime.strptime(value_str, "%Y-%m-%d").date()
-        # log(date_value)
         formatted_value = date_value.strftime("%Y-%m-%d")
-        # log(formatted_value)
         partition_path = os.path.join(
             output_directory, f"{column_name}={formatted_value}"
         )
-        # log(f"Salvando dados em {partition_path}")
         if not os.path.exists(partition_path):
             os.makedirs(partition_path)
         df_partition = df[df[column_name] == value].copy()
         df_partition.drop([column_name], axis=1, inplace=True)
-        # log(f"df_partition: {df_partition}")
         csv_path = os.path.join(partition_path, "data.csv")
         df_partition.to_csv(csv_path, index=False, encoding="utf-8", na_rep="")
         log(f"Arquivo {csv_path} salvo com sucesso!")
+
+
+def merge_table_id_municipio(
+    id_municipio: pd.DataFrame, pd_precos_combustiveis: pd.DataFrame
+):
+    """Right-join the ANP price data with the BD municipality directory."""
+    log("Iniciando tratamento dos dados precos_combustiveis")
+    precos_combustiveis = pd.merge(
+        id_municipio,
+        pd_precos_combustiveis,
+        how="right",
+        left_on=["nome", "sigla_uf"],
+        right_on=["Municipio", "Estado - Sigla"],
+    )
+    log("----" * 150)
+    log("Dados mergeados")
+
+    return precos_combustiveis
+
+
+def rename_and_to_create_endereco(precos_combustiveis: pd.DataFrame):
+    """Build `endereco_revenda` from the street fields and rename `Data da Coleta`."""
+    precos_combustiveis["endereco_revenda"] = (
+        precos_combustiveis["Nome da Rua"].fillna("")
+        + ","
+        + " "
+        + precos_combustiveis["Numero Rua"].fillna("")
+        + ","
+        + " "
+        + precos_combustiveis["Complemento"].fillna("")
+    )
+    precos_combustiveis.drop(columns=["sigla_uf"], inplace=True)
+    precos_combustiveis.rename(columns={"Data da Coleta": "data_coleta"}, inplace=True)
+
+    return precos_combustiveis
+
+
+def ordering_data_coleta(precos_combustiveis: pd.DataFrame):
+    """Reformat `data_coleta` from dd/mm/YYYY to YYYY-mm-dd."""
+    precos_combustiveis["data_coleta"] = (
+        precos_combustiveis["data_coleta"].str[6:10]
+        + "-"
+        + precos_combustiveis["data_coleta"].str[3:5]
+        + "-"
+        + precos_combustiveis["data_coleta"].str[0:2]
+    )
+
+    return precos_combustiveis
+
+
+def lower_column_produto(precos_combustiveis: pd.DataFrame):
+    """Lowercase the `Produto` column."""
+    precos_combustiveis["Produto"] = precos_combustiveis["Produto"].str.lower()
+    return precos_combustiveis
+
+
+def creating_column_ano(precos_combustiveis: pd.DataFrame):
+    """Derive the `ano` column from `data_coleta`."""
+    precos_combustiveis["ano"] = precos_combustiveis["data_coleta"].str[0:4]
+    precos_combustiveis["ano"].replace("nan", "", inplace=True)
+
+    return precos_combustiveis
+
+
+def rename_and_reordering(precos_combustiveis: pd.DataFrame):
+    """Apply the RENAME mapping and select columns in the ORDEM order."""
+    precos_combustiveis.rename(columns=anatel_constants.RENAME.value, inplace=True)
+    precos_combustiveis = precos_combustiveis[anatel_constants.ORDEM.value]
+
+    return precos_combustiveis
+
+
+def rename_columns(precos_combustiveis: pd.DataFrame):
+    """Standardize value formats (ano, CEP, units, establishment names, prices)."""
+    precos_combustiveis["ano"] = precos_combustiveis["ano"].apply(
+        lambda x: str(x).replace(".0", "")
+    )
+    precos_combustiveis["cep_revenda"] = precos_combustiveis["cep_revenda"].apply(
+        lambda x: str(x).replace("-", "")
+    )
+    precos_combustiveis["unidade_medida"] = precos_combustiveis["unidade_medida"].map(
+        {"R$ / litro": "R$/litro", "R$ / m³": "R$/m3", "R$ / 13 kg": "R$/13kg"}
+    )
+    precos_combustiveis["nome_estabelecimento"] = precos_combustiveis[
+        "nome_estabelecimento"
+    ].apply(lambda x: str(x).replace(",", ""))
+    precos_combustiveis["preco_compra"] = precos_combustiveis["preco_compra"].apply(
+        lambda x: str(x).replace(",", ".")
+    )
+    precos_combustiveis["preco_venda"] = precos_combustiveis["preco_venda"].apply(
+        lambda x: str(x).replace(",", ".")
+    )
+    precos_combustiveis["preco_venda"] = precos_combustiveis["preco_venda"].replace(
+        "nan", ""
+    )
+    precos_combustiveis["preco_compra"] = precos_combustiveis["preco_compra"].replace(
+        "nan", ""
+    )
+    precos_combustiveis.replace(np.nan, "", inplace=True)
+
+    return precos_combustiveis
diff --git a/pipelines/utils/utils.py b/pipelines/utils/utils.py
index 279dc8660..57dc2758e 100644
--- a/pipelines/utils/utils.py
+++ b/pipelines/utils/utils.py
@@ -725,7 +725,9 @@
     raise
 
 
-def extract_last_date(dataset_id, table_id, date_format: str, billing_project_id: str):
+def extract_last_date(
+    dataset_id, table_id, date_format: str, billing_project_id: str, data: str = "data"
+):
     """
     Extracts the last update date of a given dataset table.
+    `data` names the date column used in the MAX() query (default: "data").
 
@@ -772,7 +774,7 @@
     try:
         query_bd = f"""
             SELECT
-                MAX(data) as max_date
+                MAX({data}) as max_date
             FROM
                 `basedosdados.{dataset_id}.{table_id}`
             """
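Review note: the whole update check above boils down to "is the newest `Data da Coleta` in the ANP CSV newer than the last `data_coleta` already in BD?". A minimal, Prefect-free sketch of that comparison follows; the `is_outdated` helper and its `csv_path` argument are illustrative only (not part of this diff), and it assumes `extract_last_date` yields a `datetime.date`:

```python
from datetime import date

import pandas as pd


def is_outdated(csv_path: str, last_bd_date: date) -> bool:
    """Mirror of check_for_updates: True when the ANP CSV has newer data."""
    df = pd.read_csv(csv_path, sep=";", encoding="utf-8")
    # Parse with an explicit format before max(); taking max() of raw
    # "dd/mm/YYYY" strings would compare lexicographically and pick wrong dates.
    newest = pd.to_datetime(df["Data da Coleta"], format="%d/%m/%Y").max().date()
    return newest > last_bd_date
```

Parsing before the maximum is the same reason `check_for_updates` converts the column with `pd.to_datetime` instead of calling `max()` on the raw strings.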