diff --git a/pipelines/datasets/__init__.py b/pipelines/datasets/__init__.py
index c689a187e..86e2675e5 100644
--- a/pipelines/datasets/__init__.py
+++ b/pipelines/datasets/__init__.py
@@ -42,6 +42,7 @@ from pipelines.datasets.br_rf_cafir.flows import *
 from pipelines.datasets.br_rj_isp_estatisticas_seguranca.flows import *
 from pipelines.datasets.br_sp_saopaulo_dieese_icv.flows import *
+from pipelines.datasets.br_stf_corte_aberta.flows import *
 from pipelines.datasets.br_tse_eleicoes.flows import *
 from pipelines.datasets.cross_update.flows import *
 from pipelines.datasets.delete_flows.flows import *
diff --git a/pipelines/datasets/br_stf_corte_aberta/__init__.py b/pipelines/datasets/br_stf_corte_aberta/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/pipelines/datasets/br_stf_corte_aberta/constants.py b/pipelines/datasets/br_stf_corte_aberta/constants.py
new file mode 100644
index 000000000..ee6711acd
--- /dev/null
+++ b/pipelines/datasets/br_stf_corte_aberta/constants.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+"""
+Constant values for the datasets projects
+"""
+from enum import Enum
+
+
+class constants(Enum):  # pylint: disable=c0103
+    STF_INPUT = "/tmp/input/"
+    STF_OUTPUT = "/tmp/output/"
+
+    STF_LINK = "https://transparencia.stf.jus.br/extensions/decisoes/decisoes.html"
+
+    RENAME = {
+        "Ano da decisão": "ano",
+        "Classe": "classe",
+        "Número": "numero",
+        "Relator": "relator",
+        "Link": "link",
+        "Subgrupo andamento decisão": "subgrupo_andamento",
+        "Andamento decisão": "andamento",
+        "Observação do andamento": "observacao_andamento_decisao",
+        "Indicador virtual": "modalidade_julgamento",
+        "Indicador Colegiada": "tipo_julgamento",
+        "Indicador eletrônico": "meio_tramitacao",
+        "Indicador de tramitação": "indicador_tramitacao",
+        "Assuntos do processo": "assunto_processo",
+        "Ramo direito": "ramo_direito",
+        "Data de autuação": "data_autuacao",
+        "Data da decisão": "data_decisao",
+        "Data baixa processo": "data_baixa_processo",
+    }
+
+    ORDEM = [
+        "ano",
+        "classe",
+        "numero",
+        "relator",
+        "link",
+        "subgrupo_andamento",
+        "andamento",
+        "observacao_andamento_decisao",
+        "modalidade_julgamento",
+        "tipo_julgamento",
+        "meio_tramitacao",
+        "indicador_tramitacao",
+        "assunto_processo",
+        "ramo_direito",
+        "data_autuacao",
+        "data_decisao",
+        "data_baixa_processo",
+    ]
diff --git a/pipelines/datasets/br_stf_corte_aberta/flows.py b/pipelines/datasets/br_stf_corte_aberta/flows.py
new file mode 100644
index 000000000..60e7cf3c1
--- /dev/null
+++ b/pipelines/datasets/br_stf_corte_aberta/flows.py
@@ -0,0 +1,116 @@
+# -*- coding: utf-8 -*-
+"""
+Flows for br_stf_corte_aberta
+"""
+from datetime import timedelta
+
+from prefect import Parameter, case
+from prefect.run_configs import KubernetesRun
+from prefect.storage import GCS
+from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
+
+from pipelines.constants import constants
+from pipelines.datasets.br_stf_corte_aberta.schedules import every_day_stf
+from pipelines.datasets.br_stf_corte_aberta.tasks import (
+    check_for_updates,
+    download_and_transform,
+    make_partitions,
+)
+from pipelines.datasets.br_stf_corte_aberta.utils import check_for_data
+from pipelines.utils.constants import constants as utils_constants
+from pipelines.utils.decorators import Flow
+from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
+from pipelines.utils.metadata.tasks import update_django_metadata
+from pipelines.utils.tasks import (
+    create_table_and_upload_to_gcs,
+    get_current_flow_labels,
+    log_task,
+    rename_current_flow_run_dataset_table,
+)
+
+with Flow(name="br_stf_corte_aberta.decisoes", code_owners=["trick"]) as br_stf:
+    # Parameters
+    dataset_id = Parameter("dataset_id", default="br_stf_corte_aberta", required=True)
+    table_id = Parameter("table_id", default="decisoes", required=True)
+    materialization_mode = Parameter(
+        "materialization_mode", default="dev", required=False
+    )
+    materialize_after_dump = Parameter(
+        "materialize_after_dump", default=True, required=False
+    )
+    dbt_alias = Parameter("dbt_alias", default=True, required=False)
+    update_metadata = Parameter("update_metadata", default=True, required=False)
+    rename_flow_run = rename_current_flow_run_dataset_table(
+        prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
+    )
+    dados_desatualizados = check_for_updates(
+        dataset_id=dataset_id, table_id=table_id, upstream_tasks=[rename_flow_run]
+    )
+    log_task(f"Checando se os dados estão desatualizados: {dados_desatualizados}")
+    with case(dados_desatualizados, False):
+        log_task(
+            "Dados atualizados, não é necessário fazer o download",
+            upstream_tasks=[dados_desatualizados, rename_flow_run],
+        )
+    with case(dados_desatualizados, True):
+        df = download_and_transform(upstream_tasks=[rename_flow_run])
+        output_path = make_partitions(df=df, upstream_tasks=[df])
+        wait_upload_table = create_table_and_upload_to_gcs(
+            data_path=output_path,
+            dataset_id=dataset_id,
+            table_id=table_id,
+            dump_mode="append",
+            wait=output_path,
+            upstream_tasks=[output_path],
+        )
+        with case(materialize_after_dump, True):
+            # Trigger DBT flow run
+            current_flow_labels = get_current_flow_labels()
+            materialization_flow = create_flow_run(
+                flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
+                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
+                parameters={
+                    "dataset_id": dataset_id,
+                    "table_id": table_id,
+                    "mode": materialization_mode,
+                    "dbt_alias": dbt_alias,
+                },
+                labels=current_flow_labels,
+                run_name=f"Materialize {dataset_id}.{table_id}",
+                upstream_tasks=[wait_upload_table],
+            )
+            wait_for_materialization = wait_for_flow_run(
+                materialization_flow,
+                stream_states=True,
+                stream_logs=True,
+                raise_final_state=True,
+            )
+            wait_for_materialization.max_retries = (
+                dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
+            )
+            wait_for_materialization.retry_delay = timedelta(
+                seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
+            )
+            get_max_date = check_for_data()
+            get_max_date_string = str(get_max_date)
+            with case(update_metadata, True):
+                update_django_metadata(
+                    dataset_id=dataset_id,
+                    table_id=table_id,
+                    metadata_type="DateTimeRange",
+                    bq_last_update=False,
+                    bq_table_last_year_month=False,
+                    # billing_project_id="basedosdados-dev",
+                    api_mode="prod",
+                    date_format="yy-mm-dd",
+                    is_bd_pro=True,
+                    _last_date=get_max_date_string,
+                    is_free=True,
+                    # time_delta = 3,
+                    time_unit="days",
+                    upstream_tasks=[wait_for_materialization],
+                )
+br_stf.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
+br_stf.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
+br_stf.schedule = every_day_stf
diff --git a/pipelines/datasets/br_stf_corte_aberta/schedules.py b/pipelines/datasets/br_stf_corte_aberta/schedules.py
new file mode 100644
index 000000000..a14d57105
--- /dev/null
+++ b/pipelines/datasets/br_stf_corte_aberta/schedules.py
@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+"""
+Schedules for br_stf_corte_aberta
+"""
+
+from datetime import datetime
+
+from prefect.schedules import Schedule
+from prefect.schedules.clocks import CronClock
+
+from pipelines.constants import constants
+
+every_day_stf = Schedule(
+    clocks=[
+        CronClock(
+            cron="0 12 * * *",  # Irá rodar todos os dias ao meio-dia
+            start_date=datetime(2021, 1, 1),
+            labels=[
+                constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
+            ],
+            parameter_defaults={
+                "dataset_id": "br_stf_corte_aberta",
+                "table_id": "decisoes",
+                "materialization_mode": "prod",
+                "materialize_after_dump": True,
+                "dbt_alias": True,
+                "update_metadata": True,
+            },
+        ),
+    ]
+)
diff --git a/pipelines/datasets/br_stf_corte_aberta/tasks.py b/pipelines/datasets/br_stf_corte_aberta/tasks.py
new file mode 100644
index 000000000..fba15990a
--- /dev/null
+++ b/pipelines/datasets/br_stf_corte_aberta/tasks.py
@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+"""
+Tasks for br_stf_corte_aberta
+"""
+from datetime import timedelta
+
+import pandas as pd
+from prefect import task
+
+from pipelines.constants import constants
+from pipelines.datasets.br_stf_corte_aberta.constants import constants as stf_constants
+from pipelines.datasets.br_stf_corte_aberta.utils import (
+    check_for_data,
+    column_bool,
+    extract_last_date,
+    fix_columns_data,
+    partition_data,
+    read_csv,
+    rename_ordening_columns,
+    replace_columns,
+)
+from pipelines.utils.utils import log
+
+
+@task(
+    max_retries=constants.TASK_MAX_RETRIES.value,
+    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
+)
+def check_for_updates(dataset_id, table_id):
+    data_obj = check_for_data()
+    # Obtém a última data no site BD
+    data_bq_obj = extract_last_date(
+        dataset_id=dataset_id,
+        table_id=table_id,
+        date_format="yy-mm-dd",
+        billing_project_id="basedosdados",
+        data="data_decisao",
+    )
+    # Registra a data mais recente do site
+    log(f"Última data no site do STF: {data_obj}")
+    log(f"Última data no site da BD: {data_bq_obj}")
+    # Compara as datas para verificar se há atualizações
+    if data_obj > data_bq_obj:
+        return True  # Há atualizações disponíveis
+    else:
+        return False  # Não há novas atualizações disponíveis
+
+
+@task(
+    max_retries=constants.TASK_MAX_RETRIES.value,
+    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
+)
+# make_partition
+def make_partitions(df):
+    partition_data(df, "data_decisao", stf_constants.STF_OUTPUT.value)
+    return stf_constants.STF_OUTPUT.value
+
+
+@task(
+    max_retries=constants.TASK_MAX_RETRIES.value,
+    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
+)
+def download_and_transform():
+    log("Iniciando a leitura do csv")
+    df = read_csv()
+    log("Iniciando a correção das colunas de data")
+    df = fix_columns_data(df)
+    log("Iniciando a correção da coluna booleana")
+    df = column_bool(df)
+    log("Iniciando a renomeação e ordenação das colunas")
+    df = rename_ordening_columns(df)
+    log("Iniciando a substituição de variáveis")
+    df = replace_columns(df)
+    return df
diff --git a/pipelines/datasets/br_stf_corte_aberta/utils.py b/pipelines/datasets/br_stf_corte_aberta/utils.py
new file mode 100644
index 000000000..e1cfa4f3f
--- /dev/null
+++ b/pipelines/datasets/br_stf_corte_aberta/utils.py
@@ -0,0 +1,207 @@
+# -*- coding: utf-8 -*-
+"""
+General purpose functions for the br_stf_corte_aberta project
+"""
+import os
+import time
+from datetime import datetime
+
+import basedosdados as bd
+import numpy as np
+import pandas as pd
+from selenium import webdriver
+
+from pipelines.datasets.br_stf_corte_aberta.constants import constants as stf_constants
+from pipelines.utils.utils import log
+
+
+def web_scrapping():
+    if not os.path.exists(stf_constants.STF_INPUT.value):
+        os.mkdir(stf_constants.STF_INPUT.value)
+    options = webdriver.ChromeOptions()
+    # https://github.com/SeleniumHQ/selenium/issues/11637
+    prefs = {
+        "download.default_directory": stf_constants.STF_INPUT.value,
+        "download.prompt_for_download": False,
+        "download.directory_upgrade": True,
+        "safebrowsing.enabled": True,
+    }
+    options.add_experimental_option(
+        "prefs",
+        prefs,
+    )
+    options.add_argument("--headless=new")
+    options.add_argument("--no-sandbox")
+    options.add_argument("--disable-gpu")
+    options.add_argument("--disable-dev-shm-usage")
+    options.add_argument("--crash-dumps-dir=/tmp")
+    options.add_argument("--remote-debugging-port=9222")
+    driver = webdriver.Chrome(options=options)
+    time.sleep(30)
+    driver.get(stf_constants.STF_LINK.value)
+    time.sleep(30)
+    driver.maximize_window()
+    time.sleep(30)
+    driver.find_element("xpath", '//*[@id="EXPORT-BUTTON-2"]/button').click()
+    time.sleep(30)
+
+
+def read_csv():
+    arquivos = os.listdir(stf_constants.STF_INPUT.value)
+    log("Verificando dados dentro do container")
+    log(arquivos)
+    for arquivo in arquivos:
+        if arquivo.endswith(".csv"):
+            df = pd.read_csv(stf_constants.STF_INPUT.value + arquivo, dtype=str)
+    return df
+
+
+def fix_columns_data(df):
+    lista = ["Data de autuação", "Data da decisão", "Data baixa processo"]
+    for x in lista:
+        df[x] = df[x].astype(str).str[0:10]
+        df[x] = (
+            df[x].astype(str).str[6:10]
+            + "-"
+            + df[x].astype(str).str[3:5]
+            + "-"
+            + df[x].astype(str).str[0:2]
+        )
+    return df
+
+
+def column_bool(df):
+    df["Indicador de tramitação"] = (
+        df["Indicador de tramitação"].replace("Não", "false").replace("Sim", "true")
+    )
+    return df
+
+
+def rename_ordening_columns(df):
+    df.rename(columns=stf_constants.RENAME.value, inplace=True)
+    df = df[stf_constants.ORDEM.value]
+    return df
+
+
+def replace_columns(df):
+    df["assunto_processo"] = df["assunto_processo"].apply(
+        lambda x: str(x).replace("\r", " ")
+    )
+    df = df.apply(lambda x: x.replace("-", "").replace(np.nan, ""))
+    return df
+
+
+def partition_data(df: pd.DataFrame, column_name: str, output_directory: str):
+    """
+    Particiona os dados em subconjuntos de acordo com os valores únicos de uma coluna.
+    Salva cada subconjunto em um arquivo CSV separado.
+    df: DataFrame a ser particionado
+    column_name: nome da coluna a ser usada para particionar os dados
+    output_directory: diretório onde os arquivos CSV serão salvos
+    """
+    unique_values = df[column_name].unique()
+    for value in unique_values:
+        value_str = str(value)[:10]
+        date_value = datetime.strptime(value_str, "%Y-%m-%d").date()
+        formatted_value = date_value.strftime("%Y-%m-%d")
+        partition_path = os.path.join(
+            output_directory, f"{column_name}={formatted_value}"
+        )
+        if not os.path.exists(partition_path):
+            os.makedirs(partition_path)
+        df_partition = df[df[column_name] == value].copy()
+        df_partition.drop([column_name], axis=1, inplace=True)
+        csv_path = os.path.join(partition_path, "data.csv")
+        # mode = "a" if os.path.exists(csv_path) else "w"
+        df_partition.to_csv(
+            csv_path,
+            sep=",",
+            index=False,
+            encoding="utf-8",
+            na_rep="",
+        )
+
+
+def extract_last_date(
+    dataset_id, table_id, date_format: str, billing_project_id: str, data: str = "data"
+):
+    """
+    Extracts the last update date of a given dataset table.
+    Args:
+        dataset_id (str): The ID of the dataset.
+        table_id (str): The ID of the table.
+        date_format (str): Date format ('yy-mm' or 'yy-mm-dd').
+            If set to 'yy-mm', the function will look for columns named ano and mes in the table_id
+            and return a concatenated string in the format yyyy-mm. If set to 'yy-mm-dd',
+            the function will look for a column named data in the format 'yyyy-mm-dd' and return it.
+    Returns:
+        str: The last update date in the format 'yyyy-mm' or 'yyyy-mm-dd'.
+    Raises:
+        Exception: If an error occurs while extracting the last update date.
+    """
+    if date_format == "yy-mm":
+        try:
+            query_bd = f"""
+            SELECT
+            MAX(CONCAT(ano,"-",mes)) as max_date
+            FROM
+            `{billing_project_id}.{dataset_id}.{table_id}`
+            """
+            t = bd.read_sql(
+                query=query_bd,
+                billing_project_id=billing_project_id,
+                from_file=True,
+            )
+            input_date_str = t["max_date"][0]
+            date_obj = datetime.strptime(input_date_str, "%Y-%m")
+            last_date = date_obj.strftime("%Y-%m")
+            log(f"Última data YYYY-MM: {last_date}")
+            return last_date
+        except Exception as e:
+            log(f"An error occurred while extracting the last update date: {str(e)}")
+            raise
+    else:
+        try:
+            query_bd = f"""
+            SELECT
+            MAX({data}) as max_date
+            FROM
+            `{billing_project_id}.{dataset_id}.{table_id}`
+            """
+            log(f"Query: {query_bd}")
+            t = bd.read_sql(
+                query=query_bd,
+                billing_project_id=billing_project_id,
+                from_file=True,
+            )
+            # it assumes that the data column is already in the basedosdados standard format
+            # yyyy-mm-dd
+            last_date = t["max_date"][0]
+            log(f"Última data YYYY-MM-DD: {last_date}")
+            return last_date
+        except Exception as e:
+            log(f"An error occurred while extracting the last update date: {str(e)}")
+            raise
+
+
+def check_for_data():
+    log("Iniciando web scrapping")
+    web_scrapping()
+    log("Iniciando o check for data")
+    arquivos = os.listdir(stf_constants.STF_INPUT.value)
+    for arquivo in arquivos:
+        if arquivo.endswith(".csv"):
+            df = pd.read_csv(stf_constants.STF_INPUT.value + arquivo, dtype=str)
+
+    df["Data da decisão"] = df["Data da decisão"].astype(str).str[0:10]
+    data_obj = df["Data da decisão"] = (
+        df["Data da decisão"].astype(str).str[6:10]
+        + "-"
+        + df["Data da decisão"].astype(str).str[3:5]
+        + "-"
+        + df["Data da decisão"].astype(str).str[0:2]
+    )
+    data_obj = data_obj.max()
+    data_obj = datetime.strptime(data_obj, "%Y-%m-%d").date()
+
+    return data_obj
diff --git a/pipelines/utils/utils.py b/pipelines/utils/utils.py
index d7a2acce1..9dee7e36c 100644
--- a/pipelines/utils/utils.py
+++ b/pipelines/utils/utils.py
@@ -774,7 +774,7 @@ def extract_last_date(
         SELECT
             MAX({data}) as max_date
         FROM
-            `basedosdados.{dataset_id}.{table_id}`
+            `{billing_project_id}.{dataset_id}.{table_id}`
     """
     log(f"Query: {query_bd}")
    t = bd.read_sql(
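Note for reviewers: the snippet below is a minimal, self-contained sketch of the Hive-style layout that `partition_data` writes (one `data_decisao=YYYY-MM-DD/data.csv` folder per decision date), which is the shape the flow then hands to `create_table_and_upload_to_gcs` via `output_path`. The sample rows and the `/tmp/output_example/` directory are illustrative assumptions only, not part of the pipeline.

```python
# Minimal sketch of the partition layout produced by partition_data.
# Sample rows and /tmp/output_example/ are illustrative assumptions only.
import os

import pandas as pd

df = pd.DataFrame(
    {
        "classe": ["ADI", "HC"],
        "numero": ["1234", "5678"],
        "data_decisao": ["2023-09-01", "2023-09-02"],
    }
)

output_directory = "/tmp/output_example/"
for value in df["data_decisao"].unique():
    # One folder per unique date, named like a Hive partition key.
    partition_path = os.path.join(output_directory, f"data_decisao={value}")
    os.makedirs(partition_path, exist_ok=True)
    # The partition column is dropped from the CSV, as in partition_data.
    df[df["data_decisao"] == value].drop(columns=["data_decisao"]).to_csv(
        os.path.join(partition_path, "data.csv"), index=False, encoding="utf-8"
    )

# Expected layout:
#   /tmp/output_example/data_decisao=2023-09-01/data.csv
#   /tmp/output_example/data_decisao=2023-09-02/data.csv
```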