From bbb3eaecd657837b220fd9ad4a90140e3fb7663b Mon Sep 17 00:00:00 2001 From: tricktx Date: Thu, 28 Nov 2024 09:27:31 -0300 Subject: [PATCH 1/9] add pipeline licitacao_contrato cgu --- .../br_cgu_licitacao_contrato/__init__.py | 2 + .../br_cgu_licitacao_contrato/flows.py | 86 ++++++++++ .../br_cgu_licitacao_contrato/schedules.py | 150 ++++++++++++++++++ pipelines/utils/crawler_cgu/constants.py | 63 +++++++- pipelines/utils/crawler_cgu/flows.py | 100 ++++++++++++ pipelines/utils/crawler_cgu/tasks.py | 39 +++-- pipelines/utils/crawler_cgu/utils.py | 98 +++++++++--- 7 files changed, 494 insertions(+), 44 deletions(-) create mode 100644 pipelines/datasets/br_cgu_licitacao_contrato/__init__.py create mode 100644 pipelines/datasets/br_cgu_licitacao_contrato/flows.py create mode 100644 pipelines/datasets/br_cgu_licitacao_contrato/schedules.py diff --git a/pipelines/datasets/br_cgu_licitacao_contrato/__init__.py b/pipelines/datasets/br_cgu_licitacao_contrato/__init__.py new file mode 100644 index 000000000..633f86615 --- /dev/null +++ b/pipelines/datasets/br_cgu_licitacao_contrato/__init__.py @@ -0,0 +1,2 @@ +# -*- coding: utf-8 -*- + diff --git a/pipelines/datasets/br_cgu_licitacao_contrato/flows.py b/pipelines/datasets/br_cgu_licitacao_contrato/flows.py new file mode 100644 index 000000000..d9499a9f7 --- /dev/null +++ b/pipelines/datasets/br_cgu_licitacao_contrato/flows.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +""" +Flows for br_cgu_licitacao_contrato +""" + +from copy import deepcopy, copy +from prefect.run_configs import KubernetesRun +from prefect.storage import GCS +from pipelines.utils.crawler_cgu.flows import flow_cgu_licitacao_contrato +from pipelines.constants import constants +from pipelines.datasets.br_cgu_licitacao_contrato.schedules import ( + every_day_contrato_compra, + every_day_contrato_item, + every_day_contrato_termo_aditivo, + every_day_licitacao, + every_day_licitacao_empenho, + every_day_licitacao_item, + every_day_licitacao_participante +) + +# ! 
------------------ Contrato Compra ------------------ + +br_cgu_licitacao_contrato__contrato_compra = deepcopy(flow_cgu_licitacao_contrato) +br_cgu_licitacao_contrato__contrato_compra.name = ("br_cgu_licitacao_contrato.contrato_compra") +br_cgu_licitacao_contrato__contrato_compra.code_owners = ["trick"] +br_cgu_licitacao_contrato__contrato_compra.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_licitacao_contrato__contrato_compra.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_licitacao_contrato__contrato_compra.schedule = every_day_contrato_compra + + +# ! ------------------ Contrato Item ------------------ + +br_cgu_licitacao_contrato__contrato_item = deepcopy(flow_cgu_licitacao_contrato) +br_cgu_licitacao_contrato__contrato_item.name = "br_cgu_licitacao_contrato.contrato_item" +br_cgu_licitacao_contrato__contrato_item.code_owners = ["trick"] +br_cgu_licitacao_contrato__contrato_item.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_licitacao_contrato__contrato_item.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_licitacao_contrato__contrato_item.schedule = every_day_contrato_item + +# ! ------------------ Contrato Termo Aditivo ------------------ + +br_cgu_licitacao_contrato__contrato_termo_aditivo = deepcopy(flow_cgu_licitacao_contrato) +br_cgu_licitacao_contrato__contrato_termo_aditivo.name = ("br_cgu_licitacao_contrato.contrato_termo_aditivo") +br_cgu_licitacao_contrato__contrato_termo_aditivo.code_owners = ["trick"] +br_cgu_licitacao_contrato__contrato_termo_aditivo.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_licitacao_contrato__contrato_termo_aditivo.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_licitacao_contrato__contrato_termo_aditivo.schedule = (every_day_contrato_termo_aditivo) + + +# ! 
------------------ Licitação ------------------ + +br_cgu_licitacao_contrato__licitacao = deepcopy(flow_cgu_licitacao_contrato) +br_cgu_licitacao_contrato__licitacao.name = ("br_cgu_licitacao_contrato.licitacao") +br_cgu_licitacao_contrato__licitacao.code_owners = ["trick"] +br_cgu_licitacao_contrato__licitacao.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_licitacao_contrato__licitacao.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_licitacao_contrato__licitacao.schedule = (every_day_licitacao) + + +# ! ------------------ Licitação Empenho ------------------ + +br_cgu_licitacao_contrato__licitacao_empenho = deepcopy(flow_cgu_licitacao_contrato) +br_cgu_licitacao_contrato__licitacao_empenho.name = ("br_cgu_licitacao_contrato.licitacao_empenho") +br_cgu_licitacao_contrato__licitacao_empenho.code_owners = ["trick"] +br_cgu_licitacao_contrato__licitacao_empenho.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_licitacao_contrato__licitacao_empenho.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_licitacao_contrato__licitacao_empenho.schedule = (every_day_licitacao_empenho) + +# ! ------------------ Licitação Item ------------------ + +br_cgu_licitacao_contrato__licitacao_item = deepcopy(flow_cgu_licitacao_contrato) +br_cgu_licitacao_contrato__licitacao_item.name = ("br_cgu_licitacao_contrato.licitacao_item") +br_cgu_licitacao_contrato__licitacao_item.code_owners = ["trick"] +br_cgu_licitacao_contrato__licitacao_item.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_licitacao_contrato__licitacao_item.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_licitacao_contrato__licitacao_item.schedule = (every_day_licitacao_item) + + +# ! 
------------------ Licitação Participante ------------------ + +br_cgu_licitacao_contrato__licitacao_participante = deepcopy(flow_cgu_licitacao_contrato) +br_cgu_licitacao_contrato__licitacao_participante.name = ("br_cgu_licitacao_contrato.licitacao_participante") +br_cgu_licitacao_contrato__licitacao_participante.code_owners = ["trick"] +br_cgu_licitacao_contrato__licitacao_participante.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_licitacao_contrato__licitacao_participante.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_licitacao_contrato__licitacao_participante.schedule = (every_day_licitacao_participante) \ No newline at end of file diff --git a/pipelines/datasets/br_cgu_licitacao_contrato/schedules.py b/pipelines/datasets/br_cgu_licitacao_contrato/schedules.py new file mode 100644 index 000000000..e04deb3a8 --- /dev/null +++ b/pipelines/datasets/br_cgu_licitacao_contrato/schedules.py @@ -0,0 +1,150 @@ +# -*- coding: utf-8 -*- +""" +Schedules for br_cgu_licitacao_contrato +""" +# -*- coding: utf-8 -*- +from datetime import datetime +from prefect.schedules import Schedule +from prefect.schedules.clocks import CronClock, IntervalClock +from pipelines.constants import constants +from pipelines.utils.crawler_cgu.constants import constants as constants_cgu + +every_day_contrato_compra = Schedule( + clocks=[ + CronClock( + cron="0 21 * * *", + start_date=datetime(2021, 3, 31, 17, 11), + labels=[ + constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, + ], + parameter_defaults={ + "dataset_id": "br_cgu_licitacao_contrato", + "table_id": "contrato_compra", + "materialization_mode": "prod", + "materialize_after_dump": True, + "dbt_alias": True, + "update_metadata": True, + }, + ), + ], +) + +every_day_contrato_item = Schedule( + clocks=[ + CronClock( + cron="15 20 * * *", + start_date=datetime(2021, 3, 31, 17, 11), + labels=[ + constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, + ], + parameter_defaults={ + "dataset_id": 
"br_cgu_licitacao_contrato", + "table_id": "contrato_item", + "materialization_mode": "prod", + "materialize_after_dump": True, + "dbt_alias": True, + "update_metadata": True, + }, + ), + ], +) + +every_day_contrato_termo_aditivo = Schedule( + clocks=[ + CronClock( + cron="30 20 * * *", + start_date=datetime(2021, 3, 31, 17, 11), + labels=[ + constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, + ], + parameter_defaults={ + "dataset_id": "br_cgu_licitacao_contrato", + "table_id": "contrato_termo_aditivo", + "materialization_mode": "prod", + "materialize_after_dump": True, + "dbt_alias": True, + "update_metadata": True, + }, + ), + ], +) + +every_day_licitacao = Schedule( + clocks=[ + CronClock( + cron="45 20 * * *", + start_date=datetime(2021, 3, 31, 17, 11), + labels=[ + constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, + ], + parameter_defaults={ + "dataset_id": "br_cgu_licitacao_contrato", + "table_id": "licitacao", + "materialization_mode": "prod", + "materialize_after_dump": True, + "dbt_alias": True, + "update_metadata": True, + }, + ), + ], +) + +every_day_licitacao_empenho = Schedule( + clocks=[ + CronClock( + cron="0 21 * * *", + start_date=datetime(2021, 3, 31, 17, 11), + labels=[ + constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, + ], + parameter_defaults={ + "dataset_id": "br_cgu_licitacao_contrato", + "table_id": "licitacao_empenho", + "materialization_mode": "prod", + "materialize_after_dump": True, + "dbt_alias": True, + "update_metadata": True, + }, + ), + ], +) + +every_day_licitacao_item = Schedule( + clocks=[ + CronClock( + cron="15 20 * * *", + start_date=datetime(2021, 3, 31, 17, 11), + labels=[ + constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, + ], + parameter_defaults={ + "dataset_id": "br_cgu_licitacao_contrato", + "table_id": "licitacao_item", + "materialization_mode": "prod", + "materialize_after_dump": True, + "dbt_alias": True, + "update_metadata": True, + }, + ), + ], +) + +every_day_licitacao_participante = Schedule( + clocks=[ + CronClock( + 
cron="30 20 * * *", + start_date=datetime(2021, 3, 31, 17, 11), + labels=[ + constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, + ], + parameter_defaults={ + "dataset_id": "br_cgu_licitacao_contrato", + "table_id": "licitacao_participante", + "materialization_mode": "prod", + "materialize_after_dump": True, + "dbt_alias": True, + "update_metadata": True, + }, + ), + ], +) diff --git a/pipelines/utils/crawler_cgu/constants.py b/pipelines/utils/crawler_cgu/constants.py index 4133f1657..ab32534c9 100644 --- a/pipelines/utils/crawler_cgu/constants.py +++ b/pipelines/utils/crawler_cgu/constants.py @@ -6,34 +6,36 @@ from enum import Enum class constants(Enum): # pylint: disable=c0103 + + # ! ================================ CGU - Cartão de Pagamento =========================================== """ Constant values for the br_cgu_cartao_pagamento project """ TABELA = { "microdados_governo_federal" : { - "INPUT_DATA" : "/tmp/input/microdados_governo_federal", - "OUTPUT_DATA" : "/tmp/output/microdados_governo_federal", + "INPUT" : "/tmp/input/microdados_governo_federal", + "OUTPUT" : "/tmp/output/microdados_governo_federal", "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpgf/", "READ" : "_CPGF", "ONLY_ONE_FILE" : False}, "microdados_compras_centralizadas" : { - "INPUT_DATA" : "/tmp/input/microdados_compras_centralizadas", - "OUTPUT_DATA" : "/tmp/output/microdados_compras_centralizadas", + "INPUT" : "/tmp/input/microdados_compras_centralizadas", + "OUTPUT" : "/tmp/output/microdados_compras_centralizadas", "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpcc/", "READ" : "_CPGFComprasCentralizadas", "ONLY_ONE_FILE" : False}, "microdados_defesa_civil" : { - "INPUT_DATA" : "/tmp/input/microdados_defesa_civil", - "OUTPUT_DATA" : "/tmp/output/microdados_defesa_civil", + "INPUT" : "/tmp/input/microdados_defesa_civil", + "OUTPUT" : "/tmp/output/microdados_defesa_civil", "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpdc/", "READ" : 
"_CPDC", "ONLY_ONE_FILE" : False} } - # ! ============================================== CGU - Servidores Públicos do Executivo Federal ============================================== + # ! ================================ CGU - Servidores Públicos do Executivo Federal =========================================== URL_SERVIDORES = "http://portaldatransparencia.gov.br/download-de-dados/servidores/" TABELA_SERVIDORES = { @@ -126,3 +128,50 @@ class constants(Enum): # pylint: disable=c0103 "OUTPUT": "/tmp/output/cgu_servidores/cadastro_servidores", }, } + + # ! ================================ CGU - Licitação e Contrato =========================================== + + TABELA_LICITACAO_CONTRATO = { + "licitacao": { + "URL": "https://portaldatransparencia.gov.br/download-de-dados/licitacoes/", + "INPUT": "/tmp/input/cgu_licitacao_contrato/licitacao", + "OUTPUT": "/tmp/output/cgu_licitacao_contrato/licitacao", + "READ": "_Licitação.csv", + }, + "licitacao_participante": { + "URL": "https://portaldatransparencia.gov.br/download-de-dados/licitacoes/", + "INPUT": "/tmp/input/cgu_licitacao_contrato/licitacao_participante", + "OUTPUT": "/tmp/output/cgu_licitacao_contrato/licitacao_participante", + "READ": "_ParticipantesLicitação.csv", + }, + "licitacao_item": { + "URL": "https://portaldatransparencia.gov.br/download-de-dados/licitacoes/", + "INPUT": "/tmp/input/cgu_licitacao_contrato/licitacao_item", + "OUTPUT": "/tmp/output/cgu_licitacao_contrato/licitacao_item", + "READ": "_ItemLicitação.csv", + }, + "licitacao_empenho": { + "URL": "https://portaldatransparencia.gov.br/download-de-dados/licitacoes/", + "INPUT": "/tmp/input/cgu_licitacao_contrato/licitacao_empenho", + "OUTPUT": "/tmp/output/cgu_licitacao_contrato/licitacao_empenho", + "READ": "_EmpenhosRelacionados.csv", + }, + "contrato_compra": { + "URL": "https://portaldatransparencia.gov.br/download-de-dados/compras/", + "INPUT": "/tmp/input/cgu_licitacao_contrato/contrato_compra", + "OUTPUT": 
"/tmp/output/cgu_licitacao_contrato/contrato_compra", + "READ": "_Compras.csv", + }, + "contrato_item": { + "URL": "https://portaldatransparencia.gov.br/download-de-dados/compras/", + "INPUT": "/tmp/input/cgu_licitacao_contrato/contrato_item", + "OUTPUT": "/tmp/output/cgu_licitacao_contrato/contrato_item", + "READ": "_ItemCompra.csv", + }, + "contrato_termo_aditivo": { + "URL": "https://portaldatransparencia.gov.br/download-de-dados/compras/", + "INPUT": "/tmp/input/cgu_licitacao_contrato/contrato_termo_aditivo", + "OUTPUT": "/tmp/output/cgu_licitacao_contrato/contrato_termo_aditivo", + "READ": "_TermoAditivo.csv", + }, + } diff --git a/pipelines/utils/crawler_cgu/flows.py b/pipelines/utils/crawler_cgu/flows.py index ccc914cab..8f51dac5a 100644 --- a/pipelines/utils/crawler_cgu/flows.py +++ b/pipelines/utils/crawler_cgu/flows.py @@ -218,3 +218,103 @@ ) flow_cgu_servidores_publicos.storage = GCS(constants.GCS_FLOWS_BUCKET.value) flow_cgu_servidores_publicos.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) + + +# ! 
============================================== CGU - Licitacao e Contrato ============================================== + +with Flow(name="CGU - Licitacao e Contrato") as flow_cgu_licitacao_contrato: + + dataset_id = Parameter( + "dataset_id", default="br_cgu_licitacao_contrato", required=True + ) + table_id = Parameter( + "table_id", required=True + ) + #### + # Relative_month = 1 means that the data will be downloaded for the current month + #### + relative_month = Parameter("relative_month", default=1, required=False) + materialization_mode = Parameter("materialization_mode", default="dev", required=False) + materialize_after_dump = Parameter("materialize_after_dump", default=True, required=False) + dbt_alias = Parameter("dbt_alias", default=True, required=False) + update_metadata = Parameter("update_metadata", default=False, required=False) + rename_flow_run = rename_current_flow_run_dataset_table(prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id) + + data_source_max_date = get_current_date_and_download_file( + table_id=table_id, + dataset_id=dataset_id, + relative_month=relative_month, + ) + + dados_desatualizados = check_if_data_is_outdated( + dataset_id=dataset_id, + table_id=table_id, + data_source_max_date=data_source_max_date, + date_format="%Y-%m", + upstream_tasks=[data_source_max_date], + ) + + with case(dados_desatualizados, True): + + filepath = partition_data( + table_id=table_id, + dataset_id=dataset_id, + upstream_tasks=[data_source_max_date], + ) + + wait_upload_table = create_table_and_upload_to_gcs( + data_path=filepath, + dataset_id=dataset_id, + table_id=table_id, + dump_mode="append", + wait=filepath, + upstream_tasks=[filepath], + ) + + with case(materialize_after_dump, True): + + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": 
dataset_id, + "table_id": table_id, + "mode": materialization_mode, + "dbt_alias": dbt_alias, + "dbt_command": "run", + "disable_elementary": False, + }, + labels=current_flow_labels, + run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks=[wait_upload_table], + ) + + wait_for_materialization = wait_for_flow_run( + materialization_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + upstream_tasks=[materialization_flow], + ) + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_materialization.retry_delay = timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + with case(update_metadata, True): + update_django_metadata( + dataset_id=dataset_id, + table_id=table_id, + date_column_name={"year": "ano_extrato", "month": "mes_extrato"}, + date_format="%Y-%m", + coverage_type="part_bdpro", + time_delta={"months": 6}, + prefect_mode=materialization_mode, + bq_project="basedosdados", + upstream_tasks=[wait_for_materialization], + ) + +flow_cgu_licitacao_contrato.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +flow_cgu_licitacao_contrato.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) diff --git a/pipelines/utils/crawler_cgu/tasks.py b/pipelines/utils/crawler_cgu/tasks.py index 9fa025fe0..89a424fdd 100644 --- a/pipelines/utils/crawler_cgu/tasks.py +++ b/pipelines/utils/crawler_cgu/tasks.py @@ -38,18 +38,31 @@ def partition_data(table_id: str, dataset_id : str) -> str: str: The path where the partitioned data is saved. 
""" - if dataset_id == "br_cgu_cartao_pagamento": + if dataset_id in ["br_cgu_cartao_pagamento", "br_cgu_licitacao_contrato"]: log("---------------------------- Read data ----------------------------") - df = read_csv(table_id = table_id, - url = constants.TABELA.value[table_id]['URL']) - log(" ---------------------------- Partiting data -----------------------") - to_partitions( - data = df, - partition_columns=['ANO_EXTRATO', 'MES_EXTRATO'], - savepath = constants.TABELA.value[table_id]['OUTPUT_DATA'], - file_type='csv') - log("---------------------------- Data partitioned ----------------------") - return constants.TABELA.value[table_id]['OUTPUT_DATA'] + df = read_csv(dataset_id = dataset_id, table_id = table_id) + log(df.head()) + if dataset_id == "br_cgu_cartao_pagamento:": + log(" ---------------------------- Partiting data -----------------------") + to_partitions( + data = df, + partition_columns=['ANO_EXTRATO', 'MES_EXTRATO'], + savepath = constants.TABELA.value[table_id]['OUTPUT'], + file_type='csv') + + log("---------------------------- Data partitioned ----------------------") + return constants.TABELA.value[table_id]['OUTPUT'] + + if dataset_id == "br_cgu_licitacao_contrato": + log(" ---------------------------- Partiting data -----------------------") + to_partitions( + data=df, + partition_columns=["ano", "mes"], + savepath=constants.TABELA_LICITACAO_CONTRATO.value[table_id]["OUTPUT"], + file_type="csv", + ) + log("---------------------------- Data partitioned ----------------------") + return constants.TABELA_LICITACAO_CONTRATO.value[table_id]["OUTPUT"] elif dataset_id == "br_cgu_servidores_executivo_federal": @@ -84,7 +97,9 @@ def get_current_date_and_download_file(table_id : str, table_id = table_id, relative_month = relative_month ) - + log(f"Last date in API: {last_date_in_api}") + log(f"Next date in API: {next_date_in_api}") + max_date = str(download_file(table_id = table_id, dataset_id = dataset_id, year = next_date_in_api.year, diff --git 
a/pipelines/utils/crawler_cgu/utils.py b/pipelines/utils/crawler_cgu/utils.py
index 001c13c5a..f76b53121 100644
--- a/pipelines/utils/crawler_cgu/utils.py
+++ b/pipelines/utils/crawler_cgu/utils.py
@@ -7,8 +7,7 @@
 from dateutil.relativedelta import relativedelta
 import gc
 import shutil
-import io
-import zipfile
+from rapidfuzz import process
 import pandas as pd
 import os
 import unidecode
@@ -52,7 +51,7 @@ def build_urls(dataset_id: str, url: str, year: int, month: int, table_id: str)
 
     log(f"{dataset_id=}")
 
-    if dataset_id == "br_cgu_cartao_pagamento":
+    if dataset_id in ["br_cgu_cartao_pagamento", "br_cgu_licitacao_contrato"]:
         log(f"{url}{year}{str(month).zfill(2)}/")
         return f"{url}{year}{str(month).zfill(2)}/"
 
@@ -94,6 +93,19 @@ def build_input(table_id):
 
     return list_input
 
+# municipio = bd.read_table(
+#     "br_bd_diretorios_brasil", "municipio", billing_project_id="basedosdados"
+# )
+
+
+# def get_similar_cities_process(city):
+#     municipio["cidade_uf"] = (
+#         municipio["nome"].apply(lambda x: x.upper()) + "-" + municipio["sigla_uf"]
+#     )
+#     results = process.extractOne(city, municipio["cidade_uf"], score_cutoff=70)
+#     return results[0] if results else None
+
+
 def download_file(dataset_id: str, table_id: str, year: int, month: int, relative_month=int) -> None:
     """
     Downloads and unzips a file from a specified URL based on the given table ID, year, and month.
@@ -110,10 +122,13 @@ def download_file(dataset_id: str, table_id: str, year: int, month: int, relativ
 
         str: The last date in the API if the URL is not found.
     """
 
-    if dataset_id == "br_cgu_cartao_pagamento":
-        value_constants = constants.TABELA.value[table_id] # ! CGU - Cartão de Pagamento
+    if dataset_id in ["br_cgu_cartao_pagamento", "br_cgu_licitacao_contrato"]:
+        if dataset_id == "br_cgu_cartao_pagamento":
+            value_constants = constants.TABELA.value[table_id] # ! 
CGU - Cartão de Pagamento + elif dataset_id == "br_cgu_licitacao_contrato": + value_constants = constants.TABELA_LICITACAO_CONTRATO.value[table_id] - input = value_constants["INPUT_DATA"] + input = value_constants["INPUT"] if not os.path.exists(input): os.makedirs(input) @@ -129,10 +144,10 @@ def download_file(dataset_id: str, table_id: str, year: int, month: int, relativ status = requests.get(url).status_code == 200 if status: log(f'------------------ URL = {url} ------------------') - download_and_unzip_file(url, value_constants['INPUT_DATA']) + download_and_unzip_file(url, value_constants['INPUT']) last_date_in_api, next_date_in_api = last_date_in_metadata( - dataset_id="br_cgu_cartao_pagamento", + dataset_id=dataset_id, table_id=table_id, relative_month=relative_month ) @@ -144,7 +159,7 @@ def download_file(dataset_id: str, table_id: str, year: int, month: int, relativ log(f'------------------ URL = {url} ------------------') last_date_in_api, next_date_in_api = last_date_in_metadata( - dataset_id="br_cgu_cartao_pagamento", + dataset_id=dataset_id, table_id=table_id, relative_month=relative_month ) @@ -178,8 +193,9 @@ def download_file(dataset_id: str, table_id: str, year: int, month: int, relativ return next_date_in_api + def read_csv( - table_id: str, column_replace: List = ["VALOR_TRANSACAO"] + dataset_id : str, table_id: str, column_replace: List = ["VALOR_TRANSACAO"] ) -> pd.DataFrame: """ Reads a CSV file from a specified path and processes its columns. @@ -198,28 +214,60 @@ def read_csv( - Column names are converted to uppercase, spaces are replaced with underscores, and accents are removed. - For columns specified in `column_replace`, commas in their values are replaced with dots and the values are converted to float. 
""" - value_constants = constants.TABELA_SERVIDORES.value[table_id] - - os.listdir(value_constants["INPUT"]) - csv_file = [ - f for f in os.listdir(value_constants["INPUT_DATA"]) if f.endswith(".csv") - ][0] - log(f"CSV files: {csv_file}") + if dataset_id == "br_cgu_cartao_pagamento": + value_constants = constants.TABELA.value[table_id] - df = pd.read_csv( - f"{value_constants['INPUT_DATA']}/{csv_file}", sep=";", encoding="latin1" - ) + os.listdir(value_constants["INPUT"]) - df.columns = [unidecode.unidecode(x).upper().replace(" ", "_") for x in df.columns] + csv_file = [ + f for f in os.listdir(value_constants["INPUT"]) if f.endswith(".csv") + ][0] + log(f"CSV files: {csv_file}") - for list_column_replace in column_replace: - df[list_column_replace] = ( - df[list_column_replace].str.replace(",", ".").astype(float) + df = pd.read_csv( + f"{value_constants['INPUT']}/{csv_file}", sep=";", encoding="latin1" ) - return df + df.columns = [unidecode.unidecode(x).upper().replace(" ", "_") for x in df.columns] + + for list_column_replace in column_replace: + df[list_column_replace] = ( + df[list_column_replace].str.replace(",", ".").astype(float) + ) + + return df + + if dataset_id == "br_cgu_licitacao_contrato": + constants_cgu_licitacao_contrato = constants.TABELA_LICITACAO_CONTRATO.value[table_id] + print(os.listdir(constants_cgu_licitacao_contrato["INPUT"])) + csv_file = [ + f + for f in os.listdir(constants_cgu_licitacao_contrato["INPUT"]) + if f.endswith(constants_cgu_licitacao_contrato["READ"]) + ][0] + log(f"CSV files: {csv_file}") + log(f"{constants_cgu_licitacao_contrato['INPUT']}/{csv_file}") + df = pd.read_csv(f"{constants_cgu_licitacao_contrato['INPUT']}/{csv_file}", sep=";", encoding="latin1") + df['ano'] = csv_file[:4] + df['mes'] = csv_file[4:6] + + df.columns = [unidecode.unidecode(col) for col in df.columns] + df.columns = [col.replace(" ", "_").lower() for col in df.columns] + + # if table_id == "licitacao": + # df["cidade_uf"] = df["municipio"] + "-" + 
df["uf"] + + # df["cidade_uf_dir"] = df["cidade_uf"].apply( + # lambda x: get_similar_cities_process(x) + # ) + # df.drop(["cidade_uf", "municipio"], axis=1, inplace=True) + + # df.rename(columns={"cidade_uf_dir": "municipio"}, inplace=True) + + # df["municipio"] = df["municipio"].apply(lambda x: x if x == None else x.split("-")[0]) + return df def last_date_in_metadata( dataset_id: str, table_id: str, relative_month From d7d7ab835564a7dfb19575fd8092da4538ef761a Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 28 Nov 2024 12:31:01 +0000 Subject: [PATCH 2/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pipelines/datasets/br_cgu_licitacao_contrato/__init__.py | 2 -- pipelines/utils/crawler_cgu/tasks.py | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/pipelines/datasets/br_cgu_licitacao_contrato/__init__.py b/pipelines/datasets/br_cgu_licitacao_contrato/__init__.py index 633f86615..e69de29bb 100644 --- a/pipelines/datasets/br_cgu_licitacao_contrato/__init__.py +++ b/pipelines/datasets/br_cgu_licitacao_contrato/__init__.py @@ -1,2 +0,0 @@ -# -*- coding: utf-8 -*- - diff --git a/pipelines/utils/crawler_cgu/tasks.py b/pipelines/utils/crawler_cgu/tasks.py index 89a424fdd..22908242a 100644 --- a/pipelines/utils/crawler_cgu/tasks.py +++ b/pipelines/utils/crawler_cgu/tasks.py @@ -99,7 +99,7 @@ def get_current_date_and_download_file(table_id : str, ) log(f"Last date in API: {last_date_in_api}") log(f"Next date in API: {next_date_in_api}") - + max_date = str(download_file(table_id = table_id, dataset_id = dataset_id, year = next_date_in_api.year, From 8a5e9accc8fa9490bc5ca7d6ed13bd48178abd6d Mon Sep 17 00:00:00 2001 From: tricktx Date: Thu, 28 Nov 2024 10:15:16 -0300 Subject: [PATCH 3/9] register flow 1 --- pipelines/datasets/br_cgu_licitacao_contrato/flows.py | 4 ++-- pipelines/utils/crawler_cgu/flows.py | 4 ++-- 2 files changed, 
4 insertions(+), 4 deletions(-) diff --git a/pipelines/datasets/br_cgu_licitacao_contrato/flows.py b/pipelines/datasets/br_cgu_licitacao_contrato/flows.py index d9499a9f7..a410a9bb4 100644 --- a/pipelines/datasets/br_cgu_licitacao_contrato/flows.py +++ b/pipelines/datasets/br_cgu_licitacao_contrato/flows.py @@ -18,7 +18,7 @@ every_day_licitacao_participante ) -# ! ------------------ Contrato Compra ------------------ +# ! ------------------ Contrato Compra -------------------- br_cgu_licitacao_contrato__contrato_compra = deepcopy(flow_cgu_licitacao_contrato) br_cgu_licitacao_contrato__contrato_compra.name = ("br_cgu_licitacao_contrato.contrato_compra") @@ -28,7 +28,7 @@ br_cgu_licitacao_contrato__contrato_compra.schedule = every_day_contrato_compra -# ! ------------------ Contrato Item ------------------ +# ! ------------------ Contrato Item -------------------- br_cgu_licitacao_contrato__contrato_item = deepcopy(flow_cgu_licitacao_contrato) br_cgu_licitacao_contrato__contrato_item.name = "br_cgu_licitacao_contrato.contrato_item" diff --git a/pipelines/utils/crawler_cgu/flows.py b/pipelines/utils/crawler_cgu/flows.py index 8f51dac5a..27891e816 100644 --- a/pipelines/utils/crawler_cgu/flows.py +++ b/pipelines/utils/crawler_cgu/flows.py @@ -220,9 +220,9 @@ flow_cgu_servidores_publicos.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) -# ! ============================================== CGU - Licitacao e Contrato ============================================== +# ! 
================================== CGU - Licitacao e Contrato ===================================== -with Flow(name="CGU - Licitacao e Contrato") as flow_cgu_licitacao_contrato: +with Flow(name="CGU - Licitacão e Contrato") as flow_cgu_licitacao_contrato: dataset_id = Parameter( "dataset_id", default="br_cgu_licitacao_contrato", required=True From a4106e541c734fbefab0fc1f638ce7b1b5cae32a Mon Sep 17 00:00:00 2001 From: tricktx Date: Thu, 28 Nov 2024 10:48:52 -0300 Subject: [PATCH 4/9] add model in init --- pipelines/datasets/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pipelines/datasets/__init__.py b/pipelines/datasets/__init__.py index c0ef34e76..b92a698a0 100644 --- a/pipelines/datasets/__init__.py +++ b/pipelines/datasets/__init__.py @@ -61,3 +61,4 @@ from pipelines.datasets.br_ms_sinan.flows import * from pipelines.datasets.br_cgu_emendas_parlamentares.flows import * from pipelines.datasets.br_cgu_cartao_pagamento.flows import * +from pipelines.datasets.br_cgu_licitacao_contrato.flows import * From 36a5ab6a01fbcbe128de100c80882f1513aa828e Mon Sep 17 00:00:00 2001 From: tricktx Date: Fri, 29 Nov 2024 10:24:02 -0300 Subject: [PATCH 5/9] add cache for loading cities and refactor city similarity function --- pipelines/utils/crawler_cgu/utils.py | 49 ++++++++++++++++------------ 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/pipelines/utils/crawler_cgu/utils.py b/pipelines/utils/crawler_cgu/utils.py index f76b53121..6b4f4e3cd 100644 --- a/pipelines/utils/crawler_cgu/utils.py +++ b/pipelines/utils/crawler_cgu/utils.py @@ -7,6 +7,7 @@ from dateutil.relativedelta import relativedelta import gc import shutil +from functools import lru_cache from rapidfuzz import process import pandas as pd import os @@ -93,19 +94,6 @@ def build_input(table_id): return list_input -# municipio = bd.read_table( -# "br_bd_diretorios_brasil", "municipio", billing_project_id="basedosdados" -# ) - - -# def get_similar_cities_process(city): -# 
municipio["cidade_uf"] = ( -# municipio["nome"].apply(lambda x: x.upper()) + "-" + municipio["sigla_uf"] -# ) -# results = process.extractOne(city, municipio["cidade_uf"], score_cutoff=70) -# return results[0] if results else None - - def download_file(dataset_id: str, table_id: str, year: int, month: int, relative_month=int) -> None: """ Downloads and unzips a file from a specified URL based on the given table ID, year, and month. @@ -194,6 +182,25 @@ def download_file(dataset_id: str, table_id: str, year: int, month: int, relativ return next_date_in_api +# Função para carregar o dataframe +@lru_cache(maxsize=1) # Cache para evitar recarregar a tabela +def load_municipio() -> None: + municipio : pd.DataFrame = bd.read_table( + "br_bd_diretorios_brasil", "municipio", billing_project_id="basedosdados" + ) + municipio["cidade_uf"] = ( + municipio["nome"].apply(lambda x: x.upper()) + "-" + municipio["sigla_uf"] + ) + return municipio + + + +def get_similar_cities_process(city): + municipio = load_municipio() + results = process.extractOne(city, municipio["cidade_uf"], score_cutoff=70) + return results[0] if results else None + + def read_csv( dataset_id : str, table_id: str, column_replace: List = ["VALOR_TRANSACAO"] ) -> pd.DataFrame: @@ -255,17 +262,17 @@ def read_csv( df.columns = [unidecode.unidecode(col) for col in df.columns] df.columns = [col.replace(" ", "_").lower() for col in df.columns] - # if table_id == "licitacao": - # df["cidade_uf"] = df["municipio"] + "-" + df["uf"] + if table_id == "licitacao": + df["cidade_uf"] = df["municipio"] + "-" + df["uf"] - # df["cidade_uf_dir"] = df["cidade_uf"].apply( - # lambda x: get_similar_cities_process(x) - # ) - # df.drop(["cidade_uf", "municipio"], axis=1, inplace=True) + df["cidade_uf_dir"] = df["cidade_uf"].apply( + lambda x: get_similar_cities_process(x) + ) + df.drop(["cidade_uf", "municipio"], axis=1, inplace=True) - # df.rename(columns={"cidade_uf_dir": "municipio"}, inplace=True) + 
df.rename(columns={"cidade_uf_dir": "municipio"}, inplace=True) - # df["municipio"] = df["municipio"].apply(lambda x: x if x == None else x.split("-")[0]) + df["municipio"] = df["municipio"].apply(lambda x: x if x == None else x.split("-")[0]) return df From 306ce0e24eeec2fd6d1c5ea30c5cb8b4707c64b6 Mon Sep 17 00:00:00 2001 From: tricktx Date: Fri, 29 Nov 2024 11:16:29 -0300 Subject: [PATCH 6/9] basedosdados to basedosdados-dev --- pipelines/utils/crawler_cgu/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/utils/crawler_cgu/utils.py b/pipelines/utils/crawler_cgu/utils.py index 6b4f4e3cd..4360245c4 100644 --- a/pipelines/utils/crawler_cgu/utils.py +++ b/pipelines/utils/crawler_cgu/utils.py @@ -186,7 +186,7 @@ def download_file(dataset_id: str, table_id: str, year: int, month: int, relativ @lru_cache(maxsize=1) # Cache para evitar recarregar a tabela def load_municipio() -> None: municipio : pd.DataFrame = bd.read_table( - "br_bd_diretorios_brasil", "municipio", billing_project_id="basedosdados" + "br_bd_diretorios_brasil", "municipio", billing_project_id="basedosdados-dev" ) municipio["cidade_uf"] = ( municipio["nome"].apply(lambda x: x.upper()) + "-" + municipio["sigla_uf"] From 7147b8dc1c082e7a60c6be7ab66808295c267211 Mon Sep 17 00:00:00 2001 From: tricktx Date: Fri, 29 Nov 2024 12:07:59 -0300 Subject: [PATCH 7/9] add from_file in load_municipio --- pipelines/utils/crawler_cgu/utils.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pipelines/utils/crawler_cgu/utils.py b/pipelines/utils/crawler_cgu/utils.py index 4360245c4..e1c418c3f 100644 --- a/pipelines/utils/crawler_cgu/utils.py +++ b/pipelines/utils/crawler_cgu/utils.py @@ -185,8 +185,11 @@ def download_file(dataset_id: str, table_id: str, year: int, month: int, relativ # Função para carregar o dataframe @lru_cache(maxsize=1) # Cache para evitar recarregar a tabela def load_municipio() -> None: - municipio : pd.DataFrame = bd.read_table( - 
"br_bd_diretorios_brasil", "municipio", billing_project_id="basedosdados-dev" + municipio: pd.DataFrame = bd.read_table( + "br_bd_diretorios_brasil", + "municipio", + billing_project_id="basedosdados-dev", + from_file=True, ) municipio["cidade_uf"] = ( municipio["nome"].apply(lambda x: x.upper()) + "-" + municipio["sigla_uf"] @@ -194,7 +197,6 @@ def load_municipio() -> None: return municipio - def get_similar_cities_process(city): municipio = load_municipio() results = process.extractOne(city, municipio["cidade_uf"], score_cutoff=70) From 537f7961c6dda9f99ef77724d3deff28963ec195 Mon Sep 17 00:00:00 2001 From: tricktx Date: Thu, 5 Dec 2024 14:46:58 -0300 Subject: [PATCH 8/9] fix billing project id --- pipelines/utils/crawler_cgu/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/utils/crawler_cgu/utils.py b/pipelines/utils/crawler_cgu/utils.py index e1c418c3f..f52465b42 100644 --- a/pipelines/utils/crawler_cgu/utils.py +++ b/pipelines/utils/crawler_cgu/utils.py @@ -188,7 +188,7 @@ def load_municipio() -> None: municipio: pd.DataFrame = bd.read_table( "br_bd_diretorios_brasil", "municipio", - billing_project_id="basedosdados-dev", + billing_project_id="basedosdados", from_file=True, ) municipio["cidade_uf"] = ( From e922171016f0fd1c0e32d974745907abe4218843 Mon Sep 17 00:00:00 2001 From: tricktx Date: Mon, 9 Dec 2024 16:05:58 -0300 Subject: [PATCH 9/9] fix date column --- pipelines/utils/crawler_cgu/flows.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pipelines/utils/crawler_cgu/flows.py b/pipelines/utils/crawler_cgu/flows.py index 27891e816..e7d9c426e 100644 --- a/pipelines/utils/crawler_cgu/flows.py +++ b/pipelines/utils/crawler_cgu/flows.py @@ -307,7 +307,7 @@ update_django_metadata( dataset_id=dataset_id, table_id=table_id, - date_column_name={"year": "ano_extrato", "month": "mes_extrato"}, + date_column_name={"year": "ano", "month": "mes"}, date_format="%Y-%m", coverage_type="part_bdpro", 
time_delta={"months": 6}, @@ -315,6 +315,5 @@ bq_project="basedosdados", upstream_tasks=[wait_for_materialization], ) - flow_cgu_licitacao_contrato.storage = GCS(constants.GCS_FLOWS_BUCKET.value) flow_cgu_licitacao_contrato.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)