diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py b/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py
index 40256993b..887caed1f 100644
--- a/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py
+++ b/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py
@@ -12,9 +12,6 @@
 from pipelines.constants import constants
 
 ###############################################################################
-from pipelines.datasets.mundo_transfermarkt_competicoes.constants import (
-    constants as mundo_constants,
-)
 from pipelines.datasets.mundo_transfermarkt_competicoes.schedules import (
     every_day_brasileirao,
     every_day_copa,
@@ -23,6 +20,7 @@
     execucao_coleta_sync,
     make_partitions,
     get_data_source_transfermarkt_max_date,
+    get_data_source_max_date_copa
 )
 from pipelines.utils.constants import constants as utils_constants
 from pipelines.utils.decorators import Flow
@@ -129,7 +127,7 @@
 with Flow(
     name="mundo_transfermarkt_competicoes.copa_brasil",
     code_owners=[
-        "equipe_pipelines",
+        "luiz",
     ],
 ) as transfermarkt_copa_flow:
     dataset_id = Parameter(
@@ -147,59 +145,71 @@
     rename_flow_run = rename_current_flow_run_dataset_table(
         prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
     )
-    df = execucao_coleta_sync(table_id)
-    output_filepath = make_partitions(df, upstream_tasks=[df])
-    wait_upload_table = create_table_and_upload_to_gcs(
-        data_path=output_filepath,
+    data_source_max_date = get_data_source_max_date_copa()
+
+    outdated = check_if_data_is_outdated(
         dataset_id=dataset_id,
         table_id=table_id,
-        dump_mode="append",
-        wait=output_filepath,
+        data_source_max_date=data_source_max_date,
+        upstream_tasks=[data_source_max_date],
     )
-    with case(materialize_after_dump, True):
-        # Trigger DBT flow run
-        current_flow_labels = get_current_flow_labels()
-        materialization_flow = create_flow_run(
-            flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
-            project_name=constants.PREFECT_DEFAULT_PROJECT.value,
-            parameters={
-                "dataset_id": dataset_id,
-                "table_id": table_id,
-                "mode": materialization_mode,
-                "dbt_alias": dbt_alias,
-                "dbt_command": "run/test",
-            },
-            labels=current_flow_labels,
-            run_name=r"Materialize {dataset_id}.{table_id}",
-        )
+    with case(outdated, True):
 
-        wait_for_materialization = wait_for_flow_run(
-            materialization_flow,
-            stream_states=True,
-            stream_logs=True,
-            raise_final_state=True,
-        )
-        wait_for_materialization.max_retries = (
-            dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
-        )
-        wait_for_materialization.retry_delay = timedelta(
-            seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
-        )
+        df = execucao_coleta_sync(table_id, upstream_tasks=[outdated])
+        output_filepath = make_partitions(df, upstream_tasks=[df])
 
-        update_django_metadata(
+        wait_upload_table = create_table_and_upload_to_gcs(
+            data_path=output_filepath,
             dataset_id=dataset_id,
             table_id=table_id,
-            date_column_name={"date": "data"},
-            date_format="%Y-%m-%d",
-            coverage_type="part_bdpro",
-            time_delta={"months": 6},
-            prefect_mode=materialization_mode,
-            bq_project="basedosdados",
-            upstream_tasks=[wait_for_materialization],
+            dump_mode="append",
+            wait=output_filepath,
         )
 
+        with case(materialize_after_dump, True):
+            # Trigger DBT flow run
+            current_flow_labels = get_current_flow_labels()
+            materialization_flow = create_flow_run(
+                flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
+                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
+                parameters={
+                    "dataset_id": dataset_id,
+                    "table_id": table_id,
"mode": materialization_mode, + "dbt_alias": dbt_alias, + "dbt_command": "run/test", + }, + labels=current_flow_labels, + run_name=r"Materialize {dataset_id}.{table_id}", + ) + + wait_for_materialization = wait_for_flow_run( + materialization_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_materialization.retry_delay = timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + + update_django_metadata( + dataset_id=dataset_id, + table_id=table_id, + date_column_name={"date": "data"}, + date_format="%Y-%m-%d", + coverage_type="part_bdpro", + time_delta={"months": 6}, + prefect_mode=materialization_mode, + bq_project="basedosdados", + upstream_tasks=[wait_for_materialization], + ) + transfermarkt_copa_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) transfermarkt_copa_flow.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py b/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py index 51c94ef28..f18d2603f 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/tasks.py @@ -9,16 +9,53 @@ from pandas import DataFrame from prefect import task from datetime import timedelta +import requests +from bs4 import BeautifulSoup +from datetime import datetime +import re from pipelines.constants import constants from pipelines.datasets.mundo_transfermarkt_competicoes.utils import ( execucao_coleta, execucao_coleta_copa, data_url, ) + +from pipelines.datasets.mundo_transfermarkt_competicoes.constants import ( + constants as mundo_constants, +) from pipelines.utils.utils import log, to_partitions ############################################################################### +@task( + max_retries=1, + retry_delay=timedelta(seconds=60), +) +def get_data_source_max_date_copa() -> datetime: + + season = mundo_constants.SEASON.value + base_url = f"https://www.transfermarkt.com.br/copa-do-brasil/gesamtspielplan/pokalwettbewerb/BRC/saison_id/{season}" + + headers = { + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36" + } + + html = requests.get(base_url, headers=headers, timeout=120) + + soup = BeautifulSoup(html.text) + + pattern = r'\d+/\d+/\d+' + + datas = [re.findall(pattern, element.text)[0] + for element in soup.select("tr:not([class]) td.hide-for-small") + if re.findall(pattern, element.text)] + + ultima_data = max([datetime.strptime(data, "%d/%m/%Y") + for data in datas + if datetime.strptime(data, "%d/%m/%Y") <= datetime.today()]) + return ultima_data + + @task( max_retries=constants.TASK_MAX_RETRIES.value, retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py b/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py index 1cbcc875f..fb1e0121b 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/utils.py @@ -871,10 +871,6 @@ async def execucao_coleta_copa(): base_link = "https://www.transfermarkt.com" base_link_br = "https://www.transfermarkt.com.br" - links = [] - time_man = [] - time_vis = [] - gols = [] gols_man = [] gols_vis = [] penalti = [] @@ -884,27 +880,21 @@ async def execucao_coleta_copa(): # Pegar o 
         # Get the links to the matches
         # For each season, add the match links to `links`
         log(f"Obtendo links: temporada {season}")
+
         site_data = requests.get(base_url.format(season=season), headers=headers)
         soup = BeautifulSoup(site_data.content, "html.parser")
-        link_tags = soup.find_all("a", attrs={"class": "ergebnis-link"})
-        for tag in link_tags:
-            links.append(re.sub(r"\s", "", tag["href"]))
+
+        links = [element.get("href") for element in soup.select("td.zentriert.hauptlink a")]
 
         # From the main page, collect general information on each match
         # Collects the number of goals and the team names
-        tabela_grand = soup.findAll("div", class_="box")[1]
-        tabela = tabela_grand.findAll("tbody")
-        for i in range(0, len(tabela)):
-            for row in tabela[i].findAll("tr"):
-                if not row.get("class"):
-                    td_tags = row.findAll("td")
-                    # Check that there are at least three
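
Taken together, the flows.py change turns the Copa do Brasil flow into a freshness-gated dump: the scraped max date feeds `check_if_data_is_outdated`, and the whole collect → partition → upload → materialize chain runs only inside `case(outdated, True)`. Below is a minimal Prefect 1.x sketch of that pattern; the task names and dates here are hypothetical stand-ins, not the repo's code:

```python
from datetime import datetime

from prefect import Flow, case, task


@task
def source_max_date() -> datetime:
    # Stand-in for get_data_source_max_date_copa: latest date found at the source
    return datetime(2023, 9, 27)


@task
def is_outdated(source_date: datetime) -> bool:
    # Stand-in for check_if_data_is_outdated: compare the source date against
    # a hypothetical last-ingested date (BigQuery metadata in the real flow)
    last_ingested = datetime(2023, 9, 1)
    return source_date > last_ingested


@task
def collect_and_upload() -> None:
    print("collecting: the source is ahead of the warehouse")


with Flow("freshness_gated_dump") as flow:
    outdated = is_outdated(source_max_date())
    with case(outdated, True):
        # Everything in this block is skipped when `outdated` is False
        collect_and_upload()

if __name__ == "__main__":
    flow.run()
```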
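The new `get_data_source_max_date_copa` task computes that freshness date by scraping the schedule page: it regex-matches every `dd/mm/yyyy` date in the fixture rows and keeps the latest one that is not in the future. A self-contained sketch of the same logic, assuming (as the diff does) that the dates sit in `td.hide-for-small` cells of unclassed `<tr>` rows; the season value and helper name are illustrative:

```python
import re
from datetime import datetime

import requests
from bs4 import BeautifulSoup

SEASON = 2023  # stand-in for mundo_constants.SEASON.value
BASE_URL = (
    "https://www.transfermarkt.com.br/copa-do-brasil/gesamtspielplan/"
    f"pokalwettbewerb/BRC/saison_id/{SEASON}"
)
# Transfermarkt rejects requests without a browser-like user agent
HEADERS = {"user-agent": "Mozilla/5.0"}


def max_played_match_date() -> datetime:
    """Return the most recent match date on the schedule page that is not in the future."""
    response = requests.get(BASE_URL, headers=HEADERS, timeout=120)
    soup = BeautifulSoup(response.text, "html.parser")

    # Unclassed <tr> rows hold the fixtures; the date cell carries the
    # hide-for-small class and contains a dd/mm/yyyy date.
    pattern = re.compile(r"\d+/\d+/\d+")
    dates = [
        datetime.strptime(found.group(), "%d/%m/%Y")
        for cell in soup.select("tr:not([class]) td.hide-for-small")
        if (found := pattern.search(cell.text))
    ]
    # Drop fixtures scheduled after today; raises ValueError if nothing matched
    return max(date for date in dates if date <= datetime.today())


if __name__ == "__main__":
    print(max_played_match_date())
```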