Merge pull request #861 from basedosdados/staging/mundo_transfermarkt_competicoes

[Fix] mundo_transfermarkt_competicoes.copa_brasil
Winzen authored Sep 10, 2024
2 parents c09c0e3 + e084ff5 commit 548602d
Showing 3 changed files with 105 additions and 65 deletions.
102 changes: 56 additions & 46 deletions pipelines/datasets/mundo_transfermarkt_competicoes/flows.py
@@ -12,9 +12,6 @@
from pipelines.constants import constants

###############################################################################
from pipelines.datasets.mundo_transfermarkt_competicoes.constants import (
constants as mundo_constants,
)
from pipelines.datasets.mundo_transfermarkt_competicoes.schedules import (
every_day_brasileirao,
every_day_copa,
@@ -23,6 +20,7 @@
execucao_coleta_sync,
make_partitions,
get_data_source_transfermarkt_max_date,
get_data_source_max_date_copa
)
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
@@ -129,7 +127,7 @@
with Flow(
name="mundo_transfermarkt_competicoes.copa_brasil",
code_owners=[
"equipe_pipelines",
"luiz",
],
) as transfermarkt_copa_flow:
dataset_id = Parameter(
@@ -147,59 +145,71 @@
rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
)
df = execucao_coleta_sync(table_id)
output_filepath = make_partitions(df, upstream_tasks=[df])

wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_filepath,
data_source_max_date = get_data_source_max_date_copa()

outdated = check_if_data_is_outdated(
dataset_id=dataset_id,
table_id=table_id,
dump_mode="append",
wait=output_filepath,
data_source_max_date=data_source_max_date,
upstream_tasks=[data_source_max_date],
)

with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": dataset_id,
"table_id": table_id,
"mode": materialization_mode,
"dbt_alias": dbt_alias,
"dbt_command": "run/test",
},
labels=current_flow_labels,
run_name=r"Materialize {dataset_id}.{table_id}",
)
with case(outdated, True):

wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)
wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)
df = execucao_coleta_sync(table_id, upstream_tasks=[outdated])
output_filepath = make_partitions(df, upstream_tasks=[df])

update_django_metadata(
wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_filepath,
dataset_id=dataset_id,
table_id=table_id,
date_column_name={"date": "data"},
date_format="%Y-%m-%d",
coverage_type="part_bdpro",
time_delta={"months": 6},
prefect_mode=materialization_mode,
bq_project="basedosdados",
upstream_tasks=[wait_for_materialization],
dump_mode="append",
wait=output_filepath,
)

with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": dataset_id,
"table_id": table_id,
"mode": materialization_mode,
"dbt_alias": dbt_alias,
"dbt_command": "run/test",
},
labels=current_flow_labels,
run_name=r"Materialize {dataset_id}.{table_id}",
)

wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)
wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

update_django_metadata(
dataset_id=dataset_id,
table_id=table_id,
date_column_name={"date": "data"},
date_format="%Y-%m-%d",
coverage_type="part_bdpro",
time_delta={"months": 6},
prefect_mode=materialization_mode,
bq_project="basedosdados",
upstream_tasks=[wait_for_materialization],
)


transfermarkt_copa_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
transfermarkt_copa_flow.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
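The net effect of the flows.py changes is that collection now runs behind a freshness gate: the flow first fetches the latest match date from the source via get_data_source_max_date_copa, passes it to check_if_data_is_outdated, and only executes collection, partitioning, upload and DBT materialization inside `with case(outdated, True):`. Below is a minimal sketch of that Prefect 1.x gating pattern, assuming Prefect 1.x is installed; `fetch_source_max_date`, `is_outdated` and `collect_and_upload` are hypothetical stand-ins for the real tasks, and the hard-coded dates exist only to make it runnable.

```python
from datetime import datetime

from prefect import Flow, case, task


@task
def fetch_source_max_date() -> datetime:
    # Stand-in for get_data_source_max_date_copa, which scrapes transfermarkt.com.br.
    return datetime(2024, 9, 10)


@task
def is_outdated(source_max_date: datetime) -> bool:
    # Stand-in for check_if_data_is_outdated, which compares the source date
    # against table metadata; a fixed date is used here just to illustrate.
    last_materialized = datetime(2024, 9, 1)
    return source_max_date > last_materialized


@task
def collect_and_upload() -> None:
    # Stand-in for execucao_coleta_sync / make_partitions / create_table_and_upload_to_gcs.
    print("collecting, partitioning and uploading")


with Flow("freshness-gate-sketch") as flow:
    source_max_date = fetch_source_max_date()
    outdated = is_outdated(source_max_date)
    with case(outdated, True):
        collect_and_upload()

# flow.run()  # the gated task only runs when is_outdated returns True
```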
37 changes: 37 additions & 0 deletions pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py
@@ -9,16 +9,53 @@
from pandas import DataFrame
from prefect import task
from datetime import timedelta
import requests
from bs4 import BeautifulSoup
from datetime import datetime
import re
from pipelines.constants import constants
from pipelines.datasets.mundo_transfermarkt_competicoes.utils import (
execucao_coleta,
execucao_coleta_copa,
data_url,
)

from pipelines.datasets.mundo_transfermarkt_competicoes.constants import (
constants as mundo_constants,
)
from pipelines.utils.utils import log, to_partitions

###############################################################################

@task(
max_retries=1,
retry_delay=timedelta(seconds=60),
)
def get_data_source_max_date_copa() -> datetime:
    """Scrape the Copa do Brasil schedule page and return the most recent match date that is not in the future."""
season = mundo_constants.SEASON.value
base_url = f"https://www.transfermarkt.com.br/copa-do-brasil/gesamtspielplan/pokalwettbewerb/BRC/saison_id/{season}"

headers = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36"
}

html = requests.get(base_url, headers=headers, timeout=120)

soup = BeautifulSoup(html.text, "html.parser")

pattern = r'\d+/\d+/\d+'

datas = [re.findall(pattern, element.text)[0]
for element in soup.select("tr:not([class]) td.hide-for-small")
if re.findall(pattern, element.text)]

ultima_data = max([datetime.strptime(data, "%d/%m/%Y")
for data in datas
if datetime.strptime(data, "%d/%m/%Y") <= datetime.today()])
return ultima_data


@task(
max_retries=constants.TASK_MAX_RETRIES.value,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
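Conceptually, the new get_data_source_max_date_copa task pulls the Copa do Brasil schedule page, grabs every dd/mm/yyyy date in the classless schedule rows, and keeps the latest one that is not in the future. The snippet below runs the same selector, regex and max() logic against a hard-coded HTML fragment so it can be checked offline; SAMPLE_HTML is a simplified, hypothetical approximation of the transfermarkt.com.br table, not the real markup.

```python
import re
from datetime import datetime

from bs4 import BeautifulSoup

# Hypothetical, simplified stand-in for the schedule table on transfermarkt.com.br.
SAMPLE_HTML = """
<table>
  <tr class="rundenzeile"><td class="hide-for-small">Round header, skipped by tr:not([class])</td></tr>
  <tr><td class="hide-for-small">03/08/2024</td></tr>
  <tr><td class="hide-for-small">10/09/2024</td></tr>
  <tr><td class="hide-for-small">31/12/2099</td></tr>
</table>
"""

soup = BeautifulSoup(SAMPLE_HTML, "html.parser")
pattern = r"\d+/\d+/\d+"

# Same extraction as the task: dates from <td class="hide-for-small"> cells
# inside rows that carry no class attribute.
datas = [
    re.findall(pattern, element.text)[0]
    for element in soup.select("tr:not([class]) td.hide-for-small")
    if re.findall(pattern, element.text)
]

# Latest date that is not in the future, so scheduled rounds never push the
# source max date past today.
ultima_data = max(
    datetime.strptime(data, "%d/%m/%Y")
    for data in datas
    if datetime.strptime(data, "%d/%m/%Y") <= datetime.today()
)
print(ultima_data)  # 2024-09-10 00:00:00, assuming it is run after that date
```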
31 changes: 12 additions & 19 deletions pipelines/datasets/mundo_transfermarkt_competicoes/utils.py
@@ -871,10 +871,6 @@ async def execucao_coleta_copa():

base_link = "https://www.transfermarkt.com"
base_link_br = "https://www.transfermarkt.com.br"
links = []
time_man = []
time_vis = []
gols = []
gols_man = []
gols_vis = []
penalti = []
@@ -884,27 +880,21 @@
# Get the match links
# For each season, append the links of the games to `links`
log(f"Obtendo links: temporada {season}")

site_data = requests.get(base_url.format(season=season), headers=headers)
soup = BeautifulSoup(site_data.content, "html.parser")
link_tags = soup.find_all("a", attrs={"class": "ergebnis-link"})
for tag in link_tags:
links.append(re.sub(r"\s", "", tag["href"]))

links = [element.get("href") for element in soup.select("td.zentriert.hauptlink a")]

# On the main page, collect general information about each match
# Collect the number of goals and the team names
tabela_grand = soup.findAll("div", class_="box")[1]
tabela = tabela_grand.findAll("tbody")
for i in range(0, len(tabela)):
for row in tabela[i].findAll("tr"):
if not row.get("class"):
td_tags = row.findAll("td")
# Check that the row has at least three <td> cells
if len(td_tags) >= 3:
time_man.append(td_tags[2].text.strip())
time_vis.append(td_tags[6].text.strip())
gols.append(td_tags[4].text.strip())

time_man = [element.text for element in soup.select("td.text-right a")]
time_vis = [element.text for element in soup.select("tr:not([class]) td.no-border-links.hauptlink a")]
gols = [element.text for element in soup.select("td.zentriert.hauptlink a")]

# Check that the number of links collected matches the number of general-information records collected

while (
len(links) != len(time_man)
or len(links) != len(time_vis)
@@ -994,6 +984,7 @@ async def execucao_coleta_copa():
links_valor.append(valor)

n_links = len(links)

log(f"Encontrados {n_links} partidas.")
log("Extraindo dados...")
# Create the dataframe with the general information already collected, for the loop over the statistics data
@@ -1102,7 +1093,8 @@ async def execucao_coleta_copa():
# Update the 'fase' column with the part before the dash, or keep 'fase' itself if there is no dash
df["fase"] = df["fase"].str.extract(r"(.+)\s*-\s*(.*)")[0].fillna(df["fase"])

df["data"] = pd.to_datetime(df["data"], format="%d/%m/%y").dt.date
df["data"] = pd.to_datetime(df["data"], format="%d/%m/%y")

df["horario"] = pd.to_datetime(df["horario"], format="%H:%M").dt.strftime("%H:%M")
df["ano_campeonato"] = mundo_constants.DATA_ATUAL_ANO.value

@@ -1111,6 +1103,7 @@
df.fillna("", inplace=True)
df["publico_max"] = df["publico_max"].str.replace("\n", "")
df = df[mundo_constants.ORDEM_COPA_BRASIL.value]
df = df[df.data <= datetime.today()]  # Drop possible future matches

return df

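In utils.py, `data` is now kept as a pandas datetime column (the old `.dt.date` conversion is dropped), which is what lets the final `df = df[df.data <= datetime.today()]` filter compare against `datetime.today()` and discard fixtures that have not been played yet. The sketch below reproduces just that parse-and-filter step on a toy dataframe; the rows are hypothetical and only the date handling mirrors the diff.

```python
from datetime import datetime

import pandas as pd

# Hypothetical rows mimicking the general-information dataframe built in
# execucao_coleta_copa; dates use the same two-digit-year "%d/%m/%y" format.
df = pd.DataFrame(
    {
        "data": ["03/08/24", "10/09/24", "31/12/68"],  # "68" parses as 2068, i.e. a future fixture
        "time_man": ["Time A", "Time B", "Time C"],
    }
)

# Keep the column as datetime64 so it can be compared with datetime.today().
df["data"] = pd.to_datetime(df["data"], format="%d/%m/%y")

# Drop possible future matches, mirroring the last filter added in the diff.
df = df[df.data <= datetime.today()]
print(df)  # only the two past dates remain
```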
