diff --git a/pipelines/datasets/br_cvm_fi/constants.py b/pipelines/datasets/br_cvm_fi/constants.py index 88ce77ca4..edf8391c8 100644 --- a/pipelines/datasets/br_cvm_fi/constants.py +++ b/pipelines/datasets/br_cvm_fi/constants.py @@ -558,3 +558,5 @@ class constants(Enum): # pylint: disable=c0103 URL_BALANCETE = "https://dados.cvm.gov.br/dados/FI/DOC/BALANCETE/DADOS/" ARQUITETURA_URL_BALANCETE = "https://docs.google.com/spreadsheets/d/1eIMo_hYHy89oh6kHRN9Kh0NytUZzr8__/edit#gid=1045172528" + + ARQUITETURA_URL_CDA = "https://docs.google.com/spreadsheets/d/1V2XHBXBB_biC0cLoMZ3FxtbC7CPLxQXZhIY7iJDtsSw/edit#gid=0" diff --git a/pipelines/datasets/br_cvm_fi/tasks.py b/pipelines/datasets/br_cvm_fi/tasks.py index c5ccc817f..491c17f17 100644 --- a/pipelines/datasets/br_cvm_fi/tasks.py +++ b/pipelines/datasets/br_cvm_fi/tasks.py @@ -16,6 +16,7 @@ from rpy2.robjects.packages import importr import rpy2.robjects.packages as rpackages import rpy2.robjects as ro +from rpy2.robjects.vectors import StrVector from rpy2.robjects import pandas2ri from pipelines.datasets.br_cvm_fi.utils import ( sheet_to_df, @@ -241,7 +242,7 @@ def clean_data_and_make_partitions(path: str, table_id: str) -> str: @task def clean_data_make_partitions_cda(diretorio, table_id): - df_arq = sheet_to_df(cvm_constants.ARQUITETURA_URL.value) + df_arq = sheet_to_df(cvm_constants.ARQUITETURA_URL_CDA.value) anos_meses = obter_anos_meses(diretorio) for i in anos_meses: @@ -274,10 +275,13 @@ def clean_data_make_partitions_cda(diretorio, table_id): df_final[cvm_constants.COLUNAS.value] = df_final[ cvm_constants.COLUNAS.value ].applymap(lambda x: cvm_constants.MAPEAMENTO.value.get(x, x)) + df_final["CNPJ_FUNDO"] = df_final["CNPJ_FUNDO"].str.replace(r"[/.-]", "") + df_final["CNPJ_INSTITUICAO_FINANC_COOBR"] = df_final[ "CNPJ_INSTITUICAO_FINANC_COOBR" ].str.replace(r"[/.-]", "") + df_final["CPF_CNPJ_EMISSOR"] = df_final["CPF_CNPJ_EMISSOR"].str.replace( r"[/.-]", "" ) @@ -287,15 +291,21 @@ def clean_data_make_partitions_cda(diretorio, table_id): ) df_final = rename_columns(df_arq, df_final) df_final = df_final.replace(",", ".", regex=True) + df_final[cvm_constants.COLUNAS_ASCI.value] = df_final[ cvm_constants.COLUNAS_ASCI.value ].fillna("") + df_final[cvm_constants.COLUNAS_ASCI.value] = df_final[ cvm_constants.COLUNAS_ASCI.value ].applymap(limpar_string) + df_final = df_final[cvm_constants.COLUNAS_TOTAIS.value] + log(f"Fazendo partições para o ano ------> {i}") + os.makedirs(f"/tmp/data/br_cvm_fi/{table_id}/output/", exist_ok=True) + to_partitions( df_final, partition_columns=["ano", "mes"], @@ -391,11 +401,25 @@ def clean_data_make_partitions_perfil(diretorio, table_id): df_final = pd.DataFrame() arquivos = glob.glob(f"{diretorio}*.csv") + # import R's utility package + utils = rpackages.importr("utils") + + # select a mirror for R packages + utils.chooseCRANmirror(ind=1) + # R package names + packnames = "readr" + utils.install_packages(packnames) + # R vector of strings + # names_to_install = [x for x in packnames if not rpackages.isinstalled(x)] + # if len(names_to_install) > 0: + + # Import readr + + readr = rpackages.importr("readr") for file in tqdm(arquivos): log(f"Baixando o arquivo ------> {file}") ## reading with R - readr = rpackages.importr("readr") df_r = readr.read_delim( file, delim=";", locale=readr.locale(encoding="ISO-8859-1") )