From 1e49ef904d020210a46c55281969f101a17cb617 Mon Sep 17 00:00:00 2001 From: Gagabrielle-carv Date: Wed, 20 Sep 2023 11:04:59 -0300 Subject: [PATCH 1/6] add dados --- .../constants.py | 68 ++- .../mundo_transfermarkt_competicoes/flows.py | 146 +++-- .../schedules.py | 84 +-- .../mundo_transfermarkt_competicoes/tasks.py | 56 +- .../mundo_transfermarkt_competicoes/utils.py | 498 +++++++++++++++++- 5 files changed, 631 insertions(+), 221 deletions(-) diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/constants.py b/pipelines/datasets/mundo_transfermarkt_competicoes/constants.py index 22f4c9070..59fdc6b2e 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/constants.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/constants.py @@ -2,33 +2,6 @@ """ Constant values for the datasets projects """ - - -############################################################################### -# -# Esse é um arquivo onde podem ser declaratas constantes que serão usadas -# pelo projeto mundo_transfermarkt. -# -# Por ser um arquivo opcional, pode ser removido sem prejuízo ao funcionamento -# do projeto, caos não esteja em uso. -# -# Para declarar constantes, basta fazer conforme o exemplo abaixo: -# -# ``` -# class constants(Enum): -# """ -# Constant values for the mundo_transfermarkt_competicoes project -# """ -# FOO = "bar" -# ``` -# -# Para usá-las, basta fazer conforme o exemplo abaixo: -# -# ```py -# from pipelines.datasets.mundo_transfermarkt_competicoes.constants import constants -# print(constants.FOO.value) -# ``` -# ############################################################################### from enum import Enum @@ -81,3 +54,44 @@ class constants(Enum): # pylint: disable=c0103 "chutes_fora_man", "chutes_fora_vis", ] + + ORDEM_COPA_BRASIL = [ + "ano_campeonato", + "data", + "horario", + "fase", + "tipo_fase", + "estadio", + "arbitro", + "publico", + "publico_max", + "time_man", + "time_vis", + "tecnico_man", + "tecnico_vis", + "valor_equipe_titular_man", + "valor_equipe_titular_vis", + "idade_media_titular_man", + "idade_media_titular_vis", + "gols_man", + "gols_vis", + "gols_1_tempo_man", + "gols_1_tempo_vis", + "penalti", + "gols_penalti_man", + "gols_penalti_vis", + "escanteios_man", + "escanteios_vis", + "faltas_man", + "faltas_vis", + "chutes_bola_parada_man", + "chutes_bola_parada_vis", + "defesas_man", + "defesas_vis", + "impedimentos_man", + "impedimentos_vis", + "chutes_man", + "chutes_vis", + "chutes_fora_man", + "chutes_fora_vis", + ] diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py b/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py index 48fcb46ee..b8744b493 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py @@ -2,59 +2,6 @@ """ Flows for mundo_transfermarkt_competicoes """ - -############################################################################### -# -# Aqui é onde devem ser definidos os flows do projeto. -# Cada flow representa uma sequência de passos que serão executados -# em ordem. -# -# Mais informações sobre flows podem ser encontradas na documentação do -# Prefect: https://docs.prefect.io/core/concepts/flows.html -# -# De modo a manter consistência na codebase, todo o código escrito passará -# pelo pylint. Todos os warnings e erros devem ser corrigidos. -# -# Existem diversas maneiras de declarar flows. No entanto, a maneira mais -# conveniente e recomendada pela documentação é usar a API funcional. -# Em essência, isso implica simplesmente na chamada de funções, passando -# os parâmetros necessários para a execução em cada uma delas. -# -# Também, após a definição de um flow, para o adequado funcionamento, é -# mandatório configurar alguns parâmetros dele, os quais são: -# - storage: onde esse flow está armazenado. No caso, o storage é o -# próprio módulo Python que contém o flow. Sendo assim, deve-se -# configurar o storage como o pipelines.datasets -# - run_config: para o caso de execução em cluster Kubernetes, que é -# provavelmente o caso, é necessário configurar o run_config com a -# imagem Docker que será usada para executar o flow. Assim sendo, -# basta usar constants.DOCKER_IMAGE.value, que é automaticamente -# gerado. -# - schedule (opcional): para o caso de execução em intervalos regulares, -# deve-se utilizar algum dos schedules definidos em schedules.py -# -# Um exemplo de flow, considerando todos os pontos acima, é o seguinte: -# -# ----------------------------------------------------------------------------- -# from prefect import task -# from prefect import Flow -# from prefect.run_configs import KubernetesRun -# from prefect.storage import GCS -# from pipelines.constants import constants -# from my_tasks import my_task, another_task -# from my_schedules import some_schedule -# -# with Flow("my_flow") as flow: -# a = my_task(param1=1, param2=2) -# b = another_task(a, param3=3) -# -# flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) -# flow.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) -# flow.schedule = some_schedule -# ----------------------------------------------------------------------------- -# -# Abaixo segue um código para exemplificação, que pode ser removido. -# ############################################################################### from pipelines.datasets.mundo_transfermarkt_competicoes.constants import ( constants as mundo_constants, @@ -64,8 +11,14 @@ get_max_data, execucao_coleta_sync, ) -from pipelines.datasets.mundo_transfermarkt_competicoes.utils import execucao_coleta -from pipelines.datasets.mundo_transfermarkt_competicoes.schedules import every_week +from pipelines.datasets.mundo_transfermarkt_competicoes.utils import ( + execucao_coleta, + execucao_coleta_copa, +) +from pipelines.datasets.mundo_transfermarkt_competicoes.schedules import ( + every_week, + every_week_copa, +) from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, rename_current_flow_run_dataset_table, @@ -109,7 +62,7 @@ ) df = execucao_coleta_sync(execucao_coleta) output_filepath = make_partitions(df, upstream_tasks=[df]) - data_maxima = get_max_data() + data_maxima = get_max_data(output_filepath, upstream_tasks=[output_filepath]) wait_upload_table = create_table_and_upload_to_gcs( data_path=output_filepath, @@ -168,3 +121,84 @@ image=constants.DOCKER_IMAGE.value ) transfermarkt_brasileirao_flow.schedule = every_week + +with Flow( + name="mundo_transfermarkt_competicoes.copa_brasil", + code_owners=[ + "Gabs", + ], +) as transfermarkt_copa_flow: + dataset_id = Parameter( + "dataset_id", default="mundo_transfermarkt_competicoes", required=True + ) + table_id = Parameter("table_id", default="copa_brasil", required=True) + materialization_mode = Parameter( + "materialization_mode", default="dev", required=False + ) + materialize_after_dump = Parameter( + "materialize_after_dump", default=True, required=False + ) + dbt_alias = Parameter("dbt_alias", default=True, required=False) + + rename_flow_run = rename_current_flow_run_dataset_table( + prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id + ) + df = execucao_coleta_sync(execucao_coleta_copa) + output_filepath = make_partitions(df, upstream_tasks=[df]) + data_maxima = get_max_data(output_filepath, upstream_tasks=[output_filepath]) + + wait_upload_table = create_table_and_upload_to_gcs( + data_path=output_filepath, + dataset_id=dataset_id, + table_id=table_id, + dump_mode="append", + wait=output_filepath, + ) + + with case(materialize_after_dump, True): + # Trigger DBT flow run + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": dataset_id, + "table_id": table_id, + "mode": materialization_mode, + "dbt_alias": dbt_alias, + }, + labels=current_flow_labels, + run_name=r"Materialize {dataset_id}.{table_id}", + ) + + wait_for_materialization = wait_for_flow_run( + materialization_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_materialization.retry_delay = timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + _last_date=data_maxima, + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=True, + is_free=True, + time_delta=1, + time_unit="year", + date_format="yy-mm-dd", + api_mode="prod", + ) + +transfermarkt_copa_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +transfermarkt_copa_flow.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +transfermarkt_copa_flow.schedule = every_week_copa diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py b/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py index a046af2cb..59b5dbbea 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py @@ -3,70 +3,6 @@ Schedules for mundo_transfermarkt_competicoes """ -############################################################################### -# -# Aqui é onde devem ser definidos os schedules para os flows do projeto. -# Cada schedule indica o intervalo de tempo entre as execuções. -# Um schedule pode ser definido para um ou mais flows. -# Mais informações sobre schedules podem ser encontradas na documentação do -# Prefect: https://docs.prefect.io/core/concepts/schedules.html -# -# De modo a manter consistência na codebase, todo o código escrito passará -# pelo pylint. Todos os warnings e erros devem ser corrigidos. -# -# Os schedules devem ser definidos de acordo com a sintaxe do Prefect, como, -# por exemplo, o seguinte (para executar a cada 1 minuto): -# -# ----------------------------------------------------------------------------- -# from datetime import timedelta, datetime -# from prefect.schedules import Schedule -# from prefect.schedules.clocks import IntervalClock -# from pipelines.constants import constants -# -# minute_schedule = Schedule( -# clocks=[ -# IntervalClock( -# interval=timedelta(minutes=1), -# start_date=datetime(2021, 1, 1), -# labels=[ -# constants.DATASETS_AGENT_LABEL.value, -# ] -# ), -# ] -# ) -# ----------------------------------------------------------------------------- -# -# Vale notar que o parâmetro `labels` é obrigatório e deve ser uma lista com -# apenas um elemento, correspondendo ao label do agente que será executado. -# O label do agente é definido em `constants.py` e deve ter o formato -# `DATASETS_AGENT_LABEL`. -# -# Outro exemplo, para executar todos os dias à meia noite, segue abaixo: -# -# ----------------------------------------------------------------------------- -# from prefect import task -# from datetime import timedelta -# import pendulum -# from prefect.schedules import Schedule -# from prefect.schedules.clocks import IntervalClock -# from pipelines.constants import constants -# -# every_day_at_midnight = Schedule( -# clocks=[ -# IntervalClock( -# interval=timedelta(days=1), -# start_date=pendulum.datetime( -# 2021, 1, 1, 0, 0, 0, tz="America/Sao_Paulo"), -# labels=[ -# constants.K8S_AGENT_LABEL.value, -# ] -# ) -# ] -# ) -# ----------------------------------------------------------------------------- -# -# Abaixo segue um código para exemplificação, que pode ser removido. -# ############################################################################### from prefect.schedules.clocks import CronClock @@ -93,3 +29,23 @@ ), ] ) + + +every_week_copa = Schedule( + clocks=[ + CronClock( + cron="0 9 * 2-12 2", + start_date=datetime(2023, 5, 1, 7, 30), + labels=[ + constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, + ], + parameter_defaults={ + "dataset_id": "mundo_transfermarkt_competicoes", + "table_id": "copa_brasil", + "materialization_mode": "prod", + "materialize_after_dump": True, + "dbt_alias": False, + }, + ), + ] +) diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py b/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py index fdc0745cc..bece486bf 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py @@ -3,50 +3,6 @@ Tasks for mundo_transfermarkt_competicoes """ -############################################################################### -# -# Aqui é onde devem ser definidas as tasks para os flows do projeto. -# Cada task representa um passo da pipeline. Não é estritamente necessário -# tratar todas as exceções que podem ocorrer durante a execução de uma task, -# mas é recomendável, ainda que não vá implicar em uma quebra no sistema. -# Mais informações sobre tasks podem ser encontradas na documentação do -# Prefect: https://docs.prefect.io/core/concepts/tasks.html -# -# De modo a manter consistência na codebase, todo o código escrito passará -# pelo pylint. Todos os warnings e erros devem ser corrigidos. -# -# As tasks devem ser definidas como funções comuns ao Python, com o decorador -# @task acima. É recomendado inserir type hints para as variáveis. -# -# Um exemplo de task é o seguinte: -# -# ----------------------------------------------------------------------------- -# from prefect import task -# -# @task -# def my_task(param1: str, param2: int) -> str: -# """ -# My task description. -# """ -# return f'{param1} {param2}' -# ----------------------------------------------------------------------------- -# -# Você também pode usar pacotes Python arbitrários, como numpy, pandas, etc. -# -# ----------------------------------------------------------------------------- -# from prefect import task -# import numpy as np -# -# @task -# def my_task(a: np.ndarray, b: np.ndarray) -> str: -# """ -# My task description. -# """ -# return np.add(a, b) -# ----------------------------------------------------------------------------- -# -# Abaixo segue um código para exemplificação, que pode ser removido. -# ############################################################################### from pipelines.datasets.mundo_transfermarkt_competicoes.constants import ( constants as mundo_constants, @@ -82,9 +38,11 @@ def make_partitions(df): @task -def get_max_data(): - # ano = mundo_constants.DATA_ATUAL_ANO.value - # df = pd.read_csv(f"{file_path}ano_campeonato={ano}/data.csv") - # df["data"] = pd.to_datetime(df["data"]).dt.date - max_data = mundo_constants.DATA_ATUAL.value +def get_max_data(file_path): + ano = mundo_constants.DATA_ATUAL_ANO.value + df = pd.read_csv(f"{file_path}ano_campeonato={ano}/data.csv") + df["data"] = pd.to_datetime(df["data"]).dt.date + max_data = df["data"].max() + + # max_data = mundo_constants.DATA_ATUAL.value return max_data diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py b/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py index e8209189d..cf1be24f2 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py @@ -3,31 +3,6 @@ General purpose functions for the mundo_transfermarkt_competicoes project """ -############################################################################### - -# Esse é um arquivo onde podem ser declaratas funções que serão usadas -# pelo projeto mundo_transfermarkt_competicoes. -# -# Por ser um arquivo opcional, pode ser removido sem prejuízo ao funcionamento -# do projeto, caos não esteja em uso. -# -# Para declarar funções, basta fazer em código Python comum, como abaixo: -# -# ``` -# def foo(): -# """ -# Function foo -# """ -# print("foo") -# ``` -# -# Para usá-las, basta fazer conforme o exemplo abaixo: -# -# ```py -# from pipelines.datasets.mundo_transfermarkt_competicoes.utils import foo -# foo() -# ``` -# ############################################################################### import re from bs4 import BeautifulSoup @@ -524,3 +499,476 @@ def sem_info(x, y): df = df[mundo_constants.ORDEM_COLUNA_FINAL.value] return df + + +# ! Código para a Copa do Brasil +def process_copa_brasil(df, content): + """ + Process complete + """ + new_content = { + "estadio": content.find_all("td", attrs={"class": "hauptlink"})[0].get_text(), + "data": re.search( + re.compile(r"\d+/\d+/\d+"), + content.find("a", text=re.compile(r"\d+/\d+/\d")).get_text().strip(), + ).group(0), + "horario": content.find_all("p", attrs={"class": "sb-datum hide-for-small"})[0] + .get_text() + .split("|")[2] + .strip(), + "fase": content.find_all("p", attrs={"class": "sb-datum hide-for-small"})[0] + .get_text() + .split("|")[0] + .strip(), + "publico": content.find_all("td", attrs={"class": "hauptlink"})[1].get_text(), + "publico_max": content.find_all("table", attrs={"class": "profilheader"})[0] + .find_all("td")[2] + .get_text(), + "arbitro": content.find_all("table", attrs={"class": "profilheader"})[1] + .find_all("a")[0] + .get_text(), + "gols_1_tempo_man": content.find_all("div", attrs={"class": "sb-halbzeit"})[0] + .get_text() + .split(":", 1)[0], + "gols_1_tempo_vis": content.find_all("div", attrs={"class": "sb-halbzeit"})[0] + .get_text() + .split(":", 1)[1], + "chutes_man": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 0 + ].get_text(), + "chutes_vis": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 1 + ].get_text(), + "chutes_fora_man": content.find_all( + "div", attrs={"class": "sb-statistik-zahl"} + )[2].get_text(), + "chutes_fora_vis": content.find_all( + "div", attrs={"class": "sb-statistik-zahl"} + )[3].get_text(), + "defesas_man": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 4 + ].get_text(), + "defesas_vis": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 5 + ].get_text(), + "faltas_man": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 10 + ].get_text(), + "faltas_vis": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 11 + ].get_text(), + "escanteios_man": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 6 + ].get_text(), + "escanteios_vis": content.find_all("div", attrs={"class": "sb-statistik-zahl"})[ + 7 + ].get_text(), + "impedimentos_man": content.find_all( + "div", attrs={"class": "sb-statistik-zahl"} + )[12].get_text(), + "impedimentos_vis": content.find_all( + "div", attrs={"class": "sb-statistik-zahl"} + )[13].get_text(), + "chutes_bola_parada_man": content.find_all( + "div", attrs={"class": "sb-statistik-zahl"} + )[8].get_text(), + "chutes_bola_parada_vis": content.find_all( + "div", attrs={"class": "sb-statistik-zahl"} + )[9].get_text(), + } + df = pd.concat([df, pd.DataFrame([new_content])], ignore_index=True) + return df + + +def process_basico_copa_brasil(df, content): + """ + Process data + """ + new_content = { + "estadio": content.find_all("td", attrs={"class": "hauptlink"})[0].get_text(), + "data": re.search( + re.compile(r"\d+/\d+/\d+"), + content.find("a", text=re.compile(r"\d+/\d+/\d")).get_text().strip(), + ).group(0), + "horario": content.find_all("p", attrs={"class": "sb-datum hide-for-small"})[0] + .get_text() + .split("|")[2] + .strip(), + "fase": content.find_all("p", attrs={"class": "sb-datum hide-for-small"})[0] + .get_text() + .split("|")[0] + .strip(), + "publico": content.find_all("td", attrs={"class": "hauptlink"})[1].get_text(), + "publico_max": content.find_all("table", attrs={"class": "profilheader"})[0] + .find_all("td")[2] + .get_text(), + "arbitro": None, + "gols_1_tempo_man": None, + "gols_1_tempo_vis": None, + "chutes_man": None, + "chutes_vis": None, + "chutes_fora_man": None, + "chutes_fora_vis": None, + "defesas_man": None, + "defesas_vis": None, + "faltas_man": None, + "faltas_vis": None, + "escanteios_man": None, + "escanteios_vis": None, + "impedimentos_man": None, + "impedimentos_vis": None, + "chutes_bola_parada_man": None, + "chutes_bola_parada_vis": None, + } + df = pd.concat([df, pd.DataFrame([new_content])], ignore_index=True) + return df + + +def vazio_copa_brasil(df): + """ + Return a template DataFrame + """ + new_content = { + "estadio": None, + "data": None, + "horario": None, + "fase": None, + "publico": None, + "publico_max": None, + "arbitro": None, + "gols_1_tempo_man": None, + "gols_1_tempo_vis": None, + "chutes_man": None, + "chutes_vis": None, + "chutes_fora_man": None, + "chutes_fora_vis": None, + "defesas_man": None, + "defesas_vis": None, + "faltas_man": None, + "faltas_vis": None, + "escanteios_man": None, + "escanteios_vis": None, + "impedimentos_man": None, + "impedimentos_vis": None, + "chutes_bola_parada_man": None, + "chutes_bola_parada_vis": None, + } + df = pd.concat([df, pd.DataFrame([new_content])], ignore_index=True) + return df + + +def pegar_valor_copa_brasil(df, content): + """ + Get value + """ + # gera um dicionário + valor_content = { + "valor_equipe_titular_man": content.find_all("div", class_="table-footer")[0] + .find_all("td")[3] + .get_text() + .split("€", 1)[1], + "valor_equipe_titular_vis": content.find_all("div", class_="table-footer")[1] + .find_all("td")[3] + .get_text() + .split("€", 1)[1], + "idade_media_titular_man": content.find_all("div", class_="table-footer")[0] + .find_all("td")[1] + .get_text() + .split(":", 1)[1] + .strip(), + "idade_media_titular_vis": content.find_all("div", class_="table-footer")[1] + .find_all("td")[1] + .get_text() + .split(":", 1)[1] + .strip(), + "tecnico_man": content.find_all("a", attrs={"id": "0"})[1].get_text(), + "tecnico_vis": content.find_all("a", attrs={"id": "0"})[3].get_text(), + } + df = pd.concat([df, pd.DataFrame([valor_content])], ignore_index=True) + return df + + +def pegar_valor_sem_tecnico_copa_brasil(df, content): + """ + Get value without technical + """ + valor_content = { + "valor_equipe_titular_man": content.find_all("div", class_="table-footer")[0] + .find_all("td")[3] + .get_text() + .split("€", 1)[1], + "valor_equipe_titular_vis": content.find_all("div", class_="table-footer")[1] + .find_all("td")[3] + .get_text() + .split("€", 1)[1], + "idade_media_titular_man": content.find_all("div", class_="table-footer")[0] + .find_all("td")[1] + .get_text() + .split(":", 1)[1] + .strip(), + "idade_media_titular_vis": content.find_all("div", class_="table-footer")[1] + .find_all("td")[1] + .get_text() + .split(":", 1)[1] + .strip(), + "tecnico_man": None, + "tecnico_vis": None, + } + df = pd.concat([df, pd.DataFrame([valor_content])], ignore_index=True) + return df + + +def valor_vazio_copa_brasil(df): + """ + Return a temmplate DataFrame + """ + valor_content = { + "valor_equipe_titular_man": None, + "valor_equipe_titular_vis": None, + "idade_media_titular_man": None, + "idade_media_titular_vis": None, + "tecnico_man": None, + "tecnico_vis": None, + } + df = pd.concat([df, pd.DataFrame([valor_content])], ignore_index=True) + return df + + +async def execucao_coleta_copa(): + base_url = "https://www.transfermarkt.com/copa-do-brasil/gesamtspielplan/pokalwettbewerb/BRC/saison_id/{season}" + headers = { + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36" + } + + pattern_man = re.compile(r"\d+:") + pattern_vis = re.compile(r":\d+") + + base_link = "https://www.transfermarkt.com" + base_link_br = "https://www.transfermarkt.com.br" + links = [] + time_man = [] + time_vis = [] + gols = [] + gols_man = [] + gols_vis = [] + penalti = [] + lista_nova = [] + + season = mundo_constants.SEASON.value + # Pegar o link das partidas + # Para cada temporada, adiciona os links dos jogos em `links` + log(f"Obtendo links: temporada {season}") + site_data = requests.get(base_url.format(season=season), headers=headers) + soup = BeautifulSoup(site_data.content, "html.parser") + link_tags = soup.find_all("a", attrs={"class": "ergebnis-link"}) + for tag in link_tags: + links.append(re.sub(r"\s", "", tag["href"])) + + tabela_grand = soup.findAll("div", class_="box")[1] + tabela = tabela_grand.findAll("tbody") + for i in range(0, len(tabela)): + # for i in range(0, 2): + for row in tabela[i].findAll("tr"): + if not row.get("class"): + td_tags = row.findAll("td") + # Verifica se existem pelo menos três na linha + if len(td_tags) >= 3: + time_man.append(td_tags[2].text.strip()) + time_vis.append(td_tags[6].text.strip()) + gols.append(td_tags[4].text.strip()) + + while ( + len(links) != len(time_man) + or len(links) != len(time_vis) + or len(links) != len(gols) + ): + if len(links) != len(time_man): + time_man.pop(0) + if len(links) != len(time_vis): + time_vis.pop(0) + if len(links) != len(gols): + gols.pop(0) + + for gol in gols: + penalti.append(1 if "on pens" in gol else 0) + + pares = zip(links, penalti) + for link, valor_penalti in pares: + if valor_penalti == 1: + link_data = requests.get(base_link + link, headers=headers) + link_soup = BeautifulSoup(link_data.content, "html.parser") + content = link_soup.find("div", id="main") + content_gol = content.find_all("div", attrs={"class": "sb-ereignisse"}) + # Encontre a tag h2 com a classe "content-box-headline" + h2_tags = content.find_all("h2", class_="content-box-headline") + + # Itere pelas tags h2 encontradas + for h2_tag in h2_tags: + if "Goals" in h2_tag.text: + content_gol = content.find_all( + "div", attrs={"class": "sb-ereignisse"} + ) + resultado = ( + content_gol[0] + .find_all("div", attrs={"class": "sb-aktion-spielstand"})[-1] + .get_text() + ) + break # Pare a iteração assim que encontrar "Goals" + else: + resultado = None + # Após a iteração, verifique se resultado é None e, se for, adicione '0:0' à lista + if resultado is None: + lista_nova.append("0:0") + else: + lista_nova.append(resultado) + else: + lista_nova.append(None) + + if len(lista_nova) == len(gols): + for i in range(len(lista_nova)): + # Verifique se o valor em 'lista_nova' é None e substitua pelo valor de 'goals' na mesma posição + if lista_nova[i] is None: + lista_nova[i] = gols.copy()[i] + + for gol in lista_nova: + gols_man.append(str(pattern_man.findall(str(gol)))) + gols_vis.append(str(pattern_vis.findall(str(gol)))) + + gol_pen_man = [] + gol_pen_vis = [] + + for gol in gols: + # Use expressão regular para encontrar os gols das equipes "man" e "vis" apenas quando "on pens" está presente + if "on pens" in gol: + gol_pen_man.append(str(pattern_man.findall(str(gol)))) + gol_pen_vis.append(str(pattern_vis.findall(str(gol)))) + else: + gol_pen_man.append(None) + gol_pen_vis.append(None) + + # links das estatísticas + links_esta = [] + # links das escalações de cada partida + links_valor = [] + + for link in links: + esta = link.replace("index", "statistik") + links_esta.append(esta) + for link in links: + valor = link.replace("index", "aufstellung") + links_valor.append(valor) + + n_links = len(links) + log(f"Encontrados {n_links} partidas.") + log("Extraindo dados...") + + df = pd.DataFrame( + {"time_man": [], "time_vis": [], "gols_man": [], "gols_vis": [], "penalti": []} + ) + df_valor = pd.DataFrame({}) + + for n, link in enumerate(links_esta): + content = await get_content(base_link_br + link, wait_time=0.01) + if content: + try: + df = process_copa_brasil(df, content) + except Exception: + try: + df = process_basico_copa_brasil(df, content) + except Exception: + df = vazio_copa_brasil(df) + else: + df = vazio_copa_brasil(df) + log(f"{n+1} dados sobre estatística de {n_links} extraídos.") + + for n, link in enumerate(links_valor): + content = await get_content(base_link + link, wait_time=0.01) + + if content: + try: + df_valor = pegar_valor_copa_brasil(df_valor, content) + except Exception: + try: + df_valor = pegar_valor_sem_tecnico_copa_brasil(df_valor, content) + except Exception: + df_valor = valor_vazio_copa_brasil(df_valor) + else: + df_valor = valor_vazio_copa_brasil(df_valor) + log(f"{n+1} valores de {n_links} extraídos.") + + df["time_man"] = time_man + df["time_vis"] = time_vis + df["gols_man"] = gols_man + df["gols_vis"] = gols_vis + df["penalti"] = penalti + df["gols_penalti_man"] = gol_pen_man + df["gols_penalti_vis"] = gol_pen_vis + # Limpando variável + df["gols_man"] = df["gols_man"].map(lambda x: x.replace("['", "")) + df["gols_man"] = df["gols_man"].map(lambda x: x.replace(":']", "")) + + df["gols_vis"] = df["gols_vis"].map(lambda x: x.replace("[':", "")) + df["gols_vis"] = df["gols_vis"].map(lambda x: x.replace("']", "")) + + df["gols_penalti_man"] = df["gols_penalti_man"].apply( + lambda x: x.replace("['", "") if pd.notna(x) else x + ) + df["gols_penalti_man"] = df["gols_penalti_man"].apply( + lambda x: x.replace(":']", "") if pd.notna(x) else x + ) + + df["gols_penalti_vis"] = df["gols_penalti_vis"].apply( + lambda x: x.replace("[':", "") if pd.notna(x) else x + ) + df["gols_penalti_vis"] = df["gols_penalti_vis"].apply( + lambda x: x.replace("']", "") if pd.notna(x) else x + ) + + df["gols_1_tempo_vis"] = df["gols_1_tempo_vis"].map( + lambda x: str(x).replace(")", "") + ) + df["gols_1_tempo_man"] = df["gols_1_tempo_man"].map( + lambda x: str(x).replace("(", "") + ) + + df_valor["valor_equipe_titular_man"] = df_valor["valor_equipe_titular_man"].map( + lambda x: str(x).replace("m", "0000") + ) + df_valor["valor_equipe_titular_man"] = df_valor["valor_equipe_titular_man"].map( + lambda x: str(x).replace("k", "000") + ) + df_valor["valor_equipe_titular_man"] = df_valor["valor_equipe_titular_man"].map( + lambda x: str(x).replace(".", "") + ) + + df_valor["valor_equipe_titular_vis"] = df_valor["valor_equipe_titular_vis"].map( + lambda x: str(x).replace("m", "0000") + ) + df_valor["valor_equipe_titular_vis"] = df_valor["valor_equipe_titular_vis"].map( + lambda x: str(x).replace("k", "000") + ) + df_valor["valor_equipe_titular_vis"] = df_valor["valor_equipe_titular_vis"].map( + lambda x: str(x).replace(".", "") + ) + + df["publico_max"] = df["publico_max"].map(lambda x: str(x).replace(".", "")) + df["publico"] = df["publico"].map(lambda x: str(x).replace(".", "")) + + # Extrair a parte antes do traço + df["tipo_fase"] = df["fase"].str.extract(r"(.+)\s*-\s*(.*)")[1] + + # Substituir as células vazias na coluna 'tipo_fase' por "Jogo único" + df["tipo_fase"].fillna("Jogo único", inplace=True) + + # Atualizar a coluna 'fase' com a parte antes do traço ou a própria 'fase' se não houver traço + df["fase"] = df["fase"].str.extract(r"(.+)\s*-\s*(.*)")[0].fillna(df["fase"]) + + df["data"] = pd.to_datetime(df["data"], format="%d/%m/%y").dt.date + df["horario"] = pd.to_datetime(df["horario"], format="%H:%M").dt.strftime("%H:%M") + df["ano_campeonato"] = mundo_constants.DATA_ATUAL_ANO.value + + df = pd.concat([df, df_valor], axis=1) + df.fillna("", inplace=True) + df = df[mundo_constants.ORDEM_COPA_BRASIL.value] + + return df From c90d7e00f12c9d06511c209dcd093132f40cf6c0 Mon Sep 17 00:00:00 2001 From: Gagabrielle-carv Date: Wed, 20 Sep 2023 18:59:22 -0300 Subject: [PATCH 2/6] executa_coleta --- .../mundo_transfermarkt_competicoes/flows.py | 4 ++-- .../mundo_transfermarkt_competicoes/tasks.py | 12 ++++++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py b/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py index b8744b493..e00af2d7f 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py @@ -60,7 +60,7 @@ rename_flow_run = rename_current_flow_run_dataset_table( prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id ) - df = execucao_coleta_sync(execucao_coleta) + df = execucao_coleta_sync(table_id) output_filepath = make_partitions(df, upstream_tasks=[df]) data_maxima = get_max_data(output_filepath, upstream_tasks=[output_filepath]) @@ -143,7 +143,7 @@ rename_flow_run = rename_current_flow_run_dataset_table( prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id ) - df = execucao_coleta_sync(execucao_coleta_copa) + df = execucao_coleta_sync(table_id) output_filepath = make_partitions(df, upstream_tasks=[df]) data_maxima = get_max_data(output_filepath, upstream_tasks=[output_filepath]) diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py b/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py index bece486bf..22fabd1dc 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py @@ -7,6 +7,10 @@ from pipelines.datasets.mundo_transfermarkt_competicoes.constants import ( constants as mundo_constants, ) +from pipelines.datasets.mundo_transfermarkt_competicoes.utils import ( + execucao_coleta_copa, + execucao_coleta, +) from pipelines.utils.utils import log, to_partitions from prefect import task import re @@ -17,10 +21,14 @@ @task -def execucao_coleta_sync(execucao_coleta): +def execucao_coleta_sync(tabela): # Obter o loop de eventos atual e executar a tarefa nele loop = asyncio.get_event_loop() - df = loop.run_until_complete(execucao_coleta()) + if tabela == "brasileirao_serie_a": + df = loop.run_until_complete(execucao_coleta()) + else: + df = loop.run_until_complete(execucao_coleta_copa()) + return df From 2bb7e9566e12dbdeff7f96436d185380a3922fc2 Mon Sep 17 00:00:00 2001 From: Gagabrielle-carv Date: Thu, 21 Sep 2023 09:05:24 -0300 Subject: [PATCH 3/6] upstream_tasks --- pipelines/datasets/mundo_transfermarkt_competicoes/flows.py | 2 ++ .../datasets/mundo_transfermarkt_competicoes/schedules.py | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py b/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py index e00af2d7f..58f4129ad 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py @@ -114,6 +114,7 @@ time_unit="weeks", date_format="yy-mm-dd", api_mode="prod", + upstream_tasks=[materialization_flow], ) transfermarkt_brasileirao_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) @@ -197,6 +198,7 @@ time_unit="year", date_format="yy-mm-dd", api_mode="prod", + upstream_tasks=[materialization_flow], ) transfermarkt_copa_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py b/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py index 59b5dbbea..086fcc3e2 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py @@ -24,7 +24,7 @@ "table_id": "brasileirao_serie_a", "materialization_mode": "prod", "materialize_after_dump": True, - "dbt_alias": False, + "dbt_alias": True, }, ), ] @@ -44,7 +44,7 @@ "table_id": "copa_brasil", "materialization_mode": "prod", "materialize_after_dump": True, - "dbt_alias": False, + "dbt_alias": True, }, ), ] From 993f1cb93e08a13029826246e88c0fe05ea4646f Mon Sep 17 00:00:00 2001 From: Gagabrielle-carv Date: Thu, 21 Sep 2023 14:22:27 -0300 Subject: [PATCH 4/6] fix: \n --- pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py | 2 +- pipelines/datasets/mundo_transfermarkt_competicoes/utils.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py b/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py index 22fabd1dc..28323072e 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py @@ -17,6 +17,7 @@ import numpy as np import pandas as pd import asyncio +import os from datetime import timedelta, datetime @@ -28,7 +29,6 @@ def execucao_coleta_sync(tabela): df = loop.run_until_complete(execucao_coleta()) else: df = loop.run_until_complete(execucao_coleta_copa()) - return df diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py b/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py index cf1be24f2..dba0e9f62 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py @@ -767,7 +767,7 @@ async def execucao_coleta_copa(): tabela_grand = soup.findAll("div", class_="box")[1] tabela = tabela_grand.findAll("tbody") for i in range(0, len(tabela)): - # for i in range(0, 2): + # for i in range(0, 4): for row in tabela[i].findAll("tr"): if not row.get("class"): td_tags = row.findAll("td") @@ -969,6 +969,7 @@ async def execucao_coleta_copa(): df = pd.concat([df, df_valor], axis=1) df.fillna("", inplace=True) + df["publico_max"] = df["publico_max"].str.replace("\n", "") df = df[mundo_constants.ORDEM_COPA_BRASIL.value] return df From c8e621cc3b6c90cca8ae9e9438dfc05b21ca57a9 Mon Sep 17 00:00:00 2001 From: Gagabrielle-carv Date: Thu, 21 Sep 2023 18:11:48 -0300 Subject: [PATCH 5/6] Schedule --- pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py b/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py index 086fcc3e2..6ca65e2a6 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py @@ -34,7 +34,7 @@ every_week_copa = Schedule( clocks=[ CronClock( - cron="0 9 * 2-12 2", + cron="0 9 * 2-10 2", start_date=datetime(2023, 5, 1, 7, 30), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, From 8238d9bf1178bb395bfa1ec341855254577a945e Mon Sep 17 00:00:00 2001 From: Gagabrielle-carv Date: Thu, 21 Sep 2023 19:23:40 -0300 Subject: [PATCH 6/6] update_django_metadata --- pipelines/datasets/mundo_transfermarkt_competicoes/flows.py | 4 ++-- pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py b/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py index 58f4129ad..b1cfa0c5e 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py @@ -194,8 +194,8 @@ bq_last_update=False, is_bd_pro=True, is_free=True, - time_delta=1, - time_unit="year", + time_delta=6, + time_unit="months", date_format="yy-mm-dd", api_mode="prod", upstream_tasks=[materialization_flow], diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py b/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py index 28323072e..07084eafb 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py @@ -50,7 +50,7 @@ def get_max_data(file_path): ano = mundo_constants.DATA_ATUAL_ANO.value df = pd.read_csv(f"{file_path}ano_campeonato={ano}/data.csv") df["data"] = pd.to_datetime(df["data"]).dt.date - max_data = df["data"].max() + max_data = df["data"].max().strftime("%Y-%m-%d") # max_data = mundo_constants.DATA_ATUAL.value return max_data