Skip to content

Commit

Permalink
Merge pull request #477 from basedosdados/staging/mundo_transfermarkt…
Browse files Browse the repository at this point in the history
…_competicoes_copa_brasil

[feat] mundo_transfermarkt_competicoes.copa_brasil
  • Loading branch information
gabrielle-carv authored Sep 22, 2023
2 parents 449cf4c + 8238d9b commit c0357d0
Show file tree
Hide file tree
Showing 5 changed files with 646 additions and 225 deletions.
68 changes: 41 additions & 27 deletions pipelines/datasets/mundo_transfermarkt_competicoes/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,6 @@
"""
Constant values for the datasets projects
"""


###############################################################################
#
# Esse é um arquivo onde podem ser declaratas constantes que serão usadas
# pelo projeto mundo_transfermarkt.
#
# Por ser um arquivo opcional, pode ser removido sem prejuízo ao funcionamento
# do projeto, caos não esteja em uso.
#
# Para declarar constantes, basta fazer conforme o exemplo abaixo:
#
# ```
# class constants(Enum):
# """
# Constant values for the mundo_transfermarkt_competicoes project
# """
# FOO = "bar"
# ```
#
# Para usá-las, basta fazer conforme o exemplo abaixo:
#
# ```py
# from pipelines.datasets.mundo_transfermarkt_competicoes.constants import constants
# print(constants.FOO.value)
# ```
#
###############################################################################

from enum import Enum
Expand Down Expand Up @@ -81,3 +54,44 @@ class constants(Enum): # pylint: disable=c0103
"chutes_fora_man",
"chutes_fora_vis",
]

ORDEM_COPA_BRASIL = [
"ano_campeonato",
"data",
"horario",
"fase",
"tipo_fase",
"estadio",
"arbitro",
"publico",
"publico_max",
"time_man",
"time_vis",
"tecnico_man",
"tecnico_vis",
"valor_equipe_titular_man",
"valor_equipe_titular_vis",
"idade_media_titular_man",
"idade_media_titular_vis",
"gols_man",
"gols_vis",
"gols_1_tempo_man",
"gols_1_tempo_vis",
"penalti",
"gols_penalti_man",
"gols_penalti_vis",
"escanteios_man",
"escanteios_vis",
"faltas_man",
"faltas_vis",
"chutes_bola_parada_man",
"chutes_bola_parada_vis",
"defesas_man",
"defesas_vis",
"impedimentos_man",
"impedimentos_vis",
"chutes_man",
"chutes_vis",
"chutes_fora_man",
"chutes_fora_vis",
]
150 changes: 93 additions & 57 deletions pipelines/datasets/mundo_transfermarkt_competicoes/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,59 +2,6 @@
"""
Flows for mundo_transfermarkt_competicoes
"""

###############################################################################
#
# Aqui é onde devem ser definidos os flows do projeto.
# Cada flow representa uma sequência de passos que serão executados
# em ordem.
#
# Mais informações sobre flows podem ser encontradas na documentação do
# Prefect: https://docs.prefect.io/core/concepts/flows.html
#
# De modo a manter consistência na codebase, todo o código escrito passará
# pelo pylint. Todos os warnings e erros devem ser corrigidos.
#
# Existem diversas maneiras de declarar flows. No entanto, a maneira mais
# conveniente e recomendada pela documentação é usar a API funcional.
# Em essência, isso implica simplesmente na chamada de funções, passando
# os parâmetros necessários para a execução em cada uma delas.
#
# Também, após a definição de um flow, para o adequado funcionamento, é
# mandatório configurar alguns parâmetros dele, os quais são:
# - storage: onde esse flow está armazenado. No caso, o storage é o
# próprio módulo Python que contém o flow. Sendo assim, deve-se
# configurar o storage como o pipelines.datasets
# - run_config: para o caso de execução em cluster Kubernetes, que é
# provavelmente o caso, é necessário configurar o run_config com a
# imagem Docker que será usada para executar o flow. Assim sendo,
# basta usar constants.DOCKER_IMAGE.value, que é automaticamente
# gerado.
# - schedule (opcional): para o caso de execução em intervalos regulares,
# deve-se utilizar algum dos schedules definidos em schedules.py
#
# Um exemplo de flow, considerando todos os pontos acima, é o seguinte:
#
# -----------------------------------------------------------------------------
# from prefect import task
# from prefect import Flow
# from prefect.run_configs import KubernetesRun
# from prefect.storage import GCS
# from pipelines.constants import constants
# from my_tasks import my_task, another_task
# from my_schedules import some_schedule
#
# with Flow("my_flow") as flow:
# a = my_task(param1=1, param2=2)
# b = another_task(a, param3=3)
#
# flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
# flow.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
# flow.schedule = some_schedule
# -----------------------------------------------------------------------------
#
# Abaixo segue um código para exemplificação, que pode ser removido.
#
###############################################################################
from pipelines.datasets.mundo_transfermarkt_competicoes.constants import (
constants as mundo_constants,
Expand All @@ -64,8 +11,14 @@
get_max_data,
execucao_coleta_sync,
)
from pipelines.datasets.mundo_transfermarkt_competicoes.utils import execucao_coleta
from pipelines.datasets.mundo_transfermarkt_competicoes.schedules import every_week
from pipelines.datasets.mundo_transfermarkt_competicoes.utils import (
execucao_coleta,
execucao_coleta_copa,
)
from pipelines.datasets.mundo_transfermarkt_competicoes.schedules import (
every_week,
every_week_copa,
)
from pipelines.utils.tasks import (
create_table_and_upload_to_gcs,
rename_current_flow_run_dataset_table,
Expand Down Expand Up @@ -107,9 +60,9 @@
rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
)
df = execucao_coleta_sync(execucao_coleta)
df = execucao_coleta_sync(table_id)
output_filepath = make_partitions(df, upstream_tasks=[df])
data_maxima = get_max_data()
data_maxima = get_max_data(output_filepath, upstream_tasks=[output_filepath])

wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_filepath,
Expand Down Expand Up @@ -161,10 +114,93 @@
time_unit="weeks",
date_format="yy-mm-dd",
api_mode="prod",
upstream_tasks=[materialization_flow],
)

transfermarkt_brasileirao_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
transfermarkt_brasileirao_flow.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value
)
transfermarkt_brasileirao_flow.schedule = every_week

with Flow(
name="mundo_transfermarkt_competicoes.copa_brasil",
code_owners=[
"Gabs",
],
) as transfermarkt_copa_flow:
dataset_id = Parameter(
"dataset_id", default="mundo_transfermarkt_competicoes", required=True
)
table_id = Parameter("table_id", default="copa_brasil", required=True)
materialization_mode = Parameter(
"materialization_mode", default="dev", required=False
)
materialize_after_dump = Parameter(
"materialize_after_dump", default=True, required=False
)
dbt_alias = Parameter("dbt_alias", default=True, required=False)

rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
)
df = execucao_coleta_sync(table_id)
output_filepath = make_partitions(df, upstream_tasks=[df])
data_maxima = get_max_data(output_filepath, upstream_tasks=[output_filepath])

wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_filepath,
dataset_id=dataset_id,
table_id=table_id,
dump_mode="append",
wait=output_filepath,
)

with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": dataset_id,
"table_id": table_id,
"mode": materialization_mode,
"dbt_alias": dbt_alias,
},
labels=current_flow_labels,
run_name=r"Materialize {dataset_id}.{table_id}",
)

wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)
wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
_last_date=data_maxima,
bq_table_last_year_month=False,
bq_last_update=False,
is_bd_pro=True,
is_free=True,
time_delta=6,
time_unit="months",
date_format="yy-mm-dd",
api_mode="prod",
upstream_tasks=[materialization_flow],
)

transfermarkt_copa_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
transfermarkt_copa_flow.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
transfermarkt_copa_flow.schedule = every_week_copa
86 changes: 21 additions & 65 deletions pipelines/datasets/mundo_transfermarkt_competicoes/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,70 +3,6 @@
Schedules for mundo_transfermarkt_competicoes
"""

###############################################################################
#
# Aqui é onde devem ser definidos os schedules para os flows do projeto.
# Cada schedule indica o intervalo de tempo entre as execuções.
# Um schedule pode ser definido para um ou mais flows.
# Mais informações sobre schedules podem ser encontradas na documentação do
# Prefect: https://docs.prefect.io/core/concepts/schedules.html
#
# De modo a manter consistência na codebase, todo o código escrito passará
# pelo pylint. Todos os warnings e erros devem ser corrigidos.
#
# Os schedules devem ser definidos de acordo com a sintaxe do Prefect, como,
# por exemplo, o seguinte (para executar a cada 1 minuto):
#
# -----------------------------------------------------------------------------
# from datetime import timedelta, datetime
# from prefect.schedules import Schedule
# from prefect.schedules.clocks import IntervalClock
# from pipelines.constants import constants
#
# minute_schedule = Schedule(
# clocks=[
# IntervalClock(
# interval=timedelta(minutes=1),
# start_date=datetime(2021, 1, 1),
# labels=[
# constants.DATASETS_AGENT_LABEL.value,
# ]
# ),
# ]
# )
# -----------------------------------------------------------------------------
#
# Vale notar que o parâmetro `labels` é obrigatório e deve ser uma lista com
# apenas um elemento, correspondendo ao label do agente que será executado.
# O label do agente é definido em `constants.py` e deve ter o formato
# `DATASETS_AGENT_LABEL`.
#
# Outro exemplo, para executar todos os dias à meia noite, segue abaixo:
#
# -----------------------------------------------------------------------------
# from prefect import task
# from datetime import timedelta
# import pendulum
# from prefect.schedules import Schedule
# from prefect.schedules.clocks import IntervalClock
# from pipelines.constants import constants
#
# every_day_at_midnight = Schedule(
# clocks=[
# IntervalClock(
# interval=timedelta(days=1),
# start_date=pendulum.datetime(
# 2021, 1, 1, 0, 0, 0, tz="America/Sao_Paulo"),
# labels=[
# constants.K8S_AGENT_LABEL.value,
# ]
# )
# ]
# )
# -----------------------------------------------------------------------------
#
# Abaixo segue um código para exemplificação, que pode ser removido.
#
###############################################################################

from prefect.schedules.clocks import CronClock
Expand All @@ -88,7 +24,27 @@
"table_id": "brasileirao_serie_a",
"materialization_mode": "prod",
"materialize_after_dump": True,
"dbt_alias": False,
"dbt_alias": True,
},
),
]
)


every_week_copa = Schedule(
clocks=[
CronClock(
cron="0 9 * 2-10 2",
start_date=datetime(2023, 5, 1, 7, 30),
labels=[
constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
],
parameter_defaults={
"dataset_id": "mundo_transfermarkt_competicoes",
"table_id": "copa_brasil",
"materialization_mode": "prod",
"materialize_after_dump": True,
"dbt_alias": True,
},
),
]
Expand Down
Loading

0 comments on commit c0357d0

Please sign in to comment.