From 395fe6dbe822032c59353f5bb61d83527e5ef2f2 Mon Sep 17 00:00:00 2001 From: Luiz Eduardo Date: Thu, 7 Nov 2024 06:46:25 -0300 Subject: [PATCH] Adicionando novas tabelas br_rf_arrecadacao --- .../br_rf_arrecadacao__cnae.sql | 36 +++ .../br_rf_arrecadacao__ir_ipi.sql | 26 +++ .../br_rf_arrecadacao__itr.sql | 22 ++ .../br_rf_arrecadacao__natureza_juridica.sql | 47 ++++ .../br_rf_arrecadacao__uf.sql | 130 +++++------ models/br_rf_arrecadacao/code/clean_cnae.py | 50 ++++ .../br_rf_arrecadacao/code/clean_functions.py | 183 +++++++++++++++ models/br_rf_arrecadacao/code/clean_ir_ipi.py | 37 +++ models/br_rf_arrecadacao/code/clean_itr.py | 45 ++++ .../code/clean_natureza_juridica.py | 50 ++++ models/br_rf_arrecadacao/code/clean_uf.py | 73 ++++++ .../br_rf_arrecadacao/code/download_data.py | 36 ++- models/br_rf_arrecadacao/schema.yml | 220 ++++++++++++++++++ 13 files changed, 879 insertions(+), 76 deletions(-) create mode 100644 models/br_rf_arrecadacao/br_rf_arrecadacao__cnae.sql create mode 100644 models/br_rf_arrecadacao/br_rf_arrecadacao__ir_ipi.sql create mode 100644 models/br_rf_arrecadacao/br_rf_arrecadacao__itr.sql create mode 100644 models/br_rf_arrecadacao/br_rf_arrecadacao__natureza_juridica.sql create mode 100644 models/br_rf_arrecadacao/code/clean_cnae.py create mode 100644 models/br_rf_arrecadacao/code/clean_functions.py create mode 100644 models/br_rf_arrecadacao/code/clean_ir_ipi.py create mode 100644 models/br_rf_arrecadacao/code/clean_itr.py create mode 100644 models/br_rf_arrecadacao/code/clean_natureza_juridica.py create mode 100644 models/br_rf_arrecadacao/code/clean_uf.py diff --git a/models/br_rf_arrecadacao/br_rf_arrecadacao__cnae.sql b/models/br_rf_arrecadacao/br_rf_arrecadacao__cnae.sql new file mode 100644 index 00000000..b6f14265 --- /dev/null +++ b/models/br_rf_arrecadacao/br_rf_arrecadacao__cnae.sql @@ -0,0 +1,36 @@ +{{ + config( + schema="br_rf_arrecadacao", + alias="cnae", + materialized="table", + partition_by={ + "field": "ano", + "data_type": "int64", + "range": {"start": 2016, "end": 2024, "interval": 1}, + }, + cluster_by=["mes"], + ) +}} + +select + safe_cast(ano as int64) ano, + safe_cast(mes as int64) mes, + safe_cast(secao_sigla as string) secao_sigla, + safe_cast(imposto_importacao as float64) imposto_importacao, + safe_cast(imposto_exportacao as float64) imposto_exportacao, + safe_cast(ipi as float64) ipi, + safe_cast(irpf as float64) irpf, + safe_cast(irpj as float64) irpj, + safe_cast(irrf as float64) irrf, + safe_cast(iof as float64) iof, + safe_cast(itr as float64) itr, + safe_cast(cofins as float64) cofins, + safe_cast(pis_pasep as float64) pis_pasep, + safe_cast(csll as float64) csll, + safe_cast(cide_combustiveis as float64) cide_combustiveis, + safe_cast(contribuicao_previdenciaria as float64) contribuicao_previdenciaria, + safe_cast(cpsss as float64) cpsss, + safe_cast(pagamento_unificado as float64) pagamento_unificado, + safe_cast(outras_receitas_rfb as float64) outras_receitas_rfb, + safe_cast(demais_receitas as float64) demais_receitas, +from `basedosdados-dev.br_rf_arrecadacao_staging.cnae` as t diff --git a/models/br_rf_arrecadacao/br_rf_arrecadacao__ir_ipi.sql b/models/br_rf_arrecadacao/br_rf_arrecadacao__ir_ipi.sql new file mode 100644 index 00000000..3982c853 --- /dev/null +++ b/models/br_rf_arrecadacao/br_rf_arrecadacao__ir_ipi.sql @@ -0,0 +1,26 @@ +{{ + config( + schema="br_rf_arrecadacao", + alias="ir_ipi", + materialized="table", + partition_by={ + "field": "ano", + "data_type": "int64", + "range": {"start": 2019, "end": 2024, "interval": 1}, + }, + cluster_by=["mes"], + ) +}} + +select + safe_cast(ano as int64) ano, + safe_cast(mes as int64) mes, + safe_cast(tributo as string) tributo, + safe_cast(decendio as string) decendio, + safe_cast(arrecadacao_bruta as float64) arrecadacao_bruta, + safe_cast(retificacao as float64) retificacao, + safe_cast(compensacao as float64) compensacao, + safe_cast(restituicao as float64) restituicao, + safe_cast(outros as float64) outros, + safe_cast(arrecadacao_liquida as float64) arrecadacao_liquida, +from `basedosdados-dev.br_rf_arrecadacao_staging.ir_ipi` as t diff --git a/models/br_rf_arrecadacao/br_rf_arrecadacao__itr.sql b/models/br_rf_arrecadacao/br_rf_arrecadacao__itr.sql new file mode 100644 index 00000000..e52f5758 --- /dev/null +++ b/models/br_rf_arrecadacao/br_rf_arrecadacao__itr.sql @@ -0,0 +1,22 @@ +{{ + config( + schema="br_rf_arrecadacao", + alias="itr", + materialized="table", + partition_by={ + "field": "ano", + "data_type": "int64", + "range": {"start": 2017, "end": 2024, "interval": 1}, + }, + cluster_by=["mes"], + ) +}} + +select + safe_cast(ano as int64) ano, + safe_cast(mes as int64) mes, + safe_cast(sigla_uf as string) sigla_uf, + safe_cast(sigla_regiao as string) sigla_regiao, + safe_cast(cidade as string) cidade, + safe_cast(valor_arrecadado as float64) valor_arrecadado, +from `basedosdados-dev.br_rf_arrecadacao_staging.itr` as t diff --git a/models/br_rf_arrecadacao/br_rf_arrecadacao__natureza_juridica.sql b/models/br_rf_arrecadacao/br_rf_arrecadacao__natureza_juridica.sql new file mode 100644 index 00000000..22f4b1be --- /dev/null +++ b/models/br_rf_arrecadacao/br_rf_arrecadacao__natureza_juridica.sql @@ -0,0 +1,47 @@ +{{ + config( + schema="br_rf_arrecadacao", + alias="natureza_juridica", + materialized="table", + partition_by={ + "field": "ano", + "data_type": "int64", + "range": {"start": 2016, "end": 2024, "interval": 1}, + }, + cluster_by=["mes"], + ) +}} + +with + referencia_codigo as ( + select + id_natureza_juridica, + substr(cast(id_natureza_juridica as string), 0, 3) as inicio_codigo + from basedosdados - staging.br_bd_diretorios_brasil.natureza_juridica + ) +select + safe_cast(t.ano as int64) ano, + safe_cast(t.mes as int64) mes, + safe_cast( + referencia_codigo.id_natureza_juridica as string + ) natureza_juridica_codigo, + safe_cast(t.imposto_importacao as float64) imposto_importacao, + safe_cast(t.imposto_exportacao as float64) imposto_exportacao, + safe_cast(t.ipi as float64) ipi, + safe_cast(t.irpf as float64) irpf, + safe_cast(t.irpj as float64) irpj, + safe_cast(t.irrf as float64) irrf, + safe_cast(t.iof as float64) iof, + safe_cast(t.itr as float64) itr, + safe_cast(t.cofins as float64) cofins, + safe_cast(t.pis_pasep as float64) pis_pasep, + safe_cast(t.csll as float64) csll, + safe_cast(t.cide_combustiveis as float64) cide_combustiveis, + safe_cast(t.contribuicao_previdenciaria as float64) contribuicao_previdenciaria, + safe_cast(t.cpsss as float64) cpsss, + safe_cast(t.pagamento_unificado as float64) pagamento_unificado, + safe_cast(t.outras_receitas_rfb as float64) outras_receitas_rfb, + safe_cast(t.demais_receitas as float64) demais_receitas, +from `basedosdados-dev.br_rf_arrecadacao_staging.natureza_juridica` as t +inner join + referencia_codigo on t.natureza_juridica_codigo = referencia_codigo.inicio_codigo diff --git a/models/br_rf_arrecadacao/br_rf_arrecadacao__uf.sql b/models/br_rf_arrecadacao/br_rf_arrecadacao__uf.sql index 5a9e30af..39f86f7f 100644 --- a/models/br_rf_arrecadacao/br_rf_arrecadacao__uf.sql +++ b/models/br_rf_arrecadacao/br_rf_arrecadacao__uf.sql @@ -1,65 +1,65 @@ -{{ - config( - schema="br_rf_arrecadacao", - alias="uf", - materialized="table", - partition_by={ - "field": "ano", - "data_type": "int64", - "range": {"start": 2000, "end": 2024, "interval": 1}, - }, - cluster_by=["mes"], - ) -}} - -select - safe_cast(ano as int64) ano, - safe_cast(mes as int64) mes, - safe_cast(sigla_uf as string) sigla_uf, - safe_cast(imposto_importacao as float64) imposto_importacao, - safe_cast(imposto_exportacao as float64) imposto_exportacao, - safe_cast(ipi_fumo as float64) ipi_fumo, - safe_cast(ipi_bebidas as float64) ipi_bebidas, - safe_cast(ipi_automoveis as float64) ipi_automoveis, - safe_cast(ipi_importacoes as float64) ipi_importacoes, - safe_cast(ipi_outros as float64) ipi_outros, - safe_cast(irpf as float64) irpf, - safe_cast(irpj_entidades_financeiras as float64) irpj_entidades_financeiras, - safe_cast(irpj_demais_empresas as float64) irpj_demais_empresas, - safe_cast(irrf_rendimentos_trabalho as float64) irrf_rendimentos_trabalho, - safe_cast(irrf_rendimentos_capital as float64) irrf_rendimentos_capital, - safe_cast(irrf_remessas_exterior as float64) irrf_remessas_exterior, - safe_cast(irrf_outros_rendimentos as float64) irrf_outros_rendimentos, - safe_cast(iof as float64) iof, - safe_cast(itr as float64) itr, - safe_cast(ipmf as float64) ipmf, - safe_cast(cpmf as float64) cpmf, - safe_cast(cofins as float64) cofins, - safe_cast(cofins_financeiras as float64) cofins_entidades_financeiras, - safe_cast(cofins_demais_empresas as float64) cofins_demais_empresas, - safe_cast(pis_pasep as float64) pis_pasep, - safe_cast( - pis_pasep_entidades_financeiras as float64 - ) pis_pasep_entidades_financeiras, - safe_cast(pis_pasep_demais_empresas as float64) pis_pasep_demais_empresas, - safe_cast(csll as float64) csll, - safe_cast(csll_financeiras as float64) csll_entidades_financeiras, - safe_cast(csll_demais_empresas as float64) csll_demais_empresas, - safe_cast( - cide_combustiveis_parcela_nao_dedutivel as float64 - ) cide_combustiveis_parcela_nao_dedutivel, - safe_cast(cide_combustiveis as float64) cide_combustiveis, - safe_cast(cpsss_1 as float64) cpsss_1, - safe_cast(cpsss_2 as float64) cpsss_2, - safe_cast(contribuicoes_fundaf as float64) contribuicao_fundaf, - safe_cast(refis as float64) refis, - safe_cast(paes as float64) paes, - safe_cast(retencoes_fonte as float64) retencoes_fonte, - safe_cast(pagamento_unificado as float64) pagamento_unificado, - safe_cast(outras_receitas_ as float64) outras_receitas_rfb, - safe_cast(demais_receitas as float64) demais_receitas, - safe_cast(receita_previdenciaria as float64) receita_previdenciaria, - safe_cast(receita_previdenciaria_propria as float64) receita_previdenciaria_propria, - safe_cast(receita_previdenciaria_demais as float64) receita_previdenciaria_demais, - safe_cast(receitas_outros_orgaos as float64) receitas_outros_orgaos, -from `basedosdados-dev.br_rf_arrecadacao_staging.uf` as t +{{ + config( + schema="br_rf_arrecadacao", + alias="uf", + materialized="table", + partition_by={ + "field": "ano", + "data_type": "int64", + "range": {"start": 2000, "end": 2024, "interval": 1}, + }, + cluster_by=["mes"], + ) +}} + +select + safe_cast(ano as int64) ano, + safe_cast(mes as int64) mes, + safe_cast(sigla_uf as string) sigla_uf, + safe_cast(imposto_importacao as float64) imposto_importacao, + safe_cast(imposto_exportacao as float64) imposto_exportacao, + safe_cast(ipi_fumo as float64) ipi_fumo, + safe_cast(ipi_bebidas as float64) ipi_bebidas, + safe_cast(ipi_automoveis as float64) ipi_automoveis, + safe_cast(ipi_importacoes as float64) ipi_importacoes, + safe_cast(ipi_outros as float64) ipi_outros, + safe_cast(irpf as float64) irpf, + safe_cast(irpj_entidades_financeiras as float64) irpj_entidades_financeiras, + safe_cast(irpj_demais_empresas as float64) irpj_demais_empresas, + safe_cast(irrf_rendimentos_trabalho as float64) irrf_rendimentos_trabalho, + safe_cast(irrf_rendimentos_capital as float64) irrf_rendimentos_capital, + safe_cast(irrf_remessas_exterior as float64) irrf_remessas_exterior, + safe_cast(irrf_outros_rendimentos as float64) irrf_outros_rendimentos, + safe_cast(iof as float64) iof, + safe_cast(itr as float64) itr, + safe_cast(ipmf as float64) ipmf, + safe_cast(cpmf as float64) cpmf, + safe_cast(cofins as float64) cofins, + safe_cast(cofins_financeiras as float64) cofins_entidades_financeiras, + safe_cast(cofins_demais_empresas as float64) cofins_demais_empresas, + safe_cast(pis_pasep as float64) pis_pasep, + safe_cast( + pis_pasep_entidades_financeiras as float64 + ) pis_pasep_entidades_financeiras, + safe_cast(pis_pasep_demais_empresas as float64) pis_pasep_demais_empresas, + safe_cast(csll as float64) csll, + safe_cast(csll_financeiras as float64) csll_entidades_financeiras, + safe_cast(csll_demais_empresas as float64) csll_demais_empresas, + safe_cast( + cide_combustiveis_parcela_nao_dedutivel as float64 + ) cide_combustiveis_parcela_nao_dedutivel, + safe_cast(cide_combustiveis as float64) cide_combustiveis, + safe_cast(cpsss_1 as float64) cpsss_1, + safe_cast(cpsss_2 as float64) cpsss_2, + safe_cast(contribuicoes_fundaf as float64) contribuicao_fundaf, + safe_cast(refis as float64) refis, + safe_cast(paes as float64) paes, + safe_cast(retencoes_fonte as float64) retencoes_fonte, + safe_cast(pagamento_unificado as float64) pagamento_unificado, + safe_cast(outras_receitas_ as float64) outras_receitas_rfb, + safe_cast(demais_receitas as float64) demais_receitas, + safe_cast(receita_previdenciaria as float64) receita_previdenciaria, + safe_cast(receita_previdenciaria_propria as float64) receita_previdenciaria_propria, + safe_cast(receita_previdenciaria_demais as float64) receita_previdenciaria_demais, + safe_cast(receitas_outros_orgaos as float64) receitas_outros_orgaos, +from `basedosdados-dev.br_rf_arrecadacao_staging.uf` as t diff --git a/models/br_rf_arrecadacao/code/clean_cnae.py b/models/br_rf_arrecadacao/code/clean_cnae.py new file mode 100644 index 00000000..a3b98b20 --- /dev/null +++ b/models/br_rf_arrecadacao/code/clean_cnae.py @@ -0,0 +1,50 @@ +import os +import numpy as np +import pandas as pd +from clean_functions import * + +def rename_columns(df): + name_dict = { + 'Ano':'ano', + 'Mês':'mes', + 'Seção - Sigla':'secao_sigla', + 'Seção - Nome':'secao_nome', + 'II':'imposto_importacao', + 'IE':'imposto_exportacao', + 'IPI':'ipi', + 'IRPF':'irpf', + 'IRPJ':'irpj', + 'IRRF':'irrf', + 'IOF':'iof', + 'ITR':'itr', + 'Cofins':'cofins', + 'Pis/Pasep':'pis_pasep', + 'CSLL':'csll', + 'Cide': 'cide_combustiveis', + 'Contribuição Previdenciária':'contribuicao_previdenciaria', + 'CPSSS':'cpsss', + 'Pagamento Unificado':'pagamento_unificado', + 'Outras Receitas Administradas':'outras_receitas_rfb', + 'Receitas Não Administradas':'demais_receitas' + } + + return df.rename(columns=name_dict) + +def change_types(df): + df['ano'] = df['ano'].astype('int') + df['mes'] = get_month_number(df['mes']) + df['secao_nome'] = df['secao_nome'].str.title() + + #All remaining columns are monetary values + for col in df.columns[4:]: + df[col] = df[col].apply(replace_commas).apply(remove_dots).astype('float') + + return df + +if __name__ == '__main__': + df = read_data(file_dir='../input/arrecadacao-cnae.csv') + df = remove_empty_columns(df) + df = remove_empty_rows(df) + df = rename_columns(df) + df = change_types(df) + save_data(df=df,file_dir='../output/br_rf_arrecadacao_cnae',partition_cols=['ano','mes']) diff --git a/models/br_rf_arrecadacao/code/clean_functions.py b/models/br_rf_arrecadacao/code/clean_functions.py new file mode 100644 index 00000000..260da075 --- /dev/null +++ b/models/br_rf_arrecadacao/code/clean_functions.py @@ -0,0 +1,183 @@ +import os +import numpy as np +import pandas as pd +from typing import List +from pathlib import Path + +file_directory = os.path.dirname(__file__) + +def read_data(file_dir,separator=';'): + data_directory = os.path.join(file_directory, file_dir) + + return pd.read_csv(data_directory, sep=separator) + +def remove_empty_rows(df): + return df.dropna(axis=0, how='all') + +def remove_empty_columns(df): + return df.drop(list(df.filter(regex='Unnamed')), axis=1) + +def replace_commas(value): + string_value = str(value) + num_commas = string_value.count(',') + if num_commas == 1: + return string_value.replace(',','.') + elif num_commas > 1: + return string_value.replace(',','',num_commas-1).replace(',','.') + else: + return string_value + +def remove_dots(value): + string_value = str(value) + num_dots = string_value.count('.') + if num_dots > 1: + return string_value.replace('.','',num_dots-1) + else: + return string_value + +def get_month_number(month_column): + + month_lower = month_column.str.lower() + month_inits = month_lower.str[:3] + + month_numbers = { + 'jan': '1', + 'fev': '2', + 'mar': '3', + 'abr': '4', + 'mai': '5', + 'jun': '6', + 'jul': '7', + 'ago': '8', + 'set': '9', + 'out': '10', + 'nov': '11', + 'dez': '12' + } + return month_inits.replace(month_numbers).astype('int') + +def get_state_letters(state_column): + + state_lower = state_column.str.lower() + states = { + "acre": "AC", + "alagoas": "AL", + "amapá": "AP", + "amazonas": "AM", + "bahia": "BA", + "ceará": "CE", + "distrito federal": "DF", + "espírito santo": "ES", + "goiás": "GO", + "maranhão": "MA", + "mato grosso": "MT", + "mato grosso do sul": "MS", + "minas gerais": "MG", + "pará": "PA", + "paraíba": "PB", + "paraná": "PR", + "pernambuco": "PE", + "piauí": "PI", + "rio de janeiro": "RJ", + "rio grande do norte": "RN", + "rio grande do sul": "RS", + "rondônia": "RO", + "roraima": "RR", + "santa catarina": "SC", + "são paulo": "SP", + "sergipe": "SE", + "tocantins": "TO" + } + return state_lower.replace(states) + +def get_region_letters(region_names): + + region_lower = region_names.str.lower() + + regions = { + "norte" : "N", + "sul" : "S", + "centro-oeste" : "CO", + "sudeste" : "SE", + "nordeste" : "NE" + } + return region_lower.replace(regions) + +def to_partitions( + data: pd.DataFrame, + partition_columns: List[str], + savepath: str, + file_type: str = "csv", +): + """Save data in to hive patitions schema, given a dataframe and a list of partition columns. + Args: + data (pandas.core.frame.DataFrame): Dataframe to be partitioned. + partition_columns (list): List of columns to be used as partitions. + savepath (str, pathlib.PosixPath): folder path to save the partitions. + file_type (str): default to csv. Accepts parquet. + Exemple: + data = { + "ano": [2020, 2021, 2020, 2021, 2020, 2021, 2021,2025], + "mes": [1, 2, 3, 4, 5, 6, 6,9], + "sigla_uf": ["SP", "SP", "RJ", "RJ", "PR", "PR", "PR","PR"], + "dado": ["a", "b", "c", "d", "e", "f", "g",'h'], + } + to_partitions( + data=pd.DataFrame(data), + partition_columns=['ano','mes','sigla_uf'], + savepath='partitions/', + ) + """ + + if isinstance(data, (pd.core.frame.DataFrame)): + savepath = Path(savepath) + # create unique combinations between partition columns + unique_combinations = ( + data[partition_columns] + # .astype(str) + .drop_duplicates(subset=partition_columns).to_dict(orient="records") + ) + + for filter_combination in unique_combinations: + patitions_values = [ + f"{partition}={value}" + for partition, value in filter_combination.items() + ] + + # get filtered data + df_filter = data.loc[ + data[filter_combination.keys()] + .isin(filter_combination.values()) + .all(axis=1), + :, + ] + df_filter = df_filter.drop(columns=partition_columns) + + # create folder tree + filter_save_path = Path(savepath / "/".join(patitions_values)) + filter_save_path.mkdir(parents=True, exist_ok=True) + + if file_type == "csv": + # append data to csv + file_filter_save_path = Path(filter_save_path) / "data.csv" + df_filter.to_csv( + file_filter_save_path, + sep=",", + encoding="utf-8", + na_rep="", + index=False, + mode="a", + header=not file_filter_save_path.exists(), + ) + elif file_type == "parquet": + # append data to parquet + file_filter_save_path = Path(filter_save_path) / "data.parquet" + df_filter.to_parquet( + file_filter_save_path, index=False, compression="gzip" + ) + else: + raise BaseException("Data need to be a pandas DataFrame") + +def save_data(df,file_dir,partition_cols): + data_directory = os.path.join(file_directory,file_dir) + to_partitions(data=df,partition_columns=partition_cols,savepath=data_directory) diff --git a/models/br_rf_arrecadacao/code/clean_ir_ipi.py b/models/br_rf_arrecadacao/code/clean_ir_ipi.py new file mode 100644 index 00000000..af7f99b2 --- /dev/null +++ b/models/br_rf_arrecadacao/code/clean_ir_ipi.py @@ -0,0 +1,37 @@ +import os +import numpy as np +import pandas as pd +from clean_functions import * + +def rename_columns(df): + name_dict = { + 'Ano':'ano', + 'Mês':'mes', + 'Tributo':'tributo', + 'Decêndio':'decendio', + 'Arrecadação Bruta':'arrecadacao_bruta', + 'Retificação':'retificacao', + 'Compensação':'compensacao', + 'Restituição':'restituicao', + 'Outros':'outros', + 'Arrecadação Líquida':'arrecadacao_liquida' + } + + return df.rename(columns=name_dict) + +def change_types(df): + df['ano'] = df['ano'].astype('int') + df['mes'] = get_month_number(df['mes']) + + #All remaining columns are monetary values + for col in df.columns[4:]: + df[col] = df[col].apply(replace_commas).apply(remove_dots).astype('float') + + return df + +if __name__ == '__main__': + df = read_data(file_dir='../input/arrecadacao-ir-ipi.csv') + df = remove_empty_rows(df) + df = rename_columns(df) + df = change_types(df) + save_data(df=df,file_dir='../output/br_rf_arrecadacao_ir_ipi',partition_cols=['ano','mes']) diff --git a/models/br_rf_arrecadacao/code/clean_itr.py b/models/br_rf_arrecadacao/code/clean_itr.py new file mode 100644 index 00000000..3a83ea92 --- /dev/null +++ b/models/br_rf_arrecadacao/code/clean_itr.py @@ -0,0 +1,45 @@ +import os +import numpy as np +import pandas as pd +from clean_functions import * + +def rename_columns(df): + name_dict = { + 'Ano':'ano', + 'Mês':'mes', + 'Unidade da Federação':'nome_uf', + 'Região Política':'regiao_politica', + 'Cidade e UF':'cidade_uf', + 'Valor':'valor_arrecadado' + } + + return df.rename(columns=name_dict) + +def change_types(df): + df['ano'] = df['ano'].astype('int') + df['mes'] = get_month_number(df['mes']) + df['valor_arrecadado'] = df['valor_arrecadado'].apply(replace_commas).apply(remove_dots).astype('float') + + return df + +def format_state(df): + df['sigla_uf'] = get_state_letters(df['nome_uf']) + return df.drop('nome_uf',axis=1) + +def format_region(df): + df['sigla_regiao'] = get_region_letters(df['regiao_politica']) + return df.drop('regiao_politica',axis=1) + +def format_city(df): + df['cidade'] = df['cidade_uf'].str.split(' - ').str[0] + return df.drop('cidade_uf',axis=1) + +if __name__ == '__main__': + df = read_data(file_dir='../input/arrecadacao-itr.csv') + df = remove_empty_rows(df) + df = rename_columns(df) + df = change_types(df) + df = format_state(df) + df = format_region(df) + df = format_city(df) + save_data(df=df,file_dir='../output/br_rf_arrecadacao_itr',partition_cols=['ano','mes']) diff --git a/models/br_rf_arrecadacao/code/clean_natureza_juridica.py b/models/br_rf_arrecadacao/code/clean_natureza_juridica.py new file mode 100644 index 00000000..df9ce70d --- /dev/null +++ b/models/br_rf_arrecadacao/code/clean_natureza_juridica.py @@ -0,0 +1,50 @@ +import os +import numpy as np +import pandas as pd +from clean_functions import * + +def rename_columns(df): + name_dict = { + 'Ano':'ano', + 'Mês':'mes', + 'Natureza Jurídica - Código':'natureza_juridica_codigo', + 'Natureza Jurídica - Nome':'natureza_juridica_nome', + 'II':'imposto_importacao', + 'IE':'imposto_exportacao', + 'IPI':'ipi', + 'IRPF':'irpf', + 'IRPJ':'irpj', + 'IRRF':'irrf', + 'IOF':'iof', + 'ITR':'itr', + 'Cofins':'cofins', + 'Pis/Pasep':'pis_pasep', + 'CSLL':'csll', + 'Cide': 'cide_combustiveis', + 'Contribuição Previdenciária':'contribuicao_previdenciaria', + 'CPSSS':'cpsss', + 'Pagamento Unificado':'pagamento_unificado', + 'Outras Receitas Administradas':'outras_receitas_rfb', + 'Receitas Não Administradas':'demais_receitas' + } + + return df.rename(columns=name_dict) + +def change_types(df): + df['ano'] = df['ano'].astype('int') + df['mes'] = get_month_number(df['mes']) + df['natureza_juridica_nome'] = df['natureza_juridica_nome'].str.title() + + #All remaining columns are monetary values + for col in df.columns[4:]: + df[col] = df[col].apply(replace_commas).apply(remove_dots).astype('float') + + return df + +if __name__ == '__main__': + df = read_data(file_dir='../input/arrecadacao-natureza.csv') + df = remove_empty_rows(df) + df = remove_empty_columns(df) + df = rename_columns(df) + df = change_types(df) + save_data(df=df,file_dir='../output/br_rf_arrecadacao_natureza_juridica',partition_cols=['ano','mes']) diff --git a/models/br_rf_arrecadacao/code/clean_uf.py b/models/br_rf_arrecadacao/code/clean_uf.py new file mode 100644 index 00000000..97098652 --- /dev/null +++ b/models/br_rf_arrecadacao/code/clean_uf.py @@ -0,0 +1,73 @@ +import os +import numpy as np +import pandas as pd +from clean_functions import * + +def rename_columns(df): + name_dict = { + 'Ano':'ano', + 'Mês':'mes', + 'UF':'sigla_uf', + 'IMPOSTO SOBRE IMPORTAÇÃO':'imposto_importacao', + 'IMPOSTO SOBRE EXPORTAÇÃO':'imposto_exportacao', + 'IPI - FUMO':'ipi_fumo', + 'IPI - BEBIDAS':'ipi_bebidas', + 'IPI - AUTOMÓVEIS':'ipi_automoveis', + 'IPI - VINCULADO À IMPORTACAO':'ipi_importacoes', + 'IPI - OUTROS':'ipi_outros', + 'IRPF':'irpf', + 'IRPJ - ENTIDADES FINANCEIRAS':'irpj_entidades_financeiras', + 'IRPJ - DEMAIS EMPRESAS':'irpj_demais_empresas', + 'IRRF - RENDIMENTOS DO TRABALHO':'irrf_rendimentos_trabalho', + 'IRRF - RENDIMENTOS DO CAPITAL':'irrf_rendimentos_capital', + 'IRRF - REMESSAS P/ EXTERIOR':'irrf_remessas_exterior', + 'IRRF - OUTROS RENDIMENTOS':'irrf_outros_rendimentos', + 'IMPOSTO S/ OPERAÇÕES FINANCEIRAS':'iof', + 'IMPOSTO TERRITORIAL RURAL':'itr', + 'IMPOSTO PROVIS.S/ MOVIMENT. FINANC. - IPMF':'ipmf', + 'CPMF':'cpmf', + 'COFINS':'cofins', + 'COFINS - FINANCEIRAS':'cofins_entidades_financeiras', + 'COFINS - DEMAIS':'cofins_demais_empresas', + 'CONTRIBUIÇÃO PARA O PIS/PASEP':'pis_pasep', + 'CONTRIBUIÇÃO PARA O PIS/PASEP - FINANCEIRAS':'pis_pasep_entidades_financeiras', + 'CONTRIBUIÇÃO PARA O PIS/PASEP - DEMAIS':'pis_pasep_demais_empresas', + 'CSLL':'csll', + 'CSLL - FINANCEIRAS':'csll_entidades_financeiras', + 'CSLL - DEMAIS':'csll_demais_empresas', + 'CIDE-COMBUSTÍVEIS (parc. não dedutível)': 'cide_combustiveis_parcela_nao_dedutivel', + 'CIDE-COMBUSTÍVEIS':'cide_combustiveis', + 'CONTRIBUIÇÃO PLANO SEG. SOC. SERVIDORES':'cpsss_1', + 'CPSSS - Contrib. p/ o Plano de Segurid. Social Serv. Público':'cpsss_2', + 'CONTRIBUICÕES PARA FUNDAF':'contribuicoes_fundaf', + 'REFIS':'refis', + 'PAES':'paes', + 'RETENÇÃO NA FONTE - LEI 10.833, Art. 30':'retencoes_fonte', + 'PAGAMENTO UNIFICADO':'pagamento_unificado', + 'OUTRAS RECEITAS ADMINISTRADAS':'outras_receitas_rfb', + 'DEMAIS RECEITAS':'demais_receitas', + 'RECEITA PREVIDENCIÁRIA':'receita_previdenciaria', + 'RECEITA PREVIDENCIÁRIA - PRÓPRIA':'receita_previdenciaria_propria', + 'RECEITA PREVIDENCIÁRIA - DEMAIS':'receita_previdenciaria_demais', + 'ADMINISTRADAS POR OUTROS ÓRGÃOS':'receitas_outros_orgaos' + } + + return df.rename(columns=name_dict) + +def change_types(df): + df['ano'] = df['ano'].astype('int') + df['mes'] = get_month_number(df['mes']) + df['sigla_uf'] = df['sigla_uf'].astype('string') + + #All remaining columns are monetary values + for col in df.columns[3:]: + df[col] = df[col].apply(replace_commas).apply(remove_dots).astype('float') + + return df + +if __name__ == '__main__': + df = read_data(file_dir='../input/arrecadacao-estado.csv') + df = remove_empty_rows(df) + df = rename_columns(df) + df = change_types(df) + save_data(df=df,file_dir='../output/br_rf_arrecadacao_uf',partition_cols=['ano','mes']) diff --git a/models/br_rf_arrecadacao/code/download_data.py b/models/br_rf_arrecadacao/code/download_data.py index 4735f6cb..0fca3a40 100644 --- a/models/br_rf_arrecadacao/code/download_data.py +++ b/models/br_rf_arrecadacao/code/download_data.py @@ -1,11 +1,25 @@ -import requests - -def download_data(url, file_path): - response = requests.get(url) - with open(file_path, 'w') as file: - file.write(response.content.decode('latin1')) - -if __name__ == "__main__": - url = "https://www.gov.br/receitafederal/dados/arrecadacao-estado.csv" - file_path = f"input/arrecadacao-estado.csv" - download_data(url, file_path) +import requests +import gzip + +def download_data(url, file_path): + response = requests.get(url) + + with open(file_path, 'w') as file: + if response.headers['Content-Type'] == 'application/gzip': + content = str(gzip.decompress(response.content),'utf-8') + else: + content = response.content.decode('latin1') + file.write(content) + +if __name__ == "__main__": + + extractions = [ + {'url':'https://www.gov.br/receitafederal/dados/arrecadacao-estado.csv','file_path':'input/arrecadacao-estado.csv'}, #Tabela uf + {'url':'https://www.gov.br/receitafederal/dados/arrecadacao-cnae.csv','file_path':'input/arrecadacao-cnae.csv'}, #Tabela cnae + {'url':'https://www.gov.br/receitafederal/dados/arrecadacao-natureza.csv','file_path':'input/arrecadacao-natureza.csv'}, #Tabela natureza_juridica + {'url':'https://www.gov.br/receitafederal/dados/arrecadacao-ir-ipi.csv','file_path':'input/arrecadacao-ir-ipi.csv'}, #Tabela ir_ipi + {'url':'https://www.gov.br/receitafederal/dados/arrecadacao-itr.csv','file_path':'input/arrecadacao-itr.csv'}, #Tabela itr + ] + + for extract in extractions: + download_data(extract['url'], extract['file_path']) diff --git a/models/br_rf_arrecadacao/schema.yml b/models/br_rf_arrecadacao/schema.yml index 821c1cff..f9d55e16 100644 --- a/models/br_rf_arrecadacao/schema.yml +++ b/models/br_rf_arrecadacao/schema.yml @@ -262,3 +262,223 @@ models: tests: - dbt_utils.not_null_proportion: at_least: 0.50 + - name: br_rf_arrecadacao__cnae + description: Contém os dados de arrecadação bruta mensal realizadas em Documento + de Arrecadação de Receitas Federais (Darf), por tributo, efetuada pelos setores + econômicos, identificados pela seção, constantes da tabela de Classificação + Nacional de Atividades Econômicas(Cnae). + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: [ano, mes, secao_sigla] + - not_null_proportion_multiple_columns: + at_least: 0.35 + columns: + - name: ano + description: Ano de referência. + tests: + - relationships: + to: ref('br_bd_diretorios_data_tempo__ano') + field: ano.ano + - name: mes + description: Mês de referência. + tests: + - relationships: + to: ref('br_bd_diretorios_data_tempo__mes') + field: mes.mes + - name: secao_sigla + description: Código da Seção da atividade econômica constante da Cnae. + tests: + - custom_relationships: + to: ref('br_bd_diretorios_brasil__cnae_2') + field: secao + ignore_values: [IN, NI, PF] + - name: imposto_importacao + description: Imposto sobre a Importação. + - name: imposto_exportacao + description: Imposto sobre a Exportação. + - name: ipi + description: Imposto sobre Produtos Industrializados. + - name: irpf + description: Imposto sobre a Renda da Pessoa Física. + - name: irpj + description: Imposto sobre a Renda da Pessoa Jurídica. + - name: irrf + description: Imposto sobre a Renda Retido na Fonte. + - name: iof + description: Imposto sobre Operações Financeiras. + - name: itr + description: Imposto sobre a Propriedade Territorial Rural. + - name: cofins + description: Contribuição para o Financiamento da Seguridade Social. + - name: pis_pasep + description: Programa de Integração Social e o Programa de Formação do Patrimônio + do Servidor Público. + - name: csll + description: Contribuição Social sobre o Lucro Líquido. + - name: cide_combustiveis + description: Contribuição de Intervenção no Domínio Econômico aplicada à combustíveis. + - name: contribuicao_previdenciaria + description: Contribuição Previdenciária arrecadada em Darf. + - name: cpsss + description: Contribuição para o Plano de Seguridade Social do Servidor Público. + - name: pagamento_unificado + description: Pagamentos referentes a vários tributos feitos em um único documento + de arrecadação, como parcelamentos, retenções, Simples etc. + - name: outras_receitas_rfb + description: Outras receitas administradas pela RFB. + - name: demais_receitas + description: Outras receitas não administradas pela RFB. + - name: br_rf_arrecadacao__natureza_juridica + description: Contém os dados de arrecadação bruta mensal realizadas em Documento + de Arrecadação de Receitas Federais (Darf), por tributo, efetuada pelos diversos + tipos de Naturezas Jurídicas. + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: [ano, mes, natureza_juridica_codigo] + - not_null_proportion_multiple_columns: + at_least: 0.15 + columns: + - name: ano + description: Ano de referência. + tests: + - relationships: + to: ref('br_bd_diretorios_data_tempo__ano') + field: ano.ano + - name: mes + description: Mês de referência. + tests: + - relationships: + to: ref('br_bd_diretorios_data_tempo__mes') + field: mes.mes + - name: natureza_juridica_codigo + description: Código do tipo de Natureza Jurídica. + tests: + - relationships: + to: ref('br_bd_diretorios_brasil__natureza_juridica') + field: id_natureza_juridica + - name: imposto_importacao + description: Imposto sobre a Importação. + - name: imposto_exportacao + description: Imposto sobre a Exportação. + - name: ipi + description: Imposto sobre Produtos Industrializados. + - name: irpf + description: Imposto sobre a Renda da Pessoa Física. + - name: irpj + description: Imposto sobre a Renda da Pessoa Jurídica. + - name: irrf + description: Imposto sobre a Renda Retido na Fonte. + - name: iof + description: Imposto sobre Operações Financeiras. + - name: itr + description: Imposto sobre a Propriedade Territorial Rural. + - name: cofins + description: Contribuição para o Financiamento da Seguridade Social. + - name: pis_pasep + description: Programa de Integração Social e o Programa de Formação do Patrimônio + do Servidor Público. + - name: csll + description: Contribuição Social sobre o Lucro Líquido. + - name: cide_combustiveis + description: Contribuição de Intervenção no Domínio Econômico aplicada à combustíveis. + - name: contribuicao_previdenciaria + description: Contribuição Previdenciária arrecadada em Darf. + - name: cpsss + description: Contribuição para o Plano de Seguridade Social do Servidor Público. + - name: pagamento_unificado + description: Pagamentos referentes a vários tributos feitos em um único documento + de arrecadação, como parcelamentos, retenções, Simples etc. + - name: outras_receitas_rfb + description: Outras receitas administradas pela RFB. + - name: demais_receitas + description: Outras receitas não administradas pela RFB. + - name: br_rf_arrecadacao__ir_ipi + description: Contém os dados de arrecadação decendial do Imposto sobre a Renda + (IR) e do Imposto sobre Produtos Industrializados (IPI), discriminados por arrecadação + bruta, os respectivos eventos que afetam essa arrecadação bruta e a arrecadação + líquida, valor este último passível de repasse aos Fundos de Participação dos + Estados e do Municípios, FPM e FPE, de acordo com dispositivo constitucional. + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: [ano, mes, tributo, decendio] + - not_null_proportion_multiple_columns: + at_least: 0.95 + columns: + - name: ano + description: Ano de referência. + tests: + - relationships: + to: ref('br_bd_diretorios_data_tempo__ano') + field: ano.ano + - name: mes + description: Mês de referência. + tests: + - relationships: + to: ref('br_bd_diretorios_data_tempo__mes') + field: mes.mes + - name: tributo + description: Identificação de Tributo (IR ou IPI). + - name: decendio + description: Decêndio de referência. + - name: arrecadacao_bruta + description: Valor da arrecadação bruta dos tributos sem nenhum acréscimo + ou dedução. É a etapa inicial da arrecadação, quando há o pagamento efetuado + pelo contribuinte. + - name: retificacao + description: Valor do saldo das retificações efetuadas nos Documentos de Arrecadação + de Receitas Federais (Darf). São alterações feitas nos documentos de arrecadação + em função de erro de preenchimento. + - name: compensacao + description: Valor do saldo das compensações efetuadas. Utilização de valores + pagos anteriormente para quitação de débitos de outros tributos, distintos + de IR ou IPI, e vice-versa. + - name: restituicao + description: Valor do saldo das restituições efetuadas pela Receita Federal + em função de pagamentos feitos a maior ou indevidos. + - name: outros + description: Demais eventos que agem sobre o valor da arrecadação bruta. + - name: arrecadacao_liquida + description: Valor resultante da arrecadação bruta, depois da incidência dos + demais eventos de arrecadação. + - name: br_rf_arrecadacao__itr + description: Contém os dados de arrecadação líquida mensal do Imposto sobre a + Propriedade Territorial Rural (ITR), realizada pelas propriedades rurais, passível + de repasse aos próprios municípios onde se localizam essas propriedades rurais, + conforme a situação do município, conveniado ou não conveniado com a União para + fiscalizar e cobrar o ITR, de acordo com dispositivo constitucional. + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: [ano, mes, sigla_uf, sigla_regiao, cidade] + - not_null_proportion_multiple_columns: + at_least: 0.95 + columns: + - name: ano + description: Ano de referência. + tests: + - relationships: + to: ref('br_bd_diretorios_data_tempo__ano') + field: ano.ano + - name: mes + description: Mês de referência. + tests: + - relationships: + to: ref('br_bd_diretorios_data_tempo__mes') + field: mes.mes + - name: sigla_uf + description: Sigla da Unidade da Federação onde se localiza o município conveniado + ou não conveniado. + tests: + - relationships: + to: ref('br_bd_diretorios_brasil__uf') + field: sigla + - name: sigla_regiao + description: Sigla da Região Política onde se localiza o município conveniado + ou não conveniado. + tests: + - relationships: + to: ref('br_bd_diretorios_brasil__regiao') + field: sigla + - name: cidade + description: Nome do município conveniado ou não conveniado. + - name: valor_arrecadado + description: Valor arrecadado.