Merge pull request #481 from basedosdados/staging/br_stf_corte_aberta
[dados] br_stf_corte_aberta
tricktx authored Oct 2, 2023
2 parents d462ac1 + 31f2b43 commit 46d41a4
Showing 8 changed files with 482 additions and 1 deletion.
1 change: 1 addition & 0 deletions pipelines/datasets/__init__.py
@@ -42,6 +42,7 @@
from pipelines.datasets.br_rf_cafir.flows import *
from pipelines.datasets.br_rj_isp_estatisticas_seguranca.flows import *
from pipelines.datasets.br_sp_saopaulo_dieese_icv.flows import *
from pipelines.datasets.br_stf_corte_aberta.flows import *
from pipelines.datasets.br_tse_eleicoes.flows import *
from pipelines.datasets.cross_update.flows import *
from pipelines.datasets.delete_flows.flows import *
Empty file.
52 changes: 52 additions & 0 deletions pipelines/datasets/br_stf_corte_aberta/constants.py
@@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
"""
Constant values for the br_stf_corte_aberta project
"""
from enum import Enum


class constants(Enum): # pylint: disable=c0103
STF_INPUT = "/tmp/input/"
STF_OUTPUT = "/tmp/output/"

STF_LINK = "https://transparencia.stf.jus.br/extensions/decisoes/decisoes.html"

RENAME = {
"Ano da decisão": "ano",
"Classe": "classe",
"Número": "numero",
"Relator": "relator",
"Link": "link",
"Subgrupo andamento decisão": "subgrupo_andamento",
"Andamento decisão": "andamento",
"Observação do andamento": "observacao_andamento_decisao",
"Indicador virtual": "modalidade_julgamento",
"Indicador Colegiada": "tipo_julgamento",
"Indicador eletrônico": "meio_tramitacao",
"Indicador de tramitação": "indicador_tramitacao",
"Assuntos do processo": "assunto_processo",
"Ramo direito": "ramo_direito",
"Data de autuação": "data_autuacao",
"Data da decisão": "data_decisao",
"Data baixa processo": "data_baixa_processo",
}

ORDEM = [
"ano",
"classe",
"numero",
"relator",
"link",
"subgrupo_andamento",
"andamento",
"observacao_andamento_decisao",
"modalidade_julgamento",
"tipo_julgamento",
"meio_tramitacao",
"indicador_tramitacao",
"assunto_processo",
"ramo_direito",
"data_autuacao",
"data_decisao",
"data_baixa_processo",
]
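
As an illustration (not part of the commit), the Enum members above are read through .value; below is a minimal sketch of how the RENAME mapping and ORDEM list could be applied to a pandas DataFrame, roughly what a helper such as rename_ordening_columns in utils.py is presumably doing. The function name normalize_columns is hypothetical.

# Hypothetical usage sketch (not part of this commit): applying RENAME and ORDEM
# the way a helper like rename_ordening_columns presumably does.
import pandas as pd

from pipelines.datasets.br_stf_corte_aberta.constants import constants as stf_constants


def normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    # Translate the original Portuguese headers to the standardized snake_case names
    df = df.rename(columns=stf_constants.RENAME.value)
    # Reorder (and select) the columns according to ORDEM
    return df[stf_constants.ORDEM.value]
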
116 changes: 116 additions & 0 deletions pipelines/datasets/br_stf_corte_aberta/flows.py
@@ -0,0 +1,116 @@
# -*- coding: utf-8 -*-
"""
Flows for br_stf_corte_aberta
"""
from datetime import timedelta

from prefect import Parameter, case
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

from pipelines.constants import constants
from pipelines.datasets.br_stf_corte_aberta.schedules import every_day_stf
from pipelines.datasets.br_stf_corte_aberta.tasks import (
check_for_updates,
download_and_transform,
make_partitions,
)
from pipelines.datasets.br_stf_corte_aberta.utils import check_for_data
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
from pipelines.utils.metadata.tasks import update_django_metadata
from pipelines.utils.tasks import (
create_table_and_upload_to_gcs,
get_current_flow_labels,
log_task,
rename_current_flow_run_dataset_table,
)

with Flow(name="br_stf_corte_aberta.decisoes", code_owners=["trick"]) as br_stf:
# Parameters
dataset_id = Parameter("dataset_id", default="br_stf_corte_aberta", required=True)
table_id = Parameter("table_id", default="decisoes", required=True)
materialization_mode = Parameter(
"materialization_mode", default="dev", required=False
)
materialize_after_dump = Parameter(
"materialize_after_dump", default=True, required=False
)
dbt_alias = Parameter("dbt_alias", default=True, required=False)
update_metadata = Parameter("update_metadata", default=True, required=False)
rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
)
dados_desatualizados = check_for_updates(
dataset_id=dataset_id, table_id=table_id, upstream_tasks=[rename_flow_run]
)
log_task(f"Checando se os dados estão desatualizados: {dados_desatualizados}")
with case(dados_desatualizados, False):
log_task(
"Dados atualizados, não é necessário fazer o download",
upstream_tasks=[dados_desatualizados, rename_flow_run],
)
with case(dados_desatualizados, True):
df = download_and_transform(upstream_tasks=[rename_flow_run])
output_path = make_partitions(df=df, upstream_tasks=[df])
wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_path,
dataset_id=dataset_id,
table_id=table_id,
dump_mode="append",
wait=output_path,
upstream_tasks=[output_path],
)
with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": dataset_id,
"table_id": table_id,
"mode": materialization_mode,
"dbt_alias": dbt_alias,
},
labels=current_flow_labels,
run_name=f"Materialize {dataset_id}.{table_id}",
upstream_tasks=[wait_upload_table],
)
wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)
wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)
get_max_date = check_for_data()
get_max_date_string = str(get_max_date)
with case(update_metadata, True):
update_django_metadata(
dataset_id=dataset_id,
table_id=table_id,
metadata_type="DateTimeRange",
bq_last_update=False,
bq_table_last_year_month=False,
# billing_project_id="basedosdados-dev",
api_mode="prod",
date_format="yy-mm-dd",
is_bd_pro=True,
_last_date=get_max_date_string,
is_free=True,
# time_delta = 3,
time_unit="days",
upstream_tasks=[wait_for_materialization],
)
br_stf.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_stf.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
br_stf.schedule = every_day_stf
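
For reference, a hedged sketch of how this flow could be exercised locally with Prefect 1.x before registration; the parameter values here are illustrative, and production runs use the GCS storage and KubernetesRun config set above.

# Hypothetical local test run (not part of this commit); assumes Prefect 1.x and
# valid GCP credentials for the upload and metadata steps.
from pipelines.datasets.br_stf_corte_aberta.flows import br_stf

if __name__ == "__main__":
    state = br_stf.run(
        run_on_schedule=False,  # ignore the attached daily schedule for a one-off run
        parameters={
            "dataset_id": "br_stf_corte_aberta",
            "table_id": "decisoes",
            "materialization_mode": "dev",
            "materialize_after_dump": False,
            "update_metadata": False,
        },
    )
    print(state)
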
31 changes: 31 additions & 0 deletions pipelines/datasets/br_stf_corte_aberta/schedules.py
@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
"""
Schedules for br_stf_corte_aberta
"""

from datetime import datetime

from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

from pipelines.constants import constants

every_day_stf = Schedule(
clocks=[
CronClock(
cron="0 12 * * *", # Irá rodar todos os dias meio dia
start_date=datetime(2021, 1, 1),
labels=[
constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
],
parameter_defaults={
"dataset_id": "br_stf_corte_aberta",
"table_id": "decisoes",
"materialization_mode": "prod",
"materialize_after_dump": True,
"dbt_alias": True,
"update_metadata": True,
},
),
]
)
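
A quick way to sanity-check the cron expression is to ask the Schedule for its upcoming occurrences; a sketch assuming Prefect 1.x, where Schedule.next returns the next run times.

# Hypothetical check (not part of this commit): "0 12 * * *" should resolve to
# 12:00 every day (presumably UTC, given the naive start_date).
from pipelines.datasets.br_stf_corte_aberta.schedules import every_day_stf

for run_time in every_day_stf.next(3):
    print(run_time)
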
74 changes: 74 additions & 0 deletions pipelines/datasets/br_stf_corte_aberta/tasks.py
@@ -0,0 +1,74 @@
# -*- coding: utf-8 -*-
"""
Tasks for br_stf_corte_aberta
"""
from datetime import timedelta

import pandas as pd
from prefect import task

from pipelines.constants import constants
from pipelines.datasets.br_stf_corte_aberta.constants import constants as stf_constants
from pipelines.datasets.br_stf_corte_aberta.utils import (
check_for_data,
column_bool,
extract_last_date,
fix_columns_data,
partition_data,
read_csv,
rename_ordening_columns,
replace_columns,
)
from pipelines.utils.utils import log


@task(
max_retries=constants.TASK_MAX_RETRIES.value,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def check_for_updates(dataset_id, table_id):
data_obj = check_for_data()
# Get the most recent date already available in BD (basedosdados)
data_bq_obj = extract_last_date(
dataset_id=dataset_id,
table_id=table_id,
date_format="yy-mm-dd",
billing_project_id="basedosdados",
data="data_decisao",
)
# Log the most recent date from each source
log(f"Última data no site do STF: {data_obj}")
log(f"Última data no site da BD: {data_bq_obj}")
# Compare the dates to check whether there are updates
if data_obj > data_bq_obj:
return True  # Updates are available
else:
return False  # No new updates are available


@task(
max_retries=constants.TASK_MAX_RETRIES.value,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def make_partitions(df):
partition_data(df, "data_decisao", stf_constants.STF_OUTPUT.value)
return stf_constants.STF_OUTPUT.value


@task(
max_retries=constants.TASK_MAX_RETRIES.value,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def download_and_transform():
log("Iniciando a leitura do csv")
df = read_csv()
log("Iniciando a correção das colunas de data")
df = fix_columns_data(df)
log("Iniciando a correção da coluna booleana")
df = column_bool(df)
log("Iniciando a renomeação e ordenação das colunas")
df = rename_ordening_columns(df)
log("Iniciando a substituição de variáveis")
df = replace_columns(df)
return df
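
The helpers imported from utils.py (read_csv, partition_data, extract_last_date and friends) live in a file whose diff is not rendered in this view. Purely as a hypothetical sketch of the contract make_partitions appears to rely on, partition_data presumably writes one file per distinct date under Hive-style key=value folders so that create_table_and_upload_to_gcs can pick up the partitioned tree; the real implementation may differ.

# Hypothetical sketch only; utils.py is not shown here and the actual partition
# layout may differ.
import os

import pandas as pd


def partition_data(df: pd.DataFrame, column: str, output_path: str) -> None:
    # Write one CSV per distinct value of `column`, in Hive-style folders.
    for value, chunk in df.groupby(column):
        partition_dir = os.path.join(output_path, f"{column}={value}")
        os.makedirs(partition_dir, exist_ok=True)
        chunk.drop(columns=[column]).to_csv(
            os.path.join(partition_dir, "data.csv"), index=False
        )
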