From c090fda680895198eb4cc522951af646c07cc5c3 Mon Sep 17 00:00:00 2001 From: Jean Tozzi Date: Wed, 10 Apr 2024 12:22:05 -0300 Subject: [PATCH] add: br_ibge_pam models + code --- dbt_project.yml | 3 + .../br_ibge_pam__lavoura_permanente.sql | 12 + .../br_ibge_pam__lavoura_temporaria.sql | 12 + models/br_ibge_pam/code/api_to_json.py | 51 +++++ models/br_ibge_pam/code/json_to_parquet.py | 208 ++++++++++++++++++ models/br_ibge_pam/schema.yml | 89 ++++++++ 6 files changed, 375 insertions(+) create mode 100644 models/br_ibge_pam/br_ibge_pam__lavoura_permanente.sql create mode 100644 models/br_ibge_pam/br_ibge_pam__lavoura_temporaria.sql create mode 100644 models/br_ibge_pam/code/api_to_json.py create mode 100644 models/br_ibge_pam/code/json_to_parquet.py create mode 100644 models/br_ibge_pam/schema.yml diff --git a/dbt_project.yml b/dbt_project.yml index ce3e7cdc..196afaa8 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -168,6 +168,9 @@ models: br_ibge_ipca15: +materialized: table +schema: br_ibge_ipca15 + br_ibge_pam: + +materialized: table + +schema: br_ibge_pam br_ibge_pevs: +materialized: table +schema: br_ibge_pevs diff --git a/models/br_ibge_pam/br_ibge_pam__lavoura_permanente.sql b/models/br_ibge_pam/br_ibge_pam__lavoura_permanente.sql new file mode 100644 index 00000000..2e8757d3 --- /dev/null +++ b/models/br_ibge_pam/br_ibge_pam__lavoura_permanente.sql @@ -0,0 +1,12 @@ +{{ config(alias="lavoura_permanente", schema="br_ibge_pam") }} +select + safe_cast(ano as int64) ano, + safe_cast(sigla_uf as string) sigla_uf, + safe_cast(id_municipio as string) id_municipio, + safe_cast(produto as string) produto, + safe_cast(area_destinada_colheita as int64) area_destinada_colheita, + safe_cast(area_colhida as int64) area_colhida, + round(safe_cast(quantidade_produzida as float64), 4) quantidade_produzida, + round(safe_cast(rendimento_medio_producao as float64), 4) rendimento_medio_producao, + round(safe_cast(valor_producao as float64), 4) valor_producao +from `basedosdados-staging.br_ibge_pam_staging.lavoura_permanente` as t diff --git a/models/br_ibge_pam/br_ibge_pam__lavoura_temporaria.sql b/models/br_ibge_pam/br_ibge_pam__lavoura_temporaria.sql new file mode 100644 index 00000000..76092523 --- /dev/null +++ b/models/br_ibge_pam/br_ibge_pam__lavoura_temporaria.sql @@ -0,0 +1,12 @@ +{{ config(alias="lavoura_temporaria", schema="br_ibge_pam") }} +select + safe_cast(ano as int64) ano, + safe_cast(sigla_uf as string) sigla_uf, + safe_cast(id_municipio as string) id_municipio, + safe_cast(produto as string) produto, + safe_cast(area_plantada as int64) area_plantada, + safe_cast(area_colhida as int64) area_colhida, + round(safe_cast(quantidade_produzida as float64), 4) quantidade_produzida, + round(safe_cast(rendimento_medio_producao as float64), 4) rendimento_medio_producao, + round(safe_cast(valor_producao as float64), 4) valor_producao +from `basedosdados-staging.br_ibge_pam_staging.lavoura_temporaria` as t diff --git a/models/br_ibge_pam/code/api_to_json.py b/models/br_ibge_pam/code/api_to_json.py new file mode 100644 index 00000000..8476389e --- /dev/null +++ b/models/br_ibge_pam/code/api_to_json.py @@ -0,0 +1,51 @@ +import asyncio +import aiohttp +import json +import glob +from tqdm.asyncio import tqdm +from aiohttp import ClientTimeout, TCPConnector +from tqdm import tqdm + +API_URL_BASE = "https://servicodados.ibge.gov.br/api/v3/agregados/{}/periodos/{}/variaveis/{}?localidades={}[{}]&classificacao={}[{}]" +AGREGADO = "1613" # É a tabela no SIDRA +PERIODOS = range(1974, 2022 + 1) +VARIAVEIS = ["2313", "1002313", "216", "1000216", "214", "112","215", + "1000215"] # As variáveis da tabela +NIVEL_GEOGRAFICO = "N6" # N6 = Municipal +LOCALIDADES = "all" +CLASSIFICACAO = "82" # Código pré-definido por agregado +CATEGORIAS = ["2717", "2718", "45981", "2719", "2720", "2721", "40472", + "2722", "2723", "31619", "31620", "40473", "2724", "2725", + "2726", "2727", "2728", "2729", "2730", "2731", "2732", + "2733", "2734", "2735", "2736", "2737", "2738", "2739", + "2740", "90001", "2741", "2742", "2743", "2744", "2745", + "2746", "2747", "2748"] # Produtos +ANOS_BAIXADOS = [int(glob.os.path.basename(f).split(".")[0]) for f in glob.glob(f"../json/*.json")] +ANOS_RESTANTES = [int(ANO) for ANO in PERIODOS if ANO not in ANOS_BAIXADOS] + +async def fetch(session, url): + async with session.get(url) as response: + return await response.json() + +async def main(years, variables, categories): + for year in years: + print(f'Consultando dados do ano: {year}') + async with aiohttp.ClientSession(connector=TCPConnector(limit=100, force_close=True), timeout=ClientTimeout(total=1200)) as session: + tasks = [] + for variable in variables: # Foi preciso iterar por cada variável porque senão retorna HTTP 500 + for category in categories: + url = API_URL_BASE.format(AGREGADO, year, variable, NIVEL_GEOGRAFICO, LOCALIDADES, CLASSIFICACAO, category) + task = fetch(session, url) + tasks.append(asyncio.ensure_future(task)) + responses = [] + for future in tqdm(asyncio.as_completed(tasks), total=len(tasks)): + try: + response = await future + responses.append(response) + except asyncio.TimeoutError: + print(f"Request timed out for {url}") + with open(f'../json/{year}.json', 'a') as f: + json.dump(responses, f) + +if __name__ == "__main__": + asyncio.run(main(ANOS_RESTANTES, VARIAVEIS, CATEGORIAS)) diff --git a/models/br_ibge_pam/code/json_to_parquet.py b/models/br_ibge_pam/code/json_to_parquet.py new file mode 100644 index 00000000..917cf3f4 --- /dev/null +++ b/models/br_ibge_pam/code/json_to_parquet.py @@ -0,0 +1,208 @@ +import json +import numpy as np +import os +import re +from collections import OrderedDict +import pandas as pd +import pyarrow as pa +from pathlib import Path + +def parse_file(file_path): + dict_list = [] + + with open(file_path) as f: + json_list = json.load(f) + + for j in json_list: + temp_od = OrderedDict() + temp_variavel = j[0]["variavel"] + for r in j[0]["resultados"]: + temp_produto = list(r["classificacoes"][0]["categoria"].values())[0] + for s in r["series"]: + temp_ano = list(s["serie"].keys())[0] + temp_valor = list(s["serie"].values())[0] + temp_sigla_uf = s["localidade"]["nome"].split("-")[-1].strip() + temp_id_municipio = s["localidade"]["id"] + + temp_od["ano"] = temp_ano + temp_od["sigla_uf"] = temp_sigla_uf + temp_od["id_municipio"] = temp_id_municipio + temp_od["produto"] = temp_produto + temp_od[temp_variavel] = temp_valor + + dict_list.append(dict(temp_od)) + temp_od.clear() + return dict_list + +def join_dicts_by_keys(dict_list, keys): + result = {} + for d in dict_list: + key_tuple = tuple(d[k] for k in keys) + if key_tuple not in result: + result[key_tuple] = d + else: + result[key_tuple].update(d) + return list(result.values()) + +def create_raw_df(dict_list): + return pd.DataFrame(dict_list, dtype=str) + +def drop_columns(dataframe): + dropped_df = dataframe.drop(columns=[ + "Área destinada à colheita - percentual do total geral", + "Área colhida - percentual do total geral", + "Valor da produção - percentual do total geral", + ]) + + return dropped_df + +def rename_columns(dataframe): + renamed_dataframe = dataframe.rename(columns={ + "Área destinada à colheita": "area_destinada_colheita", + "Área colhida": "area_colhida", + "Quantidade produzida": "quantidade_produzida", + "Rendimento médio da produção": "rendimento_medio_producao", + "Valor da produção": "valor_producao" + }) + return renamed_dataframe + +def treat_columns(dataframe): + dataframe = dataframe[["ano", "sigla_uf", "id_municipio", "produto", + "area_destinada_colheita", "area_colhida", + "quantidade_produzida", "rendimento_medio_producao", + "valor_producao"]] + COLUNAS_PARA_TRATAR = ["ano", "area_destinada_colheita", "area_colhida", + "quantidade_produzida", "rendimento_medio_producao", + "valor_producao"] + + for coluna in COLUNAS_PARA_TRATAR: + dataframe[coluna] = dataframe[coluna].apply(lambda x: np.nan if x in ("-", "..", "...", "X") else x) + dataframe[coluna] = dataframe[coluna].astype("Int64") + return dataframe + +def products_weight_ratio_fix(row): + """ + 2 - A partir do ano de 2001 as quantidades produzidas dos produtos abacate, banana, + caqui, figo, goiaba, laranja, limão, maçã, mamão, manga, maracujá, marmelo, pera, + pêssego e tangerina passam a ser expressas em toneladas. Nos anos anteriores eram + expressas em mil frutos, com exceção da banana, que era expressa em mil cachos. O + rendimento médio passa a ser expresso em Kg/ha. Nos anos anteriores era expresso + em frutos/ha, com exceção da banana, que era expressa em cachos/ha. + 3 - Veja em o documento + https://sidra.ibge.gov.br/content/documentos/pam/AlteracoesUnidadesMedidaFrutas.pdf + com as alterações de unidades de medida das frutíferas ocorridas em 2001 e a tabela + de conversão fruto x quilograma. + """ + + DICIONARIO_DE_PROPORCOES = { + "Abacate": 0.38, + "Banana (cacho)": 10.20, + "Caqui": 0.18, + "Figo": 0.09, + "Goiaba": 0.16, + "Larajna": 0.16, + "Limão": 0.10, + "Maçã": 0.15, + "Mamão": 0.80, + "Manga": 0.31, + "Maracujá": 0.15, + "Marmelo": 0.19, + "Pera": 0.17, + "Pêra": 0.17, # Para garantir, pois nos dados parece que só há Pera, sem acento + "Pêssego": 0.13, + "Tangerina": 0.15, + "Melancia": 6.08, + "Melão": 1.39 + } + + if row["ano"] >= 2001: + return row + + if (pd.isna(row["quantidade_produzida"]) or pd.isna(row["area_colhida"]) + or row["quantidade_produzida"] == 0 or row["area_colhida"] == 0): + return row + + if row["produto"] not in DICIONARIO_DE_PROPORCOES.keys(): + return row + + quantidade_produzida = row["quantidade_produzida"] * DICIONARIO_DE_PROPORCOES[row["produto"]] + rendimento_medio_producao = quantidade_produzida / row["area_colhida"] * 1000 # kg / ha + + row["quantidade_produzida"] = quantidade_produzida + row["rendimento_medio_producao"] = rendimento_medio_producao + + return row + +def currency_fix(row): + """ + Valor da produção (Mil Cruzeiros [1974 a 1985, 1990 a 1992], Mil Cruzados [1986 a 1988], + Mil Cruzados Novos [1989], Mil Cruzeiros Reais [1993], Mil Reais [1994 a 2022]) + Verificado em http://www.igf.com.br/calculadoras/conversor/conversor.htm + """ + + if 1974 <= row["ano"] <= 1985: + return row["valor_producao"] / (1000**4 * 2.75) + elif 1986 <= row["ano"] <= 1988: + return row["valor_producao"] / (1000**3 * 2.75) + elif row["ano"] == 1989: + return row["valor_producao"] / (1000**2 * 2.75) + elif 1990 <= row["ano"] <= 1992: + return row["valor_producao"] / (1000**2 * 2.75) + elif row["ano"] == 1993: + return row["valor_producao"] / (1000 * 2.75) + else: + return row["valor_producao"] + +def get_existing_years(directory): + root_dir = Path(directory) + year_dirs = root_dir.glob('ano=*') + years = set() + for year_dir in year_dirs: + match = re.match(r'ano=(\d+)', year_dir.name) + if match: + year = int(match.group(1)) + years.add(year) + + return sorted(list(years)) + +if __name__ == '__main__': + ANOS_TRANSFORMADOS = get_existing_years("../parquet") + ARQUIVOS_JSON = list(Path('../json/').glob('*.json')) + JSON_FALTANTES = [arquivo for arquivo in ARQUIVOS_JSON if int(arquivo.stem) not in ANOS_TRANSFORMADOS] + + for path in JSON_FALTANTES: + print(f"Criando dict_list com base no arquivo: {path}...") + dict_list = parse_file(path) + print("Concatendo as colunas do dict_list...") + dict_list = join_dicts_by_keys(dict_list, ["ano", "sigla_uf", "id_municipio", "produto"]) + print("Criando DataFrame a partir do dict_list...") + df = create_raw_df(dict_list) + print("Removendo as colunas do DataFrame...") + df = drop_columns(df) + print("Renomeando as colunas do DataFrame...") + df = rename_columns(df) + print("Tratando as colunas do DataFrame...") + df = treat_columns(df) + print("Tratando a questão dos pesos dos produtos... Impacto em: quantidade_producao e rendimento_medio_producao") + df = df.apply(products_weight_ratio_fix, axis=1) + print("Aplicando a correção nominal retroativa da moeda... Impacto: valor_producao") + df["valor_producao"] = df.apply(currency_fix, axis=1) + print("Transformações finalizadas!") + temp_ano = df["ano"].max() + print("Deletando a coluna ano para possibilitar o particionamento...") + df.drop(columns=["ano"], inplace=True) + print("Transformações finalizadas!") + temp_export_file_path = f"../parquet/ano={temp_ano}/data.parquet" + print(f"Exportando o DataFrame particionado em {temp_export_file_path}...") + os.makedirs(os.path.dirname(temp_export_file_path), exist_ok=True) + temp_schema = pa.schema([("sigla_uf", pa.string()), + ("id_municipio", pa.string()), + ("produto", pa.string()), + ("area_destinada_colheita", pa.int64()), + ("area_colhida", pa.int64()), + ("quantidade_produzida", pa.float64()), + ("rendimento_medio_producao", pa.float64()), + ("valor_producao", pa.float64())]) + df.to_parquet(temp_export_file_path, schema=temp_schema, index=False) + del(df) # Liberando memória + print() diff --git a/models/br_ibge_pam/schema.yml b/models/br_ibge_pam/schema.yml new file mode 100644 index 00000000..5937dc0f --- /dev/null +++ b/models/br_ibge_pam/schema.yml @@ -0,0 +1,89 @@ +--- +version: 2 +models: + - name: br_ibge_pam__lavoura_temporaria + description: Área plantada, área colhida, quantidade produzida, rendimento médio + e valor da produção das lavouras temporárias + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - ano + - sigla_uf + - id_municipio + - produto + - not_null_proportion_multiple_columns: + at_least: 0.05 + columns: + - name: ano + description: Ano + tests: + - relationships: + to: ref('br_bd_diretorios_data_tempo__ano') + field: ano.ano + - name: sigla_uf + description: Sigla da Unidade da Federação + tests: + - relationships: + to: ref('br_bd_diretorios_brasil__uf') + field: sigla + - name: id_municipio + description: ID Município IBGE 7 dígitos + tests: + - relationships: + to: ref('br_bd_diretorios_brasil__municipio') + field: id_municipio + - name: produto + description: Produto + - name: area_plantada + description: Área destinada à colheita + - name: area_colhida + description: Área colhida + - name: quantidade_produzida + description: Quantidade da produção + - name: rendimento_medio_producao + description: Rendimento médio da produção + - name: valor_producao + description: Valor da produção + - name: br_ibge_pam__lavoura_permanente + description: Área destinada à colheita, área colhida, quantidade produzida, rendimento + médio e valor da produção das lavouras permanentes + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - ano + - sigla_uf + - id_municipio + - produto + - not_null_proportion_multiple_columns: + at_least: 0.05 + columns: + - name: ano + description: Ano + tests: + - relationships: + to: ref('br_bd_diretorios_data_tempo__ano') + field: ano.ano + - name: sigla_uf + description: Sigla da Unidade da Federação + tests: + - relationships: + to: ref('br_bd_diretorios_brasil__uf') + field: sigla + - name: id_municipio + description: ID Município IBGE 7 dígitos + tests: + - relationships: + to: ref('br_bd_diretorios_brasil__municipio') + field: id_municipio + - name: produto + description: Produto + - name: area_destinada_colheita + description: Área destinada à colheita + - name: area_colhida + description: Área colhida + - name: quantidade_produzida + description: Quantidade da produção + - name: rendimento_medio_producao + description: Rendimento médio da produção + - name: valor_producao + description: Valor da produção