From fb81631742893894172f386c52cafb0ee75eb86e Mon Sep 17 00:00:00 2001
From: tricktx
Date: Mon, 4 Nov 2024 18:23:56 -0300
Subject: [PATCH 1/9] refactor CGU pipeline: move br_cgu_servidores_executivo_federal
 into the shared CGU crawler and remove the superseded files
---
 .../datasets/br_cgu_cartao_pagamento/flows.py |   6 +-
 .../constants.py                              |  67 ---
 .../flows.py                                  | 543 +++---------
 .../schedules.py                              | 128 ++++-
 .../tasks.py                                  | 182 ------
 .../utils.py                                  | 290 ----------
 pipelines/utils/crawler_cgu/constants.py      |  97 +++-
 pipelines/utils/crawler_cgu/flows.py          | 107 +++-
 pipelines/utils/crawler_cgu/tasks.py          | 100 +++-
 pipelines/utils/crawler_cgu/utils.py          | 333 +++++++++--
 pipelines/utils/utils.py                      |   2 +-
 11 files changed, 769 insertions(+), 1086 deletions(-)
 delete mode 100644 pipelines/datasets/br_cgu_servidores_executivo_federal/constants.py
 delete mode 100644 pipelines/datasets/br_cgu_servidores_executivo_federal/tasks.py
 delete mode 100644 pipelines/datasets/br_cgu_servidores_executivo_federal/utils.py

diff --git a/pipelines/datasets/br_cgu_cartao_pagamento/flows.py b/pipelines/datasets/br_cgu_cartao_pagamento/flows.py
index 4aa94be4d..a715978ad 100644
--- a/pipelines/datasets/br_cgu_cartao_pagamento/flows.py
+++ b/pipelines/datasets/br_cgu_cartao_pagamento/flows.py
@@ -10,21 +10,21 @@
     every_day_microdados_governo_federal
 )
 
-br_cgu_cartao_pagamento__governo_federal = deepcopy(flow_cgu_cartao_pagamento)
+br_cgu_cartao_pagamento__governo_federal = copy(flow_cgu_cartao_pagamento)
 br_cgu_cartao_pagamento__governo_federal.name = "br_cgu_cartao_pagamento.governo_federal"
 br_cgu_cartao_pagamento__governo_federal.code_owners = ["trick"]
 br_cgu_cartao_pagamento__governo_federal.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 br_cgu_cartao_pagamento__governo_federal.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
 br_cgu_cartao_pagamento__governo_federal.schedule = every_day_microdados_governo_federal
 
-br_cgu_cartao_pagamento__defesa_civil = deepcopy(flow_cgu_cartao_pagamento)
+br_cgu_cartao_pagamento__defesa_civil = copy(flow_cgu_cartao_pagamento)
 br_cgu_cartao_pagamento__defesa_civil.name = "br_cgu_cartao_pagamento.defesa_civil"
 br_cgu_cartao_pagamento__defesa_civil.code_owners = ["trick"]
 br_cgu_cartao_pagamento__defesa_civil.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 br_cgu_cartao_pagamento__defesa_civil.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
 br_cgu_cartao_pagamento__defesa_civil.schedule = every_day_microdados_defesa_civil
 
-br_cgu_cartao_pagamento__compras_centralizadas = deepcopy(flow_cgu_cartao_pagamento)
+br_cgu_cartao_pagamento__compras_centralizadas = copy(flow_cgu_cartao_pagamento)
 br_cgu_cartao_pagamento__compras_centralizadas.name = "br_cgu_cartao_pagamento.compras_centralizadas"
 br_cgu_cartao_pagamento__compras_centralizadas.code_owners = ["trick"]
 br_cgu_cartao_pagamento__compras_centralizadas.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
diff --git a/pipelines/datasets/br_cgu_servidores_executivo_federal/constants.py b/pipelines/datasets/br_cgu_servidores_executivo_federal/constants.py
deleted file mode 100644
index 413d8621b..000000000
--- a/pipelines/datasets/br_cgu_servidores_executivo_federal/constants.py
+++ /dev/null
@@ -1,67 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-Constant values for the datasets projects
-"""
-
-from enum import Enum
-
-# fmt: off
-__ALL_SHEETS__ = [
-    "Aposentados_BACEN",
-    "Aposentados_SIAPE",
-    "Militares",
-    "Pensionistas_BACEN",
-    "Pensionistas_DEFESA",
-    "Pensionistas_SIAPE",
-    "Reserva_Reforma_Militares",
-    "Servidores_BACEN",
-    "Servidores_SIAPE",
-]
-# fmt: on
-
-
-class constants(Enum):  # pylint: disable=c0103
-    """
-    Constant values for the br_cgu_servidores_executivo_federal project
-    """
-
-    URL = "http://portaldatransparencia.gov.br/download-de-dados/servidores"
-
-    # fmt: off
-    SHEETS = __ALL_SHEETS__,
-    # fmt: on
-
-    TABLES = {
-        "cadastro_aposentados": ["Aposentados_BACEN", "Aposentados_SIAPE"],
-        "cadastro_pensionistas": [
-            "Pensionistas_SIAPE",
-            "Pensionistas_DEFESA",
-            "Pensionistas_BACEN",
-        ],
-        "cadastro_servidores": ["Servidores_BACEN", "Servidores_SIAPE", "Militares"],
-        "cadastro_reserva_reforma_militares": ["Reserva_Reforma_Militares"],
-        "remuneracao": [
-            "Militares",
-            "Pensionistas_BACEN",
-            "Pensionistas_DEFESA",
-            "Reserva_Reforma_Militares",
-            "Servidores_BACEN",
-            "Servidores_SIAPE",
-        ],
-        "afastamentos": ["Servidores_BACEN", "Servidores_SIAPE"],
-        "observacoes": __ALL_SHEETS__,
-    }
-
-    ARCH = {
-        "afastamentos": "https://docs.google.com/spreadsheets/d/1NQ4t9l8znClnfM8NYBLBkI9PoWV5UAosUZ1KGvZe-T0/edit#gid=0",
-        "cadastro_aposentados": "https://docs.google.com/spreadsheets/d/1_t_JsWbuGlg8cz_2RYYNMuulzA4RHydfJ4TA-wH9Ch8/edit#gid=0",
-        "observacoes": "https://docs.google.com/spreadsheets/d/1BWt6yvKTfNW0XCDNIsIu8NhSKjhbDVJjnEwvnEmVkRc/edit#gid=0",
-        "cadastro_pensionistas": "https://docs.google.com/spreadsheets/d/1G_RPhSUZRrCqcQCP1WSjBiYbnjirqTp0yaLNUaPi_7U/edit#gid=0",
-        "remuneracao": "https://docs.google.com/spreadsheets/d/1LJ8_N53OoNEQQ1PMAeIPKq1asB29MUP5SRoFWnUI6Zg/edit#gid=0",
-        "cadastro_reserva_reforma_militares": "https://docs.google.com/spreadsheets/d/1vqWjATWjHK-6tbj_ilwbhNWReqe3AKd3VTpzBbPu4qI/edit#gid=0",
-        "cadastro_servidores": "https://docs.google.com/spreadsheets/d/1U57P5XhCw9gERD8sN24P0vDK0CjkUuOQZwOoELdE3Jg/edit#gid=0",
-    }
-
-    INPUT = "/tmp/input"
-
-    OUTPUT = "/tmp/output"
diff --git a/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py b/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py
index 0eb11a08d..f88fe58a4 100644
--- a/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py
+++ b/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py
@@ -1,477 +1,78 @@
 # -*- coding: utf-8 -*-
-"""
-Flows for br_cgu_servidores_executivo_federal
-"""
-
-import datetime
-
-from prefect import Parameter, case
+from copy import copy
 from prefect.run_configs import KubernetesRun
 from prefect.storage import GCS
-from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
-
+from pipelines.utils.crawler_cgu.flows import flow_cgu_servidores_publicos
 from pipelines.constants import constants
-from pipelines.datasets.br_cgu_servidores_executivo_federal.constants import (
-    constants as cgu_constants,
+from pipelines.datasets.br_cgu_servidores_executivo_federal.schedules import (
+    every_day_afastamentos,
+    every_day_cadastro_aposentados,
+    every_day_cadastro_pensionistas,
+    every_day_cadastro_reserva_reforma_militares,
+    every_day_cadastro_servidores,
+    every_day_remuneracao,
+    every_day_observacoes,
 )
-from pipelines.datasets.br_cgu_servidores_executivo_federal.schedules import every_day
-from pipelines.datasets.br_cgu_servidores_executivo_federal.tasks import (
-    download_files,
-    get_next_date,
-    is_up_to_date,
-    make_partitions,
-    merge_and_clean_data,
-    table_is_available,
-    scrape_download_page,
-    get_source_max_date,
-)
-from pipelines.utils.constants import constants as utils_constants
-from pipelines.utils.decorators import Flow
-from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
-from pipelines.utils.metadata.tasks 
import update_django_metadata -from pipelines.utils.tasks import ( - create_table_and_upload_to_gcs, - get_current_flow_labels, - rename_current_flow_run_dataset_table, -) -from pipelines.utils.utils import log_task - -with Flow( - name="br_cgu_servidores_executivo_federal", - code_owners=[ - "aspeddro", - ], -) as datasets_br_cgu_servidores_executivo_federal_flow: - dataset_id = Parameter( - "dataset_id", default="br_cgu_servidores_executivo_federal", required=False - ) - - tables_ids = list(cgu_constants.TABLES.value.keys()) - - table_id = Parameter("table_id", default=tables_ids, required=False) - - update_metadata = Parameter("update_metadata", default=False, required=False) - materialization_mode = Parameter( - "materialization_mode", default="prod", required=False - ) - materialize_after_dump = Parameter( - "materialize_after_dump", default=False, required=False - ) - dbt_alias = Parameter("dbt_alias", default=True, required=False) - - rename_flow_run = rename_current_flow_run_dataset_table( - prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id - ) - - files_and_dates_dataframe = scrape_download_page() - source_max_date = get_source_max_date(files_and_dates_dataframe, upstream_tasks=[files_and_dates_dataframe]) - next_date = get_next_date(upstream_tasks=[files_and_dates_dataframe, source_max_date]) - data_is_up_to_date = is_up_to_date(next_date, upstream_tasks=[next_date]) - - with case(data_is_up_to_date, True): - log_task("Tabelas estão atualizadas") - - with case(data_is_up_to_date, False): - log_task(f"Starting download, {next_date}, {next_date}") - sheets_info = download_files( - date_start=next_date, date_end=source_max_date, upstream_tasks=[next_date, source_max_date] - ) - log_task("Files downloaded") - - data_clean_by_table = merge_and_clean_data( - sheets_info, upstream_tasks=[sheets_info] - ) - log_task("Data clean finished") - - outputs_path_by_table = make_partitions( - data_clean_by_table, upstream_tasks=[data_clean_by_table] - ) - log_task("Partitions done") - - with case( - table_is_available(outputs_path_by_table, "cadastro_aposentados"), True - ): - wait_upload_table_aposentados_cadastro = create_table_and_upload_to_gcs( - data_path=outputs_path_by_table["cadastro_aposentados"], - dataset_id=dataset_id, - table_id="cadastro_aposentados", - dump_mode="append", - wait=outputs_path_by_table, - ) - - with case( - table_is_available(outputs_path_by_table, "cadastro_pensionistas"), True - ): - wait_upload_table_pensionistas_cadastro = create_table_and_upload_to_gcs( - data_path=outputs_path_by_table["cadastro_pensionistas"], - dataset_id=dataset_id, - table_id="cadastro_pensionistas", - dump_mode="append", - wait=outputs_path_by_table, - ) - - with case( - table_is_available(outputs_path_by_table, "cadastro_servidores"), True - ): - wait_upload_table_servidores_cadastro = create_table_and_upload_to_gcs( - data_path=outputs_path_by_table["cadastro_servidores"], - dataset_id=dataset_id, - table_id="cadastro_servidores", - dump_mode="append", - wait=outputs_path_by_table, - ) - - with case( - table_is_available( - outputs_path_by_table, "cadastro_reserva_reforma_militares" - ), - True, - ): - wait_upload_table_reserva_reforma_militares_cadastro = ( - create_table_and_upload_to_gcs( - data_path=outputs_path_by_table[ - "cadastro_reserva_reforma_militares" - ], - dataset_id=dataset_id, - table_id="cadastro_reserva_reforma_militares", - dump_mode="append", - wait=outputs_path_by_table, - ) - ) - - with case(table_is_available(outputs_path_by_table, 
"remuneracao"), True): - wait_upload_table_remuneracao = create_table_and_upload_to_gcs( - data_path=outputs_path_by_table["remuneracao"], - dataset_id=dataset_id, - table_id="remuneracao", - dump_mode="append", - wait=outputs_path_by_table, - ) - - with case(table_is_available(outputs_path_by_table, "afastamentos"), True): - wait_upload_table_afastamentos = create_table_and_upload_to_gcs( - data_path=outputs_path_by_table["afastamentos"], - dataset_id=dataset_id, - table_id="afastamentos", - dump_mode="append", - wait=outputs_path_by_table, - ) - - with case(table_is_available(outputs_path_by_table, "observacoes"), True): - wait_upload_table_observacoes = create_table_and_upload_to_gcs( - data_path=outputs_path_by_table["observacoes"], - dataset_id=dataset_id, - table_id="observacoes", - dump_mode="append", - wait=outputs_path_by_table, - ) - - # cadastro_aposentados - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": "cadastro_aposentados", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=r"Materialize {dataset_id}.cadastro_aposentados", - ) - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = datetime.timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - - with case(update_metadata, True): - update_django_metadata( - dataset_id=dataset_id, - table_id="cadastro_aposentados", - date_column_name={"year": "ano", "month": "mes"}, - date_format="%Y-%m", - coverage_type="part_bdpro", - time_delta={"months": 6}, - prefect_mode=materialization_mode, - bq_project="basedosdados", - upstream_tasks=[wait_for_materialization], - ) - - # cadastro_pensionistas - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": "cadastro_pensionistas", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=r"Materialize {dataset_id}.cadastro_pensionistas", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = datetime.timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - - with case(update_metadata, True): - update_django_metadata( - dataset_id=dataset_id, - table_id="cadastro_pensionistas", - date_column_name={"year": "ano", "month": "mes"}, - date_format="%Y-%m", - coverage_type="part_bdpro", - time_delta={"months": 6}, - prefect_mode=materialization_mode, - bq_project="basedosdados", - upstream_tasks=[wait_for_materialization], - ) - - # cadastro_servidores - with case(materialize_after_dump, True): 
- # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": "cadastro_servidores", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=r"Materialize {dataset_id}.cadastro_servidores", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = datetime.timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - - with case(update_metadata, True): - update_django_metadata( - dataset_id=dataset_id, - table_id="cadastro_servidores", - date_column_name={"year": "ano", "month": "mes"}, - date_format="%Y-%m", - coverage_type="part_bdpro", - time_delta={"months": 6}, - prefect_mode=materialization_mode, - bq_project="basedosdados", - upstream_tasks=[wait_for_materialization], - ) - - # cadastro_reserva_reforma_militares - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": "cadastro_reserva_reforma_militares", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=r"Materialize {dataset_id}.cadastro_reserva_reforma_militares", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = datetime.timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - - with case(update_metadata, True): - update_django_metadata( - dataset_id=dataset_id, - table_id="cadastro_reserva_reforma_militares", - date_column_name={"year": "ano", "month": "mes"}, - date_format="%Y-%m", - coverage_type="part_bdpro", - time_delta={"months": 6}, - prefect_mode=materialization_mode, - bq_project="basedosdados", - upstream_tasks=[wait_for_materialization], - ) - - # remuneracao - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": "remuneracao", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=r"Materialize {dataset_id}.remuneracao", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = datetime.timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - - with 
case(update_metadata, True): - update_django_metadata( - dataset_id=dataset_id, - table_id="remuneracao", - date_column_name={"year": "ano", "month": "mes"}, - date_format="%Y-%m", - coverage_type="part_bdpro", - time_delta={"months": 6}, - prefect_mode=materialization_mode, - bq_project="basedosdados", - upstream_tasks=[wait_for_materialization], - ) - - # afastamentos - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": "afastamentos", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=r"Materialize {dataset_id}.afastamentos", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = datetime.timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - - with case(update_metadata, True): - update_django_metadata( - dataset_id=dataset_id, - table_id="afastamentos", - date_column_name={"year": "ano", "month": "mes"}, - date_format="%Y-%m", - coverage_type="part_bdpro", - time_delta={"months": 6}, - prefect_mode=materialization_mode, - bq_project="basedosdados", - upstream_tasks=[wait_for_materialization], - ) - - # observacoes - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": "observacoes", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=r"Materialize {dataset_id}.observacoes", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = datetime.timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - - with case(update_metadata, True): - update_django_metadata( - dataset_id=dataset_id, - table_id="observacoes", - date_column_name={"year": "ano", "month": "mes"}, - date_format="%Y-%m", - coverage_type="part_bdpro", - time_delta={"months": 6}, - prefect_mode=materialization_mode, - bq_project="basedosdados", - upstream_tasks=[wait_for_materialization], - ) - - -datasets_br_cgu_servidores_executivo_federal_flow.storage = GCS( - constants.GCS_FLOWS_BUCKET.value -) -datasets_br_cgu_servidores_executivo_federal_flow.run_config = KubernetesRun( - image=constants.DOCKER_IMAGE.value -) -datasets_br_cgu_servidores_executivo_federal_flow.schedule = every_day +# ! 
br_cgu_servidores_federal__afastamentos + +br_cgu_servidores_federal__afastamentos = copy(flow_cgu_servidores_publicos) +br_cgu_servidores_federal__afastamentos.name = ("br_cgu_servidores_executivo_federal.afastamentos") +br_cgu_servidores_federal__afastamentos.code_owners = ["trick"] +br_cgu_servidores_federal__afastamentos.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_servidores_federal__afastamentos.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_servidores_federal__afastamentos.schedule = every_day_afastamentos + +# ! br_cgu_servidores_federal__cadastro_aposentados + +br_cgu_servidores_federal__cadastro_aposentados = copy(flow_cgu_servidores_publicos) +br_cgu_servidores_federal__cadastro_aposentados.name = ("br_cgu_servidores_executivo_federal.cadastro_aposentados") +br_cgu_servidores_federal__cadastro_aposentados.code_owners = ["trick"] +br_cgu_servidores_federal__cadastro_aposentados.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_servidores_federal__cadastro_aposentados.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_servidores_federal__cadastro_aposentados.schedule = every_day_cadastro_aposentados + +# ! br_cgu_servidores_federal__cadastro_pensionistas + +br_cgu_servidores_federal__cadastro_pensionistas = copy(flow_cgu_servidores_publicos) +br_cgu_servidores_federal__cadastro_pensionistas.name = ("br_cgu_servidores_executivo_federal.cadastro_pensionistas") +br_cgu_servidores_federal__cadastro_pensionistas.code_owners = ["trick"] +br_cgu_servidores_federal__cadastro_pensionistas.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_servidores_federal__cadastro_pensionistas.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_servidores_federal__cadastro_pensionistas.schedule = every_day_cadastro_pensionistas + +# ! br_cgu_servidores_federal__cadastro_reserva_reforma_militares + +br_cgu_servidores_federal__cadastro_reserva_reforma_militares = copy(flow_cgu_servidores_publicos) +br_cgu_servidores_federal__cadastro_reserva_reforma_militares.name = ("br_cgu_servidores_executivo_federal.cadastro_reserva_reforma_militares") +br_cgu_servidores_federal__cadastro_reserva_reforma_militares.code_owners = ["trick"] +br_cgu_servidores_federal__cadastro_reserva_reforma_militares.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_servidores_federal__cadastro_reserva_reforma_militares.run_config = (KubernetesRun(image=constants.DOCKER_IMAGE.value)) +br_cgu_servidores_federal__cadastro_reserva_reforma_militares.schedule = (every_day_cadastro_reserva_reforma_militares) + +# ! br_cgu_servidores_federal__cadastro_servidores + +br_cgu_servidores_federal__cadastro_servidores = copy(flow_cgu_servidores_publicos) +br_cgu_servidores_federal__cadastro_servidores.name = ("br_cgu_servidores_executivo_federal.cadastro_servidores") +br_cgu_servidores_federal__cadastro_servidores.code_owners = ["trick"] +br_cgu_servidores_federal__cadastro_servidores.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_servidores_federal__cadastro_servidores.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_servidores_federal__cadastro_servidores.schedule = every_day_cadastro_servidores + +# ! 
br_cgu_servidores_federal__observacoes
+
+br_cgu_servidores_federal__observacoes = copy(flow_cgu_servidores_publicos)
+br_cgu_servidores_federal__observacoes.name = ("br_cgu_servidores_executivo_federal.observacoes")
+br_cgu_servidores_federal__observacoes.code_owners = ["trick"]
+br_cgu_servidores_federal__observacoes.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
+br_cgu_servidores_federal__observacoes.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
+br_cgu_servidores_federal__observacoes.schedule = every_day_observacoes
+
+# ! br_cgu_servidores_federal__remuneracao
+
+br_cgu_servidores_federal__remuneracao = copy(flow_cgu_servidores_publicos)
+br_cgu_servidores_federal__remuneracao.name = ("br_cgu_servidores_executivo_federal.remuneracao")
+br_cgu_servidores_federal__remuneracao.code_owners = ["trick"]
+br_cgu_servidores_federal__remuneracao.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
+br_cgu_servidores_federal__remuneracao.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
+br_cgu_servidores_federal__remuneracao.schedule = every_day_remuneracao
diff --git a/pipelines/datasets/br_cgu_servidores_executivo_federal/schedules.py b/pipelines/datasets/br_cgu_servidores_executivo_federal/schedules.py
index 9517c6c18..92923ab5e 100644
--- a/pipelines/datasets/br_cgu_servidores_executivo_federal/schedules.py
+++ b/pipelines/datasets/br_cgu_servidores_executivo_federal/schedules.py
@@ -13,17 +13,17 @@
     constants as cgu_constants,
 )
 
-every_day = Schedule(
+every_day_cadastro_aposentados = Schedule(
     clocks=[
         CronClock(
-            cron="0 6 * * 0-5",  # At 06:00 on every day-of-week from Sunday through Friday.
+            cron="0 6 * * *",  # At 06:00 every day.
            start_date=datetime(2023, 9, 26),
             labels=[
                 constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
             ],
             parameter_defaults={
                 "dataset_id": "br_cgu_servidores_executivo_federal",
-                "table_id": list(cgu_constants.TABLES.value.keys()),
+                "table_id": "cadastro_aposentados",
                 "materialization_mode": "prod",
                 "materialize_after_dump": True,
                 "dbt_alias": True,
@@ -32,3 +32,125 @@
         ),
     ]
 )
+
+every_day_cadastro_pensionistas = Schedule(
+    clocks=[
+        CronClock(
+            cron="15 6 * * *",  # At 06:15 every day.
+            start_date=datetime(2023, 9, 26),
+            labels=[
+                constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
+            ],
+            parameter_defaults={
+                "dataset_id": "br_cgu_servidores_executivo_federal",
+                "table_id": "cadastro_pensionistas",
+                "materialization_mode": "prod",
+                "materialize_after_dump": True,
+                "dbt_alias": True,
+                "update_metadata": True,
+            },
+        ),
+    ]
+)
+
+every_day_cadastro_servidores = Schedule(
+    clocks=[
+        CronClock(
+            cron="30 6 * * *",  # At 06:30 every day.
+            start_date=datetime(2023, 9, 26),
+            labels=[
+                constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
+            ],
+            parameter_defaults={
+                "dataset_id": "br_cgu_servidores_executivo_federal",
+                "table_id": "cadastro_servidores",
+                "materialization_mode": "prod",
+                "materialize_after_dump": True,
+                "dbt_alias": True,
+                "update_metadata": True,
+            },
+        ),
+    ]
+)
+
+
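+# Note: the schedules in this module fire at staggered 15-minute offsets
+# (06:00 through 07:30), presumably so the seven daily runs of the shared
+# crawler flow do not compete for the same agent; each cron comment states
+# the actual fire time.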
+every_day_cadastro_reserva_reforma_militares = Schedule(
+    clocks=[
+        CronClock(
+            cron="45 6 * * *",  # At 06:45 every day.
+            start_date=datetime(2023, 9, 26),
+            labels=[
+                constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
+            ],
+            parameter_defaults={
+                "dataset_id": "br_cgu_servidores_executivo_federal",
+                "table_id": "cadastro_reserva_reforma_militares",
+                "materialization_mode": "prod",
+                "materialize_after_dump": True,
+                "dbt_alias": True,
+                "update_metadata": True,
+            },
+        ),
+    ]
+)
+
+
+every_day_remuneracao = Schedule(
+    clocks=[
+        CronClock(
+            cron="0 7 * * *",  # At 07:00 every day.
+            start_date=datetime(2023, 9, 26),
+            labels=[
+                constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
+            ],
+            parameter_defaults={
+                "dataset_id": "br_cgu_servidores_executivo_federal",
+                "table_id": "remuneracao",
+                "materialization_mode": "prod",
+                "materialize_after_dump": True,
+                "dbt_alias": True,
+                "update_metadata": True,
+            },
+        ),
+    ]
+)
+
+every_day_afastamentos = Schedule(
+    clocks=[
+        CronClock(
+            cron="15 7 * * *",  # At 07:15 every day.
+            start_date=datetime(2023, 9, 26),
+            labels=[
+                constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
+            ],
+            parameter_defaults={
+                "dataset_id": "br_cgu_servidores_executivo_federal",
+                "table_id": "afastamentos",
+                "materialization_mode": "prod",
+                "materialize_after_dump": True,
+                "dbt_alias": True,
+                "update_metadata": True,
+            },
+        ),
+    ]
+)
+
+every_day_observacoes = Schedule(
+    clocks=[
+        CronClock(
+            cron="30 7 * * *",  # At 07:30 every day.
+            start_date=datetime(2023, 9, 26),
+            labels=[
+                constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
+            ],
+            parameter_defaults={
+                "dataset_id": "br_cgu_servidores_executivo_federal",
+                "table_id": "observacoes",
+                "materialization_mode": "prod",
+                "materialize_after_dump": True,
+                "dbt_alias": True,
+                "update_metadata": True,
+            },
+        ),
+    ]
+)
\ No newline at end of file
diff --git a/pipelines/datasets/br_cgu_servidores_executivo_federal/tasks.py b/pipelines/datasets/br_cgu_servidores_executivo_federal/tasks.py
deleted file mode 100644
index 8d1a389ae..000000000
--- a/pipelines/datasets/br_cgu_servidores_executivo_federal/tasks.py
+++ /dev/null
@@ -1,182 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-Tasks for br_cgu_servidores_executivo_federal
-"""
-
-import datetime
-import os
-import basedosdados as bd
-
-import pandas as pd
-import requests
-from dateutil.relativedelta import relativedelta
-from prefect import task
-
-from pipelines.datasets.br_cgu_servidores_executivo_federal.constants import constants
-from pipelines.datasets.br_cgu_servidores_executivo_federal.utils import (
-    build_urls,
-    download_zip_files_for_sheet,
-    make_url,
-    process_table,
-    extract_dates
-)
-from pipelines.utils.metadata.utils import get_api_most_recent_date, get_url
-from pipelines.utils.utils import log, to_partitions
-
-@task
-def scrape_download_page():
-    dates = extract_dates()
-    return dates
-
-@task
-def get_source_max_date(files_df) -> list:
-    """
-    Encontra a data mais recente em um DataFrame e retorna a data e a URL correspondente.
-
-    Parâmetros:
-    - table_id (str): O identificador da tabela que contém os dados.
-
-    Retorna:
-    Uma lista contendo a data mais recente e a URL correspondente.
- - """ - files_df["data"] = pd.to_datetime( - files_df["ano"].astype(str) + "-" + files_df["mes_numero"].astype(str) - ) - max_date = files_df["data"].max() - return max_date - -@task # noqa -def download_files(date_start: datetime.date, date_end: datetime.date): - date_range: list[datetime.date] = pd.date_range( - date_start, date_end, freq="MS" - ).to_list() - - dates_before_2020 = [date for date in date_range if date.year < 2020] - dates_pos_2019 = [date for date in date_range if date.year > 2019] - - urls_for_sheets_before_2020 = { - sheet: build_urls(sheet, dates_before_2020) - for sheet in ["Militares", "Servidores_BACEN", "Servidores_SIAPE"] - } - - urls_for_sheets_after_2019 = { - sheet: build_urls(sheet, dates_pos_2019) for sheet in constants.SHEETS.value[0] - } - - for key in urls_for_sheets_after_2019.keys(): - if key in urls_for_sheets_before_2020: - urls_for_sheets_after_2019[key].extend(urls_for_sheets_before_2020[key]) - - valid_sheets = { - sheet: payload - for (sheet, payload) in urls_for_sheets_after_2019.items() - if len(payload) > 0 - } - - log(f"{valid_sheets=}") - - if not os.path.exists(constants.INPUT.value): - os.mkdir(constants.INPUT.value) - - for sheet_name in valid_sheets: - download_zip_files_for_sheet(sheet_name, valid_sheets[sheet_name]) - - return valid_sheets - - -@task -def merge_and_clean_data(sheets_info): - def get_sheets_by_sources(table_name: str, sources: list[str]): - dates = [ - list(map(lambda i: i["date"], info)) - for (name, info) in sheets_info.items() - if name in sources - ] - return {"table_name": table_name, "sources": sources, "dates": dates[0]} - - tables = set( - [ - table_name - for table_name, sources in constants.TABLES.value.items() - for source in sources - if source in list(sheets_info.keys()) - ] - ) - - table_and_source = [ - get_sheets_by_sources(table, constants.TABLES.value[table]) for table in tables - ] - - log(f"{table_and_source=}") - - return [process_table(table_info) for table_info in table_and_source] - - -@task -def make_partitions(tables: list[tuple[str, pd.DataFrame]]) -> dict[str, str]: - output = constants.OUTPUT.value - - if not os.path.exists(output): - os.mkdir(output) - - for table_name, df in tables: - if len(df) > 0: - savepath = f"{output}/{table_name}" - - if not os.path.exists(savepath): - os.mkdir(savepath) - - - to_partitions( - data=df, - partition_columns=["ano", "mes"], - savepath=savepath, - ) - else: - log(f"{table_name=} is empty") - - return { - table_name: f"{output}/{table_name}" for table_name, df in tables if len(df) > 0 - } - - -@task -def table_is_available(tables: dict[str, str], table: str) -> bool: - available = table in tables - - log(f"{table=} in {tables.keys()=}: {available}") - - return available - - -@task -def is_up_to_date(next_date: datetime.date) -> bool: - url = make_url("Servidores_SIAPE", next_date) - - session = requests.Session() - session.mount("https://", requests.adapters.HTTPAdapter(max_retries=3)) # type: ignore - - headers = { - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.93 Safari/537.36", - } - - response = requests.get(url, timeout=1000, stream=True, headers=headers) - - log(f"Response: {response=}") - - return response.status_code != 200 - - -@task -def get_next_date() -> datetime.date: - backend = bd.Backend(graphql_url=get_url("prod")) - last_date_in_api = get_api_most_recent_date( - dataset_id="br_cgu_servidores_executivo_federal", - table_id="cadastro_servidores", - date_format="%Y-%m", - 
backend=backend, - ) - - next_date = last_date_in_api + relativedelta(months=1) - return next_date diff --git a/pipelines/datasets/br_cgu_servidores_executivo_federal/utils.py b/pipelines/datasets/br_cgu_servidores_executivo_federal/utils.py deleted file mode 100644 index c234c7383..000000000 --- a/pipelines/datasets/br_cgu_servidores_executivo_federal/utils.py +++ /dev/null @@ -1,290 +0,0 @@ -# -*- coding: utf-8 -*- -""" -General purpose functions for the br_cgu_servidores_executivo_federal project -""" - -import datetime -import io -import os -import zipfile - -import pandas as pd -import requests - -from pipelines.datasets.br_cgu_servidores_executivo_federal.constants import constants -from pipelines.utils.apply_architecture_to_dataframe.utils import ( - read_architecture_table, - rename_columns, -) -from pipelines.utils.utils import log - - -from bs4 import BeautifulSoup -from selenium import webdriver -from selenium.webdriver.chrome.service import Service as ChromeService -from selenium.webdriver.common.by import By -from selenium.webdriver.support.ui import Select -from tqdm import tqdm -from webdriver_manager.chrome import ChromeDriverManager - -def make_url(sheet_name: str, date: datetime.date) -> str: - if date.year <= 2019 and sheet_name not in [ - "Militares", - "Servidores_BACEN", - "Servidores_SIAPE", - ]: - raise ValueError(f"Invalid {sheet_name} for {date.year}, {date.month}") - - month = f"0{date.month}" if date.month < 10 else date.month - file = f"{date.year}{month}_{sheet_name}" - - return f"{constants.URL.value}/{file}" - - -def build_urls(sheet_name: str, dates: list[datetime.date]): - return [{"url": make_url(sheet_name, date), "date": date} for date in dates] - - -def download_zip_files_for_sheet(sheet_name: str, sheet_urls: list): - sheet_input_folder = f"{constants.INPUT.value}/{sheet_name}" - - if not os.path.exists(sheet_input_folder): - os.mkdir(sheet_input_folder) - - session = requests.Session() - session.mount("https://", requests.adapters.HTTPAdapter(max_retries=3)) # type: ignore - - headers = { - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.93 Safari/537.36", - } - - log(f"Starting download for {sheet_name=}") - - for sheet_info in sheet_urls: - - href = sheet_info["url"] - date = sheet_info["date"] - year = date.year - month = date.month - try: - response = requests.get(href, timeout=1000, stream=True, headers=headers) - z = zipfile.ZipFile(io.BytesIO(response.content)) - z.extractall(f"{sheet_input_folder}/{year}-{month}") - except: - log(f'Essa url ainda não está disponível --> {href}') - - log(f"Finished download for {sheet_name=}") - - -def get_csv_file_by_table_name_and_date(table_name: str, date: datetime.date) -> str: - if table_name in [ - "cadastro_aposentados", - "cadastro_pensionistas", - "cadastro_servidores", - "cadastro_reserva_reforma_militares", - ]: - pattern = "Cadastro" - elif table_name == "remuneracao": - pattern = "Remuneracao" - elif table_name == "observacoes": - pattern = "Observacoes" - elif table_name == "afastamentos": - pattern = "Afastamentos" - else: - raise ValueError(f"Not found file pattern for {table_name=}, {date=}") - - month = f"0{date.month}" if date.month < 10 else date.month - - path = f"{date.year}{month}_{pattern}.csv" - - return path - - -def get_source(table_name: str, source: str) -> str: - ORIGINS = { - "cadastro_aposentados": { - "Aposentados_BACEN": "BACEN", - "Aposentados_SIAPE": "SIAPE", - }, - "cadastro_pensionistas": { - "Pensionistas_SIAPE": 
"SIAPE", - "Pensionistas_DEFESA": "Defesa", - "Pensionistas_BACEN": "BACEN", - }, - "cadastro_servidores": { - "Servidores_BACEN": "BACEN", - "Servidores_SIAPE": "SIAPE", - "Militares": "Militares", - }, - "cadastro_reserva_reforma_militares": { - "Reserva_Reforma_Militares": "Reserva Reforma Militares" - }, - "remuneracao": { - "Militares": "Militares", - "Pensionistas_BACEN": "Pensionistas BACEN", - "Pensionistas_DEFESA": "Pensionistas DEFESA", - "Reserva_Reforma_Militares": "Reserva Reforma Militares", - "Servidores_BACEN": "Servidores BACEN", - "Servidores_SIAPE": "Servidores SIAPE", - }, - "afastamentos": {"Servidores_BACEN": "BACEN", "Servidores_SIAPE": "SIAPE"}, - "observacoes": { - "Aposentados_BACEN": "Aposentados BACEN", - "Aposentados_SIAPE": "Aposentados SIAPE", - "Militares": "Militares", - "Pensionistas_BACEN": "Pensionistas BACEN", - "Pensionistas_DEFESA": "Pensionistas DEFESA", - "Pensionistas_SIAPE": "Pensionistas SIAPE", - "Reserva_Reforma_Militares": "Reserva Reforma Militares", - "Servidores_BACEN": "Servidores BACEN", - "Servidores_SIAPE": "Servidores SIAPE", - }, - } - - return ORIGINS[table_name][source] - - -def read_and_clean_csv( - table_name: str, source: str, date: datetime.date -) -> pd.DataFrame: - csv_path = get_csv_file_by_table_name_and_date(table_name, date) - - path = f"{constants.INPUT.value}/{source}/{date.year}-{date.month}/{csv_path}" - - log(f"Reading {table_name=}, {source=}, {date=} {path=}") - - if not os.path.exists(path): - log(f"File {path=} dont exists") - return pd.DataFrame() - - df = pd.read_csv( - path, - sep=";", - encoding="latin-1", - ).rename( - columns=lambda col: col.replace("\x96 ", "") - ) # some csv files contains \x96 in header lines - - url_architecture = constants.ARCH.value[table_name] - - df_architecture = read_architecture_table(url_architecture) - - df = rename_columns(df, df_architecture) - - df["ano"] = date.year - df["mes"] = date.month - - if "origem" in df_architecture["name"].to_list(): - df["origem"] = get_source(table_name, source) - - return df - - -def process_table(table_info: dict) -> tuple[str, pd.DataFrame]: - table_name: str = table_info["table_name"] - sources: list[str] = table_info["sources"] - dates: list[datetime.date] = table_info["dates"] - - def read_csv_by_source(source: str): - dfs = [read_and_clean_csv(table_name, source, date) for date in dates] - - return pd.concat(dfs) - - log(f"Processing {table_name=}, {sources=}") - - return ( - table_name, - pd.concat([read_csv_by_source(source) for source in sources]), - ) - - - - -def extract_dates(table ="Servidores_SIAPE") -> pd.DataFrame: - """ - Extrai datas e URLs de download do site do Portal da Transparência e retorna um DataFrame. 
- """ - if not os.path.exists("/tmp/data/br_cgu_servidores_executivo_federal/tmp"): - os.makedirs("/tmp/data/br_cgu_servidores_executivo_federal/tmp", exist_ok=True) - options = webdriver.ChromeOptions() - prefs = { - "download.default_directory": "/tmp/data/br_cgu_servidores_executivo_federal/tmp", - "download.prompt_for_download": False, - "download.directory_upgrade": True, - "safebrowsing.enabled": True, - } - options.add_experimental_option( - "prefs", - prefs, - ) - - options.add_argument("--headless") - options.add_argument("--test-type") - options.add_argument("--disable-gpu") - options.add_argument("--no-first-run") - options.add_argument("--no-sandbox") - options.add_argument("--disable-dev-shm-usage") - options.add_argument("--no-default-browser-check") - options.add_argument("--ignore-certificate-errors") - options.add_argument("--start-maximized") - options.add_argument( - "user-agent=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36" - ) - - driver = webdriver.Chrome( - service=ChromeService(ChromeDriverManager().install()), options=options - ) - - if table == "Servidores_SIAPE": - driver.get("http://portaldatransparencia.gov.br/download-de-dados/servidores/") - driver.implicitly_wait(10) - - page_source = driver.page_source - BeautifulSoup(page_source, "html.parser") - - select_anos = Select(driver.find_element(By.ID, "links-anos")) - anos_meses = {} - - # Iterar sobre as opções dentro do select de anos - for option_ano in select_anos.options: - valor_ano = option_ano.get_attribute("value") - ano = option_ano.text - - select_anos.select_by_value(valor_ano) - - driver.implicitly_wait(5) - - select_meses = Select(driver.find_element(By.ID, "links-meses")) - - meses_dict = {} - - # Iterar sobre as opções dentro do select de meses - for option_mes in select_meses.options: - valor_mes = option_mes.get_attribute("value") - nome_mes = option_mes.text.capitalize() - meses_dict[valor_mes] = nome_mes - - anos_meses[ano] = meses_dict - - driver.quit() - - anos = [] - meses_numeros = [] - meses_nomes = [] - - # Iterar sobre o dicionário para extrair os dados - for ano, meses in anos_meses.items(): - for mes_numero, mes_nome in meses.items(): - anos.append(ano) - meses_numeros.append(mes_numero) - meses_nomes.append(mes_nome) - - # Criar o DataFrame - data = {"ano": anos, "mes_numero": meses_numeros, "mes_nome": meses_nomes} - df = pd.DataFrame(data) - df["urls"] = None - for index, row in df.iterrows(): - if table == "Servidores_SIAPE": - df["urls"][index] = f"{row.ano}{row.mes_numero}_Servidores_SIAPE.zip" - - return df diff --git a/pipelines/utils/crawler_cgu/constants.py b/pipelines/utils/crawler_cgu/constants.py index 419ca410f..4133f1657 100644 --- a/pipelines/utils/crawler_cgu/constants.py +++ b/pipelines/utils/crawler_cgu/constants.py @@ -4,7 +4,6 @@ """ from enum import Enum -from datetime import datetime class constants(Enum): # pylint: disable=c0103 """ @@ -32,4 +31,98 @@ class constants(Enum): # pylint: disable=c0103 "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpdc/", "READ" : "_CPDC", "ONLY_ONE_FILE" : False} - } \ No newline at end of file + } + + # ! 
============================================== CGU - Servidores Públicos do Executivo Federal ==============================================
+    URL_SERVIDORES = "http://portaldatransparencia.gov.br/download-de-dados/servidores/"
+
+    TABELA_SERVIDORES = {
+        "afastamentos": {
+            "NAME_TABLE": "_Afastamentos.csv",
+            "ARCHITECTURE": "https://docs.google.com/spreadsheets/d/1NQ4t9l8znClnfM8NYBLBkI9PoWV5UAosUZ1KGvZe-T0/edit#gid=0",
+            "READ": {
+                "Servidores_BACEN": "BACEN",
+                "Servidores_SIAPE": "SIAPE",
+            },
+            "ONLY_TABLE": True,
+            "INPUT": "/tmp/input/cgu_servidores/afastamentos",
+            "OUTPUT": "/tmp/output/cgu_servidores/afastamentos",
+        },
+        "cadastro_aposentados": {
+            "ARCHITECTURE": "https://docs.google.com/spreadsheets/d/1_t_JsWbuGlg8cz_2RYYNMuulzA4RHydfJ4TA-wH9Ch8/edit#gid=0",
+            "NAME_TABLE": "_Cadastro.csv",
+            "READ": {
+                "Aposentados_SIAPE": "SIAPE",
+                "Aposentados_BACEN": "BACEN",
+            },
+            "ONLY_TABLE": True,
+            "INPUT": "/tmp/input/cgu_servidores/cadastro_aposentados",
+            "OUTPUT": "/tmp/output/cgu_servidores/cadastro_aposentados",
+        },
+        "observacoes": {
+            "ARCHITECTURE": "https://docs.google.com/spreadsheets/d/1BWt6yvKTfNW0XCDNIsIu8NhSKjhbDVJjnEwvnEmVkRc/edit#gid=0",
+            "NAME_TABLE": "_Observacoes.csv",
+            "READ": {
+                "Aposentados_BACEN": "Aposentados BACEN",
+                "Aposentados_SIAPE": "Aposentados SIAPE",
+                "Militares": "Militares",
+                "Pensionistas_BACEN": "Pensionistas BACEN",
+                "Pensionistas_DEFESA": "Pensionistas DEFESA",
+                "Pensionistas_SIAPE": "Pensionistas SIAPE",
+                "Reserva_Reforma_Militares": "Reserva Reforma Militares",
+                "Servidores_BACEN": "Servidores BACEN",
+                "Servidores_SIAPE": "Servidores SIAPE",
+            },
+            "ONLY_TABLE": True,
+            "INPUT": "/tmp/input/cgu_servidores/observacoes",
+            "OUTPUT": "/tmp/output/cgu_servidores/observacoes",
+        },
+        "cadastro_pensionistas": {
+            "ARCHITECTURE": "https://docs.google.com/spreadsheets/d/1G_RPhSUZRrCqcQCP1WSjBiYbnjirqTp0yaLNUaPi_7U/edit#gid=0",
+            "NAME_TABLE": "_Cadastro.csv",
+            "READ": {
+                "Pensionistas_SIAPE": "SIAPE",
+                "Pensionistas_DEFESA": "Defesa",
+                "Pensionistas_BACEN": "BACEN",
+            },
+            "ONLY_TABLE": True,
+            "INPUT": "/tmp/input/cgu_servidores/cadastro_pensionistas",
+            "OUTPUT": "/tmp/output/cgu_servidores/cadastro_pensionistas",
+        },
+        "remuneracao": {
+            "ARCHITECTURE": "https://docs.google.com/spreadsheets/d/1LJ8_N53OoNEQQ1PMAeIPKq1asB29MUP5SRoFWnUI6Zg/edit#gid=0",
+            "NAME_TABLE": "_Remuneracao.csv",
+            "READ": {
+                "Militares": "Militares",
+                "Pensionistas_BACEN": "Pensionistas BACEN",
+                "Pensionistas_DEFESA": "Pensionistas DEFESA",
+                "Reserva_Reforma_Militares": "Reserva Reforma Militares",
+                "Servidores_BACEN": "Servidores BACEN",
+                "Servidores_SIAPE": "Servidores SIAPE",
+            },
+            "ONLY_TABLE": True,
+            "INPUT": "/tmp/input/cgu_servidores/remuneracao",
+            "OUTPUT": "/tmp/output/cgu_servidores/remuneracao",
+        },
+        "cadastro_reserva_reforma_militares": {
+            "ARCHITECTURE": "https://docs.google.com/spreadsheets/d/1vqWjATWjHK-6tbj_ilwbhNWReqe3AKd3VTpzBbPu4qI/edit#gid=0",
+            "NAME_TABLE": "_Cadastro.csv",
+            "READ": {"Reserva_Reforma_Militares": "Reserva Reforma Militares"},
+            "ONLY_TABLE": True,
+            "INPUT": "/tmp/input/cgu_servidores/cadastro_reserva_reforma_militares",
+            "OUTPUT": "/tmp/output/cgu_servidores/cadastro_reserva_reforma_militares",
+        },
+        "cadastro_servidores": {
+            "ARCHITECTURE": "https://docs.google.com/spreadsheets/d/1U57P5XhCw9gERD8sN24P0vDK0CjkUuOQZwOoELdE3Jg/edit#gid=0",
+            "NAME_TABLE": "_Cadastro.csv",
+            "READ": {
+                "Servidores_BACEN": "BACEN",
+                "Servidores_SIAPE": "SIAPE",
+                "Militares": "Militares",
+            },
+            "ONLY_TABLE": True,
+            "INPUT": "/tmp/input/cgu_servidores/cadastro_servidores",
+            "OUTPUT": "/tmp/output/cgu_servidores/cadastro_servidores",
+        },
+    }
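+
+    # Hedged sketch (comments only, not executed): how an entry above is meant
+    # to be consumed downstream; "202408" stands in for any {year}{month} pair.
+    #
+    #   entry = constants.TABELA_SERVIDORES.value["cadastro_servidores"]
+    #   for sheet in entry["READ"]:  # Servidores_BACEN, Servidores_SIAPE, Militares
+    #       # e.g. /tmp/input/cgu_servidores/cadastro_servidores/Servidores_BACEN/202408_Cadastro.csv
+    #       path = f"{entry['INPUT']}/{sheet}/202408{entry['NAME_TABLE']}"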
diff --git a/pipelines/utils/crawler_cgu/flows.py b/pipelines/utils/crawler_cgu/flows.py
index f3bcb2a4b..a8106af08 100644
--- a/pipelines/utils/crawler_cgu/flows.py
+++ b/pipelines/utils/crawler_cgu/flows.py
@@ -2,6 +2,6 @@
 """
-Flows for br_cgu_cartao_pagamento
+Flows for the CGU crawler (Cartão de Pagamento and Servidores Públicos)
 """
 from datetime import timedelta
 from prefect.run_configs import KubernetesRun
 from prefect.storage import GCS
@@ -13,6 +14,7 @@
 from pipelines.utils.crawler_cgu.tasks import (
     partition_data,
     get_current_date_and_download_file,
+    verify_all_url_exists_to_download,
 )
 from pipelines.utils.decorators import Flow
 from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
@@ -21,8 +23,10 @@
     create_table_and_upload_to_gcs,
     get_current_flow_labels,
     rename_current_flow_run_dataset_table,
+    log_task
 )
 
+
 with Flow(
     name="CGU - Cartão de Pagamento"
 ) as flow_cgu_cartao_pagamento:
@@ -32,7 +36,7 @@
     ####
-    # Relative_month = 1 means that the data will be downloaded for the current month
+    # relative_month is the offset, in months, applied to the latest date already in the API
     ####
-    relative_month = Parameter("relative_month", default=1, required=False)
+    relative_month = Parameter("relative_month", default=-1, required=False)
     materialization_mode = Parameter("materialization_mode", default="dev", required=False)
     materialize_after_dump = Parameter("materialize_after_dump", default=True, required=False)
     dbt_alias = Parameter("dbt_alias", default=True, required=False)
@@ -57,8 +61,9 @@
 
     filepath = partition_data(
         table_id=table_id,
-        upstream_tasks=[dados_desatualizados]
-    )
+        dataset_id=dataset_id,
+        upstream_tasks=[dados_desatualizados],
+    )
 
     wait_upload_table = create_table_and_upload_to_gcs(
         data_path=filepath,
@@ -118,3 +124,98 @@
 flow_cgu_cartao_pagamento.run_config = KubernetesRun(
     image=constants.DOCKER_IMAGE.value
 )
+
+# ! ============================================== CGU - Servidores Públicos do Executivo Federal ==============================================
+
+with Flow(
+    name="CGU - Servidores Públicos do Executivo Federal", code_owners=["trick"]
+) as flow_cgu_servidores_publicos:
+
+    dataset_id = Parameter("dataset_id", default="br_cgu_servidores_executivo_federal", required=True)
+    table_id = Parameter("table_id", required=True)
+    materialization_mode = Parameter("materialization_mode", default="dev", required=False)
+    materialize_after_dump = Parameter("materialize_after_dump", default=True, required=False)
+    dbt_alias = Parameter("dbt_alias", default=True, required=False)
+    update_metadata = Parameter("update_metadata", default=False, required=False)
+    rename_flow_run = rename_current_flow_run_dataset_table(prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id)
+
+    ####
+    # relative_month is the offset, in months, applied to the latest date already in the API
+    ####
+    relative_month = Parameter("relative_month", default=4, required=False)
+
+    with case(verify_all_url_exists_to_download(dataset_id, table_id, relative_month), True):
+        data_source_max_date = get_current_date_and_download_file(
+            table_id,
+            dataset_id,
+            relative_month,
+        )
+
+        dados_desatualizados = check_if_data_is_outdated(
+            dataset_id=dataset_id,
+            table_id=table_id,
+            data_source_max_date=data_source_max_date,
+            date_format="%Y-%m",
+            upstream_tasks=[data_source_max_date],
+        )
+
+        with case(dados_desatualizados, True):
+            filepath = partition_data(
+                table_id=table_id, dataset_id=dataset_id, upstream_tasks=[data_source_max_date]
+            )
+            wait_upload_table = create_table_and_upload_to_gcs(
+                data_path=filepath,
+                dataset_id=dataset_id,
+                table_id=table_id,
+                dump_mode="append",
+                wait=filepath,
+                upstream_tasks=[filepath],
+            )
+
+            with case(materialize_after_dump, True):
+
+                current_flow_labels = get_current_flow_labels()
+                materialization_flow = create_flow_run(
+                    flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
+                    project_name=constants.PREFECT_DEFAULT_PROJECT.value,
+                    parameters={
+                        "dataset_id": dataset_id,
+                        "table_id": table_id,
+                        "mode": materialization_mode,
+                        "dbt_alias": dbt_alias,
+                        "dbt_command": "run/test",
+                        "disable_elementary": False,
+                    },
+                    labels=current_flow_labels,
+                    run_name=f"Materialize {dataset_id}.{table_id}",
+                    upstream_tasks=[wait_upload_table],
+                )
+
+                wait_for_materialization = wait_for_flow_run(
+                    materialization_flow,
+                    stream_states=True,
+                    stream_logs=True,
+                    raise_final_state=True,
+                    upstream_tasks=[materialization_flow],
+                )
+                wait_for_materialization.max_retries = (
+                    dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
+                )
+                wait_for_materialization.retry_delay = timedelta(
+                    seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
+                )
+                with case(update_metadata, True):
+                    update_django_metadata(
+                        dataset_id=dataset_id,
+                        table_id=table_id,
+                        date_column_name={"year": "ano", "month": "mes"},
+                        date_format="%Y-%m",
+                        coverage_type="part_bdpro",
+                        time_delta={"months": 6},
+                        prefect_mode=materialization_mode,
+                        bq_project="basedosdados",
+                        upstream_tasks=[wait_for_materialization],
+                    )
+
+flow_cgu_servidores_publicos.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
+flow_cgu_servidores_publicos.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
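+
+# Hedged usage sketch (values hypothetical, not executed at import time): the
+# per-table flows in pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py
+# are shallow copies of this flow whose schedules inject the parameters; a local
+# debug run would be parameterized the same way:
+#
+#   flow_cgu_servidores_publicos.run(
+#       parameters={
+#           "dataset_id": "br_cgu_servidores_executivo_federal",
+#           "table_id": "cadastro_servidores",
+#           "materialization_mode": "dev",
+#           "materialize_after_dump": False,
+#       }
+#   )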
diff --git a/pipelines/utils/crawler_cgu/tasks.py b/pipelines/utils/crawler_cgu/tasks.py
index 2bc2f2b2b..2eed65bd9 100644
--- a/pipelines/utils/crawler_cgu/tasks.py
+++ b/pipelines/utils/crawler_cgu/tasks.py
@@ -4,16 +4,26 @@
 """
 from datetime import datetime
 from prefect import task
-from dateutil.relativedelta import relativedelta
+import os
+import basedosdados as bd
+import requests
 import pandas as pd
-from pipelines.utils.utils import log, to_partitions
-from pipelines.utils.crawler_cgu.utils import read_csv, last_date_in_metadata
+from dateutil.relativedelta import relativedelta
+from pipelines.utils.utils import log, to_partitions, download_and_unzip_file
+from pipelines.utils.metadata.utils import get_api_most_recent_date, get_url
+from pipelines.utils.crawler_cgu.utils import (
+    read_csv,
+    last_date_in_metadata,
+    read_and_clean_csv,
+    build_urls,
+)
 from pipelines.utils.crawler_cgu.constants import constants
 from pipelines.utils.crawler_cgu.utils import download_file
 from typing import Tuple
 
+
 @task
-def partition_data(table_id: str) -> str:
+def partition_data(table_id: str, dataset_id: str) -> str:
     """
     Partition data from a given table.
 
@@ -28,25 +38,31 @@
 
         str: The path where the partitioned data is saved.
     """
-    value_constants = constants.TABELA.value[table_id]
-
-    log("---------------------------- Read data ----------------------------")
-    # Read the data
-    df = read_csv(table_id = table_id,
-                url = value_constants['URL'])
-
-    # Partition the data
-    log(" ---------------------------- Partiting data -----------------------")
-
-    to_partitions(
-        data = df,
-        partition_columns=['ANO_EXTRATO', 'MES_EXTRATO'],
-        savepath = value_constants['OUTPUT_DATA'],
-        file_type='csv')
-
-    log("---------------------------- Data partitioned ----------------------")
-
-    return value_constants['OUTPUT_DATA']
+    if dataset_id == "br_cgu_cartao_pagamento":
+        log("---------------------------- Read data ----------------------------")
+        df = read_csv(table_id=table_id)
+        log("---------------------------- Partitioning data ----------------------")
+        to_partitions(
+            data = df,
+            partition_columns=['ANO_EXTRATO', 'MES_EXTRATO'],
+            savepath = constants.TABELA.value[table_id]['OUTPUT_DATA'],
+            file_type='csv')
+        log("---------------------------- Data partitioned ----------------------")
+        return constants.TABELA.value[table_id]['OUTPUT_DATA']
+
+    elif dataset_id == "br_cgu_servidores_executivo_federal":
+
+        log("---------------------------- Read data ----------------------------")
+        df = read_and_clean_csv(table_id = table_id)
+        log("---------------------------- Partitioning data ----------------------")
+        to_partitions(
+            data=df,
+            partition_columns=["ano", "mes"],
+            savepath=constants.TABELA_SERVIDORES.value[table_id]['OUTPUT'],
+        )
+        log("---------------------------- Data partitioned ----------------------")
+        return constants.TABELA_SERVIDORES.value[table_id]['OUTPUT']
 
 @task
 def get_current_date_and_download_file(table_id : str,
@@ -69,12 +85,46 @@
         relative_month = relative_month
     )
 
     max_date = str(download_file(table_id = table_id,
+                                 dataset_id = dataset_id,
                                  year = next_date_in_api.year,
                                  month = next_date_in_api.month,
                                  relative_month=relative_month))
 
     log(f"Max date: {max_date}")
 
     date = datetime.strptime(max_date, '%Y-%m-%d')
 
     return date
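+
+
+# A hedged illustration (not executed by the pipeline) of the `relative_month`
+# arithmetic performed by `last_date_in_metadata` in crawler_cgu/utils.py; the
+# dates below are hypothetical:
+#
+#   from datetime import date
+#   from dateutil.relativedelta import relativedelta
+#   last_date_in_api = date(2024, 8, 1)
+#   last_date_in_api + relativedelta(months=1)   # date(2024, 9, 1): next month
+#   last_date_in_api + relativedelta(months=-1)  # date(2024, 7, 1): previous month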
+
+
+@task
+def verify_all_url_exists_to_download(dataset_id, table_id, relative_month) -> bool:
+    """
+    Verifies that all URLs for the requested table and reference month exist and
+    can be downloaded.
+
+    Args:
+        dataset_id (str): The dataset identifier.
+        table_id (str): The identifier for the table to download data for.
+        relative_month (int): The offset, in months, applied to the most recent
+            date already in the API.
+
+    Returns:
+        bool: True if all URLs respond with HTTP 200, False otherwise.
+    """
+    last_date_in_api, next_date_in_api = last_date_in_metadata(
+        dataset_id=dataset_id, table_id=table_id, relative_month=relative_month
+    )
+
+    urls = build_urls(
+        dataset_id,
+        constants.URL_SERVIDORES.value,
+        next_date_in_api.year,
+        next_date_in_api.month,
+        table_id,
+    )
+
+    for url in urls:
+        if requests.get(url).status_code != 200:
+            log(f"URL {url=} does not exist!")
+            return False
+
+        log(f"URL {url=} exists!")
+    return True
diff --git a/pipelines/utils/crawler_cgu/utils.py b/pipelines/utils/crawler_cgu/utils.py
index 68c1d2e5b..55e48b815 100644
--- a/pipelines/utils/crawler_cgu/utils.py
+++ b/pipelines/utils/crawler_cgu/utils.py
@@ -3,44 +3,125 @@
 General purpose functions for the br_cgu_cartao_pagamento project
 """
 import datetime
 from dateutil.relativedelta import relativedelta
+import gc
+import shutil
+import io
+import zipfile
 import pandas as pd
 import os
+import unidecode
 import basedosdados as bd
 import requests
 from pipelines.utils.crawler_cgu.constants import constants
 from typing import List
-import unidecode
+from tqdm import tqdm
 from pipelines.utils.utils import log, download_and_unzip_file
 from pipelines.utils.metadata.utils import get_api_most_recent_date, get_url
+from pipelines.utils.apply_architecture_to_dataframe.utils import (
+    read_architecture_table,
+    rename_columns,
+)
+from bs4 import BeautifulSoup
+from selenium import webdriver
+from selenium.webdriver.chrome.service import Service as ChromeService
+from selenium.webdriver.common.by import By
+from selenium.webdriver.support.ui import Select
+from webdriver_manager.chrome import ChromeDriverManager
+
+
+def build_urls(dataset_id: str, url: str, year: int, month: int, table_id: str):
+    """
+    Constructs download URLs based on the provided parameters.
+
+    Args:
+        dataset_id (str): The dataset identifier, which determines the URL structure.
+        url (str): The base URL to which the year, month, and table name will be appended.
+        year (int): The year to be included in the URL.
+        month (int): The month to be included in the URL.
+        table_id (str): The table identifier used to fetch specific table names for constructing URLs.
-def download_file(table_id : str, year : int, month : int, relative_month = int) -> None:
+
+    Returns:
+        str: A single URL string for "br_cgu_cartao_pagamento".
+        list: A list of URL strings for "br_cgu_servidores_executivo_federal".
+    """
-    """
-    Downloads and unzips a file from a specified URL based on the given table ID, year, and month.
+    if dataset_id == "br_cgu_cartao_pagamento":
+        log(f"{url}{year}{str(month).zfill(2)}/")
-
-    Parameters:
-    table_id (str): The identifier for the table to download data for.
-    year (int): The year for which data is to be downloaded.
-    month (int): The month for which data is to be downloaded.
-    relative_month (int): The relative month used for querying metadata.
+
+        return f"{url}{year}{str(month).zfill(2)}/"
-
-    Returns:
-    None: If the file is successfully downloaded and unzipped.
-    str: The next date in the API if the URL is found.
-    str: The last date in the API if the URL is not found.
- """ - value_constants = constants.TABELA.value[table_id] - input = value_constants['INPUT_DATA'] - if not os.path.exists(input): - os.makedirs(input) + elif dataset_id == "br_cgu_servidores_executivo_federal": + list_url = [] + for table_name in constants.TABELA_SERVIDORES.value[table_id]["READ"].keys(): + url_completa = f"{url}{year}{str(month).zfill(2)}_{table_name}/" + list_url.append(url_completa) + return list_url - log(f' ---------------------------- Year = {year} --------------------------------------') - log(f' ---------------------------- Month = {month} ------------------------------------') +def build_input(table_id): + """ + Builds a list of input directories based on the given table ID. - if not value_constants['ONLY_ONE_FILE']: + This function retrieves the input keys from the constants.TABELA_SERVIDORES + dictionary for the specified table_id. It then checks if each input directory + exists, creates it if it does not, and appends the directory path to a list. + Finally, it logs the list of input directories and returns it. - url = f"{value_constants['URL']}{year}{str(month).zfill(2)}/" + Args: + table_id (str): The ID of the table for which to build the input directories. + + Returns: + list: A list of input directory paths. + + Raises: + KeyError: If the table_id is not found in constants.TABELA_SERVIDORES. + """ + list_input = [] + for input in constants.TABELA_SERVIDORES.value[table_id]["READ"].keys(): + + value_input = f"{input}" + if not os.path.exists(value_input): + os.makedirs(value_input) + print(value_input) + list_input.append(value_input) + log(f"Lista de inputs: {list_input}") + return list_input + + +def download_file(dataset_id: str, table_id: str, year: int, month: int, relative_month=int) -> None: + """ + Downloads and unzips a file from a specified URL based on the given table ID, year, and month. + + Parameters: + table_id (str): The identifier for the table to download data for. + year (int): The year for which data is to be downloaded. + month (int): The month for which data is to be downloaded. + relative_month (int): The relative month used for querying metadata. + + Returns: + None: If the file is successfully downloaded and unzipped. + str: The next date in the API if the URL is found. + str: The last date in the API if the URL is not found. + """ + + if dataset_id == "br_cgu_cartao_pagamento": + value_constants = constants.TABELA.value[table_id] # ! CGU - Cartão de Pagamento + + input = value_constants["INPUT_DATA"] + if not os.path.exists(input): + os.makedirs(input) + + url: str = build_urls( + url=value_constants["URL"], + year=year, + month=month, + table_id=table_id, + dataset_id=dataset_id, + ) + log(url) status = requests.get(url).status_code == 200 if status: @@ -67,14 +148,34 @@ def download_file(table_id : str, year : int, month : int, relative_month = int) return last_date_in_api - if value_constants['ONLY_ONE_FILE']: - url = value_constants['URL'] - download_and_unzip_file(url, value_constants['INPUT_DATA']) - return None - - - -def read_csv(table_id : str, url : str, column_replace : List = ['VALOR_TRANSACAO']) -> pd.DataFrame: + elif dataset_id == "br_cgu_servidores_executivo_federal": + + constants_cgu_servidores = constants.TABELA_SERVIDORES.value[table_id] # ! 
+
+
+def download_file(dataset_id: str, table_id: str, year: int, month: int, relative_month: int = 1) -> None:
+    """
+    Downloads and unzips a file from a specified URL based on the given table ID, year, and month.
+
+    Parameters:
+        dataset_id (str): The dataset identifier, either "br_cgu_cartao_pagamento" or "br_cgu_servidores_executivo_federal".
+        table_id (str): The identifier for the table to download data for.
+        year (int): The year for which data is to be downloaded.
+        month (int): The month for which data is to be downloaded.
+        relative_month (int): The relative month used for querying metadata.
+
+    Returns:
+        None: If the file is successfully downloaded and unzipped.
+        str: The next date in the API if the URL is found.
+        str: The last date in the API if the URL is not found.
+    """
+
+    if dataset_id == "br_cgu_cartao_pagamento":
+        value_constants = constants.TABELA.value[table_id]  # ! CGU - Cartão de Pagamento
+
+        input = value_constants["INPUT_DATA"]
+        if not os.path.exists(input):
+            os.makedirs(input)
+
+        url: str = build_urls(
+            url=value_constants["URL"],
+            year=year,
+            month=month,
+            table_id=table_id,
+            dataset_id=dataset_id,
+        )
+        log(url)
         status = requests.get(url).status_code == 200
         if status:
@@ -67,14 +148,34 @@ def download_file(table_id : str, year : int, month : int, relative_month = int)
         return last_date_in_api
 
-    if value_constants['ONLY_ONE_FILE']:
-        url = value_constants['URL']
-        download_and_unzip_file(url, value_constants['INPUT_DATA'])
-        return None
-
-
-
-def read_csv(table_id : str, url : str, column_replace : List = ['VALOR_TRANSACAO']) -> pd.DataFrame:
+    elif dataset_id == "br_cgu_servidores_executivo_federal":
+        constants_cgu_servidores = constants.TABELA_SERVIDORES.value[table_id]  # ! CGU - Servidores Públicos do Executivo Federal
+
+        url = build_urls(
+            constants.TABELA_SERVIDORES.value,
+            constants.URL_SERVIDORES.value,
+            year,
+            month,
+            table_id,
+        )
+        input_dirs = build_input(table_id)
+
+        for urls, input_dir in zip(url, input_dirs):
+            if requests.get(urls).status_code == 200:
+                destino = f"{constants_cgu_servidores['INPUT']}/{input_dir}"
+                download_and_unzip_file(urls, destino)
+
+        last_date_in_api, next_date_in_api = last_date_in_metadata(
+            dataset_id="br_cgu_servidores_executivo_federal",
+            table_id=table_id,
+            relative_month=relative_month,
+        )
+        return next_date_in_api
+
+
+def read_csv(
+    table_id: str, column_replace: List = ["VALOR_TRANSACAO"]
+) -> pd.DataFrame:
     """
     Reads a CSV file from a specified path and processes its columns.
@@ -92,25 +193,32 @@ def read_csv(table_id : str, url : str, column_replace : List = ['VALOR_TRANSACAO']) -> pd.DataFrame:
     - Column names are converted to uppercase, spaces are replaced with underscores, and accents are removed.
     - For columns specified in `column_replace`, commas in their values are replaced with dots and the values are converted to float.
     """
     value_constants = constants.TABELA.value[table_id]
 
-    os.listdir(value_constants['INPUT_DATA'])
-
-    csv_file = [f for f in os.listdir(value_constants['INPUT_DATA']) if f.endswith('.csv')][0]
+    csv_file = [
+        f for f in os.listdir(value_constants["INPUT_DATA"]) if f.endswith(".csv")
+    ][0]
     log(f"CSV files: {csv_file}")
 
-    df = pd.read_csv(f"{value_constants['INPUT_DATA']}/{csv_file}", sep=';', encoding='latin1')
+    df = pd.read_csv(
+        f"{value_constants['INPUT_DATA']}/{csv_file}", sep=";", encoding="latin1"
+    )
     df.columns = [unidecode.unidecode(x).upper().replace(" ", "_") for x in df.columns]
 
     for list_column_replace in column_replace:
-        df[list_column_replace] = df[list_column_replace].str.replace(",", ".").astype(float)
+        df[list_column_replace] = (
+            df[list_column_replace].str.replace(",", ".").astype(float)
+        )
 
     return df
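The decimal-comma handling in read_csv is the one transformation that changes values rather than labels. A self-contained illustration:

# Values as they arrive from the portal, with Brazilian decimal commas.
import pandas as pd

df = pd.DataFrame({"VALOR_TRANSACAO": ["1234,56", "0,99"]})
df["VALOR_TRANSACAO"] = df["VALOR_TRANSACAO"].str.replace(",", ".").astype(float)
print(df["VALOR_TRANSACAO"].tolist())  # [1234.56, 0.99]

Note that str.replace(",", ".") alone would mis-parse values that also carry a thousands separator (e.g. "1.234,56"); if the portal ever emits those, pd.read_csv(..., decimal=",", thousands=".") is the safer route.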
 
-def last_date_in_metadata(dataset_id : str,
-                        table_id : str,
-                        relative_month) -> datetime.date:
+
+def last_date_in_metadata(
+    dataset_id: str, table_id: str, relative_month
+) -> datetime.date:
     """
     Retrieves the most recent date from the metadata of a specified dataset and table,
     and calculates the next date based on a relative month offset.
@@ -134,6 +242,153 @@ def last_date_in_metadata(dataset_id : str,
         backend=backend,
     )
 
-    next_date_in_api = last_date_in_api + relativedelta(months=relative_month)
+    next_date_in_api = last_date_in_api + relativedelta(months=relative_month)
+
+    return last_date_in_api, next_date_in_api
+
+
+def create_column_ano(df: pd.DataFrame, csv_path: str) -> pd.DataFrame:
+    """
+    Adds a new column 'ano' to the DataFrame based on the first four characters of the csv_path.
+
+    Parameters:
+        df (pd.DataFrame): The input DataFrame to which the new column will be added.
+        csv_path (str): The file path string from which the year will be extracted.
+
+    Returns:
+        pd.DataFrame: The DataFrame with the added 'ano' column.
+    """
+    df["ano"] = int(csv_path[:4])
+    return df
+
+
+def create_column_month(df: pd.DataFrame, csv_path: str) -> pd.DataFrame:
+    """
+    Adds a 'mes' column to the DataFrame based on the month extracted from the csv_path.
+
+    Args:
+        df (pd.DataFrame): The input DataFrame to which the 'mes' column will be added.
+        csv_path (str): The file path string from which the month will be extracted.
+            It is assumed that the month is represented by characters at positions 4 and 5.
+
+    Returns:
+        pd.DataFrame: The DataFrame with the added 'mes' column.
+    """
+    df["mes"] = int(csv_path[4:6])
+
+    return df
+
+
+def exclude_used_tables(path: str) -> None:
+    """
+    Deletes the directory at the specified path if it exists and performs garbage collection.
+
+    Args:
+        path (str): The path to the directory to be deleted.
+
+    Returns:
+        None
+    """
+    if os.path.exists(path):
+        shutil.rmtree(path)
+    gc.collect()
+
+    return None
+
+
+def read_and_clean_csv(table_id: str) -> pd.DataFrame:
+    """
+    Reads and cleans CSV files for a given table ID.
+
+    This function performs the following steps:
+    1. Builds the input path for the given table ID.
+    2. Iterates through the CSV files in the input path.
+    3. Reads each CSV file into a DataFrame.
+    4. Renames columns based on a predefined architecture.
+    5. Adds 'ano' and 'mes' columns based on the CSV file name.
+    6. Adds an 'origem' column if it exists in the architecture.
+    7. Appends the cleaned DataFrame to a list.
+    8. Concatenates all DataFrames in the list into a single DataFrame.
+    9. Returns the concatenated DataFrame.
+
+    Args:
+        table_id (str): The ID of the table to process.
+
+    Returns:
+        pd.DataFrame: The concatenated and cleaned DataFrame.
+    """
+    append_dataframe = []
+    constants_cgu_servidores = constants.TABELA_SERVIDORES.value[table_id]
+    for csv_path in build_input(table_id):
+        path = f"{constants_cgu_servidores['INPUT']}/{csv_path}"
+        for get_csv in os.listdir(path):
+            if get_csv.endswith(constants_cgu_servidores["NAME_TABLE"]):
+                log(f"Reading {table_id=}, {csv_path=}, {get_csv=}")
+                df = pd.read_csv(
+                    os.path.join(path, get_csv),
+                    sep=";",
+                    encoding="latin-1",
+                ).rename(
+                    columns=lambda col: col.replace("\x96 ", ""))
+                url_architecture = constants_cgu_servidores["ARCHITECTURE"]
+                df_architecture = read_architecture_table(url_architecture)
+                df = rename_columns(df, df_architecture)
+
+                create_column_ano(df, get_csv)
+                create_column_month(df, get_csv)
+                if "origem" in df_architecture["name"].to_list():
+                    df["origem"] = constants_cgu_servidores(csv_path)
+
+                append_dataframe.append(df)
+
+        exclude_used_tables(path)
+
+    if len(append_dataframe) > 1:
+        df = pd.concat(append_dataframe)
+    else:
+        df = append_dataframe[0]
+
+    return df
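create_column_ano and create_column_month lean entirely on the portal's YYYYMM_<sheet> file-naming convention. A quick illustration with a hypothetical file name:

# Hypothetical file name following the portal's YYYYMM_<sheet> convention.
import pandas as pd

csv_name = "202411_Servidores_SIAPE.csv"
df = pd.DataFrame({"nome": ["A", "B"]})
df["ano"] = int(csv_name[:4])   # 2024
df["mes"] = int(csv_name[4:6])  # 11
print(df)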
"Pensionistas SIAPE", + "Reserva_Reforma_Militares": "Reserva Reforma Militares", + "Servidores_BACEN": "Servidores BACEN", + "Servidores_SIAPE": "Servidores SIAPE", + }, + } + + return ORIGINS[table_name][source] diff --git a/pipelines/utils/utils.py b/pipelines/utils/utils.py index fdefc2994..707ed7faf 100644 --- a/pipelines/utils/utils.py +++ b/pipelines/utils/utils.py @@ -489,7 +489,7 @@ def download_and_unzip_file(url : str, path : str) -> None: r = urlopen(url) zip = zipfile.ZipFile(BytesIO(r.read())) zip.extractall(path=path) - log("Complet download and unzip") + log(f"DOWNLOAD {url} to {path} FINISH") except Exception as e: log(e) From bd3a287200a911c8fc711e601cd25ae05e121a4e Mon Sep 17 00:00:00 2001 From: tricktx Date: Tue, 5 Nov 2024 08:37:45 -0300 Subject: [PATCH 2/9] remove error module --- .../datasets/br_cgu_servidores_executivo_federal/schedules.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pipelines/datasets/br_cgu_servidores_executivo_federal/schedules.py b/pipelines/datasets/br_cgu_servidores_executivo_federal/schedules.py index 92923ab5e..17fd9d4db 100644 --- a/pipelines/datasets/br_cgu_servidores_executivo_federal/schedules.py +++ b/pipelines/datasets/br_cgu_servidores_executivo_federal/schedules.py @@ -7,11 +7,7 @@ from prefect.schedules import Schedule from prefect.schedules.clocks import CronClock - from pipelines.constants import constants -from pipelines.datasets.br_cgu_servidores_executivo_federal.constants import ( - constants as cgu_constants, -) every_day_cadastro_aposentados = Schedule( clocks=[ From c78acfc161d41e033a69f4c854431caacd1982fc Mon Sep 17 00:00:00 2001 From: tricktx Date: Thu, 7 Nov 2024 08:45:50 -0300 Subject: [PATCH 3/9] register flow --- pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py | 1 + pipelines/utils/crawler_cgu/flows.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py b/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py index f88fe58a4..faf7648eb 100644 --- a/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py +++ b/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py @@ -34,6 +34,7 @@ # ! 
br_cgu_servidores_federal__cadastro_pensionistas + br_cgu_servidores_federal__cadastro_pensionistas = copy(flow_cgu_servidores_publicos) br_cgu_servidores_federal__cadastro_pensionistas.name = ("br_cgu_servidores_executivo_federal.cadastro_pensionistas") br_cgu_servidores_federal__cadastro_pensionistas.code_owners = ["trick"] diff --git a/pipelines/utils/crawler_cgu/flows.py b/pipelines/utils/crawler_cgu/flows.py index a8106af08..7eb7673d0 100644 --- a/pipelines/utils/crawler_cgu/flows.py +++ b/pipelines/utils/crawler_cgu/flows.py @@ -142,7 +142,7 @@ #### # Relative_month = 1 means that the data will be downloaded for the current month #### - relative_month = Parameter("relative_month", default=4, required=False) + relative_month = Parameter("relative_month", default=1, required=False) with case(verify_all_url_exists_to_download(dataset_id, table_id, relative_month), True): data_source_max_date = get_current_date_and_download_file( From b4dd6363a40a63dc4b8ffef31a7249bf4f2b1db6 Mon Sep 17 00:00:00 2001 From: tricktx Date: Thu, 7 Nov 2024 10:27:29 -0300 Subject: [PATCH 4/9] add arrow in poetry --- poetry.lock | 4 ++-- pyproject.toml | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/poetry.lock b/poetry.lock index e14a8591c..ee35f6f14 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. [[package]] name = "aioftp" @@ -5484,4 +5484,4 @@ heapdict = "*" [metadata] lock-version = "2.0" python-versions = ">=3.10,<3.11" -content-hash = "6274ce55c035bb71ed91afef418e6755fd4478cd85dda2b4f0b78e0c7ae26ebf" +content-hash = "6a7b387c35a33e52abe28d92b14e7846f303c84443f693ab8574017ee9cfebb0" diff --git a/pyproject.toml b/pyproject.toml index bc773c815..84d1c13e5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -122,6 +122,7 @@ aiohttp = "^3.9.3" dateparser = "^1.2.0" sicar = {git = "https://github.com/urbanogilson/SICAR.git"} fiona = "^1.10.1" +arrow = "^1.3.0" [tool.poetry.group.dev] From f6789263c9f312588c547bb68ca6bd3807d97aa4 Mon Sep 17 00:00:00 2001 From: tricktx Date: Thu, 7 Nov 2024 11:11:54 -0300 Subject: [PATCH 5/9] register flow part 2 --- pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py b/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py index faf7648eb..f88fe58a4 100644 --- a/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py +++ b/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py @@ -34,7 +34,6 @@ # ! 
br_cgu_servidores_federal__cadastro_pensionistas - br_cgu_servidores_federal__cadastro_pensionistas = copy(flow_cgu_servidores_publicos) br_cgu_servidores_federal__cadastro_pensionistas.name = ("br_cgu_servidores_executivo_federal.cadastro_pensionistas") br_cgu_servidores_federal__cadastro_pensionistas.code_owners = ["trick"] From 3831d07513a0df3b18362fc532b755cb8a309d47 Mon Sep 17 00:00:00 2001 From: tricktx Date: Thu, 7 Nov 2024 12:24:19 -0300 Subject: [PATCH 6/9] fix dataset_id --- pipelines/utils/crawler_cgu/tasks.py | 4 +++- pipelines/utils/crawler_cgu/utils.py | 8 ++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pipelines/utils/crawler_cgu/tasks.py b/pipelines/utils/crawler_cgu/tasks.py index 2eed65bd9..447374e8b 100644 --- a/pipelines/utils/crawler_cgu/tasks.py +++ b/pipelines/utils/crawler_cgu/tasks.py @@ -114,7 +114,7 @@ def verify_all_url_exists_to_download(dataset_id, table_id, relative_month) -> b ) urls = build_urls( - constants.TABELA_SERVIDORES.value, + dataset_id, constants.URL_SERVIDORES.value, next_date_in_api.year, next_date_in_api.month, @@ -122,6 +122,8 @@ def verify_all_url_exists_to_download(dataset_id, table_id, relative_month) -> b ) for url in urls: + log(f"Verificando se a URL {url=} existe") + if requests.get(url).status_code != 200: log(f"A URL {url=} não existe!") return False diff --git a/pipelines/utils/crawler_cgu/utils.py b/pipelines/utils/crawler_cgu/utils.py index 55e48b815..07d993886 100644 --- a/pipelines/utils/crawler_cgu/utils.py +++ b/pipelines/utils/crawler_cgu/utils.py @@ -49,6 +49,9 @@ def build_urls(dataset_id: str, url: str, year: int, month: int, table_id: str) str: A single URL string if the model is 'TABELA'. list: A list of URL strings if the model is 'TABELA_SERVIDORES'. """ + + log(f"{dataset_id=}") + if dataset_id == "br_cgu_cartao_pagamento": log(f"{url}{year}{str(month).zfill(2)}/") @@ -153,7 +156,7 @@ def download_file(dataset_id: str, table_id: str, year: int, month: int, relativ constants_cgu_servidores = constants.TABELA_SERVIDORES.value[table_id] # ! CGU - Servidores Públicos do Executivo Federal url = build_urls( - constants.TABELA_SERVIDORES.value, + dataset_id, constants.URL_SERVIDORES.value, year, month, @@ -171,6 +174,7 @@ def download_file(dataset_id: str, table_id: str, year: int, month: int, relativ table_id=table_id, relative_month=relative_month, ) + return next_date_in_api def read_csv( @@ -335,7 +339,7 @@ def read_and_clean_csv(table_id: str) -> pd.DataFrame: create_column_ano(df, get_csv) create_column_month(df, get_csv) if "origem" in df_architecture["name"].to_list(): - df["origem"] = constants_cgu_servidores(csv_path) + df["origem"] = get_source(table_id, csv_path) append_dataframe.append(df) From c80f52738ecf738dcf74b2632f20a85c45635008 Mon Sep 17 00:00:00 2001 From: tricktx Date: Mon, 11 Nov 2024 11:49:46 -0300 Subject: [PATCH 7/9] register flow prefect --- .../datasets/br_cgu_servidores_executivo_federal/flows.py | 1 - pipelines/utils/crawler_cgu/flows.py | 1 - pipelines/utils/crawler_cgu/tasks.py | 1 - pipelines/utils/crawler_cgu/utils.py | 3 ++- 4 files changed, 2 insertions(+), 4 deletions(-) diff --git a/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py b/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py index f88fe58a4..0d38300ec 100644 --- a/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py +++ b/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py @@ -13,7 +13,6 @@ every_day_remuneracao, every_day_observacoes, ) - # ! 
br_cgu_servidores_federal__afastamentos br_cgu_servidores_federal__afastamentos = copy(flow_cgu_servidores_publicos) diff --git a/pipelines/utils/crawler_cgu/flows.py b/pipelines/utils/crawler_cgu/flows.py index 7eb7673d0..c1b5536c2 100644 --- a/pipelines/utils/crawler_cgu/flows.py +++ b/pipelines/utils/crawler_cgu/flows.py @@ -216,6 +216,5 @@ bq_project="basedosdados", upstream_tasks=[wait_for_materialization], ) - flow_cgu_servidores_publicos.storage = GCS(constants.GCS_FLOWS_BUCKET.value) flow_cgu_servidores_publicos.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) diff --git a/pipelines/utils/crawler_cgu/tasks.py b/pipelines/utils/crawler_cgu/tasks.py index 447374e8b..9fa025fe0 100644 --- a/pipelines/utils/crawler_cgu/tasks.py +++ b/pipelines/utils/crawler_cgu/tasks.py @@ -123,7 +123,6 @@ def verify_all_url_exists_to_download(dataset_id, table_id, relative_month) -> b for url in urls: log(f"Verificando se a URL {url=} existe") - if requests.get(url).status_code != 200: log(f"A URL {url=} não existe!") return False diff --git a/pipelines/utils/crawler_cgu/utils.py b/pipelines/utils/crawler_cgu/utils.py index 07d993886..001c13c5a 100644 --- a/pipelines/utils/crawler_cgu/utils.py +++ b/pipelines/utils/crawler_cgu/utils.py @@ -163,7 +163,8 @@ def download_file(dataset_id: str, table_id: str, year: int, month: int, relativ table_id, ) input_dirs = build_input(table_id) - + log(url) + log(input_dirs) for urls, input_dir in zip(url, input_dirs): if requests.get(urls).status_code == 200: destino = f"{constants_cgu_servidores['INPUT']}/{input_dir}" From 191e98b90c40b836d4db7fb740965d077c724e9c Mon Sep 17 00:00:00 2001 From: tricktx Date: Wed, 13 Nov 2024 17:00:20 -0300 Subject: [PATCH 8/9] remove test in dbt --- pipelines/utils/crawler_cgu/flows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/utils/crawler_cgu/flows.py b/pipelines/utils/crawler_cgu/flows.py index c1b5536c2..01e15d5f1 100644 --- a/pipelines/utils/crawler_cgu/flows.py +++ b/pipelines/utils/crawler_cgu/flows.py @@ -86,7 +86,7 @@ "table_id": table_id, "mode": materialization_mode, "dbt_alias": dbt_alias, - "dbt_command": "run/test", + "dbt_command": "run", "disable_elementary": False, }, labels=current_flow_labels, From 33723b97b23035d7b640ef4ca76bcf46d9c41db9 Mon Sep 17 00:00:00 2001 From: tricktx Date: Thu, 21 Nov 2024 10:19:59 -0300 Subject: [PATCH 9/9] small fix, remove test --- .../br_cgu_servidores_executivo_federal/flows.py | 16 ++++++++-------- pipelines/utils/crawler_cgu/flows.py | 4 ++-- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py b/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py index 0d38300ec..93d2882fc 100644 --- a/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py +++ b/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -from copy import copy, copy +from copy import copy, deepcopy from prefect.run_configs import KubernetesRun from prefect.storage import GCS from pipelines.utils.crawler_cgu.flows import flow_cgu_servidores_publicos @@ -15,7 +15,7 @@ ) # ! 
br_cgu_servidores_federal__afastamentos -br_cgu_servidores_federal__afastamentos = copy(flow_cgu_servidores_publicos) +br_cgu_servidores_federal__afastamentos = deepcopy(flow_cgu_servidores_publicos) br_cgu_servidores_federal__afastamentos.name = ("br_cgu_servidores_executivo_federal.afastamentos") br_cgu_servidores_federal__afastamentos.code_owners = ["trick"] br_cgu_servidores_federal__afastamentos.storage = GCS(constants.GCS_FLOWS_BUCKET.value) @@ -24,7 +24,7 @@ # ! br_cgu_servidores_federal__cadastro_aposentados -br_cgu_servidores_federal__cadastro_aposentados = copy(flow_cgu_servidores_publicos) +br_cgu_servidores_federal__cadastro_aposentados = deepcopy(flow_cgu_servidores_publicos) br_cgu_servidores_federal__cadastro_aposentados.name = ("br_cgu_servidores_executivo_federal.cadastro_aposentados") br_cgu_servidores_federal__cadastro_aposentados.code_owners = ["trick"] br_cgu_servidores_federal__cadastro_aposentados.storage = GCS(constants.GCS_FLOWS_BUCKET.value) @@ -33,7 +33,7 @@ # ! br_cgu_servidores_federal__cadastro_pensionistas -br_cgu_servidores_federal__cadastro_pensionistas = copy(flow_cgu_servidores_publicos) +br_cgu_servidores_federal__cadastro_pensionistas = deepcopy(flow_cgu_servidores_publicos) br_cgu_servidores_federal__cadastro_pensionistas.name = ("br_cgu_servidores_executivo_federal.cadastro_pensionistas") br_cgu_servidores_federal__cadastro_pensionistas.code_owners = ["trick"] br_cgu_servidores_federal__cadastro_pensionistas.storage = GCS(constants.GCS_FLOWS_BUCKET.value) @@ -42,7 +42,7 @@ # ! br_cgu_servidores_federal__cadastro_reserva_reforma_militares -br_cgu_servidores_federal__cadastro_reserva_reforma_militares = copy(flow_cgu_servidores_publicos) +br_cgu_servidores_federal__cadastro_reserva_reforma_militares = deepcopy(flow_cgu_servidores_publicos) br_cgu_servidores_federal__cadastro_reserva_reforma_militares.name = ("br_cgu_servidores_executivo_federal.cadastro_reserva_reforma_militares") br_cgu_servidores_federal__cadastro_reserva_reforma_militares.code_owners = ["trick"] br_cgu_servidores_federal__cadastro_reserva_reforma_militares.storage = GCS(constants.GCS_FLOWS_BUCKET.value) @@ -51,7 +51,7 @@ # ! br_cgu_servidores_federal__cadastro_servidores -br_cgu_servidores_federal__cadastro_servidores = copy(flow_cgu_servidores_publicos) +br_cgu_servidores_federal__cadastro_servidores = deepcopy(flow_cgu_servidores_publicos) br_cgu_servidores_federal__cadastro_servidores.name = ("br_cgu_servidores_executivo_federal.cadastro_servidores") br_cgu_servidores_federal__cadastro_servidores.code_owners = ["trick"] br_cgu_servidores_federal__cadastro_servidores.storage = GCS(constants.GCS_FLOWS_BUCKET.value) @@ -60,7 +60,7 @@ # ! br_cgu_servidores_federal__observacoes -br_cgu_servidores_federal__observacoes = copy(flow_cgu_servidores_publicos) +br_cgu_servidores_federal__observacoes = deepcopy(flow_cgu_servidores_publicos) br_cgu_servidores_federal__observacoes.name = ("br_cgu_servidores_executivo_federal.observacoes") br_cgu_servidores_federal__observacoes.code_owners = ["trick"] br_cgu_servidores_federal__observacoes.storage = GCS(constants.GCS_FLOWS_BUCKET.value) @@ -69,7 +69,7 @@ # ! 
br_cgu_servidores_federal__remuneracao -br_cgu_servidores_federal__remuneracao = copy(flow_cgu_servidores_publicos) +br_cgu_servidores_federal__remuneracao = deepcopy(flow_cgu_servidores_publicos) br_cgu_servidores_federal__remuneracao.name = ("br_cgu_servidores_executivo_federal.remuneracao") br_cgu_servidores_federal__remuneracao.code_owners = ["trick"] br_cgu_servidores_federal__remuneracao.storage = GCS(constants.GCS_FLOWS_BUCKET.value) diff --git a/pipelines/utils/crawler_cgu/flows.py b/pipelines/utils/crawler_cgu/flows.py index 01e15d5f1..ccc914cab 100644 --- a/pipelines/utils/crawler_cgu/flows.py +++ b/pipelines/utils/crawler_cgu/flows.py @@ -36,7 +36,7 @@ #### # Relative_month = 1 means that the data will be downloaded for the current month #### - relative_month = Parameter("relative_month", default=-1, required=False) + relative_month = Parameter("relative_month", default=1, required=False) materialization_mode = Parameter("materialization_mode", default="dev", required=False) materialize_after_dump = Parameter("materialize_after_dump", default=True, required=False) dbt_alias = Parameter("dbt_alias", default=True, required=False) @@ -183,7 +183,7 @@ "table_id": table_id, "mode": materialization_mode, "dbt_alias": dbt_alias, - "dbt_command": "run/test", + "dbt_command": "run", "disable_elementary": False, }, labels=current_flow_labels,
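The copy-to-deepcopy change in this last patch is worth spelling out: a shallow copy of a flow shares every mutable attribute with the original, so mutating one clone can silently affect its siblings. The toy class below imitates that failure mode; whether a given prefect.Flow attribute is actually shared this way depends on Prefect's internals.

# Toy model of the copy vs. deepcopy pitfall; Flow here is NOT prefect.Flow.
from copy import copy, deepcopy

class Flow:
    def __init__(self):
        self.tasks = []        # mutable state shared under a shallow copy
        self.schedule = None

base = Flow()
shallow = copy(base)
shallow.tasks.append("materialize")   # mutates base.tasks too!
assert base.tasks == ["materialize"]

deep = deepcopy(base)
deep.tasks.append("dump")
assert base.tasks == ["materialize"]  # base is unaffected

Plain attribute rebinding (e.g. flow.name = "...") is safe under a shallow copy; it is in-place mutation of shared objects that leaks, which is why deepcopy is the conservative default when cloning flows.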