diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6a5844963..0060bee4e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -9,7 +9,7 @@ repos: - id: no-commit-to-branch # prevents committing to protected branches - id: trailing-whitespace # prevents trailing whitespace - repo: https://github.com/psf/black - rev: 23.7.0 + rev: 23.9.1 hooks: - id: black diff --git a/pipelines/datasets/br_anp_precos_combustiveis/constants.py b/pipelines/datasets/br_anp_precos_combustiveis/constants.py index b9299317b..d35088bfd 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/constants.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/constants.py @@ -48,6 +48,10 @@ class constants(Enum): # pylint: disable=c0103 "https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/arquivos/shpc/qus/ultimas-4-semanas-diesel-gnv.csv", ] + URLS_DATA = [ + "https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/arquivos/shpc/qus/ultimas-4-semanas-glp.csv" + ] + PATH_INPUT = "/tmp/input/" PATH_OUTPUT = "/tmp/output/" diff --git a/pipelines/datasets/br_anp_precos_combustiveis/flows.py b/pipelines/datasets/br_anp_precos_combustiveis/flows.py index 64e3c564d..4a3c2a15e 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/flows.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/flows.py @@ -13,11 +13,13 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.datasets.br_anp_precos_combustiveis.utils import download_files from pipelines.datasets.br_anp_precos_combustiveis.tasks import ( - tratamento, + download_and_transform, data_max_bd_mais, data_max_bd_pro, make_partitions, + check_for_updates, ) from pipelines.datasets.br_anp_precos_combustiveis.schedules import ( every_week_anp_microdados, @@ -26,6 +28,11 @@ create_table_and_upload_to_gcs, rename_current_flow_run_dataset_table, get_current_flow_labels, + log_task, +) + +from pipelines.datasets.br_anp_precos_combustiveis.constants import ( + constants as anatel_constants, ) with Flow( @@ -37,7 +44,7 @@ table_id = Parameter("table_id", default="microdados", required=True) materialization_mode = Parameter( - "materialization_mode", default="prod", required=False + "materialization_mode", default="dev", required=False ) materialize_after_dump = Parameter( @@ -51,49 +58,19 @@ ) update_metadata = Parameter("update_metadata", default=True, required=False) - df = tratamento(upstream_tasks=[rename_flow_run]) - output_path = make_partitions(df=df, upstream_tasks=[df]) - get_date_max_mais = data_max_bd_mais() - get_date_max_pro = data_max_bd_pro(df=df) - - # pylint: disable=C0103 - wait_upload_table = create_table_and_upload_to_gcs( - data_path=output_path, - dataset_id=dataset_id, - table_id=table_id, - dump_mode="append", - wait=output_path, - ) - - # ! 
BD MAIS - Atrasado - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id, - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id}", + dados_desatualizados = check_for_updates(dataset_id=dataset_id, table_id=table_id) + log_task(f"Checando se os dados estão desatualizados: {dados_desatualizados}") + with case(dados_desatualizados, False): + log_task( + "Dados atualizados, não é necessário fazer o download", + upstream_tasks=[dados_desatualizados], ) - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) + with case(dados_desatualizados, True): + df = download_and_transform(upstream_tasks=[rename_flow_run]) + output_path = make_partitions(df=df, upstream_tasks=[df]) + get_date_max_pro = data_max_bd_pro(df=df) + with case(update_metadata, True): update_django_metadata( @@ -110,7 +87,57 @@ time_unit="weeks", _last_date=get_date_max_pro, upstream_tasks=[wait_upload_table], + + # pylint: disable=C0103 + wait_upload_table = create_table_and_upload_to_gcs( + data_path=output_path, + dataset_id=dataset_id, + table_id=table_id, + dump_mode="append", + wait=output_path, + ) + + with case(materialize_after_dump, True): + # Trigger DBT flow run + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": dataset_id, + "table_id": table_id, + "mode": materialization_mode, + "dbt_alias": dbt_alias, + }, + labels=current_flow_labels, + run_name=f"Materialize {dataset_id}.{table_id}", + ) + + wait_for_materialization = wait_for_flow_run( + materialization_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, ) + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + + ) + wait_for_materialization.retry_delay = timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + + with case(update_metadata, True): + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_last_update=False, + api_mode="prod", + date_format="yy-mm-dd", + _last_date=get_date_max_pro, + upstream_tasks=[wait_upload_table], + ) anp_microdados.storage = GCS(constants.GCS_FLOWS_BUCKET.value) anp_microdados.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) diff --git a/pipelines/datasets/br_anp_precos_combustiveis/schedules.py b/pipelines/datasets/br_anp_precos_combustiveis/schedules.py index a1c3751ee..9c660ab04 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/schedules.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/schedules.py @@ -4,15 +4,14 @@ """ from datetime import timedelta, datetime - from prefect.schedules import Schedule -from prefect.schedules.clocks import IntervalClock +from 
prefect.schedules.clocks import CronClock from pipelines.constants import constants every_week_anp_microdados = Schedule( clocks=[ - IntervalClock( - interval=timedelta(days=1), + CronClock( + cron="0 10 * * *", start_date=datetime(2021, 1, 1), labels=[constants.BASEDOSDADOS_PROD_AGENT_LABEL.value], parameter_defaults={ diff --git a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py index b8a226343..48c911278 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py @@ -12,99 +12,84 @@ get_id_municipio, open_csvs, partition_data, + merge_table_id_municipio, + orderning_data_coleta, + creating_column_ano, + rename_and_reordening, + rename_columns, + rename_and_to_create_endereco, + lower_colunm_produto, ) from pipelines.datasets.br_anp_precos_combustiveis.constants import ( constants as anatel_constants, ) -from pipelines.utils.utils import log +from pipelines.utils.utils import log, extract_last_date from pipelines.constants import constants +@task +def check_for_updates(dataset_id, table_id): + """ + Checks if there are available updates for a specific dataset and table. + + Returns: + bool: Returns True if updates are available, otherwise returns False. + """ + # Obtém a data mais recente do site + download_files(anatel_constants.URLS_DATA.value, anatel_constants.PATH_INPUT.value) + df = pd.read_csv(anatel_constants.URL_GLP.value, sep=";", encoding="utf-8") + data_obj = df["Data da Coleta"].max() + data_obj = datetime.strptime(data_obj, "%d/%m/%Y").strftime("%Y-%m-%d") + data_obj = datetime.strptime(data_obj, "%Y-%m-%d").date() + + # Obtém a última data no site BD + data_bq_obj = extract_last_date( + dataset_id, table_id, "yy-mm-dd", "basedosdados", data="data_coleta" + ) + + # Registra a data mais recente do site + log(f"Última data no site do ANP: {data_obj}") + log(f"Última data no site da BD: {data_bq_obj}") + + # Compara as datas para verificar se há atualizações + if data_obj > data_bq_obj: + return True # Há atualizações disponíveis + else: + return False # Não há novas atualizações disponíveis + + @task( max_retries=constants.TASK_MAX_RETRIES.value, retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), ) -def tratamento(): - download_files( - anatel_constants.URLS.value, - anatel_constants.PATH_INPUT.value, - ) +def download_and_transform(): + download_files(anatel_constants.URLS.value, anatel_constants.PATH_INPUT.value) precos_combustiveis = open_csvs( - url_diesel_gnv=anatel_constants.URL_DIESEL_GNV.value, - url_gasolina_etanol=anatel_constants.URL_GASOLINA_ETANOL.value, - url_glp=anatel_constants.URL_GLP.value, + anatel_constants.URL_DIESEL_GNV.value, + anatel_constants.URL_GASOLINA_ETANOL.value, + anatel_constants.URL_GLP.value, ) - id_municipio = get_id_municipio() - log("Iniciando tratamento dos dados precos_combustiveis") - precos_combustiveis = pd.merge( - id_municipio, - precos_combustiveis, - how="right", - left_on=["nome", "sigla_uf"], - right_on=["Municipio", "Estado - Sigla"], - ) - log("----" * 150) - log("Dados mergeados") - precos_combustiveis.rename(columns={"Municipio": "nome"}, inplace=True) - precos_combustiveis.dropna(subset=["Valor de Venda"], inplace=True) - precos_combustiveis["endereco_revenda"] = ( - precos_combustiveis["Nome da Rua"].fillna("") - + "," - + " " - + precos_combustiveis["Numero Rua"].fillna("") - + "," - + " " - + precos_combustiveis["Complemento"].fillna("") - ) - 
precos_combustiveis.drop(columns=["sigla_uf"], inplace=True) - precos_combustiveis.rename(columns={"Data da Coleta": "data_coleta"}, inplace=True) - precos_combustiveis["data_coleta"] = ( - precos_combustiveis["data_coleta"].str[6:10] - + "-" - + precos_combustiveis["data_coleta"].str[3:5] - + "-" - + precos_combustiveis["data_coleta"].str[0:2] - ) - precos_combustiveis["Produto"] = precos_combustiveis["Produto"].str.lower() - precos_combustiveis["ano"] = precos_combustiveis["data_coleta"].str[0:4] - precos_combustiveis["ano"].replace("nan", "", inplace=True) - precos_combustiveis.rename(columns=anatel_constants.RENAME.value, inplace=True) - precos_combustiveis = precos_combustiveis[anatel_constants.ORDEM.value] - precos_combustiveis["ano"] = precos_combustiveis["ano"].apply( - lambda x: str(x).replace(".0", "") - ) - precos_combustiveis["cep_revenda"] = precos_combustiveis["cep_revenda"].apply( - lambda x: str(x).replace("-", "") - ) - precos_combustiveis["unidade_medida"] = precos_combustiveis["unidade_medida"].map( - {"R$ / litro": "R$/litro", "R$ / m³": "R$/m3", "R$ / 13 kg": "R$/13kg"} - ) - precos_combustiveis["nome_estabelecimento"] = precos_combustiveis[ - "nome_estabelecimento" - ].apply(lambda x: str(x).replace(",", "")) - precos_combustiveis["preco_compra"] = precos_combustiveis["preco_compra"].apply( - lambda x: str(x).replace(",", ".") - ) - precos_combustiveis["preco_venda"] = precos_combustiveis["preco_venda"].apply( - lambda x: str(x).replace(",", ".") - ) - precos_combustiveis["preco_venda"] = precos_combustiveis["preco_venda"].replace( - "nan", "" - ) - precos_combustiveis["preco_compra"] = precos_combustiveis["preco_compra"].replace( - "nan", "" + df = get_id_municipio(id_municipio=precos_combustiveis) + + df = merge_table_id_municipio( + id_municipio=df, pd_precos_combustiveis=precos_combustiveis ) - precos_combustiveis.replace(np.nan, "", inplace=True) - log("----" * 150) - log("Dados tratados com sucesso") - log("----" * 150) - log("Iniciando particionamento dos dados") - log("----" * 150) - log(precos_combustiveis["data_coleta"].unique()) - - return precos_combustiveis + + df = rename_and_to_create_endereco(precos_combustiveis=df) + + df = orderning_data_coleta(precos_combustiveis=df) + + df = lower_colunm_produto(precos_combustiveis=df) + + df = creating_column_ano(precos_combustiveis=df) + + df = rename_and_reordening(precos_combustiveis=df) + + df = rename_columns(precos_combustiveis=df) + + return df @task( diff --git a/pipelines/datasets/br_anp_precos_combustiveis/utils.py b/pipelines/datasets/br_anp_precos_combustiveis/utils.py index 8f8bc3d3b..38cf5eef1 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/utils.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/utils.py @@ -9,6 +9,10 @@ import requests from pipelines.utils.utils import log from datetime import datetime +from pipelines.datasets.br_anp_precos_combustiveis.constants import ( + constants as anatel_constants, +) +import numpy as np def download_files(urls, path): @@ -28,10 +32,8 @@ def download_files(urls, path): for url in urls: response = requests.get(url) if response.status_code == 200: - file_name = url.split("/")[-1] # Get the filename from the URL + file_name = url.split("/")[-1] file_path = os.path.join(path, file_name) - # log("----" * 150) - # log("if response.status_code == 200: SUCESSO") with open(file_path, "wb") as file: file.write(response.content) @@ -46,7 +48,7 @@ def download_files(urls, path): return downloaded_files -def get_id_municipio(): +def 
get_id_municipio(id_municipio: pd.DataFrame): # ! Carregando os dados direto do Diretório de municipio da BD # Para carregar o dado direto no pandas log("Carregando dados do diretório de municípios da BD") @@ -80,21 +82,18 @@ def open_csvs(url_diesel_gnv, url_gasolina_etanol, url_glp): log("Abrindo os arquivos csvs") diesel = pd.read_csv(f"{url_diesel_gnv}", sep=";", encoding="utf-8") log("----" * 150) - # log("Abrindo os arquivos csvs diesel") log("----" * 150) log(diesel["Data da Coleta"].unique()) gasolina = pd.read_csv(f"{url_gasolina_etanol}", sep=";", encoding="utf-8") log("----" * 150) log("Abrindo os arquivos csvs gasolina") log("----" * 150) - # log(gasolina["Data da Coleta"].unique()) glp = pd.read_csv(f"{url_glp}", sep=";", encoding="utf-8") log("Abrindo os arquivos csvs glp") log("----" * 150) # log(glp["Data da Coleta"].unique()) data_frames.extend([diesel, gasolina, glp]) precos_combustiveis = pd.concat(data_frames, ignore_index=True) - # log(precos_combustiveis["Data da Coleta"].unique()) log("----" * 150) log("Dados concatenados com sucesso") log("----" * 150) @@ -115,21 +114,111 @@ def partition_data(df: pd.DataFrame, column_name: list[str], output_directory: s for value in unique_values: value_str = str(value)[:10] date_value = datetime.strptime(value_str, "%Y-%m-%d").date() - # log(date_value) formatted_value = date_value.strftime("%Y-%m-%d") - # log(formatted_value) partition_path = os.path.join( output_directory, f"{column_name}={formatted_value}" ) - # log(f"Salvando dados em {partition_path}") if not os.path.exists(partition_path): os.makedirs(partition_path) df_partition = df[df[column_name] == value].copy() df_partition.drop([column_name], axis=1, inplace=True) - # log(f"df_partition: {df_partition}") csv_path = os.path.join(partition_path, "data.csv") df_partition.to_csv(csv_path, index=False, encoding="utf-8", na_rep="") log(f"Arquivo {csv_path} salvo com sucesso!") + + +def merge_table_id_municipio( + id_municipio: pd.DataFrame, pd_precos_combustiveis: pd.DataFrame +): + log("Iniciando tratamento dos dados precos_combustiveis") + precos_combustiveis = pd.merge( + id_municipio, + pd_precos_combustiveis, + how="right", + left_on=["nome", "sigla_uf"], + right_on=["Municipio", "Estado - Sigla"], + ) + log("----" * 150) + log("Dados mergeados") + + return precos_combustiveis + + +def rename_and_to_create_endereco(precos_combustiveis: pd.DataFrame): + precos_combustiveis["endereco_revenda"] = ( + precos_combustiveis["Nome da Rua"].fillna("") + + "," + + " " + + precos_combustiveis["Numero Rua"].fillna("") + + "," + + " " + + precos_combustiveis["Complemento"].fillna("") + ) + precos_combustiveis.drop(columns=["sigla_uf"], inplace=True) + precos_combustiveis.rename(columns={"Data da Coleta": "data_coleta"}, inplace=True) + + return precos_combustiveis + + +def orderning_data_coleta(precos_combustiveis: pd.DataFrame): + precos_combustiveis["data_coleta"] = ( + precos_combustiveis["data_coleta"].str[6:10] + + "-" + + precos_combustiveis["data_coleta"].str[3:5] + + "-" + + precos_combustiveis["data_coleta"].str[0:2] + ) + + return precos_combustiveis + + +def lower_colunm_produto(precos_combustiveis: pd.DataFrame): + precos_combustiveis["Produto"] = precos_combustiveis["Produto"].str.lower() + return precos_combustiveis + + +def creating_column_ano(precos_combustiveis: pd.DataFrame): + precos_combustiveis["ano"] = precos_combustiveis["data_coleta"].str[0:4] + precos_combustiveis["ano"].replace("nan", "", inplace=True) + + return precos_combustiveis + + +def 
rename_and_reordening(precos_combustiveis: pd.DataFrame): + precos_combustiveis.rename(columns=anatel_constants.RENAME.value, inplace=True) + precos_combustiveis = precos_combustiveis[anatel_constants.ORDEM.value] + + return precos_combustiveis + + +def rename_columns(precos_combustiveis: pd.DataFrame): + precos_combustiveis["ano"] = precos_combustiveis["ano"].apply( + lambda x: str(x).replace(".0", "") + ) + precos_combustiveis["cep_revenda"] = precos_combustiveis["cep_revenda"].apply( + lambda x: str(x).replace("-", "") + ) + precos_combustiveis["unidade_medida"] = precos_combustiveis["unidade_medida"].map( + {"R$ / litro": "R$/litro", "R$ / m³": "R$/m3", "R$ / 13 kg": "R$/13kg"} + ) + precos_combustiveis["nome_estabelecimento"] = precos_combustiveis[ + "nome_estabelecimento" + ].apply(lambda x: str(x).replace(",", "")) + precos_combustiveis["preco_compra"] = precos_combustiveis["preco_compra"].apply( + lambda x: str(x).replace(",", ".") + ) + precos_combustiveis["preco_venda"] = precos_combustiveis["preco_venda"].apply( + lambda x: str(x).replace(",", ".") + ) + precos_combustiveis["preco_venda"] = precos_combustiveis["preco_venda"].replace( + "nan", "" + ) + precos_combustiveis["preco_compra"] = precos_combustiveis["preco_compra"].replace( + "nan", "" + ) + precos_combustiveis.replace(np.nan, "", inplace=True) + + return precos_combustiveis diff --git a/pipelines/datasets/br_cvm_administradores_carteira/flows.py b/pipelines/datasets/br_cvm_administradores_carteira/flows.py index 6741af85f..457d2d9f0 100644 --- a/pipelines/datasets/br_cvm_administradores_carteira/flows.py +++ b/pipelines/datasets/br_cvm_administradores_carteira/flows.py @@ -21,13 +21,15 @@ clean_table_responsavel, clean_table_pessoa_fisica, clean_table_pessoa_juridica, + extract_last_date, ) +from pipelines.utils.metadata.tasks import update_django_metadata + from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, - update_metadata, get_temporal_coverage, rename_current_flow_run_dataset_table, get_current_flow_labels, @@ -37,7 +39,7 @@ URL = "http://dados.cvm.gov.br/dados/ADM_CART/CAD/DADOS/cad_adm_cart.zip" with Flow( - name="br_cvm_administradores_carteira.responsavel", code_owners=["lucas_cr"] + name="br_cvm_administradores_carteira.responsavel", code_owners=["Equipe Pipelines"] ) as br_cvm_adm_car_res: # Parameters dataset_id = Parameter( @@ -68,17 +70,7 @@ dump_mode="append", wait=filepath, ) - - # no generate temporal coverage since there is no date variable - # wait_update_metadata = update_metadata( - # dataset_id=dataset_id, - # table_id=table_id, - # fields_to_update=[ - # {"last_updated": {"data": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}} - # ], - # upstream_tasks=[wait_upload_table], - # ) - + # dont generate temporal coverage since there's no date variable with case(materialize_after_dump, True): # Trigger DBT flow run current_flow_labels = get_current_flow_labels() @@ -113,7 +105,7 @@ br_cvm_adm_car_res.schedule = schedule_responsavel with Flow( - "br_cvm_administradores_carteira.pessoa_fisica", code_owners=["lucas_cr"] + "br_cvm_administradores_carteira.pessoa_fisica", code_owners=["Equipe Pipelines"] ) as br_cvm_adm_car_pes_fis: # Parameters dataset_id = Parameter( @@ -129,6 +121,7 @@ "materialize_after_dump", default=True, required=False ) dbt_alias = Parameter("dbt_alias", default=False, 
required=False) + update_metadata = Parameter("update_metadata", default=False, required=False) rename_flow_run = rename_current_flow_run_dataset_table( prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id @@ -145,25 +138,6 @@ wait=filepath, ) - # update_metadata - temporal_coverage = get_temporal_coverage( - filepath=filepath, - date_cols=["data_registro"], - time_unit="day", - interval="1", - upstream_tasks=[wait_upload_table], - ) - - # wait_update_metadata = update_metadata( - # dataset_id=dataset_id, - # table_id=table_id, - # fields_to_update=[ - # {"last_updated": {"data": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}}, - # {"temporal_coverage": [temporal_coverage]}, - # ], - # upstream_tasks=[temporal_coverage], - # ) - with case(materialize_after_dump, True): # Trigger DBT flow run current_flow_labels = get_current_flow_labels() @@ -192,13 +166,31 @@ wait_for_materialization.retry_delay = timedelta( seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) + with case(update_metadata, True): + data = extract_last_date( + dataset_id, table_id, "basedosdados", var_name="data_registro" + ) + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + _last_date=data, + bq_last_update=False, + api_mode="prod", + date_format="yy-mm-dd", + is_bd_pro=True, + is_free=True, + time_delta=6, + time_unit="months", + upstream_tasks=[materialization_flow, data], + ) br_cvm_adm_car_pes_fis.storage = GCS(constants.GCS_FLOWS_BUCKET.value) br_cvm_adm_car_pes_fis.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) br_cvm_adm_car_pes_fis.schedule = schedule_fisica with Flow( - "br_cvm_administradores_carteira.pessoa_juridica", code_owners=["lucas_cr"] + "br_cvm_administradores_carteira.pessoa_juridica", code_owners=["Equipe Pipelines"] ) as br_cvm_adm_car_pes_jur: # Parameters dataset_id = Parameter( @@ -214,6 +206,7 @@ "materialize_after_dump", default=True, required=False ) dbt_alias = Parameter("dbt_alias", default=False, required=False) + update_metadata = Parameter("update_metadata", default=False, required=False) rename_flow_run = rename_current_flow_run_dataset_table( prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id @@ -230,25 +223,6 @@ wait=filepath, ) - # update_metadata - temporal_coverage = get_temporal_coverage( - filepath=filepath, - date_cols=["data_registro"], - time_unit="day", - interval="1", - upstream_tasks=[wait_upload_table], - ) - - # wait_update_metadata = update_metadata( - # dataset_id=dataset_id, - # table_id=table_id, - # fields_to_update=[ - # {"last_updated": {"data": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}}, - # {"temporal_coverage": [temporal_coverage]}, - # ], - # upstream_tasks=[temporal_coverage], - # ) - with case(materialize_after_dump, True): # Trigger DBT flow run current_flow_labels = get_current_flow_labels() @@ -278,6 +252,24 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) + with case(update_metadata, True): + data = extract_last_date( + dataset_id, table_id, "basedosdados", var_name="data_registro" + ) + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + _last_date=data, + bq_last_update=False, + api_mode="prod", + date_format="yy-mm-dd", + is_bd_pro=True, + is_free=True, + time_delta=6, + time_unit="months", + upstream_tasks=[wait_for_materialization, data], + ) br_cvm_adm_car_pes_jur.storage = GCS(constants.GCS_FLOWS_BUCKET.value) br_cvm_adm_car_pes_jur.run_config = 
KubernetesRun(image=constants.DOCKER_IMAGE.value)
 br_cvm_adm_car_pes_jur.schedule = schedule_juridica
diff --git a/pipelines/datasets/br_cvm_administradores_carteira/schedules.py b/pipelines/datasets/br_cvm_administradores_carteira/schedules.py
index 1396ab820..28c5d4b4c 100644
--- a/pipelines/datasets/br_cvm_administradores_carteira/schedules.py
+++ b/pipelines/datasets/br_cvm_administradores_carteira/schedules.py
@@ -22,6 +22,7 @@
                 "materialization_mode": "prod",
                 "materialize_after_dump": True,
                 "table_id": "responsavel",
+                "dbt_alias": True,
             },
         )
     ],
@@ -41,7 +42,8 @@
                 "materialization_mode": "prod",
                 "materialize_after_dump": True,
                 "table_id": "pessoa_fisica",
-                "dbt_alias": False,
+                "update_metadata": True,
+                "dbt_alias": True,
             },
         )
     ],
@@ -61,7 +63,8 @@
                 "materialization_mode": "prod",
                 "materialize_after_dump": True,
                 "table_id": "pessoa_juridica",
-                "dbt_alias": False,
+                "update_metadata": True,
+                "dbt_alias": True,
             },
         )
     ],
diff --git a/pipelines/datasets/br_cvm_administradores_carteira/tasks.py b/pipelines/datasets/br_cvm_administradores_carteira/tasks.py
index 8a3dc2cbf..8128d25b8 100644
--- a/pipelines/datasets/br_cvm_administradores_carteira/tasks.py
+++ b/pipelines/datasets/br_cvm_administradores_carteira/tasks.py
@@ -12,6 +12,9 @@
 from pandas.api.types import is_string_dtype
 from prefect import task
 from unidecode import unidecode
+import basedosdados as bd
+from datetime import datetime
+from pipelines.utils.utils import log


 @task
@@ -150,3 +153,44 @@ def clean_table_pessoa_juridica(root: str) -> str:
     dataframe.to_csv(ou_filepath, index=False)

     return ou_filepath
+
+
+@task
+def extract_last_date(
+    dataset_id: str,
+    table_id: str,
+    billing_project_id: str,
+    var_name: str,
+) -> str:
+    """
+    Extracts the last update date of a given dataset table.
+
+    Args:
+        dataset_id (str): The ID of the dataset.
+        table_id (str): The ID of the table.
+        billing_project_id (str): The billing project ID.
+
+    Returns:
+        str: The last update date in the format 'yyyy-mm-dd'.
+
+    Raises:
+        Exception: If an error occurs while extracting the last update date.
+ """ + + query_bd = f""" + SELECT MAX({var_name}) as max_date + FROM + `{billing_project_id}.{dataset_id}.{table_id}` + """ + + t = bd.read_sql( + query=query_bd, + billing_project_id=billing_project_id, + from_file=True, + ) + + data = t["max_date"][0] + + log(f"A data mais recente da tabela é: {data}") + + return str(data) diff --git a/pipelines/datasets/br_cvm_fi/constants.py b/pipelines/datasets/br_cvm_fi/constants.py index 88ce77ca4..edf8391c8 100644 --- a/pipelines/datasets/br_cvm_fi/constants.py +++ b/pipelines/datasets/br_cvm_fi/constants.py @@ -558,3 +558,5 @@ class constants(Enum): # pylint: disable=c0103 URL_BALANCETE = "https://dados.cvm.gov.br/dados/FI/DOC/BALANCETE/DADOS/" ARQUITETURA_URL_BALANCETE = "https://docs.google.com/spreadsheets/d/1eIMo_hYHy89oh6kHRN9Kh0NytUZzr8__/edit#gid=1045172528" + + ARQUITETURA_URL_CDA = "https://docs.google.com/spreadsheets/d/1V2XHBXBB_biC0cLoMZ3FxtbC7CPLxQXZhIY7iJDtsSw/edit#gid=0" diff --git a/pipelines/datasets/br_cvm_fi/tasks.py b/pipelines/datasets/br_cvm_fi/tasks.py index c5ccc817f..491c17f17 100644 --- a/pipelines/datasets/br_cvm_fi/tasks.py +++ b/pipelines/datasets/br_cvm_fi/tasks.py @@ -16,6 +16,7 @@ from rpy2.robjects.packages import importr import rpy2.robjects.packages as rpackages import rpy2.robjects as ro +from rpy2.robjects.vectors import StrVector from rpy2.robjects import pandas2ri from pipelines.datasets.br_cvm_fi.utils import ( sheet_to_df, @@ -241,7 +242,7 @@ def clean_data_and_make_partitions(path: str, table_id: str) -> str: @task def clean_data_make_partitions_cda(diretorio, table_id): - df_arq = sheet_to_df(cvm_constants.ARQUITETURA_URL.value) + df_arq = sheet_to_df(cvm_constants.ARQUITETURA_URL_CDA.value) anos_meses = obter_anos_meses(diretorio) for i in anos_meses: @@ -274,10 +275,13 @@ def clean_data_make_partitions_cda(diretorio, table_id): df_final[cvm_constants.COLUNAS.value] = df_final[ cvm_constants.COLUNAS.value ].applymap(lambda x: cvm_constants.MAPEAMENTO.value.get(x, x)) + df_final["CNPJ_FUNDO"] = df_final["CNPJ_FUNDO"].str.replace(r"[/.-]", "") + df_final["CNPJ_INSTITUICAO_FINANC_COOBR"] = df_final[ "CNPJ_INSTITUICAO_FINANC_COOBR" ].str.replace(r"[/.-]", "") + df_final["CPF_CNPJ_EMISSOR"] = df_final["CPF_CNPJ_EMISSOR"].str.replace( r"[/.-]", "" ) @@ -287,15 +291,21 @@ def clean_data_make_partitions_cda(diretorio, table_id): ) df_final = rename_columns(df_arq, df_final) df_final = df_final.replace(",", ".", regex=True) + df_final[cvm_constants.COLUNAS_ASCI.value] = df_final[ cvm_constants.COLUNAS_ASCI.value ].fillna("") + df_final[cvm_constants.COLUNAS_ASCI.value] = df_final[ cvm_constants.COLUNAS_ASCI.value ].applymap(limpar_string) + df_final = df_final[cvm_constants.COLUNAS_TOTAIS.value] + log(f"Fazendo partições para o ano ------> {i}") + os.makedirs(f"/tmp/data/br_cvm_fi/{table_id}/output/", exist_ok=True) + to_partitions( df_final, partition_columns=["ano", "mes"], @@ -391,11 +401,25 @@ def clean_data_make_partitions_perfil(diretorio, table_id): df_final = pd.DataFrame() arquivos = glob.glob(f"{diretorio}*.csv") + # import R's utility package + utils = rpackages.importr("utils") + + # select a mirror for R packages + utils.chooseCRANmirror(ind=1) + # R package names + packnames = "readr" + utils.install_packages(packnames) + # R vector of strings + # names_to_install = [x for x in packnames if not rpackages.isinstalled(x)] + # if len(names_to_install) > 0: + + # Import readr + + readr = rpackages.importr("readr") for file in tqdm(arquivos): log(f"Baixando o arquivo ------> {file}") ## reading with R - 
readr = rpackages.importr("readr")
         df_r = readr.read_delim(
             file, delim=";", locale=readr.locale(encoding="ISO-8859-1")
         )
diff --git a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py
index f364af874..06aed6696 100644
--- a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py
+++ b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/flows.py
@@ -9,6 +9,7 @@
 from prefect.storage import GCS
 from prefect import Parameter, case
 from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
+from pipelines.utils.metadata.tasks import update_django_metadata
 from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
 from pipelines.utils.constants import constants as utils_constants
@@ -16,12 +17,11 @@
 from pipelines.datasets.br_cvm_oferta_publica_distribuicao.tasks import (
     crawl,
     clean_table_oferta_distribuicao,
+    get_today_date,
 )
 from pipelines.utils.decorators import Flow
 from pipelines.utils.tasks import (
     create_table_and_upload_to_gcs,
-    update_metadata,
-    get_temporal_coverage,
     rename_current_flow_run_dataset_table,
     get_current_flow_labels,
 )
@@ -31,7 +31,7 @@
 URL = "http://dados.cvm.gov.br/dados/OFERTA/DISTRIB/DADOS/oferta_distribuicao.csv"
 with Flow(
-    name="br_cvm_oferta_publica_distribuicao.dia", code_owners=["lucas_cr"]
+    name="br_cvm_oferta_publica_distribuicao.dia", code_owners=["Equipe Pipelines"]
 ) as br_cvm_ofe_pub_dis_dia:
     # Parameters
     dataset_id = Parameter(
@@ -44,7 +44,8 @@
     materialize_after_dump = Parameter(
         "materialize after dump", default=True, required=False
     )
-    dbt_alias = Parameter("dbt_alias", default=False, required=False)
+    dbt_alias = Parameter("dbt_alias", default=True, required=False)
+    update_metadata = Parameter("update_metadata", default=False, required=False)

     rename_flow_run = rename_current_flow_run_dataset_table(
         prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
@@ -90,6 +91,24 @@
             seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
         )

+    with case(update_metadata, True):
+        data = get_today_date()
+
+        update_django_metadata(
+            dataset_id,
+            table_id,
+            metadata_type="DateTimeRange",
+            _last_date=data,
+            bq_last_update=False,
+            api_mode="prod",
+            date_format="yy-mm-dd",
+            is_bd_pro=True,
+            is_free=True,
+            time_delta=6,
+            time_unit="months",
+            upstream_tasks=[wait_for_materialization, data],
+        )
+
 br_cvm_ofe_pub_dis_dia.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 br_cvm_ofe_pub_dis_dia.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
 br_cvm_ofe_pub_dis_dia.schedule = schedule_dia
diff --git a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/schedules.py b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/schedules.py
index 810ca7a4e..978f90d68 100644
--- a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/schedules.py
+++ b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/schedules.py
@@ -21,7 +21,8 @@
                 "materialization_mode": "prod",
                 "materialize after dump": True,
                 "table_id": "dia",
-                "dbt_alias": False,
+                "update_metadata": True,
+                "dbt_alias": True,
             },
         ),
     ],
diff --git a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py
index aacd4b9f9..74792b2c0 100644
--- a/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py
+++ b/pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py
@@ -4,11 +4,13 @@
 """
 import os
-
 import pandas as pd
 from pandas.api.types import is_string_dtype
 from prefect import task
 from unidecode import unidecode
+import basedosdados as bd
+from datetime import datetime
+from pipelines.utils.utils import log


 @task
@@ -66,3 +68,10 @@ def clean_table_oferta_distribuicao(root: str) -> str:
     dataframe.to_csv(ou_filepath, index=False, encoding="utf-8")

     return ou_filepath
+
+
+@task
+def get_today_date() -> str:
+    d = datetime.today()
+
+    return str(d.strftime("%Y-%m-%d"))
diff --git a/pipelines/datasets/br_mp_pep_cargos_funcoes/tasks.py b/pipelines/datasets/br_mp_pep_cargos_funcoes/tasks.py
index 1acab03e5..b5a93c810 100644
--- a/pipelines/datasets/br_mp_pep_cargos_funcoes/tasks.py
+++ b/pipelines/datasets/br_mp_pep_cargos_funcoes/tasks.py
@@ -400,7 +400,7 @@ def is_up_to_date() -> bool:
     log(f"Last date website: {text}, parsed as {date_website}")

     last_date_in_bq = extract_last_date(
-        "br_mp_pep", "cargos_funcoes_atualizado", "yy-mm", "basedosdados"
+        "br_mp_pep", "cargos_funcoes", "yy-mm", "basedosdados"
     )

     date_in_bq = datetime.datetime.strptime(last_date_in_bq, "%Y-%m")
diff --git a/pipelines/datasets/br_ms_cnes/flows.py b/pipelines/datasets/br_ms_cnes/flows.py
index d76b9dd82..be11de58b 100644
--- a/pipelines/datasets/br_ms_cnes/flows.py
+++ b/pipelines/datasets/br_ms_cnes/flows.py
@@ -820,7 +820,7 @@
         table_id=table_id,
         billing_project_id="basedosdados",
         cnes_database="CNES",
-        cnes_group_file=br_ms_cnes_constants.DATABASE_GROUPS.value["CNES"][6],
+        cnes_group_file=br_ms_cnes_constants.DATABASE_GROUPS.value["CNES"][7],
     )

     with case(is_empty(files_path), True):
@@ -832,13 +832,13 @@
         dbc_files = access_ftp_donwload_files(
             file_list=files_path,
             path=br_ms_cnes_constants.PATH.value[0],
-            table=br_ms_cnes_constants.TABLE.value[6],
+            table=br_ms_cnes_constants.TABLE.value[7],
         )

         filepath = read_dbc_save_csv(
             file_list=dbc_files,
             path=br_ms_cnes_constants.PATH.value[1],
-            table=br_ms_cnes_constants.TABLE.value[6],
+            table=br_ms_cnes_constants.TABLE.value[7],
             upstream_tasks=[files_path, dbc_files],
         )
diff --git a/pipelines/datasets/br_ms_cnes/schedules.py b/pipelines/datasets/br_ms_cnes/schedules.py
index 02f90701e..e18d9d2ad 100644
--- a/pipelines/datasets/br_ms_cnes/schedules.py
+++ b/pipelines/datasets/br_ms_cnes/schedules.py
@@ -12,7 +12,7 @@
 schedule_br_ms_cnes_estabelecimento = Schedule(
     clocks=[
         CronClock(
-            cron="0 0 * * *",  # every day at midnight
+            cron="0 9 * * *",  # every day at 9:00
             start_date=datetime(2023, 9, 1, 0, 0),
             labels=[
                 constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
@@ -31,11 +31,11 @@
     adjustments=[adjustments.next_weekday],
 )
-
+# todo selecionar outro horário
 schedule_br_ms_cnes_profissional = Schedule(
     clocks=[
         CronClock(
-            cron="0 0 * * *",  # every day at midnight
+            cron="30 6 * * *",  # every day at 6:30
             start_date=datetime(2023, 9, 1, 0, 0),
             labels=[
                 constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
@@ -58,7 +58,7 @@
 schedule_br_ms_cnes_equipe = Schedule(
     clocks=[
         CronClock(
-            cron="0 0 * * *",  # every day at midnight
+            cron="30 9 * * *",  # every day at 9:30
             start_date=datetime(2023, 9, 1, 0, 0),
             labels=[
                 constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
@@ -80,7 +80,7 @@
 schedule_br_ms_cnes_leito = Schedule(
     clocks=[
         CronClock(
-            cron="0 0 * * *",  # every day at midnight
+            cron="0 10 * * *",  # every day at 10:00
             start_date=datetime(2023, 9, 1, 0, 0),
             labels=[
                 constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
@@ -102,7 +102,7 @@
 schedule_br_ms_cnes_equipamento = Schedule(
     clocks=[
         CronClock(
-            cron="0 0 * * *",  # every day at midnight
+            cron="30 10 * * *",  # every day at 10:30
             start_date=datetime(2023, 9, 1, 0, 0),
labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -124,7 +124,7 @@ schedule_br_ms_cnes_estabelecimento_ensino = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="45 10 * * *", # every day at 10:45 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -146,7 +146,7 @@ schedule_br_ms_cnes_dados_complementares = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="0 11 * * *", # every day 11:00 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -168,7 +168,7 @@ schedule_br_ms_cnes_estabelecimento_filantropico = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="15 11 * * *", # every day at 11:15 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -189,7 +189,7 @@ schedule_br_ms_cnes_gestao_metas = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="30 11 * * *", # every day at 11:30 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -212,7 +212,7 @@ schedule_br_ms_cnes_habilitacao = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="45 11 * * *", # every day at 11:45 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -234,7 +234,7 @@ schedule_br_ms_cnes_incentivos = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="45 11 * * *", # every day at 11:45 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -256,7 +256,7 @@ schedule_br_ms_cnes_regra_contratual = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="0 12 * * *", # every day at 12:00 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, @@ -279,7 +279,7 @@ schedule_br_ms_cnes_servico_especializado = Schedule( clocks=[ CronClock( - cron="0 0 * * *", # every day at midnight + cron="30 12 * * *", # every day at 12:30 start_date=datetime(2023, 9, 1, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py index 34f9a5a42..63b9fdb60 100644 --- a/pipelines/datasets/br_rf_cafir/flows.py +++ b/pipelines/datasets/br_rf_cafir/flows.py @@ -16,6 +16,7 @@ parse_files_parse_date, parse_data, check_if_bq_data_is_outdated, + convert_datetime_to_string, ) from pipelines.utils.constants import constants as utils_constants @@ -56,6 +57,9 @@ is_outdated = check_if_bq_data_is_outdated( dataset_id=dataset_id, table_id=table_id, data=info[0], upstream_tasks=[info] ) + update_metadata_strig_date = convert_datetime_to_string( + data=info[0], upstream_tasks=[info, is_outdated] + ) with case(is_outdated, False): log_task(f"Não há atualizações para a tabela de {table_id}!") @@ -106,13 +110,17 @@ wait_for_materialization.retry_delay = timedelta( seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) + # TODO: Quando a nova fotografia for liberada setar is_free como True + # is_free como true. Não setei agora pq a task update_django_metadata depende + # de um coverage já criado na API. Como a lag entre fotográfias é de 5 meses (6 é o padrão no monento) + # não há necessidade de atualizar o coverage agora. 
with case(update_metadata, True): update = update_django_metadata( dataset_id, table_id, metadata_type="DateTimeRange", - _last_date=info[0], + _last_date=update_metadata_strig_date, bq_last_update=False, api_mode="prod", date_format="yy-mm-dd", diff --git a/pipelines/datasets/br_rf_cafir/tasks.py b/pipelines/datasets/br_rf_cafir/tasks.py index 8358c5918..8c4b9b5b9 100644 --- a/pipelines/datasets/br_rf_cafir/tasks.py +++ b/pipelines/datasets/br_rf_cafir/tasks.py @@ -155,8 +155,7 @@ def parse_data(url: str, other_task_output: tuple[list[datetime], list[str]]) -> # save new file as csv df.to_csv(save_path, index=False, sep=",", na_rep="", encoding="utf-8") - # resolve ASCII 0 no momento da leitura do BQ - # ler e salvar de novo + # resolve ASCII 0 no momento da leitura do BQ. Ler e salvar de novo. df = pd.read_csv(save_path, dtype=str) df.to_csv(save_path, index=False, sep=",", na_rep="", encoding="utf-8") @@ -179,3 +178,16 @@ def parse_data(url: str, other_task_output: tuple[list[datetime], list[str]]) -> ) return files_path + + +@task +def convert_datetime_to_string(data: datetime): + """Converte a data para string + + Args: + date (datetime): Data + + Returns: + string: Data no formato string + """ + return str(data) diff --git a/pipelines/datasets/br_sp_saopaulo_dieese_icv/flows.py b/pipelines/datasets/br_sp_saopaulo_dieese_icv/flows.py index 4a43ad439..55ec12014 100644 --- a/pipelines/datasets/br_sp_saopaulo_dieese_icv/flows.py +++ b/pipelines/datasets/br_sp_saopaulo_dieese_icv/flows.py @@ -96,4 +96,4 @@ br_sp_dieese.storage = GCS(constants.GCS_FLOWS_BUCKET.value) br_sp_dieese.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) -br_sp_dieese.schedule = every_month +# br_sp_dieese.schedule = every_month diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/constants.py b/pipelines/datasets/mundo_transfermarkt_competicoes/constants.py index 22f4c9070..59fdc6b2e 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/constants.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/constants.py @@ -2,33 +2,6 @@ """ Constant values for the datasets projects """ - - -############################################################################### -# -# Esse é um arquivo onde podem ser declaratas constantes que serão usadas -# pelo projeto mundo_transfermarkt. -# -# Por ser um arquivo opcional, pode ser removido sem prejuízo ao funcionamento -# do projeto, caos não esteja em uso. 
-# -# Para declarar constantes, basta fazer conforme o exemplo abaixo: -# -# ``` -# class constants(Enum): -# """ -# Constant values for the mundo_transfermarkt_competicoes project -# """ -# FOO = "bar" -# ``` -# -# Para usá-las, basta fazer conforme o exemplo abaixo: -# -# ```py -# from pipelines.datasets.mundo_transfermarkt_competicoes.constants import constants -# print(constants.FOO.value) -# ``` -# ############################################################################### from enum import Enum @@ -81,3 +54,44 @@ class constants(Enum): # pylint: disable=c0103 "chutes_fora_man", "chutes_fora_vis", ] + + ORDEM_COPA_BRASIL = [ + "ano_campeonato", + "data", + "horario", + "fase", + "tipo_fase", + "estadio", + "arbitro", + "publico", + "publico_max", + "time_man", + "time_vis", + "tecnico_man", + "tecnico_vis", + "valor_equipe_titular_man", + "valor_equipe_titular_vis", + "idade_media_titular_man", + "idade_media_titular_vis", + "gols_man", + "gols_vis", + "gols_1_tempo_man", + "gols_1_tempo_vis", + "penalti", + "gols_penalti_man", + "gols_penalti_vis", + "escanteios_man", + "escanteios_vis", + "faltas_man", + "faltas_vis", + "chutes_bola_parada_man", + "chutes_bola_parada_vis", + "defesas_man", + "defesas_vis", + "impedimentos_man", + "impedimentos_vis", + "chutes_man", + "chutes_vis", + "chutes_fora_man", + "chutes_fora_vis", + ] diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py b/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py index 48fcb46ee..b1cfa0c5e 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py @@ -2,59 +2,6 @@ """ Flows for mundo_transfermarkt_competicoes """ - -############################################################################### -# -# Aqui é onde devem ser definidos os flows do projeto. -# Cada flow representa uma sequência de passos que serão executados -# em ordem. -# -# Mais informações sobre flows podem ser encontradas na documentação do -# Prefect: https://docs.prefect.io/core/concepts/flows.html -# -# De modo a manter consistência na codebase, todo o código escrito passará -# pelo pylint. Todos os warnings e erros devem ser corrigidos. -# -# Existem diversas maneiras de declarar flows. No entanto, a maneira mais -# conveniente e recomendada pela documentação é usar a API funcional. -# Em essência, isso implica simplesmente na chamada de funções, passando -# os parâmetros necessários para a execução em cada uma delas. -# -# Também, após a definição de um flow, para o adequado funcionamento, é -# mandatório configurar alguns parâmetros dele, os quais são: -# - storage: onde esse flow está armazenado. No caso, o storage é o -# próprio módulo Python que contém o flow. Sendo assim, deve-se -# configurar o storage como o pipelines.datasets -# - run_config: para o caso de execução em cluster Kubernetes, que é -# provavelmente o caso, é necessário configurar o run_config com a -# imagem Docker que será usada para executar o flow. Assim sendo, -# basta usar constants.DOCKER_IMAGE.value, que é automaticamente -# gerado. 
-# - schedule (opcional): para o caso de execução em intervalos regulares, -# deve-se utilizar algum dos schedules definidos em schedules.py -# -# Um exemplo de flow, considerando todos os pontos acima, é o seguinte: -# -# ----------------------------------------------------------------------------- -# from prefect import task -# from prefect import Flow -# from prefect.run_configs import KubernetesRun -# from prefect.storage import GCS -# from pipelines.constants import constants -# from my_tasks import my_task, another_task -# from my_schedules import some_schedule -# -# with Flow("my_flow") as flow: -# a = my_task(param1=1, param2=2) -# b = another_task(a, param3=3) -# -# flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) -# flow.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) -# flow.schedule = some_schedule -# ----------------------------------------------------------------------------- -# -# Abaixo segue um código para exemplificação, que pode ser removido. -# ############################################################################### from pipelines.datasets.mundo_transfermarkt_competicoes.constants import ( constants as mundo_constants, @@ -64,8 +11,14 @@ get_max_data, execucao_coleta_sync, ) -from pipelines.datasets.mundo_transfermarkt_competicoes.utils import execucao_coleta -from pipelines.datasets.mundo_transfermarkt_competicoes.schedules import every_week +from pipelines.datasets.mundo_transfermarkt_competicoes.utils import ( + execucao_coleta, + execucao_coleta_copa, +) +from pipelines.datasets.mundo_transfermarkt_competicoes.schedules import ( + every_week, + every_week_copa, +) from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, rename_current_flow_run_dataset_table, @@ -107,9 +60,9 @@ rename_flow_run = rename_current_flow_run_dataset_table( prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id ) - df = execucao_coleta_sync(execucao_coleta) + df = execucao_coleta_sync(table_id) output_filepath = make_partitions(df, upstream_tasks=[df]) - data_maxima = get_max_data() + data_maxima = get_max_data(output_filepath, upstream_tasks=[output_filepath]) wait_upload_table = create_table_and_upload_to_gcs( data_path=output_filepath, @@ -161,6 +114,7 @@ time_unit="weeks", date_format="yy-mm-dd", api_mode="prod", + upstream_tasks=[materialization_flow], ) transfermarkt_brasileirao_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) @@ -168,3 +122,85 @@ image=constants.DOCKER_IMAGE.value ) transfermarkt_brasileirao_flow.schedule = every_week + +with Flow( + name="mundo_transfermarkt_competicoes.copa_brasil", + code_owners=[ + "Gabs", + ], +) as transfermarkt_copa_flow: + dataset_id = Parameter( + "dataset_id", default="mundo_transfermarkt_competicoes", required=True + ) + table_id = Parameter("table_id", default="copa_brasil", required=True) + materialization_mode = Parameter( + "materialization_mode", default="dev", required=False + ) + materialize_after_dump = Parameter( + "materialize_after_dump", default=True, required=False + ) + dbt_alias = Parameter("dbt_alias", default=True, required=False) + + rename_flow_run = rename_current_flow_run_dataset_table( + prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id + ) + df = execucao_coleta_sync(table_id) + output_filepath = make_partitions(df, upstream_tasks=[df]) + data_maxima = get_max_data(output_filepath, upstream_tasks=[output_filepath]) + + wait_upload_table = create_table_and_upload_to_gcs( + data_path=output_filepath, + dataset_id=dataset_id, + 
table_id=table_id, + dump_mode="append", + wait=output_filepath, + ) + + with case(materialize_after_dump, True): + # Trigger DBT flow run + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": dataset_id, + "table_id": table_id, + "mode": materialization_mode, + "dbt_alias": dbt_alias, + }, + labels=current_flow_labels, + run_name=r"Materialize {dataset_id}.{table_id}", + ) + + wait_for_materialization = wait_for_flow_run( + materialization_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_materialization.retry_delay = timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + _last_date=data_maxima, + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=True, + is_free=True, + time_delta=6, + time_unit="months", + date_format="yy-mm-dd", + api_mode="prod", + upstream_tasks=[materialization_flow], + ) + +transfermarkt_copa_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +transfermarkt_copa_flow.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +transfermarkt_copa_flow.schedule = every_week_copa diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py b/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py index a046af2cb..6ca65e2a6 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py @@ -3,70 +3,6 @@ Schedules for mundo_transfermarkt_competicoes """ -############################################################################### -# -# Aqui é onde devem ser definidos os schedules para os flows do projeto. -# Cada schedule indica o intervalo de tempo entre as execuções. -# Um schedule pode ser definido para um ou mais flows. -# Mais informações sobre schedules podem ser encontradas na documentação do -# Prefect: https://docs.prefect.io/core/concepts/schedules.html -# -# De modo a manter consistência na codebase, todo o código escrito passará -# pelo pylint. Todos os warnings e erros devem ser corrigidos. -# -# Os schedules devem ser definidos de acordo com a sintaxe do Prefect, como, -# por exemplo, o seguinte (para executar a cada 1 minuto): -# -# ----------------------------------------------------------------------------- -# from datetime import timedelta, datetime -# from prefect.schedules import Schedule -# from prefect.schedules.clocks import IntervalClock -# from pipelines.constants import constants -# -# minute_schedule = Schedule( -# clocks=[ -# IntervalClock( -# interval=timedelta(minutes=1), -# start_date=datetime(2021, 1, 1), -# labels=[ -# constants.DATASETS_AGENT_LABEL.value, -# ] -# ), -# ] -# ) -# ----------------------------------------------------------------------------- -# -# Vale notar que o parâmetro `labels` é obrigatório e deve ser uma lista com -# apenas um elemento, correspondendo ao label do agente que será executado. -# O label do agente é definido em `constants.py` e deve ter o formato -# `DATASETS_AGENT_LABEL`. 
-# -# Outro exemplo, para executar todos os dias à meia noite, segue abaixo: -# -# ----------------------------------------------------------------------------- -# from prefect import task -# from datetime import timedelta -# import pendulum -# from prefect.schedules import Schedule -# from prefect.schedules.clocks import IntervalClock -# from pipelines.constants import constants -# -# every_day_at_midnight = Schedule( -# clocks=[ -# IntervalClock( -# interval=timedelta(days=1), -# start_date=pendulum.datetime( -# 2021, 1, 1, 0, 0, 0, tz="America/Sao_Paulo"), -# labels=[ -# constants.K8S_AGENT_LABEL.value, -# ] -# ) -# ] -# ) -# ----------------------------------------------------------------------------- -# -# Abaixo segue um código para exemplificação, que pode ser removido. -# ############################################################################### from prefect.schedules.clocks import CronClock @@ -88,7 +24,27 @@ "table_id": "brasileirao_serie_a", "materialization_mode": "prod", "materialize_after_dump": True, - "dbt_alias": False, + "dbt_alias": True, + }, + ), + ] +) + + +every_week_copa = Schedule( + clocks=[ + CronClock( + cron="0 9 * 2-10 2", + start_date=datetime(2023, 5, 1, 7, 30), + labels=[ + constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, + ], + parameter_defaults={ + "dataset_id": "mundo_transfermarkt_competicoes", + "table_id": "copa_brasil", + "materialization_mode": "prod", + "materialize_after_dump": True, + "dbt_alias": True, }, ), ] diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py b/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py index fdc0745cc..07084eafb 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py @@ -3,68 +3,32 @@ Tasks for mundo_transfermarkt_competicoes """ -############################################################################### -# -# Aqui é onde devem ser definidas as tasks para os flows do projeto. -# Cada task representa um passo da pipeline. Não é estritamente necessário -# tratar todas as exceções que podem ocorrer durante a execução de uma task, -# mas é recomendável, ainda que não vá implicar em uma quebra no sistema. -# Mais informações sobre tasks podem ser encontradas na documentação do -# Prefect: https://docs.prefect.io/core/concepts/tasks.html -# -# De modo a manter consistência na codebase, todo o código escrito passará -# pelo pylint. Todos os warnings e erros devem ser corrigidos. -# -# As tasks devem ser definidas como funções comuns ao Python, com o decorador -# @task acima. É recomendado inserir type hints para as variáveis. -# -# Um exemplo de task é o seguinte: -# -# ----------------------------------------------------------------------------- -# from prefect import task -# -# @task -# def my_task(param1: str, param2: int) -> str: -# """ -# My task description. -# """ -# return f'{param1} {param2}' -# ----------------------------------------------------------------------------- -# -# Você também pode usar pacotes Python arbitrários, como numpy, pandas, etc. -# -# ----------------------------------------------------------------------------- -# from prefect import task -# import numpy as np -# -# @task -# def my_task(a: np.ndarray, b: np.ndarray) -> str: -# """ -# My task description. -# """ -# return np.add(a, b) -# ----------------------------------------------------------------------------- -# -# Abaixo segue um código para exemplificação, que pode ser removido. 
-# ############################################################################### from pipelines.datasets.mundo_transfermarkt_competicoes.constants import ( constants as mundo_constants, ) +from pipelines.datasets.mundo_transfermarkt_competicoes.utils import ( + execucao_coleta_copa, + execucao_coleta, +) from pipelines.utils.utils import log, to_partitions from prefect import task import re import numpy as np import pandas as pd import asyncio +import os from datetime import timedelta, datetime @task -def execucao_coleta_sync(execucao_coleta): +def execucao_coleta_sync(tabela): # Obter o loop de eventos atual e executar a tarefa nele loop = asyncio.get_event_loop() - df = loop.run_until_complete(execucao_coleta()) + if tabela == "brasileirao_serie_a": + df = loop.run_until_complete(execucao_coleta()) + else: + df = loop.run_until_complete(execucao_coleta_copa()) return df @@ -82,9 +46,11 @@ def make_partitions(df): @task -def get_max_data(): - # ano = mundo_constants.DATA_ATUAL_ANO.value - # df = pd.read_csv(f"{file_path}ano_campeonato={ano}/data.csv") - # df["data"] = pd.to_datetime(df["data"]).dt.date - max_data = mundo_constants.DATA_ATUAL.value +def get_max_data(file_path): + ano = mundo_constants.DATA_ATUAL_ANO.value + df = pd.read_csv(f"{file_path}ano_campeonato={ano}/data.csv") + df["data"] = pd.to_datetime(df["data"]).dt.date + max_data = df["data"].max().strftime("%Y-%m-%d") + + # max_data = mundo_constants.DATA_ATUAL.value return max_data diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py b/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py index e8209189d..dba0e9f62 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py @@ -3,31 +3,6 @@ General purpose functions for the mundo_transfermarkt_competicoes project """ -############################################################################### - -# Esse é um arquivo onde podem ser declaratas funções que serão usadas -# pelo projeto mundo_transfermarkt_competicoes. -# -# Por ser um arquivo opcional, pode ser removido sem prejuízo ao funcionamento -# do projeto, caos não esteja em uso. -# -# Para declarar funções, basta fazer em código Python comum, como abaixo: -# -# ``` -# def foo(): -# """ -# Function foo -# """ -# print("foo") -# ``` -# -# Para usá-las, basta fazer conforme o exemplo abaixo: -# -# ```py -# from pipelines.datasets.mundo_transfermarkt_competicoes.utils import foo -# foo() -# ``` -# ############################################################################### import re from bs4 import BeautifulSoup @@ -524,3 +499,477 @@ def sem_info(x, y): df = df[mundo_constants.ORDEM_COLUNA_FINAL.value] return df + + +# ! 
Código para a Copa do Brasil +def process_copa_brasil(df, content): + """ + Process complete + """ + new_content = { + "estadio": content.find_all("td", attrs={"class": "hauptlink"})[0].get_text(), + "data": re.search( + re.compile(r"\d+/\d+/\d+"), + content.find("a", text=re.compile(r"\d+/\d+/\d")).get_text().strip(), + ).group(0), + "horario": content.find_all("p", attrs={"class": "sb-datum hide-for-small"})[0] + .get_text() + .split("|")[2] + .strip(), + "fase": content.find_all("p", attrs={"class": "sb-datum hide-for-small"})[0] + .get_text() + .split("|")[0] + .strip(), + "publico": content.find_all("td", attrs={"class": "hauptlink"})[1].get_text(), + "publico_max": content.find_all("table", attrs={"class": "profilheader"})[0] + .find_all("td")[2] + .get_text(), + "arbitro": content.find_all("table", attrs={"class": "profilheader"})[1] + .find_all("a")[0] + .get_text(), + "gols_1_tempo_man": content.find_all("div", attrs={"class": "sb-halbzeit"})[0] + .get_text() + .split(":", 1)[0], + "gols_1_tempo_vis": content.find_all("div", attrs={"class": "sb-halbzeit"})[0] + .get_text() + .split(":", 1)[1], + "chutes_man": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 0 + ].get_text(), + "chutes_vis": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 1 + ].get_text(), + "chutes_fora_man": content.find_all( + "div", attrs={"class": "sb-statistik-zahl"} + )[2].get_text(), + "chutes_fora_vis": content.find_all( + "div", attrs={"class": "sb-statistik-zahl"} + )[3].get_text(), + "defesas_man": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 4 + ].get_text(), + "defesas_vis": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 5 + ].get_text(), + "faltas_man": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 10 + ].get_text(), + "faltas_vis": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 11 + ].get_text(), + "escanteios_man": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 6 + ].get_text(), + "escanteios_vis": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 7 + ].get_text(), + "impedimentos_man": content.find_all( + "div", attrs={"class": "sb-statistik-zahl"} + )[12].get_text(), + "impedimentos_vis": content.find_all( + "div", attrs={"class": "sb-statistik-zahl"} + )[13].get_text(), + "chutes_bola_parada_man": content.find_all( + "div", attrs={"class": "sb-statistik-zahl"} + )[8].get_text(), + "chutes_bola_parada_vis": content.find_all( + "div", attrs={"class": "sb-statistik-zahl"} + )[9].get_text(), + } + df = pd.concat([df, pd.DataFrame([new_content])], ignore_index=True) + return df + + +def process_basico_copa_brasil(df, content): + """ + Process data + """ + new_content = { + "estadio": content.find_all("td", attrs={"class": "hauptlink"})[0].get_text(), + "data": re.search( + re.compile(r"\d+/\d+/\d+"), + content.find("a", text=re.compile(r"\d+/\d+/\d")).get_text().strip(), + ).group(0), + "horario": content.find_all("p", attrs={"class": "sb-datum hide-for-small"})[0] + .get_text() + .split("|")[2] + .strip(), + "fase": content.find_all("p", attrs={"class": "sb-datum hide-for-small"})[0] + .get_text() + .split("|")[0] + .strip(), + "publico": content.find_all("td", attrs={"class": "hauptlink"})[1].get_text(), + "publico_max": content.find_all("table", attrs={"class": "profilheader"})[0] + .find_all("td")[2] + .get_text(), + "arbitro": None, + "gols_1_tempo_man": None, + "gols_1_tempo_vis": None, + "chutes_man": None, + "chutes_vis": None, + "chutes_fora_man": None, 
+        "chutes_fora_vis": None,
+        "defesas_man": None,
+        "defesas_vis": None,
+        "faltas_man": None,
+        "faltas_vis": None,
+        "escanteios_man": None,
+        "escanteios_vis": None,
+        "impedimentos_man": None,
+        "impedimentos_vis": None,
+        "chutes_bola_parada_man": None,
+        "chutes_bola_parada_vis": None,
+    }
+    df = pd.concat([df, pd.DataFrame([new_content])], ignore_index=True)
+    return df
+
+
+def vazio_copa_brasil(df):
+    """
+    Append an all-None template row to the DataFrame
+    """
+    new_content = {
+        "estadio": None,
+        "data": None,
+        "horario": None,
+        "fase": None,
+        "publico": None,
+        "publico_max": None,
+        "arbitro": None,
+        "gols_1_tempo_man": None,
+        "gols_1_tempo_vis": None,
+        "chutes_man": None,
+        "chutes_vis": None,
+        "chutes_fora_man": None,
+        "chutes_fora_vis": None,
+        "defesas_man": None,
+        "defesas_vis": None,
+        "faltas_man": None,
+        "faltas_vis": None,
+        "escanteios_man": None,
+        "escanteios_vis": None,
+        "impedimentos_man": None,
+        "impedimentos_vis": None,
+        "chutes_bola_parada_man": None,
+        "chutes_bola_parada_vis": None,
+    }
+    df = pd.concat([df, pd.DataFrame([new_content])], ignore_index=True)
+    return df
+
+
+def pegar_valor_copa_brasil(df, content):
+    """
+    Get market values, starters' average age and coaches for both teams
+    """
+    # gera um dicionário
+    valor_content = {
+        "valor_equipe_titular_man": content.find_all("div", class_="table-footer")[0]
+        .find_all("td")[3]
+        .get_text()
+        .split("€", 1)[1],
+        "valor_equipe_titular_vis": content.find_all("div", class_="table-footer")[1]
+        .find_all("td")[3]
+        .get_text()
+        .split("€", 1)[1],
+        "idade_media_titular_man": content.find_all("div", class_="table-footer")[0]
+        .find_all("td")[1]
+        .get_text()
+        .split(":", 1)[1]
+        .strip(),
+        "idade_media_titular_vis": content.find_all("div", class_="table-footer")[1]
+        .find_all("td")[1]
+        .get_text()
+        .split(":", 1)[1]
+        .strip(),
+        "tecnico_man": content.find_all("a", attrs={"id": "0"})[1].get_text(),
+        "tecnico_vis": content.find_all("a", attrs={"id": "0"})[3].get_text(),
+    }
+    df = pd.concat([df, pd.DataFrame([valor_content])], ignore_index=True)
+    return df
+
+
+def pegar_valor_sem_tecnico_copa_brasil(df, content):
+    """
+    Get market values and starters' average age when no coach is listed
+    """
+    valor_content = {
+        "valor_equipe_titular_man": content.find_all("div", class_="table-footer")[0]
+        .find_all("td")[3]
+        .get_text()
+        .split("€", 1)[1],
+        "valor_equipe_titular_vis": content.find_all("div", class_="table-footer")[1]
+        .find_all("td")[3]
+        .get_text()
+        .split("€", 1)[1],
+        "idade_media_titular_man": content.find_all("div", class_="table-footer")[0]
+        .find_all("td")[1]
+        .get_text()
+        .split(":", 1)[1]
+        .strip(),
+        "idade_media_titular_vis": content.find_all("div", class_="table-footer")[1]
+        .find_all("td")[1]
+        .get_text()
+        .split(":", 1)[1]
+        .strip(),
+        "tecnico_man": None,
+        "tecnico_vis": None,
+    }
+    df = pd.concat([df, pd.DataFrame([valor_content])], ignore_index=True)
+    return df
+
+
+def valor_vazio_copa_brasil(df):
+    """
+    Append an all-None template row to the DataFrame
+    """
+    valor_content = {
+        "valor_equipe_titular_man": None,
+        "valor_equipe_titular_vis": None,
+        "idade_media_titular_man": None,
+        "idade_media_titular_vis": None,
+        "tecnico_man": None,
+        "tecnico_vis": None,
+    }
+    df = pd.concat([df, pd.DataFrame([valor_content])], ignore_index=True)
+    return df
+
+
+async def execucao_coleta_copa():
+    base_url = "https://www.transfermarkt.com/copa-do-brasil/gesamtspielplan/pokalwettbewerb/BRC/saison_id/{season}"
+    headers = {
+        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36"
+    }
+
+    pattern_man = 
re.compile(r"\d+:") + pattern_vis = re.compile(r":\d+") + + base_link = "https://www.transfermarkt.com" + base_link_br = "https://www.transfermarkt.com.br" + links = [] + time_man = [] + time_vis = [] + gols = [] + gols_man = [] + gols_vis = [] + penalti = [] + lista_nova = [] + + season = mundo_constants.SEASON.value + # Pegar o link das partidas + # Para cada temporada, adiciona os links dos jogos em `links` + log(f"Obtendo links: temporada {season}") + site_data = requests.get(base_url.format(season=season), headers=headers) + soup = BeautifulSoup(site_data.content, "html.parser") + link_tags = soup.find_all("a", attrs={"class": "ergebnis-link"}) + for tag in link_tags: + links.append(re.sub(r"\s", "", tag["href"])) + + tabela_grand = soup.findAll("div", class_="box")[1] + tabela = tabela_grand.findAll("tbody") + for i in range(0, len(tabela)): + # for i in range(0, 4): + for row in tabela[i].findAll("tr"): + if not row.get("class"): + td_tags = row.findAll("td") + # Verifica se existem pelo menos três na linha + if len(td_tags) >= 3: + time_man.append(td_tags[2].text.strip()) + time_vis.append(td_tags[6].text.strip()) + gols.append(td_tags[4].text.strip()) + + while ( + len(links) != len(time_man) + or len(links) != len(time_vis) + or len(links) != len(gols) + ): + if len(links) != len(time_man): + time_man.pop(0) + if len(links) != len(time_vis): + time_vis.pop(0) + if len(links) != len(gols): + gols.pop(0) + + for gol in gols: + penalti.append(1 if "on pens" in gol else 0) + + pares = zip(links, penalti) + for link, valor_penalti in pares: + if valor_penalti == 1: + link_data = requests.get(base_link + link, headers=headers) + link_soup = BeautifulSoup(link_data.content, "html.parser") + content = link_soup.find("div", id="main") + content_gol = content.find_all("div", attrs={"class": "sb-ereignisse"}) + # Encontre a tag h2 com a classe "content-box-headline" + h2_tags = content.find_all("h2", class_="content-box-headline") + + # Itere pelas tags h2 encontradas + for h2_tag in h2_tags: + if "Goals" in h2_tag.text: + content_gol = content.find_all( + "div", attrs={"class": "sb-ereignisse"} + ) + resultado = ( + content_gol[0] + .find_all("div", attrs={"class": "sb-aktion-spielstand"})[-1] + .get_text() + ) + break # Pare a iteração assim que encontrar "Goals" + else: + resultado = None + # Após a iteração, verifique se resultado é None e, se for, adicione '0:0' à lista + if resultado is None: + lista_nova.append("0:0") + else: + lista_nova.append(resultado) + else: + lista_nova.append(None) + + if len(lista_nova) == len(gols): + for i in range(len(lista_nova)): + # Verifique se o valor em 'lista_nova' é None e substitua pelo valor de 'goals' na mesma posição + if lista_nova[i] is None: + lista_nova[i] = gols.copy()[i] + + for gol in lista_nova: + gols_man.append(str(pattern_man.findall(str(gol)))) + gols_vis.append(str(pattern_vis.findall(str(gol)))) + + gol_pen_man = [] + gol_pen_vis = [] + + for gol in gols: + # Use expressão regular para encontrar os gols das equipes "man" e "vis" apenas quando "on pens" está presente + if "on pens" in gol: + gol_pen_man.append(str(pattern_man.findall(str(gol)))) + gol_pen_vis.append(str(pattern_vis.findall(str(gol)))) + else: + gol_pen_man.append(None) + gol_pen_vis.append(None) + + # links das estatísticas + links_esta = [] + # links das escalações de cada partida + links_valor = [] + + for link in links: + esta = link.replace("index", "statistik") + links_esta.append(esta) + for link in links: + valor = link.replace("index", "aufstellung") + 
links_valor.append(valor)
+
+    n_links = len(links)
+    log(f"Encontradas {n_links} partidas.")
+    log("Extraindo dados...")
+
+    df = pd.DataFrame(
+        {"time_man": [], "time_vis": [], "gols_man": [], "gols_vis": [], "penalti": []}
+    )
+    df_valor = pd.DataFrame({})
+
+    for n, link in enumerate(links_esta):
+        content = await get_content(base_link_br + link, wait_time=0.01)
+        if content:
+            try:
+                df = process_copa_brasil(df, content)
+            except Exception:
+                try:
+                    df = process_basico_copa_brasil(df, content)
+                except Exception:
+                    df = vazio_copa_brasil(df)
+        else:
+            df = vazio_copa_brasil(df)
+        log(f"{n+1} de {n_links} estatísticas de partidas extraídas.")
+
+    for n, link in enumerate(links_valor):
+        content = await get_content(base_link + link, wait_time=0.01)
+
+        if content:
+            try:
+                df_valor = pegar_valor_copa_brasil(df_valor, content)
+            except Exception:
+                try:
+                    df_valor = pegar_valor_sem_tecnico_copa_brasil(df_valor, content)
+                except Exception:
+                    df_valor = valor_vazio_copa_brasil(df_valor)
+        else:
+            df_valor = valor_vazio_copa_brasil(df_valor)
+        log(f"{n+1} de {n_links} valores extraídos.")
+
+    df["time_man"] = time_man
+    df["time_vis"] = time_vis
+    df["gols_man"] = gols_man
+    df["gols_vis"] = gols_vis
+    df["penalti"] = penalti
+    df["gols_penalti_man"] = gol_pen_man
+    df["gols_penalti_vis"] = gol_pen_vis
+    # Limpando as strings geradas pelo findall (ex.: "['2:']" -> "2")
+    df["gols_man"] = df["gols_man"].map(lambda x: x.replace("['", ""))
+    df["gols_man"] = df["gols_man"].map(lambda x: x.replace(":']", ""))
+
+    df["gols_vis"] = df["gols_vis"].map(lambda x: x.replace("[':", ""))
+    df["gols_vis"] = df["gols_vis"].map(lambda x: x.replace("']", ""))
+
+    df["gols_penalti_man"] = df["gols_penalti_man"].apply(
+        lambda x: x.replace("['", "") if pd.notna(x) else x
+    )
+    df["gols_penalti_man"] = df["gols_penalti_man"].apply(
+        lambda x: x.replace(":']", "") if pd.notna(x) else x
+    )
+
+    df["gols_penalti_vis"] = df["gols_penalti_vis"].apply(
+        lambda x: x.replace("[':", "") if pd.notna(x) else x
+    )
+    df["gols_penalti_vis"] = df["gols_penalti_vis"].apply(
+        lambda x: x.replace("']", "") if pd.notna(x) else x
+    )
+
+    df["gols_1_tempo_vis"] = df["gols_1_tempo_vis"].map(
+        lambda x: str(x).replace(")", "")
+    )
+    df["gols_1_tempo_man"] = df["gols_1_tempo_man"].map(
+        lambda x: str(x).replace("(", "")
+    )
+
+    df_valor["valor_equipe_titular_man"] = df_valor["valor_equipe_titular_man"].map(
+        lambda x: str(x).replace("m", "0000")
+    )
+    df_valor["valor_equipe_titular_man"] = df_valor["valor_equipe_titular_man"].map(
+        lambda x: str(x).replace("k", "000")
+    )
+    df_valor["valor_equipe_titular_man"] = df_valor["valor_equipe_titular_man"].map(
+        lambda x: str(x).replace(".", "")
+    )
+
+    df_valor["valor_equipe_titular_vis"] = df_valor["valor_equipe_titular_vis"].map(
+        lambda x: str(x).replace("m", "0000")
+    )
+    df_valor["valor_equipe_titular_vis"] = df_valor["valor_equipe_titular_vis"].map(
+        lambda x: str(x).replace("k", "000")
+    )
+    df_valor["valor_equipe_titular_vis"] = df_valor["valor_equipe_titular_vis"].map(
+        lambda x: str(x).replace(".", "")
+    )
+
+    df["publico_max"] = df["publico_max"].map(lambda x: str(x).replace(".", ""))
+    df["publico"] = df["publico"].map(lambda x: str(x).replace(".", ""))
+
+    # Extrair a parte depois do traço para 'tipo_fase'
+    df["tipo_fase"] = df["fase"].str.extract(r"(.+)\s*-\s*(.*)")[1]
+
+    # Substituir as células vazias na coluna 'tipo_fase' por "Jogo único"
+    df["tipo_fase"].fillna("Jogo único", inplace=True)
+
+    # Atualizar a coluna 'fase' com a parte antes do traço ou a própria 'fase' se não houver traço
+    df["fase"] = 
df["fase"].str.extract(r"(.+)\s*-\s*(.*)")[0].fillna(df["fase"])
+
+    df["data"] = pd.to_datetime(df["data"], format="%d/%m/%y").dt.date
+    df["horario"] = pd.to_datetime(df["horario"], format="%H:%M").dt.strftime("%H:%M")
+    df["ano_campeonato"] = mundo_constants.DATA_ATUAL_ANO.value
+
+    df = pd.concat([df, df_valor], axis=1)
+    df.fillna("", inplace=True)
+    df["publico_max"] = df["publico_max"].str.replace("\n", "")
+    df = df[mundo_constants.ORDEM_COPA_BRASIL.value]
+
+    return df
diff --git a/pipelines/utils/metadata/tasks.py b/pipelines/utils/metadata/tasks.py
index d440e1066..dbdda6c62 100644
--- a/pipelines/utils/metadata/tasks.py
+++ b/pipelines/utils/metadata/tasks.py
@@ -107,6 +107,12 @@ def update_django_metadata(
         "weeks": "weeks",
         "days": "days",
     }
+
+
+    if not isinstance(_last_date, str) and _last_date is not None:
+        raise ValueError("O parâmetro `_last_date` deve ser uma string ou None")
+
+
     if time_unit not in unidades_permitidas:
         raise ValueError(
             f"Unidade temporal inválida. Escolha entre {', '.join(unidades_permitidas.keys())}"
diff --git a/pipelines/utils/utils.py b/pipelines/utils/utils.py
index 279dc8660..57dc2758e 100644
--- a/pipelines/utils/utils.py
+++ b/pipelines/utils/utils.py
@@ -725,7 +725,9 @@ def extract_last_update(
         raise


-def extract_last_date(dataset_id, table_id, date_format: str, billing_project_id: str):
+def extract_last_date(
+    dataset_id, table_id, date_format: str, billing_project_id: str, data: str = "data"
+):
     """
     Extracts the last update date of a given dataset table.

@@ -772,7 +774,7 @@ def extract_last_date(dataset_id, table_id, date_format: str, billing_project_id
     try:
         query_bd = f"""
         SELECT
-            MAX(data) as max_date
+            MAX({data}) as max_date
         FROM
         `basedosdados.{dataset_id}.{table_id}`
         """
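
Reviewer note: a minimal standalone sketch of the market-value cleaning chain applied to valor_equipe_titular_man/valor_equipe_titular_vis inside execucao_coleta_copa above; the sample values are illustrative only, not taken from the scraped data.

import pandas as pd

# Espelha a cadeia de str.replace usada acima: "m" -> "0000", "k" -> "000" e remoção do ponto,
# convertendo o valor de mercado em euros inteiros (ainda como string).
valores = pd.Series(["1.50m", "758k"])
valores_limpos = (
    valores.map(lambda x: str(x).replace("m", "0000"))
    .map(lambda x: str(x).replace("k", "000"))
    .map(lambda x: str(x).replace(".", ""))
)
print(valores_limpos.tolist())  # ['1500000', '758000']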
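
Reviewer note: the guard added to update_django_metadata accepts either a string or None for _last_date. A minimal sketch of that behaviour, assuming the intent is to catch callers that pass date/datetime objects; _validate_last_date is a hypothetical helper used only for illustration, not part of the codebase.

from datetime import date


def _validate_last_date(_last_date):
    # Mesmo teste adicionado em update_django_metadata: rejeita o que não for str nem None.
    if not isinstance(_last_date, str) and _last_date is not None:
        raise ValueError("O parâmetro `_last_date` deve ser uma string ou None")
    return _last_date


_validate_last_date("2023-09-30")  # ok: string
_validate_last_date(None)  # ok: None também passa no teste
try:
    _validate_last_date(date(2023, 9, 30))  # objetos date precisam ser formatados antes
except ValueError as erro:
    print(erro)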
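
Reviewer note: a hedged usage sketch of the new `data` argument in extract_last_date, which now parameterizes the column fed to MAX() in the query above. dataset_id/table_id come from the schedules in this diff; the date_format and billing_project_id values below are placeholders, not values taken from this PR.

from pipelines.utils.utils import extract_last_date

ultima_data = extract_last_date(
    dataset_id="mundo_transfermarkt_competicoes",
    table_id="copa_brasil",
    date_format="%Y-%m-%d",  # placeholder: use o formato que a função já espera
    billing_project_id="<billing-project>",  # placeholder
    data="data",  # novo argumento: coluna usada no MAX(); "data" é o padrão
)
print(ultima_data)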