From f29ff7578f015172ca81556dc49a400d6ab68fee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Fri, 15 Sep 2023 12:00:43 -0300 Subject: [PATCH 1/5] feat: installing readr package --- pipelines/datasets/br_cvm_fi/tasks.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/pipelines/datasets/br_cvm_fi/tasks.py b/pipelines/datasets/br_cvm_fi/tasks.py index c5ccc817f..1a51ae2e9 100644 --- a/pipelines/datasets/br_cvm_fi/tasks.py +++ b/pipelines/datasets/br_cvm_fi/tasks.py @@ -16,6 +16,7 @@ from rpy2.robjects.packages import importr import rpy2.robjects.packages as rpackages import rpy2.robjects as ro +from rpy2.robjects.vectors import StrVector from rpy2.robjects import pandas2ri from pipelines.datasets.br_cvm_fi.utils import ( sheet_to_df, @@ -391,11 +392,25 @@ def clean_data_make_partitions_perfil(diretorio, table_id): df_final = pd.DataFrame() arquivos = glob.glob(f"{diretorio}*.csv") + # import R's utility package + utils = rpackages.importr("utils") + + # select a mirror for R packages + utils.chooseCRANmirror(ind=1) + # R package names + packnames = "readr" + + # R vector of strings + names_to_install = [x for x in packnames if not rpackages.isinstalled(x)] + if len(names_to_install) > 0: + utils.install_packages(StrVector(names_to_install)) + # Import readr + + readr = rpackages.importr("readr") for file in tqdm(arquivos): log(f"Baixando o arquivo ------> {file}") ## reading with R - readr = rpackages.importr("readr") df_r = readr.read_delim( file, delim=";", locale=readr.locale(encoding="ISO-8859-1") ) From 762884ae525e4b7b0719d0302e4f398d0da9c2bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Tue, 19 Sep 2023 16:18:59 -0300 Subject: [PATCH 2/5] fix: fixing cda flow --- pipelines/datasets/br_cvm_fi/constants.py | 2 ++ pipelines/datasets/br_cvm_fi/tasks.py | 11 ++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/pipelines/datasets/br_cvm_fi/constants.py b/pipelines/datasets/br_cvm_fi/constants.py index 88ce77ca4..edf8391c8 100644 --- a/pipelines/datasets/br_cvm_fi/constants.py +++ b/pipelines/datasets/br_cvm_fi/constants.py @@ -558,3 +558,5 @@ class constants(Enum): # pylint: disable=c0103 URL_BALANCETE = "https://dados.cvm.gov.br/dados/FI/DOC/BALANCETE/DADOS/" ARQUITETURA_URL_BALANCETE = "https://docs.google.com/spreadsheets/d/1eIMo_hYHy89oh6kHRN9Kh0NytUZzr8__/edit#gid=1045172528" + + ARQUITETURA_URL_CDA = "https://docs.google.com/spreadsheets/d/1V2XHBXBB_biC0cLoMZ3FxtbC7CPLxQXZhIY7iJDtsSw/edit#gid=0" diff --git a/pipelines/datasets/br_cvm_fi/tasks.py b/pipelines/datasets/br_cvm_fi/tasks.py index 1a51ae2e9..303efc2f7 100644 --- a/pipelines/datasets/br_cvm_fi/tasks.py +++ b/pipelines/datasets/br_cvm_fi/tasks.py @@ -242,7 +242,7 @@ def clean_data_and_make_partitions(path: str, table_id: str) -> str: @task def clean_data_make_partitions_cda(diretorio, table_id): - df_arq = sheet_to_df(cvm_constants.ARQUITETURA_URL.value) + df_arq = sheet_to_df(cvm_constants.ARQUITETURA_URL_CDA.value) anos_meses = obter_anos_meses(diretorio) for i in anos_meses: @@ -275,10 +275,13 @@ def clean_data_make_partitions_cda(diretorio, table_id): df_final[cvm_constants.COLUNAS.value] = df_final[ cvm_constants.COLUNAS.value ].applymap(lambda x: cvm_constants.MAPEAMENTO.value.get(x, x)) + df_final["CNPJ_FUNDO"] = df_final["CNPJ_FUNDO"].str.replace(r"[/.-]", "") + df_final["CNPJ_INSTITUICAO_FINANC_COOBR"] = df_final[ "CNPJ_INSTITUICAO_FINANC_COOBR" ].str.replace(r"[/.-]", "") + df_final["CPF_CNPJ_EMISSOR"] = df_final["CPF_CNPJ_EMISSOR"].str.replace( r"[/.-]", "" ) @@ -288,15 +291,21 @@ def clean_data_make_partitions_cda(diretorio, table_id): ) df_final = rename_columns(df_arq, df_final) df_final = df_final.replace(",", ".", regex=True) + df_final[cvm_constants.COLUNAS_ASCI.value] = df_final[ cvm_constants.COLUNAS_ASCI.value ].fillna("") + df_final[cvm_constants.COLUNAS_ASCI.value] = df_final[ cvm_constants.COLUNAS_ASCI.value ].applymap(limpar_string) + df_final = df_final[cvm_constants.COLUNAS_TOTAIS.value] + log(f"Fazendo partições para o ano ------> {i}") + os.makedirs(f"/tmp/data/br_cvm_fi/{table_id}/output/", exist_ok=True) + to_partitions( df_final, partition_columns=["ano", "mes"], From ee5b4e0bd0346ae859f4d00d30d278f6e933ac15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Tue, 19 Sep 2023 16:58:55 -0300 Subject: [PATCH 3/5] feat: testing perfil mensal flow --- pipelines/datasets/br_cvm_fi/flows.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pipelines/datasets/br_cvm_fi/flows.py b/pipelines/datasets/br_cvm_fi/flows.py index 4acaca4ea..55f0c0346 100644 --- a/pipelines/datasets/br_cvm_fi/flows.py +++ b/pipelines/datasets/br_cvm_fi/flows.py @@ -385,9 +385,14 @@ default=cvm_constants.URL_PERFIL_MENSAL.value, required=False, ) + arquivos = Parameter( + "arquivos", + default=["perfil_mensal_fi_202308.csv", "perfil_mensal_fi_202307.csv"], + required=False, + ) df = extract_links_and_dates(url) - arquivos = check_for_updates(df, upstream_tasks=[df]) + # arquivos = check_for_updates(df, upstream_tasks=[df]) with case(is_empty(arquivos), True): log_task(f"Não houveram atualizações em {url.default}!") From 475fbabdee91f2beb0b7c85bb6a345ebd3aa0eb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Tue, 19 Sep 2023 17:09:58 -0300 Subject: [PATCH 4/5] feat: adding readr --- pipelines/datasets/br_cvm_fi/tasks.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pipelines/datasets/br_cvm_fi/tasks.py b/pipelines/datasets/br_cvm_fi/tasks.py index 303efc2f7..491c17f17 100644 --- a/pipelines/datasets/br_cvm_fi/tasks.py +++ b/pipelines/datasets/br_cvm_fi/tasks.py @@ -408,11 +408,11 @@ def clean_data_make_partitions_perfil(diretorio, table_id): utils.chooseCRANmirror(ind=1) # R package names packnames = "readr" - + utils.install_packages(packnames) # R vector of strings - names_to_install = [x for x in packnames if not rpackages.isinstalled(x)] - if len(names_to_install) > 0: - utils.install_packages(StrVector(names_to_install)) + # names_to_install = [x for x in packnames if not rpackages.isinstalled(x)] + # if len(names_to_install) > 0: + # Import readr readr = rpackages.importr("readr") From 0bc6f93a126d5ed6184424668dbc1f1cc443917d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arthur=20Gusm=C3=A3o?= Date: Tue, 19 Sep 2023 17:29:11 -0300 Subject: [PATCH 5/5] fix: small fix --- pipelines/datasets/br_cvm_fi/flows.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pipelines/datasets/br_cvm_fi/flows.py b/pipelines/datasets/br_cvm_fi/flows.py index 55f0c0346..4acaca4ea 100644 --- a/pipelines/datasets/br_cvm_fi/flows.py +++ b/pipelines/datasets/br_cvm_fi/flows.py @@ -385,14 +385,9 @@ default=cvm_constants.URL_PERFIL_MENSAL.value, required=False, ) - arquivos = Parameter( - "arquivos", - default=["perfil_mensal_fi_202308.csv", "perfil_mensal_fi_202307.csv"], - required=False, - ) df = extract_links_and_dates(url) - # arquivos = check_for_updates(df, upstream_tasks=[df]) + arquivos = check_for_updates(df, upstream_tasks=[df]) with case(is_empty(arquivos), True): log_task(f"Não houveram atualizações em {url.default}!")