diff --git a/pipelines/datasets/__init__.py b/pipelines/datasets/__init__.py
index 85eb460d1..ac3e02c6b 100644
--- a/pipelines/datasets/__init__.py
+++ b/pipelines/datasets/__init__.py
@@ -1,61 +1,61 @@
-# -*- coding: utf-8 -*-
-"""
-Prefect flows for basedosdados project
-"""
-###############################################################################
-# Automatically managed, please do not touch
-###############################################################################
-
-from pipelines.datasets.botdosdados.flows import *
-from pipelines.datasets.br_anatel_banda_larga_fixa.flows import *
-from pipelines.datasets.br_anatel_telefonia_movel.flows import *
-from pipelines.datasets.br_anp_precos_combustiveis.flows import *
-from pipelines.datasets.br_ans_beneficiario.flows import *
-from pipelines.datasets.br_b3_cotacoes.flows import *
-from pipelines.datasets.br_bcb_agencia.flows import *
-from pipelines.datasets.br_bcb_estban.flows import *
-from pipelines.datasets.br_bcb_taxa_cambio.flows import *
-from pipelines.datasets.br_bcb_taxa_selic.flows import *
-from pipelines.datasets.br_bd_indicadores.flows import *
-from pipelines.datasets.br_bd_metadados.flows import *
-from pipelines.datasets.br_camara_dados_abertos.flows import *
-from pipelines.datasets.br_cgu_beneficios_cidadao.flows import *
-from pipelines.datasets.br_cgu_pessoal_executivo_federal.flows import *
-from pipelines.datasets.br_cgu_servidores_executivo_federal.flows import *
-from pipelines.datasets.br_cvm_administradores_carteira.flows import *
-from pipelines.datasets.br_cvm_fi.flows import *
-from pipelines.datasets.br_cvm_oferta_publica_distribuicao.flows import *
-from pipelines.datasets.br_denatran_frota.flows import *
-from pipelines.datasets.br_fgv_igp.flows import *
-from pipelines.datasets.br_ibge_inpc.flows import *
-from pipelines.datasets.br_ibge_ipca15.flows import *
-from pipelines.datasets.br_ibge_ipca.flows import *
-from pipelines.datasets.br_ibge_pnadc.flows import *
-from pipelines.datasets.br_inmet_bdmep.flows import *
-from pipelines.datasets.br_bd_siga_o_dinheiro.flows import *
-from pipelines.datasets.br_me_caged.flows import *
-from pipelines.datasets.br_me_cnpj.flows import *
-from pipelines.datasets.br_me_comex_stat.flows import *
-from pipelines.datasets.br_mercadolivre_ofertas.flows import *
-from pipelines.datasets.br_mg_belohorizonte_smfa_iptu.flows import *
-from pipelines.datasets.br_mp_pep_cargos_funcoes.flows import *
-from pipelines.datasets.br_ms_cnes.flows import *
-from pipelines.datasets.br_ms_sia.flows import *
-from pipelines.datasets.br_ons_avaliacao_operacao.flows import *
-from pipelines.datasets.br_ons_estimativa_custos.flows import *
-from pipelines.datasets.br_poder360_pesquisas.flows import *
-from pipelines.datasets.br_rf_cafir.flows import *
-from pipelines.datasets.br_rf_cno.flows import *
-from pipelines.datasets.br_rj_isp_estatisticas_seguranca.flows import *
-from pipelines.datasets.br_sp_saopaulo_dieese_icv.flows import *
-from pipelines.datasets.br_stf_corte_aberta.flows import *
-from pipelines.datasets.br_tse_eleicoes.flows import *
-from pipelines.datasets.cross_update.flows import *
-from pipelines.datasets.delete_flows.flows import *
-from pipelines.datasets.fundacao_lemann.flows import *
-from pipelines.datasets.mundo_transfermarkt_competicoes.flows import *
-from pipelines.datasets.mundo_transfermarkt_competicoes_internacionais.flows import *
-from pipelines.datasets.br_cnj_improbidade_administrativa.flows import *
-from pipelines.datasets.br_ms_sih.flows import *
-from pipelines.datasets.br_ms_sinan.flows import *
-
+# -*- coding: utf-8 -*-
+"""
+Prefect flows for basedosdados project
+"""
+###############################################################################
+# Automatically managed, please do not touch
+###############################################################################
+
+from pipelines.datasets.botdosdados.flows import *
+from pipelines.datasets.br_anatel_banda_larga_fixa.flows import *
+from pipelines.datasets.br_anatel_telefonia_movel.flows import *
+from pipelines.datasets.br_anp_precos_combustiveis.flows import *
+from pipelines.datasets.br_ans_beneficiario.flows import *
+from pipelines.datasets.br_b3_cotacoes.flows import *
+from pipelines.datasets.br_bcb_agencia.flows import *
+from pipelines.datasets.br_bcb_estban.flows import *
+from pipelines.datasets.br_bcb_taxa_cambio.flows import *
+from pipelines.datasets.br_bcb_taxa_selic.flows import *
+from pipelines.datasets.br_bd_indicadores.flows import *
+from pipelines.datasets.br_bd_metadados.flows import *
+from pipelines.datasets.br_camara_dados_abertos.flows import *
+from pipelines.datasets.br_cgu_beneficios_cidadao.flows import *
+from pipelines.datasets.br_cgu_pessoal_executivo_federal.flows import *
+from pipelines.datasets.br_cgu_servidores_executivo_federal.flows import *
+from pipelines.datasets.br_cvm_administradores_carteira.flows import *
+from pipelines.datasets.br_cvm_fi.flows import *
+from pipelines.datasets.br_cvm_oferta_publica_distribuicao.flows import *
+from pipelines.datasets.br_denatran_frota.flows import *
+from pipelines.datasets.br_fgv_igp.flows import *
+from pipelines.datasets.br_ibge_inpc.flows import *
+from pipelines.datasets.br_ibge_ipca15.flows import *
+from pipelines.datasets.br_ibge_ipca.flows import *
+from pipelines.datasets.br_ibge_pnadc.flows import *
+from pipelines.datasets.br_inmet_bdmep.flows import *
+from pipelines.datasets.br_bd_siga_o_dinheiro.flows import *
+from pipelines.datasets.br_me_caged.flows import *
+from pipelines.datasets.br_me_cnpj.flows import *
+from pipelines.datasets.br_me_comex_stat.flows import *
+from pipelines.datasets.br_mercadolivre_ofertas.flows import *
+from pipelines.datasets.br_mg_belohorizonte_smfa_iptu.flows import *
+from pipelines.datasets.br_mp_pep_cargos_funcoes.flows import *
+from pipelines.datasets.br_ms_cnes.flows import *
+from pipelines.datasets.br_ms_sia.flows import *
+from pipelines.datasets.br_ons_avaliacao_operacao.flows import *
+from pipelines.datasets.br_ons_estimativa_custos.flows import *
+from pipelines.datasets.br_poder360_pesquisas.flows import *
+from pipelines.datasets.br_rf_cafir.flows import *
+from pipelines.datasets.br_rf_cno.flows import *
+from pipelines.datasets.br_rj_isp_estatisticas_seguranca.flows import *
+from pipelines.datasets.br_sp_saopaulo_dieese_icv.flows import *
+from pipelines.datasets.br_stf_corte_aberta.flows import *
+from pipelines.datasets.br_tse_eleicoes.flows import *
+from pipelines.datasets.cross_update.flows import *
+from pipelines.datasets.delete_flows.flows import *
+from pipelines.datasets.fundacao_lemann.flows import *
+from pipelines.datasets.mundo_transfermarkt_competicoes.flows import *
+from pipelines.datasets.mundo_transfermarkt_competicoes_internacionais.flows import *
+from pipelines.datasets.br_cnj_improbidade_administrativa.flows import *
+from pipelines.datasets.br_ms_sih.flows import *
+from pipelines.datasets.br_ms_sinan.flows import *
+from pipelines.datasets.br_cgu_emendas_parlamentares.flows import *
\ No newline at end of file
diff --git a/pipelines/datasets/br_cgu_cartao_pagamento/flows.py b/pipelines/datasets/br_cgu_cartao_pagamento/flows.py
new file mode 100644
index 000000000..4aa94be4d
--- /dev/null
+++ b/pipelines/datasets/br_cgu_cartao_pagamento/flows.py
@@ -0,0 +1,32 @@
+# -*- coding: utf-8 -*-
+from copy import deepcopy, copy
+from prefect.run_configs import KubernetesRun
+from prefect.storage import GCS
+from pipelines.utils.crawler_cgu.flows import flow_cgu_cartao_pagamento
+from pipelines.constants import constants
+from pipelines.datasets.br_cgu_cartao_pagamento.schedules import (
+    every_day_microdados_compras_centralizadas,
+    every_day_microdados_defesa_civil,
+    every_day_microdados_governo_federal
+)
+
+br_cgu_cartao_pagamento__governo_federal = deepcopy(flow_cgu_cartao_pagamento)
+br_cgu_cartao_pagamento__governo_federal.name = "br_cgu_cartao_pagamento.governo_federal"
+br_cgu_cartao_pagamento__governo_federal.code_owners = ["trick"]
+br_cgu_cartao_pagamento__governo_federal.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
+br_cgu_cartao_pagamento__governo_federal.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
+br_cgu_cartao_pagamento__governo_federal.schedule = every_day_microdados_governo_federal
+
+br_cgu_cartao_pagamento__defesa_civil = deepcopy(flow_cgu_cartao_pagamento)
+br_cgu_cartao_pagamento__defesa_civil.name = "br_cgu_cartao_pagamento.defesa_civil"
+br_cgu_cartao_pagamento__defesa_civil.code_owners = ["trick"]
+br_cgu_cartao_pagamento__defesa_civil.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
+br_cgu_cartao_pagamento__defesa_civil.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
+br_cgu_cartao_pagamento__defesa_civil.schedule = every_day_microdados_defesa_civil
+
+br_cgu_cartao_pagamento__compras_centralizadas = deepcopy(flow_cgu_cartao_pagamento)
+br_cgu_cartao_pagamento__compras_centralizadas.name = "br_cgu_cartao_pagamento.compras_centralizadas"
+br_cgu_cartao_pagamento__compras_centralizadas.code_owners = ["trick"]
+br_cgu_cartao_pagamento__compras_centralizadas.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
+br_cgu_cartao_pagamento__compras_centralizadas.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
+br_cgu_cartao_pagamento__compras_centralizadas.schedule = every_day_microdados_compras_centralizadas
\ No newline at end of file
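The three registrations above all derive from one shared crawler flow: flow_cgu_cartao_pagamento is deep-copied once per table so each copy carries its own name, code owners, storage, run_config and schedule. A minimal standalone sketch of that pattern, with illustrative names only and assuming Prefect 1.x as used in this repository:

from copy import deepcopy

from prefect import Flow
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock


def specialize(base_flow: Flow, name: str, schedule: Schedule) -> Flow:
    """Hypothetical helper: return an independent copy of base_flow with its own name and schedule."""
    flow = deepcopy(base_flow)  # copies the task graph; mutating the copy leaves the base untouched
    flow.name = name
    flow.schedule = schedule
    return flow


base = Flow("cgu_cartao_pagamento_base")                 # stand-in for flow_cgu_cartao_pagamento
daily_8pm = Schedule(clocks=[CronClock("0 20 * * *")])
governo_federal = specialize(base, "br_cgu_cartao_pagamento.governo_federal", daily_8pm)
print(base.name, "->", governo_federal.name)             # the base flow keeps its original name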
diff --git a/pipelines/datasets/br_cgu_cartao_pagamento/schedules.py b/pipelines/datasets/br_cgu_cartao_pagamento/schedules.py
new file mode 100644
index 000000000..c9bd50a4c
--- /dev/null
+++ b/pipelines/datasets/br_cgu_cartao_pagamento/schedules.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+from datetime import datetime
+from prefect.schedules import Schedule
+from prefect.schedules.clocks import CronClock, IntervalClock
+from pipelines.constants import constants
+from pipelines.utils.crawler_cgu.constants import constants as constants_cgu
+
+every_day_microdados_governo_federal = Schedule(
+    clocks=[
+        CronClock(
+            cron="0 20 * * *",
+            start_date=datetime(2021, 3, 31, 17, 11),
+            labels=[
+                constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
+            ],
+            parameter_defaults={
+                "dataset_id": "br_cgu_cartao_pagamento",
+                "table_id": "microdados_governo_federal",
+                "materialization_mode": "prod",
+                "materialize_after_dump": True,
+                "dbt_alias": True,
+                "historical_data": False,
+                "update_metadata": True,
+            },
+        ),
+    ],
+)
+
+every_day_microdados_defesa_civil = Schedule(
+    clocks=[
+        CronClock(
+            cron="30 20 * * *",
+            start_date=datetime(2021, 3, 31, 17, 11),
+            labels=[
+                constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
+            ],
+            parameter_defaults={
+                "dataset_id": "br_cgu_cartao_pagamento",
+                "table_id": "microdados_defesa_civil",
+                "materialization_mode": "prod",
+                "materialize_after_dump": True,
+                "dbt_alias": True,
+                "historical_data": False,
+                "update_metadata": True,
+            },
+        ),
+    ],
+)
+
+every_day_microdados_compras_centralizadas = Schedule(
+    clocks=[
+        CronClock(
+            cron="00 21 * * *",
+            start_date=datetime(2021, 3, 31, 17, 11),
+            labels=[
+                constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
+            ],
+            parameter_defaults={
+                "dataset_id": "br_cgu_cartao_pagamento",
+                "table_id": "microdados_compras_centralizadas",
+                "materialization_mode": "prod",
+                "materialize_after_dump": True,
+                "dbt_alias": True,
+                "historical_data": False,
+                "update_metadata": True,
+            },
+        ),
+    ],
+)
\ No newline at end of file
diff --git a/pipelines/utils/crawler_cgu/__init__.py b/pipelines/utils/crawler_cgu/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/pipelines/utils/crawler_cgu/constants.py b/pipelines/utils/crawler_cgu/constants.py
new file mode 100644
index 000000000..419ca410f
--- /dev/null
+++ b/pipelines/utils/crawler_cgu/constants.py
@@ -0,0 +1,35 @@
+# -*- coding: utf-8 -*-
+"""
+Constant values for the datasets projects
+"""
+
+from enum import Enum
+from datetime import datetime
+
+class constants(Enum): # pylint: disable=c0103
+    """
+    Constant values for the br_cgu_cartao_pagamento project
+    """
+
+    TABELA = {
+        "microdados_governo_federal" : {
+            "INPUT_DATA" : "/tmp/input/microdados_governo_federal",
+            "OUTPUT_DATA" : "/tmp/output/microdados_governo_federal",
+            "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpgf/",
+            "READ" : "_CPGF",
+            "ONLY_ONE_FILE" : False},
+
+        "microdados_compras_centralizadas" : {
+            "INPUT_DATA" : "/tmp/input/microdados_compras_centralizadas",
+            "OUTPUT_DATA" : "/tmp/output/microdados_compras_centralizadas",
+            "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpcc/",
+            "READ" : "_CPGFComprasCentralizadas",
+            "ONLY_ONE_FILE" : False},
+
+        "microdados_defesa_civil" : {
+            "INPUT_DATA" : "/tmp/input/microdados_defesa_civil",
+            "OUTPUT_DATA" : "/tmp/output/microdados_defesa_civil",
+            "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpdc/",
+            "READ" : "_CPDC",
+            "ONLY_ONE_FILE" : False}
+    }
\ No newline at end of file
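Each TABELA entry above is looked up by table_id downstream (see download_file in pipelines/utils/crawler_cgu/utils.py later in this diff); the monthly download URL is simply the base URL plus the reference month as YYYYMM. A self-contained sketch of that lookup, with the dictionary inlined for illustration:

TABELA = {
    "microdados_governo_federal": {
        "URL": "https://portaldatransparencia.gov.br/download-de-dados/cpgf/",
        "ONLY_ONE_FILE": False,
    },
}


def monthly_url(table_id: str, year: int, month: int) -> str:
    # Mirrors the f-string used in download_file: base URL + YYYYMM + trailing slash
    entry = TABELA[table_id]
    return f"{entry['URL']}{year}{str(month).zfill(2)}/"


print(monthly_url("microdados_governo_federal", 2024, 7))
# https://portaldatransparencia.gov.br/download-de-dados/cpgf/202407/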
diff --git a/pipelines/utils/crawler_cgu/flows.py b/pipelines/utils/crawler_cgu/flows.py
new file mode 100644
index 000000000..f3bcb2a4b
--- /dev/null
+++ b/pipelines/utils/crawler_cgu/flows.py
@@ -0,0 +1,120 @@
+# -*- coding: utf-8 -*-
+"""
+Flows for br_cgu_cartao_pagamento
+"""
+from datetime import timedelta
+from prefect.run_configs import KubernetesRun
+from prefect.storage import GCS
+from prefect import Parameter, case
+from pipelines.constants import constants
+from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
+from pipelines.utils.constants import constants as utils_constants
+from pipelines.utils.decorators import Flow
+from pipelines.utils.crawler_cgu.tasks import (
+    partition_data,
+    get_current_date_and_download_file,
+)
+from pipelines.utils.decorators import Flow
+from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
+from pipelines.utils.metadata.tasks import update_django_metadata, check_if_data_is_outdated
+from pipelines.utils.tasks import (
+    create_table_and_upload_to_gcs,
+    get_current_flow_labels,
+    rename_current_flow_run_dataset_table,
+)
+
+with Flow(
+    name="CGU - Cartão de Pagamento"
+) as flow_cgu_cartao_pagamento:
+
+    dataset_id = Parameter("dataset_id", default='br_cgu_cartao_pagamento', required=True)
+    table_id = Parameter("table_id", default="microdados_governo_federal", required=True)
+    ####
+    # relative_month = 1 means the flow targets the month right after the most recent date already in the metadata API
+    ####
+    relative_month = Parameter("relative_month", default=1, required=False)
+    materialization_mode = Parameter("materialization_mode", default="dev", required=False)
+    materialize_after_dump = Parameter("materialize_after_dump", default=True, required=False)
+    dbt_alias = Parameter("dbt_alias", default=True, required=False)
+    update_metadata = Parameter("update_metadata", default=False, required=False)
+    rename_flow_run = rename_current_flow_run_dataset_table(prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id)
+
+    data_source_max_date = get_current_date_and_download_file(
+        table_id,
+        dataset_id,
+        relative_month,
+    )
+
+    dados_desatualizados = check_if_data_is_outdated(
+        dataset_id=dataset_id,
+        table_id=table_id,
+        data_source_max_date=data_source_max_date,
+        date_format="%Y-%m",
+        upstream_tasks=[data_source_max_date]
+    )
+
+    with case(dados_desatualizados, True):
+
+        filepath = partition_data(
+            table_id=table_id,
+            upstream_tasks=[dados_desatualizados]
+        )
+
+        wait_upload_table = create_table_and_upload_to_gcs(
+            data_path=filepath,
+            dataset_id=dataset_id,
+            table_id=table_id,
+            dump_mode="append",
+            wait=filepath,
+            upstream_tasks=[filepath],
+        )
+
+        with case(materialize_after_dump, True):
+
+            current_flow_labels = get_current_flow_labels()
+            materialization_flow = create_flow_run(
+                flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
+                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
+                parameters={
+                    "dataset_id": dataset_id,
+                    "table_id": table_id,
+                    "mode": materialization_mode,
+                    "dbt_alias": dbt_alias,
+                    "dbt_command": "run/test",
+                    "disable_elementary": False,
+                },
+                labels=current_flow_labels,
+                run_name=f"Materialize {dataset_id}.{table_id}",
+                upstream_tasks=[wait_upload_table],
+            )
+
+            wait_for_materialization = wait_for_flow_run(
+                materialization_flow,
+                stream_states=True,
+                stream_logs=True,
+                raise_final_state=True,
+                upstream_tasks=[materialization_flow],
+            )
+            wait_for_materialization.max_retries = (
+                dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
+            )
+            wait_for_materialization.retry_delay = timedelta(
+                seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
+            )
+            with case(update_metadata, True):
+                update_django_metadata(
+                    dataset_id=dataset_id,
+                    table_id=table_id,
+                    date_column_name={"year": "ano_extrato", "month": "mes_extrato"},
+                    date_format="%Y-%m",
+                    coverage_type="part_bdpro",
+                    time_delta={"months": 6},
+                    prefect_mode=materialization_mode,
+                    bq_project="basedosdados",
+                    upstream_tasks=[wait_for_materialization],
+                )
+
+flow_cgu_cartao_pagamento.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
+flow_cgu_cartao_pagamento.run_config = KubernetesRun(
+    image=constants.DOCKER_IMAGE.value
+)
diff --git a/pipelines/utils/crawler_cgu/tasks.py b/pipelines/utils/crawler_cgu/tasks.py
new file mode 100644
index 000000000..2bc2f2b2b
--- /dev/null
+++ b/pipelines/utils/crawler_cgu/tasks.py
@@ -0,0 +1,80 @@
+# -*- coding: utf-8 -*-
+"""
+Tasks for br_cgu_cartao_pagamento
+"""
+from datetime import datetime
+from prefect import task
+from dateutil.relativedelta import relativedelta
+import pandas as pd
+from pipelines.utils.utils import log, to_partitions
+from pipelines.utils.crawler_cgu.utils import read_csv, last_date_in_metadata
+from pipelines.utils.crawler_cgu.constants import constants
+from pipelines.utils.crawler_cgu.utils import download_file
+from typing import Tuple
+
+@task
+def partition_data(table_id: str) -> str:
+    """
+    Partition data from a given table.
+
+    This function reads data from a specified table, partitions it based on
+    the columns 'ANO_EXTRATO' and 'MES_EXTRATO', and saves the partitioned
+    data to a specified output path.
+
+    Args:
+        table_id (str): The identifier of the table to be partitioned.
+
+    Returns:
+        str: The path where the partitioned data is saved.
+    """
+
+    value_constants = constants.TABELA.value[table_id]
+
+    log("---------------------------- Read data ----------------------------")
+    # Read the data
+    df = read_csv(table_id = table_id,
+                  url = value_constants['URL'])
+
+    # Partition the data
+    log(" ---------------------------- Partitioning data -----------------------")
+
+    to_partitions(
+        data = df,
+        partition_columns=['ANO_EXTRATO', 'MES_EXTRATO'],
+        savepath = value_constants['OUTPUT_DATA'],
+        file_type='csv')
+
+    log("---------------------------- Data partitioned ----------------------")
+
+    return value_constants['OUTPUT_DATA']
+
+@task
+def get_current_date_and_download_file(table_id : str,
+                                       dataset_id : str,
+                                       relative_month : int = 1) -> datetime:
+    """
+    Download the source file for the target month and return its reference date.
+
+    Args:
+        table_id (str): The ID of the table.
+        dataset_id (str): The ID of the dataset.
+        relative_month (int): Months to add to the latest date in the metadata API to define the target month.
+
+    Returns:
+        datetime: The reference date of the downloaded (or fallback) data.
+    """
+    last_date_in_api, next_date_in_api = last_date_in_metadata(
+        dataset_id = dataset_id,
+        table_id = table_id,
+        relative_month = relative_month
+    )
+
+
+    max_date = str(download_file(table_id = table_id,
+                                 year = next_date_in_api.year,
+                                 month = next_date_in_api.month,
+                                 relative_month=relative_month))
+
+    date = datetime.strptime(max_date, '%Y-%m-%d')
+
+    return date
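partition_data above delegates the actual writing to to_partitions (pipelines/utils/utils.py). Assuming that helper writes Hive-style key=value folders, as is the convention elsewhere in this repository, the output tree handed to create_table_and_upload_to_gcs would look like the simplified stand-in below (an illustration, not the real implementation):

from pathlib import Path

import pandas as pd


def write_partitions(data: pd.DataFrame, partition_columns: list, savepath: str) -> None:
    """Simplified stand-in: one CSV per unique combination of the partition columns."""
    for keys, chunk in data.groupby(partition_columns):
        keys = keys if isinstance(keys, tuple) else (keys,)
        folders = [f"{col}={val}" for col, val in zip(partition_columns, keys)]
        outdir = Path(savepath).joinpath(*folders)
        outdir.mkdir(parents=True, exist_ok=True)
        chunk.drop(columns=partition_columns).to_csv(outdir / "data.csv", index=False)


df = pd.DataFrame({"ANO_EXTRATO": [2024, 2024], "MES_EXTRATO": [6, 7], "VALOR_TRANSACAO": [10.5, 20.0]})
write_partitions(df, ["ANO_EXTRATO", "MES_EXTRATO"], "/tmp/output/example")
# /tmp/output/example/ANO_EXTRATO=2024/MES_EXTRATO=6/data.csv
# /tmp/output/example/ANO_EXTRATO=2024/MES_EXTRATO=7/data.csv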
+ """ + value_constants = constants.TABELA.value[table_id] + input = value_constants['INPUT_DATA'] + if not os.path.exists(input): + os.makedirs(input) + + log(f' ---------------------------- Year = {year} --------------------------------------') + log(f' ---------------------------- Month = {month} ------------------------------------') + + if not value_constants['ONLY_ONE_FILE']: + + url = f"{value_constants['URL']}{year}{str(month).zfill(2)}/" + + status = requests.get(url).status_code == 200 + if status: + log(f'------------------ URL = {url} ------------------') + download_and_unzip_file(url, value_constants['INPUT_DATA']) + + last_date_in_api, next_date_in_api = last_date_in_metadata( + dataset_id="br_cgu_cartao_pagamento", + table_id=table_id, + relative_month=relative_month + ) + + return next_date_in_api + + else: + log('URL não encontrada. Fazendo uma query na BD') + log(f'------------------ URL = {url} ------------------') + + last_date_in_api, next_date_in_api = last_date_in_metadata( + dataset_id="br_cgu_cartao_pagamento", + table_id=table_id, + relative_month=relative_month + ) + + return last_date_in_api + + if value_constants['ONLY_ONE_FILE']: + url = value_constants['URL'] + download_and_unzip_file(url, value_constants['INPUT_DATA']) + return None + + + +def read_csv(table_id : str, url : str, column_replace : List = ['VALOR_TRANSACAO']) -> pd.DataFrame: + """ + Reads a CSV file from a specified path and processes its columns. + + Args: + table_id (str): The identifier for the table to be read. + url (str): The URL from which the CSV file is to be read. + column_replace (List, optional): A list of column names whose values need to be replaced. Default is ['VALOR_TRANSACAO']. + + Returns: + pd.DataFrame: A DataFrame containing the processed data from the CSV file. + + Notes: + - The function reads the CSV file from a directory specified in a constants file. + - It assumes the CSV file is encoded in 'latin1' and uses ';' as the separator. + - Column names are converted to uppercase, spaces are replaced with underscores, and accents are removed. + - For columns specified in `column_replace`, commas in their values are replaced with dots and the values are converted to float. + """ + value_constants = constants.TABELA.value[table_id] + + os.listdir(value_constants['INPUT_DATA']) + + csv_file = [f for f in os.listdir(value_constants['INPUT_DATA']) if f.endswith('.csv')][0] + log(f"CSV files: {csv_file}") + + df = pd.read_csv(f"{value_constants['INPUT_DATA']}/{csv_file}", sep=';', encoding='latin1') + + df.columns = [unidecode.unidecode(x).upper().replace(" ", "_") for x in df.columns] + + for list_column_replace in column_replace: + df[list_column_replace] = df[list_column_replace].str.replace(",", ".").astype(float) + + return df + +def last_date_in_metadata(dataset_id : str, + table_id : str, + relative_month) -> datetime.date: + """ + Retrieves the most recent date from the metadata of a specified dataset and table, + and calculates the next date based on a relative month offset. + + Args: + dataset_id (str): The ID of the dataset to query. + table_id (str): The ID of the table within the dataset to query. + relative_month (int): The number of months to add to the most recent date to calculate the next date. + + Returns: + tuple: A tuple containing: + - last_date_in_api (datetime.date): The most recent date found in the API. + - next_date_in_api (datetime.date): The date obtained by adding the relative month to the most recent date. 
+ """ + + backend = bd.Backend(graphql_url=get_url("prod")) + last_date_in_api = get_api_most_recent_date( + dataset_id=dataset_id, + table_id=table_id, + date_format="%Y-%m", + backend=backend, + ) + + next_date_in_api = last_date_in_api + relativedelta(months=relative_month) + + return last_date_in_api, next_date_in_api \ No newline at end of file diff --git a/pipelines/utils/dump_to_gcs/flows.py b/pipelines/utils/dump_to_gcs/flows.py index 5080a6ad2..af28e62dc 100644 --- a/pipelines/utils/dump_to_gcs/flows.py +++ b/pipelines/utils/dump_to_gcs/flows.py @@ -14,7 +14,7 @@ download_data_to_gcs, get_project_id, ) -from pipelines.datasets.cross_update.tasks import get_all_eligible_in_selected_year +#from pipelines.datasets.cross_update.tasks import get_all_eligible_in_selected_year from pipelines.utils.tasks import rename_current_flow_run_dataset_table with Flow( diff --git a/pipelines/utils/utils.py b/pipelines/utils/utils.py index 6c5dc3673..fdefc2994 100644 --- a/pipelines/utils/utils.py +++ b/pipelines/utils/utils.py @@ -14,7 +14,9 @@ from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union from uuid import uuid4 - +from io import BytesIO +from urllib.request import urlopen +import zipfile import basedosdados as bd import croniter import hvac @@ -463,6 +465,36 @@ def to_partitions( raise BaseException("Data need to be a pandas DataFrame") + +############### +# +# Download data +# +############### + +def download_and_unzip_file(url : str, path : str) -> None: + """ + Downloads a file from the given URL and extracts it to the specified path. + + Parameters: + url (str): The URL of the file to be downloaded. + path (str): The path where the file will be extracted. + + Returns: + None + """ + + log("------------------ Downloading and unzipping file ------------------") + try: + r = urlopen(url) + zip = zipfile.ZipFile(BytesIO(r.read())) + zip.extractall(path=path) + log("Complet download and unzip") + + except Exception as e: + log(e) + log("Error when downloading and unzipping file") + ############### # # Storage utils