From 0fa7474cd04210bdfd95f8987e5b8c7527aaf5db Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Fri, 15 Sep 2023 10:33:13 -0300 Subject: [PATCH 01/20] =?UTF-8?q?define=20execu=C3=A7=C3=A3o=20sequencial?= =?UTF-8?q?=20das=20schedules?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pipelines/datasets/br_ms_cnes/schedules.py | 26 +++++++++++----------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/pipelines/datasets/br_ms_cnes/schedules.py b/pipelines/datasets/br_ms_cnes/schedules.py index 02f90701e..0f96ad724 100644 --- a/pipelines/datasets/br_ms_cnes/schedules.py +++ b/pipelines/datasets/br_ms_cnes/schedules.py @@ -12,7 +12,7 @@ schedule_br_ms_cnes_estabelecimento = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="0 9 * * *", # every day at 9:00 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -31,7 +31,7 @@ adjustments=[adjustments.next_weekday], ) - +# todo selecionar outro horário schedule_br_ms_cnes_profissional = Schedule( clocks=[ CronClock( @@ -58,7 +58,7 @@ schedule_br_ms_cnes_equipe = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="30 9 * * *", # every day 9:30 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -80,7 +80,7 @@ schedule_br_ms_cnes_leito = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="0 10 * * *", # every day at 10:00 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -102,7 +102,7 @@ schedule_br_ms_cnes_equipamento = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="30 10 * * *", # every day at 10:30 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -124,7 +124,7 @@ schedule_br_ms_cnes_estabelecimento_ensino = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="45 10 * * *", # every day at 10:45 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -146,7 +146,7 @@ schedule_br_ms_cnes_dados_complementares = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="0 11 * * *", # every day 11:00 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -168,7 +168,7 @@ schedule_br_ms_cnes_estabelecimento_filantropico = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="15 11 * * *", # every day at 11:15 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -189,7 +189,7 @@ schedule_br_ms_cnes_gestao_metas = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="30 11 * * *", # every day at 11:30 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -212,7 +212,7 @@ schedule_br_ms_cnes_habilitacao = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="45 11 * * *", # every day at 11:45 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -234,7 +234,7 @@ schedule_br_ms_cnes_incentivos = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="45 11 * * *", # every day at 11:45 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -256,7 +256,7 @@ 
schedule_br_ms_cnes_regra_contratual = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="0 12 * * *", # every day at 12:00 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -279,7 +279,7 @@ schedule_br_ms_cnes_servico_especializado = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="30 12 * * *", # every day at 12:30 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, From 5187b5f7684d030240b9df18c126453597d0dd6b Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Fri, 15 Sep 2023 10:57:49 -0300 Subject: [PATCH 02/20] debuga erro na task update_django_metadata --- pipelines/datasets/br_ms_cnes/flows.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/pipelines/datasets/br_ms_cnes/flows.py b/pipelines/datasets/br_ms_cnes/flows.py index d76b9dd82..2e474ddfa 100644 --- a/pipelines/datasets/br_ms_cnes/flows.py +++ b/pipelines/datasets/br_ms_cnes/flows.py @@ -398,6 +398,23 @@ "Os dados do FTP CNES-ST ainda não foram atualizados para o ano/mes mais recente" ) + with case(update_metadata, True): + update = update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_last_update=False, + bq_table_last_year_month=True, + api_mode="prod", + billing_project_id="basedosdados", + date_format="yy-mm", + is_bd_pro=True, + is_free=True, + time_delta=6, + time_unit="months", + upstream_tasks=[files_path], + ) + with case(is_empty(files_path), False): dbc_files = access_ftp_donwload_files( file_list=files_path, From fe6d1d50b95a62135c382c4897a859fc471b73b3 Mon Sep 17 00:00:00 2001 From: tricktx Date: Fri, 15 Sep 2023 11:03:17 -0300 Subject: [PATCH 03/20] check for updates and organizing --- .../br_anp_precos_combustiveis/constants.py | 4 + .../br_anp_precos_combustiveis/flows.py | 157 ++++++++---------- .../br_anp_precos_combustiveis/schedules.py | 8 +- .../br_anp_precos_combustiveis/tasks.py | 138 +++++++-------- .../br_anp_precos_combustiveis/utils.py | 111 +++++++++++-- pipelines/utils/utils.py | 6 +- 6 files changed, 238 insertions(+), 186 deletions(-) diff --git a/pipelines/datasets/br_anp_precos_combustiveis/constants.py b/pipelines/datasets/br_anp_precos_combustiveis/constants.py index b9299317b..d35088bfd 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/constants.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/constants.py @@ -48,6 +48,10 @@ class constants(Enum): # pylint: disable=c0103 "https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/arquivos/shpc/qus/ultimas-4-semanas-diesel-gnv.csv", ] + URLS_DATA = [ + "https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/arquivos/shpc/qus/ultimas-4-semanas-glp.csv" + ] + PATH_INPUT = "/tmp/input/" PATH_OUTPUT = "/tmp/output/" diff --git a/pipelines/datasets/br_anp_precos_combustiveis/flows.py b/pipelines/datasets/br_anp_precos_combustiveis/flows.py index ce81cd827..eb156573d 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/flows.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/flows.py @@ -13,11 +13,13 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.datasets.br_anp_precos_combustiveis.utils import download_files from pipelines.datasets.br_anp_precos_combustiveis.tasks import ( - tratamento, + download_and_transform, data_max_bd_mais, 
data_max_bd_pro, make_partitions, + check_for_updates, ) from pipelines.datasets.br_anp_precos_combustiveis.schedules import ( every_week_anp_microdados, @@ -26,6 +28,11 @@ create_table_and_upload_to_gcs, rename_current_flow_run_dataset_table, get_current_flow_labels, + log_task, +) + +from pipelines.datasets.br_anp_precos_combustiveis.constants import ( + constants as anatel_constants, ) with Flow( @@ -37,7 +44,7 @@ table_id = Parameter("table_id", default="microdados", required=True) materialization_mode = Parameter( - "materialization_mode", default="prod", required=False + "materialization_mode", default="dev", required=False ) materialize_after_dump = Parameter( @@ -51,103 +58,69 @@ ) update_metadata = Parameter("update_metadata", default=True, required=False) - df = tratamento(upstream_tasks=[rename_flow_run]) - output_path = make_partitions(df=df, upstream_tasks=[df]) - get_date_max_mais = data_max_bd_mais() - get_date_max_pro = data_max_bd_pro(df=df) - - # pylint: disable=C0103 - wait_upload_table = create_table_and_upload_to_gcs( - data_path=output_path, - dataset_id=dataset_id, - table_id=table_id, - dump_mode="append", - wait=output_path, - ) - - # ! BD MAIS - Atrasado - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id, - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id}", + dados_desatualizados = check_for_updates(dataset_id=dataset_id, table_id=table_id) + log_task(f"Checando se os dados estão desatualizados: {dados_desatualizados}") + with case(dados_desatualizados, False): + log_task( + "Dados atualizados, não é necessário fazer o download", + upstream_tasks=[dados_desatualizados], ) - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) + with case(dados_desatualizados, True): + df = download_and_transform(upstream_tasks=[rename_flow_run]) + output_path = make_partitions(df=df, upstream_tasks=[df]) + get_date_max_mais = data_max_bd_mais() + get_date_max_pro = data_max_bd_pro(df=df) - with case(update_metadata, True): - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_last_update=False, - api_mode="prod", - date_format="yy-mm-dd", - _last_date=get_date_max_mais, - upstream_tasks=[wait_upload_table], - ) - - # ! 
BD PRO - Atualizado - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id}" "_atualizado", + # pylint: disable=C0103 + wait_upload_table = create_table_and_upload_to_gcs( + data_path=output_path, + dataset_id=dataset_id, + table_id=table_id, + dump_mode="append", + wait=output_path, ) - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) + with case(materialize_after_dump, True): + # Trigger DBT flow run + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": dataset_id, + "table_id": table_id, + "mode": materialization_mode, + "dbt_alias": dbt_alias, + }, + labels=current_flow_labels, + run_name=f"Materialize {dataset_id}.{table_id}", + ) - with case(update_metadata, True): - update_django_metadata( - dataset_id, - table_id + "_atualizado", - metadata_type="DateTimeRange", - bq_last_update=False, - api_mode="prod", - date_format="yy-mm-dd", - _last_date=get_date_max_pro, - upstream_tasks=[wait_upload_table], + wait_for_materialization = wait_for_flow_run( + materialization_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value ) + wait_for_materialization.retry_delay = timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + + with case(update_metadata, True): + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_last_update=False, + api_mode="prod", + date_format="yy-mm-dd", + _last_date=get_date_max_mais, + upstream_tasks=[wait_upload_table], + ) anp_microdados.storage = GCS(constants.GCS_FLOWS_BUCKET.value) anp_microdados.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) diff --git a/pipelines/datasets/br_anp_precos_combustiveis/schedules.py b/pipelines/datasets/br_anp_precos_combustiveis/schedules.py index 76c702d98..46ee4ecb2 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/schedules.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/schedules.py @@ -3,13 +3,7 @@ Schedules for br_anp_precos_combustiveis """ -# -*- coding: utf-8 -*- -""" -Schedules for br_anp_precos_combustiveis -""" - from datetime import timedelta, datetime - from prefect.schedules import Schedule from prefect.schedules.clocks import IntervalClock from pipelines.constants import constants @@ -24,7 +18,7 @@ "update_metadata": True, "dbt_alias": True, "materialize_after_dump": True, - "materialization_mode": "prod", + "materialization_mode": "dev", "table_id": "microdados", "dataset_id": "br_anp_precos_combustiveis", }, diff --git 
a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py index b8a226343..3bbf18423 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py @@ -12,99 +12,89 @@ get_id_municipio, open_csvs, partition_data, + merge_table_id_municipio, + orderning_data_coleta, + creating_column_ano, + rename_and_reordening, + rename_columns, + rename_and_to_create_endereco, + lower_colunm_produto, ) from pipelines.datasets.br_anp_precos_combustiveis.constants import ( constants as anatel_constants, ) -from pipelines.utils.utils import log +from pipelines.utils.utils import log, extract_last_date from pipelines.constants import constants +@task +def check_for_updates(dataset_id, table_id): + """ + Checks if there are available updates for a specific dataset and table. + + Returns: + bool: Returns True if updates are available, otherwise returns False. + """ + # Obtém a data mais recente do site + download_files(anatel_constants.URLS_DATA.value, anatel_constants.PATH_INPUT.value) + df = pd.read_csv(anatel_constants.URL_DIESEL_GNV.value, sep=";", encoding="utf-8") + data_obj = df["Data da Coleta"].max() + data_obj = datetime.strptime(data_obj, "%d/%m/%Y").strftime("%Y-%m-%d") + data_obj = datetime.strptime(data_obj, "%Y-%m-%d").date() + + # Obtém a última data no site BD + data_bq_obj = extract_last_date( + dataset_id, table_id, "yy-mm-dd", "basedosdados-dev", data="data_coleta" + ) + + # Registra a data mais recente do site + log(f"Última data no site do ANP: {data_obj}") + log(f"Última data no site da BD: {data_bq_obj}") + + # Compara as datas para verificar se há atualizações + if data_obj > data_bq_obj: + return True # Há atualizações disponíveis + else: + return False # Não há novas atualizações disponíveis + + @task( max_retries=constants.TASK_MAX_RETRIES.value, retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), ) -def tratamento(): +def download_and_transform(): download_files( - anatel_constants.URLS.value, + [ + anatel_constants.URLS.value, + ], anatel_constants.PATH_INPUT.value, ) precos_combustiveis = open_csvs( - url_diesel_gnv=anatel_constants.URL_DIESEL_GNV.value, - url_gasolina_etanol=anatel_constants.URL_GASOLINA_ETANOL.value, - url_glp=anatel_constants.URL_GLP.value, + anatel_constants.URL_DIESEL_GNV.value, + anatel_constants.URL_GASOLINA_ETANOL.value, + anatel_constants.URL_GLP.value, ) - id_municipio = get_id_municipio() - log("Iniciando tratamento dos dados precos_combustiveis") - precos_combustiveis = pd.merge( - id_municipio, - precos_combustiveis, - how="right", - left_on=["nome", "sigla_uf"], - right_on=["Municipio", "Estado - Sigla"], - ) - log("----" * 150) - log("Dados mergeados") - precos_combustiveis.rename(columns={"Municipio": "nome"}, inplace=True) - precos_combustiveis.dropna(subset=["Valor de Venda"], inplace=True) - precos_combustiveis["endereco_revenda"] = ( - precos_combustiveis["Nome da Rua"].fillna("") - + "," - + " " - + precos_combustiveis["Numero Rua"].fillna("") - + "," - + " " - + precos_combustiveis["Complemento"].fillna("") - ) - precos_combustiveis.drop(columns=["sigla_uf"], inplace=True) - precos_combustiveis.rename(columns={"Data da Coleta": "data_coleta"}, inplace=True) - precos_combustiveis["data_coleta"] = ( - precos_combustiveis["data_coleta"].str[6:10] - + "-" - + precos_combustiveis["data_coleta"].str[3:5] - + "-" - + precos_combustiveis["data_coleta"].str[0:2] - ) - precos_combustiveis["Produto"] = 
precos_combustiveis["Produto"].str.lower() - precos_combustiveis["ano"] = precos_combustiveis["data_coleta"].str[0:4] - precos_combustiveis["ano"].replace("nan", "", inplace=True) - precos_combustiveis.rename(columns=anatel_constants.RENAME.value, inplace=True) - precos_combustiveis = precos_combustiveis[anatel_constants.ORDEM.value] - precos_combustiveis["ano"] = precos_combustiveis["ano"].apply( - lambda x: str(x).replace(".0", "") - ) - precos_combustiveis["cep_revenda"] = precos_combustiveis["cep_revenda"].apply( - lambda x: str(x).replace("-", "") - ) - precos_combustiveis["unidade_medida"] = precos_combustiveis["unidade_medida"].map( - {"R$ / litro": "R$/litro", "R$ / m³": "R$/m3", "R$ / 13 kg": "R$/13kg"} - ) - precos_combustiveis["nome_estabelecimento"] = precos_combustiveis[ - "nome_estabelecimento" - ].apply(lambda x: str(x).replace(",", "")) - precos_combustiveis["preco_compra"] = precos_combustiveis["preco_compra"].apply( - lambda x: str(x).replace(",", ".") - ) - precos_combustiveis["preco_venda"] = precos_combustiveis["preco_venda"].apply( - lambda x: str(x).replace(",", ".") - ) - precos_combustiveis["preco_venda"] = precos_combustiveis["preco_venda"].replace( - "nan", "" - ) - precos_combustiveis["preco_compra"] = precos_combustiveis["preco_compra"].replace( - "nan", "" + df = get_id_municipio(id_municipio=precos_combustiveis) + + df = merge_table_id_municipio( + id_municipio=df, pd_precos_combustiveis=precos_combustiveis ) - precos_combustiveis.replace(np.nan, "", inplace=True) - log("----" * 150) - log("Dados tratados com sucesso") - log("----" * 150) - log("Iniciando particionamento dos dados") - log("----" * 150) - log(precos_combustiveis["data_coleta"].unique()) - - return precos_combustiveis + + df = rename_and_to_create_endereco(precos_combustiveis=df) + + df = orderning_data_coleta(precos_combustiveis=df) + + df = lower_colunm_produto(precos_combustiveis=df) + + df = creating_column_ano(precos_combustiveis=df) + + df = rename_and_reordening(precos_combustiveis=df) + + df = rename_columns(precos_combustiveis=df) + + return df @task( diff --git a/pipelines/datasets/br_anp_precos_combustiveis/utils.py b/pipelines/datasets/br_anp_precos_combustiveis/utils.py index 8f8bc3d3b..38cf5eef1 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/utils.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/utils.py @@ -9,6 +9,10 @@ import requests from pipelines.utils.utils import log from datetime import datetime +from pipelines.datasets.br_anp_precos_combustiveis.constants import ( + constants as anatel_constants, +) +import numpy as np def download_files(urls, path): @@ -28,10 +32,8 @@ def download_files(urls, path): for url in urls: response = requests.get(url) if response.status_code == 200: - file_name = url.split("/")[-1] # Get the filename from the URL + file_name = url.split("/")[-1] file_path = os.path.join(path, file_name) - # log("----" * 150) - # log("if response.status_code == 200: SUCESSO") with open(file_path, "wb") as file: file.write(response.content) @@ -46,7 +48,7 @@ def download_files(urls, path): return downloaded_files -def get_id_municipio(): +def get_id_municipio(id_municipio: pd.DataFrame): # ! 
Carregando os dados direto do Diretório de municipio da BD # Para carregar o dado direto no pandas log("Carregando dados do diretório de municípios da BD") @@ -80,21 +82,18 @@ def open_csvs(url_diesel_gnv, url_gasolina_etanol, url_glp): log("Abrindo os arquivos csvs") diesel = pd.read_csv(f"{url_diesel_gnv}", sep=";", encoding="utf-8") log("----" * 150) - # log("Abrindo os arquivos csvs diesel") log("----" * 150) log(diesel["Data da Coleta"].unique()) gasolina = pd.read_csv(f"{url_gasolina_etanol}", sep=";", encoding="utf-8") log("----" * 150) log("Abrindo os arquivos csvs gasolina") log("----" * 150) - # log(gasolina["Data da Coleta"].unique()) glp = pd.read_csv(f"{url_glp}", sep=";", encoding="utf-8") log("Abrindo os arquivos csvs glp") log("----" * 150) # log(glp["Data da Coleta"].unique()) data_frames.extend([diesel, gasolina, glp]) precos_combustiveis = pd.concat(data_frames, ignore_index=True) - # log(precos_combustiveis["Data da Coleta"].unique()) log("----" * 150) log("Dados concatenados com sucesso") log("----" * 150) @@ -115,21 +114,111 @@ def partition_data(df: pd.DataFrame, column_name: list[str], output_directory: s for value in unique_values: value_str = str(value)[:10] date_value = datetime.strptime(value_str, "%Y-%m-%d").date() - # log(date_value) formatted_value = date_value.strftime("%Y-%m-%d") - # log(formatted_value) partition_path = os.path.join( output_directory, f"{column_name}={formatted_value}" ) - # log(f"Salvando dados em {partition_path}") if not os.path.exists(partition_path): os.makedirs(partition_path) df_partition = df[df[column_name] == value].copy() df_partition.drop([column_name], axis=1, inplace=True) - # log(f"df_partition: {df_partition}") csv_path = os.path.join(partition_path, "data.csv") df_partition.to_csv(csv_path, index=False, encoding="utf-8", na_rep="") log(f"Arquivo {csv_path} salvo com sucesso!") + + +def merge_table_id_municipio( + id_municipio: pd.DataFrame, pd_precos_combustiveis: pd.DataFrame +): + log("Iniciando tratamento dos dados precos_combustiveis") + precos_combustiveis = pd.merge( + id_municipio, + pd_precos_combustiveis, + how="right", + left_on=["nome", "sigla_uf"], + right_on=["Municipio", "Estado - Sigla"], + ) + log("----" * 150) + log("Dados mergeados") + + return precos_combustiveis + + +def rename_and_to_create_endereco(precos_combustiveis: pd.DataFrame): + precos_combustiveis["endereco_revenda"] = ( + precos_combustiveis["Nome da Rua"].fillna("") + + "," + + " " + + precos_combustiveis["Numero Rua"].fillna("") + + "," + + " " + + precos_combustiveis["Complemento"].fillna("") + ) + precos_combustiveis.drop(columns=["sigla_uf"], inplace=True) + precos_combustiveis.rename(columns={"Data da Coleta": "data_coleta"}, inplace=True) + + return precos_combustiveis + + +def orderning_data_coleta(precos_combustiveis: pd.DataFrame): + precos_combustiveis["data_coleta"] = ( + precos_combustiveis["data_coleta"].str[6:10] + + "-" + + precos_combustiveis["data_coleta"].str[3:5] + + "-" + + precos_combustiveis["data_coleta"].str[0:2] + ) + + return precos_combustiveis + + +def lower_colunm_produto(precos_combustiveis: pd.DataFrame): + precos_combustiveis["Produto"] = precos_combustiveis["Produto"].str.lower() + return precos_combustiveis + + +def creating_column_ano(precos_combustiveis: pd.DataFrame): + precos_combustiveis["ano"] = precos_combustiveis["data_coleta"].str[0:4] + precos_combustiveis["ano"].replace("nan", "", inplace=True) + + return precos_combustiveis + + +def rename_and_reordening(precos_combustiveis: pd.DataFrame): + 
precos_combustiveis.rename(columns=anatel_constants.RENAME.value, inplace=True) + precos_combustiveis = precos_combustiveis[anatel_constants.ORDEM.value] + + return precos_combustiveis + + +def rename_columns(precos_combustiveis: pd.DataFrame): + precos_combustiveis["ano"] = precos_combustiveis["ano"].apply( + lambda x: str(x).replace(".0", "") + ) + precos_combustiveis["cep_revenda"] = precos_combustiveis["cep_revenda"].apply( + lambda x: str(x).replace("-", "") + ) + precos_combustiveis["unidade_medida"] = precos_combustiveis["unidade_medida"].map( + {"R$ / litro": "R$/litro", "R$ / m³": "R$/m3", "R$ / 13 kg": "R$/13kg"} + ) + precos_combustiveis["nome_estabelecimento"] = precos_combustiveis[ + "nome_estabelecimento" + ].apply(lambda x: str(x).replace(",", "")) + precos_combustiveis["preco_compra"] = precos_combustiveis["preco_compra"].apply( + lambda x: str(x).replace(",", ".") + ) + precos_combustiveis["preco_venda"] = precos_combustiveis["preco_venda"].apply( + lambda x: str(x).replace(",", ".") + ) + precos_combustiveis["preco_venda"] = precos_combustiveis["preco_venda"].replace( + "nan", "" + ) + precos_combustiveis["preco_compra"] = precos_combustiveis["preco_compra"].replace( + "nan", "" + ) + precos_combustiveis.replace(np.nan, "", inplace=True) + + return precos_combustiveis diff --git a/pipelines/utils/utils.py b/pipelines/utils/utils.py index 9d391ab7d..95f434e1c 100644 --- a/pipelines/utils/utils.py +++ b/pipelines/utils/utils.py @@ -670,7 +670,9 @@ def extract_last_update( raise -def extract_last_date(dataset_id, table_id, date_format: str, billing_project_id: str): +def extract_last_date( + dataset_id, table_id, date_format: str, billing_project_id: str, data +): """ Extracts the last update date of a given dataset table. @@ -717,7 +719,7 @@ def extract_last_date(dataset_id, table_id, date_format: str, billing_project_id try: query_bd = f""" SELECT - MAX(data) as max_date + MAX({data}) as max_date FROM `basedosdados.{dataset_id}.{table_id}` """ From 6a96e080b579df74cb879ae4f82640685cf9f084 Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Fri, 15 Sep 2023 11:11:07 -0300 Subject: [PATCH 04/20] =?UTF-8?q?insere=20modifica=C3=A7=C3=A3o=20na=20tas?= =?UTF-8?q?k=20update=5Fdjango=5Fmetadata?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pipelines/utils/metadata/tasks.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pipelines/utils/metadata/tasks.py b/pipelines/utils/metadata/tasks.py index 72e22cccd..40e99a8b0 100644 --- a/pipelines/utils/metadata/tasks.py +++ b/pipelines/utils/metadata/tasks.py @@ -107,8 +107,9 @@ def update_django_metadata( "weeks": "weeks", "days": "days", } - if not isinstance(_last_date, str): - raise ValueError("O parâmetro `last_date` deve ser do tipo string") + + if not isinstance(_last_date, str) and _last_date is not None: + raise ValueError("O parâmetro `last_date` deve ser uma string não nula") if time_unit not in unidades_permitidas: raise ValueError( From fe395dffda72d7225ec7684ad726eac205963193 Mon Sep 17 00:00:00 2001 From: tricktx Date: Fri, 15 Sep 2023 11:19:45 -0300 Subject: [PATCH 05/20] url --- pipelines/datasets/br_anp_precos_combustiveis/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py index 3bbf18423..23c611e81 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py +++ 
b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py @@ -37,7 +37,7 @@ def check_for_updates(dataset_id, table_id): """ # Obtém a data mais recente do site download_files(anatel_constants.URLS_DATA.value, anatel_constants.PATH_INPUT.value) - df = pd.read_csv(anatel_constants.URL_DIESEL_GNV.value, sep=";", encoding="utf-8") + df = pd.read_csv(anatel_constants.URL_GLP.value, sep=";", encoding="utf-8") data_obj = df["Data da Coleta"].max() data_obj = datetime.strptime(data_obj, "%d/%m/%Y").strftime("%Y-%m-%d") data_obj = datetime.strptime(data_obj, "%Y-%m-%d").date() From 58cff2244cf2bb732a6f2a7adb39b68e1e4352f5 Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Fri, 15 Sep 2023 11:26:03 -0300 Subject: [PATCH 06/20] =?UTF-8?q?corrige=20flow=20e=20setta=20hor=C3=A1rio?= =?UTF-8?q?=20da=20schedule=20de=20profissional?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pipelines/datasets/br_ms_cnes/flows.py | 17 ----------------- pipelines/datasets/br_ms_cnes/schedules.py | 2 +- 2 files changed, 1 insertion(+), 18 deletions(-) diff --git a/pipelines/datasets/br_ms_cnes/flows.py b/pipelines/datasets/br_ms_cnes/flows.py index 2e474ddfa..d76b9dd82 100644 --- a/pipelines/datasets/br_ms_cnes/flows.py +++ b/pipelines/datasets/br_ms_cnes/flows.py @@ -398,23 +398,6 @@ "Os dados do FTP CNES-ST ainda não foram atualizados para o ano/mes mais recente" ) - with case(update_metadata, True): - update = update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_last_update=False, - bq_table_last_year_month=True, - api_mode="prod", - billing_project_id="basedosdados", - date_format="yy-mm", - is_bd_pro=True, - is_free=True, - time_delta=6, - time_unit="months", - upstream_tasks=[files_path], - ) - with case(is_empty(files_path), False): dbc_files = access_ftp_donwload_files( file_list=files_path, diff --git a/pipelines/datasets/br_ms_cnes/schedules.py b/pipelines/datasets/br_ms_cnes/schedules.py index 0f96ad724..e18d9d2ad 100644 --- a/pipelines/datasets/br_ms_cnes/schedules.py +++ b/pipelines/datasets/br_ms_cnes/schedules.py @@ -35,7 +35,7 @@ schedule_br_ms_cnes_profissional = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="30 6 * * *", # every day at 18:30 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, From 08c54cb3f523fe173bf82016a53e98b134166f37 Mon Sep 17 00:00:00 2001 From: tricktx Date: Fri, 15 Sep 2023 11:32:27 -0300 Subject: [PATCH 07/20] tupla in download_files --- pipelines/datasets/br_anp_precos_combustiveis/tasks.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py index 23c611e81..77755e3f9 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py @@ -63,12 +63,7 @@ def check_for_updates(dataset_id, table_id): retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), ) def download_and_transform(): - download_files( - [ - anatel_constants.URLS.value, - ], - anatel_constants.PATH_INPUT.value, - ) + download_files(anatel_constants.URLS.value, anatel_constants.PATH_INPUT.value) precos_combustiveis = open_csvs( anatel_constants.URL_DIESEL_GNV.value, From 7d4288c8aa2a81d1126de4d10173501d542fb0f3 Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Fri, 15 Sep 2023 11:46:38 -0300 Subject: [PATCH 08/20] testa update_metadata novamente --- 
pipelines/datasets/br_rf_cafir/flows.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py index 34f9a5a42..d97924e80 100644 --- a/pipelines/datasets/br_rf_cafir/flows.py +++ b/pipelines/datasets/br_rf_cafir/flows.py @@ -57,6 +57,22 @@ dataset_id=dataset_id, table_id=table_id, data=info[0], upstream_tasks=[info] ) + with case(update_metadata, True): + update = update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + _last_date=info[0], + bq_last_update=False, + api_mode="prod", + date_format="yy-mm-dd", + is_bd_pro=True, + is_free=True, + time_delta=6, + time_unit="months", + upstream_tasks=[info], + ) + with case(is_outdated, False): log_task(f"Não há atualizações para a tabela de {table_id}!") From baafc713d7d3160004c9a203514872f8e21f287d Mon Sep 17 00:00:00 2001 From: tricktx Date: Fri, 15 Sep 2023 11:57:27 -0300 Subject: [PATCH 09/20] =?UTF-8?q?ajustando=20schedules=20para=20todos=20os?= =?UTF-8?q?=20dias=2010=20da=20manh=C3=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../datasets/br_anp_precos_combustiveis/schedules.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pipelines/datasets/br_anp_precos_combustiveis/schedules.py b/pipelines/datasets/br_anp_precos_combustiveis/schedules.py index 46ee4ecb2..9c660ab04 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/schedules.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/schedules.py @@ -5,20 +5,20 @@ from datetime import timedelta, datetime from prefect.schedules import Schedule -from prefect.schedules.clocks import IntervalClock +from prefect.schedules.clocks import CronClock from pipelines.constants import constants every_week_anp_microdados = Schedule( clocks=[ - IntervalClock( - interval=timedelta(days=1), + CronClock( + cron="0 10 * * *", start_date=datetime(2021, 1, 1), labels=[constants.BASEDOSDADOS_PROD_AGENT_LABEL.value], parameter_defaults={ "update_metadata": True, "dbt_alias": True, "materialize_after_dump": True, - "materialization_mode": "dev", + "materialization_mode": "prod", "table_id": "microdados", "dataset_id": "br_anp_precos_combustiveis", }, From 8a1699ea02d0ece0658bc2d7e004e2ea213cab41 Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Fri, 15 Sep 2023 12:26:27 -0300 Subject: [PATCH 10/20] registar o flow --- pipelines/datasets/br_rf_cafir/flows.py | 2 +- pipelines/datasets/br_rf_cafir/tasks.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py index d97924e80..c0d28369d 100644 --- a/pipelines/datasets/br_rf_cafir/flows.py +++ b/pipelines/datasets/br_rf_cafir/flows.py @@ -70,7 +70,7 @@ is_free=True, time_delta=6, time_unit="months", - upstream_tasks=[info], + upstream_tasks=[is_outdated, info], ) with case(is_outdated, False): diff --git a/pipelines/datasets/br_rf_cafir/tasks.py b/pipelines/datasets/br_rf_cafir/tasks.py index 8358c5918..5beaa5d8f 100644 --- a/pipelines/datasets/br_rf_cafir/tasks.py +++ b/pipelines/datasets/br_rf_cafir/tasks.py @@ -155,8 +155,7 @@ def parse_data(url: str, other_task_output: tuple[list[datetime], list[str]]) -> # save new file as csv df.to_csv(save_path, index=False, sep=",", na_rep="", encoding="utf-8") - # resolve ASCII 0 no momento da leitura do BQ - # ler e salvar de novo + # resolve ASCII 0 no momento da leitura do BQ. Ler e salvar de novo. 
df = pd.read_csv(save_path, dtype=str) df.to_csv(save_path, index=False, sep=",", na_rep="", encoding="utf-8") From 7bb843c93be06fc0e5cec9dd3348807fc7d05a33 Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Fri, 15 Sep 2023 12:59:11 -0300 Subject: [PATCH 11/20] cria task que converte datetime pra str --- pipelines/datasets/br_rf_cafir/flows.py | 4 +++- pipelines/datasets/br_rf_cafir/tasks.py | 13 +++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py index c0d28369d..73262f2ec 100644 --- a/pipelines/datasets/br_rf_cafir/flows.py +++ b/pipelines/datasets/br_rf_cafir/flows.py @@ -16,6 +16,7 @@ parse_files_parse_date, parse_data, check_if_bq_data_is_outdated, + convert_datetime_to_string, ) from pipelines.utils.constants import constants as utils_constants @@ -56,13 +57,14 @@ is_outdated = check_if_bq_data_is_outdated( dataset_id=dataset_id, table_id=table_id, data=info[0], upstream_tasks=[info] ) + update_metadata_strig_date = convert_datetime_to_string(data=info[0]) with case(update_metadata, True): update = update_django_metadata( dataset_id, table_id, metadata_type="DateTimeRange", - _last_date=info[0], + _last_date=update_metadata_strig_date, bq_last_update=False, api_mode="prod", date_format="yy-mm-dd", diff --git a/pipelines/datasets/br_rf_cafir/tasks.py b/pipelines/datasets/br_rf_cafir/tasks.py index 5beaa5d8f..8c4b9b5b9 100644 --- a/pipelines/datasets/br_rf_cafir/tasks.py +++ b/pipelines/datasets/br_rf_cafir/tasks.py @@ -178,3 +178,16 @@ def parse_data(url: str, other_task_output: tuple[list[datetime], list[str]]) -> ) return files_path + + +@task +def convert_datetime_to_string(data: datetime): + """Converte a data para string + + Args: + date (datetime): Data + + Returns: + string: Data no formato string + """ + return str(data) From 88110e3b8face52c1a265d8c2e2cb9ead02448ee Mon Sep 17 00:00:00 2001 From: tricktx Date: Fri, 15 Sep 2023 13:01:14 -0300 Subject: [PATCH 12/20] default data --- pipelines/utils/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/utils/utils.py b/pipelines/utils/utils.py index 3282dc643..57dc2758e 100644 --- a/pipelines/utils/utils.py +++ b/pipelines/utils/utils.py @@ -726,7 +726,7 @@ def extract_last_update( def extract_last_date( - dataset_id, table_id, date_format: str, billing_project_id: str, data + dataset_id, table_id, date_format: str, billing_project_id: str, data: str = "data" ): """ Extracts the last update date of a given dataset table. 
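The two patches above read best together: convert_datetime_to_string exists because update_django_metadata now requires _last_date to be a string (or None), and the new "data" argument on extract_last_date, defaulting to "data", lets each pipeline point the freshness lookup at its own date column, the way check_for_updates passes data="data_coleta". A minimal sketch of that lookup follows. It is illustrative only: bd.read_sql and its from_file flag are assumed from the basedosdados client (the diffs show only the query), and the real helper's date_format handling and error logging are omitted.

# Sketch of the parametrized last-date lookup behind check_for_updates.
# Assumes the basedosdados client's read_sql helper; the production
# extract_last_date in pipelines/utils/utils.py also parses date_format
# and logs errors, which this sketch leaves out.
from datetime import date, datetime

import basedosdados as bd


def extract_last_date_sketch(
    dataset_id: str,
    table_id: str,
    billing_project_id: str = "basedosdados",
    data: str = "data",  # date column; the default keeps older callers working
) -> date:
    query = f"""
        SELECT
            MAX({data}) AS max_date
        FROM `basedosdados.{dataset_id}.{table_id}`
    """
    df = bd.read_sql(query, billing_project_id=billing_project_id, from_file=True)
    max_date = df["max_date"][0]
    # Works whether BigQuery returns a DATE, a TIMESTAMP or a string
    return datetime.strptime(str(max_date)[:10], "%Y-%m-%d").date()


# check_for_updates-style usage, naming the table's own date column:
# extract_last_date_sketch("br_anp_precos_combustiveis", "microdados", data="data_coleta")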
From 2a189bdf4f55b68842ae694c6ac42b2316d484fe Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Fri, 15 Sep 2023 13:34:38 -0300 Subject: [PATCH 13/20] reordena upstream tasks --- pipelines/datasets/br_rf_cafir/flows.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py index 73262f2ec..8303a07f4 100644 --- a/pipelines/datasets/br_rf_cafir/flows.py +++ b/pipelines/datasets/br_rf_cafir/flows.py @@ -57,7 +57,9 @@ is_outdated = check_if_bq_data_is_outdated( dataset_id=dataset_id, table_id=table_id, data=info[0], upstream_tasks=[info] ) - update_metadata_strig_date = convert_datetime_to_string(data=info[0]) + update_metadata_strig_date = convert_datetime_to_string( + data=info[0], upstream_tasks=[info, is_outdated] + ) with case(update_metadata, True): update = update_django_metadata( @@ -72,7 +74,7 @@ is_free=True, time_delta=6, time_unit="months", - upstream_tasks=[is_outdated, info], + upstream_tasks=[update_metadata_strig_date], ) with case(is_outdated, False): From a976d640c3a847f89e76113cf083248a920bfdcb Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Fri, 15 Sep 2023 14:39:49 -0300 Subject: [PATCH 14/20] =?UTF-8?q?tira=20updater=20do=20coverage=20gr=C3=A1?= =?UTF-8?q?tis?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pipelines/datasets/br_rf_cafir/flows.py | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py index 8303a07f4..fbbbadda0 100644 --- a/pipelines/datasets/br_rf_cafir/flows.py +++ b/pipelines/datasets/br_rf_cafir/flows.py @@ -61,22 +61,6 @@ data=info[0], upstream_tasks=[info, is_outdated] ) - with case(update_metadata, True): - update = update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - _last_date=update_metadata_strig_date, - bq_last_update=False, - api_mode="prod", - date_format="yy-mm-dd", - is_bd_pro=True, - is_free=True, - time_delta=6, - time_unit="months", - upstream_tasks=[update_metadata_strig_date], - ) - with case(is_outdated, False): log_task(f"Não há atualizações para a tabela de {table_id}!") @@ -126,6 +110,10 @@ wait_for_materialization.retry_delay = timedelta( seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) + # TODO: Quando a nova fotografia for liberada setar is_free como True + # is_free como true. Não setei agora pq a task update_django_metadata depende + # de um coverage já criado na API. Como a lag entre fotográfias é de 5 meses (6 é o padrão no monento) + # não há necessidade de atualizar o coverage agora. 
with case(update_metadata, True): update = update_django_metadata( From 8a2a7d4f8a6237784601e772b36244272d69ac3c Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Fri, 15 Sep 2023 14:48:43 -0300 Subject: [PATCH 15/20] =?UTF-8?q?insere=20novo=20input=20na=20fun=C3=A7?= =?UTF-8?q?=C3=A3o=20update=5Fdjango=5Fmetadata?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pipelines/datasets/br_rf_cafir/flows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py index fbbbadda0..63b9fdb60 100644 --- a/pipelines/datasets/br_rf_cafir/flows.py +++ b/pipelines/datasets/br_rf_cafir/flows.py @@ -120,7 +120,7 @@ dataset_id, table_id, metadata_type="DateTimeRange", - _last_date=info[0], + _last_date=update_metadata_strig_date, bq_last_update=False, api_mode="prod", date_format="yy-mm-dd", From 7158c4232dee7326021faf06ad1baa20ad7064a8 Mon Sep 17 00:00:00 2001 From: Pedro Castro Date: Fri, 15 Sep 2023 15:37:19 -0300 Subject: [PATCH 16/20] br_mp_pep_cargos_funcoes: update table name --- pipelines/datasets/br_mp_pep_cargos_funcoes/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/datasets/br_mp_pep_cargos_funcoes/tasks.py b/pipelines/datasets/br_mp_pep_cargos_funcoes/tasks.py index 1acab03e5..b5a93c810 100644 --- a/pipelines/datasets/br_mp_pep_cargos_funcoes/tasks.py +++ b/pipelines/datasets/br_mp_pep_cargos_funcoes/tasks.py @@ -400,7 +400,7 @@ def is_up_to_date() -> bool: log(f"Last date website: {text}, parsed as {date_website}") last_date_in_bq = extract_last_date( - "br_mp_pep", "cargos_funcoes_atualizado", "yy-mm", "basedosdados" + "br_mp_pep", "cargos_funcoes", "yy-mm", "basedosdados" ) date_in_bq = datetime.datetime.strptime(last_date_in_bq, "%Y-%m") From d1adcae6dce008feccb18b2b107355faf94b667e Mon Sep 17 00:00:00 2001 From: tricktx Date: Mon, 18 Sep 2023 09:34:23 -0300 Subject: [PATCH 17/20] dev --- pipelines/datasets/br_anp_precos_combustiveis/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/datasets/br_anp_precos_combustiveis/utils.py b/pipelines/datasets/br_anp_precos_combustiveis/utils.py index 38cf5eef1..69f821e87 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/utils.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/utils.py @@ -55,7 +55,7 @@ def get_id_municipio(id_municipio: pd.DataFrame): id_municipio = bd.read_table( dataset_id="br_bd_diretorios_brasil", table_id="municipio", - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", from_file=True, ) log("----" * 150) From e729bae174a65ce9be69923c4b21f9b456d3b924 Mon Sep 17 00:00:00 2001 From: tricktx Date: Mon, 18 Sep 2023 10:02:43 -0300 Subject: [PATCH 18/20] uretirando os dev --- pipelines/datasets/br_anp_precos_combustiveis/flows.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pipelines/datasets/br_anp_precos_combustiveis/flows.py b/pipelines/datasets/br_anp_precos_combustiveis/flows.py index eb156573d..ec824870b 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/flows.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/flows.py @@ -69,7 +69,6 @@ with case(dados_desatualizados, True): df = download_and_transform(upstream_tasks=[rename_flow_run]) output_path = make_partitions(df=df, upstream_tasks=[df]) - get_date_max_mais = data_max_bd_mais() get_date_max_pro = data_max_bd_pro(df=df) # pylint: disable=C0103 @@ -118,7 +117,7 @@ bq_last_update=False, 
api_mode="prod", date_format="yy-mm-dd", - _last_date=get_date_max_mais, + _last_date=get_date_max_pro, upstream_tasks=[wait_upload_table], ) From f05ec03ec7ca3e64fe6a7b6744daa978da574fe4 Mon Sep 17 00:00:00 2001 From: tricktx Date: Mon, 18 Sep 2023 10:28:49 -0300 Subject: [PATCH 19/20] retirando os dev --- pipelines/datasets/br_anp_precos_combustiveis/tasks.py | 2 +- pipelines/datasets/br_anp_precos_combustiveis/utils.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py index 77755e3f9..48c911278 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py @@ -44,7 +44,7 @@ def check_for_updates(dataset_id, table_id): # Obtém a última data no site BD data_bq_obj = extract_last_date( - dataset_id, table_id, "yy-mm-dd", "basedosdados-dev", data="data_coleta" + dataset_id, table_id, "yy-mm-dd", "basedosdados", data="data_coleta" ) # Registra a data mais recente do site diff --git a/pipelines/datasets/br_anp_precos_combustiveis/utils.py b/pipelines/datasets/br_anp_precos_combustiveis/utils.py index 69f821e87..38cf5eef1 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/utils.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/utils.py @@ -55,7 +55,7 @@ def get_id_municipio(id_municipio: pd.DataFrame): id_municipio = bd.read_table( dataset_id="br_bd_diretorios_brasil", table_id="municipio", - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", from_file=True, ) log("----" * 150) From 1d5e08c02261b700e81125694920e142d2760a03 Mon Sep 17 00:00:00 2001 From: Laura Amaral Date: Tue, 19 Sep 2023 09:32:21 -0300 Subject: [PATCH 20/20] remove br_sp_dieese_icv schedules --- pipelines/datasets/br_sp_saopaulo_dieese_icv/flows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/datasets/br_sp_saopaulo_dieese_icv/flows.py b/pipelines/datasets/br_sp_saopaulo_dieese_icv/flows.py index 4a43ad439..55ec12014 100644 --- a/pipelines/datasets/br_sp_saopaulo_dieese_icv/flows.py +++ b/pipelines/datasets/br_sp_saopaulo_dieese_icv/flows.py @@ -96,4 +96,4 @@ br_sp_dieese.storage = GCS(constants.GCS_FLOWS_BUCKET.value) br_sp_dieese.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) -br_sp_dieese.schedule = every_month +# br_sp_dieese.schedule = every_month
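Taken together, the ANP and CAFIR changes in this series converge on one gating pattern: a task compares the newest date published by the source against the newest date already in BigQuery, and the download, upload, materialization and metadata steps only run inside the with case(..., True) branch. The sketch below condenses that control flow with plain Prefect 1.x primitives. The task names follow the br_anp_precos_combustiveis flow, but the bodies are placeholders (an inline DataFrame stands in for the downloaded CSV and a fixed date stands in for extract_last_date), so treat it as an illustration of the pattern rather than the pipeline itself.

# Condensed sketch of the "only run when the source is newer than BigQuery" gate
# used in br_anp_precos_combustiveis and br_rf_cafir. Names mirror the diffs;
# bodies are simplified placeholders.
from datetime import datetime

import pandas as pd
from prefect import Flow, Parameter, case, task


@task
def check_for_updates(dataset_id: str, table_id: str) -> bool:
    # Stand-in for the downloaded CSV: a "Data da Coleta" column in dd/mm/YYYY,
    # as in the ANP files (dates here are made up).
    site_df = pd.DataFrame({"Data da Coleta": ["08/09/2023", "15/09/2023"]})
    # As in the diffs, max() runs on the raw strings before parsing.
    data_site = datetime.strptime(site_df["Data da Coleta"].max(), "%d/%m/%Y").date()
    # Stand-in for extract_last_date(dataset_id, table_id, ..., data="data_coleta").
    data_bq = datetime.strptime("2023-09-10", "%Y-%m-%d").date()
    return data_site > data_bq


@task
def log_task(msg: str):
    print(msg)  # the real helper logs through Prefect


@task
def download_and_transform() -> pd.DataFrame:
    return pd.DataFrame()  # placeholder for the real download and treatment


with Flow("anp_gate_sketch") as flow:
    dataset_id = Parameter("dataset_id", default="br_anp_precos_combustiveis")
    table_id = Parameter("table_id", default="microdados")

    outdated = check_for_updates(dataset_id=dataset_id, table_id=table_id)
    with case(outdated, False):
        log_task("Dados atualizados, não é necessário fazer o download")
    with case(outdated, True):
        df = download_and_transform()
        # partitioning, GCS upload, dbt materialization and update_django_metadata
        # would follow here, as in the flows above

# flow.run()  # executes the gate locally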