From 50a5f205f9921e0133bed35f1fddbe337b673437 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 11 Sep 2023 20:17:40 +0000 Subject: [PATCH 01/43] [pre-commit.ci] pre-commit autoupdate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/psf/black: 23.7.0 → 23.9.1](https://github.com/psf/black/compare/23.7.0...23.9.1) --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6a5844963..0060bee4e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -9,7 +9,7 @@ repos: - id: no-commit-to-branch # prevents committing to protected branches - id: trailing-whitespace # prevents trailing whitespace - repo: https://github.com/psf/black - rev: 23.7.0 + rev: 23.9.1 hooks: - id: black From 0fa7474cd04210bdfd95f8987e5b8c7527aaf5db Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Fri, 15 Sep 2023 10:33:13 -0300 Subject: [PATCH 02/43] =?UTF-8?q?define=20execu=C3=A7=C3=A3o=20sequencial?= =?UTF-8?q?=20das=20schedules?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pipelines/datasets/br_ms_cnes/schedules.py | 26 +++++++++++----------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/pipelines/datasets/br_ms_cnes/schedules.py b/pipelines/datasets/br_ms_cnes/schedules.py index 02f90701e..0f96ad724 100644 --- a/pipelines/datasets/br_ms_cnes/schedules.py +++ b/pipelines/datasets/br_ms_cnes/schedules.py @@ -12,7 +12,7 @@ schedule_br_ms_cnes_estabelecimento = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="0 9 * * *", # every day at 9:00 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -31,7 +31,7 @@ adjustments=[adjustments.next_weekday], ) - +# todo selecionar outro horário schedule_br_ms_cnes_profissional = Schedule( clocks=[ CronClock( @@ -58,7 +58,7 @@ schedule_br_ms_cnes_equipe = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="30 9 * * *", # every day 9:30 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -80,7 +80,7 @@ schedule_br_ms_cnes_leito = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="0 10 * * *", # every day at 10:00 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -102,7 +102,7 @@ schedule_br_ms_cnes_equipamento = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="30 10 * * *", # every day at 10:30 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -124,7 +124,7 @@ schedule_br_ms_cnes_estabelecimento_ensino = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="45 10 * * *", # every day at 10:45 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -146,7 +146,7 @@ schedule_br_ms_cnes_dados_complementares = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="0 11 * * *", # every day 11:00 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -168,7 +168,7 @@ schedule_br_ms_cnes_estabelecimento_filantropico = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="15 11 * * *", # every day at 11:15 
start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -189,7 +189,7 @@ schedule_br_ms_cnes_gestao_metas = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="30 11 * * *", # every day at 11:30 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -212,7 +212,7 @@ schedule_br_ms_cnes_habilitacao = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="45 11 * * *", # every day at 11:45 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -234,7 +234,7 @@ schedule_br_ms_cnes_incentivos = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="45 11 * * *", # every day at 11:45 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -256,7 +256,7 @@ schedule_br_ms_cnes_regra_contratual = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="0 12 * * *", # every day at 12:00 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -279,7 +279,7 @@ schedule_br_ms_cnes_servico_especializado = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="30 12 * * *", # every day at 12:30 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, From 5187b5f7684d030240b9df18c126453597d0dd6b Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Fri, 15 Sep 2023 10:57:49 -0300 Subject: [PATCH 03/43] debuga erro na task update_django_metadata --- pipelines/datasets/br_ms_cnes/flows.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/pipelines/datasets/br_ms_cnes/flows.py b/pipelines/datasets/br_ms_cnes/flows.py index d76b9dd82..2e474ddfa 100644 --- a/pipelines/datasets/br_ms_cnes/flows.py +++ b/pipelines/datasets/br_ms_cnes/flows.py @@ -398,6 +398,23 @@ "Os dados do FTP CNES-ST ainda não foram atualizados para o ano/mes mais recente" ) + with case(update_metadata, True): + update = update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_last_update=False, + bq_table_last_year_month=True, + api_mode="prod", + billing_project_id="basedosdados", + date_format="yy-mm", + is_bd_pro=True, + is_free=True, + time_delta=6, + time_unit="months", + upstream_tasks=[files_path], + ) + with case(is_empty(files_path), False): dbc_files = access_ftp_donwload_files( file_list=files_path, From fe6d1d50b95a62135c382c4897a859fc471b73b3 Mon Sep 17 00:00:00 2001 From: tricktx Date: Fri, 15 Sep 2023 11:03:17 -0300 Subject: [PATCH 04/43] check for updates and organizing --- .../br_anp_precos_combustiveis/constants.py | 4 + .../br_anp_precos_combustiveis/flows.py | 157 ++++++++---------- .../br_anp_precos_combustiveis/schedules.py | 8 +- .../br_anp_precos_combustiveis/tasks.py | 138 +++++++-------- .../br_anp_precos_combustiveis/utils.py | 111 +++++++++++-- pipelines/utils/utils.py | 6 +- 6 files changed, 238 insertions(+), 186 deletions(-) diff --git a/pipelines/datasets/br_anp_precos_combustiveis/constants.py b/pipelines/datasets/br_anp_precos_combustiveis/constants.py index b9299317b..d35088bfd 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/constants.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/constants.py @@ -48,6 +48,10 @@ class constants(Enum): # pylint: disable=c0103 
"https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/arquivos/shpc/qus/ultimas-4-semanas-diesel-gnv.csv", ] + URLS_DATA = [ + "https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/arquivos/shpc/qus/ultimas-4-semanas-glp.csv" + ] + PATH_INPUT = "/tmp/input/" PATH_OUTPUT = "/tmp/output/" diff --git a/pipelines/datasets/br_anp_precos_combustiveis/flows.py b/pipelines/datasets/br_anp_precos_combustiveis/flows.py index ce81cd827..eb156573d 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/flows.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/flows.py @@ -13,11 +13,13 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.datasets.br_anp_precos_combustiveis.utils import download_files from pipelines.datasets.br_anp_precos_combustiveis.tasks import ( - tratamento, + download_and_transform, data_max_bd_mais, data_max_bd_pro, make_partitions, + check_for_updates, ) from pipelines.datasets.br_anp_precos_combustiveis.schedules import ( every_week_anp_microdados, @@ -26,6 +28,11 @@ create_table_and_upload_to_gcs, rename_current_flow_run_dataset_table, get_current_flow_labels, + log_task, +) + +from pipelines.datasets.br_anp_precos_combustiveis.constants import ( + constants as anatel_constants, ) with Flow( @@ -37,7 +44,7 @@ table_id = Parameter("table_id", default="microdados", required=True) materialization_mode = Parameter( - "materialization_mode", default="prod", required=False + "materialization_mode", default="dev", required=False ) materialize_after_dump = Parameter( @@ -51,103 +58,69 @@ ) update_metadata = Parameter("update_metadata", default=True, required=False) - df = tratamento(upstream_tasks=[rename_flow_run]) - output_path = make_partitions(df=df, upstream_tasks=[df]) - get_date_max_mais = data_max_bd_mais() - get_date_max_pro = data_max_bd_pro(df=df) - - # pylint: disable=C0103 - wait_upload_table = create_table_and_upload_to_gcs( - data_path=output_path, - dataset_id=dataset_id, - table_id=table_id, - dump_mode="append", - wait=output_path, - ) - - # ! 
BD MAIS - Atrasado - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id, - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id}", + dados_desatualizados = check_for_updates(dataset_id=dataset_id, table_id=table_id) + log_task(f"Checando se os dados estão desatualizados: {dados_desatualizados}") + with case(dados_desatualizados, False): + log_task( + "Dados atualizados, não é necessário fazer o download", + upstream_tasks=[dados_desatualizados], ) - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) + with case(dados_desatualizados, True): + df = download_and_transform(upstream_tasks=[rename_flow_run]) + output_path = make_partitions(df=df, upstream_tasks=[df]) + get_date_max_mais = data_max_bd_mais() + get_date_max_pro = data_max_bd_pro(df=df) - with case(update_metadata, True): - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_last_update=False, - api_mode="prod", - date_format="yy-mm-dd", - _last_date=get_date_max_mais, - upstream_tasks=[wait_upload_table], - ) - - # ! BD PRO - Atualizado - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id}" "_atualizado", + # pylint: disable=C0103 + wait_upload_table = create_table_and_upload_to_gcs( + data_path=output_path, + dataset_id=dataset_id, + table_id=table_id, + dump_mode="append", + wait=output_path, ) - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) + with case(materialize_after_dump, True): + # Trigger DBT flow run + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": dataset_id, + "table_id": table_id, + "mode": materialization_mode, + "dbt_alias": dbt_alias, + }, + labels=current_flow_labels, + run_name=f"Materialize {dataset_id}.{table_id}", + ) - with case(update_metadata, True): - update_django_metadata( - dataset_id, - table_id + "_atualizado", - metadata_type="DateTimeRange", - bq_last_update=False, - api_mode="prod", - 
date_format="yy-mm-dd", - _last_date=get_date_max_pro, - upstream_tasks=[wait_upload_table], + wait_for_materialization = wait_for_flow_run( + materialization_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value ) + wait_for_materialization.retry_delay = timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + + with case(update_metadata, True): + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_last_update=False, + api_mode="prod", + date_format="yy-mm-dd", + _last_date=get_date_max_mais, + upstream_tasks=[wait_upload_table], + ) anp_microdados.storage = GCS(constants.GCS_FLOWS_BUCKET.value) anp_microdados.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) diff --git a/pipelines/datasets/br_anp_precos_combustiveis/schedules.py b/pipelines/datasets/br_anp_precos_combustiveis/schedules.py index 76c702d98..46ee4ecb2 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/schedules.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/schedules.py @@ -3,13 +3,7 @@ Schedules for br_anp_precos_combustiveis """ -# -*- coding: utf-8 -*- -""" -Schedules for br_anp_precos_combustiveis -""" - from datetime import timedelta, datetime - from prefect.schedules import Schedule from prefect.schedules.clocks import IntervalClock from pipelines.constants import constants @@ -24,7 +18,7 @@ "update_metadata": True, "dbt_alias": True, "materialize_after_dump": True, - "materialization_mode": "prod", + "materialization_mode": "dev", "table_id": "microdados", "dataset_id": "br_anp_precos_combustiveis", }, diff --git a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py index b8a226343..3bbf18423 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py @@ -12,99 +12,89 @@ get_id_municipio, open_csvs, partition_data, + merge_table_id_municipio, + orderning_data_coleta, + creating_column_ano, + rename_and_reordening, + rename_columns, + rename_and_to_create_endereco, + lower_colunm_produto, ) from pipelines.datasets.br_anp_precos_combustiveis.constants import ( constants as anatel_constants, ) -from pipelines.utils.utils import log +from pipelines.utils.utils import log, extract_last_date from pipelines.constants import constants +@task +def check_for_updates(dataset_id, table_id): + """ + Checks if there are available updates for a specific dataset and table. + + Returns: + bool: Returns True if updates are available, otherwise returns False. 
+ """ + # Obtém a data mais recente do site + download_files(anatel_constants.URLS_DATA.value, anatel_constants.PATH_INPUT.value) + df = pd.read_csv(anatel_constants.URL_DIESEL_GNV.value, sep=";", encoding="utf-8") + data_obj = df["Data da Coleta"].max() + data_obj = datetime.strptime(data_obj, "%d/%m/%Y").strftime("%Y-%m-%d") + data_obj = datetime.strptime(data_obj, "%Y-%m-%d").date() + + # Obtém a última data no site BD + data_bq_obj = extract_last_date( + dataset_id, table_id, "yy-mm-dd", "basedosdados-dev", data="data_coleta" + ) + + # Registra a data mais recente do site + log(f"Última data no site do ANP: {data_obj}") + log(f"Última data no site da BD: {data_bq_obj}") + + # Compara as datas para verificar se há atualizações + if data_obj > data_bq_obj: + return True # Há atualizações disponíveis + else: + return False # Não há novas atualizações disponíveis + + @task( max_retries=constants.TASK_MAX_RETRIES.value, retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), ) -def tratamento(): +def download_and_transform(): download_files( - anatel_constants.URLS.value, + [ + anatel_constants.URLS.value, + ], anatel_constants.PATH_INPUT.value, ) precos_combustiveis = open_csvs( - url_diesel_gnv=anatel_constants.URL_DIESEL_GNV.value, - url_gasolina_etanol=anatel_constants.URL_GASOLINA_ETANOL.value, - url_glp=anatel_constants.URL_GLP.value, + anatel_constants.URL_DIESEL_GNV.value, + anatel_constants.URL_GASOLINA_ETANOL.value, + anatel_constants.URL_GLP.value, ) - id_municipio = get_id_municipio() - log("Iniciando tratamento dos dados precos_combustiveis") - precos_combustiveis = pd.merge( - id_municipio, - precos_combustiveis, - how="right", - left_on=["nome", "sigla_uf"], - right_on=["Municipio", "Estado - Sigla"], - ) - log("----" * 150) - log("Dados mergeados") - precos_combustiveis.rename(columns={"Municipio": "nome"}, inplace=True) - precos_combustiveis.dropna(subset=["Valor de Venda"], inplace=True) - precos_combustiveis["endereco_revenda"] = ( - precos_combustiveis["Nome da Rua"].fillna("") - + "," - + " " - + precos_combustiveis["Numero Rua"].fillna("") - + "," - + " " - + precos_combustiveis["Complemento"].fillna("") - ) - precos_combustiveis.drop(columns=["sigla_uf"], inplace=True) - precos_combustiveis.rename(columns={"Data da Coleta": "data_coleta"}, inplace=True) - precos_combustiveis["data_coleta"] = ( - precos_combustiveis["data_coleta"].str[6:10] - + "-" - + precos_combustiveis["data_coleta"].str[3:5] - + "-" - + precos_combustiveis["data_coleta"].str[0:2] - ) - precos_combustiveis["Produto"] = precos_combustiveis["Produto"].str.lower() - precos_combustiveis["ano"] = precos_combustiveis["data_coleta"].str[0:4] - precos_combustiveis["ano"].replace("nan", "", inplace=True) - precos_combustiveis.rename(columns=anatel_constants.RENAME.value, inplace=True) - precos_combustiveis = precos_combustiveis[anatel_constants.ORDEM.value] - precos_combustiveis["ano"] = precos_combustiveis["ano"].apply( - lambda x: str(x).replace(".0", "") - ) - precos_combustiveis["cep_revenda"] = precos_combustiveis["cep_revenda"].apply( - lambda x: str(x).replace("-", "") - ) - precos_combustiveis["unidade_medida"] = precos_combustiveis["unidade_medida"].map( - {"R$ / litro": "R$/litro", "R$ / m³": "R$/m3", "R$ / 13 kg": "R$/13kg"} - ) - precos_combustiveis["nome_estabelecimento"] = precos_combustiveis[ - "nome_estabelecimento" - ].apply(lambda x: str(x).replace(",", "")) - precos_combustiveis["preco_compra"] = precos_combustiveis["preco_compra"].apply( - lambda x: str(x).replace(",", ".") - ) - 
precos_combustiveis["preco_venda"] = precos_combustiveis["preco_venda"].apply( - lambda x: str(x).replace(",", ".") - ) - precos_combustiveis["preco_venda"] = precos_combustiveis["preco_venda"].replace( - "nan", "" - ) - precos_combustiveis["preco_compra"] = precos_combustiveis["preco_compra"].replace( - "nan", "" + df = get_id_municipio(id_municipio=precos_combustiveis) + + df = merge_table_id_municipio( + id_municipio=df, pd_precos_combustiveis=precos_combustiveis ) - precos_combustiveis.replace(np.nan, "", inplace=True) - log("----" * 150) - log("Dados tratados com sucesso") - log("----" * 150) - log("Iniciando particionamento dos dados") - log("----" * 150) - log(precos_combustiveis["data_coleta"].unique()) - - return precos_combustiveis + + df = rename_and_to_create_endereco(precos_combustiveis=df) + + df = orderning_data_coleta(precos_combustiveis=df) + + df = lower_colunm_produto(precos_combustiveis=df) + + df = creating_column_ano(precos_combustiveis=df) + + df = rename_and_reordening(precos_combustiveis=df) + + df = rename_columns(precos_combustiveis=df) + + return df @task( diff --git a/pipelines/datasets/br_anp_precos_combustiveis/utils.py b/pipelines/datasets/br_anp_precos_combustiveis/utils.py index 8f8bc3d3b..38cf5eef1 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/utils.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/utils.py @@ -9,6 +9,10 @@ import requests from pipelines.utils.utils import log from datetime import datetime +from pipelines.datasets.br_anp_precos_combustiveis.constants import ( + constants as anatel_constants, +) +import numpy as np def download_files(urls, path): @@ -28,10 +32,8 @@ def download_files(urls, path): for url in urls: response = requests.get(url) if response.status_code == 200: - file_name = url.split("/")[-1] # Get the filename from the URL + file_name = url.split("/")[-1] file_path = os.path.join(path, file_name) - # log("----" * 150) - # log("if response.status_code == 200: SUCESSO") with open(file_path, "wb") as file: file.write(response.content) @@ -46,7 +48,7 @@ def download_files(urls, path): return downloaded_files -def get_id_municipio(): +def get_id_municipio(id_municipio: pd.DataFrame): # ! 
Carregando os dados direto do Diretório de municipio da BD # Para carregar o dado direto no pandas log("Carregando dados do diretório de municípios da BD") @@ -80,21 +82,18 @@ def open_csvs(url_diesel_gnv, url_gasolina_etanol, url_glp): log("Abrindo os arquivos csvs") diesel = pd.read_csv(f"{url_diesel_gnv}", sep=";", encoding="utf-8") log("----" * 150) - # log("Abrindo os arquivos csvs diesel") log("----" * 150) log(diesel["Data da Coleta"].unique()) gasolina = pd.read_csv(f"{url_gasolina_etanol}", sep=";", encoding="utf-8") log("----" * 150) log("Abrindo os arquivos csvs gasolina") log("----" * 150) - # log(gasolina["Data da Coleta"].unique()) glp = pd.read_csv(f"{url_glp}", sep=";", encoding="utf-8") log("Abrindo os arquivos csvs glp") log("----" * 150) # log(glp["Data da Coleta"].unique()) data_frames.extend([diesel, gasolina, glp]) precos_combustiveis = pd.concat(data_frames, ignore_index=True) - # log(precos_combustiveis["Data da Coleta"].unique()) log("----" * 150) log("Dados concatenados com sucesso") log("----" * 150) @@ -115,21 +114,111 @@ def partition_data(df: pd.DataFrame, column_name: list[str], output_directory: s for value in unique_values: value_str = str(value)[:10] date_value = datetime.strptime(value_str, "%Y-%m-%d").date() - # log(date_value) formatted_value = date_value.strftime("%Y-%m-%d") - # log(formatted_value) partition_path = os.path.join( output_directory, f"{column_name}={formatted_value}" ) - # log(f"Salvando dados em {partition_path}") if not os.path.exists(partition_path): os.makedirs(partition_path) df_partition = df[df[column_name] == value].copy() df_partition.drop([column_name], axis=1, inplace=True) - # log(f"df_partition: {df_partition}") csv_path = os.path.join(partition_path, "data.csv") df_partition.to_csv(csv_path, index=False, encoding="utf-8", na_rep="") log(f"Arquivo {csv_path} salvo com sucesso!") + + +def merge_table_id_municipio( + id_municipio: pd.DataFrame, pd_precos_combustiveis: pd.DataFrame +): + log("Iniciando tratamento dos dados precos_combustiveis") + precos_combustiveis = pd.merge( + id_municipio, + pd_precos_combustiveis, + how="right", + left_on=["nome", "sigla_uf"], + right_on=["Municipio", "Estado - Sigla"], + ) + log("----" * 150) + log("Dados mergeados") + + return precos_combustiveis + + +def rename_and_to_create_endereco(precos_combustiveis: pd.DataFrame): + precos_combustiveis["endereco_revenda"] = ( + precos_combustiveis["Nome da Rua"].fillna("") + + "," + + " " + + precos_combustiveis["Numero Rua"].fillna("") + + "," + + " " + + precos_combustiveis["Complemento"].fillna("") + ) + precos_combustiveis.drop(columns=["sigla_uf"], inplace=True) + precos_combustiveis.rename(columns={"Data da Coleta": "data_coleta"}, inplace=True) + + return precos_combustiveis + + +def orderning_data_coleta(precos_combustiveis: pd.DataFrame): + precos_combustiveis["data_coleta"] = ( + precos_combustiveis["data_coleta"].str[6:10] + + "-" + + precos_combustiveis["data_coleta"].str[3:5] + + "-" + + precos_combustiveis["data_coleta"].str[0:2] + ) + + return precos_combustiveis + + +def lower_colunm_produto(precos_combustiveis: pd.DataFrame): + precos_combustiveis["Produto"] = precos_combustiveis["Produto"].str.lower() + return precos_combustiveis + + +def creating_column_ano(precos_combustiveis: pd.DataFrame): + precos_combustiveis["ano"] = precos_combustiveis["data_coleta"].str[0:4] + precos_combustiveis["ano"].replace("nan", "", inplace=True) + + return precos_combustiveis + + +def rename_and_reordening(precos_combustiveis: pd.DataFrame): + 
precos_combustiveis.rename(columns=anatel_constants.RENAME.value, inplace=True) + precos_combustiveis = precos_combustiveis[anatel_constants.ORDEM.value] + + return precos_combustiveis + + +def rename_columns(precos_combustiveis: pd.DataFrame): + precos_combustiveis["ano"] = precos_combustiveis["ano"].apply( + lambda x: str(x).replace(".0", "") + ) + precos_combustiveis["cep_revenda"] = precos_combustiveis["cep_revenda"].apply( + lambda x: str(x).replace("-", "") + ) + precos_combustiveis["unidade_medida"] = precos_combustiveis["unidade_medida"].map( + {"R$ / litro": "R$/litro", "R$ / m³": "R$/m3", "R$ / 13 kg": "R$/13kg"} + ) + precos_combustiveis["nome_estabelecimento"] = precos_combustiveis[ + "nome_estabelecimento" + ].apply(lambda x: str(x).replace(",", "")) + precos_combustiveis["preco_compra"] = precos_combustiveis["preco_compra"].apply( + lambda x: str(x).replace(",", ".") + ) + precos_combustiveis["preco_venda"] = precos_combustiveis["preco_venda"].apply( + lambda x: str(x).replace(",", ".") + ) + precos_combustiveis["preco_venda"] = precos_combustiveis["preco_venda"].replace( + "nan", "" + ) + precos_combustiveis["preco_compra"] = precos_combustiveis["preco_compra"].replace( + "nan", "" + ) + precos_combustiveis.replace(np.nan, "", inplace=True) + + return precos_combustiveis diff --git a/pipelines/utils/utils.py b/pipelines/utils/utils.py index 9d391ab7d..95f434e1c 100644 --- a/pipelines/utils/utils.py +++ b/pipelines/utils/utils.py @@ -670,7 +670,9 @@ def extract_last_update( raise -def extract_last_date(dataset_id, table_id, date_format: str, billing_project_id: str): +def extract_last_date( + dataset_id, table_id, date_format: str, billing_project_id: str, data +): """ Extracts the last update date of a given dataset table. @@ -717,7 +719,7 @@ def extract_last_date(dataset_id, table_id, date_format: str, billing_project_id try: query_bd = f""" SELECT - MAX(data) as max_date + MAX({data}) as max_date FROM `basedosdados.{dataset_id}.{table_id}` """ From 6a96e080b579df74cb879ae4f82640685cf9f084 Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Fri, 15 Sep 2023 11:11:07 -0300 Subject: [PATCH 05/43] =?UTF-8?q?insere=20modifica=C3=A7=C3=A3o=20na=20tas?= =?UTF-8?q?k=20update=5Fdjango=5Fmetadata?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pipelines/utils/metadata/tasks.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pipelines/utils/metadata/tasks.py b/pipelines/utils/metadata/tasks.py index 72e22cccd..40e99a8b0 100644 --- a/pipelines/utils/metadata/tasks.py +++ b/pipelines/utils/metadata/tasks.py @@ -107,8 +107,9 @@ def update_django_metadata( "weeks": "weeks", "days": "days", } - if not isinstance(_last_date, str): - raise ValueError("O parâmetro `last_date` deve ser do tipo string") + + if not isinstance(_last_date, str) and _last_date is not None: + raise ValueError("O parâmetro `last_date` deve ser uma string não nula") if time_unit not in unidades_permitidas: raise ValueError( From fe395dffda72d7225ec7684ad726eac205963193 Mon Sep 17 00:00:00 2001 From: tricktx Date: Fri, 15 Sep 2023 11:19:45 -0300 Subject: [PATCH 06/43] url --- pipelines/datasets/br_anp_precos_combustiveis/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py index 3bbf18423..23c611e81 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py +++ 
b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py @@ -37,7 +37,7 @@ def check_for_updates(dataset_id, table_id): """ # Obtém a data mais recente do site download_files(anatel_constants.URLS_DATA.value, anatel_constants.PATH_INPUT.value) - df = pd.read_csv(anatel_constants.URL_DIESEL_GNV.value, sep=";", encoding="utf-8") + df = pd.read_csv(anatel_constants.URL_GLP.value, sep=";", encoding="utf-8") data_obj = df["Data da Coleta"].max() data_obj = datetime.strptime(data_obj, "%d/%m/%Y").strftime("%Y-%m-%d") data_obj = datetime.strptime(data_obj, "%Y-%m-%d").date() From 58cff2244cf2bb732a6f2a7adb39b68e1e4352f5 Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Fri, 15 Sep 2023 11:26:03 -0300 Subject: [PATCH 07/43] =?UTF-8?q?corrige=20flow=20e=20setta=20hor=C3=A1rio?= =?UTF-8?q?=20da=20schedule=20de=20profissional?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pipelines/datasets/br_ms_cnes/flows.py | 17 ----------------- pipelines/datasets/br_ms_cnes/schedules.py | 2 +- 2 files changed, 1 insertion(+), 18 deletions(-) diff --git a/pipelines/datasets/br_ms_cnes/flows.py b/pipelines/datasets/br_ms_cnes/flows.py index 2e474ddfa..d76b9dd82 100644 --- a/pipelines/datasets/br_ms_cnes/flows.py +++ b/pipelines/datasets/br_ms_cnes/flows.py @@ -398,23 +398,6 @@ "Os dados do FTP CNES-ST ainda não foram atualizados para o ano/mes mais recente" ) - with case(update_metadata, True): - update = update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_last_update=False, - bq_table_last_year_month=True, - api_mode="prod", - billing_project_id="basedosdados", - date_format="yy-mm", - is_bd_pro=True, - is_free=True, - time_delta=6, - time_unit="months", - upstream_tasks=[files_path], - ) - with case(is_empty(files_path), False): dbc_files = access_ftp_donwload_files( file_list=files_path, diff --git a/pipelines/datasets/br_ms_cnes/schedules.py b/pipelines/datasets/br_ms_cnes/schedules.py index 0f96ad724..e18d9d2ad 100644 --- a/pipelines/datasets/br_ms_cnes/schedules.py +++ b/pipelines/datasets/br_ms_cnes/schedules.py @@ -35,7 +35,7 @@ schedule_br_ms_cnes_profissional = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="30 6 * * *", # every day at 18:30 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, From 08c54cb3f523fe173bf82016a53e98b134166f37 Mon Sep 17 00:00:00 2001 From: tricktx Date: Fri, 15 Sep 2023 11:32:27 -0300 Subject: [PATCH 08/43] tupla in download_files --- pipelines/datasets/br_anp_precos_combustiveis/tasks.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py index 23c611e81..77755e3f9 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py @@ -63,12 +63,7 @@ def check_for_updates(dataset_id, table_id): retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), ) def download_and_transform(): - download_files( - [ - anatel_constants.URLS.value, - ], - anatel_constants.PATH_INPUT.value, - ) + download_files(anatel_constants.URLS.value, anatel_constants.PATH_INPUT.value) precos_combustiveis = open_csvs( anatel_constants.URL_DIESEL_GNV.value, From 7d4288c8aa2a81d1126de4d10173501d542fb0f3 Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Fri, 15 Sep 2023 11:46:38 -0300 Subject: [PATCH 09/43] testa update_metadata novamente --- 
pipelines/datasets/br_rf_cafir/flows.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py index 34f9a5a42..d97924e80 100644 --- a/pipelines/datasets/br_rf_cafir/flows.py +++ b/pipelines/datasets/br_rf_cafir/flows.py @@ -57,6 +57,22 @@ dataset_id=dataset_id, table_id=table_id, data=info[0], upstream_tasks=[info] ) + with case(update_metadata, True): + update = update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + _last_date=info[0], + bq_last_update=False, + api_mode="prod", + date_format="yy-mm-dd", + is_bd_pro=True, + is_free=True, + time_delta=6, + time_unit="months", + upstream_tasks=[info], + ) + with case(is_outdated, False): log_task(f"Não há atualizações para a tabela de {table_id}!") From baafc713d7d3160004c9a203514872f8e21f287d Mon Sep 17 00:00:00 2001 From: tricktx Date: Fri, 15 Sep 2023 11:57:27 -0300 Subject: [PATCH 10/43] =?UTF-8?q?ajustando=20schedules=20para=20todos=20os?= =?UTF-8?q?=20dias=2010=20da=20manh=C3=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../datasets/br_anp_precos_combustiveis/schedules.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pipelines/datasets/br_anp_precos_combustiveis/schedules.py b/pipelines/datasets/br_anp_precos_combustiveis/schedules.py index 46ee4ecb2..9c660ab04 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/schedules.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/schedules.py @@ -5,20 +5,20 @@ from datetime import timedelta, datetime from prefect.schedules import Schedule -from prefect.schedules.clocks import IntervalClock +from prefect.schedules.clocks import CronClock from pipelines.constants import constants every_week_anp_microdados = Schedule( clocks=[ - IntervalClock( - interval=timedelta(days=1), + CronClock( + cron="0 10 * * *", start_date=datetime(2021, 1, 1), labels=[constants.BASEDOSDADOS_PROD_AGENT_LABEL.value], parameter_defaults={ "update_metadata": True, "dbt_alias": True, "materialize_after_dump": True, - "materialization_mode": "dev", + "materialization_mode": "prod", "table_id": "microdados", "dataset_id": "br_anp_precos_combustiveis", }, From f29ff7578f015172ca81556dc49a400d6ab68fee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Fri, 15 Sep 2023 12:00:43 -0300 Subject: [PATCH 11/43] feat: installing readr package --- pipelines/datasets/br_cvm_fi/tasks.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/pipelines/datasets/br_cvm_fi/tasks.py b/pipelines/datasets/br_cvm_fi/tasks.py index c5ccc817f..1a51ae2e9 100644 --- a/pipelines/datasets/br_cvm_fi/tasks.py +++ b/pipelines/datasets/br_cvm_fi/tasks.py @@ -16,6 +16,7 @@ from rpy2.robjects.packages import importr import rpy2.robjects.packages as rpackages import rpy2.robjects as ro +from rpy2.robjects.vectors import StrVector from rpy2.robjects import pandas2ri from pipelines.datasets.br_cvm_fi.utils import ( sheet_to_df, @@ -391,11 +392,25 @@ def clean_data_make_partitions_perfil(diretorio, table_id): df_final = pd.DataFrame() arquivos = glob.glob(f"{diretorio}*.csv") + # import R's utility package + utils = rpackages.importr("utils") + + # select a mirror for R packages + utils.chooseCRANmirror(ind=1) + # R package names + packnames = "readr" + + # R vector of strings + names_to_install = [x for x in packnames if not rpackages.isinstalled(x)] + if len(names_to_install) > 0: + 
utils.install_packages(StrVector(names_to_install)) + # Import readr + + readr = rpackages.importr("readr") for file in tqdm(arquivos): log(f"Baixando o arquivo ------> {file}") ## reading with R - readr = rpackages.importr("readr") df_r = readr.read_delim( file, delim=";", locale=readr.locale(encoding="ISO-8859-1") ) From 8a1699ea02d0ece0658bc2d7e004e2ea213cab41 Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Fri, 15 Sep 2023 12:26:27 -0300 Subject: [PATCH 12/43] registar o flow --- pipelines/datasets/br_rf_cafir/flows.py | 2 +- pipelines/datasets/br_rf_cafir/tasks.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py index d97924e80..c0d28369d 100644 --- a/pipelines/datasets/br_rf_cafir/flows.py +++ b/pipelines/datasets/br_rf_cafir/flows.py @@ -70,7 +70,7 @@ is_free=True, time_delta=6, time_unit="months", - upstream_tasks=[info], + upstream_tasks=[is_outdated, info], ) with case(is_outdated, False): diff --git a/pipelines/datasets/br_rf_cafir/tasks.py b/pipelines/datasets/br_rf_cafir/tasks.py index 8358c5918..5beaa5d8f 100644 --- a/pipelines/datasets/br_rf_cafir/tasks.py +++ b/pipelines/datasets/br_rf_cafir/tasks.py @@ -155,8 +155,7 @@ def parse_data(url: str, other_task_output: tuple[list[datetime], list[str]]) -> # save new file as csv df.to_csv(save_path, index=False, sep=",", na_rep="", encoding="utf-8") - # resolve ASCII 0 no momento da leitura do BQ - # ler e salvar de novo + # resolve ASCII 0 no momento da leitura do BQ. Ler e salvar de novo. df = pd.read_csv(save_path, dtype=str) df.to_csv(save_path, index=False, sep=",", na_rep="", encoding="utf-8") From 7bb843c93be06fc0e5cec9dd3348807fc7d05a33 Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Fri, 15 Sep 2023 12:59:11 -0300 Subject: [PATCH 13/43] cria task que converte datetime pra str --- pipelines/datasets/br_rf_cafir/flows.py | 4 +++- pipelines/datasets/br_rf_cafir/tasks.py | 13 +++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py index c0d28369d..73262f2ec 100644 --- a/pipelines/datasets/br_rf_cafir/flows.py +++ b/pipelines/datasets/br_rf_cafir/flows.py @@ -16,6 +16,7 @@ parse_files_parse_date, parse_data, check_if_bq_data_is_outdated, + convert_datetime_to_string, ) from pipelines.utils.constants import constants as utils_constants @@ -56,13 +57,14 @@ is_outdated = check_if_bq_data_is_outdated( dataset_id=dataset_id, table_id=table_id, data=info[0], upstream_tasks=[info] ) + update_metadata_strig_date = convert_datetime_to_string(data=info[0]) with case(update_metadata, True): update = update_django_metadata( dataset_id, table_id, metadata_type="DateTimeRange", - _last_date=info[0], + _last_date=update_metadata_strig_date, bq_last_update=False, api_mode="prod", date_format="yy-mm-dd", diff --git a/pipelines/datasets/br_rf_cafir/tasks.py b/pipelines/datasets/br_rf_cafir/tasks.py index 5beaa5d8f..8c4b9b5b9 100644 --- a/pipelines/datasets/br_rf_cafir/tasks.py +++ b/pipelines/datasets/br_rf_cafir/tasks.py @@ -178,3 +178,16 @@ def parse_data(url: str, other_task_output: tuple[list[datetime], list[str]]) -> ) return files_path + + +@task +def convert_datetime_to_string(data: datetime): + """Converte a data para string + + Args: + date (datetime): Data + + Returns: + string: Data no formato string + """ + return str(data) From 88110e3b8face52c1a265d8c2e2cb9ead02448ee Mon Sep 17 00:00:00 2001 From: tricktx Date: Fri, 15 Sep 
2023 13:01:14 -0300 Subject: [PATCH 14/43] default data --- pipelines/utils/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/utils/utils.py b/pipelines/utils/utils.py index 3282dc643..57dc2758e 100644 --- a/pipelines/utils/utils.py +++ b/pipelines/utils/utils.py @@ -726,7 +726,7 @@ def extract_last_update( def extract_last_date( - dataset_id, table_id, date_format: str, billing_project_id: str, data + dataset_id, table_id, date_format: str, billing_project_id: str, data: str = "data" ): """ Extracts the last update date of a given dataset table. From 2a189bdf4f55b68842ae694c6ac42b2316d484fe Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Fri, 15 Sep 2023 13:34:38 -0300 Subject: [PATCH 15/43] reordena upstream tasks --- pipelines/datasets/br_rf_cafir/flows.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py index 73262f2ec..8303a07f4 100644 --- a/pipelines/datasets/br_rf_cafir/flows.py +++ b/pipelines/datasets/br_rf_cafir/flows.py @@ -57,7 +57,9 @@ is_outdated = check_if_bq_data_is_outdated( dataset_id=dataset_id, table_id=table_id, data=info[0], upstream_tasks=[info] ) - update_metadata_strig_date = convert_datetime_to_string(data=info[0]) + update_metadata_strig_date = convert_datetime_to_string( + data=info[0], upstream_tasks=[info, is_outdated] + ) with case(update_metadata, True): update = update_django_metadata( @@ -72,7 +74,7 @@ is_free=True, time_delta=6, time_unit="months", - upstream_tasks=[is_outdated, info], + upstream_tasks=[update_metadata_strig_date], ) with case(is_outdated, False): From a976d640c3a847f89e76113cf083248a920bfdcb Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Fri, 15 Sep 2023 14:39:49 -0300 Subject: [PATCH 16/43] =?UTF-8?q?tira=20updater=20do=20coverage=20gr=C3=A1?= =?UTF-8?q?tis?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pipelines/datasets/br_rf_cafir/flows.py | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py index 8303a07f4..fbbbadda0 100644 --- a/pipelines/datasets/br_rf_cafir/flows.py +++ b/pipelines/datasets/br_rf_cafir/flows.py @@ -61,22 +61,6 @@ data=info[0], upstream_tasks=[info, is_outdated] ) - with case(update_metadata, True): - update = update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - _last_date=update_metadata_strig_date, - bq_last_update=False, - api_mode="prod", - date_format="yy-mm-dd", - is_bd_pro=True, - is_free=True, - time_delta=6, - time_unit="months", - upstream_tasks=[update_metadata_strig_date], - ) - with case(is_outdated, False): log_task(f"Não há atualizações para a tabela de {table_id}!") @@ -126,6 +110,10 @@ wait_for_materialization.retry_delay = timedelta( seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) + # TODO: Quando a nova fotografia for liberada setar is_free como True + # is_free como true. Não setei agora pq a task update_django_metadata depende + # de um coverage já criado na API. Como a lag entre fotográfias é de 5 meses (6 é o padrão no monento) + # não há necessidade de atualizar o coverage agora. 
with case(update_metadata, True): update = update_django_metadata( From 8a2a7d4f8a6237784601e772b36244272d69ac3c Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Fri, 15 Sep 2023 14:48:43 -0300 Subject: [PATCH 17/43] =?UTF-8?q?insere=20novo=20input=20na=20fun=C3=A7?= =?UTF-8?q?=C3=A3o=20update=5Fdjango=5Fmetadata?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pipelines/datasets/br_rf_cafir/flows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py index fbbbadda0..63b9fdb60 100644 --- a/pipelines/datasets/br_rf_cafir/flows.py +++ b/pipelines/datasets/br_rf_cafir/flows.py @@ -120,7 +120,7 @@ dataset_id, table_id, metadata_type="DateTimeRange", - _last_date=info[0], + _last_date=update_metadata_strig_date, bq_last_update=False, api_mode="prod", date_format="yy-mm-dd", From 7158c4232dee7326021faf06ad1baa20ad7064a8 Mon Sep 17 00:00:00 2001 From: Pedro Castro Date: Fri, 15 Sep 2023 15:37:19 -0300 Subject: [PATCH 18/43] br_mp_pep_cargos_funcoes: update table name --- pipelines/datasets/br_mp_pep_cargos_funcoes/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/datasets/br_mp_pep_cargos_funcoes/tasks.py b/pipelines/datasets/br_mp_pep_cargos_funcoes/tasks.py index 1acab03e5..b5a93c810 100644 --- a/pipelines/datasets/br_mp_pep_cargos_funcoes/tasks.py +++ b/pipelines/datasets/br_mp_pep_cargos_funcoes/tasks.py @@ -400,7 +400,7 @@ def is_up_to_date() -> bool: log(f"Last date website: {text}, parsed as {date_website}") last_date_in_bq = extract_last_date( - "br_mp_pep", "cargos_funcoes_atualizado", "yy-mm", "basedosdados" + "br_mp_pep", "cargos_funcoes", "yy-mm", "basedosdados" ) date_in_bq = datetime.datetime.strptime(last_date_in_bq, "%Y-%m") From d1adcae6dce008feccb18b2b107355faf94b667e Mon Sep 17 00:00:00 2001 From: tricktx Date: Mon, 18 Sep 2023 09:34:23 -0300 Subject: [PATCH 19/43] dev --- pipelines/datasets/br_anp_precos_combustiveis/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/datasets/br_anp_precos_combustiveis/utils.py b/pipelines/datasets/br_anp_precos_combustiveis/utils.py index 38cf5eef1..69f821e87 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/utils.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/utils.py @@ -55,7 +55,7 @@ def get_id_municipio(id_municipio: pd.DataFrame): id_municipio = bd.read_table( dataset_id="br_bd_diretorios_brasil", table_id="municipio", - billing_project_id="basedosdados", + billing_project_id="basedosdados-dev", from_file=True, ) log("----" * 150) From e729bae174a65ce9be69923c4b21f9b456d3b924 Mon Sep 17 00:00:00 2001 From: tricktx Date: Mon, 18 Sep 2023 10:02:43 -0300 Subject: [PATCH 20/43] uretirando os dev --- pipelines/datasets/br_anp_precos_combustiveis/flows.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pipelines/datasets/br_anp_precos_combustiveis/flows.py b/pipelines/datasets/br_anp_precos_combustiveis/flows.py index eb156573d..ec824870b 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/flows.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/flows.py @@ -69,7 +69,6 @@ with case(dados_desatualizados, True): df = download_and_transform(upstream_tasks=[rename_flow_run]) output_path = make_partitions(df=df, upstream_tasks=[df]) - get_date_max_mais = data_max_bd_mais() get_date_max_pro = data_max_bd_pro(df=df) # pylint: disable=C0103 @@ -118,7 +117,7 @@ bq_last_update=False, 
api_mode="prod", date_format="yy-mm-dd", - _last_date=get_date_max_mais, + _last_date=get_date_max_pro, upstream_tasks=[wait_upload_table], ) From f05ec03ec7ca3e64fe6a7b6744daa978da574fe4 Mon Sep 17 00:00:00 2001 From: tricktx Date: Mon, 18 Sep 2023 10:28:49 -0300 Subject: [PATCH 21/43] retirando os dev --- pipelines/datasets/br_anp_precos_combustiveis/tasks.py | 2 +- pipelines/datasets/br_anp_precos_combustiveis/utils.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py index 77755e3f9..48c911278 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py @@ -44,7 +44,7 @@ def check_for_updates(dataset_id, table_id): # Obtém a última data no site BD data_bq_obj = extract_last_date( - dataset_id, table_id, "yy-mm-dd", "basedosdados-dev", data="data_coleta" + dataset_id, table_id, "yy-mm-dd", "basedosdados", data="data_coleta" ) # Registra a data mais recente do site diff --git a/pipelines/datasets/br_anp_precos_combustiveis/utils.py b/pipelines/datasets/br_anp_precos_combustiveis/utils.py index 69f821e87..38cf5eef1 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/utils.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/utils.py @@ -55,7 +55,7 @@ def get_id_municipio(id_municipio: pd.DataFrame): id_municipio = bd.read_table( dataset_id="br_bd_diretorios_brasil", table_id="municipio", - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", from_file=True, ) log("----" * 150) From 1d5e08c02261b700e81125694920e142d2760a03 Mon Sep 17 00:00:00 2001 From: Laura Amaral Date: Tue, 19 Sep 2023 09:32:21 -0300 Subject: [PATCH 22/43] remove br_sp_dieese_icv schedules --- pipelines/datasets/br_sp_saopaulo_dieese_icv/flows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/datasets/br_sp_saopaulo_dieese_icv/flows.py b/pipelines/datasets/br_sp_saopaulo_dieese_icv/flows.py index 4a43ad439..55ec12014 100644 --- a/pipelines/datasets/br_sp_saopaulo_dieese_icv/flows.py +++ b/pipelines/datasets/br_sp_saopaulo_dieese_icv/flows.py @@ -96,4 +96,4 @@ br_sp_dieese.storage = GCS(constants.GCS_FLOWS_BUCKET.value) br_sp_dieese.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) -br_sp_dieese.schedule = every_month +# br_sp_dieese.schedule = every_month From 762884ae525e4b7b0719d0302e4f398d0da9c2bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Tue, 19 Sep 2023 16:18:59 -0300 Subject: [PATCH 23/43] fix: fixing cda flow --- pipelines/datasets/br_cvm_fi/constants.py | 2 ++ pipelines/datasets/br_cvm_fi/tasks.py | 11 ++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/pipelines/datasets/br_cvm_fi/constants.py b/pipelines/datasets/br_cvm_fi/constants.py index 88ce77ca4..edf8391c8 100644 --- a/pipelines/datasets/br_cvm_fi/constants.py +++ b/pipelines/datasets/br_cvm_fi/constants.py @@ -558,3 +558,5 @@ class constants(Enum): # pylint: disable=c0103 URL_BALANCETE = "https://dados.cvm.gov.br/dados/FI/DOC/BALANCETE/DADOS/" ARQUITETURA_URL_BALANCETE = "https://docs.google.com/spreadsheets/d/1eIMo_hYHy89oh6kHRN9Kh0NytUZzr8__/edit#gid=1045172528" + + ARQUITETURA_URL_CDA = "https://docs.google.com/spreadsheets/d/1V2XHBXBB_biC0cLoMZ3FxtbC7CPLxQXZhIY7iJDtsSw/edit#gid=0" diff --git a/pipelines/datasets/br_cvm_fi/tasks.py b/pipelines/datasets/br_cvm_fi/tasks.py index 1a51ae2e9..303efc2f7 100644 --- 
a/pipelines/datasets/br_cvm_fi/tasks.py +++ b/pipelines/datasets/br_cvm_fi/tasks.py @@ -242,7 +242,7 @@ def clean_data_and_make_partitions(path: str, table_id: str) -> str: @task def clean_data_make_partitions_cda(diretorio, table_id): - df_arq = sheet_to_df(cvm_constants.ARQUITETURA_URL.value) + df_arq = sheet_to_df(cvm_constants.ARQUITETURA_URL_CDA.value) anos_meses = obter_anos_meses(diretorio) for i in anos_meses: @@ -275,10 +275,13 @@ def clean_data_make_partitions_cda(diretorio, table_id): df_final[cvm_constants.COLUNAS.value] = df_final[ cvm_constants.COLUNAS.value ].applymap(lambda x: cvm_constants.MAPEAMENTO.value.get(x, x)) + df_final["CNPJ_FUNDO"] = df_final["CNPJ_FUNDO"].str.replace(r"[/.-]", "") + df_final["CNPJ_INSTITUICAO_FINANC_COOBR"] = df_final[ "CNPJ_INSTITUICAO_FINANC_COOBR" ].str.replace(r"[/.-]", "") + df_final["CPF_CNPJ_EMISSOR"] = df_final["CPF_CNPJ_EMISSOR"].str.replace( r"[/.-]", "" ) @@ -288,15 +291,21 @@ def clean_data_make_partitions_cda(diretorio, table_id): ) df_final = rename_columns(df_arq, df_final) df_final = df_final.replace(",", ".", regex=True) + df_final[cvm_constants.COLUNAS_ASCI.value] = df_final[ cvm_constants.COLUNAS_ASCI.value ].fillna("") + df_final[cvm_constants.COLUNAS_ASCI.value] = df_final[ cvm_constants.COLUNAS_ASCI.value ].applymap(limpar_string) + df_final = df_final[cvm_constants.COLUNAS_TOTAIS.value] + log(f"Fazendo partições para o ano ------> {i}") + os.makedirs(f"/tmp/data/br_cvm_fi/{table_id}/output/", exist_ok=True) + to_partitions( df_final, partition_columns=["ano", "mes"], From ee5b4e0bd0346ae859f4d00d30d278f6e933ac15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Tue, 19 Sep 2023 16:58:55 -0300 Subject: [PATCH 24/43] feat: testing perfil mensal flow --- pipelines/datasets/br_cvm_fi/flows.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pipelines/datasets/br_cvm_fi/flows.py b/pipelines/datasets/br_cvm_fi/flows.py index 4acaca4ea..55f0c0346 100644 --- a/pipelines/datasets/br_cvm_fi/flows.py +++ b/pipelines/datasets/br_cvm_fi/flows.py @@ -385,9 +385,14 @@ default=cvm_constants.URL_PERFIL_MENSAL.value, required=False, ) + arquivos = Parameter( + "arquivos", + default=["perfil_mensal_fi_202308.csv", "perfil_mensal_fi_202307.csv"], + required=False, + ) df = extract_links_and_dates(url) - arquivos = check_for_updates(df, upstream_tasks=[df]) + # arquivos = check_for_updates(df, upstream_tasks=[df]) with case(is_empty(arquivos), True): log_task(f"Não houveram atualizações em {url.default}!") From 475fbabdee91f2beb0b7c85bb6a345ebd3aa0eb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Tue, 19 Sep 2023 17:09:58 -0300 Subject: [PATCH 25/43] feat: adding readr --- pipelines/datasets/br_cvm_fi/tasks.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pipelines/datasets/br_cvm_fi/tasks.py b/pipelines/datasets/br_cvm_fi/tasks.py index 303efc2f7..491c17f17 100644 --- a/pipelines/datasets/br_cvm_fi/tasks.py +++ b/pipelines/datasets/br_cvm_fi/tasks.py @@ -408,11 +408,11 @@ def clean_data_make_partitions_perfil(diretorio, table_id): utils.chooseCRANmirror(ind=1) # R package names packnames = "readr" - + utils.install_packages(packnames) # R vector of strings - names_to_install = [x for x in packnames if not rpackages.isinstalled(x)] - if len(names_to_install) > 0: - utils.install_packages(StrVector(names_to_install)) + # names_to_install = [x for x in packnames if not rpackages.isinstalled(x)] + # if len(names_to_install) > 0: + # Import readr readr = 
rpackages.importr("readr") From 0bc6f93a126d5ed6184424668dbc1f1cc443917d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Tue, 19 Sep 2023 17:29:11 -0300 Subject: [PATCH 26/43] fix: small fix --- pipelines/datasets/br_cvm_fi/flows.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pipelines/datasets/br_cvm_fi/flows.py b/pipelines/datasets/br_cvm_fi/flows.py index 55f0c0346..4acaca4ea 100644 --- a/pipelines/datasets/br_cvm_fi/flows.py +++ b/pipelines/datasets/br_cvm_fi/flows.py @@ -385,14 +385,9 @@ default=cvm_constants.URL_PERFIL_MENSAL.value, required=False, ) - arquivos = Parameter( - "arquivos", - default=["perfil_mensal_fi_202308.csv", "perfil_mensal_fi_202307.csv"], - required=False, - ) df = extract_links_and_dates(url) - # arquivos = check_for_updates(df, upstream_tasks=[df]) + arquivos = check_for_updates(df, upstream_tasks=[df]) with case(is_empty(arquivos), True): log_task(f"Não houveram atualizações em {url.default}!") From c832a85c45a7e430ef14652f33c5a5f37a3222fd Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Wed, 20 Sep 2023 09:06:38 -0300 Subject: [PATCH 27/43] Corrige flow estabelecimento_filantropico --- pipelines/datasets/br_ms_cnes/flows.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pipelines/datasets/br_ms_cnes/flows.py b/pipelines/datasets/br_ms_cnes/flows.py index d76b9dd82..be11de58b 100644 --- a/pipelines/datasets/br_ms_cnes/flows.py +++ b/pipelines/datasets/br_ms_cnes/flows.py @@ -820,7 +820,7 @@ table_id=table_id, billing_project_id="basedosdados", cnes_database="CNES", - cnes_group_file=br_ms_cnes_constants.DATABASE_GROUPS.value["CNES"][6], + cnes_group_file=br_ms_cnes_constants.DATABASE_GROUPS.value["CNES"][7], ) with case(is_empty(files_path), True): @@ -832,13 +832,13 @@ dbc_files = access_ftp_donwload_files( file_list=files_path, path=br_ms_cnes_constants.PATH.value[0], - table=br_ms_cnes_constants.TABLE.value[6], + table=br_ms_cnes_constants.TABLE.value[7], ) filepath = read_dbc_save_csv( file_list=dbc_files, path=br_ms_cnes_constants.PATH.value[1], - table=br_ms_cnes_constants.TABLE.value[6], + table=br_ms_cnes_constants.TABLE.value[7], upstream_tasks=[files_path, dbc_files], ) From 1e49ef904d020210a46c55281969f101a17cb617 Mon Sep 17 00:00:00 2001 From: Gagabrielle-carv Date: Wed, 20 Sep 2023 11:04:59 -0300 Subject: [PATCH 28/43] add dados --- .../constants.py | 68 ++- .../mundo_transfermarkt_competicoes/flows.py | 146 +++-- .../schedules.py | 84 +-- .../mundo_transfermarkt_competicoes/tasks.py | 56 +- .../mundo_transfermarkt_competicoes/utils.py | 498 +++++++++++++++++- 5 files changed, 631 insertions(+), 221 deletions(-) diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/constants.py b/pipelines/datasets/mundo_transfermarkt_competicoes/constants.py index 22f4c9070..59fdc6b2e 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/constants.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/constants.py @@ -2,33 +2,6 @@ """ Constant values for the datasets projects """ - - -############################################################################### -# -# Esse é um arquivo onde podem ser declaratas constantes que serão usadas -# pelo projeto mundo_transfermarkt. -# -# Por ser um arquivo opcional, pode ser removido sem prejuízo ao funcionamento -# do projeto, caos não esteja em uso. 
-# -# Para declarar constantes, basta fazer conforme o exemplo abaixo: -# -# ``` -# class constants(Enum): -# """ -# Constant values for the mundo_transfermarkt_competicoes project -# """ -# FOO = "bar" -# ``` -# -# Para usá-las, basta fazer conforme o exemplo abaixo: -# -# ```py -# from pipelines.datasets.mundo_transfermarkt_competicoes.constants import constants -# print(constants.FOO.value) -# ``` -# ############################################################################### from enum import Enum @@ -81,3 +54,44 @@ class constants(Enum): # pylint: disable=c0103 "chutes_fora_man", "chutes_fora_vis", ] + + ORDEM_COPA_BRASIL = [ + "ano_campeonato", + "data", + "horario", + "fase", + "tipo_fase", + "estadio", + "arbitro", + "publico", + "publico_max", + "time_man", + "time_vis", + "tecnico_man", + "tecnico_vis", + "valor_equipe_titular_man", + "valor_equipe_titular_vis", + "idade_media_titular_man", + "idade_media_titular_vis", + "gols_man", + "gols_vis", + "gols_1_tempo_man", + "gols_1_tempo_vis", + "penalti", + "gols_penalti_man", + "gols_penalti_vis", + "escanteios_man", + "escanteios_vis", + "faltas_man", + "faltas_vis", + "chutes_bola_parada_man", + "chutes_bola_parada_vis", + "defesas_man", + "defesas_vis", + "impedimentos_man", + "impedimentos_vis", + "chutes_man", + "chutes_vis", + "chutes_fora_man", + "chutes_fora_vis", + ] diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py b/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py index 48fcb46ee..b8744b493 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py @@ -2,59 +2,6 @@ """ Flows for mundo_transfermarkt_competicoes """ - -############################################################################### -# -# Aqui é onde devem ser definidos os flows do projeto. -# Cada flow representa uma sequência de passos que serão executados -# em ordem. -# -# Mais informações sobre flows podem ser encontradas na documentação do -# Prefect: https://docs.prefect.io/core/concepts/flows.html -# -# De modo a manter consistência na codebase, todo o código escrito passará -# pelo pylint. Todos os warnings e erros devem ser corrigidos. -# -# Existem diversas maneiras de declarar flows. No entanto, a maneira mais -# conveniente e recomendada pela documentação é usar a API funcional. -# Em essência, isso implica simplesmente na chamada de funções, passando -# os parâmetros necessários para a execução em cada uma delas. -# -# Também, após a definição de um flow, para o adequado funcionamento, é -# mandatório configurar alguns parâmetros dele, os quais são: -# - storage: onde esse flow está armazenado. No caso, o storage é o -# próprio módulo Python que contém o flow. Sendo assim, deve-se -# configurar o storage como o pipelines.datasets -# - run_config: para o caso de execução em cluster Kubernetes, que é -# provavelmente o caso, é necessário configurar o run_config com a -# imagem Docker que será usada para executar o flow. Assim sendo, -# basta usar constants.DOCKER_IMAGE.value, que é automaticamente -# gerado. 
-# - schedule (opcional): para o caso de execução em intervalos regulares, -# deve-se utilizar algum dos schedules definidos em schedules.py -# -# Um exemplo de flow, considerando todos os pontos acima, é o seguinte: -# -# ----------------------------------------------------------------------------- -# from prefect import task -# from prefect import Flow -# from prefect.run_configs import KubernetesRun -# from prefect.storage import GCS -# from pipelines.constants import constants -# from my_tasks import my_task, another_task -# from my_schedules import some_schedule -# -# with Flow("my_flow") as flow: -# a = my_task(param1=1, param2=2) -# b = another_task(a, param3=3) -# -# flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) -# flow.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) -# flow.schedule = some_schedule -# ----------------------------------------------------------------------------- -# -# Abaixo segue um código para exemplificação, que pode ser removido. -# ############################################################################### from pipelines.datasets.mundo_transfermarkt_competicoes.constants import ( constants as mundo_constants, @@ -64,8 +11,14 @@ get_max_data, execucao_coleta_sync, ) -from pipelines.datasets.mundo_transfermarkt_competicoes.utils import execucao_coleta -from pipelines.datasets.mundo_transfermarkt_competicoes.schedules import every_week +from pipelines.datasets.mundo_transfermarkt_competicoes.utils import ( + execucao_coleta, + execucao_coleta_copa, +) +from pipelines.datasets.mundo_transfermarkt_competicoes.schedules import ( + every_week, + every_week_copa, +) from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, rename_current_flow_run_dataset_table, @@ -109,7 +62,7 @@ ) df = execucao_coleta_sync(execucao_coleta) output_filepath = make_partitions(df, upstream_tasks=[df]) - data_maxima = get_max_data() + data_maxima = get_max_data(output_filepath, upstream_tasks=[output_filepath]) wait_upload_table = create_table_and_upload_to_gcs( data_path=output_filepath, @@ -168,3 +121,84 @@ image=constants.DOCKER_IMAGE.value ) transfermarkt_brasileirao_flow.schedule = every_week + +with Flow( + name="mundo_transfermarkt_competicoes.copa_brasil", + code_owners=[ + "Gabs", + ], +) as transfermarkt_copa_flow: + dataset_id = Parameter( + "dataset_id", default="mundo_transfermarkt_competicoes", required=True + ) + table_id = Parameter("table_id", default="copa_brasil", required=True) + materialization_mode = Parameter( + "materialization_mode", default="dev", required=False + ) + materialize_after_dump = Parameter( + "materialize_after_dump", default=True, required=False + ) + dbt_alias = Parameter("dbt_alias", default=True, required=False) + + rename_flow_run = rename_current_flow_run_dataset_table( + prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id + ) + df = execucao_coleta_sync(execucao_coleta_copa) + output_filepath = make_partitions(df, upstream_tasks=[df]) + data_maxima = get_max_data(output_filepath, upstream_tasks=[output_filepath]) + + wait_upload_table = create_table_and_upload_to_gcs( + data_path=output_filepath, + dataset_id=dataset_id, + table_id=table_id, + dump_mode="append", + wait=output_filepath, + ) + + with case(materialize_after_dump, True): + # Trigger DBT flow run + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + 
"dataset_id": dataset_id, + "table_id": table_id, + "mode": materialization_mode, + "dbt_alias": dbt_alias, + }, + labels=current_flow_labels, + run_name=r"Materialize {dataset_id}.{table_id}", + ) + + wait_for_materialization = wait_for_flow_run( + materialization_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_materialization.retry_delay = timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + _last_date=data_maxima, + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=True, + is_free=True, + time_delta=1, + time_unit="year", + date_format="yy-mm-dd", + api_mode="prod", + ) + +transfermarkt_copa_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +transfermarkt_copa_flow.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +transfermarkt_copa_flow.schedule = every_week_copa diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py b/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py index a046af2cb..59b5dbbea 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py @@ -3,70 +3,6 @@ Schedules for mundo_transfermarkt_competicoes """ -############################################################################### -# -# Aqui é onde devem ser definidos os schedules para os flows do projeto. -# Cada schedule indica o intervalo de tempo entre as execuções. -# Um schedule pode ser definido para um ou mais flows. -# Mais informações sobre schedules podem ser encontradas na documentação do -# Prefect: https://docs.prefect.io/core/concepts/schedules.html -# -# De modo a manter consistência na codebase, todo o código escrito passará -# pelo pylint. Todos os warnings e erros devem ser corrigidos. -# -# Os schedules devem ser definidos de acordo com a sintaxe do Prefect, como, -# por exemplo, o seguinte (para executar a cada 1 minuto): -# -# ----------------------------------------------------------------------------- -# from datetime import timedelta, datetime -# from prefect.schedules import Schedule -# from prefect.schedules.clocks import IntervalClock -# from pipelines.constants import constants -# -# minute_schedule = Schedule( -# clocks=[ -# IntervalClock( -# interval=timedelta(minutes=1), -# start_date=datetime(2021, 1, 1), -# labels=[ -# constants.DATASETS_AGENT_LABEL.value, -# ] -# ), -# ] -# ) -# ----------------------------------------------------------------------------- -# -# Vale notar que o parâmetro `labels` é obrigatório e deve ser uma lista com -# apenas um elemento, correspondendo ao label do agente que será executado. -# O label do agente é definido em `constants.py` e deve ter o formato -# `DATASETS_AGENT_LABEL`. 
-# -# Outro exemplo, para executar todos os dias à meia noite, segue abaixo: -# -# ----------------------------------------------------------------------------- -# from prefect import task -# from datetime import timedelta -# import pendulum -# from prefect.schedules import Schedule -# from prefect.schedules.clocks import IntervalClock -# from pipelines.constants import constants -# -# every_day_at_midnight = Schedule( -# clocks=[ -# IntervalClock( -# interval=timedelta(days=1), -# start_date=pendulum.datetime( -# 2021, 1, 1, 0, 0, 0, tz="America/Sao_Paulo"), -# labels=[ -# constants.K8S_AGENT_LABEL.value, -# ] -# ) -# ] -# ) -# ----------------------------------------------------------------------------- -# -# Abaixo segue um código para exemplificação, que pode ser removido. -# ############################################################################### from prefect.schedules.clocks import CronClock @@ -93,3 +29,23 @@ ), ] ) + + +every_week_copa = Schedule( + clocks=[ + CronClock( + cron="0 9 * 2-12 2", + start_date=datetime(2023, 5, 1, 7, 30), + labels=[ + constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, + ], + parameter_defaults={ + "dataset_id": "mundo_transfermarkt_competicoes", + "table_id": "copa_brasil", + "materialization_mode": "prod", + "materialize_after_dump": True, + "dbt_alias": False, + }, + ), + ] +) diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py b/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py index fdc0745cc..bece486bf 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py @@ -3,50 +3,6 @@ Tasks for mundo_transfermarkt_competicoes """ -############################################################################### -# -# Aqui é onde devem ser definidas as tasks para os flows do projeto. -# Cada task representa um passo da pipeline. Não é estritamente necessário -# tratar todas as exceções que podem ocorrer durante a execução de uma task, -# mas é recomendável, ainda que não vá implicar em uma quebra no sistema. -# Mais informações sobre tasks podem ser encontradas na documentação do -# Prefect: https://docs.prefect.io/core/concepts/tasks.html -# -# De modo a manter consistência na codebase, todo o código escrito passará -# pelo pylint. Todos os warnings e erros devem ser corrigidos. -# -# As tasks devem ser definidas como funções comuns ao Python, com o decorador -# @task acima. É recomendado inserir type hints para as variáveis. -# -# Um exemplo de task é o seguinte: -# -# ----------------------------------------------------------------------------- -# from prefect import task -# -# @task -# def my_task(param1: str, param2: int) -> str: -# """ -# My task description. -# """ -# return f'{param1} {param2}' -# ----------------------------------------------------------------------------- -# -# Você também pode usar pacotes Python arbitrários, como numpy, pandas, etc. -# -# ----------------------------------------------------------------------------- -# from prefect import task -# import numpy as np -# -# @task -# def my_task(a: np.ndarray, b: np.ndarray) -> str: -# """ -# My task description. -# """ -# return np.add(a, b) -# ----------------------------------------------------------------------------- -# -# Abaixo segue um código para exemplificação, que pode ser removido. 
-# ############################################################################### from pipelines.datasets.mundo_transfermarkt_competicoes.constants import ( constants as mundo_constants, @@ -82,9 +38,11 @@ def make_partitions(df): @task -def get_max_data(): - # ano = mundo_constants.DATA_ATUAL_ANO.value - # df = pd.read_csv(f"{file_path}ano_campeonato={ano}/data.csv") - # df["data"] = pd.to_datetime(df["data"]).dt.date - max_data = mundo_constants.DATA_ATUAL.value +def get_max_data(file_path): + ano = mundo_constants.DATA_ATUAL_ANO.value + df = pd.read_csv(f"{file_path}ano_campeonato={ano}/data.csv") + df["data"] = pd.to_datetime(df["data"]).dt.date + max_data = df["data"].max() + + # max_data = mundo_constants.DATA_ATUAL.value return max_data diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py b/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py index e8209189d..cf1be24f2 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py @@ -3,31 +3,6 @@ General purpose functions for the mundo_transfermarkt_competicoes project """ -############################################################################### - -# Esse é um arquivo onde podem ser declaratas funções que serão usadas -# pelo projeto mundo_transfermarkt_competicoes. -# -# Por ser um arquivo opcional, pode ser removido sem prejuízo ao funcionamento -# do projeto, caos não esteja em uso. -# -# Para declarar funções, basta fazer em código Python comum, como abaixo: -# -# ``` -# def foo(): -# """ -# Function foo -# """ -# print("foo") -# ``` -# -# Para usá-las, basta fazer conforme o exemplo abaixo: -# -# ```py -# from pipelines.datasets.mundo_transfermarkt_competicoes.utils import foo -# foo() -# ``` -# ############################################################################### import re from bs4 import BeautifulSoup @@ -524,3 +499,476 @@ def sem_info(x, y): df = df[mundo_constants.ORDEM_COLUNA_FINAL.value] return df + + +# ! 
Código para a Copa do Brasil +def process_copa_brasil(df, content): + """ + Process complete + """ + new_content = { + "estadio": content.find_all("td", attrs={"class": "hauptlink"})[0].get_text(), + "data": re.search( + re.compile(r"\d+/\d+/\d+"), + content.find("a", text=re.compile(r"\d+/\d+/\d")).get_text().strip(), + ).group(0), + "horario": content.find_all("p", attrs={"class": "sb-datum hide-for-small"})[0] + .get_text() + .split("|")[2] + .strip(), + "fase": content.find_all("p", attrs={"class": "sb-datum hide-for-small"})[0] + .get_text() + .split("|")[0] + .strip(), + "publico": content.find_all("td", attrs={"class": "hauptlink"})[1].get_text(), + "publico_max": content.find_all("table", attrs={"class": "profilheader"})[0] + .find_all("td")[2] + .get_text(), + "arbitro": content.find_all("table", attrs={"class": "profilheader"})[1] + .find_all("a")[0] + .get_text(), + "gols_1_tempo_man": content.find_all("div", attrs={"class": "sb-halbzeit"})[0] + .get_text() + .split(":", 1)[0], + "gols_1_tempo_vis": content.find_all("div", attrs={"class": "sb-halbzeit"})[0] + .get_text() + .split(":", 1)[1], + "chutes_man": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 0 + ].get_text(), + "chutes_vis": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 1 + ].get_text(), + "chutes_fora_man": content.find_all( + "div", attrs={"class": "sb-statistik-zahl"} + )[2].get_text(), + "chutes_fora_vis": content.find_all( + "div", attrs={"class": "sb-statistik-zahl"} + )[3].get_text(), + "defesas_man": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 4 + ].get_text(), + "defesas_vis": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 5 + ].get_text(), + "faltas_man": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 10 + ].get_text(), + "faltas_vis": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 11 + ].get_text(), + "escanteios_man": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 6 + ].get_text(), + "escanteios_vis": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 7 + ].get_text(), + "impedimentos_man": content.find_all( + "div", attrs={"class": "sb-statistik-zahl"} + )[12].get_text(), + "impedimentos_vis": content.find_all( + "div", attrs={"class": "sb-statistik-zahl"} + )[13].get_text(), + "chutes_bola_parada_man": content.find_all( + "div", attrs={"class": "sb-statistik-zahl"} + )[8].get_text(), + "chutes_bola_parada_vis": content.find_all( + "div", attrs={"class": "sb-statistik-zahl"} + )[9].get_text(), + } + df = pd.concat([df, pd.DataFrame([new_content])], ignore_index=True) + return df + + +def process_basico_copa_brasil(df, content): + """ + Process data + """ + new_content = { + "estadio": content.find_all("td", attrs={"class": "hauptlink"})[0].get_text(), + "data": re.search( + re.compile(r"\d+/\d+/\d+"), + content.find("a", text=re.compile(r"\d+/\d+/\d")).get_text().strip(), + ).group(0), + "horario": content.find_all("p", attrs={"class": "sb-datum hide-for-small"})[0] + .get_text() + .split("|")[2] + .strip(), + "fase": content.find_all("p", attrs={"class": "sb-datum hide-for-small"})[0] + .get_text() + .split("|")[0] + .strip(), + "publico": content.find_all("td", attrs={"class": "hauptlink"})[1].get_text(), + "publico_max": content.find_all("table", attrs={"class": "profilheader"})[0] + .find_all("td")[2] + .get_text(), + "arbitro": None, + "gols_1_tempo_man": None, + "gols_1_tempo_vis": None, + "chutes_man": None, + "chutes_vis": None, + "chutes_fora_man": None, 
+ "chutes_fora_vis": None, + "defesas_man": None, + "defesas_vis": None, + "faltas_man": None, + "faltas_vis": None, + "escanteios_man": None, + "escanteios_vis": None, + "impedimentos_man": None, + "impedimentos_vis": None, + "chutes_bola_parada_man": None, + "chutes_bola_parada_vis": None, + } + df = pd.concat([df, pd.DataFrame([new_content])], ignore_index=True) + return df + + +def vazio_copa_brasil(df): + """ + Return a template DataFrame + """ + new_content = { + "estadio": None, + "data": None, + "horario": None, + "fase": None, + "publico": None, + "publico_max": None, + "arbitro": None, + "gols_1_tempo_man": None, + "gols_1_tempo_vis": None, + "chutes_man": None, + "chutes_vis": None, + "chutes_fora_man": None, + "chutes_fora_vis": None, + "defesas_man": None, + "defesas_vis": None, + "faltas_man": None, + "faltas_vis": None, + "escanteios_man": None, + "escanteios_vis": None, + "impedimentos_man": None, + "impedimentos_vis": None, + "chutes_bola_parada_man": None, + "chutes_bola_parada_vis": None, + } + df = pd.concat([df, pd.DataFrame([new_content])], ignore_index=True) + return df + + +def pegar_valor_copa_brasil(df, content): + """ + Get value + """ + # gera um dicionário + valor_content = { + "valor_equipe_titular_man": content.find_all("div", class_="table-footer")[0] + .find_all("td")[3] + .get_text() + .split("€", 1)[1], + "valor_equipe_titular_vis": content.find_all("div", class_="table-footer")[1] + .find_all("td")[3] + .get_text() + .split("€", 1)[1], + "idade_media_titular_man": content.find_all("div", class_="table-footer")[0] + .find_all("td")[1] + .get_text() + .split(":", 1)[1] + .strip(), + "idade_media_titular_vis": content.find_all("div", class_="table-footer")[1] + .find_all("td")[1] + .get_text() + .split(":", 1)[1] + .strip(), + "tecnico_man": content.find_all("a", attrs={"id": "0"})[1].get_text(), + "tecnico_vis": content.find_all("a", attrs={"id": "0"})[3].get_text(), + } + df = pd.concat([df, pd.DataFrame([valor_content])], ignore_index=True) + return df + + +def pegar_valor_sem_tecnico_copa_brasil(df, content): + """ + Get value without technical + """ + valor_content = { + "valor_equipe_titular_man": content.find_all("div", class_="table-footer")[0] + .find_all("td")[3] + .get_text() + .split("€", 1)[1], + "valor_equipe_titular_vis": content.find_all("div", class_="table-footer")[1] + .find_all("td")[3] + .get_text() + .split("€", 1)[1], + "idade_media_titular_man": content.find_all("div", class_="table-footer")[0] + .find_all("td")[1] + .get_text() + .split(":", 1)[1] + .strip(), + "idade_media_titular_vis": content.find_all("div", class_="table-footer")[1] + .find_all("td")[1] + .get_text() + .split(":", 1)[1] + .strip(), + "tecnico_man": None, + "tecnico_vis": None, + } + df = pd.concat([df, pd.DataFrame([valor_content])], ignore_index=True) + return df + + +def valor_vazio_copa_brasil(df): + """ + Return a temmplate DataFrame + """ + valor_content = { + "valor_equipe_titular_man": None, + "valor_equipe_titular_vis": None, + "idade_media_titular_man": None, + "idade_media_titular_vis": None, + "tecnico_man": None, + "tecnico_vis": None, + } + df = pd.concat([df, pd.DataFrame([valor_content])], ignore_index=True) + return df + + +async def execucao_coleta_copa(): + base_url = "https://www.transfermarkt.com/copa-do-brasil/gesamtspielplan/pokalwettbewerb/BRC/saison_id/{season}" + headers = { + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36" + } + + pattern_man = 
re.compile(r"\d+:") + pattern_vis = re.compile(r":\d+") + + base_link = "https://www.transfermarkt.com" + base_link_br = "https://www.transfermarkt.com.br" + links = [] + time_man = [] + time_vis = [] + gols = [] + gols_man = [] + gols_vis = [] + penalti = [] + lista_nova = [] + + season = mundo_constants.SEASON.value + # Pegar o link das partidas + # Para cada temporada, adiciona os links dos jogos em `links` + log(f"Obtendo links: temporada {season}") + site_data = requests.get(base_url.format(season=season), headers=headers) + soup = BeautifulSoup(site_data.content, "html.parser") + link_tags = soup.find_all("a", attrs={"class": "ergebnis-link"}) + for tag in link_tags: + links.append(re.sub(r"\s", "", tag["href"])) + + tabela_grand = soup.findAll("div", class_="box")[1] + tabela = tabela_grand.findAll("tbody") + for i in range(0, len(tabela)): + # for i in range(0, 2): + for row in tabela[i].findAll("tr"): + if not row.get("class"): + td_tags = row.findAll("td") + # Verifica se existem pelo menos três na linha + if len(td_tags) >= 3: + time_man.append(td_tags[2].text.strip()) + time_vis.append(td_tags[6].text.strip()) + gols.append(td_tags[4].text.strip()) + + while ( + len(links) != len(time_man) + or len(links) != len(time_vis) + or len(links) != len(gols) + ): + if len(links) != len(time_man): + time_man.pop(0) + if len(links) != len(time_vis): + time_vis.pop(0) + if len(links) != len(gols): + gols.pop(0) + + for gol in gols: + penalti.append(1 if "on pens" in gol else 0) + + pares = zip(links, penalti) + for link, valor_penalti in pares: + if valor_penalti == 1: + link_data = requests.get(base_link + link, headers=headers) + link_soup = BeautifulSoup(link_data.content, "html.parser") + content = link_soup.find("div", id="main") + content_gol = content.find_all("div", attrs={"class": "sb-ereignisse"}) + # Encontre a tag h2 com a classe "content-box-headline" + h2_tags = content.find_all("h2", class_="content-box-headline") + + # Itere pelas tags h2 encontradas + for h2_tag in h2_tags: + if "Goals" in h2_tag.text: + content_gol = content.find_all( + "div", attrs={"class": "sb-ereignisse"} + ) + resultado = ( + content_gol[0] + .find_all("div", attrs={"class": "sb-aktion-spielstand"})[-1] + .get_text() + ) + break # Pare a iteração assim que encontrar "Goals" + else: + resultado = None + # Após a iteração, verifique se resultado é None e, se for, adicione '0:0' à lista + if resultado is None: + lista_nova.append("0:0") + else: + lista_nova.append(resultado) + else: + lista_nova.append(None) + + if len(lista_nova) == len(gols): + for i in range(len(lista_nova)): + # Verifique se o valor em 'lista_nova' é None e substitua pelo valor de 'goals' na mesma posição + if lista_nova[i] is None: + lista_nova[i] = gols.copy()[i] + + for gol in lista_nova: + gols_man.append(str(pattern_man.findall(str(gol)))) + gols_vis.append(str(pattern_vis.findall(str(gol)))) + + gol_pen_man = [] + gol_pen_vis = [] + + for gol in gols: + # Use expressão regular para encontrar os gols das equipes "man" e "vis" apenas quando "on pens" está presente + if "on pens" in gol: + gol_pen_man.append(str(pattern_man.findall(str(gol)))) + gol_pen_vis.append(str(pattern_vis.findall(str(gol)))) + else: + gol_pen_man.append(None) + gol_pen_vis.append(None) + + # links das estatísticas + links_esta = [] + # links das escalações de cada partida + links_valor = [] + + for link in links: + esta = link.replace("index", "statistik") + links_esta.append(esta) + for link in links: + valor = link.replace("index", "aufstellung") + 
links_valor.append(valor) + + n_links = len(links) + log(f"Encontrados {n_links} partidas.") + log("Extraindo dados...") + + df = pd.DataFrame( + {"time_man": [], "time_vis": [], "gols_man": [], "gols_vis": [], "penalti": []} + ) + df_valor = pd.DataFrame({}) + + for n, link in enumerate(links_esta): + content = await get_content(base_link_br + link, wait_time=0.01) + if content: + try: + df = process_copa_brasil(df, content) + except Exception: + try: + df = process_basico_copa_brasil(df, content) + except Exception: + df = vazio_copa_brasil(df) + else: + df = vazio_copa_brasil(df) + log(f"{n+1} dados sobre estatística de {n_links} extraídos.") + + for n, link in enumerate(links_valor): + content = await get_content(base_link + link, wait_time=0.01) + + if content: + try: + df_valor = pegar_valor_copa_brasil(df_valor, content) + except Exception: + try: + df_valor = pegar_valor_sem_tecnico_copa_brasil(df_valor, content) + except Exception: + df_valor = valor_vazio_copa_brasil(df_valor) + else: + df_valor = valor_vazio_copa_brasil(df_valor) + log(f"{n+1} valores de {n_links} extraídos.") + + df["time_man"] = time_man + df["time_vis"] = time_vis + df["gols_man"] = gols_man + df["gols_vis"] = gols_vis + df["penalti"] = penalti + df["gols_penalti_man"] = gol_pen_man + df["gols_penalti_vis"] = gol_pen_vis + # Limpando variável + df["gols_man"] = df["gols_man"].map(lambda x: x.replace("['", "")) + df["gols_man"] = df["gols_man"].map(lambda x: x.replace(":']", "")) + + df["gols_vis"] = df["gols_vis"].map(lambda x: x.replace("[':", "")) + df["gols_vis"] = df["gols_vis"].map(lambda x: x.replace("']", "")) + + df["gols_penalti_man"] = df["gols_penalti_man"].apply( + lambda x: x.replace("['", "") if pd.notna(x) else x + ) + df["gols_penalti_man"] = df["gols_penalti_man"].apply( + lambda x: x.replace(":']", "") if pd.notna(x) else x + ) + + df["gols_penalti_vis"] = df["gols_penalti_vis"].apply( + lambda x: x.replace("[':", "") if pd.notna(x) else x + ) + df["gols_penalti_vis"] = df["gols_penalti_vis"].apply( + lambda x: x.replace("']", "") if pd.notna(x) else x + ) + + df["gols_1_tempo_vis"] = df["gols_1_tempo_vis"].map( + lambda x: str(x).replace(")", "") + ) + df["gols_1_tempo_man"] = df["gols_1_tempo_man"].map( + lambda x: str(x).replace("(", "") + ) + + df_valor["valor_equipe_titular_man"] = df_valor["valor_equipe_titular_man"].map( + lambda x: str(x).replace("m", "0000") + ) + df_valor["valor_equipe_titular_man"] = df_valor["valor_equipe_titular_man"].map( + lambda x: str(x).replace("k", "000") + ) + df_valor["valor_equipe_titular_man"] = df_valor["valor_equipe_titular_man"].map( + lambda x: str(x).replace(".", "") + ) + + df_valor["valor_equipe_titular_vis"] = df_valor["valor_equipe_titular_vis"].map( + lambda x: str(x).replace("m", "0000") + ) + df_valor["valor_equipe_titular_vis"] = df_valor["valor_equipe_titular_vis"].map( + lambda x: str(x).replace("k", "000") + ) + df_valor["valor_equipe_titular_vis"] = df_valor["valor_equipe_titular_vis"].map( + lambda x: str(x).replace(".", "") + ) + + df["publico_max"] = df["publico_max"].map(lambda x: str(x).replace(".", "")) + df["publico"] = df["publico"].map(lambda x: str(x).replace(".", "")) + + # Extrair a parte antes do traço + df["tipo_fase"] = df["fase"].str.extract(r"(.+)\s*-\s*(.*)")[1] + + # Substituir as células vazias na coluna 'tipo_fase' por "Jogo único" + df["tipo_fase"].fillna("Jogo único", inplace=True) + + # Atualizar a coluna 'fase' com a parte antes do traço ou a própria 'fase' se não houver traço + df["fase"] = 
df["fase"].str.extract(r"(.+)\s*-\s*(.*)")[0].fillna(df["fase"]) + + df["data"] = pd.to_datetime(df["data"], format="%d/%m/%y").dt.date + df["horario"] = pd.to_datetime(df["horario"], format="%H:%M").dt.strftime("%H:%M") + df["ano_campeonato"] = mundo_constants.DATA_ATUAL_ANO.value + + df = pd.concat([df, df_valor], axis=1) + df.fillna("", inplace=True) + df = df[mundo_constants.ORDEM_COPA_BRASIL.value] + + return df From dcaebb186e16e3fa6f4d113c241a38d4dda46072 Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Wed, 20 Sep 2023 14:30:59 -0300 Subject: [PATCH 29/43] testa udpate_metadata --- .../flows.py | 23 +++++++++- .../tasks.py | 44 +++++++++++++++++++ 2 files changed, 65 insertions(+), 2 deletions(-) diff --git a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py index f364af874..b5666c720 100644 --- a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py +++ b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py @@ -9,6 +9,7 @@ from prefect.storage import GCS from prefect import Parameter, case from prefect.tasks.prefect import create_flow_run, wait_for_flow_run +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants from pipelines.utils.constants import constants as utils_constants @@ -16,12 +17,11 @@ from pipelines.datasets.br_cvm_oferta_publica_distribuicao.tasks import ( crawl, clean_table_oferta_distribuicao, + extract_last_date, ) from pipelines.utils.decorators import Flow from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, - update_metadata, - get_temporal_coverage, rename_current_flow_run_dataset_table, get_current_flow_labels, ) @@ -45,6 +45,7 @@ "materialize after dump", default=True, required=False ) dbt_alias = Parameter("dbt_alias", default=False, required=False) + update_metadata = Parameter("update_metadata", default=False, required=False) rename_flow_run = rename_current_flow_run_dataset_table( prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id @@ -89,6 +90,24 @@ wait_for_materialization.retry_delay = timedelta( seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) + data = extract_last_date( + dataset_id, table_id, "basedosdados", var_name="data_abertura_processo" + ) + + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + _last_date=data, + bq_last_update=False, + api_mode="prod", + date_format="yy-mm-dd", + is_bd_pro=True, + is_free=True, + time_delta=6, + time_unit="months", + upstream_tasks=[materialization_flow, data], + ) br_cvm_ofe_pub_dis_dia.storage = GCS(constants.GCS_FLOWS_BUCKET.value) br_cvm_ofe_pub_dis_dia.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) diff --git a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py index aacd4b9f9..e77a601ac 100644 --- a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py +++ b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py @@ -9,6 +9,9 @@ from pandas.api.types import is_string_dtype from prefect import task from unidecode import unidecode +import basedosdados as bd +from datetime import datetime +from pipelines.utils.utils import log @task @@ -66,3 +69,44 @@ def clean_table_oferta_distribuicao(root: str) -> str: dataframe.to_csv(ou_filepath, index=False, encoding="utf-8") return ou_filepath + + +@task +def 
extract_last_date( + dataset_id: str, + table_id: str, + billing_project_id: str, + var_name: str, +) -> datetime: + """ + Extracts the last update date of a given dataset table. + + Args: + dataset_id (str): The ID of the dataset. + table_id (str): The ID of the table. + billing_project_id (str): The billing project ID. + + Returns: + str: The last update date in the format 'yyyy-mm-dd'. + + Raises: + Exception: If an error occurs while extracting the last update date. + """ + + query_bd = f""" + SELECT MAX({var_name}) as max_date + FROM + `{billing_project_id}.{dataset_id}.{table_id}` + """ + + t = bd.read_sql( + query=query_bd, + billing_project_id=billing_project_id, + from_file=True, + ) + + data = t["max_date"][0] + + log(f"A data mais recente da tabela é: {data}") + + return str(data) From fcf8a42af188c7b2968b5f6da650b5b24097d22c Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Wed, 20 Sep 2023 14:35:06 -0300 Subject: [PATCH 30/43] =?UTF-8?q?insere=20execu=C3=A7=C3=A3o=20condicional?= =?UTF-8?q?=20para=20update=5Fmetadata?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../flows.py | 35 ++++++++++--------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py index b5666c720..05953bdf9 100644 --- a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py +++ b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py @@ -90,24 +90,25 @@ wait_for_materialization.retry_delay = timedelta( seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - data = extract_last_date( - dataset_id, table_id, "basedosdados", var_name="data_abertura_processo" - ) - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - _last_date=data, - bq_last_update=False, - api_mode="prod", - date_format="yy-mm-dd", - is_bd_pro=True, - is_free=True, - time_delta=6, - time_unit="months", - upstream_tasks=[materialization_flow, data], - ) + with case(update_metadata, True): + data = extract_last_date( + dataset_id, table_id, "basedosdados", var_name="data_abertura_processo" + ) + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + _last_date=data, + bq_last_update=False, + api_mode="prod", + date_format="yy-mm-dd", + is_bd_pro=True, + is_free=True, + time_delta=6, + time_unit="months", + upstream_tasks=[materialization_flow, data], + ) br_cvm_ofe_pub_dis_dia.storage = GCS(constants.GCS_FLOWS_BUCKET.value) br_cvm_ofe_pub_dis_dia.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) From e52dcd17aaa753365b2c69b9966d73d4d943b1a8 Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Wed, 20 Sep 2023 15:21:16 -0300 Subject: [PATCH 31/43] regista o flow --- .../datasets/br_cvm_oferta_publica_distribuicao/flows.py | 2 +- .../datasets/br_cvm_oferta_publica_distribuicao/tasks.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py index 05953bdf9..abd7eb368 100644 --- a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py +++ b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py @@ -31,7 +31,7 @@ URL = "http://dados.cvm.gov.br/dados/OFERTA/DISTRIB/DADOS/oferta_distribuicao.csv" with Flow( - name="br_cvm_oferta_publica_distribuicao.dia", code_owners=["lucas_cr"] + 
name="br_cvm_oferta_publica_distribuicao.dia", code_owners=["Equipe Pipelines"] ) as br_cvm_ofe_pub_dis_dia: # Parameters dataset_id = Parameter( diff --git a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py index e77a601ac..79f69063a 100644 --- a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py +++ b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py @@ -77,7 +77,7 @@ def extract_last_date( table_id: str, billing_project_id: str, var_name: str, -) -> datetime: +) -> str: """ Extracts the last update date of a given dataset table. @@ -92,12 +92,14 @@ def extract_last_date( Raises: Exception: If an error occurs while extracting the last update date. """ + log("dasdasdsadas") query_bd = f""" SELECT MAX({var_name}) as max_date FROM `{billing_project_id}.{dataset_id}.{table_id}` """ + log("dasdasdsadas") t = bd.read_sql( query=query_bd, From 5b73f44932cb626365b09540ffd75c39e07e8f7a Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Wed, 20 Sep 2023 15:26:51 -0300 Subject: [PATCH 32/43] registra novamente --- pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py index 79f69063a..f96d43e40 100644 --- a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py +++ b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py @@ -99,6 +99,7 @@ def extract_last_date( FROM `{billing_project_id}.{dataset_id}.{table_id}` """ + log(f"{query_bd}") log("dasdasdsadas") t = bd.read_sql( @@ -106,7 +107,7 @@ def extract_last_date( billing_project_id=billing_project_id, from_file=True, ) - + log(f"{t}") data = t["max_date"][0] log(f"A data mais recente da tabela é: {data}") From 079cfc48fef92f38d0f351c6b4ad14ef718cf7df Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Wed, 20 Sep 2023 16:17:08 -0300 Subject: [PATCH 33/43] =?UTF-8?q?testa=20altera=C3=A7=C3=B5es?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../datasets/br_cvm_oferta_publica_distribuicao/flows.py | 4 +--- .../datasets/br_cvm_oferta_publica_distribuicao/tasks.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py index abd7eb368..f31bd2d7e 100644 --- a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py +++ b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py @@ -92,9 +92,7 @@ ) with case(update_metadata, True): - data = extract_last_date( - dataset_id, table_id, "basedosdados", var_name="data_abertura_processo" - ) + data = extract_last_date(dataset_id, table_id, "basedosdados") update_django_metadata( dataset_id, table_id, diff --git a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py index f96d43e40..0a1aaf362 100644 --- a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py +++ b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py @@ -95,7 +95,7 @@ def extract_last_date( log("dasdasdsadas") query_bd = f""" - SELECT MAX({var_name}) as max_date + SELECT MAX(data_abertura_processo) as max_date FROM `{billing_project_id}.{dataset_id}.{table_id}` """ From 73471356c5aa313a1b921db3258d82405a4142ba Mon Sep 17 00:00:00 2001 
From: folhesgabriel Date: Wed, 20 Sep 2023 16:23:55 -0300 Subject: [PATCH 34/43] =?UTF-8?q?corrige=20par=C3=A2metro=20da=20fun=C3=A7?= =?UTF-8?q?=C3=A3o=20extract=5Flast=5Fdate?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../datasets/br_cvm_oferta_publica_distribuicao/tasks.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py index 0a1aaf362..59c063108 100644 --- a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py +++ b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py @@ -72,12 +72,7 @@ def clean_table_oferta_distribuicao(root: str) -> str: @task -def extract_last_date( - dataset_id: str, - table_id: str, - billing_project_id: str, - var_name: str, -) -> str: +def extract_last_date(dataset_id: str, table_id: str, billing_project_id: str) -> str: """ Extracts the last update date of a given dataset table. From 78661dbc269c34edc7100c6efe38672ed9f288c8 Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Wed, 20 Sep 2023 17:29:40 -0300 Subject: [PATCH 35/43] insere update_metadta em br_cvm_administradores_carteira --- .../br_cvm_administradores_carteira/flows.py | 98 +++++++++---------- .../br_cvm_administradores_carteira/tasks.py | 44 +++++++++ 2 files changed, 89 insertions(+), 53 deletions(-) diff --git a/pipelines/datasets/br_cvm_administradores_carteira/flows.py b/pipelines/datasets/br_cvm_administradores_carteira/flows.py index 6741af85f..530db4741 100644 --- a/pipelines/datasets/br_cvm_administradores_carteira/flows.py +++ b/pipelines/datasets/br_cvm_administradores_carteira/flows.py @@ -21,13 +21,15 @@ clean_table_responsavel, clean_table_pessoa_fisica, clean_table_pessoa_juridica, + extract_last_date, ) +from pipelines.utils.metadata.tasks import update_django_metadata + from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, - update_metadata, get_temporal_coverage, rename_current_flow_run_dataset_table, get_current_flow_labels, @@ -37,7 +39,7 @@ URL = "http://dados.cvm.gov.br/dados/ADM_CART/CAD/DADOS/cad_adm_cart.zip" with Flow( - name="br_cvm_administradores_carteira.responsavel", code_owners=["lucas_cr"] + name="br_cvm_administradores_carteira.responsavel", code_owners=["Equipe Pipelines"] ) as br_cvm_adm_car_res: # Parameters dataset_id = Parameter( @@ -68,17 +70,7 @@ dump_mode="append", wait=filepath, ) - - # no generate temporal coverage since there is no date variable - # wait_update_metadata = update_metadata( - # dataset_id=dataset_id, - # table_id=table_id, - # fields_to_update=[ - # {"last_updated": {"data": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}} - # ], - # upstream_tasks=[wait_upload_table], - # ) - + # dont generate temporal coverage since there's no date variable with case(materialize_after_dump, True): # Trigger DBT flow run current_flow_labels = get_current_flow_labels() @@ -113,7 +105,7 @@ br_cvm_adm_car_res.schedule = schedule_responsavel with Flow( - "br_cvm_administradores_carteira.pessoa_fisica", code_owners=["lucas_cr"] + "br_cvm_administradores_carteira.pessoa_fisica", code_owners=["Equipe Pipelines"] ) as br_cvm_adm_car_pes_fis: # Parameters dataset_id = Parameter( @@ -129,6 +121,7 @@ "materialize_after_dump", 
default=True, required=False ) dbt_alias = Parameter("dbt_alias", default=False, required=False) + update_metadata = Parameter("update_metadata", default=False, required=False) rename_flow_run = rename_current_flow_run_dataset_table( prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id @@ -145,25 +138,6 @@ wait=filepath, ) - # update_metadata - temporal_coverage = get_temporal_coverage( - filepath=filepath, - date_cols=["data_registro"], - time_unit="day", - interval="1", - upstream_tasks=[wait_upload_table], - ) - - # wait_update_metadata = update_metadata( - # dataset_id=dataset_id, - # table_id=table_id, - # fields_to_update=[ - # {"last_updated": {"data": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}}, - # {"temporal_coverage": [temporal_coverage]}, - # ], - # upstream_tasks=[temporal_coverage], - # ) - with case(materialize_after_dump, True): # Trigger DBT flow run current_flow_labels = get_current_flow_labels() @@ -192,13 +166,31 @@ wait_for_materialization.retry_delay = timedelta( seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) + with case(update_metadata, True): + data = extract_last_date( + dataset_id, table_id, "basedosdados", var_name="data_registro" + ) + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + _last_date=data, + bq_last_update=False, + api_mode="prod", + date_format="yy-mm-dd", + is_bd_pro=True, + is_free=True, + time_delta=6, + time_unit="months", + upstream_tasks=[materialization_flow, data], + ) br_cvm_adm_car_pes_fis.storage = GCS(constants.GCS_FLOWS_BUCKET.value) br_cvm_adm_car_pes_fis.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) br_cvm_adm_car_pes_fis.schedule = schedule_fisica with Flow( - "br_cvm_administradores_carteira.pessoa_juridica", code_owners=["lucas_cr"] + "br_cvm_administradores_carteira.pessoa_juridica", code_owners=["Equipe Pipelines"] ) as br_cvm_adm_car_pes_jur: # Parameters dataset_id = Parameter( @@ -214,6 +206,7 @@ "materialize_after_dump", default=True, required=False ) dbt_alias = Parameter("dbt_alias", default=False, required=False) + update_metadata = Parameter("update_metadata", default=False, required=False) rename_flow_run = rename_current_flow_run_dataset_table( prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id @@ -230,25 +223,6 @@ wait=filepath, ) - # update_metadata - temporal_coverage = get_temporal_coverage( - filepath=filepath, - date_cols=["data_registro"], - time_unit="day", - interval="1", - upstream_tasks=[wait_upload_table], - ) - - # wait_update_metadata = update_metadata( - # dataset_id=dataset_id, - # table_id=table_id, - # fields_to_update=[ - # {"last_updated": {"data": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}}, - # {"temporal_coverage": [temporal_coverage]}, - # ], - # upstream_tasks=[temporal_coverage], - # ) - with case(materialize_after_dump, True): # Trigger DBT flow run current_flow_labels = get_current_flow_labels() @@ -278,6 +252,24 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) + with case(update_metadata, True): + data = extract_last_date( + dataset_id, table_id, "basedosdados", var_name="data_registro" + ) + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + _last_date=data, + bq_last_update=False, + api_mode="prod", + date_format="yy-mm-dd", + is_bd_pro=True, + is_free=True, + time_delta=6, + time_unit="months", + upstream_tasks=[materialization_flow, data], + ) br_cvm_adm_car_pes_jur.storage = 
GCS(constants.GCS_FLOWS_BUCKET.value) br_cvm_adm_car_pes_jur.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) br_cvm_adm_car_pes_jur.schedule = schedule_juridica diff --git a/pipelines/datasets/br_cvm_administradores_carteira/tasks.py b/pipelines/datasets/br_cvm_administradores_carteira/tasks.py index 8a3dc2cbf..f6268bf0f 100644 --- a/pipelines/datasets/br_cvm_administradores_carteira/tasks.py +++ b/pipelines/datasets/br_cvm_administradores_carteira/tasks.py @@ -12,6 +12,9 @@ from pandas.api.types import is_string_dtype from prefect import task from unidecode import unidecode +import basedosdados as bd +from datetime import datetime +from pipelines.utils.utils import log @task @@ -150,3 +153,44 @@ def clean_table_pessoa_juridica(root: str) -> str: dataframe.to_csv(ou_filepath, index=False) return ou_filepath + + +@task +def extract_last_date( + dataset_id: str, + table_id: str, + billing_project_id: str, + var_name: str, +) -> datetime: + """ + Extracts the last update date of a given dataset table. + + Args: + dataset_id (str): The ID of the dataset. + table_id (str): The ID of the table. + billing_project_id (str): The billing project ID. + + Returns: + str: The last update date in the format 'yyyy-mm-dd'. + + Raises: + Exception: If an error occurs while extracting the last update date. + """ + + query_bd = f""" + SELECT MAX({var_name}) as max_date + FROM + `{billing_project_id}.{dataset_id}.{table_id}` + """ + + t = bd.read_sql( + query=query_bd, + billing_project_id=billing_project_id, + from_file=True, + ) + + data = t["max_date"][0] + + log(f"A data mais recente da tabela é: {data}") + + return str(data) From c90d7e00f12c9d06511c209dcd093132f40cf6c0 Mon Sep 17 00:00:00 2001 From: Gagabrielle-carv Date: Wed, 20 Sep 2023 18:59:22 -0300 Subject: [PATCH 36/43] executa_coleta --- .../mundo_transfermarkt_competicoes/flows.py | 4 ++-- .../mundo_transfermarkt_competicoes/tasks.py | 12 ++++++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py b/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py index b8744b493..e00af2d7f 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py @@ -60,7 +60,7 @@ rename_flow_run = rename_current_flow_run_dataset_table( prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id ) - df = execucao_coleta_sync(execucao_coleta) + df = execucao_coleta_sync(table_id) output_filepath = make_partitions(df, upstream_tasks=[df]) data_maxima = get_max_data(output_filepath, upstream_tasks=[output_filepath]) @@ -143,7 +143,7 @@ rename_flow_run = rename_current_flow_run_dataset_table( prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id ) - df = execucao_coleta_sync(execucao_coleta_copa) + df = execucao_coleta_sync(table_id) output_filepath = make_partitions(df, upstream_tasks=[df]) data_maxima = get_max_data(output_filepath, upstream_tasks=[output_filepath]) diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py b/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py index bece486bf..22fabd1dc 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py @@ -7,6 +7,10 @@ from pipelines.datasets.mundo_transfermarkt_competicoes.constants import ( constants as mundo_constants, ) +from pipelines.datasets.mundo_transfermarkt_competicoes.utils import ( + 
execucao_coleta_copa, + execucao_coleta, +) from pipelines.utils.utils import log, to_partitions from prefect import task import re @@ -17,10 +21,14 @@ @task -def execucao_coleta_sync(execucao_coleta): +def execucao_coleta_sync(tabela): # Obter o loop de eventos atual e executar a tarefa nele loop = asyncio.get_event_loop() - df = loop.run_until_complete(execucao_coleta()) + if tabela == "brasileirao_serie_a": + df = loop.run_until_complete(execucao_coleta()) + else: + df = loop.run_until_complete(execucao_coleta_copa()) + return df From b5846193ebcff73012e73d6ab54388299c0b19ee Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Wed, 20 Sep 2023 19:03:22 -0300 Subject: [PATCH 37/43] insere schedules --- .../br_cvm_administradores_carteira/flows.py | 2 +- .../br_cvm_administradores_carteira/schedules.py | 7 +++++-- .../br_cvm_oferta_publica_distribuicao/flows.py | 6 ++++-- .../schedules.py | 3 ++- .../br_cvm_oferta_publica_distribuicao/tasks.py | 14 ++++++++------ 5 files changed, 20 insertions(+), 12 deletions(-) diff --git a/pipelines/datasets/br_cvm_administradores_carteira/flows.py b/pipelines/datasets/br_cvm_administradores_carteira/flows.py index 530db4741..457d2d9f0 100644 --- a/pipelines/datasets/br_cvm_administradores_carteira/flows.py +++ b/pipelines/datasets/br_cvm_administradores_carteira/flows.py @@ -268,7 +268,7 @@ is_free=True, time_delta=6, time_unit="months", - upstream_tasks=[materialization_flow, data], + upstream_tasks=[wait_for_materialization, data], ) br_cvm_adm_car_pes_jur.storage = GCS(constants.GCS_FLOWS_BUCKET.value) br_cvm_adm_car_pes_jur.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) diff --git a/pipelines/datasets/br_cvm_administradores_carteira/schedules.py b/pipelines/datasets/br_cvm_administradores_carteira/schedules.py index 1396ab820..28c5d4b4c 100644 --- a/pipelines/datasets/br_cvm_administradores_carteira/schedules.py +++ b/pipelines/datasets/br_cvm_administradores_carteira/schedules.py @@ -22,6 +22,7 @@ "materialization_mode": "prod", "materialize_after_dump": True, "table_id": "responsavel", + "dbt_alias": True, }, ) ], @@ -41,7 +42,8 @@ "materialization_mode": "prod", "materialize_after_dump": True, "table_id": "pessoa_fisica", - "dbt_alias": False, + "udpate_metadata": True, + "dbt_alias": True, }, ) ], @@ -61,7 +63,8 @@ "materialization_mode": "prod", "materialize_after_dump": True, "table_id": "pessoa_juridica", - "dbt_alias": False, + "udpate_metadata": True, + "dbt_alias": True, }, ) ], diff --git a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py index f31bd2d7e..011d164f8 100644 --- a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py +++ b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py @@ -92,7 +92,9 @@ ) with case(update_metadata, True): - data = extract_last_date(dataset_id, table_id, "basedosdados") + data = extract_last_date( + dataset_id, table_id, "basedosdados", var_name="data_abertura_processo" + ) update_django_metadata( dataset_id, table_id, @@ -105,7 +107,7 @@ is_free=True, time_delta=6, time_unit="months", - upstream_tasks=[materialization_flow, data], + upstream_tasks=[wait_for_materialization, data], ) br_cvm_ofe_pub_dis_dia.storage = GCS(constants.GCS_FLOWS_BUCKET.value) diff --git a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/schedules.py b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/schedules.py index 810ca7a4e..978f90d68 100644 --- 
a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/schedules.py +++ b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/schedules.py @@ -21,7 +21,8 @@ "materialization_mode": "prod", "materialize after dump": True, "table_id": "dia", - "dbt_alias": False, + "udpate_metadata": True, + "dbt_alias": True, }, ), ], diff --git a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py index 59c063108..e77a601ac 100644 --- a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py +++ b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py @@ -72,7 +72,12 @@ def clean_table_oferta_distribuicao(root: str) -> str: @task -def extract_last_date(dataset_id: str, table_id: str, billing_project_id: str) -> str: +def extract_last_date( + dataset_id: str, + table_id: str, + billing_project_id: str, + var_name: str, +) -> datetime: """ Extracts the last update date of a given dataset table. @@ -87,22 +92,19 @@ def extract_last_date(dataset_id: str, table_id: str, billing_project_id: str) - Raises: Exception: If an error occurs while extracting the last update date. """ - log("dasdasdsadas") query_bd = f""" - SELECT MAX(data_abertura_processo) as max_date + SELECT MAX({var_name}) as max_date FROM `{billing_project_id}.{dataset_id}.{table_id}` """ - log(f"{query_bd}") - log("dasdasdsadas") t = bd.read_sql( query=query_bd, billing_project_id=billing_project_id, from_file=True, ) - log(f"{t}") + data = t["max_date"][0] log(f"A data mais recente da tabela é: {data}") From 882b91b2e2e3fbc7c997ad22fbd511b896ffb50b Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Thu, 21 Sep 2023 06:43:00 -0300 Subject: [PATCH 38/43] debuga cvm_oferta_publica_distribuicao --- .../datasets/br_cvm_administradores_carteira/tasks.py | 2 +- .../datasets/br_cvm_oferta_publica_distribuicao/flows.py | 2 +- .../datasets/br_cvm_oferta_publica_distribuicao/tasks.py | 7 ++++--- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pipelines/datasets/br_cvm_administradores_carteira/tasks.py b/pipelines/datasets/br_cvm_administradores_carteira/tasks.py index f6268bf0f..8128d25b8 100644 --- a/pipelines/datasets/br_cvm_administradores_carteira/tasks.py +++ b/pipelines/datasets/br_cvm_administradores_carteira/tasks.py @@ -161,7 +161,7 @@ def extract_last_date( table_id: str, billing_project_id: str, var_name: str, -) -> datetime: +) -> str: """ Extracts the last update date of a given dataset table. 
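[editor's aside, not part of the patch] A rough sketch of why the hunk above switches extract_last_date's return annotation from datetime to str: the flows pass the result straight into update_django_metadata as _last_date with date_format="yy-mm-dd", so the task appears to hand over a plain year-month-day string rather than a date object. Values below are made up for illustration; the real value comes from the bd.read_sql MAX(...) query.

# Hypothetical illustration of the conversion the task performs.
from datetime import date

max_date = date(2023, 9, 20)    # e.g. what MAX(data_registro) might come back as from BigQuery
_last_date = str(max_date)      # "2023-09-20" — the stringified form the flow forwards to update_django_metadata
assert _last_date == "2023-09-20"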
diff --git a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py index 011d164f8..d5b17c552 100644 --- a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py +++ b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py @@ -44,7 +44,7 @@ materialize_after_dump = Parameter( "materialize after dump", default=True, required=False ) - dbt_alias = Parameter("dbt_alias", default=False, required=False) + dbt_alias = Parameter("dbt_alias", default=True, required=False) update_metadata = Parameter("update_metadata", default=False, required=False) rename_flow_run = rename_current_flow_run_dataset_table( diff --git a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py index e77a601ac..c2ad29cab 100644 --- a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py +++ b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py @@ -4,7 +4,6 @@ """ import os - import pandas as pd from pandas.api.types import is_string_dtype from prefect import task @@ -77,7 +76,7 @@ def extract_last_date( table_id: str, billing_project_id: str, var_name: str, -) -> datetime: +) -> str: """ Extracts the last update date of a given dataset table. @@ -92,18 +91,20 @@ def extract_last_date( Raises: Exception: If an error occurs while extracting the last update date. """ - + log(f"Extracting last date from {dataset_id}.{table_id}") query_bd = f""" SELECT MAX({var_name}) as max_date FROM `{billing_project_id}.{dataset_id}.{table_id}` """ + log(f"Query: {query_bd}") t = bd.read_sql( query=query_bd, billing_project_id=billing_project_id, from_file=True, ) + log(f"{t}") data = t["max_date"][0] From 56683515c7531d3c364a7b2b1d287e2e2be60425 Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Thu, 21 Sep 2023 07:40:14 -0300 Subject: [PATCH 39/43] atualiza tasks --- .../flows.py | 7 ++-- .../tasks.py | 42 ++----------------- 2 files changed, 6 insertions(+), 43 deletions(-) diff --git a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py index d5b17c552..06aed6696 100644 --- a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py +++ b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py @@ -17,7 +17,7 @@ from pipelines.datasets.br_cvm_oferta_publica_distribuicao.tasks import ( crawl, clean_table_oferta_distribuicao, - extract_last_date, + get_today_date, ) from pipelines.utils.decorators import Flow from pipelines.utils.tasks import ( @@ -92,9 +92,8 @@ ) with case(update_metadata, True): - data = extract_last_date( - dataset_id, table_id, "basedosdados", var_name="data_abertura_processo" - ) + data = get_today_date() + update_django_metadata( dataset_id, table_id, diff --git a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py index c2ad29cab..74792b2c0 100644 --- a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py +++ b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py @@ -71,43 +71,7 @@ def clean_table_oferta_distribuicao(root: str) -> str: @task -def extract_last_date( - dataset_id: str, - table_id: str, - billing_project_id: str, - var_name: str, -) -> str: - """ - Extracts the last update date of a given dataset table. - - Args: - dataset_id (str): The ID of the dataset. - table_id (str): The ID of the table. 
- billing_project_id (str): The billing project ID. - - Returns: - str: The last update date in the format 'yyyy-mm-dd'. - - Raises: - Exception: If an error occurs while extracting the last update date. - """ - log(f"Extracting last date from {dataset_id}.{table_id}") - query_bd = f""" - SELECT MAX({var_name}) as max_date - FROM - `{billing_project_id}.{dataset_id}.{table_id}` - """ - log(f"Query: {query_bd}") - - t = bd.read_sql( - query=query_bd, - billing_project_id=billing_project_id, - from_file=True, - ) - log(f"{t}") - - data = t["max_date"][0] - - log(f"A data mais recente da tabela é: {data}") +def get_today_date() -> str: + d = datetime.today() - return str(data) + return str(d.strftime("%Y-%m-%d")) From 2bb7e9566e12dbdeff7f96436d185380a3922fc2 Mon Sep 17 00:00:00 2001 From: Gagabrielle-carv Date: Thu, 21 Sep 2023 09:05:24 -0300 Subject: [PATCH 40/43] upstream_tasks --- pipelines/datasets/mundo_transfermarkt_competicoes/flows.py | 2 ++ .../datasets/mundo_transfermarkt_competicoes/schedules.py | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py b/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py index e00af2d7f..58f4129ad 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py @@ -114,6 +114,7 @@ time_unit="weeks", date_format="yy-mm-dd", api_mode="prod", + upstream_tasks=[materialization_flow], ) transfermarkt_brasileirao_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) @@ -197,6 +198,7 @@ time_unit="year", date_format="yy-mm-dd", api_mode="prod", + upstream_tasks=[materialization_flow], ) transfermarkt_copa_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py b/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py index 59b5dbbea..086fcc3e2 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py @@ -24,7 +24,7 @@ "table_id": "brasileirao_serie_a", "materialization_mode": "prod", "materialize_after_dump": True, - "dbt_alias": False, + "dbt_alias": True, }, ), ] @@ -44,7 +44,7 @@ "table_id": "copa_brasil", "materialization_mode": "prod", "materialize_after_dump": True, - "dbt_alias": False, + "dbt_alias": True, }, ), ] From 993f1cb93e08a13029826246e88c0fe05ea4646f Mon Sep 17 00:00:00 2001 From: Gagabrielle-carv Date: Thu, 21 Sep 2023 14:22:27 -0300 Subject: [PATCH 41/43] fix: \n --- pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py | 2 +- pipelines/datasets/mundo_transfermarkt_competicoes/utils.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py b/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py index 22fabd1dc..28323072e 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py @@ -17,6 +17,7 @@ import numpy as np import pandas as pd import asyncio +import os from datetime import timedelta, datetime @@ -28,7 +29,6 @@ def execucao_coleta_sync(tabela): df = loop.run_until_complete(execucao_coleta()) else: df = loop.run_until_complete(execucao_coleta_copa()) - return df diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py b/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py index cf1be24f2..dba0e9f62 100644 --- 
a/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py @@ -767,7 +767,7 @@ async def execucao_coleta_copa(): tabela_grand = soup.findAll("div", class_="box")[1] tabela = tabela_grand.findAll("tbody") for i in range(0, len(tabela)): - # for i in range(0, 2): + # for i in range(0, 4): for row in tabela[i].findAll("tr"): if not row.get("class"): td_tags = row.findAll("td") @@ -969,6 +969,7 @@ async def execucao_coleta_copa(): df = pd.concat([df, df_valor], axis=1) df.fillna("", inplace=True) + df["publico_max"] = df["publico_max"].str.replace("\n", "") df = df[mundo_constants.ORDEM_COPA_BRASIL.value] return df From c8e621cc3b6c90cca8ae9e9438dfc05b21ca57a9 Mon Sep 17 00:00:00 2001 From: Gagabrielle-carv Date: Thu, 21 Sep 2023 18:11:48 -0300 Subject: [PATCH 42/43] Schedule --- pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py b/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py index 086fcc3e2..6ca65e2a6 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py @@ -34,7 +34,7 @@ every_week_copa = Schedule( clocks=[ CronClock( - cron="0 9 * 2-12 2", + cron="0 9 * 2-10 2", start_date=datetime(2023, 5, 1, 7, 30), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, From 8238d9bf1178bb395bfa1ec341855254577a945e Mon Sep 17 00:00:00 2001 From: Gagabrielle-carv Date: Thu, 21 Sep 2023 19:23:40 -0300 Subject: [PATCH 43/43] update_django_metadata --- pipelines/datasets/mundo_transfermarkt_competicoes/flows.py | 4 ++-- pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py b/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py index 58f4129ad..b1cfa0c5e 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py @@ -194,8 +194,8 @@ bq_last_update=False, is_bd_pro=True, is_free=True, - time_delta=1, - time_unit="year", + time_delta=6, + time_unit="months", date_format="yy-mm-dd", api_mode="prod", upstream_tasks=[materialization_flow], diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py b/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py index 28323072e..07084eafb 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py @@ -50,7 +50,7 @@ def get_max_data(file_path): ano = mundo_constants.DATA_ATUAL_ANO.value df = pd.read_csv(f"{file_path}ano_campeonato={ano}/data.csv") df["data"] = pd.to_datetime(df["data"]).dt.date - max_data = df["data"].max() + max_data = df["data"].max().strftime("%Y-%m-%d") # max_data = mundo_constants.DATA_ATUAL.value return max_data
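[editor's aside, not part of the patch series] A self-contained sketch of what the final get_max_data change yields, using made-up partition rows in place of the ano_campeonato={ano}/data.csv file that make_partitions writes; it shows the maximum date coming back as a formatted string (as the last hunk introduces with .strftime) instead of a datetime.date.

# Illustrative only — a stand-in DataFrame for the partition CSV.
import pandas as pd

df = pd.DataFrame({"data": ["2023-09-10", "2023-09-17"]})
df["data"] = pd.to_datetime(df["data"]).dt.date
max_data = df["data"].max().strftime("%Y-%m-%d")  # "2023-09-17", a string suitable for _last_date
print(max_data)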