Skip to content

Commit

Permalink
add arquivos
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielle-carv committed Oct 16, 2023
1 parent 21d9f52 commit 3f1692c
Show file tree
Hide file tree
Showing 8 changed files with 949 additions and 0 deletions.
1 change: 1 addition & 0 deletions pipelines/datasets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@
from pipelines.datasets.delete_flows.flows import *
from pipelines.datasets.fundacao_lemann.flows import *
from pipelines.datasets.mundo_transfermarkt_competicoes.flows import *
from pipelines.datasets.mundo_transfermarkt_competicoes_internacionais.flows import *
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# -*- coding: utf-8 -*-
"""
Constant values for the datasets projects
"""

###############################################################################
import datetime
from enum import Enum


class constants(Enum): # pylint: disable=c0103
"""
Constant values for the mundo_transfermarkt_competicoes_internacionais project
"""

SEASON = datetime.datetime.now().year - 1

ORDEM_COLUNA_FINAL = [
"temporada",
"data",
"horario",
"fase",
"tipo_fase",
"estadio",
"arbitro",
"publico",
"publico_max",
"time_man",
"time_vis",
"tecnico_man",
"tecnico_vis",
"idade_tecnico_man",
"idade_tecnico_vis",
"data_inicio_tecnico_man",
"data_inicio_tecnico_vis",
"data_final_tecnico_man",
"data_final_tecnico_vis",
"proporcao_sucesso_man",
"proporcao_sucesso_vis",
"valor_equipe_titular_man",
"valor_equipe_titular_vis",
"valor_medio_equipe_titular_man",
"valor_medio_equipe_titular_vis",
"convocacao_selecao_principal_man",
"convocacao_selecao_principal_vis",
"selecao_juniores_man",
"selecao_juniores_vis",
"estrangeiros_man",
"estrangeiros_vis",
"socios_man",
"socios_vis",
"idade_media_titular_man",
"idade_media_titular_vis",
"gols_man",
"gols_vis",
"prorrogacao",
"penalti",
"gols_1_tempo_man",
"gols_1_tempo_vis",
"escanteios_man",
"escanteios_vis",
"faltas_man",
"faltas_vis",
"chutes_bola_parada_man",
"chutes_bola_parada_vis",
"defesas_man",
"defesas_vis",
"impedimentos_man",
"impedimentos_vis",
"chutes_man",
"chutes_vis",
"chutes_fora_man",
"chutes_fora_vis",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
"""
Custom decorators for pipelines.
"""
import asyncio

import requests
from bs4 import BeautifulSoup


def retry(content_function):
async def wrapper(url, attempts=2, wait_time=2, **kwargs):
content = None
count = 0

while content is None and count < attempts:
headers = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36"
}
try:
response = await asyncio.to_thread(
requests.get, url, headers=headers, timeout=100
)
except Exception:
return None
await asyncio.sleep(wait_time)
soup = BeautifulSoup(response.text, "html.parser")
try:
content = content_function(soup, **kwargs)
except Exception:
if count == (attempts - 1):
# Could not get content
content = None
count += 1
# await asyncio.sleep(10)

return content

return wrapper
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# -*- coding: utf-8 -*-
"""
Flows for mundo_transfermarkt_competicoes_internacionais
"""


from datetime import timedelta

from prefect import Parameter, case
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

from pipelines.constants import constants
from pipelines.datasets.mundo_transfermarkt_competicoes_internacionais.constants import (
constants as mundo_constants,
)
from pipelines.datasets.mundo_transfermarkt_competicoes_internacionais.schedules import (
every_two_weeks,
)
from pipelines.datasets.mundo_transfermarkt_competicoes_internacionais.tasks import (
execucao_coleta_sync,
get_max_data,
make_partitions,
)
from pipelines.datasets.mundo_transfermarkt_competicoes_internacionais.utils import (
execucao_coleta,
)
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
from pipelines.utils.metadata.tasks import update_django_metadata
from pipelines.utils.tasks import (
create_table_and_upload_to_gcs,
get_current_flow_labels,
log_task,
rename_current_flow_run_dataset_table,
)

###############################################################################


with Flow(
name="mundo_transfermarkt_competicoes_internacionais.champions_league",
code_owners=[
"Gabs",
],
) as transfermarkt_flow:
dataset_id = Parameter(
"dataset_id",
default="mundo_transfermarkt_competicoes_internacionais",
required=True,
)
table_id = Parameter("table_id", default="champions_league", required=True)
materialization_mode = Parameter(
"materialization_mode", default="dev", required=False
)
materialize_after_dump = Parameter(
"materialize_after_dump", default=True, required=False
)
dbt_alias = Parameter("dbt_alias", default=True, required=False)

rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
)

log_task("Checando se os dados estão desatualizados")
df = execucao_coleta_sync()
output_filepath = make_partitions(df, upstream_tasks=[df])
data_maxima = get_max_data(df, upstream_tasks=[df])

wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_filepath,
dataset_id=dataset_id,
table_id=table_id,
dump_mode="append",
wait=output_filepath,
)

with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": dataset_id,
"table_id": table_id,
"mode": materialization_mode,
"dbt_alias": dbt_alias,
},
labels=current_flow_labels,
run_name=r"Materialize {dataset_id}.{table_id}",
)

wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)
wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
_last_date=data_maxima,
bq_table_last_year_month=False,
bq_last_update=False,
is_bd_pro=True,
is_free=True,
time_delta=6,
time_unit="months",
date_format="yy-mm-dd",
api_mode="prod",
upstream_tasks=[materialization_flow],
)

transfermarkt_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
transfermarkt_flow.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
transfermarkt_flow.schedule = every_two_weeks
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
"""
Schedules for mundo_transfermarkt_competicoes_internacionais
"""
###############################################################################

from datetime import datetime

from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

from pipelines.constants import constants

every_two_weeks = Schedule(
clocks=[
CronClock(
cron="0 9 * 2-10 2",
start_date=datetime(2023, 5, 1, 7, 30),
labels=[
constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
],
parameter_defaults={
"dataset_id": "mundo_transfermarkt_competicoes_internacionais",
"table_id": "champions_league",
"materialization_mode": "prod",
"materialize_after_dump": True,
"dbt_alias": True,
},
),
]
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# -*- coding: utf-8 -*-
"""
Tasks for mundo_transfermarkt_competicoes_internacionais
"""

###############################################################################

import asyncio

import numpy as np
import pandas as pd
from prefect import task

###############################################################################
from pipelines.datasets.mundo_transfermarkt_competicoes_internacionais.constants import (
constants as mundo_constants,
)
from pipelines.datasets.mundo_transfermarkt_competicoes_internacionais.utils import (
execucao_coleta,
)
from pipelines.utils.utils import log, to_partitions


@task
def execucao_coleta_sync():
# Obter o loop de eventos atual e executar a tarefa nele
loop = asyncio.get_event_loop()
df = loop.run_until_complete(execucao_coleta())

return df


@task
def make_partitions(df):
log("Particionando os dados...")
df["temporada"] = df["temporada"].astype(str)
to_partitions(
data=df,
partition_columns=["temporada"],
savepath="/tmp/data/mundo_transfermarkt_competicoes_internacionais/output/",
)
log("Dados particionados com sucesso!")
return "/tmp/data/mundo_transfermarkt_competicoes_internacionais/output/"


@task
def get_max_data(df):
df["data"] = pd.to_datetime(df["data"]).dt.date
max_data = df["data"].max().strftime("%Y-%m-%d")

return max_data
Loading

0 comments on commit 3f1692c

Please sign in to comment.