Skip to content

Commit

Permalink
Merge branch 'master' into pre-commit-ci-update-config
Browse files: browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Sep 19, 2023
2 parents 0b027e0 + c006926 commit 34b219a
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 2 deletions.
2 changes: 2 additions & 0 deletions pipelines/datasets/br_cvm_fi/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,3 +558,5 @@ class constants(Enum): # pylint: disable=c0103
URL_BALANCETE = "https://dados.cvm.gov.br/dados/FI/DOC/BALANCETE/DADOS/"

ARQUITETURA_URL_BALANCETE = "https://docs.google.com/spreadsheets/d/1eIMo_hYHy89oh6kHRN9Kh0NytUZzr8__/edit#gid=1045172528"

ARQUITETURA_URL_CDA = "https://docs.google.com/spreadsheets/d/1V2XHBXBB_biC0cLoMZ3FxtbC7CPLxQXZhIY7iJDtsSw/edit#gid=0"
28 changes: 26 additions & 2 deletions pipelines/datasets/br_cvm_fi/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from rpy2.robjects.packages import importr
import rpy2.robjects.packages as rpackages
import rpy2.robjects as ro
from rpy2.robjects.vectors import StrVector
from rpy2.robjects import pandas2ri
from pipelines.datasets.br_cvm_fi.utils import (
sheet_to_df,
Expand Down Expand Up @@ -241,7 +242,7 @@ def clean_data_and_make_partitions(path: str, table_id: str) -> str:

@task
def clean_data_make_partitions_cda(diretorio, table_id):
- df_arq = sheet_to_df(cvm_constants.ARQUITETURA_URL.value)
df_arq = sheet_to_df(cvm_constants.ARQUITETURA_URL_CDA.value)
anos_meses = obter_anos_meses(diretorio)

for i in anos_meses:
Expand Down Expand Up @@ -274,10 +275,13 @@ def clean_data_make_partitions_cda(diretorio, table_id):
df_final[cvm_constants.COLUNAS.value] = df_final[
cvm_constants.COLUNAS.value
].applymap(lambda x: cvm_constants.MAPEAMENTO.value.get(x, x))

df_final["CNPJ_FUNDO"] = df_final["CNPJ_FUNDO"].str.replace(r"[/.-]", "")

df_final["CNPJ_INSTITUICAO_FINANC_COOBR"] = df_final[
"CNPJ_INSTITUICAO_FINANC_COOBR"
].str.replace(r"[/.-]", "")

df_final["CPF_CNPJ_EMISSOR"] = df_final["CPF_CNPJ_EMISSOR"].str.replace(
r"[/.-]", ""
)
Expand All @@ -287,15 +291,21 @@ def clean_data_make_partitions_cda(diretorio, table_id):
)
df_final = rename_columns(df_arq, df_final)
df_final = df_final.replace(",", ".", regex=True)

df_final[cvm_constants.COLUNAS_ASCI.value] = df_final[
cvm_constants.COLUNAS_ASCI.value
].fillna("")

df_final[cvm_constants.COLUNAS_ASCI.value] = df_final[
cvm_constants.COLUNAS_ASCI.value
].applymap(limpar_string)

df_final = df_final[cvm_constants.COLUNAS_TOTAIS.value]

log(f"Fazendo partições para o ano ------> {i}")

os.makedirs(f"/tmp/data/br_cvm_fi/{table_id}/output/", exist_ok=True)

to_partitions(
df_final,
partition_columns=["ano", "mes"],
Expand Down Expand Up @@ -391,11 +401,25 @@ def clean_data_make_partitions_perfil(diretorio, table_id):
df_final = pd.DataFrame()
arquivos = glob.glob(f"{diretorio}*.csv")

# import R's utility package
utils = rpackages.importr("utils")

# select a mirror for R packages
utils.chooseCRANmirror(ind=1)
# R package names
packnames = "readr"
utils.install_packages(packnames)
# R vector of strings
# names_to_install = [x for x in packnames if not rpackages.isinstalled(x)]
# if len(names_to_install) > 0:

# Import readr

readr = rpackages.importr("readr")
for file in tqdm(arquivos):
log(f"Baixando o arquivo ------> {file}")
## reading with R

- readr = rpackages.importr("readr")
df_r = readr.read_delim(
file, delim=";", locale=readr.locale(encoding="ISO-8859-1")
)
Expand Down

0 comments on commit 34b219a

Please sign in to comment.