diff --git a/pipelines/datasets/br_cvm_fi/constants.py b/pipelines/datasets/br_cvm_fi/constants.py index 88ce77ca4..edf8391c8 100644 --- a/pipelines/datasets/br_cvm_fi/constants.py +++ b/pipelines/datasets/br_cvm_fi/constants.py @@ -558,3 +558,5 @@ class constants(Enum): # pylint: disable=c0103 URL_BALANCETE = "https://dados.cvm.gov.br/dados/FI/DOC/BALANCETE/DADOS/" ARQUITETURA_URL_BALANCETE = "https://docs.google.com/spreadsheets/d/1eIMo_hYHy89oh6kHRN9Kh0NytUZzr8__/edit#gid=1045172528" + + ARQUITETURA_URL_CDA = "https://docs.google.com/spreadsheets/d/1V2XHBXBB_biC0cLoMZ3FxtbC7CPLxQXZhIY7iJDtsSw/edit#gid=0" diff --git a/pipelines/datasets/br_cvm_fi/tasks.py b/pipelines/datasets/br_cvm_fi/tasks.py index 1a51ae2e9..303efc2f7 100644 --- a/pipelines/datasets/br_cvm_fi/tasks.py +++ b/pipelines/datasets/br_cvm_fi/tasks.py @@ -242,7 +242,7 @@ def clean_data_and_make_partitions(path: str, table_id: str) -> str: @task def clean_data_make_partitions_cda(diretorio, table_id): - df_arq = sheet_to_df(cvm_constants.ARQUITETURA_URL.value) + df_arq = sheet_to_df(cvm_constants.ARQUITETURA_URL_CDA.value) anos_meses = obter_anos_meses(diretorio) for i in anos_meses: @@ -275,10 +275,13 @@ def clean_data_make_partitions_cda(diretorio, table_id): df_final[cvm_constants.COLUNAS.value] = df_final[ cvm_constants.COLUNAS.value ].applymap(lambda x: cvm_constants.MAPEAMENTO.value.get(x, x)) + df_final["CNPJ_FUNDO"] = df_final["CNPJ_FUNDO"].str.replace(r"[/.-]", "") + df_final["CNPJ_INSTITUICAO_FINANC_COOBR"] = df_final[ "CNPJ_INSTITUICAO_FINANC_COOBR" ].str.replace(r"[/.-]", "") + df_final["CPF_CNPJ_EMISSOR"] = df_final["CPF_CNPJ_EMISSOR"].str.replace( r"[/.-]", "" ) @@ -288,15 +291,21 @@ def clean_data_make_partitions_cda(diretorio, table_id): ) df_final = rename_columns(df_arq, df_final) df_final = df_final.replace(",", ".", regex=True) + df_final[cvm_constants.COLUNAS_ASCI.value] = df_final[ cvm_constants.COLUNAS_ASCI.value ].fillna("") + df_final[cvm_constants.COLUNAS_ASCI.value] = df_final[ cvm_constants.COLUNAS_ASCI.value ].applymap(limpar_string) + df_final = df_final[cvm_constants.COLUNAS_TOTAIS.value] + log(f"Fazendo partições para o ano ------> {i}") + os.makedirs(f"/tmp/data/br_cvm_fi/{table_id}/output/", exist_ok=True) + to_partitions( df_final, partition_columns=["ano", "mes"],