diff --git a/pipelines/datasets/__init__.py b/pipelines/datasets/__init__.py index c0ef34e76..b92a698a0 100644 --- a/pipelines/datasets/__init__.py +++ b/pipelines/datasets/__init__.py @@ -61,3 +61,4 @@ from pipelines.datasets.br_ms_sinan.flows import * from pipelines.datasets.br_cgu_emendas_parlamentares.flows import * from pipelines.datasets.br_cgu_cartao_pagamento.flows import * +from pipelines.datasets.br_cgu_licitacao_contrato.flows import * diff --git a/pipelines/datasets/br_cgu_licitacao_contrato/__init__.py b/pipelines/datasets/br_cgu_licitacao_contrato/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/pipelines/datasets/br_cgu_licitacao_contrato/flows.py b/pipelines/datasets/br_cgu_licitacao_contrato/flows.py new file mode 100644 index 000000000..a410a9bb4 --- /dev/null +++ b/pipelines/datasets/br_cgu_licitacao_contrato/flows.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +""" +Flows for br_cgu_licitacao_contrato +""" + +from copy import deepcopy, copy +from prefect.run_configs import KubernetesRun +from prefect.storage import GCS +from pipelines.utils.crawler_cgu.flows import flow_cgu_licitacao_contrato +from pipelines.constants import constants +from pipelines.datasets.br_cgu_licitacao_contrato.schedules import ( + every_day_contrato_compra, + every_day_contrato_item, + every_day_contrato_termo_aditivo, + every_day_licitacao, + every_day_licitacao_empenho, + every_day_licitacao_item, + every_day_licitacao_participante +) + +# ! 
------------------ Contrato Compra -------------------- + +br_cgu_licitacao_contrato__contrato_compra = deepcopy(flow_cgu_licitacao_contrato) +br_cgu_licitacao_contrato__contrato_compra.name = ("br_cgu_licitacao_contrato.contrato_compra") +br_cgu_licitacao_contrato__contrato_compra.code_owners = ["trick"] +br_cgu_licitacao_contrato__contrato_compra.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_licitacao_contrato__contrato_compra.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_licitacao_contrato__contrato_compra.schedule = every_day_contrato_compra + + +# ! ------------------ Contrato Item -------------------- + +br_cgu_licitacao_contrato__contrato_item = deepcopy(flow_cgu_licitacao_contrato) +br_cgu_licitacao_contrato__contrato_item.name = "br_cgu_licitacao_contrato.contrato_item" +br_cgu_licitacao_contrato__contrato_item.code_owners = ["trick"] +br_cgu_licitacao_contrato__contrato_item.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_licitacao_contrato__contrato_item.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_licitacao_contrato__contrato_item.schedule = every_day_contrato_item + +# ! ------------------ Contrato Termo Aditivo ------------------ + +br_cgu_licitacao_contrato__contrato_termo_aditivo = deepcopy(flow_cgu_licitacao_contrato) +br_cgu_licitacao_contrato__contrato_termo_aditivo.name = ("br_cgu_licitacao_contrato.contrato_termo_aditivo") +br_cgu_licitacao_contrato__contrato_termo_aditivo.code_owners = ["trick"] +br_cgu_licitacao_contrato__contrato_termo_aditivo.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_licitacao_contrato__contrato_termo_aditivo.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_licitacao_contrato__contrato_termo_aditivo.schedule = (every_day_contrato_termo_aditivo) + + +# ! 
------------------ Licitação ------------------ + +br_cgu_licitacao_contrato__licitacao = deepcopy(flow_cgu_licitacao_contrato) +br_cgu_licitacao_contrato__licitacao.name = ("br_cgu_licitacao_contrato.licitacao") +br_cgu_licitacao_contrato__licitacao.code_owners = ["trick"] +br_cgu_licitacao_contrato__licitacao.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_licitacao_contrato__licitacao.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_licitacao_contrato__licitacao.schedule = (every_day_licitacao) + + +# ! ------------------ Licitação Empenho ------------------ + +br_cgu_licitacao_contrato__licitacao_empenho = deepcopy(flow_cgu_licitacao_contrato) +br_cgu_licitacao_contrato__licitacao_empenho.name = ("br_cgu_licitacao_contrato.licitacao_empenho") +br_cgu_licitacao_contrato__licitacao_empenho.code_owners = ["trick"] +br_cgu_licitacao_contrato__licitacao_empenho.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_licitacao_contrato__licitacao_empenho.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_licitacao_contrato__licitacao_empenho.schedule = (every_day_licitacao_empenho) + +# ! ------------------ Licitação Item ------------------ + +br_cgu_licitacao_contrato__licitacao_item = deepcopy(flow_cgu_licitacao_contrato) +br_cgu_licitacao_contrato__licitacao_item.name = ("br_cgu_licitacao_contrato.licitacao_item") +br_cgu_licitacao_contrato__licitacao_item.code_owners = ["trick"] +br_cgu_licitacao_contrato__licitacao_item.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_licitacao_contrato__licitacao_item.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_licitacao_contrato__licitacao_item.schedule = (every_day_licitacao_item) + + +# ! 
------------------ Licitação Participante ------------------ + +br_cgu_licitacao_contrato__licitacao_participante = deepcopy(flow_cgu_licitacao_contrato) +br_cgu_licitacao_contrato__licitacao_participante.name = ("br_cgu_licitacao_contrato.licitacao_participante") +br_cgu_licitacao_contrato__licitacao_participante.code_owners = ["trick"] +br_cgu_licitacao_contrato__licitacao_participante.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_licitacao_contrato__licitacao_participante.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_licitacao_contrato__licitacao_participante.schedule = (every_day_licitacao_participante) \ No newline at end of file diff --git a/pipelines/datasets/br_cgu_licitacao_contrato/schedules.py b/pipelines/datasets/br_cgu_licitacao_contrato/schedules.py new file mode 100644 index 000000000..e04deb3a8 --- /dev/null +++ b/pipelines/datasets/br_cgu_licitacao_contrato/schedules.py @@ -0,0 +1,150 @@ +# -*- coding: utf-8 -*- +""" +Schedules for br_cgu_licitacao_contrato +""" +# -*- coding: utf-8 -*- +from datetime import datetime +from prefect.schedules import Schedule +from prefect.schedules.clocks import CronClock, IntervalClock +from pipelines.constants import constants +from pipelines.utils.crawler_cgu.constants import constants as constants_cgu + +every_day_contrato_compra = Schedule( + clocks=[ + CronClock( + cron="0 21 * * *", + start_date=datetime(2021, 3, 31, 17, 11), + labels=[ + constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, + ], + parameter_defaults={ + "dataset_id": "br_cgu_licitacao_contrato", + "table_id": "contrato_compra", + "materialization_mode": "prod", + "materialize_after_dump": True, + "dbt_alias": True, + "update_metadata": True, + }, + ), + ], +) + +every_day_contrato_item = Schedule( + clocks=[ + CronClock( + cron="15 20 * * *", + start_date=datetime(2021, 3, 31, 17, 11), + labels=[ + constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, + ], + parameter_defaults={ + "dataset_id": 
"br_cgu_licitacao_contrato", + "table_id": "contrato_item", + "materialization_mode": "prod", + "materialize_after_dump": True, + "dbt_alias": True, + "update_metadata": True, + }, + ), + ], +) + +every_day_contrato_termo_aditivo = Schedule( + clocks=[ + CronClock( + cron="30 20 * * *", + start_date=datetime(2021, 3, 31, 17, 11), + labels=[ + constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, + ], + parameter_defaults={ + "dataset_id": "br_cgu_licitacao_contrato", + "table_id": "contrato_termo_aditivo", + "materialization_mode": "prod", + "materialize_after_dump": True, + "dbt_alias": True, + "update_metadata": True, + }, + ), + ], +) + +every_day_licitacao = Schedule( + clocks=[ + CronClock( + cron="45 20 * * *", + start_date=datetime(2021, 3, 31, 17, 11), + labels=[ + constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, + ], + parameter_defaults={ + "dataset_id": "br_cgu_licitacao_contrato", + "table_id": "licitacao", + "materialization_mode": "prod", + "materialize_after_dump": True, + "dbt_alias": True, + "update_metadata": True, + }, + ), + ], +) + +every_day_licitacao_empenho = Schedule( + clocks=[ + CronClock( + cron="0 21 * * *", + start_date=datetime(2021, 3, 31, 17, 11), + labels=[ + constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, + ], + parameter_defaults={ + "dataset_id": "br_cgu_licitacao_contrato", + "table_id": "licitacao_empenho", + "materialization_mode": "prod", + "materialize_after_dump": True, + "dbt_alias": True, + "update_metadata": True, + }, + ), + ], +) + +every_day_licitacao_item = Schedule( + clocks=[ + CronClock( + cron="15 20 * * *", + start_date=datetime(2021, 3, 31, 17, 11), + labels=[ + constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, + ], + parameter_defaults={ + "dataset_id": "br_cgu_licitacao_contrato", + "table_id": "licitacao_item", + "materialization_mode": "prod", + "materialize_after_dump": True, + "dbt_alias": True, + "update_metadata": True, + }, + ), + ], +) + +every_day_licitacao_participante = Schedule( + clocks=[ + CronClock( + 
cron="30 20 * * *", + start_date=datetime(2021, 3, 31, 17, 11), + labels=[ + constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, + ], + parameter_defaults={ + "dataset_id": "br_cgu_licitacao_contrato", + "table_id": "licitacao_participante", + "materialization_mode": "prod", + "materialize_after_dump": True, + "dbt_alias": True, + "update_metadata": True, + }, + ), + ], +) diff --git a/pipelines/utils/crawler_cgu/constants.py b/pipelines/utils/crawler_cgu/constants.py index 4133f1657..ab32534c9 100644 --- a/pipelines/utils/crawler_cgu/constants.py +++ b/pipelines/utils/crawler_cgu/constants.py @@ -6,34 +6,36 @@ from enum import Enum class constants(Enum): # pylint: disable=c0103 + + # ! ================================ CGU - Cartão de Pagamento =========================================== """ Constant values for the br_cgu_cartao_pagamento project """ TABELA = { "microdados_governo_federal" : { - "INPUT_DATA" : "/tmp/input/microdados_governo_federal", - "OUTPUT_DATA" : "/tmp/output/microdados_governo_federal", + "INPUT" : "/tmp/input/microdados_governo_federal", + "OUTPUT" : "/tmp/output/microdados_governo_federal", "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpgf/", "READ" : "_CPGF", "ONLY_ONE_FILE" : False}, "microdados_compras_centralizadas" : { - "INPUT_DATA" : "/tmp/input/microdados_compras_centralizadas", - "OUTPUT_DATA" : "/tmp/output/microdados_compras_centralizadas", + "INPUT" : "/tmp/input/microdados_compras_centralizadas", + "OUTPUT" : "/tmp/output/microdados_compras_centralizadas", "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpcc/", "READ" : "_CPGFComprasCentralizadas", "ONLY_ONE_FILE" : False}, "microdados_defesa_civil" : { - "INPUT_DATA" : "/tmp/input/microdados_defesa_civil", - "OUTPUT_DATA" : "/tmp/output/microdados_defesa_civil", + "INPUT" : "/tmp/input/microdados_defesa_civil", + "OUTPUT" : "/tmp/output/microdados_defesa_civil", "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpdc/", "READ" : 
"_CPDC", "ONLY_ONE_FILE" : False} } - # ! ============================================== CGU - Servidores Públicos do Executivo Federal ============================================== + # ! ================================ CGU - Servidores Públicos do Executivo Federal =========================================== URL_SERVIDORES = "http://portaldatransparencia.gov.br/download-de-dados/servidores/" TABELA_SERVIDORES = { @@ -126,3 +128,50 @@ class constants(Enum): # pylint: disable=c0103 "OUTPUT": "/tmp/output/cgu_servidores/cadastro_servidores", }, } + + # ! ================================ CGU - Licitação e Contrato =========================================== + + TABELA_LICITACAO_CONTRATO = { + "licitacao": { + "URL": "https://portaldatransparencia.gov.br/download-de-dados/licitacoes/", + "INPUT": "/tmp/input/cgu_licitacao_contrato/licitacao", + "OUTPUT": "/tmp/output/cgu_licitacao_contrato/licitacao", + "READ": "_Licitação.csv", + }, + "licitacao_participante": { + "URL": "https://portaldatransparencia.gov.br/download-de-dados/licitacoes/", + "INPUT": "/tmp/input/cgu_licitacao_contrato/licitacao_participante", + "OUTPUT": "/tmp/output/cgu_licitacao_contrato/licitacao_participante", + "READ": "_ParticipantesLicitação.csv", + }, + "licitacao_item": { + "URL": "https://portaldatransparencia.gov.br/download-de-dados/licitacoes/", + "INPUT": "/tmp/input/cgu_licitacao_contrato/licitacao_item", + "OUTPUT": "/tmp/output/cgu_licitacao_contrato/licitacao_item", + "READ": "_ItemLicitação.csv", + }, + "licitacao_empenho": { + "URL": "https://portaldatransparencia.gov.br/download-de-dados/licitacoes/", + "INPUT": "/tmp/input/cgu_licitacao_contrato/licitacao_empenho", + "OUTPUT": "/tmp/output/cgu_licitacao_contrato/licitacao_empenho", + "READ": "_EmpenhosRelacionados.csv", + }, + "contrato_compra": { + "URL": "https://portaldatransparencia.gov.br/download-de-dados/compras/", + "INPUT": "/tmp/input/cgu_licitacao_contrato/contrato_compra", + "OUTPUT": 
"/tmp/output/cgu_licitacao_contrato/contrato_compra", + "READ": "_Compras.csv", + }, + "contrato_item": { + "URL": "https://portaldatransparencia.gov.br/download-de-dados/compras/", + "INPUT": "/tmp/input/cgu_licitacao_contrato/contrato_item", + "OUTPUT": "/tmp/output/cgu_licitacao_contrato/contrato_item", + "READ": "_ItemCompra.csv", + }, + "contrato_termo_aditivo": { + "URL": "https://portaldatransparencia.gov.br/download-de-dados/compras/", + "INPUT": "/tmp/input/cgu_licitacao_contrato/contrato_termo_aditivo", + "OUTPUT": "/tmp/output/cgu_licitacao_contrato/contrato_termo_aditivo", + "READ": "_TermoAditivo.csv", + }, + } diff --git a/pipelines/utils/crawler_cgu/flows.py b/pipelines/utils/crawler_cgu/flows.py index ccc914cab..e7d9c426e 100644 --- a/pipelines/utils/crawler_cgu/flows.py +++ b/pipelines/utils/crawler_cgu/flows.py @@ -218,3 +218,102 @@ ) flow_cgu_servidores_publicos.storage = GCS(constants.GCS_FLOWS_BUCKET.value) flow_cgu_servidores_publicos.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) + + +# ! 
================================== CGU - Licitacao e Contrato ===================================== + +with Flow(name="CGU - Licitacão e Contrato") as flow_cgu_licitacao_contrato: + + dataset_id = Parameter( + "dataset_id", default="br_cgu_licitacao_contrato", required=True + ) + table_id = Parameter( + "table_id", required=True + ) + #### + # Relative_month = 1 means that the data will be downloaded for the current month + #### + relative_month = Parameter("relative_month", default=1, required=False) + materialization_mode = Parameter("materialization_mode", default="dev", required=False) + materialize_after_dump = Parameter("materialize_after_dump", default=True, required=False) + dbt_alias = Parameter("dbt_alias", default=True, required=False) + update_metadata = Parameter("update_metadata", default=False, required=False) + rename_flow_run = rename_current_flow_run_dataset_table(prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id) + + data_source_max_date = get_current_date_and_download_file( + table_id=table_id, + dataset_id=dataset_id, + relative_month=relative_month, + ) + + dados_desatualizados = check_if_data_is_outdated( + dataset_id=dataset_id, + table_id=table_id, + data_source_max_date=data_source_max_date, + date_format="%Y-%m", + upstream_tasks=[data_source_max_date], + ) + + with case(dados_desatualizados, True): + + filepath = partition_data( + table_id=table_id, + dataset_id=dataset_id, + upstream_tasks=[data_source_max_date], + ) + + wait_upload_table = create_table_and_upload_to_gcs( + data_path=filepath, + dataset_id=dataset_id, + table_id=table_id, + dump_mode="append", + wait=filepath, + upstream_tasks=[filepath], + ) + + with case(materialize_after_dump, True): + + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": dataset_id, + 
"table_id": table_id, + "mode": materialization_mode, + "dbt_alias": dbt_alias, + "dbt_command": "run", + "disable_elementary": False, + }, + labels=current_flow_labels, + run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks=[wait_upload_table], + ) + + wait_for_materialization = wait_for_flow_run( + materialization_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + upstream_tasks=[materialization_flow], + ) + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_materialization.retry_delay = timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + with case(update_metadata, True): + update_django_metadata( + dataset_id=dataset_id, + table_id=table_id, + date_column_name={"year": "ano", "month": "mes"}, + date_format="%Y-%m", + coverage_type="part_bdpro", + time_delta={"months": 6}, + prefect_mode=materialization_mode, + bq_project="basedosdados", + upstream_tasks=[wait_for_materialization], + ) +flow_cgu_licitacao_contrato.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +flow_cgu_licitacao_contrato.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) diff --git a/pipelines/utils/crawler_cgu/tasks.py b/pipelines/utils/crawler_cgu/tasks.py index 9fa025fe0..22908242a 100644 --- a/pipelines/utils/crawler_cgu/tasks.py +++ b/pipelines/utils/crawler_cgu/tasks.py @@ -38,18 +38,31 @@ def partition_data(table_id: str, dataset_id : str) -> str: str: The path where the partitioned data is saved. 
""" - if dataset_id == "br_cgu_cartao_pagamento": + if dataset_id in ["br_cgu_cartao_pagamento", "br_cgu_licitacao_contrato"]: log("---------------------------- Read data ----------------------------") - df = read_csv(table_id = table_id, - url = constants.TABELA.value[table_id]['URL']) - log(" ---------------------------- Partiting data -----------------------") - to_partitions( - data = df, - partition_columns=['ANO_EXTRATO', 'MES_EXTRATO'], - savepath = constants.TABELA.value[table_id]['OUTPUT_DATA'], - file_type='csv') - log("---------------------------- Data partitioned ----------------------") - return constants.TABELA.value[table_id]['OUTPUT_DATA'] + df = read_csv(dataset_id = dataset_id, table_id = table_id) + log(df.head()) + if dataset_id == "br_cgu_cartao_pagamento:": + log(" ---------------------------- Partiting data -----------------------") + to_partitions( + data = df, + partition_columns=['ANO_EXTRATO', 'MES_EXTRATO'], + savepath = constants.TABELA.value[table_id]['OUTPUT'], + file_type='csv') + + log("---------------------------- Data partitioned ----------------------") + return constants.TABELA.value[table_id]['OUTPUT'] + + if dataset_id == "br_cgu_licitacao_contrato": + log(" ---------------------------- Partiting data -----------------------") + to_partitions( + data=df, + partition_columns=["ano", "mes"], + savepath=constants.TABELA_LICITACAO_CONTRATO.value[table_id]["OUTPUT"], + file_type="csv", + ) + log("---------------------------- Data partitioned ----------------------") + return constants.TABELA_LICITACAO_CONTRATO.value[table_id]["OUTPUT"] elif dataset_id == "br_cgu_servidores_executivo_federal": @@ -84,6 +97,8 @@ def get_current_date_and_download_file(table_id : str, table_id = table_id, relative_month = relative_month ) + log(f"Last date in API: {last_date_in_api}") + log(f"Next date in API: {next_date_in_api}") max_date = str(download_file(table_id = table_id, dataset_id = dataset_id, diff --git 
a/pipelines/utils/crawler_cgu/utils.py b/pipelines/utils/crawler_cgu/utils.py
index 001c13c5a..f52465b42 100644
--- a/pipelines/utils/crawler_cgu/utils.py
+++ b/pipelines/utils/crawler_cgu/utils.py
@@ -7,8 +7,8 @@
 from dateutil.relativedelta import relativedelta
 import gc
 import shutil
-import io
-import zipfile
+from functools import lru_cache
+from rapidfuzz import process
 import pandas as pd
 import os
 import unidecode
@@ -52,7 +52,7 @@ def build_urls(dataset_id: str, url: str, year: int, month: int, table_id: str)

     log(f"{dataset_id=}")

-    if dataset_id == "br_cgu_cartao_pagamento":
+    if dataset_id in ["br_cgu_cartao_pagamento", "br_cgu_licitacao_contrato"]:

         log(f"{url}{year}{str(month).zfill(2)}/")
         return f"{url}{year}{str(month).zfill(2)}/"
@@ -110,10 +110,13 @@ def download_file(dataset_id: str, table_id: str, year: int, month: int, relativ
     str: The last date in the API if the URL is not found.
     """

-    if dataset_id == "br_cgu_cartao_pagamento":
-        value_constants = constants.TABELA.value[table_id] # ! CGU - Cartão de Pagamento
+    if dataset_id in ["br_cgu_cartao_pagamento", "br_cgu_licitacao_contrato"]:
+        if dataset_id == "br_cgu_cartao_pagamento":
+            value_constants = constants.TABELA.value[table_id] # !
CGU - Cartão de Pagamento + elif dataset_id == "br_cgu_licitacao_contrato": + value_constants = constants.TABELA_LICITACAO_CONTRATO.value[table_id] - input = value_constants["INPUT_DATA"] + input = value_constants["INPUT"] if not os.path.exists(input): os.makedirs(input) @@ -129,10 +132,10 @@ def download_file(dataset_id: str, table_id: str, year: int, month: int, relativ status = requests.get(url).status_code == 200 if status: log(f'------------------ URL = {url} ------------------') - download_and_unzip_file(url, value_constants['INPUT_DATA']) + download_and_unzip_file(url, value_constants['INPUT']) last_date_in_api, next_date_in_api = last_date_in_metadata( - dataset_id="br_cgu_cartao_pagamento", + dataset_id=dataset_id, table_id=table_id, relative_month=relative_month ) @@ -144,7 +147,7 @@ def download_file(dataset_id: str, table_id: str, year: int, month: int, relativ log(f'------------------ URL = {url} ------------------') last_date_in_api, next_date_in_api = last_date_in_metadata( - dataset_id="br_cgu_cartao_pagamento", + dataset_id=dataset_id, table_id=table_id, relative_month=relative_month ) @@ -178,8 +181,30 @@ def download_file(dataset_id: str, table_id: str, year: int, month: int, relativ return next_date_in_api + +# Função para carregar o dataframe +@lru_cache(maxsize=1) # Cache para evitar recarregar a tabela +def load_municipio() -> None: + municipio: pd.DataFrame = bd.read_table( + "br_bd_diretorios_brasil", + "municipio", + billing_project_id="basedosdados", + from_file=True, + ) + municipio["cidade_uf"] = ( + municipio["nome"].apply(lambda x: x.upper()) + "-" + municipio["sigla_uf"] + ) + return municipio + + +def get_similar_cities_process(city): + municipio = load_municipio() + results = process.extractOne(city, municipio["cidade_uf"], score_cutoff=70) + return results[0] if results else None + + def read_csv( - table_id: str, column_replace: List = ["VALOR_TRANSACAO"] + dataset_id : str, table_id: str, column_replace: List = 
["VALOR_TRANSACAO"] ) -> pd.DataFrame: """ Reads a CSV file from a specified path and processes its columns. @@ -198,28 +223,60 @@ def read_csv( - Column names are converted to uppercase, spaces are replaced with underscores, and accents are removed. - For columns specified in `column_replace`, commas in their values are replaced with dots and the values are converted to float. """ - value_constants = constants.TABELA_SERVIDORES.value[table_id] - - os.listdir(value_constants["INPUT"]) - csv_file = [ - f for f in os.listdir(value_constants["INPUT_DATA"]) if f.endswith(".csv") - ][0] - log(f"CSV files: {csv_file}") + if dataset_id == "br_cgu_cartao_pagamento": + value_constants = constants.TABELA.value[table_id] - df = pd.read_csv( - f"{value_constants['INPUT_DATA']}/{csv_file}", sep=";", encoding="latin1" - ) + os.listdir(value_constants["INPUT"]) - df.columns = [unidecode.unidecode(x).upper().replace(" ", "_") for x in df.columns] + csv_file = [ + f for f in os.listdir(value_constants["INPUT"]) if f.endswith(".csv") + ][0] + log(f"CSV files: {csv_file}") - for list_column_replace in column_replace: - df[list_column_replace] = ( - df[list_column_replace].str.replace(",", ".").astype(float) + df = pd.read_csv( + f"{value_constants['INPUT']}/{csv_file}", sep=";", encoding="latin1" ) - return df + df.columns = [unidecode.unidecode(x).upper().replace(" ", "_") for x in df.columns] + + for list_column_replace in column_replace: + df[list_column_replace] = ( + df[list_column_replace].str.replace(",", ".").astype(float) + ) + + return df + + if dataset_id == "br_cgu_licitacao_contrato": + constants_cgu_licitacao_contrato = constants.TABELA_LICITACAO_CONTRATO.value[table_id] + print(os.listdir(constants_cgu_licitacao_contrato["INPUT"])) + csv_file = [ + f + for f in os.listdir(constants_cgu_licitacao_contrato["INPUT"]) + if f.endswith(constants_cgu_licitacao_contrato["READ"]) + ][0] + log(f"CSV files: {csv_file}") + 
log(f"{constants_cgu_licitacao_contrato['INPUT']}/{csv_file}")
+        df = pd.read_csv(f"{constants_cgu_licitacao_contrato['INPUT']}/{csv_file}", sep=";", encoding="latin1")
+        df['ano'] = csv_file[:4]
+        df['mes'] = csv_file[4:6]
+
+        df.columns = [unidecode.unidecode(col) for col in df.columns]
+        df.columns = [col.replace(" ", "_").lower() for col in df.columns]
+
+        if table_id == "licitacao":
+            df["cidade_uf"] = df["municipio"] + "-" + df["uf"]
+
+            df["cidade_uf_dir"] = df["cidade_uf"].apply(
+                lambda x: get_similar_cities_process(x)
+            )
+            df.drop(["cidade_uf", "municipio"], axis=1, inplace=True)
+
+            df.rename(columns={"cidade_uf_dir": "municipio"}, inplace=True)
+
+            df["municipio"] = df["municipio"].apply(lambda x: x if x is None else x.split("-")[0])
+        return df

 def last_date_in_metadata(
     dataset_id: str, table_id: str, relative_month